Compare commits

...

2 Commits

Author SHA1 Message Date
Albin Cassirer
f73ef250e0 Trace inference input reduction mode
Record whether traced inference input is a full snapshot or a websocket delta so rollout replay does not infer hidden prewarm state from missing response ids.

Co-authored-by: Codex <noreply@openai.com>
2026-05-15 15:50:16 +01:00
Albin Cassirer
ede3fc305e [rollout-tracer] Allow unresolved prev_response_id.
Right now we hard fail if there is an inference request with a prev_response_id which can't be resolved to any previous inference request. However, in the web socket path we do send prewarmup requests which explicitly aren't tracked by the tracer so when the first real request arrives it has a prev_response_id which we can't resolve.

This commit relaxes the constraint so that we now allow for 1 inference request per thread to have an unresolved prev_response_id.
2026-05-15 12:26:29 +01:00
15 changed files with 282 additions and 73 deletions

View File

@@ -82,6 +82,7 @@ use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::W3cTraceContext;
use codex_rollout_trace::CompactionTraceContext;
use codex_rollout_trace::InferenceRequestInputMode;
use codex_rollout_trace::InferenceTraceAttempt;
use codex_rollout_trace::InferenceTraceContext;
use codex_tools::create_tools_json_for_responses_api;
@@ -1252,7 +1253,8 @@ impl ModelClientSession {
)?;
let inference_trace_attempt = inference_trace.start_attempt();
inference_trace_attempt.add_request_headers(&mut options.extra_headers);
inference_trace_attempt.record_started(&request);
inference_trace_attempt
.record_started(&request, InferenceRequestInputMode::FullSnapshot);
let client = ApiResponsesClient::new(
transport,
client_setup.api_provider,
@@ -1407,6 +1409,7 @@ impl ModelClientSession {
}
let mut ws_request = self.prepare_websocket_request(ws_payload, &request);
let request_input_mode = websocket_trace_request_input_mode(&ws_request, &request);
self.websocket_session.last_request = Some(request);
let inference_trace_attempt = if warmup {
// Prewarm sends `generate=false`; it is connection setup, not a
@@ -1416,7 +1419,7 @@ impl ModelClientSession {
inference_trace.start_attempt()
};
stamp_ws_stream_request_start_ms(&mut ws_request);
inference_trace_attempt.record_started(&ws_request);
inference_trace_attempt.record_started(&ws_request, request_input_mode);
let websocket_connection =
self.websocket_session.connection.as_ref().ok_or_else(|| {
map_api_error(ApiError::Stream(
@@ -1641,6 +1644,30 @@ fn stamp_ws_stream_request_start_ms(request: &mut ResponsesWsRequest) {
);
}
/// Describes whether the websocket wire payload still contains the logical full input.
///
/// A websocket request can reuse `previous_response_id` while omitting zero visible
/// items, notably after empty startup prewarm. The reducer should treat that as a
/// full snapshot even though the transport-level parent id is present.
fn websocket_trace_request_input_mode(
request: &ResponsesWsRequest,
logical_request: &ResponsesApiRequest,
) -> InferenceRequestInputMode {
let ResponsesWsRequest::ResponseCreate(payload) = request else {
return InferenceRequestInputMode::FullSnapshot;
};
let Some(previous_response_id) = payload.previous_response_id.as_ref() else {
return InferenceRequestInputMode::FullSnapshot;
};
if payload.input == logical_request.input {
InferenceRequestInputMode::FullSnapshot
} else {
InferenceRequestInputMode::Incremental {
previous_response_id: previous_response_id.clone(),
}
}
}
/// Builds the extra headers attached to Responses API requests.
///
/// These headers implement Codex-specific conventions:

View File

@@ -184,14 +184,17 @@ fn started_inference_attempt(temp: &TempDir) -> anyhow::Result<InferenceTraceAtt
"test-provider".to_string(),
);
let attempt = inference_trace.start_attempt();
attempt.record_started(&json!({
"model": "gpt-test",
"input": [{
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": "hello"}]
}],
}));
attempt.record_started(
&json!({
"model": "gpt-test",
"input": [{
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": "hello"}]
}],
}),
codex_rollout_trace::InferenceRequestInputMode::FullSnapshot,
);
Ok(attempt)
}

View File

@@ -95,6 +95,10 @@ async fn websocket_first_turn_uses_startup_prewarm_and_create() -> Result<()> {
let turn = connection.get(1).expect("missing turn request").body_json();
assert_eq!(warmup["type"].as_str(), Some("response.create"));
assert_eq!(warmup["generate"].as_bool(), Some(false));
// Rollout trace reduction relies on startup prewarm carrying no model-visible
// conversation input: the first real request may reference this response id,
// while prewarm itself intentionally remains outside inference tracing.
assert_eq!(warmup["input"], serde_json::json!([]));
assert!(
turn["tools"]
.as_array()

View File

@@ -21,6 +21,7 @@ use crate::model::AgentThreadId;
use crate::model::CodexTurnId;
use crate::model::InferenceCallId;
use crate::payload::RawPayloadKind;
use crate::raw_event::InferenceRequestInputMode;
use crate::raw_event::RawTraceEventContext;
use crate::raw_event::RawTraceEventPayload;
use crate::writer::TraceWriter;
@@ -166,8 +167,12 @@ impl InferenceTraceAttempt {
headers.insert(INFERENCE_CALL_ID_HEADER, inference_call_id);
}
/// Records the exact request object about to be sent to the model provider.
pub fn record_started(&self, request: &impl Serialize) {
/// Records the exact provider request plus how replay should reduce its visible input.
pub fn record_started(
&self,
request: &impl Serialize,
request_input_mode: InferenceRequestInputMode,
) {
let InferenceTraceAttemptState::Enabled(attempt) = &self.state else {
return;
};
@@ -187,6 +192,7 @@ impl InferenceTraceAttempt {
codex_turn_id: attempt.context.codex_turn_id.clone(),
model: attempt.context.model.clone(),
provider_name: attempt.context.provider_name.clone(),
request_input_mode: Some(request_input_mode),
request_payload,
},
);
@@ -462,14 +468,17 @@ mod tests {
);
let attempt = context.start_attempt();
attempt.record_started(&json!({
"model": "gpt-test",
"input": [{
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": "hello"}]
}],
}));
attempt.record_started(
&json!({
"model": "gpt-test",
"input": [{
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": "hello"}]
}],
}),
InferenceRequestInputMode::FullSnapshot,
);
attempt.record_completed("resp-1", Some("req-1"), &None, &[]);
let rollout = replay_bundle(temp.path())?;

View File

@@ -44,6 +44,8 @@ pub use payload::RawPayloadId;
pub use payload::RawPayloadKind;
/// Reference to a raw request/response/log payload stored in the bundle.
pub use payload::RawPayloadRef;
/// Reducer semantics for one traced inference request's visible input items.
pub use raw_event::InferenceRequestInputMode;
/// Monotonic sequence number assigned by the raw trace writer.
pub use raw_event::RawEventSeq;
/// Runtime requester observed before semantic reduction.

View File

@@ -63,6 +63,24 @@ pub enum RawToolCallRequester {
},
}
/// How an inference request's visible `input` field should reduce into conversation state.
///
/// The raw request payload intentionally preserves the transport bytes. WebSocket
/// transport can still send a `previous_response_id` when that parent omitted no
/// model-visible items, so replay needs this trace-side semantic fact rather than
/// guessing solely from the wire payload.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum InferenceRequestInputMode {
/// The request payload contains the whole model-visible input snapshot.
FullSnapshot,
/// The request payload contains only items appended after a traced prior response.
Incremental {
/// Responses API `response.id` whose traced request/response items form the omitted prefix.
previous_response_id: String,
},
}
/// Typed payload for a raw trace event.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
@@ -98,6 +116,12 @@ pub enum RawTraceEventPayload {
codex_turn_id: CodexTurnId,
model: String,
provider_name: String,
/// Reducer semantics for the request payload's visible `input` items.
///
/// `None` is accepted only for trace bundles written before this field existed;
/// new producers should always record an explicit mode.
#[serde(default, skip_serializing_if = "Option::is_none")]
request_input_mode: Option<InferenceRequestInputMode>,
request_payload: RawPayloadRef,
},
InferenceCompleted {

View File

@@ -37,6 +37,7 @@ fn code_cell_lifecycle_links_nested_tools_waits_and_outputs() -> anyhow::Result<
codex_turn_id: "turn-1".to_string(),
model: "gpt-test".to_string(),
provider_name: "test-provider".to_string(),
request_input_mode: None,
request_payload: request,
})?;
let response = writer.write_json_payload(
@@ -120,6 +121,7 @@ fn code_cell_lifecycle_links_nested_tools_waits_and_outputs() -> anyhow::Result<
codex_turn_id: "turn-2".to_string(),
model: "gpt-test".to_string(),
provider_name: "test-provider".to_string(),
request_input_mode: None,
request_payload: followup,
})?;
let wait_request = writer.write_json_payload(
@@ -207,6 +209,7 @@ fn fast_code_cell_lifecycle_waits_for_source_item() -> anyhow::Result<()> {
codex_turn_id: "turn-1".to_string(),
model: "gpt-test".to_string(),
provider_name: "test-provider".to_string(),
request_input_mode: None,
request_payload: request,
})?;
writer.append_with_context(
@@ -286,6 +289,7 @@ fn cancelled_turn_terminates_unfinished_code_cell() -> anyhow::Result<()> {
codex_turn_id: "turn-1".to_string(),
model: "gpt-test".to_string(),
provider_name: "test-provider".to_string(),
request_input_mode: None,
request_payload: request,
})?;
let response = writer.write_json_payload(
@@ -366,6 +370,7 @@ fn runtime_code_cell_ids_can_repeat_across_threads() -> anyhow::Result<()> {
codex_turn_id: turn_id.to_string(),
model: "gpt-test".to_string(),
provider_name: "test-provider".to_string(),
request_input_mode: None,
request_payload: request,
})?;
writer.append_with_context(

View File

@@ -22,6 +22,7 @@ use crate::model::ConversationRole;
use crate::model::InferenceCallId;
use crate::model::ProducerRef;
use crate::payload::RawPayloadRef;
use crate::raw_event::InferenceRequestInputMode;
mod normalize;
@@ -37,6 +38,7 @@ impl TraceReducer {
inference_call_id: &InferenceCallId,
thread_id: &str,
codex_turn_id: &str,
request_input_mode: Option<&InferenceRequestInputMode>,
request_payload: &RawPayloadRef,
) -> Result<Vec<String>> {
let payload = self.read_payload_json(request_payload)?;
@@ -55,56 +57,78 @@ impl TraceReducer {
let items = normalize::normalize_model_items(request_items, request_payload)?;
let previous_response_id = payload.get("previous_response_id").and_then(Value::as_str);
let payload_previous_response_id =
payload.get("previous_response_id").and_then(Value::as_str);
// Old bundles predate request-input reduction metadata. Preserve their
// existing replay behavior by deriving the legacy interpretation from
// the transport payload only when the explicit mode is absent.
let request_input_mode = request_input_mode.cloned().unwrap_or_else(|| {
payload_previous_response_id.map_or(InferenceRequestInputMode::FullSnapshot, |id| {
InferenceRequestInputMode::Incremental {
previous_response_id: id.to_string(),
}
})
});
// After compaction, the next full request is compared against the installed replacement
// history, not the pre-compaction prompt. Any repeated developer/context prefix that Codex
// reinjects must therefore become a fresh post-compaction conversation item.
let post_compaction_snapshot = if previous_response_id.is_none() {
self.pending_compaction_replacement_item_ids
.get(thread_id)
.cloned()
} else {
None
};
let request_item_ids = if let Some(previous_response_id) = previous_response_id {
// Streaming follow-up requests can send only the new input plus a
// `previous_response_id`. The trace model still exposes the full
// model-visible input, so rebuild the omitted prefix from the
// previous request and response before reducing this delta.
let previous_items = self
.rollout
.inference_calls
.values()
.find(|inference| {
inference.thread_id == thread_id
&& inference.response_id.as_deref() == Some(previous_response_id)
})
.map(|inference| {
let mut ids = inference.request_item_ids.clone();
ids.extend(inference.response_item_ids.clone());
ids
});
let Some(mut item_ids) = previous_items else {
bail!(
"incremental inference request {inference_call_id} referenced unknown previous_response_id {previous_response_id}"
);
let post_compaction_snapshot =
if matches!(request_input_mode, InferenceRequestInputMode::FullSnapshot) {
self.pending_compaction_replacement_item_ids
.get(thread_id)
.cloned()
} else {
None
};
let delta_item_ids = self.reconcile_conversation_items(
items,
ReconcileItems {
thread_id,
codex_turn_id,
wall_time_unix_ms,
produced_by: Vec::new(),
start_index: item_ids.len(),
mode: ReconcileMode::AppendOnly,
snapshot_override: None,
},
)?;
item_ids.extend(delta_item_ids);
item_ids
} else {
self.reconcile_conversation_items(
let request_item_ids = match request_input_mode {
InferenceRequestInputMode::Incremental {
previous_response_id,
} => {
if payload_previous_response_id != Some(previous_response_id.as_str()) {
bail!(
"incremental inference request {inference_call_id} recorded previous_response_id {previous_response_id}, \
but payload previous_response_id was {payload_previous_response_id:?}"
);
}
// Streaming follow-up requests can send only the new input plus a
// `previous_response_id`. The trace model still exposes the full
// model-visible input, so rebuild the omitted prefix from the
// previous request and response before reducing this delta.
let previous_items = self
.rollout
.inference_calls
.values()
.find(|inference| {
inference.thread_id == thread_id
&& inference.response_id.as_deref()
== Some(previous_response_id.as_str())
})
.map(|inference| {
let mut ids = inference.request_item_ids.clone();
ids.extend(inference.response_item_ids.clone());
ids
});
let Some(mut item_ids) = previous_items else {
bail!(
"incremental inference request {inference_call_id} referenced unknown previous_response_id {previous_response_id}"
);
};
let delta_item_ids = self.reconcile_conversation_items(
items,
ReconcileItems {
thread_id,
codex_turn_id,
wall_time_unix_ms,
produced_by: Vec::new(),
start_index: item_ids.len(),
mode: ReconcileMode::AppendOnly,
snapshot_override: None,
},
)?;
item_ids.extend(delta_item_ids);
item_ids
}
InferenceRequestInputMode::FullSnapshot => self.reconcile_conversation_items(
items,
ReconcileItems {
thread_id,
@@ -115,7 +139,7 @@ impl TraceReducer {
mode: ReconcileMode::FullSnapshot,
snapshot_override: post_compaction_snapshot.as_deref(),
},
)?
)?,
};
self.append_thread_conversation_items(thread_id, &request_item_ids)?;

View File

@@ -13,6 +13,7 @@ use crate::payload::RawPayloadKind;
use crate::raw_event::RawTraceEventPayload;
use crate::reducer::test_support::append_inference_completion;
use crate::reducer::test_support::append_inference_start;
use crate::reducer::test_support::append_inference_start_for_thread_with_mode;
use crate::reducer::test_support::create_started_writer;
use crate::reducer::test_support::expect_replay_error;
use crate::reducer::test_support::message;
@@ -76,7 +77,14 @@ fn response_outputs_enter_thread_conversation_on_completion() -> anyhow::Result<
"input": [message("user", "run tests")]
}),
)?;
append_inference_start(&writer, "inference-1", "turn-1", request)?;
append_inference_start_for_thread_with_mode(
&writer,
"thread-root",
"turn-1",
"inference-1",
Some(crate::InferenceRequestInputMode::FullSnapshot),
request,
)?;
let response = writer.write_json_payload(
RawPayloadKind::InferenceResponse,
@@ -119,7 +127,14 @@ fn later_full_request_reuses_prior_json_tool_call_by_position() -> anyhow::Resul
"input": [message("user", "run tests")]
}),
)?;
append_inference_start(&writer, "inference-1", "turn-1", request)?;
append_inference_start_for_thread_with_mode(
&writer,
"thread-root",
"turn-1",
"inference-1",
Some(crate::InferenceRequestInputMode::FullSnapshot),
request,
)?;
let response = writer.write_json_payload(
RawPayloadKind::InferenceResponse,
@@ -220,7 +235,16 @@ fn incremental_request_carries_prior_request_and_response_items_forward() -> any
]
}),
)?;
append_inference_start(&writer, "inference-2", "turn-2", incremental_request)?;
append_inference_start_for_thread_with_mode(
&writer,
"thread-root",
"turn-2",
"inference-2",
Some(crate::InferenceRequestInputMode::Incremental {
previous_response_id: "resp-1".to_string(),
}),
incremental_request,
)?;
let rollout = replay_bundle(temp.path())?;
let first = &rollout.inference_calls["inference-1"];
@@ -666,7 +690,7 @@ fn missing_request_input_is_reducer_error() -> anyhow::Result<()> {
}
#[test]
fn unknown_previous_response_id_is_reducer_error() -> anyhow::Result<()> {
fn initial_untraced_websocket_prewarm_response_reduces_full_delta_input() -> anyhow::Result<()> {
let temp = TempDir::new()?;
let writer = create_started_writer(&temp)?;
start_turn(&writer, "turn-1")?;
@@ -678,7 +702,63 @@ fn unknown_previous_response_id_is_reducer_error() -> anyhow::Result<()> {
"input": [message("user", "still here")]
}),
)?;
append_inference_start(&writer, "inference-1", "turn-1", request)?;
append_inference_start_for_thread_with_mode(
&writer,
"thread-root",
"turn-1",
"inference-1",
Some(crate::InferenceRequestInputMode::FullSnapshot),
request,
)?;
let rollout = replay_bundle(temp.path())?;
let inference = &rollout.inference_calls["inference-1"];
assert_eq!(inference.request_item_ids.len(), 1);
assert_eq!(
rollout.conversation_items[&inference.request_item_ids[0]]
.body
.parts,
vec![ConversationPart::Text {
text: "still here".to_string(),
}],
);
Ok(())
}
#[test]
fn later_unknown_previous_response_id_is_reducer_error() -> anyhow::Result<()> {
let temp = TempDir::new()?;
let writer = create_started_writer(&temp)?;
start_turn(&writer, "turn-1")?;
let first_request = writer.write_json_payload(
RawPayloadKind::InferenceRequest,
&json!({
"input": [message("user", "first")]
}),
)?;
append_inference_start(&writer, "inference-1", "turn-1", first_request)?;
start_turn(&writer, "turn-2")?;
let later_request = writer.write_json_payload(
RawPayloadKind::InferenceRequest,
&json!({
"previous_response_id": "resp-missing",
"input": [message("user", "later")]
}),
)?;
append_inference_start_for_thread_with_mode(
&writer,
"thread-root",
"turn-2",
"inference-2",
Some(crate::InferenceRequestInputMode::Incremental {
previous_response_id: "resp-missing".to_string(),
}),
later_request,
)?;
expect_replay_error(&temp, "unknown previous_response_id resp-missing")
}

