diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 30794012f5..305a899b3e 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -3245,6 +3245,9 @@ impl Session { history_before_turn: &[ResponseItem], history_at_checkpoint: &[ResponseItem], ) { + // The server emits compaction as a streamed item before the response is fully complete. + // Wait until `response.completed` to rewrite local history so later streamed items from the + // same turn can still be appended in wire order before we collapse the checkpoint. let current_history = self.clone_history().await; let replacement_history = build_server_side_compaction_replacement_history( item.clone(), @@ -5963,6 +5966,11 @@ fn build_server_side_compaction_replacement_history( history_at_checkpoint: &[ResponseItem], current_history: &[ResponseItem], ) -> Vec<ResponseItem> { + // Rebuild the active turn around the compaction checkpoint: + // 1. keep the turn-local items that existed when compaction fired + // 2. replace any prior same-turn compaction summary with the newest one + // 3. re-append items that arrived later in the same streamed response + // 4. reattach ghost snapshots at the end so undo state survives the rewrite let checkpoint_turn_items = history_at_checkpoint .strip_prefix(history_before_turn) .unwrap_or(history_at_checkpoint); @@ -6130,6 +6138,13 @@ async fn maybe_run_previous_model_inline_compact( turn_context: &Arc<TurnContext>, total_usage_tokens: i64, ) -> CodexResult<bool> { + // Keep OpenAI auto-compaction on one path. If inline server-side compaction is eligible for + // the current turn, let the normal pre-turn inline request handle it instead of running the + // older previous-model client-side preflight flow first. 
+ if inline_server_side_compaction_threshold(sess, turn_context).is_some() { + return Ok(false); + } + let Some(previous_turn_settings) = sess.previous_turn_settings().await else { return Ok(false); }; diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index d95c7774c6..475732bbfa 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -286,56 +286,6 @@ fn assistant_message_stream_parsers_seed_plan_parser_across_added_and_delta_boun #[test] fn build_server_side_compaction_replacement_history_keeps_current_turn_inputs() { - let prior_snapshot = ghost_snapshot("ghost-before"); - let same_turn_snapshot = ghost_snapshot("ghost-during"); - let history_before_turn = vec![user_message("earlier"), prior_snapshot.clone()]; - let turn_start_context_items = vec![ - developer_message("fresh permissions"), - environment_context_message("/fresh"), - ]; - let current_turn_user = user_message("current turn"); - let current_turn_tool_output = ResponseItem::FunctionCallOutput { - call_id: "call-1".to_string(), - output: FunctionCallOutputPayload::from_text("tool result".to_string()), - }; - let current_history = vec![ - user_message("earlier"), - prior_snapshot.clone(), - turn_start_context_items[0].clone(), - turn_start_context_items[1].clone(), - current_turn_user.clone(), - current_turn_tool_output.clone(), - same_turn_snapshot.clone(), - ]; - let compaction_item = ResponseItem::Compaction { - encrypted_content: "INLINE_SUMMARY".to_string(), - }; - - let replacement_history = build_server_side_compaction_replacement_history( - compaction_item.clone(), - &turn_start_context_items, - &turn_start_context_items, - &history_before_turn, - &current_history, - &current_history, - ); - - assert_eq!( - replacement_history, - vec![ - turn_start_context_items[0].clone(), - turn_start_context_items[1].clone(), - current_turn_user, - current_turn_tool_output, - compaction_item, - prior_snapshot, - same_turn_snapshot, - ] - ); -} - -#[test] -fn 
build_server_side_compaction_replacement_history_preserves_turn_scoped_injections() { let prior_snapshot = ghost_snapshot("ghost-before"); let same_turn_snapshot = ghost_snapshot("ghost-during"); let history_before_turn = vec![user_message("earlier"), prior_snapshot.clone()]; @@ -392,163 +342,6 @@ fn build_server_side_compaction_replacement_history_preserves_turn_scoped_inject ); } -#[test] -fn build_server_side_compaction_replacement_history_replaces_prior_same_turn_summary() { - let prior_snapshot = ghost_snapshot("ghost-before"); - let same_turn_snapshot = ghost_snapshot("ghost-during"); - let history_before_turn = vec![user_message("earlier"), prior_snapshot.clone()]; - let turn_start_context_items = vec![ - developer_message("fresh permissions"), - environment_context_message("/fresh"), - ]; - let current_turn_user = user_message("current turn"); - let current_turn_tool_output = ResponseItem::FunctionCallOutput { - call_id: "call-1".to_string(), - output: FunctionCallOutputPayload::from_text("tool result".to_string()), - }; - let prior_compaction = ResponseItem::Compaction { - encrypted_content: "INLINE_SUMMARY_1".to_string(), - }; - let new_compaction = ResponseItem::Compaction { - encrypted_content: "INLINE_SUMMARY_2".to_string(), - }; - let current_history = vec![ - turn_start_context_items[0].clone(), - turn_start_context_items[1].clone(), - current_turn_user.clone(), - current_turn_tool_output.clone(), - prior_compaction, - prior_snapshot.clone(), - same_turn_snapshot.clone(), - ]; - - let replacement_history = build_server_side_compaction_replacement_history( - new_compaction.clone(), - &turn_start_context_items, - &turn_start_context_items, - &history_before_turn, - &current_history, - &current_history, - ); - - assert_eq!( - replacement_history, - vec![ - turn_start_context_items[0].clone(), - turn_start_context_items[1].clone(), - current_turn_user, - current_turn_tool_output, - new_compaction, - prior_snapshot, - same_turn_snapshot, - ] - ); -} - -#[test] -fn 
build_server_side_compaction_replacement_history_replaces_prior_summary_with_empty_history() { - let same_turn_snapshot = ghost_snapshot("ghost-during"); - let history_before_turn = Vec::new(); - let turn_start_context_items = vec![ - developer_message("fresh permissions"), - environment_context_message("/fresh"), - ]; - let current_turn_user = user_message("current turn"); - let current_turn_tool_output = ResponseItem::FunctionCallOutput { - call_id: "call-1".to_string(), - output: FunctionCallOutputPayload::from_text("tool result".to_string()), - }; - let prior_compaction = ResponseItem::Compaction { - encrypted_content: "INLINE_SUMMARY_1".to_string(), - }; - let new_compaction = ResponseItem::Compaction { - encrypted_content: "INLINE_SUMMARY_2".to_string(), - }; - let current_history = vec![ - turn_start_context_items[0].clone(), - turn_start_context_items[1].clone(), - prior_compaction, - current_turn_user.clone(), - current_turn_tool_output.clone(), - same_turn_snapshot.clone(), - ]; - - let replacement_history = build_server_side_compaction_replacement_history( - new_compaction.clone(), - &turn_start_context_items, - &turn_start_context_items, - &history_before_turn, - &current_history, - &current_history, - ); - - assert_eq!( - replacement_history, - vec![ - turn_start_context_items[0].clone(), - turn_start_context_items[1].clone(), - current_turn_user, - current_turn_tool_output, - new_compaction, - same_turn_snapshot, - ] - ); -} - -#[test] -fn build_server_side_compaction_replacement_history_reuses_existing_initial_context_once() { - let history_before_turn = vec![user_message("earlier")]; - let compaction_initial_context = vec![ - developer_message("fresh permissions"), - environment_context_message("/fresh"), - ]; - let turn_start_context_items = Vec::new(); - let current_turn_user = user_message("current turn"); - let prior_compaction = ResponseItem::Compaction { - encrypted_content: "INLINE_SUMMARY_1".to_string(), - }; - let new_compaction = 
ResponseItem::Compaction { - encrypted_content: "INLINE_SUMMARY_2".to_string(), - }; - let current_turn_tool_output = ResponseItem::FunctionCallOutput { - call_id: "call-1".to_string(), - output: FunctionCallOutputPayload::from_text("tool result".to_string()), - }; - let history_at_checkpoint = vec![ - compaction_initial_context[0].clone(), - compaction_initial_context[1].clone(), - current_turn_user.clone(), - prior_compaction, - ]; - let current_history = vec![ - history_at_checkpoint[0].clone(), - history_at_checkpoint[1].clone(), - history_at_checkpoint[2].clone(), - history_at_checkpoint[3].clone(), - current_turn_tool_output.clone(), - ]; - - let replacement_history = build_server_side_compaction_replacement_history( - new_compaction.clone(), - &compaction_initial_context, - &turn_start_context_items, - &history_before_turn, - &history_at_checkpoint, - &current_history, - ); - - assert_eq!( - replacement_history, - vec![ - compaction_initial_context[0].clone(), - compaction_initial_context[1].clone(), - current_turn_user, - new_compaction, - current_turn_tool_output, - ] - ); -} - #[test] fn build_server_side_compaction_replacement_history_prefers_longer_initial_context_prefix() { let history_before_turn = vec![user_message("earlier")]; @@ -603,66 +396,6 @@ fn build_server_side_compaction_replacement_history_prefers_longer_initial_conte ); } -#[test] -fn build_server_side_compaction_replacement_history_keeps_checkpoint_before_post_compaction_items() -{ - let prior_snapshot = ghost_snapshot("ghost-before"); - let same_turn_snapshot = ghost_snapshot("ghost-during"); - let history_before_turn = vec![user_message("earlier"), prior_snapshot.clone()]; - let turn_start_context_items = vec![ - developer_message("fresh permissions"), - environment_context_message("/fresh"), - ]; - let current_turn_user = user_message("current turn"); - let post_checkpoint_tool_call = ResponseItem::FunctionCall { - id: None, - call_id: "call-1".to_string(), - name: "test_tool".to_string(), - 
arguments: "{}".to_string(), - }; - let history_at_checkpoint = vec![ - user_message("earlier"), - prior_snapshot.clone(), - turn_start_context_items[0].clone(), - turn_start_context_items[1].clone(), - current_turn_user.clone(), - ]; - let current_history = vec![ - history_at_checkpoint[0].clone(), - history_at_checkpoint[1].clone(), - history_at_checkpoint[2].clone(), - history_at_checkpoint[3].clone(), - history_at_checkpoint[4].clone(), - post_checkpoint_tool_call.clone(), - same_turn_snapshot.clone(), - ]; - let compaction_item = ResponseItem::Compaction { - encrypted_content: "INLINE_SUMMARY".to_string(), - }; - - let replacement_history = build_server_side_compaction_replacement_history( - compaction_item.clone(), - &turn_start_context_items, - &turn_start_context_items, - &history_before_turn, - &history_at_checkpoint, - &current_history, - ); - - assert_eq!( - replacement_history, - vec![ - turn_start_context_items[0].clone(), - turn_start_context_items[1].clone(), - current_turn_user, - compaction_item, - post_checkpoint_tool_call, - prior_snapshot, - same_turn_snapshot, - ] - ); -} - fn make_mcp_tool( server_name: &str, tool_name: &str, diff --git a/codex-rs/core/src/stream_events_utils.rs b/codex-rs/core/src/stream_events_utils.rs index a172ab1f3d..8ae516ff9c 100644 --- a/codex-rs/core/src/stream_events_utils.rs +++ b/codex-rs/core/src/stream_events_utils.rs @@ -178,6 +178,9 @@ pub(crate) async fn handle_output_item_done( Some(TurnItem::ContextCompaction(item)) => item, _ => ContextCompactionItem::new(), }); + // Preserve the raw wire event immediately, but defer the committed turn-item lifecycle + // until `response.completed` so later streamed output from the same response is not + // reordered around the local checkpoint rewrite. 
debug!( turn_id = %ctx.turn_context.sub_id, "emitting streamed server-side raw compaction item and buffering committed checkpoint until response.completed" diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index 7dd0fa7e94..450cb5616f 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -12,8 +12,6 @@ use codex_core::features::Feature; use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; -use codex_protocol::openai_models::ModelInfo; -use codex_protocol::openai_models::ModelsResponse; use codex_protocol::protocol::ConversationStartParams; use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::EventMsg; @@ -64,18 +62,6 @@ fn summary_with_prefix(summary: &str) -> String { format!("{SUMMARY_PREFIX}\n{summary}") } -fn model_info_with_context_window(slug: &str, context_window: i64) -> ModelInfo { - let models_response: ModelsResponse = - serde_json::from_str(include_str!("../../models.json")).expect("valid models.json"); - let mut model_info = models_response - .models - .into_iter() - .find(|model| model.slug == slug) - .unwrap_or_else(|| panic!("model `{slug}` missing from models.json")); - model_info.context_window = Some(context_window); - model_info -} - fn context_snapshot_options() -> ContextSnapshotOptions { ContextSnapshotOptions::default() .render_mode(ContextSnapshotRenderMode::KindWithTextPrefix { max_chars: 64 }) @@ -546,190 +532,6 @@ async fn auto_server_side_compaction_uses_inline_context_management() -> Result< Ok(()) } -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn auto_server_side_compaction_commits_events_after_later_streamed_items() -> Result<()> { - skip_if_no_network!(Ok(())); - - let compact_threshold = 120; - let harness = TestCodexHarness::with_builder( - test_codex() - .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) 
- .with_config(move |config| { - config - .features - .enable(Feature::ServerSideCompaction) - .expect("enable server-side compaction"); - config.model_auto_compact_token_limit = Some(compact_threshold); - }), - ) - .await?; - let codex = harness.test().codex.clone(); - - let _responses_mock = responses::mount_sse_sequence( - harness.server(), - vec![ - responses::sse(vec![ - responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"), - responses::ev_completed_with_tokens("resp-1", 500), - ]), - responses::sse(vec![ - responses::ev_compaction(&summary_with_prefix("INLINE_SERVER_SUMMARY")), - responses::ev_assistant_message("m2", "AFTER_INLINE_REPLY"), - responses::ev_completed_with_tokens("resp-2", 80), - ]), - ], - ) - .await; - - submit_text_turn_and_wait(&codex, "inline compact turn one").await?; - - codex - .submit(Op::UserInput { - items: vec![UserInput::Text { - text: "inline compact turn two".to_string(), - text_elements: Vec::new(), - }], - final_output_json_schema: None, - }) - .await?; - - let mut event_index = 0usize; - let mut context_compacted_index = None; - let mut assistant_completed_index = None; - let mut saw_turn_complete = false; - while !saw_turn_complete - || context_compacted_index.is_none() - || assistant_completed_index.is_none() - { - let event = codex.next_event().await.expect("event"); - match event.msg { - EventMsg::ContextCompacted(_) => { - context_compacted_index = Some(event_index); - } - EventMsg::ItemCompleted(ItemCompletedEvent { - item: TurnItem::AgentMessage(item), - .. 
- }) if item.content.iter().any(|entry| { - matches!( - entry, - codex_protocol::items::AgentMessageContent::Text { text } - if text == "AFTER_INLINE_REPLY" - ) - }) => - { - assistant_completed_index = Some(event_index); - } - EventMsg::TurnComplete(_) => { - saw_turn_complete = true; - } - _ => {} - } - event_index += 1; - } - - assert!( - assistant_completed_index.expect("assistant completed event") - < context_compacted_index.expect("context compacted event"), - "expected the committed inline compaction event to arrive only after later streamed assistant items" - ); - - Ok(()) -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn auto_server_side_compaction_preserves_raw_response_item_order() -> Result<()> { - skip_if_no_network!(Ok(())); - - let compact_threshold = 120; - let inline_summary = summary_with_prefix("INLINE_SERVER_SUMMARY"); - - let harness = TestCodexHarness::with_builder( - test_codex() - .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) - .with_config(move |config| { - config - .features - .enable(Feature::ServerSideCompaction) - .expect("enable server-side compaction"); - config.model_auto_compact_token_limit = Some(compact_threshold); - }), - ) - .await?; - let codex = harness.test().codex.clone(); - - responses::mount_sse_sequence( - harness.server(), - vec![ - responses::sse(vec![ - responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"), - responses::ev_completed_with_tokens("resp-1", 500), - ]), - responses::sse(vec![ - responses::ev_compaction(&inline_summary), - responses::ev_assistant_message("m2", "AFTER_INLINE_REPLY"), - responses::ev_completed("resp-2"), - ]), - ], - ) - .await; - - submit_text_turn_and_wait(&codex, "inline compact turn one").await?; - - codex - .submit(Op::UserInput { - items: vec![UserInput::Text { - text: "inline compact turn two".to_string(), - text_elements: Vec::new(), - }], - final_output_json_schema: None, - }) - .await?; - - let mut event_index = 0usize; - let mut 
raw_compaction_index = None; - let mut raw_assistant_index = None; - let mut saw_turn_complete = false; - while !saw_turn_complete || raw_compaction_index.is_none() || raw_assistant_index.is_none() { - let event = codex.next_event().await.expect("event"); - match event.msg { - EventMsg::RawResponseItem(raw) - if matches!(raw.item, ResponseItem::Compaction { .. }) => - { - raw_compaction_index = Some(event_index); - } - EventMsg::RawResponseItem(raw) - if matches!( - &raw.item, - ResponseItem::Message { role, content, .. } - if role == "assistant" - && content.iter().any(|entry| { - matches!( - entry, - ContentItem::OutputText { text } - if text == "AFTER_INLINE_REPLY" - ) - }) - ) => - { - raw_assistant_index = Some(event_index); - } - EventMsg::TurnComplete(_) => { - saw_turn_complete = true; - } - _ => {} - } - event_index += 1; - } - - assert!( - raw_compaction_index.expect("raw compaction event") - < raw_assistant_index.expect("raw assistant event"), - "expected raw response item notifications to preserve wire order" - ); - - Ok(()) -} - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn auto_server_side_compaction_keeps_current_turn_inputs_for_follow_ups() -> Result<()> { skip_if_no_network!(Ok(())); @@ -829,204 +631,6 @@ async fn auto_server_side_compaction_keeps_current_turn_inputs_for_follow_ups() Ok(()) } -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn auto_server_side_compaction_preserves_recomputed_token_estimate() -> Result<()> { - skip_if_no_network!(Ok(())); - - let compact_threshold = 120; - let inline_summary = summary_with_prefix("INLINE_SERVER_SUMMARY"); - - let harness = TestCodexHarness::with_builder( - test_codex() - .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) - .with_config(move |config| { - config - .features - .enable(Feature::ServerSideCompaction) - .expect("enable server-side compaction"); - config.model_auto_compact_token_limit = Some(compact_threshold); - }), - ) - .await?; - 
let codex = harness.test().codex.clone(); - - let responses_mock = responses::mount_sse_sequence( - harness.server(), - vec![ - responses::sse(vec![ - responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"), - responses::ev_completed_with_tokens("resp-1", 500), - ]), - responses::sse(vec![ - responses::ev_compaction(&inline_summary), - responses::ev_function_call( - "call-inline-stale-token-usage", - DUMMY_FUNCTION_NAME, - "{}", - ), - responses::ev_completed_with_tokens("resp-2", 500), - ]), - responses::sse(vec![ - responses::ev_assistant_message("m3", "AFTER_INLINE_TOOL_REPLY"), - responses::ev_completed("resp-3"), - ]), - ], - ) - .await; - - codex - .submit(Op::UserInput { - items: vec![UserInput::Text { - text: "inline compact turn one".to_string(), - text_elements: Vec::new(), - }], - final_output_json_schema: None, - }) - .await?; - wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; - - codex - .submit(Op::UserInput { - items: vec![UserInput::Text { - text: "inline compact turn two".to_string(), - text_elements: Vec::new(), - }], - final_output_json_schema: None, - }) - .await?; - - let mut saw_turn_complete = false; - let mut token_usage_events = Vec::new(); - while !saw_turn_complete { - let event = codex.next_event().await.expect("event"); - match event.msg { - EventMsg::TokenCount(token_count) => { - if let Some(info) = token_count.info.as_ref() { - token_usage_events.push(( - info.last_token_usage.total_tokens, - info.total_token_usage.total_tokens, - )); - } - } - EventMsg::TurnComplete(_) => { - saw_turn_complete = true; - } - _ => {} - } - } - - let requests = responses_mock.requests(); - assert_eq!( - requests.len(), - 3, - "expected initial request, inline-compacted tool call, and same-turn follow-up" - ); - assert!( - token_usage_events - .iter() - .copied() - .any(|(last_tokens, total_tokens)| last_tokens > 500 && total_tokens >= 1_000), - "expected a post-compaction token event to keep the recomputed local 
estimate and preserve cumulative provider accounting: {token_usage_events:?}" - ); - - Ok(()) -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn auto_server_side_compaction_follow_up_preserves_model_switch_updates() -> Result<()> { - skip_if_no_network!(Ok(())); - - let compact_threshold = 120; - let previous_model = "gpt-5.2-codex"; - let next_model = "gpt-5.1-codex-max"; - let inline_summary = summary_with_prefix("INLINE_SWITCH_SUMMARY"); - let server = wiremock::MockServer::start().await; - let _models_mock = responses::mount_models_once( - &server, - ModelsResponse { - models: vec![ - model_info_with_context_window(previous_model, 273_000), - model_info_with_context_window(next_model, 273_000), - ], - }, - ) - .await; - let responses_mock = responses::mount_sse_sequence( - &server, - vec![ - responses::sse(vec![ - responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"), - responses::ev_completed_with_tokens("resp-1", 500), - ]), - responses::sse(vec![ - responses::ev_compaction(&inline_summary), - responses::ev_function_call("call-inline-model-switch", DUMMY_FUNCTION_NAME, "{}"), - responses::ev_completed_with_tokens("resp-2", 80), - ]), - responses::sse(vec![ - responses::ev_assistant_message("m3", "AFTER_INLINE_SWITCH_TOOL_REPLY"), - responses::ev_completed("resp-3"), - ]), - ], - ) - .await; - - let test = test_codex() - .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) - .with_model(previous_model) - .with_config(move |config| { - config - .features - .enable(Feature::ServerSideCompaction) - .expect("enable server-side compaction"); - config.model_auto_compact_token_limit = Some(compact_threshold); - }) - .build(&server) - .await?; - let codex = test.codex.clone(); - - submit_text_turn_and_wait(&codex, "BEFORE_SWITCH_USER").await?; - - codex - .submit(Op::OverrideTurnContext { - cwd: None, - approval_policy: None, - sandbox_policy: None, - windows_sandbox_level: None, - model: Some(next_model.to_string()), - effort: 
None, - summary: None, - service_tier: None, - collaboration_mode: None, - personality: None, - }) - .await?; - submit_text_turn_and_wait(&codex, "AFTER_SWITCH_USER").await?; - - let requests = responses_mock.requests(); - assert_eq!( - requests.len(), - 3, - "expected initial request, switched-model inline compaction request, and same-turn follow-up" - ); - - let follow_up_request = &requests[2]; - assert!( - follow_up_request.body_contains_text(&inline_summary), - "expected same-turn follow-up to include the inline compaction item" - ); - assert!( - follow_up_request.body_contains_text("AFTER_SWITCH_USER"), - "expected same-turn follow-up to retain the switched-model user input" - ); - assert!( - follow_up_request.body_contains_text(""), - "expected same-turn follow-up to preserve the original model switch update" - ); - - Ok(()) -} - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn auto_server_side_compaction_retries_without_committing_incomplete_checkpoint() -> Result<()> { @@ -1114,300 +718,6 @@ async fn auto_server_side_compaction_retries_without_committing_incomplete_check Ok(()) } -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn auto_server_side_compaction_stays_inline_with_custom_prompt() -> Result<()> { - skip_if_no_network!(Ok(())); - - let compact_threshold = 120; - let custom_compact_prompt = "CUSTOM_REMOTE_COMPACT_PROMPT"; - let inline_summary = summary_with_prefix("INLINE_SERVER_SUMMARY"); - - let harness = TestCodexHarness::with_builder( - test_codex() - .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) - .with_config(move |config| { - config - .features - .enable(Feature::ServerSideCompaction) - .expect("enable server-side compaction"); - config.model_auto_compact_token_limit = Some(compact_threshold); - config.compact_prompt = Some(custom_compact_prompt.to_string()); - }), - ) - .await?; - let codex = harness.test().codex.clone(); - - let responses_mock = responses::mount_response_sequence( - 
harness.server(), - vec![ - responses::sse_response(sse(vec![ - responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"), - responses::ev_completed_with_tokens("resp-1", 500), - ])), - responses::sse_response(sse(vec![ - responses::ev_compaction(&inline_summary), - responses::ev_assistant_message("m2", "AFTER_INLINE_REPLY"), - responses::ev_completed("resp-2"), - ])), - ], - ) - .await; - let compact_mock = responses::mount_compact_response_once( - harness.server(), - ResponseTemplate::new(200) - .insert_header("content-type", "application/json") - .set_body_json(json!({ "output": [] })), - ) - .await; - - submit_text_turn_and_wait(&codex, "custom prompt turn one").await?; - submit_text_turn_and_wait(&codex, "custom prompt turn two").await?; - - let requests = responses_mock.requests(); - assert_eq!(requests.len(), 2, "expected two /responses requests"); - assert_eq!( - compact_mock.requests().len(), - 0, - "expected auto-compaction to stay on inline context management" - ); - assert_eq!( - requests[1].body_json().get("context_management"), - Some(&json!([{ - "type": "compaction", - "compact_threshold": compact_threshold, - }])), - ); - assert!( - !requests[1].body_contains_text(custom_compact_prompt), - "inline auto-compaction should not send the OpenAI-unsupported compact prompt" - ); - - Ok(()) -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn previous_model_preflight_compaction_still_runs_with_inline_server_side_compaction() --> Result<()> { - skip_if_no_network!(Ok(())); - - let server = wiremock::MockServer::start().await; - let previous_model = "gpt-5.2-codex"; - let next_model = "gpt-5.1-codex-max"; - let _models_mock = responses::mount_models_once( - &server, - ModelsResponse { - models: vec![ - model_info_with_context_window(previous_model, 273_000), - model_info_with_context_window(next_model, 125_000), - ], - }, - ) - .await; - - let test = test_codex() - .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) - 
.with_model(previous_model) - .with_config(|config| { - config - .features - .enable(Feature::ServerSideCompaction) - .expect("enable server-side compaction"); - config.model_auto_compact_token_limit = Some(200); - }) - .build(&server) - .await?; - let codex = test.codex.clone(); - - let initial_turn_request_mock = responses::mount_sse_once( - &server, - responses::sse(vec![ - responses::ev_assistant_message("m1", "BEFORE_SWITCH_REPLY"), - responses::ev_completed_with_tokens("r1", 120_000), - ]), - ) - .await; - let post_compact_turn_request_mock = responses::mount_sse_once( - &server, - responses::sse(vec![ - responses::ev_assistant_message("m2", "AFTER_SWITCH_REPLY"), - responses::ev_completed_with_tokens("r2", 80), - ]), - ) - .await; - let compact_mock = responses::mount_compact_user_history_with_summary_once( - &server, - &summary_with_prefix("REMOTE_SWITCH_SUMMARY"), - ) - .await; - - codex - .submit(Op::UserInput { - items: vec![UserInput::Text { - text: "BEFORE_SWITCH_USER".to_string(), - text_elements: Vec::new(), - }], - final_output_json_schema: None, - }) - .await?; - wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; - - codex - .submit(Op::OverrideTurnContext { - cwd: None, - approval_policy: None, - sandbox_policy: None, - windows_sandbox_level: None, - model: Some(next_model.to_string()), - effort: None, - summary: None, - service_tier: None, - collaboration_mode: None, - personality: None, - }) - .await?; - codex - .submit(Op::UserInput { - items: vec![UserInput::Text { - text: "AFTER_SWITCH_USER".to_string(), - text_elements: Vec::new(), - }], - final_output_json_schema: None, - }) - .await?; - wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; - - assert_eq!( - initial_turn_request_mock.requests().len(), - 1, - "expected initial turn request" - ); - assert_eq!( - compact_mock.requests().len(), - 1, - "expected previous-model preflight compaction before the smaller-model turn" - ); - assert_eq!( - 
post_compact_turn_request_mock.requests().len(), - 1, - "expected a single post-compaction smaller-model request" - ); - - let compact_request = compact_mock.single_request(); - let compact_body = compact_request.body_json().to_string(); - assert!( - !compact_body.contains("AFTER_SWITCH_USER"), - "preflight compaction should still exclude the incoming user message" - ); - assert!( - !compact_body.contains(""), - "preflight compaction should still strip the incoming model switch item" - ); - - let post_compact_turn_request = post_compact_turn_request_mock.single_request(); - assert!( - post_compact_turn_request.body_contains_text("REMOTE_SWITCH_SUMMARY"), - "smaller-model follow-up should use the previous-model compaction summary" - ); - assert!( - post_compact_turn_request.body_contains_text("AFTER_SWITCH_USER"), - "smaller-model follow-up should still include the incoming user message" - ); - assert!( - post_compact_turn_request.body_contains_text(""), - "smaller-model follow-up should still include the model switch item" - ); - - Ok(()) -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn auto_server_side_compaction_reports_inline_compat_errors_without_fallback() -> Result<()> { - skip_if_no_network!(Ok(())); - - let compact_threshold = 120; - - let harness = TestCodexHarness::with_builder( - test_codex() - .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) - .with_config(move |config| { - config - .features - .enable(Feature::ServerSideCompaction) - .expect("enable server-side compaction"); - config.model_auto_compact_token_limit = Some(compact_threshold); - }), - ) - .await?; - let codex = harness.test().codex.clone(); - - let responses_mock = responses::mount_response_sequence( - harness.server(), - vec![ - responses::sse_response(sse(vec![ - responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"), - responses::ev_completed_with_tokens("resp-1", 500), - ])), - ResponseTemplate::new(400).set_body_json(json!({ - "error": { 
- "message": "Unknown field `context_management` on request body", - } - })), - ], - ) - .await; - let compact_mock = responses::mount_compact_response_once( - harness.server(), - ResponseTemplate::new(200) - .insert_header("content-type", "application/json") - .set_body_json(json!({ "output": [] })), - ) - .await; - - submit_text_turn_and_wait(&codex, "downgrade turn one").await?; - codex - .submit(Op::UserInput { - items: vec![UserInput::Text { - text: "downgrade turn two".to_string(), - text_elements: Vec::new(), - }], - final_output_json_schema: None, - }) - .await?; - let error_message = wait_for_event_match(&codex, |event| match event { - EventMsg::Error(err) => Some(err.message.clone()), - _ => None, - }) - .await; - wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; - - let requests = responses_mock.requests(); - assert_eq!( - requests.len(), - 2, - "expected the initial turn plus one failed inline attempt" - ); - - let inline_attempt = requests[1].body_json(); - assert_eq!( - inline_attempt.get("context_management"), - Some(&json!([{ - "type": "compaction", - "compact_threshold": compact_threshold, - }])), - ); - assert!( - error_message.contains("Unknown field `context_management` on request body"), - "expected the inline compatibility error to surface, got {error_message}" - ); - assert_eq!( - compact_mock.requests().len(), - 0, - "expected no legacy /compact fallback after an inline compatibility error" - ); - - Ok(()) -} - #[cfg_attr(target_os = "windows", ignore)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn remote_compact_trims_function_call_history_to_fit_context_window() -> Result<()> {