[core] Preserve inline compaction turn state [ci changed_files]

Preserve current-turn history when inline compaction downgrades fail and replace prior same-turn compaction checkpoints instead of stacking them.

Tests:
- cargo test -p codex-core codex::tests::build_server_side_compaction_replacement_history_keeps_current_turn_inputs -- --exact
- cargo test -p codex-core codex::tests::build_server_side_compaction_replacement_history_replaces_prior_same_turn_summary -- --exact
- cargo test -p codex-core codex::tests::downgrade_known_inline_compaction_error_restores_current_turn_when_fallback_fails -- --exact

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Cooper Gamble
2026-03-08 20:37:22 +00:00
parent e54a4ae5e2
commit 2bcf6ebaa3
2 changed files with 166 additions and 5 deletions

View File

@@ -6020,11 +6020,24 @@ fn build_server_side_compaction_replacement_history(
history_before_turn: &[ResponseItem],
current_history: &[ResponseItem],
) -> Vec<ResponseItem> {
let current_turn_items =
if let Some(current_turn_items) = current_history.strip_prefix(history_before_turn) {
current_turn_items
} else if matches!(
current_history.first(),
Some(ResponseItem::Compaction { .. })
) {
let first_non_compaction = current_history
.iter()
.position(|item| !matches!(item, ResponseItem::Compaction { .. }))
.unwrap_or(current_history.len());
&current_history[first_non_compaction..]
} else {
current_history
};
let mut replacement_history = vec![compaction_item];
replacement_history.extend(
current_history
.strip_prefix(history_before_turn)
.unwrap_or(current_history)
current_turn_items
.iter()
.filter(|item| !matches!(item, ResponseItem::GhostSnapshot { .. }))
.cloned(),
@@ -6192,6 +6205,8 @@ async fn downgrade_known_inline_compaction_error(
// the pre-turn baseline was captured so `/undo` still works after
// we downgrade to the legacy compaction path.
let current_history = sess.clone_history().await;
let current_history_items = current_history.raw_items().to_vec();
let current_reference_context_item = sess.reference_context_item().await;
let mut restored_history = preturn_state.history_before_turn.clone();
restored_history.extend(collect_new_ghost_snapshots_since(
&preturn_state.history_before_turn,
@@ -6202,13 +6217,27 @@ async fn downgrade_known_inline_compaction_error(
preturn_state.reference_context_before_turn.clone(),
)
.await;
run_auto_compact(
if let Err(err) = run_auto_compact(
sess,
turn_context,
InitialContextInjection::DoNotInject,
AutoCompactTrigger::AutoPreTurn,
)
.await?;
.await
{
let latest_history = sess.clone_history().await;
let mut restored_current_history = current_history_items;
restored_current_history.extend(collect_new_ghost_snapshots_since(
&restored_current_history,
latest_history.raw_items(),
));
// If the legacy fallback also fails, restore the live turn
// state instead of silently dropping the already-recorded turn.
sess.replace_history(restored_current_history, current_reference_context_item)
.await;
sess.recompute_token_usage(turn_context).await;
return Err(err);
}
if !preturn_state.replay_items.is_empty() {
sess.record_into_history(&preturn_state.replay_items, turn_context)
.await;

View File

@@ -86,6 +86,10 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Once;
use std::time::Duration as StdDuration;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
#[path = "codex_tests_guardian.rs"]
mod guardian_tests;
@@ -317,6 +321,134 @@ fn build_server_side_compaction_replacement_history_keeps_current_turn_inputs()
);
}
#[test]
fn build_server_side_compaction_replacement_history_replaces_prior_same_turn_summary() {
let prior_snapshot = ghost_snapshot("ghost-before");
let same_turn_snapshot = ghost_snapshot("ghost-during");
let history_before_turn = vec![user_message("earlier"), prior_snapshot.clone()];
let current_turn_user = user_message("current turn");
let current_turn_tool_output = ResponseItem::FunctionCallOutput {
call_id: "call-1".to_string(),
output: FunctionCallOutputPayload::from_text("tool result".to_string()),
};
let prior_compaction = ResponseItem::Compaction {
encrypted_content: "INLINE_SUMMARY_1".to_string(),
};
let new_compaction = ResponseItem::Compaction {
encrypted_content: "INLINE_SUMMARY_2".to_string(),
};
let current_history = vec![
prior_compaction,
current_turn_user.clone(),
current_turn_tool_output.clone(),
prior_snapshot.clone(),
same_turn_snapshot.clone(),
];
let replacement_history = build_server_side_compaction_replacement_history(
new_compaction.clone(),
&history_before_turn,
&current_history,
);
assert_eq!(
replacement_history,
vec![
new_compaction,
current_turn_user,
current_turn_tool_output,
prior_snapshot,
same_turn_snapshot,
]
);
}
#[tokio::test]
async fn downgrade_known_inline_compaction_error_restores_current_turn_when_fallback_fails() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.respond_with(ResponseTemplate::new(500).set_body_string("compact unavailable"))
.mount(&server)
.await;
let (mut session, mut turn_context) = make_session_and_context().await;
let mut provider = crate::model_provider_info::ModelProviderInfo::create_openai_provider();
provider.base_url = Some(format!("{}/v1", server.uri()));
turn_context.provider = provider.clone();
session.services.model_client = ModelClient::new(
Some(Arc::clone(&session.services.auth_manager)),
session.conversation_id.clone(),
provider,
turn_context.session_source.clone(),
turn_context.config.model_verbosity,
ws_version_from_features(turn_context.config.as_ref()),
turn_context
.config
.features
.enabled(Feature::EnableRequestCompression),
turn_context
.config
.features
.enabled(Feature::RuntimeMetrics),
Session::build_model_client_beta_features_header(turn_context.config.as_ref()),
);
let session = Arc::new(session);
let turn_context = Arc::new(turn_context);
let history_before_turn = vec![user_message("earlier")];
let context_update = ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "context update".to_string(),
}],
end_turn: None,
phase: None,
};
let current_turn_user = user_message("current turn");
let same_turn_snapshot = ghost_snapshot("ghost-during");
let replay_items = vec![context_update.clone(), current_turn_user.clone()];
let current_history = vec![
history_before_turn[0].clone(),
context_update,
current_turn_user,
same_turn_snapshot,
];
let turn_context_item = turn_context.to_turn_context_item();
session
.replace_history(current_history.clone(), Some(turn_context_item.clone()))
.await;
let result = downgrade_known_inline_compaction_error(
&session,
&turn_context,
PendingServerSideCompaction {
threshold: 123,
trigger: AutoCompactTrigger::AutoPreTurn,
},
Some(&PreTurnInlineCompactionState {
history_before_turn,
reference_context_before_turn: None,
replay_items,
turn_context_item: turn_context_item.clone(),
}),
&CodexErr::InvalidRequest("compact_threshold is unsupported".to_string()),
)
.await;
assert!(result.is_err());
assert_eq!(
session.clone_history().await.raw_items(),
current_history.as_slice()
);
assert_eq!(
serde_json::to_value(session.reference_context_item().await)
.expect("serialize restored reference context"),
serde_json::to_value(Some(turn_context_item))
.expect("serialize expected reference context")
);
}
fn make_mcp_tool(
server_name: &str,
tool_name: &str,