diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index 223e71639c..7e6b81317b 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -235,7 +235,6 @@ When constructing the summary, try to stick to this template: assistantMessage: msg, sessionID: input.sessionID, model, - abort: input.abort, }) const cancel = Effect.fn("SessionCompaction.cancel")(function* () { if (!input.abort.aborted || msg.time.completed) return @@ -248,7 +247,6 @@ When constructing the summary, try to stick to this template: .process({ user: userMessage, agent, - abort: input.abort, sessionID: input.sessionID, tools: {}, system: [], diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts index 02b72f70a4..4d7d80b241 100644 --- a/packages/opencode/src/session/llm.ts +++ b/packages/opencode/src/session/llm.ts @@ -1,6 +1,6 @@ import { Provider } from "@/provider/provider" import { Log } from "@/util/log" -import { Effect, Layer, ServiceMap } from "effect" +import { Effect, Layer, Record, ServiceMap } from "effect" import * as Stream from "effect/Stream" import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai" import { mergeDeep, pipe } from "remeda" @@ -28,7 +28,6 @@ export namespace LLM { agent: Agent.Info permission?: Permission.Ruleset system: string[] - abort: AbortSignal messages: ModelMessage[] small?: boolean tools: Record @@ -36,6 +35,10 @@ export namespace LLM { toolChoice?: "auto" | "required" | "none" } + export type StreamRequest = StreamInput & { + abort: AbortSignal + } + export type Event = Awaited>["fullStream"] extends AsyncIterable ? T : never export interface Interface { @@ -50,7 +53,7 @@ export namespace LLM { return Service.of({ stream(input) { return Stream.unwrap( - Effect.promise(() => LLM.stream(input)).pipe( + Effect.promise((signal) => LLM.stream({ ...input, abort: signal })).pipe( Effect.map((result) => Stream.fromAsyncIterable(result.fullStream, (err) => err).pipe( Stream.mapEffect((event) => Effect.succeed(event)), @@ -65,7 +68,7 @@ export namespace LLM { export const defaultLayer = layer - export async function stream(input: StreamInput) { + export async function stream(input: StreamRequest) { const l = log .clone() .tag("providerID", input.model.providerID) @@ -314,17 +317,12 @@ export namespace LLM { }) } - async function resolveTools(input: Pick) { + function resolveTools(input: Pick) { const disabled = Permission.disabled( Object.keys(input.tools), Permission.merge(input.agent.permission, input.permission ?? []), ) - for (const tool of Object.keys(input.tools)) { - if (input.user.tools?.[tool] === false || disabled.has(tool)) { - delete input.tools[tool] - } - } - return input.tools + return Record.filter(input.tools, (_, k) => input.user.tools?.[k] !== false && !disabled.has(k)) } // Check if messages contain any tool-call content diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index d2459cd8ba..7ee23bf306 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -2,7 +2,6 @@ import { Cause, Effect, Exit, Layer, ServiceMap } from "effect" import * as Stream from "effect/Stream" import { Agent } from "@/agent/agent" import { Bus } from "@/bus" -import { makeRuntime } from "@/effect/run-service" import { Config } from "@/config/config" import { Permission } from "@/permission" import { Plugin } from "@/plugin" @@ -35,17 +34,10 @@ export namespace SessionProcessor { readonly process: (streamInput: LLM.StreamInput) => Effect.Effect } - export interface Info { - readonly message: MessageV2.Assistant - readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined - readonly process: (streamInput: LLM.StreamInput) => Promise - } - type Input = { assistantMessage: MessageV2.Assistant sessionID: SessionID model: Provider.Model - abort: AbortSignal } export interface Interface { @@ -96,7 +88,6 @@ export namespace SessionProcessor { assistantMessage: input.assistantMessage, sessionID: input.sessionID, model: input.model, - abort: input.abort, toolcalls: {}, shouldBreak: false, snapshot: undefined, @@ -105,11 +96,12 @@ export namespace SessionProcessor { currentText: undefined, reasoningMap: {}, } + let aborted = false const parse = (e: unknown) => MessageV2.fromError(e, { providerID: input.model.providerID, - aborted: input.abort.aborted, + aborted, }) const handleEvent = Effect.fn("SessionProcessor.handleEvent")(function* (value: StreamEvent) { @@ -440,16 +432,12 @@ export namespace SessionProcessor { const stream = llm.stream(streamInput) yield* stream.pipe( - Stream.tap((event) => - Effect.gen(function* () { - input.abort.throwIfAborted() - yield* handleEvent(event) - }), - ), + Stream.tap((event) => handleEvent(event)), Stream.takeUntil(() => ctx.needsCompaction), Stream.runDrain, ) }).pipe( + Effect.onInterrupt(() => Effect.sync(() => void (aborted = true))), Effect.catchCauseIf( (cause) => !Cause.hasInterruptsOnly(cause), (cause) => Effect.fail(Cause.squash(cause)), @@ -468,17 +456,20 @@ export namespace SessionProcessor { ), Effect.catchCause((cause) => Cause.hasInterruptsOnly(cause) - ? halt(new DOMException("Aborted", "AbortError")) + ? Effect.gen(function* () { + aborted = true + yield* halt(new DOMException("Aborted", "AbortError")) + }) : halt(Cause.squash(cause)), ), Effect.ensuring(cleanup()), ) - if (input.abort.aborted && !ctx.assistantMessage.error) { + if (aborted && !ctx.assistantMessage.error) { yield* abort() } if (ctx.needsCompaction) return "compact" - if (ctx.blocked || ctx.assistantMessage.error || input.abort.aborted) return "stop" + if (ctx.blocked || ctx.assistantMessage.error || aborted) return "stop" return "continue" }) @@ -526,29 +517,4 @@ export namespace SessionProcessor { ), ), ) - - const { runPromise } = makeRuntime(Service, defaultLayer) - - export async function create(input: Input): Promise { - const hit = await runPromise((svc) => svc.create(input)) - return { - get message() { - return hit.message - }, - partFromToolCall(toolCallID: string) { - return hit.partFromToolCall(toolCallID) - }, - async process(streamInput: LLM.StreamInput) { - const exit = await Effect.runPromiseExit(hit.process(streamInput), { signal: input.abort }) - if (Exit.isFailure(exit)) { - if (Cause.hasInterrupts(exit.cause) && input.abort.aborted) { - await Effect.runPromise(hit.abort()) - return "stop" - } - throw Cause.squash(exit.cause) - } - return exit.value - }, - } - } } diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 8d418c8875..a54f8f7471 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -201,7 +201,8 @@ export namespace SessionPrompt { const text = yield* Effect.promise(async (signal) => { const mdl = ag.model ? await Provider.getModel(ag.model.providerID, ag.model.modelID) - : (await Provider.getSmallModel(input.providerID)) ?? (await Provider.getModel(input.providerID, input.modelID)) + : ((await Provider.getSmallModel(input.providerID)) ?? + (await Provider.getModel(input.providerID, input.modelID))) const msgs = onlySubtasks ? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }] : await MessageV2.toModelMessages(context, mdl) @@ -236,7 +237,9 @@ export namespace SessionPrompt { const hint = e.data.suggestions?.length ? ` Did you mean: ${e.data.suggestions.join(", ")}?` : "" Bus.publish(Session.Event.Error, { sessionID, - error: new NamedError.Unknown({ message: `Model not found: ${e.data.providerID}/${e.data.modelID}.${hint}` }).toObject(), + error: new NamedError.Unknown({ + message: `Model not found: ${e.data.providerID}/${e.data.modelID}.${hint}`, + }).toObject(), }) } throw e @@ -288,7 +291,13 @@ export namespace SessionPrompt { const { clientName, uri } = part.source log.info("mcp resource", { clientName, uri, mime: part.mime }) const pieces: Draft[] = [ - { messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: `Reading MCP resource: ${part.filename} (${uri})` }, + { + messageID: info.id, + sessionID: input.sessionID, + type: "text", + synthetic: true, + text: `Reading MCP resource: ${part.filename} (${uri})`, + }, ] try { const content = await MCP.readResource(clientName, uri) @@ -296,17 +305,35 @@ export namespace SessionPrompt { const items = Array.isArray(content.contents) ? content.contents : [content.contents] for (const c of items) { if ("text" in c && c.text) { - pieces.push({ messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: c.text }) + pieces.push({ + messageID: info.id, + sessionID: input.sessionID, + type: "text", + synthetic: true, + text: c.text, + }) } else if ("blob" in c && c.blob) { const mime = "mimeType" in c ? c.mimeType : part.mime - pieces.push({ messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: `[Binary content: ${mime}]` }) + pieces.push({ + messageID: info.id, + sessionID: input.sessionID, + type: "text", + synthetic: true, + text: `[Binary content: ${mime}]`, + }) } } pieces.push({ ...part, messageID: info.id, sessionID: input.sessionID }) } catch (error: unknown) { log.error("failed to read MCP resource", { error, clientName, uri }) const message = error instanceof Error ? error.message : String(error) - pieces.push({ messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: `Failed to read MCP resource ${part.filename}: ${message}` }) + pieces.push({ + messageID: info.id, + sessionID: input.sessionID, + type: "text", + synthetic: true, + text: `Failed to read MCP resource ${part.filename}: ${message}`, + }) } return pieces } @@ -315,8 +342,20 @@ export namespace SessionPrompt { case "data:": if (part.mime === "text/plain") { return [ - { messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: `Called the Read tool with the following input: ${JSON.stringify({ filePath: part.filename })}` }, - { messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: decodeDataUrl(part.url) }, + { + messageID: info.id, + sessionID: input.sessionID, + type: "text", + synthetic: true, + text: `Called the Read tool with the following input: ${JSON.stringify({ filePath: part.filename })}`, + }, + { + messageID: info.id, + sessionID: input.sessionID, + type: "text", + synthetic: true, + text: decodeDataUrl(part.url), + }, { ...part, messageID: info.id, sessionID: input.sessionID }, ] } @@ -353,7 +392,13 @@ export namespace SessionPrompt { } const args = { filePath: filepath, offset, limit } const pieces: Draft[] = [ - { messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: `Called the Read tool with the following input: ${JSON.stringify(args)}` }, + { + messageID: info.id, + sessionID: input.sessionID, + type: "text", + synthetic: true, + text: `Called the Read tool with the following input: ${JSON.stringify(args)}`, + }, ] await ReadTool.init() .then(async (t) => { @@ -369,9 +414,23 @@ export namespace SessionPrompt { ask: async () => {}, } const result = await t.execute(args, ctx) - pieces.push({ messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: result.output }) + pieces.push({ + messageID: info.id, + sessionID: input.sessionID, + type: "text", + synthetic: true, + text: result.output, + }) if (result.attachments?.length) { - pieces.push(...result.attachments.map((a) => ({ ...a, synthetic: true, filename: a.filename ?? part.filename, messageID: info.id, sessionID: input.sessionID }))) + pieces.push( + ...result.attachments.map((a) => ({ + ...a, + synthetic: true, + filename: a.filename ?? part.filename, + messageID: info.id, + sessionID: input.sessionID, + })), + ) } else { pieces.push({ ...part, messageID: info.id, sessionID: input.sessionID }) } @@ -379,8 +438,17 @@ export namespace SessionPrompt { .catch((error) => { log.error("failed to read file", { error }) const message = error instanceof Error ? error.message : error.toString() - Bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: new NamedError.Unknown({ message }).toObject() }) - pieces.push({ messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: `Read tool failed to read ${filepath} with the following error: ${message}` }) + Bus.publish(Session.Event.Error, { + sessionID: input.sessionID, + error: new NamedError.Unknown({ message }).toObject(), + }) + pieces.push({ + messageID: info.id, + sessionID: input.sessionID, + type: "text", + synthetic: true, + text: `Read tool failed to read ${filepath} with the following error: ${message}`, + }) }) return pieces } @@ -399,15 +467,33 @@ export namespace SessionPrompt { } const result = await ReadTool.init().then((t) => t.execute(args, ctx)) return [ - { messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: `Called the Read tool with the following input: ${JSON.stringify(args)}` }, - { messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: result.output }, + { + messageID: info.id, + sessionID: input.sessionID, + type: "text", + synthetic: true, + text: `Called the Read tool with the following input: ${JSON.stringify(args)}`, + }, + { + messageID: info.id, + sessionID: input.sessionID, + type: "text", + synthetic: true, + text: result.output, + }, { ...part, messageID: info.id, sessionID: input.sessionID }, ] } await FileTime.read(input.sessionID, filepath) return [ - { messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: `Called the Read tool with the following input: {"filePath":"${filepath}"}` }, + { + messageID: info.id, + sessionID: input.sessionID, + type: "text", + synthetic: true, + text: `Called the Read tool with the following input: {"filePath":"${filepath}"}`, + }, { id: part.id, messageID: info.id, @@ -433,7 +519,10 @@ export namespace SessionPrompt { sessionID: input.sessionID, type: "text", synthetic: true, - text: " Use the above message and context to generate a prompt and call the task tool with subagent: " + part.name + hint, + text: + " Use the above message and context to generate a prompt and call the task tool with subagent: " + + part.name + + hint, }, ] } @@ -443,13 +532,17 @@ export namespace SessionPrompt { ).then((x) => x.flat().map(assign)), ) - yield* plugin.trigger("chat.message", { - sessionID: input.sessionID, - agent: input.agent, - model: input.model, - messageID: input.messageID, - variant: input.variant, - }, { message: info, parts }) + yield* plugin.trigger( + "chat.message", + { + sessionID: input.sessionID, + agent: input.agent, + model: input.model, + messageID: input.messageID, + variant: input.variant, + }, + { message: info, parts }, + ) const parsed = MessageV2.Info.safeParse(info) if (!parsed.success) { @@ -614,12 +707,10 @@ export namespace SessionPrompt { time: { created: Date.now() }, sessionID, }) - const ctrl = new AbortController() const handle = yield* processor.create({ assistantMessage: msg as MessageV2.Assistant, sessionID, model, - abort: ctrl.signal, }) const outcome: "break" | "continue" = yield* Effect.onExit( @@ -685,7 +776,6 @@ export namespace SessionPrompt { user: lastUser!, agent, permission: session.permission, - abort: ctrl.signal, sessionID, system, messages: [...modelMsgs, ...(isLastStep ? [{ role: "assistant" as const, content: MAX_STEPS }] : [])], @@ -727,7 +817,6 @@ export namespace SessionPrompt { }), (exit) => Effect.gen(function* () { - ctrl.abort() if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) yield* handle.abort() InstructionPrompt.clear(handle.message.id) }), diff --git a/packages/opencode/test/session/compaction.test.ts b/packages/opencode/test/session/compaction.test.ts index 9c8559c35a..4795823513 100644 --- a/packages/opencode/test/session/compaction.test.ts +++ b/packages/opencode/test/session/compaction.test.ts @@ -129,7 +129,7 @@ async function tool(sessionID: SessionID, messageID: MessageID, tool: string, ou } function fake( - input: Parameters<(typeof SessionProcessorModule.SessionProcessor)["create"]>[0], + input: Parameters[0], result: "continue" | "compact", ) { const msg = input.assistantMessage diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts index cd9d97e15f..9efbaa159d 100644 --- a/packages/opencode/test/session/processor-effect.test.ts +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -1,7 +1,7 @@ import { NodeFileSystem } from "@effect/platform-node" import { expect } from "bun:test" import { APICallError } from "ai" -import { Effect, Layer, ServiceMap } from "effect" +import { Cause, Effect, Exit, Fiber, Layer, ServiceMap } from "effect" import * as Stream from "effect/Stream" import path from "path" import type { Agent } from "../../src/agent/agent" @@ -120,21 +120,8 @@ function fail(err: E, ...items: LLM.Event[]) { return stream(...items).pipe(Stream.concat(Stream.fail(err))) } -function wait(abort: AbortSignal) { - return Effect.promise( - () => - new Promise((done) => { - abort.addEventListener("abort", () => done(), { once: true }) - }), - ) -} - -function hang(input: LLM.StreamInput, ...items: LLM.Event[]) { - return stream(...items).pipe( - Stream.concat( - Stream.unwrap(wait(input.abort).pipe(Effect.as(Stream.fail(new DOMException("Aborted", "AbortError"))))), - ), - ) +function hang(_input: LLM.StreamInput, ...items: LLM.Event[]) { + return stream(...items).pipe(Stream.concat(Stream.fromEffect(Effect.never))) } function model(context: number): Provider.Model { @@ -291,13 +278,11 @@ it.effect("session.processor effect tests capture llm input cleanly", () => { const chat = yield* session.create({}) const parent = yield* user(chat.id, "hi") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const abort = new AbortController() const mdl = model(100) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, model: mdl, - abort: abort.signal, }) const input = { @@ -313,7 +298,6 @@ it.effect("session.processor effect tests capture llm input cleanly", () => { model: mdl, agent: agent(), system: [], - abort: abort.signal, messages: [{ role: "user", content: "hi" }], tools: {}, } satisfies LLM.StreamInput @@ -359,13 +343,11 @@ it.effect("session.processor effect tests stop after token overflow requests com const chat = yield* session.create({}) const parent = yield* user(chat.id, "compact") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const abort = new AbortController() const mdl = model(20) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, model: mdl, - abort: abort.signal, }) const value = yield* handle.process({ @@ -381,7 +363,6 @@ it.effect("session.processor effect tests stop after token overflow requests com model: mdl, agent: agent(), system: [], - abort: abort.signal, messages: [{ role: "user", content: "compact" }], tools: {}, }) @@ -433,13 +414,11 @@ it.effect("session.processor effect tests reset reasoning state across retries", const chat = yield* session.create({}) const parent = yield* user(chat.id, "reason") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const abort = new AbortController() const mdl = model(100) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, model: mdl, - abort: abort.signal, }) const value = yield* handle.process({ @@ -455,7 +434,6 @@ it.effect("session.processor effect tests reset reasoning state across retries", model: mdl, agent: agent(), system: [], - abort: abort.signal, messages: [{ role: "user", content: "reason" }], tools: {}, }) @@ -485,13 +463,11 @@ it.effect("session.processor effect tests do not retry unknown json errors", () const chat = yield* session.create({}) const parent = yield* user(chat.id, "json") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const abort = new AbortController() const mdl = model(100) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, model: mdl, - abort: abort.signal, }) const value = yield* handle.process({ @@ -507,7 +483,6 @@ it.effect("session.processor effect tests do not retry unknown json errors", () model: mdl, agent: agent(), system: [], - abort: abort.signal, messages: [{ role: "user", content: "json" }], tools: {}, }) @@ -535,13 +510,11 @@ it.effect("session.processor effect tests retry recognized structured json error const chat = yield* session.create({}) const parent = yield* user(chat.id, "retry json") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const abort = new AbortController() const mdl = model(100) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, model: mdl, - abort: abort.signal, }) const value = yield* handle.process({ @@ -557,7 +530,6 @@ it.effect("session.processor effect tests retry recognized structured json error model: mdl, agent: agent(), system: [], - abort: abort.signal, messages: [{ role: "user", content: "retry json" }], tools: {}, }) @@ -601,7 +573,6 @@ it.effect("session.processor effect tests publish retry status updates", () => { const chat = yield* session.create({}) const parent = yield* user(chat.id, "retry") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const abort = new AbortController() const mdl = model(100) const states: number[] = [] const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => { @@ -612,7 +583,6 @@ it.effect("session.processor effect tests publish retry status updates", () => { assistantMessage: msg, sessionID: chat.id, model: mdl, - abort: abort.signal, }) const value = yield* handle.process({ @@ -628,7 +598,6 @@ it.effect("session.processor effect tests publish retry status updates", () => { model: mdl, agent: agent(), system: [], - abort: abort.signal, messages: [{ role: "user", content: "retry" }], tools: {}, }) @@ -656,13 +625,11 @@ it.effect("session.processor effect tests compact on structured context overflow const chat = yield* session.create({}) const parent = yield* user(chat.id, "compact json") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const abort = new AbortController() const mdl = model(100) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, model: mdl, - abort: abort.signal, }) const value = yield* handle.process({ @@ -678,7 +645,6 @@ it.effect("session.processor effect tests compact on structured context overflow model: mdl, agent: agent(), system: [], - abort: abort.signal, messages: [{ role: "user", content: "compact json" }], tools: {}, }) @@ -710,17 +676,15 @@ it.effect("session.processor effect tests mark pending tools as aborted on clean const chat = yield* session.create({}) const parent = yield* user(chat.id, "tool abort") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const abort = new AbortController() const mdl = model(100) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, model: mdl, - abort: abort.signal, }) - const run = Effect.runPromise( - handle.process({ + const run = yield* handle + .process({ user: { id: parent.id, sessionID: chat.id, @@ -733,20 +697,25 @@ it.effect("session.processor effect tests mark pending tools as aborted on clean model: mdl, agent: agent(), system: [], - abort: abort.signal, messages: [{ role: "user", content: "tool abort" }], tools: {}, - }), - ) + }) + .pipe(Effect.forkChild) yield* Effect.promise(() => ready.promise) - abort.abort() + yield* Fiber.interrupt(run) - const value = yield* Effect.promise(() => run) + const exit = yield* Fiber.await(run) + if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) { + yield* handle.abort() + } const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) const tool = parts.find((part): part is MessageV2.ToolPart => part.type === "tool") - expect(value).toBe("stop") + expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) { + expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true) + } expect(yield* test.calls).toBe(1) expect(tool?.state.status).toBe("error") if (tool?.state.status === "error") { @@ -779,7 +748,6 @@ it.effect("session.processor effect tests record aborted errors and idle state", const chat = yield* session.create({}) const parent = yield* user(chat.id, "abort") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const abort = new AbortController() const mdl = model(100) const errs: string[] = [] const off = yield* bus.subscribeCallback(Session.Event.Error, (evt) => { @@ -792,11 +760,10 @@ it.effect("session.processor effect tests record aborted errors and idle state", assistantMessage: msg, sessionID: chat.id, model: mdl, - abort: abort.signal, }) - const run = Effect.runPromise( - handle.process({ + const run = yield* handle + .process({ user: { id: parent.id, sessionID: chat.id, @@ -809,22 +776,27 @@ it.effect("session.processor effect tests record aborted errors and idle state", model: mdl, agent: agent(), system: [], - abort: abort.signal, messages: [{ role: "user", content: "abort" }], tools: {}, - }), - ) + }) + .pipe(Effect.forkChild) yield* Effect.promise(() => ready.promise) - abort.abort() + yield* Fiber.interrupt(run) - const value = yield* Effect.promise(() => run) + const exit = yield* Fiber.await(run) + if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) { + yield* handle.abort() + } yield* Effect.promise(() => seen.promise) const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id })) const state = yield* status.get(chat.id) off() - expect(value).toBe("stop") + expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) { + expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true) + } expect(handle.message.error?.name).toBe("MessageAbortedError") expect(stored.info.role).toBe("assistant") if (stored.info.role === "assistant") { diff --git a/packages/opencode/test/session/prompt-effect.test.ts b/packages/opencode/test/session/prompt-effect.test.ts index 928089eac7..56828bf865 100644 --- a/packages/opencode/test/session/prompt-effect.test.ts +++ b/packages/opencode/test/session/prompt-effect.test.ts @@ -8,6 +8,9 @@ import { Agent as AgentSvc } from "../../src/agent/agent" import { Bus } from "../../src/bus" import { Command } from "../../src/command" import { Config } from "../../src/config/config" +import { FileTime } from "../../src/file/time" +import { LSP } from "../../src/lsp" +import { MCP } from "../../src/mcp" import { Permission } from "../../src/permission" import { Plugin } from "../../src/plugin" import type { Provider } from "../../src/provider/provider" @@ -98,21 +101,39 @@ function finish(): LLM.Event { return { type: "finish", finishReason: "stop", rawFinishReason: "stop", totalUsage: usage() } } -function wait(abort: AbortSignal) { - return Effect.promise( - () => - new Promise((done) => { - abort.addEventListener("abort", () => done(), { once: true }) - }), - ) +function finishToolCallsStep(): LLM.Event { + return { + type: "finish-step", + finishReason: "tool-calls", + rawFinishReason: "tool_calls", + response: { id: "res", modelId: "test-model", timestamp: new Date() }, + providerMetadata: undefined, + usage: usage(), + } } -function hang(input: LLM.StreamInput, ...items: LLM.Event[]) { - return stream(...items).pipe( - Stream.concat( - Stream.unwrap(wait(input.abort).pipe(Effect.as(Stream.fail(new DOMException("Aborted", "AbortError"))))), - ), - ) +function finishToolCalls(): LLM.Event { + return { type: "finish", finishReason: "tool-calls", rawFinishReason: "tool_calls", totalUsage: usage() } +} + +function replyStop(text: string, id = "t") { + return [start(), textStart(id), textDelta(id, text), textEnd(id), finishStep(), finish()] as const +} + +function replyToolCalls(text: string, id = "t") { + return [start(), textStart(id), textDelta(id, text), textEnd(id), finishToolCallsStep(), finishToolCalls()] as const +} + +function toolInputStart(id: string, toolName: string): LLM.Event { + return { type: "tool-input-start", id, toolName } +} + +function toolCall(toolCallId: string, toolName: string, input: unknown): LLM.Event { + return { type: "tool-call", toolCallId, toolName, input } +} + +function hang(_input: LLM.StreamInput, ...items: LLM.Event[]) { + return stream(...items).pipe(Stream.concat(Stream.fromEffect(Effect.never))) } function defer() { @@ -123,6 +144,36 @@ function defer() { return { promise, resolve } } +function waitMs(ms: number) { + return Effect.promise(() => new Promise((done) => setTimeout(done, ms))) +} + +function toolPart(parts: MessageV2.Part[]) { + return parts.find((part): part is MessageV2.ToolPart => part.type === "tool") +} + +type CompletedToolPart = MessageV2.ToolPart & { state: MessageV2.ToolStateCompleted } +type ErrorToolPart = MessageV2.ToolPart & { state: MessageV2.ToolStateError } +type RunningToolPart = MessageV2.ToolPart & { state: MessageV2.ToolStateRunning } + +function completedTool(parts: MessageV2.Part[]) { + const part = toolPart(parts) + expect(part?.state.status).toBe("completed") + return part?.state.status === "completed" ? (part as CompletedToolPart) : undefined +} + +function errorTool(parts: MessageV2.Part[]) { + const part = toolPart(parts) + expect(part?.state.status).toBe("error") + return part?.state.status === "error" ? (part as ErrorToolPart) : undefined +} + +function runningTool(parts: MessageV2.Part[]) { + const part = toolPart(parts) + expect(part?.state.status).toBe("running") + return part?.state.status === "running" ? (part as RunningToolPart) : undefined +} + const llm = Layer.unwrap( Effect.gen(function* () { const queue: Script[] = [] @@ -160,6 +211,59 @@ const llm = Layer.unwrap( }), ) +const mcp = Layer.succeed( + MCP.Service, + MCP.Service.of({ + status: () => Effect.succeed({}), + clients: () => Effect.succeed({}), + tools: () => Effect.succeed({}), + prompts: () => Effect.succeed({}), + resources: () => Effect.succeed({}), + add: () => Effect.succeed({ status: { status: "disabled" as const } }), + connect: () => Effect.void, + disconnect: () => Effect.void, + getPrompt: () => Effect.succeed(undefined), + readResource: () => Effect.succeed(undefined), + startAuth: () => Effect.die("unexpected MCP auth in prompt-effect tests"), + authenticate: () => Effect.die("unexpected MCP auth in prompt-effect tests"), + finishAuth: () => Effect.die("unexpected MCP auth in prompt-effect tests"), + removeAuth: () => Effect.void, + supportsOAuth: () => Effect.succeed(false), + hasStoredTokens: () => Effect.succeed(false), + getAuthStatus: () => Effect.succeed("not_authenticated" as const), + }), +) + +const lsp = Layer.succeed( + LSP.Service, + LSP.Service.of({ + init: () => Effect.void, + status: () => Effect.succeed([]), + hasClients: () => Effect.succeed(false), + touchFile: () => Effect.void, + diagnostics: () => Effect.succeed({}), + hover: () => Effect.succeed(undefined), + definition: () => Effect.succeed([]), + references: () => Effect.succeed([]), + implementation: () => Effect.succeed([]), + documentSymbol: () => Effect.succeed([]), + workspaceSymbol: () => Effect.succeed([]), + prepareCallHierarchy: () => Effect.succeed([]), + incomingCalls: () => Effect.succeed([]), + outgoingCalls: () => Effect.succeed([]), + }), +) + +const filetime = Layer.succeed( + FileTime.Service, + FileTime.Service.of({ + read: () => Effect.void, + get: () => Effect.succeed(undefined), + assert: () => Effect.void, + withLock: (_filepath, fn) => Effect.promise(fn), + }), +) + const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer)) const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer) const deps = Layer.mergeAll( @@ -170,6 +274,9 @@ const deps = Layer.mergeAll( Permission.layer, Plugin.defaultLayer, Config.defaultLayer, + filetime, + lsp, + mcp, AppFileSystem.defaultLayer, status, llm, @@ -260,17 +367,36 @@ const seed = Effect.fn("test.seed")(function* (sessionID: SessionID, opts?: { fi return { user: msg, assistant } }) -// Priority 1: Loop lifecycle +const addSubtask = (sessionID: SessionID, messageID: MessageID, model = ref) => + Effect.gen(function* () { + const session = yield* Session.Service + yield* session.updatePart({ + id: PartID.ascending(), + messageID, + sessionID, + type: "subtask", + prompt: "look into the cache key path", + description: "inspect bug", + agent: "general", + model, + }) + }) + +const boot = Effect.fn("test.boot")(function* (input?: { title?: string }) { + const test = yield* TestLLM + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create(input ?? {}) + return { test, prompt, sessions, chat } +}) + +// Loop semantics it.effect("loop exits immediately when last assistant has stop finish", () => provideTmpdirInstance( (dir) => Effect.gen(function* () { - const test = yield* TestLLM - const prompt = yield* SessionPrompt.Service - const sessions = yield* Session.Service - - const chat = yield* sessions.create({}) + const { test, prompt, chat } = yield* boot() yield* seed(chat.id, { finish: "stop" }) const result = yield* prompt.loop({ sessionID: chat.id }) @@ -286,13 +412,8 @@ it.effect("loop calls LLM and returns assistant message", () => provideTmpdirInstance( (dir) => Effect.gen(function* () { - const test = yield* TestLLM - const prompt = yield* SessionPrompt.Service - const sessions = yield* Session.Service - - yield* test.reply(start(), textStart(), textDelta("t", "world"), textEnd(), finishStep(), finish()) - - const chat = yield* sessions.create({}) + const { test, prompt, chat } = yield* boot() + yield* test.reply(...replyStop("world")) yield* user(chat.id, "hello") const result = yield* prompt.loop({ sessionID: chat.id }) @@ -309,16 +430,32 @@ it.effect("loop continues when finish is tool-calls", () => provideTmpdirInstance( (dir) => Effect.gen(function* () { - const test = yield* TestLLM - const prompt = yield* SessionPrompt.Service - const sessions = yield* Session.Service + const { test, prompt, chat } = yield* boot() + yield* test.reply(...replyToolCalls("first")) + yield* test.reply(...replyStop("second")) + yield* user(chat.id, "hello") - // First reply finishes with tool-calls, second with stop + const result = yield* prompt.loop({ sessionID: chat.id }) + expect(yield* test.calls).toBe(2) + expect(result.info.role).toBe("assistant") + }), + { git: true, config: cfg }, + ), +) + +it.effect("failed subtask preserves metadata on error tool state", () => + provideTmpdirInstance( + (dir) => + Effect.gen(function* () { + const { test, prompt, chat } = yield* boot({ title: "Pinned" }) yield* test.reply( start(), - textStart(), - textDelta("t", "first"), - textEnd(), + toolInputStart("task-1", "task"), + toolCall("task-1", "task", { + description: "inspect bug", + prompt: "look into the cache key path", + subagent_type: "general", + }), { type: "finish-step", finishReason: "tool-calls", @@ -329,16 +466,41 @@ it.effect("loop continues when finish is tool-calls", () => }, { type: "finish", finishReason: "tool-calls", rawFinishReason: "tool_calls", totalUsage: usage() }, ) - yield* test.reply(start(), textStart(), textDelta("t", "second"), textEnd(), finishStep(), finish()) - - const chat = yield* sessions.create({}) - yield* user(chat.id, "hello") + yield* test.reply(...replyStop("done")) + const msg = yield* user(chat.id, "hello") + yield* addSubtask(chat.id, msg.id) const result = yield* prompt.loop({ sessionID: chat.id }) - expect(yield* test.calls).toBe(2) expect(result.info.role).toBe("assistant") + expect(yield* test.calls).toBe(2) + + const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id))) + const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general") + expect(taskMsg?.info.role).toBe("assistant") + if (!taskMsg || taskMsg.info.role !== "assistant") return + + const tool = errorTool(taskMsg.parts) + if (!tool) return + + expect(tool.state.error).toContain("Tool execution failed") + expect(tool.state.metadata).toBeDefined() + expect(tool.state.metadata?.sessionId).toBeDefined() + expect(tool.state.metadata?.model).toEqual({ + providerID: ProviderID.make("test"), + modelID: ModelID.make("missing-model"), + }) }), - { git: true, config: cfg }, + { + git: true, + config: { + ...cfg, + agent: { + general: { + model: "test/missing-model", + }, + }, + }, + }, ), ) @@ -375,7 +537,7 @@ it.effect("loop sets status to busy then idle", () => ), ) -// Priority 2: Cancel safety +// Cancel semantics it.effect( "cancel interrupts loop and returns last assistant", @@ -383,11 +545,7 @@ it.effect( provideTmpdirInstance( (dir) => Effect.gen(function* () { - const test = yield* TestLLM - const prompt = yield* SessionPrompt.Service - const sessions = yield* Session.Service - - const chat = yield* sessions.create({}) + const { test, prompt, chat } = yield* boot() yield* seed(chat.id) // Make LLM hang so the loop blocks @@ -398,7 +556,7 @@ it.effect( const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) // Give the loop time to start - yield* Effect.promise(() => new Promise((r) => setTimeout(r, 200))) + yield* waitMs(200) yield* prompt.cancel(chat.id) const exit = yield* Fiber.await(fiber) @@ -419,17 +577,13 @@ it.effect( (dir) => Effect.gen(function* () { const ready = defer() - const test = yield* TestLLM - const prompt = yield* SessionPrompt.Service - const sessions = yield* Session.Service + const { test, prompt, chat } = yield* boot() yield* test.push((input) => hang(input, start()).pipe( Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), ), ) - - const chat = yield* sessions.create({}) yield* user(chat.id, "hello") const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) @@ -457,24 +611,20 @@ it.effect( (dir) => Effect.gen(function* () { const ready = defer() - const test = yield* TestLLM - const prompt = yield* SessionPrompt.Service - const sessions = yield* Session.Service + const { test, prompt, chat } = yield* boot() yield* test.push((input) => hang(input, start()).pipe( Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), ), ) - - const chat = yield* sessions.create({}) yield* user(chat.id, "hello") const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) yield* Effect.promise(() => ready.promise) // Queue a second caller const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* Effect.promise(() => new Promise((r) => setTimeout(r, 50))) + yield* waitMs(50) yield* prompt.cancel(chat.id) @@ -488,17 +638,13 @@ it.effect( 30_000, ) -// Priority 3: Deferred queue +// Queue semantics it.effect("concurrent loop callers get same result", () => provideTmpdirInstance( (dir) => Effect.gen(function* () { - const test = yield* TestLLM - const prompt = yield* SessionPrompt.Service - const sessions = yield* Session.Service - - const chat = yield* sessions.create({}) + const { prompt, chat } = yield* boot() yield* seed(chat.id, { finish: "stop" }) const [a, b] = yield* Effect.all([prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], { @@ -516,14 +662,10 @@ it.effect("concurrent loop callers all receive same error result", () => provideTmpdirInstance( (dir) => Effect.gen(function* () { - const test = yield* TestLLM - const prompt = yield* SessionPrompt.Service - const sessions = yield* Session.Service + const { test, prompt, chat } = yield* boot() // Push a stream that fails — the loop records the error on the assistant message yield* test.push(Stream.fail(new Error("boom"))) - - const chat = yield* sessions.create({}) yield* user(chat.id, "hello") const [a, b] = yield* Effect.all([prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], { @@ -590,7 +732,7 @@ it.effect("assertNotBusy succeeds when idle", () => ), ) -// Priority 4: Shell basics +// Shell semantics it.effect( "shell rejects with BusyError when loop running", @@ -599,17 +741,13 @@ it.effect( (dir) => Effect.gen(function* () { const ready = defer() - const test = yield* TestLLM - const prompt = yield* SessionPrompt.Service - const sessions = yield* Session.Service + const { test, prompt, chat } = yield* boot() yield* test.push((input) => hang(input, start()).pipe( Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), ), ) - - const chat = yield* sessions.create({}) yield* user(chat.id, "hi") const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) @@ -626,27 +764,78 @@ it.effect( 30_000, ) +it.effect("shell captures stdout and stderr in completed tool output", () => + provideTmpdirInstance( + (dir) => + Effect.gen(function* () { + const { prompt, chat } = yield* boot() + const result = yield* prompt.shell({ + sessionID: chat.id, + agent: "build", + command: "printf out && printf err >&2", + }) + + expect(result.info.role).toBe("assistant") + const tool = completedTool(result.parts) + if (!tool) return + + expect(tool.state.output).toContain("out") + expect(tool.state.output).toContain("err") + expect(tool.state.metadata.output).toContain("out") + expect(tool.state.metadata.output).toContain("err") + }), + { git: true, config: cfg }, + ), +) + +it.effect( + "shell updates running metadata before process exit", + () => + provideTmpdirInstance( + (dir) => + Effect.gen(function* () { + const { prompt, chat } = yield* boot() + + const fiber = yield* prompt + .shell({ sessionID: chat.id, agent: "build", command: "printf first && sleep 0.2 && printf second" }) + .pipe(Effect.forkChild) + + yield* Effect.promise(async () => { + const start = Date.now() + while (Date.now() - start < 2000) { + const msgs = await MessageV2.filterCompacted(MessageV2.stream(chat.id)) + const taskMsg = msgs.find((item) => item.info.role === "assistant") + const tool = taskMsg ? runningTool(taskMsg.parts) : undefined + if (tool?.state.metadata?.output.includes("first")) return + await new Promise((done) => setTimeout(done, 20)) + } + throw new Error("timed out waiting for running shell metadata") + }) + + const exit = yield* Fiber.await(fiber) + expect(Exit.isSuccess(exit)).toBe(true) + }), + { git: true, config: cfg }, + ), + 30_000, +) + it.effect( "loop waits while shell runs and starts after shell exits", () => provideTmpdirInstance( (dir) => Effect.gen(function* () { - const test = yield* TestLLM - const prompt = yield* SessionPrompt.Service - const sessions = yield* Session.Service - - yield* test.reply(start(), textStart(), textDelta("t", "after-shell"), textEnd(), finishStep(), finish()) - - const chat = yield* sessions.create({}) + const { test, prompt, chat } = yield* boot() + yield* test.reply(...replyStop("after-shell")) const sh = yield* prompt .shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" }) .pipe(Effect.forkChild) - yield* Effect.promise(() => new Promise((done) => setTimeout(done, 50))) + yield* waitMs(50) const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* Effect.promise(() => new Promise((done) => setTimeout(done, 50))) + yield* waitMs(50) expect(yield* test.calls).toBe(0) @@ -671,22 +860,17 @@ it.effect( provideTmpdirInstance( (dir) => Effect.gen(function* () { - const test = yield* TestLLM - const prompt = yield* SessionPrompt.Service - const sessions = yield* Session.Service - - yield* test.reply(start(), textStart(), textDelta("t", "done"), textEnd(), finishStep(), finish()) - - const chat = yield* sessions.create({}) + const { test, prompt, chat } = yield* boot() + yield* test.reply(...replyStop("done")) const sh = yield* prompt .shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" }) .pipe(Effect.forkChild) - yield* Effect.promise(() => new Promise((done) => setTimeout(done, 50))) + yield* waitMs(50) const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* Effect.promise(() => new Promise((done) => setTimeout(done, 50))) + yield* waitMs(50) expect(yield* test.calls).toBe(0) @@ -712,15 +896,12 @@ it.effect( provideTmpdirInstance( (dir) => Effect.gen(function* () { - const prompt = yield* SessionPrompt.Service - const sessions = yield* Session.Service - - const chat = yield* sessions.create({}) + const { prompt, chat } = yield* boot() const sh = yield* prompt .shell({ sessionID: chat.id, agent: "build", command: "sleep 30" }) .pipe(Effect.forkChild) - yield* Effect.promise(() => new Promise((done) => setTimeout(done, 50))) + yield* waitMs(50) yield* prompt.cancel(chat.id) @@ -747,15 +928,12 @@ it.effect( provideTmpdirInstance( (dir) => Effect.gen(function* () { - const prompt = yield* SessionPrompt.Service - const sessions = yield* Session.Service - - const chat = yield* sessions.create({}) + const { prompt, chat } = yield* boot() const a = yield* prompt .shell({ sessionID: chat.id, agent: "build", command: "sleep 30" }) .pipe(Effect.forkChild) - yield* Effect.promise(() => new Promise((done) => setTimeout(done, 50))) + yield* waitMs(50) const exit = yield* prompt.shell({ sessionID: chat.id, agent: "build", command: "echo hi" }).pipe(Effect.exit) expect(Exit.isFailure(exit)).toBe(true)