mirror of
https://github.com/openai/codex.git
synced 2026-05-09 22:02:32 +00:00
Compare commits
7 Commits
pr20460
...
jif/compre
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
538f338b42 | ||
|
|
ceec4cd64e | ||
|
|
dd82f28ea2 | ||
|
|
793dc92ae0 | ||
|
|
83eb0ed8b2 | ||
|
|
74bd00fc43 | ||
|
|
8e6c8d9de9 |
2
codex-rs/Cargo.lock
generated
2
codex-rs/Cargo.lock
generated
@@ -2478,6 +2478,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tracing",
|
||||
"uuid",
|
||||
"zstd",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2712,6 +2713,7 @@ dependencies = [
|
||||
"which 8.0.0",
|
||||
"windows-sys 0.52.0",
|
||||
"winsplit",
|
||||
"zstd",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -232,6 +232,7 @@ use codex_core::plugins::PluginUninstallError as CorePluginUninstallError;
|
||||
use codex_core::plugins::load_plugin_apps;
|
||||
use codex_core::plugins::load_plugin_mcp_servers;
|
||||
use codex_core::read_head_for_summary;
|
||||
use codex_core::read_latest_turn_context;
|
||||
use codex_core::read_session_meta_line;
|
||||
use codex_core::rollout_date_parts;
|
||||
use codex_core::sandboxing::SandboxPermissions;
|
||||
@@ -2972,7 +2973,6 @@ impl CodexMessageProcessor {
|
||||
});
|
||||
};
|
||||
|
||||
let required_suffix = format!("{thread_id}.jsonl");
|
||||
let Some(file_name) = canonical_rollout_path.file_name().map(OsStr::to_owned) else {
|
||||
return Err(JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
@@ -2980,9 +2980,7 @@ impl CodexMessageProcessor {
|
||||
data: None,
|
||||
});
|
||||
};
|
||||
if !file_name
|
||||
.to_string_lossy()
|
||||
.ends_with(required_suffix.as_str())
|
||||
if !rollout_file_name_matches_thread_id(file_name.to_string_lossy().as_ref(), thread_id)
|
||||
{
|
||||
return Err(JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
@@ -5324,7 +5322,6 @@ impl CodexMessageProcessor {
|
||||
};
|
||||
|
||||
// Verify file name matches thread id.
|
||||
let required_suffix = format!("{thread_id}.jsonl");
|
||||
let Some(file_name) = canonical_rollout_path.file_name().map(OsStr::to_owned) else {
|
||||
return Err(JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
@@ -5335,10 +5332,7 @@ impl CodexMessageProcessor {
|
||||
data: None,
|
||||
});
|
||||
};
|
||||
if !file_name
|
||||
.to_string_lossy()
|
||||
.ends_with(required_suffix.as_str())
|
||||
{
|
||||
if !rollout_file_name_matches_thread_id(file_name.to_string_lossy().as_ref(), thread_id) {
|
||||
return Err(JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!(
|
||||
@@ -8184,6 +8178,10 @@ async fn read_history_cwd_from_state_db(
|
||||
thread_id: Option<ThreadId>,
|
||||
rollout_path: &Path,
|
||||
) -> Option<PathBuf> {
|
||||
if let Ok(Some(turn_context)) = read_latest_turn_context(rollout_path).await {
|
||||
return Some(turn_context.cwd);
|
||||
}
|
||||
|
||||
if let Some(state_db_ctx) = get_state_db(config).await
|
||||
&& let Some(thread_id) = thread_id
|
||||
&& let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await
|
||||
@@ -8274,7 +8272,7 @@ async fn summary_from_thread_list_item(
|
||||
|
||||
fn thread_id_from_rollout_path(path: &Path) -> Option<ThreadId> {
|
||||
let file_name = path.file_name()?.to_str()?;
|
||||
let stem = file_name.strip_suffix(".jsonl")?;
|
||||
let stem = codex_core::strip_rollout_file_suffix(file_name)?;
|
||||
if stem.len() < 37 {
|
||||
return None;
|
||||
}
|
||||
@@ -8285,6 +8283,13 @@ fn thread_id_from_rollout_path(path: &Path) -> Option<ThreadId> {
|
||||
ThreadId::from_string(&stem[uuid_start..]).ok()
|
||||
}
|
||||
|
||||
fn rollout_file_name_matches_thread_id(file_name: &str, thread_id: ThreadId) -> bool {
|
||||
let Some(stem) = codex_core::strip_rollout_file_suffix(file_name) else {
|
||||
return false;
|
||||
};
|
||||
stem.ends_with(&format!("-{thread_id}"))
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn summary_from_state_db_metadata(
|
||||
conversation_id: ThreadId,
|
||||
@@ -8412,6 +8417,12 @@ pub(crate) async fn read_summary_from_rollout(
|
||||
.unwrap_or_else(|| fallback_provider.to_string());
|
||||
let git_info = git.as_ref().map(map_git_info);
|
||||
let updated_at = updated_at.or_else(|| timestamp.clone());
|
||||
let resume_cwd = read_latest_turn_context(path)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|turn_context| turn_context.cwd)
|
||||
.unwrap_or_else(|| session_meta.cwd.clone());
|
||||
|
||||
Ok(ConversationSummary {
|
||||
conversation_id: session_meta.id,
|
||||
@@ -8420,7 +8431,7 @@ pub(crate) async fn read_summary_from_rollout(
|
||||
path: path.to_path_buf(),
|
||||
preview: String::new(),
|
||||
model_provider,
|
||||
cwd: session_meta.cwd,
|
||||
cwd: resume_cwd,
|
||||
cli_version: session_meta.cli_version,
|
||||
source: session_meta.source,
|
||||
git_info,
|
||||
|
||||
@@ -172,16 +172,21 @@ pub use rollout::find_conversation_path_by_id_str;
|
||||
pub use rollout::find_thread_name_by_id;
|
||||
pub use rollout::find_thread_path_by_id_str;
|
||||
pub use rollout::find_thread_path_by_name_str;
|
||||
pub use rollout::is_rollout_path;
|
||||
pub use rollout::list::Cursor;
|
||||
pub use rollout::list::ThreadItem;
|
||||
pub use rollout::list::ThreadSortKey;
|
||||
pub use rollout::list::ThreadsPage;
|
||||
pub use rollout::list::parse_cursor;
|
||||
pub use rollout::list::read_head_for_summary;
|
||||
pub use rollout::list::read_latest_turn_context;
|
||||
pub use rollout::list::read_session_meta_line;
|
||||
pub use rollout::policy::EventPersistenceMode;
|
||||
pub use rollout::read_nonempty_rollout_text;
|
||||
pub use rollout::read_rollout_text;
|
||||
pub use rollout::rollout_date_parts;
|
||||
pub use rollout::session_index::find_thread_names_by_ids;
|
||||
pub use rollout::strip_rollout_file_suffix;
|
||||
mod function_tool;
|
||||
mod state;
|
||||
mod tasks;
|
||||
|
||||
@@ -12,7 +12,11 @@ pub use codex_rollout::find_conversation_path_by_id_str;
|
||||
pub use codex_rollout::find_thread_name_by_id;
|
||||
pub use codex_rollout::find_thread_path_by_id_str;
|
||||
pub use codex_rollout::find_thread_path_by_name_str;
|
||||
pub use codex_rollout::is_rollout_path;
|
||||
pub use codex_rollout::read_nonempty_rollout_text;
|
||||
pub use codex_rollout::read_rollout_text;
|
||||
pub use codex_rollout::rollout_date_parts;
|
||||
pub use codex_rollout::strip_rollout_file_suffix;
|
||||
|
||||
impl codex_rollout::RolloutConfigView for Config {
|
||||
fn codex_home(&self) -> &std::path::Path {
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use assert_cmd::Command as AssertCommand;
|
||||
use codex_core::auth::CODEX_API_KEY_ENV_VAR;
|
||||
use codex_core::is_rollout_path;
|
||||
use codex_core::read_rollout_text;
|
||||
use codex_git_utils::collect_git_info;
|
||||
use codex_protocol::protocol::GitInfo;
|
||||
use codex_utils_cargo_bin::find_resource;
|
||||
@@ -375,10 +377,10 @@ async fn integration_creates_and_checks_session_file() -> anyhow::Result<()> {
|
||||
// Find the session file that contains `marker`.
|
||||
let marker_clone = marker.clone();
|
||||
let path = fs_wait::wait_for_matching_file(&sessions_dir, Duration::from_secs(10), move |p| {
|
||||
if p.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
|
||||
if !is_rollout_path(p) {
|
||||
return false;
|
||||
}
|
||||
let Ok(content) = std::fs::read_to_string(p) else {
|
||||
let Ok(content) = read_rollout_text(p) else {
|
||||
return false;
|
||||
};
|
||||
content.contains(&marker_clone)
|
||||
@@ -422,7 +424,7 @@ async fn integration_creates_and_checks_session_file() -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
let content =
|
||||
std::fs::read_to_string(&path).unwrap_or_else(|_| panic!("Failed to read session file"));
|
||||
read_rollout_text(path.as_path()).unwrap_or_else(|_| panic!("Failed to read session file"));
|
||||
let mut lines = content.lines();
|
||||
let meta_line = lines
|
||||
.next()
|
||||
@@ -491,10 +493,10 @@ async fn integration_creates_and_checks_session_file() -> anyhow::Result<()> {
|
||||
let marker2_clone = marker2.clone();
|
||||
let resumed_path =
|
||||
fs_wait::wait_for_matching_file(&sessions_dir, Duration::from_secs(10), move |p| {
|
||||
if p.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
|
||||
if !is_rollout_path(p) {
|
||||
return false;
|
||||
}
|
||||
std::fs::read_to_string(p)
|
||||
read_rollout_text(p)
|
||||
.map(|content| content.contains(&marker2_clone))
|
||||
.unwrap_or(false)
|
||||
})
|
||||
@@ -506,7 +508,7 @@ async fn integration_creates_and_checks_session_file() -> anyhow::Result<()> {
|
||||
"resume should create a new session file"
|
||||
);
|
||||
|
||||
let resumed_content = std::fs::read_to_string(&resumed_path)?;
|
||||
let resumed_content = read_rollout_text(resumed_path.as_path())?;
|
||||
assert!(
|
||||
resumed_content.contains(&marker),
|
||||
"resumed file missing original marker"
|
||||
|
||||
@@ -5,6 +5,7 @@ use codex_core::built_in_model_providers;
|
||||
use codex_core::compact::SUMMARIZATION_PROMPT;
|
||||
use codex_core::compact::SUMMARY_PREFIX;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::read_rollout_text;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::openai_models::ModelInfo;
|
||||
@@ -368,7 +369,7 @@ async fn summarize_context_three_requests_and_instructions() {
|
||||
|
||||
// Verify rollout contains user-turn TurnContext entries and a Compacted entry.
|
||||
println!("rollout path: {}", rollout_path.display());
|
||||
let text = std::fs::read_to_string(&rollout_path).unwrap_or_else(|e| {
|
||||
let text = read_rollout_text(rollout_path.as_path()).unwrap_or_else(|e| {
|
||||
panic!(
|
||||
"failed to read rollout file {}: {e}",
|
||||
rollout_path.display()
|
||||
@@ -2069,7 +2070,7 @@ async fn auto_compact_persists_rollout_entries() {
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
|
||||
|
||||
let rollout_path = session_configured.rollout_path.expect("rollout path");
|
||||
let text = std::fs::read_to_string(&rollout_path).unwrap_or_else(|e| {
|
||||
let text = read_rollout_text(rollout_path.as_path()).unwrap_or_else(|e| {
|
||||
panic!(
|
||||
"failed to read rollout file {}: {e}",
|
||||
rollout_path.display()
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::path::PathBuf;
|
||||
use anyhow::Result;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::compact::SUMMARY_PREFIX;
|
||||
use codex_core::read_rollout_text;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
@@ -1160,7 +1161,7 @@ async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()>
|
||||
assert_eq!(responses_mock.requests().len(), 1);
|
||||
assert_eq!(compact_mock.requests().len(), 1);
|
||||
|
||||
let rollout_text = fs::read_to_string(&rollout_path)?;
|
||||
let rollout_text = read_rollout_text(rollout_path.as_path())?;
|
||||
let mut saw_compacted_history = false;
|
||||
for line in rollout_text
|
||||
.lines()
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use codex_core::ForkSnapshot;
|
||||
use codex_core::NewThread;
|
||||
use codex_core::parse_turn_item;
|
||||
use codex_core::read_rollout_text;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::Op;
|
||||
@@ -66,7 +67,7 @@ async fn fork_thread_twice_drops_to_first_message() {
|
||||
|
||||
// Helper: read rollout items (excluding SessionMeta) from a JSONL path.
|
||||
let read_items = |p: &std::path::Path| -> Vec<RolloutItem> {
|
||||
let text = std::fs::read_to_string(p).expect("read rollout file");
|
||||
let text = read_rollout_text(p).expect("read rollout file");
|
||||
let mut items: Vec<RolloutItem> = Vec::new();
|
||||
for line in text.lines() {
|
||||
if line.trim().is_empty() {
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::path::Path;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use codex_core::read_rollout_text;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::items::parse_hook_prompt_fragment;
|
||||
use codex_protocol::models::ContentItem;
|
||||
@@ -558,7 +559,7 @@ async fn stop_hook_can_block_multiple_times_in_same_turn() -> Result<()> {
|
||||
);
|
||||
|
||||
let rollout_path = test.codex.rollout_path().expect("rollout path");
|
||||
let rollout_text = fs::read_to_string(&rollout_path)?;
|
||||
let rollout_text = read_rollout_text(rollout_path.as_path())?;
|
||||
let hook_prompt_texts = rollout_hook_prompt_texts(&rollout_text)?;
|
||||
assert!(
|
||||
hook_prompt_texts.contains(&FIRST_CONTINUATION_PROMPT.to_string()),
|
||||
@@ -746,7 +747,7 @@ async fn multiple_blocking_stop_hooks_persist_multiple_hook_prompt_fragments() -
|
||||
);
|
||||
|
||||
let rollout_path = test.codex.rollout_path().expect("rollout path");
|
||||
let rollout_text = fs::read_to_string(&rollout_path)?;
|
||||
let rollout_text = read_rollout_text(rollout_path.as_path())?;
|
||||
assert_eq!(
|
||||
rollout_hook_prompt_texts(&rollout_text)?,
|
||||
vec![
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use anyhow::Context;
|
||||
use codex_core::read_nonempty_rollout_text;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
@@ -22,7 +23,6 @@ use image::ImageBuffer;
|
||||
use image::Rgba;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
fn find_user_message_with_image(text: &str) -> Option<ResponseItem> {
|
||||
for line in text.lines() {
|
||||
@@ -58,20 +58,6 @@ fn extract_image_url(item: &ResponseItem) -> Option<String> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_rollout_text(path: &Path) -> anyhow::Result<String> {
|
||||
for _ in 0..50 {
|
||||
if path.exists()
|
||||
&& let Ok(text) = std::fs::read_to_string(path)
|
||||
&& !text.trim().is_empty()
|
||||
{
|
||||
return Ok(text);
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(20)).await;
|
||||
}
|
||||
std::fs::read_to_string(path)
|
||||
.with_context(|| format!("read rollout file at {}", path.display()))
|
||||
}
|
||||
|
||||
fn write_test_png(path: &Path, color: [u8; 4]) -> anyhow::Result<()> {
|
||||
if let Some(parent) = path.parent() {
|
||||
std::fs::create_dir_all(parent)?;
|
||||
@@ -138,7 +124,9 @@ async fn copy_paste_local_image_persists_rollout_request_shape() -> anyhow::Resu
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::ShutdownComplete)).await;
|
||||
|
||||
let rollout_path = codex.rollout_path().expect("rollout path");
|
||||
let rollout_text = read_rollout_text(&rollout_path).await?;
|
||||
let rollout_text = read_nonempty_rollout_text(&rollout_path)
|
||||
.await
|
||||
.with_context(|| format!("read rollout file at {}", rollout_path.display()))?;
|
||||
let actual = find_user_message_with_image(&rollout_text)
|
||||
.expect("expected user message with input image in rollout");
|
||||
|
||||
@@ -222,7 +210,9 @@ async fn drag_drop_image_persists_rollout_request_shape() -> anyhow::Result<()>
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::ShutdownComplete)).await;
|
||||
|
||||
let rollout_path = codex.rollout_path().expect("rollout path");
|
||||
let rollout_text = read_rollout_text(&rollout_path).await?;
|
||||
let rollout_text = read_nonempty_rollout_text(&rollout_path)
|
||||
.await
|
||||
.with_context(|| format!("read rollout file at {}", rollout_path.display()))?;
|
||||
let actual = find_user_message_with_image(&rollout_text)
|
||||
.expect("expected user message with input image in rollout");
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use codex_core::config::Constrained;
|
||||
use codex_core::read_nonempty_rollout_text;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::COLLABORATION_MODE_CLOSE_TAG;
|
||||
use codex_protocol::protocol::COLLABORATION_MODE_OPEN_TAG;
|
||||
@@ -19,7 +20,6 @@ use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn collab_mode_with_instructions(instructions: Option<&str>) -> CollaborationMode {
|
||||
@@ -37,19 +37,6 @@ fn collab_xml(text: &str) -> String {
|
||||
format!("{COLLABORATION_MODE_OPEN_TAG}{text}{COLLABORATION_MODE_CLOSE_TAG}")
|
||||
}
|
||||
|
||||
async fn read_rollout_text(path: &Path) -> anyhow::Result<String> {
|
||||
for _ in 0..50 {
|
||||
if path.exists()
|
||||
&& let Ok(text) = std::fs::read_to_string(path)
|
||||
&& !text.trim().is_empty()
|
||||
{
|
||||
return Ok(text);
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(20)).await;
|
||||
}
|
||||
Ok(std::fs::read_to_string(path)?)
|
||||
}
|
||||
|
||||
fn rollout_developer_texts(text: &str) -> Vec<String> {
|
||||
let mut texts = Vec::new();
|
||||
for line in text.lines() {
|
||||
@@ -132,7 +119,7 @@ async fn override_turn_context_without_user_turn_does_not_record_permissions_upd
|
||||
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
|
||||
|
||||
let rollout_path = test.codex.rollout_path().expect("rollout path");
|
||||
let rollout_text = read_rollout_text(&rollout_path).await?;
|
||||
let rollout_text = read_nonempty_rollout_text(&rollout_path).await?;
|
||||
let developer_texts = rollout_developer_texts(&rollout_text);
|
||||
let approval_texts: Vec<&String> = developer_texts
|
||||
.iter()
|
||||
@@ -174,7 +161,7 @@ async fn override_turn_context_without_user_turn_does_not_record_environment_upd
|
||||
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
|
||||
|
||||
let rollout_path = test.codex.rollout_path().expect("rollout path");
|
||||
let rollout_text = read_rollout_text(&rollout_path).await?;
|
||||
let rollout_text = read_nonempty_rollout_text(&rollout_path).await?;
|
||||
let env_texts = rollout_environment_texts(&rollout_text);
|
||||
assert!(
|
||||
env_texts.is_empty(),
|
||||
@@ -213,7 +200,7 @@ async fn override_turn_context_without_user_turn_does_not_record_collaboration_u
|
||||
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
|
||||
|
||||
let rollout_path = test.codex.rollout_path().expect("rollout path");
|
||||
let rollout_text = read_rollout_text(&rollout_path).await?;
|
||||
let rollout_text = read_nonempty_rollout_text(&rollout_path).await?;
|
||||
let developer_texts = rollout_developer_texts(&rollout_text);
|
||||
let collab_text = collab_xml(collab_text);
|
||||
let collab_count = developer_texts
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use codex_core::CodexThread;
|
||||
use codex_core::REVIEW_PROMPT;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::read_rollout_text;
|
||||
use codex_core::review_format::render_review_output_text;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
@@ -120,7 +121,7 @@ async fn review_op_emits_lifecycle_and_review_output() {
|
||||
// Also verify that a user message with the header and a formatted finding
|
||||
// was recorded back in the parent session's rollout.
|
||||
let path = codex.rollout_path().expect("rollout path");
|
||||
let text = std::fs::read_to_string(&path).expect("read rollout file");
|
||||
let text = read_rollout_text(path.as_path()).expect("read rollout file");
|
||||
|
||||
let mut saw_header = false;
|
||||
let mut saw_finding_line = false;
|
||||
@@ -641,7 +642,7 @@ async fn review_input_isolated_from_parent_history() {
|
||||
|
||||
// Also verify that a user interruption note was recorded in the rollout.
|
||||
let path = codex.rollout_path().expect("rollout path");
|
||||
let text = std::fs::read_to_string(&path).expect("read rollout file");
|
||||
let text = read_rollout_text(path.as_path()).expect("read rollout file");
|
||||
let mut saw_interruption_message = false;
|
||||
for line in text.lines() {
|
||||
if line.trim().is_empty() {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use anyhow::Result;
|
||||
use codex_core::ThreadConfigSnapshot;
|
||||
use codex_core::config::AgentRoleConfig;
|
||||
use codex_core::read_rollout_text;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
@@ -227,8 +228,7 @@ async fn setup_turn_one_with_custom_spawned_child(
|
||||
.ok_or_else(|| anyhow::anyhow!("expected parent rollout path"))?;
|
||||
let deadline = Instant::now() + Duration::from_secs(6);
|
||||
loop {
|
||||
let has_notification = tokio::fs::read_to_string(&rollout_path)
|
||||
.await
|
||||
let has_notification = read_rollout_text(rollout_path.as_path())
|
||||
.is_ok_and(|rollout| rollout.contains("<subagent_notification>"));
|
||||
if has_notification {
|
||||
break;
|
||||
|
||||
@@ -65,6 +65,7 @@ use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_core::config_loader::format_config_error_with_source;
|
||||
use codex_core::format_exec_policy_error_with_source;
|
||||
use codex_core::path_utils;
|
||||
use codex_core::read_latest_turn_context;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_git_utils::get_git_repo_root;
|
||||
use codex_otel::set_parent_from_context;
|
||||
@@ -74,7 +75,6 @@ use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::ReviewRequest;
|
||||
use codex_protocol::protocol::ReviewTarget;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::SessionConfiguredEvent;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
@@ -1116,30 +1116,13 @@ fn all_thread_source_kinds() -> Vec<ThreadSourceKind> {
|
||||
|
||||
async fn latest_thread_cwd(thread: &AppServerThread) -> PathBuf {
|
||||
if let Some(path) = thread.path.as_deref()
|
||||
&& let Some(cwd) = parse_latest_turn_context_cwd(path).await
|
||||
&& let Ok(Some(turn_context)) = read_latest_turn_context(path).await
|
||||
{
|
||||
return cwd;
|
||||
return turn_context.cwd;
|
||||
}
|
||||
thread.cwd.clone()
|
||||
}
|
||||
|
||||
async fn parse_latest_turn_context_cwd(path: &Path) -> Option<PathBuf> {
|
||||
let text = tokio::fs::read_to_string(path).await.ok()?;
|
||||
for line in text.lines().rev() {
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let Ok(rollout_line) = serde_json::from_str::<RolloutLine>(trimmed) else {
|
||||
continue;
|
||||
};
|
||||
if let RolloutItem::TurnContext(item) = rollout_line.item {
|
||||
return Some(item.cwd);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn cwds_match(current_cwd: &Path, session_cwd: &Path) -> bool {
|
||||
match (
|
||||
path_utils::normalize_for_path_comparison(current_cwd),
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
#![cfg(not(target_os = "windows"))]
|
||||
#![allow(clippy::expect_used, clippy::unwrap_used)]
|
||||
|
||||
use codex_core::is_rollout_path;
|
||||
use codex_utils_cargo_bin::find_resource;
|
||||
use core_test_support::test_codex_exec::test_codex_exec;
|
||||
use walkdir::WalkDir;
|
||||
@@ -15,7 +16,7 @@ fn session_rollout_count(home_path: &std::path::Path) -> usize {
|
||||
.into_iter()
|
||||
.filter_map(Result::ok)
|
||||
.filter(|entry| entry.file_type().is_file())
|
||||
.filter(|entry| entry.file_name().to_string_lossy().ends_with(".jsonl"))
|
||||
.filter(|entry| is_rollout_path(entry.path()))
|
||||
.count()
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
#![allow(clippy::unwrap_used, clippy::expect_used)]
|
||||
use anyhow::Context;
|
||||
use codex_core::is_rollout_path;
|
||||
use codex_core::read_rollout_text;
|
||||
use codex_utils_cargo_bin::find_resource;
|
||||
use core_test_support::test_codex_exec::test_codex_exec;
|
||||
use pretty_assertions::assert_eq;
|
||||
@@ -23,11 +25,11 @@ fn find_session_file_containing_marker(
|
||||
if !entry.file_type().is_file() {
|
||||
continue;
|
||||
}
|
||||
if !entry.file_name().to_string_lossy().ends_with(".jsonl") {
|
||||
if !is_rollout_path(entry.path()) {
|
||||
continue;
|
||||
}
|
||||
let path = entry.path();
|
||||
let Ok(content) = std::fs::read_to_string(path) else {
|
||||
let Ok(content) = read_rollout_text(path) else {
|
||||
continue;
|
||||
};
|
||||
// Skip the first meta line and scan remaining JSONL entries.
|
||||
@@ -60,7 +62,7 @@ fn find_session_file_containing_marker(
|
||||
|
||||
/// Extract the conversation UUID from the first SessionMeta line in the rollout file.
|
||||
fn extract_conversation_id(path: &std::path::Path) -> String {
|
||||
let content = std::fs::read_to_string(path).unwrap();
|
||||
let content = read_rollout_text(path).unwrap();
|
||||
let mut lines = content.lines();
|
||||
let meta_line = lines.next().expect("missing meta line");
|
||||
let meta: Value = serde_json::from_str(meta_line).expect("invalid meta json");
|
||||
@@ -72,7 +74,7 @@ fn extract_conversation_id(path: &std::path::Path) -> String {
|
||||
}
|
||||
|
||||
fn last_user_image_count(path: &std::path::Path) -> usize {
|
||||
let content = std::fs::read_to_string(path).unwrap_or_default();
|
||||
let content = read_rollout_text(path).unwrap_or_default();
|
||||
let mut last_count = 0;
|
||||
for line in content.lines() {
|
||||
if line.trim().is_empty() {
|
||||
@@ -160,7 +162,7 @@ fn exec_resume_last_appends_to_existing_file() -> anyhow::Result<()> {
|
||||
resumed_path, path,
|
||||
"resume --last should append to existing file"
|
||||
);
|
||||
let content = std::fs::read_to_string(&resumed_path)?;
|
||||
let content = read_rollout_text(&resumed_path)?;
|
||||
assert!(content.contains(&marker));
|
||||
assert!(content.contains(&marker2));
|
||||
Ok(())
|
||||
@@ -214,7 +216,7 @@ fn exec_resume_last_accepts_prompt_after_flag_in_json_mode() -> anyhow::Result<(
|
||||
resumed_path, path,
|
||||
"resume --last should append to existing file"
|
||||
);
|
||||
let content = std::fs::read_to_string(&resumed_path)?;
|
||||
let content = read_rollout_text(&resumed_path)?;
|
||||
assert!(content.contains(&marker));
|
||||
assert!(content.contains(&marker2));
|
||||
Ok(())
|
||||
@@ -417,7 +419,7 @@ fn exec_resume_by_id_appends_to_existing_file() -> anyhow::Result<()> {
|
||||
resumed_path, path,
|
||||
"resume by id should append to existing file"
|
||||
);
|
||||
let content = std::fs::read_to_string(&resumed_path)?;
|
||||
let content = read_rollout_text(&resumed_path)?;
|
||||
assert!(content.contains(&marker));
|
||||
assert!(content.contains(&marker2));
|
||||
Ok(())
|
||||
@@ -493,7 +495,7 @@ fn exec_resume_preserves_cli_configuration_overrides() -> anyhow::Result<()> {
|
||||
.expect("no resumed session file containing marker2");
|
||||
assert_eq!(resumed_path, path, "resume should append to same file");
|
||||
|
||||
let content = std::fs::read_to_string(&resumed_path)?;
|
||||
let content = read_rollout_text(&resumed_path)?;
|
||||
assert!(content.contains(&marker));
|
||||
assert!(content.contains(&marker2));
|
||||
Ok(())
|
||||
|
||||
@@ -43,6 +43,7 @@ tokio = { workspace = true, features = [
|
||||
] }
|
||||
tracing = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
zstd = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = { workspace = true }
|
||||
|
||||
215
codex-rs/rollout/src/file_io.rs
Normal file
215
codex-rs/rollout/src/file_io.rs
Normal file
@@ -0,0 +1,215 @@
|
||||
use std::fs::File;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io;
|
||||
use std::io::BufRead;
|
||||
use std::io::BufReader;
|
||||
use std::io::ErrorKind;
|
||||
use std::io::Read;
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::time::sleep;
|
||||
|
||||
pub const ROLLOUT_FILE_SUFFIX: &str = ".jsonl";
|
||||
pub const COMPRESSED_ROLLOUT_FILE_SUFFIX: &str = ".jsonl.zst";
|
||||
|
||||
const DEFAULT_ZSTD_LEVEL: i32 = 0;
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub(crate) enum RolloutFileEncoding {
|
||||
PlainJsonl,
|
||||
ZstdJsonl,
|
||||
}
|
||||
|
||||
impl RolloutFileEncoding {
|
||||
pub(crate) fn for_path(path: &Path) -> Self {
|
||||
path.file_name()
|
||||
.and_then(|file_name| file_name.to_str())
|
||||
.and_then(file_encoding_from_name)
|
||||
.unwrap_or(Self::PlainJsonl)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the suffix used for newly created rollout files.
|
||||
pub fn preferred_rollout_file_suffix() -> &'static str {
|
||||
COMPRESSED_ROLLOUT_FILE_SUFFIX
|
||||
}
|
||||
|
||||
/// Returns true when `name` matches a rollout filename in either supported encoding.
|
||||
pub fn is_rollout_file_name(name: &str) -> bool {
|
||||
name.starts_with("rollout-") && strip_rollout_file_suffix(name).is_some()
|
||||
}
|
||||
|
||||
/// Returns true when `path` points to a rollout file in either supported encoding.
|
||||
pub fn is_rollout_path(path: &Path) -> bool {
|
||||
path.file_name()
|
||||
.and_then(|file_name| file_name.to_str())
|
||||
.is_some_and(is_rollout_file_name)
|
||||
}
|
||||
|
||||
/// Removes the rollout suffix from `name`, supporting both plain and compressed files.
|
||||
pub fn strip_rollout_file_suffix(name: &str) -> Option<&str> {
|
||||
name.strip_suffix(COMPRESSED_ROLLOUT_FILE_SUFFIX)
|
||||
.or_else(|| name.strip_suffix(ROLLOUT_FILE_SUFFIX))
|
||||
}
|
||||
|
||||
/// Reads the full rollout file contents, transparently handling plain and zstd-compressed files.
|
||||
pub fn read_rollout_text(path: &Path) -> io::Result<String> {
|
||||
let mut text = String::new();
|
||||
match RolloutFileEncoding::for_path(path) {
|
||||
RolloutFileEncoding::PlainJsonl => {
|
||||
File::open(path)?.read_to_string(&mut text)?;
|
||||
}
|
||||
RolloutFileEncoding::ZstdJsonl => {
|
||||
let file = File::open(path)?;
|
||||
let mut decoder = zstd::stream::read::Decoder::new(file)?;
|
||||
match decoder.read_to_string(&mut text) {
|
||||
Ok(_) => {}
|
||||
Err(err) if err.kind() == ErrorKind::UnexpectedEof && !text.is_empty() => {}
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(text)
|
||||
}
|
||||
|
||||
/// Retries reading `path` until the rollout exists and contains non-empty text, or returns the
|
||||
/// last read attempt.
|
||||
pub async fn read_nonempty_rollout_text(path: &Path) -> io::Result<String> {
|
||||
const MAX_ATTEMPTS: usize = 50;
|
||||
const RETRY_DELAY: Duration = Duration::from_millis(20);
|
||||
|
||||
for _ in 0..MAX_ATTEMPTS {
|
||||
if path.exists()
|
||||
&& let Ok(text) = read_rollout_text(path)
|
||||
&& !text.trim().is_empty()
|
||||
{
|
||||
return Ok(text);
|
||||
}
|
||||
sleep(RETRY_DELAY).await;
|
||||
}
|
||||
|
||||
read_rollout_text(path)
|
||||
}
|
||||
|
||||
pub(crate) struct RolloutLineReader {
|
||||
inner: RolloutLineReaderInner,
|
||||
}
|
||||
|
||||
pub(crate) struct RolloutAppendWriter {
|
||||
inner: RolloutAppendWriterInner,
|
||||
}
|
||||
|
||||
enum RolloutLineReaderInner {
|
||||
Plain(BufReader<File>),
|
||||
Zstd(BufReader<zstd::stream::read::Decoder<'static, BufReader<File>>>),
|
||||
}
|
||||
|
||||
enum RolloutAppendWriterInner {
|
||||
Plain(File),
|
||||
Zstd(zstd::stream::write::Encoder<'static, File>),
|
||||
}
|
||||
|
||||
impl RolloutAppendWriter {
|
||||
pub(crate) fn open(path: &Path) -> io::Result<Self> {
|
||||
let Some(parent) = path.parent() else {
|
||||
return Err(io::Error::other(format!(
|
||||
"rollout path has no parent: {}",
|
||||
path.display()
|
||||
)));
|
||||
};
|
||||
std::fs::create_dir_all(parent)?;
|
||||
let file = OpenOptions::new().append(true).create(true).open(path)?;
|
||||
let inner = match RolloutFileEncoding::for_path(path) {
|
||||
RolloutFileEncoding::PlainJsonl => RolloutAppendWriterInner::Plain(file),
|
||||
RolloutFileEncoding::ZstdJsonl => RolloutAppendWriterInner::Zstd(
|
||||
zstd::stream::write::Encoder::new(file, DEFAULT_ZSTD_LEVEL)?,
|
||||
),
|
||||
};
|
||||
Ok(Self { inner })
|
||||
}
|
||||
|
||||
pub(crate) fn append_text(&mut self, text: &str) -> io::Result<()> {
|
||||
match &mut self.inner {
|
||||
RolloutAppendWriterInner::Plain(file) => {
|
||||
file.write_all(text.as_bytes())?;
|
||||
file.flush()
|
||||
}
|
||||
RolloutAppendWriterInner::Zstd(encoder) => {
|
||||
encoder.write_all(text.as_bytes())?;
|
||||
encoder.flush()?;
|
||||
encoder.get_mut().flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn finish(self) -> io::Result<()> {
|
||||
match self.inner {
|
||||
RolloutAppendWriterInner::Plain(mut file) => file.flush(),
|
||||
RolloutAppendWriterInner::Zstd(encoder) => {
|
||||
let mut file = encoder.finish()?;
|
||||
file.flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RolloutLineReader {
|
||||
pub(crate) fn open(path: &Path) -> io::Result<Self> {
|
||||
let inner = match RolloutFileEncoding::for_path(path) {
|
||||
RolloutFileEncoding::PlainJsonl => {
|
||||
RolloutLineReaderInner::Plain(BufReader::new(File::open(path)?))
|
||||
}
|
||||
RolloutFileEncoding::ZstdJsonl => {
|
||||
let file = BufReader::new(File::open(path)?);
|
||||
let decoder = zstd::stream::read::Decoder::with_buffer(file)?;
|
||||
RolloutLineReaderInner::Zstd(BufReader::new(decoder))
|
||||
}
|
||||
};
|
||||
Ok(Self { inner })
|
||||
}
|
||||
|
||||
pub(crate) fn next_line(&mut self) -> io::Result<Option<String>> {
|
||||
let mut line = String::new();
|
||||
let bytes_read = match &mut self.inner {
|
||||
RolloutLineReaderInner::Plain(reader) => reader.read_line(&mut line)?,
|
||||
RolloutLineReaderInner::Zstd(reader) => match reader.read_line(&mut line) {
|
||||
Ok(bytes_read) => bytes_read,
|
||||
Err(err) if err.kind() == ErrorKind::UnexpectedEof && !line.is_empty() => {
|
||||
line.len()
|
||||
}
|
||||
Err(err) if err.kind() == ErrorKind::UnexpectedEof => 0,
|
||||
Err(err) => return Err(err),
|
||||
},
|
||||
};
|
||||
if bytes_read == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
trim_line_ending(&mut line);
|
||||
Ok(Some(line))
|
||||
}
|
||||
}
|
||||
|
||||
fn file_encoding_from_name(name: &str) -> Option<RolloutFileEncoding> {
|
||||
if name.ends_with(COMPRESSED_ROLLOUT_FILE_SUFFIX) {
|
||||
return Some(RolloutFileEncoding::ZstdJsonl);
|
||||
}
|
||||
if name.ends_with(ROLLOUT_FILE_SUFFIX) {
|
||||
return Some(RolloutFileEncoding::PlainJsonl);
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn trim_line_ending(line: &mut String) {
|
||||
if line.ends_with('\n') {
|
||||
line.pop();
|
||||
if line.ends_with('\r') {
|
||||
line.pop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "file_io_tests.rs"]
|
||||
mod tests;
|
||||
63
codex-rs/rollout/src/file_io_tests.rs
Normal file
63
codex-rs/rollout/src/file_io_tests.rs
Normal file
@@ -0,0 +1,63 @@
|
||||
use super::*;
|
||||
use crate::file_io::RolloutAppendWriter;
|
||||
use crate::file_io::RolloutLineReader;
|
||||
use std::fs::File;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use tempfile::TempDir;
|
||||
|
||||
#[test]
|
||||
fn strip_rollout_file_suffix_supports_both_formats() {
|
||||
assert_eq!(
|
||||
strip_rollout_file_suffix("rollout-2026-01-01T00-00-00-thread.jsonl"),
|
||||
Some("rollout-2026-01-01T00-00-00-thread")
|
||||
);
|
||||
assert_eq!(
|
||||
strip_rollout_file_suffix("rollout-2026-01-01T00-00-00-thread.jsonl.zst"),
|
||||
Some("rollout-2026-01-01T00-00-00-thread")
|
||||
);
|
||||
assert_eq!(strip_rollout_file_suffix("rollout.txt"), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn plain_rollouts_are_still_readable() -> io::Result<()> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let path = temp_dir
|
||||
.path()
|
||||
.join("rollout-2026-01-01T00-00-00-thread.jsonl");
|
||||
let mut file = File::create(&path)?;
|
||||
writeln!(file, "{{\"a\":1}}")?;
|
||||
writeln!(file, "{{\"b\":2}}")?;
|
||||
|
||||
assert_eq!(read_rollout_text(path.as_path())?, "{\"a\":1}\n{\"b\":2}\n");
|
||||
|
||||
let mut reader = RolloutLineReader::open(path.as_path())?;
|
||||
assert_eq!(reader.next_line()?, Some("{\"a\":1}".to_string()));
|
||||
assert_eq!(reader.next_line()?, Some("{\"b\":2}".to_string()));
|
||||
assert_eq!(reader.next_line()?, None);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compressed_appends_are_read_back_from_open_stream() -> io::Result<()> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let path = temp_dir
|
||||
.path()
|
||||
.join("rollout-2026-01-01T00-00-00-thread.jsonl.zst");
|
||||
|
||||
let mut writer = RolloutAppendWriter::open(path.as_path())?;
|
||||
writer.append_text("{\"a\":1}\n")?;
|
||||
assert_eq!(read_rollout_text(path.as_path())?, "{\"a\":1}\n");
|
||||
|
||||
writer.append_text("{\"b\":2}\n")?;
|
||||
assert_eq!(read_rollout_text(path.as_path())?, "{\"a\":1}\n{\"b\":2}\n");
|
||||
|
||||
writer.finish()?;
|
||||
assert_eq!(read_rollout_text(path.as_path())?, "{\"a\":1}\n{\"b\":2}\n");
|
||||
|
||||
let mut reader = RolloutLineReader::open(path.as_path())?;
|
||||
assert_eq!(reader.next_line()?, Some("{\"a\":1}".to_string()));
|
||||
assert_eq!(reader.next_line()?, Some("{\"b\":2}".to_string()));
|
||||
assert_eq!(reader.next_line()?, None);
|
||||
Ok(())
|
||||
}
|
||||
@@ -5,6 +5,7 @@ use std::sync::LazyLock;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
|
||||
pub mod config;
|
||||
pub(crate) mod file_io;
|
||||
pub mod list;
|
||||
pub mod metadata;
|
||||
pub mod policy;
|
||||
@@ -32,10 +33,15 @@ pub static INTERACTIVE_SESSION_SOURCES: LazyLock<Vec<SessionSource>> = LazyLock:
|
||||
pub use codex_protocol::protocol::SessionMeta;
|
||||
pub use config::RolloutConfig;
|
||||
pub use config::RolloutConfigView;
|
||||
pub use file_io::is_rollout_path;
|
||||
pub use file_io::read_nonempty_rollout_text;
|
||||
pub use file_io::read_rollout_text;
|
||||
pub use file_io::strip_rollout_file_suffix;
|
||||
pub use list::find_archived_thread_path_by_id_str;
|
||||
pub use list::find_thread_path_by_id_str;
|
||||
#[deprecated(note = "use find_thread_path_by_id_str")]
|
||||
pub use list::find_thread_path_by_id_str as find_conversation_path_by_id_str;
|
||||
pub use list::read_latest_turn_context;
|
||||
pub use list::rollout_date_parts;
|
||||
pub use policy::EventPersistenceMode;
|
||||
pub use recorder::RolloutRecorder;
|
||||
|
||||
@@ -17,6 +17,10 @@ use uuid::Uuid;
|
||||
|
||||
use super::ARCHIVED_SESSIONS_SUBDIR;
|
||||
use super::SESSIONS_SUBDIR;
|
||||
use super::file_io::RolloutLineReader;
|
||||
use super::file_io::is_rollout_file_name;
|
||||
use super::file_io::read_rollout_text;
|
||||
use super::file_io::strip_rollout_file_suffix;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::state_db;
|
||||
use codex_file_search as file_search;
|
||||
@@ -25,6 +29,7 @@ use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
|
||||
|
||||
/// Returned page of thread (thread) summaries.
|
||||
@@ -378,7 +383,7 @@ pub async fn get_threads_in_root(
|
||||
|
||||
/// Load thread file paths from disk using directory traversal.
|
||||
///
|
||||
/// Directory layout: `~/.codex/sessions/YYYY/MM/DD/rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl`
|
||||
/// Directory layout: `~/.codex/sessions/YYYY/MM/DD/rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl(.zst)`
|
||||
/// Returned newest (based on sort key) first.
|
||||
async fn traverse_directories_for_paths(
|
||||
root: PathBuf,
|
||||
@@ -823,7 +828,7 @@ async fn collect_flat_rollout_files(
|
||||
let Some(name_str) = file_name.to_str() else {
|
||||
continue;
|
||||
};
|
||||
if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
|
||||
if !is_rollout_file_name(name_str) {
|
||||
continue;
|
||||
}
|
||||
let Some((ts, id)) = parse_timestamp_uuid_from_filename(name_str) else {
|
||||
@@ -843,7 +848,7 @@ async fn collect_rollout_day_files(
|
||||
day_path: &Path,
|
||||
) -> io::Result<Vec<(OffsetDateTime, Uuid, PathBuf)>> {
|
||||
let mut day_files = collect_files(day_path, |name_str, path| {
|
||||
if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
|
||||
if !is_rollout_file_name(name_str) {
|
||||
return None;
|
||||
}
|
||||
|
||||
@@ -856,8 +861,8 @@ async fn collect_rollout_day_files(
|
||||
}
|
||||
|
||||
pub(crate) fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uuid)> {
|
||||
// Expected: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl
|
||||
let core = name.strip_prefix("rollout-")?.strip_suffix(".jsonl")?;
|
||||
// Expected: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl(.zst)
|
||||
let core = strip_rollout_file_suffix(name)?.strip_prefix("rollout-")?;
|
||||
|
||||
// Scan from the right for a '-' such that the suffix parses as a UUID.
|
||||
let (sep_idx, uuid) = core
|
||||
@@ -913,7 +918,7 @@ async fn collect_flat_files_by_updated_at(
|
||||
let Some(name_str) = file_name.to_str() else {
|
||||
continue;
|
||||
};
|
||||
if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
|
||||
if !is_rollout_file_name(name_str) {
|
||||
continue;
|
||||
}
|
||||
let Some((_ts, id)) = parse_timestamp_uuid_from_filename(name_str) else {
|
||||
@@ -1001,127 +1006,136 @@ impl<'a> ProviderMatcher<'a> {
|
||||
}
|
||||
|
||||
async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTailSummary> {
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
read_rollout_head(path, move |reader| {
|
||||
let mut summary = HeadTailSummary::default();
|
||||
let mut lines_scanned = 0usize;
|
||||
|
||||
let file = tokio::fs::File::open(path).await?;
|
||||
let reader = tokio::io::BufReader::new(file);
|
||||
let mut lines = reader.lines();
|
||||
let mut summary = HeadTailSummary::default();
|
||||
let mut lines_scanned = 0usize;
|
||||
while lines_scanned < head_limit
|
||||
|| (summary.saw_session_meta
|
||||
&& !summary.saw_user_event
|
||||
&& lines_scanned < head_limit + USER_EVENT_SCAN_LIMIT)
|
||||
{
|
||||
let Some(line) = reader.next_line()? else {
|
||||
break;
|
||||
};
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
lines_scanned += 1;
|
||||
|
||||
while lines_scanned < head_limit
|
||||
|| (summary.saw_session_meta
|
||||
&& !summary.saw_user_event
|
||||
&& lines_scanned < head_limit + USER_EVENT_SCAN_LIMIT)
|
||||
{
|
||||
let line_opt = lines.next_line().await?;
|
||||
let Some(line) = line_opt else { break };
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
lines_scanned += 1;
|
||||
let Ok(rollout_line) = serde_json::from_str::<RolloutLine>(trimmed) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let parsed: Result<RolloutLine, _> = serde_json::from_str(trimmed);
|
||||
let Ok(rollout_line) = parsed else { continue };
|
||||
|
||||
match rollout_line.item {
|
||||
RolloutItem::SessionMeta(session_meta_line) => {
|
||||
if !summary.saw_session_meta {
|
||||
summary.source = Some(session_meta_line.meta.source.clone());
|
||||
summary.agent_nickname = session_meta_line.meta.agent_nickname.clone();
|
||||
summary.agent_role = session_meta_line.meta.agent_role.clone();
|
||||
summary.model_provider = session_meta_line.meta.model_provider.clone();
|
||||
summary.thread_id = Some(session_meta_line.meta.id);
|
||||
summary.cwd = Some(session_meta_line.meta.cwd.clone());
|
||||
summary.git_branch = session_meta_line
|
||||
.git
|
||||
.as_ref()
|
||||
.and_then(|git| git.branch.clone());
|
||||
summary.git_sha = session_meta_line
|
||||
.git
|
||||
.as_ref()
|
||||
.and_then(|git| git.commit_hash.as_ref().map(|sha| sha.0.clone()));
|
||||
summary.git_origin_url = session_meta_line
|
||||
.git
|
||||
.as_ref()
|
||||
.and_then(|git| git.repository_url.clone());
|
||||
summary.cli_version = Some(session_meta_line.meta.cli_version);
|
||||
summary.created_at = Some(session_meta_line.meta.timestamp.clone());
|
||||
summary.saw_session_meta = true;
|
||||
match rollout_line.item {
|
||||
RolloutItem::SessionMeta(session_meta_line) => {
|
||||
if !summary.saw_session_meta {
|
||||
summary.source = Some(session_meta_line.meta.source.clone());
|
||||
summary.agent_nickname = session_meta_line.meta.agent_nickname.clone();
|
||||
summary.agent_role = session_meta_line.meta.agent_role.clone();
|
||||
summary.model_provider = session_meta_line.meta.model_provider.clone();
|
||||
summary.thread_id = Some(session_meta_line.meta.id);
|
||||
summary.cwd = Some(session_meta_line.meta.cwd.clone());
|
||||
summary.git_branch = session_meta_line
|
||||
.git
|
||||
.as_ref()
|
||||
.and_then(|git| git.branch.clone());
|
||||
summary.git_sha = session_meta_line
|
||||
.git
|
||||
.as_ref()
|
||||
.and_then(|git| git.commit_hash.as_ref().map(|sha| sha.0.clone()));
|
||||
summary.git_origin_url = session_meta_line
|
||||
.git
|
||||
.as_ref()
|
||||
.and_then(|git| git.repository_url.clone());
|
||||
summary.cli_version = Some(session_meta_line.meta.cli_version);
|
||||
summary.created_at = Some(session_meta_line.meta.timestamp.clone());
|
||||
summary.saw_session_meta = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
RolloutItem::ResponseItem(_) => {
|
||||
summary.created_at = summary
|
||||
.created_at
|
||||
.clone()
|
||||
.or_else(|| Some(rollout_line.timestamp.clone()));
|
||||
}
|
||||
RolloutItem::TurnContext(_) => {
|
||||
// Not included in `head`; skip.
|
||||
}
|
||||
RolloutItem::Compacted(_) => {
|
||||
// Not included in `head`; skip.
|
||||
}
|
||||
RolloutItem::EventMsg(ev) => {
|
||||
if let EventMsg::UserMessage(user) = ev {
|
||||
summary.saw_user_event = true;
|
||||
if summary.first_user_message.is_none() {
|
||||
let message = strip_user_message_prefix(user.message.as_str()).to_string();
|
||||
if !message.is_empty() {
|
||||
summary.first_user_message = Some(message);
|
||||
RolloutItem::ResponseItem(_) => {
|
||||
summary.created_at = summary
|
||||
.created_at
|
||||
.clone()
|
||||
.or_else(|| Some(rollout_line.timestamp.clone()));
|
||||
}
|
||||
RolloutItem::TurnContext(_) => {}
|
||||
RolloutItem::Compacted(_) => {}
|
||||
RolloutItem::EventMsg(event) => {
|
||||
if let EventMsg::UserMessage(user) = event {
|
||||
summary.saw_user_event = true;
|
||||
if summary.first_user_message.is_none() {
|
||||
let message =
|
||||
strip_user_message_prefix(user.message.as_str()).to_string();
|
||||
if !message.is_empty() {
|
||||
summary.first_user_message = Some(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if summary.saw_session_meta && summary.saw_user_event {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if summary.saw_session_meta && summary.saw_user_event {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(summary)
|
||||
Ok(summary)
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Read up to `HEAD_RECORD_LIMIT` records from the start of the rollout file at `path`.
|
||||
/// This should be enough to produce a summary including the session meta line.
|
||||
pub async fn read_head_for_summary(path: &Path) -> io::Result<Vec<serde_json::Value>> {
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
read_rollout_head(path, |reader| {
|
||||
let mut head = Vec::new();
|
||||
|
||||
let file = tokio::fs::File::open(path).await?;
|
||||
let reader = tokio::io::BufReader::new(file);
|
||||
let mut lines = reader.lines();
|
||||
let mut head = Vec::new();
|
||||
|
||||
while head.len() < HEAD_RECORD_LIMIT {
|
||||
let Some(line) = lines.next_line().await? else {
|
||||
break;
|
||||
};
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
if let Ok(rollout_line) = serde_json::from_str::<RolloutLine>(trimmed) {
|
||||
match rollout_line.item {
|
||||
RolloutItem::SessionMeta(session_meta_line) => {
|
||||
if let Ok(value) = serde_json::to_value(session_meta_line) {
|
||||
head.push(value);
|
||||
while head.len() < HEAD_RECORD_LIMIT {
|
||||
let Some(line) = reader.next_line()? else {
|
||||
break;
|
||||
};
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
if let Ok(rollout_line) = serde_json::from_str::<RolloutLine>(trimmed) {
|
||||
match rollout_line.item {
|
||||
RolloutItem::SessionMeta(session_meta_line) => {
|
||||
if let Ok(value) = serde_json::to_value(session_meta_line) {
|
||||
head.push(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
RolloutItem::ResponseItem(item) => {
|
||||
if let Ok(value) = serde_json::to_value(item) {
|
||||
head.push(value);
|
||||
RolloutItem::ResponseItem(item) => {
|
||||
if let Ok(value) = serde_json::to_value(item) {
|
||||
head.push(value);
|
||||
}
|
||||
}
|
||||
RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => {}
|
||||
}
|
||||
RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(head)
|
||||
Ok(head)
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn read_rollout_head<T, F>(path: &Path, read: F) -> io::Result<T>
|
||||
where
|
||||
T: Send + 'static,
|
||||
F: FnOnce(&mut RolloutLineReader) -> io::Result<T> + Send + 'static,
|
||||
{
|
||||
let path = path.to_path_buf();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let mut reader = RolloutLineReader::open(path.as_path())?;
|
||||
read(&mut reader)
|
||||
})
|
||||
.await
|
||||
.map_err(|err| io::Error::other(format!("failed to read rollout head: {err}")))?
|
||||
}
|
||||
|
||||
fn strip_user_message_prefix(text: &str) -> &str {
|
||||
@@ -1149,6 +1163,29 @@ pub async fn read_session_meta_line(path: &Path) -> io::Result<SessionMetaLine>
|
||||
})
|
||||
}
|
||||
|
||||
/// Read the latest TurnContext item from a rollout file, if present.
|
||||
pub async fn read_latest_turn_context(path: &Path) -> io::Result<Option<TurnContextItem>> {
|
||||
let path = path.to_path_buf();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let text = read_rollout_text(path.as_path())?;
|
||||
for line in text.lines().rev() {
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let Ok(rollout_line) = serde_json::from_str::<RolloutLine>(trimmed) else {
|
||||
continue;
|
||||
};
|
||||
if let RolloutItem::TurnContext(item) = rollout_line.item {
|
||||
return Ok(Some(item));
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
})
|
||||
.await
|
||||
.map_err(|err| io::Error::other(format!("failed to read latest turn context: {err}")))?
|
||||
}
|
||||
|
||||
async fn file_modified_time(path: &Path) -> io::Result<Option<OffsetDateTime>> {
|
||||
let meta = tokio::fs::metadata(path).await?;
|
||||
let modified = meta.modified().ok();
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use crate::ARCHIVED_SESSIONS_SUBDIR;
|
||||
use crate::SESSIONS_SUBDIR;
|
||||
use crate::config::RolloutConfigView;
|
||||
use crate::file_io::is_rollout_file_name;
|
||||
use crate::file_io::strip_rollout_file_suffix;
|
||||
use crate::list;
|
||||
use crate::list::parse_timestamp_uuid_from_filename;
|
||||
use crate::recorder::RolloutRecorder;
|
||||
@@ -30,7 +32,6 @@ use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
const ROLLOUT_PREFIX: &str = "rollout-";
|
||||
const ROLLOUT_SUFFIX: &str = ".jsonl";
|
||||
const BACKFILL_BATCH_SIZE: usize = 200;
|
||||
#[cfg(not(test))]
|
||||
const BACKFILL_LEASE_SECONDS: i64 = 900;
|
||||
@@ -80,7 +81,7 @@ pub fn builder_from_items(
|
||||
}
|
||||
|
||||
let file_name = rollout_path.file_name()?.to_str()?;
|
||||
if !file_name.starts_with(ROLLOUT_PREFIX) || !file_name.ends_with(ROLLOUT_SUFFIX) {
|
||||
if !is_rollout_file_name(file_name) {
|
||||
return None;
|
||||
}
|
||||
let (created_ts, uuid) = parse_timestamp_uuid_from_filename(file_name)?;
|
||||
@@ -430,7 +431,7 @@ async fn collect_rollout_paths(root: &Path) -> std::io::Result<Vec<PathBuf>> {
|
||||
let Some(name) = file_name.to_str() else {
|
||||
continue;
|
||||
};
|
||||
if name.starts_with(ROLLOUT_PREFIX) && name.ends_with(ROLLOUT_SUFFIX) {
|
||||
if name.starts_with(ROLLOUT_PREFIX) && strip_rollout_file_suffix(name).is_some() {
|
||||
paths.push(path);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
//! Persist Codex session rollouts (.jsonl) so sessions can be replayed or inspected later.
|
||||
//! Persist Codex session rollouts (`.jsonl` / `.jsonl.zst`) so sessions can be replayed or
|
||||
//! inspected later.
|
||||
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io::Error as IoError;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
@@ -17,7 +16,6 @@ use time::OffsetDateTime;
|
||||
use time::format_description::FormatItem;
|
||||
use time::format_description::well_known::Rfc3339;
|
||||
use time::macros::format_description;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::oneshot;
|
||||
@@ -27,6 +25,9 @@ use tracing::warn;
|
||||
|
||||
use super::ARCHIVED_SESSIONS_SUBDIR;
|
||||
use super::SESSIONS_SUBDIR;
|
||||
use super::file_io::RolloutAppendWriter;
|
||||
use super::file_io::RolloutLineReader;
|
||||
use super::file_io::preferred_rollout_file_suffix;
|
||||
use super::list::Cursor;
|
||||
use super::list::ThreadItem;
|
||||
use super::list::ThreadListConfig;
|
||||
@@ -37,6 +38,7 @@ use super::list::get_threads;
|
||||
use super::list::get_threads_in_root;
|
||||
use super::list::parse_cursor;
|
||||
use super::list::parse_timestamp_uuid_from_filename;
|
||||
use super::list::read_latest_turn_context;
|
||||
use super::metadata;
|
||||
use super::policy::EventPersistenceMode;
|
||||
use super::policy::is_persisted_response_item;
|
||||
@@ -61,7 +63,8 @@ use codex_utils_path as path_utils;
|
||||
/// Records all [`ResponseItem`]s for a session and flushes them to disk after
|
||||
/// every update.
|
||||
///
|
||||
/// Rollouts are recorded as JSONL and can be inspected with tools such as:
|
||||
/// Rollouts are recorded as JSONL, optionally compressed with zstd, and can be inspected after
|
||||
/// decompression with tools such as:
|
||||
///
|
||||
/// ```ignore
|
||||
/// $ jq -C . ~/.codex/sessions/rollout-2025-05-07T17-24-21-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl
|
||||
@@ -366,14 +369,14 @@ impl RolloutRecorder {
|
||||
/// For newly created sessions, this precomputes path/metadata and defers
|
||||
/// file creation/open until an explicit `persist()` call.
|
||||
///
|
||||
/// For resumed sessions, this immediately opens the existing rollout file.
|
||||
/// For resumed sessions, this validates the existing rollout file path immediately.
|
||||
pub async fn new(
|
||||
config: &impl RolloutConfigView,
|
||||
params: RolloutRecorderParams,
|
||||
state_db_ctx: Option<StateDbHandle>,
|
||||
state_builder: Option<ThreadMetadataBuilder>,
|
||||
) -> std::io::Result<Self> {
|
||||
let (file, deferred_log_file_info, rollout_path, meta, event_persistence_mode) =
|
||||
let (writer, deferred_log_file_info, rollout_path, meta, event_persistence_mode) =
|
||||
match params {
|
||||
RolloutRecorderParams::Create {
|
||||
conversation_id,
|
||||
@@ -429,18 +432,16 @@ impl RolloutRecorder {
|
||||
RolloutRecorderParams::Resume {
|
||||
path,
|
||||
event_persistence_mode,
|
||||
} => (
|
||||
Some(
|
||||
tokio::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(&path)
|
||||
.await?,
|
||||
),
|
||||
None,
|
||||
path,
|
||||
None,
|
||||
event_persistence_mode,
|
||||
),
|
||||
} => {
|
||||
tokio::fs::metadata(&path).await?;
|
||||
(
|
||||
Some(RolloutFileWriter::open(path.clone()).await?),
|
||||
None,
|
||||
path,
|
||||
None,
|
||||
event_persistence_mode,
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
// Clone the cwd for the spawned task to collect git info asynchronously
|
||||
@@ -450,11 +451,9 @@ impl RolloutRecorder {
|
||||
// future will yield, which is fine – we only need to ensure we do not
|
||||
// perform *blocking* I/O on the caller's thread.
|
||||
let (tx, rx) = mpsc::channel::<RolloutCmd>(256);
|
||||
// Spawn a Tokio task that owns the file handle and performs async
|
||||
// writes. Using `tokio::fs::File` keeps everything on the async I/O
|
||||
// driver instead of blocking the runtime.
|
||||
// Spawn a Tokio task that owns the rollout writer.
|
||||
tokio::task::spawn(rollout_writer(
|
||||
file,
|
||||
writer,
|
||||
deferred_log_file_info,
|
||||
rx,
|
||||
meta,
|
||||
@@ -532,65 +531,10 @@ impl RolloutRecorder {
|
||||
path: &Path,
|
||||
) -> std::io::Result<(Vec<RolloutItem>, Option<ThreadId>, usize)> {
|
||||
trace!("Resuming rollout from {path:?}");
|
||||
let text = tokio::fs::read_to_string(path).await?;
|
||||
if text.trim().is_empty() {
|
||||
return Err(IoError::other("empty session file"));
|
||||
}
|
||||
|
||||
let mut items: Vec<RolloutItem> = Vec::new();
|
||||
let mut thread_id: Option<ThreadId> = None;
|
||||
let mut parse_errors = 0usize;
|
||||
for line in text.lines() {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let v: Value = match serde_json::from_str(line) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
warn!("failed to parse line as JSON: {line:?}, error: {e}");
|
||||
parse_errors = parse_errors.saturating_add(1);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Parse the rollout line structure
|
||||
match serde_json::from_value::<RolloutLine>(v.clone()) {
|
||||
Ok(rollout_line) => match rollout_line.item {
|
||||
RolloutItem::SessionMeta(session_meta_line) => {
|
||||
// Use the FIRST SessionMeta encountered in the file as the canonical
|
||||
// thread id and main session information. Keep all items intact.
|
||||
if thread_id.is_none() {
|
||||
thread_id = Some(session_meta_line.meta.id);
|
||||
}
|
||||
items.push(RolloutItem::SessionMeta(session_meta_line));
|
||||
}
|
||||
RolloutItem::ResponseItem(item) => {
|
||||
items.push(RolloutItem::ResponseItem(item));
|
||||
}
|
||||
RolloutItem::Compacted(item) => {
|
||||
items.push(RolloutItem::Compacted(item));
|
||||
}
|
||||
RolloutItem::TurnContext(item) => {
|
||||
items.push(RolloutItem::TurnContext(item));
|
||||
}
|
||||
RolloutItem::EventMsg(_ev) => {
|
||||
items.push(RolloutItem::EventMsg(_ev));
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
trace!("failed to parse rollout line: {e}");
|
||||
parse_errors = parse_errors.saturating_add(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug!(
|
||||
"Resumed rollout with {} items, thread ID: {:?}, parse errors: {}",
|
||||
items.len(),
|
||||
thread_id,
|
||||
parse_errors,
|
||||
);
|
||||
Ok((items, thread_id, parse_errors))
|
||||
let path = path.to_path_buf();
|
||||
tokio::task::spawn_blocking(move || load_rollout_items_sync(path.as_path()))
|
||||
.await
|
||||
.map_err(|err| IoError::other(format!("failed to read rollout items: {err}")))?
|
||||
}
|
||||
|
||||
pub async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
|
||||
@@ -627,6 +571,85 @@ impl RolloutRecorder {
|
||||
}
|
||||
}
|
||||
|
||||
fn load_rollout_items_sync(
|
||||
path: &Path,
|
||||
) -> std::io::Result<(Vec<RolloutItem>, Option<ThreadId>, usize)> {
|
||||
let mut reader = RolloutLineReader::open(path)?;
|
||||
let mut items = Vec::new();
|
||||
let mut thread_id = None;
|
||||
let mut parse_errors = 0usize;
|
||||
let mut saw_content = false;
|
||||
|
||||
loop {
|
||||
let line = match reader.next_line() {
|
||||
Ok(Some(line)) => line,
|
||||
Ok(None) => break,
|
||||
Err(err) => {
|
||||
if !saw_content {
|
||||
return Err(err);
|
||||
}
|
||||
warn!(
|
||||
"stopping rollout resume after read error for {path:?}; preserving parsed items: {err}"
|
||||
);
|
||||
parse_errors = parse_errors.saturating_add(1);
|
||||
break;
|
||||
}
|
||||
};
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
saw_content = true;
|
||||
|
||||
let value: Value = match serde_json::from_str(line.as_str()) {
|
||||
Ok(value) => value,
|
||||
Err(err) => {
|
||||
warn!("failed to parse line as JSON: {line:?}, error: {err}");
|
||||
parse_errors = parse_errors.saturating_add(1);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match serde_json::from_value::<RolloutLine>(value.clone()) {
|
||||
Ok(rollout_line) => match rollout_line.item {
|
||||
RolloutItem::SessionMeta(session_meta_line) => {
|
||||
if thread_id.is_none() {
|
||||
thread_id = Some(session_meta_line.meta.id);
|
||||
}
|
||||
items.push(RolloutItem::SessionMeta(session_meta_line));
|
||||
}
|
||||
RolloutItem::ResponseItem(item) => {
|
||||
items.push(RolloutItem::ResponseItem(item));
|
||||
}
|
||||
RolloutItem::Compacted(item) => {
|
||||
items.push(RolloutItem::Compacted(item));
|
||||
}
|
||||
RolloutItem::TurnContext(item) => {
|
||||
items.push(RolloutItem::TurnContext(item));
|
||||
}
|
||||
RolloutItem::EventMsg(event) => {
|
||||
items.push(RolloutItem::EventMsg(event));
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
trace!("failed to parse rollout line: {err}");
|
||||
parse_errors = parse_errors.saturating_add(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !saw_content {
|
||||
return Err(IoError::other("empty session file"));
|
||||
}
|
||||
|
||||
tracing::debug!(
|
||||
"Resumed rollout with {} items, thread ID: {:?}, parse errors: {}",
|
||||
items.len(),
|
||||
thread_id,
|
||||
parse_errors,
|
||||
);
|
||||
Ok((items, thread_id, parse_errors))
|
||||
}
|
||||
|
||||
fn truncate_fs_page(
|
||||
mut page: ThreadsPage,
|
||||
page_size: usize,
|
||||
@@ -680,7 +703,10 @@ fn precompute_log_file_info(
|
||||
.format(format)
|
||||
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
||||
|
||||
let filename = format!("rollout-{date_str}-{conversation_id}.jsonl");
|
||||
let filename = format!(
|
||||
"rollout-{date_str}-{conversation_id}{}",
|
||||
preferred_rollout_file_suffix()
|
||||
);
|
||||
|
||||
let path = dir.join(filename);
|
||||
|
||||
@@ -690,24 +716,9 @@ fn precompute_log_file_info(
|
||||
timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
fn open_log_file(path: &Path) -> std::io::Result<File> {
|
||||
let Some(parent) = path.parent() else {
|
||||
return Err(IoError::other(format!(
|
||||
"rollout path has no parent: {}",
|
||||
path.display()
|
||||
)));
|
||||
};
|
||||
fs::create_dir_all(parent)?;
|
||||
std::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.create(true)
|
||||
.open(path)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn rollout_writer(
|
||||
file: Option<tokio::fs::File>,
|
||||
mut writer: Option<RolloutFileWriter>,
|
||||
mut deferred_log_file_info: Option<LogFileInfo>,
|
||||
mut rx: mpsc::Receiver<RolloutCmd>,
|
||||
mut meta: Option<SessionMeta>,
|
||||
@@ -718,7 +729,6 @@ async fn rollout_writer(
|
||||
default_provider: String,
|
||||
generate_memories: bool,
|
||||
) -> std::io::Result<()> {
|
||||
let mut writer = file.map(|file| JsonlWriter { file });
|
||||
let mut buffered_items = Vec::<RolloutItem>::new();
|
||||
if let Some(builder) = state_builder.as_mut() {
|
||||
builder.rollout_path = rollout_path.clone();
|
||||
@@ -773,10 +783,7 @@ async fn rollout_writer(
|
||||
"deferred rollout recorder missing log file metadata",
|
||||
));
|
||||
};
|
||||
let file = open_log_file(log_file_info.path.as_path())?;
|
||||
writer = Some(JsonlWriter {
|
||||
file: tokio::fs::File::from_std(file),
|
||||
});
|
||||
writer = Some(RolloutFileWriter::open(log_file_info.path).await?);
|
||||
|
||||
if let Some(session_meta) = meta.take() {
|
||||
write_session_meta(
|
||||
@@ -817,27 +824,31 @@ async fn rollout_writer(
|
||||
let _ = ack.send(());
|
||||
}
|
||||
RolloutCmd::Flush { ack } => {
|
||||
// Deferred fresh threads may not have an initialized file yet.
|
||||
if let Some(writer) = writer.as_mut()
|
||||
&& let Err(e) = writer.file.flush().await
|
||||
{
|
||||
let _ = ack.send(());
|
||||
return Err(e);
|
||||
}
|
||||
let _ = ack.send(());
|
||||
}
|
||||
RolloutCmd::Shutdown { ack } => {
|
||||
if let Some(writer) = writer.as_mut()
|
||||
&& let Err(err) = writer.finish().await
|
||||
{
|
||||
let _ = ack.send(());
|
||||
return Err(err);
|
||||
}
|
||||
let _ = ack.send(());
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(writer) = writer.as_mut() {
|
||||
writer.finish().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn write_session_meta(
|
||||
mut writer: Option<&mut JsonlWriter>,
|
||||
mut writer: Option<&mut RolloutFileWriter>,
|
||||
session_meta: SessionMeta,
|
||||
cwd: &Path,
|
||||
rollout_path: &Path,
|
||||
@@ -876,7 +887,7 @@ async fn write_session_meta(
|
||||
}
|
||||
|
||||
async fn write_and_reconcile_items(
|
||||
mut writer: Option<&mut JsonlWriter>,
|
||||
mut writer: Option<&mut RolloutFileWriter>,
|
||||
items: &[RolloutItem],
|
||||
rollout_path: &Path,
|
||||
state_db_ctx: Option<&StateRuntime>,
|
||||
@@ -884,9 +895,7 @@ async fn write_and_reconcile_items(
|
||||
default_provider: &str,
|
||||
) -> std::io::Result<()> {
|
||||
if let Some(writer) = writer.as_mut() {
|
||||
for item in items {
|
||||
writer.write_rollout_item(item).await?;
|
||||
}
|
||||
writer.write_rollout_items(items).await?;
|
||||
}
|
||||
sync_thread_state_after_write(
|
||||
state_db_ctx,
|
||||
@@ -949,8 +958,12 @@ async fn sync_thread_state_after_write(
|
||||
.await;
|
||||
}
|
||||
|
||||
struct JsonlWriter {
|
||||
file: tokio::fs::File,
|
||||
struct RolloutFileWriter {
|
||||
inner: Option<BlockingRolloutFileWriter>,
|
||||
}
|
||||
|
||||
struct BlockingRolloutFileWriter {
|
||||
writer: RolloutAppendWriter,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
@@ -960,28 +973,68 @@ struct RolloutLineRef<'a> {
|
||||
item: &'a RolloutItem,
|
||||
}
|
||||
|
||||
impl JsonlWriter {
|
||||
impl RolloutFileWriter {
|
||||
async fn open(path: PathBuf) -> std::io::Result<Self> {
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let writer = RolloutAppendWriter::open(path.as_path())?;
|
||||
Ok(Self {
|
||||
inner: Some(BlockingRolloutFileWriter { writer }),
|
||||
})
|
||||
})
|
||||
.await
|
||||
.map_err(|err| IoError::other(format!("failed to open rollout writer: {err}")))?
|
||||
}
|
||||
|
||||
async fn write_rollout_item(&mut self, rollout_item: &RolloutItem) -> std::io::Result<()> {
|
||||
self.write_rollout_items(std::slice::from_ref(rollout_item))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn write_rollout_items(&mut self, rollout_items: &[RolloutItem]) -> std::io::Result<()> {
|
||||
if rollout_items.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let timestamp_format: &[FormatItem] = format_description!(
|
||||
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
|
||||
);
|
||||
let timestamp = OffsetDateTime::now_utc()
|
||||
.format(timestamp_format)
|
||||
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
||||
|
||||
let line = RolloutLineRef {
|
||||
timestamp,
|
||||
item: rollout_item,
|
||||
let mut lines = Vec::with_capacity(rollout_items.len());
|
||||
for rollout_item in rollout_items {
|
||||
let timestamp = OffsetDateTime::now_utc()
|
||||
.format(timestamp_format)
|
||||
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
||||
let line = RolloutLineRef {
|
||||
timestamp,
|
||||
item: rollout_item,
|
||||
};
|
||||
let mut json = serde_json::to_string(&line)?;
|
||||
json.push('\n');
|
||||
lines.push(json);
|
||||
}
|
||||
let Some(writer) = self.inner.take() else {
|
||||
return Err(IoError::other("rollout writer missing inner state"));
|
||||
};
|
||||
self.write_line(&line).await
|
||||
}
|
||||
async fn write_line(&mut self, item: &impl serde::Serialize) -> std::io::Result<()> {
|
||||
let mut json = serde_json::to_string(item)?;
|
||||
json.push('\n');
|
||||
self.file.write_all(json.as_bytes()).await?;
|
||||
self.file.flush().await?;
|
||||
let writer = tokio::task::spawn_blocking(move || {
|
||||
let mut writer = writer;
|
||||
for line in lines {
|
||||
writer.writer.append_text(line.as_str())?;
|
||||
}
|
||||
Ok::<BlockingRolloutFileWriter, IoError>(writer)
|
||||
})
|
||||
.await
|
||||
.map_err(|err| IoError::other(format!("failed to write rollout line: {err}")))??;
|
||||
self.inner = Some(writer);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn finish(&mut self) -> std::io::Result<()> {
|
||||
let Some(writer) = self.inner.take() else {
|
||||
return Ok(());
|
||||
};
|
||||
tokio::task::spawn_blocking(move || writer.writer.finish())
|
||||
.await
|
||||
.map_err(|err| IoError::other(format!("failed to finish rollout writer: {err}")))?
|
||||
}
|
||||
}
|
||||
|
||||
impl From<codex_state::ThreadsPage> for ThreadsPage {
|
||||
@@ -1050,20 +1103,12 @@ async fn resume_candidate_matches_cwd(
|
||||
cwd: &Path,
|
||||
default_provider: &str,
|
||||
) -> bool {
|
||||
if cached_cwd.is_some_and(|session_cwd| cwd_matches(session_cwd, cwd)) {
|
||||
return true;
|
||||
if let Ok(Some(turn_context)) = read_latest_turn_context(rollout_path).await {
|
||||
return cwd_matches(turn_context.cwd.as_path(), cwd);
|
||||
}
|
||||
|
||||
if let Ok((items, _, _)) = RolloutRecorder::load_rollout_items(rollout_path).await
|
||||
&& let Some(latest_turn_context_cwd) = items.iter().rev().find_map(|item| match item {
|
||||
RolloutItem::TurnContext(turn_context) => Some(turn_context.cwd.as_path()),
|
||||
RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
{
|
||||
return cwd_matches(latest_turn_context_cwd, cwd);
|
||||
if cached_cwd.is_some_and(|session_cwd| cwd_matches(session_cwd, cwd)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
metadata::extract_metadata_from_rollout(rollout_path, default_provider)
|
||||
|
||||
@@ -2,6 +2,8 @@
|
||||
|
||||
use super::*;
|
||||
use crate::config::RolloutConfig;
|
||||
use crate::file_io::RolloutAppendWriter;
|
||||
use crate::file_io::read_rollout_text;
|
||||
use chrono::TimeZone;
|
||||
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
use codex_protocol::protocol::AgentMessageEvent;
|
||||
@@ -124,7 +126,7 @@ async fn recorder_materializes_only_after_explicit_persist() -> std::io::Result<
|
||||
recorder.persist().await?;
|
||||
assert!(rollout_path.exists(), "rollout file should be materialized");
|
||||
|
||||
let text = std::fs::read_to_string(&rollout_path)?;
|
||||
let text = read_rollout_text(rollout_path.as_path())?;
|
||||
assert!(
|
||||
text.contains("\"type\":\"session_meta\""),
|
||||
"expected session metadata in rollout"
|
||||
@@ -139,7 +141,7 @@ async fn recorder_materializes_only_after_explicit_persist() -> std::io::Result<
|
||||
buffered_idx < user_idx,
|
||||
"buffered items should preserve ordering"
|
||||
);
|
||||
let text_after_second_persist = std::fs::read_to_string(&rollout_path)?;
|
||||
let text_after_second_persist = read_rollout_text(rollout_path.as_path())?;
|
||||
assert_eq!(text_after_second_persist, text);
|
||||
|
||||
recorder.shutdown().await?;
|
||||
@@ -483,3 +485,68 @@ async fn resume_candidate_matches_cwd_reads_latest_turn_context() -> std::io::Re
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn load_rollout_items_preserves_parsed_items_when_zstd_tail_is_truncated()
|
||||
-> std::io::Result<()> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let thread_id = ThreadId::new();
|
||||
let rollout_path = temp_dir
|
||||
.path()
|
||||
.join(format!("rollout-2026-01-01T00-00-00-{thread_id}.jsonl.zst"));
|
||||
|
||||
let session_meta = serde_json::json!({
|
||||
"timestamp": "2026-01-01T00:00:00Z",
|
||||
"type": "session_meta",
|
||||
"payload": {
|
||||
"id": thread_id,
|
||||
"timestamp": "2026-01-01T00:00:00Z",
|
||||
"cwd": ".",
|
||||
"originator": "test_originator",
|
||||
"cli_version": "test_version",
|
||||
"source": "cli",
|
||||
"model_provider": "test-provider",
|
||||
},
|
||||
});
|
||||
let user_event = serde_json::json!({
|
||||
"timestamp": "2026-01-01T00:00:01Z",
|
||||
"type": "event_msg",
|
||||
"payload": {
|
||||
"type": "user_message",
|
||||
"message": "hello",
|
||||
"kind": "plain",
|
||||
},
|
||||
});
|
||||
|
||||
let mut writer = RolloutAppendWriter::open(&rollout_path)?;
|
||||
writer.append_text(&format!("{session_meta}\n"))?;
|
||||
writer.append_text(&format!("{user_event}\n"))?;
|
||||
drop(writer);
|
||||
|
||||
let file = File::options().write(true).open(&rollout_path)?;
|
||||
let truncated_len = file.metadata()?.len().saturating_sub(1);
|
||||
file.set_len(truncated_len)?;
|
||||
|
||||
let (items, parsed_thread_id, parse_errors) =
|
||||
RolloutRecorder::load_rollout_items(&rollout_path).await?;
|
||||
|
||||
assert_eq!(parsed_thread_id, Some(thread_id));
|
||||
assert_eq!(items.len(), 1);
|
||||
let Some(RolloutItem::SessionMeta(session_meta_line)) = items.first() else {
|
||||
panic!("expected recovered session meta");
|
||||
};
|
||||
assert_eq!(session_meta_line.meta.id, thread_id);
|
||||
assert_eq!(session_meta_line.meta.timestamp, "2026-01-01T00:00:00Z");
|
||||
assert_eq!(session_meta_line.meta.cwd, PathBuf::from("."));
|
||||
assert_eq!(session_meta_line.meta.originator, "test_originator");
|
||||
assert_eq!(session_meta_line.meta.cli_version, "test_version");
|
||||
assert_eq!(session_meta_line.meta.source, SessionSource::Cli);
|
||||
assert_eq!(
|
||||
session_meta_line.meta.model_provider.as_deref(),
|
||||
Some("test-provider")
|
||||
);
|
||||
assert!(session_meta_line.git.is_none());
|
||||
assert_eq!(parse_errors, 0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ use time::macros::format_description;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::INTERACTIVE_SESSION_SOURCES;
|
||||
use crate::file_io::read_rollout_text;
|
||||
use crate::find_thread_path_by_id_str;
|
||||
use crate::list::Cursor;
|
||||
use crate::list::ThreadItem;
|
||||
@@ -26,6 +27,7 @@ use crate::list::ThreadSortKey;
|
||||
use crate::list::ThreadsPage;
|
||||
use crate::list::get_threads;
|
||||
use crate::list::read_head_for_summary;
|
||||
use crate::list::read_latest_turn_context;
|
||||
use crate::rollout_date_parts;
|
||||
use anyhow::Result;
|
||||
use codex_protocol::ThreadId;
|
||||
@@ -37,6 +39,7 @@ use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
|
||||
const NO_SOURCE_FILTER: &[SessionSource] = &[];
|
||||
@@ -89,6 +92,47 @@ async fn insert_state_db_thread(
|
||||
.expect("state db upsert should succeed");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_latest_turn_context_reads_compressed_rollout() -> Result<()> {
|
||||
let temp = TempDir::new()?;
|
||||
let path = temp.path().join("rollout.jsonl.zst");
|
||||
let mut encoder = zstd::stream::write::Encoder::new(File::create(&path)?, 0)?;
|
||||
for (idx, cwd) in ["/tmp/first", "/tmp/second"].into_iter().enumerate() {
|
||||
let line = RolloutLine {
|
||||
timestamp: format!("t{idx}"),
|
||||
item: RolloutItem::TurnContext(TurnContextItem {
|
||||
turn_id: Some(format!("turn-{idx}")),
|
||||
trace_id: None,
|
||||
cwd: cwd.into(),
|
||||
current_date: None,
|
||||
timezone: None,
|
||||
approval_policy: codex_protocol::protocol::AskForApproval::OnRequest,
|
||||
sandbox_policy: codex_protocol::protocol::SandboxPolicy::new_read_only_policy(),
|
||||
network: None,
|
||||
model: format!("model-{idx}"),
|
||||
personality: None,
|
||||
collaboration_mode: None,
|
||||
realtime_active: None,
|
||||
effort: None,
|
||||
summary: codex_protocol::config_types::ReasoningSummary::Auto,
|
||||
user_instructions: None,
|
||||
developer_instructions: None,
|
||||
final_output_json_schema: None,
|
||||
truncation_policy: None,
|
||||
}),
|
||||
};
|
||||
writeln!(encoder, "{}", serde_json::to_string(&line)?)?;
|
||||
}
|
||||
let _ = encoder.finish()?;
|
||||
|
||||
let turn_context = read_latest_turn_context(path.as_path())
|
||||
.await?
|
||||
.expect("latest turn context");
|
||||
assert_eq!(turn_context.cwd, Path::new("/tmp/second"));
|
||||
assert_eq!(turn_context.model, "model-1");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO(jif) fix
|
||||
// #[tokio::test]
|
||||
// async fn list_threads_prefers_state_db_when_available() {
|
||||
@@ -918,7 +962,7 @@ async fn test_get_thread_contents() {
|
||||
.unwrap();
|
||||
let path = &page.items[0].path;
|
||||
|
||||
let content = tokio::fs::read_to_string(path).await.unwrap();
|
||||
let content = read_rollout_text(path).unwrap();
|
||||
|
||||
// Page equality (single item)
|
||||
let expected_path = home
|
||||
|
||||
@@ -148,3 +148,4 @@ rand = { workspace = true }
|
||||
serial_test = { workspace = true }
|
||||
vt100 = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
zstd = { workspace = true }
|
||||
|
||||
@@ -37,6 +37,7 @@ use codex_core::config_loader::format_config_error_with_source;
|
||||
use codex_core::default_client::set_default_client_residency_requirement;
|
||||
use codex_core::format_exec_policy_error_with_source;
|
||||
use codex_core::path_utils;
|
||||
use codex_core::read_latest_turn_context;
|
||||
use codex_core::read_session_meta_line;
|
||||
use codex_core::state_db::get_state_db;
|
||||
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
|
||||
@@ -45,9 +46,7 @@ use codex_protocol::config_types::AltScreenMode;
|
||||
use codex_protocol::config_types::SandboxMode;
|
||||
use codex_protocol::config_types::WindowsSandboxLevel;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use codex_state::log_db;
|
||||
use codex_terminal_detection::Multiplexer;
|
||||
use codex_terminal_detection::terminal_info;
|
||||
@@ -1381,12 +1380,13 @@ pub(crate) async fn read_session_cwd(
|
||||
|
||||
// Prefer the latest TurnContext cwd so resume/fork reflects the most recent
|
||||
// session directory (for the changed-cwd prompt) when DB data is unavailable.
|
||||
// When the rollout cannot be read, fall back to session metadata.
|
||||
// The alternative would be mutating the SessionMeta line when the session cwd
|
||||
// changes, but the rollout is an append-only JSONL log and rewriting the head
|
||||
// would be error-prone.
|
||||
let path = path?;
|
||||
if let Some(cwd) = read_latest_turn_context(path).await.map(|item| item.cwd) {
|
||||
return Some(cwd);
|
||||
if let Ok(Some(turn_context)) = read_latest_turn_context(path).await {
|
||||
return Some(turn_context.cwd);
|
||||
}
|
||||
match read_session_meta_line(path).await {
|
||||
Ok(meta_line) => Some(meta_line.meta.cwd),
|
||||
@@ -1415,26 +1415,11 @@ pub(crate) async fn read_session_model(
|
||||
}
|
||||
|
||||
let path = path?;
|
||||
read_latest_turn_context(path).await.map(|item| item.model)
|
||||
}
|
||||
|
||||
async fn read_latest_turn_context(path: &Path) -> Option<TurnContextItem> {
|
||||
let text = tokio::fs::read_to_string(path).await.ok()?;
|
||||
for line in text.lines().rev() {
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let Ok(rollout_line) = serde_json::from_str::<RolloutLine>(trimmed) else {
|
||||
continue;
|
||||
};
|
||||
if let RolloutItem::TurnContext(item) = rollout_line.item {
|
||||
return Some(item);
|
||||
}
|
||||
if let Ok(Some(turn_context)) = read_latest_turn_context(path).await {
|
||||
return Some(turn_context.model);
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub(crate) fn cwds_differ(current_cwd: &Path, session_cwd: &Path) -> bool {
|
||||
match (
|
||||
path_utils::normalize_for_path_comparison(current_cwd),
|
||||
@@ -2044,6 +2029,45 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_session_cwd_prefers_latest_turn_context_from_compressed_rollout()
|
||||
-> std::io::Result<()> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let config = build_config(&temp_dir).await?;
|
||||
let first = temp_dir.path().join("first");
|
||||
let second = temp_dir.path().join("second");
|
||||
std::fs::create_dir_all(&first)?;
|
||||
std::fs::create_dir_all(&second)?;
|
||||
|
||||
let rollout_path = temp_dir.path().join("rollout.jsonl.zst");
|
||||
let mut encoder =
|
||||
zstd::stream::write::Encoder::new(std::fs::File::create(&rollout_path)?, 0)?;
|
||||
for line in [
|
||||
RolloutLine {
|
||||
timestamp: "t0".to_string(),
|
||||
item: RolloutItem::TurnContext(build_turn_context(&config, first)),
|
||||
},
|
||||
RolloutLine {
|
||||
timestamp: "t1".to_string(),
|
||||
item: RolloutItem::TurnContext(build_turn_context(&config, second.clone())),
|
||||
},
|
||||
] {
|
||||
use std::io::Write;
|
||||
writeln!(
|
||||
encoder,
|
||||
"{}",
|
||||
serde_json::to_string(&line).expect("serialize rollout")
|
||||
)?;
|
||||
}
|
||||
let _ = encoder.finish()?;
|
||||
|
||||
let cwd = read_session_cwd(&config, ThreadId::new(), Some(&rollout_path))
|
||||
.await
|
||||
.expect("expected cwd");
|
||||
assert_eq!(cwd, second);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn should_prompt_when_meta_matches_current_but_latest_turn_differs() -> std::io::Result<()>
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user