Compare commits

...

6 Commits

Author SHA1 Message Date
Kit Langton
c3eb2418ca fix(sync): preserve shared state and caller semantics 2026-04-11 00:10:41 -04:00
Kit Langton
397adb4190 fix(sync): restore instance context for deferred publish 2026-04-11 00:10:41 -04:00
Kit Langton
dbc136d3b3 Merge branch 'dev' into kit/effect-sync-event 2026-04-10 23:54:56 -04:00
Kit Langton
e2c41dce99 Merge branch 'dev' into kit/effect-sync-event 2026-04-10 23:51:34 -04:00
Kit Langton
f8580ab9f7 Merge branch 'dev' into kit/effect-sync-event 2026-04-10 23:42:10 -04:00
Kit Langton
af7ada9a7a refactor(sync): migrate SyncEvent to service layer 2026-04-10 23:34:38 -04:00
4 changed files with 282 additions and 198 deletions

View File

@@ -366,11 +366,12 @@ export namespace Session {
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
Effect.sync(() => Database.use(fn))
export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> = Layer.effect(
export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service | SyncEvent.Service> = Layer.effect(
Service,
Effect.gen(function* () {
const bus = yield* Bus.Service
const storage = yield* Storage.Service
const sync = yield* SyncEvent.Service
const createNext = Effect.fn("Session.createNext")(function* (input: {
id?: SessionID
@@ -398,7 +399,7 @@ export namespace Session {
}
log.info("created", result)
yield* Effect.sync(() => SyncEvent.run(Event.Created, { sessionID: result.id, info: result }))
yield* sync.run(Event.Created, { sessionID: result.id, info: result })
if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
// This only exist for backwards compatibility. We should not be
@@ -446,10 +447,12 @@ export namespace Session {
Effect.catchCause(() => Effect.succeed(false)),
)
yield* Effect.sync(() => {
SyncEvent.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance })
SyncEvent.remove(sessionID)
})
if (hasInstance) {
yield* sync.run(Event.Deleted, { sessionID, info: session }, { publish: true })
} else {
yield* Effect.sync(() => SyncEvent.run(Event.Deleted, { sessionID, info: session }, { publish: false }))
}
yield* sync.remove(sessionID)
} catch (e) {
log.error(e)
}
@@ -457,19 +460,17 @@ export namespace Session {
const updateMessage = <T extends MessageV2.Info>(msg: T): Effect.Effect<T> =>
Effect.gen(function* () {
yield* Effect.sync(() => SyncEvent.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg }))
yield* sync.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg })
return msg
}).pipe(Effect.withSpan("Session.updateMessage"))
const updatePart = <T extends MessageV2.Part>(part: T): Effect.Effect<T> =>
Effect.gen(function* () {
yield* Effect.sync(() =>
SyncEvent.run(MessageV2.Event.PartUpdated, {
sessionID: part.sessionID,
part: structuredClone(part),
time: Date.now(),
}),
)
yield* sync.run(MessageV2.Event.PartUpdated, {
sessionID: part.sessionID,
part: structuredClone(part),
time: Date.now(),
})
return part
}).pipe(Effect.withSpan("Session.updatePart"))
@@ -549,8 +550,7 @@ export namespace Session {
return session
})
const patch = (sessionID: SessionID, info: Patch) =>
Effect.sync(() => SyncEvent.run(Event.Updated, { sessionID, info }))
const patch = (sessionID: SessionID, info: Patch) => sync.run(Event.Updated, { sessionID, info })
const touch = Effect.fn("Session.touch")(function* (sessionID: SessionID) {
yield* patch(sessionID, { time: { updated: Date.now() } })
@@ -607,12 +607,10 @@ export namespace Session {
sessionID: SessionID
messageID: MessageID
}) {
yield* Effect.sync(() =>
SyncEvent.run(MessageV2.Event.Removed, {
sessionID: input.sessionID,
messageID: input.messageID,
}),
)
yield* sync.run(MessageV2.Event.Removed, {
sessionID: input.sessionID,
messageID: input.messageID,
})
return input.messageID
})
@@ -621,13 +619,11 @@ export namespace Session {
messageID: MessageID
partID: PartID
}) {
yield* Effect.sync(() =>
SyncEvent.run(MessageV2.Event.PartRemoved, {
sessionID: input.sessionID,
messageID: input.messageID,
partID: input.partID,
}),
)
yield* sync.run(MessageV2.Event.PartRemoved, {
sessionID: input.sessionID,
messageID: input.messageID,
partID: input.partID,
})
return input.partID
})
@@ -678,7 +674,11 @@ export namespace Session {
}),
)
export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Storage.defaultLayer))
export const defaultLayer = layer.pipe(
Layer.provide(Bus.layer),
Layer.provide(Storage.defaultLayer),
Layer.provide(SyncEvent.defaultLayer),
)
const { runPromise } = makeRuntime(Service, defaultLayer)

