Compare commits

...

6 Commits

Author SHA1 Message Date
jif-oai
8c640ae021 Merge branch 'main' into jif/compressor 2026-05-29 12:54:27 +02:00
jif-oai
8ad185c741 make it stronger 2026-05-29 11:51:30 +01:00
jif-oai
7b74a61000 again 2026-05-29 09:52:29 +01:00
jif-oai
988badb1f9 box fix 2026-05-28 19:04:34 +01:00
jif-oai
6d523bbaec fixes 2026-05-28 18:45:20 +01:00
jif-oai
d97837a9a0 feat: compressor 2026-05-28 18:24:28 +01:00
20 changed files with 1192 additions and 104 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -3596,6 +3596,7 @@ dependencies = [
"tokio",
"tracing",
"uuid",
"zstd 0.13.3",
]
[[package]]

View File

@@ -485,6 +485,9 @@
"js_repl_tools_only": {
"type": "boolean"
},
"local_thread_store_compression": {
"type": "boolean"
},
"memories": {
"type": "boolean"
},
@@ -4579,6 +4582,9 @@
"js_repl_tools_only": {
"type": "boolean"
},
"local_thread_store_compression": {
"type": "boolean"
},
"memories": {
"type": "boolean"
},

View File

@@ -22,6 +22,7 @@ use codex_core_plugins::PluginsManager;
use codex_exec_server::EnvironmentManager;
use codex_extension_api::ExtensionRegistry;
use codex_extension_api::empty_extension_registry;
use codex_features::Feature;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_model_provider::create_model_provider;
@@ -230,10 +231,18 @@ pub fn thread_store_from_config(
state_db: Option<StateDbHandle>,
) -> Arc<dyn ThreadStore> {
match &config.experimental_thread_store {
ThreadStoreConfig::Local => Arc::new(LocalThreadStore::new(
LocalThreadStoreConfig::from_config(config),
state_db,
)),
ThreadStoreConfig::Local => {
if config
.features
.enabled(Feature::LocalThreadStoreCompression)
{
codex_rollout::spawn_rollout_compression_worker(config.codex_home.to_path_buf());
}
Arc::new(LocalThreadStore::new(
LocalThreadStoreConfig::from_config(config),
state_db,
))
}
ThreadStoreConfig::InMemory { id } => InMemoryThreadStore::for_id(id),
}
}

View File

