mirror of
https://github.com/openai/codex.git
synced 2026-02-02 06:57:03 +00:00
completions
This commit is contained in:
@@ -13,7 +13,7 @@ use std::task::Poll;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::timeout;
|
||||
use tracing::debug;
|
||||
use tracing::trace;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::ModelProviderInfo;
|
||||
use crate::client_common::Prompt;
|
||||
@@ -207,6 +207,7 @@ async fn process_chat_sse<S>(
|
||||
}
|
||||
|
||||
let mut fn_call_state = FunctionCallState::default();
|
||||
let mut assistant_text = String::new();
|
||||
|
||||
loop {
|
||||
let sse = match timeout(idle_timeout, stream.next()).await {
|
||||
@@ -249,26 +250,50 @@ async fn process_chat_sse<S>(
|
||||
Ok(v) => v,
|
||||
Err(_) => continue,
|
||||
};
|
||||
trace!("chat_completions received SSE chunk: {chunk:?}");
|
||||
warn!("chat_completions received SSE chunk: {chunk:?}");
|
||||
|
||||
let choice_opt = chunk.get("choices").and_then(|c| c.get(0));
|
||||
|
||||
if let Some(choice) = choice_opt {
|
||||
// Handle assistant content tokens.
|
||||
// Handle assistant content tokens as streaming deltas.
|
||||
if let Some(content) = choice
|
||||
.get("delta")
|
||||
.and_then(|d| d.get("content"))
|
||||
.and_then(|c| c.as_str())
|
||||
{
|
||||
let item = ResponseItem::Message {
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: content.to_string(),
|
||||
}],
|
||||
id: None,
|
||||
};
|
||||
// Some providers emit frequent empty-string content deltas.
|
||||
// Suppress empty deltas to avoid creating a premature answer block
|
||||
// ahead of reasoning in streaming UIs.
|
||||
if !content.is_empty() {
|
||||
assistant_text.push_str(content);
|
||||
let _ = tx_event
|
||||
.send(Ok(ResponseEvent::OutputTextDelta(content.to_string())))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
// Forward any reasoning/thinking deltas if present.
|
||||
if let Some(reasoning) = choice
|
||||
.get("delta")
|
||||
.and_then(|d| d.get("reasoning"))
|
||||
.and_then(|c| c.as_str())
|
||||
{
|
||||
let _ = tx_event
|
||||
.send(Ok(ResponseEvent::ReasoningSummaryDelta(
|
||||
reasoning.to_string(),
|
||||
)))
|
||||
.await;
|
||||
}
|
||||
if let Some(reasoning_content) = choice
|
||||
.get("delta")
|
||||
.and_then(|d| d.get("reasoning_content"))
|
||||
.and_then(|c| c.as_str())
|
||||
{
|
||||
let _ = tx_event
|
||||
.send(Ok(ResponseEvent::ReasoningSummaryDelta(
|
||||
reasoning_content.to_string(),
|
||||
)))
|
||||
.await;
|
||||
}
|
||||
|
||||
// Handle streaming function / tool calls.
|
||||
@@ -317,7 +342,18 @@ async fn process_chat_sse<S>(
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
}
|
||||
"stop" => {
|
||||
// Regular turn without tool-call.
|
||||
// Regular turn without tool-call. Emit the final assistant message
|
||||
// as a single OutputItemDone so non-delta consumers see the result.
|
||||
if !assistant_text.is_empty() {
|
||||
let item = ResponseItem::Message {
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: std::mem::take(&mut assistant_text),
|
||||
}],
|
||||
id: None,
|
||||
};
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
@@ -358,7 +394,11 @@ async fn process_chat_sse<S>(
|
||||
pub(crate) struct AggregatedChatStream<S> {
|
||||
inner: S,
|
||||
cumulative: String,
|
||||
pending_completed: Option<ResponseEvent>,
|
||||
cumulative_reasoning: String,
|
||||
pending: std::collections::VecDeque<ResponseEvent>,
|
||||
// When true, do not emit a cumulative assistant message at Completed.
|
||||
streaming_mode: bool,
|
||||
// When true, forward reasoning deltas instead of ignoring them.
|
||||
}
|
||||
|
||||
impl<S> Stream for AggregatedChatStream<S>
|
||||
@@ -370,8 +410,8 @@ where
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.get_mut();
|
||||
|
||||
// First, flush any buffered Completed event from the previous call.
|
||||
if let Some(ev) = this.pending_completed.take() {
|
||||
// First, flush any buffered events from the previous call.
|
||||
if let Some(ev) = this.pending.pop_front() {
|
||||
return Poll::Ready(Some(Ok(ev)));
|
||||
}
|
||||
|
||||
@@ -388,16 +428,21 @@ where
|
||||
let is_assistant_delta = matches!(&item, crate::models::ResponseItem::Message { role, .. } if role == "assistant");
|
||||
|
||||
if is_assistant_delta {
|
||||
if let crate::models::ResponseItem::Message { content, .. } = &item {
|
||||
if let Some(text) = content.iter().find_map(|c| match c {
|
||||
crate::models::ContentItem::OutputText { text } => Some(text),
|
||||
_ => None,
|
||||
}) {
|
||||
this.cumulative.push_str(text);
|
||||
// Only use the final assistant message if we have not
|
||||
// seen any deltas; otherwise, deltas already built the
|
||||
// cumulative text and this would duplicate it.
|
||||
if this.cumulative.is_empty() {
|
||||
if let crate::models::ResponseItem::Message { content, .. } = &item {
|
||||
if let Some(text) = content.iter().find_map(|c| match c {
|
||||
crate::models::ContentItem::OutputText { text } => Some(text),
|
||||
_ => None,
|
||||
}) {
|
||||
this.cumulative.push_str(text);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Swallow partial assistant chunk; keep polling.
|
||||
// Swallow assistant message here; emit on Completed.
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -408,24 +453,48 @@ where
|
||||
response_id,
|
||||
token_usage,
|
||||
}))) => {
|
||||
// Build any aggregated items in the correct order: Reasoning first, then Message.
|
||||
let mut emitted_any = false;
|
||||
|
||||
if !this.cumulative_reasoning.is_empty() {
|
||||
let aggregated_reasoning = crate::models::ResponseItem::Reasoning {
|
||||
id: String::new(),
|
||||
summary: vec![
|
||||
crate::models::ReasoningItemReasoningSummary::SummaryText {
|
||||
text: std::mem::take(&mut this.cumulative_reasoning),
|
||||
},
|
||||
],
|
||||
content: None,
|
||||
encrypted_content: None,
|
||||
};
|
||||
this.pending
|
||||
.push_back(ResponseEvent::OutputItemDone(aggregated_reasoning));
|
||||
emitted_any = true;
|
||||
}
|
||||
|
||||
if !this.cumulative.is_empty() {
|
||||
let aggregated_item = crate::models::ResponseItem::Message {
|
||||
let aggregated_message = crate::models::ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![crate::models::ContentItem::OutputText {
|
||||
text: std::mem::take(&mut this.cumulative),
|
||||
}],
|
||||
};
|
||||
this.pending
|
||||
.push_back(ResponseEvent::OutputItemDone(aggregated_message));
|
||||
emitted_any = true;
|
||||
}
|
||||
|
||||
// Buffer Completed so it is returned *after* the aggregated message.
|
||||
this.pending_completed = Some(ResponseEvent::Completed {
|
||||
response_id,
|
||||
token_usage,
|
||||
// Always emit Completed last when anything was aggregated.
|
||||
if emitted_any {
|
||||
this.pending.push_back(ResponseEvent::Completed {
|
||||
response_id: response_id.clone(),
|
||||
token_usage: token_usage.clone(),
|
||||
});
|
||||
|
||||
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(
|
||||
aggregated_item,
|
||||
))));
|
||||
// Return the first pending event now.
|
||||
if let Some(ev) = this.pending.pop_front() {
|
||||
return Poll::Ready(Some(Ok(ev)));
|
||||
}
|
||||
}
|
||||
|
||||
// Nothing aggregated – forward Completed directly.
|
||||
@@ -439,11 +508,25 @@ where
|
||||
// will never appear in a Chat Completions stream.
|
||||
continue;
|
||||
}
|
||||
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(_))))
|
||||
| Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => {
|
||||
// Deltas are ignored here since aggregation waits for the
|
||||
// final OutputItemDone.
|
||||
continue;
|
||||
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))) => {
|
||||
// Always accumulate deltas so we can emit a final OutputItemDone at Completed.
|
||||
this.cumulative.push_str(&delta);
|
||||
if this.streaming_mode {
|
||||
// In streaming mode, also forward the delta immediately.
|
||||
return Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta))));
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(delta)))) => {
|
||||
// Always accumulate reasoning deltas so we can emit a final Reasoning item at Completed.
|
||||
this.cumulative_reasoning.push_str(&delta);
|
||||
if this.streaming_mode {
|
||||
// In streaming mode, also forward the delta immediately.
|
||||
return Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(delta))));
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -475,9 +558,23 @@ pub(crate) trait AggregateStreamExt: Stream<Item = Result<ResponseEvent>> + Size
|
||||
AggregatedChatStream {
|
||||
inner: self,
|
||||
cumulative: String::new(),
|
||||
pending_completed: None,
|
||||
cumulative_reasoning: String::new(),
|
||||
pending: std::collections::VecDeque::new(),
|
||||
streaming_mode: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> AggregateStreamExt for T where T: Stream<Item = Result<ResponseEvent>> + Sized {}
|
||||
|
||||
impl<S> AggregatedChatStream<S> {
|
||||
pub(crate) fn streaming_mode(inner: S) -> Self {
|
||||
AggregatedChatStream {
|
||||
inner,
|
||||
cumulative: String::new(),
|
||||
cumulative_reasoning: String::new(),
|
||||
pending: std::collections::VecDeque::new(),
|
||||
streaming_mode: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -93,7 +93,11 @@ impl ModelClient {
|
||||
// Wrap it with the aggregation adapter so callers see *only*
|
||||
// the final assistant message per turn (matching the
|
||||
// behaviour of the Responses API).
|
||||
let mut aggregated = response_stream.aggregate();
|
||||
let mut aggregated = if self.config.show_reasoning_content {
|
||||
crate::chat_completions::AggregatedChatStream::streaming_mode(response_stream)
|
||||
} else {
|
||||
response_stream.aggregate()
|
||||
};
|
||||
|
||||
// Bridge the aggregated stream back into a standard
|
||||
// `ResponseStream` by forwarding events through a channel.
|
||||
@@ -438,7 +442,7 @@ async fn process_sse<S>(
|
||||
}
|
||||
}
|
||||
}
|
||||
"response.reasoning_summary_text.delta" => {
|
||||
"response.reasoning_summary_text.delta" | "response.reasoning_text.delta" => {
|
||||
if let Some(delta) = event.delta {
|
||||
let event = ResponseEvent::ReasoningSummaryDelta(delta);
|
||||
if tx_event.send(Ok(event)).await.is_err() {
|
||||
|
||||
Reference in New Issue
Block a user