Send message by default mid-turn; queue messages by tab (#9077)

https://github.com/user-attachments/assets/03838730-4ddc-44df-a2c7-cb8ecda78660
Author: Ahmed Ibrahim
Date: 2026-01-12 23:06:35 -08:00
Committed by: GitHub
Parent: e726a82c8a
Commit: cbca43d57a
24 changed files with 875 additions and 342 deletions
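
What this change does: the streaming SSE test server now records every raw POST body it receives and exposes them through a new `requests()` accessor (`drain_request_body` becomes `read_request_body`, returning the bytes it previously discarded), and a new `pending_input` integration test uses that capture to check that a prompt submitted mid-turn is injected into the in-flight conversation's follow-up request rather than dropped. A sketch of the new assertion surface, abridged from the test added below:

    let bodies = server.requests().await; // Vec<Vec<u8>>, one entry per captured POST
    let first: serde_json::Value =
        serde_json::from_slice(&bodies[0]).expect("parse captured request body");
    assert!(first.get("input").is_some()); // request payloads are now fully inspectable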

Changed file: streaming SSE test server helper (core_test_support::streaming_sse)

@@ -19,6 +19,7 @@ pub struct StreamingSseChunk {
 /// Minimal streaming SSE server for tests that need gated per-chunk delivery.
 pub struct StreamingSseServer {
     uri: String,
+    requests: Arc<TokioMutex<Vec<Vec<u8>>>>,
     shutdown: oneshot::Sender<()>,
     task: tokio::task::JoinHandle<()>,
 }
@@ -28,6 +29,10 @@ impl StreamingSseServer {
         &self.uri
     }
 
+    pub async fn requests(&self) -> Vec<Vec<u8>> {
+        self.requests.lock().await.clone()
+    }
+
     pub async fn shutdown(self) {
         let _ = self.shutdown.send(());
         let _ = self.task.await;
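
The capture is plain shared state: one `Arc<TokioMutex<Vec<Vec<u8>>>>` lives in the returned handle while a clone is moved into each per-connection task, exactly as the hunks below wire up. A minimal self-contained sketch of the pattern (standalone names, not from this diff):

    use std::sync::Arc;
    use tokio::sync::Mutex;

    #[tokio::main]
    async fn main() {
        // One shared log; each spawned task pushes into the same Vec.
        let requests: Arc<Mutex<Vec<Vec<u8>>>> = Arc::new(Mutex::new(Vec::new()));
        let requests_for_task = Arc::clone(&requests);
        tokio::spawn(async move {
            requests_for_task.lock().await.push(b"POST body".to_vec());
        })
        .await
        .unwrap();
        // The handle side observes what the task recorded.
        assert_eq!(requests.lock().await.len(), 1);
    }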
@@ -61,6 +66,8 @@ pub async fn start_streaming_sse_server(
         responses: VecDeque::from(responses),
         completions: VecDeque::from(completion_senders),
     }));
 
+    let requests = Arc::new(TokioMutex::new(Vec::new()));
+    let requests_for_task = Arc::clone(&requests);
     let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
     let task = tokio::spawn(async move {
@@ -70,6 +77,7 @@ pub async fn start_streaming_sse_server(
             accept_res = listener.accept() => {
                 let (mut stream, _) = accept_res.expect("accept streaming SSE connection");
                 let state = Arc::clone(&state);
+                let requests = Arc::clone(&requests_for_task);
                 tokio::spawn(async move {
                     let (request, body_prefix) = read_http_request(&mut stream).await;
                     let Some((method, path)) = parse_request_line(&request) else {
@@ -78,7 +86,7 @@ pub async fn start_streaming_sse_server(
                     };
 
                     if method == "GET" && path == "/v1/models" {
-                        if drain_request_body(&mut stream, &request, body_prefix)
+                        if read_request_body(&mut stream, &request, body_prefix)
                             .await
                             .is_err()
                         {
@@ -95,13 +103,16 @@ pub async fn start_streaming_sse_server(
                     }
 
                     if method == "POST" && path == "/v1/responses" {
-                        if drain_request_body(&mut stream, &request, body_prefix)
+                        let body = match read_request_body(&mut stream, &request, body_prefix)
                             .await
-                            .is_err()
                         {
-                            let _ = write_http_response(&mut stream, 400, "bad request", "text/plain").await;
-                            return;
-                        }
+                            Ok(body) => body,
+                            Err(_) => {
+                                let _ = write_http_response(&mut stream, 400, "bad request", "text/plain").await;
+                                return;
+                            }
+                        };
+                        requests.lock().await.push(body);
                         let Some((chunks, completion)) = take_next_stream(&state).await else {
                             let _ = write_http_response(&mut stream, 500, "no responses queued", "text/plain").await;
                             return;
@@ -137,6 +148,7 @@ pub async fn start_streaming_sse_server(
     (
         StreamingSseServer {
             uri,
+            requests,
             shutdown: shutdown_tx,
             task,
         },
@@ -202,13 +214,13 @@ fn content_length(headers: &str) -> Option<usize> {
     })
 }
 
-async fn drain_request_body(
+async fn read_request_body(
     stream: &mut tokio::net::TcpStream,
     headers: &str,
     mut body_prefix: Vec<u8>,
-) -> std::io::Result<()> {
+) -> std::io::Result<Vec<u8>> {
     let Some(content_len) = content_length(headers) else {
-        return Ok(());
+        return Ok(body_prefix);
     };
     if body_prefix.len() > content_len {
@@ -217,12 +229,13 @@ async fn drain_request_body(
     let remaining = content_len.saturating_sub(body_prefix.len());
     if remaining == 0 {
-        return Ok(());
+        return Ok(body_prefix);
     }
     let mut rest = vec![0u8; remaining];
     stream.read_exact(&mut rest).await?;
-    Ok(())
+    body_prefix.extend_from_slice(&rest);
+    Ok(body_prefix)
 }
 
 async fn write_sse_headers(stream: &mut tokio::net::TcpStream) -> std::io::Result<()> {
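
Behaviorally, `read_request_body` keeps the old Content-Length accounting but accumulates instead of discarding: the bytes that arrived along with the headers (`body_prefix`) are kept, and exactly `Content-Length - prefix` more are pulled with `read_exact`. A runnable miniature of that shape against an in-memory reader (illustrative only; tokio implements `AsyncRead` for `std::io::Cursor`):

    use tokio::io::AsyncReadExt;

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let content_len = 11usize; // pretend the headers said Content-Length: 11
        let mut body_prefix = b"hello".to_vec(); // bytes read together with the headers
        let mut stream = std::io::Cursor::new(b" world".to_vec()); // stand-in for the TcpStream

        let remaining = content_len.saturating_sub(body_prefix.len());
        let mut rest = vec![0u8; remaining];
        stream.read_exact(&mut rest).await?; // read exactly the advertised remainder
        body_prefix.extend_from_slice(&rest);

        assert_eq!(body_prefix, b"hello world".to_vec());
        Ok(())
    }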

Changed file: core integration test suite module list

@@ -42,6 +42,7 @@ mod model_overrides;
 mod model_tools;
 mod models_etag_responses;
 mod otel;
+mod pending_input;
 mod permissions_messages;
 mod prompt_caching;
 mod quota_exceeded;

New file: pending_input integration test

@@ -0,0 +1,143 @@
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_protocol::user_input::UserInput;
use core_test_support::responses;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_message_item_added;
use core_test_support::responses::ev_output_text_delta;
use core_test_support::responses::ev_response_created;
use core_test_support::streaming_sse::StreamingSseChunk;
use core_test_support::streaming_sse::start_streaming_sse_server;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use pretty_assertions::assert_eq;
use serde_json::Value;
use tokio::sync::oneshot;

fn ev_message_item_done(id: &str, text: &str) -> Value {
    serde_json::json!({
        "type": "response.output_item.done",
        "item": {
            "type": "message",
            "role": "assistant",
            "id": id,
            "content": [{"type": "output_text", "text": text}]
        }
    })
}

fn sse_event(event: Value) -> String {
    responses::sse(vec![event])
}

fn message_input_texts(body: &Value, role: &str) -> Vec<String> {
    body.get("input")
        .and_then(Value::as_array)
        .into_iter()
        .flatten()
        .filter(|item| item.get("type").and_then(Value::as_str) == Some("message"))
        .filter(|item| item.get("role").and_then(Value::as_str) == Some(role))
        .filter_map(|item| item.get("content").and_then(Value::as_array))
        .flatten()
        .filter(|span| span.get("type").and_then(Value::as_str) == Some("input_text"))
        .filter_map(|span| span.get("text").and_then(Value::as_str).map(str::to_owned))
        .collect()
}
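
// For reference, the helper above walks `input` -> "message" items of the given
// role -> "input_text" spans and collects their text. Illustrative usage against
// a hand-built body (hypothetical, not part of the committed test):
//
//     let body = serde_json::json!({
//         "input": [{
//             "type": "message",
//             "role": "user",
//             "content": [{"type": "input_text", "text": "first prompt"}]
//         }]
//     });
//     assert_eq!(message_input_texts(&body, "user"), vec!["first prompt".to_owned()]);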
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn injected_user_input_triggers_follow_up_request_with_deltas() {
    let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
    let first_chunks = vec![
        StreamingSseChunk {
            gate: None,
            body: sse_event(ev_response_created("resp-1")),
        },
        StreamingSseChunk {
            gate: None,
            body: sse_event(ev_message_item_added("msg-1", "")),
        },
        StreamingSseChunk {
            gate: None,
            body: sse_event(ev_output_text_delta("first ")),
        },
        StreamingSseChunk {
            gate: None,
            body: sse_event(ev_output_text_delta("turn")),
        },
        StreamingSseChunk {
            gate: None,
            body: sse_event(ev_message_item_done("msg-1", "first turn")),
        },
        StreamingSseChunk {
            gate: Some(gate_completed_rx),
            body: sse_event(ev_completed("resp-1")),
        },
    ];
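    // The final chunk above is gated on `gate_completed_rx`: the server withholds
    // `response.completed`, keeping the first turn mid-stream until the test fires
    // `gate_completed_tx` below.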
    let second_chunks = vec![
        StreamingSseChunk {
            gate: None,
            body: sse_event(ev_response_created("resp-2")),
        },
        StreamingSseChunk {
            gate: None,
            body: sse_event(ev_completed("resp-2")),
        },
    ];

    let (server, _completions) =
        start_streaming_sse_server(vec![first_chunks, second_chunks]).await;
    let codex = test_codex()
        .with_model("gpt-5.1")
        .build_with_streaming_server(&server)
        .await
        .unwrap()
        .codex;

    codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "first prompt".into(),
            }],
            final_output_json_schema: None,
        })
        .await
        .unwrap();
    wait_for_event(&codex, |event| {
        matches!(event, EventMsg::AgentMessageContentDelta(_))
    })
    .await;

    codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "second prompt".into(),
            }],
            final_output_json_schema: None,
        })
        .await
        .unwrap();

    let _ = gate_completed_tx.send(());
    wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
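    // Expectation: the first request carries only the first prompt; the follow-up
    // request carries the full transcript, including the "second prompt" that was
    // queued mid-turn.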
    let requests = server.requests().await;
    assert_eq!(requests.len(), 2);
    let first_body: Value = serde_json::from_slice(&requests[0]).expect("parse first request");
    let second_body: Value = serde_json::from_slice(&requests[1]).expect("parse second request");

    let first_texts = message_input_texts(&first_body, "user");
    assert!(first_texts.iter().any(|text| text == "first prompt"));
    assert!(!first_texts.iter().any(|text| text == "second prompt"));

    let second_texts = message_input_texts(&second_body, "user");
    assert!(second_texts.iter().any(|text| text == "first prompt"));
    assert!(second_texts.iter().any(|text| text == "second prompt"));

    server.shutdown().await;
}