Compare commits

...

8 Commits

Author SHA1 Message Date
Kit Langton
8e81a7797e fix(session): adapt compaction to async processor 2026-03-27 23:24:17 -04:00
Kit Langton
cdfdb91403 refactor(session): tighten processor abort handling 2026-03-27 23:24:17 -04:00
Kit Langton
9b4eba1f98 refactor(session): effectify session processor 2026-03-27 23:24:17 -04:00
Kit Langton
366a94bf33 refactor(session): clean up processor retry flow 2026-03-27 23:24:17 -04:00
Kit Langton
53e556f8ee refactor(session): rename lastThree to recentParts, add retry comment 2026-03-27 23:24:17 -04:00
Kit Langton
c07dae5a2d refactor(session): preserve processor effect follow-up 2026-03-27 23:24:17 -04:00
Kit Langton
95908b4631 refactor(session): convert SessionProcessor to Effect internals
Replace the while(true) + for-await + try/catch pattern with:
- Stream.fromAsyncIterable + Stream.runForEachWhile for event consumption
- Recursive Effect for retry logic (preserves SessionRetry.delay backoff)
- Effect.ensuring for cleanup guarantees
- Effect.catch for error classification

process() still returns Promise externally (via Effect.runPromise) so
callers don't change. Event handling and cleanup logic unchanged.
2026-03-27 23:24:16 -04:00
Kit Langton
4b0d2bf4b3 refactor(session): extract handleEvent and cleanup from SessionProcessor
Pure structural refactor — no Effect yet. Extracts the switch statement
into a standalone handleEvent function and the cleanup section into a
cleanup function. ProcessorContext object shares mutable state via
getters/setters. Zero behavior change (verified by two review agents).
2026-03-27 23:24:16 -04:00
6 changed files with 550 additions and 436 deletions

View File

