merge upstream/dev/friel/watchdog-runtime-and-prompts into collab stack

This commit is contained in:
Friel
2026-03-30 01:53:08 +00:00
58 changed files with 4043 additions and 426 deletions

View File

@@ -20,11 +20,17 @@ use core_test_support::fs_wait;
use pretty_assertions::assert_eq;
use serde_json::Value;
use std::path::Path;
use std::time::Duration;
use tempfile::TempDir;
use tokio::time::timeout;
#[cfg(windows)]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
#[cfg(not(windows))]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[cfg(windows)]
const DEFAULT_NOTIFY_FILE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
#[cfg(not(windows))]
const DEFAULT_NOTIFY_FILE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
#[tokio::test]
async fn initialize_uses_client_info_name_as_originator() -> Result<()> {
@@ -270,9 +276,9 @@ async fn turn_start_notify_payload_includes_initialize_client_name() -> Result<(
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
fs_wait::wait_for_path_exists(&notify_file, Duration::from_secs(5)).await?;
let payload_raw = tokio::fs::read_to_string(&notify_file).await?;
let notify_file = Path::new(&notify_file);
fs_wait::wait_for_path_exists(notify_file, DEFAULT_NOTIFY_FILE_TIMEOUT).await?;
let payload_raw = tokio::fs::read_to_string(notify_file).await?;
let payload: Value = serde_json::from_str(&payload_raw)?;
assert_eq!(payload["client"], "xcode");

View File

@@ -557,6 +557,62 @@ async fn thread_read_include_turns_rejects_unmaterialized_loaded_thread() -> Res
Ok(())
}
#[tokio::test]
async fn thread_read_loaded_ephemeral_thread_ignores_unrelated_rollout_mentions() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
ephemeral: Some(true),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let unrelated_preview = thread.id.clone();
let _unrelated_rollout_id = create_fake_rollout_with_text_elements(
codex_home.path(),
"2025-01-05T13-00-00",
"2025-01-05T13:00:00Z",
&unrelated_preview,
vec![],
Some("mock_provider"),
/*git_info*/ None,
)?;
let read_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: thread.id.clone(),
include_turns: false,
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread: read } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(read.id, thread.id);
assert!(read.ephemeral);
assert_eq!(read.path, None);
assert!(read.preview.is_empty());
assert_eq!(read.status, ThreadStatus::Idle);
Ok(())
}
#[tokio::test]
async fn thread_read_reports_system_error_idle_flag_after_failed_turn() -> Result<()> {
let server = responses::start_mock_server().await;

View File

@@ -62,6 +62,7 @@ use std::path::Path;
use std::path::PathBuf;
use std::process::Command;
use tempfile::TempDir;
use tokio::time::sleep;
use tokio::time::timeout;
use uuid::Uuid;
use wiremock::Mock;
@@ -70,39 +71,12 @@ use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
#[cfg(windows)]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
#[cfg(not(windows))]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const CODEX_5_2_INSTRUCTIONS_TEMPLATE_DEFAULT: &str = "You are Codex, a coding agent based on GPT-5. You and the user share the same workspace and collaborate to achieve the user's goals.";
async fn wait_for_responses_request_count(
server: &wiremock::MockServer,
expected_count: usize,
) -> Result<()> {
timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let Some(requests) = server.received_requests().await else {
anyhow::bail!("wiremock did not record requests");
};
let responses_request_count = requests
.iter()
.filter(|request| {
request.method == "POST" && request.url.path().ends_with("/responses")
})
.count();
if responses_request_count == expected_count {
return Ok::<(), anyhow::Error>(());
}
if responses_request_count > expected_count {
anyhow::bail!(
"expected exactly {expected_count} /responses requests, got {responses_request_count}"
);
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
})
.await??;
Ok(())
}
#[tokio::test]
async fn thread_resume_rejects_unmaterialized_thread() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
@@ -1075,13 +1049,9 @@ async fn thread_resume_replays_pending_command_execution_request_approval() -> R
let responses = vec![
create_final_assistant_message_sse_response("seeded")?,
create_shell_command_sse_response(
vec![
"python3".to_string(),
"-c".to_string(),
"print(42)".to_string(),
],
fast_shell_command(),
/*workdir*/ None,
Some(5000),
Some(1000),
"call-1",
)?,
create_final_assistant_message_sse_response("done")?,
@@ -1201,7 +1171,7 @@ async fn thread_resume_replays_pending_command_execution_request_approval() -> R
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
wait_for_responses_request_count(&server, /*expected_count*/ 3).await?;
wait_for_mock_request_count(&server, /*expected*/ 3).await?;
Ok(())
}
@@ -1367,11 +1337,50 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
wait_for_responses_request_count(&server, /*expected_count*/ 3).await?;
wait_for_mock_request_count(&server, /*expected*/ 3).await?;
Ok(())
}
fn fast_shell_command() -> Vec<String> {
if cfg!(windows) {
vec![
"cmd".to_string(),
"/d".to_string(),
"/c".to_string(),
"echo 42".to_string(),
]
} else {
vec![
"python3".to_string(),
"-c".to_string(),
"print(42)".to_string(),
]
}
}
async fn wait_for_mock_request_count(server: &MockServer, expected: usize) -> Result<()> {
let deadline = tokio::time::Instant::now() + DEFAULT_READ_TIMEOUT;
loop {
let requests = server
.received_requests()
.await
.ok_or_else(|| anyhow::anyhow!("failed to fetch received requests"))?;
if requests.len() >= expected {
return Ok(());
}
if tokio::time::Instant::now() >= deadline {
anyhow::bail!(
"expected at least {expected} mock requests, observed {}",
requests.len()
);
}
sleep(std::time::Duration::from_millis(50)).await;
}
}
#[tokio::test]
async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;

