import { Buffer } from "node:buffer" import { Effect, Schema, Stream } from "effect" import * as Sse from "effect/unstable/encoding/Sse" import { Headers, HttpClientRequest } from "effect/unstable/http" import { InvalidProviderOutputReason, InvalidRequestReason, LLMError, type ContentPart, type LLMRequest, type MediaPart, type ToolResultPart, } from "../schema" export const Json = Schema.fromJsonString(Schema.Unknown) export const decodeJson = Schema.decodeUnknownSync(Json) export const encodeJson = Schema.encodeSync(Json) export const JsonObject = Schema.Record(Schema.String, Schema.Unknown) export const optionalArray = (schema: S) => Schema.optional(Schema.Array(schema)) export const optionalNull = (schema: S) => Schema.optional(Schema.NullOr(schema)) /** * Plain-record narrowing. Excludes arrays so routes checking nested JSON * Schema fragments don't accidentally treat a tuple as a key/value bag. */ export const isRecord = (value: unknown): value is Record => typeof value === "object" && value !== null && !Array.isArray(value) /** * Streaming tool-call accumulator. Adapters that build a tool call across * multiple `tool-input-delta` chunks store the partial JSON input string here * and finalize it with `parseToolInput` once the call completes. */ export interface ToolAccumulator { readonly id: string readonly name: string readonly input: string } /** * `Usage.totalTokens` policy shared by every route. Honors a provider- * supplied total; otherwise falls back to `inputTokens + outputTokens` only * when at least one is defined. Returns `undefined` when neither input nor * output is known so routes don't publish a misleading `0`. * * Under the additive `LLM.Usage` contract, `inputTokens` and `outputTokens` * are the non-cached input and visible output only. The provider-supplied * `total` is the source of truth when present; the computed fallback * under-counts cache and reasoning by design and exists mainly so * Anthropic-style providers (which don't surface a total) still get a * sensible aggregate on the input + output axes. */ export const totalTokens = ( inputTokens: number | undefined, outputTokens: number | undefined, total: number | undefined, ) => { if (total !== undefined) return total if (inputTokens === undefined && outputTokens === undefined) return undefined return (inputTokens ?? 0) + (outputTokens ?? 0) } /** * Subtract `subtrahend` from `total`, clamping to zero if the provider * reports a non-sensical breakdown (e.g. `cached_tokens > prompt_tokens`). * Used by protocol mappers when deriving a non-overlapping breakdown field * from a provider's inclusive total — `nonCachedInputTokens` from * `inputTokens - cacheReadInputTokens - cacheWriteInputTokens`. * * If `total` is `undefined`, returns `undefined` (we don't fabricate * counts). If `subtrahend` is `undefined`, returns `total` unchanged. The * provider-native breakdown stays available on `Usage.native` for debugging. */ export const subtractTokens = (total: number | undefined, subtrahend: number | undefined): number | undefined => { if (total === undefined) return undefined if (subtrahend === undefined) return total return Math.max(0, total - subtrahend) } /** * Sum a list of optional token counts, returning `undefined` only when * every value is `undefined` (so we don't fabricate a `0`). Used by * protocol mappers to derive the inclusive `inputTokens` total from a * provider that natively reports a non-overlapping breakdown * (e.g. Anthropic, whose `input_tokens` is already non-cached only). */ export const sumTokens = (...values: ReadonlyArray): number | undefined => { if (values.every((value) => value === undefined)) return undefined return values.reduce((acc, value) => acc + (value ?? 0), 0) } export const eventError = (route: string, message: string, raw?: string) => new LLMError({ module: "ProviderShared", method: "stream", reason: new InvalidProviderOutputReason({ route, message, raw }), }) export const parseJson = (route: string, input: string, message: string) => Effect.try({ try: () => decodeJson(input), catch: () => eventError(route, message, input), }) /** * Join the `text` field of a list of parts with newlines. Used by routes * that flatten system / message content arrays into a single provider string * (OpenAI Chat `system` content, OpenAI Responses `system` content, Gemini * `systemInstruction.parts[].text`). */ export const joinText = (parts: ReadonlyArray<{ readonly text: string }>) => parts.map((part) => part.text).join("\n") /** * Parse the streamed JSON input of a tool call. Treats an empty string as * `"{}"` — providers occasionally finish a tool call without ever emitting * input deltas (e.g. zero-arg tools). The error message is uniform across * routes: `Invalid JSON input for tool call `. */ export const parseToolInput = (route: string, name: string, raw: string) => parseJson(route, raw || "{}", `Invalid JSON input for ${route} tool call ${name}`) /** * Encode a `MediaPart`'s raw bytes for inclusion in a JSON request body. * `data: string` is assumed to already be base64 (matches caller convention * across Gemini / Bedrock); `data: Uint8Array` is base64-encoded here. Used * by every route that supports image / document inputs. */ export const mediaBytes = (part: MediaPart) => typeof part.data === "string" ? part.data : Buffer.from(part.data).toString("base64") export const trimBaseUrl = (value: string) => value.replace(/\/+$/, "") export const toolResultText = (part: ToolResultPart) => { if (part.result.type === "text" || part.result.type === "error") return String(part.result.value) return encodeJson(part.result.value) } export const errorText = (error: unknown) => { if (error instanceof Error) return error.message if (typeof error === "string") return error if (typeof error === "number" || typeof error === "boolean" || typeof error === "bigint") return String(error) if (error === null) return "null" if (error === undefined) return "undefined" return "Unknown stream error" } /** * `framing` step for Server-Sent Events. Decodes UTF-8, runs the SSE channel * decoder, and drops empty / `[DONE]` keep-alive events so the downstream * `decodeChunk` sees one JSON string per element. The SSE channel emits a * `Retry` control event on its error channel; we drop it here (we don't * implement client-driven retries) so the public error channel stays * `LLMError`. */ export const sseFraming = (bytes: Stream.Stream): Stream.Stream => bytes.pipe( Stream.decodeText(), Stream.pipeThroughChannel(Sse.decode()), Stream.catchTag("Retry", () => Stream.empty), Stream.filter((event) => event.data.length > 0 && event.data !== "[DONE]"), Stream.map((event) => event.data), ) /** * Canonical invalid-request constructor. Lift one-line `const invalid = * (message) => invalidRequest(message)` aliases out of every * route so the error constructor lives in one place. If we ever extend * `InvalidRequestReason` with route context or trace metadata, the change * lands here. */ export const invalidRequest = (message: string) => new LLMError({ module: "ProviderShared", method: "request", reason: new InvalidRequestReason({ message }), }) export const matchToolChoice = ( route: string, toolChoice: NonNullable, cases: { readonly auto: () => Auto readonly none: () => None readonly required: () => Required readonly tool: (name: string) => Tool }, ) => Effect.gen(function* () { if (toolChoice.type === "auto") return cases.auto() if (toolChoice.type === "none") return cases.none() if (toolChoice.type === "required") return cases.required() if (!toolChoice.name) return yield* invalidRequest(`${route} tool choice requires a tool name`) return cases.tool(toolChoice.name) }) type ContentType = ContentPart["type"] const formatContentTypes = (types: ReadonlyArray) => { if (types.length <= 1) return types[0] ?? "" if (types.length === 2) return `${types[0]} and ${types[1]}` return `${types.slice(0, -1).join(", ")}, and ${types.at(-1)}` } export const supportsContent = ( part: ContentPart, types: ReadonlyArray, ): part is Extract => (types as ReadonlyArray).includes(part.type) export const unsupportedContent = ( route: string, role: LLMRequest["messages"][number]["role"], types: ReadonlyArray, ) => invalidRequest(`${route} ${role} messages only support ${formatContentTypes(types)} content for now`) /** * Build a `validate` step from a Schema decoder. Replaces the per-route * lambda body `(payload) => decode(payload).pipe(Effect.mapError((e) => * invalid(e.message)))`. Any decode error is translated into * `LLMError` carrying the original parse-error message. */ export const validateWith = (decode: (input: I) => Effect.Effect) => (payload: I) => decode(payload).pipe(Effect.mapError((error) => invalidRequest(error.message))) /** * Build an HTTP POST with a JSON body. Sets `content-type: application/json` * automatically after caller-supplied headers so routes cannot accidentally * send JSON with a stale content type. The body is passed pre-encoded so * routes can choose between * `Schema.encodeSync(payload)` and `ProviderShared.encodeJson(payload)`. */ export const jsonPost = (input: { readonly url: string; readonly body: string; readonly headers?: Headers.Input }) => HttpClientRequest.post(input.url).pipe( HttpClientRequest.setHeaders(Headers.set(Headers.fromInput(input.headers), "content-type", "application/json")), HttpClientRequest.bodyText(input.body, "application/json"), ) export * as ProviderShared from "./shared"