mirror of
https://github.com/openai/codex.git
synced 2026-04-23 22:24:57 +00:00
Wire remote MCP stdio through executor
Use the MCP server experimental_environment string to choose local stdio or executor-backed stdio at client startup time. Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -2396,6 +2396,7 @@ dependencies = [
|
||||
"async-channel",
|
||||
"codex-async-utils",
|
||||
"codex-config",
|
||||
"codex-exec-server",
|
||||
"codex-login",
|
||||
"codex-otel",
|
||||
"codex-plugin",
|
||||
|
||||
@@ -16,6 +16,7 @@ anyhow = { workspace = true }
|
||||
async-channel = { workspace = true }
|
||||
codex-async-utils = { workspace = true }
|
||||
codex-config = { workspace = true }
|
||||
codex-exec-server = { workspace = true }
|
||||
codex-login = { workspace = true }
|
||||
codex-otel = { workspace = true }
|
||||
codex-plugin = { workspace = true }
|
||||
|
||||
@@ -355,6 +355,8 @@ pub async fn collect_mcp_snapshot_with_detail(
|
||||
submit_id,
|
||||
tx_event,
|
||||
SandboxPolicy::new_read_only_policy(),
|
||||
/*environment*/ None,
|
||||
config.codex_home.clone(),
|
||||
config.codex_home.clone(),
|
||||
codex_apps_tools_cache_key(auth),
|
||||
tool_plugin_provenance,
|
||||
@@ -421,6 +423,8 @@ pub async fn collect_mcp_server_status_snapshot_with_detail(
|
||||
submit_id,
|
||||
tx_event,
|
||||
SandboxPolicy::new_read_only_policy(),
|
||||
/*environment*/ None,
|
||||
config.codex_home.clone(),
|
||||
config.codex_home.clone(),
|
||||
codex_apps_tools_cache_key(auth),
|
||||
tool_plugin_provenance,
|
||||
|
||||
@@ -36,6 +36,7 @@ use codex_async_utils::CancelErr;
|
||||
use codex_async_utils::OrCancelExt;
|
||||
use codex_config::Constrained;
|
||||
use codex_config::types::OAuthCredentialsStoreMode;
|
||||
use codex_exec_server::Environment;
|
||||
use codex_protocol::ToolName;
|
||||
use codex_protocol::approvals::ElicitationRequest;
|
||||
use codex_protocol::approvals::ElicitationRequestEvent;
|
||||
@@ -50,6 +51,7 @@ use codex_protocol::protocol::McpStartupStatus;
|
||||
use codex_protocol::protocol::McpStartupUpdateEvent;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_rmcp_client::ElicitationResponse;
|
||||
use codex_rmcp_client::ExecutorStdioServerLauncher;
|
||||
use codex_rmcp_client::LocalStdioServerLauncher;
|
||||
use codex_rmcp_client::RmcpClient;
|
||||
use codex_rmcp_client::SendElicitation;
|
||||
@@ -493,6 +495,8 @@ impl AsyncManagedClient {
|
||||
elicitation_requests: ElicitationRequestManager,
|
||||
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
|
||||
tool_plugin_provenance: Arc<ToolPluginProvenance>,
|
||||
environment: Option<Arc<Environment>>,
|
||||
remote_stdio_cwd: PathBuf,
|
||||
) -> Self {
|
||||
let tool_filter = ToolFilter::from_config(&config);
|
||||
let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot(
|
||||
@@ -509,8 +513,16 @@ impl AsyncManagedClient {
|
||||
return Err(error.into());
|
||||
}
|
||||
|
||||
let client =
|
||||
Arc::new(make_rmcp_client(&server_name, config.transport, store_mode).await?);
|
||||
let client = Arc::new(
|
||||
make_rmcp_client(
|
||||
&server_name,
|
||||
config.clone(),
|
||||
store_mode,
|
||||
environment,
|
||||
remote_stdio_cwd,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
match start_server_task(
|
||||
server_name,
|
||||
client,
|
||||
@@ -710,6 +722,8 @@ impl McpConnectionManager {
|
||||
submit_id: String,
|
||||
tx_event: Sender<Event>,
|
||||
initial_sandbox_policy: SandboxPolicy,
|
||||
environment: Option<Arc<Environment>>,
|
||||
remote_stdio_cwd: PathBuf,
|
||||
codex_home: PathBuf,
|
||||
codex_apps_tools_cache_key: CodexAppsToolsCacheKey,
|
||||
tool_plugin_provenance: ToolPluginProvenance,
|
||||
@@ -754,6 +768,8 @@ impl McpConnectionManager {
|
||||
elicitation_requests.clone(),
|
||||
codex_apps_tools_cache_context,
|
||||
Arc::clone(&tool_plugin_provenance),
|
||||
environment.clone(),
|
||||
remote_stdio_cwd.clone(),
|
||||
);
|
||||
clients.insert(server_name.clone(), async_managed_client.clone());
|
||||
let tx_event = tx_event.clone();
|
||||
@@ -1484,9 +1500,26 @@ struct StartServerTaskParams {
|
||||
|
||||
async fn make_rmcp_client(
|
||||
server_name: &str,
|
||||
transport: McpServerTransportConfig,
|
||||
config: McpServerConfig,
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
exec_environment: Option<Arc<Environment>>,
|
||||
remote_stdio_cwd: PathBuf,
|
||||
) -> Result<RmcpClient, StartupOutcomeError> {
|
||||
let McpServerConfig {
|
||||
transport,
|
||||
experimental_environment,
|
||||
..
|
||||
} = config;
|
||||
let remote_environment = match experimental_environment.as_deref() {
|
||||
None | Some("local") => false,
|
||||
Some("remote") => true,
|
||||
Some(environment) => {
|
||||
return Err(StartupOutcomeError::from(anyhow!(
|
||||
"unsupported experimental_environment `{environment}` for MCP server `{server_name}`"
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
match transport {
|
||||
McpServerTransportConfig::Stdio {
|
||||
command,
|
||||
@@ -1502,7 +1535,23 @@ async fn make_rmcp_client(
|
||||
.map(|(key, value)| (key.into(), value.into()))
|
||||
.collect::<HashMap<_, _>>()
|
||||
});
|
||||
let launcher = Arc::new(LocalStdioServerLauncher) as Arc<dyn StdioServerLauncher>;
|
||||
let launcher = if remote_environment {
|
||||
let exec_environment = exec_environment.ok_or_else(|| {
|
||||
StartupOutcomeError::from(anyhow!(
|
||||
"remote MCP server `{server_name}` requires an executor environment"
|
||||
))
|
||||
})?;
|
||||
Arc::new(ExecutorStdioServerLauncher::new(
|
||||
exec_environment.get_exec_backend(),
|
||||
remote_stdio_cwd,
|
||||
))
|
||||
} else {
|
||||
Arc::new(LocalStdioServerLauncher) as Arc<dyn StdioServerLauncher>
|
||||
};
|
||||
|
||||
// `RmcpClient` always sees a launched MCP stdio server. The
|
||||
// launcher hides whether that means a local child process or an
|
||||
// executor process whose stdin/stdout bytes cross the process API.
|
||||
RmcpClient::new_stdio_client(command_os, args_os, env_os, &env_vars, cwd, launcher)
|
||||
.await
|
||||
.map_err(|err| StartupOutcomeError::from(anyhow!(err)))
|
||||
@@ -1513,6 +1562,18 @@ async fn make_rmcp_client(
|
||||
env_http_headers,
|
||||
bearer_token_env_var,
|
||||
} => {
|
||||
if remote_environment {
|
||||
return Err(StartupOutcomeError::from(anyhow!(
|
||||
// Remote HTTP needs the future low-level executor
|
||||
// `network/request` API so reqwest runs on the executor side.
|
||||
// Do not fall back to local HTTP here; the config explicitly
|
||||
// asked for remote placement.
|
||||
"remote streamable HTTP MCP server `{server_name}` is not implemented yet"
|
||||
)));
|
||||
}
|
||||
|
||||
// Local streamable HTTP remains the existing reqwest path from
|
||||
// the orchestrator process.
|
||||
let resolved_bearer_token =
|
||||
match resolve_bearer_token(server_name, bearer_token_env_var.as_deref()) {
|
||||
Ok(token) => token,
|
||||
|
||||
@@ -2154,7 +2154,7 @@ impl Session {
|
||||
code_mode_service: crate::tools::code_mode::CodeModeService::new(
|
||||
config.js_repl_node_path.clone(),
|
||||
),
|
||||
environment,
|
||||
environment: environment.clone(),
|
||||
};
|
||||
services
|
||||
.model_client
|
||||
@@ -2248,6 +2248,8 @@ impl Session {
|
||||
INITIAL_SUBMIT_ID.to_owned(),
|
||||
tx_event.clone(),
|
||||
session_configuration.sandbox_policy.get().clone(),
|
||||
environment.clone(),
|
||||
session_configuration.cwd.to_path_buf(),
|
||||
config.codex_home.to_path_buf(),
|
||||
codex_apps_tools_cache_key(auth),
|
||||
tool_plugin_provenance,
|
||||
@@ -4584,6 +4586,8 @@ impl Session {
|
||||
turn_context.sub_id.clone(),
|
||||
self.get_tx_event(),
|
||||
turn_context.sandbox_policy.get().clone(),
|
||||
turn_context.environment.clone(),
|
||||
turn_context.cwd.to_path_buf(),
|
||||
config.codex_home.to_path_buf(),
|
||||
codex_apps_tools_cache_key(auth.as_ref()),
|
||||
tool_plugin_provenance,
|
||||
|
||||
@@ -233,6 +233,8 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options_and_status(
|
||||
INITIAL_SUBMIT_ID.to_owned(),
|
||||
tx_event,
|
||||
SandboxPolicy::new_read_only_policy(),
|
||||
/*environment*/ None,
|
||||
config.codex_home.to_path_buf(),
|
||||
config.codex_home.to_path_buf(),
|
||||
codex_apps_tools_cache_key(auth.as_ref()),
|
||||
ToolPluginProvenance::default(),
|
||||
|
||||
Reference in New Issue
Block a user