From cd40f8d59d80d1d3c2ec3e565aadceee1b54eed8 Mon Sep 17 00:00:00 2001 From: Friel Date: Mon, 4 May 2026 03:25:13 +0000 Subject: [PATCH] test: cover compact fork references in integration tests --- codex-rs/core/tests/suite/fork_thread.rs | 43 ++++-- .../tests/suite/subagent_notifications.rs | 130 +++++++++++++----- 2 files changed, 126 insertions(+), 47 deletions(-) diff --git a/codex-rs/core/tests/suite/fork_thread.rs b/codex-rs/core/tests/suite/fork_thread.rs index 19ed2a2088..ec0c5baa23 100644 --- a/codex-rs/core/tests/suite/fork_thread.rs +++ b/codex-rs/core/tests/suite/fork_thread.rs @@ -1,5 +1,6 @@ use codex_core::ForkSnapshot; use codex_core::NewThread; +use codex_core::materialize_rollout_items_for_replay; use codex_core::parse_turn_item; use codex_protocol::items::TurnItem; use codex_protocol::protocol::EventMsg; @@ -110,13 +111,24 @@ async fn fork_thread_twice_drops_to_first_message() { let fork1_path = codex_fork1.rollout_path().expect("rollout path"); // GetHistory on fork1 flushed; the file is ready. - let fork1_items = read_rollout_items(&fork1_path); + let fork1_raw_items = read_rollout_items(&fork1_path); + assert!( + fork1_raw_items + .iter() + .any(|item| matches!(item, RolloutItem::ForkReference(_))), + "forked rollout should keep a compact ForkReference instead of copying parent history" + ); + let fork1_items = + materialize_rollout_items_for_replay(test.config.codex_home.as_path(), &fork1_raw_items) + .await; + let fork1_items = without_session_meta(fork1_items); pretty_assertions::assert_eq!( serde_json::to_value(&fork1_items).unwrap(), serde_json::to_value(&expected_after_first).unwrap() ); - // Fork again with n=0 → drops the (new) last user message, leaving only the first. + // Fork again with n=0. Because fork1 has no new user turn after its + // compact reference, this should preserve the same materialized history. let NewThread { thread: codex_fork2, .. 
@@ -133,14 +145,18 @@ async fn fork_thread_twice_drops_to_first_message() { let fork2_path = codex_fork2.rollout_path().expect("rollout path"); // GetHistory on fork2 flushed; the file is ready. - let fork1_items = read_rollout_items(&fork1_path); - let fork1_user_inputs = find_user_input_positions(&fork1_items); - let cut_last_on_fork1 = fork1_user_inputs - .get(fork1_user_inputs.len().saturating_sub(1)) - .copied() - .unwrap_or(0); - let expected_after_second: Vec<RolloutItem> = fork1_items[..cut_last_on_fork1].to_vec(); - let fork2_items = read_rollout_items(&fork2_path); + let expected_after_second = fork1_items.clone(); + let fork2_raw_items = read_rollout_items(&fork2_path); + assert!( + fork2_raw_items + .iter() + .any(|item| matches!(item, RolloutItem::ForkReference(_))), + "re-forked rollout should keep a compact ForkReference instead of copying parent history" + ); + let fork2_items = + materialize_rollout_items_for_replay(test.config.codex_home.as_path(), &fork2_raw_items) + .await; + let fork2_items = without_session_meta(fork2_items); pretty_assertions::assert_eq!( serde_json::to_value(&fork2_items).unwrap(), serde_json::to_value(&expected_after_second).unwrap() @@ -244,3 +260,10 @@ fn read_rollout_items(path: &std::path::Path) -> Vec<RolloutItem> { } items } + +fn without_session_meta(items: Vec<RolloutItem>) -> Vec<RolloutItem> { + items + .into_iter() + .filter(|item| !matches!(item, RolloutItem::SessionMeta(_))) + .collect() +} diff --git a/codex-rs/core/tests/suite/subagent_notifications.rs b/codex-rs/core/tests/suite/subagent_notifications.rs index 3f457967c1..511ef17308 100644 --- a/codex-rs/core/tests/suite/subagent_notifications.rs +++ b/codex-rs/core/tests/suite/subagent_notifications.rs @@ -21,10 +21,16 @@ use pretty_assertions::assert_eq; use serde_json::json; use std::fs; use std::path::Path; +use std::sync::Arc; +use std::sync::Mutex; use std::time::Duration; use tokio::time::Instant; use tokio::time::sleep; +use wiremock::Match; +use wiremock::Mock; use wiremock::MockServer; +use 
wiremock::matchers::method; +use wiremock::matchers::path_regex; const SPAWN_CALL_ID: &str = "spawn-call-1"; const TURN_0_FORK_PROMPT: &str = "seed fork context"; @@ -38,7 +44,43 @@ const REQUESTED_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::Low; const ROLE_MODEL: &str = "gpt-5.4"; const ROLE_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::High; +#[derive(Clone)] +struct RawRequestRecorder { + requests: Arc<Mutex<Vec<wiremock::Request>>>, +} + +impl RawRequestRecorder { + fn new() -> Self { + Self { + requests: Arc::new(Mutex::new(Vec::new())), + } + } + + fn single_request(&self) -> wiremock::Request { + let requests = self + .requests + .lock() + .expect("requests lock should not panic"); + assert_eq!(requests.len(), 1); + requests.first().expect("request should exist").clone() + } +} + +impl Match for RawRequestRecorder { + fn matches(&self, request: &wiremock::Request) -> bool { + self.requests + .lock() + .expect("requests lock should not panic") + .push(request.clone()); + true + } +} + fn body_contains(req: &wiremock::Request, text: &str) -> bool { + request_body_text(req).is_some_and(|body| body.contains(text)) +} + +fn request_body_text(req: &wiremock::Request) -> Option<String> { let is_zstd = req .headers .get("content-encoding") @@ -53,9 +95,7 @@ fn body_contains(req: &wiremock::Request, text: &str) -> bool { } else { Some(req.body.clone()) }; - bytes - .and_then(|body| String::from_utf8(body).ok()) - .is_some_and(|body| body.contains(text)) + bytes.and_then(|body| String::from_utf8(body).ok()) } fn has_subagent_notification(req: &ResponsesRequest) -> bool { @@ -305,8 +345,28 @@ async fn subagent_notification_is_included_without_wait() -> Result<()> { Ok(()) } -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn spawned_child_receives_forked_parent_context() -> Result<()> { +#[test] +fn spawned_child_receives_forked_parent_context() -> Result<()> { + // This test intentionally sends a full forked request with parent developer + // context and tool schemas. 
wiremock clones and matches that large request + // body, so use an explicit Tokio stack size instead of relying on the + // platform default worker stack. + std::thread::Builder::new() + .name("spawned_child_receives_forked_parent_context".to_string()) + .stack_size(32 * 1024 * 1024) + .spawn(|| { + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .thread_stack_size(32 * 1024 * 1024) + .enable_all() + .build()?; + runtime.block_on(spawned_child_receives_forked_parent_context_impl()) + })? + .join() + .expect("test thread should not panic") +} + +async fn spawned_child_receives_forked_parent_context_impl() -> Result<()> { skip_if_no_network!(Ok(())); let server = start_mock_server().await; @@ -337,17 +397,6 @@ async fn spawned_child_receives_forked_parent_context() -> Result<()> { ) .await; - let _child_request_log = mount_sse_once_match( - &server, - |req: &wiremock::Request| body_contains(req, CHILD_PROMPT), - sse(vec![ - ev_response_created("resp-child-1"), - ev_assistant_message("msg-child-1", "child done"), - ev_completed("resp-child-1"), - ]), - ) - .await; - let _turn1_followup = mount_sse_once_match( &server, |req: &wiremock::Request| body_contains(req, SPAWN_CALL_ID), @@ -366,6 +415,26 @@ async fn spawned_child_receives_forked_parent_context() -> Result<()> { .expect("test config should allow feature update"); }); let test = builder.build(&server).await?; + let parent_session_id = test.session_configured.session_id.to_string(); + + let child_request_log = RawRequestRecorder::new(); + Mock::given(method("POST")) + .and(path_regex(".*/responses$")) + .and(move |req: &wiremock::Request| { + req.headers + .get("x-client-request-id") + .and_then(|value| value.to_str().ok()) + != Some(parent_session_id.as_str()) + }) + .and(child_request_log.clone()) + .respond_with(sse_response(sse(vec![ + ev_response_created("resp-child-1"), + ev_assistant_message("msg-child-1", "child done"), + ev_completed("resp-child-1"), + ]))) + 
.up_to_n_times(1) + .mount(&server) + .await; test.submit_turn(TURN_0_FORK_PROMPT).await?; let _ = seed_turn.single_request(); @@ -373,27 +442,14 @@ async fn spawned_child_receives_forked_parent_context() -> Result<()> { test.submit_turn(TURN_1_PROMPT).await?; let _ = spawn_turn.single_request(); - let deadline = Instant::now() + Duration::from_secs(2); - let child_request = loop { - if let Some(request) = server - .received_requests() - .await - .unwrap_or_default() - .into_iter() - .find(|request| { - body_contains(request, CHILD_PROMPT) && !body_contains(request, SPAWN_CALL_ID) - }) - { - break request; - } - if Instant::now() >= deadline { - anyhow::bail!("timed out waiting for forked child request"); - } - sleep(Duration::from_millis(10)).await; - }; - assert!(body_contains(&child_request, TURN_0_FORK_PROMPT)); - assert!(!body_contains(&child_request, SPAWN_CALL_ID)); - + let child_request = child_request_log.single_request(); + let child_body = request_body_text(&child_request) + .ok_or_else(|| anyhow::anyhow!("child request body should be text"))?; + assert!( + child_body.contains(TURN_0_FORK_PROMPT) + || child_body.contains(r#""previous_response_id":"resp-turn1-1""#), + "forked child should either inline parent context or continue from the parent response" + ); Ok(()) }