From effd96755e70ddb9e0955bc444860d5cc8829248 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Sun, 10 May 2026 21:20:13 -0400 Subject: [PATCH] Use SyncEvent service at event call sites (#26782) --- .../opencode/src/control-plane/workspace.ts | 14 +- packages/opencode/src/session/compaction.ts | 32 ++- packages/opencode/src/session/processor.ts | 272 ++++++++++-------- packages/opencode/src/session/prompt.ts | 96 ++++--- packages/opencode/src/v2/session.ts | 32 +-- .../opencode/test/session/compaction.test.ts | 3 + .../test/session/processor-effect.test.ts | 2 + packages/opencode/test/session/prompt.test.ts | 2 + .../test/session/snapshot-tool-race.test.ts | 2 + 9 files changed, 250 insertions(+), 205 deletions(-) diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts index e825eaddcf..00353dcca3 100644 --- a/packages/opencode/src/control-plane/workspace.ts +++ b/packages/opencode/src/control-plane/workspace.ts @@ -676,14 +676,12 @@ export const layer = Layer.effect( } if (input.workspaceID === null) { - yield* Effect.sync(() => - SyncEvent.run(Session.Event.Updated, { - sessionID: input.sessionID, - info: { - workspaceID: null, - }, - }), - ) + yield* sync.run(Session.Event.Updated, { + sessionID: input.sessionID, + info: { + workspaceID: null, + }, + }) log.info("session warp complete", { workspaceID: input.workspaceID, diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index 3b16a9a5d7..4eafbdf749 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -18,8 +18,9 @@ import { InstanceState } from "@/effect/instance-state" import { isOverflow as overflow, usable } from "./overflow" import { makeRuntime } from "@/effect/run-service" import { serviceUse } from "@/effect/service-use" -import { EventV2 } from "@/v2/event" +import { SyncEvent } from "@/sync" import { SessionEvent } from "@/v2/session-event" +import { Flag } from "@opencode-ai/core/flag/flag" const log = Log.create({ service: "session.compaction" }) @@ -219,6 +220,7 @@ export const layer: Layer.Layer< | Plugin.Service | SessionProcessor.Service | Provider.Service + | SyncEvent.Service > = Layer.effect( Service, Effect.gen(function* () { @@ -229,6 +231,7 @@ export const layer: Layer.Layer< const plugin = yield* Plugin.Service const processors = yield* SessionProcessor.Service const provider = yield* Provider.Service + const sync = yield* SyncEvent.Service const isOverflow = Effect.fn("SessionCompaction.isOverflow")(function* (input: { tokens: MessageV2.Assistant["tokens"] @@ -567,12 +570,14 @@ export const layer: Layer.Layer< parts: [], }, ) - EventV2.run(SessionEvent.Compaction.Ended.Sync, { - sessionID: input.sessionID, - timestamp: DateTime.makeUnsafe(Date.now()), - text: summary ?? "", - include: selected.tail_start_id, - }) + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Compaction.Ended.Sync, { + sessionID: input.sessionID, + timestamp: DateTime.makeUnsafe(Date.now()), + text: summary ?? "", + include: selected.tail_start_id, + }) + } yield* bus.publish(Event.Compacted, { sessionID: input.sessionID }) } return result @@ -601,11 +606,13 @@ export const layer: Layer.Layer< auto: input.auto, overflow: input.overflow, }) - EventV2.run(SessionEvent.Compaction.Started.Sync, { - sessionID: input.sessionID, - timestamp: DateTime.makeUnsafe(Date.now()), - reason: input.auto ? "auto" : "manual", - }) + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Compaction.Started.Sync, { + sessionID: input.sessionID, + timestamp: DateTime.makeUnsafe(Date.now()), + reason: input.auto ? "auto" : "manual", + }) + } }) return Service.of({ @@ -626,6 +633,7 @@ export const defaultLayer = Layer.suspend(() => Layer.provide(Plugin.defaultLayer), Layer.provide(Bus.layer), Layer.provide(Config.defaultLayer), + Layer.provide(SyncEvent.defaultLayer), ), ) diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 7df54d5451..579c4cc42c 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -21,10 +21,11 @@ import { Question } from "@/question" import { errorMessage } from "@/util/error" import * as Log from "@opencode-ai/core/util/log" import { isRecord } from "@/util/record" -import { EventV2 } from "@/v2/event" +import { SyncEvent } from "@/sync" import { SessionEvent } from "@/v2/session-event" import { Modelv2 } from "@/v2/model" import * as DateTime from "effect/DateTime" +import { Flag } from "@opencode-ai/core/flag/flag" const DOOM_LOOP_THRESHOLD = 3 const log = Log.create({ service: "session.processor" }) @@ -96,6 +97,7 @@ export const layer: Layer.Layer< | Image.Service | SessionSummary.Service | SessionStatus.Service + | SyncEvent.Service > = Layer.effect( Service, Effect.gen(function* () { @@ -111,6 +113,7 @@ export const layer: Layer.Layer< const scope = yield* Scope.Scope const status = yield* SessionStatus.Service const image = yield* Image.Service + const sync = yield* SyncEvent.Service const create = Effect.fn("SessionProcessor.create")(function* (input: Input) { // Pre-capture snapshot before the LLM stream starts. The AI SDK @@ -229,11 +232,13 @@ export const layer: Layer.Layer< case "reasoning-start": if (value.id in ctx.reasoningMap) return // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - EventV2.run(SessionEvent.Reasoning.Started.Sync, { - sessionID: ctx.sessionID, - reasoningID: value.id, - timestamp: DateTime.makeUnsafe(Date.now()), - }) + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Reasoning.Started.Sync, { + sessionID: ctx.sessionID, + reasoningID: value.id, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } ctx.reasoningMap[value.id] = { id: PartID.ascending(), messageID: ctx.assistantMessage.id, @@ -262,12 +267,14 @@ export const layer: Layer.Layer< case "reasoning-end": if (!(value.id in ctx.reasoningMap)) return // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - EventV2.run(SessionEvent.Reasoning.Ended.Sync, { - sessionID: ctx.sessionID, - reasoningID: value.id, - text: ctx.reasoningMap[value.id].text, - timestamp: DateTime.makeUnsafe(Date.now()), - }) + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Reasoning.Ended.Sync, { + sessionID: ctx.sessionID, + reasoningID: value.id, + text: ctx.reasoningMap[value.id].text, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } // oxlint-disable-next-line no-self-assign -- reactivity trigger ctx.reasoningMap[value.id].text = ctx.reasoningMap[value.id].text ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() } @@ -281,12 +288,14 @@ export const layer: Layer.Layer< throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`) } // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - EventV2.run(SessionEvent.Tool.Input.Started.Sync, { - sessionID: ctx.sessionID, - callID: value.id, - name: value.toolName, - timestamp: DateTime.makeUnsafe(Date.now()), - }) + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Tool.Input.Started.Sync, { + sessionID: ctx.sessionID, + callID: value.id, + name: value.toolName, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } const part = yield* session.updatePart({ id: ctx.toolcalls[value.id]?.partID ?? PartID.ascending(), messageID: ctx.assistantMessage.id, @@ -310,12 +319,14 @@ export const layer: Layer.Layer< case "tool-input-end": { // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - EventV2.run(SessionEvent.Tool.Input.Ended.Sync, { - sessionID: ctx.sessionID, - callID: value.id, - text: "", - timestamp: DateTime.makeUnsafe(Date.now()), - }) + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Tool.Input.Ended.Sync, { + sessionID: ctx.sessionID, + callID: value.id, + text: "", + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } return } @@ -325,17 +336,19 @@ export const layer: Layer.Layer< } const toolCall = yield* readToolCall(value.toolCallId) // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - EventV2.run(SessionEvent.Tool.Called.Sync, { - sessionID: ctx.sessionID, - callID: value.toolCallId, - tool: value.toolName, - input: value.input, - provider: { - executed: toolCall?.part.metadata?.providerExecuted === true, - ...(value.providerMetadata ? { metadata: value.providerMetadata } : {}), - }, - timestamp: DateTime.makeUnsafe(Date.now()), - }) + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Tool.Called.Sync, { + sessionID: ctx.sessionID, + callID: value.toolCallId, + tool: value.toolName, + input: value.input, + provider: { + executed: toolCall?.part.metadata?.providerExecuted === true, + ...(value.providerMetadata ? { metadata: value.providerMetadata } : {}), + }, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } yield* updateToolCall(value.toolCallId, (match) => ({ ...match, tool: value.toolName, @@ -405,27 +418,29 @@ export const layer: Layer.Layer< attachments: attachments?.length ? attachments : undefined, } // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - EventV2.run(SessionEvent.Tool.Success.Sync, { - sessionID: ctx.sessionID, - callID: value.toolCallId, - structured: output.metadata, - content: [ - { - type: "text", - text: output.output, + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Tool.Success.Sync, { + sessionID: ctx.sessionID, + callID: value.toolCallId, + structured: output.metadata, + content: [ + { + type: "text", + text: output.output, + }, + ...(output.attachments?.map((item: MessageV2.FilePart) => ({ + type: "file", + uri: item.url, + mime: item.mime, + name: item.filename, + })) ?? []), + ], + provider: { + executed: toolCall?.part.metadata?.providerExecuted === true, }, - ...(output.attachments?.map((item: MessageV2.FilePart) => ({ - type: "file", - uri: item.url, - mime: item.mime, - name: item.filename, - })) ?? []), - ], - provider: { - executed: toolCall?.part.metadata?.providerExecuted === true, - }, - timestamp: DateTime.makeUnsafe(Date.now()), - }) + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } yield* completeToolCall(value.toolCallId, output) return } @@ -433,18 +448,20 @@ export const layer: Layer.Layer< case "tool-error": { const toolCall = yield* readToolCall(value.toolCallId) // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - EventV2.run(SessionEvent.Tool.Failed.Sync, { - sessionID: ctx.sessionID, - callID: value.toolCallId, - error: { - type: "unknown", - message: errorMessage(value.error), - }, - provider: { - executed: toolCall?.part.metadata?.providerExecuted === true, - }, - timestamp: DateTime.makeUnsafe(Date.now()), - }) + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Tool.Failed.Sync, { + sessionID: ctx.sessionID, + callID: value.toolCallId, + error: { + type: "unknown", + message: errorMessage(value.error), + }, + provider: { + executed: toolCall?.part.metadata?.providerExecuted === true, + }, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } yield* failToolCall(value.toolCallId, value.error) return } @@ -456,17 +473,19 @@ export const layer: Layer.Layer< if (!ctx.snapshot) ctx.snapshot = yield* snapshot.track() if (!ctx.assistantMessage.summary) { // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - EventV2.run(SessionEvent.Step.Started.Sync, { - sessionID: ctx.sessionID, - agent: input.assistantMessage.agent, - model: { - id: Modelv2.ID.make(ctx.model.id), - providerID: Modelv2.ProviderID.make(ctx.model.providerID), - variant: Modelv2.VariantID.make(input.assistantMessage.variant ?? "default"), - }, - snapshot: ctx.snapshot, - timestamp: DateTime.makeUnsafe(Date.now()), - }) + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Step.Started.Sync, { + sessionID: ctx.sessionID, + agent: input.assistantMessage.agent, + model: { + id: Modelv2.ID.make(ctx.model.id), + providerID: Modelv2.ProviderID.make(ctx.model.providerID), + variant: Modelv2.VariantID.make(input.assistantMessage.variant ?? "default"), + }, + snapshot: ctx.snapshot, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } } yield* session.updatePart({ id: PartID.ascending(), @@ -486,14 +505,16 @@ export const layer: Layer.Layer< }) if (!ctx.assistantMessage.summary) { // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - EventV2.run(SessionEvent.Step.Ended.Sync, { - sessionID: ctx.sessionID, - finish: value.finishReason, - cost: usage.cost, - tokens: usage.tokens, - snapshot: completedSnapshot, - timestamp: DateTime.makeUnsafe(Date.now()), - }) + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Step.Ended.Sync, { + sessionID: ctx.sessionID, + finish: value.finishReason, + cost: usage.cost, + tokens: usage.tokens, + snapshot: completedSnapshot, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } } ctx.assistantMessage.finish = value.finishReason ctx.assistantMessage.cost += usage.cost @@ -541,10 +562,12 @@ export const layer: Layer.Layer< case "text-start": if (!ctx.assistantMessage.summary) { // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - EventV2.run(SessionEvent.Text.Started.Sync, { - sessionID: ctx.sessionID, - timestamp: DateTime.makeUnsafe(Date.now()), - }) + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Text.Started.Sync, { + sessionID: ctx.sessionID, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } } ctx.currentText = { id: PartID.ascending(), @@ -586,11 +609,13 @@ export const layer: Layer.Layer< )).text if (!ctx.assistantMessage.summary) { // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - EventV2.run(SessionEvent.Text.Ended.Sync, { - sessionID: ctx.sessionID, - text: ctx.currentText.text, - timestamp: DateTime.makeUnsafe(Date.now()), - }) + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Text.Ended.Sync, { + sessionID: ctx.sessionID, + text: ctx.currentText.text, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } } { const end = Date.now() @@ -680,14 +705,16 @@ export const layer: Layer.Layer< } if (!ctx.assistantMessage.summary) { // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - EventV2.run(SessionEvent.Step.Failed.Sync, { - sessionID: ctx.sessionID, - error: { - type: "unknown", - message: errorMessage(e), - }, - timestamp: DateTime.makeUnsafe(Date.now()), - }) + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Step.Failed.Sync, { + sessionID: ctx.sessionID, + error: { + type: "unknown", + message: errorMessage(e), + }, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } } ctx.assistantMessage.error = error yield* bus.publish(Session.Event.Error, { @@ -732,22 +759,28 @@ export const layer: Layer.Layer< parse, set: (info) => { // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - EventV2.run(SessionEvent.Retried.Sync, { - sessionID: ctx.sessionID, - attempt: info.attempt, - error: { - message: info.message, - isRetryable: true, - }, - timestamp: DateTime.makeUnsafe(Date.now()), - }) - return status.set(ctx.sessionID, { - type: "retry", - attempt: info.attempt, - message: info.message, - action: info.action, - next: info.next, - }) + const event = Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM + ? sync.run(SessionEvent.Retried.Sync, { + sessionID: ctx.sessionID, + attempt: info.attempt, + error: { + message: info.message, + isRetryable: true, + }, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + : Effect.void + return event.pipe( + Effect.andThen( + status.set(ctx.sessionID, { + type: "retry", + attempt: info.attempt, + message: info.message, + action: info.action, + next: info.next, + }), + ), + ) }, }), ), @@ -788,6 +821,7 @@ export const defaultLayer = Layer.suspend(() => Layer.provide(Image.defaultLayer), Layer.provide(Bus.layer), Layer.provide(Config.defaultLayer), + Layer.provide(SyncEvent.defaultLayer), ), ) diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 521bacbc2f..934427f569 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -54,7 +54,7 @@ import { InstanceState } from "@/effect/instance-state" import { TaskTool, type TaskPromptOps } from "@/tool/task" import { SessionRunState } from "./run-state" import { EffectBridge } from "@/effect/bridge" -import { EventV2 } from "@/v2/event" +import { SyncEvent } from "@/sync" import { SessionEvent } from "@/v2/session-event" import { Modelv2 } from "@/v2/model" import { AgentAttachment, FileAttachment, Source } from "@/v2/session-prompt" @@ -118,6 +118,7 @@ export const layer = Layer.effect( const summary = yield* SessionSummary.Service const sys = yield* SystemPrompt.Service const llm = yield* LLM.Service + const sync = yield* SyncEvent.Service const runner = Effect.fn("SessionPrompt.runner")(function* () { return yield* EffectBridge.make() }) @@ -809,12 +810,14 @@ NOTE: At any point in time through this workflow you should feel free to ask the }, } yield* sessions.updatePart(part) - EventV2.run(SessionEvent.Shell.Started.Sync, { - sessionID: input.sessionID, - timestamp: DateTime.makeUnsafe(started), - callID, - command: input.command, - }) + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Shell.Started.Sync, { + sessionID: input.sessionID, + timestamp: DateTime.makeUnsafe(started), + callID, + command: input.command, + }) + } return { msg, part, cwd: ctx.directory } }).pipe(Effect.ensuring(markReady)) @@ -830,12 +833,14 @@ NOTE: At any point in time through this workflow you should feel free to ask the output += "\n\n" + ["", "User aborted the command", ""].join("\n") } const completed = Date.now() - EventV2.run(SessionEvent.Shell.Ended.Sync, { - sessionID: input.sessionID, - timestamp: DateTime.makeUnsafe(completed), - callID: part.callID, - output, - }) + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Shell.Ended.Sync, { + sessionID: input.sessionID, + timestamp: DateTime.makeUnsafe(completed), + callID: part.callID, + output, + }) + } if (!msg.time.completed) { msg.time.completed = completed yield* sessions.updateMessage(msg) @@ -975,34 +980,26 @@ NOTE: At any point in time through this workflow you should feel free to ask the } if (current?.agent !== info.agent) { - EventV2.run( - SessionEvent.AgentSwitched.Sync, - { - sessionID: input.sessionID, - timestamp: DateTime.makeUnsafe(info.time.created), - agent: info.agent, - }, - { bypassExperimentalEventSystem: true }, - ) + yield* sync.run(SessionEvent.AgentSwitched.Sync, { + sessionID: input.sessionID, + timestamp: DateTime.makeUnsafe(info.time.created), + agent: info.agent, + }) } if ( current?.model?.providerID !== info.model.providerID || current.model.id !== info.model.modelID || (current.model.variant === "default" ? undefined : current.model.variant) !== info.model.variant ) { - EventV2.run( - SessionEvent.ModelSwitched.Sync, - { - sessionID: input.sessionID, - timestamp: DateTime.makeUnsafe(info.time.created), - model: { - id: Modelv2.ID.make(info.model.modelID), - providerID: Modelv2.ProviderID.make(info.model.providerID), - variant: Modelv2.VariantID.make(info.model.variant ?? "default"), - }, + yield* sync.run(SessionEvent.ModelSwitched.Sync, { + sessionID: input.sessionID, + timestamp: DateTime.makeUnsafe(info.time.created), + model: { + id: Modelv2.ID.make(info.model.modelID), + providerID: Modelv2.ProviderID.make(info.model.providerID), + variant: Modelv2.VariantID.make(info.model.variant ?? "default"), }, - { bypassExperimentalEventSystem: true }, - ) + }) } yield* Effect.addFinalizer(() => instruction.clear(info.id)) @@ -1371,23 +1368,27 @@ NOTE: At any point in time through this workflow you should feel free to ask the }, ) // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - EventV2.run(SessionEvent.Prompted.Sync, { - sessionID: input.sessionID, - timestamp: DateTime.makeUnsafe(info.time.created), - prompt: { - text: nextPrompt.text.join("\n"), - files: nextPrompt.files, - agents: nextPrompt.agents, - }, - }) - for (const text of nextPrompt.synthetic) { - // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - EventV2.run(SessionEvent.Synthetic.Sync, { + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Prompted.Sync, { sessionID: input.sessionID, timestamp: DateTime.makeUnsafe(info.time.created), - text, + prompt: { + text: nextPrompt.text.join("\n"), + files: nextPrompt.files, + agents: nextPrompt.agents, + }, }) } + for (const text of nextPrompt.synthetic) { + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Synthetic.Sync, { + sessionID: input.sessionID, + timestamp: DateTime.makeUnsafe(info.time.created), + text, + }) + } + } return { info, parts } }, Effect.scoped) @@ -1820,6 +1821,7 @@ export const defaultLayer = Layer.suspend(() => LLM.defaultLayer, Bus.layer, CrossSpawnSpawner.defaultLayer, + SyncEvent.defaultLayer, ), ), ), diff --git a/packages/opencode/src/v2/session.ts b/packages/opencode/src/v2/session.ts index 39deabfc7c..3b0b61dcbc 100644 --- a/packages/opencode/src/v2/session.ts +++ b/packages/opencode/src/v2/session.ts @@ -12,6 +12,7 @@ import { SessionEvent } from "./session-event" import { V2Schema } from "./schema" import { optionalOmitUndefined } from "@opencode-ai/core/schema" import { Modelv2 } from "./model" +import { SyncEvent } from "@/sync" export const Delivery = Schema.Literals(["immediate", "deferred"]).annotate({ identifier: "Session.Delivery", @@ -113,6 +114,7 @@ export class Service extends Context.Service()("@opencode/v2 export const layer = Layer.effect( Service, Effect.gen(function* () { + const sync = yield* SyncEvent.Service const decodeMessage = Schema.decodeUnknownSync(SessionMessage.Message) const decode = (row: typeof SessionMessageTable.$inferSelect) => @@ -269,26 +271,18 @@ export const layer = Layer.effect( shell: Effect.fn("V2Session.shell")(function* (_input) {}), skill: Effect.fn("V2Session.skill")(function* (_input) {}), switchAgent: Effect.fn("V2Session.switchAgent")(function* (input) { - EventV2.run( - SessionEvent.AgentSwitched.Sync, - { - sessionID: input.sessionID, - timestamp: DateTime.makeUnsafe(Date.now()), - agent: input.agent, - }, - { bypassExperimentalEventSystem: true }, - ) + yield* sync.run(SessionEvent.AgentSwitched.Sync, { + sessionID: input.sessionID, + timestamp: DateTime.makeUnsafe(Date.now()), + agent: input.agent, + }) }), switchModel: Effect.fn("V2Session.switchModel")(function* (input) { - EventV2.run( - SessionEvent.ModelSwitched.Sync, - { - sessionID: input.sessionID, - timestamp: DateTime.makeUnsafe(Date.now()), - model: input.model, - }, - { bypassExperimentalEventSystem: true }, - ) + yield* sync.run(SessionEvent.ModelSwitched.Sync, { + sessionID: input.sessionID, + timestamp: DateTime.makeUnsafe(Date.now()), + model: input.model, + }) }), subagent: Effect.fn("V2Session.subagent")(function* (input) { const parent = yield* result.get(input.parentID) @@ -319,6 +313,6 @@ export const layer = Layer.effect( }), ) -export const defaultLayer = layer +export const defaultLayer = layer.pipe(Layer.provide(SyncEvent.defaultLayer)) export * as SessionV2 from "./session" diff --git a/packages/opencode/test/session/compaction.test.ts b/packages/opencode/test/session/compaction.test.ts index b48e68b10f..8f987b4d10 100644 --- a/packages/opencode/test/session/compaction.test.ts +++ b/packages/opencode/test/session/compaction.test.ts @@ -27,6 +27,7 @@ import { ProviderTest } from "../fake/provider" import { testEffect } from "../lib/effect" import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" import { TestConfig } from "../fixture/config" +import { SyncEvent } from "@/sync" void Log.init({ print: false }) @@ -223,6 +224,7 @@ const deps = Layer.mergeAll( Plugin.defaultLayer, Bus.layer, Config.defaultLayer, + SyncEvent.defaultLayer, ) const env = Layer.mergeAll( @@ -269,6 +271,7 @@ function compactionProcessLayer(options?: CompactionProcessOptions) { Layer.provide(status), Layer.provide(bus), Layer.provide(options?.config ?? Config.defaultLayer), + Layer.provide(SyncEvent.defaultLayer), ) } diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts index a0736b459b..56ff102430 100644 --- a/packages/opencode/test/session/processor-effect.test.ts +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -24,6 +24,7 @@ import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" import { provideTmpdirServer } from "../fixture/fixture" import { testEffect } from "../lib/effect" import { raw, reply, TestLLMServer } from "../lib/llm-server" +import { SyncEvent } from "@/sync" void Log.init({ print: false }) @@ -166,6 +167,7 @@ const deps = Layer.mergeAll( LLM.defaultLayer, Provider.defaultLayer, status, + SyncEvent.defaultLayer, ).pipe(Layer.provideMerge(infra)) const env = Layer.mergeAll( TestLLMServer.layer, diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts index 98a69fce96..42c9a81cd2 100644 --- a/packages/opencode/test/session/prompt.test.ts +++ b/packages/opencode/test/session/prompt.test.ts @@ -50,6 +50,7 @@ import { Reference } from "../../src/reference/reference" import { provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture" import { testEffect } from "../lib/effect" import { reply, TestLLMServer } from "../lib/llm-server" +import { SyncEvent } from "@/sync" void Log.init({ print: false }) @@ -174,6 +175,7 @@ function makeHttp() { mcp, AppFileSystem.defaultLayer, status, + SyncEvent.defaultLayer, ).pipe(Layer.provideMerge(infra)) const question = Question.layer.pipe(Layer.provideMerge(deps)) const todo = Todo.layer.pipe(Layer.provideMerge(deps)) diff --git a/packages/opencode/test/session/snapshot-tool-race.test.ts b/packages/opencode/test/session/snapshot-tool-race.test.ts index 251a4acf3f..5c47df4c0d 100644 --- a/packages/opencode/test/session/snapshot-tool-race.test.ts +++ b/packages/opencode/test/session/snapshot-tool-race.test.ts @@ -58,6 +58,7 @@ import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" import { Ripgrep } from "../../src/file/ripgrep" import { Format } from "../../src/format" import { Reference } from "../../src/reference/reference" +import { SyncEvent } from "@/sync" void Log.init({ print: false }) @@ -124,6 +125,7 @@ function makeHttp() { mcp, AppFileSystem.defaultLayer, status, + SyncEvent.defaultLayer, ).pipe(Layer.provideMerge(infra)) const question = Question.layer.pipe(Layer.provideMerge(deps)) const todo = Todo.layer.pipe(Layer.provideMerge(deps))