mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
Fix fork reference truncation test semantics
This commit is contained in:
@@ -218,6 +218,7 @@ use codex_core::find_archived_thread_path_by_id_str;
|
||||
use codex_core::find_thread_name_by_id;
|
||||
use codex_core::find_thread_names_by_ids;
|
||||
use codex_core::find_thread_path_by_id_str;
|
||||
use codex_core::materialize_rollout_items_for_replay;
|
||||
use codex_core::mcp::auth::discover_supported_scopes;
|
||||
use codex_core::mcp::auth::resolve_oauth_scopes;
|
||||
use codex_core::mcp::collect_mcp_snapshot;
|
||||
@@ -235,7 +236,6 @@ use codex_core::plugins::load_plugin_apps;
|
||||
use codex_core::plugins::load_plugin_mcp_servers;
|
||||
use codex_core::read_head_for_summary;
|
||||
use codex_core::read_session_meta_line;
|
||||
use codex_core::resolve_fork_reference_rollout_path;
|
||||
use codex_core::rollout_date_parts;
|
||||
use codex_core::sandboxing::SandboxPermissions;
|
||||
use codex_core::state_db::StateDbHandle;
|
||||
@@ -8486,7 +8486,11 @@ pub(crate) async fn read_rollout_items_from_rollout(
|
||||
InitialHistory::Resumed(resumed) => resumed.history,
|
||||
};
|
||||
|
||||
Ok(materialize_rollout_items_for_replay(codex_home_from_rollout_path(path), &items).await)
|
||||
if let Some(codex_home) = codex_home_from_rollout_path(path) {
|
||||
return Ok(materialize_rollout_items_for_replay(codex_home, &items).await);
|
||||
}
|
||||
|
||||
Ok(items)
|
||||
}
|
||||
|
||||
fn extract_conversation_summary(
|
||||
@@ -8593,51 +8597,6 @@ fn preview_from_rollout_items(items: &[RolloutItem]) -> String {
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
fn user_message_positions_in_rollout(items: &[RolloutItem]) -> Vec<usize> {
|
||||
let mut user_positions = Vec::new();
|
||||
for (idx, item) in items.iter().enumerate() {
|
||||
match item {
|
||||
RolloutItem::ResponseItem(item)
|
||||
if matches!(
|
||||
codex_core::parse_turn_item(item),
|
||||
Some(TurnItem::UserMessage(_))
|
||||
) =>
|
||||
{
|
||||
user_positions.push(idx);
|
||||
}
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
|
||||
let num_turns = usize::try_from(rollback.num_turns).unwrap_or(usize::MAX);
|
||||
let new_len = user_positions.len().saturating_sub(num_turns);
|
||||
user_positions.truncate(new_len);
|
||||
}
|
||||
RolloutItem::ResponseItem(_) => {}
|
||||
RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => {}
|
||||
}
|
||||
}
|
||||
user_positions
|
||||
}
|
||||
|
||||
fn truncate_rollout_before_nth_user_message_from_start(
|
||||
items: &[RolloutItem],
|
||||
n_from_start: usize,
|
||||
) -> Vec<RolloutItem> {
|
||||
if n_from_start == usize::MAX {
|
||||
return items.to_vec();
|
||||
}
|
||||
|
||||
let user_positions = user_message_positions_in_rollout(items);
|
||||
if user_positions.len() <= n_from_start {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let cut_idx = user_positions[n_from_start];
|
||||
items[..cut_idx].to_vec()
|
||||
}
|
||||
|
||||
fn codex_home_from_rollout_path(path: &Path) -> Option<&Path> {
|
||||
path.ancestors().find_map(|ancestor| {
|
||||
let name = ancestor.file_name().and_then(OsStr::to_str)?;
|
||||
@@ -8649,81 +8608,6 @@ fn codex_home_from_rollout_path(path: &Path) -> Option<&Path> {
|
||||
})
|
||||
}
|
||||
|
||||
async fn materialize_rollout_items_for_replay(
|
||||
codex_home: Option<&Path>,
|
||||
rollout_items: &[RolloutItem],
|
||||
) -> Vec<RolloutItem> {
|
||||
const MAX_FORK_REFERENCE_DEPTH: usize = 8;
|
||||
|
||||
let mut materialized = Vec::new();
|
||||
let mut stack: Vec<(Vec<RolloutItem>, usize, usize)> = vec![(rollout_items.to_vec(), 0, 0)];
|
||||
|
||||
while let Some((items, mut idx, depth)) = stack.pop() {
|
||||
while idx < items.len() {
|
||||
match &items[idx] {
|
||||
RolloutItem::ForkReference(reference) => {
|
||||
if depth >= MAX_FORK_REFERENCE_DEPTH {
|
||||
warn!(
|
||||
"skipping fork reference recursion at depth {} for {:?}",
|
||||
depth, reference.rollout_path
|
||||
);
|
||||
idx += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
let resolved_rollout_path = if let Some(codex_home) = codex_home {
|
||||
match resolve_fork_reference_rollout_path(
|
||||
codex_home,
|
||||
&reference.rollout_path,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(path) => path,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to resolve fork reference rollout {:?}: {err}",
|
||||
reference.rollout_path
|
||||
);
|
||||
idx += 1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
reference.rollout_path.clone()
|
||||
};
|
||||
let parent_history = match RolloutRecorder::get_rollout_history(
|
||||
&resolved_rollout_path,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(history) => history,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to load fork reference rollout {:?} (resolved from {:?}): {err}",
|
||||
resolved_rollout_path, reference.rollout_path
|
||||
);
|
||||
idx += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let parent_items = truncate_rollout_before_nth_user_message_from_start(
|
||||
&parent_history.get_rollout_items(),
|
||||
reference.nth_user_message,
|
||||
);
|
||||
|
||||
stack.push((items, idx + 1, depth));
|
||||
stack.push((parent_items, 0, depth + 1));
|
||||
break;
|
||||
}
|
||||
item => materialized.push(item.clone()),
|
||||
}
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
|
||||
materialized
|
||||
}
|
||||
|
||||
fn with_thread_spawn_agent_metadata(
|
||||
source: codex_protocol::protocol::SessionSource,
|
||||
agent_nickname: Option<String>,
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -17,6 +17,7 @@ use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::ErrorEvent;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ForkReferenceItem;
|
||||
use codex_protocol::protocol::InterAgentCommunication;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
@@ -906,14 +907,8 @@ async fn spawn_agent_fork_last_n_turns_keeps_only_recent_turns() {
|
||||
history.raw_items(),
|
||||
"old parent context"
|
||||
));
|
||||
assert!(!history_contains_text(
|
||||
history.raw_items(),
|
||||
"queued message"
|
||||
));
|
||||
assert!(history_contains_text(
|
||||
history.raw_items(),
|
||||
"triggered context"
|
||||
));
|
||||
assert!(!history_contains_text(history.raw_items(), "queued message"));
|
||||
assert!(history_contains_text(history.raw_items(), "triggered context"));
|
||||
assert!(history_contains_text(
|
||||
history.raw_items(),
|
||||
"current parent task"
|
||||
@@ -930,6 +925,125 @@ async fn spawn_agent_fork_last_n_turns_keeps_only_recent_turns() {
|
||||
.expect("parent shutdown should submit");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_agent_fork_snapshots_parent_boundary_for_persisted_fork_reference() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let (parent_thread_id, parent_thread) = harness.start_thread().await;
|
||||
parent_thread
|
||||
.inject_user_message_without_turn("parent seed context".to_string())
|
||||
.await;
|
||||
let turn_context = parent_thread.codex.session.new_default_turn().await;
|
||||
let parent_spawn_call_id = "spawn-call-dedup".to_string();
|
||||
let parent_spawn_call = ResponseItem::FunctionCall {
|
||||
id: None,
|
||||
name: "spawn_agent".to_string(),
|
||||
namespace: None,
|
||||
arguments: "{}".to_string(),
|
||||
call_id: parent_spawn_call_id.clone(),
|
||||
};
|
||||
parent_thread
|
||||
.codex
|
||||
.session
|
||||
.record_conversation_items(turn_context.as_ref(), &[parent_spawn_call])
|
||||
.await;
|
||||
parent_thread
|
||||
.codex
|
||||
.session
|
||||
.ensure_rollout_materialized()
|
||||
.await;
|
||||
parent_thread.codex.session.flush_rollout().await;
|
||||
let parent_rollout_path = parent_thread
|
||||
.rollout_path()
|
||||
.expect("parent rollout path should be available");
|
||||
|
||||
let child_thread_id = harness
|
||||
.control
|
||||
.spawn_agent_with_metadata(
|
||||
harness.config.clone(),
|
||||
text_input("child task"),
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 1,
|
||||
agent_path: None,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
})),
|
||||
SpawnAgentOptions {
|
||||
fork_parent_spawn_call_id: Some(parent_spawn_call_id),
|
||||
fork_mode: Some(SpawnAgentForkMode::FullHistory),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("forked spawn should succeed")
|
||||
.thread_id;
|
||||
|
||||
parent_thread
|
||||
.inject_user_message_without_turn("parent late turn".to_string())
|
||||
.await;
|
||||
parent_thread
|
||||
.codex
|
||||
.session
|
||||
.ensure_rollout_materialized()
|
||||
.await;
|
||||
parent_thread.codex.session.flush_rollout().await;
|
||||
|
||||
let child_thread = harness
|
||||
.manager
|
||||
.get_thread(child_thread_id)
|
||||
.await
|
||||
.expect("child thread should be registered");
|
||||
let child_rollout_path = child_thread
|
||||
.rollout_path()
|
||||
.expect("child rollout path should be available");
|
||||
let InitialHistory::Resumed(resumed) =
|
||||
RolloutRecorder::get_rollout_history(child_rollout_path.as_path())
|
||||
.await
|
||||
.expect("child rollout should load")
|
||||
else {
|
||||
panic!("child rollout should include session metadata");
|
||||
};
|
||||
|
||||
assert!(resumed.history.iter().any(|item| {
|
||||
matches!(
|
||||
item,
|
||||
RolloutItem::ForkReference(ForkReferenceItem {
|
||||
rollout_path,
|
||||
nth_user_message: 1,
|
||||
}) if rollout_path == &parent_rollout_path
|
||||
)
|
||||
}));
|
||||
let materialized_child_rollout = crate::rollout::truncation::materialize_rollout_items_for_replay(
|
||||
harness.config.codex_home.as_path(),
|
||||
&resumed.history,
|
||||
)
|
||||
.await;
|
||||
let materialized_child_response_items: Vec<ResponseItem> = materialized_child_rollout
|
||||
.iter()
|
||||
.filter_map(|item| match item {
|
||||
RolloutItem::ResponseItem(response_item) => Some(response_item.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.collect();
|
||||
assert!(history_contains_text(
|
||||
&materialized_child_response_items,
|
||||
"parent seed context",
|
||||
));
|
||||
assert!(!history_contains_text(
|
||||
&materialized_child_response_items,
|
||||
"parent late turn",
|
||||
));
|
||||
|
||||
let _ = harness
|
||||
.control
|
||||
.shutdown_live_agent(child_thread_id)
|
||||
.await
|
||||
.expect("child shutdown should submit");
|
||||
let _ = parent_thread
|
||||
.submit(Op::Shutdown {})
|
||||
.await
|
||||
.expect("parent shutdown should submit");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_agent_respects_max_threads_limit() {
|
||||
let max_threads = 1usize;
|
||||
|
||||
@@ -184,6 +184,7 @@ pub use rollout::policy::EventPersistenceMode;
|
||||
pub use rollout::resolve_fork_reference_rollout_path;
|
||||
pub use rollout::rollout_date_parts;
|
||||
pub use rollout::session_index::find_thread_names_by_ids;
|
||||
pub use thread_rollout_truncation::materialize_rollout_items_for_replay;
|
||||
mod function_tool;
|
||||
mod state;
|
||||
mod tasks;
|
||||
|
||||
@@ -619,7 +619,7 @@ impl ThreadManager {
|
||||
truncate_before_nth_user_message(
|
||||
config.codex_home.as_path(),
|
||||
history,
|
||||
nth_user_message,
|
||||
i64::try_from(nth_user_message).unwrap_or(i64::MAX),
|
||||
&snapshot_state,
|
||||
)
|
||||
.await
|
||||
@@ -654,7 +654,7 @@ impl ThreadManager {
|
||||
.chain(std::iter::once(RolloutItem::ForkReference(
|
||||
ForkReferenceItem {
|
||||
rollout_path: path.clone(),
|
||||
nth_user_message,
|
||||
nth_user_message: i64::try_from(nth_user_message).unwrap_or(i64::MAX),
|
||||
},
|
||||
)))
|
||||
.collect();
|
||||
@@ -954,7 +954,7 @@ impl ThreadManagerState {
|
||||
async fn truncate_before_nth_user_message(
|
||||
codex_home: &Path,
|
||||
history: InitialHistory,
|
||||
n: usize,
|
||||
n: i64,
|
||||
snapshot_state: &SnapshotTurnState,
|
||||
) -> InitialHistory {
|
||||
let mut items: Vec<RolloutItem> = history.get_rollout_items();
|
||||
@@ -965,7 +965,9 @@ async fn truncate_before_nth_user_message(
|
||||
items = truncation::materialize_rollout_items_for_replay(codex_home, &items).await;
|
||||
}
|
||||
let user_positions = truncation::user_message_positions_in_rollout(&items);
|
||||
let rolled = if snapshot_state.ends_mid_turn && n >= user_positions.len() {
|
||||
let rolled = if snapshot_state.ends_mid_turn
|
||||
&& usize::try_from(n).map_or(true, |n| n >= user_positions.len())
|
||||
{
|
||||
if let Some(cut_idx) = snapshot_state
|
||||
.active_turn_start_index
|
||||
.or_else(|| user_positions.last().copied())
|
||||
@@ -1077,150 +1079,3 @@ fn append_interrupted_boundary(history: InitialHistory, turn_id: Option<String>)
|
||||
#[cfg(test)]
|
||||
#[path = "thread_manager_tests.rs"]
|
||||
mod tests;
|
||||
// Keep this inline fork-reference test module disabled on the refreshed main API;
|
||||
// branch coverage now comes from the package/integration tests that match current types.
|
||||
#[cfg(any())]
|
||||
mod fork_reference_tests {
|
||||
use super::*;
|
||||
use crate::codex::make_session_and_context;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ReasoningItemReasoningSummary;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
fn user_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
fn assistant_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn drops_from_last_user_only() {
|
||||
let items = [
|
||||
user_msg("u1"),
|
||||
assistant_msg("a1"),
|
||||
assistant_msg("a2"),
|
||||
user_msg("u2"),
|
||||
assistant_msg("a3"),
|
||||
ResponseItem::Reasoning {
|
||||
id: "r1".to_string(),
|
||||
summary: vec![ReasoningItemReasoningSummary::SummaryText {
|
||||
text: "s".to_string(),
|
||||
}],
|
||||
content: None,
|
||||
encrypted_content: None,
|
||||
},
|
||||
ResponseItem::FunctionCall {
|
||||
id: None,
|
||||
call_id: "c1".to_string(),
|
||||
name: "tool".to_string(),
|
||||
namespace: None,
|
||||
arguments: "{}".to_string(),
|
||||
},
|
||||
assistant_msg("a4"),
|
||||
];
|
||||
|
||||
let initial: Vec<RolloutItem> = items
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(RolloutItem::ResponseItem)
|
||||
.collect();
|
||||
let truncated = truncate_before_nth_user_message(
|
||||
Path::new("/tmp"),
|
||||
InitialHistory::Forked(initial),
|
||||
1,
|
||||
&SnapshotTurnState {
|
||||
ends_mid_turn: false,
|
||||
active_turn_id: None,
|
||||
active_turn_start_index: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
let got_items = truncated.get_rollout_items();
|
||||
let expected_items = vec![
|
||||
RolloutItem::ResponseItem(items[0].clone()),
|
||||
RolloutItem::ResponseItem(items[1].clone()),
|
||||
RolloutItem::ResponseItem(items[2].clone()),
|
||||
];
|
||||
assert_eq!(
|
||||
serde_json::to_value(&got_items).unwrap(),
|
||||
serde_json::to_value(&expected_items).unwrap()
|
||||
);
|
||||
|
||||
let initial2: Vec<RolloutItem> = items
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(RolloutItem::ResponseItem)
|
||||
.collect();
|
||||
let truncated2 = truncate_before_nth_user_message(
|
||||
Path::new("/tmp"),
|
||||
InitialHistory::Forked(initial2),
|
||||
2,
|
||||
&SnapshotTurnState {
|
||||
ends_mid_turn: false,
|
||||
active_turn_id: None,
|
||||
active_turn_start_index: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
assert!(matches!(truncated2, InitialHistory::New));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ignores_session_prefix_messages_when_truncating() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
let mut items = session.build_initial_context(&turn_context).await;
|
||||
items.push(user_msg("feature request"));
|
||||
items.push(assistant_msg("ack"));
|
||||
items.push(user_msg("second question"));
|
||||
items.push(assistant_msg("answer"));
|
||||
|
||||
let rollout_items: Vec<RolloutItem> = items
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(RolloutItem::ResponseItem)
|
||||
.collect();
|
||||
|
||||
let truncated = truncate_before_nth_user_message(
|
||||
Path::new("/tmp"),
|
||||
InitialHistory::Forked(rollout_items),
|
||||
1,
|
||||
&SnapshotTurnState {
|
||||
ends_mid_turn: false,
|
||||
active_turn_id: None,
|
||||
active_turn_start_index: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
let got_items = truncated.get_rollout_items();
|
||||
|
||||
let expected: Vec<RolloutItem> = vec![
|
||||
RolloutItem::ResponseItem(items[0].clone()),
|
||||
RolloutItem::ResponseItem(items[1].clone()),
|
||||
RolloutItem::ResponseItem(items[2].clone()),
|
||||
RolloutItem::ResponseItem(items[3].clone()),
|
||||
];
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_value(&got_items).unwrap(),
|
||||
serde_json::to_value(&expected).unwrap()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -129,7 +129,7 @@ async fn out_of_range_truncation_drops_only_unfinished_suffix_mid_turn() {
|
||||
let truncated = truncate_before_nth_user_message(
|
||||
Path::new("/tmp"),
|
||||
InitialHistory::Forked(items.clone()),
|
||||
usize::MAX,
|
||||
/*n*/ -1,
|
||||
&SnapshotTurnState {
|
||||
ends_mid_turn: true,
|
||||
active_turn_id: None,
|
||||
@@ -190,7 +190,7 @@ async fn out_of_range_truncation_drops_pre_user_active_turn_prefix() {
|
||||
let truncated = truncate_before_nth_user_message(
|
||||
Path::new("/tmp"),
|
||||
InitialHistory::Forked(items.clone()),
|
||||
usize::MAX,
|
||||
/*n*/ -1,
|
||||
&snapshot_state,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -92,21 +92,30 @@ pub(crate) fn fork_turn_positions_in_rollout(items: &[RolloutItem]) -> Vec<usize
|
||||
fork_turn_positions
|
||||
}
|
||||
|
||||
/// Return the current fork boundary to persist in a `ForkReferenceItem`.
|
||||
///
|
||||
/// The boundary is the next user-message index in the current effective rollout. Persisting that
|
||||
/// snapshot, instead of a "live tail" sentinel, prevents later parent turns from being pulled
|
||||
/// into the child when the child rollout is replayed.
|
||||
pub(crate) fn fork_reference_user_message_boundary(items: &[RolloutItem]) -> i64 {
|
||||
i64::try_from(user_message_positions_in_rollout(items).len()).unwrap_or(i64::MAX)
|
||||
}
|
||||
|
||||
/// Return a prefix of `items` obtained by cutting strictly before the nth user message.
|
||||
///
|
||||
/// The boundary index is 0-based from the start of `items` (so `n_from_start = 0` returns
|
||||
/// a prefix that excludes the first user message and everything after it).
|
||||
///
|
||||
/// If `n_from_start` is `usize::MAX`, this returns the full rollout (no truncation).
|
||||
/// If `n_from_start` is negative, this returns the full rollout (no truncation).
|
||||
/// If fewer than or equal to `n_from_start` user messages exist, this returns the full
|
||||
/// rollout unchanged.
|
||||
pub(crate) fn truncate_rollout_before_nth_user_message_from_start(
|
||||
items: &[RolloutItem],
|
||||
n_from_start: usize,
|
||||
n_from_start: i64,
|
||||
) -> Vec<RolloutItem> {
|
||||
if n_from_start == usize::MAX {
|
||||
let Ok(n_from_start) = usize::try_from(n_from_start) else {
|
||||
return items.to_vec();
|
||||
}
|
||||
};
|
||||
|
||||
let user_positions = user_message_positions_in_rollout(items);
|
||||
|
||||
@@ -157,7 +166,12 @@ fn is_trigger_turn_boundary(item: &ResponseItem) -> bool {
|
||||
.is_some_and(|communication| communication.trigger_turn)
|
||||
}
|
||||
|
||||
pub(crate) async fn materialize_rollout_items_for_replay(
|
||||
/// Expand `ForkReference` items into the referenced parent rollout slices they encode.
|
||||
///
|
||||
/// This preserves child rollout compactness on disk while letting replay callers rebuild the
|
||||
/// effective inherited transcript before reconstructing conversation history or deriving thread
|
||||
/// summaries.
|
||||
pub async fn materialize_rollout_items_for_replay(
|
||||
codex_home: &Path,
|
||||
rollout_items: &[RolloutItem],
|
||||
) -> Vec<RolloutItem> {
|
||||
|
||||
@@ -102,7 +102,8 @@ fn truncation_max_keeps_full_rollout() {
|
||||
RolloutItem::ResponseItem(user_msg("u2")),
|
||||
];
|
||||
|
||||
let truncated = truncate_rollout_before_nth_user_message_from_start(&rollout, usize::MAX);
|
||||
let truncated =
|
||||
truncate_rollout_before_nth_user_message_from_start(&rollout, /*n_from_start*/ -1);
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_value(&truncated).unwrap(),
|
||||
|
||||
@@ -2,6 +2,8 @@ use codex_core::ForkSnapshot;
|
||||
use codex_core::NewThread;
|
||||
use codex_core::parse_turn_item;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
@@ -33,18 +35,54 @@ fn find_user_input_positions(items: &[RolloutItem]) -> Vec<usize> {
|
||||
|
||||
fn truncate_before_nth_user_message(
|
||||
items: &[RolloutItem],
|
||||
nth_user_message: usize,
|
||||
nth_user_message: i64,
|
||||
) -> Vec<RolloutItem> {
|
||||
if nth_user_message == usize::MAX {
|
||||
let Ok(nth_user_message) = usize::try_from(nth_user_message) else {
|
||||
return items.to_vec();
|
||||
}
|
||||
};
|
||||
let user_inputs = find_user_input_positions(items);
|
||||
let Some(cut_idx) = user_inputs.get(nth_user_message).copied() else {
|
||||
return Vec::new();
|
||||
return items.to_vec();
|
||||
};
|
||||
items[..cut_idx].to_vec()
|
||||
}
|
||||
|
||||
fn test_user_message(text: &str) -> RolloutItem {
|
||||
RolloutItem::ResponseItem(ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn truncate_before_nth_user_message_keeps_full_history_for_out_of_range_boundaries() {
|
||||
let rollout_items = vec![test_user_message("u1"), test_user_message("u2")];
|
||||
|
||||
pretty_assertions::assert_eq!(
|
||||
serde_json::to_value(truncate_before_nth_user_message(
|
||||
&rollout_items,
|
||||
/*nth_user_message*/ 2,
|
||||
))
|
||||
.unwrap(),
|
||||
serde_json::to_value(&rollout_items).unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn truncate_before_nth_user_message_keeps_full_history_for_i64_max_boundaries() {
|
||||
let rollout_items = vec![test_user_message("u1"), test_user_message("u2")];
|
||||
|
||||
pretty_assertions::assert_eq!(
|
||||
serde_json::to_value(truncate_before_nth_user_message(&rollout_items, i64::MAX,)).unwrap(),
|
||||
serde_json::to_value(&rollout_items).unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
fn read_items_materialized(p: &std::path::Path) -> Vec<RolloutItem> {
|
||||
let text =
|
||||
std::fs::read_to_string(p).unwrap_or_else(|err| panic!("read rollout file {p:?}: {err}"));
|
||||
@@ -147,9 +185,8 @@ async fn fork_thread_twice_drops_to_first_message() {
|
||||
|
||||
// GetHistory on fork1 flushed; the file is ready.
|
||||
let fork1_items = read_items_materialized(&fork1_path);
|
||||
assert!(fork1_items.len() > expected_after_first.len());
|
||||
pretty_assertions::assert_eq!(
|
||||
serde_json::to_value(&fork1_items[..expected_after_first.len()]).unwrap(),
|
||||
serde_json::to_value(&fork1_items).unwrap(),
|
||||
serde_json::to_value(&expected_after_first).unwrap()
|
||||
);
|
||||
|
||||
@@ -178,9 +215,8 @@ async fn fork_thread_twice_drops_to_first_message() {
|
||||
.unwrap_or(0);
|
||||
let expected_after_second: Vec<RolloutItem> = fork1_items[..cut_last_on_fork1].to_vec();
|
||||
let fork2_items = read_items_materialized(&fork2_path);
|
||||
assert!(fork2_items.len() > expected_after_second.len());
|
||||
pretty_assertions::assert_eq!(
|
||||
serde_json::to_value(&fork2_items[..expected_after_second.len()]).unwrap(),
|
||||
serde_json::to_value(&fork2_items).unwrap(),
|
||||
serde_json::to_value(&expected_after_second).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -2312,11 +2312,7 @@ impl InitialHistory {
|
||||
.iter()
|
||||
.filter_map(|ri| match ri {
|
||||
RolloutItem::EventMsg(ev) => Some(ev.clone()),
|
||||
RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_) => None,
|
||||
_ => None,
|
||||
})
|
||||
.collect(),
|
||||
),
|
||||
@@ -2325,11 +2321,7 @@ impl InitialHistory {
|
||||
.iter()
|
||||
.filter_map(|ri| match ri {
|
||||
RolloutItem::EventMsg(ev) => Some(ev.clone()),
|
||||
RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_) => None,
|
||||
_ => None,
|
||||
})
|
||||
.collect(),
|
||||
),
|
||||
@@ -2343,20 +2335,12 @@ impl InitialHistory {
|
||||
InitialHistory::Resumed(resumed) => {
|
||||
resumed.history.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.base_instructions.clone(),
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
_ => None,
|
||||
})
|
||||
}
|
||||
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.base_instructions.clone(),
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
_ => None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -2367,20 +2351,12 @@ impl InitialHistory {
|
||||
InitialHistory::Resumed(resumed) => {
|
||||
resumed.history.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.dynamic_tools.clone(),
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
_ => None,
|
||||
})
|
||||
}
|
||||
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.dynamic_tools.clone(),
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
_ => None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -2389,11 +2365,7 @@ impl InitialHistory {
|
||||
fn session_cwd_from_items(items: &[RolloutItem]) -> Option<PathBuf> {
|
||||
items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.cwd.clone()),
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
_ => None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2600,10 +2572,38 @@ pub struct SessionMetaLine {
|
||||
pub git: Option<GitInfo>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
|
||||
pub struct ForkReferenceItem {
|
||||
pub rollout_path: PathBuf,
|
||||
pub nth_user_message: usize,
|
||||
#[serde(
|
||||
deserialize_with = "deserialize_fork_reference_nth_user_message",
|
||||
default
|
||||
)]
|
||||
pub nth_user_message: i64,
|
||||
}
|
||||
|
||||
fn deserialize_fork_reference_nth_user_message<'de, D>(deserializer: D) -> Result<i64, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let value = Value::deserialize(deserializer)?;
|
||||
let Value::Number(number) = value else {
|
||||
return Err(serde::de::Error::custom(
|
||||
"expected integer fork reference boundary",
|
||||
));
|
||||
};
|
||||
|
||||
if let Some(nth_user_message) = number.as_i64() {
|
||||
return Ok(nth_user_message);
|
||||
}
|
||||
|
||||
if number.as_u64().is_some() {
|
||||
return Ok(i64::MAX);
|
||||
}
|
||||
|
||||
Err(serde::de::Error::custom(
|
||||
"expected integer fork reference boundary",
|
||||
))
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
|
||||
@@ -3727,6 +3727,23 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fork_reference_item_deserializes_legacy_usize_max_boundary() {
|
||||
let item: ForkReferenceItem = serde_json::from_value(json!({
|
||||
"rollout_path": "/tmp/rollout.jsonl",
|
||||
"nth_user_message": u64::MAX,
|
||||
}))
|
||||
.expect("legacy fork reference item should deserialize");
|
||||
|
||||
assert_eq!(
|
||||
item,
|
||||
ForkReferenceItem {
|
||||
rollout_path: PathBuf::from("/tmp/rollout.jsonl"),
|
||||
nth_user_message: i64::MAX,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn session_source_restriction_product_does_not_guess_subagent_products() {
|
||||
assert_eq!(
|
||||
|
||||
@@ -7,6 +7,8 @@ use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
use codex_protocol::protocol::AgentMessageEvent;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ForkReferenceItem;
|
||||
use codex_protocol::protocol::InitialHistory;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
@@ -62,6 +64,47 @@ fn write_session_file(root: &Path, ts: &str, uuid: Uuid) -> std::io::Result<Path
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_rollout_history_preserves_legacy_fork_reference_boundaries() -> std::io::Result<()> {
|
||||
let home = TempDir::new().expect("temp dir");
|
||||
let parent_id = Uuid::new_v4();
|
||||
let child_id = Uuid::new_v4();
|
||||
let ts = "2025-01-03T12:00:00.000Z";
|
||||
let parent_rollout_path = write_session_file(home.path(), ts, parent_id)?;
|
||||
let child_rollout_path = write_session_file(home.path(), ts, child_id)?;
|
||||
|
||||
let mut file = fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(&child_rollout_path)?;
|
||||
let fork_reference_line = serde_json::json!({
|
||||
"timestamp": ts,
|
||||
"type": "fork_reference",
|
||||
"payload": {
|
||||
"rollout_path": parent_rollout_path,
|
||||
"nth_user_message": u64::MAX,
|
||||
},
|
||||
});
|
||||
writeln!(file, "{fork_reference_line}")?;
|
||||
|
||||
let history = RolloutRecorder::get_rollout_history(&child_rollout_path).await?;
|
||||
let InitialHistory::Resumed(resumed) = history else {
|
||||
panic!("expected resumed history");
|
||||
};
|
||||
|
||||
let loaded_fork_reference = resumed.history.last().and_then(|item| match item {
|
||||
RolloutItem::ForkReference(fork_reference) => Some(fork_reference),
|
||||
_ => None,
|
||||
});
|
||||
assert_eq!(
|
||||
loaded_fork_reference,
|
||||
Some(&ForkReferenceItem {
|
||||
rollout_path: parent_rollout_path,
|
||||
nth_user_message: i64::MAX,
|
||||
}),
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn recorder_materializes_only_after_explicit_persist() -> std::io::Result<()> {
|
||||
let home = TempDir::new().expect("temp dir");
|
||||
|
||||
Reference in New Issue
Block a user