mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
fix(codex-api): handle Chat Completions DONE sentinel (#8708)
Context - This code parses Server-Sent Events (SSE) from the legacy Chat Completions streaming API (wire_api = "chat"). - The upstream protocol terminates a stream with a final sentinel event: data: [DONE]. - Some of our test stubs/helpers historically end the stream with data: DONE (no brackets). How this was found - GitHub Actions on Windows failed in codex-app-server integration tests with wiremock verification errors (expected multiple POSTs, got 1). Diagnosis - The job logs included: codex_api::sse::chat: Failed to parse ChatCompletions SSE event ... data: DONE. - eventsource_stream surfaces the sentinel as a normal SSE event; it does not automatically close the stream. - The parser previously attempted to JSON-decode every data: payload. The sentinel is not JSON, so we logged and skipped it, then continued polling. - On servers that keep the HTTP connection open after emitting the sentinel (notably wiremock on Windows), skipping the sentinel meant we never emitted ResponseEvent::Completed. - Higher layers wait for completion before progressing (emitting approval requests and issuing follow-up model calls), so the test never reached the subsequent requests and wiremock panicked when its expected-call count was not met. Fix - Treat both data: [DONE] and data: DONE as explicit end-of-stream sentinels. - When a sentinel is seen, flush any pending assistant/reasoning items and emit ResponseEvent::Completed once. Tests - Add a regression unit test asserting we complete on the sentinel even if the underlying connection is not closed.
This commit is contained in:
@@ -30,6 +30,21 @@ pub(crate) fn spawn_chat_stream(
|
||||
ResponseStream { rx_event }
|
||||
}
|
||||
|
||||
/// Processes Server-Sent Events from the legacy Chat Completions streaming API.
|
||||
///
|
||||
/// The upstream protocol terminates a streaming response with a final sentinel event
|
||||
/// (`data: [DONE]`). Historically, some of our test stubs have emitted `data: DONE`
|
||||
/// (without brackets) instead.
|
||||
///
|
||||
/// `eventsource_stream` delivers these sentinels as regular events rather than signaling
|
||||
/// end-of-stream. If we try to parse them as JSON, we log and skip them, then keep
|
||||
/// polling for more events.
|
||||
///
|
||||
/// On servers that keep the HTTP connection open after emitting the sentinel (notably
|
||||
/// wiremock on Windows), skipping the sentinel means we never emit `ResponseEvent::Completed`.
|
||||
/// Higher-level workflows/tests that wait for completion before issuing subsequent model
|
||||
/// calls will then stall, which shows up as "expected N requests, got 1" verification
|
||||
/// failures in the mock server.
|
||||
pub async fn process_chat_sse<S>(
|
||||
stream: S,
|
||||
tx_event: mpsc::Sender<Result<ResponseEvent, ApiError>>,
|
||||
@@ -57,6 +72,31 @@ pub async fn process_chat_sse<S>(
|
||||
let mut reasoning_item: Option<ResponseItem> = None;
|
||||
let mut completed_sent = false;
|
||||
|
||||
async fn flush_and_complete(
|
||||
tx_event: &mpsc::Sender<Result<ResponseEvent, ApiError>>,
|
||||
reasoning_item: &mut Option<ResponseItem>,
|
||||
assistant_item: &mut Option<ResponseItem>,
|
||||
) {
|
||||
if let Some(reasoning) = reasoning_item.take() {
|
||||
let _ = tx_event
|
||||
.send(Ok(ResponseEvent::OutputItemDone(reasoning)))
|
||||
.await;
|
||||
}
|
||||
|
||||
if let Some(assistant) = assistant_item.take() {
|
||||
let _ = tx_event
|
||||
.send(Ok(ResponseEvent::OutputItemDone(assistant)))
|
||||
.await;
|
||||
}
|
||||
|
||||
let _ = tx_event
|
||||
.send(Ok(ResponseEvent::Completed {
|
||||
response_id: String::new(),
|
||||
token_usage: None,
|
||||
}))
|
||||
.await;
|
||||
}
|
||||
|
||||
loop {
|
||||
let start = Instant::now();
|
||||
let response = timeout(idle_timeout, stream.next()).await;
|
||||
@@ -70,24 +110,8 @@ pub async fn process_chat_sse<S>(
|
||||
return;
|
||||
}
|
||||
Ok(None) => {
|
||||
if let Some(reasoning) = reasoning_item {
|
||||
let _ = tx_event
|
||||
.send(Ok(ResponseEvent::OutputItemDone(reasoning)))
|
||||
.await;
|
||||
}
|
||||
|
||||
if let Some(assistant) = assistant_item {
|
||||
let _ = tx_event
|
||||
.send(Ok(ResponseEvent::OutputItemDone(assistant)))
|
||||
.await;
|
||||
}
|
||||
if !completed_sent {
|
||||
let _ = tx_event
|
||||
.send(Ok(ResponseEvent::Completed {
|
||||
response_id: String::new(),
|
||||
token_usage: None,
|
||||
}))
|
||||
.await;
|
||||
flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await;
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -101,16 +125,25 @@ pub async fn process_chat_sse<S>(
|
||||
|
||||
trace!("SSE event: {}", sse.data);
|
||||
|
||||
if sse.data.trim().is_empty() {
|
||||
let data = sse.data.trim();
|
||||
|
||||
if data.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let value: serde_json::Value = match serde_json::from_str(&sse.data) {
|
||||
if data == "[DONE]" || data == "DONE" {
|
||||
if !completed_sent {
|
||||
flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
let value: serde_json::Value = match serde_json::from_str(data) {
|
||||
Ok(val) => val,
|
||||
Err(err) => {
|
||||
debug!(
|
||||
"Failed to parse ChatCompletions SSE event: {err}, data: {}",
|
||||
&sse.data
|
||||
data
|
||||
);
|
||||
continue;
|
||||
}
|
||||
@@ -362,6 +395,16 @@ mod tests {
|
||||
body
|
||||
}
|
||||
|
||||
/// Regression test: the stream should complete when we see a `[DONE]` sentinel.
|
||||
///
|
||||
/// This is important for tests/mocks that don't immediately close the underlying
|
||||
/// connection after emitting the sentinel.
|
||||
#[tokio::test]
|
||||
async fn completes_on_done_sentinel_without_json() {
|
||||
let events = collect_events("event: message\ndata: [DONE]\n\n").await;
|
||||
assert_matches!(&events[..], [ResponseEvent::Completed { .. }]);
|
||||
}
|
||||
|
||||
async fn collect_events(body: &str) -> Vec<ResponseEvent> {
|
||||
let reader = ReaderStream::new(std::io::Cursor::new(body.to_string()))
|
||||
.map_err(|err| codex_client::TransportError::Network(err.to_string()));
|
||||
|
||||
Reference in New Issue
Block a user