From 67ad2e3ae545fe7066aff4c240625c13c8893e72 Mon Sep 17 00:00:00 2001 From: Dax Raad Date: Fri, 9 Jan 2026 16:31:28 -0500 Subject: [PATCH] sync --- packages/opencode/src/cli/cmd/tui/app.tsx | 18 ++++- .../opencode/src/cli/cmd/tui/context/sdk.tsx | 77 ++++++++++++------- packages/opencode/src/cli/cmd/tui/thread.ts | 63 ++++++++++++++- packages/opencode/src/cli/cmd/tui/worker.ts | 57 ++++++++++---- packages/opencode/src/server/server.ts | 4 + packages/opencode/src/util/rpc.ts | 24 ++++++ 6 files changed, 194 insertions(+), 49 deletions(-) diff --git a/packages/opencode/src/cli/cmd/tui/app.tsx b/packages/opencode/src/cli/cmd/tui/app.tsx index 50f4f1ae5c..aa62c6c58e 100644 --- a/packages/opencode/src/cli/cmd/tui/app.tsx +++ b/packages/opencode/src/cli/cmd/tui/app.tsx @@ -97,7 +97,16 @@ async function getTerminalBackgroundColor(): Promise<"dark" | "light"> { }) } -export function tui(input: { url: string; args: Args; directory?: string; onExit?: () => Promise }) { +import type { EventSource } from "./context/sdk" + +export function tui(input: { + url: string + args: Args + directory?: string + fetch?: typeof fetch + events?: EventSource + onExit?: () => Promise +}) { // promise to prevent immediate exit return new Promise(async (resolve) => { const mode = await getTerminalBackgroundColor() @@ -117,7 +126,12 @@ export function tui(input: { url: string; args: Args; directory?: string; onExit - + diff --git a/packages/opencode/src/cli/cmd/tui/context/sdk.tsx b/packages/opencode/src/cli/cmd/tui/context/sdk.tsx index 4bcabaeaf5..3339e7b00d 100644 --- a/packages/opencode/src/cli/cmd/tui/context/sdk.tsx +++ b/packages/opencode/src/cli/cmd/tui/context/sdk.tsx @@ -3,21 +3,66 @@ import { createSimpleContext } from "./helper" import { createGlobalEmitter } from "@solid-primitives/event-bus" import { batch, onCleanup, onMount } from "solid-js" +export type EventSource = { + on: (handler: (event: Event) => void) => () => void +} + export const { use: useSDK, provider: SDKProvider } = createSimpleContext({ name: "SDK", - init: (props: { url: string; directory?: string }) => { + init: (props: { url: string; directory?: string; fetch?: typeof fetch; events?: EventSource }) => { const abort = new AbortController() const sdk = createOpencodeClient({ baseUrl: props.url, signal: abort.signal, directory: props.directory, + fetch: props.fetch, }) const emitter = createGlobalEmitter<{ [key in Event["type"]]: Extract }>() + let queue: Event[] = [] + let timer: Timer | undefined + let last = 0 + + const flush = () => { + if (queue.length === 0) return + const events = queue + queue = [] + timer = undefined + last = Date.now() + // Batch all event emissions so all store updates result in a single render + batch(() => { + for (const event of events) { + emitter.emit(event.type, event) + } + }) + } + + const handleEvent = (event: Event) => { + queue.push(event) + const elapsed = Date.now() - last + + if (timer) return + // If we just flushed recently (within 16ms), batch this with future events + // Otherwise, process immediately to avoid latency + if (elapsed < 16) { + timer = setTimeout(flush, 16) + return + } + flush() + } + onMount(async () => { + // If an event source is provided, use it instead of SSE + if (props.events) { + const unsub = props.events.on(handleEvent) + onCleanup(unsub) + return + } + + // Fall back to SSE while (true) { if (abort.signal.aborted) break const events = await sdk.event.subscribe( @@ -26,36 +71,9 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({ signal: abort.signal, }, ) - let queue: Event[] = [] - let timer: Timer | undefined - let last = 0 - - const flush = () => { - if (queue.length === 0) return - const events = queue - queue = [] - timer = undefined - last = Date.now() - // Batch all event emissions so all store updates result in a single render - batch(() => { - for (const event of events) { - emitter.emit(event.type, event) - } - }) - } for await (const event of events.stream) { - queue.push(event) - const elapsed = Date.now() - last - - if (timer) continue - // If we just flushed recently (within 16ms), batch this with future events - // Otherwise, process immediately to avoid latency - if (elapsed < 16) { - timer = setTimeout(flush, 16) - continue - } - flush() + handleEvent(event) } // Flush any remaining events @@ -68,6 +86,7 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({ onCleanup(() => { abort.abort() + if (timer) clearTimeout(timer) }) return { client: sdk, event: emitter, url: props.url } diff --git a/packages/opencode/src/cli/cmd/tui/thread.ts b/packages/opencode/src/cli/cmd/tui/thread.ts index 7b6f028fcd..b9654d3391 100644 --- a/packages/opencode/src/cli/cmd/tui/thread.ts +++ b/packages/opencode/src/cli/cmd/tui/thread.ts @@ -7,11 +7,39 @@ import { UI } from "@/cli/ui" import { iife } from "@/util/iife" import { Log } from "@/util/log" import { withNetworkOptions, resolveNetworkOptions } from "@/cli/network" +import type { Event } from "@opencode-ai/sdk/v2" +import type { EventSource } from "./context/sdk" declare global { const OPENCODE_WORKER_PATH: string } +type RpcClient = ReturnType> + +function createWorkerFetch(client: RpcClient): typeof fetch { + const fn = async (input: RequestInfo | URL, init?: RequestInit): Promise => { + const request = new Request(input, init) + const body = request.body ? await request.text() : undefined + const result = await client.call("fetch", { + url: request.url, + method: request.method, + headers: Object.fromEntries(request.headers.entries()), + body, + }) + return new Response(result.body, { + status: result.status, + headers: result.headers, + }) + } + return fn as typeof fetch +} + +function createEventSource(client: RpcClient): EventSource { + return { + on: (handler) => client.on("event", handler), + } +} + export const TuiThreadCommand = cmd({ command: "$0 [project]", describe: "start opencode tui", @@ -80,16 +108,45 @@ export const TuiThreadCommand = cmd({ process.on("SIGUSR2", async () => { await client.call("reload", undefined) }) - const opts = await resolveNetworkOptions(args) - const server = await client.call("server", opts) + const prompt = await iife(async () => { const piped = !process.stdin.isTTY ? await Bun.stdin.text() : undefined if (!args.prompt) return piped return piped ? piped + "\n" + args.prompt : args.prompt }) + // Check if server should be started (port or hostname explicitly set in CLI or config) + const networkOpts = await resolveNetworkOptions(args) + const shouldStartServer = + process.argv.includes("--port") || + process.argv.includes("--hostname") || + process.argv.includes("--mdns") || + networkOpts.mdns || + networkOpts.port !== 0 || + networkOpts.hostname !== "127.0.0.1" + + // Subscribe to events from worker + await client.call("subscribe", { directory: cwd }) + + let url: string + let customFetch: typeof fetch | undefined + let events: EventSource | undefined + + if (shouldStartServer) { + // Start HTTP server for external access + const server = await client.call("server", networkOpts) + url = server.url + } else { + // Use direct RPC communication (no HTTP) + url = "http://opencode.internal" + customFetch = createWorkerFetch(client) + events = createEventSource(client) + } + const tuiPromise = tui({ - url: server.url, + url, + fetch: customFetch, + events, args: { continue: args.continue, sessionID: args.session, diff --git a/packages/opencode/src/cli/cmd/tui/worker.ts b/packages/opencode/src/cli/cmd/tui/worker.ts index d32612dd59..ea88e45f1d 100644 --- a/packages/opencode/src/cli/cmd/tui/worker.ts +++ b/packages/opencode/src/cli/cmd/tui/worker.ts @@ -5,8 +5,10 @@ import { Instance } from "@/project/instance" import { InstanceBootstrap } from "@/project/bootstrap" import { Rpc } from "@/util/rpc" import { upgrade } from "@/cli/upgrade" -import type { BunWebSocketData } from "hono/bun" import { Config } from "@/config/config" +import { Bus } from "@/bus" +import { GlobalBus } from "@/bus/global" +import type { BunWebSocketData } from "hono/bun" await Log.init({ print: process.argv.includes("--print-logs"), @@ -29,20 +31,47 @@ process.on("uncaughtException", (e) => { }) }) -let server: Bun.Server +// Subscribe to global events and forward them via RPC +GlobalBus.on("event", (event) => { + Rpc.emit("global.event", event) +}) + +let server: Bun.Server | undefined + export const rpc = { - async server(input: { port: number; hostname: string; mdns?: boolean }) { - if (server) await server.stop(true) - try { - server = Server.listen(input) - return { - url: server.url.toString(), - } - } catch (e) { - console.error(e) - throw e + async fetch(input: { url: string; method: string; headers: Record; body?: string }) { + const request = new Request(input.url, { + method: input.method, + headers: input.headers, + body: input.body, + }) + const response = await Server.App().fetch(request) + const body = await response.text() + return { + status: response.status, + headers: Object.fromEntries(response.headers.entries()), + body, } }, + async server(input: { port: number; hostname: string; mdns?: boolean; cors?: string[] }) { + if (server) await server.stop(true) + server = Server.listen(input) + return { url: server.url.toString() } + }, + async subscribe(input: { directory: string }) { + return Instance.provide({ + directory: input.directory, + init: InstanceBootstrap, + fn: async () => { + Bus.subscribeAll((event) => { + Rpc.emit("event", event) + }) + // Emit connected event + Rpc.emit("event", { type: "server.connected", properties: {} }) + return { subscribed: true } + }, + }) + }, async checkUpgrade(input: { directory: string }) { await Instance.provide({ directory: input.directory, @@ -59,9 +88,7 @@ export const rpc = { async shutdown() { Log.Default.info("worker shutting down") await Instance.disposeAll() - // TODO: this should be awaited, but ws connections are - // causing this to hang, need to revisit this - server.stop(true) + if (server) server.stop(true) }, } diff --git a/packages/opencode/src/server/server.ts b/packages/opencode/src/server/server.ts index c7baec778c..f2545a7473 100644 --- a/packages/opencode/src/server/server.ts +++ b/packages/opencode/src/server/server.ts @@ -2826,6 +2826,10 @@ export namespace Server { host: "app.opencode.ai", }, }) + response.headers.set( + "Content-Security-Policy", + "default-src 'self'; script-src 'self'; style-src 'self' 'unsafe-inline'; img-src 'self' data: https:; font-src 'self' data:; connect-src 'self'", + ) return response }) as unknown as Hono, ) diff --git a/packages/opencode/src/util/rpc.ts b/packages/opencode/src/util/rpc.ts index 57c695c480..ebd8be40e4 100644 --- a/packages/opencode/src/util/rpc.ts +++ b/packages/opencode/src/util/rpc.ts @@ -13,11 +13,16 @@ export namespace Rpc { } } + export function emit(event: string, data: unknown) { + postMessage(JSON.stringify({ type: "rpc.event", event, data })) + } + export function client(target: { postMessage: (data: string) => void | null onmessage: ((this: Worker, ev: MessageEvent) => any) | null }) { const pending = new Map void>() + const listeners = new Map void>>() let id = 0 target.onmessage = async (evt) => { const parsed = JSON.parse(evt.data) @@ -28,6 +33,14 @@ export namespace Rpc { pending.delete(parsed.id) } } + if (parsed.type === "rpc.event") { + const handlers = listeners.get(parsed.event) + if (handlers) { + for (const handler of handlers) { + handler(parsed.data) + } + } + } } return { call(method: Method, input: Parameters[0]): Promise> { @@ -37,6 +50,17 @@ export namespace Rpc { target.postMessage(JSON.stringify({ type: "rpc.request", method, input, id: requestId })) }) }, + on(event: string, handler: (data: Data) => void) { + let handlers = listeners.get(event) + if (!handlers) { + handlers = new Set() + listeners.set(event, handlers) + } + handlers.add(handler) + return () => { + handlers!.delete(handler) + } + }, } } }