diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 84ea766568..94fda16a36 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -24,6 +24,318 @@ export namespace SessionProcessor { export type Info = Awaited> export type Result = Awaited> + interface ProcessorContext { + assistantMessage: MessageV2.Assistant + sessionID: SessionID + model: Provider.Model + abort: AbortSignal + toolcalls: Record + shouldBreak: boolean + snapshot: string | undefined + blocked: boolean + needsCompaction: boolean + currentText: MessageV2.TextPart | undefined + reasoningMap: Record + } + + type StreamResult = Awaited> + type StreamEvent = StreamResult["fullStream"] extends AsyncIterable ? T : never + + async function handleEvent(value: StreamEvent, ctx: ProcessorContext) { + switch (value.type) { + case "start": + await SessionStatus.set(ctx.sessionID, { type: "busy" }) + break + + case "reasoning-start": + if (value.id in ctx.reasoningMap) break + const reasoningPart = { + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "reasoning" as const, + text: "", + time: { start: Date.now() }, + metadata: value.providerMetadata, + } + ctx.reasoningMap[value.id] = reasoningPart + await Session.updatePart(reasoningPart) + break + + case "reasoning-delta": + if (value.id in ctx.reasoningMap) { + const part = ctx.reasoningMap[value.id] + part.text += value.text + if (value.providerMetadata) part.metadata = value.providerMetadata + await Session.updatePartDelta({ + sessionID: part.sessionID, + messageID: part.messageID, + partID: part.id, + field: "text", + delta: value.text, + }) + } + break + + case "reasoning-end": + if (value.id in ctx.reasoningMap) { + const part = ctx.reasoningMap[value.id] + part.text = part.text.trimEnd() + part.time = { ...part.time, end: Date.now() } + if (value.providerMetadata) part.metadata = value.providerMetadata + await Session.updatePart(part) + delete ctx.reasoningMap[value.id] + } + break + + case "tool-input-start": { + const part = await Session.updatePart({ + id: ctx.toolcalls[value.id]?.id ?? PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "tool", + tool: value.toolName, + callID: value.id, + state: { status: "pending", input: {}, raw: "" }, + }) + ctx.toolcalls[value.id] = part as MessageV2.ToolPart + break + } + + case "tool-input-delta": + break + + case "tool-input-end": + break + + case "tool-call": { + const match = ctx.toolcalls[value.toolCallId] + if (match) { + const part = await Session.updatePart({ + ...match, + tool: value.toolName, + state: { status: "running", input: value.input, time: { start: Date.now() } }, + metadata: value.providerMetadata, + }) + ctx.toolcalls[value.toolCallId] = part as MessageV2.ToolPart + + const parts = await MessageV2.parts(ctx.assistantMessage.id) + const lastThree = parts.slice(-DOOM_LOOP_THRESHOLD) + + if ( + lastThree.length === DOOM_LOOP_THRESHOLD && + lastThree.every( + (p) => + p.type === "tool" && + p.tool === value.toolName && + p.state.status !== "pending" && + JSON.stringify(p.state.input) === JSON.stringify(value.input), + ) + ) { + const agent = await Agent.get(ctx.assistantMessage.agent) + await Permission.ask({ + permission: "doom_loop", + patterns: [value.toolName], + sessionID: ctx.assistantMessage.sessionID, + metadata: { tool: value.toolName, input: value.input }, + always: [value.toolName], + ruleset: agent.permission, + }) + } + } + break + } + + case "tool-result": { + const match = ctx.toolcalls[value.toolCallId] + if (match && match.state.status === "running") { + await Session.updatePart({ + ...match, + state: { + status: "completed", + input: value.input ?? match.state.input, + output: value.output.output, + metadata: value.output.metadata, + title: value.output.title, + time: { start: match.state.time.start, end: Date.now() }, + attachments: value.output.attachments, + }, + }) + delete ctx.toolcalls[value.toolCallId] + } + break + } + + case "tool-error": { + const match = ctx.toolcalls[value.toolCallId] + if (match && match.state.status === "running") { + await Session.updatePart({ + ...match, + state: { + status: "error", + input: value.input ?? match.state.input, + error: value.error instanceof Error ? value.error.message : String(value.error), + time: { start: match.state.time.start, end: Date.now() }, + }, + }) + if (value.error instanceof Permission.RejectedError || value.error instanceof Question.RejectedError) { + ctx.blocked = ctx.shouldBreak + } + delete ctx.toolcalls[value.toolCallId] + } + break + } + + case "error": + throw value.error + + case "start-step": + ctx.snapshot = await Snapshot.track() + await Session.updatePart({ + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.sessionID, + snapshot: ctx.snapshot, + type: "step-start", + }) + break + + case "finish-step": { + const usage = Session.getUsage({ + model: ctx.model, + usage: value.usage, + metadata: value.providerMetadata, + }) + ctx.assistantMessage.finish = value.finishReason + ctx.assistantMessage.cost += usage.cost + ctx.assistantMessage.tokens = usage.tokens + await Session.updatePart({ + id: PartID.ascending(), + reason: value.finishReason, + snapshot: await Snapshot.track(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "step-finish", + tokens: usage.tokens, + cost: usage.cost, + }) + await Session.updateMessage(ctx.assistantMessage) + if (ctx.snapshot) { + const patch = await Snapshot.patch(ctx.snapshot) + if (patch.files.length) { + await Session.updatePart({ + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.sessionID, + type: "patch", + hash: patch.hash, + files: patch.files, + }) + } + ctx.snapshot = undefined + } + SessionSummary.summarize({ + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.parentID, + }) + if ( + !ctx.assistantMessage.summary && + (await SessionCompaction.isOverflow({ tokens: usage.tokens, model: ctx.model })) + ) { + ctx.needsCompaction = true + } + break + } + + case "text-start": + ctx.currentText = { + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "text", + text: "", + time: { start: Date.now() }, + metadata: value.providerMetadata, + } + await Session.updatePart(ctx.currentText) + break + + case "text-delta": + if (ctx.currentText) { + ctx.currentText.text += value.text + if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata + await Session.updatePartDelta({ + sessionID: ctx.currentText.sessionID, + messageID: ctx.currentText.messageID, + partID: ctx.currentText.id, + field: "text", + delta: value.text, + }) + } + break + + case "text-end": + if (ctx.currentText) { + ctx.currentText.text = ctx.currentText.text.trimEnd() + const textOutput = await Plugin.trigger( + "experimental.text.complete", + { + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.id, + partID: ctx.currentText.id, + }, + { text: ctx.currentText.text }, + ) + ctx.currentText.text = textOutput.text + ctx.currentText.time = { start: Date.now(), end: Date.now() } + if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata + await Session.updatePart(ctx.currentText) + } + ctx.currentText = undefined + break + + case "finish": + break + + default: + log.info("unhandled", { ...value }) + break + } + } + + async function cleanup(ctx: ProcessorContext) { + if (ctx.snapshot) { + const patch = await Snapshot.patch(ctx.snapshot) + if (patch.files.length) { + await Session.updatePart({ + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.sessionID, + type: "patch", + hash: patch.hash, + files: patch.files, + }) + } + ctx.snapshot = undefined + } + const parts = await MessageV2.parts(ctx.assistantMessage.id) + for (const part of parts) { + if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") { + await Session.updatePart({ + ...part, + state: { + ...part.state, + status: "error", + error: "Tool execution aborted", + time: { start: Date.now(), end: Date.now() }, + }, + }) + } + } + ctx.assistantMessage.time.completed = Date.now() + await Session.updateMessage(ctx.assistantMessage) + } + export function create(input: { assistantMessage: MessageV2.Assistant sessionID: SessionID @@ -47,322 +359,41 @@ export namespace SessionProcessor { log.info("process") needsCompaction = false const shouldBreak = (await Config.get()).experimental?.continue_loop_on_deny !== true + + const ctx: ProcessorContext = { + assistantMessage: input.assistantMessage, + sessionID: input.sessionID, + model: input.model, + abort: input.abort, + toolcalls, + shouldBreak, + get snapshot() { return snapshot }, + set snapshot(v) { snapshot = v }, + get blocked() { return blocked }, + set blocked(v) { blocked = v }, + get needsCompaction() { return needsCompaction }, + set needsCompaction(v) { needsCompaction = v }, + currentText: undefined, + reasoningMap: {}, + } + while (true) { try { - let currentText: MessageV2.TextPart | undefined - let reasoningMap: Record = {} + ctx.currentText = undefined + ctx.reasoningMap = {} const stream = await LLM.stream(streamInput) for await (const value of stream.fullStream) { input.abort.throwIfAborted() - switch (value.type) { - case "start": - await SessionStatus.set(input.sessionID, { type: "busy" }) - break - - case "reasoning-start": - if (value.id in reasoningMap) { - continue - } - const reasoningPart = { - id: PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.assistantMessage.sessionID, - type: "reasoning" as const, - text: "", - time: { - start: Date.now(), - }, - metadata: value.providerMetadata, - } - reasoningMap[value.id] = reasoningPart - await Session.updatePart(reasoningPart) - break - - case "reasoning-delta": - if (value.id in reasoningMap) { - const part = reasoningMap[value.id] - part.text += value.text - if (value.providerMetadata) part.metadata = value.providerMetadata - await Session.updatePartDelta({ - sessionID: part.sessionID, - messageID: part.messageID, - partID: part.id, - field: "text", - delta: value.text, - }) - } - break - - case "reasoning-end": - if (value.id in reasoningMap) { - const part = reasoningMap[value.id] - part.text = part.text.trimEnd() - - part.time = { - ...part.time, - end: Date.now(), - } - if (value.providerMetadata) part.metadata = value.providerMetadata - await Session.updatePart(part) - delete reasoningMap[value.id] - } - break - - case "tool-input-start": - const part = await Session.updatePart({ - id: toolcalls[value.id]?.id ?? PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.assistantMessage.sessionID, - type: "tool", - tool: value.toolName, - callID: value.id, - state: { - status: "pending", - input: {}, - raw: "", - }, - }) - toolcalls[value.id] = part as MessageV2.ToolPart - break - - case "tool-input-delta": - break - - case "tool-input-end": - break - - case "tool-call": { - const match = toolcalls[value.toolCallId] - if (match) { - const part = await Session.updatePart({ - ...match, - tool: value.toolName, - state: { - status: "running", - input: value.input, - time: { - start: Date.now(), - }, - }, - metadata: value.providerMetadata, - }) - toolcalls[value.toolCallId] = part as MessageV2.ToolPart - - const parts = await MessageV2.parts(input.assistantMessage.id) - const lastThree = parts.slice(-DOOM_LOOP_THRESHOLD) - - if ( - lastThree.length === DOOM_LOOP_THRESHOLD && - lastThree.every( - (p) => - p.type === "tool" && - p.tool === value.toolName && - p.state.status !== "pending" && - JSON.stringify(p.state.input) === JSON.stringify(value.input), - ) - ) { - const agent = await Agent.get(input.assistantMessage.agent) - await Permission.ask({ - permission: "doom_loop", - patterns: [value.toolName], - sessionID: input.assistantMessage.sessionID, - metadata: { - tool: value.toolName, - input: value.input, - }, - always: [value.toolName], - ruleset: agent.permission, - }) - } - } - break - } - case "tool-result": { - const match = toolcalls[value.toolCallId] - if (match && match.state.status === "running") { - await Session.updatePart({ - ...match, - state: { - status: "completed", - input: value.input ?? match.state.input, - output: value.output.output, - metadata: value.output.metadata, - title: value.output.title, - time: { - start: match.state.time.start, - end: Date.now(), - }, - attachments: value.output.attachments, - }, - }) - - delete toolcalls[value.toolCallId] - } - break - } - - case "tool-error": { - const match = toolcalls[value.toolCallId] - if (match && match.state.status === "running") { - await Session.updatePart({ - ...match, - state: { - status: "error", - input: value.input ?? match.state.input, - error: value.error instanceof Error ? value.error.message : String(value.error), - time: { - start: match.state.time.start, - end: Date.now(), - }, - }, - }) - - if ( - value.error instanceof Permission.RejectedError || - value.error instanceof Question.RejectedError - ) { - blocked = shouldBreak - } - delete toolcalls[value.toolCallId] - } - break - } - case "error": - throw value.error - - case "start-step": - snapshot = await Snapshot.track() - await Session.updatePart({ - id: PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.sessionID, - snapshot, - type: "step-start", - }) - break - - case "finish-step": - const usage = Session.getUsage({ - model: input.model, - usage: value.usage, - metadata: value.providerMetadata, - }) - input.assistantMessage.finish = value.finishReason - input.assistantMessage.cost += usage.cost - input.assistantMessage.tokens = usage.tokens - await Session.updatePart({ - id: PartID.ascending(), - reason: value.finishReason, - snapshot: await Snapshot.track(), - messageID: input.assistantMessage.id, - sessionID: input.assistantMessage.sessionID, - type: "step-finish", - tokens: usage.tokens, - cost: usage.cost, - }) - await Session.updateMessage(input.assistantMessage) - if (snapshot) { - const patch = await Snapshot.patch(snapshot) - if (patch.files.length) { - await Session.updatePart({ - id: PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.sessionID, - type: "patch", - hash: patch.hash, - files: patch.files, - }) - } - snapshot = undefined - } - SessionSummary.summarize({ - sessionID: input.sessionID, - messageID: input.assistantMessage.parentID, - }) - if ( - !input.assistantMessage.summary && - (await SessionCompaction.isOverflow({ tokens: usage.tokens, model: input.model })) - ) { - needsCompaction = true - } - break - - case "text-start": - currentText = { - id: PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.assistantMessage.sessionID, - type: "text", - text: "", - time: { - start: Date.now(), - }, - metadata: value.providerMetadata, - } - await Session.updatePart(currentText) - break - - case "text-delta": - if (currentText) { - currentText.text += value.text - if (value.providerMetadata) currentText.metadata = value.providerMetadata - await Session.updatePartDelta({ - sessionID: currentText.sessionID, - messageID: currentText.messageID, - partID: currentText.id, - field: "text", - delta: value.text, - }) - } - break - - case "text-end": - if (currentText) { - currentText.text = currentText.text.trimEnd() - const textOutput = await Plugin.trigger( - "experimental.text.complete", - { - sessionID: input.sessionID, - messageID: input.assistantMessage.id, - partID: currentText.id, - }, - { text: currentText.text }, - ) - currentText.text = textOutput.text - currentText.time = { - start: Date.now(), - end: Date.now(), - } - if (value.providerMetadata) currentText.metadata = value.providerMetadata - await Session.updatePart(currentText) - } - currentText = undefined - break - - case "finish": - break - - default: - log.info("unhandled", { - ...value, - }) - continue - } + await handleEvent(value, ctx) if (needsCompaction) break } } catch (e: any) { - log.error("process", { - error: e, - stack: JSON.stringify(e.stack), - }) + log.error("process", { error: e, stack: JSON.stringify(e.stack) }) const error = MessageV2.fromError(e, { providerID: input.model.providerID, aborted: input.abort.aborted }) if (MessageV2.ContextOverflowError.isInstance(error)) { needsCompaction = true - Bus.publish(Session.Event.Error, { - sessionID: input.sessionID, - error, - }) + Bus.publish(Session.Event.Error, { sessionID: input.sessionID, error }) } else { const retry = SessionRetry.retryable(error) if (retry !== undefined) { @@ -385,39 +416,7 @@ export namespace SessionProcessor { await SessionStatus.set(input.sessionID, { type: "idle" }) } } - if (snapshot) { - const patch = await Snapshot.patch(snapshot) - if (patch.files.length) { - await Session.updatePart({ - id: PartID.ascending(), - messageID: input.assistantMessage.id, - sessionID: input.sessionID, - type: "patch", - hash: patch.hash, - files: patch.files, - }) - } - snapshot = undefined - } - const p = await MessageV2.parts(input.assistantMessage.id) - for (const part of p) { - if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") { - await Session.updatePart({ - ...part, - state: { - ...part.state, - status: "error", - error: "Tool execution aborted", - time: { - start: Date.now(), - end: Date.now(), - }, - }, - }) - } - } - input.assistantMessage.time.completed = Date.now() - await Session.updateMessage(input.assistantMessage) + await cleanup(ctx) if (needsCompaction) return "compact" if (blocked) return "stop" if (input.assistantMessage.error) return "stop"