Compare commits

...

1 Commits

Author SHA1 Message Date
Ahmed Ibrahim
e3626db649 Use responses request helpers for compact requests
Make /responses/compact reuse the shared responses request construction and serialization path, then adapt only the fields rejected by the compact endpoint. Preserve compact-specific headers and add remote compact parity coverage across auth modes.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 00:23:01 +03:00
14 changed files with 759 additions and 190 deletions

View File

@@ -20,21 +20,6 @@ use tokio::sync::mpsc;
pub const WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY: &str = "ws_request_header_traceparent";
pub const WS_REQUEST_HEADER_TRACESTATE_CLIENT_METADATA_KEY: &str = "ws_request_header_tracestate";
/// Canonical input payload for the compaction endpoint.
#[derive(Debug, Clone, Serialize)]
pub struct CompactionInput<'a> {
pub model: &'a str,
pub input: &'a [ResponseItem],
#[serde(skip_serializing_if = "str::is_empty")]
pub instructions: &'a str,
pub tools: Vec<Value>,
pub parallel_tool_calls: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub reasoning: Option<Reasoning>,
#[serde(skip_serializing_if = "Option::is_none")]
pub text: Option<TextControls>,
}
/// Canonical input payload for the memory summarize endpoint.
#[derive(Debug, Clone, Serialize)]
pub struct MemorySummarizeInput {
@@ -162,6 +147,11 @@ impl From<VerbosityConfig> for OpenAiVerbosity {
}
}
/// Shared request body for `/responses`-style calls.
///
/// Normal sampling sends every populated field. `/responses/compact` starts from this same shape
/// and then clears the fields that endpoint does not accept, so new request-body fields naturally
/// stay visible to both paths unless compact explicitly opts out.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ResponsesApiRequest {
pub model: String,
@@ -169,12 +159,14 @@ pub struct ResponsesApiRequest {
pub instructions: String,
pub input: Vec<ResponseItem>,
pub tools: Vec<serde_json::Value>,
pub tool_choice: String,
pub parallel_tool_calls: bool,
pub reasoning: Option<Reasoning>,
pub store: bool,
pub stream: bool,
pub include: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub store: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stream: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub include: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub service_tier: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
@@ -193,7 +185,6 @@ impl From<&ResponsesApiRequest> for ResponseCreateWsRequest {
previous_response_id: None,
input: request.input.clone(),
tools: request.tools.clone(),
tool_choice: request.tool_choice.clone(),
parallel_tool_calls: request.parallel_tool_calls,
reasoning: request.reasoning.clone(),
store: request.store,
@@ -217,12 +208,14 @@ pub struct ResponseCreateWsRequest {
pub previous_response_id: Option<String>,
pub input: Vec<ResponseItem>,
pub tools: Vec<Value>,
pub tool_choice: String,
pub parallel_tool_calls: bool,
pub reasoning: Option<Reasoning>,
pub store: bool,
pub stream: bool,
pub include: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub store: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stream: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub include: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub service_tier: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]

View File

@@ -1,15 +1,17 @@
use crate::auth::SharedAuthProvider;
use crate::common::CompactionInput;
use crate::common::ResponsesApiRequest;
use crate::endpoint::ResponsesOptions;
use crate::endpoint::session::EndpointSession;
use crate::error::ApiError;
use crate::provider::Provider;
use crate::requests::responses::build_responses_request_body;
use crate::requests::responses::build_responses_request_headers;
use codex_client::HttpTransport;
use codex_client::RequestTelemetry;
use codex_protocol::models::ResponseItem;
use http::HeaderMap;
use http::Method;
use serde::Deserialize;
use serde_json::to_value;
use std::sync::Arc;
pub struct CompactClient<T: HttpTransport> {
@@ -47,14 +49,21 @@ impl<T: HttpTransport> CompactClient<T> {
Ok(parsed.output)
}
pub async fn compact_input(
pub async fn compact_request(
&self,
input: &CompactionInput<'_>,
extra_headers: HeaderMap,
request: ResponsesApiRequest,
options: ResponsesOptions,
) -> Result<Vec<ResponseItem>, ApiError> {
let body = to_value(input)
.map_err(|e| ApiError::Stream(format!("failed to encode compaction input: {e}")))?;
self.compact(body, extra_headers).await
let ResponsesOptions {
conversation_id,
session_source,
extra_headers,
..
} = options;
let body = build_responses_request_body(&request, self.session.provider())?;
let headers =
build_responses_request_headers(extra_headers, conversation_id, session_source);
self.compact(body, headers).await
}
}

View File

@@ -5,10 +5,8 @@ use crate::endpoint::session::EndpointSession;
use crate::error::ApiError;
use crate::provider::Provider;
use crate::requests::Compression;
use crate::requests::attach_item_ids;
use crate::requests::headers::build_conversation_headers;
use crate::requests::headers::insert_header;
use crate::requests::headers::subagent_header;
use crate::requests::responses::build_responses_request_body;
use crate::requests::responses::build_responses_request_headers;
use crate::sse::spawn_response_stream;
use crate::telemetry::SseTelemetry;
use codex_client::HttpTransport;
@@ -28,7 +26,7 @@ pub struct ResponsesClient<T: HttpTransport> {
sse_telemetry: Option<Arc<dyn SseTelemetry>>,
}
#[derive(Default)]
#[derive(Clone, Default)]
pub struct ResponsesOptions {
pub conversation_id: Option<String>,
pub session_source: Option<SessionSource>,
@@ -79,20 +77,9 @@ impl<T: HttpTransport> ResponsesClient<T> {
turn_state,
} = options;
let mut body = serde_json::to_value(&request)
.map_err(|e| ApiError::Stream(format!("failed to encode responses request: {e}")))?;
if request.store && self.session.provider().is_azure_responses_endpoint() {
attach_item_ids(&mut body, &request.input);
}
let mut headers = extra_headers;
if let Some(ref conv_id) = conversation_id {
insert_header(&mut headers, "x-client-request-id", conv_id);
}
headers.extend(build_conversation_headers(conversation_id));
if let Some(subagent) = subagent_header(&session_source) {
insert_header(&mut headers, "x-openai-subagent", &subagent);
}
let body = build_responses_request_body(&request, self.session.provider())?;
let headers =
build_responses_request_headers(extra_headers, conversation_id, session_source);
self.stream(body, headers, compression, turn_state).await
}

View File

@@ -21,7 +21,6 @@ pub use crate::auth::AuthHeaderTelemetry;
pub use crate::auth::AuthProvider;
pub use crate::auth::SharedAuthProvider;
pub use crate::auth::auth_header_telemetry;
pub use crate::common::CompactionInput;
pub use crate::common::MemorySummarizeInput;
pub use crate::common::MemorySummarizeOutput;
pub use crate::common::OpenAiVerbosity;

View File

@@ -2,4 +2,3 @@ pub(crate) mod headers;
pub(crate) mod responses;
pub use responses::Compression;
pub(crate) use responses::attach_item_ids;

View File

@@ -1,4 +1,12 @@
use crate::common::ResponsesApiRequest;
use crate::error::ApiError;
use crate::provider::Provider;
use crate::requests::headers::build_conversation_headers;
use crate::requests::headers::insert_header;
use crate::requests::headers::subagent_header;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::SessionSource;
use http::HeaderMap;
use serde_json::Value;
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
@@ -8,6 +16,33 @@ pub enum Compression {
Zstd,
}
pub(crate) fn build_responses_request_body(
request: &ResponsesApiRequest,
provider: &Provider,
) -> Result<Value, ApiError> {
let mut body = serde_json::to_value(request)
.map_err(|e| ApiError::Stream(format!("failed to encode responses request: {e}")))?;
if request.store == Some(true) && provider.is_azure_responses_endpoint() {
attach_item_ids(&mut body, &request.input);
}
Ok(body)
}
pub(crate) fn build_responses_request_headers(
mut extra_headers: HeaderMap,
conversation_id: Option<String>,
session_source: Option<SessionSource>,
) -> HeaderMap {
if let Some(ref conv_id) = conversation_id {
insert_header(&mut extra_headers, "x-client-request-id", conv_id);
}
extra_headers.extend(build_conversation_headers(conversation_id));
if let Some(subagent) = subagent_header(&session_source) {
insert_header(&mut extra_headers, "x-openai-subagent", &subagent);
}
extra_headers
}
pub(crate) fn attach_item_ids(payload_json: &mut Value, original_items: &[ResponseItem]) {
let Some(input_value) = payload_json.get_mut("input") else {
return;

View File

@@ -43,7 +43,7 @@ struct RecordingState {
}
impl RecordingState {
fn record(&self, req: Request) {
fn record_stream(&self, req: Request) {
let mut guard = self
.stream_requests
.lock()
@@ -78,7 +78,7 @@ impl HttpTransport for RecordingTransport {
}
async fn stream(&self, req: Request) -> Result<StreamResponse, TransportError> {
self.state.record(req);
self.state.record_stream(req);
let stream = futures::stream::iter(Vec::<Result<Bytes, TransportError>>::new());
Ok(StreamResponse {
@@ -325,12 +325,11 @@ async fn streaming_client_retries_on_transport_error() -> Result<()> {
instructions: "Say hi".into(),
input: Vec::new(),
tools: Vec::new(),
tool_choice: "auto".into(),
parallel_tool_calls: false,
reasoning: None,
store: false,
stream: true,
include: Vec::new(),
store: Some(false),
stream: Some(true),
include: Some(Vec::new()),
service_tier: None,
prompt_cache_key: None,
text: None,
@@ -426,12 +425,11 @@ async fn azure_default_store_attaches_ids_and_headers() -> Result<()> {
phase: None,
}],
tools: Vec::new(),
tool_choice: "auto".into(),
parallel_tool_calls: false,
reasoning: None,
store: true,
stream: true,
include: Vec::new(),
store: Some(true),
stream: Some(true),
include: Some(Vec::new()),
service_tier: None,
prompt_cache_key: None,
text: None,

View File

@@ -34,7 +34,6 @@ use std::sync::atomic::Ordering;
use codex_api::ApiError;
use codex_api::AuthProvider;
use codex_api::CompactClient as ApiCompactClient;
use codex_api::CompactionInput as ApiCompactionInput;
use codex_api::Compression;
use codex_api::MemoriesClient as ApiMemoriesClient;
use codex_api::MemorySummarizeInput as ApiMemorySummarizeInput;
@@ -203,6 +202,21 @@ pub struct ModelClient {
state: Arc<ModelClientState>,
}
/// Turn-scoped inputs for remote conversation-history compaction.
///
/// Remote compaction intentionally receives the same per-turn request controls as normal
/// `/responses` sampling so the two paths stay aligned when request shape evolves.
pub(crate) struct CompactConversationRequest<'a> {
pub(crate) prompt: &'a Prompt,
pub(crate) model_info: &'a ModelInfo,
pub(crate) session_telemetry: &'a SessionTelemetry,
pub(crate) effort: Option<ReasoningEffortConfig>,
pub(crate) summary: ReasoningSummaryConfig,
pub(crate) service_tier: Option<ServiceTier>,
pub(crate) turn_metadata_header: Option<&'a str>,
pub(crate) compaction_trace: &'a CompactionTraceContext,
}
/// A turn-scoped streaming session created from a [`ModelClient`].
///
/// The session establishes a Responses WebSocket connection lazily and reuses it across multiple
@@ -289,6 +303,32 @@ fn sideband_websocket_auth_headers(api_auth: &dyn AuthProvider) -> ApiHeaderMap
headers
}
/// Adapts the shared `/responses` request for the `/responses/compact` endpoint.
///
/// Remote compaction starts from the same request body and transport options as a normal sampling
/// turn, then clears only the body fields the compact endpoint rejects. Because compact cannot
/// carry `client_metadata` in the body, installation attribution is preserved as a header to match
/// the previous compact path.
fn prepare_responses_compact_request(
request: &mut ResponsesApiRequest,
options: &mut ApiResponsesOptions,
auth: Option<&CodexAuth>,
installation_id: &str,
) {
request.store = None;
request.stream = None;
request.include = None;
request.client_metadata = None;
if auth.is_some_and(CodexAuth::is_api_key_auth) {
request.service_tier = None;
}
if let Ok(header_value) = HeaderValue::from_str(installation_id) {
options
.extra_headers
.insert(X_CODEX_INSTALLATION_ID_HEADER, header_value);
}
}
impl ModelClient {
#[allow(clippy::too_many_arguments)]
/// Creates a new session-scoped `ModelClient`.
@@ -408,17 +448,22 @@ impl ModelClient {
/// This is a unary call (no streaming) that returns a new list of
/// `ResponseItem`s representing the compacted transcript.
///
/// The model selection and telemetry context are passed explicitly to keep `ModelClient`
/// session-scoped.
pub async fn compact_conversation_history(
/// Per-turn request settings are passed explicitly, matching the normal `/responses` streaming
/// callsite while keeping `ModelClient` session-scoped.
pub(crate) async fn compact_conversation_history(
&self,
prompt: &Prompt,
model_info: &ModelInfo,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
session_telemetry: &SessionTelemetry,
compaction_trace: &CompactionTraceContext,
request: CompactConversationRequest<'_>,
) -> Result<Vec<ResponseItem>> {
let CompactConversationRequest {
prompt,
model_info,
session_telemetry,
effort,
summary,
service_tier,
turn_metadata_header,
compaction_trace,
} = request;
if prompt.input.is_empty() {
return Ok(Vec::new());
}
@@ -434,53 +479,31 @@ impl ModelClient {
RequestRouteTelemetry::for_endpoint(RESPONSES_COMPACT_ENDPOINT),
self.state.auth_env_telemetry.clone(),
);
let request = self.build_responses_request(
let mut request = self.build_responses_request(
&client_setup.api_provider,
prompt,
model_info,
effort,
summary,
/*service_tier*/ None,
service_tier,
)?;
let ResponsesApiRequest {
model,
instructions,
input,
tools,
parallel_tool_calls,
reasoning,
text,
..
} = request;
let mut options = self.build_responses_options(
/*turn_state*/ None,
turn_metadata_header,
Compression::None,
);
prepare_responses_compact_request(
&mut request,
&mut options,
client_setup.auth.as_ref(),
&self.state.installation_id,
);
let trace_attempt = compaction_trace.start_attempt(&request);
let client =
ApiCompactClient::new(transport, client_setup.api_provider, client_setup.api_auth)
.with_telemetry(Some(request_telemetry));
let payload = ApiCompactionInput {
model: &model,
input: &input,
instructions: &instructions,
tools,
parallel_tool_calls,
reasoning,
text,
};
let mut extra_headers = ApiHeaderMap::new();
if let Ok(header_value) = HeaderValue::from_str(&self.state.installation_id) {
extra_headers.insert(X_CODEX_INSTALLATION_ID_HEADER, header_value);
}
extra_headers.extend(build_responses_headers(
self.state.beta_features_header.as_deref(),
/*turn_state*/ None,
/*turn_metadata_header*/ None,
));
extra_headers.extend(self.build_responses_identity_headers());
extra_headers.extend(build_conversation_headers(Some(
self.state.conversation_id.to_string(),
)));
let trace_attempt = compaction_trace.start_attempt(&payload);
let result = client
.compact_input(&payload, extra_headers)
.compact_request(request, options)
.await
.map_err(map_api_error);
trace_attempt.record_result(result.as_deref());
@@ -643,28 +666,9 @@ impl ModelClient {
request_telemetry
}
fn build_reasoning(
model_info: &ModelInfo,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
) -> Option<Reasoning> {
if model_info.supports_reasoning_summaries {
Some(Reasoning {
effort: effort.or(model_info.default_reasoning_level),
summary: if summary == ReasoningSummaryConfig::None {
None
} else {
Some(summary)
},
})
} else {
None
}
}
fn build_responses_request(
&self,
provider: &codex_api::Provider,
provider: &ApiProvider,
prompt: &Prompt,
model_info: &ModelInfo,
effort: Option<ReasoningEffortConfig>,
@@ -674,7 +678,19 @@ impl ModelClient {
let instructions = &prompt.base_instructions.text;
let input = prompt.get_formatted_input();
let tools = create_tools_json_for_responses_api(&prompt.tools)?;
let reasoning = Self::build_reasoning(model_info, effort, summary);
let default_reasoning_effort = model_info.default_reasoning_level;
let reasoning = if model_info.supports_reasoning_summaries {
Some(Reasoning {
effort: effort.or(default_reasoning_effort),
summary: if summary == ReasoningSummaryConfig::None {
None
} else {
Some(summary)
},
})
} else {
None
};
let include = if reasoning.is_some() {
vec!["reasoning.encrypted_content".to_string()]
} else {
@@ -702,12 +718,11 @@ impl ModelClient {
instructions: instructions.clone(),
input,
tools,
tool_choice: "auto".to_string(),
parallel_tool_calls: prompt.parallel_tool_calls,
reasoning,
store: provider.is_azure_responses_endpoint(),
stream: true,
include,
store: Some(provider.is_azure_responses_endpoint()),
stream: Some(true),
include: Some(include),
service_tier: match service_tier {
Some(ServiceTier::Fast) => Some("priority".to_string()),
Some(service_tier) => Some(service_tier.to_string()),
@@ -723,6 +738,35 @@ impl ModelClient {
Ok(request)
}
/// Builds shared Responses API transport options and request-body options.
///
/// Keeping option construction in one place ensures request-scoped headers are consistent
/// regardless of transport choice.
fn build_responses_options(
&self,
turn_state: Option<Arc<OnceLock<String>>>,
turn_metadata_header: Option<&str>,
compression: Compression,
) -> ApiResponsesOptions {
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header);
let conversation_id = self.state.conversation_id.to_string();
ApiResponsesOptions {
conversation_id: Some(conversation_id),
session_source: Some(self.state.session_source.clone()),
extra_headers: {
let mut headers = build_responses_headers(
self.state.beta_features_header.as_deref(),
turn_state.as_ref(),
turn_metadata_header.as_ref(),
);
headers.extend(self.build_responses_identity_headers());
headers
},
compression,
turn_state,
}
}
/// Returns whether the Responses-over-WebSocket transport is active for this session.
///
/// WebSocket use is controlled by provider capability and session-scoped fallback state.
@@ -892,35 +936,6 @@ impl ModelClientSession {
.set_connection_reused(/*connection_reused*/ false);
}
#[allow(clippy::too_many_arguments)]
/// Builds shared Responses API transport options and request-body options.
///
/// Keeping option construction in one place ensures request-scoped headers are consistent
/// regardless of transport choice.
fn build_responses_options(
&self,
turn_metadata_header: Option<&str>,
compression: Compression,
) -> ApiResponsesOptions {
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header);
let conversation_id = self.client.state.conversation_id.to_string();
ApiResponsesOptions {
conversation_id: Some(conversation_id),
session_source: Some(self.client.state.session_source.clone()),
extra_headers: {
let mut headers = build_responses_headers(
self.client.state.beta_features_header.as_deref(),
Some(&self.turn_state),
turn_metadata_header.as_ref(),
);
headers.extend(self.client.build_responses_identity_headers());
headers
},
compression,
turn_state: Some(Arc::clone(&self.turn_state)),
}
}
fn get_incremental_items(
&self,
request: &ResponsesApiRequest,
@@ -1190,7 +1205,11 @@ impl ModelClientSession {
self.client.state.auth_env_telemetry.clone(),
);
let compression = self.responses_request_compression(client_setup.auth.as_ref());
let options = self.build_responses_options(turn_metadata_header, compression);
let options = self.client.build_responses_options(
Some(Arc::clone(&self.turn_state)),
turn_metadata_header,
compression,
);
let request = self.client.build_responses_request(
&client_setup.api_provider,
@@ -1297,7 +1316,11 @@ impl ModelClientSession {
);
let compression = self.responses_request_compression(client_setup.auth.as_ref());
let options = self.build_responses_options(turn_metadata_header, compression);
let options = self.client.build_responses_options(
Some(Arc::clone(&self.turn_state)),
turn_metadata_header,
compression,
);
let request = self.client.build_responses_request(
&client_setup.api_provider,
prompt,

View File

@@ -17,12 +17,11 @@ fn serializes_text_verbosity_when_set() {
instructions: "i".to_string(),
input,
tools,
tool_choice: "auto".to_string(),
parallel_tool_calls: true,
reasoning: None,
store: false,
stream: true,
include: vec![],
store: Some(false),
stream: Some(true),
include: Some(vec![]),
prompt_cache_key: None,
service_tier: None,
text: Some(TextControls {
@@ -64,12 +63,11 @@ fn serializes_text_schema_with_strict_format() {
instructions: "i".to_string(),
input,
tools,
tool_choice: "auto".to_string(),
parallel_tool_calls: true,
reasoning: None,
store: false,
stream: true,
include: vec![],
store: Some(false),
stream: Some(true),
include: Some(vec![]),
prompt_cache_key: None,
service_tier: None,
text: Some(text_controls),
@@ -125,12 +123,11 @@ fn omits_text_when_not_set() {
instructions: "i".to_string(),
input,
tools,
tool_choice: "auto".to_string(),
parallel_tool_calls: true,
reasoning: None,
store: false,
stream: true,
include: vec![],
store: Some(false),
stream: Some(true),
include: Some(vec![]),
prompt_cache_key: None,
service_tier: None,
text: None,
@@ -148,12 +145,11 @@ fn serializes_flex_service_tier_when_set() {
instructions: "i".to_string(),
input: vec![],
tools: vec![],
tool_choice: "auto".to_string(),
parallel_tool_calls: true,
reasoning: None,
store: false,
stream: true,
include: vec![],
store: Some(false),
stream: Some(true),
include: Some(vec![]),
prompt_cache_key: None,
service_tier: Some(ServiceTier::Flex.to_string()),
text: None,

View File

@@ -2,6 +2,7 @@ use std::collections::HashSet;
use std::sync::Arc;
use crate::Prompt;
use crate::client::CompactConversationRequest;
use crate::compact::CompactionAnalyticsAttempt;
use crate::compact::InitialContextInjection;
use crate::compact::compaction_status_from_result;
@@ -164,17 +165,20 @@ async fn run_remote_compact_task_inner_impl(
output_schema: None,
output_schema_strict: true,
};
let turn_metadata_header = turn_context.turn_metadata_state.current_header_value();
let mut new_history = sess
.services
.model_client
.compact_conversation_history(
&prompt,
&turn_context.model_info,
turn_context.reasoning_effort,
turn_context.reasoning_summary,
&turn_context.session_telemetry,
&compaction_trace,
)
.compact_conversation_history(CompactConversationRequest {
prompt: &prompt,
model_info: &turn_context.model_info,
session_telemetry: &turn_context.session_telemetry,
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
service_tier: turn_context.config.service_tier,
turn_metadata_header: turn_metadata_header.as_deref(),
compaction_trace: &compaction_trace,
})
.or_else(|err| async {
let total_usage_breakdown = sess.get_total_token_usage_breakdown().await;
let compact_request_log_data =

View File

@@ -242,6 +242,158 @@ pub fn format_labeled_items_snapshot(
format!("Scenario: {scenario}\n\n{sections}")
}
/// Render changed JSON lines between two captured `/responses` request bodies.
///
/// Request-parity tests use this to compare the entire JSON payload while showing only fields that
/// changed, with the same redactions as the other context snapshots.
pub fn format_request_body_diff_snapshot(
scenario: &str,
before_title: &str,
before_request: &ResponsesRequest,
after_title: &str,
after_request: &ResponsesRequest,
options: &ContextSnapshotOptions,
) -> String {
let before = format_request_body_snapshot(before_request, options);
let after = format_request_body_snapshot(after_request, options);
let diff = format_changed_lines_diff(before_title, &before, after_title, &after);
format!("Scenario: {scenario}\n\n{diff}")
}
fn format_request_body_snapshot(
request: &ResponsesRequest,
options: &ContextSnapshotOptions,
) -> String {
let mut body = request.body_json();
canonicalize_json_snapshot_value(&mut body, options);
serde_json::to_string_pretty(&body).expect("request body should serialize")
}
fn canonicalize_json_snapshot_value(value: &mut Value, options: &ContextSnapshotOptions) {
match value {
Value::Array(values) => {
for value in values {
canonicalize_json_snapshot_value(value, options);
}
}
Value::Object(map) => {
// Keep request-body snapshots stable when serde_json preserves insertion order.
let mut entries = std::mem::take(map).into_iter().collect::<Vec<_>>();
entries.sort_by(|(left_key, _), (right_key, _)| left_key.cmp(right_key));
for (key, mut value) in entries {
canonicalize_json_snapshot_value(&mut value, options);
map.insert(key, value);
}
}
Value::String(text) => {
*text = format_snapshot_json_string(text, options);
}
Value::Null | Value::Bool(_) | Value::Number(_) => {}
}
}
fn format_snapshot_json_string(text: &str, options: &ContextSnapshotOptions) -> String {
let normalized = match options.render_mode {
ContextSnapshotRenderMode::RedactedText
| ContextSnapshotRenderMode::KindWithTextPrefix { .. } => normalize_snapshot_uuids(
&normalize_snapshot_line_endings(&canonicalize_snapshot_text(text)),
),
ContextSnapshotRenderMode::FullText => {
normalize_snapshot_uuids(&normalize_snapshot_line_endings(text))
}
ContextSnapshotRenderMode::KindOnly => unreachable!(),
};
match options.render_mode {
ContextSnapshotRenderMode::KindWithTextPrefix { max_chars }
if normalized.chars().count() > max_chars =>
{
let prefix = normalized.chars().take(max_chars).collect::<String>();
format!("{prefix}...")
}
ContextSnapshotRenderMode::RedactedText
| ContextSnapshotRenderMode::FullText
| ContextSnapshotRenderMode::KindWithTextPrefix { .. } => normalized,
ContextSnapshotRenderMode::KindOnly => unreachable!(),
}
}
fn format_changed_lines_diff(
before_title: &str,
before: &str,
after_title: &str,
after: &str,
) -> String {
let before_lines = before.lines().collect::<Vec<&str>>();
let after_lines = after.lines().collect::<Vec<&str>>();
let mut diff = format!("--- {before_title}\n+++ {after_title}\n");
for line in diff_lines(before_lines.as_slice(), after_lines.as_slice()) {
match line {
DiffLine::Equal => {}
DiffLine::Remove(text) => {
diff.push('-');
diff.push_str(text);
diff.push('\n');
}
DiffLine::Add(text) => {
diff.push('+');
diff.push_str(text);
diff.push('\n');
}
}
}
diff
}
enum DiffLine<'a> {
Equal,
Remove(&'a str),
Add(&'a str),
}
fn diff_lines<'a>(before: &[&'a str], after: &[&'a str]) -> Vec<DiffLine<'a>> {
let after_len = after.len();
let mut lengths = vec![0usize; (before.len() + 1) * (after_len + 1)];
for before_index in (0..before.len()).rev() {
for after_index in (0..after.len()).rev() {
let offset = before_index * (after_len + 1) + after_index;
lengths[offset] = if before[before_index] == after[after_index] {
lengths[(before_index + 1) * (after_len + 1) + after_index + 1] + 1
} else {
lengths[(before_index + 1) * (after_len + 1) + after_index]
.max(lengths[before_index * (after_len + 1) + after_index + 1])
};
}
}
let mut lines = Vec::new();
let mut before_index = 0usize;
let mut after_index = 0usize;
while before_index < before.len() && after_index < after.len() {
if before[before_index] == after[after_index] {
lines.push(DiffLine::Equal);
before_index += 1;
after_index += 1;
} else if lengths[(before_index + 1) * (after_len + 1) + after_index]
>= lengths[before_index * (after_len + 1) + after_index + 1]
{
lines.push(DiffLine::Remove(before[before_index]));
before_index += 1;
} else {
lines.push(DiffLine::Add(after[after_index]));
after_index += 1;
}
}
while before_index < before.len() {
lines.push(DiffLine::Remove(before[before_index]));
before_index += 1;
}
while after_index < after.len() {
lines.push(DiffLine::Add(after[after_index]));
after_index += 1;
}
lines
}
fn format_snapshot_text(text: &str, options: &ContextSnapshotOptions) -> String {
match options.render_mode {
ContextSnapshotRenderMode::RedactedText => {
@@ -342,6 +494,17 @@ fn normalize_dynamic_snapshot_paths(text: &str) -> String {
.into_owned()
}
fn normalize_snapshot_uuids(text: &str) -> String {
static UUID_RE: OnceLock<Regex> = OnceLock::new();
let uuid_re = UUID_RE.get_or_init(|| {
Regex::new(
r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b",
)
.expect("uuid regex should compile")
});
uuid_re.replace_all(text, "<UUID>").into_owned()
}
#[cfg(test)]
mod tests {
use super::ContextSnapshotOptions;

View File

@@ -4,9 +4,12 @@ use std::fs;
use std::path::PathBuf;
use anyhow::Result;
use codex_core::X_CODEX_INSTALLATION_ID_HEADER;
use codex_core::X_CODEX_TURN_METADATA_HEADER;
use codex_core::compact::SUMMARY_PREFIX;
use codex_features::Feature;
use codex_login::CodexAuth;
use codex_protocol::config_types::ServiceTier;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
@@ -412,6 +415,281 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> {
Ok(())
}
async fn assert_remote_manual_compact_matches_last_sampling_request_after_varied_history(
auth: CodexAuth,
snapshot_name: &str,
scenario: &str,
normal_only_fields: &[&str],
) -> Result<()> {
// Phase 1: script five completed user turns with deliberately different output shapes. The
// unsupported tool call and local shell call each add a continuation request, so the mock
// captures seven normal `/responses` requests for five logical turns.
let harness =
TestCodexHarness::with_builder(test_codex().with_auth(auth).with_config(|config| {
config.service_tier = Some(ServiceTier::Fast);
}))
.await?;
let codex = harness.test().codex.clone();
let image_url =
"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR4nGNgYAAAAAMAASsJTYQAAAAASUVORK5CYII="
.to_string();
let responses_mock = responses::mount_sse_sequence(
harness.server(),
vec![
responses::sse(vec![
responses::ev_assistant_message("turn-one-assistant", "TURN_ONE_ASSISTANT"),
responses::ev_completed("turn-one-response"),
]),
responses::sse(vec![
responses::ev_reasoning_item(
"turn-two-reasoning",
&["TURN_TWO_REASONING"],
&["turn two raw content"],
),
responses::ev_assistant_message("turn-two-assistant", "TURN_TWO_ASSISTANT"),
responses::ev_completed("turn-two-response"),
]),
responses::sse(vec![
responses::ev_function_call("turn-three-call", DUMMY_FUNCTION_NAME, "{}"),
responses::ev_completed("turn-three-call-response"),
]),
responses::sse(vec![
responses::ev_assistant_message("turn-three-assistant", "TURN_THREE_ASSISTANT"),
responses::ev_completed("turn-three-final-response"),
]),
responses::sse(vec![
responses::ev_local_shell_call(
"turn-four-local-shell",
"completed",
vec!["/bin/echo", "TURN_FOUR_LOCAL_SHELL"],
),
responses::ev_completed("turn-four-local-shell-response"),
]),
responses::sse(vec![
responses::ev_assistant_message("turn-four-assistant", "TURN_FOUR_ASSISTANT"),
responses::ev_completed("turn-four-final-response"),
]),
responses::sse(vec![
responses::ev_reasoning_item(
"turn-five-reasoning",
&["TURN_FIVE_REASONING"],
&["turn five raw content"],
),
responses::ev_assistant_message("turn-five-assistant", "TURN_FIVE_ASSISTANT"),
responses::ev_completed("turn-five-response"),
]),
],
)
.await;
let compact_mock = responses::mount_compact_user_history_with_summary_once(
harness.server(),
"REMOTE_DIFF_SUMMARY",
)
.await;
// Phase 2: drive the varied user inputs through the real session path. The final turn is a
// normal sampling request first; only after it completes do we manually compact.
codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "TURN_ONE_USER".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_turn_complete(&codex).await;
codex
.submit(Op::UserInput {
environments: None,
items: vec![
UserInput::Text {
text: "TURN_TWO_PREFIX".to_string(),
text_elements: Vec::new(),
},
UserInput::Text {
text: "TURN_TWO_SUFFIX".to_string(),
text_elements: Vec::new(),
},
],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_turn_complete(&codex).await;
codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "TURN_THREE_TOOL_USER".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_turn_complete(&codex).await;
codex
.submit(Op::UserInput {
environments: None,
items: vec![
UserInput::Image { image_url },
UserInput::Text {
text: "TURN_FOUR_IMAGE_USER".to_string(),
text_elements: Vec::new(),
},
],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_turn_complete(&codex).await;
codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "TURN_FIVE_USER".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_turn_complete(&codex).await;
// Phase 3: trigger remote manual compaction after the fifth turn is fully recorded. This makes
// the last normal request the "before compact" baseline and the compact request the "after"
// request that should append only completed final-turn outputs.
codex.submit(Op::Compact).await?;
wait_for_turn_complete(&codex).await;
let response_requests = responses_mock.requests();
assert_eq!(
response_requests.len(),
7,
"expected five turns with one unsupported tool continuation and one local shell continuation"
);
assert_eq!(
compact_mock.requests().len(),
1,
"expected exactly one remote compact request"
);
let last_turn_request = response_requests
.last()
.cloned()
.expect("last turn request missing");
let compact_request = compact_mock.single_request();
// Phase 4: first compare the full non-input request body. Normal `/responses` accepts a few
// transport/caching metadata fields that `/responses/compact` rejects, so strip those from the
// normal request only; if compact sends any of them, the whole-object equality still fails.
let mut last_turn_body_without_input = last_turn_request.body_json();
last_turn_body_without_input
.as_object_mut()
.expect("responses request body should be an object")
.remove("input");
let mut compact_body_without_input = compact_request.body_json();
compact_body_without_input
.as_object_mut()
.expect("compact request body should be an object")
.remove("input");
for field in normal_only_fields {
last_turn_body_without_input
.as_object_mut()
.expect("responses request body should be an object")
.remove(*field);
}
assert_eq!(compact_body_without_input, last_turn_body_without_input);
let last_normal_installation_id = last_turn_request
.body_json()
.get("client_metadata")
.and_then(|client_metadata| client_metadata.get(X_CODEX_INSTALLATION_ID_HEADER))
.and_then(Value::as_str)
.map(str::to_string)
.expect("normal responses request should include installation client metadata");
assert_eq!(
json!({
"last_normal_has_turn_metadata_header": last_turn_request
.header(X_CODEX_TURN_METADATA_HEADER)
.is_some(),
"compact_has_turn_metadata_header": compact_request
.header(X_CODEX_TURN_METADATA_HEADER)
.is_some(),
"last_normal_installation_id": last_normal_installation_id,
"compact_installation_id": compact_request
.header(X_CODEX_INSTALLATION_ID_HEADER),
}),
json!({
"last_normal_has_turn_metadata_header": true,
"compact_has_turn_metadata_header": true,
"last_normal_installation_id": last_normal_installation_id,
"compact_installation_id": last_normal_installation_id,
}),
);
// Phase 5: then snapshot the whole JSON request-body delta. The expected model-visible input
// change is append-only: final-turn reasoning and assistant output appear in compact because
// the normal request was captured before those outputs existed.
insta::assert_snapshot!(
snapshot_name,
context_snapshot::format_request_body_diff_snapshot(
scenario,
"Last Normal /responses Request",
&last_turn_request,
"Remote /responses/compact Request",
&compact_request,
&ContextSnapshotOptions::default(),
)
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_manual_compact_varied_history_api_auth() -> Result<()> {
skip_if_no_network!(Ok(()));
assert_remote_manual_compact_matches_last_sampling_request_after_varied_history(
CodexAuth::from_api_key("dummy"),
"remote_manual_compact_varied_history_api_auth_request_diff",
"After five varied turns with API-key auth, remote manual compaction reuses the last sampling request input, strips compact-rejected fields including service_tier, and appends the completed final-turn outputs.",
&[
"store",
"stream",
"include",
"client_metadata",
"service_tier",
],
)
.await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_manual_compact_varied_history_chatgpt_auth() -> Result<()> {
skip_if_no_network!(Ok(()));
assert_remote_manual_compact_matches_last_sampling_request_after_varied_history(
CodexAuth::create_dummy_chatgpt_auth_for_testing(),
"remote_manual_compact_varied_history_chatgpt_auth_request_diff",
"After five varied turns with ChatGPT auth, remote manual compaction reuses the last sampling request input and only appends the completed final-turn outputs.",
&["store", "stream", "include", "client_metadata"],
)
.await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_v2_reuses_context_compaction_for_followups() -> Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -0,0 +1,43 @@
---
source: core/tests/suite/compact_remote.rs
expression: "context_snapshot::format_request_body_diff_snapshot(scenario,\n\"Last Normal /responses Request\", &last_turn_request,\n\"Remote /responses/compact Request\", &compact_request,\n&ContextSnapshotOptions::default(),)"
---
Scenario: After five varied turns with API-key auth, remote manual compaction reuses the last sampling request input, strips compact-rejected fields including service_tier, and appends the completed final-turn outputs.
--- Last Normal /responses Request
+++ Remote /responses/compact Request
- "client_metadata": {
- "x-codex-installation-id": "<UUID>"
- },
- "include": [
- "reasoning.encrypted_content"
- ],
+ },
+ {
+ "content": [
+ {
+ "text": "turn five raw content",
+ "type": "reasoning_text"
+ }
+ ],
+ "encrypted_content": "YmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYnR1cm4gZml2ZSByYXcgY29udGVudA==",
+ "summary": [
+ {
+ "text": "TURN_FIVE_REASONING",
+ "type": "summary_text"
+ }
+ ],
+ "type": "reasoning"
+ },
+ {
+ "content": [
+ {
+ "text": "TURN_FIVE_ASSISTANT",
+ "type": "output_text"
+ }
+ ],
+ "role": "assistant",
+ "type": "message"
- "service_tier": "priority",
- "store": false,
- "stream": true,

View File

@@ -0,0 +1,42 @@
---
source: core/tests/suite/compact_remote.rs
expression: "context_snapshot::format_request_body_diff_snapshot(scenario,\n\"Last Normal /responses Request\", &last_turn_request,\n\"Remote /responses/compact Request\", &compact_request,\n&ContextSnapshotOptions::default(),)"
---
Scenario: After five varied turns with ChatGPT auth, remote manual compaction reuses the last sampling request input and only appends the completed final-turn outputs.
--- Last Normal /responses Request
+++ Remote /responses/compact Request
- "client_metadata": {
- "x-codex-installation-id": "<UUID>"
- },
- "include": [
- "reasoning.encrypted_content"
- ],
+ },
+ {
+ "content": [
+ {
+ "text": "turn five raw content",
+ "type": "reasoning_text"
+ }
+ ],
+ "encrypted_content": "YmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYmJiYnR1cm4gZml2ZSByYXcgY29udGVudA==",
+ "summary": [
+ {
+ "text": "TURN_FIVE_REASONING",
+ "type": "summary_text"
+ }
+ ],
+ "type": "reasoning"
+ },
+ {
+ "content": [
+ {
+ "text": "TURN_FIVE_ASSISTANT",
+ "type": "output_text"
+ }
+ ],
+ "role": "assistant",
+ "type": "message"
- "store": false,
- "stream": true,