diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index a04c409e03..130a2bbd1a 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -6020,11 +6020,24 @@ fn build_server_side_compaction_replacement_history( history_before_turn: &[ResponseItem], current_history: &[ResponseItem], ) -> Vec { + let current_turn_items = + if let Some(current_turn_items) = current_history.strip_prefix(history_before_turn) { + current_turn_items + } else if matches!( + current_history.first(), + Some(ResponseItem::Compaction { .. }) + ) { + let first_non_compaction = current_history + .iter() + .position(|item| !matches!(item, ResponseItem::Compaction { .. })) + .unwrap_or(current_history.len()); + ¤t_history[first_non_compaction..] + } else { + current_history + }; let mut replacement_history = vec![compaction_item]; replacement_history.extend( - current_history - .strip_prefix(history_before_turn) - .unwrap_or(current_history) + current_turn_items .iter() .filter(|item| !matches!(item, ResponseItem::GhostSnapshot { .. })) .cloned(), @@ -6192,6 +6205,8 @@ async fn downgrade_known_inline_compaction_error( // the pre-turn baseline was captured so `/undo` still works after // we downgrade to the legacy compaction path. let current_history = sess.clone_history().await; + let current_history_items = current_history.raw_items().to_vec(); + let current_reference_context_item = sess.reference_context_item().await; let mut restored_history = preturn_state.history_before_turn.clone(); restored_history.extend(collect_new_ghost_snapshots_since( &preturn_state.history_before_turn, @@ -6202,13 +6217,27 @@ async fn downgrade_known_inline_compaction_error( preturn_state.reference_context_before_turn.clone(), ) .await; - run_auto_compact( + if let Err(err) = run_auto_compact( sess, turn_context, InitialContextInjection::DoNotInject, AutoCompactTrigger::AutoPreTurn, ) - .await?; + .await + { + let latest_history = sess.clone_history().await; + let mut restored_current_history = current_history_items; + restored_current_history.extend(collect_new_ghost_snapshots_since( + &restored_current_history, + latest_history.raw_items(), + )); + // If the legacy fallback also fails, restore the live turn + // state instead of silently dropping the already-recorded turn. + sess.replace_history(restored_current_history, current_reference_context_item) + .await; + sess.recompute_token_usage(turn_context).await; + return Err(err); + } if !preturn_state.replay_items.is_empty() { sess.record_into_history(&preturn_state.replay_items, turn_context) .await; diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index 4a0523104d..e272704c22 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -86,6 +86,10 @@ use std::path::PathBuf; use std::sync::Arc; use std::sync::Once; use std::time::Duration as StdDuration; +use wiremock::Mock; +use wiremock::MockServer; +use wiremock::ResponseTemplate; +use wiremock::matchers::method; #[path = "codex_tests_guardian.rs"] mod guardian_tests; @@ -317,6 +321,134 @@ fn build_server_side_compaction_replacement_history_keeps_current_turn_inputs() ); } +#[test] +fn build_server_side_compaction_replacement_history_replaces_prior_same_turn_summary() { + let prior_snapshot = ghost_snapshot("ghost-before"); + let same_turn_snapshot = ghost_snapshot("ghost-during"); + let history_before_turn = vec![user_message("earlier"), prior_snapshot.clone()]; + let current_turn_user = user_message("current turn"); + let current_turn_tool_output = ResponseItem::FunctionCallOutput { + call_id: "call-1".to_string(), + output: FunctionCallOutputPayload::from_text("tool result".to_string()), + }; + let prior_compaction = ResponseItem::Compaction { + encrypted_content: "INLINE_SUMMARY_1".to_string(), + }; + let new_compaction = ResponseItem::Compaction { + encrypted_content: "INLINE_SUMMARY_2".to_string(), + }; + let current_history = vec![ + prior_compaction, + current_turn_user.clone(), + current_turn_tool_output.clone(), + prior_snapshot.clone(), + same_turn_snapshot.clone(), + ]; + + let replacement_history = build_server_side_compaction_replacement_history( + new_compaction.clone(), + &history_before_turn, + ¤t_history, + ); + + assert_eq!( + replacement_history, + vec![ + new_compaction, + current_turn_user, + current_turn_tool_output, + prior_snapshot, + same_turn_snapshot, + ] + ); +} + +#[tokio::test] +async fn downgrade_known_inline_compaction_error_restores_current_turn_when_fallback_fails() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .respond_with(ResponseTemplate::new(500).set_body_string("compact unavailable")) + .mount(&server) + .await; + + let (mut session, mut turn_context) = make_session_and_context().await; + let mut provider = crate::model_provider_info::ModelProviderInfo::create_openai_provider(); + provider.base_url = Some(format!("{}/v1", server.uri())); + turn_context.provider = provider.clone(); + session.services.model_client = ModelClient::new( + Some(Arc::clone(&session.services.auth_manager)), + session.conversation_id.clone(), + provider, + turn_context.session_source.clone(), + turn_context.config.model_verbosity, + ws_version_from_features(turn_context.config.as_ref()), + turn_context + .config + .features + .enabled(Feature::EnableRequestCompression), + turn_context + .config + .features + .enabled(Feature::RuntimeMetrics), + Session::build_model_client_beta_features_header(turn_context.config.as_ref()), + ); + let session = Arc::new(session); + let turn_context = Arc::new(turn_context); + + let history_before_turn = vec![user_message("earlier")]; + let context_update = ResponseItem::Message { + id: None, + role: "developer".to_string(), + content: vec![ContentItem::InputText { + text: "context update".to_string(), + }], + end_turn: None, + phase: None, + }; + let current_turn_user = user_message("current turn"); + let same_turn_snapshot = ghost_snapshot("ghost-during"); + let replay_items = vec![context_update.clone(), current_turn_user.clone()]; + let current_history = vec![ + history_before_turn[0].clone(), + context_update, + current_turn_user, + same_turn_snapshot, + ]; + let turn_context_item = turn_context.to_turn_context_item(); + session + .replace_history(current_history.clone(), Some(turn_context_item.clone())) + .await; + + let result = downgrade_known_inline_compaction_error( + &session, + &turn_context, + PendingServerSideCompaction { + threshold: 123, + trigger: AutoCompactTrigger::AutoPreTurn, + }, + Some(&PreTurnInlineCompactionState { + history_before_turn, + reference_context_before_turn: None, + replay_items, + turn_context_item: turn_context_item.clone(), + }), + &CodexErr::InvalidRequest("compact_threshold is unsupported".to_string()), + ) + .await; + + assert!(result.is_err()); + assert_eq!( + session.clone_history().await.raw_items(), + current_history.as_slice() + ); + assert_eq!( + serde_json::to_value(session.reference_context_item().await) + .expect("serialize restored reference context"), + serde_json::to_value(Some(turn_context_item)) + .expect("serialize expected reference context") + ); +} + fn make_mcp_tool( server_name: &str, tool_name: &str,