Compare commits

...

3 Commits

Author SHA1 Message Date
Friel
b159523329 codex: fix CI failure on PR #13637 2026-03-05 18:31:11 -08:00
Friel
4b1cb360a6 Fix nested fork reference replay and truncation
Hydrate fork-reference chains before truncating forked histories and
before building forked spawn histories, then use the hydrated items
for forked-session startup replay while still persisting only the
compact fork-reference suffix.

This fixes fork-of-fork boundary calculations, preserves inherited
context on forked startup, and updates the regression tests to compare
logical materialized history instead of raw compact storage bytes.
2026-03-05 16:34:13 -08:00
Friel
d561670c80 feat(rollout): preserve fork references across replay
Keep forked sessions compact by recording fork references instead of duplicating full parent history.

Repair stale parent rollout paths by resolving the referenced thread id back to the current active or archived rollout location during replay, and materialize fork references before app-server derives thread summaries. Retain the core and app-server regressions that cover archive/unarchive and thread-read behavior.
2026-03-05 14:55:43 -08:00
19 changed files with 931 additions and 65 deletions

View File

@@ -183,6 +183,7 @@ impl ThreadHistoryBuilder {
RolloutItem::Compacted(payload) => self.handle_compacted(payload),
RolloutItem::TurnContext(_)
| RolloutItem::SessionMeta(_)
| RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_) => {}
}
}

View File