@@ -114,6 +114,8 @@ pub enum Feature {
RuntimeMetrics,
/// Enable startup memory extraction and file-backed memory consolidation.
MemoryTool,
/// Compress cold local thread-store rollout files.
LocalThreadStoreCompression,
/// Enable the Chronicle sidecar for passive screen-context memories.
Chronicle,
/// Append additional AGENTS.md guidance to user instructions.
@@ -831,6 +833,12 @@ pub const FEATURES: &[FeatureSpec] = &[
},
default_enabled: false,
},
FeatureSpec {
id: Feature::LocalThreadStoreCompression,
key: "local_thread_store_compression",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::Chronicle,
key: "chronicle",

View File

@@ -44,6 +44,7 @@ tokio = { workspace = true, features = [
] }
tracing = { workspace = true }
uuid = { workspace = true }
zstd = { workspace = true }
[dev-dependencies]
pretty_assertions = { workspace = true }

View File

@@ -0,0 +1,641 @@
use std::ffi::OsStr;
use std::fs::File;
use std::fs::FileTimes;
use std::fs::Permissions;
use std::io;
use std::io::BufRead;
use std::io::Read;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
#[cfg(unix)]
use std::os::unix::fs::OpenOptionsExt;
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
use tokio::io::AsyncBufReadExt;
use tracing::debug;
use tracing::info;
use tracing::warn;
use crate::ARCHIVED_SESSIONS_SUBDIR;
use crate::SESSIONS_SUBDIR;
/// Suffix appended to a plain rollout file name to form its compressed sibling.
const COMPRESSED_SUFFIX: &str = ".zst";
/// Suffix of in-flight temp files; stale ones are removed by `cleanup_stale_temps`.
const TEMP_SUFFIX: &str = ".tmp";
/// zstd compression level handed to the encoder in `encode_zstd`.
const COMPRESSION_LEVEL: i32 = 3;
/// Rollouts modified more recently than this (7 days) are left uncompressed.
const MIN_ROLLOUT_AGE: Duration = Duration::from_secs(7 * 24 * 60 * 60);
/// A lock file whose mtime is at least this old (6 hours) is treated as abandoned.
const GLOBAL_LOCK_STALE_AFTER: Duration = Duration::from_secs(6 * 60 * 60);
/// Temp files at least this old are deleted when the worker starts.
const TEMP_FILE_STALE_AFTER: Duration = GLOBAL_LOCK_STALE_AFTER;
/// Wall-clock budget (5 hours) after which the worker stops scanning.
// NOTE(review): presumably kept below GLOBAL_LOCK_STALE_AFTER so a live worker's
// lock is never judged stale — confirm.
const WORKER_MAX_RUNTIME: Duration = Duration::from_secs(5 * 60 * 60);
/// Name of the lock file created under `<codex_home>/.tmp`.
const LOCK_FILE_NAME: &str = "rollout-compression.lock";
/// Per-process counter mixed into temp file names by `temp_path_for`.
static TEMP_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Kicks off a detached background task that compresses cold local rollout files.
///
/// The task is spawned on the current Tokio runtime and never awaited, so the
/// caller is not blocked. If no runtime is available the function only logs a
/// warning. Any error returned by the worker is logged rather than propagated.
/// Overlapping runs against the same `codex_home` are prevented by a lock file
/// that the worker creates under `codex_home`.
pub fn spawn_rollout_compression_worker(codex_home: PathBuf) {
    match tokio::runtime::Handle::try_current() {
        Ok(runtime) => {
            // The join handle is dropped on purpose: the task is fire-and-forget.
            runtime.spawn(async move {
                let outcome = run_rollout_compression_worker(codex_home.clone()).await;
                if let Err(err) = outcome {
                    warn!(
                        "rollout compression worker failed for {}: {err}",
                        codex_home.display()
                    );
                }
            });
        }
        Err(_) => {
            warn!(
                "failed to start rollout compression worker for {}: no Tokio runtime",
                codex_home.display()
            );
        }
    }
}
async fn run_rollout_compression_worker(codex_home: PathBuf) -> io::Result<()> {
let Some(_lock) = CompressionLock::try_acquire(codex_home.as_path())? else {
debug!(
"rollout compression worker already running for {}",
codex_home.display()
);
return Ok(());
};
let started_at = Instant::now();
cleanup_stale_temps(codex_home.as_path()).await?;
let mut stats = CompressionStats::default();
for root in [
codex_home.join(SESSIONS_SUBDIR),
codex_home.join(ARCHIVED_SESSIONS_SUBDIR),
] {
if started_at.elapsed() >= WORKER_MAX_RUNTIME {
break;
}
compress_rollouts_in_root(root.as_path(), started_at, &mut stats).await?;
}
info!(
"rollout compression worker finished: scanned={}, compressed={}, skipped={}, failed={}",
stats.scanned, stats.compressed, stats.skipped, stats.failed
);
Ok(())
}
/// Returns the modification time of whichever representation of the rollout exists.
///
/// Yields `Ok(None)` when neither the plain nor the compressed file is present,
/// or when the platform cannot report a modification time.
pub(crate) async fn file_modified_time(path: &Path) -> io::Result<Option<time::OffsetDateTime>> {
    match existing_rollout_path(path).await {
        None => Ok(None),
        Some(resolved) => {
            let metadata = tokio::fs::metadata(resolved).await?;
            Ok(metadata.modified().ok().map(time::OffsetDateTime::from))
        }
    }
}
/// Opens a line reader over a rollout stored either as plain `.jsonl` or as `.jsonl.zst`.
///
/// The path is resolved and opened up to two times; each attempt re-checks which
/// representation is currently on disk. If both attempts end in `NotFound` (for
/// example because the file was swapped between the existence check and the
/// open), the opposite representation of `path` is opened directly. Any other
/// error is returned immediately.
pub async fn open_rollout_line_reader(path: &Path) -> io::Result<RolloutLineReader> {
    for _attempt in 0..2 {
        match open_rollout_line_reader_once(path).await {
            Err(err) if err.kind() == io::ErrorKind::NotFound => continue,
            outcome => return outcome,
        }
    }
    open_rollout_line_reader_alternate(path).await
}
/// Async wrapper that runs [`materialize_rollout_for_append_blocking`] on the
/// blocking thread pool and returns the plain path to append to.
///
/// A failure to join the blocking task is reported as an `io::Error`.
pub(crate) async fn materialize_rollout_for_append(path: &Path) -> io::Result<PathBuf> {
    let owned = path.to_path_buf();
    let joined =
        tokio::task::spawn_blocking(move || materialize_rollout_for_append_blocking(&owned)).await;
    match joined {
        Ok(result) => result,
        Err(join_err) => Err(io::Error::other(join_err)),
    }
}
/// Ensures a plain `.jsonl` rollout exists at the plain form of `path` so it can be appended to.
///
/// Returns the plain path in every success case:
/// - the plain file already exists: nothing is done;
/// - neither representation exists: nothing is done (the caller may create it);
/// - only the compressed file exists: it is decompressed into a temp file with
///   the same permissions, linked into place, and the compressed file is removed.
///
/// # Errors
/// Returns any I/O error from reading, decompressing, syncing, linking or
/// removing files. The temp file is removed on failure.
pub(crate) fn materialize_rollout_for_append_blocking(path: &Path) -> io::Result<PathBuf> {
    let plain_path = plain_rollout_path(path);
    if plain_path.exists() {
        return Ok(plain_path);
    }
    let compressed_path = compressed_rollout_path(plain_path.as_path());
    if !compressed_path.exists() {
        return Ok(plain_path);
    }
    let temp_path = temp_path_for(plain_path.as_path(), "decompress");
    if let Some(parent) = plain_path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let result = (|| -> io::Result<()> {
        let permissions = std::fs::metadata(compressed_path.as_path())?.permissions();
        let input = File::open(compressed_path.as_path())?;
        let mut decoder = zstd::stream::read::Decoder::new(input)?;
        let mut output = create_file_with_permissions(temp_path.as_path(), &permissions)?;
        io::copy(&mut decoder, &mut output)?;
        // `File::flush` does not push data to disk. The compressed copy is deleted
        // below, so the decompressed bytes must be durable first.
        output.sync_all()?;
        // A hard link never overwrites: if another writer already produced the
        // plain file, keep theirs and discard our temp copy.
        match std::fs::hard_link(temp_path.as_path(), plain_path.as_path()) {
            Ok(()) => {}
            Err(err) if err.kind() == io::ErrorKind::AlreadyExists => {}
            Err(err) => return Err(err),
        }
        let _ = std::fs::remove_file(temp_path.as_path());
        match std::fs::remove_file(compressed_path.as_path()) {
            Ok(()) => {}
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(err),
        }
        Ok(())
    })();
    if result.is_err() {
        // Best-effort cleanup; the original error is what the caller sees.
        let _ = std::fs::remove_file(temp_path.as_path());
    }
    result?;
    Ok(plain_path)
}
/// Maps a rollout path to its compressed form by appending `COMPRESSED_SUFFIX`
/// to the file name.
///
/// A path that already ends in `.jsonl.zst` is returned unchanged. When the path
/// has no file name component, `rollout.jsonl` is used as the base name.
pub(crate) fn compressed_rollout_path(path: &Path) -> PathBuf {
    if is_compressed_rollout_path(path) {
        path.to_path_buf()
    } else {
        let mut compressed_name = match path.file_name() {
            Some(name) => name.to_os_string(),
            None => OsStr::new("rollout.jsonl").to_os_string(),
        };
        compressed_name.push(COMPRESSED_SUFFIX);
        path.with_file_name(compressed_name)
    }
}
/// Maps a rollout path to its plain form by dropping a trailing `COMPRESSED_SUFFIX`
/// from the file name.
///
/// The path is returned unchanged when it has no file name, when the file name
/// is not valid UTF-8, or when it does not end with the suffix.
pub(crate) fn plain_rollout_path(path: &Path) -> PathBuf {
    let stripped = path
        .file_name()
        .and_then(OsStr::to_str)
        .and_then(|name| name.strip_suffix(COMPRESSED_SUFFIX));
    match stripped {
        Some(plain_name) => path.with_file_name(plain_name),
        None => path.to_path_buf(),
    }
}
/// Reports whether the file name of `path` ends with `.jsonl.zst`.
///
/// Paths without a file name, or with a non-UTF-8 file name, are not considered
/// compressed rollouts.
pub(crate) fn is_compressed_rollout_path(path: &Path) -> bool {
    match path.file_name().and_then(OsStr::to_str) {
        Some(name) => name.ends_with(".jsonl.zst"),
        None => false,
    }
}
/// Reports whether `name` is a rollout file name, plain or compressed.
pub(crate) fn is_rollout_file_name(name: &str) -> bool {
    parse_rollout_file_name(name).is_some()
}

/// Returns the plain rollout file name for `name`, or `None` if it is not a rollout.
///
/// One trailing `COMPRESSED_SUFFIX` is dropped if present; what remains must
/// start with `rollout-` and end with `.jsonl`.
pub(crate) fn parse_rollout_file_name(name: &str) -> Option<&str> {
    let plain = match name.strip_suffix(COMPRESSED_SUFFIX) {
        Some(without_suffix) => without_suffix,
        None => name,
    };
    let looks_like_rollout = plain.starts_with("rollout-") && plain.ends_with(".jsonl");
    looks_like_rollout.then_some(plain)
}
/// Returns `true` when `path` is a compressed rollout whose plain counterpart
/// also exists on disk, so listings can ignore the compressed duplicate.
pub(crate) fn should_skip_compressed_sibling(path: &Path) -> bool {
    if !is_compressed_rollout_path(path) {
        return false;
    }
    plain_rollout_path(path).exists()
}
/// Reader that yields rollout records one line at a time; obtained from
/// [`open_rollout_line_reader`].
pub struct RolloutLineReader {
    inner: RolloutLineReaderInner,
}

/// Backing implementation of a [`RolloutLineReader`].
enum RolloutLineReaderInner {
    /// Plain file read through Tokio's async line iterator.
    Plain(tokio::io::Lines<tokio::io::BufReader<tokio::fs::File>>),
    /// Synchronous line iterator driven on the blocking pool. The slot is `None`
    /// while a read is in flight.
    Blocking(Option<BlockingLineReader>),
}

impl RolloutLineReader {
    /// Returns the next line of the rollout, or `Ok(None)` at end of input.
    ///
    /// For the blocking variant the reader is moved into a blocking task for the
    /// duration of the read and put back afterwards. If it is not in its slot,
    /// an error with the message "compressed rollout reader is busy" is returned.
    /// The reader is only restored after the blocking task has been joined
    /// successfully.
    pub async fn next_line(&mut self) -> io::Result<Option<String>> {
        match &mut self.inner {
            RolloutLineReaderInner::Plain(lines) => lines.next_line().await,
            RolloutLineReaderInner::Blocking(slot) => {
                let mut reader = slot
                    .take()
                    .ok_or_else(|| io::Error::other("compressed rollout reader is busy"))?;
                let joined = tokio::task::spawn_blocking(move || {
                    let line = reader.next().transpose();
                    (line, reader)
                })
                .await;
                let (line, reader) = joined.map_err(io::Error::other)?;
                *slot = Some(reader);
                line
            }
        }
    }
}

/// Synchronous line iterator over a boxed byte source.
type BlockingLineReader = std::io::Lines<std::io::BufReader<Box<dyn Read + Send>>>;
/// Counters accumulated over one worker run and reported in its summary log line.
#[derive(Default)]
struct CompressionStats {
// Plain rollout files that were considered for compression.
scanned: usize,
// Files for which compression returned `Ok(true)`.
compressed: usize,
// Files for which compression returned `Ok(false)`.
skipped: usize,
// Files for which compression returned an error.
failed: usize,
}
/// Guard for the on-disk compression lock file; the file is removed when the
/// guard is dropped.
struct CompressionLock {
    path: PathBuf,
}

impl CompressionLock {
    /// Tries to take the lock file at `<codex_home>/.tmp/<LOCK_FILE_NAME>`.
    ///
    /// Returns `Ok(Some(lock))` when the file was created, `Ok(None)` when an
    /// existing lock is still considered live (or another process re-created it
    /// first), and `Err` for any other I/O failure. An existing lock whose
    /// modification time is at least `GLOBAL_LOCK_STALE_AFTER` old is deleted
    /// and creation is retried once.
    fn try_acquire(codex_home: &Path) -> io::Result<Option<Self>> {
        let lock_dir = codex_home.join(".tmp");
        std::fs::create_dir_all(&lock_dir)?;
        let path = lock_dir.join(LOCK_FILE_NAME);

        match create_lock_file(&path) {
            Ok(()) => return Ok(Some(Self { path })),
            Err(err) if err.kind() != io::ErrorKind::AlreadyExists => return Err(err),
            Err(_) => {}
        }

        // If the age cannot be determined, the lock is treated as live.
        let lock_age = std::fs::metadata(&path)
            .and_then(|metadata| metadata.modified())
            .ok()
            .and_then(|modified| SystemTime::now().duration_since(modified).ok());
        let is_stale = matches!(lock_age, Some(age) if age >= GLOBAL_LOCK_STALE_AFTER);
        if !is_stale {
            return Ok(None);
        }

        if let Err(err) = std::fs::remove_file(&path) {
            if err.kind() != io::ErrorKind::NotFound {
                return Err(err);
            }
        }

        match create_lock_file(&path) {
            Ok(()) => Ok(Some(Self { path })),
            Err(err) if err.kind() == io::ErrorKind::AlreadyExists => Ok(None),
            Err(err) => Err(err),
        }
    }
}

impl Drop for CompressionLock {
    /// Deletes the lock file, ignoring any error.
    fn drop(&mut self) {
        std::fs::remove_file(&self.path).ok();
    }
}
/// Creates the lock file at `path` and records the current process id and start
/// time in it.
///
/// The file is opened with `create_new`, so the call fails with
/// `AlreadyExists` when the file is already present.
fn create_lock_file(path: &Path) -> io::Result<()> {
    let mut lock_file = File::options().write(true).create_new(true).open(path)?;
    let pid = std::process::id();
    let started_at = SystemTime::now();
    writeln!(lock_file, "pid={} started_at={:?}", pid, started_at)
}
/// Walks `root` depth-first and compresses every cold plain rollout file found.
///
/// Only regular files named `rollout-*.jsonl` are considered. Per-file results
/// are recorded in `stats`. The walk stops as soon as `WORKER_MAX_RUNTIME` has
/// elapsed since `started_at`. A missing `root` is not an error.
///
/// Directory-level failures (opening a directory, advancing its iterator,
/// reading an entry's file type) are logged and skipped so that one unreadable
/// directory does not abort the rest of the pass.
async fn compress_rollouts_in_root(
    root: &Path,
    started_at: Instant,
    stats: &mut CompressionStats,
) -> io::Result<()> {
    if !tokio::fs::try_exists(root).await.unwrap_or(false) {
        return Ok(());
    }
    let mut stack = vec![root.to_path_buf()];
    while let Some(dir) = stack.pop() {
        if started_at.elapsed() >= WORKER_MAX_RUNTIME {
            break;
        }
        let mut read_dir = match tokio::fs::read_dir(dir.as_path()).await {
            Ok(read_dir) => read_dir,
            Err(err) => {
                warn!(
                    "failed to read rollout compression directory {}: {err}",
                    dir.display()
                );
                continue;
            }
        };
        loop {
            let entry = match read_dir.next_entry().await {
                Ok(Some(entry)) => entry,
                Ok(None) => break,
                Err(err) => {
                    // Give up on this directory only; retrying the same iterator
                    // could keep returning the same error.
                    warn!(
                        "failed to read rollout compression directory entry in {}: {err}",
                        dir.display()
                    );
                    break;
                }
            };
            if started_at.elapsed() >= WORKER_MAX_RUNTIME {
                break;
            }
            let path = entry.path();
            let file_type = match entry.file_type().await {
                Ok(file_type) => file_type,
                Err(err) => {
                    warn!(
                        "failed to read rollout compression file type {}: {err}",
                        path.display()
                    );
                    continue;
                }
            };
            if file_type.is_dir() {
                stack.push(path);
                continue;
            }
            if !file_type.is_file() {
                continue;
            }
            let Some(file_name) = path.file_name().and_then(OsStr::to_str) else {
                continue;
            };
            // Already-compressed `.jsonl.zst` files and temp files do not match.
            if !file_name.starts_with("rollout-") || !file_name.ends_with(".jsonl") {
                continue;
            }
            stats.scanned = stats.scanned.saturating_add(1);
            match compress_rollout_if_cold(path.as_path()).await {
                Ok(true) => stats.compressed = stats.compressed.saturating_add(1),
                Ok(false) => stats.skipped = stats.skipped.saturating_add(1),
                Err(err) => {
                    stats.failed = stats.failed.saturating_add(1);
                    warn!("failed to compress rollout {}: {err}", path.display());
                }
            }
        }
    }
    Ok(())
}
/// Async wrapper that runs [`compress_rollout_if_cold_blocking`] on the blocking
/// thread pool.
///
/// A failure to join the blocking task is reported as an `io::Error`.
async fn compress_rollout_if_cold(path: &Path) -> io::Result<bool> {
    let owned = path.to_path_buf();
    let joined =
        tokio::task::spawn_blocking(move || compress_rollout_if_cold_blocking(&owned)).await;
    match joined {
        Ok(result) => result,
        Err(join_err) => Err(io::Error::other(join_err)),
    }
}
/// Replaces the plain rollout at `path` with a zstd-compressed sibling if the file is cold.
///
/// Returns `Ok(true)` when the plain file was compressed and removed, `Ok(false)`
/// when the file was skipped (not cold, or changed while being compressed), and
/// `Err` on I/O failure. The temp file is cleaned up unless the result is `Ok(true)`.
fn compress_rollout_if_cold_blocking(path: &Path) -> io::Result<bool> {
// Snapshot length, mtime and permissions; `None` means missing, not a regular
// file, or younger than `MIN_ROLLOUT_AGE`.
let before = match cold_file_state(path)? {
Some(state) => state,
None => return Ok(false),
};
let compressed_path = compressed_rollout_path(path);
let temp_path = temp_path_for(compressed_path.as_path(), "compress");
let result = (|| {
// Write the compressed copy to a temp file and confirm it decodes end to end.
encode_zstd(path, temp_path.as_path(), &before.permissions)?;
verify_zstd(temp_path.as_path())?;
// Bail out if the source changed while it was being compressed.
if !same_file_state(path, &before)? {
return Ok(false);
}
// Carry the source's mtime over to the compressed file.
set_modified_time(temp_path.as_path(), before.modified)?;
// Drop any compressed sibling left over from an earlier run before renaming.
match std::fs::remove_file(compressed_path.as_path()) {
Ok(()) => {}
Err(err) if err.kind() == io::ErrorKind::NotFound => {}
Err(err) => return Err(err),
}
std::fs::rename(temp_path.as_path(), compressed_path.as_path())?;
// Re-check right before deleting the source; if it changed, discard the
// compressed file and keep the plain one.
if !same_file_state(path, &before)? {
let _ = std::fs::remove_file(compressed_path.as_path());
return Ok(false);
}
std::fs::remove_file(path)?;
Ok(true)
})();
// On skip or error the temp file may still exist; remove it best-effort.
if !matches!(result, Ok(true)) {
let _ = std::fs::remove_file(temp_path.as_path());
}
result
}
/// Snapshot of a rollout file's metadata, used to detect changes made while it
/// is being compressed.
struct FileState {
// File length in bytes.
len: u64,
// Last modification time.
modified: SystemTime,
// Permissions, also applied to the compressed output.
permissions: Permissions,
}
fn cold_file_state(path: &Path) -> io::Result<Option<FileState>> {
let metadata = match std::fs::metadata(path) {
Ok(metadata) => metadata,
Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(None),
Err(err) => return Err(err),
};
if !metadata.is_file() {
return Ok(None);
}
let modified = metadata.modified()?;
let age = SystemTime::now()
.duration_since(modified)
.unwrap_or(Duration::ZERO);
if age < MIN_ROLLOUT_AGE {
return Ok(None);
}
Ok(Some(FileState {
len: metadata.len(),
modified,
permissions: metadata.permissions(),
}))
}
/// Checks whether `path` still has the length, modification time and
/// permissions recorded in `expected`.
///
/// A missing file yields `Ok(false)`; other metadata errors are returned.
fn same_file_state(path: &Path, expected: &FileState) -> io::Result<bool> {
    let metadata = match std::fs::metadata(path) {
        Ok(metadata) => metadata,
        Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(false),
        Err(err) => return Err(err),
    };
    if metadata.len() != expected.len {
        return Ok(false);
    }
    if metadata.modified()? != expected.modified {
        return Ok(false);
    }
    Ok(metadata.permissions() == expected.permissions)
}
/// Compresses `source` into a newly created file at `temp_path` using zstd at
/// `COMPRESSION_LEVEL`.
///
/// The output file is created with `permissions` and must not already exist.
/// After the zstd frame is finished the file is synced to disk, because the
/// caller deletes `source` once the compressed copy is in place.
///
/// # Errors
/// Returns any I/O error from opening, creating, compressing or syncing.
fn encode_zstd(source: &Path, temp_path: &Path, permissions: &Permissions) -> io::Result<()> {
    let mut input = File::open(source)?;
    let output = create_file_with_permissions(temp_path, permissions)?;
    let mut encoder = zstd::stream::write::Encoder::new(output, COMPRESSION_LEVEL)?;
    io::copy(&mut input, &mut encoder)?;
    // `finish` writes the frame epilogue and hands the file back; fsync it so the
    // compressed bytes are durable before the original is removed.
    let output = encoder.finish()?;
    output.sync_all()?;
    Ok(())
}
/// Creates a new write-only file at `path` carrying `permissions` (Unix).
///
/// The mode bits are passed at creation time and then applied again through
/// `set_permissions`. Fails with `AlreadyExists` if the file is already present.
#[cfg(unix)]
fn create_file_with_permissions(path: &Path, permissions: &Permissions) -> io::Result<File> {
    let mut options = File::options();
    options
        .write(true)
        .create_new(true)
        .mode(permissions.mode() & 0o7777);
    let created = options.open(path)?;
    created.set_permissions(permissions.clone())?;
    Ok(created)
}
/// Creates a new write-only file at `path` and applies `permissions` to it
/// (non-Unix platforms).
///
/// Fails with `AlreadyExists` if the file is already present.
#[cfg(not(unix))]
fn create_file_with_permissions(path: &Path, permissions: &Permissions) -> io::Result<File> {
    let mut options = File::options();
    options.write(true).create_new(true);
    let created = options.open(path)?;
    created.set_permissions(permissions.clone())?;
    Ok(created)
}
/// Decodes the zstd file at `path` from start to finish, discarding the output.
///
/// Succeeds only if the whole stream decompresses without error; the decoded
/// contents are not compared against anything.
fn verify_zstd(path: &Path) -> io::Result<()> {
    let mut decoder = zstd::stream::read::Decoder::new(File::open(path)?)?;
    io::copy(&mut decoder, &mut io::sink()).map(|_| ())
}
/// Sets the modification time of the file at `path` to `modified`.
///
/// The file is opened read-only for this.
// NOTE(review): presumably read-only so that files created with read-only
// permissions can still be opened — confirm this also works on non-Unix targets.
fn set_modified_time(path: &Path, modified: SystemTime) -> io::Result<()> {
    let handle = File::options().read(true).open(path)?;
    handle.set_times(FileTimes::new().set_modified(modified))
}
/// Removes stale temp files from the sessions tree and then the archived
/// sessions tree under `codex_home`, stopping at the first error.
async fn cleanup_stale_temps(codex_home: &Path) -> io::Result<()> {
    let sessions_root = codex_home.join(SESSIONS_SUBDIR);
    let archived_root = codex_home.join(ARCHIVED_SESSIONS_SUBDIR);
    cleanup_stale_temps_in_root(&sessions_root).await?;
    cleanup_stale_temps_in_root(&archived_root).await?;
    Ok(())
}
/// Walks `root` depth-first and deletes regular files whose name ends with
/// `TEMP_SUFFIX` and whose modification time is at least `TEMP_FILE_STALE_AFTER` old.
///
/// Files whose age cannot be determined are kept. A missing `root` is not an
/// error. Directory-level failures (opening a directory, advancing its iterator,
/// reading an entry's file type) and failed removals are logged and skipped, so
/// a single unreadable directory does not prevent the rest of the cleanup.
async fn cleanup_stale_temps_in_root(root: &Path) -> io::Result<()> {
    if !tokio::fs::try_exists(root).await.unwrap_or(false) {
        return Ok(());
    }
    let mut stack = vec![root.to_path_buf()];
    while let Some(dir) = stack.pop() {
        let mut read_dir = match tokio::fs::read_dir(dir.as_path()).await {
            Ok(read_dir) => read_dir,
            Err(err) => {
                warn!(
                    "failed to read rollout temp cleanup directory {}: {err}",
                    dir.display()
                );
                continue;
            }
        };
        loop {
            let entry = match read_dir.next_entry().await {
                Ok(Some(entry)) => entry,
                Ok(None) => break,
                Err(err) => {
                    // Give up on this directory only; retrying the same iterator
                    // could keep returning the same error.
                    warn!(
                        "failed to read rollout temp cleanup directory entry in {}: {err}",
                        dir.display()
                    );
                    break;
                }
            };
            let path = entry.path();
            let file_type = match entry.file_type().await {
                Ok(file_type) => file_type,
                Err(err) => {
                    warn!(
                        "failed to read rollout temp cleanup file type {}: {err}",
                        path.display()
                    );
                    continue;
                }
            };
            if file_type.is_dir() {
                stack.push(path);
                continue;
            }
            let is_temp_file = file_type.is_file()
                && path
                    .file_name()
                    .and_then(OsStr::to_str)
                    .is_some_and(|name| name.ends_with(TEMP_SUFFIX));
            if !is_temp_file {
                continue;
            }
            let stale = entry
                .metadata()
                .await
                .ok()
                .and_then(|metadata| metadata.modified().ok())
                .and_then(|modified| SystemTime::now().duration_since(modified).ok())
                .is_some_and(|age| age >= TEMP_FILE_STALE_AFTER);
            if !stale {
                continue;
            }
            match tokio::fs::remove_file(path.as_path()).await {
                Ok(()) => {}
                Err(err) if err.kind() == io::ErrorKind::NotFound => {}
                Err(err) => warn!(
                    "failed to remove stale rollout temp {}: {err}",
                    path.display()
                ),
            }
        }
    }
    Ok(())
}
/// Resolves `path` to the rollout representation that currently exists on disk.
///
/// The plain `.jsonl` file is checked first and wins over its `.jsonl.zst`
/// sibling. Returns `None` when neither exists; an error while checking
/// existence is treated as "does not exist".
pub async fn existing_rollout_path(path: &Path) -> Option<PathBuf> {
    let plain_path = plain_rollout_path(path);
    let compressed_path = compressed_rollout_path(plain_path.as_path());
    for candidate in [plain_path, compressed_path] {
        if tokio::fs::try_exists(&candidate).await.unwrap_or(false) {
            return Some(candidate);
        }
    }
    None
}
/// Makes a single attempt to open a line reader for `path`.
///
/// The path is first resolved to whichever representation exists; if neither
/// does, `path` itself is used, so the open reports the resulting error.
async fn open_rollout_line_reader_once(path: &Path) -> io::Result<RolloutLineReader> {
    let resolved = match existing_rollout_path(path).await {
        Some(found) => found,
        None => path.to_path_buf(),
    };
    if !is_compressed_rollout_path(&resolved) {
        let file = tokio::fs::File::open(resolved).await?;
        let lines = tokio::io::BufReader::new(file).lines();
        return Ok(RolloutLineReader {
            inner: RolloutLineReaderInner::Plain(lines),
        });
    }
    open_compressed_reader(resolved).await
}
/// Opens the opposite representation of `path`: the plain file when `path` names
/// a compressed rollout, the compressed file otherwise.
async fn open_rollout_line_reader_alternate(path: &Path) -> io::Result<RolloutLineReader> {
    let plain_path = plain_rollout_path(path);
    if !is_compressed_rollout_path(path) {
        let compressed_path = compressed_rollout_path(plain_path.as_path());
        return open_compressed_reader(compressed_path).await;
    }
    let file = tokio::fs::File::open(plain_path).await?;
    let lines = tokio::io::BufReader::new(file).lines();
    Ok(RolloutLineReader {
        inner: RolloutLineReaderInner::Plain(lines),
    })
}
async fn open_compressed_reader(path: PathBuf) -> io::Result<RolloutLineReader> {
let reader = tokio::task::spawn_blocking(move || {
let input = File::open(path.as_path())?;
let decoder = zstd::stream::read::Decoder::new(input)?;
Ok::<_, io::Error>(io::BufReader::new(Box::new(decoder) as Box<dyn Read + Send>).lines())
})
.await
.map_err(io::Error::other)??;
Ok(RolloutLineReader {
inner: RolloutLineReaderInner::Blocking(Some(reader)),
})
}
/// Builds a sibling temp path for `path` of the form
/// `<file name>.<operation>.<pid>.<counter>.tmp`.
///
/// The counter comes from `TEMP_COUNTER`, so successive calls within one
/// process produce different names. `rollout` is used as the base name when
/// `path` has no file name component.
fn temp_path_for(path: &Path, operation: &str) -> PathBuf {
    let counter = TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
    let mut temp_name = match path.file_name() {
        Some(name) => name.to_os_string(),
        None => OsStr::new("rollout").to_os_string(),
    };
    temp_name.push(format!(".{operation}.{}.{counter}.tmp", std::process::id()));
    path.with_file_name(temp_name)
}
#[cfg(test)]
#[path = "compression_tests.rs"]
mod tests;

View File

@@ -0,0 +1,281 @@
use std::fs;
use std::fs::FileTimes;
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
use std::time::Duration;
use std::time::SystemTime;
use codex_protocol::ThreadId;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::UserMessageEvent;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use uuid::Uuid;
use super::*;
use crate::RolloutRecorder;
use crate::append_rollout_item_to_path;
/// Loading a rollout through its plain path works when only the compressed file
/// exists, and loading does not change which representation is on disk.
#[tokio::test]
async fn load_rollout_items_reads_compressed_rollout() -> anyhow::Result<()> {
let home = TempDir::new()?;
let uuid = Uuid::from_u128(1);
let thread_id = ThreadId::from_string(&uuid.to_string())?;
let rollout_path = rollout_path(home.path(), "2025-01-03T12-00-00", uuid);
write_rollout(&rollout_path, thread_id, "hello compressed")?;
compress_now(&rollout_path)?;
let (items, loaded_thread_id, parse_errors) =
RolloutRecorder::load_rollout_items(&rollout_path).await?;
assert_eq!(loaded_thread_id, Some(thread_id));
assert_eq!(parse_errors, 0);
// Session meta line plus one user message.
assert_eq!(items.len(), 2);
// Reading must leave the compressed file in place.
assert!(!rollout_path.exists());
assert!(compressed_rollout_path(&rollout_path).exists());
Ok(())
}
/// Appending to a compressed rollout restores the plain file, removes the
/// compressed one, and keeps the earlier records.
#[tokio::test]
async fn append_rollout_item_materializes_compressed_rollout() -> anyhow::Result<()> {
let home = TempDir::new()?;
let uuid = Uuid::from_u128(2);
let thread_id = ThreadId::from_string(&uuid.to_string())?;
let rollout_path = rollout_path(home.path(), "2025-01-03T12-00-00", uuid);
write_rollout(&rollout_path, thread_id, "hello before append")?;
compress_now(&rollout_path)?;
append_rollout_item_to_path(
&rollout_path,
&RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "hello after append".to_string(),
..Default::default()
})),
)
.await?;
assert!(rollout_path.exists());
assert!(!compressed_rollout_path(&rollout_path).exists());
let (items, loaded_thread_id, parse_errors) =
RolloutRecorder::load_rollout_items(&rollout_path).await?;
assert_eq!(loaded_thread_id, Some(thread_id));
assert_eq!(parse_errors, 0);
// Two original records plus the appended one.
assert_eq!(items.len(), 3);
Ok(())
}
/// One worker run compresses old rollouts in both the sessions and archived
/// trees, leaves a fresh rollout alone, and removes only the stale temp file.
#[tokio::test]
async fn worker_compresses_old_active_and_archived_rollouts() -> anyhow::Result<()> {
let home = TempDir::new()?;
// Old rollout under sessions/.
let active_uuid = Uuid::from_u128(3);
let active_id = ThreadId::from_string(&active_uuid.to_string())?;
let active_path = rollout_path(home.path(), "2025-01-03T12-00-00", active_uuid);
write_rollout(&active_path, active_id, "old active")?;
set_old_mtime(&active_path)?;
// Old rollout under archived_sessions/.
let archived_uuid = Uuid::from_u128(4);
let archived_id = ThreadId::from_string(&archived_uuid.to_string())?;
let archived_path = home
.path()
.join("archived_sessions")
.join(format!("rollout-2025-01-04T12-00-00-{archived_uuid}.jsonl"));
write_rollout(&archived_path, archived_id, "old archived")?;
set_old_mtime(&archived_path)?;
// Freshly written rollout that must stay uncompressed.
let fresh_uuid = Uuid::from_u128(5);
let fresh_id = ThreadId::from_string(&fresh_uuid.to_string())?;
let fresh_path = rollout_path(home.path(), "2025-01-05T12-00-00", fresh_uuid);
write_rollout(&fresh_path, fresh_id, "fresh active")?;
// One stale and one fresh temp file next to the active rollout.
let stale_temp = active_path.with_file_name("rollout-stale.jsonl.zst.tmp");
fs::write(&stale_temp, "stale temp")?;
set_old_mtime(&stale_temp)?;
let fresh_temp = active_path.with_file_name("rollout-fresh.jsonl.zst.tmp");
fs::write(&fresh_temp, "fresh temp")?;
run_rollout_compression_worker(home.path().to_path_buf()).await?;
assert!(!active_path.exists());
assert!(compressed_rollout_path(&active_path).exists());
assert!(!archived_path.exists());
assert!(compressed_rollout_path(&archived_path).exists());
assert!(fresh_path.exists());
assert!(!compressed_rollout_path(&fresh_path).exists());
assert!(!stale_temp.exists());
assert!(fresh_temp.exists());
Ok(())
}
/// Mode 0o600 on a rollout survives compression and the later materialization
/// triggered by an append.
#[cfg(unix)]
#[tokio::test]
async fn compression_preserves_rollout_permissions() -> anyhow::Result<()> {
let home = TempDir::new()?;
let uuid = Uuid::from_u128(6);
let thread_id = ThreadId::from_string(&uuid.to_string())?;
let rollout_path = rollout_path(home.path(), "2025-01-03T12-00-00", uuid);
write_rollout(&rollout_path, thread_id, "restricted transcript")?;
fs::set_permissions(&rollout_path, fs::Permissions::from_mode(0o600))?;
set_old_mtime(&rollout_path)?;
run_rollout_compression_worker(home.path().to_path_buf()).await?;
let compressed_path = compressed_rollout_path(&rollout_path);
assert!(!rollout_path.exists());
assert_eq!(
fs::metadata(&compressed_path)?.permissions().mode() & 0o777,
0o600
);
// Appending forces the compressed file back into a plain one.
append_rollout_item_to_path(
&rollout_path,
&RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "materialize restricted transcript".to_string(),
..Default::default()
})),
)
.await?;
assert!(rollout_path.exists());
assert!(!compressed_path.exists());
assert_eq!(
fs::metadata(&rollout_path)?.permissions().mode() & 0o777,
0o600
);
Ok(())
}
/// A read-only (0o400) rollout is still compressed, and the compressed file
/// keeps both the mode and the source's modification time.
#[cfg(unix)]
#[tokio::test]
async fn compression_preserves_read_only_rollout_permissions() -> anyhow::Result<()> {
let home = TempDir::new()?;
let uuid = Uuid::from_u128(7);
let thread_id = ThreadId::from_string(&uuid.to_string())?;
let rollout_path = rollout_path(home.path(), "2025-01-03T12-00-00", uuid);
write_rollout(&rollout_path, thread_id, "read-only transcript")?;
// The mtime is set before the file becomes read-only because the helper opens
// the file for writing.
set_old_mtime(&rollout_path)?;
fs::set_permissions(&rollout_path, fs::Permissions::from_mode(0o400))?;
let source_modified = fs::metadata(&rollout_path)?.modified()?;
run_rollout_compression_worker(home.path().to_path_buf()).await?;
let compressed_path = compressed_rollout_path(&rollout_path);
let compressed_metadata = fs::metadata(&compressed_path)?;
assert!(!rollout_path.exists());
assert_eq!(compressed_metadata.permissions().mode() & 0o777, 0o400);
assert_eq!(compressed_metadata.modified()?, source_modified);
Ok(())
}
/// Looking a thread up by id returns the compressed path when only the
/// compressed rollout exists, and `None` for an id that is not a UUID.
#[tokio::test]
async fn find_thread_path_by_id_handles_compressed_rollout_filenames() -> anyhow::Result<()> {
let home = TempDir::new()?;
let uuid = Uuid::from_u128(8);
let thread_id = ThreadId::from_string(&uuid.to_string())?;
let rollout_path = rollout_path(home.path(), "2025-01-03T12-00-00", uuid);
write_rollout(&rollout_path, thread_id, "compressed filename lookup")?;
compress_now(&rollout_path)?;
let compressed_path = compressed_rollout_path(&rollout_path);
assert_eq!(
crate::find_thread_path_by_id_str(home.path(), &uuid.to_string(), None).await?,
Some(compressed_path)
);
assert_eq!(
crate::find_thread_path_by_id_str(home.path(), "not-a-uuid", None).await?,
None
);
Ok(())
}
/// A leftover compression temp file whose name contains the thread id must not
/// be returned by the id lookup.
#[tokio::test]
async fn find_thread_path_by_id_ignores_compression_temp_matches() -> anyhow::Result<()> {
let home = TempDir::new()?;
let uuid = Uuid::from_u128(9);
let thread_id = ThreadId::from_string(&uuid.to_string())?;
// Same shape as the names produced by `temp_path_for` for a compress operation.
let temp_path = rollout_path(home.path(), "2025-01-03T12-00-00", uuid).with_file_name(format!(
"rollout-2025-01-03T12-00-00-{uuid}.jsonl.zst.compress.1.0.tmp"
));
write_rollout(&temp_path, thread_id, "temporary file should not resolve")?;
assert_eq!(
crate::find_thread_path_by_id_str(home.path(), &uuid.to_string(), None).await?,
None
);
Ok(())
}
/// Builds `<home>/sessions/2025/01/03/rollout-<ts>-<uuid>.jsonl`; the day
/// directory is fixed regardless of `ts`.
fn rollout_path(home: &std::path::Path, ts: &str, uuid: Uuid) -> std::path::PathBuf {
    let file_name = format!("rollout-{ts}-{uuid}.jsonl");
    let day_dir = home.join("sessions/2025/01/03");
    day_dir.join(file_name)
}
/// Writes a two-record rollout (session meta followed by one user message) to
/// `path`, creating parent directories as needed. Each record is one JSON line.
fn write_rollout(path: &std::path::Path, thread_id: ThreadId, message: &str) -> anyhow::Result<()> {
    let parent = path.parent().expect("rollout path should have parent");
    fs::create_dir_all(parent)?;

    let meta = SessionMeta {
        id: thread_id,
        forked_from_id: None,
        timestamp: "2025-01-03T12:00:00Z".to_string(),
        cwd: parent.to_path_buf(),
        originator: "test".to_string(),
        cli_version: "test".to_string(),
        source: SessionSource::Cli,
        thread_source: None,
        agent_path: None,
        agent_nickname: None,
        agent_role: None,
        model_provider: None,
        base_instructions: None,
        dynamic_tools: None,
        memory_mode: None,
    };
    let meta_record = RolloutLine {
        timestamp: "2025-01-03T12:00:00Z".to_string(),
        item: RolloutItem::SessionMeta(SessionMetaLine { meta, git: None }),
    };
    let message_record = RolloutLine {
        timestamp: "2025-01-03T12:00:01Z".to_string(),
        item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
            message: message.to_string(),
            ..Default::default()
        })),
    };

    // Serialize every record before touching the file, one line per record.
    let mut jsonl = String::new();
    for record in [&meta_record, &message_record] {
        jsonl.push_str(&serde_json::to_string(record)?);
        jsonl.push('\n');
    }
    fs::write(path, jsonl)?;
    Ok(())
}
/// Compresses `path` straight to its `.zst` sibling with the source's
/// permissions and deletes the plain file, without any age check.
fn compress_now(path: &std::path::Path) -> anyhow::Result<()> {
    let source_permissions = fs::metadata(path)?.permissions();
    let target = compressed_rollout_path(path);
    encode_zstd(path, &target, &source_permissions)?;
    fs::remove_file(path)?;
    Ok(())
}
/// Moves the modification time of `path` eight days into the past. The file is
/// opened for writing, so it must be writable.
fn set_old_mtime(path: &std::path::Path) -> anyhow::Result<()> {
    let eight_days = Duration::from_secs(8 * 24 * 60 * 60);
    let old = SystemTime::now()
        .checked_sub(eight_days)
        .expect("old timestamp should be representable");
    let handle = fs::OpenOptions::new().write(true).open(path)?;
    handle.set_times(FileTimes::new().set_modified(old))?;
    Ok(())
}

