diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts index 00353dcca3..e7e65f8901 100644 --- a/packages/opencode/src/control-plane/workspace.ts +++ b/packages/opencode/src/control-plane/workspace.ts @@ -641,7 +641,7 @@ export const layer = Layer.effect( // "claim" this session so any future events coming from // the old workspace are ignored - SyncEvent.claim(input.sessionID, input.workspaceID ?? previous.projectID) + yield* sync.claim(input.sessionID, input.workspaceID ?? previous.projectID) } } diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index 931f5e84c8..5c29101b6c 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -10,7 +10,6 @@ import { EventID } from "./schema" import { Flag } from "@opencode-ai/core/flag/flag" import { Context, Effect, Layer, Schema as EffectSchema } from "effect" import type { DeepMutable } from "@opencode-ai/core/schema" -import { makeRuntime } from "@/effect/run-service" import { serviceUse } from "@/effect/service-use" import { InstanceState } from "@/effect/instance-state" @@ -63,6 +62,7 @@ export interface Interface { options?: { publish: boolean; ownerID?: string }, ) => Effect.Effect readonly remove: (aggregateID: string) => Effect.Effect + readonly claim: (aggregateID: string, ownerID: string) => Effect.Effect } export class Service extends Context.Service()("@opencode/SyncEvent") {} @@ -175,11 +175,24 @@ export const layer = Layer.effect(Service)( }) }) + const claim: Interface["claim"] = Effect.fn("SyncEvent.claim")((aggregateID, ownerID) => + Effect.sync(() => + Database.use((db) => + db + .update(EventSequenceTable) + .set({ owner_id: ownerID }) + .where(eq(EventSequenceTable.aggregate_id, aggregateID)) + .run(), + ), + ), + ) + return Service.of({ run, replay, replayAll, remove, + claim, }) }), ) @@ -188,8 +201,6 @@ export const defaultLayer = layer export const use = serviceUse(Service) -const runtime = makeRuntime(Service, defaultLayer) - export const registry = new Map() let projectors: Map | undefined const versions = new Map() @@ -336,32 +347,6 @@ function process( }) } -export function replay(event: SerializedEvent, options?: { publish: boolean; ownerID?: string }) { - return runtime.runSync((sync) => sync.replay(event, options)) -} - -export function replayAll(events: SerializedEvent[], options?: { publish: boolean; ownerID?: string }) { - return runtime.runSync((sync) => sync.replayAll(events, options)) -} - -export function run(def: Def, data: Event["data"], options?: { publish?: boolean }) { - return runtime.runSync((sync) => sync.run(def, data, options)) -} - -export function remove(aggregateID: string) { - return runtime.runSync((sync) => sync.remove(aggregateID)) -} - -export function claim(aggregateID: string, ownerID: string) { - Database.use((db) => - db - .update(EventSequenceTable) - .set({ owner_id: ownerID }) - .where(eq(EventSequenceTable.aggregate_id, aggregateID)) - .run(), - ) -} - export function effectPayloads() { return registry .entries() diff --git a/packages/opencode/test/sync/index.test.ts b/packages/opencode/test/sync/index.test.ts index 0986b39044..10f593a571 100644 --- a/packages/opencode/test/sync/index.test.ts +++ b/packages/opencode/test/sync/index.test.ts @@ -4,7 +4,7 @@ import { Effect, Layer, Schema } from "effect" import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" import { Bus } from "../../src/bus" import { SyncEvent } from "../../src/sync" -import { Database } from "@/storage/db" +import { Database, eq } from "@/storage/db" import { EventSequenceTable, EventTable } from "../../src/sync/event.sql" import { MessageID } from "../../src/session/schema" import { Flag } from "@opencode-ai/core/flag/flag" @@ -323,5 +323,28 @@ describe("SyncEvent", () => { }), ), ) + + it.live( + "claim updates the event sequence owner", + provideTmpdirInstance(() => + Effect.gen(function* () { + const { Created } = setup() + const id = MessageID.ascending() + + yield* SyncEvent.use.run(Created, { id, name: "claimed" }, { publish: false }) + yield* SyncEvent.use.claim(id, "owner-1") + yield* SyncEvent.use.claim(id, "owner-2") + + const row = Database.use((db) => + db + .select({ seq: EventSequenceTable.seq, ownerID: EventSequenceTable.owner_id }) + .from(EventSequenceTable) + .where(eq(EventSequenceTable.aggregate_id, id)) + .get(), + ) + expect(row).toEqual({ seq: 0, ownerID: "owner-2" }) + }), + ), + ) }) })