use SessionProcessor.Service, Session.Service, Agent.Service directly in layer

Yield effectified services instead of going through async facades. Eliminates
Effect→Promise→Effect double-bounce for processor.create, processor.process,
Session.get/touch/setPermission/updateMessage, and Agent.get/list. Await cancel
in session route. Remove redundant InstanceState.get in shellE ensuring block.
This commit is contained in:
Kit Langton
2026-03-28 12:29:17 -04:00
parent d7ae7609cd
commit e26193408f
3 changed files with 115 additions and 62 deletions

View File

@@ -381,7 +381,7 @@ export const SessionRoutes = lazy(() =>
}),
),
async (c) => {
SessionPrompt.cancel(c.req.valid("param").sessionID)
await SessionPrompt.cancel(c.req.valid("param").sessionID)
return c.json(true)
},
)

View File

@@ -48,7 +48,7 @@ import { Shell } from "@/shell/shell"
import { Truncate } from "@/tool/truncate"
import { decodeDataUrl } from "@/util/data-url"
import { Process } from "@/util/process"
import { Deferred, Effect, Fiber, Layer, Scope, ServiceMap } from "effect"
import { Cause, Deferred, Effect, Exit, Fiber, Layer, Scope, ServiceMap } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRuntime } from "@/effect/run-service"
@@ -92,6 +92,7 @@ export namespace SessionPrompt {
const status = yield* SessionStatus.Service
const sessions = yield* Session.Service
const agents = yield* Agent.Service
const processor = yield* SessionProcessor.Service
const scope = yield* Scope.Scope
const cache = yield* InstanceState.make(
@@ -157,6 +158,15 @@ export namespace SessionPrompt {
return yield* loopE({ sessionID: input.sessionID })
})
// Scan the session's message stream and yield the first non-user (assistant)
// message. The stream is expected to always contain one; reaching the end
// without finding it is a programming error, hence the throw.
const lastAssistant = (sessionID: SessionID) =>
  Effect.promise(async () => {
    for await (const entry of MessageV2.stream(sessionID)) {
      if (entry.info.role !== "user") return entry
    }
    throw new Error("Impossible")
  })
const runLoop = Effect.fn("SessionPrompt.run")(function* (sessionID: SessionID) {
let structured: unknown | undefined
let step = 0
@@ -280,22 +290,21 @@ export namespace SessionPrompt {
time: { created: Date.now() },
sessionID,
})
const processor = yield* Effect.promise((signal) =>
SessionProcessor.create({
assistantMessage: msg as MessageV2.Assistant,
sessionID,
model,
abort: signal,
}),
)
const ctrl = new AbortController()
const handle = yield* processor.create({
assistantMessage: msg as MessageV2.Assistant,
sessionID,
model,
abort: ctrl.signal,
})
const outcome: "break" | "continue" = yield* Effect.ensuring(
const outcome: "break" | "continue" = yield* Effect.onExit(
Effect.gen(function* () {
const lastUserMsg = msgs.findLast((m) => m.info.role === "user")
const bypassAgentCheck = lastUserMsg?.parts.some((p) => p.type === "agent") ?? false
const tools = yield* Effect.promise(() =>
resolveTools({ agent, session, model, tools: lastUser!.tools, processor, bypassAgentCheck, messages: msgs }),
resolveTools({ agent, session, model, tools: lastUser!.tools, processor: handle, bypassAgentCheck, messages: msgs }),
)
if (lastUser!.format?.type === "json_schema") {
@@ -340,39 +349,37 @@ export namespace SessionPrompt {
const system = [...env, ...(skills ? [skills] : []), ...instructions]
const format = lastUser!.format ?? { type: "text" as const }
if (format.type === "json_schema") system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT)
const result = yield* Effect.promise((signal) =>
processor.process({
user: lastUser!,
agent,
permission: session.permission,
abort: signal,
sessionID,
system,
messages: [
...modelMsgs,
...(isLastStep ? [{ role: "assistant" as const, content: MAX_STEPS }] : []),
],
tools,
model,
toolChoice: format.type === "json_schema" ? "required" : undefined,
}),
)
const result = yield* handle.process({
user: lastUser!,
agent,
permission: session.permission,
abort: ctrl.signal,
sessionID,
system,
messages: [
...modelMsgs,
...(isLastStep ? [{ role: "assistant" as const, content: MAX_STEPS }] : []),
],
tools,
model,
toolChoice: format.type === "json_schema" ? "required" : undefined,
})
if (structured !== undefined) {
processor.message.structured = structured
processor.message.finish = processor.message.finish ?? "stop"
yield* sessions.updateMessage(processor.message)
handle.message.structured = structured
handle.message.finish = handle.message.finish ?? "stop"
yield* sessions.updateMessage(handle.message)
return "break" as const
}
const finished = processor.message.finish && !["tool-calls", "unknown"].includes(processor.message.finish)
if (finished && !processor.message.error) {
const finished = handle.message.finish && !["tool-calls", "unknown"].includes(handle.message.finish)
if (finished && !handle.message.error) {
if (format.type === "json_schema") {
processor.message.error = new MessageV2.StructuredOutputError({
handle.message.error = new MessageV2.StructuredOutputError({
message: "Model did not produce structured output",
retries: 0,
}).toObject()
yield* sessions.updateMessage(processor.message)
yield* sessions.updateMessage(handle.message)
return "break" as const
}
}
@@ -385,37 +392,48 @@ export namespace SessionPrompt {
agent: lastUser!.agent,
model: lastUser!.model,
auto: true,
overflow: !processor.message.finish,
overflow: !handle.message.finish,
}),
)
}
return "continue" as const
}),
Effect.sync(() => InstructionPrompt.clear(processor.message.id)),
(exit) =>
Effect.gen(function* () {
ctrl.abort()
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) yield* handle.abort()
InstructionPrompt.clear(handle.message.id)
}),
)
if (outcome === "break") break
continue
}
SessionCompaction.prune({ sessionID })
return yield* Effect.promise(async () => {
for await (const item of MessageV2.stream(sessionID)) {
if (item.info.role === "user") continue
return item
}
throw new Error("Impossible")
})
return yield* lastAssistant(sessionID)
})
type State = { loops: Map<string, LoopEntry>; shells: Map<string, Fiber.Fiber<MessageV2.WithParts, unknown>> }
// Await a fiber's exit and translate it back into an effect:
// - success: return the produced value
// - interruption-only failure: run the caller-supplied fallback instead of
//   surfacing "All fibers interrupted" to the caller
// - any other failure: re-raise the original cause unchanged
const awaitFiber = <A>(fiber: Fiber.Fiber<A, unknown>, fallback: Effect.Effect<A>) =>
  Fiber.await(fiber).pipe(
    Effect.flatMap((exit) => {
      if (Exit.isSuccess(exit)) return Effect.succeed(exit.value)
      if (Cause.hasInterruptsOnly(exit.cause)) return fallback
      return Effect.failCause(exit.cause as Cause.Cause<never>)
    }),
  )
