Compare commits

...

1 Commit

Author SHA1 Message Date
Ahmed Ibrahim
c21cc243d2 Split MCP connection manager module
Move McpConnectionManager and its methods into manager.rs while keeping the remaining MCP connection support code in mcp_connection.rs. Update internal imports and tests to match the new module boundary without changing behavior.

Co-authored-by: Codex <noreply@openai.com>
2026-04-27 00:49:15 +03:00
6 changed files with 708 additions and 632 deletions

View File

@@ -1,15 +1,15 @@
pub use mcp_connection_manager::MCP_SANDBOX_STATE_META_CAPABILITY;
pub use mcp_connection_manager::McpConnectionManager;
pub use mcp_connection_manager::McpRuntimeEnvironment;
pub use mcp_connection_manager::SandboxState;
pub use mcp_connection_manager::ToolInfo;
pub use manager::McpConnectionManager;
pub use mcp_connection::MCP_SANDBOX_STATE_META_CAPABILITY;
pub use mcp_connection::McpRuntimeEnvironment;
pub use mcp_connection::SandboxState;
pub use mcp_connection::ToolInfo;
pub use mcp::CODEX_APPS_MCP_SERVER_NAME;
pub use mcp::McpConfig;
pub use mcp::ToolPluginProvenance;
pub use mcp_connection_manager::CodexAppsToolsCacheKey;
pub use mcp_connection_manager::codex_apps_tools_cache_key;
pub use mcp_connection::CodexAppsToolsCacheKey;
pub use mcp_connection::codex_apps_tools_cache_key;
pub use mcp::configured_mcp_servers;
pub use mcp::effective_mcp_servers;
@@ -35,9 +35,10 @@ pub use mcp::should_retry_without_scopes;
pub use mcp::mcp_permission_prompt_is_auto_approved;
pub use mcp::qualified_mcp_tool_name_prefix;
pub use mcp_connection_manager::declared_openai_file_input_param_names;
pub use mcp_connection_manager::filter_non_codex_apps_mcp_tools_only;
pub use mcp_connection::declared_openai_file_input_param_names;
pub use mcp_connection::filter_non_codex_apps_mcp_tools_only;
pub(crate) mod manager;
pub(crate) mod mcp;
pub(crate) mod mcp_connection_manager;
pub(crate) mod mcp_connection;
pub(crate) mod mcp_tool_names;

View File

@@ -0,0 +1,608 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use crate::McpAuthStatusEntry;
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
use crate::mcp::ToolPluginProvenance;
use crate::mcp_connection::AsyncManagedClient;
use crate::mcp_connection::CodexAppsToolsCacheContext;
use crate::mcp_connection::CodexAppsToolsCacheKey;
use crate::mcp_connection::ElicitationRequestManager;
use crate::mcp_connection::MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC;
use crate::mcp_connection::MCP_TOOLS_LIST_DURATION_METRIC;
use crate::mcp_connection::ManagedClient;
use crate::mcp_connection::McpRuntimeEnvironment;
use crate::mcp_connection::StartupOutcomeError;
use crate::mcp_connection::ToolInfo;
use crate::mcp_connection::emit_duration;
use crate::mcp_connection::emit_update;
use crate::mcp_connection::filter_tools;
use crate::mcp_connection::list_tools_for_client_uncached;
use crate::mcp_connection::mcp_init_error_display;
use crate::mcp_connection::qualify_tools;
use crate::mcp_connection::startup_outcome_error_message;
use crate::mcp_connection::tool_with_model_visible_input_schema;
use crate::mcp_connection::transport_origin;
use crate::mcp_connection::write_cached_codex_apps_tools_if_needed;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use async_channel::Sender;
use codex_config::Constrained;
use codex_config::McpServerConfig;
use codex_config::McpServerTransportConfig;
use codex_config::types::OAuthCredentialsStoreMode;
use codex_login::CodexAuth;
use codex_protocol::ToolName;
use codex_protocol::mcp::CallToolResult;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::McpStartupCompleteEvent;
use codex_protocol::protocol::McpStartupFailure;
use codex_protocol::protocol::McpStartupStatus;
use codex_protocol::protocol::McpStartupUpdateEvent;
use codex_protocol::protocol::SandboxPolicy;
use codex_rmcp_client::ElicitationResponse;
use rmcp::model::ListResourceTemplatesResult;
use rmcp::model::ListResourcesResult;
use rmcp::model::PaginatedRequestParams;
use rmcp::model::ReadResourceRequestParams;
use rmcp::model::ReadResourceResult;
use rmcp::model::RequestId;
use rmcp::model::Resource;
use rmcp::model::ResourceTemplate;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use tracing::warn;
/// A thin wrapper around a set of running [`RmcpClient`] instances.
pub struct McpConnectionManager {
    // One lazily-started client per configured server, keyed by server name.
    clients: HashMap<String, AsyncManagedClient>,
    // Server name -> transport origin string (populated at startup from
    // `transport_origin`; servers without a derivable origin are absent).
    server_origins: HashMap<String, String>,
    // Shared state for routing elicitation requests/responses; also holds the
    // current approval and sandbox policies behind mutexes.
    elicitation_requests: ElicitationRequestManager,
}
impl McpConnectionManager {
    /// Creates a manager with no servers and no in-flight startups.
    ///
    /// Only the elicitation policy state is initialized; `new` is the
    /// constructor that actually spawns server connections.
    pub fn new_uninitialized(
        approval_policy: &Constrained<AskForApproval>,
        sandbox_policy: &Constrained<SandboxPolicy>,
    ) -> Self {
        Self {
            clients: HashMap::new(),
            server_origins: HashMap::new(),
            elicitation_requests: ElicitationRequestManager::new(
                approval_policy.value(),
                sandbox_policy.get().clone(),
            ),
        }
    }

    /// Returns `true` when at least one server client was registered.
    pub fn has_servers(&self) -> bool {
        !self.clients.is_empty()
    }

    /// Looks up the transport origin recorded for `server_name` at startup,
    /// if one could be derived from its transport config.
    pub fn server_origin(&self, server_name: &str) -> Option<&str> {
        self.server_origins.get(server_name).map(String::as_str)
    }

    /// Updates the approval policy shared with elicitation handling.
    ///
    /// Silently does nothing if the policy mutex is poisoned (`if let Ok`).
    pub fn set_approval_policy(&self, approval_policy: &Constrained<AskForApproval>) {
        if let Ok(mut policy) = self.elicitation_requests.approval_policy.lock() {
            *policy = approval_policy.value();
        }
    }

    /// Updates the sandbox policy shared with elicitation handling.
    ///
    /// Silently does nothing if the policy mutex is poisoned.
    pub fn set_sandbox_policy(&self, sandbox_policy: &SandboxPolicy) {
        if let Ok(mut policy) = self.elicitation_requests.sandbox_policy.lock() {
            *policy = sandbox_policy.clone();
        }
    }

