Compare commits

...

2 Commits

Author SHA1 Message Date
Javier Soto
bf5a4924c7 Clean up implementation 2026-03-19 11:42:28 -07:00
Javier Soto
a8b8f8e0e2 [WIP] fix: rollout recorder not adding turns to rollout after an error until next restart
I have been chasing this bug for quite some time. I noticed that
sometimes, when I closed the Codex app or tried to resume a CLI
thread, a bunch of messages were missing.

Turns out that this can happen if the rollout recorder hits an error
(for example if you’re out of disk space). After that happens, it will
stop writing to disk.

I’m not confident about the Codex-written fix, but I’m sharing it as-is
at least to highlight the problem and start the conversation!
2026-03-19 11:42:28 -07:00
2 changed files with 250 additions and 65 deletions

View File

@@ -94,11 +94,11 @@ pub enum RolloutRecorderParams {
enum RolloutCmd {
AddItems(Vec<RolloutItem>),
Persist {
ack: oneshot::Sender<()>,
ack: oneshot::Sender<std::io::Result<()>>,
},
/// Ensure all prior writes are processed; respond when flushed.
Flush {
ack: oneshot::Sender<()>,
ack: oneshot::Sender<std::io::Result<()>>,
},
Shutdown {
ack: oneshot::Sender<()>,
@@ -453,7 +453,7 @@ impl RolloutRecorder {
// writes. Using `tokio::fs::File` keeps everything on the async I/O
// driver instead of blocking the runtime.
tokio::task::spawn(rollout_writer(
file,
file.map(JsonlWriter::new),
deferred_log_file_info,
rx,
meta,
@@ -514,6 +514,7 @@ impl RolloutRecorder {
.map_err(|e| IoError::other(format!("failed to queue rollout persist: {e}")))?;
rx.await
.map_err(|e| IoError::other(format!("failed waiting for rollout persist: {e}")))
.and_then(|result| result)
}
/// Flush all queued writes and wait until they are committed by the writer task.
@@ -525,6 +526,7 @@ impl RolloutRecorder {
.map_err(|e| IoError::other(format!("failed to queue rollout flush: {e}")))?;
rx.await
.map_err(|e| IoError::other(format!("failed waiting for rollout flush: {e}")))
.and_then(|result| result)
}
pub(crate) async fn load_rollout_items(
@@ -647,6 +649,7 @@ fn truncate_fs_page(
page
}
#[derive(Clone)]
struct LogFileInfo {
/// Full path to the rollout file.
path: PathBuf,
@@ -706,7 +709,7 @@ fn open_log_file(path: &Path) -> std::io::Result<File> {
#[allow(clippy::too_many_arguments)]
async fn rollout_writer(
file: Option<tokio::fs::File>,
file: Option<JsonlWriter>,
mut deferred_log_file_info: Option<LogFileInfo>,
mut rx: mpsc::Receiver<RolloutCmd>,
mut meta: Option<SessionMeta>,
@@ -717,8 +720,8 @@ async fn rollout_writer(
default_provider: String,
generate_memories: bool,
) -> std::io::Result<()> {
let mut writer = file.map(|file| JsonlWriter { file });
let mut buffered_items = Vec::<RolloutItem>::new();
let mut writer = file;
let mut pending_items = Vec::<RolloutItem>::new();
if let Some(builder) = state_builder.as_mut() {
builder.rollout_path = rollout_path.clone();
}
@@ -749,35 +752,49 @@ async fn rollout_writer(
continue;
}
pending_items.extend(items);
if writer.is_none() {
buffered_items.extend(items);
continue;
if meta.is_some() {
continue;
}
match reopen_rollout_writer(&rollout_path) {
Ok(reopened_writer) => writer = Some(reopened_writer),
Err(err) => {
warn!("rollout reopen failed; keeping pending items queued: {err}");
continue;
}
}
}
write_and_reconcile_items(
writer.as_mut(),
items.as_slice(),
pending_items.as_slice(),
&rollout_path,
state_db_ctx.as_deref(),
state_builder.as_ref(),
default_provider.as_str(),
)
.await?;
.await
.map(|()| pending_items.clear())
.unwrap_or_else(|err| {
writer = None;
warn!("rollout write failed; queued items will retry after reopen: {err}");
});
}
RolloutCmd::Persist { ack } => {
if writer.is_none() {
if writer.is_none() || meta.is_some() || !pending_items.is_empty() {
let result = async {
let Some(log_file_info) = deferred_log_file_info.take() else {
return Err(IoError::other(
"deferred rollout recorder missing log file metadata",
));
};
let file = open_log_file(log_file_info.path.as_path())?;
writer = Some(JsonlWriter {
file: tokio::fs::File::from_std(file),
});
if writer.is_none() {
let writer_path = deferred_log_file_info
.as_ref()
.map(|log_file_info| log_file_info.path.as_path())
.unwrap_or(rollout_path.as_path());
let file = open_log_file(writer_path)?;
writer = Some(JsonlWriter::new(tokio::fs::File::from_std(file)));
}
if let Some(session_meta) = meta.take() {
if let Some(session_meta) = meta.clone() {
write_session_meta(
writer.as_mut(),
session_meta,
@@ -789,41 +806,47 @@ async fn rollout_writer(
generate_memories,
)
.await?;
meta = None;
}
if !buffered_items.is_empty() {
if !pending_items.is_empty() {
write_and_reconcile_items(
writer.as_mut(),
buffered_items.as_slice(),
pending_items.as_slice(),
&rollout_path,
state_db_ctx.as_deref(),
state_builder.as_ref(),
default_provider.as_str(),
)
.await?;
buffered_items.clear();
pending_items.clear();
}
deferred_log_file_info = None;
Ok(())
}
.await;
if let Err(err) = result {
let _ = ack.send(());
return Err(err);
writer = None;
warn!("rollout persist failed; keeping writer alive: {err}");
let _ = ack.send(Err(err));
continue;
}
}
let _ = ack.send(());
let _ = ack.send(Ok(()));
}
RolloutCmd::Flush { ack } => {
// Deferred fresh threads may not have an initialized file yet.
if let Some(writer) = writer.as_mut()
&& let Err(e) = writer.file.flush().await
if let Some(current_writer) = writer.as_mut()
&& let Err(e) = current_writer.file.flush().await
{
let _ = ack.send(());
return Err(e);
writer = None;
warn!("rollout flush failed; keeping writer alive: {e}");
let _ = ack.send(Err(e));
continue;
}
let _ = ack.send(());
let _ = ack.send(Ok(()));
}
RolloutCmd::Shutdown { ack } => {
let _ = ack.send(());
@@ -879,9 +902,7 @@ async fn write_and_reconcile_items(
default_provider: &str,
) -> std::io::Result<()> {
if let Some(writer) = writer.as_mut() {
for item in items {
writer.write_rollout_item(item).await?;
}
writer.write_rollout_items(items).await?;
}
sync_thread_state_after_write(
state_db_ctx,
@@ -956,7 +977,42 @@ struct RolloutLineRef<'a> {
}
impl JsonlWriter {
fn new(file: tokio::fs::File) -> Self {
Self { file }
}
/// Write a single rollout item by delegating to the batch writer.
async fn write_rollout_item(&mut self, rollout_item: &RolloutItem) -> std::io::Result<()> {
    let single_item_batch = std::slice::from_ref(rollout_item);
    self.write_rollout_items(single_item_batch).await
}
/// Append a batch of rollout items to the file as JSONL, atomically from the
/// caller's point of view: either the whole batch is written and flushed, or
/// the file is rolled back to its pre-write length and an error is returned.
async fn write_rollout_items(&mut self, rollout_items: &[RolloutItem]) -> std::io::Result<()> {
    // Remember the current length so a failed write can be undone below.
    let file_len_before_write = self.file.metadata().await?.len();
    // Serialize the whole batch into one buffer so it goes out in a single
    // write_all call rather than one syscall per item.
    let mut json = String::new();
    for rollout_item in rollout_items {
        json.push_str(&Self::rollout_line_json(rollout_item)?);
        json.push('\n');
    }
    let result = async {
        self.file.write_all(json.as_bytes()).await?;
        self.file.flush().await
    }
    .await;
    if let Err(err) = result {
        // Best-effort rollback: truncate away any partially written bytes so
        // the file never contains a torn JSONL line.
        // NOTE(review): this assumes the file is opened in append mode (or the
        // cursor is otherwise reset) — if not, the next write after set_len
        // could leave a zero-filled gap; confirm against open_log_file.
        if let Err(truncate_err) = self.file.set_len(file_len_before_write).await {
            // Both the write and the rollback failed; surface both errors.
            return Err(IoError::other(format!(
                "failed to roll back partial rollout write after {err}: {truncate_err}"
            )));
        }
        return Err(err);
    }
    Ok(())
}
fn rollout_line_json(rollout_item: &RolloutItem) -> std::io::Result<String> {
let timestamp_format: &[FormatItem] = format_description!(
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
);
@@ -968,17 +1024,16 @@ impl JsonlWriter {
timestamp,
item: rollout_item,
};
self.write_line(&line).await
}
async fn write_line(&mut self, item: &impl serde::Serialize) -> std::io::Result<()> {
let mut json = serde_json::to_string(item)?;
json.push('\n');
self.file.write_all(json.as_bytes()).await?;
self.file.flush().await?;
Ok(())
serde_json::to_string(&line).map_err(IoError::other)
}
}
/// Re-open the rollout file at `rollout_path` and wrap it in a fresh
/// `JsonlWriter`, converting the std file handle into a tokio one.
fn reopen_rollout_writer(rollout_path: &Path) -> std::io::Result<JsonlWriter> {
    let std_file = open_log_file(rollout_path)?;
    let async_file = tokio::fs::File::from_std(std_file);
    Ok(JsonlWriter::new(async_file))
}
impl From<codex_state::ThreadsPage> for ThreadsPage {
fn from(db_page: codex_state::ThreadsPage) -> Self {
let items = db_page

View File

@@ -3,7 +3,8 @@ use crate::config::ConfigBuilder;
use crate::features::Feature;
use chrono::TimeZone;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::items::AgentMessageContent;
use codex_protocol::items::AgentMessageItem;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::SandboxPolicy;
@@ -51,6 +52,48 @@ fn write_session_file(root: &Path, ts: &str, uuid: Uuid) -> std::io::Result<Path
Ok(path)
}
/// Build a `RolloutItem` wrapping a single agent text message, for use as a
/// recognizable payload in rollout-writer tests.
fn test_rollout_agent_message(message: &str) -> RolloutItem {
    let content = AgentMessageContent::Text {
        text: message.to_string(),
    };
    let event = AgentMessageItem::new(&[content])
        .as_legacy_events()
        .pop()
        .expect("single agent message event");
    RolloutItem::EventMsg(event)
}
/// Queue a single-message `AddItems` command onto the rollout writer channel.
async fn queue_test_rollout_agent_message(
    tx: &mpsc::Sender<RolloutCmd>,
    message: &str,
) -> Result<(), mpsc::error::SendError<RolloutCmd>> {
    let item = test_rollout_agent_message(message);
    let cmd = RolloutCmd::AddItems(vec![item]);
    tx.send(cmd).await
}
/// Send a `Flush` command and wait for its acknowledgement, propagating any
/// I/O error the writer task reported for the flush.
async fn flush_test_rollout_writer(tx: &mpsc::Sender<RolloutCmd>) -> std::io::Result<()> {
    let (flush_tx, flush_rx) = oneshot::channel();
    if let Err(e) = tx.send(RolloutCmd::Flush { ack: flush_tx }).await {
        return Err(IoError::other(format!("flush should queue: {e}")));
    }
    match flush_rx.await {
        // The ack itself carries the writer's flush result.
        Ok(flush_result) => flush_result,
        Err(e) => Err(IoError::other(format!("flush ack should be sent: {e}"))),
    }
}
/// Create an empty file at `path` and open it read-only, so that the first
/// write through the returned `JsonlWriter` is guaranteed to fail.
async fn open_read_only_rollout_writer(path: &Path) -> std::io::Result<JsonlWriter> {
    std::fs::write(path, "")?;
    let mut options = tokio::fs::OpenOptions::new();
    options.read(true);
    let read_only_file = options.open(path).await?;
    Ok(JsonlWriter::new(read_only_file))
}
/// Assert that `message` occurs exactly `expected` times in the rollout text.
fn assert_rollout_message_count(text: &str, message: &str, expected: usize) {
    let actual = text.matches(message).count();
    assert_eq!(actual, expected, "unexpected rollout count for {message:?}");
}
#[tokio::test]
async fn recorder_materializes_only_after_explicit_persist() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
@@ -81,13 +124,7 @@ async fn recorder_materializes_only_after_explicit_persist() -> std::io::Result<
);
recorder
.record_items(&[RolloutItem::EventMsg(EventMsg::AgentMessage(
AgentMessageEvent {
message: "buffered-event".to_string(),
phase: None,
memory_citation: None,
},
))])
.record_items(&[test_rollout_agent_message("buffered-event")])
.await?;
recorder.flush().await?;
assert!(
@@ -138,6 +175,111 @@ async fn recorder_materializes_only_after_explicit_persist() -> std::io::Result<
Ok(())
}
/// Regression test: a failed write must not permanently disable the rollout
/// writer. The writer starts with a read-only file handle so the first batch
/// fails; the next `AddItems` should reopen the path and retry the queued
/// items, so BOTH messages end up in the file.
#[tokio::test]
async fn rollout_writer_reopens_after_initial_write_error_and_retries_pending_items()
-> std::io::Result<()> {
    let home = TempDir::new().expect("temp dir");
    let rollout_path = home.path().join("rollout.jsonl");
    // Read-only handle: the first write through it is guaranteed to fail.
    let writer = open_read_only_rollout_writer(&rollout_path).await?;
    let (tx, rx) = mpsc::channel::<RolloutCmd>(256);
    // Spawn the writer task directly (bypassing RolloutRecorder) so the test
    // controls the initial file handle.
    let writer_task = tokio::spawn(rollout_writer(
        Some(writer),
        None,
        rx,
        None,
        home.path().to_path_buf(),
        rollout_path.clone(),
        None,
        None,
        "test-provider".to_string(),
        false,
    ));
    // This batch fails against the read-only handle and stays queued.
    queue_test_rollout_agent_message(&tx, "first-write-fails")
        .await
        .expect("first write should queue");
    // This batch triggers the reopen path and should flush both messages.
    queue_test_rollout_agent_message(&tx, "second-write-succeeds")
        .await
        .expect("second write should queue after reopen");
    flush_test_rollout_writer(&tx).await?;
    // Closing the channel lets the writer task run to completion.
    drop(tx);
    let text = std::fs::read_to_string(&rollout_path)?;
    assert!(
        text.contains("second-write-succeeds"),
        "expected the message sent after a failure to show up in the rollout"
    );
    assert!(
        text.contains("first-write-fails"),
        "expected the message from the failed batch to be retried into the rollout"
    );
    // Outer expect: the task did not panic; inner expect: it returned Ok.
    writer_task
        .await
        .expect("writer task should join cleanly")
        .expect("writer task should exit cleanly after channel closes");
    Ok(())
}
/// Regression test: a failed `persist()` must not wedge the recorder. The
/// rollout path is temporarily replaced by a directory so materialization
/// fails; after removing it, a later `persist()` should succeed and write
/// every item buffered both before and after the failure, exactly once each.
#[tokio::test]
async fn recorder_retries_persist_after_materialization_failure() -> std::io::Result<()> {
    let home = TempDir::new().expect("temp dir");
    let config = ConfigBuilder::default()
        .codex_home(home.path().to_path_buf())
        .build()
        .await?;
    let recorder = RolloutRecorder::new(
        &config,
        RolloutRecorderParams::new(
            ThreadId::new(),
            None,
            SessionSource::Exec,
            BaseInstructions::default(),
            Vec::new(),
            EventPersistenceMode::Limited,
        ),
        None,
        None,
    )
    .await?;
    let rollout_path = recorder.rollout_path().to_path_buf();
    std::fs::create_dir_all(
        rollout_path
            .parent()
            .expect("rollout path should have a parent directory"),
    )?;
    // Occupy the rollout path with a directory so opening it as a file fails.
    std::fs::create_dir(&rollout_path)?;
    recorder
        .record_items(&[test_rollout_agent_message("buffered-before-failure")])
        .await?;
    let persist_error = recorder
        .persist()
        .await
        .expect_err("materialization should fail while the rollout path is a directory");
    // NOTE(review): "Is a directory" is the Unix error string — confirm this
    // test is not expected to run on Windows, where the message differs.
    assert!(
        persist_error.to_string().contains("Is a directory"),
        "expected a real open failure, got: {persist_error}"
    );
    // Clear the obstruction so the retry can succeed.
    std::fs::remove_dir(&rollout_path)?;
    recorder
        .record_items(&[test_rollout_agent_message("buffered-after-failure")])
        .await?;
    recorder.persist().await?;
    recorder.flush().await?;
    let text = std::fs::read_to_string(&rollout_path)?;
    // Exactly once each: the retry must not duplicate already-queued items.
    assert_rollout_message_count(&text, "buffered-before-failure", 1);
    assert_rollout_message_count(&text, "buffered-after-failure", 1);
    recorder.shutdown().await?;
    Ok(())
}
#[tokio::test]
async fn metadata_irrelevant_events_touch_state_db_updated_at() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
@@ -198,13 +340,7 @@ async fn metadata_irrelevant_events_touch_state_db_updated_at() -> std::io::Resu
tokio::time::sleep(Duration::from_secs(1)).await;
recorder
.record_items(&[RolloutItem::EventMsg(EventMsg::AgentMessage(
AgentMessageEvent {
message: "assistant text".to_string(),
phase: None,
memory_citation: None,
},
))])
.record_items(&[test_rollout_agent_message("assistant text")])
.await?;
recorder.flush().await?;
@@ -249,13 +385,7 @@ async fn metadata_irrelevant_events_fall_back_to_upsert_when_thread_missing() ->
Utc::now(),
SessionSource::Cli,
);
let items = vec![RolloutItem::EventMsg(EventMsg::AgentMessage(
AgentMessageEvent {
message: "assistant text".to_string(),
phase: None,
memory_citation: None,
},
))];
let items = vec![test_rollout_agent_message("assistant text")];
sync_thread_state_after_write(
Some(state_db.as_ref()),