Compare commits

...

7 Commits

Author SHA1 Message Date
jif-oai
538f338b42 fix some tests 2026-03-31 19:29:22 +02:00
jif-oai
ceec4cd64e Merge remote-tracking branch 'origin/jif/compressed-rollouts' into jif/compressed-rollouts 2026-03-31 19:09:45 +02:00
jif-oai
dd82f28ea2 nit fix 2026-03-31 19:09:31 +02:00
jif-oai
793dc92ae0 simplify 2026-03-31 18:38:42 +02:00
jif-oai
83eb0ed8b2 Merge branch 'main' into jif/compressed-rollouts 2026-03-31 16:58:54 +02:00
jif-oai
74bd00fc43 nit tests fix 2026-03-31 16:34:26 +02:00
jif-oai
8e6c8d9de9 feat: compress rollout files 2026-03-31 16:17:24 +02:00
27 changed files with 863 additions and 367 deletions

2
codex-rs/Cargo.lock generated
View File

@@ -2478,6 +2478,7 @@ dependencies = [
"tokio",
"tracing",
"uuid",
"zstd",
]
[[package]]
@@ -2712,6 +2713,7 @@ dependencies = [
"which 8.0.0",
"windows-sys 0.52.0",
"winsplit",
"zstd",
]
[[package]]

View File

