feat: make rollout recorder reliable against errors (#17214)

The rollout writer now keeps an owned/monitored task handle, returns
real Result acks for flush/persist/shutdown, retries failed flushes by
reopening the rollout file, and keeps buffered items until they are
successfully written. Session flushes are now real durability barriers
for fork/rollback/read-after-write paths, while turn completion surfaces
a warning if the rollout still cannot be saved after recovery.
This commit is contained in:
jif-oai
2026-04-10 14:12:33 +01:00
committed by GitHub
parent 085ffb4456
commit 8035cb03f1
10 changed files with 536 additions and 191 deletions

View File

@@ -1313,7 +1313,11 @@ async fn fork_startup_context_then_first_turn_diff_snapshot() -> anyhow::Result<
// Forking reads the persisted rollout JSONL, so force the completed source turn to disk
// before snapshotting from it.
initial.codex.ensure_rollout_materialized().await;
-initial.codex.flush_rollout().await;
+initial
+.codex
+.flush_rollout()
+.await
+.expect("source rollout should flush before fork");
let mut fork_config = initial.config.clone();
fork_config.permissions.approval_policy =
@@ -2359,7 +2363,10 @@ async fn attach_rollout_recorder(session: &Arc<Session>) -> PathBuf {
*rollout = Some(recorder);
}
session.ensure_rollout_materialized().await;
-session.flush_rollout().await;
+session
+.flush_rollout()
+.await
+.expect("attached rollout should flush");
rollout_path
}
@@ -4422,7 +4429,7 @@ async fn record_context_updates_and_set_reference_context_item_persists_baseline
.expect("serialize expected context item")
);
session.ensure_rollout_materialized().await;
-session.flush_rollout().await;
+session.flush_rollout().await.expect("rollout should flush");
let InitialHistory::Resumed(resumed) = RolloutRecorder::get_rollout_history(&rollout_path)
.await
@@ -4524,7 +4531,7 @@ async fn record_context_updates_and_set_reference_context_item_persists_full_rei
.record_context_updates_and_set_reference_context_item(&turn_context)
.await;
session.ensure_rollout_materialized().await;
-session.flush_rollout().await;
+session.flush_rollout().await.expect("rollout should flush");
let InitialHistory::Resumed(resumed) = RolloutRecorder::get_rollout_history(&rollout_path)
.await