From d9feaffffbbde12b49f3e6264f63524e7814164f Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 8 May 2026 10:48:45 +0200 Subject: [PATCH] [codex] make shutdown pending-touch test deterministic (#21550) ## What changed - rewrote `shutdown_flushes_pending_metadata_irrelevant_updated_at` to seed an existing pending `updated_at` touch directly in `RolloutWriterState` - kept the shutdown test focused on draining a pending touch, leaving the separate coalescing test to cover timing-based deferral ## Why The old test had to complete several async operations inside the 50 ms test-only coalescing window. When that sequence took longer, the second flush updated `threads.updated_at` immediately and the pre-shutdown equality assertion failed, even though shutdown behavior was correct. ## Validation - `cargo test -p codex-rollout shutdown_flushes_pending_metadata_irrelevant_updated_at` - `cargo test -p codex-rollout` Co-authored-by: Codex --- codex-rs/rollout/src/recorder_tests.rs | 88 ++++++++++---------------- 1 file changed, 33 insertions(+), 55 deletions(-) diff --git a/codex-rs/rollout/src/recorder_tests.rs b/codex-rs/rollout/src/recorder_tests.rs index bf51241f5b..a8b946934b 100644 --- a/codex-rs/rollout/src/recorder_tests.rs +++ b/codex-rs/rollout/src/recorder_tests.rs @@ -648,70 +648,48 @@ async fn shutdown_flushes_pending_metadata_irrelevant_updated_at() -> std::io::R .expect("backfill should be complete"); let thread_id = ThreadId::new(); - let recorder = RolloutRecorder::new( - &config, - RolloutRecorderParams::new( - thread_id, - /*forked_from_id*/ None, - SessionSource::Cli, - /*thread_source*/ None, - BaseInstructions::default(), - Vec::new(), - EventPersistenceMode::Limited, - ), - Some(state_db.clone()), - /*state_builder*/ None, - ) - .await?; - - recorder - .record_items(&[RolloutItem::EventMsg(EventMsg::UserMessage( - UserMessageEvent { - message: "first-user-message".to_string(), - images: None, - local_images: Vec::new(), - text_elements: Vec::new(), - }, - ))]) - .await?; - recorder.persist().await?; - recorder.flush().await?; - let initial_updated_at = state_db - .get_thread(thread_id) + let rollout_path = home.path().join("rollout.jsonl"); + let initial_updated_at = Utc.with_ymd_and_hms(2026, 5, 7, 7, 37, 8).unwrap(); + let builder = ThreadMetadataBuilder::new( + thread_id, + rollout_path.clone(), + initial_updated_at, + SessionSource::Cli, + ); + state_db + .upsert_thread(&builder.build(config.model_provider_id.as_str())) .await - .expect("thread should load") - .expect("thread should exist") - .updated_at; + .expect("thread should be inserted"); + + File::create(&rollout_path)?; + let rollout_file = std::fs::OpenOptions::new() + .append(true) + .open(&rollout_path)?; + let mut state = RolloutWriterState::new( + Some(tokio::fs::File::from_std(rollout_file)), + /*deferred_log_file_info*/ None, + /*meta*/ None, + home.path().to_path_buf(), + rollout_path, + Some(state_db.clone()), + Some(builder), + config.model_provider_id.clone(), + config.generate_memories, + ); + let pending_updated_at = initial_updated_at + chrono::Duration::seconds(1); + state.thread_updated_at_touch.pending_touch = Some((thread_id, pending_updated_at)); + + state.shutdown().await?; - recorder - .record_items(&[RolloutItem::EventMsg(EventMsg::AgentMessage( - AgentMessageEvent { - message: "assistant text".to_string(), - phase: None, - memory_citation: None, - }, - ))]) - .await?; - recorder.flush().await?; assert_eq!( state_db .get_thread(thread_id) .await - .expect("thread should load before shutdown") + .expect("thread should load after shutdown") .expect("thread should still exist") .updated_at, - initial_updated_at + pending_updated_at ); - - recorder.shutdown().await?; - - let shutdown_updated_at = state_db - .get_thread(thread_id) - .await - .expect("thread should load after shutdown") - .expect("thread should still exist") - .updated_at; - assert!(shutdown_updated_at > initial_updated_at); Ok(()) }