From 0322ac3df89c945760e68db1d910119a584355be Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 15 May 2026 11:40:35 +0200 Subject: [PATCH] [codex] Use compaction_trigger item for remote compaction v2 (#22809) ## Why Remote compaction v2 was still using `context_compaction` as both the request trigger and the compacted output shape. The Responses API now has the landed contract for this flow: Codex sends a dedicated `{ "type": "compaction_trigger" }` input item, and the backend returns the standard `compaction` output item with encrypted content. This aligns the v2 path with that wire contract while preserving the existing local compacted-history post-processing behavior. ## What changed - Add `ResponseItem::CompactionTrigger` and regenerate the app-server protocol schema fixtures. - Send `compaction_trigger` from `remote_compaction_v2` instead of a payload-less `context_compaction`. - Collect exactly one backend `compaction` output item, then reuse the existing compacted-history rebuilding path. - Treat the trigger item as a transient request marker rather than model output or persisted rollout/memory content. ## Verification - `cargo test -p codex-protocol compaction_trigger` - `cargo test -p codex-core remote_compact_v2` - `cargo test -p codex-core compact_remote_v2` - `cargo test -p codex-core responses_websocket_sends_response_processed_after_remote_compaction_v2` - `just write-app-server-schema` - `cargo test -p codex-app-server-protocol schema_fixtures` --- .../schema/json/ClientRequest.json | 16 +++++ .../codex_app_server_protocol.schemas.json | 16 +++++ .../codex_app_server_protocol.v2.schemas.json | 16 +++++ .../RawResponseItemCompletedNotification.json | 16 +++++ .../schema/json/v2/ThreadResumeParams.json | 16 +++++ .../schema/typescript/ResponseItem.ts | 2 +- codex-rs/core/src/agent/control.rs | 1 + codex-rs/core/src/arc_monitor.rs | 1 + codex-rs/core/src/compact_remote.rs | 1 + codex-rs/core/src/compact_remote_v2.rs | 61 +++++++------------ codex-rs/core/src/context_manager/history.rs | 3 + codex-rs/core/src/session/turn.rs | 1 + codex-rs/core/src/turn_timing.rs | 1 + .../core/tests/suite/client_websockets.rs | 2 +- codex-rs/core/tests/suite/compact_remote.rs | 25 ++++---- codex-rs/otel/src/events/session_telemetry.rs | 1 + codex-rs/protocol/src/models.rs | 23 +++++-- codex-rs/rollout/src/policy.rs | 2 + 18 files changed, 145 insertions(+), 59 deletions(-) diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index f40d16feb6..fffc610128 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -2729,6 +2729,22 @@ "title": "CompactionResponseItem", "type": "object" }, + { + "properties": { + "type": { + "enum": [ + "compaction_trigger" + ], + "title": "CompactionTriggerResponseItemType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "CompactionTriggerResponseItem", + "type": "object" + }, { "properties": { "encrypted_content": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index e37c26881b..8b292f667d 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -14065,6 +14065,22 @@ "title": "CompactionResponseItem", "type": "object" }, + { + "properties": { + "type": { + "enum": [ + "compaction_trigger" + ], + "title": "CompactionTriggerResponseItemType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "CompactionTriggerResponseItem", + "type": "object" + }, { "properties": { "encrypted_content": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index 653a1a5751..16e548e8c2 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -10614,6 +10614,22 @@ "title": "CompactionResponseItem", "type": "object" }, + { + "properties": { + "type": { + "enum": [ + "compaction_trigger" + ], + "title": "CompactionTriggerResponseItemType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "CompactionTriggerResponseItem", + "type": "object" + }, { "properties": { "encrypted_content": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/RawResponseItemCompletedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/RawResponseItemCompletedNotification.json index 6973d15baa..bd88872fb6 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/RawResponseItemCompletedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/RawResponseItemCompletedNotification.json @@ -732,6 +732,22 @@ "title": "CompactionResponseItem", "type": "object" }, + { + "properties": { + "type": { + "enum": [ + "compaction_trigger" + ], + "title": "CompactionTriggerResponseItemType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "CompactionTriggerResponseItem", + "type": "object" + }, { "properties": { "encrypted_content": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json index 27674afc7b..e8a437f003 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json @@ -799,6 +799,22 @@ "title": "CompactionResponseItem", "type": "object" }, + { + "properties": { + "type": { + "enum": [ + "compaction_trigger" + ], + "title": "CompactionTriggerResponseItemType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "CompactionTriggerResponseItem", + "type": "object" + }, { "properties": { "encrypted_content": { diff --git a/codex-rs/app-server-protocol/schema/typescript/ResponseItem.ts b/codex-rs/app-server-protocol/schema/typescript/ResponseItem.ts index 6fa9beee25..e5e960ff81 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ResponseItem.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ResponseItem.ts @@ -14,4 +14,4 @@ export type ResponseItem = { "type": "message", role: string, content: Array bool { | ResponseItem::WebSearchCall { .. } | ResponseItem::ImageGenerationCall { .. } | ResponseItem::Compaction { .. } + | ResponseItem::CompactionTrigger | ResponseItem::ContextCompaction { .. } | ResponseItem::Other, ) => false, diff --git a/codex-rs/core/src/arc_monitor.rs b/codex-rs/core/src/arc_monitor.rs index d1e679a631..a4f7e038e0 100644 --- a/codex-rs/core/src/arc_monitor.rs +++ b/codex-rs/core/src/arc_monitor.rs @@ -384,6 +384,7 @@ fn build_arc_monitor_message_item( | ResponseItem::ToolSearchOutput { .. } | ResponseItem::ImageGenerationCall { .. } | ResponseItem::Compaction { .. } + | ResponseItem::CompactionTrigger | ResponseItem::ContextCompaction { .. } | ResponseItem::Other => None, } diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index cc31d50b13..bc684647eb 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -302,6 +302,7 @@ fn should_keep_compacted_history_item(item: &ResponseItem) -> bool { ResponseItem::Message { role, .. } if role == "assistant" => true, ResponseItem::Message { .. } => false, ResponseItem::Compaction { .. } | ResponseItem::ContextCompaction { .. } => true, + ResponseItem::CompactionTrigger => false, ResponseItem::Reasoning { .. } | ResponseItem::LocalShellCall { .. } | ResponseItem::FunctionCall { .. } diff --git a/codex-rs/core/src/compact_remote_v2.rs b/codex-rs/core/src/compact_remote_v2.rs index 4dafb95f8e..9a7a7435f1 100644 --- a/codex-rs/core/src/compact_remote_v2.rs +++ b/codex-rs/core/src/compact_remote_v2.rs @@ -165,9 +165,7 @@ async fn run_remote_compact_task_inner_impl( ) .await?; let mut input = prompt_input.clone(); - input.push(ResponseItem::ContextCompaction { - encrypted_content: None, - }); + input.push(ResponseItem::CompactionTrigger); let prompt = Prompt { input, tools: tool_router.model_visible_specs(), @@ -276,38 +274,25 @@ async fn run_remote_compaction_request_v2( Err(err) }) .await?; - collect_context_compaction_output(stream).await + collect_compaction_output(stream).await } -async fn collect_context_compaction_output( +async fn collect_compaction_output( mut stream: ResponseStream, ) -> CodexResult<(ResponseItem, String)> { let mut output_item_count = 0usize; - let mut context_compaction_count = 0usize; - let mut context_compaction_output = None; + let mut compaction_count = 0usize; + let mut compaction_output = None; let mut completed_response_id = None; while let Some(event) = stream.next().await { match event? { ResponseEvent::OutputItemDone(item) => { output_item_count += 1; - match item { - ResponseItem::ContextCompaction { - encrypted_content: Some(_), - } => { - context_compaction_count += 1; - if context_compaction_output.is_none() { - context_compaction_output = Some(item); - } + if let ResponseItem::Compaction { .. } = item { + compaction_count += 1; + if compaction_output.is_none() { + compaction_output = Some(item); } - ResponseItem::ContextCompaction { - encrypted_content: None, - } => { - return Err(CodexErr::Fatal( - "remote compaction v2 returned context_compaction without encrypted_content" - .to_string(), - )); - } - _ => {} } } ResponseEvent::Completed { response_id, .. } => { @@ -324,16 +309,16 @@ async fn collect_context_compaction_output( )); }; - if context_compaction_count != 1 { + if compaction_count != 1 { return Err(CodexErr::Fatal(format!( - "remote compaction v2 expected exactly one context_compaction output item, got {context_compaction_count} from {output_item_count} output items" + "remote compaction v2 expected exactly one compaction output item, got {compaction_count} from {output_item_count} output items" ))); } - let Some(context_compaction_output) = context_compaction_output else { - unreachable!("context compaction output must exist when count is exactly one"); + let Some(compaction_output) = compaction_output else { + unreachable!("compaction output must exist when count is exactly one"); }; - Ok((context_compaction_output, response_id)) + Ok((compaction_output, response_id)) } fn build_v2_compacted_history( @@ -410,8 +395,8 @@ mod tests { encrypted_content: "old".to_string(), }, ]; - let output = ResponseItem::ContextCompaction { - encrypted_content: Some("new".to_string()), + let output = ResponseItem::Compaction { + encrypted_content: "new".to_string(), }; let history = build_v2_compacted_history(&input, output.clone()); @@ -428,9 +413,9 @@ mod tests { } #[tokio::test] - async fn collect_context_compaction_output_accepts_additional_output_items() { - let context_compaction = ResponseItem::ContextCompaction { - encrypted_content: Some("encrypted".to_string()), + async fn collect_compaction_output_accepts_additional_output_items() { + let compaction = ResponseItem::Compaction { + encrypted_content: "encrypted".to_string(), }; let stream = response_stream(vec![ Ok(ResponseEvent::OutputItemDone(message( @@ -438,7 +423,7 @@ mod tests { "IGNORED_COMPACT_REPLY", Some(MessagePhase::FinalAnswer), ))), - Ok(ResponseEvent::OutputItemDone(context_compaction.clone())), + Ok(ResponseEvent::OutputItemDone(compaction.clone())), Ok(ResponseEvent::Completed { response_id: "resp-compact".to_string(), token_usage: None, @@ -446,11 +431,11 @@ mod tests { }), ]); - let (output, response_id) = collect_context_compaction_output(stream) + let (output, response_id) = collect_compaction_output(stream) .await - .expect("context compaction should be collected"); + .expect("compaction should be collected"); - assert_eq!(output, context_compaction); + assert_eq!(output, compaction); assert_eq!(response_id, "resp-compact"); } } diff --git a/codex-rs/core/src/context_manager/history.rs b/codex-rs/core/src/context_manager/history.rs index 80c057e0eb..cfd9297382 100644 --- a/codex-rs/core/src/context_manager/history.rs +++ b/codex-rs/core/src/context_manager/history.rs @@ -400,6 +400,7 @@ impl ContextManager { | ResponseItem::ImageGenerationCall { .. } | ResponseItem::CustomToolCall { .. } | ResponseItem::Compaction { .. } + | ResponseItem::CompactionTrigger | ResponseItem::ContextCompaction { .. } | ResponseItem::Other => item.clone(), } @@ -490,6 +491,7 @@ fn is_api_message(message: &ResponseItem) -> bool { | ResponseItem::ImageGenerationCall { .. } | ResponseItem::Compaction { .. } | ResponseItem::ContextCompaction { .. } => true, + ResponseItem::CompactionTrigger => false, ResponseItem::Other => false, } } @@ -688,6 +690,7 @@ fn is_model_generated_item(item: &ResponseItem) -> bool { | ResponseItem::LocalShellCall { .. } | ResponseItem::Compaction { .. } | ResponseItem::ContextCompaction { .. } => true, + ResponseItem::CompactionTrigger => false, ResponseItem::FunctionCallOutput { .. } | ResponseItem::ToolSearchOutput { .. } | ResponseItem::CustomToolCallOutput { .. } diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 86710e4871..aee4bd360f 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -2002,6 +2002,7 @@ async fn try_run_sampling_request( | ResponseItem::WebSearchCall { .. } | ResponseItem::ImageGenerationCall { .. } | ResponseItem::Compaction { .. } + | ResponseItem::CompactionTrigger | ResponseItem::ContextCompaction { .. } | ResponseItem::Other => false, }; diff --git a/codex-rs/core/src/turn_timing.rs b/codex-rs/core/src/turn_timing.rs index 3d35afae95..570e4a6e32 100644 --- a/codex-rs/core/src/turn_timing.rs +++ b/codex-rs/core/src/turn_timing.rs @@ -185,6 +185,7 @@ fn response_item_records_turn_ttft(item: &ResponseItem) -> bool { | ResponseItem::ImageGenerationCall { .. } | ResponseItem::Compaction { .. } | ResponseItem::ContextCompaction { .. } => true, + ResponseItem::CompactionTrigger => false, ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } | ResponseItem::ToolSearchOutput { .. } diff --git a/codex-rs/core/tests/suite/client_websockets.rs b/codex-rs/core/tests/suite/client_websockets.rs index 030aecd29e..a14ee6518f 100755 --- a/codex-rs/core/tests/suite/client_websockets.rs +++ b/codex-rs/core/tests/suite/client_websockets.rs @@ -253,7 +253,7 @@ async fn responses_websocket_sends_response_processed_after_remote_compaction_v2 json!({ "type": "response.output_item.done", "item": { - "type": "context_compaction", + "type": "compaction", "encrypted_content": "ENCRYPTED_CONTEXT_COMPACTION_SUMMARY", } }), diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index 8849d80773..720ae10599 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -697,7 +697,7 @@ async fn remote_manual_compact_chatgpt_auth_reuses_service_tier_and_prompt_cache } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn remote_compact_v2_reuses_context_compaction_for_followups() -> Result<()> { +async fn remote_compact_v2_reuses_compaction_trigger_for_followups() -> Result<()> { skip_if_no_network!(Ok(())); let harness = TestCodexHarness::with_builder( @@ -721,7 +721,7 @@ async fn remote_compact_v2_reuses_context_compaction_for_followups() -> Result<( serde_json::json!({ "type": "response.output_item.done", "item": { - "type": "context_compaction", + "type": "compaction", "encrypted_content": "ENCRYPTED_CONTEXT_COMPACTION_SUMMARY", } }), @@ -778,8 +778,8 @@ async fn remote_compact_v2_reuses_context_compaction_for_followups() -> Result<( assert_eq!(compact_request.path(), "/v1/responses"); let compact_body = compact_request.body_json().to_string(); assert!( - compact_body.contains("\"type\":\"context_compaction\""), - "expected v2 compaction request to include the context_compaction trigger item" + compact_body.contains("\"type\":\"compaction_trigger\""), + "expected v2 compaction request to include the compaction_trigger item" ); assert!( !compact_body.contains("ENCRYPTED_CONTEXT_COMPACTION_SUMMARY"), @@ -789,12 +789,12 @@ async fn remote_compact_v2_reuses_context_compaction_for_followups() -> Result<( let follow_up_request = response_requests.last().expect("follow-up request missing"); let follow_up_body = follow_up_request.body_json().to_string(); assert!( - follow_up_body.contains("\"type\":\"context_compaction\""), - "expected follow-up request to preserve the v2 context_compaction item" + follow_up_body.contains("\"type\":\"compaction\""), + "expected follow-up request to preserve the compaction item" ); assert!( follow_up_body.contains("ENCRYPTED_CONTEXT_COMPACTION_SUMMARY"), - "expected follow-up request to include the context compaction payload" + "expected follow-up request to include the compaction payload" ); assert!( follow_up_body.contains("hello remote compact"), @@ -805,8 +805,7 @@ async fn remote_compact_v2_reuses_context_compaction_for_followups() -> Result<( } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn remote_compact_v2_accepts_additional_output_items_before_context_compaction() -> Result<()> -{ +async fn remote_compact_v2_accepts_additional_output_items_before_compaction() -> Result<()> { skip_if_no_network!(Ok(())); let harness = TestCodexHarness::with_builder( @@ -831,7 +830,7 @@ async fn remote_compact_v2_accepts_additional_output_items_before_context_compac serde_json::json!({ "type": "response.output_item.done", "item": { - "type": "context_compaction", + "type": "compaction", "encrypted_content": "ENCRYPTED_CONTEXT_COMPACTION_SUMMARY", } }), @@ -878,12 +877,12 @@ async fn remote_compact_v2_accepts_additional_output_items_before_context_compac let follow_up_request = response_requests.last().expect("follow-up request missing"); let follow_up_body = follow_up_request.body_json().to_string(); assert!( - follow_up_body.contains("\"type\":\"context_compaction\""), - "expected follow-up request to preserve the v2 context_compaction item" + follow_up_body.contains("\"type\":\"compaction\""), + "expected follow-up request to preserve the compaction item" ); assert!( follow_up_body.contains("ENCRYPTED_CONTEXT_COMPACTION_SUMMARY"), - "expected follow-up request to include the context compaction payload" + "expected follow-up request to include the compaction payload" ); assert!( !follow_up_body.contains("IGNORED_COMPACT_REPLY"), diff --git a/codex-rs/otel/src/events/session_telemetry.rs b/codex-rs/otel/src/events/session_telemetry.rs index 49d8b8eb7e..6394c73ea2 100644 --- a/codex-rs/otel/src/events/session_telemetry.rs +++ b/codex-rs/otel/src/events/session_telemetry.rs @@ -1141,6 +1141,7 @@ impl SessionTelemetry { ResponseItem::WebSearchCall { .. } => "web_search_call".into(), ResponseItem::ImageGenerationCall { .. } => "image_generation_call".into(), ResponseItem::Compaction { .. } => "compaction".into(), + ResponseItem::CompactionTrigger => "compaction_trigger".into(), ResponseItem::ContextCompaction { .. } => "context_compaction".into(), ResponseItem::Other => "other".into(), } diff --git a/codex-rs/protocol/src/models.rs b/codex-rs/protocol/src/models.rs index 512dbd5444..2706a47c82 100644 --- a/codex-rs/protocol/src/models.rs +++ b/codex-rs/protocol/src/models.rs @@ -887,7 +887,10 @@ pub enum ResponseItem { result: String, }, #[serde(alias = "compaction_summary")] - Compaction { encrypted_content: String }, + Compaction { + encrypted_content: String, + }, + CompactionTrigger, ContextCompaction { #[serde(default, skip_serializing_if = "Option::is_none")] #[ts(optional)] @@ -2407,20 +2410,28 @@ mod tests { } #[test] - fn serializes_context_compaction_trigger_without_payload() -> Result<()> { - let item = ResponseItem::ContextCompaction { - encrypted_content: None, - }; + fn serializes_compaction_trigger_without_payload() -> Result<()> { + let item = ResponseItem::CompactionTrigger; assert_eq!( serde_json::to_value(item)?, serde_json::json!({ - "type": "context_compaction", + "type": "compaction_trigger", }) ); Ok(()) } + #[test] + fn deserializes_compaction_trigger_without_payload() -> Result<()> { + let json = r#"{"type":"compaction_trigger"}"#; + + let item: ResponseItem = serde_json::from_str(json)?; + + assert_eq!(item, ResponseItem::CompactionTrigger); + Ok(()) + } + #[test] fn deserializes_legacy_ghost_snapshot_as_other() -> Result<()> { let json = r#"{ diff --git a/codex-rs/rollout/src/policy.rs b/codex-rs/rollout/src/policy.rs index 9c42f8db61..ceb617763f 100644 --- a/codex-rs/rollout/src/policy.rs +++ b/codex-rs/rollout/src/policy.rs @@ -79,6 +79,7 @@ pub fn should_persist_response_item(item: &ResponseItem) -> bool { | ResponseItem::ImageGenerationCall { .. } | ResponseItem::Compaction { .. } | ResponseItem::ContextCompaction { .. } => true, + ResponseItem::CompactionTrigger => false, ResponseItem::Other => false, } } @@ -99,6 +100,7 @@ pub fn should_persist_response_item_for_memories(item: &ResponseItem) -> bool { ResponseItem::Reasoning { .. } | ResponseItem::ImageGenerationCall { .. } | ResponseItem::Compaction { .. } + | ResponseItem::CompactionTrigger | ResponseItem::ContextCompaction { .. } | ResponseItem::Other => false, }