@@ -202,6 +202,7 @@ use codex_core::plugins::PluginInstallError as CorePluginInstallError;
use codex_core::plugins::PluginInstallRequest;
use codex_core::read_head_for_summary;
use codex_core::read_session_meta_line;
use codex_core::resolve_fork_reference_rollout_path;
use codex_core::rollout_date_parts;
use codex_core::sandboxing::SandboxPermissions;
use codex_core::skills::remote::export_remote_skill;
@@ -6980,13 +6981,17 @@ pub(crate) async fn read_summary_from_rollout(
.unwrap_or_else(|| fallback_provider.to_string());
let git_info = git.as_ref().map(map_git_info);
let updated_at = updated_at.or_else(|| timestamp.clone());
let preview = read_rollout_items_from_rollout(path)
.await
.map(|items| preview_from_rollout_items(&items))
.unwrap_or_default();
Ok(ConversationSummary {
conversation_id: session_meta.id,
timestamp,
updated_at,
path: path.to_path_buf(),
preview: String::new(),
preview,
model_provider,
cwd: session_meta.cwd,
cli_version: session_meta.cli_version,
@@ -7004,7 +7009,7 @@ pub(crate) async fn read_rollout_items_from_rollout(
InitialHistory::Resumed(resumed) => resumed.history,
};
Ok(items)
Ok(materialize_rollout_items_for_replay(codex_home_from_rollout_path(path), &items).await)
}
fn extract_conversation_summary(
@@ -7105,6 +7110,137 @@ fn preview_from_rollout_items(items: &[RolloutItem]) -> String {
.unwrap_or_default()
}
/// Collect the indices of rollout items that count as user messages, honoring
/// `ThreadRolledBack` events by discarding the positions of rolled-back turns.
fn user_message_positions_in_rollout(items: &[RolloutItem]) -> Vec<usize> {
    let mut positions: Vec<usize> = Vec::new();
    for (index, rollout_item) in items.iter().enumerate() {
        if let RolloutItem::ResponseItem(response_item) = rollout_item {
            // Only response items that parse to a genuine user message mark a
            // turn boundary; other response items are ignored.
            if matches!(
                codex_core::parse_turn_item(response_item),
                Some(TurnItem::UserMessage(_))
            ) {
                positions.push(index);
            }
        } else if let RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) = rollout_item {
            // A rollback drops the last `num_turns` user turns. Clamp the
            // conversion so an out-of-range count simply clears everything.
            let rolled_back = usize::try_from(rollback.num_turns).unwrap_or(usize::MAX);
            let kept = positions.len().saturating_sub(rolled_back);
            positions.truncate(kept);
        }
        // SessionMeta / ForkReference / Compacted / TurnContext and other
        // events do not affect user-message positions.
    }
    positions
}
/// Return a copy of `items` cut off just before the `n_from_start`-th user
/// message. `usize::MAX` is the sentinel for "keep everything"; when there are
/// not enough user messages the whole rollout is dropped (empty result).
fn truncate_rollout_before_nth_user_message_from_start(
    items: &[RolloutItem],
    n_from_start: usize,
) -> Vec<RolloutItem> {
    if n_from_start == usize::MAX {
        // Sentinel: the caller wants the full rollout untouched.
        return items.to_vec();
    }
    match user_message_positions_in_rollout(items).get(n_from_start) {
        Some(&cut_idx) => items[..cut_idx].to_vec(),
        // Fewer user messages than requested: nothing survives the cut.
        None => Vec::new(),
    }
}
/// Walk up from a rollout file path to the `CODEX_HOME` directory, i.e. the
/// parent of the `sessions/` or `archived_sessions/` path component. Returns
/// `None` when the path contains neither marker directory.
fn codex_home_from_rollout_path(path: &Path) -> Option<&Path> {
    for ancestor in path.ancestors() {
        let Some(name) = ancestor.file_name().and_then(OsStr::to_str) else {
            continue;
        };
        if name == codex_core::SESSIONS_SUBDIR || name == codex_core::ARCHIVED_SESSIONS_SUBDIR {
            // CODEX_HOME sits one level above the sessions directory.
            return ancestor.parent();
        }
    }
    None
}
/// Materialize a rollout's logical history by expanding every `ForkReference`
/// item into the (truncated) contents of the parent rollout it points at.
///
/// Uses an explicit work stack instead of recursion so fork-of-fork chains
/// replay in order; `MAX_FORK_REFERENCE_DEPTH` bounds the nesting to guard
/// against reference cycles. Resolution or load failures are logged and the
/// offending reference is skipped, so replay degrades rather than fails.
/// When `codex_home` is `None`, stale-path repair is skipped and the stored
/// rollout path is used verbatim.
async fn materialize_rollout_items_for_replay(
    codex_home: Option<&Path>,
    rollout_items: &[RolloutItem],
) -> Vec<RolloutItem> {
    // Hard cap on nested fork references; anything deeper is skipped with a warning.
    const MAX_FORK_REFERENCE_DEPTH: usize = 8;
    let mut materialized = Vec::new();
    // Each entry is (items, resume index, fork depth). The parent's items are
    // pushed last so they drain first (depth-first), placing inherited history
    // ahead of the items that follow the reference.
    let mut stack: Vec<(Vec<RolloutItem>, usize, usize)> = vec![(rollout_items.to_vec(), 0, 0)];
    while let Some((items, mut idx, depth)) = stack.pop() {
        while idx < items.len() {
            match &items[idx] {
                RolloutItem::ForkReference(reference) => {
                    if depth >= MAX_FORK_REFERENCE_DEPTH {
                        warn!(
                            "skipping fork reference recursion at depth {} for {:?}",
                            depth, reference.rollout_path
                        );
                        idx += 1;
                        continue;
                    }
                    // Repair the stored path in case the parent rollout was
                    // archived/unarchived since the reference was written.
                    let resolved_rollout_path = if let Some(codex_home) = codex_home {
                        match resolve_fork_reference_rollout_path(
                            codex_home,
                            &reference.rollout_path,
                        )
                        .await
                        {
                            Ok(path) => path,
                            Err(err) => {
                                warn!(
                                    "failed to resolve fork reference rollout {:?}: {err}",
                                    reference.rollout_path
                                );
                                idx += 1;
                                continue;
                            }
                        }
                    } else {
                        reference.rollout_path.clone()
                    };
                    let parent_history = match RolloutRecorder::get_rollout_history(
                        &resolved_rollout_path,
                    )
                    .await
                    {
                        Ok(history) => history,
                        Err(err) => {
                            warn!(
                                "failed to load fork reference rollout {:?} (resolved from {:?}): {err}",
                                resolved_rollout_path, reference.rollout_path
                            );
                            idx += 1;
                            continue;
                        }
                    };
                    // Keep only the portion of the parent that precedes the
                    // fork boundary recorded in the reference.
                    let parent_items = truncate_rollout_before_nth_user_message_from_start(
                        &parent_history.get_rollout_items(),
                        reference.nth_user_message,
                    );
                    // Suspend the current list just past the reference, then
                    // process the parent's items first.
                    stack.push((items, idx + 1, depth));
                    stack.push((parent_items, 0, depth + 1));
                    break;
                }
                // Non-reference items pass through unchanged.
                item => materialized.push(item.clone()),
            }
            idx += 1;
        }
    }
    materialized
}
fn with_thread_spawn_agent_metadata(
source: codex_protocol::protocol::SessionSource,
agent_nickname: Option<String>,

View File

@@ -7,6 +7,10 @@ use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
@@ -20,6 +24,8 @@ use codex_app_server_protocol::ThreadSetNameResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::ThreadUnarchiveResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
@@ -152,6 +158,150 @@ async fn thread_read_can_include_turns() -> Result<()> {
Ok(())
}
/// Regression test: a forked thread's `thread/read` (with turns) must return
/// the same inherited history before the parent is archived, while it is
/// archived, and after it is unarchived — i.e. fork references must survive
/// the parent rollout moving between `sessions/` and `archived_sessions/`.
#[tokio::test]
async fn thread_read_include_turns_keeps_fork_history_after_parent_archive_and_unarchive()
-> Result<()> {
    // Stand up a mock model server and an app-server process against a
    // throwaway CODEX_HOME.
    let server = create_mock_responses_server_repeating_assistant("Done").await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    // Start the parent thread.
    let start_id = mcp
        .send_thread_start_request(ThreadStartParams {
            model: Some("mock-model".to_string()),
            ..Default::default()
        })
        .await?;
    let start_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
    )
    .await??;
    let ThreadStartResponse { thread: parent, .. } =
        to_response::<ThreadStartResponse>(start_resp)?;
    // Run one turn on the parent so there is history to inherit.
    let turn_start_id = mcp
        .send_turn_start_request(TurnStartParams {
            thread_id: parent.id.clone(),
            input: vec![UserInput::Text {
                text: "parent message".to_string(),
                text_elements: Vec::new(),
            }],
            ..Default::default()
        })
        .await?;
    let turn_start_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(turn_start_id)),
    )
    .await??;
    let _: TurnStartResponse = to_response::<TurnStartResponse>(turn_start_resp)?;
    timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_notification_message("turn/completed"),
    )
    .await??;
    // Fork the parent into a child thread.
    let fork_id = mcp
        .send_thread_fork_request(ThreadForkParams {
            thread_id: parent.id.clone(),
            ..Default::default()
        })
        .await?;
    let fork_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(fork_id)),
    )
    .await??;
    let ThreadForkResponse { thread: child, .. } = to_response::<ThreadForkResponse>(fork_resp)?;
    // Baseline: read the child's turns before touching the parent rollout.
    let read_child_id = mcp
        .send_thread_read_request(ThreadReadParams {
            thread_id: child.id.clone(),
            include_turns: true,
        })
        .await?;
    let read_child_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(read_child_id)),
    )
    .await??;
    let ThreadReadResponse {
        thread: child_before_archive,
    } = to_response::<ThreadReadResponse>(read_child_resp)?;
    assert_eq!(child_before_archive.turns.len(), 1);
    // Archive the parent, moving its rollout file out of `sessions/`.
    let archive_id = mcp
        .send_thread_archive_request(ThreadArchiveParams {
            thread_id: parent.id.clone(),
        })
        .await?;
    let archive_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
    )
    .await??;
    let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
    timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_notification_message("thread/archived"),
    )
    .await??;
    // The child's fork reference must resolve to the archived location.
    let read_child_id = mcp
        .send_thread_read_request(ThreadReadParams {
            thread_id: child.id.clone(),
            include_turns: true,
        })
        .await?;
    let read_child_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(read_child_id)),
    )
    .await??;
    let ThreadReadResponse {
        thread: child_after_archive,
    } = to_response::<ThreadReadResponse>(read_child_resp)?;
    assert_eq!(child_after_archive.turns, child_before_archive.turns);
    // Unarchive the parent (rollout moves back) and read the child once more.
    let unarchive_id = mcp
        .send_thread_unarchive_request(ThreadUnarchiveParams {
            thread_id: parent.id,
        })
        .await?;
    let unarchive_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(unarchive_id)),
    )
    .await??;
    let _: ThreadUnarchiveResponse = to_response::<ThreadUnarchiveResponse>(unarchive_resp)?;
    timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_notification_message("thread/unarchived"),
    )
    .await??;
    let read_child_id = mcp
        .send_thread_read_request(ThreadReadParams {
            thread_id: child.id,
            include_turns: true,
        })
        .await?;
    let read_child_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(read_child_id)),
    )
    .await??;
    let ThreadReadResponse {
        thread: child_after_unarchive,
    } = to_response::<ThreadReadResponse>(read_child_resp)?;
    assert_eq!(child_after_unarchive.turns, child_before_archive.turns);
    Ok(())
}
#[tokio::test]
async fn thread_read_loaded_thread_returns_precomputed_path_before_materialization() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;