    /// Spawns clients for every *enabled* server in `mcp_servers` and returns
    /// the manager together with a [`CancellationToken`] that aborts the
    /// remaining startups when cancelled.
    ///
    /// Startup is asynchronous: this function returns immediately after
    /// registering the clients; per-server `McpStartupUpdateEvent`s and a
    /// final `McpStartupComplete` summary are emitted on `tx_event` by
    /// background tasks.
    #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
    pub async fn new(
        mcp_servers: &HashMap<String, McpServerConfig>,
        store_mode: OAuthCredentialsStoreMode,
        auth_entries: HashMap<String, McpAuthStatusEntry>,
        approval_policy: &Constrained<AskForApproval>,
        submit_id: String,
        tx_event: Sender<Event>,
        initial_sandbox_policy: SandboxPolicy,
        runtime_environment: McpRuntimeEnvironment,
        codex_home: PathBuf,
        codex_apps_tools_cache_key: CodexAppsToolsCacheKey,
        tool_plugin_provenance: ToolPluginProvenance,
        auth: Option<&CodexAuth>,
    ) -> (Self, CancellationToken) {
        let cancel_token = CancellationToken::new();
        let mut clients = HashMap::new();
        let mut server_origins = HashMap::new();
        let mut join_set = JoinSet::new();
        let elicitation_requests =
            ElicitationRequestManager::new(approval_policy.value(), initial_sandbox_policy);
        let tool_plugin_provenance = Arc::new(tool_plugin_provenance);
        let startup_submit_id = submit_id.clone();
        // Codex auth is only forwarded when it targets the codex backend.
        let codex_apps_auth_provider = auth
            .filter(|auth| auth.uses_codex_backend())
            .map(codex_model_provider::auth_provider_from_auth);
        let mcp_servers = mcp_servers.clone();
        // Disabled servers are skipped entirely: no client, no events.
        for (server_name, cfg) in mcp_servers.into_iter().filter(|(_, cfg)| cfg.enabled) {
            if let Some(origin) = transport_origin(&cfg.transport) {
                server_origins.insert(server_name.clone(), origin);
            }
            // Each server gets a child token so cancelling the returned
            // parent token cancels every in-flight startup.
            let cancel_token = cancel_token.child_token();
            // Announce the startup; send failures are deliberately ignored.
            let _ = emit_update(
                startup_submit_id.as_str(),
                &tx_event,
                McpStartupUpdateEvent {
                    server: server_name.clone(),
                    status: McpStartupStatus::Starting,
                },
            )
            .await;
            // Only the codex apps server participates in the on-disk tools cache.
            let codex_apps_tools_cache_context = if server_name == CODEX_APPS_MCP_SERVER_NAME {
                Some(CodexAppsToolsCacheContext {
                    codex_home: codex_home.clone(),
                    user_key: codex_apps_tools_cache_key.clone(),
                })
            } else {
                None
            };
            // True only for streamable-HTTP transports configured with a
            // bearer-token environment variable.
            let uses_env_bearer_token = match &cfg.transport {
                McpServerTransportConfig::StreamableHttp {
                    bearer_token_env_var,
                    ..
                } => bearer_token_env_var.is_some(),
                McpServerTransportConfig::Stdio { .. } => false,
            };
            // Codex auth backs the codex apps server unless an env bearer
            // token already covers it.
            let runtime_auth_provider =
                if server_name == CODEX_APPS_MCP_SERVER_NAME && !uses_env_bearer_token {
                    codex_apps_auth_provider.clone()
                } else {
                    None
                };
            let async_managed_client = AsyncManagedClient::new(
                server_name.clone(),
                cfg,
                store_mode,
                cancel_token.clone(),
                tx_event.clone(),
                elicitation_requests.clone(),
                codex_apps_tools_cache_context,
                Arc::clone(&tool_plugin_provenance),
                runtime_environment.clone(),
                runtime_auth_provider,
            );
            clients.insert(server_name.clone(), async_managed_client.clone());
            let tx_event = tx_event.clone();
            let submit_id = startup_submit_id.clone();
            let auth_entry = auth_entries.get(&server_name).cloned();
            // Watcher task: awaits the shared startup future and emits the
            // final per-server status (Ready / Cancelled / Failed).
            join_set.spawn(async move {
                let mut outcome = async_managed_client.client().await;
                // A cancellation that raced the startup wins over its result.
                if cancel_token.is_cancelled() {
                    outcome = Err(StartupOutcomeError::Cancelled);
                }
                let status = match &outcome {
                    Ok(_) => McpStartupStatus::Ready,
                    Err(StartupOutcomeError::Cancelled) => McpStartupStatus::Cancelled,
                    Err(error) => {
                        let error_str = mcp_init_error_display(
                            server_name.as_str(),
                            auth_entry.as_ref(),
                            error,
                        );
                        McpStartupStatus::Failed { error: error_str }
                    }
                };
                let _ = emit_update(
                    submit_id.as_str(),
                    &tx_event,
                    McpStartupUpdateEvent {
                        server: server_name.clone(),
                        status,
                    },
                )
                .await;
                (server_name, outcome)
            });
        }
        let manager = Self {
            clients,
            server_origins,
            elicitation_requests: elicitation_requests.clone(),
        };
        // Detached aggregator: waits for every watcher, then emits a single
        // McpStartupComplete summary bucketing servers by outcome.
        tokio::spawn(async move {
            let outcomes = join_set.join_all().await;
            let mut summary = McpStartupCompleteEvent::default();
            for (server_name, outcome) in outcomes {
                match outcome {
                    Ok(_) => summary.ready.push(server_name),
                    Err(StartupOutcomeError::Cancelled) => summary.cancelled.push(server_name),
                    Err(StartupOutcomeError::Failed { error }) => {
                        summary.failed.push(McpStartupFailure {
                            server: server_name,
                            error,
                        })
                    }
                }
            }
            let _ = tx_event
                .send(Event {
                    id: startup_submit_id,
                    msg: EventMsg::McpStartupComplete(summary),
                })
                .await;
        });
        (manager, cancel_token)
    }

    /// Forwards an elicitation response to the pending request identified by
    /// `(server_name, id)`.
    pub async fn resolve_elicitation(
        &self,
        server_name: String,
        id: RequestId,
        response: ElicitationResponse,
    ) -> Result<()> {
        self.elicitation_requests
            .resolve(server_name, id, response)
            .await
    }

    /// Waits up to `timeout` for the named server's startup to finish.
    ///
    /// Returns `true` only on successful startup; unknown server, startup
    /// failure, and timeout all return `false`.
    pub async fn wait_for_server_ready(&self, server_name: &str, timeout: Duration) -> bool {
        let Some(async_managed_client) = self.clients.get(server_name) else {
            return false;
        };
        match tokio::time::timeout(timeout, async_managed_client.client()).await {
            Ok(Ok(_)) => true,
            Ok(Err(_)) | Err(_) => false,
        }
    }

    /// Collects a [`McpStartupFailure`] for each required server that is
    /// either unknown to this manager or whose startup ended in an error.
    ///
    /// Awaits each server's startup future, so this blocks until the listed
    /// servers have finished starting (successfully or not).
    pub async fn required_startup_failures(
        &self,
        required_servers: &[String],
    ) -> Vec<McpStartupFailure> {
        let mut failures = Vec::new();
        for server_name in required_servers {
            let Some(async_managed_client) = self.clients.get(server_name).cloned() else {
                failures.push(McpStartupFailure {
                    server: server_name.clone(),
                    error: format!("required MCP server `{server_name}` was not initialized"),
                });
                continue;
            };
            match async_managed_client.client().await {
                Ok(_) => {}
                Err(error) => failures.push(McpStartupFailure {
                    server: server_name.clone(),
                    error: startup_outcome_error_message(error),
                }),
            }
        }
        failures
    }

    /// Returns a single map that contains all tools. Each key is the
    /// fully-qualified name for the tool.
    ///
    /// Servers whose tools are unavailable (`listed_tools()` returns `None`)
    /// are skipped rather than treated as errors.
    #[instrument(level = "trace", skip_all)]
    pub async fn list_all_tools(&self) -> HashMap<String, ToolInfo> {
        let mut tools = Vec::new();
        for managed_client in self.clients.values() {
            let Some(server_tools) = managed_client.listed_tools().await else {
                continue;
            };
            tools.extend(server_tools);
        }
        qualify_tools(tools)
    }

