This commit is contained in:
jif-oai
2025-11-13 16:11:10 +01:00
parent def4a996c4
commit 38b568b504

View File

@@ -127,73 +127,104 @@ pub(crate) async fn stream_chat_completions(
{
reasoning_by_anchor_index
.entry(idx - 1)
.and_modify(|existing| existing.push_str(text.as_str()))
.and_modify(|v| v.push_str(text.as_str()))
.or_insert(text.clone());
attached = true;
}
// Otherwise, attach to the first future assistant anchor.
if !attached {
for anchor_idx in idx + 1..input.len() {
match &input[anchor_idx] {
ResponseItem::Message { role, .. } if role == "assistant" => {
reasoning_by_anchor_index
.entry(anchor_idx)
.and_modify(|existing| existing.push_str(text.as_str()))
.or_insert(text.clone());
attached = true;
break;
}
ResponseItem::FunctionCall { .. }
| ResponseItem::LocalShellCall { .. }
| ResponseItem::FunctionCallOutput { .. } => {
continue;
}
_ => break,
// Otherwise, attach to immediate next assistant anchor (tool-calls or assistant message)
if !attached && idx + 1 < input.len() {
match &input[idx + 1] {
ResponseItem::FunctionCall { .. } | ResponseItem::LocalShellCall { .. } => {
reasoning_by_anchor_index
.entry(idx + 1)
.and_modify(|v| v.push_str(text.as_str()))
.or_insert(text.clone());
}
ResponseItem::Message { role, .. } if role == "assistant" => {
reasoning_by_anchor_index
.entry(idx + 1)
.and_modify(|v| v.push_str(text.as_str()))
.or_insert(text.clone());
}
_ => {}
}
}
// Either attached or dropped, move on.
}
}
}
for (index, item) in input.iter().enumerate() {
// Track last assistant text we emitted to avoid duplicate assistant messages
// in the outbound Chat Completions payload (can happen if a final
// aggregated assistant message was recorded alongside an earlier partial).
let mut last_assistant_text: Option<String> = None;
for (idx, item) in input.iter().enumerate() {
match item {
ResponseItem::Message { role, content, .. } => {
let mut content_text = String::new();
for item in content {
match item {
ContentItem::InputText { text } | ContentItem::OutputText { text } => {
content_text.push_str(text)
// Build content either as a plain string (typical for assistant text)
// or as an array of content items when images are present (user/tool multimodal).
let mut text = String::new();
let mut items: Vec<serde_json::Value> = Vec::new();
let mut saw_image = false;
for c in content {
match c {
ContentItem::InputText { text: t }
| ContentItem::OutputText { text: t } => {
text.push_str(t);
items.push(json!({"type":"text","text": t}));
}
ContentItem::InputImage { image_url } => {
saw_image = true;
items.push(json!({"type":"image_url","image_url": {"url": image_url}}));
}
ContentItem::InputImage { .. } => {}
}
}
if content_text.trim().is_empty() {
continue;
// Skip exact-duplicate assistant messages.
if role == "assistant" {
if let Some(prev) = &last_assistant_text
&& prev == &text
{
continue;
}
last_assistant_text = Some(text.clone());
}
// Append reasoning when mapped to this anchor.
if let Some(reasoning) = reasoning_by_anchor_index.remove(&index) {
content_text.push_str(reasoning.as_str());
}
// For assistant messages, always send a plain string for compatibility.
// For user messages, if an image is present, send an array of content items.
let content_value = if role == "assistant" {
json!(text)
} else if saw_image {
json!(items)
} else {
json!(text)
};
messages.push(json!({
let mut msg = json!({
"role": role,
"content": content_text,
}));
}
"content": content_value
});
if role == "assistant"
&& let Some(reasoning) = reasoning_by_anchor_index.get(&idx)
&& let Some(obj) = msg.as_object_mut()
{
obj.insert("reasoning".to_string(), json!(reasoning));
}
messages.push(msg);
}
ResponseItem::FunctionCall {
name,
arguments,
call_id,
..
} => {
messages.push(json!({
let mut msg = json!({
"role": "assistant",
"content": null,
"tool_calls": [{
"id": call_id,
"type": "function",
@@ -202,7 +233,41 @@ pub(crate) async fn stream_chat_completions(
"arguments": arguments,
}
}],
}));
});
if let Some(reasoning) = reasoning_by_anchor_index.get(&idx)
&& let Some(obj) = msg.as_object_mut()
{
obj.insert("reasoning".to_string(), json!(reasoning));
}
messages.push(msg);
}
ResponseItem::LocalShellCall {
id,
call_id: _,
status,
action,
} => {
let mut msg = json!({
"role": "assistant",
"content": null,
"tool_calls": [{
"id": id.clone().unwrap_or_else(|| "".to_string()),
"type": "local_shell_call",
"status": status,
"action": action,
}]
});
if let Some(reasoning) = reasoning_by_anchor_index.get(&idx)
&& let Some(obj) = msg.as_object_mut()
{
obj.insert("reasoning".to_string(), json!(reasoning));
}
messages.push(msg);
}
ResponseItem::FunctionCallOutput { call_id, output } => {
@@ -230,43 +295,28 @@ pub(crate) async fn stream_chat_completions(
"tool_call_id": call_id,
"content": content_value,
}));
if let Some(reasoning) = reasoning_by_anchor_index.remove(&index) {
messages.push(json!({
"role": "assistant",
"content": reasoning,
}));
}
}
ResponseItem::LocalShellCall { call_id, .. } => {
ResponseItem::CustomToolCall {
id,
call_id: _,
name,
input,
status: _,
} => {
messages.push(json!({
"role": "assistant",
"content": null,
"tool_calls": [{
"id": call_id,
"type": "function",
"function": {
"name": "shell",
"arguments": "{}", // arguments are defined via `input` only.
}
}],
}));
}
ResponseItem::CustomToolCall { call_id, name, .. } => {
messages.push(json!({
"role": "assistant",
"tool_calls": [{
"id": call_id,
"type": "function",
"function": {
"id": id,
"type": "custom",
"custom": {
"name": name,
"arguments": "{}", // arguments are defined via `input` only.
"input": input,
}
}],
}));
}
ResponseItem::CustomToolCallOutput { call_id, output } => {
messages.push(json!({
"role": "tool",
@@ -276,24 +326,18 @@ pub(crate) async fn stream_chat_completions(
}
ResponseItem::Reasoning { .. } => {
// Reasoning is mapped onto adjacent assistant anchors above.
// Omit from conversation history; reasoning is attached to anchors above.
continue;
}
ResponseItem::WebSearchCall { .. } => {}
ResponseItem::WebSearchCall { .. } | ResponseItem::Other => {
continue;
}
ResponseItem::Other { .. } => {}
ResponseItem::GhostSnapshot { .. } => {}
}
}
// Attach any reasoning still not mapped (e.g., if the last input items are Reasoning).
if messages.len() == 1 {
if let Some(text) = reasoning_by_anchor_index.remove(&0) {
messages.push(json!({
"role": "assistant",
"content": text,
}));
ResponseItem::GhostSnapshot { .. } => {
// Ghost snapshots annotate history but are not sent to the model.
continue;
}
}
}
@@ -718,28 +762,34 @@ async fn process_chat_sse<S>(
}
}
/// Adapter that aggregates Chat Completions SSE output into the final assistant
/// message plus optional reasoning, mirroring the Responses API contract.
pub(crate) struct AggregatedChatStream<S>
where
S: Stream<Item = Result<ResponseEvent>> + Unpin,
{
inner: S,
pending: std::collections::VecDeque<ResponseEvent>,
finished: bool,
/// Optional client-side aggregation helper
///
/// Stream adapter that merges the incremental `OutputItemDone` chunks coming from
/// [`process_chat_sse`] into a *running* assistant message, **suppressing the
/// per-token deltas**. The stream stays silent while the model is thinking
/// and only emits two events per turn:
///
/// 1. `ResponseEvent::OutputItemDone` with the *complete* assistant message
/// (fully concatenated).
/// 2. The original `ResponseEvent::Completed` right after it.
///
/// This mirrors the behaviour the TypeScript CLI exposes to its higher layers.
///
/// The adapter is intentionally *lossless*: callers who do **not** opt in via
/// [`AggregateStreamExt::aggregate()`] keep receiving the original unmodified
/// events.
#[derive(Copy, Clone, Eq, PartialEq)]
enum AggregateMode {
AggregatedOnly,
Streaming,
}
impl<S> AggregatedChatStream<S>
where
S: Stream<Item = Result<ResponseEvent>> + Unpin,
{
pub fn streaming_mode(inner: S) -> Self {
Self {
inner,
pending: std::collections::VecDeque::new(),
finished: false,
}
}
pub(crate) struct AggregatedChatStream<S> {
inner: S,
cumulative: String,
cumulative_reasoning: String,
pending: std::collections::VecDeque<ResponseEvent>,
mode: AggregateMode,
}
impl<S> Stream for AggregatedChatStream<S>
@@ -751,44 +801,152 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(event) = this.pending.pop_front() {
return Poll::Ready(Some(Ok(event)));
}
if this.finished {
return Poll::Ready(None);
// First, flush any buffered events from the previous call.
if let Some(ev) = this.pending.pop_front() {
return Poll::Ready(Some(Ok(ev)));
}
loop {
match Pin::new(&mut this.inner).poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item)))) => {
this.pending.push_back(ResponseEvent::OutputItemDone(item));
continue;
// If this is an incremental assistant message chunk, accumulate but
// do NOT emit yet. Forward any other item (e.g. FunctionCall) right
// away so downstream consumers see it.
let is_assistant_message = matches!(
&item,
ResponseItem::Message { role, .. } if role == "assistant"
);
if is_assistant_message {
match this.mode {
AggregateMode::AggregatedOnly => {
// Only use the final assistant message if we have not
// seen any deltas; otherwise, deltas already built the
// cumulative text and this would duplicate it.
if this.cumulative.is_empty()
&& let ResponseItem::Message {
content,
..
} = &item
&& let Some(text) = content.iter().find_map(|c| match c {
ContentItem::OutputText { text } => Some(text),
_ => None,
})
{
this.cumulative.push_str(text);
}
// Swallow assistant message here; emit on Completed.
continue;
}
AggregateMode::Streaming => {
// In streaming mode, if we have not seen any deltas, forward
// the final assistant message directly. If deltas were seen,
// suppress the final message to avoid duplication.
if this.cumulative.is_empty() {
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(
item,
))));
}
continue;
}
}
}
// Not an assistant message forward immediately.
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item))));
}
Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot)))) => {
this.pending.push_back(ResponseEvent::RateLimits(snapshot));
continue;
return Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot))));
}
Poll::Ready(Some(Ok(ResponseEvent::Completed {
response_id,
token_usage,
}))) => {
this.pending.push_back(ResponseEvent::Completed {
// Build any aggregated items in the correct order: Reasoning first, then Message.
let mut emitted_any = false;
if !this.cumulative_reasoning.is_empty()
&& matches!(this.mode, AggregateMode::AggregatedOnly)
{
let aggregated_reasoning = ResponseItem::Reasoning {
id: String::new(),
summary: Vec::new(),
content: Some(vec![ReasoningItemContent::ReasoningText {
text: std::mem::take(&mut this.cumulative_reasoning),
}]),
encrypted_content: None,
};
this.pending
.push_back(ResponseEvent::OutputItemDone(aggregated_reasoning));
emitted_any = true;
}
// Always emit the final aggregated assistant message when any
// content deltas have been observed. In AggregatedOnly mode this
// is the sole assistant output; in Streaming mode this finalizes
// the streamed deltas into a terminal OutputItemDone so callers
// can persist/render the message once per turn.
if !this.cumulative.is_empty() {
let aggregated_message = ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: std::mem::take(&mut this.cumulative),
}],
};
this.pending
.push_back(ResponseEvent::OutputItemDone(aggregated_message));
emitted_any = true;
}
// Always emit Completed last when anything was aggregated.
if emitted_any {
this.pending.push_back(ResponseEvent::Completed {
response_id: response_id.clone(),
token_usage: token_usage.clone(),
});
if let Some(ev) = this.pending.pop_front() {
return Poll::Ready(Some(Ok(ev)));
}
}
// Nothing aggregated forward Completed directly.
return Poll::Ready(Some(Ok(ResponseEvent::Completed {
response_id,
token_usage,
});
this.finished = true;
return Poll::Ready(this.pending.pop_front().map(Ok));
})));
}
Poll::Ready(Some(Ok(other))) => {
this.pending.push_back(other);
Poll::Ready(Some(Ok(ResponseEvent::Created))) => {
// These events are exclusive to the Responses API and
// will never appear in a Chat Completions stream.
continue;
}
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(None) => {
this.finished = true;
return Poll::Ready(None);
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))) => {
// Always accumulate deltas so we can emit a final OutputItemDone at Completed.
this.cumulative.push_str(&delta);
if matches!(this.mode, AggregateMode::Streaming) {
// In streaming mode, also forward the delta immediately.
return Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta))));
}
}
Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta(delta)))) => {
// Always accumulate reasoning deltas so we can emit a final Reasoning item at Completed.
this.cumulative_reasoning.push_str(&delta);
if matches!(this.mode, AggregateMode::Streaming) {
// In streaming mode, also forward the delta immediately.
return Poll::Ready(Some(Ok(
ResponseEvent::ReasoningContentDelta(delta),
)));
}
}
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => {}
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryPartAdded))) => {}
Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item)))) => {
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item))));
}
}
}
@@ -796,12 +954,36 @@ where
}
/// Extension trait that activates aggregation on any stream of [`ResponseEvent`].
pub(crate) trait AggregateStreamExt:
Stream<Item = Result<ResponseEvent>> + Sized + Unpin
{
pub(crate) trait AggregateStreamExt: Stream<Item = Result<ResponseEvent>> + Sized {
/// Returns a new stream that emits **only** the final assistant message
/// per turn instead of every incremental delta. The produced
/// `ResponseEvent` sequence for a typical text turn looks like:
///
/// ```ignore
/// OutputItemDone(<full message>)
/// Completed
/// ```
///
/// No other `OutputItemDone` events will be seen by the caller.
fn aggregate(self) -> AggregatedChatStream<Self> {
AggregatedChatStream::streaming_mode(self)
AggregatedChatStream::new(self, AggregateMode::AggregatedOnly)
}
}
impl<T> AggregateStreamExt for T where T: Stream<Item = Result<ResponseEvent>> + Sized + Unpin {}
impl<T> AggregateStreamExt for T where T: Stream<Item = Result<ResponseEvent>> + Sized {}
impl<S> AggregatedChatStream<S> {
fn new(inner: S, mode: AggregateMode) -> Self {
AggregatedChatStream {
inner,
cumulative: String::new(),
cumulative_reasoning: String::new(),
pending: std::collections::VecDeque::new(),
mode,
}
}
pub(crate) fn streaming_mode(inner: S) -> Self {
Self::new(inner, AggregateMode::Streaming)
}
}