const startLoop = Effect.fnUntraced(function* (s: State, sessionID: SessionID) {
const fiber = yield* runLoop(sessionID).pipe(
Effect.onExit((exit) =>
Effect.gen(function* () {
const entry = s.loops.get(sessionID)
if (entry) {
for (const d of entry.queue) yield* Deferred.done(d, exit)
// On interrupt, resolve queued callers with the last assistant message
const resolved = Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)
? Exit.succeed(yield* lastAssistant(sessionID))
: exit
for (const d of entry.queue) yield* Deferred.done(d, resolved)
}
s.loops.delete(sessionID)
yield* status.set(sessionID, { type: "idle" })
@@ -425,12 +443,11 @@ export namespace SessionPrompt {
)
const entry = s.loops.get(sessionID)
if (entry) {
// Queue already exists (created while shell was running) — attach fiber
entry.fiber = fiber
} else {
s.loops.set(sessionID, { fiber, queue: [] })
}
return yield* Fiber.join(fiber).pipe(Effect.orDie)
return yield* awaitFiber(fiber, lastAssistant(sessionID))
})
const loopE = Effect.fn("SessionPrompt.loop")(function* (input: z.infer<typeof LoopInput>) {
@@ -462,7 +479,6 @@ export namespace SessionPrompt {
const fiber = yield* Effect.promise((signal) => shellImpl(input, signal)).pipe(
Effect.ensuring(
Effect.gen(function* () {
const s = yield* InstanceState.get(cache)
s.shells.delete(input.sessionID)
// If callers queued a loop while the shell was running, start it
const pending = s.loops.get(input.sessionID)
@@ -477,7 +493,7 @@ export namespace SessionPrompt {
)
s.shells.set(input.sessionID, fiber)
return yield* Fiber.join(fiber).pipe(Effect.orDie)
return yield* awaitFiber(fiber, lastAssistant(input.sessionID))
})
const commandE = Effect.fn("SessionPrompt.command")(function* (input: CommandInput) {
@@ -508,6 +524,7 @@ export namespace SessionPrompt {
Effect.sync(() =>
layer.pipe(
Layer.provide(SessionStatus.layer),
Layer.provide(SessionProcessor.defaultLayer),
Layer.provide(Session.defaultLayer),
Layer.provide(Agent.defaultLayer),
Layer.provide(Bus.layer),
@@ -863,7 +880,7 @@ export namespace SessionPrompt {
model: Provider.Model
session: Session.Info
tools?: Record<string, boolean>
processor: SessionProcessor.Info
processor: Pick<SessionProcessor.Handle, "message" | "partFromToolCall">
bypassAgentCheck: boolean
messages: MessageV2.WithParts[]
}) {

View File

@@ -160,8 +160,7 @@ describe("session.prompt concurrency", () => {
directory: tmp.path,
fn: async () => {
const session = await Session.create({})
// Don't seed an assistant message — loop will try to call Provider.getModel
// and hang. We cancel before it can fail.
// Seed only a user message — loop must call getModel to proceed
const userMsg: MessageV2.Info = {
id: MessageID.ascending(),
role: "user",
@@ -178,20 +177,57 @@ describe("session.prompt concurrency", () => {
type: "text",
text: "hello",
})
// Also seed an assistant message so lastAssistant() fallback can find it
const assistantMsg: MessageV2.Info = {
id: MessageID.ascending(),
role: "assistant",
parentID: userMsg.id,
sessionID: session.id,
mode: "build",
agent: "build",
cost: 0,
path: { cwd: "/tmp", root: "/tmp" },
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
modelID: "gpt-5.2" as any,
providerID: "openai" as any,
time: { created: Date.now() },
}
await Session.updateMessage(assistantMsg)
await Session.updatePart({
id: PartID.ascending(),
messageID: assistantMsg.id,
sessionID: session.id,
type: "text",
text: "hi there",
})
// Start loop — it will try to get model and either fail or hang
const loopPromise = SessionPrompt.loop({ sessionID: session.id }).catch(() => undefined)
const ready = deferred()
const gate = deferred()
const getModel = spyOn(Provider, "getModel").mockImplementation(async () => {
ready.resolve()
await gate.promise
throw new Error("test stop")
})
// Give it a tick to start
await new Promise((r) => setTimeout(r, 10))
try {
// Start loop — it will block in getModel (assistant has no finish, so loop continues)
const loopPromise = SessionPrompt.loop({ sessionID: session.id })
await SessionPrompt.cancel(session.id)
await ready.promise
const status = await SessionStatus.get(session.id)
expect(status.type).toBe("idle")
await SessionPrompt.cancel(session.id)
// Wait for the loop to settle
await loopPromise
const status = await SessionStatus.get(session.id)
expect(status.type).toBe("idle")
// loop should resolve cleanly, not throw "All fibers interrupted"
const result = await loopPromise
expect(result.info.role).toBe("assistant")
expect(result.info.id).toBe(assistantMsg.id)
} finally {
gate.resolve()
getModel.mockRestore()
}
},
})
}, 10000)