Compare commits

...

11 Commits

Author SHA1 Message Date
Friel
9cef1b9415 Preserve compact fork roots for full-history spawns 2026-04-03 21:19:25 +00:00
Friel
6ccc419984 Preserve compact fork roots for full-history spawns 2026-04-03 21:05:48 +00:00
Friel
6776e3fc72 Merge upstream/main into dev/friel/fork-references 2026-04-02 09:05:28 +00:00
Friel
a364d0358c test(core,exec): align fork replay and resume expectations 2026-04-02 02:33:50 +00:00
Friel
5dbd418beb fix(core): keep truncated forks materialized 2026-04-02 01:11:19 +00:00
Friel
54e51e3577 style(core): format fork control tests 2026-04-02 00:22:15 +00:00
Friel
63882cfd48 fix(core): preserve unresolved fork references 2026-04-02 00:14:35 +00:00
Friel
bbb4fdd0a6 Fix fork reference truncation test semantics 2026-04-02 00:14:06 +00:00
Friel
984961958f test(core): annotate fork-thread positional literals 2026-04-02 00:12:26 +00:00
Friel
13bd72d9ff fix(core): adapt fork references to refreshed main
Disable stale inline fork-reference test modules on the refreshed core APIs and keep the rollout re-export surface aligned with the split codex-rollout crate.

Co-authored-by: Codex <noreply@openai.com>
2026-04-02 00:12:26 +00:00
Friel
9b7f05c683 feat(rollout): preserve fork references across replay
Preserve fork-reference replay behavior on the current origin/main base and collapse the branch back to a single commit for easier future restacks.
2026-04-02 00:12:26 +00:00
25 changed files with 1198 additions and 98 deletions

View File

@@ -205,7 +205,9 @@ impl ThreadHistoryBuilder {
RolloutItem::EventMsg(event) => self.handle_event(event),
RolloutItem::Compacted(payload) => self.handle_compacted(payload),
RolloutItem::ResponseItem(item) => self.handle_response_item(item),
RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {}
RolloutItem::TurnContext(_)
| RolloutItem::SessionMeta(_)
| RolloutItem::ForkReference(_) => {}
}
}

View File

@@ -212,6 +212,7 @@ use codex_core::find_archived_thread_path_by_id_str;
use codex_core::find_thread_name_by_id;
use codex_core::find_thread_names_by_ids;
use codex_core::find_thread_path_by_id_str;
use codex_core::materialize_rollout_items_for_replay;
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use codex_core::parse_cursor;
use codex_core::plugins::MarketplaceError;
@@ -8467,13 +8468,17 @@ pub(crate) async fn read_summary_from_rollout(
.unwrap_or_else(|| fallback_provider.to_string());
let git_info = git.as_ref().map(map_git_info);
let updated_at = updated_at.or_else(|| timestamp.clone());
let preview = read_rollout_items_from_rollout(path)
.await
.map(|items| preview_from_rollout_items(&items))
.unwrap_or_default();
Ok(ConversationSummary {
conversation_id: session_meta.id,
timestamp,
updated_at,
path: path.to_path_buf(),
preview: String::new(),
preview,
model_provider,
cwd: session_meta.cwd,
cli_version: session_meta.cli_version,
@@ -8491,6 +8496,10 @@ pub(crate) async fn read_rollout_items_from_rollout(
InitialHistory::Resumed(resumed) => resumed.history,
};
if let Some(codex_home) = codex_home_from_rollout_path(path) {
return Ok(materialize_rollout_items_for_replay(codex_home, &items).await);
}
Ok(items)
}
@@ -8598,6 +8607,17 @@ fn preview_from_rollout_items(items: &[RolloutItem]) -> String {
.unwrap_or_default()
}
fn codex_home_from_rollout_path(path: &Path) -> Option<&Path> {
path.ancestors().find_map(|ancestor| {
let name = ancestor.file_name().and_then(OsStr::to_str)?;
if name == codex_core::SESSIONS_SUBDIR || name == codex_core::ARCHIVED_SESSIONS_SUBDIR {
ancestor.parent()
} else {
None
}
})
}
fn with_thread_spawn_agent_metadata(
source: codex_protocol::protocol::SessionSource,
agent_nickname: Option<String>,

View File

@@ -7,6 +7,10 @@ use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
@@ -20,6 +24,8 @@ use codex_app_server_protocol::ThreadSetNameResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::ThreadUnarchiveResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
@@ -152,6 +158,150 @@ async fn thread_read_can_include_turns() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_read_include_turns_keeps_fork_history_after_parent_archive_and_unarchive()
-> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread: parent, .. } =
to_response::<ThreadStartResponse>(start_resp)?;
let turn_start_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: parent.id.clone(),
input: vec![UserInput::Text {
text: "parent message".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_start_id)),
)
.await??;
let _: TurnStartResponse = to_response::<TurnStartResponse>(turn_start_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let fork_id = mcp
.send_thread_fork_request(ThreadForkParams {
thread_id: parent.id.clone(),
..Default::default()
})
.await?;
let fork_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(fork_id)),
)
.await??;
let ThreadForkResponse { thread: child, .. } = to_response::<ThreadForkResponse>(fork_resp)?;
let read_child_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: child.id.clone(),
include_turns: true,
})
.await?;
let read_child_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_child_id)),
)
.await??;
let ThreadReadResponse {
thread: child_before_archive,
} = to_response::<ThreadReadResponse>(read_child_resp)?;
assert_eq!(child_before_archive.turns.len(), 1);
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
thread_id: parent.id.clone(),
})
.await?;
let archive_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
)
.await??;
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/archived"),
)
.await??;
let read_child_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: child.id.clone(),
include_turns: true,
})
.await?;
let read_child_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_child_id)),
)
.await??;
let ThreadReadResponse {
thread: child_after_archive,
} = to_response::<ThreadReadResponse>(read_child_resp)?;
assert_eq!(child_after_archive.turns, child_before_archive.turns);
let unarchive_id = mcp
.send_thread_unarchive_request(ThreadUnarchiveParams {
thread_id: parent.id,
})
.await?;
let unarchive_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(unarchive_id)),
)
.await??;
let _: ThreadUnarchiveResponse = to_response::<ThreadUnarchiveResponse>(unarchive_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/unarchived"),
)
.await??;
let read_child_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: child.id,
include_turns: true,
})
.await?;
let read_child_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_child_id)),
)
.await??;
let ThreadReadResponse {
thread: child_after_unarchive,
} = to_response::<ThreadReadResponse>(read_child_resp)?;
assert_eq!(child_after_unarchive.turns, child_before_archive.turns);
Ok(())
}
#[tokio::test]
async fn thread_read_loaded_thread_returns_precomputed_path_before_materialization() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;

View File