View File

@@ -15,6 +15,7 @@ use crate::thread_manager::ThreadManagerState;
use codex_protocol::ThreadId;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::ForkReferenceItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RolloutItem;
@@ -169,6 +170,21 @@ impl AgentControl {
RolloutRecorder::get_rollout_history(&rollout_path)
.await?
.get_rollout_items();
if forked_rollout_items
.iter()
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
{
forked_rollout_items =
crate::rollout::truncation::materialize_rollout_items_for_replay(
config.codex_home.as_path(),
&forked_rollout_items,
)
.await;
}
forked_rollout_items.push(RolloutItem::ForkReference(ForkReferenceItem {
rollout_path: rollout_path.clone(),
nth_user_message: usize::MAX,
}));
let mut output = FunctionCallOutputPayload::from_text(
FORKED_SPAWN_AGENT_OUTPUT_MESSAGE.to_string(),
);
@@ -1102,6 +1118,117 @@ mod tests {
.expect("parent shutdown should submit");
}
/// Regression test: forking via `spawn_agent` must persist a compact
/// `ForkReference` in the child rollout (pointing at the parent rollout with
/// the `usize::MAX` "keep everything" sentinel) rather than duplicating the
/// parent's raw transcript — while the child's in-memory history still
/// contains the inherited parent context.
#[tokio::test]
async fn spawn_agent_fork_persists_fork_reference_instead_of_parent_history() {
    let harness = AgentControlHarness::new().await;
    let (parent_thread_id, parent_thread) = harness.start_thread().await;
    // Seed the parent with content the child should inherit in memory but
    // must not duplicate on disk.
    parent_thread
        .inject_user_message_without_turn("parent seed context".to_string())
        .await;
    let turn_context = parent_thread.codex.session.new_default_turn().await;
    // Record a spawn_agent function call so the fork has a call id to key on.
    let parent_spawn_call_id = "spawn-call-dedup".to_string();
    let parent_spawn_call = ResponseItem::FunctionCall {
        id: None,
        name: "spawn_agent".to_string(),
        arguments: "{}".to_string(),
        call_id: parent_spawn_call_id.clone(),
    };
    parent_thread
        .codex
        .session
        .record_conversation_items(turn_context.as_ref(), &[parent_spawn_call])
        .await;
    // Make sure the parent rollout is on disk before forking from it.
    parent_thread
        .codex
        .session
        .ensure_rollout_materialized()
        .await;
    parent_thread.codex.session.flush_rollout().await;
    let parent_rollout_path = parent_thread
        .rollout_path()
        .expect("parent rollout path should be available");
    // Spawn the child as a fork of the parent's spawn call.
    let child_thread_id = harness
        .control
        .spawn_agent_with_options(
            harness.config.clone(),
            text_input("child task"),
            Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
                parent_thread_id,
                depth: 1,
                agent_nickname: None,
                agent_role: None,
            })),
            SpawnAgentOptions {
                fork_parent_spawn_call_id: Some(parent_spawn_call_id),
            },
        )
        .await
        .expect("forked spawn should succeed");
    let child_thread = harness
        .manager
        .get_thread(child_thread_id)
        .await
        .expect("child thread should be registered");
    let child_rollout_path = child_thread
        .rollout_path()
        .expect("child rollout path should be available");
    // Load what was actually persisted for the child.
    let InitialHistory::Resumed(resumed) =
        RolloutRecorder::get_rollout_history(child_rollout_path.as_path())
            .await
            .expect("child rollout should load")
    else {
        panic!("child rollout should include session metadata");
    };
    // The child rollout must carry a fork reference to the parent with the
    // `usize::MAX` sentinel (inherit the full parent history).
    assert!(
        resumed.history.iter().any(|item| {
            matches!(
                item,
                RolloutItem::ForkReference(ForkReferenceItem {
                    rollout_path,
                    nth_user_message,
                }) if rollout_path == &parent_rollout_path && *nth_user_message == usize::MAX
            )
        }),
        "child rollout should persist a fork reference to the parent rollout"
    );
    // Collect only the raw response items persisted in the child rollout.
    let raw_response_items: Vec<ResponseItem> = resumed
        .history
        .iter()
        .filter_map(|item| match item {
            RolloutItem::ResponseItem(response_item) => Some(response_item.clone()),
            RolloutItem::SessionMeta(_)
            | RolloutItem::ForkReference(_)
            | RolloutItem::Compacted(_)
            | RolloutItem::TurnContext(_)
            | RolloutItem::EventMsg(_) => None,
        })
        .collect();
    // On disk: the parent transcript must NOT be duplicated.
    assert!(
        !history_contains_text(&raw_response_items, "parent seed context"),
        "child rollout should not duplicate the parent's raw transcript"
    );
    // In memory: the child must still see the inherited parent context.
    let history = child_thread.codex.session.clone_history().await;
    assert!(history_contains_text(
        history.raw_items(),
        "parent seed context"
    ));
    // Shut both threads down cleanly.
    let _ = harness
        .control
        .shutdown_agent(child_thread_id)
        .await
        .expect("child shutdown should submit");
    let _ = parent_thread
        .submit(Op::Shutdown {})
        .await
        .expect("parent shutdown should submit");
}
#[tokio::test]
async fn spawn_agent_respects_max_threads_limit() {
let max_threads = 1usize;

View File

@@ -1826,8 +1826,17 @@ impl Session {
}
InitialHistory::Resumed(resumed_history) => {
let rollout_items = resumed_history.history;
let hydrated_rollout_items = if rollout_items
.iter()
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
{
self.materialize_rollout_items_for_replay(&rollout_items)
.await
} else {
rollout_items.clone()
};
let restored_tool_selection =
Self::extract_mcp_tool_selection_from_rollout(&rollout_items);
Self::extract_mcp_tool_selection_from_rollout(&hydrated_rollout_items);
let reconstructed_rollout = self
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
@@ -1869,7 +1878,7 @@ impl Session {
// Seed usage info from the recorded rollout so UIs can show token counts
// immediately on resume/fork.
if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) {
if let Some(info) = Self::last_token_info_from_rollout(&hydrated_rollout_items) {
let mut state = self.state.lock().await;
state.set_token_info(Some(info));
}
@@ -1884,11 +1893,24 @@ impl Session {
}
}
InitialHistory::Forked(rollout_items) => {
let persisted_rollout_items = rollout_items
.iter()
.position(|item| matches!(item, RolloutItem::ForkReference(_)))
.map(|index| rollout_items[index..].to_vec());
let hydrated_rollout_items = if rollout_items
.iter()
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
{
self.materialize_rollout_items_for_replay(&rollout_items)
.await
} else {
rollout_items.clone()
};
let restored_tool_selection =
Self::extract_mcp_tool_selection_from_rollout(&rollout_items);
Self::extract_mcp_tool_selection_from_rollout(&hydrated_rollout_items);
let reconstructed_rollout = self
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
.reconstruct_history_from_rollout(&turn_context, &hydrated_rollout_items)
.await;
self.set_previous_turn_settings(
reconstructed_rollout.previous_turn_settings.clone(),
@@ -1910,7 +1932,7 @@ impl Session {
// Seed usage info from the recorded rollout so UIs can show token counts
// immediately on resume/fork.
if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) {
if let Some(info) = Self::last_token_info_from_rollout(&hydrated_rollout_items) {
let mut state = self.state.lock().await;
state.set_token_info(Some(info));
}
@@ -1918,8 +1940,11 @@ impl Session {
self.set_mcp_tool_selection(selected_tools).await;
}
// If persisting, persist all rollout items as-is (recorder filters)
if !rollout_items.is_empty() {
// Persist only the compact fork reference suffix so child rollouts do not
// duplicate the full parent history they inherited in memory.
if let Some(persisted_rollout_items) = persisted_rollout_items {
self.persist_rollout_items(&persisted_rollout_items).await;
} else if !rollout_items.is_empty() {
self.persist_rollout_items(&rollout_items).await;
}

View File

@@ -83,11 +83,34 @@ fn finalize_active_segment<'a>(
}
impl Session {
/// Expand fork references in `rollout_items` against this session's
/// configured `codex_home`, returning the fully materialized item list.
/// Thin wrapper over the shared truncation-module helper.
pub(super) async fn materialize_rollout_items_for_replay(
    &self,
    rollout_items: &[RolloutItem],
) -> Vec<RolloutItem> {
    // Snapshot the home path first so the state lock is released before the
    // materialization await below.
    let codex_home = self
        .state
        .lock()
        .await
        .session_configuration
        .codex_home
        .clone();
    let home = codex_home.as_path();
    crate::rollout::truncation::materialize_rollout_items_for_replay(home, rollout_items).await
}
pub(super) async fn reconstruct_history_from_rollout(
&self,
turn_context: &TurnContext,
rollout_items: &[RolloutItem],
) -> RolloutReconstruction {
let rollout_items = self
.materialize_rollout_items_for_replay(rollout_items)
.await;
let rollout_items = rollout_items.as_slice();
// Replay metadata should already match the shape of the future lazy reverse loader, even
// while history materialization still uses an eager bridge. Scan newest-to-oldest,
// stopping once a surviving replacement-history checkpoint and the required resume metadata
@@ -203,6 +226,7 @@ impl Session {
}
RolloutItem::ResponseItem(_)
| RolloutItem::EventMsg(_)
| RolloutItem::ForkReference(_)
| RolloutItem::SessionMeta(_) => {}
}
@@ -271,6 +295,7 @@ impl Session {
history.drop_last_n_user_turns(rollback.num_turns);
}
RolloutItem::EventMsg(_)
| RolloutItem::ForkReference(_)
| RolloutItem::TurnContext(_)
| RolloutItem::SessionMeta(_) => {}
}

View File

@@ -6,8 +6,16 @@ use crate::protocol::ResumedHistory;
use codex_protocol::ThreadId;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::ForkReferenceItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use pretty_assertions::assert_eq;
use std::path::Path;
use std::path::PathBuf;
use tempfile::TempDir;
fn user_message(text: &str) -> ResponseItem {
ResponseItem::Message {
@@ -33,6 +41,52 @@ fn assistant_message(text: &str) -> ResponseItem {
}
}
/// Test helper: write a synthetic rollout file for `thread_id` under `root`'s
/// sessions tree (fixed date 2026-03-05), prepending a minimal `SessionMeta`
/// line followed by one JSONL line per entry in `items`. Returns the path of
/// the rollout file that was written.
fn write_rollout_items(
    root: &Path,
    thread_id: ThreadId,
    items: &[RolloutItem],
) -> std::io::Result<PathBuf> {
    let rollout_dir = root
        .join(crate::SESSIONS_SUBDIR)
        .join("2026")
        .join("03")
        .join("05");
    std::fs::create_dir_all(&rollout_dir)?;
    let rollout_path = rollout_dir.join(format!("rollout-2026-03-05T00-00-00-{thread_id}.jsonl"));
    // First line: session metadata, as real rollout files begin with.
    let meta_line = RolloutLine {
        timestamp: "2026-03-05T00:00:00Z".to_string(),
        item: RolloutItem::SessionMeta(SessionMetaLine {
            meta: SessionMeta {
                id: thread_id,
                timestamp: "2026-03-05T00:00:00Z".to_string(),
                cwd: root.to_path_buf(),
                originator: "codex".to_string(),
                cli_version: "test".to_string(),
                source: SessionSource::Exec,
                agent_nickname: None,
                agent_role: None,
                model_provider: Some("openai".to_string()),
                base_instructions: None,
                dynamic_tools: None,
                memory_mode: None,
                forked_from_id: None,
            },
            git: None,
        }),
    };
    // Serialize meta + items to newline-terminated JSONL in one pass.
    let body: String = std::iter::once(serde_json::to_string(&meta_line).unwrap())
        .chain(items.iter().map(|item| {
            let line = RolloutLine {
                timestamp: "2026-03-05T00:00:01Z".to_string(),
                item: item.clone(),
            };
            serde_json::to_string(&line).unwrap()
        }))
        .map(|json| format!("{json}\n"))
        .collect();
    std::fs::write(&rollout_path, body)?;
    Ok(rollout_path)
}
#[tokio::test]
async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previous_turn_settings()
{
@@ -71,6 +125,129 @@ async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previ
assert!(session.reference_context_item().await.is_none());
}
/// A fork reference with `nth_user_message: 1` should hydrate into only the
/// parent items preceding the second user message (first user turn only).
#[tokio::test]
async fn reconstruct_history_materializes_fork_reference_rollout_items() {
    let (session, turn_context) = make_session_and_context().await;
    // Parent rollout lives in a separate temp dir; the stored path exists on
    // disk, so resolution should use it directly without repair.
    let dir = TempDir::new().expect("create temp dir");
    let parent_thread_id = ThreadId::new();
    let parent_rollout_path = write_rollout_items(
        dir.path(),
        parent_thread_id,
        &[
            RolloutItem::ResponseItem(user_message("first user")),
            RolloutItem::ResponseItem(assistant_message("first reply")),
            RolloutItem::ResponseItem(user_message("second user")),
            RolloutItem::ResponseItem(assistant_message("second reply")),
        ],
    )
    .expect("write parent rollout");
    // The child rollout is nothing but a reference truncated at user turn 1.
    let rollout_items = vec![RolloutItem::ForkReference(ForkReferenceItem {
        rollout_path: parent_rollout_path,
        nth_user_message: 1,
    })];
    let reconstructed = session
        .reconstruct_history_from_rollout(&turn_context, &rollout_items)
        .await;
    // Only the first user turn (message + reply) should survive the cut.
    assert_eq!(
        reconstructed.history,
        vec![user_message("first user"), assistant_message("first reply")]
    );
}
/// `record_initial_history(InitialHistory::Forked(..))` with a lone fork
/// reference should seed the session's in-memory history with the truncated
/// parent items followed by the standard initial context.
#[tokio::test]
async fn record_initial_history_forked_materializes_fork_reference_rollout_items() {
    let (session, turn_context) = make_session_and_context().await;
    // Write the parent rollout inside the session's codex_home so path
    // resolution can find it.
    let codex_home = turn_context.config.codex_home.clone();
    let parent_thread_id = ThreadId::new();
    let parent_rollout_path = write_rollout_items(
        codex_home.as_path(),
        parent_thread_id,
        &[
            RolloutItem::ResponseItem(user_message("first user")),
            RolloutItem::ResponseItem(assistant_message("first reply")),
            RolloutItem::ResponseItem(user_message("second user")),
            RolloutItem::ResponseItem(assistant_message("second reply")),
        ],
    )
    .expect("write parent rollout");
    // Fork at user turn 1: only the first user turn should be inherited.
    let rollout_items = vec![RolloutItem::ForkReference(ForkReferenceItem {
        rollout_path: parent_rollout_path,
        nth_user_message: 1,
    })];
    session
        .record_initial_history(InitialHistory::Forked(rollout_items))
        .await;
    // Expected: truncated parent turn, then the session's initial context.
    let reconstruction_turn = session.new_default_turn().await;
    let mut expected = vec![user_message("first user"), assistant_message("first reply")];
    expected.extend(
        session
            .build_initial_context(reconstruction_turn.as_ref())
            .await,
    );
    let history = session.state.lock().await.clone_history();
    assert_eq!(expected, history.raw_items());
}
/// Fork-reference resolution must survive the parent rollout being moved to
/// `archived_sessions/` and back: the stale stored path is repaired by looking
/// up the thread id embedded in the rollout filename.
#[tokio::test]
async fn reconstruct_history_resolves_fork_reference_after_parent_archive_and_unarchive() {
    let (session, turn_context) = make_session_and_context().await;
    let codex_home = turn_context.config.codex_home.clone();
    let parent_thread_id = ThreadId::new();
    let parent_rollout_path = write_rollout_items(
        codex_home.as_path(),
        parent_thread_id,
        &[
            RolloutItem::ResponseItem(user_message("first user")),
            RolloutItem::ResponseItem(assistant_message("first reply")),
            RolloutItem::ResponseItem(user_message("second user")),
            RolloutItem::ResponseItem(assistant_message("second reply")),
        ],
    )
    .expect("write parent rollout");
    // The reference keeps pointing at the ORIGINAL (soon-to-be-stale) path.
    let rollout_items = vec![RolloutItem::ForkReference(ForkReferenceItem {
        rollout_path: parent_rollout_path.clone(),
        nth_user_message: 1,
    })];
    let expected_history = vec![user_message("first user"), assistant_message("first reply")];
    // Simulate archiving: move the rollout into archived_sessions/ with the
    // same date components and filename.
    let archived_rollout_dir = codex_home
        .join(crate::ARCHIVED_SESSIONS_SUBDIR)
        .join("2026")
        .join("03")
        .join("05");
    std::fs::create_dir_all(&archived_rollout_dir).expect("create archived rollout dir");
    let archived_rollout_path = archived_rollout_dir.join(
        parent_rollout_path
            .file_name()
            .expect("parent rollout file name"),
    );
    std::fs::rename(&parent_rollout_path, &archived_rollout_path).expect("archive parent rollout");
    // Resolution must repair the stale path to the archived location.
    let reconstructed = session
        .reconstruct_history_from_rollout(&turn_context, &rollout_items)
        .await;
    assert_eq!(reconstructed.history, expected_history);
    // Simulate unarchiving: move the rollout back under sessions/.
    let unarchived_rollout_dir = codex_home
        .join(crate::SESSIONS_SUBDIR)
        .join("2026")
        .join("03")
        .join("05");
    std::fs::create_dir_all(&unarchived_rollout_dir).expect("create unarchived rollout dir");
    std::fs::rename(&archived_rollout_path, &parent_rollout_path)
        .expect("unarchive parent rollout");
    // Now the stored path exists again, so resolution succeeds directly.
    let reconstructed = session
        .reconstruct_history_from_rollout(&turn_context, &rollout_items)
        .await;
    assert_eq!(reconstructed.history, expected_history);
}
#[tokio::test]
async fn record_initial_history_resumed_hydrates_previous_turn_settings_from_lifecycle_turn_with_missing_turn_context_id()
{

View File

@@ -131,6 +131,7 @@ pub use rollout::list::parse_cursor;
pub use rollout::list::read_head_for_summary;
pub use rollout::list::read_session_meta_line;
pub use rollout::policy::EventPersistenceMode;
pub use rollout::resolve_fork_reference_rollout_path;
pub use rollout::rollout_date_parts;
pub use rollout::session_index::find_thread_names_by_ids;
mod function_tool;

View File

@@ -1061,6 +1061,9 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
RolloutItem::Compacted(_) => {
// Not included in `head`; skip.
}
RolloutItem::ForkReference(_) => {
// Not included in `head`; skip.
}
RolloutItem::EventMsg(ev) => {
if let EventMsg::UserMessage(user) = ev {
summary.saw_user_event = true;
@@ -1112,6 +1115,7 @@ pub async fn read_head_for_summary(path: &Path) -> io::Result<Vec<serde_json::Va
head.push(value);
}
}
RolloutItem::ForkReference(_) => {}
RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => {}
@@ -1258,6 +1262,40 @@ pub async fn find_archived_thread_path_by_id_str(
find_thread_path_by_id_str_in_subdir(codex_home, ARCHIVED_SESSIONS_SUBDIR, id_str).await
}
/// Resolve a stored fork-reference rollout path to the current on-disk location.
///
/// Fork references persist a parent rollout filename. Archive and unarchive move that file
/// between `sessions/` and `archived_sessions/`, so stale stored paths must be repaired by
/// locating the rollout with the stable thread id embedded in the filename.
pub async fn resolve_fork_reference_rollout_path(
    codex_home: &Path,
    rollout_path: &Path,
) -> io::Result<PathBuf> {
    // Fast path: the stored location is still valid.
    if tokio::fs::try_exists(rollout_path).await? {
        return Ok(rollout_path.to_path_buf());
    }
    // The file moved. Recover the thread id from the filename and search the
    // active, then the archived, session trees for the same rollout. When the
    // filename cannot be parsed, hand back the original path unchanged so the
    // caller surfaces a load error against the stored location.
    let fallback = rollout_path.to_path_buf();
    let Some(file_name) = rollout_path.file_name().and_then(OsStr::to_str) else {
        return Ok(fallback);
    };
    let Some((_, thread_uuid)) = parse_timestamp_uuid_from_filename(file_name) else {
        return Ok(fallback);
    };
    let thread_id = thread_uuid.to_string();
    if let Some(found) = find_thread_path_by_id_str(codex_home, &thread_id).await? {
        return Ok(found);
    }
    if let Some(found) = find_archived_thread_path_by_id_str(codex_home, &thread_id).await? {
        return Ok(found);
    }
    Ok(fallback)
}
/// Extract the `YYYY/MM/DD` directory components from a rollout filename.
pub fn rollout_date_parts(file_name: &OsStr) -> Option<(String, String, String)> {
let name = file_name.to_string_lossy();

View File

@@ -68,7 +68,8 @@ pub(crate) fn builder_from_items(
) -> Option<ThreadMetadataBuilder> {
if let Some(session_meta) = items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line),
RolloutItem::ResponseItem(_)
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
@@ -134,6 +135,7 @@ pub(crate) async fn extract_metadata_from_rollout(
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::ForkReference(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
}),

View File

@@ -21,6 +21,7 @@ pub use list::find_archived_thread_path_by_id_str;
pub use list::find_thread_path_by_id_str;
#[deprecated(note = "use find_thread_path_by_id_str")]
pub use list::find_thread_path_by_id_str as find_conversation_path_by_id_str;
pub use list::resolve_fork_reference_rollout_path;
pub use list::rollout_date_parts;
pub use recorder::RolloutRecorder;
pub use recorder::RolloutRecorderParams;

View File

@@ -17,9 +17,10 @@ pub(crate) fn is_persisted_response_item(item: &RolloutItem, mode: EventPersiste
RolloutItem::ResponseItem(item) => should_persist_response_item(item),
RolloutItem::EventMsg(ev) => should_persist_event_msg(ev, mode),
// Persist Codex executive markers so we can analyze flows (e.g., compaction, API turns).
RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {
true
}
RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::SessionMeta(_)
| RolloutItem::ForkReference(_) => true,
}
}

View File

@@ -566,6 +566,9 @@ impl RolloutRecorder {
RolloutItem::ResponseItem(item) => {
items.push(RolloutItem::ResponseItem(item));
}
RolloutItem::ForkReference(item) => {
items.push(RolloutItem::ForkReference(item));
}
RolloutItem::Compacted(item) => {
items.push(RolloutItem::Compacted(item));
}
@@ -1012,6 +1015,7 @@ async fn resume_candidate_matches_cwd(
&& let Some(latest_turn_context_cwd) = items.iter().rev().find_map(|item| match item {
RolloutItem::TurnContext(turn_context) => Some(turn_context.cwd.as_path()),
RolloutItem::SessionMeta(_)
| RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::EventMsg(_) => None,

View File

@@ -4,10 +4,14 @@
//! interpreting them via `event_mapping::parse_turn_item(...)`.
use crate::event_mapping;
use crate::resolve_fork_reference_rollout_path;
use crate::rollout::RolloutRecorder;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
use std::path::Path;
use tracing::warn;
/// Return the indices of user message boundaries in a rollout.
///
@@ -68,6 +72,77 @@ pub(crate) fn truncate_rollout_before_nth_user_message_from_start(
items[..cut_idx].to_vec()
}
/// Materialize a rollout's logical history by expanding every `ForkReference`
/// item into the (truncated) contents of the parent rollout it points at.
///
/// Uses an explicit work stack instead of recursion so fork-of-fork chains
/// replay in order; `MAX_FORK_REFERENCE_DEPTH` bounds the nesting to guard
/// against reference cycles. Resolution or load failures are logged and the
/// offending reference is skipped, so replay degrades rather than fails.
pub(crate) async fn materialize_rollout_items_for_replay(
    codex_home: &Path,
    rollout_items: &[RolloutItem],
) -> Vec<RolloutItem> {
    // Hard cap on nested fork references; anything deeper is skipped with a warning.
    const MAX_FORK_REFERENCE_DEPTH: usize = 8;
    let mut materialized = Vec::new();
    // Each entry is (items, resume index, fork depth). The parent's items are
    // pushed last so they drain first (depth-first), placing inherited history
    // ahead of the items that follow the reference.
    let mut stack: Vec<(Vec<RolloutItem>, usize, usize)> = vec![(rollout_items.to_vec(), 0, 0)];
    while let Some((items, mut idx, depth)) = stack.pop() {
        while idx < items.len() {
            match &items[idx] {
                RolloutItem::ForkReference(reference) => {
                    if depth >= MAX_FORK_REFERENCE_DEPTH {
                        warn!(
                            "skipping fork reference recursion at depth {} for {:?}",
                            depth, reference.rollout_path
                        );
                        idx += 1;
                        continue;
                    }
                    // Repair the stored path in case the parent rollout was
                    // archived/unarchived since the reference was written.
                    let resolved_rollout_path = match resolve_fork_reference_rollout_path(
                        codex_home,
                        &reference.rollout_path,
                    )
                    .await
                    {
                        Ok(path) => path,
                        Err(err) => {
                            warn!(
                                "failed to resolve fork reference rollout {:?}: {err}",
                                reference.rollout_path
                            );
                            idx += 1;
                            continue;
                        }
                    };
                    let parent_history = match RolloutRecorder::get_rollout_history(
                        &resolved_rollout_path,
                    )
                    .await
                    {
                        Ok(history) => history,
                        Err(err) => {
                            warn!(
                                "failed to load fork reference rollout {:?} (resolved from {:?}): {err}",
                                resolved_rollout_path, reference.rollout_path
                            );
                            idx += 1;
                            continue;
                        }
                    };
                    // Keep only the portion of the parent that precedes the
                    // fork boundary recorded in the reference.
                    let parent_items = truncate_rollout_before_nth_user_message_from_start(
                        &parent_history.get_rollout_items(),
                        reference.nth_user_message,
                    );
                    // Suspend the current list just past the reference, then
                    // process the parent's items first.
                    stack.push((items, idx + 1, depth));
                    stack.push((parent_items, 0, depth + 1));
                    break;
                }
                // Non-reference items pass through unchanged.
                item => materialized.push(item.clone()),
            }
            idx += 1;
        }
    }
    materialized
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -26,12 +26,14 @@ use codex_protocol::ThreadId;
use codex_protocol::config_types::CollaborationModeMask;
use codex_protocol::openai_models::ModelPreset;
use codex_protocol::openai_models::ModelsResponse;
use codex_protocol::protocol::ForkReferenceItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::McpServerRefreshConfig;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
@@ -409,7 +411,18 @@ impl ThreadManager {
persist_extended_history: bool,
) -> CodexResult<NewThread> {
let history = RolloutRecorder::get_rollout_history(&path).await?;
let history = truncate_before_nth_user_message(history, nth_user_message);
let mut history = truncate_before_nth_user_message(
config.codex_home.as_path(),
history,
nth_user_message,
)
.await;
if let InitialHistory::Forked(items) = &mut history {
*items = vec![RolloutItem::ForkReference(ForkReferenceItem {
rollout_path: path.clone(),
nth_user_message,
})];
}
Box::pin(self.state.spawn_thread(
config,
history,
@@ -656,8 +669,18 @@ impl ThreadManagerState {
/// Return a prefix of `items` obtained by cutting strictly before the nth user message
/// (0-based) and all items that follow it.
fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> InitialHistory {
let items: Vec<RolloutItem> = history.get_rollout_items();
async fn truncate_before_nth_user_message(
codex_home: &Path,
history: InitialHistory,
n: usize,
) -> InitialHistory {
let mut items: Vec<RolloutItem> = history.get_rollout_items();
if items
.iter()
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
{
items = truncation::materialize_rollout_items_for_replay(codex_home, &items).await;
}
let rolled = truncation::truncate_rollout_before_nth_user_message_from_start(&items, n);
if rolled.is_empty() {
@@ -700,8 +723,8 @@ mod tests {
}
}
#[test]
fn drops_from_last_user_only() {
#[tokio::test]
async fn drops_from_last_user_only() {
let items = [
user_msg("u1"),
assistant_msg("a1"),
@@ -730,7 +753,9 @@ mod tests {
.cloned()
.map(RolloutItem::ResponseItem)
.collect();
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(initial), 1);
let truncated =
truncate_before_nth_user_message(Path::new("/tmp"), InitialHistory::Forked(initial), 1)
.await;
let got_items = truncated.get_rollout_items();
let expected_items = vec![
RolloutItem::ResponseItem(items[0].clone()),
@@ -747,7 +772,12 @@ mod tests {
.cloned()
.map(RolloutItem::ResponseItem)
.collect();
let truncated2 = truncate_before_nth_user_message(InitialHistory::Forked(initial2), 2);
let truncated2 = truncate_before_nth_user_message(
Path::new("/tmp"),
InitialHistory::Forked(initial2),
2,
)
.await;
assert_matches!(truncated2, InitialHistory::New);
}
@@ -766,7 +796,12 @@ mod tests {
.map(RolloutItem::ResponseItem)
.collect();
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(rollout_items), 1);
let truncated = truncate_before_nth_user_message(
Path::new("/tmp"),
InitialHistory::Forked(rollout_items),
1,
)
.await;
let got_items = truncated.get_rollout_items();
let expected: Vec<RolloutItem> = vec![

View File

@@ -18,6 +18,59 @@ use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
/// Return the indices of every rollout item that parses as a user message;
/// these positions mark the user-input boundaries used for fork truncation.
fn find_user_input_positions(items: &[RolloutItem]) -> Vec<usize> {
    items
        .iter()
        .enumerate()
        .filter_map(|(idx, item)| match item {
            RolloutItem::ResponseItem(response_item) => {
                matches!(parse_turn_item(response_item), Some(TurnItem::UserMessage(_)))
                    .then_some(idx)
            }
            _ => None,
        })
        .collect()
}
/// Return the prefix of `items` that ends strictly before the nth user
/// message (0-based). `usize::MAX` is the "no cut" sentinel and yields the
/// full slice; if there are not enough user messages, the result is empty.
fn truncate_before_nth_user_message(
    items: &[RolloutItem],
    nth_user_message: usize,
) -> Vec<RolloutItem> {
    if nth_user_message == usize::MAX {
        return items.to_vec();
    }
    match find_user_input_positions(items).get(nth_user_message) {
        Some(&cut_idx) => items[..cut_idx].to_vec(),
        None => Vec::new(),
    }
}
/// Read a rollout JSONL file and return its logical item stream: session
/// metadata lines are dropped, and each fork reference is expanded
/// (recursively) into the truncated history of the parent rollout it names.
fn read_items_materialized(p: &std::path::Path) -> Vec<RolloutItem> {
    let contents =
        std::fs::read_to_string(p).unwrap_or_else(|err| panic!("read rollout file {p:?}: {err}"));
    let mut materialized: Vec<RolloutItem> = Vec::new();
    for raw_line in contents.lines().filter(|line| !line.trim().is_empty()) {
        let value: serde_json::Value = serde_json::from_str(raw_line)
            .unwrap_or_else(|err| panic!("jsonl line parse: {err}"));
        let parsed: RolloutLine = serde_json::from_value(value)
            .unwrap_or_else(|err| panic!("rollout line parse: {err}"));
        match parsed.item {
            // Metadata lines are not part of the logical history.
            RolloutItem::SessionMeta(_) => {}
            // Splice in the parent history, cut at the fork boundary.
            RolloutItem::ForkReference(reference) => {
                let parent_items = read_items_materialized(&reference.rollout_path);
                materialized.extend(truncate_before_nth_user_message(
                    &parent_items,
                    reference.nth_user_message,
                ));
            }
            item => materialized.push(item),
        }
    }
    materialized
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fork_thread_twice_drops_to_first_message() {
skip_if_no_network!();
@@ -63,40 +116,9 @@ async fn fork_thread_twice_drops_to_first_message() {
// GetHistory flushes before returning the path; no wait needed.
// Helper: read rollout items (excluding SessionMeta) from a JSONL path.
let read_items = |p: &std::path::Path| -> Vec<RolloutItem> {
let text = std::fs::read_to_string(p).expect("read rollout file");
let mut items: Vec<RolloutItem> = Vec::new();
for line in text.lines() {
if line.trim().is_empty() {
continue;
}
let v: serde_json::Value = serde_json::from_str(line).expect("jsonl line");
let rl: RolloutLine = serde_json::from_value(v).expect("rollout line");
match rl.item {
RolloutItem::SessionMeta(_) => {}
other => items.push(other),
}
}
items
};
// Compute expected prefixes after each fork by truncating base rollout
// strictly before the nth user input (0-based).
let base_items = read_items(&base_path);
let find_user_input_positions = |items: &[RolloutItem]| -> Vec<usize> {
let mut pos = Vec::new();
for (i, it) in items.iter().enumerate() {
if let RolloutItem::ResponseItem(response_item) = it
&& let Some(TurnItem::UserMessage(_)) = parse_turn_item(response_item)
{
// Consider any user message as an input boundary; recorder stores both EventMsg and ResponseItem.
// We specifically look for input items, which are represented as ContentItem::InputText.
pos.push(i);
}
}
pos
};
let base_items = read_items_materialized(&base_path);
let user_inputs = find_user_input_positions(&base_items);
// After cutting at nth user input (n=1 → second user message), cut strictly before that input.
@@ -117,7 +139,7 @@ async fn fork_thread_twice_drops_to_first_message() {
let fork1_path = codex_fork1.rollout_path().expect("rollout path");
// GetHistory on fork1 flushed; the file is ready.
let fork1_items = read_items(&fork1_path);
let fork1_items = read_items_materialized(&fork1_path);
assert!(fork1_items.len() > expected_after_first.len());
pretty_assertions::assert_eq!(
serde_json::to_value(&fork1_items[..expected_after_first.len()]).unwrap(),
@@ -135,14 +157,14 @@ async fn fork_thread_twice_drops_to_first_message() {
let fork2_path = codex_fork2.rollout_path().expect("rollout path");
// GetHistory on fork2 flushed; the file is ready.
let fork1_items = read_items(&fork1_path);
let fork1_items = read_items_materialized(&fork1_path);
let fork1_user_inputs = find_user_input_positions(&fork1_items);
let cut_last_on_fork1 = fork1_user_inputs
.get(fork1_user_inputs.len().saturating_sub(1))
.copied()
.unwrap_or(0);
let expected_after_second: Vec<RolloutItem> = fork1_items[..cut_last_on_fork1].to_vec();
let fork2_items = read_items(&fork2_path);
let fork2_items = read_items_materialized(&fork2_path);
assert!(fork2_items.len() > expected_after_second.len());
pretty_assertions::assert_eq!(
serde_json::to_value(&fork2_items[..expected_after_second.len()]).unwrap(),

View File

@@ -1930,12 +1930,20 @@ impl InitialHistory {
InitialHistory::Resumed(resumed) => {
resumed.history.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.forked_from_id,
_ => None,
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})
}
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.id),
_ => None,
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
}),
}
}
@@ -1965,7 +1973,11 @@ impl InitialHistory {
.iter()
.filter_map(|ri| match ri {
RolloutItem::EventMsg(ev) => Some(ev.clone()),
_ => None,
RolloutItem::SessionMeta(_)
| RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_) => None,
})
.collect(),
),
@@ -1974,7 +1986,11 @@ impl InitialHistory {
.iter()
.filter_map(|ri| match ri {
RolloutItem::EventMsg(ev) => Some(ev.clone()),
_ => None,
RolloutItem::SessionMeta(_)
| RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_) => None,
})
.collect(),
),
@@ -1988,12 +2004,20 @@ impl InitialHistory {
InitialHistory::Resumed(resumed) => {
resumed.history.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.base_instructions.clone(),
_ => None,
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})
}
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.base_instructions.clone(),
_ => None,
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
}),
}
}
@@ -2004,12 +2028,20 @@ impl InitialHistory {
InitialHistory::Resumed(resumed) => {
resumed.history.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.dynamic_tools.clone(),
_ => None,
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})
}
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.dynamic_tools.clone(),
_ => None,
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
}),
}
}
@@ -2018,7 +2050,11 @@ impl InitialHistory {
fn session_cwd_from_items(items: &[RolloutItem]) -> Option<PathBuf> {
items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.cwd.clone()),
_ => None,
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})
}
@@ -2172,10 +2208,17 @@ pub struct SessionMetaLine {
pub git: Option<GitInfo>,
}
/// Compact pointer recorded in a forked session's rollout in place of the
/// parent's duplicated history; replay hydrates it back into real items.
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
pub struct ForkReferenceItem {
    /// Rollout file of the parent session this fork was cut from. The path may
    /// go stale (parent archived/moved); replay re-resolves it via
    /// `resolve_fork_reference_rollout_path` before loading.
    pub rollout_path: PathBuf,
    /// 0-based index of the user message at which the fork was cut; only the
    /// parent history strictly before this message is inherited.
    pub nth_user_message: usize,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
pub enum RolloutItem {
SessionMeta(SessionMetaLine),
ForkReference(ForkReferenceItem),
ResponseItem(ResponseItem),
Compacted(CompactedItem),
TurnContext(TurnContextItem),

View File

@@ -22,6 +22,7 @@ pub fn apply_rollout_item(
RolloutItem::TurnContext(turn_ctx) => apply_turn_context(metadata, turn_ctx),
RolloutItem::EventMsg(event) => apply_event_msg(metadata, event),
RolloutItem::ResponseItem(item) => apply_response_item(metadata, item),
RolloutItem::ForkReference(_) => {}
RolloutItem::Compacted(_) => {}
}
if metadata.model_provider.is_empty() {

View File

@@ -554,6 +554,7 @@ pub(super) fn extract_dynamic_tools(items: &[RolloutItem]) -> Option<Option<Vec<
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.dynamic_tools.clone()),
RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::ForkReference(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})
@@ -564,6 +565,7 @@ pub(super) fn extract_memory_mode(items: &[RolloutItem]) -> Option<String> {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::ForkReference(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})