Compare commits

...

1 Commit

Author SHA1 Message Date
Kit Langton
8272d68335 fix(console): harden zen stream relay diagnostics
Add stream lifecycle metrics and extract the Zen relay loop so missing bodies, decoder stalls, EOF framing, and postprocess failures are easier to diagnose. Cover the new relay behavior with focused repro tests for the streaming edge cases we suspect in production.
2026-03-26 11:24:56 -04:00
3 changed files with 370 additions and 73 deletions

View File

@@ -39,6 +39,7 @@ import { createRateLimiter } from "./rateLimiter"
import { createDataDumper } from "./dataDumper"
import { createTrialLimiter } from "./trialLimiter"
import { createStickyTracker } from "./stickyProviderTracker"
import { relay } from "./stream"
import { LiteData } from "@opencode-ai/console-core/lite.js"
import { Resource } from "@opencode-ai/console-resource"
import { i18n, type Key } from "~/i18n"
@@ -254,80 +255,41 @@ export async function handler(
const streamConverter = createStreamPartConverter(providerInfo.format, opts.format)
const usageParser = providerInfo.createUsageParser()
const binaryDecoder = providerInfo.createBinaryStreamDecoder()
const stream = new ReadableStream({
start(c) {
const reader = res.body?.getReader()
const decoder = new TextDecoder()
const encoder = new TextEncoder()
let buffer = ""
let responseLength = 0
function pump(): Promise<void> {
return (
reader?.read().then(async ({ done, value: rawValue }) => {
if (done) {
logger.metric({
response_length: responseLength,
"timestamp.last_byte": Date.now(),
})
dataDumper?.flush()
await rateLimiter?.track()
const usage = usageParser.retrieve()
if (usage) {
const usageInfo = providerInfo.normalizeUsage(usage)
const costInfo = calculateCost(modelInfo, usageInfo)
await trialLimiter?.track(usageInfo)
await trackUsage(sessionId, billingSource, authInfo, modelInfo, providerInfo, usageInfo, costInfo)
await reload(billingSource, authInfo, costInfo)
const cost = calculateOccurredCost(billingSource, costInfo)
c.enqueue(encoder.encode(buildCostChunk(opts.format, cost)))
}
c.close()
return
}
if (responseLength === 0) {
const now = Date.now()
logger.metric({
time_to_first_byte: now - startTimestamp,
"timestamp.first_byte": now,
})
}
const value = binaryDecoder ? binaryDecoder(rawValue) : rawValue
if (!value) return
responseLength += value.length
buffer += decoder.decode(value, { stream: true })
dataDumper?.provideStream(buffer)
const parts = buffer.split(providerInfo.streamSeparator)
buffer = parts.pop() ?? ""
for (let part of parts) {
logger.debug("PART: " + part)
part = part.trim()
usageParser.parse(part)
if (providerInfo.format !== opts.format) {
part = streamConverter(part)
c.enqueue(encoder.encode(part + "\n\n"))
}
}
if (providerInfo.format === opts.format) {
c.enqueue(value)
}
return pump()
}) || Promise.resolve()
)
}
return pump()
const metric = (values: Record<string, unknown>) =>
logger.metric({
request: requestId,
session: sessionId,
client: ocClient,
provider: providerInfo.id,
model: modelInfo.id,
...values,
})
const stream = relay({
body: res.body,
separator: providerInfo.streamSeparator,
signal: input.request.signal,
start: startTimestamp,
same: providerInfo.format === opts.format,
binary: binaryDecoder,
parse: (part) => {
logger.debug("PART: " + part)
usageParser.parse(part)
},
convert: streamConverter,
tail: async () => {
await rateLimiter?.track()
const usage = usageParser.retrieve()
if (!usage) return
const usageInfo = providerInfo.normalizeUsage(usage)
const costInfo = calculateCost(modelInfo, usageInfo)
await trialLimiter?.track(usageInfo)
await trackUsage(sessionId, billingSource, authInfo, modelInfo, providerInfo, usageInfo, costInfo)
await reload(billingSource, authInfo, costInfo)
const cost = calculateOccurredCost(billingSource, costInfo)
return buildCostChunk(opts.format, cost)
},
metric,
dump: dataDumper,
})
return new Response(stream, {
status: resStatus,

View File

@@ -0,0 +1,196 @@
const done = new Set(["[DONE]", "message_stop", "response.completed"])
type Dump = {
provideStream: (chunk: string) => void
flush: () => void
}
type Opts = {
body: ReadableStream<Uint8Array> | null | undefined
separator: string
signal: AbortSignal
start: number
same: boolean
binary?: (chunk: Uint8Array) => Uint8Array | undefined
parse: (part: string) => void
convert: (part: string) => string
tail: () => Promise<string | undefined>
metric: (values: Record<string, unknown>) => void
dump?: Dump
}
export const eventName = (part: string) => {
const line = part.split("\n", 1)[0]?.trim() ?? ""
if (line.startsWith("event:")) return line.slice(6).trim() || "message"
if (part.includes("[DONE]")) return "[DONE]"
if (line.startsWith("data:")) return "message"
return "unknown"
}
const errInfo = (err: unknown) => {
if (err instanceof Error) {
return {
"stream.error_type": err.constructor.name,
"stream.error_message": err.message,
}
}
return {
"stream.error_type": typeof err,
"stream.error_message": String(err),
}
}
const stats = (len: number, cnt: number, seen: number, buf: string, end: string | undefined, gap: number) => ({
"stream.response_length": len,
"stream.chunk_count": cnt,
"stream.event_count": seen,
"stream.pending_length": buf.length,
"stream.last_event": end,
"stream.max_gap_ms": gap || undefined,
})
export const relay = (opts: Opts) =>
new ReadableStream({
async start(c) {
let phase = "start"
let len = 0
let cnt = 0
let seen = 0
let end: string | undefined
let gap = 0
let prev: number | undefined
let completed = false
let aborted = false
let buf = ""
if (!opts.body) {
opts.metric({
"stream.event": "missing_body",
"stream.phase": phase,
})
opts.dump?.flush()
c.close()
return
}
const reader = opts.body.getReader()
const dec = new TextDecoder()
const enc = new TextEncoder()
const names: Record<string, number> = {}
const abort = () => {
aborted = true
reader.cancel().catch(() => undefined)
}
const note = (part: string) => {
const name = eventName(part)
end = name
seen += 1
names[name] = (names[name] ?? 0) + 1
if (done.has(name)) completed = true
}
opts.signal.addEventListener("abort", abort)
opts.metric({
"stream.event": "started",
})
try {
while (true) {
phase = "read"
const raw = await reader.read()
if (raw.done) break
if (len === 0) {
const now = Date.now()
opts.metric({
time_to_first_byte: now - opts.start,
"timestamp.first_byte": now,
})
}
const value = opts.binary ? opts.binary(raw.value) : raw.value
if (!value) continue
cnt += 1
len += value.length
const now = Date.now()
if (prev !== undefined) gap = Math.max(gap, now - prev)
prev = now
const text = dec.decode(value, { stream: true })
buf += text
opts.dump?.provideStream(text)
const parts = buf.split(opts.separator)
buf = parts.pop() ?? ""
for (let part of parts) {
part = part.trim()
if (!part) continue
note(part)
phase = "parse"
opts.parse(part)
if (opts.same) continue
phase = "convert"
c.enqueue(enc.encode(opts.convert(part) + "\n\n"))
}
if (opts.same) c.enqueue(value)
}
const tail = dec.decode()
if (tail) {
buf += tail
opts.dump?.provideStream(tail)
}
if (buf.trim()) {
const part = buf.trim()
note(part)
phase = "parse"
opts.parse(part)
if (!opts.same) {
phase = "convert"
c.enqueue(enc.encode(opts.convert(part) + "\n\n"))
}
buf = ""
}
opts.metric({
response_length: len,
"timestamp.last_byte": Date.now(),
})
opts.dump?.flush()
phase = "tail"
const chunk = await opts.tail()
if (chunk) c.enqueue(enc.encode(chunk))
c.close()
opts.metric({
"stream.event": "finished",
"stream.phase": "done",
"stream.duration_ms": Date.now() - opts.start,
"stream.saw_completed": completed,
"stream.events": JSON.stringify(names),
...stats(len, cnt, seen, buf, end, gap),
})
} catch (err) {
opts.metric({
"stream.event": aborted ? "aborted" : "error",
"stream.phase": phase,
"stream.duration_ms": Date.now() - opts.start,
"stream.saw_completed": completed,
"stream.events": JSON.stringify(names),
...stats(len, cnt, seen, buf, end, gap),
...errInfo(err),
})
c.error(err)
} finally {
opts.signal.removeEventListener("abort", abort)
}
},
})

View File

@@ -0,0 +1,139 @@
import { describe, expect, test } from "bun:test"
import { eventName, relay } from "../src/routes/zen/util/stream"
const enc = new TextEncoder()
const read = (stream: ReadableStream<Uint8Array>) => new Response(stream).text()
const body = (parts: string[]) =>
new ReadableStream<Uint8Array>({
async start(c) {
for (const part of parts) c.enqueue(enc.encode(part))
c.close()
},
})
describe("zen stream", () => {
test("parses known event names", () => {
expect(eventName("event: response.created\ndata: {}")).toBe("response.created")
expect(eventName('data: {"ok":true}')).toBe("message")
expect(eventName("data: [DONE]")).toBe("[DONE]")
})
test("relays split OpenAI responses and logs completion", async () => {
const seen: string[] = []
const logs: Array<Record<string, unknown>> = []
const stream = relay({
body: body([
"event: response.created\n",
'data: {"type":"response.created"}\n\n',
"event: response.completed\n",
'data: {"response":{"usage":{"input_tokens":1,"output_tokens":2}}}\n\n',
]),
separator: "\n\n",
signal: new AbortController().signal,
start: Date.now(),
same: true,
parse: (part) => {
seen.push(part)
},
convert: (part) => part,
tail: async () => undefined,
metric: (values) => logs.push(values),
})
const text = await read(stream)
expect(text).toContain("response.created")
expect(text).toContain("response.completed")
expect(seen).toHaveLength(2)
expect(logs.at(-1)?.["stream.event"]).toBe("finished")
expect(logs.at(-1)?.["stream.saw_completed"]).toBe(true)
})
test("keeps reading when binary decoder needs another chunk", async () => {
let calls = 0
const logs: Array<Record<string, unknown>> = []
const stream = relay({
body: body(["a", "b"]),
separator: "\n\n",
signal: new AbortController().signal,
start: Date.now(),
same: true,
binary: (chunk) => {
calls += 1
if (calls === 1) return
return chunk
},
parse: () => undefined,
convert: (part) => part,
tail: async () => undefined,
metric: (values) => logs.push(values),
})
const text = await read(stream)
expect(text).toBe("b")
expect(logs.at(-1)?.["stream.event"]).toBe("finished")
})
test("flushes a final unterminated event at EOF", async () => {
const seen: string[] = []
const logs: Array<Record<string, unknown>> = []
const stream = relay({
body: body(['event: response.completed\ndata: {"response":{"usage":{"input_tokens":1}}}']),
separator: "\n\n",
signal: new AbortController().signal,
start: Date.now(),
same: true,
parse: (part) => {
seen.push(part)
},
convert: (part) => part,
tail: async () => undefined,
metric: (values) => logs.push(values),
})
const text = await read(stream)
expect(text).toContain("response.completed")
expect(seen).toHaveLength(1)
expect(logs.at(-1)?.["stream.saw_completed"]).toBe(true)
})
test("closes cleanly when upstream body is missing", async () => {
const logs: Array<Record<string, unknown>> = []
const stream = relay({
body: null,
separator: "\n\n",
signal: new AbortController().signal,
start: Date.now(),
same: true,
parse: () => undefined,
convert: (part) => part,
tail: async () => undefined,
metric: (values) => logs.push(values),
})
expect(await read(stream)).toBe("")
expect(logs.at(-1)?.["stream.event"]).toBe("missing_body")
})
test("surfaces postprocess failures with stream metrics", async () => {
const logs: Array<Record<string, unknown>> = []
const stream = relay({
body: body(["event: response.created\ndata: {}\n\n"]),
separator: "\n\n",
signal: new AbortController().signal,
start: Date.now(),
same: true,
parse: () => undefined,
convert: (part) => part,
tail: async () => {
throw new Error("boom")
},
metric: (values) => logs.push(values),
})
await expect(read(stream)).rejects.toThrow("boom")
expect(logs.at(-1)?.["stream.event"]).toBe("error")
expect(logs.at(-1)?.["stream.phase"]).toBe("tail")
})
})