mirror of
https://github.com/openai/codex.git
synced 2026-04-30 09:26:44 +00:00
Add supports_parallel_tool_calls flag to included mcps (#17667)
## Why
For more advanced MCP usage, we want the model to be able to emit
parallel MCP tool calls and have Codex execute eligible ones
concurrently, instead of forcing all MCP calls through the serial block.
The main design choice was where to thread the config. I made this
server-level because parallel safety depends on the MCP server
implementation. Codex reads the flag from `mcp_servers`, threads the
opted-in server names into `ToolRouter`, and checks the parsed
`ToolPayload::Mcp { server, .. }` at execution time. That avoids relying
on model-visible tool names, which can be incomplete in
deferred/search-tool paths or ambiguous for similarly named
servers/tools.
## What was added
Added `supports_parallel_tool_calls` for MCP servers.
Before:
```toml
[mcp_servers.docs]
command = "docs-server"
```
After:
```toml
[mcp_servers.docs]
command = "docs-server"
supports_parallel_tool_calls = true
```
MCP calls remain serial by default. Only tools from opted-in servers are
eligible to run in parallel. Docs also now warn to enable this only when
the server’s tools are safe to run concurrently, especially around
shared state or read/write races.
## Testing
Tested with a local stdio MCP server exposing real delay tools. The
model/Responses side was mocked only to deterministically emit two MCP
calls in the same turn.
Each test called `query_with_delay` and `query_with_delay_2` with `{
"seconds": 25 }`.
| Build/config | Observed | Wall time |
| --- | --- | --- |
| main with flag enabled | serial | `58.79s` |
| PR with flag enabled | parallel | `31.73s` |
| PR without flag | serial | `56.70s` |
PR with flag enabled showed both tools start before either completed;
main and PR-without-flag completed the first delay before starting the
second.
Also added an integration test.
Additional checks:
- `cargo test -p codex-tools` passed
- `cargo test -p codex-core
mcp_parallel_support_uses_exact_payload_server` passed
- `git diff --check` passed
This commit is contained in:
@@ -200,6 +200,7 @@ async fn run_code_mode_turn_with_rmcp(
|
||||
},
|
||||
enabled: true,
|
||||
required: false,
|
||||
supports_parallel_tool_calls: false,
|
||||
disabled_reason: None,
|
||||
startup_timeout_sec: Some(Duration::from_secs(10)),
|
||||
tool_timeout_sec: None,
|
||||
|
||||
@@ -75,6 +75,12 @@ fn assert_wall_time_header(output: &str) {
|
||||
assert_eq!(marker, "Output:");
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
enum McpCallEvent {
|
||||
Begin(String),
|
||||
End(String),
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||
#[serial(mcp_test_value)]
|
||||
async fn stdio_server_round_trip() -> anyhow::Result<()> {
|
||||
@@ -125,6 +131,7 @@ async fn stdio_server_round_trip() -> anyhow::Result<()> {
|
||||
},
|
||||
enabled: true,
|
||||
required: false,
|
||||
supports_parallel_tool_calls: false,
|
||||
disabled_reason: None,
|
||||
startup_timeout_sec: Some(Duration::from_secs(10)),
|
||||
tool_timeout_sec: None,
|
||||
@@ -230,6 +237,263 @@ async fn stdio_server_round_trip() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn stdio_mcp_parallel_tool_calls_default_false_runs_serially() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
|
||||
let first_call_id = "sync-serial-1";
|
||||
let second_call_id = "sync-serial-2";
|
||||
let server_name = "rmcp";
|
||||
let tool_name = format!("mcp__{server_name}__sync");
|
||||
let args = json!({ "sleep_after_ms": 100 }).to_string();
|
||||
|
||||
mount_sse_once(
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_function_call(first_call_id, &tool_name, &args),
|
||||
responses::ev_function_call(second_call_id, &tool_name, &args),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let final_mock = mount_sse_once(
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("msg-1", "rmcp sync tools completed successfully."),
|
||||
responses::ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let rmcp_test_server_bin = stdio_server_bin()?;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_config(move |config| {
|
||||
let mut servers = config.mcp_servers.get().clone();
|
||||
servers.insert(
|
||||
server_name.to_string(),
|
||||
McpServerConfig {
|
||||
transport: McpServerTransportConfig::Stdio {
|
||||
command: rmcp_test_server_bin,
|
||||
args: Vec::new(),
|
||||
env: None,
|
||||
env_vars: Vec::new(),
|
||||
cwd: None,
|
||||
},
|
||||
enabled: true,
|
||||
required: false,
|
||||
supports_parallel_tool_calls: false,
|
||||
disabled_reason: None,
|
||||
startup_timeout_sec: Some(Duration::from_secs(10)),
|
||||
tool_timeout_sec: Some(Duration::from_secs(2)),
|
||||
enabled_tools: None,
|
||||
disabled_tools: None,
|
||||
scopes: None,
|
||||
oauth_resource: None,
|
||||
tools: HashMap::new(),
|
||||
},
|
||||
);
|
||||
config
|
||||
.mcp_servers
|
||||
.set(servers)
|
||||
.expect("test mcp servers should accept any configuration");
|
||||
})
|
||||
.build(&server)
|
||||
.await?;
|
||||
let session_model = fixture.session_configured.model.clone();
|
||||
|
||||
fixture
|
||||
.codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "call the rmcp sync tool twice".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: fixture.cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
approvals_reviewer: None,
|
||||
sandbox_policy: SandboxPolicy::new_read_only_policy(),
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: None,
|
||||
service_tier: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let mut call_events = Vec::new();
|
||||
while call_events.len() < 4 {
|
||||
let event = wait_for_event(&fixture.codex, |ev| {
|
||||
matches!(
|
||||
ev,
|
||||
EventMsg::McpToolCallBegin(_) | EventMsg::McpToolCallEnd(_)
|
||||
)
|
||||
})
|
||||
.await;
|
||||
match event {
|
||||
EventMsg::McpToolCallBegin(begin) => {
|
||||
call_events.push(McpCallEvent::Begin(begin.call_id));
|
||||
}
|
||||
EventMsg::McpToolCallEnd(end) => {
|
||||
call_events.push(McpCallEvent::End(end.call_id));
|
||||
}
|
||||
_ => unreachable!("event guard guarantees MCP call events"),
|
||||
}
|
||||
}
|
||||
|
||||
let event_index = |needle: McpCallEvent| {
|
||||
call_events
|
||||
.iter()
|
||||
.position(|event| event == &needle)
|
||||
.expect("expected MCP call event")
|
||||
};
|
||||
let first_begin = event_index(McpCallEvent::Begin(first_call_id.to_string()));
|
||||
let first_end = event_index(McpCallEvent::End(first_call_id.to_string()));
|
||||
let second_begin = event_index(McpCallEvent::Begin(second_call_id.to_string()));
|
||||
let second_end = event_index(McpCallEvent::End(second_call_id.to_string()));
|
||||
assert!(
|
||||
first_end < second_begin || second_end < first_begin,
|
||||
"default MCP tool calls should run serially; saw events: {call_events:?}"
|
||||
);
|
||||
|
||||
wait_for_event(&fixture.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let request = final_mock.single_request();
|
||||
for call_id in [first_call_id, second_call_id] {
|
||||
let output_text = request
|
||||
.function_call_output_text(call_id)
|
||||
.expect("function_call_output present for rmcp sync call");
|
||||
let wrapped_payload = split_wall_time_wrapped_output(&output_text);
|
||||
let output_json: Value = serde_json::from_str(wrapped_payload)
|
||||
.expect("wrapped MCP output should preserve structured JSON");
|
||||
assert_eq!(output_json, json!({ "result": "ok" }));
|
||||
}
|
||||
|
||||
server.verify().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
|
||||
let first_call_id = "sync-1";
|
||||
let second_call_id = "sync-2";
|
||||
let server_name = "rmcp";
|
||||
let tool_name = format!("mcp__{server_name}__sync");
|
||||
let args = json!({
|
||||
"sleep_after_ms": 100,
|
||||
"barrier": {
|
||||
"id": "stdio-mcp-parallel-tool-calls",
|
||||
"participants": 2,
|
||||
"timeout_ms": 1_000
|
||||
}
|
||||
})
|
||||
.to_string();
|
||||
|
||||
mount_sse_once(
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_function_call(first_call_id, &tool_name, &args),
|
||||
responses::ev_function_call(second_call_id, &tool_name, &args),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let final_mock = mount_sse_once(
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("msg-1", "rmcp sync tools completed successfully."),
|
||||
responses::ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let rmcp_test_server_bin = stdio_server_bin()?;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_config(move |config| {
|
||||
let mut servers = config.mcp_servers.get().clone();
|
||||
servers.insert(
|
||||
server_name.to_string(),
|
||||
McpServerConfig {
|
||||
transport: McpServerTransportConfig::Stdio {
|
||||
command: rmcp_test_server_bin,
|
||||
args: Vec::new(),
|
||||
env: None,
|
||||
env_vars: Vec::new(),
|
||||
cwd: None,
|
||||
},
|
||||
enabled: true,
|
||||
required: false,
|
||||
supports_parallel_tool_calls: true,
|
||||
disabled_reason: None,
|
||||
startup_timeout_sec: Some(Duration::from_secs(10)),
|
||||
tool_timeout_sec: Some(Duration::from_secs(2)),
|
||||
enabled_tools: None,
|
||||
disabled_tools: None,
|
||||
scopes: None,
|
||||
oauth_resource: None,
|
||||
tools: HashMap::new(),
|
||||
},
|
||||
);
|
||||
config
|
||||
.mcp_servers
|
||||
.set(servers)
|
||||
.expect("test mcp servers should accept any configuration");
|
||||
})
|
||||
.build(&server)
|
||||
.await?;
|
||||
let session_model = fixture.session_configured.model.clone();
|
||||
|
||||
fixture
|
||||
.codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "call the rmcp sync tool twice".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: fixture.cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
approvals_reviewer: None,
|
||||
sandbox_policy: SandboxPolicy::new_read_only_policy(),
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: None,
|
||||
service_tier: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
wait_for_event(&fixture.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let request = final_mock.single_request();
|
||||
for call_id in [first_call_id, second_call_id] {
|
||||
let output_text = request
|
||||
.function_call_output_text(call_id)
|
||||
.expect("function_call_output present for rmcp sync call");
|
||||
let wrapped_payload = split_wall_time_wrapped_output(&output_text);
|
||||
let output_json: Value = serde_json::from_str(wrapped_payload)
|
||||
.expect("wrapped MCP output should preserve structured JSON");
|
||||
assert_eq!(output_json, json!({ "result": "ok" }));
|
||||
}
|
||||
|
||||
server.verify().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||
#[serial(mcp_test_value)]
|
||||
async fn stdio_image_responses_round_trip() -> anyhow::Result<()> {
|
||||
@@ -282,6 +546,7 @@ async fn stdio_image_responses_round_trip() -> anyhow::Result<()> {
|
||||
},
|
||||
enabled: true,
|
||||
required: false,
|
||||
supports_parallel_tool_calls: false,
|
||||
disabled_reason: None,
|
||||
startup_timeout_sec: Some(Duration::from_secs(10)),
|
||||
tool_timeout_sec: None,
|
||||
@@ -514,6 +779,7 @@ async fn stdio_image_responses_are_sanitized_for_text_only_model() -> anyhow::Re
|
||||
},
|
||||
enabled: true,
|
||||
required: false,
|
||||
supports_parallel_tool_calls: false,
|
||||
disabled_reason: None,
|
||||
startup_timeout_sec: Some(Duration::from_secs(10)),
|
||||
tool_timeout_sec: None,
|
||||
@@ -637,6 +903,7 @@ async fn stdio_server_propagates_whitelisted_env_vars() -> anyhow::Result<()> {
|
||||
},
|
||||
enabled: true,
|
||||
required: false,
|
||||
supports_parallel_tool_calls: false,
|
||||
disabled_reason: None,
|
||||
startup_timeout_sec: Some(Duration::from_secs(10)),
|
||||
tool_timeout_sec: None,
|
||||
@@ -800,6 +1067,7 @@ async fn streamable_http_tool_call_round_trip() -> anyhow::Result<()> {
|
||||
},
|
||||
enabled: true,
|
||||
required: false,
|
||||
supports_parallel_tool_calls: false,
|
||||
disabled_reason: None,
|
||||
startup_timeout_sec: Some(Duration::from_secs(10)),
|
||||
tool_timeout_sec: None,
|
||||
@@ -1023,6 +1291,7 @@ async fn streamable_http_with_oauth_round_trip_impl() -> anyhow::Result<()> {
|
||||
},
|
||||
enabled: true,
|
||||
required: false,
|
||||
supports_parallel_tool_calls: false,
|
||||
disabled_reason: None,
|
||||
startup_timeout_sec: Some(Duration::from_secs(10)),
|
||||
tool_timeout_sec: None,
|
||||
|
||||
@@ -649,6 +649,7 @@ async fn tool_search_indexes_only_enabled_non_app_mcp_tools() -> Result<()> {
|
||||
disabled_tools: Some(vec!["image".to_string()]),
|
||||
scopes: None,
|
||||
oauth_resource: None,
|
||||
supports_parallel_tool_calls: false,
|
||||
tools: HashMap::new(),
|
||||
},
|
||||
);
|
||||
|
||||
@@ -368,6 +368,7 @@ async fn mcp_call_marks_thread_memory_mode_polluted_when_configured() -> Result<
|
||||
},
|
||||
enabled: true,
|
||||
required: false,
|
||||
supports_parallel_tool_calls: false,
|
||||
disabled_reason: None,
|
||||
startup_timeout_sec: Some(Duration::from_secs(10)),
|
||||
tool_timeout_sec: None,
|
||||
|
||||
@@ -372,6 +372,7 @@ async fn mcp_tool_call_output_exceeds_limit_truncated_for_model() -> Result<()>
|
||||
},
|
||||
enabled: true,
|
||||
required: false,
|
||||
supports_parallel_tool_calls: false,
|
||||
disabled_reason: None,
|
||||
startup_timeout_sec: Some(std::time::Duration::from_secs(10)),
|
||||
tool_timeout_sec: None,
|
||||
@@ -468,6 +469,7 @@ async fn mcp_image_output_preserves_image_and_no_text_summary() -> Result<()> {
|
||||
},
|
||||
enabled: true,
|
||||
required: false,
|
||||
supports_parallel_tool_calls: false,
|
||||
disabled_reason: None,
|
||||
startup_timeout_sec: Some(Duration::from_secs(10)),
|
||||
tool_timeout_sec: None,
|
||||
@@ -742,6 +744,7 @@ async fn mcp_tool_call_output_not_truncated_with_custom_limit() -> Result<()> {
|
||||
},
|
||||
enabled: true,
|
||||
required: false,
|
||||
supports_parallel_tool_calls: false,
|
||||
disabled_reason: None,
|
||||
startup_timeout_sec: Some(std::time::Duration::from_secs(10)),
|
||||
tool_timeout_sec: None,
|
||||
|
||||
Reference in New Issue
Block a user