mirror of
https://github.com/openai/codex.git
synced 2026-06-04 04:12:03 +00:00
Compare commits
6 Commits
fcoury/imp
...
jif/compre
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8c640ae021 | ||
|
|
8ad185c741 | ||
|
|
7b74a61000 | ||
|
|
988badb1f9 | ||
|
|
6d523bbaec | ||
|
|
d97837a9a0 |
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -3596,6 +3596,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tracing",
|
||||
"uuid",
|
||||
"zstd 0.13.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -485,6 +485,9 @@
|
||||
"js_repl_tools_only": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"local_thread_store_compression": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"memories": {
|
||||
"type": "boolean"
|
||||
},
|
||||
@@ -4579,6 +4582,9 @@
|
||||
"js_repl_tools_only": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"local_thread_store_compression": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"memories": {
|
||||
"type": "boolean"
|
||||
},
|
||||
|
||||
@@ -22,6 +22,7 @@ use codex_core_plugins::PluginsManager;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_extension_api::ExtensionRegistry;
|
||||
use codex_extension_api::empty_extension_registry;
|
||||
use codex_features::Feature;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_model_provider::create_model_provider;
|
||||
@@ -230,10 +231,18 @@ pub fn thread_store_from_config(
|
||||
state_db: Option<StateDbHandle>,
|
||||
) -> Arc<dyn ThreadStore> {
|
||||
match &config.experimental_thread_store {
|
||||
ThreadStoreConfig::Local => Arc::new(LocalThreadStore::new(
|
||||
LocalThreadStoreConfig::from_config(config),
|
||||
state_db,
|
||||
)),
|
||||
ThreadStoreConfig::Local => {
|
||||
if config
|
||||
.features
|
||||
.enabled(Feature::LocalThreadStoreCompression)
|
||||
{
|
||||
codex_rollout::spawn_rollout_compression_worker(config.codex_home.to_path_buf());
|
||||
}
|
||||
Arc::new(LocalThreadStore::new(
|
||||
LocalThreadStoreConfig::from_config(config),
|
||||
state_db,
|
||||
))
|
||||
}
|
||||
ThreadStoreConfig::InMemory { id } => InMemoryThreadStore::for_id(id),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -114,6 +114,8 @@ pub enum Feature {
|
||||
RuntimeMetrics,
|
||||
/// Enable startup memory extraction and file-backed memory consolidation.
|
||||
MemoryTool,
|
||||
/// Compress cold local thread-store rollout files.
|
||||
LocalThreadStoreCompression,
|
||||
/// Enable the Chronicle sidecar for passive screen-context memories.
|
||||
Chronicle,
|
||||
/// Append additional AGENTS.md guidance to user instructions.
|
||||
@@ -831,6 +833,12 @@ pub const FEATURES: &[FeatureSpec] = &[
|
||||
},
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::LocalThreadStoreCompression,
|
||||
key: "local_thread_store_compression",
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::Chronicle,
|
||||
key: "chronicle",
|
||||
|
||||
@@ -44,6 +44,7 @@ tokio = { workspace = true, features = [
|
||||
] }
|
||||
tracing = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
zstd = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = { workspace = true }
|
||||
|
||||
641
codex-rs/rollout/src/compression.rs
Normal file
641
codex-rs/rollout/src/compression.rs
Normal file
@@ -0,0 +1,641 @@
|
||||
use std::ffi::OsStr;
|
||||
use std::fs::File;
|
||||
use std::fs::FileTimes;
|
||||
use std::fs::Permissions;
|
||||
use std::io;
|
||||
use std::io::BufRead;
|
||||
use std::io::Read;
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use std::time::SystemTime;
|
||||
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tracing::debug;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::ARCHIVED_SESSIONS_SUBDIR;
|
||||
use crate::SESSIONS_SUBDIR;
|
||||
|
||||
const COMPRESSED_SUFFIX: &str = ".zst";
|
||||
const TEMP_SUFFIX: &str = ".tmp";
|
||||
const COMPRESSION_LEVEL: i32 = 3;
|
||||
const MIN_ROLLOUT_AGE: Duration = Duration::from_secs(7 * 24 * 60 * 60);
|
||||
const GLOBAL_LOCK_STALE_AFTER: Duration = Duration::from_secs(6 * 60 * 60);
|
||||
const TEMP_FILE_STALE_AFTER: Duration = GLOBAL_LOCK_STALE_AFTER;
|
||||
const WORKER_MAX_RUNTIME: Duration = Duration::from_secs(5 * 60 * 60);
|
||||
const LOCK_FILE_NAME: &str = "rollout-compression.lock";
|
||||
static TEMP_COUNTER: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
/// Starts a best-effort background job that compresses cold local rollout files.
|
||||
///
|
||||
/// The worker is fire-and-forget: failures are logged, startup is not blocked,
|
||||
/// and a process-wide lock under `codex_home` prevents overlapping compression
|
||||
/// runs from the same local store.
|
||||
pub fn spawn_rollout_compression_worker(codex_home: PathBuf) {
|
||||
let Ok(handle) = tokio::runtime::Handle::try_current() else {
|
||||
warn!(
|
||||
"failed to start rollout compression worker for {}: no Tokio runtime",
|
||||
codex_home.display()
|
||||
);
|
||||
return;
|
||||
};
|
||||
handle.spawn(async move {
|
||||
if let Err(err) = run_rollout_compression_worker(codex_home.clone()).await {
|
||||
warn!(
|
||||
"rollout compression worker failed for {}: {err}",
|
||||
codex_home.display()
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn run_rollout_compression_worker(codex_home: PathBuf) -> io::Result<()> {
|
||||
let Some(_lock) = CompressionLock::try_acquire(codex_home.as_path())? else {
|
||||
debug!(
|
||||
"rollout compression worker already running for {}",
|
||||
codex_home.display()
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let started_at = Instant::now();
|
||||
cleanup_stale_temps(codex_home.as_path()).await?;
|
||||
let mut stats = CompressionStats::default();
|
||||
for root in [
|
||||
codex_home.join(SESSIONS_SUBDIR),
|
||||
codex_home.join(ARCHIVED_SESSIONS_SUBDIR),
|
||||
] {
|
||||
if started_at.elapsed() >= WORKER_MAX_RUNTIME {
|
||||
break;
|
||||
}
|
||||
compress_rollouts_in_root(root.as_path(), started_at, &mut stats).await?;
|
||||
}
|
||||
info!(
|
||||
"rollout compression worker finished: scanned={}, compressed={}, skipped={}, failed={}",
|
||||
stats.scanned, stats.compressed, stats.skipped, stats.failed
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn file_modified_time(path: &Path) -> io::Result<Option<time::OffsetDateTime>> {
|
||||
let Some(path) = existing_rollout_path(path).await else {
|
||||
return Ok(None);
|
||||
};
|
||||
let meta = tokio::fs::metadata(path).await?;
|
||||
let modified = meta.modified().ok();
|
||||
Ok(modified.map(time::OffsetDateTime::from))
|
||||
}
|
||||
|
||||
/// Opens a rollout line reader that transparently handles plain `.jsonl` and `.jsonl.zst` files.
|
||||
///
|
||||
/// If the requested path disappears during a compression or decompression transition, this retries
|
||||
/// the matching plain/compressed sibling once so readers do not need to know which representation is
|
||||
/// currently stored on disk.
|
||||
pub async fn open_rollout_line_reader(path: &Path) -> io::Result<RolloutLineReader> {
|
||||
match open_rollout_line_reader_once(path).await {
|
||||
Ok(reader) => Ok(reader),
|
||||
Err(err) if err.kind() == io::ErrorKind::NotFound => {
|
||||
match open_rollout_line_reader_once(path).await {
|
||||
Ok(reader) => Ok(reader),
|
||||
Err(err) if err.kind() == io::ErrorKind::NotFound => {
|
||||
open_rollout_line_reader_alternate(path).await
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn materialize_rollout_for_append(path: &Path) -> io::Result<PathBuf> {
|
||||
let path = path.to_path_buf();
|
||||
tokio::task::spawn_blocking(move || materialize_rollout_for_append_blocking(path.as_path()))
|
||||
.await
|
||||
.map_err(io::Error::other)?
|
||||
}
|
||||
|
||||
pub(crate) fn materialize_rollout_for_append_blocking(path: &Path) -> io::Result<PathBuf> {
|
||||
let plain_path = plain_rollout_path(path);
|
||||
if plain_path.exists() {
|
||||
return Ok(plain_path);
|
||||
}
|
||||
let compressed_path = compressed_rollout_path(plain_path.as_path());
|
||||
if !compressed_path.exists() {
|
||||
return Ok(plain_path);
|
||||
}
|
||||
|
||||
let temp_path = temp_path_for(plain_path.as_path(), "decompress");
|
||||
if let Some(parent) = plain_path.parent() {
|
||||
std::fs::create_dir_all(parent)?;
|
||||
}
|
||||
let result = (|| {
|
||||
let permissions = std::fs::metadata(compressed_path.as_path())?.permissions();
|
||||
let input = File::open(compressed_path.as_path())?;
|
||||
let mut decoder = zstd::stream::read::Decoder::new(input)?;
|
||||
let mut output = create_file_with_permissions(temp_path.as_path(), &permissions)?;
|
||||
io::copy(&mut decoder, &mut output)?;
|
||||
output.flush()?;
|
||||
match std::fs::hard_link(temp_path.as_path(), plain_path.as_path()) {
|
||||
Ok(()) => {}
|
||||
Err(err) if err.kind() == io::ErrorKind::AlreadyExists => {}
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
let _ = std::fs::remove_file(temp_path.as_path());
|
||||
match std::fs::remove_file(compressed_path.as_path()) {
|
||||
Ok(()) => {}
|
||||
Err(err) if err.kind() == io::ErrorKind::NotFound => {}
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
Ok(())
|
||||
})();
|
||||
if result.is_err() {
|
||||
let _ = std::fs::remove_file(temp_path.as_path());
|
||||
}
|
||||
result?;
|
||||
Ok(plain_path)
|
||||
}
|
||||
|
||||
pub(crate) fn compressed_rollout_path(path: &Path) -> PathBuf {
|
||||
if is_compressed_rollout_path(path) {
|
||||
return path.to_path_buf();
|
||||
}
|
||||
let mut file_name = path
|
||||
.file_name()
|
||||
.map(OsStr::to_os_string)
|
||||
.unwrap_or_else(|| OsStr::new("rollout.jsonl").to_os_string());
|
||||
file_name.push(COMPRESSED_SUFFIX);
|
||||
path.with_file_name(file_name)
|
||||
}
|
||||
|
||||
pub(crate) fn plain_rollout_path(path: &Path) -> PathBuf {
|
||||
let Some(file_name) = path.file_name().and_then(OsStr::to_str) else {
|
||||
return path.to_path_buf();
|
||||
};
|
||||
let Some(plain_file_name) = file_name.strip_suffix(COMPRESSED_SUFFIX) else {
|
||||
return path.to_path_buf();
|
||||
};
|
||||
path.with_file_name(plain_file_name)
|
||||
}
|
||||
|
||||
/// Reports whether the path's file name ends with `.jsonl.zst`.
///
/// Paths without a file name, or with a non-UTF-8 file name, yield `false`.
pub(crate) fn is_compressed_rollout_path(path: &Path) -> bool {
    match path.file_name().and_then(OsStr::to_str) {
        Some(name) => name.ends_with(".jsonl.zst"),
        None => false,
    }
}
|
||||
|
||||
pub(crate) fn is_rollout_file_name(name: &str) -> bool {
|
||||
parse_rollout_file_name(name).is_some()
|
||||
}
|
||||
|
||||
pub(crate) fn parse_rollout_file_name(name: &str) -> Option<&str> {
|
||||
let name = name.strip_suffix(COMPRESSED_SUFFIX).unwrap_or(name);
|
||||
if name.starts_with("rollout-") && name.ends_with(".jsonl") {
|
||||
Some(name)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn should_skip_compressed_sibling(path: &Path) -> bool {
|
||||
is_compressed_rollout_path(path) && plain_rollout_path(path).exists()
|
||||
}
|
||||
|
||||
/// Line-oriented rollout reader returned by [`open_rollout_line_reader`].
|
||||
pub struct RolloutLineReader {
|
||||
inner: RolloutLineReaderInner,
|
||||
}
|
||||
|
||||
enum RolloutLineReaderInner {
|
||||
Plain(tokio::io::Lines<tokio::io::BufReader<tokio::fs::File>>),
|
||||
Blocking(Option<BlockingLineReader>),
|
||||
}
|
||||
|
||||
impl RolloutLineReader {
|
||||
/// Reads the next JSONL record from the rollout.
|
||||
pub async fn next_line(&mut self) -> io::Result<Option<String>> {
|
||||
match &mut self.inner {
|
||||
RolloutLineReaderInner::Plain(lines) => lines.next_line().await,
|
||||
RolloutLineReaderInner::Blocking(slot) => {
|
||||
let Some(mut reader) = slot.take() else {
|
||||
return Err(io::Error::other("compressed rollout reader is busy"));
|
||||
};
|
||||
let (line, reader) =
|
||||
tokio::task::spawn_blocking(move || (reader.next().transpose(), reader))
|
||||
.await
|
||||
.map_err(io::Error::other)?;
|
||||
*slot = Some(reader);
|
||||
line
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type BlockingLineReader = std::io::Lines<std::io::BufReader<Box<dyn Read + Send>>>;
|
||||
|
||||
/// Counters accumulated over one compression pass and logged when it ends.
#[derive(Default)]
struct CompressionStats {
    /// Plain rollout files that were considered for compression.
    scanned: usize,
    /// Files that were successfully replaced by a compressed copy.
    compressed: usize,
    /// Files that were examined but left as they were.
    skipped: usize,
    /// Files whose compression attempt returned an error.
    failed: usize,
}
|
||||
|
||||
struct CompressionLock {
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl CompressionLock {
|
||||
fn try_acquire(codex_home: &Path) -> io::Result<Option<Self>> {
|
||||
let lock_dir = codex_home.join(".tmp");
|
||||
std::fs::create_dir_all(lock_dir.as_path())?;
|
||||
let path = lock_dir.join(LOCK_FILE_NAME);
|
||||
match create_lock_file(path.as_path()) {
|
||||
Ok(()) => return Ok(Some(Self { path })),
|
||||
Err(err) if err.kind() == io::ErrorKind::AlreadyExists => {}
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
|
||||
let stale = std::fs::metadata(path.as_path())
|
||||
.and_then(|metadata| metadata.modified())
|
||||
.ok()
|
||||
.and_then(|modified| SystemTime::now().duration_since(modified).ok())
|
||||
.is_some_and(|age| age >= GLOBAL_LOCK_STALE_AFTER);
|
||||
if !stale {
|
||||
return Ok(None);
|
||||
}
|
||||
match std::fs::remove_file(path.as_path()) {
|
||||
Ok(()) => {}
|
||||
Err(err) if err.kind() == io::ErrorKind::NotFound => {}
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
match create_lock_file(path.as_path()) {
|
||||
Ok(()) => Ok(Some(Self { path })),
|
||||
Err(err) if err.kind() == io::ErrorKind::AlreadyExists => Ok(None),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for CompressionLock {
|
||||
fn drop(&mut self) {
|
||||
let _ = std::fs::remove_file(self.path.as_path());
|
||||
}
|
||||
}
|
||||
|
||||
/// Atomically creates the lock file at `path` and records the owning process.
///
/// The file is opened with `create_new`, so the call fails with
/// `ErrorKind::AlreadyExists` when a lock is already present; in that case the
/// existing file is left untouched. On success the file holds one line with
/// the current process id and start time.
///
/// # Errors
///
/// Returns the error from opening or writing the file. If the write fails
/// after the file was created, the file is removed again so that a
/// half-initialized lock does not block later workers until it goes stale.
fn create_lock_file(path: &Path) -> io::Result<()> {
    let mut file = std::fs::OpenOptions::new()
        .write(true)
        .create_new(true)
        .open(path)?;
    let written = writeln!(
        file,
        "pid={} started_at={:?}",
        std::process::id(),
        SystemTime::now()
    );
    if let Err(err) = written {
        // We own this file (create_new succeeded), so it is safe to undo it.
        drop(file);
        let _ = std::fs::remove_file(path);
        return Err(err);
    }
    Ok(())
}
|
||||
|
||||
async fn compress_rollouts_in_root(
|
||||
root: &Path,
|
||||
started_at: Instant,
|
||||
stats: &mut CompressionStats,
|
||||
) -> io::Result<()> {
|
||||
if !tokio::fs::try_exists(root).await.unwrap_or(false) {
|
||||
return Ok(());
|
||||
}
|
||||
let mut stack = vec![root.to_path_buf()];
|
||||
while let Some(dir) = stack.pop() {
|
||||
if started_at.elapsed() >= WORKER_MAX_RUNTIME {
|
||||
break;
|
||||
}
|
||||
let mut read_dir = match tokio::fs::read_dir(dir.as_path()).await {
|
||||
Ok(read_dir) => read_dir,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to read rollout compression directory {}: {err}",
|
||||
dir.display()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
while let Some(entry) = read_dir.next_entry().await? {
|
||||
if started_at.elapsed() >= WORKER_MAX_RUNTIME {
|
||||
break;
|
||||
}
|
||||
let path = entry.path();
|
||||
let file_type = match entry.file_type().await {
|
||||
Ok(file_type) => file_type,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to read rollout compression file type {}: {err}",
|
||||
path.display()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if file_type.is_dir() {
|
||||
stack.push(path);
|
||||
continue;
|
||||
}
|
||||
if !file_type.is_file() {
|
||||
continue;
|
||||
}
|
||||
let Some(file_name) = path.file_name().and_then(OsStr::to_str) else {
|
||||
continue;
|
||||
};
|
||||
if !file_name.starts_with("rollout-") || !file_name.ends_with(".jsonl") {
|
||||
continue;
|
||||
}
|
||||
stats.scanned = stats.scanned.saturating_add(1);
|
||||
match compress_rollout_if_cold(path.as_path()).await {
|
||||
Ok(true) => stats.compressed = stats.compressed.saturating_add(1),
|
||||
Ok(false) => stats.skipped = stats.skipped.saturating_add(1),
|
||||
Err(err) => {
|
||||
stats.failed = stats.failed.saturating_add(1);
|
||||
warn!("failed to compress rollout {}: {err}", path.display());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn compress_rollout_if_cold(path: &Path) -> io::Result<bool> {
|
||||
let path = path.to_path_buf();
|
||||
tokio::task::spawn_blocking(move || compress_rollout_if_cold_blocking(path.as_path()))
|
||||
.await
|
||||
.map_err(io::Error::other)?
|
||||
}
|
||||
|
||||
fn compress_rollout_if_cold_blocking(path: &Path) -> io::Result<bool> {
|
||||
let before = match cold_file_state(path)? {
|
||||
Some(state) => state,
|
||||
None => return Ok(false),
|
||||
};
|
||||
let compressed_path = compressed_rollout_path(path);
|
||||
let temp_path = temp_path_for(compressed_path.as_path(), "compress");
|
||||
let result = (|| {
|
||||
encode_zstd(path, temp_path.as_path(), &before.permissions)?;
|
||||
verify_zstd(temp_path.as_path())?;
|
||||
if !same_file_state(path, &before)? {
|
||||
return Ok(false);
|
||||
}
|
||||
set_modified_time(temp_path.as_path(), before.modified)?;
|
||||
match std::fs::remove_file(compressed_path.as_path()) {
|
||||
Ok(()) => {}
|
||||
Err(err) if err.kind() == io::ErrorKind::NotFound => {}
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
std::fs::rename(temp_path.as_path(), compressed_path.as_path())?;
|
||||
if !same_file_state(path, &before)? {
|
||||
let _ = std::fs::remove_file(compressed_path.as_path());
|
||||
return Ok(false);
|
||||
}
|
||||
std::fs::remove_file(path)?;
|
||||
Ok(true)
|
||||
})();
|
||||
if !matches!(result, Ok(true)) {
|
||||
let _ = std::fs::remove_file(temp_path.as_path());
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
/// Snapshot of a file's metadata, used to detect changes made while the file
/// is being compressed.
struct FileState {
    /// File length in bytes.
    len: u64,
    /// Last modification time.
    modified: SystemTime,
    /// Permission bits, also applied to the compressed copy.
    permissions: Permissions,
}
|
||||
|
||||
fn cold_file_state(path: &Path) -> io::Result<Option<FileState>> {
|
||||
let metadata = match std::fs::metadata(path) {
|
||||
Ok(metadata) => metadata,
|
||||
Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(None),
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
if !metadata.is_file() {
|
||||
return Ok(None);
|
||||
}
|
||||
let modified = metadata.modified()?;
|
||||
let age = SystemTime::now()
|
||||
.duration_since(modified)
|
||||
.unwrap_or(Duration::ZERO);
|
||||
if age < MIN_ROLLOUT_AGE {
|
||||
return Ok(None);
|
||||
}
|
||||
Ok(Some(FileState {
|
||||
len: metadata.len(),
|
||||
modified,
|
||||
permissions: metadata.permissions(),
|
||||
}))
|
||||
}
|
||||
|
||||
fn same_file_state(path: &Path, expected: &FileState) -> io::Result<bool> {
|
||||
match std::fs::metadata(path) {
|
||||
Ok(metadata) => Ok(metadata.len() == expected.len
|
||||
&& metadata.modified()? == expected.modified
|
||||
&& metadata.permissions() == expected.permissions),
|
||||
Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(false),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
fn encode_zstd(source: &Path, temp_path: &Path, permissions: &Permissions) -> io::Result<()> {
|
||||
let mut input = File::open(source)?;
|
||||
let output = create_file_with_permissions(temp_path, permissions)?;
|
||||
let mut encoder = zstd::stream::write::Encoder::new(output, COMPRESSION_LEVEL)?;
|
||||
io::copy(&mut input, &mut encoder)?;
|
||||
encoder.finish()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates a new write-only file at `path` carrying `permissions`.
///
/// Fails if the file already exists. On Unix the mode bits are supplied at
/// creation time and then set again explicitly on the open file.
#[cfg(unix)]
fn create_file_with_permissions(path: &Path, permissions: &Permissions) -> io::Result<File> {
    let mode_bits = permissions.mode() & 0o7777;
    let mut options = std::fs::OpenOptions::new();
    options.write(true).create_new(true).mode(mode_bits);
    let created = options.open(path)?;
    created.set_permissions(permissions.clone())?;
    Ok(created)
}

/// Creates a new write-only file at `path` carrying `permissions`.
///
/// Fails if the file already exists. The permissions are applied to the open
/// file after creation.
#[cfg(not(unix))]
fn create_file_with_permissions(path: &Path, permissions: &Permissions) -> io::Result<File> {
    let mut options = std::fs::OpenOptions::new();
    options.write(true).create_new(true);
    let created = options.open(path)?;
    created.set_permissions(permissions.clone())?;
    Ok(created)
}
|
||||
|
||||
fn verify_zstd(path: &Path) -> io::Result<()> {
|
||||
let input = File::open(path)?;
|
||||
let mut decoder = zstd::stream::read::Decoder::new(input)?;
|
||||
let mut sink = io::sink();
|
||||
io::copy(&mut decoder, &mut sink)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Sets the modification time of the file at `path` to `modified`.
///
/// The file is opened for writing first, because Windows only allows
/// timestamps to be changed through a handle that has write-attribute access.
/// If that open fails (for example, the file's mode forbids writing), a
/// read-only handle is used instead, which is sufficient on Unix for a file
/// owned by the caller.
///
/// # Errors
///
/// Returns the error from the read-only open, or from setting the timestamp.
fn set_modified_time(path: &Path, modified: SystemTime) -> io::Result<()> {
    let times = FileTimes::new().set_modified(modified);
    let file = match std::fs::OpenOptions::new().write(true).open(path) {
        Ok(file) => file,
        Err(_) => std::fs::OpenOptions::new().read(true).open(path)?,
    };
    file.set_times(times)
}
|
||||
|
||||
async fn cleanup_stale_temps(codex_home: &Path) -> io::Result<()> {
|
||||
for root in [
|
||||
codex_home.join(SESSIONS_SUBDIR),
|
||||
codex_home.join(ARCHIVED_SESSIONS_SUBDIR),
|
||||
] {
|
||||
cleanup_stale_temps_in_root(root.as_path()).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn cleanup_stale_temps_in_root(root: &Path) -> io::Result<()> {
|
||||
if !tokio::fs::try_exists(root).await.unwrap_or(false) {
|
||||
return Ok(());
|
||||
}
|
||||
let mut stack = vec![root.to_path_buf()];
|
||||
while let Some(dir) = stack.pop() {
|
||||
let mut read_dir = match tokio::fs::read_dir(dir.as_path()).await {
|
||||
Ok(read_dir) => read_dir,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to read rollout temp cleanup directory {}: {err}",
|
||||
dir.display()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
while let Some(entry) = read_dir.next_entry().await? {
|
||||
let path = entry.path();
|
||||
let file_type = match entry.file_type().await {
|
||||
Ok(file_type) => file_type,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to read rollout temp cleanup file type {}: {err}",
|
||||
path.display()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if file_type.is_dir() {
|
||||
stack.push(path);
|
||||
continue;
|
||||
}
|
||||
if file_type.is_file()
|
||||
&& path
|
||||
.file_name()
|
||||
.and_then(OsStr::to_str)
|
||||
.is_some_and(|name| name.ends_with(TEMP_SUFFIX))
|
||||
{
|
||||
let stale = entry
|
||||
.metadata()
|
||||
.await
|
||||
.ok()
|
||||
.and_then(|metadata| metadata.modified().ok())
|
||||
.and_then(|modified| SystemTime::now().duration_since(modified).ok())
|
||||
.is_some_and(|age| age >= TEMP_FILE_STALE_AFTER);
|
||||
if !stale {
|
||||
continue;
|
||||
}
|
||||
match tokio::fs::remove_file(path.as_path()).await {
|
||||
Ok(()) => {}
|
||||
Err(err) if err.kind() == io::ErrorKind::NotFound => {}
|
||||
Err(err) => warn!(
|
||||
"failed to remove stale rollout temp {}: {err}",
|
||||
path.display()
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the existing rollout path, preferring the plain `.jsonl` file over
|
||||
/// its `.jsonl.zst` compressed sibling.
|
||||
pub async fn existing_rollout_path(path: &Path) -> Option<PathBuf> {
|
||||
let plain_path = plain_rollout_path(path);
|
||||
if tokio::fs::try_exists(plain_path.as_path())
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
{
|
||||
return Some(plain_path);
|
||||
}
|
||||
let compressed_path = compressed_rollout_path(plain_path.as_path());
|
||||
if tokio::fs::try_exists(compressed_path.as_path())
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
{
|
||||
return Some(compressed_path);
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
async fn open_rollout_line_reader_once(path: &Path) -> io::Result<RolloutLineReader> {
|
||||
let path = existing_rollout_path(path)
|
||||
.await
|
||||
.unwrap_or_else(|| path.to_path_buf());
|
||||
if is_compressed_rollout_path(path.as_path()) {
|
||||
return open_compressed_reader(path).await;
|
||||
}
|
||||
let file = tokio::fs::File::open(path).await?;
|
||||
Ok(RolloutLineReader {
|
||||
inner: RolloutLineReaderInner::Plain(tokio::io::BufReader::new(file).lines()),
|
||||
})
|
||||
}
|
||||
|
||||
async fn open_rollout_line_reader_alternate(path: &Path) -> io::Result<RolloutLineReader> {
|
||||
let plain_path = plain_rollout_path(path);
|
||||
let compressed_path = compressed_rollout_path(plain_path.as_path());
|
||||
if is_compressed_rollout_path(path) {
|
||||
let file = tokio::fs::File::open(plain_path).await?;
|
||||
return Ok(RolloutLineReader {
|
||||
inner: RolloutLineReaderInner::Plain(tokio::io::BufReader::new(file).lines()),
|
||||
});
|
||||
}
|
||||
open_compressed_reader(compressed_path).await
|
||||
}
|
||||
|
||||
async fn open_compressed_reader(path: PathBuf) -> io::Result<RolloutLineReader> {
|
||||
let reader = tokio::task::spawn_blocking(move || {
|
||||
let input = File::open(path.as_path())?;
|
||||
let decoder = zstd::stream::read::Decoder::new(input)?;
|
||||
Ok::<_, io::Error>(io::BufReader::new(Box::new(decoder) as Box<dyn Read + Send>).lines())
|
||||
})
|
||||
.await
|
||||
.map_err(io::Error::other)??;
|
||||
Ok(RolloutLineReader {
|
||||
inner: RolloutLineReaderInner::Blocking(Some(reader)),
|
||||
})
|
||||
}
|
||||
|
||||
fn temp_path_for(path: &Path, operation: &str) -> PathBuf {
|
||||
let mut file_name = path
|
||||
.file_name()
|
||||
.map(OsStr::to_os_string)
|
||||
.unwrap_or_else(|| OsStr::new("rollout").to_os_string());
|
||||
let counter = TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
|
||||
file_name.push(format!(".{operation}.{}.{counter}.tmp", std::process::id()));
|
||||
path.with_file_name(file_name)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "compression_tests.rs"]
|
||||
mod tests;
|
||||
281
codex-rs/rollout/src/compression_tests.rs
Normal file
281
codex-rs/rollout/src/compression_tests.rs
Normal file
@@ -0,0 +1,281 @@
|
||||
use std::fs;
|
||||
use std::fs::FileTimes;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::*;
|
||||
use crate::RolloutRecorder;
|
||||
use crate::append_rollout_item_to_path;
|
||||
|
||||
#[tokio::test]
|
||||
async fn load_rollout_items_reads_compressed_rollout() -> anyhow::Result<()> {
|
||||
let home = TempDir::new()?;
|
||||
let uuid = Uuid::from_u128(1);
|
||||
let thread_id = ThreadId::from_string(&uuid.to_string())?;
|
||||
let rollout_path = rollout_path(home.path(), "2025-01-03T12-00-00", uuid);
|
||||
write_rollout(&rollout_path, thread_id, "hello compressed")?;
|
||||
compress_now(&rollout_path)?;
|
||||
|
||||
let (items, loaded_thread_id, parse_errors) =
|
||||
RolloutRecorder::load_rollout_items(&rollout_path).await?;
|
||||
|
||||
assert_eq!(loaded_thread_id, Some(thread_id));
|
||||
assert_eq!(parse_errors, 0);
|
||||
assert_eq!(items.len(), 2);
|
||||
assert!(!rollout_path.exists());
|
||||
assert!(compressed_rollout_path(&rollout_path).exists());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn append_rollout_item_materializes_compressed_rollout() -> anyhow::Result<()> {
|
||||
let home = TempDir::new()?;
|
||||
let uuid = Uuid::from_u128(2);
|
||||
let thread_id = ThreadId::from_string(&uuid.to_string())?;
|
||||
let rollout_path = rollout_path(home.path(), "2025-01-03T12-00-00", uuid);
|
||||
write_rollout(&rollout_path, thread_id, "hello before append")?;
|
||||
compress_now(&rollout_path)?;
|
||||
|
||||
append_rollout_item_to_path(
|
||||
&rollout_path,
|
||||
&RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "hello after append".to_string(),
|
||||
..Default::default()
|
||||
})),
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert!(rollout_path.exists());
|
||||
assert!(!compressed_rollout_path(&rollout_path).exists());
|
||||
let (items, loaded_thread_id, parse_errors) =
|
||||
RolloutRecorder::load_rollout_items(&rollout_path).await?;
|
||||
assert_eq!(loaded_thread_id, Some(thread_id));
|
||||
assert_eq!(parse_errors, 0);
|
||||
assert_eq!(items.len(), 3);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn worker_compresses_old_active_and_archived_rollouts() -> anyhow::Result<()> {
|
||||
let home = TempDir::new()?;
|
||||
let active_uuid = Uuid::from_u128(3);
|
||||
let active_id = ThreadId::from_string(&active_uuid.to_string())?;
|
||||
let active_path = rollout_path(home.path(), "2025-01-03T12-00-00", active_uuid);
|
||||
write_rollout(&active_path, active_id, "old active")?;
|
||||
set_old_mtime(&active_path)?;
|
||||
|
||||
let archived_uuid = Uuid::from_u128(4);
|
||||
let archived_id = ThreadId::from_string(&archived_uuid.to_string())?;
|
||||
let archived_path = home
|
||||
.path()
|
||||
.join("archived_sessions")
|
||||
.join(format!("rollout-2025-01-04T12-00-00-{archived_uuid}.jsonl"));
|
||||
write_rollout(&archived_path, archived_id, "old archived")?;
|
||||
set_old_mtime(&archived_path)?;
|
||||
|
||||
let fresh_uuid = Uuid::from_u128(5);
|
||||
let fresh_id = ThreadId::from_string(&fresh_uuid.to_string())?;
|
||||
let fresh_path = rollout_path(home.path(), "2025-01-05T12-00-00", fresh_uuid);
|
||||
write_rollout(&fresh_path, fresh_id, "fresh active")?;
|
||||
|
||||
let stale_temp = active_path.with_file_name("rollout-stale.jsonl.zst.tmp");
|
||||
fs::write(&stale_temp, "stale temp")?;
|
||||
set_old_mtime(&stale_temp)?;
|
||||
|
||||
let fresh_temp = active_path.with_file_name("rollout-fresh.jsonl.zst.tmp");
|
||||
fs::write(&fresh_temp, "fresh temp")?;
|
||||
|
||||
run_rollout_compression_worker(home.path().to_path_buf()).await?;
|
||||
|
||||
assert!(!active_path.exists());
|
||||
assert!(compressed_rollout_path(&active_path).exists());
|
||||
assert!(!archived_path.exists());
|
||||
assert!(compressed_rollout_path(&archived_path).exists());
|
||||
assert!(fresh_path.exists());
|
||||
assert!(!compressed_rollout_path(&fresh_path).exists());
|
||||
assert!(!stale_temp.exists());
|
||||
assert!(fresh_temp.exists());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test]
|
||||
async fn compression_preserves_rollout_permissions() -> anyhow::Result<()> {
|
||||
let home = TempDir::new()?;
|
||||
let uuid = Uuid::from_u128(6);
|
||||
let thread_id = ThreadId::from_string(&uuid.to_string())?;
|
||||
let rollout_path = rollout_path(home.path(), "2025-01-03T12-00-00", uuid);
|
||||
write_rollout(&rollout_path, thread_id, "restricted transcript")?;
|
||||
fs::set_permissions(&rollout_path, fs::Permissions::from_mode(0o600))?;
|
||||
set_old_mtime(&rollout_path)?;
|
||||
|
||||
run_rollout_compression_worker(home.path().to_path_buf()).await?;
|
||||
|
||||
let compressed_path = compressed_rollout_path(&rollout_path);
|
||||
assert!(!rollout_path.exists());
|
||||
assert_eq!(
|
||||
fs::metadata(&compressed_path)?.permissions().mode() & 0o777,
|
||||
0o600
|
||||
);
|
||||
|
||||
append_rollout_item_to_path(
|
||||
&rollout_path,
|
||||
&RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "materialize restricted transcript".to_string(),
|
||||
..Default::default()
|
||||
})),
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert!(rollout_path.exists());
|
||||
assert!(!compressed_path.exists());
|
||||
assert_eq!(
|
||||
fs::metadata(&rollout_path)?.permissions().mode() & 0o777,
|
||||
0o600
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test]
|
||||
async fn compression_preserves_read_only_rollout_permissions() -> anyhow::Result<()> {
|
||||
let home = TempDir::new()?;
|
||||
let uuid = Uuid::from_u128(7);
|
||||
let thread_id = ThreadId::from_string(&uuid.to_string())?;
|
||||
let rollout_path = rollout_path(home.path(), "2025-01-03T12-00-00", uuid);
|
||||
write_rollout(&rollout_path, thread_id, "read-only transcript")?;
|
||||
set_old_mtime(&rollout_path)?;
|
||||
fs::set_permissions(&rollout_path, fs::Permissions::from_mode(0o400))?;
|
||||
let source_modified = fs::metadata(&rollout_path)?.modified()?;
|
||||
|
||||
run_rollout_compression_worker(home.path().to_path_buf()).await?;
|
||||
|
||||
let compressed_path = compressed_rollout_path(&rollout_path);
|
||||
let compressed_metadata = fs::metadata(&compressed_path)?;
|
||||
assert!(!rollout_path.exists());
|
||||
assert_eq!(compressed_metadata.permissions().mode() & 0o777, 0o400);
|
||||
assert_eq!(compressed_metadata.modified()?, source_modified);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn find_thread_path_by_id_handles_compressed_rollout_filenames() -> anyhow::Result<()> {
|
||||
let home = TempDir::new()?;
|
||||
let uuid = Uuid::from_u128(8);
|
||||
let thread_id = ThreadId::from_string(&uuid.to_string())?;
|
||||
let rollout_path = rollout_path(home.path(), "2025-01-03T12-00-00", uuid);
|
||||
write_rollout(&rollout_path, thread_id, "compressed filename lookup")?;
|
||||
compress_now(&rollout_path)?;
|
||||
let compressed_path = compressed_rollout_path(&rollout_path);
|
||||
|
||||
assert_eq!(
|
||||
crate::find_thread_path_by_id_str(home.path(), &uuid.to_string(), None).await?,
|
||||
Some(compressed_path)
|
||||
);
|
||||
assert_eq!(
|
||||
crate::find_thread_path_by_id_str(home.path(), "not-a-uuid", None).await?,
|
||||
None
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn find_thread_path_by_id_ignores_compression_temp_matches() -> anyhow::Result<()> {
|
||||
let home = TempDir::new()?;
|
||||
let uuid = Uuid::from_u128(9);
|
||||
let thread_id = ThreadId::from_string(&uuid.to_string())?;
|
||||
let temp_path = rollout_path(home.path(), "2025-01-03T12-00-00", uuid).with_file_name(format!(
|
||||
"rollout-2025-01-03T12-00-00-{uuid}.jsonl.zst.compress.1.0.tmp"
|
||||
));
|
||||
write_rollout(&temp_path, thread_id, "temporary file should not resolve")?;
|
||||
|
||||
assert_eq!(
|
||||
crate::find_thread_path_by_id_str(home.path(), &uuid.to_string(), None).await?,
|
||||
None
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn rollout_path(home: &std::path::Path, ts: &str, uuid: Uuid) -> std::path::PathBuf {
|
||||
home.join("sessions/2025/01/03")
|
||||
.join(format!("rollout-{ts}-{uuid}.jsonl"))
|
||||
}
|
||||
|
||||
fn write_rollout(path: &std::path::Path, thread_id: ThreadId, message: &str) -> anyhow::Result<()> {
|
||||
let parent = path.parent().expect("rollout path should have parent");
|
||||
fs::create_dir_all(parent)?;
|
||||
let session_meta_line = SessionMetaLine {
|
||||
meta: SessionMeta {
|
||||
id: thread_id,
|
||||
forked_from_id: None,
|
||||
timestamp: "2025-01-03T12:00:00Z".to_string(),
|
||||
cwd: parent.to_path_buf(),
|
||||
originator: "test".to_string(),
|
||||
cli_version: "test".to_string(),
|
||||
source: SessionSource::Cli,
|
||||
thread_source: None,
|
||||
agent_path: None,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
model_provider: None,
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: None,
|
||||
},
|
||||
git: None,
|
||||
};
|
||||
let lines = [
|
||||
RolloutLine {
|
||||
timestamp: "2025-01-03T12:00:00Z".to_string(),
|
||||
item: RolloutItem::SessionMeta(session_meta_line),
|
||||
},
|
||||
RolloutLine {
|
||||
timestamp: "2025-01-03T12:00:01Z".to_string(),
|
||||
item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: message.to_string(),
|
||||
..Default::default()
|
||||
})),
|
||||
},
|
||||
];
|
||||
let jsonl = lines
|
||||
.iter()
|
||||
.map(serde_json::to_string)
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
.join("\n");
|
||||
fs::write(path, format!("{jsonl}\n"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn compress_now(path: &std::path::Path) -> anyhow::Result<()> {
|
||||
let compressed_path = compressed_rollout_path(path);
|
||||
let permissions = fs::metadata(path)?.permissions();
|
||||
encode_zstd(path, compressed_path.as_path(), &permissions)?;
|
||||
fs::remove_file(path)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_old_mtime(path: &std::path::Path) -> anyhow::Result<()> {
|
||||
let old = SystemTime::now()
|
||||
.checked_sub(Duration::from_secs(8 * 24 * 60 * 60))
|
||||
.expect("old timestamp should be representable");
|
||||
let times = FileTimes::new().set_modified(old);
|
||||
fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.open(path)?
|
||||
.set_times(times)?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -4,6 +4,7 @@ use std::sync::LazyLock;
|
||||
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
|
||||
pub(crate) mod compression;
|
||||
pub(crate) mod config;
|
||||
pub(crate) mod list;
|
||||
pub(crate) mod metadata;
|
||||
@@ -32,6 +33,10 @@ pub static INTERACTIVE_SESSION_SOURCES: LazyLock<Vec<SessionSource>> = LazyLock:
|
||||
});
|
||||
|
||||
pub use codex_protocol::protocol::SessionMeta;
|
||||
pub use compression::RolloutLineReader;
|
||||
pub use compression::existing_rollout_path;
|
||||
pub use compression::open_rollout_line_reader;
|
||||
pub use compression::spawn_rollout_compression_worker;
|
||||
pub use config::Config;
|
||||
pub use config::RolloutConfig;
|
||||
pub use config::RolloutConfigView;
|
||||
|
||||
@@ -18,6 +18,7 @@ use uuid::Uuid;
|
||||
|
||||
use super::ARCHIVED_SESSIONS_SUBDIR;
|
||||
use super::SESSIONS_SUBDIR;
|
||||
use super::compression;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::state_db;
|
||||
use codex_file_search as file_search;
|
||||
@@ -895,7 +896,9 @@ async fn collect_flat_rollout_files(
|
||||
let Some(name_str) = file_name.to_str() else {
|
||||
continue;
|
||||
};
|
||||
if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
|
||||
if !compression::is_rollout_file_name(name_str)
|
||||
|| compression::should_skip_compressed_sibling(entry.path().as_path())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let Some((ts, id)) = parse_timestamp_uuid_from_filename(name_str) else {
|
||||
@@ -915,7 +918,9 @@ async fn collect_rollout_day_files(
|
||||
day_path: &Path,
|
||||
) -> io::Result<Vec<(OffsetDateTime, Uuid, PathBuf)>> {
|
||||
let mut day_files = collect_files(day_path, |name_str, path| {
|
||||
if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
|
||||
if !compression::is_rollout_file_name(name_str)
|
||||
|| compression::should_skip_compressed_sibling(path)
|
||||
{
|
||||
return None;
|
||||
}
|
||||
|
||||
@@ -928,7 +933,8 @@ async fn collect_rollout_day_files(
|
||||
}
|
||||
|
||||
pub(crate) fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uuid)> {
|
||||
// Expected: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl
|
||||
// Expected: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl[.zst]
|
||||
let name = compression::parse_rollout_file_name(name)?;
|
||||
let core = name.strip_prefix("rollout-")?.strip_suffix(".jsonl")?;
|
||||
|
||||
// Scan from the right for a '-' such that the suffix parses as a UUID.
|
||||
@@ -985,7 +991,9 @@ async fn collect_flat_files_by_updated_at(
|
||||
let Some(name_str) = file_name.to_str() else {
|
||||
continue;
|
||||
};
|
||||
if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
|
||||
if !compression::is_rollout_file_name(name_str)
|
||||
|| compression::should_skip_compressed_sibling(entry.path().as_path())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let Some((_ts, id)) = parse_timestamp_uuid_from_filename(name_str) else {
|
||||
@@ -1073,11 +1081,7 @@ impl<'a> ProviderMatcher<'a> {
|
||||
}
|
||||
|
||||
async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTailSummary> {
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
|
||||
let file = tokio::fs::File::open(path).await?;
|
||||
let reader = tokio::io::BufReader::new(file);
|
||||
let mut lines = reader.lines();
|
||||
let mut lines = compression::open_rollout_line_reader(path).await?;
|
||||
let mut summary = HeadTailSummary::default();
|
||||
let mut lines_scanned = 0usize;
|
||||
|
||||
@@ -1163,11 +1167,7 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
|
||||
/// Read up to `HEAD_RECORD_LIMIT` records from the start of the rollout file at `path`.
|
||||
/// This should be enough to produce a summary including the session meta line.
|
||||
pub async fn read_head_for_summary(path: &Path) -> io::Result<Vec<serde_json::Value>> {
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
|
||||
let file = tokio::fs::File::open(path).await?;
|
||||
let reader = tokio::io::BufReader::new(file);
|
||||
let mut lines = reader.lines();
|
||||
let mut lines = compression::open_rollout_line_reader(path).await?;
|
||||
let mut head = Vec::new();
|
||||
|
||||
while head.len() < HEAD_RECORD_LIMIT {
|
||||
@@ -1251,13 +1251,9 @@ pub async fn read_session_meta_line(path: &Path) -> io::Result<SessionMetaLine>
|
||||
}
|
||||
|
||||
async fn file_modified_time(path: &Path) -> io::Result<Option<OffsetDateTime>> {
|
||||
let meta = tokio::fs::metadata(path).await?;
|
||||
let modified = meta.modified().ok();
|
||||
let Some(modified) = modified else {
|
||||
return Ok(None);
|
||||
};
|
||||
let dt = OffsetDateTime::from(modified);
|
||||
Ok(truncate_to_millis(dt))
|
||||
Ok(compression::file_modified_time(path)
|
||||
.await?
|
||||
.and_then(truncate_to_millis))
|
||||
}
|
||||
|
||||
fn format_rfc3339(dt: OffsetDateTime) -> Option<String> {
|
||||
@@ -1298,16 +1294,18 @@ async fn find_thread_path_by_id_str_in_subdir(
|
||||
.await
|
||||
{
|
||||
Ok(Some(db_path)) => {
|
||||
if tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
|
||||
match read_session_meta_line(&db_path).await {
|
||||
if let Some(existing_db_path) =
|
||||
compression::existing_rollout_path(db_path.as_path()).await
|
||||
{
|
||||
match read_session_meta_line(&existing_db_path).await {
|
||||
Ok(meta_line) if meta_line.meta.id == thread_id => {
|
||||
return Ok(Some(db_path));
|
||||
return Ok(Some(existing_db_path));
|
||||
}
|
||||
Ok(meta_line) => {
|
||||
tracing::error!(
|
||||
"state db returned rollout path for thread {id_str} but file belongs to thread {}: {}",
|
||||
meta_line.meta.id,
|
||||
db_path.display()
|
||||
existing_db_path.display()
|
||||
);
|
||||
tracing::warn!(
|
||||
"state db discrepancy during find_thread_path_by_id_str_in_subdir: mismatched_db_path"
|
||||
@@ -1321,9 +1319,9 @@ async fn find_thread_path_by_id_str_in_subdir(
|
||||
Err(err) => {
|
||||
tracing::debug!(
|
||||
"state db returned rollout path for thread {id_str} that could not be verified: {}: {err}",
|
||||
db_path.display()
|
||||
existing_db_path.display()
|
||||
);
|
||||
unverified_db_path = Some(db_path);
|
||||
unverified_db_path = Some(existing_db_path);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -1366,10 +1364,27 @@ async fn find_thread_path_by_id_str_in_subdir(
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let results = file_search::run(id_str, vec![root], options, /*cancel_flag*/ None)
|
||||
.map_err(|e| io::Error::other(format!("file search failed: {e}")))?;
|
||||
let results = file_search::run(
|
||||
id_str,
|
||||
vec![root.clone()],
|
||||
options,
|
||||
/*cancel_flag*/ None,
|
||||
)
|
||||
.map_err(|e| io::Error::other(format!("file search failed: {e}")))?;
|
||||
|
||||
let found = results.matches.into_iter().next().map(|m| m.full_path());
|
||||
let found = match results
|
||||
.matches
|
||||
.into_iter()
|
||||
.map(|m| m.full_path())
|
||||
.find(|path| {
|
||||
path.file_name()
|
||||
.and_then(OsStr::to_str)
|
||||
.is_some_and(|name| !name.ends_with(".tmp"))
|
||||
&& !compression::should_skip_compressed_sibling(path)
|
||||
}) {
|
||||
Some(path) => Some(path),
|
||||
None => find_rollout_path_by_id_from_filenames(root.as_path(), id_str).await?,
|
||||
};
|
||||
if let Some(found_path) = found.as_ref() {
|
||||
tracing::debug!("state db missing rollout path for thread {id_str}");
|
||||
tracing::warn!(
|
||||
@@ -1394,6 +1409,45 @@ async fn find_thread_path_by_id_str_in_subdir(
|
||||
Ok(found.or(unverified_db_path))
|
||||
}
|
||||
|
||||
async fn find_rollout_path_by_id_from_filenames(
|
||||
root: &Path,
|
||||
id_str: &str,
|
||||
) -> io::Result<Option<PathBuf>> {
|
||||
let Ok(target) = Uuid::parse_str(id_str) else {
|
||||
return Ok(None);
|
||||
};
|
||||
let mut stack = vec![root.to_path_buf()];
|
||||
while let Some(dir) = stack.pop() {
|
||||
let mut read_dir = match tokio::fs::read_dir(dir.as_path()).await {
|
||||
Ok(read_dir) => read_dir,
|
||||
Err(err) if err.kind() == io::ErrorKind::NotFound => continue,
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
while let Some(entry) = read_dir.next_entry().await? {
|
||||
let path = entry.path();
|
||||
let file_type = entry.file_type().await?;
|
||||
if file_type.is_dir() {
|
||||
stack.push(path);
|
||||
continue;
|
||||
}
|
||||
if !file_type.is_file() || compression::should_skip_compressed_sibling(path.as_path()) {
|
||||
continue;
|
||||
}
|
||||
let file_name = entry.file_name();
|
||||
let Some(name) = file_name.to_str() else {
|
||||
continue;
|
||||
};
|
||||
let Some((_ts, id)) = parse_timestamp_uuid_from_filename(name) else {
|
||||
continue;
|
||||
};
|
||||
if id == target {
|
||||
return Ok(Some(path));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Locate a recorded thread rollout file by its UUID string using the existing
|
||||
/// paginated listing implementation. Returns `Ok(Some(path))` if found, `Ok(None)` if not present
|
||||
/// or the id is invalid.
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use crate::ARCHIVED_SESSIONS_SUBDIR;
|
||||
use crate::SESSIONS_SUBDIR;
|
||||
use crate::compression;
|
||||
use crate::list::parse_timestamp_uuid_from_filename;
|
||||
use crate::recorder::RolloutRecorder;
|
||||
use crate::state_db::normalize_cwd_for_state_db;
|
||||
@@ -27,8 +28,6 @@ use std::path::PathBuf;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
const ROLLOUT_PREFIX: &str = "rollout-";
|
||||
const ROLLOUT_SUFFIX: &str = ".jsonl";
|
||||
const BACKFILL_BATCH_SIZE: usize = 200;
|
||||
#[cfg(not(test))]
|
||||
const BACKFILL_LEASE_SECONDS: i64 = 900;
|
||||
@@ -78,9 +77,7 @@ pub fn builder_from_items(
|
||||
}
|
||||
|
||||
let file_name = rollout_path.file_name()?.to_str()?;
|
||||
if !file_name.starts_with(ROLLOUT_PREFIX) || !file_name.ends_with(ROLLOUT_SUFFIX) {
|
||||
return None;
|
||||
}
|
||||
let file_name = compression::parse_rollout_file_name(file_name)?;
|
||||
let (created_ts, uuid) = parse_timestamp_uuid_from_filename(file_name)?;
|
||||
let created_at =
|
||||
DateTime::<Utc>::from_timestamp(created_ts.unix_timestamp(), 0)?.with_nanosecond(0)?;
|
||||
@@ -364,9 +361,8 @@ fn backfill_watermark_for_path(codex_home: &Path, path: &Path) -> String {
|
||||
}
|
||||
|
||||
async fn file_modified_time_utc(path: &Path) -> Option<DateTime<Utc>> {
|
||||
let modified = tokio::fs::metadata(path).await.ok()?.modified().ok()?;
|
||||
let updated_at: DateTime<Utc> = modified.into();
|
||||
Some(updated_at)
|
||||
let modified = compression::file_modified_time(path).await.ok()??;
|
||||
DateTime::<Utc>::from_timestamp(modified.unix_timestamp(), modified.nanosecond())
|
||||
}
|
||||
|
||||
fn parse_timestamp_to_utc(ts: &str) -> Option<DateTime<Utc>> {
|
||||
@@ -425,7 +421,9 @@ async fn collect_rollout_paths(root: &Path) -> std::io::Result<Vec<PathBuf>> {
|
||||
let Some(name) = file_name.to_str() else {
|
||||
continue;
|
||||
};
|
||||
if name.starts_with(ROLLOUT_PREFIX) && name.ends_with(ROLLOUT_SUFFIX) {
|
||||
if compression::is_rollout_file_name(name)
|
||||
&& !compression::should_skip_compressed_sibling(path.as_path())
|
||||
{
|
||||
paths.push(path);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ use tracing::warn;
|
||||
|
||||
use super::ARCHIVED_SESSIONS_SUBDIR;
|
||||
use super::SESSIONS_SUBDIR;
|
||||
use super::compression;
|
||||
use super::list::Cursor;
|
||||
use super::list::SortDirection;
|
||||
use super::list::ThreadItem;
|
||||
@@ -694,17 +695,20 @@ impl RolloutRecorder {
|
||||
|
||||
(None, Some(log_file_info), path, Some(session_meta))
|
||||
}
|
||||
RolloutRecorderParams::Resume { path } => (
|
||||
Some(
|
||||
tokio::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(&path)
|
||||
.await?,
|
||||
),
|
||||
None,
|
||||
path,
|
||||
None,
|
||||
),
|
||||
RolloutRecorderParams::Resume { path } => {
|
||||
let path = compression::materialize_rollout_for_append(path.as_path()).await?;
|
||||
(
|
||||
Some(
|
||||
tokio::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(&path)
|
||||
.await?,
|
||||
),
|
||||
None,
|
||||
path,
|
||||
None,
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
// Clone the cwd for the spawned task to collect git info asynchronously
|
||||
@@ -815,19 +819,17 @@ impl RolloutRecorder {
|
||||
path: &Path,
|
||||
) -> std::io::Result<(Vec<RolloutItem>, Option<ThreadId>, usize)> {
|
||||
trace!("Resuming rollout from {path:?}");
|
||||
let text = tokio::fs::read_to_string(path).await?;
|
||||
if text.trim().is_empty() {
|
||||
return Err(IoError::other("empty session file"));
|
||||
}
|
||||
|
||||
let mut items: Vec<RolloutItem> = Vec::new();
|
||||
let mut thread_id: Option<ThreadId> = None;
|
||||
let mut parse_errors = 0usize;
|
||||
for line in text.lines() {
|
||||
let mut reader = compression::open_rollout_line_reader(path).await?;
|
||||
let mut saw_non_empty_line = false;
|
||||
while let Some(line) = reader.next_line().await? {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let mut v: Value = match serde_json::from_str(line) {
|
||||
saw_non_empty_line = true;
|
||||
let mut v: Value = match serde_json::from_str(&line) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
warn!("failed to parse line as JSON: {line:?}, error: {e}");
|
||||
@@ -870,6 +872,9 @@ impl RolloutRecorder {
|
||||
}
|
||||
}
|
||||
}
|
||||
if !saw_non_empty_line {
|
||||
return Err(IoError::other("empty session file"));
|
||||
}
|
||||
|
||||
tracing::debug!(
|
||||
"Resumed rollout with {} items, thread ID: {:?}, parse errors: {}",
|
||||
@@ -1357,6 +1362,7 @@ fn precompute_log_file_info(
|
||||
}
|
||||
|
||||
fn open_log_file(path: &Path) -> std::io::Result<File> {
|
||||
let path = compression::materialize_rollout_for_append_blocking(path)?;
|
||||
let Some(parent) = path.parent() else {
|
||||
return Err(IoError::other(format!(
|
||||
"rollout path has no parent: {}",
|
||||
@@ -1616,6 +1622,7 @@ pub async fn append_rollout_item_to_path(
|
||||
rollout_path: &Path,
|
||||
item: &RolloutItem,
|
||||
) -> std::io::Result<()> {
|
||||
let rollout_path = compression::materialize_rollout_for_append(rollout_path).await?;
|
||||
let file = tokio::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(rollout_path)
|
||||
|
||||
@@ -11,11 +11,11 @@ use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
|
||||
use regex::Regex;
|
||||
use regex::RegexBuilder;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::process::Command;
|
||||
|
||||
use super::ARCHIVED_SESSIONS_SUBDIR;
|
||||
use super::SESSIONS_SUBDIR;
|
||||
use super::compression;
|
||||
|
||||
const MATCH_CONTEXT_BEFORE_CHARS: usize = 48;
|
||||
const MATCH_CONTEXT_AFTER_CHARS: usize = 96;
|
||||
@@ -32,7 +32,10 @@ pub async fn search_rollout_paths(
|
||||
SESSIONS_SUBDIR
|
||||
});
|
||||
let search_term = json_escaped_search_term(search_term)?;
|
||||
ripgrep_rollout_paths(rg_command, root.as_path(), search_term.as_str()).await
|
||||
let mut matches =
|
||||
ripgrep_rollout_paths(rg_command, root.as_path(), search_term.as_str()).await?;
|
||||
matches.extend(scan_compressed_rollout_paths(root.as_path(), search_term.as_str()).await?);
|
||||
Ok(matches)
|
||||
}
|
||||
|
||||
async fn ripgrep_rollout_paths(
|
||||
@@ -107,7 +110,11 @@ async fn scan_rollout_paths(root: &Path, search_term: &str) -> io::Result<HashSe
|
||||
continue;
|
||||
}
|
||||
if !file_type.is_file()
|
||||
|| path.extension().and_then(|extension| extension.to_str()) != Some("jsonl")
|
||||
|| !path
|
||||
.file_name()
|
||||
.and_then(|name| name.to_str())
|
||||
.is_some_and(compression::is_rollout_file_name)
|
||||
|| compression::should_skip_compressed_sibling(path.as_path())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
@@ -121,8 +128,7 @@ async fn scan_rollout_paths(root: &Path, search_term: &str) -> io::Result<HashSe
|
||||
}
|
||||
|
||||
async fn rollout_contains(path: &Path, search_term: &Regex) -> io::Result<bool> {
|
||||
let file = tokio::fs::File::open(path).await?;
|
||||
let mut lines = tokio::io::BufReader::new(file).lines();
|
||||
let mut lines = compression::open_rollout_line_reader(path).await?;
|
||||
while let Some(line) = lines.next_line().await? {
|
||||
if search_term.is_match(line.as_str()) {
|
||||
return Ok(true);
|
||||
@@ -135,8 +141,7 @@ pub async fn first_rollout_content_match_snippet(
|
||||
path: &Path,
|
||||
search_term: &str,
|
||||
) -> io::Result<Option<String>> {
|
||||
let file = tokio::fs::File::open(path).await?;
|
||||
let mut lines = tokio::io::BufReader::new(file).lines();
|
||||
let mut lines = compression::open_rollout_line_reader(path).await?;
|
||||
let json_search_term = case_insensitive_literal_regex(json_escaped_search_term(search_term)?)?;
|
||||
let search_term = case_insensitive_literal_regex(search_term)?;
|
||||
while let Some(line) = lines.next_line().await? {
|
||||
@@ -149,6 +154,42 @@ pub async fn first_rollout_content_match_snippet(
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn scan_compressed_rollout_paths(
|
||||
root: &Path,
|
||||
search_term: &str,
|
||||
) -> io::Result<HashSet<PathBuf>> {
|
||||
let mut matches = HashSet::new();
|
||||
let mut dirs = vec![root.to_path_buf()];
|
||||
let search_term = case_insensitive_literal_regex(search_term)?;
|
||||
|
||||
while let Some(dir) = dirs.pop() {
|
||||
let mut entries = match tokio::fs::read_dir(dir).await {
|
||||
Ok(entries) => entries,
|
||||
Err(err) if err.kind() == io::ErrorKind::NotFound => continue,
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let path = entry.path();
|
||||
let file_type = entry.file_type().await?;
|
||||
if file_type.is_dir() {
|
||||
dirs.push(path);
|
||||
continue;
|
||||
}
|
||||
if !file_type.is_file()
|
||||
|| !compression::is_compressed_rollout_path(path.as_path())
|
||||
|| compression::should_skip_compressed_sibling(path.as_path())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
if rollout_contains(path.as_path(), &search_term).await? {
|
||||
matches.insert(path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(matches)
|
||||
}
|
||||
|
||||
fn json_escaped_search_term(search_term: &str) -> io::Result<String> {
|
||||
let serialized = serde_json::to_string(search_term).map_err(io::Error::other)?;
|
||||
Ok(serialized[1..serialized.len() - 1].to_string())
|
||||
|
||||
@@ -412,10 +412,11 @@ pub async fn list_threads_db(
|
||||
Ok(mut page) => {
|
||||
let mut valid_items = Vec::with_capacity(page.items.len());
|
||||
for item in page.items {
|
||||
if tokio::fs::try_exists(&item.rollout_path)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
if let Some(existing_path) =
|
||||
crate::compression::existing_rollout_path(item.rollout_path.as_path()).await
|
||||
{
|
||||
let mut item = item;
|
||||
item.rollout_path = existing_path;
|
||||
valid_items.push(item);
|
||||
} else {
|
||||
warn!(
|
||||
|
||||
@@ -72,10 +72,11 @@ pub(super) fn matching_rollout_file_name(
|
||||
),
|
||||
});
|
||||
};
|
||||
let required_suffix = format!("{thread_id}.jsonl");
|
||||
if file_name
|
||||
.to_string_lossy()
|
||||
.ends_with(required_suffix.as_str())
|
||||
let required_plain_suffix = format!("{thread_id}.jsonl");
|
||||
let required_compressed_suffix = format!("{required_plain_suffix}.zst");
|
||||
let file_name_str = file_name.to_string_lossy();
|
||||
if file_name_str.ends_with(required_plain_suffix.as_str())
|
||||
|| file_name_str.ends_with(required_compressed_suffix.as_str())
|
||||
{
|
||||
Ok(file_name)
|
||||
} else {
|
||||
@@ -186,6 +187,7 @@ pub(super) fn git_info_from_parts(
|
||||
|
||||
fn thread_id_from_rollout_path(path: &Path) -> Option<ThreadId> {
|
||||
let file_name = path.file_name()?.to_str()?;
|
||||
let file_name = file_name.strip_suffix(".zst").unwrap_or(file_name);
|
||||
let stem = file_name.strip_suffix(".jsonl")?;
|
||||
if stem.len() < 37 {
|
||||
return None;
|
||||
|
||||
@@ -156,9 +156,9 @@ async fn sync_materialized_rollout_path(
|
||||
thread_id: ThreadId,
|
||||
) -> ThreadStoreResult<()> {
|
||||
let rollout_path = rollout_path(store, thread_id).await?;
|
||||
if !tokio::fs::try_exists(rollout_path.as_path())
|
||||
if codex_rollout::existing_rollout_path(rollout_path.as_path())
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
.is_none()
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -84,7 +84,7 @@ async fn sqlite_rollout_path_can_load_history_for_thread(
|
||||
path: &std::path::Path,
|
||||
thread_id: codex_protocol::ThreadId,
|
||||
) -> bool {
|
||||
if !tokio::fs::try_exists(path).await.unwrap_or(false) {
|
||||
if codex_rollout::existing_rollout_path(path).await.is_none() {
|
||||
return false;
|
||||
}
|
||||
// SQLite metadata can outlive a moved/recreated rollout path. When history is
|
||||
@@ -101,7 +101,7 @@ pub(super) async fn read_thread_by_rollout_path(
|
||||
include_archived: bool,
|
||||
include_history: bool,
|
||||
) -> ThreadStoreResult<StoredThread> {
|
||||
let path = resolve_requested_rollout_path(store, rollout_path)?;
|
||||
let path = resolve_requested_rollout_path(store, rollout_path).await?;
|
||||
let mut thread = read_thread_from_rollout_path(store, path).await?;
|
||||
if !include_archived && thread.archived_at.is_some() {
|
||||
return Err(ThreadStoreError::InvalidRequest {
|
||||
@@ -128,7 +128,7 @@ pub(super) async fn read_thread_by_rollout_path(
|
||||
Ok(thread)
|
||||
}
|
||||
|
||||
fn resolve_requested_rollout_path(
|
||||
async fn resolve_requested_rollout_path(
|
||||
store: &LocalThreadStore,
|
||||
rollout_path: std::path::PathBuf,
|
||||
) -> ThreadStoreResult<std::path::PathBuf> {
|
||||
@@ -137,7 +137,15 @@ fn resolve_requested_rollout_path(
|
||||
} else {
|
||||
rollout_path
|
||||
};
|
||||
std::fs::canonicalize(&path).map_err(|err| ThreadStoreError::InvalidRequest {
|
||||
let Some(path) = codex_rollout::existing_rollout_path(path.as_path()).await else {
|
||||
return Err(ThreadStoreError::InvalidRequest {
|
||||
message: format!(
|
||||
"failed to resolve rollout path `{}`: file does not exist",
|
||||
path.display()
|
||||
),
|
||||
});
|
||||
};
|
||||
std::fs::canonicalize(path.as_path()).map_err(|err| ThreadStoreError::InvalidRequest {
|
||||
message: format!("failed to resolve rollout path `{}`: {err}", path.display()),
|
||||
})
|
||||
}
|
||||
@@ -166,11 +174,9 @@ async fn resolve_rollout_path(
|
||||
include_archived: bool,
|
||||
) -> ThreadStoreResult<Option<std::path::PathBuf>> {
|
||||
if let Ok(path) = live_writer::rollout_path(store, thread_id).await
|
||||
&& tokio::fs::try_exists(path.as_path()).await.map_err(|err| {
|
||||
ThreadStoreError::InvalidRequest {
|
||||
message: format!("failed to check rollout path for thread id {thread_id}: {err}"),
|
||||
}
|
||||
})?
|
||||
&& codex_rollout::existing_rollout_path(path.as_path())
|
||||
.await
|
||||
.is_some()
|
||||
&& (include_archived || !rollout_path_is_archived(store.config.codex_home.as_path(), &path))
|
||||
{
|
||||
return Ok(Some(path));
|
||||
@@ -280,6 +286,9 @@ async fn stored_thread_from_sqlite_metadata(
|
||||
.await
|
||||
.ok()
|
||||
.map(|meta_line| meta_line.meta);
|
||||
let rollout_path = codex_rollout::existing_rollout_path(metadata.rollout_path.as_path())
|
||||
.await
|
||||
.unwrap_or_else(|| metadata.rollout_path.clone());
|
||||
let forked_from_id = session_meta.as_ref().and_then(|meta| meta.forked_from_id);
|
||||
let preview = metadata
|
||||
.preview
|
||||
@@ -288,7 +297,7 @@ async fn stored_thread_from_sqlite_metadata(
|
||||
.unwrap_or_default();
|
||||
StoredThread {
|
||||
thread_id: metadata.id,
|
||||
rollout_path: Some(metadata.rollout_path),
|
||||
rollout_path: Some(rollout_path),
|
||||
forked_from_id,
|
||||
preview,
|
||||
name,
|
||||
|
||||
@@ -68,13 +68,14 @@ pub(super) async fn update_thread_metadata(
|
||||
if live_writer::rollout_path(store, thread_id).await.is_ok() {
|
||||
live_writer::persist_thread(store, thread_id).await?;
|
||||
}
|
||||
let resolved_rollout_path =
|
||||
let mut resolved_rollout_path =
|
||||
resolve_rollout_path(store, thread_id, params.include_archived).await?;
|
||||
let name = patch.name;
|
||||
let git_info = patch.git_info;
|
||||
if let Some(memory_mode) = patch.memory_mode {
|
||||
apply_thread_memory_mode(resolved_rollout_path.path.as_path(), thread_id, memory_mode)
|
||||
.await?;
|
||||
refresh_resolved_rollout_path(&mut resolved_rollout_path).await;
|
||||
}
|
||||
|
||||
let state_db_ctx = store.state_db().await;
|
||||
@@ -142,6 +143,7 @@ pub(super) async fn update_thread_metadata(
|
||||
memory_mode.as_deref(),
|
||||
)
|
||||
.await?;
|
||||
refresh_resolved_rollout_path(&mut resolved_rollout_path).await;
|
||||
apply_thread_git_info(store, thread_id, sha, branch, origin_url).await?;
|
||||
}
|
||||
|
||||
@@ -172,6 +174,12 @@ pub(super) async fn update_thread_metadata(
|
||||
Ok(thread)
|
||||
}
|
||||
|
||||
async fn refresh_resolved_rollout_path(resolved: &mut ResolvedRolloutPath) {
|
||||
if let Some(path) = codex_rollout::existing_rollout_path(resolved.path.as_path()).await {
|
||||
resolved.path = path;
|
||||
}
|
||||
}
|
||||
|
||||
async fn apply_metadata_update(
|
||||
store: &LocalThreadStore,
|
||||
thread_id: ThreadId,
|
||||
|
||||
@@ -1086,16 +1086,15 @@ See the Codex keymap documentation for supported actions and examples."
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
let pre_loop_exit_reason = if let Some(latest_version) = upgrade_version {
|
||||
let control = app
|
||||
.handle_event(
|
||||
tui,
|
||||
&mut app_server,
|
||||
AppEvent::InsertHistoryCell(Box::new(UpdateAvailableHistoryCell::new(
|
||||
latest_version,
|
||||
crate::update_action::get_update_action(),
|
||||
))),
|
||||
)
|
||||
.await?;
|
||||
let control = Box::pin(app.handle_event(
|
||||
tui,
|
||||
&mut app_server,
|
||||
AppEvent::InsertHistoryCell(Box::new(UpdateAvailableHistoryCell::new(
|
||||
latest_version,
|
||||
crate::update_action::get_update_action(),
|
||||
))),
|
||||
))
|
||||
.await?;
|
||||
match control {
|
||||
AppRunControl::Continue => None,
|
||||
AppRunControl::Exit(exit_reason) => Some(exit_reason),
|
||||
@@ -1112,7 +1111,7 @@ See the Codex keymap documentation for supported actions and examples."
|
||||
loop {
|
||||
let control = select! {
|
||||
Some(event) = app_event_rx.recv() => {
|
||||
match app.handle_event(tui, &mut app_server, event).await {
|
||||
match Box::pin(app.handle_event(tui, &mut app_server, event)).await {
|
||||
Ok(control) => control,
|
||||
Err(err) => break Err(err),
|
||||
}
|
||||
|
||||
@@ -1231,6 +1231,25 @@ terminal_resize_reflow_max_rows = 9000
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rebuild_config_for_resume_or_fallback_reloads_same_cwd_config() -> Result<()> {
|
||||
let mut app = make_test_app().await;
|
||||
let codex_home = tempdir()?;
|
||||
app.config.codex_home = codex_home.path().to_path_buf().abs();
|
||||
std::fs::write(
|
||||
codex_home.path().join("config.toml"),
|
||||
"model = \"freshly-loaded-model\"",
|
||||
)?;
|
||||
let current_cwd = app.config.cwd.clone();
|
||||
|
||||
let resume_config = app
|
||||
.rebuild_config_for_resume_or_fallback(¤t_cwd, current_cwd.to_path_buf())
|
||||
.await?;
|
||||
|
||||
assert_eq!(resume_config.model.as_deref(), Some("freshly-loaded-model"));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rebuild_config_for_resume_or_fallback_uses_current_config_on_same_cwd_error()
|
||||
-> Result<()> {
|
||||
|
||||
@@ -14,11 +14,11 @@ use crate::cwd_prompt::CwdPromptOutcome;
|
||||
use crate::cwd_prompt::CwdSelection;
|
||||
use crate::tui::Tui;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_rollout::open_rollout_line_reader;
|
||||
use codex_state::StateRuntime;
|
||||
use codex_utils_path as path_utils;
|
||||
use serde::Deserialize;
|
||||
use serde_json::Value;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
|
||||
#[derive(Default)]
|
||||
struct RolloutResumeState {
|
||||
@@ -142,13 +142,11 @@ pub(crate) fn cwds_differ(current_cwd: &Path, session_cwd: &Path) -> bool {
|
||||
}
|
||||
|
||||
async fn read_rollout_resume_state(path: &Path) -> io::Result<RolloutResumeState> {
|
||||
let file = tokio::fs::File::open(path).await?;
|
||||
let reader = tokio::io::BufReader::new(file);
|
||||
let mut lines = reader.lines();
|
||||
let mut reader = open_rollout_line_reader(path).await?;
|
||||
let mut state = RolloutResumeState::default();
|
||||
let mut saw_record = false;
|
||||
|
||||
while let Some(line) = lines.next_line().await? {
|
||||
while let Some(line) = reader.next_line().await? {
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
|
||||
Reference in New Issue
Block a user