mirror of
https://github.com/openai/codex.git
synced 2026-03-06 22:53:23 +00:00
Compare commits
3 Commits
dev/cc/fix
...
dev/friel/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b159523329 | ||
|
|
4b1cb360a6 | ||
|
|
d561670c80 |
@@ -183,6 +183,7 @@ impl ThreadHistoryBuilder {
|
||||
RolloutItem::Compacted(payload) => self.handle_compacted(payload),
|
||||
RolloutItem::TurnContext(_)
|
||||
| RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -202,6 +202,7 @@ use codex_core::plugins::PluginInstallError as CorePluginInstallError;
|
||||
use codex_core::plugins::PluginInstallRequest;
|
||||
use codex_core::read_head_for_summary;
|
||||
use codex_core::read_session_meta_line;
|
||||
use codex_core::resolve_fork_reference_rollout_path;
|
||||
use codex_core::rollout_date_parts;
|
||||
use codex_core::sandboxing::SandboxPermissions;
|
||||
use codex_core::skills::remote::export_remote_skill;
|
||||
@@ -6980,13 +6981,17 @@ pub(crate) async fn read_summary_from_rollout(
|
||||
.unwrap_or_else(|| fallback_provider.to_string());
|
||||
let git_info = git.as_ref().map(map_git_info);
|
||||
let updated_at = updated_at.or_else(|| timestamp.clone());
|
||||
let preview = read_rollout_items_from_rollout(path)
|
||||
.await
|
||||
.map(|items| preview_from_rollout_items(&items))
|
||||
.unwrap_or_default();
|
||||
|
||||
Ok(ConversationSummary {
|
||||
conversation_id: session_meta.id,
|
||||
timestamp,
|
||||
updated_at,
|
||||
path: path.to_path_buf(),
|
||||
preview: String::new(),
|
||||
preview,
|
||||
model_provider,
|
||||
cwd: session_meta.cwd,
|
||||
cli_version: session_meta.cli_version,
|
||||
@@ -7004,7 +7009,7 @@ pub(crate) async fn read_rollout_items_from_rollout(
|
||||
InitialHistory::Resumed(resumed) => resumed.history,
|
||||
};
|
||||
|
||||
Ok(items)
|
||||
Ok(materialize_rollout_items_for_replay(codex_home_from_rollout_path(path), &items).await)
|
||||
}
|
||||
|
||||
fn extract_conversation_summary(
|
||||
@@ -7105,6 +7110,137 @@ fn preview_from_rollout_items(items: &[RolloutItem]) -> String {
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
fn user_message_positions_in_rollout(items: &[RolloutItem]) -> Vec<usize> {
|
||||
let mut user_positions = Vec::new();
|
||||
for (idx, item) in items.iter().enumerate() {
|
||||
match item {
|
||||
RolloutItem::ResponseItem(item)
|
||||
if matches!(
|
||||
codex_core::parse_turn_item(item),
|
||||
Some(TurnItem::UserMessage(_))
|
||||
) =>
|
||||
{
|
||||
user_positions.push(idx);
|
||||
}
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
|
||||
let num_turns = usize::try_from(rollback.num_turns).unwrap_or(usize::MAX);
|
||||
let new_len = user_positions.len().saturating_sub(num_turns);
|
||||
user_positions.truncate(new_len);
|
||||
}
|
||||
RolloutItem::ResponseItem(_) => {}
|
||||
RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => {}
|
||||
}
|
||||
}
|
||||
user_positions
|
||||
}
|
||||
|
||||
fn truncate_rollout_before_nth_user_message_from_start(
|
||||
items: &[RolloutItem],
|
||||
n_from_start: usize,
|
||||
) -> Vec<RolloutItem> {
|
||||
if n_from_start == usize::MAX {
|
||||
return items.to_vec();
|
||||
}
|
||||
|
||||
let user_positions = user_message_positions_in_rollout(items);
|
||||
if user_positions.len() <= n_from_start {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let cut_idx = user_positions[n_from_start];
|
||||
items[..cut_idx].to_vec()
|
||||
}
|
||||
|
||||
fn codex_home_from_rollout_path(path: &Path) -> Option<&Path> {
|
||||
path.ancestors().find_map(|ancestor| {
|
||||
let name = ancestor.file_name().and_then(OsStr::to_str)?;
|
||||
if name == codex_core::SESSIONS_SUBDIR || name == codex_core::ARCHIVED_SESSIONS_SUBDIR {
|
||||
ancestor.parent()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn materialize_rollout_items_for_replay(
|
||||
codex_home: Option<&Path>,
|
||||
rollout_items: &[RolloutItem],
|
||||
) -> Vec<RolloutItem> {
|
||||
const MAX_FORK_REFERENCE_DEPTH: usize = 8;
|
||||
|
||||
let mut materialized = Vec::new();
|
||||
let mut stack: Vec<(Vec<RolloutItem>, usize, usize)> = vec![(rollout_items.to_vec(), 0, 0)];
|
||||
|
||||
while let Some((items, mut idx, depth)) = stack.pop() {
|
||||
while idx < items.len() {
|
||||
match &items[idx] {
|
||||
RolloutItem::ForkReference(reference) => {
|
||||
if depth >= MAX_FORK_REFERENCE_DEPTH {
|
||||
warn!(
|
||||
"skipping fork reference recursion at depth {} for {:?}",
|
||||
depth, reference.rollout_path
|
||||
);
|
||||
idx += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
let resolved_rollout_path = if let Some(codex_home) = codex_home {
|
||||
match resolve_fork_reference_rollout_path(
|
||||
codex_home,
|
||||
&reference.rollout_path,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(path) => path,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to resolve fork reference rollout {:?}: {err}",
|
||||
reference.rollout_path
|
||||
);
|
||||
idx += 1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
reference.rollout_path.clone()
|
||||
};
|
||||
let parent_history = match RolloutRecorder::get_rollout_history(
|
||||
&resolved_rollout_path,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(history) => history,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to load fork reference rollout {:?} (resolved from {:?}): {err}",
|
||||
resolved_rollout_path, reference.rollout_path
|
||||
);
|
||||
idx += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let parent_items = truncate_rollout_before_nth_user_message_from_start(
|
||||
&parent_history.get_rollout_items(),
|
||||
reference.nth_user_message,
|
||||
);
|
||||
|
||||
stack.push((items, idx + 1, depth));
|
||||
stack.push((parent_items, 0, depth + 1));
|
||||
break;
|
||||
}
|
||||
item => materialized.push(item.clone()),
|
||||
}
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
|
||||
materialized
|
||||
}
|
||||
|
||||
fn with_thread_spawn_agent_metadata(
|
||||
source: codex_protocol::protocol::SessionSource,
|
||||
agent_nickname: Option<String>,
|
||||
|
||||
@@ -7,6 +7,10 @@ use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::SessionSource;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadArchiveResponse;
|
||||
use codex_app_server_protocol::ThreadForkParams;
|
||||
use codex_app_server_protocol::ThreadForkResponse;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadListParams;
|
||||
use codex_app_server_protocol::ThreadListResponse;
|
||||
@@ -20,6 +24,8 @@ use codex_app_server_protocol::ThreadSetNameResponse;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::ThreadUnarchiveParams;
|
||||
use codex_app_server_protocol::ThreadUnarchiveResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
@@ -152,6 +158,150 @@ async fn thread_read_can_include_turns() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_read_include_turns_keeps_fork_history_after_parent_archive_and_unarchive()
|
||||
-> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let start_id = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("mock-model".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let start_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread: parent, .. } =
|
||||
to_response::<ThreadStartResponse>(start_resp)?;
|
||||
|
||||
let turn_start_id = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: parent.id.clone(),
|
||||
input: vec![UserInput::Text {
|
||||
text: "parent message".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let turn_start_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_start_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: TurnStartResponse = to_response::<TurnStartResponse>(turn_start_resp)?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let fork_id = mcp
|
||||
.send_thread_fork_request(ThreadForkParams {
|
||||
thread_id: parent.id.clone(),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let fork_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(fork_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadForkResponse { thread: child, .. } = to_response::<ThreadForkResponse>(fork_resp)?;
|
||||
|
||||
let read_child_id = mcp
|
||||
.send_thread_read_request(ThreadReadParams {
|
||||
thread_id: child.id.clone(),
|
||||
include_turns: true,
|
||||
})
|
||||
.await?;
|
||||
let read_child_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_child_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse {
|
||||
thread: child_before_archive,
|
||||
} = to_response::<ThreadReadResponse>(read_child_resp)?;
|
||||
assert_eq!(child_before_archive.turns.len(), 1);
|
||||
|
||||
let archive_id = mcp
|
||||
.send_thread_archive_request(ThreadArchiveParams {
|
||||
thread_id: parent.id.clone(),
|
||||
})
|
||||
.await?;
|
||||
let archive_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/archived"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let read_child_id = mcp
|
||||
.send_thread_read_request(ThreadReadParams {
|
||||
thread_id: child.id.clone(),
|
||||
include_turns: true,
|
||||
})
|
||||
.await?;
|
||||
let read_child_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_child_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse {
|
||||
thread: child_after_archive,
|
||||
} = to_response::<ThreadReadResponse>(read_child_resp)?;
|
||||
assert_eq!(child_after_archive.turns, child_before_archive.turns);
|
||||
|
||||
let unarchive_id = mcp
|
||||
.send_thread_unarchive_request(ThreadUnarchiveParams {
|
||||
thread_id: parent.id,
|
||||
})
|
||||
.await?;
|
||||
let unarchive_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(unarchive_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: ThreadUnarchiveResponse = to_response::<ThreadUnarchiveResponse>(unarchive_resp)?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/unarchived"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let read_child_id = mcp
|
||||
.send_thread_read_request(ThreadReadParams {
|
||||
thread_id: child.id,
|
||||
include_turns: true,
|
||||
})
|
||||
.await?;
|
||||
let read_child_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_child_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse {
|
||||
thread: child_after_unarchive,
|
||||
} = to_response::<ThreadReadResponse>(read_child_resp)?;
|
||||
assert_eq!(child_after_unarchive.turns, child_before_archive.turns);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_read_loaded_thread_returns_precomputed_path_before_materialization() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
|
||||
@@ -15,6 +15,7 @@ use crate::thread_manager::ThreadManagerState;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::ForkReferenceItem;
|
||||
use codex_protocol::protocol::InitialHistory;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
@@ -169,6 +170,21 @@ impl AgentControl {
|
||||
RolloutRecorder::get_rollout_history(&rollout_path)
|
||||
.await?
|
||||
.get_rollout_items();
|
||||
if forked_rollout_items
|
||||
.iter()
|
||||
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
|
||||
{
|
||||
forked_rollout_items =
|
||||
crate::rollout::truncation::materialize_rollout_items_for_replay(
|
||||
config.codex_home.as_path(),
|
||||
&forked_rollout_items,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
forked_rollout_items.push(RolloutItem::ForkReference(ForkReferenceItem {
|
||||
rollout_path: rollout_path.clone(),
|
||||
nth_user_message: usize::MAX,
|
||||
}));
|
||||
let mut output = FunctionCallOutputPayload::from_text(
|
||||
FORKED_SPAWN_AGENT_OUTPUT_MESSAGE.to_string(),
|
||||
);
|
||||
@@ -1102,6 +1118,117 @@ mod tests {
|
||||
.expect("parent shutdown should submit");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_agent_fork_persists_fork_reference_instead_of_parent_history() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let (parent_thread_id, parent_thread) = harness.start_thread().await;
|
||||
parent_thread
|
||||
.inject_user_message_without_turn("parent seed context".to_string())
|
||||
.await;
|
||||
let turn_context = parent_thread.codex.session.new_default_turn().await;
|
||||
let parent_spawn_call_id = "spawn-call-dedup".to_string();
|
||||
let parent_spawn_call = ResponseItem::FunctionCall {
|
||||
id: None,
|
||||
name: "spawn_agent".to_string(),
|
||||
arguments: "{}".to_string(),
|
||||
call_id: parent_spawn_call_id.clone(),
|
||||
};
|
||||
parent_thread
|
||||
.codex
|
||||
.session
|
||||
.record_conversation_items(turn_context.as_ref(), &[parent_spawn_call])
|
||||
.await;
|
||||
parent_thread
|
||||
.codex
|
||||
.session
|
||||
.ensure_rollout_materialized()
|
||||
.await;
|
||||
parent_thread.codex.session.flush_rollout().await;
|
||||
let parent_rollout_path = parent_thread
|
||||
.rollout_path()
|
||||
.expect("parent rollout path should be available");
|
||||
|
||||
let child_thread_id = harness
|
||||
.control
|
||||
.spawn_agent_with_options(
|
||||
harness.config.clone(),
|
||||
text_input("child task"),
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 1,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
})),
|
||||
SpawnAgentOptions {
|
||||
fork_parent_spawn_call_id: Some(parent_spawn_call_id),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("forked spawn should succeed");
|
||||
|
||||
let child_thread = harness
|
||||
.manager
|
||||
.get_thread(child_thread_id)
|
||||
.await
|
||||
.expect("child thread should be registered");
|
||||
let child_rollout_path = child_thread
|
||||
.rollout_path()
|
||||
.expect("child rollout path should be available");
|
||||
let InitialHistory::Resumed(resumed) =
|
||||
RolloutRecorder::get_rollout_history(child_rollout_path.as_path())
|
||||
.await
|
||||
.expect("child rollout should load")
|
||||
else {
|
||||
panic!("child rollout should include session metadata");
|
||||
};
|
||||
|
||||
assert!(
|
||||
resumed.history.iter().any(|item| {
|
||||
matches!(
|
||||
item,
|
||||
RolloutItem::ForkReference(ForkReferenceItem {
|
||||
rollout_path,
|
||||
nth_user_message,
|
||||
}) if rollout_path == &parent_rollout_path && *nth_user_message == usize::MAX
|
||||
)
|
||||
}),
|
||||
"child rollout should persist a fork reference to the parent rollout"
|
||||
);
|
||||
|
||||
let raw_response_items: Vec<ResponseItem> = resumed
|
||||
.history
|
||||
.iter()
|
||||
.filter_map(|item| match item {
|
||||
RolloutItem::ResponseItem(response_item) => Some(response_item.clone()),
|
||||
RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
.collect();
|
||||
assert!(
|
||||
!history_contains_text(&raw_response_items, "parent seed context"),
|
||||
"child rollout should not duplicate the parent's raw transcript"
|
||||
);
|
||||
|
||||
let history = child_thread.codex.session.clone_history().await;
|
||||
assert!(history_contains_text(
|
||||
history.raw_items(),
|
||||
"parent seed context"
|
||||
));
|
||||
|
||||
let _ = harness
|
||||
.control
|
||||
.shutdown_agent(child_thread_id)
|
||||
.await
|
||||
.expect("child shutdown should submit");
|
||||
let _ = parent_thread
|
||||
.submit(Op::Shutdown {})
|
||||
.await
|
||||
.expect("parent shutdown should submit");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_agent_respects_max_threads_limit() {
|
||||
let max_threads = 1usize;
|
||||
|
||||
@@ -1826,8 +1826,17 @@ impl Session {
|
||||
}
|
||||
InitialHistory::Resumed(resumed_history) => {
|
||||
let rollout_items = resumed_history.history;
|
||||
let hydrated_rollout_items = if rollout_items
|
||||
.iter()
|
||||
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
|
||||
{
|
||||
self.materialize_rollout_items_for_replay(&rollout_items)
|
||||
.await
|
||||
} else {
|
||||
rollout_items.clone()
|
||||
};
|
||||
let restored_tool_selection =
|
||||
Self::extract_mcp_tool_selection_from_rollout(&rollout_items);
|
||||
Self::extract_mcp_tool_selection_from_rollout(&hydrated_rollout_items);
|
||||
|
||||
let reconstructed_rollout = self
|
||||
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
|
||||
@@ -1869,7 +1878,7 @@ impl Session {
|
||||
|
||||
// Seed usage info from the recorded rollout so UIs can show token counts
|
||||
// immediately on resume/fork.
|
||||
if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) {
|
||||
if let Some(info) = Self::last_token_info_from_rollout(&hydrated_rollout_items) {
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_token_info(Some(info));
|
||||
}
|
||||
@@ -1884,11 +1893,24 @@ impl Session {
|
||||
}
|
||||
}
|
||||
InitialHistory::Forked(rollout_items) => {
|
||||
let persisted_rollout_items = rollout_items
|
||||
.iter()
|
||||
.position(|item| matches!(item, RolloutItem::ForkReference(_)))
|
||||
.map(|index| rollout_items[index..].to_vec());
|
||||
let hydrated_rollout_items = if rollout_items
|
||||
.iter()
|
||||
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
|
||||
{
|
||||
self.materialize_rollout_items_for_replay(&rollout_items)
|
||||
.await
|
||||
} else {
|
||||
rollout_items.clone()
|
||||
};
|
||||
let restored_tool_selection =
|
||||
Self::extract_mcp_tool_selection_from_rollout(&rollout_items);
|
||||
Self::extract_mcp_tool_selection_from_rollout(&hydrated_rollout_items);
|
||||
|
||||
let reconstructed_rollout = self
|
||||
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
|
||||
.reconstruct_history_from_rollout(&turn_context, &hydrated_rollout_items)
|
||||
.await;
|
||||
self.set_previous_turn_settings(
|
||||
reconstructed_rollout.previous_turn_settings.clone(),
|
||||
@@ -1910,7 +1932,7 @@ impl Session {
|
||||
|
||||
// Seed usage info from the recorded rollout so UIs can show token counts
|
||||
// immediately on resume/fork.
|
||||
if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) {
|
||||
if let Some(info) = Self::last_token_info_from_rollout(&hydrated_rollout_items) {
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_token_info(Some(info));
|
||||
}
|
||||
@@ -1918,8 +1940,11 @@ impl Session {
|
||||
self.set_mcp_tool_selection(selected_tools).await;
|
||||
}
|
||||
|
||||
// If persisting, persist all rollout items as-is (recorder filters)
|
||||
if !rollout_items.is_empty() {
|
||||
// Persist only the compact fork reference suffix so child rollouts do not
|
||||
// duplicate the full parent history they inherited in memory.
|
||||
if let Some(persisted_rollout_items) = persisted_rollout_items {
|
||||
self.persist_rollout_items(&persisted_rollout_items).await;
|
||||
} else if !rollout_items.is_empty() {
|
||||
self.persist_rollout_items(&rollout_items).await;
|
||||
}
|
||||
|
||||
|
||||
@@ -83,11 +83,34 @@ fn finalize_active_segment<'a>(
|
||||
}
|
||||
|
||||
impl Session {
|
||||
pub(super) async fn materialize_rollout_items_for_replay(
|
||||
&self,
|
||||
rollout_items: &[RolloutItem],
|
||||
) -> Vec<RolloutItem> {
|
||||
let codex_home = {
|
||||
self.state
|
||||
.lock()
|
||||
.await
|
||||
.session_configuration
|
||||
.codex_home
|
||||
.clone()
|
||||
};
|
||||
crate::rollout::truncation::materialize_rollout_items_for_replay(
|
||||
codex_home.as_path(),
|
||||
rollout_items,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(super) async fn reconstruct_history_from_rollout(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
rollout_items: &[RolloutItem],
|
||||
) -> RolloutReconstruction {
|
||||
let rollout_items = self
|
||||
.materialize_rollout_items_for_replay(rollout_items)
|
||||
.await;
|
||||
let rollout_items = rollout_items.as_slice();
|
||||
// Replay metadata should already match the shape of the future lazy reverse loader, even
|
||||
// while history materialization still uses an eager bridge. Scan newest-to-oldest,
|
||||
// stopping once a surviving replacement-history checkpoint and the required resume metadata
|
||||
@@ -203,6 +226,7 @@ impl Session {
|
||||
}
|
||||
RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::EventMsg(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::SessionMeta(_) => {}
|
||||
}
|
||||
|
||||
@@ -271,6 +295,7 @@ impl Session {
|
||||
history.drop_last_n_user_turns(rollback.num_turns);
|
||||
}
|
||||
RolloutItem::EventMsg(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::SessionMeta(_) => {}
|
||||
}
|
||||
|
||||
@@ -6,8 +6,16 @@ use crate::protocol::ResumedHistory;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::ForkReferenceItem;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn user_message(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
@@ -33,6 +41,52 @@ fn assistant_message(text: &str) -> ResponseItem {
|
||||
}
|
||||
}
|
||||
|
||||
fn write_rollout_items(
|
||||
root: &Path,
|
||||
thread_id: ThreadId,
|
||||
items: &[RolloutItem],
|
||||
) -> std::io::Result<PathBuf> {
|
||||
let rollout_dir = root
|
||||
.join(crate::SESSIONS_SUBDIR)
|
||||
.join("2026")
|
||||
.join("03")
|
||||
.join("05");
|
||||
std::fs::create_dir_all(&rollout_dir)?;
|
||||
let rollout_path = rollout_dir.join(format!("rollout-2026-03-05T00-00-00-{thread_id}.jsonl"));
|
||||
let session_meta_line = RolloutLine {
|
||||
timestamp: "2026-03-05T00:00:00Z".to_string(),
|
||||
item: RolloutItem::SessionMeta(SessionMetaLine {
|
||||
meta: SessionMeta {
|
||||
id: thread_id,
|
||||
timestamp: "2026-03-05T00:00:00Z".to_string(),
|
||||
cwd: root.to_path_buf(),
|
||||
originator: "codex".to_string(),
|
||||
cli_version: "test".to_string(),
|
||||
source: SessionSource::Exec,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
model_provider: Some("openai".to_string()),
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: None,
|
||||
forked_from_id: None,
|
||||
},
|
||||
git: None,
|
||||
}),
|
||||
};
|
||||
let mut text = format!("{}\n", serde_json::to_string(&session_meta_line).unwrap());
|
||||
for item in items {
|
||||
let line = RolloutLine {
|
||||
timestamp: "2026-03-05T00:00:01Z".to_string(),
|
||||
item: item.clone(),
|
||||
};
|
||||
text.push_str(&serde_json::to_string(&line).unwrap());
|
||||
text.push('\n');
|
||||
}
|
||||
std::fs::write(&rollout_path, text)?;
|
||||
Ok(rollout_path)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previous_turn_settings()
|
||||
{
|
||||
@@ -71,6 +125,129 @@ async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previ
|
||||
assert!(session.reference_context_item().await.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reconstruct_history_materializes_fork_reference_rollout_items() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
let dir = TempDir::new().expect("create temp dir");
|
||||
let parent_thread_id = ThreadId::new();
|
||||
let parent_rollout_path = write_rollout_items(
|
||||
dir.path(),
|
||||
parent_thread_id,
|
||||
&[
|
||||
RolloutItem::ResponseItem(user_message("first user")),
|
||||
RolloutItem::ResponseItem(assistant_message("first reply")),
|
||||
RolloutItem::ResponseItem(user_message("second user")),
|
||||
RolloutItem::ResponseItem(assistant_message("second reply")),
|
||||
],
|
||||
)
|
||||
.expect("write parent rollout");
|
||||
let rollout_items = vec![RolloutItem::ForkReference(ForkReferenceItem {
|
||||
rollout_path: parent_rollout_path,
|
||||
nth_user_message: 1,
|
||||
})];
|
||||
|
||||
let reconstructed = session
|
||||
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
|
||||
.await;
|
||||
|
||||
assert_eq!(
|
||||
reconstructed.history,
|
||||
vec![user_message("first user"), assistant_message("first reply")]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn record_initial_history_forked_materializes_fork_reference_rollout_items() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
let codex_home = turn_context.config.codex_home.clone();
|
||||
let parent_thread_id = ThreadId::new();
|
||||
let parent_rollout_path = write_rollout_items(
|
||||
codex_home.as_path(),
|
||||
parent_thread_id,
|
||||
&[
|
||||
RolloutItem::ResponseItem(user_message("first user")),
|
||||
RolloutItem::ResponseItem(assistant_message("first reply")),
|
||||
RolloutItem::ResponseItem(user_message("second user")),
|
||||
RolloutItem::ResponseItem(assistant_message("second reply")),
|
||||
],
|
||||
)
|
||||
.expect("write parent rollout");
|
||||
let rollout_items = vec![RolloutItem::ForkReference(ForkReferenceItem {
|
||||
rollout_path: parent_rollout_path,
|
||||
nth_user_message: 1,
|
||||
})];
|
||||
|
||||
session
|
||||
.record_initial_history(InitialHistory::Forked(rollout_items))
|
||||
.await;
|
||||
|
||||
let reconstruction_turn = session.new_default_turn().await;
|
||||
let mut expected = vec![user_message("first user"), assistant_message("first reply")];
|
||||
expected.extend(
|
||||
session
|
||||
.build_initial_context(reconstruction_turn.as_ref())
|
||||
.await,
|
||||
);
|
||||
|
||||
let history = session.state.lock().await.clone_history();
|
||||
assert_eq!(expected, history.raw_items());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reconstruct_history_resolves_fork_reference_after_parent_archive_and_unarchive() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
let codex_home = turn_context.config.codex_home.clone();
|
||||
let parent_thread_id = ThreadId::new();
|
||||
let parent_rollout_path = write_rollout_items(
|
||||
codex_home.as_path(),
|
||||
parent_thread_id,
|
||||
&[
|
||||
RolloutItem::ResponseItem(user_message("first user")),
|
||||
RolloutItem::ResponseItem(assistant_message("first reply")),
|
||||
RolloutItem::ResponseItem(user_message("second user")),
|
||||
RolloutItem::ResponseItem(assistant_message("second reply")),
|
||||
],
|
||||
)
|
||||
.expect("write parent rollout");
|
||||
let rollout_items = vec![RolloutItem::ForkReference(ForkReferenceItem {
|
||||
rollout_path: parent_rollout_path.clone(),
|
||||
nth_user_message: 1,
|
||||
})];
|
||||
let expected_history = vec![user_message("first user"), assistant_message("first reply")];
|
||||
|
||||
let archived_rollout_dir = codex_home
|
||||
.join(crate::ARCHIVED_SESSIONS_SUBDIR)
|
||||
.join("2026")
|
||||
.join("03")
|
||||
.join("05");
|
||||
std::fs::create_dir_all(&archived_rollout_dir).expect("create archived rollout dir");
|
||||
let archived_rollout_path = archived_rollout_dir.join(
|
||||
parent_rollout_path
|
||||
.file_name()
|
||||
.expect("parent rollout file name"),
|
||||
);
|
||||
std::fs::rename(&parent_rollout_path, &archived_rollout_path).expect("archive parent rollout");
|
||||
|
||||
let reconstructed = session
|
||||
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
|
||||
.await;
|
||||
assert_eq!(reconstructed.history, expected_history);
|
||||
|
||||
let unarchived_rollout_dir = codex_home
|
||||
.join(crate::SESSIONS_SUBDIR)
|
||||
.join("2026")
|
||||
.join("03")
|
||||
.join("05");
|
||||
std::fs::create_dir_all(&unarchived_rollout_dir).expect("create unarchived rollout dir");
|
||||
std::fs::rename(&archived_rollout_path, &parent_rollout_path)
|
||||
.expect("unarchive parent rollout");
|
||||
|
||||
let reconstructed = session
|
||||
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
|
||||
.await;
|
||||
assert_eq!(reconstructed.history, expected_history);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn record_initial_history_resumed_hydrates_previous_turn_settings_from_lifecycle_turn_with_missing_turn_context_id()
|
||||
{
|
||||
|
||||
@@ -131,6 +131,7 @@ pub use rollout::list::parse_cursor;
|
||||
pub use rollout::list::read_head_for_summary;
|
||||
pub use rollout::list::read_session_meta_line;
|
||||
pub use rollout::policy::EventPersistenceMode;
|
||||
pub use rollout::resolve_fork_reference_rollout_path;
|
||||
pub use rollout::rollout_date_parts;
|
||||
pub use rollout::session_index::find_thread_names_by_ids;
|
||||
mod function_tool;
|
||||
|
||||
@@ -1061,6 +1061,9 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
|
||||
RolloutItem::Compacted(_) => {
|
||||
// Not included in `head`; skip.
|
||||
}
|
||||
RolloutItem::ForkReference(_) => {
|
||||
// Not included in `head`; skip.
|
||||
}
|
||||
RolloutItem::EventMsg(ev) => {
|
||||
if let EventMsg::UserMessage(user) = ev {
|
||||
summary.saw_user_event = true;
|
||||
@@ -1112,6 +1115,7 @@ pub async fn read_head_for_summary(path: &Path) -> io::Result<Vec<serde_json::Va
|
||||
head.push(value);
|
||||
}
|
||||
}
|
||||
RolloutItem::ForkReference(_) => {}
|
||||
RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => {}
|
||||
@@ -1258,6 +1262,40 @@ pub async fn find_archived_thread_path_by_id_str(
|
||||
find_thread_path_by_id_str_in_subdir(codex_home, ARCHIVED_SESSIONS_SUBDIR, id_str).await
|
||||
}
|
||||
|
||||
/// Resolve a stored fork-reference rollout path to the current on-disk location.
|
||||
///
|
||||
/// Fork references persist a parent rollout filename. Archive and unarchive move that file
|
||||
/// between `sessions/` and `archived_sessions/`, so stale stored paths must be repaired by
|
||||
/// locating the rollout with the stable thread id embedded in the filename.
|
||||
pub async fn resolve_fork_reference_rollout_path(
|
||||
codex_home: &Path,
|
||||
rollout_path: &Path,
|
||||
) -> io::Result<PathBuf> {
|
||||
match tokio::fs::try_exists(rollout_path).await {
|
||||
Ok(true) => return Ok(rollout_path.to_path_buf()),
|
||||
Ok(false) => {}
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
|
||||
let Some(file_name) = rollout_path.file_name().and_then(OsStr::to_str) else {
|
||||
return Ok(rollout_path.to_path_buf());
|
||||
};
|
||||
let Some((_, thread_uuid)) = parse_timestamp_uuid_from_filename(file_name) else {
|
||||
return Ok(rollout_path.to_path_buf());
|
||||
};
|
||||
let thread_id = thread_uuid.to_string();
|
||||
|
||||
if let Some(active_path) = find_thread_path_by_id_str(codex_home, &thread_id).await? {
|
||||
return Ok(active_path);
|
||||
}
|
||||
if let Some(archived_path) = find_archived_thread_path_by_id_str(codex_home, &thread_id).await?
|
||||
{
|
||||
return Ok(archived_path);
|
||||
}
|
||||
|
||||
Ok(rollout_path.to_path_buf())
|
||||
}
|
||||
|
||||
/// Extract the `YYYY/MM/DD` directory components from a rollout filename.
|
||||
pub fn rollout_date_parts(file_name: &OsStr) -> Option<(String, String, String)> {
|
||||
let name = file_name.to_string_lossy();
|
||||
|
||||
@@ -68,7 +68,8 @@ pub(crate) fn builder_from_items(
|
||||
) -> Option<ThreadMetadataBuilder> {
|
||||
if let Some(session_meta) = items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => Some(meta_line),
|
||||
RolloutItem::ResponseItem(_)
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
@@ -134,6 +135,7 @@ pub(crate) async fn extract_metadata_from_rollout(
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
|
||||
RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
}),
|
||||
|
||||
@@ -21,6 +21,7 @@ pub use list::find_archived_thread_path_by_id_str;
|
||||
pub use list::find_thread_path_by_id_str;
|
||||
#[deprecated(note = "use find_thread_path_by_id_str")]
|
||||
pub use list::find_thread_path_by_id_str as find_conversation_path_by_id_str;
|
||||
pub use list::resolve_fork_reference_rollout_path;
|
||||
pub use list::rollout_date_parts;
|
||||
pub use recorder::RolloutRecorder;
|
||||
pub use recorder::RolloutRecorderParams;
|
||||
|
||||
@@ -17,9 +17,10 @@ pub(crate) fn is_persisted_response_item(item: &RolloutItem, mode: EventPersiste
|
||||
RolloutItem::ResponseItem(item) => should_persist_response_item(item),
|
||||
RolloutItem::EventMsg(ev) => should_persist_event_msg(ev, mode),
|
||||
// Persist Codex executive markers so we can analyze flows (e.g., compaction, API turns).
|
||||
RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {
|
||||
true
|
||||
}
|
||||
RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::ForkReference(_) => true,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -566,6 +566,9 @@ impl RolloutRecorder {
|
||||
RolloutItem::ResponseItem(item) => {
|
||||
items.push(RolloutItem::ResponseItem(item));
|
||||
}
|
||||
RolloutItem::ForkReference(item) => {
|
||||
items.push(RolloutItem::ForkReference(item));
|
||||
}
|
||||
RolloutItem::Compacted(item) => {
|
||||
items.push(RolloutItem::Compacted(item));
|
||||
}
|
||||
@@ -1012,6 +1015,7 @@ async fn resume_candidate_matches_cwd(
|
||||
&& let Some(latest_turn_context_cwd) = items.iter().rev().find_map(|item| match item {
|
||||
RolloutItem::TurnContext(turn_context) => Some(turn_context.cwd.as_path()),
|
||||
RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
|
||||
@@ -4,10 +4,14 @@
|
||||
//! interpreting them via `event_mapping::parse_turn_item(...)`.
|
||||
|
||||
use crate::event_mapping;
|
||||
use crate::resolve_fork_reference_rollout_path;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use std::path::Path;
|
||||
use tracing::warn;
|
||||
|
||||
/// Return the indices of user message boundaries in a rollout.
|
||||
///
|
||||
@@ -68,6 +72,77 @@ pub(crate) fn truncate_rollout_before_nth_user_message_from_start(
|
||||
items[..cut_idx].to_vec()
|
||||
}
|
||||
|
||||
pub(crate) async fn materialize_rollout_items_for_replay(
|
||||
codex_home: &Path,
|
||||
rollout_items: &[RolloutItem],
|
||||
) -> Vec<RolloutItem> {
|
||||
const MAX_FORK_REFERENCE_DEPTH: usize = 8;
|
||||
|
||||
let mut materialized = Vec::new();
|
||||
let mut stack: Vec<(Vec<RolloutItem>, usize, usize)> = vec![(rollout_items.to_vec(), 0, 0)];
|
||||
|
||||
while let Some((items, mut idx, depth)) = stack.pop() {
|
||||
while idx < items.len() {
|
||||
match &items[idx] {
|
||||
RolloutItem::ForkReference(reference) => {
|
||||
if depth >= MAX_FORK_REFERENCE_DEPTH {
|
||||
warn!(
|
||||
"skipping fork reference recursion at depth {} for {:?}",
|
||||
depth, reference.rollout_path
|
||||
);
|
||||
idx += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
let resolved_rollout_path = match resolve_fork_reference_rollout_path(
|
||||
codex_home,
|
||||
&reference.rollout_path,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(path) => path,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to resolve fork reference rollout {:?}: {err}",
|
||||
reference.rollout_path
|
||||
);
|
||||
idx += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let parent_history = match RolloutRecorder::get_rollout_history(
|
||||
&resolved_rollout_path,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(history) => history,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to load fork reference rollout {:?} (resolved from {:?}): {err}",
|
||||
resolved_rollout_path, reference.rollout_path
|
||||
);
|
||||
idx += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let parent_items = truncate_rollout_before_nth_user_message_from_start(
|
||||
&parent_history.get_rollout_items(),
|
||||
reference.nth_user_message,
|
||||
);
|
||||
|
||||
stack.push((items, idx + 1, depth));
|
||||
stack.push((parent_items, 0, depth + 1));
|
||||
break;
|
||||
}
|
||||
item => materialized.push(item.clone()),
|
||||
}
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
|
||||
materialized
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -26,12 +26,14 @@ use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::CollaborationModeMask;
|
||||
use codex_protocol::openai_models::ModelPreset;
|
||||
use codex_protocol::openai_models::ModelsResponse;
|
||||
use codex_protocol::protocol::ForkReferenceItem;
|
||||
use codex_protocol::protocol::InitialHistory;
|
||||
use codex_protocol::protocol::McpServerRefreshConfig;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
@@ -409,7 +411,18 @@ impl ThreadManager {
|
||||
persist_extended_history: bool,
|
||||
) -> CodexResult<NewThread> {
|
||||
let history = RolloutRecorder::get_rollout_history(&path).await?;
|
||||
let history = truncate_before_nth_user_message(history, nth_user_message);
|
||||
let mut history = truncate_before_nth_user_message(
|
||||
config.codex_home.as_path(),
|
||||
history,
|
||||
nth_user_message,
|
||||
)
|
||||
.await;
|
||||
if let InitialHistory::Forked(items) = &mut history {
|
||||
*items = vec![RolloutItem::ForkReference(ForkReferenceItem {
|
||||
rollout_path: path.clone(),
|
||||
nth_user_message,
|
||||
})];
|
||||
}
|
||||
Box::pin(self.state.spawn_thread(
|
||||
config,
|
||||
history,
|
||||
@@ -656,8 +669,18 @@ impl ThreadManagerState {
|
||||
|
||||
/// Return a prefix of `items` obtained by cutting strictly before the nth user message
|
||||
/// (0-based) and all items that follow it.
|
||||
fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> InitialHistory {
|
||||
let items: Vec<RolloutItem> = history.get_rollout_items();
|
||||
async fn truncate_before_nth_user_message(
|
||||
codex_home: &Path,
|
||||
history: InitialHistory,
|
||||
n: usize,
|
||||
) -> InitialHistory {
|
||||
let mut items: Vec<RolloutItem> = history.get_rollout_items();
|
||||
if items
|
||||
.iter()
|
||||
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
|
||||
{
|
||||
items = truncation::materialize_rollout_items_for_replay(codex_home, &items).await;
|
||||
}
|
||||
let rolled = truncation::truncate_rollout_before_nth_user_message_from_start(&items, n);
|
||||
|
||||
if rolled.is_empty() {
|
||||
@@ -700,8 +723,8 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn drops_from_last_user_only() {
|
||||
#[tokio::test]
|
||||
async fn drops_from_last_user_only() {
|
||||
let items = [
|
||||
user_msg("u1"),
|
||||
assistant_msg("a1"),
|
||||
@@ -730,7 +753,9 @@ mod tests {
|
||||
.cloned()
|
||||
.map(RolloutItem::ResponseItem)
|
||||
.collect();
|
||||
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(initial), 1);
|
||||
let truncated =
|
||||
truncate_before_nth_user_message(Path::new("/tmp"), InitialHistory::Forked(initial), 1)
|
||||
.await;
|
||||
let got_items = truncated.get_rollout_items();
|
||||
let expected_items = vec![
|
||||
RolloutItem::ResponseItem(items[0].clone()),
|
||||
@@ -747,7 +772,12 @@ mod tests {
|
||||
.cloned()
|
||||
.map(RolloutItem::ResponseItem)
|
||||
.collect();
|
||||
let truncated2 = truncate_before_nth_user_message(InitialHistory::Forked(initial2), 2);
|
||||
let truncated2 = truncate_before_nth_user_message(
|
||||
Path::new("/tmp"),
|
||||
InitialHistory::Forked(initial2),
|
||||
2,
|
||||
)
|
||||
.await;
|
||||
assert_matches!(truncated2, InitialHistory::New);
|
||||
}
|
||||
|
||||
@@ -766,7 +796,12 @@ mod tests {
|
||||
.map(RolloutItem::ResponseItem)
|
||||
.collect();
|
||||
|
||||
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(rollout_items), 1);
|
||||
let truncated = truncate_before_nth_user_message(
|
||||
Path::new("/tmp"),
|
||||
InitialHistory::Forked(rollout_items),
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let got_items = truncated.get_rollout_items();
|
||||
|
||||
let expected: Vec<RolloutItem> = vec![
|
||||
|
||||
@@ -18,6 +18,59 @@ use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
fn find_user_input_positions(items: &[RolloutItem]) -> Vec<usize> {
|
||||
let mut pos = Vec::new();
|
||||
for (i, it) in items.iter().enumerate() {
|
||||
if let RolloutItem::ResponseItem(response_item) = it
|
||||
&& let Some(TurnItem::UserMessage(_)) = parse_turn_item(response_item)
|
||||
{
|
||||
pos.push(i);
|
||||
}
|
||||
}
|
||||
pos
|
||||
}
|
||||
|
||||
fn truncate_before_nth_user_message(
|
||||
items: &[RolloutItem],
|
||||
nth_user_message: usize,
|
||||
) -> Vec<RolloutItem> {
|
||||
if nth_user_message == usize::MAX {
|
||||
return items.to_vec();
|
||||
}
|
||||
let user_inputs = find_user_input_positions(items);
|
||||
let Some(cut_idx) = user_inputs.get(nth_user_message).copied() else {
|
||||
return Vec::new();
|
||||
};
|
||||
items[..cut_idx].to_vec()
|
||||
}
|
||||
|
||||
fn read_items_materialized(p: &std::path::Path) -> Vec<RolloutItem> {
|
||||
let text =
|
||||
std::fs::read_to_string(p).unwrap_or_else(|err| panic!("read rollout file {p:?}: {err}"));
|
||||
let mut items: Vec<RolloutItem> = Vec::new();
|
||||
for line in text.lines() {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let v: serde_json::Value =
|
||||
serde_json::from_str(line).unwrap_or_else(|err| panic!("jsonl line parse: {err}"));
|
||||
let rl: RolloutLine =
|
||||
serde_json::from_value(v).unwrap_or_else(|err| panic!("rollout line parse: {err}"));
|
||||
match rl.item {
|
||||
RolloutItem::SessionMeta(_) => {}
|
||||
RolloutItem::ForkReference(reference) => {
|
||||
let parent_items = read_items_materialized(&reference.rollout_path);
|
||||
items.extend(truncate_before_nth_user_message(
|
||||
&parent_items,
|
||||
reference.nth_user_message,
|
||||
));
|
||||
}
|
||||
other => items.push(other),
|
||||
}
|
||||
}
|
||||
items
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn fork_thread_twice_drops_to_first_message() {
|
||||
skip_if_no_network!();
|
||||
@@ -63,40 +116,9 @@ async fn fork_thread_twice_drops_to_first_message() {
|
||||
|
||||
// GetHistory flushes before returning the path; no wait needed.
|
||||
|
||||
// Helper: read rollout items (excluding SessionMeta) from a JSONL path.
|
||||
let read_items = |p: &std::path::Path| -> Vec<RolloutItem> {
|
||||
let text = std::fs::read_to_string(p).expect("read rollout file");
|
||||
let mut items: Vec<RolloutItem> = Vec::new();
|
||||
for line in text.lines() {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let v: serde_json::Value = serde_json::from_str(line).expect("jsonl line");
|
||||
let rl: RolloutLine = serde_json::from_value(v).expect("rollout line");
|
||||
match rl.item {
|
||||
RolloutItem::SessionMeta(_) => {}
|
||||
other => items.push(other),
|
||||
}
|
||||
}
|
||||
items
|
||||
};
|
||||
|
||||
// Compute expected prefixes after each fork by truncating base rollout
|
||||
// strictly before the nth user input (0-based).
|
||||
let base_items = read_items(&base_path);
|
||||
let find_user_input_positions = |items: &[RolloutItem]| -> Vec<usize> {
|
||||
let mut pos = Vec::new();
|
||||
for (i, it) in items.iter().enumerate() {
|
||||
if let RolloutItem::ResponseItem(response_item) = it
|
||||
&& let Some(TurnItem::UserMessage(_)) = parse_turn_item(response_item)
|
||||
{
|
||||
// Consider any user message as an input boundary; recorder stores both EventMsg and ResponseItem.
|
||||
// We specifically look for input items, which are represented as ContentItem::InputText.
|
||||
pos.push(i);
|
||||
}
|
||||
}
|
||||
pos
|
||||
};
|
||||
let base_items = read_items_materialized(&base_path);
|
||||
let user_inputs = find_user_input_positions(&base_items);
|
||||
|
||||
// After cutting at nth user input (n=1 → second user message), cut strictly before that input.
|
||||
@@ -117,7 +139,7 @@ async fn fork_thread_twice_drops_to_first_message() {
|
||||
let fork1_path = codex_fork1.rollout_path().expect("rollout path");
|
||||
|
||||
// GetHistory on fork1 flushed; the file is ready.
|
||||
let fork1_items = read_items(&fork1_path);
|
||||
let fork1_items = read_items_materialized(&fork1_path);
|
||||
assert!(fork1_items.len() > expected_after_first.len());
|
||||
pretty_assertions::assert_eq!(
|
||||
serde_json::to_value(&fork1_items[..expected_after_first.len()]).unwrap(),
|
||||
@@ -135,14 +157,14 @@ async fn fork_thread_twice_drops_to_first_message() {
|
||||
|
||||
let fork2_path = codex_fork2.rollout_path().expect("rollout path");
|
||||
// GetHistory on fork2 flushed; the file is ready.
|
||||
let fork1_items = read_items(&fork1_path);
|
||||
let fork1_items = read_items_materialized(&fork1_path);
|
||||
let fork1_user_inputs = find_user_input_positions(&fork1_items);
|
||||
let cut_last_on_fork1 = fork1_user_inputs
|
||||
.get(fork1_user_inputs.len().saturating_sub(1))
|
||||
.copied()
|
||||
.unwrap_or(0);
|
||||
let expected_after_second: Vec<RolloutItem> = fork1_items[..cut_last_on_fork1].to_vec();
|
||||
let fork2_items = read_items(&fork2_path);
|
||||
let fork2_items = read_items_materialized(&fork2_path);
|
||||
assert!(fork2_items.len() > expected_after_second.len());
|
||||
pretty_assertions::assert_eq!(
|
||||
serde_json::to_value(&fork2_items[..expected_after_second.len()]).unwrap(),
|
||||
|
||||
@@ -1930,12 +1930,20 @@ impl InitialHistory {
|
||||
InitialHistory::Resumed(resumed) => {
|
||||
resumed.history.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.forked_from_id,
|
||||
_ => None,
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
}
|
||||
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.id),
|
||||
_ => None,
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -1965,7 +1973,11 @@ impl InitialHistory {
|
||||
.iter()
|
||||
.filter_map(|ri| match ri {
|
||||
RolloutItem::EventMsg(ev) => Some(ev.clone()),
|
||||
_ => None,
|
||||
RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_) => None,
|
||||
})
|
||||
.collect(),
|
||||
),
|
||||
@@ -1974,7 +1986,11 @@ impl InitialHistory {
|
||||
.iter()
|
||||
.filter_map(|ri| match ri {
|
||||
RolloutItem::EventMsg(ev) => Some(ev.clone()),
|
||||
_ => None,
|
||||
RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_) => None,
|
||||
})
|
||||
.collect(),
|
||||
),
|
||||
@@ -1988,12 +2004,20 @@ impl InitialHistory {
|
||||
InitialHistory::Resumed(resumed) => {
|
||||
resumed.history.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.base_instructions.clone(),
|
||||
_ => None,
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
}
|
||||
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.base_instructions.clone(),
|
||||
_ => None,
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -2004,12 +2028,20 @@ impl InitialHistory {
|
||||
InitialHistory::Resumed(resumed) => {
|
||||
resumed.history.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.dynamic_tools.clone(),
|
||||
_ => None,
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
}
|
||||
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.dynamic_tools.clone(),
|
||||
_ => None,
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -2018,7 +2050,11 @@ impl InitialHistory {
|
||||
fn session_cwd_from_items(items: &[RolloutItem]) -> Option<PathBuf> {
|
||||
items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.cwd.clone()),
|
||||
_ => None,
|
||||
RolloutItem::ForkReference(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2172,10 +2208,17 @@ pub struct SessionMetaLine {
|
||||
pub git: Option<GitInfo>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
|
||||
pub struct ForkReferenceItem {
|
||||
pub rollout_path: PathBuf,
|
||||
pub nth_user_message: usize,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
|
||||
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
|
||||
pub enum RolloutItem {
|
||||
SessionMeta(SessionMetaLine),
|
||||
ForkReference(ForkReferenceItem),
|
||||
ResponseItem(ResponseItem),
|
||||
Compacted(CompactedItem),
|
||||
TurnContext(TurnContextItem),
|
||||
|
||||
@@ -22,6 +22,7 @@ pub fn apply_rollout_item(
|
||||
RolloutItem::TurnContext(turn_ctx) => apply_turn_context(metadata, turn_ctx),
|
||||
RolloutItem::EventMsg(event) => apply_event_msg(metadata, event),
|
||||
RolloutItem::ResponseItem(item) => apply_response_item(metadata, item),
|
||||
RolloutItem::ForkReference(_) => {}
|
||||
RolloutItem::Compacted(_) => {}
|
||||
}
|
||||
if metadata.model_provider.is_empty() {
|
||||
|
||||
@@ -554,6 +554,7 @@ pub(super) fn extract_dynamic_tools(items: &[RolloutItem]) -> Option<Option<Vec<
|
||||
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.dynamic_tools.clone()),
|
||||
RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
@@ -564,6 +565,7 @@ pub(super) fn extract_memory_mode(items: &[RolloutItem]) -> Option<String> {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
|
||||
RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::ForkReference(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user