[codex] Move pending input into input queue (#22728)

## Why

Pending model input was split across `Session`, `TurnState`, and the
agent mailbox. That made it easy for new paths to manage queued user
input or mailbox delivery outside the intended ownership boundary.

This PR consolidates the model-facing input lifecycle behind the session
input queue so turn-local pending input, next-turn queued items, and
mailbox delivery coordination are owned in one place.

## What Changed

- Added `session/input_queue.rs` to own pending input queues and mailbox
delivery coordination.
- Removed the standalone `agent/mailbox.rs` channel wrapper and store
mailbox items directly in the input queue.
- Moved pending-input mutations off `TurnState`; `TurnState` now exposes
the queue-owned storage directly for now.
- Routed abort cleanup, mailbox delivery phase changes, next-turn queued
items, and active-turn pending input through `InputQueue`.
- Boxed stack-heavy agent resume/fork startup futures that the refactor
pushed over the default test stack.
- Updated session, task, goal, stream-event, and multi-agent call sites
and tests to use the new queue ownership.

## Verification

- `cargo test -p codex-core --lib agent::control::tests`
- `cargo test -p codex-core --lib
agent::control::tests::resume_closed_child_reopens_open_descendants --
--exact`
- `cargo test -p codex-core --lib
agent::control::tests::spawn_agent_fork_last_n_turns_keeps_only_recent_turns
-- --exact`
- `cargo test -p codex-core --lib
agent::control::tests::resume_thread_subagent_restores_stored_nickname_and_role
-- --exact`
- `cargo test -p codex-core` was also run; it completed with 1814
passed, 4 ignored, and one timeout in
`agent::control::tests::resume_thread_subagent_restores_stored_nickname_and_role`,
which passed when rerun in isolation.
This commit is contained in:
pakrym-oai
2026-05-18 15:43:01 -07:00
committed by GitHub
parent a66e0e9c4b
commit afa0101ae2
18 changed files with 623 additions and 524 deletions

View File

@@ -499,13 +499,12 @@ impl AgentControl {
agent_nickname: None, agent_nickname: None,
agent_role: None, agent_role: None,
}); });
match self match Box::pin(self.resume_single_agent_from_rollout(
.resume_single_agent_from_rollout( config.clone(),
config.clone(), child_thread_id,
child_thread_id, child_session_source,
child_session_source, ))
) .await
.await
{ {
Ok(_) => true, Ok(_) => true,
Err(err) => { Err(err) => {

View File

@@ -486,7 +486,13 @@ async fn send_inter_agent_communication_without_turn_queues_message_without_trig
timeout(Duration::from_secs(5), async { timeout(Duration::from_secs(5), async {
loop { loop {
if thread.codex.session.has_pending_input().await { if thread
.codex
.session
.input_queue
.has_pending_input(&thread.codex.session.active_turn)
.await
{
break; break;
} }
sleep(Duration::from_millis(10)).await; sleep(Duration::from_millis(10)).await;

View File

@@ -1,161 +0,0 @@
use codex_protocol::protocol::InterAgentCommunication;
use std::collections::VecDeque;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use tokio::sync::mpsc;
use tokio::sync::watch;
#[cfg(test)]
use codex_protocol::AgentPath;
pub(crate) struct Mailbox {
tx: mpsc::UnboundedSender<InterAgentCommunication>,
next_seq: AtomicU64,
seq_tx: watch::Sender<u64>,
}
pub(crate) struct MailboxReceiver {
rx: mpsc::UnboundedReceiver<InterAgentCommunication>,
pending_mails: VecDeque<InterAgentCommunication>,
}
impl Mailbox {
pub(crate) fn new() -> (Self, MailboxReceiver) {
let (tx, rx) = mpsc::unbounded_channel();
let (seq_tx, _) = watch::channel(0);
(
Self {
tx,
next_seq: AtomicU64::new(0),
seq_tx,
},
MailboxReceiver {
rx,
pending_mails: VecDeque::new(),
},
)
}
pub(crate) fn subscribe(&self) -> watch::Receiver<u64> {
self.seq_tx.subscribe()
}
pub(crate) fn send(&self, communication: InterAgentCommunication) -> u64 {
let seq = self.next_seq.fetch_add(1, Ordering::Relaxed) + 1;
let _ = self.tx.send(communication);
self.seq_tx.send_replace(seq);
seq
}
}
impl MailboxReceiver {
fn sync_pending_mails(&mut self) {
while let Ok(mail) = self.rx.try_recv() {
self.pending_mails.push_back(mail);
}
}
pub(crate) fn has_pending(&mut self) -> bool {
self.sync_pending_mails();
!self.pending_mails.is_empty()
}
pub(crate) fn has_pending_trigger_turn(&mut self) -> bool {
self.sync_pending_mails();
self.pending_mails.iter().any(|mail| mail.trigger_turn)
}
pub(crate) fn drain(&mut self) -> Vec<InterAgentCommunication> {
self.sync_pending_mails();
self.pending_mails.drain(..).collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
fn make_mail(
author: AgentPath,
recipient: AgentPath,
content: &str,
trigger_turn: bool,
) -> InterAgentCommunication {
InterAgentCommunication::new(
author,
recipient,
Vec::new(),
content.to_string(),
trigger_turn,
)
}
#[tokio::test]
async fn mailbox_assigns_monotonic_sequence_numbers() {
let (mailbox, _receiver) = Mailbox::new();
let mut seq_rx = mailbox.subscribe();
let seq_a = mailbox.send(make_mail(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
"one",
/*trigger_turn*/ false,
));
let seq_b = mailbox.send(make_mail(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
"two",
/*trigger_turn*/ false,
));
seq_rx.changed().await.expect("first seq update");
assert_eq!(*seq_rx.borrow(), seq_b);
assert_eq!(seq_a, 1);
assert_eq!(seq_b, 2);
}
#[tokio::test]
async fn mailbox_drains_in_delivery_order() {
let (mailbox, mut receiver) = Mailbox::new();
let mail_one = make_mail(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
"one",
/*trigger_turn*/ false,
);
let mail_two = make_mail(
AgentPath::try_from("/root/worker").expect("agent path"),
AgentPath::root(),
"two",
/*trigger_turn*/ false,
);
mailbox.send(mail_one.clone());
mailbox.send(mail_two.clone());
assert_eq!(receiver.drain(), vec![mail_one, mail_two]);
assert!(!receiver.has_pending());
}
#[tokio::test]
async fn mailbox_tracks_pending_trigger_turn_mail() {
let (mailbox, mut receiver) = Mailbox::new();
mailbox.send(make_mail(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
"queued",
/*trigger_turn*/ false,
));
assert!(!receiver.has_pending_trigger_turn());
mailbox.send(make_mail(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
"wake",
/*trigger_turn*/ true,
));
assert!(receiver.has_pending_trigger_turn());
}
}

View File

@@ -1,14 +1,11 @@
pub(crate) mod agent_resolver; pub(crate) mod agent_resolver;
pub(crate) mod control; pub(crate) mod control;
pub(crate) mod mailbox;
mod registry; mod registry;
pub(crate) mod role; pub(crate) mod role;
pub(crate) mod status; pub(crate) mod status;
pub(crate) use codex_protocol::protocol::AgentStatus; pub(crate) use codex_protocol::protocol::AgentStatus;
pub(crate) use control::AgentControl; pub(crate) use control::AgentControl;
pub(crate) use mailbox::Mailbox;
pub(crate) use mailbox::MailboxReceiver;
pub(crate) use registry::exceeds_thread_spawn_depth_limit; pub(crate) use registry::exceeds_thread_spawn_depth_limit;
pub(crate) use registry::next_thread_spawn_depth; pub(crate) use registry::next_thread_spawn_depth;
pub(crate) use status::agent_status_from_event; pub(crate) use status::agent_status_from_event;

View File

@@ -383,6 +383,7 @@ impl CodexThread {
{ {
self.codex self.codex
.session .session
.input_queue
.queue_response_items_for_next_turn(items) .queue_response_items_for_next_turn(items)
.await; .await;
self.codex.session.maybe_start_turn_for_pending_work().await; self.codex.session.maybe_start_turn_for_pending_work().await;

View File

@@ -1331,12 +1331,9 @@ impl Session {
.await; .await;
return; return;
} }
{ self.input_queue
let mut turn_state = turn_state.lock().await; .extend_pending_input_for_turn_state(turn_state.as_ref(), candidate.items)
for item in candidate.items { .await;
turn_state.push_pending_input(item);
}
}
let turn_context = self let turn_context = self
.new_default_turn_with_sub_id(uuid::Uuid::new_v4().to_string()) .new_default_turn_with_sub_id(uuid::Uuid::new_v4().to_string())
@@ -1374,11 +1371,15 @@ impl Session {
tracing::debug!("skipping active goal continuation because a turn is already active"); tracing::debug!("skipping active goal continuation because a turn is already active");
return None; return None;
} }
if self.has_queued_response_items_for_next_turn().await { if self
.input_queue
.has_queued_response_items_for_next_turn()
.await
{
tracing::debug!("skipping active goal continuation because queued input exists"); tracing::debug!("skipping active goal continuation because queued input exists");
return None; return None;
} }
if self.has_trigger_turn_mailbox_items().await { if self.input_queue.has_trigger_turn_mailbox_items().await {
tracing::debug!( tracing::debug!(
"skipping active goal continuation because trigger-turn mailbox input is pending" "skipping active goal continuation because trigger-turn mailbox input is pending"
); );
@@ -1415,8 +1416,11 @@ impl Session {
return None; return None;
} }
if self.active_turn.lock().await.is_some() if self.active_turn.lock().await.is_some()
|| self.has_queued_response_items_for_next_turn().await || self
|| self.has_trigger_turn_mailbox_items().await .input_queue
.has_queued_response_items_for_next_turn()
.await
|| self.input_queue.has_trigger_turn_mailbox_items().await
{ {
tracing::debug!("skipping active goal continuation because pending work appeared"); tracing::debug!("skipping active goal continuation because pending work appeared");
return None; return None;

View File

@@ -319,7 +319,9 @@ pub async fn inter_agent_communication(
communication: InterAgentCommunication, communication: InterAgentCommunication,
) { ) {
let trigger_turn = communication.trigger_turn; let trigger_turn = communication.trigger_turn;
sess.enqueue_mailbox_communication(communication); sess.input_queue
.enqueue_mailbox_communication(communication)
.await;
if trigger_turn { if trigger_turn {
sess.maybe_start_turn_for_pending_work_with_sub_id(sub_id) sess.maybe_start_turn_for_pending_work_with_sub_id(sub_id)
.await; .await;
@@ -961,7 +963,9 @@ Approved action:
}]; }];
if let Err(items) = sess.inject_response_items(items).await { if let Err(items) = sess.inject_response_items(items).await {
sess.queue_response_items_for_next_turn(items).await; sess.input_queue
.queue_response_items_for_next_turn(items)
.await;
} }
} }

View File

@@ -0,0 +1,392 @@
use crate::state::ActiveTurn;
use crate::state::MailboxDeliveryPhase;
use crate::state::TurnState;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::protocol::InterAgentCommunication;
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::watch;
/// Turn-local pending input storage owned by the input queue flow.
#[derive(Default)]
pub(crate) struct TurnInputQueue {
items: Vec<ResponseInputItem>,
}
/// Session-scoped pending input storage and active-turn mailbox delivery coordination.
pub(crate) struct InputQueue {
mailbox_tx: watch::Sender<()>,
mailbox_pending_mails: Mutex<VecDeque<InterAgentCommunication>>,
idle_pending_input: Mutex<Vec<ResponseInputItem>>,
}
impl InputQueue {
pub(crate) fn new() -> Self {
let (mailbox_tx, _) = watch::channel(());
Self {
mailbox_tx,
mailbox_pending_mails: Mutex::new(VecDeque::new()),
idle_pending_input: Mutex::new(Vec::new()),
}
}
pub(crate) async fn subscribe_mailbox(&self) -> watch::Receiver<()> {
let mut mailbox_rx = self.mailbox_tx.subscribe();
if self.has_pending_mailbox_items().await {
mailbox_rx.mark_changed();
}
mailbox_rx
}
pub(crate) async fn enqueue_mailbox_communication(
&self,
communication: InterAgentCommunication,
) {
self.mailbox_pending_mails
.lock()
.await
.push_back(communication);
self.mailbox_tx.send_replace(());
}
pub(crate) async fn has_pending_mailbox_items(&self) -> bool {
!self.mailbox_pending_mails.lock().await.is_empty()
}
pub(crate) async fn has_trigger_turn_mailbox_items(&self) -> bool {
self.mailbox_pending_mails
.lock()
.await
.iter()
.any(|mail| mail.trigger_turn)
}
pub(crate) async fn drain_mailbox_input_items(&self) -> Vec<ResponseInputItem> {
self.mailbox_pending_mails
.lock()
.await
.drain(..)
.map(|mail| mail.to_response_input_item())
.collect()
}
pub(crate) async fn queue_response_items_for_next_turn(&self, items: Vec<ResponseInputItem>) {
if items.is_empty() {
return;
}
self.idle_pending_input.lock().await.extend(items);
}
pub(crate) async fn take_queued_response_items_for_next_turn(&self) -> Vec<ResponseInputItem> {
std::mem::take(&mut *self.idle_pending_input.lock().await)
}
pub(crate) async fn has_queued_response_items_for_next_turn(&self) -> bool {
!self.idle_pending_input.lock().await.is_empty()
}
pub(crate) async fn turn_state_for_sub_id(
&self,
active_turn: &Mutex<Option<ActiveTurn>>,
sub_id: &str,
) -> Option<Arc<Mutex<TurnState>>> {
let active = active_turn.lock().await;
active.as_ref().and_then(|active_turn| {
active_turn
.tasks
.contains_key(sub_id)
.then(|| Arc::clone(&active_turn.turn_state))
})
}
/// Clear any pending waiters and input buffered for the current turn.
pub(crate) async fn clear_pending(&self, active_turn: &ActiveTurn) {
let mut turn_state = active_turn.turn_state.lock().await;
turn_state.clear_pending_waiters();
turn_state.pending_input.items.clear();
}
pub(crate) async fn defer_mailbox_delivery_to_next_turn(
&self,
active_turn: &Mutex<Option<ActiveTurn>>,
sub_id: &str,
) {
let turn_state = self.turn_state_for_sub_id(active_turn, sub_id).await;
let Some(turn_state) = turn_state else {
return;
};
let mut turn_state = turn_state.lock().await;
if !turn_state.pending_input.items.is_empty() {
return;
}
turn_state.set_mailbox_delivery_phase(MailboxDeliveryPhase::NextTurn);
}
pub(crate) async fn accept_mailbox_delivery_for_current_turn(
&self,
active_turn: &Mutex<Option<ActiveTurn>>,
sub_id: &str,
) {
let turn_state = self.turn_state_for_sub_id(active_turn, sub_id).await;
let Some(turn_state) = turn_state else {
return;
};
self.accept_mailbox_delivery_for_turn_state(turn_state.as_ref())
.await;
}
pub(super) async fn accept_mailbox_delivery_for_turn_state(
&self,
turn_state: &Mutex<TurnState>,
) {
turn_state
.lock()
.await
.accept_mailbox_delivery_for_current_turn();
}
pub(super) async fn push_pending_input_and_accept_mailbox_delivery_for_turn_state(
&self,
turn_state: &Mutex<TurnState>,
input: ResponseInputItem,
) {
let mut turn_state = turn_state.lock().await;
turn_state.pending_input.items.push(input);
turn_state.accept_mailbox_delivery_for_current_turn();
}
pub(crate) async fn extend_pending_input_for_turn_state(
&self,
turn_state: &Mutex<TurnState>,
input: Vec<ResponseInputItem>,
) {
turn_state.lock().await.pending_input.items.extend(input);
}
pub(crate) async fn take_pending_input_for_turn_state(
&self,
turn_state: &Mutex<TurnState>,
) -> Vec<ResponseInputItem> {
turn_state.lock().await.pending_input.items.split_off(0)
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub(crate) async fn inject_response_items(
&self,
active_turn: &Mutex<Option<ActiveTurn>>,
input: Vec<ResponseInputItem>,
) -> Result<(), Vec<ResponseInputItem>> {
let mut active = active_turn.lock().await;
match active.as_mut() {
Some(active_turn) => {
active_turn
.turn_state
.lock()
.await
.pending_input
.items
.extend(input);
Ok(())
}
None => Err(input),
}
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub(crate) async fn prepend_pending_input(
&self,
active_turn: &Mutex<Option<ActiveTurn>>,
mut input: Vec<ResponseInputItem>,
) -> Result<(), ()> {
let mut active = active_turn.lock().await;
match active.as_mut() {
Some(active_turn) => {
let mut turn_state = active_turn.turn_state.lock().await;
if !input.is_empty() {
let pending_input = &mut turn_state.pending_input;
input.append(&mut pending_input.items);
pending_input.items = input;
}
Ok(())
}
None => Err(()),
}
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub(crate) async fn get_pending_input(
&self,
active_turn: &Mutex<Option<ActiveTurn>>,
) -> Vec<ResponseInputItem> {
let (pending_input, accepts_mailbox_delivery) = {
let mut active = active_turn.lock().await;
match active.as_mut() {
Some(active_turn) => {
let mut turn_state = active_turn.turn_state.lock().await;
(
turn_state.pending_input.items.split_off(0),
turn_state.accepts_mailbox_delivery_for_current_turn(),
)
}
None => (Vec::new(), true),
}
};
if !accepts_mailbox_delivery {
return pending_input;
}
let mailbox_items = self.drain_mailbox_input_items().await;
if pending_input.is_empty() {
mailbox_items
} else if mailbox_items.is_empty() {
pending_input
} else {
let mut pending_input = pending_input;
pending_input.extend(mailbox_items);
pending_input
}
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state reads must remain atomic"
)]
pub(crate) async fn has_pending_input(&self, active_turn: &Mutex<Option<ActiveTurn>>) -> bool {
let (has_turn_pending_input, accepts_mailbox_delivery) = {
let active = active_turn.lock().await;
match active.as_ref() {
Some(active_turn) => {
let turn_state = active_turn.turn_state.lock().await;
(
!turn_state.pending_input.items.is_empty(),
turn_state.accepts_mailbox_delivery_for_current_turn(),
)
}
None => (false, true),
}
};
if has_turn_pending_input {
return true;
}
if !accepts_mailbox_delivery {
return false;
}
self.has_pending_mailbox_items().await
}
}
#[cfg(test)]
mod tests {
use super::*;
use codex_protocol::AgentPath;
use pretty_assertions::assert_eq;
fn make_mail(
author: AgentPath,
recipient: AgentPath,
content: &str,
trigger_turn: bool,
) -> InterAgentCommunication {
InterAgentCommunication::new(
author,
recipient,
Vec::new(),
content.to_string(),
trigger_turn,
)
}
#[tokio::test]
async fn input_queue_notifies_mailbox_subscribers() {
let input_queue = InputQueue::new();
let mut mailbox_rx = input_queue.subscribe_mailbox().await;
input_queue
.enqueue_mailbox_communication(make_mail(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
"one",
/*trigger_turn*/ false,
))
.await;
input_queue
.enqueue_mailbox_communication(make_mail(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
"two",
/*trigger_turn*/ false,
))
.await;
mailbox_rx.changed().await.expect("mailbox update");
}
#[tokio::test]
async fn input_queue_drains_mailbox_in_delivery_order() {
let input_queue = InputQueue::new();
let mail_one = make_mail(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
"one",
/*trigger_turn*/ false,
);
let mail_two = make_mail(
AgentPath::try_from("/root/worker").expect("agent path"),
AgentPath::root(),
"two",
/*trigger_turn*/ false,
);
input_queue
.enqueue_mailbox_communication(mail_one.clone())
.await;
input_queue
.enqueue_mailbox_communication(mail_two.clone())
.await;
assert_eq!(
input_queue.drain_mailbox_input_items().await,
vec![
mail_one.to_response_input_item(),
mail_two.to_response_input_item()
]
);
assert!(!input_queue.has_pending_mailbox_items().await);
}
#[tokio::test]
async fn input_queue_tracks_pending_trigger_turn_mail() {
let input_queue = InputQueue::new();
input_queue
.enqueue_mailbox_communication(make_mail(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
"queued",
/*trigger_turn*/ false,
))
.await;
assert!(!input_queue.has_trigger_turn_mailbox_items().await);
input_queue
.enqueue_mailbox_communication(make_mail(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
"wake",
/*trigger_turn*/ true,
))
.await;
assert!(input_queue.has_trigger_turn_mailbox_items().await);
}
}

View File

@@ -10,8 +10,6 @@ use std::time::UNIX_EPOCH;
use crate::agent::AgentControl; use crate::agent::AgentControl;
use crate::agent::AgentStatus; use crate::agent::AgentStatus;
use crate::agent::Mailbox;
use crate::agent::MailboxReceiver;
use crate::agent::agent_status_from_event; use crate::agent::agent_status_from_event;
use crate::agent::status::is_final; use crate::agent::status::is_final;
use crate::attestation::AttestationProvider; use crate::attestation::AttestationProvider;
@@ -193,6 +191,7 @@ use codex_protocol::exec_output::StreamOutput;
mod config_lock; mod config_lock;
mod handlers; mod handlers;
mod input_queue;
mod mcp; mod mcp;
mod multi_agents; mod multi_agents;
mod review; mod review;
@@ -206,6 +205,7 @@ use self::config_lock::validate_config_lock_if_configured;
#[cfg(test)] #[cfg(test)]
use self::handlers::submission_dispatch_span; use self::handlers::submission_dispatch_span;
use self::handlers::submission_loop; use self::handlers::submission_loop;
pub(crate) use self::input_queue::TurnInputQueue;
use self::review::spawn_review_thread; use self::review::spawn_review_thread;
use self::session::AppServerClientMetadata; use self::session::AppServerClientMetadata;
use self::session::Session; use self::session::Session;
@@ -289,8 +289,6 @@ use crate::rollout::map_session_init_error;
use crate::session_startup_prewarm::SessionStartupPrewarmHandle; use crate::session_startup_prewarm::SessionStartupPrewarmHandle;
use crate::shell; use crate::shell;
use crate::shell_snapshot::ShellSnapshot; use crate::shell_snapshot::ShellSnapshot;
use crate::state::ActiveTurn;
use crate::state::MailboxDeliveryPhase;
use crate::state::PendingRequestPermissions; use crate::state::PendingRequestPermissions;
use crate::state::SessionServices; use crate::state::SessionServices;
use crate::state::SessionState; use crate::state::SessionState;
@@ -3163,195 +3161,36 @@ impl Session {
.set_responsesapi_client_metadata(responsesapi_client_metadata); .set_responsesapi_client_metadata(responsesapi_client_metadata);
} }
let mut turn_state = active_turn.turn_state.lock().await; self.input_queue
turn_state.push_pending_input(input.into()); .push_pending_input_and_accept_mailbox_delivery_for_turn_state(
turn_state.accept_mailbox_delivery_for_current_turn(); active_turn.turn_state.as_ref(),
input.into(),
)
.await;
Ok(active_turn_id.clone()) Ok(active_turn_id.clone())
} }
/// Returns the input if there was no task running to inject into. /// Returns the input if there was no task running to inject into.
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub async fn inject_response_items( pub async fn inject_response_items(
&self, &self,
input: Vec<ResponseInputItem>, input: Vec<ResponseInputItem>,
) -> Result<(), Vec<ResponseInputItem>> { ) -> Result<(), Vec<ResponseInputItem>> {
let mut active = self.active_turn.lock().await; self.input_queue
match active.as_mut() { .inject_response_items(&self.active_turn, input)
Some(at) => {
let mut ts = at.turn_state.lock().await;
for item in input {
ts.push_pending_input(item);
}
Ok(())
}
None => Err(input),
}
}
pub(crate) async fn defer_mailbox_delivery_to_next_turn(&self, sub_id: &str) {
let turn_state = self.turn_state_for_sub_id(sub_id).await;
let Some(turn_state) = turn_state else {
return;
};
let mut turn_state = turn_state.lock().await;
if turn_state.has_pending_input() {
return;
}
turn_state.set_mailbox_delivery_phase(MailboxDeliveryPhase::NextTurn);
}
pub(crate) async fn accept_mailbox_delivery_for_current_turn(&self, sub_id: &str) {
let turn_state = self.turn_state_for_sub_id(sub_id).await;
let Some(turn_state) = turn_state else {
return;
};
turn_state
.lock()
.await .await
.set_mailbox_delivery_phase(MailboxDeliveryPhase::CurrentTurn);
} }
pub(crate) async fn record_memory_citation_for_turn(&self, sub_id: &str) { pub(crate) async fn record_memory_citation_for_turn(&self, sub_id: &str) {
let turn_state = self.turn_state_for_sub_id(sub_id).await; let turn_state = self
.input_queue
.turn_state_for_sub_id(&self.active_turn, sub_id)
.await;
let Some(turn_state) = turn_state else { let Some(turn_state) = turn_state else {
return; return;
}; };
turn_state.lock().await.has_memory_citation = true; turn_state.lock().await.has_memory_citation = true;
} }
async fn turn_state_for_sub_id(
&self,
sub_id: &str,
) -> Option<Arc<tokio::sync::Mutex<crate::state::TurnState>>> {
let active = self.active_turn.lock().await;
active.as_ref().and_then(|active_turn| {
active_turn
.tasks
.contains_key(sub_id)
.then(|| Arc::clone(&active_turn.turn_state))
})
}
pub(crate) fn subscribe_mailbox_seq(&self) -> watch::Receiver<u64> {
self.mailbox.subscribe()
}
pub(crate) fn enqueue_mailbox_communication(&self, communication: InterAgentCommunication) {
self.mailbox.send(communication);
}
pub(crate) async fn has_trigger_turn_mailbox_items(&self) -> bool {
self.mailbox_rx.lock().await.has_pending_trigger_turn()
}
pub(crate) async fn has_pending_mailbox_items(&self) -> bool {
self.mailbox_rx.lock().await.has_pending()
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub async fn prepend_pending_input(&self, input: Vec<ResponseInputItem>) -> Result<(), ()> {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.prepend_pending_input(input);
Ok(())
}
None => Err(()),
}
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub async fn get_pending_input(&self) -> Vec<ResponseInputItem> {
let (pending_input, accepts_mailbox_delivery) = {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
(
ts.take_pending_input(),
ts.accepts_mailbox_delivery_for_current_turn(),
)
}
None => (Vec::new(), true),
}
};
if !accepts_mailbox_delivery {
return pending_input;
}
let mailbox_items = {
let mut mailbox_rx = self.mailbox_rx.lock().await;
mailbox_rx
.drain()
.into_iter()
.map(|mail| mail.to_response_input_item())
.collect::<Vec<_>>()
};
if pending_input.is_empty() {
mailbox_items
} else if mailbox_items.is_empty() {
pending_input
} else {
let mut pending_input = pending_input;
pending_input.extend(mailbox_items);
pending_input
}
}
/// Queue response items to be injected into the next active turn created for this session.
pub(crate) async fn queue_response_items_for_next_turn(&self, items: Vec<ResponseInputItem>) {
if items.is_empty() {
return;
}
let mut idle_pending_input = self.idle_pending_input.lock().await;
idle_pending_input.extend(items);
}
pub(crate) async fn take_queued_response_items_for_next_turn(&self) -> Vec<ResponseInputItem> {
std::mem::take(&mut *self.idle_pending_input.lock().await)
}
pub(crate) async fn has_queued_response_items_for_next_turn(&self) -> bool {
!self.idle_pending_input.lock().await.is_empty()
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state reads must remain atomic"
)]
pub async fn has_pending_input(&self) -> bool {
let (has_turn_pending_input, accepts_mailbox_delivery) = {
let active = self.active_turn.lock().await;
match active.as_ref() {
Some(at) => {
let ts = at.turn_state.lock().await;
(
ts.has_pending_input(),
ts.accepts_mailbox_delivery_for_current_turn(),
)
}
None => (false, true),
}
};
if has_turn_pending_input {
return true;
}
if !accepts_mailbox_delivery {
return false;
}
self.has_pending_mailbox_items().await
}
pub async fn interrupt_task(self: &Arc<Self>) { pub async fn interrupt_task(self: &Arc<Self>) {
info!("interrupt received: abort current task, if any"); info!("interrupt received: abort current task, if any");
let had_active_turn = self.active_turn.lock().await.is_some(); let had_active_turn = self.active_turn.lock().await.is_some();

View File

@@ -1,5 +1,7 @@
use super::input_queue::InputQueue;
use super::*; use super::*;
use crate::goals::GoalRuntimeState; use crate::goals::GoalRuntimeState;
use crate::state::ActiveTurn;
use codex_protocol::SessionId; use codex_protocol::SessionId;
use codex_protocol::config_types::ServiceTier; use codex_protocol::config_types::ServiceTier;
use codex_protocol::permissions::FileSystemPath; use codex_protocol::permissions::FileSystemPath;
@@ -27,9 +29,7 @@ pub(crate) struct Session {
pub(super) pending_mcp_server_refresh_config: Mutex<Option<McpServerRefreshConfig>>, pub(super) pending_mcp_server_refresh_config: Mutex<Option<McpServerRefreshConfig>>,
pub(crate) conversation: Arc<RealtimeConversationManager>, pub(crate) conversation: Arc<RealtimeConversationManager>,
pub(crate) active_turn: Mutex<Option<ActiveTurn>>, pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
pub(super) mailbox: Mailbox, pub(crate) input_queue: InputQueue,
pub(super) mailbox_rx: Mutex<MailboxReceiver>,
pub(super) idle_pending_input: Mutex<Vec<ResponseInputItem>>, // TODO (jif) merge with mailbox!
pub(crate) goal_runtime: GoalRuntimeState, pub(crate) goal_runtime: GoalRuntimeState,
pub(crate) guardian_review_session: GuardianReviewSessionManager, pub(crate) guardian_review_session: GuardianReviewSessionManager,
pub(crate) services: SessionServices, pub(crate) services: SessionServices,
@@ -950,7 +950,6 @@ impl Session {
let (out_of_band_elicitation_paused, _out_of_band_elicitation_paused_rx) = let (out_of_band_elicitation_paused, _out_of_band_elicitation_paused_rx) =
watch::channel(false); watch::channel(false);
let (mailbox, mailbox_rx) = Mailbox::new();
let sess = Arc::new(Session { let sess = Arc::new(Session {
conversation_id: thread_id, conversation_id: thread_id,
installation_id, installation_id,
@@ -963,9 +962,7 @@ impl Session {
pending_mcp_server_refresh_config: Mutex::new(None), pending_mcp_server_refresh_config: Mutex::new(None),
conversation: Arc::new(RealtimeConversationManager::new()), conversation: Arc::new(RealtimeConversationManager::new()),
active_turn: Mutex::new(None), active_turn: Mutex::new(None),
mailbox, input_queue: InputQueue::new(),
mailbox_rx: Mutex::new(mailbox_rx),
idle_pending_input: Mutex::new(Vec::new()),
goal_runtime: GoalRuntimeState::new(), goal_runtime: GoalRuntimeState::new(),
guardian_review_session: GuardianReviewSessionManager::default(), guardian_review_session: GuardianReviewSessionManager::default(),
services, services,
@@ -1130,7 +1127,7 @@ impl Session {
}; };
// record_initial_history can emit events. We record only after the SessionConfiguredEvent is emitted. // record_initial_history can emit events. We record only after the SessionConfiguredEvent is emitted.
sess.record_initial_history(initial_history).await; Box::pin(sess.record_initial_history(initial_history)).await;
{ {
let mut state = sess.state.lock().await; let mut state = sess.state.lock().await;
state.set_pending_session_start_source(Some(session_start_source)); state.set_pending_session_start_source(Some(session_start_source));

View File

@@ -4394,7 +4394,6 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
/*goal_tools_supported*/ true, /*goal_tools_supported*/ true,
); );
let (mailbox, mailbox_rx) = crate::agent::Mailbox::new();
let session = Session { let session = Session {
conversation_id: thread_id, conversation_id: thread_id,
installation_id: "11111111-1111-4111-8111-111111111111".to_string(), installation_id: "11111111-1111-4111-8111-111111111111".to_string(),
@@ -4407,9 +4406,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
pending_mcp_server_refresh_config: Mutex::new(None), pending_mcp_server_refresh_config: Mutex::new(None),
conversation: Arc::new(RealtimeConversationManager::new()), conversation: Arc::new(RealtimeConversationManager::new()),
active_turn: Mutex::new(None), active_turn: Mutex::new(None),
mailbox, input_queue: super::input_queue::InputQueue::new(),
mailbox_rx: Mutex::new(mailbox_rx),
idle_pending_input: Mutex::new(Vec::new()),
goal_runtime: crate::goals::GoalRuntimeState::new(), goal_runtime: crate::goals::GoalRuntimeState::new(),
guardian_review_session: crate::guardian::GuardianReviewSessionManager::default(), guardian_review_session: crate::guardian::GuardianReviewSessionManager::default(),
services, services,
@@ -6252,7 +6249,6 @@ where
/*goal_tools_supported*/ true, /*goal_tools_supported*/ true,
)); ));
let (mailbox, mailbox_rx) = crate::agent::Mailbox::new();
let session = Arc::new(Session { let session = Arc::new(Session {
conversation_id: thread_id, conversation_id: thread_id,
installation_id: "11111111-1111-4111-8111-111111111111".to_string(), installation_id: "11111111-1111-4111-8111-111111111111".to_string(),
@@ -6265,9 +6261,7 @@ where
pending_mcp_server_refresh_config: Mutex::new(None), pending_mcp_server_refresh_config: Mutex::new(None),
conversation: Arc::new(RealtimeConversationManager::new()), conversation: Arc::new(RealtimeConversationManager::new()),
active_turn: Mutex::new(None), active_turn: Mutex::new(None),
mailbox, input_queue: super::input_queue::InputQueue::new(),
mailbox_rx: Mutex::new(mailbox_rx),
idle_pending_input: Mutex::new(Vec::new()),
goal_runtime: crate::goals::GoalRuntimeState::new(), goal_runtime: crate::goals::GoalRuntimeState::new(),
guardian_review_session: crate::guardian::GuardianReviewSessionManager::default(), guardian_review_session: crate::guardian::GuardianReviewSessionManager::default(),
services, services,
@@ -8098,7 +8092,7 @@ async fn steer_input_returns_active_turn_id() {
.expect("steering with matching expected turn id should succeed"); .expect("steering with matching expected turn id should succeed");
assert_eq!(turn_id, tc.sub_id); assert_eq!(turn_id, tc.sub_id);
assert!(sess.has_pending_input().await); assert!(sess.input_queue.has_pending_input(&sess.active_turn).await);
} }
#[tokio::test] #[tokio::test]
@@ -8144,7 +8138,7 @@ async fn prepend_pending_input_keeps_older_tail_ahead_of_newer_input() {
.await .await
.expect("inject initial pending input into active turn"); .expect("inject initial pending input into active turn");
let drained = sess.get_pending_input().await; let drained = sess.input_queue.get_pending_input(&sess.active_turn).await;
assert_eq!(drained, vec![blocked, later.clone()]); assert_eq!(drained, vec![blocked, later.clone()]);
sess.inject_response_items(vec![newer.clone()]) sess.inject_response_items(vec![newer.clone()])
@@ -8153,11 +8147,15 @@ async fn prepend_pending_input_keeps_older_tail_ahead_of_newer_input() {
let mut drained_iter = drained.into_iter(); let mut drained_iter = drained.into_iter();
let _blocked = drained_iter.next().expect("blocked prompt should exist"); let _blocked = drained_iter.next().expect("blocked prompt should exist");
sess.prepend_pending_input(drained_iter.collect()) sess.input_queue
.prepend_pending_input(&sess.active_turn, drained_iter.collect())
.await .await
.expect("requeue later pending input at the front of the queue"); .expect("requeue later pending input at the front of the queue");
assert_eq!(sess.get_pending_input().await, vec![later, newer]); assert_eq!(
sess.input_queue.get_pending_input(&sess.active_turn).await,
vec![later, newer]
);
} }
#[tokio::test] #[tokio::test]
@@ -8171,7 +8169,8 @@ async fn queued_response_items_for_next_turn_move_into_next_active_turn() {
phase: None, phase: None,
}; };
sess.queue_response_items_for_next_turn(vec![queued_item.clone()]) sess.input_queue
.queue_response_items_for_next_turn(vec![queued_item.clone()])
.await; .await;
sess.spawn_task( sess.spawn_task(
@@ -8184,7 +8183,10 @@ async fn queued_response_items_for_next_turn_move_into_next_active_turn() {
) )
.await; .await;
assert_eq!(sess.get_pending_input().await, vec![queued_item]); assert_eq!(
sess.input_queue.get_pending_input(&sess.active_turn).await,
vec![queued_item]
);
} }
#[tokio::test] #[tokio::test]
@@ -8198,13 +8200,18 @@ async fn idle_interrupt_does_not_wake_queued_next_turn_items() {
phase: None, phase: None,
}; };
sess.queue_response_items_for_next_turn(vec![queued_item]) sess.input_queue
.queue_response_items_for_next_turn(vec![queued_item])
.await; .await;
sess.abort_all_tasks(TurnAbortReason::Interrupted).await; sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
assert!(sess.active_turn.lock().await.is_none()); assert!(sess.active_turn.lock().await.is_none());
assert!(sess.has_queued_response_items_for_next_turn().await); assert!(
sess.input_queue
.has_queued_response_items_for_next_turn()
.await
);
} }
#[tokio::test] #[tokio::test]
@@ -8222,16 +8229,17 @@ async fn abort_empty_active_turn_preserves_pending_input() {
let active_turn = active.get_or_insert_with(ActiveTurn::default); let active_turn = active.get_or_insert_with(ActiveTurn::default);
Arc::clone(&active_turn.turn_state) Arc::clone(&active_turn.turn_state)
}; };
turn_state sess.input_queue
.lock() .extend_pending_input_for_turn_state(turn_state.as_ref(), vec![pending_item.clone()])
.await .await;
.push_pending_input(pending_item.clone());
sess.abort_all_tasks(TurnAbortReason::Replaced).await; sess.abort_all_tasks(TurnAbortReason::Replaced).await;
assert!(sess.active_turn.lock().await.is_none()); assert!(sess.active_turn.lock().await.is_none());
assert_eq!( assert_eq!(
turn_state.lock().await.take_pending_input(), sess.input_queue
.take_pending_input_for_turn_state(turn_state.as_ref())
.await,
vec![pending_item] vec![pending_item]
); );
} }
@@ -8593,7 +8601,7 @@ async fn budget_limited_accounting_steers_active_turn_without_aborting() -> anyh
}) })
.await?; .await?;
let pending_input = sess.get_pending_input().await; let pending_input = sess.input_queue.get_pending_input(&sess.active_turn).await;
let [ResponseInputItem::Message { role, content, .. }] = pending_input.as_slice() else { let [ResponseInputItem::Message { role, content, .. }] = pending_input.as_slice() else {
panic!("expected one budget-limit steering message, got {pending_input:#?}"); panic!("expected one budget-limit steering message, got {pending_input:#?}");
}; };
@@ -8814,7 +8822,7 @@ async fn external_objective_change_steers_active_turn() -> anyhow::Result<()> {
}) })
.await?; .await?;
let pending_input = sess.get_pending_input().await; let pending_input = sess.input_queue.get_pending_input(&sess.active_turn).await;
assert!( assert!(
pending_input.iter().any(|item| { pending_input.iter().any(|item| {
matches!( matches!(
@@ -9021,19 +9029,26 @@ async fn queue_only_mailbox_mail_waits_for_next_turn_after_answer_boundary() {
) )
.await; .await;
sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await; sess.input_queue
sess.enqueue_mailbox_communication(communication.clone()); .defer_mailbox_delivery_to_next_turn(&sess.active_turn, &tc.sub_id)
.await;
sess.input_queue
.enqueue_mailbox_communication(communication.clone())
.await;
assert!( assert!(
!sess.has_pending_input().await, !sess.input_queue.has_pending_input(&sess.active_turn).await,
"queue-only mailbox mail should stay buffered once the current turn emitted its answer" "queue-only mailbox mail should stay buffered once the current turn emitted its answer"
); );
assert_eq!(sess.get_pending_input().await, Vec::new()); assert_eq!(
sess.input_queue.get_pending_input(&sess.active_turn).await,
Vec::new()
);
sess.abort_all_tasks(TurnAbortReason::Replaced).await; sess.abort_all_tasks(TurnAbortReason::Replaced).await;
assert_eq!( assert_eq!(
sess.get_pending_input().await, sess.input_queue.get_pending_input(&sess.active_turn).await,
vec![communication.to_response_input_item()], vec![communication.to_response_input_item()],
); );
} }
@@ -9051,23 +9066,27 @@ async fn trigger_turn_mailbox_mail_waits_for_next_turn_after_answer_boundary() {
) )
.await; .await;
sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await; sess.input_queue
sess.enqueue_mailbox_communication(InterAgentCommunication::new( .defer_mailbox_delivery_to_next_turn(&sess.active_turn, &tc.sub_id)
AgentPath::try_from("/root/worker").expect("worker path should parse"), .await;
AgentPath::root(), sess.input_queue
Vec::new(), .enqueue_mailbox_communication(InterAgentCommunication::new(
"late trigger update".to_string(), AgentPath::try_from("/root/worker").expect("worker path should parse"),
/*trigger_turn*/ true, AgentPath::root(),
)); Vec::new(),
"late trigger update".to_string(),
/*trigger_turn*/ true,
))
.await;
assert!( assert!(
!sess.has_pending_input().await, !sess.input_queue.has_pending_input(&sess.active_turn).await,
"trigger-turn mailbox mail should not extend the current turn after its answer boundary" "trigger-turn mailbox mail should not extend the current turn after its answer boundary"
); );
sess.abort_all_tasks(TurnAbortReason::Replaced).await; sess.abort_all_tasks(TurnAbortReason::Replaced).await;
assert!(sess.has_trigger_turn_mailbox_items().await); assert!(sess.input_queue.has_trigger_turn_mailbox_items().await);
} }
#[tokio::test] #[tokio::test]
@@ -9090,8 +9109,12 @@ async fn steered_input_reopens_mailbox_delivery_for_current_turn() {
) )
.await; .await;
sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await; sess.input_queue
sess.enqueue_mailbox_communication(communication.clone()); .defer_mailbox_delivery_to_next_turn(&sess.active_turn, &tc.sub_id)
.await;
sess.input_queue
.enqueue_mailbox_communication(communication.clone())
.await;
sess.steer_input( sess.steer_input(
vec![UserInput::Text { vec![UserInput::Text {
text: "follow up".to_string(), text: "follow up".to_string(),
@@ -9104,7 +9127,7 @@ async fn steered_input_reopens_mailbox_delivery_for_current_turn() {
.expect("steered input should be accepted"); .expect("steered input should be accepted");
assert_eq!( assert_eq!(
sess.get_pending_input().await, sess.input_queue.get_pending_input(&sess.active_turn).await,
vec![ vec![
ResponseInputItem::from(vec![UserInput::Text { ResponseInputItem::from(vec![UserInput::Text {
text: "follow up".to_string(), text: "follow up".to_string(),
@@ -9135,8 +9158,12 @@ async fn stale_defer_mailbox_delivery_does_not_override_steered_input() {
) )
.await; .await;
sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await; sess.input_queue
sess.enqueue_mailbox_communication(communication.clone()); .defer_mailbox_delivery_to_next_turn(&sess.active_turn, &tc.sub_id)
.await;
sess.input_queue
.enqueue_mailbox_communication(communication.clone())
.await;
sess.steer_input( sess.steer_input(
vec![UserInput::Text { vec![UserInput::Text {
text: "follow up".to_string(), text: "follow up".to_string(),
@@ -9148,10 +9175,12 @@ async fn stale_defer_mailbox_delivery_does_not_override_steered_input() {
.await .await
.expect("steered input should be accepted"); .expect("steered input should be accepted");
sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await; sess.input_queue
.defer_mailbox_delivery_to_next_turn(&sess.active_turn, &tc.sub_id)
.await;
assert_eq!( assert_eq!(
sess.get_pending_input().await, sess.input_queue.get_pending_input(&sess.active_turn).await,
vec![ vec![
ResponseInputItem::from(vec![UserInput::Text { ResponseInputItem::from(vec![UserInput::Text {
text: "follow up".to_string(), text: "follow up".to_string(),
@@ -9182,8 +9211,12 @@ async fn tool_calls_reopen_mailbox_delivery_for_current_turn() {
) )
.await; .await;
sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await; sess.input_queue
sess.enqueue_mailbox_communication(communication.clone()); .defer_mailbox_delivery_to_next_turn(&sess.active_turn, &tc.sub_id)
.await;
sess.input_queue
.enqueue_mailbox_communication(communication.clone())
.await;
let item = ResponseItem::FunctionCall { let item = ResponseItem::FunctionCall {
id: None, id: None,
@@ -9207,7 +9240,7 @@ async fn tool_calls_reopen_mailbox_delivery_for_current_turn() {
assert!(output.needs_follow_up); assert!(output.needs_follow_up);
assert!(output.tool_future.is_some()); assert!(output.tool_future.is_some());
assert_eq!( assert_eq!(
sess.get_pending_input().await, sess.input_queue.get_pending_input(&sess.active_turn).await,
vec![communication.to_response_input_item()], vec![communication.to_response_input_item()],
); );
} }

View File

@@ -148,7 +148,7 @@ pub(crate) async fn run_turn(
prewarmed_client_session: Option<ModelClientSession>, prewarmed_client_session: Option<ModelClientSession>,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
) -> Option<String> { ) -> Option<String> {
if input.is_empty() && !sess.has_pending_input().await { if input.is_empty() && !sess.input_queue.has_pending_input(&sess.active_turn).await {
return None; return None;
} }
@@ -413,7 +413,7 @@ pub(crate) async fn run_turn(
// submitted through the UI while the model was running. Though the UI // submitted through the UI while the model was running. Though the UI
// may support this, the model might not. // may support this, the model might not.
let pending_input = if can_drain_pending_input { let pending_input = if can_drain_pending_input {
sess.get_pending_input().await sess.input_queue.get_pending_input(&sess.active_turn).await
} else { } else {
Vec::new() Vec::new()
}; };
@@ -434,7 +434,10 @@ pub(crate) async fn run_turn(
} => { } => {
let remaining_pending_input = pending_input_iter.collect::<Vec<_>>(); let remaining_pending_input = pending_input_iter.collect::<Vec<_>>();
if !remaining_pending_input.is_empty() { if !remaining_pending_input.is_empty() {
let _ = sess.prepend_pending_input(remaining_pending_input).await; let _ = sess
.input_queue
.prepend_pending_input(&sess.active_turn, remaining_pending_input)
.await;
requeued_pending_input = true; requeued_pending_input = true;
} }
blocked_pending_input_contexts = additional_contexts; blocked_pending_input_contexts = additional_contexts;
@@ -494,7 +497,7 @@ pub(crate) async fn run_turn(
last_agent_message: sampling_request_last_agent_message, last_agent_message: sampling_request_last_agent_message,
} = sampling_request_output; } = sampling_request_output;
can_drain_pending_input = true; can_drain_pending_input = true;
let has_pending_input = sess.has_pending_input().await; let has_pending_input = sess.input_queue.has_pending_input(&sess.active_turn).await;
let needs_follow_up = model_needs_follow_up || has_pending_input; let needs_follow_up = model_needs_follow_up || has_pending_input;
let total_usage_tokens = sess.get_total_token_usage().await; let total_usage_tokens = sess.get_total_token_usage().await;
let token_limit_reached = total_usage_tokens >= auto_compact_limit; let token_limit_reached = total_usage_tokens >= auto_compact_limit;
@@ -2065,7 +2068,7 @@ async fn try_run_sampling_request(
} }
needs_follow_up |= output_result.needs_follow_up; needs_follow_up |= output_result.needs_follow_up;
// todo: remove before stabilizing multi-agent v2 // todo: remove before stabilizing multi-agent v2
if preempt_for_mailbox_mail && sess.mailbox_rx.lock().await.has_pending() { if preempt_for_mailbox_mail && sess.input_queue.has_pending_mailbox_items().await {
break Ok(SamplingRequestResult { break Ok(SamplingRequestResult {
needs_follow_up: true, needs_follow_up: true,
last_agent_message, last_agent_message,

View File

@@ -11,7 +11,6 @@ use tokio_util::task::AbortOnDropHandle;
use codex_extension_api::ExtensionData; use codex_extension_api::ExtensionData;
use codex_protocol::dynamic_tools::DynamicToolResponse; use codex_protocol::dynamic_tools::DynamicToolResponse;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::request_permissions::RequestPermissionProfile; use codex_protocol::request_permissions::RequestPermissionProfile;
use codex_protocol::request_permissions::RequestPermissionsResponse; use codex_protocol::request_permissions::RequestPermissionsResponse;
use codex_protocol::request_user_input::RequestUserInputResponse; use codex_protocol::request_user_input::RequestUserInputResponse;
@@ -20,6 +19,7 @@ use codex_utils_absolute_path::AbsolutePathBuf;
use rmcp::model::RequestId; use rmcp::model::RequestId;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use crate::session::TurnInputQueue;
use crate::session::turn_context::TurnContext; use crate::session::turn_context::TurnContext;
use crate::tasks::AnySessionTask; use crate::tasks::AnySessionTask;
use codex_protocol::models::AdditionalPermissionProfile; use codex_protocol::models::AdditionalPermissionProfile;
@@ -115,7 +115,7 @@ pub(crate) struct TurnState {
pending_user_input: HashMap<String, oneshot::Sender<RequestUserInputResponse>>, pending_user_input: HashMap<String, oneshot::Sender<RequestUserInputResponse>>,
pending_elicitations: HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>, pending_elicitations: HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>,
pending_dynamic_tools: HashMap<String, oneshot::Sender<DynamicToolResponse>>, pending_dynamic_tools: HashMap<String, oneshot::Sender<DynamicToolResponse>>,
pending_input: Vec<ResponseInputItem>, pub(crate) pending_input: TurnInputQueue,
mailbox_delivery_phase: MailboxDeliveryPhase, mailbox_delivery_phase: MailboxDeliveryPhase,
granted_permissions: Option<AdditionalPermissionProfile>, granted_permissions: Option<AdditionalPermissionProfile>,
strict_auto_review_enabled: bool, strict_auto_review_enabled: bool,
@@ -146,13 +146,12 @@ impl TurnState {
self.pending_approvals.remove(key) self.pending_approvals.remove(key)
} }
pub(crate) fn clear_pending(&mut self) { pub(crate) fn clear_pending_waiters(&mut self) {
self.pending_approvals.clear(); self.pending_approvals.clear();
self.pending_request_permissions.clear(); self.pending_request_permissions.clear();
self.pending_user_input.clear(); self.pending_user_input.clear();
self.pending_elicitations.clear(); self.pending_elicitations.clear();
self.pending_dynamic_tools.clear(); self.pending_dynamic_tools.clear();
self.pending_input.clear();
} }
pub(crate) fn insert_pending_request_permissions( pub(crate) fn insert_pending_request_permissions(
@@ -220,33 +219,6 @@ impl TurnState {
self.pending_dynamic_tools.remove(key) self.pending_dynamic_tools.remove(key)
} }
pub(crate) fn push_pending_input(&mut self, input: ResponseInputItem) {
self.pending_input.push(input);
}
pub(crate) fn prepend_pending_input(&mut self, mut input: Vec<ResponseInputItem>) {
if input.is_empty() {
return;
}
input.append(&mut self.pending_input);
self.pending_input = input;
}
pub(crate) fn take_pending_input(&mut self) -> Vec<ResponseInputItem> {
if self.pending_input.is_empty() {
Vec::with_capacity(0)
} else {
let mut ret = Vec::new();
std::mem::swap(&mut ret, &mut self.pending_input);
ret
}
}
pub(crate) fn has_pending_input(&self) -> bool {
!self.pending_input.is_empty()
}
pub(crate) fn accept_mailbox_delivery_for_current_turn(&mut self) { pub(crate) fn accept_mailbox_delivery_for_current_turn(&mut self) {
self.set_mailbox_delivery_phase(MailboxDeliveryPhase::CurrentTurn); self.set_mailbox_delivery_phase(MailboxDeliveryPhase::CurrentTurn);
} }
@@ -276,11 +248,3 @@ impl TurnState {
self.strict_auto_review_enabled self.strict_auto_review_enabled
} }
} }
impl ActiveTurn {
/// Clear any pending approvals and input buffered for the current turn.
pub(crate) async fn clear_pending(&self) {
let mut ts = self.turn_state.lock().await;
ts.clear_pending();
}
}

View File

@@ -158,7 +158,8 @@ pub(crate) async fn record_completed_response_item_with_finalized_facts(
|facts| facts.defers_mailbox_delivery_to_next_turn, |facts| facts.defers_mailbox_delivery_to_next_turn,
); );
if defers_mailbox_delivery { if defers_mailbox_delivery {
sess.defer_mailbox_delivery_to_next_turn(&turn_context.sub_id) sess.input_queue
.defer_mailbox_delivery_to_next_turn(&sess.active_turn, &turn_context.sub_id)
.await; .await;
} }
mark_thread_memory_mode_polluted_if_external_context(sess, turn_context, item).await; mark_thread_memory_mode_polluted_if_external_context(sess, turn_context, item).await;
@@ -351,7 +352,11 @@ pub(crate) async fn handle_output_item_done(
// The model emitted a tool call; log it, persist the item immediately, and queue the tool execution. // The model emitted a tool call; log it, persist the item immediately, and queue the tool execution.
Ok(Some(call)) => { Ok(Some(call)) => {
ctx.sess ctx.sess
.accept_mailbox_delivery_for_current_turn(&ctx.turn_context.sub_id) .input_queue
.accept_mailbox_delivery_for_current_turn(
&ctx.sess.active_turn,
&ctx.turn_context.sub_id,
)
.await; .await;
let payload_preview = call.payload.log_payload().into_owned(); let payload_preview = call.payload.log_payload().into_owned();

View File

@@ -347,24 +347,23 @@ impl Session {
{ {
warn!("failed to apply goal runtime turn-start event: {err}"); warn!("failed to apply goal runtime turn-start event: {err}");
} }
let queued_response_items = self.take_queued_response_items_for_next_turn().await; let queued_response_items = self
let mailbox_items = self.get_pending_input().await; .input_queue
.take_queued_response_items_for_next_turn()
.await;
let mailbox_items = self.input_queue.get_pending_input(&self.active_turn).await;
let turn_state = { let turn_state = {
let mut active = self.active_turn.lock().await; let mut active = self.active_turn.lock().await;
let turn = active.get_or_insert_with(ActiveTurn::default); let turn = active.get_or_insert_with(ActiveTurn::default);
debug_assert!(turn.tasks.is_empty()); debug_assert!(turn.tasks.is_empty());
Arc::clone(&turn.turn_state) Arc::clone(&turn.turn_state)
}; };
{ turn_state.lock().await.token_usage_at_turn_start = token_usage_at_turn_start;
let mut turn_state = turn_state.lock().await; let mut pending_items = queued_response_items;
turn_state.token_usage_at_turn_start = token_usage_at_turn_start; pending_items.extend(mailbox_items);
for item in queued_response_items { self.input_queue
turn_state.push_pending_input(item); .extend_pending_input_for_turn_state(turn_state.as_ref(), pending_items)
} .await;
for item in mailbox_items {
turn_state.push_pending_input(item);
}
}
self.emit_turn_start_lifecycle(turn_context.extension_data.as_ref()) self.emit_turn_start_lifecycle(turn_context.extension_data.as_ref())
.await; .await;
@@ -468,8 +467,11 @@ impl Session {
self: &Arc<Self>, self: &Arc<Self>,
sub_id: String, sub_id: String,
) { ) {
if !self.has_queued_response_items_for_next_turn().await if !self
&& !self.has_trigger_turn_mailbox_items().await .input_queue
.has_queued_response_items_for_next_turn()
.await
&& !self.input_queue.has_trigger_turn_mailbox_items().await
{ {
return; return;
} }
@@ -521,7 +523,7 @@ impl Session {
if let Some(active_turn) = active_turn_to_clear { if let Some(active_turn) = active_turn_to_clear {
// Let interrupted tasks observe cancellation before dropping pending approvals, or an // Let interrupted tasks observe cancellation before dropping pending approvals, or an
// in-flight approval wait can surface as a model-visible rejection before TurnAborted. // in-flight approval wait can surface as a model-visible rejection before TurnAborted.
active_turn.clear_pending().await; self.input_queue.clear_pending(&active_turn).await;
} }
if reason == TurnAbortReason::Interrupted && aborted_turn { if reason == TurnAbortReason::Interrupted && aborted_turn {
self.maybe_start_turn_for_pending_work().await; self.maybe_start_turn_for_pending_work().await;
@@ -567,7 +569,7 @@ impl Session {
} }
// Let interrupted tasks observe cancellation before dropping pending approvals, or an // Let interrupted tasks observe cancellation before dropping pending approvals, or an
// in-flight approval wait can surface as a model-visible rejection before TurnAborted. // in-flight approval wait can surface as a model-visible rejection before TurnAborted.
active_turn.clear_pending().await; self.input_queue.clear_pending(&active_turn).await;
if reason == TurnAbortReason::Interrupted { if reason == TurnAbortReason::Interrupted {
self.maybe_start_turn_for_pending_work().await; self.maybe_start_turn_for_pending_work().await;
@@ -609,8 +611,11 @@ impl Session {
} }
}; };
if let Some(turn_state) = turn_state.as_ref() { if let Some(turn_state) = turn_state.as_ref() {
let mut ts = turn_state.lock().await; pending_input = self
pending_input = ts.take_pending_input(); .input_queue
.take_pending_input_for_turn_state(turn_state.as_ref())
.await;
let ts = turn_state.lock().await;
turn_had_memory_citation = ts.has_memory_citation; turn_had_memory_citation = ts.has_memory_citation;
turn_tool_calls = ts.tool_calls; turn_tool_calls = ts.tool_calls;
token_usage_at_turn_start = Some(ts.token_usage_at_turn_start.clone()); token_usage_at_turn_start = Some(ts.token_usage_at_turn_start.clone());

View File

@@ -80,7 +80,7 @@ impl SessionTask for RegularTask {
) )
.instrument(run_turn_span.clone()) .instrument(run_turn_span.clone())
.await; .await;
if !sess.has_pending_input().await { if !sess.input_queue.has_pending_input(&sess.active_turn).await {
return last_agent_message; return last_agent_message;
} }
next_input = Vec::new(); next_input = Vec::new();

View File

@@ -2791,13 +2791,16 @@ async fn multi_agent_v2_wait_agent_accepts_timeout_only_argument() {
}); });
tokio::task::yield_now().await; tokio::task::yield_now().await;
session.enqueue_mailbox_communication(InterAgentCommunication::new( session
worker_path, .input_queue
AgentPath::root(), .enqueue_mailbox_communication(InterAgentCommunication::new(
Vec::new(), worker_path,
"hello from worker".to_string(), AgentPath::root(),
/*trigger_turn*/ false, Vec::new(),
)); "hello from worker".to_string(),
/*trigger_turn*/ false,
))
.await;
let output = wait_task let output = wait_task
.await .await
@@ -3276,13 +3279,16 @@ async fn multi_agent_v2_wait_agent_returns_summary_for_mailbox_activity() {
}); });
tokio::task::yield_now().await; tokio::task::yield_now().await;
session.enqueue_mailbox_communication(InterAgentCommunication::new( session
worker_path, .input_queue
AgentPath::root(), .enqueue_mailbox_communication(InterAgentCommunication::new(
Vec::new(), worker_path,
"completed".to_string(), AgentPath::root(),
/*trigger_turn*/ false, Vec::new(),
)); "completed".to_string(),
/*trigger_turn*/ false,
))
.await;
let wait_output = wait_task let wait_output = wait_task
.await .await
@@ -3346,13 +3352,16 @@ async fn multi_agent_v2_wait_agent_returns_for_already_queued_mail() {
.agent_path .agent_path
.expect("worker path"); .expect("worker path");
session.enqueue_mailbox_communication(InterAgentCommunication::new( session
worker_path, .input_queue
AgentPath::root(), .enqueue_mailbox_communication(InterAgentCommunication::new(
Vec::new(), worker_path,
"already queued".to_string(), AgentPath::root(),
/*trigger_turn*/ false, Vec::new(),
)); "already queued".to_string(),
/*trigger_turn*/ false,
))
.await;
let output = timeout( let output = timeout(
Duration::from_millis(500), Duration::from_millis(500),
@@ -3442,13 +3451,16 @@ async fn multi_agent_v2_wait_agent_wakes_on_any_mailbox_notification() {
}); });
tokio::task::yield_now().await; tokio::task::yield_now().await;
session.enqueue_mailbox_communication(InterAgentCommunication::new( session
worker_b_path, .input_queue
AgentPath::root(), .enqueue_mailbox_communication(InterAgentCommunication::new(
Vec::new(), worker_b_path,
"from worker b".to_string(), AgentPath::root(),
/*trigger_turn*/ false, Vec::new(),
)); "from worker b".to_string(),
/*trigger_turn*/ false,
))
.await;
let output = wait_task let output = wait_task
.await .await
@@ -3527,13 +3539,16 @@ async fn multi_agent_v2_wait_agent_does_not_return_completed_content() {
}); });
tokio::task::yield_now().await; tokio::task::yield_now().await;
session.enqueue_mailbox_communication(InterAgentCommunication::new( session
worker_path, .input_queue
AgentPath::root(), .enqueue_mailbox_communication(InterAgentCommunication::new(
Vec::new(), worker_path,
"sensitive child output".to_string(), AgentPath::root(),
/*trigger_turn*/ false, Vec::new(),
)); "sensitive child output".to_string(),
/*trigger_turn*/ false,
))
.await;
let output = wait_task let output = wait_task
.await .await

View File

@@ -60,7 +60,7 @@ impl ToolExecutor<ToolInvocation> for Handler {
None => default_timeout_ms, None => default_timeout_ms,
}; };
let mut mailbox_seq_rx = session.subscribe_mailbox_seq(); let mut mailbox_rx = session.input_queue.subscribe_mailbox().await;
session session
.send_event( .send_event(
@@ -76,12 +76,8 @@ impl ToolExecutor<ToolInvocation> for Handler {
) )
.await; .await;
let timed_out = if session.has_pending_mailbox_items().await { let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64);
false let timed_out = !wait_for_mailbox_change(&mut mailbox_rx, deadline).await;
} else {
let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64);
!wait_for_mailbox_change(&mut mailbox_seq_rx, deadline).await
};
let result = WaitAgentResult::from_timed_out(timed_out); let result = WaitAgentResult::from_timed_out(timed_out);
session session
@@ -153,10 +149,10 @@ impl ToolOutput for WaitAgentResult {
} }
async fn wait_for_mailbox_change( async fn wait_for_mailbox_change(
mailbox_seq_rx: &mut tokio::sync::watch::Receiver<u64>, mailbox_rx: &mut tokio::sync::watch::Receiver<()>,
deadline: Instant, deadline: Instant,
) -> bool { ) -> bool {
match timeout_at(deadline, mailbox_seq_rx.changed()).await { match timeout_at(deadline, mailbox_rx.changed()).await {
Ok(Ok(())) => true, Ok(Ok(())) => true,
Ok(Err(_)) | Err(_) => false, Ok(Err(_)) | Err(_) => false,
} }