diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 6afa9d9d1b..78010ae5ed 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -4419,7 +4419,16 @@ impl Session { } let mut idle_pending_input = self.idle_pending_input.lock().await; - idle_pending_input.extend(items); + for item in items { + if let Some(timer_source) = item.generated_timer_source() + && idle_pending_input + .iter() + .any(|queued| queued.generated_timer_source() == Some(timer_source)) + { + continue; + } + idle_pending_input.push(item); + } } pub(crate) async fn take_queued_pending_input_for_next_turn(&self) -> Vec { diff --git a/codex-rs/core/src/codex/timer_runtime.rs b/codex-rs/core/src/codex/timer_runtime.rs index 4c6461e1b1..2fb84291e9 100644 --- a/codex-rs/core/src/codex/timer_runtime.rs +++ b/codex-rs/core/src/codex/timer_runtime.rs @@ -45,6 +45,17 @@ const TIMER_CLIENT_ID_FALLBACK: &str = "codex-cli"; const TIMER_DB_SYNC_INTERVAL: Duration = Duration::from_secs(15); const TIMER_DB_MAX_REFRESH_INTERVAL: Duration = Duration::from_secs(60); +enum PendingMessageStart { + Started, + NotReady, + None, +} + +enum PendingMessageClaim { + Claimed(PendingInputItem, TimerDelivery), + NotReady, +} + fn db_timer_to_persisted_timer(row: codex_state::ThreadTimer) -> Option { let trigger = match serde_json::from_str(&row.trigger_json) { Ok(trigger) => trigger, @@ -202,8 +213,9 @@ impl Session { { return; } - if self.maybe_start_pending_message().await { - return; + match self.maybe_start_pending_message().await { + PendingMessageStart::Started | PendingMessageStart::NotReady => return, + PendingMessageStart::None => {} } self.try_start_pending_timer(IdleRecurringTimerPolicy::IncludeAll) .await; @@ -252,9 +264,12 @@ impl Session { true } - async fn maybe_start_pending_message(self: &Arc) -> bool { - let Some((input_item, delivery)) = self.claim_next_message_for_delivery().await else { - return false; + async fn maybe_start_pending_message(self: &Arc) -> PendingMessageStart { + let Some(claim) = self.claim_next_message_for_delivery().await else { + return PendingMessageStart::None; + }; + let PendingMessageClaim::Claimed(input_item, delivery) = claim else { + return PendingMessageStart::NotReady; }; match delivery { @@ -275,12 +290,10 @@ impl Session { } } *self.timer_start_in_progress.lock().await = false; - true + PendingMessageStart::Started } - async fn claim_next_message_for_delivery( - self: &Arc, - ) -> Option<(PendingInputItem, TimerDelivery)> { + async fn claim_next_message_for_delivery(self: &Arc) -> Option { if !self.queued_messages_feature_enabled() { return None; } @@ -351,7 +364,7 @@ impl Session { item: message_prompt_input_item(&message_context), injected_event: injected_message_event(&message_context), }); - return Some((input_item, delivery)); + return Some(PendingMessageClaim::Claimed(input_item, delivery)); } Some(codex_state::ThreadMessageClaim::Invalid { id, reason }) => { warn!("dropped invalid queued message {id}: {reason}"); @@ -359,7 +372,7 @@ impl Session { } Some(codex_state::ThreadMessageClaim::NotReady) | None => { *self.timer_start_in_progress.lock().await = false; - return None; + return claim.map(|_| PendingMessageClaim::NotReady); } } } @@ -630,6 +643,11 @@ impl Session { let mut last_full_refresh = tokio::time::Instant::now(); let mut interval = tokio::time::interval(TIMER_DB_SYNC_INTERVAL); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + if let Some(session) = weak.upgrade() { + session.sync_timers_from_db(/*emit_update*/ true).await; + session.maybe_start_pending_timer().await; + last_full_refresh = tokio::time::Instant::now(); + } loop { tokio::select! { diff --git a/codex-rs/core/src/pending_input.rs b/codex-rs/core/src/pending_input.rs index 14f5080f8b..f090611807 100644 --- a/codex-rs/core/src/pending_input.rs +++ b/codex-rs/core/src/pending_input.rs @@ -14,6 +14,17 @@ pub(crate) enum PendingInputItem { } impl PendingInputItem { + pub(crate) fn generated_timer_source(&self) -> Option<&str> { + match self { + Self::Plain(_) => None, + Self::GeneratedMessage(generated) => generated + .injected_event + .source + .starts_with("timer ") + .then_some(generated.injected_event.source.as_str()), + } + } + #[cfg(test)] pub(crate) fn into_model_input(self) -> ResponseInputItem { match self { diff --git a/codex-rs/core/tests/suite/timers.rs b/codex-rs/core/tests/suite/timers.rs index 5046cdb63d..12d1123bc7 100644 --- a/codex-rs/core/tests/suite/timers.rs +++ b/codex-rs/core/tests/suite/timers.rs @@ -326,11 +326,6 @@ async fn queued_message_runs_after_idle_recurring_timer() -> Result<()> { ev_assistant_message("msg-2", "queued turn"), ev_completed("resp-2"), ]), - sse(vec![ - ev_response_created("resp-3"), - ev_assistant_message("msg-3", "next timer turn"), - ev_completed("resp-3"), - ]), ], ) .await; @@ -351,19 +346,8 @@ async fn queued_message_runs_after_idle_recurring_timer() -> Result<()> { }); let test = builder.build(&server).await?; let db = test.codex.state_db().expect("state db enabled"); - let thread_id = test.session_configured.session_id.to_string(); - db.create_thread_message(&codex_state::ThreadMessageCreateParams::new( - thread_id, - "external".to_string(), - "queued hello".to_string(), - /*instructions*/ None, - "{}".to_string(), - TimerDelivery::AfterTurn.as_str().to_string(), - Utc::now().timestamp(), - )) - .await?; - - test.codex + let timer = test + .codex .create_timer( ThreadTimerTrigger::Delay { seconds: 0, @@ -378,6 +362,26 @@ async fn queued_message_runs_after_idle_recurring_timer() -> Result<()> { ) .await .map_err(|err| anyhow!("{err}"))?; + wait_for_event_with_timeout( + &test.codex, + |event| match event { + EventMsg::InjectedMessage(event) => event.source == format!("timer {}", timer.id), + _ => false, + }, + Duration::from_secs(20), + ) + .await; + let thread_id = test.session_configured.session_id.to_string(); + db.create_thread_message(&codex_state::ThreadMessageCreateParams::new( + thread_id, + "external".to_string(), + "queued hello".to_string(), + /*instructions*/ None, + "{}".to_string(), + TimerDelivery::AfterTurn.as_str().to_string(), + Utc::now().timestamp(), + )) + .await?; wait_for_event_with_timeout( &test.codex, @@ -385,6 +389,22 @@ async fn queued_message_runs_after_idle_recurring_timer() -> Result<()> { Duration::from_secs(20), ) .await; + assert!( + test.codex + .delete_timer(&timer.id) + .await + .map_err(|err| anyhow!("{err}"))?, + "test should delete the idle recurring timer before it can schedule another turn" + ); + wait_for_event_with_timeout( + &test.codex, + |event| match event { + EventMsg::InjectedMessage(event) => event.source == "external", + _ => false, + }, + Duration::from_secs(20), + ) + .await; wait_for_event_with_timeout( &test.codex, |event| matches!(event, EventMsg::TurnComplete(_)),