From c83ba22359f4140e44fc43500d2bedbb882d7211 Mon Sep 17 00:00:00 2001 From: anp-oai Date: Thu, 21 May 2026 20:40:34 -0700 Subject: [PATCH] Allow parallel MCP tool calls when annotated readOnly (#23750) ## Summary - Treat MCP tools with `readOnlyHint: true` as parallel-safe even when `supports_parallel_tool_calls` is unset or `false`. - Keep server-level `supports_parallel_tool_calls` as an additive override for non-read-only tools. - Add focused unit coverage for the MCP handler eligibility decision. - Update RMCP integration coverage to keep the serial baseline on a mutable tool, verify read-only concurrency without server opt-in, and preserve the server opt-in concurrency path separately. ## Testing - `just fmt` - `cargo test -p codex-core --lib tools::handlers::mcp::tests::` - `cargo test -p codex-core --test all stdio_mcp_read_only_tool_calls_run_concurrently_without_server_opt_in` - `cargo test -p codex-core --test all stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently` - `cargo test -p codex-rmcp-client` --- codex-rs/core/src/tools/handlers/mcp.rs | 47 +++++++ codex-rs/core/tests/suite/rmcp_client.rs | 126 +++++++++++++++++- .../rmcp-client/src/bin/test_stdio_server.rs | 11 ++ 3 files changed, 181 insertions(+), 3 deletions(-) diff --git a/codex-rs/core/src/tools/handlers/mcp.rs b/codex-rs/core/src/tools/handlers/mcp.rs index a6f3cf4ced..5f6261323d 100644 --- a/codex-rs/core/src/tools/handlers/mcp.rs +++ b/codex-rs/core/src/tools/handlers/mcp.rs @@ -49,7 +49,16 @@ impl ToolExecutor for McpHandler { } fn supports_parallel_tool_calls(&self) -> bool { + // Correctly implemented MCP servers should tolerate parallel calls to + // tools that advertise themselves as read-only. self.tool_info.supports_parallel_tool_calls + || self + .tool_info + .tool + .annotations + .as_ref() + .and_then(|annotations| annotations.read_only_hint) + .unwrap_or(false) } async fn handle( @@ -443,6 +452,44 @@ mod tests { assert_eq!(mcp_hook_tool_input(" "), json!({})); } + #[test] + fn mcp_read_only_hint_supports_parallel_calls_without_server_opt_in() { + let mut read_only_info = tool_info("foo", "mcp__foo__", "read"); + read_only_info.tool.annotations = Some(rmcp::model::ToolAnnotations::new().read_only(true)); + + assert!( + McpHandler::new(read_only_info) + .expect("MCP tool spec should build") + .supports_parallel_tool_calls() + ); + } + + #[test] + fn mcp_parallel_calls_require_read_only_hint_or_server_opt_in() { + let missing_hint_info = tool_info("foo", "mcp__foo__", "unannotated"); + assert!( + !McpHandler::new(missing_hint_info) + .expect("MCP tool spec should build") + .supports_parallel_tool_calls() + ); + + let mut writable_info = tool_info("foo", "mcp__foo__", "write"); + writable_info.tool.annotations = Some(rmcp::model::ToolAnnotations::new().read_only(false)); + assert!( + !McpHandler::new(writable_info) + .expect("MCP tool spec should build") + .supports_parallel_tool_calls() + ); + + let mut server_opt_in_info = tool_info("foo", "mcp__foo__", "server_opt_in"); + server_opt_in_info.supports_parallel_tool_calls = true; + assert!( + McpHandler::new(server_opt_in_info) + .expect("MCP tool spec should build") + .supports_parallel_tool_calls() + ); + } + fn tool_info(server_name: &str, callable_namespace: &str, tool_name: &str) -> ToolInfo { ToolInfo { server_name: server_name.to_string(), diff --git a/codex-rs/core/tests/suite/rmcp_client.rs b/codex-rs/core/tests/suite/rmcp_client.rs index 94222deab8..864fb1e07a 100644 --- a/codex-rs/core/tests/suite/rmcp_client.rs +++ b/codex-rs/core/tests/suite/rmcp_client.rs @@ -101,10 +101,28 @@ fn read_only_user_turn_with_model( fixture: &TestCodex, text: impl Into, model: String, +) -> Op { + user_turn_with_permission_profile(fixture, text, model, PermissionProfile::read_only()) +} + +fn auto_approved_user_turn(fixture: &TestCodex, text: impl Into) -> Op { + user_turn_with_permission_profile( + fixture, + text, + fixture.session_configured.model.clone(), + PermissionProfile::Disabled, + ) +} + +fn user_turn_with_permission_profile( + fixture: &TestCodex, + text: impl Into, + model: String, + permission_profile: PermissionProfile, ) -> Op { let cwd = fixture.cwd.path().to_path_buf(); let (sandbox_policy, permission_profile) = - turn_permission_fields(PermissionProfile::read_only(), cwd.as_path()); + turn_permission_fields(permission_profile, cwd.as_path()); Op::UserInput { items: vec![UserInput::Text { text: text.into(), @@ -840,7 +858,10 @@ async fn stdio_mcp_parallel_tool_calls_default_false_runs_serially() -> anyhow:: .await?; fixture .codex - .submit(read_only_user_turn( + // Keep this baseline on the mutable sync tool so read-only hints do not + // make the call parallel-safe. Bypass read-only turn permissions so + // approval behavior does not block the scheduling assertion. + .submit(auto_approved_user_turn( &fixture, "call the rmcp sync tool twice", )) @@ -899,6 +920,102 @@ async fn stdio_mcp_parallel_tool_calls_default_false_runs_serially() -> anyhow:: Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn stdio_mcp_read_only_tool_calls_run_concurrently_without_server_opt_in() +-> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + + let first_call_id = "sync-read-only-1"; + let second_call_id = "sync-read-only-2"; + let server_name = "rmcp"; + let namespace = format!("mcp__{server_name}__"); + // The stdio MCP test server holds each sync call at this barrier until both + // calls arrive. A serial scheduler times out inside the server instead of + // returning the structured `{ "result": "ok" }` result asserted below. + let args = json!({ + "sleep_after_ms": 100, + "barrier": { + "id": "stdio-mcp-read-only-tool-calls", + "participants": 2, + "timeout_ms": 1_000 + } + }) + .to_string(); + + mount_sse_once( + &server, + responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_function_call_with_namespace( + first_call_id, + &namespace, + "sync_readonly", + &args, + ), + responses::ev_function_call_with_namespace( + second_call_id, + &namespace, + "sync_readonly", + &args, + ), + responses::ev_completed("resp-1"), + ]), + ) + .await; + let final_mock = mount_sse_once( + &server, + responses::sse(vec![ + responses::ev_assistant_message("msg-1", "rmcp sync tools completed successfully."), + responses::ev_completed("resp-2"), + ]), + ) + .await; + + let rmcp_test_server_bin = remote_aware_stdio_server_bin()?; + + let fixture = test_codex() + .with_config(move |config| { + insert_mcp_server( + config, + server_name, + stdio_transport(rmcp_test_server_bin, /*env*/ None, Vec::new()), + TestMcpServerOptions { + environment_id: remote_aware_environment_id(), + tool_timeout_sec: Some(Duration::from_secs(2)), + ..Default::default() + }, + ); + }) + .build_with_remote_env(&server) + .await?; + fixture + .codex + .submit(read_only_user_turn( + &fixture, + "call the rmcp sync_readonly tool twice", + )) + .await?; + + wait_for_event(&fixture.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + let request = final_mock.single_request(); + for call_id in [first_call_id, second_call_id] { + let output_text = request + .function_call_output_text(call_id) + .expect("function_call_output present for rmcp sync call"); + let wrapped_payload = split_wall_time_wrapped_output(&output_text); + let output_json: Value = serde_json::from_str(wrapped_payload) + .expect("wrapped MCP output should preserve structured JSON"); + assert_eq!(output_json, json!({ "result": "ok" })); + } + + server.verify().await; + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Result<()> { skip_if_no_network!(Ok(())); @@ -957,7 +1074,10 @@ async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Res .await?; fixture .codex - .submit(read_only_user_turn( + // Exercise the server opt-in with the mutable sync tool rather than the + // read-only sync_readonly tool. Bypass read-only turn permissions so + // approval behavior does not block the scheduling assertion. + .submit(auto_approved_user_turn( &fixture, "call the rmcp sync tool twice", )) diff --git a/codex-rs/rmcp-client/src/bin/test_stdio_server.rs b/codex-rs/rmcp-client/src/bin/test_stdio_server.rs index 7add4d05f5..50657ab182 100644 --- a/codex-rs/rmcp-client/src/bin/test_stdio_server.rs +++ b/codex-rs/rmcp-client/src/bin/test_stdio_server.rs @@ -70,6 +70,7 @@ impl TestToolServer { Self::echo_dash_tool(), Self::cwd_tool(), Self::sync_tool(), + Self::sync_readonly_tool(), Self::image_tool(), Self::image_scenario_tool(), sandbox_meta_tool, @@ -205,6 +206,12 @@ impl TestToolServer { })) .expect("sync tool output schema should deserialize"); tool.output_schema = Some(Arc::new(output_schema)); + tool + } + + fn sync_readonly_tool() -> Tool { + let mut tool = Self::sync_tool(); + tool.name = Cow::Borrowed("sync_readonly"); tool.annotations = Some(ToolAnnotations::new().read_only(true)); tool } @@ -551,6 +558,10 @@ impl ServerHandler for TestToolServer { let args = Self::parse_call_args::(&request, "sync")?; Self::sync_result(args).await } + "sync_readonly" => { + let args = Self::parse_call_args::(&request, "sync_readonly")?; + Self::sync_result(args).await + } other => Err(McpError::invalid_params( format!("unknown tool: {other}"), None,