mirror of
https://github.com/openai/codex.git
synced 2026-04-24 22:54:54 +00:00
Fix test helper to avoid unwrap
This commit is contained in:
@@ -462,3 +462,98 @@ pub(crate) trait AggregateStreamExt: Stream<Item = Result<ResponseEvent>> + Size
|
||||
}
|
||||
|
||||
impl<T> AggregateStreamExt for T where T: Stream<Item = Result<ResponseEvent>> + Sized {}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use futures::StreamExt;
|
||||
use tokio_util::io::ReaderStream;
|
||||
|
||||
/// Helper that feeds the provided SSE content into `process_chat_sse` and
|
||||
/// collects all resulting `ResponseEvent`s.
|
||||
async fn collect_events(sse: &str, aggregate: bool) -> Vec<ResponseEvent> {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<Result<ResponseEvent>>(16);
|
||||
|
||||
// ReaderStream turns the in-memory string into an async byte stream.
|
||||
let stream = ReaderStream::new(std::io::Cursor::new(sse.to_string())).map_err(CodexErr::Io);
|
||||
|
||||
tokio::spawn(process_chat_sse(stream, tx));
|
||||
|
||||
let response_stream = ResponseStream { rx_event: rx };
|
||||
let mut stream: Pin<Box<dyn Stream<Item = Result<ResponseEvent>>>> = if aggregate {
|
||||
Box::pin(response_stream.aggregate())
|
||||
} else {
|
||||
Box::pin(response_stream)
|
||||
};
|
||||
|
||||
let mut events = Vec::new();
|
||||
while let Some(ev) = stream.next().await {
|
||||
match ev {
|
||||
Ok(ev) => events.push(ev),
|
||||
Err(err) => panic!("stream error: {err}"),
|
||||
}
|
||||
}
|
||||
events
|
||||
}
|
||||
|
||||
/// Split assistant message chunks are concatenated into a single message and
|
||||
/// followed by a Completed event.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn aggregates_text_deltas() {
|
||||
let sse = concat!(
|
||||
"data: {\"id\":\"1\",\"choices\":[{\"delta\":{\"content\":\"Hello\"},\"index\":0,\"finish_reason\":null}]}\n\n",
|
||||
"data: {\"id\":\"1\",\"choices\":[{\"delta\":{\"content\":\" world\"},\"index\":0,\"finish_reason\":null}]}\n\n",
|
||||
"data: {\"id\":\"1\",\"choices\":[{\"delta\":{},\"index\":0,\"finish_reason\":\"stop\"}],\"usage\":{}}\n\n"
|
||||
);
|
||||
|
||||
let events = collect_events(sse, true).await;
|
||||
|
||||
assert_eq!(events.len(), 2, "expected aggregated message and Completed");
|
||||
|
||||
match &events[0] {
|
||||
ResponseEvent::OutputItemDone(ResponseItem::Message { role, content }) => {
|
||||
assert_eq!(role, "assistant");
|
||||
assert_eq!(content.len(), 1);
|
||||
if let crate::models::ContentItem::OutputText { text } = &content[0] {
|
||||
assert_eq!(text, "Hello world");
|
||||
} else {
|
||||
panic!("unexpected content variant");
|
||||
}
|
||||
}
|
||||
other => panic!("unexpected first event: {other:?}"),
|
||||
}
|
||||
|
||||
assert!(matches!(events[1], ResponseEvent::Completed { .. }));
|
||||
}
|
||||
|
||||
/// Function call arguments arriving in multiple chunks are merged into a
|
||||
/// single `FunctionCall` item and the stream ends with `Completed`.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn merges_function_call_arguments() {
|
||||
let sse = concat!(
|
||||
"data: {\"id\":\"chatcmpl-123\",\"model\":\"gpt-4o\",\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_123\",\"type\":\"function\",\"function\":{\"name\":\"get_weather\"}}]},\"finish_reason\":null,\"index\":0}]}\n\n",
|
||||
"data: {\"id\":\"chatcmpl-123\",\"model\":\"gpt-4o\",\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"arguments\":\"{\\\"location\\\":\\\"San Franci\"}}]},\"finish_reason\":null,\"index\":0}]}\n\n",
|
||||
"data: {\"id\":\"chatcmpl-123\",\"model\":\"gpt-4o\",\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"arguments\":\"sco\\\"}\"}}]},\"finish_reason\":null,\"index\":0}]}\n\n",
|
||||
"data: {\"id\":\"chatcmpl-123\",\"model\":\"gpt-4o\",\"choices\":[{\"delta\":{},\"finish_reason\":\"tool_calls\",\"index\":0}],\"usage\":{}}\n\n"
|
||||
);
|
||||
|
||||
let events = collect_events(sse, false).await;
|
||||
|
||||
assert_eq!(events.len(), 2, "function call and Completed expected");
|
||||
|
||||
match &events[0] {
|
||||
ResponseEvent::OutputItemDone(ResponseItem::FunctionCall {
|
||||
name,
|
||||
arguments,
|
||||
call_id,
|
||||
}) => {
|
||||
assert_eq!(name, "get_weather");
|
||||
assert_eq!(call_id, "call_123");
|
||||
assert_eq!(arguments, "{\"location\":\"San Francisco\"}");
|
||||
}
|
||||
other => panic!("unexpected first event: {other:?}"),
|
||||
}
|
||||
|
||||
assert!(matches!(events[1], ResponseEvent::Completed { .. }));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user