View File

@@ -4,6 +4,7 @@ use std::sync::LazyLock;
use codex_protocol::protocol::SessionSource;
pub(crate) mod compression;
pub(crate) mod config;
pub(crate) mod list;
pub(crate) mod metadata;
@@ -32,6 +33,10 @@ pub static INTERACTIVE_SESSION_SOURCES: LazyLock<Vec<SessionSource>> = LazyLock:
});
pub use codex_protocol::protocol::SessionMeta;
pub use compression::RolloutLineReader;
pub use compression::existing_rollout_path;
pub use compression::open_rollout_line_reader;
pub use compression::spawn_rollout_compression_worker;
pub use config::Config;
pub use config::RolloutConfig;
pub use config::RolloutConfigView;

View File

@@ -18,6 +18,7 @@ use uuid::Uuid;
use super::ARCHIVED_SESSIONS_SUBDIR;
use super::SESSIONS_SUBDIR;
use super::compression;
use crate::protocol::EventMsg;
use crate::state_db;
use codex_file_search as file_search;
@@ -895,7 +896,9 @@ async fn collect_flat_rollout_files(
let Some(name_str) = file_name.to_str() else {
continue;
};
if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
if !compression::is_rollout_file_name(name_str)
|| compression::should_skip_compressed_sibling(entry.path().as_path())
{
continue;
}
let Some((ts, id)) = parse_timestamp_uuid_from_filename(name_str) else {
@@ -915,7 +918,9 @@ async fn collect_rollout_day_files(
day_path: &Path,
) -> io::Result<Vec<(OffsetDateTime, Uuid, PathBuf)>> {
let mut day_files = collect_files(day_path, |name_str, path| {
if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
if !compression::is_rollout_file_name(name_str)
|| compression::should_skip_compressed_sibling(path)
{
return None;
}
@@ -928,7 +933,8 @@ async fn collect_rollout_day_files(
}
pub(crate) fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uuid)> {
// Expected: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl
// Expected: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl[.zst]
let name = compression::parse_rollout_file_name(name)?;
let core = name.strip_prefix("rollout-")?.strip_suffix(".jsonl")?;
// Scan from the right for a '-' such that the suffix parses as a UUID.
@@ -985,7 +991,9 @@ async fn collect_flat_files_by_updated_at(
let Some(name_str) = file_name.to_str() else {
continue;
};
if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
if !compression::is_rollout_file_name(name_str)
|| compression::should_skip_compressed_sibling(entry.path().as_path())
{
continue;
}
let Some((_ts, id)) = parse_timestamp_uuid_from_filename(name_str) else {
@@ -1073,11 +1081,7 @@ impl<'a> ProviderMatcher<'a> {
}
async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTailSummary> {
use tokio::io::AsyncBufReadExt;
let file = tokio::fs::File::open(path).await?;
let reader = tokio::io::BufReader::new(file);
let mut lines = reader.lines();
let mut lines = compression::open_rollout_line_reader(path).await?;
let mut summary = HeadTailSummary::default();
let mut lines_scanned = 0usize;
@@ -1163,11 +1167,7 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
/// Read up to `HEAD_RECORD_LIMIT` records from the start of the rollout file at `path`.
/// This should be enough to produce a summary including the session meta line.
pub async fn read_head_for_summary(path: &Path) -> io::Result<Vec<serde_json::Value>> {
use tokio::io::AsyncBufReadExt;
let file = tokio::fs::File::open(path).await?;
let reader = tokio::io::BufReader::new(file);
let mut lines = reader.lines();
let mut lines = compression::open_rollout_line_reader(path).await?;
let mut head = Vec::new();
while head.len() < HEAD_RECORD_LIMIT {
@@ -1251,13 +1251,9 @@ pub async fn read_session_meta_line(path: &Path) -> io::Result<SessionMetaLine>
}
/// Returns the modification time of `path`, passed through `truncate_to_millis`.
///
/// Yields `Ok(None)` when no modification time is available and propagates the
/// I/O error from the underlying lookup.
// NOTE(review): this listing appears to interleave two bodies from a diff, one after the other:
// a direct `tokio::fs::metadata` lookup, then a version delegating to
// `compression::file_modified_time`. Confirm which one is current.
async fn file_modified_time(path: &Path) -> io::Result<Option<OffsetDateTime>> {
let meta = tokio::fs::metadata(path).await?;
let modified = meta.modified().ok();
let Some(modified) = modified else {
return Ok(None);
};
let dt = OffsetDateTime::from(modified);
Ok(truncate_to_millis(dt))
// Delegating variant: the helper's optional timestamp is truncated the same way.
Ok(compression::file_modified_time(path)
.await?
.and_then(truncate_to_millis))
}
fn format_rfc3339(dt: OffsetDateTime) -> Option<String> {
@@ -1298,16 +1294,18 @@ async fn find_thread_path_by_id_str_in_subdir(
.await
{
Ok(Some(db_path)) => {
if tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
match read_session_meta_line(&db_path).await {
if let Some(existing_db_path) =
compression::existing_rollout_path(db_path.as_path()).await
{
match read_session_meta_line(&existing_db_path).await {
Ok(meta_line) if meta_line.meta.id == thread_id => {
return Ok(Some(db_path));
return Ok(Some(existing_db_path));
}
Ok(meta_line) => {
tracing::error!(
"state db returned rollout path for thread {id_str} but file belongs to thread {}: {}",
meta_line.meta.id,
db_path.display()
existing_db_path.display()
);
tracing::warn!(
"state db discrepancy during find_thread_path_by_id_str_in_subdir: mismatched_db_path"
@@ -1321,9 +1319,9 @@ async fn find_thread_path_by_id_str_in_subdir(
Err(err) => {
tracing::debug!(
"state db returned rollout path for thread {id_str} that could not be verified: {}: {err}",
db_path.display()
existing_db_path.display()
);
unverified_db_path = Some(db_path);
unverified_db_path = Some(existing_db_path);
}
}
} else {
@@ -1366,10 +1364,27 @@ async fn find_thread_path_by_id_str_in_subdir(
..Default::default()
};
let results = file_search::run(id_str, vec![root], options, /*cancel_flag*/ None)
.map_err(|e| io::Error::other(format!("file search failed: {e}")))?;
let results = file_search::run(
id_str,
vec![root.clone()],
options,
/*cancel_flag*/ None,
)
.map_err(|e| io::Error::other(format!("file search failed: {e}")))?;
let found = results.matches.into_iter().next().map(|m| m.full_path());
let found = match results
.matches
.into_iter()
.map(|m| m.full_path())
.find(|path| {
path.file_name()
.and_then(OsStr::to_str)
.is_some_and(|name| !name.ends_with(".tmp"))
&& !compression::should_skip_compressed_sibling(path)
}) {
Some(path) => Some(path),
None => find_rollout_path_by_id_from_filenames(root.as_path(), id_str).await?,
};
if let Some(found_path) = found.as_ref() {
tracing::debug!("state db missing rollout path for thread {id_str}");
tracing::warn!(
@@ -1394,6 +1409,45 @@ async fn find_thread_path_by_id_str_in_subdir(
Ok(found.or(unverified_db_path))
}
async fn find_rollout_path_by_id_from_filenames(
root: &Path,
id_str: &str,
) -> io::Result<Option<PathBuf>> {
let Ok(target) = Uuid::parse_str(id_str) else {
return Ok(None);
};
let mut stack = vec![root.to_path_buf()];
while let Some(dir) = stack.pop() {
let mut read_dir = match tokio::fs::read_dir(dir.as_path()).await {
Ok(read_dir) => read_dir,
Err(err) if err.kind() == io::ErrorKind::NotFound => continue,
Err(err) => return Err(err),
};
while let Some(entry) = read_dir.next_entry().await? {
let path = entry.path();
let file_type = entry.file_type().await?;
if file_type.is_dir() {
stack.push(path);
continue;
}
if !file_type.is_file() || compression::should_skip_compressed_sibling(path.as_path()) {
continue;
}
let file_name = entry.file_name();
let Some(name) = file_name.to_str() else {
continue;
};
let Some((_ts, id)) = parse_timestamp_uuid_from_filename(name) else {
continue;
};
if id == target {
return Ok(Some(path));
}
}
}
Ok(None)
}
/// Locate a recorded thread rollout file by its UUID string using the existing
/// paginated listing implementation. Returns `Ok(Some(path))` if found, `Ok(None)` if not present
/// or the id is invalid.

View File

@@ -1,5 +1,6 @@
use crate::ARCHIVED_SESSIONS_SUBDIR;
use crate::SESSIONS_SUBDIR;
use crate::compression;
use crate::list::parse_timestamp_uuid_from_filename;
use crate::recorder::RolloutRecorder;
use crate::state_db::normalize_cwd_for_state_db;
@@ -27,8 +28,6 @@ use std::path::PathBuf;
use tracing::info;
use tracing::warn;
const ROLLOUT_PREFIX: &str = "rollout-";
const ROLLOUT_SUFFIX: &str = ".jsonl";
const BACKFILL_BATCH_SIZE: usize = 200;
#[cfg(not(test))]
const BACKFILL_LEASE_SECONDS: i64 = 900;
@@ -78,9 +77,7 @@ pub fn builder_from_items(
}
let file_name = rollout_path.file_name()?.to_str()?;
if !file_name.starts_with(ROLLOUT_PREFIX) || !file_name.ends_with(ROLLOUT_SUFFIX) {
return None;
}
let file_name = compression::parse_rollout_file_name(file_name)?;
let (created_ts, uuid) = parse_timestamp_uuid_from_filename(file_name)?;
let created_at =
DateTime::<Utc>::from_timestamp(created_ts.unix_timestamp(), 0)?.with_nanosecond(0)?;
@@ -364,9 +361,8 @@ fn backfill_watermark_for_path(codex_home: &Path, path: &Path) -> String {
}
/// Best-effort modification time of `path` as a UTC `DateTime`.
///
/// Any failure along the way (metadata lookup, missing modification time, or an
/// out-of-range timestamp conversion) collapses to `None`.
// NOTE(review): this listing appears to interleave two bodies from a diff, one after the other:
// a direct `tokio::fs::metadata` lookup, then a version delegating to
// `compression::file_modified_time`. Confirm which one is current.
async fn file_modified_time_utc(path: &Path) -> Option<DateTime<Utc>> {
let modified = tokio::fs::metadata(path).await.ok()?.modified().ok()?;
let updated_at: DateTime<Utc> = modified.into();
Some(updated_at)
// `.ok()??` first discards the I/O error, then unwraps the optional timestamp.
let modified = compression::file_modified_time(path).await.ok()??;
DateTime::<Utc>::from_timestamp(modified.unix_timestamp(), modified.nanosecond())
}
fn parse_timestamp_to_utc(ts: &str) -> Option<DateTime<Utc>> {
@@ -425,7 +421,9 @@ async fn collect_rollout_paths(root: &Path) -> std::io::Result<Vec<PathBuf>> {
let Some(name) = file_name.to_str() else {
continue;
};
if name.starts_with(ROLLOUT_PREFIX) && name.ends_with(ROLLOUT_SUFFIX) {
if compression::is_rollout_file_name(name)
&& !compression::should_skip_compressed_sibling(path.as_path())
{
paths.push(path);
}
}

View File

@@ -30,6 +30,7 @@ use tracing::warn;
use super::ARCHIVED_SESSIONS_SUBDIR;
use super::SESSIONS_SUBDIR;
use super::compression;
use super::list::Cursor;
use super::list::SortDirection;
use super::list::ThreadItem;
@@ -694,17 +695,20 @@ impl RolloutRecorder {
(None, Some(log_file_info), path, Some(session_meta))
}
RolloutRecorderParams::Resume { path } => (
Some(
tokio::fs::OpenOptions::new()
.append(true)
.open(&path)
.await?,
),
None,
path,
None,
),
RolloutRecorderParams::Resume { path } => {
let path = compression::materialize_rollout_for_append(path.as_path()).await?;
(
Some(
tokio::fs::OpenOptions::new()
.append(true)
.open(&path)
.await?,
),
None,
path,
None,
)
}
};
// Clone the cwd for the spawned task to collect git info asynchronously
@@ -815,19 +819,17 @@ impl RolloutRecorder {
path: &Path,
) -> std::io::Result<(Vec<RolloutItem>, Option<ThreadId>, usize)> {
trace!("Resuming rollout from {path:?}");
let text = tokio::fs::read_to_string(path).await?;
if text.trim().is_empty() {
return Err(IoError::other("empty session file"));
}
let mut items: Vec<RolloutItem> = Vec::new();
let mut thread_id: Option<ThreadId> = None;
let mut parse_errors = 0usize;
for line in text.lines() {
let mut reader = compression::open_rollout_line_reader(path).await?;
let mut saw_non_empty_line = false;
while let Some(line) = reader.next_line().await? {
if line.trim().is_empty() {
continue;
}
let mut v: Value = match serde_json::from_str(line) {
saw_non_empty_line = true;
let mut v: Value = match serde_json::from_str(&line) {
Ok(v) => v,
Err(e) => {
warn!("failed to parse line as JSON: {line:?}, error: {e}");
@@ -870,6 +872,9 @@ impl RolloutRecorder {
}
}
}
if !saw_non_empty_line {
return Err(IoError::other("empty session file"));
}
tracing::debug!(
"Resumed rollout with {} items, thread ID: {:?}, parse errors: {}",
@@ -1357,6 +1362,7 @@ fn precompute_log_file_info(
}
fn open_log_file(path: &Path) -> std::io::Result<File> {
let path = compression::materialize_rollout_for_append_blocking(path)?;
let Some(parent) = path.parent() else {
return Err(IoError::other(format!(
"rollout path has no parent: {}",
@@ -1616,6 +1622,7 @@ pub async fn append_rollout_item_to_path(
rollout_path: &Path,
item: &RolloutItem,
) -> std::io::Result<()> {
let rollout_path = compression::materialize_rollout_for_append(rollout_path).await?;
let file = tokio::fs::OpenOptions::new()
.append(true)
.open(rollout_path)

View File

@@ -11,11 +11,11 @@ use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use regex::Regex;
use regex::RegexBuilder;
use tokio::io::AsyncBufReadExt;
use tokio::process::Command;
use super::ARCHIVED_SESSIONS_SUBDIR;
use super::SESSIONS_SUBDIR;
use super::compression;
const MATCH_CONTEXT_BEFORE_CHARS: usize = 48;
const MATCH_CONTEXT_AFTER_CHARS: usize = 96;
@@ -32,7 +32,10 @@ pub async fn search_rollout_paths(
SESSIONS_SUBDIR
});
let search_term = json_escaped_search_term(search_term)?;
ripgrep_rollout_paths(rg_command, root.as_path(), search_term.as_str()).await
let mut matches =
ripgrep_rollout_paths(rg_command, root.as_path(), search_term.as_str()).await?;
matches.extend(scan_compressed_rollout_paths(root.as_path(), search_term.as_str()).await?);
Ok(matches)
}
async fn ripgrep_rollout_paths(
@@ -107,7 +110,11 @@ async fn scan_rollout_paths(root: &Path, search_term: &str) -> io::Result<HashSe
continue;
}
if !file_type.is_file()
|| path.extension().and_then(|extension| extension.to_str()) != Some("jsonl")
|| !path
.file_name()
.and_then(|name| name.to_str())
.is_some_and(compression::is_rollout_file_name)
|| compression::should_skip_compressed_sibling(path.as_path())
{
continue;
}
@@ -121,8 +128,7 @@ async fn scan_rollout_paths(root: &Path, search_term: &str) -> io::Result<HashSe
}
async fn rollout_contains(path: &Path, search_term: &Regex) -> io::Result<bool> {
let file = tokio::fs::File::open(path).await?;
let mut lines = tokio::io::BufReader::new(file).lines();
let mut lines = compression::open_rollout_line_reader(path).await?;
while let Some(line) = lines.next_line().await? {
if search_term.is_match(line.as_str()) {
return Ok(true);
@@ -135,8 +141,7 @@ pub async fn first_rollout_content_match_snippet(
path: &Path,
search_term: &str,
) -> io::Result<Option<String>> {
let file = tokio::fs::File::open(path).await?;
let mut lines = tokio::io::BufReader::new(file).lines();
let mut lines = compression::open_rollout_line_reader(path).await?;
let json_search_term = case_insensitive_literal_regex(json_escaped_search_term(search_term)?)?;
let search_term = case_insensitive_literal_regex(search_term)?;
while let Some(line) = lines.next_line().await? {
@@ -149,6 +154,42 @@ pub async fn first_rollout_content_match_snippet(
Ok(None)
}
/// Recursively scans `root` and collects compressed rollout files that contain `search_term`.
///
/// `search_term` is compiled with `case_insensitive_literal_regex` and each candidate is
/// checked with `rollout_contains`. A directory reported as `NotFound` is skipped; every
/// other I/O error, including one raised while reading a candidate, is returned to the caller.
async fn scan_compressed_rollout_paths(
    root: &Path,
    search_term: &str,
) -> io::Result<HashSet<PathBuf>> {
    let mut found = HashSet::new();
    let mut pending = vec![root.to_path_buf()];
    let pattern = case_insensitive_literal_regex(search_term)?;

    while let Some(current) = pending.pop() {
        let mut listing = match tokio::fs::read_dir(current).await {
            Ok(listing) => listing,
            Err(err) => {
                if err.kind() == io::ErrorKind::NotFound {
                    continue;
                }
                return Err(err);
            }
        };
        while let Some(entry) = listing.next_entry().await? {
            let candidate = entry.path();
            let kind = entry.file_type().await?;
            if kind.is_dir() {
                pending.push(candidate);
                continue;
            }
            // Only regular, compressed rollout files that are not flagged to be skipped
            // are opened and searched.
            let is_searchable = kind.is_file()
                && compression::is_compressed_rollout_path(candidate.as_path())
                && !compression::should_skip_compressed_sibling(candidate.as_path());
            if is_searchable && rollout_contains(candidate.as_path(), &pattern).await? {
                found.insert(candidate);
            }
        }
    }
    Ok(found)
}
fn json_escaped_search_term(search_term: &str) -> io::Result<String> {
let serialized = serde_json::to_string(search_term).map_err(io::Error::other)?;
Ok(serialized[1..serialized.len() - 1].to_string())

View File

@@ -412,10 +412,11 @@ pub async fn list_threads_db(
Ok(mut page) => {
let mut valid_items = Vec::with_capacity(page.items.len());
for item in page.items {
if tokio::fs::try_exists(&item.rollout_path)
.await
.unwrap_or(false)
if let Some(existing_path) =
crate::compression::existing_rollout_path(item.rollout_path.as_path()).await
{
let mut item = item;
item.rollout_path = existing_path;
valid_items.push(item);
} else {
warn!(

View File

@@ -72,10 +72,11 @@ pub(super) fn matching_rollout_file_name(
),
});
};
let required_suffix = format!("{thread_id}.jsonl");
if file_name
.to_string_lossy()
.ends_with(required_suffix.as_str())
let required_plain_suffix = format!("{thread_id}.jsonl");
let required_compressed_suffix = format!("{required_plain_suffix}.zst");
let file_name_str = file_name.to_string_lossy();
if file_name_str.ends_with(required_plain_suffix.as_str())
|| file_name_str.ends_with(required_compressed_suffix.as_str())
{
Ok(file_name)
} else {
@@ -186,6 +187,7 @@ pub(super) fn git_info_from_parts(
fn thread_id_from_rollout_path(path: &Path) -> Option<ThreadId> {
let file_name = path.file_name()?.to_str()?;
let file_name = file_name.strip_suffix(".zst").unwrap_or(file_name);
let stem = file_name.strip_suffix(".jsonl")?;
if stem.len() < 37 {
return None;

View File

@@ -156,9 +156,9 @@ async fn sync_materialized_rollout_path(
thread_id: ThreadId,
) -> ThreadStoreResult<()> {
let rollout_path = rollout_path(store, thread_id).await?;
if !tokio::fs::try_exists(rollout_path.as_path())
if codex_rollout::existing_rollout_path(rollout_path.as_path())
.await
.unwrap_or(false)
.is_none()
{
return Ok(());
}

View File

@@ -84,7 +84,7 @@ async fn sqlite_rollout_path_can_load_history_for_thread(
path: &std::path::Path,
thread_id: codex_protocol::ThreadId,
) -> bool {
if !tokio::fs::try_exists(path).await.unwrap_or(false) {
if codex_rollout::existing_rollout_path(path).await.is_none() {
return false;
}
// SQLite metadata can outlive a moved/recreated rollout path. When history is
@@ -101,7 +101,7 @@ pub(super) async fn read_thread_by_rollout_path(
include_archived: bool,
include_history: bool,
) -> ThreadStoreResult<StoredThread> {
let path = resolve_requested_rollout_path(store, rollout_path)?;
let path = resolve_requested_rollout_path(store, rollout_path).await?;
let mut thread = read_thread_from_rollout_path(store, path).await?;
if !include_archived && thread.archived_at.is_some() {
return Err(ThreadStoreError::InvalidRequest {
@@ -128,7 +128,7 @@ pub(super) async fn read_thread_by_rollout_path(
Ok(thread)
}
fn resolve_requested_rollout_path(
async fn resolve_requested_rollout_path(
store: &LocalThreadStore,
rollout_path: std::path::PathBuf,
) -> ThreadStoreResult<std::path::PathBuf> {
@@ -137,7 +137,15 @@ fn resolve_requested_rollout_path(
} else {
rollout_path
};
std::fs::canonicalize(&path).map_err(|err| ThreadStoreError::InvalidRequest {
let Some(path) = codex_rollout::existing_rollout_path(path.as_path()).await else {
return Err(ThreadStoreError::InvalidRequest {
message: format!(
"failed to resolve rollout path `{}`: file does not exist",
path.display()
),
});
};
std::fs::canonicalize(path.as_path()).map_err(|err| ThreadStoreError::InvalidRequest {
message: format!("failed to resolve rollout path `{}`: {err}", path.display()),
})
}
@@ -166,11 +174,9 @@ async fn resolve_rollout_path(
include_archived: bool,
) -> ThreadStoreResult<Option<std::path::PathBuf>> {
if let Ok(path) = live_writer::rollout_path(store, thread_id).await
&& tokio::fs::try_exists(path.as_path()).await.map_err(|err| {
ThreadStoreError::InvalidRequest {
message: format!("failed to check rollout path for thread id {thread_id}: {err}"),
}
})?
&& codex_rollout::existing_rollout_path(path.as_path())
.await
.is_some()
&& (include_archived || !rollout_path_is_archived(store.config.codex_home.as_path(), &path))
{
return Ok(Some(path));
@@ -280,6 +286,9 @@ async fn stored_thread_from_sqlite_metadata(
.await
.ok()
.map(|meta_line| meta_line.meta);
let rollout_path = codex_rollout::existing_rollout_path(metadata.rollout_path.as_path())
.await
.unwrap_or_else(|| metadata.rollout_path.clone());
let forked_from_id = session_meta.as_ref().and_then(|meta| meta.forked_from_id);
let preview = metadata
.preview
@@ -288,7 +297,7 @@ async fn stored_thread_from_sqlite_metadata(
.unwrap_or_default();
StoredThread {
thread_id: metadata.id,
rollout_path: Some(metadata.rollout_path),
rollout_path: Some(rollout_path),
forked_from_id,
preview,
name,

View File

@@ -68,13 +68,14 @@ pub(super) async fn update_thread_metadata(
if live_writer::rollout_path(store, thread_id).await.is_ok() {
live_writer::persist_thread(store, thread_id).await?;
}
let resolved_rollout_path =
let mut resolved_rollout_path =
resolve_rollout_path(store, thread_id, params.include_archived).await?;
let name = patch.name;
let git_info = patch.git_info;
if let Some(memory_mode) = patch.memory_mode {
apply_thread_memory_mode(resolved_rollout_path.path.as_path(), thread_id, memory_mode)
.await?;
refresh_resolved_rollout_path(&mut resolved_rollout_path).await;
}
let state_db_ctx = store.state_db().await;
@@ -142,6 +143,7 @@ pub(super) async fn update_thread_metadata(
memory_mode.as_deref(),
)
.await?;
refresh_resolved_rollout_path(&mut resolved_rollout_path).await;
apply_thread_git_info(store, thread_id, sha, branch, origin_url).await?;
}
@@ -172,6 +174,12 @@ pub(super) async fn update_thread_metadata(
Ok(thread)
}
/// Re-resolves `resolved.path` through `codex_rollout::existing_rollout_path`.
///
/// When the lookup returns a path, it replaces `resolved.path`; when it returns `None`,
/// the stored path is left untouched.
async fn refresh_resolved_rollout_path(resolved: &mut ResolvedRolloutPath) {
    let lookup = codex_rollout::existing_rollout_path(resolved.path.as_path()).await;
    let Some(current) = lookup else {
        return;
    };
    resolved.path = current;
}
async fn apply_metadata_update(
store: &LocalThreadStore,
thread_id: ThreadId,

View File

@@ -1086,16 +1086,15 @@ See the Codex keymap documentation for supported actions and examples."
#[cfg(not(debug_assertions))]
let pre_loop_exit_reason = if let Some(latest_version) = upgrade_version {
let control = app
.handle_event(
tui,
&mut app_server,
AppEvent::InsertHistoryCell(Box::new(UpdateAvailableHistoryCell::new(
latest_version,
crate::update_action::get_update_action(),
))),
)
.await?;
let control = Box::pin(app.handle_event(
tui,
&mut app_server,
AppEvent::InsertHistoryCell(Box::new(UpdateAvailableHistoryCell::new(
latest_version,
crate::update_action::get_update_action(),
))),
))
.await?;
match control {
AppRunControl::Continue => None,
AppRunControl::Exit(exit_reason) => Some(exit_reason),
@@ -1112,7 +1111,7 @@ See the Codex keymap documentation for supported actions and examples."
loop {
let control = select! {
Some(event) = app_event_rx.recv() => {
match app.handle_event(tui, &mut app_server, event).await {
match Box::pin(app.handle_event(tui, &mut app_server, event)).await {
Ok(control) => control,
Err(err) => break Err(err),
}

View File

@@ -1231,6 +1231,25 @@ terminal_resize_reflow_max_rows = 9000
Ok(())
}
/// Rebuilding the config for the current cwd must pick up `config.toml` from `codex_home`
/// rather than reuse the in-memory config.
#[tokio::test]
async fn rebuild_config_for_resume_or_fallback_reloads_same_cwd_config() -> Result<()> {
    let mut app = make_test_app().await;

    // Point the app at a fresh home directory holding a config with a recognizable model.
    let home_dir = tempdir()?;
    app.config.codex_home = home_dir.path().to_path_buf().abs();
    let config_file = home_dir.path().join("config.toml");
    std::fs::write(config_file, "model = \"freshly-loaded-model\"")?;

    // Resume into the same cwd the app is already using.
    let cwd = app.config.cwd.clone();
    let rebuilt = app
        .rebuild_config_for_resume_or_fallback(&cwd, cwd.to_path_buf())
        .await?;

    assert_eq!(rebuilt.model.as_deref(), Some("freshly-loaded-model"));
    Ok(())
}
#[tokio::test]
async fn rebuild_config_for_resume_or_fallback_uses_current_config_on_same_cwd_error()
-> Result<()> {

View File

@@ -14,11 +14,11 @@ use crate::cwd_prompt::CwdPromptOutcome;
use crate::cwd_prompt::CwdSelection;
use crate::tui::Tui;
use codex_protocol::ThreadId;
use codex_rollout::open_rollout_line_reader;
use codex_state::StateRuntime;
use codex_utils_path as path_utils;
use serde::Deserialize;
use serde_json::Value;
use tokio::io::AsyncBufReadExt;
#[derive(Default)]
struct RolloutResumeState {
@@ -142,13 +142,11 @@ pub(crate) fn cwds_differ(current_cwd: &Path, session_cwd: &Path) -> bool {
}
async fn read_rollout_resume_state(path: &Path) -> io::Result<RolloutResumeState> {
let file = tokio::fs::File::open(path).await?;
let reader = tokio::io::BufReader::new(file);
let mut lines = reader.lines();
let mut reader = open_rollout_line_reader(path).await?;
let mut state = RolloutResumeState::default();
let mut saw_record = false;
while let Some(line) = lines.next_line().await? {
while let Some(line) = reader.next_line().await? {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;