mirror of
https://github.com/openai/codex.git
synced 2026-04-30 09:26:44 +00:00
[5/6] Wire executor-backed MCP stdio (#18212)
## Summary - Add the executor-backed RMCP stdio transport. - Wire MCP stdio placement through the executor environment config. - Cover local and executor-backed stdio paths with the existing MCP test helpers. ## Stack ```text o #18027 [6/6] Fail exec client operations after disconnect │ @ #18212 [5/6] Wire executor-backed MCP stdio │ o #18087 [4/6] Abstract MCP stdio server launching │ o #18020 [3/6] Add pushed exec process events │ o #18086 [2/6] Support piped stdin in exec process API │ o #18085 [1/6] Add MCP server environment config │ o main ``` --------- Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -1,17 +1,26 @@
|
||||
#![allow(clippy::expect_used)]
|
||||
|
||||
use anyhow::Context as _;
|
||||
use anyhow::ensure;
|
||||
use std::collections::HashMap;
|
||||
use std::ffi::OsStr;
|
||||
use std::ffi::OsString;
|
||||
use std::fs;
|
||||
use std::net::TcpListener;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Command as StdCommand;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
use codex_config::types::McpServerConfig;
|
||||
use codex_config::types::McpServerEnvVar;
|
||||
use codex_config::types::McpServerTransportConfig;
|
||||
use codex_core::config::Config;
|
||||
use codex_exec_server::CreateDirectoryOptions;
|
||||
use codex_features::Feature;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_mcp::MCP_SANDBOX_STATE_META_CAPABILITY;
|
||||
@@ -34,6 +43,7 @@ use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_utils_cargo_bin::cargo_bin;
|
||||
use core_test_support::assert_regex_match;
|
||||
use core_test_support::remote_env_env_var;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::responses::ev_custom_tool_call;
|
||||
use core_test_support::responses::mount_models_once;
|
||||
@@ -54,6 +64,7 @@ use tokio::process::Child;
|
||||
use tokio::process::Command;
|
||||
use tokio::time::Instant;
|
||||
use tokio::time::sleep;
|
||||
use wiremock::MockServer;
|
||||
|
||||
static OPENAI_PNG: &str = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAD0AAAA9CAYAAAAeYmHpAAAE6klEQVR4Aeyau44UVxCGx1fZsmRLlm3Zoe0XcGQ5cUiCCIgJeS9CHgAhMkISQnIuGQgJEkBcxLW+nqnZ6uqqc+nuWRC7q/P3qetf9e+MtOwyX25O4Nep6JPyop++0qev9HrfgZ+F6r2DuB/vHOrt/UIkqdDHYvujOW6fO7h/CNEI+a5jc+pBR8uy0jVFsziYu5HtfSUk+Io34q921hLNctFSX0gwww+S8wce8K1LfCU+cYW4888aov8NxqvQILUPPReLOrm6zyLxa4i+6VZuFbJo8d1MOHZm+7VUtB/aIvhPWc/3SWg49JcwFLlHxuXKjtyloo+YNhuW3VS+WPBuUEMvCFKjEDVgFBQHXrnazpqiSxNZCkQ1kYiozsbm9Oz7l4i2Il7vGccGNWAc3XosDrZe/9P3ZnMmzHNEQw4smf8RQ87XEAMsC7Az0Au+dgXerfH4+sHvEc0SYGic8WBBUGqFH2gN7yDrazy7m2pbRTeRmU3+MjZmr1h6LJgPbGy23SI6GlYT0brQ71IY8Us4PNQCm+zepSbaD2BY9xCaAsD9IIj/IzFmKMSdHHonwdZATbTnYREf6/VZGER98N9yCWIvXQwXDoDdhZJoT8jwLnJXDB9w4Sb3e6nK5ndzlkTLnP3JBu4LKkbrYrU69gCVceV0JvpyuW1xlsUVngzhwMetn/XamtTORF9IO5YnWNiyeF9zCAfqR3fUW+vZZKLtgP+ts8BmQRBREAdRDhH3o8QuRh/YucNFz2BEjxbRN6LGzphfKmvP6v6QhqIQyZ8XNJ0W0X83MR1PEcJBNO2KC2Z1TW/v244scp9FwRViZxIOBF0Lctk7ZVSavdLvRlV1hz/ysUi9sr8CIcB3nvWBwA93ykTz18eAYxQ6N/K2DkPA1lv3iXCwmDUT7YkjIby9siXueIJj9H+pzSqJ9oIuJWTUgSSt4WO7o/9GGg0viR4VinNRUDoIj34xoCd6pxD3aK3zfdbnx5v1J3ZNNEJsE0sBG7N27ReDrJc4sFxz7dI/ZAbOmmiKvHBitQXpAdR6+F7v+/ol/tOouUV01EeMZQF2BoQDn6dP4XNr+j9GZEtEK1/L8pFw7bd3a53tsTa7WD+054jOFmPg1XBKPQgnqFfmFcy32ZRvjmiIIQTYFvyDxQ8nH8WIwwGwlyDjDznnilYyFr6njrlZwsKkBpO59A7OwgdzPEWRm+G+oeb7IfyNuzjEEVLrOVxJsxvxwF8kmCM6I2QYmJunz4u4TrADpfl7mlbRTWQ7VmrBzh3+C9f6Grc3YoGN9dg/SXFthpRsT6vobfXRs2VBlgBHXVMLHjDNbIZv1sZ9+X3hB09cXdH1JKViyG0+W9bWZDa/r2f9zAFR71sTzGpMSWz2iI4YssWjWo3REy1MDGjdwe5e0dFSiAC1JakBvu4/CUS8Eh6dqHdU0Or0ioY3W5ClSqDXAy7/6SRfgw8vt4I+tbvvNtFT2kVDhY5+IGb1rCqYaXNF08vSALsXCPmt0kQNqJT1p5eI1mkIV/BxCY1z85lOzeFbPBQHURkkPTlwTYK9gTVE25l84IbFFN+YJDHjdpn0gq6mrHht0dkcjbM4UL9283O5p77GN+SPW/QwVB4IUYg7Or+Kp7naR6qktP98LNF2UxWo9yObPIT9KYg+hK4i56no4rfnM0qeyFf6AwAAAP//trwR3wAAAAZJREFUAwBZ0sR75itw5gAAAABJRU5ErkJggg==";
|
||||
|
||||
@@ -86,6 +97,73 @@ enum McpCallEvent {
|
||||
End(String),
|
||||
}
|
||||
|
||||
const REMOTE_MCP_ENVIRONMENT: &str = "remote";
|
||||
|
||||
fn remote_aware_experimental_environment() -> Option<String> {
|
||||
// These tests run locally in normal CI and against the Docker-backed
|
||||
// executor in full-ci. Match that shared test environment instead of
|
||||
// parameterizing each stdio MCP test with its own local/remote cases.
|
||||
std::env::var_os(remote_env_env_var()).map(|_| REMOTE_MCP_ENVIRONMENT.to_string())
|
||||
}
|
||||
|
||||
/// Returns the stdio MCP test server command path for the active test placement.
|
||||
///
|
||||
/// Local test runs can execute the host-built test binary directly. Remote-aware
|
||||
/// runs start MCP stdio through the executor inside Docker, so the host path
|
||||
/// would be meaningless to the process that actually launches the server. When
|
||||
/// the remote test environment is active, copy the binary into the executor
|
||||
/// container and return that in-container path instead.
|
||||
fn remote_aware_stdio_server_bin() -> anyhow::Result<String> {
|
||||
let bin = stdio_server_bin()?;
|
||||
let Some(container_name) = std::env::var_os(remote_env_env_var()) else {
|
||||
return Ok(bin);
|
||||
};
|
||||
let container_name = container_name
|
||||
.into_string()
|
||||
.map_err(|value| anyhow::anyhow!("remote env container name must be utf-8: {value:?}"))?;
|
||||
|
||||
// Keep the Docker path rewrite scoped to tests that use `build_remote_aware`.
|
||||
// Other MCP tests still start their stdio server from the orchestrator test
|
||||
// process, even when the full-ci remote env is present.
|
||||
//
|
||||
// Remote-aware MCP tests run the executor inside Docker. The stdio test
|
||||
// server is built on the host, so hand the executor a copied in-container
|
||||
// path instead of the host build artifact path.
|
||||
// Several remote-aware MCP tests can run in parallel; give each copied
|
||||
// binary its own path so one test cannot replace another test's executable.
|
||||
let unique_suffix = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos();
|
||||
let remote_path = format!(
|
||||
"/tmp/codex-remote-env/test_stdio_server-{}-{unique_suffix}",
|
||||
std::process::id()
|
||||
);
|
||||
let container_target = format!("{container_name}:{remote_path}");
|
||||
let copy_output = StdCommand::new("docker")
|
||||
.arg("cp")
|
||||
.arg(&bin)
|
||||
.arg(&container_target)
|
||||
.output()
|
||||
.with_context(|| format!("copy {bin} to remote MCP test env"))?;
|
||||
ensure!(
|
||||
copy_output.status.success(),
|
||||
"docker cp test_stdio_server failed: stdout={} stderr={}",
|
||||
String::from_utf8_lossy(©_output.stdout).trim(),
|
||||
String::from_utf8_lossy(©_output.stderr).trim()
|
||||
);
|
||||
|
||||
let chmod_output = StdCommand::new("docker")
|
||||
.args(["exec", &container_name, "chmod", "+x", remote_path.as_str()])
|
||||
.output()
|
||||
.context("mark remote test_stdio_server executable")?;
|
||||
ensure!(
|
||||
chmod_output.status.success(),
|
||||
"docker chmod test_stdio_server failed: stdout={} stderr={}",
|
||||
String::from_utf8_lossy(&chmod_output.stdout).trim(),
|
||||
String::from_utf8_lossy(&chmod_output.stderr).trim()
|
||||
);
|
||||
|
||||
Ok(remote_path)
|
||||
}
|
||||
|
||||
async fn wait_for_mcp_tool(fixture: &TestCodex, tool_name: &str) -> anyhow::Result<()> {
|
||||
let tools_ready_deadline = Instant::now() + Duration::from_secs(30);
|
||||
loop {
|
||||
@@ -115,6 +193,7 @@ async fn wait_for_mcp_tool(fixture: &TestCodex, tool_name: &str) -> anyhow::Resu
|
||||
|
||||
#[derive(Default)]
|
||||
struct TestMcpServerOptions {
|
||||
experimental_environment: Option<String>,
|
||||
supports_parallel_tool_calls: bool,
|
||||
tool_timeout_sec: Option<Duration>,
|
||||
}
|
||||
@@ -122,14 +201,23 @@ struct TestMcpServerOptions {
|
||||
fn stdio_transport(
|
||||
command: String,
|
||||
env: Option<HashMap<String, String>>,
|
||||
env_vars: Vec<String>,
|
||||
env_vars: Vec<McpServerEnvVar>,
|
||||
) -> McpServerTransportConfig {
|
||||
stdio_transport_with_cwd(command, env, env_vars, /*cwd*/ None)
|
||||
}
|
||||
|
||||
fn stdio_transport_with_cwd(
|
||||
command: String,
|
||||
env: Option<HashMap<String, String>>,
|
||||
env_vars: Vec<McpServerEnvVar>,
|
||||
cwd: Option<PathBuf>,
|
||||
) -> McpServerTransportConfig {
|
||||
McpServerTransportConfig::Stdio {
|
||||
command,
|
||||
args: Vec::new(),
|
||||
env,
|
||||
env_vars,
|
||||
cwd: None,
|
||||
cwd,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -144,7 +232,7 @@ fn insert_mcp_server(
|
||||
server_name.to_string(),
|
||||
McpServerConfig {
|
||||
transport,
|
||||
experimental_environment: None,
|
||||
experimental_environment: options.experimental_environment,
|
||||
enabled: true,
|
||||
required: false,
|
||||
supports_parallel_tool_calls: options.supports_parallel_tool_calls,
|
||||
@@ -164,6 +252,105 @@ fn insert_mcp_server(
|
||||
}
|
||||
}
|
||||
|
||||
async fn call_cwd_tool(
|
||||
server: &MockServer,
|
||||
fixture: &TestCodex,
|
||||
server_name: &str,
|
||||
call_id: &str,
|
||||
) -> anyhow::Result<Value> {
|
||||
let namespace = format!("mcp__{server_name}__");
|
||||
mount_sse_once(
|
||||
server,
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_function_call_with_namespace(call_id, &namespace, "cwd", r#"{}"#),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
mount_sse_once(
|
||||
server,
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("msg-1", "rmcp cwd tool completed successfully."),
|
||||
responses::ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let session_model = fixture.session_configured.model.clone();
|
||||
fixture
|
||||
.codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "call the rmcp cwd tool".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: fixture.config.cwd.to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
approvals_reviewer: None,
|
||||
sandbox_policy: SandboxPolicy::new_read_only_policy(),
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: None,
|
||||
service_tier: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
wait_for_event(&fixture.codex, |ev| {
|
||||
matches!(ev, EventMsg::McpToolCallBegin(_))
|
||||
})
|
||||
.await;
|
||||
let end_event = wait_for_event(&fixture.codex, |ev| {
|
||||
matches!(ev, EventMsg::McpToolCallEnd(_))
|
||||
})
|
||||
.await;
|
||||
let EventMsg::McpToolCallEnd(end) = end_event else {
|
||||
unreachable!("event guard guarantees McpToolCallEnd");
|
||||
};
|
||||
let structured_content = end
|
||||
.result
|
||||
.as_ref()
|
||||
.expect("rmcp cwd tool should return success")
|
||||
.structured_content
|
||||
.as_ref()
|
||||
.expect("structured content")
|
||||
.clone();
|
||||
|
||||
wait_for_event(&fixture.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
Ok(structured_content)
|
||||
}
|
||||
|
||||
fn assert_cwd_tool_output(structured: &Value, expected_cwd: &Path) {
|
||||
let actual_cwd = structured
|
||||
.get("cwd")
|
||||
.and_then(Value::as_str)
|
||||
.expect("cwd tool should return a string cwd");
|
||||
|
||||
if std::env::var_os(remote_env_env_var()).is_some() {
|
||||
assert_eq!(
|
||||
structured,
|
||||
&json!({
|
||||
"cwd": expected_cwd.to_string_lossy(),
|
||||
})
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Local Windows can report the same absolute directory through an 8.3 path.
|
||||
// Canonical paths keep the assertion focused on cwd precedence.
|
||||
assert_eq!(
|
||||
Path::new(actual_cwd)
|
||||
.canonicalize()
|
||||
.expect("cwd tool path should exist"),
|
||||
expected_cwd
|
||||
.canonicalize()
|
||||
.expect("expected cwd should exist"),
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||
#[serial(mcp_test_value)]
|
||||
async fn stdio_server_round_trip() -> anyhow::Result<()> {
|
||||
@@ -199,7 +386,7 @@ async fn stdio_server_round_trip() -> anyhow::Result<()> {
|
||||
.await;
|
||||
|
||||
let expected_env_value = "propagated-env";
|
||||
let rmcp_test_server_bin = stdio_server_bin()?;
|
||||
let rmcp_test_server_bin = remote_aware_stdio_server_bin()?;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_config(move |config| {
|
||||
@@ -214,10 +401,13 @@ async fn stdio_server_round_trip() -> anyhow::Result<()> {
|
||||
)])),
|
||||
Vec::new(),
|
||||
),
|
||||
TestMcpServerOptions::default(),
|
||||
TestMcpServerOptions {
|
||||
experimental_environment: remote_aware_experimental_environment(),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
})
|
||||
.build(&server)
|
||||
.build_remote_aware(&server)
|
||||
.await?;
|
||||
let session_model = fixture.session_configured.model.clone();
|
||||
|
||||
@@ -314,6 +504,118 @@ async fn stdio_server_round_trip() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||
#[serial(mcp_cwd)]
|
||||
async fn stdio_server_uses_configured_cwd_before_runtime_fallback() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let server_name = "rmcp_configured_cwd";
|
||||
let expected_cwd = Arc::new(Mutex::new(None::<PathBuf>));
|
||||
let expected_cwd_for_config = Arc::clone(&expected_cwd);
|
||||
let rmcp_test_server_bin = remote_aware_stdio_server_bin()?;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_workspace_setup(|cwd, fs| async move {
|
||||
fs.create_directory(
|
||||
&cwd.join("mcp-configured-cwd"),
|
||||
CreateDirectoryOptions { recursive: true },
|
||||
/*sandbox*/ None,
|
||||
)
|
||||
.await?;
|
||||
Ok::<(), anyhow::Error>(())
|
||||
})
|
||||
.with_config(move |config| {
|
||||
let configured_cwd = config.cwd.join("mcp-configured-cwd").into_path_buf();
|
||||
*expected_cwd_for_config
|
||||
.lock()
|
||||
.expect("expected cwd lock should not be poisoned") = Some(configured_cwd.clone());
|
||||
insert_mcp_server(
|
||||
config,
|
||||
server_name,
|
||||
stdio_transport_with_cwd(
|
||||
rmcp_test_server_bin,
|
||||
Some(HashMap::from([(
|
||||
"MCP_TEST_VALUE".to_string(),
|
||||
"configured-cwd".to_string(),
|
||||
)])),
|
||||
Vec::new(),
|
||||
Some(configured_cwd),
|
||||
),
|
||||
TestMcpServerOptions {
|
||||
experimental_environment: remote_aware_experimental_environment(),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
})
|
||||
.build_remote_aware(&server)
|
||||
.await?;
|
||||
|
||||
let expected_cwd = expected_cwd
|
||||
.lock()
|
||||
.expect("expected cwd lock should not be poisoned")
|
||||
.clone()
|
||||
.expect("test config should record configured MCP cwd");
|
||||
let structured = call_cwd_tool(&server, &fixture, server_name, "call-configured-cwd").await?;
|
||||
|
||||
assert_cwd_tool_output(&structured, &expected_cwd);
|
||||
server.verify().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||
#[serial(mcp_cwd)]
|
||||
async fn remote_stdio_server_uses_runtime_fallback_cwd_when_config_omits_cwd() -> anyhow::Result<()>
|
||||
{
|
||||
skip_if_no_network!(Ok(()));
|
||||
if std::env::var_os(remote_env_env_var()).is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let server_name = "rmcp_fallback_cwd";
|
||||
let expected_cwd = Arc::new(Mutex::new(None::<PathBuf>));
|
||||
let expected_cwd_for_config = Arc::clone(&expected_cwd);
|
||||
let rmcp_test_server_bin = remote_aware_stdio_server_bin()?;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_config(move |config| {
|
||||
*expected_cwd_for_config
|
||||
.lock()
|
||||
.expect("expected cwd lock should not be poisoned") =
|
||||
Some(config.cwd.to_path_buf());
|
||||
insert_mcp_server(
|
||||
config,
|
||||
server_name,
|
||||
stdio_transport(
|
||||
rmcp_test_server_bin,
|
||||
Some(HashMap::from([(
|
||||
"MCP_TEST_VALUE".to_string(),
|
||||
"fallback-cwd".to_string(),
|
||||
)])),
|
||||
Vec::new(),
|
||||
),
|
||||
TestMcpServerOptions {
|
||||
experimental_environment: remote_aware_experimental_environment(),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
})
|
||||
.build_remote_aware(&server)
|
||||
.await?;
|
||||
|
||||
let expected_cwd = expected_cwd
|
||||
.lock()
|
||||
.expect("expected cwd lock should not be poisoned")
|
||||
.clone()
|
||||
.expect("test config should record runtime fallback cwd");
|
||||
let structured = call_cwd_tool(&server, &fixture, server_name, "call-fallback-cwd").await?;
|
||||
|
||||
assert_cwd_tool_output(&structured, &expected_cwd);
|
||||
server.verify().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||
async fn stdio_mcp_tool_call_includes_sandbox_state_meta() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
@@ -343,17 +645,20 @@ async fn stdio_mcp_tool_call_includes_sandbox_state_meta() -> anyhow::Result<()>
|
||||
)
|
||||
.await;
|
||||
|
||||
let rmcp_test_server_bin = stdio_server_bin()?;
|
||||
let rmcp_test_server_bin = remote_aware_stdio_server_bin()?;
|
||||
let fixture = test_codex()
|
||||
.with_config(move |config| {
|
||||
insert_mcp_server(
|
||||
config,
|
||||
server_name,
|
||||
stdio_transport(rmcp_test_server_bin, /*env*/ None, Vec::new()),
|
||||
TestMcpServerOptions::default(),
|
||||
TestMcpServerOptions {
|
||||
experimental_environment: remote_aware_experimental_environment(),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
})
|
||||
.build(&server)
|
||||
.build_remote_aware(&server)
|
||||
.await?;
|
||||
|
||||
let tools_ready_deadline = Instant::now() + Duration::from_secs(30);
|
||||
@@ -415,7 +720,7 @@ async fn stdio_mcp_tool_call_includes_sandbox_state_meta() -> anyhow::Result<()>
|
||||
);
|
||||
assert_eq!(
|
||||
sandbox_meta.get("sandboxCwd").and_then(Value::as_str),
|
||||
fixture.cwd.path().to_str()
|
||||
fixture.config.cwd.as_path().to_str()
|
||||
);
|
||||
assert_eq!(sandbox_meta.get("useLegacyLandlock"), Some(&json!(false)));
|
||||
|
||||
@@ -455,7 +760,7 @@ async fn stdio_mcp_parallel_tool_calls_default_false_runs_serially() -> anyhow::
|
||||
)
|
||||
.await;
|
||||
|
||||
let rmcp_test_server_bin = stdio_server_bin()?;
|
||||
let rmcp_test_server_bin = remote_aware_stdio_server_bin()?;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_config(move |config| {
|
||||
@@ -464,12 +769,13 @@ async fn stdio_mcp_parallel_tool_calls_default_false_runs_serially() -> anyhow::
|
||||
server_name,
|
||||
stdio_transport(rmcp_test_server_bin, /*env*/ None, Vec::new()),
|
||||
TestMcpServerOptions {
|
||||
experimental_environment: remote_aware_experimental_environment(),
|
||||
tool_timeout_sec: Some(Duration::from_secs(2)),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
})
|
||||
.build(&server)
|
||||
.build_remote_aware(&server)
|
||||
.await?;
|
||||
let session_model = fixture.session_configured.model.clone();
|
||||
|
||||
@@ -586,7 +892,7 @@ async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Res
|
||||
)
|
||||
.await;
|
||||
|
||||
let rmcp_test_server_bin = stdio_server_bin()?;
|
||||
let rmcp_test_server_bin = remote_aware_stdio_server_bin()?;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_config(move |config| {
|
||||
@@ -595,12 +901,13 @@ async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Res
|
||||
server_name,
|
||||
stdio_transport(rmcp_test_server_bin, /*env*/ None, Vec::new()),
|
||||
TestMcpServerOptions {
|
||||
experimental_environment: remote_aware_experimental_environment(),
|
||||
supports_parallel_tool_calls: true,
|
||||
tool_timeout_sec: Some(Duration::from_secs(2)),
|
||||
},
|
||||
);
|
||||
})
|
||||
.build(&server)
|
||||
.build_remote_aware(&server)
|
||||
.await?;
|
||||
let session_model = fixture.session_configured.model.clone();
|
||||
|
||||
@@ -676,7 +983,7 @@ async fn stdio_image_responses_round_trip() -> anyhow::Result<()> {
|
||||
.await;
|
||||
|
||||
// Build the stdio rmcp server and pass the image as data URL so it can construct ImageContent.
|
||||
let rmcp_test_server_bin = stdio_server_bin()?;
|
||||
let rmcp_test_server_bin = remote_aware_stdio_server_bin()?;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_config(move |config| {
|
||||
@@ -691,10 +998,13 @@ async fn stdio_image_responses_round_trip() -> anyhow::Result<()> {
|
||||
)])),
|
||||
Vec::new(),
|
||||
),
|
||||
TestMcpServerOptions::default(),
|
||||
TestMcpServerOptions {
|
||||
experimental_environment: remote_aware_experimental_environment(),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
})
|
||||
.build(&server)
|
||||
.build_remote_aware(&server)
|
||||
.await?;
|
||||
let session_model = fixture.session_configured.model.clone();
|
||||
|
||||
@@ -830,7 +1140,7 @@ async fn stdio_image_responses_preserve_original_detail_metadata() -> anyhow::Re
|
||||
)
|
||||
.await;
|
||||
|
||||
let rmcp_test_server_bin = stdio_server_bin()?;
|
||||
let rmcp_test_server_bin = remote_aware_stdio_server_bin()?;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_model("gpt-5.3-codex")
|
||||
@@ -839,10 +1149,13 @@ async fn stdio_image_responses_preserve_original_detail_metadata() -> anyhow::Re
|
||||
config,
|
||||
server_name,
|
||||
stdio_transport(rmcp_test_server_bin, /*env*/ None, Vec::new()),
|
||||
TestMcpServerOptions::default(),
|
||||
TestMcpServerOptions {
|
||||
experimental_environment: remote_aware_experimental_environment(),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
})
|
||||
.build(&server)
|
||||
.build_remote_aware(&server)
|
||||
.await?;
|
||||
let session_model = fixture.session_configured.model.clone();
|
||||
|
||||
@@ -1053,7 +1366,7 @@ async fn stdio_image_responses_are_sanitized_for_text_only_model() -> anyhow::Re
|
||||
)
|
||||
.await;
|
||||
|
||||
let rmcp_test_server_bin = stdio_server_bin()?;
|
||||
let rmcp_test_server_bin = remote_aware_stdio_server_bin()?;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
@@ -1069,10 +1382,13 @@ async fn stdio_image_responses_are_sanitized_for_text_only_model() -> anyhow::Re
|
||||
)])),
|
||||
Vec::new(),
|
||||
),
|
||||
TestMcpServerOptions::default(),
|
||||
TestMcpServerOptions {
|
||||
experimental_environment: remote_aware_experimental_environment(),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
})
|
||||
.build(&server)
|
||||
.build_remote_aware(&server)
|
||||
.await?;
|
||||
|
||||
fixture
|
||||
@@ -1168,7 +1484,7 @@ async fn stdio_server_propagates_whitelisted_env_vars() -> anyhow::Result<()> {
|
||||
|
||||
let expected_env_value = "propagated-env-from-whitelist";
|
||||
let _guard = EnvVarGuard::set("MCP_TEST_VALUE", OsStr::new(expected_env_value));
|
||||
let rmcp_test_server_bin = stdio_server_bin()?;
|
||||
let rmcp_test_server_bin = remote_aware_stdio_server_bin()?;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_config(move |config| {
|
||||
@@ -1178,12 +1494,15 @@ async fn stdio_server_propagates_whitelisted_env_vars() -> anyhow::Result<()> {
|
||||
stdio_transport(
|
||||
rmcp_test_server_bin,
|
||||
/*env*/ None,
|
||||
vec!["MCP_TEST_VALUE".to_string()],
|
||||
vec!["MCP_TEST_VALUE".into()],
|
||||
),
|
||||
TestMcpServerOptions::default(),
|
||||
TestMcpServerOptions {
|
||||
experimental_environment: remote_aware_experimental_environment(),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
})
|
||||
.build(&server)
|
||||
.build_remote_aware(&server)
|
||||
.await?;
|
||||
let session_model = fixture.session_configured.model.clone();
|
||||
|
||||
@@ -1262,6 +1581,222 @@ async fn stdio_server_propagates_whitelisted_env_vars() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||
#[serial(mcp_env_source)]
|
||||
async fn stdio_server_propagates_explicit_local_env_var_source() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let call_id = "call-local-source";
|
||||
let server_name = "rmcp_local_source";
|
||||
let namespace = format!("mcp__{server_name}__");
|
||||
let env_name = "MCP_TEST_LOCAL_SOURCE";
|
||||
let expected_env_value = "propagated-explicit-local-source";
|
||||
|
||||
mount_sse_once(
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_function_call_with_namespace(
|
||||
call_id,
|
||||
&namespace,
|
||||
"echo",
|
||||
&format!(r#"{{"message":"ping","env_var":"{env_name}"}}"#),
|
||||
),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
mount_sse_once(
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("msg-1", "rmcp echo tool completed successfully."),
|
||||
responses::ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let _guard = EnvVarGuard::set(env_name, OsStr::new(expected_env_value));
|
||||
let rmcp_test_server_bin = remote_aware_stdio_server_bin()?;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_config(move |config| {
|
||||
insert_mcp_server(
|
||||
config,
|
||||
server_name,
|
||||
stdio_transport(
|
||||
rmcp_test_server_bin,
|
||||
/*env*/ None,
|
||||
vec![McpServerEnvVar::Config {
|
||||
name: env_name.to_string(),
|
||||
source: Some("local".to_string()),
|
||||
}],
|
||||
),
|
||||
TestMcpServerOptions {
|
||||
experimental_environment: remote_aware_experimental_environment(),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
})
|
||||
.build_remote_aware(&server)
|
||||
.await?;
|
||||
|
||||
let session_model = fixture.session_configured.model.clone();
|
||||
fixture
|
||||
.codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "call the rmcp echo tool".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: fixture.cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
approvals_reviewer: None,
|
||||
sandbox_policy: SandboxPolicy::new_read_only_policy(),
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: None,
|
||||
service_tier: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
wait_for_event(&fixture.codex, |ev| {
|
||||
matches!(ev, EventMsg::McpToolCallBegin(_))
|
||||
})
|
||||
.await;
|
||||
let end_event = wait_for_event(&fixture.codex, |ev| {
|
||||
matches!(ev, EventMsg::McpToolCallEnd(_))
|
||||
})
|
||||
.await;
|
||||
let EventMsg::McpToolCallEnd(end) = end_event else {
|
||||
unreachable!("event guard guarantees McpToolCallEnd");
|
||||
};
|
||||
let structured = end
|
||||
.result
|
||||
.as_ref()
|
||||
.expect("rmcp echo tool should return success")
|
||||
.structured_content
|
||||
.as_ref()
|
||||
.expect("structured content");
|
||||
assert_eq!(structured["env"], expected_env_value);
|
||||
|
||||
wait_for_event(&fixture.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
server.verify().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||
#[serial(mcp_env_source)]
|
||||
async fn remote_stdio_env_var_source_does_not_copy_local_env() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
if std::env::var_os(remote_env_env_var()).is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let call_id = "call-remote-source";
|
||||
let server_name = "rmcp_remote_source";
|
||||
let namespace = format!("mcp__{server_name}__");
|
||||
let env_name = "MCP_TEST_REMOTE_SOURCE_ONLY";
|
||||
|
||||
mount_sse_once(
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_function_call_with_namespace(
|
||||
call_id,
|
||||
&namespace,
|
||||
"echo",
|
||||
&format!(r#"{{"message":"ping","env_var":"{env_name}"}}"#),
|
||||
),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
mount_sse_once(
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("msg-1", "rmcp echo tool completed successfully."),
|
||||
responses::ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let _guard = EnvVarGuard::set(env_name, OsStr::new("local-value-should-not-cross"));
|
||||
let rmcp_test_server_bin = remote_aware_stdio_server_bin()?;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_config(move |config| {
|
||||
insert_mcp_server(
|
||||
config,
|
||||
server_name,
|
||||
stdio_transport(
|
||||
rmcp_test_server_bin,
|
||||
/*env*/ None,
|
||||
vec![McpServerEnvVar::Config {
|
||||
name: env_name.to_string(),
|
||||
source: Some("remote".to_string()),
|
||||
}],
|
||||
),
|
||||
TestMcpServerOptions {
|
||||
experimental_environment: remote_aware_experimental_environment(),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
})
|
||||
.build_remote_aware(&server)
|
||||
.await?;
|
||||
|
||||
let session_model = fixture.session_configured.model.clone();
|
||||
fixture
|
||||
.codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "call the rmcp echo tool".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: fixture.cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
approvals_reviewer: None,
|
||||
sandbox_policy: SandboxPolicy::new_read_only_policy(),
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: None,
|
||||
service_tier: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
wait_for_event(&fixture.codex, |ev| {
|
||||
matches!(ev, EventMsg::McpToolCallBegin(_))
|
||||
})
|
||||
.await;
|
||||
let end_event = wait_for_event(&fixture.codex, |ev| {
|
||||
matches!(ev, EventMsg::McpToolCallEnd(_))
|
||||
})
|
||||
.await;
|
||||
let EventMsg::McpToolCallEnd(end) = end_event else {
|
||||
unreachable!("event guard guarantees McpToolCallEnd");
|
||||
};
|
||||
let structured = end
|
||||
.result
|
||||
.as_ref()
|
||||
.expect("rmcp echo tool should return success")
|
||||
.structured_content
|
||||
.as_ref()
|
||||
.expect("structured content");
|
||||
assert_eq!(structured["env"], Value::Null);
|
||||
|
||||
wait_for_event(&fixture.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
server.verify().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||
async fn streamable_http_tool_call_round_trip() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
Reference in New Issue
Block a user