mirror of
https://github.com/anomalyco/opencode.git
synced 2026-04-03 04:25:10 +00:00
Compare commits
4 Commits
beta
...
kit/dev-me
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6e68a927f2 | ||
|
|
4b45a3d368 | ||
|
|
946d2eecbe | ||
|
|
76b90e3fb1 |
@@ -16,6 +16,7 @@ import { win32DisableProcessedInput, win32InstallCtrlCGuard } from "./win32"
|
||||
import { TuiConfig } from "@/config/tui"
|
||||
import { Instance } from "@/project/instance"
|
||||
import { writeHeapSnapshot } from "v8"
|
||||
import { Memory } from "@/debug/memory"
|
||||
|
||||
declare global {
|
||||
const OPENCODE_WORKER_PATH: string
|
||||
@@ -129,6 +130,7 @@ export const TuiThreadCommand = cmd({
|
||||
return
|
||||
}
|
||||
const cwd = Filesystem.resolve(process.cwd())
|
||||
const stopMem = Memory.start("tui")
|
||||
|
||||
const worker = new Worker(file, {
|
||||
env: Object.fromEntries(
|
||||
@@ -161,6 +163,7 @@ export const TuiThreadCommand = cmd({
|
||||
process.off("uncaughtException", error)
|
||||
process.off("unhandledRejection", error)
|
||||
process.off("SIGUSR2", reload)
|
||||
stopMem()
|
||||
await withTimeout(client.call("shutdown", undefined), 5000).catch((error) => {
|
||||
Log.Default.warn("worker shutdown failed", {
|
||||
error: errorMessage(error),
|
||||
|
||||
@@ -13,6 +13,7 @@ import { Flag } from "@/flag/flag"
|
||||
import { setTimeout as sleep } from "node:timers/promises"
|
||||
import { writeHeapSnapshot } from "node:v8"
|
||||
import { WorkspaceID } from "@/control-plane/schema"
|
||||
import { Memory } from "@/debug/memory"
|
||||
|
||||
await Log.init({
|
||||
print: process.argv.includes("--print-logs"),
|
||||
@@ -35,6 +36,8 @@ process.on("uncaughtException", (e) => {
|
||||
})
|
||||
})
|
||||
|
||||
const stopMem = Memory.start("server")
|
||||
|
||||
// Subscribe to global events and forward them via RPC
|
||||
GlobalBus.on("event", (event) => {
|
||||
Rpc.emit("global.event", event)
|
||||
@@ -156,6 +159,7 @@ export const rpc = {
|
||||
},
|
||||
async shutdown() {
|
||||
Log.Default.info("worker shutting down")
|
||||
stopMem()
|
||||
if (eventStream.abort) eventStream.abort.abort()
|
||||
await Instance.disposeAll()
|
||||
if (server) await server.stop(true)
|
||||
|
||||
122
packages/opencode/src/debug/memory.ts
Normal file
122
packages/opencode/src/debug/memory.ts
Normal file
@@ -0,0 +1,122 @@
|
||||
import { Global } from "@/global"
|
||||
import { Installation } from "@/installation"
|
||||
import { stats } from "@/util/queue"
|
||||
import { Log } from "@/util/log"
|
||||
import { Filesystem } from "@/util/filesystem"
|
||||
import { appendFile, mkdir } from "fs/promises"
|
||||
import { writeHeapSnapshot } from "node:v8"
|
||||
import path from "path"
|
||||
|
||||
const log = Log.create({ service: "memory" })
|
||||
|
||||
const root = process.env.OPENCODE_DEBUG_DIR ?? path.join(Global.Path.state, "debug")
|
||||
const file = path.join(root, "memory.jsonl")
|
||||
const snap = path.join(root, "snapshots")
|
||||
|
||||
export namespace Memory {
|
||||
export function start(name: string) {
|
||||
if (!Installation.isLocal()) return () => {}
|
||||
|
||||
let busy = false
|
||||
let last = 0
|
||||
const every = num("OPENCODE_DEBUG_MEMORY_INTERVAL_MS") ?? 10_000
|
||||
const limit = (num("OPENCODE_DEBUG_MEMORY_RSS_MB") ?? 1_500) * 1024 * 1024
|
||||
const cool = num("OPENCODE_DEBUG_MEMORY_COOLDOWN_MS") ?? 5 * 60 * 1000
|
||||
|
||||
const tick = async (kind: "start" | "sample") => {
|
||||
if (busy) return
|
||||
busy = true
|
||||
|
||||
try {
|
||||
const now = Date.now()
|
||||
const mem = process.memoryUsage()
|
||||
const q = stats()
|
||||
.filter((item) => item.size > 0 || item.max > 0)
|
||||
.sort((a, b) => b.size - a.size || b.max - a.max)
|
||||
.slice(0, 10)
|
||||
const row = {
|
||||
kind: "sample",
|
||||
time: new Date(now).toISOString(),
|
||||
name,
|
||||
pid: process.pid,
|
||||
rss: mem.rss,
|
||||
heap: mem.heapUsed,
|
||||
ext: mem.external,
|
||||
buf: mem.arrayBuffers,
|
||||
queues: q,
|
||||
}
|
||||
|
||||
await line(row)
|
||||
|
||||
if (kind === "start" || mem.rss < limit || now - last < cool) return
|
||||
|
||||
last = now
|
||||
const tag = stamp(now)
|
||||
const heap = path.join(snap, `${tag}-${name}.heapsnapshot`)
|
||||
await mkdir(snap, { recursive: true })
|
||||
writeHeapSnapshot(heap)
|
||||
|
||||
const meta = {
|
||||
kind: "snapshot",
|
||||
time: row.time,
|
||||
name,
|
||||
pid: process.pid,
|
||||
trigger: {
|
||||
type: "rss",
|
||||
limit,
|
||||
value: mem.rss,
|
||||
},
|
||||
memory: mem,
|
||||
queues: q,
|
||||
}
|
||||
|
||||
await Filesystem.writeJson(path.join(snap, `${tag}-${name}.json`), meta)
|
||||
await line({ ...meta, heap })
|
||||
log.warn("memory snapshot written", {
|
||||
name,
|
||||
pid: process.pid,
|
||||
rss_mb: mb(mem.rss),
|
||||
limit_mb: mb(limit),
|
||||
heap,
|
||||
})
|
||||
} catch (err) {
|
||||
log.warn("memory monitor failed", {
|
||||
name,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
})
|
||||
} finally {
|
||||
busy = false
|
||||
}
|
||||
}
|
||||
|
||||
const timer = setInterval(() => {
|
||||
void tick("sample")
|
||||
}, every)
|
||||
timer.unref?.()
|
||||
void tick("start")
|
||||
|
||||
return () => {
|
||||
clearInterval(timer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function line(input: unknown) {
|
||||
await mkdir(root, { recursive: true })
|
||||
await appendFile(file, JSON.stringify(input) + "\n")
|
||||
}
|
||||
|
||||
function num(key: string) {
|
||||
const value = process.env[key]
|
||||
if (!value) return undefined
|
||||
const parsed = Number(value)
|
||||
return Number.isFinite(parsed) && parsed > 0 ? parsed : undefined
|
||||
}
|
||||
|
||||
function mb(value: number) {
|
||||
return Math.round((value / 1024 / 1024) * 10) / 10
|
||||
}
|
||||
|
||||
function stamp(now: number) {
|
||||
return new Date(now).toISOString().replaceAll(":", "-").replaceAll(".", "-")
|
||||
}
|
||||
@@ -32,8 +32,8 @@ export const EventRoutes = () =>
|
||||
c.header("X-Accel-Buffering", "no")
|
||||
c.header("X-Content-Type-Options", "nosniff")
|
||||
return streamSSE(c, async (stream) => {
|
||||
const q = new AsyncQueue<string | null>()
|
||||
let done = false
|
||||
const q = new AsyncQueue<string | null>({ name: "sse:event" })
|
||||
let closed = false
|
||||
|
||||
q.push(
|
||||
JSON.stringify({
|
||||
@@ -53,11 +53,12 @@ export const EventRoutes = () =>
|
||||
}, 10_000)
|
||||
|
||||
const stop = () => {
|
||||
if (done) return
|
||||
done = true
|
||||
if (closed) return
|
||||
closed = true
|
||||
clearInterval(heartbeat)
|
||||
unsub()
|
||||
q.push(null)
|
||||
q.untrack()
|
||||
log.info("event disconnected")
|
||||
}
|
||||
|
||||
|
||||
@@ -17,10 +17,10 @@ const log = Log.create({ service: "server" })
|
||||
|
||||
export const GlobalDisposedEvent = BusEvent.define("global.disposed", z.object({}))
|
||||
|
||||
async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>) => () => void) {
|
||||
async function streamEvents(c: Context, name: string, subscribe: (q: AsyncQueue<string | null>) => () => void) {
|
||||
return streamSSE(c, async (stream) => {
|
||||
const q = new AsyncQueue<string | null>()
|
||||
let done = false
|
||||
const q = new AsyncQueue<string | null>({ name })
|
||||
let closed = false
|
||||
|
||||
q.push(
|
||||
JSON.stringify({
|
||||
@@ -44,11 +44,12 @@ async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>
|
||||
}, 10_000)
|
||||
|
||||
const stop = () => {
|
||||
if (done) return
|
||||
done = true
|
||||
if (closed) return
|
||||
closed = true
|
||||
clearInterval(heartbeat)
|
||||
unsub()
|
||||
q.push(null)
|
||||
q.untrack()
|
||||
log.info("global event disconnected")
|
||||
}
|
||||
|
||||
@@ -122,7 +123,7 @@ export const GlobalRoutes = lazy(() =>
|
||||
c.header("X-Accel-Buffering", "no")
|
||||
c.header("X-Content-Type-Options", "nosniff")
|
||||
|
||||
return streamEvents(c, (q) => {
|
||||
return streamEvents(c, "sse:global", (q) => {
|
||||
async function handler(event: any) {
|
||||
q.push(JSON.stringify(event))
|
||||
}
|
||||
@@ -161,7 +162,7 @@ export const GlobalRoutes = lazy(() =>
|
||||
c.header("Cache-Control", "no-cache, no-transform")
|
||||
c.header("X-Accel-Buffering", "no")
|
||||
c.header("X-Content-Type-Options", "nosniff")
|
||||
return streamEvents(c, (q) => {
|
||||
return streamEvents(c, "sse:sync", (q) => {
|
||||
return SyncEvent.subscribeAll(({ def, event }) => {
|
||||
// TODO: don't pass def, just pass the type (and it should
|
||||
// be versioned)
|
||||
|
||||
@@ -1,21 +1,89 @@
|
||||
type Stat = {
|
||||
id: number
|
||||
name: string
|
||||
size: number
|
||||
max: number
|
||||
push: number
|
||||
pull: number
|
||||
wait: number
|
||||
}
|
||||
|
||||
const all = new Map<number, Stat>()
|
||||
let next = 0
|
||||
|
||||
export function stats() {
|
||||
return [...all.values()].map((item) => ({ ...item }))
|
||||
}
|
||||
|
||||
export class AsyncQueue<T> implements AsyncIterable<T> {
|
||||
private queue: T[] = []
|
||||
private resolvers: ((value: T) => void)[] = []
|
||||
private id: number | undefined
|
||||
|
||||
constructor(input?: { name?: string }) {
|
||||
if (!input?.name) return
|
||||
this.id = ++next
|
||||
all.set(this.id, {
|
||||
id: this.id,
|
||||
name: input.name,
|
||||
size: 0,
|
||||
max: 0,
|
||||
push: 0,
|
||||
pull: 0,
|
||||
wait: 0,
|
||||
})
|
||||
}
|
||||
|
||||
push(item: T) {
|
||||
const itemStat = this.item()
|
||||
if (itemStat) itemStat.push++
|
||||
const resolve = this.resolvers.shift()
|
||||
if (resolve) resolve(item)
|
||||
else this.queue.push(item)
|
||||
this.sync()
|
||||
}
|
||||
|
||||
async next(): Promise<T> {
|
||||
if (this.queue.length > 0) return this.queue.shift()!
|
||||
return new Promise((resolve) => this.resolvers.push(resolve))
|
||||
if (this.queue.length > 0) {
|
||||
const value = this.queue.shift()!
|
||||
const itemStat = this.item()
|
||||
if (itemStat) itemStat.pull++
|
||||
this.sync()
|
||||
return value
|
||||
}
|
||||
|
||||
return new Promise((resolve) => {
|
||||
this.resolvers.push((value) => {
|
||||
const itemStat = this.item()
|
||||
if (itemStat) itemStat.pull++
|
||||
this.sync()
|
||||
resolve(value)
|
||||
})
|
||||
this.sync()
|
||||
})
|
||||
}
|
||||
|
||||
untrack() {
|
||||
if (this.id === undefined) return
|
||||
all.delete(this.id)
|
||||
}
|
||||
|
||||
async *[Symbol.asyncIterator]() {
|
||||
while (true) yield await this.next()
|
||||
}
|
||||
|
||||
private item() {
|
||||
if (this.id === undefined) return
|
||||
return all.get(this.id)
|
||||
}
|
||||
|
||||
private sync() {
|
||||
const itemStat = this.item()
|
||||
if (!itemStat) return
|
||||
itemStat.size = this.queue.length
|
||||
itemStat.max = Math.max(itemStat.max, itemStat.size)
|
||||
itemStat.wait = this.resolvers.length
|
||||
}
|
||||
}
|
||||
|
||||
export async function work<T>(concurrency: number, items: T[], fn: (item: T) => Promise<void>) {
|
||||
|
||||
Reference in New Issue
Block a user