mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-16 01:22:58 +00:00
Use SyncEvent service at event call sites (#26782)
This commit is contained in:
@@ -676,14 +676,12 @@ export const layer = Layer.effect(
|
||||
}
|
||||
|
||||
if (input.workspaceID === null) {
|
||||
yield* Effect.sync(() =>
|
||||
SyncEvent.run(Session.Event.Updated, {
|
||||
sessionID: input.sessionID,
|
||||
info: {
|
||||
workspaceID: null,
|
||||
},
|
||||
}),
|
||||
)
|
||||
yield* sync.run(Session.Event.Updated, {
|
||||
sessionID: input.sessionID,
|
||||
info: {
|
||||
workspaceID: null,
|
||||
},
|
||||
})
|
||||
|
||||
log.info("session warp complete", {
|
||||
workspaceID: input.workspaceID,
|
||||
|
||||
@@ -18,8 +18,9 @@ import { InstanceState } from "@/effect/instance-state"
|
||||
import { isOverflow as overflow, usable } from "./overflow"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import { serviceUse } from "@/effect/service-use"
|
||||
import { EventV2 } from "@/v2/event"
|
||||
import { SyncEvent } from "@/sync"
|
||||
import { SessionEvent } from "@/v2/session-event"
|
||||
import { Flag } from "@opencode-ai/core/flag/flag"
|
||||
|
||||
const log = Log.create({ service: "session.compaction" })
|
||||
|
||||
@@ -219,6 +220,7 @@ export const layer: Layer.Layer<
|
||||
| Plugin.Service
|
||||
| SessionProcessor.Service
|
||||
| Provider.Service
|
||||
| SyncEvent.Service
|
||||
> = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
@@ -229,6 +231,7 @@ export const layer: Layer.Layer<
|
||||
const plugin = yield* Plugin.Service
|
||||
const processors = yield* SessionProcessor.Service
|
||||
const provider = yield* Provider.Service
|
||||
const sync = yield* SyncEvent.Service
|
||||
|
||||
const isOverflow = Effect.fn("SessionCompaction.isOverflow")(function* (input: {
|
||||
tokens: MessageV2.Assistant["tokens"]
|
||||
@@ -567,12 +570,14 @@ export const layer: Layer.Layer<
|
||||
parts: [],
|
||||
},
|
||||
)
|
||||
EventV2.run(SessionEvent.Compaction.Ended.Sync, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
text: summary ?? "",
|
||||
include: selected.tail_start_id,
|
||||
})
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Compaction.Ended.Sync, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
text: summary ?? "",
|
||||
include: selected.tail_start_id,
|
||||
})
|
||||
}
|
||||
yield* bus.publish(Event.Compacted, { sessionID: input.sessionID })
|
||||
}
|
||||
return result
|
||||
@@ -601,11 +606,13 @@ export const layer: Layer.Layer<
|
||||
auto: input.auto,
|
||||
overflow: input.overflow,
|
||||
})
|
||||
EventV2.run(SessionEvent.Compaction.Started.Sync, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
reason: input.auto ? "auto" : "manual",
|
||||
})
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Compaction.Started.Sync, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
reason: input.auto ? "auto" : "manual",
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
return Service.of({
|
||||
@@ -626,6 +633,7 @@ export const defaultLayer = Layer.suspend(() =>
|
||||
Layer.provide(Plugin.defaultLayer),
|
||||
Layer.provide(Bus.layer),
|
||||
Layer.provide(Config.defaultLayer),
|
||||
Layer.provide(SyncEvent.defaultLayer),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -21,10 +21,11 @@ import { Question } from "@/question"
|
||||
import { errorMessage } from "@/util/error"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
import { isRecord } from "@/util/record"
|
||||
import { EventV2 } from "@/v2/event"
|
||||
import { SyncEvent } from "@/sync"
|
||||
import { SessionEvent } from "@/v2/session-event"
|
||||
import { Modelv2 } from "@/v2/model"
|
||||
import * as DateTime from "effect/DateTime"
|
||||
import { Flag } from "@opencode-ai/core/flag/flag"
|
||||
|
||||
const DOOM_LOOP_THRESHOLD = 3
|
||||
const log = Log.create({ service: "session.processor" })
|
||||
@@ -96,6 +97,7 @@ export const layer: Layer.Layer<
|
||||
| Image.Service
|
||||
| SessionSummary.Service
|
||||
| SessionStatus.Service
|
||||
| SyncEvent.Service
|
||||
> = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
@@ -111,6 +113,7 @@ export const layer: Layer.Layer<
|
||||
const scope = yield* Scope.Scope
|
||||
const status = yield* SessionStatus.Service
|
||||
const image = yield* Image.Service
|
||||
const sync = yield* SyncEvent.Service
|
||||
|
||||
const create = Effect.fn("SessionProcessor.create")(function* (input: Input) {
|
||||
// Pre-capture snapshot before the LLM stream starts. The AI SDK
|
||||
@@ -229,11 +232,13 @@ export const layer: Layer.Layer<
|
||||
case "reasoning-start":
|
||||
if (value.id in ctx.reasoningMap) return
|
||||
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
|
||||
EventV2.run(SessionEvent.Reasoning.Started.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
reasoningID: value.id,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Reasoning.Started.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
reasoningID: value.id,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
}
|
||||
ctx.reasoningMap[value.id] = {
|
||||
id: PartID.ascending(),
|
||||
messageID: ctx.assistantMessage.id,
|
||||
@@ -262,12 +267,14 @@ export const layer: Layer.Layer<
|
||||
case "reasoning-end":
|
||||
if (!(value.id in ctx.reasoningMap)) return
|
||||
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
|
||||
EventV2.run(SessionEvent.Reasoning.Ended.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
reasoningID: value.id,
|
||||
text: ctx.reasoningMap[value.id].text,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Reasoning.Ended.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
reasoningID: value.id,
|
||||
text: ctx.reasoningMap[value.id].text,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
}
|
||||
// oxlint-disable-next-line no-self-assign -- reactivity trigger
|
||||
ctx.reasoningMap[value.id].text = ctx.reasoningMap[value.id].text
|
||||
ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() }
|
||||
@@ -281,12 +288,14 @@ export const layer: Layer.Layer<
|
||||
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
|
||||
}
|
||||
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
|
||||
EventV2.run(SessionEvent.Tool.Input.Started.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
callID: value.id,
|
||||
name: value.toolName,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Tool.Input.Started.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
callID: value.id,
|
||||
name: value.toolName,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
}
|
||||
const part = yield* session.updatePart({
|
||||
id: ctx.toolcalls[value.id]?.partID ?? PartID.ascending(),
|
||||
messageID: ctx.assistantMessage.id,
|
||||
@@ -310,12 +319,14 @@ export const layer: Layer.Layer<
|
||||
|
||||
case "tool-input-end": {
|
||||
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
|
||||
EventV2.run(SessionEvent.Tool.Input.Ended.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
callID: value.id,
|
||||
text: "",
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Tool.Input.Ended.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
callID: value.id,
|
||||
text: "",
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -325,17 +336,19 @@ export const layer: Layer.Layer<
|
||||
}
|
||||
const toolCall = yield* readToolCall(value.toolCallId)
|
||||
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
|
||||
EventV2.run(SessionEvent.Tool.Called.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
callID: value.toolCallId,
|
||||
tool: value.toolName,
|
||||
input: value.input,
|
||||
provider: {
|
||||
executed: toolCall?.part.metadata?.providerExecuted === true,
|
||||
...(value.providerMetadata ? { metadata: value.providerMetadata } : {}),
|
||||
},
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Tool.Called.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
callID: value.toolCallId,
|
||||
tool: value.toolName,
|
||||
input: value.input,
|
||||
provider: {
|
||||
executed: toolCall?.part.metadata?.providerExecuted === true,
|
||||
...(value.providerMetadata ? { metadata: value.providerMetadata } : {}),
|
||||
},
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
}
|
||||
yield* updateToolCall(value.toolCallId, (match) => ({
|
||||
...match,
|
||||
tool: value.toolName,
|
||||
@@ -405,27 +418,29 @@ export const layer: Layer.Layer<
|
||||
attachments: attachments?.length ? attachments : undefined,
|
||||
}
|
||||
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
|
||||
EventV2.run(SessionEvent.Tool.Success.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
callID: value.toolCallId,
|
||||
structured: output.metadata,
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: output.output,
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Tool.Success.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
callID: value.toolCallId,
|
||||
structured: output.metadata,
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: output.output,
|
||||
},
|
||||
...(output.attachments?.map((item: MessageV2.FilePart) => ({
|
||||
type: "file",
|
||||
uri: item.url,
|
||||
mime: item.mime,
|
||||
name: item.filename,
|
||||
})) ?? []),
|
||||
],
|
||||
provider: {
|
||||
executed: toolCall?.part.metadata?.providerExecuted === true,
|
||||
},
|
||||
...(output.attachments?.map((item: MessageV2.FilePart) => ({
|
||||
type: "file",
|
||||
uri: item.url,
|
||||
mime: item.mime,
|
||||
name: item.filename,
|
||||
})) ?? []),
|
||||
],
|
||||
provider: {
|
||||
executed: toolCall?.part.metadata?.providerExecuted === true,
|
||||
},
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
}
|
||||
yield* completeToolCall(value.toolCallId, output)
|
||||
return
|
||||
}
|
||||
@@ -433,18 +448,20 @@ export const layer: Layer.Layer<
|
||||
case "tool-error": {
|
||||
const toolCall = yield* readToolCall(value.toolCallId)
|
||||
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
|
||||
EventV2.run(SessionEvent.Tool.Failed.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
callID: value.toolCallId,
|
||||
error: {
|
||||
type: "unknown",
|
||||
message: errorMessage(value.error),
|
||||
},
|
||||
provider: {
|
||||
executed: toolCall?.part.metadata?.providerExecuted === true,
|
||||
},
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Tool.Failed.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
callID: value.toolCallId,
|
||||
error: {
|
||||
type: "unknown",
|
||||
message: errorMessage(value.error),
|
||||
},
|
||||
provider: {
|
||||
executed: toolCall?.part.metadata?.providerExecuted === true,
|
||||
},
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
}
|
||||
yield* failToolCall(value.toolCallId, value.error)
|
||||
return
|
||||
}
|
||||
@@ -456,17 +473,19 @@ export const layer: Layer.Layer<
|
||||
if (!ctx.snapshot) ctx.snapshot = yield* snapshot.track()
|
||||
if (!ctx.assistantMessage.summary) {
|
||||
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
|
||||
EventV2.run(SessionEvent.Step.Started.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
agent: input.assistantMessage.agent,
|
||||
model: {
|
||||
id: Modelv2.ID.make(ctx.model.id),
|
||||
providerID: Modelv2.ProviderID.make(ctx.model.providerID),
|
||||
variant: Modelv2.VariantID.make(input.assistantMessage.variant ?? "default"),
|
||||
},
|
||||
snapshot: ctx.snapshot,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Step.Started.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
agent: input.assistantMessage.agent,
|
||||
model: {
|
||||
id: Modelv2.ID.make(ctx.model.id),
|
||||
providerID: Modelv2.ProviderID.make(ctx.model.providerID),
|
||||
variant: Modelv2.VariantID.make(input.assistantMessage.variant ?? "default"),
|
||||
},
|
||||
snapshot: ctx.snapshot,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
}
|
||||
}
|
||||
yield* session.updatePart({
|
||||
id: PartID.ascending(),
|
||||
@@ -486,14 +505,16 @@ export const layer: Layer.Layer<
|
||||
})
|
||||
if (!ctx.assistantMessage.summary) {
|
||||
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
|
||||
EventV2.run(SessionEvent.Step.Ended.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
finish: value.finishReason,
|
||||
cost: usage.cost,
|
||||
tokens: usage.tokens,
|
||||
snapshot: completedSnapshot,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Step.Ended.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
finish: value.finishReason,
|
||||
cost: usage.cost,
|
||||
tokens: usage.tokens,
|
||||
snapshot: completedSnapshot,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
}
|
||||
}
|
||||
ctx.assistantMessage.finish = value.finishReason
|
||||
ctx.assistantMessage.cost += usage.cost
|
||||
@@ -541,10 +562,12 @@ export const layer: Layer.Layer<
|
||||
case "text-start":
|
||||
if (!ctx.assistantMessage.summary) {
|
||||
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
|
||||
EventV2.run(SessionEvent.Text.Started.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Text.Started.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
}
|
||||
}
|
||||
ctx.currentText = {
|
||||
id: PartID.ascending(),
|
||||
@@ -586,11 +609,13 @@ export const layer: Layer.Layer<
|
||||
)).text
|
||||
if (!ctx.assistantMessage.summary) {
|
||||
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
|
||||
EventV2.run(SessionEvent.Text.Ended.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
text: ctx.currentText.text,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Text.Ended.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
text: ctx.currentText.text,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
}
|
||||
}
|
||||
{
|
||||
const end = Date.now()
|
||||
@@ -680,14 +705,16 @@ export const layer: Layer.Layer<
|
||||
}
|
||||
if (!ctx.assistantMessage.summary) {
|
||||
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
|
||||
EventV2.run(SessionEvent.Step.Failed.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
error: {
|
||||
type: "unknown",
|
||||
message: errorMessage(e),
|
||||
},
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Step.Failed.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
error: {
|
||||
type: "unknown",
|
||||
message: errorMessage(e),
|
||||
},
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
}
|
||||
}
|
||||
ctx.assistantMessage.error = error
|
||||
yield* bus.publish(Session.Event.Error, {
|
||||
@@ -732,22 +759,28 @@ export const layer: Layer.Layer<
|
||||
parse,
|
||||
set: (info) => {
|
||||
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
|
||||
EventV2.run(SessionEvent.Retried.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
attempt: info.attempt,
|
||||
error: {
|
||||
message: info.message,
|
||||
isRetryable: true,
|
||||
},
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
return status.set(ctx.sessionID, {
|
||||
type: "retry",
|
||||
attempt: info.attempt,
|
||||
message: info.message,
|
||||
action: info.action,
|
||||
next: info.next,
|
||||
})
|
||||
const event = Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM
|
||||
? sync.run(SessionEvent.Retried.Sync, {
|
||||
sessionID: ctx.sessionID,
|
||||
attempt: info.attempt,
|
||||
error: {
|
||||
message: info.message,
|
||||
isRetryable: true,
|
||||
},
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
})
|
||||
: Effect.void
|
||||
return event.pipe(
|
||||
Effect.andThen(
|
||||
status.set(ctx.sessionID, {
|
||||
type: "retry",
|
||||
attempt: info.attempt,
|
||||
message: info.message,
|
||||
action: info.action,
|
||||
next: info.next,
|
||||
}),
|
||||
),
|
||||
)
|
||||
},
|
||||
}),
|
||||
),
|
||||
@@ -788,6 +821,7 @@ export const defaultLayer = Layer.suspend(() =>
|
||||
Layer.provide(Image.defaultLayer),
|
||||
Layer.provide(Bus.layer),
|
||||
Layer.provide(Config.defaultLayer),
|
||||
Layer.provide(SyncEvent.defaultLayer),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -54,7 +54,7 @@ import { InstanceState } from "@/effect/instance-state"
|
||||
import { TaskTool, type TaskPromptOps } from "@/tool/task"
|
||||
import { SessionRunState } from "./run-state"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
import { EventV2 } from "@/v2/event"
|
||||
import { SyncEvent } from "@/sync"
|
||||
import { SessionEvent } from "@/v2/session-event"
|
||||
import { Modelv2 } from "@/v2/model"
|
||||
import { AgentAttachment, FileAttachment, Source } from "@/v2/session-prompt"
|
||||
@@ -118,6 +118,7 @@ export const layer = Layer.effect(
|
||||
const summary = yield* SessionSummary.Service
|
||||
const sys = yield* SystemPrompt.Service
|
||||
const llm = yield* LLM.Service
|
||||
const sync = yield* SyncEvent.Service
|
||||
const runner = Effect.fn("SessionPrompt.runner")(function* () {
|
||||
return yield* EffectBridge.make()
|
||||
})
|
||||
@@ -809,12 +810,14 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
},
|
||||
}
|
||||
yield* sessions.updatePart(part)
|
||||
EventV2.run(SessionEvent.Shell.Started.Sync, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(started),
|
||||
callID,
|
||||
command: input.command,
|
||||
})
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Shell.Started.Sync, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(started),
|
||||
callID,
|
||||
command: input.command,
|
||||
})
|
||||
}
|
||||
return { msg, part, cwd: ctx.directory }
|
||||
}).pipe(Effect.ensuring(markReady))
|
||||
|
||||
@@ -830,12 +833,14 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
output += "\n\n" + ["<metadata>", "User aborted the command", "</metadata>"].join("\n")
|
||||
}
|
||||
const completed = Date.now()
|
||||
EventV2.run(SessionEvent.Shell.Ended.Sync, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(completed),
|
||||
callID: part.callID,
|
||||
output,
|
||||
})
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Shell.Ended.Sync, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(completed),
|
||||
callID: part.callID,
|
||||
output,
|
||||
})
|
||||
}
|
||||
if (!msg.time.completed) {
|
||||
msg.time.completed = completed
|
||||
yield* sessions.updateMessage(msg)
|
||||
@@ -975,34 +980,26 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
}
|
||||
|
||||
if (current?.agent !== info.agent) {
|
||||
EventV2.run(
|
||||
SessionEvent.AgentSwitched.Sync,
|
||||
{
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(info.time.created),
|
||||
agent: info.agent,
|
||||
},
|
||||
{ bypassExperimentalEventSystem: true },
|
||||
)
|
||||
yield* sync.run(SessionEvent.AgentSwitched.Sync, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(info.time.created),
|
||||
agent: info.agent,
|
||||
})
|
||||
}
|
||||
if (
|
||||
current?.model?.providerID !== info.model.providerID ||
|
||||
current.model.id !== info.model.modelID ||
|
||||
(current.model.variant === "default" ? undefined : current.model.variant) !== info.model.variant
|
||||
) {
|
||||
EventV2.run(
|
||||
SessionEvent.ModelSwitched.Sync,
|
||||
{
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(info.time.created),
|
||||
model: {
|
||||
id: Modelv2.ID.make(info.model.modelID),
|
||||
providerID: Modelv2.ProviderID.make(info.model.providerID),
|
||||
variant: Modelv2.VariantID.make(info.model.variant ?? "default"),
|
||||
},
|
||||
yield* sync.run(SessionEvent.ModelSwitched.Sync, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(info.time.created),
|
||||
model: {
|
||||
id: Modelv2.ID.make(info.model.modelID),
|
||||
providerID: Modelv2.ProviderID.make(info.model.providerID),
|
||||
variant: Modelv2.VariantID.make(info.model.variant ?? "default"),
|
||||
},
|
||||
{ bypassExperimentalEventSystem: true },
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
yield* Effect.addFinalizer(() => instruction.clear(info.id))
|
||||
@@ -1371,23 +1368,27 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
},
|
||||
)
|
||||
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
|
||||
EventV2.run(SessionEvent.Prompted.Sync, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(info.time.created),
|
||||
prompt: {
|
||||
text: nextPrompt.text.join("\n"),
|
||||
files: nextPrompt.files,
|
||||
agents: nextPrompt.agents,
|
||||
},
|
||||
})
|
||||
for (const text of nextPrompt.synthetic) {
|
||||
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
|
||||
EventV2.run(SessionEvent.Synthetic.Sync, {
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Prompted.Sync, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(info.time.created),
|
||||
text,
|
||||
prompt: {
|
||||
text: nextPrompt.text.join("\n"),
|
||||
files: nextPrompt.files,
|
||||
agents: nextPrompt.agents,
|
||||
},
|
||||
})
|
||||
}
|
||||
for (const text of nextPrompt.synthetic) {
|
||||
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
|
||||
yield* sync.run(SessionEvent.Synthetic.Sync, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(info.time.created),
|
||||
text,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return { info, parts }
|
||||
}, Effect.scoped)
|
||||
@@ -1820,6 +1821,7 @@ export const defaultLayer = Layer.suspend(() =>
|
||||
LLM.defaultLayer,
|
||||
Bus.layer,
|
||||
CrossSpawnSpawner.defaultLayer,
|
||||
SyncEvent.defaultLayer,
|
||||
),
|
||||
),
|
||||
),
|
||||
|
||||
@@ -12,6 +12,7 @@ import { SessionEvent } from "./session-event"
|
||||
import { V2Schema } from "./schema"
|
||||
import { optionalOmitUndefined } from "@opencode-ai/core/schema"
|
||||
import { Modelv2 } from "./model"
|
||||
import { SyncEvent } from "@/sync"
|
||||
|
||||
export const Delivery = Schema.Literals(["immediate", "deferred"]).annotate({
|
||||
identifier: "Session.Delivery",
|
||||
@@ -113,6 +114,7 @@ export class Service extends Context.Service<Service, Interface>()("@opencode/v2
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const sync = yield* SyncEvent.Service
|
||||
const decodeMessage = Schema.decodeUnknownSync(SessionMessage.Message)
|
||||
|
||||
const decode = (row: typeof SessionMessageTable.$inferSelect) =>
|
||||
@@ -269,26 +271,18 @@ export const layer = Layer.effect(
|
||||
shell: Effect.fn("V2Session.shell")(function* (_input) {}),
|
||||
skill: Effect.fn("V2Session.skill")(function* (_input) {}),
|
||||
switchAgent: Effect.fn("V2Session.switchAgent")(function* (input) {
|
||||
EventV2.run(
|
||||
SessionEvent.AgentSwitched.Sync,
|
||||
{
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
agent: input.agent,
|
||||
},
|
||||
{ bypassExperimentalEventSystem: true },
|
||||
)
|
||||
yield* sync.run(SessionEvent.AgentSwitched.Sync, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
agent: input.agent,
|
||||
})
|
||||
}),
|
||||
switchModel: Effect.fn("V2Session.switchModel")(function* (input) {
|
||||
EventV2.run(
|
||||
SessionEvent.ModelSwitched.Sync,
|
||||
{
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
model: input.model,
|
||||
},
|
||||
{ bypassExperimentalEventSystem: true },
|
||||
)
|
||||
yield* sync.run(SessionEvent.ModelSwitched.Sync, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
model: input.model,
|
||||
})
|
||||
}),
|
||||
subagent: Effect.fn("V2Session.subagent")(function* (input) {
|
||||
const parent = yield* result.get(input.parentID)
|
||||
@@ -319,6 +313,6 @@ export const layer = Layer.effect(
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer
|
||||
export const defaultLayer = layer.pipe(Layer.provide(SyncEvent.defaultLayer))
|
||||
|
||||
export * as SessionV2 from "./session"
|
||||
|
||||
@@ -27,6 +27,7 @@ import { ProviderTest } from "../fake/provider"
|
||||
import { testEffect } from "../lib/effect"
|
||||
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
|
||||
import { TestConfig } from "../fixture/config"
|
||||
import { SyncEvent } from "@/sync"
|
||||
|
||||
void Log.init({ print: false })
|
||||
|
||||
@@ -223,6 +224,7 @@ const deps = Layer.mergeAll(
|
||||
Plugin.defaultLayer,
|
||||
Bus.layer,
|
||||
Config.defaultLayer,
|
||||
SyncEvent.defaultLayer,
|
||||
)
|
||||
|
||||
const env = Layer.mergeAll(
|
||||
@@ -269,6 +271,7 @@ function compactionProcessLayer(options?: CompactionProcessOptions) {
|
||||
Layer.provide(status),
|
||||
Layer.provide(bus),
|
||||
Layer.provide(options?.config ?? Config.defaultLayer),
|
||||
Layer.provide(SyncEvent.defaultLayer),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
|
||||
import { provideTmpdirServer } from "../fixture/fixture"
|
||||
import { testEffect } from "../lib/effect"
|
||||
import { raw, reply, TestLLMServer } from "../lib/llm-server"
|
||||
import { SyncEvent } from "@/sync"
|
||||
|
||||
void Log.init({ print: false })
|
||||
|
||||
@@ -166,6 +167,7 @@ const deps = Layer.mergeAll(
|
||||
LLM.defaultLayer,
|
||||
Provider.defaultLayer,
|
||||
status,
|
||||
SyncEvent.defaultLayer,
|
||||
).pipe(Layer.provideMerge(infra))
|
||||
const env = Layer.mergeAll(
|
||||
TestLLMServer.layer,
|
||||
|
||||
@@ -50,6 +50,7 @@ import { Reference } from "../../src/reference/reference"
|
||||
import { provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture"
|
||||
import { testEffect } from "../lib/effect"
|
||||
import { reply, TestLLMServer } from "../lib/llm-server"
|
||||
import { SyncEvent } from "@/sync"
|
||||
|
||||
void Log.init({ print: false })
|
||||
|
||||
@@ -174,6 +175,7 @@ function makeHttp() {
|
||||
mcp,
|
||||
AppFileSystem.defaultLayer,
|
||||
status,
|
||||
SyncEvent.defaultLayer,
|
||||
).pipe(Layer.provideMerge(infra))
|
||||
const question = Question.layer.pipe(Layer.provideMerge(deps))
|
||||
const todo = Todo.layer.pipe(Layer.provideMerge(deps))
|
||||
|
||||
@@ -58,6 +58,7 @@ import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
|
||||
import { Ripgrep } from "../../src/file/ripgrep"
|
||||
import { Format } from "../../src/format"
|
||||
import { Reference } from "../../src/reference/reference"
|
||||
import { SyncEvent } from "@/sync"
|
||||
|
||||
void Log.init({ print: false })
|
||||
|
||||
@@ -124,6 +125,7 @@ function makeHttp() {
|
||||
mcp,
|
||||
AppFileSystem.defaultLayer,
|
||||
status,
|
||||
SyncEvent.defaultLayer,
|
||||
).pipe(Layer.provideMerge(infra))
|
||||
const question = Question.layer.pipe(Layer.provideMerge(deps))
|
||||
const todo = Todo.layer.pipe(Layer.provideMerge(deps))
|
||||
|
||||
Reference in New Issue
Block a user