Compare commits

...

4 Commits

Author SHA1 Message Date
Kit Langton
6e68a927f2 Merge branch 'dev' into kit/dev-memory-observe 2026-04-02 15:48:23 -04:00
Kit Langton
4b45a3d368 Merge branch 'dev' into kit/dev-memory-observe 2026-04-02 14:59:36 -04:00
Kit Langton
946d2eecbe refactor: clarify queue tracking names 2026-04-02 13:12:19 -04:00
Kit Langton
76b90e3fb1 debug: add dev memory telemetry scaffolding 2026-04-02 13:08:23 -04:00
6 changed files with 212 additions and 13 deletions

View File

@@ -16,6 +16,7 @@ import { win32DisableProcessedInput, win32InstallCtrlCGuard } from "./win32"
import { TuiConfig } from "@/config/tui"
import { Instance } from "@/project/instance"
import { writeHeapSnapshot } from "v8"
import { Memory } from "@/debug/memory"
declare global {
const OPENCODE_WORKER_PATH: string
@@ -129,6 +130,7 @@ export const TuiThreadCommand = cmd({
return
}
const cwd = Filesystem.resolve(process.cwd())
const stopMem = Memory.start("tui")
const worker = new Worker(file, {
env: Object.fromEntries(
@@ -161,6 +163,7 @@ export const TuiThreadCommand = cmd({
process.off("uncaughtException", error)
process.off("unhandledRejection", error)
process.off("SIGUSR2", reload)
stopMem()
await withTimeout(client.call("shutdown", undefined), 5000).catch((error) => {
Log.Default.warn("worker shutdown failed", {
error: errorMessage(error),

View File

@@ -13,6 +13,7 @@ import { Flag } from "@/flag/flag"
import { setTimeout as sleep } from "node:timers/promises"
import { writeHeapSnapshot } from "node:v8"
import { WorkspaceID } from "@/control-plane/schema"
import { Memory } from "@/debug/memory"
await Log.init({
print: process.argv.includes("--print-logs"),
@@ -35,6 +36,8 @@ process.on("uncaughtException", (e) => {
})
})
const stopMem = Memory.start("server")
// Subscribe to global events and forward them via RPC
GlobalBus.on("event", (event) => {
Rpc.emit("global.event", event)
@@ -156,6 +159,7 @@ export const rpc = {
},
async shutdown() {
Log.Default.info("worker shutting down")
stopMem()
if (eventStream.abort) eventStream.abort.abort()
await Instance.disposeAll()
if (server) await server.stop(true)

View File

@@ -0,0 +1,122 @@
import { Global } from "@/global"
import { Installation } from "@/installation"
import { stats } from "@/util/queue"
import { Log } from "@/util/log"
import { Filesystem } from "@/util/filesystem"
import { appendFile, mkdir } from "fs/promises"
import { writeHeapSnapshot } from "node:v8"
import path from "path"
// Logger scoped to the memory-telemetry service.
const log = Log.create({ service: "memory" })
// Output root for debug artifacts; overridable via OPENCODE_DEBUG_DIR,
// defaults to <state>/debug.
const root = process.env.OPENCODE_DEBUG_DIR ?? path.join(Global.Path.state, "debug")
// JSONL stream of sample/snapshot records appended by line().
const file = path.join(root, "memory.jsonl")
// Directory where V8 heap snapshots (and their metadata sidecars) are written.
const snap = path.join(root, "snapshots")
export namespace Memory {
  /**
   * Starts periodic memory telemetry for a named process ("tui", "server", ...).
   *
   * Every OPENCODE_DEBUG_MEMORY_INTERVAL_MS (default 10s) a JSONL sample row
   * (rss / heapUsed / external / arrayBuffers plus the top-10 busiest tracked
   * queues) is appended via line(). When rss exceeds
   * OPENCODE_DEBUG_MEMORY_RSS_MB (default 1500 MB), a V8 heap snapshot and a
   * JSON metadata sidecar are written, rate-limited by
   * OPENCODE_DEBUG_MEMORY_COOLDOWN_MS (default 5 min).
   *
   * @param name label recorded in every emitted row
   * @returns stop function that cancels the sampling timer
   */
  export function start(name: string) {
    // No-op outside local/dev installs so regular builds pay no cost.
    if (!Installation.isLocal()) return () => {}
    // Re-entrancy guard: skip a tick while the previous one is still writing.
    let busy = false
    // Timestamp of the last heap snapshot, enforcing the cooldown below.
    let last = 0
    const every = num("OPENCODE_DEBUG_MEMORY_INTERVAL_MS") ?? 10_000
    const limit = (num("OPENCODE_DEBUG_MEMORY_RSS_MB") ?? 1_500) * 1024 * 1024
    const cool = num("OPENCODE_DEBUG_MEMORY_COOLDOWN_MS") ?? 5 * 60 * 1000
    const tick = async (kind: "start" | "sample") => {
      if (busy) return
      busy = true
      try {
        const now = Date.now()
        const mem = process.memoryUsage()
        // Top 10 tracked queues that have ever held an item, busiest first.
        const q = stats()
          .filter((item) => item.size > 0 || item.max > 0)
          .sort((a, b) => b.size - a.size || b.max - a.max)
          .slice(0, 10)
        // NOTE(review): row.kind is hard-coded "sample" even for the initial
        // kind === "start" tick — confirm this is intended and the parameter
        // is only meant to suppress the snapshot below.
        const row = {
          kind: "sample",
          time: new Date(now).toISOString(),
          name,
          pid: process.pid,
          rss: mem.rss,
          heap: mem.heapUsed,
          ext: mem.external,
          buf: mem.arrayBuffers,
          queues: q,
        }
        await line(row)
        // Snapshot only on periodic ticks, above the rss limit, and outside
        // the cooldown window.
        if (kind === "start" || mem.rss < limit || now - last < cool) return
        last = now
        const tag = stamp(now)
        const heap = path.join(snap, `${tag}-${name}.heapsnapshot`)
        await mkdir(snap, { recursive: true })
        // Synchronous and potentially slow/large; guarded by `busy` above.
        writeHeapSnapshot(heap)
        const meta = {
          kind: "snapshot",
          time: row.time,
          name,
          pid: process.pid,
          trigger: {
            type: "rss",
            limit,
            value: mem.rss,
          },
          memory: mem,
          queues: q,
        }
        // Sidecar JSON next to the snapshot, plus a record in the JSONL log.
        await Filesystem.writeJson(path.join(snap, `${tag}-${name}.json`), meta)
        await line({ ...meta, heap })
        log.warn("memory snapshot written", {
          name,
          pid: process.pid,
          rss_mb: mb(mem.rss),
          limit_mb: mb(limit),
          heap,
        })
      } catch (err) {
        // Telemetry must never take down the host process; log and move on.
        log.warn("memory monitor failed", {
          name,
          error: err instanceof Error ? err.message : String(err),
        })
      } finally {
        busy = false
      }
    }
    const timer = setInterval(() => {
      void tick("sample")
    }, every)
    // Don't let the sampler keep the process alive; optional-chained in case
    // the runtime's timer lacks unref — TODO confirm which runtimes apply.
    timer.unref?.()
    // Emit one immediate baseline row (snapshot suppressed for "start").
    void tick("start")
    return () => {
      clearInterval(timer)
    }
  }
}
// Appends one JSON record (newline-terminated) to the memory JSONL log,
// creating the debug directory on demand.
async function line(input: unknown) {
  const record = `${JSON.stringify(input)}\n`
  await mkdir(root, { recursive: true })
  await appendFile(file, record)
}
// Reads a strictly positive, finite number from the environment.
// Returns undefined for missing, empty, non-numeric, zero, or negative values.
function num(key: string) {
  const raw = process.env[key]
  if (!raw) return undefined
  const n = Number(raw)
  if (!Number.isFinite(n)) return undefined
  return n > 0 ? n : undefined
}
// Converts a byte count to mebibytes, rounded to one decimal place.
function mb(value: number) {
  const megs = value / (1024 * 1024)
  return Math.round(megs * 10) / 10
}
// Renders an epoch-ms timestamp as a filesystem-safe ISO string:
// ":" and "." (invalid on some filesystems) become "-".
function stamp(now: number) {
  return new Date(now).toISOString().replace(/[:.]/g, "-")
}

View File

@@ -32,8 +32,8 @@ export const EventRoutes = () =>
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")
return streamSSE(c, async (stream) => {
const q = new AsyncQueue<string | null>()
let done = false
const q = new AsyncQueue<string | null>({ name: "sse:event" })
let closed = false
q.push(
JSON.stringify({
@@ -53,11 +53,12 @@ export const EventRoutes = () =>
}, 10_000)
const stop = () => {
if (done) return
done = true
if (closed) return
closed = true
clearInterval(heartbeat)
unsub()
q.push(null)
q.untrack()
log.info("event disconnected")
}

View File

@@ -17,10 +17,10 @@ const log = Log.create({ service: "server" })
export const GlobalDisposedEvent = BusEvent.define("global.disposed", z.object({}))
async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>) => () => void) {
async function streamEvents(c: Context, name: string, subscribe: (q: AsyncQueue<string | null>) => () => void) {
return streamSSE(c, async (stream) => {
const q = new AsyncQueue<string | null>()
let done = false
const q = new AsyncQueue<string | null>({ name })
let closed = false
q.push(
JSON.stringify({
@@ -44,11 +44,12 @@ async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>
}, 10_000)
const stop = () => {
if (done) return
done = true
if (closed) return
closed = true
clearInterval(heartbeat)
unsub()
q.push(null)
q.untrack()
log.info("global event disconnected")
}
@@ -122,7 +123,7 @@ export const GlobalRoutes = lazy(() =>
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")
return streamEvents(c, (q) => {
return streamEvents(c, "sse:global", (q) => {
async function handler(event: any) {
q.push(JSON.stringify(event))
}
@@ -161,7 +162,7 @@ export const GlobalRoutes = lazy(() =>
c.header("Cache-Control", "no-cache, no-transform")
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")
return streamEvents(c, (q) => {
return streamEvents(c, "sse:sync", (q) => {
return SyncEvent.subscribeAll(({ def, event }) => {
// TODO: don't pass def, just pass the type (and it should
// be versioned)

View File

@@ -1,21 +1,89 @@
/** Per-queue telemetry counters, registered in `all` by queue id. */
type Stat = {
  id: number
  name: string
  size: number // items currently buffered
  max: number // high-water mark of size
  push: number // total items ever pushed
  pull: number // total items ever consumed
  wait: number // consumers currently blocked in next()
}

// Registry of stats for every *named* queue; anonymous queues are untracked.
const all = new Map<number, Stat>()
let next = 0

/** Snapshot of all tracked queue stats (copies — safe for callers to keep). */
export function stats() {
  return [...all.values()].map((item) => ({ ...item }))
}

/**
 * Unbounded producer/consumer queue bridging push-style producers with
 * async-iterating consumers. Pass a `name` to register telemetry visible via
 * stats(); call untrack() on teardown so the registry entry is released.
 *
 * Note: the rendered diff interleaved the removed and added bodies of next();
 * this is the coherent post-change implementation.
 */
export class AsyncQueue<T> implements AsyncIterable<T> {
  private queue: T[] = []
  private resolvers: ((value: T) => void)[] = []
  private id: number | undefined

  constructor(input?: { name?: string }) {
    // Only named queues participate in telemetry.
    if (!input?.name) return
    this.id = ++next
    all.set(this.id, {
      id: this.id,
      name: input.name,
      size: 0,
      max: 0,
      push: 0,
      pull: 0,
      wait: 0,
    })
  }

  /** Delivers an item to a waiting consumer, or buffers it for later. */
  push(item: T) {
    const itemStat = this.item()
    if (itemStat) itemStat.push++
    const resolve = this.resolvers.shift()
    if (resolve) resolve(item)
    else this.queue.push(item)
    this.sync()
  }

  /** Resolves with the next item; parks the caller when the buffer is empty. */
  async next(): Promise<T> {
    if (this.queue.length > 0) {
      const value = this.queue.shift()!
      const itemStat = this.item()
      if (itemStat) itemStat.pull++
      this.sync()
      return value
    }
    return new Promise((resolve) => {
      // Wrap the resolver so pull/size/wait counters update on delivery.
      this.resolvers.push((value) => {
        const itemStat = this.item()
        if (itemStat) itemStat.pull++
        this.sync()
        resolve(value)
      })
      this.sync()
    })
  }

  /** Removes this queue from the stats registry (no-op when anonymous). */
  untrack() {
    if (this.id === undefined) return
    all.delete(this.id)
  }

  async *[Symbol.asyncIterator]() {
    while (true) yield await this.next()
  }

  // Stat record for this queue, or undefined when anonymous/untracked.
  private item() {
    if (this.id === undefined) return
    return all.get(this.id)
  }

  // Recomputes the size/max/wait gauges from current buffer state.
  private sync() {
    const itemStat = this.item()
    if (!itemStat) return
    itemStat.size = this.queue.length
    itemStat.max = Math.max(itemStat.max, itemStat.size)
    itemStat.wait = this.resolvers.length
  }
}
export async function work<T>(concurrency: number, items: T[], fn: (item: T) => Promise<void>) {