Remove abort from LLM and processor services

This commit is contained in:
Kit Langton
2026-03-28 21:35:24 -04:00
parent 724e599cb0
commit eb05bda03b
7 changed files with 446 additions and 245 deletions

View File

@@ -235,7 +235,6 @@ When constructing the summary, try to stick to this template:
assistantMessage: msg,
sessionID: input.sessionID,
model,
abort: input.abort,
})
const cancel = Effect.fn("SessionCompaction.cancel")(function* () {
if (!input.abort.aborted || msg.time.completed) return
@@ -248,7 +247,6 @@ When constructing the summary, try to stick to this template:
.process({
user: userMessage,
agent,
abort: input.abort,
sessionID: input.sessionID,
tools: {},
system: [],

View File

@@ -1,6 +1,6 @@
import { Provider } from "@/provider/provider"
import { Log } from "@/util/log"
import { Effect, Layer, ServiceMap } from "effect"
import { Effect, Layer, Record, ServiceMap } from "effect"
import * as Stream from "effect/Stream"
import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai"
import { mergeDeep, pipe } from "remeda"
@@ -28,7 +28,6 @@ export namespace LLM {
agent: Agent.Info
permission?: Permission.Ruleset
system: string[]
abort: AbortSignal
messages: ModelMessage[]
small?: boolean
tools: Record<string, Tool>
@@ -36,6 +35,10 @@ export namespace LLM {
toolChoice?: "auto" | "required" | "none"
}
export type StreamRequest = StreamInput & {
abort: AbortSignal
}
export type Event = Awaited<ReturnType<typeof stream>>["fullStream"] extends AsyncIterable<infer T> ? T : never
export interface Interface {
@@ -50,7 +53,7 @@ export namespace LLM {
return Service.of({
stream(input) {
return Stream.unwrap(
Effect.promise(() => LLM.stream(input)).pipe(
Effect.promise((signal) => LLM.stream({ ...input, abort: signal })).pipe(
Effect.map((result) =>
Stream.fromAsyncIterable(result.fullStream, (err) => err).pipe(
Stream.mapEffect((event) => Effect.succeed(event)),
@@ -65,7 +68,7 @@ export namespace LLM {
export const defaultLayer = layer
export async function stream(input: StreamInput) {
export async function stream(input: StreamRequest) {
const l = log
.clone()
.tag("providerID", input.model.providerID)
@@ -314,17 +317,12 @@ export namespace LLM {
})
}
async function resolveTools(input: Pick<StreamInput, "tools" | "agent" | "permission" | "user">) {
function resolveTools(input: Pick<StreamInput, "tools" | "agent" | "permission" | "user">) {
const disabled = Permission.disabled(
Object.keys(input.tools),
Permission.merge(input.agent.permission, input.permission ?? []),
)
for (const tool of Object.keys(input.tools)) {
if (input.user.tools?.[tool] === false || disabled.has(tool)) {
delete input.tools[tool]
}
}
return input.tools
return Record.filter(input.tools, (_, k) => input.user.tools?.[k] !== false && !disabled.has(k))
}
// Check if messages contain any tool-call content

View File

@@ -2,7 +2,6 @@ import { Cause, Effect, Exit, Layer, ServiceMap } from "effect"
import * as Stream from "effect/Stream"
import { Agent } from "@/agent/agent"
import { Bus } from "@/bus"
import { makeRuntime } from "@/effect/run-service"
import { Config } from "@/config/config"
import { Permission } from "@/permission"
import { Plugin } from "@/plugin"
@@ -35,17 +34,10 @@ export namespace SessionProcessor {
readonly process: (streamInput: LLM.StreamInput) => Effect.Effect<Result>
}
export interface Info {
readonly message: MessageV2.Assistant
readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined
readonly process: (streamInput: LLM.StreamInput) => Promise<Result>
}
type Input = {
assistantMessage: MessageV2.Assistant
sessionID: SessionID
model: Provider.Model
abort: AbortSignal
}
export interface Interface {
@@ -96,7 +88,6 @@ export namespace SessionProcessor {
assistantMessage: input.assistantMessage,
sessionID: input.sessionID,
model: input.model,
abort: input.abort,
toolcalls: {},
shouldBreak: false,
snapshot: undefined,
@@ -105,11 +96,12 @@ export namespace SessionProcessor {
currentText: undefined,
reasoningMap: {},
}
let aborted = false
const parse = (e: unknown) =>
MessageV2.fromError(e, {
providerID: input.model.providerID,
aborted: input.abort.aborted,
aborted,
})
const handleEvent = Effect.fn("SessionProcessor.handleEvent")(function* (value: StreamEvent) {
@@ -440,16 +432,12 @@ export namespace SessionProcessor {
const stream = llm.stream(streamInput)
yield* stream.pipe(
Stream.tap((event) =>
Effect.gen(function* () {
input.abort.throwIfAborted()
yield* handleEvent(event)
}),
),
Stream.tap((event) => handleEvent(event)),
Stream.takeUntil(() => ctx.needsCompaction),
Stream.runDrain,
)
}).pipe(
Effect.onInterrupt(() => Effect.sync(() => void (aborted = true))),
Effect.catchCauseIf(
(cause) => !Cause.hasInterruptsOnly(cause),
(cause) => Effect.fail(Cause.squash(cause)),
@@ -468,17 +456,20 @@ export namespace SessionProcessor {
),
Effect.catchCause((cause) =>
Cause.hasInterruptsOnly(cause)
? halt(new DOMException("Aborted", "AbortError"))
? Effect.gen(function* () {
aborted = true
yield* halt(new DOMException("Aborted", "AbortError"))
})
: halt(Cause.squash(cause)),
),
Effect.ensuring(cleanup()),
)
if (input.abort.aborted && !ctx.assistantMessage.error) {
if (aborted && !ctx.assistantMessage.error) {
yield* abort()
}
if (ctx.needsCompaction) return "compact"
if (ctx.blocked || ctx.assistantMessage.error || input.abort.aborted) return "stop"
if (ctx.blocked || ctx.assistantMessage.error || aborted) return "stop"
return "continue"
})
@@ -526,29 +517,4 @@ export namespace SessionProcessor {
),
),
)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function create(input: Input): Promise<Info> {
const hit = await runPromise((svc) => svc.create(input))
return {
get message() {
return hit.message
},
partFromToolCall(toolCallID: string) {
return hit.partFromToolCall(toolCallID)
},
async process(streamInput: LLM.StreamInput) {
const exit = await Effect.runPromiseExit(hit.process(streamInput), { signal: input.abort })
if (Exit.isFailure(exit)) {
if (Cause.hasInterrupts(exit.cause) && input.abort.aborted) {
await Effect.runPromise(hit.abort())
return "stop"
}
throw Cause.squash(exit.cause)
}
return exit.value
},
}
}
}

View File

@@ -201,7 +201,8 @@ export namespace SessionPrompt {
const text = yield* Effect.promise(async (signal) => {
const mdl = ag.model
? await Provider.getModel(ag.model.providerID, ag.model.modelID)
: (await Provider.getSmallModel(input.providerID)) ?? (await Provider.getModel(input.providerID, input.modelID))
: ((await Provider.getSmallModel(input.providerID)) ??
(await Provider.getModel(input.providerID, input.modelID)))
const msgs = onlySubtasks
? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }]
: await MessageV2.toModelMessages(context, mdl)
@@ -236,7 +237,9 @@ export namespace SessionPrompt {
const hint = e.data.suggestions?.length ? ` Did you mean: ${e.data.suggestions.join(", ")}?` : ""
Bus.publish(Session.Event.Error, {
sessionID,
error: new NamedError.Unknown({ message: `Model not found: ${e.data.providerID}/${e.data.modelID}.${hint}` }).toObject(),
error: new NamedError.Unknown({
message: `Model not found: ${e.data.providerID}/${e.data.modelID}.${hint}`,
}).toObject(),
})
}
throw e
@@ -288,7 +291,13 @@ export namespace SessionPrompt {
const { clientName, uri } = part.source
log.info("mcp resource", { clientName, uri, mime: part.mime })
const pieces: Draft<MessageV2.Part>[] = [
{ messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: `Reading MCP resource: ${part.filename} (${uri})` },
{
messageID: info.id,
sessionID: input.sessionID,
type: "text",
synthetic: true,
text: `Reading MCP resource: ${part.filename} (${uri})`,
},
]
try {
const content = await MCP.readResource(clientName, uri)
@@ -296,17 +305,35 @@ export namespace SessionPrompt {
const items = Array.isArray(content.contents) ? content.contents : [content.contents]
for (const c of items) {
if ("text" in c && c.text) {
pieces.push({ messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: c.text })
pieces.push({
messageID: info.id,
sessionID: input.sessionID,
type: "text",
synthetic: true,
text: c.text,
})
} else if ("blob" in c && c.blob) {
const mime = "mimeType" in c ? c.mimeType : part.mime
pieces.push({ messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: `[Binary content: ${mime}]` })
pieces.push({
messageID: info.id,
sessionID: input.sessionID,
type: "text",
synthetic: true,
text: `[Binary content: ${mime}]`,
})
}
}
pieces.push({ ...part, messageID: info.id, sessionID: input.sessionID })
} catch (error: unknown) {
log.error("failed to read MCP resource", { error, clientName, uri })
const message = error instanceof Error ? error.message : String(error)
pieces.push({ messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: `Failed to read MCP resource ${part.filename}: ${message}` })
pieces.push({
messageID: info.id,
sessionID: input.sessionID,
type: "text",
synthetic: true,
text: `Failed to read MCP resource ${part.filename}: ${message}`,
})
}
return pieces
}
@@ -315,8 +342,20 @@ export namespace SessionPrompt {
case "data:":
if (part.mime === "text/plain") {
return [
{ messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: `Called the Read tool with the following input: ${JSON.stringify({ filePath: part.filename })}` },
{ messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: decodeDataUrl(part.url) },
{
messageID: info.id,
sessionID: input.sessionID,
type: "text",
synthetic: true,
text: `Called the Read tool with the following input: ${JSON.stringify({ filePath: part.filename })}`,
},
{
messageID: info.id,
sessionID: input.sessionID,
type: "text",
synthetic: true,
text: decodeDataUrl(part.url),
},
{ ...part, messageID: info.id, sessionID: input.sessionID },
]
}
@@ -353,7 +392,13 @@ export namespace SessionPrompt {
}
const args = { filePath: filepath, offset, limit }
const pieces: Draft<MessageV2.Part>[] = [
{ messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: `Called the Read tool with the following input: ${JSON.stringify(args)}` },
{
messageID: info.id,
sessionID: input.sessionID,
type: "text",
synthetic: true,
text: `Called the Read tool with the following input: ${JSON.stringify(args)}`,
},
]
await ReadTool.init()
.then(async (t) => {
@@ -369,9 +414,23 @@ export namespace SessionPrompt {
ask: async () => {},
}
const result = await t.execute(args, ctx)
pieces.push({ messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: result.output })
pieces.push({
messageID: info.id,
sessionID: input.sessionID,
type: "text",
synthetic: true,
text: result.output,
})
if (result.attachments?.length) {
pieces.push(...result.attachments.map((a) => ({ ...a, synthetic: true, filename: a.filename ?? part.filename, messageID: info.id, sessionID: input.sessionID })))
pieces.push(
...result.attachments.map((a) => ({
...a,
synthetic: true,
filename: a.filename ?? part.filename,
messageID: info.id,
sessionID: input.sessionID,
})),
)
} else {
pieces.push({ ...part, messageID: info.id, sessionID: input.sessionID })
}
@@ -379,8 +438,17 @@ export namespace SessionPrompt {
.catch((error) => {
log.error("failed to read file", { error })
const message = error instanceof Error ? error.message : error.toString()
Bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: new NamedError.Unknown({ message }).toObject() })
pieces.push({ messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: `Read tool failed to read ${filepath} with the following error: ${message}` })
Bus.publish(Session.Event.Error, {
sessionID: input.sessionID,
error: new NamedError.Unknown({ message }).toObject(),
})
pieces.push({
messageID: info.id,
sessionID: input.sessionID,
type: "text",
synthetic: true,
text: `Read tool failed to read ${filepath} with the following error: ${message}`,
})
})
return pieces
}
@@ -399,15 +467,33 @@ export namespace SessionPrompt {
}
const result = await ReadTool.init().then((t) => t.execute(args, ctx))
return [
{ messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: `Called the Read tool with the following input: ${JSON.stringify(args)}` },
{ messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: result.output },
{
messageID: info.id,
sessionID: input.sessionID,
type: "text",
synthetic: true,
text: `Called the Read tool with the following input: ${JSON.stringify(args)}`,
},
{
messageID: info.id,
sessionID: input.sessionID,
type: "text",
synthetic: true,
text: result.output,
},
{ ...part, messageID: info.id, sessionID: input.sessionID },
]
}
await FileTime.read(input.sessionID, filepath)
return [
{ messageID: info.id, sessionID: input.sessionID, type: "text", synthetic: true, text: `Called the Read tool with the following input: {"filePath":"${filepath}"}` },
{
messageID: info.id,
sessionID: input.sessionID,
type: "text",
synthetic: true,
text: `Called the Read tool with the following input: {"filePath":"${filepath}"}`,
},
{
id: part.id,
messageID: info.id,
@@ -433,7 +519,10 @@ export namespace SessionPrompt {
sessionID: input.sessionID,
type: "text",
synthetic: true,
text: " Use the above message and context to generate a prompt and call the task tool with subagent: " + part.name + hint,
text:
" Use the above message and context to generate a prompt and call the task tool with subagent: " +
part.name +
hint,
},
]
}
@@ -443,13 +532,17 @@ export namespace SessionPrompt {
).then((x) => x.flat().map(assign)),
)
yield* plugin.trigger("chat.message", {
sessionID: input.sessionID,
agent: input.agent,
model: input.model,
messageID: input.messageID,
variant: input.variant,
}, { message: info, parts })
yield* plugin.trigger(
"chat.message",
{
sessionID: input.sessionID,
agent: input.agent,
model: input.model,
messageID: input.messageID,
variant: input.variant,
},
{ message: info, parts },
)
const parsed = MessageV2.Info.safeParse(info)
if (!parsed.success) {
@@ -614,12 +707,10 @@ export namespace SessionPrompt {
time: { created: Date.now() },
sessionID,
})
const ctrl = new AbortController()
const handle = yield* processor.create({
assistantMessage: msg as MessageV2.Assistant,
sessionID,
model,
abort: ctrl.signal,
})
const outcome: "break" | "continue" = yield* Effect.onExit(
@@ -685,7 +776,6 @@ export namespace SessionPrompt {
user: lastUser!,
agent,
permission: session.permission,
abort: ctrl.signal,
sessionID,
system,
messages: [...modelMsgs, ...(isLastStep ? [{ role: "assistant" as const, content: MAX_STEPS }] : [])],
@@ -727,7 +817,6 @@ export namespace SessionPrompt {
}),
(exit) =>
Effect.gen(function* () {
ctrl.abort()
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) yield* handle.abort()
InstructionPrompt.clear(handle.message.id)
}),

View File

@@ -129,7 +129,7 @@ async function tool(sessionID: SessionID, messageID: MessageID, tool: string, ou
}
function fake(
input: Parameters<(typeof SessionProcessorModule.SessionProcessor)["create"]>[0],
input: Parameters<SessionProcessorModule.SessionProcessor.Interface["create"]>[0],
result: "continue" | "compact",
) {
const msg = input.assistantMessage

View File

@@ -1,7 +1,7 @@
import { NodeFileSystem } from "@effect/platform-node"
import { expect } from "bun:test"
import { APICallError } from "ai"
import { Effect, Layer, ServiceMap } from "effect"
import { Cause, Effect, Exit, Fiber, Layer, ServiceMap } from "effect"
import * as Stream from "effect/Stream"
import path from "path"
import type { Agent } from "../../src/agent/agent"
@@ -120,21 +120,8 @@ function fail<E>(err: E, ...items: LLM.Event[]) {
return stream(...items).pipe(Stream.concat(Stream.fail(err)))
}
function wait(abort: AbortSignal) {
return Effect.promise(
() =>
new Promise<void>((done) => {
abort.addEventListener("abort", () => done(), { once: true })
}),
)
}
function hang(input: LLM.StreamInput, ...items: LLM.Event[]) {
return stream(...items).pipe(
Stream.concat(
Stream.unwrap(wait(input.abort).pipe(Effect.as(Stream.fail(new DOMException("Aborted", "AbortError"))))),
),
)
function hang(_input: LLM.StreamInput, ...items: LLM.Event[]) {
return stream(...items).pipe(Stream.concat(Stream.fromEffect(Effect.never)))
}
function model(context: number): Provider.Model {
@@ -291,13 +278,11 @@ it.effect("session.processor effect tests capture llm input cleanly", () => {
const chat = yield* session.create({})
const parent = yield* user(chat.id, "hi")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
abort: abort.signal,
})
const input = {
@@ -313,7 +298,6 @@ it.effect("session.processor effect tests capture llm input cleanly", () => {
model: mdl,
agent: agent(),
system: [],
abort: abort.signal,
messages: [{ role: "user", content: "hi" }],
tools: {},
} satisfies LLM.StreamInput
@@ -359,13 +343,11 @@ it.effect("session.processor effect tests stop after token overflow requests com
const chat = yield* session.create({})
const parent = yield* user(chat.id, "compact")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(20)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
abort: abort.signal,
})
const value = yield* handle.process({
@@ -381,7 +363,6 @@ it.effect("session.processor effect tests stop after token overflow requests com
model: mdl,
agent: agent(),
system: [],
abort: abort.signal,
messages: [{ role: "user", content: "compact" }],
tools: {},
})
@@ -433,13 +414,11 @@ it.effect("session.processor effect tests reset reasoning state across retries",
const chat = yield* session.create({})
const parent = yield* user(chat.id, "reason")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
abort: abort.signal,
})
const value = yield* handle.process({
@@ -455,7 +434,6 @@ it.effect("session.processor effect tests reset reasoning state across retries",
model: mdl,
agent: agent(),
system: [],
abort: abort.signal,
messages: [{ role: "user", content: "reason" }],
tools: {},
})
@@ -485,13 +463,11 @@ it.effect("session.processor effect tests do not retry unknown json errors", ()
const chat = yield* session.create({})
const parent = yield* user(chat.id, "json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
abort: abort.signal,
})
const value = yield* handle.process({
@@ -507,7 +483,6 @@ it.effect("session.processor effect tests do not retry unknown json errors", ()
model: mdl,
agent: agent(),
system: [],
abort: abort.signal,
messages: [{ role: "user", content: "json" }],
tools: {},
})
@@ -535,13 +510,11 @@ it.effect("session.processor effect tests retry recognized structured json error
const chat = yield* session.create({})
const parent = yield* user(chat.id, "retry json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
abort: abort.signal,
})
const value = yield* handle.process({
@@ -557,7 +530,6 @@ it.effect("session.processor effect tests retry recognized structured json error
model: mdl,
agent: agent(),
system: [],
abort: abort.signal,
messages: [{ role: "user", content: "retry json" }],
tools: {},
})
@@ -601,7 +573,6 @@ it.effect("session.processor effect tests publish retry status updates", () => {
const chat = yield* session.create({})
const parent = yield* user(chat.id, "retry")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100)
const states: number[] = []
const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => {
@@ -612,7 +583,6 @@ it.effect("session.processor effect tests publish retry status updates", () => {
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
abort: abort.signal,
})
const value = yield* handle.process({
@@ -628,7 +598,6 @@ it.effect("session.processor effect tests publish retry status updates", () => {
model: mdl,
agent: agent(),
system: [],
abort: abort.signal,
messages: [{ role: "user", content: "retry" }],
tools: {},
})
@@ -656,13 +625,11 @@ it.effect("session.processor effect tests compact on structured context overflow
const chat = yield* session.create({})
const parent = yield* user(chat.id, "compact json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
abort: abort.signal,
})
const value = yield* handle.process({
@@ -678,7 +645,6 @@ it.effect("session.processor effect tests compact on structured context overflow
model: mdl,
agent: agent(),
system: [],
abort: abort.signal,
messages: [{ role: "user", content: "compact json" }],
tools: {},
})
@@ -710,17 +676,15 @@ it.effect("session.processor effect tests mark pending tools as aborted on clean
const chat = yield* session.create({})
const parent = yield* user(chat.id, "tool abort")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
abort: abort.signal,
})
const run = Effect.runPromise(
handle.process({
const run = yield* handle
.process({
user: {
id: parent.id,
sessionID: chat.id,
@@ -733,20 +697,25 @@ it.effect("session.processor effect tests mark pending tools as aborted on clean
model: mdl,
agent: agent(),
system: [],
abort: abort.signal,
messages: [{ role: "user", content: "tool abort" }],
tools: {},
}),
)
})
.pipe(Effect.forkChild)
yield* Effect.promise(() => ready.promise)
abort.abort()
yield* Fiber.interrupt(run)
const value = yield* Effect.promise(() => run)
const exit = yield* Fiber.await(run)
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
yield* handle.abort()
}
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const tool = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
expect(value).toBe("stop")
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)
}
expect(yield* test.calls).toBe(1)
expect(tool?.state.status).toBe("error")
if (tool?.state.status === "error") {
@@ -779,7 +748,6 @@ it.effect("session.processor effect tests record aborted errors and idle state",
const chat = yield* session.create({})
const parent = yield* user(chat.id, "abort")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100)
const errs: string[] = []
const off = yield* bus.subscribeCallback(Session.Event.Error, (evt) => {
@@ -792,11 +760,10 @@ it.effect("session.processor effect tests record aborted errors and idle state",
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
abort: abort.signal,
})
const run = Effect.runPromise(
handle.process({
const run = yield* handle
.process({
user: {
id: parent.id,
sessionID: chat.id,
@@ -809,22 +776,27 @@ it.effect("session.processor effect tests record aborted errors and idle state",
model: mdl,
agent: agent(),
system: [],
abort: abort.signal,
messages: [{ role: "user", content: "abort" }],
tools: {},
}),
)
})
.pipe(Effect.forkChild)
yield* Effect.promise(() => ready.promise)
abort.abort()
yield* Fiber.interrupt(run)
const value = yield* Effect.promise(() => run)
const exit = yield* Fiber.await(run)
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
yield* handle.abort()
}
yield* Effect.promise(() => seen.promise)
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
const state = yield* status.get(chat.id)
off()
expect(value).toBe("stop")
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)
}
expect(handle.message.error?.name).toBe("MessageAbortedError")
expect(stored.info.role).toBe("assistant")
if (stored.info.role === "assistant") {

View File

@@ -8,6 +8,9 @@ import { Agent as AgentSvc } from "../../src/agent/agent"
import { Bus } from "../../src/bus"
import { Command } from "../../src/command"
import { Config } from "../../src/config/config"
import { FileTime } from "../../src/file/time"
import { LSP } from "../../src/lsp"
import { MCP } from "../../src/mcp"
import { Permission } from "../../src/permission"
import { Plugin } from "../../src/plugin"
import type { Provider } from "../../src/provider/provider"
@@ -98,21 +101,39 @@ function finish(): LLM.Event {
return { type: "finish", finishReason: "stop", rawFinishReason: "stop", totalUsage: usage() }
}
function wait(abort: AbortSignal) {
return Effect.promise(
() =>
new Promise<void>((done) => {
abort.addEventListener("abort", () => done(), { once: true })
}),
)
function finishToolCallsStep(): LLM.Event {
return {
type: "finish-step",
finishReason: "tool-calls",
rawFinishReason: "tool_calls",
response: { id: "res", modelId: "test-model", timestamp: new Date() },
providerMetadata: undefined,
usage: usage(),
}
}
function hang(input: LLM.StreamInput, ...items: LLM.Event[]) {
return stream(...items).pipe(
Stream.concat(
Stream.unwrap(wait(input.abort).pipe(Effect.as(Stream.fail(new DOMException("Aborted", "AbortError"))))),
),
)
function finishToolCalls(): LLM.Event {
return { type: "finish", finishReason: "tool-calls", rawFinishReason: "tool_calls", totalUsage: usage() }
}
function replyStop(text: string, id = "t") {
return [start(), textStart(id), textDelta(id, text), textEnd(id), finishStep(), finish()] as const
}
function replyToolCalls(text: string, id = "t") {
return [start(), textStart(id), textDelta(id, text), textEnd(id), finishToolCallsStep(), finishToolCalls()] as const
}
function toolInputStart(id: string, toolName: string): LLM.Event {
return { type: "tool-input-start", id, toolName }
}
function toolCall(toolCallId: string, toolName: string, input: unknown): LLM.Event {
return { type: "tool-call", toolCallId, toolName, input }
}
function hang(_input: LLM.StreamInput, ...items: LLM.Event[]) {
return stream(...items).pipe(Stream.concat(Stream.fromEffect(Effect.never)))
}
function defer<T>() {
@@ -123,6 +144,36 @@ function defer<T>() {
return { promise, resolve }
}
function waitMs(ms: number) {
return Effect.promise(() => new Promise<void>((done) => setTimeout(done, ms)))
}
function toolPart(parts: MessageV2.Part[]) {
return parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
}
type CompletedToolPart = MessageV2.ToolPart & { state: MessageV2.ToolStateCompleted }
type ErrorToolPart = MessageV2.ToolPart & { state: MessageV2.ToolStateError }
type RunningToolPart = MessageV2.ToolPart & { state: MessageV2.ToolStateRunning }
function completedTool(parts: MessageV2.Part[]) {
const part = toolPart(parts)
expect(part?.state.status).toBe("completed")
return part?.state.status === "completed" ? (part as CompletedToolPart) : undefined
}
function errorTool(parts: MessageV2.Part[]) {
const part = toolPart(parts)
expect(part?.state.status).toBe("error")
return part?.state.status === "error" ? (part as ErrorToolPart) : undefined
}
function runningTool(parts: MessageV2.Part[]) {
const part = toolPart(parts)
expect(part?.state.status).toBe("running")
return part?.state.status === "running" ? (part as RunningToolPart) : undefined
}
const llm = Layer.unwrap(
Effect.gen(function* () {
const queue: Script[] = []
@@ -160,6 +211,59 @@ const llm = Layer.unwrap(
}),
)
const mcp = Layer.succeed(
MCP.Service,
MCP.Service.of({
status: () => Effect.succeed({}),
clients: () => Effect.succeed({}),
tools: () => Effect.succeed({}),
prompts: () => Effect.succeed({}),
resources: () => Effect.succeed({}),
add: () => Effect.succeed({ status: { status: "disabled" as const } }),
connect: () => Effect.void,
disconnect: () => Effect.void,
getPrompt: () => Effect.succeed(undefined),
readResource: () => Effect.succeed(undefined),
startAuth: () => Effect.die("unexpected MCP auth in prompt-effect tests"),
authenticate: () => Effect.die("unexpected MCP auth in prompt-effect tests"),
finishAuth: () => Effect.die("unexpected MCP auth in prompt-effect tests"),
removeAuth: () => Effect.void,
supportsOAuth: () => Effect.succeed(false),
hasStoredTokens: () => Effect.succeed(false),
getAuthStatus: () => Effect.succeed("not_authenticated" as const),
}),
)
const lsp = Layer.succeed(
LSP.Service,
LSP.Service.of({
init: () => Effect.void,
status: () => Effect.succeed([]),
hasClients: () => Effect.succeed(false),
touchFile: () => Effect.void,
diagnostics: () => Effect.succeed({}),
hover: () => Effect.succeed(undefined),
definition: () => Effect.succeed([]),
references: () => Effect.succeed([]),
implementation: () => Effect.succeed([]),
documentSymbol: () => Effect.succeed([]),
workspaceSymbol: () => Effect.succeed([]),
prepareCallHierarchy: () => Effect.succeed([]),
incomingCalls: () => Effect.succeed([]),
outgoingCalls: () => Effect.succeed([]),
}),
)
const filetime = Layer.succeed(
FileTime.Service,
FileTime.Service.of({
read: () => Effect.void,
get: () => Effect.succeed(undefined),
assert: () => Effect.void,
withLock: (_filepath, fn) => Effect.promise(fn),
}),
)
const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer))
const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)
const deps = Layer.mergeAll(
@@ -170,6 +274,9 @@ const deps = Layer.mergeAll(
Permission.layer,
Plugin.defaultLayer,
Config.defaultLayer,
filetime,
lsp,
mcp,
AppFileSystem.defaultLayer,
status,
llm,
@@ -260,17 +367,36 @@ const seed = Effect.fn("test.seed")(function* (sessionID: SessionID, opts?: { fi
return { user: msg, assistant }
})
// Priority 1: Loop lifecycle
const addSubtask = (sessionID: SessionID, messageID: MessageID, model = ref) =>
Effect.gen(function* () {
const session = yield* Session.Service
yield* session.updatePart({
id: PartID.ascending(),
messageID,
sessionID,
type: "subtask",
prompt: "look into the cache key path",
description: "inspect bug",
agent: "general",
model,
})
})
const boot = Effect.fn("test.boot")(function* (input?: { title?: string }) {
const test = yield* TestLLM
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create(input ?? {})
return { test, prompt, sessions, chat }
})
// Loop semantics
it.effect("loop exits immediately when last assistant has stop finish", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const test = yield* TestLLM
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({})
const { test, prompt, chat } = yield* boot()
yield* seed(chat.id, { finish: "stop" })
const result = yield* prompt.loop({ sessionID: chat.id })
@@ -286,13 +412,8 @@ it.effect("loop calls LLM and returns assistant message", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const test = yield* TestLLM
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
yield* test.reply(start(), textStart(), textDelta("t", "world"), textEnd(), finishStep(), finish())
const chat = yield* sessions.create({})
const { test, prompt, chat } = yield* boot()
yield* test.reply(...replyStop("world"))
yield* user(chat.id, "hello")
const result = yield* prompt.loop({ sessionID: chat.id })
@@ -309,16 +430,32 @@ it.effect("loop continues when finish is tool-calls", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const test = yield* TestLLM
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const { test, prompt, chat } = yield* boot()
yield* test.reply(...replyToolCalls("first"))
yield* test.reply(...replyStop("second"))
yield* user(chat.id, "hello")
// First reply finishes with tool-calls, second with stop
const result = yield* prompt.loop({ sessionID: chat.id })
expect(yield* test.calls).toBe(2)
expect(result.info.role).toBe("assistant")
}),
{ git: true, config: cfg },
),
)
it.effect("failed subtask preserves metadata on error tool state", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const { test, prompt, chat } = yield* boot({ title: "Pinned" })
yield* test.reply(
start(),
textStart(),
textDelta("t", "first"),
textEnd(),
toolInputStart("task-1", "task"),
toolCall("task-1", "task", {
description: "inspect bug",
prompt: "look into the cache key path",
subagent_type: "general",
}),
{
type: "finish-step",
finishReason: "tool-calls",
@@ -329,16 +466,41 @@ it.effect("loop continues when finish is tool-calls", () =>
},
{ type: "finish", finishReason: "tool-calls", rawFinishReason: "tool_calls", totalUsage: usage() },
)
yield* test.reply(start(), textStart(), textDelta("t", "second"), textEnd(), finishStep(), finish())
const chat = yield* sessions.create({})
yield* user(chat.id, "hello")
yield* test.reply(...replyStop("done"))
const msg = yield* user(chat.id, "hello")
yield* addSubtask(chat.id, msg.id)
const result = yield* prompt.loop({ sessionID: chat.id })
expect(yield* test.calls).toBe(2)
expect(result.info.role).toBe("assistant")
expect(yield* test.calls).toBe(2)
const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
expect(taskMsg?.info.role).toBe("assistant")
if (!taskMsg || taskMsg.info.role !== "assistant") return
const tool = errorTool(taskMsg.parts)
if (!tool) return
expect(tool.state.error).toContain("Tool execution failed")
expect(tool.state.metadata).toBeDefined()
expect(tool.state.metadata?.sessionId).toBeDefined()
expect(tool.state.metadata?.model).toEqual({
providerID: ProviderID.make("test"),
modelID: ModelID.make("missing-model"),
})
}),
{ git: true, config: cfg },
{
git: true,
config: {
...cfg,
agent: {
general: {
model: "test/missing-model",
},
},
},
},
),
)
@@ -375,7 +537,7 @@ it.effect("loop sets status to busy then idle", () =>
),
)
// Priority 2: Cancel safety
// Cancel semantics
it.effect(
"cancel interrupts loop and returns last assistant",
@@ -383,11 +545,7 @@ it.effect(
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const test = yield* TestLLM
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({})
const { test, prompt, chat } = yield* boot()
yield* seed(chat.id)
// Make LLM hang so the loop blocks
@@ -398,7 +556,7 @@ it.effect(
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
// Give the loop time to start
yield* Effect.promise(() => new Promise<void>((r) => setTimeout(r, 200)))
yield* waitMs(200)
yield* prompt.cancel(chat.id)
const exit = yield* Fiber.await(fiber)
@@ -419,17 +577,13 @@ it.effect(
(dir) =>
Effect.gen(function* () {
const ready = defer<void>()
const test = yield* TestLLM
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const { test, prompt, chat } = yield* boot()
yield* test.push((input) =>
hang(input, start()).pipe(
Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
),
)
const chat = yield* sessions.create({})
yield* user(chat.id, "hello")
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
@@ -457,24 +611,20 @@ it.effect(
(dir) =>
Effect.gen(function* () {
const ready = defer<void>()
const test = yield* TestLLM
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const { test, prompt, chat } = yield* boot()
yield* test.push((input) =>
hang(input, start()).pipe(
Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
),
)
const chat = yield* sessions.create({})
yield* user(chat.id, "hello")
const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* Effect.promise(() => ready.promise)
// Queue a second caller
const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* Effect.promise(() => new Promise<void>((r) => setTimeout(r, 50)))
yield* waitMs(50)
yield* prompt.cancel(chat.id)
@@ -488,17 +638,13 @@ it.effect(
30_000,
)
// Priority 3: Deferred queue
// Queue semantics
it.effect("concurrent loop callers get same result", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const test = yield* TestLLM
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({})
const { prompt, chat } = yield* boot()
yield* seed(chat.id, { finish: "stop" })
const [a, b] = yield* Effect.all([prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], {
@@ -516,14 +662,10 @@ it.effect("concurrent loop callers all receive same error result", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const test = yield* TestLLM
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const { test, prompt, chat } = yield* boot()
// Push a stream that fails — the loop records the error on the assistant message
yield* test.push(Stream.fail(new Error("boom")))
const chat = yield* sessions.create({})
yield* user(chat.id, "hello")
const [a, b] = yield* Effect.all([prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], {
@@ -590,7 +732,7 @@ it.effect("assertNotBusy succeeds when idle", () =>
),
)
// Priority 4: Shell basics
// Shell semantics
it.effect(
"shell rejects with BusyError when loop running",
@@ -599,17 +741,13 @@ it.effect(
(dir) =>
Effect.gen(function* () {
const ready = defer<void>()
const test = yield* TestLLM
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const { test, prompt, chat } = yield* boot()
yield* test.push((input) =>
hang(input, start()).pipe(
Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
),
)
const chat = yield* sessions.create({})
yield* user(chat.id, "hi")
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
@@ -626,27 +764,78 @@ it.effect(
30_000,
)
it.effect("shell captures stdout and stderr in completed tool output", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const { prompt, chat } = yield* boot()
const result = yield* prompt.shell({
sessionID: chat.id,
agent: "build",
command: "printf out && printf err >&2",
})
expect(result.info.role).toBe("assistant")
const tool = completedTool(result.parts)
if (!tool) return
expect(tool.state.output).toContain("out")
expect(tool.state.output).toContain("err")
expect(tool.state.metadata.output).toContain("out")
expect(tool.state.metadata.output).toContain("err")
}),
{ git: true, config: cfg },
),
)
it.effect(
"shell updates running metadata before process exit",
() =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const { prompt, chat } = yield* boot()
const fiber = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "printf first && sleep 0.2 && printf second" })
.pipe(Effect.forkChild)
yield* Effect.promise(async () => {
const start = Date.now()
while (Date.now() - start < 2000) {
const msgs = await MessageV2.filterCompacted(MessageV2.stream(chat.id))
const taskMsg = msgs.find((item) => item.info.role === "assistant")
const tool = taskMsg ? runningTool(taskMsg.parts) : undefined
if (tool?.state.metadata?.output.includes("first")) return
await new Promise((done) => setTimeout(done, 20))
}
throw new Error("timed out waiting for running shell metadata")
})
const exit = yield* Fiber.await(fiber)
expect(Exit.isSuccess(exit)).toBe(true)
}),
{ git: true, config: cfg },
),
30_000,
)
it.effect(
"loop waits while shell runs and starts after shell exits",
() =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const test = yield* TestLLM
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
yield* test.reply(start(), textStart(), textDelta("t", "after-shell"), textEnd(), finishStep(), finish())
const chat = yield* sessions.create({})
const { test, prompt, chat } = yield* boot()
yield* test.reply(...replyStop("after-shell"))
const sh = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" })
.pipe(Effect.forkChild)
yield* Effect.promise(() => new Promise<void>((done) => setTimeout(done, 50)))
yield* waitMs(50)
const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* Effect.promise(() => new Promise<void>((done) => setTimeout(done, 50)))
yield* waitMs(50)
expect(yield* test.calls).toBe(0)
@@ -671,22 +860,17 @@ it.effect(
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const test = yield* TestLLM
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
yield* test.reply(start(), textStart(), textDelta("t", "done"), textEnd(), finishStep(), finish())
const chat = yield* sessions.create({})
const { test, prompt, chat } = yield* boot()
yield* test.reply(...replyStop("done"))
const sh = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" })
.pipe(Effect.forkChild)
yield* Effect.promise(() => new Promise<void>((done) => setTimeout(done, 50)))
yield* waitMs(50)
const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* Effect.promise(() => new Promise<void>((done) => setTimeout(done, 50)))
yield* waitMs(50)
expect(yield* test.calls).toBe(0)
@@ -712,15 +896,12 @@ it.effect(
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({})
const { prompt, chat } = yield* boot()
const sh = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "sleep 30" })
.pipe(Effect.forkChild)
yield* Effect.promise(() => new Promise<void>((done) => setTimeout(done, 50)))
yield* waitMs(50)
yield* prompt.cancel(chat.id)
@@ -747,15 +928,12 @@ it.effect(
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({})
const { prompt, chat } = yield* boot()
const a = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "sleep 30" })
.pipe(Effect.forkChild)
yield* Effect.promise(() => new Promise<void>((done) => setTimeout(done, 50)))
yield* waitMs(50)
const exit = yield* prompt.shell({ sessionID: chat.id, agent: "build", command: "echo hi" }).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)