Merge origin/main into rhan/surface-updates to resolve PR #14374 conflicts

This commit is contained in:
Roy Han
2026-03-12 10:31:10 -07:00
437 changed files with 65186 additions and 43385 deletions

View File

@@ -594,7 +594,7 @@ impl McpProcess {
/// Deterministically clean up an intentionally in-flight turn.
///
/// Some tests assert behavior while a turn is still running. Returning from those tests
/// without an explicit interrupt + `codex/event/turn_aborted` wait can leave in-flight work
/// without an explicit interrupt + terminal turn notification wait can leave in-flight work
/// racing teardown and intermittently show up as `LEAK` in nextest.
///
/// In rare races, the turn can also fail or complete on its own after we send
@@ -631,18 +631,19 @@ impl McpProcess {
}
match tokio::time::timeout(
read_timeout,
self.read_stream_until_notification_message("codex/event/turn_aborted"),
self.read_stream_until_notification_message("turn/completed"),
)
.await
{
Ok(result) => {
result.with_context(|| "failed while waiting for turn aborted notification")?;
result.with_context(|| "failed while waiting for terminal turn notification")?;
}
Err(err) => {
if self.pending_turn_completed_notification(&thread_id, &turn_id) {
return Ok(());
}
return Err(err).with_context(|| "timed out waiting for turn aborted notification");
return Err(err)
.with_context(|| "timed out waiting for terminal turn notification");
}
}
Ok(())

View File

@@ -710,6 +710,12 @@ async fn command_exec_process_ids_are_connection_scoped_and_disconnect_terminate
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), "never")?;
let marker = format!(
"codex-command-exec-marker-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_nanos()
);
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
@@ -726,7 +732,12 @@ async fn command_exec_process_ids_are_connection_scoped_and_disconnect_terminate
"command/exec",
101,
Some(serde_json::json!({
"command": ["sh", "-lc", "printf 'ready\\n%s\\n' $$; sleep 30"],
"command": [
"python3",
"-c",
"import time; print('ready', flush=True); time.sleep(30)",
marker,
],
"processId": "shared-process",
"streamStdoutStderr": true,
})),
@@ -737,12 +748,8 @@ async fn command_exec_process_ids_are_connection_scoped_and_disconnect_terminate
assert_eq!(delta.process_id, "shared-process");
assert_eq!(delta.stream, CommandExecOutputStream::Stdout);
let delta_text = String::from_utf8(STANDARD.decode(&delta.delta_base64)?)?;
let pid = delta_text
.lines()
.last()
.context("delta should include shell pid")?
.parse::<u32>()
.context("parse shell pid")?;
assert!(delta_text.contains("ready"));
wait_for_process_marker(&marker, true).await?;
send_request(
&mut ws2,
@@ -766,12 +773,12 @@ async fn command_exec_process_ids_are_connection_scoped_and_disconnect_terminate
terminate_error.error.message,
"no active command/exec for process id \"shared-process\""
);
assert!(process_is_alive(pid)?);
wait_for_process_marker(&marker, true).await?;
assert_no_message(&mut ws2, Duration::from_millis(250)).await?;
ws1.close(None).await?;
wait_for_process_exit(pid).await?;
wait_for_process_marker(&marker, false).await?;
process
.kill()
@@ -855,24 +862,25 @@ async fn read_initialize_response(
}
}
async fn wait_for_process_exit(pid: u32) -> Result<()> {
async fn wait_for_process_marker(marker: &str, should_exist: bool) -> Result<()> {
let deadline = Instant::now() + Duration::from_secs(5);
loop {
if !process_is_alive(pid)? {
if process_with_marker_exists(marker)? == should_exist {
return Ok(());
}
if Instant::now() >= deadline {
anyhow::bail!("process {pid} was still alive after websocket disconnect");
let expectation = if should_exist { "appear" } else { "exit" };
anyhow::bail!("process marker {marker:?} did not {expectation} before timeout");
}
sleep(Duration::from_millis(50)).await;
}
}
fn process_is_alive(pid: u32) -> Result<bool> {
let status = std::process::Command::new("kill")
.arg("-0")
.arg(pid.to_string())
.status()
.context("spawn kill -0")?;
Ok(status.success())
fn process_with_marker_exists(marker: &str) -> Result<bool> {
let output = std::process::Command::new("ps")
.args(["-axo", "command"])
.output()
.context("spawn ps -axo command")?;
let stdout = String::from_utf8(output.stdout).context("decode ps output")?;
Ok(stdout.lines().any(|line| line.contains(marker)))
}

View File

@@ -139,10 +139,7 @@ async fn initialize_opt_out_notification_methods_filters_notifications() -> Resu
},
Some(InitializeCapabilities {
experimental_api: true,
opt_out_notification_methods: Some(vec![
"thread/started".to_string(),
"codex/event/session_configured".to_string(),
]),
opt_out_notification_methods: Some(vec!["thread/started".to_string()]),
}),
),
)

