mirror of
https://github.com/openai/codex.git
synced 2026-05-01 18:06:47 +00:00
Compare commits
3 Commits
automation
...
split-mcp-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9f0d43288 | ||
|
|
b95c8b997a | ||
|
|
c21cc243d2 |
544
codex-rs/codex-mcp/src/client.rs
Normal file
544
codex-rs/codex-mcp/src/client.rs
Normal file
@@ -0,0 +1,544 @@
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::ffi::OsString;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::elicitation::ElicitationRequestManager;
|
||||
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
use crate::mcp::ToolPluginProvenance;
|
||||
use crate::mcp_connection::CachedCodexAppsToolsLoad;
|
||||
use crate::mcp_connection::CodexAppsToolsCacheContext;
|
||||
use crate::mcp_connection::DEFAULT_STARTUP_TIMEOUT;
|
||||
use crate::mcp_connection::DEFAULT_TOOL_TIMEOUT;
|
||||
use crate::mcp_connection::MCP_SANDBOX_STATE_META_CAPABILITY;
|
||||
use crate::mcp_connection::McpRuntimeEnvironment;
|
||||
use crate::mcp_connection::ToolFilter;
|
||||
use crate::mcp_connection::ToolInfo;
|
||||
use crate::mcp_connection::emit_duration;
|
||||
use crate::mcp_connection::filter_disallowed_codex_apps_tools;
|
||||
use crate::mcp_connection::filter_tools;
|
||||
use crate::mcp_connection::load_cached_codex_apps_tools;
|
||||
use crate::mcp_connection::load_startup_cached_codex_apps_tools_snapshot;
|
||||
use crate::mcp_connection::normalize_codex_apps_callable_name;
|
||||
use crate::mcp_connection::normalize_codex_apps_callable_namespace;
|
||||
use crate::mcp_connection::normalize_codex_apps_tool_title;
|
||||
use crate::mcp_connection::resolve_bearer_token;
|
||||
use crate::mcp_connection::tool_with_model_visible_input_schema;
|
||||
use crate::mcp_connection::validate_mcp_server_name;
|
||||
use crate::mcp_connection::write_cached_codex_apps_tools_if_needed;
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use async_channel::Sender;
|
||||
use codex_api::SharedAuthProvider;
|
||||
use codex_async_utils::CancelErr;
|
||||
use codex_async_utils::OrCancelExt;
|
||||
use codex_config::McpServerConfig;
|
||||
use codex_config::McpServerTransportConfig;
|
||||
use codex_config::types::OAuthCredentialsStoreMode;
|
||||
use codex_exec_server::HttpClient;
|
||||
use codex_exec_server::ReqwestHttpClient;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_rmcp_client::ExecutorStdioServerLauncher;
|
||||
use codex_rmcp_client::LocalStdioServerLauncher;
|
||||
use codex_rmcp_client::RmcpClient;
|
||||
use codex_rmcp_client::StdioServerLauncher;
|
||||
use futures::future::BoxFuture;
|
||||
use futures::future::FutureExt;
|
||||
use futures::future::Shared;
|
||||
use rmcp::model::ClientCapabilities;
|
||||
use rmcp::model::ElicitationCapability;
|
||||
use rmcp::model::FormElicitationCapability;
|
||||
use rmcp::model::Implementation;
|
||||
use rmcp::model::InitializeRequestParams;
|
||||
use rmcp::model::ProtocolVersion;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
pub(crate) const MCP_TOOLS_LIST_DURATION_METRIC: &str = "codex.mcp.tools.list.duration_ms";
|
||||
pub(crate) const MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC: &str =
|
||||
"codex.mcp.tools.fetch_uncached.duration_ms";
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ManagedClient {
|
||||
pub(crate) client: Arc<RmcpClient>,
|
||||
pub(crate) tools: Vec<ToolInfo>,
|
||||
pub(crate) tool_filter: ToolFilter,
|
||||
pub(crate) tool_timeout: Option<Duration>,
|
||||
pub(crate) server_instructions: Option<String>,
|
||||
pub(crate) server_supports_sandbox_state_meta_capability: bool,
|
||||
pub(crate) codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
|
||||
}
|
||||
|
||||
impl ManagedClient {
|
||||
fn listed_tools(&self) -> Vec<ToolInfo> {
|
||||
let total_start = Instant::now();
|
||||
if let Some(cache_context) = self.codex_apps_tools_cache_context.as_ref()
|
||||
&& let CachedCodexAppsToolsLoad::Hit(tools) =
|
||||
load_cached_codex_apps_tools(cache_context)
|
||||
{
|
||||
emit_duration(
|
||||
MCP_TOOLS_LIST_DURATION_METRIC,
|
||||
total_start.elapsed(),
|
||||
&[("cache", "hit")],
|
||||
);
|
||||
return filter_tools(tools, &self.tool_filter);
|
||||
}
|
||||
|
||||
if self.codex_apps_tools_cache_context.is_some() {
|
||||
emit_duration(
|
||||
MCP_TOOLS_LIST_DURATION_METRIC,
|
||||
total_start.elapsed(),
|
||||
&[("cache", "miss")],
|
||||
);
|
||||
}
|
||||
|
||||
self.tools.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct AsyncManagedClient {
|
||||
pub(crate) client: Shared<BoxFuture<'static, Result<ManagedClient, StartupOutcomeError>>>,
|
||||
pub(crate) startup_snapshot: Option<Vec<ToolInfo>>,
|
||||
pub(crate) startup_complete: Arc<AtomicBool>,
|
||||
pub(crate) tool_plugin_provenance: Arc<ToolPluginProvenance>,
|
||||
}
|
||||
|
||||
impl AsyncManagedClient {
|
||||
// Keep this constructor flat so the startup inputs remain readable at the
|
||||
// single call site instead of introducing a one-off params wrapper.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn new(
|
||||
server_name: String,
|
||||
config: McpServerConfig,
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
cancel_token: CancellationToken,
|
||||
tx_event: Sender<Event>,
|
||||
elicitation_requests: ElicitationRequestManager,
|
||||
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
|
||||
tool_plugin_provenance: Arc<ToolPluginProvenance>,
|
||||
runtime_environment: McpRuntimeEnvironment,
|
||||
runtime_auth_provider: Option<SharedAuthProvider>,
|
||||
) -> Self {
|
||||
let tool_filter = ToolFilter::from_config(&config);
|
||||
let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot(
|
||||
&server_name,
|
||||
codex_apps_tools_cache_context.as_ref(),
|
||||
)
|
||||
.map(|tools| filter_tools(tools, &tool_filter));
|
||||
let startup_tool_filter = tool_filter;
|
||||
let startup_complete = Arc::new(AtomicBool::new(false));
|
||||
let startup_complete_for_fut = Arc::clone(&startup_complete);
|
||||
let fut = async move {
|
||||
let outcome = async {
|
||||
if let Err(error) = validate_mcp_server_name(&server_name) {
|
||||
return Err(error.into());
|
||||
}
|
||||
|
||||
let client = Arc::new(
|
||||
make_rmcp_client(
|
||||
&server_name,
|
||||
config.clone(),
|
||||
store_mode,
|
||||
runtime_environment,
|
||||
runtime_auth_provider,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
match start_server_task(
|
||||
server_name,
|
||||
client,
|
||||
StartServerTaskParams {
|
||||
startup_timeout: config
|
||||
.startup_timeout_sec
|
||||
.or(Some(DEFAULT_STARTUP_TIMEOUT)),
|
||||
tool_timeout: config.tool_timeout_sec.unwrap_or(DEFAULT_TOOL_TIMEOUT),
|
||||
tool_filter: startup_tool_filter,
|
||||
tx_event,
|
||||
elicitation_requests,
|
||||
codex_apps_tools_cache_context,
|
||||
},
|
||||
)
|
||||
.or_cancel(&cancel_token)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(CancelErr::Cancelled) => Err(StartupOutcomeError::Cancelled),
|
||||
}
|
||||
}
|
||||
.await;
|
||||
|
||||
startup_complete_for_fut.store(true, Ordering::Release);
|
||||
outcome
|
||||
};
|
||||
let client = fut.boxed().shared();
|
||||
if startup_snapshot.is_some() {
|
||||
let startup_task = client.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = startup_task.await;
|
||||
});
|
||||
}
|
||||
|
||||
Self {
|
||||
client,
|
||||
startup_snapshot,
|
||||
startup_complete,
|
||||
tool_plugin_provenance,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn client(&self) -> Result<ManagedClient, StartupOutcomeError> {
|
||||
self.client.clone().await
|
||||
}
|
||||
|
||||
fn startup_snapshot_while_initializing(&self) -> Option<Vec<ToolInfo>> {
|
||||
if !self.startup_complete.load(Ordering::Acquire) {
|
||||
return self.startup_snapshot.clone();
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub(crate) async fn listed_tools(&self) -> Option<Vec<ToolInfo>> {
|
||||
let annotate_tools = |tools: Vec<ToolInfo>| {
|
||||
let mut tools = tools;
|
||||
for tool in &mut tools {
|
||||
if tool.server_name == CODEX_APPS_MCP_SERVER_NAME {
|
||||
tool.tool = tool_with_model_visible_input_schema(&tool.tool);
|
||||
}
|
||||
|
||||
let plugin_names = match tool.connector_id.as_deref() {
|
||||
Some(connector_id) => self
|
||||
.tool_plugin_provenance
|
||||
.plugin_display_names_for_connector_id(connector_id),
|
||||
None => self
|
||||
.tool_plugin_provenance
|
||||
.plugin_display_names_for_mcp_server_name(tool.server_name.as_str()),
|
||||
};
|
||||
tool.plugin_display_names = plugin_names.to_vec();
|
||||
|
||||
if plugin_names.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let plugin_source_note = if plugin_names.len() == 1 {
|
||||
format!("This tool is part of plugin `{}`.", plugin_names[0])
|
||||
} else {
|
||||
format!(
|
||||
"This tool is part of plugins {}.",
|
||||
plugin_names
|
||||
.iter()
|
||||
.map(|plugin_name| format!("`{plugin_name}`"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ")
|
||||
)
|
||||
};
|
||||
let description = tool
|
||||
.tool
|
||||
.description
|
||||
.as_deref()
|
||||
.map(str::trim)
|
||||
.unwrap_or("");
|
||||
let annotated_description = if description.is_empty() {
|
||||
plugin_source_note
|
||||
} else if matches!(description.chars().last(), Some('.' | '!' | '?')) {
|
||||
format!("{description} {plugin_source_note}")
|
||||
} else {
|
||||
format!("{description}. {plugin_source_note}")
|
||||
};
|
||||
tool.tool.description = Some(Cow::Owned(annotated_description));
|
||||
}
|
||||
tools
|
||||
};
|
||||
|
||||
// Keep cache payloads raw; plugin provenance is resolved per-session at read time.
|
||||
let tools = if let Some(startup_tools) = self.startup_snapshot_while_initializing() {
|
||||
Some(startup_tools)
|
||||
} else {
|
||||
match self.client().await {
|
||||
Ok(client) => Some(client.listed_tools()),
|
||||
Err(_) => self.startup_snapshot.clone(),
|
||||
}
|
||||
};
|
||||
tools.map(annotate_tools)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, thiserror::Error)]
|
||||
pub(crate) enum StartupOutcomeError {
|
||||
#[error("MCP startup cancelled")]
|
||||
Cancelled,
|
||||
// We can't store the original error here because anyhow::Error doesn't implement
|
||||
// `Clone`.
|
||||
#[error("MCP startup failed: {error}")]
|
||||
Failed { error: String },
|
||||
}
|
||||
|
||||
impl From<anyhow::Error> for StartupOutcomeError {
|
||||
fn from(error: anyhow::Error) -> Self {
|
||||
Self::Failed {
|
||||
error: error.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn elicitation_capability_for_server(
|
||||
_server_name: &str,
|
||||
) -> Option<ElicitationCapability> {
|
||||
// https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation#capabilities
|
||||
// indicates this should be an empty object.
|
||||
Some(ElicitationCapability {
|
||||
form: Some(FormElicitationCapability {
|
||||
schema_validation: None,
|
||||
}),
|
||||
url: None,
|
||||
})
|
||||
}
|
||||
|
||||
async fn start_server_task(
|
||||
server_name: String,
|
||||
client: Arc<RmcpClient>,
|
||||
params: StartServerTaskParams,
|
||||
) -> Result<ManagedClient, StartupOutcomeError> {
|
||||
let StartServerTaskParams {
|
||||
startup_timeout,
|
||||
tool_timeout,
|
||||
tool_filter,
|
||||
tx_event,
|
||||
elicitation_requests,
|
||||
codex_apps_tools_cache_context,
|
||||
} = params;
|
||||
let elicitation = elicitation_capability_for_server(&server_name);
|
||||
let params = InitializeRequestParams {
|
||||
meta: None,
|
||||
capabilities: ClientCapabilities {
|
||||
experimental: None,
|
||||
extensions: None,
|
||||
roots: None,
|
||||
sampling: None,
|
||||
elicitation,
|
||||
tasks: None,
|
||||
},
|
||||
client_info: Implementation {
|
||||
name: "codex-mcp-client".to_owned(),
|
||||
version: env!("CARGO_PKG_VERSION").to_owned(),
|
||||
title: Some("Codex".into()),
|
||||
description: None,
|
||||
icons: None,
|
||||
website_url: None,
|
||||
},
|
||||
protocol_version: ProtocolVersion::V_2025_06_18,
|
||||
};
|
||||
|
||||
let send_elicitation = elicitation_requests.make_sender(server_name.clone(), tx_event);
|
||||
|
||||
let initialize_result = client
|
||||
.initialize(params, startup_timeout, send_elicitation)
|
||||
.await
|
||||
.map_err(StartupOutcomeError::from)?;
|
||||
|
||||
let server_supports_sandbox_state_meta_capability = initialize_result
|
||||
.capabilities
|
||||
.experimental
|
||||
.as_ref()
|
||||
.and_then(|exp| exp.get(MCP_SANDBOX_STATE_META_CAPABILITY))
|
||||
.is_some();
|
||||
let list_start = Instant::now();
|
||||
let fetch_start = Instant::now();
|
||||
let tools = list_tools_for_client_uncached(
|
||||
&server_name,
|
||||
&client,
|
||||
startup_timeout,
|
||||
initialize_result.instructions.as_deref(),
|
||||
)
|
||||
.await
|
||||
.map_err(StartupOutcomeError::from)?;
|
||||
emit_duration(
|
||||
MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC,
|
||||
fetch_start.elapsed(),
|
||||
&[],
|
||||
);
|
||||
write_cached_codex_apps_tools_if_needed(
|
||||
&server_name,
|
||||
codex_apps_tools_cache_context.as_ref(),
|
||||
&tools,
|
||||
);
|
||||
if server_name == CODEX_APPS_MCP_SERVER_NAME {
|
||||
emit_duration(
|
||||
MCP_TOOLS_LIST_DURATION_METRIC,
|
||||
list_start.elapsed(),
|
||||
&[("cache", "miss")],
|
||||
);
|
||||
}
|
||||
let tools = filter_tools(tools, &tool_filter);
|
||||
|
||||
let managed = ManagedClient {
|
||||
client: Arc::clone(&client),
|
||||
tools,
|
||||
tool_timeout: Some(tool_timeout),
|
||||
tool_filter,
|
||||
server_instructions: initialize_result.instructions,
|
||||
server_supports_sandbox_state_meta_capability,
|
||||
codex_apps_tools_cache_context,
|
||||
};
|
||||
|
||||
Ok(managed)
|
||||
}
|
||||
|
||||
struct StartServerTaskParams {
|
||||
startup_timeout: Option<Duration>, // TODO: cancel_token should handle this.
|
||||
tool_timeout: Duration,
|
||||
tool_filter: ToolFilter,
|
||||
tx_event: Sender<Event>,
|
||||
elicitation_requests: ElicitationRequestManager,
|
||||
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
|
||||
}
|
||||
|
||||
async fn make_rmcp_client(
|
||||
server_name: &str,
|
||||
config: McpServerConfig,
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
runtime_environment: McpRuntimeEnvironment,
|
||||
runtime_auth_provider: Option<SharedAuthProvider>,
|
||||
) -> Result<RmcpClient, StartupOutcomeError> {
|
||||
let McpServerConfig {
|
||||
transport,
|
||||
experimental_environment,
|
||||
..
|
||||
} = config;
|
||||
let remote_environment = match experimental_environment.as_deref() {
|
||||
None | Some("local") => false,
|
||||
Some("remote") => {
|
||||
if !runtime_environment.environment().is_remote() {
|
||||
return Err(StartupOutcomeError::from(anyhow!(
|
||||
"remote MCP server `{server_name}` requires a remote environment"
|
||||
)));
|
||||
}
|
||||
true
|
||||
}
|
||||
Some(environment) => {
|
||||
return Err(StartupOutcomeError::from(anyhow!(
|
||||
"unsupported experimental_environment `{environment}` for MCP server `{server_name}`"
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
match transport {
|
||||
McpServerTransportConfig::Stdio {
|
||||
command,
|
||||
args,
|
||||
env,
|
||||
env_vars,
|
||||
cwd,
|
||||
} => {
|
||||
let command_os: OsString = command.into();
|
||||
let args_os: Vec<OsString> = args.into_iter().map(Into::into).collect();
|
||||
let env_os = env.map(|env| {
|
||||
env.into_iter()
|
||||
.map(|(key, value)| (key.into(), value.into()))
|
||||
.collect::<HashMap<_, _>>()
|
||||
});
|
||||
let launcher = if remote_environment {
|
||||
Arc::new(ExecutorStdioServerLauncher::new(
|
||||
runtime_environment.environment().get_exec_backend(),
|
||||
runtime_environment.fallback_cwd(),
|
||||
))
|
||||
} else {
|
||||
Arc::new(LocalStdioServerLauncher::new(
|
||||
runtime_environment.fallback_cwd(),
|
||||
)) as Arc<dyn StdioServerLauncher>
|
||||
};
|
||||
|
||||
// `RmcpClient` always sees a launched MCP stdio server. The
|
||||
// launcher hides whether that means a local child process or an
|
||||
// executor process whose stdin/stdout bytes cross the process API.
|
||||
RmcpClient::new_stdio_client(command_os, args_os, env_os, &env_vars, cwd, launcher)
|
||||
.await
|
||||
.map_err(|err| StartupOutcomeError::from(anyhow!(err)))
|
||||
}
|
||||
McpServerTransportConfig::StreamableHttp {
|
||||
url,
|
||||
http_headers,
|
||||
env_http_headers,
|
||||
bearer_token_env_var,
|
||||
} => {
|
||||
let http_client: Arc<dyn HttpClient> = if remote_environment {
|
||||
runtime_environment.environment().get_http_client()
|
||||
} else {
|
||||
Arc::new(ReqwestHttpClient)
|
||||
};
|
||||
let resolved_bearer_token =
|
||||
match resolve_bearer_token(server_name, bearer_token_env_var.as_deref()) {
|
||||
Ok(token) => token,
|
||||
Err(error) => return Err(error.into()),
|
||||
};
|
||||
RmcpClient::new_streamable_http_client(
|
||||
server_name,
|
||||
&url,
|
||||
resolved_bearer_token,
|
||||
http_headers,
|
||||
env_http_headers,
|
||||
store_mode,
|
||||
http_client,
|
||||
runtime_auth_provider,
|
||||
)
|
||||
.await
|
||||
.map_err(StartupOutcomeError::from)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn list_tools_for_client_uncached(
|
||||
server_name: &str,
|
||||
client: &Arc<RmcpClient>,
|
||||
timeout: Option<Duration>,
|
||||
server_instructions: Option<&str>,
|
||||
) -> Result<Vec<ToolInfo>> {
|
||||
let resp = client
|
||||
.list_tools_with_connector_ids(/*params*/ None, timeout)
|
||||
.await?;
|
||||
let tools = resp
|
||||
.tools
|
||||
.into_iter()
|
||||
.map(|tool| {
|
||||
let callable_name = normalize_codex_apps_callable_name(
|
||||
server_name,
|
||||
&tool.tool.name,
|
||||
tool.connector_id.as_deref(),
|
||||
tool.connector_name.as_deref(),
|
||||
);
|
||||
let callable_namespace = normalize_codex_apps_callable_namespace(
|
||||
server_name,
|
||||
tool.connector_name.as_deref(),
|
||||
);
|
||||
let connector_name = tool.connector_name;
|
||||
let connector_description = tool.connector_description;
|
||||
let mut tool_def = tool.tool;
|
||||
if let Some(title) = tool_def.title.as_deref() {
|
||||
let normalized_title =
|
||||
normalize_codex_apps_tool_title(server_name, connector_name.as_deref(), title);
|
||||
if tool_def.title.as_deref() != Some(normalized_title.as_str()) {
|
||||
tool_def.title = Some(normalized_title);
|
||||
}
|
||||
}
|
||||
ToolInfo {
|
||||
server_name: server_name.to_owned(),
|
||||
callable_name,
|
||||
callable_namespace,
|
||||
server_instructions: server_instructions.map(str::to_string),
|
||||
tool: tool_def,
|
||||
connector_id: tool.connector_id,
|
||||
connector_name,
|
||||
plugin_display_names: Vec::new(),
|
||||
connector_description,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
if server_name == CODEX_APPS_MCP_SERVER_NAME {
|
||||
return Ok(filter_disallowed_codex_apps_tools(tools));
|
||||
}
|
||||
Ok(tools)
|
||||
}
|
||||
180
codex-rs/codex-mcp/src/elicitation.rs
Normal file
180
codex-rs/codex-mcp/src/elicitation.rs
Normal file
@@ -0,0 +1,180 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
|
||||
use crate::mcp::mcp_permission_prompt_is_auto_approved;
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use async_channel::Sender;
|
||||
use codex_protocol::approvals::ElicitationRequest;
|
||||
use codex_protocol::approvals::ElicitationRequestEvent;
|
||||
use codex_protocol::mcp::RequestId as ProtocolRequestId;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_rmcp_client::ElicitationResponse;
|
||||
use codex_rmcp_client::SendElicitation;
|
||||
use futures::future::FutureExt;
|
||||
use rmcp::model::CreateElicitationRequestParams;
|
||||
use rmcp::model::ElicitationAction;
|
||||
use rmcp::model::RequestId;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
type ResponderMap = HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>;
|
||||
|
||||
pub(crate) fn elicitation_is_rejected_by_policy(approval_policy: AskForApproval) -> bool {
|
||||
match approval_policy {
|
||||
AskForApproval::Never => true,
|
||||
AskForApproval::OnFailure => false,
|
||||
AskForApproval::OnRequest => false,
|
||||
AskForApproval::UnlessTrusted => false,
|
||||
AskForApproval::Granular(granular_config) => !granular_config.allows_mcp_elicitations(),
|
||||
}
|
||||
}
|
||||
|
||||
fn can_auto_accept_elicitation(elicitation: &CreateElicitationRequestParams) -> bool {
|
||||
match elicitation {
|
||||
CreateElicitationRequestParams::FormElicitationParams {
|
||||
requested_schema, ..
|
||||
} => {
|
||||
// Auto-accept confirm/approval elicitations without schema requirements.
|
||||
requested_schema.properties.is_empty()
|
||||
}
|
||||
CreateElicitationRequestParams::UrlElicitationParams { .. } => false,
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ElicitationRequestManager {
|
||||
requests: Arc<Mutex<ResponderMap>>,
|
||||
pub(crate) approval_policy: Arc<StdMutex<AskForApproval>>,
|
||||
pub(crate) sandbox_policy: Arc<StdMutex<SandboxPolicy>>,
|
||||
}
|
||||
|
||||
impl ElicitationRequestManager {
|
||||
pub(crate) fn new(approval_policy: AskForApproval, sandbox_policy: SandboxPolicy) -> Self {
|
||||
Self {
|
||||
requests: Arc::new(Mutex::new(HashMap::new())),
|
||||
approval_policy: Arc::new(StdMutex::new(approval_policy)),
|
||||
sandbox_policy: Arc::new(StdMutex::new(sandbox_policy)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn resolve(
|
||||
&self,
|
||||
server_name: String,
|
||||
id: RequestId,
|
||||
response: ElicitationResponse,
|
||||
) -> Result<()> {
|
||||
self.requests
|
||||
.lock()
|
||||
.await
|
||||
.remove(&(server_name, id))
|
||||
.ok_or_else(|| anyhow!("elicitation request not found"))?
|
||||
.send(response)
|
||||
.map_err(|e| anyhow!("failed to send elicitation response: {e:?}"))
|
||||
}
|
||||
|
||||
pub(crate) fn make_sender(
|
||||
&self,
|
||||
server_name: String,
|
||||
tx_event: Sender<Event>,
|
||||
) -> SendElicitation {
|
||||
let elicitation_requests = self.requests.clone();
|
||||
let approval_policy = self.approval_policy.clone();
|
||||
let sandbox_policy = self.sandbox_policy.clone();
|
||||
Box::new(move |id, elicitation| {
|
||||
let elicitation_requests = elicitation_requests.clone();
|
||||
let tx_event = tx_event.clone();
|
||||
let server_name = server_name.clone();
|
||||
let approval_policy = approval_policy.clone();
|
||||
let sandbox_policy = sandbox_policy.clone();
|
||||
async move {
|
||||
let approval_policy = approval_policy
|
||||
.lock()
|
||||
.map(|policy| *policy)
|
||||
.unwrap_or(AskForApproval::Never);
|
||||
let sandbox_policy = sandbox_policy
|
||||
.lock()
|
||||
.map(|policy| policy.clone())
|
||||
.unwrap_or_else(|_| SandboxPolicy::new_read_only_policy());
|
||||
if mcp_permission_prompt_is_auto_approved(approval_policy, &sandbox_policy)
|
||||
&& can_auto_accept_elicitation(&elicitation)
|
||||
{
|
||||
return Ok(ElicitationResponse {
|
||||
action: ElicitationAction::Accept,
|
||||
content: Some(serde_json::json!({})),
|
||||
meta: None,
|
||||
});
|
||||
}
|
||||
|
||||
if elicitation_is_rejected_by_policy(approval_policy) {
|
||||
return Ok(ElicitationResponse {
|
||||
action: ElicitationAction::Decline,
|
||||
content: None,
|
||||
meta: None,
|
||||
});
|
||||
}
|
||||
|
||||
let request = match elicitation {
|
||||
CreateElicitationRequestParams::FormElicitationParams {
|
||||
meta,
|
||||
message,
|
||||
requested_schema,
|
||||
} => ElicitationRequest::Form {
|
||||
meta: meta
|
||||
.map(serde_json::to_value)
|
||||
.transpose()
|
||||
.context("failed to serialize MCP elicitation metadata")?,
|
||||
message,
|
||||
requested_schema: serde_json::to_value(requested_schema)
|
||||
.context("failed to serialize MCP elicitation schema")?,
|
||||
},
|
||||
CreateElicitationRequestParams::UrlElicitationParams {
|
||||
meta,
|
||||
message,
|
||||
url,
|
||||
elicitation_id,
|
||||
} => ElicitationRequest::Url {
|
||||
meta: meta
|
||||
.map(serde_json::to_value)
|
||||
.transpose()
|
||||
.context("failed to serialize MCP elicitation metadata")?,
|
||||
message,
|
||||
url,
|
||||
elicitation_id,
|
||||
},
|
||||
};
|
||||
let (tx, rx) = oneshot::channel();
|
||||
{
|
||||
let mut lock = elicitation_requests.lock().await;
|
||||
lock.insert((server_name.clone(), id.clone()), tx);
|
||||
}
|
||||
let _ = tx_event
|
||||
.send(Event {
|
||||
id: "mcp_elicitation_request".to_string(),
|
||||
msg: EventMsg::ElicitationRequest(ElicitationRequestEvent {
|
||||
turn_id: None,
|
||||
server_name,
|
||||
id: match id.clone() {
|
||||
rmcp::model::NumberOrString::String(value) => {
|
||||
ProtocolRequestId::String(value.to_string())
|
||||
}
|
||||
rmcp::model::NumberOrString::Number(value) => {
|
||||
ProtocolRequestId::Integer(value)
|
||||
}
|
||||
},
|
||||
request,
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
rx.await
|
||||
.context("elicitation request channel closed unexpectedly")
|
||||
}
|
||||
.boxed()
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,15 +1,15 @@
|
||||
pub use mcp_connection_manager::MCP_SANDBOX_STATE_META_CAPABILITY;
|
||||
pub use mcp_connection_manager::McpConnectionManager;
|
||||
pub use mcp_connection_manager::McpRuntimeEnvironment;
|
||||
pub use mcp_connection_manager::SandboxState;
|
||||
pub use mcp_connection_manager::ToolInfo;
|
||||
pub use manager::McpConnectionManager;
|
||||
pub use mcp_connection::MCP_SANDBOX_STATE_META_CAPABILITY;
|
||||
pub use mcp_connection::McpRuntimeEnvironment;
|
||||
pub use mcp_connection::SandboxState;
|
||||
pub use mcp_connection::ToolInfo;
|
||||
|
||||
pub use mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
pub use mcp::McpConfig;
|
||||
pub use mcp::ToolPluginProvenance;
|
||||
|
||||
pub use mcp_connection_manager::CodexAppsToolsCacheKey;
|
||||
pub use mcp_connection_manager::codex_apps_tools_cache_key;
|
||||
pub use mcp_connection::CodexAppsToolsCacheKey;
|
||||
pub use mcp_connection::codex_apps_tools_cache_key;
|
||||
|
||||
pub use mcp::configured_mcp_servers;
|
||||
pub use mcp::effective_mcp_servers;
|
||||
@@ -35,9 +35,12 @@ pub use mcp::should_retry_without_scopes;
|
||||
|
||||
pub use mcp::mcp_permission_prompt_is_auto_approved;
|
||||
pub use mcp::qualified_mcp_tool_name_prefix;
|
||||
pub use mcp_connection_manager::declared_openai_file_input_param_names;
|
||||
pub use mcp_connection_manager::filter_non_codex_apps_mcp_tools_only;
|
||||
pub use mcp_connection::declared_openai_file_input_param_names;
|
||||
pub use mcp_connection::filter_non_codex_apps_mcp_tools_only;
|
||||
|
||||
pub(crate) mod client;
|
||||
pub(crate) mod elicitation;
|
||||
pub(crate) mod manager;
|
||||
pub(crate) mod mcp;
|
||||
pub(crate) mod mcp_connection_manager;
|
||||
pub(crate) mod mcp_connection;
|
||||
pub(crate) mod mcp_tool_names;
|
||||
|
||||
608
codex-rs/codex-mcp/src/manager.rs
Normal file
608
codex-rs/codex-mcp/src/manager.rs
Normal file
@@ -0,0 +1,608 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::McpAuthStatusEntry;
|
||||
use crate::client::AsyncManagedClient;
|
||||
use crate::client::MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC;
|
||||
use crate::client::MCP_TOOLS_LIST_DURATION_METRIC;
|
||||
use crate::client::ManagedClient;
|
||||
use crate::client::StartupOutcomeError;
|
||||
use crate::client::list_tools_for_client_uncached;
|
||||
use crate::elicitation::ElicitationRequestManager;
|
||||
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
use crate::mcp::ToolPluginProvenance;
|
||||
use crate::mcp_connection::CodexAppsToolsCacheContext;
|
||||
use crate::mcp_connection::CodexAppsToolsCacheKey;
|
||||
use crate::mcp_connection::McpRuntimeEnvironment;
|
||||
use crate::mcp_connection::ToolInfo;
|
||||
use crate::mcp_connection::emit_duration;
|
||||
use crate::mcp_connection::emit_update;
|
||||
use crate::mcp_connection::filter_tools;
|
||||
use crate::mcp_connection::mcp_init_error_display;
|
||||
use crate::mcp_connection::qualify_tools;
|
||||
use crate::mcp_connection::startup_outcome_error_message;
|
||||
use crate::mcp_connection::tool_with_model_visible_input_schema;
|
||||
use crate::mcp_connection::transport_origin;
|
||||
use crate::mcp_connection::write_cached_codex_apps_tools_if_needed;
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use async_channel::Sender;
|
||||
use codex_config::Constrained;
|
||||
use codex_config::McpServerConfig;
|
||||
use codex_config::McpServerTransportConfig;
|
||||
use codex_config::types::OAuthCredentialsStoreMode;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_protocol::ToolName;
|
||||
use codex_protocol::mcp::CallToolResult;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::McpStartupCompleteEvent;
|
||||
use codex_protocol::protocol::McpStartupFailure;
|
||||
use codex_protocol::protocol::McpStartupStatus;
|
||||
use codex_protocol::protocol::McpStartupUpdateEvent;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_rmcp_client::ElicitationResponse;
|
||||
use rmcp::model::ListResourceTemplatesResult;
|
||||
use rmcp::model::ListResourcesResult;
|
||||
use rmcp::model::PaginatedRequestParams;
|
||||
use rmcp::model::ReadResourceRequestParams;
|
||||
use rmcp::model::ReadResourceResult;
|
||||
use rmcp::model::RequestId;
|
||||
use rmcp::model::Resource;
|
||||
use rmcp::model::ResourceTemplate;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::instrument;
|
||||
use tracing::warn;
|
||||
|
||||
/// A thin wrapper around a set of running [`RmcpClient`] instances.
|
||||
pub struct McpConnectionManager {
|
||||
clients: HashMap<String, AsyncManagedClient>,
|
||||
server_origins: HashMap<String, String>,
|
||||
elicitation_requests: ElicitationRequestManager,
|
||||
}
|
||||
|
||||
impl McpConnectionManager {
|
||||
pub fn new_uninitialized(
|
||||
approval_policy: &Constrained<AskForApproval>,
|
||||
sandbox_policy: &Constrained<SandboxPolicy>,
|
||||
) -> Self {
|
||||
Self {
|
||||
clients: HashMap::new(),
|
||||
server_origins: HashMap::new(),
|
||||
elicitation_requests: ElicitationRequestManager::new(
|
||||
approval_policy.value(),
|
||||
sandbox_policy.get().clone(),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn has_servers(&self) -> bool {
|
||||
!self.clients.is_empty()
|
||||
}
|
||||
|
||||
pub fn server_origin(&self, server_name: &str) -> Option<&str> {
|
||||
self.server_origins.get(server_name).map(String::as_str)
|
||||
}
|
||||
|
||||
pub fn set_approval_policy(&self, approval_policy: &Constrained<AskForApproval>) {
|
||||
if let Ok(mut policy) = self.elicitation_requests.approval_policy.lock() {
|
||||
*policy = approval_policy.value();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_sandbox_policy(&self, sandbox_policy: &SandboxPolicy) {
|
||||
if let Ok(mut policy) = self.elicitation_requests.sandbox_policy.lock() {
|
||||
*policy = sandbox_policy.clone();
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
|
||||
pub async fn new(
|
||||
mcp_servers: &HashMap<String, McpServerConfig>,
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
auth_entries: HashMap<String, McpAuthStatusEntry>,
|
||||
approval_policy: &Constrained<AskForApproval>,
|
||||
submit_id: String,
|
||||
tx_event: Sender<Event>,
|
||||
initial_sandbox_policy: SandboxPolicy,
|
||||
runtime_environment: McpRuntimeEnvironment,
|
||||
codex_home: PathBuf,
|
||||
codex_apps_tools_cache_key: CodexAppsToolsCacheKey,
|
||||
tool_plugin_provenance: ToolPluginProvenance,
|
||||
auth: Option<&CodexAuth>,
|
||||
) -> (Self, CancellationToken) {
|
||||
let cancel_token = CancellationToken::new();
|
||||
let mut clients = HashMap::new();
|
||||
let mut server_origins = HashMap::new();
|
||||
let mut join_set = JoinSet::new();
|
||||
let elicitation_requests =
|
||||
ElicitationRequestManager::new(approval_policy.value(), initial_sandbox_policy);
|
||||
let tool_plugin_provenance = Arc::new(tool_plugin_provenance);
|
||||
let startup_submit_id = submit_id.clone();
|
||||
let codex_apps_auth_provider = auth
|
||||
.filter(|auth| auth.uses_codex_backend())
|
||||
.map(codex_model_provider::auth_provider_from_auth);
|
||||
let mcp_servers = mcp_servers.clone();
|
||||
for (server_name, cfg) in mcp_servers.into_iter().filter(|(_, cfg)| cfg.enabled) {
|
||||
if let Some(origin) = transport_origin(&cfg.transport) {
|
||||
server_origins.insert(server_name.clone(), origin);
|
||||
}
|
||||
let cancel_token = cancel_token.child_token();
|
||||
let _ = emit_update(
|
||||
startup_submit_id.as_str(),
|
||||
&tx_event,
|
||||
McpStartupUpdateEvent {
|
||||
server: server_name.clone(),
|
||||
status: McpStartupStatus::Starting,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
let codex_apps_tools_cache_context = if server_name == CODEX_APPS_MCP_SERVER_NAME {
|
||||
Some(CodexAppsToolsCacheContext {
|
||||
codex_home: codex_home.clone(),
|
||||
user_key: codex_apps_tools_cache_key.clone(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let uses_env_bearer_token = match &cfg.transport {
|
||||
McpServerTransportConfig::StreamableHttp {
|
||||
bearer_token_env_var,
|
||||
..
|
||||
} => bearer_token_env_var.is_some(),
|
||||
McpServerTransportConfig::Stdio { .. } => false,
|
||||
};
|
||||
let runtime_auth_provider =
|
||||
if server_name == CODEX_APPS_MCP_SERVER_NAME && !uses_env_bearer_token {
|
||||
codex_apps_auth_provider.clone()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let async_managed_client = AsyncManagedClient::new(
|
||||
server_name.clone(),
|
||||
cfg,
|
||||
store_mode,
|
||||
cancel_token.clone(),
|
||||
tx_event.clone(),
|
||||
elicitation_requests.clone(),
|
||||
codex_apps_tools_cache_context,
|
||||
Arc::clone(&tool_plugin_provenance),
|
||||
runtime_environment.clone(),
|
||||
runtime_auth_provider,
|
||||
);
|
||||
clients.insert(server_name.clone(), async_managed_client.clone());
|
||||
let tx_event = tx_event.clone();
|
||||
let submit_id = startup_submit_id.clone();
|
||||
let auth_entry = auth_entries.get(&server_name).cloned();
|
||||
join_set.spawn(async move {
|
||||
let mut outcome = async_managed_client.client().await;
|
||||
if cancel_token.is_cancelled() {
|
||||
outcome = Err(StartupOutcomeError::Cancelled);
|
||||
}
|
||||
let status = match &outcome {
|
||||
Ok(_) => McpStartupStatus::Ready,
|
||||
Err(StartupOutcomeError::Cancelled) => McpStartupStatus::Cancelled,
|
||||
Err(error) => {
|
||||
let error_str = mcp_init_error_display(
|
||||
server_name.as_str(),
|
||||
auth_entry.as_ref(),
|
||||
error,
|
||||
);
|
||||
McpStartupStatus::Failed { error: error_str }
|
||||
}
|
||||
};
|
||||
|
||||
let _ = emit_update(
|
||||
submit_id.as_str(),
|
||||
&tx_event,
|
||||
McpStartupUpdateEvent {
|
||||
server: server_name.clone(),
|
||||
status,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
(server_name, outcome)
|
||||
});
|
||||
}
|
||||
let manager = Self {
|
||||
clients,
|
||||
server_origins,
|
||||
elicitation_requests: elicitation_requests.clone(),
|
||||
};
|
||||
tokio::spawn(async move {
|
||||
let outcomes = join_set.join_all().await;
|
||||
let mut summary = McpStartupCompleteEvent::default();
|
||||
for (server_name, outcome) in outcomes {
|
||||
match outcome {
|
||||
Ok(_) => summary.ready.push(server_name),
|
||||
Err(StartupOutcomeError::Cancelled) => summary.cancelled.push(server_name),
|
||||
Err(StartupOutcomeError::Failed { error }) => {
|
||||
summary.failed.push(McpStartupFailure {
|
||||
server: server_name,
|
||||
error,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
let _ = tx_event
|
||||
.send(Event {
|
||||
id: startup_submit_id,
|
||||
msg: EventMsg::McpStartupComplete(summary),
|
||||
})
|
||||
.await;
|
||||
});
|
||||
(manager, cancel_token)
|
||||
}
|
||||
|
||||
pub async fn resolve_elicitation(
|
||||
&self,
|
||||
server_name: String,
|
||||
id: RequestId,
|
||||
response: ElicitationResponse,
|
||||
) -> Result<()> {
|
||||
self.elicitation_requests
|
||||
.resolve(server_name, id, response)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn wait_for_server_ready(&self, server_name: &str, timeout: Duration) -> bool {
|
||||
let Some(async_managed_client) = self.clients.get(server_name) else {
|
||||
return false;
|
||||
};
|
||||
|
||||
match tokio::time::timeout(timeout, async_managed_client.client()).await {
|
||||
Ok(Ok(_)) => true,
|
||||
Ok(Err(_)) | Err(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn required_startup_failures(
|
||||
&self,
|
||||
required_servers: &[String],
|
||||
) -> Vec<McpStartupFailure> {
|
||||
let mut failures = Vec::new();
|
||||
for server_name in required_servers {
|
||||
let Some(async_managed_client) = self.clients.get(server_name).cloned() else {
|
||||
failures.push(McpStartupFailure {
|
||||
server: server_name.clone(),
|
||||
error: format!("required MCP server `{server_name}` was not initialized"),
|
||||
});
|
||||
continue;
|
||||
};
|
||||
|
||||
match async_managed_client.client().await {
|
||||
Ok(_) => {}
|
||||
Err(error) => failures.push(McpStartupFailure {
|
||||
server: server_name.clone(),
|
||||
error: startup_outcome_error_message(error),
|
||||
}),
|
||||
}
|
||||
}
|
||||
failures
|
||||
}
|
||||
|
||||
/// Returns a single map that contains all tools. Each key is the
|
||||
/// fully-qualified name for the tool.
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
pub async fn list_all_tools(&self) -> HashMap<String, ToolInfo> {
|
||||
let mut tools = Vec::new();
|
||||
for managed_client in self.clients.values() {
|
||||
let Some(server_tools) = managed_client.listed_tools().await else {
|
||||
continue;
|
||||
};
|
||||
tools.extend(server_tools);
|
||||
}
|
||||
qualify_tools(tools)
|
||||
}
|
||||
|
||||
/// Force-refresh codex apps tools by bypassing the in-process cache.
|
||||
///
|
||||
/// On success, the refreshed tools replace the cache contents and the
|
||||
/// latest filtered tool map is returned directly to the caller. On
|
||||
/// failure, the existing cache remains unchanged.
|
||||
pub async fn hard_refresh_codex_apps_tools_cache(&self) -> Result<HashMap<String, ToolInfo>> {
|
||||
let managed_client = self
|
||||
.clients
|
||||
.get(CODEX_APPS_MCP_SERVER_NAME)
|
||||
.ok_or_else(|| anyhow!("unknown MCP server '{CODEX_APPS_MCP_SERVER_NAME}'"))?
|
||||
.client()
|
||||
.await
|
||||
.context("failed to get client")?;
|
||||
|
||||
let list_start = Instant::now();
|
||||
let fetch_start = Instant::now();
|
||||
let tools = list_tools_for_client_uncached(
|
||||
CODEX_APPS_MCP_SERVER_NAME,
|
||||
&managed_client.client,
|
||||
managed_client.tool_timeout,
|
||||
managed_client.server_instructions.as_deref(),
|
||||
)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("failed to refresh tools for MCP server '{CODEX_APPS_MCP_SERVER_NAME}'")
|
||||
})?;
|
||||
emit_duration(
|
||||
MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC,
|
||||
fetch_start.elapsed(),
|
||||
&[],
|
||||
);
|
||||
|
||||
write_cached_codex_apps_tools_if_needed(
|
||||
CODEX_APPS_MCP_SERVER_NAME,
|
||||
managed_client.codex_apps_tools_cache_context.as_ref(),
|
||||
&tools,
|
||||
);
|
||||
emit_duration(
|
||||
MCP_TOOLS_LIST_DURATION_METRIC,
|
||||
list_start.elapsed(),
|
||||
&[("cache", "miss")],
|
||||
);
|
||||
let tools = filter_tools(tools, &managed_client.tool_filter)
|
||||
.into_iter()
|
||||
.map(|mut tool| {
|
||||
tool.tool = tool_with_model_visible_input_schema(&tool.tool);
|
||||
tool
|
||||
});
|
||||
Ok(qualify_tools(tools))
|
||||
}
|
||||
|
||||
/// Returns a single map that contains all resources. Each key is the
|
||||
/// server name and the value is a vector of resources.
|
||||
pub async fn list_all_resources(&self) -> HashMap<String, Vec<Resource>> {
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
let clients_snapshot = &self.clients;
|
||||
|
||||
for (server_name, async_managed_client) in clients_snapshot {
|
||||
let server_name = server_name.clone();
|
||||
let Ok(managed_client) = async_managed_client.client().await else {
|
||||
continue;
|
||||
};
|
||||
let timeout = managed_client.tool_timeout;
|
||||
let client = managed_client.client.clone();
|
||||
|
||||
join_set.spawn(async move {
|
||||
let mut collected: Vec<Resource> = Vec::new();
|
||||
let mut cursor: Option<String> = None;
|
||||
|
||||
loop {
|
||||
let params = cursor.as_ref().map(|next| PaginatedRequestParams {
|
||||
meta: None,
|
||||
cursor: Some(next.clone()),
|
||||
});
|
||||
let response = match client.list_resources(params, timeout).await {
|
||||
Ok(result) => result,
|
||||
Err(err) => return (server_name, Err(err)),
|
||||
};
|
||||
|
||||
collected.extend(response.resources);
|
||||
|
||||
match response.next_cursor {
|
||||
Some(next) => {
|
||||
if cursor.as_ref() == Some(&next) {
|
||||
return (
|
||||
server_name,
|
||||
Err(anyhow!("resources/list returned duplicate cursor")),
|
||||
);
|
||||
}
|
||||
cursor = Some(next);
|
||||
}
|
||||
None => return (server_name, Ok(collected)),
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let mut aggregated: HashMap<String, Vec<Resource>> = HashMap::new();
|
||||
|
||||
while let Some(join_res) = join_set.join_next().await {
|
||||
match join_res {
|
||||
Ok((server_name, Ok(resources))) => {
|
||||
aggregated.insert(server_name, resources);
|
||||
}
|
||||
Ok((server_name, Err(err))) => {
|
||||
warn!("Failed to list resources for MCP server '{server_name}': {err:#}");
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("Task panic when listing resources for MCP server: {err:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
aggregated
|
||||
}
|
||||
|
||||
/// Returns a single map that contains all resource templates. Each key is the
|
||||
/// server name and the value is a vector of resource templates.
|
||||
pub async fn list_all_resource_templates(&self) -> HashMap<String, Vec<ResourceTemplate>> {
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
let clients_snapshot = &self.clients;
|
||||
|
||||
for (server_name, async_managed_client) in clients_snapshot {
|
||||
let server_name_cloned = server_name.clone();
|
||||
let Ok(managed_client) = async_managed_client.client().await else {
|
||||
continue;
|
||||
};
|
||||
let client = managed_client.client.clone();
|
||||
let timeout = managed_client.tool_timeout;
|
||||
|
||||
join_set.spawn(async move {
|
||||
let mut collected: Vec<ResourceTemplate> = Vec::new();
|
||||
let mut cursor: Option<String> = None;
|
||||
|
||||
loop {
|
||||
let params = cursor.as_ref().map(|next| PaginatedRequestParams {
|
||||
meta: None,
|
||||
cursor: Some(next.clone()),
|
||||
});
|
||||
let response = match client.list_resource_templates(params, timeout).await {
|
||||
Ok(result) => result,
|
||||
Err(err) => return (server_name_cloned, Err(err)),
|
||||
};
|
||||
|
||||
collected.extend(response.resource_templates);
|
||||
|
||||
match response.next_cursor {
|
||||
Some(next) => {
|
||||
if cursor.as_ref() == Some(&next) {
|
||||
return (
|
||||
server_name_cloned,
|
||||
Err(anyhow!(
|
||||
"resources/templates/list returned duplicate cursor"
|
||||
)),
|
||||
);
|
||||
}
|
||||
cursor = Some(next);
|
||||
}
|
||||
None => return (server_name_cloned, Ok(collected)),
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let mut aggregated: HashMap<String, Vec<ResourceTemplate>> = HashMap::new();
|
||||
|
||||
while let Some(join_res) = join_set.join_next().await {
|
||||
match join_res {
|
||||
Ok((server_name, Ok(templates))) => {
|
||||
aggregated.insert(server_name, templates);
|
||||
}
|
||||
Ok((server_name, Err(err))) => {
|
||||
warn!(
|
||||
"Failed to list resource templates for MCP server '{server_name}': {err:#}"
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("Task panic when listing resource templates for MCP server: {err:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
aggregated
|
||||
}
|
||||
|
||||
/// Invoke the tool indicated by the (server, tool) pair.
|
||||
pub async fn call_tool(
|
||||
&self,
|
||||
server: &str,
|
||||
tool: &str,
|
||||
arguments: Option<serde_json::Value>,
|
||||
meta: Option<serde_json::Value>,
|
||||
) -> Result<CallToolResult> {
|
||||
let client = self.client_by_name(server).await?;
|
||||
if !client.tool_filter.allows(tool) {
|
||||
return Err(anyhow!(
|
||||
"tool '{tool}' is disabled for MCP server '{server}'"
|
||||
));
|
||||
}
|
||||
|
||||
let result: rmcp::model::CallToolResult = client
|
||||
.client
|
||||
.call_tool(tool.to_string(), arguments, meta, client.tool_timeout)
|
||||
.await
|
||||
.with_context(|| format!("tool call failed for `{server}/{tool}`"))?;
|
||||
|
||||
let content = result
|
||||
.content
|
||||
.into_iter()
|
||||
.map(|content| {
|
||||
serde_json::to_value(content)
|
||||
.unwrap_or_else(|_| serde_json::Value::String("<content>".to_string()))
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(CallToolResult {
|
||||
content,
|
||||
structured_content: result.structured_content,
|
||||
is_error: result.is_error,
|
||||
meta: result.meta.and_then(|meta| serde_json::to_value(meta).ok()),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn server_supports_sandbox_state_meta_capability(
|
||||
&self,
|
||||
server: &str,
|
||||
) -> Result<bool> {
|
||||
Ok(self
|
||||
.client_by_name(server)
|
||||
.await?
|
||||
.server_supports_sandbox_state_meta_capability)
|
||||
}
|
||||
|
||||
/// List resources from the specified server.
|
||||
pub async fn list_resources(
|
||||
&self,
|
||||
server: &str,
|
||||
params: Option<PaginatedRequestParams>,
|
||||
) -> Result<ListResourcesResult> {
|
||||
let managed = self.client_by_name(server).await?;
|
||||
let timeout = managed.tool_timeout;
|
||||
|
||||
managed
|
||||
.client
|
||||
.list_resources(params, timeout)
|
||||
.await
|
||||
.with_context(|| format!("resources/list failed for `{server}`"))
|
||||
}
|
||||
|
||||
/// List resource templates from the specified server.
|
||||
pub async fn list_resource_templates(
|
||||
&self,
|
||||
server: &str,
|
||||
params: Option<PaginatedRequestParams>,
|
||||
) -> Result<ListResourceTemplatesResult> {
|
||||
let managed = self.client_by_name(server).await?;
|
||||
let client = managed.client.clone();
|
||||
let timeout = managed.tool_timeout;
|
||||
|
||||
client
|
||||
.list_resource_templates(params, timeout)
|
||||
.await
|
||||
.with_context(|| format!("resources/templates/list failed for `{server}`"))
|
||||
}
|
||||
|
||||
/// Read a resource from the specified server.
|
||||
pub async fn read_resource(
|
||||
&self,
|
||||
server: &str,
|
||||
params: ReadResourceRequestParams,
|
||||
) -> Result<ReadResourceResult> {
|
||||
let managed = self.client_by_name(server).await?;
|
||||
let client = managed.client.clone();
|
||||
let timeout = managed.tool_timeout;
|
||||
let uri = params.uri.clone();
|
||||
|
||||
client
|
||||
.read_resource(params, timeout)
|
||||
.await
|
||||
.with_context(|| format!("resources/read failed for `{server}` ({uri})"))
|
||||
}
|
||||
|
||||
pub async fn resolve_tool_info(&self, tool_name: &ToolName) -> Option<ToolInfo> {
|
||||
let all_tools = self.list_all_tools().await;
|
||||
all_tools
|
||||
.into_values()
|
||||
.find(|tool| tool.canonical_tool_name() == *tool_name)
|
||||
}
|
||||
|
||||
async fn client_by_name(&self, name: &str) -> Result<ManagedClient> {
|
||||
self.clients
|
||||
.get(name)
|
||||
.ok_or_else(|| anyhow!("unknown MCP server '{name}'"))?
|
||||
.client()
|
||||
.await
|
||||
.context("failed to get client")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "mcp_connection_manager_tests.rs"]
|
||||
mod tests;
|
||||
@@ -34,9 +34,9 @@ use rmcp::model::ReadResourceRequestParams;
|
||||
use rmcp::model::ReadResourceResult;
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::mcp_connection_manager::McpConnectionManager;
|
||||
use crate::mcp_connection_manager::McpRuntimeEnvironment;
|
||||
use crate::mcp_connection_manager::codex_apps_tools_cache_key;
|
||||
use crate::manager::McpConnectionManager;
|
||||
use crate::mcp_connection::McpRuntimeEnvironment;
|
||||
use crate::mcp_connection::codex_apps_tools_cache_key;
|
||||
|
||||
pub const CODEX_APPS_MCP_SERVER_NAME: &str = "codex_apps";
|
||||
const MCP_TOOL_NAME_PREFIX: &str = "mcp";
|
||||
|
||||
625
codex-rs/codex-mcp/src/mcp_connection.rs
Normal file
625
codex-rs/codex-mcp/src/mcp_connection.rs
Normal file
@@ -0,0 +1,625 @@
|
||||
//! Connection support for Model Context Protocol (MCP) servers.
|
||||
//!
|
||||
//! This module contains shared types and helpers used by [`McpConnectionManager`].
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::env;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::McpAuthStatusEntry;
|
||||
use crate::client::StartupOutcomeError;
|
||||
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
pub(crate) use crate::mcp_tool_names::qualify_tools;
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use async_channel::Sender;
|
||||
use codex_exec_server::Environment;
|
||||
use codex_protocol::ToolName;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::McpStartupUpdateEvent;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use rmcp::model::Tool;
|
||||
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde_json::Map;
|
||||
use serde_json::Value as JsonValue;
|
||||
use sha1::Digest;
|
||||
use sha1::Sha1;
|
||||
use url::Url;
|
||||
|
||||
use codex_config::McpServerConfig;
|
||||
use codex_config::McpServerTransportConfig;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_utils_plugins::mcp_connector::is_connector_id_allowed;
|
||||
use codex_utils_plugins::mcp_connector::sanitize_name;
|
||||
|
||||
/// Delimiter used to separate MCP tool-name parts.
|
||||
const MCP_TOOL_NAME_DELIMITER: &str = "__";
|
||||
|
||||
/// Default timeout for initializing MCP server & initially listing tools.
|
||||
pub(crate) const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
/// Default timeout for individual tool calls.
|
||||
pub(crate) const DEFAULT_TOOL_TIMEOUT: Duration = Duration::from_secs(120);
|
||||
|
||||
pub(crate) const CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION: u8 = 2;
|
||||
const CODEX_APPS_TOOLS_CACHE_DIR: &str = "cache/codex_apps_tools";
|
||||
const MCP_TOOLS_CACHE_WRITE_DURATION_METRIC: &str = "codex.mcp.tools.cache_write.duration_ms";
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ToolInfo {
|
||||
/// Raw MCP server name used for routing the tool call.
|
||||
pub server_name: String,
|
||||
/// Model-visible tool name used in Responses API tool declarations.
|
||||
#[serde(rename = "tool_name", alias = "callable_name")]
|
||||
pub callable_name: String,
|
||||
/// Model-visible namespace used for deferred tool loading.
|
||||
#[serde(rename = "tool_namespace", alias = "callable_namespace")]
|
||||
pub callable_namespace: String,
|
||||
/// Instructions from the MCP server initialize result.
|
||||
#[serde(default)]
|
||||
pub server_instructions: Option<String>,
|
||||
/// Raw MCP tool definition; `tool.name` is sent back to the MCP server.
|
||||
pub tool: Tool,
|
||||
pub connector_id: Option<String>,
|
||||
pub connector_name: Option<String>,
|
||||
#[serde(default)]
|
||||
pub plugin_display_names: Vec<String>,
|
||||
pub connector_description: Option<String>,
|
||||
}
|
||||
|
||||
impl ToolInfo {
|
||||
pub fn canonical_tool_name(&self) -> ToolName {
|
||||
ToolName::namespaced(self.callable_namespace.clone(), self.callable_name.clone())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn declared_openai_file_input_param_names(
|
||||
meta: Option<&Map<String, JsonValue>>,
|
||||
) -> Vec<String> {
|
||||
let Some(meta) = meta else {
|
||||
return Vec::new();
|
||||
};
|
||||
|
||||
meta.get(META_OPENAI_FILE_PARAMS)
|
||||
.and_then(JsonValue::as_array)
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.filter_map(JsonValue::as_str)
|
||||
.filter(|value| !value.is_empty())
|
||||
.map(str::to_string)
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct CodexAppsToolsCacheKey {
|
||||
pub(crate) account_id: Option<String>,
|
||||
pub(crate) chatgpt_user_id: Option<String>,
|
||||
pub(crate) is_workspace_account: bool,
|
||||
}
|
||||
|
||||
pub fn codex_apps_tools_cache_key(auth: Option<&CodexAuth>) -> CodexAppsToolsCacheKey {
|
||||
CodexAppsToolsCacheKey {
|
||||
account_id: auth.and_then(CodexAuth::get_account_id),
|
||||
chatgpt_user_id: auth.and_then(CodexAuth::get_chatgpt_user_id),
|
||||
is_workspace_account: auth.is_some_and(CodexAuth::is_workspace_account),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn filter_non_codex_apps_mcp_tools_only(
|
||||
mcp_tools: &HashMap<String, ToolInfo>,
|
||||
) -> HashMap<String, ToolInfo> {
|
||||
mcp_tools
|
||||
.iter()
|
||||
.filter(|(_, tool)| tool.server_name != CODEX_APPS_MCP_SERVER_NAME)
|
||||
.map(|(name, tool)| (name.clone(), tool.clone()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// MCP server capability indicating that Codex should include [`SandboxState`]
|
||||
/// in tool-call request `_meta` under this key.
|
||||
pub const MCP_SANDBOX_STATE_META_CAPABILITY: &str = "codex/sandbox-state-meta";
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct SandboxState {
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub permission_profile: Option<PermissionProfile>,
|
||||
pub sandbox_policy: SandboxPolicy,
|
||||
pub codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
pub sandbox_cwd: PathBuf,
|
||||
#[serde(default)]
|
||||
pub use_legacy_landlock: bool,
|
||||
}
|
||||
|
||||
/// Runtime placement information used when starting MCP server transports.
|
||||
///
|
||||
/// `McpConfig` describes what servers exist. This value describes where those
|
||||
/// servers should run for the current caller. Keep it explicit at manager
|
||||
/// construction time so status/snapshot paths and real sessions make the same
|
||||
/// local-vs-remote decision. `fallback_cwd` is not a per-server override; it is
|
||||
/// used when a stdio server omits `cwd` and the launcher needs a concrete
|
||||
/// process working directory.
|
||||
#[derive(Clone)]
|
||||
pub struct McpRuntimeEnvironment {
|
||||
environment: Arc<Environment>,
|
||||
fallback_cwd: PathBuf,
|
||||
}
|
||||
|
||||
impl McpRuntimeEnvironment {
|
||||
pub fn new(environment: Arc<Environment>, fallback_cwd: PathBuf) -> Self {
|
||||
Self {
|
||||
environment,
|
||||
fallback_cwd,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn environment(&self) -> Arc<Environment> {
|
||||
Arc::clone(&self.environment)
|
||||
}
|
||||
|
||||
pub(crate) fn fallback_cwd(&self) -> PathBuf {
|
||||
self.fallback_cwd.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// A tool is allowed to be used if both are true:
|
||||
/// 1. enabled is None (no allowlist is set) or the tool is explicitly enabled.
|
||||
/// 2. The tool is not explicitly disabled.
|
||||
#[derive(Default, Clone)]
|
||||
pub(crate) struct ToolFilter {
|
||||
pub(crate) enabled: Option<HashSet<String>>,
|
||||
pub(crate) disabled: HashSet<String>,
|
||||
}
|
||||
|
||||
impl ToolFilter {
|
||||
pub(crate) fn from_config(cfg: &McpServerConfig) -> Self {
|
||||
let enabled = cfg
|
||||
.enabled_tools
|
||||
.as_ref()
|
||||
.map(|tools| tools.iter().cloned().collect::<HashSet<_>>());
|
||||
let disabled = cfg
|
||||
.disabled_tools
|
||||
.as_ref()
|
||||
.map(|tools| tools.iter().cloned().collect::<HashSet<_>>())
|
||||
.unwrap_or_default();
|
||||
|
||||
Self { enabled, disabled }
|
||||
}
|
||||
|
||||
pub(crate) fn allows(&self, tool_name: &str) -> bool {
|
||||
if let Some(enabled) = &self.enabled
|
||||
&& !enabled.contains(tool_name)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
!self.disabled.contains(tool_name)
|
||||
}
|
||||
}
|
||||
|
||||
fn sha1_hex(s: &str) -> String {
|
||||
let mut hasher = Sha1::new();
|
||||
hasher.update(s.as_bytes());
|
||||
let sha1 = hasher.finalize();
|
||||
format!("{sha1:x}")
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct CodexAppsToolsCacheContext {
|
||||
pub(crate) codex_home: PathBuf,
|
||||
pub(crate) user_key: CodexAppsToolsCacheKey,
|
||||
}
|
||||
|
||||
impl CodexAppsToolsCacheContext {
|
||||
pub(crate) fn cache_path(&self) -> PathBuf {
|
||||
let user_key_json = serde_json::to_string(&self.user_key).unwrap_or_default();
|
||||
let user_key_hash = sha1_hex(&user_key_json);
|
||||
self.codex_home
|
||||
.join(CODEX_APPS_TOOLS_CACHE_DIR)
|
||||
.join(format!("{user_key_hash}.json"))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct CodexAppsToolsDiskCache {
|
||||
schema_version: u8,
|
||||
tools: Vec<ToolInfo>,
|
||||
}
|
||||
|
||||
pub(crate) enum CachedCodexAppsToolsLoad {
|
||||
Hit(Vec<ToolInfo>),
|
||||
Missing,
|
||||
Invalid,
|
||||
}
|
||||
|
||||
const META_OPENAI_FILE_PARAMS: &str = "openai/fileParams";
|
||||
|
||||
/// Returns the model-visible view of a tool while preserving the raw metadata
|
||||
/// used by execution. Keep cache entries raw and call this at manager return
|
||||
/// boundaries.
|
||||
pub(crate) fn tool_with_model_visible_input_schema(tool: &Tool) -> Tool {
|
||||
let file_params = declared_openai_file_input_param_names(tool.meta.as_deref());
|
||||
if file_params.is_empty() {
|
||||
return tool.clone();
|
||||
}
|
||||
|
||||
let mut tool = tool.clone();
|
||||
let mut input_schema = JsonValue::Object(tool.input_schema.as_ref().clone());
|
||||
mask_input_schema_for_file_path_params(&mut input_schema, &file_params);
|
||||
if let JsonValue::Object(input_schema) = input_schema {
|
||||
tool.input_schema = Arc::new(input_schema);
|
||||
}
|
||||
tool
|
||||
}
|
||||
|
||||
fn mask_input_schema_for_file_path_params(input_schema: &mut JsonValue, file_params: &[String]) {
|
||||
let Some(properties) = input_schema
|
||||
.as_object_mut()
|
||||
.and_then(|schema| schema.get_mut("properties"))
|
||||
.and_then(JsonValue::as_object_mut)
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
for field_name in file_params {
|
||||
let Some(property_schema) = properties.get_mut(field_name) else {
|
||||
continue;
|
||||
};
|
||||
mask_input_property_schema(property_schema);
|
||||
}
|
||||
}
|
||||
|
||||
fn mask_input_property_schema(schema: &mut JsonValue) {
|
||||
let Some(object) = schema.as_object_mut() else {
|
||||
return;
|
||||
};
|
||||
|
||||
let mut description = object
|
||||
.get("description")
|
||||
.and_then(JsonValue::as_str)
|
||||
.map(str::to_string)
|
||||
.unwrap_or_default();
|
||||
let guidance = "This parameter expects an absolute local file path. If you want to upload a file, provide the absolute path to that file here.";
|
||||
if description.is_empty() {
|
||||
description = guidance.to_string();
|
||||
} else if !description.contains(guidance) {
|
||||
description = format!("{description} {guidance}");
|
||||
}
|
||||
|
||||
let is_array = object.get("type").and_then(JsonValue::as_str) == Some("array")
|
||||
|| object.get("items").is_some();
|
||||
object.clear();
|
||||
object.insert("description".to_string(), JsonValue::String(description));
|
||||
if is_array {
|
||||
object.insert("type".to_string(), JsonValue::String("array".to_string()));
|
||||
object.insert("items".to_string(), serde_json::json!({ "type": "string" }));
|
||||
} else {
|
||||
object.insert("type".to_string(), JsonValue::String("string".to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn emit_update(
|
||||
submit_id: &str,
|
||||
tx_event: &Sender<Event>,
|
||||
update: McpStartupUpdateEvent,
|
||||
) -> Result<(), async_channel::SendError<Event>> {
|
||||
tx_event
|
||||
.send(Event {
|
||||
id: submit_id.to_string(),
|
||||
msg: EventMsg::McpStartupUpdate(update),
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) fn filter_tools(tools: Vec<ToolInfo>, filter: &ToolFilter) -> Vec<ToolInfo> {
|
||||
tools
|
||||
.into_iter()
|
||||
.filter(|tool| filter.allows(&tool.tool.name))
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub(crate) fn normalize_codex_apps_tool_title(
|
||||
server_name: &str,
|
||||
connector_name: Option<&str>,
|
||||
value: &str,
|
||||
) -> String {
|
||||
if server_name != CODEX_APPS_MCP_SERVER_NAME {
|
||||
return value.to_string();
|
||||
}
|
||||
|
||||
let Some(connector_name) = connector_name
|
||||
.map(str::trim)
|
||||
.filter(|name| !name.is_empty())
|
||||
else {
|
||||
return value.to_string();
|
||||
};
|
||||
|
||||
let prefix = format!("{connector_name}_");
|
||||
if let Some(stripped) = value.strip_prefix(&prefix)
|
||||
&& !stripped.is_empty()
|
||||
{
|
||||
return stripped.to_string();
|
||||
}
|
||||
|
||||
value.to_string()
|
||||
}
|
||||
|
||||
pub(crate) fn normalize_codex_apps_callable_name(
|
||||
server_name: &str,
|
||||
tool_name: &str,
|
||||
connector_id: Option<&str>,
|
||||
connector_name: Option<&str>,
|
||||
) -> String {
|
||||
if server_name != CODEX_APPS_MCP_SERVER_NAME {
|
||||
return tool_name.to_string();
|
||||
}
|
||||
|
||||
let tool_name = sanitize_name(tool_name);
|
||||
|
||||
if let Some(connector_name) = connector_name
|
||||
.map(str::trim)
|
||||
.map(sanitize_name)
|
||||
.filter(|name| !name.is_empty())
|
||||
&& let Some(stripped) = tool_name.strip_prefix(&connector_name)
|
||||
&& !stripped.is_empty()
|
||||
{
|
||||
return stripped.to_string();
|
||||
}
|
||||
|
||||
if let Some(connector_id) = connector_id
|
||||
.map(str::trim)
|
||||
.map(sanitize_name)
|
||||
.filter(|name| !name.is_empty())
|
||||
&& let Some(stripped) = tool_name.strip_prefix(&connector_id)
|
||||
&& !stripped.is_empty()
|
||||
{
|
||||
return stripped.to_string();
|
||||
}
|
||||
|
||||
tool_name
|
||||
}
|
||||
|
||||
pub(crate) fn normalize_codex_apps_callable_namespace(
|
||||
server_name: &str,
|
||||
connector_name: Option<&str>,
|
||||
) -> String {
|
||||
if server_name == CODEX_APPS_MCP_SERVER_NAME
|
||||
&& let Some(connector_name) = connector_name
|
||||
{
|
||||
format!(
|
||||
"mcp{}{}{}{}",
|
||||
MCP_TOOL_NAME_DELIMITER,
|
||||
server_name,
|
||||
MCP_TOOL_NAME_DELIMITER,
|
||||
sanitize_name(connector_name)
|
||||
)
|
||||
} else {
|
||||
format!("mcp{MCP_TOOL_NAME_DELIMITER}{server_name}{MCP_TOOL_NAME_DELIMITER}")
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn resolve_bearer_token(
|
||||
server_name: &str,
|
||||
bearer_token_env_var: Option<&str>,
|
||||
) -> Result<Option<String>> {
|
||||
let Some(env_var) = bearer_token_env_var else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
match env::var(env_var) {
|
||||
Ok(value) => {
|
||||
if value.is_empty() {
|
||||
Err(anyhow!(
|
||||
"Environment variable {env_var} for MCP server '{server_name}' is empty"
|
||||
))
|
||||
} else {
|
||||
Ok(Some(value))
|
||||
}
|
||||
}
|
||||
Err(env::VarError::NotPresent) => Err(anyhow!(
|
||||
"Environment variable {env_var} for MCP server '{server_name}' is not set"
|
||||
)),
|
||||
Err(env::VarError::NotUnicode(_)) => Err(anyhow!(
|
||||
"Environment variable {env_var} for MCP server '{server_name}' contains invalid Unicode"
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn write_cached_codex_apps_tools_if_needed(
|
||||
server_name: &str,
|
||||
cache_context: Option<&CodexAppsToolsCacheContext>,
|
||||
tools: &[ToolInfo],
|
||||
) {
|
||||
if server_name != CODEX_APPS_MCP_SERVER_NAME {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(cache_context) = cache_context {
|
||||
let cache_write_start = Instant::now();
|
||||
write_cached_codex_apps_tools(cache_context, tools);
|
||||
emit_duration(
|
||||
MCP_TOOLS_CACHE_WRITE_DURATION_METRIC,
|
||||
cache_write_start.elapsed(),
|
||||
&[],
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn load_startup_cached_codex_apps_tools_snapshot(
|
||||
server_name: &str,
|
||||
cache_context: Option<&CodexAppsToolsCacheContext>,
|
||||
) -> Option<Vec<ToolInfo>> {
|
||||
if server_name != CODEX_APPS_MCP_SERVER_NAME {
|
||||
return None;
|
||||
}
|
||||
|
||||
let cache_context = cache_context?;
|
||||
|
||||
match load_cached_codex_apps_tools(cache_context) {
|
||||
CachedCodexAppsToolsLoad::Hit(tools) => Some(tools),
|
||||
CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => None,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn read_cached_codex_apps_tools(
|
||||
cache_context: &CodexAppsToolsCacheContext,
|
||||
) -> Option<Vec<ToolInfo>> {
|
||||
match load_cached_codex_apps_tools(cache_context) {
|
||||
CachedCodexAppsToolsLoad::Hit(tools) => Some(tools),
|
||||
CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn load_cached_codex_apps_tools(
|
||||
cache_context: &CodexAppsToolsCacheContext,
|
||||
) -> CachedCodexAppsToolsLoad {
|
||||
let cache_path = cache_context.cache_path();
|
||||
let bytes = match std::fs::read(cache_path) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
|
||||
return CachedCodexAppsToolsLoad::Missing;
|
||||
}
|
||||
Err(_) => return CachedCodexAppsToolsLoad::Invalid,
|
||||
};
|
||||
let cache: CodexAppsToolsDiskCache = match serde_json::from_slice(&bytes) {
|
||||
Ok(cache) => cache,
|
||||
Err(_) => return CachedCodexAppsToolsLoad::Invalid,
|
||||
};
|
||||
if cache.schema_version != CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION {
|
||||
return CachedCodexAppsToolsLoad::Invalid;
|
||||
}
|
||||
CachedCodexAppsToolsLoad::Hit(filter_disallowed_codex_apps_tools(cache.tools))
|
||||
}
|
||||
|
||||
pub(crate) fn write_cached_codex_apps_tools(
|
||||
cache_context: &CodexAppsToolsCacheContext,
|
||||
tools: &[ToolInfo],
|
||||
) {
|
||||
let cache_path = cache_context.cache_path();
|
||||
if let Some(parent) = cache_path.parent()
|
||||
&& std::fs::create_dir_all(parent).is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
let tools = filter_disallowed_codex_apps_tools(tools.to_vec());
|
||||
let Ok(bytes) = serde_json::to_vec_pretty(&CodexAppsToolsDiskCache {
|
||||
schema_version: CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION,
|
||||
tools,
|
||||
}) else {
|
||||
return;
|
||||
};
|
||||
let _ = std::fs::write(cache_path, bytes);
|
||||
}
|
||||
|
||||
pub(crate) fn filter_disallowed_codex_apps_tools(tools: Vec<ToolInfo>) -> Vec<ToolInfo> {
|
||||
tools
|
||||
.into_iter()
|
||||
.filter(|tool| {
|
||||
tool.connector_id
|
||||
.as_deref()
|
||||
.is_none_or(is_connector_id_allowed)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub(crate) fn emit_duration(metric: &str, duration: Duration, tags: &[(&str, &str)]) {
|
||||
if let Some(metrics) = codex_otel::global() {
|
||||
let _ = metrics.record_duration(metric, duration, tags);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn transport_origin(transport: &McpServerTransportConfig) -> Option<String> {
|
||||
match transport {
|
||||
McpServerTransportConfig::StreamableHttp { url, .. } => {
|
||||
let parsed = Url::parse(url).ok()?;
|
||||
Some(parsed.origin().ascii_serialization())
|
||||
}
|
||||
McpServerTransportConfig::Stdio { .. } => Some("stdio".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn validate_mcp_server_name(server_name: &str) -> Result<()> {
|
||||
let re = regex_lite::Regex::new(r"^[a-zA-Z0-9_-]+$")?;
|
||||
if !re.is_match(server_name) {
|
||||
return Err(anyhow!(
|
||||
"Invalid MCP server name '{server_name}': must match pattern {pattern}",
|
||||
pattern = re.as_str()
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn mcp_init_error_display(
|
||||
server_name: &str,
|
||||
entry: Option<&McpAuthStatusEntry>,
|
||||
err: &StartupOutcomeError,
|
||||
) -> String {
|
||||
if let Some(McpServerTransportConfig::StreamableHttp {
|
||||
url,
|
||||
bearer_token_env_var,
|
||||
http_headers,
|
||||
..
|
||||
}) = &entry.map(|entry| &entry.config.transport)
|
||||
&& url == "https://api.githubcopilot.com/mcp/"
|
||||
&& bearer_token_env_var.is_none()
|
||||
&& http_headers.as_ref().map(HashMap::is_empty).unwrap_or(true)
|
||||
{
|
||||
format!(
|
||||
"GitHub MCP does not support OAuth. Log in by adding a personal access token (https://github.com/settings/personal-access-tokens) to your environment and config.toml:\n[mcp_servers.{server_name}]\nbearer_token_env_var = CODEX_GITHUB_PERSONAL_ACCESS_TOKEN"
|
||||
)
|
||||
} else if is_mcp_client_auth_required_error(err) {
|
||||
format!(
|
||||
"The {server_name} MCP server is not logged in. Run `codex mcp login {server_name}`."
|
||||
)
|
||||
} else if is_mcp_client_startup_timeout_error(err) {
|
||||
let startup_timeout_secs = match entry {
|
||||
Some(entry) => match entry.config.startup_timeout_sec {
|
||||
Some(timeout) => timeout,
|
||||
None => DEFAULT_STARTUP_TIMEOUT,
|
||||
},
|
||||
None => DEFAULT_STARTUP_TIMEOUT,
|
||||
}
|
||||
.as_secs();
|
||||
format!(
|
||||
"MCP client for `{server_name}` timed out after {startup_timeout_secs} seconds. Add or adjust `startup_timeout_sec` in your config.toml:\n[mcp_servers.{server_name}]\nstartup_timeout_sec = XX"
|
||||
)
|
||||
} else {
|
||||
format!("MCP client for `{server_name}` failed to start: {err:#}")
|
||||
}
|
||||
}
|
||||
|
||||
fn is_mcp_client_auth_required_error(error: &StartupOutcomeError) -> bool {
|
||||
match error {
|
||||
StartupOutcomeError::Failed { error } => error.contains("Auth required"),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_mcp_client_startup_timeout_error(error: &StartupOutcomeError) -> bool {
|
||||
match error {
|
||||
StartupOutcomeError::Failed { error } => {
|
||||
error.contains("request timed out")
|
||||
|| error.contains("timed out handshaking with MCP server")
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn startup_outcome_error_message(error: StartupOutcomeError) -> String {
|
||||
match error {
|
||||
StartupOutcomeError::Cancelled => "MCP startup cancelled".to_string(),
|
||||
StartupOutcomeError::Failed { error } => error,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod mcp_init_error_display_tests {}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,11 +1,35 @@
|
||||
use super::*;
|
||||
use crate::client::AsyncManagedClient;
|
||||
use crate::client::ManagedClient;
|
||||
use crate::client::StartupOutcomeError;
|
||||
use crate::client::elicitation_capability_for_server;
|
||||
use crate::declared_openai_file_input_param_names;
|
||||
use crate::elicitation::ElicitationRequestManager;
|
||||
use crate::elicitation::elicitation_is_rejected_by_policy;
|
||||
use crate::mcp_connection::CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION;
|
||||
use crate::mcp_connection::CodexAppsToolsCacheContext;
|
||||
use crate::mcp_connection::ToolFilter;
|
||||
use crate::mcp_connection::ToolInfo;
|
||||
use crate::mcp_connection::filter_tools;
|
||||
use crate::mcp_connection::load_startup_cached_codex_apps_tools_snapshot;
|
||||
use crate::mcp_connection::read_cached_codex_apps_tools;
|
||||
use crate::mcp_connection::tool_with_model_visible_input_schema;
|
||||
use crate::mcp_connection::transport_origin;
|
||||
use crate::mcp_connection::write_cached_codex_apps_tools;
|
||||
use codex_config::Constrained;
|
||||
use codex_protocol::ToolName;
|
||||
use codex_protocol::protocol::GranularApprovalConfig;
|
||||
use codex_protocol::protocol::McpAuthStatus;
|
||||
use futures::FutureExt;
|
||||
use pretty_assertions::assert_eq;
|
||||
use rmcp::model::CreateElicitationRequestParams;
|
||||
use rmcp::model::ElicitationAction;
|
||||
use rmcp::model::ElicitationCapability;
|
||||
use rmcp::model::FormElicitationCapability;
|
||||
use rmcp::model::JsonObject;
|
||||
use rmcp::model::Meta;
|
||||
use rmcp::model::NumberOrString;
|
||||
use rmcp::model::Tool;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use tempfile::tempdir;
|
||||
|
||||
@@ -8,7 +8,7 @@ use sha1::Sha1;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::mcp::sanitize_responses_api_tool_name;
|
||||
use crate::mcp_connection_manager::ToolInfo;
|
||||
use crate::mcp_connection::ToolInfo;
|
||||
|
||||
const MCP_TOOL_NAME_DELIMITER: &str = "__";
|
||||
const MAX_TOOL_NAME_LENGTH: usize = 64;
|
||||
|
||||
Reference in New Issue
Block a user