This commit is contained in:
Dax Raad
2026-01-09 16:31:28 -05:00
parent 7df36cf0fa
commit 67ad2e3ae5
6 changed files with 194 additions and 49 deletions

View File

@@ -97,7 +97,16 @@ async function getTerminalBackgroundColor(): Promise<"dark" | "light"> {
})
}
export function tui(input: { url: string; args: Args; directory?: string; onExit?: () => Promise<void> }) {
import type { EventSource } from "./context/sdk"
export function tui(input: {
url: string
args: Args
directory?: string
fetch?: typeof fetch
events?: EventSource
onExit?: () => Promise<void>
}) {
// promise to prevent immediate exit
return new Promise<void>(async (resolve) => {
const mode = await getTerminalBackgroundColor()
@@ -117,7 +126,12 @@ export function tui(input: { url: string; args: Args; directory?: string; onExit
<KVProvider>
<ToastProvider>
<RouteProvider>
<SDKProvider url={input.url} directory={input.directory}>
<SDKProvider
url={input.url}
directory={input.directory}
fetch={input.fetch}
events={input.events}
>
<SyncProvider>
<ThemeProvider mode={mode}>
<LocalProvider>

View File

@@ -3,21 +3,66 @@ import { createSimpleContext } from "./helper"
import { createGlobalEmitter } from "@solid-primitives/event-bus"
import { batch, onCleanup, onMount } from "solid-js"
export type EventSource = {
on: (handler: (event: Event) => void) => () => void
}
export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
name: "SDK",
init: (props: { url: string; directory?: string }) => {
init: (props: { url: string; directory?: string; fetch?: typeof fetch; events?: EventSource }) => {
const abort = new AbortController()
const sdk = createOpencodeClient({
baseUrl: props.url,
signal: abort.signal,
directory: props.directory,
fetch: props.fetch,
})
const emitter = createGlobalEmitter<{
[key in Event["type"]]: Extract<Event, { type: key }>
}>()
let queue: Event[] = []
let timer: Timer | undefined
let last = 0
const flush = () => {
if (queue.length === 0) return
const events = queue
queue = []
timer = undefined
last = Date.now()
// Batch all event emissions so all store updates result in a single render
batch(() => {
for (const event of events) {
emitter.emit(event.type, event)
}
})
}
const handleEvent = (event: Event) => {
queue.push(event)
const elapsed = Date.now() - last
if (timer) return
// If we just flushed recently (within 16ms), batch this with future events
// Otherwise, process immediately to avoid latency
if (elapsed < 16) {
timer = setTimeout(flush, 16)
return
}
flush()
}
onMount(async () => {
// If an event source is provided, use it instead of SSE
if (props.events) {
const unsub = props.events.on(handleEvent)
onCleanup(unsub)
return
}
// Fall back to SSE
while (true) {
if (abort.signal.aborted) break
const events = await sdk.event.subscribe(
@@ -26,36 +71,9 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
signal: abort.signal,
},
)
let queue: Event[] = []
let timer: Timer | undefined
let last = 0
const flush = () => {
if (queue.length === 0) return
const events = queue
queue = []
timer = undefined
last = Date.now()
// Batch all event emissions so all store updates result in a single render
batch(() => {
for (const event of events) {
emitter.emit(event.type, event)
}
})
}
for await (const event of events.stream) {
queue.push(event)
const elapsed = Date.now() - last
if (timer) continue
// If we just flushed recently (within 16ms), batch this with future events
// Otherwise, process immediately to avoid latency
if (elapsed < 16) {
timer = setTimeout(flush, 16)
continue
}
flush()
handleEvent(event)
}
// Flush any remaining events
@@ -68,6 +86,7 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
onCleanup(() => {
abort.abort()
if (timer) clearTimeout(timer)
})
return { client: sdk, event: emitter, url: props.url }

View File

@@ -7,11 +7,39 @@ import { UI } from "@/cli/ui"
import { iife } from "@/util/iife"
import { Log } from "@/util/log"
import { withNetworkOptions, resolveNetworkOptions } from "@/cli/network"
import type { Event } from "@opencode-ai/sdk/v2"
import type { EventSource } from "./context/sdk"
declare global {
const OPENCODE_WORKER_PATH: string
}
type RpcClient = ReturnType<typeof Rpc.client<typeof rpc>>
function createWorkerFetch(client: RpcClient): typeof fetch {
const fn = async (input: RequestInfo | URL, init?: RequestInit): Promise<Response> => {
const request = new Request(input, init)
const body = request.body ? await request.text() : undefined
const result = await client.call("fetch", {
url: request.url,
method: request.method,
headers: Object.fromEntries(request.headers.entries()),
body,
})
return new Response(result.body, {
status: result.status,
headers: result.headers,
})
}
return fn as typeof fetch
}
function createEventSource(client: RpcClient): EventSource {
return {
on: (handler) => client.on<Event>("event", handler),
}
}
export const TuiThreadCommand = cmd({
command: "$0 [project]",
describe: "start opencode tui",
@@ -80,16 +108,45 @@ export const TuiThreadCommand = cmd({
process.on("SIGUSR2", async () => {
await client.call("reload", undefined)
})
const opts = await resolveNetworkOptions(args)
const server = await client.call("server", opts)
const prompt = await iife(async () => {
const piped = !process.stdin.isTTY ? await Bun.stdin.text() : undefined
if (!args.prompt) return piped
return piped ? piped + "\n" + args.prompt : args.prompt
})
// Check if server should be started (port or hostname explicitly set in CLI or config)
const networkOpts = await resolveNetworkOptions(args)
const shouldStartServer =
process.argv.includes("--port") ||
process.argv.includes("--hostname") ||
process.argv.includes("--mdns") ||
networkOpts.mdns ||
networkOpts.port !== 0 ||
networkOpts.hostname !== "127.0.0.1"
// Subscribe to events from worker
await client.call("subscribe", { directory: cwd })
let url: string
let customFetch: typeof fetch | undefined
let events: EventSource | undefined
if (shouldStartServer) {
// Start HTTP server for external access
const server = await client.call("server", networkOpts)
url = server.url
} else {
// Use direct RPC communication (no HTTP)
url = "http://opencode.internal"
customFetch = createWorkerFetch(client)
events = createEventSource(client)
}
const tuiPromise = tui({
url: server.url,
url,
fetch: customFetch,
events,
args: {
continue: args.continue,
sessionID: args.session,

View File

@@ -5,8 +5,10 @@ import { Instance } from "@/project/instance"
import { InstanceBootstrap } from "@/project/bootstrap"
import { Rpc } from "@/util/rpc"
import { upgrade } from "@/cli/upgrade"
import type { BunWebSocketData } from "hono/bun"
import { Config } from "@/config/config"
import { Bus } from "@/bus"
import { GlobalBus } from "@/bus/global"
import type { BunWebSocketData } from "hono/bun"
await Log.init({
print: process.argv.includes("--print-logs"),
@@ -29,20 +31,47 @@ process.on("uncaughtException", (e) => {
})
})
let server: Bun.Server<BunWebSocketData>
// Subscribe to global events and forward them via RPC
GlobalBus.on("event", (event) => {
Rpc.emit("global.event", event)
})
let server: Bun.Server<BunWebSocketData> | undefined
export const rpc = {
async server(input: { port: number; hostname: string; mdns?: boolean }) {
if (server) await server.stop(true)
try {
server = Server.listen(input)
return {
url: server.url.toString(),
}
} catch (e) {
console.error(e)
throw e
async fetch(input: { url: string; method: string; headers: Record<string, string>; body?: string }) {
const request = new Request(input.url, {
method: input.method,
headers: input.headers,
body: input.body,
})
const response = await Server.App().fetch(request)
const body = await response.text()
return {
status: response.status,
headers: Object.fromEntries(response.headers.entries()),
body,
}
},
async server(input: { port: number; hostname: string; mdns?: boolean; cors?: string[] }) {
if (server) await server.stop(true)
server = Server.listen(input)
return { url: server.url.toString() }
},
async subscribe(input: { directory: string }) {
return Instance.provide({
directory: input.directory,
init: InstanceBootstrap,
fn: async () => {
Bus.subscribeAll((event) => {
Rpc.emit("event", event)
})
// Emit connected event
Rpc.emit("event", { type: "server.connected", properties: {} })
return { subscribed: true }
},
})
},
async checkUpgrade(input: { directory: string }) {
await Instance.provide({
directory: input.directory,
@@ -59,9 +88,7 @@ export const rpc = {
async shutdown() {
Log.Default.info("worker shutting down")
await Instance.disposeAll()
// TODO: this should be awaited, but ws connections are
// causing this to hang, need to revisit this
server.stop(true)
if (server) server.stop(true)
},
}

View File

@@ -2826,6 +2826,10 @@ export namespace Server {
host: "app.opencode.ai",
},
})
response.headers.set(
"Content-Security-Policy",
"default-src 'self'; script-src 'self'; style-src 'self' 'unsafe-inline'; img-src 'self' data: https:; font-src 'self' data:; connect-src 'self'",
)
return response
}) as unknown as Hono,
)

View File

@@ -13,11 +13,16 @@ export namespace Rpc {
}
}
export function emit(event: string, data: unknown) {
postMessage(JSON.stringify({ type: "rpc.event", event, data }))
}
export function client<T extends Definition>(target: {
postMessage: (data: string) => void | null
onmessage: ((this: Worker, ev: MessageEvent<any>) => any) | null
}) {
const pending = new Map<number, (result: any) => void>()
const listeners = new Map<string, Set<(data: any) => void>>()
let id = 0
target.onmessage = async (evt) => {
const parsed = JSON.parse(evt.data)
@@ -28,6 +33,14 @@ export namespace Rpc {
pending.delete(parsed.id)
}
}
if (parsed.type === "rpc.event") {
const handlers = listeners.get(parsed.event)
if (handlers) {
for (const handler of handlers) {
handler(parsed.data)
}
}
}
}
return {
call<Method extends keyof T>(method: Method, input: Parameters<T[Method]>[0]): Promise<ReturnType<T[Method]>> {
@@ -37,6 +50,17 @@ export namespace Rpc {
target.postMessage(JSON.stringify({ type: "rpc.request", method, input, id: requestId }))
})
},
on<Data>(event: string, handler: (data: Data) => void) {
let handlers = listeners.get(event)
if (!handlers) {
handlers = new Set()
listeners.set(event, handlers)
}
handlers.add(handler)
return () => {
handlers!.delete(handler)
}
},
}
}
}