    /// Force-refresh codex apps tools by bypassing the in-process cache.
    ///
    /// On success, the refreshed tools replace the cache contents and the
    /// latest filtered tool map is returned directly to the caller. On
    /// failure, the existing cache remains unchanged.
    pub async fn hard_refresh_codex_apps_tools_cache(&self) -> Result<HashMap<String, ToolInfo>> {
        let managed_client = self
            .clients
            .get(CODEX_APPS_MCP_SERVER_NAME)
            .ok_or_else(|| anyhow!("unknown MCP server '{CODEX_APPS_MCP_SERVER_NAME}'"))?
            .client()
            .await
            .context("failed to get client")?;
        // Two timers on purpose: `fetch_start` measures only the uncached
        // fetch, while `list_start` also covers the cache write below.
        let list_start = Instant::now();
        let fetch_start = Instant::now();
        let tools = list_tools_for_client_uncached(
            CODEX_APPS_MCP_SERVER_NAME,
            &managed_client.client,
            managed_client.tool_timeout,
            managed_client.server_instructions.as_deref(),
        )
        .await
        .with_context(|| {
            format!("failed to refresh tools for MCP server '{CODEX_APPS_MCP_SERVER_NAME}'")
        })?;
        emit_duration(
            MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC,
            fetch_start.elapsed(),
            &[],
        );
        write_cached_codex_apps_tools_if_needed(
            CODEX_APPS_MCP_SERVER_NAME,
            managed_client.codex_apps_tools_cache_context.as_ref(),
            &tools,
        );
        // A forced refresh is always reported as a cache miss.
        emit_duration(
            MCP_TOOLS_LIST_DURATION_METRIC,
            list_start.elapsed(),
            &[("cache", "miss")],
        );
        // Apply the server's tool filter, then reduce each tool's input
        // schema to its model-visible form before qualifying the names.
        let tools = filter_tools(tools, &managed_client.tool_filter)
            .into_iter()
            .map(|mut tool| {
                tool.tool = tool_with_model_visible_input_schema(&tool.tool);
                tool
            });
        Ok(qualify_tools(tools))
    }

    /// Returns a single map that contains all resources. Each key is the
    /// server name and the value is a vector of resources.
    ///
    /// Pagination is followed per server via `next_cursor`; a repeated cursor
    /// is treated as an error to avoid looping forever. Servers that fail are
    /// logged and omitted from the result.
    pub async fn list_all_resources(&self) -> HashMap<String, Vec<Resource>> {
        let mut join_set = JoinSet::new();
        let clients_snapshot = &self.clients;
        for (server_name, async_managed_client) in clients_snapshot {
            let server_name = server_name.clone();
            // Servers that failed to start are skipped.
            let Ok(managed_client) = async_managed_client.client().await else {
                continue;
            };
            let timeout = managed_client.tool_timeout;
            let client = managed_client.client.clone();
            // One task per server; pages are fetched sequentially inside it.
            join_set.spawn(async move {
                let mut collected: Vec<Resource> = Vec::new();
                let mut cursor: Option<String> = None;
                loop {
                    // First page sends no params; later pages pass the cursor.
                    let params = cursor.as_ref().map(|next| PaginatedRequestParams {
                        meta: None,
                        cursor: Some(next.clone()),
                    });
                    let response = match client.list_resources(params, timeout).await {
                        Ok(result) => result,
                        Err(err) => return (server_name, Err(err)),
                    };
                    collected.extend(response.resources);
                    match response.next_cursor {
                        Some(next) => {
                            if cursor.as_ref() == Some(&next) {
                                return (
                                    server_name,
                                    Err(anyhow!("resources/list returned duplicate cursor")),
                                );
                            }
                            cursor = Some(next);
                        }
                        None => return (server_name, Ok(collected)),
                    }
                }
            });
        }
        let mut aggregated: HashMap<String, Vec<Resource>> = HashMap::new();
        while let Some(join_res) = join_set.join_next().await {
            match join_res {
                Ok((server_name, Ok(resources))) => {
                    aggregated.insert(server_name, resources);
                }
                Ok((server_name, Err(err))) => {
                    warn!("Failed to list resources for MCP server '{server_name}': {err:#}");
                }
                Err(err) => {
                    warn!("Task panic when listing resources for MCP server: {err:#}");
                }
            }
        }
        aggregated
    }

    /// Returns a single map that contains all resource templates. Each key is the
    /// server name and the value is a vector of resource templates.
    ///
    /// Mirrors [`Self::list_all_resources`]: per-server pagination with a
    /// duplicate-cursor guard; failing servers are logged and omitted.
    pub async fn list_all_resource_templates(&self) -> HashMap<String, Vec<ResourceTemplate>> {
        let mut join_set = JoinSet::new();
        let clients_snapshot = &self.clients;
        for (server_name, async_managed_client) in clients_snapshot {
            let server_name_cloned = server_name.clone();
            let Ok(managed_client) = async_managed_client.client().await else {
                continue;
            };
            let client = managed_client.client.clone();
            let timeout = managed_client.tool_timeout;
            join_set.spawn(async move {
                let mut collected: Vec<ResourceTemplate> = Vec::new();
                let mut cursor: Option<String> = None;
                loop {
                    let params = cursor.as_ref().map(|next| PaginatedRequestParams {
                        meta: None,
                        cursor: Some(next.clone()),
                    });
                    let response = match client.list_resource_templates(params, timeout).await {
                        Ok(result) => result,
                        Err(err) => return (server_name_cloned, Err(err)),
                    };
                    collected.extend(response.resource_templates);
                    match response.next_cursor {
                        Some(next) => {
                            if cursor.as_ref() == Some(&next) {
                                return (
                                    server_name_cloned,
                                    Err(anyhow!(
                                        "resources/templates/list returned duplicate cursor"
                                    )),
                                );
                            }
                            cursor = Some(next);
                        }
                        None => return (server_name_cloned, Ok(collected)),
                    }
                }
            });
        }
        let mut aggregated: HashMap<String, Vec<ResourceTemplate>> = HashMap::new();
        while let Some(join_res) = join_set.join_next().await {
            match join_res {
                Ok((server_name, Ok(templates))) => {
                    aggregated.insert(server_name, templates);
                }
                Ok((server_name, Err(err))) => {
                    warn!(
                        "Failed to list resource templates for MCP server '{server_name}': {err:#}"
                    );
                }
                Err(err) => {
                    warn!("Task panic when listing resource templates for MCP server: {err:#}");
                }
            }
        }
        aggregated
    }

    /// Invoke the tool indicated by the (server, tool) pair.
    ///
    /// Rejects tools disallowed by the server's tool filter before calling.
    /// The rmcp result is converted into the protocol-level
    /// [`CallToolResult`]; content items that fail to serialize degrade to
    /// the placeholder string "<content>" instead of failing the call.
    pub async fn call_tool(
        &self,
        server: &str,
        tool: &str,
        arguments: Option<serde_json::Value>,
        meta: Option<serde_json::Value>,
    ) -> Result<CallToolResult> {
        let client = self.client_by_name(server).await?;
        if !client.tool_filter.allows(tool) {
            return Err(anyhow!(
                "tool '{tool}' is disabled for MCP server '{server}'"
            ));
        }
        let result: rmcp::model::CallToolResult = client
            .client
            .call_tool(tool.to_string(), arguments, meta, client.tool_timeout)
            .await
            .with_context(|| format!("tool call failed for `{server}/{tool}`"))?;
        let content = result
            .content
            .into_iter()
            .map(|content| {
                serde_json::to_value(content)
                    .unwrap_or_else(|_| serde_json::Value::String("<content>".to_string()))
            })
            .collect();
        Ok(CallToolResult {
            content,
            structured_content: result.structured_content,
            is_error: result.is_error,
            meta: result.meta.and_then(|meta| serde_json::to_value(meta).ok()),
        })
    }

