This commit is contained in:
James Long
2026-03-22 20:47:00 -04:00
parent 83a8471315
commit 4123ff7424
6 changed files with 123 additions and 31 deletions

View File

@@ -2,21 +2,12 @@ import z from "zod"
import { Log } from "../util/log"
import { Instance } from "../project/instance"
import { BusEvent } from "./bus-event"
import { SyncEvent } from "../sync"
import { GlobalBus } from "./global"
export namespace Bus {
const log = Log.create({ service: "bus" })
type Subscription = (event: any) => void
type SubscribableProperties<Def> = Def extends { data: infer Data }
? z.infer<Data>
: Def extends { properties: infer Properties }
? z.infer<Properties>
: never
type SubscribableDefinition = BusEvent.Definition | SyncEvent.Definition
export const InstanceDisposed = BusEvent.define(
"server.instance.disposed",
z.object({
@@ -72,16 +63,19 @@ export namespace Bus {
return Promise.all(pending)
}
export function subscribe<Def extends SubscribableDefinition>(
def: Def,
callback: (event: { type: Def["type"]; properties: SubscribableProperties<Def> }) => void,
export function subscribe<Definition extends BusEvent.Definition>(
def: Definition,
callback: (event: { type: Definition["type"]; properties: z.output<Definition["properties"]> }) => void,
) {
return raw(def.type, callback)
}
export function once<Definition extends SubscribableDefinition>(
export function once<Definition extends BusEvent.Definition>(
def: Definition,
callback: (event: { type: Definition["type"]; properties: SubscribableProperties<Definition> }) => "done" | undefined,
callback: (event: {
type: Definition["type"]
properties: z.output<Definition["properties"]>
}) => "done" | undefined,
) {
const unsub = subscribe(def, (event) => {
if (callback(event)) unsub()
@@ -108,11 +102,4 @@ export namespace Bus {
match.splice(index, 1)
}
}
export function fromSyncDefinition(def: SyncEvent.Definition) {
return {
type: def.type,
properties: def.data,
}
}
}

View File

@@ -161,6 +161,8 @@ export const GlobalRoutes = lazy(() =>
c.header("X-Content-Type-Options", "nosniff")
return streamEvents(c, (q) => {
return SyncEvent.subscribeAll(({ def, event }) => {
// TODO: don't pass def, just pass the type (and it should
// be versioned)
q.push(
JSON.stringify({
payload: {

View File

@@ -203,6 +203,10 @@ export namespace Session {
time: Info.shape.time.partial().optional(),
}),
}),
busSchema: z.object({
sessionID: SessionID.zod,
info: Info,
}),
}),
Deleted: SyncEvent.define({
type: "session.deleted",
@@ -229,6 +233,8 @@ export namespace Session {
),
}
type X = z.infer<(typeof Event.Updated)["properties"]>
export const create = fn(
z
.object({

View File

@@ -1 +0,0 @@
james@james-6.local.34647:1773877465

View File

@@ -0,0 +1,94 @@
# Goal
## Syncing with only one writer
This system defines a basic event sourcing system for session replayability. The goal is to allow for one device to control and modify the session, and allow multiple other devices to "sync" session data. The sync works by getting a log of events to replay and replaying them locally.
Because only one device is allowed to write, we don't need any kind of sophisticated distributed system clocks or causal ordering. We implement total ordering with a simple sequence id (a number) and increment it by one every time we generate an event.
## Bus event integration and backwards compatibility
This initial implementation aims to be fully backwards compatible. We should be able to land this without any visible changes to the user.
An existing `Bus` abstraction to send events already exists. We already send events like `session.created` through the system. We should not duplicate this.
The difference in event sourcing is events are sent _before_ the mutation happens, and "projectors" handle the effects and perform the mutations. This difference is subtle, and a necessary change for syncing to work.
So the goal is:
* Introduce a new syncing abstraction to handle event sourcing and projectors
* Seamlessly integrate these new events into the same existing `Bus` abstraction
* Maintain full backwards compatibility to reduce risk
## My approach
This directory introduces a new abstraction: `SyncEvent`. This handles all of the event sourcing.
There are now "sync events" which are different than "bus events". Bus events are defined like this:
```ts
const Diff = BusEvent.define(
"session.diff",
z.object({
sessionID: SessionID.zod,
diff: Snapshot.FileDiff.array(),
}),
)
```
You can do `Bus.publish(Diff, { ... })` to push these events, and `Bus.subscribe(Diff, handler)` to listen to them.
Sync events are a lower-level abstraction which are similar, but also handle the requirements for recording and replaying. Defining them looks like this:
```ts
const Created = SyncEvent.define({
type: "session.created",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
info: Info,
}),
})
```
Not too different, except they track a version and an "aggregate" field (will explain that later).
You do this to run an event, which is kind of like `Bus.publish` except that it runs through the event sourcing system:
```
SyncEvent.run(Created, { ... })
```
Importantly, **sync events automatically re-publish as bus events**. This makes them backwards compatible, and allows the `Bus` to still be the single abstraction that the system uses to listen for individual events.
**We have upgraded many of the session events to be sync events** (all of the ones that mutate the db). Sync and bus events are largely compatible. Here are the differences:
### Event shape
* The shape of the events are slightly different. A sync event has the `type`, `id`, `seq`, `aggregateID`, and `data` fields. A bus event has the `type` and `properties` fields. `data` and `properties` are largely the same thing. This conversation is automatically handled when the sync system re-published the event throught the bus.
The reason for this is because sync events need to track more information. I chose not to copy the `properties` naming to more clearly disambiguate the event types.
### Event flow
There is no way to subscribe to individual sync events in `SyncEvent`. You can use `subscribeAll` to receive _all_ of the events, which is needed for clients that want to record them.
To listen for individual events, use `Bus.subscribe`. You can pass in a sync event definition to it: `Bus.subscribe(Created, handler)`. This is fully supported.
You should never "publish" a sync event however: `Bus.publish(Created, ...)`. This will throw a type error on purpose; sync events must always be run through the sync system directly.
### Backwards compatibility
The system install projectors in `server/projectors.js`. It calls `SyncEvent.init` to do this. It also installs two different hooks for providing backwards compatibility:
* `convertDefinition`: a function that convert a zod definition for an event schema
* `convertEvent`: a function that converts an individual event's data
These hooks allow for arbitrary conversions at runtime, allowing for us to provide a backwards compatible interface to clients.
For example, the sync system changed the `session.updated` event to only include the fields that were changed, compared to before where it returned the full session object. We install converters to load the full session object and return it to clients.
**Important**:

View File

@@ -13,14 +13,16 @@ export namespace SyncEvent {
type: string
version: number
aggregate: string
data: z.ZodObject
schema: z.ZodObject
properties: z.ZodObject
}
export type Event<Def extends Definition = Definition> = {
id: string
seq: number
aggregateID: string
data: z.infer<Def["data"]>
data: z.infer<Def["schema"]>
}
export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
@@ -37,7 +39,6 @@ export namespace SyncEvent {
export function init(input: {
projectors: Array<[Definition, ProjectorFunc]>
convertDefinition?: (type: string, data: ZodObject) => ZodObject
convertEvent?: Exclude<typeof convertEvent, undefined>
}) {
projectors = new Map(input.projectors)
@@ -48,9 +49,8 @@ export namespace SyncEvent {
// simplifies the bus to only use unversioned latest events
for (let [type, version] of versions.entries()) {
let def = registry.get(versionedType(type, version))!
let data = def.data
BusEvent.define(def.type, input.convertDefinition ? input.convertDefinition(type, data) : data)
BusEvent.define(def.type, def.properties || def.schema)
}
// Freeze the system so it clearly errors if events are defined
@@ -69,7 +69,8 @@ export namespace SyncEvent {
Type extends string,
Agg extends string,
Schema extends ZodObject<Record<Agg, z.ZodType<string>>>,
>(input: { type: Type; version: number; aggregate: Agg; schema: Schema }) {
BusSchema extends ZodObject<Record<Agg, z.ZodType<string>>> | undefined,
>(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema: BusSchema }) {
if (frozen) {
throw new Error("Error defining sync event: sync system has been frozen")
}
@@ -77,8 +78,11 @@ export namespace SyncEvent {
const def = {
type: input.type,
version: input.version,
data: input.schema,
aggregate: input.aggregate,
schema: input.schema,
properties: (input.busSchema ?? input.schema) as BusSchema extends ZodObject<Record<Agg, z.ZodType<string>>>
? BusSchema
: Schema,
}
versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0))
@@ -199,7 +203,7 @@ export namespace SyncEvent {
ProjectBus.publish(
{
type: def.type,
properties: def.data,
properties: def.schema,
},
convertEvent ? convertEvent(def.type, event.data) : event.data,
)
@@ -226,7 +230,7 @@ export namespace SyncEvent {
.object({
type: z.literal(type),
aggregate: z.literal(def.aggregate),
data: def.data,
data: def.schema,
})
.meta({
ref: "SyncEvent" + "." + def.type,