remove get_responses_requests and get_responses_request_bodies to use in-place matcher (#8858)

This commit is contained in:
Ahmed Ibrahim
2026-01-08 13:57:48 -08:00
committed by GitHub
parent 41a317321d
commit 0d3e673019
8 changed files with 241 additions and 309 deletions

View File

@@ -70,6 +70,10 @@ impl ResponsesRequest {
self.0.body_json().unwrap()
}
pub fn body_bytes(&self) -> Vec<u8> {
self.0.body.clone()
}
/// Returns all `input_text` spans from `message` inputs for the provided role.
pub fn message_input_texts(&self, role: &str) -> Vec<String> {
self.inputs_of_type("message")
@@ -701,33 +705,6 @@ pub async fn start_mock_server() -> MockServer {
server
}
// todo(aibrahim): remove this and use our search matching patterns directly
/// Get all POST requests to `/responses` endpoints from the mock server.
/// Filters out GET requests (e.g., `/models`).
pub async fn get_responses_requests(server: &MockServer) -> Vec<wiremock::Request> {
server
.received_requests()
.await
.expect("mock server should not fail")
.into_iter()
.filter(|req| req.method == "POST" && req.url.path().ends_with("/responses"))
.collect()
}
// todo(aibrahim): remove this and use our search matching patterns directly
/// Get request bodies as JSON values from POST requests to `/responses` endpoints.
/// Filters out GET requests (e.g., `/models`).
pub async fn get_responses_request_bodies(server: &MockServer) -> Vec<Value> {
get_responses_requests(server)
.await
.into_iter()
.map(|req| {
req.body_json::<Value>()
.expect("request body to be valid JSON")
})
.collect()
}
#[derive(Clone)]
pub struct FunctionCallResponseMocks {
pub function_call: ResponseMock,

View File

@@ -23,10 +23,11 @@ use tempfile::TempDir;
use wiremock::MockServer;
use crate::load_default_config_for_test;
use crate::responses::get_responses_request_bodies;
use crate::responses::start_mock_server;
use crate::streaming_sse::StreamingSseServer;
use crate::wait_for_event;
use wiremock::Match;
use wiremock::matchers::path_regex;
type ConfigMutator = dyn FnOnce(&mut Config) + Send;
type PreBuildHook = dyn FnOnce(&Path) + Send + 'static;
@@ -322,7 +323,18 @@ impl TestCodexHarness {
}
pub async fn request_bodies(&self) -> Vec<Value> {
get_responses_request_bodies(&self.server).await
let path_matcher = path_regex(".*/responses$");
self.server
.received_requests()
.await
.expect("mock server should not fail")
.into_iter()
.filter(|req| path_matcher.matches(req))
.map(|req| {
req.body_json::<Value>()
.expect("request body to be valid JSON")
})
.collect()
}
pub async fn function_call_output_value(&self, call_id: &str) -> Value {

View File

@@ -23,6 +23,7 @@ use codex_otel::OtelManager;
use codex_protocol::ThreadId;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::config_types::Verbosity;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ReasoningItemReasoningSummary;
use codex_protocol::models::WebSearchAction;
@@ -31,9 +32,9 @@ use codex_protocol::user_input::UserInput;
use core_test_support::load_default_config_for_test;
use core_test_support::load_sse_fixture_with_id;
use core_test_support::responses::ev_completed_with_tokens;
use core_test_support::responses::get_responses_requests;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_once_match;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::sse_failed;
use core_test_support::skip_if_no_network;
@@ -324,17 +325,7 @@ async fn includes_conversation_id_and_model_headers_in_request() {
// Mock server
let server = MockServer::start().await;
// First request must NOT include `previous_response_id`.
let first = ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_completed("resp1"), "text/event-stream");
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(first)
.expect(1)
.mount(&server)
.await;
let resp_mock = mount_sse_once(&server, sse_completed("resp1")).await;
let model_provider = ModelProviderInfo {
base_url: Some(format!("{}/v1", server.uri())),
@@ -373,24 +364,19 @@ async fn includes_conversation_id_and_model_headers_in_request() {
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
// get request from the server
let requests = get_responses_requests(&server).await;
let request = requests
.first()
.expect("expected POST request to /responses");
let request_conversation_id = request.headers.get("conversation_id").unwrap();
let request_authorization = request.headers.get("authorization").unwrap();
let request_originator = request.headers.get("originator").unwrap();
let request = resp_mock.single_request();
assert_eq!(request.path(), "/v1/responses");
let request_conversation_id = request
.header("conversation_id")
.expect("conversation_id header");
let request_authorization = request
.header("authorization")
.expect("authorization header");
let request_originator = request.header("originator").expect("originator header");
assert_eq!(
request_conversation_id.to_str().unwrap(),
conversation_id.to_string()
);
assert_eq!(request_originator.to_str().unwrap(), "codex_cli_rs");
assert_eq!(
request_authorization.to_str().unwrap(),
"Bearer Test API Key"
);
assert_eq!(request_conversation_id, conversation_id.to_string());
assert_eq!(request_originator, "codex_cli_rs");
assert_eq!(request_authorization, "Bearer Test API Key");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -451,17 +437,7 @@ async fn chatgpt_auth_sends_correct_request() {
// Mock server
let server = MockServer::start().await;
// First request must NOT include `previous_response_id`.
let first = ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_completed("resp1"), "text/event-stream");
Mock::given(method("POST"))
.and(path("/api/codex/responses"))
.respond_with(first)
.expect(1)
.mount(&server)
.await;
let resp_mock = mount_sse_once(&server, sse_completed("resp1")).await;
let model_provider = ModelProviderInfo {
base_url: Some(format!("{}/api/codex", server.uri())),
@@ -499,27 +475,24 @@ async fn chatgpt_auth_sends_correct_request() {
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
// get request from the server
let requests = get_responses_requests(&server).await;
let request = requests
.first()
.expect("expected POST request to /responses");
let request_conversation_id = request.headers.get("conversation_id").unwrap();
let request_authorization = request.headers.get("authorization").unwrap();
let request_originator = request.headers.get("originator").unwrap();
let request_chatgpt_account_id = request.headers.get("chatgpt-account-id").unwrap();
let request_body = request.body_json::<serde_json::Value>().unwrap();
let request = resp_mock.single_request();
assert_eq!(request.path(), "/api/codex/responses");
let request_conversation_id = request
.header("conversation_id")
.expect("conversation_id header");
let request_authorization = request
.header("authorization")
.expect("authorization header");
let request_originator = request.header("originator").expect("originator header");
let request_chatgpt_account_id = request
.header("chatgpt-account-id")
.expect("chatgpt-account-id header");
let request_body = request.body_json();
assert_eq!(
request_conversation_id.to_str().unwrap(),
conversation_id.to_string()
);
assert_eq!(request_originator.to_str().unwrap(), "codex_cli_rs");
assert_eq!(
request_authorization.to_str().unwrap(),
"Bearer Access Token"
);
assert_eq!(request_chatgpt_account_id.to_str().unwrap(), "account_id");
assert_eq!(request_conversation_id, conversation_id.to_string());
assert_eq!(request_originator, "codex_cli_rs");
assert_eq!(request_authorization, "Bearer Access Token");
assert_eq!(request_chatgpt_account_id, "account_id");
assert!(request_body["stream"].as_bool().unwrap());
assert_eq!(
request_body["include"][0].as_str().unwrap(),
@@ -1107,17 +1080,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
"data: {\"type\":\"response.created\",\"response\":{}}\n\n",
"data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp_1\"}}\n\n",
);
let template = ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_body, "text/event-stream");
Mock::given(method("POST"))
.and(path("/openai/responses"))
.respond_with(template)
.expect(1)
.mount(&server)
.await;
let resp_mock = mount_sse_once(&server, sse_body.to_string()).await;
let provider = ModelProviderInfo {
name: "azure".into(),
@@ -1202,6 +1165,13 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
arguments: "{}".into(),
call_id: "function-call-id".into(),
});
prompt.input.push(ResponseItem::FunctionCallOutput {
call_id: "function-call-id".into(),
output: FunctionCallOutputPayload {
content: "ok".into(),
..Default::default()
},
});
prompt.input.push(ResponseItem::LocalShellCall {
id: Some("local-shell-id".into()),
call_id: Some("local-shell-call-id".into()),
@@ -1221,6 +1191,10 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
name: "custom_tool".into(),
input: "{}".into(),
});
prompt.input.push(ResponseItem::CustomToolCallOutput {
call_id: "custom-tool-call-id".into(),
output: "ok".into(),
});
let mut stream = client
.stream(&prompt)
@@ -1233,21 +1207,27 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
}
}
let requests = get_responses_requests(&server).await;
assert_eq!(requests.len(), 1, "expected a single POST request");
let body: serde_json::Value = requests[0]
.body_json()
.expect("request body to be valid JSON");
let request = resp_mock.single_request();
assert_eq!(request.path(), "/openai/responses");
let body = request.body_json();
assert_eq!(body["store"], serde_json::Value::Bool(true));
assert_eq!(body["stream"], serde_json::Value::Bool(true));
assert_eq!(body["input"].as_array().map(Vec::len), Some(6));
assert_eq!(body["input"].as_array().map(Vec::len), Some(8));
assert_eq!(body["input"][0]["id"].as_str(), Some("reasoning-id"));
assert_eq!(body["input"][1]["id"].as_str(), Some("message-id"));
assert_eq!(body["input"][2]["id"].as_str(), Some("web-search-id"));
assert_eq!(body["input"][3]["id"].as_str(), Some("function-id"));
assert_eq!(body["input"][4]["id"].as_str(), Some("local-shell-id"));
assert_eq!(body["input"][5]["id"].as_str(), Some("custom-tool-id"));
assert_eq!(
body["input"][4]["call_id"].as_str(),
Some("function-call-id")
);
assert_eq!(body["input"][5]["id"].as_str(), Some("local-shell-id"));
assert_eq!(body["input"][6]["id"].as_str(), Some("custom-tool-id"));
assert_eq!(
body["input"][7]["call_id"].as_str(),
Some("custom-tool-call-id")
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -1784,16 +1764,7 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() {
]"##;
let sse1 = core_test_support::load_sse_fixture_with_id_from_str(sse_raw, "resp1");
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse1.clone(), "text/event-stream"),
)
.expect(3) // respond identically to the three sequential turns
.mount(&server)
.await;
let request_log = mount_sse_sequence(&server, vec![sse1.clone(), sse1.clone(), sse1]).await;
// Configure provider to point to mock server (Responses API) and use API key auth.
let model_provider = ModelProviderInfo {
@@ -1847,8 +1818,11 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() {
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
// Inspect the three captured requests.
let requests = get_responses_requests(&server).await;
let requests = request_log.requests();
assert_eq!(requests.len(), 3, "expected 3 requests (one per turn)");
for request in &requests {
assert_eq!(request.path(), "/v1/responses");
}
// Replace full-array compare with tail-only raw JSON compare using a single hard-coded value.
let r3_tail_expected = json!([
@@ -1880,8 +1854,7 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() {
]);
let r3_input_array = requests[2]
.body_json::<serde_json::Value>()
.unwrap()
.body_json()
.get("input")
.and_then(|v| v.as_array())
.cloned()

View File

@@ -31,7 +31,6 @@ use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_completed_with_tokens;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::get_responses_requests;
use core_test_support::responses::mount_compact_json_once;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_once_match;
@@ -330,7 +329,7 @@ async fn manual_compact_uses_custom_prompt() {
let server = start_mock_server().await;
let sse_stream = sse(vec![ev_completed("r1")]);
mount_sse_once(&server, sse_stream).await;
let response_mock = mount_sse_once(&server, sse_stream).await;
let custom_prompt = "Use this compact prompt instead";
@@ -358,11 +357,7 @@ async fn manual_compact_uses_custom_prompt() {
assert_eq!(message, COMPACT_WARNING_MESSAGE);
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let requests = get_responses_requests(&server).await;
let body = requests
.iter()
.find_map(|req| req.body_json::<serde_json::Value>().ok())
.expect("summary request body");
let body = response_mock.single_request().body_json();
let input = body
.get("input")
@@ -571,7 +566,7 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() {
model_compact_response_3_sse,
model_final_response_sse,
];
mount_sse_sequence(&server, bodies).await;
let request_log = mount_sse_sequence(&server, bodies).await;
// Start the conversation with the user message
codex
@@ -586,11 +581,8 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() {
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
// collect the requests payloads from the model
let requests_payloads = get_responses_requests(&server).await;
let body = requests_payloads[0]
.body_json::<serde_json::Value>()
.unwrap();
let requests_payloads = request_log.requests();
let body = requests_payloads[0].body_json();
let input = body.get("input").and_then(|v| v.as_array()).unwrap();
fn normalize_inputs(values: &[serde_json::Value]) -> Vec<serde_json::Value> {
@@ -631,9 +623,7 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() {
prefixed_third_summary.as_str(),
];
for (i, expected_summary) in compaction_indices.into_iter().zip(expected_summaries) {
let body = requests_payloads.clone()[i]
.body_json::<serde_json::Value>()
.unwrap();
let body = requests_payloads.clone()[i].body_json();
let input = body.get("input").and_then(|v| v.as_array()).unwrap();
let input = normalize_inputs(input);
assert_eq!(input.len(), 3);
@@ -996,7 +986,7 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() {
]);
for (i, request) in requests_payloads.iter().enumerate() {
let body = request.body_json::<serde_json::Value>().unwrap();
let body = request.body_json();
let input = body.get("input").and_then(|v| v.as_array()).unwrap();
let expected_input = expected_requests_inputs[i].as_array().unwrap();
assert_eq!(normalize_inputs(input), normalize_inputs(expected_input));
@@ -1034,33 +1024,7 @@ async fn auto_compact_runs_after_token_limit_hit() {
]);
let prefixed_auto_summary = AUTO_SUMMARY_TEXT;
let first_matcher = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains(FIRST_AUTO_MSG)
&& !body.contains(SECOND_AUTO_MSG)
&& !body_contains_text(body, SUMMARIZATION_PROMPT)
};
mount_sse_once_match(&server, first_matcher, sse1).await;
let second_matcher = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains(SECOND_AUTO_MSG)
&& body.contains(FIRST_AUTO_MSG)
&& !body_contains_text(body, SUMMARIZATION_PROMPT)
};
mount_sse_once_match(&server, second_matcher, sse2).await;
let third_matcher = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body_contains_text(body, SUMMARIZATION_PROMPT)
};
mount_sse_once_match(&server, third_matcher, sse3).await;
let fourth_matcher = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains(POST_AUTO_USER_MSG) && !body_contains_text(body, SUMMARIZATION_PROMPT)
};
mount_sse_once_match(&server, fourth_matcher, sse4).await;
let request_log = mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4]).await;
let model_provider = non_openai_model_provider(&server);
@@ -1111,53 +1075,49 @@ async fn auto_compact_runs_after_token_limit_hit() {
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let requests = get_responses_requests(&server).await;
let requests = request_log.requests();
let request_bodies: Vec<String> = requests
.iter()
.map(|request| request.body_json().to_string())
.collect();
assert_eq!(
requests.len(),
request_bodies.len(),
4,
"expected user turns, a compaction request, and the follow-up turn; got {}",
requests.len()
request_bodies.len()
);
let is_auto_compact = |req: &wiremock::Request| {
body_contains_text(
std::str::from_utf8(&req.body).unwrap_or(""),
SUMMARIZATION_PROMPT,
)
};
let auto_compact_count = requests.iter().filter(|req| is_auto_compact(req)).count();
let auto_compact_count = request_bodies
.iter()
.filter(|body| body_contains_text(body, SUMMARIZATION_PROMPT))
.count();
assert_eq!(
auto_compact_count, 1,
"expected exactly one auto compact request"
);
let auto_compact_index = requests
let auto_compact_index = request_bodies
.iter()
.enumerate()
.find_map(|(idx, req)| is_auto_compact(req).then_some(idx))
.find_map(|(idx, body)| body_contains_text(body, SUMMARIZATION_PROMPT).then_some(idx))
.expect("auto compact request missing");
assert_eq!(
auto_compact_index, 2,
"auto compact should add a third request"
);
let follow_up_index = requests
let follow_up_index = request_bodies
.iter()
.enumerate()
.rev()
.find_map(|(idx, req)| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
.find_map(|(idx, body)| {
(body.contains(POST_AUTO_USER_MSG) && !body_contains_text(body, SUMMARIZATION_PROMPT))
.then_some(idx)
})
.expect("follow-up request missing");
assert_eq!(follow_up_index, 3, "follow-up request should be last");
let body_first = requests[0].body_json::<serde_json::Value>().unwrap();
let body_auto = requests[auto_compact_index]
.body_json::<serde_json::Value>()
.unwrap();
let body_follow_up = requests[follow_up_index]
.body_json::<serde_json::Value>()
.unwrap();
let body_first = requests[0].body_json();
let body_auto = requests[auto_compact_index].body_json();
let body_follow_up = requests[follow_up_index].body_json();
let instructions = body_auto
.get("instructions")
.and_then(|v| v.as_str())
@@ -1848,7 +1808,7 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_
let follow_up_user = "FOLLOW_UP_AUTO_COMPACT";
let final_user = "FINAL_AUTO_COMPACT";
mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4, sse5, sse6]).await;
let request_log = mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4, sse5, sse6]).await;
let model_provider = non_openai_model_provider(&server);
@@ -1897,10 +1857,10 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_
"auto compact should not emit task lifecycle events"
);
let requests = get_responses_requests(&server).await;
let request_bodies: Vec<String> = requests
let request_bodies: Vec<String> = request_log
.requests()
.into_iter()
.map(|request| String::from_utf8(request.body).unwrap_or_default())
.map(|request| request.body_json().to_string())
.collect();
assert_eq!(
request_bodies.len(),

View File

@@ -24,9 +24,9 @@ use codex_core::protocol::WarningEvent;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_protocol::user_input::UserInput;
use core_test_support::load_default_config_for_test;
use core_test_support::responses::ResponseMock;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::get_responses_request_bodies;
use core_test_support::responses::mount_sse_once_match;
use core_test_support::responses::sse;
use core_test_support::wait_for_event;
@@ -148,7 +148,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
// 1. Arrange mocked SSE responses for the initial compact/resume/fork flow.
let server = MockServer::start().await;
mount_initial_flow(&server).await;
let request_log = mount_initial_flow(&server).await;
let expected_model = "gpt-5.1-codex";
// 2. Start a new conversation and drive it through the compact/resume/fork steps.
let (_home, config, manager, base) =
@@ -175,7 +175,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
user_turn(&forked, "AFTER_FORK").await;
// 3. Capture the requests to the model and validate the history slices.
let mut requests = gather_request_bodies(&server).await;
let mut requests = gather_request_bodies(&request_log);
normalize_compact_prompts(&mut requests);
// input after compact is a prefix of input after resume/fork
@@ -600,8 +600,8 @@ async fn compact_resume_after_second_compaction_preserves_history() {
// 1. Arrange mocked SSE responses for the initial flow plus the second compact.
let server = MockServer::start().await;
mount_initial_flow(&server).await;
mount_second_compact_flow(&server).await;
let mut request_log = mount_initial_flow(&server).await;
request_log.extend(mount_second_compact_flow(&server).await);
// 2. Drive the conversation through compact -> resume -> fork -> compact -> resume.
let (_home, config, manager, base) = start_test_conversation(&server, None).await;
@@ -637,7 +637,7 @@ async fn compact_resume_after_second_compaction_preserves_history() {
let resumed_again = resume_conversation(&manager, &config, forked_path).await;
user_turn(&resumed_again, AFTER_SECOND_RESUME).await;
let mut requests = gather_request_bodies(&server).await;
let mut requests = gather_request_bodies(&request_log);
normalize_compact_prompts(&mut requests);
let input_after_compact = json!(requests[requests.len() - 2]["input"]);
let input_after_resume = json!(requests[requests.len() - 1]["input"]);
@@ -771,15 +771,19 @@ fn normalize_line_endings(value: &mut Value) {
}
}
async fn gather_request_bodies(server: &MockServer) -> Vec<Value> {
let mut bodies = get_responses_request_bodies(server).await;
fn gather_request_bodies(request_log: &[ResponseMock]) -> Vec<Value> {
let mut bodies = request_log
.iter()
.flat_map(ResponseMock::requests)
.map(|request| request.body_json())
.collect::<Vec<_>>();
for body in &mut bodies {
normalize_line_endings(body);
}
bodies
}
async fn mount_initial_flow(server: &MockServer) {
async fn mount_initial_flow(server: &MockServer) -> Vec<ResponseMock> {
let sse1 = sse(vec![
ev_assistant_message("m1", FIRST_REPLY),
ev_completed("r1"),
@@ -803,13 +807,13 @@ async fn mount_initial_flow(server: &MockServer) {
&& !body.contains("\"text\":\"AFTER_RESUME\"")
&& !body.contains("\"text\":\"AFTER_FORK\"")
};
mount_sse_once_match(server, match_first, sse1).await;
let first = mount_sse_once_match(server, match_first, sse1).await;
let match_compact = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body_contains_text(body, SUMMARIZATION_PROMPT) || body.contains(&json_fragment(FIRST_REPLY))
};
mount_sse_once_match(server, match_compact, sse2).await;
let compact = mount_sse_once_match(server, match_compact, sse2).await;
let match_after_compact = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
@@ -817,22 +821,24 @@ async fn mount_initial_flow(server: &MockServer) {
&& !body.contains("\"text\":\"AFTER_RESUME\"")
&& !body.contains("\"text\":\"AFTER_FORK\"")
};
mount_sse_once_match(server, match_after_compact, sse3).await;
let after_compact = mount_sse_once_match(server, match_after_compact, sse3).await;
let match_after_resume = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains("\"text\":\"AFTER_RESUME\"")
};
mount_sse_once_match(server, match_after_resume, sse4).await;
let after_resume = mount_sse_once_match(server, match_after_resume, sse4).await;
let match_after_fork = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains("\"text\":\"AFTER_FORK\"")
};
mount_sse_once_match(server, match_after_fork, sse5).await;
let after_fork = mount_sse_once_match(server, match_after_fork, sse5).await;
vec![first, compact, after_compact, after_resume, after_fork]
}
async fn mount_second_compact_flow(server: &MockServer) {
async fn mount_second_compact_flow(server: &MockServer) -> Vec<ResponseMock> {
let sse6 = sse(vec![
ev_assistant_message("m4", SUMMARY_TEXT),
ev_completed("r6"),
@@ -843,13 +849,15 @@ async fn mount_second_compact_flow(server: &MockServer) {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains("AFTER_FORK")
};
mount_sse_once_match(server, match_second_compact, sse6).await;
let second_compact = mount_sse_once_match(server, match_second_compact, sse6).await;
let match_after_second_resume = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains(&format!("\"text\":\"{AFTER_SECOND_RESUME}\""))
};
mount_sse_once_match(server, match_after_second_resume, sse7).await;
let after_second_resume = mount_sse_once_match(server, match_after_second_resume, sse7).await;
vec![second_compact, after_second_resume]
}
async fn start_test_conversation(

View File

@@ -7,7 +7,6 @@ use codex_core::protocol::Op;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::get_responses_requests;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
@@ -21,7 +20,7 @@ async fn request_body_is_zstd_compressed_for_codex_backend_when_enabled() -> any
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_once(
let request_log = mount_sse_once(
&server,
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
)
@@ -48,17 +47,10 @@ async fn request_body_is_zstd_compressed_for_codex_backend_when_enabled() -> any
// Wait until the task completes so the request definitely hit the server.
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let requests = get_responses_requests(&server).await;
assert_eq!(requests.len(), 1);
let request = request_log.single_request();
assert_eq!(request.header("content-encoding").as_deref(), Some("zstd"));
let request = &requests[0];
let content_encoding = request
.headers
.get("content-encoding")
.and_then(|v| v.to_str().ok());
assert_eq!(content_encoding, Some("zstd"));
let decompressed = zstd::stream::decode_all(std::io::Cursor::new(request.body.clone()))?;
let decompressed = zstd::stream::decode_all(std::io::Cursor::new(request.body_bytes()))?;
let json: serde_json::Value = serde_json::from_slice(&decompressed)?;
assert!(
json.get("input").is_some(),
@@ -73,7 +65,7 @@ async fn request_body_is_not_compressed_for_api_key_auth_even_when_enabled() ->
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_once(
let request_log = mount_sse_once(
&server,
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
)
@@ -97,16 +89,13 @@ async fn request_body_is_not_compressed_for_api_key_auth_even_when_enabled() ->
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let requests = get_responses_requests(&server).await;
assert_eq!(requests.len(), 1);
let request = &requests[0];
let request = request_log.single_request();
assert!(
request.headers.get("content-encoding").is_none(),
request.header("content-encoding").is_none(),
"did not expect request compression for API-key auth"
);
let json: serde_json::Value = serde_json::from_slice(&request.body)?;
let json: serde_json::Value = serde_json::from_slice(&request.body_bytes())?;
assert!(
json.get("input").is_some(),
"expected request body to be plain Responses API JSON"

View File

@@ -23,7 +23,8 @@ use codex_core::review_format::render_review_output_text;
use codex_protocol::user_input::UserInput;
use core_test_support::load_default_config_for_test;
use core_test_support::load_sse_fixture_with_id_from_str;
use core_test_support::responses::get_responses_requests;
use core_test_support::responses::ResponseMock;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::skip_if_no_network;
use core_test_support::wait_for_event;
use pretty_assertions::assert_eq;
@@ -32,11 +33,7 @@ use std::sync::Arc;
use tempfile::TempDir;
use tokio::io::AsyncWriteExt as _;
use uuid::Uuid;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
/// Verify that submitting `Op::Review` spawns a child task and emits
/// EnteredReviewMode -> ExitedReviewMode(None) -> TaskComplete
@@ -75,7 +72,7 @@ async fn review_op_emits_lifecycle_and_review_output() {
]"#;
let review_json_escaped = serde_json::to_string(&review_json).unwrap();
let sse_raw = sse_template.replace("__REVIEW__", &review_json_escaped);
let server = start_responses_server_with_sse(&sse_raw, 1).await;
let (server, _request_log) = start_responses_server_with_sse(&sse_raw, 1).await;
let codex_home = TempDir::new().unwrap();
let codex = new_conversation_for_server(&server, &codex_home, |_| {}).await;
@@ -196,7 +193,7 @@ async fn review_op_with_plain_text_emits_review_fallback() {
}},
{"type":"response.completed", "response": {"id": "__ID__"}}
]"#;
let server = start_responses_server_with_sse(sse_raw, 1).await;
let (server, _request_log) = start_responses_server_with_sse(sse_raw, 1).await;
let codex_home = TempDir::new().unwrap();
let codex = new_conversation_for_server(&server, &codex_home, |_| {}).await;
@@ -256,7 +253,7 @@ async fn review_filters_agent_message_related_events() {
}},
{"type":"response.completed", "response": {"id": "__ID__"}}
]"#;
let server = start_responses_server_with_sse(sse_raw, 1).await;
let (server, _request_log) = start_responses_server_with_sse(sse_raw, 1).await;
let codex_home = TempDir::new().unwrap();
let codex = new_conversation_for_server(&server, &codex_home, |_| {}).await;
@@ -337,7 +334,7 @@ async fn review_does_not_emit_agent_message_on_structured_output() {
]"#;
let review_json_escaped = serde_json::to_string(&review_json).unwrap();
let sse_raw = sse_template.replace("__REVIEW__", &review_json_escaped);
let server = start_responses_server_with_sse(&sse_raw, 1).await;
let (server, _request_log) = start_responses_server_with_sse(&sse_raw, 1).await;
let codex_home = TempDir::new().unwrap();
let codex = new_conversation_for_server(&server, &codex_home, |_| {}).await;
@@ -391,7 +388,7 @@ async fn review_uses_custom_review_model_from_config() {
let sse_raw = r#"[
{"type":"response.completed", "response": {"id": "__ID__"}}
]"#;
let server = start_responses_server_with_sse(sse_raw, 1).await;
let (server, request_log) = start_responses_server_with_sse(sse_raw, 1).await;
let codex_home = TempDir::new().unwrap();
// Choose a review model different from the main model; ensure it is used.
let codex = new_conversation_for_server(&server, &codex_home, |cfg| {
@@ -426,11 +423,9 @@ async fn review_uses_custom_review_model_from_config() {
let _complete = wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
// Assert the request body model equals the configured review model
let requests = get_responses_requests(&server).await;
let request = requests
.first()
.expect("expected POST request to /responses");
let body = request.body_json::<serde_json::Value>().unwrap();
let request = request_log.single_request();
assert_eq!(request.path(), "/v1/responses");
let body = request.body_json();
assert_eq!(body["model"].as_str().unwrap(), "gpt-5.1");
server.verify().await;
@@ -449,7 +444,7 @@ async fn review_input_isolated_from_parent_history() {
let sse_raw = r#"[
{"type":"response.completed", "response": {"id": "__ID__"}}
]"#;
let server = start_responses_server_with_sse(sse_raw, 1).await;
let (server, request_log) = start_responses_server_with_sse(sse_raw, 1).await;
// Seed a parent session history via resume file with both user + assistant items.
let codex_home = TempDir::new().unwrap();
@@ -547,11 +542,9 @@ async fn review_input_isolated_from_parent_history() {
let _complete = wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
// Assert the request `input` contains the environment context followed by the user review prompt.
let requests = get_responses_requests(&server).await;
let request = requests
.first()
.expect("expected POST request to /responses");
let body = request.body_json::<serde_json::Value>().unwrap();
let request = request_log.single_request();
assert_eq!(request.path(), "/v1/responses");
let body = request.body_json();
let input = body["input"].as_array().expect("input array");
assert!(
input.len() >= 2,
@@ -630,7 +623,7 @@ async fn review_history_surfaces_in_parent_session() {
}},
{"type":"response.completed", "response": {"id": "__ID__"}}
]"#;
let server = start_responses_server_with_sse(sse_raw, 2).await;
let (server, request_log) = start_responses_server_with_sse(sse_raw, 2).await;
let codex_home = TempDir::new().unwrap();
let codex = new_conversation_for_server(&server, &codex_home, |_| {}).await;
@@ -674,9 +667,12 @@ async fn review_history_surfaces_in_parent_session() {
// Inspect the second request (parent turn) input contents.
// Parent turns include session initial messages (user_instructions, environment_context).
// Critically, no messages from the review thread should appear.
let requests = get_responses_requests(&server).await;
let requests = request_log.requests();
assert_eq!(requests.len(), 2);
let body = requests[1].body_json::<serde_json::Value>().unwrap();
for request in &requests {
assert_eq!(request.path(), "/v1/responses");
}
let body = requests[1].body_json();
let input = body["input"].as_array().expect("input array");
// Must include the followup as the last item for this turn
@@ -717,7 +713,7 @@ async fn review_uses_overridden_cwd_for_base_branch_merge_base() {
skip_if_no_network!();
let sse_raw = r#"[{"type":"response.completed", "response": {"id": "__ID__"}}]"#;
let server = start_responses_server_with_sse(sse_raw, 1).await;
let (server, request_log) = start_responses_server_with_sse(sse_raw, 1).await;
let initial_cwd = TempDir::new().unwrap();
@@ -792,9 +788,12 @@ async fn review_uses_overridden_cwd_for_base_branch_merge_base() {
let _entered = wait_for_event(&codex, |ev| matches!(ev, EventMsg::EnteredReviewMode(_))).await;
let _complete = wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let requests = get_responses_requests(&server).await;
let requests = request_log.requests();
assert_eq!(requests.len(), 1);
let body = requests[0].body_json::<serde_json::Value>().unwrap();
for request in &requests {
assert_eq!(request.path(), "/v1/responses");
}
let body = requests[0].body_json();
let input = body["input"].as_array().expect("input array");
let saw_merge_base_sha = input
@@ -810,20 +809,15 @@ async fn review_uses_overridden_cwd_for_base_branch_merge_base() {
}
/// Start a mock Responses API server and mount the given SSE stream body.
async fn start_responses_server_with_sse(sse_raw: &str, expected_requests: usize) -> MockServer {
async fn start_responses_server_with_sse(
sse_raw: &str,
expected_requests: usize,
) -> (MockServer, ResponseMock) {
let server = MockServer::start().await;
let sse = load_sse_fixture_with_id_from_str(sse_raw, &Uuid::new_v4().to_string());
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse.clone(), "text/event-stream"),
)
.expect(expected_requests as u64)
.mount(&server)
.await;
server
let responses = vec![sse; expected_requests];
let request_log = mount_sse_sequence(&server, responses).await;
(server, request_log)
}
/// Create a conversation configured to talk to the provided mock server.

View File

@@ -20,7 +20,6 @@ use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::get_responses_request_bodies;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
@@ -471,7 +470,7 @@ async fn unified_exec_respects_workdir_override() -> Result<()> {
ev_completed("resp-2"),
]),
];
mount_sse_sequence(&server, responses).await;
let request_log = mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
@@ -503,7 +502,7 @@ async fn unified_exec_respects_workdir_override() -> Result<()> {
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
let requests = server.received_requests().await.expect("recorded requests");
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
Ok(())
@@ -1217,7 +1216,7 @@ async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> {
ev_completed("resp-2"),
]),
];
mount_sse_sequence(&server, responses).await;
let request_log = mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
@@ -1238,10 +1237,12 @@ async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> {
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
let requests = server.received_requests().await.expect("recorded requests");
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = get_responses_request_bodies(&server).await;
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let metadata = outputs
@@ -1321,7 +1322,7 @@ async fn unified_exec_respects_early_exit_notifications() -> Result<()> {
ev_completed("resp-2"),
]),
];
mount_sse_sequence(&server, responses).await;
let request_log = mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
@@ -1342,10 +1343,12 @@ async fn unified_exec_respects_early_exit_notifications() -> Result<()> {
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
let requests = server.received_requests().await.expect("recorded requests");
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = get_responses_request_bodies(&server).await;
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let output = outputs
@@ -1446,7 +1449,7 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
ev_completed("resp-4"),
]),
];
mount_sse_sequence(&server, responses).await;
let request_log = mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
@@ -1467,10 +1470,12 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
let requests = server.received_requests().await.expect("recorded requests");
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = get_responses_request_bodies(&server).await;
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
@@ -1800,7 +1805,7 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> {
ev_completed("resp-3"),
]),
];
mount_sse_sequence(&server, responses).await;
let request_log = mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
@@ -1821,10 +1826,12 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> {
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
let requests = server.received_requests().await.expect("recorded requests");
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = get_responses_request_bodies(&server).await;
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
@@ -1929,7 +1936,7 @@ PY
ev_completed("resp-3"),
]),
];
mount_sse_sequence(&server, responses).await;
let request_log = mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
@@ -1955,10 +1962,12 @@ PY
)
.await;
let requests = server.received_requests().await.expect("recorded requests");
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = get_responses_request_bodies(&server).await;
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
@@ -2038,7 +2047,7 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> {
ev_completed("resp-3"),
]),
];
mount_sse_sequence(&server, responses).await;
let request_log = mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
@@ -2064,10 +2073,12 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> {
}
}
let requests = server.received_requests().await.expect("recorded requests");
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = get_responses_request_bodies(&server).await;
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
@@ -2129,7 +2140,7 @@ PY
ev_completed("resp-2"),
]),
];
mount_sse_sequence(&server, responses).await;
let request_log = mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
@@ -2150,10 +2161,12 @@ PY
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
let requests = server.received_requests().await.expect("recorded requests");
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = get_responses_request_bodies(&server).await;
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let large_output = outputs.get(call_id).expect("missing large output summary");
@@ -2205,7 +2218,7 @@ async fn unified_exec_runs_under_sandbox() -> Result<()> {
ev_completed("resp-2"),
]),
];
mount_sse_sequence(&server, responses).await;
let request_log = mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
@@ -2227,10 +2240,12 @@ async fn unified_exec_runs_under_sandbox() -> Result<()> {
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
let requests = server.received_requests().await.expect("recorded requests");
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = get_responses_request_bodies(&server).await;
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let output = outputs.get(call_id).expect("missing output");
@@ -2304,7 +2319,7 @@ async fn unified_exec_python_prompt_under_seatbelt() -> Result<()> {
ev_completed("resp-3"),
]),
];
mount_sse_sequence(&server, responses).await;
let request_log = mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
@@ -2325,10 +2340,12 @@ async fn unified_exec_python_prompt_under_seatbelt() -> Result<()> {
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
let requests = server.received_requests().await.expect("recorded requests");
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = get_responses_request_bodies(&server).await;
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let startup_output = outputs
@@ -2394,7 +2411,7 @@ async fn unified_exec_runs_on_all_platforms() -> Result<()> {
ev_completed("resp-2"),
]),
];
mount_sse_sequence(&server, responses).await;
let request_log = mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
@@ -2415,10 +2432,12 @@ async fn unified_exec_runs_on_all_platforms() -> Result<()> {
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
let requests = server.received_requests().await.expect("recorded requests");
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = get_responses_request_bodies(&server).await;
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let output = outputs.get(call_id).expect("missing output");