Compare commits

...

3 Commits

Author SHA1 Message Date
aibrahim-oai
e2343e710b Fix SSE parser test clippy issues 2025-07-11 13:19:32 -07:00
aibrahim-oai
64897d9083 Format SSE parser test 2025-07-11 12:09:42 -07:00
aibrahim-oai
e274cd04de test: ensure chat completions SSE parser merges chunks 2025-07-11 12:01:27 -07:00

View File

@@ -462,3 +462,66 @@ pub(crate) trait AggregateStreamExt: Stream<Item = Result<ResponseEvent>> + Size
}
impl<T> AggregateStreamExt for T where T: Stream<Item = Result<ResponseEvent>> + Sized {}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use futures::stream;
use tokio::sync::mpsc;
#[tokio::test]
async fn merges_function_call_chunks_and_completes() {
let chunks = vec![
"data: {\"choices\":[{\"delta\":{\"content\":\"Hello \"}}]}\n\n",
"data: {\"choices\":[{\"delta\":{\"content\":\"world\"}}]}\n\n",
"data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call1\",\"type\":\"function\",\"function\":{\"name\":\"foo\"}}]}}]}\n\n",
"data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"arguments\":\"{\\\"a\\\": \"}}]}}]}\n\n",
"data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"arguments\":\"1}\"}}]},\"finish_reason\":\"tool_calls\"}]}\n\n",
];
let byte_stream = stream::iter(chunks.into_iter().map(|s| Ok(Bytes::from(s))));
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(8);
tokio::spawn(process_chat_sse(byte_stream, tx));
let mut events = Vec::new();
while let Some(ev) = rx.recv().await {
match ev {
Ok(event) => events.push(event),
Err(e) => panic!("stream error: {e}"),
}
}
assert_eq!(events.len(), 4);
let mut text = String::new();
for (i, event) in events.iter().take(2).enumerate() {
match event {
ResponseEvent::OutputItemDone(ResponseItem::Message { role, content })
if role == "assistant" =>
{
if let Some(ContentItem::OutputText { text: t }) = content.first() {
text.push_str(t);
}
}
other => panic!("unexpected event {i}: {other:?}"),
}
}
assert_eq!(text, "Hello world");
match &events[2] {
ResponseEvent::OutputItemDone(ResponseItem::FunctionCall {
name,
arguments,
call_id,
}) => {
assert_eq!(name, "foo");
assert_eq!(call_id, "call1");
assert_eq!(arguments, "{\"a\": 1}");
}
other => panic!("unexpected third event: {other:?}"),
}
assert!(matches!(events[3], ResponseEvent::Completed { .. }));
}
}