diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts index eb01739c15..e524217a4a 100644 --- a/packages/opencode/src/session/index.ts +++ b/packages/opencode/src/session/index.ts @@ -350,14 +350,14 @@ export namespace Session { readonly messages: (input: { sessionID: SessionID; limit?: number }) => Effect.Effect readonly children: (parentID: SessionID) => Effect.Effect readonly remove: (sessionID: SessionID) => Effect.Effect - readonly updateMessage: (msg: MessageV2.Info) => Effect.Effect + readonly updateMessage: (msg: T) => Effect.Effect readonly removeMessage: (input: { sessionID: SessionID; messageID: MessageID }) => Effect.Effect readonly removePart: (input: { sessionID: SessionID messageID: MessageID partID: PartID }) => Effect.Effect - readonly updatePart: (part: MessageV2.Part) => Effect.Effect + readonly updatePart: (part: T) => Effect.Effect readonly updatePartDelta: (input: { sessionID: SessionID messageID: MessageID @@ -485,26 +485,23 @@ export namespace Session { } }) - const updateMessage = Effect.fn("Session.updateMessage")(function* (msg: MessageV2.Info) { - yield* Effect.sync(() => - SyncEvent.run(MessageV2.Event.Updated, { - sessionID: msg.sessionID, - info: msg, - }), - ) - return msg - }) + const updateMessage = (msg: T): Effect.Effect => + Effect.gen(function* () { + yield* Effect.sync(() => SyncEvent.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg })) + return msg + }).pipe(Effect.withSpan("Session.updateMessage")) - const updatePart = Effect.fn("Session.updatePart")(function* (part: MessageV2.Part) { - yield* Effect.sync(() => - SyncEvent.run(MessageV2.Event.PartUpdated, { - sessionID: part.sessionID, - part: structuredClone(part), - time: Date.now(), - }), - ) - return part - }) + const updatePart = (part: T): Effect.Effect => + Effect.gen(function* () { + yield* Effect.sync(() => + SyncEvent.run(MessageV2.Event.PartUpdated, { + sessionID: part.sessionID, + part: structuredClone(part), + time: Date.now(), + }), + ) + return part + }).pipe(Effect.withSpan("Session.updatePart")) const create = Effect.fn("Session.create")(function* (input?: { parentID?: SessionID @@ -867,7 +864,17 @@ export namespace Session { export const children = fn(SessionID.zod, (id) => runPromise((svc) => svc.children(id))) export const remove = fn(SessionID.zod, (id) => runPromise((svc) => svc.remove(id))) - export const updateMessage = fn(MessageV2.Info, (msg) => runPromise((svc) => svc.updateMessage(msg))) + export const updateMessage = Object.assign( + async function updateMessage(msg: T): Promise { + return runPromise((svc) => svc.updateMessage(MessageV2.Info.parse(msg) as T)) + }, + { + schema: MessageV2.Info, + force(msg: T): Promise { + return runPromise((svc) => svc.updateMessage(msg)) + }, + }, + ) export const removeMessage = fn(z.object({ sessionID: SessionID.zod, messageID: MessageID.zod }), (input) => runPromise((svc) => svc.removeMessage(input)), @@ -878,7 +885,17 @@ export namespace Session { (input) => runPromise((svc) => svc.removePart(input)), ) - export const updatePart = fn(MessageV2.Part, (part) => runPromise((svc) => svc.updatePart(part))) + export const updatePart = Object.assign( + async function updatePart(part: T): Promise { + return runPromise((svc) => svc.updatePart(MessageV2.Part.parse(part) as T)) + }, + { + schema: MessageV2.Part, + force(part: T): Promise { + return runPromise((svc) => svc.updatePart(part)) + }, + }, + ) export const updatePartDelta = fn( z.object({ diff --git a/packages/opencode/test/session/prompt-effect.test.ts b/packages/opencode/test/session/prompt-effect.test.ts index 56828bf865..30140a3324 100644 --- a/packages/opencode/test/session/prompt-effect.test.ts +++ b/packages/opencode/test/session/prompt-effect.test.ts @@ -386,7 +386,7 @@ const boot = Effect.fn("test.boot")(function* (input?: { title?: string }) { const test = yield* TestLLM const prompt = yield* SessionPrompt.Service const sessions = yield* Session.Service - const chat = yield* sessions.create(input ?? {}) + const chat = yield* sessions.create(input ?? { title: "Pinned" }) return { test, prompt, sessions, chat } }) @@ -438,6 +438,10 @@ it.effect("loop continues when finish is tool-calls", () => const result = yield* prompt.loop({ sessionID: chat.id }) expect(yield* test.calls).toBe(2) expect(result.info.role).toBe("assistant") + if (result.info.role === "assistant") { + expect(result.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true) + expect(result.info.finish).toBe("stop") + } }), { git: true, config: cfg }, ), @@ -540,7 +544,7 @@ it.effect("loop sets status to busy then idle", () => // Cancel semantics it.effect( - "cancel interrupts loop and returns last assistant", + "cancel interrupts loop and resolves with an assistant message", () => provideTmpdirInstance( (dir) => @@ -629,9 +633,11 @@ it.effect( yield* prompt.cancel(chat.id) const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)]) - // Both should resolve (success or interrupt, not error) - expect(Exit.isFailure(exitA) && !Cause.hasInterruptsOnly(exitA.cause)).toBe(false) - expect(Exit.isFailure(exitB) && !Cause.hasInterruptsOnly(exitB.cause)).toBe(false) + expect(Exit.isSuccess(exitA)).toBe(true) + expect(Exit.isSuccess(exitB)).toBe(true) + if (Exit.isSuccess(exitA) && Exit.isSuccess(exitB)) { + expect(exitA.value.info.id).toBe(exitB.value.info.id) + } }), { git: true, config: cfg }, ), @@ -678,6 +684,9 @@ it.effect("concurrent loop callers all receive same error result", () => if (a.info.role === "assistant") { expect(a.info.error).toBeDefined() } + if (b.info.role === "assistant") { + expect(b.info.error).toBeDefined() + } }), { git: true, config: cfg }, ), @@ -708,6 +717,9 @@ it.effect( const exit = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit) expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) { + expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError) + } yield* prompt.cancel(chat.id) yield* Fiber.await(fiber) @@ -755,6 +767,9 @@ it.effect( const exit = yield* prompt.shell({ sessionID: chat.id, agent: "build", command: "echo hi" }).pipe(Effect.exit) expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) { + expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError) + } yield* prompt.cancel(chat.id) yield* Fiber.await(fiber) @@ -805,8 +820,8 @@ it.effect( while (Date.now() - start < 2000) { const msgs = await MessageV2.filterCompacted(MessageV2.stream(chat.id)) const taskMsg = msgs.find((item) => item.info.role === "assistant") - const tool = taskMsg ? runningTool(taskMsg.parts) : undefined - if (tool?.state.metadata?.output.includes("first")) return + const tool = taskMsg ? toolPart(taskMsg.parts) : undefined + if (tool?.state.status === "running" && tool.state.metadata?.output.includes("first")) return await new Promise((done) => setTimeout(done, 20)) } throw new Error("timed out waiting for running shell metadata") @@ -909,7 +924,10 @@ it.effect( expect(Exit.isSuccess(exit)).toBe(true) if (Exit.isSuccess(exit)) { expect(exit.value.info.role).toBe("assistant") - expect(exit.value.parts.some((part) => part.type === "tool")).toBe(true) + const tool = completedTool(exit.value.parts) + if (tool) { + expect(tool.state.output).toContain("User aborted the command") + } } const status = yield* SessionStatus.Service @@ -922,6 +940,37 @@ it.effect( 30_000, ) +it.effect( + "cancel interrupts loop queued behind shell", + () => + provideTmpdirInstance( + (dir) => + Effect.gen(function* () { + const { prompt, chat } = yield* boot() + + const sh = yield* prompt + .shell({ sessionID: chat.id, agent: "build", command: "sleep 30" }) + .pipe(Effect.forkChild) + yield* waitMs(50) + + const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* waitMs(50) + + yield* prompt.cancel(chat.id) + + const exit = yield* Fiber.await(run) + expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) { + expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true) + } + + yield* Fiber.await(sh) + }), + { git: true, config: cfg }, + ), + 30_000, +) + it.effect( "shell rejects when another shell is already running", () => @@ -937,6 +986,9 @@ it.effect( const exit = yield* prompt.shell({ sessionID: chat.id, agent: "build", command: "echo hi" }).pipe(Effect.exit) expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) { + expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError) + } yield* prompt.cancel(chat.id) yield* Fiber.await(a)