From 639382609f8a96b4bf255fa2e735e8fb1aca4531 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Wed, 22 Apr 2026 11:16:17 +0100 Subject: [PATCH] fix: wait_agent timeout for queued mailbox mail (#18968) ## Why `wait_agent` can be called while mailbox mail is already pending. The previous implementation subscribed for future mailbox sequence changes and then waited for the next notification. If the mail was queued before that wait started, no new notification arrived, so the tool could sit until `timeout_ms` even though mail was ready to deliver. ## What Changed - Added `Session::has_pending_mailbox_items()` for checking pending mailbox mail through the session API. - Updated `multi_agents_v2::wait` to return immediately when pending mailbox mail already exists before sleeping on a new mailbox sequence update. - Reworked the regression coverage in `multi_agents_tests.rs` so already queued mailbox mail must wake `wait_agent` promptly. Relevant code: - [`wait_agent` pending-mail check](https://github.com/openai/codex/blob/aa8ca06e83cf2a3dc22f86f37caec6cc2d9533ea/codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs#L55-L60) - [`Session::has_pending_mailbox_items`](https://github.com/openai/codex/blob/aa8ca06e83cf2a3dc22f86f37caec6cc2d9533ea/codex-rs/core/src/session/mod.rs#L2979-L2981) - [`multi_agent_v2_wait_agent_returns_for_already_queued_mail`](https://github.com/openai/codex/blob/aa8ca06e83cf2a3dc22f86f37caec6cc2d9533ea/codex-rs/core/src/tools/handlers/multi_agents_tests.rs#L2854) ## Verification - `cargo test -p codex-core multi_agent_v2_wait_agent_returns_for_already_queued_mail` --- codex-rs/core/src/session/mod.rs | 6 ++- .../src/tools/handlers/multi_agents_tests.rs | 49 ++++++------------- .../tools/handlers/multi_agents_v2/wait.rs | 8 ++- 3 files changed, 25 insertions(+), 38 deletions(-) diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 2efb59cdfa..d06666dd19 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -2976,6 +2976,10 @@ impl Session { self.mailbox_rx.lock().await.has_pending_trigger_turn() } + pub(crate) async fn has_pending_mailbox_items(&self) -> bool { + self.mailbox_rx.lock().await.has_pending() + } + #[expect( clippy::await_holding_invalid_type, reason = "active turn checks and turn state updates must remain atomic" @@ -3075,7 +3079,7 @@ impl Session { if !accepts_mailbox_delivery { return false; } - self.mailbox_rx.lock().await.has_pending() + self.has_pending_mailbox_items().await } pub async fn interrupt_task(self: &Arc) { diff --git a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs index b886664e35..d7117dcc4d 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs @@ -2851,7 +2851,7 @@ async fn multi_agent_v2_wait_agent_returns_summary_for_mailbox_activity() { } #[tokio::test] -async fn multi_agent_v2_wait_agent_waits_for_new_mail_after_start() { +async fn multi_agent_v2_wait_agent_returns_for_already_queued_mail() { let (mut session, mut turn) = make_session_and_context().await; let manager = thread_manager(); let root = manager @@ -2896,46 +2896,25 @@ async fn multi_agent_v2_wait_agent_waits_for_new_mail_after_start() { .expect("worker path"); session.enqueue_mailbox_communication(InterAgentCommunication::new( - worker_path.clone(), + worker_path, AgentPath::root(), Vec::new(), "already queued".to_string(), /*trigger_turn*/ false, )); - let wait_task = tokio::spawn({ - let session = session.clone(); - let turn = turn.clone(); - async move { - WaitAgentHandlerV2 - .handle(invocation( - session, - turn, - "wait_agent", - function_payload(json!({"timeout_ms": 1000})), - )) - .await - } - }); - tokio::task::yield_now().await; - tokio::time::sleep(Duration::from_millis(50)).await; - assert!( - !wait_task.is_finished(), - "mail already queued before wait should not wake wait_agent" - ); - - session.enqueue_mailbox_communication(InterAgentCommunication::new( - worker_path, - AgentPath::root(), - Vec::new(), - "new mail".to_string(), - /*trigger_turn*/ false, - )); - - let output = wait_task - .await - .expect("wait task should join") - .expect("wait_agent should succeed"); + let output = timeout( + Duration::from_millis(500), + WaitAgentHandlerV2.handle(invocation( + session, + turn, + "wait_agent", + function_payload(json!({"timeout_ms": 1000})), + )), + ) + .await + .expect("already queued mail should complete wait_agent immediately") + .expect("wait_agent should succeed"); let (content, success) = expect_text_output(output); let result: crate::tools::handlers::multi_agents_v2::wait::WaitAgentResult = serde_json::from_str(&content).expect("wait_agent result should be json"); diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs index 777fe1c2e7..e50c6cab23 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs @@ -52,8 +52,12 @@ impl ToolHandler for Handler { ) .await; - let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64); - let timed_out = !wait_for_mailbox_change(&mut mailbox_seq_rx, deadline).await; + let timed_out = if session.has_pending_mailbox_items().await { + false + } else { + let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64); + !wait_for_mailbox_change(&mut mailbox_seq_rx, deadline).await + }; let result = WaitAgentResult::from_timed_out(timed_out); session