Files
codex/codex-rs/core/src/client_tests.rs
cassirer-openai 89698ad1c3 [rollout-trace] Include x-request-id in rollout trace. (#20066)
## Why

Rollout traces need an identifier that can be used to correlate a Codex
inference with upstream Responses API, proxy, and engine logs. The
reduced trace model already exposed `upstream_request_id`, but it was
being populated from the Responses API `response.id`. That value is
useful for `previous_response_id` chaining, but it is not the transport
request id that upstream systems key on.

This PR separates those concepts so trace consumers can reliably answer
both questions:

- which Responses API response did this inference produce?
- which upstream request handled it?

## Structure

The change keeps the upstream request id at the same lifecycle level as
the provider stream:

- `codex-api` captures the `x-request-id` HTTP response header when the
SSE stream is created and exposes it on `ResponseStream`. Fixture and
websocket streams set the field to `None` because they do not have that
HTTP response header.
- `codex-core` carries that stream-level id into `InferenceTraceAttempt`
when recording terminal stream outcomes. Completed, failed, cancelled,
dropped-stream, and pre-response error paths all record the id when it
is available.
- `rollout-trace` now records both identifiers in raw terminal inference
events and response payloads: `response_id` for the Responses API
`response.id`, and `upstream_request_id` for `x-request-id`.
- The reducer stores both fields on `InferenceCall`. It also uses
`response_id` for `previous_response_id` conversation linking, which
removes the old accidental dependency on the misnamed
`upstream_request_id` field.
- Terminal inference reduction now consumes the full terminal payload
(`InferenceCompleted`, `InferenceFailed`, or `InferenceCancelled`) in
one place. That keeps status, partial payloads, response ids, and
upstream request ids consistent across success, failure, cancellation,
and late stream-mapper events.

## Why This Shape

`x-request-id` is a property of the HTTP/provider response envelope, not
an SSE event. Capturing it once in `codex-api` and plumbing it through
terminal trace recording avoids trying to infer the value from stream
contents, and it preserves the id even when the stream fails or is
cancelled after only partial output.

Keeping `response_id` separate from `upstream_request_id` also makes the
reduced trace model less surprising: `response_id` remains the
conversation-continuation id, while `upstream_request_id` is the
operational correlation id for upstream debugging.

## Validation

The PR updates trace and reducer coverage for:

- reading `x-request-id` from SSE response headers;
- storing the true upstream request id on completed inference calls;
- preserving upstream request ids for cancelled and late-cancelled
inference streams;
- keeping `previous_response_id` reconstruction tied to `response_id`
rather than transport request ids.
2026-04-28 21:11:17 +00:00

386 lines
13 KiB
Rust

use super::AuthRequestTelemetryContext;
use super::ModelClient;
use super::PendingUnauthorizedRetry;
use super::UnauthorizedRecoveryExecution;
use super::X_CODEX_INSTALLATION_ID_HEADER;
use super::X_CODEX_PARENT_THREAD_ID_HEADER;
use super::X_CODEX_TURN_METADATA_HEADER;
use super::X_CODEX_WINDOW_ID_HEADER;
use super::X_OPENAI_SUBAGENT_HEADER;
use codex_api::ApiError;
use codex_api::ResponseEvent;
use codex_app_server_protocol::AuthMode;
use codex_model_provider::BearerAuthProvider;
use codex_model_provider_info::WireApi;
use codex_model_provider_info::create_oss_provider_with_base_url;
use codex_otel::SessionTelemetry;
use codex_protocol::ThreadId;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::protocol::InternalSessionSource;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_rollout_trace::ExecutionStatus;
use codex_rollout_trace::InferenceTraceAttempt;
use codex_rollout_trace::InferenceTraceContext;
use codex_rollout_trace::RawTraceEventPayload;
use codex_rollout_trace::RolloutTrace;
use codex_rollout_trace::TraceWriter;
use codex_rollout_trace::replay_bundle;
use futures::StreamExt;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use tempfile::TempDir;
use tokio::sync::Notify;
/// Builds a `ModelClient` for a placeholder OSS Responses-API provider.
///
/// No auth manager is attached. Verbosity, request compression, timing metrics
/// and the beta-features header are all off, so `session_source` is the only
/// input that varies between tests.
fn test_model_client(session_source: SessionSource) -> ModelClient {
    const INSTALLATION_ID: &str = "11111111-1111-4111-8111-111111111111";
    let base_url = "https://example.com/v1";

    let oss_provider = create_oss_provider_with_base_url(base_url, WireApi::Responses);
    let thread_id = ThreadId::new();

    ModelClient::new(
        /*auth_manager*/ None,
        thread_id,
        /*installation_id*/ INSTALLATION_ID.to_string(),
        oss_provider,
        session_source,
        /*model_verbosity*/ None,
        /*enable_request_compression*/ false,
        /*include_timing_metrics*/ false,
        /*beta_features_header*/ None,
    )
}
/// Deserializes a minimal `ModelInfo` fixture describing the fake `gpt-test`
/// model.
///
/// The fixture has a single `medium` reasoning level and a 272000 context
/// window. Reasoning summaries, verbosity, parallel tool calls and original
/// image detail are all unsupported.
///
/// # Panics
/// Panics if the fixture no longer matches the `ModelInfo` schema.
fn test_model_info() -> ModelInfo {
    let fixture = json!({
        "slug": "gpt-test",
        "display_name": "gpt-test",
        "description": "desc",
        "default_reasoning_level": "medium",
        "supported_reasoning_levels": [
            {"effort": "medium", "description": "medium"}
        ],
        "shell_type": "shell_command",
        "visibility": "list",
        "supported_in_api": true,
        "priority": 1,
        "upgrade": null,
        "base_instructions": "base instructions",
        "model_messages": null,
        "supports_reasoning_summaries": false,
        "support_verbosity": false,
        "default_verbosity": null,
        "apply_patch_tool_type": null,
        "truncation_policy": {"mode": "bytes", "limit": 10000},
        "supports_parallel_tool_calls": false,
        "supports_image_detail_original": false,
        "context_window": 272000,
        "auto_compact_token_limit": null,
        "experimental_supported_tools": []
    });
    serde_json::from_value::<ModelInfo>(fixture).expect("deserialize test model info")
}
/// Creates `SessionTelemetry` for a CLI session on the `gpt-test` model.
///
/// The session has no account id, email or auth mode, and user prompts are
/// not logged.
fn test_session_telemetry() -> SessionTelemetry {
    let model = "gpt-test";
    let originator = String::from("test-originator");
    let terminal = String::from("test-terminal");

    SessionTelemetry::new(
        ThreadId::new(),
        model,
        model,
        /*account_id*/ None,
        /*account_email*/ None,
        /*auth_mode*/ None,
        originator,
        /*log_user_prompts*/ false,
        terminal,
        SessionSource::Cli,
    )
}
/// Opens a trace bundle in `temp` and prepares an inference attempt.
///
/// It records a root thread (`thread-root`) with one turn (`turn-1`), starts
/// one inference attempt on that turn, and records its request payload
/// (a single user "hello" message). The attempt is returned ready to receive
/// response events.
///
/// # Errors
/// Propagates failures from creating the trace writer or appending the setup
/// events.
fn started_inference_attempt(temp: &TempDir) -> anyhow::Result<InferenceTraceAttempt> {
    const THREAD_ID: &str = "thread-root";
    const TURN_ID: &str = "turn-1";

    let trace_writer = TraceWriter::create(
        temp.path(),
        "trace-1".to_string(),
        "rollout-1".to_string(),
        THREAD_ID.to_string(),
    )?;
    let trace_writer = Arc::new(trace_writer);

    // Thread and turn must exist in the trace before the inference starts.
    for setup_event in [
        RawTraceEventPayload::ThreadStarted {
            thread_id: THREAD_ID.to_string(),
            agent_path: "/root".to_string(),
            metadata_payload: None,
        },
        RawTraceEventPayload::CodexTurnStarted {
            codex_turn_id: TURN_ID.to_string(),
            thread_id: THREAD_ID.to_string(),
        },
    ] {
        trace_writer.append(setup_event)?;
    }

    let trace_context = InferenceTraceContext::enabled(
        trace_writer,
        THREAD_ID.to_string(),
        TURN_ID.to_string(),
        "gpt-test".to_string(),
        "test-provider".to_string(),
    );
    let attempt = trace_context.start_attempt();

    let request_payload = json!({
        "model": "gpt-test",
        "input": [{
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": "hello"}]
        }],
    });
    attempt.record_started(&request_payload);

    Ok(attempt)
}
/// Builds an assistant `ResponseItem::Message` with the given item `id`.
///
/// The message has exactly one `OutputText` content part holding `text`, and
/// `phase` is left unset.
fn output_message(id: &str, text: &str) -> ResponseItem {
    let content = vec![ContentItem::OutputText {
        text: text.to_owned(),
    }];
    ResponseItem::Message {
        id: Some(id.to_owned()),
        role: String::from("assistant"),
        content,
        phase: None,
    }
}
/// Replays the trace bundle in `temp` until the first reduced inference call
/// reports `ExecutionStatus::Cancelled`.
///
/// The mapper task records cancellation asynchronously after the consumer
/// drops the stream, so the terminal event may not be on disk yet. The bundle
/// is re-read every 10 ms, at most 50 extra times. If that budget runs out,
/// the latest replay is returned anyway and the caller's assertions report
/// the failure.
///
/// # Errors
/// Propagates any `replay_bundle` failure.
///
/// # Panics
/// Panics if a replay contains no inference calls.
async fn replay_until_cancelled(temp: &TempDir) -> anyhow::Result<RolloutTrace> {
    const MAX_RETRIES: usize = 50;
    const RETRY_DELAY: Duration = Duration::from_millis(10);

    let mut rollout = replay_bundle(temp.path())?;
    let mut retries = 0;
    while retries < MAX_RETRIES {
        let cancelled = rollout
            .inference_calls
            .values()
            .next()
            .map(|inference| inference.execution.status == ExecutionStatus::Cancelled)
            .expect("inference should be reduced");
        if cancelled {
            break;
        }
        tokio::time::sleep(RETRY_DELAY).await;
        rollout = replay_bundle(temp.path())?;
        retries += 1;
    }
    Ok(rollout)
}
/// Test stream that yields a fixed queue of events and signals `notify` once
/// the `notify_after`-th event has been handed out.
///
/// Once the queue is empty it returns `Poll::Pending` without registering a
/// waker (as `futures::stream::pending()` does). The stream therefore never
/// terminates on its own and never produces a terminal response event.
struct NotifyAfterEventStream {
    /// Events still to be yielded, in order.
    events: VecDeque<ResponseEvent>,
    /// Number of events yielded so far.
    yielded: usize,
    /// 1-based index of the yielded event that triggers `notify`.
    notify_after: usize,
    /// Signalled with `notify_one` when event number `notify_after` is yielded.
    notify: Arc<Notify>,
}

impl futures::Stream for NotifyAfterEventStream {
    type Item = std::result::Result<ResponseEvent, ApiError>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Every field is `Unpin`, so the pinned reference can be unwrapped.
        let this = self.get_mut();
        match this.events.pop_front() {
            None => Poll::Pending,
            Some(event) => {
                this.yielded += 1;
                if this.yielded == this.notify_after {
                    this.notify.notify_one();
                }
                Poll::Ready(Some(Ok(event)))
            }
        }
    }
}
/// A `SubAgentSource::Other` label is forwarded verbatim in the sub-agent header.
#[test]
fn build_subagent_headers_sets_other_subagent_label() {
    let source = SessionSource::SubAgent(SubAgentSource::Other(
        "memory_consolidation".to_string(),
    ));
    let client = test_model_client(source);

    let headers = client.build_subagent_headers();
    let label = headers
        .get(X_OPENAI_SUBAGENT_HEADER)
        .and_then(|header| header.to_str().ok());

    assert_eq!(label, Some("memory_consolidation"));
}
/// The internal memory-consolidation session source is reported under the
/// `memory_consolidation` sub-agent label.
#[test]
fn build_subagent_headers_sets_internal_memory_consolidation_label() {
    let source = SessionSource::Internal(InternalSessionSource::MemoryConsolidation);
    let client = test_model_client(source);

    let headers = client.build_subagent_headers();
    let label = headers
        .get(X_OPENAI_SUBAGENT_HEADER)
        .and_then(|header| header.to_str().ok());

    assert_eq!(label, Some("memory_consolidation"));
}
/// Websocket client metadata for a spawned sub-agent carries the installation
/// id, window id, sub-agent label, parent thread id and turn metadata.
#[test]
fn build_ws_client_metadata_includes_window_lineage_and_turn_metadata() {
    let parent_thread_id = ThreadId::new();
    let spawn_source = SubAgentSource::ThreadSpawn {
        parent_thread_id,
        depth: 2,
        agent_path: None,
        agent_nickname: None,
        agent_role: None,
    };
    let client = test_model_client(SessionSource::SubAgent(spawn_source));

    // Advance the window once; the expected window id below is `<conversation>:1`.
    client.advance_window_generation();

    let turn_metadata = r#"{"turn_id":"turn-123"}"#;
    let actual = client.build_ws_client_metadata(Some(turn_metadata));
    let conversation_id = client.state.conversation_id;

    let expected = std::collections::HashMap::from([
        (
            X_CODEX_INSTALLATION_ID_HEADER.to_string(),
            "11111111-1111-4111-8111-111111111111".to_string(),
        ),
        (
            X_CODEX_WINDOW_ID_HEADER.to_string(),
            format!("{conversation_id}:1"),
        ),
        (
            X_OPENAI_SUBAGENT_HEADER.to_string(),
            "collab_spawn".to_string(),
        ),
        (
            X_CODEX_PARENT_THREAD_ID_HEADER.to_string(),
            parent_thread_id.to_string(),
        ),
        (
            X_CODEX_TURN_METADATA_HEADER.to_string(),
            turn_metadata.to_string(),
        ),
    ]);

    assert_eq!(actual, expected);
}
/// Summarizing an empty list of memories succeeds and yields no summaries.
#[tokio::test]
async fn summarize_memories_returns_empty_for_empty_input() {
    let client = test_model_client(SessionSource::Cli);
    let (model_info, session_telemetry) = (test_model_info(), test_session_telemetry());

    let summaries = client
        .summarize_memories(Vec::new(), &model_info, /*effort*/ None, &session_telemetry)
        .await
        .expect("empty summarize request should succeed");

    assert_eq!(summaries.len(), 0);
}
/// Dropping the mapped response stream after one output item has been yielded,
/// with no terminal event, must trace the inference as `Cancelled` and keep
/// the already-seen output item.
///
/// No upstream request id is supplied (`None`), so this test does not check
/// request-id propagation.
#[tokio::test]
async fn dropped_response_stream_traces_cancelled_partial_output() -> anyhow::Result<()> {
let temp = TempDir::new()?;
let attempt = started_inference_attempt(&temp)?;
// The provider has produced one complete output item, but no terminal
// response.completed event. The harness has enough information to keep this
// item in history, so the trace should preserve it when the stream is
// abandoned.
let item = output_message("msg-1", "partial answer");
// After the single item, `futures::stream::pending()` keeps the provider
// stream open forever without ever emitting a terminal event.
let api_stream = futures::stream::iter([Ok(ResponseEvent::OutputItemDone(item))])
.chain(futures::stream::pending());
// Only the mapped stream is needed; the second tuple element is unused here.
let (mut stream, _) = super::map_response_events(
/*upstream_request_id*/ None,
api_stream,
test_session_telemetry(),
attempt,
);
// Consume the output item so the mapper has definitely observed it.
let observed = stream
.next()
.await
.expect("mapped stream should yield output item")?;
assert!(matches!(observed, ResponseEvent::OutputItemDone(_)));
// Dropping the consumer is how turn interruption/preemption stops polling
// the provider stream. The mapper task observes that drop asynchronously
// and records cancellation using the output items it has already seen.
drop(stream);
// Cancellation is recorded by the mapper task after Drop wakes it, so the
// replay may need a short wait before the terminal event appears on disk.
let rollout = replay_until_cancelled(&temp).await?;
let inference = rollout
.inference_calls
.values()
.next()
.expect("inference should be reduced");
assert_eq!(inference.execution.status, ExecutionStatus::Cancelled);
// The single partial output item is kept on the cancelled inference.
assert_eq!(inference.response_item_ids.len(), 1);
// NOTE(review): presumably the request payload from `record_started` plus the
// partial response payload written on cancellation — confirm.
assert_eq!(rollout.raw_payloads.len(), 2);
Ok(())
}
/// Like `dropped_response_stream_traces_cancelled_partial_output`, but the
/// consumer is dropped while the mapper is blocked sending an output item into
/// a full channel. The item was never delivered downstream, yet it must still
/// be traced on the cancelled inference.
#[tokio::test]
async fn dropped_backpressured_response_stream_traces_cancelled_partial_output()
-> anyhow::Result<()> {
let temp = TempDir::new()?;
let attempt = started_inference_attempt(&temp)?;
// Signalled by the fake provider stream when it yields the output item below.
let backpressured_item_yielded = Arc::new(Notify::new());
let mut events = VecDeque::new();
// One non-terminal event per channel slot, so the channel is full before the
// output item is pulled from the provider stream.
for _ in 0..super::RESPONSE_STREAM_CHANNEL_CAPACITY {
events.push_back(ResponseEvent::Created);
}
events.push_back(ResponseEvent::OutputItemDone(output_message(
"msg-1",
"partial answer",
)));
let api_stream = NotifyAfterEventStream {
events,
yielded: 0,
// Event number CAPACITY + 1 is the output item pushed last above.
notify_after: super::RESPONSE_STREAM_CHANNEL_CAPACITY + 1,
notify: Arc::clone(&backpressured_item_yielded),
};
// The consumer is never polled; the second tuple element is unused here.
let (stream, _) = super::map_response_events(
/*upstream_request_id*/ None,
api_stream,
test_session_telemetry(),
attempt,
);
// Fill the mapper channel with non-terminal events, then yield one output
// item. The mapper has observed that item and is blocked trying to send it
// downstream, so dropping the consumer covers the send-failure path rather
// than the `consumer_dropped` select branch.
backpressured_item_yielded.notified().await;
drop(stream);
let rollout = replay_until_cancelled(&temp).await?;
let inference = rollout
.inference_calls
.values()
.next()
.expect("inference should be reduced");
assert_eq!(inference.execution.status, ExecutionStatus::Cancelled);
// The undelivered output item is still kept on the cancelled inference.
assert_eq!(inference.response_item_ids.len(), 1);
// NOTE(review): presumably the request payload from `record_started` plus the
// partial response payload written on cancellation — confirm.
assert_eq!(rollout.raw_payloads.len(), 2);
Ok(())
}
/// The telemetry context records the auth mode and the attached auth header.
/// After an unauthorized response, it also records the recovery mode and
/// phase of the pending retry.
#[test]
fn auth_request_telemetry_context_tracks_attached_auth_and_retry_phase() {
    let bearer_provider =
        BearerAuthProvider::for_test(Some("access-token"), Some("workspace-123"));
    let pending_retry = PendingUnauthorizedRetry::from_recovery(UnauthorizedRecoveryExecution {
        mode: "managed",
        phase: "refresh_token",
    });

    let context = AuthRequestTelemetryContext::new(
        Some(AuthMode::Chatgpt),
        &bearer_provider,
        pending_retry,
    );

    assert_eq!(context.auth_mode, Some("Chatgpt"));
    assert!(context.auth_header_attached);
    assert_eq!(context.auth_header_name, Some("authorization"));
    assert!(context.retry_after_unauthorized);
    assert_eq!(context.recovery_mode, Some("managed"));
    assert_eq!(context.recovery_phase, Some("refresh_token"));
}