rollout: coalesce thread updated_at touches (#21367)

## Why

Metadata-irrelevant rollout events currently refresh
`threads.updated_at` on every flush. That keeps thread recency accurate,
but it also turns high-frequency agent output into unnecessary SQLite
writes. Recency only needs to advance periodically during an active
session, while the final suppressed touch still needs to be persisted
before shutdown.

## What changed

- coalesce touch-only `updated_at` writes in the rollout writer, with a
short production interval between persisted touches
- retain the latest suppressed touch and flush it during shutdown so the
thread is not left stale
- extend rollout recorder coverage for coalesced touches, delayed
refresh, shutdown flushing, and the existing missing-thread fallback
path

## Verification

- Added regression coverage in `rollout/src/recorder_tests.rs` for
coalescing and shutdown flushing behavior.

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
jif-oai
2026-05-06 19:32:24 +02:00
committed by GitHub
parent 2070d5bfd3
commit 0e821b380a
2 changed files with 165 additions and 5 deletions

View File

@@ -8,7 +8,10 @@ use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use chrono::DateTime;
use chrono::SecondsFormat;
use chrono::Utc;
use codex_protocol::ThreadId;
@@ -1430,9 +1433,28 @@ struct RolloutWriterState {
state_builder: Option<ThreadMetadataBuilder>,
default_provider: String,
generate_memories: bool,
thread_updated_at_touch: ThreadUpdatedAtTouch,
last_logged_error: Option<String>,
}
#[cfg(not(test))]
const THREAD_UPDATED_AT_TOUCH_INTERVAL: Duration = Duration::from_secs(5);
#[cfg(test)]
const THREAD_UPDATED_AT_TOUCH_INTERVAL: Duration = Duration::from_millis(50);
#[derive(Default)]
struct ThreadUpdatedAtTouch {
last_persisted_at: Option<Instant>,
pending_touch: Option<(ThreadId, DateTime<Utc>)>,
}
impl ThreadUpdatedAtTouch {
fn mark_persisted(&mut self, now: Instant) {
self.last_persisted_at = Some(now);
self.pending_touch = None;
}
}
impl RolloutWriterState {
#[allow(clippy::too_many_arguments)]
fn new(
@@ -1460,6 +1482,7 @@ impl RolloutWriterState {
state_builder,
default_provider,
generate_memories,
thread_updated_at_touch: ThreadUpdatedAtTouch::default(),
last_logged_error: None,
}
}
@@ -1492,7 +1515,19 @@ impl RolloutWriterState {
if self.is_deferred() && self.pending_items.is_empty() {
return Ok(());
}
self.write_pending_with_recovery("shutdown").await
self.write_pending_with_recovery("shutdown").await?;
if let Some((thread_id, updated_at)) = self.thread_updated_at_touch.pending_touch.take()
&& state_db::touch_thread_updated_at(
self.state_db_ctx.as_deref(),
Some(thread_id),
updated_at,
"rollout_writer_shutdown",
)
.await
{
self.thread_updated_at_touch.mark_persisted(Instant::now());
}
Ok(())
}
async fn write_pending_with_recovery(&mut self, operation: &str) -> std::io::Result<()> {
@@ -1569,6 +1604,7 @@ impl RolloutWriterState {
&mut self.state_builder,
self.default_provider.as_str(),
self.generate_memories,
&mut self.thread_updated_at_touch,
)
.await?;
self.meta = None;
@@ -1612,6 +1648,7 @@ impl RolloutWriterState {
written_items.as_slice(),
self.default_provider.as_str(),
/*new_thread_memory_mode*/ None,
&mut self.thread_updated_at_touch,
)
.await;
}
@@ -1683,6 +1720,7 @@ async fn write_session_meta(
state_builder: &mut Option<ThreadMetadataBuilder>,
default_provider: &str,
generate_memories: bool,
thread_updated_at_touch: &mut ThreadUpdatedAtTouch,
) -> std::io::Result<()> {
let git_info = collect_git_info(cwd).await.map(|info| ProtocolGitInfo {
commit_hash: info.commit_hash,
@@ -1708,6 +1746,7 @@ async fn write_session_meta(
std::slice::from_ref(&rollout_item),
default_provider,
(!generate_memories).then_some("disabled"),
thread_updated_at_touch,
)
.await;
Ok(())
@@ -1720,8 +1759,10 @@ async fn sync_thread_state_after_write(
items: &[RolloutItem],
default_provider: &str,
new_thread_memory_mode: Option<&str>,
thread_updated_at_touch: &mut ThreadUpdatedAtTouch,
) {
let updated_at = Utc::now();
let now = Instant::now();
if new_thread_memory_mode.is_some()
|| items
.iter()
@@ -1738,15 +1779,27 @@ async fn sync_thread_state_after_write(
Some(updated_at),
)
.await;
thread_updated_at_touch.mark_persisted(now);
return;
}
let thread_id = state_builder
.map(|builder| builder.id)
.or_else(|| metadata::builder_from_items(items, rollout_path).map(|builder| builder.id));
if thread_updated_at_touch
.last_persisted_at
.is_some_and(|last_persisted_at| {
now.duration_since(last_persisted_at) < THREAD_UPDATED_AT_TOUCH_INTERVAL
})
{
thread_updated_at_touch.pending_touch = thread_id.map(|thread_id| (thread_id, updated_at));
return;
}
if state_db::touch_thread_updated_at(state_db_ctx, thread_id, updated_at, "rollout_writer")
.await
{
thread_updated_at_touch.mark_persisted(now);
return;
}
state_db::apply_rollout_items(
@@ -1760,6 +1813,7 @@ async fn sync_thread_state_after_write(
Some(updated_at),
)
.await;
thread_updated_at_touch.mark_persisted(now);
}
/// Append one already-filtered rollout item to an existing rollout JSONL file.

View File

@@ -469,7 +469,7 @@ async fn writer_state_retries_write_error_before_reporting_flush_success() -> st
}
#[tokio::test]
async fn metadata_irrelevant_events_touch_state_db_updated_at() -> std::io::Result<()> {
async fn metadata_irrelevant_events_coalesce_state_db_updated_at() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
@@ -519,8 +519,6 @@ async fn metadata_irrelevant_events_touch_state_db_updated_at() -> std::io::Resu
let initial_title = initial_thread.title.clone();
let initial_first_user_message = initial_thread.first_user_message.clone();
tokio::time::sleep(Duration::from_secs(1)).await;
recorder
.record_items(&[RolloutItem::EventMsg(EventMsg::AgentMessage(
AgentMessageEvent {
@@ -538,17 +536,123 @@ async fn metadata_irrelevant_events_touch_state_db_updated_at() -> std::io::Resu
.expect("thread should load after agent message")
.expect("thread should still exist");
assert!(updated_thread.updated_at > initial_updated_at);
assert_eq!(updated_thread.updated_at, initial_updated_at);
assert_eq!(updated_thread.title, initial_title);
assert_eq!(
updated_thread.first_user_message,
initial_first_user_message
);
tokio::time::sleep(THREAD_UPDATED_AT_TOUCH_INTERVAL + Duration::from_millis(10)).await;
recorder
.record_items(&[RolloutItem::EventMsg(EventMsg::AgentMessage(
AgentMessageEvent {
message: "more assistant text".to_string(),
phase: None,
memory_citation: None,
},
))])
.await?;
recorder.flush().await?;
let refreshed_thread = state_db
.get_thread(thread_id)
.await
.expect("thread should load after refresh")
.expect("thread should still exist");
assert!(refreshed_thread.updated_at > initial_updated_at);
assert_eq!(refreshed_thread.title, initial_title);
assert_eq!(
refreshed_thread.first_user_message,
initial_first_user_message
);
recorder.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn shutdown_flushes_pending_metadata_irrelevant_updated_at() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let state_db = StateRuntime::init(home.path().to_path_buf(), config.model_provider_id.clone())
.await
.expect("state db should initialize");
state_db
.mark_backfill_complete(/*last_watermark*/ None)
.await
.expect("backfill should be complete");
let thread_id = ThreadId::new();
let recorder = RolloutRecorder::new(
&config,
RolloutRecorderParams::new(
thread_id,
/*forked_from_id*/ None,
SessionSource::Cli,
/*thread_source*/ None,
BaseInstructions::default(),
Vec::new(),
EventPersistenceMode::Limited,
),
Some(state_db.clone()),
/*state_builder*/ None,
)
.await?;
recorder
.record_items(&[RolloutItem::EventMsg(EventMsg::UserMessage(
UserMessageEvent {
message: "first-user-message".to_string(),
images: None,
local_images: Vec::new(),
text_elements: Vec::new(),
},
))])
.await?;
recorder.persist().await?;
recorder.flush().await?;
let initial_updated_at = state_db
.get_thread(thread_id)
.await
.expect("thread should load")
.expect("thread should exist")
.updated_at;
recorder
.record_items(&[RolloutItem::EventMsg(EventMsg::AgentMessage(
AgentMessageEvent {
message: "assistant text".to_string(),
phase: None,
memory_citation: None,
},
))])
.await?;
recorder.flush().await?;
assert_eq!(
state_db
.get_thread(thread_id)
.await
.expect("thread should load before shutdown")
.expect("thread should still exist")
.updated_at,
initial_updated_at
);
recorder.shutdown().await?;
let shutdown_updated_at = state_db
.get_thread(thread_id)
.await
.expect("thread should load after shutdown")
.expect("thread should still exist")
.updated_at;
assert!(shutdown_updated_at > initial_updated_at);
Ok(())
}
#[tokio::test]
async fn metadata_irrelevant_events_fall_back_to_upsert_when_thread_missing() -> std::io::Result<()>
{
@@ -574,6 +678,7 @@ async fn metadata_irrelevant_events_fall_back_to_upsert_when_thread_missing() ->
},
))];
let mut thread_updated_at_touch = ThreadUpdatedAtTouch::default();
sync_thread_state_after_write(
Some(state_db.as_ref()),
rollout_path.as_path(),
@@ -581,6 +686,7 @@ async fn metadata_irrelevant_events_fall_back_to_upsert_when_thread_missing() ->
items.as_slice(),
config.model_provider_id.as_str(),
/*new_thread_memory_mode*/ None,
&mut thread_updated_at_touch,
)
.await;