Throttle repeated rollout compression runs (#25659)

## Why

[#25089](https://github.com/openai/codex/pull/25089) introduced the
background worker that compresses cold archived rollouts, and
[#25654](https://github.com/openai/codex/pull/25654) made that pass
faster once it starts. But the worker still deleted
`rollout-compression.lock` on successful exit, so the existing six-hour
staleness window only helped with overlapping or crashed workers. Each
new local thread-store initialization could immediately rescan archived
rollouts even if a full pass had just finished.

This change keeps the existing marker around long enough to throttle
redundant reruns. The worker is still best-effort, but it no longer does
repeated startup scans when nothing new is eligible for compression.

## What Changed

- Replace the drop-scoped `CompressionLock` with a
`CompressionRunMarker` that claims the existing
`.tmp/rollout-compression.lock` path and leaves it in place after
success.
- Reuse the existing six-hour staleness window to block both overlapping
starts and immediate reruns, while still letting a stale marker be
reclaimed.
- Update the worker docs and debug logging to describe the new "already
running or recently ran" behavior.
- Extend the rollout compression tests to assert that a successful run
leaves the marker behind and that a fresh marker suppresses a new run.

## Validation

- `just test -p codex-rollout`
This commit is contained in:
jif-oai
2026-06-01 20:46:54 +02:00
committed by GitHub
parent ba2b67f9cd
commit c3cdf3c007
2 changed files with 68 additions and 26 deletions

View File

@@ -24,8 +24,8 @@ static TEMP_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Starts a best-effort background job that compresses cold local rollout files.
///
/// The worker is fire-and-forget: failures are logged, startup is not blocked,
/// and a process-wide lock under `codex_home` prevents overlapping compression
/// runs from the same local store.
/// and a run marker under `codex_home` prevents overlapping or too-frequent
/// compression runs from the same local store.
pub fn spawn_rollout_compression_worker(codex_home: PathBuf) {
worker::spawn(codex_home)
}
@@ -246,10 +246,10 @@ mod worker {
const TEMP_SUFFIX: &str = ".tmp";
const COMPRESSION_LEVEL: i32 = 3;
const MIN_ROLLOUT_AGE: Duration = Duration::from_secs(7 * 24 * 60 * 60);
const GLOBAL_LOCK_STALE_AFTER: Duration = Duration::from_secs(6 * 60 * 60);
const TEMP_FILE_STALE_AFTER: Duration = GLOBAL_LOCK_STALE_AFTER;
const RUN_MARKER_STALE_AFTER: Duration = Duration::from_secs(6 * 60 * 60);
const TEMP_FILE_STALE_AFTER: Duration = RUN_MARKER_STALE_AFTER;
const WORKER_MAX_RUNTIME: Duration = Duration::from_secs(5 * 60 * 60);
const LOCK_FILE_NAME: &str = "rollout-compression.lock";
const RUN_MARKER_FILE_NAME: &str = "rollout-compression.lock";
const MAX_CONCURRENT_COMPRESSION_JOBS: usize = 2;
#[derive(Default)]
@@ -260,17 +260,18 @@ mod worker {
failed: usize,
}
struct CompressionLock {
pub(super) struct CompressionRunMarker {
path: PathBuf,
remove_on_drop: bool,
}
impl CompressionLock {
fn try_acquire(codex_home: &Path) -> io::Result<Option<Self>> {
let lock_dir = codex_home.join(".tmp");
std::fs::create_dir_all(lock_dir.as_path())?;
let path = lock_dir.join(LOCK_FILE_NAME);
match create_lock_file(path.as_path()) {
Ok(()) => return Ok(Some(Self { path })),
impl CompressionRunMarker {
pub(super) fn try_claim(codex_home: &Path) -> io::Result<Option<Self>> {
let marker_dir = codex_home.join(".tmp");
std::fs::create_dir_all(marker_dir.as_path())?;
let path = marker_dir.join(RUN_MARKER_FILE_NAME);
match create_run_marker_file(path.as_path()) {
Ok(()) => return Ok(Some(Self::new(path))),
Err(err) if err.kind() == io::ErrorKind::AlreadyExists => {}
Err(err) => return Err(err),
}
@@ -279,7 +280,7 @@ mod worker {
.and_then(|metadata| metadata.modified())
.ok()
.and_then(|modified| SystemTime::now().duration_since(modified).ok())
.is_some_and(|age| age >= GLOBAL_LOCK_STALE_AFTER);
.is_some_and(|age| age >= RUN_MARKER_STALE_AFTER);
if !stale {
return Ok(None);
}
@@ -288,17 +289,30 @@ mod worker {
Err(err) if err.kind() == io::ErrorKind::NotFound => {}
Err(err) => return Err(err),
}
match create_lock_file(path.as_path()) {
Ok(()) => Ok(Some(Self { path })),
match create_run_marker_file(path.as_path()) {
Ok(()) => Ok(Some(Self::new(path))),
Err(err) if err.kind() == io::ErrorKind::AlreadyExists => Ok(None),
Err(err) => Err(err),
}
}
fn new(path: PathBuf) -> Self {
Self {
path,
remove_on_drop: true,
}
}
pub(super) fn persist(mut self) {
self.remove_on_drop = false;
}
}
impl Drop for CompressionLock {
impl Drop for CompressionRunMarker {
fn drop(&mut self) {
let _ = std::fs::remove_file(self.path.as_path());
if self.remove_on_drop {
let _ = std::fs::remove_file(self.path.as_path());
}
}
}
@@ -321,9 +335,9 @@ mod worker {
}
pub(super) async fn run(codex_home: PathBuf) -> io::Result<()> {
let Some(_lock) = CompressionLock::try_acquire(codex_home.as_path())? else {
let Some(marker) = CompressionRunMarker::try_claim(codex_home.as_path())? else {
debug!(
"rollout compression worker already running for {}",
"rollout compression worker recently ran or is already running for {}",
codex_home.display()
);
return Ok(());
@@ -340,10 +354,11 @@ mod worker {
"rollout compression worker finished: scanned={}, compressed={}, skipped={}, failed={}",
stats.scanned, stats.compressed, stats.skipped, stats.failed
);
marker.persist();
Ok(())
}
fn create_lock_file(path: &Path) -> io::Result<()> {
fn create_run_marker_file(path: &Path) -> io::Result<()> {
let mut file = std::fs::OpenOptions::new()
.write(true)
.create_new(true)

View File

@@ -141,6 +141,12 @@ async fn worker_compresses_old_archived_rollouts_only() -> anyhow::Result<()> {
assert!(!compressed_rollout_path(&fresh_path).exists());
assert!(!stale_temp.exists());
assert!(fresh_temp.exists());
assert!(
home.path()
.join(".tmp")
.join("rollout-compression.lock")
.exists()
);
Ok(())
}
@@ -323,16 +329,16 @@ async fn worker_skips_existing_compressed_archived_rollouts() -> anyhow::Result<
}
#[tokio::test]
async fn worker_skips_when_fresh_lock_exists() -> anyhow::Result<()> {
async fn worker_skips_when_fresh_run_marker_exists() -> anyhow::Result<()> {
let home = TempDir::new()?;
let uuid = Uuid::from_u128(11);
let thread_id = ThreadId::from_string(&uuid.to_string())?;
let rollout_path = archived_rollout_path(home.path(), "2025-01-03T12-00-00", uuid);
write_rollout(&rollout_path, thread_id, "locked worker")?;
write_rollout(&rollout_path, thread_id, "throttled worker")?;
set_old_mtime(&rollout_path)?;
let lock_dir = home.path().join(".tmp");
fs::create_dir_all(lock_dir.as_path())?;
fs::write(lock_dir.join("rollout-compression.lock"), "locked")?;
let marker_dir = home.path().join(".tmp");
fs::create_dir_all(marker_dir.as_path())?;
fs::write(marker_dir.join("rollout-compression.lock"), "recent run")?;
worker::run(home.path().to_path_buf()).await?;
@@ -341,6 +347,27 @@ async fn worker_skips_when_fresh_lock_exists() -> anyhow::Result<()> {
Ok(())
}
#[test]
fn run_marker_is_removed_unless_persisted() -> anyhow::Result<()> {
let home = TempDir::new()?;
let marker_path = home.path().join(".tmp").join("rollout-compression.lock");
{
let marker = worker::CompressionRunMarker::try_claim(home.path())?;
assert!(marker.is_some());
}
assert!(!marker_path.exists());
let marker = worker::CompressionRunMarker::try_claim(home.path())?;
let Some(marker) = marker else {
panic!("expected run marker claim");
};
marker.persist();
assert!(marker_path.exists());
assert!(worker::CompressionRunMarker::try_claim(home.path())?.is_none());
Ok(())
}
#[tokio::test]
async fn find_thread_path_by_id_handles_compressed_rollout_filenames() -> anyhow::Result<()> {
let home = TempDir::new()?;