From e76cf967e60995986a4dd99d818fc900fa82f904 Mon Sep 17 00:00:00 2001 From: Aiden Cline <63023139+rekram1-node@users.noreply.github.com> Date: Thu, 14 May 2026 01:19:11 -0500 Subject: [PATCH] fix(session): finalize interrupted assistant messages (#27254) --- packages/opencode/src/session/prompt.ts | 26 +++- packages/opencode/test/session/prompt.test.ts | 125 +++++++++++++++++- 2 files changed, 139 insertions(+), 12 deletions(-) diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 6248ce455c..5d79bff049 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -1755,12 +1755,25 @@ NOTE: At any point in time through this workflow you should feel free to ask the sessionID, } yield* sessions.updateMessage(msg) - const handle = yield* processor.create({ - assistantMessage: msg, - sessionID, - model, + + const finalizeInterruptedAssistant = Effect.gen(function* () { + if (msg.time.completed) return + msg.error ??= MessageV2.fromError(new DOMException("Aborted", "AbortError"), { + providerID: msg.providerID, + aborted: true, + }) + msg.time.completed = Date.now() + yield* sessions.updateMessage(msg) }) + const handle = yield* processor + .create({ + assistantMessage: msg, + sessionID, + model, + }) + .pipe(Effect.onInterrupt(() => finalizeInterruptedAssistant)) + const outcome: "break" | "continue" = yield* Effect.gen(function* () { const lastUserMsg = msgs.findLast((m) => m.info.role === "user") const bypassAgentCheck = lastUserMsg?.parts.some((p) => p.type === "agent") ?? false @@ -1859,7 +1872,10 @@ NOTE: At any point in time through this workflow you should feel free to ask the }) } return "continue" as const - }).pipe(Effect.ensuring(instruction.clear(handle.message.id))) + }).pipe( + Effect.ensuring(instruction.clear(handle.message.id)), + Effect.onInterrupt(() => finalizeInterruptedAssistant), + ) if (outcome === "break") break continue } diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts index 9ad2fbe1f6..9ac038aa66 100644 --- a/packages/opencode/test/session/prompt.test.ts +++ b/packages/opencode/test/session/prompt.test.ts @@ -152,7 +152,16 @@ const lsp = Layer.succeed( const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer)) const run = SessionRunState.layer.pipe(Layer.provide(status)) const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer) -function makeHttp() { + +const processorCreateStarted: Array<() => void> = [] +const blockingProcessor = Layer.succeed( + SessionProcessor.Service, + SessionProcessor.Service.of({ + create: () => Effect.sync(() => processorCreateStarted.shift()?.()).pipe(Effect.andThen(Effect.never)), + }), +) + +function makeHttp(input?: { processor?: "blocking" }) { const deps = Layer.mergeAll( Session.defaultLayer, Snapshot.defaultLayer, @@ -186,12 +195,15 @@ function makeHttp() { Layer.provideMerge(deps), ) const trunc = Truncate.layer.pipe(Layer.provideMerge(deps)) - const proc = SessionProcessor.layer.pipe( - Layer.provide(summary), - Layer.provide(Image.defaultLayer), - Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })), - Layer.provideMerge(deps), - ) + const proc = + input?.processor === "blocking" + ? blockingProcessor + : SessionProcessor.layer.pipe( + Layer.provide(summary), + Layer.provide(Image.defaultLayer), + Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })), + Layer.provideMerge(deps), + ) const compact = SessionCompaction.layer.pipe( Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })), Layer.provideMerge(proc), @@ -218,6 +230,7 @@ function makeHttp() { } const it = testEffect(makeHttp()) +const race = testEffect(makeHttp({ processor: "blocking" })) const unix = process.platform !== "win32" ? it.instance : it.instance.skip // Config that registers a custom "test" provider with a "test-model" model @@ -341,6 +354,14 @@ const deferredAsPromise = (deferred: Deferred.Deferred): PromiseLike => }, }) +function defer() { + let resolve!: (value: T | PromiseLike) => void + const promise = new Promise((done) => { + resolve = done + }) + return { promise, resolve } +} + const succeedVoid = (deferred: Deferred.Deferred) => { Effect.runSync(Deferred.succeed(deferred, void 0).pipe(Effect.ignore)) } @@ -896,6 +917,96 @@ it.instance( 3_000, ) +race.instance( + "finalizes assistant when cancelled before processor creation completes", + () => + Effect.gen(function* () { + yield* useServerConfig(providerCfg) + processorCreateStarted.length = 0 + yield* Effect.addFinalizer(() => + Effect.sync(() => { + processorCreateStarted.length = 0 + }), + ) + + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Processor creation race" }) + + yield* prompt.prompt({ + sessionID: chat.id, + agent: "build", + noReply: true, + parts: [{ type: "text", text: "first" }], + }) + + const firstCreate = defer() + processorCreateStarted.push(firstCreate.resolve) + const first = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* Effect.promise(() => firstCreate.promise) + + yield* prompt.cancel(chat.id) + const firstExit = yield* Fiber.await(first) + expect(Exit.isSuccess(firstExit)).toBe(true) + + let messages = yield* sessions.messages({ sessionID: chat.id }) + const firstInterrupted = messages.at(-1) + expect(firstInterrupted?.info.role).toBe("assistant") + expect(firstInterrupted?.parts).toHaveLength(0) + if (firstInterrupted?.info.role === "assistant") { + expect(firstInterrupted.info.finish).toBeUndefined() + expect(firstInterrupted.info.time.completed).toBeNumber() + expect(firstInterrupted.info.error?.name).toBe("MessageAbortedError") + } + + yield* prompt.prompt({ + sessionID: chat.id, + agent: "build", + noReply: true, + parts: [{ type: "text", text: "second" }], + }) + + const secondCreate = defer() + processorCreateStarted.push(secondCreate.resolve) + const second = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* Effect.promise(() => secondCreate.promise) + + yield* prompt.cancel(chat.id) + const secondExit = yield* Fiber.await(second) + expect(Exit.isSuccess(secondExit)).toBe(true) + + messages = yield* sessions.messages({ sessionID: chat.id }) + const poisonMessages = messages.filter( + (message) => + message.info.role === "assistant" && + message.parts.length === 0 && + !message.info.finish && + !message.info.time.completed && + !message.info.error, + ) + expect(poisonMessages).toHaveLength(0) + + const interruptedMessages = messages.filter( + (message) => + message.info.role === "assistant" && + message.parts.length === 0 && + message.info.time.completed && + message.info.error?.name === "MessageAbortedError", + ) + expect(interruptedMessages).toHaveLength(2) + + const lastUser = messages.at(-2) + const lastAssistant = messages.at(-1) + expect(lastUser?.info.role).toBe("user") + expect(lastAssistant?.info.role).toBe("assistant") + if (lastUser?.info.role === "user" && lastAssistant?.info.role === "assistant") { + expect(lastAssistant.info.parentID).toBe(lastUser?.info.id) + } + }), + { git: true }, + 3_000, +) + it.instance( "cancel finalizes subtask tool state", () =>