View File

@@ -40,6 +40,7 @@ export namespace SessionRevert {
const bus = yield* Bus.Service
const summary = yield* SessionSummary.Service
const state = yield* SessionRunState.Service
const sync = yield* SyncEvent.Service
const revert = Effect.fn("SessionRevert.revert")(function* (input: RevertInput) {
yield* state.assertNotBusy(input.sessionID)
@@ -123,7 +124,7 @@ export namespace SessionRevert {
remove.push(msg)
}
for (const msg of remove) {
SyncEvent.run(MessageV2.Event.Removed, {
yield* sync.run(MessageV2.Event.Removed, {
sessionID,
messageID: msg.info.id,
})
@@ -135,7 +136,7 @@ export namespace SessionRevert {
const removeParts = target.parts.slice(idx)
target.parts = target.parts.slice(0, idx)
for (const part of removeParts) {
SyncEvent.run(MessageV2.Event.PartRemoved, {
yield* sync.run(MessageV2.Event.PartRemoved, {
sessionID,
messageID: target.info.id,
partID: part.id,
@@ -158,6 +159,7 @@ export namespace SessionRevert {
Layer.provide(Storage.defaultLayer),
Layer.provide(Bus.layer),
Layer.provide(SessionSummary.defaultLayer),
Layer.provide(SyncEvent.defaultLayer),
),
)

View File

@@ -24,20 +24,19 @@ export namespace SessionShare {
const session = yield* Session.Service
const shareNext = yield* ShareNext.Service
const scope = yield* Scope.Scope
const sync = yield* SyncEvent.Service
const share = Effect.fn("SessionShare.share")(function* (sessionID: SessionID) {
const conf = yield* cfg.get()
if (conf.share === "disabled") throw new Error("Sharing is disabled in configuration")
const result = yield* shareNext.create(sessionID)
yield* Effect.sync(() =>
SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }),
)
yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } })
return result
})
const unshare = Effect.fn("SessionShare.unshare")(function* (sessionID: SessionID) {
yield* shareNext.remove(sessionID)
yield* Effect.sync(() => SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } }))
yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } })
})
const create = Effect.fn("SessionShare.create")(function* (input?: Parameters<typeof Session.create>[0]) {
@@ -57,6 +56,7 @@ export namespace SessionShare {
Layer.provide(ShareNext.defaultLayer),
Layer.provide(Session.defaultLayer),
Layer.provide(Config.defaultLayer),
Layer.provide(SyncEvent.defaultLayer),
)
const { runPromise } = makeRuntime(Service, defaultLayer)

View File

@@ -1,14 +1,20 @@
import z from "zod"
import type { ZodObject } from "zod"
import { EventEmitter } from "events"
import { Context, Effect, Layer } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRuntime } from "@/effect/run-service"
import { Database, eq } from "@/storage/db"
import { Bus as ProjectBus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { EventSequenceTable, EventTable } from "./event.sql"
import { EventID } from "./schema"
import { Flag } from "@/flag/flag"
import { Instance } from "@/project/instance"
export namespace SyncEvent {
type Convert = (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
export type Definition = {
type: string
version: number
@@ -30,38 +36,249 @@ export namespace SyncEvent {
export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
type Init = { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: Convert }
type Payload = { def: Definition; event: Event }
export const registry = new Map<string, Definition>()
let projectors: Map<Definition, ProjectorFunc> | undefined
const versions = new Map<string, number>()
let frozen = false
let convertEvent: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
let projectors: Map<Definition, ProjectorFunc> | undefined
let convert: Convert = (_, data) => data as Record<string, unknown>
const bus = new EventEmitter<{ event: [Payload] }>()
const Bus = new EventEmitter<{ event: [{ def: Definition; event: Event }] }>()
export function reset() {
frozen = false
projectors = undefined
convertEvent = (_, data) => data
export interface Interface {
readonly reset: () => Effect.Effect<void>
readonly init: (input: Init) => Effect.Effect<void>
readonly replay: (event: SerializedEvent, options?: { publish: boolean }) => Effect.Effect<void>
readonly run: <Def extends Definition>(
def: Def,
data: Event<Def>["data"],
options?: { publish?: boolean },
) => Effect.Effect<void>
readonly remove: (aggregateID: string) => Effect.Effect<void>
readonly subscribeAll: (handler: (event: Payload) => void) => Effect.Effect<() => void>
readonly payloads: () => z.ZodTypeAny
}
export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: typeof convertEvent }) {
projectors = new Map(input.projectors)
export class Service extends Context.Service<Service, Interface>()("@opencode/SyncEvent") {}
// Install all the latest event defs to the bus. We only ever emit
// latest versions from code, and keep around old versions for
// replaying. Replaying does not go through the bus, and it
// simplifies the bus to only use unversioned latest events
for (let [type, version] of versions.entries()) {
let def = registry.get(versionedType(type, version))!
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const reset = Effect.fn("SyncEvent.reset")(() =>
Effect.sync(() => {
frozen = false
projectors = undefined
convert = (_, data) => data as Record<string, unknown>
}),
)
BusEvent.define(def.type, def.properties || def.schema)
}
const init = Effect.fn("SyncEvent.init")((input: Init) =>
Effect.sync(() => {
projectors = new Map(input.projectors)
// Freeze the system so it clearly errors if events are defined
// after `init` which would cause bugs
frozen = true
convertEvent = input.convertEvent || ((_, data) => data)
// Install all the latest event defs to the bus. We only ever emit
// latest versions from code, and keep around old versions for
// replaying. Replaying does not go through the bus, and it
// simplifies the bus to only use unversioned latest events
for (const [type, version] of versions.entries()) {
const def = registry.get(versionedType(type, version))!
BusEvent.define(def.type, def.properties || def.schema)
}
// Freeze the system so it clearly errors if events are defined
// after `init` which would cause bugs
frozen = true
convert = input.convertEvent || ((_, data) => data as Record<string, unknown>)
}),
)
function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
if (projectors == null) {
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
}
const projector = projectors.get(def)
if (!projector) {
throw new Error(`Projector not found for event: ${def.type}`)
}
// idempotent: need to ignore any events already logged
Database.transaction((tx) => {
projector(tx, event.data)
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
tx.insert(EventSequenceTable)
.values({
aggregate_id: event.aggregateID,
seq: event.seq,
})
.onConflictDoUpdate({
target: EventSequenceTable.aggregate_id,
set: { seq: event.seq },
})
.run()
tx.insert(EventTable)
.values({
id: event.id,
seq: event.seq,
aggregate_id: event.aggregateID,
type: versionedType(def.type, def.version),
data: event.data as Record<string, unknown>,
})
.run()
}
Database.effect(
InstanceState.bind(() => {
bus.emit("event", { def, event })
if (!options.publish) return
const result = convert(def.type, event.data)
if (result instanceof Promise) {
void result.then(
InstanceState.bind((data) => {
void ProjectBus.publish({ type: def.type, properties: def.schema }, data)
}),
)
return
}
void ProjectBus.publish({ type: def.type, properties: def.schema }, result)
}),
)
})
}
function replay(event: SerializedEvent, options?: { publish: boolean }) {
const def = registry.get(event.type)
if (!def) {
throw new Error(`Unknown event type: ${event.type}`)
}
const row = Database.use((db) =>
db
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
.get(),
)
const latest = row?.seq ?? -1
if (event.seq <= latest) return
const expected = latest + 1
if (event.seq !== expected) {
throw new Error(
`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`,
)
}
process(def, event, { publish: !!options?.publish })
}
function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
const agg = (data as Record<string, string>)[def.aggregate]
if (agg == null) {
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
}
if (def.version !== versions.get(def.type)) {
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
}
const publish = options?.publish ?? true
// Note that this is an "immediate" transaction which is critical.
// We need to make sure we can safely read and write with nothing
// else changing the data from under us
Database.transaction(
(tx) => {
const id = EventID.ascending()
const row = tx
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, agg))
.get()
const seq = row?.seq != null ? row.seq + 1 : 0
const event = { id, seq, aggregateID: agg, data }
process(def, event, { publish })
},
{
behavior: "immediate",
},
)
}
function remove(aggregateID: string) {
Database.transaction((tx) => {
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
})
}
function subscribeAll(handler: (event: Payload) => void) {
bus.on("event", handler)
return () => bus.off("event", handler)
}
function payloads() {
return z
.union(
registry
.entries()
.map(([type, def]) => {
return z
.object({
type: z.literal(type),
aggregate: z.literal(def.aggregate),
data: def.schema,
})
.meta({
ref: "SyncEvent" + "." + def.type,
})
})
.toArray() as any,
)
.meta({
ref: "SyncEvent",
})
}
return Service.of({
reset,
init,
replay: (event, options) =>
Effect.gen(function* () {
const ctx = yield* InstanceState.context
return yield* Effect.sync(() => Instance.restore(ctx, () => replay(event, options)))
}),
run: (def, data, options) =>
options?.publish === false
? Effect.sync(() => run(def, data, options))
: Effect.gen(function* () {
const ctx = yield* InstanceState.context
return yield* Effect.sync(() => Instance.restore(ctx, () => run(def, data, options)))
}),
remove: (aggregateID) => Effect.sync(() => remove(aggregateID)),
subscribeAll: (handler) => Effect.sync(() => subscribeAll(handler)),
payloads,
})
}),
)
export const defaultLayer = layer
const { runSync } = makeRuntime(Service, defaultLayer)
export function reset() {
return runSync((svc) => svc.reset())
}
export function init(input: Init) {
return runSync((svc) => svc.init(input))
}
export function versionedType<A extends string>(type: A): A
@@ -102,63 +319,6 @@ export namespace SyncEvent {
return [def, func as ProjectorFunc]
}
function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
if (projectors == null) {
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
}
const projector = projectors.get(def)
if (!projector) {
throw new Error(`Projector not found for event: ${def.type}`)
}
// idempotent: need to ignore any events already logged
Database.transaction((tx) => {
projector(tx, event.data)
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
tx.insert(EventSequenceTable)
.values({
aggregate_id: event.aggregateID,
seq: event.seq,
})
.onConflictDoUpdate({
target: EventSequenceTable.aggregate_id,
set: { seq: event.seq },
})
.run()
tx.insert(EventTable)
.values({
id: event.id,
seq: event.seq,
aggregate_id: event.aggregateID,
type: versionedType(def.type, def.version),
data: event.data as Record<string, unknown>,
})
.run()
}
Database.effect(() => {
Bus.emit("event", {
def,
event,
})
if (options?.publish) {
const result = convertEvent(def.type, event.data)
if (result instanceof Promise) {
result.then((data) => {
ProjectBus.publish({ type: def.type, properties: def.schema }, data)
})
} else {
ProjectBus.publish({ type: def.type, properties: def.schema }, result)
}
}
})
})
}
// TODO:
//
// * Support applying multiple events at one time. One transaction,
@@ -166,100 +326,22 @@ export namespace SyncEvent {
// * when loading events from db, apply zod validation to ensure shape
export function replay(event: SerializedEvent, options?: { publish: boolean }) {
const def = registry.get(event.type)
if (!def) {
throw new Error(`Unknown event type: ${event.type}`)
}
const row = Database.use((db) =>
db
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
.get(),
)
const latest = row?.seq ?? -1
if (event.seq <= latest) {
return
}
const expected = latest + 1
if (event.seq !== expected) {
throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`)
}
process(def, event, { publish: !!options?.publish })
return runSync((svc) => svc.replay(event, options))
}
export function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
const agg = (data as Record<string, string>)[def.aggregate]
// This should never happen: we've enforced it via typescript in
// the definition
if (agg == null) {
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
}
if (def.version !== versions.get(def.type)) {
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
}
const { publish = true } = options || {}
// Note that this is an "immediate" transaction which is critical.
// We need to make sure we can safely read and write with nothing
// else changing the data from under us
Database.transaction(
(tx) => {
const id = EventID.ascending()
const row = tx
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, agg))
.get()
const seq = row?.seq != null ? row.seq + 1 : 0
const event = { id, seq, aggregateID: agg, data }
process(def, event, { publish })
},
{
behavior: "immediate",
},
)
return runSync((svc) => svc.run(def, data, options))
}
export function remove(aggregateID: string) {
Database.transaction((tx) => {
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
})
return runSync((svc) => svc.remove(aggregateID))
}
export function subscribeAll(handler: (event: { def: Definition; event: Event }) => void) {
Bus.on("event", handler)
return () => Bus.off("event", handler)
return runSync((svc) => svc.subscribeAll(handler))
}
export function payloads() {
return z
.union(
registry
.entries()
.map(([type, def]) => {
return z
.object({
type: z.literal(type),
aggregate: z.literal(def.aggregate),
data: def.schema,
})
.meta({
ref: "SyncEvent" + "." + def.type,
})
})
.toArray() as any,
)
.meta({
ref: "SyncEvent",
})
return runSync((svc) => Effect.sync(() => svc.payloads()))
}
}