mirror of
https://github.com/anomalyco/opencode.git
synced 2026-03-27 00:54:56 +00:00
Compare commits
1 Commits
beta
...
kit/zen-st
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8272d68335 |
@@ -39,6 +39,7 @@ import { createRateLimiter } from "./rateLimiter"
|
||||
import { createDataDumper } from "./dataDumper"
|
||||
import { createTrialLimiter } from "./trialLimiter"
|
||||
import { createStickyTracker } from "./stickyProviderTracker"
|
||||
import { relay } from "./stream"
|
||||
import { LiteData } from "@opencode-ai/console-core/lite.js"
|
||||
import { Resource } from "@opencode-ai/console-resource"
|
||||
import { i18n, type Key } from "~/i18n"
|
||||
@@ -254,80 +255,41 @@ export async function handler(
|
||||
const streamConverter = createStreamPartConverter(providerInfo.format, opts.format)
|
||||
const usageParser = providerInfo.createUsageParser()
|
||||
const binaryDecoder = providerInfo.createBinaryStreamDecoder()
|
||||
const stream = new ReadableStream({
|
||||
start(c) {
|
||||
const reader = res.body?.getReader()
|
||||
const decoder = new TextDecoder()
|
||||
const encoder = new TextEncoder()
|
||||
|
||||
let buffer = ""
|
||||
let responseLength = 0
|
||||
|
||||
function pump(): Promise<void> {
|
||||
return (
|
||||
reader?.read().then(async ({ done, value: rawValue }) => {
|
||||
if (done) {
|
||||
logger.metric({
|
||||
response_length: responseLength,
|
||||
"timestamp.last_byte": Date.now(),
|
||||
})
|
||||
dataDumper?.flush()
|
||||
await rateLimiter?.track()
|
||||
const usage = usageParser.retrieve()
|
||||
if (usage) {
|
||||
const usageInfo = providerInfo.normalizeUsage(usage)
|
||||
const costInfo = calculateCost(modelInfo, usageInfo)
|
||||
await trialLimiter?.track(usageInfo)
|
||||
await trackUsage(sessionId, billingSource, authInfo, modelInfo, providerInfo, usageInfo, costInfo)
|
||||
await reload(billingSource, authInfo, costInfo)
|
||||
const cost = calculateOccurredCost(billingSource, costInfo)
|
||||
c.enqueue(encoder.encode(buildCostChunk(opts.format, cost)))
|
||||
}
|
||||
c.close()
|
||||
return
|
||||
}
|
||||
|
||||
if (responseLength === 0) {
|
||||
const now = Date.now()
|
||||
logger.metric({
|
||||
time_to_first_byte: now - startTimestamp,
|
||||
"timestamp.first_byte": now,
|
||||
})
|
||||
}
|
||||
|
||||
const value = binaryDecoder ? binaryDecoder(rawValue) : rawValue
|
||||
if (!value) return
|
||||
|
||||
responseLength += value.length
|
||||
buffer += decoder.decode(value, { stream: true })
|
||||
dataDumper?.provideStream(buffer)
|
||||
|
||||
const parts = buffer.split(providerInfo.streamSeparator)
|
||||
buffer = parts.pop() ?? ""
|
||||
|
||||
for (let part of parts) {
|
||||
logger.debug("PART: " + part)
|
||||
|
||||
part = part.trim()
|
||||
usageParser.parse(part)
|
||||
|
||||
if (providerInfo.format !== opts.format) {
|
||||
part = streamConverter(part)
|
||||
c.enqueue(encoder.encode(part + "\n\n"))
|
||||
}
|
||||
}
|
||||
|
||||
if (providerInfo.format === opts.format) {
|
||||
c.enqueue(value)
|
||||
}
|
||||
|
||||
return pump()
|
||||
}) || Promise.resolve()
|
||||
)
|
||||
}
|
||||
|
||||
return pump()
|
||||
const metric = (values: Record<string, unknown>) =>
|
||||
logger.metric({
|
||||
request: requestId,
|
||||
session: sessionId,
|
||||
client: ocClient,
|
||||
provider: providerInfo.id,
|
||||
model: modelInfo.id,
|
||||
...values,
|
||||
})
|
||||
const stream = relay({
|
||||
body: res.body,
|
||||
separator: providerInfo.streamSeparator,
|
||||
signal: input.request.signal,
|
||||
start: startTimestamp,
|
||||
same: providerInfo.format === opts.format,
|
||||
binary: binaryDecoder,
|
||||
parse: (part) => {
|
||||
logger.debug("PART: " + part)
|
||||
usageParser.parse(part)
|
||||
},
|
||||
convert: streamConverter,
|
||||
tail: async () => {
|
||||
await rateLimiter?.track()
|
||||
const usage = usageParser.retrieve()
|
||||
if (!usage) return
|
||||
const usageInfo = providerInfo.normalizeUsage(usage)
|
||||
const costInfo = calculateCost(modelInfo, usageInfo)
|
||||
await trialLimiter?.track(usageInfo)
|
||||
await trackUsage(sessionId, billingSource, authInfo, modelInfo, providerInfo, usageInfo, costInfo)
|
||||
await reload(billingSource, authInfo, costInfo)
|
||||
const cost = calculateOccurredCost(billingSource, costInfo)
|
||||
return buildCostChunk(opts.format, cost)
|
||||
},
|
||||
metric,
|
||||
dump: dataDumper,
|
||||
})
|
||||
return new Response(stream, {
|
||||
status: resStatus,
|
||||
|
||||
196
packages/console/app/src/routes/zen/util/stream.ts
Normal file
196
packages/console/app/src/routes/zen/util/stream.ts
Normal file
@@ -0,0 +1,196 @@
|
||||
const done = new Set(["[DONE]", "message_stop", "response.completed"])
|
||||
|
||||
type Dump = {
|
||||
provideStream: (chunk: string) => void
|
||||
flush: () => void
|
||||
}
|
||||
|
||||
type Opts = {
|
||||
body: ReadableStream<Uint8Array> | null | undefined
|
||||
separator: string
|
||||
signal: AbortSignal
|
||||
start: number
|
||||
same: boolean
|
||||
binary?: (chunk: Uint8Array) => Uint8Array | undefined
|
||||
parse: (part: string) => void
|
||||
convert: (part: string) => string
|
||||
tail: () => Promise<string | undefined>
|
||||
metric: (values: Record<string, unknown>) => void
|
||||
dump?: Dump
|
||||
}
|
||||
|
||||
export const eventName = (part: string) => {
|
||||
const line = part.split("\n", 1)[0]?.trim() ?? ""
|
||||
if (line.startsWith("event:")) return line.slice(6).trim() || "message"
|
||||
if (part.includes("[DONE]")) return "[DONE]"
|
||||
if (line.startsWith("data:")) return "message"
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
const errInfo = (err: unknown) => {
|
||||
if (err instanceof Error) {
|
||||
return {
|
||||
"stream.error_type": err.constructor.name,
|
||||
"stream.error_message": err.message,
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
"stream.error_type": typeof err,
|
||||
"stream.error_message": String(err),
|
||||
}
|
||||
}
|
||||
|
||||
const stats = (len: number, cnt: number, seen: number, buf: string, end: string | undefined, gap: number) => ({
|
||||
"stream.response_length": len,
|
||||
"stream.chunk_count": cnt,
|
||||
"stream.event_count": seen,
|
||||
"stream.pending_length": buf.length,
|
||||
"stream.last_event": end,
|
||||
"stream.max_gap_ms": gap || undefined,
|
||||
})
|
||||
|
||||
export const relay = (opts: Opts) =>
|
||||
new ReadableStream({
|
||||
async start(c) {
|
||||
let phase = "start"
|
||||
let len = 0
|
||||
let cnt = 0
|
||||
let seen = 0
|
||||
let end: string | undefined
|
||||
let gap = 0
|
||||
let prev: number | undefined
|
||||
let completed = false
|
||||
let aborted = false
|
||||
let buf = ""
|
||||
|
||||
if (!opts.body) {
|
||||
opts.metric({
|
||||
"stream.event": "missing_body",
|
||||
"stream.phase": phase,
|
||||
})
|
||||
opts.dump?.flush()
|
||||
c.close()
|
||||
return
|
||||
}
|
||||
|
||||
const reader = opts.body.getReader()
|
||||
const dec = new TextDecoder()
|
||||
const enc = new TextEncoder()
|
||||
const names: Record<string, number> = {}
|
||||
|
||||
const abort = () => {
|
||||
aborted = true
|
||||
reader.cancel().catch(() => undefined)
|
||||
}
|
||||
|
||||
const note = (part: string) => {
|
||||
const name = eventName(part)
|
||||
end = name
|
||||
seen += 1
|
||||
names[name] = (names[name] ?? 0) + 1
|
||||
if (done.has(name)) completed = true
|
||||
}
|
||||
|
||||
opts.signal.addEventListener("abort", abort)
|
||||
opts.metric({
|
||||
"stream.event": "started",
|
||||
})
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
phase = "read"
|
||||
const raw = await reader.read()
|
||||
if (raw.done) break
|
||||
|
||||
if (len === 0) {
|
||||
const now = Date.now()
|
||||
opts.metric({
|
||||
time_to_first_byte: now - opts.start,
|
||||
"timestamp.first_byte": now,
|
||||
})
|
||||
}
|
||||
|
||||
const value = opts.binary ? opts.binary(raw.value) : raw.value
|
||||
if (!value) continue
|
||||
|
||||
cnt += 1
|
||||
len += value.length
|
||||
const now = Date.now()
|
||||
if (prev !== undefined) gap = Math.max(gap, now - prev)
|
||||
prev = now
|
||||
|
||||
const text = dec.decode(value, { stream: true })
|
||||
buf += text
|
||||
opts.dump?.provideStream(text)
|
||||
|
||||
const parts = buf.split(opts.separator)
|
||||
buf = parts.pop() ?? ""
|
||||
|
||||
for (let part of parts) {
|
||||
part = part.trim()
|
||||
if (!part) continue
|
||||
note(part)
|
||||
phase = "parse"
|
||||
opts.parse(part)
|
||||
if (opts.same) continue
|
||||
phase = "convert"
|
||||
c.enqueue(enc.encode(opts.convert(part) + "\n\n"))
|
||||
}
|
||||
|
||||
if (opts.same) c.enqueue(value)
|
||||
}
|
||||
|
||||
const tail = dec.decode()
|
||||
if (tail) {
|
||||
buf += tail
|
||||
opts.dump?.provideStream(tail)
|
||||
}
|
||||
|
||||
if (buf.trim()) {
|
||||
const part = buf.trim()
|
||||
note(part)
|
||||
phase = "parse"
|
||||
opts.parse(part)
|
||||
if (!opts.same) {
|
||||
phase = "convert"
|
||||
c.enqueue(enc.encode(opts.convert(part) + "\n\n"))
|
||||
}
|
||||
buf = ""
|
||||
}
|
||||
|
||||
opts.metric({
|
||||
response_length: len,
|
||||
"timestamp.last_byte": Date.now(),
|
||||
})
|
||||
opts.dump?.flush()
|
||||
|
||||
phase = "tail"
|
||||
const chunk = await opts.tail()
|
||||
if (chunk) c.enqueue(enc.encode(chunk))
|
||||
|
||||
c.close()
|
||||
opts.metric({
|
||||
"stream.event": "finished",
|
||||
"stream.phase": "done",
|
||||
"stream.duration_ms": Date.now() - opts.start,
|
||||
"stream.saw_completed": completed,
|
||||
"stream.events": JSON.stringify(names),
|
||||
...stats(len, cnt, seen, buf, end, gap),
|
||||
})
|
||||
} catch (err) {
|
||||
opts.metric({
|
||||
"stream.event": aborted ? "aborted" : "error",
|
||||
"stream.phase": phase,
|
||||
"stream.duration_ms": Date.now() - opts.start,
|
||||
"stream.saw_completed": completed,
|
||||
"stream.events": JSON.stringify(names),
|
||||
...stats(len, cnt, seen, buf, end, gap),
|
||||
...errInfo(err),
|
||||
})
|
||||
c.error(err)
|
||||
} finally {
|
||||
opts.signal.removeEventListener("abort", abort)
|
||||
}
|
||||
},
|
||||
})
|
||||
139
packages/console/app/test/zenStream.test.ts
Normal file
139
packages/console/app/test/zenStream.test.ts
Normal file
@@ -0,0 +1,139 @@
|
||||
import { describe, expect, test } from "bun:test"
|
||||
import { eventName, relay } from "../src/routes/zen/util/stream"
|
||||
|
||||
const enc = new TextEncoder()
|
||||
|
||||
const read = (stream: ReadableStream<Uint8Array>) => new Response(stream).text()
|
||||
|
||||
const body = (parts: string[]) =>
|
||||
new ReadableStream<Uint8Array>({
|
||||
async start(c) {
|
||||
for (const part of parts) c.enqueue(enc.encode(part))
|
||||
c.close()
|
||||
},
|
||||
})
|
||||
|
||||
describe("zen stream", () => {
|
||||
test("parses known event names", () => {
|
||||
expect(eventName("event: response.created\ndata: {}")).toBe("response.created")
|
||||
expect(eventName('data: {"ok":true}')).toBe("message")
|
||||
expect(eventName("data: [DONE]")).toBe("[DONE]")
|
||||
})
|
||||
|
||||
test("relays split OpenAI responses and logs completion", async () => {
|
||||
const seen: string[] = []
|
||||
const logs: Array<Record<string, unknown>> = []
|
||||
const stream = relay({
|
||||
body: body([
|
||||
"event: response.created\n",
|
||||
'data: {"type":"response.created"}\n\n',
|
||||
"event: response.completed\n",
|
||||
'data: {"response":{"usage":{"input_tokens":1,"output_tokens":2}}}\n\n',
|
||||
]),
|
||||
separator: "\n\n",
|
||||
signal: new AbortController().signal,
|
||||
start: Date.now(),
|
||||
same: true,
|
||||
parse: (part) => {
|
||||
seen.push(part)
|
||||
},
|
||||
convert: (part) => part,
|
||||
tail: async () => undefined,
|
||||
metric: (values) => logs.push(values),
|
||||
})
|
||||
|
||||
const text = await read(stream)
|
||||
expect(text).toContain("response.created")
|
||||
expect(text).toContain("response.completed")
|
||||
expect(seen).toHaveLength(2)
|
||||
expect(logs.at(-1)?.["stream.event"]).toBe("finished")
|
||||
expect(logs.at(-1)?.["stream.saw_completed"]).toBe(true)
|
||||
})
|
||||
|
||||
test("keeps reading when binary decoder needs another chunk", async () => {
|
||||
let calls = 0
|
||||
const logs: Array<Record<string, unknown>> = []
|
||||
const stream = relay({
|
||||
body: body(["a", "b"]),
|
||||
separator: "\n\n",
|
||||
signal: new AbortController().signal,
|
||||
start: Date.now(),
|
||||
same: true,
|
||||
binary: (chunk) => {
|
||||
calls += 1
|
||||
if (calls === 1) return
|
||||
return chunk
|
||||
},
|
||||
parse: () => undefined,
|
||||
convert: (part) => part,
|
||||
tail: async () => undefined,
|
||||
metric: (values) => logs.push(values),
|
||||
})
|
||||
|
||||
const text = await read(stream)
|
||||
expect(text).toBe("b")
|
||||
expect(logs.at(-1)?.["stream.event"]).toBe("finished")
|
||||
})
|
||||
|
||||
test("flushes a final unterminated event at EOF", async () => {
|
||||
const seen: string[] = []
|
||||
const logs: Array<Record<string, unknown>> = []
|
||||
const stream = relay({
|
||||
body: body(['event: response.completed\ndata: {"response":{"usage":{"input_tokens":1}}}']),
|
||||
separator: "\n\n",
|
||||
signal: new AbortController().signal,
|
||||
start: Date.now(),
|
||||
same: true,
|
||||
parse: (part) => {
|
||||
seen.push(part)
|
||||
},
|
||||
convert: (part) => part,
|
||||
tail: async () => undefined,
|
||||
metric: (values) => logs.push(values),
|
||||
})
|
||||
|
||||
const text = await read(stream)
|
||||
expect(text).toContain("response.completed")
|
||||
expect(seen).toHaveLength(1)
|
||||
expect(logs.at(-1)?.["stream.saw_completed"]).toBe(true)
|
||||
})
|
||||
|
||||
test("closes cleanly when upstream body is missing", async () => {
|
||||
const logs: Array<Record<string, unknown>> = []
|
||||
const stream = relay({
|
||||
body: null,
|
||||
separator: "\n\n",
|
||||
signal: new AbortController().signal,
|
||||
start: Date.now(),
|
||||
same: true,
|
||||
parse: () => undefined,
|
||||
convert: (part) => part,
|
||||
tail: async () => undefined,
|
||||
metric: (values) => logs.push(values),
|
||||
})
|
||||
|
||||
expect(await read(stream)).toBe("")
|
||||
expect(logs.at(-1)?.["stream.event"]).toBe("missing_body")
|
||||
})
|
||||
|
||||
test("surfaces postprocess failures with stream metrics", async () => {
|
||||
const logs: Array<Record<string, unknown>> = []
|
||||
const stream = relay({
|
||||
body: body(["event: response.created\ndata: {}\n\n"]),
|
||||
separator: "\n\n",
|
||||
signal: new AbortController().signal,
|
||||
start: Date.now(),
|
||||
same: true,
|
||||
parse: () => undefined,
|
||||
convert: (part) => part,
|
||||
tail: async () => {
|
||||
throw new Error("boom")
|
||||
},
|
||||
metric: (values) => logs.push(values),
|
||||
})
|
||||
|
||||
await expect(read(stream)).rejects.toThrow("boom")
|
||||
expect(logs.at(-1)?.["stream.event"]).toBe("error")
|
||||
expect(logs.at(-1)?.["stream.phase"]).toBe("tail")
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user