mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
Merge branch 'main' into codex/implement-tests-for-json-payload-construction
This commit is contained in:
47
codex-rs/Cargo.lock
generated
47
codex-rs/Cargo.lock
generated
@@ -250,6 +250,28 @@ dependencies = [
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-stream"
|
||||
version = "0.3.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
|
||||
dependencies = [
|
||||
"async-stream-impl",
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-stream-impl"
|
||||
version = "0.3.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.104",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.88"
|
||||
@@ -654,6 +676,7 @@ dependencies = [
|
||||
"thiserror 2.0.12",
|
||||
"time",
|
||||
"tokio",
|
||||
"tokio-test",
|
||||
"tokio-util",
|
||||
"toml 0.9.1",
|
||||
"tracing",
|
||||
@@ -4516,6 +4539,30 @@ dependencies = [
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-stream"
|
||||
version = "0.1.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-test"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-util"
|
||||
version = "0.7.15"
|
||||
|
||||
@@ -64,4 +64,5 @@ maplit = "1.0.2"
|
||||
predicates = "3"
|
||||
pretty_assertions = "1.4.1"
|
||||
tempfile = "3"
|
||||
tokio-test = "0.4"
|
||||
wiremock = "0.6"
|
||||
|
||||
@@ -393,34 +393,30 @@ async fn stream_from_fixture(path: impl AsRef<Path>) -> Result<ResponseStream> {
|
||||
Ok(ResponseStream { rx_event })
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#![allow(clippy::unwrap_used, clippy::print_stdout, clippy::expect_used)]
|
||||
|
||||
use super::*;
|
||||
use crate::client_common::Prompt;
|
||||
use crate::config::Config;
|
||||
use crate::config::ConfigOverrides;
|
||||
use crate::config::ConfigToml;
|
||||
use crate::config_types::ReasoningEffort as ReasoningEffortConfig;
|
||||
use crate::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
use crate::config::{Config, ConfigOverrides, ConfigToml};
|
||||
use crate::config_types::{
|
||||
ReasoningEffort as ReasoningEffortConfig, ReasoningSummary as ReasoningSummaryConfig,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use reqwest::StatusCode;
|
||||
use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
use tempfile::TempDir;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_test::io::Builder as IoBuilder;
|
||||
use tokio_util::io::ReaderStream;
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::Request;
|
||||
use wiremock::Respond;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
use wiremock::{
|
||||
matchers::{method, path},
|
||||
Mock, MockServer, Request, Respond, ResponseTemplate,
|
||||
};
|
||||
|
||||
// ─────────────────────────── Helpers ───────────────────────────
|
||||
|
||||
@@ -465,6 +461,28 @@ mod tests {
|
||||
)
|
||||
}
|
||||
|
||||
/// Runs the SSE parser on pre-chunked byte slices and returns every event
|
||||
/// (including any final `Err` from a stream-closure check).
|
||||
async fn collect_events(chunks: &[&[u8]]) -> Vec<Result<ResponseEvent>> {
|
||||
let mut builder = IoBuilder::new();
|
||||
for chunk in chunks {
|
||||
builder.read(chunk);
|
||||
}
|
||||
|
||||
let reader = builder.build();
|
||||
let stream = ReaderStream::new(reader).map_err(CodexErr::Io);
|
||||
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(16);
|
||||
tokio::spawn(process_sse(stream, tx));
|
||||
|
||||
let mut events = Vec::new();
|
||||
while let Some(ev) = rx.recv().await {
|
||||
events.push(ev);
|
||||
}
|
||||
events
|
||||
}
|
||||
|
||||
/// Builds an in-memory SSE stream from JSON fixtures and returns only the
|
||||
/// successfully parsed events (panics on internal channel errors).
|
||||
async fn run_sse(events: Vec<serde_json::Value>) -> Vec<ResponseEvent> {
|
||||
let mut body = String::new();
|
||||
for e in events {
|
||||
@@ -478,9 +496,11 @@ mod tests {
|
||||
body.push_str(&format!("event: {kind}\ndata: {e}\n\n"));
|
||||
}
|
||||
}
|
||||
|
||||
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(8);
|
||||
let stream = ReaderStream::new(std::io::Cursor::new(body)).map_err(CodexErr::Io);
|
||||
tokio::spawn(process_sse(stream, tx));
|
||||
|
||||
let mut out = Vec::new();
|
||||
while let Some(ev) = rx.recv().await {
|
||||
out.push(ev.expect("channel closed"));
|
||||
@@ -488,7 +508,7 @@ mod tests {
|
||||
out
|
||||
}
|
||||
|
||||
// ─────────────── Retry / back-off behaviour tests ───────────────
|
||||
// ───────────── Retry / back-off behaviour tests ─────────────
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn retries_once_on_server_error() {
|
||||
@@ -500,8 +520,7 @@ mod tests {
|
||||
struct SeqResponder;
|
||||
impl Respond for SeqResponder {
|
||||
fn respond(&self, _req: &Request) -> ResponseTemplate {
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
static CALLS: AtomicUsize = AtomicUsize::new(0);
|
||||
let n = CALLS.fetch_add(1, Ordering::SeqCst);
|
||||
if n == 0 {
|
||||
@@ -651,7 +670,101 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
// ─────────── Table-driven SSE event-kind tests ───────────
|
||||
// ───────────────────────────
|
||||
// SSE-parser tests
|
||||
// ───────────────────────────
|
||||
|
||||
#[tokio::test]
|
||||
async fn parses_items_and_completed() {
|
||||
let item1 = json!({
|
||||
"type": "response.output_item.done",
|
||||
"item": {
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "output_text", "text": "Hello"}]
|
||||
}
|
||||
})
|
||||
.to_string();
|
||||
|
||||
let item2 = json!({
|
||||
"type": "response.output_item.done",
|
||||
"item": {
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "output_text", "text": "World"}]
|
||||
}
|
||||
})
|
||||
.to_string();
|
||||
|
||||
let completed = json!({
|
||||
"type": "response.completed",
|
||||
"response": { "id": "resp1" }
|
||||
})
|
||||
.to_string();
|
||||
|
||||
let sse1 = format!("event: response.output_item.done\ndata: {item1}\n\n");
|
||||
let sse2 = format!("event: response.output_item.done\ndata: {item2}\n\n");
|
||||
let sse3 = format!("event: response.completed\ndata: {completed}\n\n");
|
||||
|
||||
let events = collect_events(&[sse1.as_bytes(), sse2.as_bytes(), sse3.as_bytes()]).await;
|
||||
|
||||
assert_eq!(events.len(), 3);
|
||||
|
||||
assert!(matches!(
|
||||
&events[0],
|
||||
Ok(ResponseEvent::OutputItemDone(ResponseItem::Message { role, .. }))
|
||||
if role == "assistant"
|
||||
));
|
||||
|
||||
assert!(matches!(
|
||||
&events[1],
|
||||
Ok(ResponseEvent::OutputItemDone(ResponseItem::Message { role, .. }))
|
||||
if role == "assistant"
|
||||
));
|
||||
|
||||
match &events[2] {
|
||||
Ok(ResponseEvent::Completed {
|
||||
response_id,
|
||||
token_usage,
|
||||
}) => {
|
||||
assert_eq!(response_id, "resp1");
|
||||
assert!(token_usage.is_none());
|
||||
}
|
||||
other => panic!("unexpected third event: {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn error_when_missing_completed() {
|
||||
let item1 = json!({
|
||||
"type": "response.output_item.done",
|
||||
"item": {
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "output_text", "text": "Hello"}]
|
||||
}
|
||||
})
|
||||
.to_string();
|
||||
|
||||
let sse1 = format!("event: response.output_item.done\ndata: {item1}\n\n");
|
||||
|
||||
let events = collect_events(&[sse1.as_bytes()]).await;
|
||||
|
||||
assert_eq!(events.len(), 2);
|
||||
|
||||
assert!(matches!(events[0], Ok(ResponseEvent::OutputItemDone(_))));
|
||||
|
||||
match &events[1] {
|
||||
Err(CodexErr::Stream(msg)) => {
|
||||
assert_eq!(msg, "stream closed before response.completed")
|
||||
}
|
||||
other => panic!("unexpected second event: {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
// ───────────────────────────
|
||||
// Table-driven event-kind test
|
||||
// ───────────────────────────
|
||||
|
||||
#[tokio::test]
|
||||
async fn table_driven_event_kinds() {
|
||||
@@ -720,9 +833,14 @@ mod tests {
|
||||
for case in cases {
|
||||
let mut evs = vec![case.event];
|
||||
evs.push(completed.clone());
|
||||
|
||||
let out = run_sse(evs).await;
|
||||
assert_eq!(out.len(), case.expected_len, "case {}", case.name);
|
||||
assert!((case.expect_first)(&out[0]), "case {}", case.name);
|
||||
assert!(
|
||||
(case.expect_first)(&out[0]),
|
||||
"first event mismatch in case {}",
|
||||
case.name
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user