//! Connection manager for Model Context Protocol (MCP) servers. //! //! The [`McpConnectionManager`] owns one [`codex_rmcp_client::RmcpClient`] per //! configured server (keyed by the *server name*). It offers convenience //! helpers to query the available tools across *all* servers and returns them //! in a single aggregated map using the model-visible fully-qualified tool name //! as the key. use std::borrow::Cow; use std::collections::HashMap; use std::collections::HashSet; use std::env; use std::ffi::OsString; use std::path::PathBuf; use std::sync::Arc; use std::sync::Mutex as StdMutex; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Duration; use std::time::Instant; use crate::McpAuthStatusEntry; use crate::mcp::CODEX_APPS_MCP_SERVER_NAME; use crate::mcp::McpConfig; use crate::mcp::ToolPluginProvenance; use crate::mcp::configured_mcp_servers; use crate::mcp::effective_mcp_servers; use crate::mcp::mcp_permission_prompt_is_auto_approved; use crate::mcp::tool_plugin_provenance; pub(crate) use crate::mcp_tool_names::qualify_tools; use anyhow::Context; use anyhow::Result; use anyhow::anyhow; use async_channel::Sender; use codex_async_utils::CancelErr; use codex_async_utils::OrCancelExt; use codex_config::Constrained; use codex_config::types::OAuthCredentialsStoreMode; use codex_exec_server::Environment; use codex_protocol::ToolName; use codex_protocol::approvals::ElicitationRequest; use codex_protocol::approvals::ElicitationRequestEvent; use codex_protocol::mcp::CallToolResult; use codex_protocol::mcp::RequestId as ProtocolRequestId; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::McpStartupCompleteEvent; use codex_protocol::protocol::McpStartupFailure; use codex_protocol::protocol::McpStartupStatus; use codex_protocol::protocol::McpStartupUpdateEvent; use codex_protocol::protocol::SandboxPolicy; use codex_rmcp_client::ElicitationResponse; use codex_rmcp_client::ExecutorStdioServerLauncher; use codex_rmcp_client::LocalStdioServerLauncher; use codex_rmcp_client::RmcpClient; use codex_rmcp_client::SendElicitation; use codex_rmcp_client::StdioServerLauncher; use futures::future::BoxFuture; use futures::future::FutureExt; use futures::future::Shared; use rmcp::model::ClientCapabilities; use rmcp::model::CreateElicitationRequestParams; use rmcp::model::ElicitationAction; use rmcp::model::ElicitationCapability; use rmcp::model::FormElicitationCapability; use rmcp::model::Implementation; use rmcp::model::InitializeRequestParams; use rmcp::model::ListResourceTemplatesResult; use rmcp::model::ListResourcesResult; use rmcp::model::PaginatedRequestParams; use rmcp::model::ProtocolVersion; use rmcp::model::ReadResourceRequestParams; use rmcp::model::ReadResourceResult; use rmcp::model::RequestId; use rmcp::model::Resource; use rmcp::model::ResourceTemplate; use rmcp::model::Tool; use serde::Deserialize; use serde::Serialize; use serde_json::Map; use serde_json::Value as JsonValue; use sha1::Digest; use sha1::Sha1; use tokio::sync::Mutex; use tokio::sync::oneshot; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::instrument; use tracing::warn; use url::Url; use codex_config::McpServerConfig; use codex_config::McpServerTransportConfig; use codex_login::CodexAuth; use codex_utils_plugins::mcp_connector::is_connector_id_allowed; use codex_utils_plugins::mcp_connector::sanitize_name; /// Delimiter used to separate MCP tool-name parts. const MCP_TOOL_NAME_DELIMITER: &str = "__"; /// Default timeout for initializing MCP server & initially listing tools. pub const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(30); /// Default timeout for individual tool calls. const DEFAULT_TOOL_TIMEOUT: Duration = Duration::from_secs(120); const CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION: u8 = 2; const CODEX_APPS_TOOLS_CACHE_DIR: &str = "cache/codex_apps_tools"; const MCP_TOOLS_LIST_DURATION_METRIC: &str = "codex.mcp.tools.list.duration_ms"; const MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC: &str = "codex.mcp.tools.fetch_uncached.duration_ms"; const MCP_TOOLS_CACHE_WRITE_DURATION_METRIC: &str = "codex.mcp.tools.cache_write.duration_ms"; fn sha1_hex(s: &str) -> String { let mut hasher = Sha1::new(); hasher.update(s.as_bytes()); let sha1 = hasher.finalize(); format!("{sha1:x}") } pub fn codex_apps_tools_cache_key(auth: Option<&CodexAuth>) -> CodexAppsToolsCacheKey { let token_data = auth.and_then(|auth| auth.get_token_data().ok()); let account_id = token_data .as_ref() .and_then(|token_data| token_data.account_id.clone()); let chatgpt_user_id = token_data .as_ref() .and_then(|token_data| token_data.id_token.chatgpt_user_id.clone()); let is_workspace_account = token_data .as_ref() .is_some_and(|token_data| token_data.id_token.is_workspace_account()); CodexAppsToolsCacheKey { account_id, chatgpt_user_id, is_workspace_account, } } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ToolInfo { /// Raw MCP server name used for routing the tool call. pub server_name: String, /// Model-visible tool name used in Responses API tool declarations. #[serde(rename = "tool_name", alias = "callable_name")] pub callable_name: String, /// Model-visible namespace used for deferred tool loading. #[serde(rename = "tool_namespace", alias = "callable_namespace")] pub callable_namespace: String, /// Instructions from the MCP server initialize result. #[serde(default)] pub server_instructions: Option, /// Raw MCP tool definition; `tool.name` is sent back to the MCP server. pub tool: Tool, pub connector_id: Option, pub connector_name: Option, #[serde(default)] pub plugin_display_names: Vec, pub connector_description: Option, } impl ToolInfo { pub fn canonical_tool_name(&self) -> ToolName { ToolName::namespaced(self.callable_namespace.clone(), self.callable_name.clone()) } } const META_OPENAI_FILE_PARAMS: &str = "openai/fileParams"; pub fn declared_openai_file_input_param_names( meta: Option<&Map>, ) -> Vec { let Some(meta) = meta else { return Vec::new(); }; meta.get(META_OPENAI_FILE_PARAMS) .and_then(JsonValue::as_array) .into_iter() .flatten() .filter_map(JsonValue::as_str) .filter(|value| !value.is_empty()) .map(str::to_string) .collect() } /// Returns the model-visible view of a tool while preserving the raw metadata /// used by execution. Keep cache entries raw and call this at manager return /// boundaries. fn tool_with_model_visible_input_schema(tool: &Tool) -> Tool { let file_params = declared_openai_file_input_param_names(tool.meta.as_deref()); if file_params.is_empty() { return tool.clone(); } let mut tool = tool.clone(); let mut input_schema = JsonValue::Object(tool.input_schema.as_ref().clone()); mask_input_schema_for_file_path_params(&mut input_schema, &file_params); if let JsonValue::Object(input_schema) = input_schema { tool.input_schema = Arc::new(input_schema); } tool } fn mask_input_schema_for_file_path_params(input_schema: &mut JsonValue, file_params: &[String]) { let Some(properties) = input_schema .as_object_mut() .and_then(|schema| schema.get_mut("properties")) .and_then(JsonValue::as_object_mut) else { return; }; for field_name in file_params { let Some(property_schema) = properties.get_mut(field_name) else { continue; }; mask_input_property_schema(property_schema); } } fn mask_input_property_schema(schema: &mut JsonValue) { let Some(object) = schema.as_object_mut() else { return; }; let mut description = object .get("description") .and_then(JsonValue::as_str) .map(str::to_string) .unwrap_or_default(); let guidance = "This parameter expects an absolute local file path. If you want to upload a file, provide the absolute path to that file here."; if description.is_empty() { description = guidance.to_string(); } else if !description.contains(guidance) { description = format!("{description} {guidance}"); } let is_array = object.get("type").and_then(JsonValue::as_str) == Some("array") || object.get("items").is_some(); object.clear(); object.insert("description".to_string(), JsonValue::String(description)); if is_array { object.insert("type".to_string(), JsonValue::String("array".to_string())); object.insert("items".to_string(), serde_json::json!({ "type": "string" })); } else { object.insert("type".to_string(), JsonValue::String("string".to_string())); } } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct CodexAppsToolsCacheKey { account_id: Option, chatgpt_user_id: Option, is_workspace_account: bool, } #[derive(Clone)] struct CodexAppsToolsCacheContext { codex_home: PathBuf, user_key: CodexAppsToolsCacheKey, } impl CodexAppsToolsCacheContext { fn cache_path(&self) -> PathBuf { let user_key_json = serde_json::to_string(&self.user_key).unwrap_or_default(); let user_key_hash = sha1_hex(&user_key_json); self.codex_home .join(CODEX_APPS_TOOLS_CACHE_DIR) .join(format!("{user_key_hash}.json")) } } #[derive(Debug, Clone, Serialize, Deserialize)] struct CodexAppsToolsDiskCache { schema_version: u8, tools: Vec, } enum CachedCodexAppsToolsLoad { Hit(Vec), Missing, Invalid, } type ResponderMap = HashMap<(String, RequestId), oneshot::Sender>; fn elicitation_is_rejected_by_policy(approval_policy: AskForApproval) -> bool { match approval_policy { AskForApproval::Never => true, AskForApproval::OnFailure => false, AskForApproval::OnRequest => false, AskForApproval::UnlessTrusted => false, AskForApproval::Granular(granular_config) => !granular_config.allows_mcp_elicitations(), } } fn can_auto_accept_elicitation(elicitation: &CreateElicitationRequestParams) -> bool { match elicitation { CreateElicitationRequestParams::FormElicitationParams { requested_schema, .. } => { // Auto-accept confirm/approval elicitations without schema requirements. requested_schema.properties.is_empty() } CreateElicitationRequestParams::UrlElicitationParams { .. } => false, } } #[derive(Clone)] struct ElicitationRequestManager { requests: Arc>, approval_policy: Arc>, sandbox_policy: Arc>, } impl ElicitationRequestManager { fn new(approval_policy: AskForApproval, sandbox_policy: SandboxPolicy) -> Self { Self { requests: Arc::new(Mutex::new(HashMap::new())), approval_policy: Arc::new(StdMutex::new(approval_policy)), sandbox_policy: Arc::new(StdMutex::new(sandbox_policy)), } } async fn resolve( &self, server_name: String, id: RequestId, response: ElicitationResponse, ) -> Result<()> { self.requests .lock() .await .remove(&(server_name, id)) .ok_or_else(|| anyhow!("elicitation request not found"))? .send(response) .map_err(|e| anyhow!("failed to send elicitation response: {e:?}")) } fn make_sender(&self, server_name: String, tx_event: Sender) -> SendElicitation { let elicitation_requests = self.requests.clone(); let approval_policy = self.approval_policy.clone(); let sandbox_policy = self.sandbox_policy.clone(); Box::new(move |id, elicitation| { let elicitation_requests = elicitation_requests.clone(); let tx_event = tx_event.clone(); let server_name = server_name.clone(); let approval_policy = approval_policy.clone(); let sandbox_policy = sandbox_policy.clone(); async move { let approval_policy = approval_policy .lock() .map(|policy| *policy) .unwrap_or(AskForApproval::Never); let sandbox_policy = sandbox_policy .lock() .map(|policy| policy.clone()) .unwrap_or_else(|_| SandboxPolicy::new_read_only_policy()); if mcp_permission_prompt_is_auto_approved(approval_policy, &sandbox_policy) && can_auto_accept_elicitation(&elicitation) { return Ok(ElicitationResponse { action: ElicitationAction::Accept, content: Some(serde_json::json!({})), meta: None, }); } if elicitation_is_rejected_by_policy(approval_policy) { return Ok(ElicitationResponse { action: ElicitationAction::Decline, content: None, meta: None, }); } let request = match elicitation { CreateElicitationRequestParams::FormElicitationParams { meta, message, requested_schema, } => ElicitationRequest::Form { meta: meta .map(serde_json::to_value) .transpose() .context("failed to serialize MCP elicitation metadata")?, message, requested_schema: serde_json::to_value(requested_schema) .context("failed to serialize MCP elicitation schema")?, }, CreateElicitationRequestParams::UrlElicitationParams { meta, message, url, elicitation_id, } => ElicitationRequest::Url { meta: meta .map(serde_json::to_value) .transpose() .context("failed to serialize MCP elicitation metadata")?, message, url, elicitation_id, }, }; let (tx, rx) = oneshot::channel(); { let mut lock = elicitation_requests.lock().await; lock.insert((server_name.clone(), id.clone()), tx); } let _ = tx_event .send(Event { id: "mcp_elicitation_request".to_string(), msg: EventMsg::ElicitationRequest(ElicitationRequestEvent { turn_id: None, server_name, id: match id.clone() { rmcp::model::NumberOrString::String(value) => { ProtocolRequestId::String(value.to_string()) } rmcp::model::NumberOrString::Number(value) => { ProtocolRequestId::Integer(value) } }, request, }), }) .await; rx.await .context("elicitation request channel closed unexpectedly") } .boxed() }) } } #[derive(Clone)] struct ManagedClient { client: Arc, tools: Vec, tool_filter: ToolFilter, tool_timeout: Option, server_instructions: Option, server_supports_sandbox_state_meta_capability: bool, codex_apps_tools_cache_context: Option, } impl ManagedClient { fn listed_tools(&self) -> Vec { let total_start = Instant::now(); if let Some(cache_context) = self.codex_apps_tools_cache_context.as_ref() && let CachedCodexAppsToolsLoad::Hit(tools) = load_cached_codex_apps_tools(cache_context) { emit_duration( MCP_TOOLS_LIST_DURATION_METRIC, total_start.elapsed(), &[("cache", "hit")], ); return filter_tools(tools, &self.tool_filter); } if self.codex_apps_tools_cache_context.is_some() { emit_duration( MCP_TOOLS_LIST_DURATION_METRIC, total_start.elapsed(), &[("cache", "miss")], ); } self.tools.clone() } } #[derive(Clone)] struct AsyncManagedClient { client: Shared>>, startup_snapshot: Option>, startup_complete: Arc, tool_plugin_provenance: Arc, } impl AsyncManagedClient { // Keep this constructor flat so the startup inputs remain readable at the // single call site instead of introducing a one-off params wrapper. #[allow(clippy::too_many_arguments)] fn new( server_name: String, config: McpServerConfig, store_mode: OAuthCredentialsStoreMode, cancel_token: CancellationToken, tx_event: Sender, elicitation_requests: ElicitationRequestManager, codex_apps_tools_cache_context: Option, tool_plugin_provenance: Arc, runtime_environment: McpRuntimeEnvironment, ) -> Self { let tool_filter = ToolFilter::from_config(&config); let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot( &server_name, codex_apps_tools_cache_context.as_ref(), ) .map(|tools| filter_tools(tools, &tool_filter)); let startup_tool_filter = tool_filter; let startup_complete = Arc::new(AtomicBool::new(false)); let startup_complete_for_fut = Arc::clone(&startup_complete); let fut = async move { let outcome = async { if let Err(error) = validate_mcp_server_name(&server_name) { return Err(error.into()); } let client = Arc::new( make_rmcp_client( &server_name, config.clone(), store_mode, runtime_environment, ) .await?, ); match start_server_task( server_name, client, StartServerTaskParams { startup_timeout: config .startup_timeout_sec .or(Some(DEFAULT_STARTUP_TIMEOUT)), tool_timeout: config.tool_timeout_sec.unwrap_or(DEFAULT_TOOL_TIMEOUT), tool_filter: startup_tool_filter, tx_event, elicitation_requests, codex_apps_tools_cache_context, }, ) .or_cancel(&cancel_token) .await { Ok(result) => result, Err(CancelErr::Cancelled) => Err(StartupOutcomeError::Cancelled), } } .await; startup_complete_for_fut.store(true, Ordering::Release); outcome }; let client = fut.boxed().shared(); if startup_snapshot.is_some() { let startup_task = client.clone(); tokio::spawn(async move { let _ = startup_task.await; }); } Self { client, startup_snapshot, startup_complete, tool_plugin_provenance, } } async fn client(&self) -> Result { self.client.clone().await } fn startup_snapshot_while_initializing(&self) -> Option> { if !self.startup_complete.load(Ordering::Acquire) { return self.startup_snapshot.clone(); } None } async fn listed_tools(&self) -> Option> { let annotate_tools = |tools: Vec| { let mut tools = tools; for tool in &mut tools { if tool.server_name == CODEX_APPS_MCP_SERVER_NAME { tool.tool = tool_with_model_visible_input_schema(&tool.tool); } let plugin_names = match tool.connector_id.as_deref() { Some(connector_id) => self .tool_plugin_provenance .plugin_display_names_for_connector_id(connector_id), None => self .tool_plugin_provenance .plugin_display_names_for_mcp_server_name(tool.server_name.as_str()), }; tool.plugin_display_names = plugin_names.to_vec(); if plugin_names.is_empty() { continue; } let plugin_source_note = if plugin_names.len() == 1 { format!("This tool is part of plugin `{}`.", plugin_names[0]) } else { format!( "This tool is part of plugins {}.", plugin_names .iter() .map(|plugin_name| format!("`{plugin_name}`")) .collect::>() .join(", ") ) }; let description = tool .tool .description .as_deref() .map(str::trim) .unwrap_or(""); let annotated_description = if description.is_empty() { plugin_source_note } else if matches!(description.chars().last(), Some('.' | '!' | '?')) { format!("{description} {plugin_source_note}") } else { format!("{description}. {plugin_source_note}") }; tool.tool.description = Some(Cow::Owned(annotated_description)); } tools }; // Keep cache payloads raw; plugin provenance is resolved per-session at read time. let tools = if let Some(startup_tools) = self.startup_snapshot_while_initializing() { Some(startup_tools) } else { match self.client().await { Ok(client) => Some(client.listed_tools()), Err(_) => self.startup_snapshot.clone(), } }; tools.map(annotate_tools) } } /// MCP server capability indicating that Codex should include [`SandboxState`] /// in tool-call request `_meta` under this key. pub const MCP_SANDBOX_STATE_META_CAPABILITY: &str = "codex/sandbox-state-meta"; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct SandboxState { pub sandbox_policy: SandboxPolicy, pub codex_linux_sandbox_exe: Option, pub sandbox_cwd: PathBuf, #[serde(default)] pub use_legacy_landlock: bool, } /// A thin wrapper around a set of running [`RmcpClient`] instances. pub struct McpConnectionManager { clients: HashMap, server_origins: HashMap, elicitation_requests: ElicitationRequestManager, } /// Runtime placement information used when starting MCP server transports. /// /// `McpConfig` describes what servers exist. This value describes where those /// servers should run for the current caller. Keep it explicit at manager /// construction time so status/snapshot paths and real sessions make the same /// local-vs-remote decision. `fallback_cwd` is not a per-server override; it is /// used only when an executor-backed stdio server omits `cwd` and the executor /// API still needs a concrete process working directory. #[derive(Clone)] pub struct McpRuntimeEnvironment { environment: Arc, fallback_cwd: PathBuf, } impl McpRuntimeEnvironment { pub fn new(environment: Arc, fallback_cwd: PathBuf) -> Self { Self { environment, fallback_cwd, } } fn environment(&self) -> Arc { Arc::clone(&self.environment) } fn fallback_cwd(&self) -> PathBuf { self.fallback_cwd.clone() } } impl McpConnectionManager { pub fn configured_servers(&self, config: &McpConfig) -> HashMap { configured_mcp_servers(config) } pub fn effective_servers( &self, config: &McpConfig, auth: Option<&CodexAuth>, ) -> HashMap { effective_mcp_servers(config, auth) } pub fn tool_plugin_provenance(&self, config: &McpConfig) -> ToolPluginProvenance { tool_plugin_provenance(config) } pub fn new_uninitialized( approval_policy: &Constrained, sandbox_policy: &Constrained, ) -> Self { Self { clients: HashMap::new(), server_origins: HashMap::new(), elicitation_requests: ElicitationRequestManager::new( approval_policy.value(), sandbox_policy.get().clone(), ), } } pub fn has_servers(&self) -> bool { !self.clients.is_empty() } pub fn server_origin(&self, server_name: &str) -> Option<&str> { self.server_origins.get(server_name).map(String::as_str) } pub fn set_approval_policy(&self, approval_policy: &Constrained) { if let Ok(mut policy) = self.elicitation_requests.approval_policy.lock() { *policy = approval_policy.value(); } } pub fn set_sandbox_policy(&self, sandbox_policy: &SandboxPolicy) { if let Ok(mut policy) = self.elicitation_requests.sandbox_policy.lock() { *policy = sandbox_policy.clone(); } } #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)] pub async fn new( mcp_servers: &HashMap, store_mode: OAuthCredentialsStoreMode, auth_entries: HashMap, approval_policy: &Constrained, submit_id: String, tx_event: Sender, initial_sandbox_policy: SandboxPolicy, runtime_environment: McpRuntimeEnvironment, codex_home: PathBuf, codex_apps_tools_cache_key: CodexAppsToolsCacheKey, tool_plugin_provenance: ToolPluginProvenance, ) -> (Self, CancellationToken) { let cancel_token = CancellationToken::new(); let mut clients = HashMap::new(); let mut server_origins = HashMap::new(); let mut join_set = JoinSet::new(); let elicitation_requests = ElicitationRequestManager::new(approval_policy.value(), initial_sandbox_policy); let tool_plugin_provenance = Arc::new(tool_plugin_provenance); let startup_submit_id = submit_id.clone(); let mcp_servers = mcp_servers.clone(); for (server_name, cfg) in mcp_servers.into_iter().filter(|(_, cfg)| cfg.enabled) { if let Some(origin) = transport_origin(&cfg.transport) { server_origins.insert(server_name.clone(), origin); } let cancel_token = cancel_token.child_token(); let _ = emit_update( startup_submit_id.as_str(), &tx_event, McpStartupUpdateEvent { server: server_name.clone(), status: McpStartupStatus::Starting, }, ) .await; let codex_apps_tools_cache_context = if server_name == CODEX_APPS_MCP_SERVER_NAME { Some(CodexAppsToolsCacheContext { codex_home: codex_home.clone(), user_key: codex_apps_tools_cache_key.clone(), }) } else { None }; let async_managed_client = AsyncManagedClient::new( server_name.clone(), cfg, store_mode, cancel_token.clone(), tx_event.clone(), elicitation_requests.clone(), codex_apps_tools_cache_context, Arc::clone(&tool_plugin_provenance), runtime_environment.clone(), ); clients.insert(server_name.clone(), async_managed_client.clone()); let tx_event = tx_event.clone(); let submit_id = startup_submit_id.clone(); let auth_entry = auth_entries.get(&server_name).cloned(); join_set.spawn(async move { let mut outcome = async_managed_client.client().await; if cancel_token.is_cancelled() { outcome = Err(StartupOutcomeError::Cancelled); } let status = match &outcome { Ok(_) => McpStartupStatus::Ready, Err(StartupOutcomeError::Cancelled) => McpStartupStatus::Cancelled, Err(error) => { let error_str = mcp_init_error_display( server_name.as_str(), auth_entry.as_ref(), error, ); McpStartupStatus::Failed { error: error_str } } }; let _ = emit_update( submit_id.as_str(), &tx_event, McpStartupUpdateEvent { server: server_name.clone(), status, }, ) .await; (server_name, outcome) }); } let manager = Self { clients, server_origins, elicitation_requests: elicitation_requests.clone(), }; tokio::spawn(async move { let outcomes = join_set.join_all().await; let mut summary = McpStartupCompleteEvent::default(); for (server_name, outcome) in outcomes { match outcome { Ok(_) => summary.ready.push(server_name), Err(StartupOutcomeError::Cancelled) => summary.cancelled.push(server_name), Err(StartupOutcomeError::Failed { error }) => { summary.failed.push(McpStartupFailure { server: server_name, error, }) } } } let _ = tx_event .send(Event { id: startup_submit_id, msg: EventMsg::McpStartupComplete(summary), }) .await; }); (manager, cancel_token) } async fn client_by_name(&self, name: &str) -> Result { self.clients .get(name) .ok_or_else(|| anyhow!("unknown MCP server '{name}'"))? .client() .await .context("failed to get client") } pub async fn resolve_elicitation( &self, server_name: String, id: RequestId, response: ElicitationResponse, ) -> Result<()> { self.elicitation_requests .resolve(server_name, id, response) .await } pub async fn wait_for_server_ready(&self, server_name: &str, timeout: Duration) -> bool { let Some(async_managed_client) = self.clients.get(server_name) else { return false; }; match tokio::time::timeout(timeout, async_managed_client.client()).await { Ok(Ok(_)) => true, Ok(Err(_)) | Err(_) => false, } } pub async fn required_startup_failures( &self, required_servers: &[String], ) -> Vec { let mut failures = Vec::new(); for server_name in required_servers { let Some(async_managed_client) = self.clients.get(server_name).cloned() else { failures.push(McpStartupFailure { server: server_name.clone(), error: format!("required MCP server `{server_name}` was not initialized"), }); continue; }; match async_managed_client.client().await { Ok(_) => {} Err(error) => failures.push(McpStartupFailure { server: server_name.clone(), error: startup_outcome_error_message(error), }), } } failures } /// Returns a single map that contains all tools. Each key is the /// fully-qualified name for the tool. #[instrument(level = "trace", skip_all)] pub async fn list_all_tools(&self) -> HashMap { let mut tools = Vec::new(); for managed_client in self.clients.values() { let Some(server_tools) = managed_client.listed_tools().await else { continue; }; tools.extend(server_tools); } qualify_tools(tools) } /// Force-refresh codex apps tools by bypassing the in-process cache. /// /// On success, the refreshed tools replace the cache contents and the /// latest filtered tool map is returned directly to the caller. On /// failure, the existing cache remains unchanged. pub async fn hard_refresh_codex_apps_tools_cache(&self) -> Result> { let managed_client = self .clients .get(CODEX_APPS_MCP_SERVER_NAME) .ok_or_else(|| anyhow!("unknown MCP server '{CODEX_APPS_MCP_SERVER_NAME}'"))? .client() .await .context("failed to get client")?; let list_start = Instant::now(); let fetch_start = Instant::now(); let tools = list_tools_for_client_uncached( CODEX_APPS_MCP_SERVER_NAME, &managed_client.client, managed_client.tool_timeout, managed_client.server_instructions.as_deref(), ) .await .with_context(|| { format!("failed to refresh tools for MCP server '{CODEX_APPS_MCP_SERVER_NAME}'") })?; emit_duration( MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC, fetch_start.elapsed(), &[], ); write_cached_codex_apps_tools_if_needed( CODEX_APPS_MCP_SERVER_NAME, managed_client.codex_apps_tools_cache_context.as_ref(), &tools, ); emit_duration( MCP_TOOLS_LIST_DURATION_METRIC, list_start.elapsed(), &[("cache", "miss")], ); let tools = filter_tools(tools, &managed_client.tool_filter) .into_iter() .map(|mut tool| { tool.tool = tool_with_model_visible_input_schema(&tool.tool); tool }); Ok(qualify_tools(tools)) } /// Returns a single map that contains all resources. Each key is the /// server name and the value is a vector of resources. pub async fn list_all_resources(&self) -> HashMap> { let mut join_set = JoinSet::new(); let clients_snapshot = &self.clients; for (server_name, async_managed_client) in clients_snapshot { let server_name = server_name.clone(); let Ok(managed_client) = async_managed_client.client().await else { continue; }; let timeout = managed_client.tool_timeout; let client = managed_client.client.clone(); join_set.spawn(async move { let mut collected: Vec = Vec::new(); let mut cursor: Option = None; loop { let params = cursor.as_ref().map(|next| PaginatedRequestParams { meta: None, cursor: Some(next.clone()), }); let response = match client.list_resources(params, timeout).await { Ok(result) => result, Err(err) => return (server_name, Err(err)), }; collected.extend(response.resources); match response.next_cursor { Some(next) => { if cursor.as_ref() == Some(&next) { return ( server_name, Err(anyhow!("resources/list returned duplicate cursor")), ); } cursor = Some(next); } None => return (server_name, Ok(collected)), } } }); } let mut aggregated: HashMap> = HashMap::new(); while let Some(join_res) = join_set.join_next().await { match join_res { Ok((server_name, Ok(resources))) => { aggregated.insert(server_name, resources); } Ok((server_name, Err(err))) => { warn!("Failed to list resources for MCP server '{server_name}': {err:#}"); } Err(err) => { warn!("Task panic when listing resources for MCP server: {err:#}"); } } } aggregated } /// Returns a single map that contains all resource templates. Each key is the /// server name and the value is a vector of resource templates. pub async fn list_all_resource_templates(&self) -> HashMap> { let mut join_set = JoinSet::new(); let clients_snapshot = &self.clients; for (server_name, async_managed_client) in clients_snapshot { let server_name_cloned = server_name.clone(); let Ok(managed_client) = async_managed_client.client().await else { continue; }; let client = managed_client.client.clone(); let timeout = managed_client.tool_timeout; join_set.spawn(async move { let mut collected: Vec = Vec::new(); let mut cursor: Option = None; loop { let params = cursor.as_ref().map(|next| PaginatedRequestParams { meta: None, cursor: Some(next.clone()), }); let response = match client.list_resource_templates(params, timeout).await { Ok(result) => result, Err(err) => return (server_name_cloned, Err(err)), }; collected.extend(response.resource_templates); match response.next_cursor { Some(next) => { if cursor.as_ref() == Some(&next) { return ( server_name_cloned, Err(anyhow!( "resources/templates/list returned duplicate cursor" )), ); } cursor = Some(next); } None => return (server_name_cloned, Ok(collected)), } } }); } let mut aggregated: HashMap> = HashMap::new(); while let Some(join_res) = join_set.join_next().await { match join_res { Ok((server_name, Ok(templates))) => { aggregated.insert(server_name, templates); } Ok((server_name, Err(err))) => { warn!( "Failed to list resource templates for MCP server '{server_name}': {err:#}" ); } Err(err) => { warn!("Task panic when listing resource templates for MCP server: {err:#}"); } } } aggregated } /// Invoke the tool indicated by the (server, tool) pair. pub async fn call_tool( &self, server: &str, tool: &str, arguments: Option, meta: Option, ) -> Result { let client = self.client_by_name(server).await?; if !client.tool_filter.allows(tool) { return Err(anyhow!( "tool '{tool}' is disabled for MCP server '{server}'" )); } let result: rmcp::model::CallToolResult = client .client .call_tool(tool.to_string(), arguments, meta, client.tool_timeout) .await .with_context(|| format!("tool call failed for `{server}/{tool}`"))?; let content = result .content .into_iter() .map(|content| { serde_json::to_value(content) .unwrap_or_else(|_| serde_json::Value::String("".to_string())) }) .collect(); Ok(CallToolResult { content, structured_content: result.structured_content, is_error: result.is_error, meta: result.meta.and_then(|meta| serde_json::to_value(meta).ok()), }) } pub async fn server_supports_sandbox_state_meta_capability( &self, server: &str, ) -> Result { Ok(self .client_by_name(server) .await? .server_supports_sandbox_state_meta_capability) } /// List resources from the specified server. pub async fn list_resources( &self, server: &str, params: Option, ) -> Result { let managed = self.client_by_name(server).await?; let timeout = managed.tool_timeout; managed .client .list_resources(params, timeout) .await .with_context(|| format!("resources/list failed for `{server}`")) } /// List resource templates from the specified server. pub async fn list_resource_templates( &self, server: &str, params: Option, ) -> Result { let managed = self.client_by_name(server).await?; let client = managed.client.clone(); let timeout = managed.tool_timeout; client .list_resource_templates(params, timeout) .await .with_context(|| format!("resources/templates/list failed for `{server}`")) } /// Read a resource from the specified server. pub async fn read_resource( &self, server: &str, params: ReadResourceRequestParams, ) -> Result { let managed = self.client_by_name(server).await?; let client = managed.client.clone(); let timeout = managed.tool_timeout; let uri = params.uri.clone(); client .read_resource(params, timeout) .await .with_context(|| format!("resources/read failed for `{server}` ({uri})")) } pub async fn resolve_tool_info(&self, tool_name: &ToolName) -> Option { let all_tools = self.list_all_tools().await; all_tools .into_values() .find(|tool| tool.canonical_tool_name() == *tool_name) } } async fn emit_update( submit_id: &str, tx_event: &Sender, update: McpStartupUpdateEvent, ) -> Result<(), async_channel::SendError> { tx_event .send(Event { id: submit_id.to_string(), msg: EventMsg::McpStartupUpdate(update), }) .await } /// A tool is allowed to be used if both are true: /// 1. enabled is None (no allowlist is set) or the tool is explicitly enabled. /// 2. The tool is not explicitly disabled. #[derive(Default, Clone)] pub(crate) struct ToolFilter { enabled: Option>, disabled: HashSet, } impl ToolFilter { fn from_config(cfg: &McpServerConfig) -> Self { let enabled = cfg .enabled_tools .as_ref() .map(|tools| tools.iter().cloned().collect::>()); let disabled = cfg .disabled_tools .as_ref() .map(|tools| tools.iter().cloned().collect::>()) .unwrap_or_default(); Self { enabled, disabled } } fn allows(&self, tool_name: &str) -> bool { if let Some(enabled) = &self.enabled && !enabled.contains(tool_name) { return false; } !self.disabled.contains(tool_name) } } fn filter_tools(tools: Vec, filter: &ToolFilter) -> Vec { tools .into_iter() .filter(|tool| filter.allows(&tool.tool.name)) .collect() } pub fn filter_non_codex_apps_mcp_tools_only( mcp_tools: &HashMap, ) -> HashMap { mcp_tools .iter() .filter(|(_, tool)| tool.server_name != CODEX_APPS_MCP_SERVER_NAME) .map(|(name, tool)| (name.clone(), tool.clone())) .collect() } fn normalize_codex_apps_tool_title( server_name: &str, connector_name: Option<&str>, value: &str, ) -> String { if server_name != CODEX_APPS_MCP_SERVER_NAME { return value.to_string(); } let Some(connector_name) = connector_name .map(str::trim) .filter(|name| !name.is_empty()) else { return value.to_string(); }; let prefix = format!("{connector_name}_"); if let Some(stripped) = value.strip_prefix(&prefix) && !stripped.is_empty() { return stripped.to_string(); } value.to_string() } fn normalize_codex_apps_callable_name( server_name: &str, tool_name: &str, connector_id: Option<&str>, connector_name: Option<&str>, ) -> String { if server_name != CODEX_APPS_MCP_SERVER_NAME { return tool_name.to_string(); } let tool_name = sanitize_name(tool_name); if let Some(connector_name) = connector_name .map(str::trim) .map(sanitize_name) .filter(|name| !name.is_empty()) && let Some(stripped) = tool_name.strip_prefix(&connector_name) && !stripped.is_empty() { return stripped.to_string(); } if let Some(connector_id) = connector_id .map(str::trim) .map(sanitize_name) .filter(|name| !name.is_empty()) && let Some(stripped) = tool_name.strip_prefix(&connector_id) && !stripped.is_empty() { return stripped.to_string(); } tool_name } fn normalize_codex_apps_callable_namespace( server_name: &str, connector_name: Option<&str>, ) -> String { if server_name == CODEX_APPS_MCP_SERVER_NAME && let Some(connector_name) = connector_name { format!( "mcp{}{}{}{}", MCP_TOOL_NAME_DELIMITER, server_name, MCP_TOOL_NAME_DELIMITER, sanitize_name(connector_name) ) } else { format!("mcp{MCP_TOOL_NAME_DELIMITER}{server_name}{MCP_TOOL_NAME_DELIMITER}") } } fn resolve_bearer_token( server_name: &str, bearer_token_env_var: Option<&str>, ) -> Result> { let Some(env_var) = bearer_token_env_var else { return Ok(None); }; match env::var(env_var) { Ok(value) => { if value.is_empty() { Err(anyhow!( "Environment variable {env_var} for MCP server '{server_name}' is empty" )) } else { Ok(Some(value)) } } Err(env::VarError::NotPresent) => Err(anyhow!( "Environment variable {env_var} for MCP server '{server_name}' is not set" )), Err(env::VarError::NotUnicode(_)) => Err(anyhow!( "Environment variable {env_var} for MCP server '{server_name}' contains invalid Unicode" )), } } #[derive(Debug, Clone, thiserror::Error)] enum StartupOutcomeError { #[error("MCP startup cancelled")] Cancelled, // We can't store the original error here because anyhow::Error doesn't implement // `Clone`. #[error("MCP startup failed: {error}")] Failed { error: String }, } impl From for StartupOutcomeError { fn from(error: anyhow::Error) -> Self { Self::Failed { error: error.to_string(), } } } fn elicitation_capability_for_server(_server_name: &str) -> Option { // https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation#capabilities // indicates this should be an empty object. Some(ElicitationCapability { form: Some(FormElicitationCapability { schema_validation: None, }), url: None, }) } async fn start_server_task( server_name: String, client: Arc, params: StartServerTaskParams, ) -> Result { let StartServerTaskParams { startup_timeout, tool_timeout, tool_filter, tx_event, elicitation_requests, codex_apps_tools_cache_context, } = params; let elicitation = elicitation_capability_for_server(&server_name); let params = InitializeRequestParams { meta: None, capabilities: ClientCapabilities { experimental: None, extensions: None, roots: None, sampling: None, elicitation, tasks: None, }, client_info: Implementation { name: "codex-mcp-client".to_owned(), version: env!("CARGO_PKG_VERSION").to_owned(), title: Some("Codex".into()), description: None, icons: None, website_url: None, }, protocol_version: ProtocolVersion::V_2025_06_18, }; let send_elicitation = elicitation_requests.make_sender(server_name.clone(), tx_event); let initialize_result = client .initialize(params, startup_timeout, send_elicitation) .await .map_err(StartupOutcomeError::from)?; let server_supports_sandbox_state_meta_capability = initialize_result .capabilities .experimental .as_ref() .and_then(|exp| exp.get(MCP_SANDBOX_STATE_META_CAPABILITY)) .is_some(); let list_start = Instant::now(); let fetch_start = Instant::now(); let tools = list_tools_for_client_uncached( &server_name, &client, startup_timeout, initialize_result.instructions.as_deref(), ) .await .map_err(StartupOutcomeError::from)?; emit_duration( MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC, fetch_start.elapsed(), &[], ); write_cached_codex_apps_tools_if_needed( &server_name, codex_apps_tools_cache_context.as_ref(), &tools, ); if server_name == CODEX_APPS_MCP_SERVER_NAME { emit_duration( MCP_TOOLS_LIST_DURATION_METRIC, list_start.elapsed(), &[("cache", "miss")], ); } let tools = filter_tools(tools, &tool_filter); let managed = ManagedClient { client: Arc::clone(&client), tools, tool_timeout: Some(tool_timeout), tool_filter, server_instructions: initialize_result.instructions, server_supports_sandbox_state_meta_capability, codex_apps_tools_cache_context, }; Ok(managed) } struct StartServerTaskParams { startup_timeout: Option, // TODO: cancel_token should handle this. tool_timeout: Duration, tool_filter: ToolFilter, tx_event: Sender, elicitation_requests: ElicitationRequestManager, codex_apps_tools_cache_context: Option, } async fn make_rmcp_client( server_name: &str, config: McpServerConfig, store_mode: OAuthCredentialsStoreMode, runtime_environment: McpRuntimeEnvironment, ) -> Result { let McpServerConfig { transport, experimental_environment, .. } = config; let remote_environment = match experimental_environment.as_deref() { None | Some("local") => false, Some("remote") => true, Some(environment) => { return Err(StartupOutcomeError::from(anyhow!( "unsupported experimental_environment `{environment}` for MCP server `{server_name}`" ))); } }; match transport { McpServerTransportConfig::Stdio { command, args, env, env_vars, cwd, } => { let command_os: OsString = command.into(); let args_os: Vec = args.into_iter().map(Into::into).collect(); let env_os = env.map(|env| { env.into_iter() .map(|(key, value)| (key.into(), value.into())) .collect::>() }); let launcher = if remote_environment { let exec_environment = runtime_environment.environment(); if !exec_environment.is_remote() { return Err(StartupOutcomeError::from(anyhow!( "remote MCP server `{server_name}` requires a remote executor environment" ))); } Arc::new(ExecutorStdioServerLauncher::new( exec_environment.get_exec_backend(), runtime_environment.fallback_cwd(), )) } else { Arc::new(LocalStdioServerLauncher) as Arc }; // `RmcpClient` always sees a launched MCP stdio server. The // launcher hides whether that means a local child process or an // executor process whose stdin/stdout bytes cross the process API. RmcpClient::new_stdio_client(command_os, args_os, env_os, &env_vars, cwd, launcher) .await .map_err(|err| StartupOutcomeError::from(anyhow!(err))) } McpServerTransportConfig::StreamableHttp { url, http_headers, env_http_headers, bearer_token_env_var, } => { if remote_environment { if !runtime_environment.environment().is_remote() { return Err(StartupOutcomeError::from(anyhow!( "remote MCP server `{server_name}` requires a remote executor environment" ))); } return Err(StartupOutcomeError::from(anyhow!( // Remote HTTP needs the future low-level executor // `network/request` API so reqwest runs on the executor side. // Do not fall back to local HTTP here; the config explicitly // asked for remote placement. "remote streamable HTTP MCP server `{server_name}` is not implemented yet" ))); } // Local streamable HTTP remains the existing reqwest path from // the orchestrator process. let resolved_bearer_token = match resolve_bearer_token(server_name, bearer_token_env_var.as_deref()) { Ok(token) => token, Err(error) => return Err(error.into()), }; RmcpClient::new_streamable_http_client( server_name, &url, resolved_bearer_token, http_headers, env_http_headers, store_mode, ) .await .map_err(StartupOutcomeError::from) } } } fn write_cached_codex_apps_tools_if_needed( server_name: &str, cache_context: Option<&CodexAppsToolsCacheContext>, tools: &[ToolInfo], ) { if server_name != CODEX_APPS_MCP_SERVER_NAME { return; } if let Some(cache_context) = cache_context { let cache_write_start = Instant::now(); write_cached_codex_apps_tools(cache_context, tools); emit_duration( MCP_TOOLS_CACHE_WRITE_DURATION_METRIC, cache_write_start.elapsed(), &[], ); } } fn load_startup_cached_codex_apps_tools_snapshot( server_name: &str, cache_context: Option<&CodexAppsToolsCacheContext>, ) -> Option> { if server_name != CODEX_APPS_MCP_SERVER_NAME { return None; } let cache_context = cache_context?; match load_cached_codex_apps_tools(cache_context) { CachedCodexAppsToolsLoad::Hit(tools) => Some(tools), CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => None, } } #[cfg(test)] fn read_cached_codex_apps_tools( cache_context: &CodexAppsToolsCacheContext, ) -> Option> { match load_cached_codex_apps_tools(cache_context) { CachedCodexAppsToolsLoad::Hit(tools) => Some(tools), CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => None, } } fn load_cached_codex_apps_tools( cache_context: &CodexAppsToolsCacheContext, ) -> CachedCodexAppsToolsLoad { let cache_path = cache_context.cache_path(); let bytes = match std::fs::read(cache_path) { Ok(bytes) => bytes, Err(err) if err.kind() == std::io::ErrorKind::NotFound => { return CachedCodexAppsToolsLoad::Missing; } Err(_) => return CachedCodexAppsToolsLoad::Invalid, }; let cache: CodexAppsToolsDiskCache = match serde_json::from_slice(&bytes) { Ok(cache) => cache, Err(_) => return CachedCodexAppsToolsLoad::Invalid, }; if cache.schema_version != CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION { return CachedCodexAppsToolsLoad::Invalid; } CachedCodexAppsToolsLoad::Hit(filter_disallowed_codex_apps_tools(cache.tools)) } fn write_cached_codex_apps_tools(cache_context: &CodexAppsToolsCacheContext, tools: &[ToolInfo]) { let cache_path = cache_context.cache_path(); if let Some(parent) = cache_path.parent() && std::fs::create_dir_all(parent).is_err() { return; } let tools = filter_disallowed_codex_apps_tools(tools.to_vec()); let Ok(bytes) = serde_json::to_vec_pretty(&CodexAppsToolsDiskCache { schema_version: CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION, tools, }) else { return; }; let _ = std::fs::write(cache_path, bytes); } fn filter_disallowed_codex_apps_tools(tools: Vec) -> Vec { tools .into_iter() .filter(|tool| { tool.connector_id .as_deref() .is_none_or(is_connector_id_allowed) }) .collect() } fn emit_duration(metric: &str, duration: Duration, tags: &[(&str, &str)]) { if let Some(metrics) = codex_otel::global() { let _ = metrics.record_duration(metric, duration, tags); } } fn transport_origin(transport: &McpServerTransportConfig) -> Option { match transport { McpServerTransportConfig::StreamableHttp { url, .. } => { let parsed = Url::parse(url).ok()?; Some(parsed.origin().ascii_serialization()) } McpServerTransportConfig::Stdio { .. } => Some("stdio".to_string()), } } async fn list_tools_for_client_uncached( server_name: &str, client: &Arc, timeout: Option, server_instructions: Option<&str>, ) -> Result> { let resp = client .list_tools_with_connector_ids(/*params*/ None, timeout) .await?; let tools = resp .tools .into_iter() .map(|tool| { let callable_name = normalize_codex_apps_callable_name( server_name, &tool.tool.name, tool.connector_id.as_deref(), tool.connector_name.as_deref(), ); let callable_namespace = normalize_codex_apps_callable_namespace( server_name, tool.connector_name.as_deref(), ); let connector_name = tool.connector_name; let connector_description = tool.connector_description; let mut tool_def = tool.tool; if let Some(title) = tool_def.title.as_deref() { let normalized_title = normalize_codex_apps_tool_title(server_name, connector_name.as_deref(), title); if tool_def.title.as_deref() != Some(normalized_title.as_str()) { tool_def.title = Some(normalized_title); } } ToolInfo { server_name: server_name.to_owned(), callable_name, callable_namespace, server_instructions: server_instructions.map(str::to_string), tool: tool_def, connector_id: tool.connector_id, connector_name, plugin_display_names: Vec::new(), connector_description, } }) .collect(); if server_name == CODEX_APPS_MCP_SERVER_NAME { return Ok(filter_disallowed_codex_apps_tools(tools)); } Ok(tools) } fn validate_mcp_server_name(server_name: &str) -> Result<()> { let re = regex_lite::Regex::new(r"^[a-zA-Z0-9_-]+$")?; if !re.is_match(server_name) { return Err(anyhow!( "Invalid MCP server name '{server_name}': must match pattern {pattern}", pattern = re.as_str() )); } Ok(()) } fn mcp_init_error_display( server_name: &str, entry: Option<&McpAuthStatusEntry>, err: &StartupOutcomeError, ) -> String { if let Some(McpServerTransportConfig::StreamableHttp { url, bearer_token_env_var, http_headers, .. }) = &entry.map(|entry| &entry.config.transport) && url == "https://api.githubcopilot.com/mcp/" && bearer_token_env_var.is_none() && http_headers.as_ref().map(HashMap::is_empty).unwrap_or(true) { format!( "GitHub MCP does not support OAuth. Log in by adding a personal access token (https://github.com/settings/personal-access-tokens) to your environment and config.toml:\n[mcp_servers.{server_name}]\nbearer_token_env_var = CODEX_GITHUB_PERSONAL_ACCESS_TOKEN" ) } else if is_mcp_client_auth_required_error(err) { format!( "The {server_name} MCP server is not logged in. Run `codex mcp login {server_name}`." ) } else if is_mcp_client_startup_timeout_error(err) { let startup_timeout_secs = match entry { Some(entry) => match entry.config.startup_timeout_sec { Some(timeout) => timeout, None => DEFAULT_STARTUP_TIMEOUT, }, None => DEFAULT_STARTUP_TIMEOUT, } .as_secs(); format!( "MCP client for `{server_name}` timed out after {startup_timeout_secs} seconds. Add or adjust `startup_timeout_sec` in your config.toml:\n[mcp_servers.{server_name}]\nstartup_timeout_sec = XX" ) } else { format!("MCP client for `{server_name}` failed to start: {err:#}") } } fn is_mcp_client_auth_required_error(error: &StartupOutcomeError) -> bool { match error { StartupOutcomeError::Failed { error } => error.contains("Auth required"), _ => false, } } fn is_mcp_client_startup_timeout_error(error: &StartupOutcomeError) -> bool { match error { StartupOutcomeError::Failed { error } => { error.contains("request timed out") || error.contains("timed out handshaking with MCP server") } _ => false, } } fn startup_outcome_error_message(error: StartupOutcomeError) -> String { match error { StartupOutcomeError::Cancelled => "MCP startup cancelled".to_string(), StartupOutcomeError::Failed { error } => error, } } #[cfg(test)] mod mcp_init_error_display_tests {} #[cfg(test)] #[path = "mcp_connection_manager_tests.rs"] mod tests;