Compare commits

...

1 Commits

Author SHA1 Message Date
Dax Raad
9fee5bf807 tui: bypass local SSE event streaming in worker
Replace SDK event subscription with direct Bus.subscribeAll call.
This avoids the local HTTP roundtrip and SSE parsing overhead since
the worker runs in the same process as the server.

- Subscribe directly to Bus events instead of using SDK
- Wrap subscription in WorkspaceContext to match server behavior
- Handle reconnections when instance is disposed
- Properly handle abort signals for cleanup
2026-03-25 19:47:17 -04:00

View File

@@ -6,11 +6,14 @@ import { InstanceBootstrap } from "@/project/bootstrap"
import { Rpc } from "@/util/rpc"
import { upgrade } from "@/cli/upgrade"
import { Config } from "@/config/config"
import { Bus } from "@/bus"
import { GlobalBus } from "@/bus/global"
import { createOpencodeClient, type Event } from "@opencode-ai/sdk/v2"
import type { Event } from "@opencode-ai/sdk/v2"
import { Flag } from "@/flag/flag"
import { setTimeout as sleep } from "node:timers/promises"
import { writeHeapSnapshot } from "node:v8"
import { WorkspaceContext } from "@/control-plane/workspace-context"
import { WorkspaceID } from "@/control-plane/schema"
await Log.init({
print: process.argv.includes("--print-logs"),
@@ -50,39 +53,55 @@ const startEventStream = (input: { directory: string; workspaceID?: string }) =>
eventStream.abort = abort
const signal = abort.signal
const fetchFn = (async (input: RequestInfo | URL, init?: RequestInit) => {
const request = new Request(input, init)
const auth = getAuthorizationHeader()
if (auth) request.headers.set("Authorization", auth)
return Server.Default().fetch(request)
}) as typeof globalThis.fetch
const sdk = createOpencodeClient({
baseUrl: "http://opencode.internal",
directory: input.directory,
experimental_workspaceID: input.workspaceID,
fetch: fetchFn,
signal,
})
const workspaceID = input.workspaceID ? WorkspaceID.make(input.workspaceID) : undefined
;(async () => {
while (!signal.aborted) {
const events = await Promise.resolve(
sdk.event.subscribe(
{},
{
signal,
},
),
).catch(() => undefined)
const shouldReconnect = await WorkspaceContext.provide({
workspaceID,
fn: () =>
Instance.provide({
directory: input.directory,
init: InstanceBootstrap,
fn: () =>
new Promise<boolean>((resolve) => {
Rpc.emit("event", {
type: "server.connected",
properties: {},
} satisfies Event)
if (!events) {
await sleep(250)
continue
}
let settled = false
const settle = (value: boolean) => {
if (settled) return
settled = true
signal.removeEventListener("abort", onAbort)
unsub()
resolve(value)
}
for await (const event of events.stream) {
Rpc.emit("event", event as Event)
const unsub = Bus.subscribeAll((event) => {
Rpc.emit("event", event as Event)
if (event.type === Bus.InstanceDisposed.type) {
settle(true)
}
})
const onAbort = () => {
settle(false)
}
signal.addEventListener("abort", onAbort, { once: true })
}),
}),
}).catch((error) => {
Log.Default.error("event stream subscribe error", {
error: error instanceof Error ? error.message : error,
})
return false
})
if (!shouldReconnect || signal.aborted) {
break
}
if (!signal.aborted) {