From 354801f0ddc978d125011406ed567fb6b0538aa3 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Sun, 3 Aug 2025 00:22:16 -0700 Subject: [PATCH] completions --- codex-rs/core/src/chat_completions.rs | 171 ++++++++++++++++++++------ codex-rs/core/src/client.rs | 8 +- 2 files changed, 140 insertions(+), 39 deletions(-) diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs index 5ede774b1c..a51f219412 100644 --- a/codex-rs/core/src/chat_completions.rs +++ b/codex-rs/core/src/chat_completions.rs @@ -13,7 +13,7 @@ use std::task::Poll; use tokio::sync::mpsc; use tokio::time::timeout; use tracing::debug; -use tracing::trace; +use tracing::warn; use crate::ModelProviderInfo; use crate::client_common::Prompt; @@ -207,6 +207,7 @@ async fn process_chat_sse( } let mut fn_call_state = FunctionCallState::default(); + let mut assistant_text = String::new(); loop { let sse = match timeout(idle_timeout, stream.next()).await { @@ -249,26 +250,50 @@ async fn process_chat_sse( Ok(v) => v, Err(_) => continue, }; - trace!("chat_completions received SSE chunk: {chunk:?}"); + warn!("chat_completions received SSE chunk: {chunk:?}"); let choice_opt = chunk.get("choices").and_then(|c| c.get(0)); if let Some(choice) = choice_opt { - // Handle assistant content tokens. + // Handle assistant content tokens as streaming deltas. if let Some(content) = choice .get("delta") .and_then(|d| d.get("content")) .and_then(|c| c.as_str()) { - let item = ResponseItem::Message { - role: "assistant".to_string(), - content: vec![ContentItem::OutputText { - text: content.to_string(), - }], - id: None, - }; + // Some providers emit frequent empty-string content deltas. + // Suppress empty deltas to avoid creating a premature answer block + // ahead of reasoning in streaming UIs. 
+ if !content.is_empty() { + assistant_text.push_str(content); + let _ = tx_event + .send(Ok(ResponseEvent::OutputTextDelta(content.to_string()))) + .await; + } + } - let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; + // Forward any reasoning/thinking deltas if present. + if let Some(reasoning) = choice + .get("delta") + .and_then(|d| d.get("reasoning")) + .and_then(|c| c.as_str()) + { + let _ = tx_event + .send(Ok(ResponseEvent::ReasoningSummaryDelta( + reasoning.to_string(), + ))) + .await; + } + if let Some(reasoning_content) = choice + .get("delta") + .and_then(|d| d.get("reasoning_content")) + .and_then(|c| c.as_str()) + { + let _ = tx_event + .send(Ok(ResponseEvent::ReasoningSummaryDelta( + reasoning_content.to_string(), + ))) + .await; } // Handle streaming function / tool calls. @@ -317,7 +342,18 @@ async fn process_chat_sse( let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; } "stop" => { - // Regular turn without tool-call. + // Regular turn without tool-call. Emit the final assistant message + // as a single OutputItemDone so non-delta consumers see the result. + if !assistant_text.is_empty() { + let item = ResponseItem::Message { + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: std::mem::take(&mut assistant_text), + }], + id: None, + }; + let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; + } } _ => {} } @@ -358,7 +394,11 @@ async fn process_chat_sse( pub(crate) struct AggregatedChatStream { inner: S, cumulative: String, - pending_completed: Option, + cumulative_reasoning: String, + pending: std::collections::VecDeque, + // When true, forward text and reasoning deltas downstream as they arrive. + streaming_mode: bool, + // Both modes accumulate deltas, so the aggregated items are still emitted at Completed. 
} impl Stream for AggregatedChatStream @@ -370,8 +410,8 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - // First, flush any buffered Completed event from the previous call. - if let Some(ev) = this.pending_completed.take() { + // First, flush any buffered events from the previous call. + if let Some(ev) = this.pending.pop_front() { return Poll::Ready(Some(Ok(ev))); } @@ -388,16 +428,21 @@ where let is_assistant_delta = matches!(&item, crate::models::ResponseItem::Message { role, .. } if role == "assistant"); if is_assistant_delta { - if let crate::models::ResponseItem::Message { content, .. } = &item { - if let Some(text) = content.iter().find_map(|c| match c { - crate::models::ContentItem::OutputText { text } => Some(text), - _ => None, - }) { - this.cumulative.push_str(text); + // Only use the final assistant message if we have not + // seen any deltas; otherwise, deltas already built the + // cumulative text and this would duplicate it. + if this.cumulative.is_empty() { + if let crate::models::ResponseItem::Message { content, .. } = &item { + if let Some(text) = content.iter().find_map(|c| match c { + crate::models::ContentItem::OutputText { text } => Some(text), + _ => None, + }) { + this.cumulative.push_str(text); + } } } - // Swallow partial assistant chunk; keep polling. + // Swallow assistant message here; emit on Completed. continue; } @@ -408,24 +453,48 @@ where response_id, token_usage, }))) => { + // Build any aggregated items in the correct order: Reasoning first, then Message. 
+ let mut emitted_any = false; + + if !this.cumulative_reasoning.is_empty() { + let aggregated_reasoning = crate::models::ResponseItem::Reasoning { + id: String::new(), + summary: vec![ + crate::models::ReasoningItemReasoningSummary::SummaryText { + text: std::mem::take(&mut this.cumulative_reasoning), + }, + ], + content: None, + encrypted_content: None, + }; + this.pending + .push_back(ResponseEvent::OutputItemDone(aggregated_reasoning)); + emitted_any = true; + } + if !this.cumulative.is_empty() { - let aggregated_item = crate::models::ResponseItem::Message { + let aggregated_message = crate::models::ResponseItem::Message { id: None, role: "assistant".to_string(), content: vec![crate::models::ContentItem::OutputText { text: std::mem::take(&mut this.cumulative), }], }; + this.pending + .push_back(ResponseEvent::OutputItemDone(aggregated_message)); + emitted_any = true; + } - // Buffer Completed so it is returned *after* the aggregated message. - this.pending_completed = Some(ResponseEvent::Completed { - response_id, - token_usage, + // Always emit Completed last when anything was aggregated. + if emitted_any { + this.pending.push_back(ResponseEvent::Completed { + response_id: response_id.clone(), + token_usage: token_usage.clone(), }); - - return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone( - aggregated_item, - )))); + // Return the first pending event now. + if let Some(ev) = this.pending.pop_front() { + return Poll::Ready(Some(Ok(ev))); + } } // Nothing aggregated – forward Completed directly. @@ -439,11 +508,25 @@ where // will never appear in a Chat Completions stream. continue; } - Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(_)))) - | Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => { - // Deltas are ignored here since aggregation waits for the - // final OutputItemDone. 
- continue; + Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))) => { + // Always accumulate deltas so we can emit a final OutputItemDone at Completed. + this.cumulative.push_str(&delta); + if this.streaming_mode { + // In streaming mode, also forward the delta immediately. + return Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))); + } else { + continue; + } + } + Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(delta)))) => { + // Always accumulate reasoning deltas so we can emit a final Reasoning item at Completed. + this.cumulative_reasoning.push_str(&delta); + if this.streaming_mode { + // In streaming mode, also forward the delta immediately. + return Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(delta)))); + } else { + continue; + } } } } @@ -475,9 +558,23 @@ pub(crate) trait AggregateStreamExt: Stream> + Size AggregatedChatStream { inner: self, cumulative: String::new(), - pending_completed: None, + cumulative_reasoning: String::new(), + pending: std::collections::VecDeque::new(), + streaming_mode: false, } } } impl AggregateStreamExt for T where T: Stream> + Sized {} + +impl AggregatedChatStream { + pub(crate) fn streaming_mode(inner: S) -> Self { + AggregatedChatStream { + inner, + cumulative: String::new(), + cumulative_reasoning: String::new(), + pending: std::collections::VecDeque::new(), + streaming_mode: true, + } + } +} diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index b9ea6b13f4..503301e21a 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -93,7 +93,11 @@ impl ModelClient { // Wrap it with the aggregation adapter so callers see *only* // the final assistant message per turn (matching the // behaviour of the Responses API). 
- let mut aggregated = response_stream.aggregate(); + let mut aggregated = if self.config.show_reasoning_content { + crate::chat_completions::AggregatedChatStream::streaming_mode(response_stream) + } else { + response_stream.aggregate() + }; // Bridge the aggregated stream back into a standard // `ResponseStream` by forwarding events through a channel. @@ -438,7 +442,7 @@ async fn process_sse( } } } - "response.reasoning_summary_text.delta" => { + "response.reasoning_summary_text.delta" | "response.reasoning_text.delta" => { if let Some(delta) = event.delta { let event = ResponseEvent::ReasoningSummaryDelta(delta); if tx_event.send(Ok(event)).await.is_err() {