diff --git a/packages/opencode/src/effect/runner.ts b/packages/opencode/src/effect/runner.ts index 5d7e8778d7..f21a61c97e 100644 --- a/packages/opencode/src/effect/runner.ts +++ b/packages/opencode/src/effect/runner.ts @@ -4,11 +4,12 @@ export interface Runner { readonly state: State readonly busy: boolean readonly ensureRunning: (work: Effect.Effect) => Effect.Effect - readonly startShell: (work: Effect.Effect, ready?: Latch.Latch) => Effect.Effect + readonly startShell: (work: Effect.Effect, ready?: Latch.Latch) => Effect.Effect readonly cancel: Effect.Effect } export class Cancelled extends Schema.TaggedErrorClass()("RunnerCancelled", {}) {} +export class Busy extends Schema.TaggedErrorClass()("RunnerBusy", {}) {} interface RunHandle { id: number @@ -41,12 +42,11 @@ export const make = ( onIdle?: Effect.Effect onBusy?: Effect.Effect onInterrupt?: Effect.Effect - busy?: () => never }, ): Runner => { const ref = SynchronizedRef.makeUnsafe>({ _tag: "Idle" }) const idle = opts?.onIdle ?? Effect.void - const busy = opts?.onBusy ?? Effect.void + const onBusy = opts?.onBusy ?? Effect.void const onInterrupt = opts?.onInterrupt let ids = 0 @@ -137,20 +137,15 @@ export const make = ( }), ).pipe(Effect.flatten) - const startShell = (work: Effect.Effect, ready?: Latch.Latch) => + const startShell = (work: Effect.Effect, ready?: Latch.Latch): Effect.Effect => SynchronizedRef.modifyEffect( ref, Effect.fnUntraced(function* (st) { if (st._tag !== "Idle") { - return [ - Effect.sync(() => { - if (opts?.busy) opts.busy() - throw new Error("Runner is busy") - }), - st, - ] as const + const reject: Effect.Effect = Effect.fail(new Busy()) + return [reject, st] as const } - yield* busy + yield* onBusy const id = next() const cancelled = yield* Deferred.make() const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild) diff --git a/packages/opencode/src/server/routes/instance/httpapi/handlers/session.ts b/packages/opencode/src/server/routes/instance/httpapi/handlers/session.ts index 1fb2455ab5..e3f79965da 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/handlers/session.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/handlers/session.ts @@ -58,13 +58,10 @@ export const sessionHandlers = HttpApiBuilder.group(InstanceHttpApi, "session", const bus = yield* Bus.Service const scope = yield* Scope.Scope - const mapBusy = (effect: Effect.Effect): Effect.Effect => - effect.pipe( - Effect.catchCause((cause): Effect.Effect => { - if (Cause.squash(cause) instanceof Session.BusyError) return Effect.fail(new HttpApiError.BadRequest({})) - return Effect.failCause(cause) - }), - ) + const mapBusy = ( + effect: Effect.Effect, + ): Effect.Effect => + effect.pipe(Effect.catchTag("SessionBusyError", () => Effect.fail(new HttpApiError.BadRequest({})))) const list = Effect.fn("SessionHttpApi.list")(function* (ctx: { query: typeof ListQuery.Type }) { return yield* session.list({ diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 1ae4114269..12a9e1a8b0 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -167,7 +167,7 @@ export interface Interface { readonly cancel: (sessionID: SessionID) => Effect.Effect readonly prompt: (input: PromptInput) => Effect.Effect readonly loop: (input: LoopInput) => Effect.Effect - readonly shell: (input: ShellInput) => Effect.Effect + readonly shell: (input: ShellInput) => Effect.Effect readonly command: (input: CommandInput) => Effect.Effect readonly resolvePromptParts: (template: string) => Effect.Effect } @@ -1864,12 +1864,12 @@ NOTE: At any point in time through this workflow you should feel free to ask the return yield* state.ensureRunning(input.sessionID, lastAssistant(input.sessionID), runLoop(input.sessionID)) }) - const shell: (input: ShellInput) => Effect.Effect = Effect.fn("SessionPrompt.shell")( - function* (input: ShellInput) { - const ready = yield* Latch.make() - return yield* state.startShell(input.sessionID, lastAssistant(input.sessionID), shellImpl(input, ready), ready) - }, - ) + const shell: (input: ShellInput) => Effect.Effect = Effect.fn( + "SessionPrompt.shell", + )(function* (input: ShellInput) { + const ready = yield* Latch.make() + return yield* state.startShell(input.sessionID, lastAssistant(input.sessionID), shellImpl(input, ready), ready) + }) const command = Effect.fn("SessionPrompt.command")(function* (input: CommandInput) { yield* elog.info("command", { sessionID: input.sessionID, command: input.command, agent: input.agent }) diff --git a/packages/opencode/src/session/revert.ts b/packages/opencode/src/session/revert.ts index 8ba7b265b9..950d533a3d 100644 --- a/packages/opencode/src/session/revert.ts +++ b/packages/opencode/src/session/revert.ts @@ -20,8 +20,8 @@ export const RevertInput = Schema.Struct({ export type RevertInput = Schema.Schema.Type export interface Interface { - readonly revert: (input: RevertInput) => Effect.Effect - readonly unrevert: (input: { sessionID: SessionID }) => Effect.Effect + readonly revert: (input: RevertInput) => Effect.Effect + readonly unrevert: (input: { sessionID: SessionID }) => Effect.Effect readonly cleanup: (session: Session.Info) => Effect.Effect } diff --git a/packages/opencode/src/session/run-state.ts b/packages/opencode/src/session/run-state.ts index 9d4986f174..e33dba0054 100644 --- a/packages/opencode/src/session/run-state.ts +++ b/packages/opencode/src/session/run-state.ts @@ -7,7 +7,7 @@ import { SessionID } from "./schema" import { SessionStatus } from "./status" export interface Interface { - readonly assertNotBusy: (sessionID: SessionID) => Effect.Effect + readonly assertNotBusy: (sessionID: SessionID) => Effect.Effect readonly cancel: (sessionID: SessionID) => Effect.Effect readonly ensureRunning: ( sessionID: SessionID, @@ -19,7 +19,7 @@ export interface Interface { onInterrupt: Effect.Effect, work: Effect.Effect, ready?: Latch.Latch, - ) => Effect.Effect + ) => Effect.Effect } export class Service extends Context.Service()("@opencode/SessionRunState") {} @@ -60,9 +60,6 @@ export const layer = Layer.effect( }), onBusy: status.set(sessionID, { type: "busy" }), onInterrupt, - busy: () => { - throw new Session.BusyError(sessionID) - }, }) data.runners.set(sessionID, next) return next @@ -71,7 +68,7 @@ export const layer = Layer.effect( const assertNotBusy = Effect.fn("SessionRunState.assertNotBusy")(function* (sessionID: SessionID) { const data = yield* InstanceState.get(state) const existing = data.runners.get(sessionID) - if (existing?.busy) throw new Session.BusyError(sessionID) + if (existing?.busy) yield* busyError(sessionID) }) const cancel = Effect.fn("SessionRunState.cancel")(function* (sessionID: SessionID) { @@ -98,7 +95,9 @@ export const layer = Layer.effect( work: Effect.Effect, ready?: Latch.Latch, ) { - return yield* (yield* runner(sessionID, onInterrupt)).startShell(work, ready) + return yield* (yield* runner(sessionID, onInterrupt)) + .startShell(work, ready) + .pipe(Effect.catchTag("RunnerBusy", () => Effect.fail(busyError(sessionID)))) }) return Service.of({ assertNotBusy, cancel, ensureRunning, startShell }) @@ -107,4 +106,8 @@ export const layer = Layer.effect( export const defaultLayer = layer.pipe(Layer.provide(SessionStatus.defaultLayer)) +function busyError(sessionID: SessionID) { + return new Session.BusyError({ sessionID }) +} + export * as SessionRunState from "./run-state" diff --git a/packages/opencode/src/session/session.ts b/packages/opencode/src/session/session.ts index 85486480aa..3fb924dc81 100644 --- a/packages/opencode/src/session/session.ts +++ b/packages/opencode/src/session/session.ts @@ -442,11 +442,9 @@ export const getUsage = (input: { model: Provider.Model; usage: LanguageModelUsa } } -export class BusyError extends Error { - constructor(public readonly sessionID: string) { - super(`Session ${sessionID} is busy`) - } -} +export class BusyError extends Schema.TaggedErrorClass()("SessionBusyError", { + sessionID: SessionID, +}) {} export type NotFound = NotFoundError diff --git a/packages/opencode/test/effect/runner.test.ts b/packages/opencode/test/effect/runner.test.ts index 3030ca64e0..27fe9e0254 100644 --- a/packages/opencode/test/effect/runner.test.ts +++ b/packages/opencode/test/effect/runner.test.ts @@ -1,5 +1,5 @@ import { describe, expect } from "bun:test" -import { Deferred, Effect, Exit, Fiber, Latch, Ref, Scope } from "effect" +import { Cause, Deferred, Effect, Exit, Fiber, Latch, Ref, Scope } from "effect" import { Runner } from "@/effect/runner" import { it } from "../lib/effect" @@ -302,34 +302,13 @@ describe("Runner", () => { const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit) expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) expect(Cause.squash(exit.cause)).toBeInstanceOf(Runner.Busy) yield* Deferred.succeed(gate, undefined) yield* Fiber.await(sh) }), ) - it.live( - "shell rejects via busy callback and cancel still stops the first shell", - Effect.gen(function* () { - const s = yield* Scope.Scope - const runner = Runner.make(s, { - busy: () => { - throw new Error("busy") - }, - }) - - const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild) - yield* waitForState(runner, "Shell") - - const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit) - expect(Exit.isFailure(exit)).toBe(true) - - yield* runner.cancel - const done = yield* Fiber.await(sh) - expect(Exit.isFailure(done)).toBe(true) - }), - ) - it.live( "cancel interrupts shell", Effect.gen(function* () { diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts index c87e11880c..9ad2fbe1f6 100644 --- a/packages/opencode/test/session/prompt.test.ts +++ b/packages/opencode/test/session/prompt.test.ts @@ -1119,7 +1119,7 @@ it.instance( ) it.instance( - "assertNotBusy throws BusyError when loop running", + "assertNotBusy fails with BusyError when loop running", () => Effect.gen(function* () { const { llm } = yield* useServerConfig(providerCfg) @@ -1138,6 +1138,7 @@ it.instance( expect(Exit.isFailure(exit)).toBe(true) if (Exit.isFailure(exit)) { expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError) + expect(Cause.squash(exit.cause)).toMatchObject({ _tag: "SessionBusyError", sessionID: chat.id }) } yield* prompt.cancel(chat.id) @@ -1181,6 +1182,7 @@ it.instance( expect(Exit.isFailure(exit)).toBe(true) if (Exit.isFailure(exit)) { expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError) + expect(Cause.squash(exit.cause)).toMatchObject({ _tag: "SessionBusyError", sessionID: chat.id }) } yield* prompt.cancel(chat.id)