mirror of
https://github.com/anomalyco/opencode.git
synced 2026-04-10 16:04:58 +00:00
Compare commits
1 Commits
dev
...
kit/effect
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ea19ee7966 |
@@ -7,7 +7,7 @@ import { Observability } from "./oltp"
|
||||
|
||||
export const memoMap = Layer.makeMemoMapUnsafe()
|
||||
|
||||
function attach<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
|
||||
export function attach<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
|
||||
try {
|
||||
const ctx = Instance.current
|
||||
return Effect.provideService(effect, InstanceRef, ctx)
|
||||
|
||||
@@ -45,7 +45,7 @@ import { decodeDataUrl } from "@/util/data-url"
|
||||
import { Process } from "@/util/process"
|
||||
import { Cause, Effect, Exit, Layer, Option, Scope, ServiceMap } from "effect"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import { attach, makeRuntime } from "@/effect/run-service"
|
||||
import { TaskTool } from "@/tool/task"
|
||||
import { SessionRunState } from "./run-state"
|
||||
|
||||
@@ -62,6 +62,64 @@ IMPORTANT:
|
||||
|
||||
const STRUCTURED_OUTPUT_SYSTEM_PROMPT = `IMPORTANT: The user has requested structured output. You MUST use the StructuredOutput tool to provide your final response. Do NOT respond with plain text - you MUST call the StructuredOutput tool with your answer formatted according to the schema.`
|
||||
|
||||
/**
|
||||
* Bridges an AI SDK Promise-based `execute` callback to Effect with graceful
|
||||
* cancel semantics.
|
||||
*
|
||||
* On the happy path: runs `before`, awaits `execute()`, then `finalize(result)`
|
||||
* and returns the output.
|
||||
*
|
||||
* On cancel mid-flight: the `onInterrupt` finalizer re-awaits the same in-flight
|
||||
* native Promise uninterruptibly, runs `finalize` again on the eventual result,
|
||||
* and posts it via `onCancel` (the processor side channel). This is what lets
|
||||
* cancelled bash surface its truncated output through the normal completion
|
||||
* path instead of getting stamped as aborted by processor cleanup.
|
||||
*
|
||||
* The returned Promise always resolves with a finalized output when one is
|
||||
* available (even on interrupt), so the SDK reports the tool as successfully
|
||||
* completed rather than as a tool-error.
|
||||
*
|
||||
* `attach` captures the current Instance context via InstanceRef so the
|
||||
* onInterrupt finalizer — which runs outside the AsyncLocalStorage chain
|
||||
* `execute()` is called from — can still resolve it through the ServiceMap.
|
||||
*/
|
||||
function runToolExecute<Raw, Output>(options: {
|
||||
signal: AbortSignal | undefined
|
||||
before: Effect.Effect<unknown, any, any>
|
||||
execute: () => Promise<Raw>
|
||||
finalize: (result: Raw) => Effect.Effect<Output, any, any>
|
||||
onCancel: (output: Output) => Effect.Effect<unknown, any, any>
|
||||
}): Promise<Output> {
|
||||
let pending: Promise<Raw> | undefined
|
||||
let rescued: Output | undefined
|
||||
const wait = Effect.promise(() => pending!)
|
||||
|
||||
const program = Effect.gen(function* () {
|
||||
yield* options.before
|
||||
pending = options.execute()
|
||||
const result = yield* wait
|
||||
return yield* options.finalize(result)
|
||||
}).pipe(
|
||||
// On interrupt, re-await the in-flight Promise uninterruptibly (finalizers
|
||||
// always are), finalize it the same way, and post through the side channel.
|
||||
// Stash the output so catchCause below can surface it instead of the cause.
|
||||
Effect.onInterrupt(() =>
|
||||
Effect.gen(function* () {
|
||||
if (pending === undefined) return
|
||||
const result = yield* wait
|
||||
const output = yield* options.finalize(result)
|
||||
rescued = output
|
||||
yield* options.onCancel(output)
|
||||
}).pipe(Effect.catchCause(() => Effect.void)),
|
||||
),
|
||||
Effect.catchCause((cause) =>
|
||||
Effect.suspend(() => (rescued !== undefined ? Effect.succeed(rescued!) : Effect.failCause(cause))),
|
||||
),
|
||||
)
|
||||
|
||||
return Effect.runPromise(attach(program as Effect.Effect<Output>), { signal: options.signal })
|
||||
}
|
||||
|
||||
export namespace SessionPrompt {
|
||||
const log = Log.create({ service: "session.prompt" })
|
||||
|
||||
@@ -396,35 +454,28 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
description: item.description,
|
||||
inputSchema: jsonSchema(schema as any),
|
||||
execute(args, options) {
|
||||
return Effect.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const ctx = context(args, options)
|
||||
yield* plugin.trigger(
|
||||
"tool.execute.before",
|
||||
{ tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID },
|
||||
{ args },
|
||||
)
|
||||
const result = yield* Effect.promise(() => item.execute(args, ctx))
|
||||
const output = {
|
||||
...result,
|
||||
attachments: result.attachments?.map((attachment) => ({
|
||||
...attachment,
|
||||
id: PartID.ascending(),
|
||||
sessionID: ctx.sessionID,
|
||||
messageID: input.processor.message.id,
|
||||
})),
|
||||
}
|
||||
yield* plugin.trigger(
|
||||
"tool.execute.after",
|
||||
{ tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID, args },
|
||||
output,
|
||||
)
|
||||
if (options.abortSignal?.aborted) {
|
||||
yield* input.processor.completeToolCall(options.toolCallId, output)
|
||||
}
|
||||
return output
|
||||
}),
|
||||
)
|
||||
const ctx = context(args, options)
|
||||
const meta = { tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID }
|
||||
return runToolExecute({
|
||||
signal: options.abortSignal,
|
||||
before: plugin.trigger("tool.execute.before", meta, { args }),
|
||||
execute: () => item.execute(args, ctx),
|
||||
finalize: (result) =>
|
||||
Effect.gen(function* () {
|
||||
const output = {
|
||||
...result,
|
||||
attachments: result.attachments?.map((attachment) => ({
|
||||
...attachment,
|
||||
id: PartID.ascending(),
|
||||
sessionID: ctx.sessionID,
|
||||
messageID: input.processor.message.id,
|
||||
})),
|
||||
}
|
||||
yield* plugin.trigger("tool.execute.after", { ...meta, args }, output)
|
||||
return output
|
||||
}),
|
||||
onCancel: (output) => input.processor.completeToolCall(options.toolCallId, output),
|
||||
})
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -436,74 +487,64 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
const schema = yield* Effect.promise(() => Promise.resolve(asSchema(item.inputSchema).jsonSchema))
|
||||
const transformed = ProviderTransform.schema(input.model, schema)
|
||||
item.inputSchema = jsonSchema(transformed)
|
||||
item.execute = (args, opts) =>
|
||||
Effect.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const ctx = context(args, opts)
|
||||
yield* plugin.trigger(
|
||||
"tool.execute.before",
|
||||
{ tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId },
|
||||
{ args },
|
||||
)
|
||||
item.execute = (args, opts) => {
|
||||
const ctx = context(args, opts)
|
||||
const meta = { tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId }
|
||||
type Raw = Awaited<ReturnType<NonNullable<typeof execute>>>
|
||||
return runToolExecute({
|
||||
signal: opts.abortSignal,
|
||||
before: Effect.gen(function* () {
|
||||
yield* plugin.trigger("tool.execute.before", meta, { args })
|
||||
yield* Effect.promise(() => ctx.ask({ permission: key, metadata: {}, patterns: ["*"], always: ["*"] }))
|
||||
const result: Awaited<ReturnType<NonNullable<typeof execute>>> = yield* Effect.promise(() =>
|
||||
execute(args, opts),
|
||||
)
|
||||
yield* plugin.trigger(
|
||||
"tool.execute.after",
|
||||
{ tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId, args },
|
||||
result,
|
||||
)
|
||||
|
||||
const textParts: string[] = []
|
||||
const attachments: Omit<MessageV2.FilePart, "id" | "sessionID" | "messageID">[] = []
|
||||
for (const contentItem of result.content) {
|
||||
if (contentItem.type === "text") textParts.push(contentItem.text)
|
||||
else if (contentItem.type === "image") {
|
||||
attachments.push({
|
||||
type: "file",
|
||||
mime: contentItem.mimeType,
|
||||
url: `data:${contentItem.mimeType};base64,${contentItem.data}`,
|
||||
})
|
||||
} else if (contentItem.type === "resource") {
|
||||
const { resource } = contentItem
|
||||
if (resource.text) textParts.push(resource.text)
|
||||
if (resource.blob) {
|
||||
}),
|
||||
execute: (): Promise<Raw> => execute(args, opts),
|
||||
finalize: (result) =>
|
||||
Effect.gen(function* () {
|
||||
yield* plugin.trigger("tool.execute.after", { ...meta, args }, result)
|
||||
const textParts: string[] = []
|
||||
const attachments: Omit<MessageV2.FilePart, "id" | "sessionID" | "messageID">[] = []
|
||||
for (const contentItem of result.content) {
|
||||
if (contentItem.type === "text") textParts.push(contentItem.text)
|
||||
else if (contentItem.type === "image") {
|
||||
attachments.push({
|
||||
type: "file",
|
||||
mime: resource.mimeType ?? "application/octet-stream",
|
||||
url: `data:${resource.mimeType ?? "application/octet-stream"};base64,${resource.blob}`,
|
||||
filename: resource.uri,
|
||||
mime: contentItem.mimeType,
|
||||
url: `data:${contentItem.mimeType};base64,${contentItem.data}`,
|
||||
})
|
||||
} else if (contentItem.type === "resource") {
|
||||
const { resource } = contentItem
|
||||
if (resource.text) textParts.push(resource.text)
|
||||
if (resource.blob) {
|
||||
attachments.push({
|
||||
type: "file",
|
||||
mime: resource.mimeType ?? "application/octet-stream",
|
||||
url: `data:${resource.mimeType ?? "application/octet-stream"};base64,${resource.blob}`,
|
||||
filename: resource.uri,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const truncated = yield* truncate.output(textParts.join("\n\n"), {}, input.agent)
|
||||
const metadata = {
|
||||
...(result.metadata ?? {}),
|
||||
truncated: truncated.truncated,
|
||||
...(truncated.truncated && { outputPath: truncated.outputPath }),
|
||||
}
|
||||
|
||||
const output = {
|
||||
title: "",
|
||||
metadata,
|
||||
output: truncated.content,
|
||||
attachments: attachments.map((attachment) => ({
|
||||
...attachment,
|
||||
id: PartID.ascending(),
|
||||
sessionID: ctx.sessionID,
|
||||
messageID: input.processor.message.id,
|
||||
})),
|
||||
content: result.content,
|
||||
}
|
||||
if (opts.abortSignal?.aborted) {
|
||||
yield* input.processor.completeToolCall(opts.toolCallId, output)
|
||||
}
|
||||
return output
|
||||
}),
|
||||
)
|
||||
const truncated = yield* truncate.output(textParts.join("\n\n"), {}, input.agent)
|
||||
return {
|
||||
title: "",
|
||||
metadata: {
|
||||
...(result.metadata ?? {}),
|
||||
truncated: truncated.truncated,
|
||||
...(truncated.truncated && { outputPath: truncated.outputPath }),
|
||||
},
|
||||
output: truncated.content,
|
||||
attachments: attachments.map((attachment) => ({
|
||||
...attachment,
|
||||
id: PartID.ascending(),
|
||||
sessionID: ctx.sessionID,
|
||||
messageID: input.processor.message.id,
|
||||
})),
|
||||
content: result.content,
|
||||
}
|
||||
}),
|
||||
onCancel: (output) => input.processor.completeToolCall(opts.toolCallId, output),
|
||||
})
|
||||
}
|
||||
tools[key] = item
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user