fix: race pending (#16561)

This commit is contained in:
jif-oai
2026-04-02 15:31:30 +02:00
committed by GitHub
parent 97df35c74f
commit 627299c551
5 changed files with 250 additions and 12 deletions

View File

@@ -18,6 +18,7 @@ use crate::tools::format_exec_output_str;
use codex_features::Features;
use codex_login::CodexAuth;
use codex_mcp::mcp_connection_manager::ToolInfo;
use codex_protocol::AgentPath;
use codex_protocol::ThreadId;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::FunctionCallOutputPayload;
@@ -69,6 +70,7 @@ use codex_protocol::protocol::ConversationAudioParams;
use codex_protocol::protocol::CreditsSnapshot;
use codex_protocol::protocol::GranularApprovalConfig;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::NetworkApprovalProtocol;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::RateLimitWindow;
@@ -4781,6 +4783,119 @@ async fn queued_response_items_for_next_turn_move_into_next_active_turn() {
assert_eq!(sess.get_pending_input().await, vec![queued_item]);
}
#[tokio::test]
async fn queue_only_mailbox_mail_waits_for_next_turn_after_answer_boundary() {
let (sess, tc, _rx) = make_session_and_context_with_rx().await;
let communication = InterAgentCommunication::new(
AgentPath::try_from("/root/worker").expect("worker path should parse"),
AgentPath::root(),
Vec::new(),
"late queue-only update".to_string(),
/*trigger_turn*/ false,
);
sess.spawn_task(
Arc::clone(&tc),
Vec::new(),
NeverEndingTask {
kind: TaskKind::Regular,
listen_to_cancellation_token: true,
},
)
.await;
sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await;
sess.enqueue_mailbox_communication(communication.clone());
assert!(
!sess.has_pending_input().await,
"queue-only mailbox mail should stay buffered once the current turn emitted its answer"
);
assert_eq!(sess.get_pending_input().await, Vec::new());
sess.abort_all_tasks(TurnAbortReason::Replaced).await;
assert_eq!(
sess.get_pending_input().await,
vec![communication.to_response_input_item()],
);
}
#[tokio::test]
async fn trigger_turn_mailbox_mail_waits_for_next_turn_after_answer_boundary() {
let (sess, tc, _rx) = make_session_and_context_with_rx().await;
sess.spawn_task(
Arc::clone(&tc),
Vec::new(),
NeverEndingTask {
kind: TaskKind::Regular,
listen_to_cancellation_token: true,
},
)
.await;
sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await;
sess.enqueue_mailbox_communication(InterAgentCommunication::new(
AgentPath::try_from("/root/worker").expect("worker path should parse"),
AgentPath::root(),
Vec::new(),
"late trigger update".to_string(),
/*trigger_turn*/ true,
));
assert!(
!sess.has_pending_input().await,
"trigger-turn mailbox mail should not extend the current turn after its answer boundary"
);
sess.abort_all_tasks(TurnAbortReason::Replaced).await;
assert!(sess.has_trigger_turn_mailbox_items().await);
}
#[tokio::test]
async fn steered_input_reopens_mailbox_delivery_for_current_turn() {
let (sess, tc, _rx) = make_session_and_context_with_rx().await;
let communication = InterAgentCommunication::new(
AgentPath::try_from("/root/worker").expect("worker path should parse"),
AgentPath::root(),
Vec::new(),
"queued child update".to_string(),
/*trigger_turn*/ false,
);
sess.spawn_task(
Arc::clone(&tc),
Vec::new(),
NeverEndingTask {
kind: TaskKind::Regular,
listen_to_cancellation_token: true,
},
)
.await;
sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await;
sess.enqueue_mailbox_communication(communication.clone());
sess.steer_input(
vec![UserInput::Text {
text: "follow up".to_string(),
text_elements: Vec::new(),
}],
Some(&tc.sub_id),
)
.await
.expect("steered input should be accepted");
assert_eq!(
sess.get_pending_input().await,
vec![
ResponseInputItem::from(vec![UserInput::Text {
text: "follow up".to_string(),
text_elements: Vec::new(),
}]),
communication.to_response_input_item(),
],
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn abort_review_task_emits_exited_then_aborted_and_records_history() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;