Compare commits

...

2 Commits

Author SHA1 Message Date
Friel
b6be6d0984 test(core): split fork cache invariants 2026-04-01 20:10:21 +00:00
Friel
3572226938 test(core): assert forked spawn request prefix stability 2026-04-01 19:40:27 +00:00
2 changed files with 251 additions and 106 deletions

View File

@@ -103,14 +103,18 @@ fn decode_body_bytes(body: &[u8], content_encoding: Option<&str>) -> Vec<u8> {
impl ResponsesRequest {
pub fn body_json(&self) -> Value {
let body = decode_body_bytes(
let body = self.decoded_body_bytes();
serde_json::from_slice(&body).unwrap()
}
pub fn decoded_body_bytes(&self) -> Vec<u8> {
decode_body_bytes(
&self.0.body,
self.0
.headers
.get("content-encoding")
.and_then(|value| value.to_str().ok()),
);
serde_json::from_slice(&body).unwrap()
)
}
pub fn body_bytes(&self) -> Vec<u8> {

View File

@@ -18,6 +18,7 @@ use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::time::Duration;
use tokio::time::Instant;
@@ -63,6 +64,118 @@ fn has_subagent_notification(req: &ResponsesRequest) -> bool {
.any(|text| text.contains("<subagent_notification>"))
}
fn cache_prefix_request_body_without_prompt_cache_key(
request: &ResponsesRequest,
call_id: &str,
) -> Result<Value> {
let mut body = request.body_json();
let Some(object) = body.as_object_mut() else {
anyhow::bail!("expected JSON object request body");
};
object.remove("prompt_cache_key");
let input = object
.get_mut("input")
.and_then(Value::as_array_mut)
.ok_or_else(|| anyhow::anyhow!("expected request input array"))?;
let spawn_call_index = input
.iter()
.rposition(|item| {
item.get("type").and_then(Value::as_str) == Some("function_call")
&& item.get("call_id").and_then(Value::as_str) == Some(call_id)
})
.ok_or_else(|| {
anyhow::anyhow!("expected request input to include function_call {call_id}: {input:?}")
})?;
// The cache-preservation contract is only for the shared request prefix up to
// and including the forked `spawn_agent` call. The `FunctionCallOutput` for that
// call is the first legal divergence point between parent and child requests,
// so truncate immediately before it.
input.truncate(spawn_call_index + 1);
if let Some(tools) = object.get_mut("tools") {
*tools = normalize_tools_for_cache_prefix(tools);
}
Ok(body)
}
fn prompt_cache_key(request: &ResponsesRequest) -> Option<String> {
request
.body_json()
.get("prompt_cache_key")
.and_then(Value::as_str)
.map(str::to_string)
}
/// Requests captured by `spawn_child_and_capture_fork_requests`.
struct ForkedChildRequests {
/// String form of the parent session's `session_id`.
parent_session_id: String,
/// First request served by the parent's follow-up mock (the one whose body
/// contains the spawn call id but not the child prompt).
parent_followup_request: ResponsesRequest,
/// Request captured by the child mock whose body contains the child prompt.
child_request: ResponsesRequest,
}
/// Normalizes each entry of a top-level `tools` array with
/// `normalize_tool_for_cache_prefix` and returns the results as a new array.
///
/// # Panics
///
/// Panics when `tools` is not a JSON array.
fn normalize_tools_for_cache_prefix(tools: &Value) -> Value {
    let Some(entries) = tools.as_array() else {
        panic!("expected tools array: {tools:?}");
    };
    Value::Array(
        entries
            .iter()
            .filter_map(normalize_tool_for_cache_prefix)
            .collect(),
    )
}
fn normalize_tool_for_cache_prefix(tool: &Value) -> Option<Value> {
let mut normalized = tool
.as_object()
.unwrap_or_else(|| panic!("expected tool object: {tool:?}"))
.clone();
if normalized.get("type").and_then(Value::as_str) == Some("namespace")
&& let Some(namespace_tools) = normalized.get("tools")
{
normalized.insert(
"tools".to_string(),
normalize_namespace_tools_for_cache_prefix(namespace_tools),
);
}
if normalized
.get("defer_loading")
.and_then(Value::as_bool)
.unwrap_or(false)
&& normalized.get("type").and_then(Value::as_str) == Some("function")
{
normalized.remove("parameters");
}
Some(Value::Object(normalized))
}
/// Normalizes the member list of a `namespace` tool for cache-prefix comparison.
///
/// Members that are `function` tools with `defer_loading` set to `true` are
/// dropped entirely; every other member is passed through
/// `normalize_tool_for_cache_prefix`.
///
/// # Panics
///
/// Panics when `tools` is not a JSON array or when any member is not a JSON
/// object.
fn normalize_namespace_tools_for_cache_prefix(tools: &Value) -> Value {
    let Some(members) = tools.as_array() else {
        panic!("expected namespace tools array: {tools:?}");
    };
    let normalized_tools = members
        .iter()
        .filter_map(|tool| {
            // Borrow the member's map for the check instead of cloning it:
            // dropped members need no copy, and kept members are cloned once
            // inside `normalize_tool_for_cache_prefix`.
            let Some(tool_object) = tool.as_object() else {
                panic!("expected namespace tool object: {tool:?}");
            };
            let deferred =
                tool_object.get("defer_loading").and_then(Value::as_bool) == Some(true);
            let is_function =
                tool_object.get("type").and_then(Value::as_str) == Some("function");
            if deferred && is_function {
                None
            } else {
                normalize_tool_for_cache_prefix(tool)
            }
        })
        .collect::<Vec<_>>();
    Value::Array(normalized_tools)
}
fn tool_parameter_description(
req: &ResponsesRequest,
tool_name: &str,
@@ -270,6 +383,102 @@ async fn spawn_child_and_capture_snapshot(
.await)
}
/// Runs a parent session through a seed turn and a turn that calls
/// `spawn_agent` with `fork_context: true`, then returns the parent's
/// follow-up request, the child's request, and the parent session id.
///
/// # Errors
///
/// Fails when serializing the spawn arguments, building the test harness,
/// submitting a turn, or waiting for the follow-up request fails, and when no
/// child request is observed within two seconds.
///
/// # Panics
///
/// Panics when the feature update is rejected or when the follow-up mock
/// yields no request.
async fn spawn_child_and_capture_fork_requests(server: &MockServer) -> Result<ForkedChildRequests> {
// Seed turn: a request containing TURN_0_FORK_PROMPT gets an assistant
// message "seeded".
let seed_turn = mount_sse_once_match(
server,
|req: &wiremock::Request| body_contains(req, TURN_0_FORK_PROMPT),
sse(vec![
ev_response_created("resp-seed-1"),
ev_assistant_message("msg-seed-1", "seeded"),
ev_completed("resp-seed-1"),
]),
)
.await;
// Arguments for the `spawn_agent` call emitted by the mocked model.
let spawn_args = serde_json::to_string(&json!({
"message": CHILD_PROMPT,
"fork_context": true,
}))?;
// Spawn turn: a request containing TURN_1_PROMPT gets a `spawn_agent`
// function call carrying SPAWN_CALL_ID.
let spawn_turn = mount_sse_once_match(
server,
|req: &wiremock::Request| body_contains(req, TURN_1_PROMPT),
sse(vec![
ev_response_created("resp-turn1-1"),
ev_function_call(SPAWN_CALL_ID, "spawn_agent", &spawn_args),
ev_completed("resp-turn1-1"),
]),
)
.await;
// Child mock: matches only bodies that contain both the child prompt and the
// forked spawn_agent output message.
let child_request_log = mount_sse_once_match(
server,
|req: &wiremock::Request| {
body_contains(req, CHILD_PROMPT)
&& body_contains(req, FORKED_SPAWN_AGENT_OUTPUT_MESSAGE)
},
sse(vec![
ev_response_created("resp-child-1"),
ev_assistant_message("msg-child-1", "child done"),
ev_completed("resp-child-1"),
]),
)
.await;
// Parent follow-up mock: matches bodies containing SPAWN_CALL_ID but not the
// child prompt. NOTE(review): the exclusion presumably keeps the child's
// request from being served by this mock - confirm.
let turn1_followup = mount_sse_once_match(
server,
|req: &wiremock::Request| {
body_contains(req, SPAWN_CALL_ID) && !body_contains(req, CHILD_PROMPT)
},
sse(vec![
ev_response_created("resp-turn1-2"),
ev_assistant_message("msg-turn1-2", "parent done"),
ev_completed("resp-turn1-2"),
]),
)
.await;
// Build the harness with the Collab feature enabled.
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::Collab)
.expect("test config should allow feature update");
});
let test = builder.build(server).await?;
let parent_session_id = test.session_configured.session_id.to_string();
// The returned requests are discarded here. NOTE(review): `single_request`
// presumably asserts that exactly one request reached the mock - confirm.
test.submit_turn(TURN_0_FORK_PROMPT).await?;
let _ = seed_turn.single_request();
test.submit_turn(TURN_1_PROMPT).await?;
let _ = spawn_turn.single_request();
// Take the first request recorded by the parent follow-up mock.
let parent_followup_request = wait_for_requests(&turn1_followup)
.await?
.into_iter()
.next()
.expect("parent follow-up request should be captured");
// Poll the child mock every 10 ms for up to 2 seconds.
let deadline = Instant::now() + Duration::from_secs(2);
let child_request = loop {
if let Some(request) = child_request_log
.requests()
.into_iter()
.find(|request| request.body_contains_text(CHILD_PROMPT))
{
break request;
}
if Instant::now() >= deadline {
anyhow::bail!("timed out waiting for forked child request");
}
sleep(Duration::from_millis(10)).await;
};
Ok(ForkedChildRequests {
parent_session_id,
parent_followup_request,
child_request,
})
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn subagent_notification_is_included_without_wait() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -297,117 +506,49 @@ async fn subagent_notification_is_included_without_wait() -> Result<()> {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawned_child_receives_forked_parent_context() -> Result<()> {
async fn spawned_child_request_reuses_parent_prompt_cache_key() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let requests = spawn_child_and_capture_fork_requests(&server).await?;
let seed_turn = mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, TURN_0_FORK_PROMPT),
sse(vec![
ev_response_created("resp-seed-1"),
ev_assistant_message("msg-seed-1", "seeded"),
ev_completed("resp-seed-1"),
]),
)
.await;
assert_eq!(
prompt_cache_key(&requests.parent_followup_request),
Some(requests.parent_session_id.clone()),
"parent follow-up requests should use the parent session id as prompt_cache_key"
);
assert_eq!(
prompt_cache_key(&requests.child_request),
Some(requests.parent_session_id),
"forked child requests must reuse the parent prompt_cache_key so backend sharding can colocate them for KV cache reuse"
);
let spawn_args = serde_json::to_string(&json!({
"message": CHILD_PROMPT,
"fork_context": true,
}))?;
let spawn_turn = mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, TURN_1_PROMPT),
sse(vec![
ev_response_created("resp-turn1-1"),
ev_function_call(SPAWN_CALL_ID, "spawn_agent", &spawn_args),
ev_completed("resp-turn1-1"),
]),
)
.await;
Ok(())
}
let _child_request_log = mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, CHILD_PROMPT),
sse(vec![
ev_response_created("resp-child-1"),
ev_assistant_message("msg-child-1", "child done"),
ev_completed("resp-child-1"),
]),
)
.await;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawned_child_preserves_parent_prefix_and_tool_surface() -> Result<()> {
skip_if_no_network!(Ok(()));
let _turn1_followup = mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, SPAWN_CALL_ID),
sse(vec![
ev_response_created("resp-turn1-2"),
ev_assistant_message("msg-turn1-2", "parent done"),
ev_completed("resp-turn1-2"),
]),
)
.await;
let server = start_mock_server().await;
let requests = spawn_child_and_capture_fork_requests(&server).await?;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::Collab)
.expect("test config should allow feature update");
});
let test = builder.build(&server).await?;
test.submit_turn(TURN_0_FORK_PROMPT).await?;
let _ = seed_turn.single_request();
test.submit_turn(TURN_1_PROMPT).await?;
let _ = spawn_turn.single_request();
let deadline = Instant::now() + Duration::from_secs(2);
let child_request = loop {
if let Some(request) = server
.received_requests()
.await
.unwrap_or_default()
.into_iter()
.find(|request| {
body_contains(request, CHILD_PROMPT)
&& body_contains(request, FORKED_SPAWN_AGENT_OUTPUT_MESSAGE)
})
{
break request;
}
if Instant::now() >= deadline {
anyhow::bail!("timed out waiting for forked child request");
}
sleep(Duration::from_millis(10)).await;
};
assert!(body_contains(&child_request, TURN_0_FORK_PROMPT));
assert!(body_contains(&child_request, "seeded"));
let child_body = child_request
.body_json::<serde_json::Value>()
.expect("forked child request body should be json");
let function_call_output = child_body["input"]
.as_array()
.and_then(|items| {
items.iter().find(|item| {
item["type"].as_str() == Some("function_call_output")
&& item["call_id"].as_str() == Some(SPAWN_CALL_ID)
})
})
.unwrap_or_else(|| panic!("expected forked child request to include spawn_agent output"));
let (content, success) = match &function_call_output["output"] {
serde_json::Value::String(text) => (Some(text.as_str()), None),
serde_json::Value::Object(output) => (
output.get("content").and_then(serde_json::Value::as_str),
output.get("success").and_then(serde_json::Value::as_bool),
),
_ => (None, None),
};
assert_eq!(content, Some(FORKED_SPAWN_AGENT_OUTPUT_MESSAGE));
assert_ne!(success, Some(false));
assert!(
requests
.child_request
.body_contains_text(TURN_0_FORK_PROMPT)
);
assert!(requests.child_request.body_contains_text("seeded"));
let parent_prefix = cache_prefix_request_body_without_prompt_cache_key(
&requests.parent_followup_request,
SPAWN_CALL_ID,
)?;
let child_prefix =
cache_prefix_request_body_without_prompt_cache_key(&requests.child_request, SPAWN_CALL_ID)?;
assert_eq!(
parent_prefix, child_prefix,
"forked child requests must preserve every cache-relevant request field and the conversation-item prefix exactly through the shared spawn_agent call; namespace shells and non-deferred tools must stay stable, while deferred namespace members may only appear after tool_search_output"
);
Ok(())
}