From 00b11701611ea0ea572987ff4c1533f004b0e3bb Mon Sep 17 00:00:00 2001 From: Friel Date: Mon, 4 May 2026 09:38:30 +0000 Subject: [PATCH] Fix fork truncation across rollout references --- .../core/src/thread_rollout_truncation.rs | 12 ++-- .../src/thread_rollout_truncation_tests.rs | 55 +++++++++++++++++++ codex-rs/core/tests/suite/compact.rs | 15 +++-- 3 files changed, 72 insertions(+), 10 deletions(-) diff --git a/codex-rs/core/src/thread_rollout_truncation.rs b/codex-rs/core/src/thread_rollout_truncation.rs index 65b7d7fa9b..dfa56f6b24 100644 --- a/codex-rs/core/src/thread_rollout_truncation.rs +++ b/codex-rs/core/src/thread_rollout_truncation.rs @@ -188,18 +188,18 @@ async fn materialize_rollout_items_for_replay_at_depth( }; match RolloutRecorder::load_rollout_items(&resolved_path).await { Ok((parent_items, _, _)) => { - let parent_prefix = truncate_rollout_before_nth_user_message_from_start( - &parent_items, - reference.nth_user_message, - ); let parent_materialized = Box::pin(materialize_rollout_items_for_replay_at_depth( codex_home, - &parent_prefix, + &parent_items, depth + 1, )) .await; - materialized.extend(parent_materialized); + let parent_prefix = truncate_rollout_before_nth_user_message_from_start( + &parent_materialized, + reference.nth_user_message, + ); + materialized.extend(parent_prefix); } Err(err) => { warn!( diff --git a/codex-rs/core/src/thread_rollout_truncation_tests.rs b/codex-rs/core/src/thread_rollout_truncation_tests.rs index a2414242ba..92f4b3db72 100644 --- a/codex-rs/core/src/thread_rollout_truncation_tests.rs +++ b/codex-rs/core/src/thread_rollout_truncation_tests.rs @@ -245,6 +245,61 @@ async fn materializes_fork_reference_by_segment_id_after_source_rollover() { assert!(text.contains("child request")); } +#[tokio::test] +async fn materializes_fork_reference_before_truncating_rollout_references() { + let temp = tempfile::tempdir().expect("tempdir"); + let thread_id = ThreadId::new(); + let old_segment_id = SegmentId::new(); + let current_segment_id = SegmentId::new(); + + let old_path = temp.path().join("old.jsonl"); + let current_path = temp.path().join("current.jsonl"); + + write_rollout( + &old_path, + &[ + session_meta_item(thread_id, old_segment_id), + RolloutItem::ResponseItem(user_msg("u1")), + RolloutItem::ResponseItem(assistant_msg("a1")), + ], + ) + .await; + write_rollout( + ¤t_path, + &[ + session_meta_item(thread_id, current_segment_id), + RolloutItem::RolloutReference(RolloutReferenceItem { + rollout_path: old_path, + thread_id: None, + segment_id: None, + max_depth: 2, + }), + RolloutItem::ResponseItem(user_msg("u2")), + RolloutItem::ResponseItem(assistant_msg("a2")), + RolloutItem::ResponseItem(user_msg("u3")), + ], + ) + .await; + + let fork_items = vec![ + RolloutItem::ForkReference(ForkReferenceItem { + rollout_path: current_path, + thread_id: Some(thread_id), + segment_id: Some(current_segment_id), + nth_user_message: 2, + }), + RolloutItem::ResponseItem(user_msg("child request")), + ]; + + let materialized = materialize_rollout_items_for_replay(temp.path(), &fork_items).await; + let text = serde_json::to_string(&materialized).expect("serialize materialized rollout"); + + assert!(text.contains("u1")); + assert!(text.contains("u2")); + assert!(!text.contains("u3")); + assert!(text.contains("child request")); +} + #[tokio::test] async fn materializes_rollout_reference_with_bounded_depth() { let temp = tempfile::tempdir().expect("tempdir"); diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index 55a379e8ac..f1d32281d0 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -2024,7 +2024,6 @@ async fn auto_compact_persists_rollout_entries() { }); let test = builder.build(&server).await.unwrap(); let codex = test.codex.clone(); - let session_configured = test.session_configured; codex .submit(Op::UserInput { @@ -2068,10 +2067,10 @@ async fn auto_compact_persists_rollout_entries() { .unwrap(); wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + let rollout_path = codex.current_rollout_path().await.expect("rollout path"); codex.submit(Op::Shutdown).await.unwrap(); wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await; - let rollout_path = session_configured.rollout_path.expect("rollout path"); let text = std::fs::read_to_string(&rollout_path).unwrap_or_else(|e| { panic!( "failed to read rollout file {}: {e}", @@ -2079,7 +2078,7 @@ async fn auto_compact_persists_rollout_entries() { ) }); - let mut turn_context_count = 0usize; + let mut rollout_items = Vec::new(); for line in text.lines() { let trimmed = line.trim(); if trimmed.is_empty() { @@ -2088,7 +2087,15 @@ async fn auto_compact_persists_rollout_entries() { let Ok(entry): Result = serde_json::from_str(trimmed) else { continue; }; - match entry.item { + rollout_items.push(entry.item); + } + let rollout_items = + materialize_rollout_items_for_replay(test.config.codex_home.as_path(), &rollout_items) + .await; + + let mut turn_context_count = 0usize; + for item in rollout_items { + match item { RolloutItem::TurnContext(_) => { turn_context_count += 1; }