From 0e821b380a012a14f3dac3c729d17e20efeb63c3 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Wed, 6 May 2026 19:32:24 +0200 Subject: [PATCH] rollout: coalesce thread updated_at touches (#21367) ## Why Metadata-irrelevant rollout events currently refresh `threads.updated_at` on every flush. That keeps thread recency accurate, but it also turns high-frequency agent output into unnecessary SQLite writes. Recency only needs to advance periodically during an active session, while the final suppressed touch still needs to be persisted before shutdown. ## What changed - coalesce touch-only `updated_at` writes in the rollout writer, with a short production interval between persisted touches - retain the latest suppressed touch and flush it during shutdown so the thread is not left stale - extend rollout recorder coverage for coalesced touches, delayed refresh, shutdown flushing, and the existing missing-thread fallback path ## Verification - Added regression coverage in `rollout/src/recorder_tests.rs` for coalescing and shutdown flushing behavior. --------- Co-authored-by: Codex --- codex-rs/rollout/src/recorder.rs | 56 +++++++++++- codex-rs/rollout/src/recorder_tests.rs | 114 ++++++++++++++++++++++++- 2 files changed, 165 insertions(+), 5 deletions(-) diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 7a5e28a5a3..6b549b745c 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -8,7 +8,10 @@ use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use std::sync::Mutex; +use std::time::Duration; +use std::time::Instant; +use chrono::DateTime; use chrono::SecondsFormat; use chrono::Utc; use codex_protocol::ThreadId; @@ -1430,9 +1433,28 @@ struct RolloutWriterState { state_builder: Option, default_provider: String, generate_memories: bool, + thread_updated_at_touch: ThreadUpdatedAtTouch, last_logged_error: Option, } +#[cfg(not(test))] +const THREAD_UPDATED_AT_TOUCH_INTERVAL: Duration = Duration::from_secs(5); +#[cfg(test)] +const THREAD_UPDATED_AT_TOUCH_INTERVAL: Duration = Duration::from_millis(50); + +#[derive(Default)] +struct ThreadUpdatedAtTouch { + last_persisted_at: Option, + pending_touch: Option<(ThreadId, DateTime)>, +} + +impl ThreadUpdatedAtTouch { + fn mark_persisted(&mut self, now: Instant) { + self.last_persisted_at = Some(now); + self.pending_touch = None; + } +} + impl RolloutWriterState { #[allow(clippy::too_many_arguments)] fn new( @@ -1460,6 +1482,7 @@ impl RolloutWriterState { state_builder, default_provider, generate_memories, + thread_updated_at_touch: ThreadUpdatedAtTouch::default(), last_logged_error: None, } } @@ -1492,7 +1515,19 @@ impl RolloutWriterState { if self.is_deferred() && self.pending_items.is_empty() { return Ok(()); } - self.write_pending_with_recovery("shutdown").await + self.write_pending_with_recovery("shutdown").await?; + if let Some((thread_id, updated_at)) = self.thread_updated_at_touch.pending_touch.take() + && state_db::touch_thread_updated_at( + self.state_db_ctx.as_deref(), + Some(thread_id), + updated_at, + "rollout_writer_shutdown", + ) + .await + { + self.thread_updated_at_touch.mark_persisted(Instant::now()); + } + Ok(()) } async fn write_pending_with_recovery(&mut self, operation: &str) -> std::io::Result<()> { @@ -1569,6 +1604,7 @@ impl RolloutWriterState { &mut self.state_builder, self.default_provider.as_str(), self.generate_memories, + &mut self.thread_updated_at_touch, ) .await?; self.meta = None; @@ -1612,6 +1648,7 @@ impl RolloutWriterState { written_items.as_slice(), self.default_provider.as_str(), /*new_thread_memory_mode*/ None, + &mut self.thread_updated_at_touch, ) .await; } @@ -1683,6 +1720,7 @@ async fn write_session_meta( state_builder: &mut Option, default_provider: &str, generate_memories: bool, + thread_updated_at_touch: &mut ThreadUpdatedAtTouch, ) -> std::io::Result<()> { let git_info = collect_git_info(cwd).await.map(|info| ProtocolGitInfo { commit_hash: info.commit_hash, @@ -1708,6 +1746,7 @@ async fn write_session_meta( std::slice::from_ref(&rollout_item), default_provider, (!generate_memories).then_some("disabled"), + thread_updated_at_touch, ) .await; Ok(()) @@ -1720,8 +1759,10 @@ async fn sync_thread_state_after_write( items: &[RolloutItem], default_provider: &str, new_thread_memory_mode: Option<&str>, + thread_updated_at_touch: &mut ThreadUpdatedAtTouch, ) { let updated_at = Utc::now(); + let now = Instant::now(); if new_thread_memory_mode.is_some() || items .iter() @@ -1738,15 +1779,27 @@ async fn sync_thread_state_after_write( Some(updated_at), ) .await; + thread_updated_at_touch.mark_persisted(now); return; } let thread_id = state_builder .map(|builder| builder.id) .or_else(|| metadata::builder_from_items(items, rollout_path).map(|builder| builder.id)); + if thread_updated_at_touch + .last_persisted_at + .is_some_and(|last_persisted_at| { + now.duration_since(last_persisted_at) < THREAD_UPDATED_AT_TOUCH_INTERVAL + }) + { + thread_updated_at_touch.pending_touch = thread_id.map(|thread_id| (thread_id, updated_at)); + return; + } + if state_db::touch_thread_updated_at(state_db_ctx, thread_id, updated_at, "rollout_writer") .await { + thread_updated_at_touch.mark_persisted(now); return; } state_db::apply_rollout_items( @@ -1760,6 +1813,7 @@ async fn sync_thread_state_after_write( Some(updated_at), ) .await; + thread_updated_at_touch.mark_persisted(now); } /// Append one already-filtered rollout item to an existing rollout JSONL file. diff --git a/codex-rs/rollout/src/recorder_tests.rs b/codex-rs/rollout/src/recorder_tests.rs index 35018b657d..505cd59929 100644 --- a/codex-rs/rollout/src/recorder_tests.rs +++ b/codex-rs/rollout/src/recorder_tests.rs @@ -469,7 +469,7 @@ async fn writer_state_retries_write_error_before_reporting_flush_success() -> st } #[tokio::test] -async fn metadata_irrelevant_events_touch_state_db_updated_at() -> std::io::Result<()> { +async fn metadata_irrelevant_events_coalesce_state_db_updated_at() -> std::io::Result<()> { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); @@ -519,8 +519,6 @@ async fn metadata_irrelevant_events_touch_state_db_updated_at() -> std::io::Resu let initial_title = initial_thread.title.clone(); let initial_first_user_message = initial_thread.first_user_message.clone(); - tokio::time::sleep(Duration::from_secs(1)).await; - recorder .record_items(&[RolloutItem::EventMsg(EventMsg::AgentMessage( AgentMessageEvent { @@ -538,17 +536,123 @@ async fn metadata_irrelevant_events_touch_state_db_updated_at() -> std::io::Resu .expect("thread should load after agent message") .expect("thread should still exist"); - assert!(updated_thread.updated_at > initial_updated_at); + assert_eq!(updated_thread.updated_at, initial_updated_at); assert_eq!(updated_thread.title, initial_title); assert_eq!( updated_thread.first_user_message, initial_first_user_message ); + tokio::time::sleep(THREAD_UPDATED_AT_TOUCH_INTERVAL + Duration::from_millis(10)).await; + + recorder + .record_items(&[RolloutItem::EventMsg(EventMsg::AgentMessage( + AgentMessageEvent { + message: "more assistant text".to_string(), + phase: None, + memory_citation: None, + }, + ))]) + .await?; + recorder.flush().await?; + + let refreshed_thread = state_db + .get_thread(thread_id) + .await + .expect("thread should load after refresh") + .expect("thread should still exist"); + assert!(refreshed_thread.updated_at > initial_updated_at); + assert_eq!(refreshed_thread.title, initial_title); + assert_eq!( + refreshed_thread.first_user_message, + initial_first_user_message + ); + recorder.shutdown().await?; Ok(()) } +#[tokio::test] +async fn shutdown_flushes_pending_metadata_irrelevant_updated_at() -> std::io::Result<()> { + let home = TempDir::new().expect("temp dir"); + let config = test_config(home.path()); + + let state_db = StateRuntime::init(home.path().to_path_buf(), config.model_provider_id.clone()) + .await + .expect("state db should initialize"); + state_db + .mark_backfill_complete(/*last_watermark*/ None) + .await + .expect("backfill should be complete"); + + let thread_id = ThreadId::new(); + let recorder = RolloutRecorder::new( + &config, + RolloutRecorderParams::new( + thread_id, + /*forked_from_id*/ None, + SessionSource::Cli, + /*thread_source*/ None, + BaseInstructions::default(), + Vec::new(), + EventPersistenceMode::Limited, + ), + Some(state_db.clone()), + /*state_builder*/ None, + ) + .await?; + + recorder + .record_items(&[RolloutItem::EventMsg(EventMsg::UserMessage( + UserMessageEvent { + message: "first-user-message".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + ))]) + .await?; + recorder.persist().await?; + recorder.flush().await?; + let initial_updated_at = state_db + .get_thread(thread_id) + .await + .expect("thread should load") + .expect("thread should exist") + .updated_at; + + recorder + .record_items(&[RolloutItem::EventMsg(EventMsg::AgentMessage( + AgentMessageEvent { + message: "assistant text".to_string(), + phase: None, + memory_citation: None, + }, + ))]) + .await?; + recorder.flush().await?; + assert_eq!( + state_db + .get_thread(thread_id) + .await + .expect("thread should load before shutdown") + .expect("thread should still exist") + .updated_at, + initial_updated_at + ); + + recorder.shutdown().await?; + + let shutdown_updated_at = state_db + .get_thread(thread_id) + .await + .expect("thread should load after shutdown") + .expect("thread should still exist") + .updated_at; + assert!(shutdown_updated_at > initial_updated_at); + Ok(()) +} + #[tokio::test] async fn metadata_irrelevant_events_fall_back_to_upsert_when_thread_missing() -> std::io::Result<()> { @@ -574,6 +678,7 @@ async fn metadata_irrelevant_events_fall_back_to_upsert_when_thread_missing() -> }, ))]; + let mut thread_updated_at_touch = ThreadUpdatedAtTouch::default(); sync_thread_state_after_write( Some(state_db.as_ref()), rollout_path.as_path(), @@ -581,6 +686,7 @@ async fn metadata_irrelevant_events_fall_back_to_upsert_when_thread_missing() -> items.as_slice(), config.model_provider_id.as_str(), /*new_thread_memory_mode*/ None, + &mut thread_updated_at_touch, ) .await;