@@ -207,12 +207,14 @@ export namespace SessionCompaction {
created: Date.now(),
},
})) as MessageV2.Assistant
const processor = SessionProcessor.create({
assistantMessage: msg,
sessionID: input.sessionID,
model,
abort: input.abort,
})
const processor = yield* Effect.promise(() =>
SessionProcessor.create({
assistantMessage: msg,
sessionID: input.sessionID,
model,
abort: input.abort,
}),
)
// Allow plugins to inject context or replace compaction prompt.
const compacting = yield* plugin.trigger(
"experimental.session.compacting",

View File

@@ -1,430 +1,501 @@
import { MessageV2 } from "./message-v2"
import { Cause, Effect, Exit, Layer, ServiceMap } from "effect"
import * as Stream from "effect/Stream"
import { Agent } from "@/agent/agent"
import { Bus } from "@/bus"
import { makeRuntime } from "@/effect/run-service"
import { Config } from "@/config/config"
import { Permission } from "@/permission"
import { Plugin } from "@/plugin"
import { Snapshot } from "@/snapshot"
import { Log } from "@/util/log"
import { Session } from "."
import { Agent } from "@/agent/agent"
import { Snapshot } from "@/snapshot"
import { SessionSummary } from "./summary"
import { Bus } from "@/bus"
import { LLM } from "./llm"
import { MessageV2 } from "./message-v2"
import { PartID } from "./schema"
import type { SessionID } from "./schema"
import { SessionCompaction } from "./compaction"
import { SessionRetry } from "./retry"
import { SessionStatus } from "./status"
import { Plugin } from "@/plugin"
import { SessionSummary } from "./summary"
import type { Provider } from "@/provider/provider"
import { LLM } from "./llm"
import { Config } from "@/config/config"
import { SessionCompaction } from "./compaction"
import { Permission } from "@/permission"
import { Question } from "@/question"
import { PartID } from "./schema"
import type { SessionID, MessageID } from "./schema"
export namespace SessionProcessor {
const DOOM_LOOP_THRESHOLD = 3
const log = Log.create({ service: "session.processor" })
export type Info = Awaited<ReturnType<typeof create>>
export type Result = Awaited<ReturnType<Info["process"]>>
export type Result = "compact" | "stop" | "continue"
export function create(input: {
/** Effect-native processor handle; this is the shape produced by the service's `create`. */
export interface Handle {
/** The assistant message this processor is bound to. */
readonly message: MessageV2.Assistant
/** Returns the tool part currently tracked for `toolCallID`, or `undefined` when none is tracked. */
readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined
/** Consumes one LLM stream and yields `"compact"`, `"stop"` or `"continue"`. */
readonly process: (streamInput: LLM.StreamInput) => Effect.Effect<Result>
}
/** Promise-based view of a processor, as returned by the async `create` function of this namespace. */
export interface Info {
/** The assistant message this processor is bound to. */
readonly message: MessageV2.Assistant
/** Returns the tool part currently tracked for `toolCallID`, or `undefined` when none is tracked. */
readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined
/** Consumes one LLM stream and resolves to `"compact"`, `"stop"` or `"continue"`. */
readonly process: (streamInput: LLM.StreamInput) => Promise<Result>
}
type Input = {
assistantMessage: MessageV2.Assistant
sessionID: SessionID
model: Provider.Model
abort: AbortSignal
}) {
const toolcalls: Record<string, MessageV2.ToolPart> = {}
let snapshot: string | undefined
let blocked = false
let attempt = 0
let needsCompaction = false
}
const result = {
get message() {
return input.assistantMessage
},
partFromToolCall(toolCallID: string) {
return toolcalls[toolCallID]
},
async process(streamInput: LLM.StreamInput) {
log.info("process")
needsCompaction = false
const shouldBreak = (await Config.get()).experimental?.continue_loop_on_deny !== true
while (true) {
try {
let currentText: MessageV2.TextPart | undefined
let reasoningMap: Record<string, MessageV2.ReasoningPart> = {}
const stream = await LLM.stream(streamInput)
/** Contract of the SessionProcessor service: a single factory that builds a `Handle` for the given input. */
export interface Interface {
readonly create: (input: Input) => Effect.Effect<Handle>
}
for await (const value of stream.fullStream) {
input.abort.throwIfAborted()
switch (value.type) {
case "start":
await SessionStatus.set(input.sessionID, { type: "busy" })
break
/** Mutable state shared between the event handler, cleanup, halt and process steps of one processor. */
interface ProcessorContext extends Input {
// Tool parts keyed by tool call id; entries are added on "tool-input-start" and removed on "tool-result" / "tool-error".
toolcalls: Record<string, MessageV2.ToolPart>
// Recomputed at the start of process(): true unless config `experimental.continue_loop_on_deny` is exactly true.
shouldBreak: boolean
// Snapshot id captured on "start-step"; cleared once a patch has been computed from it.
snapshot: string | undefined
// Set to `shouldBreak` when a tool fails with a Permission or Question RejectedError; makes process() return "stop".
blocked: boolean
// Set on token overflow after a step or on a context-overflow error; makes process() return "compact".
needsCompaction: boolean
// Text part currently being streamed (between "text-start" and "text-end").
currentText: MessageV2.TextPart | undefined
// Reasoning parts currently being streamed, keyed by the stream event id.
reasoningMap: Record<string, MessageV2.ReasoningPart>
}
case "reasoning-start":
if (value.id in reasoningMap) {
continue
}
const reasoningPart = {
id: PartID.ascending(),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "reasoning" as const,
text: "",
time: {
start: Date.now(),
},
metadata: value.providerMetadata,
}
reasoningMap[value.id] = reasoningPart
await Session.updatePart(reasoningPart)
break
type StreamResult = Awaited<ReturnType<typeof LLM.stream>>
type StreamEvent = StreamResult["fullStream"] extends AsyncIterable<infer T> ? T : never
case "reasoning-delta":
if (value.id in reasoningMap) {
const part = reasoningMap[value.id]
part.text += value.text
if (value.providerMetadata) part.metadata = value.providerMetadata
await Session.updatePartDelta({
sessionID: part.sessionID,
messageID: part.messageID,
partID: part.id,
field: "text",
delta: value.text,
})
}
break
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/SessionProcessor") {}
case "reasoning-end":
if (value.id in reasoningMap) {
const part = reasoningMap[value.id]
part.text = part.text.trimEnd()
export const layer: Layer.Layer<
Service,
never,
| Session.Service
| Config.Service
| Bus.Service
| Snapshot.Service
| Agent.Service
| Permission.Service
| Plugin.Service
| SessionStatus.Service
> = Layer.effect(
Service,
Effect.gen(function* () {
const session = yield* Session.Service
const config = yield* Config.Service
const bus = yield* Bus.Service
const snapshot = yield* Snapshot.Service
const agents = yield* Agent.Service
const permission = yield* Permission.Service
const plugin = yield* Plugin.Service
const status = yield* SessionStatus.Service
part.time = {
...part.time,
end: Date.now(),
}
if (value.providerMetadata) part.metadata = value.providerMetadata
await Session.updatePart(part)
delete reasoningMap[value.id]
}
break
const create = Effect.fn("SessionProcessor.create")(function* (input: Input) {
const ctx: ProcessorContext = {
assistantMessage: input.assistantMessage,
sessionID: input.sessionID,
model: input.model,
abort: input.abort,
toolcalls: {},
shouldBreak: false,
snapshot: undefined,
blocked: false,
needsCompaction: false,
currentText: undefined,
reasoningMap: {},
}
case "tool-input-start":
const part = await Session.updatePart({
id: toolcalls[value.id]?.id ?? PartID.ascending(),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "tool",
tool: value.toolName,
callID: value.id,
state: {
status: "pending",
input: {},
raw: "",
},
})
toolcalls[value.id] = part as MessageV2.ToolPart
break
const parse = (e: unknown) =>
MessageV2.fromError(e, {
providerID: input.model.providerID,
aborted: input.abort.aborted,
})
case "tool-input-delta":
break
const handleEvent = Effect.fn("SessionProcessor.handleEvent")(function* (value: StreamEvent) {
switch (value.type) {
case "start":
yield* status.set(ctx.sessionID, { type: "busy" })
return
case "tool-input-end":
break
case "tool-call": {
const match = toolcalls[value.toolCallId]
if (match) {
const part = await Session.updatePart({
...match,
tool: value.toolName,
state: {
status: "running",
input: value.input,
time: {
start: Date.now(),
},
},
metadata: value.providerMetadata,
})
toolcalls[value.toolCallId] = part as MessageV2.ToolPart
const parts = await MessageV2.parts(input.assistantMessage.id)
const lastThree = parts.slice(-DOOM_LOOP_THRESHOLD)
if (
lastThree.length === DOOM_LOOP_THRESHOLD &&
lastThree.every(
(p) =>
p.type === "tool" &&
p.tool === value.toolName &&
p.state.status !== "pending" &&
JSON.stringify(p.state.input) === JSON.stringify(value.input),
)
) {
const agent = await Agent.get(input.assistantMessage.agent)
await Permission.ask({
permission: "doom_loop",
patterns: [value.toolName],
sessionID: input.assistantMessage.sessionID,
metadata: {
tool: value.toolName,
input: value.input,
},
always: [value.toolName],
ruleset: agent.permission,
})
}
}
break
}
case "tool-result": {
const match = toolcalls[value.toolCallId]
if (match && match.state.status === "running") {
await Session.updatePart({
...match,
state: {
status: "completed",
input: value.input ?? match.state.input,
output: value.output.output,
metadata: value.output.metadata,
title: value.output.title,
time: {
start: match.state.time.start,
end: Date.now(),
},
attachments: value.output.attachments,
},
})
delete toolcalls[value.toolCallId]
}
break
}
case "tool-error": {
const match = toolcalls[value.toolCallId]
if (match && match.state.status === "running") {
await Session.updatePart({
...match,
state: {
status: "error",
input: value.input ?? match.state.input,
error: value.error instanceof Error ? value.error.message : String(value.error),
time: {
start: match.state.time.start,
end: Date.now(),
},
},
})
if (
value.error instanceof Permission.RejectedError ||
value.error instanceof Question.RejectedError
) {
blocked = shouldBreak
}
delete toolcalls[value.toolCallId]
}
break
}
case "error":
throw value.error
case "start-step":
snapshot = await Snapshot.track()
await Session.updatePart({
id: PartID.ascending(),
messageID: input.assistantMessage.id,
sessionID: input.sessionID,
snapshot,
type: "step-start",
})
break
case "finish-step":
const usage = Session.getUsage({
model: input.model,
usage: value.usage,
metadata: value.providerMetadata,
})
input.assistantMessage.finish = value.finishReason
input.assistantMessage.cost += usage.cost
input.assistantMessage.tokens = usage.tokens
await Session.updatePart({
id: PartID.ascending(),
reason: value.finishReason,
snapshot: await Snapshot.track(),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "step-finish",
tokens: usage.tokens,
cost: usage.cost,
})
await Session.updateMessage(input.assistantMessage)
if (snapshot) {
const patch = await Snapshot.patch(snapshot)
if (patch.files.length) {
await Session.updatePart({
id: PartID.ascending(),
messageID: input.assistantMessage.id,
sessionID: input.sessionID,
type: "patch",
hash: patch.hash,
files: patch.files,
})
}
snapshot = undefined
}
SessionSummary.summarize({
sessionID: input.sessionID,
messageID: input.assistantMessage.parentID,
})
if (
!input.assistantMessage.summary &&
(await SessionCompaction.isOverflow({ tokens: usage.tokens, model: input.model }))
) {
needsCompaction = true
}
break
case "text-start":
currentText = {
id: PartID.ascending(),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "text",
text: "",
time: {
start: Date.now(),
},
metadata: value.providerMetadata,
}
await Session.updatePart(currentText)
break
case "text-delta":
if (currentText) {
currentText.text += value.text
if (value.providerMetadata) currentText.metadata = value.providerMetadata
await Session.updatePartDelta({
sessionID: currentText.sessionID,
messageID: currentText.messageID,
partID: currentText.id,
field: "text",
delta: value.text,
})
}
break
case "text-end":
if (currentText) {
currentText.text = currentText.text.trimEnd()
const textOutput = await Plugin.trigger(
"experimental.text.complete",
{
sessionID: input.sessionID,
messageID: input.assistantMessage.id,
partID: currentText.id,
},
{ text: currentText.text },
)
currentText.text = textOutput.text
currentText.time = {
start: Date.now(),
end: Date.now(),
}
if (value.providerMetadata) currentText.metadata = value.providerMetadata
await Session.updatePart(currentText)
}
currentText = undefined
break
case "finish":
break
default:
log.info("unhandled", {
...value,
})
continue
}
if (needsCompaction) break
}
} catch (e: any) {
log.error("process", {
error: e,
stack: JSON.stringify(e.stack),
})
const error = MessageV2.fromError(e, { providerID: input.model.providerID, aborted: input.abort.aborted })
if (MessageV2.ContextOverflowError.isInstance(error)) {
needsCompaction = true
Bus.publish(Session.Event.Error, {
sessionID: input.sessionID,
error,
})
} else {
const retry = SessionRetry.retryable(error)
if (retry !== undefined) {
attempt++
const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined)
await SessionStatus.set(input.sessionID, {
type: "retry",
attempt,
message: retry,
next: Date.now() + delay,
})
await SessionRetry.sleep(delay, input.abort).catch(() => {})
continue
}
input.assistantMessage.error = error
Bus.publish(Session.Event.Error, {
sessionID: input.assistantMessage.sessionID,
error: input.assistantMessage.error,
})
await SessionStatus.set(input.sessionID, { type: "idle" })
}
}
if (snapshot) {
const patch = await Snapshot.patch(snapshot)
if (patch.files.length) {
await Session.updatePart({
case "reasoning-start":
if (value.id in ctx.reasoningMap) return
ctx.reasoningMap[value.id] = {
id: PartID.ascending(),
messageID: input.assistantMessage.id,
sessionID: input.sessionID,
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "reasoning",
text: "",
time: { start: Date.now() },
metadata: value.providerMetadata,
}
yield* session.updatePart(ctx.reasoningMap[value.id])
return
case "reasoning-delta":
if (!(value.id in ctx.reasoningMap)) return
ctx.reasoningMap[value.id].text += value.text
if (value.providerMetadata) ctx.reasoningMap[value.id].metadata = value.providerMetadata
yield* session.updatePartDelta({
sessionID: ctx.reasoningMap[value.id].sessionID,
messageID: ctx.reasoningMap[value.id].messageID,
partID: ctx.reasoningMap[value.id].id,
field: "text",
delta: value.text,
})
return
case "reasoning-end":
if (!(value.id in ctx.reasoningMap)) return
ctx.reasoningMap[value.id].text = ctx.reasoningMap[value.id].text.trimEnd()
ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() }
if (value.providerMetadata) ctx.reasoningMap[value.id].metadata = value.providerMetadata
yield* session.updatePart(ctx.reasoningMap[value.id])
delete ctx.reasoningMap[value.id]
return
case "tool-input-start":
ctx.toolcalls[value.id] = (yield* session.updatePart({
id: ctx.toolcalls[value.id]?.id ?? PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "tool",
tool: value.toolName,
callID: value.id,
state: { status: "pending", input: {}, raw: "" },
})) as MessageV2.ToolPart
return
case "tool-input-delta":
return
case "tool-input-end":
return
case "tool-call": {
const match = ctx.toolcalls[value.toolCallId]
if (!match) return
ctx.toolcalls[value.toolCallId] = (yield* session.updatePart({
...match,
tool: value.toolName,
state: { status: "running", input: value.input, time: { start: Date.now() } },
metadata: value.providerMetadata,
})) as MessageV2.ToolPart
const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD)
if (
recentParts.length !== DOOM_LOOP_THRESHOLD ||
!recentParts.every(
(part) =>
part.type === "tool" &&
part.tool === value.toolName &&
part.state.status !== "pending" &&
JSON.stringify(part.state.input) === JSON.stringify(value.input),
)
) {
return
}
const agent = yield* agents.get(ctx.assistantMessage.agent)
yield* permission.ask({
permission: "doom_loop",
patterns: [value.toolName],
sessionID: ctx.assistantMessage.sessionID,
metadata: { tool: value.toolName, input: value.input },
always: [value.toolName],
ruleset: agent.permission,
})
return
}
case "tool-result": {
const match = ctx.toolcalls[value.toolCallId]
if (!match || match.state.status !== "running") return
yield* session.updatePart({
...match,
state: {
status: "completed",
input: value.input ?? match.state.input,
output: value.output.output,
metadata: value.output.metadata,
title: value.output.title,
time: { start: match.state.time.start, end: Date.now() },
attachments: value.output.attachments,
},
})
delete ctx.toolcalls[value.toolCallId]
return
}
case "tool-error": {
const match = ctx.toolcalls[value.toolCallId]
if (!match || match.state.status !== "running") return
yield* session.updatePart({
...match,
state: {
status: "error",
input: value.input ?? match.state.input,
error: value.error instanceof Error ? value.error.message : String(value.error),
time: { start: match.state.time.start, end: Date.now() },
},
})
if (value.error instanceof Permission.RejectedError || value.error instanceof Question.RejectedError) {
ctx.blocked = ctx.shouldBreak
}
delete ctx.toolcalls[value.toolCallId]
return
}
case "error":
throw value.error
case "start-step":
ctx.snapshot = yield* snapshot.track()
yield* session.updatePart({
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.sessionID,
snapshot: ctx.snapshot,
type: "step-start",
})
return
case "finish-step": {
const usage = Session.getUsage({
model: ctx.model,
usage: value.usage,
metadata: value.providerMetadata,
})
ctx.assistantMessage.finish = value.finishReason
ctx.assistantMessage.cost += usage.cost
ctx.assistantMessage.tokens = usage.tokens
yield* session.updatePart({
id: PartID.ascending(),
reason: value.finishReason,
snapshot: yield* snapshot.track(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "step-finish",
tokens: usage.tokens,
cost: usage.cost,
})
yield* session.updateMessage(ctx.assistantMessage)
if (ctx.snapshot) {
const patch = yield* snapshot.patch(ctx.snapshot)
if (patch.files.length) {
yield* session.updatePart({
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.sessionID,
type: "patch",
hash: patch.hash,
files: patch.files,
})
}
ctx.snapshot = undefined
}
yield* Effect.sync(() => {
void SessionSummary.summarize({
sessionID: ctx.sessionID,
messageID: ctx.assistantMessage.parentID,
})
})
if (
!ctx.assistantMessage.summary &&
(yield* Effect.promise(() => SessionCompaction.isOverflow({ tokens: usage.tokens, model: ctx.model })))
) {
ctx.needsCompaction = true
}
return
}
case "text-start":
ctx.currentText = {
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "text",
text: "",
time: { start: Date.now() },
metadata: value.providerMetadata,
}
yield* session.updatePart(ctx.currentText)
return
case "text-delta":
if (!ctx.currentText) return
ctx.currentText.text += value.text
if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata
yield* session.updatePartDelta({
sessionID: ctx.currentText.sessionID,
messageID: ctx.currentText.messageID,
partID: ctx.currentText.id,
field: "text",
delta: value.text,
})
return
case "text-end":
if (!ctx.currentText) return
ctx.currentText.text = ctx.currentText.text.trimEnd()
ctx.currentText.text = (yield* plugin.trigger(
"experimental.text.complete",
{
sessionID: ctx.sessionID,
messageID: ctx.assistantMessage.id,
partID: ctx.currentText.id,
},
{ text: ctx.currentText.text },
)).text
ctx.currentText.time = { start: Date.now(), end: Date.now() }
if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata
yield* session.updatePart(ctx.currentText)
ctx.currentText = undefined
return
case "finish":
return
default:
log.info("unhandled", { ...value })
return
}
})
const cleanup = Effect.fn("SessionProcessor.cleanup")(function* () {
if (ctx.snapshot) {
const patch = yield* snapshot.patch(ctx.snapshot)
if (patch.files.length) {
yield* session.updatePart({
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.sessionID,
type: "patch",
hash: patch.hash,
files: patch.files,
})
}
snapshot = undefined
ctx.snapshot = undefined
}
const p = await MessageV2.parts(input.assistantMessage.id)
for (const part of p) {
if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") {
await Session.updatePart({
...part,
state: {
...part.state,
status: "error",
error: "Tool execution aborted",
time: {
start: Date.now(),
end: Date.now(),
},
},
})
}
const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
for (const part of parts) {
if (part.type !== "tool" || part.state.status === "completed" || part.state.status === "error") continue
yield* session.updatePart({
...part,
state: {
...part.state,
status: "error",
error: "Tool execution aborted",
time: { start: Date.now(), end: Date.now() },
},
})
}
input.assistantMessage.time.completed = Date.now()
await Session.updateMessage(input.assistantMessage)
if (needsCompaction) return "compact"
if (blocked) return "stop"
if (input.assistantMessage.error) return "stop"
ctx.assistantMessage.time.completed = Date.now()
yield* session.updateMessage(ctx.assistantMessage)
})
// Final error handler for a processing run: logs the raw error, classifies it via
// `parse`, and either flags the context for compaction (context overflow) or records
// the error on the assistant message and marks the session idle. Both paths publish
// a Session.Event.Error on the bus.
const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown) {
  log.error("process", { error: e, stack: JSON.stringify((e as any)?.stack) })
  const parsed = parse(e)
  if (!MessageV2.ContextOverflowError.isInstance(parsed)) {
    // Any other error: persist it on the message, announce it, and go idle.
    ctx.assistantMessage.error = parsed
    yield* bus.publish(Session.Event.Error, {
      sessionID: ctx.assistantMessage.sessionID,
      error: ctx.assistantMessage.error,
    })
    yield* status.set(ctx.sessionID, { type: "idle" })
    return
  }
  // Context overflow: request compaction instead of storing the error on the message.
  ctx.needsCompaction = true
  yield* bus.publish(Session.Event.Error, { sessionID: ctx.sessionID, error: parsed })
})
// Runs one LLM stream for this processor: feeds every stream event to handleEvent,
// applies the retry policy to failures, hands anything left to halt, always runs
// cleanup, and finally reports "compact", "stop" or "continue".
const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
log.info("process")
ctx.needsCompaction = false
ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true
yield* Effect.gen(function* () {
// This inner effect is what Effect.retry wraps below, so the per-stream state is
// reset here — presumably on every retry attempt as well as the first run.
ctx.currentText = undefined
ctx.reasoningMap = {}
const stream = yield* Effect.promise(() => LLM.stream(streamInput))
yield* Stream.fromAsyncIterable(stream.fullStream, (e) => e).pipe(
Stream.runForEachWhile((event) =>
Effect.gen(function* () {
// TODO: Revisit this once more of handleEvent is Effect-native.
// As local synchronous mutation shrinks, this guard and eventually the
// raw AbortSignal plumbing should be removable.
input.abort.throwIfAborted()
yield* handleEvent(event)
// A false result ends consumption as soon as an event has requested compaction.
return !ctx.needsCompaction
}),
),
)
}).pipe(
// Squash the whole cause into a plain failure so the retry policy's `parse` sees the underlying error.
// NOTE(review): this appears to turn defects (e.g. the throw in handleEvent's "error" case) into failures too — confirm intended.
Effect.catchCause((cause) => Effect.fail(Cause.squash(cause))),
Effect.retry(SessionRetry.policy({ sessionID: ctx.sessionID, parse })),
// Anything that is not purely an interruption is handed to halt for classification.
Effect.catchCauseIf(
(cause) => !Cause.hasInterruptsOnly(cause),
(cause) => halt(Cause.squash(cause)),
),
// Whatever cause is still left at this point is re-raised as an interruption.
Effect.catchCause(() => Effect.interrupt),
// On interruption, run halt with a synthetic AbortError.
Effect.onInterrupt(() => halt(new DOMException("Aborted", "AbortError"))),
Effect.ensuring(cleanup()),
)
// Result precedence: compaction first, then any stop condition, otherwise continue.
if (ctx.needsCompaction) return "compact"
if (ctx.blocked || ctx.assistantMessage.error || input.abort.aborted) return "stop"
return "continue"
})
return {
get message() {
return ctx.assistantMessage
},
partFromToolCall(toolCallID: string) {
return ctx.toolcalls[toolCallID]
},
process,
} satisfies Handle
})
return Service.of({ create })
}),
)
// `layer` with all eight of its declared service requirements (Session, Snapshot, Agent,
// Permission, Plugin, SessionStatus, Bus, Config) provided.
// NOTE(review): the Layer.unwrap(Effect.sync(...)) wrapper presumably defers building this
// graph until first use (e.g. to sidestep module-load ordering) — confirm.
export const defaultLayer = Layer.unwrap(
Effect.sync(() =>
layer.pipe(
Layer.provide(Session.defaultLayer),
Layer.provide(Snapshot.defaultLayer),
Layer.provide(Agent.defaultLayer),
Layer.provide(Permission.layer),
Layer.provide(Plugin.defaultLayer),
// SessionStatus.layer is given its own Bus.layer here, in addition to the Bus.layer provided below.
Layer.provide(SessionStatus.layer.pipe(Layer.provide(Bus.layer))),
Layer.provide(Bus.layer),
Layer.provide(Config.defaultLayer),
),
),
)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function create(input: Input): Promise<Info> {
const hit = await runPromise((svc) => svc.create(input))
return {
get message() {
return hit.message
},
partFromToolCall(toolCallID: string) {
return hit.partFromToolCall(toolCallID)
},
async process(streamInput: LLM.StreamInput) {
const exit = await Effect.runPromiseExit(hit.process(streamInput), { signal: input.abort })
if (Exit.isFailure(exit)) {
if (Cause.hasInterrupts(exit.cause) && input.abort.aborted) return "stop"
throw Cause.squash(exit.cause)
}
return exit.value
},
}
return result
}
}

View File

@@ -594,7 +594,7 @@ export namespace SessionPrompt {
session,
})
const processor = SessionProcessor.create({
const processor = await SessionProcessor.create({
assistantMessage: (await Session.updateMessage({
id: MessageID.ascending(),
parentID: lastUser.id,

View File

@@ -1,28 +1,20 @@
import type { NamedError } from "@opencode-ai/util/error"
import { Clock, Effect, Schedule } from "effect"
import { MessageV2 } from "./message-v2"
import type { SessionID } from "./schema"
import { SessionStatus } from "./status"
import { iife } from "@/util/iife"
export namespace SessionRetry {
export type Err = ReturnType<NamedError["toObject"]>
export const RETRY_INITIAL_DELAY = 2000
export const RETRY_BACKOFF_FACTOR = 2
export const RETRY_MAX_DELAY_NO_HEADERS = 30_000 // 30 seconds
export const RETRY_MAX_DELAY = 2_147_483_647 // max 32-bit signed integer for setTimeout
export async function sleep(ms: number, signal: AbortSignal): Promise<void> {
return new Promise((resolve, reject) => {
const abortHandler = () => {
clearTimeout(timeout)
reject(new DOMException("Aborted", "AbortError"))
}
const timeout = setTimeout(
() => {
signal.removeEventListener("abort", abortHandler)
resolve()
},
Math.min(ms, RETRY_MAX_DELAY),
)
signal.addEventListener("abort", abortHandler, { once: true })
})
/**
 * Clamps a delay in milliseconds to RETRY_MAX_DELAY, the largest value
 * setTimeout accepts (max 32-bit signed integer).
 */
function cap(ms: number) {
  const ceiling = RETRY_MAX_DELAY
  return Math.min(ms, ceiling)
}
export function delay(attempt: number, error?: MessageV2.APIError) {
@@ -33,7 +25,7 @@ export namespace SessionRetry {
if (retryAfterMs) {
const parsedMs = Number.parseFloat(retryAfterMs)
if (!Number.isNaN(parsedMs)) {
return parsedMs
return cap(parsedMs)
}
}
@@ -42,23 +34,23 @@ export namespace SessionRetry {
const parsedSeconds = Number.parseFloat(retryAfter)
if (!Number.isNaN(parsedSeconds)) {
// convert seconds to milliseconds
return Math.ceil(parsedSeconds * 1000)
return cap(Math.ceil(parsedSeconds * 1000))
}
// Try parsing as HTTP date format
const parsed = Date.parse(retryAfter) - Date.now()
if (!Number.isNaN(parsed) && parsed > 0) {
return Math.ceil(parsed)
return cap(Math.ceil(parsed))
}
}
return RETRY_INITIAL_DELAY * Math.pow(RETRY_BACKOFF_FACTOR, attempt - 1)
return cap(RETRY_INITIAL_DELAY * Math.pow(RETRY_BACKOFF_FACTOR, attempt - 1))
}
}
return Math.min(RETRY_INITIAL_DELAY * Math.pow(RETRY_BACKOFF_FACTOR, attempt - 1), RETRY_MAX_DELAY_NO_HEADERS)
return cap(Math.min(RETRY_INITIAL_DELAY * Math.pow(RETRY_BACKOFF_FACTOR, attempt - 1), RETRY_MAX_DELAY_NO_HEADERS))
}
export function retryable(error: ReturnType<NamedError["toObject"]>) {
export function retryable(error: Err) {
// context overflow errors should not be retried
if (MessageV2.ContextOverflowError.isInstance(error)) return undefined
if (MessageV2.APIError.isInstance(error)) {
@@ -98,4 +90,33 @@ export namespace SessionRetry {
return undefined
}
}
/**
 * Builds the retry schedule used by the session processor.
 *
 * The schedule keeps going only while `retryable(opts.parse(input))` yields a
 * message. Before each retry it computes the wait via `delay`, and, when a
 * retry message exists, writes a "retry" status for the session (attempt
 * number, message, and the timestamp at which the next attempt is due).
 *
 * @param opts.sessionID session whose status is updated before each retry
 * @param opts.parse converts a raw failure into the error shape `retryable` and `delay` understand
 * @returns a schedule suitable for `Effect.retry`
 */
export function policy(opts: { sessionID: SessionID; parse: (error: unknown) => Err }) {
  // Attempt counter private to this schedule instance; bumped once per computed delay.
  let attempt = 0

  // Continue only while the parsed failure is classified as retryable.
  const shouldRetry = ({ input }: Schedule.InputMetadata<unknown>) =>
    Effect.succeed(retryable(opts.parse(input)) !== undefined)

  // Computes the wait before the next attempt and publishes the retry status.
  const nextDelay = (input: unknown) =>
    Effect.gen(function* () {
      const parsed = opts.parse(input)
      const reason = retryable(parsed)
      attempt += 1
      const ms = delay(attempt, MessageV2.APIError.isInstance(parsed) ? parsed : undefined)
      const startedAt = yield* Clock.currentTimeMillis
      if (!reason) return ms
      yield* Effect.promise(() =>
        SessionStatus.set(opts.sessionID, {
          type: "retry",
          attempt,
          message: reason,
          next: startedAt + ms,
        }),
      )
      return ms
    })

  return Schedule.identity<unknown>().pipe(Schedule.while(shouldRetry), Schedule.addDelay(nextDelay))
}
}

View File

@@ -123,7 +123,7 @@ function fake(
result: "continue" | "compact",
): ReturnType<(typeof SessionProcessorModule.SessionProcessor)["create"]> {
const msg = input.assistantMessage
return {
return Promise.resolve({
get message() {
return msg
},
@@ -139,7 +139,7 @@ function fake(
}
},
process: async () => result,
}
})
}
function wait(ms = 50) {

View File

@@ -2,9 +2,14 @@ import { describe, expect, test } from "bun:test"
import type { NamedError } from "@opencode-ai/util/error"
import { APICallError } from "ai"
import { setTimeout as sleep } from "node:timers/promises"
import { Effect, Schedule } from "effect"
import { SessionRetry } from "../../src/session/retry"
import { MessageV2 } from "../../src/session/message-v2"
import { ProviderID } from "../../src/provider/schema"
import { SessionID } from "../../src/session/schema"
import { SessionStatus } from "../../src/session/status"
import { Instance } from "../../src/project/instance"
import { tmpdir } from "../fixture/fixture"
const providerID = ProviderID.make("test")
@@ -69,24 +74,39 @@ describe("session.retry.delay", () => {
expect(SessionRetry.delay(1, longError)).toBe(700000)
})
test("sleep caps delay to max 32-bit signed integer to avoid TimeoutOverflowWarning", async () => {
const controller = new AbortController()
test("caps oversized header delays to the runtime timer limit", () => {
const error = apiError({ "retry-after-ms": "999999999999" })
expect(SessionRetry.delay(1, error)).toBe(SessionRetry.RETRY_MAX_DELAY)
})
const warnings: string[] = []
const originalWarn = process.emitWarning
process.emitWarning = (warning: string | Error) => {
warnings.push(typeof warning === "string" ? warning : warning.message)
}
test("policy updates retry status and increments attempts", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
const sessionID = SessionID.make("session-retry-test")
const error = apiError({ "retry-after-ms": "0" })
const promise = SessionRetry.sleep(2_560_914_000, controller.signal)
controller.abort()
await Effect.runPromise(
Effect.gen(function* () {
const step = yield* Schedule.toStepWithMetadata(
SessionRetry.policy({
sessionID,
parse: (err) => err as MessageV2.APIError,
}),
)
yield* step(error)
yield* step(error)
}),
)
try {
await promise
} catch {}
process.emitWarning = originalWarn
expect(warnings.some((w) => w.includes("TimeoutOverflowWarning"))).toBe(false)
expect(await SessionStatus.get(sessionID)).toMatchObject({
type: "retry",
attempt: 2,
message: "boom",
})
},
})
})
})