codex: restore timers after resume history replay (#17380)

This commit is contained in:
Eric Traut
2026-04-10 20:59:16 -07:00
parent 108679a91f
commit 6fe44104f3
2 changed files with 108 additions and 2 deletions

View File

@@ -8,6 +8,7 @@ use codex_core::timers::ThreadTimerTrigger;
use codex_core::timers::TimerDelivery;
use codex_features::Feature;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
@@ -184,6 +185,112 @@ async fn create_timer_rejects_ephemeral_thread() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn resume_due_timer_runs_after_history_reconstruction() -> Result<()> {
let server = start_mock_server().await;
let mock = mount_sse_sequence(
&server,
vec![
sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "recorded before resume"),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-2", "timer after resume"),
ev_completed("resp-2"),
]),
],
)
.await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::Timers)
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
config
.features
.enable(Feature::Sqlite)
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
});
let initial = builder.build(&server).await?;
initial.submit_turn("context before resume").await?;
let db = initial.codex.state_db().expect("state db enabled");
let home = initial.home.clone();
let rollout_path = initial
.codex
.rollout_path()
.expect("rollout path should be materialized");
let thread_id = initial.session_configured.session_id.to_string();
initial.codex.submit(Op::Shutdown).await?;
wait_for_event(&initial.codex, |event| {
matches!(event, EventMsg::ShutdownComplete)
})
.await;
let now = Utc::now().timestamp();
db.create_thread_timer(&codex_state::ThreadTimerCreateParams {
id: "resume-due-timer".to_string(),
thread_id,
source: "external".to_string(),
client_id: "codex-test".to_string(),
trigger_json: r#"{"kind":"delay","seconds":0,"repeat":false}"#.to_string(),
content: "resume timer".to_string(),
instructions: None,
meta_json: "{}".to_string(),
delivery: TimerDelivery::AfterTurn.as_str().to_string(),
created_at: now - 10,
next_run_at: Some(now - 1),
last_run_at: None,
pending_run: false,
})
.await?;
let mut resume_builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::Timers)
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
config
.features
.enable(Feature::Sqlite)
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
});
let resumed = resume_builder.resume(&server, home, rollout_path).await?;
wait_for_event_with_timeout(
&resumed.codex,
|event| match event {
EventMsg::InjectedMessage(event) => {
event.source == "timer resume-due-timer"
&& event.content == "Timer fired: resume timer"
}
_ => false,
},
Duration::from_secs(20),
)
.await;
wait_for_event_with_timeout(
&resumed.codex,
|event| matches!(event, EventMsg::TurnComplete(_)),
Duration::from_secs(20),
)
.await;
let requests = mock.requests();
assert_eq!(requests.len(), 2);
let resumed_request = &requests[1];
assert!(resumed_request.body_contains_text("context before resume"));
assert!(resumed_request.body_contains_text("recorded before resume"));
assert!(resumed_request.body_contains_text("Timer fired: resume timer"));
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn list_timers_discovers_externally_inserted_timer() -> Result<()> {
let server = start_mock_server().await;