fix(session): type busy errors (#27410)

This commit is contained in:
Kit Langton
2026-05-13 21:28:04 -04:00
committed by GitHub
parent 3fc7486d15
commit edf7649400
8 changed files with 38 additions and 64 deletions

View File

@@ -4,11 +4,12 @@ export interface Runner<A, E = never> {
readonly state: State<A, E>
readonly busy: boolean
readonly ensureRunning: (work: Effect.Effect<A, E>) => Effect.Effect<A, E>
readonly startShell: (work: Effect.Effect<A, E>, ready?: Latch.Latch) => Effect.Effect<A, E>
readonly startShell: (work: Effect.Effect<A, E>, ready?: Latch.Latch) => Effect.Effect<A, E | Busy>
readonly cancel: Effect.Effect<void>
}
export class Cancelled extends Schema.TaggedErrorClass<Cancelled>()("RunnerCancelled", {}) {}
export class Busy extends Schema.TaggedErrorClass<Busy>()("RunnerBusy", {}) {}
interface RunHandle<A, E> {
id: number
@@ -41,12 +42,11 @@ export const make = <A, E = never>(
onIdle?: Effect.Effect<void>
onBusy?: Effect.Effect<void>
onInterrupt?: Effect.Effect<A, E>
busy?: () => never
},
): Runner<A, E> => {
const ref = SynchronizedRef.makeUnsafe<State<A, E>>({ _tag: "Idle" })
const idle = opts?.onIdle ?? Effect.void
const busy = opts?.onBusy ?? Effect.void
const onBusy = opts?.onBusy ?? Effect.void
const onInterrupt = opts?.onInterrupt
let ids = 0
@@ -137,20 +137,15 @@ export const make = <A, E = never>(
}),
).pipe(Effect.flatten)
const startShell = (work: Effect.Effect<A, E>, ready?: Latch.Latch) =>
const startShell = (work: Effect.Effect<A, E>, ready?: Latch.Latch): Effect.Effect<A, E | Busy> =>
SynchronizedRef.modifyEffect(
ref,
Effect.fnUntraced(function* (st) {
if (st._tag !== "Idle") {
return [
Effect.sync(() => {
if (opts?.busy) opts.busy()
throw new Error("Runner is busy")
}),
st,
] as const
const reject: Effect.Effect<A, E | Busy> = Effect.fail(new Busy())
return [reject, st] as const
}
yield* busy
yield* onBusy
const id = next()
const cancelled = yield* Deferred.make<void>()
const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild)

View File

@@ -58,13 +58,10 @@ export const sessionHandlers = HttpApiBuilder.group(InstanceHttpApi, "session",
const bus = yield* Bus.Service
const scope = yield* Scope.Scope
const mapBusy = <A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E | HttpApiError.BadRequest, R> =>
effect.pipe(
Effect.catchCause((cause): Effect.Effect<never, E | HttpApiError.BadRequest> => {
if (Cause.squash(cause) instanceof Session.BusyError) return Effect.fail(new HttpApiError.BadRequest({}))
return Effect.failCause(cause)
}),
)
const mapBusy = <A, R>(
effect: Effect.Effect<A, Session.BusyError, R>,
): Effect.Effect<A, HttpApiError.BadRequest, R> =>
effect.pipe(Effect.catchTag("SessionBusyError", () => Effect.fail(new HttpApiError.BadRequest({}))))
const list = Effect.fn("SessionHttpApi.list")(function* (ctx: { query: typeof ListQuery.Type }) {
return yield* session.list({

View File

@@ -167,7 +167,7 @@ export interface Interface {
readonly cancel: (sessionID: SessionID) => Effect.Effect<void>
readonly prompt: (input: PromptInput) => Effect.Effect<MessageV2.WithParts>
readonly loop: (input: LoopInput) => Effect.Effect<MessageV2.WithParts>
readonly shell: (input: ShellInput) => Effect.Effect<MessageV2.WithParts>
readonly shell: (input: ShellInput) => Effect.Effect<MessageV2.WithParts, Session.BusyError>
readonly command: (input: CommandInput) => Effect.Effect<MessageV2.WithParts>
readonly resolvePromptParts: (template: string) => Effect.Effect<PromptInput["parts"]>
}
@@ -1864,12 +1864,12 @@ NOTE: At any point in time through this workflow you should feel free to ask the
return yield* state.ensureRunning(input.sessionID, lastAssistant(input.sessionID), runLoop(input.sessionID))
})
const shell: (input: ShellInput) => Effect.Effect<MessageV2.WithParts> = Effect.fn("SessionPrompt.shell")(
function* (input: ShellInput) {
const ready = yield* Latch.make()
return yield* state.startShell(input.sessionID, lastAssistant(input.sessionID), shellImpl(input, ready), ready)
},
)
const shell: (input: ShellInput) => Effect.Effect<MessageV2.WithParts, Session.BusyError> = Effect.fn(
"SessionPrompt.shell",
)(function* (input: ShellInput) {
const ready = yield* Latch.make()
return yield* state.startShell(input.sessionID, lastAssistant(input.sessionID), shellImpl(input, ready), ready)
})
const command = Effect.fn("SessionPrompt.command")(function* (input: CommandInput) {
yield* elog.info("command", { sessionID: input.sessionID, command: input.command, agent: input.agent })

View File

@@ -20,8 +20,8 @@ export const RevertInput = Schema.Struct({
export type RevertInput = Schema.Schema.Type<typeof RevertInput>
export interface Interface {
readonly revert: (input: RevertInput) => Effect.Effect<Session.Info>
readonly unrevert: (input: { sessionID: SessionID }) => Effect.Effect<Session.Info>
readonly revert: (input: RevertInput) => Effect.Effect<Session.Info, Session.BusyError>
readonly unrevert: (input: { sessionID: SessionID }) => Effect.Effect<Session.Info, Session.BusyError>
readonly cleanup: (session: Session.Info) => Effect.Effect<void>
}

View File

@@ -7,7 +7,7 @@ import { SessionID } from "./schema"
import { SessionStatus } from "./status"
export interface Interface {
readonly assertNotBusy: (sessionID: SessionID) => Effect.Effect<void>
readonly assertNotBusy: (sessionID: SessionID) => Effect.Effect<void, Session.BusyError>
readonly cancel: (sessionID: SessionID) => Effect.Effect<void>
readonly ensureRunning: (
sessionID: SessionID,
@@ -19,7 +19,7 @@ export interface Interface {
onInterrupt: Effect.Effect<MessageV2.WithParts>,
work: Effect.Effect<MessageV2.WithParts>,
ready?: Latch.Latch,
) => Effect.Effect<MessageV2.WithParts>
) => Effect.Effect<MessageV2.WithParts, Session.BusyError>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/SessionRunState") {}
@@ -60,9 +60,6 @@ export const layer = Layer.effect(
}),
onBusy: status.set(sessionID, { type: "busy" }),
onInterrupt,
busy: () => {
throw new Session.BusyError(sessionID)
},
})
data.runners.set(sessionID, next)
return next
@@ -71,7 +68,7 @@ export const layer = Layer.effect(
const assertNotBusy = Effect.fn("SessionRunState.assertNotBusy")(function* (sessionID: SessionID) {
const data = yield* InstanceState.get(state)
const existing = data.runners.get(sessionID)
if (existing?.busy) throw new Session.BusyError(sessionID)
if (existing?.busy) yield* busyError(sessionID)
})
const cancel = Effect.fn("SessionRunState.cancel")(function* (sessionID: SessionID) {
@@ -98,7 +95,9 @@ export const layer = Layer.effect(
work: Effect.Effect<MessageV2.WithParts>,
ready?: Latch.Latch,
) {
return yield* (yield* runner(sessionID, onInterrupt)).startShell(work, ready)
return yield* (yield* runner(sessionID, onInterrupt))
.startShell(work, ready)
.pipe(Effect.catchTag("RunnerBusy", () => Effect.fail(busyError(sessionID))))
})
return Service.of({ assertNotBusy, cancel, ensureRunning, startShell })
@@ -107,4 +106,8 @@ export const layer = Layer.effect(
export const defaultLayer = layer.pipe(Layer.provide(SessionStatus.defaultLayer))
function busyError(sessionID: SessionID) {
return new Session.BusyError({ sessionID })
}
export * as SessionRunState from "./run-state"

View File

@@ -442,11 +442,9 @@ export const getUsage = (input: { model: Provider.Model; usage: LanguageModelUsa
}
}
export class BusyError extends Error {
constructor(public readonly sessionID: string) {
super(`Session ${sessionID} is busy`)
}
}
export class BusyError extends Schema.TaggedErrorClass<BusyError>()("SessionBusyError", {
sessionID: SessionID,
}) {}
export type NotFound = NotFoundError

View File

@@ -1,5 +1,5 @@
import { describe, expect } from "bun:test"
import { Deferred, Effect, Exit, Fiber, Latch, Ref, Scope } from "effect"
import { Cause, Deferred, Effect, Exit, Fiber, Latch, Ref, Scope } from "effect"
import { Runner } from "@/effect/runner"
import { it } from "../lib/effect"
@@ -302,34 +302,13 @@ describe("Runner", () => {
const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) expect(Cause.squash(exit.cause)).toBeInstanceOf(Runner.Busy)
yield* Deferred.succeed(gate, undefined)
yield* Fiber.await(sh)
}),
)
it.live(
"shell rejects via busy callback and cancel still stops the first shell",
Effect.gen(function* () {
const s = yield* Scope.Scope
const runner = Runner.make<string>(s, {
busy: () => {
throw new Error("busy")
},
})
const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
yield* waitForState(runner, "Shell")
const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
yield* runner.cancel
const done = yield* Fiber.await(sh)
expect(Exit.isFailure(done)).toBe(true)
}),
)
it.live(
"cancel interrupts shell",
Effect.gen(function* () {

View File

@@ -1119,7 +1119,7 @@ it.instance(
)
it.instance(
"assertNotBusy throws BusyError when loop running",
"assertNotBusy fails with BusyError when loop running",
() =>
Effect.gen(function* () {
const { llm } = yield* useServerConfig(providerCfg)
@@ -1138,6 +1138,7 @@ it.instance(
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
expect(Cause.squash(exit.cause)).toMatchObject({ _tag: "SessionBusyError", sessionID: chat.id })
}
yield* prompt.cancel(chat.id)
@@ -1181,6 +1182,7 @@ it.instance(
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
expect(Cause.squash(exit.cause)).toMatchObject({ _tag: "SessionBusyError", sessionID: chat.id })
}
yield* prompt.cancel(chat.id)