    /// Reports whether the named server advertised the sandbox-state meta
    /// capability during initialization.
    ///
    /// # Errors
    /// Fails if the server is unknown or its startup failed.
    pub async fn server_supports_sandbox_state_meta_capability(
        &self,
        server: &str,
    ) -> Result<bool> {
        Ok(self
            .client_by_name(server)
            .await?
            .server_supports_sandbox_state_meta_capability)
    }

    /// List resources from the specified server.
    ///
    /// Single-page call: pagination, if any, is the caller's responsibility
    /// via `params` (contrast with [`Self::list_all_resources`]).
    pub async fn list_resources(
        &self,
        server: &str,
        params: Option<PaginatedRequestParams>,
    ) -> Result<ListResourcesResult> {
        let managed = self.client_by_name(server).await?;
        let timeout = managed.tool_timeout;
        managed
            .client
            .list_resources(params, timeout)
            .await
            .with_context(|| format!("resources/list failed for `{server}`"))
    }

    /// List resource templates from the specified server.
    ///
    /// Single-page call; see [`Self::list_all_resource_templates`] for the
    /// aggregating variant.
    pub async fn list_resource_templates(
        &self,
        server: &str,
        params: Option<PaginatedRequestParams>,
    ) -> Result<ListResourceTemplatesResult> {
        let managed = self.client_by_name(server).await?;
        let client = managed.client.clone();
        let timeout = managed.tool_timeout;
        client
            .list_resource_templates(params, timeout)
            .await
            .with_context(|| format!("resources/templates/list failed for `{server}`"))
    }

    /// Read a resource from the specified server.
    pub async fn read_resource(
        &self,
        server: &str,
        params: ReadResourceRequestParams,
    ) -> Result<ReadResourceResult> {
        let managed = self.client_by_name(server).await?;
        let client = managed.client.clone();
        let timeout = managed.tool_timeout;
        // Keep the URI for the error message; `params` is consumed below.
        let uri = params.uri.clone();
        client
            .read_resource(params, timeout)
            .await
            .with_context(|| format!("resources/read failed for `{server}` ({uri})"))
    }

    /// Finds the tool whose canonical name matches `tool_name` by scanning
    /// the aggregated tool map; `None` if no server exposes it.
    pub async fn resolve_tool_info(&self, tool_name: &ToolName) -> Option<ToolInfo> {
        let all_tools = self.list_all_tools().await;
        all_tools
            .into_values()
            .find(|tool| tool.canonical_tool_name() == *tool_name)
    }

    /// Resolves the ready client for `name`, erroring if the server is
    /// unknown or its startup did not complete successfully.
    async fn client_by_name(&self, name: &str) -> Result<ManagedClient> {
        self.clients
            .get(name)
            .ok_or_else(|| anyhow!("unknown MCP server '{name}'"))?
            .client()
            .await
            .context("failed to get client")
    }
}
// The `#[path]` override keeps the tests in `mcp_connection_manager_tests.rs`
// (the pre-split file name) while exposing them as the conventional `tests`
// module of this file.
#[cfg(test)]
#[path = "mcp_connection_manager_tests.rs"]
mod tests;

View File

@@ -34,9 +34,9 @@ use rmcp::model::ReadResourceRequestParams;
use rmcp::model::ReadResourceResult;
use serde_json::Value;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::mcp_connection_manager::McpRuntimeEnvironment;
use crate::mcp_connection_manager::codex_apps_tools_cache_key;
use crate::manager::McpConnectionManager;
use crate::mcp_connection::McpRuntimeEnvironment;
use crate::mcp_connection::codex_apps_tools_cache_key;
pub const CODEX_APPS_MCP_SERVER_NAME: &str = "codex_apps";
const MCP_TOOL_NAME_PREFIX: &str = "mcp";

View File

