Compare commits

...

1 Commits

Author SHA1 Message Date
Friel
def3e33ffe feat(rollout): preserve fork references across replay
Preserve fork-reference replay behavior on the current origin/main base and collapse the branch back to a single commit for easier future restacks.
2026-03-14 13:31:40 -07:00
20 changed files with 2301 additions and 68 deletions

View File

@@ -184,6 +184,7 @@ impl ThreadHistoryBuilder {
RolloutItem::Compacted(payload) => self.handle_compacted(payload),
RolloutItem::TurnContext(_)
| RolloutItem::SessionMeta(_)
| RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_) => {}
}
}

View File

@@ -233,6 +233,7 @@ use codex_core::plugins::PluginUninstallError as CorePluginUninstallError;
use codex_core::plugins::load_plugin_apps;
use codex_core::read_head_for_summary;
use codex_core::read_session_meta_line;
use codex_core::resolve_fork_reference_rollout_path;
use codex_core::rollout_date_parts;
use codex_core::sandboxing::SandboxPermissions;
use codex_core::skills::remote::export_remote_skill;
@@ -7923,13 +7924,17 @@ pub(crate) async fn read_summary_from_rollout(
.unwrap_or_else(|| fallback_provider.to_string());
let git_info = git.as_ref().map(map_git_info);
let updated_at = updated_at.or_else(|| timestamp.clone());
let preview = read_rollout_items_from_rollout(path)
.await
.map(|items| preview_from_rollout_items(&items))
.unwrap_or_default();
Ok(ConversationSummary {
conversation_id: session_meta.id,
timestamp,
updated_at,
path: path.to_path_buf(),
preview: String::new(),
preview,
model_provider,
cwd: session_meta.cwd,
cli_version: session_meta.cli_version,
@@ -7947,7 +7952,7 @@ pub(crate) async fn read_rollout_items_from_rollout(
InitialHistory::Resumed(resumed) => resumed.history,
};
Ok(items)
Ok(materialize_rollout_items_for_replay(codex_home_from_rollout_path(path), &items).await)
}
fn extract_conversation_summary(
@@ -8048,6 +8053,137 @@ fn preview_from_rollout_items(items: &[RolloutItem]) -> String {
.unwrap_or_default()
}
fn user_message_positions_in_rollout(items: &[RolloutItem]) -> Vec<usize> {
let mut user_positions = Vec::new();
for (idx, item) in items.iter().enumerate() {
match item {
RolloutItem::ResponseItem(item)
if matches!(
codex_core::parse_turn_item(item),
Some(TurnItem::UserMessage(_))
) =>
{
user_positions.push(idx);
}
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
let num_turns = usize::try_from(rollback.num_turns).unwrap_or(usize::MAX);
let new_len = user_positions.len().saturating_sub(num_turns);
user_positions.truncate(new_len);
}
RolloutItem::ResponseItem(_) => {}
RolloutItem::SessionMeta(_)
| RolloutItem::ForkReference(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => {}
}
}
user_positions
}
fn truncate_rollout_before_nth_user_message_from_start(
items: &[RolloutItem],
n_from_start: usize,
) -> Vec<RolloutItem> {
if n_from_start == usize::MAX {
return items.to_vec();
}
let user_positions = user_message_positions_in_rollout(items);
if user_positions.len() <= n_from_start {
return Vec::new();
}
let cut_idx = user_positions[n_from_start];
items[..cut_idx].to_vec()
}
fn codex_home_from_rollout_path(path: &Path) -> Option<&Path> {
path.ancestors().find_map(|ancestor| {
let name = ancestor.file_name().and_then(OsStr::to_str)?;
if name == codex_core::SESSIONS_SUBDIR || name == codex_core::ARCHIVED_SESSIONS_SUBDIR {
ancestor.parent()
} else {
None
}
})
}
async fn materialize_rollout_items_for_replay(
codex_home: Option<&Path>,
rollout_items: &[RolloutItem],
) -> Vec<RolloutItem> {
const MAX_FORK_REFERENCE_DEPTH: usize = 8;
let mut materialized = Vec::new();
let mut stack: Vec<(Vec<RolloutItem>, usize, usize)> = vec![(rollout_items.to_vec(), 0, 0)];
while let Some((items, mut idx, depth)) = stack.pop() {
while idx < items.len() {
match &items[idx] {
RolloutItem::ForkReference(reference) => {
if depth >= MAX_FORK_REFERENCE_DEPTH {
warn!(
"skipping fork reference recursion at depth {} for {:?}",
depth, reference.rollout_path
);
idx += 1;
continue;
}
let resolved_rollout_path = if let Some(codex_home) = codex_home {
match resolve_fork_reference_rollout_path(
codex_home,
&reference.rollout_path,
)
.await
{
Ok(path) => path,
Err(err) => {
warn!(
"failed to resolve fork reference rollout {:?}: {err}",
reference.rollout_path
);
idx += 1;
continue;
}
}
} else {
reference.rollout_path.clone()
};
let parent_history = match RolloutRecorder::get_rollout_history(
&resolved_rollout_path,
)
.await
{
Ok(history) => history,
Err(err) => {
warn!(
"failed to load fork reference rollout {:?} (resolved from {:?}): {err}",
resolved_rollout_path, reference.rollout_path
);
idx += 1;
continue;
}
};
let parent_items = truncate_rollout_before_nth_user_message_from_start(
&parent_history.get_rollout_items(),
reference.nth_user_message,
);
stack.push((items, idx + 1, depth));
stack.push((parent_items, 0, depth + 1));
break;
}
item => materialized.push(item.clone()),
}
idx += 1;
}
}
materialized
}
fn with_thread_spawn_agent_metadata(
source: codex_protocol::protocol::SessionSource,
agent_nickname: Option<String>,

View File

@@ -7,6 +7,10 @@ use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
@@ -20,6 +24,8 @@ use codex_app_server_protocol::ThreadSetNameResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::ThreadUnarchiveResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
@@ -152,6 +158,150 @@ async fn thread_read_can_include_turns() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_read_include_turns_keeps_fork_history_after_parent_archive_and_unarchive()
-> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread: parent, .. } =
to_response::<ThreadStartResponse>(start_resp)?;
let turn_start_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: parent.id.clone(),
input: vec![UserInput::Text {
text: "parent message".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_start_id)),
)
.await??;
let _: TurnStartResponse = to_response::<TurnStartResponse>(turn_start_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let fork_id = mcp
.send_thread_fork_request(ThreadForkParams {
thread_id: parent.id.clone(),
..Default::default()
})
.await?;
let fork_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(fork_id)),
)
.await??;
let ThreadForkResponse { thread: child, .. } = to_response::<ThreadForkResponse>(fork_resp)?;
let read_child_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: child.id.clone(),
include_turns: true,
})
.await?;
let read_child_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_child_id)),
)
.await??;
let ThreadReadResponse {
thread: child_before_archive,
} = to_response::<ThreadReadResponse>(read_child_resp)?;
assert_eq!(child_before_archive.turns.len(), 1);
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
thread_id: parent.id.clone(),
})
.await?;
let archive_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
)
.await??;
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/archived"),
)
.await??;
let read_child_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: child.id.clone(),
include_turns: true,
})
.await?;
let read_child_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_child_id)),
)
.await??;
let ThreadReadResponse {
thread: child_after_archive,
} = to_response::<ThreadReadResponse>(read_child_resp)?;
assert_eq!(child_after_archive.turns, child_before_archive.turns);
let unarchive_id = mcp
.send_thread_unarchive_request(ThreadUnarchiveParams {
thread_id: parent.id,
})
.await?;
let unarchive_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(unarchive_id)),
)
.await??;
let _: ThreadUnarchiveResponse = to_response::<ThreadUnarchiveResponse>(unarchive_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/unarchived"),
)
.await??;
let read_child_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: child.id,
include_turns: true,
})
.await?;
let read_child_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_child_id)),
)
.await??;
let ThreadReadResponse {
thread: child_after_unarchive,
} = to_response::<ThreadReadResponse>(read_child_resp)?;
assert_eq!(child_after_unarchive.turns, child_before_archive.turns);
Ok(())
}
#[tokio::test]
async fn thread_read_loaded_thread_returns_precomputed_path_before_materialization() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;

