[codex] drain mailbox only at request boundaries (#17749)

This changes multi-agent v2 mailbox handling so incoming inter-agent
messages no longer preempt an in-flight sampling stream at reasoning or
commentary output-item boundaries.
This commit is contained in:
Thibault Sottiaux
2026-04-13 22:09:51 -07:00
committed by GitHub
parent ad37389c18
commit 05c5829923
4 changed files with 24 additions and 51 deletions

View File

@@ -338,7 +338,6 @@ use codex_protocol::config_types::ServiceTier;
use codex_protocol::config_types::WindowsSandboxLevel;
use codex_protocol::models::ContentItem;
use codex_protocol::models::DeveloperInstructions;
use codex_protocol::models::MessagePhase;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
@@ -7836,25 +7835,6 @@ async fn try_run_sampling_request(
cancellation_token: cancellation_token.child_token(),
};
let preempt_for_mailbox_mail = match &item {
ResponseItem::Message { role, phase, .. } => {
role == "assistant" && matches!(phase, Some(MessagePhase::Commentary))
}
ResponseItem::Reasoning { .. } => true,
ResponseItem::LocalShellCall { .. }
| ResponseItem::FunctionCall { .. }
| ResponseItem::ToolSearchCall { .. }
| ResponseItem::FunctionCallOutput { .. }
| ResponseItem::CustomToolCall { .. }
| ResponseItem::CustomToolCallOutput { .. }
| ResponseItem::ToolSearchOutput { .. }
| ResponseItem::WebSearchCall { .. }
| ResponseItem::ImageGenerationCall { .. }
| ResponseItem::GhostSnapshot { .. }
| ResponseItem::Compaction { .. }
| ResponseItem::Other => false,
};
let output_result =
match handle_output_item_done(&mut ctx, item, previously_active_item)
.instrument(handle_responses)
@@ -7870,13 +7850,6 @@ async fn try_run_sampling_request(
last_agent_message = Some(agent_message);
}
needs_follow_up |= output_result.needs_follow_up;
// todo: remove before stabilizing multi-agent v2
if preempt_for_mailbox_mail && sess.mailbox_rx.lock().await.has_pending() {
break Ok(SamplingRequestResult {
needs_follow_up: true,
last_agent_message,
});
}
}
ResponseEvent::OutputItemAdded(item) => {
if let Some(turn_item) = handle_non_tool_response_item(

View File

@@ -313,7 +313,7 @@ async fn injected_user_input_triggers_follow_up_request_with_deltas() {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queued_inter_agent_mail_triggers_follow_up_after_reasoning_item() {
async fn queued_inter_agent_mail_waits_for_request_boundary_after_reasoning_item() {
let (gate_reasoning_done_tx, gate_reasoning_done_rx) = oneshot::channel();
let first_chunks = vec![
@@ -323,14 +323,18 @@ async fn queued_inter_agent_mail_triggers_follow_up_after_reasoning_item() {
gate_reasoning_done_rx,
vec![
ev_reasoning_item("reason-1", &["thinking"], &[]),
ev_function_call(
"call-stale",
"shell",
r#"{"command":"echo stale tool call"}"#,
),
ev_message_item_added("msg-stale", ""),
ev_output_text_delta("stale final"),
ev_message_item_done("msg-stale", "stale final"),
ev_message_item_added("msg-preserved", ""),
ev_output_text_delta("preserved commentary"),
json!({
"type": "response.output_item.done",
"item": {
"type": "message",
"role": "assistant",
"id": "msg-preserved",
"content": [{"type": "output_text", "text": "preserved commentary"}],
"phase": "commentary",
}
}),
ev_completed("resp-1"),
],
),
@@ -358,7 +362,7 @@ async fn queued_inter_agent_mail_triggers_follow_up_after_reasoning_item() {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queued_inter_agent_mail_triggers_follow_up_after_commentary_message_item() {
async fn queued_inter_agent_mail_waits_for_request_boundary_after_commentary_message_item() {
let (gate_message_done_tx, gate_message_done_rx) = oneshot::channel();
let first_chunks = vec![
@@ -367,25 +371,18 @@ async fn queued_inter_agent_mail_triggers_follow_up_after_commentary_message_ite
gated_chunk(
gate_message_done_rx,
vec![
ev_output_text_delta("first answer"),
ev_output_text_delta("first commentary"),
json!({
"type": "response.output_item.done",
"item": {
"type": "message",
"role": "assistant",
"id": "msg-1",
"content": [{"type": "output_text", "text": "first answer"}],
"content": [{"type": "output_text", "text": "first commentary"}],
"phase": "commentary",
}
}),
ev_function_call(
"call-stale",
"shell",
r#"{"command":"echo stale tool call"}"#,
),
ev_message_item_added("msg-stale", ""),
ev_output_text_delta("stale final"),
ev_message_item_done("msg-stale", "stale final"),
ev_function_call("call-preserved", "test_tool", "{}"),
ev_completed("resp-1"),
],
),
@@ -411,7 +408,7 @@ async fn queued_inter_agent_mail_triggers_follow_up_after_commentary_message_ite
let _ = gate_message_done_tx.send(());
wait_for_agent_message(&codex, "first answer").await;
wait_for_agent_message(&codex, "first commentary").await;
wait_for_turn_complete(&codex).await;

View File

@@ -13,5 +13,7 @@ Scenario: /responses POST bodies (input only, redacted like other suite snapshot
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/user:first prompt
03:message/assistant:first answer
04:message/assistant:{"author":"/root/worker","recipient":"/root","other_recipients":[],"content":"queued child update","trigger_turn":false}
03:message/assistant:first commentary
04:function_call/test_tool
05:function_call_output:unsupported call: test_tool
06:message/assistant:{"author":"/root/worker","recipient":"/root","other_recipients":[],"content":"queued child update","trigger_turn":false}

View File

@@ -14,4 +14,5 @@ Scenario: /responses POST bodies (input only, redacted like other suite snapshot
01:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/user:first prompt
03:reasoning:summary=thinking:encrypted=true
04:message/assistant:{"author":"/root/worker","recipient":"/root","other_recipients":[],"content":"queued child update","trigger_turn":false}
04:message/assistant:preserved commentary
05:message/assistant:{"author":"/root/worker","recipient":"/root","other_recipients":[],"content":"queued child update","trigger_turn":false}