mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-16 17:42:30 +00:00
Drop synchronous SyncEvent facades (#26789)
This commit is contained in:
@@ -641,7 +641,7 @@ export const layer = Layer.effect(
|
||||
|
||||
// "claim" this session so any future events coming from
|
||||
// the old workspace are ignored
|
||||
SyncEvent.claim(input.sessionID, input.workspaceID ?? previous.projectID)
|
||||
yield* sync.claim(input.sessionID, input.workspaceID ?? previous.projectID)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ import { EventID } from "./schema"
|
||||
import { Flag } from "@opencode-ai/core/flag/flag"
|
||||
import { Context, Effect, Layer, Schema as EffectSchema } from "effect"
|
||||
import type { DeepMutable } from "@opencode-ai/core/schema"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import { serviceUse } from "@/effect/service-use"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
|
||||
@@ -63,6 +62,7 @@ export interface Interface {
|
||||
options?: { publish: boolean; ownerID?: string },
|
||||
) => Effect.Effect<string | undefined>
|
||||
readonly remove: (aggregateID: string) => Effect.Effect<void>
|
||||
readonly claim: (aggregateID: string, ownerID: string) => Effect.Effect<void>
|
||||
}
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/SyncEvent") {}
|
||||
@@ -175,11 +175,24 @@ export const layer = Layer.effect(Service)(
|
||||
})
|
||||
})
|
||||
|
||||
const claim: Interface["claim"] = Effect.fn("SyncEvent.claim")((aggregateID, ownerID) =>
|
||||
Effect.sync(() =>
|
||||
Database.use((db) =>
|
||||
db
|
||||
.update(EventSequenceTable)
|
||||
.set({ owner_id: ownerID })
|
||||
.where(eq(EventSequenceTable.aggregate_id, aggregateID))
|
||||
.run(),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
return Service.of({
|
||||
run,
|
||||
replay,
|
||||
replayAll,
|
||||
remove,
|
||||
claim,
|
||||
})
|
||||
}),
|
||||
)
|
||||
@@ -188,8 +201,6 @@ export const defaultLayer = layer
|
||||
|
||||
export const use = serviceUse(Service)
|
||||
|
||||
const runtime = makeRuntime(Service, defaultLayer)
|
||||
|
||||
export const registry = new Map<string, Definition>()
|
||||
let projectors: Map<Definition, ProjectorFunc> | undefined
|
||||
const versions = new Map<string, number>()
|
||||
@@ -336,32 +347,6 @@ function process<Def extends Definition>(
|
||||
})
|
||||
}
|
||||
|
||||
export function replay(event: SerializedEvent, options?: { publish: boolean; ownerID?: string }) {
|
||||
return runtime.runSync((sync) => sync.replay(event, options))
|
||||
}
|
||||
|
||||
export function replayAll(events: SerializedEvent[], options?: { publish: boolean; ownerID?: string }) {
|
||||
return runtime.runSync((sync) => sync.replayAll(events, options))
|
||||
}
|
||||
|
||||
export function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
|
||||
return runtime.runSync((sync) => sync.run(def, data, options))
|
||||
}
|
||||
|
||||
export function remove(aggregateID: string) {
|
||||
return runtime.runSync((sync) => sync.remove(aggregateID))
|
||||
}
|
||||
|
||||
export function claim(aggregateID: string, ownerID: string) {
|
||||
Database.use((db) =>
|
||||
db
|
||||
.update(EventSequenceTable)
|
||||
.set({ owner_id: ownerID })
|
||||
.where(eq(EventSequenceTable.aggregate_id, aggregateID))
|
||||
.run(),
|
||||
)
|
||||
}
|
||||
|
||||
export function effectPayloads() {
|
||||
return registry
|
||||
.entries()
|
||||
|
||||
@@ -4,7 +4,7 @@ import { Effect, Layer, Schema } from "effect"
|
||||
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
|
||||
import { Bus } from "../../src/bus"
|
||||
import { SyncEvent } from "../../src/sync"
|
||||
import { Database } from "@/storage/db"
|
||||
import { Database, eq } from "@/storage/db"
|
||||
import { EventSequenceTable, EventTable } from "../../src/sync/event.sql"
|
||||
import { MessageID } from "../../src/session/schema"
|
||||
import { Flag } from "@opencode-ai/core/flag/flag"
|
||||
@@ -323,5 +323,28 @@ describe("SyncEvent", () => {
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
it.live(
|
||||
"claim updates the event sequence owner",
|
||||
provideTmpdirInstance(() =>
|
||||
Effect.gen(function* () {
|
||||
const { Created } = setup()
|
||||
const id = MessageID.ascending()
|
||||
|
||||
yield* SyncEvent.use.run(Created, { id, name: "claimed" }, { publish: false })
|
||||
yield* SyncEvent.use.claim(id, "owner-1")
|
||||
yield* SyncEvent.use.claim(id, "owner-2")
|
||||
|
||||
const row = Database.use((db) =>
|
||||
db
|
||||
.select({ seq: EventSequenceTable.seq, ownerID: EventSequenceTable.owner_id })
|
||||
.from(EventSequenceTable)
|
||||
.where(eq(EventSequenceTable.aggregate_id, id))
|
||||
.get(),
|
||||
)
|
||||
expect(row).toEqual({ seq: 0, ownerID: "owner-2" })
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user