View File

@@ -66,7 +66,7 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
const CONNECTOR_ID: &str = "calendar";
const CONNECTOR_NAME: &str = "Calendar";
const TOOL_NAME: &str = "calendar_confirm_action";
const QUALIFIED_TOOL_NAME: &str = "mcp__codex_apps__calendar_confirm_action";
const QUALIFIED_TOOL_NAME: &str = "mcp__codex_apps__calendar-confirm-action";
const TOOL_CALL_ID: &str = "call-calendar-confirm";
const ELICITATION_MESSAGE: &str = "Allow this request?";

View File

@@ -191,7 +191,7 @@ async fn plugin_install_returns_apps_needing_auth() -> Result<()> {
"sample-plugin",
"./sample-plugin",
None,
Some("ON_INSTALL"),
None,
)?;
write_plugin_source(repo_root.path(), "sample-plugin", &["alpha", "beta"])?;
let marketplace_path =
@@ -217,7 +217,7 @@ async fn plugin_install_returns_apps_needing_auth() -> Result<()> {
assert_eq!(
response,
PluginInstallResponse {
auth_policy: Some(PluginAuthPolicy::OnInstall),
auth_policy: PluginAuthPolicy::OnInstall,
apps_needing_auth: vec![AppSummary {
id: "alpha".to_string(),
name: "Alpha".to_string(),
@@ -299,7 +299,7 @@ async fn plugin_install_filters_disallowed_apps_needing_auth() -> Result<()> {
assert_eq!(
response,
PluginInstallResponse {
auth_policy: Some(PluginAuthPolicy::OnUse),
auth_policy: PluginAuthPolicy::OnUse,
apps_needing_auth: vec![AppSummary {
id: "alpha".to_string(),
name: "Alpha".to_string(),

View File

@@ -26,6 +26,7 @@ use wiremock::matchers::method;
use wiremock::matchers::path;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
const TEST_CURATED_PLUGIN_SHA: &str = "0123456789abcdef0123456789abcdef01234567";
#[tokio::test]
async fn plugin_list_returns_invalid_request_for_invalid_marketplace_file() -> Result<()> {
@@ -224,10 +225,26 @@ enabled = false
assert_eq!(marketplace.plugins[0].name, "enabled-plugin");
assert_eq!(marketplace.plugins[0].installed, true);
assert_eq!(marketplace.plugins[0].enabled, true);
assert_eq!(
marketplace.plugins[0].install_policy,
PluginInstallPolicy::Available
);
assert_eq!(
marketplace.plugins[0].auth_policy,
PluginAuthPolicy::OnInstall
);
assert_eq!(marketplace.plugins[1].id, "disabled-plugin@codex-curated");
assert_eq!(marketplace.plugins[1].name, "disabled-plugin");
assert_eq!(marketplace.plugins[1].installed, true);
assert_eq!(marketplace.plugins[1].enabled, false);
assert_eq!(
marketplace.plugins[1].install_policy,
PluginInstallPolicy::Available
);
assert_eq!(
marketplace.plugins[1].auth_policy,
PluginAuthPolicy::OnInstall
);
assert_eq!(
marketplace.plugins[2].id,
"uninstalled-plugin@codex-curated"
@@ -235,6 +252,14 @@ enabled = false
assert_eq!(marketplace.plugins[2].name, "uninstalled-plugin");
assert_eq!(marketplace.plugins[2].installed, false);
assert_eq!(marketplace.plugins[2].enabled, false);
assert_eq!(
marketplace.plugins[2].install_policy,
PluginInstallPolicy::Available
);
assert_eq!(
marketplace.plugins[2].auth_policy,
PluginAuthPolicy::OnInstall
);
Ok(())
}
@@ -418,8 +443,8 @@ async fn plugin_list_returns_plugin_interface_with_absolute_asset_paths() -> Res
assert_eq!(plugin.id, "demo-plugin@codex-curated");
assert_eq!(plugin.installed, false);
assert_eq!(plugin.enabled, false);
assert_eq!(plugin.install_policy, Some(PluginInstallPolicy::Available));
assert_eq!(plugin.auth_policy, Some(PluginAuthPolicy::OnInstall));
assert_eq!(plugin.install_policy, PluginInstallPolicy::Available);
assert_eq!(plugin.auth_policy, PluginAuthPolicy::OnInstall);
let interface = plugin
.interface
.as_ref()
@@ -589,7 +614,9 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu
assert!(
codex_home
.path()
.join("plugins/cache/openai-curated/gmail/local")
.join(format!(
"plugins/cache/openai-curated/gmail/{TEST_CURATED_PLUGIN_SHA}"
))
.is_dir()
);
assert!(
@@ -682,5 +709,10 @@ fn write_openai_curated_marketplace(
format!(r#"{{"name":"{plugin_name}"}}"#),
)?;
}
std::fs::create_dir_all(codex_home.join(".tmp"))?;
std::fs::write(
codex_home.join(".tmp/plugins.sha"),
format!("{TEST_CURATED_PLUGIN_SHA}\n"),
)?;
Ok(())
}

View File

@@ -13,6 +13,10 @@ use codex_app_server::INPUT_TOO_LARGE_ERROR_CODE;
use codex_app_server::INVALID_PARAMS_ERROR_CODE;
use codex_app_server_protocol::ByteRange;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::CollabAgentState;
use codex_app_server_protocol::CollabAgentStatus;
use codex_app_server_protocol::CollabAgentTool;
use codex_app_server_protocol::CollabAgentToolCallStatus;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
use codex_app_server_protocol::CommandExecutionStatus;
@@ -68,6 +72,12 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
const TEST_ORIGINATOR: &str = "codex_vscode";
const LOCAL_PRAGMATIC_TEMPLATE: &str = "You are a deeply pragmatic, effective software engineer.";
fn body_contains(req: &wiremock::Request, text: &str) -> bool {
String::from_utf8(req.body.clone())
.ok()
.is_some_and(|body| body.contains(text))
}
#[tokio::test]
async fn turn_start_sends_originator_header() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];
@@ -1152,11 +1162,6 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
.await??;
// Ensure we do NOT receive a CommandExecutionRequestApproval request before task completes
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
@@ -1462,7 +1467,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
@@ -1652,13 +1657,205 @@ async fn turn_start_file_change_approval_v2() -> Result<()> {
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
Ok(())
}
#[tokio::test]
async fn turn_start_emits_spawn_agent_item_with_model_metadata_v2() -> Result<()> {
skip_if_no_network!(Ok(()));
const CHILD_PROMPT: &str = "child: do work";
const PARENT_PROMPT: &str = "spawn a child and continue";
const SPAWN_CALL_ID: &str = "spawn-call-1";
const REQUESTED_MODEL: &str = "gpt-5.1";
const REQUESTED_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::Low;
let server = responses::start_mock_server().await;
let spawn_args = serde_json::to_string(&json!({
"message": CHILD_PROMPT,
"model": REQUESTED_MODEL,
"reasoning_effort": REQUESTED_REASONING_EFFORT,
}))?;
let _parent_turn = responses::mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, PARENT_PROMPT),
responses::sse(vec![
responses::ev_response_created("resp-turn1-1"),
responses::ev_function_call(SPAWN_CALL_ID, "spawn_agent", &spawn_args),
responses::ev_completed("resp-turn1-1"),
]),
)
.await;
let _child_turn = responses::mount_sse_once_match(
&server,
|req: &wiremock::Request| {
body_contains(req, CHILD_PROMPT) && !body_contains(req, SPAWN_CALL_ID)
},
responses::sse(vec![
responses::ev_response_created("resp-child-1"),
responses::ev_assistant_message("msg-child-1", "child done"),
responses::ev_completed("resp-child-1"),
]),
)
.await;
let _parent_follow_up = responses::mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, SPAWN_CALL_ID),
responses::sse(vec![
responses::ev_response_created("resp-turn1-2"),
responses::ev_assistant_message("msg-turn1-2", "parent done"),
responses::ev_completed("resp-turn1-2"),
]),
)
.await;
let codex_home = TempDir::new()?;
create_config_toml(
codex_home.path(),
&server.uri(),
"never",
&BTreeMap::from([(Feature::Collab, true)]),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5.2-codex".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: PARENT_PROMPT.to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let turn: TurnStartResponse = to_response::<TurnStartResponse>(turn_resp)?;
let spawn_started = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let started_notif = mcp
.read_stream_until_notification_message("item/started")
.await?;
let started: ItemStartedNotification =
serde_json::from_value(started_notif.params.expect("item/started params"))?;
if let ThreadItem::CollabAgentToolCall { id, .. } = &started.item
&& id == SPAWN_CALL_ID
{
return Ok::<ThreadItem, anyhow::Error>(started.item);
}
}
})
.await??;
assert_eq!(
spawn_started,
ThreadItem::CollabAgentToolCall {
id: SPAWN_CALL_ID.to_string(),
tool: CollabAgentTool::SpawnAgent,
status: CollabAgentToolCallStatus::InProgress,
sender_thread_id: thread.id.clone(),
receiver_thread_ids: Vec::new(),
prompt: Some(CHILD_PROMPT.to_string()),
model: Some(REQUESTED_MODEL.to_string()),
reasoning_effort: Some(REQUESTED_REASONING_EFFORT),
agents_states: HashMap::new(),
}
);
let spawn_completed = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let completed_notif = mcp
.read_stream_until_notification_message("item/completed")
.await?;
let completed: ItemCompletedNotification =
serde_json::from_value(completed_notif.params.expect("item/completed params"))?;
if let ThreadItem::CollabAgentToolCall { id, .. } = &completed.item
&& id == SPAWN_CALL_ID
{
return Ok::<ThreadItem, anyhow::Error>(completed.item);
}
}
})
.await??;
let ThreadItem::CollabAgentToolCall {
id,
tool,
status,
sender_thread_id,
receiver_thread_ids,
prompt,
model,
reasoning_effort,
agents_states,
} = spawn_completed
else {
unreachable!("loop ensures we break on collab agent tool call items");
};
let receiver_thread_id = receiver_thread_ids
.first()
.cloned()
.expect("spawn completion should include child thread id");
assert_eq!(id, SPAWN_CALL_ID);
assert_eq!(tool, CollabAgentTool::SpawnAgent);
assert_eq!(status, CollabAgentToolCallStatus::Completed);
assert_eq!(sender_thread_id, thread.id);
assert_eq!(receiver_thread_ids, vec![receiver_thread_id.clone()]);
assert_eq!(prompt, Some(CHILD_PROMPT.to_string()));
assert_eq!(model, Some(REQUESTED_MODEL.to_string()));
assert_eq!(reasoning_effort, Some(REQUESTED_REASONING_EFFORT));
assert_eq!(
agents_states,
HashMap::from([(
receiver_thread_id,
CollabAgentState {
status: CollabAgentStatus::PendingInit,
message: None,
},
)])
);
let turn_completed = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let turn_completed_notif = mcp
.read_stream_until_notification_message("turn/completed")
.await?;
let turn_completed: TurnCompletedNotification = serde_json::from_value(
turn_completed_notif.params.expect("turn/completed params"),
)?;
if turn_completed.thread_id == thread.id && turn_completed.turn.id == turn.turn.id {
return Ok::<TurnCompletedNotification, anyhow::Error>(turn_completed);
}
}
})
.await??;
assert_eq!(turn_completed.thread_id, thread.id);
assert_eq!(turn_completed.turn.id, turn.turn.id);
Ok(())
}
#[tokio::test]
async fn turn_start_file_change_approval_accept_for_session_persists_v2() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -1783,7 +1980,7 @@ async fn turn_start_file_change_approval_accept_for_session_persists_v2() -> Res
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
@@ -1841,7 +2038,7 @@ async fn turn_start_file_change_approval_accept_for_session_persists_v2() -> Res
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
@@ -1993,7 +2190,7 @@ async fn turn_start_file_change_approval_decline_v2() -> Result<()> {
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;

View File

@@ -303,7 +303,7 @@ async fn turn_start_shell_zsh_fork_exec_approval_decline_v2() -> Result<()> {
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;

View File

@@ -133,7 +133,7 @@ async fn turn_steer_rejects_oversized_text_input() -> Result<()> {
let _task_started: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_started"),
mcp.read_stream_until_notification_message("turn/started"),
)
.await??;
@@ -236,7 +236,7 @@ async fn turn_steer_returns_active_turn_id() -> Result<()> {
let _task_started: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_started"),
mcp.read_stream_until_notification_message("turn/started"),
)
.await??;