[codex] make shutdown pending-touch test deterministic (#21550)

## What changed
- rewrote `shutdown_flushes_pending_metadata_irrelevant_updated_at` to
seed an existing pending `updated_at` touch directly in
`RolloutWriterState`
- kept the shutdown test focused on draining a pending touch, leaving
the separate coalescing test to cover timing-based deferral

## Why
The old test had to complete several async operations inside the 50 ms
test-only coalescing window. When that sequence took longer, the second
flush updated `threads.updated_at` immediately and the pre-shutdown
equality assertion failed, even though shutdown behavior was correct.

## Validation
- `cargo test -p codex-rollout
shutdown_flushes_pending_metadata_irrelevant_updated_at`
- `cargo test -p codex-rollout`

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
jif-oai
2026-05-08 10:48:45 +02:00
committed by GitHub
parent 71d80f9a14
commit d9feaffffb

View File

@@ -648,70 +648,48 @@ async fn shutdown_flushes_pending_metadata_irrelevant_updated_at() -> std::io::R
.expect("backfill should be complete");
let thread_id = ThreadId::new();
let recorder = RolloutRecorder::new(
&config,
RolloutRecorderParams::new(
thread_id,
/*forked_from_id*/ None,
SessionSource::Cli,
/*thread_source*/ None,
BaseInstructions::default(),
Vec::new(),
EventPersistenceMode::Limited,
),
Some(state_db.clone()),
/*state_builder*/ None,
)
.await?;
recorder
.record_items(&[RolloutItem::EventMsg(EventMsg::UserMessage(
UserMessageEvent {
message: "first-user-message".to_string(),
images: None,
local_images: Vec::new(),
text_elements: Vec::new(),
},
))])
.await?;
recorder.persist().await?;
recorder.flush().await?;
let initial_updated_at = state_db
.get_thread(thread_id)
let rollout_path = home.path().join("rollout.jsonl");
let initial_updated_at = Utc.with_ymd_and_hms(2026, 5, 7, 7, 37, 8).unwrap();
let builder = ThreadMetadataBuilder::new(
thread_id,
rollout_path.clone(),
initial_updated_at,
SessionSource::Cli,
);
state_db
.upsert_thread(&builder.build(config.model_provider_id.as_str()))
.await
.expect("thread should load")
.expect("thread should exist")
.updated_at;
.expect("thread should be inserted");
File::create(&rollout_path)?;
let rollout_file = std::fs::OpenOptions::new()
.append(true)
.open(&rollout_path)?;
let mut state = RolloutWriterState::new(
Some(tokio::fs::File::from_std(rollout_file)),
/*deferred_log_file_info*/ None,
/*meta*/ None,
home.path().to_path_buf(),
rollout_path,
Some(state_db.clone()),
Some(builder),
config.model_provider_id.clone(),
config.generate_memories,
);
let pending_updated_at = initial_updated_at + chrono::Duration::seconds(1);
state.thread_updated_at_touch.pending_touch = Some((thread_id, pending_updated_at));
state.shutdown().await?;
recorder
.record_items(&[RolloutItem::EventMsg(EventMsg::AgentMessage(
AgentMessageEvent {
message: "assistant text".to_string(),
phase: None,
memory_citation: None,
},
))])
.await?;
recorder.flush().await?;
assert_eq!(
state_db
.get_thread(thread_id)
.await
.expect("thread should load before shutdown")
.expect("thread should load after shutdown")
.expect("thread should still exist")
.updated_at,
initial_updated_at
pending_updated_at
);
recorder.shutdown().await?;
let shutdown_updated_at = state_db
.get_thread(thread_id)
.await
.expect("thread should load after shutdown")
.expect("thread should still exist")
.updated_at;
assert!(shutdown_updated_at > initial_updated_at);
Ok(())
}