tui: refactor event streaming to use SDK instead of manual RPC subscription

This commit is contained in:
Dax Raad
2026-01-11 18:53:34 -05:00
parent 0c0057a7de
commit 3a30773874
3 changed files with 125 additions and 29 deletions

View File

@@ -4,8 +4,71 @@
- AVOID unnecessary destructuring of variables. instead of doing `const { a, b }
= obj` just reference it as obj.a and obj.b. this preserves context
- AVOID `try`/`catch` where possible
- AVOID `else` statements
- AVOID using `any` type
- AVOID `let` statements
- PREFER single word variable names where possible
- Use as many bun apis as possible like Bun.file()
# Avoid let statements
we don't like let statements, especially combined with if/else statements.
prefer const
This is bad:
Good:
```ts
const foo = condition ? 1 : 2
```
Bad:
```ts
let foo
if (condition) foo = 1
else foo = 2
```
# Avoid else statements
Prefer early returns or even using `iife` to avoid else statements
Good:
```ts
function foo() {
if (condition) return 1
return 2
}
```
Bad:
```ts
function foo() {
if (condition) return 1
else return 2
}
```
# Prefer single word naming
Try your best to find a single word name for your variables, functions, etc.
Only use multiple words if you cannot.
Good:
```ts
const foo = 1
const bar = 2
const baz = 3
```
Bad:
```ts
const fooBar = 1
const barBaz = 2
const bazFoo = 3
```

View File

@@ -34,15 +34,9 @@ function createWorkerFetch(client: RpcClient): typeof fetch {
return fn as typeof fetch
}
function createEventSource(client: RpcClient, directory: string): EventSource {
function createEventSource(client: RpcClient): EventSource {
return {
on: (handler) =>
client.on<Event>("event", (event) => {
handler(event)
if (event.type === "server.instance.disposed") {
client.call("subscribe", { directory }).catch(() => {})
}
}),
on: (handler) => client.on<Event>("event", handler),
}
}
@@ -131,9 +125,6 @@ export const TuiThreadCommand = cmd({
networkOpts.port !== 0 ||
networkOpts.hostname !== "127.0.0.1"
// Subscribe to events from worker
await client.call("subscribe", { directory: cwd })
let url: string
let customFetch: typeof fetch | undefined
let events: EventSource | undefined
@@ -146,7 +137,7 @@ export const TuiThreadCommand = cmd({
// Use direct RPC communication (no HTTP)
url = "http://opencode.internal"
customFetch = createWorkerFetch(client)
events = createEventSource(client, cwd)
events = createEventSource(client)
}
const tuiPromise = tui({

View File

@@ -6,8 +6,8 @@ import { InstanceBootstrap } from "@/project/bootstrap"
import { Rpc } from "@/util/rpc"
import { upgrade } from "@/cli/upgrade"
import { Config } from "@/config/config"
import { Bus } from "@/bus"
import { GlobalBus } from "@/bus/global"
import { createOpencodeClient, type Event } from "@opencode-ai/sdk/v2"
import type { BunWebSocketData } from "hono/bun"
await Log.init({
@@ -38,6 +38,61 @@ GlobalBus.on("event", (event) => {
let server: Bun.Server<BunWebSocketData> | undefined
const eventStream = {
abort: undefined as AbortController | undefined,
}
const startEventStream = (directory: string) => {
if (eventStream.abort) eventStream.abort.abort()
const abort = new AbortController()
eventStream.abort = abort
const signal = abort.signal
const fetchFn = (async (input: RequestInfo | URL, init?: RequestInit) => {
const request = new Request(input, init)
return Server.App().fetch(request)
}) as typeof globalThis.fetch
const sdk = createOpencodeClient({
baseUrl: "http://opencode.internal",
directory,
fetch: fetchFn,
signal,
})
;(async () => {
while (!signal.aborted) {
const events = await Promise.resolve(
sdk.event.subscribe(
{},
{
signal,
},
),
).catch(() => undefined)
if (!events) {
await Bun.sleep(250)
continue
}
for await (const event of events.stream) {
Rpc.emit("event", event as Event)
}
if (!signal.aborted) {
await Bun.sleep(250)
}
}
})().catch((error) => {
Log.Default.error("event stream error", {
error: error instanceof Error ? error.message : error,
})
})
}
startEventStream(process.cwd())
export const rpc = {
async fetch(input: { url: string; method: string; headers: Record<string, string>; body?: string }) {
const request = new Request(input.url, {
@@ -58,20 +113,6 @@ export const rpc = {
server = Server.listen(input)
return { url: server.url.toString() }
},
async subscribe(input: { directory: string }) {
return Instance.provide({
directory: input.directory,
init: InstanceBootstrap,
fn: async () => {
Bus.subscribeAll((event) => {
Rpc.emit("event", event)
})
// Emit connected event
Rpc.emit("event", { type: "server.connected", properties: {} })
return { subscribed: true }
},
})
},
async checkUpgrade(input: { directory: string }) {
await Instance.provide({
directory: input.directory,
@@ -87,6 +128,7 @@ export const rpc = {
},
async shutdown() {
Log.Default.info("worker shutting down")
if (eventStream.abort) eventStream.abort.abort()
await Instance.disposeAll()
if (server) server.stop(true)
},