From d1dbd312849c17613c0186826144af2389a4c693 Mon Sep 17 00:00:00 2001 From: Charles Cunningham Date: Sat, 31 Jan 2026 23:46:30 -0800 Subject: [PATCH] Handle compaction in turn context rebuild --- codex-rs/core/src/codex.rs | 158 ++++++++++++++++++++++++++++++++++--- 1 file changed, 149 insertions(+), 9 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index a1eb0b02e6..3fbda3bd4d 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1809,34 +1809,113 @@ impl Session { fn reconstruct_turn_context_history_from_rollout( rollout_items: &[RolloutItem], ) -> Vec> { - let mut history = Vec::new(); + let mut history_items = Vec::new(); let mut awaiting_turn_context = false; + let mut turn_context_history = Vec::new(); for item in rollout_items { match item { - RolloutItem::ResponseItem(ResponseItem::Message { role, .. }) if role == "user" => { - history.push(None); - awaiting_turn_context = true; + RolloutItem::ResponseItem(response_item) => { + history_items.push(response_item.clone()); + if matches!(response_item, ResponseItem::Message { role, .. } if role == "user") + { + turn_context_history.push(None); + awaiting_turn_context = true; + } } RolloutItem::TurnContext(ctx) => { if awaiting_turn_context { - if let Some(last) = history.last_mut() { + if let Some(last) = turn_context_history.last_mut() { *last = Some(ctx.clone()); } else { - history.push(Some(ctx.clone())); + turn_context_history.push(Some(ctx.clone())); } awaiting_turn_context = false; } } + RolloutItem::Compacted(compacted) => { + if let Some(replacement) = &compacted.replacement_history { + history_items = replacement.clone(); + // Remote compaction can reorder messages, so drop contexts to avoid + // misalignment. + turn_context_history = vec![None; Session::user_turn_count(&history_items)]; + } else { + let user_messages = collect_user_messages(&history_items); + history_items = compact::build_compacted_history( + Vec::new(), + &user_messages, + &compacted.message, + ); + turn_context_history = Self::rebuild_turn_context_history_after_compaction( + turn_context_history, + &history_items, + ); + } + awaiting_turn_context = false; + } RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => { + Self::drop_last_n_user_turns_from_items(&mut history_items, rollback.num_turns); let drop = usize::try_from(rollback.num_turns).unwrap_or(usize::MAX); - let new_len = history.len().saturating_sub(drop); - history.truncate(new_len); + let new_len = turn_context_history.len().saturating_sub(drop); + turn_context_history.truncate(new_len); awaiting_turn_context = false; } _ => {} } } - history + turn_context_history + } + + fn rebuild_turn_context_history_after_compaction( + previous: Vec>, + compacted_history: &[ResponseItem], + ) -> Vec> { + let user_messages = collect_user_messages(compacted_history); + let retained = user_messages.len(); + let mut updated = Self::take_last_with_padding(previous, retained); + updated.push(None); + updated + } + + fn take_last_with_padding( + mut history: Vec>, + retained: usize, + ) -> Vec> { + if retained == 0 { + return Vec::new(); + } + if history.len() >= retained { + history.split_off(history.len() - retained) + } else { + let mut padded = Vec::with_capacity(retained); + padded.resize_with(retained - history.len(), || None); + padded.append(&mut history); + padded + } + } + + fn drop_last_n_user_turns_from_items(items: &mut Vec, num_turns: u32) { + if num_turns == 0 { + return; + } + + let user_positions: Vec = items + .iter() + .enumerate() + .filter_map(|(idx, item)| { + matches!(item, ResponseItem::Message { role, .. } if role == "user").then_some(idx) + }) + .collect(); + let Some(&first_user_idx) = user_positions.first() else { + return; + }; + + let n_from_end = usize::try_from(num_turns).unwrap_or(usize::MAX); + let cut_idx = if n_from_end >= user_positions.len() { + first_user_idx + } else { + user_positions[user_positions.len() - n_from_end] + }; + items.truncate(cut_idx); } fn user_turn_count(items: &[ResponseItem]) -> usize { @@ -5131,6 +5210,67 @@ mod tests { assert!(update_items.contains(&expected_item)); } + #[test] + fn reconstruct_turn_context_history_handles_compaction() { + let collaboration_mode = CollaborationMode { + mode: ModeKind::Plan, + settings: Settings { + model: "gpt-test".to_string(), + reasoning_effort: None, + developer_instructions: None, + }, + }; + let turn_context = TurnContextItem { + cwd: PathBuf::from("/tmp"), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::ReadOnly, + model: collaboration_mode.settings.model.clone(), + personality: None, + collaboration_mode: Some(collaboration_mode.clone()), + effort: None, + summary: ReasoningSummaryConfig::Auto, + user_instructions: None, + developer_instructions: None, + final_output_json_schema: None, + truncation_policy: None, + }; + let user = ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "first user".to_string(), + }], + end_turn: None, + }; + let assistant = ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: "assistant reply".to_string(), + }], + end_turn: None, + }; + let compacted = RolloutItem::Compacted(CompactedItem { + message: format!("{}\nsummary", compact::SUMMARY_PREFIX), + replacement_history: None, + }); + + let rollout_items = vec![ + RolloutItem::ResponseItem(user), + RolloutItem::TurnContext(turn_context), + RolloutItem::ResponseItem(assistant), + compacted, + ]; + let history = Session::reconstruct_turn_context_history_from_rollout(&rollout_items); + let modes: Vec> = history + .iter() + .map(|item| item.as_ref().and_then(|ctx| ctx.collaboration_mode.clone())) + .collect(); + + assert_eq!(history.len(), 2); + assert_eq!(modes, vec![Some(collaboration_mode), None]); + } + #[tokio::test] async fn thread_rollback_ignores_stale_turn_context() { let (sess, tc, rx) = make_session_and_context_with_rx().await;