Add native LLM core foundation (#24712)

This commit is contained in:
Kit Langton
2026-05-08 16:56:20 -04:00
committed by GitHub
parent dc7d665e94
commit 5bb7b23440
144 changed files with 17052 additions and 2 deletions

View File

@@ -0,0 +1,592 @@
import { Effect, Schema } from "effect"
import { Route } from "../route/client"
import { Auth } from "../route/auth"
import { Endpoint } from "../route/endpoint"
import { Framing } from "../route/framing"
import { Protocol } from "../route/protocol"
import {
Usage,
type CacheHint,
type FinishReason,
type LLMEvent,
type LLMRequest,
type ProviderMetadata,
type ToolCallPart,
type ToolDefinition,
type ToolResultPart,
} from "../schema"
import { JsonObject, optionalArray, optionalNull, ProviderShared } from "./shared"
import { ToolStream } from "./utils/tool-stream"
// Identifier for this adapter; used below as the protocol id, the route id,
// and the adapter label passed to ToolStream helpers.
const ADAPTER = "anthropic-messages"
// Default `baseURL` handed to `Route.model` for the `anthropic` provider.
export const DEFAULT_BASE_URL = "https://api.anthropic.com/v1"
// Request path given to `Endpoint.path` when the route is built.
export const PATH = "/messages"
// =============================================================================
// Request Body Schema
// =============================================================================
// `cache_control` marker shared by the block schemas below; only the
// `ephemeral` type is modelled.
const AnthropicCacheControl = Schema.Struct({ type: Schema.tag("ephemeral") })
// Plain text content block, optionally carrying a cache marker.
const AnthropicTextBlock = Schema.Struct({
type: Schema.tag("text"),
text: Schema.String,
cache_control: Schema.optional(AnthropicCacheControl),
})
type AnthropicTextBlock = Schema.Schema.Type<typeof AnthropicTextBlock>
// Reasoning ("thinking") content block; `signature` is optional.
const AnthropicThinkingBlock = Schema.Struct({
type: Schema.tag("thinking"),
thinking: Schema.String,
signature: Schema.optional(Schema.String),
cache_control: Schema.optional(AnthropicCacheControl),
})
// `tool_use` block: a tool call identified by `id`, with an unvalidated `input`.
const AnthropicToolUseBlock = Schema.Struct({
type: Schema.tag("tool_use"),
id: Schema.String,
name: Schema.String,
input: Schema.Unknown,
cache_control: Schema.optional(AnthropicCacheControl),
})
type AnthropicToolUseBlock = Schema.Schema.Type<typeof AnthropicToolUseBlock>
// `server_tool_use` block: same shape as `tool_use`, distinguished only by its tag.
const AnthropicServerToolUseBlock = Schema.Struct({
type: Schema.tag("server_tool_use"),
id: Schema.String,
name: Schema.String,
input: Schema.Unknown,
cache_control: Schema.optional(AnthropicCacheControl),
})
type AnthropicServerToolUseBlock = Schema.Schema.Type<typeof AnthropicServerToolUseBlock>
// Server tool result blocks: web_search_tool_result, code_execution_tool_result,
// and web_fetch_tool_result. The provider executes the tool and inlines the
// structured result into the assistant turn — there is no client tool_result
// round-trip. We round-trip the structured `content` payload as opaque JSON so
// the next request can echo it back when continuing the conversation.
// Closed set of wire types accepted for server tool result blocks.
const AnthropicServerToolResultType = Schema.Literals([
"web_search_tool_result",
"code_execution_tool_result",
"web_fetch_tool_result",
])
type AnthropicServerToolResultType = Schema.Schema.Type<typeof AnthropicServerToolResultType>
// Server tool result block: `tool_use_id` links it to the originating call and
// `content` is kept as an unvalidated payload.
const AnthropicServerToolResultBlock = Schema.Struct({
type: AnthropicServerToolResultType,
tool_use_id: Schema.String,
content: Schema.Unknown,
cache_control: Schema.optional(AnthropicCacheControl),
})
type AnthropicServerToolResultBlock = Schema.Schema.Type<typeof AnthropicServerToolResultBlock>
// Client tool result block: string `content`, with an optional `is_error` flag.
const AnthropicToolResultBlock = Schema.Struct({
type: Schema.tag("tool_result"),
tool_use_id: Schema.String,
content: Schema.String,
is_error: Schema.optional(Schema.Boolean),
cache_control: Schema.optional(AnthropicCacheControl),
})
// Blocks allowed inside a `user` message: text and client tool results.
const AnthropicUserBlock = Schema.Union([AnthropicTextBlock, AnthropicToolResultBlock])
// Blocks allowed inside an `assistant` message.
const AnthropicAssistantBlock = Schema.Union([
AnthropicTextBlock,
AnthropicThinkingBlock,
AnthropicToolUseBlock,
AnthropicServerToolUseBlock,
AnthropicServerToolResultBlock,
])
type AnthropicAssistantBlock = Schema.Schema.Type<typeof AnthropicAssistantBlock>
type AnthropicToolResultBlock = Schema.Schema.Type<typeof AnthropicToolResultBlock>
// A wire message is either a `user` or an `assistant` message, discriminated by `role`.
const AnthropicMessage = Schema.Union([
Schema.Struct({ role: Schema.Literal("user"), content: Schema.Array(AnthropicUserBlock) }),
Schema.Struct({ role: Schema.Literal("assistant"), content: Schema.Array(AnthropicAssistantBlock) }),
]).pipe(Schema.toTaggedUnion("role"))
type AnthropicMessage = Schema.Schema.Type<typeof AnthropicMessage>
// Tool definition as sent on the wire; `input_schema` is a JSON object.
const AnthropicTool = Schema.Struct({
name: Schema.String,
description: Schema.String,
input_schema: JsonObject,
cache_control: Schema.optional(AnthropicCacheControl),
})
type AnthropicTool = Schema.Schema.Type<typeof AnthropicTool>
// Tool choice: `auto` / `any`, or a specific tool selected by name.
const AnthropicToolChoice = Schema.Union([
Schema.Struct({ type: Schema.Literals(["auto", "any"]) }),
Schema.Struct({ type: Schema.tag("tool"), name: Schema.String }),
])
// Thinking configuration; only the `enabled` form with a token budget is modelled.
const AnthropicThinking = Schema.Struct({
type: Schema.tag("enabled"),
budget_tokens: Schema.Number,
})
// Fields of the request body. `stream` is fixed to the literal `true`, and
// `model`, `messages`, and `max_tokens` are the only other required fields.
const AnthropicBodyFields = {
model: Schema.String,
system: optionalArray(AnthropicTextBlock),
messages: Schema.Array(AnthropicMessage),
tools: optionalArray(AnthropicTool),
tool_choice: Schema.optional(AnthropicToolChoice),
stream: Schema.Literal(true),
max_tokens: Schema.Number,
temperature: Schema.optional(Schema.Number),
top_p: Schema.optional(Schema.Number),
top_k: Schema.optional(Schema.Number),
stop_sequences: optionalArray(Schema.String),
thinking: Schema.optional(AnthropicThinking),
}
// Body schema registered on the protocol below; the exported type mirrors it.
const AnthropicMessagesBody = Schema.Struct(AnthropicBodyFields)
export type AnthropicMessagesBody = Schema.Schema.Type<typeof AnthropicMessagesBody>
// Usage counters as they appear in stream events. Every field is optional;
// the two cache counters are declared with `optionalNull` rather than `Schema.optional`.
const AnthropicUsage = Schema.Struct({
input_tokens: Schema.optional(Schema.Number),
output_tokens: Schema.optional(Schema.Number),
cache_creation_input_tokens: optionalNull(Schema.Number),
cache_read_input_tokens: optionalNull(Schema.Number),
})
type AnthropicUsage = Schema.Schema.Type<typeof AnthropicUsage>
// Loose shape of the `content_block` carried by a stream event: `type` is a
// free-form string and every other field is optional, so one schema covers
// text, thinking, tool-use, and server tool result blocks.
const AnthropicStreamBlock = Schema.Struct({
type: Schema.String,
id: Schema.optional(Schema.String),
name: Schema.optional(Schema.String),
text: Schema.optional(Schema.String),
thinking: Schema.optional(Schema.String),
signature: Schema.optional(Schema.String),
input: Schema.optional(Schema.Unknown),
// *_tool_result blocks arrive whole as content_block_start (no streaming
// delta) with the structured payload in `content` and the originating
// server_tool_use id in `tool_use_id`.
tool_use_id: Schema.optional(Schema.String),
content: Schema.optional(Schema.Unknown),
})
// Loose shape of the `delta` carried by a stream event; the same struct holds
// both content deltas (text / thinking / partial_json / signature) and the
// stop fields read by the message-delta handler.
const AnthropicStreamDelta = Schema.Struct({
type: Schema.optional(Schema.String),
text: Schema.optional(Schema.String),
thinking: Schema.optional(Schema.String),
partial_json: Schema.optional(Schema.String),
signature: Schema.optional(Schema.String),
stop_reason: optionalNull(Schema.String),
stop_sequence: optionalNull(Schema.String),
})
// One decoded stream event. Only `type` is required; the handlers further down
// pick the fields relevant to each event type.
const AnthropicEvent = Schema.Struct({
type: Schema.String,
index: Schema.optional(Schema.Number),
message: Schema.optional(Schema.Struct({ usage: Schema.optional(AnthropicUsage) })),
content_block: Schema.optional(AnthropicStreamBlock),
delta: Schema.optional(AnthropicStreamDelta),
usage: Schema.optional(AnthropicUsage),
error: Schema.optional(Schema.Struct({ type: Schema.String, message: Schema.String })),
})
type AnthropicEvent = Schema.Schema.Type<typeof AnthropicEvent>
// State threaded through the stream parser.
interface ParserState {
// Tool-call accumulation state, keyed by the event `index` (a number).
readonly tools: ToolStream.State<number>
// Usage merged from the events seen so far, if any reported usage.
readonly usage?: Usage
}
// Local shorthand for the shared invalid-request failure constructor.
const invalid = ProviderShared.invalidRequest
// =============================================================================
// Request Lowering
// =============================================================================
// Translates a request-level cache hint into the wire `cache_control` marker.
// Only `ephemeral` hints produce a marker; any other hint, or none at all,
// yields `undefined`.
const cacheControl = (cache: CacheHint | undefined) => {
  if (cache?.type !== "ephemeral") return undefined
  return { type: "ephemeral" as const }
}
// Wraps adapter-specific metadata under the `anthropic` key of a provider
// metadata record.
const anthropicMetadata = (metadata: Record<string, unknown>): ProviderMetadata => {
  return { anthropic: metadata }
}
// Reads a thinking signature back out of provider metadata. Returns the
// `anthropic.signature` value when the `anthropic` entry is a record and the
// signature is a string; otherwise `undefined`.
const signatureFromMetadata = (metadata: ProviderMetadata | undefined): string | undefined => {
  const scoped = metadata?.anthropic
  if (ProviderShared.isRecord(scoped) && typeof scoped.signature === "string") {
    return scoped.signature
  }
  return undefined
}
// Converts a tool definition to the wire shape: `inputSchema` is renamed to
// `input_schema`, and only name, description, and schema are carried over.
const lowerTool = (tool: ToolDefinition): AnthropicTool => {
  const { name, description, inputSchema } = tool
  return { name, description, input_schema: inputSchema }
}
// Maps the request's tool choice onto the wire form through the shared
// matcher: `auto` stays `auto`, `required` becomes `any`, a named tool becomes
// `{ type: "tool", name }`, and `none` produces no `tool_choice` at all.
const lowerToolChoice = (toolChoice: NonNullable<LLMRequest["toolChoice"]>) => {
  return ProviderShared.matchToolChoice("Anthropic Messages", toolChoice, {
    auto: () => {
      return { type: "auto" as const }
    },
    none: () => {
      return undefined
    },
    required: () => {
      return { type: "any" as const }
    },
    tool: (name) => {
      return { type: "tool" as const, name }
    },
  })
}
// Converts a tool-call part into a `tool_use` block, carrying over only its
// id, name, and input.
const lowerToolCall = (part: ToolCallPart): AnthropicToolUseBlock => {
  const { id, name, input } = part
  return { type: "tool_use", id, name, input }
}
// Converts a provider-executed tool-call part into a `server_tool_use` block,
// carrying over only its id, name, and input.
const lowerServerToolCall = (part: ToolCallPart): AnthropicServerToolUseBlock => {
  const { id, name, input } = part
  return { type: "server_tool_use", id, name, input }
}
// The wire type of a server tool result block is derived from the tool's name.
// Three server tools are known today; add a case here when a new one lands.
// Unknown names map to `undefined` so the caller can reject them.
const serverToolResultType = (name: string): AnthropicServerToolResultType | undefined => {
  switch (name) {
    case "web_search":
      return "web_search_tool_result"
    case "code_execution":
      return "code_execution_tool_result"
    case "web_fetch":
      return "web_fetch_tool_result"
    default:
      return undefined
  }
}
// Lowers a provider-executed tool result into its wire block. The block type
// is looked up from the tool name; a name with no known result type fails with
// an invalid-request error. The result payload is passed through unchanged.
const lowerServerToolResult = Effect.fn("AnthropicMessages.lowerServerToolResult")(function* (part: ToolResultPart) {
  const resultType = serverToolResultType(part.name)
  if (resultType === undefined) {
    return yield* invalid(`Anthropic Messages does not know how to round-trip server tool result for ${part.name}`)
  }
  return {
    type: resultType,
    tool_use_id: part.id,
    content: part.result.value,
  } satisfies AnthropicServerToolResultBlock
})
// Converts the request's messages into Anthropic wire messages.
// - `user` messages may contain only text parts.
// - `assistant` messages may contain text, reasoning, tool-call, and
//   provider-executed tool-result parts.
// - Messages of any other role are lowered to a `user` message made of
//   `tool_result` blocks, and may contain only tool-result parts.
// Fails via `ProviderShared.unsupportedContent` or `invalid` on the first part
// that falls outside those sets.
const lowerMessages = Effect.fn("AnthropicMessages.lowerMessages")(function* (request: LLMRequest) {
const messages: AnthropicMessage[] = []
for (const message of request.messages) {
if (message.role === "user") {
const content: AnthropicTextBlock[] = []
for (const part of message.content) {
if (!ProviderShared.supportsContent(part, ["text"]))
return yield* ProviderShared.unsupportedContent("Anthropic Messages", "user", ["text"])
content.push({ type: "text", text: part.text, cache_control: cacheControl(part.cache) })
}
messages.push({ role: "user", content })
continue
}
if (message.role === "assistant") {
const content: AnthropicAssistantBlock[] = []
for (const part of message.content) {
if (part.type === "text") {
content.push({ type: "text", text: part.text, cache_control: cacheControl(part.cache) })
continue
}
if (part.type === "reasoning") {
content.push({
type: "thinking",
thinking: part.text,
// Prefer the part's own `encrypted` value; otherwise fall back to a
// signature stored under the anthropic provider metadata.
signature: part.encrypted ?? signatureFromMetadata(part.providerMetadata),
})
continue
}
if (part.type === "tool-call") {
// Provider-executed calls become `server_tool_use`; all others `tool_use`.
content.push(part.providerExecuted ? lowerServerToolCall(part) : lowerToolCall(part))
continue
}
// Only provider-executed tool results are accepted inside an assistant turn.
if (part.type === "tool-result" && part.providerExecuted) {
content.push(yield* lowerServerToolResult(part))
continue
}
return yield* invalid(
`Anthropic Messages assistant messages only support text, reasoning, and tool-call content for now`,
)
}
messages.push({ role: "assistant", content })
continue
}
// Remaining roles: each tool-result part becomes a `tool_result` block, and
// the blocks are sent as a `user` message.
const content: AnthropicToolResultBlock[] = []
for (const part of message.content) {
if (!ProviderShared.supportsContent(part, ["tool-result"]))
return yield* ProviderShared.unsupportedContent("Anthropic Messages", "tool", ["tool-result"])
content.push({
type: "tool_result",
tool_use_id: part.id,
content: ProviderShared.toolResultText(part),
// Set to `true` only for error results; otherwise left undefined.
is_error: part.result.type === "error" ? true : undefined,
})
}
messages.push({ role: "user", content })
}
return messages
})
// Returns the `anthropic` entry of the request's provider options, or
// `undefined` when no provider options are set.
const anthropicOptions = (request: LLMRequest) => {
  const providerOptions = request.providerOptions
  return providerOptions?.anthropic
}
// Builds the wire `thinking` config from the anthropic provider options.
// Returns `undefined` unless the option is a record with `type: "enabled"`.
// The budget is read from `budgetTokens` first, then `budget_tokens`; if
// neither is a number the request is rejected as invalid.
const lowerThinking = Effect.fn("AnthropicMessages.lowerThinking")(function* (request: LLMRequest) {
  const config = anthropicOptions(request)?.thinking
  if (!ProviderShared.isRecord(config)) return undefined
  if (config.type !== "enabled") return undefined

  let budget: number | undefined
  if (typeof config.budgetTokens === "number") {
    budget = config.budgetTokens
  } else if (typeof config.budget_tokens === "number") {
    budget = config.budget_tokens
  }

  if (budget === undefined) return yield* invalid("Anthropic thinking provider option requires budgetTokens")
  return { type: "enabled" as const, budget_tokens: budget }
})
// Builds the Anthropic Messages request body from an `LLMRequest`.
//
// - `system` and `tools` are omitted (undefined) rather than sent as empty arrays.
// - `tools` is also omitted when the tool choice is `none`.
// - `tool_choice` is only sent together with `tools`. The Messages API rejects
//   a `tool_choice` that arrives without tools, so a request that sets a tool
//   choice but defines no tools must not forward it. The tool choice is still
//   lowered first so its own validation errors surface as before.
// - `max_tokens` falls back to the model's output limit, then to 4096.
//
// Fails with whatever `lowerToolChoice`, `lowerMessages`, or `lowerThinking` fail with.
const fromRequest = Effect.fn("AnthropicMessages.fromRequest")(function* (request: LLMRequest) {
  const toolChoice = request.toolChoice ? yield* lowerToolChoice(request.toolChoice) : undefined
  const generation = request.generation
  const tools =
    request.tools.length === 0 || request.toolChoice?.type === "none" ? undefined : request.tools.map(lowerTool)
  return {
    model: request.model.id,
    system:
      request.system.length === 0
        ? undefined
        : request.system.map((part) => ({
            type: "text" as const,
            text: part.text,
            cache_control: cacheControl(part.cache),
          })),
    messages: yield* lowerMessages(request),
    tools,
    // Never send a tool choice without the tools it refers to.
    tool_choice: tools === undefined ? undefined : toolChoice,
    stream: true as const,
    max_tokens: generation?.maxTokens ?? request.model.limits.output ?? 4096,
    temperature: generation?.temperature,
    top_p: generation?.topP,
    top_k: generation?.topK,
    stop_sequences: generation?.stop,
    thinking: yield* lowerThinking(request),
  }
})
// =============================================================================
// Stream Parsing
// =============================================================================
// Maps an Anthropic `stop_reason` onto the shared finish reason. Missing or
// unrecognised reasons map to "unknown".
const mapFinishReason = (reason: string | null | undefined): FinishReason => {
  switch (reason) {
    case "end_turn":
    case "stop_sequence":
    case "pause_turn":
      return "stop"
    case "max_tokens":
      return "length"
    case "tool_use":
      return "tool-calls"
    case "refusal":
      return "content-filter"
    default:
      return "unknown"
  }
}
// Converts wire usage counters into a `Usage` value, or returns `undefined`
// when no usage was reported. Null cache counters are normalised to
// `undefined`, the total is computed from input and output tokens only, and
// the untouched wire object is kept under `native`.
const mapUsage = (usage: AnthropicUsage | undefined): Usage | undefined => {
  if (usage) {
    const { input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens } = usage
    return new Usage({
      inputTokens: input_tokens,
      outputTokens: output_tokens,
      cacheReadInputTokens: cache_read_input_tokens ?? undefined,
      cacheWriteInputTokens: cache_creation_input_tokens ?? undefined,
      totalTokens: ProviderShared.totalTokens(input_tokens, output_tokens, undefined),
      native: usage,
    })
  }
  return undefined
}
// Anthropic reports usage twice: once on `message_start` and again on
// `message_delta`, with the final delta carrying the authoritative totals.
// The merge is right-biased per field: take `right` when it is defined,
// otherwise keep `left`. `totalTokens` is recomputed from the merged
// input/output counts so it stays consistent with them.
const mergeUsage = (left: Usage | undefined, right: Usage | undefined) => {
  // With at most one side present there is nothing to merge.
  if (!left || !right) return left ? left : right

  const merged = {
    inputTokens: right.inputTokens ?? left.inputTokens,
    outputTokens: right.outputTokens ?? left.outputTokens,
    cacheReadInputTokens: right.cacheReadInputTokens ?? left.cacheReadInputTokens,
    cacheWriteInputTokens: right.cacheWriteInputTokens ?? left.cacheWriteInputTokens,
  }
  return new Usage({
    ...merged,
    totalTokens: ProviderShared.totalTokens(merged.inputTokens, merged.outputTokens, undefined),
    native: { ...left.native, ...right.native },
  })
}
// Server tool result blocks come whole in `content_block_start` (no streaming
// delta sequence). We convert the payload to a `tool-result` event with
// `providerExecuted: true`. The runtime appends it to the assistant message
// for round-trip; downstream consumers can inspect `result.value` for the
// structured payload.
// Reverse lookup from the wire type of a server tool result block to the tool
// name reported on the emitted `tool-result` event.
const SERVER_TOOL_RESULT_NAMES: Record<AnthropicServerToolResultType, string> = {
  web_search_tool_result: "web_search",
  code_execution_tool_result: "code_execution",
  web_fetch_tool_result: "web_fetch",
}
// Type guard: true only when `type` is one of the known server tool result
// wire types. An own-property check is used instead of the `in` operator
// because `in` also walks the prototype chain: block types such as
// "constructor" or "toString" would be accepted, and the name lookup would
// then return a function instead of a tool name.
const isServerToolResultType = (type: string): type is AnthropicServerToolResultType =>
  Object.prototype.hasOwnProperty.call(SERVER_TOOL_RESULT_NAMES, type)
// Turns a server tool result content block into a provider-executed
// `tool-result` event, or returns `undefined` for any other block type.
// The result is flagged as an error when the payload is an object whose own
// `type` ends with "_tool_result_error"; otherwise it is reported as JSON.
// A missing `tool_use_id` becomes an empty id.
const serverToolResultEvent = (block: NonNullable<AnthropicEvent["content_block"]>): LLMEvent | undefined => {
  const blockType = block.type
  if (!blockType || !isServerToolResultType(blockType)) return undefined

  const payload = block.content
  let payloadType = ""
  if (typeof payload === "object" && payload !== null && "type" in payload) {
    payloadType = String((payload as Record<string, unknown>).type)
  }
  const failed = payloadType.endsWith("_tool_result_error")

  return {
    type: "tool-result",
    id: block.tool_use_id ?? "",
    name: SERVER_TOOL_RESULT_NAMES[blockType],
    result: failed ? { type: "error", value: payload } : { type: "json", value: payload },
    providerExecuted: true,
    providerMetadata: anthropicMetadata({ blockType }),
  }
}
// Result of one parser step: the next parser state plus the events to emit.
type StepResult = readonly [ParserState, ReadonlyArray<LLMEvent>]
// Shared empty event list returned by steps that emit nothing.
const NO_EVENTS: StepResult["1"] = []
// Handles `message_start`: folds any usage reported on the message into the
// parser state. Never emits events.
const onMessageStart = (state: ParserState, event: AnthropicEvent): StepResult => {
  const reported = mapUsage(event.message?.usage)
  if (!reported) return [state, NO_EVENTS]
  return [{ ...state, usage: mergeUsage(state.usage, reported) }, NO_EVENTS]
}
// Handles `content_block_start`.
// - `tool_use` / `server_tool_use` blocks with an index open a tool stream
//   entry and emit nothing yet.
// - `text` and `thinking` blocks that already carry content emit it as a delta.
// - Anything else is offered to `serverToolResultEvent`, which yields an event
//   only for server tool result blocks.
const onContentBlockStart = (state: ParserState, event: AnthropicEvent): StepResult => {
  const block = event.content_block
  if (!block) return [state, NO_EVENTS]

  const index = event.index
  const startsServerTool = block.type === "server_tool_use"
  if ((block.type === "tool_use" || startsServerTool) && index !== undefined) {
    const tools = ToolStream.start(state.tools, index, {
      // Fall back to the block index when the provider sent no id.
      id: block.id ?? String(index),
      name: block.name ?? "",
      providerExecuted: startsServerTool,
    })
    return [{ ...state, tools }, NO_EVENTS]
  }

  if (block.type === "text" && block.text) {
    return [state, [{ type: "text-delta", text: block.text }]]
  }

  if (block.type === "thinking" && block.thinking) {
    const signature = block.signature
    return [
      state,
      [
        // Provider metadata is attached only when a signature is present.
        signature
          ? { type: "reasoning-delta", text: block.thinking, providerMetadata: anthropicMetadata({ signature }) }
          : { type: "reasoning-delta", text: block.thinking },
      ],
    ]
  }

  const serverResult = serverToolResultEvent(block)
  return [state, serverResult ? [serverResult] : NO_EVENTS]
}
// Handles `content_block_delta`, dispatching on the delta's `type`:
// text and thinking deltas are emitted directly, a signature delta is emitted
// as an empty reasoning delta carrying the signature in provider metadata, and
// tool-argument JSON fragments are appended to the tool stream entry for the
// event's index. Deltas of any other type, or with empty content, emit nothing.
const onContentBlockDelta = Effect.fn("AnthropicMessages.onContentBlockDelta")(function* (
state: ParserState,
event: AnthropicEvent,
) {
const delta = event.delta
if (delta?.type === "text_delta" && delta.text) {
return [state, [{ type: "text-delta", text: delta.text }]] satisfies StepResult
}
if (delta?.type === "thinking_delta" && delta.thinking) {
return [state, [{ type: "reasoning-delta", text: delta.thinking }]] satisfies StepResult
}
if (delta?.type === "signature_delta" && delta.signature) {
return [
state,
[{ type: "reasoning-delta", text: "", providerMetadata: anthropicMetadata({ signature: delta.signature }) }],
] satisfies StepResult
}
if (delta?.type === "input_json_delta" && event.index !== undefined) {
// An empty fragment changes nothing, so skip the tool stream entirely.
if (!delta.partial_json) return [state, NO_EVENTS] satisfies StepResult
const result = ToolStream.appendExisting(
ADAPTER,
state.tools,
event.index,
delta.partial_json,
"Anthropic Messages tool argument delta is missing its tool call",
)
// `appendExisting` reports failure as a value; yielding it fails this step.
if (ToolStream.isError(result)) return yield* result
return [{ ...state, tools: result.tools }, result.event ? [result.event] : NO_EVENTS] satisfies StepResult
}
return [state, NO_EVENTS] satisfies StepResult
})
// Handles `content_block_stop`: finishes the tool stream entry for the event's
// index and forwards the event that produces, if any. Events without an index
// are ignored.
const onContentBlockStop = Effect.fn("AnthropicMessages.onContentBlockStop")(function* (
  state: ParserState,
  event: AnthropicEvent,
) {
  const index = event.index
  if (index === undefined) {
    return [state, NO_EVENTS] satisfies StepResult
  }
  const finished = yield* ToolStream.finish(ADAPTER, state.tools, index)
  return [
    { ...state, tools: finished.tools },
    finished.event ? [finished.event] : NO_EVENTS,
  ] satisfies StepResult
})
// Handles `message_delta`: merges the reported usage into the state and emits
// a `request-finish` event with the mapped stop reason. When the delta names
// the stop sequence that was hit, it is attached as provider metadata.
const onMessageDelta = (state: ParserState, event: AnthropicEvent): StepResult => {
  const usage = mergeUsage(state.usage, mapUsage(event.usage))
  const stopSequence = event.delta?.stop_sequence
  const finish: LLMEvent = {
    type: "request-finish",
    reason: mapFinishReason(event.delta?.stop_reason),
    usage,
    ...(stopSequence ? { providerMetadata: anthropicMetadata({ stopSequence }) } : {}),
  }
  return [{ ...state, usage }, [finish]]
}
// Handles `error` events: leaves the state untouched and emits a
// `provider-error` with the provider's message, or a generic message when the
// event carries none.
const onError = (state: ParserState, event: AnthropicEvent): StepResult => {
  const message = event.error?.message ?? "Anthropic Messages stream error"
  return [state, [{ type: "provider-error", message }]]
}
// Stream state machine entry point: routes one decoded event to its handler by
// event type. Synchronous handlers are lifted with `Effect.succeed`; event
// types without a handler leave the state unchanged and emit nothing.
const step = (state: ParserState, event: AnthropicEvent) => {
  switch (event.type) {
    case "message_start":
      return Effect.succeed(onMessageStart(state, event))
    case "content_block_start":
      return Effect.succeed(onContentBlockStart(state, event))
    case "content_block_delta":
      return onContentBlockDelta(state, event)
    case "content_block_stop":
      return onContentBlockStop(state, event)
    case "message_delta":
      return Effect.succeed(onMessageDelta(state, event))
    case "error":
      return Effect.succeed(onError(state, event))
    default:
      return Effect.succeed<StepResult>([state, NO_EVENTS])
  }
}
// =============================================================================
// Protocol And Anthropic Route
// =============================================================================
/**
 * The Anthropic Messages protocol — request body construction, body schema,
 * and the streaming-event state machine. Used by native Anthropic Cloud and
 * (once registered) Vertex Anthropic / Bedrock-hosted Anthropic passthrough.
 */
export const protocol = Protocol.make({
id: ADAPTER,
body: {
// Schema of the request body and the function that builds it from an LLMRequest.
schema: AnthropicMessagesBody,
from: fromRequest,
},
stream: {
// Each stream frame is decoded as JSON against `AnthropicEvent`.
event: Protocol.jsonEvent(AnthropicEvent),
// Parsing starts with an empty tool stream and no usage recorded.
initial: () => ({ tools: ToolStream.empty<number>() }),
step,
},
})
// Route for the Anthropic Messages protocol: requests go to `PATH`, the API key
// is sent in the `x-api-key` header, responses are framed as SSE, and every
// request carries a fixed `anthropic-version` header.
export const route = Route.make({
id: ADAPTER,
protocol,
endpoint: Endpoint.path(PATH),
auth: Auth.apiKeyHeader("x-api-key"),
framing: Framing.sse,
headers: () => ({ "anthropic-version": "2023-06-01" }),
})
// =============================================================================
// Model Helper
// =============================================================================
// Model helper bound to the route above, with provider `anthropic` and the
// default base URL.
export const model = Route.model(route, {
provider: "anthropic",
baseURL: DEFAULT_BASE_URL,
})
// Re-exports this module's own exports under the `AnthropicMessages` namespace.
export * as AnthropicMessages from "./anthropic-messages"

View File

@@ -0,0 +1,531 @@
import { Effect, Schema } from "effect"
import { Route, type RouteModelInput } from "../route/client"
import { Endpoint } from "../route/endpoint"
import { Protocol } from "../route/protocol"
import {
Usage,
type CacheHint,
type FinishReason,
type LLMEvent,
type LLMRequest,
type ToolCallPart,
type ToolDefinition,
type ToolResultPart,
} from "../schema"
import { BedrockEventStream } from "./bedrock-event-stream"
import { JsonObject, optionalArray, ProviderShared } from "./shared"
import { BedrockAuth, type Credentials as BedrockCredentials } from "./utils/bedrock-auth"
import { BedrockCache } from "./utils/bedrock-cache"
import { BedrockMedia } from "./utils/bedrock-media"
import { ToolStream } from "./utils/tool-stream"
// Identifier for the Bedrock Converse adapter.
const ADAPTER = "bedrock-converse"
// Re-export the credentials type under a Bedrock-specific name.
export type { Credentials as BedrockCredentials } from "./utils/bedrock-auth"
// =============================================================================
// Public Model Input
// =============================================================================
// Model input for Bedrock Converse: the common route model input plus the
// Bedrock-specific authentication options and extra headers.
export type BedrockConverseModelInput = RouteModelInput & {
/**
 * Bearer API key (Bedrock's newer API key auth). Sets the `Authorization`
 * header and bypasses SigV4 signing. Mutually exclusive with `credentials`.
 */
readonly apiKey?: string
/**
 * AWS credentials for SigV4 signing. The route signs each request at
 * `toHttp` time using `aws4fetch`. Mutually exclusive with `apiKey`.
 */
readonly credentials?: BedrockCredentials
// Optional extra headers. NOTE(review): how these are merged into the outgoing
// request is not visible in this part of the file.
readonly headers?: Record<string, string>
}
// =============================================================================
// Request Body Schema
// =============================================================================
// Plain text content block.
const BedrockTextBlock = Schema.Struct({
text: Schema.String,
})
type BedrockTextBlock = Schema.Schema.Type<typeof BedrockTextBlock>
// Tool call block; the call is nested under the `toolUse` key.
const BedrockToolUseBlock = Schema.Struct({
toolUse: Schema.Struct({
toolUseId: Schema.String,
name: Schema.String,
input: Schema.Unknown,
}),
})
type BedrockToolUseBlock = Schema.Schema.Type<typeof BedrockToolUseBlock>
// One item of a tool result's content: either text or an unvalidated JSON value.
const BedrockToolResultContentItem = Schema.Union([
Schema.Struct({ text: Schema.String }),
Schema.Struct({ json: Schema.Unknown }),
])
// Tool result block; `status` is optional and limited to success / error.
const BedrockToolResultBlock = Schema.Struct({
toolResult: Schema.Struct({
toolUseId: Schema.String,
content: Schema.Array(BedrockToolResultContentItem),
status: Schema.optional(Schema.Literals(["success", "error"])),
}),
})
type BedrockToolResultBlock = Schema.Schema.Type<typeof BedrockToolResultBlock>
// Reasoning block; the text (with an optional signature) is nested under
// `reasoningContent.reasoningText`, which is itself optional.
const BedrockReasoningBlock = Schema.Struct({
reasoningContent: Schema.Struct({
reasoningText: Schema.optional(
Schema.Struct({
text: Schema.String,
signature: Schema.optional(Schema.String),
}),
),
}),
})
// Blocks allowed inside a `user` message.
const BedrockUserBlock = Schema.Union([
BedrockTextBlock,
BedrockMedia.ImageBlock,
BedrockMedia.DocumentBlock,
BedrockToolResultBlock,
BedrockCache.CachePointBlock,
])
type BedrockUserBlock = Schema.Schema.Type<typeof BedrockUserBlock>
// Blocks allowed inside an `assistant` message.
const BedrockAssistantBlock = Schema.Union([
BedrockTextBlock,
BedrockReasoningBlock,
BedrockToolUseBlock,
BedrockCache.CachePointBlock,
])
type BedrockAssistantBlock = Schema.Schema.Type<typeof BedrockAssistantBlock>
// A wire message is either a `user` or an `assistant` message, discriminated by `role`.
const BedrockMessage = Schema.Union([
Schema.Struct({ role: Schema.Literal("user"), content: Schema.Array(BedrockUserBlock) }),
Schema.Struct({ role: Schema.Literal("assistant"), content: Schema.Array(BedrockAssistantBlock) }),
]).pipe(Schema.toTaggedUnion("role"))
type BedrockMessage = Schema.Schema.Type<typeof BedrockMessage>
// System prompt entries: text blocks and cache-point markers.
const BedrockSystemBlock = Schema.Union([BedrockTextBlock, BedrockCache.CachePointBlock])
type BedrockSystemBlock = Schema.Schema.Type<typeof BedrockSystemBlock>
// Tool definition; the JSON schema is nested under `toolSpec.inputSchema.json`.
const BedrockTool = Schema.Struct({
toolSpec: Schema.Struct({
name: Schema.String,
description: Schema.String,
inputSchema: Schema.Struct({
json: JsonObject,
}),
}),
})
type BedrockTool = Schema.Schema.Type<typeof BedrockTool>
// Tool choice is encoded by which single key is present: `auto`, `any`, or `tool`.
const BedrockToolChoice = Schema.Union([
Schema.Struct({ auto: Schema.Struct({}) }),
Schema.Struct({ any: Schema.Struct({}) }),
Schema.Struct({ tool: Schema.Struct({ name: Schema.String }) }),
])
// Fields of the Converse request body. Only `modelId` and `messages` are
// required; inference settings and tool configuration are optional groups.
const BedrockBodyFields = {
modelId: Schema.String,
messages: Schema.Array(BedrockMessage),
system: optionalArray(BedrockSystemBlock),
inferenceConfig: Schema.optional(
Schema.Struct({
maxTokens: Schema.optional(Schema.Number),
temperature: Schema.optional(Schema.Number),
topP: Schema.optional(Schema.Number),
stopSequences: optionalArray(Schema.String),
}),
),
// When present, `tools` is required and `toolChoice` is optional.
toolConfig: Schema.optional(
Schema.Struct({
tools: Schema.Array(BedrockTool),
toolChoice: Schema.optional(BedrockToolChoice),
}),
),
additionalModelRequestFields: Schema.optional(JsonObject),
}
// Body schema and the exported type derived from it.
const BedrockConverseBody = Schema.Struct(BedrockBodyFields)
export type BedrockConverseBody = Schema.Schema.Type<typeof BedrockConverseBody>
// Usage counters as reported in stream metadata; every field is optional.
const BedrockUsageSchema = Schema.Struct({
inputTokens: Schema.optional(Schema.Number),
outputTokens: Schema.optional(Schema.Number),
totalTokens: Schema.optional(Schema.Number),
cacheReadInputTokens: Schema.optional(Schema.Number),
cacheWriteInputTokens: Schema.optional(Schema.Number),
})
type BedrockUsageSchema = Schema.Schema.Type<typeof BedrockUsageSchema>
// Streaming event shape — the AWS event stream wraps each JSON payload by its
// `:event-type` header (e.g. `messageStart`, `contentBlockDelta`). We
// reconstruct that wrapping in `decodeFrames` below so the event schema can
// stay a plain discriminated record.
// One decoded stream event, modelled as a record in which every key is
// optional; each key corresponds to one kind of event payload.
const BedrockEvent = Schema.Struct({
messageStart: Schema.optional(Schema.Struct({ role: Schema.String })),
// Start of a content block; `start.toolUse` is present when the block is a tool call.
contentBlockStart: Schema.optional(
Schema.Struct({
contentBlockIndex: Schema.Number,
start: Schema.optional(
Schema.Struct({
toolUse: Schema.optional(Schema.Struct({ toolUseId: Schema.String, name: Schema.String })),
}),
),
}),
),
// Incremental content: text, a tool-input string fragment, or reasoning content.
contentBlockDelta: Schema.optional(
Schema.Struct({
contentBlockIndex: Schema.Number,
delta: Schema.optional(
Schema.Struct({
text: Schema.optional(Schema.String),
toolUse: Schema.optional(Schema.Struct({ input: Schema.String })),
reasoningContent: Schema.optional(
Schema.Struct({
text: Schema.optional(Schema.String),
signature: Schema.optional(Schema.String),
}),
),
}),
),
}),
),
contentBlockStop: Schema.optional(Schema.Struct({ contentBlockIndex: Schema.Number })),
// End of the message, carrying the stop reason.
messageStop: Schema.optional(
Schema.Struct({
stopReason: Schema.String,
additionalModelResponseFields: Schema.optional(Schema.Unknown),
}),
),
// Trailing metadata, carrying usage and unvalidated metrics.
metadata: Schema.optional(
Schema.Struct({
usage: Schema.optional(BedrockUsageSchema),
metrics: Schema.optional(Schema.Unknown),
}),
),
// Exception payloads; each carries only a `message`.
internalServerException: Schema.optional(Schema.Struct({ message: Schema.String })),
modelStreamErrorException: Schema.optional(Schema.Struct({ message: Schema.String })),
validationException: Schema.optional(Schema.Struct({ message: Schema.String })),
throttlingException: Schema.optional(Schema.Struct({ message: Schema.String })),
serviceUnavailableException: Schema.optional(Schema.Struct({ message: Schema.String })),
})
type BedrockEvent = Schema.Schema.Type<typeof BedrockEvent>
// =============================================================================
// Request Lowering
// =============================================================================
// Converts a tool definition to the Converse wire shape, nesting the JSON
// schema under `toolSpec.inputSchema.json`.
const lowerTool = (tool: ToolDefinition): BedrockTool => {
  const { name, description, inputSchema } = tool
  return {
    toolSpec: { name, description, inputSchema: { json: inputSchema } },
  }
}
// Produces the wire blocks for one piece of text: the text block itself,
// followed by a cache-point block when `BedrockCache.block` returns one for
// the given hint.
const textWithCache = (
  text: string,
  cache: CacheHint | undefined,
): Array<BedrockTextBlock | BedrockCache.CachePointBlock> => {
  const blocks: Array<BedrockTextBlock | BedrockCache.CachePointBlock> = [{ text }]
  const marker = BedrockCache.block(cache)
  if (marker) blocks.push(marker)
  return blocks
}
// Maps the request's tool choice onto the Converse form through the shared
// matcher: `auto` and `required` become the single-key objects `{ auto: {} }`
// and `{ any: {} }`, a named tool becomes `{ tool: { name } }`, and `none`
// produces no tool choice.
const lowerToolChoice = (toolChoice: NonNullable<LLMRequest["toolChoice"]>) => {
  return ProviderShared.matchToolChoice("Bedrock Converse", toolChoice, {
    auto: () => {
      return { auto: {} } as const
    },
    none: () => {
      return undefined
    },
    required: () => {
      return { any: {} } as const
    },
    tool: (name) => {
      return { tool: { name } } as const
    },
  })
}
// Converts a tool-call part into a `toolUse` block, mapping the part's `id` to
// `toolUseId` and carrying over its name and input.
const lowerToolCall = (part: ToolCallPart): BedrockToolUseBlock => {
  const { id, name, input } = part
  return { toolUse: { toolUseId: id, name, input } }
}
// Converts a tool-result part into a `toolResult` block. Text and error
// results are sent as a single text item; every other result type is sent as
// a single JSON item. `status` is "error" for error results and "success"
// otherwise.
const lowerToolResult = (part: ToolResultPart): BedrockToolResultBlock => {
  const failed = part.result.type === "error"
  const sendAsText = part.result.type === "text" || failed
  return {
    toolResult: {
      toolUseId: part.id,
      content: sendAsText ? [{ text: ProviderShared.toolResultText(part) }] : [{ json: part.result.value }],
      status: failed ? "error" : "success",
    },
  }
}
// Converts the request's messages into Converse wire messages.
// - `user` messages may contain text and media parts.
// - `assistant` messages may contain text, reasoning, and tool-call parts.
// - Messages of any other role are lowered to a `user` message made of
//   `toolResult` blocks, and may contain only tool-result parts.
// Fails via `ProviderShared.unsupportedContent` on the first part outside
// those sets, or with whatever `BedrockMedia.lower` fails with.
const lowerMessages = Effect.fn("BedrockConverse.lowerMessages")(function* (request: LLMRequest) {
const messages: BedrockMessage[] = []
for (const message of request.messages) {
if (message.role === "user") {
const content: BedrockUserBlock[] = []
for (const part of message.content) {
if (!ProviderShared.supportsContent(part, ["text", "media"]))
return yield* ProviderShared.unsupportedContent("Bedrock Converse", "user", ["text", "media"])
if (part.type === "text") {
// Text may expand to two blocks: the text plus a cache-point marker.
content.push(...textWithCache(part.text, part.cache))
continue
}
if (part.type === "media") {
content.push(yield* BedrockMedia.lower(part))
continue
}
}
messages.push({ role: "user", content })
continue
}
if (message.role === "assistant") {
const content: BedrockAssistantBlock[] = []
for (const part of message.content) {
if (!ProviderShared.supportsContent(part, ["text", "reasoning", "tool-call"]))
return yield* ProviderShared.unsupportedContent("Bedrock Converse", "assistant", [
"text",
"reasoning",
"tool-call",
])
if (part.type === "text") {
content.push(...textWithCache(part.text, part.cache))
continue
}
if (part.type === "reasoning") {
// The part's `encrypted` value is sent as the reasoning signature.
content.push({
reasoningContent: {
reasoningText: { text: part.text, signature: part.encrypted },
},
})
continue
}
if (part.type === "tool-call") {
content.push(lowerToolCall(part))
continue
}
}
messages.push({ role: "assistant", content })
continue
}
// Remaining roles: each tool-result part becomes a `toolResult` block, and
// the blocks are sent as a `user` message.
const content: BedrockToolResultBlock[] = []
for (const part of message.content) {
if (!ProviderShared.supportsContent(part, ["tool-result"]))
return yield* ProviderShared.unsupportedContent("Bedrock Converse", "tool", ["tool-result"])
content.push(lowerToolResult(part))
}
messages.push({ role: "user", content })
}
return messages
})
// System prompts follow the same cache-point convention as message text: each
// part contributes its text block, optionally followed by a positional
// `cachePoint` marker.
const lowerSystem = (system: ReadonlyArray<LLMRequest["system"][number]>): BedrockSystemBlock[] => {
  const blocks: BedrockSystemBlock[] = []
  for (const part of system) {
    blocks.push(...textWithCache(part.text, part.cache))
  }
  return blocks
}
// Builds the Converse request body from an `LLMRequest`.
// - `system` is omitted when the request has no system parts.
// - `inferenceConfig` is omitted when no max tokens, temperature, or top-p is
//   set and there are no stop sequences (missing or empty list).
// - `toolConfig` is sent only when tools exist and the tool choice is not
//   `none`; the lowered tool choice travels inside it.
const fromRequest = Effect.fn("BedrockConverse.fromRequest")(function* (request: LLMRequest) {
  const toolChoice = request.toolChoice ? yield* lowerToolChoice(request.toolChoice) : undefined
  const generation = request.generation
  const messages = yield* lowerMessages(request)

  const hasStopSequences = generation?.stop !== undefined && generation.stop.length !== 0
  const hasInferenceSettings =
    generation?.maxTokens !== undefined ||
    generation?.temperature !== undefined ||
    generation?.topP !== undefined ||
    hasStopSequences
  const sendsTools = request.tools.length > 0 && request.toolChoice?.type !== "none"

  return {
    modelId: request.model.id,
    messages,
    system: request.system.length === 0 ? undefined : lowerSystem(request.system),
    inferenceConfig: hasInferenceSettings
      ? {
          maxTokens: generation?.maxTokens,
          temperature: generation?.temperature,
          topP: generation?.topP,
          stopSequences: generation?.stop,
        }
      : undefined,
    toolConfig: sendsTools ? { tools: request.tools.map(lowerTool), toolChoice } : undefined,
  }
})
// =============================================================================
// Stream Parsing
// =============================================================================
/**
 * Translate a Bedrock Converse `stopReason` into the common `FinishReason`.
 * Any value not listed here maps to `"unknown"`.
 */
const mapFinishReason = (reason: string): FinishReason => {
  switch (reason) {
    case "end_turn":
    case "stop_sequence":
      return "stop"
    case "max_tokens":
      return "length"
    case "tool_use":
      return "tool-calls"
    case "content_filtered":
    case "guardrail_intervened":
      return "content-filter"
    default:
      return "unknown"
  }
}
/**
 * Convert Bedrock's usage payload into the common `Usage` record.
 *
 * Returns `undefined` when no usage was reported. The raw provider payload is
 * preserved under `native`; `totalTokens` is resolved by
 * `ProviderShared.totalTokens` from the input, output and reported totals.
 */
const mapUsage = (usage: BedrockUsageSchema | undefined): Usage | undefined => {
  if (!usage) return undefined
  const { inputTokens, outputTokens, totalTokens, cacheReadInputTokens, cacheWriteInputTokens } = usage
  return new Usage({
    inputTokens,
    outputTokens,
    totalTokens: ProviderShared.totalTokens(inputTokens, outputTokens, totalTokens),
    cacheReadInputTokens,
    cacheWriteInputTokens,
    native: usage,
  })
}
// Accumulator threaded through `step` for a single Bedrock Converse response
// stream. `protocol.stream.initial` creates it; `onHalt` reads it at the end.
interface ParserState {
// In-flight tool calls tracked by `ToolStream`. `step` keys them by the
// event's `contentBlockIndex`.
readonly tools: ToolStream.State<number>
// Bedrock splits the finish into `messageStop` (carries `stopReason`) and
// `metadata` (carries usage). Hold the terminal event in state so `onHalt`
// can emit exactly one finish after both chunks have had a chance to arrive.
readonly pendingFinish: { readonly reason: FinishReason; readonly usage?: Usage } | undefined
}
/**
 * Advance the Bedrock Converse parser by one decoded stream event.
 *
 * Produces a `[nextState, events]` tuple. Branches are tried in a fixed order
 * and the first match wins:
 *
 * 1. `contentBlockStart` with a tool use — register the call, emit nothing.
 * 2. `contentBlockDelta` text / reasoning text — emit the matching delta.
 * 3. `contentBlockDelta` tool input — append to the registered call; fails
 *    when no call was started for that block index.
 * 4. `contentBlockStop` — let `ToolStream.finish` close the block.
 * 5. `messageStop` / `metadata` — record the finish reason / usage in
 *    `pendingFinish`; the finish event itself is emitted by `onHalt`.
 * 6. Exception events — emit a `provider-error`; only validation errors are
 *    marked non-retryable.
 *
 * Anything else leaves the state untouched and emits nothing.
 */
const step = (state: ParserState, event: BedrockEvent) =>
  Effect.gen(function* () {
    const blockStart = event.contentBlockStart
    const openedTool = blockStart?.start?.toolUse
    if (blockStart && openedTool) {
      const tools = ToolStream.start(state.tools, blockStart.contentBlockIndex, {
        id: openedTool.toolUseId,
        name: openedTool.name,
      })
      return [{ ...state, tools }, []] as const
    }

    const blockDelta = event.contentBlockDelta
    const delta = blockDelta?.delta
    if (delta?.text) {
      return [state, [{ type: "text-delta" as const, text: delta.text }]] as const
    }
    const reasoningText = delta?.reasoningContent?.text
    if (reasoningText) {
      return [state, [{ type: "reasoning-delta" as const, text: reasoningText }]] as const
    }
    if (blockDelta && delta?.toolUse) {
      const appended = ToolStream.appendExisting(
        ADAPTER,
        state.tools,
        blockDelta.contentBlockIndex,
        delta.toolUse.input,
        "Bedrock Converse tool delta is missing its tool call",
      )
      if (ToolStream.isError(appended)) return yield* appended
      return [{ ...state, tools: appended.tools }, appended.event ? [appended.event] : []] as const
    }

    const blockStop = event.contentBlockStop
    if (blockStop) {
      const finished = yield* ToolStream.finish(ADAPTER, state.tools, blockStop.contentBlockIndex)
      return [{ ...state, tools: finished.tools }, finished.event ? [finished.event] : []] as const
    }

    // The two terminal chunks each fill in their half of `pendingFinish` and
    // keep whatever the other half already recorded.
    if (event.messageStop) {
      return [
        {
          ...state,
          pendingFinish: { reason: mapFinishReason(event.messageStop.stopReason), usage: state.pendingFinish?.usage },
        },
        [],
      ] as const
    }
    if (event.metadata) {
      const usage = mapUsage(event.metadata.usage)
      return [{ ...state, pendingFinish: { reason: state.pendingFinish?.reason ?? "stop", usage } }, []] as const
    }

    // Server-side faults are always reported as retryable.
    const serverFault =
      event.internalServerException || event.modelStreamErrorException || event.serviceUnavailableException
    if (serverFault) {
      const message =
        event.internalServerException?.message ??
        event.modelStreamErrorException?.message ??
        event.serviceUnavailableException?.message ??
        "Bedrock Converse stream error"
      return [state, [{ type: "provider-error" as const, message, retryable: true }]] as const
    }

    // Request-side faults: retry only when the cause was throttling.
    if (event.validationException || event.throttlingException) {
      const message =
        event.validationException?.message ?? event.throttlingException?.message ?? "Bedrock Converse error"
      const retryable = event.throttlingException !== undefined
      return [state, [{ type: "provider-error" as const, message, retryable }]] as const
    }

    return [state, []] as const
  })
// Byte-stream framing for this adapter, built by `BedrockEventStream.framing`
// with the adapter id. Passed to `Route.make` as the route's `framing`.
const framing = BedrockEventStream.framing(ADAPTER)
/**
 * Flush the parser when the stream ends: emit a single `request-finish` event
 * built from `pendingFinish`, or nothing when no terminal chunk was recorded.
 */
const onHalt = (state: ParserState): ReadonlyArray<LLMEvent> => {
  const pending = state.pendingFinish
  if (!pending) return []
  return [{ type: "request-finish", reason: pending.reason, usage: pending.usage }]
}
// =============================================================================
// Protocol And Bedrock Route
// =============================================================================
/**
 * The Bedrock Converse protocol — request body construction, body schema, and
 * the streaming-event state machine.
 *
 * - `body.from` builds the request body from an `LLMRequest`; `body.schema`
 *   is the schema for that body.
 * - `stream.initial` starts every stream with no tracked tool calls and no
 *   pending finish; `step` folds each `BedrockEvent` into that state and
 *   `onHalt` emits the final `request-finish` event.
 */
export const protocol = Protocol.make({
id: ADAPTER,
body: {
schema: BedrockConverseBody,
from: fromRequest,
},
stream: {
event: BedrockEvent,
initial: () => ({ tools: ToolStream.empty<number>(), pendingFinish: undefined }),
step,
onHalt,
},
})
/**
 * The Bedrock Converse route: the protocol above combined with the
 * `converse-stream` endpoint path, Bedrock auth, and event-stream framing.
 */
export const route = Route.make({
id: ADAPTER,
protocol,
// Bedrock's URL embeds the region in the host (set on `model.baseURL` by
// the provider helper from credentials) and the validated modelId in the
// path. We read the validated body so the URL matches the body that gets
// signed. The model id is URI-encoded before being placed in the path.
endpoint: Endpoint.path<BedrockConverseBody>(
({ body }) => `/model/${encodeURIComponent(body.modelId)}/converse-stream`,
),
auth: BedrockAuth.auth,
framing,
})
/** Re-export of `BedrockAuth.nativeCredentials` for callers of this module. */
export const nativeCredentials = BedrockAuth.nativeCredentials

/**
 * Model helper for the Bedrock Converse route.
 *
 * `mapInput` strips `credentials` from the caller's input, derives a default
 * regional `baseURL` (region falls back to `us-east-1`) unless one was given
 * explicitly, and folds the credentials into `native` via `nativeCredentials`.
 */
const bedrockModel = Route.model(
  route,
  {
    provider: "bedrock",
  },
  {
    mapInput: (input: BedrockConverseModelInput) => {
      const { credentials, ...modelInput } = input
      const defaultBaseURL = `https://bedrock-runtime.${credentials?.region ?? "us-east-1"}.amazonaws.com`
      return {
        ...modelInput,
        baseURL: modelInput.baseURL ?? defaultBaseURL,
        native: nativeCredentials(input.native, credentials),
      }
    },
  },
)

export const model = bedrockModel
export * as BedrockConverse from "./bedrock-converse"

View File

@@ -0,0 +1,87 @@
import { EventStreamCodec } from "@smithy/eventstream-codec"
import { fromUtf8, toUtf8 } from "@smithy/util-utf8"
import { Effect, Stream } from "effect"
import type { Framing } from "../route/framing"
import { ProviderShared } from "./shared"
// Bedrock streams responses using the AWS event stream binary protocol — each
// frame is `[length:4][headers-length:4][prelude-crc:4][headers][payload][crc:4]`.
// We use `@smithy/eventstream-codec` to validate framing and CRCs, then
// reconstruct the JSON wrapping by `:event-type` so the chunk schema can match.
//
// Both values below are module-level singletons shared by every stream:
// `eventCodec` decodes one complete frame at a time, and `utf8` turns a
// decoded frame body into the JSON text handed to the parser.
const eventCodec = new EventStreamCodec(toUtf8, fromUtf8)
const utf8 = new TextDecoder()
// Cursor-tracking buffer state. Bytes accumulate in `buffer`; `offset` is the
// read position. Reading by `subarray` is zero-copy. We only allocate a fresh
// buffer when a new network chunk arrives and we need to append.
interface FrameBufferState {
// Bytes received so far; everything before `offset` has already been consumed.
readonly buffer: Uint8Array
// Index of the first unread byte in `buffer`.
readonly offset: number
}
// Starting state for every stream: an empty buffer with the cursor at zero.
const initialFrameBuffer: FrameBufferState = { buffer: new Uint8Array(0), offset: 0 }
/**
 * Append a freshly received network chunk to the frame buffer.
 *
 * The already-consumed prefix is dropped in the same allocation, so the
 * returned buffer holds only the unread bytes followed by `chunk` and its
 * cursor is reset to zero. This keeps the buffer from growing with the length
 * of the stream.
 */
const appendChunk = (state: FrameBufferState, chunk: Uint8Array): FrameBufferState => {
  const unread = state.buffer.subarray(state.offset)
  const merged = new Uint8Array(unread.length + chunk.length)
  merged.set(unread)
  merged.set(chunk, unread.length)
  return { buffer: merged, offset: 0 }
}
/**
 * Accumulator step for the event-stream framing: append `chunk` to the
 * buffered bytes, decode every complete frame now available, and return the
 * updated buffer state together with the decoded payload objects.
 *
 * Each emitted object has the shape `{ [name]: payload }`, where `name` comes
 * from the frame's `:event-type` header for `event` frames and from its
 * `:exception-type` header for `exception` frames. Modeled stream exceptions
 * (throttling, validation, …) are delivered as `exception` frames, so they
 * must be wrapped the same way as events for the parser's exception branches
 * to see them; dropping them would make a failed stream end silently.
 *
 * Frames of any other message type, and frames without a usable name header,
 * are skipped.
 *
 * @param route Adapter id used when building decode/parse errors.
 * @returns An effect yielding `[nextState, decodedObjects]`; it fails when a
 *   frame cannot be decoded or its payload is not valid JSON.
 */
const consumeFrames = (route: string) => (state: FrameBufferState, chunk: Uint8Array) =>
  Effect.gen(function* () {
    let cursor = appendChunk(state, chunk)
    const out: object[] = []
    // The first four bytes of a frame hold its total length (big-endian).
    while (cursor.buffer.length - cursor.offset >= 4) {
      const view = cursor.buffer.subarray(cursor.offset)
      const totalLength = new DataView(view.buffer, view.byteOffset, view.byteLength).getUint32(0, false)
      // Frame is still incomplete: keep the bytes and wait for the next chunk.
      if (view.length < totalLength) break
      const decoded = yield* Effect.try({
        try: () => eventCodec.decode(view.subarray(0, totalLength)),
        catch: (error) =>
          ProviderShared.eventError(
            route,
            `Failed to decode Bedrock Converse event-stream frame: ${
              error instanceof Error ? error.message : String(error)
            }`,
          ),
      })
      cursor = { buffer: cursor.buffer, offset: cursor.offset + totalLength }

      const messageType = decoded.headers[":message-type"]?.value
      const isException = messageType === "exception"
      if (messageType !== "event" && !isException) continue
      const name = decoded.headers[isException ? ":exception-type" : ":event-type"]?.value
      if (typeof name !== "string") continue

      const payload = utf8.decode(decoded.body)
      if (!payload) {
        // An exception without a body must still reach the parser so the
        // failure is reported; an empty event carries nothing to report.
        if (isException) out.push({ [name]: {} })
        continue
      }
      // The AWS event stream pads short payloads with a `p` field. Drop it
      // before handing the object to the chunk schema. JSON decode goes
      // through the shared Schema-driven codec to satisfy the package rule
      // against ad-hoc `JSON.parse` calls.
      const parsed = (yield* ProviderShared.parseJson(
        route,
        payload,
        "Failed to parse Bedrock Converse event-stream payload",
      )) as Record<string, unknown>
      delete parsed.p
      out.push({ [name]: parsed })
    }
    return [cursor, out] as const
  })
/**
 * Build the AWS event-stream framing used by Bedrock Converse.
 *
 * The byte stream is folded through `consumeFrames`, starting from an empty
 * frame buffer: each frame is decoded by `@smithy/eventstream-codec` (length +
 * header + payload + CRC) and rewrapped under its `:event-type` header so the
 * chunk schema can match the JSON payload directly.
 *
 * @param route Adapter id forwarded to `consumeFrames` for error reporting.
 */
export const framing = (route: string): Framing<object> => {
  const decodeFrames = consumeFrames(route)
  return {
    id: "aws-event-stream",
    frame: (bytes) => bytes.pipe(Stream.mapAccumEffect(() => initialFrameBuffer, decodeFrames)),
  }
}
export * as BedrockEventStream from "./bedrock-event-stream"

View File

@@ -0,0 +1,397 @@
import { Effect, Schema } from "effect"
import { Route } from "../route/client"
import { Auth } from "../route/auth"
import { Endpoint } from "../route/endpoint"
import { Framing } from "../route/framing"
import { Protocol } from "../route/protocol"
import {
Usage,
type FinishReason,
type LLMEvent,
type LLMRequest,
type MediaPart,
type TextPart,
type ToolCallPart,
type ToolDefinition,
} from "../schema"
import { JsonObject, optionalArray, ProviderShared } from "./shared"
import { GeminiToolSchema } from "./utils/gemini-tool-schema"
const ADAPTER = "gemini"
export const DEFAULT_BASE_URL = "https://generativelanguage.googleapis.com/v1beta"
// =============================================================================
// Request Body Schema
// =============================================================================
// Text part. `thought: true` marks reasoning text: `lowerMessages` sets it for
// reasoning parts and `step` maps it to `reasoning-delta` events.
const GeminiTextPart = Schema.Struct({
text: Schema.String,
thought: Schema.optional(Schema.Boolean),
thoughtSignature: Schema.optional(Schema.String),
})
// Inline media part: a MIME type plus the media data as a string.
const GeminiInlineDataPart = Schema.Struct({
inlineData: Schema.Struct({
mimeType: Schema.String,
data: Schema.String,
}),
})
// A function (tool) call: the function name plus its arguments.
const GeminiFunctionCallPart = Schema.Struct({
functionCall: Schema.Struct({
name: Schema.String,
args: Schema.Unknown,
}),
thoughtSignature: Schema.optional(Schema.String),
})
// The result of a function call, sent back under the function's name.
const GeminiFunctionResponsePart = Schema.Struct({
functionResponse: Schema.Struct({
name: Schema.String,
response: Schema.Unknown,
}),
})
// Any part that may appear inside a content entry.
const GeminiContentPart = Schema.Union([
GeminiTextPart,
GeminiInlineDataPart,
GeminiFunctionCallPart,
GeminiFunctionResponsePart,
])
// One conversation turn. Gemini only has the roles `user` and `model`;
// `lowerMessages` sends assistant turns as `model` and tool results as `user`.
const GeminiContent = Schema.Struct({
role: Schema.Literals(["user", "model"]),
parts: Schema.Array(GeminiContentPart),
})
type GeminiContent = Schema.Schema.Type<typeof GeminiContent>
// System instruction: text-only parts, carried outside `contents`.
const GeminiSystemInstruction = Schema.Struct({
parts: Schema.Array(Schema.Struct({ text: Schema.String })),
})
// One callable function: name, description and an optional parameter schema.
const GeminiFunctionDeclaration = Schema.Struct({
name: Schema.String,
description: Schema.String,
parameters: Schema.optional(JsonObject),
})
// A tool entry groups function declarations.
const GeminiTool = Schema.Struct({
functionDeclarations: Schema.Array(GeminiFunctionDeclaration),
})
// Function-calling mode, optionally restricted to the named functions.
const GeminiToolConfig = Schema.Struct({
functionCallingConfig: Schema.Struct({
mode: Schema.Literals(["AUTO", "NONE", "ANY"]),
allowedFunctionNames: optionalArray(Schema.String),
}),
})
// Thinking options; filled from `providerOptions.gemini.thinkingConfig`.
const GeminiThinkingConfig = Schema.Struct({
thinkingBudget: Schema.optional(Schema.Number),
includeThoughts: Schema.optional(Schema.Boolean),
})
// Sampling and length settings; every field is optional.
const GeminiGenerationConfig = Schema.Struct({
maxOutputTokens: Schema.optional(Schema.Number),
temperature: Schema.optional(Schema.Number),
topP: Schema.optional(Schema.Number),
topK: Schema.optional(Schema.Number),
stopSequences: optionalArray(Schema.String),
thinkingConfig: Schema.optional(GeminiThinkingConfig),
})
// Fields of the request body. Only `contents` is required.
const GeminiBodyFields = {
contents: Schema.Array(GeminiContent),
systemInstruction: Schema.optional(GeminiSystemInstruction),
tools: optionalArray(GeminiTool),
toolConfig: Schema.optional(GeminiToolConfig),
generationConfig: Schema.optional(GeminiGenerationConfig),
}
// The request body schema and its decoded type.
const GeminiBody = Schema.Struct(GeminiBodyFields)
export type GeminiBody = Schema.Schema.Type<typeof GeminiBody>
// Token counts reported with a stream event; every counter is optional.
// `mapUsage` translates these into the common `Usage` record.
const GeminiUsage = Schema.Struct({
cachedContentTokenCount: Schema.optional(Schema.Number),
thoughtsTokenCount: Schema.optional(Schema.Number),
promptTokenCount: Schema.optional(Schema.Number),
candidatesTokenCount: Schema.optional(Schema.Number),
totalTokenCount: Schema.optional(Schema.Number),
})
type GeminiUsage = Schema.Schema.Type<typeof GeminiUsage>
// One response candidate: optional content and an optional finish reason.
const GeminiCandidate = Schema.Struct({
content: Schema.optional(GeminiContent),
finishReason: Schema.optional(Schema.String),
})
// One decoded stream event. `step` only reads the first candidate.
const GeminiEvent = Schema.Struct({
candidates: optionalArray(GeminiCandidate),
usageMetadata: Schema.optional(GeminiUsage),
})
type GeminiEvent = Schema.Schema.Type<typeof GeminiEvent>
// Accumulator threaded through `step` for one Gemini response stream.
interface ParserState {
// Latest provider finish reason seen on a candidate, kept as the raw string.
readonly finishReason?: string
// Whether any function call has been emitted; `mapFinishReason` uses it to
// report `STOP` as `tool-calls`.
readonly hasToolCalls: boolean
// Counter used by `step` to synthesize `tool_<n>` ids for function calls.
readonly nextToolCallId: number
// Latest usage mapped from `usageMetadata`.
readonly usage?: Usage
}
// Local shorthands for shared helpers.
const invalid = ProviderShared.invalidRequest
const mediaData = ProviderShared.mediaBytes
// =============================================================================
// Tool Schema Conversion
// =============================================================================
// Tool-schema conversion has two distinct concerns:
//
// 1. Sanitize — fix common authoring mistakes Gemini rejects: integer/number
// enums (must be strings), `required` entries that don't match a property,
// untyped arrays (`items` must be present), and `properties`/`required`
// keys on non-object scalars. Mirrors OpenCode's historical Gemini rules.
//
// 2. Project — lossy mapping from JSON Schema to Gemini's schema dialect:
// drop empty objects, derive `nullable: true` from `type: [..., "null"]`,
// coerce `const` to `[const]` enum, recurse properties/items, propagate
// only an allowlisted set of keys (description, required, format, type,
// properties, items, allOf, anyOf, oneOf, minLength). Anything outside the
// allowlist (e.g. `additionalProperties`, `$ref`) is silently dropped.
//
// Sanitize runs first, then project. The implementation lives in
// `utils/gemini-tool-schema` so this protocol keeps the same shape as the other
// provider protocols.
// =============================================================================
// Request Lowering
// =============================================================================
/**
 * Lower a common tool definition into a Gemini function declaration. The
 * input schema is converted by `GeminiToolSchema.convert`.
 */
const lowerTool = (tool: ToolDefinition) => {
  const { name, description, inputSchema } = tool
  return {
    name,
    description,
    parameters: GeminiToolSchema.convert(inputSchema),
  }
}
/**
 * Lower the common tool choice into Gemini's `functionCallingConfig`:
 * `auto` → `AUTO`, `none` → `NONE`, `required` → `ANY`, and a specific tool →
 * `ANY` restricted to that single function name.
 */
const lowerToolConfig = (toolChoice: NonNullable<LLMRequest["toolChoice"]>) => {
  const auto = () => ({ functionCallingConfig: { mode: "AUTO" as const } })
  const none = () => ({ functionCallingConfig: { mode: "NONE" as const } })
  const required = () => ({ functionCallingConfig: { mode: "ANY" as const } })
  const tool = (name: string) => ({
    functionCallingConfig: { mode: "ANY" as const, allowedFunctionNames: [name] },
  })
  return ProviderShared.matchToolChoice("Gemini", toolChoice, { auto, none, required, tool })
}
/**
 * Lower one user content part: text becomes a text part, media becomes an
 * `inlineData` part carrying the media type and the data from `mediaData`.
 */
const lowerUserPart = (part: TextPart | MediaPart) => {
  if (part.type === "text") return { text: part.text }
  return { inlineData: { mimeType: part.mediaType, data: mediaData(part) } }
}
/**
 * Lower a common tool-call part into a Gemini `functionCall` part. The call
 * id is not sent; only the function name and its input are.
 */
const lowerToolCall = (part: ToolCallPart) => {
  const { name, input } = part
  return { functionCall: { name, args: input } }
}
/**
 * Lower the common message list into Gemini `contents`.
 *
 * - user messages accept text and media parts and are sent as `user`;
 * - assistant messages accept text, reasoning and tool-call parts and are
 *   sent as `model` (reasoning becomes a text part flagged `thought`);
 * - every other message is treated as a tool message: it accepts only
 *   tool-result parts and is sent as `user` with `functionResponse` parts.
 *
 * Fails with `ProviderShared.unsupportedContent` on the first part a role
 * does not accept.
 */
const lowerMessages = Effect.fn("Gemini.lowerMessages")(function* (request: LLMRequest) {
  type GeminiPart = Schema.Schema.Type<typeof GeminiContentPart>
  const contents: GeminiContent[] = []
  for (const message of request.messages) {
    const parts: GeminiPart[] = []
    switch (message.role) {
      case "user": {
        for (const part of message.content) {
          if (!ProviderShared.supportsContent(part, ["text", "media"]))
            return yield* ProviderShared.unsupportedContent("Gemini", "user", ["text", "media"])
          parts.push(lowerUserPart(part))
        }
        contents.push({ role: "user", parts })
        break
      }
      case "assistant": {
        for (const part of message.content) {
          if (!ProviderShared.supportsContent(part, ["text", "reasoning", "tool-call"]))
            return yield* ProviderShared.unsupportedContent("Gemini", "assistant", ["text", "reasoning", "tool-call"])
          switch (part.type) {
            case "text":
              parts.push({ text: part.text })
              break
            case "reasoning":
              parts.push({ text: part.text, thought: true })
              break
            case "tool-call":
              parts.push(lowerToolCall(part))
              break
          }
        }
        contents.push({ role: "model", parts })
        break
      }
      default: {
        for (const part of message.content) {
          if (!ProviderShared.supportsContent(part, ["tool-result"]))
            return yield* ProviderShared.unsupportedContent("Gemini", "tool", ["tool-result"])
          const { name } = part
          parts.push({
            functionResponse: {
              name,
              response: { name, content: ProviderShared.toolResultText(part) },
            },
          })
        }
        contents.push({ role: "user", parts })
      }
    }
  }
  return contents
})
/** Gemini-specific provider options attached to the request, if any. */
const geminiOptions = (request: LLMRequest) => {
  return request.providerOptions?.gemini
}

/**
 * Extract a Gemini thinking config from the request's provider options.
 *
 * Only a numeric `thinkingBudget` and a boolean `includeThoughts` are kept.
 * Returns `undefined` when the option is not a record or when neither field
 * has a usable value.
 */
const thinkingConfig = (request: LLMRequest) => {
  const raw = geminiOptions(request)?.thinkingConfig
  if (!ProviderShared.isRecord(raw)) return undefined
  const thinkingBudget = typeof raw.thinkingBudget === "number" ? raw.thinkingBudget : undefined
  const includeThoughts = typeof raw.includeThoughts === "boolean" ? raw.includeThoughts : undefined
  if (thinkingBudget === undefined && includeThoughts === undefined) return undefined
  return { thinkingBudget, includeThoughts }
}
/**
 * Build the Gemini request body from a common `LLMRequest`.
 *
 * - `systemInstruction` joins all system parts into one text part and is
 *   omitted when there are none.
 * - `tools` / `toolConfig` are omitted when there are no tools or the tool
 *   choice is `"none"`; `toolConfig` additionally needs an explicit choice.
 * - `generationConfig` is omitted when every one of its fields is undefined.
 *
 * Fails with whatever `lowerMessages` or `lowerToolConfig` fail with.
 */
const fromRequest = Effect.fn("Gemini.fromRequest")(function* (request: LLMRequest) {
  const toolsEnabled = request.tools.length > 0 && request.toolChoice?.type !== "none"
  const generation = request.generation
  const generationConfig = {
    maxOutputTokens: generation?.maxTokens,
    temperature: generation?.temperature,
    topP: generation?.topP,
    topK: generation?.topK,
    stopSequences: generation?.stop,
    thinkingConfig: thinkingConfig(request),
  }
  const hasGenerationConfig = Object.values(generationConfig).some((value) => value !== undefined)

  // Evaluated in the same order as the fields of the resulting body.
  const contents = yield* lowerMessages(request)
  const systemInstruction =
    request.system.length > 0 ? { parts: [{ text: ProviderShared.joinText(request.system) }] } : undefined
  const tools = toolsEnabled ? [{ functionDeclarations: request.tools.map(lowerTool) }] : undefined
  const toolConfig = toolsEnabled && request.toolChoice ? yield* lowerToolConfig(request.toolChoice) : undefined

  return {
    contents,
    systemInstruction,
    tools,
    toolConfig,
    generationConfig: hasGenerationConfig ? generationConfig : undefined,
  }
})
// =============================================================================
// Stream Parsing
// =============================================================================
/**
 * Convert Gemini's `usageMetadata` into the common `Usage` record.
 *
 * Returns `undefined` when no usage was reported. The raw payload is kept
 * under `native`; `totalTokens` is resolved by `ProviderShared.totalTokens`
 * from the prompt, candidate and reported total counts.
 */
const mapUsage = (usage: GeminiUsage | undefined) => {
  if (!usage) return undefined
  const { promptTokenCount, candidatesTokenCount, thoughtsTokenCount, cachedContentTokenCount, totalTokenCount } =
    usage
  return new Usage({
    inputTokens: promptTokenCount,
    outputTokens: candidatesTokenCount,
    reasoningTokens: thoughtsTokenCount,
    cacheReadInputTokens: cachedContentTokenCount,
    totalTokens: ProviderShared.totalTokens(promptTokenCount, candidatesTokenCount, totalTokenCount),
    native: usage,
  })
}
/**
 * Translate a Gemini `finishReason` into the common `FinishReason`.
 *
 * `STOP` is reported as `tool-calls` when the stream produced any function
 * call, because Gemini has no separate finish reason for tool use. A missing
 * or unrecognised reason maps to `"unknown"`.
 */
const mapFinishReason = (finishReason: string | undefined, hasToolCalls: boolean): FinishReason => {
  switch (finishReason) {
    case "STOP":
      return hasToolCalls ? "tool-calls" : "stop"
    case "MAX_TOKENS":
      return "length"
    case "IMAGE_SAFETY":
    case "RECITATION":
    case "SAFETY":
    case "BLOCKLIST":
    case "PROHIBITED_CONTENT":
    case "SPII":
      return "content-filter"
    case "MALFORMED_FUNCTION_CALL":
      return "error"
    default:
      return "unknown"
  }
}
/**
 * Flush the parser when the stream ends: emit one `request-finish` event if a
 * finish reason or usage was seen, otherwise emit nothing.
 */
const finish = (state: ParserState): ReadonlyArray<LLMEvent> => {
  if (!state.finishReason && !state.usage) return []
  const reason = mapFinishReason(state.finishReason, state.hasToolCalls)
  return [{ type: "request-finish", reason, usage: state.usage }]
}
/**
 * Advance the Gemini parser by one decoded stream event.
 *
 * Usage and the finish reason are carried forward in state (the latest
 * reported value wins). Only the first candidate is read. Non-empty text
 * parts become `text-delta` events, or `reasoning-delta` when flagged as
 * `thought`; function calls become complete `tool-call` events with a
 * synthesized `tool_<n>` id, since the event carries no call id.
 *
 * Returns an effect that always succeeds with `[nextState, events]`.
 */
const step = (state: ParserState, event: GeminiEvent) => {
  const usage = event.usageMetadata ? (mapUsage(event.usageMetadata) ?? state.usage) : state.usage
  const candidate = event.candidates?.[0]
  const finishReason = candidate?.finishReason ?? state.finishReason

  // Nothing to emit: just carry usage and the finish reason forward.
  if (!candidate?.content) {
    return Effect.succeed([{ ...state, usage, finishReason }, []] as const)
  }

  const events: LLMEvent[] = []
  let { hasToolCalls, nextToolCallId } = state
  for (const part of candidate.content.parts) {
    if ("text" in part && part.text.length > 0) {
      events.push({ type: part.thought ? "reasoning-delta" : "text-delta", text: part.text })
      continue
    }
    if (!("functionCall" in part)) continue
    const { name, args } = part.functionCall
    events.push({ type: "tool-call", id: `tool_${nextToolCallId}`, name, input: args })
    nextToolCallId += 1
    hasToolCalls = true
  }

  return Effect.succeed([{ ...state, usage, hasToolCalls, nextToolCallId, finishReason }, events] as const)
}
// =============================================================================
// Protocol And Gemini Route
// =============================================================================
/**
 * The Gemini protocol — request body construction, body schema, and the
 * streaming-event state machine. Used by Google AI Studio Gemini and (once
 * registered) Vertex Gemini.
 *
 * Each stream starts with no tool calls seen and the synthetic tool-call id
 * counter at zero; `step` folds every JSON-decoded `GeminiEvent` into that
 * state and `finish` emits the final `request-finish` event.
 */
export const protocol = Protocol.make({
id: ADAPTER,
body: {
schema: GeminiBody,
from: fromRequest,
},
stream: {
event: Protocol.jsonEvent(GeminiEvent),
initial: () => ({ hasToolCalls: false, nextToolCallId: 0 }),
step,
onHalt: finish,
},
})
/**
 * The Gemini route: the protocol above combined with the streaming endpoint
 * path, API-key header auth (`x-goog-api-key`), and SSE framing.
 */
export const route = Route.make({
id: ADAPTER,
protocol,
// Gemini's path embeds the model id and pins SSE framing at the URL level.
// NOTE(review): the model id is interpolated without URI-encoding — confirm
// that every supported model id is path-safe.
endpoint: Endpoint.path(({ request }) => `/models/${request.model.id}:streamGenerateContent?alt=sse`),
auth: Auth.apiKeyHeader("x-goog-api-key"),
framing: Framing.sse,
})
// =============================================================================
// Model Helper
// =============================================================================
/**
 * Model helper for the Gemini route. Models are tagged with the `google`
 * provider and use `DEFAULT_BASE_URL` as their base URL.
 */
export const model = Route.model(route, {
provider: "google",
baseURL: DEFAULT_BASE_URL,
})
export * as Gemini from "./gemini"

View File

@@ -0,0 +1,6 @@
export * as AnthropicMessages from "./anthropic-messages"
export * as BedrockConverse from "./bedrock-converse"
export * as Gemini from "./gemini"
export * as OpenAIChat from "./openai-chat"
export * as OpenAICompatibleChat from "./openai-compatible-chat"
export * as OpenAIResponses from "./openai-responses"

View File

@@ -0,0 +1,404 @@
import { Array as Arr, Effect, Schema } from "effect"
import { Route } from "../route/client"
import { Auth } from "../route/auth"
import { Endpoint } from "../route/endpoint"
import { Framing } from "../route/framing"
import { HttpTransport } from "../route/transport"
import { Protocol } from "../route/protocol"
import {
Usage,
type FinishReason,
type LLMEvent,
type LLMRequest,
type TextPart,
type ToolCallPart,
type ToolDefinition,
} from "../schema"
import { isRecord, JsonObject, optionalArray, optionalNull, ProviderShared } from "./shared"
import { OpenAIOptions } from "./utils/openai-options"
import { ToolStream } from "./utils/tool-stream"
const ADAPTER = "openai-chat"
export const DEFAULT_BASE_URL = "https://api.openai.com/v1"
export const PATH = "/chat/completions"
// =============================================================================
// Request Body Schema
// =============================================================================
// The body schema is the provider-native JSON body. `fromRequest` below builds
// this shape from the common `LLMRequest`, then `Route.make` validates and
// JSON-encodes it before transport.

// A callable function: name, description and its JSON parameter schema.
const OpenAIChatFunction = Schema.Struct({
name: Schema.String,
description: Schema.String,
parameters: JsonObject,
})
// A tool entry; `function` is the only tool type modelled here.
const OpenAIChatTool = Schema.Struct({
type: Schema.tag("function"),
function: OpenAIChatFunction,
})
type OpenAIChatTool = Schema.Schema.Type<typeof OpenAIChatTool>
// A tool call replayed on an assistant message. `arguments` is a string;
// `lowerToolCall` fills it with the JSON-encoded call input.
const OpenAIChatAssistantToolCall = Schema.Struct({
id: Schema.String,
type: Schema.tag("function"),
function: Schema.Struct({
name: Schema.String,
arguments: Schema.String,
}),
})
type OpenAIChatAssistantToolCall = Schema.Schema.Type<typeof OpenAIChatAssistantToolCall>
// One chat message, discriminated by `role`. Assistant content may be null
// (an assistant turn with tool calls and no text); tool messages answer a
// specific `tool_call_id`.
const OpenAIChatMessage = Schema.Union([
Schema.Struct({ role: Schema.Literal("system"), content: Schema.String }),
Schema.Struct({ role: Schema.Literal("user"), content: Schema.String }),
Schema.Struct({
role: Schema.Literal("assistant"),
content: Schema.NullOr(Schema.String),
tool_calls: optionalArray(OpenAIChatAssistantToolCall),
reasoning_content: Schema.optional(Schema.String),
}),
Schema.Struct({ role: Schema.Literal("tool"), tool_call_id: Schema.String, content: Schema.String }),
]).pipe(Schema.toTaggedUnion("role"))
type OpenAIChatMessage = Schema.Schema.Type<typeof OpenAIChatMessage>
// Tool choice: one of the string modes, or a specific function by name.
const OpenAIChatToolChoice = Schema.Union([
Schema.Literals(["auto", "none", "required"]),
Schema.Struct({
type: Schema.tag("function"),
function: Schema.Struct({ name: Schema.String }),
}),
])
// Fields of the request body. Exported separately from the struct below.
// `stream` is pinned to `true`: this adapter only issues streaming requests.
export const bodyFields = {
model: Schema.String,
messages: Schema.Array(OpenAIChatMessage),
tools: optionalArray(OpenAIChatTool),
tool_choice: Schema.optional(OpenAIChatToolChoice),
stream: Schema.Literal(true),
stream_options: Schema.optional(Schema.Struct({ include_usage: Schema.Boolean })),
store: Schema.optional(Schema.Boolean),
reasoning_effort: Schema.optional(OpenAIOptions.OpenAIReasoningEffort),
max_tokens: Schema.optional(Schema.Number),
temperature: Schema.optional(Schema.Number),
top_p: Schema.optional(Schema.Number),
frequency_penalty: Schema.optional(Schema.Number),
presence_penalty: Schema.optional(Schema.Number),
seed: Schema.optional(Schema.Number),
stop: optionalArray(Schema.String),
}
// The request body schema and its decoded type.
const OpenAIChatBody = Schema.Struct(bodyFields)
export type OpenAIChatBody = Schema.Schema.Type<typeof OpenAIChatBody>
// =============================================================================
// Streaming Event Schema
// =============================================================================
// The event schema is one decoded SSE `data:` payload. `Framing.sse` splits the
// byte stream into strings, then `Protocol.jsonEvent` decodes each string into
// this provider-native event shape.

// Token usage; the nested detail objects go through `optionalNull`.
const OpenAIChatUsage = Schema.Struct({
prompt_tokens: Schema.optional(Schema.Number),
completion_tokens: Schema.optional(Schema.Number),
total_tokens: Schema.optional(Schema.Number),
prompt_tokens_details: optionalNull(
Schema.Struct({
cached_tokens: Schema.optional(Schema.Number),
}),
),
completion_tokens_details: optionalNull(
Schema.Struct({
reasoning_tokens: Schema.optional(Schema.Number),
}),
),
})
// Function fragment of a streamed tool call; either field may be missing.
const OpenAIChatToolCallDeltaFunction = Schema.Struct({
name: optionalNull(Schema.String),
arguments: optionalNull(Schema.String),
})
// One streamed tool-call fragment. `index` is the only required field.
const OpenAIChatToolCallDelta = Schema.Struct({
index: Schema.Number,
id: optionalNull(Schema.String),
function: optionalNull(OpenAIChatToolCallDeltaFunction),
})
type OpenAIChatToolCallDelta = Schema.Schema.Type<typeof OpenAIChatToolCallDelta>
// Incremental message content: text and/or tool-call fragments.
const OpenAIChatDelta = Schema.Struct({
content: optionalNull(Schema.String),
tool_calls: optionalNull(Schema.Array(OpenAIChatToolCallDelta)),
})
// One choice in an event: a delta and/or the finish reason.
const OpenAIChatChoice = Schema.Struct({
delta: optionalNull(OpenAIChatDelta),
finish_reason: optionalNull(Schema.String),
})
// One decoded stream event: the choices plus optional usage.
const OpenAIChatEvent = Schema.Struct({
choices: Schema.Array(OpenAIChatChoice),
usage: optionalNull(OpenAIChatUsage),
})
type OpenAIChatEvent = Schema.Schema.Type<typeof OpenAIChatEvent>
// A single message of the common request, as consumed by the lowering helpers.
type OpenAIChatRequestMessage = LLMRequest["messages"][number]
// Accumulator for one OpenAI Chat response stream.
interface ParserState {
// Tool calls being assembled by `ToolStream`, keyed by a number.
readonly tools: ToolStream.State<number>
// Tool-call events held in state (an array of common `LLMEvent`s).
readonly toolCallEvents: ReadonlyArray<LLMEvent>
// Usage, once reported by the stream.
readonly usage?: Usage
// Finish reason, already mapped to the common `FinishReason`.
readonly finishReason?: FinishReason
}
// Local shorthand for the shared invalid-request error constructor.
const invalid = ProviderShared.invalidRequest
// =============================================================================
// Request Lowering
// =============================================================================
// Lowering is the only place that knows how common LLM messages map onto the
// OpenAI Chat wire format. Keep provider quirks here instead of leaking native
// fields into `LLMRequest`.
/**
 * Lower a common tool definition into an OpenAI Chat `function` tool. The
 * input schema is passed through unchanged as `parameters`.
 */
const lowerTool = (tool: ToolDefinition): OpenAIChatTool => {
  const { name, description, inputSchema } = tool
  return {
    type: "function",
    function: { name, description, parameters: inputSchema },
  }
}
/**
 * Lower the common tool choice into OpenAI Chat's `tool_choice`: the three
 * modes map to their string literals, and a specific tool becomes a
 * `function` selector carrying its name.
 */
const lowerToolChoice = (toolChoice: NonNullable<LLMRequest["toolChoice"]>) => {
  const auto = () => "auto" as const
  const none = () => "none" as const
  const required = () => "required" as const
  const tool = (name: string) => ({ type: "function" as const, function: { name } })
  return ProviderShared.matchToolChoice("OpenAI Chat", toolChoice, { auto, none, required, tool })
}
/**
 * Lower a common tool-call part into an assistant `tool_calls` entry. The
 * call input is JSON-encoded because the wire format carries arguments as a
 * string.
 */
const lowerToolCall = (part: ToolCallPart): OpenAIChatAssistantToolCall => {
  const { id, name, input } = part
  return {
    id,
    type: "function",
    function: { name, arguments: ProviderShared.encodeJson(input) },
  }
}
/**
 * Read `reasoning_content` from a message's native OpenAI-compatible payload.
 * Returns `undefined` unless the payload is a record whose
 * `reasoning_content` is a string.
 */
const openAICompatibleReasoningContent = (native: unknown) => {
  if (!isRecord(native)) return undefined
  const reasoning = native.reasoning_content
  return typeof reasoning === "string" ? reasoning : undefined
}
/**
 * Lower a user message into a single OpenAI Chat `user` message whose content
 * is the joined text of its parts. Fails with
 * `ProviderShared.unsupportedContent` on the first non-text part.
 */
const lowerUserMessage = Effect.fn("OpenAIChat.lowerUserMessage")(function* (message: OpenAIChatRequestMessage) {
  const textParts: TextPart[] = []
  for (const part of message.content) {
    if (ProviderShared.supportsContent(part, ["text"])) {
      textParts.push(part)
      continue
    }
    return yield* ProviderShared.unsupportedContent("OpenAI Chat", "user", ["text"])
  }
  return { role: "user" as const, content: ProviderShared.joinText(textParts) }
})
/**
 * Lower an assistant message into a single OpenAI Chat `assistant` message.
 *
 * Text parts are joined into `content` (`null` when there is no text), tool
 * calls are collected into `tool_calls` (omitted when empty), and
 * `reasoning_content` is taken from the message's native OpenAI-compatible
 * payload when present. Fails with `ProviderShared.unsupportedContent` on the
 * first part that is neither text nor a tool call.
 */
const lowerAssistantMessage = Effect.fn("OpenAIChat.lowerAssistantMessage")(function* (
  message: OpenAIChatRequestMessage,
) {
  const textParts: TextPart[] = []
  const calls: OpenAIChatAssistantToolCall[] = []
  for (const part of message.content) {
    if (!ProviderShared.supportsContent(part, ["text", "tool-call"]))
      return yield* ProviderShared.unsupportedContent("OpenAI Chat", "assistant", ["text", "tool-call"])
    switch (part.type) {
      case "text":
        textParts.push(part)
        break
      case "tool-call":
        calls.push(lowerToolCall(part))
        break
    }
  }
  return {
    role: "assistant" as const,
    content: textParts.length > 0 ? ProviderShared.joinText(textParts) : null,
    tool_calls: calls.length > 0 ? calls : undefined,
    reasoning_content: openAICompatibleReasoningContent(message.native?.openaiCompatible),
  }
})
/**
 * Lower a tool message into one OpenAI Chat `tool` message per tool result,
 * each answering its own `tool_call_id`. Fails with
 * `ProviderShared.unsupportedContent` on the first part that is not a tool
 * result.
 */
const lowerToolMessages = Effect.fn("OpenAIChat.lowerToolMessages")(function* (message: OpenAIChatRequestMessage) {
  const lowered: OpenAIChatMessage[] = []
  for (const part of message.content) {
    if (ProviderShared.supportsContent(part, ["tool-result"])) {
      lowered.push({ role: "tool", tool_call_id: part.id, content: ProviderShared.toolResultText(part) })
      continue
    }
    return yield* ProviderShared.unsupportedContent("OpenAI Chat", "tool", ["tool-result"])
  }
  return lowered
})
/**
 * Lower one common message into the OpenAI Chat messages it expands to: user
 * and assistant messages yield exactly one entry, any other role is handled
 * as a tool message and may yield several.
 */
const lowerMessage = Effect.fn("OpenAIChat.lowerMessage")(function* (message: OpenAIChatRequestMessage) {
  switch (message.role) {
    case "user":
      return [yield* lowerUserMessage(message)]
    case "assistant":
      return [yield* lowerAssistantMessage(message)]
    default:
      return yield* lowerToolMessages(message)
  }
})
/**
 * Lower the whole request into the OpenAI Chat message list: an optional
 * leading `system` message (all system parts joined) followed by every
 * lowered conversation message, in order.
 */
const lowerMessages = Effect.fn("OpenAIChat.lowerMessages")(function* (request: LLMRequest) {
  const system: OpenAIChatMessage[] = []
  if (request.system.length > 0) {
    system.push({ role: "system", content: ProviderShared.joinText(request.system) })
  }
  const lowered = yield* Effect.forEach(request.messages, lowerMessage)
  return [...system, ...Arr.flatten(lowered)]
})
/**
 * Lower OpenAI-specific request options into body fields.
 *
 * Emits `store` only when it is set and `reasoning_effort` only when one is
 * requested. Fails with an invalid-request error when the requested reasoning
 * effort is not one `OpenAIOptions.isReasoningEffort` accepts.
 */
const lowerOptions = Effect.fn("OpenAIChat.lowerOptions")(function* (request: LLMRequest) {
  const store = OpenAIOptions.store(request)
  const reasoningEffort = OpenAIOptions.reasoningEffort(request)
  if (reasoningEffort && !OpenAIOptions.isReasoningEffort(reasoningEffort)) {
    return yield* invalid(`OpenAI Chat does not support reasoning effort ${reasoningEffort}`)
  }
  return {
    ...(store === undefined ? {} : { store }),
    ...(reasoningEffort ? { reasoning_effort: reasoningEffort } : {}),
  }
})
/**
 * Builds the OpenAI Chat request body from a common `LLMRequest`.
 *
 * `fromRequest` returns the provider body only. Endpoint, auth, framing,
 * validation, and HTTP execution are composed by `Route.make`.
 */
const fromRequest = Effect.fn("OpenAIChat.fromRequest")(function* (request: LLMRequest) {
  // Effectful lowering runs in the same order as the body fields are laid out.
  const messages = yield* lowerMessages(request)
  const tools = request.tools.length > 0 ? request.tools.map(lowerTool) : undefined
  const toolChoice = request.toolChoice ? yield* lowerToolChoice(request.toolChoice) : undefined
  const options = yield* lowerOptions(request)
  const sampling = request.generation
  return {
    model: request.model.id,
    messages,
    tools,
    tool_choice: toolChoice,
    stream: true as const,
    stream_options: { include_usage: true },
    max_tokens: sampling?.maxTokens,
    temperature: sampling?.temperature,
    top_p: sampling?.topP,
    frequency_penalty: sampling?.frequencyPenalty,
    presence_penalty: sampling?.presencePenalty,
    seed: sampling?.seed,
    stop: sampling?.stop,
    ...options,
  }
})
// =============================================================================
// Stream Parsing
// =============================================================================
// Streaming parsers are small state machines: every event returns a new state
// plus the common `LLMEvent`s produced by that event. Tool calls are accumulated
// because OpenAI streams JSON arguments across multiple deltas.
/**
 * Translates an OpenAI Chat `finish_reason` into the common `FinishReason`.
 * Both `function_call` and `tool_calls` map to `"tool-calls"`; anything
 * unrecognised (including `null` / `undefined`) maps to `"unknown"`.
 */
const mapFinishReason = (reason: string | null | undefined): FinishReason => {
  switch (reason) {
    case "stop":
      return "stop"
    case "length":
      return "length"
    case "content_filter":
      return "content-filter"
    case "function_call":
    case "tool_calls":
      return "tool-calls"
    default:
      return "unknown"
  }
}
/**
 * Converts an OpenAI Chat `usage` payload into the common `Usage` record.
 * Returns `undefined` when the event carries no usage. The raw payload is
 * kept on `native`.
 */
const mapUsage = (usage: OpenAIChatEvent["usage"]): Usage | undefined => {
  if (!usage) return undefined
  const { prompt_tokens: promptTokens, completion_tokens: completionTokens } = usage
  return new Usage({
    inputTokens: promptTokens,
    outputTokens: completionTokens,
    reasoningTokens: usage.completion_tokens_details?.reasoning_tokens,
    cacheReadInputTokens: usage.prompt_tokens_details?.cached_tokens,
    totalTokens: ProviderShared.totalTokens(promptTokens, completionTokens, usage.total_tokens),
    native: usage,
  })
}
/**
 * One transition of the OpenAI Chat stream parser.
 *
 * Takes the previous parser state and one decoded stream event and returns a
 * `[nextState, events]` pair. Text deltas and tool-input events are emitted
 * immediately; finalized tool-call events are stored on the state
 * (`toolCallEvents`) rather than emitted here.
 */
const step = (state: ParserState, event: OpenAIChatEvent) =>
Effect.gen(function* () {
const events: LLMEvent[] = []
// Usage on this event replaces the carried value; otherwise the previous one is kept.
const usage = mapUsage(event.usage) ?? state.usage
// Only the first choice is inspected.
const choice = event.choices[0]
// A finish reason seen on an earlier event is carried forward when this event has none.
const finishReason = choice?.finish_reason ? mapFinishReason(choice.finish_reason) : state.finishReason
const delta = choice?.delta
const toolDeltas = delta?.tool_calls ?? []
let tools = state.tools
// Empty-string content is falsy and therefore produces no text-delta event.
if (delta?.content) events.push({ type: "text-delta", text: delta.content })
for (const tool of toolDeltas) {
// Tool calls are keyed by `tool.index`; id/name may be absent on this delta.
const result = ToolStream.appendOrStart(
ADAPTER,
tools,
tool.index,
{ id: tool.id ?? undefined, name: tool.function?.name ?? undefined, text: tool.function?.arguments ?? "" },
"OpenAI Chat tool call delta is missing id or name",
)
if (ToolStream.isError(result)) return yield* result
tools = result.tools
if (result.event) events.push(result.event)
}
// Finalize accumulated tool inputs eagerly when finish_reason arrives so
// JSON parse failures fail the stream at the boundary rather than at halt.
// The `state.finishReason === undefined` check makes this run only on the
// first event that establishes a finish reason.
const finished =
finishReason !== undefined && state.finishReason === undefined && Object.keys(tools).length > 0
? yield* ToolStream.finishAll(ADAPTER, tools)
: undefined
return [
{
tools: finished?.tools ?? tools,
toolCallEvents: finished?.events ?? state.toolCallEvents,
usage,
finishReason,
},
events,
] as const
})
/**
 * Produces the events to emit once the stream has halted: every buffered
 * tool-call event, followed by a `request-finish` event when a finish reason
 * is known. A `"stop"` reason is reported as `"tool-calls"` when tool calls
 * were buffered.
 */
const finishEvents = (state: ParserState): ReadonlyArray<LLMEvent> => {
  const out: LLMEvent[] = [...state.toolCallEvents]
  const stoppedWithToolCalls = state.finishReason === "stop" && state.toolCallEvents.length > 0
  const reason = stoppedWithToolCalls ? "tool-calls" : state.finishReason
  if (reason) out.push({ type: "request-finish", reason, usage: state.usage })
  return out
}
// =============================================================================
// Protocol And OpenAI Route
// =============================================================================
/**
* The OpenAI Chat protocol — request body construction, body schema, and the
* streaming-event state machine. Reused by every route that speaks OpenAI Chat
* over HTTP+SSE: native OpenAI, DeepSeek, TogetherAI, Cerebras, Baseten,
* Fireworks, DeepInfra, and (once added) Azure OpenAI Chat.
*/
export const protocol = Protocol.make({
// The native OpenAI route in this file reuses the same ADAPTER value as its route id.
id: ADAPTER,
body: {
// Schema paired with the request body that `fromRequest` builds.
schema: OpenAIChatBody,
from: fromRequest,
},
stream: {
// Presumably decodes each framed stream element as JSON against OpenAIChatEvent — confirm in Protocol.jsonEvent.
event: Protocol.jsonEvent(OpenAIChatEvent),
// Fresh parser state: no partial tool calls and no finalized tool-call events.
initial: () => ({ tools: ToolStream.empty<number>(), toolCallEvents: [] }),
step,
// `finishEvents` turns the final state into buffered tool-call events plus `request-finish`.
onHalt: finishEvents,
},
})
// Synchronous encoder from a body value to a JSON string, via the OpenAIChatBody schema.
const encodeBody = Schema.encodeSync(Schema.fromJsonString(OpenAIChatBody))
/**
 * HTTP transport for OpenAI Chat: requests go to `PATH` with bearer auth, and
 * the response is framed as SSE.
 */
export const httpTransport = HttpTransport.httpJson({
endpoint: Endpoint.path(PATH),
auth: Auth.bearer(),
framing: Framing.sse,
encodeBody,
})
/**
 * Native OpenAI Chat route: the shared `protocol` over `httpTransport`, with
 * `DEFAULT_BASE_URL` as the default base URL.
 */
export const route = Route.make({
id: ADAPTER,
provider: "openai",
protocol,
transport: httpTransport,
defaults: {
baseURL: DEFAULT_BASE_URL,
},
})
// =============================================================================
// Model Helper
// =============================================================================
/** Model helper exposed by the native OpenAI Chat route. */
export const model = route.model
// Self re-export so consumers can import this module as the `OpenAIChat` namespace.
export * as OpenAIChat from "./openai-chat"

View File

@@ -0,0 +1,28 @@
import { Route, type RouteRoutedModelInput } from "../route/client"
import { Endpoint } from "../route/endpoint"
import { Framing } from "../route/framing"
import * as OpenAIChat from "./openai-chat"
// Route id for OpenAI-compatible providers; distinct from the native OpenAI Chat route id.
const ADAPTER = "openai-compatible-chat"
/**
 * Model input for OpenAI-compatible routes. Same as `RouteRoutedModelInput`
 * except that `baseURL` is replaced by a required string.
 */
export type OpenAICompatibleChatModelInput = Omit<RouteRoutedModelInput, "baseURL"> & {
readonly baseURL: string
}
/**
* Route for non-OpenAI providers that expose an OpenAI Chat-compatible
* `/chat/completions` endpoint. Reuses `OpenAIChat.protocol` end-to-end and
* overrides only the route id so providers can be resolved per-family without
* colliding with native OpenAI. The model carries the host on `baseURL`,
* supplied by whichever profile/provider helper builds it.
*/
// No default base URL is configured here; the model input type requires the caller to supply `baseURL`.
// NOTE(review): the native OpenAI routes in this change pass a `transport` built with
// `HttpTransport.httpJson({ endpoint, auth, framing, encodeBody })`, while this call passes
// `endpoint` / `framing` directly and no `auth` — confirm `Route.make` accepts this shape.
export const route = Route.make({
id: ADAPTER,
protocol: OpenAIChat.protocol,
endpoint: Endpoint.path("/chat/completions"),
framing: Framing.sse,
})
/** Model helper for this route, typed so that `baseURL` is mandatory. */
export const model = Route.model<OpenAICompatibleChatModelInput>(route)
// Self re-export so consumers can import this module as the `OpenAICompatibleChat` namespace.
export * as OpenAICompatibleChat from "./openai-compatible-chat"

View File

@@ -0,0 +1,575 @@
import { Effect, Schema } from "effect"
import { Route } from "../route/client"
import { Auth } from "../route/auth"
import { Endpoint } from "../route/endpoint"
import { Framing } from "../route/framing"
import { HttpTransport, WebSocketTransport } from "../route/transport"
import { Protocol } from "../route/protocol"
import {
Usage,
type FinishReason,
type LLMEvent,
type LLMRequest,
type ProviderMetadata,
type TextPart,
type ToolCallPart,
type ToolDefinition,
} from "../schema"
import { JsonObject, optionalArray, optionalNull, ProviderShared } from "./shared"
import { OpenAIOptions } from "./utils/openai-options"
import { ToolStream } from "./utils/tool-stream"
// Id shared by the Responses protocol and its HTTP route; the WebSocket route appends `-websocket`.
const ADAPTER = "openai-responses"
/** Default base URL used by both the HTTP and WebSocket OpenAI Responses routes. */
export const DEFAULT_BASE_URL = "https://api.openai.com/v1"
/** Endpoint path appended to the base URL for Responses requests. */
export const PATH = "/responses"
// =============================================================================
// Request Body Schema
// =============================================================================
// Text content block used inside `user` input items.
const OpenAIResponsesInputText = Schema.Struct({
type: Schema.tag("input_text"),
text: Schema.String,
})
// Text content block used inside `assistant` input items.
const OpenAIResponsesOutputText = Schema.Struct({
type: Schema.tag("output_text"),
text: Schema.String,
})
// One element of the request `input` array. Role-tagged members carry message
// content; `function_call` / `function_call_output` members carry a tool call
// and its result, linked by `call_id`.
const OpenAIResponsesInputItem = Schema.Union([
Schema.Struct({ role: Schema.tag("system"), content: Schema.String }),
Schema.Struct({ role: Schema.tag("user"), content: Schema.Array(OpenAIResponsesInputText) }),
Schema.Struct({ role: Schema.tag("assistant"), content: Schema.Array(OpenAIResponsesOutputText) }),
Schema.Struct({
type: Schema.tag("function_call"),
call_id: Schema.String,
name: Schema.String,
// JSON-encoded tool input (see `lowerToolCall`).
arguments: Schema.String,
}),
Schema.Struct({
type: Schema.tag("function_call_output"),
call_id: Schema.String,
output: Schema.String,
}),
])
type OpenAIResponsesInputItem = Schema.Schema.Type<typeof OpenAIResponsesInputItem>
// Function tool definition sent in the request `tools` array.
const OpenAIResponsesTool = Schema.Struct({
type: Schema.tag("function"),
name: Schema.String,
description: Schema.String,
parameters: JsonObject,
// Accepted by the schema; `lowerTool` in this file does not set it.
strict: Schema.optional(Schema.Boolean),
})
type OpenAIResponsesTool = Schema.Schema.Type<typeof OpenAIResponsesTool>
// Either one of the three mode literals or a specific function selected by name.
const OpenAIResponsesToolChoice = Schema.Union([
Schema.Literals(["auto", "none", "required"]),
Schema.Struct({ type: Schema.tag("function"), name: Schema.String }),
])
// Fields shared between the HTTP body and the WebSocket `response.create`
// message. The HTTP body adds `stream: true`; the WebSocket message adds
// `type: "response.create"`. Defining the shared shape once keeps the two
// transports in sync without a destructure-and-strip dance.
const OpenAIResponsesCoreFields = {
model: Schema.String,
input: Schema.Array(OpenAIResponsesInputItem),
tools: optionalArray(OpenAIResponsesTool),
tool_choice: Schema.optional(OpenAIResponsesToolChoice),
store: Schema.optional(Schema.Boolean),
prompt_cache_key: Schema.optional(Schema.String),
// The only `include` value this schema accepts is encrypted reasoning content.
include: optionalArray(Schema.Literal("reasoning.encrypted_content")),
reasoning: Schema.optional(
Schema.Struct({
effort: Schema.optional(OpenAIOptions.OpenAIReasoningEffort),
summary: Schema.optional(Schema.Literal("auto")),
}),
),
text: Schema.optional(
Schema.Struct({
verbosity: Schema.optional(OpenAIOptions.OpenAITextVerbosity),
}),
),
max_output_tokens: Schema.optional(Schema.Number),
temperature: Schema.optional(Schema.Number),
top_p: Schema.optional(Schema.Number),
}
// HTTP request body: the shared fields plus a mandatory `stream: true`.
const OpenAIResponsesBody = Schema.Struct({
...OpenAIResponsesCoreFields,
stream: Schema.Literal(true),
})
export type OpenAIResponsesBody = Schema.Schema.Type<typeof OpenAIResponsesBody>
// WebSocket `response.create` message: the shared fields plus the `type` tag.
// The rest record lets additional string-keyed properties pass through.
const OpenAIResponsesWebSocketMessage = Schema.StructWithRest(
Schema.Struct({
type: Schema.tag("response.create"),
...OpenAIResponsesCoreFields,
}),
[Schema.Record(Schema.String, Schema.Unknown)],
)
type OpenAIResponsesWebSocketMessage = Schema.Schema.Type<typeof OpenAIResponsesWebSocketMessage>
// Synchronous encoder from a WebSocket message value to a JSON string.
const encodeWebSocketMessage = Schema.encodeSync(Schema.fromJsonString(OpenAIResponsesWebSocketMessage))
// Usage block found on `response` objects; every counter is optional and the
// detail objects may also be null.
const OpenAIResponsesUsage = Schema.Struct({
input_tokens: Schema.optional(Schema.Number),
input_tokens_details: optionalNull(Schema.Struct({ cached_tokens: Schema.optional(Schema.Number) })),
output_tokens: Schema.optional(Schema.Number),
output_tokens_details: optionalNull(Schema.Struct({ reasoning_tokens: Schema.optional(Schema.Number) })),
total_tokens: Schema.optional(Schema.Number),
})
type OpenAIResponsesUsage = Schema.Schema.Type<typeof OpenAIResponsesUsage>
// Output item carried by `response.output_item.*` events. `type` is an open
// string so unknown item kinds still decode.
const OpenAIResponsesStreamItem = Schema.Struct({
type: Schema.String,
id: Schema.optional(Schema.String),
call_id: Schema.optional(Schema.String),
name: Schema.optional(Schema.String),
arguments: Schema.optional(Schema.String),
// Hosted (provider-executed) tool fields. Each hosted tool item carries its
// own subset of these — we capture them generically so we can surface the
// call's typed input portion and round-trip the full result payload without
// hand-rolling a per-tool schema.
status: Schema.optional(Schema.String),
action: Schema.optional(Schema.Unknown),
queries: Schema.optional(Schema.Unknown),
results: Schema.optional(Schema.Unknown),
code: Schema.optional(Schema.String),
container_id: Schema.optional(Schema.String),
outputs: Schema.optional(Schema.Unknown),
server_label: Schema.optional(Schema.String),
output: Schema.optional(Schema.Unknown),
error: Schema.optional(Schema.Unknown),
})
type OpenAIResponsesStreamItem = Schema.Schema.Type<typeof OpenAIResponsesStreamItem>
// Single schema for every stream event. `type` selects the handler in `step`;
// all other fields are optional because each event kind populates only some.
const OpenAIResponsesEvent = Schema.Struct({
type: Schema.String,
delta: Schema.optional(Schema.String),
item_id: Schema.optional(Schema.String),
item: Schema.optional(OpenAIResponsesStreamItem),
response: Schema.optional(
Schema.Struct({
id: Schema.optional(Schema.String),
service_tier: Schema.optional(Schema.String),
incomplete_details: optionalNull(Schema.Struct({ reason: Schema.String })),
usage: optionalNull(OpenAIResponsesUsage),
}),
),
// `code` / `message` are read by the error handlers (`onResponseFailed`, `onError`).
code: Schema.optional(Schema.String),
message: Schema.optional(Schema.String),
})
type OpenAIResponsesEvent = Schema.Schema.Type<typeof OpenAIResponsesEvent>
/** State threaded through the Responses stream parser. */
interface ParserState {
// In-flight tool calls, keyed by the output item id.
readonly tools: ToolStream.State<string>
// Set once a function call has been finalized; feeds `mapFinishReason`.
readonly hasFunctionCall: boolean
}
// Local alias for the shared invalid-request error constructor.
const invalid = ProviderShared.invalidRequest
// =============================================================================
// Request Lowering
// =============================================================================
/**
 * Lowers a common tool definition into an OpenAI Responses function tool.
 * The tool's `inputSchema` is sent as `parameters`; `strict` is left unset.
 */
const lowerTool = (tool: ToolDefinition): OpenAIResponsesTool => {
  const { name, description, inputSchema } = tool
  return { type: "function", name, description, parameters: inputSchema }
}
/**
 * Lowers the common tool-choice setting into the OpenAI Responses form: the
 * three mode literals pass through unchanged, and a named tool becomes
 * `{ type: "function", name }`. Matching is delegated to
 * `ProviderShared.matchToolChoice`.
 */
const lowerToolChoice = (toolChoice: NonNullable<LLMRequest["toolChoice"]>) => {
  const cases = {
    auto: () => "auto" as const,
    none: () => "none" as const,
    required: () => "required" as const,
    tool: (name: string) => ({ type: "function" as const, name }),
  }
  return ProviderShared.matchToolChoice("OpenAI Responses", toolChoice, cases)
}
/**
 * Lowers an assistant tool-call part into a `function_call` input item, with
 * the tool input JSON-encoded into `arguments`.
 */
const lowerToolCall = (part: ToolCallPart): OpenAIResponsesInputItem => {
  const { id, name, input } = part
  return {
    type: "function_call",
    call_id: id,
    name,
    arguments: ProviderShared.encodeJson(input),
  }
}
/**
 * Lowers the request's system prompt and messages into the OpenAI Responses
 * `input` item list.
 *
 * - System parts are joined into one leading `system` item (omitted if empty).
 * - A user message becomes one `user` item of `input_text` blocks.
 * - An assistant message becomes `assistant` items (runs of consecutive text
 *   parts) interleaved with `function_call` items, in the order the parts
 *   appear in the message. Previously all text of a message was emitted after
 *   its tool calls, which reordered "say something, then call a tool" turns.
 * - Any other message becomes one `function_call_output` item per tool result.
 *
 * Fails with the shared unsupported-content error on the first part a role
 * does not accept.
 */
const lowerMessages = Effect.fn("OpenAIResponses.lowerMessages")(function* (request: LLMRequest) {
  const input: OpenAIResponsesInputItem[] =
    request.system.length === 0 ? [] : [{ role: "system", content: ProviderShared.joinText(request.system) }]
  for (const message of request.messages) {
    if (message.role === "user") {
      const content: TextPart[] = []
      for (const part of message.content) {
        if (!ProviderShared.supportsContent(part, ["text"]))
          return yield* ProviderShared.unsupportedContent("OpenAI Responses", "user", ["text"])
        content.push(part)
      }
      input.push({ role: "user", content: content.map((part) => ({ type: "input_text", text: part.text })) })
      continue
    }
    if (message.role === "assistant") {
      // Text seen since the last emitted item; flushed before each tool call
      // and at the end so item order mirrors part order.
      let pendingText: TextPart[] = []
      const flushText = () => {
        if (pendingText.length === 0) return
        input.push({
          role: "assistant",
          content: pendingText.map((part) => ({ type: "output_text", text: part.text })),
        })
        pendingText = []
      }
      for (const part of message.content) {
        if (!ProviderShared.supportsContent(part, ["text", "tool-call"]))
          return yield* ProviderShared.unsupportedContent("OpenAI Responses", "assistant", ["text", "tool-call"])
        if (part.type === "text") {
          pendingText.push(part)
          continue
        }
        flushText()
        input.push(lowerToolCall(part))
      }
      flushText()
      continue
    }
    for (const part of message.content) {
      if (!ProviderShared.supportsContent(part, ["tool-result"]))
        return yield* ProviderShared.unsupportedContent("OpenAI Responses", "tool", ["tool-result"])
      input.push({ type: "function_call_output", call_id: part.id, output: ProviderShared.toolResultText(part) })
    }
  }
  return input
})
/**
 * Lowers OpenAI-specific request options into Responses body fields. Each
 * field is included only when its option is set (`store` whenever defined,
 * the rest when truthy). An effort value rejected by
 * `OpenAIOptions.isReasoningEffort` fails the request as invalid.
 */
const lowerOptions = Effect.fn("OpenAIResponses.lowerOptions")(function* (request: LLMRequest) {
  const store = OpenAIOptions.store(request)
  const promptCacheKey = OpenAIOptions.promptCacheKey(request)
  const effort = OpenAIOptions.reasoningEffort(request)
  if (effort && !OpenAIOptions.isReasoningEffort(effort))
    return yield* invalid(`OpenAI Responses does not support reasoning effort ${effort}`)
  const summary = OpenAIOptions.reasoningSummary(request)
  const encryptedState = OpenAIOptions.encryptedReasoning(request)
  const verbosity = OpenAIOptions.textVerbosity(request)

  const storeField = store === undefined ? {} : { store }
  const cacheKeyField = promptCacheKey ? { prompt_cache_key: promptCacheKey } : {}
  const includeField = encryptedState ? { include: ["reasoning.encrypted_content"] as const } : {}
  // `reasoning` is emitted when either sub-option is set; the other may be undefined.
  const reasoningField = effort || summary ? { reasoning: { effort, summary } } : {}
  const textField = verbosity ? { text: { verbosity } } : {}
  return { ...storeField, ...cacheKeyField, ...includeField, ...reasoningField, ...textField }
})
/**
 * Builds the OpenAI Responses HTTP request body from a common `LLMRequest`.
 * Streaming is always enabled; sampling fields are copied from
 * `request.generation` when present.
 */
const fromRequest = Effect.fn("OpenAIResponses.fromRequest")(function* (request: LLMRequest) {
  // Effectful lowering runs in the same order as the body fields are laid out.
  const input = yield* lowerMessages(request)
  const tools = request.tools.length > 0 ? request.tools.map(lowerTool) : undefined
  const toolChoice = request.toolChoice ? yield* lowerToolChoice(request.toolChoice) : undefined
  const options = yield* lowerOptions(request)
  const sampling = request.generation
  return {
    model: request.model.id,
    input,
    tools,
    tool_choice: toolChoice,
    stream: true as const,
    max_output_tokens: sampling?.maxTokens,
    temperature: sampling?.temperature,
    top_p: sampling?.topP,
    ...options,
  }
})
// =============================================================================
// Stream Parsing
// =============================================================================
/**
 * Converts an OpenAI Responses `usage` payload into the common `Usage`
 * record. Returns `undefined` when usage is absent or null. The raw payload
 * is kept on `native`.
 */
const mapUsage = (usage: OpenAIResponsesUsage | null | undefined) => {
  if (!usage) return undefined
  const { input_tokens: inputTokens, output_tokens: outputTokens } = usage
  return new Usage({
    inputTokens,
    outputTokens,
    reasoningTokens: usage.output_tokens_details?.reasoning_tokens,
    cacheReadInputTokens: usage.input_tokens_details?.cached_tokens,
    totalTokens: ProviderShared.totalTokens(inputTokens, outputTokens, usage.total_tokens),
    native: usage,
  })
}
/**
 * Derives the common finish reason from a terminal Responses event.
 * `max_output_tokens` and `content_filter` incomplete reasons map to
 * `"length"` and `"content-filter"`. Otherwise a finalized function call
 * yields `"tool-calls"`; without one the result is `"stop"` when no
 * incomplete reason was given and `"unknown"` when an unrecognised one was.
 */
const mapFinishReason = (event: OpenAIResponsesEvent, hasFunctionCall: boolean): FinishReason => {
  const reason = event.response?.incomplete_details?.reason
  switch (reason) {
    case "max_output_tokens":
      return "length"
    case "content_filter":
      return "content-filter"
  }
  if (hasFunctionCall) return "tool-calls"
  return reason == null ? "stop" : "unknown"
}
/** Wraps provider-specific fields under the `openai` key of `ProviderMetadata`. */
const openaiMetadata = (metadata: Record<string, unknown>): ProviderMetadata => {
  return { openai: metadata }
}
// Hosted tool items (provider-executed) ship their typed input + status +
// result fields all in one item. We expose them as a `tool-call` +
// `tool-result` pair so consumers can treat them uniformly with client tools,
// only differentiated by `providerExecuted: true`.
//
// One record per OpenAI Responses item type that represents a hosted
// (provider-executed) tool call: the common name we surface, plus an `input`
// extractor that picks the fields the model actually populated for that tool.
// Falling back to `{}` when an entry isn't fully typed keeps unknown tools
// observable without rolling a per-tool schema.
const HOSTED_TOOLS = {
  web_search_call: { name: "web_search", input: (item) => item.action ?? {} },
  web_search_preview_call: { name: "web_search_preview", input: (item) => item.action ?? {} },
  file_search_call: { name: "file_search", input: (item) => ({ queries: item.queries ?? [] }) },
  code_interpreter_call: {
    name: "code_interpreter",
    input: (item) => ({ code: item.code, container_id: item.container_id }),
  },
  computer_use_call: { name: "computer_use", input: (item) => item.action ?? {} },
  image_generation_call: { name: "image_generation", input: () => ({}) },
  mcp_call: {
    name: "mcp",
    input: (item) => ({ server_label: item.server_label, name: item.name, arguments: item.arguments }),
  },
  local_shell_call: { name: "local_shell", input: (item) => item.action ?? {} },
} as const satisfies Record<
  string,
  { readonly name: string; readonly input: (item: OpenAIResponsesStreamItem) => unknown }
>
type HostedToolType = keyof typeof HOSTED_TOOLS
/**
 * Narrows a stream item to a hosted (provider-executed) tool item: its `type`
 * must be one of the keys declared in `HOSTED_TOOLS` and it must carry a
 * non-empty string `id`.
 *
 * `item.type` is an arbitrary provider-supplied string, so the lookup must be
 * an own-property check. The `in` operator also matches inherited
 * `Object.prototype` members (`"toString"`, `"constructor"`, ...), which would
 * let such an item through and make the caller invoke a non-existent `input`
 * extractor.
 */
const isHostedToolItem = (
  item: OpenAIResponsesStreamItem,
): item is OpenAIResponsesStreamItem & { type: HostedToolType; id: string } =>
  Object.prototype.hasOwnProperty.call(HOSTED_TOOLS, item.type) && typeof item.id === "string" && item.id.length > 0
// Round-trip the full item as the structured result so consumers can extract
// outputs / sources / status without re-decoding.
/**
 * Builds the structured result for a hosted tool item. When the item carries
 * a non-null `error`, the result is an `error` holding that value; otherwise
 * the whole item is returned as a `json` result.
 */
const hostedToolResult = (item: OpenAIResponsesStreamItem) => {
  if (item.error === undefined || item.error === null) return { type: "json" as const, value: item }
  return { type: "error" as const, value: item.error }
}
/**
 * Expands one hosted tool item into a `tool-call` / `tool-result` event pair.
 * Both events share the item id, the common tool name from `HOSTED_TOOLS`,
 * `providerExecuted: true`, and the same provider metadata object.
 */
const hostedToolEvents = (
  item: OpenAIResponsesStreamItem & { type: HostedToolType; id: string },
): ReadonlyArray<LLMEvent> => {
  const descriptor = HOSTED_TOOLS[item.type]
  const providerMetadata = openaiMetadata({ itemId: item.id })
  const callEvent: LLMEvent = {
    type: "tool-call",
    id: item.id,
    name: descriptor.name,
    input: descriptor.input(item),
    providerExecuted: true,
    providerMetadata,
  }
  const resultEvent: LLMEvent = {
    type: "tool-result",
    id: item.id,
    name: descriptor.name,
    result: hostedToolResult(item),
    providerExecuted: true,
    providerMetadata,
  }
  return [callEvent, resultEvent]
}
// Return shape of every event handler: the next parser state plus the events to emit.
type StepResult = readonly [ParserState, ReadonlyArray<LLMEvent>]
// Shared empty event list returned by handlers that emit nothing.
const NO_EVENTS: StepResult["1"] = []
// `response.completed` / `response.incomplete` are clean finishes that emit a
// `request-finish` event; `response.failed` is a hard failure that emits a
// `provider-error`. All three end the stream — kept in one set so `step` and
// the protocol's `terminal` predicate stay in sync.
// NOTE(review): `step` compares against these strings directly rather than
// reading this set, so the two lists must be updated together by hand.
const TERMINAL_TYPES = new Set(["response.completed", "response.incomplete", "response.failed"])
/**
 * Handles `response.output_text.delta`: emits one `text-delta` event for a
 * non-empty delta and leaves the state untouched. Provider metadata with the
 * item id is attached only when the event carries an `item_id`.
 */
const onOutputTextDelta = (state: ParserState, event: OpenAIResponsesEvent): StepResult => {
  const { delta, item_id: itemId } = event
  if (!delta) return [state, NO_EVENTS]
  const metadata = itemId ? { providerMetadata: openaiMetadata({ itemId }) } : {}
  return [state, [{ type: "text-delta", id: itemId, text: delta, ...metadata }]]
}
/**
 * Handles `response.output_item.added`: starts tracking a function call under
 * its item id. Items that are not function calls, or that lack an id, are
 * ignored. No events are emitted here.
 */
const onOutputItemAdded = (state: ParserState, event: OpenAIResponsesEvent): StepResult => {
  const added = event.item
  if (added == null || added.type !== "function_call" || !added.id) return [state, NO_EVENTS]
  // The public tool-call id prefers `call_id` and falls back to the item id.
  const tracked = ToolStream.start(state.tools, added.id, {
    id: added.call_id ?? added.id,
    name: added.name ?? "",
    input: added.arguments ?? "",
    providerMetadata: openaiMetadata({ itemId: added.id }),
  })
  return [{ hasFunctionCall: state.hasFunctionCall, tools: tracked }, NO_EVENTS]
}
/**
 * Handles `response.function_call_arguments.delta`: appends the delta to the
 * tool call already tracked under `item_id`. Events without an item id or a
 * non-empty delta are ignored; an error result from `ToolStream.appendExisting`
 * fails the stream.
 */
const onFunctionCallArgumentsDelta = Effect.fn("OpenAIResponses.onFunctionCallArgumentsDelta")(function* (
  state: ParserState,
  event: OpenAIResponsesEvent,
) {
  const { item_id: itemId, delta } = event
  if (!itemId || !delta) return [state, NO_EVENTS] satisfies StepResult
  const appended = ToolStream.appendExisting(
    ADAPTER,
    state.tools,
    itemId,
    delta,
    "OpenAI Responses tool argument delta is missing its tool call",
  )
  if (ToolStream.isError(appended)) return yield* appended
  const emitted = appended.event ? [appended.event] : NO_EVENTS
  return [{ hasFunctionCall: state.hasFunctionCall, tools: appended.tools }, emitted] satisfies StepResult
})
/**
 * Handles `response.output_item.done`.
 *
 * - Function calls are finalized: with the item's own `arguments` when
 *   present, otherwise with the input accumulated from deltas. A call that
 *   was never announced via `output_item.added` is started here first.
 *   Function-call items missing `id`, `call_id` or `name` are ignored.
 * - Hosted tool items expand to a `tool-call` + `tool-result` pair.
 * - Everything else is ignored.
 */
const onOutputItemDone = Effect.fn("OpenAIResponses.onOutputItemDone")(function* (
  state: ParserState,
  event: OpenAIResponsesEvent,
) {
  const item = event.item
  if (!item) return [state, NO_EVENTS] satisfies StepResult
  if (item.type !== "function_call") {
    if (isHostedToolItem(item)) return [state, hostedToolEvents(item)] satisfies StepResult
    return [state, NO_EVENTS] satisfies StepResult
  }
  if (!item.id || !item.call_id || !item.name) return [state, NO_EVENTS] satisfies StepResult
  const alreadyTracked = Boolean(state.tools[item.id])
  const tracked = alreadyTracked
    ? state.tools
    : ToolStream.start(state.tools, item.id, { id: item.call_id, name: item.name })
  const result =
    item.arguments !== undefined
      ? yield* ToolStream.finishWithInput(ADAPTER, tracked, item.id, item.arguments)
      : yield* ToolStream.finish(ADAPTER, tracked, item.id)
  const emitted = result.event ? [result.event] : NO_EVENTS
  return [
    { hasFunctionCall: result.event ? true : state.hasFunctionCall, tools: result.tools },
    emitted,
  ] satisfies StepResult
})
/**
 * Handles `response.completed` / `response.incomplete`: emits a single
 * `request-finish` event with the mapped finish reason and usage. Provider
 * metadata (response id, service tier) is attached only when at least one of
 * the two is present on the response.
 */
const onResponseFinish = (state: ParserState, event: OpenAIResponsesEvent): StepResult => {
  const response = event.response
  const reason = mapFinishReason(event, state.hasFunctionCall)
  const usage = mapUsage(response?.usage)
  const metadata =
    response?.id || response?.service_tier
      ? { providerMetadata: openaiMetadata({ responseId: response.id, serviceTier: response.service_tier }) }
      : {}
  return [state, [{ type: "request-finish", reason, usage, ...metadata }]]
}
/**
 * Handles `response.failed`: emits a `provider-error` whose message is the
 * event's `message`, else its `code`, else a fixed fallback. State is unchanged.
 */
const onResponseFailed = (state: ParserState, event: OpenAIResponsesEvent): StepResult => {
  const message = event.message ?? event.code ?? "OpenAI Responses response failed"
  return [state, [{ type: "provider-error", message }]]
}
/**
 * Handles a stream-level `error` event: emits a `provider-error` whose message
 * is the event's `message`, else its `code`, else a fixed fallback. State is
 * unchanged.
 */
const onError = (state: ParserState, event: OpenAIResponsesEvent): StepResult => {
  const message = event.message ?? event.code ?? "OpenAI Responses stream error"
  return [state, [{ type: "provider-error", message }]]
}
/**
 * One transition of the OpenAI Responses stream parser: routes the event to
 * the handler for its `type`. Pure handlers are lifted with `Effect.succeed`;
 * the two handlers that can fail already return effects. Unrecognised event
 * types leave the state unchanged and emit nothing.
 */
const step = (state: ParserState, event: OpenAIResponsesEvent) => {
  switch (event.type) {
    case "response.output_text.delta":
      return Effect.succeed(onOutputTextDelta(state, event))
    case "response.output_item.added":
      return Effect.succeed(onOutputItemAdded(state, event))
    case "response.function_call_arguments.delta":
      return onFunctionCallArgumentsDelta(state, event)
    case "response.output_item.done":
      return onOutputItemDone(state, event)
    case "response.completed":
    case "response.incomplete":
      return Effect.succeed(onResponseFinish(state, event))
    case "response.failed":
      return Effect.succeed(onResponseFailed(state, event))
    case "error":
      return Effect.succeed(onError(state, event))
    default:
      return Effect.succeed<StepResult>([state, NO_EVENTS])
  }
}
// =============================================================================
// Protocol And OpenAI Route
// =============================================================================
/**
* The OpenAI Responses protocol — request body construction, body schema, and
* the streaming-event state machine. Used by native OpenAI and (once
* registered) Azure OpenAI Responses.
*/
export const protocol = Protocol.make({
// Shared by the HTTP route id; the WebSocket route derives its id from the same constant.
id: ADAPTER,
body: {
// Schema paired with the request body that `fromRequest` builds.
schema: OpenAIResponsesBody,
from: fromRequest,
},
stream: {
// Presumably decodes each framed stream element as JSON against OpenAIResponsesEvent — confirm in Protocol.jsonEvent.
event: Protocol.jsonEvent(OpenAIResponsesEvent),
// Fresh parser state: no finalized function call and no tracked tool calls.
initial: () => ({ hasFunctionCall: false, tools: ToolStream.empty<string>() }),
step,
// An event is terminal when its type is one of the entries in TERMINAL_TYPES.
terminal: (event) => TERMINAL_TYPES.has(event.type),
},
})
// Synchronous encoder from a body value to a JSON string, via the OpenAIResponsesBody schema.
const encodeBody = Schema.encodeSync(Schema.fromJsonString(OpenAIResponsesBody))
// Transport settings shared by the HTTP and WebSocket transports below.
const transportBase = {
endpoint: Endpoint.path<OpenAIResponsesBody>(PATH),
auth: Auth.bearer(),
encodeBody,
}
// Route defaults shared by the HTTP and WebSocket routes.
const routeDefaults = {
baseURL: DEFAULT_BASE_URL,
}
/** HTTP transport for OpenAI Responses: the shared settings plus SSE framing. */
export const httpTransport = HttpTransport.httpJson({
...transportBase,
framing: Framing.sse,
})
/** Native OpenAI Responses route over HTTP. */
export const route = Route.make({
id: ADAPTER,
provider: "openai",
protocol,
transport: httpTransport,
defaults: routeDefaults,
})
// Validates an unknown value against the WebSocket `response.create` schema.
const decodeWebSocketMessage = ProviderShared.validateWith(Schema.decodeUnknownEffect(OpenAIResponsesWebSocketMessage))
/**
 * Converts an HTTP-style request body into a WebSocket `response.create`
 * message: the `stream` field is dropped, `type` is set, and the result is
 * validated against the message schema. A body that is not a plain object
 * fails as an invalid request.
 */
const webSocketMessage = (body: OpenAIResponsesBody | Record<string, unknown>) =>
  Effect.gen(function* () {
    if (ProviderShared.isRecord(body)) {
      const { stream: _stream, ...rest } = body
      const candidate = { ...rest, type: "response.create" }
      return yield* decodeWebSocketMessage(candidate)
    }
    return yield* ProviderShared.invalidRequest("OpenAI Responses WebSocket body must be a JSON object")
  })
/**
 * WebSocket transport for OpenAI Responses: the shared settings plus the
 * body-to-message conversion and the message encoder.
 */
export const webSocketTransport = WebSocketTransport.json({
...transportBase,
toMessage: webSocketMessage,
encodeMessage: encodeWebSocketMessage,
})
/** OpenAI Responses route over WebSocket; same protocol and defaults as the HTTP route. */
export const webSocketRoute = Route.make({
id: `${ADAPTER}-websocket`,
provider: "openai",
protocol,
transport: webSocketTransport,
defaults: routeDefaults,
})
// =============================================================================
// Model Helper
// =============================================================================
/** Model helper exposed by the HTTP route. */
export const model = route.model
/** Model helper exposed by the WebSocket route. */
export const webSocketModel = webSocketRoute.model
// Self re-export so consumers can import this module as the `OpenAIResponses` namespace.
export * as OpenAIResponses from "./openai-responses"

View File

@@ -0,0 +1,203 @@
import { Buffer } from "node:buffer"
import { Effect, Schema, Stream } from "effect"
import * as Sse from "effect/unstable/encoding/Sse"
import { Headers, HttpClientRequest } from "effect/unstable/http"
import {
InvalidProviderOutputReason,
InvalidRequestReason,
LLMError,
type ContentPart,
type LLMRequest,
type MediaPart,
type ToolResultPart,
} from "../schema"
/** Schema between a JSON string and an arbitrary (`unknown`) value. */
export const Json = Schema.fromJsonString(Schema.Unknown)
/** Synchronous decoder built from `Json`; `parseJson` wraps it to convert a thrown failure into an `LLMError`. */
export const decodeJson = Schema.decodeUnknownSync(Json)
/** Synchronous encoder built from `Json`. */
export const encodeJson = Schema.encodeSync(Json)
/** Schema for a string-keyed record with arbitrary values. */
export const JsonObject = Schema.Record(Schema.String, Schema.Unknown)
/** Optional field holding an array of `schema`. */
export const optionalArray = <const S extends Schema.Top>(schema: S) => Schema.optional(Schema.Array(schema))
/** Optional field that may also be `null`. */
export const optionalNull = <const S extends Schema.Top>(schema: S) => Schema.optional(Schema.NullOr(schema))
/**
* Plain-record narrowing. Excludes arrays so routes checking nested JSON
* Schema fragments don't accidentally treat a tuple as a key/value bag.
*/
export const isRecord = (value: unknown): value is Record<string, unknown> => {
  // `typeof null` is "object", so null is ruled out explicitly.
  if (value === null || typeof value !== "object") return false
  return !Array.isArray(value)
}
/**
* Streaming tool-call accumulator. Adapters that build a tool call across
* multiple `tool-input-delta` chunks store the partial JSON input string here
* and finalize it with `parseToolInput` once the call completes.
*/
export interface ToolAccumulator {
// Identifier of the tool call being accumulated.
readonly id: string
// Name of the tool being called.
readonly name: string
// Partial JSON input text collected so far.
readonly input: string
}
/**
* `Usage.totalTokens` policy shared by every route. Honors a provider-
* supplied total; otherwise falls back to `inputTokens + outputTokens` only
* when at least one is defined. Returns `undefined` when neither input nor
* output is known so routes don't publish a misleading `0`.
*/
export const totalTokens = (
  inputTokens: number | undefined,
  outputTokens: number | undefined,
  total: number | undefined,
) => {
  // A provider-supplied total always wins, even when it is 0.
  if (total !== undefined) return total
  const inputKnown = inputTokens !== undefined
  const outputKnown = outputTokens !== undefined
  if (!inputKnown && !outputKnown) return undefined
  // At least one side is known; the missing side counts as 0.
  return (inputTokens ?? 0) + (outputTokens ?? 0)
}
/**
 * Builds the `LLMError` used for invalid provider output on the stream path.
 *
 * @param route - Route the bad output came from.
 * @param message - Human-readable description of the problem.
 * @param raw - Optional raw payload that failed.
 */
export const eventError = (route: string, message: string, raw?: string) => {
  const reason = new InvalidProviderOutputReason({ route, message, raw })
  return new LLMError({ module: "ProviderShared", method: "stream", reason })
}
/**
 * Parses `input` as JSON inside an effect. A decoding failure becomes an
 * `eventError` carrying the given message and the raw input.
 */
export const parseJson = (route: string, input: string, message: string) => {
  const decode = () => decodeJson(input)
  const toError = () => eventError(route, message, input)
  return Effect.try({ try: decode, catch: toError })
}
/**
* Join the `text` field of a list of parts with newlines. Used by routes
* that flatten system / message content arrays into a single provider string
* (OpenAI Chat `system` content, OpenAI Responses `system` content, Gemini
* `systemInstruction.parts[].text`).
*/
export const joinText = (parts: ReadonlyArray<{ readonly text: string }>) => {
  const lines: string[] = []
  for (const { text } of parts) lines.push(text)
  return lines.join("\n")
}
/**
* Parse the streamed JSON input of a tool call. Treats an empty string as
* `"{}"` — providers occasionally finish a tool call without ever emitting
* input deltas (e.g. zero-arg tools). The error message is uniform across
* routes: `Invalid JSON input for <route> tool call <name>`.
*/
export const parseToolInput = (route: string, name: string, raw: string) => {
  // An empty input string is parsed as an empty JSON object.
  const payload = raw ? raw : "{}"
  const failure = `Invalid JSON input for ${route} tool call ${name}`
  return parseJson(route, payload, failure)
}
/**
* Encode a `MediaPart`'s raw bytes for inclusion in a JSON request body.
* `data: string` is assumed to already be base64 (matches caller convention
* across Gemini / Bedrock); `data: Uint8Array` is base64-encoded here. Used
* by every route that supports image / document inputs.
*/
export const mediaBytes = (part: MediaPart) => {
  const { data } = part
  // Strings are passed through untouched; only binary data is base64-encoded.
  if (typeof data === "string") return data
  return Buffer.from(data).toString("base64")
}
/** Removes every trailing `/` from `value`; all other characters are left as-is. */
export const trimBaseUrl = (value: string) => {
  let end = value.length
  while (end > 0 && value[end - 1] === "/") end -= 1
  return value.slice(0, end)
}
/**
 * Renders a tool result as the string sent back to the provider.
 *
 * - `text` / `error` results with a primitive value are stringified exactly as
 *   before (`String(value)`).
 * - `text` / `error` results whose value is a non-null object are JSON-encoded.
 *   `String()` on such a value would yield the useless `"[object Object]"`,
 *   and error results can carry arbitrary provider error payloads.
 * - Every other result type is JSON-encoded.
 */
export const toolResultText = (part: ToolResultPart) => {
  const result = part.result
  if (result.type !== "text" && result.type !== "error") return encodeJson(result.value)
  const value: unknown = result.value
  return typeof value === "object" && value !== null ? encodeJson(value) : String(value)
}
/**
 * Produces a printable message for an arbitrary thrown value. `Error`
 * instances yield their `message`; strings are returned as-is; numbers,
 * booleans and bigints are stringified; `null` and `undefined` yield their
 * names; anything else yields a fixed fallback.
 */
export const errorText = (error: unknown) => {
  if (error instanceof Error) return error.message
  switch (typeof error) {
    case "string":
      return error
    case "number":
    case "boolean":
    case "bigint":
      return String(error)
    case "undefined":
      return "undefined"
    case "object":
      return error === null ? "null" : "Unknown stream error"
    default:
      return "Unknown stream error"
  }
}
/**
 * `framing` step for Server-Sent Events. Decodes the byte stream as text, runs
 * the SSE channel decoder, and drops events whose data is empty or the
 * literal `[DONE]` sentinel, so the downstream `decodeChunk` sees one data
 * string per element. The SSE channel emits a `Retry` control event on its
 * error channel; we drop it here (we don't implement client-driven retries)
 * so the public error channel stays `LLMError`.
 */
export const sseFraming = (bytes: Stream.Stream<Uint8Array, LLMError>): Stream.Stream<string, LLMError> =>
bytes.pipe(
Stream.decodeText(),
Stream.pipeThroughChannel(Sse.decode()),
// NOTE(review): recovering with `Stream.empty` presumably ends the stream at the
// point a `Retry` is raised rather than resuming it — confirm that a server
// sending an SSE `retry:` field cannot silently truncate the response.
Stream.catchTag("Retry", () => Stream.empty),
Stream.filter((event) => event.data.length > 0 && event.data !== "[DONE]"),
Stream.map((event) => event.data),
)
/**
 * The single place where an invalid-request `LLMError` is constructed.
 *
 * Routes call this instead of each keeping a local
 * `const invalid = (message) => ...` alias, so a future extension of
 * `InvalidRequestReason` (route context, trace metadata) only has to be made
 * here.
 *
 * @param message Human-readable description of what is wrong with the request.
 */
export const invalidRequest = (message: string) => {
  const reason = new InvalidRequestReason({ message })
  return new LLMError({
    module: "ProviderShared",
    method: "request",
    reason,
  })
}
/**
 * Dispatch on a request's tool choice and return the matching case's value.
 *
 * `auto`, `none` and `required` map directly to their handlers. Any other
 * choice is treated as a named-tool choice: it fails with an invalid-request
 * error when no tool name is present, otherwise `cases.tool` receives the
 * name.
 *
 * @param route Route label used in the error message.
 */
export const matchToolChoice = <Auto, None, Required, Tool>(
  route: string,
  toolChoice: NonNullable<LLMRequest["toolChoice"]>,
  cases: {
    readonly auto: () => Auto
    readonly none: () => None
    readonly required: () => Required
    readonly tool: (name: string) => Tool
  },
) =>
  Effect.gen(function* () {
    switch (toolChoice.type) {
      case "auto":
        return cases.auto()
      case "none":
        return cases.none()
      case "required":
        return cases.required()
    }
    if (!toolChoice.name) return yield* invalidRequest(`${route} tool choice requires a tool name`)
    return cases.tool(toolChoice.name)
  })
type ContentType = ContentPart["type"]

/**
 * Format a list of content types as an English enumeration:
 * `""`, `"a"`, `"a and b"`, `"a, b, and c"`.
 */
const formatContentTypes = (types: ReadonlyArray<ContentType>) => {
  const count = types.length
  if (count < 2) return types[0] ?? ""
  if (count === 2) {
    const [first, second] = types
    return `${first} and ${second}`
  }
  const leading = types.slice(0, count - 1)
  const last = types[count - 1]
  return `${leading.join(", ")}, and ${last}`
}
/**
 * Type guard: true when `part.type` is one of `types`, narrowing `part` to
 * the matching `ContentPart` variants.
 */
export const supportsContent = <const Type extends ContentType>(
  part: ContentPart,
  types: ReadonlyArray<Type>,
): part is Extract<ContentPart, { readonly type: Type }> => {
  const candidates: ReadonlyArray<ContentType> = types
  return candidates.some((candidate) => candidate === part.type)
}
/**
 * Build the invalid-request error for a message whose content falls outside
 * the types a route accepts for that role.
 *
 * @param route Route label used as the message prefix.
 * @param role Role of the offending message.
 * @param types Content types the route does support for that role.
 */
export const unsupportedContent = (
  route: string,
  role: LLMRequest["messages"][number]["role"],
  types: ReadonlyArray<ContentType>,
) => {
  const supported = formatContentTypes(types)
  return invalidRequest(`${route} ${role} messages only support ${supported} content for now`)
}
/**
 * Turn a Schema decoder into a route `validate` step.
 *
 * The returned function runs `decode` on the payload and converts any decode
 * failure into an invalid-request `LLMError` that carries the failure's
 * `message`. It replaces the per-route lambda
 * `(payload) => decode(payload).pipe(Effect.mapError((e) => invalid(e.message)))`.
 */
export const validateWith = <A, I, E extends { readonly message: string }>(
  decode: (input: I) => Effect.Effect<A, E>,
) => {
  const toInvalidRequest = (error: E) => invalidRequest(error.message)
  return (payload: I) => decode(payload).pipe(Effect.mapError(toInvalidRequest))
}
/**
 * Create an HTTP POST request that carries a JSON body.
 *
 * `content-type: application/json` is set on top of the caller-supplied
 * headers, so a stale content type passed in by a route cannot survive. The
 * body arrives already encoded, which leaves each route free to pick
 * `Schema.encodeSync(payload)` or `ProviderShared.encodeJson(payload)`.
 */
export const jsonPost = (input: { readonly url: string; readonly body: string; readonly headers?: Headers.Input }) => {
  const callerHeaders = Headers.fromInput(input.headers)
  const headers = Headers.set(callerHeaders, "content-type", "application/json")
  return HttpClientRequest.post(input.url).pipe(
    HttpClientRequest.setHeaders(headers),
    HttpClientRequest.bodyText(input.body, "application/json"),
  )
}
export * as ProviderShared from "./shared"

View File

@@ -0,0 +1,103 @@
import { AwsV4Signer } from "aws4fetch"
import { Effect, Option, Schema } from "effect"
import { Headers } from "effect/unstable/http"
import { Auth, type AuthInput } from "../../route/auth"
import type { LLMRequest } from "../../schema"
import { ProviderShared } from "../shared"
/**
 * AWS credentials for SigV4 signing. Bedrock also supports Bearer API key auth
 * via `model.apiKey`, which bypasses SigV4 signing. STS-vended credentials
 * should be refreshed by the consumer (rebuild the model) before they expire;
 * the route does not refresh.
 */
export interface Credentials {
// Region the request is signed for.
readonly region: string
// Access key id handed to the SigV4 signer.
readonly accessKeyId: string
// Secret access key handed to the SigV4 signer.
readonly secretAccessKey: string
// Optional session token, forwarded to the signer when present
// (presumably for temporary / STS credentials — see the note above).
readonly sessionToken?: string
}
// Shape accepted for `model.native.aws_credentials`. Unlike `Credentials`,
// `region` is optional here.
const NativeCredentials = Schema.Struct({
accessKeyId: Schema.String,
secretAccessKey: Schema.String,
region: Schema.optional(Schema.String),
sessionToken: Schema.optional(Schema.String),
})
// Decodes an unknown value into an `Option`, so a missing or malformed
// `aws_credentials` is handled as "no credentials" by the callers below.
const decodeNativeCredentials = Schema.decodeUnknownOption(NativeCredentials)
/** Treat anything other than a non-empty string as "region not configured". */
const configuredRegion = (value: unknown): string | undefined =>
  typeof value === "string" && value !== "" ? value : undefined

/**
 * Resolve the AWS region for a request.
 *
 * Precedence: `model.native.aws_region`, then the `region` inside
 * `model.native.aws_credentials`, then `"us-east-1"`. An empty string counts
 * as missing at both levels. Previously only `aws_region` was checked for
 * emptiness, so an empty credentials region was returned as `""` instead of
 * falling back to the default.
 *
 * @returns A non-empty region name.
 */
export const region = (request: LLMRequest) =>
  configuredRegion(request.model.native?.aws_region) ??
  configuredRegion(
    decodeNativeCredentials(request.model.native?.aws_credentials).pipe(
      Option.map((credentials) => credentials.region),
      Option.getOrUndefined,
    ),
  ) ??
  "us-east-1"

/**
 * Read signing credentials from `model.native.aws_credentials`.
 *
 * Returns `undefined` when the value is absent or does not decode. The
 * credentials' own region is used for signing when it is non-empty;
 * otherwise the region resolved by `region(request)` is filled in, so the
 * result never carries an empty region.
 */
const credentialsFromInput = (request: LLMRequest): Credentials | undefined =>
  decodeNativeCredentials(request.model.native?.aws_credentials).pipe(
    Option.map((creds) => ({ ...creds, region: configuredRegion(creds.region) ?? region(request) })),
    Option.getOrUndefined,
  )
/**
 * Sign a POST request for the `bedrock` service with SigV4 and return the
 * headers of the signed request as a plain object.
 *
 * Any failure while signing is reported as an invalid-request error whose
 * message begins with `Bedrock Converse SigV4 signing failed:`.
 */
const signRequest = (input: {
  readonly url: string
  readonly body: string
  readonly headers: Headers.Headers
  readonly credentials: Credentials
}) => {
  const { credentials } = input
  return Effect.tryPromise({
    try: async () => {
      const signer = new AwsV4Signer({
        url: input.url,
        method: "POST",
        headers: Object.entries(input.headers),
        body: input.body,
        region: credentials.region,
        accessKeyId: credentials.accessKeyId,
        secretAccessKey: credentials.secretAccessKey,
        sessionToken: credentials.sessionToken,
        service: "bedrock",
      })
      const { headers } = await signer.sign()
      return Object.fromEntries(headers.entries())
    },
    catch: (cause) => {
      const detail = cause instanceof Error ? cause.message : String(cause)
      return ProviderShared.invalidRequest(`Bedrock Converse SigV4 signing failed: ${detail}`)
    },
  })
}
/**
 * Auth step for Bedrock.
 *
 * When `model.apiKey` is set (Bedrock's newer Bearer API key auth) it takes
 * precedence and the request is not signed. Otherwise the exact JSON body is
 * signed with SigV4 using the credentials in `model.native.aws_credentials`;
 * without those the step fails with an invalid-request error.
 */
export const auth = Auth.custom((input: AuthInput) => {
  if (input.request.model.apiKey) return Auth.toEffect(Auth.bearer())(input)
  return Effect.gen(function* () {
    const awsCredentials = credentialsFromInput(input.request)
    if (awsCredentials === undefined) {
      return yield* ProviderShared.invalidRequest(
        "Bedrock Converse requires either model.apiKey or AWS credentials in model.native.aws_credentials",
      )
    }
    // Fix the content type before signing so it is part of the signed header set.
    const unsigned = Headers.set(input.headers, "content-type", "application/json")
    const signedHeaders = yield* signRequest({
      url: input.url,
      body: input.body,
      headers: unsigned,
      credentials: awsCredentials,
    })
    return Headers.setAll(unsigned, signedHeaders)
  })
})
/**
 * Merge AWS credentials into a model's `native` options.
 *
 * With credentials, returns a copy of `native` with `aws_credentials` set to
 * them and `aws_region` set to their region. Without credentials, `native`
 * is returned untouched (possibly `undefined`).
 */
export const nativeCredentials = (native: Record<string, unknown> | undefined, credentials: Credentials | undefined) => {
  if (!credentials) return native
  return {
    ...native,
    aws_credentials: credentials,
    aws_region: credentials.region,
  }
}
export * as BedrockAuth from "./bedrock-auth"

View File

@@ -0,0 +1,20 @@
import { Schema } from "effect"
import type { CacheHint } from "../../schema"
// Bedrock cache markers are positional: emit a `cachePoint` block immediately
// after the content the caller wants treated as a cacheable prefix.
// The schema admits a single marker shape: `{ cachePoint: { type: "default" } }`.
export const CachePointBlock = Schema.Struct({
cachePoint: Schema.Struct({ type: Schema.tag("default") }),
})
// Decoded TypeScript type of the schema above.
export type CachePointBlock = Schema.Schema.Type<typeof CachePointBlock>
// Bedrock recently added optional `ttl: "5m" | "1h"` on cachePoint. Map
// `CacheHint.ttlSeconds` here once a recorded cassette validates the wire shape.
const DEFAULT: CachePointBlock = { cachePoint: { type: "default" } }

/**
 * Translate a cache hint into a Bedrock cache marker. `ephemeral` and
 * `persistent` hints both yield the shared default marker; a missing hint or
 * any other hint type yields `undefined`.
 */
export const block = (cache: CacheHint | undefined): CachePointBlock | undefined => {
  const cacheable = cache?.type === "ephemeral" || cache?.type === "persistent"
  return cacheable ? DEFAULT : undefined
}
export * as BedrockCache from "./bedrock-cache"

View File

@@ -0,0 +1,80 @@
import { Effect, Schema } from "effect"
import type { MediaPart } from "../../schema"
import { ProviderShared } from "../shared"
// Bedrock Converse accepts image `format` as the file extension and
// `source.bytes` as base64 in the JSON wire format.
// Image formats this module can emit.
export const ImageFormat = Schema.Literals(["png", "jpeg", "gif", "webp"])
export type ImageFormat = Schema.Schema.Type<typeof ImageFormat>
// Wire shape of one image content block: `{ image: { format, source: { bytes } } }`.
export const ImageBlock = Schema.Struct({
image: Schema.Struct({
format: ImageFormat,
source: Schema.Struct({ bytes: Schema.String }),
}),
})
export type ImageBlock = Schema.Schema.Type<typeof ImageBlock>
// Bedrock document blocks require a user-facing name so the model can refer to
// the uploaded document.
// Document formats this module can emit.
export const DocumentFormat = Schema.Literals(["pdf", "csv", "doc", "docx", "xls", "xlsx", "html", "txt", "md"])
export type DocumentFormat = Schema.Schema.Type<typeof DocumentFormat>
// Wire shape of one document content block:
// `{ document: { format, name, source: { bytes } } }`.
export const DocumentBlock = Schema.Struct({
document: Schema.Struct({
format: DocumentFormat,
name: Schema.String,
source: Schema.Struct({ bytes: Schema.String }),
}),
})
export type DocumentBlock = Schema.Schema.Type<typeof DocumentBlock>
// Lower-case MIME type -> Bedrock image format. `image/jpg` is accepted as an
// alias for `image/jpeg`. `satisfies` checks every value against `ImageFormat`
// while `as const` keeps the literal keys for `keyof typeof` lookups.
const IMAGE_FORMATS = {
"image/png": "png",
"image/jpeg": "jpeg",
"image/jpg": "jpeg",
"image/gif": "gif",
"image/webp": "webp",
} as const satisfies Record<string, ImageFormat>
// Lower-case MIME type -> Bedrock document format, checked against
// `DocumentFormat` the same way.
const DOCUMENT_FORMATS = {
"application/pdf": "pdf",
"text/csv": "csv",
"application/msword": "doc",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": "docx",
"application/vnd.ms-excel": "xls",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": "xlsx",
"text/html": "html",
"text/plain": "txt",
"text/markdown": "md",
} as const satisfies Record<string, DocumentFormat>
/** Wrap a media part as a Bedrock image block of the given format. */
const imageBlock = (part: MediaPart, format: ImageFormat): ImageBlock => {
  const bytes = ProviderShared.mediaBytes(part)
  return { image: { format, source: { bytes } } }
}

/**
 * Wrap a media part as a Bedrock document block of the given format. The
 * block's name is the part's `filename`, or `document.<format>` when the part
 * has none.
 */
const documentBlock = (part: MediaPart, format: DocumentFormat): DocumentBlock => {
  const name = part.filename ?? `document.${format}`
  const bytes = ProviderShared.mediaBytes(part)
  return { document: { format, name, source: { bytes } } }
}
/**
 * Look up `mime` among the table's own keys only. A plain `table[mime]` would
 * also resolve inherited `Object.prototype` members (e.g. `"constructor"`),
 * which are truthy and would be mistaken for a supported format.
 */
const ownFormat = <Table extends Record<string, string>>(table: Table, mime: string): Table[keyof Table] | undefined =>
  Object.prototype.hasOwnProperty.call(table, mime) ? table[mime as keyof Table] : undefined

// Route by MIME. Known image/document formats lower into a typed block; anything
// else fails with a clear error instead of silently degrading to a malformed
// document block. Image MIME types not in `IMAGE_FORMATS` (e.g. `image/svg+xml`)
// get an image-specific error so the caller knows it's a format-support issue,
// not a kind-detection issue.
/**
 * Lower a media part into a Bedrock image or document block.
 *
 * Only the `type/subtype` essence of `part.mediaType` is matched, compared
 * case-insensitively: parameters such as `; charset=utf-8` and surrounding
 * whitespace are ignored. Error messages quote the media type exactly as the
 * caller supplied it.
 *
 * @returns A succeeding effect with the block, or an invalid-request error
 * for unsupported media types.
 */
export const lower = (part: MediaPart) => {
  const mime = (part.mediaType.split(";", 1)[0] ?? "").trim().toLowerCase()
  const imageFormat = ownFormat(IMAGE_FORMATS, mime)
  if (imageFormat) return Effect.succeed(imageBlock(part, imageFormat))
  if (mime.startsWith("image/"))
    return ProviderShared.invalidRequest(`Bedrock Converse does not support image media type ${part.mediaType}`)
  const documentFormat = ownFormat(DOCUMENT_FORMATS, mime)
  if (documentFormat) return Effect.succeed(documentBlock(part, documentFormat))
  return ProviderShared.invalidRequest(`Bedrock Converse does not support media type ${part.mediaType}`)
}
export * as BedrockMedia from "./bedrock-media"

View File

@@ -0,0 +1,101 @@
import { ProviderShared } from "../shared"
// Gemini accepts a JSON Schema-like dialect for tool parameters, but rejects a
// handful of common JSON Schema shapes. Keep this projection isolated so the
// Gemini protocol file still reads like the other protocol modules.
//
// Keywords that `hasSchemaIntent` looks for: a node carrying any of them is
// considered to already describe what it accepts. `sanitizeNode` uses that to
// decide whether array `items` need a default `type`.
const SCHEMA_INTENT_KEYS = [
"type",
"properties",
"items",
"prefixItems",
"enum",
"const",
"$ref",
"additionalProperties",
"patternProperties",
"required",
"not",
"if",
"then",
"else",
]
const isRecord = ProviderShared.isRecord

/** True when `schema` is a record with an array-valued `anyOf`, `oneOf` or `allOf`. */
const hasCombiner = (schema: unknown) => {
  if (!isRecord(schema)) return false
  return [schema.anyOf, schema.oneOf, schema.allOf].some((branch) => Array.isArray(branch))
}

/**
 * True when `schema` is a record that either has a combiner or contains at
 * least one of the `SCHEMA_INTENT_KEYS`.
 */
const hasSchemaIntent = (schema: unknown) => {
  if (!isRecord(schema)) return false
  if (hasCombiner(schema)) return true
  return SCHEMA_INTENT_KEYS.some((key) => key in schema)
}
/**
 * Recursively rewrite a JSON Schema node before projection. Non-record values
 * are returned as is, except arrays, whose elements are sanitized one by one.
 * For records, every value is sanitized and the fix-ups below are applied in
 * order on the copy; the input is not mutated.
 */
const sanitizeNode = (schema: unknown): unknown => {
if (!isRecord(schema)) return Array.isArray(schema) ? schema.map(sanitizeNode) : schema
// Copy the node, recursing into values. `enum` members are converted to strings.
const result: Record<string, unknown> = Object.fromEntries(
Object.entries(schema).map(([key, value]) => [
key,
key === "enum" && Array.isArray(value) ? value.map(String) : sanitizeNode(value),
]),
)
// Enum members are strings now, so a numeric node with an enum becomes a string node.
if (Array.isArray(result.enum) && (result.type === "integer" || result.type === "number")) result.type = "string"
const properties = result.properties
// Drop `required` entries that do not name a declared property.
if (result.type === "object" && isRecord(properties) && Array.isArray(result.required)) {
result.required = result.required.filter((field) => typeof field === "string" && field in properties)
}
// Arrays without a combiner always get `items`; item schemas that state no
// intent of their own default to `type: "string"`.
if (result.type === "array" && !hasCombiner(result)) {
result.items = result.items ?? {}
if (isRecord(result.items) && !hasSchemaIntent(result.items)) result.items = { ...result.items, type: "string" }
}
// Object-only keywords are removed from nodes with a non-object string type
// (unless a combiner is present).
if (typeof result.type === "string" && result.type !== "object" && !hasCombiner(result)) {
delete result.properties
delete result.required
}
return result
}
/**
 * True for an `object` schema that declares no properties (missing,
 * non-record, or empty `properties`) and has a falsy `additionalProperties`.
 */
const emptyObjectSchema = (schema: Record<string, unknown>) => {
  if (schema.type !== "object") return false
  const { properties } = schema
  const declaresProperties = isRecord(properties) && Object.keys(properties).length > 0
  return !declaresProperties && !schema.additionalProperties
}
/**
 * Copy the keywords this projection keeps from one schema node, recursing
 * into child schemas with `projectNested`. Keywords whose projected value is
 * `undefined` are left out. A `type` array collapses to its first non-null
 * entry, with `nullable: true` when `"null"` was listed; `const` becomes a
 * one-element `enum`.
 */
const projectFields = (schema: Record<string, unknown>): Record<string, unknown> =>
  Object.fromEntries(
    [
      ["description", schema.description],
      ["required", schema.required],
      ["format", schema.format],
      ["type", Array.isArray(schema.type) ? schema.type.filter((type) => type !== "null")[0] : schema.type],
      ["nullable", Array.isArray(schema.type) && schema.type.includes("null") ? true : undefined],
      ["enum", schema.const !== undefined ? [schema.const] : schema.enum],
      [
        "properties",
        isRecord(schema.properties)
          ? Object.fromEntries(Object.entries(schema.properties).map(([key, value]) => [key, projectNested(value)]))
          : undefined,
      ],
      [
        "items",
        Array.isArray(schema.items)
          ? schema.items.map((item) => projectNested(item))
          : schema.items === undefined
            ? undefined
            : projectNested(schema.items),
      ],
      ["allOf", Array.isArray(schema.allOf) ? schema.allOf.map((member) => projectNested(member)) : undefined],
      ["anyOf", Array.isArray(schema.anyOf) ? schema.anyOf.map((member) => projectNested(member)) : undefined],
      ["oneOf", Array.isArray(schema.oneOf) ? schema.oneOf.map((member) => projectNested(member)) : undefined],
      ["minLength", schema.minLength],
    ].filter((entry) => entry[1] !== undefined),
  )

/**
 * Project a schema that sits below the root. An empty object schema is kept
 * as a bare `{ type: "object" }` (plus its description, if any) instead of
 * being dropped: dropping it removes the property while `required` may still
 * name it, and strips `items` from an array of such objects.
 */
const projectNested = (schema: unknown): Record<string, unknown> | undefined => {
  if (!isRecord(schema)) return undefined
  if (emptyObjectSchema(schema)) {
    return schema.description === undefined
      ? { type: "object" }
      : { type: "object", description: schema.description }
  }
  return projectFields(schema)
}

/**
 * Project the root schema. Non-record input and an empty root object schema
 * both yield `undefined`, i.e. "this tool takes no parameters"; everything
 * else is projected keyword by keyword.
 */
const projectNode = (schema: unknown): Record<string, unknown> | undefined => {
  if (!isRecord(schema)) return undefined
  if (emptyObjectSchema(schema)) return undefined
  return projectFields(schema)
}
/**
 * Convert a JSON Schema for tool parameters into the projected form:
 * sanitize the schema first, then project the sanitized result.
 */
export const convert = (schema: unknown) => {
  const sanitized = sanitizeNode(schema)
  return projectNode(sanitized)
}
export * as GeminiToolSchema from "./gemini-tool-schema"

View File

@@ -0,0 +1,55 @@
import { Schema } from "effect"
import type { LLMRequest, ReasoningEffort, TextVerbosity as TextVerbosityValue } from "../../schema"
import { ReasoningEfforts, TextVerbosity } from "../../schema"
// Every shared reasoning effort except `"max"`.
export const OpenAIReasoningEfforts = ReasoningEfforts.filter(
(effort): effort is Exclude<ReasoningEffort, "max"> => effort !== "max",
)
export type OpenAIReasoningEffort = (typeof OpenAIReasoningEfforts)[number]
// Lookup sets backing the type guards below.
const REASONING_EFFORTS = new Set<string>(ReasoningEfforts)
const OPENAI_REASONING_EFFORTS = new Set<string>(OpenAIReasoningEfforts)
// NOTE(review): hard-coded rather than derived from the `TextVerbosity`
// schema — confirm the two lists stay in sync.
const TEXT_VERBOSITY = new Set<string>(["low", "medium", "high"])
// Schema counterpart of `OpenAIReasoningEfforts`.
export const OpenAIReasoningEffort = Schema.Literals(OpenAIReasoningEfforts)
// Re-exported under an OpenAI-specific name; same schema object as `TextVerbosity`.
export const OpenAITextVerbosity = TextVerbosity
/** Guard: `effort` is one of the shared reasoning efforts, `"max"` included. */
const isAnyReasoningEffort = (effort: unknown): effort is ReasoningEffort => {
  if (typeof effort !== "string") return false
  return REASONING_EFFORTS.has(effort)
}

/** Guard: `effort` is one of the reasoning efforts in `OpenAIReasoningEfforts`. */
export const isReasoningEffort = (effort: unknown): effort is OpenAIReasoningEffort => {
  if (typeof effort !== "string") return false
  return OPENAI_REASONING_EFFORTS.has(effort)
}

/** Guard: `value` is one of the known text verbosity levels. */
const isTextVerbosity = (value: unknown): value is TextVerbosityValue => {
  if (typeof value !== "string") return false
  return TEXT_VERBOSITY.has(value)
}
/** The OpenAI-specific provider options attached to the request, if any. */
const options = (request: LLMRequest) => request.providerOptions?.openai

/** `store` option, only when it is a boolean. */
export const store = (request: LLMRequest): boolean | undefined => {
  const candidate = options(request)?.store
  if (typeof candidate !== "boolean") return undefined
  return candidate
}

/** `reasoningEffort` option, only when it is a known reasoning effort. */
export const reasoningEffort = (request: LLMRequest): ReasoningEffort | undefined => {
  const candidate = options(request)?.reasoningEffort
  if (!isAnyReasoningEffort(candidate)) return undefined
  return candidate
}

/** `"auto"` when the `reasoningSummary` option is exactly `"auto"`, otherwise `undefined`. */
export const reasoningSummary = (request: LLMRequest): "auto" | undefined => {
  if (options(request)?.reasoningSummary === "auto") return "auto"
  return undefined
}

/** `true` when `includeEncryptedReasoning` is exactly `true`, otherwise `undefined`. */
export const encryptedReasoning = (request: LLMRequest) => {
  if (options(request)?.includeEncryptedReasoning === true) return true
  return undefined
}

/** `promptCacheKey` option, only when it is a string. */
export const promptCacheKey = (request: LLMRequest) => {
  const candidate = options(request)?.promptCacheKey
  if (typeof candidate !== "string") return undefined
  return candidate
}

/** `textVerbosity` option, only when it is a known verbosity level. */
export const textVerbosity = (request: LLMRequest) => {
  const candidate = options(request)?.textVerbosity
  if (!isTextVerbosity(candidate)) return undefined
  return candidate
}
export * as OpenAIOptions from "./openai-options"

View File

@@ -0,0 +1,196 @@
import { Effect } from "effect"
import { LLMError, type ProviderMetadata, type ToolCall, type ToolInputDelta } from "../../schema"
import { eventError, parseToolInput, type ToolAccumulator } from "../shared"
// Keys a provider may use to identify a partial tool call within one stream.
type StreamKey = string | number
/**
 * One pending streamed tool call. Providers emit the tool identity and JSON
 * argument text across separate chunks; `input` is the raw JSON string collected
 * so far, not the parsed object.
 *
 * `id`, `name` and `input` are read from this type throughout the module;
 * presumably they are declared by `ToolAccumulator` — verify in `../shared`.
 */
export interface PendingTool extends ToolAccumulator {
// When truthy, the finished `tool-call` event carries `providerExecuted: true`.
readonly providerExecuted?: boolean
// Copied onto the emitted `tool-input-delta` / `tool-call` events when present.
readonly providerMetadata?: ProviderMetadata
}
/**
 * Sparse parser state keyed by the provider's stream-local tool identifier.
 *
 * This key is not the final tool-call id (`call_...`). It is the id/index the
 * provider uses while streaming a partial call: OpenAI Chat / Anthropic /
 * Bedrock use numeric content indexes, while OpenAI Responses uses string
 * `item_id`s. The generic keeps each protocol internally consistent.
 */
export type State<K extends StreamKey> = Partial<Record<K, PendingTool>>
/**
 * Result of adding argument text to one pending tool call. It returns both the
 * next `tools` state and the updated `tool` because parsers often need the
 * current id/name immediately. `event` is present only when new text arrived;
 * metadata-only deltas update identity without emitting `tool-input-delta`.
 */
export interface AppendOutcome<K extends StreamKey> {
// State to carry forward after this delta.
readonly tools: State<K>
// The pending tool the delta applied to, as it looks after the delta.
readonly tool: PendingTool
// Public delta event to emit; absent or `undefined` when no text was added.
readonly event?: ToolInputDelta
}
/** Fresh accumulator state (no pending tool calls) for one provider stream. */
export const empty = <K extends StreamKey>(): State<K> => {
  return {}
}

/** Copy of `tools` in which `key` maps to `tool`; the input state is not modified. */
const withTool = <K extends StreamKey>(tools: State<K>, key: K, tool: PendingTool): State<K> => ({
  ...tools,
  [key]: tool,
})

/** Copy of `tools` with `key` removed; the input state is not modified. */
const withoutTool = <K extends StreamKey>(tools: State<K>, key: K): State<K> => {
  const remaining = { ...tools }
  delete remaining[key]
  return remaining
}
/**
 * Build the public `tool-input-delta` event for `text` newly added to `tool`.
 * `providerMetadata` is included only when the tool has some.
 */
const inputDelta = (tool: PendingTool, text: string): ToolInputDelta => {
  const base = { type: "tool-input-delta" as const, id: tool.id, name: tool.name, text }
  return tool.providerMetadata ? { ...base, providerMetadata: tool.providerMetadata } : base
}
/**
 * Parse a pending tool's raw JSON input and build the public `tool-call`
 * event. `inputOverride`, when given, is parsed instead of the accumulated
 * `tool.input`. `providerExecuted: true` is included only for
 * provider-executed tools, and `providerMetadata` only when the tool has
 * some.
 */
const toolCall = (route: string, tool: PendingTool, inputOverride?: string) => {
  const raw = inputOverride ?? tool.input
  return parseToolInput(route, tool.name, raw).pipe(
    Effect.map((input): ToolCall => {
      if (tool.providerExecuted) {
        return {
          type: "tool-call",
          id: tool.id,
          name: tool.name,
          input,
          providerExecuted: true,
          ...(tool.providerMetadata ? { providerMetadata: tool.providerMetadata } : {}),
        }
      }
      return {
        type: "tool-call",
        id: tool.id,
        name: tool.name,
        input,
        ...(tool.providerMetadata ? { providerMetadata: tool.providerMetadata } : {}),
      }
    }),
  )
}
/**
 * Record the updated tool under `key` and pair it with the public delta
 * event, which is `undefined` when `text` is empty.
 */
const appendTool = <K extends StreamKey>(
  tools: State<K>,
  key: K,
  tool: PendingTool,
  text: string,
): AppendOutcome<K> => {
  const event = text.length === 0 ? undefined : inputDelta(tool, text)
  return { tools: withTool(tools, key, tool), tool, event }
}
/** Guard that tells a failed append (`LLMError`) apart from a successful `AppendOutcome`. */
export const isError = <K extends StreamKey>(result: AppendOutcome<K> | LLMError): result is LLMError => {
  return result instanceof LLMError
}
/**
 * Register a tool call announced by a start event, before any argument
 * deltas. `input` defaults to the empty string when the start event carries
 * none. Used by Anthropic `content_block_start`, Bedrock `contentBlockStart`,
 * and OpenAI Responses `response.output_item.added`.
 */
export const start = <K extends StreamKey>(
  tools: State<K>,
  key: K,
  tool: Omit<PendingTool, "input"> & { readonly input?: string },
) => {
  const pending: PendingTool = { ...tool, input: tool.input ?? "" }
  return withTool(tools, key, pending)
}
/**
 * Append a streamed argument delta, starting the tool if this provider encodes
 * identity on the first delta instead of a separate start event. OpenAI Chat has
 * this shape: `tool_calls[].index` is the stream key, and `id` / `name` may only
 * appear on the first delta for that index.
 *
 * An empty-string `id` or `name` on a delta is treated the same as an absent
 * one, so a continuation chunk that repeats the field as `""` keeps the
 * identity already stored for `key` instead of failing.
 *
 * @param route Route label used when building the error.
 * @param tools Current accumulator state.
 * @param key Stream-local key of the tool call the delta belongs to.
 * @param delta Identity fields (if any) and argument text carried by the chunk.
 * @param missingToolMessage Error text used when no id or name is known for `key`.
 * @returns The updated state and tool (plus a delta event when text arrived),
 * or an `LLMError` when the tool's id or name cannot be determined.
 */
export const appendOrStart = <K extends StreamKey>(
  route: string,
  tools: State<K>,
  key: K,
  delta: { readonly id?: string; readonly name?: string; readonly text: string },
  missingToolMessage: string,
): AppendOutcome<K> | LLMError => {
  const current = tools[key]
  // `||` on purpose: an empty string is never a usable id/name (see the check
  // below), so it must not shadow the identity captured from an earlier delta.
  const id = delta.id || current?.id
  const name = delta.name || current?.name
  if (!id || !name) return eventError(route, missingToolMessage)
  // Nothing new: same identity and no text, so the state is returned as is and no event is emitted.
  if (current && delta.text.length === 0 && current.id === id && current.name === name) return { tools, tool: current }
  const tool = {
    id,
    name,
    input: `${current?.input ?? ""}${delta.text}`,
    providerExecuted: current?.providerExecuted,
    providerMetadata: current?.providerMetadata,
  }
  return appendTool(tools, key, tool, delta.text)
}
/**
 * Append argument text to a tool that has already been started. A delta for
 * an unknown key is an error, which keeps protocols honest when their stream
 * grammar promises a start event before any argument delta. Empty text leaves
 * the state untouched and produces no event.
 */
export const appendExisting = <K extends StreamKey>(
  route: string,
  tools: State<K>,
  key: K,
  text: string,
  missingToolMessage: string,
): AppendOutcome<K> | LLMError => {
  const pending = tools[key]
  if (!pending) return eventError(route, missingToolMessage)
  if (text.length === 0) return { tools, tool: pending }
  const extended = { ...pending, input: `${pending.input}${text}` }
  return appendTool(tools, key, extended, text)
}
/**
 * Finalize the pending tool call stored under `key`: parse its accumulated
 * raw JSON, drop it from the state, and return the public `tool-call` event.
 * An unknown key is a no-op that returns the state unchanged, because some
 * providers emit stop events for non-tool content blocks.
 */
export const finish = <K extends StreamKey>(route: string, tools: State<K>, key: K) =>
  Effect.gen(function* () {
    const pending = tools[key]
    if (!pending) return { tools }
    const event = yield* toolCall(route, pending)
    return { tools: withoutTool(tools, key), event }
  })
/**
 * Finalize the pending tool call stored under `key` using an authoritative
 * final input string instead of the accumulated deltas. OpenAI Responses can
 * stream deltas and then repeat the completed arguments on
 * `response.output_item.done`; the final value wins. An unknown key is a
 * no-op that returns the state unchanged.
 */
export const finishWithInput = <K extends StreamKey>(route: string, tools: State<K>, key: K, input: string) =>
  Effect.gen(function* () {
    const pending = tools[key]
    if (!pending) return { tools }
    const event = yield* toolCall(route, pending, input)
    return { tools: withoutTool(tools, key), event }
  })
/**
 * Finalize every pending tool call in one go and reset the state to empty.
 * OpenAI Chat needs this: it emits no per-tool stop events, so all
 * accumulated calls finish when the choice receives a terminal
 * `finish_reason`.
 */
export const finishAll = <K extends StreamKey>(route: string, tools: State<K>) =>
  Effect.gen(function* () {
    const pending = Object.values<PendingTool | undefined>(tools).flatMap((tool) =>
      tool === undefined ? [] : [tool],
    )
    const events = yield* Effect.forEach(pending, (tool) => toolCall(route, tool))
    return { tools: empty<K>(), events }
  })
export * as ToolStream from "./tool-stream"