From a7b400ac83bff6d84b11f799a6730ccea5acedf6 Mon Sep 17 00:00:00 2001 From: Friel Date: Mon, 4 May 2026 03:36:08 +0000 Subject: [PATCH] test: stabilize forked subagent request assertions --- .../tests/suite/subagent_notifications.rs | 215 ++++++++++-------- 1 file changed, 120 insertions(+), 95 deletions(-) diff --git a/codex-rs/core/tests/suite/subagent_notifications.rs b/codex-rs/core/tests/suite/subagent_notifications.rs index 511ef17308..7e0de280aa 100644 --- a/codex-rs/core/tests/suite/subagent_notifications.rs +++ b/codex-rs/core/tests/suite/subagent_notifications.rs @@ -20,6 +20,7 @@ use core_test_support::test_codex::test_codex; use pretty_assertions::assert_eq; use serde_json::json; use std::fs; +use std::future::Future; use std::path::Path; use std::sync::Arc; use std::sync::Mutex; @@ -57,21 +58,24 @@ impl RawRequestRecorder { } fn single_request(&self) -> wiremock::Request { - let requests = self - .requests - .lock() - .expect("requests lock should not panic"); + let requests = match self.requests.lock() { + Ok(requests) => requests, + Err(err) => panic!("requests lock should not panic: {err}"), + }; assert_eq!(requests.len(), 1); - requests.first().expect("request should exist").clone() + let Some(request) = requests.first() else { + panic!("request should exist"); + }; + request.clone() } } impl Match for RawRequestRecorder { fn matches(&self, request: &wiremock::Request) -> bool { - self.requests - .lock() - .expect("requests lock should not panic") - .push(request.clone()); + match self.requests.lock() { + Ok(mut requests) => requests.push(request.clone()), + Err(err) => panic!("requests lock should not panic: {err}"), + } true } } @@ -104,6 +108,54 @@ fn has_subagent_notification(req: &ResponsesRequest) -> bool { .any(|text| text.contains("")) } +async fn mount_child_response_for_non_parent_session( + server: &MockServer, + parent_session_id: String, + response_body: String, +) -> RawRequestRecorder { + let child_request_log = RawRequestRecorder::new(); + Mock::given(method("POST")) + .and(path_regex(".*/responses$")) + .and(move |req: &wiremock::Request| { + req.headers + .get("x-client-request-id") + .and_then(|value| value.to_str().ok()) + != Some(parent_session_id.as_str()) + }) + .and(child_request_log.clone()) + .respond_with(sse_response(response_body)) + .up_to_n_times(1) + .mount(server) + .await; + child_request_log +} + +fn run_large_fork_request_test(name: &'static str, test: F) -> Result<()> +where + F: FnOnce() -> Fut + Send + 'static, + Fut: Future> + Send + 'static, +{ + // These tests intentionally send full forked requests with parent + // developer context and tool schemas. wiremock clones and matches that + // large request body, so use an explicit Tokio stack size instead of + // relying on the platform default worker stack. + let test_thread = std::thread::Builder::new() + .name(name.to_string()) + .stack_size(32 * 1024 * 1024) + .spawn(|| { + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .thread_stack_size(32 * 1024 * 1024) + .enable_all() + .build()?; + runtime.block_on(test()) + })?; + match test_thread.join() { + Ok(result) => result, + Err(err) => std::panic::resume_unwind(err), + } +} + fn tool_parameter_description( req: &ResponsesRequest, tool_name: &str, @@ -347,23 +399,10 @@ async fn subagent_notification_is_included_without_wait() -> Result<()> { #[test] fn spawned_child_receives_forked_parent_context() -> Result<()> { - // This test intentionally sends a full forked request with parent developer - // context and tool schemas. wiremock clones and matches that large request - // body, so use an explicit Tokio stack size instead of relying on the - // platform default worker stack. - std::thread::Builder::new() - .name("spawned_child_receives_forked_parent_context".to_string()) - .stack_size(32 * 1024 * 1024) - .spawn(|| { - let runtime = tokio::runtime::Builder::new_multi_thread() - .worker_threads(2) - .thread_stack_size(32 * 1024 * 1024) - .enable_all() - .build()?; - runtime.block_on(spawned_child_receives_forked_parent_context_impl()) - })? - .join() - .expect("test thread should not panic") + run_large_fork_request_test( + "spawned_child_receives_forked_parent_context", + spawned_child_receives_forked_parent_context_impl, + ) } async fn spawned_child_receives_forked_parent_context_impl() -> Result<()> { @@ -409,32 +448,23 @@ async fn spawned_child_receives_forked_parent_context_impl() -> Result<()> { .await; let mut builder = test_codex().with_config(|config| { - config - .features - .enable(Feature::Collab) - .expect("test config should allow feature update"); + if let Err(err) = config.features.enable(Feature::Collab) { + panic!("test config should allow feature update: {err}"); + } }); let test = builder.build(&server).await?; let parent_session_id = test.session_configured.session_id.to_string(); - let child_request_log = RawRequestRecorder::new(); - Mock::given(method("POST")) - .and(path_regex(".*/responses$")) - .and(move |req: &wiremock::Request| { - req.headers - .get("x-client-request-id") - .and_then(|value| value.to_str().ok()) - != Some(parent_session_id.as_str()) - }) - .and(child_request_log.clone()) - .respond_with(sse_response(sse(vec![ + let child_request_log = mount_child_response_for_non_parent_session( + &server, + parent_session_id, + sse(vec![ ev_response_created("resp-child-1"), ev_assistant_message("msg-child-1", "child done"), ev_completed("resp-child-1"), - ]))) - .up_to_n_times(1) - .mount(&server) - .await; + ]), + ) + .await; test.submit_turn(TURN_0_FORK_PROMPT).await?; let _ = seed_turn.single_request(); @@ -499,16 +529,6 @@ async fn spawned_multi_agent_v2_child_inherits_parent_developer_context() -> Res ) .await; - let _child_request_log = mount_sse_once_match( - &server, - |req: &wiremock::Request| body_contains(req, CHILD_PROMPT), - sse(vec![ - ev_response_created("resp-child-1"), - ev_completed("resp-child-1"), - ]), - ) - .await; - let _turn1_followup = mount_sse_once_match( &server, |req: &wiremock::Request| { @@ -564,8 +584,15 @@ async fn spawned_multi_agent_v2_child_inherits_parent_developer_context() -> Res Ok(()) } -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn skills_toggle_skips_instructions_for_parent_and_spawned_child() -> Result<()> { +#[test] +fn skills_toggle_skips_instructions_for_parent_and_spawned_child() -> Result<()> { + run_large_fork_request_test( + "skills_toggle_skips_instructions_for_parent_and_spawned_child", + skills_toggle_skips_instructions_for_parent_and_spawned_child_impl, + ) +} + +async fn skills_toggle_skips_instructions_for_parent_and_spawned_child_impl() -> Result<()> { skip_if_no_network!(Ok(())); let server = start_mock_server().await; @@ -594,10 +621,34 @@ async fn skills_toggle_skips_instructions_for_parent_and_spawned_child() -> Resu ) .await; + let mut builder = test_codex() + .with_pre_build_hook(|home| { + if let Err(err) = write_home_skill(home, "demo", "demo-skill", "demo skill") { + panic!("write home skill: {err}"); + } + }) + .with_config(|config| { + if let Err(err) = config.features.enable(Feature::Collab) { + panic!("test config should allow feature update: {err}"); + } + if let Err(err) = config.features.enable(Feature::MultiAgentV2) { + panic!("test config should allow feature update: {err}"); + } + config.include_skill_instructions = false; + }); + let test = builder.build(&server).await?; + let parent_session_id = test.session_configured.session_id.to_string(); + + let followup_parent_session_id = parent_session_id.clone(); let _turn1_followup = mount_sse_once_match( &server, - |req: &wiremock::Request| { - body_contains(req, "function_call_output") && body_contains(req, "/root/worker") + move |req: &wiremock::Request| { + req.headers + .get("x-client-request-id") + .and_then(|value| value.to_str().ok()) + == Some(followup_parent_session_id.as_str()) + && body_contains(req, "function_call_output") + && body_contains(req, "/root/worker") }, sse(vec![ ev_response_created("resp-turn1-2"), @@ -607,48 +658,22 @@ async fn skills_toggle_skips_instructions_for_parent_and_spawned_child() -> Resu ) .await; - let mut builder = test_codex() - .with_pre_build_hook(|home| { - if let Err(err) = write_home_skill(home, "demo", "demo-skill", "demo skill") { - panic!("write home skill: {err}"); - } - }) - .with_config(|config| { - config - .features - .enable(Feature::Collab) - .expect("test config should allow feature update"); - config - .features - .enable(Feature::MultiAgentV2) - .expect("test config should allow feature update"); - config.include_skill_instructions = false; - }); - let test = builder.build(&server).await?; + let child_request_log = mount_child_response_for_non_parent_session( + &server, + parent_session_id, + sse(vec![ + ev_response_created("resp-child-1"), + ev_completed("resp-child-1"), + ]), + ) + .await; test.submit_turn(TURN_1_PROMPT).await?; let parent_request = spawn_turn.single_request(); assert!(!parent_request.body_contains_text("")); assert!(!parent_request.body_contains_text("demo-skill")); - let deadline = Instant::now() + Duration::from_secs(2); - let child_request = loop { - if let Some(request) = server - .received_requests() - .await - .unwrap_or_default() - .into_iter() - .find(|request| { - body_contains(request, CHILD_PROMPT) && !body_contains(request, SPAWN_CALL_ID) - }) - { - break request; - } - if Instant::now() >= deadline { - anyhow::bail!("timed out waiting for spawned child request"); - } - sleep(Duration::from_millis(10)).await; - }; + let child_request = child_request_log.single_request(); assert!(!body_contains(&child_request, "")); assert!(!body_contains(&child_request, "demo-skill"));