mirror of
https://github.com/openai/codex.git
synced 2026-04-08 14:54:47 +00:00
Compare commits
11 Commits
dev/window
...
dev/friel/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9cef1b9415 | ||
|
|
6ccc419984 | ||
|
|
6776e3fc72 | ||
|
|
a364d0358c | ||
|
|
5dbd418beb | ||
|
|
54e51e3577 | ||
|
|
63882cfd48 | ||
|
|
bbb4fdd0a6 | ||
|
|
984961958f | ||
|
|
13bd72d9ff | ||
|
|
9b7f05c683 |
@@ -205,7 +205,9 @@ impl ThreadHistoryBuilder {
|
||||
RolloutItem::EventMsg(event) => self.handle_event(event),
|
||||
RolloutItem::Compacted(payload) => self.handle_compacted(payload),
|
||||
RolloutItem::ResponseItem(item) => self.handle_response_item(item),
|
||||
RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {}
|
||||
RolloutItem::TurnContext(_)
|
||||
| RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::ForkReference(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -212,6 +212,7 @@ use codex_core::find_archived_thread_path_by_id_str;
|
||||
use codex_core::find_thread_name_by_id;
|
||||
use codex_core::find_thread_names_by_ids;
|
||||
use codex_core::find_thread_path_by_id_str;
|
||||
use codex_core::materialize_rollout_items_for_replay;
|
||||
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
|
||||
use codex_core::parse_cursor;
|
||||
use codex_core::plugins::MarketplaceError;
|
||||
@@ -8467,13 +8468,17 @@ pub(crate) async fn read_summary_from_rollout(
|
||||
.unwrap_or_else(|| fallback_provider.to_string());
|
||||
let git_info = git.as_ref().map(map_git_info);
|
||||
let updated_at = updated_at.or_else(|| timestamp.clone());
|
||||
let preview = read_rollout_items_from_rollout(path)
|
||||
.await
|
||||
.map(|items| preview_from_rollout_items(&items))
|
||||
.unwrap_or_default();
|
||||
|
||||
Ok(ConversationSummary {
|
||||
conversation_id: session_meta.id,
|
||||
timestamp,
|
||||
updated_at,
|
||||
path: path.to_path_buf(),
|
||||
preview: String::new(),
|
||||
preview,
|
||||
model_provider,
|
||||
cwd: session_meta.cwd,
|
||||
cli_version: session_meta.cli_version,
|
||||
@@ -8491,6 +8496,10 @@ pub(crate) async fn read_rollout_items_from_rollout(
|
||||
InitialHistory::Resumed(resumed) => resumed.history,
|
||||
};
|
||||
|
||||
if let Some(codex_home) = codex_home_from_rollout_path(path) {
|
||||
return Ok(materialize_rollout_items_for_replay(codex_home, &items).await);
|
||||
}
|
||||
|
||||
Ok(items)
|
||||
}
|
||||
|
||||
@@ -8598,6 +8607,17 @@ fn preview_from_rollout_items(items: &[RolloutItem]) -> String {
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
fn codex_home_from_rollout_path(path: &Path) -> Option<&Path> {
|
||||
path.ancestors().find_map(|ancestor| {
|
||||
let name = ancestor.file_name().and_then(OsStr::to_str)?;
|
||||
if name == codex_core::SESSIONS_SUBDIR || name == codex_core::ARCHIVED_SESSIONS_SUBDIR {
|
||||
ancestor.parent()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn with_thread_spawn_agent_metadata(
|
||||
source: codex_protocol::protocol::SessionSource,
|
||||
agent_nickname: Option<String>,
|
||||
|
||||
@@ -7,6 +7,10 @@ use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::SessionSource;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadArchiveResponse;
|
||||
use codex_app_server_protocol::ThreadForkParams;
|
||||
use codex_app_server_protocol::ThreadForkResponse;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadListParams;
|
||||
use codex_app_server_protocol::ThreadListResponse;
|
||||
@@ -20,6 +24,8 @@ use codex_app_server_protocol::ThreadSetNameResponse;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::ThreadUnarchiveParams;
|
||||
use codex_app_server_protocol::ThreadUnarchiveResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
@@ -152,6 +158,150 @@ async fn thread_read_can_include_turns() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_read_include_turns_keeps_fork_history_after_parent_archive_and_unarchive()
|
||||
-> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let start_id = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("mock-model".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let start_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread: parent, .. } =
|
||||
to_response::<ThreadStartResponse>(start_resp)?;
|
||||
|
||||
let turn_start_id = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: parent.id.clone(),
|
||||
input: vec![UserInput::Text {
|
||||
text: "parent message".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let turn_start_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_start_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: TurnStartResponse = to_response::<TurnStartResponse>(turn_start_resp)?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let fork_id = mcp
|
||||
.send_thread_fork_request(ThreadForkParams {
|
||||
thread_id: parent.id.clone(),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let fork_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(fork_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadForkResponse { thread: child, .. } = to_response::<ThreadForkResponse>(fork_resp)?;
|
||||
|
||||
let read_child_id = mcp
|
||||
.send_thread_read_request(ThreadReadParams {
|
||||
thread_id: child.id.clone(),
|
||||
include_turns: true,
|
||||
})
|
||||
.await?;
|
||||
let read_child_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_child_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse {
|
||||
thread: child_before_archive,
|
||||
} = to_response::<ThreadReadResponse>(read_child_resp)?;
|
||||
assert_eq!(child_before_archive.turns.len(), 1);
|
||||
|
||||
let archive_id = mcp
|
||||
.send_thread_archive_request(ThreadArchiveParams {
|
||||
thread_id: parent.id.clone(),
|
||||
})
|
||||
.await?;
|
||||
let archive_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/archived"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let read_child_id = mcp
|
||||
.send_thread_read_request(ThreadReadParams {
|
||||
thread_id: child.id.clone(),
|
||||
include_turns: true,
|
||||
})
|
||||
.await?;
|
||||
let read_child_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_child_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse {
|
||||
thread: child_after_archive,
|
||||
} = to_response::<ThreadReadResponse>(read_child_resp)?;
|
||||
assert_eq!(child_after_archive.turns, child_before_archive.turns);
|
||||
|
||||
let unarchive_id = mcp
|
||||
.send_thread_unarchive_request(ThreadUnarchiveParams {
|
||||
thread_id: parent.id,
|
||||
})
|
||||
.await?;
|
||||
let unarchive_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(unarchive_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: ThreadUnarchiveResponse = to_response::<ThreadUnarchiveResponse>(unarchive_resp)?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/unarchived"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let read_child_id = mcp
|
||||
.send_thread_read_request(ThreadReadParams {
|
||||
thread_id: child.id,
|
||||
include_turns: true,
|
||||
})
|
||||
.await?;
|
||||
let read_child_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_child_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse {
|
||||
thread: child_after_unarchive,
|
||||
} = to_response::<ThreadReadResponse>(read_child_resp)?;
|
||||
assert_eq!(child_after_unarchive.turns, child_before_archive.turns);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_read_loaded_thread_returns_precomputed_path_before_materialization() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
|
||||
@@ -14,12 +14,15 @@ use crate::session_prefix::format_subagent_context_line;
|
||||
use crate::session_prefix::format_subagent_notification_message;
|
||||
use crate::shell_snapshot::ShellSnapshot;
|
||||
use crate::thread_manager::ThreadManagerState;
|
||||
use crate::thread_rollout_truncation::fork_reference_user_message_boundary;
|
||||
use crate::thread_rollout_truncation::materialize_rollout_items_for_replay;
|
||||
use crate::thread_rollout_truncation::truncate_rollout_to_last_n_fork_turns;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::AgentPath;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::ForkReferenceItem;
|
||||
use codex_protocol::protocol::InitialHistory;
|
||||
use codex_protocol::protocol::InterAgentCommunication;
|
||||
use codex_protocol::protocol::Op;
|
||||
@@ -309,20 +312,76 @@ impl AgentControl {
|
||||
let mut forked_rollout_items = RolloutRecorder::get_rollout_history(&rollout_path)
|
||||
.await?
|
||||
.get_rollout_items();
|
||||
if let SpawnAgentForkMode::LastNTurns(last_n_turns) = fork_mode {
|
||||
forked_rollout_items =
|
||||
truncate_rollout_to_last_n_fork_turns(&forked_rollout_items, *last_n_turns);
|
||||
if forked_rollout_items
|
||||
.iter()
|
||||
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
|
||||
{
|
||||
forked_rollout_items = materialize_rollout_items_for_replay(
|
||||
config.codex_home.as_path(),
|
||||
&forked_rollout_items,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
let parent_has_matching_spawn_call = forked_rollout_items.iter().any(|item| {
|
||||
matches!(
|
||||
item,
|
||||
RolloutItem::ResponseItem(ResponseItem::FunctionCall {
|
||||
call_id: existing_call_id,
|
||||
..
|
||||
}) if existing_call_id == call_id
|
||||
)
|
||||
});
|
||||
match fork_mode {
|
||||
SpawnAgentForkMode::FullHistory => {
|
||||
let source_session_meta = forked_rollout_items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => Some(meta_line.clone()),
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
});
|
||||
let fork_boundary = fork_reference_user_message_boundary(&forked_rollout_items);
|
||||
forked_rollout_items = source_session_meta
|
||||
.into_iter()
|
||||
.map(RolloutItem::SessionMeta)
|
||||
.chain(std::iter::once(RolloutItem::ForkReference(
|
||||
ForkReferenceItem {
|
||||
rollout_path: rollout_path.clone(),
|
||||
nth_user_message: fork_boundary,
|
||||
},
|
||||
)))
|
||||
.collect();
|
||||
}
|
||||
SpawnAgentForkMode::LastNTurns(last_n_turns) => {
|
||||
forked_rollout_items =
|
||||
truncate_rollout_to_last_n_fork_turns(&forked_rollout_items, *last_n_turns);
|
||||
}
|
||||
}
|
||||
|
||||
let mut output =
|
||||
FunctionCallOutputPayload::from_text(FORKED_SPAWN_AGENT_OUTPUT_MESSAGE.to_string());
|
||||
output.success = Some(true);
|
||||
forked_rollout_items.push(RolloutItem::ResponseItem(
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.to_string(),
|
||||
output,
|
||||
},
|
||||
));
|
||||
let has_matching_spawn_call = match fork_mode {
|
||||
SpawnAgentForkMode::FullHistory => parent_has_matching_spawn_call,
|
||||
SpawnAgentForkMode::LastNTurns(_) => forked_rollout_items.iter().any(|item| {
|
||||
matches!(
|
||||
item,
|
||||
RolloutItem::ResponseItem(ResponseItem::FunctionCall {
|
||||
call_id: existing_call_id,
|
||||
..
|
||||
}) if existing_call_id == call_id
|
||||
)
|
||||
}),
|
||||
};
|
||||
if has_matching_spawn_call {
|
||||
let mut output =
|
||||
FunctionCallOutputPayload::from_text(FORKED_SPAWN_AGENT_OUTPUT_MESSAGE.to_string());
|
||||
output.success = Some(true);
|
||||
forked_rollout_items.push(RolloutItem::ResponseItem(
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.to_string(),
|
||||
output,
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
state
|
||||
.fork_thread_with_source(
|
||||
|
||||
@@ -17,6 +17,7 @@ use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::ErrorEvent;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ForkReferenceItem;
|
||||
use codex_protocol::protocol::InterAgentCommunication;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
@@ -930,6 +931,136 @@ async fn spawn_agent_fork_last_n_turns_keeps_only_recent_turns() {
|
||||
.expect("parent shutdown should submit");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_agent_fork_snapshots_parent_boundary_for_persisted_fork_reference() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let (parent_thread_id, parent_thread) = harness.start_thread().await;
|
||||
parent_thread
|
||||
.inject_user_message_without_turn("parent seed context".to_string())
|
||||
.await;
|
||||
let turn_context = parent_thread.codex.session.new_default_turn().await;
|
||||
let parent_spawn_call_id = "spawn-call-dedup".to_string();
|
||||
let parent_spawn_call = ResponseItem::FunctionCall {
|
||||
id: None,
|
||||
name: "spawn_agent".to_string(),
|
||||
namespace: None,
|
||||
arguments: "{}".to_string(),
|
||||
call_id: parent_spawn_call_id.clone(),
|
||||
};
|
||||
parent_thread
|
||||
.codex
|
||||
.session
|
||||
.record_conversation_items(turn_context.as_ref(), &[parent_spawn_call])
|
||||
.await;
|
||||
parent_thread
|
||||
.codex
|
||||
.session
|
||||
.ensure_rollout_materialized()
|
||||
.await;
|
||||
parent_thread.codex.session.flush_rollout().await;
|
||||
let parent_rollout_path = parent_thread
|
||||
.rollout_path()
|
||||
.expect("parent rollout path should be available");
|
||||
|
||||
let child_thread_id = harness
|
||||
.control
|
||||
.spawn_agent_with_metadata(
|
||||
harness.config.clone(),
|
||||
text_input("child task"),
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 1,
|
||||
agent_path: None,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
})),
|
||||
SpawnAgentOptions {
|
||||
fork_parent_spawn_call_id: Some(parent_spawn_call_id),
|
||||
fork_mode: Some(SpawnAgentForkMode::FullHistory),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("forked spawn should succeed")
|
||||
.thread_id;
|
||||
|
||||
parent_thread
|
||||
.inject_user_message_without_turn("parent late turn".to_string())
|
||||
.await;
|
||||
parent_thread
|
||||
.codex
|
||||
.session
|
||||
.ensure_rollout_materialized()
|
||||
.await;
|
||||
parent_thread.codex.session.flush_rollout().await;
|
||||
|
||||
let child_thread = harness
|
||||
.manager
|
||||
.get_thread(child_thread_id)
|
||||
.await
|
||||
.expect("child thread should be registered");
|
||||
let child_rollout_path = child_thread
|
||||
.rollout_path()
|
||||
.expect("child rollout path should be available");
|
||||
let InitialHistory::Resumed(resumed) =
|
||||
RolloutRecorder::get_rollout_history(child_rollout_path.as_path())
|
||||
.await
|
||||
.expect("child rollout should load")
|
||||
else {
|
||||
panic!("child rollout should include session metadata");
|
||||
};
|
||||
|
||||
assert_matches!(
|
||||
resumed.history.as_slice(),
|
||||
[
|
||||
RolloutItem::SessionMeta(meta_line),
|
||||
RolloutItem::ForkReference(ForkReferenceItem {
|
||||
rollout_path,
|
||||
nth_user_message: 1,
|
||||
}),
|
||||
RolloutItem::ResponseItem(ResponseItem::FunctionCallOutput {
|
||||
call_id,
|
||||
output,
|
||||
}),
|
||||
..
|
||||
] if meta_line.meta.id == child_thread_id
|
||||
&& meta_line.meta.forked_from_id == Some(parent_thread_id)
|
||||
&& rollout_path == &parent_rollout_path
|
||||
&& call_id == "spawn-call-dedup"
|
||||
&& output.text_content() == Some(FORKED_SPAWN_AGENT_OUTPUT_MESSAGE)
|
||||
);
|
||||
let materialized_child_rollout =
|
||||
crate::rollout::truncation::materialize_rollout_items_for_replay(
|
||||
harness.config.codex_home.as_path(),
|
||||
&resumed.history,
|
||||
)
|
||||
.await;
|
||||
let materialized_child_response_items: Vec<ResponseItem> = materialized_child_rollout
|
||||
.iter()
|
||||
.filter_map(|item| match item {
|
||||
RolloutItem::ResponseItem(response_item) => Some(response_item.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.collect();
|
||||
assert!(history_contains_text(
|
||||
&materialized_child_response_items,
|
||||
"parent seed context",
|
||||
));
|
||||
assert!(!history_contains_text(
|
||||
&materialized_child_response_items,
|
||||
"parent late turn",
|
||||
));
|
||||
|
||||
let _ = harness
|
||||
.control
|
||||
.shutdown_live_agent(child_thread_id)
|
||||
.await
|
||||
.expect("child shutdown should submit");
|
||||
let _ = parent_thread
|
||||
.submit(Op::Shutdown {})
|
||||
.await
|
||||
.expect("parent shutdown should submit");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_agent_respects_max_threads_limit() {
|
||||
let max_threads = 1usize;
|
||||
|
||||
@@ -2182,6 +2182,11 @@ impl Session {
|
||||
state.clear_connector_selection();
|
||||
}
|
||||
|
||||
async fn set_connector_selection(&self, connector_ids: HashSet<String>) {
|
||||
self.clear_connector_selection().await;
|
||||
self.merge_connector_selection(connector_ids).await;
|
||||
}
|
||||
|
||||
async fn record_initial_history(&self, conversation_history: InitialHistory) {
|
||||
let turn_context = self.new_default_turn().await;
|
||||
let is_subagent = {
|
||||
@@ -2200,8 +2205,19 @@ impl Session {
|
||||
}
|
||||
InitialHistory::Resumed(resumed_history) => {
|
||||
let rollout_items = resumed_history.history;
|
||||
let hydrated_rollout_items = if rollout_items
|
||||
.iter()
|
||||
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
|
||||
{
|
||||
self.materialize_rollout_items_for_replay(&rollout_items)
|
||||
.await
|
||||
} else {
|
||||
rollout_items.clone()
|
||||
};
|
||||
let restored_connector_selection =
|
||||
Self::extract_connector_selection_from_rollout(&hydrated_rollout_items);
|
||||
let previous_turn_settings = self
|
||||
.apply_rollout_reconstruction(&turn_context, &rollout_items)
|
||||
.apply_rollout_reconstruction(&turn_context, &hydrated_rollout_items)
|
||||
.await;
|
||||
|
||||
// If resuming, warn when the last recorded model differs from the current one.
|
||||
@@ -2226,10 +2242,13 @@ impl Session {
|
||||
|
||||
// Seed usage info from the recorded rollout so UIs can show token counts
|
||||
// immediately on resume/fork.
|
||||
if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) {
|
||||
if let Some(info) = Self::last_token_info_from_rollout(&hydrated_rollout_items) {
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_token_info(Some(info));
|
||||
}
|
||||
if let Some(selected_connectors) = restored_connector_selection {
|
||||
self.set_connector_selection(selected_connectors).await;
|
||||
}
|
||||
|
||||
// Defer seeding the session's initial context until the first turn starts so
|
||||
// turn/start overrides can be merged before we write to the rollout.
|
||||
@@ -2238,18 +2257,40 @@ impl Session {
|
||||
}
|
||||
}
|
||||
InitialHistory::Forked(rollout_items) => {
|
||||
self.apply_rollout_reconstruction(&turn_context, &rollout_items)
|
||||
let persisted_rollout_items = rollout_items
|
||||
.iter()
|
||||
.position(|item| matches!(item, RolloutItem::ForkReference(_)))
|
||||
.map(|index| rollout_items[index..].to_vec());
|
||||
let hydrated_rollout_items = if rollout_items
|
||||
.iter()
|
||||
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
|
||||
{
|
||||
self.materialize_rollout_items_for_replay(&rollout_items)
|
||||
.await
|
||||
} else {
|
||||
rollout_items.clone()
|
||||
};
|
||||
let restored_connector_selection =
|
||||
Self::extract_connector_selection_from_rollout(&hydrated_rollout_items);
|
||||
|
||||
self.apply_rollout_reconstruction(&turn_context, &hydrated_rollout_items)
|
||||
.await;
|
||||
|
||||
// Seed usage info from the recorded rollout so UIs can show token counts
|
||||
// immediately on resume/fork.
|
||||
if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) {
|
||||
if let Some(info) = Self::last_token_info_from_rollout(&hydrated_rollout_items) {
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_token_info(Some(info));
|
||||
}
|
||||
if let Some(selected_connectors) = restored_connector_selection {
|
||||
self.set_connector_selection(selected_connectors).await;
|
||||
}
|
||||
|
||||
// If persisting, persist all rollout items as-is (recorder filters)
|
||||
if !rollout_items.is_empty() {
|
||||
// Persist only the compact fork reference suffix so child rollouts do not
|
||||
// duplicate the full parent history they inherited in memory.
|
||||
if let Some(persisted_rollout_items) = persisted_rollout_items {
|
||||
self.persist_rollout_items(&persisted_rollout_items).await;
|
||||
} else if !rollout_items.is_empty() {
|
||||
self.persist_rollout_items(&rollout_items).await;
|
||||
}
|
||||
|
||||
@@ -2290,6 +2331,41 @@ impl Session {
|
||||
})
|
||||
}
|
||||
|
||||
fn extract_connector_selection_from_rollout(
|
||||
rollout_items: &[RolloutItem],
|
||||
) -> Option<HashSet<String>> {
|
||||
let mut active_selected_connectors: Option<HashSet<String>> = None;
|
||||
|
||||
for item in rollout_items {
|
||||
let RolloutItem::ResponseItem(response_item) = item else {
|
||||
continue;
|
||||
};
|
||||
let ResponseItem::FunctionCallOutput { output, .. } = response_item else {
|
||||
continue;
|
||||
};
|
||||
let Some(content) = output.body.to_text() else {
|
||||
continue;
|
||||
};
|
||||
let Ok(payload) = serde_json::from_str::<Value>(&content) else {
|
||||
continue;
|
||||
};
|
||||
let Some(selected_connectors) = payload
|
||||
.get("active_selected_tools")
|
||||
.and_then(Value::as_array)
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
let connector_ids = selected_connectors
|
||||
.iter()
|
||||
.filter_map(Value::as_str)
|
||||
.map(ToOwned::to_owned)
|
||||
.collect::<HashSet<_>>();
|
||||
active_selected_connectors = Some(connector_ids);
|
||||
}
|
||||
|
||||
active_selected_connectors
|
||||
}
|
||||
|
||||
async fn previous_turn_settings(&self) -> Option<PreviousTurnSettings> {
|
||||
let state = self.state.lock().await;
|
||||
state.previous_turn_settings()
|
||||
|
||||
@@ -84,11 +84,34 @@ fn finalize_active_segment<'a>(
|
||||
}
|
||||
|
||||
impl Session {
|
||||
pub(super) async fn materialize_rollout_items_for_replay(
|
||||
&self,
|
||||
rollout_items: &[RolloutItem],
|
||||
) -> Vec<RolloutItem> {
|
||||
let codex_home = {
|
||||
self.state
|
||||
.lock()
|
||||
.await
|
||||
.session_configuration
|
||||
.codex_home
|
||||
.clone()
|
||||
};
|
||||
crate::rollout::truncation::materialize_rollout_items_for_replay(
|
||||
codex_home.as_path(),
|
||||
rollout_items,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(super) async fn reconstruct_history_from_rollout(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
rollout_items: &[RolloutItem],
|
||||
) -> RolloutReconstruction {
|
||||
let rollout_items = self
|
||||
.materialize_rollout_items_for_replay(rollout_items)
|
||||
.await;
|
||||
let rollout_items = rollout_items.as_slice();
|
||||
// Replay metadata should already match the shape of the future lazy reverse loader, even
|
||||
// while history materialization still uses an eager bridge. Scan newest-to-oldest,
|
||||
// stopping once a surviving replacement-history checkpoint and the required resume metadata
|
||||
@@ -207,7 +230,9 @@ impl Session {
|
||||
active_segment.get_or_insert_with(ActiveReplaySegment::default);
|
||||
active_segment.counts_as_user_turn |= is_user_turn_boundary(response_item);
|
||||
}
|
||||
RolloutItem::EventMsg(_) | RolloutItem::SessionMeta(_) => {}
|
||||
RolloutItem::EventMsg(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::SessionMeta(_) => {}
|
||||
}
|
||||
|
||||
if base_replacement_history.is_some()
|
||||
@@ -275,6 +300,7 @@ impl Session {
|
||||
history.drop_last_n_user_turns(rollback.num_turns);
|
||||
}
|
||||
RolloutItem::EventMsg(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::SessionMeta(_) => {}
|
||||
}
|
||||
|
||||
@@ -5,11 +5,19 @@ use codex_protocol::ThreadId;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::CompactedItem;
|
||||
use codex_protocol::protocol::ForkReferenceItem;
|
||||
use codex_protocol::protocol::InitialHistory;
|
||||
use codex_protocol::protocol::InterAgentCommunication;
|
||||
use codex_protocol::protocol::ResumedHistory;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn user_message(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
@@ -54,6 +62,53 @@ fn inter_agent_assistant_message(text: &str) -> ResponseItem {
|
||||
}
|
||||
}
|
||||
|
||||
fn write_rollout_items(
|
||||
root: &Path,
|
||||
thread_id: ThreadId,
|
||||
items: &[RolloutItem],
|
||||
) -> std::io::Result<PathBuf> {
|
||||
let rollout_dir = root
|
||||
.join(crate::SESSIONS_SUBDIR)
|
||||
.join("2026")
|
||||
.join("03")
|
||||
.join("05");
|
||||
std::fs::create_dir_all(&rollout_dir)?;
|
||||
let rollout_path = rollout_dir.join(format!("rollout-2026-03-05T00-00-00-{thread_id}.jsonl"));
|
||||
let session_meta_line = RolloutLine {
|
||||
timestamp: "2026-03-05T00:00:00Z".to_string(),
|
||||
item: RolloutItem::SessionMeta(SessionMetaLine {
|
||||
meta: SessionMeta {
|
||||
id: thread_id,
|
||||
timestamp: "2026-03-05T00:00:00Z".to_string(),
|
||||
cwd: root.to_path_buf(),
|
||||
originator: "codex".to_string(),
|
||||
cli_version: "test".to_string(),
|
||||
source: SessionSource::Exec,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
agent_path: None,
|
||||
model_provider: Some("openai".to_string()),
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: None,
|
||||
forked_from_id: None,
|
||||
},
|
||||
git: None,
|
||||
}),
|
||||
};
|
||||
let mut text = format!("{}\n", serde_json::to_string(&session_meta_line).unwrap());
|
||||
for item in items {
|
||||
let line = RolloutLine {
|
||||
timestamp: "2026-03-05T00:00:01Z".to_string(),
|
||||
item: item.clone(),
|
||||
};
|
||||
text.push_str(&serde_json::to_string(&line).unwrap());
|
||||
text.push('\n');
|
||||
}
|
||||
std::fs::write(&rollout_path, text)?;
|
||||
Ok(rollout_path)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previous_turn_settings()
|
||||
{
|
||||
@@ -93,6 +148,125 @@ async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previ
|
||||
assert!(session.reference_context_item().await.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reconstruct_history_materializes_fork_reference_rollout_items() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
let dir = TempDir::new().expect("create temp dir");
|
||||
let parent_thread_id = ThreadId::new();
|
||||
let parent_rollout_path = write_rollout_items(
|
||||
dir.path(),
|
||||
parent_thread_id,
|
||||
&[
|
||||
RolloutItem::ResponseItem(user_message("first user")),
|
||||
RolloutItem::ResponseItem(assistant_message("first reply")),
|
||||
RolloutItem::ResponseItem(user_message("second user")),
|
||||
RolloutItem::ResponseItem(assistant_message("second reply")),
|
||||
],
|
||||
)
|
||||
.expect("write parent rollout");
|
||||
let rollout_items = vec![RolloutItem::ForkReference(ForkReferenceItem {
|
||||
rollout_path: parent_rollout_path,
|
||||
nth_user_message: 1,
|
||||
})];
|
||||
|
||||
let reconstructed = session
|
||||
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
|
||||
.await;
|
||||
|
||||
assert_eq!(
|
||||
reconstructed.history,
|
||||
vec![user_message("first user"), assistant_message("first reply")]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn record_initial_history_forked_materializes_fork_reference_rollout_items() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
let codex_home = turn_context.config.codex_home.clone();
|
||||
let parent_thread_id = ThreadId::new();
|
||||
let parent_rollout_path = write_rollout_items(
|
||||
codex_home.as_path(),
|
||||
parent_thread_id,
|
||||
&[
|
||||
RolloutItem::ResponseItem(user_message("first user")),
|
||||
RolloutItem::ResponseItem(assistant_message("first reply")),
|
||||
RolloutItem::ResponseItem(user_message("second user")),
|
||||
RolloutItem::ResponseItem(assistant_message("second reply")),
|
||||
],
|
||||
)
|
||||
.expect("write parent rollout");
|
||||
let rollout_items = vec![RolloutItem::ForkReference(ForkReferenceItem {
|
||||
rollout_path: parent_rollout_path,
|
||||
nth_user_message: 1,
|
||||
})];
|
||||
|
||||
session
|
||||
.record_initial_history(InitialHistory::Forked(rollout_items))
|
||||
.await;
|
||||
|
||||
let history = session.state.lock().await.clone_history();
|
||||
assert_eq!(
|
||||
vec![user_message("first user"), assistant_message("first reply")],
|
||||
history.raw_items()
|
||||
);
|
||||
assert!(session.reference_context_item().await.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reconstruct_history_resolves_fork_reference_after_parent_archive_and_unarchive() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
let codex_home = turn_context.config.codex_home.clone();
|
||||
let parent_thread_id = ThreadId::new();
|
||||
let parent_rollout_path = write_rollout_items(
|
||||
codex_home.as_path(),
|
||||
parent_thread_id,
|
||||
&[
|
||||
RolloutItem::ResponseItem(user_message("first user")),
|
||||
RolloutItem::ResponseItem(assistant_message("first reply")),
|
||||
RolloutItem::ResponseItem(user_message("second user")),
|
||||
RolloutItem::ResponseItem(assistant_message("second reply")),
|
||||
],
|
||||
)
|
||||
.expect("write parent rollout");
|
||||
let rollout_items = vec![RolloutItem::ForkReference(ForkReferenceItem {
|
||||
rollout_path: parent_rollout_path.clone(),
|
||||
nth_user_message: 1,
|
||||
})];
|
||||
let expected_history = vec![user_message("first user"), assistant_message("first reply")];
|
||||
|
||||
let archived_rollout_dir = codex_home
|
||||
.join(crate::ARCHIVED_SESSIONS_SUBDIR)
|
||||
.join("2026")
|
||||
.join("03")
|
||||
.join("05");
|
||||
std::fs::create_dir_all(&archived_rollout_dir).expect("create archived rollout dir");
|
||||
let archived_rollout_path = archived_rollout_dir.join(
|
||||
parent_rollout_path
|
||||
.file_name()
|
||||
.expect("parent rollout file name"),
|
||||
);
|
||||
std::fs::rename(&parent_rollout_path, &archived_rollout_path).expect("archive parent rollout");
|
||||
|
||||
let reconstructed = session
|
||||
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
|
||||
.await;
|
||||
assert_eq!(reconstructed.history, expected_history);
|
||||
|
||||
let unarchived_rollout_dir = codex_home
|
||||
.join(crate::SESSIONS_SUBDIR)
|
||||
.join("2026")
|
||||
.join("03")
|
||||
.join("05");
|
||||
std::fs::create_dir_all(&unarchived_rollout_dir).expect("create unarchived rollout dir");
|
||||
std::fs::rename(&archived_rollout_path, &parent_rollout_path)
|
||||
.expect("unarchive parent rollout");
|
||||
|
||||
let reconstructed = session
|
||||
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
|
||||
.await;
|
||||
assert_eq!(reconstructed.history, expected_history);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn record_initial_history_resumed_hydrates_previous_turn_settings_from_lifecycle_turn_with_missing_turn_context_id()
|
||||
{
|
||||
|
||||
@@ -159,8 +159,10 @@ pub use rollout::list::parse_cursor;
|
||||
pub use rollout::list::read_head_for_summary;
|
||||
pub use rollout::list::read_session_meta_line;
|
||||
pub use rollout::policy::EventPersistenceMode;
|
||||
pub use rollout::resolve_fork_reference_rollout_path;
|
||||
pub use rollout::rollout_date_parts;
|
||||
pub use rollout::session_index::find_thread_names_by_ids;
|
||||
pub use thread_rollout_truncation::materialize_rollout_items_for_replay;
|
||||
mod function_tool;
|
||||
mod state;
|
||||
mod tasks;
|
||||
|
||||
@@ -12,6 +12,7 @@ pub use codex_rollout::find_conversation_path_by_id_str;
|
||||
pub use codex_rollout::find_thread_name_by_id;
|
||||
pub use codex_rollout::find_thread_path_by_id_str;
|
||||
pub use codex_rollout::find_thread_path_by_name_str;
|
||||
pub use codex_rollout::resolve_fork_reference_rollout_path;
|
||||
pub use codex_rollout::rollout_date_parts;
|
||||
|
||||
impl codex_rollout::RolloutConfigView for Config {
|
||||
|
||||
@@ -33,6 +33,7 @@ use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::openai_models::ModelPreset;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ForkReferenceItem;
|
||||
use codex_protocol::protocol::InitialHistory;
|
||||
use codex_protocol::protocol::McpServerRefreshConfig;
|
||||
use codex_protocol::protocol::Op;
|
||||
@@ -45,6 +46,7 @@ use codex_protocol::protocol::W3cTraceContext;
|
||||
use futures::StreamExt;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
@@ -607,18 +609,22 @@ impl ThreadManager {
|
||||
S: Into<ForkSnapshot>,
|
||||
{
|
||||
let snapshot = snapshot.into();
|
||||
let history = RolloutRecorder::get_rollout_history(&path).await?;
|
||||
// True forks must discard the source rollout's conversation id so the child gets a
|
||||
// distinct thread id and preserves `forked_from_id` in its SessionMeta. Using the
|
||||
// resume loader here silently turns a fork into an in-place resume.
|
||||
let history = RolloutRecorder::get_fork_history(&path).await?;
|
||||
let snapshot_state = snapshot_turn_state(&history);
|
||||
let history = match snapshot {
|
||||
let mut history = match snapshot {
|
||||
ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message) => {
|
||||
truncate_before_nth_user_message(history, nth_user_message, &snapshot_state)
|
||||
truncate_before_nth_user_message(
|
||||
config.codex_home.as_path(),
|
||||
history,
|
||||
i64::try_from(nth_user_message).unwrap_or(i64::MAX),
|
||||
&snapshot_state,
|
||||
)
|
||||
.await
|
||||
}
|
||||
ForkSnapshot::Interrupted => {
|
||||
let history = match history {
|
||||
InitialHistory::New => InitialHistory::New,
|
||||
InitialHistory::Forked(history) => InitialHistory::Forked(history),
|
||||
InitialHistory::Resumed(resumed) => InitialHistory::Forked(resumed.history),
|
||||
};
|
||||
if snapshot_state.ends_mid_turn {
|
||||
append_interrupted_boundary(history, snapshot_state.active_turn_id)
|
||||
} else {
|
||||
@@ -626,6 +632,33 @@ impl ThreadManager {
|
||||
}
|
||||
}
|
||||
};
|
||||
if let (
|
||||
ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message),
|
||||
InitialHistory::Forked(items),
|
||||
) = (snapshot, &mut history)
|
||||
{
|
||||
let source_session_meta = items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => Some(meta_line.clone()),
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
});
|
||||
// Keep the source SessionMeta in-memory so startup can derive `forked_from_id`
|
||||
// for SessionConfigured while still persisting only the compact ForkReference
|
||||
// suffix to the child rollout on disk.
|
||||
*items = source_session_meta
|
||||
.into_iter()
|
||||
.map(RolloutItem::SessionMeta)
|
||||
.chain(std::iter::once(RolloutItem::ForkReference(
|
||||
ForkReferenceItem {
|
||||
rollout_path: path.clone(),
|
||||
nth_user_message: i64::try_from(nth_user_message).unwrap_or(i64::MAX),
|
||||
},
|
||||
)))
|
||||
.collect();
|
||||
}
|
||||
Box::pin(self.state.spawn_thread(
|
||||
config,
|
||||
history,
|
||||
@@ -918,14 +951,23 @@ impl ThreadManagerState {
|
||||
/// when the source thread is currently mid-turn they fall back to cutting
|
||||
/// before the active turn's opening boundary so the fork omits the unfinished
|
||||
/// suffix entirely.
|
||||
fn truncate_before_nth_user_message(
|
||||
async fn truncate_before_nth_user_message(
|
||||
codex_home: &Path,
|
||||
history: InitialHistory,
|
||||
n: usize,
|
||||
n: i64,
|
||||
snapshot_state: &SnapshotTurnState,
|
||||
) -> InitialHistory {
|
||||
let items: Vec<RolloutItem> = history.get_rollout_items();
|
||||
let mut items: Vec<RolloutItem> = history.get_rollout_items();
|
||||
if items
|
||||
.iter()
|
||||
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
|
||||
{
|
||||
items = truncation::materialize_rollout_items_for_replay(codex_home, &items).await;
|
||||
}
|
||||
let user_positions = truncation::user_message_positions_in_rollout(&items);
|
||||
let rolled = if snapshot_state.ends_mid_turn && n >= user_positions.len() {
|
||||
let rolled = if snapshot_state.ends_mid_turn
|
||||
&& usize::try_from(n).map_or(true, |n| n >= user_positions.len())
|
||||
{
|
||||
if let Some(cut_idx) = snapshot_state
|
||||
.active_turn_start_index
|
||||
.or_else(|| user_positions.last().copied())
|
||||
|
||||
@@ -3,7 +3,6 @@ use crate::codex::make_session_and_context;
|
||||
use crate::config::test_config;
|
||||
use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig;
|
||||
use crate::models_manager::manager::RefreshStrategy;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::tasks::interrupted_turn_history_marker;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ReasoningItemReasoningSummary;
|
||||
@@ -15,6 +14,7 @@ use codex_protocol::protocol::UserMessageEvent;
|
||||
use core_test_support::PathExt;
|
||||
use core_test_support::responses::mount_models_once;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use tempfile::tempdir;
|
||||
use wiremock::MockServer;
|
||||
@@ -42,8 +42,8 @@ fn assistant_msg(text: &str) -> ResponseItem {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn truncates_before_requested_user_message() {
|
||||
#[tokio::test]
|
||||
async fn truncates_before_requested_user_message() {
|
||||
let items = [
|
||||
user_msg("u1"),
|
||||
assistant_msg("a1"),
|
||||
@@ -74,6 +74,7 @@ fn truncates_before_requested_user_message() {
|
||||
.map(RolloutItem::ResponseItem)
|
||||
.collect();
|
||||
let truncated = truncate_before_nth_user_message(
|
||||
Path::new("/tmp"),
|
||||
InitialHistory::Forked(initial),
|
||||
/*n*/ 1,
|
||||
&SnapshotTurnState {
|
||||
@@ -81,7 +82,8 @@ fn truncates_before_requested_user_message() {
|
||||
active_turn_id: None,
|
||||
active_turn_start_index: None,
|
||||
},
|
||||
);
|
||||
)
|
||||
.await;
|
||||
let got_items = truncated.get_rollout_items();
|
||||
let expected_items = vec![
|
||||
RolloutItem::ResponseItem(items[0].clone()),
|
||||
@@ -99,6 +101,7 @@ fn truncates_before_requested_user_message() {
|
||||
.map(RolloutItem::ResponseItem)
|
||||
.collect();
|
||||
let truncated2 = truncate_before_nth_user_message(
|
||||
Path::new("/tmp"),
|
||||
InitialHistory::Forked(initial2.clone()),
|
||||
/*n*/ 2,
|
||||
&SnapshotTurnState {
|
||||
@@ -106,15 +109,16 @@ fn truncates_before_requested_user_message() {
|
||||
active_turn_id: None,
|
||||
active_turn_start_index: None,
|
||||
},
|
||||
);
|
||||
)
|
||||
.await;
|
||||
assert_eq!(
|
||||
serde_json::to_value(truncated2.get_rollout_items()).unwrap(),
|
||||
serde_json::to_value(initial2).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn out_of_range_truncation_drops_only_unfinished_suffix_mid_turn() {
|
||||
#[tokio::test]
|
||||
async fn out_of_range_truncation_drops_only_unfinished_suffix_mid_turn() {
|
||||
let items = vec![
|
||||
RolloutItem::ResponseItem(user_msg("u1")),
|
||||
RolloutItem::ResponseItem(assistant_msg("a1")),
|
||||
@@ -123,14 +127,16 @@ fn out_of_range_truncation_drops_only_unfinished_suffix_mid_turn() {
|
||||
];
|
||||
|
||||
let truncated = truncate_before_nth_user_message(
|
||||
Path::new("/tmp"),
|
||||
InitialHistory::Forked(items.clone()),
|
||||
usize::MAX,
|
||||
/*n*/ -1,
|
||||
&SnapshotTurnState {
|
||||
ends_mid_turn: true,
|
||||
active_turn_id: None,
|
||||
active_turn_start_index: None,
|
||||
},
|
||||
);
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_value(truncated.get_rollout_items()).unwrap(),
|
||||
@@ -157,8 +163,8 @@ fn fork_thread_accepts_legacy_usize_snapshot_argument() {
|
||||
let _: fn(&ThreadManager, Config, std::path::PathBuf) = assert_legacy_snapshot_callsite;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn out_of_range_truncation_drops_pre_user_active_turn_prefix() {
|
||||
#[tokio::test]
|
||||
async fn out_of_range_truncation_drops_pre_user_active_turn_prefix() {
|
||||
let items = vec![
|
||||
RolloutItem::ResponseItem(user_msg("u1")),
|
||||
RolloutItem::ResponseItem(assistant_msg("a1")),
|
||||
@@ -182,10 +188,12 @@ fn out_of_range_truncation_drops_pre_user_active_turn_prefix() {
|
||||
);
|
||||
|
||||
let truncated = truncate_before_nth_user_message(
|
||||
Path::new("/tmp"),
|
||||
InitialHistory::Forked(items.clone()),
|
||||
usize::MAX,
|
||||
/*n*/ -1,
|
||||
&snapshot_state,
|
||||
);
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_value(truncated.get_rollout_items()).unwrap(),
|
||||
@@ -209,6 +217,7 @@ async fn ignores_session_prefix_messages_when_truncating() {
|
||||
.collect();
|
||||
|
||||
let truncated = truncate_before_nth_user_message(
|
||||
Path::new("/tmp"),
|
||||
InitialHistory::Forked(rollout_items),
|
||||
/*n*/ 1,
|
||||
&SnapshotTurnState {
|
||||
@@ -216,7 +225,8 @@ async fn ignores_session_prefix_messages_when_truncating() {
|
||||
active_turn_id: None,
|
||||
active_turn_start_index: None,
|
||||
},
|
||||
);
|
||||
)
|
||||
.await;
|
||||
let got_items = truncated.get_rollout_items();
|
||||
|
||||
let expected: Vec<RolloutItem> = vec![
|
||||
|
||||
@@ -5,11 +5,15 @@
|
||||
|
||||
use crate::context_manager::is_user_turn_boundary;
|
||||
use crate::event_mapping;
|
||||
use crate::resolve_fork_reference_rollout_path;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::InterAgentCommunication;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use std::path::Path;
|
||||
use tracing::warn;
|
||||
|
||||
/// Return the indices of user message boundaries in a rollout.
|
||||
///
|
||||
@@ -88,21 +92,30 @@ pub(crate) fn fork_turn_positions_in_rollout(items: &[RolloutItem]) -> Vec<usize
|
||||
fork_turn_positions
|
||||
}
|
||||
|
||||
/// Return the current fork boundary to persist in a `ForkReferenceItem`.
|
||||
///
|
||||
/// The boundary is the next user-message index in the current effective rollout. Persisting that
|
||||
/// snapshot, instead of a "live tail" sentinel, prevents later parent turns from being pulled
|
||||
/// into the child when the child rollout is replayed.
|
||||
pub(crate) fn fork_reference_user_message_boundary(items: &[RolloutItem]) -> i64 {
|
||||
i64::try_from(user_message_positions_in_rollout(items).len()).unwrap_or(i64::MAX)
|
||||
}
|
||||
|
||||
/// Return a prefix of `items` obtained by cutting strictly before the nth user message.
|
||||
///
|
||||
/// The boundary index is 0-based from the start of `items` (so `n_from_start = 0` returns
|
||||
/// a prefix that excludes the first user message and everything after it).
|
||||
///
|
||||
/// If `n_from_start` is `usize::MAX`, this returns the full rollout (no truncation).
|
||||
/// If `n_from_start` is negative, this returns the full rollout (no truncation).
|
||||
/// If fewer than or equal to `n_from_start` user messages exist, this returns the full
|
||||
/// rollout unchanged.
|
||||
pub(crate) fn truncate_rollout_before_nth_user_message_from_start(
|
||||
items: &[RolloutItem],
|
||||
n_from_start: usize,
|
||||
n_from_start: i64,
|
||||
) -> Vec<RolloutItem> {
|
||||
if n_from_start == usize::MAX {
|
||||
let Ok(n_from_start) = usize::try_from(n_from_start) else {
|
||||
return items.to_vec();
|
||||
}
|
||||
};
|
||||
|
||||
let user_positions = user_message_positions_in_rollout(items);
|
||||
|
||||
@@ -153,6 +166,85 @@ fn is_trigger_turn_boundary(item: &ResponseItem) -> bool {
|
||||
.is_some_and(|communication| communication.trigger_turn)
|
||||
}
|
||||
|
||||
/// Expand `ForkReference` items into the referenced parent rollout slices they encode.
|
||||
///
|
||||
/// This preserves child rollout compactness on disk while letting replay callers rebuild the
|
||||
/// effective inherited transcript before reconstructing conversation history or deriving thread
|
||||
/// summaries.
|
||||
pub async fn materialize_rollout_items_for_replay(
|
||||
codex_home: &Path,
|
||||
rollout_items: &[RolloutItem],
|
||||
) -> Vec<RolloutItem> {
|
||||
const MAX_FORK_REFERENCE_DEPTH: usize = 8;
|
||||
|
||||
let mut materialized = Vec::new();
|
||||
let mut stack: Vec<(Vec<RolloutItem>, usize, usize)> = vec![(rollout_items.to_vec(), 0, 0)];
|
||||
|
||||
while let Some((items, mut idx, depth)) = stack.pop() {
|
||||
while idx < items.len() {
|
||||
match &items[idx] {
|
||||
RolloutItem::ForkReference(reference) => {
|
||||
if depth >= MAX_FORK_REFERENCE_DEPTH {
|
||||
warn!(
|
||||
"skipping fork reference recursion at depth {} for {:?}",
|
||||
depth, reference.rollout_path
|
||||
);
|
||||
materialized.push(RolloutItem::ForkReference(reference.clone()));
|
||||
idx += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
let resolved_rollout_path = match resolve_fork_reference_rollout_path(
|
||||
codex_home,
|
||||
&reference.rollout_path,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(path) => path,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to resolve fork reference rollout {:?}: {err}",
|
||||
reference.rollout_path
|
||||
);
|
||||
materialized.push(RolloutItem::ForkReference(reference.clone()));
|
||||
idx += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let parent_history = match RolloutRecorder::get_rollout_history(
|
||||
&resolved_rollout_path,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(history) => history,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to load fork reference rollout {:?} (resolved from {:?}): {err}",
|
||||
resolved_rollout_path, reference.rollout_path
|
||||
);
|
||||
materialized.push(RolloutItem::ForkReference(reference.clone()));
|
||||
idx += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let parent_items = truncate_rollout_before_nth_user_message_from_start(
|
||||
&parent_history.get_rollout_items(),
|
||||
reference.nth_user_message,
|
||||
);
|
||||
|
||||
stack.push((items, idx + 1, depth));
|
||||
stack.push((parent_items, 0, depth + 1));
|
||||
break;
|
||||
}
|
||||
item => materialized.push(item.clone()),
|
||||
}
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
|
||||
materialized
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "thread_rollout_truncation_tests.rs"]
|
||||
mod tests;
|
||||
|
||||
@@ -3,9 +3,11 @@ use crate::codex::make_session_and_context;
|
||||
use codex_protocol::AgentPath;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ReasoningItemReasoningSummary;
|
||||
use codex_protocol::protocol::ForkReferenceItem;
|
||||
use codex_protocol::protocol::InterAgentCommunication;
|
||||
use codex_protocol::protocol::ThreadRolledBackEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn user_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
@@ -102,7 +104,8 @@ fn truncation_max_keeps_full_rollout() {
|
||||
RolloutItem::ResponseItem(user_msg("u2")),
|
||||
];
|
||||
|
||||
let truncated = truncate_rollout_before_nth_user_message_from_start(&rollout, usize::MAX);
|
||||
let truncated =
|
||||
truncate_rollout_before_nth_user_message_from_start(&rollout, /*n_from_start*/ -1);
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_value(&truncated).unwrap(),
|
||||
@@ -316,3 +319,25 @@ fn truncates_rollout_to_last_n_fork_turns_keeps_full_rollout_when_n_is_large() {
|
||||
serde_json::to_value(&rollout).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn materialize_rollout_items_for_replay_preserves_unresolved_fork_references() {
|
||||
let codex_home = TempDir::new().unwrap();
|
||||
let fork_reference = RolloutItem::ForkReference(ForkReferenceItem {
|
||||
rollout_path: "missing-rollout.jsonl".into(),
|
||||
nth_user_message: 1,
|
||||
});
|
||||
let rollout_items = vec![
|
||||
RolloutItem::ResponseItem(user_msg("u1")),
|
||||
fork_reference.clone(),
|
||||
RolloutItem::ResponseItem(assistant_msg("a1")),
|
||||
];
|
||||
|
||||
let materialized =
|
||||
materialize_rollout_items_for_replay(codex_home.path(), &rollout_items).await;
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_value(&materialized).unwrap(),
|
||||
serde_json::to_value(&rollout_items).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -2,6 +2,8 @@ use codex_core::ForkSnapshot;
|
||||
use codex_core::NewThread;
|
||||
use codex_core::parse_turn_item;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
@@ -19,6 +21,95 @@ use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
fn find_user_input_positions(items: &[RolloutItem]) -> Vec<usize> {
|
||||
let mut pos = Vec::new();
|
||||
for (i, it) in items.iter().enumerate() {
|
||||
if let RolloutItem::ResponseItem(response_item) = it
|
||||
&& let Some(TurnItem::UserMessage(_)) = parse_turn_item(response_item)
|
||||
{
|
||||
pos.push(i);
|
||||
}
|
||||
}
|
||||
pos
|
||||
}
|
||||
|
||||
fn truncate_before_nth_user_message(
|
||||
items: &[RolloutItem],
|
||||
nth_user_message: i64,
|
||||
) -> Vec<RolloutItem> {
|
||||
let Ok(nth_user_message) = usize::try_from(nth_user_message) else {
|
||||
return items.to_vec();
|
||||
};
|
||||
let user_inputs = find_user_input_positions(items);
|
||||
let Some(cut_idx) = user_inputs.get(nth_user_message).copied() else {
|
||||
return items.to_vec();
|
||||
};
|
||||
items[..cut_idx].to_vec()
|
||||
}
|
||||
|
||||
fn test_user_message(text: &str) -> RolloutItem {
|
||||
RolloutItem::ResponseItem(ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn truncate_before_nth_user_message_keeps_full_history_for_out_of_range_boundaries() {
|
||||
let rollout_items = vec![test_user_message("u1"), test_user_message("u2")];
|
||||
|
||||
pretty_assertions::assert_eq!(
|
||||
serde_json::to_value(truncate_before_nth_user_message(
|
||||
&rollout_items,
|
||||
/*nth_user_message*/ 2,
|
||||
))
|
||||
.unwrap(),
|
||||
serde_json::to_value(&rollout_items).unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn truncate_before_nth_user_message_keeps_full_history_for_i64_max_boundaries() {
|
||||
let rollout_items = vec![test_user_message("u1"), test_user_message("u2")];
|
||||
|
||||
pretty_assertions::assert_eq!(
|
||||
serde_json::to_value(truncate_before_nth_user_message(&rollout_items, i64::MAX,)).unwrap(),
|
||||
serde_json::to_value(&rollout_items).unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
fn read_items_materialized(p: &std::path::Path) -> Vec<RolloutItem> {
|
||||
let text =
|
||||
std::fs::read_to_string(p).unwrap_or_else(|err| panic!("read rollout file {p:?}: {err}"));
|
||||
let mut items: Vec<RolloutItem> = Vec::new();
|
||||
for line in text.lines() {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let v: serde_json::Value =
|
||||
serde_json::from_str(line).unwrap_or_else(|err| panic!("jsonl line parse: {err}"));
|
||||
let rl: RolloutLine =
|
||||
serde_json::from_value(v).unwrap_or_else(|err| panic!("rollout line parse: {err}"));
|
||||
match rl.item {
|
||||
RolloutItem::SessionMeta(_) => {}
|
||||
RolloutItem::ForkReference(reference) => {
|
||||
let parent_items = read_items_materialized(&reference.rollout_path);
|
||||
items.extend(truncate_before_nth_user_message(
|
||||
&parent_items,
|
||||
reference.nth_user_message,
|
||||
));
|
||||
}
|
||||
other => items.push(other),
|
||||
}
|
||||
}
|
||||
items
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn fork_thread_twice_drops_to_first_message() {
|
||||
skip_if_no_network!();
|
||||
@@ -64,40 +155,9 @@ async fn fork_thread_twice_drops_to_first_message() {
|
||||
|
||||
// GetHistory flushes before returning the path; no wait needed.
|
||||
|
||||
// Helper: read rollout items (excluding SessionMeta) from a JSONL path.
|
||||
let read_items = |p: &std::path::Path| -> Vec<RolloutItem> {
|
||||
let text = std::fs::read_to_string(p).expect("read rollout file");
|
||||
let mut items: Vec<RolloutItem> = Vec::new();
|
||||
for line in text.lines() {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let v: serde_json::Value = serde_json::from_str(line).expect("jsonl line");
|
||||
let rl: RolloutLine = serde_json::from_value(v).expect("rollout line");
|
||||
match rl.item {
|
||||
RolloutItem::SessionMeta(_) => {}
|
||||
other => items.push(other),
|
||||
}
|
||||
}
|
||||
items
|
||||
};
|
||||
|
||||
// Compute expected prefixes after each fork by truncating base rollout
|
||||
// strictly before the nth user input (0-based).
|
||||
let base_items = read_items(&base_path);
|
||||
let find_user_input_positions = |items: &[RolloutItem]| -> Vec<usize> {
|
||||
let mut pos = Vec::new();
|
||||
for (i, it) in items.iter().enumerate() {
|
||||
if let RolloutItem::ResponseItem(response_item) = it
|
||||
&& let Some(TurnItem::UserMessage(_)) = parse_turn_item(response_item)
|
||||
{
|
||||
// Consider any user message as an input boundary; recorder stores both EventMsg and ResponseItem.
|
||||
// We specifically look for input items, which are represented as ContentItem::InputText.
|
||||
pos.push(i);
|
||||
}
|
||||
}
|
||||
pos
|
||||
};
|
||||
let base_items = read_items_materialized(&base_path);
|
||||
let user_inputs = find_user_input_positions(&base_items);
|
||||
|
||||
// After cutting at nth user input (n=1 → second user message), cut strictly before that input.
|
||||
@@ -124,7 +184,7 @@ async fn fork_thread_twice_drops_to_first_message() {
|
||||
let fork1_path = codex_fork1.rollout_path().expect("rollout path");
|
||||
|
||||
// GetHistory on fork1 flushed; the file is ready.
|
||||
let fork1_items = read_items(&fork1_path);
|
||||
let fork1_items = read_items_materialized(&fork1_path);
|
||||
pretty_assertions::assert_eq!(
|
||||
serde_json::to_value(&fork1_items).unwrap(),
|
||||
serde_json::to_value(&expected_after_first).unwrap()
|
||||
@@ -147,16 +207,73 @@ async fn fork_thread_twice_drops_to_first_message() {
|
||||
|
||||
let fork2_path = codex_fork2.rollout_path().expect("rollout path");
|
||||
// GetHistory on fork2 flushed; the file is ready.
|
||||
let fork1_items = read_items(&fork1_path);
|
||||
let fork1_items = read_items_materialized(&fork1_path);
|
||||
let fork1_user_inputs = find_user_input_positions(&fork1_items);
|
||||
let cut_last_on_fork1 = fork1_user_inputs
|
||||
.get(fork1_user_inputs.len().saturating_sub(1))
|
||||
.copied()
|
||||
.unwrap_or(0);
|
||||
let expected_after_second: Vec<RolloutItem> = fork1_items[..cut_last_on_fork1].to_vec();
|
||||
let fork2_items = read_items(&fork2_path);
|
||||
let fork2_items = read_items_materialized(&fork2_path);
|
||||
pretty_assertions::assert_eq!(
|
||||
serde_json::to_value(&fork2_items).unwrap(),
|
||||
serde_json::to_value(&expected_after_second).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn fork_thread_session_configured_preserves_parent_and_history() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = MockServer::start().await;
|
||||
let sse = sse(vec![ev_response_created("resp"), ev_completed("resp")]);
|
||||
let response = ResponseTemplate::new(200)
|
||||
.insert_header("content-type", "text/event-stream")
|
||||
.set_body_raw(sse, "text/event-stream");
|
||||
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/responses"))
|
||||
.respond_with(response)
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex();
|
||||
let test = builder.build(&server).await.expect("create conversation");
|
||||
let codex = test.codex.clone();
|
||||
let thread_manager = test.thread_manager.clone();
|
||||
let config_for_fork = test.config.clone();
|
||||
let parent_thread_id = test.session_configured.session_id;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "seed".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
let _ = wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let base_path = codex.rollout_path().expect("rollout path");
|
||||
|
||||
let NewThread {
|
||||
thread_id: child_thread_id,
|
||||
session_configured,
|
||||
..
|
||||
} = thread_manager
|
||||
.fork_thread(
|
||||
usize::MAX,
|
||||
config_for_fork,
|
||||
base_path,
|
||||
/*persist_extended_history*/ false,
|
||||
/*parent_trace*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("fork thread");
|
||||
|
||||
pretty_assertions::assert_eq!(session_configured.forked_from_id, Some(parent_thread_id));
|
||||
assert_ne!(child_thread_id, parent_thread_id);
|
||||
}
|
||||
|
||||
@@ -147,9 +147,9 @@ fn exec_resume_last_appends_to_existing_file() -> anyhow::Result<()> {
|
||||
.arg("--skip-git-repo-check")
|
||||
.arg("-C")
|
||||
.arg(&repo_root)
|
||||
.arg(&prompt2)
|
||||
.arg("resume")
|
||||
.arg("--last")
|
||||
.arg(&prompt2)
|
||||
.assert()
|
||||
.success();
|
||||
|
||||
|
||||
@@ -2269,12 +2269,20 @@ impl InitialHistory {
|
||||
InitialHistory::Resumed(resumed) => {
|
||||
resumed.history.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.forked_from_id,
|
||||
_ => None,
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
}
|
||||
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.id),
|
||||
_ => None,
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -2564,10 +2572,45 @@ pub struct SessionMetaLine {
|
||||
pub git: Option<GitInfo>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
|
||||
pub struct ForkReferenceItem {
|
||||
pub rollout_path: PathBuf,
|
||||
#[serde(
|
||||
deserialize_with = "deserialize_fork_reference_nth_user_message",
|
||||
default
|
||||
)]
|
||||
pub nth_user_message: i64,
|
||||
}
|
||||
|
||||
fn deserialize_fork_reference_nth_user_message<'de, D>(deserializer: D) -> Result<i64, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let value = Value::deserialize(deserializer)?;
|
||||
let Value::Number(number) = value else {
|
||||
return Err(serde::de::Error::custom(
|
||||
"expected integer fork reference boundary",
|
||||
));
|
||||
};
|
||||
|
||||
if let Some(nth_user_message) = number.as_i64() {
|
||||
return Ok(nth_user_message);
|
||||
}
|
||||
|
||||
if number.as_u64().is_some() {
|
||||
return Ok(i64::MAX);
|
||||
}
|
||||
|
||||
Err(serde::de::Error::custom(
|
||||
"expected integer fork reference boundary",
|
||||
))
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
|
||||
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
|
||||
pub enum RolloutItem {
|
||||
SessionMeta(SessionMetaLine),
|
||||
ForkReference(ForkReferenceItem),
|
||||
ResponseItem(ResponseItem),
|
||||
Compacted(CompactedItem),
|
||||
TurnContext(TurnContextItem),
|
||||
@@ -3684,6 +3727,23 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fork_reference_item_deserializes_legacy_usize_max_boundary() {
|
||||
let item: ForkReferenceItem = serde_json::from_value(json!({
|
||||
"rollout_path": "/tmp/rollout.jsonl",
|
||||
"nth_user_message": u64::MAX,
|
||||
}))
|
||||
.expect("legacy fork reference item should deserialize");
|
||||
|
||||
assert_eq!(
|
||||
item,
|
||||
ForkReferenceItem {
|
||||
rollout_path: PathBuf::from("/tmp/rollout.jsonl"),
|
||||
nth_user_message: i64::MAX,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn session_source_restriction_product_does_not_guess_subagent_products() {
|
||||
assert_eq!(
|
||||
|
||||
@@ -36,6 +36,7 @@ pub use list::find_archived_thread_path_by_id_str;
|
||||
pub use list::find_thread_path_by_id_str;
|
||||
#[deprecated(note = "use find_thread_path_by_id_str")]
|
||||
pub use list::find_thread_path_by_id_str as find_conversation_path_by_id_str;
|
||||
pub use list::resolve_fork_reference_rollout_path;
|
||||
pub use list::rollout_date_parts;
|
||||
pub use policy::EventPersistenceMode;
|
||||
pub use recorder::RolloutRecorder;
|
||||
|
||||
@@ -1063,6 +1063,9 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
|
||||
RolloutItem::Compacted(_) => {
|
||||
// Not included in `head`; skip.
|
||||
}
|
||||
RolloutItem::ForkReference(_) => {
|
||||
// Not included in `head`; skip.
|
||||
}
|
||||
RolloutItem::EventMsg(ev) => {
|
||||
if let EventMsg::UserMessage(user) = ev {
|
||||
summary.saw_user_event = true;
|
||||
@@ -1114,6 +1117,7 @@ pub async fn read_head_for_summary(path: &Path) -> io::Result<Vec<serde_json::Va
|
||||
head.push(value);
|
||||
}
|
||||
}
|
||||
RolloutItem::ForkReference(_) => {}
|
||||
RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => {}
|
||||
@@ -1264,6 +1268,40 @@ pub async fn find_archived_thread_path_by_id_str(
|
||||
find_thread_path_by_id_str_in_subdir(codex_home, ARCHIVED_SESSIONS_SUBDIR, id_str).await
|
||||
}
|
||||
|
||||
/// Resolve a stored fork-reference rollout path to the current on-disk location.
|
||||
///
|
||||
/// Fork references persist a parent rollout filename. Archive and unarchive move that file
|
||||
/// between `sessions/` and `archived_sessions/`, so stale stored paths must be repaired by
|
||||
/// locating the rollout with the stable thread id embedded in the filename.
|
||||
pub async fn resolve_fork_reference_rollout_path(
|
||||
codex_home: &Path,
|
||||
rollout_path: &Path,
|
||||
) -> io::Result<PathBuf> {
|
||||
match tokio::fs::try_exists(rollout_path).await {
|
||||
Ok(true) => return Ok(rollout_path.to_path_buf()),
|
||||
Ok(false) => {}
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
|
||||
let Some(file_name) = rollout_path.file_name().and_then(OsStr::to_str) else {
|
||||
return Ok(rollout_path.to_path_buf());
|
||||
};
|
||||
let Some((_, thread_uuid)) = parse_timestamp_uuid_from_filename(file_name) else {
|
||||
return Ok(rollout_path.to_path_buf());
|
||||
};
|
||||
let thread_id = thread_uuid.to_string();
|
||||
|
||||
if let Some(active_path) = find_thread_path_by_id_str(codex_home, &thread_id).await? {
|
||||
return Ok(active_path);
|
||||
}
|
||||
if let Some(archived_path) = find_archived_thread_path_by_id_str(codex_home, &thread_id).await?
|
||||
{
|
||||
return Ok(archived_path);
|
||||
}
|
||||
|
||||
Ok(rollout_path.to_path_buf())
|
||||
}
|
||||
|
||||
/// Extract the `YYYY/MM/DD` directory components from a rollout filename.
|
||||
pub fn rollout_date_parts(file_name: &OsStr) -> Option<(String, String, String)> {
|
||||
let name = file_name.to_string_lossy();
|
||||
|
||||
@@ -70,7 +70,8 @@ pub fn builder_from_items(
|
||||
) -> Option<ThreadMetadataBuilder> {
|
||||
if let Some(session_meta) = items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => Some(meta_line),
|
||||
RolloutItem::ResponseItem(_)
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
@@ -126,6 +127,7 @@ pub async fn extract_metadata_from_rollout(
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
|
||||
RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
}),
|
||||
|
||||
@@ -16,9 +16,10 @@ pub fn is_persisted_response_item(item: &RolloutItem, mode: EventPersistenceMode
|
||||
RolloutItem::ResponseItem(item) => should_persist_response_item(item),
|
||||
RolloutItem::EventMsg(ev) => should_persist_event_msg(ev, mode),
|
||||
// Persist Codex executive markers so we can analyze flows (e.g., compaction, API turns).
|
||||
RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {
|
||||
true
|
||||
}
|
||||
RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::ForkReference(_) => true,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -567,6 +567,9 @@ impl RolloutRecorder {
|
||||
RolloutItem::ResponseItem(item) => {
|
||||
items.push(RolloutItem::ResponseItem(item));
|
||||
}
|
||||
RolloutItem::ForkReference(item) => {
|
||||
items.push(RolloutItem::ForkReference(item));
|
||||
}
|
||||
RolloutItem::Compacted(item) => {
|
||||
items.push(RolloutItem::Compacted(item));
|
||||
}
|
||||
@@ -593,6 +596,10 @@ impl RolloutRecorder {
|
||||
Ok((items, thread_id, parse_errors))
|
||||
}
|
||||
|
||||
/// Load a rollout for resume semantics.
|
||||
///
|
||||
/// This preserves the rollout's existing conversation id and rollout path, so callers must
|
||||
/// not use it for true forking semantics.
|
||||
pub async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
|
||||
let (items, thread_id, _parse_errors) = Self::load_rollout_items(path).await?;
|
||||
let conversation_id = thread_id
|
||||
@@ -610,6 +617,21 @@ impl RolloutRecorder {
|
||||
}))
|
||||
}
|
||||
|
||||
/// Load a rollout for true fork semantics.
|
||||
///
|
||||
/// Unlike `get_rollout_history`, this intentionally discards the source rollout's
|
||||
/// conversation id so the child thread gets a fresh id and preserves `forked_from_id`.
|
||||
pub async fn get_fork_history(path: &Path) -> std::io::Result<InitialHistory> {
|
||||
let (items, _thread_id, _parse_errors) = Self::load_rollout_items(path).await?;
|
||||
|
||||
if items.is_empty() {
|
||||
return Ok(InitialHistory::New);
|
||||
}
|
||||
|
||||
info!("Loaded rollout fork history from {path:?}");
|
||||
Ok(InitialHistory::Forked(items))
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) -> std::io::Result<()> {
|
||||
let (tx_done, rx_done) = oneshot::channel();
|
||||
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {
|
||||
@@ -1058,6 +1080,7 @@ async fn resume_candidate_matches_cwd(
|
||||
&& let Some(latest_turn_context_cwd) = items.iter().rev().find_map(|item| match item {
|
||||
RolloutItem::TurnContext(turn_context) => Some(turn_context.cwd.as_path()),
|
||||
RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
|
||||
@@ -7,6 +7,8 @@ use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
use codex_protocol::protocol::AgentMessageEvent;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ForkReferenceItem;
|
||||
use codex_protocol::protocol::InitialHistory;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
@@ -33,7 +35,8 @@ fn test_config(codex_home: &Path) -> RolloutConfig {
|
||||
fn write_session_file(root: &Path, ts: &str, uuid: Uuid) -> std::io::Result<PathBuf> {
|
||||
let day_dir = root.join("sessions/2025/01/03");
|
||||
fs::create_dir_all(&day_dir)?;
|
||||
let path = day_dir.join(format!("rollout-{ts}-{uuid}.jsonl"));
|
||||
let file_ts = ts.replace(':', "-");
|
||||
let path = day_dir.join(format!("rollout-{file_ts}-{uuid}.jsonl"));
|
||||
let mut file = File::create(&path)?;
|
||||
let meta = serde_json::json!({
|
||||
"timestamp": ts,
|
||||
@@ -62,6 +65,47 @@ fn write_session_file(root: &Path, ts: &str, uuid: Uuid) -> std::io::Result<Path
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_rollout_history_preserves_legacy_fork_reference_boundaries() -> std::io::Result<()> {
|
||||
let home = TempDir::new().expect("temp dir");
|
||||
let parent_id = Uuid::new_v4();
|
||||
let child_id = Uuid::new_v4();
|
||||
let ts = "2025-01-03T12:00:00.000Z";
|
||||
let parent_rollout_path = write_session_file(home.path(), ts, parent_id)?;
|
||||
let child_rollout_path = write_session_file(home.path(), ts, child_id)?;
|
||||
|
||||
let mut file = fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(&child_rollout_path)?;
|
||||
let fork_reference_line = serde_json::json!({
|
||||
"timestamp": ts,
|
||||
"type": "fork_reference",
|
||||
"payload": {
|
||||
"rollout_path": parent_rollout_path,
|
||||
"nth_user_message": u64::MAX,
|
||||
},
|
||||
});
|
||||
writeln!(file, "{fork_reference_line}")?;
|
||||
|
||||
let history = RolloutRecorder::get_rollout_history(&child_rollout_path).await?;
|
||||
let InitialHistory::Resumed(resumed) = history else {
|
||||
panic!("expected resumed history");
|
||||
};
|
||||
|
||||
let loaded_fork_reference = resumed.history.last().and_then(|item| match item {
|
||||
RolloutItem::ForkReference(fork_reference) => Some(fork_reference),
|
||||
_ => None,
|
||||
});
|
||||
assert_eq!(
|
||||
loaded_fork_reference,
|
||||
Some(&ForkReferenceItem {
|
||||
rollout_path: parent_rollout_path,
|
||||
nth_user_message: i64::MAX,
|
||||
}),
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn recorder_materializes_only_after_explicit_persist() -> std::io::Result<()> {
|
||||
let home = TempDir::new().expect("temp dir");
|
||||
|
||||
@@ -22,6 +22,7 @@ pub fn apply_rollout_item(
|
||||
RolloutItem::TurnContext(turn_ctx) => apply_turn_context(metadata, turn_ctx),
|
||||
RolloutItem::EventMsg(event) => apply_event_msg(metadata, event),
|
||||
RolloutItem::ResponseItem(item) => apply_response_item(metadata, item),
|
||||
RolloutItem::ForkReference(_) => {}
|
||||
RolloutItem::Compacted(_) => {}
|
||||
}
|
||||
if metadata.model_provider.is_empty() {
|
||||
@@ -34,9 +35,10 @@ pub fn rollout_item_affects_thread_metadata(item: &RolloutItem) -> bool {
|
||||
match item {
|
||||
RolloutItem::SessionMeta(_) | RolloutItem::TurnContext(_) => true,
|
||||
RolloutItem::EventMsg(EventMsg::TokenCount(_) | EventMsg::UserMessage(_)) => true,
|
||||
RolloutItem::EventMsg(_) | RolloutItem::ResponseItem(_) | RolloutItem::Compacted(_) => {
|
||||
false
|
||||
}
|
||||
RolloutItem::EventMsg(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::Compacted(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -841,6 +841,7 @@ pub(super) fn extract_dynamic_tools(items: &[RolloutItem]) -> Option<Option<Vec<
|
||||
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.dynamic_tools.clone()),
|
||||
RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
@@ -851,6 +852,7 @@ pub(super) fn extract_memory_mode(items: &[RolloutItem]) -> Option<String> {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
|
||||
RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user