@@ -232,6 +232,7 @@ use codex_core::plugins::PluginUninstallError as CorePluginUninstallError;
use codex_core::plugins::load_plugin_apps;
use codex_core::plugins::load_plugin_mcp_servers;
use codex_core::read_head_for_summary;
use codex_core::read_latest_turn_context;
use codex_core::read_session_meta_line;
use codex_core::rollout_date_parts;
use codex_core::sandboxing::SandboxPermissions;
@@ -2972,7 +2973,6 @@ impl CodexMessageProcessor {
});
};
let required_suffix = format!("{thread_id}.jsonl");
let Some(file_name) = canonical_rollout_path.file_name().map(OsStr::to_owned) else {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
@@ -2980,9 +2980,7 @@ impl CodexMessageProcessor {
data: None,
});
};
if !file_name
.to_string_lossy()
.ends_with(required_suffix.as_str())
if !rollout_file_name_matches_thread_id(file_name.to_string_lossy().as_ref(), thread_id)
{
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
@@ -5324,7 +5322,6 @@ impl CodexMessageProcessor {
};
// Verify file name matches thread id.
let required_suffix = format!("{thread_id}.jsonl");
let Some(file_name) = canonical_rollout_path.file_name().map(OsStr::to_owned) else {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
@@ -5335,10 +5332,7 @@ impl CodexMessageProcessor {
data: None,
});
};
if !file_name
.to_string_lossy()
.ends_with(required_suffix.as_str())
{
if !rollout_file_name_matches_thread_id(file_name.to_string_lossy().as_ref(), thread_id) {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
@@ -8184,6 +8178,10 @@ async fn read_history_cwd_from_state_db(
thread_id: Option<ThreadId>,
rollout_path: &Path,
) -> Option<PathBuf> {
if let Ok(Some(turn_context)) = read_latest_turn_context(rollout_path).await {
return Some(turn_context.cwd);
}
if let Some(state_db_ctx) = get_state_db(config).await
&& let Some(thread_id) = thread_id
&& let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await
@@ -8274,7 +8272,7 @@ async fn summary_from_thread_list_item(
fn thread_id_from_rollout_path(path: &Path) -> Option<ThreadId> {
let file_name = path.file_name()?.to_str()?;
let stem = file_name.strip_suffix(".jsonl")?;
let stem = codex_core::strip_rollout_file_suffix(file_name)?;
if stem.len() < 37 {
return None;
}
@@ -8285,6 +8283,13 @@ fn thread_id_from_rollout_path(path: &Path) -> Option<ThreadId> {
ThreadId::from_string(&stem[uuid_start..]).ok()
}
/// Returns true when `file_name` is a rollout filename (plain `.jsonl` or
/// compressed `.jsonl.zst`) whose stem ends with `-<thread_id>`.
fn rollout_file_name_matches_thread_id(file_name: &str, thread_id: ThreadId) -> bool {
    // Strip either supported suffix first so compressed rollouts match too.
    let Some(stem) = codex_core::strip_rollout_file_suffix(file_name) else {
        return false;
    };
    // Rollout stems are `rollout-<timestamp>-<thread_id>`; anchoring on the
    // leading '-' avoids matching a thread id that is a suffix of another.
    stem.ends_with(&format!("-{thread_id}"))
}
#[allow(clippy::too_many_arguments)]
fn summary_from_state_db_metadata(
conversation_id: ThreadId,
@@ -8412,6 +8417,12 @@ pub(crate) async fn read_summary_from_rollout(
.unwrap_or_else(|| fallback_provider.to_string());
let git_info = git.as_ref().map(map_git_info);
let updated_at = updated_at.or_else(|| timestamp.clone());
let resume_cwd = read_latest_turn_context(path)
.await
.ok()
.flatten()
.map(|turn_context| turn_context.cwd)
.unwrap_or_else(|| session_meta.cwd.clone());
Ok(ConversationSummary {
conversation_id: session_meta.id,
@@ -8420,7 +8431,7 @@ pub(crate) async fn read_summary_from_rollout(
path: path.to_path_buf(),
preview: String::new(),
model_provider,
cwd: session_meta.cwd,
cwd: resume_cwd,
cli_version: session_meta.cli_version,
source: session_meta.source,
git_info,

View File

@@ -172,16 +172,21 @@ pub use rollout::find_conversation_path_by_id_str;
pub use rollout::find_thread_name_by_id;
pub use rollout::find_thread_path_by_id_str;
pub use rollout::find_thread_path_by_name_str;
pub use rollout::is_rollout_path;
pub use rollout::list::Cursor;
pub use rollout::list::ThreadItem;
pub use rollout::list::ThreadSortKey;
pub use rollout::list::ThreadsPage;
pub use rollout::list::parse_cursor;
pub use rollout::list::read_head_for_summary;
pub use rollout::list::read_latest_turn_context;
pub use rollout::list::read_session_meta_line;
pub use rollout::policy::EventPersistenceMode;
pub use rollout::read_nonempty_rollout_text;
pub use rollout::read_rollout_text;
pub use rollout::rollout_date_parts;
pub use rollout::session_index::find_thread_names_by_ids;
pub use rollout::strip_rollout_file_suffix;
mod function_tool;
mod state;
mod tasks;

View File

@@ -12,7 +12,11 @@ pub use codex_rollout::find_conversation_path_by_id_str;
pub use codex_rollout::find_thread_name_by_id;
pub use codex_rollout::find_thread_path_by_id_str;
pub use codex_rollout::find_thread_path_by_name_str;
pub use codex_rollout::is_rollout_path;
pub use codex_rollout::read_nonempty_rollout_text;
pub use codex_rollout::read_rollout_text;
pub use codex_rollout::rollout_date_parts;
pub use codex_rollout::strip_rollout_file_suffix;
impl codex_rollout::RolloutConfigView for Config {
fn codex_home(&self) -> &std::path::Path {

View File

@@ -1,5 +1,7 @@
use assert_cmd::Command as AssertCommand;
use codex_core::auth::CODEX_API_KEY_ENV_VAR;
use codex_core::is_rollout_path;
use codex_core::read_rollout_text;
use codex_git_utils::collect_git_info;
use codex_protocol::protocol::GitInfo;
use codex_utils_cargo_bin::find_resource;
@@ -375,10 +377,10 @@ async fn integration_creates_and_checks_session_file() -> anyhow::Result<()> {
// Find the session file that contains `marker`.
let marker_clone = marker.clone();
let path = fs_wait::wait_for_matching_file(&sessions_dir, Duration::from_secs(10), move |p| {
if p.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
if !is_rollout_path(p) {
return false;
}
let Ok(content) = std::fs::read_to_string(p) else {
let Ok(content) = read_rollout_text(p) else {
return false;
};
content.contains(&marker_clone)
@@ -422,7 +424,7 @@ async fn integration_creates_and_checks_session_file() -> anyhow::Result<()> {
}
let content =
std::fs::read_to_string(&path).unwrap_or_else(|_| panic!("Failed to read session file"));
read_rollout_text(path.as_path()).unwrap_or_else(|_| panic!("Failed to read session file"));
let mut lines = content.lines();
let meta_line = lines
.next()
@@ -491,10 +493,10 @@ async fn integration_creates_and_checks_session_file() -> anyhow::Result<()> {
let marker2_clone = marker2.clone();
let resumed_path =
fs_wait::wait_for_matching_file(&sessions_dir, Duration::from_secs(10), move |p| {
if p.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
if !is_rollout_path(p) {
return false;
}
std::fs::read_to_string(p)
read_rollout_text(p)
.map(|content| content.contains(&marker2_clone))
.unwrap_or(false)
})
@@ -506,7 +508,7 @@ async fn integration_creates_and_checks_session_file() -> anyhow::Result<()> {
"resume should create a new session file"
);
let resumed_content = std::fs::read_to_string(&resumed_path)?;
let resumed_content = read_rollout_text(resumed_path.as_path())?;
assert!(
resumed_content.contains(&marker),
"resumed file missing original marker"

View File

@@ -5,6 +5,7 @@ use codex_core::built_in_model_providers;
use codex_core::compact::SUMMARIZATION_PROMPT;
use codex_core::compact::SUMMARY_PREFIX;
use codex_core::config::Config;
use codex_core::read_rollout_text;
use codex_features::Feature;
use codex_protocol::items::TurnItem;
use codex_protocol::openai_models::ModelInfo;
@@ -368,7 +369,7 @@ async fn summarize_context_three_requests_and_instructions() {
// Verify rollout contains user-turn TurnContext entries and a Compacted entry.
println!("rollout path: {}", rollout_path.display());
let text = std::fs::read_to_string(&rollout_path).unwrap_or_else(|e| {
let text = read_rollout_text(rollout_path.as_path()).unwrap_or_else(|e| {
panic!(
"failed to read rollout file {}: {e}",
rollout_path.display()
@@ -2069,7 +2070,7 @@ async fn auto_compact_persists_rollout_entries() {
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
let rollout_path = session_configured.rollout_path.expect("rollout path");
let text = std::fs::read_to_string(&rollout_path).unwrap_or_else(|e| {
let text = read_rollout_text(rollout_path.as_path()).unwrap_or_else(|e| {
panic!(
"failed to read rollout file {}: {e}",
rollout_path.display()

View File

@@ -6,6 +6,7 @@ use std::path::PathBuf;
use anyhow::Result;
use codex_core::CodexAuth;
use codex_core::compact::SUMMARY_PREFIX;
use codex_core::read_rollout_text;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
@@ -1160,7 +1161,7 @@ async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()>
assert_eq!(responses_mock.requests().len(), 1);
assert_eq!(compact_mock.requests().len(), 1);
let rollout_text = fs::read_to_string(&rollout_path)?;
let rollout_text = read_rollout_text(rollout_path.as_path())?;
let mut saw_compacted_history = false;
for line in rollout_text
.lines()

View File

@@ -1,6 +1,7 @@
use codex_core::ForkSnapshot;
use codex_core::NewThread;
use codex_core::parse_turn_item;
use codex_core::read_rollout_text;
use codex_protocol::items::TurnItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
@@ -66,7 +67,7 @@ async fn fork_thread_twice_drops_to_first_message() {
// Helper: read rollout items (excluding SessionMeta) from a JSONL path.
let read_items = |p: &std::path::Path| -> Vec<RolloutItem> {
let text = std::fs::read_to_string(p).expect("read rollout file");
let text = read_rollout_text(p).expect("read rollout file");
let mut items: Vec<RolloutItem> = Vec::new();
for line in text.lines() {
if line.trim().is_empty() {

View File

@@ -3,6 +3,7 @@ use std::path::Path;
use anyhow::Context;
use anyhow::Result;
use codex_core::read_rollout_text;
use codex_features::Feature;
use codex_protocol::items::parse_hook_prompt_fragment;
use codex_protocol::models::ContentItem;
@@ -558,7 +559,7 @@ async fn stop_hook_can_block_multiple_times_in_same_turn() -> Result<()> {
);
let rollout_path = test.codex.rollout_path().expect("rollout path");
let rollout_text = fs::read_to_string(&rollout_path)?;
let rollout_text = read_rollout_text(rollout_path.as_path())?;
let hook_prompt_texts = rollout_hook_prompt_texts(&rollout_text)?;
assert!(
hook_prompt_texts.contains(&FIRST_CONTINUATION_PROMPT.to_string()),
@@ -746,7 +747,7 @@ async fn multiple_blocking_stop_hooks_persist_multiple_hook_prompt_fragments() -
);
let rollout_path = test.codex.rollout_path().expect("rollout path");
let rollout_text = fs::read_to_string(&rollout_path)?;
let rollout_text = read_rollout_text(rollout_path.as_path())?;
assert_eq!(
rollout_hook_prompt_texts(&rollout_text)?,
vec![

View File

@@ -1,4 +1,5 @@
use anyhow::Context;
use codex_core::read_nonempty_rollout_text;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AskForApproval;
@@ -22,7 +23,6 @@ use image::ImageBuffer;
use image::Rgba;
use pretty_assertions::assert_eq;
use std::path::Path;
use std::time::Duration;
fn find_user_message_with_image(text: &str) -> Option<ResponseItem> {
for line in text.lines() {
@@ -58,20 +58,6 @@ fn extract_image_url(item: &ResponseItem) -> Option<String> {
}
}
async fn read_rollout_text(path: &Path) -> anyhow::Result<String> {
for _ in 0..50 {
if path.exists()
&& let Ok(text) = std::fs::read_to_string(path)
&& !text.trim().is_empty()
{
return Ok(text);
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
std::fs::read_to_string(path)
.with_context(|| format!("read rollout file at {}", path.display()))
}
fn write_test_png(path: &Path, color: [u8; 4]) -> anyhow::Result<()> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
@@ -138,7 +124,9 @@ async fn copy_paste_local_image_persists_rollout_request_shape() -> anyhow::Resu
wait_for_event(&codex, |event| matches!(event, EventMsg::ShutdownComplete)).await;
let rollout_path = codex.rollout_path().expect("rollout path");
let rollout_text = read_rollout_text(&rollout_path).await?;
let rollout_text = read_nonempty_rollout_text(&rollout_path)
.await
.with_context(|| format!("read rollout file at {}", rollout_path.display()))?;
let actual = find_user_message_with_image(&rollout_text)
.expect("expected user message with input image in rollout");
@@ -222,7 +210,9 @@ async fn drag_drop_image_persists_rollout_request_shape() -> anyhow::Result<()>
wait_for_event(&codex, |event| matches!(event, EventMsg::ShutdownComplete)).await;
let rollout_path = codex.rollout_path().expect("rollout path");
let rollout_text = read_rollout_text(&rollout_path).await?;
let rollout_text = read_nonempty_rollout_text(&rollout_path)
.await
.with_context(|| format!("read rollout file at {}", rollout_path.display()))?;
let actual = find_user_message_with_image(&rollout_text)
.expect("expected user message with input image in rollout");

View File

@@ -1,5 +1,6 @@
use anyhow::Result;
use codex_core::config::Constrained;
use codex_core::read_nonempty_rollout_text;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::COLLABORATION_MODE_CLOSE_TAG;
use codex_protocol::protocol::COLLABORATION_MODE_OPEN_TAG;
@@ -19,7 +20,6 @@ use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use pretty_assertions::assert_eq;
use std::path::Path;
use std::time::Duration;
use tempfile::TempDir;
fn collab_mode_with_instructions(instructions: Option<&str>) -> CollaborationMode {
@@ -37,19 +37,6 @@ fn collab_xml(text: &str) -> String {
format!("{COLLABORATION_MODE_OPEN_TAG}{text}{COLLABORATION_MODE_CLOSE_TAG}")
}
async fn read_rollout_text(path: &Path) -> anyhow::Result<String> {
for _ in 0..50 {
if path.exists()
&& let Ok(text) = std::fs::read_to_string(path)
&& !text.trim().is_empty()
{
return Ok(text);
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
Ok(std::fs::read_to_string(path)?)
}
fn rollout_developer_texts(text: &str) -> Vec<String> {
let mut texts = Vec::new();
for line in text.lines() {
@@ -132,7 +119,7 @@ async fn override_turn_context_without_user_turn_does_not_record_permissions_upd
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
let rollout_path = test.codex.rollout_path().expect("rollout path");
let rollout_text = read_rollout_text(&rollout_path).await?;
let rollout_text = read_nonempty_rollout_text(&rollout_path).await?;
let developer_texts = rollout_developer_texts(&rollout_text);
let approval_texts: Vec<&String> = developer_texts
.iter()
@@ -174,7 +161,7 @@ async fn override_turn_context_without_user_turn_does_not_record_environment_upd
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
let rollout_path = test.codex.rollout_path().expect("rollout path");
let rollout_text = read_rollout_text(&rollout_path).await?;
let rollout_text = read_nonempty_rollout_text(&rollout_path).await?;
let env_texts = rollout_environment_texts(&rollout_text);
assert!(
env_texts.is_empty(),
@@ -213,7 +200,7 @@ async fn override_turn_context_without_user_turn_does_not_record_collaboration_u
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
let rollout_path = test.codex.rollout_path().expect("rollout path");
let rollout_text = read_rollout_text(&rollout_path).await?;
let rollout_text = read_nonempty_rollout_text(&rollout_path).await?;
let developer_texts = rollout_developer_texts(&rollout_text);
let collab_text = collab_xml(collab_text);
let collab_count = developer_texts

View File

@@ -1,6 +1,7 @@
use codex_core::CodexThread;
use codex_core::REVIEW_PROMPT;
use codex_core::config::Config;
use codex_core::read_rollout_text;
use codex_core::review_format::render_review_output_text;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
@@ -120,7 +121,7 @@ async fn review_op_emits_lifecycle_and_review_output() {
// Also verify that a user message with the header and a formatted finding
// was recorded back in the parent session's rollout.
let path = codex.rollout_path().expect("rollout path");
let text = std::fs::read_to_string(&path).expect("read rollout file");
let text = read_rollout_text(path.as_path()).expect("read rollout file");
let mut saw_header = false;
let mut saw_finding_line = false;
@@ -641,7 +642,7 @@ async fn review_input_isolated_from_parent_history() {
// Also verify that a user interruption note was recorded in the rollout.
let path = codex.rollout_path().expect("rollout path");
let text = std::fs::read_to_string(&path).expect("read rollout file");
let text = read_rollout_text(path.as_path()).expect("read rollout file");
let mut saw_interruption_message = false;
for line in text.lines() {
if line.trim().is_empty() {

View File

@@ -1,6 +1,7 @@
use anyhow::Result;
use codex_core::ThreadConfigSnapshot;
use codex_core::config::AgentRoleConfig;
use codex_core::read_rollout_text;
use codex_features::Feature;
use codex_protocol::ThreadId;
use codex_protocol::openai_models::ReasoningEffort;
@@ -227,8 +228,7 @@ async fn setup_turn_one_with_custom_spawned_child(
.ok_or_else(|| anyhow::anyhow!("expected parent rollout path"))?;
let deadline = Instant::now() + Duration::from_secs(6);
loop {
let has_notification = tokio::fs::read_to_string(&rollout_path)
.await
let has_notification = read_rollout_text(rollout_path.as_path())
.is_ok_and(|rollout| rollout.contains("<subagent_notification>"));
if has_notification {
break;

View File

@@ -65,6 +65,7 @@ use codex_core::config_loader::LoaderOverrides;
use codex_core::config_loader::format_config_error_with_source;
use codex_core::format_exec_policy_error_with_source;
use codex_core::path_utils;
use codex_core::read_latest_turn_context;
use codex_feedback::CodexFeedback;
use codex_git_utils::get_git_repo_root;
use codex_otel::set_parent_from_context;
@@ -74,7 +75,6 @@ use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::ReviewTarget;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionConfiguredEvent;
use codex_protocol::protocol::SessionSource;
@@ -1116,30 +1116,13 @@ fn all_thread_source_kinds() -> Vec<ThreadSourceKind> {
async fn latest_thread_cwd(thread: &AppServerThread) -> PathBuf {
if let Some(path) = thread.path.as_deref()
&& let Some(cwd) = parse_latest_turn_context_cwd(path).await
&& let Ok(Some(turn_context)) = read_latest_turn_context(path).await
{
return cwd;
return turn_context.cwd;
}
thread.cwd.clone()
}
async fn parse_latest_turn_context_cwd(path: &Path) -> Option<PathBuf> {
let text = tokio::fs::read_to_string(path).await.ok()?;
for line in text.lines().rev() {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let Ok(rollout_line) = serde_json::from_str::<RolloutLine>(trimmed) else {
continue;
};
if let RolloutItem::TurnContext(item) = rollout_line.item {
return Some(item.cwd);
}
}
None
}
fn cwds_match(current_cwd: &Path, session_cwd: &Path) -> bool {
match (
path_utils::normalize_for_path_comparison(current_cwd),

View File

@@ -1,6 +1,7 @@
#![cfg(not(target_os = "windows"))]
#![allow(clippy::expect_used, clippy::unwrap_used)]
use codex_core::is_rollout_path;
use codex_utils_cargo_bin::find_resource;
use core_test_support::test_codex_exec::test_codex_exec;
use walkdir::WalkDir;
@@ -15,7 +16,7 @@ fn session_rollout_count(home_path: &std::path::Path) -> usize {
.into_iter()
.filter_map(Result::ok)
.filter(|entry| entry.file_type().is_file())
.filter(|entry| entry.file_name().to_string_lossy().ends_with(".jsonl"))
.filter(|entry| is_rollout_path(entry.path()))
.count()
}

View File

@@ -1,5 +1,7 @@
#![allow(clippy::unwrap_used, clippy::expect_used)]
use anyhow::Context;
use codex_core::is_rollout_path;
use codex_core::read_rollout_text;
use codex_utils_cargo_bin::find_resource;
use core_test_support::test_codex_exec::test_codex_exec;
use pretty_assertions::assert_eq;
@@ -23,11 +25,11 @@ fn find_session_file_containing_marker(
if !entry.file_type().is_file() {
continue;
}
if !entry.file_name().to_string_lossy().ends_with(".jsonl") {
if !is_rollout_path(entry.path()) {
continue;
}
let path = entry.path();
let Ok(content) = std::fs::read_to_string(path) else {
let Ok(content) = read_rollout_text(path) else {
continue;
};
// Skip the first meta line and scan remaining JSONL entries.
@@ -60,7 +62,7 @@ fn find_session_file_containing_marker(
/// Extract the conversation UUID from the first SessionMeta line in the rollout file.
fn extract_conversation_id(path: &std::path::Path) -> String {
let content = std::fs::read_to_string(path).unwrap();
let content = read_rollout_text(path).unwrap();
let mut lines = content.lines();
let meta_line = lines.next().expect("missing meta line");
let meta: Value = serde_json::from_str(meta_line).expect("invalid meta json");
@@ -72,7 +74,7 @@ fn extract_conversation_id(path: &std::path::Path) -> String {
}
fn last_user_image_count(path: &std::path::Path) -> usize {
let content = std::fs::read_to_string(path).unwrap_or_default();
let content = read_rollout_text(path).unwrap_or_default();
let mut last_count = 0;
for line in content.lines() {
if line.trim().is_empty() {
@@ -160,7 +162,7 @@ fn exec_resume_last_appends_to_existing_file() -> anyhow::Result<()> {
resumed_path, path,
"resume --last should append to existing file"
);
let content = std::fs::read_to_string(&resumed_path)?;
let content = read_rollout_text(&resumed_path)?;
assert!(content.contains(&marker));
assert!(content.contains(&marker2));
Ok(())
@@ -214,7 +216,7 @@ fn exec_resume_last_accepts_prompt_after_flag_in_json_mode() -> anyhow::Result<(
resumed_path, path,
"resume --last should append to existing file"
);
let content = std::fs::read_to_string(&resumed_path)?;
let content = read_rollout_text(&resumed_path)?;
assert!(content.contains(&marker));
assert!(content.contains(&marker2));
Ok(())
@@ -417,7 +419,7 @@ fn exec_resume_by_id_appends_to_existing_file() -> anyhow::Result<()> {
resumed_path, path,
"resume by id should append to existing file"
);
let content = std::fs::read_to_string(&resumed_path)?;
let content = read_rollout_text(&resumed_path)?;
assert!(content.contains(&marker));
assert!(content.contains(&marker2));
Ok(())
@@ -493,7 +495,7 @@ fn exec_resume_preserves_cli_configuration_overrides() -> anyhow::Result<()> {
.expect("no resumed session file containing marker2");
assert_eq!(resumed_path, path, "resume should append to same file");
let content = std::fs::read_to_string(&resumed_path)?;
let content = read_rollout_text(&resumed_path)?;
assert!(content.contains(&marker));
assert!(content.contains(&marker2));
Ok(())

View File

@@ -43,6 +43,7 @@ tokio = { workspace = true, features = [
] }
tracing = { workspace = true }
uuid = { workspace = true }
zstd = { workspace = true }
[dev-dependencies]
pretty_assertions = { workspace = true }

View File

@@ -0,0 +1,215 @@
use std::fs::File;
use std::fs::OpenOptions;
use std::io;
use std::io::BufRead;
use std::io::BufReader;
use std::io::ErrorKind;
use std::io::Read;
use std::io::Write;
use std::path::Path;
use std::time::Duration;
use tokio::time::sleep;
pub const ROLLOUT_FILE_SUFFIX: &str = ".jsonl";
pub const COMPRESSED_ROLLOUT_FILE_SUFFIX: &str = ".jsonl.zst";
const DEFAULT_ZSTD_LEVEL: i32 = 0;
/// On-disk encoding of a rollout file, inferred from its filename suffix.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum RolloutFileEncoding {
    PlainJsonl,
    ZstdJsonl,
}

impl RolloutFileEncoding {
    /// Picks the encoding for `path` from its filename.
    ///
    /// A missing, non-UTF-8, or unrecognized filename falls back to plain
    /// JSONL so legacy files keep working.
    pub(crate) fn for_path(path: &Path) -> Self {
        let detected = path
            .file_name()
            .and_then(|file_name| file_name.to_str())
            .and_then(file_encoding_from_name);
        match detected {
            Some(encoding) => encoding,
            None => Self::PlainJsonl,
        }
    }
}
/// Returns the suffix used for newly created rollout files.
///
/// New rollouts are written zstd-compressed (`.jsonl.zst`); existing plain
/// `.jsonl` files remain readable through [`read_rollout_text`].
pub fn preferred_rollout_file_suffix() -> &'static str {
    COMPRESSED_ROLLOUT_FILE_SUFFIX
}
/// Returns true when `name` matches a rollout filename in either supported
/// encoding: a `rollout-` prefix plus a `.jsonl` or `.jsonl.zst` suffix.
pub fn is_rollout_file_name(name: &str) -> bool {
    if !name.starts_with("rollout-") {
        return false;
    }
    strip_rollout_file_suffix(name).is_some()
}
/// Returns true when `path` points to a rollout file in either supported encoding.
pub fn is_rollout_path(path: &Path) -> bool {
path.file_name()
.and_then(|file_name| file_name.to_str())
.is_some_and(is_rollout_file_name)
}
/// Removes the rollout suffix from `name`, supporting both plain (`.jsonl`)
/// and compressed (`.jsonl.zst`) files. Returns `None` for any other name.
pub fn strip_rollout_file_suffix(name: &str) -> Option<&str> {
    if let Some(stem) = name.strip_suffix(COMPRESSED_ROLLOUT_FILE_SUFFIX) {
        return Some(stem);
    }
    name.strip_suffix(ROLLOUT_FILE_SUFFIX)
}
/// Reads the full rollout file contents, transparently handling plain and
/// zstd-compressed files.
///
/// For compressed files, a truncated final frame (left by an append-mode
/// writer that has not called `finish` yet) is tolerated: whatever text was
/// decoded before the `UnexpectedEof` is returned.
///
/// # Errors
/// Propagates any I/O error from opening or reading the file, and decode
/// errors for corrupt zstd data.
pub fn read_rollout_text(path: &Path) -> io::Result<String> {
    match RolloutFileEncoding::for_path(path) {
        // `fs::read_to_string` pre-sizes the buffer from file metadata,
        // avoiding the grow-and-copy of reading into an empty String.
        RolloutFileEncoding::PlainJsonl => std::fs::read_to_string(path),
        RolloutFileEncoding::ZstdJsonl => {
            let file = File::open(path)?;
            let mut decoder = zstd::stream::read::Decoder::new(file)?;
            let mut text = String::new();
            match decoder.read_to_string(&mut text) {
                Ok(_) => {}
                // An unterminated trailing frame is expected while a writer
                // is still appending; keep the text decoded so far.
                Err(err) if err.kind() == ErrorKind::UnexpectedEof && !text.is_empty() => {}
                Err(err) => return Err(err),
            }
            Ok(text)
        }
    }
}
/// Retries reading `path` until the rollout exists and contains non-empty text, or returns the
/// last read attempt.
pub async fn read_nonempty_rollout_text(path: &Path) -> io::Result<String> {
const MAX_ATTEMPTS: usize = 50;
const RETRY_DELAY: Duration = Duration::from_millis(20);
for _ in 0..MAX_ATTEMPTS {
if path.exists()
&& let Ok(text) = read_rollout_text(path)
&& !text.trim().is_empty()
{
return Ok(text);
}
sleep(RETRY_DELAY).await;
}
read_rollout_text(path)
}
/// Line-oriented reader over a rollout file; decompresses zstd transparently
/// based on the filename suffix.
pub(crate) struct RolloutLineReader {
    inner: RolloutLineReaderInner,
}

/// Append-only writer for a rollout file; compresses with zstd when the
/// filename carries the compressed suffix.
pub(crate) struct RolloutAppendWriter {
    inner: RolloutAppendWriterInner,
}

enum RolloutLineReaderInner {
    Plain(BufReader<File>),
    // Outer BufReader supplies `read_line` over decoded bytes; the decoder's
    // own inner BufReader buffers the compressed input.
    Zstd(BufReader<zstd::stream::read::Decoder<'static, BufReader<File>>>),
}

enum RolloutAppendWriterInner {
    Plain(File),
    Zstd(zstd::stream::write::Encoder<'static, File>),
}
impl RolloutAppendWriter {
    /// Opens `path` for appending, creating the parent directories and the
    /// file as needed. The encoding is chosen from the filename suffix.
    ///
    /// # Errors
    /// Fails when `path` has no parent directory, or on any I/O error while
    /// creating directories, opening the file, or initializing the encoder.
    pub(crate) fn open(path: &Path) -> io::Result<Self> {
        let parent = path.parent().ok_or_else(|| {
            io::Error::other(format!("rollout path has no parent: {}", path.display()))
        })?;
        std::fs::create_dir_all(parent)?;
        let file = OpenOptions::new().append(true).create(true).open(path)?;
        match RolloutFileEncoding::for_path(path) {
            RolloutFileEncoding::PlainJsonl => Ok(Self {
                inner: RolloutAppendWriterInner::Plain(file),
            }),
            RolloutFileEncoding::ZstdJsonl => {
                let encoder = zstd::stream::write::Encoder::new(file, DEFAULT_ZSTD_LEVEL)?;
                Ok(Self {
                    inner: RolloutAppendWriterInner::Zstd(encoder),
                })
            }
        }
    }

    /// Appends `text` and flushes so concurrent readers observe it promptly.
    pub(crate) fn append_text(&mut self, text: &str) -> io::Result<()> {
        let bytes = text.as_bytes();
        match &mut self.inner {
            RolloutAppendWriterInner::Plain(file) => {
                file.write_all(bytes)?;
                file.flush()
            }
            RolloutAppendWriterInner::Zstd(encoder) => {
                encoder.write_all(bytes)?;
                // Flush the encoder to emit a complete zstd block, then flush
                // the underlying file handle.
                encoder.flush()?;
                encoder.get_mut().flush()
            }
        }
    }

    /// Consumes the writer; for zstd this terminates the current frame so the
    /// file no longer ends in a truncated stream.
    pub(crate) fn finish(self) -> io::Result<()> {
        match self.inner {
            RolloutAppendWriterInner::Plain(mut file) => file.flush(),
            RolloutAppendWriterInner::Zstd(encoder) => encoder.finish()?.flush(),
        }
    }
}
impl RolloutLineReader {
    /// Opens `path` for line-by-line reading, selecting plain or zstd decoding
    /// from the filename suffix.
    pub(crate) fn open(path: &Path) -> io::Result<Self> {
        let inner = match RolloutFileEncoding::for_path(path) {
            RolloutFileEncoding::PlainJsonl => {
                RolloutLineReaderInner::Plain(BufReader::new(File::open(path)?))
            }
            RolloutFileEncoding::ZstdJsonl => {
                let file = BufReader::new(File::open(path)?);
                let decoder = zstd::stream::read::Decoder::with_buffer(file)?;
                RolloutLineReaderInner::Zstd(BufReader::new(decoder))
            }
        };
        Ok(Self { inner })
    }

    /// Returns the next line with its trailing newline removed, or `None` at
    /// end of input.
    ///
    /// For zstd input, a truncated final frame (an append-mode writer that has
    /// not called `finish` yet) surfaces as `UnexpectedEof`: any partially
    /// decoded text is returned as a final line, and with nothing decoded it
    /// is treated as a clean end of input.
    pub(crate) fn next_line(&mut self) -> io::Result<Option<String>> {
        let mut line = String::new();
        let bytes_read = match &mut self.inner {
            RolloutLineReaderInner::Plain(reader) => reader.read_line(&mut line)?,
            RolloutLineReaderInner::Zstd(reader) => match reader.read_line(&mut line) {
                Ok(bytes_read) => bytes_read,
                // Partial data before the truncated frame: surface it as a line.
                Err(err) if err.kind() == ErrorKind::UnexpectedEof && !line.is_empty() => {
                    line.len()
                }
                // Truncation with nothing decoded: behave like end of input.
                Err(err) if err.kind() == ErrorKind::UnexpectedEof => 0,
                Err(err) => return Err(err),
            },
        };
        if bytes_read == 0 {
            return Ok(None);
        }
        trim_line_ending(&mut line);
        Ok(Some(line))
    }
}
/// Maps a filename to its rollout encoding, or `None` when it carries neither
/// supported suffix.
fn file_encoding_from_name(name: &str) -> Option<RolloutFileEncoding> {
    if name.ends_with(COMPRESSED_ROLLOUT_FILE_SUFFIX) {
        Some(RolloutFileEncoding::ZstdJsonl)
    } else if name.ends_with(ROLLOUT_FILE_SUFFIX) {
        Some(RolloutFileEncoding::PlainJsonl)
    } else {
        None
    }
}
/// Removes one trailing `\n` — and, for CRLF input, the `\r` before it — from
/// `line`. A `\r` without a following `\n` is left untouched.
fn trim_line_ending(line: &mut String) {
    if let Some(without_lf) = line.strip_suffix('\n') {
        let keep = without_lf
            .strip_suffix('\r')
            .map_or(without_lf.len(), str::len);
        line.truncate(keep);
    }
}
#[cfg(test)]
#[path = "file_io_tests.rs"]
mod tests;

View File

@@ -0,0 +1,63 @@
use super::*;
use crate::file_io::RolloutAppendWriter;
use crate::file_io::RolloutLineReader;
use std::fs::File;
use std::io;
use std::io::Write;
use tempfile::TempDir;
// `strip_rollout_file_suffix` must accept both the plain and the compressed
// rollout suffix, and reject any other extension.
#[test]
fn strip_rollout_file_suffix_supports_both_formats() {
    assert_eq!(
        strip_rollout_file_suffix("rollout-2026-01-01T00-00-00-thread.jsonl"),
        Some("rollout-2026-01-01T00-00-00-thread")
    );
    assert_eq!(
        strip_rollout_file_suffix("rollout-2026-01-01T00-00-00-thread.jsonl.zst"),
        Some("rollout-2026-01-01T00-00-00-thread")
    );
    // Unknown extensions yield no stem.
    assert_eq!(strip_rollout_file_suffix("rollout.txt"), None);
}
// Plain `.jsonl` rollouts written without compression must stay readable
// through both the whole-file API and the line-by-line reader.
#[test]
fn plain_rollouts_are_still_readable() -> io::Result<()> {
    let temp_dir = TempDir::new()?;
    let path = temp_dir
        .path()
        .join("rollout-2026-01-01T00-00-00-thread.jsonl");
    let mut file = File::create(&path)?;
    writeln!(file, "{{\"a\":1}}")?;
    writeln!(file, "{{\"b\":2}}")?;
    assert_eq!(read_rollout_text(path.as_path())?, "{\"a\":1}\n{\"b\":2}\n");
    let mut reader = RolloutLineReader::open(path.as_path())?;
    // Lines come back with their trailing newlines stripped.
    assert_eq!(reader.next_line()?, Some("{\"a\":1}".to_string()));
    assert_eq!(reader.next_line()?, Some("{\"b\":2}".to_string()));
    assert_eq!(reader.next_line()?, None);
    Ok(())
}
// While a zstd writer's frame is still open, readers must already observe
// every appended line (each `append_text` flushes a complete block), and
// `finish` must not change the observed contents.
#[test]
fn compressed_appends_are_read_back_from_open_stream() -> io::Result<()> {
    let temp_dir = TempDir::new()?;
    let path = temp_dir
        .path()
        .join("rollout-2026-01-01T00-00-00-thread.jsonl.zst");
    let mut writer = RolloutAppendWriter::open(path.as_path())?;
    writer.append_text("{\"a\":1}\n")?;
    // Readable before the frame is finished.
    assert_eq!(read_rollout_text(path.as_path())?, "{\"a\":1}\n");
    writer.append_text("{\"b\":2}\n")?;
    assert_eq!(read_rollout_text(path.as_path())?, "{\"a\":1}\n{\"b\":2}\n");
    writer.finish()?;
    // Unchanged after the frame is terminated.
    assert_eq!(read_rollout_text(path.as_path())?, "{\"a\":1}\n{\"b\":2}\n");
    let mut reader = RolloutLineReader::open(path.as_path())?;
    assert_eq!(reader.next_line()?, Some("{\"a\":1}".to_string()));
    assert_eq!(reader.next_line()?, Some("{\"b\":2}".to_string()));
    assert_eq!(reader.next_line()?, None);
    Ok(())
}

View File

@@ -5,6 +5,7 @@ use std::sync::LazyLock;
use codex_protocol::protocol::SessionSource;
pub mod config;
pub(crate) mod file_io;
pub mod list;
pub mod metadata;
pub mod policy;
@@ -32,10 +33,15 @@ pub static INTERACTIVE_SESSION_SOURCES: LazyLock<Vec<SessionSource>> = LazyLock:
pub use codex_protocol::protocol::SessionMeta;
pub use config::RolloutConfig;
pub use config::RolloutConfigView;
pub use file_io::is_rollout_path;
pub use file_io::read_nonempty_rollout_text;
pub use file_io::read_rollout_text;
pub use file_io::strip_rollout_file_suffix;
pub use list::find_archived_thread_path_by_id_str;
pub use list::find_thread_path_by_id_str;
#[deprecated(note = "use find_thread_path_by_id_str")]
pub use list::find_thread_path_by_id_str as find_conversation_path_by_id_str;
pub use list::read_latest_turn_context;
pub use list::rollout_date_parts;
pub use policy::EventPersistenceMode;
pub use recorder::RolloutRecorder;

View File

@@ -17,6 +17,10 @@ use uuid::Uuid;
use super::ARCHIVED_SESSIONS_SUBDIR;
use super::SESSIONS_SUBDIR;
use super::file_io::RolloutLineReader;
use super::file_io::is_rollout_file_name;
use super::file_io::read_rollout_text;
use super::file_io::strip_rollout_file_suffix;
use crate::protocol::EventMsg;
use crate::state_db;
use codex_file_search as file_search;
@@ -25,6 +29,7 @@ use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::TurnContextItem;
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
/// Returned page of thread (thread) summaries.
@@ -378,7 +383,7 @@ pub async fn get_threads_in_root(
/// Load thread file paths from disk using directory traversal.
///
/// Directory layout: `~/.codex/sessions/YYYY/MM/DD/rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl`
/// Directory layout: `~/.codex/sessions/YYYY/MM/DD/rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl(.zst)`
/// Returned newest (based on sort key) first.
async fn traverse_directories_for_paths(
root: PathBuf,
@@ -823,7 +828,7 @@ async fn collect_flat_rollout_files(
let Some(name_str) = file_name.to_str() else {
continue;
};
if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
if !is_rollout_file_name(name_str) {
continue;
}
let Some((ts, id)) = parse_timestamp_uuid_from_filename(name_str) else {
@@ -843,7 +848,7 @@ async fn collect_rollout_day_files(
day_path: &Path,
) -> io::Result<Vec<(OffsetDateTime, Uuid, PathBuf)>> {
let mut day_files = collect_files(day_path, |name_str, path| {
if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
if !is_rollout_file_name(name_str) {
return None;
}
@@ -856,8 +861,8 @@ async fn collect_rollout_day_files(
}
pub(crate) fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uuid)> {
// Expected: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl
let core = name.strip_prefix("rollout-")?.strip_suffix(".jsonl")?;
// Expected: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl(.zst)
let core = strip_rollout_file_suffix(name)?.strip_prefix("rollout-")?;
// Scan from the right for a '-' such that the suffix parses as a UUID.
let (sep_idx, uuid) = core
@@ -913,7 +918,7 @@ async fn collect_flat_files_by_updated_at(
let Some(name_str) = file_name.to_str() else {
continue;
};
if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
if !is_rollout_file_name(name_str) {
continue;
}
let Some((_ts, id)) = parse_timestamp_uuid_from_filename(name_str) else {
@@ -1001,127 +1006,136 @@ impl<'a> ProviderMatcher<'a> {
}
async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTailSummary> {
use tokio::io::AsyncBufReadExt;
read_rollout_head(path, move |reader| {
let mut summary = HeadTailSummary::default();
let mut lines_scanned = 0usize;
let file = tokio::fs::File::open(path).await?;
let reader = tokio::io::BufReader::new(file);
let mut lines = reader.lines();
let mut summary = HeadTailSummary::default();
let mut lines_scanned = 0usize;
while lines_scanned < head_limit
|| (summary.saw_session_meta
&& !summary.saw_user_event
&& lines_scanned < head_limit + USER_EVENT_SCAN_LIMIT)
{
let Some(line) = reader.next_line()? else {
break;
};
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
lines_scanned += 1;
while lines_scanned < head_limit
|| (summary.saw_session_meta
&& !summary.saw_user_event
&& lines_scanned < head_limit + USER_EVENT_SCAN_LIMIT)
{
let line_opt = lines.next_line().await?;
let Some(line) = line_opt else { break };
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
lines_scanned += 1;
let Ok(rollout_line) = serde_json::from_str::<RolloutLine>(trimmed) else {
continue;
};
let parsed: Result<RolloutLine, _> = serde_json::from_str(trimmed);
let Ok(rollout_line) = parsed else { continue };
match rollout_line.item {
RolloutItem::SessionMeta(session_meta_line) => {
if !summary.saw_session_meta {
summary.source = Some(session_meta_line.meta.source.clone());
summary.agent_nickname = session_meta_line.meta.agent_nickname.clone();
summary.agent_role = session_meta_line.meta.agent_role.clone();
summary.model_provider = session_meta_line.meta.model_provider.clone();
summary.thread_id = Some(session_meta_line.meta.id);
summary.cwd = Some(session_meta_line.meta.cwd.clone());
summary.git_branch = session_meta_line
.git
.as_ref()
.and_then(|git| git.branch.clone());
summary.git_sha = session_meta_line
.git
.as_ref()
.and_then(|git| git.commit_hash.as_ref().map(|sha| sha.0.clone()));
summary.git_origin_url = session_meta_line
.git
.as_ref()
.and_then(|git| git.repository_url.clone());
summary.cli_version = Some(session_meta_line.meta.cli_version);
summary.created_at = Some(session_meta_line.meta.timestamp.clone());
summary.saw_session_meta = true;
match rollout_line.item {
RolloutItem::SessionMeta(session_meta_line) => {
if !summary.saw_session_meta {
summary.source = Some(session_meta_line.meta.source.clone());
summary.agent_nickname = session_meta_line.meta.agent_nickname.clone();
summary.agent_role = session_meta_line.meta.agent_role.clone();
summary.model_provider = session_meta_line.meta.model_provider.clone();
summary.thread_id = Some(session_meta_line.meta.id);
summary.cwd = Some(session_meta_line.meta.cwd.clone());
summary.git_branch = session_meta_line
.git
.as_ref()
.and_then(|git| git.branch.clone());
summary.git_sha = session_meta_line
.git
.as_ref()
.and_then(|git| git.commit_hash.as_ref().map(|sha| sha.0.clone()));
summary.git_origin_url = session_meta_line
.git
.as_ref()
.and_then(|git| git.repository_url.clone());
summary.cli_version = Some(session_meta_line.meta.cli_version);
summary.created_at = Some(session_meta_line.meta.timestamp.clone());
summary.saw_session_meta = true;
}
}
}
RolloutItem::ResponseItem(_) => {
summary.created_at = summary
.created_at
.clone()
.or_else(|| Some(rollout_line.timestamp.clone()));
}
RolloutItem::TurnContext(_) => {
// Not included in `head`; skip.
}
RolloutItem::Compacted(_) => {
// Not included in `head`; skip.
}
RolloutItem::EventMsg(ev) => {
if let EventMsg::UserMessage(user) = ev {
summary.saw_user_event = true;
if summary.first_user_message.is_none() {
let message = strip_user_message_prefix(user.message.as_str()).to_string();
if !message.is_empty() {
summary.first_user_message = Some(message);
RolloutItem::ResponseItem(_) => {
summary.created_at = summary
.created_at
.clone()
.or_else(|| Some(rollout_line.timestamp.clone()));
}
RolloutItem::TurnContext(_) => {}
RolloutItem::Compacted(_) => {}
RolloutItem::EventMsg(event) => {
if let EventMsg::UserMessage(user) = event {
summary.saw_user_event = true;
if summary.first_user_message.is_none() {
let message =
strip_user_message_prefix(user.message.as_str()).to_string();
if !message.is_empty() {
summary.first_user_message = Some(message);
}
}
}
}
}
if summary.saw_session_meta && summary.saw_user_event {
break;
}
}
if summary.saw_session_meta && summary.saw_user_event {
break;
}
}
Ok(summary)
Ok(summary)
})
.await
}
/// Read up to `HEAD_RECORD_LIMIT` records from the start of the rollout file at `path`.
/// This should be enough to produce a summary including the session meta line.
pub async fn read_head_for_summary(path: &Path) -> io::Result<Vec<serde_json::Value>> {
use tokio::io::AsyncBufReadExt;
read_rollout_head(path, |reader| {
let mut head = Vec::new();
let file = tokio::fs::File::open(path).await?;
let reader = tokio::io::BufReader::new(file);
let mut lines = reader.lines();
let mut head = Vec::new();
while head.len() < HEAD_RECORD_LIMIT {
let Some(line) = lines.next_line().await? else {
break;
};
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
if let Ok(rollout_line) = serde_json::from_str::<RolloutLine>(trimmed) {
match rollout_line.item {
RolloutItem::SessionMeta(session_meta_line) => {
if let Ok(value) = serde_json::to_value(session_meta_line) {
head.push(value);
while head.len() < HEAD_RECORD_LIMIT {
let Some(line) = reader.next_line()? else {
break;
};
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
if let Ok(rollout_line) = serde_json::from_str::<RolloutLine>(trimmed) {
match rollout_line.item {
RolloutItem::SessionMeta(session_meta_line) => {
if let Ok(value) = serde_json::to_value(session_meta_line) {
head.push(value);
}
}
}
RolloutItem::ResponseItem(item) => {
if let Ok(value) = serde_json::to_value(item) {
head.push(value);
RolloutItem::ResponseItem(item) => {
if let Ok(value) = serde_json::to_value(item) {
head.push(value);
}
}
RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => {}
}
RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => {}
}
}
}
Ok(head)
Ok(head)
})
.await
}
/// Run `read` against a `RolloutLineReader` opened for `path` on the blocking
/// thread pool, so compressed-file decoding never blocks the async runtime.
///
/// The closure's result is returned verbatim; a panicked or cancelled
/// blocking task surfaces as an `io::Error`.
async fn read_rollout_head<T, F>(path: &Path, read: F) -> io::Result<T>
where
    T: Send + 'static,
    F: FnOnce(&mut RolloutLineReader) -> io::Result<T> + Send + 'static,
{
    let owned_path = path.to_path_buf();
    let join_handle = tokio::task::spawn_blocking(move || {
        RolloutLineReader::open(owned_path.as_path()).and_then(|mut reader| read(&mut reader))
    });
    join_handle
        .await
        .map_err(|err| io::Error::other(format!("failed to read rollout head: {err}")))?
}
fn strip_user_message_prefix(text: &str) -> &str {
@@ -1149,6 +1163,29 @@ pub async fn read_session_meta_line(path: &Path) -> io::Result<SessionMetaLine>
})
}
/// Read the latest TurnContext item from a rollout file, if present.
///
/// The whole file is decoded off the async runtime via `spawn_blocking`, then
/// scanned from the last line backwards; lines that are empty or fail to
/// parse as a `RolloutLine` are skipped silently.
pub async fn read_latest_turn_context(path: &Path) -> io::Result<Option<TurnContextItem>> {
    let owned_path = path.to_path_buf();
    tokio::task::spawn_blocking(move || {
        let text = read_rollout_text(owned_path.as_path())?;
        let latest = text
            .lines()
            .rev()
            .map(str::trim)
            .filter(|trimmed| !trimmed.is_empty())
            .filter_map(|trimmed| serde_json::from_str::<RolloutLine>(trimmed).ok())
            .find_map(|rollout_line| match rollout_line.item {
                RolloutItem::TurnContext(item) => Some(item),
                _ => None,
            });
        Ok(latest)
    })
    .await
    .map_err(|err| io::Error::other(format!("failed to read latest turn context: {err}")))?
}
async fn file_modified_time(path: &Path) -> io::Result<Option<OffsetDateTime>> {
let meta = tokio::fs::metadata(path).await?;
let modified = meta.modified().ok();

View File

@@ -1,6 +1,8 @@
use crate::ARCHIVED_SESSIONS_SUBDIR;
use crate::SESSIONS_SUBDIR;
use crate::config::RolloutConfigView;
use crate::file_io::is_rollout_file_name;
use crate::file_io::strip_rollout_file_suffix;
use crate::list;
use crate::list::parse_timestamp_uuid_from_filename;
use crate::recorder::RolloutRecorder;
@@ -30,7 +32,6 @@ use tracing::info;
use tracing::warn;
const ROLLOUT_PREFIX: &str = "rollout-";
const ROLLOUT_SUFFIX: &str = ".jsonl";
const BACKFILL_BATCH_SIZE: usize = 200;
#[cfg(not(test))]
const BACKFILL_LEASE_SECONDS: i64 = 900;
@@ -80,7 +81,7 @@ pub fn builder_from_items(
}
let file_name = rollout_path.file_name()?.to_str()?;
if !file_name.starts_with(ROLLOUT_PREFIX) || !file_name.ends_with(ROLLOUT_SUFFIX) {
if !is_rollout_file_name(file_name) {
return None;
}
let (created_ts, uuid) = parse_timestamp_uuid_from_filename(file_name)?;
@@ -430,7 +431,7 @@ async fn collect_rollout_paths(root: &Path) -> std::io::Result<Vec<PathBuf>> {
let Some(name) = file_name.to_str() else {
continue;
};
if name.starts_with(ROLLOUT_PREFIX) && name.ends_with(ROLLOUT_SUFFIX) {
if name.starts_with(ROLLOUT_PREFIX) && strip_rollout_file_suffix(name).is_some() {
paths.push(path);
}
}

View File

@@ -1,7 +1,6 @@
//! Persist Codex session rollouts (.jsonl) so sessions can be replayed or inspected later.
//! Persist Codex session rollouts (`.jsonl` / `.jsonl.zst`) so sessions can be replayed or
//! inspected later.
use std::fs;
use std::fs::File;
use std::io::Error as IoError;
use std::path::Path;
use std::path::PathBuf;
@@ -17,7 +16,6 @@ use time::OffsetDateTime;
use time::format_description::FormatItem;
use time::format_description::well_known::Rfc3339;
use time::macros::format_description;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
@@ -27,6 +25,9 @@ use tracing::warn;
use super::ARCHIVED_SESSIONS_SUBDIR;
use super::SESSIONS_SUBDIR;
use super::file_io::RolloutAppendWriter;
use super::file_io::RolloutLineReader;
use super::file_io::preferred_rollout_file_suffix;
use super::list::Cursor;
use super::list::ThreadItem;
use super::list::ThreadListConfig;
@@ -37,6 +38,7 @@ use super::list::get_threads;
use super::list::get_threads_in_root;
use super::list::parse_cursor;
use super::list::parse_timestamp_uuid_from_filename;
use super::list::read_latest_turn_context;
use super::metadata;
use super::policy::EventPersistenceMode;
use super::policy::is_persisted_response_item;
@@ -61,7 +63,8 @@ use codex_utils_path as path_utils;
/// Records all [`ResponseItem`]s for a session and flushes them to disk after
/// every update.
///
/// Rollouts are recorded as JSONL and can be inspected with tools such as:
/// Rollouts are recorded as JSONL, optionally compressed with zstd, and can be inspected after
/// decompression with tools such as:
///
/// ```ignore
/// $ jq -C . ~/.codex/sessions/rollout-2025-05-07T17-24-21-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl
@@ -366,14 +369,14 @@ impl RolloutRecorder {
/// For newly created sessions, this precomputes path/metadata and defers
/// file creation/open until an explicit `persist()` call.
///
/// For resumed sessions, this immediately opens the existing rollout file.
/// For resumed sessions, this validates the existing rollout file path immediately.
pub async fn new(
config: &impl RolloutConfigView,
params: RolloutRecorderParams,
state_db_ctx: Option<StateDbHandle>,
state_builder: Option<ThreadMetadataBuilder>,
) -> std::io::Result<Self> {
let (file, deferred_log_file_info, rollout_path, meta, event_persistence_mode) =
let (writer, deferred_log_file_info, rollout_path, meta, event_persistence_mode) =
match params {
RolloutRecorderParams::Create {
conversation_id,
@@ -429,18 +432,16 @@ impl RolloutRecorder {
RolloutRecorderParams::Resume {
path,
event_persistence_mode,
} => (
Some(
tokio::fs::OpenOptions::new()
.append(true)
.open(&path)
.await?,
),
None,
path,
None,
event_persistence_mode,
),
} => {
tokio::fs::metadata(&path).await?;
(
Some(RolloutFileWriter::open(path.clone()).await?),
None,
path,
None,
event_persistence_mode,
)
}
};
// Clone the cwd for the spawned task to collect git info asynchronously
@@ -450,11 +451,9 @@ impl RolloutRecorder {
// future will yield, which is fine we only need to ensure we do not
// perform *blocking* I/O on the caller's thread.
let (tx, rx) = mpsc::channel::<RolloutCmd>(256);
// Spawn a Tokio task that owns the file handle and performs async
// writes. Using `tokio::fs::File` keeps everything on the async I/O
// driver instead of blocking the runtime.
// Spawn a Tokio task that owns the rollout writer.
tokio::task::spawn(rollout_writer(
file,
writer,
deferred_log_file_info,
rx,
meta,
@@ -532,65 +531,10 @@ impl RolloutRecorder {
path: &Path,
) -> std::io::Result<(Vec<RolloutItem>, Option<ThreadId>, usize)> {
trace!("Resuming rollout from {path:?}");
let text = tokio::fs::read_to_string(path).await?;
if text.trim().is_empty() {
return Err(IoError::other("empty session file"));
}
let mut items: Vec<RolloutItem> = Vec::new();
let mut thread_id: Option<ThreadId> = None;
let mut parse_errors = 0usize;
for line in text.lines() {
if line.trim().is_empty() {
continue;
}
let v: Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(e) => {
warn!("failed to parse line as JSON: {line:?}, error: {e}");
parse_errors = parse_errors.saturating_add(1);
continue;
}
};
// Parse the rollout line structure
match serde_json::from_value::<RolloutLine>(v.clone()) {
Ok(rollout_line) => match rollout_line.item {
RolloutItem::SessionMeta(session_meta_line) => {
// Use the FIRST SessionMeta encountered in the file as the canonical
// thread id and main session information. Keep all items intact.
if thread_id.is_none() {
thread_id = Some(session_meta_line.meta.id);
}
items.push(RolloutItem::SessionMeta(session_meta_line));
}
RolloutItem::ResponseItem(item) => {
items.push(RolloutItem::ResponseItem(item));
}
RolloutItem::Compacted(item) => {
items.push(RolloutItem::Compacted(item));
}
RolloutItem::TurnContext(item) => {
items.push(RolloutItem::TurnContext(item));
}
RolloutItem::EventMsg(_ev) => {
items.push(RolloutItem::EventMsg(_ev));
}
},
Err(e) => {
trace!("failed to parse rollout line: {e}");
parse_errors = parse_errors.saturating_add(1);
}
}
}
tracing::debug!(
"Resumed rollout with {} items, thread ID: {:?}, parse errors: {}",
items.len(),
thread_id,
parse_errors,
);
Ok((items, thread_id, parse_errors))
let path = path.to_path_buf();
tokio::task::spawn_blocking(move || load_rollout_items_sync(path.as_path()))
.await
.map_err(|err| IoError::other(format!("failed to read rollout items: {err}")))?
}
pub async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
@@ -627,6 +571,85 @@ impl RolloutRecorder {
}
}
/// Synchronously parse every line of the rollout file at `path` into
/// [`RolloutItem`]s.
///
/// Returns the parsed items, the thread id taken from the FIRST `SessionMeta`
/// line encountered (later meta lines are kept as items but do not override
/// it), and the number of lines that failed to parse.
///
/// # Errors
/// - Propagates the open error and any read error occurring before the first
///   non-empty line was seen.
/// - Returns `"empty session file"` when the file holds no non-empty content.
///
/// A read error *after* some content was seen (e.g. a truncated zstd tail) is
/// downgraded to a warning: scanning stops and the items parsed so far are
/// preserved.
fn load_rollout_items_sync(
    path: &Path,
) -> std::io::Result<(Vec<RolloutItem>, Option<ThreadId>, usize)> {
    let mut reader = RolloutLineReader::open(path)?;
    let mut items = Vec::new();
    let mut thread_id = None;
    let mut parse_errors = 0usize;
    // Gates both the "empty session file" error and the lenient
    // read-error recovery below.
    let mut saw_content = false;
    loop {
        let line = match reader.next_line() {
            Ok(Some(line)) => line,
            Ok(None) => break,
            Err(err) => {
                if !saw_content {
                    return Err(err);
                }
                warn!(
                    "stopping rollout resume after read error for {path:?}; preserving parsed items: {err}"
                );
                parse_errors = parse_errors.saturating_add(1);
                break;
            }
        };
        if line.trim().is_empty() {
            continue;
        }
        saw_content = true;
        // Two-stage parse so invalid JSON vs. valid-JSON-with-unknown-shape
        // produce distinct log messages.
        let value: Value = match serde_json::from_str(line.as_str()) {
            Ok(value) => value,
            Err(err) => {
                warn!("failed to parse line as JSON: {line:?}, error: {err}");
                parse_errors = parse_errors.saturating_add(1);
                continue;
            }
        };
        // `value` is not needed afterwards, so move it instead of cloning
        // (the previous `value.clone()` allocated a copy per line for nothing).
        match serde_json::from_value::<RolloutLine>(value) {
            Ok(rollout_line) => match rollout_line.item {
                RolloutItem::SessionMeta(session_meta_line) => {
                    // The first SessionMeta is canonical for the thread id;
                    // every line is still kept as an item.
                    if thread_id.is_none() {
                        thread_id = Some(session_meta_line.meta.id);
                    }
                    items.push(RolloutItem::SessionMeta(session_meta_line));
                }
                RolloutItem::ResponseItem(item) => {
                    items.push(RolloutItem::ResponseItem(item));
                }
                RolloutItem::Compacted(item) => {
                    items.push(RolloutItem::Compacted(item));
                }
                RolloutItem::TurnContext(item) => {
                    items.push(RolloutItem::TurnContext(item));
                }
                RolloutItem::EventMsg(event) => {
                    items.push(RolloutItem::EventMsg(event));
                }
            },
            Err(err) => {
                trace!("failed to parse rollout line: {err}");
                parse_errors = parse_errors.saturating_add(1);
            }
        }
    }
    if !saw_content {
        return Err(IoError::other("empty session file"));
    }
    tracing::debug!(
        "Resumed rollout with {} items, thread ID: {:?}, parse errors: {}",
        items.len(),
        thread_id,
        parse_errors,
    );
    Ok((items, thread_id, parse_errors))
}
fn truncate_fs_page(
mut page: ThreadsPage,
page_size: usize,
@@ -680,7 +703,10 @@ fn precompute_log_file_info(
.format(format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let filename = format!("rollout-{date_str}-{conversation_id}.jsonl");
let filename = format!(
"rollout-{date_str}-{conversation_id}{}",
preferred_rollout_file_suffix()
);
let path = dir.join(filename);
@@ -690,24 +716,9 @@ fn precompute_log_file_info(
timestamp,
})
}
fn open_log_file(path: &Path) -> std::io::Result<File> {
let Some(parent) = path.parent() else {
return Err(IoError::other(format!(
"rollout path has no parent: {}",
path.display()
)));
};
fs::create_dir_all(parent)?;
std::fs::OpenOptions::new()
.append(true)
.create(true)
.open(path)
}
#[allow(clippy::too_many_arguments)]
async fn rollout_writer(
file: Option<tokio::fs::File>,
mut writer: Option<RolloutFileWriter>,
mut deferred_log_file_info: Option<LogFileInfo>,
mut rx: mpsc::Receiver<RolloutCmd>,
mut meta: Option<SessionMeta>,
@@ -718,7 +729,6 @@ async fn rollout_writer(
default_provider: String,
generate_memories: bool,
) -> std::io::Result<()> {
let mut writer = file.map(|file| JsonlWriter { file });
let mut buffered_items = Vec::<RolloutItem>::new();
if let Some(builder) = state_builder.as_mut() {
builder.rollout_path = rollout_path.clone();
@@ -773,10 +783,7 @@ async fn rollout_writer(
"deferred rollout recorder missing log file metadata",
));
};
let file = open_log_file(log_file_info.path.as_path())?;
writer = Some(JsonlWriter {
file: tokio::fs::File::from_std(file),
});
writer = Some(RolloutFileWriter::open(log_file_info.path).await?);
if let Some(session_meta) = meta.take() {
write_session_meta(
@@ -817,27 +824,31 @@ async fn rollout_writer(
let _ = ack.send(());
}
RolloutCmd::Flush { ack } => {
// Deferred fresh threads may not have an initialized file yet.
if let Some(writer) = writer.as_mut()
&& let Err(e) = writer.file.flush().await
{
let _ = ack.send(());
return Err(e);
}
let _ = ack.send(());
}
RolloutCmd::Shutdown { ack } => {
if let Some(writer) = writer.as_mut()
&& let Err(err) = writer.finish().await
{
let _ = ack.send(());
return Err(err);
}
let _ = ack.send(());
return Ok(());
}
}
}
if let Some(writer) = writer.as_mut() {
writer.finish().await?;
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn write_session_meta(
mut writer: Option<&mut JsonlWriter>,
mut writer: Option<&mut RolloutFileWriter>,
session_meta: SessionMeta,
cwd: &Path,
rollout_path: &Path,
@@ -876,7 +887,7 @@ async fn write_session_meta(
}
async fn write_and_reconcile_items(
mut writer: Option<&mut JsonlWriter>,
mut writer: Option<&mut RolloutFileWriter>,
items: &[RolloutItem],
rollout_path: &Path,
state_db_ctx: Option<&StateRuntime>,
@@ -884,9 +895,7 @@ async fn write_and_reconcile_items(
default_provider: &str,
) -> std::io::Result<()> {
if let Some(writer) = writer.as_mut() {
for item in items {
writer.write_rollout_item(item).await?;
}
writer.write_rollout_items(items).await?;
}
sync_thread_state_after_write(
state_db_ctx,
@@ -949,8 +958,12 @@ async fn sync_thread_state_after_write(
.await;
}
struct JsonlWriter {
file: tokio::fs::File,
struct RolloutFileWriter {
inner: Option<BlockingRolloutFileWriter>,
}
struct BlockingRolloutFileWriter {
writer: RolloutAppendWriter,
}
#[derive(serde::Serialize)]
@@ -960,28 +973,68 @@ struct RolloutLineRef<'a> {
item: &'a RolloutItem,
}
impl JsonlWriter {
impl RolloutFileWriter {
async fn open(path: PathBuf) -> std::io::Result<Self> {
tokio::task::spawn_blocking(move || {
let writer = RolloutAppendWriter::open(path.as_path())?;
Ok(Self {
inner: Some(BlockingRolloutFileWriter { writer }),
})
})
.await
.map_err(|err| IoError::other(format!("failed to open rollout writer: {err}")))?
}
async fn write_rollout_item(&mut self, rollout_item: &RolloutItem) -> std::io::Result<()> {
self.write_rollout_items(std::slice::from_ref(rollout_item))
.await
}
async fn write_rollout_items(&mut self, rollout_items: &[RolloutItem]) -> std::io::Result<()> {
if rollout_items.is_empty() {
return Ok(());
}
let timestamp_format: &[FormatItem] = format_description!(
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
);
let timestamp = OffsetDateTime::now_utc()
.format(timestamp_format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let line = RolloutLineRef {
timestamp,
item: rollout_item,
let mut lines = Vec::with_capacity(rollout_items.len());
for rollout_item in rollout_items {
let timestamp = OffsetDateTime::now_utc()
.format(timestamp_format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let line = RolloutLineRef {
timestamp,
item: rollout_item,
};
let mut json = serde_json::to_string(&line)?;
json.push('\n');
lines.push(json);
}
let Some(writer) = self.inner.take() else {
return Err(IoError::other("rollout writer missing inner state"));
};
self.write_line(&line).await
}
async fn write_line(&mut self, item: &impl serde::Serialize) -> std::io::Result<()> {
let mut json = serde_json::to_string(item)?;
json.push('\n');
self.file.write_all(json.as_bytes()).await?;
self.file.flush().await?;
let writer = tokio::task::spawn_blocking(move || {
let mut writer = writer;
for line in lines {
writer.writer.append_text(line.as_str())?;
}
Ok::<BlockingRolloutFileWriter, IoError>(writer)
})
.await
.map_err(|err| IoError::other(format!("failed to write rollout line: {err}")))??;
self.inner = Some(writer);
Ok(())
}
async fn finish(&mut self) -> std::io::Result<()> {
let Some(writer) = self.inner.take() else {
return Ok(());
};
tokio::task::spawn_blocking(move || writer.writer.finish())
.await
.map_err(|err| IoError::other(format!("failed to finish rollout writer: {err}")))?
}
}
impl From<codex_state::ThreadsPage> for ThreadsPage {
@@ -1050,20 +1103,12 @@ async fn resume_candidate_matches_cwd(
cwd: &Path,
default_provider: &str,
) -> bool {
if cached_cwd.is_some_and(|session_cwd| cwd_matches(session_cwd, cwd)) {
return true;
if let Ok(Some(turn_context)) = read_latest_turn_context(rollout_path).await {
return cwd_matches(turn_context.cwd.as_path(), cwd);
}
if let Ok((items, _, _)) = RolloutRecorder::load_rollout_items(rollout_path).await
&& let Some(latest_turn_context_cwd) = items.iter().rev().find_map(|item| match item {
RolloutItem::TurnContext(turn_context) => Some(turn_context.cwd.as_path()),
RolloutItem::SessionMeta(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::EventMsg(_) => None,
})
{
return cwd_matches(latest_turn_context_cwd, cwd);
if cached_cwd.is_some_and(|session_cwd| cwd_matches(session_cwd, cwd)) {
return true;
}
metadata::extract_metadata_from_rollout(rollout_path, default_provider)

View File

@@ -2,6 +2,8 @@
use super::*;
use crate::config::RolloutConfig;
use crate::file_io::RolloutAppendWriter;
use crate::file_io::read_rollout_text;
use chrono::TimeZone;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::protocol::AgentMessageEvent;
@@ -124,7 +126,7 @@ async fn recorder_materializes_only_after_explicit_persist() -> std::io::Result<
recorder.persist().await?;
assert!(rollout_path.exists(), "rollout file should be materialized");
let text = std::fs::read_to_string(&rollout_path)?;
let text = read_rollout_text(rollout_path.as_path())?;
assert!(
text.contains("\"type\":\"session_meta\""),
"expected session metadata in rollout"
@@ -139,7 +141,7 @@ async fn recorder_materializes_only_after_explicit_persist() -> std::io::Result<
buffered_idx < user_idx,
"buffered items should preserve ordering"
);
let text_after_second_persist = std::fs::read_to_string(&rollout_path)?;
let text_after_second_persist = read_rollout_text(rollout_path.as_path())?;
assert_eq!(text_after_second_persist, text);
recorder.shutdown().await?;
@@ -483,3 +485,68 @@ async fn resume_candidate_matches_cwd_reads_latest_turn_context() -> std::io::Re
);
Ok(())
}
// Regression test: a truncated compressed tail (e.g. a crash mid-write) must
// not lose the records that were fully written before the corruption point.
#[tokio::test]
async fn load_rollout_items_preserves_parsed_items_when_zstd_tail_is_truncated()
-> std::io::Result<()> {
    let temp_dir = TempDir::new()?;
    let thread_id = ThreadId::new();
    let rollout_path = temp_dir
        .path()
        .join(format!("rollout-2026-01-01T00-00-00-{thread_id}.jsonl.zst"));
    // Hand-built rollout lines: one session_meta record and one user event.
    let session_meta = serde_json::json!({
        "timestamp": "2026-01-01T00:00:00Z",
        "type": "session_meta",
        "payload": {
            "id": thread_id,
            "timestamp": "2026-01-01T00:00:00Z",
            "cwd": ".",
            "originator": "test_originator",
            "cli_version": "test_version",
            "source": "cli",
            "model_provider": "test-provider",
        },
    });
    let user_event = serde_json::json!({
        "timestamp": "2026-01-01T00:00:01Z",
        "type": "event_msg",
        "payload": {
            "type": "user_message",
            "message": "hello",
            "kind": "plain",
        },
    });
    let mut writer = RolloutAppendWriter::open(&rollout_path)?;
    writer.append_text(&format!("{session_meta}\n"))?;
    writer.append_text(&format!("{user_event}\n"))?;
    // Drop (rather than finish) the writer, then chop one byte off the end of
    // the compressed file to simulate a torn tail.
    drop(writer);
    let file = File::options().write(true).open(&rollout_path)?;
    let truncated_len = file.metadata()?.len().saturating_sub(1);
    file.set_len(truncated_len)?;
    let (items, parsed_thread_id, parse_errors) =
        RolloutRecorder::load_rollout_items(&rollout_path).await?;
    // Only the first (intact) record survives; the torn trailing record is
    // dropped rather than failing the whole load.
    assert_eq!(parsed_thread_id, Some(thread_id));
    assert_eq!(items.len(), 1);
    let Some(RolloutItem::SessionMeta(session_meta_line)) = items.first() else {
        panic!("expected recovered session meta");
    };
    assert_eq!(session_meta_line.meta.id, thread_id);
    assert_eq!(session_meta_line.meta.timestamp, "2026-01-01T00:00:00Z");
    assert_eq!(session_meta_line.meta.cwd, PathBuf::from("."));
    assert_eq!(session_meta_line.meta.originator, "test_originator");
    assert_eq!(session_meta_line.meta.cli_version, "test_version");
    assert_eq!(session_meta_line.meta.source, SessionSource::Cli);
    assert_eq!(
        session_meta_line.meta.model_provider.as_deref(),
        Some("test-provider")
    );
    assert!(session_meta_line.git.is_none());
    // NOTE(review): expects 0 — presumably the truncated tail surfaces as EOF
    // rather than a counted read/parse error; confirm against RolloutLineReader.
    assert_eq!(parse_errors, 0);
    Ok(())
}

View File

@@ -19,6 +19,7 @@ use time::macros::format_description;
use uuid::Uuid;
use crate::INTERACTIVE_SESSION_SOURCES;
use crate::file_io::read_rollout_text;
use crate::find_thread_path_by_id_str;
use crate::list::Cursor;
use crate::list::ThreadItem;
@@ -26,6 +27,7 @@ use crate::list::ThreadSortKey;
use crate::list::ThreadsPage;
use crate::list::get_threads;
use crate::list::read_head_for_summary;
use crate::list::read_latest_turn_context;
use crate::rollout_date_parts;
use anyhow::Result;
use codex_protocol::ThreadId;
@@ -37,6 +39,7 @@ use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::TurnContextItem;
use codex_protocol::protocol::UserMessageEvent;
const NO_SOURCE_FILTER: &[SessionSource] = &[];
@@ -89,6 +92,47 @@ async fn insert_state_db_thread(
.expect("state db upsert should succeed");
}
// Ensures the latest-TurnContext lookup transparently decodes `.jsonl.zst`
// rollouts and returns the LAST turn context in the file, not the first.
#[tokio::test]
async fn read_latest_turn_context_reads_compressed_rollout() -> Result<()> {
    let temp = TempDir::new()?;
    let path = temp.path().join("rollout.jsonl.zst");
    // Write two TurnContext lines through a raw zstd encoder (compression
    // level 0) so the test does not depend on the production writer.
    let mut encoder = zstd::stream::write::Encoder::new(File::create(&path)?, 0)?;
    for (idx, cwd) in ["/tmp/first", "/tmp/second"].into_iter().enumerate() {
        let line = RolloutLine {
            timestamp: format!("t{idx}"),
            item: RolloutItem::TurnContext(TurnContextItem {
                turn_id: Some(format!("turn-{idx}")),
                trace_id: None,
                cwd: cwd.into(),
                current_date: None,
                timezone: None,
                approval_policy: codex_protocol::protocol::AskForApproval::OnRequest,
                sandbox_policy: codex_protocol::protocol::SandboxPolicy::new_read_only_policy(),
                network: None,
                model: format!("model-{idx}"),
                personality: None,
                collaboration_mode: None,
                realtime_active: None,
                effort: None,
                summary: codex_protocol::config_types::ReasoningSummary::Auto,
                user_instructions: None,
                developer_instructions: None,
                final_output_json_schema: None,
                truncation_policy: None,
            }),
        };
        writeln!(encoder, "{}", serde_json::to_string(&line)?)?;
    }
    // finish() flushes the final zstd frame; the file is not decodable without it.
    let _ = encoder.finish()?;
    let turn_context = read_latest_turn_context(path.as_path())
        .await?
        .expect("latest turn context");
    // The second (most recent) entry wins.
    assert_eq!(turn_context.cwd, Path::new("/tmp/second"));
    assert_eq!(turn_context.model, "model-1");
    Ok(())
}
// TODO(jif) fix
// #[tokio::test]
// async fn list_threads_prefers_state_db_when_available() {
@@ -918,7 +962,7 @@ async fn test_get_thread_contents() {
.unwrap();
let path = &page.items[0].path;
let content = tokio::fs::read_to_string(path).await.unwrap();
let content = read_rollout_text(path).unwrap();
// Page equality (single item)
let expected_path = home

View File

@@ -148,3 +148,4 @@ rand = { workspace = true }
serial_test = { workspace = true }
vt100 = { workspace = true }
uuid = { workspace = true }
zstd = { workspace = true }

View File

@@ -37,6 +37,7 @@ use codex_core::config_loader::format_config_error_with_source;
use codex_core::default_client::set_default_client_residency_requirement;
use codex_core::format_exec_policy_error_with_source;
use codex_core::path_utils;
use codex_core::read_latest_turn_context;
use codex_core::read_session_meta_line;
use codex_core::state_db::get_state_db;
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
@@ -45,9 +46,7 @@ use codex_protocol::config_types::AltScreenMode;
use codex_protocol::config_types::SandboxMode;
use codex_protocol::config_types::WindowsSandboxLevel;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::TurnContextItem;
use codex_state::log_db;
use codex_terminal_detection::Multiplexer;
use codex_terminal_detection::terminal_info;
@@ -1381,12 +1380,13 @@ pub(crate) async fn read_session_cwd(
// Prefer the latest TurnContext cwd so resume/fork reflects the most recent
// session directory (for the changed-cwd prompt) when DB data is unavailable.
// When the rollout cannot be read, fall back to session metadata.
// The alternative would be mutating the SessionMeta line when the session cwd
// changes, but the rollout is an append-only JSONL log and rewriting the head
// would be error-prone.
let path = path?;
if let Some(cwd) = read_latest_turn_context(path).await.map(|item| item.cwd) {
return Some(cwd);
if let Ok(Some(turn_context)) = read_latest_turn_context(path).await {
return Some(turn_context.cwd);
}
match read_session_meta_line(path).await {
Ok(meta_line) => Some(meta_line.meta.cwd),
@@ -1415,26 +1415,11 @@ pub(crate) async fn read_session_model(
}
let path = path?;
read_latest_turn_context(path).await.map(|item| item.model)
}
async fn read_latest_turn_context(path: &Path) -> Option<TurnContextItem> {
let text = tokio::fs::read_to_string(path).await.ok()?;
for line in text.lines().rev() {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let Ok(rollout_line) = serde_json::from_str::<RolloutLine>(trimmed) else {
continue;
};
if let RolloutItem::TurnContext(item) = rollout_line.item {
return Some(item);
}
if let Ok(Some(turn_context)) = read_latest_turn_context(path).await {
return Some(turn_context.model);
}
None
}
pub(crate) fn cwds_differ(current_cwd: &Path, session_cwd: &Path) -> bool {
match (
path_utils::normalize_for_path_comparison(current_cwd),
@@ -2044,6 +2029,45 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn read_session_cwd_prefers_latest_turn_context_from_compressed_rollout()
-> std::io::Result<()> {
    // Set up a temp config plus two candidate session directories.
    let temp_dir = TempDir::new()?;
    let config = build_config(&temp_dir).await?;
    let first_cwd = temp_dir.path().join("first");
    let second_cwd = temp_dir.path().join("second");
    std::fs::create_dir_all(&first_cwd)?;
    std::fs::create_dir_all(&second_cwd)?;

    // Write a zstd-compressed rollout holding two TurnContext lines; the
    // later one ("second") is the cwd that read_session_cwd must prefer.
    let rollout_path = temp_dir.path().join("rollout.jsonl.zst");
    let file = std::fs::File::create(&rollout_path)?;
    let mut encoder = zstd::stream::write::Encoder::new(file, 0)?;
    use std::io::Write;
    let rollout_lines = [
        RolloutLine {
            timestamp: "t0".to_string(),
            item: RolloutItem::TurnContext(build_turn_context(&config, first_cwd)),
        },
        RolloutLine {
            timestamp: "t1".to_string(),
            item: RolloutItem::TurnContext(build_turn_context(&config, second_cwd.clone())),
        },
    ];
    for rollout_line in &rollout_lines {
        let json = serde_json::to_string(rollout_line).expect("serialize rollout");
        writeln!(encoder, "{json}")?;
    }
    let _ = encoder.finish()?;

    // The latest TurnContext cwd must be returned, even from a .zst rollout.
    let cwd = read_session_cwd(&config, ThreadId::new(), Some(&rollout_path))
        .await
        .expect("expected cwd");
    assert_eq!(cwd, second_cwd);
    Ok(())
}
#[tokio::test]
async fn should_prompt_when_meta_matches_current_but_latest_turn_differs() -> std::io::Result<()>
{