mirror of
https://github.com/openai/codex.git
synced 2026-05-24 13:04:29 +00:00
## Why

Rollout traces need an identifier that can be used to correlate a Codex inference with upstream Responses API, proxy, and engine logs. The reduced trace model already exposed `upstream_request_id`, but it was being populated from the Responses API `response.id`. That value is useful for `previous_response_id` chaining, but it is not the transport request id that upstream systems key on. This PR separates those concepts so trace consumers can reliably answer both questions:

- which Responses API response did this inference produce?
- which upstream request handled it?

## Structure

The change keeps the upstream request id at the same lifecycle level as the provider stream:

- `codex-api` captures the `x-request-id` HTTP response header when the SSE stream is created and exposes it on `ResponseStream`. Fixture and websocket streams set the field to `None` because they do not have that HTTP response header.
- `codex-core` carries that stream-level id into `InferenceTraceAttempt` when recording terminal stream outcomes. Completed, failed, cancelled, dropped-stream, and pre-response error paths all record the id when it is available.
- `rollout-trace` now records both identifiers in raw terminal inference events and response payloads: `response_id` for the Responses API `response.id`, and `upstream_request_id` for `x-request-id`.
- The reducer stores both fields on `InferenceCall`. It also uses `response_id` for `previous_response_id` conversation linking, which removes the old accidental dependency on the misnamed `upstream_request_id` field.
- Terminal inference reduction now consumes the full terminal payload (`InferenceCompleted`, `InferenceFailed`, or `InferenceCancelled`) in one place. That keeps status, partial payloads, response ids, and upstream request ids consistent across success, failure, cancellation, and late stream-mapper events.

## Why This Shape

`x-request-id` is a property of the HTTP/provider response envelope, not an SSE event.
Capturing it once in `codex-api` and plumbing it through terminal trace recording avoids trying to infer the value from stream contents, and it preserves the id even when the stream fails or is cancelled after only partial output. Keeping `response_id` separate from `upstream_request_id` also makes the reduced trace model less surprising: `response_id` remains the conversation-continuation id, while `upstream_request_id` is the operational correlation id for upstream debugging.

## Validation

The PR updates trace and reducer coverage for:

- reading `x-request-id` from SSE response headers;
- storing the true upstream request id on completed inference calls;
- preserving upstream request ids for cancelled and late-cancelled inference streams;
- keeping `previous_response_id` reconstruction tied to `response_id` rather than transport request ids.
301 lines
9.8 KiB
Rust
301 lines
9.8 KiB
Rust
use crate::error::ApiError;
|
|
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
|
use codex_protocol::config_types::Verbosity as VerbosityConfig;
|
|
use codex_protocol::models::ResponseItem;
|
|
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
|
|
use codex_protocol::protocol::ModelVerification;
|
|
use codex_protocol::protocol::RateLimitSnapshot;
|
|
use codex_protocol::protocol::TokenUsage;
|
|
use codex_protocol::protocol::W3cTraceContext;
|
|
use futures::Stream;
|
|
use serde::Deserialize;
|
|
use serde::Serialize;
|
|
use serde_json::Value;
|
|
use std::collections::HashMap;
|
|
use std::pin::Pin;
|
|
use std::task::Context;
|
|
use std::task::Poll;
|
|
use tokio::sync::mpsc;
|
|
|
|
/// `client_metadata` key under which `response_create_client_metadata` stores
/// the `traceparent` value of the W3C trace context, when one is present.
pub const WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY: &str = "ws_request_header_traceparent";
|
|
/// `client_metadata` key under which `response_create_client_metadata` stores
/// the `tracestate` value of the W3C trace context, when one is present.
pub const WS_REQUEST_HEADER_TRACESTATE_CLIENT_METADATA_KEY: &str = "ws_request_header_tracestate";
|
|
|
|
/// Canonical input payload for the compaction endpoint.
|
|
#[derive(Debug, Clone, Serialize)]
|
|
pub struct CompactionInput<'a> {
|
|
pub model: &'a str,
|
|
pub input: &'a [ResponseItem],
|
|
#[serde(skip_serializing_if = "str::is_empty")]
|
|
pub instructions: &'a str,
|
|
pub tools: Vec<Value>,
|
|
pub parallel_tool_calls: bool,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub reasoning: Option<Reasoning>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub text: Option<TextControls>,
|
|
}
|
|
|
|
/// Canonical input payload for the memory summarize endpoint.
|
|
#[derive(Debug, Clone, Serialize)]
|
|
pub struct MemorySummarizeInput {
|
|
pub model: String,
|
|
#[serde(rename = "traces")]
|
|
pub raw_memories: Vec<RawMemory>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub reasoning: Option<Reasoning>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize)]
|
|
pub struct RawMemory {
|
|
pub id: String,
|
|
pub metadata: RawMemoryMetadata,
|
|
pub items: Vec<Value>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize)]
|
|
pub struct RawMemoryMetadata {
|
|
pub source_path: String,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
|
|
pub struct MemorySummarizeOutput {
|
|
#[serde(rename = "trace_summary", alias = "raw_memory")]
|
|
pub raw_memory: String,
|
|
pub memory_summary: String,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub enum ResponseEvent {
|
|
Created,
|
|
OutputItemDone(ResponseItem),
|
|
OutputItemAdded(ResponseItem),
|
|
/// Emitted when the server includes `OpenAI-Model` on the stream response.
|
|
/// This can differ from the requested model when backend safety routing applies.
|
|
ServerModel(String),
|
|
/// Emitted when the server recommends additional account verification.
|
|
ModelVerifications(Vec<ModelVerification>),
|
|
/// Emitted when `X-Reasoning-Included: true` is present on the response,
|
|
/// meaning the server already accounted for past reasoning tokens and the
|
|
/// client should not re-estimate them.
|
|
ServerReasoningIncluded(bool),
|
|
Completed {
|
|
response_id: String,
|
|
token_usage: Option<TokenUsage>,
|
|
/// Did the model affirmatively end its turn? Some providers do not set this,
|
|
/// so we rely on fallback logic when this is `None`.
|
|
end_turn: Option<bool>,
|
|
},
|
|
OutputTextDelta(String),
|
|
ToolCallInputDelta {
|
|
item_id: String,
|
|
call_id: Option<String>,
|
|
delta: String,
|
|
},
|
|
ReasoningSummaryDelta {
|
|
delta: String,
|
|
summary_index: i64,
|
|
},
|
|
ReasoningContentDelta {
|
|
delta: String,
|
|
content_index: i64,
|
|
},
|
|
ReasoningSummaryPartAdded {
|
|
summary_index: i64,
|
|
},
|
|
RateLimits(RateLimitSnapshot),
|
|
ModelsEtag(String),
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Clone, PartialEq)]
|
|
pub struct Reasoning {
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub effort: Option<ReasoningEffortConfig>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub summary: Option<ReasoningSummaryConfig>,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Default, Clone, PartialEq)]
|
|
#[serde(rename_all = "snake_case")]
|
|
pub enum TextFormatType {
|
|
#[default]
|
|
JsonSchema,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Default, Clone, PartialEq)]
|
|
pub struct TextFormat {
|
|
/// Format type used by the OpenAI text controls.
|
|
pub r#type: TextFormatType,
|
|
/// When true, the server is expected to strictly validate responses.
|
|
pub strict: bool,
|
|
/// JSON schema for the desired output.
|
|
pub schema: Value,
|
|
/// Friendly name for the format, used in telemetry/debugging.
|
|
pub name: String,
|
|
}
|
|
|
|
/// Controls the `text` field for the Responses API, combining verbosity and
|
|
/// optional JSON schema output formatting.
|
|
#[derive(Debug, Serialize, Default, Clone, PartialEq)]
|
|
pub struct TextControls {
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub verbosity: Option<OpenAiVerbosity>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub format: Option<TextFormat>,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Default, Clone, PartialEq)]
|
|
#[serde(rename_all = "lowercase")]
|
|
pub enum OpenAiVerbosity {
|
|
Low,
|
|
#[default]
|
|
Medium,
|
|
High,
|
|
}
|
|
|
|
impl From<VerbosityConfig> for OpenAiVerbosity {
|
|
fn from(v: VerbosityConfig) -> Self {
|
|
match v {
|
|
VerbosityConfig::Low => OpenAiVerbosity::Low,
|
|
VerbosityConfig::Medium => OpenAiVerbosity::Medium,
|
|
VerbosityConfig::High => OpenAiVerbosity::High,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Clone, PartialEq)]
|
|
pub struct ResponsesApiRequest {
|
|
pub model: String,
|
|
#[serde(skip_serializing_if = "String::is_empty")]
|
|
pub instructions: String,
|
|
pub input: Vec<ResponseItem>,
|
|
pub tools: Vec<serde_json::Value>,
|
|
pub tool_choice: String,
|
|
pub parallel_tool_calls: bool,
|
|
pub reasoning: Option<Reasoning>,
|
|
pub store: bool,
|
|
pub stream: bool,
|
|
pub include: Vec<String>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub service_tier: Option<String>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub prompt_cache_key: Option<String>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub text: Option<TextControls>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub client_metadata: Option<HashMap<String, String>>,
|
|
}
|
|
|
|
impl From<&ResponsesApiRequest> for ResponseCreateWsRequest {
|
|
fn from(request: &ResponsesApiRequest) -> Self {
|
|
Self {
|
|
model: request.model.clone(),
|
|
instructions: request.instructions.clone(),
|
|
previous_response_id: None,
|
|
input: request.input.clone(),
|
|
tools: request.tools.clone(),
|
|
tool_choice: request.tool_choice.clone(),
|
|
parallel_tool_calls: request.parallel_tool_calls,
|
|
reasoning: request.reasoning.clone(),
|
|
store: request.store,
|
|
stream: request.stream,
|
|
include: request.include.clone(),
|
|
service_tier: request.service_tier.clone(),
|
|
prompt_cache_key: request.prompt_cache_key.clone(),
|
|
text: request.text.clone(),
|
|
generate: None,
|
|
client_metadata: request.client_metadata.clone(),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Serialize)]
|
|
pub struct ResponseCreateWsRequest {
|
|
pub model: String,
|
|
#[serde(skip_serializing_if = "String::is_empty")]
|
|
pub instructions: String,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub previous_response_id: Option<String>,
|
|
pub input: Vec<ResponseItem>,
|
|
pub tools: Vec<Value>,
|
|
pub tool_choice: String,
|
|
pub parallel_tool_calls: bool,
|
|
pub reasoning: Option<Reasoning>,
|
|
pub store: bool,
|
|
pub stream: bool,
|
|
pub include: Vec<String>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub service_tier: Option<String>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub prompt_cache_key: Option<String>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub text: Option<TextControls>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub generate: Option<bool>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub client_metadata: Option<HashMap<String, String>>,
|
|
}
|
|
|
|
pub fn response_create_client_metadata(
|
|
client_metadata: Option<HashMap<String, String>>,
|
|
trace: Option<&W3cTraceContext>,
|
|
) -> Option<HashMap<String, String>> {
|
|
let mut client_metadata = client_metadata.unwrap_or_default();
|
|
|
|
if let Some(traceparent) = trace.and_then(|trace| trace.traceparent.as_deref()) {
|
|
client_metadata.insert(
|
|
WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY.to_string(),
|
|
traceparent.to_string(),
|
|
);
|
|
}
|
|
if let Some(tracestate) = trace.and_then(|trace| trace.tracestate.as_deref()) {
|
|
client_metadata.insert(
|
|
WS_REQUEST_HEADER_TRACESTATE_CLIENT_METADATA_KEY.to_string(),
|
|
tracestate.to_string(),
|
|
);
|
|
}
|
|
|
|
(!client_metadata.is_empty()).then_some(client_metadata)
|
|
}
|
|
|
|
#[derive(Debug, Serialize)]
|
|
#[serde(tag = "type")]
|
|
#[allow(clippy::large_enum_variant)]
|
|
pub enum ResponsesWsRequest {
|
|
#[serde(rename = "response.create")]
|
|
ResponseCreate(ResponseCreateWsRequest),
|
|
}
|
|
|
|
pub fn create_text_param_for_request(
|
|
verbosity: Option<VerbosityConfig>,
|
|
output_schema: &Option<Value>,
|
|
output_schema_strict: bool,
|
|
) -> Option<TextControls> {
|
|
if verbosity.is_none() && output_schema.is_none() {
|
|
return None;
|
|
}
|
|
|
|
Some(TextControls {
|
|
verbosity: verbosity.map(std::convert::Into::into),
|
|
format: output_schema.as_ref().map(|schema| TextFormat {
|
|
r#type: TextFormatType::JsonSchema,
|
|
strict: output_schema_strict,
|
|
schema: schema.clone(),
|
|
name: "codex_output_schema".to_string(),
|
|
}),
|
|
})
|
|
}
|
|
|
|
pub struct ResponseStream {
|
|
pub rx_event: mpsc::Receiver<Result<ResponseEvent, ApiError>>,
|
|
/// Server-assigned `x-request-id` response header, when present.
|
|
pub upstream_request_id: Option<String>,
|
|
}
|
|
|
|
impl Stream for ResponseStream {
|
|
type Item = Result<ResponseEvent, ApiError>;
|
|
|
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
self.rx_event.poll_recv(cx)
|
|
}
|
|
}
|