View File

@@ -200,9 +200,14 @@ async fn thread_start_accepts_flex_service_tier() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let read_timeout = if cfg!(windows) {
std::time::Duration::from_secs(15)
} else {
DEFAULT_READ_TIMEOUT
};
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
timeout(read_timeout, mcp.initialize()).await??;
let req_id = mcp
.send_thread_start_request(ThreadStartParams {
@@ -212,7 +217,7 @@ async fn thread_start_accepts_flex_service_tier() -> Result<()> {
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
read_timeout,
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
)
.await??;

View File

@@ -33,6 +33,9 @@ use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::timeout;
#[cfg(windows)]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
#[cfg(not(windows))]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
async fn wait_for_responses_request_count_to_stabilize(

View File

@@ -1026,24 +1026,16 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
// Second turn same, but we'll set approval_policy=never to avoid elicitation.
let responses = vec![
create_shell_command_sse_response(
vec![
"python3".to_string(),
"-c".to_string(),
"print(42)".to_string(),
],
fast_shell_command(),
/*workdir*/ None,
Some(5000),
Some(1000),
"call1",
)?,
create_final_assistant_message_sse_response("done 1")?,
create_shell_command_sse_response(
vec![
"python3".to_string(),
"-c".to_string(),
"print(42)".to_string(),
],
fast_shell_command(),
/*workdir*/ None,
Some(5000),
Some(1000),
"call2",
)?,
create_final_assistant_message_sse_response("done 2")?,
@@ -1170,6 +1162,23 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
Ok(())
}
fn fast_shell_command() -> Vec<String> {
if cfg!(windows) {
vec![
"cmd".to_string(),
"/d".to_string(),
"/c".to_string(),
"echo 42".to_string(),
]
} else {
vec![
"python3".to_string(),
"-c".to_string(),
"print(42)".to_string(),
]
}
}
#[tokio::test]
async fn turn_start_exec_approval_decline_v2() -> Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -215,9 +215,12 @@ async fn turn_start_shell_zsh_fork_exec_approval_decline_v2() -> Result<()> {
]),
&zsh_path,
)?;
// This flow can require several sequential approval round-trips on slower
// macOS runners before the parent command reaches a terminal state.
let read_timeout = std::time::Duration::from_secs(20);
let mut mcp = create_zsh_test_mcp_process(&codex_home, &workspace).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
timeout(read_timeout, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
@@ -227,7 +230,7 @@ async fn turn_start_shell_zsh_fork_exec_approval_decline_v2() -> Result<()> {
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
read_timeout,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
@@ -348,9 +351,12 @@ async fn turn_start_shell_zsh_fork_exec_approval_cancel_v2() -> Result<()> {
]),
&zsh_path,
)?;
// This flow can require several sequential approval round-trips on slower
// macOS runners before the parent command reaches a terminal state.
let read_timeout = std::time::Duration::from_secs(20);
let mut mcp = create_zsh_test_mcp_process(&codex_home, &workspace).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
timeout(read_timeout, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
@@ -360,7 +366,7 @@ async fn turn_start_shell_zsh_fork_exec_approval_cancel_v2() -> Result<()> {
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
read_timeout,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
@@ -507,9 +513,10 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
]),
&zsh_path,
)?;
let read_timeout = std::time::Duration::from_secs(20);
let mut mcp = create_zsh_test_mcp_process(&codex_home, &workspace).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
timeout(read_timeout, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
@@ -519,7 +526,7 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
read_timeout,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
@@ -548,7 +555,7 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
read_timeout,
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
)
.await??;
@@ -566,11 +573,7 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
let second_file_str = second_file.to_string_lossy().into_owned();
let parent_shell_hint = format!("&& {}", &first_file_str);
while target_decision_index < target_decisions.len() || !saw_parent_approval {
let server_req = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_request_message(),
)
.await??;
let server_req = timeout(read_timeout, mcp.read_stream_until_request_message()).await??;
let ServerRequest::CommandExecutionRequestApproval { request_id, params } = server_req
else {
panic!("expected CommandExecutionRequestApproval request");
@@ -640,7 +643,7 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
assert_eq!(approved_subcommand_strings.len(), 2);
assert!(approved_subcommand_strings[0].contains(&first_file.display().to_string()));
assert!(approved_subcommand_strings[1].contains(&second_file.display().to_string()));
let parent_completed_command_execution = timeout(DEFAULT_READ_TIMEOUT, async {
let parent_completed_command_execution = timeout(read_timeout, async {
loop {
let completed_notif = mcp
.read_stream_until_notification_message("item/completed")
@@ -682,7 +685,7 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
}
match timeout(
DEFAULT_READ_TIMEOUT,
read_timeout,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await
@@ -705,7 +708,7 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
mcp.interrupt_turn_and_wait_for_aborted(
thread.id.clone(),
turn.id.clone(),
DEFAULT_READ_TIMEOUT,
read_timeout,
)
.await?;
}
@@ -718,7 +721,7 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
// sandbox failures can also complete the turn before the parent
// completion item is observed.
let completed_notif = timeout(
DEFAULT_READ_TIMEOUT,
read_timeout,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;