@@ -1,10 +1,6 @@
//! Connection manager for Model Context Protocol (MCP) servers.
//! Connection support for Model Context Protocol (MCP) servers.
//!
//! The [`McpConnectionManager`] owns one [`codex_rmcp_client::RmcpClient`] per
//! configured server (keyed by the *server name*). It offers convenience
//! helpers to query the available tools across *all* servers and returns them
//! in a single aggregated map using the model-visible fully-qualified tool name
//! as the key.
//! This module contains shared types and helpers used by [`McpConnectionManager`].
use std::borrow::Cow;
use std::collections::HashMap;
@@ -31,7 +27,6 @@ use async_channel::Sender;
use codex_api::SharedAuthProvider;
use codex_async_utils::CancelErr;
use codex_async_utils::OrCancelExt;
use codex_config::Constrained;
use codex_config::types::OAuthCredentialsStoreMode;
use codex_exec_server::Environment;
use codex_exec_server::HttpClient;
@@ -39,15 +34,11 @@ use codex_exec_server::ReqwestHttpClient;
use codex_protocol::ToolName;
use codex_protocol::approvals::ElicitationRequest;
use codex_protocol::approvals::ElicitationRequestEvent;
use codex_protocol::mcp::CallToolResult;
use codex_protocol::mcp::RequestId as ProtocolRequestId;
use codex_protocol::models::PermissionProfile;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::McpStartupCompleteEvent;
use codex_protocol::protocol::McpStartupFailure;
use codex_protocol::protocol::McpStartupStatus;
use codex_protocol::protocol::McpStartupUpdateEvent;
use codex_protocol::protocol::SandboxPolicy;
use codex_rmcp_client::ElicitationResponse;
@@ -66,15 +57,8 @@ use rmcp::model::ElicitationCapability;
use rmcp::model::FormElicitationCapability;
use rmcp::model::Implementation;
use rmcp::model::InitializeRequestParams;
use rmcp::model::ListResourceTemplatesResult;
use rmcp::model::ListResourcesResult;
use rmcp::model::PaginatedRequestParams;
use rmcp::model::ProtocolVersion;
use rmcp::model::ReadResourceRequestParams;
use rmcp::model::ReadResourceResult;
use rmcp::model::RequestId;
use rmcp::model::Resource;
use rmcp::model::ResourceTemplate;
use rmcp::model::Tool;
use serde::Deserialize;
@@ -85,10 +69,7 @@ use sha1::Digest;
use sha1::Sha1;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use tracing::warn;
use url::Url;
use codex_config::McpServerConfig;
@@ -106,10 +87,11 @@ const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(30);
/// Default timeout for individual tool calls.
const DEFAULT_TOOL_TIMEOUT: Duration = Duration::from_secs(120);
const CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION: u8 = 2;
pub(crate) const CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION: u8 = 2;
const CODEX_APPS_TOOLS_CACHE_DIR: &str = "cache/codex_apps_tools";
const MCP_TOOLS_LIST_DURATION_METRIC: &str = "codex.mcp.tools.list.duration_ms";
const MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC: &str = "codex.mcp.tools.fetch_uncached.duration_ms";
pub(crate) const MCP_TOOLS_LIST_DURATION_METRIC: &str = "codex.mcp.tools.list.duration_ms";
pub(crate) const MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC: &str =
"codex.mcp.tools.fetch_uncached.duration_ms";
const MCP_TOOLS_CACHE_WRITE_DURATION_METRIC: &str = "codex.mcp.tools.cache_write.duration_ms";
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -159,9 +141,9 @@ pub fn declared_openai_file_input_param_names(
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CodexAppsToolsCacheKey {
account_id: Option<String>,
chatgpt_user_id: Option<String>,
is_workspace_account: bool,
pub(crate) account_id: Option<String>,
pub(crate) chatgpt_user_id: Option<String>,
pub(crate) is_workspace_account: bool,
}
pub fn codex_apps_tools_cache_key(auth: Option<&CodexAuth>) -> CodexAppsToolsCacheKey {
@@ -198,13 +180,6 @@ pub struct SandboxState {
pub use_legacy_landlock: bool,
}
/// A thin wrapper around a set of running [`RmcpClient`] instances.
pub struct McpConnectionManager {
clients: HashMap<String, AsyncManagedClient>,
server_origins: HashMap<String, String>,
elicitation_requests: ElicitationRequestManager,
}
/// Runtime placement information used when starting MCP server transports.
///
/// `McpConfig` describes what servers exist. This value describes where those
@@ -241,8 +216,8 @@ impl McpRuntimeEnvironment {
/// 2. The tool is not explicitly disabled.
#[derive(Default, Clone)]
pub(crate) struct ToolFilter {
enabled: Option<HashSet<String>>,
disabled: HashSet<String>,
pub(crate) enabled: Option<HashSet<String>>,
pub(crate) disabled: HashSet<String>,
}
impl ToolFilter {
@@ -260,7 +235,7 @@ impl ToolFilter {
Self { enabled, disabled }
}
fn allows(&self, tool_name: &str) -> bool {
pub(crate) fn allows(&self, tool_name: &str) -> bool {
if let Some(enabled) = &self.enabled
&& !enabled.contains(tool_name)
{
@@ -279,13 +254,13 @@ fn sha1_hex(s: &str) -> String {
}
#[derive(Clone)]
struct CodexAppsToolsCacheContext {
codex_home: PathBuf,
user_key: CodexAppsToolsCacheKey,
pub(crate) struct CodexAppsToolsCacheContext {
pub(crate) codex_home: PathBuf,
pub(crate) user_key: CodexAppsToolsCacheKey,
}
impl CodexAppsToolsCacheContext {
fn cache_path(&self) -> PathBuf {
pub(crate) fn cache_path(&self) -> PathBuf {
let user_key_json = serde_json::to_string(&self.user_key).unwrap_or_default();
let user_key_hash = sha1_hex(&user_key_json);
self.codex_home
@@ -308,7 +283,7 @@ enum CachedCodexAppsToolsLoad {
type ResponderMap = HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>;
fn elicitation_is_rejected_by_policy(approval_policy: AskForApproval) -> bool {
pub(crate) fn elicitation_is_rejected_by_policy(approval_policy: AskForApproval) -> bool {
match approval_policy {
AskForApproval::Never => true,
AskForApproval::OnFailure => false,
@@ -331,14 +306,14 @@ fn can_auto_accept_elicitation(elicitation: &CreateElicitationRequestParams) ->
}
#[derive(Clone)]
struct ElicitationRequestManager {
pub(crate) struct ElicitationRequestManager {
requests: Arc<Mutex<ResponderMap>>,
approval_policy: Arc<StdMutex<AskForApproval>>,
sandbox_policy: Arc<StdMutex<SandboxPolicy>>,
pub(crate) approval_policy: Arc<StdMutex<AskForApproval>>,
pub(crate) sandbox_policy: Arc<StdMutex<SandboxPolicy>>,
}
impl ElicitationRequestManager {
fn new(approval_policy: AskForApproval, sandbox_policy: SandboxPolicy) -> Self {
pub(crate) fn new(approval_policy: AskForApproval, sandbox_policy: SandboxPolicy) -> Self {
Self {
requests: Arc::new(Mutex::new(HashMap::new())),
approval_policy: Arc::new(StdMutex::new(approval_policy)),
@@ -346,7 +321,7 @@ impl ElicitationRequestManager {
}
}
async fn resolve(
pub(crate) async fn resolve(
&self,
server_name: String,
id: RequestId,
@@ -361,7 +336,11 @@ impl ElicitationRequestManager {
.map_err(|e| anyhow!("failed to send elicitation response: {e:?}"))
}
fn make_sender(&self, server_name: String, tx_event: Sender<Event>) -> SendElicitation {
pub(crate) fn make_sender(
&self,
server_name: String,
tx_event: Sender<Event>,
) -> SendElicitation {
let elicitation_requests = self.requests.clone();
let approval_policy = self.approval_policy.clone();
let sandbox_policy = self.sandbox_policy.clone();
@@ -459,14 +438,14 @@ impl ElicitationRequestManager {
}
#[derive(Clone)]
struct ManagedClient {
client: Arc<RmcpClient>,
tools: Vec<ToolInfo>,
tool_filter: ToolFilter,
tool_timeout: Option<Duration>,
server_instructions: Option<String>,
server_supports_sandbox_state_meta_capability: bool,
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
pub(crate) struct ManagedClient {
pub(crate) client: Arc<RmcpClient>,
pub(crate) tools: Vec<ToolInfo>,
pub(crate) tool_filter: ToolFilter,
pub(crate) tool_timeout: Option<Duration>,
pub(crate) server_instructions: Option<String>,
pub(crate) server_supports_sandbox_state_meta_capability: bool,
pub(crate) codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
}
impl ManagedClient {
@@ -497,18 +476,18 @@ impl ManagedClient {
}
#[derive(Clone)]
struct AsyncManagedClient {
client: Shared<BoxFuture<'static, Result<ManagedClient, StartupOutcomeError>>>,
startup_snapshot: Option<Vec<ToolInfo>>,
startup_complete: Arc<AtomicBool>,
tool_plugin_provenance: Arc<ToolPluginProvenance>,
pub(crate) struct AsyncManagedClient {
pub(crate) client: Shared<BoxFuture<'static, Result<ManagedClient, StartupOutcomeError>>>,
pub(crate) startup_snapshot: Option<Vec<ToolInfo>>,
pub(crate) startup_complete: Arc<AtomicBool>,
pub(crate) tool_plugin_provenance: Arc<ToolPluginProvenance>,
}
impl AsyncManagedClient {
// Keep this constructor flat so the startup inputs remain readable at the
// single call site instead of introducing a one-off params wrapper.
#[allow(clippy::too_many_arguments)]
fn new(
pub(crate) fn new(
server_name: String,
config: McpServerConfig,
store_mode: OAuthCredentialsStoreMode,
@@ -587,7 +566,7 @@ impl AsyncManagedClient {
}
}
async fn client(&self) -> Result<ManagedClient, StartupOutcomeError> {
pub(crate) async fn client(&self) -> Result<ManagedClient, StartupOutcomeError> {
self.client.clone().await
}
@@ -598,7 +577,7 @@ impl AsyncManagedClient {
None
}
async fn listed_tools(&self) -> Option<Vec<ToolInfo>> {
pub(crate) async fn listed_tools(&self) -> Option<Vec<ToolInfo>> {
let annotate_tools = |tools: Vec<ToolInfo>| {
let mut tools = tools;
for tool in &mut tools {
@@ -663,548 +642,12 @@ impl AsyncManagedClient {
}
}
impl McpConnectionManager {
pub fn new_uninitialized(
approval_policy: &Constrained<AskForApproval>,
sandbox_policy: &Constrained<SandboxPolicy>,
) -> Self {
Self {
clients: HashMap::new(),
server_origins: HashMap::new(),
elicitation_requests: ElicitationRequestManager::new(
approval_policy.value(),
sandbox_policy.get().clone(),
),
}
}
pub fn has_servers(&self) -> bool {
!self.clients.is_empty()
}
pub fn server_origin(&self, server_name: &str) -> Option<&str> {
self.server_origins.get(server_name).map(String::as_str)
}
pub fn set_approval_policy(&self, approval_policy: &Constrained<AskForApproval>) {
if let Ok(mut policy) = self.elicitation_requests.approval_policy.lock() {
*policy = approval_policy.value();
}
}
pub fn set_sandbox_policy(&self, sandbox_policy: &SandboxPolicy) {
if let Ok(mut policy) = self.elicitation_requests.sandbox_policy.lock() {
*policy = sandbox_policy.clone();
}
}
#[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
pub async fn new(
mcp_servers: &HashMap<String, McpServerConfig>,
store_mode: OAuthCredentialsStoreMode,
auth_entries: HashMap<String, McpAuthStatusEntry>,
approval_policy: &Constrained<AskForApproval>,
submit_id: String,
tx_event: Sender<Event>,
initial_sandbox_policy: SandboxPolicy,
runtime_environment: McpRuntimeEnvironment,
codex_home: PathBuf,
codex_apps_tools_cache_key: CodexAppsToolsCacheKey,
tool_plugin_provenance: ToolPluginProvenance,
auth: Option<&CodexAuth>,
) -> (Self, CancellationToken) {
let cancel_token = CancellationToken::new();
let mut clients = HashMap::new();
let mut server_origins = HashMap::new();
let mut join_set = JoinSet::new();
let elicitation_requests =
ElicitationRequestManager::new(approval_policy.value(), initial_sandbox_policy);
let tool_plugin_provenance = Arc::new(tool_plugin_provenance);
let startup_submit_id = submit_id.clone();
let codex_apps_auth_provider = auth
.filter(|auth| auth.uses_codex_backend())
.map(codex_model_provider::auth_provider_from_auth);
let mcp_servers = mcp_servers.clone();
for (server_name, cfg) in mcp_servers.into_iter().filter(|(_, cfg)| cfg.enabled) {
if let Some(origin) = transport_origin(&cfg.transport) {
server_origins.insert(server_name.clone(), origin);
}
let cancel_token = cancel_token.child_token();
let _ = emit_update(
startup_submit_id.as_str(),
&tx_event,
McpStartupUpdateEvent {
server: server_name.clone(),
status: McpStartupStatus::Starting,
},
)
.await;
let codex_apps_tools_cache_context = if server_name == CODEX_APPS_MCP_SERVER_NAME {
Some(CodexAppsToolsCacheContext {
codex_home: codex_home.clone(),
user_key: codex_apps_tools_cache_key.clone(),
})
} else {
None
};
let uses_env_bearer_token = match &cfg.transport {
McpServerTransportConfig::StreamableHttp {
bearer_token_env_var,
..
} => bearer_token_env_var.is_some(),
McpServerTransportConfig::Stdio { .. } => false,
};
let runtime_auth_provider =
if server_name == CODEX_APPS_MCP_SERVER_NAME && !uses_env_bearer_token {
codex_apps_auth_provider.clone()
} else {
None
};
let async_managed_client = AsyncManagedClient::new(
server_name.clone(),
cfg,
store_mode,
cancel_token.clone(),
tx_event.clone(),
elicitation_requests.clone(),
codex_apps_tools_cache_context,
Arc::clone(&tool_plugin_provenance),
runtime_environment.clone(),
runtime_auth_provider,
);
clients.insert(server_name.clone(), async_managed_client.clone());
let tx_event = tx_event.clone();
let submit_id = startup_submit_id.clone();
let auth_entry = auth_entries.get(&server_name).cloned();
join_set.spawn(async move {
let mut outcome = async_managed_client.client().await;
if cancel_token.is_cancelled() {
outcome = Err(StartupOutcomeError::Cancelled);
}
let status = match &outcome {
Ok(_) => McpStartupStatus::Ready,
Err(StartupOutcomeError::Cancelled) => McpStartupStatus::Cancelled,
Err(error) => {
let error_str = mcp_init_error_display(
server_name.as_str(),
auth_entry.as_ref(),
error,
);
McpStartupStatus::Failed { error: error_str }
}
};
let _ = emit_update(
submit_id.as_str(),
&tx_event,
McpStartupUpdateEvent {
server: server_name.clone(),
status,
},
)
.await;
(server_name, outcome)
});
}
let manager = Self {
clients,
server_origins,
elicitation_requests: elicitation_requests.clone(),
};
tokio::spawn(async move {
let outcomes = join_set.join_all().await;
let mut summary = McpStartupCompleteEvent::default();
for (server_name, outcome) in outcomes {
match outcome {
Ok(_) => summary.ready.push(server_name),
Err(StartupOutcomeError::Cancelled) => summary.cancelled.push(server_name),
Err(StartupOutcomeError::Failed { error }) => {
summary.failed.push(McpStartupFailure {
server: server_name,
error,
})
}
}
}
let _ = tx_event
.send(Event {
id: startup_submit_id,
msg: EventMsg::McpStartupComplete(summary),
})
.await;
});
(manager, cancel_token)
}
    /// Forward a user's elicitation response to the pending request tracked
    /// for (`server_name`, `id`).
    ///
    /// Any error comes straight from `ElicitationRequestManager::resolve`
    /// (presumably when no matching request is pending — confirm in that
    /// module).
    pub async fn resolve_elicitation(
        &self,
        server_name: String,
        id: RequestId,
        response: ElicitationResponse,
    ) -> Result<()> {
        self.elicitation_requests
            .resolve(server_name, id, response)
            .await
    }
pub async fn wait_for_server_ready(&self, server_name: &str, timeout: Duration) -> bool {
let Some(async_managed_client) = self.clients.get(server_name) else {
return false;
};
match tokio::time::timeout(timeout, async_managed_client.client()).await {
Ok(Ok(_)) => true,
Ok(Err(_)) | Err(_) => false,
}
}
pub async fn required_startup_failures(
&self,
required_servers: &[String],
) -> Vec<McpStartupFailure> {
let mut failures = Vec::new();
for server_name in required_servers {
let Some(async_managed_client) = self.clients.get(server_name).cloned() else {
failures.push(McpStartupFailure {
server: server_name.clone(),
error: format!("required MCP server `{server_name}` was not initialized"),
});
continue;
};
match async_managed_client.client().await {
Ok(_) => {}
Err(error) => failures.push(McpStartupFailure {
server: server_name.clone(),
error: startup_outcome_error_message(error),
}),
}
}
failures
}
/// Returns a single map that contains all tools. Each key is the
/// fully-qualified name for the tool.
#[instrument(level = "trace", skip_all)]
pub async fn list_all_tools(&self) -> HashMap<String, ToolInfo> {
let mut tools = Vec::new();
for managed_client in self.clients.values() {
let Some(server_tools) = managed_client.listed_tools().await else {
continue;
};
tools.extend(server_tools);
}
qualify_tools(tools)
}
    /// Force-refresh codex apps tools by bypassing the in-process cache.
    ///
    /// On success, the refreshed tools replace the cache contents and the
    /// latest filtered tool map is returned directly to the caller. On
    /// failure, the existing cache remains unchanged.
    pub async fn hard_refresh_codex_apps_tools_cache(&self) -> Result<HashMap<String, ToolInfo>> {
        // Resolve the managed client for the codex apps server; errors if the
        // server was never configured or its startup did not succeed.
        let managed_client = self
            .clients
            .get(CODEX_APPS_MCP_SERVER_NAME)
            .ok_or_else(|| anyhow!("unknown MCP server '{CODEX_APPS_MCP_SERVER_NAME}'"))?
            .client()
            .await
            .context("failed to get client")?;
        // Two timers: `list_start` covers the whole listing (reported with a
        // cache=miss tag below), `fetch_start` only the uncached fetch.
        let list_start = Instant::now();
        let fetch_start = Instant::now();
        let tools = list_tools_for_client_uncached(
            CODEX_APPS_MCP_SERVER_NAME,
            &managed_client.client,
            managed_client.tool_timeout,
            managed_client.server_instructions.as_deref(),
        )
        .await
        .with_context(|| {
            format!("failed to refresh tools for MCP server '{CODEX_APPS_MCP_SERVER_NAME}'")
        })?;
        emit_duration(
            MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC,
            fetch_start.elapsed(),
            &[],
        );
        // Persist the freshly fetched (raw, unfiltered) tools so the on-disk
        // cache is only overwritten after a successful fetch.
        write_cached_codex_apps_tools_if_needed(
            CODEX_APPS_MCP_SERVER_NAME,
            managed_client.codex_apps_tools_cache_context.as_ref(),
            &tools,
        );
        emit_duration(
            MCP_TOOLS_LIST_DURATION_METRIC,
            list_start.elapsed(),
            &[("cache", "miss")],
        );
        // Return the caller-facing view: filtered and with the model-visible
        // input schema applied; the cache keeps the raw entries.
        let tools = filter_tools(tools, &managed_client.tool_filter)
            .into_iter()
            .map(|mut tool| {
                tool.tool = tool_with_model_visible_input_schema(&tool.tool);
                tool
            });
        Ok(qualify_tools(tools))
    }
/// Returns a single map that contains all resources. Each key is the
/// server name and the value is a vector of resources.
pub async fn list_all_resources(&self) -> HashMap<String, Vec<Resource>> {
let mut join_set = JoinSet::new();
let clients_snapshot = &self.clients;
for (server_name, async_managed_client) in clients_snapshot {
let server_name = server_name.clone();
let Ok(managed_client) = async_managed_client.client().await else {
continue;
};
let timeout = managed_client.tool_timeout;
let client = managed_client.client.clone();
join_set.spawn(async move {
let mut collected: Vec<Resource> = Vec::new();
let mut cursor: Option<String> = None;
loop {
let params = cursor.as_ref().map(|next| PaginatedRequestParams {
meta: None,
cursor: Some(next.clone()),
});
let response = match client.list_resources(params, timeout).await {
Ok(result) => result,
Err(err) => return (server_name, Err(err)),
};
collected.extend(response.resources);
match response.next_cursor {
Some(next) => {
if cursor.as_ref() == Some(&next) {
return (
server_name,
Err(anyhow!("resources/list returned duplicate cursor")),
);
}
cursor = Some(next);
}
None => return (server_name, Ok(collected)),
}
}
});
}
let mut aggregated: HashMap<String, Vec<Resource>> = HashMap::new();
while let Some(join_res) = join_set.join_next().await {
match join_res {
Ok((server_name, Ok(resources))) => {
aggregated.insert(server_name, resources);
}
Ok((server_name, Err(err))) => {
warn!("Failed to list resources for MCP server '{server_name}': {err:#}");
}
Err(err) => {
warn!("Task panic when listing resources for MCP server: {err:#}");
}
}
}
aggregated
}
/// Returns a single map that contains all resource templates. Each key is the
/// server name and the value is a vector of resource templates.
pub async fn list_all_resource_templates(&self) -> HashMap<String, Vec<ResourceTemplate>> {
let mut join_set = JoinSet::new();
let clients_snapshot = &self.clients;
for (server_name, async_managed_client) in clients_snapshot {
let server_name_cloned = server_name.clone();
let Ok(managed_client) = async_managed_client.client().await else {
continue;
};
let client = managed_client.client.clone();
let timeout = managed_client.tool_timeout;
join_set.spawn(async move {
let mut collected: Vec<ResourceTemplate> = Vec::new();
let mut cursor: Option<String> = None;
loop {
let params = cursor.as_ref().map(|next| PaginatedRequestParams {
meta: None,
cursor: Some(next.clone()),
});
let response = match client.list_resource_templates(params, timeout).await {
Ok(result) => result,
Err(err) => return (server_name_cloned, Err(err)),
};
collected.extend(response.resource_templates);
match response.next_cursor {
Some(next) => {
if cursor.as_ref() == Some(&next) {
return (
server_name_cloned,
Err(anyhow!(
"resources/templates/list returned duplicate cursor"
)),
);
}
cursor = Some(next);
}
None => return (server_name_cloned, Ok(collected)),
}
}
});
}
let mut aggregated: HashMap<String, Vec<ResourceTemplate>> = HashMap::new();
while let Some(join_res) = join_set.join_next().await {
match join_res {
Ok((server_name, Ok(templates))) => {
aggregated.insert(server_name, templates);
}
Ok((server_name, Err(err))) => {
warn!(
"Failed to list resource templates for MCP server '{server_name}': {err:#}"
);
}
Err(err) => {
warn!("Task panic when listing resource templates for MCP server: {err:#}");
}
}
}
aggregated
}
/// Invoke the tool indicated by the (server, tool) pair.
pub async fn call_tool(
&self,
server: &str,
tool: &str,
arguments: Option<serde_json::Value>,
meta: Option<serde_json::Value>,
) -> Result<CallToolResult> {
let client = self.client_by_name(server).await?;
if !client.tool_filter.allows(tool) {
return Err(anyhow!(
"tool '{tool}' is disabled for MCP server '{server}'"
));
}
let result: rmcp::model::CallToolResult = client
.client
.call_tool(tool.to_string(), arguments, meta, client.tool_timeout)
.await
.with_context(|| format!("tool call failed for `{server}/{tool}`"))?;
let content = result
.content
.into_iter()
.map(|content| {
serde_json::to_value(content)
.unwrap_or_else(|_| serde_json::Value::String("<content>".to_string()))
})
.collect();
Ok(CallToolResult {
content,
structured_content: result.structured_content,
is_error: result.is_error,
meta: result.meta.and_then(|meta| serde_json::to_value(meta).ok()),
})
}
pub async fn server_supports_sandbox_state_meta_capability(
&self,
server: &str,
) -> Result<bool> {
Ok(self
.client_by_name(server)
.await?
.server_supports_sandbox_state_meta_capability)
}
/// List resources from the specified server.
pub async fn list_resources(
&self,
server: &str,
params: Option<PaginatedRequestParams>,
) -> Result<ListResourcesResult> {
let managed = self.client_by_name(server).await?;
let timeout = managed.tool_timeout;
managed
.client
.list_resources(params, timeout)
.await
.with_context(|| format!("resources/list failed for `{server}`"))
}
/// List resource templates from the specified server.
pub async fn list_resource_templates(
&self,
server: &str,
params: Option<PaginatedRequestParams>,
) -> Result<ListResourceTemplatesResult> {
let managed = self.client_by_name(server).await?;
let client = managed.client.clone();
let timeout = managed.tool_timeout;
client
.list_resource_templates(params, timeout)
.await
.with_context(|| format!("resources/templates/list failed for `{server}`"))
}
/// Read a resource from the specified server.
pub async fn read_resource(
&self,
server: &str,
params: ReadResourceRequestParams,
) -> Result<ReadResourceResult> {
let managed = self.client_by_name(server).await?;
let client = managed.client.clone();
let timeout = managed.tool_timeout;
let uri = params.uri.clone();
client
.read_resource(params, timeout)
.await
.with_context(|| format!("resources/read failed for `{server}` ({uri})"))
}
pub async fn resolve_tool_info(&self, tool_name: &ToolName) -> Option<ToolInfo> {
let all_tools = self.list_all_tools().await;
all_tools
.into_values()
.find(|tool| tool.canonical_tool_name() == *tool_name)
}
async fn client_by_name(&self, name: &str) -> Result<ManagedClient> {
self.clients
.get(name)
.ok_or_else(|| anyhow!("unknown MCP server '{name}'"))?
.client()
.await
.context("failed to get client")
}
}
// Tool `_meta` key; the name suggests it marks which tool input parameters
// accept OpenAI file payloads. NOTE(review): its reader is not fully visible
// in this chunk — confirm against `declared_openai_file_input_param_names`.
const META_OPENAI_FILE_PARAMS: &str = "openai/fileParams";
/// Returns the model-visible view of a tool while preserving the raw metadata
/// used by execution. Keep cache entries raw and call this at manager return
/// boundaries.
fn tool_with_model_visible_input_schema(tool: &Tool) -> Tool {
pub(crate) fn tool_with_model_visible_input_schema(tool: &Tool) -> Tool {
let file_params = declared_openai_file_input_param_names(tool.meta.as_deref());
if file_params.is_empty() {
return tool.clone();
@@ -1265,7 +708,7 @@ fn mask_input_property_schema(schema: &mut JsonValue) {
}
}
async fn emit_update(
pub(crate) async fn emit_update(
submit_id: &str,
tx_event: &Sender<Event>,
update: McpStartupUpdateEvent,
@@ -1278,7 +721,7 @@ async fn emit_update(
.await
}
fn filter_tools(tools: Vec<ToolInfo>, filter: &ToolFilter) -> Vec<ToolInfo> {
pub(crate) fn filter_tools(tools: Vec<ToolInfo>, filter: &ToolFilter) -> Vec<ToolInfo> {
tools
.into_iter()
.filter(|tool| filter.allows(&tool.tool.name))
@@ -1393,7 +836,7 @@ fn resolve_bearer_token(
}
#[derive(Debug, Clone, thiserror::Error)]
enum StartupOutcomeError {
pub(crate) enum StartupOutcomeError {
#[error("MCP startup cancelled")]
Cancelled,
// We can't store the original error here because anyhow::Error doesn't implement
@@ -1410,7 +853,9 @@ impl From<anyhow::Error> for StartupOutcomeError {
}
}
fn elicitation_capability_for_server(_server_name: &str) -> Option<ElicitationCapability> {
pub(crate) fn elicitation_capability_for_server(
_server_name: &str,
) -> Option<ElicitationCapability> {
// https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation#capabilities
// indicates this should be an empty object.
Some(ElicitationCapability {
@@ -1614,7 +1059,7 @@ async fn make_rmcp_client(
}
}
fn write_cached_codex_apps_tools_if_needed(
pub(crate) fn write_cached_codex_apps_tools_if_needed(
server_name: &str,
cache_context: Option<&CodexAppsToolsCacheContext>,
tools: &[ToolInfo],
@@ -1634,7 +1079,7 @@ fn write_cached_codex_apps_tools_if_needed(
}
}
fn load_startup_cached_codex_apps_tools_snapshot(
pub(crate) fn load_startup_cached_codex_apps_tools_snapshot(
server_name: &str,
cache_context: Option<&CodexAppsToolsCacheContext>,
) -> Option<Vec<ToolInfo>> {
@@ -1651,7 +1096,7 @@ fn load_startup_cached_codex_apps_tools_snapshot(
}
#[cfg(test)]
fn read_cached_codex_apps_tools(
pub(crate) fn read_cached_codex_apps_tools(
cache_context: &CodexAppsToolsCacheContext,
) -> Option<Vec<ToolInfo>> {
match load_cached_codex_apps_tools(cache_context) {
@@ -1681,7 +1126,10 @@ fn load_cached_codex_apps_tools(
CachedCodexAppsToolsLoad::Hit(filter_disallowed_codex_apps_tools(cache.tools))
}
fn write_cached_codex_apps_tools(cache_context: &CodexAppsToolsCacheContext, tools: &[ToolInfo]) {
pub(crate) fn write_cached_codex_apps_tools(
cache_context: &CodexAppsToolsCacheContext,
tools: &[ToolInfo],
) {
let cache_path = cache_context.cache_path();
if let Some(parent) = cache_path.parent()
&& std::fs::create_dir_all(parent).is_err()
@@ -1709,13 +1157,13 @@ fn filter_disallowed_codex_apps_tools(tools: Vec<ToolInfo>) -> Vec<ToolInfo> {
.collect()
}
fn emit_duration(metric: &str, duration: Duration, tags: &[(&str, &str)]) {
pub(crate) fn emit_duration(metric: &str, duration: Duration, tags: &[(&str, &str)]) {
if let Some(metrics) = codex_otel::global() {
let _ = metrics.record_duration(metric, duration, tags);
}
}
fn transport_origin(transport: &McpServerTransportConfig) -> Option<String> {
pub(crate) fn transport_origin(transport: &McpServerTransportConfig) -> Option<String> {
match transport {
McpServerTransportConfig::StreamableHttp { url, .. } => {
let parsed = Url::parse(url).ok()?;
@@ -1725,7 +1173,7 @@ fn transport_origin(transport: &McpServerTransportConfig) -> Option<String> {
}
}
async fn list_tools_for_client_uncached(
pub(crate) async fn list_tools_for_client_uncached(
server_name: &str,
client: &Arc<RmcpClient>,
timeout: Option<Duration>,
@@ -1788,7 +1236,7 @@ fn validate_mcp_server_name(server_name: &str) -> Result<()> {
Ok(())
}
fn mcp_init_error_display(
pub(crate) fn mcp_init_error_display(
server_name: &str,
entry: Option<&McpAuthStatusEntry>,
err: &StartupOutcomeError,
@@ -1844,7 +1292,7 @@ fn is_mcp_client_startup_timeout_error(error: &StartupOutcomeError) -> bool {
}
}
fn startup_outcome_error_message(error: StartupOutcomeError) -> String {
pub(crate) fn startup_outcome_error_message(error: StartupOutcomeError) -> String {
match error {
StartupOutcomeError::Cancelled => "MCP startup cancelled".to_string(),
StartupOutcomeError::Failed { error } => error,
@@ -1853,7 +1301,3 @@ fn startup_outcome_error_message(error: StartupOutcomeError) -> String {
#[cfg(test)]
mod mcp_init_error_display_tests {}
#[cfg(test)]
#[path = "mcp_connection_manager_tests.rs"]
mod tests;

View File

@@ -1,11 +1,34 @@
use super::*;
use crate::declared_openai_file_input_param_names;
use crate::mcp_connection::CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION;
use crate::mcp_connection::CodexAppsToolsCacheContext;
use crate::mcp_connection::ElicitationRequestManager;
use crate::mcp_connection::ManagedClient;
use crate::mcp_connection::StartupOutcomeError;
use crate::mcp_connection::ToolFilter;
use crate::mcp_connection::ToolInfo;
use crate::mcp_connection::elicitation_capability_for_server;
use crate::mcp_connection::elicitation_is_rejected_by_policy;
use crate::mcp_connection::filter_tools;
use crate::mcp_connection::load_startup_cached_codex_apps_tools_snapshot;
use crate::mcp_connection::read_cached_codex_apps_tools;
use crate::mcp_connection::tool_with_model_visible_input_schema;
use crate::mcp_connection::transport_origin;
use crate::mcp_connection::write_cached_codex_apps_tools;
use codex_config::Constrained;
use codex_protocol::ToolName;
use codex_protocol::protocol::GranularApprovalConfig;
use codex_protocol::protocol::McpAuthStatus;
use futures::FutureExt;
use pretty_assertions::assert_eq;
use rmcp::model::CreateElicitationRequestParams;
use rmcp::model::ElicitationAction;
use rmcp::model::ElicitationCapability;
use rmcp::model::FormElicitationCapability;
use rmcp::model::JsonObject;
use rmcp::model::Meta;
use rmcp::model::NumberOrString;
use rmcp::model::Tool;
use std::collections::HashSet;
use std::sync::Arc;
use tempfile::tempdir;

View File

@@ -8,7 +8,7 @@ use sha1::Sha1;
use tracing::warn;
use crate::mcp::sanitize_responses_api_tool_name;
use crate::mcp_connection_manager::ToolInfo;
use crate::mcp_connection::ToolInfo;
const MCP_TOOL_NAME_DELIMITER: &str = "__";
const MAX_TOOL_NAME_LENGTH: usize = 64;