Files
codex/codex-rs/rollout-trace/src/raw_event.rs
cassirer-openai 89698ad1c3 [rollout-trace] Include x-request-id in rollout trace. (#20066)
## Why

Rollout traces need an identifier that can be used to correlate a Codex
inference with upstream Responses API, proxy, and engine logs. The
reduced trace model already exposed `upstream_request_id`, but it was
being populated from the Responses API `response.id`. That value is
useful for `previous_response_id` chaining, but it is not the transport
request id that upstream systems key on.

This PR separates those concepts so trace consumers can reliably answer
both questions:

- which Responses API response did this inference produce?
- which upstream request handled it?

## Structure

The change keeps the upstream request id at the same lifecycle level as
the provider stream:

- `codex-api` captures the `x-request-id` HTTP response header when the
SSE stream is created and exposes it on `ResponseStream`. Fixture and
websocket streams set the field to `None` because they do not have that
HTTP response header.
- `codex-core` carries that stream-level id into `InferenceTraceAttempt`
when recording terminal stream outcomes. Completed, failed, cancelled,
dropped-stream, and pre-response error paths all record the id when it
is available.
- `rollout-trace` now records both identifiers in raw terminal inference
events and response payloads: `response_id` for the Responses API
`response.id`, and `upstream_request_id` for `x-request-id`.
- The reducer stores both fields on `InferenceCall`. It also uses
`response_id` for `previous_response_id` conversation linking, which
removes the old accidental dependency on the misnamed
`upstream_request_id` field.
- Terminal inference reduction now consumes the full terminal payload
(`InferenceCompleted`, `InferenceFailed`, or `InferenceCancelled`) in
one place. That keeps status, partial payloads, response ids, and
upstream request ids consistent across success, failure, cancellation,
and late stream-mapper events.

## Why This Shape

`x-request-id` is a property of the HTTP/provider response envelope, not
an SSE event. Capturing it once in `codex-api` and plumbing it through
terminal trace recording avoids trying to infer the value from stream
contents, and it preserves the id even when the stream fails or is
cancelled after only partial output.

Keeping `response_id` separate from `upstream_request_id` also makes the
reduced trace model less surprising: `response_id` remains the
conversation-continuation id, while `upstream_request_id` is the
operational correlation id for upstream debugging.

## Validation

The PR updates trace and reducer coverage for:

- reading `x-request-id` from SSE response headers;
- storing the true upstream request id on completed inference calls;
- preserving upstream request ids for cancelled and late-cancelled
inference streams;
- keeping `previous_response_id` reconstruction tied to `response_id`
rather than transport request ids.
2026-04-28 21:11:17 +00:00

306 lines
11 KiB
Rust

//! Append-only raw trace events.
use crate::model::AgentThreadId;
use crate::model::CodeCellRuntimeStatus;
use crate::model::CodexTurnId;
use crate::model::CompactionId;
use crate::model::CompactionRequestId;
use crate::model::EdgeId;
use crate::model::ExecutionStatus;
use crate::model::InferenceCallId;
use crate::model::ModelVisibleCallId;
use crate::model::RolloutStatus;
use crate::model::ToolCallId;
use crate::model::ToolCallKind;
use crate::model::ToolCallSummary;
use crate::payload::RawPayloadRef;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
/// Monotonic sequence number assigned by the raw trace writer.
pub type RawEventSeq = u64;
/// Current raw event envelope schema version.
pub(crate) const RAW_TRACE_EVENT_SCHEMA_VERSION: u32 = 1;
/// One append-only raw trace event.
///
/// Every event uses the same envelope so partial replay and corruption checks
/// can run before the reducer understands the event-specific payload.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RawTraceEvent {
pub schema_version: u32,
/// Contiguous writer-assigned order inside one rollout event log.
pub seq: RawEventSeq,
/// Unix wall-clock timestamp in milliseconds. Use for display/latency.
pub wall_time_unix_ms: i64,
pub rollout_id: String,
pub thread_id: Option<AgentThreadId>,
pub codex_turn_id: Option<CodexTurnId>,
pub payload: RawTraceEventPayload,
}
/// Writer-supplied context that appears in the raw event envelope.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RawTraceEventContext {
pub thread_id: Option<AgentThreadId>,
pub codex_turn_id: Option<CodexTurnId>,
}
/// Runtime requester as observed at the raw tool boundary.
///
/// This intentionally uses runtime-local identifiers. The reducer is the only
/// place that maps these handles to graph identities such as `CodeCellId`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum RawToolCallRequester {
Model,
CodeCell {
/// Runtime-local code-mode cell handle.
runtime_cell_id: String,
},
}
/// Typed payload for a raw trace event.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum RawTraceEventPayload {
RolloutStarted {
trace_id: String,
root_thread_id: AgentThreadId,
},
RolloutEnded {
status: RolloutStatus,
},
ThreadStarted {
thread_id: AgentThreadId,
/// Stable agent path.
agent_path: String,
metadata_payload: Option<RawPayloadRef>,
},
ThreadEnded {
thread_id: AgentThreadId,
status: RolloutStatus,
},
CodexTurnStarted {
codex_turn_id: CodexTurnId,
thread_id: AgentThreadId,
},
CodexTurnEnded {
codex_turn_id: CodexTurnId,
status: ExecutionStatus,
},
InferenceStarted {
inference_call_id: InferenceCallId,
thread_id: AgentThreadId,
codex_turn_id: CodexTurnId,
model: String,
provider_name: String,
request_payload: RawPayloadRef,
},
InferenceCompleted {
inference_call_id: InferenceCallId,
/// Responses API `response.id`; used by `previous_response_id`.
response_id: Option<String>,
/// Provider transport request id, such as `x-request-id`.
upstream_request_id: Option<String>,
response_payload: RawPayloadRef,
},
InferenceFailed {
inference_call_id: InferenceCallId,
/// Provider transport request id, such as `x-request-id`, when the
/// provider returned one before the stream failed.
upstream_request_id: Option<String>,
error: String,
/// Partial response payload, when stream events arrived before failure.
partial_response_payload: Option<RawPayloadRef>,
},
InferenceCancelled {
inference_call_id: InferenceCallId,
/// Provider transport request id, such as `x-request-id`, when observed
/// before Codex stopped consuming the stream.
upstream_request_id: Option<String>,
/// Why Codex stopped consuming the provider stream before a terminal response event.
reason: String,
/// Completed output items observed before cancellation, if any.
partial_response_payload: Option<RawPayloadRef>,
},
ToolCallStarted {
tool_call_id: ToolCallId,
/// Protocol/model call ID when this runtime call came from model output.
model_visible_call_id: Option<String>,
/// Code-mode runtime bridge ID when model-authored code issued this call.
code_mode_runtime_tool_id: Option<String>,
/// Runtime requester that caused this tool lifecycle.
requester: RawToolCallRequester,
kind: ToolCallKind,
summary: ToolCallSummary,
invocation_payload: Option<RawPayloadRef>,
},
ToolCallRuntimeStarted {
tool_call_id: ToolCallId,
/// Runtime/protocol observation for how Codex began executing the tool.
runtime_payload: RawPayloadRef,
},
ToolCallRuntimeEnded {
tool_call_id: ToolCallId,
status: ExecutionStatus,
/// Runtime/protocol observation for how Codex finished executing the tool.
runtime_payload: RawPayloadRef,
},
ToolCallEnded {
tool_call_id: ToolCallId,
status: ExecutionStatus,
result_payload: Option<RawPayloadRef>,
},
CodeCellStarted {
/// Runtime-local handle allocated by code mode for waits and nested tools.
runtime_cell_id: String,
/// Custom tool call id on the model-visible `exec` item.
model_visible_call_id: ModelVisibleCallId,
/// JavaScript source after the public `exec` wrapper has been parsed.
source_js: String,
},
CodeCellInitialResponse {
/// Runtime-local handle, matching `CodeCellStarted`.
runtime_cell_id: String,
status: CodeCellRuntimeStatus,
response_payload: Option<RawPayloadRef>,
},
CodeCellEnded {
/// Runtime-local handle, matching `CodeCellStarted`.
runtime_cell_id: String,
status: CodeCellRuntimeStatus,
response_payload: Option<RawPayloadRef>,
},
CompactionRequestStarted {
compaction_id: CompactionId,
compaction_request_id: CompactionRequestId,
thread_id: AgentThreadId,
codex_turn_id: CodexTurnId,
model: String,
provider_name: String,
request_payload: RawPayloadRef,
},
CompactionRequestCompleted {
compaction_id: CompactionId,
compaction_request_id: CompactionRequestId,
response_payload: RawPayloadRef,
},
CompactionRequestFailed {
compaction_id: CompactionId,
compaction_request_id: CompactionRequestId,
error: String,
},
/// Checkpoint installation event for remote-compacted replacement history.
CompactionInstalled {
compaction_id: CompactionId,
/// Trace-only checkpoint payload. Do not route this through public UI protocol.
checkpoint_payload: RawPayloadRef,
},
/// Multi-agent v2 child-to-parent completion delivery.
AgentResultObserved {
edge_id: EdgeId,
child_thread_id: AgentThreadId,
child_codex_turn_id: CodexTurnId,
parent_thread_id: AgentThreadId,
message: String,
/// Raw notification payload. This is evidence for the runtime delivery,
/// not the parent-side model-visible item.
carried_payload: Option<RawPayloadRef>,
},
/// Existing UI/protocol event wrapped into trace format.
ProtocolEventObserved {
event_type: String,
event_payload: RawPayloadRef,
},
/// Structured payload for early instrumentation before a dedicated variant exists.
Other {
kind: String,
summary: String,
payloads: Vec<RawPayloadRef>,
/// Small structured metadata. Large data belongs in `payloads`.
metadata: Value,
},
}
impl RawTraceEventPayload {
/// Raw payload refs that must exist before this raw event is appended.
pub(crate) fn raw_payload_refs(&self) -> Vec<&RawPayloadRef> {
match self {
RawTraceEventPayload::RolloutStarted { .. }
| RawTraceEventPayload::RolloutEnded { .. }
| RawTraceEventPayload::ThreadEnded { .. }
| RawTraceEventPayload::CodexTurnStarted { .. }
| RawTraceEventPayload::CodexTurnEnded { .. }
| RawTraceEventPayload::CompactionRequestFailed { .. }
| RawTraceEventPayload::CodeCellStarted { .. }
| RawTraceEventPayload::AgentResultObserved {
carried_payload: None,
..
} => Vec::new(),
RawTraceEventPayload::ThreadStarted {
metadata_payload, ..
} => metadata_payload.iter().collect(),
RawTraceEventPayload::InferenceStarted {
request_payload, ..
}
| RawTraceEventPayload::InferenceCompleted {
response_payload: request_payload,
..
}
| RawTraceEventPayload::CompactionRequestStarted {
request_payload, ..
}
| RawTraceEventPayload::CompactionRequestCompleted {
response_payload: request_payload,
..
}
| RawTraceEventPayload::CompactionInstalled {
checkpoint_payload: request_payload,
..
}
| RawTraceEventPayload::ProtocolEventObserved {
event_payload: request_payload,
..
} => vec![request_payload],
RawTraceEventPayload::InferenceFailed {
partial_response_payload,
..
}
| RawTraceEventPayload::InferenceCancelled {
partial_response_payload,
..
}
| RawTraceEventPayload::ToolCallStarted {
invocation_payload: partial_response_payload,
..
}
| RawTraceEventPayload::ToolCallEnded {
result_payload: partial_response_payload,
..
}
| RawTraceEventPayload::CodeCellInitialResponse {
response_payload: partial_response_payload,
..
}
| RawTraceEventPayload::CodeCellEnded {
response_payload: partial_response_payload,
..
} => partial_response_payload.iter().collect(),
RawTraceEventPayload::AgentResultObserved {
carried_payload: Some(carried_payload),
..
} => vec![carried_payload],
RawTraceEventPayload::ToolCallRuntimeStarted {
runtime_payload, ..
}
| RawTraceEventPayload::ToolCallRuntimeEnded {
runtime_payload, ..
} => vec![runtime_payload],
RawTraceEventPayload::Other { payloads, .. } => payloads.iter().collect(),
}
}
}