cli/run: switch to global event stream (#26383)

This commit is contained in:
Simon Klee
2026-05-08 19:44:31 +02:00
committed by GitHub
parent 3052a79b32
commit 4d43d584fe
5 changed files with 723 additions and 106 deletions

View File

@@ -1,9 +1,10 @@
import { afterEach, describe, expect, mock, spyOn, test } from "bun:test"
import { OpencodeClient } from "@opencode-ai/sdk/v2"
import { OpencodeClient, type GlobalEvent } from "@opencode-ai/sdk/v2"
import { createSessionTransport } from "@/cli/cmd/run/stream.transport"
import type { FooterApi, FooterEvent, RunFilePart, StreamCommit } from "@/cli/cmd/run/types"
type EventStream = Awaited<ReturnType<OpencodeClient["event"]["subscribe"]>>["stream"]
type GlobalEventStream = Awaited<ReturnType<OpencodeClient["global"]["event"]>>["stream"]
type SdkEvent = EventStream extends AsyncGenerator<infer T, unknown, unknown> ? T : never
type SessionMessage = NonNullable<Awaited<ReturnType<OpencodeClient["session"]["messages"]>>["data"]>[number]
type SessionChild = NonNullable<Awaited<ReturnType<OpencodeClient["session"]["children"]>>["data"]>[number]
@@ -81,12 +82,12 @@ function assistant(id: string) {
} satisfies SdkEvent
}
function feed() {
const list: SdkEvent[] = []
function feed<T>() {
const list: T[] = []
let done = false
let wake: (() => void) | undefined
const stream: EventStream = (async function* () {
const wrapped = (async function* () {
while (!done || list.length > 0) {
if (list.length === 0) {
await new Promise<void>((resolve) => {
@@ -105,8 +106,8 @@ function feed() {
})()
return {
stream,
push(value: SdkEvent) {
stream: wrapped,
push(value: T) {
list.push(value)
wake?.()
wake = undefined
@@ -119,6 +120,14 @@ function feed() {
}
}
function eventFeed() {
return feed<SdkEvent>()
}
function globalFeed() {
return feed<GlobalEvent>()
}
function emptyStream(): EventStream {
return (async function* (): AsyncGenerator<SdkEvent> {})()
}
@@ -136,6 +145,18 @@ function sse(stream: EventStream) {
return Promise.resolve({ stream })
}
function globalSse(stream: GlobalEventStream) {
return Promise.resolve({ stream })
}
function wrapGlobalStream(stream: EventStream): GlobalEventStream {
return (async function* () {
for await (const event of stream) {
yield globalEvent(event)
}
})()
}
function statusMap(busy: boolean): SessionStatusMap {
if (busy) {
return { "session-1": { type: "busy" } }
@@ -235,10 +256,10 @@ function completedTool(input: {
}
}
function textPart(id: string, messageID: string, text: string): TextPart {
function textPart(id: string, messageID: string, text: string, sessionID = "session-1"): TextPart {
return {
id,
sessionID: "session-1",
sessionID,
messageID,
type: "text",
text,
@@ -298,6 +319,14 @@ function child(id: string): SessionChild {
}
}
function globalEvent(payload: GlobalEvent["payload"]): GlobalEvent {
return {
directory: "/tmp",
project: "project-1",
payload,
}
}
function footer(fn?: (commit: StreamCommit) => void) {
const commits: StreamCommit[] = []
const events: FooterEvent[] = []
@@ -333,7 +362,9 @@ function footer(fn?: (commit: StreamCommit) => void) {
function sdk(
input: {
stream?: EventStream
globalStream?: GlobalEventStream
subscribe?: OpencodeClient["event"]["subscribe"]
globalEvent?: OpencodeClient["global"]["event"]
promptAsync?: OpencodeClient["session"]["promptAsync"]
status?: OpencodeClient["session"]["status"]
messages?: OpencodeClient["session"]["messages"]
@@ -345,6 +376,8 @@ function sdk(
const client = new OpencodeClient()
const subscribe: OpencodeClient["event"]["subscribe"] = input.subscribe ?? (() => sse(input.stream ?? emptyStream()))
const globalEvent: OpencodeClient["global"]["event"] =
input.globalEvent ?? (() => globalSse(input.globalStream ?? wrapGlobalStream(input.stream ?? emptyStream())))
const promptAsync: OpencodeClient["session"]["promptAsync"] = input.promptAsync ?? (() => ok(undefined))
const status: OpencodeClient["session"]["status"] = input.status ?? (() => ok({}))
const messages: OpencodeClient["session"]["messages"] = input.messages ?? (() => ok([]))
@@ -353,6 +386,7 @@ function sdk(
const questions: OpencodeClient["question"]["list"] = input.questions ?? (() => ok([]))
spyOn(client.event, "subscribe").mockImplementation(subscribe)
spyOn(client.global, "event").mockImplementation(globalEvent)
spyOn(client.session, "promptAsync").mockImplementation(promptAsync)
spyOn(client.session, "status").mockImplementation(status)
spyOn(client.session, "messages").mockImplementation(messages)
@@ -365,7 +399,7 @@ function sdk(
describe("run stream transport", () => {
test("bootstraps child tabs and resumed blocker input", async () => {
const src = feed()
const src = eventFeed()
const ui = footer()
const transport = await createSessionTransport({
sdk: sdk({
@@ -440,61 +474,67 @@ describe("run stream transport", () => {
})
try {
expect(ui.events).toContainEqual({
type: "stream.subagent",
state: {
tabs: [
expect.objectContaining({
sessionID: "child-1",
label: "Explore",
description: "Explore run folder",
status: "running",
}),
],
details: {},
permissions: [
expect.objectContaining({
id: "perm-1",
sessionID: "child-1",
metadata: {
input: {
filePath: "src/run/subagent-data.ts",
diff: "@@ -1 +1 @@",
},
},
}),
],
questions: [],
},
const boot = await waitFor(() => {
const item = ui.events.findLast((event) => event.type === "stream.subagent")
const state = item?.type === "stream.subagent" ? item.state : undefined
return state?.tabs.some((tab) => tab.sessionID === "child-1") && state.permissions.some((req) => req.id === "perm-1")
? state
: undefined
})
expect(boot.tabs).toEqual([
expect.objectContaining({
sessionID: "child-1",
label: "Explore",
description: "Explore run folder",
status: "running",
}),
])
expect(boot.permissions).toEqual([
expect.objectContaining({
id: "perm-1",
sessionID: "child-1",
metadata: {
input: {
filePath: "src/run/subagent-data.ts",
diff: "@@ -1 +1 @@",
},
},
}),
])
transport.selectSubagent("child-1")
expect(ui.events).toContainEqual({
type: "stream.subagent",
state: {
tabs: [
const selected = await waitFor(() => {
const item = ui.events.findLast((event) => event.type === "stream.subagent")
const state = item?.type === "stream.subagent" ? item.state : undefined
const detail = state?.details["child-1"]
return detail?.commits.some((commit) => commit.kind === "tool" && commit.tool === "edit" && commit.phase === "start")
? state
: undefined
})
expect(selected.details).toEqual({
"child-1": {
sessionID: "child-1",
commits: [
expect.objectContaining({
sessionID: "child-1",
label: "Explore",
kind: "tool",
tool: "edit",
phase: "start",
}),
],
details: {
"child-1": {
sessionID: "child-1",
commits: [],
},
},
permissions: [
expect.objectContaining({
id: "perm-1",
}),
],
questions: [],
},
})
expect(ui.events).toContainEqual({
expect(
await waitFor(() => {
const item = ui.events.findLast((event) => event.type === "stream.view")
return item?.type === "stream.view" && item.view.type === "permission" && item.view.request.id === "perm-1"
? item
: undefined
}),
).toEqual({
type: "stream.view",
view: {
type: "permission",
@@ -515,8 +555,265 @@ describe("run stream transport", () => {
}
})
test("bootstraps child session output before selection", async () => {
const ui = footer()
const transport = await createSessionTransport({
sdk: sdk({
messages: async ({ sessionID }) => {
if (sessionID === "session-1") {
return ok([
assistantMessage({
sessionID: "session-1",
id: "msg-1",
parts: [
completedTool({
sessionID: "session-1",
messageID: "msg-1",
id: "task-1",
callID: "call-1",
tool: "task",
body: {
description: "Explore run.ts",
subagent_type: "explore",
},
metadata: {
sessionId: "child-1",
},
}),
],
}),
])
}
return sessionID === "child-1"
? ok([
assistantMessage({
sessionID: "child-1",
id: "msg-child-1",
parts: [textPart("txt-child-1", "msg-child-1", "subagent summary", "child-1")],
}),
])
: ok([])
},
children: async () => ok([child("child-1")]),
}),
sessionID: "session-1",
thinking: true,
limits: () => ({}),
footer: ui.api,
})
try {
await waitFor(() => {
const item = ui.events.findLast((event) => event.type === "stream.subagent")
return item?.type === "stream.subagent" && item.state.tabs.some((tab) => tab.sessionID === "child-1")
? item
: undefined
})
transport.selectSubagent("child-1")
expect(
await waitFor(() => {
const item = ui.events.findLast((event) => event.type === "stream.subagent")
const detail = item?.type === "stream.subagent" ? item.state.details["child-1"] : undefined
return detail?.commits.some((commit) => commit.kind === "assistant" && commit.text === "subagent summary")
? detail
: undefined
}),
).toEqual({
sessionID: "child-1",
commits: [
expect.objectContaining({
kind: "assistant",
text: "subagent summary",
}),
],
})
} finally {
await transport.close()
}
})
test("does not block startup on child history bootstrap", async () => {
const pending = defer<Awaited<ReturnType<typeof ok<SessionMessage[]>>>>()
const ui = footer()
let transport: Awaited<ReturnType<typeof createSessionTransport>> | undefined
const task = createSessionTransport({
sdk: sdk({
messages: async ({ sessionID }) => {
if (sessionID === "session-1") {
return ok([
assistantMessage({
sessionID: "session-1",
id: "msg-1",
parts: [
runningTool({
sessionID: "session-1",
messageID: "msg-1",
id: "task-1",
callID: "call-1",
tool: "task",
body: {
description: "Explore run.ts",
subagent_type: "explore",
},
metadata: {
sessionId: "child-1",
},
}),
],
}),
])
}
if (sessionID === "child-1") {
return pending.promise
}
return ok([])
},
children: async () => ok([child("child-1")]),
}),
sessionID: "session-1",
thinking: true,
limits: () => ({}),
footer: ui.api,
}).then((item) => {
transport = item
return item
})
try {
const state = await waitFor(() => {
const item = ui.events.findLast((event) => event.type === "stream.subagent")
return item?.type === "stream.subagent" && item.state.tabs.some((tab) => tab.sessionID === "child-1")
? item.state
: undefined
})
await waitFor(() => transport)
expect(state).toEqual({
tabs: [expect.objectContaining({ sessionID: "child-1", status: "running" })],
details: {},
permissions: [],
questions: [],
})
} finally {
pending.resolve(ok([]))
await task
await transport?.close()
}
})
test("streams selected subagent output from global events while it is running", async () => {
const global = globalFeed()
const ui = footer()
const transport = await createSessionTransport({
sdk: sdk({
globalStream: global.stream,
}),
sessionID: "session-1",
thinking: true,
limits: () => ({}),
footer: ui.api,
})
try {
global.push(globalEvent(assistant("msg-1")))
global.push(
globalEvent(
toolUpdated(
runningTool({
sessionID: "session-1",
messageID: "msg-1",
id: "task-1",
callID: "call-1",
tool: "task",
body: {
description: "Explore run.ts",
subagent_type: "explore",
},
metadata: {
sessionId: "child-1",
},
}),
),
),
)
await waitFor(() => {
const item = ui.events.findLast((event) => event.type === "stream.subagent")
return item?.type === "stream.subagent" && item.state.tabs.some((tab) => tab.sessionID === "child-1")
? item
: undefined
})
transport.selectSubagent("child-1")
global.push(
globalEvent({
id: "evt-child-message",
type: "message.updated",
properties: {
sessionID: "child-1",
info: assistantMessage({
sessionID: "child-1",
id: "msg-child-1",
parts: [],
}).info,
},
}),
)
global.push(globalEvent(textUpdated(textPart("txt-child-1", "msg-child-1", "hello", "child-1"))))
expect(
await waitFor(() => {
const item = ui.events.findLast((event) => event.type === "stream.subagent")
const detail = item?.type === "stream.subagent" ? item.state.details["child-1"] : undefined
return detail?.commits.some((commit) => commit.kind === "assistant" && commit.text === "hello")
? detail
: undefined
}),
).toEqual({
sessionID: "child-1",
commits: [
expect.objectContaining({
kind: "assistant",
text: "hello",
}),
],
})
global.push(globalEvent(textUpdated(textPart("txt-child-1", "msg-child-1", "hello world", "child-1"))))
expect(
await waitFor(() => {
const item = ui.events.findLast((event) => event.type === "stream.subagent")
const detail = item?.type === "stream.subagent" ? item.state.details["child-1"] : undefined
return detail?.commits.some((commit) => commit.kind === "assistant" && commit.text === "hello world")
? detail
: undefined
}, 2_000),
).toEqual({
sessionID: "child-1",
commits: [
expect.objectContaining({
kind: "assistant",
text: "hello world",
}),
],
})
} finally {
global.close()
await transport.close()
}
})
test("recovers pending questions from question.list when question.asked is missed", async () => {
const src = feed()
const src = eventFeed()
const ui = footer()
let questionCalls = 0
const request = {
@@ -639,7 +936,7 @@ describe("run stream transport", () => {
})
test("does not resurrect questions if question.list resolves after tool completion", async () => {
const src = feed()
const src = eventFeed()
const ui = footer()
const started = defer()
const request = {
@@ -736,6 +1033,12 @@ describe("run stream transport", () => {
}),
),
)
await waitFor(() => {
const commit = ui.commits.findLast(
(item) => item.kind === "tool" && item.partID === "question-race-tool-1" && item.toolState === "completed",
)
return commit ? true : undefined
})
pending.resolve(ok([request]))
await Bun.sleep(50)
@@ -756,7 +1059,7 @@ describe("run stream transport", () => {
})
test("respects the includeFiles flag when building prompt payloads", async () => {
const src = feed()
const src = eventFeed()
const ui = footer()
const seen: unknown[] = []
const file: RunFilePart = {
@@ -818,7 +1121,7 @@ describe("run stream transport", () => {
})
test("falls back to session status polling when idle events are missing", async () => {
const src = feed()
const src = eventFeed()
const ui = footer()
let busy = true
const transport = await createSessionTransport({
@@ -858,7 +1161,7 @@ describe("run stream transport", () => {
})
test("flushes interrupted output when the active turn aborts", async () => {
const src = feed()
const src = eventFeed()
const seen = defer()
const ui = footer((commit) => {
if (commit.kind === "assistant" && commit.phase === "progress") {
@@ -927,7 +1230,7 @@ describe("run stream transport", () => {
})
test("closes an active turn without rejecting it", async () => {
const src = feed()
const src = eventFeed()
const ui = footer()
const ready = defer()
let aborted = false
@@ -982,11 +1285,11 @@ describe("run stream transport", () => {
const transport = await createSessionTransport({
sdk: sdk({
subscribe: () =>
sse(
(async function* (): AsyncGenerator<SdkEvent> {
globalEvent: () =>
globalSse(
(async function* (): AsyncGenerator<GlobalEvent> {
await ready.promise
yield busy()
yield globalEvent(busy())
throw new Error("boom")
})(),
),
@@ -1018,8 +1321,56 @@ describe("run stream transport", () => {
}
})
test("rejects the active turn when the backing instance is disposed", async () => {
const ui = footer()
const ready = defer()
const transport = await createSessionTransport({
sdk: sdk({
globalEvent: () =>
globalSse(
(async function* (): AsyncGenerator<GlobalEvent> {
await ready.promise
yield globalEvent({
id: "evt-disposed",
type: "server.instance.disposed",
properties: {
directory: "/tmp",
},
})
})(),
),
promptAsync: async () => {
ready.resolve()
return ok(undefined)
},
status: async () => ok({}),
}),
directory: "/tmp",
sessionID: "session-1",
thinking: true,
limits: () => ({}),
footer: ui.api,
})
try {
await expect(
transport.runPromptTurn({
agent: undefined,
model: undefined,
variant: undefined,
prompt: { text: "hello", parts: [] },
files: [],
includeFiles: false,
}),
).rejects.toThrow("instance disposed")
} finally {
await transport.close()
}
})
test("rejects concurrent turns", async () => {
const src = feed()
const src = eventFeed()
const ui = footer()
const transport = await createSessionTransport({
sdk: sdk({

View File

@@ -2,6 +2,7 @@ import { describe, expect, test } from "bun:test"
import type { Event } from "@opencode-ai/sdk/v2"
import { entryBody } from "@/cli/cmd/run/entry.body"
import {
bootstrapSubagentCalls,
bootstrapSubagentData,
clearFinishedSubagents,
createSubagentData,
@@ -10,6 +11,7 @@ import {
} from "@/cli/cmd/run/subagent-data"
type SessionMessage = Parameters<typeof bootstrapSubagentData>[0]["messages"][number]
type ChildMessage = Parameters<typeof bootstrapSubagentCalls>[0]["messages"][number]
function visible(commits: Array<Parameters<typeof entryBody>[0]>) {
return commits.flatMap((item) => {
@@ -120,6 +122,65 @@ function question(id: string, sessionID: string) {
}
}
function childMessage(input: {
messageID: string
sessionID: string
role: "user" | "assistant"
parts: ChildMessage["parts"]
}) {
if (input.role === "user") {
return {
info: {
id: input.messageID,
sessionID: input.sessionID,
role: "user",
time: {
created: 1,
},
agent: "test",
model: {
providerID: "openai",
modelID: "gpt-5",
},
},
parts: input.parts,
} satisfies ChildMessage
}
return {
info: {
id: input.messageID,
sessionID: input.sessionID,
role: "assistant",
time: {
created: 2,
completed: 3,
},
parentID: "msg-user-1",
providerID: "openai",
modelID: "gpt-5",
mode: "default",
agent: "explore",
path: {
cwd: "/tmp",
root: "/tmp",
},
cost: 0,
tokens: {
input: 1,
output: 1,
reasoning: 0,
cache: {
read: 0,
write: 0,
},
},
finish: "stop",
},
parts: input.parts,
} satisfies ChildMessage
}
describe("run subagent data", () => {
test("bootstraps tabs and child blockers from parent task parts", () => {
const data = createSubagentData()
@@ -309,6 +370,73 @@ describe("run subagent data", () => {
expect(snapshot.questions).toEqual([])
})
test("replays bootstrapped child session messages into inspector commits", () => {
const data = createSubagentData()
bootstrapSubagentData({
data,
messages: [taskMessage("child-1", "completed")],
children: [{ id: "child-1" }],
permissions: [],
questions: [],
})
expect(
bootstrapSubagentCalls({
data,
sessionID: "child-1",
messages: [
childMessage({
messageID: "msg-user-1",
sessionID: "child-1",
role: "user",
parts: [
{
id: "txt-user-1",
messageID: "msg-user-1",
sessionID: "child-1",
type: "text",
text: "Inspect footer tabs",
time: { start: 1, end: 1 },
},
],
}),
childMessage({
messageID: "msg-assistant-1",
sessionID: "child-1",
role: "assistant",
parts: [
{
id: "reason-1",
messageID: "msg-assistant-1",
sessionID: "child-1",
type: "reasoning",
text: "planning next steps",
time: { start: 2, end: 2 },
},
{
id: "txt-1",
messageID: "msg-assistant-1",
sessionID: "child-1",
type: "text",
text: "hello world",
time: { start: 2, end: 3 },
},
],
}),
],
thinking: true,
limits: {},
}),
).toBe(true)
expect(visible(snapshotSubagentData(data).details["child-1"]?.commits ?? [])).toEqual([
" Inspect footer tabs",
"_Thinking:_ planning next steps",
"hello world",
])
})
test("clears finished tabs on the next parent prompt", () => {
const data = createSubagentData()