From aa67cee2074ca4ea6ffdf9dd4cb02a8c06003633 Mon Sep 17 00:00:00 2001 From: Adam Perry Date: Wed, 20 May 2026 14:04:06 -0700 Subject: [PATCH] Allow read-only MCP tools to run in parallel --- codex-rs/core/src/tools/handlers/mcp.rs | 35 ++++- codex-rs/core/tests/suite/rmcp_client.rs | 146 ++++++++++++++++-- .../rmcp-client/src/bin/test_stdio_server.rs | 18 ++- 3 files changed, 185 insertions(+), 14 deletions(-) diff --git a/codex-rs/core/src/tools/handlers/mcp.rs b/codex-rs/core/src/tools/handlers/mcp.rs index 6f4f79505b..86ded3609e 100644 --- a/codex-rs/core/src/tools/handlers/mcp.rs +++ b/codex-rs/core/src/tools/handlers/mcp.rs @@ -43,6 +43,15 @@ impl McpHandler { exposure, } } + + fn has_read_only_hint(&self) -> bool { + self.tool_info + .tool + .annotations + .as_ref() + .and_then(|annotations| annotations.read_only_hint) + .unwrap_or(false) + } } #[async_trait::async_trait] @@ -84,7 +93,9 @@ impl ToolExecutor for McpHandler { } fn supports_parallel_tool_calls(&self) -> bool { - self.tool_info.supports_parallel_tool_calls + // Correctly implemented MCP servers should tolerate parallel calls to + // tools that advertise themselves as read-only. + self.tool_info.supports_parallel_tool_calls || self.has_read_only_hint() } async fn handle( @@ -448,6 +459,28 @@ mod tests { assert_eq!(mcp_hook_tool_input(" "), json!({})); } + #[test] + fn mcp_read_only_hint_supports_parallel_calls_without_server_opt_in() { + let mut read_only_info = tool_info("foo", "mcp__foo__", "read"); + read_only_info.tool.annotations = Some(rmcp::model::ToolAnnotations::new().read_only(true)); + + assert!(McpHandler::new(read_only_info).supports_parallel_tool_calls()); + } + + #[test] + fn mcp_parallel_calls_require_read_only_hint_or_server_opt_in() { + let missing_hint_info = tool_info("foo", "mcp__foo__", "unannotated"); + assert!(!McpHandler::new(missing_hint_info).supports_parallel_tool_calls()); + + let mut writable_info = tool_info("foo", "mcp__foo__", "write"); + writable_info.tool.annotations = Some(rmcp::model::ToolAnnotations::new().read_only(false)); + assert!(!McpHandler::new(writable_info).supports_parallel_tool_calls()); + + let mut server_opt_in_info = tool_info("foo", "mcp__foo__", "server_opt_in"); + server_opt_in_info.supports_parallel_tool_calls = true; + assert!(McpHandler::new(server_opt_in_info).supports_parallel_tool_calls()); + } + fn tool_info(server_name: &str, callable_namespace: &str, tool_name: &str) -> ToolInfo { ToolInfo { server_name: server_name.to_string(), diff --git a/codex-rs/core/tests/suite/rmcp_client.rs b/codex-rs/core/tests/suite/rmcp_client.rs index c00da77236..0f2d97c15f 100644 --- a/codex-rs/core/tests/suite/rmcp_client.rs +++ b/codex-rs/core/tests/suite/rmcp_client.rs @@ -101,10 +101,28 @@ fn read_only_user_turn_with_model( fixture: &TestCodex, text: impl Into, model: String, +) -> Op { + user_turn_with_permission_profile(fixture, text, model, PermissionProfile::read_only()) +} + +fn auto_approved_user_turn(fixture: &TestCodex, text: impl Into) -> Op { + user_turn_with_permission_profile( + fixture, + text, + fixture.session_configured.model.clone(), + PermissionProfile::Disabled, + ) +} + +fn user_turn_with_permission_profile( + fixture: &TestCodex, + text: impl Into, + model: String, + permission_profile: PermissionProfile, ) -> Op { let cwd = fixture.cwd.path().to_path_buf(); let (sandbox_policy, permission_profile) = - turn_permission_fields(PermissionProfile::read_only(), cwd.as_path()); + turn_permission_fields(permission_profile, cwd.as_path()); Op::UserInput { items: vec![UserInput::Text { text: text.into(), @@ -848,8 +866,18 @@ async fn stdio_mcp_parallel_tool_calls_default_false_runs_serially() -> anyhow:: &server, responses::sse(vec![ responses::ev_response_created("resp-1"), - responses::ev_function_call_with_namespace(first_call_id, &namespace, "sync", &args), - responses::ev_function_call_with_namespace(second_call_id, &namespace, "sync", &args), + responses::ev_function_call_with_namespace( + first_call_id, + &namespace, + "sync_mutable", + &args, + ), + responses::ev_function_call_with_namespace( + second_call_id, + &namespace, + "sync_mutable", + &args, + ), responses::ev_completed("resp-1"), ]), ) @@ -882,9 +910,9 @@ async fn stdio_mcp_parallel_tool_calls_default_false_runs_serially() -> anyhow:: .await?; fixture .codex - .submit(read_only_user_turn( + .submit(auto_approved_user_turn( &fixture, - "call the rmcp sync tool twice", + "call the rmcp sync_mutable tool twice", )) .await?; @@ -942,19 +970,23 @@ async fn stdio_mcp_parallel_tool_calls_default_false_runs_serially() -> anyhow:: } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Result<()> { +async fn stdio_mcp_read_only_tool_calls_run_concurrently_without_server_opt_in() +-> anyhow::Result<()> { skip_if_no_network!(Ok(())); let server = responses::start_mock_server().await; - let first_call_id = "sync-1"; - let second_call_id = "sync-2"; + let first_call_id = "sync-read-only-1"; + let second_call_id = "sync-read-only-2"; let server_name = "rmcp"; let namespace = format!("mcp__{server_name}__"); + // The stdio MCP test server holds each sync call at this barrier until both + // calls arrive. A serial scheduler times out inside the server instead of + // returning the structured `{ "result": "ok" }` result asserted below. let args = json!({ "sleep_after_ms": 100, "barrier": { - "id": "stdio-mcp-parallel-tool-calls", + "id": "stdio-mcp-read-only-tool-calls", "participants": 2, "timeout_ms": 1_000 } @@ -990,8 +1022,8 @@ async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Res stdio_transport(rmcp_test_server_bin, /*env*/ None, Vec::new()), TestMcpServerOptions { experimental_environment: remote_aware_experimental_environment(), - supports_parallel_tool_calls: true, tool_timeout_sec: Some(Duration::from_secs(2)), + ..Default::default() }, ); }) @@ -1001,7 +1033,99 @@ async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Res .codex .submit(read_only_user_turn( &fixture, - "call the rmcp sync tool twice", + "call the rmcp read-only sync tool twice", + )) + .await?; + + wait_for_event(&fixture.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + let request = final_mock.single_request(); + for call_id in [first_call_id, second_call_id] { + let output_text = request + .function_call_output_text(call_id) + .expect("function_call_output present for rmcp sync call"); + let wrapped_payload = split_wall_time_wrapped_output(&output_text); + let output_json: Value = serde_json::from_str(wrapped_payload) + .expect("wrapped MCP output should preserve structured JSON"); + assert_eq!(output_json, json!({ "result": "ok" })); + } + + server.verify().await; + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + + let first_call_id = "sync-1"; + let second_call_id = "sync-2"; + let server_name = "rmcp"; + let namespace = format!("mcp__{server_name}__"); + let args = json!({ + "sleep_after_ms": 100, + "barrier": { + "id": "stdio-mcp-parallel-tool-calls", + "participants": 2, + "timeout_ms": 1_000 + } + }) + .to_string(); + + mount_sse_once( + &server, + responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_function_call_with_namespace( + first_call_id, + &namespace, + "sync_mutable", + &args, + ), + responses::ev_function_call_with_namespace( + second_call_id, + &namespace, + "sync_mutable", + &args, + ), + responses::ev_completed("resp-1"), + ]), + ) + .await; + let final_mock = mount_sse_once( + &server, + responses::sse(vec![ + responses::ev_assistant_message("msg-1", "rmcp sync tools completed successfully."), + responses::ev_completed("resp-2"), + ]), + ) + .await; + + let rmcp_test_server_bin = remote_aware_stdio_server_bin()?; + + let fixture = test_codex() + .with_config(move |config| { + insert_mcp_server( + config, + server_name, + stdio_transport(rmcp_test_server_bin, /*env*/ None, Vec::new()), + TestMcpServerOptions { + experimental_environment: remote_aware_experimental_environment(), + supports_parallel_tool_calls: true, + tool_timeout_sec: Some(Duration::from_secs(2)), + }, + ); + }) + .build_with_remote_env(&server) + .await?; + fixture + .codex + .submit(auto_approved_user_turn( + &fixture, + "call the rmcp sync_mutable tool twice", )) .await?; diff --git a/codex-rs/rmcp-client/src/bin/test_stdio_server.rs b/codex-rs/rmcp-client/src/bin/test_stdio_server.rs index 7add4d05f5..7d731d7eda 100644 --- a/codex-rs/rmcp-client/src/bin/test_stdio_server.rs +++ b/codex-rs/rmcp-client/src/bin/test_stdio_server.rs @@ -70,6 +70,7 @@ impl TestToolServer { Self::echo_dash_tool(), Self::cwd_tool(), Self::sync_tool(), + Self::sync_mutable_tool(), Self::image_tool(), Self::image_scenario_tool(), sandbox_meta_tool, @@ -166,6 +167,16 @@ impl TestToolServer { } fn sync_tool() -> Tool { + let mut tool = Self::build_sync_tool("sync"); + tool.annotations = Some(ToolAnnotations::new().read_only(true)); + tool + } + + fn sync_mutable_tool() -> Tool { + Self::build_sync_tool("sync_mutable") + } + + fn build_sync_tool(name: &'static str) -> Tool { #[expect(clippy::expect_used)] let schema: JsonObject = serde_json::from_value(json!({ "type": "object", @@ -188,7 +199,7 @@ impl TestToolServer { .expect("sync tool schema should deserialize"); let mut tool = Tool::new( - Cow::Borrowed("sync"), + Cow::Borrowed(name), Cow::Borrowed( "Synchronize concurrent test calls and optionally delay before or after the barrier.", ), @@ -205,7 +216,6 @@ impl TestToolServer { })) .expect("sync tool output schema should deserialize"); tool.output_schema = Some(Arc::new(output_schema)); - tool.annotations = Some(ToolAnnotations::new().read_only(true)); tool } @@ -551,6 +561,10 @@ impl ServerHandler for TestToolServer { let args = Self::parse_call_args::(&request, "sync")?; Self::sync_result(args).await } + "sync_mutable" => { + let args = Self::parse_call_args::(&request, "sync_mutable")?; + Self::sync_result(args).await + } other => Err(McpError::invalid_params( format!("unknown tool: {other}"), None,