Merge branch 'main' into codex/implement-tests-for-json-payload-construction

This commit is contained in:
aibrahim-oai
2025-07-12 17:26:48 -07:00
committed by GitHub
7 changed files with 237 additions and 54 deletions

View File

@@ -392,27 +392,36 @@ async fn stream_from_fixture(path: impl AsRef<Path>) -> Result<ResponseStream> {
Ok(ResponseStream { rx_event })
}
#[cfg(test)]
#[cfg(test)]
mod tests {
#![allow(clippy::unwrap_used, clippy::print_stdout)]
#![allow(
clippy::unwrap_used,
clippy::print_stdout,
clippy::expect_used
)]
use super::*;
use crate::client_common::Prompt;
use crate::config::Config;
use crate::config::ConfigOverrides;
use crate::config::ConfigToml;
use crate::config::{Config, ConfigOverrides, ConfigToml};
use crate::config_types::{
ReasoningEffort as ReasoningEffortConfig,
ReasoningSummary as ReasoningSummaryConfig,
};
use futures::StreamExt;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use reqwest::StatusCode;
use serde_json::json;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tempfile::TempDir;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::Request;
use wiremock::Respond;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
use tokio::sync::mpsc;
use tokio_util::io::ReaderStream;
use wiremock::{
matchers::{method, path},
Mock, MockServer, Request, Respond, ResponseTemplate,
};
// ─────────────────────────── Helpers ───────────────────────────
fn default_config(provider: ModelProviderInfo) -> Arc<Config> {
let codex_home = TempDir::new().unwrap();
@@ -450,24 +459,46 @@ mod tests {
fn sse_completed(id: &str) -> String {
format!(
"event: response.completed\n\
data: {{\"type\":\"response.completed\",\"response\":{{\"id\":\"{id}\",\"output\":[]}}}}\n\n\n"
data: {{\"type\":\"response.completed\",\"response\":{{\"id\":\"{id}\",\"output\":[]}}}}\n\n\n"
)
}
/// Feeds a sequence of JSON fixture events through [`process_sse`] and collects
/// every [`ResponseEvent`] it emits, in order.
///
/// Each event must be a JSON object with a string `type` field; that value is
/// written as the SSE `event:` name. An object whose only key is `type` is
/// emitted without a `data:` line, otherwise the whole object is serialised as
/// the `data:` payload.
///
/// # Panics
///
/// Panics if an event lacks a string `type` field, if the stream yields an
/// error instead of an event, or if the spawned `process_sse` task fails.
async fn run_sse(events: Vec<serde_json::Value>) -> Vec<ResponseEvent> {
    // Render the fixture events into the textual SSE wire format.
    let body: String = events
        .iter()
        .map(|event| {
            let kind = event
                .get("type")
                .and_then(|v| v.as_str())
                .expect("fixture event missing type");
            if matches!(event.as_object(), Some(fields) if fields.len() == 1) {
                format!("event: {kind}\n\n")
            } else {
                format!("event: {kind}\ndata: {event}\n\n")
            }
        })
        .collect();

    let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(8);
    let stream = ReaderStream::new(std::io::Cursor::new(body)).map_err(CodexErr::Io);
    // Keep the handle so a failure inside the task is reported instead of
    // silently showing up as a shorter-than-expected event list.
    let worker = tokio::spawn(process_sse(stream, tx));

    let mut out = Vec::new();
    // `recv` returns `None` once the sender held by the task has been dropped.
    while let Some(ev) = rx.recv().await {
        out.push(ev.expect("SSE stream produced an error"));
    }
    if let Err(err) = worker.await {
        panic!("process_sse task failed: {err}");
    }
    out
}
// ─────────────── Retry / back-off behaviour tests ───────────────
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn retries_once_on_server_error() {
if std::env::var(crate::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
return;
}
let server = MockServer::start().await;
struct SeqResponder;
impl Respond for SeqResponder {
fn respond(&self, _req: &Request) -> ResponseTemplate {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicUsize, Ordering};
static CALLS: AtomicUsize = AtomicUsize::new(0);
let n = CALLS.fetch_add(1, Ordering::SeqCst);
if n == 0 {
@@ -479,6 +510,7 @@ mod tests {
}
}
}
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(SeqResponder)
@@ -498,9 +530,6 @@ mod tests {
}
}
/// A 429 response that carries a `retry-after` header should cause the client to pause
/// for the specified delay before retrying. We record wall-clock times and assert that the
/// delta is ≥ the header value (1 s).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn retry_after_header_delay() {
if std::env::var(crate::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
@@ -508,6 +537,7 @@ mod tests {
}
let server = MockServer::start().await;
let times = Arc::new(Mutex::new(Vec::new()));
struct SeqResponder {
times: Arc<Mutex<Vec<Instant>>>,
}
@@ -524,6 +554,7 @@ mod tests {
}
}
}
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(SeqResponder {
@@ -544,12 +575,10 @@ mod tests {
}
}
let times = times.lock().unwrap();
assert!(times.len() == 2);
assert_eq!(times.len(), 2);
assert!(times[1] - times[0] >= Duration::from_secs(1));
}
/// A 429 without `retry-after` triggers the exponential back-off path. We assert that the
/// second attempt occurs at least 100 ms after the first (the current back-off floor).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn retry_backoff_no_header() {
if std::env::var(crate::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
@@ -557,6 +586,7 @@ mod tests {
}
let server = MockServer::start().await;
let times = Arc::new(Mutex::new(Vec::new()));
struct SeqResponder {
times: Arc<Mutex<Vec<Instant>>>,
}
@@ -573,6 +603,7 @@ mod tests {
}
}
}
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(SeqResponder {
@@ -593,18 +624,17 @@ mod tests {
}
}
let times = times.lock().unwrap();
assert!(times.len() == 2);
assert_eq!(times.len(), 2);
assert!(times[1] - times[0] >= Duration::from_millis(100));
}
/// Non-retryable 4xx errors should surface to the caller with their status code and body
/// intact; verify that behaviour with a `400 Bad Request` fixture.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn permanent_error_bubbles_body() {
if std::env::var(crate::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
return;
}
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(ResponseTemplate::new(400).set_body_string("bad"))
@@ -616,16 +646,88 @@ mod tests {
let client = create_test_client(&server);
let prompt = Prompt::default();
let res = client.stream(&prompt).await;
match res {
match client.stream(&prompt).await {
Ok(_) => panic!("expected error"),
Err(err) => match err {
CodexErr::UnexpectedStatus(code, body) => {
assert_eq!(code, StatusCode::BAD_REQUEST);
assert_eq!(body, "bad");
}
other => panic!("unexpected error: {other:?}"),
Err(CodexErr::UnexpectedStatus(code, body)) => {
assert_eq!(code, StatusCode::BAD_REQUEST);
assert_eq!(body, "bad");
}
Err(other) => panic!("unexpected error: {other:?}"),
}
}
// ─────────── Table-driven SSE event-kind tests ───────────
/// Table-driven check of how individual SSE event kinds are surfaced.
///
/// Every case sends its event followed by a `response.completed` event through
/// `run_sse`, then asserts on the number of emitted events and on the variant
/// of the first one.
#[tokio::test]
async fn table_driven_event_kinds() {
    /// One row of the table.
    struct Case {
        /// Label used in assertion messages.
        name: &'static str,
        /// Event placed in front of the terminating `response.completed`.
        event: serde_json::Value,
        /// Predicate the first emitted event must satisfy.
        expect_first: fn(&ResponseEvent) -> bool,
        /// Total number of events expected from the stream.
        expected_len: usize,
    }

    fn first_is_created(ev: &ResponseEvent) -> bool {
        matches!(ev, ResponseEvent::Created)
    }
    fn first_is_output_item(ev: &ResponseEvent) -> bool {
        matches!(ev, ResponseEvent::OutputItemDone(_))
    }
    fn first_is_completed(ev: &ResponseEvent) -> bool {
        matches!(ev, ResponseEvent::Completed { .. })
    }

    // Terminator appended to every case.
    let terminator = json!({
        "type": "response.completed",
        "response": {
            "id": "c",
            "usage": {
                "input_tokens": 0,
                "input_tokens_details": null,
                "output_tokens": 0,
                "output_tokens_details": null,
                "total_tokens": 0
            },
            "output": []
        }
    });

    let table = [
        Case {
            name: "created",
            event: json!({"type": "response.created", "response": {}}),
            expect_first: first_is_created,
            expected_len: 2,
        },
        Case {
            name: "output_item.done",
            event: json!({
                "type": "response.output_item.done",
                "item": {
                    "type": "message",
                    "role": "assistant",
                    "content": [
                        {"type": "output_text", "text": "hi"}
                    ]
                }
            }),
            expect_first: first_is_output_item,
            expected_len: 2,
        },
        // The unrecognised kind is expected to add nothing, leaving only the
        // completion event.
        Case {
            name: "unknown",
            event: json!({"type": "response.new_tool_event"}),
            expect_first: first_is_completed,
            expected_len: 1,
        },
    ];

    for Case {
        name,
        event,
        expect_first,
        expected_len,
    } in table
    {
        let emitted = run_sse(vec![event, terminator.clone()]).await;
        assert_eq!(emitted.len(), expected_len, "case {}", name);
        assert!(expect_first(&emitted[0]), "case {}", name);
    }
}
}

View File

@@ -0,0 +1,16 @@
[
{
"type": "response.completed",
"response": {
"id": "__ID__",
"usage": {
"input_tokens": 0,
"input_tokens_details": null,
"output_tokens": 0,
"output_tokens_details": null,
"total_tokens": 0
},
"output": []
}
}
]

View File

@@ -0,0 +1,3 @@
[
{"type": "response.output_item.done"}
]

View File

@@ -11,6 +11,7 @@ mod test_support;
use serde_json::Value;
use tempfile::TempDir;
use test_support::load_default_config_for_test;
use test_support::load_sse_fixture_with_id;
use tokio::time::timeout;
use wiremock::Match;
use wiremock::Mock;
@@ -42,12 +43,9 @@ impl Match for HasPrevId {
}
}
/// Build minimal SSE stream with completed marker.
/// Build minimal SSE stream with completed marker using the JSON fixture.
fn sse_completed(id: &str) -> String {
format!(
"event: response.completed\n\
data: {{\"type\":\"response.completed\",\"response\":{{\"id\":\"{id}\",\"output\":[]}}}}\n\n\n"
)
load_sse_fixture_with_id("tests/fixtures/completed_template.json", id)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]

View File

@@ -12,6 +12,8 @@ use codex_core::protocol::Op;
mod test_support;
use tempfile::TempDir;
use test_support::load_default_config_for_test;
use test_support::load_sse_fixture;
use test_support::load_sse_fixture_with_id;
use tokio::time::timeout;
use wiremock::Mock;
use wiremock::MockServer;
@@ -22,15 +24,11 @@ use wiremock::matchers::method;
use wiremock::matchers::path;
fn sse_incomplete() -> String {
// Only a single line; missing the completed event.
"event: response.output_item.done\n\n".to_string()
load_sse_fixture("tests/fixtures/incomplete_sse.json")
}
fn sse_completed(id: &str) -> String {
format!(
"event: response.completed\n\
data: {{\"type\":\"response.completed\",\"response\":{{\"id\":\"{id}\",\"output\":[]}}}}\n\n\n"
)
load_sse_fixture_with_id("tests/fixtures/completed_template.json", id)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]

View File

@@ -21,3 +21,58 @@ pub fn load_default_config_for_test(codex_home: &TempDir) -> Config {
)
.expect("defaults for test should always succeed")
}
/// Parses `raw` as a JSON array of event objects and renders it as an SSE
/// stream body.
///
/// Each object's `type` string becomes the `event:` name. An object that has
/// no other keys produces no `data:` line; otherwise the whole object is
/// serialised as the `data:` payload.
///
/// # Panics
///
/// Panics if `raw` is not a JSON array or if an element has no string `type`.
// This module is pulled in via `mod test_support;` by several test files, and
// not all of them call the loaders that use this helper.
#[allow(dead_code)]
fn render_sse_events(raw: &str) -> String {
    let events: Vec<serde_json::Value> =
        serde_json::from_str(raw).expect("parse JSON fixture");
    events
        .iter()
        .map(|event| {
            let kind = event
                .get("type")
                .and_then(|v| v.as_str())
                .expect("fixture event missing type");
            if matches!(event.as_object(), Some(fields) if fields.len() == 1) {
                format!("event: {kind}\n\n")
            } else {
                format!("event: {kind}\ndata: {event}\n\n")
            }
        })
        .collect()
}

/// Builds an SSE stream body from a JSON fixture.
///
/// The fixture must contain an array of objects where each object represents a
/// single SSE event with at least a `type` field matching the `event:` value.
/// Additional fields become the JSON payload for the `data:` line. An object
/// with only a `type` field results in an event with no `data:` section. This
/// makes it trivial to extend the fixtures as OpenAI adds new event kinds or
/// fields.
///
/// # Panics
///
/// Panics if the file cannot be read, is not a JSON array, or contains an
/// event without a string `type` field.
#[allow(dead_code)]
pub fn load_sse_fixture(path: impl AsRef<std::path::Path>) -> String {
    // Read the file in one go rather than handing an unbuffered `File` to
    // `serde_json::from_reader`.
    let raw = std::fs::read_to_string(path).expect("read fixture");
    render_sse_events(&raw)
}

/// Same as [`load_sse_fixture`], but replaces the placeholder `__ID__` in the
/// fixture template with the supplied identifier before parsing. This lets a
/// single JSON template be reused by multiple tests that each need a unique
/// `response_id`.
///
/// The identifier is JSON-escaped before substitution, so characters such as
/// `"` or `\` cannot break the template. This assumes the placeholder sits
/// inside a JSON string literal (e.g. `"id": "__ID__"`).
///
/// # Panics
///
/// Panics if the file cannot be read, if the substituted text is not a JSON
/// array, or if an event has no string `type` field.
#[allow(dead_code)]
pub fn load_sse_fixture_with_id(path: impl AsRef<std::path::Path>, id: &str) -> String {
    let raw = std::fs::read_to_string(path).expect("read fixture template");
    // `to_string` yields a quoted JSON string; drop the surrounding quotes to
    // obtain just the escaped contents.
    let quoted = serde_json::to_string(id).expect("serialising a string cannot fail");
    let escaped = quoted
        .strip_prefix('"')
        .and_then(|s| s.strip_suffix('"'))
        .unwrap_or(quoted.as_str());
    render_sse_events(&raw.replace("__ID__", escaped))
}

View File

@@ -9,6 +9,7 @@ use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::Submission;
use codex_core::protocol::TaskCompleteEvent;
use mcp_types::CallToolResult;
use mcp_types::CallToolResultContent;
@@ -66,14 +67,24 @@ pub async fn run_codex_tool_session(
.send(codex_event_to_notification(&first_event))
.await;
if let Err(e) = codex
.submit(Op::UserInput {
// Use the original MCP request ID as the `sub_id` for the Codex submission so that
// any events emitted for this tool-call can be correlated with the
// originating `tools/call` request.
let sub_id = match &id {
RequestId::String(s) => s.clone(),
RequestId::Integer(n) => n.to_string(),
};
let submission = Submission {
id: sub_id,
op: Op::UserInput {
items: vec![InputItem::Text {
text: initial_prompt.clone(),
}],
})
.await
{
},
};
if let Err(e) = codex.submit_with_id(submission).await {
tracing::error!("Failed to submit initial prompt: {e}");
}