mirror of
https://github.com/anomalyco/opencode.git
synced 2026-04-25 07:15:19 +00:00
196 lines
5.2 KiB
TypeScript
196 lines
5.2 KiB
TypeScript
import { Installation } from "@/installation"
|
|
import { Server } from "@/server/server"
|
|
import { Log } from "@/util/log"
|
|
import { Instance } from "@/project/instance"
|
|
import { InstanceBootstrap } from "@/project/bootstrap"
|
|
import { Rpc } from "@/util/rpc"
|
|
import { upgrade } from "@/cli/upgrade"
|
|
import { Config } from "@/config/config"
|
|
import { Bus } from "@/bus"
|
|
import { GlobalBus } from "@/bus/global"
|
|
import type { Event } from "@opencode-ai/sdk/v2"
|
|
import { Flag } from "@/flag/flag"
|
|
import { setTimeout as sleep } from "node:timers/promises"
|
|
import { writeHeapSnapshot } from "node:v8"
|
|
import { WorkspaceID } from "@/control-plane/schema"
|
|
import { Heap } from "@/cli/heap"
|
|
|
|
await Log.init({
|
|
print: process.argv.includes("--print-logs"),
|
|
dev: Installation.isLocal(),
|
|
level: (() => {
|
|
if (Installation.isLocal()) return "DEBUG"
|
|
return "INFO"
|
|
})(),
|
|
})
|
|
|
|
Heap.start()
|
|
|
|
process.on("unhandledRejection", (e) => {
|
|
Log.Default.error("rejection", {
|
|
e: e instanceof Error ? e.message : e,
|
|
})
|
|
})
|
|
|
|
process.on("uncaughtException", (e) => {
|
|
Log.Default.error("exception", {
|
|
e: e instanceof Error ? e.message : e,
|
|
})
|
|
})
|
|
|
|
// Subscribe to global events and forward them via RPC
|
|
GlobalBus.on("event", (event) => {
|
|
Rpc.emit("global.event", event)
|
|
})
|
|
|
|
let server: Awaited<ReturnType<typeof Server.listen>> | undefined
|
|
|
|
const eventStreams = new Map<string, AbortController>()
|
|
|
|
function startEventStream(directory: string) {
|
|
const id = crypto.randomUUID()
|
|
|
|
const abort = new AbortController()
|
|
const signal = abort.signal
|
|
|
|
eventStreams.set(id, abort)
|
|
|
|
async function run() {
|
|
while (!signal.aborted) {
|
|
const shouldReconnect = await Instance.provide({
|
|
directory,
|
|
init: InstanceBootstrap,
|
|
fn: () =>
|
|
new Promise<boolean>((resolve) => {
|
|
Rpc.emit("event", {
|
|
type: "server.connected",
|
|
properties: {},
|
|
} satisfies Event)
|
|
|
|
let settled = false
|
|
const settle = (value: boolean) => {
|
|
if (settled) return
|
|
settled = true
|
|
signal.removeEventListener("abort", onAbort)
|
|
unsub()
|
|
resolve(value)
|
|
}
|
|
|
|
const unsub = Bus.subscribeAll((event) => {
|
|
Rpc.emit("event", {
|
|
id,
|
|
event: event as Event,
|
|
})
|
|
if (event.type === Bus.InstanceDisposed.type) {
|
|
settle(true)
|
|
}
|
|
})
|
|
|
|
const onAbort = () => {
|
|
settle(false)
|
|
}
|
|
|
|
signal.addEventListener("abort", onAbort, { once: true })
|
|
}),
|
|
}).catch((error) => {
|
|
Log.Default.error("event stream subscribe error", {
|
|
error: error instanceof Error ? error.message : error,
|
|
})
|
|
return false
|
|
})
|
|
|
|
if (!shouldReconnect || signal.aborted) {
|
|
break
|
|
}
|
|
|
|
if (!signal.aborted) {
|
|
await sleep(250)
|
|
}
|
|
}
|
|
}
|
|
|
|
run().catch((error) => {
|
|
Log.Default.error("event stream error", {
|
|
error: error instanceof Error ? error.message : error,
|
|
})
|
|
})
|
|
|
|
return id
|
|
}
|
|
|
|
function stopEventStream(id: string) {
|
|
const abortController = eventStreams.get(id)
|
|
if (!abortController) return
|
|
|
|
abortController.abort()
|
|
eventStreams.delete(id)
|
|
}
|
|
|
|
export const rpc = {
|
|
async fetch(input: { url: string; method: string; headers: Record<string, string>; body?: string }) {
|
|
const headers = { ...input.headers }
|
|
const auth = getAuthorizationHeader()
|
|
if (auth && !headers["authorization"] && !headers["Authorization"]) {
|
|
headers["Authorization"] = auth
|
|
}
|
|
const request = new Request(input.url, {
|
|
method: input.method,
|
|
headers,
|
|
body: input.body,
|
|
})
|
|
const response = await Server.Default().app.fetch(request)
|
|
const body = await response.text()
|
|
return {
|
|
status: response.status,
|
|
headers: Object.fromEntries(response.headers.entries()),
|
|
body,
|
|
}
|
|
},
|
|
snapshot() {
|
|
const result = writeHeapSnapshot("server.heapsnapshot")
|
|
return result
|
|
},
|
|
async server(input: { port: number; hostname: string; mdns?: boolean; cors?: string[] }) {
|
|
if (server) await server.stop(true)
|
|
server = await Server.listen(input)
|
|
return { url: server.url.toString() }
|
|
},
|
|
async checkUpgrade(input: { directory: string }) {
|
|
await Instance.provide({
|
|
directory: input.directory,
|
|
init: InstanceBootstrap,
|
|
fn: async () => {
|
|
await upgrade().catch(() => {})
|
|
},
|
|
})
|
|
},
|
|
async reload() {
|
|
await Config.invalidate(true)
|
|
},
|
|
async subscribe(input: { directory: string | undefined }) {
|
|
return startEventStream(input.directory || process.cwd())
|
|
},
|
|
async unsubscribe(input: { id: string }) {
|
|
stopEventStream(input.id)
|
|
},
|
|
async shutdown() {
|
|
Log.Default.info("worker shutting down")
|
|
|
|
for (const id of [...eventStreams.keys()]) {
|
|
stopEventStream(id)
|
|
}
|
|
|
|
await Instance.disposeAll()
|
|
if (server) await server.stop(true)
|
|
},
|
|
}
|
|
|
|
Rpc.listen(rpc)
|
|
|
|
function getAuthorizationHeader(): string | undefined {
|
|
const password = Flag.OPENCODE_SERVER_PASSWORD
|
|
if (!password) return undefined
|
|
const username = Flag.OPENCODE_SERVER_USERNAME ?? "opencode"
|
|
return `Basic ${btoa(`${username}:${password}`)}`
|
|
}
|