Preempt mailbox mail after reasoning/commentary items (#16725)

Send pending mailbox mail after completed reasoning or commentary items
so follow-up requests can pick it up mid-turn.

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-04-03 18:29:05 -07:00
committed by GitHub
parent 91ca49e53c
commit e4f1b3a65e
5 changed files with 363 additions and 0 deletions

View File

@@ -1,17 +1,31 @@
use std::sync::Arc;
use codex_core::CodexThread;
use codex_protocol::AgentPath;
use codex_protocol::items::TurnItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::Op;
use codex_protocol::user_input::UserInput;
use core_test_support::context_snapshot;
use core_test_support::context_snapshot::ContextSnapshotOptions;
use core_test_support::responses;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_message_item_added;
use core_test_support::responses::ev_output_text_delta;
use core_test_support::responses::ev_reasoning_item;
use core_test_support::responses::ev_reasoning_item_added;
use core_test_support::responses::ev_response_created;
use core_test_support::streaming_sse::StreamingSseChunk;
use core_test_support::streaming_sse::StreamingSseServer;
use core_test_support::streaming_sse::start_streaming_sse_server;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::from_slice;
use serde_json::json;
use tokio::sync::oneshot;
fn ev_message_item_done(id: &str, text: &str) -> Value {
@@ -44,6 +58,115 @@ fn message_input_texts(body: &Value, role: &str) -> Vec<String> {
.collect()
}
/// Wrap a single SSE event in an ungated streaming chunk (sent immediately).
fn chunk(event: Value) -> StreamingSseChunk {
    let body = responses::sse(vec![event]);
    StreamingSseChunk { gate: None, body }
}
/// Wrap a batch of SSE events in a chunk that is held back until `gate` fires.
fn gated_chunk(gate: oneshot::Receiver<()>, events: Vec<Value>) -> StreamingSseChunk {
    let body = responses::sse(events);
    StreamingSseChunk {
        gate: Some(gate),
        body,
    }
}
fn response_completed_chunks(response_id: &str) -> Vec<StreamingSseChunk> {
vec![
chunk(ev_response_created(response_id)),
chunk(ev_completed(response_id)),
]
}
/// Build a Codex test session (model "gpt-5.1") backed by the given
/// streaming SSE server, panicking with context if construction fails.
async fn build_codex(server: &StreamingSseServer) -> Arc<CodexThread> {
    let built = test_codex()
        .with_model("gpt-5.1")
        .build_with_streaming_server(server)
        .await;
    match built {
        Ok(session) => session.codex,
        Err(err) => panic!("build streaming Codex test session: {err}"),
    }
}
/// Submit a single text prompt as user input, panicking on submission failure.
async fn submit_user_input(codex: &CodexThread, text: &str) {
    let input = UserInput::Text {
        text: text.to_string(),
        text_elements: Vec::new(),
    };
    let op = Op::UserInput {
        items: vec![input],
        final_output_json_schema: None,
    };
    if let Err(err) = codex.submit(op).await {
        panic!("submit user input: {err}");
    }
}
/// Queue inter-agent mail from `/root/worker` to the root agent without
/// triggering a new turn (`trigger_turn` = false), panicking on failure.
async fn submit_queue_only_agent_mail(codex: &CodexThread, text: &str) {
    let author = AgentPath::try_from("/root/worker")
        .unwrap_or_else(|err| panic!("worker path should parse: {err}"));
    let communication = InterAgentCommunication::new(
        author,
        AgentPath::root(),
        Vec::new(),
        text.to_string(),
        /*trigger_turn*/ false,
    );
    if let Err(err) = codex
        .submit(Op::InterAgentCommunication { communication })
        .await
    {
        panic!("submit queue-only agent mail: {err}");
    }
}
/// Block until an `ItemStarted` event whose item is a reasoning item arrives.
async fn wait_for_reasoning_item_started(codex: &CodexThread) {
    wait_for_event(codex, |event| match event {
        EventMsg::ItemStarted(item_started) => {
            matches!(&item_started.item, TurnItem::Reasoning(_))
        }
        _ => false,
    })
    .await;
}
/// Block until the agent emits a final `AgentMessage` whose text is exactly
/// `text`.
///
/// `wait_for_event` only returns an event that satisfied the predicate, so
/// the returned value is an `AgentMessage` by construction — the previous
/// re-assertion of the variant was tautological and has been removed.
async fn wait_for_agent_message(codex: &CodexThread, text: &str) {
    wait_for_event(
        codex,
        |event| matches!(event, EventMsg::AgentMessage(message) if message.message == text),
    )
    .await;
}
/// Block until the current turn reports completion.
async fn wait_for_turn_complete(codex: &CodexThread) {
    let is_turn_complete = |event| matches!(event, EventMsg::TurnComplete(_));
    wait_for_event(codex, is_turn_complete).await;
}
/// Assert that exactly two /responses POST bodies were captured and snapshot
/// their `input` arrays (redacted like the other suite snapshots).
///
/// The `input` arrays are borrowed directly from the parsed bodies — the
/// previous `.clone()` of each array was redundant, since `first` and
/// `second` live until the end of the function.
fn assert_two_responses_input_snapshot(snapshot_name: &str, requests: &[Vec<u8>]) {
    assert_eq!(requests.len(), 2);
    let options = ContextSnapshotOptions::default().strip_capability_instructions();
    let first: Value =
        from_slice(&requests[0]).unwrap_or_else(|err| panic!("parse first request: {err}"));
    let second: Value =
        from_slice(&requests[1]).unwrap_or_else(|err| panic!("parse second request: {err}"));
    let first_items = first["input"]
        .as_array()
        .unwrap_or_else(|| panic!("first request input"));
    let second_items = second["input"]
        .as_array()
        .unwrap_or_else(|| panic!("second request input"));
    let snapshot = context_snapshot::format_labeled_items_snapshot(
        "/responses POST bodies (input only, redacted like other suite snapshots)",
        &[
            ("First request", first_items.as_slice()),
            ("Second request", second_items.as_slice()),
        ],
        &options,
    );
    insta::assert_snapshot!(snapshot_name, snapshot);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "TODO(aibrahim): flaky"]
async fn injected_user_input_triggers_follow_up_request_with_deltas() {
@@ -144,3 +267,162 @@ async fn injected_user_input_triggers_follow_up_request_with_deltas() {
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queued_inter_agent_mail_triggers_follow_up_after_reasoning_item() {
    // Gate holds back the tail of the first response so mail can be queued
    // while the reasoning item is still streaming.
    let (gate_reasoning_done_tx, gate_reasoning_done_rx) = oneshot::channel();
    let first_chunks = vec![
        chunk(ev_response_created("resp-1")),
        // The reasoning item starts immediately...
        chunk(ev_reasoning_item_added("reason-1", &["thinking"])),
        // ...and everything after it waits on the gate: the completed
        // reasoning item, then a "stale" tool call and final message that the
        // queued mail is expected to preempt (they do not appear in the
        // second request of the companion snapshot).
        gated_chunk(
            gate_reasoning_done_rx,
            vec![
                ev_reasoning_item("reason-1", &["thinking"], &[]),
                ev_function_call(
                    "call-stale",
                    "shell",
                    r#"{"command":"echo stale tool call"}"#,
                ),
                ev_message_item_added("msg-stale", ""),
                ev_output_text_delta("stale final"),
                ev_message_item_done("msg-stale", "stale final"),
                ev_completed("resp-1"),
            ],
        ),
    ];
    // Second response is a bare created/completed pair for the follow-up turn.
    let (server, _completions) =
        start_streaming_sse_server(vec![first_chunks, response_completed_chunks("resp-2")]).await;
    let codex = build_codex(&server).await;
    submit_user_input(&codex, "first prompt").await;
    // Queue the mail only after the reasoning item has started, then release
    // the gated tail so the turn can proceed.
    wait_for_reasoning_item_started(&codex).await;
    submit_queue_only_agent_mail(&codex, "queued child update").await;
    let _ = gate_reasoning_done_tx.send(());
    wait_for_turn_complete(&codex).await;
    // Snapshot checks the second request carries the reasoning item followed
    // by the queued mail payload.
    let requests = server.requests().await;
    assert_two_responses_input_snapshot("pending_input_queued_mail_after_reasoning", &requests);
    server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queued_inter_agent_mail_triggers_follow_up_after_commentary_message_item() {
    // Gate holds back everything after the message item is added, so mail can
    // be queued while the commentary message is still in flight.
    let (gate_message_done_tx, gate_message_done_rx) = oneshot::channel();
    let first_chunks = vec![
        chunk(ev_response_created("resp-1")),
        chunk(ev_message_item_added("msg-1", "")),
        gated_chunk(
            gate_message_done_rx,
            vec![
                ev_output_text_delta("first answer"),
                // Hand-rolled output_item.done so the message carries
                // "phase": "commentary" — the case under test.
                json!({
                    "type": "response.output_item.done",
                    "item": {
                        "type": "message",
                        "role": "assistant",
                        "id": "msg-1",
                        "content": [{"type": "output_text", "text": "first answer"}],
                        "phase": "commentary",
                    }
                }),
                // Stale tool call and final message that the queued mail is
                // expected to preempt (absent from the second request in the
                // companion snapshot).
                ev_function_call(
                    "call-stale",
                    "shell",
                    r#"{"command":"echo stale tool call"}"#,
                ),
                ev_message_item_added("msg-stale", ""),
                ev_output_text_delta("stale final"),
                ev_message_item_done("msg-stale", "stale final"),
                ev_completed("resp-1"),
            ],
        ),
    ];
    // Second response is a bare created/completed pair for the follow-up turn.
    let (server, _completions) =
        start_streaming_sse_server(vec![first_chunks, response_completed_chunks("resp-2")]).await;
    let codex = build_codex(&server).await;
    submit_user_input(&codex, "first prompt").await;
    // Wait for the agent-message item to start before queueing the mail.
    wait_for_event(&codex, |event| {
        matches!(
            event,
            EventMsg::ItemStarted(item_started)
                if matches!(&item_started.item, TurnItem::AgentMessage(_))
        )
    })
    .await;
    submit_queue_only_agent_mail(&codex, "queued child update").await;
    let _ = gate_message_done_tx.send(());
    // The commentary answer still surfaces before the turn completes.
    wait_for_agent_message(&codex, "first answer").await;
    wait_for_turn_complete(&codex).await;
    // Snapshot checks the second request carries the assistant answer
    // followed by the queued mail payload.
    let requests = server.requests().await;
    assert_two_responses_input_snapshot("pending_input_queued_mail_after_commentary", &requests);
    server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn user_input_does_not_preempt_after_reasoning_item() {
    // Control case for the mail-preemption tests above: queued *user input*
    // must NOT interrupt the first response after a reasoning item.
    let (gate_reasoning_done_tx, gate_reasoning_done_rx) = oneshot::channel();
    let first_chunks = vec![
        chunk(ev_response_created("resp-1")),
        chunk(ev_reasoning_item_added("reason-1", &["thinking"])),
        // Post-gate items are named "preserved" because, unlike the mail
        // tests, they are expected to survive into the second request (see
        // the companion snapshot: function_call, answer, then the prompt).
        gated_chunk(
            gate_reasoning_done_rx,
            vec![
                ev_reasoning_item("reason-1", &["thinking"], &[]),
                ev_function_call(
                    "call-preserved",
                    "shell",
                    r#"{"command":"echo preserved tool call"}"#,
                ),
                ev_message_item_added("msg-1", ""),
                ev_output_text_delta("first answer"),
                ev_message_item_done("msg-1", "first answer"),
                ev_completed("resp-1"),
            ],
        ),
    ];
    // Second response is a bare created/completed pair for the follow-up turn.
    let (server, _completions) =
        start_streaming_sse_server(vec![first_chunks, response_completed_chunks("resp-2")]).await;
    let codex = build_codex(&server).await;
    submit_user_input(&codex, "first prompt").await;
    // Queue a second user prompt mid-reasoning, then release the gated tail.
    wait_for_reasoning_item_started(&codex).await;
    submit_user_input(&codex, "second prompt").await;
    let _ = gate_reasoning_done_tx.send(());
    // The first turn's answer still arrives in full before the turn ends.
    wait_for_agent_message(&codex, "first answer").await;
    wait_for_turn_complete(&codex).await;
    let requests = server.requests().await;
    assert_two_responses_input_snapshot(
        "pending_input_user_input_no_preempt_after_reasoning",
        &requests,
    );
    server.shutdown().await;
}

View File

@@ -0,0 +1,17 @@
---
source: core/tests/suite/pending_input.rs
expression: snapshot
---
Scenario: /responses POST bodies (input only, redacted like other suite snapshots)
## First request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/user:first prompt
## Second request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/user:first prompt
03:message/assistant:first answer
04:message/assistant:{"author":"/root/worker","recipient":"/root","other_recipients":[],"content":"queued child update","trigger_turn":false}

View File

@@ -0,0 +1,17 @@
---
source: core/tests/suite/pending_input.rs
expression: snapshot
---
Scenario: /responses POST bodies (input only, redacted like other suite snapshots)
## First request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/user:first prompt
## Second request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/user:first prompt
03:reasoning:summary=thinking:encrypted=true
04:message/assistant:{"author":"/root/worker","recipient":"/root","other_recipients":[],"content":"queued child update","trigger_turn":false}

View File

@@ -0,0 +1,20 @@
---
source: core/tests/suite/pending_input.rs
expression: snapshot
---
Scenario: /responses POST bodies (input only, redacted like other suite snapshots)
## First request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/user:first prompt
## Second request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/user:first prompt
03:reasoning:summary=thinking:encrypted=true
04:function_call/shell
05:message/assistant:first answer
06:function_call_output:failed to parse function arguments: invalid type: string "echo preserved tool call", expected a sequence at line 1 column 37
07:message/user:second prompt