Compare commits

...

1 Commits

Author SHA1 Message Date
Kit Langton
ea19ee7966 refactor(session): use onInterrupt finalizer for cancelled tool output
Wire the AI SDK's abortSignal into the tool fiber via runPromiseExit's
signal option so interruption is first-class, and move the "finalize on
cancel" path into an Effect.onInterrupt finalizer that re-awaits the
still-running native Promise uninterruptibly, builds the output, and
posts it through completeToolCall.

Replaces the imperative `if (options.abortSignal?.aborted)` tail check
with structural interruption handling. When the fiber is interrupted,
the finalizer captures the truncated bash output (or MCP tool result)
and the .then on runPromiseExit resolves the SDK's Promise with the
captured value instead of propagating the interrupt cause as a
rejection, so the tool is reported as successfully completed rather
than as a tool-error.

InstanceRef is provided on the tool fiber so InstanceState.context
resolves through ServiceMap rather than falling through to the
AsyncLocalStorage, which the onInterrupt finalizer runs outside of.
2026-04-09 16:40:57 -04:00
2 changed files with 134 additions and 93 deletions

View File

@@ -7,7 +7,7 @@ import { Observability } from "./oltp"
export const memoMap = Layer.makeMemoMapUnsafe()
function attach<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
export function attach<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
try {
const ctx = Instance.current
return Effect.provideService(effect, InstanceRef, ctx)

View File

@@ -45,7 +45,7 @@ import { decodeDataUrl } from "@/util/data-url"
import { Process } from "@/util/process"
import { Cause, Effect, Exit, Layer, Option, Scope, ServiceMap } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRuntime } from "@/effect/run-service"
import { attach, makeRuntime } from "@/effect/run-service"
import { TaskTool } from "@/tool/task"
import { SessionRunState } from "./run-state"
@@ -62,6 +62,64 @@ IMPORTANT:
const STRUCTURED_OUTPUT_SYSTEM_PROMPT = `IMPORTANT: The user has requested structured output. You MUST use the StructuredOutput tool to provide your final response. Do NOT respond with plain text - you MUST call the StructuredOutput tool with your answer formatted according to the schema.`
/**
* Bridges an AI SDK Promise-based `execute` callback to Effect with graceful
* cancel semantics.
*
* On the happy path: runs `before`, awaits `execute()`, then `finalize(result)`
* and returns the output.
*
* On cancel mid-flight: the `onInterrupt` finalizer re-awaits the same in-flight
* native Promise uninterruptibly, runs `finalize` again on the eventual result,
* and posts it via `onCancel` (the processor side channel). This is what lets
* cancelled bash surface its truncated output through the normal completion
* path instead of getting stamped as aborted by processor cleanup.
*
* The returned Promise always resolves with a finalized output when one is
* available (even on interrupt), so the SDK reports the tool as successfully
* completed rather than as a tool-error.
*
* `attach` captures the current Instance context via InstanceRef so the
* onInterrupt finalizer — which runs outside the AsyncLocalStorage chain
* `execute()` is called from — can still resolve it through the ServiceMap.
*/
function runToolExecute<Raw, Output>(options: {
signal: AbortSignal | undefined
before: Effect.Effect<unknown, any, any>
execute: () => Promise<Raw>
finalize: (result: Raw) => Effect.Effect<Output, any, any>
onCancel: (output: Output) => Effect.Effect<unknown, any, any>
}): Promise<Output> {
let pending: Promise<Raw> | undefined
let rescued: Output | undefined
const wait = Effect.promise(() => pending!)
const program = Effect.gen(function* () {
yield* options.before
pending = options.execute()
const result = yield* wait
return yield* options.finalize(result)
}).pipe(
// On interrupt, re-await the in-flight Promise uninterruptibly (finalizers
// always are), finalize it the same way, and post through the side channel.
// Stash the output so catchCause below can surface it instead of the cause.
Effect.onInterrupt(() =>
Effect.gen(function* () {
if (pending === undefined) return
const result = yield* wait
const output = yield* options.finalize(result)
rescued = output
yield* options.onCancel(output)
}).pipe(Effect.catchCause(() => Effect.void)),
),
Effect.catchCause((cause) =>
Effect.suspend(() => (rescued !== undefined ? Effect.succeed(rescued!) : Effect.failCause(cause))),
),
)
return Effect.runPromise(attach(program as Effect.Effect<Output>), { signal: options.signal })
}
export namespace SessionPrompt {
const log = Log.create({ service: "session.prompt" })
@@ -396,35 +454,28 @@ NOTE: At any point in time through this workflow you should feel free to ask the
description: item.description,
inputSchema: jsonSchema(schema as any),
execute(args, options) {
return Effect.runPromise(
Effect.gen(function* () {
const ctx = context(args, options)
yield* plugin.trigger(
"tool.execute.before",
{ tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID },
{ args },
)
const result = yield* Effect.promise(() => item.execute(args, ctx))
const output = {
...result,
attachments: result.attachments?.map((attachment) => ({
...attachment,
id: PartID.ascending(),
sessionID: ctx.sessionID,
messageID: input.processor.message.id,
})),
}
yield* plugin.trigger(
"tool.execute.after",
{ tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID, args },
output,
)
if (options.abortSignal?.aborted) {
yield* input.processor.completeToolCall(options.toolCallId, output)
}
return output
}),
)
const ctx = context(args, options)
const meta = { tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID }
return runToolExecute({
signal: options.abortSignal,
before: plugin.trigger("tool.execute.before", meta, { args }),
execute: () => item.execute(args, ctx),
finalize: (result) =>
Effect.gen(function* () {
const output = {
...result,
attachments: result.attachments?.map((attachment) => ({
...attachment,
id: PartID.ascending(),
sessionID: ctx.sessionID,
messageID: input.processor.message.id,
})),
}
yield* plugin.trigger("tool.execute.after", { ...meta, args }, output)
return output
}),
onCancel: (output) => input.processor.completeToolCall(options.toolCallId, output),
})
},
})
}
@@ -436,74 +487,64 @@ NOTE: At any point in time through this workflow you should feel free to ask the
const schema = yield* Effect.promise(() => Promise.resolve(asSchema(item.inputSchema).jsonSchema))
const transformed = ProviderTransform.schema(input.model, schema)
item.inputSchema = jsonSchema(transformed)
item.execute = (args, opts) =>
Effect.runPromise(
Effect.gen(function* () {
const ctx = context(args, opts)
yield* plugin.trigger(
"tool.execute.before",
{ tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId },
{ args },
)
item.execute = (args, opts) => {
const ctx = context(args, opts)
const meta = { tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId }
type Raw = Awaited<ReturnType<NonNullable<typeof execute>>>
return runToolExecute({
signal: opts.abortSignal,
before: Effect.gen(function* () {
yield* plugin.trigger("tool.execute.before", meta, { args })
yield* Effect.promise(() => ctx.ask({ permission: key, metadata: {}, patterns: ["*"], always: ["*"] }))
const result: Awaited<ReturnType<NonNullable<typeof execute>>> = yield* Effect.promise(() =>
execute(args, opts),
)
yield* plugin.trigger(
"tool.execute.after",
{ tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId, args },
result,
)
const textParts: string[] = []
const attachments: Omit<MessageV2.FilePart, "id" | "sessionID" | "messageID">[] = []
for (const contentItem of result.content) {
if (contentItem.type === "text") textParts.push(contentItem.text)
else if (contentItem.type === "image") {
attachments.push({
type: "file",
mime: contentItem.mimeType,
url: `data:${contentItem.mimeType};base64,${contentItem.data}`,
})
} else if (contentItem.type === "resource") {
const { resource } = contentItem
if (resource.text) textParts.push(resource.text)
if (resource.blob) {
}),
execute: (): Promise<Raw> => execute(args, opts),
finalize: (result) =>
Effect.gen(function* () {
yield* plugin.trigger("tool.execute.after", { ...meta, args }, result)
const textParts: string[] = []
const attachments: Omit<MessageV2.FilePart, "id" | "sessionID" | "messageID">[] = []
for (const contentItem of result.content) {
if (contentItem.type === "text") textParts.push(contentItem.text)
else if (contentItem.type === "image") {
attachments.push({
type: "file",
mime: resource.mimeType ?? "application/octet-stream",
url: `data:${resource.mimeType ?? "application/octet-stream"};base64,${resource.blob}`,
filename: resource.uri,
mime: contentItem.mimeType,
url: `data:${contentItem.mimeType};base64,${contentItem.data}`,
})
} else if (contentItem.type === "resource") {
const { resource } = contentItem
if (resource.text) textParts.push(resource.text)
if (resource.blob) {
attachments.push({
type: "file",
mime: resource.mimeType ?? "application/octet-stream",
url: `data:${resource.mimeType ?? "application/octet-stream"};base64,${resource.blob}`,
filename: resource.uri,
})
}
}
}
}
const truncated = yield* truncate.output(textParts.join("\n\n"), {}, input.agent)
const metadata = {
...(result.metadata ?? {}),
truncated: truncated.truncated,
...(truncated.truncated && { outputPath: truncated.outputPath }),
}
const output = {
title: "",
metadata,
output: truncated.content,
attachments: attachments.map((attachment) => ({
...attachment,
id: PartID.ascending(),
sessionID: ctx.sessionID,
messageID: input.processor.message.id,
})),
content: result.content,
}
if (opts.abortSignal?.aborted) {
yield* input.processor.completeToolCall(opts.toolCallId, output)
}
return output
}),
)
const truncated = yield* truncate.output(textParts.join("\n\n"), {}, input.agent)
return {
title: "",
metadata: {
...(result.metadata ?? {}),
truncated: truncated.truncated,
...(truncated.truncated && { outputPath: truncated.outputPath }),
},
output: truncated.content,
attachments: attachments.map((attachment) => ({
...attachment,
id: PartID.ascending(),
sessionID: ctx.sessionID,
messageID: input.processor.message.id,
})),
content: result.content,
}
}),
onCancel: (output) => input.processor.completeToolCall(opts.toolCallId, output),
})
}
tools[key] = item
}