feat(agents): enable subagent inbox delivery

Preserve subagent inbox delivery on the current origin/main base and collapse the branch back to a single commit for easier future restacks.
This commit is contained in:
Friel
2026-03-14 13:31:40 -07:00
parent 65f631c3d6
commit 0e91619094
21 changed files with 1840 additions and 85 deletions

View File

@@ -62,6 +62,7 @@ use std::path::Path;
use std::path::PathBuf;
use std::process::Command;
use tempfile::TempDir;
use tokio::time::sleep;
use tokio::time::timeout;
use uuid::Uuid;
use wiremock::Mock;
@@ -70,39 +71,12 @@ use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
#[cfg(windows)]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
#[cfg(not(windows))]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const CODEX_5_2_INSTRUCTIONS_TEMPLATE_DEFAULT: &str = "You are Codex, a coding agent based on GPT-5. You and the user share the same workspace and collaborate to achieve the user's goals.";
async fn wait_for_responses_request_count(
server: &wiremock::MockServer,
expected_count: usize,
) -> Result<()> {
timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let Some(requests) = server.received_requests().await else {
anyhow::bail!("wiremock did not record requests");
};
let responses_request_count = requests
.iter()
.filter(|request| {
request.method == "POST" && request.url.path().ends_with("/responses")
})
.count();
if responses_request_count == expected_count {
return Ok::<(), anyhow::Error>(());
}
if responses_request_count > expected_count {
anyhow::bail!(
"expected exactly {expected_count} /responses requests, got {responses_request_count}"
);
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
})
.await??;
Ok(())
}
#[tokio::test]
async fn thread_resume_rejects_unmaterialized_thread() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
@@ -171,7 +145,7 @@ async fn thread_resume_returns_rollout_history() -> Result<()> {
.map(|elem| serde_json::to_value(elem).expect("serialize text element"))
.collect(),
Some("mock_provider"),
/*git_info*/ None,
None,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
@@ -368,9 +342,7 @@ stream_max_retries = 0
)?;
let state_db =
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
state_db
.mark_backfill_complete(/*last_watermark*/ None)
.await?;
state_db.mark_backfill_complete(None).await?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
@@ -431,7 +403,7 @@ async fn thread_resume_and_read_interrupt_incomplete_rollout_turn_when_thread_is
"Saved user message",
Vec::new(),
Some("mock_provider"),
/*git_info*/ None,
None,
)?;
let rollout_file_path = rollout_path(codex_home.path(), filename_ts, &conversation_id);
let persisted_rollout = std::fs::read_to_string(&rollout_file_path)?;
@@ -1018,16 +990,7 @@ async fn thread_resume_rejoins_running_thread_even_with_override_mismatch() -> R
async fn thread_resume_replays_pending_command_execution_request_approval() -> Result<()> {
let responses = vec![
create_final_assistant_message_sse_response("seeded")?,
create_shell_command_sse_response(
vec![
"python3".to_string(),
"-c".to_string(),
"print(42)".to_string(),
],
/*workdir*/ None,
Some(5000),
"call-1",
)?,
create_shell_command_sse_response(fast_shell_command(), None, Some(1000), "call-1")?,
create_final_assistant_message_sse_response("done")?,
];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
@@ -1145,7 +1108,7 @@ async fn thread_resume_replays_pending_command_execution_request_approval() -> R
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
wait_for_responses_request_count(&server, /*expected_count*/ 3).await?;
wait_for_mock_request_count(&server, 3).await?;
Ok(())
}
@@ -1311,11 +1274,50 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
wait_for_responses_request_count(&server, /*expected_count*/ 3).await?;
wait_for_mock_request_count(&server, 3).await?;
Ok(())
}
fn fast_shell_command() -> Vec<String> {
if cfg!(windows) {
vec![
"cmd".to_string(),
"/d".to_string(),
"/c".to_string(),
"echo 42".to_string(),
]
} else {
vec![
"python3".to_string(),
"-c".to_string(),
"print(42)".to_string(),
]
}
}
async fn wait_for_mock_request_count(server: &MockServer, expected: usize) -> Result<()> {
let deadline = tokio::time::Instant::now() + DEFAULT_READ_TIMEOUT;
loop {
let requests = server
.received_requests()
.await
.ok_or_else(|| anyhow::anyhow!("failed to fetch received requests"))?;
if requests.len() >= expected {
return Ok(());
}
if tokio::time::Instant::now() >= deadline {
anyhow::bail!(
"expected at least {expected} mock requests, observed {}",
requests.len()
);
}
sleep(std::time::Duration::from_millis(50)).await;
}
}
#[tokio::test]
async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
@@ -1467,7 +1469,7 @@ async fn thread_resume_surfaces_cloud_requirements_load_errors() -> Result<()> {
"Saved user message",
Vec::new(),
Some("mock_provider"),
/*git_info*/ None,
None,
)?;
let refresh_token_url = format!("{}/oauth/token", server.uri());
let mut mcp = McpProcess::new_with_env(
@@ -1935,7 +1937,7 @@ fn setup_rollout_fixture(codex_home: &Path, server_uri: &str) -> Result<RolloutF
preview,
Vec::new(),
Some("mock_provider"),
/*git_info*/ None,
None,
)?;
let rollout_file_path = rollout_path(codex_home, filename_ts, &conversation_id);
set_rollout_mtime(rollout_file_path.as_path(), expected_updated_at_rfc3339)?;