mirror of
https://github.com/openai/codex.git
synced 2026-05-22 20:14:17 +00:00
Allow read-only MCP tools to run in parallel
This commit is contained in:
@@ -43,6 +43,15 @@ impl McpHandler {
|
||||
exposure,
|
||||
}
|
||||
}
|
||||
|
||||
fn has_read_only_hint(&self) -> bool {
|
||||
self.tool_info
|
||||
.tool
|
||||
.annotations
|
||||
.as_ref()
|
||||
.and_then(|annotations| annotations.read_only_hint)
|
||||
.unwrap_or(false)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -84,7 +93,9 @@ impl ToolExecutor<ToolInvocation> for McpHandler {
|
||||
}
|
||||
|
||||
fn supports_parallel_tool_calls(&self) -> bool {
|
||||
self.tool_info.supports_parallel_tool_calls
|
||||
// Correctly implemented MCP servers should tolerate parallel calls to
|
||||
// tools that advertise themselves as read-only.
|
||||
self.tool_info.supports_parallel_tool_calls || self.has_read_only_hint()
|
||||
}
|
||||
|
||||
async fn handle(
|
||||
@@ -448,6 +459,28 @@ mod tests {
|
||||
assert_eq!(mcp_hook_tool_input(" "), json!({}));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mcp_read_only_hint_supports_parallel_calls_without_server_opt_in() {
|
||||
let mut read_only_info = tool_info("foo", "mcp__foo__", "read");
|
||||
read_only_info.tool.annotations = Some(rmcp::model::ToolAnnotations::new().read_only(true));
|
||||
|
||||
assert!(McpHandler::new(read_only_info).supports_parallel_tool_calls());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mcp_parallel_calls_require_read_only_hint_or_server_opt_in() {
|
||||
let missing_hint_info = tool_info("foo", "mcp__foo__", "unannotated");
|
||||
assert!(!McpHandler::new(missing_hint_info).supports_parallel_tool_calls());
|
||||
|
||||
let mut writable_info = tool_info("foo", "mcp__foo__", "write");
|
||||
writable_info.tool.annotations = Some(rmcp::model::ToolAnnotations::new().read_only(false));
|
||||
assert!(!McpHandler::new(writable_info).supports_parallel_tool_calls());
|
||||
|
||||
let mut server_opt_in_info = tool_info("foo", "mcp__foo__", "server_opt_in");
|
||||
server_opt_in_info.supports_parallel_tool_calls = true;
|
||||
assert!(McpHandler::new(server_opt_in_info).supports_parallel_tool_calls());
|
||||
}
|
||||
|
||||
fn tool_info(server_name: &str, callable_namespace: &str, tool_name: &str) -> ToolInfo {
|
||||
ToolInfo {
|
||||
server_name: server_name.to_string(),
|
||||
|
||||
@@ -101,10 +101,28 @@ fn read_only_user_turn_with_model(
|
||||
fixture: &TestCodex,
|
||||
text: impl Into<String>,
|
||||
model: String,
|
||||
) -> Op {
|
||||
user_turn_with_permission_profile(fixture, text, model, PermissionProfile::read_only())
|
||||
}
|
||||
|
||||
fn auto_approved_user_turn(fixture: &TestCodex, text: impl Into<String>) -> Op {
|
||||
user_turn_with_permission_profile(
|
||||
fixture,
|
||||
text,
|
||||
fixture.session_configured.model.clone(),
|
||||
PermissionProfile::Disabled,
|
||||
)
|
||||
}
|
||||
|
||||
fn user_turn_with_permission_profile(
|
||||
fixture: &TestCodex,
|
||||
text: impl Into<String>,
|
||||
model: String,
|
||||
permission_profile: PermissionProfile,
|
||||
) -> Op {
|
||||
let cwd = fixture.cwd.path().to_path_buf();
|
||||
let (sandbox_policy, permission_profile) =
|
||||
turn_permission_fields(PermissionProfile::read_only(), cwd.as_path());
|
||||
turn_permission_fields(permission_profile, cwd.as_path());
|
||||
Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: text.into(),
|
||||
@@ -848,8 +866,18 @@ async fn stdio_mcp_parallel_tool_calls_default_false_runs_serially() -> anyhow::
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_function_call_with_namespace(first_call_id, &namespace, "sync", &args),
|
||||
responses::ev_function_call_with_namespace(second_call_id, &namespace, "sync", &args),
|
||||
responses::ev_function_call_with_namespace(
|
||||
first_call_id,
|
||||
&namespace,
|
||||
"sync_mutable",
|
||||
&args,
|
||||
),
|
||||
responses::ev_function_call_with_namespace(
|
||||
second_call_id,
|
||||
&namespace,
|
||||
"sync_mutable",
|
||||
&args,
|
||||
),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
@@ -882,9 +910,9 @@ async fn stdio_mcp_parallel_tool_calls_default_false_runs_serially() -> anyhow::
|
||||
.await?;
|
||||
fixture
|
||||
.codex
|
||||
.submit(read_only_user_turn(
|
||||
.submit(auto_approved_user_turn(
|
||||
&fixture,
|
||||
"call the rmcp sync tool twice",
|
||||
"call the rmcp sync_mutable tool twice",
|
||||
))
|
||||
.await?;
|
||||
|
||||
@@ -942,19 +970,23 @@ async fn stdio_mcp_parallel_tool_calls_default_false_runs_serially() -> anyhow::
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Result<()> {
|
||||
async fn stdio_mcp_read_only_tool_calls_run_concurrently_without_server_opt_in()
|
||||
-> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
|
||||
let first_call_id = "sync-1";
|
||||
let second_call_id = "sync-2";
|
||||
let first_call_id = "sync-read-only-1";
|
||||
let second_call_id = "sync-read-only-2";
|
||||
let server_name = "rmcp";
|
||||
let namespace = format!("mcp__{server_name}__");
|
||||
// The stdio MCP test server holds each sync call at this barrier until both
|
||||
// calls arrive. A serial scheduler times out inside the server instead of
|
||||
// returning the structured `{ "result": "ok" }` result asserted below.
|
||||
let args = json!({
|
||||
"sleep_after_ms": 100,
|
||||
"barrier": {
|
||||
"id": "stdio-mcp-parallel-tool-calls",
|
||||
"id": "stdio-mcp-read-only-tool-calls",
|
||||
"participants": 2,
|
||||
"timeout_ms": 1_000
|
||||
}
|
||||
@@ -990,8 +1022,8 @@ async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Res
|
||||
stdio_transport(rmcp_test_server_bin, /*env*/ None, Vec::new()),
|
||||
TestMcpServerOptions {
|
||||
experimental_environment: remote_aware_experimental_environment(),
|
||||
supports_parallel_tool_calls: true,
|
||||
tool_timeout_sec: Some(Duration::from_secs(2)),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
})
|
||||
@@ -1001,7 +1033,99 @@ async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Res
|
||||
.codex
|
||||
.submit(read_only_user_turn(
|
||||
&fixture,
|
||||
"call the rmcp sync tool twice",
|
||||
"call the rmcp read-only sync tool twice",
|
||||
))
|
||||
.await?;
|
||||
|
||||
wait_for_event(&fixture.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let request = final_mock.single_request();
|
||||
for call_id in [first_call_id, second_call_id] {
|
||||
let output_text = request
|
||||
.function_call_output_text(call_id)
|
||||
.expect("function_call_output present for rmcp sync call");
|
||||
let wrapped_payload = split_wall_time_wrapped_output(&output_text);
|
||||
let output_json: Value = serde_json::from_str(wrapped_payload)
|
||||
.expect("wrapped MCP output should preserve structured JSON");
|
||||
assert_eq!(output_json, json!({ "result": "ok" }));
|
||||
}
|
||||
|
||||
server.verify().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
|
||||
let first_call_id = "sync-1";
|
||||
let second_call_id = "sync-2";
|
||||
let server_name = "rmcp";
|
||||
let namespace = format!("mcp__{server_name}__");
|
||||
let args = json!({
|
||||
"sleep_after_ms": 100,
|
||||
"barrier": {
|
||||
"id": "stdio-mcp-parallel-tool-calls",
|
||||
"participants": 2,
|
||||
"timeout_ms": 1_000
|
||||
}
|
||||
})
|
||||
.to_string();
|
||||
|
||||
mount_sse_once(
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_function_call_with_namespace(
|
||||
first_call_id,
|
||||
&namespace,
|
||||
"sync_mutable",
|
||||
&args,
|
||||
),
|
||||
responses::ev_function_call_with_namespace(
|
||||
second_call_id,
|
||||
&namespace,
|
||||
"sync_mutable",
|
||||
&args,
|
||||
),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let final_mock = mount_sse_once(
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("msg-1", "rmcp sync tools completed successfully."),
|
||||
responses::ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let rmcp_test_server_bin = remote_aware_stdio_server_bin()?;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_config(move |config| {
|
||||
insert_mcp_server(
|
||||
config,
|
||||
server_name,
|
||||
stdio_transport(rmcp_test_server_bin, /*env*/ None, Vec::new()),
|
||||
TestMcpServerOptions {
|
||||
experimental_environment: remote_aware_experimental_environment(),
|
||||
supports_parallel_tool_calls: true,
|
||||
tool_timeout_sec: Some(Duration::from_secs(2)),
|
||||
},
|
||||
);
|
||||
})
|
||||
.build_with_remote_env(&server)
|
||||
.await?;
|
||||
fixture
|
||||
.codex
|
||||
.submit(auto_approved_user_turn(
|
||||
&fixture,
|
||||
"call the rmcp sync_mutable tool twice",
|
||||
))
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -70,6 +70,7 @@ impl TestToolServer {
|
||||
Self::echo_dash_tool(),
|
||||
Self::cwd_tool(),
|
||||
Self::sync_tool(),
|
||||
Self::sync_mutable_tool(),
|
||||
Self::image_tool(),
|
||||
Self::image_scenario_tool(),
|
||||
sandbox_meta_tool,
|
||||
@@ -166,6 +167,16 @@ impl TestToolServer {
|
||||
}
|
||||
|
||||
fn sync_tool() -> Tool {
|
||||
let mut tool = Self::build_sync_tool("sync");
|
||||
tool.annotations = Some(ToolAnnotations::new().read_only(true));
|
||||
tool
|
||||
}
|
||||
|
||||
fn sync_mutable_tool() -> Tool {
|
||||
Self::build_sync_tool("sync_mutable")
|
||||
}
|
||||
|
||||
fn build_sync_tool(name: &'static str) -> Tool {
|
||||
#[expect(clippy::expect_used)]
|
||||
let schema: JsonObject = serde_json::from_value(json!({
|
||||
"type": "object",
|
||||
@@ -188,7 +199,7 @@ impl TestToolServer {
|
||||
.expect("sync tool schema should deserialize");
|
||||
|
||||
let mut tool = Tool::new(
|
||||
Cow::Borrowed("sync"),
|
||||
Cow::Borrowed(name),
|
||||
Cow::Borrowed(
|
||||
"Synchronize concurrent test calls and optionally delay before or after the barrier.",
|
||||
),
|
||||
@@ -205,7 +216,6 @@ impl TestToolServer {
|
||||
}))
|
||||
.expect("sync tool output schema should deserialize");
|
||||
tool.output_schema = Some(Arc::new(output_schema));
|
||||
tool.annotations = Some(ToolAnnotations::new().read_only(true));
|
||||
tool
|
||||
}
|
||||
|
||||
@@ -551,6 +561,10 @@ impl ServerHandler for TestToolServer {
|
||||
let args = Self::parse_call_args::<SyncArgs>(&request, "sync")?;
|
||||
Self::sync_result(args).await
|
||||
}
|
||||
"sync_mutable" => {
|
||||
let args = Self::parse_call_args::<SyncArgs>(&request, "sync_mutable")?;
|
||||
Self::sync_result(args).await
|
||||
}
|
||||
other => Err(McpError::invalid_params(
|
||||
format!("unknown tool: {other}"),
|
||||
None,
|
||||
|
||||
Reference in New Issue
Block a user