File diff suppressed because it is too large Load Diff

View File

@@ -2054,6 +2054,11 @@ impl Session {
state.clear_connector_selection();
}
async fn set_connector_selection(&self, connector_ids: HashSet<String>) {
self.clear_connector_selection().await;
self.merge_connector_selection(connector_ids).await;
}
async fn record_initial_history(&self, conversation_history: InitialHistory) {
let turn_context = self.new_default_turn().await;
let is_subagent = {
@@ -2071,8 +2076,19 @@ impl Session {
}
InitialHistory::Resumed(resumed_history) => {
let rollout_items = resumed_history.history;
let hydrated_rollout_items = if rollout_items
.iter()
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
{
self.materialize_rollout_items_for_replay(&rollout_items)
.await
} else {
rollout_items.clone()
};
let restored_connector_selection =
Self::extract_connector_selection_from_rollout(&hydrated_rollout_items);
let previous_turn_settings = self
.apply_rollout_reconstruction(&turn_context, &rollout_items)
.apply_rollout_reconstruction(&turn_context, &hydrated_rollout_items)
.await;
// If resuming, warn when the last recorded model differs from the current one.
@@ -2097,10 +2113,13 @@ impl Session {
// Seed usage info from the recorded rollout so UIs can show token counts
// immediately on resume/fork.
if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) {
if let Some(info) = Self::last_token_info_from_rollout(&hydrated_rollout_items) {
let mut state = self.state.lock().await;
state.set_token_info(Some(info));
}
if let Some(selected_connectors) = restored_connector_selection {
self.set_connector_selection(selected_connectors).await;
}
// Defer seeding the session's initial context until the first turn starts so
// turn/start overrides can be merged before we write to the rollout.
@@ -2109,18 +2128,40 @@ impl Session {
}
}
InitialHistory::Forked(rollout_items) => {
self.apply_rollout_reconstruction(&turn_context, &rollout_items)
let persisted_rollout_items = rollout_items
.iter()
.position(|item| matches!(item, RolloutItem::ForkReference(_)))
.map(|index| rollout_items[index..].to_vec());
let hydrated_rollout_items = if rollout_items
.iter()
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
{
self.materialize_rollout_items_for_replay(&rollout_items)
.await
} else {
rollout_items.clone()
};
let restored_connector_selection =
Self::extract_connector_selection_from_rollout(&hydrated_rollout_items);
self.apply_rollout_reconstruction(&turn_context, &hydrated_rollout_items)
.await;
// Seed usage info from the recorded rollout so UIs can show token counts
// immediately on resume/fork.
if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) {
if let Some(info) = Self::last_token_info_from_rollout(&hydrated_rollout_items) {
let mut state = self.state.lock().await;
state.set_token_info(Some(info));
}
if let Some(selected_connectors) = restored_connector_selection {
self.set_connector_selection(selected_connectors).await;
}
// If persisting, persist all rollout items as-is (recorder filters)
if !rollout_items.is_empty() {
// Persist only the compact fork reference suffix so child rollouts do not
// duplicate the full parent history they inherited in memory.
if let Some(persisted_rollout_items) = persisted_rollout_items {
self.persist_rollout_items(&persisted_rollout_items).await;
} else if !rollout_items.is_empty() {
self.persist_rollout_items(&rollout_items).await;
}
@@ -2170,6 +2211,41 @@ impl Session {
})
}
fn extract_connector_selection_from_rollout(
rollout_items: &[RolloutItem],
) -> Option<HashSet<String>> {
let mut active_selected_connectors: Option<HashSet<String>> = None;
for item in rollout_items {
let RolloutItem::ResponseItem(response_item) = item else {
continue;
};
let ResponseItem::FunctionCallOutput { output, .. } = response_item else {
continue;
};
let Some(content) = output.body.to_text() else {
continue;
};
let Ok(payload) = serde_json::from_str::<Value>(&content) else {
continue;
};
let Some(selected_connectors) = payload
.get("active_selected_tools")
.and_then(Value::as_array)
else {
continue;
};
let connector_ids = selected_connectors
.iter()
.filter_map(Value::as_str)
.map(ToOwned::to_owned)
.collect::<HashSet<_>>();
active_selected_connectors = Some(connector_ids);
}
active_selected_connectors
}
async fn previous_turn_settings(&self) -> Option<PreviousTurnSettings> {
let state = self.state.lock().await;
state.previous_turn_settings()

View File

@@ -83,11 +83,34 @@ fn finalize_active_segment<'a>(
}
impl Session {
pub(super) async fn materialize_rollout_items_for_replay(
&self,
rollout_items: &[RolloutItem],
) -> Vec<RolloutItem> {
let codex_home = {
self.state
.lock()
.await
.session_configuration
.codex_home
.clone()
};
crate::rollout::truncation::materialize_rollout_items_for_replay(
codex_home.as_path(),
rollout_items,
)
.await
}
pub(super) async fn reconstruct_history_from_rollout(
&self,
turn_context: &TurnContext,
rollout_items: &[RolloutItem],
) -> RolloutReconstruction {
let rollout_items = self
.materialize_rollout_items_for_replay(rollout_items)
.await;
let rollout_items = rollout_items.as_slice();
// Replay metadata should already match the shape of the future lazy reverse loader, even
// while history materialization still uses an eager bridge. Scan newest-to-oldest,
// stopping once a surviving replacement-history checkpoint and the required resume metadata
@@ -203,6 +226,7 @@ impl Session {
}
RolloutItem::ResponseItem(_)
| RolloutItem::EventMsg(_)
| RolloutItem::ForkReference(_)
| RolloutItem::SessionMeta(_) => {}
}
@@ -271,6 +295,7 @@ impl Session {
history.drop_last_n_user_turns(rollback.num_turns);
}
RolloutItem::EventMsg(_)
| RolloutItem::ForkReference(_)
| RolloutItem::TurnContext(_)
| RolloutItem::SessionMeta(_) => {}
}

View File

@@ -6,8 +6,16 @@ use crate::protocol::ResumedHistory;
use codex_protocol::ThreadId;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::ForkReferenceItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use pretty_assertions::assert_eq;
use std::path::Path;
use std::path::PathBuf;
use tempfile::TempDir;
fn user_message(text: &str) -> ResponseItem {
ResponseItem::Message {
@@ -33,6 +41,52 @@ fn assistant_message(text: &str) -> ResponseItem {
}
}
fn write_rollout_items(
root: &Path,
thread_id: ThreadId,
items: &[RolloutItem],
) -> std::io::Result<PathBuf> {
let rollout_dir = root
.join(crate::SESSIONS_SUBDIR)
.join("2026")
.join("03")
.join("05");
std::fs::create_dir_all(&rollout_dir)?;
let rollout_path = rollout_dir.join(format!("rollout-2026-03-05T00-00-00-{thread_id}.jsonl"));
let session_meta_line = RolloutLine {
timestamp: "2026-03-05T00:00:00Z".to_string(),
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
id: thread_id,
timestamp: "2026-03-05T00:00:00Z".to_string(),
cwd: root.to_path_buf(),
originator: "codex".to_string(),
cli_version: "test".to_string(),
source: SessionSource::Exec,
agent_nickname: None,
agent_role: None,
model_provider: Some("openai".to_string()),
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
forked_from_id: None,
},
git: None,
}),
};
let mut text = format!("{}\n", serde_json::to_string(&session_meta_line).unwrap());
for item in items {
let line = RolloutLine {
timestamp: "2026-03-05T00:00:01Z".to_string(),
item: item.clone(),
};
text.push_str(&serde_json::to_string(&line).unwrap());
text.push('\n');
}
std::fs::write(&rollout_path, text)?;
Ok(rollout_path)
}
#[tokio::test]
async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previous_turn_settings()
{
@@ -72,6 +126,129 @@ async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previ
assert!(session.reference_context_item().await.is_none());
}
#[tokio::test]
async fn reconstruct_history_materializes_fork_reference_rollout_items() {
let (session, turn_context) = make_session_and_context().await;
let dir = TempDir::new().expect("create temp dir");
let parent_thread_id = ThreadId::new();
let parent_rollout_path = write_rollout_items(
dir.path(),
parent_thread_id,
&[
RolloutItem::ResponseItem(user_message("first user")),
RolloutItem::ResponseItem(assistant_message("first reply")),
RolloutItem::ResponseItem(user_message("second user")),
RolloutItem::ResponseItem(assistant_message("second reply")),
],
)
.expect("write parent rollout");
let rollout_items = vec![RolloutItem::ForkReference(ForkReferenceItem {
rollout_path: parent_rollout_path,
nth_user_message: 1,
})];
let reconstructed = session
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
.await;
assert_eq!(
reconstructed.history,
vec![user_message("first user"), assistant_message("first reply")]
);
}
#[tokio::test]
async fn record_initial_history_forked_materializes_fork_reference_rollout_items() {
let (session, turn_context) = make_session_and_context().await;
let codex_home = turn_context.config.codex_home.clone();
let parent_thread_id = ThreadId::new();
let parent_rollout_path = write_rollout_items(
codex_home.as_path(),
parent_thread_id,
&[
RolloutItem::ResponseItem(user_message("first user")),
RolloutItem::ResponseItem(assistant_message("first reply")),
RolloutItem::ResponseItem(user_message("second user")),
RolloutItem::ResponseItem(assistant_message("second reply")),
],
)
.expect("write parent rollout");
let rollout_items = vec![RolloutItem::ForkReference(ForkReferenceItem {
rollout_path: parent_rollout_path,
nth_user_message: 1,
})];
session
.record_initial_history(InitialHistory::Forked(rollout_items))
.await;
let reconstruction_turn = session.new_default_turn().await;
let mut expected = vec![user_message("first user"), assistant_message("first reply")];
expected.extend(
session
.build_initial_context(reconstruction_turn.as_ref())
.await,
);
let history = session.state.lock().await.clone_history();
assert_eq!(expected, history.raw_items());
}
#[tokio::test]
async fn reconstruct_history_resolves_fork_reference_after_parent_archive_and_unarchive() {
let (session, turn_context) = make_session_and_context().await;
let codex_home = turn_context.config.codex_home.clone();
let parent_thread_id = ThreadId::new();
let parent_rollout_path = write_rollout_items(
codex_home.as_path(),
parent_thread_id,
&[
RolloutItem::ResponseItem(user_message("first user")),
RolloutItem::ResponseItem(assistant_message("first reply")),
RolloutItem::ResponseItem(user_message("second user")),
RolloutItem::ResponseItem(assistant_message("second reply")),
],
)
.expect("write parent rollout");
let rollout_items = vec![RolloutItem::ForkReference(ForkReferenceItem {
rollout_path: parent_rollout_path.clone(),
nth_user_message: 1,
})];
let expected_history = vec![user_message("first user"), assistant_message("first reply")];
let archived_rollout_dir = codex_home
.join(crate::ARCHIVED_SESSIONS_SUBDIR)
.join("2026")
.join("03")
.join("05");
std::fs::create_dir_all(&archived_rollout_dir).expect("create archived rollout dir");
let archived_rollout_path = archived_rollout_dir.join(
parent_rollout_path
.file_name()
.expect("parent rollout file name"),
);
std::fs::rename(&parent_rollout_path, &archived_rollout_path).expect("archive parent rollout");
let reconstructed = session
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
.await;
assert_eq!(reconstructed.history, expected_history);
let unarchived_rollout_dir = codex_home
.join(crate::SESSIONS_SUBDIR)
.join("2026")
.join("03")
.join("05");
std::fs::create_dir_all(&unarchived_rollout_dir).expect("create unarchived rollout dir");
std::fs::rename(&archived_rollout_path, &parent_rollout_path)
.expect("unarchive parent rollout");
let reconstructed = session
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
.await;
assert_eq!(reconstructed.history, expected_history);
}
#[tokio::test]
async fn record_initial_history_resumed_hydrates_previous_turn_settings_from_lifecycle_turn_with_missing_turn_context_id()
{

View File

@@ -141,6 +141,7 @@ pub use rollout::list::parse_cursor;
pub use rollout::list::read_head_for_summary;
pub use rollout::list::read_session_meta_line;
pub use rollout::policy::EventPersistenceMode;
pub use rollout::resolve_fork_reference_rollout_path;
pub use rollout::rollout_date_parts;
pub use rollout::session_index::find_thread_names_by_ids;
mod function_tool;

View File

@@ -1061,6 +1061,9 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
RolloutItem::Compacted(_) => {
// Not included in `head`; skip.
}
RolloutItem::ForkReference(_) => {
// Not included in `head`; skip.
}
RolloutItem::EventMsg(ev) => {
if let EventMsg::UserMessage(user) = ev {
summary.saw_user_event = true;
@@ -1112,6 +1115,7 @@ pub async fn read_head_for_summary(path: &Path) -> io::Result<Vec<serde_json::Va
head.push(value);
}
}
RolloutItem::ForkReference(_) => {}
RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => {}
@@ -1262,6 +1266,40 @@ pub async fn find_archived_thread_path_by_id_str(
find_thread_path_by_id_str_in_subdir(codex_home, ARCHIVED_SESSIONS_SUBDIR, id_str).await
}
/// Resolve a stored fork-reference rollout path to the current on-disk location.
///
/// Fork references persist a parent rollout filename. Archive and unarchive move that file
/// between `sessions/` and `archived_sessions/`, so stale stored paths must be repaired by
/// locating the rollout with the stable thread id embedded in the filename.
pub async fn resolve_fork_reference_rollout_path(
codex_home: &Path,
rollout_path: &Path,
) -> io::Result<PathBuf> {
match tokio::fs::try_exists(rollout_path).await {
Ok(true) => return Ok(rollout_path.to_path_buf()),
Ok(false) => {}
Err(err) => return Err(err),
}
let Some(file_name) = rollout_path.file_name().and_then(OsStr::to_str) else {
return Ok(rollout_path.to_path_buf());
};
let Some((_, thread_uuid)) = parse_timestamp_uuid_from_filename(file_name) else {
return Ok(rollout_path.to_path_buf());
};
let thread_id = thread_uuid.to_string();
if let Some(active_path) = find_thread_path_by_id_str(codex_home, &thread_id).await? {
return Ok(active_path);
}
if let Some(archived_path) = find_archived_thread_path_by_id_str(codex_home, &thread_id).await?
{
return Ok(archived_path);
}
Ok(rollout_path.to_path_buf())
}
/// Extract the `YYYY/MM/DD` directory components from a rollout filename.
pub fn rollout_date_parts(file_name: &OsStr) -> Option<(String, String, String)> {
let name = file_name.to_string_lossy();

View File

@@ -67,7 +67,8 @@ pub(crate) fn builder_from_items(
) -> Option<ThreadMetadataBuilder> {
if let Some(session_meta) = items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line),
RolloutItem::ResponseItem(_)
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
@@ -123,6 +124,7 @@ pub(crate) async fn extract_metadata_from_rollout(
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::ForkReference(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
}),

View File

@@ -21,6 +21,7 @@ pub use list::find_archived_thread_path_by_id_str;
pub use list::find_thread_path_by_id_str;
#[deprecated(note = "use find_thread_path_by_id_str")]
pub use list::find_thread_path_by_id_str as find_conversation_path_by_id_str;
pub use list::resolve_fork_reference_rollout_path;
pub use list::rollout_date_parts;
pub use recorder::RolloutRecorder;
pub use recorder::RolloutRecorderParams;

View File

@@ -17,9 +17,10 @@ pub(crate) fn is_persisted_response_item(item: &RolloutItem, mode: EventPersiste
RolloutItem::ResponseItem(item) => should_persist_response_item(item),
RolloutItem::EventMsg(ev) => should_persist_event_msg(ev, mode),
// Persist Codex executive markers so we can analyze flows (e.g., compaction, API turns).
RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {
true
}
RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::SessionMeta(_)
| RolloutItem::ForkReference(_) => true,
}
}

View File

@@ -566,6 +566,9 @@ impl RolloutRecorder {
RolloutItem::ResponseItem(item) => {
items.push(RolloutItem::ResponseItem(item));
}
RolloutItem::ForkReference(item) => {
items.push(RolloutItem::ForkReference(item));
}
RolloutItem::Compacted(item) => {
items.push(RolloutItem::Compacted(item));
}
@@ -592,6 +595,10 @@ impl RolloutRecorder {
Ok((items, thread_id, parse_errors))
}
/// Load a rollout for resume semantics.
///
/// This preserves the rollout's existing conversation id and rollout path, so callers must
/// not use it for true forking semantics.
pub async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
let (items, thread_id, _parse_errors) = Self::load_rollout_items(path).await?;
let conversation_id = thread_id
@@ -609,6 +616,21 @@ impl RolloutRecorder {
}))
}
/// Load a rollout for true fork semantics.
///
/// Unlike `get_rollout_history`, this intentionally discards the source rollout's
/// conversation id so the child thread gets a fresh id and preserves `forked_from_id`.
pub async fn get_fork_history(path: &Path) -> std::io::Result<InitialHistory> {
let (items, _thread_id, _parse_errors) = Self::load_rollout_items(path).await?;
if items.is_empty() {
return Ok(InitialHistory::New);
}
info!("Loaded rollout fork history from {path:?}");
Ok(InitialHistory::Forked(items))
}
pub async fn shutdown(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {
@@ -1053,6 +1075,7 @@ async fn resume_candidate_matches_cwd(
&& let Some(latest_turn_context_cwd) = items.iter().rev().find_map(|item| match item {
RolloutItem::TurnContext(turn_context) => Some(turn_context.cwd.as_path()),
RolloutItem::SessionMeta(_)
| RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::EventMsg(_) => None,

View File

@@ -4,10 +4,14 @@
//! interpreting them via `event_mapping::parse_turn_item(...)`.
use crate::event_mapping;
use crate::resolve_fork_reference_rollout_path;
use crate::rollout::RolloutRecorder;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
use std::path::Path;
use tracing::warn;
/// Return the indices of user message boundaries in a rollout.
///
@@ -68,6 +72,77 @@ pub(crate) fn truncate_rollout_before_nth_user_message_from_start(
items[..cut_idx].to_vec()
}
pub(crate) async fn materialize_rollout_items_for_replay(
codex_home: &Path,
rollout_items: &[RolloutItem],
) -> Vec<RolloutItem> {
const MAX_FORK_REFERENCE_DEPTH: usize = 8;
let mut materialized = Vec::new();
let mut stack: Vec<(Vec<RolloutItem>, usize, usize)> = vec![(rollout_items.to_vec(), 0, 0)];
while let Some((items, mut idx, depth)) = stack.pop() {
while idx < items.len() {
match &items[idx] {
RolloutItem::ForkReference(reference) => {
if depth >= MAX_FORK_REFERENCE_DEPTH {
warn!(
"skipping fork reference recursion at depth {} for {:?}",
depth, reference.rollout_path
);
idx += 1;
continue;
}
let resolved_rollout_path = match resolve_fork_reference_rollout_path(
codex_home,
&reference.rollout_path,
)
.await
{
Ok(path) => path,
Err(err) => {
warn!(
"failed to resolve fork reference rollout {:?}: {err}",
reference.rollout_path
);
idx += 1;
continue;
}
};
let parent_history = match RolloutRecorder::get_rollout_history(
&resolved_rollout_path,
)
.await
{
Ok(history) => history,
Err(err) => {
warn!(
"failed to load fork reference rollout {:?} (resolved from {:?}): {err}",
resolved_rollout_path, reference.rollout_path
);
idx += 1;
continue;
}
};
let parent_items = truncate_rollout_before_nth_user_message_from_start(
&parent_history.get_rollout_items(),
reference.nth_user_message,
);
stack.push((items, idx + 1, depth));
stack.push((parent_items, 0, depth + 1));
break;
}
item => materialized.push(item.clone()),
}
idx += 1;
}
}
materialized
}
#[cfg(test)]
#[path = "truncation_tests.rs"]
mod tests;

View File

@@ -27,6 +27,7 @@ use crate::skills::SkillsManager;
use codex_protocol::ThreadId;
use codex_protocol::config_types::CollaborationModeMask;
use codex_protocol::openai_models::ModelPreset;
use codex_protocol::protocol::ForkReferenceItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::McpServerRefreshConfig;
use codex_protocol::protocol::Op;
@@ -36,6 +37,7 @@ use codex_protocol::protocol::W3cTraceContext;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
@@ -489,8 +491,39 @@ impl ThreadManager {
persist_extended_history: bool,
parent_trace: Option<W3cTraceContext>,
) -> CodexResult<NewThread> {
let history = RolloutRecorder::get_rollout_history(&path).await?;
let history = truncate_before_nth_user_message(history, nth_user_message);
// True forks must discard the source rollout's conversation id so the child gets a
// distinct thread id and preserves `forked_from_id` in its SessionMeta. Using the
// resume loader here silently turns a fork into an in-place resume.
let history = RolloutRecorder::get_fork_history(&path).await?;
let mut history = truncate_before_nth_user_message(
config.codex_home.as_path(),
history,
nth_user_message,
)
.await;
if let InitialHistory::Forked(items) = &mut history {
let source_session_meta = items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line.clone()),
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
});
// Keep the source SessionMeta in-memory so startup can derive `forked_from_id`
// for SessionConfigured while still persisting only the compact ForkReference
// suffix to the child rollout on disk.
*items = source_session_meta
.into_iter()
.map(RolloutItem::SessionMeta)
.chain(std::iter::once(RolloutItem::ForkReference(
ForkReferenceItem {
rollout_path: path.clone(),
nth_user_message,
},
)))
.collect();
}
Box::pin(self.state.spawn_thread(
config,
history,
@@ -745,8 +778,18 @@ impl ThreadManagerState {
/// Return a prefix of `items` obtained by cutting strictly before the nth user message
/// (0-based) and all items that follow it.
fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> InitialHistory {
let items: Vec<RolloutItem> = history.get_rollout_items();
async fn truncate_before_nth_user_message(
codex_home: &Path,
history: InitialHistory,
n: usize,
) -> InitialHistory {
let mut items: Vec<RolloutItem> = history.get_rollout_items();
if items
.iter()
.any(|item| matches!(item, RolloutItem::ForkReference(_)))
{
items = truncation::materialize_rollout_items_for_replay(codex_home, &items).await;
}
let rolled = truncation::truncate_rollout_before_nth_user_message_from_start(&items, n);
if rolled.is_empty() {
@@ -759,3 +802,131 @@ fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> Initia
#[cfg(test)]
#[path = "thread_manager_tests.rs"]
mod tests;
#[cfg(test)]
mod fork_reference_tests {
use super::*;
use crate::codex::make_session_and_context;
use assert_matches::assert_matches;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemReasoningSummary;
use codex_protocol::models::ResponseItem;
use pretty_assertions::assert_eq;
fn user_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::OutputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
fn assistant_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
#[tokio::test]
async fn drops_from_last_user_only() {
let items = [
user_msg("u1"),
assistant_msg("a1"),
assistant_msg("a2"),
user_msg("u2"),
assistant_msg("a3"),
ResponseItem::Reasoning {
id: "r1".to_string(),
summary: vec![ReasoningItemReasoningSummary::SummaryText {
text: "s".to_string(),
}],
content: None,
encrypted_content: None,
},
ResponseItem::FunctionCall {
id: None,
call_id: "c1".to_string(),
name: "tool".to_string(),
namespace: None,
arguments: "{}".to_string(),
},
assistant_msg("a4"),
];
let initial: Vec<RolloutItem> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.collect();
let truncated =
truncate_before_nth_user_message(Path::new("/tmp"), InitialHistory::Forked(initial), 1)
.await;
let got_items = truncated.get_rollout_items();
let expected_items = vec![
RolloutItem::ResponseItem(items[0].clone()),
RolloutItem::ResponseItem(items[1].clone()),
RolloutItem::ResponseItem(items[2].clone()),
];
assert_eq!(
serde_json::to_value(&got_items).unwrap(),
serde_json::to_value(&expected_items).unwrap()
);
let initial2: Vec<RolloutItem> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.collect();
let truncated2 = truncate_before_nth_user_message(
Path::new("/tmp"),
InitialHistory::Forked(initial2),
2,
)
.await;
assert_matches!(truncated2, InitialHistory::New);
}
#[tokio::test]
async fn ignores_session_prefix_messages_when_truncating() {
let (session, turn_context) = make_session_and_context().await;
let mut items = session.build_initial_context(&turn_context).await;
items.push(user_msg("feature request"));
items.push(assistant_msg("ack"));
items.push(user_msg("second question"));
items.push(assistant_msg("answer"));
let rollout_items: Vec<RolloutItem> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.collect();
let truncated = truncate_before_nth_user_message(
Path::new("/tmp"),
InitialHistory::Forked(rollout_items),
1,
)
.await;
let got_items = truncated.get_rollout_items();
let expected: Vec<RolloutItem> = vec![
RolloutItem::ResponseItem(items[0].clone()),
RolloutItem::ResponseItem(items[1].clone()),
RolloutItem::ResponseItem(items[2].clone()),
RolloutItem::ResponseItem(items[3].clone()),
];
assert_eq!(
serde_json::to_value(&got_items).unwrap(),
serde_json::to_value(&expected).unwrap()
);
}
}

View File

@@ -10,6 +10,7 @@ use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ModelsResponse;
use core_test_support::responses::mount_models_once;
use pretty_assertions::assert_eq;
use std::path::Path;
use std::time::Duration;
use tempfile::tempdir;
use wiremock::MockServer;
@@ -37,8 +38,8 @@ fn assistant_msg(text: &str) -> ResponseItem {
}
}
#[test]
fn drops_from_last_user_only() {
#[tokio::test]
async fn drops_from_last_user_only() {
let items = [
user_msg("u1"),
assistant_msg("a1"),
@@ -68,7 +69,9 @@ fn drops_from_last_user_only() {
.cloned()
.map(RolloutItem::ResponseItem)
.collect();
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(initial), 1);
let truncated =
truncate_before_nth_user_message(Path::new("/tmp"), InitialHistory::Forked(initial), 1)
.await;
let got_items = truncated.get_rollout_items();
let expected_items = vec![
RolloutItem::ResponseItem(items[0].clone()),
@@ -85,7 +88,9 @@ fn drops_from_last_user_only() {
.cloned()
.map(RolloutItem::ResponseItem)
.collect();
let truncated2 = truncate_before_nth_user_message(InitialHistory::Forked(initial2), 2);
let truncated2 =
truncate_before_nth_user_message(Path::new("/tmp"), InitialHistory::Forked(initial2), 2)
.await;
assert_matches!(truncated2, InitialHistory::New);
}
@@ -104,7 +109,12 @@ async fn ignores_session_prefix_messages_when_truncating() {
.map(RolloutItem::ResponseItem)
.collect();
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(rollout_items), 1);
let truncated = truncate_before_nth_user_message(
Path::new("/tmp"),
InitialHistory::Forked(rollout_items),
1,
)
.await;
let got_items = truncated.get_rollout_items();
let expected: Vec<RolloutItem> = vec![

View File

@@ -18,6 +18,59 @@ use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
fn find_user_input_positions(items: &[RolloutItem]) -> Vec<usize> {
let mut pos = Vec::new();
for (i, it) in items.iter().enumerate() {
if let RolloutItem::ResponseItem(response_item) = it
&& let Some(TurnItem::UserMessage(_)) = parse_turn_item(response_item)
{
pos.push(i);
}
}
pos
}
fn truncate_before_nth_user_message(
items: &[RolloutItem],
nth_user_message: usize,
) -> Vec<RolloutItem> {
if nth_user_message == usize::MAX {
return items.to_vec();
}
let user_inputs = find_user_input_positions(items);
let Some(cut_idx) = user_inputs.get(nth_user_message).copied() else {
return Vec::new();
};
items[..cut_idx].to_vec()
}
fn read_items_materialized(p: &std::path::Path) -> Vec<RolloutItem> {
let text =
std::fs::read_to_string(p).unwrap_or_else(|err| panic!("read rollout file {p:?}: {err}"));
let mut items: Vec<RolloutItem> = Vec::new();
for line in text.lines() {
if line.trim().is_empty() {
continue;
}
let v: serde_json::Value =
serde_json::from_str(line).unwrap_or_else(|err| panic!("jsonl line parse: {err}"));
let rl: RolloutLine =
serde_json::from_value(v).unwrap_or_else(|err| panic!("rollout line parse: {err}"));
match rl.item {
RolloutItem::SessionMeta(_) => {}
RolloutItem::ForkReference(reference) => {
let parent_items = read_items_materialized(&reference.rollout_path);
items.extend(truncate_before_nth_user_message(
&parent_items,
reference.nth_user_message,
));
}
other => items.push(other),
}
}
items
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fork_thread_twice_drops_to_first_message() {
skip_if_no_network!();
@@ -63,40 +116,9 @@ async fn fork_thread_twice_drops_to_first_message() {
// GetHistory flushes before returning the path; no wait needed.
// Helper: read rollout items (excluding SessionMeta) from a JSONL path.
let read_items = |p: &std::path::Path| -> Vec<RolloutItem> {
let text = std::fs::read_to_string(p).expect("read rollout file");
let mut items: Vec<RolloutItem> = Vec::new();
for line in text.lines() {
if line.trim().is_empty() {
continue;
}
let v: serde_json::Value = serde_json::from_str(line).expect("jsonl line");
let rl: RolloutLine = serde_json::from_value(v).expect("rollout line");
match rl.item {
RolloutItem::SessionMeta(_) => {}
other => items.push(other),
}
}
items
};
// Compute expected prefixes after each fork by truncating base rollout
// strictly before the nth user input (0-based).
let base_items = read_items(&base_path);
let find_user_input_positions = |items: &[RolloutItem]| -> Vec<usize> {
let mut pos = Vec::new();
for (i, it) in items.iter().enumerate() {
if let RolloutItem::ResponseItem(response_item) = it
&& let Some(TurnItem::UserMessage(_)) = parse_turn_item(response_item)
{
// Consider any user message as an input boundary; recorder stores both EventMsg and ResponseItem.
// We specifically look for input items, which are represented as ContentItem::InputText.
pos.push(i);
}
}
pos
};
let base_items = read_items_materialized(&base_path);
let user_inputs = find_user_input_positions(&base_items);
// After cutting at nth user input (n=1 → second user message), cut strictly before that input.
@@ -117,7 +139,7 @@ async fn fork_thread_twice_drops_to_first_message() {
let fork1_path = codex_fork1.rollout_path().expect("rollout path");
// GetHistory on fork1 flushed; the file is ready.
let fork1_items = read_items(&fork1_path);
let fork1_items = read_items_materialized(&fork1_path);
assert!(fork1_items.len() > expected_after_first.len());
pretty_assertions::assert_eq!(
serde_json::to_value(&fork1_items[..expected_after_first.len()]).unwrap(),
@@ -135,17 +157,68 @@ async fn fork_thread_twice_drops_to_first_message() {
let fork2_path = codex_fork2.rollout_path().expect("rollout path");
// GetHistory on fork2 flushed; the file is ready.
let fork1_items = read_items(&fork1_path);
let fork1_items = read_items_materialized(&fork1_path);
let fork1_user_inputs = find_user_input_positions(&fork1_items);
let cut_last_on_fork1 = fork1_user_inputs
.get(fork1_user_inputs.len().saturating_sub(1))
.copied()
.unwrap_or(0);
let expected_after_second: Vec<RolloutItem> = fork1_items[..cut_last_on_fork1].to_vec();
let fork2_items = read_items(&fork2_path);
let fork2_items = read_items_materialized(&fork2_path);
assert!(fork2_items.len() > expected_after_second.len());
pretty_assertions::assert_eq!(
serde_json::to_value(&fork2_items[..expected_after_second.len()]).unwrap(),
serde_json::to_value(&expected_after_second).unwrap()
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fork_thread_session_configured_preserves_parent_and_history() {
skip_if_no_network!();
let server = MockServer::start().await;
let sse = sse(vec![ev_response_created("resp"), ev_completed("resp")]);
let response = ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse, "text/event-stream");
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(response)
.expect(1)
.mount(&server)
.await;
let mut builder = test_codex();
let test = builder.build(&server).await.expect("create conversation");
let codex = test.codex.clone();
let thread_manager = test.thread_manager.clone();
let config_for_fork = test.config.clone();
let parent_thread_id = test.session_configured.session_id;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "seed".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
let _ = wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let base_path = codex.rollout_path().expect("rollout path");
let NewThread {
thread_id: child_thread_id,
session_configured,
..
} = thread_manager
.fork_thread(usize::MAX, config_for_fork, base_path, false, None)
.await
.expect("fork thread");
pretty_assertions::assert_eq!(session_configured.forked_from_id, Some(parent_thread_id));
assert_ne!(child_thread_id, parent_thread_id);
}

View File

@@ -2159,12 +2159,20 @@ impl InitialHistory {
InitialHistory::Resumed(resumed) => {
resumed.history.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.forked_from_id,
_ => None,
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})
}
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.id),
_ => None,
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
}),
}
}
@@ -2194,7 +2202,11 @@ impl InitialHistory {
.iter()
.filter_map(|ri| match ri {
RolloutItem::EventMsg(ev) => Some(ev.clone()),
_ => None,
RolloutItem::SessionMeta(_)
| RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_) => None,
})
.collect(),
),
@@ -2203,7 +2215,11 @@ impl InitialHistory {
.iter()
.filter_map(|ri| match ri {
RolloutItem::EventMsg(ev) => Some(ev.clone()),
_ => None,
RolloutItem::SessionMeta(_)
| RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_) => None,
})
.collect(),
),
@@ -2217,12 +2233,20 @@ impl InitialHistory {
InitialHistory::Resumed(resumed) => {
resumed.history.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.base_instructions.clone(),
_ => None,
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})
}
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.base_instructions.clone(),
_ => None,
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
}),
}
}
@@ -2233,12 +2257,20 @@ impl InitialHistory {
InitialHistory::Resumed(resumed) => {
resumed.history.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.dynamic_tools.clone(),
_ => None,
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})
}
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.dynamic_tools.clone(),
_ => None,
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
}),
}
}
@@ -2247,7 +2279,11 @@ impl InitialHistory {
fn session_cwd_from_items(items: &[RolloutItem]) -> Option<PathBuf> {
items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.cwd.clone()),
_ => None,
RolloutItem::ForkReference(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})
}
@@ -2401,10 +2437,17 @@ pub struct SessionMetaLine {
pub git: Option<GitInfo>,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
pub struct ForkReferenceItem {
pub rollout_path: PathBuf,
pub nth_user_message: usize,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
pub enum RolloutItem {
SessionMeta(SessionMetaLine),
ForkReference(ForkReferenceItem),
ResponseItem(ResponseItem),
Compacted(CompactedItem),
TurnContext(TurnContextItem),

View File

@@ -22,6 +22,7 @@ pub fn apply_rollout_item(
RolloutItem::TurnContext(turn_ctx) => apply_turn_context(metadata, turn_ctx),
RolloutItem::EventMsg(event) => apply_event_msg(metadata, event),
RolloutItem::ResponseItem(item) => apply_response_item(metadata, item),
RolloutItem::ForkReference(_) => {}
RolloutItem::Compacted(_) => {}
}
if metadata.model_provider.is_empty() {
@@ -34,9 +35,10 @@ pub fn rollout_item_affects_thread_metadata(item: &RolloutItem) -> bool {
match item {
RolloutItem::SessionMeta(_) | RolloutItem::TurnContext(_) => true,
RolloutItem::EventMsg(EventMsg::TokenCount(_) | EventMsg::UserMessage(_)) => true,
RolloutItem::EventMsg(_) | RolloutItem::ResponseItem(_) | RolloutItem::Compacted(_) => {
false
}
RolloutItem::EventMsg(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::ForkReference(_)
| RolloutItem::Compacted(_) => false,
}
}

View File

@@ -563,6 +563,7 @@ pub(super) fn extract_dynamic_tools(items: &[RolloutItem]) -> Option<Option<Vec<
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.dynamic_tools.clone()),
RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::ForkReference(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})
@@ -573,6 +574,7 @@ pub(super) fn extract_memory_mode(items: &[RolloutItem]) -> Option<String> {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::ForkReference(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})