mirror of
https://github.com/openai/codex.git
synced 2026-05-23 12:34:25 +00:00
test: cover compact fork references in integration tests
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
use codex_core::ForkSnapshot;
|
||||
use codex_core::NewThread;
|
||||
use codex_core::materialize_rollout_items_for_replay;
|
||||
use codex_core::parse_turn_item;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
@@ -110,13 +111,24 @@ async fn fork_thread_twice_drops_to_first_message() {
|
||||
let fork1_path = codex_fork1.rollout_path().expect("rollout path");
|
||||
|
||||
// GetHistory on fork1 flushed; the file is ready.
|
||||
let fork1_items = read_rollout_items(&fork1_path);
|
||||
let fork1_raw_items = read_rollout_items(&fork1_path);
|
||||
assert!(
|
||||
fork1_raw_items
|
||||
.iter()
|
||||
.any(|item| matches!(item, RolloutItem::ForkReference(_))),
|
||||
"forked rollout should keep a compact ForkReference instead of copying parent history"
|
||||
);
|
||||
let fork1_items =
|
||||
materialize_rollout_items_for_replay(test.config.codex_home.as_path(), &fork1_raw_items)
|
||||
.await;
|
||||
let fork1_items = without_session_meta(fork1_items);
|
||||
pretty_assertions::assert_eq!(
|
||||
serde_json::to_value(&fork1_items).unwrap(),
|
||||
serde_json::to_value(&expected_after_first).unwrap()
|
||||
);
|
||||
|
||||
// Fork again with n=0 → drops the (new) last user message, leaving only the first.
|
||||
// Fork again with n=0. Because fork1 has no new user turn after its
|
||||
// compact reference, this should preserve the same materialized history.
|
||||
let NewThread {
|
||||
thread: codex_fork2,
|
||||
..
|
||||
@@ -133,14 +145,18 @@ async fn fork_thread_twice_drops_to_first_message() {
|
||||
|
||||
let fork2_path = codex_fork2.rollout_path().expect("rollout path");
|
||||
// GetHistory on fork2 flushed; the file is ready.
|
||||
let fork1_items = read_rollout_items(&fork1_path);
|
||||
let fork1_user_inputs = find_user_input_positions(&fork1_items);
|
||||
let cut_last_on_fork1 = fork1_user_inputs
|
||||
.get(fork1_user_inputs.len().saturating_sub(1))
|
||||
.copied()
|
||||
.unwrap_or(0);
|
||||
let expected_after_second: Vec<RolloutItem> = fork1_items[..cut_last_on_fork1].to_vec();
|
||||
let fork2_items = read_rollout_items(&fork2_path);
|
||||
let expected_after_second = fork1_items.clone();
|
||||
let fork2_raw_items = read_rollout_items(&fork2_path);
|
||||
assert!(
|
||||
fork2_raw_items
|
||||
.iter()
|
||||
.any(|item| matches!(item, RolloutItem::ForkReference(_))),
|
||||
"re-forked rollout should keep a compact ForkReference instead of copying parent history"
|
||||
);
|
||||
let fork2_items =
|
||||
materialize_rollout_items_for_replay(test.config.codex_home.as_path(), &fork2_raw_items)
|
||||
.await;
|
||||
let fork2_items = without_session_meta(fork2_items);
|
||||
pretty_assertions::assert_eq!(
|
||||
serde_json::to_value(&fork2_items).unwrap(),
|
||||
serde_json::to_value(&expected_after_second).unwrap()
|
||||
@@ -244,3 +260,10 @@ fn read_rollout_items(path: &std::path::Path) -> Vec<RolloutItem> {
|
||||
}
|
||||
items
|
||||
}
|
||||
|
||||
fn without_session_meta(items: Vec<RolloutItem>) -> Vec<RolloutItem> {
|
||||
items
|
||||
.into_iter()
|
||||
.filter(|item| !matches!(item, RolloutItem::SessionMeta(_)))
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -21,10 +21,16 @@ use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
use tokio::time::sleep;
|
||||
use wiremock::Match;
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path_regex;
|
||||
|
||||
const SPAWN_CALL_ID: &str = "spawn-call-1";
|
||||
const TURN_0_FORK_PROMPT: &str = "seed fork context";
|
||||
@@ -38,7 +44,43 @@ const REQUESTED_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::Low;
|
||||
const ROLE_MODEL: &str = "gpt-5.4";
|
||||
const ROLE_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::High;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct RawRequestRecorder {
|
||||
requests: Arc<Mutex<Vec<wiremock::Request>>>,
|
||||
}
|
||||
|
||||
impl RawRequestRecorder {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
requests: Arc::new(Mutex::new(Vec::new())),
|
||||
}
|
||||
}
|
||||
|
||||
fn single_request(&self) -> wiremock::Request {
|
||||
let requests = self
|
||||
.requests
|
||||
.lock()
|
||||
.expect("requests lock should not panic");
|
||||
assert_eq!(requests.len(), 1);
|
||||
requests.first().expect("request should exist").clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl Match for RawRequestRecorder {
|
||||
fn matches(&self, request: &wiremock::Request) -> bool {
|
||||
self.requests
|
||||
.lock()
|
||||
.expect("requests lock should not panic")
|
||||
.push(request.clone());
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
fn body_contains(req: &wiremock::Request, text: &str) -> bool {
|
||||
request_body_text(req).is_some_and(|body| body.contains(text))
|
||||
}
|
||||
|
||||
fn request_body_text(req: &wiremock::Request) -> Option<String> {
|
||||
let is_zstd = req
|
||||
.headers
|
||||
.get("content-encoding")
|
||||
@@ -53,9 +95,7 @@ fn body_contains(req: &wiremock::Request, text: &str) -> bool {
|
||||
} else {
|
||||
Some(req.body.clone())
|
||||
};
|
||||
bytes
|
||||
.and_then(|body| String::from_utf8(body).ok())
|
||||
.is_some_and(|body| body.contains(text))
|
||||
bytes.and_then(|body| String::from_utf8(body).ok())
|
||||
}
|
||||
|
||||
fn has_subagent_notification(req: &ResponsesRequest) -> bool {
|
||||
@@ -305,8 +345,28 @@ async fn subagent_notification_is_included_without_wait() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn spawned_child_receives_forked_parent_context() -> Result<()> {
|
||||
#[test]
|
||||
fn spawned_child_receives_forked_parent_context() -> Result<()> {
|
||||
// This test intentionally sends a full forked request with parent developer
|
||||
// context and tool schemas. wiremock clones and matches that large request
|
||||
// body, so use an explicit Tokio stack size instead of relying on the
|
||||
// platform default worker stack.
|
||||
std::thread::Builder::new()
|
||||
.name("spawned_child_receives_forked_parent_context".to_string())
|
||||
.stack_size(32 * 1024 * 1024)
|
||||
.spawn(|| {
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(2)
|
||||
.thread_stack_size(32 * 1024 * 1024)
|
||||
.enable_all()
|
||||
.build()?;
|
||||
runtime.block_on(spawned_child_receives_forked_parent_context_impl())
|
||||
})?
|
||||
.join()
|
||||
.expect("test thread should not panic")
|
||||
}
|
||||
|
||||
async fn spawned_child_receives_forked_parent_context_impl() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
@@ -337,17 +397,6 @@ async fn spawned_child_receives_forked_parent_context() -> Result<()> {
|
||||
)
|
||||
.await;
|
||||
|
||||
let _child_request_log = mount_sse_once_match(
|
||||
&server,
|
||||
|req: &wiremock::Request| body_contains(req, CHILD_PROMPT),
|
||||
sse(vec![
|
||||
ev_response_created("resp-child-1"),
|
||||
ev_assistant_message("msg-child-1", "child done"),
|
||||
ev_completed("resp-child-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let _turn1_followup = mount_sse_once_match(
|
||||
&server,
|
||||
|req: &wiremock::Request| body_contains(req, SPAWN_CALL_ID),
|
||||
@@ -366,6 +415,26 @@ async fn spawned_child_receives_forked_parent_context() -> Result<()> {
|
||||
.expect("test config should allow feature update");
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let parent_session_id = test.session_configured.session_id.to_string();
|
||||
|
||||
let child_request_log = RawRequestRecorder::new();
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.and(move |req: &wiremock::Request| {
|
||||
req.headers
|
||||
.get("x-client-request-id")
|
||||
.and_then(|value| value.to_str().ok())
|
||||
!= Some(parent_session_id.as_str())
|
||||
})
|
||||
.and(child_request_log.clone())
|
||||
.respond_with(sse_response(sse(vec![
|
||||
ev_response_created("resp-child-1"),
|
||||
ev_assistant_message("msg-child-1", "child done"),
|
||||
ev_completed("resp-child-1"),
|
||||
])))
|
||||
.up_to_n_times(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
test.submit_turn(TURN_0_FORK_PROMPT).await?;
|
||||
let _ = seed_turn.single_request();
|
||||
@@ -373,27 +442,14 @@ async fn spawned_child_receives_forked_parent_context() -> Result<()> {
|
||||
test.submit_turn(TURN_1_PROMPT).await?;
|
||||
let _ = spawn_turn.single_request();
|
||||
|
||||
let deadline = Instant::now() + Duration::from_secs(2);
|
||||
let child_request = loop {
|
||||
if let Some(request) = server
|
||||
.received_requests()
|
||||
.await
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.find(|request| {
|
||||
body_contains(request, CHILD_PROMPT) && !body_contains(request, SPAWN_CALL_ID)
|
||||
})
|
||||
{
|
||||
break request;
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
anyhow::bail!("timed out waiting for forked child request");
|
||||
}
|
||||
sleep(Duration::from_millis(10)).await;
|
||||
};
|
||||
assert!(body_contains(&child_request, TURN_0_FORK_PROMPT));
|
||||
assert!(!body_contains(&child_request, SPAWN_CALL_ID));
|
||||
|
||||
let child_request = child_request_log.single_request();
|
||||
let child_body = request_body_text(&child_request)
|
||||
.ok_or_else(|| anyhow::anyhow!("child request body should be text"))?;
|
||||
assert!(
|
||||
child_body.contains(TURN_0_FORK_PROMPT)
|
||||
|| child_body.contains(r#""previous_response_id":"resp-turn1-1""#),
|
||||
"forked child should either inline parent context or continue from the parent response"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user