mirror of
https://github.com/anomalyco/opencode.git
synced 2026-04-23 22:34:53 +00:00
Merge origin/dev into fix/interrupted-bash-output
This commit is contained in:
@@ -46,7 +46,7 @@ export namespace FileTime {
|
||||
const disableCheck = yield* Flag.OPENCODE_DISABLE_FILETIME_CHECK
|
||||
|
||||
const stamp = Effect.fnUntraced(function* (file: string) {
|
||||
const info = yield* fsys.stat(file).pipe(Effect.catch(() => Effect.succeed(undefined)))
|
||||
const info = yield* fsys.stat(file).pipe(Effect.catch(() => Effect.void))
|
||||
return {
|
||||
read: yield* DateTime.nowAsDate,
|
||||
mtime: info ? Option.getOrUndefined(info.mtime)?.getTime() : undefined,
|
||||
|
||||
@@ -501,7 +501,7 @@ export namespace MCP {
|
||||
return
|
||||
}
|
||||
|
||||
const result = yield* create(key, mcp).pipe(Effect.catch(() => Effect.succeed(undefined)))
|
||||
const result = yield* create(key, mcp).pipe(Effect.catch(() => Effect.void))
|
||||
if (!result) return
|
||||
|
||||
s.status[key] = result.status
|
||||
|
||||
@@ -158,7 +158,7 @@ export namespace Project {
|
||||
return yield* fs.readFileString(pathSvc.join(dir, "opencode")).pipe(
|
||||
Effect.map((x) => x.trim()),
|
||||
Effect.map(ProjectID.make),
|
||||
Effect.catch(() => Effect.succeed(undefined)),
|
||||
Effect.catch(() => Effect.void),
|
||||
)
|
||||
})
|
||||
|
||||
|
||||
@@ -253,23 +253,21 @@ When constructing the summary, try to stick to this template:
|
||||
sessionID: input.sessionID,
|
||||
model,
|
||||
})
|
||||
const result = yield* processor
|
||||
.process({
|
||||
user: userMessage,
|
||||
agent,
|
||||
sessionID: input.sessionID,
|
||||
tools: {},
|
||||
system: [],
|
||||
messages: [
|
||||
...modelMessages,
|
||||
{
|
||||
role: "user",
|
||||
content: [{ type: "text", text: prompt }],
|
||||
},
|
||||
],
|
||||
model,
|
||||
})
|
||||
.pipe(Effect.onInterrupt(() => processor.abort()))
|
||||
const result = yield* processor.process({
|
||||
user: userMessage,
|
||||
agent,
|
||||
sessionID: input.sessionID,
|
||||
tools: {},
|
||||
system: [],
|
||||
messages: [
|
||||
...modelMessages,
|
||||
{
|
||||
role: "user",
|
||||
content: [{ type: "text", text: prompt }],
|
||||
},
|
||||
],
|
||||
model,
|
||||
})
|
||||
|
||||
if (result === "compact") {
|
||||
processor.message.error = new MessageV2.ContextOverflowError({
|
||||
|
||||
@@ -31,7 +31,6 @@ export namespace SessionProcessor {
|
||||
export interface Handle {
|
||||
readonly message: MessageV2.Assistant
|
||||
readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined
|
||||
readonly abort: () => Effect.Effect<void>
|
||||
readonly process: (streamInput: LLM.StreamInput) => Effect.Effect<Result>
|
||||
}
|
||||
|
||||
@@ -432,19 +431,6 @@ export namespace SessionProcessor {
|
||||
yield* status.set(ctx.sessionID, { type: "idle" })
|
||||
})
|
||||
|
||||
const abort = Effect.fn("SessionProcessor.abort")(() =>
|
||||
Effect.gen(function* () {
|
||||
if (!ctx.assistantMessage.error) {
|
||||
yield* halt(new DOMException("Aborted", "AbortError"))
|
||||
}
|
||||
if (!ctx.assistantMessage.time.completed) {
|
||||
yield* cleanup()
|
||||
return
|
||||
}
|
||||
yield* session.updateMessage(ctx.assistantMessage)
|
||||
}),
|
||||
)
|
||||
|
||||
const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
|
||||
log.info("process")
|
||||
ctx.needsCompaction = false
|
||||
@@ -462,7 +448,14 @@ export namespace SessionProcessor {
|
||||
Stream.runDrain,
|
||||
)
|
||||
}).pipe(
|
||||
Effect.onInterrupt(() => Effect.sync(() => void (aborted = true))),
|
||||
Effect.onInterrupt(() =>
|
||||
Effect.gen(function* () {
|
||||
aborted = true
|
||||
if (!ctx.assistantMessage.error) {
|
||||
yield* halt(new DOMException("Aborted", "AbortError"))
|
||||
}
|
||||
}),
|
||||
),
|
||||
Effect.catchCauseIf(
|
||||
(cause) => !Cause.hasInterruptsOnly(cause),
|
||||
(cause) => Effect.fail(Cause.squash(cause)),
|
||||
@@ -483,13 +476,10 @@ export namespace SessionProcessor {
|
||||
Effect.ensuring(cleanup()),
|
||||
)
|
||||
|
||||
if (aborted && !ctx.assistantMessage.error) {
|
||||
yield* abort()
|
||||
}
|
||||
if (ctx.needsCompaction) return "compact"
|
||||
if (ctx.blocked || ctx.assistantMessage.error || aborted) return "stop"
|
||||
if (ctx.blocked || ctx.assistantMessage.error) return "stop"
|
||||
return "continue"
|
||||
}).pipe(Effect.onInterrupt(() => abort().pipe(Effect.asVoid)))
|
||||
})
|
||||
})
|
||||
|
||||
return {
|
||||
@@ -499,7 +489,6 @@ export namespace SessionProcessor {
|
||||
partFromToolCall(toolCallID: string) {
|
||||
return ctx.toolcalls[toolCallID]
|
||||
},
|
||||
abort,
|
||||
process,
|
||||
} satisfies Handle
|
||||
})
|
||||
|
||||
@@ -559,7 +559,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
}) {
|
||||
const { task, model, lastUser, sessionID, session, msgs } = input
|
||||
const ctx = yield* InstanceState.context
|
||||
const taskTool = yield* registry.fromID(TaskTool.id)
|
||||
const { task: taskTool } = yield* registry.named()
|
||||
const taskModel = task.model ? yield* getModel(task.model.providerID, task.model.modelID, sessionID) : model
|
||||
const assistantMessage: MessageV2.Assistant = yield* sessions.updateMessage({
|
||||
id: MessageID.ascending(),
|
||||
@@ -964,9 +964,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
const same = ag.model && model.providerID === ag.model.providerID && model.modelID === ag.model.modelID
|
||||
const full =
|
||||
!input.variant && ag.variant && same
|
||||
? yield* provider
|
||||
.getModel(model.providerID, model.modelID)
|
||||
.pipe(Effect.catch(() => Effect.succeed(undefined)))
|
||||
? yield* provider.getModel(model.providerID, model.modelID).pipe(Effect.catchDefect(() => Effect.void))
|
||||
: undefined
|
||||
const variant = input.variant ?? (ag.variant && full?.variants?.[ag.variant] ? ag.variant : undefined)
|
||||
|
||||
@@ -986,9 +984,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
format: input.format,
|
||||
}
|
||||
|
||||
yield* Effect.addFinalizer(() =>
|
||||
InstanceState.withALS(() => instruction.clear(info.id)).pipe(Effect.flatMap((x) => x)),
|
||||
)
|
||||
yield* Effect.addFinalizer(() => instruction.clear(info.id))
|
||||
|
||||
type Draft<T> = T extends MessageV2.Part ? Omit<T, "id"> & { id?: string } : never
|
||||
const assign = (part: Draft<MessageV2.Part>): MessageV2.Part => ({
|
||||
@@ -1080,6 +1076,21 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
const filepath = fileURLToPath(part.url)
|
||||
if (yield* fsys.isDir(filepath)) part.mime = "application/x-directory"
|
||||
|
||||
const { read } = yield* registry.named()
|
||||
const execRead = (args: Parameters<typeof read.execute>[0], extra?: Tool.Context["extra"]) =>
|
||||
Effect.promise((signal: AbortSignal) =>
|
||||
read.execute(args, {
|
||||
sessionID: input.sessionID,
|
||||
abort: signal,
|
||||
agent: input.agent!,
|
||||
messageID: info.id,
|
||||
extra: { bypassCwdCheck: true, ...extra },
|
||||
messages: [],
|
||||
metadata: async () => {},
|
||||
ask: async () => {},
|
||||
}),
|
||||
)
|
||||
|
||||
if (part.mime === "text/plain") {
|
||||
let offset: number | undefined
|
||||
let limit: number | undefined
|
||||
@@ -1116,29 +1127,12 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
text: `Called the Read tool with the following input: ${JSON.stringify(args)}`,
|
||||
},
|
||||
]
|
||||
const read = yield* registry.fromID("read").pipe(
|
||||
Effect.flatMap((t) =>
|
||||
provider.getModel(info.model.providerID, info.model.modelID).pipe(
|
||||
Effect.flatMap((mdl) =>
|
||||
Effect.promise(() =>
|
||||
t.execute(args, {
|
||||
sessionID: input.sessionID,
|
||||
abort: new AbortController().signal,
|
||||
agent: input.agent!,
|
||||
messageID: info.id,
|
||||
extra: { bypassCwdCheck: true, model: mdl },
|
||||
messages: [],
|
||||
metadata: async () => {},
|
||||
ask: async () => {},
|
||||
}),
|
||||
),
|
||||
),
|
||||
),
|
||||
),
|
||||
const exit = yield* provider.getModel(info.model.providerID, info.model.modelID).pipe(
|
||||
Effect.flatMap((mdl) => execRead(args, { model: mdl })),
|
||||
Effect.exit,
|
||||
)
|
||||
if (Exit.isSuccess(read)) {
|
||||
const result = read.value
|
||||
if (Exit.isSuccess(exit)) {
|
||||
const result = exit.value
|
||||
pieces.push({
|
||||
messageID: info.id,
|
||||
sessionID: input.sessionID,
|
||||
@@ -1160,7 +1154,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
pieces.push({ ...part, messageID: info.id, sessionID: input.sessionID })
|
||||
}
|
||||
} else {
|
||||
const error = Cause.squash(read.cause)
|
||||
const error = Cause.squash(exit.cause)
|
||||
log.error("failed to read file", { error })
|
||||
const message = error instanceof Error ? error.message : String(error)
|
||||
yield* bus.publish(Session.Event.Error, {
|
||||
@@ -1180,22 +1174,25 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
|
||||
if (part.mime === "application/x-directory") {
|
||||
const args = { filePath: filepath }
|
||||
const result = yield* registry.fromID("read").pipe(
|
||||
Effect.flatMap((t) =>
|
||||
Effect.promise(() =>
|
||||
t.execute(args, {
|
||||
sessionID: input.sessionID,
|
||||
abort: new AbortController().signal,
|
||||
agent: input.agent!,
|
||||
messageID: info.id,
|
||||
extra: { bypassCwdCheck: true },
|
||||
messages: [],
|
||||
metadata: async () => {},
|
||||
ask: async () => {},
|
||||
}),
|
||||
),
|
||||
),
|
||||
)
|
||||
const exit = yield* execRead(args).pipe(Effect.exit)
|
||||
if (Exit.isFailure(exit)) {
|
||||
const error = Cause.squash(exit.cause)
|
||||
log.error("failed to read directory", { error })
|
||||
const message = error instanceof Error ? error.message : String(error)
|
||||
yield* bus.publish(Session.Event.Error, {
|
||||
sessionID: input.sessionID,
|
||||
error: new NamedError.Unknown({ message }).toObject(),
|
||||
})
|
||||
return [
|
||||
{
|
||||
messageID: info.id,
|
||||
sessionID: input.sessionID,
|
||||
type: "text",
|
||||
synthetic: true,
|
||||
text: `Read tool failed to read ${filepath} with the following error: ${message}`,
|
||||
},
|
||||
]
|
||||
}
|
||||
return [
|
||||
{
|
||||
messageID: info.id,
|
||||
@@ -1209,7 +1206,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
sessionID: input.sessionID,
|
||||
type: "text",
|
||||
synthetic: true,
|
||||
text: result.output,
|
||||
text: exit.value.output,
|
||||
},
|
||||
{ ...part, messageID: info.id, sessionID: input.sessionID },
|
||||
]
|
||||
@@ -1458,110 +1455,104 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
model,
|
||||
})
|
||||
|
||||
const outcome: "break" | "continue" = yield* Effect.onExit(
|
||||
Effect.gen(function* () {
|
||||
const lastUserMsg = msgs.findLast((m) => m.info.role === "user")
|
||||
const bypassAgentCheck = lastUserMsg?.parts.some((p) => p.type === "agent") ?? false
|
||||
const outcome: "break" | "continue" = yield* Effect.gen(function* () {
|
||||
const lastUserMsg = msgs.findLast((m) => m.info.role === "user")
|
||||
const bypassAgentCheck = lastUserMsg?.parts.some((p) => p.type === "agent") ?? false
|
||||
|
||||
const tools = yield* resolveTools({
|
||||
agent,
|
||||
session,
|
||||
model,
|
||||
tools: lastUser.tools,
|
||||
processor: handle,
|
||||
bypassAgentCheck,
|
||||
messages: msgs,
|
||||
const tools = yield* resolveTools({
|
||||
agent,
|
||||
session,
|
||||
model,
|
||||
tools: lastUser.tools,
|
||||
processor: handle,
|
||||
bypassAgentCheck,
|
||||
messages: msgs,
|
||||
})
|
||||
|
||||
if (lastUser.format?.type === "json_schema") {
|
||||
tools["StructuredOutput"] = createStructuredOutputTool({
|
||||
schema: lastUser.format.schema,
|
||||
onSuccess(output) {
|
||||
structured = output
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
if (lastUser.format?.type === "json_schema") {
|
||||
tools["StructuredOutput"] = createStructuredOutputTool({
|
||||
schema: lastUser.format.schema,
|
||||
onSuccess(output) {
|
||||
structured = output
|
||||
},
|
||||
})
|
||||
}
|
||||
if (step === 1) SessionSummary.summarize({ sessionID, messageID: lastUser.id })
|
||||
|
||||
if (step === 1) SessionSummary.summarize({ sessionID, messageID: lastUser.id })
|
||||
|
||||
if (step > 1 && lastFinished) {
|
||||
for (const m of msgs) {
|
||||
if (m.info.role !== "user" || m.info.id <= lastFinished.id) continue
|
||||
for (const p of m.parts) {
|
||||
if (p.type !== "text" || p.ignored || p.synthetic) continue
|
||||
if (!p.text.trim()) continue
|
||||
p.text = [
|
||||
"<system-reminder>",
|
||||
"The user sent the following message:",
|
||||
p.text,
|
||||
"",
|
||||
"Please address this message and continue with your tasks.",
|
||||
"</system-reminder>",
|
||||
].join("\n")
|
||||
}
|
||||
if (step > 1 && lastFinished) {
|
||||
for (const m of msgs) {
|
||||
if (m.info.role !== "user" || m.info.id <= lastFinished.id) continue
|
||||
for (const p of m.parts) {
|
||||
if (p.type !== "text" || p.ignored || p.synthetic) continue
|
||||
if (!p.text.trim()) continue
|
||||
p.text = [
|
||||
"<system-reminder>",
|
||||
"The user sent the following message:",
|
||||
p.text,
|
||||
"",
|
||||
"Please address this message and continue with your tasks.",
|
||||
"</system-reminder>",
|
||||
].join("\n")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
|
||||
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
|
||||
|
||||
const [skills, env, instructions, modelMsgs] = yield* Effect.all([
|
||||
Effect.promise(() => SystemPrompt.skills(agent)),
|
||||
Effect.promise(() => SystemPrompt.environment(model)),
|
||||
instruction.system().pipe(Effect.orDie),
|
||||
MessageV2.toModelMessagesEffect(msgs, model),
|
||||
])
|
||||
const system = [...env, ...(skills ? [skills] : []), ...instructions]
|
||||
const format = lastUser.format ?? { type: "text" as const }
|
||||
if (format.type === "json_schema") system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT)
|
||||
const result = yield* handle.process({
|
||||
user: lastUser,
|
||||
agent,
|
||||
permission: session.permission,
|
||||
sessionID,
|
||||
parentSessionID: session.parentID,
|
||||
system,
|
||||
messages: [...modelMsgs, ...(isLastStep ? [{ role: "assistant" as const, content: MAX_STEPS }] : [])],
|
||||
tools,
|
||||
model,
|
||||
toolChoice: format.type === "json_schema" ? "required" : undefined,
|
||||
})
|
||||
const [skills, env, instructions, modelMsgs] = yield* Effect.all([
|
||||
Effect.promise(() => SystemPrompt.skills(agent)),
|
||||
Effect.promise(() => SystemPrompt.environment(model)),
|
||||
instruction.system().pipe(Effect.orDie),
|
||||
MessageV2.toModelMessagesEffect(msgs, model),
|
||||
])
|
||||
const system = [...env, ...(skills ? [skills] : []), ...instructions]
|
||||
const format = lastUser.format ?? { type: "text" as const }
|
||||
if (format.type === "json_schema") system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT)
|
||||
const result = yield* handle.process({
|
||||
user: lastUser,
|
||||
agent,
|
||||
permission: session.permission,
|
||||
sessionID,
|
||||
parentSessionID: session.parentID,
|
||||
system,
|
||||
messages: [...modelMsgs, ...(isLastStep ? [{ role: "assistant" as const, content: MAX_STEPS }] : [])],
|
||||
tools,
|
||||
model,
|
||||
toolChoice: format.type === "json_schema" ? "required" : undefined,
|
||||
})
|
||||
|
||||
if (structured !== undefined) {
|
||||
handle.message.structured = structured
|
||||
handle.message.finish = handle.message.finish ?? "stop"
|
||||
if (structured !== undefined) {
|
||||
handle.message.structured = structured
|
||||
handle.message.finish = handle.message.finish ?? "stop"
|
||||
yield* sessions.updateMessage(handle.message)
|
||||
return "break" as const
|
||||
}
|
||||
|
||||
const finished = handle.message.finish && !["tool-calls", "unknown"].includes(handle.message.finish)
|
||||
if (finished && !handle.message.error) {
|
||||
if (format.type === "json_schema") {
|
||||
handle.message.error = new MessageV2.StructuredOutputError({
|
||||
message: "Model did not produce structured output",
|
||||
retries: 0,
|
||||
}).toObject()
|
||||
yield* sessions.updateMessage(handle.message)
|
||||
return "break" as const
|
||||
}
|
||||
}
|
||||
|
||||
const finished = handle.message.finish && !["tool-calls", "unknown"].includes(handle.message.finish)
|
||||
if (finished && !handle.message.error) {
|
||||
if (format.type === "json_schema") {
|
||||
handle.message.error = new MessageV2.StructuredOutputError({
|
||||
message: "Model did not produce structured output",
|
||||
retries: 0,
|
||||
}).toObject()
|
||||
yield* sessions.updateMessage(handle.message)
|
||||
return "break" as const
|
||||
}
|
||||
}
|
||||
|
||||
if (result === "stop") return "break" as const
|
||||
if (result === "compact") {
|
||||
yield* compaction.create({
|
||||
sessionID,
|
||||
agent: lastUser.agent,
|
||||
model: lastUser.model,
|
||||
auto: true,
|
||||
overflow: !handle.message.finish,
|
||||
})
|
||||
}
|
||||
return "continue" as const
|
||||
}),
|
||||
Effect.fnUntraced(function* (exit) {
|
||||
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) yield* handle.abort()
|
||||
yield* InstanceState.withALS(() => instruction.clear(handle.message.id)).pipe(Effect.flatMap((x) => x))
|
||||
}),
|
||||
)
|
||||
if (result === "stop") return "break" as const
|
||||
if (result === "compact") {
|
||||
yield* compaction.create({
|
||||
sessionID,
|
||||
agent: lastUser.agent,
|
||||
model: lastUser.model,
|
||||
auto: true,
|
||||
overflow: !handle.message.finish,
|
||||
})
|
||||
}
|
||||
return "continue" as const
|
||||
}).pipe(Effect.ensuring(instruction.clear(handle.message.id)))
|
||||
if (outcome === "break") break
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -67,9 +67,7 @@ export const ReadTool = Tool.defineEffect(
|
||||
if (item.type === "directory") return item.name + "/"
|
||||
if (item.type !== "symlink") return item.name
|
||||
|
||||
const target = yield* fs
|
||||
.stat(path.join(filepath, item.name))
|
||||
.pipe(Effect.catch(() => Effect.succeed(undefined)))
|
||||
const target = yield* fs.stat(path.join(filepath, item.name)).pipe(Effect.catch(() => Effect.void))
|
||||
if (target?.type === "Directory") return item.name + "/"
|
||||
return item.name
|
||||
}),
|
||||
|
||||
@@ -42,24 +42,25 @@ import { Agent } from "../agent/agent"
|
||||
export namespace ToolRegistry {
|
||||
const log = Log.create({ service: "tool.registry" })
|
||||
|
||||
type TaskDef = Tool.InferDef<typeof TaskTool>
|
||||
type ReadDef = Tool.InferDef<typeof ReadTool>
|
||||
|
||||
type State = {
|
||||
custom: Tool.Def[]
|
||||
builtin: Tool.Def[]
|
||||
task: TaskDef
|
||||
read: ReadDef
|
||||
}
|
||||
|
||||
export interface Interface {
|
||||
readonly ids: () => Effect.Effect<string[]>
|
||||
readonly all: () => Effect.Effect<Tool.Def[]>
|
||||
readonly named: {
|
||||
task: Tool.Info
|
||||
read: Tool.Info
|
||||
}
|
||||
readonly named: () => Effect.Effect<{ task: TaskDef; read: ReadDef }>
|
||||
readonly tools: (model: {
|
||||
providerID: ProviderID
|
||||
modelID: ModelID
|
||||
agent: Agent.Info
|
||||
}) => Effect.Effect<Tool.Def[]>
|
||||
readonly fromID: (id: string) => Effect.Effect<Tool.Def>
|
||||
}
|
||||
|
||||
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/ToolRegistry") {}
|
||||
@@ -183,6 +184,8 @@ export namespace ToolRegistry {
|
||||
...(Flag.OPENCODE_EXPERIMENTAL_LSP_TOOL ? [tool.lsp] : []),
|
||||
...(Flag.OPENCODE_EXPERIMENTAL_PLAN_MODE && Flag.OPENCODE_CLIENT === "cli" ? [tool.plan] : []),
|
||||
],
|
||||
task: tool.task,
|
||||
read: tool.read,
|
||||
}
|
||||
}),
|
||||
)
|
||||
@@ -192,13 +195,6 @@ export namespace ToolRegistry {
|
||||
return [...s.builtin, ...s.custom] as Tool.Def[]
|
||||
})
|
||||
|
||||
const fromID: Interface["fromID"] = Effect.fn("ToolRegistry.fromID")(function* (id: string) {
|
||||
const tools = yield* all()
|
||||
const match = tools.find((tool) => tool.id === id)
|
||||
if (!match) return yield* Effect.die(`Tool not found: ${id}`)
|
||||
return match
|
||||
})
|
||||
|
||||
const ids: Interface["ids"] = Effect.fn("ToolRegistry.ids")(function* () {
|
||||
return (yield* all()).map((tool) => tool.id)
|
||||
})
|
||||
@@ -245,7 +241,12 @@ export namespace ToolRegistry {
|
||||
)
|
||||
})
|
||||
|
||||
return Service.of({ ids, all, named: { task, read }, tools, fromID })
|
||||
const named: Interface["named"] = Effect.fn("ToolRegistry.named")(function* () {
|
||||
const s = yield* InstanceState.get(state)
|
||||
return { task: s.task, read: s.read }
|
||||
})
|
||||
|
||||
return Service.of({ ids, all, named, tools })
|
||||
}),
|
||||
)
|
||||
|
||||
|
||||
@@ -60,6 +60,13 @@ export namespace Tool {
|
||||
export type InferMetadata<T> =
|
||||
T extends Info<any, infer M> ? M : T extends Effect.Effect<Info<any, infer M>, any, any> ? M : never
|
||||
|
||||
export type InferDef<T> =
|
||||
T extends Info<infer P, infer M>
|
||||
? Def<P, M>
|
||||
: T extends Effect.Effect<Info<infer P, infer M>, any, any>
|
||||
? Def<P, M>
|
||||
: never
|
||||
|
||||
function wrap<Parameters extends z.ZodType, Result extends Metadata>(
|
||||
id: string,
|
||||
init: (() => Promise<DefWithoutID<Parameters, Result>>) | DefWithoutID<Parameters, Result>,
|
||||
@@ -118,7 +125,7 @@ export namespace Tool {
|
||||
)
|
||||
}
|
||||
|
||||
export function init(info: Info): Effect.Effect<Def> {
|
||||
export function init<P extends z.ZodType, M extends Metadata>(info: Info<P, M>): Effect.Effect<Def<P, M>> {
|
||||
return Effect.gen(function* () {
|
||||
const init = yield* Effect.promise(() => info.init())
|
||||
return {
|
||||
|
||||
@@ -139,7 +139,6 @@ function fake(
|
||||
get message() {
|
||||
return msg
|
||||
},
|
||||
abort: Effect.fn("TestSessionProcessor.abort")(() => Effect.void),
|
||||
partFromToolCall() {
|
||||
return {
|
||||
id: PartID.ascending(),
|
||||
|
||||
@@ -593,9 +593,6 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup
|
||||
yield* Fiber.interrupt(run)
|
||||
|
||||
const exit = yield* Fiber.await(run)
|
||||
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
|
||||
yield* handle.abort()
|
||||
}
|
||||
const parts = MessageV2.parts(msg.id)
|
||||
const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
|
||||
|
||||
@@ -666,9 +663,6 @@ it.live("session.processor effect tests record aborted errors and idle state", (
|
||||
yield* Fiber.interrupt(run)
|
||||
|
||||
const exit = yield* Fiber.await(run)
|
||||
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
|
||||
yield* handle.abort()
|
||||
}
|
||||
yield* Effect.promise(() => seen.promise)
|
||||
const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id })
|
||||
const state = yield* sts.get(chat.id)
|
||||
|
||||
@@ -631,31 +631,22 @@ it.live(
|
||||
const ready = defer<void>()
|
||||
const aborted = defer<void>()
|
||||
const registry = yield* ToolRegistry.Service
|
||||
const init = registry.named.task.init
|
||||
registry.named.task.init = async () => ({
|
||||
description: "task",
|
||||
parameters: z.object({
|
||||
description: z.string(),
|
||||
prompt: z.string(),
|
||||
subagent_type: z.string(),
|
||||
task_id: z.string().optional(),
|
||||
command: z.string().optional(),
|
||||
}),
|
||||
execute: async (_args, ctx) => {
|
||||
ready.resolve()
|
||||
ctx.abort.addEventListener("abort", () => aborted.resolve(), { once: true })
|
||||
await new Promise<void>(() => {})
|
||||
return {
|
||||
title: "",
|
||||
metadata: {
|
||||
sessionId: SessionID.make("task"),
|
||||
model: ref,
|
||||
},
|
||||
output: "",
|
||||
}
|
||||
},
|
||||
})
|
||||
yield* Effect.addFinalizer(() => Effect.sync(() => void (registry.named.task.init = init)))
|
||||
const { task } = yield* registry.named()
|
||||
const original = task.execute
|
||||
task.execute = async (_args, ctx) => {
|
||||
ready.resolve()
|
||||
ctx.abort.addEventListener("abort", () => aborted.resolve(), { once: true })
|
||||
await new Promise<void>(() => {})
|
||||
return {
|
||||
title: "",
|
||||
metadata: {
|
||||
sessionId: SessionID.make("task"),
|
||||
model: ref,
|
||||
},
|
||||
output: "",
|
||||
}
|
||||
}
|
||||
yield* Effect.addFinalizer(() => Effect.sync(() => void (task.execute = original)))
|
||||
|
||||
const { prompt, chat } = yield* boot()
|
||||
const msg = yield* user(chat.id, "hello")
|
||||
@@ -1240,3 +1231,109 @@ unix(
|
||||
),
|
||||
30_000,
|
||||
)
|
||||
|
||||
// Abort signal propagation tests for inline tool execution
|
||||
|
||||
/** Override a tool's execute to hang until aborted. Returns ready/aborted defers and a finalizer. */
|
||||
function hangUntilAborted(tool: { execute: (...args: any[]) => any }) {
|
||||
const ready = defer<void>()
|
||||
const aborted = defer<void>()
|
||||
const original = tool.execute
|
||||
tool.execute = async (_args: any, ctx: any) => {
|
||||
ready.resolve()
|
||||
ctx.abort.addEventListener("abort", () => aborted.resolve(), { once: true })
|
||||
await new Promise<void>(() => {})
|
||||
return { title: "", metadata: {}, output: "" }
|
||||
}
|
||||
const restore = Effect.addFinalizer(() => Effect.sync(() => void (tool.execute = original)))
|
||||
return { ready, aborted, restore }
|
||||
}
|
||||
|
||||
it.live(
|
||||
"interrupt propagates abort signal to read tool via file part (text/plain)",
|
||||
() =>
|
||||
provideTmpdirInstance(
|
||||
(dir) =>
|
||||
Effect.gen(function* () {
|
||||
const registry = yield* ToolRegistry.Service
|
||||
const { read } = yield* registry.named()
|
||||
const { ready, aborted, restore } = hangUntilAborted(read)
|
||||
yield* restore
|
||||
|
||||
const prompt = yield* SessionPrompt.Service
|
||||
const sessions = yield* Session.Service
|
||||
const chat = yield* sessions.create({ title: "Abort Test" })
|
||||
|
||||
const testFile = path.join(dir, "test.txt")
|
||||
yield* Effect.promise(() => Bun.write(testFile, "hello world"))
|
||||
|
||||
const fiber = yield* prompt
|
||||
.prompt({
|
||||
sessionID: chat.id,
|
||||
agent: "build",
|
||||
parts: [
|
||||
{ type: "text", text: "read this" },
|
||||
{ type: "file", url: `file://${testFile}`, filename: "test.txt", mime: "text/plain" },
|
||||
],
|
||||
})
|
||||
.pipe(Effect.forkChild)
|
||||
|
||||
yield* Effect.promise(() => ready.promise)
|
||||
yield* Fiber.interrupt(fiber)
|
||||
|
||||
yield* Effect.promise(() =>
|
||||
Promise.race([
|
||||
aborted.promise,
|
||||
new Promise<void>((_, reject) =>
|
||||
setTimeout(() => reject(new Error("abort signal not propagated within 2s")), 2_000),
|
||||
),
|
||||
]),
|
||||
)
|
||||
}),
|
||||
{ git: true, config: cfg },
|
||||
),
|
||||
30_000,
|
||||
)
|
||||
|
||||
it.live(
|
||||
"interrupt propagates abort signal to read tool via file part (directory)",
|
||||
() =>
|
||||
provideTmpdirInstance(
|
||||
(dir) =>
|
||||
Effect.gen(function* () {
|
||||
const registry = yield* ToolRegistry.Service
|
||||
const { read } = yield* registry.named()
|
||||
const { ready, aborted, restore } = hangUntilAborted(read)
|
||||
yield* restore
|
||||
|
||||
const prompt = yield* SessionPrompt.Service
|
||||
const sessions = yield* Session.Service
|
||||
const chat = yield* sessions.create({ title: "Abort Test" })
|
||||
|
||||
const fiber = yield* prompt
|
||||
.prompt({
|
||||
sessionID: chat.id,
|
||||
agent: "build",
|
||||
parts: [
|
||||
{ type: "text", text: "read this" },
|
||||
{ type: "file", url: `file://${dir}`, filename: "dir", mime: "application/x-directory" },
|
||||
],
|
||||
})
|
||||
.pipe(Effect.forkChild)
|
||||
|
||||
yield* Effect.promise(() => ready.promise)
|
||||
yield* Fiber.interrupt(fiber)
|
||||
|
||||
yield* Effect.promise(() =>
|
||||
Promise.race([
|
||||
aborted.promise,
|
||||
new Promise<void>((_, reject) =>
|
||||
setTimeout(() => reject(new Error("abort signal not propagated within 2s")), 2_000),
|
||||
),
|
||||
]),
|
||||
)
|
||||
}),
|
||||
{ git: true, config: cfg },
|
||||
),
|
||||
30_000,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user