diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts
index 6248ce455c..5d79bff049 100644
--- a/packages/opencode/src/session/prompt.ts
+++ b/packages/opencode/src/session/prompt.ts
@@ -1755,12 +1755,25 @@ NOTE: At any point in time through this workflow you should feel free to ask the
sessionID,
}
yield* sessions.updateMessage(msg)
- const handle = yield* processor.create({
- assistantMessage: msg,
- sessionID,
- model,
+
+ const finalizeInterruptedAssistant = Effect.gen(function* () {
+ if (msg.time.completed) return
+ msg.error ??= MessageV2.fromError(new DOMException("Aborted", "AbortError"), {
+ providerID: msg.providerID,
+ aborted: true,
+ })
+ msg.time.completed = Date.now()
+ yield* sessions.updateMessage(msg)
})
+ const handle = yield* processor
+ .create({
+ assistantMessage: msg,
+ sessionID,
+ model,
+ })
+ .pipe(Effect.onInterrupt(() => finalizeInterruptedAssistant))
+
const outcome: "break" | "continue" = yield* Effect.gen(function* () {
const lastUserMsg = msgs.findLast((m) => m.info.role === "user")
const bypassAgentCheck = lastUserMsg?.parts.some((p) => p.type === "agent") ?? false
@@ -1859,7 +1872,10 @@ NOTE: At any point in time through this workflow you should feel free to ask the
})
}
return "continue" as const
- }).pipe(Effect.ensuring(instruction.clear(handle.message.id)))
+ }).pipe(
+ Effect.ensuring(instruction.clear(handle.message.id)),
+ Effect.onInterrupt(() => finalizeInterruptedAssistant),
+ )
if (outcome === "break") break
continue
}
diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts
index 9ad2fbe1f6..9ac038aa66 100644
--- a/packages/opencode/test/session/prompt.test.ts
+++ b/packages/opencode/test/session/prompt.test.ts
@@ -152,7 +152,16 @@ const lsp = Layer.succeed(
const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer))
const run = SessionRunState.layer.pipe(Layer.provide(status))
const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)
-function makeHttp() {
+
+const processorCreateStarted: Array<() => void> = []
+const blockingProcessor = Layer.succeed(
+ SessionProcessor.Service,
+ SessionProcessor.Service.of({
+ create: () => Effect.sync(() => processorCreateStarted.shift()?.()).pipe(Effect.andThen(Effect.never)),
+ }),
+)
+
+function makeHttp(input?: { processor?: "blocking" }) {
const deps = Layer.mergeAll(
Session.defaultLayer,
Snapshot.defaultLayer,
@@ -186,12 +195,15 @@ function makeHttp() {
Layer.provideMerge(deps),
)
const trunc = Truncate.layer.pipe(Layer.provideMerge(deps))
- const proc = SessionProcessor.layer.pipe(
- Layer.provide(summary),
- Layer.provide(Image.defaultLayer),
- Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })),
- Layer.provideMerge(deps),
- )
+ const proc =
+ input?.processor === "blocking"
+ ? blockingProcessor
+ : SessionProcessor.layer.pipe(
+ Layer.provide(summary),
+ Layer.provide(Image.defaultLayer),
+ Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })),
+ Layer.provideMerge(deps),
+ )
const compact = SessionCompaction.layer.pipe(
Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })),
Layer.provideMerge(proc),
@@ -218,6 +230,7 @@ function makeHttp() {
}
const it = testEffect(makeHttp())
+const race = testEffect(makeHttp({ processor: "blocking" }))
const unix = process.platform !== "win32" ? it.instance : it.instance.skip
// Config that registers a custom "test" provider with a "test-model" model
@@ -341,6 +354,14 @@ const deferredAsPromise = (deferred: Deferred.Deferred): PromiseLike =>
},
})
+function defer() {
+ let resolve!: (value: T | PromiseLike) => void
+ const promise = new Promise((done) => {
+ resolve = done
+ })
+ return { promise, resolve }
+}
+
const succeedVoid = (deferred: Deferred.Deferred) => {
Effect.runSync(Deferred.succeed(deferred, void 0).pipe(Effect.ignore))
}
@@ -896,6 +917,96 @@ it.instance(
3_000,
)
+race.instance(
+ "finalizes assistant when cancelled before processor creation completes",
+ () =>
+ Effect.gen(function* () {
+ yield* useServerConfig(providerCfg)
+ processorCreateStarted.length = 0
+ yield* Effect.addFinalizer(() =>
+ Effect.sync(() => {
+ processorCreateStarted.length = 0
+ }),
+ )
+
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const chat = yield* sessions.create({ title: "Processor creation race" })
+
+ yield* prompt.prompt({
+ sessionID: chat.id,
+ agent: "build",
+ noReply: true,
+ parts: [{ type: "text", text: "first" }],
+ })
+
+ const firstCreate = defer()
+ processorCreateStarted.push(firstCreate.resolve)
+ const first = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* Effect.promise(() => firstCreate.promise)
+
+ yield* prompt.cancel(chat.id)
+ const firstExit = yield* Fiber.await(first)
+ expect(Exit.isSuccess(firstExit)).toBe(true)
+
+ let messages = yield* sessions.messages({ sessionID: chat.id })
+ const firstInterrupted = messages.at(-1)
+ expect(firstInterrupted?.info.role).toBe("assistant")
+ expect(firstInterrupted?.parts).toHaveLength(0)
+ if (firstInterrupted?.info.role === "assistant") {
+ expect(firstInterrupted.info.finish).toBeUndefined()
+ expect(firstInterrupted.info.time.completed).toBeNumber()
+ expect(firstInterrupted.info.error?.name).toBe("MessageAbortedError")
+ }
+
+ yield* prompt.prompt({
+ sessionID: chat.id,
+ agent: "build",
+ noReply: true,
+ parts: [{ type: "text", text: "second" }],
+ })
+
+ const secondCreate = defer()
+ processorCreateStarted.push(secondCreate.resolve)
+ const second = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* Effect.promise(() => secondCreate.promise)
+
+ yield* prompt.cancel(chat.id)
+ const secondExit = yield* Fiber.await(second)
+ expect(Exit.isSuccess(secondExit)).toBe(true)
+
+ messages = yield* sessions.messages({ sessionID: chat.id })
+ const poisonMessages = messages.filter(
+ (message) =>
+ message.info.role === "assistant" &&
+ message.parts.length === 0 &&
+ !message.info.finish &&
+ !message.info.time.completed &&
+ !message.info.error,
+ )
+ expect(poisonMessages).toHaveLength(0)
+
+ const interruptedMessages = messages.filter(
+ (message) =>
+ message.info.role === "assistant" &&
+ message.parts.length === 0 &&
+ message.info.time.completed &&
+ message.info.error?.name === "MessageAbortedError",
+ )
+ expect(interruptedMessages).toHaveLength(2)
+
+ const lastUser = messages.at(-2)
+ const lastAssistant = messages.at(-1)
+ expect(lastUser?.info.role).toBe("user")
+ expect(lastAssistant?.info.role).toBe("assistant")
+ if (lastUser?.info.role === "user" && lastAssistant?.info.role === "assistant") {
+ expect(lastAssistant.info.parentID).toBe(lastUser?.info.id)
+ }
+ }),
+ { git: true },
+ 3_000,
+)
+
it.instance(
"cancel finalizes subtask tool state",
() =>