diff --git a/packages/opencode/src/cli/cmd/run/runtime.ts b/packages/opencode/src/cli/cmd/run/runtime.ts index d811106bd8..882ff2e6c7 100644 --- a/packages/opencode/src/cli/cmd/run/runtime.ts +++ b/packages/opencode/src/cli/cmd/run/runtime.ts @@ -487,6 +487,7 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise { const handle = await mod.createSessionTransport({ sdk: ctx.sdk, + directory: ctx.directory, sessionID: state.sessionID, thinking: input.thinking, limits: () => state.limits, diff --git a/packages/opencode/src/cli/cmd/run/stream.transport.ts b/packages/opencode/src/cli/cmd/run/stream.transport.ts index d4b73ce6fa..c528c3d374 100644 --- a/packages/opencode/src/cli/cmd/run/stream.transport.ts +++ b/packages/opencode/src/cli/cmd/run/stream.transport.ts @@ -1,8 +1,9 @@ -// SDK event subscription and prompt turn coordination. +// Global event subscription and prompt turn coordination. // -// Creates a long-lived event stream subscription and feeds every event -// through the session-data reducer. The reducer produces scrollback commits -// and footer patches, which get forwarded to the footer through stream.ts. +// Creates a long-lived global event stream subscription and feeds relevant +// events for the current session tree through the reducers. The reducers +// produce scrollback commits and footer patches, which get forwarded to the +// footer through stream.ts. // // Prompt turns are one-at-a-time: runPromptTurn() sends the prompt to the // SDK, arms a deferred Wait, and resolves when the session becomes idle. @@ -14,7 +15,7 @@ // The tick counter prevents stale idle events from resolving the wrong turn. // We also re-check live session status before resolving an idle event so a // delayed idle from an older turn cannot complete a newer busy turn. -import type { Event, OpencodeClient } from "@opencode-ai/sdk/v2" +import type { Event, GlobalEvent, OpencodeClient } from "@opencode-ai/sdk/v2" import { Context, Deferred, Effect, Exit, Layer, Scope, Stream } from "effect" import { makeRuntime } from "@/effect/run-service" import { @@ -62,6 +63,7 @@ type Trace = { type StreamInput = { sdk: OpencodeClient + directory?: string sessionID: string thinking: boolean limits: () => Record @@ -151,6 +153,40 @@ function isEvent(value: unknown): value is Event { return typeof type === "string" && !!properties && typeof properties === "object" } +function isGlobalEvent(value: unknown): value is GlobalEvent { + if (!value || typeof value !== "object" || Array.isArray(value)) { + return false + } + + const payload = Reflect.get(value, "payload") + return !!payload && typeof payload === "object" +} + +function globalPayloadEvent(value: unknown): Event | undefined { + if (!isGlobalEvent(value)) { + return undefined + } + + const payload = value.payload + if (payload.type === "sync") { + return undefined + } + + return isEvent(payload) ? payload : undefined +} + +function isMatchingDisposeEvent(value: unknown, directory: string | undefined): boolean { + if (!directory || !isGlobalEvent(value)) { + return false + } + + if (value.directory !== directory) { + return false + } + + return value.payload.type === "server.instance.disposed" +} + function active(event: Event, sessionID: string): boolean { if (sid(event) !== sessionID) { return false @@ -371,7 +407,7 @@ function createLayer(input: StreamInput) { const events = yield* Scope.provide(scope)( Effect.acquireRelease( Effect.promise(() => - input.sdk.event.subscribe(undefined, { + input.sdk.global.event({ signal: abort.signal, }), ), @@ -397,7 +433,6 @@ function createLayer(input: StreamInput) { blockers: new Map(), } const recovering = new Set() - const currentSubagentState = () => { if (state.selectedSubagent && !state.subagent.tabs.has(state.selectedSubagent)) { state.selectedSubagent = undefined @@ -526,6 +561,38 @@ function createLayer(input: StreamInput) { Effect.orElseSucceed(() => []), ) + const bootstrapSubagentHistory = Effect.fn("RunStreamTransport.bootstrapSubagentHistory")( + function* (sessions: string[]) { + yield* Effect.forEach( + sessions, + (sessionID) => + messages(sessionID, SUBAGENT_CALL_BOOTSTRAP_LIMIT).pipe( + Effect.tap((messagesList) => + Effect.sync(() => { + if ( + !bootstrapSubagentCalls({ + data: state.subagent, + sessionID, + messages: messagesList, + thinking: input.thinking, + limits: input.limits(), + }) + ) { + return + } + + syncFooter([], undefined, currentSubagentState()) + }), + ), + ), + { + concurrency: 4, + discard: true, + }, + ) + }, + ) + const bootstrap = Effect.fn("RunStreamTransport.bootstrap")(function* () { const [messagesList, children, permissions, questions] = yield* Effect.all( [ @@ -566,33 +633,6 @@ function createLayer(input: StreamInput) { questions, }) - const sessions = [ - ...new Set( - listSubagentPermissions(state.subagent) - .filter((item) => item.tool && item.metadata?.input === undefined) - .map((item) => item.sessionID), - ), - ] - yield* Effect.forEach( - sessions, - (sessionID) => - messages(sessionID, SUBAGENT_CALL_BOOTSTRAP_LIMIT).pipe( - Effect.tap((messagesList) => - Effect.sync(() => { - bootstrapSubagentCalls({ - data: state.subagent, - sessionID, - messages: messagesList, - }) - }), - ), - ), - { - concurrency: "unbounded", - discard: true, - }, - ) - for (const request of [ ...state.data.permissions, ...listSubagentPermissions(state.subagent), @@ -605,6 +645,13 @@ function createLayer(input: StreamInput) { const snapshot = currentSubagentState() traceTabs(input.trace, [], snapshot.tabs) syncFooter([], undefined, snapshot) + + const sessions = [...state.subagent.tabs.keys()] + if (sessions.length === 0) { + return + } + + yield* bootstrapSubagentHistory(sessions).pipe(Effect.forkIn(scope, { startImmediately: true }), Effect.asVoid) }) const idle = Effect.fn("RunStreamTransport.idle")((fallback: boolean) => @@ -700,11 +747,22 @@ function createLayer(input: StreamInput) { return } - if (!isEvent(item)) { + if (isMatchingDisposeEvent(item, input.directory)) { + yield* fail(new Error("instance disposed")) + yield* closeScope() + return + } + + const event = globalPayloadEvent(item) + if (!event) { + return + } + + const sessionID = sid(event) + if (sessionID !== input.sessionID && (!sessionID || !state.subagent.tabs.has(sessionID))) { return } - const event = item input.trace?.write("recv.event", event) trackBlocker(event) @@ -754,7 +812,7 @@ function createLayer(input: StreamInput) { Effect.ensuring( Effect.gen(function* () { if (!abort.signal.aborted && !state.fault) { - yield* fail(new Error("session event stream closed")) + yield* fail(new Error("global event stream closed")) } closeStream() }), diff --git a/packages/opencode/src/cli/cmd/run/subagent-data.ts b/packages/opencode/src/cli/cmd/run/subagent-data.ts index e834ff74f0..e9dcd6538a 100644 --- a/packages/opencode/src/cli/cmd/run/subagent-data.ts +++ b/packages/opencode/src/cli/cmd/run/subagent-data.ts @@ -1,4 +1,4 @@ -import type { Event, Part, PermissionRequest, QuestionRequest, ToolPart } from "@opencode-ai/sdk/v2" +import type { Event, Message, Part, PermissionRequest, QuestionRequest, ToolPart } from "@opencode-ai/sdk/v2" import * as Locale from "@/util/locale" import { bootstrapSessionData, @@ -22,6 +22,10 @@ type SessionMessage = { parts: Part[] } +type BootstrapChildMessage = SessionMessage & { + info: Message +} + type Frame = { key: string commit: StreamCommit @@ -513,6 +517,70 @@ function applyChildEvent(input: { return changed || queueChanged(input.detail.data, before) } +function bootstrapChildEvent(input: { + detail: DetailState + event: Event + thinking: boolean + limits: Record +}) { + const out = reduceSessionData({ + data: input.detail.data, + event: input.event, + sessionID: input.detail.sessionID, + thinking: input.thinking, + limits: input.limits, + }) + + return appendCommits(input.detail, out.commits) +} + +function bootstrapChildMessages(input: { + detail: DetailState + messages: BootstrapChildMessage[] + thinking: boolean + limits: Record +}) { + let changed = false + + for (const message of input.messages) { + changed = + bootstrapChildEvent({ + detail: input.detail, + event: { + id: `bootstrap:message:${message.info.id}`, + type: "message.updated", + properties: { + sessionID: input.detail.sessionID, + info: message.info, + }, + }, + thinking: input.thinking, + limits: input.limits, + }) || changed + + for (const part of message.parts) { + changed = + bootstrapChildEvent({ + detail: input.detail, + event: { + id: `bootstrap:part:${part.id}`, + type: "message.part.updated", + properties: { + sessionID: input.detail.sessionID, + part, + time: 0, + }, + }, + thinking: input.thinking, + limits: input.limits, + }) || changed + } + } + + compactDetail(input.detail) + return changed +} + function knownSession(data: SubagentData, sessionID: string) { return data.tabs.has(sessionID) } @@ -634,7 +702,13 @@ export function bootstrapSubagentData(input: BootstrapSubagentInput) { return changed } -export function bootstrapSubagentCalls(input: { data: SubagentData; sessionID: string; messages: SessionMessage[] }) { +export function bootstrapSubagentCalls(input: { + data: SubagentData + sessionID: string + messages: BootstrapChildMessage[] + thinking: boolean + limits: Record +}) { if (!knownSession(input.data, input.sessionID) || input.messages.length === 0) { return false } @@ -648,9 +722,14 @@ export function bootstrapSubagentCalls(input: { data: SubagentData; sessionID: s permissions: detail.data.permissions, questions: detail.data.questions, }) - compactDetail(detail) + const changed = bootstrapChildMessages({ + detail, + messages: input.messages, + thinking: input.thinking, + limits: input.limits, + }) - return beforeCallCount !== detail.data.call.size || queueChanged(detail.data, before) + return changed || beforeCallCount !== detail.data.call.size || queueChanged(detail.data, before) } export function clearFinishedSubagents(data: SubagentData) { diff --git a/packages/opencode/test/cli/run/stream.transport.test.ts b/packages/opencode/test/cli/run/stream.transport.test.ts index dab5264bd6..9d832d1e34 100644 --- a/packages/opencode/test/cli/run/stream.transport.test.ts +++ b/packages/opencode/test/cli/run/stream.transport.test.ts @@ -1,9 +1,10 @@ import { afterEach, describe, expect, mock, spyOn, test } from "bun:test" -import { OpencodeClient } from "@opencode-ai/sdk/v2" +import { OpencodeClient, type GlobalEvent } from "@opencode-ai/sdk/v2" import { createSessionTransport } from "@/cli/cmd/run/stream.transport" import type { FooterApi, FooterEvent, RunFilePart, StreamCommit } from "@/cli/cmd/run/types" type EventStream = Awaited>["stream"] +type GlobalEventStream = Awaited>["stream"] type SdkEvent = EventStream extends AsyncGenerator ? T : never type SessionMessage = NonNullable>["data"]>[number] type SessionChild = NonNullable>["data"]>[number] @@ -81,12 +82,12 @@ function assistant(id: string) { } satisfies SdkEvent } -function feed() { - const list: SdkEvent[] = [] +function feed() { + const list: T[] = [] let done = false let wake: (() => void) | undefined - const stream: EventStream = (async function* () { + const wrapped = (async function* () { while (!done || list.length > 0) { if (list.length === 0) { await new Promise((resolve) => { @@ -105,8 +106,8 @@ function feed() { })() return { - stream, - push(value: SdkEvent) { + stream: wrapped, + push(value: T) { list.push(value) wake?.() wake = undefined @@ -119,6 +120,14 @@ function feed() { } } +function eventFeed() { + return feed() +} + +function globalFeed() { + return feed() +} + function emptyStream(): EventStream { return (async function* (): AsyncGenerator {})() } @@ -136,6 +145,18 @@ function sse(stream: EventStream) { return Promise.resolve({ stream }) } +function globalSse(stream: GlobalEventStream) { + return Promise.resolve({ stream }) +} + +function wrapGlobalStream(stream: EventStream): GlobalEventStream { + return (async function* () { + for await (const event of stream) { + yield globalEvent(event) + } + })() +} + function statusMap(busy: boolean): SessionStatusMap { if (busy) { return { "session-1": { type: "busy" } } @@ -235,10 +256,10 @@ function completedTool(input: { } } -function textPart(id: string, messageID: string, text: string): TextPart { +function textPart(id: string, messageID: string, text: string, sessionID = "session-1"): TextPart { return { id, - sessionID: "session-1", + sessionID, messageID, type: "text", text, @@ -298,6 +319,14 @@ function child(id: string): SessionChild { } } +function globalEvent(payload: GlobalEvent["payload"]): GlobalEvent { + return { + directory: "/tmp", + project: "project-1", + payload, + } +} + function footer(fn?: (commit: StreamCommit) => void) { const commits: StreamCommit[] = [] const events: FooterEvent[] = [] @@ -333,7 +362,9 @@ function footer(fn?: (commit: StreamCommit) => void) { function sdk( input: { stream?: EventStream + globalStream?: GlobalEventStream subscribe?: OpencodeClient["event"]["subscribe"] + globalEvent?: OpencodeClient["global"]["event"] promptAsync?: OpencodeClient["session"]["promptAsync"] status?: OpencodeClient["session"]["status"] messages?: OpencodeClient["session"]["messages"] @@ -345,6 +376,8 @@ function sdk( const client = new OpencodeClient() const subscribe: OpencodeClient["event"]["subscribe"] = input.subscribe ?? (() => sse(input.stream ?? emptyStream())) + const globalEvent: OpencodeClient["global"]["event"] = + input.globalEvent ?? (() => globalSse(input.globalStream ?? wrapGlobalStream(input.stream ?? emptyStream()))) const promptAsync: OpencodeClient["session"]["promptAsync"] = input.promptAsync ?? (() => ok(undefined)) const status: OpencodeClient["session"]["status"] = input.status ?? (() => ok({})) const messages: OpencodeClient["session"]["messages"] = input.messages ?? (() => ok([])) @@ -353,6 +386,7 @@ function sdk( const questions: OpencodeClient["question"]["list"] = input.questions ?? (() => ok([])) spyOn(client.event, "subscribe").mockImplementation(subscribe) + spyOn(client.global, "event").mockImplementation(globalEvent) spyOn(client.session, "promptAsync").mockImplementation(promptAsync) spyOn(client.session, "status").mockImplementation(status) spyOn(client.session, "messages").mockImplementation(messages) @@ -365,7 +399,7 @@ function sdk( describe("run stream transport", () => { test("bootstraps child tabs and resumed blocker input", async () => { - const src = feed() + const src = eventFeed() const ui = footer() const transport = await createSessionTransport({ sdk: sdk({ @@ -440,61 +474,67 @@ describe("run stream transport", () => { }) try { - expect(ui.events).toContainEqual({ - type: "stream.subagent", - state: { - tabs: [ - expect.objectContaining({ - sessionID: "child-1", - label: "Explore", - description: "Explore run folder", - status: "running", - }), - ], - details: {}, - permissions: [ - expect.objectContaining({ - id: "perm-1", - sessionID: "child-1", - metadata: { - input: { - filePath: "src/run/subagent-data.ts", - diff: "@@ -1 +1 @@", - }, - }, - }), - ], - questions: [], - }, + const boot = await waitFor(() => { + const item = ui.events.findLast((event) => event.type === "stream.subagent") + const state = item?.type === "stream.subagent" ? item.state : undefined + return state?.tabs.some((tab) => tab.sessionID === "child-1") && state.permissions.some((req) => req.id === "perm-1") + ? state + : undefined }) + expect(boot.tabs).toEqual([ + expect.objectContaining({ + sessionID: "child-1", + label: "Explore", + description: "Explore run folder", + status: "running", + }), + ]) + expect(boot.permissions).toEqual([ + expect.objectContaining({ + id: "perm-1", + sessionID: "child-1", + metadata: { + input: { + filePath: "src/run/subagent-data.ts", + diff: "@@ -1 +1 @@", + }, + }, + }), + ]) + transport.selectSubagent("child-1") - expect(ui.events).toContainEqual({ - type: "stream.subagent", - state: { - tabs: [ + const selected = await waitFor(() => { + const item = ui.events.findLast((event) => event.type === "stream.subagent") + const state = item?.type === "stream.subagent" ? item.state : undefined + const detail = state?.details["child-1"] + return detail?.commits.some((commit) => commit.kind === "tool" && commit.tool === "edit" && commit.phase === "start") + ? state + : undefined + }) + + expect(selected.details).toEqual({ + "child-1": { + sessionID: "child-1", + commits: [ expect.objectContaining({ - sessionID: "child-1", - label: "Explore", + kind: "tool", + tool: "edit", + phase: "start", }), ], - details: { - "child-1": { - sessionID: "child-1", - commits: [], - }, - }, - permissions: [ - expect.objectContaining({ - id: "perm-1", - }), - ], - questions: [], }, }) - expect(ui.events).toContainEqual({ + expect( + await waitFor(() => { + const item = ui.events.findLast((event) => event.type === "stream.view") + return item?.type === "stream.view" && item.view.type === "permission" && item.view.request.id === "perm-1" + ? item + : undefined + }), + ).toEqual({ type: "stream.view", view: { type: "permission", @@ -515,8 +555,265 @@ describe("run stream transport", () => { } }) + test("bootstraps child session output before selection", async () => { + const ui = footer() + const transport = await createSessionTransport({ + sdk: sdk({ + messages: async ({ sessionID }) => { + if (sessionID === "session-1") { + return ok([ + assistantMessage({ + sessionID: "session-1", + id: "msg-1", + parts: [ + completedTool({ + sessionID: "session-1", + messageID: "msg-1", + id: "task-1", + callID: "call-1", + tool: "task", + body: { + description: "Explore run.ts", + subagent_type: "explore", + }, + metadata: { + sessionId: "child-1", + }, + }), + ], + }), + ]) + } + + return sessionID === "child-1" + ? ok([ + assistantMessage({ + sessionID: "child-1", + id: "msg-child-1", + parts: [textPart("txt-child-1", "msg-child-1", "subagent summary", "child-1")], + }), + ]) + : ok([]) + }, + children: async () => ok([child("child-1")]), + }), + sessionID: "session-1", + thinking: true, + limits: () => ({}), + footer: ui.api, + }) + + try { + await waitFor(() => { + const item = ui.events.findLast((event) => event.type === "stream.subagent") + return item?.type === "stream.subagent" && item.state.tabs.some((tab) => tab.sessionID === "child-1") + ? item + : undefined + }) + + transport.selectSubagent("child-1") + + expect( + await waitFor(() => { + const item = ui.events.findLast((event) => event.type === "stream.subagent") + const detail = item?.type === "stream.subagent" ? item.state.details["child-1"] : undefined + return detail?.commits.some((commit) => commit.kind === "assistant" && commit.text === "subagent summary") + ? detail + : undefined + }), + ).toEqual({ + sessionID: "child-1", + commits: [ + expect.objectContaining({ + kind: "assistant", + text: "subagent summary", + }), + ], + }) + } finally { + await transport.close() + } + }) + + test("does not block startup on child history bootstrap", async () => { + const pending = defer>>>() + const ui = footer() + let transport: Awaited> | undefined + + const task = createSessionTransport({ + sdk: sdk({ + messages: async ({ sessionID }) => { + if (sessionID === "session-1") { + return ok([ + assistantMessage({ + sessionID: "session-1", + id: "msg-1", + parts: [ + runningTool({ + sessionID: "session-1", + messageID: "msg-1", + id: "task-1", + callID: "call-1", + tool: "task", + body: { + description: "Explore run.ts", + subagent_type: "explore", + }, + metadata: { + sessionId: "child-1", + }, + }), + ], + }), + ]) + } + + if (sessionID === "child-1") { + return pending.promise + } + + return ok([]) + }, + children: async () => ok([child("child-1")]), + }), + sessionID: "session-1", + thinking: true, + limits: () => ({}), + footer: ui.api, + }).then((item) => { + transport = item + return item + }) + + try { + const state = await waitFor(() => { + const item = ui.events.findLast((event) => event.type === "stream.subagent") + return item?.type === "stream.subagent" && item.state.tabs.some((tab) => tab.sessionID === "child-1") + ? item.state + : undefined + }) + + await waitFor(() => transport) + + expect(state).toEqual({ + tabs: [expect.objectContaining({ sessionID: "child-1", status: "running" })], + details: {}, + permissions: [], + questions: [], + }) + } finally { + pending.resolve(ok([])) + await task + await transport?.close() + } + }) + + test("streams selected subagent output from global events while it is running", async () => { + const global = globalFeed() + const ui = footer() + const transport = await createSessionTransport({ + sdk: sdk({ + globalStream: global.stream, + }), + sessionID: "session-1", + thinking: true, + limits: () => ({}), + footer: ui.api, + }) + + try { + global.push(globalEvent(assistant("msg-1"))) + global.push( + globalEvent( + toolUpdated( + runningTool({ + sessionID: "session-1", + messageID: "msg-1", + id: "task-1", + callID: "call-1", + tool: "task", + body: { + description: "Explore run.ts", + subagent_type: "explore", + }, + metadata: { + sessionId: "child-1", + }, + }), + ), + ), + ) + + await waitFor(() => { + const item = ui.events.findLast((event) => event.type === "stream.subagent") + return item?.type === "stream.subagent" && item.state.tabs.some((tab) => tab.sessionID === "child-1") + ? item + : undefined + }) + + transport.selectSubagent("child-1") + + global.push( + globalEvent({ + id: "evt-child-message", + type: "message.updated", + properties: { + sessionID: "child-1", + info: assistantMessage({ + sessionID: "child-1", + id: "msg-child-1", + parts: [], + }).info, + }, + }), + ) + global.push(globalEvent(textUpdated(textPart("txt-child-1", "msg-child-1", "hello", "child-1")))) + + expect( + await waitFor(() => { + const item = ui.events.findLast((event) => event.type === "stream.subagent") + const detail = item?.type === "stream.subagent" ? item.state.details["child-1"] : undefined + return detail?.commits.some((commit) => commit.kind === "assistant" && commit.text === "hello") + ? detail + : undefined + }), + ).toEqual({ + sessionID: "child-1", + commits: [ + expect.objectContaining({ + kind: "assistant", + text: "hello", + }), + ], + }) + + global.push(globalEvent(textUpdated(textPart("txt-child-1", "msg-child-1", "hello world", "child-1")))) + + expect( + await waitFor(() => { + const item = ui.events.findLast((event) => event.type === "stream.subagent") + const detail = item?.type === "stream.subagent" ? item.state.details["child-1"] : undefined + return detail?.commits.some((commit) => commit.kind === "assistant" && commit.text === "hello world") + ? detail + : undefined + }, 2_000), + ).toEqual({ + sessionID: "child-1", + commits: [ + expect.objectContaining({ + kind: "assistant", + text: "hello world", + }), + ], + }) + } finally { + global.close() + await transport.close() + } + }) + test("recovers pending questions from question.list when question.asked is missed", async () => { - const src = feed() + const src = eventFeed() const ui = footer() let questionCalls = 0 const request = { @@ -639,7 +936,7 @@ describe("run stream transport", () => { }) test("does not resurrect questions if question.list resolves after tool completion", async () => { - const src = feed() + const src = eventFeed() const ui = footer() const started = defer() const request = { @@ -736,6 +1033,12 @@ describe("run stream transport", () => { }), ), ) + await waitFor(() => { + const commit = ui.commits.findLast( + (item) => item.kind === "tool" && item.partID === "question-race-tool-1" && item.toolState === "completed", + ) + return commit ? true : undefined + }) pending.resolve(ok([request])) await Bun.sleep(50) @@ -756,7 +1059,7 @@ describe("run stream transport", () => { }) test("respects the includeFiles flag when building prompt payloads", async () => { - const src = feed() + const src = eventFeed() const ui = footer() const seen: unknown[] = [] const file: RunFilePart = { @@ -818,7 +1121,7 @@ describe("run stream transport", () => { }) test("falls back to session status polling when idle events are missing", async () => { - const src = feed() + const src = eventFeed() const ui = footer() let busy = true const transport = await createSessionTransport({ @@ -858,7 +1161,7 @@ describe("run stream transport", () => { }) test("flushes interrupted output when the active turn aborts", async () => { - const src = feed() + const src = eventFeed() const seen = defer() const ui = footer((commit) => { if (commit.kind === "assistant" && commit.phase === "progress") { @@ -927,7 +1230,7 @@ describe("run stream transport", () => { }) test("closes an active turn without rejecting it", async () => { - const src = feed() + const src = eventFeed() const ui = footer() const ready = defer() let aborted = false @@ -982,11 +1285,11 @@ describe("run stream transport", () => { const transport = await createSessionTransport({ sdk: sdk({ - subscribe: () => - sse( - (async function* (): AsyncGenerator { + globalEvent: () => + globalSse( + (async function* (): AsyncGenerator { await ready.promise - yield busy() + yield globalEvent(busy()) throw new Error("boom") })(), ), @@ -1018,8 +1321,56 @@ describe("run stream transport", () => { } }) + test("rejects the active turn when the backing instance is disposed", async () => { + const ui = footer() + const ready = defer() + + const transport = await createSessionTransport({ + sdk: sdk({ + globalEvent: () => + globalSse( + (async function* (): AsyncGenerator { + await ready.promise + yield globalEvent({ + id: "evt-disposed", + type: "server.instance.disposed", + properties: { + directory: "/tmp", + }, + }) + })(), + ), + promptAsync: async () => { + ready.resolve() + return ok(undefined) + }, + status: async () => ok({}), + }), + directory: "/tmp", + sessionID: "session-1", + thinking: true, + limits: () => ({}), + footer: ui.api, + }) + + try { + await expect( + transport.runPromptTurn({ + agent: undefined, + model: undefined, + variant: undefined, + prompt: { text: "hello", parts: [] }, + files: [], + includeFiles: false, + }), + ).rejects.toThrow("instance disposed") + } finally { + await transport.close() + } + }) + test("rejects concurrent turns", async () => { - const src = feed() + const src = eventFeed() const ui = footer() const transport = await createSessionTransport({ sdk: sdk({ diff --git a/packages/opencode/test/cli/run/subagent-data.test.ts b/packages/opencode/test/cli/run/subagent-data.test.ts index 8d53c5485d..8d2dad365b 100644 --- a/packages/opencode/test/cli/run/subagent-data.test.ts +++ b/packages/opencode/test/cli/run/subagent-data.test.ts @@ -2,6 +2,7 @@ import { describe, expect, test } from "bun:test" import type { Event } from "@opencode-ai/sdk/v2" import { entryBody } from "@/cli/cmd/run/entry.body" import { + bootstrapSubagentCalls, bootstrapSubagentData, clearFinishedSubagents, createSubagentData, @@ -10,6 +11,7 @@ import { } from "@/cli/cmd/run/subagent-data" type SessionMessage = Parameters[0]["messages"][number] +type ChildMessage = Parameters[0]["messages"][number] function visible(commits: Array[0]>) { return commits.flatMap((item) => { @@ -120,6 +122,65 @@ function question(id: string, sessionID: string) { } } +function childMessage(input: { + messageID: string + sessionID: string + role: "user" | "assistant" + parts: ChildMessage["parts"] +}) { + if (input.role === "user") { + return { + info: { + id: input.messageID, + sessionID: input.sessionID, + role: "user", + time: { + created: 1, + }, + agent: "test", + model: { + providerID: "openai", + modelID: "gpt-5", + }, + }, + parts: input.parts, + } satisfies ChildMessage + } + + return { + info: { + id: input.messageID, + sessionID: input.sessionID, + role: "assistant", + time: { + created: 2, + completed: 3, + }, + parentID: "msg-user-1", + providerID: "openai", + modelID: "gpt-5", + mode: "default", + agent: "explore", + path: { + cwd: "/tmp", + root: "/tmp", + }, + cost: 0, + tokens: { + input: 1, + output: 1, + reasoning: 0, + cache: { + read: 0, + write: 0, + }, + }, + finish: "stop", + }, + parts: input.parts, + } satisfies ChildMessage +} + describe("run subagent data", () => { test("bootstraps tabs and child blockers from parent task parts", () => { const data = createSubagentData() @@ -309,6 +370,73 @@ describe("run subagent data", () => { expect(snapshot.questions).toEqual([]) }) + test("replays bootstrapped child session messages into inspector commits", () => { + const data = createSubagentData() + + bootstrapSubagentData({ + data, + messages: [taskMessage("child-1", "completed")], + children: [{ id: "child-1" }], + permissions: [], + questions: [], + }) + + expect( + bootstrapSubagentCalls({ + data, + sessionID: "child-1", + messages: [ + childMessage({ + messageID: "msg-user-1", + sessionID: "child-1", + role: "user", + parts: [ + { + id: "txt-user-1", + messageID: "msg-user-1", + sessionID: "child-1", + type: "text", + text: "Inspect footer tabs", + time: { start: 1, end: 1 }, + }, + ], + }), + childMessage({ + messageID: "msg-assistant-1", + sessionID: "child-1", + role: "assistant", + parts: [ + { + id: "reason-1", + messageID: "msg-assistant-1", + sessionID: "child-1", + type: "reasoning", + text: "planning next steps", + time: { start: 2, end: 2 }, + }, + { + id: "txt-1", + messageID: "msg-assistant-1", + sessionID: "child-1", + type: "text", + text: "hello world", + time: { start: 2, end: 3 }, + }, + ], + }), + ], + thinking: true, + limits: {}, + }), + ).toBe(true) + + expect(visible(snapshotSubagentData(data).details["child-1"]?.commits ?? [])).toEqual([ + "› Inspect footer tabs", + "_Thinking:_ planning next steps", + "hello world", + ]) + }) + test("clears finished tabs on the next parent prompt", () => { const data = createSubagentData()