From afa0101ae222792c1c7e072107e1eb7b72c19b7b Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Mon, 18 May 2026 15:43:01 -0700 Subject: [PATCH] [codex] Move pending input into input queue (#22728) ## Why Pending model input was split across `Session`, `TurnState`, and the agent mailbox. That made it easy for new paths to manage queued user input or mailbox delivery outside the intended ownership boundary. This PR consolidates the model-facing input lifecycle behind the session input queue so turn-local pending input, next-turn queued items, and mailbox delivery coordination are owned in one place. ## What Changed - Added `session/input_queue.rs` to own pending input queues and mailbox delivery coordination. - Removed the standalone `agent/mailbox.rs` channel wrapper and store mailbox items directly in the input queue. - Moved pending-input mutations off `TurnState`; `TurnState` now exposes the queue-owned storage directly for now. - Routed abort cleanup, mailbox delivery phase changes, next-turn queued items, and active-turn pending input through `InputQueue`. - Boxed stack-heavy agent resume/fork startup futures that the refactor pushed over the default test stack. - Updated session, task, goal, stream-event, and multi-agent call sites and tests to use the new queue ownership. ## Verification - `cargo test -p codex-core --lib agent::control::tests` - `cargo test -p codex-core --lib agent::control::tests::resume_closed_child_reopens_open_descendants -- --exact` - `cargo test -p codex-core --lib agent::control::tests::spawn_agent_fork_last_n_turns_keeps_only_recent_turns -- --exact` - `cargo test -p codex-core --lib agent::control::tests::resume_thread_subagent_restores_stored_nickname_and_role -- --exact` - `cargo test -p codex-core` was also run; it completed with 1814 passed, 4 ignored, and one timeout in `agent::control::tests::resume_thread_subagent_restores_stored_nickname_and_role`, which passed when rerun in isolation. --- codex-rs/core/src/agent/control.rs | 13 +- codex-rs/core/src/agent/control_tests.rs | 8 +- codex-rs/core/src/agent/mailbox.rs | 161 ------- codex-rs/core/src/agent/mod.rs | 3 - codex-rs/core/src/codex_thread.rs | 1 + codex-rs/core/src/goals.rs | 24 +- codex-rs/core/src/session/handlers.rs | 8 +- codex-rs/core/src/session/input_queue.rs | 392 ++++++++++++++++++ codex-rs/core/src/session/mod.rs | 189 +-------- codex-rs/core/src/session/session.rs | 13 +- codex-rs/core/src/session/tests.rs | 129 +++--- codex-rs/core/src/session/turn.rs | 13 +- codex-rs/core/src/state/turn.rs | 42 +- codex-rs/core/src/stream_events_utils.rs | 9 +- codex-rs/core/src/tasks/mod.rs | 41 +- codex-rs/core/src/tasks/regular.rs | 2 +- .../src/tools/handlers/multi_agents_tests.rs | 85 ++-- .../tools/handlers/multi_agents_v2/wait.rs | 14 +- 18 files changed, 623 insertions(+), 524 deletions(-) delete mode 100644 codex-rs/core/src/agent/mailbox.rs create mode 100644 codex-rs/core/src/session/input_queue.rs diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index 71bc026a13..a246f0084f 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -499,13 +499,12 @@ impl AgentControl { agent_nickname: None, agent_role: None, }); - match self - .resume_single_agent_from_rollout( - config.clone(), - child_thread_id, - child_session_source, - ) - .await + match Box::pin(self.resume_single_agent_from_rollout( + config.clone(), + child_thread_id, + child_session_source, + )) + .await { Ok(_) => true, Err(err) => { diff --git a/codex-rs/core/src/agent/control_tests.rs b/codex-rs/core/src/agent/control_tests.rs index b95aad4489..e5883f60da 100644 --- a/codex-rs/core/src/agent/control_tests.rs +++ b/codex-rs/core/src/agent/control_tests.rs @@ -486,7 +486,13 @@ async fn send_inter_agent_communication_without_turn_queues_message_without_trig timeout(Duration::from_secs(5), async { loop { - if thread.codex.session.has_pending_input().await { + if thread + .codex + .session + .input_queue + .has_pending_input(&thread.codex.session.active_turn) + .await + { break; } sleep(Duration::from_millis(10)).await; diff --git a/codex-rs/core/src/agent/mailbox.rs b/codex-rs/core/src/agent/mailbox.rs deleted file mode 100644 index c328236475..0000000000 --- a/codex-rs/core/src/agent/mailbox.rs +++ /dev/null @@ -1,161 +0,0 @@ -use codex_protocol::protocol::InterAgentCommunication; -use std::collections::VecDeque; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; -use tokio::sync::mpsc; -use tokio::sync::watch; - -#[cfg(test)] -use codex_protocol::AgentPath; - -pub(crate) struct Mailbox { - tx: mpsc::UnboundedSender, - next_seq: AtomicU64, - seq_tx: watch::Sender, -} - -pub(crate) struct MailboxReceiver { - rx: mpsc::UnboundedReceiver, - pending_mails: VecDeque, -} - -impl Mailbox { - pub(crate) fn new() -> (Self, MailboxReceiver) { - let (tx, rx) = mpsc::unbounded_channel(); - let (seq_tx, _) = watch::channel(0); - ( - Self { - tx, - next_seq: AtomicU64::new(0), - seq_tx, - }, - MailboxReceiver { - rx, - pending_mails: VecDeque::new(), - }, - ) - } - - pub(crate) fn subscribe(&self) -> watch::Receiver { - self.seq_tx.subscribe() - } - - pub(crate) fn send(&self, communication: InterAgentCommunication) -> u64 { - let seq = self.next_seq.fetch_add(1, Ordering::Relaxed) + 1; - let _ = self.tx.send(communication); - self.seq_tx.send_replace(seq); - seq - } -} - -impl MailboxReceiver { - fn sync_pending_mails(&mut self) { - while let Ok(mail) = self.rx.try_recv() { - self.pending_mails.push_back(mail); - } - } - - pub(crate) fn has_pending(&mut self) -> bool { - self.sync_pending_mails(); - !self.pending_mails.is_empty() - } - - pub(crate) fn has_pending_trigger_turn(&mut self) -> bool { - self.sync_pending_mails(); - self.pending_mails.iter().any(|mail| mail.trigger_turn) - } - - pub(crate) fn drain(&mut self) -> Vec { - self.sync_pending_mails(); - self.pending_mails.drain(..).collect() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use pretty_assertions::assert_eq; - - fn make_mail( - author: AgentPath, - recipient: AgentPath, - content: &str, - trigger_turn: bool, - ) -> InterAgentCommunication { - InterAgentCommunication::new( - author, - recipient, - Vec::new(), - content.to_string(), - trigger_turn, - ) - } - - #[tokio::test] - async fn mailbox_assigns_monotonic_sequence_numbers() { - let (mailbox, _receiver) = Mailbox::new(); - let mut seq_rx = mailbox.subscribe(); - - let seq_a = mailbox.send(make_mail( - AgentPath::root(), - AgentPath::try_from("/root/worker").expect("agent path"), - "one", - /*trigger_turn*/ false, - )); - let seq_b = mailbox.send(make_mail( - AgentPath::root(), - AgentPath::try_from("/root/worker").expect("agent path"), - "two", - /*trigger_turn*/ false, - )); - - seq_rx.changed().await.expect("first seq update"); - assert_eq!(*seq_rx.borrow(), seq_b); - assert_eq!(seq_a, 1); - assert_eq!(seq_b, 2); - } - - #[tokio::test] - async fn mailbox_drains_in_delivery_order() { - let (mailbox, mut receiver) = Mailbox::new(); - let mail_one = make_mail( - AgentPath::root(), - AgentPath::try_from("/root/worker").expect("agent path"), - "one", - /*trigger_turn*/ false, - ); - let mail_two = make_mail( - AgentPath::try_from("/root/worker").expect("agent path"), - AgentPath::root(), - "two", - /*trigger_turn*/ false, - ); - - mailbox.send(mail_one.clone()); - mailbox.send(mail_two.clone()); - - assert_eq!(receiver.drain(), vec![mail_one, mail_two]); - assert!(!receiver.has_pending()); - } - - #[tokio::test] - async fn mailbox_tracks_pending_trigger_turn_mail() { - let (mailbox, mut receiver) = Mailbox::new(); - - mailbox.send(make_mail( - AgentPath::root(), - AgentPath::try_from("/root/worker").expect("agent path"), - "queued", - /*trigger_turn*/ false, - )); - assert!(!receiver.has_pending_trigger_turn()); - - mailbox.send(make_mail( - AgentPath::root(), - AgentPath::try_from("/root/worker").expect("agent path"), - "wake", - /*trigger_turn*/ true, - )); - assert!(receiver.has_pending_trigger_turn()); - } -} diff --git a/codex-rs/core/src/agent/mod.rs b/codex-rs/core/src/agent/mod.rs index a60fc3004a..350962dc08 100644 --- a/codex-rs/core/src/agent/mod.rs +++ b/codex-rs/core/src/agent/mod.rs @@ -1,14 +1,11 @@ pub(crate) mod agent_resolver; pub(crate) mod control; -pub(crate) mod mailbox; mod registry; pub(crate) mod role; pub(crate) mod status; pub(crate) use codex_protocol::protocol::AgentStatus; pub(crate) use control::AgentControl; -pub(crate) use mailbox::Mailbox; -pub(crate) use mailbox::MailboxReceiver; pub(crate) use registry::exceeds_thread_spawn_depth_limit; pub(crate) use registry::next_thread_spawn_depth; pub(crate) use status::agent_status_from_event; diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index 7e9e6e38cd..8f5023300c 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -383,6 +383,7 @@ impl CodexThread { { self.codex .session + .input_queue .queue_response_items_for_next_turn(items) .await; self.codex.session.maybe_start_turn_for_pending_work().await; diff --git a/codex-rs/core/src/goals.rs b/codex-rs/core/src/goals.rs index 18b7c77bda..13f8cc95c8 100644 --- a/codex-rs/core/src/goals.rs +++ b/codex-rs/core/src/goals.rs @@ -1331,12 +1331,9 @@ impl Session { .await; return; } - { - let mut turn_state = turn_state.lock().await; - for item in candidate.items { - turn_state.push_pending_input(item); - } - } + self.input_queue + .extend_pending_input_for_turn_state(turn_state.as_ref(), candidate.items) + .await; let turn_context = self .new_default_turn_with_sub_id(uuid::Uuid::new_v4().to_string()) @@ -1374,11 +1371,15 @@ impl Session { tracing::debug!("skipping active goal continuation because a turn is already active"); return None; } - if self.has_queued_response_items_for_next_turn().await { + if self + .input_queue + .has_queued_response_items_for_next_turn() + .await + { tracing::debug!("skipping active goal continuation because queued input exists"); return None; } - if self.has_trigger_turn_mailbox_items().await { + if self.input_queue.has_trigger_turn_mailbox_items().await { tracing::debug!( "skipping active goal continuation because trigger-turn mailbox input is pending" ); @@ -1415,8 +1416,11 @@ impl Session { return None; } if self.active_turn.lock().await.is_some() - || self.has_queued_response_items_for_next_turn().await - || self.has_trigger_turn_mailbox_items().await + || self + .input_queue + .has_queued_response_items_for_next_turn() + .await + || self.input_queue.has_trigger_turn_mailbox_items().await { tracing::debug!("skipping active goal continuation because pending work appeared"); return None; diff --git a/codex-rs/core/src/session/handlers.rs b/codex-rs/core/src/session/handlers.rs index c2ffe2f4e4..eb27ae78be 100644 --- a/codex-rs/core/src/session/handlers.rs +++ b/codex-rs/core/src/session/handlers.rs @@ -319,7 +319,9 @@ pub async fn inter_agent_communication( communication: InterAgentCommunication, ) { let trigger_turn = communication.trigger_turn; - sess.enqueue_mailbox_communication(communication); + sess.input_queue + .enqueue_mailbox_communication(communication) + .await; if trigger_turn { sess.maybe_start_turn_for_pending_work_with_sub_id(sub_id) .await; @@ -961,7 +963,9 @@ Approved action: }]; if let Err(items) = sess.inject_response_items(items).await { - sess.queue_response_items_for_next_turn(items).await; + sess.input_queue + .queue_response_items_for_next_turn(items) + .await; } } diff --git a/codex-rs/core/src/session/input_queue.rs b/codex-rs/core/src/session/input_queue.rs new file mode 100644 index 0000000000..2e8a6ded9d --- /dev/null +++ b/codex-rs/core/src/session/input_queue.rs @@ -0,0 +1,392 @@ +use crate::state::ActiveTurn; +use crate::state::MailboxDeliveryPhase; +use crate::state::TurnState; +use codex_protocol::models::ResponseInputItem; +use codex_protocol::protocol::InterAgentCommunication; +use std::collections::VecDeque; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio::sync::watch; + +/// Turn-local pending input storage owned by the input queue flow. +#[derive(Default)] +pub(crate) struct TurnInputQueue { + items: Vec, +} + +/// Session-scoped pending input storage and active-turn mailbox delivery coordination. +pub(crate) struct InputQueue { + mailbox_tx: watch::Sender<()>, + mailbox_pending_mails: Mutex>, + + idle_pending_input: Mutex>, +} + +impl InputQueue { + pub(crate) fn new() -> Self { + let (mailbox_tx, _) = watch::channel(()); + Self { + mailbox_tx, + mailbox_pending_mails: Mutex::new(VecDeque::new()), + idle_pending_input: Mutex::new(Vec::new()), + } + } + + pub(crate) async fn subscribe_mailbox(&self) -> watch::Receiver<()> { + let mut mailbox_rx = self.mailbox_tx.subscribe(); + if self.has_pending_mailbox_items().await { + mailbox_rx.mark_changed(); + } + mailbox_rx + } + + pub(crate) async fn enqueue_mailbox_communication( + &self, + communication: InterAgentCommunication, + ) { + self.mailbox_pending_mails + .lock() + .await + .push_back(communication); + self.mailbox_tx.send_replace(()); + } + + pub(crate) async fn has_pending_mailbox_items(&self) -> bool { + !self.mailbox_pending_mails.lock().await.is_empty() + } + + pub(crate) async fn has_trigger_turn_mailbox_items(&self) -> bool { + self.mailbox_pending_mails + .lock() + .await + .iter() + .any(|mail| mail.trigger_turn) + } + + pub(crate) async fn drain_mailbox_input_items(&self) -> Vec { + self.mailbox_pending_mails + .lock() + .await + .drain(..) + .map(|mail| mail.to_response_input_item()) + .collect() + } + + pub(crate) async fn queue_response_items_for_next_turn(&self, items: Vec) { + if items.is_empty() { + return; + } + + self.idle_pending_input.lock().await.extend(items); + } + + pub(crate) async fn take_queued_response_items_for_next_turn(&self) -> Vec { + std::mem::take(&mut *self.idle_pending_input.lock().await) + } + + pub(crate) async fn has_queued_response_items_for_next_turn(&self) -> bool { + !self.idle_pending_input.lock().await.is_empty() + } + + pub(crate) async fn turn_state_for_sub_id( + &self, + active_turn: &Mutex>, + sub_id: &str, + ) -> Option>> { + let active = active_turn.lock().await; + active.as_ref().and_then(|active_turn| { + active_turn + .tasks + .contains_key(sub_id) + .then(|| Arc::clone(&active_turn.turn_state)) + }) + } + + /// Clear any pending waiters and input buffered for the current turn. + pub(crate) async fn clear_pending(&self, active_turn: &ActiveTurn) { + let mut turn_state = active_turn.turn_state.lock().await; + turn_state.clear_pending_waiters(); + turn_state.pending_input.items.clear(); + } + + pub(crate) async fn defer_mailbox_delivery_to_next_turn( + &self, + active_turn: &Mutex>, + sub_id: &str, + ) { + let turn_state = self.turn_state_for_sub_id(active_turn, sub_id).await; + let Some(turn_state) = turn_state else { + return; + }; + let mut turn_state = turn_state.lock().await; + if !turn_state.pending_input.items.is_empty() { + return; + } + turn_state.set_mailbox_delivery_phase(MailboxDeliveryPhase::NextTurn); + } + + pub(crate) async fn accept_mailbox_delivery_for_current_turn( + &self, + active_turn: &Mutex>, + sub_id: &str, + ) { + let turn_state = self.turn_state_for_sub_id(active_turn, sub_id).await; + let Some(turn_state) = turn_state else { + return; + }; + self.accept_mailbox_delivery_for_turn_state(turn_state.as_ref()) + .await; + } + + pub(super) async fn accept_mailbox_delivery_for_turn_state( + &self, + turn_state: &Mutex, + ) { + turn_state + .lock() + .await + .accept_mailbox_delivery_for_current_turn(); + } + + pub(super) async fn push_pending_input_and_accept_mailbox_delivery_for_turn_state( + &self, + turn_state: &Mutex, + input: ResponseInputItem, + ) { + let mut turn_state = turn_state.lock().await; + turn_state.pending_input.items.push(input); + turn_state.accept_mailbox_delivery_for_current_turn(); + } + + pub(crate) async fn extend_pending_input_for_turn_state( + &self, + turn_state: &Mutex, + input: Vec, + ) { + turn_state.lock().await.pending_input.items.extend(input); + } + + pub(crate) async fn take_pending_input_for_turn_state( + &self, + turn_state: &Mutex, + ) -> Vec { + turn_state.lock().await.pending_input.items.split_off(0) + } + + #[expect( + clippy::await_holding_invalid_type, + reason = "active turn checks and turn state updates must remain atomic" + )] + pub(crate) async fn inject_response_items( + &self, + active_turn: &Mutex>, + input: Vec, + ) -> Result<(), Vec> { + let mut active = active_turn.lock().await; + match active.as_mut() { + Some(active_turn) => { + active_turn + .turn_state + .lock() + .await + .pending_input + .items + .extend(input); + Ok(()) + } + None => Err(input), + } + } + + #[expect( + clippy::await_holding_invalid_type, + reason = "active turn checks and turn state updates must remain atomic" + )] + pub(crate) async fn prepend_pending_input( + &self, + active_turn: &Mutex>, + mut input: Vec, + ) -> Result<(), ()> { + let mut active = active_turn.lock().await; + match active.as_mut() { + Some(active_turn) => { + let mut turn_state = active_turn.turn_state.lock().await; + if !input.is_empty() { + let pending_input = &mut turn_state.pending_input; + input.append(&mut pending_input.items); + pending_input.items = input; + } + Ok(()) + } + None => Err(()), + } + } + + #[expect( + clippy::await_holding_invalid_type, + reason = "active turn checks and turn state updates must remain atomic" + )] + pub(crate) async fn get_pending_input( + &self, + active_turn: &Mutex>, + ) -> Vec { + let (pending_input, accepts_mailbox_delivery) = { + let mut active = active_turn.lock().await; + match active.as_mut() { + Some(active_turn) => { + let mut turn_state = active_turn.turn_state.lock().await; + ( + turn_state.pending_input.items.split_off(0), + turn_state.accepts_mailbox_delivery_for_current_turn(), + ) + } + None => (Vec::new(), true), + } + }; + if !accepts_mailbox_delivery { + return pending_input; + } + let mailbox_items = self.drain_mailbox_input_items().await; + if pending_input.is_empty() { + mailbox_items + } else if mailbox_items.is_empty() { + pending_input + } else { + let mut pending_input = pending_input; + pending_input.extend(mailbox_items); + pending_input + } + } + + #[expect( + clippy::await_holding_invalid_type, + reason = "active turn checks and turn state reads must remain atomic" + )] + pub(crate) async fn has_pending_input(&self, active_turn: &Mutex>) -> bool { + let (has_turn_pending_input, accepts_mailbox_delivery) = { + let active = active_turn.lock().await; + match active.as_ref() { + Some(active_turn) => { + let turn_state = active_turn.turn_state.lock().await; + ( + !turn_state.pending_input.items.is_empty(), + turn_state.accepts_mailbox_delivery_for_current_turn(), + ) + } + None => (false, true), + } + }; + if has_turn_pending_input { + return true; + } + if !accepts_mailbox_delivery { + return false; + } + self.has_pending_mailbox_items().await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use codex_protocol::AgentPath; + use pretty_assertions::assert_eq; + + fn make_mail( + author: AgentPath, + recipient: AgentPath, + content: &str, + trigger_turn: bool, + ) -> InterAgentCommunication { + InterAgentCommunication::new( + author, + recipient, + Vec::new(), + content.to_string(), + trigger_turn, + ) + } + + #[tokio::test] + async fn input_queue_notifies_mailbox_subscribers() { + let input_queue = InputQueue::new(); + let mut mailbox_rx = input_queue.subscribe_mailbox().await; + + input_queue + .enqueue_mailbox_communication(make_mail( + AgentPath::root(), + AgentPath::try_from("/root/worker").expect("agent path"), + "one", + /*trigger_turn*/ false, + )) + .await; + input_queue + .enqueue_mailbox_communication(make_mail( + AgentPath::root(), + AgentPath::try_from("/root/worker").expect("agent path"), + "two", + /*trigger_turn*/ false, + )) + .await; + + mailbox_rx.changed().await.expect("mailbox update"); + } + + #[tokio::test] + async fn input_queue_drains_mailbox_in_delivery_order() { + let input_queue = InputQueue::new(); + let mail_one = make_mail( + AgentPath::root(), + AgentPath::try_from("/root/worker").expect("agent path"), + "one", + /*trigger_turn*/ false, + ); + let mail_two = make_mail( + AgentPath::try_from("/root/worker").expect("agent path"), + AgentPath::root(), + "two", + /*trigger_turn*/ false, + ); + + input_queue + .enqueue_mailbox_communication(mail_one.clone()) + .await; + input_queue + .enqueue_mailbox_communication(mail_two.clone()) + .await; + + assert_eq!( + input_queue.drain_mailbox_input_items().await, + vec![ + mail_one.to_response_input_item(), + mail_two.to_response_input_item() + ] + ); + assert!(!input_queue.has_pending_mailbox_items().await); + } + + #[tokio::test] + async fn input_queue_tracks_pending_trigger_turn_mail() { + let input_queue = InputQueue::new(); + + input_queue + .enqueue_mailbox_communication(make_mail( + AgentPath::root(), + AgentPath::try_from("/root/worker").expect("agent path"), + "queued", + /*trigger_turn*/ false, + )) + .await; + assert!(!input_queue.has_trigger_turn_mailbox_items().await); + + input_queue + .enqueue_mailbox_communication(make_mail( + AgentPath::root(), + AgentPath::try_from("/root/worker").expect("agent path"), + "wake", + /*trigger_turn*/ true, + )) + .await; + assert!(input_queue.has_trigger_turn_mailbox_items().await); + } +} diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 02d9e75d4d..4db6c97e4b 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -10,8 +10,6 @@ use std::time::UNIX_EPOCH; use crate::agent::AgentControl; use crate::agent::AgentStatus; -use crate::agent::Mailbox; -use crate::agent::MailboxReceiver; use crate::agent::agent_status_from_event; use crate::agent::status::is_final; use crate::attestation::AttestationProvider; @@ -193,6 +191,7 @@ use codex_protocol::exec_output::StreamOutput; mod config_lock; mod handlers; +mod input_queue; mod mcp; mod multi_agents; mod review; @@ -206,6 +205,7 @@ use self::config_lock::validate_config_lock_if_configured; #[cfg(test)] use self::handlers::submission_dispatch_span; use self::handlers::submission_loop; +pub(crate) use self::input_queue::TurnInputQueue; use self::review::spawn_review_thread; use self::session::AppServerClientMetadata; use self::session::Session; @@ -289,8 +289,6 @@ use crate::rollout::map_session_init_error; use crate::session_startup_prewarm::SessionStartupPrewarmHandle; use crate::shell; use crate::shell_snapshot::ShellSnapshot; -use crate::state::ActiveTurn; -use crate::state::MailboxDeliveryPhase; use crate::state::PendingRequestPermissions; use crate::state::SessionServices; use crate::state::SessionState; @@ -3163,195 +3161,36 @@ impl Session { .set_responsesapi_client_metadata(responsesapi_client_metadata); } - let mut turn_state = active_turn.turn_state.lock().await; - turn_state.push_pending_input(input.into()); - turn_state.accept_mailbox_delivery_for_current_turn(); + self.input_queue + .push_pending_input_and_accept_mailbox_delivery_for_turn_state( + active_turn.turn_state.as_ref(), + input.into(), + ) + .await; Ok(active_turn_id.clone()) } /// Returns the input if there was no task running to inject into. - #[expect( - clippy::await_holding_invalid_type, - reason = "active turn checks and turn state updates must remain atomic" - )] pub async fn inject_response_items( &self, input: Vec, ) -> Result<(), Vec> { - let mut active = self.active_turn.lock().await; - match active.as_mut() { - Some(at) => { - let mut ts = at.turn_state.lock().await; - for item in input { - ts.push_pending_input(item); - } - Ok(()) - } - None => Err(input), - } - } - - pub(crate) async fn defer_mailbox_delivery_to_next_turn(&self, sub_id: &str) { - let turn_state = self.turn_state_for_sub_id(sub_id).await; - let Some(turn_state) = turn_state else { - return; - }; - let mut turn_state = turn_state.lock().await; - if turn_state.has_pending_input() { - return; - } - turn_state.set_mailbox_delivery_phase(MailboxDeliveryPhase::NextTurn); - } - - pub(crate) async fn accept_mailbox_delivery_for_current_turn(&self, sub_id: &str) { - let turn_state = self.turn_state_for_sub_id(sub_id).await; - let Some(turn_state) = turn_state else { - return; - }; - turn_state - .lock() + self.input_queue + .inject_response_items(&self.active_turn, input) .await - .set_mailbox_delivery_phase(MailboxDeliveryPhase::CurrentTurn); } pub(crate) async fn record_memory_citation_for_turn(&self, sub_id: &str) { - let turn_state = self.turn_state_for_sub_id(sub_id).await; + let turn_state = self + .input_queue + .turn_state_for_sub_id(&self.active_turn, sub_id) + .await; let Some(turn_state) = turn_state else { return; }; turn_state.lock().await.has_memory_citation = true; } - async fn turn_state_for_sub_id( - &self, - sub_id: &str, - ) -> Option>> { - let active = self.active_turn.lock().await; - active.as_ref().and_then(|active_turn| { - active_turn - .tasks - .contains_key(sub_id) - .then(|| Arc::clone(&active_turn.turn_state)) - }) - } - - pub(crate) fn subscribe_mailbox_seq(&self) -> watch::Receiver { - self.mailbox.subscribe() - } - - pub(crate) fn enqueue_mailbox_communication(&self, communication: InterAgentCommunication) { - self.mailbox.send(communication); - } - - pub(crate) async fn has_trigger_turn_mailbox_items(&self) -> bool { - self.mailbox_rx.lock().await.has_pending_trigger_turn() - } - - pub(crate) async fn has_pending_mailbox_items(&self) -> bool { - self.mailbox_rx.lock().await.has_pending() - } - - #[expect( - clippy::await_holding_invalid_type, - reason = "active turn checks and turn state updates must remain atomic" - )] - pub async fn prepend_pending_input(&self, input: Vec) -> Result<(), ()> { - let mut active = self.active_turn.lock().await; - match active.as_mut() { - Some(at) => { - let mut ts = at.turn_state.lock().await; - ts.prepend_pending_input(input); - Ok(()) - } - None => Err(()), - } - } - - #[expect( - clippy::await_holding_invalid_type, - reason = "active turn checks and turn state updates must remain atomic" - )] - pub async fn get_pending_input(&self) -> Vec { - let (pending_input, accepts_mailbox_delivery) = { - let mut active = self.active_turn.lock().await; - match active.as_mut() { - Some(at) => { - let mut ts = at.turn_state.lock().await; - ( - ts.take_pending_input(), - ts.accepts_mailbox_delivery_for_current_turn(), - ) - } - None => (Vec::new(), true), - } - }; - if !accepts_mailbox_delivery { - return pending_input; - } - let mailbox_items = { - let mut mailbox_rx = self.mailbox_rx.lock().await; - mailbox_rx - .drain() - .into_iter() - .map(|mail| mail.to_response_input_item()) - .collect::>() - }; - if pending_input.is_empty() { - mailbox_items - } else if mailbox_items.is_empty() { - pending_input - } else { - let mut pending_input = pending_input; - pending_input.extend(mailbox_items); - pending_input - } - } - - /// Queue response items to be injected into the next active turn created for this session. - pub(crate) async fn queue_response_items_for_next_turn(&self, items: Vec) { - if items.is_empty() { - return; - } - - let mut idle_pending_input = self.idle_pending_input.lock().await; - idle_pending_input.extend(items); - } - - pub(crate) async fn take_queued_response_items_for_next_turn(&self) -> Vec { - std::mem::take(&mut *self.idle_pending_input.lock().await) - } - - pub(crate) async fn has_queued_response_items_for_next_turn(&self) -> bool { - !self.idle_pending_input.lock().await.is_empty() - } - - #[expect( - clippy::await_holding_invalid_type, - reason = "active turn checks and turn state reads must remain atomic" - )] - pub async fn has_pending_input(&self) -> bool { - let (has_turn_pending_input, accepts_mailbox_delivery) = { - let active = self.active_turn.lock().await; - match active.as_ref() { - Some(at) => { - let ts = at.turn_state.lock().await; - ( - ts.has_pending_input(), - ts.accepts_mailbox_delivery_for_current_turn(), - ) - } - None => (false, true), - } - }; - if has_turn_pending_input { - return true; - } - if !accepts_mailbox_delivery { - return false; - } - self.has_pending_mailbox_items().await - } - pub async fn interrupt_task(self: &Arc) { info!("interrupt received: abort current task, if any"); let had_active_turn = self.active_turn.lock().await.is_some(); diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 925ee2df59..082308f20d 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -1,5 +1,7 @@ +use super::input_queue::InputQueue; use super::*; use crate::goals::GoalRuntimeState; +use crate::state::ActiveTurn; use codex_protocol::SessionId; use codex_protocol::config_types::ServiceTier; use codex_protocol::permissions::FileSystemPath; @@ -27,9 +29,7 @@ pub(crate) struct Session { pub(super) pending_mcp_server_refresh_config: Mutex>, pub(crate) conversation: Arc, pub(crate) active_turn: Mutex>, - pub(super) mailbox: Mailbox, - pub(super) mailbox_rx: Mutex, - pub(super) idle_pending_input: Mutex>, // TODO (jif) merge with mailbox! + pub(crate) input_queue: InputQueue, pub(crate) goal_runtime: GoalRuntimeState, pub(crate) guardian_review_session: GuardianReviewSessionManager, pub(crate) services: SessionServices, @@ -950,7 +950,6 @@ impl Session { let (out_of_band_elicitation_paused, _out_of_band_elicitation_paused_rx) = watch::channel(false); - let (mailbox, mailbox_rx) = Mailbox::new(); let sess = Arc::new(Session { conversation_id: thread_id, installation_id, @@ -963,9 +962,7 @@ impl Session { pending_mcp_server_refresh_config: Mutex::new(None), conversation: Arc::new(RealtimeConversationManager::new()), active_turn: Mutex::new(None), - mailbox, - mailbox_rx: Mutex::new(mailbox_rx), - idle_pending_input: Mutex::new(Vec::new()), + input_queue: InputQueue::new(), goal_runtime: GoalRuntimeState::new(), guardian_review_session: GuardianReviewSessionManager::default(), services, @@ -1130,7 +1127,7 @@ impl Session { }; // record_initial_history can emit events. We record only after the SessionConfiguredEvent is emitted. - sess.record_initial_history(initial_history).await; + Box::pin(sess.record_initial_history(initial_history)).await; { let mut state = sess.state.lock().await; state.set_pending_session_start_source(Some(session_start_source)); diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index e3f038eff7..4486d287c5 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -4394,7 +4394,6 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { /*goal_tools_supported*/ true, ); - let (mailbox, mailbox_rx) = crate::agent::Mailbox::new(); let session = Session { conversation_id: thread_id, installation_id: "11111111-1111-4111-8111-111111111111".to_string(), @@ -4407,9 +4406,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { pending_mcp_server_refresh_config: Mutex::new(None), conversation: Arc::new(RealtimeConversationManager::new()), active_turn: Mutex::new(None), - mailbox, - mailbox_rx: Mutex::new(mailbox_rx), - idle_pending_input: Mutex::new(Vec::new()), + input_queue: super::input_queue::InputQueue::new(), goal_runtime: crate::goals::GoalRuntimeState::new(), guardian_review_session: crate::guardian::GuardianReviewSessionManager::default(), services, @@ -6252,7 +6249,6 @@ where /*goal_tools_supported*/ true, )); - let (mailbox, mailbox_rx) = crate::agent::Mailbox::new(); let session = Arc::new(Session { conversation_id: thread_id, installation_id: "11111111-1111-4111-8111-111111111111".to_string(), @@ -6265,9 +6261,7 @@ where pending_mcp_server_refresh_config: Mutex::new(None), conversation: Arc::new(RealtimeConversationManager::new()), active_turn: Mutex::new(None), - mailbox, - mailbox_rx: Mutex::new(mailbox_rx), - idle_pending_input: Mutex::new(Vec::new()), + input_queue: super::input_queue::InputQueue::new(), goal_runtime: crate::goals::GoalRuntimeState::new(), guardian_review_session: crate::guardian::GuardianReviewSessionManager::default(), services, @@ -8098,7 +8092,7 @@ async fn steer_input_returns_active_turn_id() { .expect("steering with matching expected turn id should succeed"); assert_eq!(turn_id, tc.sub_id); - assert!(sess.has_pending_input().await); + assert!(sess.input_queue.has_pending_input(&sess.active_turn).await); } #[tokio::test] @@ -8144,7 +8138,7 @@ async fn prepend_pending_input_keeps_older_tail_ahead_of_newer_input() { .await .expect("inject initial pending input into active turn"); - let drained = sess.get_pending_input().await; + let drained = sess.input_queue.get_pending_input(&sess.active_turn).await; assert_eq!(drained, vec![blocked, later.clone()]); sess.inject_response_items(vec![newer.clone()]) @@ -8153,11 +8147,15 @@ async fn prepend_pending_input_keeps_older_tail_ahead_of_newer_input() { let mut drained_iter = drained.into_iter(); let _blocked = drained_iter.next().expect("blocked prompt should exist"); - sess.prepend_pending_input(drained_iter.collect()) + sess.input_queue + .prepend_pending_input(&sess.active_turn, drained_iter.collect()) .await .expect("requeue later pending input at the front of the queue"); - assert_eq!(sess.get_pending_input().await, vec![later, newer]); + assert_eq!( + sess.input_queue.get_pending_input(&sess.active_turn).await, + vec![later, newer] + ); } #[tokio::test] @@ -8171,7 +8169,8 @@ async fn queued_response_items_for_next_turn_move_into_next_active_turn() { phase: None, }; - sess.queue_response_items_for_next_turn(vec![queued_item.clone()]) + sess.input_queue + .queue_response_items_for_next_turn(vec![queued_item.clone()]) .await; sess.spawn_task( @@ -8184,7 +8183,10 @@ async fn queued_response_items_for_next_turn_move_into_next_active_turn() { ) .await; - assert_eq!(sess.get_pending_input().await, vec![queued_item]); + assert_eq!( + sess.input_queue.get_pending_input(&sess.active_turn).await, + vec![queued_item] + ); } #[tokio::test] @@ -8198,13 +8200,18 @@ async fn idle_interrupt_does_not_wake_queued_next_turn_items() { phase: None, }; - sess.queue_response_items_for_next_turn(vec![queued_item]) + sess.input_queue + .queue_response_items_for_next_turn(vec![queued_item]) .await; sess.abort_all_tasks(TurnAbortReason::Interrupted).await; assert!(sess.active_turn.lock().await.is_none()); - assert!(sess.has_queued_response_items_for_next_turn().await); + assert!( + sess.input_queue + .has_queued_response_items_for_next_turn() + .await + ); } #[tokio::test] @@ -8222,16 +8229,17 @@ async fn abort_empty_active_turn_preserves_pending_input() { let active_turn = active.get_or_insert_with(ActiveTurn::default); Arc::clone(&active_turn.turn_state) }; - turn_state - .lock() - .await - .push_pending_input(pending_item.clone()); + sess.input_queue + .extend_pending_input_for_turn_state(turn_state.as_ref(), vec![pending_item.clone()]) + .await; sess.abort_all_tasks(TurnAbortReason::Replaced).await; assert!(sess.active_turn.lock().await.is_none()); assert_eq!( - turn_state.lock().await.take_pending_input(), + sess.input_queue + .take_pending_input_for_turn_state(turn_state.as_ref()) + .await, vec![pending_item] ); } @@ -8593,7 +8601,7 @@ async fn budget_limited_accounting_steers_active_turn_without_aborting() -> anyh }) .await?; - let pending_input = sess.get_pending_input().await; + let pending_input = sess.input_queue.get_pending_input(&sess.active_turn).await; let [ResponseInputItem::Message { role, content, .. }] = pending_input.as_slice() else { panic!("expected one budget-limit steering message, got {pending_input:#?}"); }; @@ -8814,7 +8822,7 @@ async fn external_objective_change_steers_active_turn() -> anyhow::Result<()> { }) .await?; - let pending_input = sess.get_pending_input().await; + let pending_input = sess.input_queue.get_pending_input(&sess.active_turn).await; assert!( pending_input.iter().any(|item| { matches!( @@ -9021,19 +9029,26 @@ async fn queue_only_mailbox_mail_waits_for_next_turn_after_answer_boundary() { ) .await; - sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await; - sess.enqueue_mailbox_communication(communication.clone()); + sess.input_queue + .defer_mailbox_delivery_to_next_turn(&sess.active_turn, &tc.sub_id) + .await; + sess.input_queue + .enqueue_mailbox_communication(communication.clone()) + .await; assert!( - !sess.has_pending_input().await, + !sess.input_queue.has_pending_input(&sess.active_turn).await, "queue-only mailbox mail should stay buffered once the current turn emitted its answer" ); - assert_eq!(sess.get_pending_input().await, Vec::new()); + assert_eq!( + sess.input_queue.get_pending_input(&sess.active_turn).await, + Vec::new() + ); sess.abort_all_tasks(TurnAbortReason::Replaced).await; assert_eq!( - sess.get_pending_input().await, + sess.input_queue.get_pending_input(&sess.active_turn).await, vec![communication.to_response_input_item()], ); } @@ -9051,23 +9066,27 @@ async fn trigger_turn_mailbox_mail_waits_for_next_turn_after_answer_boundary() { ) .await; - sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await; - sess.enqueue_mailbox_communication(InterAgentCommunication::new( - AgentPath::try_from("/root/worker").expect("worker path should parse"), - AgentPath::root(), - Vec::new(), - "late trigger update".to_string(), - /*trigger_turn*/ true, - )); + sess.input_queue + .defer_mailbox_delivery_to_next_turn(&sess.active_turn, &tc.sub_id) + .await; + sess.input_queue + .enqueue_mailbox_communication(InterAgentCommunication::new( + AgentPath::try_from("/root/worker").expect("worker path should parse"), + AgentPath::root(), + Vec::new(), + "late trigger update".to_string(), + /*trigger_turn*/ true, + )) + .await; assert!( - !sess.has_pending_input().await, + !sess.input_queue.has_pending_input(&sess.active_turn).await, "trigger-turn mailbox mail should not extend the current turn after its answer boundary" ); sess.abort_all_tasks(TurnAbortReason::Replaced).await; - assert!(sess.has_trigger_turn_mailbox_items().await); + assert!(sess.input_queue.has_trigger_turn_mailbox_items().await); } #[tokio::test] @@ -9090,8 +9109,12 @@ async fn steered_input_reopens_mailbox_delivery_for_current_turn() { ) .await; - sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await; - sess.enqueue_mailbox_communication(communication.clone()); + sess.input_queue + .defer_mailbox_delivery_to_next_turn(&sess.active_turn, &tc.sub_id) + .await; + sess.input_queue + .enqueue_mailbox_communication(communication.clone()) + .await; sess.steer_input( vec![UserInput::Text { text: "follow up".to_string(), @@ -9104,7 +9127,7 @@ async fn steered_input_reopens_mailbox_delivery_for_current_turn() { .expect("steered input should be accepted"); assert_eq!( - sess.get_pending_input().await, + sess.input_queue.get_pending_input(&sess.active_turn).await, vec![ ResponseInputItem::from(vec![UserInput::Text { text: "follow up".to_string(), @@ -9135,8 +9158,12 @@ async fn stale_defer_mailbox_delivery_does_not_override_steered_input() { ) .await; - sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await; - sess.enqueue_mailbox_communication(communication.clone()); + sess.input_queue + .defer_mailbox_delivery_to_next_turn(&sess.active_turn, &tc.sub_id) + .await; + sess.input_queue + .enqueue_mailbox_communication(communication.clone()) + .await; sess.steer_input( vec![UserInput::Text { text: "follow up".to_string(), @@ -9148,10 +9175,12 @@ async fn stale_defer_mailbox_delivery_does_not_override_steered_input() { .await .expect("steered input should be accepted"); - sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await; + sess.input_queue + .defer_mailbox_delivery_to_next_turn(&sess.active_turn, &tc.sub_id) + .await; assert_eq!( - sess.get_pending_input().await, + sess.input_queue.get_pending_input(&sess.active_turn).await, vec![ ResponseInputItem::from(vec![UserInput::Text { text: "follow up".to_string(), @@ -9182,8 +9211,12 @@ async fn tool_calls_reopen_mailbox_delivery_for_current_turn() { ) .await; - sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await; - sess.enqueue_mailbox_communication(communication.clone()); + sess.input_queue + .defer_mailbox_delivery_to_next_turn(&sess.active_turn, &tc.sub_id) + .await; + sess.input_queue + .enqueue_mailbox_communication(communication.clone()) + .await; let item = ResponseItem::FunctionCall { id: None, @@ -9207,7 +9240,7 @@ async fn tool_calls_reopen_mailbox_delivery_for_current_turn() { assert!(output.needs_follow_up); assert!(output.tool_future.is_some()); assert_eq!( - sess.get_pending_input().await, + sess.input_queue.get_pending_input(&sess.active_turn).await, vec![communication.to_response_input_item()], ); } diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index ab316440e5..0c5e00927a 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -148,7 +148,7 @@ pub(crate) async fn run_turn( prewarmed_client_session: Option, cancellation_token: CancellationToken, ) -> Option { - if input.is_empty() && !sess.has_pending_input().await { + if input.is_empty() && !sess.input_queue.has_pending_input(&sess.active_turn).await { return None; } @@ -413,7 +413,7 @@ pub(crate) async fn run_turn( // submitted through the UI while the model was running. Though the UI // may support this, the model might not. let pending_input = if can_drain_pending_input { - sess.get_pending_input().await + sess.input_queue.get_pending_input(&sess.active_turn).await } else { Vec::new() }; @@ -434,7 +434,10 @@ pub(crate) async fn run_turn( } => { let remaining_pending_input = pending_input_iter.collect::>(); if !remaining_pending_input.is_empty() { - let _ = sess.prepend_pending_input(remaining_pending_input).await; + let _ = sess + .input_queue + .prepend_pending_input(&sess.active_turn, remaining_pending_input) + .await; requeued_pending_input = true; } blocked_pending_input_contexts = additional_contexts; @@ -494,7 +497,7 @@ pub(crate) async fn run_turn( last_agent_message: sampling_request_last_agent_message, } = sampling_request_output; can_drain_pending_input = true; - let has_pending_input = sess.has_pending_input().await; + let has_pending_input = sess.input_queue.has_pending_input(&sess.active_turn).await; let needs_follow_up = model_needs_follow_up || has_pending_input; let total_usage_tokens = sess.get_total_token_usage().await; let token_limit_reached = total_usage_tokens >= auto_compact_limit; @@ -2065,7 +2068,7 @@ async fn try_run_sampling_request( } needs_follow_up |= output_result.needs_follow_up; // todo: remove before stabilizing multi-agent v2 - if preempt_for_mailbox_mail && sess.mailbox_rx.lock().await.has_pending() { + if preempt_for_mailbox_mail && sess.input_queue.has_pending_mailbox_items().await { break Ok(SamplingRequestResult { needs_follow_up: true, last_agent_message, diff --git a/codex-rs/core/src/state/turn.rs b/codex-rs/core/src/state/turn.rs index 70aae74e87..86438ad56a 100644 --- a/codex-rs/core/src/state/turn.rs +++ b/codex-rs/core/src/state/turn.rs @@ -11,7 +11,6 @@ use tokio_util::task::AbortOnDropHandle; use codex_extension_api::ExtensionData; use codex_protocol::dynamic_tools::DynamicToolResponse; -use codex_protocol::models::ResponseInputItem; use codex_protocol::request_permissions::RequestPermissionProfile; use codex_protocol::request_permissions::RequestPermissionsResponse; use codex_protocol::request_user_input::RequestUserInputResponse; @@ -20,6 +19,7 @@ use codex_utils_absolute_path::AbsolutePathBuf; use rmcp::model::RequestId; use tokio::sync::oneshot; +use crate::session::TurnInputQueue; use crate::session::turn_context::TurnContext; use crate::tasks::AnySessionTask; use codex_protocol::models::AdditionalPermissionProfile; @@ -115,7 +115,7 @@ pub(crate) struct TurnState { pending_user_input: HashMap>, pending_elicitations: HashMap<(String, RequestId), oneshot::Sender>, pending_dynamic_tools: HashMap>, - pending_input: Vec, + pub(crate) pending_input: TurnInputQueue, mailbox_delivery_phase: MailboxDeliveryPhase, granted_permissions: Option, strict_auto_review_enabled: bool, @@ -146,13 +146,12 @@ impl TurnState { self.pending_approvals.remove(key) } - pub(crate) fn clear_pending(&mut self) { + pub(crate) fn clear_pending_waiters(&mut self) { self.pending_approvals.clear(); self.pending_request_permissions.clear(); self.pending_user_input.clear(); self.pending_elicitations.clear(); self.pending_dynamic_tools.clear(); - self.pending_input.clear(); } pub(crate) fn insert_pending_request_permissions( @@ -220,33 +219,6 @@ impl TurnState { self.pending_dynamic_tools.remove(key) } - pub(crate) fn push_pending_input(&mut self, input: ResponseInputItem) { - self.pending_input.push(input); - } - - pub(crate) fn prepend_pending_input(&mut self, mut input: Vec) { - if input.is_empty() { - return; - } - - input.append(&mut self.pending_input); - self.pending_input = input; - } - - pub(crate) fn take_pending_input(&mut self) -> Vec { - if self.pending_input.is_empty() { - Vec::with_capacity(0) - } else { - let mut ret = Vec::new(); - std::mem::swap(&mut ret, &mut self.pending_input); - ret - } - } - - pub(crate) fn has_pending_input(&self) -> bool { - !self.pending_input.is_empty() - } - pub(crate) fn accept_mailbox_delivery_for_current_turn(&mut self) { self.set_mailbox_delivery_phase(MailboxDeliveryPhase::CurrentTurn); } @@ -276,11 +248,3 @@ impl TurnState { self.strict_auto_review_enabled } } - -impl ActiveTurn { - /// Clear any pending approvals and input buffered for the current turn. - pub(crate) async fn clear_pending(&self) { - let mut ts = self.turn_state.lock().await; - ts.clear_pending(); - } -} diff --git a/codex-rs/core/src/stream_events_utils.rs b/codex-rs/core/src/stream_events_utils.rs index bc44fe0c3b..0e32ccb9fb 100644 --- a/codex-rs/core/src/stream_events_utils.rs +++ b/codex-rs/core/src/stream_events_utils.rs @@ -158,7 +158,8 @@ pub(crate) async fn record_completed_response_item_with_finalized_facts( |facts| facts.defers_mailbox_delivery_to_next_turn, ); if defers_mailbox_delivery { - sess.defer_mailbox_delivery_to_next_turn(&turn_context.sub_id) + sess.input_queue + .defer_mailbox_delivery_to_next_turn(&sess.active_turn, &turn_context.sub_id) .await; } mark_thread_memory_mode_polluted_if_external_context(sess, turn_context, item).await; @@ -351,7 +352,11 @@ pub(crate) async fn handle_output_item_done( // The model emitted a tool call; log it, persist the item immediately, and queue the tool execution. Ok(Some(call)) => { ctx.sess - .accept_mailbox_delivery_for_current_turn(&ctx.turn_context.sub_id) + .input_queue + .accept_mailbox_delivery_for_current_turn( + &ctx.sess.active_turn, + &ctx.turn_context.sub_id, + ) .await; let payload_preview = call.payload.log_payload().into_owned(); diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index 18f650e925..ba7f707927 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -347,24 +347,23 @@ impl Session { { warn!("failed to apply goal runtime turn-start event: {err}"); } - let queued_response_items = self.take_queued_response_items_for_next_turn().await; - let mailbox_items = self.get_pending_input().await; + let queued_response_items = self + .input_queue + .take_queued_response_items_for_next_turn() + .await; + let mailbox_items = self.input_queue.get_pending_input(&self.active_turn).await; let turn_state = { let mut active = self.active_turn.lock().await; let turn = active.get_or_insert_with(ActiveTurn::default); debug_assert!(turn.tasks.is_empty()); Arc::clone(&turn.turn_state) }; - { - let mut turn_state = turn_state.lock().await; - turn_state.token_usage_at_turn_start = token_usage_at_turn_start; - for item in queued_response_items { - turn_state.push_pending_input(item); - } - for item in mailbox_items { - turn_state.push_pending_input(item); - } - } + turn_state.lock().await.token_usage_at_turn_start = token_usage_at_turn_start; + let mut pending_items = queued_response_items; + pending_items.extend(mailbox_items); + self.input_queue + .extend_pending_input_for_turn_state(turn_state.as_ref(), pending_items) + .await; self.emit_turn_start_lifecycle(turn_context.extension_data.as_ref()) .await; @@ -468,8 +467,11 @@ impl Session { self: &Arc, sub_id: String, ) { - if !self.has_queued_response_items_for_next_turn().await - && !self.has_trigger_turn_mailbox_items().await + if !self + .input_queue + .has_queued_response_items_for_next_turn() + .await + && !self.input_queue.has_trigger_turn_mailbox_items().await { return; } @@ -521,7 +523,7 @@ impl Session { if let Some(active_turn) = active_turn_to_clear { // Let interrupted tasks observe cancellation before dropping pending approvals, or an // in-flight approval wait can surface as a model-visible rejection before TurnAborted. - active_turn.clear_pending().await; + self.input_queue.clear_pending(&active_turn).await; } if reason == TurnAbortReason::Interrupted && aborted_turn { self.maybe_start_turn_for_pending_work().await; @@ -567,7 +569,7 @@ impl Session { } // Let interrupted tasks observe cancellation before dropping pending approvals, or an // in-flight approval wait can surface as a model-visible rejection before TurnAborted. - active_turn.clear_pending().await; + self.input_queue.clear_pending(&active_turn).await; if reason == TurnAbortReason::Interrupted { self.maybe_start_turn_for_pending_work().await; @@ -609,8 +611,11 @@ impl Session { } }; if let Some(turn_state) = turn_state.as_ref() { - let mut ts = turn_state.lock().await; - pending_input = ts.take_pending_input(); + pending_input = self + .input_queue + .take_pending_input_for_turn_state(turn_state.as_ref()) + .await; + let ts = turn_state.lock().await; turn_had_memory_citation = ts.has_memory_citation; turn_tool_calls = ts.tool_calls; token_usage_at_turn_start = Some(ts.token_usage_at_turn_start.clone()); diff --git a/codex-rs/core/src/tasks/regular.rs b/codex-rs/core/src/tasks/regular.rs index 756a691f11..50414df278 100644 --- a/codex-rs/core/src/tasks/regular.rs +++ b/codex-rs/core/src/tasks/regular.rs @@ -80,7 +80,7 @@ impl SessionTask for RegularTask { ) .instrument(run_turn_span.clone()) .await; - if !sess.has_pending_input().await { + if !sess.input_queue.has_pending_input(&sess.active_turn).await { return last_agent_message; } next_input = Vec::new(); diff --git a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs index 0044bf8522..a83167ffc5 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs @@ -2791,13 +2791,16 @@ async fn multi_agent_v2_wait_agent_accepts_timeout_only_argument() { }); tokio::task::yield_now().await; - session.enqueue_mailbox_communication(InterAgentCommunication::new( - worker_path, - AgentPath::root(), - Vec::new(), - "hello from worker".to_string(), - /*trigger_turn*/ false, - )); + session + .input_queue + .enqueue_mailbox_communication(InterAgentCommunication::new( + worker_path, + AgentPath::root(), + Vec::new(), + "hello from worker".to_string(), + /*trigger_turn*/ false, + )) + .await; let output = wait_task .await @@ -3276,13 +3279,16 @@ async fn multi_agent_v2_wait_agent_returns_summary_for_mailbox_activity() { }); tokio::task::yield_now().await; - session.enqueue_mailbox_communication(InterAgentCommunication::new( - worker_path, - AgentPath::root(), - Vec::new(), - "completed".to_string(), - /*trigger_turn*/ false, - )); + session + .input_queue + .enqueue_mailbox_communication(InterAgentCommunication::new( + worker_path, + AgentPath::root(), + Vec::new(), + "completed".to_string(), + /*trigger_turn*/ false, + )) + .await; let wait_output = wait_task .await @@ -3346,13 +3352,16 @@ async fn multi_agent_v2_wait_agent_returns_for_already_queued_mail() { .agent_path .expect("worker path"); - session.enqueue_mailbox_communication(InterAgentCommunication::new( - worker_path, - AgentPath::root(), - Vec::new(), - "already queued".to_string(), - /*trigger_turn*/ false, - )); + session + .input_queue + .enqueue_mailbox_communication(InterAgentCommunication::new( + worker_path, + AgentPath::root(), + Vec::new(), + "already queued".to_string(), + /*trigger_turn*/ false, + )) + .await; let output = timeout( Duration::from_millis(500), @@ -3442,13 +3451,16 @@ async fn multi_agent_v2_wait_agent_wakes_on_any_mailbox_notification() { }); tokio::task::yield_now().await; - session.enqueue_mailbox_communication(InterAgentCommunication::new( - worker_b_path, - AgentPath::root(), - Vec::new(), - "from worker b".to_string(), - /*trigger_turn*/ false, - )); + session + .input_queue + .enqueue_mailbox_communication(InterAgentCommunication::new( + worker_b_path, + AgentPath::root(), + Vec::new(), + "from worker b".to_string(), + /*trigger_turn*/ false, + )) + .await; let output = wait_task .await @@ -3527,13 +3539,16 @@ async fn multi_agent_v2_wait_agent_does_not_return_completed_content() { }); tokio::task::yield_now().await; - session.enqueue_mailbox_communication(InterAgentCommunication::new( - worker_path, - AgentPath::root(), - Vec::new(), - "sensitive child output".to_string(), - /*trigger_turn*/ false, - )); + session + .input_queue + .enqueue_mailbox_communication(InterAgentCommunication::new( + worker_path, + AgentPath::root(), + Vec::new(), + "sensitive child output".to_string(), + /*trigger_turn*/ false, + )) + .await; let output = wait_task .await diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs index 8913411b53..285abd9330 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs @@ -60,7 +60,7 @@ impl ToolExecutor for Handler { None => default_timeout_ms, }; - let mut mailbox_seq_rx = session.subscribe_mailbox_seq(); + let mut mailbox_rx = session.input_queue.subscribe_mailbox().await; session .send_event( @@ -76,12 +76,8 @@ impl ToolExecutor for Handler { ) .await; - let timed_out = if session.has_pending_mailbox_items().await { - false - } else { - let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64); - !wait_for_mailbox_change(&mut mailbox_seq_rx, deadline).await - }; + let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64); + let timed_out = !wait_for_mailbox_change(&mut mailbox_rx, deadline).await; let result = WaitAgentResult::from_timed_out(timed_out); session @@ -153,10 +149,10 @@ impl ToolOutput for WaitAgentResult { } async fn wait_for_mailbox_change( - mailbox_seq_rx: &mut tokio::sync::watch::Receiver, + mailbox_rx: &mut tokio::sync::watch::Receiver<()>, deadline: Instant, ) -> bool { - match timeout_at(deadline, mailbox_seq_rx.changed()).await { + match timeout_at(deadline, mailbox_rx.changed()).await { Ok(Ok(())) => true, Ok(Err(_)) | Err(_) => false, }