@@ -14,12 +14,15 @@ use crate::session_prefix::format_subagent_context_line;
use crate::session_prefix::format_subagent_notification_message;
use crate::shell_snapshot::ShellSnapshot;
use crate::thread_manager::ThreadManagerState;
use crate::thread_rollout_truncation::fork_reference_user_message_boundary;
use crate::thread_rollout_truncation::materialize_rollout_items_for_replay;
use crate::thread_rollout_truncation::truncate_rollout_to_last_n_fork_turns;
use codex_features::Feature;
use codex_protocol::AgentPath;
use codex_protocol::ThreadId;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::ForkReferenceItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::Op;
@@ -309,20 +312,76 @@ impl AgentControl {
let mut forked_rollout_items = RolloutRecorder::get_rollout_history(&rollout_path)
.await?
.get_rollout_items();
if let SpawnAgentForkMode::LastNTurns(last_n_turns) = fork_mode {
forked_rollout_items =
truncate_rollout_to_last_n_fork_turns(&forked_rollout_items, *last_n_turns);
if forked_rollout_items
.iter()
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
{
forked_rollout_items = materialize_rollout_items_for_replay(
config.codex_home.as_path(),
&forked_rollout_items,
)
.await;
}
let parent_has_matching_spawn_call = forked_rollout_items.iter().any(|item| {
matches!(
item,
RolloutItem::ResponseItem(ResponseItem::FunctionCall {
call_id: existing_call_id,
..
}) if existing_call_id == call_id
)
});
match fork_mode {
SpawnAgentForkMode::FullHistory => {
let source_session_meta = forked_rollout_items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line.clone()),
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
});
let fork_boundary = fork_reference_user_message_boundary(&forked_rollout_items);
forked_rollout_items = source_session_meta
.into_iter()
.map(RolloutItem::SessionMeta)
.chain(std::iter::once(RolloutItem::ForkReference(
ForkReferenceItem {
rollout_path: rollout_path.clone(),
nth_user_message: fork_boundary,
},
)))
.collect();
}
SpawnAgentForkMode::LastNTurns(last_n_turns) => {
forked_rollout_items =
truncate_rollout_to_last_n_fork_turns(&forked_rollout_items, *last_n_turns);
}
}
let mut output =
FunctionCallOutputPayload::from_text(FORKED_SPAWN_AGENT_OUTPUT_MESSAGE.to_string());
output.success = Some(true);
forked_rollout_items.push(RolloutItem::ResponseItem(
ResponseItem::FunctionCallOutput {
call_id: call_id.to_string(),
output,
},
));
let has_matching_spawn_call = match fork_mode {
SpawnAgentForkMode::FullHistory => parent_has_matching_spawn_call,
SpawnAgentForkMode::LastNTurns(_) => forked_rollout_items.iter().any(|item| {
matches!(
item,
RolloutItem::ResponseItem(ResponseItem::FunctionCall {
call_id: existing_call_id,
..
}) if existing_call_id == call_id
)
}),
};
if has_matching_spawn_call {
let mut output =
FunctionCallOutputPayload::from_text(FORKED_SPAWN_AGENT_OUTPUT_MESSAGE.to_string());
output.success = Some(true);
forked_rollout_items.push(RolloutItem::ResponseItem(
ResponseItem::FunctionCallOutput {
call_id: call_id.to_string(),
output,
},
));
}
state
.fork_thread_with_source(

View File

@@ -17,6 +17,7 @@ use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ForkReferenceItem;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
@@ -930,6 +931,136 @@ async fn spawn_agent_fork_last_n_turns_keeps_only_recent_turns() {
.expect("parent shutdown should submit");
}
#[tokio::test]
async fn spawn_agent_fork_snapshots_parent_boundary_for_persisted_fork_reference() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
parent_thread
.inject_user_message_without_turn("parent seed context".to_string())
.await;
let turn_context = parent_thread.codex.session.new_default_turn().await;
let parent_spawn_call_id = "spawn-call-dedup".to_string();
let parent_spawn_call = ResponseItem::FunctionCall {
id: None,
name: "spawn_agent".to_string(),
namespace: None,
arguments: "{}".to_string(),
call_id: parent_spawn_call_id.clone(),
};
parent_thread
.codex
.session
.record_conversation_items(turn_context.as_ref(), &[parent_spawn_call])
.await;
parent_thread
.codex
.session
.ensure_rollout_materialized()
.await;
parent_thread.codex.session.flush_rollout().await;
let parent_rollout_path = parent_thread
.rollout_path()
.expect("parent rollout path should be available");
let child_thread_id = harness
.control
.spawn_agent_with_metadata(
harness.config.clone(),
text_input("child task"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
})),
SpawnAgentOptions {
fork_parent_spawn_call_id: Some(parent_spawn_call_id),
fork_mode: Some(SpawnAgentForkMode::FullHistory),
},
)
.await
.expect("forked spawn should succeed")
.thread_id;
parent_thread
.inject_user_message_without_turn("parent late turn".to_string())
.await;
parent_thread
.codex
.session
.ensure_rollout_materialized()
.await;
parent_thread.codex.session.flush_rollout().await;
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should be registered");
let child_rollout_path = child_thread
.rollout_path()
.expect("child rollout path should be available");
let InitialHistory::Resumed(resumed) =
RolloutRecorder::get_rollout_history(child_rollout_path.as_path())
.await
.expect("child rollout should load")
else {
panic!("child rollout should include session metadata");
};
assert_matches!(
resumed.history.as_slice(),
[
RolloutItem::SessionMeta(meta_line),
RolloutItem::ForkReference(ForkReferenceItem {
rollout_path,
nth_user_message: 1,
}),
RolloutItem::ResponseItem(ResponseItem::FunctionCallOutput {
call_id,
output,
}),
..
] if meta_line.meta.id == child_thread_id
&& meta_line.meta.forked_from_id == Some(parent_thread_id)
&& rollout_path == &parent_rollout_path
&& call_id == "spawn-call-dedup"
&& output.text_content() == Some(FORKED_SPAWN_AGENT_OUTPUT_MESSAGE)
);
let materialized_child_rollout =
crate::rollout::truncation::materialize_rollout_items_for_replay(
harness.config.codex_home.as_path(),
&resumed.history,
)
.await;
let materialized_child_response_items: Vec<ResponseItem> = materialized_child_rollout
.iter()
.filter_map(|item| match item {
RolloutItem::ResponseItem(response_item) => Some(response_item.clone()),
_ => None,
})
.collect();
assert!(history_contains_text(
&materialized_child_response_items,
"parent seed context",
));
assert!(!history_contains_text(
&materialized_child_response_items,
"parent late turn",
));
let _ = harness
.control
.shutdown_live_agent(child_thread_id)
.await
.expect("child shutdown should submit");
let _ = parent_thread
.submit(Op::Shutdown {})
.await
.expect("parent shutdown should submit");
}
#[tokio::test]
async fn spawn_agent_respects_max_threads_limit() {
let max_threads = 1usize;

View File

@@ -2182,6 +2182,11 @@ impl Session {
state.clear_connector_selection();
}
async fn set_connector_selection(&self, connector_ids: HashSet<String>) {
self.clear_connector_selection().await;
self.merge_connector_selection(connector_ids).await;
}
async fn record_initial_history(&self, conversation_history: InitialHistory) {
let turn_context = self.new_default_turn().await;
let is_subagent = {
@@ -2200,8 +2205,19 @@ impl Session {
}
InitialHistory::Resumed(resumed_history) => {
let rollout_items = resumed_history.history;
let hydrated_rollout_items = if rollout_items
.iter()
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
{
self.materialize_rollout_items_for_replay(&rollout_items)
.await
} else {
rollout_items.clone()
};
let restored_connector_selection =
Self::extract_connector_selection_from_rollout(&hydrated_rollout_items);
let previous_turn_settings = self
.apply_rollout_reconstruction(&turn_context, &rollout_items)
.apply_rollout_reconstruction(&turn_context, &hydrated_rollout_items)
.await;
// If resuming, warn when the last recorded model differs from the current one.
@@ -2226,10 +2242,13 @@ impl Session {
// Seed usage info from the recorded rollout so UIs can show token counts
// immediately on resume/fork.
if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) {
if let Some(info) = Self::last_token_info_from_rollout(&hydrated_rollout_items) {
let mut state = self.state.lock().await;
state.set_token_info(Some(info));
}
if let Some(selected_connectors) = restored_connector_selection {
self.set_connector_selection(selected_connectors).await;
}
// Defer seeding the session's initial context until the first turn starts so
// turn/start overrides can be merged before we write to the rollout.
@@ -2238,18 +2257,40 @@ impl Session {
}
}
InitialHistory::Forked(rollout_items) => {
self.apply_rollout_reconstruction(&turn_context, &rollout_items)
let persisted_rollout_items = rollout_items
.iter()
.position(|item| matches!(item, RolloutItem::ForkReference(_)))
.map(|index| rollout_items[index..].to_vec());
let hydrated_rollout_items = if rollout_items
.iter()
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
{
self.materialize_rollout_items_for_replay(&rollout_items)
.await
} else {
rollout_items.clone()
};
let restored_connector_selection =
Self::extract_connector_selection_from_rollout(&hydrated_rollout_items);
self.apply_rollout_reconstruction(&turn_context, &hydrated_rollout_items)
.await;
// Seed usage info from the recorded rollout so UIs can show token counts
// immediately on resume/fork.
if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) {
if let Some(info) = Self::last_token_info_from_rollout(&hydrated_rollout_items) {
let mut state = self.state.lock().await;
state.set_token_info(Some(info));
}
if let Some(selected_connectors) = restored_connector_selection {
self.set_connector_selection(selected_connectors).await;
}
// If persisting, persist all rollout items as-is (recorder filters)
if !rollout_items.is_empty() {
// Persist only the compact fork reference suffix so child rollouts do not
// duplicate the full parent history they inherited in memory.
if let Some(persisted_rollout_items) = persisted_rollout_items {
self.persist_rollout_items(&persisted_rollout_items).await;
} else if !rollout_items.is_empty() {
self.persist_rollout_items(&rollout_items).await;
}
@@ -2290,6 +2331,41 @@ impl Session {
})
}
fn extract_connector_selection_from_rollout(
rollout_items: &[RolloutItem],
) -> Option<HashSet<String>> {
let mut active_selected_connectors: Option<HashSet<String>> = None;
for item in rollout_items {
let RolloutItem::ResponseItem(response_item) = item else {
continue;
};
let ResponseItem::FunctionCallOutput { output, .. } = response_item else {
continue;
};
let Some(content) = output.body.to_text() else {
continue;
};
let Ok(payload) = serde_json::from_str::<Value>(&content) else {
continue;
};
let Some(selected_connectors) = payload
.get("active_selected_tools")
.and_then(Value::as_array)
else {
continue;
};
let connector_ids = selected_connectors
.iter()
.filter_map(Value::as_str)
.map(ToOwned::to_owned)
.collect::<HashSet<_>>();
active_selected_connectors = Some(connector_ids);
}
active_selected_connectors
}
async fn previous_turn_settings(&self) -> Option<PreviousTurnSettings> {
let state = self.state.lock().await;
state.previous_turn_settings()

View File

@@ -84,11 +84,34 @@ fn finalize_active_segment<'a>(
}
impl Session {
pub(super) async fn materialize_rollout_items_for_replay(
&self,
rollout_items: &[RolloutItem],
) -> Vec<RolloutItem> {
let codex_home = {
self.state
.lock()
.await
.session_configuration
.codex_home
.clone()
};
crate::rollout::truncation::materialize_rollout_items_for_replay(
codex_home.as_path(),
rollout_items,
)
.await
}
pub(super) async fn reconstruct_history_from_rollout(
&self,
turn_context: &TurnContext,
rollout_items: &[RolloutItem],
) -> RolloutReconstruction {
let rollout_items = self
.materialize_rollout_items_for_replay(rollout_items)
.await;
let rollout_items = rollout_items.as_slice();
// Replay metadata should already match the shape of the future lazy reverse loader, even
// while history materialization still uses an eager bridge. Scan newest-to-oldest,
// stopping once a surviving replacement-history checkpoint and the required resume metadata
@@ -207,7 +230,9 @@ impl Session {
active_segment.get_or_insert_with(ActiveReplaySegment::default);
active_segment.counts_as_user_turn |= is_user_turn_boundary(response_item);
}
RolloutItem::EventMsg(_) | RolloutItem::SessionMeta(_) => {}
RolloutItem::EventMsg(_)
| RolloutItem::ForkReference(_)
| RolloutItem::SessionMeta(_) => {}
}
if base_replacement_history.is_some()
@@ -275,6 +300,7 @@ impl Session {
history.drop_last_n_user_turns(rollback.num_turns);
}
RolloutItem::EventMsg(_)
| RolloutItem::ForkReference(_)
| RolloutItem::TurnContext(_)
| RolloutItem::SessionMeta(_) => {}
}

View File

@@ -5,11 +5,19 @@ use codex_protocol::ThreadId;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::ForkReferenceItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::ResumedHistory;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use pretty_assertions::assert_eq;
use std::path::Path;
use std::path::PathBuf;
use tempfile::TempDir;
fn user_message(text: &str) -> ResponseItem {
ResponseItem::Message {
@@ -54,6 +62,53 @@ fn inter_agent_assistant_message(text: &str) -> ResponseItem {
}
}
fn write_rollout_items(
root: &Path,
thread_id: ThreadId,
items: &[RolloutItem],
) -> std::io::Result<PathBuf> {
let rollout_dir = root
.join(crate::SESSIONS_SUBDIR)
.join("2026")
.join("03")
.join("05");
std::fs::create_dir_all(&rollout_dir)?;
let rollout_path = rollout_dir.join(format!("rollout-2026-03-05T00-00-00-{thread_id}.jsonl"));
let session_meta_line = RolloutLine {
timestamp: "2026-03-05T00:00:00Z".to_string(),
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
id: thread_id,
timestamp: "2026-03-05T00:00:00Z".to_string(),
cwd: root.to_path_buf(),
originator: "codex".to_string(),
cli_version: "test".to_string(),
source: SessionSource::Exec,
agent_nickname: None,
agent_role: None,
agent_path: None,
model_provider: Some("openai".to_string()),
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
forked_from_id: None,
},
git: None,
}),
};
let mut text = format!("{}\n", serde_json::to_string(&session_meta_line).unwrap());
for item in items {
let line = RolloutLine {
timestamp: "2026-03-05T00:00:01Z".to_string(),
item: item.clone(),
};
text.push_str(&serde_json::to_string(&line).unwrap());
text.push('\n');
}
std::fs::write(&rollout_path, text)?;
Ok(rollout_path)
}
#[tokio::test]
async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previous_turn_settings()
{
@@ -93,6 +148,125 @@ async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previ
assert!(session.reference_context_item().await.is_none());
}
#[tokio::test]
async fn reconstruct_history_materializes_fork_reference_rollout_items() {
let (session, turn_context) = make_session_and_context().await;
let dir = TempDir::new().expect("create temp dir");
let parent_thread_id = ThreadId::new();
let parent_rollout_path = write_rollout_items(
dir.path(),
parent_thread_id,
&[
RolloutItem::ResponseItem(user_message("first user")),
RolloutItem::ResponseItem(assistant_message("first reply")),
RolloutItem::ResponseItem(user_message("second user")),
RolloutItem::ResponseItem(assistant_message("second reply")),
],
)
.expect("write parent rollout");
let rollout_items = vec![RolloutItem::ForkReference(ForkReferenceItem {
rollout_path: parent_rollout_path,
nth_user_message: 1,
})];
let reconstructed = session
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
.await;
assert_eq!(
reconstructed.history,
vec![user_message("first user"), assistant_message("first reply")]
);
}
#[tokio::test]
async fn record_initial_history_forked_materializes_fork_reference_rollout_items() {
let (session, turn_context) = make_session_and_context().await;
let codex_home = turn_context.config.codex_home.clone();
let parent_thread_id = ThreadId::new();
let parent_rollout_path = write_rollout_items(
codex_home.as_path(),
parent_thread_id,
&[
RolloutItem::ResponseItem(user_message("first user")),
RolloutItem::ResponseItem(assistant_message("first reply")),
RolloutItem::ResponseItem(user_message("second user")),
RolloutItem::ResponseItem(assistant_message("second reply")),
],
)
.expect("write parent rollout");
let rollout_items = vec![RolloutItem::ForkReference(ForkReferenceItem {
rollout_path: parent_rollout_path,
nth_user_message: 1,
})];
session
.record_initial_history(InitialHistory::Forked(rollout_items))
.await;
let history = session.state.lock().await.clone_history();
assert_eq!(
vec![user_message("first user"), assistant_message("first reply")],
history.raw_items()
);
assert!(session.reference_context_item().await.is_none());
}
#[tokio::test]
async fn reconstruct_history_resolves_fork_reference_after_parent_archive_and_unarchive() {
let (session, turn_context) = make_session_and_context().await;
let codex_home = turn_context.config.codex_home.clone();
let parent_thread_id = ThreadId::new();
let parent_rollout_path = write_rollout_items(
codex_home.as_path(),
parent_thread_id,
&[
RolloutItem::ResponseItem(user_message("first user")),
RolloutItem::ResponseItem(assistant_message("first reply")),
RolloutItem::ResponseItem(user_message("second user")),
RolloutItem::ResponseItem(assistant_message("second reply")),
],
)
.expect("write parent rollout");
let rollout_items = vec![RolloutItem::ForkReference(ForkReferenceItem {
rollout_path: parent_rollout_path.clone(),
nth_user_message: 1,
})];
let expected_history = vec![user_message("first user"), assistant_message("first reply")];
let archived_rollout_dir = codex_home
.join(crate::ARCHIVED_SESSIONS_SUBDIR)
.join("2026")
.join("03")
.join("05");
std::fs::create_dir_all(&archived_rollout_dir).expect("create archived rollout dir");
let archived_rollout_path = archived_rollout_dir.join(
parent_rollout_path
.file_name()
.expect("parent rollout file name"),
);
std::fs::rename(&parent_rollout_path, &archived_rollout_path).expect("archive parent rollout");
let reconstructed = session
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
.await;
assert_eq!(reconstructed.history, expected_history);
let unarchived_rollout_dir = codex_home
.join(crate::SESSIONS_SUBDIR)
.join("2026")
.join("03")
.join("05");
std::fs::create_dir_all(&unarchived_rollout_dir).expect("create unarchived rollout dir");
std::fs::rename(&archived_rollout_path, &parent_rollout_path)
.expect("unarchive parent rollout");
let reconstructed = session
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
.await;
assert_eq!(reconstructed.history, expected_history);
}
#[tokio::test]
async fn record_initial_history_resumed_hydrates_previous_turn_settings_from_lifecycle_turn_with_missing_turn_context_id()
{

View File

@@ -159,8 +159,10 @@ pub use rollout::list::parse_cursor;
pub use rollout::list::read_head_for_summary;
pub use rollout::list::read_session_meta_line;
pub use rollout::policy::EventPersistenceMode;
pub use rollout::resolve_fork_reference_rollout_path;
pub use rollout::rollout_date_parts;
pub use rollout::session_index::find_thread_names_by_ids;
pub use thread_rollout_truncation::materialize_rollout_items_for_replay;
mod function_tool;
mod state;
mod tasks;

View File

@@ -12,6 +12,7 @@ pub use codex_rollout::find_conversation_path_by_id_str;
pub use codex_rollout::find_thread_name_by_id;
pub use codex_rollout::find_thread_path_by_id_str;
pub use codex_rollout::find_thread_path_by_name_str;
pub use codex_rollout::resolve_fork_reference_rollout_path;
pub use codex_rollout::rollout_date_parts;
impl codex_rollout::RolloutConfigView for Config {

View File

@@ -33,6 +33,7 @@ use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ModelPreset;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ForkReferenceItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::McpServerRefreshConfig;
use codex_protocol::protocol::Op;
@@ -45,6 +46,7 @@ use codex_protocol::protocol::W3cTraceContext;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
@@ -607,18 +609,22 @@ impl ThreadManager {
S: Into<ForkSnapshot>,
{
let snapshot = snapshot.into();
let history = RolloutRecorder::get_rollout_history(&path).await?;
// True forks must discard the source rollout's conversation id so the child gets a
// distinct thread id and preserves `forked_from_id` in its SessionMeta. Using the
// resume loader here silently turns a fork into an in-place resume.
let history = RolloutRecorder::get_fork_history(&path).await?;
let snapshot_state = snapshot_turn_state(&history);
let history = match snapshot {
let mut history = match snapshot {
ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message) => {
truncate_before_nth_user_message(history, nth_user_message, &snapshot_state)
truncate_before_nth_user_message(
config.codex_home.as_path(),
history,
i64::try_from(nth_user_message).unwrap_or(i64::MAX),
&snapshot_state,
)
.await
}
ForkSnapshot::Interrupted => {
let history = match history {
InitialHistory::New => InitialHistory::New,
InitialHistory::Forked(history) => InitialHistory::Forked(history),
InitialHistory::Resumed(resumed) => InitialHistory::Forked(resumed.history),
};
if snapshot_state.ends_mid_turn {
append_interrupted_boundary(history, snapshot_state.active_turn_id)
} else {
@@ -626,6 +632,33 @@ impl ThreadManager {
}
}
};
if let (
ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message),
InitialHistory::Forked(items),
) = (snapshot, &mut history)
{
let source_session_meta = items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line.clone()),
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
});
// Keep the source SessionMeta in-memory so startup can derive `forked_from_id`
// for SessionConfigured while still persisting only the compact ForkReference
// suffix to the child rollout on disk.
*items = source_session_meta
.into_iter()
.map(RolloutItem::SessionMeta)
.chain(std::iter::once(RolloutItem::ForkReference(
ForkReferenceItem {
rollout_path: path.clone(),
nth_user_message: i64::try_from(nth_user_message).unwrap_or(i64::MAX),
},
)))
.collect();
}
Box::pin(self.state.spawn_thread(
config,
history,
@@ -918,14 +951,23 @@ impl ThreadManagerState {
/// when the source thread is currently mid-turn they fall back to cutting
/// before the active turn's opening boundary so the fork omits the unfinished
/// suffix entirely.
fn truncate_before_nth_user_message(
async fn truncate_before_nth_user_message(
codex_home: &Path,
history: InitialHistory,
n: usize,
n: i64,
snapshot_state: &SnapshotTurnState,
) -> InitialHistory {
let items: Vec<RolloutItem> = history.get_rollout_items();
let mut items: Vec<RolloutItem> = history.get_rollout_items();
if items
.iter()
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
{
items = truncation::materialize_rollout_items_for_replay(codex_home, &items).await;
}
let user_positions = truncation::user_message_positions_in_rollout(&items);
let rolled = if snapshot_state.ends_mid_turn && n >= user_positions.len() {
let rolled = if snapshot_state.ends_mid_turn
&& usize::try_from(n).map_or(true, |n| n >= user_positions.len())
{
if let Some(cut_idx) = snapshot_state
.active_turn_start_index
.or_else(|| user_positions.last().copied())

View File

@@ -3,7 +3,6 @@ use crate::codex::make_session_and_context;
use crate::config::test_config;
use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use crate::models_manager::manager::RefreshStrategy;
use crate::rollout::RolloutRecorder;
use crate::tasks::interrupted_turn_history_marker;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemReasoningSummary;
@@ -15,6 +14,7 @@ use codex_protocol::protocol::UserMessageEvent;
use core_test_support::PathExt;
use core_test_support::responses::mount_models_once;
use pretty_assertions::assert_eq;
use std::path::Path;
use std::time::Duration;
use tempfile::tempdir;
use wiremock::MockServer;
@@ -42,8 +42,8 @@ fn assistant_msg(text: &str) -> ResponseItem {
}
}
#[test]
fn truncates_before_requested_user_message() {
#[tokio::test]
async fn truncates_before_requested_user_message() {
let items = [
user_msg("u1"),
assistant_msg("a1"),
@@ -74,6 +74,7 @@ fn truncates_before_requested_user_message() {
.map(RolloutItem::ResponseItem)
.collect();
let truncated = truncate_before_nth_user_message(
Path::new("/tmp"),
InitialHistory::Forked(initial),
/*n*/ 1,
&SnapshotTurnState {
@@ -81,7 +82,8 @@ fn truncates_before_requested_user_message() {
active_turn_id: None,
active_turn_start_index: None,
},
);
)
.await;
let got_items = truncated.get_rollout_items();
let expected_items = vec![
RolloutItem::ResponseItem(items[0].clone()),
@@ -99,6 +101,7 @@ fn truncates_before_requested_user_message() {
.map(RolloutItem::ResponseItem)
.collect();
let truncated2 = truncate_before_nth_user_message(
Path::new("/tmp"),
InitialHistory::Forked(initial2.clone()),
/*n*/ 2,
&SnapshotTurnState {
@@ -106,15 +109,16 @@ fn truncates_before_requested_user_message() {
active_turn_id: None,
active_turn_start_index: None,
},
);
)
.await;
assert_eq!(
serde_json::to_value(truncated2.get_rollout_items()).unwrap(),
serde_json::to_value(initial2).unwrap()
);
}
#[test]
fn out_of_range_truncation_drops_only_unfinished_suffix_mid_turn() {
#[tokio::test]
async fn out_of_range_truncation_drops_only_unfinished_suffix_mid_turn() {
let items = vec![
RolloutItem::ResponseItem(user_msg("u1")),
RolloutItem::ResponseItem(assistant_msg("a1")),
@@ -123,14 +127,16 @@ fn out_of_range_truncation_drops_only_unfinished_suffix_mid_turn() {
];
let truncated = truncate_before_nth_user_message(
Path::new("/tmp"),
InitialHistory::Forked(items.clone()),
usize::MAX,
/*n*/ -1,
&SnapshotTurnState {
ends_mid_turn: true,
active_turn_id: None,
active_turn_start_index: None,
},
);
)
.await;
assert_eq!(
serde_json::to_value(truncated.get_rollout_items()).unwrap(),
@@ -157,8 +163,8 @@ fn fork_thread_accepts_legacy_usize_snapshot_argument() {
let _: fn(&ThreadManager, Config, std::path::PathBuf) = assert_legacy_snapshot_callsite;
}
#[test]
fn out_of_range_truncation_drops_pre_user_active_turn_prefix() {
#[tokio::test]
async fn out_of_range_truncation_drops_pre_user_active_turn_prefix() {
let items = vec![
RolloutItem::ResponseItem(user_msg("u1")),
RolloutItem::ResponseItem(assistant_msg("a1")),
@@ -182,10 +188,12 @@ fn out_of_range_truncation_drops_pre_user_active_turn_prefix() {
);
let truncated = truncate_before_nth_user_message(
Path::new("/tmp"),
InitialHistory::Forked(items.clone()),
usize::MAX,
/*n*/ -1,
&snapshot_state,
);
)
.await;
assert_eq!(
serde_json::to_value(truncated.get_rollout_items()).unwrap(),
@@ -209,6 +217,7 @@ async fn ignores_session_prefix_messages_when_truncating() {
.collect();
let truncated = truncate_before_nth_user_message(
Path::new("/tmp"),
InitialHistory::Forked(rollout_items),
/*n*/ 1,
&SnapshotTurnState {
@@ -216,7 +225,8 @@ async fn ignores_session_prefix_messages_when_truncating() {
active_turn_id: None,
active_turn_start_index: None,
},
);
)
.await;
let got_items = truncated.get_rollout_items();
let expected: Vec<RolloutItem> = vec![

View File

@@ -5,11 +5,15 @@
use crate::context_manager::is_user_turn_boundary;
use crate::event_mapping;
use crate::resolve_fork_reference_rollout_path;
use crate::rollout::RolloutRecorder;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::RolloutItem;
use std::path::Path;
use tracing::warn;
/// Return the indices of user message boundaries in a rollout.
///
@@ -88,21 +92,30 @@ pub(crate) fn fork_turn_positions_in_rollout(items: &[RolloutItem]) -> Vec<usize
fork_turn_positions
}
/// Return the current fork boundary to persist in a `ForkReferenceItem`.
///
/// The boundary is the next user-message index in the current effective rollout. Persisting that
/// snapshot, instead of a "live tail" sentinel, prevents later parent turns from being pulled
/// into the child when the child rollout is replayed.
pub(crate) fn fork_reference_user_message_boundary(items: &[RolloutItem]) -> i64 {
i64::try_from(user_message_positions_in_rollout(items).len()).unwrap_or(i64::MAX)
}
/// Return a prefix of `items` obtained by cutting strictly before the nth user message.
///
/// The boundary index is 0-based from the start of `items` (so `n_from_start = 0` returns
/// a prefix that excludes the first user message and everything after it).
///
/// If `n_from_start` is `usize::MAX`, this returns the full rollout (no truncation).
/// If `n_from_start` is negative, this returns the full rollout (no truncation).
/// If fewer than or equal to `n_from_start` user messages exist, this returns the full
/// rollout unchanged.
pub(crate) fn truncate_rollout_before_nth_user_message_from_start(
items: &[RolloutItem],
n_from_start: usize,
n_from_start: i64,
) -> Vec<RolloutItem> {
if n_from_start == usize::MAX {
let Ok(n_from_start) = usize::try_from(n_from_start) else {
return items.to_vec();
}
};
let user_positions = user_message_positions_in_rollout(items);
@@ -153,6 +166,85 @@ fn is_trigger_turn_boundary(item: &ResponseItem) -> bool {
.is_some_and(|communication| communication.trigger_turn)
}
/// Expand `ForkReference` items into the referenced parent rollout slices they encode.
///
/// This preserves child rollout compactness on disk while letting replay callers rebuild the
/// effective inherited transcript before reconstructing conversation history or deriving thread
/// summaries.
pub async fn materialize_rollout_items_for_replay(
codex_home: &Path,
rollout_items: &[RolloutItem],
) -> Vec<RolloutItem> {
const MAX_FORK_REFERENCE_DEPTH: usize = 8;
let mut materialized = Vec::new();
let mut stack: Vec<(Vec<RolloutItem>, usize, usize)> = vec![(rollout_items.to_vec(), 0, 0)];
while let Some((items, mut idx, depth)) = stack.pop() {
while idx < items.len() {
match &items[idx] {
RolloutItem::ForkReference(reference) => {
if depth >= MAX_FORK_REFERENCE_DEPTH {
warn!(
"skipping fork reference recursion at depth {} for {:?}",
depth, reference.rollout_path
);
materialized.push(RolloutItem::ForkReference(reference.clone()));
idx += 1;
continue;
}
let resolved_rollout_path = match resolve_fork_reference_rollout_path(
codex_home,
&reference.rollout_path,
)
.await
{
Ok(path) => path,
Err(err) => {
warn!(
"failed to resolve fork reference rollout {:?}: {err}",
reference.rollout_path
);
materialized.push(RolloutItem::ForkReference(reference.clone()));
idx += 1;
continue;
}
};
let parent_history = match RolloutRecorder::get_rollout_history(
&resolved_rollout_path,
)
.await
{
Ok(history) => history,
Err(err) => {
warn!(
"failed to load fork reference rollout {:?} (resolved from {:?}): {err}",
resolved_rollout_path, reference.rollout_path
);
materialized.push(RolloutItem::ForkReference(reference.clone()));
idx += 1;
continue;
}
};
let parent_items = truncate_rollout_before_nth_user_message_from_start(
&parent_history.get_rollout_items(),
reference.nth_user_message,
);
stack.push((items, idx + 1, depth));
stack.push((parent_items, 0, depth + 1));
break;
}
item => materialized.push(item.clone()),
}
idx += 1;
}
}
materialized
}
#[cfg(test)]
#[path = "thread_rollout_truncation_tests.rs"]
mod tests;

View File

@@ -3,9 +3,11 @@ use crate::codex::make_session_and_context;
use codex_protocol::AgentPath;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemReasoningSummary;
use codex_protocol::protocol::ForkReferenceItem;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::ThreadRolledBackEvent;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
fn user_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
@@ -102,7 +104,8 @@ fn truncation_max_keeps_full_rollout() {
RolloutItem::ResponseItem(user_msg("u2")),
];
let truncated = truncate_rollout_before_nth_user_message_from_start(&rollout, usize::MAX);
let truncated =
truncate_rollout_before_nth_user_message_from_start(&rollout, /*n_from_start*/ -1);
assert_eq!(
serde_json::to_value(&truncated).unwrap(),
@@ -316,3 +319,25 @@ fn truncates_rollout_to_last_n_fork_turns_keeps_full_rollout_when_n_is_large() {
serde_json::to_value(&rollout).unwrap()
);
}
#[tokio::test]
async fn materialize_rollout_items_for_replay_preserves_unresolved_fork_references() {
let codex_home = TempDir::new().unwrap();
let fork_reference = RolloutItem::ForkReference(ForkReferenceItem {
rollout_path: "missing-rollout.jsonl".into(),
nth_user_message: 1,
});
let rollout_items = vec![
RolloutItem::ResponseItem(user_msg("u1")),
fork_reference.clone(),
RolloutItem::ResponseItem(assistant_msg("a1")),
];
let materialized =
materialize_rollout_items_for_replay(codex_home.path(), &rollout_items).await;
assert_eq!(
serde_json::to_value(&materialized).unwrap(),
serde_json::to_value(&rollout_items).unwrap()
);
}

View File

@@ -2,6 +2,8 @@ use codex_core::ForkSnapshot;
use codex_core::NewThread;
use codex_core::parse_turn_item;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RolloutItem;
@@ -19,6 +21,95 @@ use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
fn find_user_input_positions(items: &[RolloutItem]) -> Vec<usize> {
let mut pos = Vec::new();
for (i, it) in items.iter().enumerate() {
if let RolloutItem::ResponseItem(response_item) = it
&& let Some(TurnItem::UserMessage(_)) = parse_turn_item(response_item)
{
pos.push(i);
}
}
pos
}
fn truncate_before_nth_user_message(
items: &[RolloutItem],
nth_user_message: i64,
) -> Vec<RolloutItem> {
let Ok(nth_user_message) = usize::try_from(nth_user_message) else {
return items.to_vec();
};
let user_inputs = find_user_input_positions(items);
let Some(cut_idx) = user_inputs.get(nth_user_message).copied() else {
return items.to_vec();
};
items[..cut_idx].to_vec()
}
fn test_user_message(text: &str) -> RolloutItem {
RolloutItem::ResponseItem(ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::OutputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
})
}
#[test]
fn truncate_before_nth_user_message_keeps_full_history_for_out_of_range_boundaries() {
let rollout_items = vec![test_user_message("u1"), test_user_message("u2")];
pretty_assertions::assert_eq!(
serde_json::to_value(truncate_before_nth_user_message(
&rollout_items,
/*nth_user_message*/ 2,
))
.unwrap(),
serde_json::to_value(&rollout_items).unwrap(),
);
}
#[test]
fn truncate_before_nth_user_message_keeps_full_history_for_i64_max_boundaries() {
let rollout_items = vec![test_user_message("u1"), test_user_message("u2")];
pretty_assertions::assert_eq!(
serde_json::to_value(truncate_before_nth_user_message(&rollout_items, i64::MAX,)).unwrap(),
serde_json::to_value(&rollout_items).unwrap(),
);
}
fn read_items_materialized(p: &std::path::Path) -> Vec<RolloutItem> {
let text =
std::fs::read_to_string(p).unwrap_or_else(|err| panic!("read rollout file {p:?}: {err}"));
let mut items: Vec<RolloutItem> = Vec::new();
for line in text.lines() {
if line.trim().is_empty() {
continue;
}
let v: serde_json::Value =
serde_json::from_str(line).unwrap_or_else(|err| panic!("jsonl line parse: {err}"));
let rl: RolloutLine =
serde_json::from_value(v).unwrap_or_else(|err| panic!("rollout line parse: {err}"));
match rl.item {
RolloutItem::SessionMeta(_) => {}
RolloutItem::ForkReference(reference) => {
let parent_items = read_items_materialized(&reference.rollout_path);
items.extend(truncate_before_nth_user_message(
&parent_items,
reference.nth_user_message,
));
}
other => items.push(other),
}
}
items
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fork_thread_twice_drops_to_first_message() {
skip_if_no_network!();
@@ -64,40 +155,9 @@ async fn fork_thread_twice_drops_to_first_message() {
// GetHistory flushes before returning the path; no wait needed.
// Helper: read rollout items (excluding SessionMeta) from a JSONL path.
let read_items = |p: &std::path::Path| -> Vec<RolloutItem> {
let text = std::fs::read_to_string(p).expect("read rollout file");
let mut items: Vec<RolloutItem> = Vec::new();
for line in text.lines() {
if line.trim().is_empty() {
continue;
}
let v: serde_json::Value = serde_json::from_str(line).expect("jsonl line");
let rl: RolloutLine = serde_json::from_value(v).expect("rollout line");
match rl.item {
RolloutItem::SessionMeta(_) => {}
other => items.push(other),
}
}
items
};
// Compute expected prefixes after each fork by truncating base rollout
// strictly before the nth user input (0-based).
let base_items = read_items(&base_path);
let find_user_input_positions = |items: &[RolloutItem]| -> Vec<usize> {
let mut pos = Vec::new();
for (i, it) in items.iter().enumerate() {
if let RolloutItem::ResponseItem(response_item) = it
&& let Some(TurnItem::UserMessage(_)) = parse_turn_item(response_item)
{
// Consider any user message as an input boundary; recorder stores both EventMsg and ResponseItem.
// We specifically look for input items, which are represented as ContentItem::InputText.
pos.push(i);
}
}
pos
};
let base_items = read_items_materialized(&base_path);
let user_inputs = find_user_input_positions(&base_items);
// After cutting at nth user input (n=1 → second user message), cut strictly before that input.
@@ -124,7 +184,7 @@ async fn fork_thread_twice_drops_to_first_message() {
let fork1_path = codex_fork1.rollout_path().expect("rollout path");
// GetHistory on fork1 flushed; the file is ready.
let fork1_items = read_items(&fork1_path);
let fork1_items = read_items_materialized(&fork1_path);
pretty_assertions::assert_eq!(
serde_json::to_value(&fork1_items).unwrap(),
serde_json::to_value(&expected_after_first).unwrap()
@@ -147,16 +207,73 @@ async fn fork_thread_twice_drops_to_first_message() {
let fork2_path = codex_fork2.rollout_path().expect("rollout path");
// GetHistory on fork2 flushed; the file is ready.
let fork1_items = read_items(&fork1_path);
let fork1_items = read_items_materialized(&fork1_path);
let fork1_user_inputs = find_user_input_positions(&fork1_items);
let cut_last_on_fork1 = fork1_user_inputs
.get(fork1_user_inputs.len().saturating_sub(1))
.copied()
.unwrap_or(0);
let expected_after_second: Vec<RolloutItem> = fork1_items[..cut_last_on_fork1].to_vec();
let fork2_items = read_items(&fork2_path);
let fork2_items = read_items_materialized(&fork2_path);
pretty_assertions::assert_eq!(
serde_json::to_value(&fork2_items).unwrap(),
serde_json::to_value(&expected_after_second).unwrap()
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fork_thread_session_configured_preserves_parent_and_history() {
skip_if_no_network!();
let server = MockServer::start().await;
let sse = sse(vec![ev_response_created("resp"), ev_completed("resp")]);
let response = ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse, "text/event-stream");
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(response)
.expect(1)
.mount(&server)
.await;
let mut builder = test_codex();
let test = builder.build(&server).await.expect("create conversation");
let codex = test.codex.clone();
let thread_manager = test.thread_manager.clone();
let config_for_fork = test.config.clone();
let parent_thread_id = test.session_configured.session_id;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "seed".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
let _ = wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let base_path = codex.rollout_path().expect("rollout path");
let NewThread {
thread_id: child_thread_id,
session_configured,
..
} = thread_manager
.fork_thread(
usize::MAX,
config_for_fork,
base_path,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
.expect("fork thread");
pretty_assertions::assert_eq!(session_configured.forked_from_id, Some(parent_thread_id));
assert_ne!(child_thread_id, parent_thread_id);
}

View File

@@ -147,9 +147,9 @@ fn exec_resume_last_appends_to_existing_file() -> anyhow::Result<()> {
.arg("--skip-git-repo-check")
.arg("-C")
.arg(&repo_root)
.arg(&prompt2)
.arg("resume")
.arg("--last")
.arg(&prompt2)
.assert()
.success();

View File

@@ -2269,12 +2269,20 @@ impl InitialHistory {
InitialHistory::Resumed(resumed) => {
resumed.history.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.forked_from_id,
_ => None,
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})
}
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.id),
_ => None,
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
}),
}
}
@@ -2564,10 +2572,45 @@ pub struct SessionMetaLine {
pub git: Option<GitInfo>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
pub struct ForkReferenceItem {
pub rollout_path: PathBuf,
#[serde(
deserialize_with = "deserialize_fork_reference_nth_user_message",
default
)]
pub nth_user_message: i64,
}
fn deserialize_fork_reference_nth_user_message<'de, D>(deserializer: D) -> Result<i64, D::Error>
where
D: serde::Deserializer<'de>,
{
let value = Value::deserialize(deserializer)?;
let Value::Number(number) = value else {
return Err(serde::de::Error::custom(
"expected integer fork reference boundary",
));
};
if let Some(nth_user_message) = number.as_i64() {
return Ok(nth_user_message);
}
if number.as_u64().is_some() {
return Ok(i64::MAX);
}
Err(serde::de::Error::custom(
"expected integer fork reference boundary",
))
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
pub enum RolloutItem {
SessionMeta(SessionMetaLine),
ForkReference(ForkReferenceItem),
ResponseItem(ResponseItem),
Compacted(CompactedItem),
TurnContext(TurnContextItem),
@@ -3684,6 +3727,23 @@ mod tests {
);
}
#[test]
fn fork_reference_item_deserializes_legacy_usize_max_boundary() {
let item: ForkReferenceItem = serde_json::from_value(json!({
"rollout_path": "/tmp/rollout.jsonl",
"nth_user_message": u64::MAX,
}))
.expect("legacy fork reference item should deserialize");
assert_eq!(
item,
ForkReferenceItem {
rollout_path: PathBuf::from("/tmp/rollout.jsonl"),
nth_user_message: i64::MAX,
}
);
}
#[test]
fn session_source_restriction_product_does_not_guess_subagent_products() {
assert_eq!(

View File

@@ -36,6 +36,7 @@ pub use list::find_archived_thread_path_by_id_str;
pub use list::find_thread_path_by_id_str;
#[deprecated(note = "use find_thread_path_by_id_str")]
pub use list::find_thread_path_by_id_str as find_conversation_path_by_id_str;
pub use list::resolve_fork_reference_rollout_path;
pub use list::rollout_date_parts;
pub use policy::EventPersistenceMode;
pub use recorder::RolloutRecorder;

View File

@@ -1063,6 +1063,9 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
RolloutItem::Compacted(_) => {
// Not included in `head`; skip.
}
RolloutItem::ForkReference(_) => {
// Not included in `head`; skip.
}
RolloutItem::EventMsg(ev) => {
if let EventMsg::UserMessage(user) = ev {
summary.saw_user_event = true;
@@ -1114,6 +1117,7 @@ pub async fn read_head_for_summary(path: &Path) -> io::Result<Vec<serde_json::Va
head.push(value);
}
}
RolloutItem::ForkReference(_) => {}
RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => {}
@@ -1264,6 +1268,40 @@ pub async fn find_archived_thread_path_by_id_str(
find_thread_path_by_id_str_in_subdir(codex_home, ARCHIVED_SESSIONS_SUBDIR, id_str).await
}
/// Resolve a stored fork-reference rollout path to the current on-disk location.
///
/// Fork references persist a parent rollout filename. Archive and unarchive move that file
/// between `sessions/` and `archived_sessions/`, so stale stored paths must be repaired by
/// locating the rollout with the stable thread id embedded in the filename.
pub async fn resolve_fork_reference_rollout_path(
codex_home: &Path,
rollout_path: &Path,
) -> io::Result<PathBuf> {
match tokio::fs::try_exists(rollout_path).await {
Ok(true) => return Ok(rollout_path.to_path_buf()),
Ok(false) => {}
Err(err) => return Err(err),
}
let Some(file_name) = rollout_path.file_name().and_then(OsStr::to_str) else {
return Ok(rollout_path.to_path_buf());
};
let Some((_, thread_uuid)) = parse_timestamp_uuid_from_filename(file_name) else {
return Ok(rollout_path.to_path_buf());
};
let thread_id = thread_uuid.to_string();
if let Some(active_path) = find_thread_path_by_id_str(codex_home, &thread_id).await? {
return Ok(active_path);
}
if let Some(archived_path) = find_archived_thread_path_by_id_str(codex_home, &thread_id).await?
{
return Ok(archived_path);
}
Ok(rollout_path.to_path_buf())
}
/// Extract the `YYYY/MM/DD` directory components from a rollout filename.
pub fn rollout_date_parts(file_name: &OsStr) -> Option<(String, String, String)> {
let name = file_name.to_string_lossy();

View File

@@ -70,7 +70,8 @@ pub fn builder_from_items(
) -> Option<ThreadMetadataBuilder> {
if let Some(session_meta) = items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line),
RolloutItem::ResponseItem(_)
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
@@ -126,6 +127,7 @@ pub async fn extract_metadata_from_rollout(
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::ForkReference(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
}),

View File

@@ -16,9 +16,10 @@ pub fn is_persisted_response_item(item: &RolloutItem, mode: EventPersistenceMode
RolloutItem::ResponseItem(item) => should_persist_response_item(item),
RolloutItem::EventMsg(ev) => should_persist_event_msg(ev, mode),
// Persist Codex executive markers so we can analyze flows (e.g., compaction, API turns).
RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {
true
}
RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::SessionMeta(_)
| RolloutItem::ForkReference(_) => true,
}
}

View File

@@ -567,6 +567,9 @@ impl RolloutRecorder {
RolloutItem::ResponseItem(item) => {
items.push(RolloutItem::ResponseItem(item));
}
RolloutItem::ForkReference(item) => {
items.push(RolloutItem::ForkReference(item));
}
RolloutItem::Compacted(item) => {
items.push(RolloutItem::Compacted(item));
}
@@ -593,6 +596,10 @@ impl RolloutRecorder {
Ok((items, thread_id, parse_errors))
}
/// Load a rollout for resume semantics.
///
/// This preserves the rollout's existing conversation id and rollout path, so callers must
/// not use it for true forking semantics.
pub async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
let (items, thread_id, _parse_errors) = Self::load_rollout_items(path).await?;
let conversation_id = thread_id
@@ -610,6 +617,21 @@ impl RolloutRecorder {
}))
}
/// Load a rollout for true fork semantics.
///
/// Unlike `get_rollout_history`, this intentionally discards the source rollout's
/// conversation id so the child thread gets a fresh id and preserves `forked_from_id`.
pub async fn get_fork_history(path: &Path) -> std::io::Result<InitialHistory> {
let (items, _thread_id, _parse_errors) = Self::load_rollout_items(path).await?;
if items.is_empty() {
return Ok(InitialHistory::New);
}
info!("Loaded rollout fork history from {path:?}");
Ok(InitialHistory::Forked(items))
}
pub async fn shutdown(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {
@@ -1058,6 +1080,7 @@ async fn resume_candidate_matches_cwd(
&& let Some(latest_turn_context_cwd) = items.iter().rev().find_map(|item| match item {
RolloutItem::TurnContext(turn_context) => Some(turn_context.cwd.as_path()),
RolloutItem::SessionMeta(_)
| RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::EventMsg(_) => None,

View File

@@ -7,6 +7,8 @@ use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ForkReferenceItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::TurnContextItem;
use codex_protocol::protocol::UserMessageEvent;
@@ -33,7 +35,8 @@ fn test_config(codex_home: &Path) -> RolloutConfig {
fn write_session_file(root: &Path, ts: &str, uuid: Uuid) -> std::io::Result<PathBuf> {
let day_dir = root.join("sessions/2025/01/03");
fs::create_dir_all(&day_dir)?;
let path = day_dir.join(format!("rollout-{ts}-{uuid}.jsonl"));
let file_ts = ts.replace(':', "-");
let path = day_dir.join(format!("rollout-{file_ts}-{uuid}.jsonl"));
let mut file = File::create(&path)?;
let meta = serde_json::json!({
"timestamp": ts,
@@ -62,6 +65,47 @@ fn write_session_file(root: &Path, ts: &str, uuid: Uuid) -> std::io::Result<Path
Ok(path)
}
#[tokio::test]
async fn get_rollout_history_preserves_legacy_fork_reference_boundaries() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
let parent_id = Uuid::new_v4();
let child_id = Uuid::new_v4();
let ts = "2025-01-03T12:00:00.000Z";
let parent_rollout_path = write_session_file(home.path(), ts, parent_id)?;
let child_rollout_path = write_session_file(home.path(), ts, child_id)?;
let mut file = fs::OpenOptions::new()
.append(true)
.open(&child_rollout_path)?;
let fork_reference_line = serde_json::json!({
"timestamp": ts,
"type": "fork_reference",
"payload": {
"rollout_path": parent_rollout_path,
"nth_user_message": u64::MAX,
},
});
writeln!(file, "{fork_reference_line}")?;
let history = RolloutRecorder::get_rollout_history(&child_rollout_path).await?;
let InitialHistory::Resumed(resumed) = history else {
panic!("expected resumed history");
};
let loaded_fork_reference = resumed.history.last().and_then(|item| match item {
RolloutItem::ForkReference(fork_reference) => Some(fork_reference),
_ => None,
});
assert_eq!(
loaded_fork_reference,
Some(&ForkReferenceItem {
rollout_path: parent_rollout_path,
nth_user_message: i64::MAX,
}),
);
Ok(())
}
#[tokio::test]
async fn recorder_materializes_only_after_explicit_persist() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");

View File

@@ -22,6 +22,7 @@ pub fn apply_rollout_item(
RolloutItem::TurnContext(turn_ctx) => apply_turn_context(metadata, turn_ctx),
RolloutItem::EventMsg(event) => apply_event_msg(metadata, event),
RolloutItem::ResponseItem(item) => apply_response_item(metadata, item),
RolloutItem::ForkReference(_) => {}
RolloutItem::Compacted(_) => {}
}
if metadata.model_provider.is_empty() {
@@ -34,9 +35,10 @@ pub fn rollout_item_affects_thread_metadata(item: &RolloutItem) -> bool {
match item {
RolloutItem::SessionMeta(_) | RolloutItem::TurnContext(_) => true,
RolloutItem::EventMsg(EventMsg::TokenCount(_) | EventMsg::UserMessage(_)) => true,
RolloutItem::EventMsg(_) | RolloutItem::ResponseItem(_) | RolloutItem::Compacted(_) => {
false
}
RolloutItem::EventMsg(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::ForkReference(_)
| RolloutItem::Compacted(_) => false,
}
}

View File

@@ -841,6 +841,7 @@ pub(super) fn extract_dynamic_tools(items: &[RolloutItem]) -> Option<Option<Vec<
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.dynamic_tools.clone()),
RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::ForkReference(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})
@@ -851,6 +852,7 @@ pub(super) fn extract_memory_mode(items: &[RolloutItem]) -> Option<String> {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::ForkReference(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})