View File

@@ -12,6 +12,7 @@ use crate::model::ExecutionWindow;
use crate::model::InferenceCall;
use crate::model::InferenceCallId;
use crate::payload::RawPayloadRef;
use crate::raw_event::InferenceRequestInputMode;
use crate::raw_event::RawEventSeq;
use crate::raw_event::RawTraceEventPayload;
@@ -25,6 +26,7 @@ pub(super) struct StartedInferenceCall {
pub(super) codex_turn_id: String,
pub(super) model: String,
pub(super) provider_name: String,
pub(super) request_input_mode: Option<InferenceRequestInputMode>,
pub(super) request_payload: RawPayloadRef,
}
@@ -72,6 +74,7 @@ impl TraceReducer {
&inference_call_id,
&thread_id,
&codex_turn_id,
started.request_input_mode.as_ref(),
&request_payload,
)?;

View File

@@ -211,6 +211,7 @@ impl TraceReducer {
codex_turn_id,
model,
provider_name,
request_input_mode,
request_payload,
} => {
self.start_inference_call(
@@ -222,6 +223,7 @@ impl TraceReducer {
codex_turn_id,
model,
provider_name,
request_input_mode,
request_payload,
},
)?;

View File

@@ -9,6 +9,7 @@ use tempfile::TempDir;
use crate::model::ToolCallSummary;
use crate::payload::RawPayloadKind;
use crate::payload::RawPayloadRef;
use crate::raw_event::InferenceRequestInputMode;
use crate::raw_event::RawTraceEventContext;
use crate::raw_event::RawTraceEventPayload;
use crate::replay_bundle;
@@ -125,6 +126,24 @@ pub(crate) fn append_inference_start_for_thread(
codex_turn_id: &str,
inference_call_id: &str,
request_payload: RawPayloadRef,
) -> anyhow::Result<()> {
append_inference_start_for_thread_with_mode(
writer,
thread_id,
codex_turn_id,
inference_call_id,
/*request_input_mode*/ None,
request_payload,
)
}
pub(crate) fn append_inference_start_for_thread_with_mode(
writer: &TraceWriter,
thread_id: &str,
codex_turn_id: &str,
inference_call_id: &str,
request_input_mode: Option<InferenceRequestInputMode>,
request_payload: RawPayloadRef,
) -> anyhow::Result<()> {
writer.append(RawTraceEventPayload::InferenceStarted {
inference_call_id: inference_call_id.to_string(),
@@ -132,6 +151,7 @@ pub(crate) fn append_inference_start_for_thread(
codex_turn_id: codex_turn_id.to_string(),
model: "gpt-test".to_string(),
provider_name: "test-provider".to_string(),
request_input_mode,
request_payload,
})?;
Ok(())

View File

@@ -533,6 +533,7 @@ fn append_inference_with_tool_call(writer: &TraceWriter) -> anyhow::Result<()> {
codex_turn_id: "turn-1".to_string(),
model: "gpt-test".to_string(),
provider_name: "test-provider".to_string(),
request_input_mode: None,
request_payload: request,
})?;
@@ -575,6 +576,7 @@ fn append_followup_with_tool_output(writer: &TraceWriter) -> anyhow::Result<()>
codex_turn_id: "turn-2".to_string(),
model: "gpt-test".to_string(),
provider_name: "test-provider".to_string(),
request_input_mode: None,
request_payload: request,
})?;
Ok(())

View File

@@ -131,7 +131,10 @@ fn disabled_thread_context_accepts_trace_calls_without_writing() -> anyhow::Resu
let inference_trace =
thread_trace.inference_trace_context("turn-1", "gpt-test", "test-provider");
let inference_attempt = inference_trace.start_attempt();
inference_attempt.record_started(&serde_json::json!({ "kind": "inference" }));
inference_attempt.record_started(
&serde_json::json!({ "kind": "inference" }),
crate::InferenceRequestInputMode::FullSnapshot,
);
let token_usage: Option<codex_protocol::protocol::TokenUsage> = None;
inference_attempt.record_completed("response-1", Some("req-1"), &token_usage, &[]);
inference_attempt.record_failed("inference failed", /*upstream_request_id*/ None, &[]);

View File

@@ -214,6 +214,7 @@ mod tests {
codex_turn_id: "turn-1".to_string(),
model: "gpt-test".to_string(),
provider_name: "test-provider".to_string(),
request_input_mode: None,
request_payload: inference_request.clone(),
})?;
let inference_response = writer.write_json_payload(