Compare commits

...

1 Commits

Author SHA1 Message Date
aibrahim-oai
f64f5ca796 Fix test helper to avoid unwrap 2025-07-11 12:16:46 -07:00

View File

@@ -462,3 +462,98 @@ pub(crate) trait AggregateStreamExt: Stream<Item = Result<ResponseEvent>> + Size
}
impl<T> AggregateStreamExt for T where T: Stream<Item = Result<ResponseEvent>> + Sized {}
#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;
use tokio_util::io::ReaderStream;
/// Helper that feeds the provided SSE content into `process_chat_sse` and
/// collects all resulting `ResponseEvent`s.
async fn collect_events(sse: &str, aggregate: bool) -> Vec<ResponseEvent> {
let (tx, rx) = tokio::sync::mpsc::channel::<Result<ResponseEvent>>(16);
// ReaderStream turns the in-memory string into an async byte stream.
let stream = ReaderStream::new(std::io::Cursor::new(sse.to_string())).map_err(CodexErr::Io);
tokio::spawn(process_chat_sse(stream, tx));
let response_stream = ResponseStream { rx_event: rx };
let mut stream: Pin<Box<dyn Stream<Item = Result<ResponseEvent>>>> = if aggregate {
Box::pin(response_stream.aggregate())
} else {
Box::pin(response_stream)
};
let mut events = Vec::new();
while let Some(ev) = stream.next().await {
match ev {
Ok(ev) => events.push(ev),
Err(err) => panic!("stream error: {err}"),
}
}
events
}
/// Split assistant message chunks are concatenated into a single message and
/// followed by a Completed event.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn aggregates_text_deltas() {
let sse = concat!(
"data: {\"id\":\"1\",\"choices\":[{\"delta\":{\"content\":\"Hello\"},\"index\":0,\"finish_reason\":null}]}\n\n",
"data: {\"id\":\"1\",\"choices\":[{\"delta\":{\"content\":\" world\"},\"index\":0,\"finish_reason\":null}]}\n\n",
"data: {\"id\":\"1\",\"choices\":[{\"delta\":{},\"index\":0,\"finish_reason\":\"stop\"}],\"usage\":{}}\n\n"
);
let events = collect_events(sse, true).await;
assert_eq!(events.len(), 2, "expected aggregated message and Completed");
match &events[0] {
ResponseEvent::OutputItemDone(ResponseItem::Message { role, content }) => {
assert_eq!(role, "assistant");
assert_eq!(content.len(), 1);
if let crate::models::ContentItem::OutputText { text } = &content[0] {
assert_eq!(text, "Hello world");
} else {
panic!("unexpected content variant");
}
}
other => panic!("unexpected first event: {other:?}"),
}
assert!(matches!(events[1], ResponseEvent::Completed { .. }));
}
/// Function call arguments arriving in multiple chunks are merged into a
/// single `FunctionCall` item and the stream ends with `Completed`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn merges_function_call_arguments() {
let sse = concat!(
"data: {\"id\":\"chatcmpl-123\",\"model\":\"gpt-4o\",\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_123\",\"type\":\"function\",\"function\":{\"name\":\"get_weather\"}}]},\"finish_reason\":null,\"index\":0}]}\n\n",
"data: {\"id\":\"chatcmpl-123\",\"model\":\"gpt-4o\",\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"arguments\":\"{\\\"location\\\":\\\"San Franci\"}}]},\"finish_reason\":null,\"index\":0}]}\n\n",
"data: {\"id\":\"chatcmpl-123\",\"model\":\"gpt-4o\",\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"arguments\":\"sco\\\"}\"}}]},\"finish_reason\":null,\"index\":0}]}\n\n",
"data: {\"id\":\"chatcmpl-123\",\"model\":\"gpt-4o\",\"choices\":[{\"delta\":{},\"finish_reason\":\"tool_calls\",\"index\":0}],\"usage\":{}}\n\n"
);
let events = collect_events(sse, false).await;
assert_eq!(events.len(), 2, "function call and Completed expected");
match &events[0] {
ResponseEvent::OutputItemDone(ResponseItem::FunctionCall {
name,
arguments,
call_id,
}) => {
assert_eq!(name, "get_weather");
assert_eq!(call_id, "call_123");
assert_eq!(arguments, "{\"location\":\"San Francisco\"}");
}
other => panic!("unexpected first event: {other:?}"),
}
assert!(matches!(events[1], ResponseEvent::Completed { .. }));
}
}