Defer persistence of rollout file (#11028)

- Defer rollout persistence for fresh threads (`InitialHistory::New`):
keep rollout events in memory and only materialize rollout file + state
DB row on first `EventMsg::UserMessage`.
- Keep precomputed rollout path available before materialization.
- Change `thread/start` to build thread response from live config
snapshot and optional precomputed path.
- Improve pre-materialization behavior in app-server/TUI: clearer
invalid-request errors for file-backed ops and a friendlier `/fork` “not
ready yet” UX.
- Update tests to match deferred semantics across
start/read/archive/unarchive/fork/resume/review flows.
- Improved resilience of user_shell test, which should be unrelated to
this change but must be affected by timing changes

For Reviewers:
* The primary change is in recorder.rs
* Most of the other changes were to fix up broken assumptions in
existing tests

Testing:
* Manually tested CLI
* Exercised app server paths by manually running IDE Extension with
rebuilt CLI binary
* Only user-visible change is that `/fork` in TUI generates visible
error if used prior to first turn
This commit is contained in:
Eric Traut
2026-02-07 23:05:03 -08:00
committed by GitHub
parent 6d08298f4e
commit b3de6c7f2b
19 changed files with 983 additions and 195 deletions

View File

@@ -314,7 +314,7 @@ impl Codex {
// Resolve base instructions for the session. Priority order:
// 1. config.base_instructions override
// 2. conversation history => session_meta.base_instructions
// 3. base_intructions for current model
// 3. base_instructions for current model
let model_info = models_manager.get_model_info(model.as_str(), &config).await;
let base_instructions = config
.base_instructions
@@ -1211,6 +1211,18 @@ impl Session {
}
}
pub(crate) async fn ensure_rollout_materialized(&self) {
let recorder = {
let guard = self.services.rollout.lock().await;
guard.clone()
};
if let Some(rec) = recorder
&& let Err(e) = rec.persist().await
{
warn!("failed to materialize rollout recorder: {e}");
}
}
fn next_internal_sub_id(&self) -> String {
let id = self
.next_internal_sub_id
@@ -1326,6 +1338,10 @@ impl Session {
let mut state = self.state.lock().await;
state.initial_context_seeded = true;
}
// Forked threads should remain file-backed immediately after startup.
self.ensure_rollout_materialized().await;
// Flush after seeding history and any persisted rollout copy.
self.flush_rollout().await;
}
@@ -2347,6 +2363,7 @@ impl Session {
let turn_item = TurnItem::UserMessage(UserMessageItem::new(input));
self.emit_turn_item_started(turn_context, &turn_item).await;
self.emit_turn_item_completed(turn_context, turn_item).await;
self.ensure_rollout_materialized().await;
}
pub(crate) async fn notify_background_event(

View File

@@ -47,6 +47,7 @@ use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_state::StateRuntime;
use codex_state::ThreadMetadataBuilder;
/// Records all [`ResponseItem`]s for a session and flushes them to disk after
@@ -81,6 +82,9 @@ pub enum RolloutRecorderParams {
enum RolloutCmd {
AddItems(Vec<RolloutItem>),
Persist {
ack: oneshot::Sender<()>,
},
/// Ensure all prior writes are processed; respond when flushed.
Flush {
ack: oneshot::Sender<()>,
@@ -279,16 +283,19 @@ impl RolloutRecorder {
}
}
/// Attempt to create a new [`RolloutRecorder`]. If the sessions directory
/// cannot be created or the rollout file cannot be opened we return the
/// error so the caller can decide whether to disable persistence.
/// Attempt to create a new [`RolloutRecorder`].
///
/// For newly created sessions, this precomputes path/metadata and defers
/// file creation/open until an explicit `persist()` call.
///
/// For resumed sessions, this immediately opens the existing rollout file.
pub async fn new(
config: &Config,
params: RolloutRecorderParams,
state_db_ctx: Option<StateDbHandle>,
state_builder: Option<ThreadMetadataBuilder>,
) -> std::io::Result<Self> {
let (file, rollout_path, meta) = match params {
let (file, deferred_log_file_info, rollout_path, meta) = match params {
RolloutRecorderParams::Create {
conversation_id,
forked_from_id,
@@ -296,47 +303,46 @@ impl RolloutRecorder {
base_instructions,
dynamic_tools,
} => {
let LogFileInfo {
file,
path,
conversation_id: session_id,
timestamp,
} = create_log_file(config, conversation_id)?;
let log_file_info = precompute_log_file_info(config, conversation_id)?;
let path = log_file_info.path.clone();
let session_id = log_file_info.conversation_id;
let started_at = log_file_info.timestamp;
let timestamp_format: &[FormatItem] = format_description!(
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
);
let timestamp = timestamp
let timestamp = started_at
.to_offset(time::UtcOffset::UTC)
.format(timestamp_format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
(
tokio::fs::File::from_std(file),
path,
Some(SessionMeta {
id: session_id,
forked_from_id,
timestamp,
cwd: config.cwd.clone(),
originator: originator().value,
cli_version: env!("CARGO_PKG_VERSION").to_string(),
source,
model_provider: Some(config.model_provider_id.clone()),
base_instructions: Some(base_instructions),
dynamic_tools: if dynamic_tools.is_empty() {
None
} else {
Some(dynamic_tools)
},
}),
)
let session_meta = SessionMeta {
id: session_id,
forked_from_id,
timestamp,
cwd: config.cwd.clone(),
originator: originator().value,
cli_version: env!("CARGO_PKG_VERSION").to_string(),
source,
model_provider: Some(config.model_provider_id.clone()),
base_instructions: Some(base_instructions),
dynamic_tools: if dynamic_tools.is_empty() {
None
} else {
Some(dynamic_tools)
},
};
(None, Some(log_file_info), path, Some(session_meta))
}
RolloutRecorderParams::Resume { path } => (
tokio::fs::OpenOptions::new()
.append(true)
.open(&path)
.await?,
Some(
tokio::fs::OpenOptions::new()
.append(true)
.open(&path)
.await?,
),
None,
path,
None,
),
@@ -355,6 +361,7 @@ impl RolloutRecorder {
// driver instead of blocking the runtime.
tokio::task::spawn(rollout_writer(
file,
deferred_log_file_info,
rx,
meta,
cwd,
@@ -398,6 +405,19 @@ impl RolloutRecorder {
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
}
/// Materialize the rollout file and persist all buffered items.
///
/// This is idempotent; after first materialization, repeated calls are no-ops.
pub async fn persist(&self) -> std::io::Result<()> {
let (tx, rx) = oneshot::channel();
self.tx
.send(RolloutCmd::Persist { ack: tx })
.await
.map_err(|e| IoError::other(format!("failed to queue rollout persist: {e}")))?;
rx.await
.map_err(|e| IoError::other(format!("failed waiting for rollout persist: {e}")))
}
/// Flush all queued writes and wait until they are committed by the writer task.
pub async fn flush(&self) -> std::io::Result<()> {
let (tx, rx) = oneshot::channel();
@@ -508,9 +528,6 @@ impl RolloutRecorder {
}
struct LogFileInfo {
/// Opened file handle to the rollout file.
file: File,
/// Full path to the rollout file.
path: PathBuf,
@@ -521,8 +538,11 @@ struct LogFileInfo {
timestamp: OffsetDateTime,
}
fn create_log_file(config: &Config, conversation_id: ThreadId) -> std::io::Result<LogFileInfo> {
// Resolve ~/.codex/sessions/YYYY/MM/DD and create it if missing.
fn precompute_log_file_info(
config: &Config,
conversation_id: ThreadId,
) -> std::io::Result<LogFileInfo> {
// Resolve ~/.codex/sessions/YYYY/MM/DD path.
let timestamp = OffsetDateTime::now_local()
.map_err(|e| IoError::other(format!("failed to get local time: {e}")))?;
let mut dir = config.codex_home.clone();
@@ -530,7 +550,6 @@ fn create_log_file(config: &Config, conversation_id: ThreadId) -> std::io::Resul
dir.push(timestamp.year().to_string());
dir.push(format!("{:02}", u8::from(timestamp.month())));
dir.push(format!("{:02}", timestamp.day()));
fs::create_dir_all(&dir)?;
// Custom format for YYYY-MM-DDThh-mm-ss. Use `-` instead of `:` for
// compatibility with filesystems that do not allow colons in filenames.
@@ -543,22 +562,32 @@ fn create_log_file(config: &Config, conversation_id: ThreadId) -> std::io::Resul
let filename = format!("rollout-{date_str}-{conversation_id}.jsonl");
let path = dir.join(filename);
let file = std::fs::OpenOptions::new()
.append(true)
.create(true)
.open(&path)?;
Ok(LogFileInfo {
file,
path,
conversation_id,
timestamp,
})
}
fn open_log_file(path: &Path) -> std::io::Result<File> {
let Some(parent) = path.parent() else {
return Err(IoError::other(format!(
"rollout path has no parent: {}",
path.display()
)));
};
fs::create_dir_all(parent)?;
std::fs::OpenOptions::new()
.append(true)
.create(true)
.open(path)
}
#[allow(clippy::too_many_arguments)]
async fn rollout_writer(
file: tokio::fs::File,
file: Option<tokio::fs::File>,
mut deferred_log_file_info: Option<LogFileInfo>,
mut rx: mpsc::Receiver<RolloutCmd>,
mut meta: Option<SessionMeta>,
cwd: std::path::PathBuf,
@@ -567,35 +596,27 @@ async fn rollout_writer(
mut state_builder: Option<ThreadMetadataBuilder>,
default_provider: String,
) -> std::io::Result<()> {
let mut writer = JsonlWriter { file };
let mut writer = file.map(|file| JsonlWriter { file });
let mut buffered_items = Vec::<RolloutItem>::new();
if let Some(builder) = state_builder.as_mut() {
builder.rollout_path = rollout_path.clone();
}
// If we have a meta, collect git info asynchronously and write meta first
if let Some(session_meta) = meta.take() {
let git_info = collect_git_info(&cwd).await;
let session_meta_line = SessionMetaLine {
meta: session_meta,
git: git_info,
};
if state_db_ctx.is_some() {
state_builder =
metadata::builder_from_session_meta(&session_meta_line, rollout_path.as_path());
}
// Write the SessionMeta as the first item in the file, wrapped in a rollout line
let rollout_item = RolloutItem::SessionMeta(session_meta_line);
writer.write_rollout_item(&rollout_item).await?;
state_db::reconcile_rollout(
// Resumed sessions already have a file handle open, so session metadata can
// be written immediately if present.
if writer.is_some()
&& let Some(session_meta) = meta.take()
{
write_session_meta(
writer.as_mut(),
session_meta,
&cwd,
&rollout_path,
state_db_ctx.as_deref(),
rollout_path.as_path(),
&mut state_builder,
default_provider.as_str(),
state_builder.as_ref(),
std::slice::from_ref(&rollout_item),
None,
)
.await;
.await?;
}
// Process rollout commands
@@ -605,29 +626,83 @@ async fn rollout_writer(
let mut persisted_items = Vec::new();
for item in items {
if is_persisted_response_item(&item) {
writer.write_rollout_item(&item).await?;
persisted_items.push(item);
}
}
if persisted_items.is_empty() {
continue;
}
if let Some(builder) = state_builder.as_mut() {
builder.rollout_path = rollout_path.clone();
if writer.is_none() {
buffered_items.extend(persisted_items);
continue;
}
state_db::apply_rollout_items(
state_db_ctx.as_deref(),
rollout_path.as_path(),
default_provider.as_str(),
state_builder.as_ref(),
write_and_reconcile_items(
writer.as_mut(),
persisted_items.as_slice(),
"rollout_writer",
&rollout_path,
state_db_ctx.as_deref(),
&mut state_builder,
default_provider.as_str(),
)
.await;
.await?;
}
RolloutCmd::Persist { ack } => {
if writer.is_none() {
let result = async {
let Some(log_file_info) = deferred_log_file_info.take() else {
return Err(IoError::other(
"deferred rollout recorder missing log file metadata",
));
};
let file = open_log_file(log_file_info.path.as_path())?;
writer = Some(JsonlWriter {
file: tokio::fs::File::from_std(file),
});
if let Some(session_meta) = meta.take() {
write_session_meta(
writer.as_mut(),
session_meta,
&cwd,
&rollout_path,
state_db_ctx.as_deref(),
&mut state_builder,
default_provider.as_str(),
)
.await?;
}
if !buffered_items.is_empty() {
write_and_reconcile_items(
writer.as_mut(),
buffered_items.as_slice(),
&rollout_path,
state_db_ctx.as_deref(),
&mut state_builder,
default_provider.as_str(),
)
.await?;
buffered_items.clear();
}
Ok(())
}
.await;
if let Err(err) = result {
let _ = ack.send(());
return Err(err);
}
}
let _ = ack.send(());
}
RolloutCmd::Flush { ack } => {
// Ensure underlying file is flushed and then ack.
if let Err(e) = writer.file.flush().await {
// Deferred fresh threads may not have an initialized file yet.
if let Some(writer) = writer.as_mut()
&& let Err(e) = writer.file.flush().await
{
let _ = ack.send(());
return Err(e);
}
@@ -642,6 +717,68 @@ async fn rollout_writer(
Ok(())
}
async fn write_session_meta(
mut writer: Option<&mut JsonlWriter>,
session_meta: SessionMeta,
cwd: &Path,
rollout_path: &Path,
state_db_ctx: Option<&StateRuntime>,
state_builder: &mut Option<ThreadMetadataBuilder>,
default_provider: &str,
) -> std::io::Result<()> {
let git_info = collect_git_info(cwd).await;
let session_meta_line = SessionMetaLine {
meta: session_meta,
git: git_info,
};
if state_db_ctx.is_some() {
*state_builder = metadata::builder_from_session_meta(&session_meta_line, rollout_path);
}
let rollout_item = RolloutItem::SessionMeta(session_meta_line);
if let Some(writer) = writer.as_mut() {
writer.write_rollout_item(&rollout_item).await?;
}
state_db::reconcile_rollout(
state_db_ctx,
rollout_path,
default_provider,
state_builder.as_ref(),
std::slice::from_ref(&rollout_item),
None,
)
.await;
Ok(())
}
async fn write_and_reconcile_items(
mut writer: Option<&mut JsonlWriter>,
items: &[RolloutItem],
rollout_path: &Path,
state_db_ctx: Option<&StateRuntime>,
state_builder: &mut Option<ThreadMetadataBuilder>,
default_provider: &str,
) -> std::io::Result<()> {
if let Some(writer) = writer.as_mut() {
for item in items {
writer.write_rollout_item(item).await?;
}
}
if let Some(builder) = state_builder.as_mut() {
builder.rollout_path = rollout_path.to_path_buf();
}
state_db::apply_rollout_items(
state_db_ctx,
rollout_path,
default_provider,
state_builder.as_ref(),
items,
"rollout_writer",
)
.await;
Ok(())
}
struct JsonlWriter {
file: tokio::fs::File,
}
@@ -751,3 +888,97 @@ fn cwd_matches(session_cwd: &Path, cwd: &Path) -> bool {
}
session_cwd == cwd
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::ConfigBuilder;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::UserMessageEvent;
use tempfile::TempDir;
#[tokio::test]
async fn recorder_materializes_only_after_explicit_persist() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
let config = ConfigBuilder::default()
.codex_home(home.path().to_path_buf())
.build()
.await?;
let thread_id = ThreadId::new();
let recorder = RolloutRecorder::new(
&config,
RolloutRecorderParams::new(
thread_id,
None,
SessionSource::Exec,
BaseInstructions::default(),
Vec::new(),
),
None,
None,
)
.await?;
let rollout_path = recorder.rollout_path().to_path_buf();
assert!(
!rollout_path.exists(),
"rollout file should not exist before first user message"
);
recorder
.record_items(&[RolloutItem::EventMsg(EventMsg::AgentMessage(
AgentMessageEvent {
message: "buffered-event".to_string(),
},
))])
.await?;
recorder.flush().await?;
assert!(
!rollout_path.exists(),
"rollout file should remain deferred before first user message"
);
recorder
.record_items(&[RolloutItem::EventMsg(EventMsg::UserMessage(
UserMessageEvent {
message: "first-user-message".to_string(),
images: None,
local_images: Vec::new(),
text_elements: Vec::new(),
},
))])
.await?;
recorder.flush().await?;
assert!(
!rollout_path.exists(),
"user-message-like items should not materialize without explicit persist"
);
recorder.persist().await?;
// Second call verifies `persist()` is idempotent after materialization.
recorder.persist().await?;
assert!(rollout_path.exists(), "rollout file should be materialized");
let text = std::fs::read_to_string(&rollout_path)?;
assert!(
text.contains("\"type\":\"session_meta\""),
"expected session metadata in rollout"
);
let buffered_idx = text
.find("buffered-event")
.expect("buffered event in rollout");
let user_idx = text
.find("first-user-message")
.expect("first user message in rollout");
assert!(
buffered_idx < user_idx,
"buffered items should preserve ordering"
);
let text_after_second_persist = std::fs::read_to_string(&rollout_path)?;
assert_eq!(text_after_second_persist, text);
recorder.shutdown().await?;
Ok(())
}
}

View File

@@ -240,6 +240,7 @@ pub(crate) async fn exit_review_mode(
}],
)
.await;
session
.send_event(
ctx.as_ref(),
@@ -260,4 +261,9 @@ pub(crate) async fn exit_review_mode(
},
)
.await;
// Review turns can run before any regular user turn, so explicitly
// materialize rollout persistence. Do this after emitting review output so
// file creation + git metadata collection cannot delay client-facing items.
session.ensure_rollout_materialized().await;
}

View File

@@ -797,6 +797,7 @@ fn build_agent_shared_config(
#[cfg(test)]
mod tests {
use super::*;
use crate::AuthManager;
use crate::CodexAuth;
use crate::ThreadManager;
use crate::agent::MAX_THREAD_SPAWN_DEPTH;
@@ -811,6 +812,10 @@ mod tests {
use crate::protocol::SubAgentSource;
use crate::turn_diff_tracker::TurnDiffTracker;
use codex_protocol::ThreadId;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::RolloutItem;
use pretty_assertions::assert_eq;
use serde::Deserialize;
use serde_json::json;
@@ -1142,7 +1147,22 @@ mod tests {
let manager = thread_manager();
session.services.agent_control = manager.agent_control();
let config = turn.config.as_ref().clone();
let thread = manager.start_thread(config).await.expect("start thread");
let thread = manager
.resume_thread_with_history(
config,
InitialHistory::Forked(vec![RolloutItem::ResponseItem(ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "materialized".to_string(),
}],
end_turn: None,
phase: None,
})]),
AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy")),
)
.await
.expect("start thread");
let agent_id = thread.thread_id;
let _ = manager
.agent_control()