diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts
index e825eaddcf..00353dcca3 100644
--- a/packages/opencode/src/control-plane/workspace.ts
+++ b/packages/opencode/src/control-plane/workspace.ts
@@ -676,14 +676,12 @@ export const layer = Layer.effect(
}
if (input.workspaceID === null) {
- yield* Effect.sync(() =>
- SyncEvent.run(Session.Event.Updated, {
- sessionID: input.sessionID,
- info: {
- workspaceID: null,
- },
- }),
- )
+ yield* sync.run(Session.Event.Updated, {
+ sessionID: input.sessionID,
+ info: {
+ workspaceID: null,
+ },
+ })
log.info("session warp complete", {
workspaceID: input.workspaceID,
diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts
index 3b16a9a5d7..4eafbdf749 100644
--- a/packages/opencode/src/session/compaction.ts
+++ b/packages/opencode/src/session/compaction.ts
@@ -18,8 +18,9 @@ import { InstanceState } from "@/effect/instance-state"
import { isOverflow as overflow, usable } from "./overflow"
import { makeRuntime } from "@/effect/run-service"
import { serviceUse } from "@/effect/service-use"
-import { EventV2 } from "@/v2/event"
+import { SyncEvent } from "@/sync"
import { SessionEvent } from "@/v2/session-event"
+import { Flag } from "@opencode-ai/core/flag/flag"
const log = Log.create({ service: "session.compaction" })
@@ -219,6 +220,7 @@ export const layer: Layer.Layer<
| Plugin.Service
| SessionProcessor.Service
| Provider.Service
+ | SyncEvent.Service
> = Layer.effect(
Service,
Effect.gen(function* () {
@@ -229,6 +231,7 @@ export const layer: Layer.Layer<
const plugin = yield* Plugin.Service
const processors = yield* SessionProcessor.Service
const provider = yield* Provider.Service
+ const sync = yield* SyncEvent.Service
const isOverflow = Effect.fn("SessionCompaction.isOverflow")(function* (input: {
tokens: MessageV2.Assistant["tokens"]
@@ -567,12 +570,14 @@ export const layer: Layer.Layer<
parts: [],
},
)
- EventV2.run(SessionEvent.Compaction.Ended.Sync, {
- sessionID: input.sessionID,
- timestamp: DateTime.makeUnsafe(Date.now()),
- text: summary ?? "",
- include: selected.tail_start_id,
- })
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Compaction.Ended.Sync, {
+ sessionID: input.sessionID,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ text: summary ?? "",
+ include: selected.tail_start_id,
+ })
+ }
yield* bus.publish(Event.Compacted, { sessionID: input.sessionID })
}
return result
@@ -601,11 +606,13 @@ export const layer: Layer.Layer<
auto: input.auto,
overflow: input.overflow,
})
- EventV2.run(SessionEvent.Compaction.Started.Sync, {
- sessionID: input.sessionID,
- timestamp: DateTime.makeUnsafe(Date.now()),
- reason: input.auto ? "auto" : "manual",
- })
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Compaction.Started.Sync, {
+ sessionID: input.sessionID,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ reason: input.auto ? "auto" : "manual",
+ })
+ }
})
return Service.of({
@@ -626,6 +633,7 @@ export const defaultLayer = Layer.suspend(() =>
Layer.provide(Plugin.defaultLayer),
Layer.provide(Bus.layer),
Layer.provide(Config.defaultLayer),
+ Layer.provide(SyncEvent.defaultLayer),
),
)
diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts
index 7df54d5451..579c4cc42c 100644
--- a/packages/opencode/src/session/processor.ts
+++ b/packages/opencode/src/session/processor.ts
@@ -21,10 +21,11 @@ import { Question } from "@/question"
import { errorMessage } from "@/util/error"
import * as Log from "@opencode-ai/core/util/log"
import { isRecord } from "@/util/record"
-import { EventV2 } from "@/v2/event"
+import { SyncEvent } from "@/sync"
import { SessionEvent } from "@/v2/session-event"
import { Modelv2 } from "@/v2/model"
import * as DateTime from "effect/DateTime"
+import { Flag } from "@opencode-ai/core/flag/flag"
const DOOM_LOOP_THRESHOLD = 3
const log = Log.create({ service: "session.processor" })
@@ -96,6 +97,7 @@ export const layer: Layer.Layer<
| Image.Service
| SessionSummary.Service
| SessionStatus.Service
+ | SyncEvent.Service
> = Layer.effect(
Service,
Effect.gen(function* () {
@@ -111,6 +113,7 @@ export const layer: Layer.Layer<
const scope = yield* Scope.Scope
const status = yield* SessionStatus.Service
const image = yield* Image.Service
+ const sync = yield* SyncEvent.Service
const create = Effect.fn("SessionProcessor.create")(function* (input: Input) {
// Pre-capture snapshot before the LLM stream starts. The AI SDK
@@ -229,11 +232,13 @@ export const layer: Layer.Layer<
case "reasoning-start":
if (value.id in ctx.reasoningMap) return
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
- EventV2.run(SessionEvent.Reasoning.Started.Sync, {
- sessionID: ctx.sessionID,
- reasoningID: value.id,
- timestamp: DateTime.makeUnsafe(Date.now()),
- })
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Reasoning.Started.Sync, {
+ sessionID: ctx.sessionID,
+ reasoningID: value.id,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ }
ctx.reasoningMap[value.id] = {
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
@@ -262,12 +267,14 @@ export const layer: Layer.Layer<
case "reasoning-end":
if (!(value.id in ctx.reasoningMap)) return
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
- EventV2.run(SessionEvent.Reasoning.Ended.Sync, {
- sessionID: ctx.sessionID,
- reasoningID: value.id,
- text: ctx.reasoningMap[value.id].text,
- timestamp: DateTime.makeUnsafe(Date.now()),
- })
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Reasoning.Ended.Sync, {
+ sessionID: ctx.sessionID,
+ reasoningID: value.id,
+ text: ctx.reasoningMap[value.id].text,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ }
// oxlint-disable-next-line no-self-assign -- reactivity trigger
ctx.reasoningMap[value.id].text = ctx.reasoningMap[value.id].text
ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() }
@@ -281,12 +288,14 @@ export const layer: Layer.Layer<
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
}
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
- EventV2.run(SessionEvent.Tool.Input.Started.Sync, {
- sessionID: ctx.sessionID,
- callID: value.id,
- name: value.toolName,
- timestamp: DateTime.makeUnsafe(Date.now()),
- })
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Tool.Input.Started.Sync, {
+ sessionID: ctx.sessionID,
+ callID: value.id,
+ name: value.toolName,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ }
const part = yield* session.updatePart({
id: ctx.toolcalls[value.id]?.partID ?? PartID.ascending(),
messageID: ctx.assistantMessage.id,
@@ -310,12 +319,14 @@ export const layer: Layer.Layer<
case "tool-input-end": {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
- EventV2.run(SessionEvent.Tool.Input.Ended.Sync, {
- sessionID: ctx.sessionID,
- callID: value.id,
- text: "",
- timestamp: DateTime.makeUnsafe(Date.now()),
- })
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Tool.Input.Ended.Sync, {
+ sessionID: ctx.sessionID,
+ callID: value.id,
+ text: "",
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ }
return
}
@@ -325,17 +336,19 @@ export const layer: Layer.Layer<
}
const toolCall = yield* readToolCall(value.toolCallId)
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
- EventV2.run(SessionEvent.Tool.Called.Sync, {
- sessionID: ctx.sessionID,
- callID: value.toolCallId,
- tool: value.toolName,
- input: value.input,
- provider: {
- executed: toolCall?.part.metadata?.providerExecuted === true,
- ...(value.providerMetadata ? { metadata: value.providerMetadata } : {}),
- },
- timestamp: DateTime.makeUnsafe(Date.now()),
- })
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Tool.Called.Sync, {
+ sessionID: ctx.sessionID,
+ callID: value.toolCallId,
+ tool: value.toolName,
+ input: value.input,
+ provider: {
+ executed: toolCall?.part.metadata?.providerExecuted === true,
+ ...(value.providerMetadata ? { metadata: value.providerMetadata } : {}),
+ },
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ }
yield* updateToolCall(value.toolCallId, (match) => ({
...match,
tool: value.toolName,
@@ -405,27 +418,29 @@ export const layer: Layer.Layer<
attachments: attachments?.length ? attachments : undefined,
}
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
- EventV2.run(SessionEvent.Tool.Success.Sync, {
- sessionID: ctx.sessionID,
- callID: value.toolCallId,
- structured: output.metadata,
- content: [
- {
- type: "text",
- text: output.output,
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Tool.Success.Sync, {
+ sessionID: ctx.sessionID,
+ callID: value.toolCallId,
+ structured: output.metadata,
+ content: [
+ {
+ type: "text",
+ text: output.output,
+ },
+ ...(output.attachments?.map((item: MessageV2.FilePart) => ({
+ type: "file",
+ uri: item.url,
+ mime: item.mime,
+ name: item.filename,
+ })) ?? []),
+ ],
+ provider: {
+ executed: toolCall?.part.metadata?.providerExecuted === true,
},
- ...(output.attachments?.map((item: MessageV2.FilePart) => ({
- type: "file",
- uri: item.url,
- mime: item.mime,
- name: item.filename,
- })) ?? []),
- ],
- provider: {
- executed: toolCall?.part.metadata?.providerExecuted === true,
- },
- timestamp: DateTime.makeUnsafe(Date.now()),
- })
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ }
yield* completeToolCall(value.toolCallId, output)
return
}
@@ -433,18 +448,20 @@ export const layer: Layer.Layer<
case "tool-error": {
const toolCall = yield* readToolCall(value.toolCallId)
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
- EventV2.run(SessionEvent.Tool.Failed.Sync, {
- sessionID: ctx.sessionID,
- callID: value.toolCallId,
- error: {
- type: "unknown",
- message: errorMessage(value.error),
- },
- provider: {
- executed: toolCall?.part.metadata?.providerExecuted === true,
- },
- timestamp: DateTime.makeUnsafe(Date.now()),
- })
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Tool.Failed.Sync, {
+ sessionID: ctx.sessionID,
+ callID: value.toolCallId,
+ error: {
+ type: "unknown",
+ message: errorMessage(value.error),
+ },
+ provider: {
+ executed: toolCall?.part.metadata?.providerExecuted === true,
+ },
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ }
yield* failToolCall(value.toolCallId, value.error)
return
}
@@ -456,17 +473,19 @@ export const layer: Layer.Layer<
if (!ctx.snapshot) ctx.snapshot = yield* snapshot.track()
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
- EventV2.run(SessionEvent.Step.Started.Sync, {
- sessionID: ctx.sessionID,
- agent: input.assistantMessage.agent,
- model: {
- id: Modelv2.ID.make(ctx.model.id),
- providerID: Modelv2.ProviderID.make(ctx.model.providerID),
- variant: Modelv2.VariantID.make(input.assistantMessage.variant ?? "default"),
- },
- snapshot: ctx.snapshot,
- timestamp: DateTime.makeUnsafe(Date.now()),
- })
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Step.Started.Sync, {
+ sessionID: ctx.sessionID,
+ agent: input.assistantMessage.agent,
+ model: {
+ id: Modelv2.ID.make(ctx.model.id),
+ providerID: Modelv2.ProviderID.make(ctx.model.providerID),
+ variant: Modelv2.VariantID.make(input.assistantMessage.variant ?? "default"),
+ },
+ snapshot: ctx.snapshot,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ }
}
yield* session.updatePart({
id: PartID.ascending(),
@@ -486,14 +505,16 @@ export const layer: Layer.Layer<
})
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
- EventV2.run(SessionEvent.Step.Ended.Sync, {
- sessionID: ctx.sessionID,
- finish: value.finishReason,
- cost: usage.cost,
- tokens: usage.tokens,
- snapshot: completedSnapshot,
- timestamp: DateTime.makeUnsafe(Date.now()),
- })
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Step.Ended.Sync, {
+ sessionID: ctx.sessionID,
+ finish: value.finishReason,
+ cost: usage.cost,
+ tokens: usage.tokens,
+ snapshot: completedSnapshot,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ }
}
ctx.assistantMessage.finish = value.finishReason
ctx.assistantMessage.cost += usage.cost
@@ -541,10 +562,12 @@ export const layer: Layer.Layer<
case "text-start":
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
- EventV2.run(SessionEvent.Text.Started.Sync, {
- sessionID: ctx.sessionID,
- timestamp: DateTime.makeUnsafe(Date.now()),
- })
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Text.Started.Sync, {
+ sessionID: ctx.sessionID,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ }
}
ctx.currentText = {
id: PartID.ascending(),
@@ -586,11 +609,13 @@ export const layer: Layer.Layer<
)).text
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
- EventV2.run(SessionEvent.Text.Ended.Sync, {
- sessionID: ctx.sessionID,
- text: ctx.currentText.text,
- timestamp: DateTime.makeUnsafe(Date.now()),
- })
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Text.Ended.Sync, {
+ sessionID: ctx.sessionID,
+ text: ctx.currentText.text,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ }
}
{
const end = Date.now()
@@ -680,14 +705,16 @@ export const layer: Layer.Layer<
}
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
- EventV2.run(SessionEvent.Step.Failed.Sync, {
- sessionID: ctx.sessionID,
- error: {
- type: "unknown",
- message: errorMessage(e),
- },
- timestamp: DateTime.makeUnsafe(Date.now()),
- })
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Step.Failed.Sync, {
+ sessionID: ctx.sessionID,
+ error: {
+ type: "unknown",
+ message: errorMessage(e),
+ },
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ }
}
ctx.assistantMessage.error = error
yield* bus.publish(Session.Event.Error, {
@@ -732,22 +759,28 @@ export const layer: Layer.Layer<
parse,
set: (info) => {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
- EventV2.run(SessionEvent.Retried.Sync, {
- sessionID: ctx.sessionID,
- attempt: info.attempt,
- error: {
- message: info.message,
- isRetryable: true,
- },
- timestamp: DateTime.makeUnsafe(Date.now()),
- })
- return status.set(ctx.sessionID, {
- type: "retry",
- attempt: info.attempt,
- message: info.message,
- action: info.action,
- next: info.next,
- })
+ const event = Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM
+ ? sync.run(SessionEvent.Retried.Sync, {
+ sessionID: ctx.sessionID,
+ attempt: info.attempt,
+ error: {
+ message: info.message,
+ isRetryable: true,
+ },
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ : Effect.void
+ return event.pipe(
+ Effect.andThen(
+ status.set(ctx.sessionID, {
+ type: "retry",
+ attempt: info.attempt,
+ message: info.message,
+ action: info.action,
+ next: info.next,
+ }),
+ ),
+ )
},
}),
),
@@ -788,6 +821,7 @@ export const defaultLayer = Layer.suspend(() =>
Layer.provide(Image.defaultLayer),
Layer.provide(Bus.layer),
Layer.provide(Config.defaultLayer),
+ Layer.provide(SyncEvent.defaultLayer),
),
)
diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts
index 521bacbc2f..934427f569 100644
--- a/packages/opencode/src/session/prompt.ts
+++ b/packages/opencode/src/session/prompt.ts
@@ -54,7 +54,7 @@ import { InstanceState } from "@/effect/instance-state"
import { TaskTool, type TaskPromptOps } from "@/tool/task"
import { SessionRunState } from "./run-state"
import { EffectBridge } from "@/effect/bridge"
-import { EventV2 } from "@/v2/event"
+import { SyncEvent } from "@/sync"
import { SessionEvent } from "@/v2/session-event"
import { Modelv2 } from "@/v2/model"
import { AgentAttachment, FileAttachment, Source } from "@/v2/session-prompt"
@@ -118,6 +118,7 @@ export const layer = Layer.effect(
const summary = yield* SessionSummary.Service
const sys = yield* SystemPrompt.Service
const llm = yield* LLM.Service
+ const sync = yield* SyncEvent.Service
const runner = Effect.fn("SessionPrompt.runner")(function* () {
return yield* EffectBridge.make()
})
@@ -809,12 +810,14 @@ NOTE: At any point in time through this workflow you should feel free to ask the
},
}
yield* sessions.updatePart(part)
- EventV2.run(SessionEvent.Shell.Started.Sync, {
- sessionID: input.sessionID,
- timestamp: DateTime.makeUnsafe(started),
- callID,
- command: input.command,
- })
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Shell.Started.Sync, {
+ sessionID: input.sessionID,
+ timestamp: DateTime.makeUnsafe(started),
+ callID,
+ command: input.command,
+ })
+ }
return { msg, part, cwd: ctx.directory }
}).pipe(Effect.ensuring(markReady))
@@ -830,12 +833,14 @@ NOTE: At any point in time through this workflow you should feel free to ask the
output += "\n\n" + ["", "User aborted the command", ""].join("\n")
}
const completed = Date.now()
- EventV2.run(SessionEvent.Shell.Ended.Sync, {
- sessionID: input.sessionID,
- timestamp: DateTime.makeUnsafe(completed),
- callID: part.callID,
- output,
- })
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Shell.Ended.Sync, {
+ sessionID: input.sessionID,
+ timestamp: DateTime.makeUnsafe(completed),
+ callID: part.callID,
+ output,
+ })
+ }
if (!msg.time.completed) {
msg.time.completed = completed
yield* sessions.updateMessage(msg)
@@ -975,34 +980,26 @@ NOTE: At any point in time through this workflow you should feel free to ask the
}
if (current?.agent !== info.agent) {
- EventV2.run(
- SessionEvent.AgentSwitched.Sync,
- {
- sessionID: input.sessionID,
- timestamp: DateTime.makeUnsafe(info.time.created),
- agent: info.agent,
- },
- { bypassExperimentalEventSystem: true },
- )
+ yield* sync.run(SessionEvent.AgentSwitched.Sync, {
+ sessionID: input.sessionID,
+ timestamp: DateTime.makeUnsafe(info.time.created),
+ agent: info.agent,
+ })
}
if (
current?.model?.providerID !== info.model.providerID ||
current.model.id !== info.model.modelID ||
(current.model.variant === "default" ? undefined : current.model.variant) !== info.model.variant
) {
- EventV2.run(
- SessionEvent.ModelSwitched.Sync,
- {
- sessionID: input.sessionID,
- timestamp: DateTime.makeUnsafe(info.time.created),
- model: {
- id: Modelv2.ID.make(info.model.modelID),
- providerID: Modelv2.ProviderID.make(info.model.providerID),
- variant: Modelv2.VariantID.make(info.model.variant ?? "default"),
- },
+ yield* sync.run(SessionEvent.ModelSwitched.Sync, {
+ sessionID: input.sessionID,
+ timestamp: DateTime.makeUnsafe(info.time.created),
+ model: {
+ id: Modelv2.ID.make(info.model.modelID),
+ providerID: Modelv2.ProviderID.make(info.model.providerID),
+ variant: Modelv2.VariantID.make(info.model.variant ?? "default"),
},
- { bypassExperimentalEventSystem: true },
- )
+ })
}
yield* Effect.addFinalizer(() => instruction.clear(info.id))
@@ -1371,23 +1368,27 @@ NOTE: At any point in time through this workflow you should feel free to ask the
},
)
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
- EventV2.run(SessionEvent.Prompted.Sync, {
- sessionID: input.sessionID,
- timestamp: DateTime.makeUnsafe(info.time.created),
- prompt: {
- text: nextPrompt.text.join("\n"),
- files: nextPrompt.files,
- agents: nextPrompt.agents,
- },
- })
- for (const text of nextPrompt.synthetic) {
- // TODO(v2): Temporary dual-write while migrating session messages to v2 events.
- EventV2.run(SessionEvent.Synthetic.Sync, {
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Prompted.Sync, {
sessionID: input.sessionID,
timestamp: DateTime.makeUnsafe(info.time.created),
- text,
+ prompt: {
+ text: nextPrompt.text.join("\n"),
+ files: nextPrompt.files,
+ agents: nextPrompt.agents,
+ },
})
}
+ for (const text of nextPrompt.synthetic) {
+ // TODO(v2): Temporary dual-write while migrating session messages to v2 events.
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+ yield* sync.run(SessionEvent.Synthetic.Sync, {
+ sessionID: input.sessionID,
+ timestamp: DateTime.makeUnsafe(info.time.created),
+ text,
+ })
+ }
+ }
return { info, parts }
}, Effect.scoped)
@@ -1820,6 +1821,7 @@ export const defaultLayer = Layer.suspend(() =>
LLM.defaultLayer,
Bus.layer,
CrossSpawnSpawner.defaultLayer,
+ SyncEvent.defaultLayer,
),
),
),
diff --git a/packages/opencode/src/v2/session.ts b/packages/opencode/src/v2/session.ts
index 39deabfc7c..3b0b61dcbc 100644
--- a/packages/opencode/src/v2/session.ts
+++ b/packages/opencode/src/v2/session.ts
@@ -12,6 +12,7 @@ import { SessionEvent } from "./session-event"
import { V2Schema } from "./schema"
import { optionalOmitUndefined } from "@opencode-ai/core/schema"
import { Modelv2 } from "./model"
+import { SyncEvent } from "@/sync"
export const Delivery = Schema.Literals(["immediate", "deferred"]).annotate({
identifier: "Session.Delivery",
@@ -113,6 +114,7 @@ export class Service extends Context.Service()("@opencode/v2
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
+ const sync = yield* SyncEvent.Service
const decodeMessage = Schema.decodeUnknownSync(SessionMessage.Message)
const decode = (row: typeof SessionMessageTable.$inferSelect) =>
@@ -269,26 +271,18 @@ export const layer = Layer.effect(
shell: Effect.fn("V2Session.shell")(function* (_input) {}),
skill: Effect.fn("V2Session.skill")(function* (_input) {}),
switchAgent: Effect.fn("V2Session.switchAgent")(function* (input) {
- EventV2.run(
- SessionEvent.AgentSwitched.Sync,
- {
- sessionID: input.sessionID,
- timestamp: DateTime.makeUnsafe(Date.now()),
- agent: input.agent,
- },
- { bypassExperimentalEventSystem: true },
- )
+ yield* sync.run(SessionEvent.AgentSwitched.Sync, {
+ sessionID: input.sessionID,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ agent: input.agent,
+ })
}),
switchModel: Effect.fn("V2Session.switchModel")(function* (input) {
- EventV2.run(
- SessionEvent.ModelSwitched.Sync,
- {
- sessionID: input.sessionID,
- timestamp: DateTime.makeUnsafe(Date.now()),
- model: input.model,
- },
- { bypassExperimentalEventSystem: true },
- )
+ yield* sync.run(SessionEvent.ModelSwitched.Sync, {
+ sessionID: input.sessionID,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ model: input.model,
+ })
}),
subagent: Effect.fn("V2Session.subagent")(function* (input) {
const parent = yield* result.get(input.parentID)
@@ -319,6 +313,6 @@ export const layer = Layer.effect(
}),
)
-export const defaultLayer = layer
+export const defaultLayer = layer.pipe(Layer.provide(SyncEvent.defaultLayer))
export * as SessionV2 from "./session"
diff --git a/packages/opencode/test/session/compaction.test.ts b/packages/opencode/test/session/compaction.test.ts
index b48e68b10f..8f987b4d10 100644
--- a/packages/opencode/test/session/compaction.test.ts
+++ b/packages/opencode/test/session/compaction.test.ts
@@ -27,6 +27,7 @@ import { ProviderTest } from "../fake/provider"
import { testEffect } from "../lib/effect"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { TestConfig } from "../fixture/config"
+import { SyncEvent } from "@/sync"
void Log.init({ print: false })
@@ -223,6 +224,7 @@ const deps = Layer.mergeAll(
Plugin.defaultLayer,
Bus.layer,
Config.defaultLayer,
+ SyncEvent.defaultLayer,
)
const env = Layer.mergeAll(
@@ -269,6 +271,7 @@ function compactionProcessLayer(options?: CompactionProcessOptions) {
Layer.provide(status),
Layer.provide(bus),
Layer.provide(options?.config ?? Config.defaultLayer),
+ Layer.provide(SyncEvent.defaultLayer),
)
}
diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts
index a0736b459b..56ff102430 100644
--- a/packages/opencode/test/session/processor-effect.test.ts
+++ b/packages/opencode/test/session/processor-effect.test.ts
@@ -24,6 +24,7 @@ import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { provideTmpdirServer } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
import { raw, reply, TestLLMServer } from "../lib/llm-server"
+import { SyncEvent } from "@/sync"
void Log.init({ print: false })
@@ -166,6 +167,7 @@ const deps = Layer.mergeAll(
LLM.defaultLayer,
Provider.defaultLayer,
status,
+ SyncEvent.defaultLayer,
).pipe(Layer.provideMerge(infra))
const env = Layer.mergeAll(
TestLLMServer.layer,
diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts
index 98a69fce96..42c9a81cd2 100644
--- a/packages/opencode/test/session/prompt.test.ts
+++ b/packages/opencode/test/session/prompt.test.ts
@@ -50,6 +50,7 @@ import { Reference } from "../../src/reference/reference"
import { provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
import { reply, TestLLMServer } from "../lib/llm-server"
+import { SyncEvent } from "@/sync"
void Log.init({ print: false })
@@ -174,6 +175,7 @@ function makeHttp() {
mcp,
AppFileSystem.defaultLayer,
status,
+ SyncEvent.defaultLayer,
).pipe(Layer.provideMerge(infra))
const question = Question.layer.pipe(Layer.provideMerge(deps))
const todo = Todo.layer.pipe(Layer.provideMerge(deps))
diff --git a/packages/opencode/test/session/snapshot-tool-race.test.ts b/packages/opencode/test/session/snapshot-tool-race.test.ts
index 251a4acf3f..5c47df4c0d 100644
--- a/packages/opencode/test/session/snapshot-tool-race.test.ts
+++ b/packages/opencode/test/session/snapshot-tool-race.test.ts
@@ -58,6 +58,7 @@ import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { Ripgrep } from "../../src/file/ripgrep"
import { Format } from "../../src/format"
import { Reference } from "../../src/reference/reference"
+import { SyncEvent } from "@/sync"
void Log.init({ print: false })
@@ -124,6 +125,7 @@ function makeHttp() {
mcp,
AppFileSystem.defaultLayer,
status,
+ SyncEvent.defaultLayer,
).pipe(Layer.provideMerge(infra))
const question = Question.layer.pipe(Layer.provideMerge(deps))
const todo = Todo.layer.pipe(Layer.provideMerge(deps))