Compare commits

...

2 Commits

Author SHA1 Message Date
Kit Langton
2e6d7bb517 fix(sync): keep event application synchronous 2026-04-01 23:49:22 -04:00
Kit Langton
dc719269b6 refactor(sync): effectify sync event 2026-04-01 23:31:36 -04:00

View File

@@ -1,6 +1,8 @@
import z from "zod"
import type { ZodObject } from "zod"
import { EventEmitter } from "events"
import { Effect, Layer, ServiceMap } from "effect"
import { makeRuntime } from "@/effect/run-service"
import { Database, eq } from "@/storage/db"
import { Bus as ProjectBus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
@@ -31,37 +33,18 @@ export namespace SyncEvent {
type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
export const registry = new Map<string, Definition>()
let projectors: Map<Definition, ProjectorFunc> | undefined
const versions = new Map<string, number>()
let frozen = false
let convertEvent: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
const Bus = new EventEmitter<{ event: [{ def: Definition; event: Event }] }>()
export function reset() {
frozen = false
projectors = undefined
convertEvent = (_, data) => data
type State = {
projectors: Map<Definition, ProjectorFunc> | undefined
convert: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
bus: EventEmitter<{ event: [{ def: Definition; event: Event }] }>
}
export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: typeof convertEvent }) {
projectors = new Map(input.projectors)
export const registry = new Map<string, Definition>()
const versions = new Map<string, number>()
let frozen = false
// Install all the latest event defs to the bus. We only ever emit
// latest versions from code, and keep around old versions for
// replaying. Replaying does not go through the bus, and it
// simplifies the bus to only use unversioned latest events
for (let [type, version] of versions.entries()) {
let def = registry.get(versionedType(type, version))!
BusEvent.define(def.type, def.properties || def.schema)
}
// Freeze the system so it clearly errors if events are defined
// after `init` which would cause bugs
frozen = true
convertEvent = input.convertEvent || ((_, data) => data)
function noop(_: string, data: Event["data"]) {
return data
}
export function versionedType<A extends string>(type: A): A
@@ -102,140 +85,222 @@ export namespace SyncEvent {
return [def, func as ProjectorFunc]
}
function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
if (projectors == null) {
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
}
const projector = projectors.get(def)
if (!projector) {
throw new Error(`Projector not found for event: ${def.type}`)
}
// idempotent: need to ignore any events already logged
Database.transaction((tx) => {
projector(tx, event.data)
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
tx.insert(EventSequenceTable)
.values({
aggregate_id: event.aggregateID,
seq: event.seq,
})
.onConflictDoUpdate({
target: EventSequenceTable.aggregate_id,
set: { seq: event.seq },
})
.run()
tx.insert(EventTable)
.values({
id: event.id,
seq: event.seq,
aggregate_id: event.aggregateID,
type: versionedType(def.type, def.version),
data: event.data as Record<string, unknown>,
})
.run()
}
Database.effect(() => {
Bus.emit("event", {
def,
event,
})
if (options?.publish) {
const result = convertEvent(def.type, event.data)
if (result instanceof Promise) {
result.then((data) => {
ProjectBus.publish({ type: def.type, properties: def.schema }, data)
})
} else {
ProjectBus.publish({ type: def.type, properties: def.schema }, result)
}
}
})
})
export interface Interface {
readonly reset: () => Effect.Effect<void>
readonly init: (input: {
projectors: Array<[Definition, ProjectorFunc]>
convertEvent?: State["convert"]
}) => Effect.Effect<void>
readonly replay: (event: SerializedEvent, options?: { republish: boolean }) => Effect.Effect<void>
readonly run: <Def extends Definition>(def: Def, data: Event<Def>["data"]) => Effect.Effect<void>
readonly remove: (aggregateID: string) => Effect.Effect<void>
readonly subscribeAll: (handler: (event: { def: Definition; event: Event }) => void) => Effect.Effect<() => void>
}
// TODO:
//
// * Support applying multiple events at one time. One transaction,
// and it validets all the sequence ids
// * when loading events from db, apply zod validation to ensure shape
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/SyncEvent") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const state: State = {
projectors: undefined,
convert: noop,
bus: new EventEmitter<{ event: [{ def: Definition; event: Event }] }>(),
}
function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
if (state.projectors == null) {
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
}
const projector = state.projectors.get(def)
if (!projector) {
throw new Error(`Projector not found for event: ${def.type}`)
}
Database.transaction((tx) => {
projector(tx, event.data)
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
tx.insert(EventSequenceTable)
.values({
aggregate_id: event.aggregateID,
seq: event.seq,
})
.onConflictDoUpdate({
target: EventSequenceTable.aggregate_id,
set: { seq: event.seq },
})
.run()
tx.insert(EventTable)
.values({
id: event.id,
seq: event.seq,
aggregate_id: event.aggregateID,
type: versionedType(def.type, def.version),
data: event.data as Record<string, unknown>,
})
.run()
}
Database.effect(() => {
state.bus.emit("event", { def, event })
if (!options.publish) return
const result = state.convert(def.type, event.data)
if (result instanceof Promise) {
result.then((data) => {
ProjectBus.publish({ type: def.type, properties: def.schema }, data)
})
return
}
ProjectBus.publish({ type: def.type, properties: def.schema }, result)
})
})
}
const reset = Effect.fn("SyncEvent.reset")(() =>
Effect.sync(() => {
frozen = false
state.projectors = undefined
state.convert = noop
}),
)
const init = Effect.fn("SyncEvent.init")(
(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: State["convert"] }) =>
Effect.sync(() => {
state.projectors = new Map(input.projectors)
for (const [type, version] of versions.entries()) {
const def = registry.get(versionedType(type, version))!
BusEvent.define(def.type, def.properties)
}
frozen = true
state.convert = input.convertEvent || noop
}),
)
// TODO:
//
// * Support applying multiple events at one time. One transaction,
// and it validets all the sequence ids
// * when loading events from db, apply zod validation to ensure shape
const replay = Effect.fn("SyncEvent.replay")(function* (
event: SerializedEvent,
options?: { republish: boolean },
) {
const def = registry.get(event.type)
if (!def) {
throw new Error(`Unknown event type: ${event.type}`)
}
const row = Database.use((db) =>
db
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
.get(),
)
const latest = row?.seq ?? -1
if (event.seq <= latest) {
return
}
const expected = latest + 1
if (event.seq !== expected) {
throw new Error(
`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`,
)
}
yield* Effect.sync(() => process(def, event, { publish: !!options?.republish }))
})
const run: Interface["run"] = Effect.fn("SyncEvent.run")(function* <Def extends Definition>(
def: Def,
data: Event<Def>["data"],
) {
const agg = (data as Record<string, string>)[def.aggregate]
if (agg == null) {
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
}
if (def.version !== versions.get(def.type)) {
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
}
Database.transaction(
(tx) => {
const id = EventID.ascending()
const row = tx
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, agg))
.get()
const seq = row?.seq != null ? row.seq + 1 : 0
const event = { id, seq, aggregateID: agg, data }
process(def, event, { publish: true })
},
{
behavior: "immediate",
},
)
})
const remove = Effect.fn("SyncEvent.remove")((aggregateID: string) =>
Effect.sync(() => {
Database.transaction((tx) => {
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
})
}),
)
const subscribeAll = Effect.fn("SyncEvent.subscribeAll")(
(handler: (event: { def: Definition; event: Event }) => void) =>
Effect.sync(() => {
state.bus.on("event", handler)
return () => state.bus.off("event", handler)
}),
)
return Service.of({ reset, init, replay, run, remove, subscribeAll })
}),
)
export const defaultLayer = layer
const { runSync } = makeRuntime(Service, defaultLayer)
export function reset() {
return runSync((svc) => svc.reset())
}
export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: State["convert"] }) {
return runSync((svc) => svc.init(input))
}
export function replay(event: SerializedEvent, options?: { republish: boolean }) {
const def = registry.get(event.type)
if (!def) {
throw new Error(`Unknown event type: ${event.type}`)
}
const row = Database.use((db) =>
db
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
.get(),
)
const latest = row?.seq ?? -1
if (event.seq <= latest) {
return
}
const expected = latest + 1
if (event.seq !== expected) {
throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`)
}
process(def, event, { publish: !!options?.republish })
return runSync((svc) => svc.replay(event, options))
}
export function run<Def extends Definition>(def: Def, data: Event<Def>["data"]) {
const agg = (data as Record<string, string>)[def.aggregate]
// This should never happen: we've enforced it via typescript in
// the definition
if (agg == null) {
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
}
if (def.version !== versions.get(def.type)) {
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
}
// Note that this is an "immediate" transaction which is critical.
// We need to make sure we can safely read and write with nothing
// else changing the data from under us
Database.transaction(
(tx) => {
const id = EventID.ascending()
const row = tx
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, agg))
.get()
const seq = row?.seq != null ? row.seq + 1 : 0
const event = { id, seq, aggregateID: agg, data }
process(def, event, { publish: true })
},
{
behavior: "immediate",
},
)
return runSync((svc) => svc.run(def, data))
}
export function remove(aggregateID: string) {
Database.transaction((tx) => {
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
})
return runSync((svc) => svc.remove(aggregateID))
}
export function subscribeAll(handler: (event: { def: Definition; event: Event }) => void) {
Bus.on("event", handler)
return () => Bus.off("event", handler)
return runSync((svc) => svc.subscribeAll(handler))
}
export function payloads() {