Compare commits

..

1 Commits

Author SHA1 Message Date
Aismit Das
3ce2525848 Refresh AGENTS.md on cwd changes 2026-04-25 01:07:48 -04:00
22 changed files with 2615 additions and 2191 deletions

View File

@@ -94,7 +94,7 @@ In `workspace-write`, Codex also includes `~/.codex/memories` in its writable ro
This folder is the root of a Cargo workspace. It contains quite a bit of experimental code, but here are the key crates:
- [`core/`](./core) contains the business logic for Codex. Ultimately, we hope this becomes a library crate that is generally useful for building other Rust/native applications that use Codex.
- [`core/`](./core) contains the business logic for Codex. Ultimately, we hope this will be a library crate that is generally useful for building other Rust/native applications that use Codex.
- [`exec/`](./exec) "headless" CLI for use in automation.
- [`tui/`](./tui) CLI that launches a fullscreen TUI built with [Ratatui](https://ratatui.rs/).
- [`cli/`](./cli) CLI multitool that provides the aforementioned CLIs via subcommands.

View File

@@ -38,6 +38,7 @@ tracing = { workspace = true }
url = { workspace = true }
[dev-dependencies]
codex-utils-absolute-path = { workspace = true }
pretty_assertions = { workspace = true }
rmcp = { workspace = true, default-features = false, features = ["base64", "macros", "schemars", "server"] }
tempfile = { workspace = true }

View File

@@ -1,250 +0,0 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Instant;
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
use crate::runtime::emit_duration;
use crate::tools::MCP_TOOLS_CACHE_WRITE_DURATION_METRIC;
use crate::tools::ToolInfo;
use codex_login::CodexAuth;
use codex_utils_plugins::mcp_connector::is_connector_id_allowed;
use codex_utils_plugins::mcp_connector::sanitize_name;
use serde::Deserialize;
use serde::Serialize;
use sha1::Digest;
use sha1::Sha1;
/// Bump whenever the on-disk cache payload shape changes; a mismatched
/// version is treated as `Invalid` on load and the tools are refetched.
pub(crate) const CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION: u8 = 2;
/// Cache directory for Codex Apps tool lists, relative to the Codex home dir.
const CODEX_APPS_TOOLS_CACHE_DIR: &str = "cache/codex_apps_tools";
/// Identity of the account a cached tool list belongs to. The key is
/// JSON-serialized and SHA-1 hashed to derive the cache file name, so every
/// distinct account/user combination gets its own cache file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CodexAppsToolsCacheKey {
    pub(crate) account_id: Option<String>,
    pub(crate) chatgpt_user_id: Option<String>,
    pub(crate) is_workspace_account: bool,
}
/// Derive the cache key identifying the (optionally) authenticated user.
/// A `None` auth yields a key with no ids and `is_workspace_account: false`.
pub fn codex_apps_tools_cache_key(auth: Option<&CodexAuth>) -> CodexAppsToolsCacheKey {
    let account_id = auth.and_then(|auth| auth.get_account_id());
    let chatgpt_user_id = auth.and_then(|auth| auth.get_chatgpt_user_id());
    let is_workspace_account = auth.map(CodexAuth::is_workspace_account).unwrap_or(false);
    CodexAppsToolsCacheKey {
        account_id,
        chatgpt_user_id,
        is_workspace_account,
    }
}
/// Return a copy of `mcp_tools` with every Codex Apps tool removed, keeping
/// only tools that belong to other MCP servers.
pub fn filter_non_codex_apps_mcp_tools_only(
    mcp_tools: &HashMap<String, ToolInfo>,
) -> HashMap<String, ToolInfo> {
    let mut retained = HashMap::new();
    for (name, tool) in mcp_tools {
        if tool.server_name != CODEX_APPS_MCP_SERVER_NAME {
            retained.insert(name.clone(), tool.clone());
        }
    }
    retained
}
/// Lowercase hex-encoded SHA-1 digest of `s` (used only for cache file
/// naming, not for any security purpose).
fn sha1_hex(s: &str) -> String {
    let digest = {
        let mut hasher = Sha1::new();
        hasher.update(s.as_bytes());
        hasher.finalize()
    };
    format!("{digest:x}")
}
/// Where (and for whom) Codex Apps tool lists are cached on disk.
#[derive(Clone)]
pub(crate) struct CodexAppsToolsCacheContext {
    /// Root directory under which `cache/codex_apps_tools` lives.
    pub(crate) codex_home: PathBuf,
    /// Identity component hashed into the cache file name.
    pub(crate) user_key: CodexAppsToolsCacheKey,
}
impl CodexAppsToolsCacheContext {
    /// Path of this user's cache file:
    /// `<codex_home>/cache/codex_apps_tools/<sha1-of-user-key-json>.json`.
    pub(crate) fn cache_path(&self) -> PathBuf {
        // NOTE(review): if serialization ever failed, `unwrap_or_default`
        // would hash the empty string, silently sharing one cache path across
        // all users; serializing this plain struct should not fail in practice.
        let user_key_json = serde_json::to_string(&self.user_key).unwrap_or_default();
        let user_key_hash = sha1_hex(&user_key_json);
        self.codex_home
            .join(CODEX_APPS_TOOLS_CACHE_DIR)
            .join(format!("{user_key_hash}.json"))
    }
}
/// On-disk JSON payload for a cached tool list.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CodexAppsToolsDiskCache {
    // Checked against CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION on load.
    schema_version: u8,
    tools: Vec<ToolInfo>,
}
/// Outcome of attempting to load the tools cache from disk.
pub(crate) enum CachedCodexAppsToolsLoad {
    /// File existed, parsed, and matched the current schema version.
    Hit(Vec<ToolInfo>),
    /// No cache file on disk.
    Missing,
    /// Unreadable, unparsable, or wrong schema version.
    Invalid,
}
pub(crate) fn normalize_codex_apps_tool_title(
server_name: &str,
connector_name: Option<&str>,
value: &str,
) -> String {
if server_name != CODEX_APPS_MCP_SERVER_NAME {
return value.to_string();
}
let Some(connector_name) = connector_name
.map(str::trim)
.filter(|name| !name.is_empty())
else {
return value.to_string();
};
let prefix = format!("{connector_name}_");
if let Some(stripped) = value.strip_prefix(&prefix)
&& !stripped.is_empty()
{
return stripped.to_string();
}
value.to_string()
}
pub(crate) fn normalize_codex_apps_callable_name(
server_name: &str,
tool_name: &str,
connector_id: Option<&str>,
connector_name: Option<&str>,
) -> String {
if server_name != CODEX_APPS_MCP_SERVER_NAME {
return tool_name.to_string();
}
let tool_name = sanitize_name(tool_name);
if let Some(connector_name) = connector_name
.map(str::trim)
.map(sanitize_name)
.filter(|name| !name.is_empty())
&& let Some(stripped) = tool_name.strip_prefix(&connector_name)
&& !stripped.is_empty()
{
return stripped.to_string();
}
if let Some(connector_id) = connector_id
.map(str::trim)
.map(sanitize_name)
.filter(|name| !name.is_empty())
&& let Some(stripped) = tool_name.strip_prefix(&connector_id)
&& !stripped.is_empty()
{
return stripped.to_string();
}
tool_name
}
pub(crate) fn normalize_codex_apps_callable_namespace(
server_name: &str,
connector_name: Option<&str>,
) -> String {
if server_name == CODEX_APPS_MCP_SERVER_NAME
&& let Some(connector_name) = connector_name
{
format!("mcp__{}__{}", server_name, sanitize_name(connector_name))
} else {
format!("mcp__{server_name}__")
}
}
/// Persist `tools` to disk for the Codex Apps server, recording the
/// cache-write duration metric. No-op for any other server or when no cache
/// context is configured.
pub(crate) fn write_cached_codex_apps_tools_if_needed(
    server_name: &str,
    cache_context: Option<&CodexAppsToolsCacheContext>,
    tools: &[ToolInfo],
) {
    if server_name != CODEX_APPS_MCP_SERVER_NAME {
        return;
    }
    let Some(cache_context) = cache_context else {
        return;
    };
    let cache_write_start = Instant::now();
    write_cached_codex_apps_tools(cache_context, tools);
    emit_duration(
        MCP_TOOLS_CACHE_WRITE_DURATION_METRIC,
        cache_write_start.elapsed(),
        &[],
    );
}
/// Load the cached Codex Apps tool list for use as a startup snapshot.
/// Returns `None` for other servers, when no cache context exists, or when
/// the on-disk cache is missing or invalid.
pub(crate) fn load_startup_cached_codex_apps_tools_snapshot(
    server_name: &str,
    cache_context: Option<&CodexAppsToolsCacheContext>,
) -> Option<Vec<ToolInfo>> {
    if server_name != CODEX_APPS_MCP_SERVER_NAME {
        return None;
    }
    if let CachedCodexAppsToolsLoad::Hit(tools) = load_cached_codex_apps_tools(cache_context?) {
        Some(tools)
    } else {
        None
    }
}
/// Test-only convenience wrapper: load the cache and collapse the
/// `Missing`/`Invalid` outcomes into `None`.
#[cfg(test)]
pub(crate) fn read_cached_codex_apps_tools(
    cache_context: &CodexAppsToolsCacheContext,
) -> Option<Vec<ToolInfo>> {
    match load_cached_codex_apps_tools(cache_context) {
        CachedCodexAppsToolsLoad::Hit(tools) => Some(tools),
        CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => None,
    }
}
/// Read and validate the on-disk tools cache.
///
/// Only a file-not-found error maps to `Missing`; any other I/O failure,
/// JSON parse failure, or schema-version mismatch maps to `Invalid`. A hit
/// is re-filtered against the current connector allow-list before returning,
/// so stale cache entries for newly-disallowed connectors are dropped.
pub(crate) fn load_cached_codex_apps_tools(
    cache_context: &CodexAppsToolsCacheContext,
) -> CachedCodexAppsToolsLoad {
    let cache_path = cache_context.cache_path();
    let bytes = match std::fs::read(cache_path) {
        Ok(bytes) => bytes,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
            return CachedCodexAppsToolsLoad::Missing;
        }
        Err(_) => return CachedCodexAppsToolsLoad::Invalid,
    };
    let cache: CodexAppsToolsDiskCache = match serde_json::from_slice(&bytes) {
        Ok(cache) => cache,
        Err(_) => return CachedCodexAppsToolsLoad::Invalid,
    };
    if cache.schema_version != CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION {
        return CachedCodexAppsToolsLoad::Invalid;
    }
    CachedCodexAppsToolsLoad::Hit(filter_disallowed_codex_apps_tools(cache.tools))
}
/// Best-effort write of `tools` to the user's cache file; all failures
/// (directory creation, serialization, file write) are silently ignored so a
/// broken cache never disturbs the live tool flow.
pub(crate) fn write_cached_codex_apps_tools(
    cache_context: &CodexAppsToolsCacheContext,
    tools: &[ToolInfo],
) {
    let cache_path = cache_context.cache_path();
    if let Some(parent) = cache_path.parent()
        && std::fs::create_dir_all(parent).is_err()
    {
        return;
    }
    // Never persist tools from connectors that are not allow-listed.
    let tools = filter_disallowed_codex_apps_tools(tools.to_vec());
    let Ok(bytes) = serde_json::to_vec_pretty(&CodexAppsToolsDiskCache {
        schema_version: CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION,
        tools,
    }) else {
        return;
    };
    let _ = std::fs::write(cache_path, bytes);
}
pub(crate) fn filter_disallowed_codex_apps_tools(tools: Vec<ToolInfo>) -> Vec<ToolInfo> {
tools
.into_iter()
.filter(|tool| {
tool.connector_id
.as_deref()
.is_none_or(is_connector_id_allowed)
})
.collect()
}

View File

@@ -1,583 +0,0 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::env;
use std::ffi::OsString;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use crate::apps::CachedCodexAppsToolsLoad;
use crate::apps::CodexAppsToolsCacheContext;
use crate::apps::filter_disallowed_codex_apps_tools;
use crate::apps::load_cached_codex_apps_tools;
use crate::apps::load_startup_cached_codex_apps_tools_snapshot;
use crate::apps::normalize_codex_apps_callable_name;
use crate::apps::normalize_codex_apps_callable_namespace;
use crate::apps::normalize_codex_apps_tool_title;
use crate::apps::write_cached_codex_apps_tools_if_needed;
use crate::elicitation::ElicitationRequestManager;
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
use crate::mcp::ToolPluginProvenance;
use crate::runtime::McpRuntimeEnvironment;
use crate::runtime::emit_duration;
use crate::tools::ToolFilter;
use crate::tools::ToolInfo;
use crate::tools::filter_tools;
use crate::tools::tool_with_model_visible_input_schema;
use anyhow::Result;
use anyhow::anyhow;
use async_channel::Sender;
use codex_api::SharedAuthProvider;
use codex_async_utils::CancelErr;
use codex_async_utils::OrCancelExt;
use codex_config::McpServerConfig;
use codex_config::McpServerTransportConfig;
use codex_config::types::OAuthCredentialsStoreMode;
use codex_exec_server::HttpClient;
use codex_exec_server::ReqwestHttpClient;
use codex_protocol::protocol::Event;
use codex_rmcp_client::ExecutorStdioServerLauncher;
use codex_rmcp_client::LocalStdioServerLauncher;
use codex_rmcp_client::RmcpClient;
use codex_rmcp_client::StdioServerLauncher;
use futures::future::BoxFuture;
use futures::future::FutureExt;
use futures::future::Shared;
use rmcp::model::ClientCapabilities;
use rmcp::model::ElicitationCapability;
use rmcp::model::FormElicitationCapability;
use rmcp::model::Implementation;
use rmcp::model::InitializeRequestParams;
use rmcp::model::ProtocolVersion;
use tokio_util::sync::CancellationToken;
// Metric: total time to produce a tool list, tagged `cache=hit|miss`.
pub(crate) const MCP_TOOLS_LIST_DURATION_METRIC: &str = "codex.mcp.tools.list.duration_ms";
// Metric: time spent fetching the tool list directly from the server.
pub(crate) const MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC: &str =
    "codex.mcp.tools.fetch_uncached.duration_ms";
/// Startup timeout applied when the server config leaves it unset.
pub(crate) const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(30);
/// Per-tool-call timeout applied when the server config leaves it unset.
pub(crate) const DEFAULT_TOOL_TIMEOUT: Duration = Duration::from_secs(120);
/// MCP server capability indicating that Codex should include [`SandboxState`]
/// in tool-call request `_meta` under this key.
pub const MCP_SANDBOX_STATE_META_CAPABILITY: &str = "codex/sandbox-state-meta";
/// A fully-started MCP client plus everything captured at startup time.
#[derive(Clone)]
pub(crate) struct ManagedClient {
    pub(crate) client: Arc<RmcpClient>,
    /// Tools fetched during startup, already filtered through `tool_filter`.
    pub(crate) tools: Vec<ToolInfo>,
    pub(crate) tool_filter: ToolFilter,
    pub(crate) tool_timeout: Option<Duration>,
    pub(crate) server_instructions: Option<String>,
    /// Whether the server advertised the sandbox-state `_meta` capability.
    pub(crate) server_supports_sandbox_state_meta_capability: bool,
    /// Present only for the Codex Apps server; enables disk-cached tool lists.
    pub(crate) codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
}
impl ManagedClient {
    /// Tool list for this client. For the Codex Apps server this prefers a
    /// fresh read of the disk cache (re-filtered through `tool_filter`) over
    /// the startup snapshot; the list-duration metric is emitted tagged with
    /// `cache=hit|miss` whenever a cache context exists.
    fn listed_tools(&self) -> Vec<ToolInfo> {
        let total_start = Instant::now();
        if let Some(cache_context) = self.codex_apps_tools_cache_context.as_ref()
            && let CachedCodexAppsToolsLoad::Hit(tools) =
                load_cached_codex_apps_tools(cache_context)
        {
            emit_duration(
                MCP_TOOLS_LIST_DURATION_METRIC,
                total_start.elapsed(),
                &[("cache", "hit")],
            );
            return filter_tools(tools, &self.tool_filter);
        }
        if self.codex_apps_tools_cache_context.is_some() {
            emit_duration(
                MCP_TOOLS_LIST_DURATION_METRIC,
                total_start.elapsed(),
                &[("cache", "miss")],
            );
        }
        // Fall back to the tools captured at startup (already filtered).
        self.tools.clone()
    }
}
/// Handle to an MCP client whose startup may still be in flight. The shared
/// future resolves once; clones await the same startup outcome.
#[derive(Clone)]
pub(crate) struct AsyncManagedClient {
    /// Shared startup future yielding the ready client (or startup error).
    pub(crate) client: Shared<BoxFuture<'static, Result<ManagedClient, StartupOutcomeError>>>,
    /// Cached Codex Apps tool list served while startup is still running.
    pub(crate) startup_snapshot: Option<Vec<ToolInfo>>,
    /// Set to true once the startup future has produced its outcome.
    pub(crate) startup_complete: Arc<AtomicBool>,
    pub(crate) tool_plugin_provenance: Arc<ToolPluginProvenance>,
}
impl AsyncManagedClient {
    // Keep this constructor flat so the startup inputs remain readable at the
    // single call site instead of introducing a one-off params wrapper.
    /// Build the lazy startup future for one configured MCP server. The
    /// future validates the server name, constructs the transport-specific
    /// client, then runs the initialize/list-tools handshake; it is eagerly
    /// spawned only when a startup snapshot exists, so the snapshot can be
    /// replaced by live tools as soon as possible.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        server_name: String,
        config: McpServerConfig,
        store_mode: OAuthCredentialsStoreMode,
        cancel_token: CancellationToken,
        tx_event: Sender<Event>,
        elicitation_requests: ElicitationRequestManager,
        codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
        tool_plugin_provenance: Arc<ToolPluginProvenance>,
        runtime_environment: McpRuntimeEnvironment,
        runtime_auth_provider: Option<SharedAuthProvider>,
    ) -> Self {
        let tool_filter = ToolFilter::from_config(&config);
        // Snapshot is pre-filtered so it matches what startup will produce.
        let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot(
            &server_name,
            codex_apps_tools_cache_context.as_ref(),
        )
        .map(|tools| filter_tools(tools, &tool_filter));
        let startup_tool_filter = tool_filter;
        let startup_complete = Arc::new(AtomicBool::new(false));
        let startup_complete_for_fut = Arc::clone(&startup_complete);
        let fut = async move {
            let outcome = async {
                if let Err(error) = validate_mcp_server_name(&server_name) {
                    return Err(error.into());
                }
                let client = Arc::new(
                    make_rmcp_client(
                        &server_name,
                        config.clone(),
                        store_mode,
                        runtime_environment,
                        runtime_auth_provider,
                    )
                    .await?,
                );
                match start_server_task(
                    server_name,
                    client,
                    StartServerTaskParams {
                        startup_timeout: config
                            .startup_timeout_sec
                            .or(Some(DEFAULT_STARTUP_TIMEOUT)),
                        tool_timeout: config.tool_timeout_sec.unwrap_or(DEFAULT_TOOL_TIMEOUT),
                        tool_filter: startup_tool_filter,
                        tx_event,
                        elicitation_requests,
                        codex_apps_tools_cache_context,
                    },
                )
                .or_cancel(&cancel_token)
                .await
                {
                    Ok(result) => result,
                    Err(CancelErr::Cancelled) => Err(StartupOutcomeError::Cancelled),
                }
            }
            .await;
            // Release pairs with the Acquire in startup_snapshot_while_initializing.
            startup_complete_for_fut.store(true, Ordering::Release);
            outcome
        };
        let client = fut.boxed().shared();
        if startup_snapshot.is_some() {
            // Kick off startup in the background so the snapshot ages out.
            let startup_task = client.clone();
            tokio::spawn(async move {
                let _ = startup_task.await;
            });
        }
        Self {
            client,
            startup_snapshot,
            startup_complete,
            tool_plugin_provenance,
        }
    }
    /// Await the (shared) startup outcome; completes immediately once startup
    /// has finished.
    pub(crate) async fn client(&self) -> Result<ManagedClient, StartupOutcomeError> {
        self.client.clone().await
    }
    /// The cached snapshot, but only while startup has not completed yet.
    fn startup_snapshot_while_initializing(&self) -> Option<Vec<ToolInfo>> {
        if !self.startup_complete.load(Ordering::Acquire) {
            return self.startup_snapshot.clone();
        }
        None
    }
    /// Tool list annotated with plugin provenance. Prefers (in order): the
    /// startup snapshot while initializing, the live client's tools, then the
    /// snapshot again as a fallback when startup failed. Returns `None` only
    /// when no source is available.
    pub(crate) async fn listed_tools(&self) -> Option<Vec<ToolInfo>> {
        let annotate_tools = |tools: Vec<ToolInfo>| {
            let mut tools = tools;
            for tool in &mut tools {
                if tool.server_name == CODEX_APPS_MCP_SERVER_NAME {
                    tool.tool = tool_with_model_visible_input_schema(&tool.tool);
                }
                // Provenance is looked up by connector id when present,
                // otherwise by the owning MCP server name.
                let plugin_names = match tool.connector_id.as_deref() {
                    Some(connector_id) => self
                        .tool_plugin_provenance
                        .plugin_display_names_for_connector_id(connector_id),
                    None => self
                        .tool_plugin_provenance
                        .plugin_display_names_for_mcp_server_name(tool.server_name.as_str()),
                };
                tool.plugin_display_names = plugin_names.to_vec();
                if plugin_names.is_empty() {
                    continue;
                }
                let plugin_source_note = if plugin_names.len() == 1 {
                    format!("This tool is part of plugin `{}`.", plugin_names[0])
                } else {
                    format!(
                        "This tool is part of plugins {}.",
                        plugin_names
                            .iter()
                            .map(|plugin_name| format!("`{plugin_name}`"))
                            .collect::<Vec<_>>()
                            .join(", ")
                    )
                };
                let description = tool
                    .tool
                    .description
                    .as_deref()
                    .map(str::trim)
                    .unwrap_or("");
                // Append the note, adding a period only when the description
                // does not already end with sentence punctuation.
                let annotated_description = if description.is_empty() {
                    plugin_source_note
                } else if matches!(description.chars().last(), Some('.' | '!' | '?')) {
                    format!("{description} {plugin_source_note}")
                } else {
                    format!("{description}. {plugin_source_note}")
                };
                tool.tool.description = Some(Cow::Owned(annotated_description));
            }
            tools
        };
        // Keep cache payloads raw; plugin provenance is resolved per-session at read time.
        let tools = if let Some(startup_tools) = self.startup_snapshot_while_initializing() {
            Some(startup_tools)
        } else {
            match self.client().await {
                Ok(client) => Some(client.listed_tools()),
                Err(_) => self.startup_snapshot.clone(),
            }
        };
        tools.map(annotate_tools)
    }
}
/// Why MCP server startup did not yield a usable client. `Clone` is required
/// because the startup future is shared across handles.
#[derive(Debug, Clone, thiserror::Error)]
pub(crate) enum StartupOutcomeError {
    /// Startup was cancelled via the cancellation token.
    #[error("MCP startup cancelled")]
    Cancelled,
    // We can't store the original error here because anyhow::Error doesn't implement
    // `Clone`.
    #[error("MCP startup failed: {error}")]
    Failed { error: String },
}
impl From<anyhow::Error> for StartupOutcomeError {
    // Flatten any startup error into its display string (see note above on
    // why the original error cannot be kept).
    fn from(error: anyhow::Error) -> Self {
        Self::Failed {
            error: error.to_string(),
        }
    }
}
/// Elicitation capability Codex advertises during initialize: form
/// elicitations only (no URL elicitations, no schema validation). The server
/// name is currently unused but kept so per-server capabilities are possible.
pub(crate) fn elicitation_capability_for_server(
    _server_name: &str,
) -> Option<ElicitationCapability> {
    // https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation#capabilities
    // indicates this should be an empty object.
    Some(ElicitationCapability {
        form: Some(FormElicitationCapability {
            schema_validation: None,
        }),
        url: None,
    })
}
/// Read the bearer token for `server_name` from the configured environment
/// variable.
///
/// Returns `Ok(None)` when no variable is configured. Otherwise the variable
/// must be set, non-empty, and valid Unicode; each failure mode produces a
/// distinct error naming the server and the variable.
fn resolve_bearer_token(
    server_name: &str,
    bearer_token_env_var: Option<&str>,
) -> Result<Option<String>> {
    let Some(env_var) = bearer_token_env_var else {
        return Ok(None);
    };
    match env::var(env_var) {
        Ok(value) if value.is_empty() => Err(anyhow!(
            "Environment variable {env_var} for MCP server '{server_name}' is empty"
        )),
        Ok(value) => Ok(Some(value)),
        Err(env::VarError::NotPresent) => Err(anyhow!(
            "Environment variable {env_var} for MCP server '{server_name}' is not set"
        )),
        Err(env::VarError::NotUnicode(_)) => Err(anyhow!(
            "Environment variable {env_var} for MCP server '{server_name}' contains invalid Unicode"
        )),
    }
}
/// Validate that an MCP server name contains only `[a-zA-Z0-9_-]` characters
/// (and is non-empty).
///
/// # Errors
/// Returns an error naming the offending server and the required pattern.
fn validate_mcp_server_name(server_name: &str) -> Result<()> {
    use std::sync::LazyLock;
    // The pattern is a literal, so compile it once per process instead of on
    // every call; an invalid literal here would be a programming bug, hence
    // `expect` rather than propagating a compile error that cannot occur.
    static SERVER_NAME_RE: LazyLock<regex_lite::Regex> = LazyLock::new(|| {
        regex_lite::Regex::new(r"^[a-zA-Z0-9_-]+$")
            .expect("MCP server name pattern is a valid regex")
    });
    if !SERVER_NAME_RE.is_match(server_name) {
        return Err(anyhow!(
            "Invalid MCP server name '{server_name}': must match pattern {pattern}",
            pattern = SERVER_NAME_RE.as_str()
        ));
    }
    Ok(())
}
/// Run the post-connect startup handshake for one MCP server: initialize,
/// fetch the tool list, persist the Codex Apps cache, emit timing metrics,
/// and assemble the [`ManagedClient`].
async fn start_server_task(
    server_name: String,
    client: Arc<RmcpClient>,
    params: StartServerTaskParams,
) -> Result<ManagedClient, StartupOutcomeError> {
    let StartServerTaskParams {
        startup_timeout,
        tool_timeout,
        tool_filter,
        tx_event,
        elicitation_requests,
        codex_apps_tools_cache_context,
    } = params;
    let elicitation = elicitation_capability_for_server(&server_name);
    let params = InitializeRequestParams {
        meta: None,
        capabilities: ClientCapabilities {
            experimental: None,
            extensions: None,
            roots: None,
            sampling: None,
            elicitation,
            tasks: None,
        },
        client_info: Implementation {
            name: "codex-mcp-client".to_owned(),
            version: env!("CARGO_PKG_VERSION").to_owned(),
            title: Some("Codex".into()),
            description: None,
            icons: None,
            website_url: None,
        },
        protocol_version: ProtocolVersion::V_2025_06_18,
    };
    let send_elicitation = elicitation_requests.make_sender(server_name.clone(), tx_event);
    let initialize_result = client
        .initialize(params, startup_timeout, send_elicitation)
        .await
        .map_err(StartupOutcomeError::from)?;
    // Any value under the capability key counts as support for sandbox-state
    // `_meta` forwarding.
    let server_supports_sandbox_state_meta_capability = initialize_result
        .capabilities
        .experimental
        .as_ref()
        .and_then(|exp| exp.get(MCP_SANDBOX_STATE_META_CAPABILITY))
        .is_some();
    let list_start = Instant::now();
    let fetch_start = Instant::now();
    let tools = list_tools_for_client_uncached(
        &server_name,
        &client,
        startup_timeout,
        initialize_result.instructions.as_deref(),
    )
    .await
    .map_err(StartupOutcomeError::from)?;
    emit_duration(
        MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC,
        fetch_start.elapsed(),
        &[],
    );
    // Cache the raw (unfiltered) tool list so later sessions with different
    // filters can reuse it.
    write_cached_codex_apps_tools_if_needed(
        &server_name,
        codex_apps_tools_cache_context.as_ref(),
        &tools,
    );
    if server_name == CODEX_APPS_MCP_SERVER_NAME {
        emit_duration(
            MCP_TOOLS_LIST_DURATION_METRIC,
            list_start.elapsed(),
            &[("cache", "miss")],
        );
    }
    let tools = filter_tools(tools, &tool_filter);
    let managed = ManagedClient {
        client: Arc::clone(&client),
        tools,
        tool_timeout: Some(tool_timeout),
        tool_filter,
        server_instructions: initialize_result.instructions,
        server_supports_sandbox_state_meta_capability,
        codex_apps_tools_cache_context,
    };
    Ok(managed)
}
/// Inputs to [`start_server_task`], bundled to keep the call site readable.
struct StartServerTaskParams {
    startup_timeout: Option<Duration>, // TODO: cancel_token should handle this.
    /// Per-tool-call timeout stored on the resulting [`ManagedClient`].
    tool_timeout: Duration,
    tool_filter: ToolFilter,
    /// Channel for surfacing elicitation request events to the session.
    tx_event: Sender<Event>,
    elicitation_requests: ElicitationRequestManager,
    codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
}
/// Construct the transport-specific [`RmcpClient`] for one configured server.
///
/// `experimental_environment` selects where the server runs: `None`/`"local"`
/// runs locally; `"remote"` requires the runtime environment to be remote;
/// anything else is rejected. Stdio servers are launched through a launcher
/// abstraction, streamable-HTTP servers through an HTTP client, each chosen
/// by the local/remote decision.
async fn make_rmcp_client(
    server_name: &str,
    config: McpServerConfig,
    store_mode: OAuthCredentialsStoreMode,
    runtime_environment: McpRuntimeEnvironment,
    runtime_auth_provider: Option<SharedAuthProvider>,
) -> Result<RmcpClient, StartupOutcomeError> {
    let McpServerConfig {
        transport,
        experimental_environment,
        ..
    } = config;
    let remote_environment = match experimental_environment.as_deref() {
        None | Some("local") => false,
        Some("remote") => {
            if !runtime_environment.environment().is_remote() {
                return Err(StartupOutcomeError::from(anyhow!(
                    "remote MCP server `{server_name}` requires a remote environment"
                )));
            }
            true
        }
        Some(environment) => {
            return Err(StartupOutcomeError::from(anyhow!(
                "unsupported experimental_environment `{environment}` for MCP server `{server_name}`"
            )));
        }
    };
    match transport {
        McpServerTransportConfig::Stdio {
            command,
            args,
            env,
            env_vars,
            cwd,
        } => {
            // Convert command/args/env to OS strings for process spawning.
            let command_os: OsString = command.into();
            let args_os: Vec<OsString> = args.into_iter().map(Into::into).collect();
            let env_os = env.map(|env| {
                env.into_iter()
                    .map(|(key, value)| (key.into(), value.into()))
                    .collect::<HashMap<_, _>>()
            });
            let launcher = if remote_environment {
                Arc::new(ExecutorStdioServerLauncher::new(
                    runtime_environment.environment().get_exec_backend(),
                    runtime_environment.fallback_cwd(),
                ))
            } else {
                Arc::new(LocalStdioServerLauncher::new(
                    runtime_environment.fallback_cwd(),
                )) as Arc<dyn StdioServerLauncher>
            };
            // `RmcpClient` always sees a launched MCP stdio server. The
            // launcher hides whether that means a local child process or an
            // executor process whose stdin/stdout bytes cross the process API.
            RmcpClient::new_stdio_client(command_os, args_os, env_os, &env_vars, cwd, launcher)
                .await
                .map_err(|err| StartupOutcomeError::from(anyhow!(err)))
        }
        McpServerTransportConfig::StreamableHttp {
            url,
            http_headers,
            env_http_headers,
            bearer_token_env_var,
        } => {
            let http_client: Arc<dyn HttpClient> = if remote_environment {
                runtime_environment.environment().get_http_client()
            } else {
                Arc::new(ReqwestHttpClient)
            };
            // Missing/empty/non-Unicode token env vars fail startup here.
            let resolved_bearer_token =
                match resolve_bearer_token(server_name, bearer_token_env_var.as_deref()) {
                    Ok(token) => token,
                    Err(error) => return Err(error.into()),
                };
            RmcpClient::new_streamable_http_client(
                server_name,
                &url,
                resolved_bearer_token,
                http_headers,
                env_http_headers,
                store_mode,
                http_client,
                runtime_auth_provider,
            )
            .await
            .map_err(StartupOutcomeError::from)
        }
    }
}
/// Fetch the tool list directly from the server (bypassing any cache) and
/// convert each entry into a [`ToolInfo`], normalizing Codex Apps callable
/// names/namespaces/titles and attaching the server instructions. For the
/// Codex Apps server the result is additionally filtered against the
/// connector allow-list.
pub(crate) async fn list_tools_for_client_uncached(
    server_name: &str,
    client: &Arc<RmcpClient>,
    timeout: Option<Duration>,
    server_instructions: Option<&str>,
) -> Result<Vec<ToolInfo>> {
    let resp = client
        .list_tools_with_connector_ids(/*params*/ None, timeout)
        .await?;
    let tools = resp
        .tools
        .into_iter()
        .map(|tool| {
            let callable_name = normalize_codex_apps_callable_name(
                server_name,
                &tool.tool.name,
                tool.connector_id.as_deref(),
                tool.connector_name.as_deref(),
            );
            let callable_namespace = normalize_codex_apps_callable_namespace(
                server_name,
                tool.connector_name.as_deref(),
            );
            let connector_name = tool.connector_name;
            let connector_description = tool.connector_description;
            let mut tool_def = tool.tool;
            // Only rewrite the title when normalization actually changed it.
            if let Some(title) = tool_def.title.as_deref() {
                let normalized_title =
                    normalize_codex_apps_tool_title(server_name, connector_name.as_deref(), title);
                if tool_def.title.as_deref() != Some(normalized_title.as_str()) {
                    tool_def.title = Some(normalized_title);
                }
            }
            ToolInfo {
                server_name: server_name.to_owned(),
                callable_name,
                callable_namespace,
                server_instructions: server_instructions.map(str::to_string),
                tool: tool_def,
                connector_id: tool.connector_id,
                connector_name,
                // Provenance is resolved per-session at read time, not here.
                plugin_display_names: Vec::new(),
                connector_description,
            }
        })
        .collect();
    if server_name == CODEX_APPS_MCP_SERVER_NAME {
        return Ok(filter_disallowed_codex_apps_tools(tools));
    }
    Ok(tools)
}

View File

@@ -1,180 +0,0 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use crate::mcp::mcp_permission_prompt_is_auto_approved;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use async_channel::Sender;
use codex_protocol::approvals::ElicitationRequest;
use codex_protocol::approvals::ElicitationRequestEvent;
use codex_protocol::mcp::RequestId as ProtocolRequestId;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::SandboxPolicy;
use codex_rmcp_client::ElicitationResponse;
use codex_rmcp_client::SendElicitation;
use futures::future::FutureExt;
use rmcp::model::CreateElicitationRequestParams;
use rmcp::model::ElicitationAction;
use rmcp::model::RequestId;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
/// Pending elicitation responders keyed by (server name, request id).
type ResponderMap = HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>;
/// Whether the current approval policy forbids surfacing MCP elicitations to
/// the user; rejected elicitations are auto-declined instead of prompted.
pub(crate) fn elicitation_is_rejected_by_policy(approval_policy: AskForApproval) -> bool {
    match approval_policy {
        AskForApproval::Never => true,
        AskForApproval::OnFailure => false,
        AskForApproval::OnRequest => false,
        AskForApproval::UnlessTrusted => false,
        AskForApproval::Granular(granular_config) => !granular_config.allows_mcp_elicitations(),
    }
}
/// Whether an elicitation can be accepted without asking the user: only form
/// elicitations whose requested schema has no properties qualify.
fn can_auto_accept_elicitation(elicitation: &CreateElicitationRequestParams) -> bool {
    match elicitation {
        CreateElicitationRequestParams::FormElicitationParams {
            requested_schema, ..
        } => {
            // Auto-accept confirm/approval elicitations without schema requirements.
            requested_schema.properties.is_empty()
        }
        // URL elicitations always require explicit user action.
        CreateElicitationRequestParams::UrlElicitationParams { .. } => false,
    }
}
/// Routes MCP elicitation requests to the UI as events and pending responses
/// back to the originating server. The approval/sandbox policies are shared
/// mutable state so the session can update them mid-run.
#[derive(Clone)]
pub(crate) struct ElicitationRequestManager {
    /// Oneshot responders for in-flight elicitations, keyed per server+request.
    requests: Arc<Mutex<ResponderMap>>,
    pub(crate) approval_policy: Arc<StdMutex<AskForApproval>>,
    pub(crate) sandbox_policy: Arc<StdMutex<SandboxPolicy>>,
}
impl ElicitationRequestManager {
    /// Create a manager with no pending requests and the given initial policies.
    pub(crate) fn new(approval_policy: AskForApproval, sandbox_policy: SandboxPolicy) -> Self {
        Self {
            requests: Arc::new(Mutex::new(HashMap::new())),
            approval_policy: Arc::new(StdMutex::new(approval_policy)),
            sandbox_policy: Arc::new(StdMutex::new(sandbox_policy)),
        }
    }
    /// Deliver the user's response to the pending elicitation identified by
    /// `(server_name, id)`, removing it from the pending map.
    ///
    /// Errors when no such request is pending or the waiting side has gone away.
    pub(crate) async fn resolve(
        &self,
        server_name: String,
        id: RequestId,
        response: ElicitationResponse,
    ) -> Result<()> {
        self.requests
            .lock()
            .await
            .remove(&(server_name, id))
            .ok_or_else(|| anyhow!("elicitation request not found"))?
            .send(response)
            .map_err(|e| anyhow!("failed to send elicitation response: {e:?}"))
    }
    /// Build the callback handed to the MCP client for `server_name`. Each
    /// invocation first applies policy: auto-accept trivially-confirmable
    /// elicitations under auto-approving policies, auto-decline when policy
    /// rejects elicitations outright; otherwise it emits an
    /// `ElicitationRequest` event and awaits the user's response.
    pub(crate) fn make_sender(
        &self,
        server_name: String,
        tx_event: Sender<Event>,
    ) -> SendElicitation {
        let elicitation_requests = self.requests.clone();
        let approval_policy = self.approval_policy.clone();
        let sandbox_policy = self.sandbox_policy.clone();
        Box::new(move |id, elicitation| {
            let elicitation_requests = elicitation_requests.clone();
            let tx_event = tx_event.clone();
            let server_name = server_name.clone();
            let approval_policy = approval_policy.clone();
            let sandbox_policy = sandbox_policy.clone();
            async move {
                // Snapshot the policies; a poisoned lock falls back to the
                // most restrictive values.
                let approval_policy = approval_policy
                    .lock()
                    .map(|policy| *policy)
                    .unwrap_or(AskForApproval::Never);
                let sandbox_policy = sandbox_policy
                    .lock()
                    .map(|policy| policy.clone())
                    .unwrap_or_else(|_| SandboxPolicy::new_read_only_policy());
                if mcp_permission_prompt_is_auto_approved(approval_policy, &sandbox_policy)
                    && can_auto_accept_elicitation(&elicitation)
                {
                    return Ok(ElicitationResponse {
                        action: ElicitationAction::Accept,
                        content: Some(serde_json::json!({})),
                        meta: None,
                    });
                }
                if elicitation_is_rejected_by_policy(approval_policy) {
                    return Ok(ElicitationResponse {
                        action: ElicitationAction::Decline,
                        content: None,
                        meta: None,
                    });
                }
                // Convert the rmcp request into the protocol-level event payload.
                let request = match elicitation {
                    CreateElicitationRequestParams::FormElicitationParams {
                        meta,
                        message,
                        requested_schema,
                    } => ElicitationRequest::Form {
                        meta: meta
                            .map(serde_json::to_value)
                            .transpose()
                            .context("failed to serialize MCP elicitation metadata")?,
                        message,
                        requested_schema: serde_json::to_value(requested_schema)
                            .context("failed to serialize MCP elicitation schema")?,
                    },
                    CreateElicitationRequestParams::UrlElicitationParams {
                        meta,
                        message,
                        url,
                        elicitation_id,
                    } => ElicitationRequest::Url {
                        meta: meta
                            .map(serde_json::to_value)
                            .transpose()
                            .context("failed to serialize MCP elicitation metadata")?,
                        message,
                        url,
                        elicitation_id,
                    },
                };
                // Register the responder before emitting the event so a fast
                // UI response cannot race the insertion.
                let (tx, rx) = oneshot::channel();
                {
                    let mut lock = elicitation_requests.lock().await;
                    lock.insert((server_name.clone(), id.clone()), tx);
                }
                let _ = tx_event
                    .send(Event {
                        id: "mcp_elicitation_request".to_string(),
                        msg: EventMsg::ElicitationRequest(ElicitationRequestEvent {
                            turn_id: None,
                            server_name,
                            id: match id.clone() {
                                rmcp::model::NumberOrString::String(value) => {
                                    ProtocolRequestId::String(value.to_string())
                                }
                                rmcp::model::NumberOrString::Number(value) => {
                                    ProtocolRequestId::Integer(value)
                                }
                            },
                            request,
                        }),
                    })
                    .await;
                rx.await
                    .context("elicitation request channel closed unexpectedly")
            }
            .boxed()
        })
    }
}

View File

@@ -1,47 +1,47 @@
pub use client::MCP_SANDBOX_STATE_META_CAPABILITY;
pub use manager::McpConnectionManager;
pub use runtime::McpRuntimeEnvironment;
pub use runtime::SandboxState;
pub use tools::ToolInfo;
pub(crate) mod mcp;
pub(crate) mod mcp_connection_manager;
pub(crate) mod mcp_tool_names;
pub use mcp::CODEX_APPS_MCP_SERVER_NAME;
pub use mcp::McpConfig;
pub use mcp::ToolPluginProvenance;
pub use apps::CodexAppsToolsCacheKey;
pub use apps::codex_apps_tools_cache_key;
pub use mcp::configured_mcp_servers;
pub use mcp::effective_mcp_servers;
pub use mcp::tool_plugin_provenance;
pub use mcp::with_codex_apps_mcp;
pub use mcp::McpServerStatusSnapshot;
pub use mcp::McpSnapshotDetail;
pub use mcp::collect_mcp_server_status_snapshot_with_detail;
pub use mcp::collect_mcp_snapshot_from_manager;
pub use mcp::read_mcp_resource;
pub use mcp::McpAuthStatusEntry;
pub use mcp::McpConfig;
pub use mcp::McpManager;
pub use mcp::McpOAuthLoginConfig;
pub use mcp::McpOAuthLoginSupport;
pub use mcp::McpOAuthScopesSource;
pub use mcp::McpServerStatusSnapshot;
pub use mcp::McpSnapshotDetail;
pub use mcp::ResolvedMcpOAuthScopes;
pub use mcp::ToolPluginProvenance;
pub use mcp::canonical_mcp_server_key;
pub use mcp::collect_mcp_server_status_snapshot;
pub use mcp::collect_mcp_server_status_snapshot_with_detail;
pub use mcp::collect_mcp_snapshot;
pub use mcp::collect_mcp_snapshot_from_manager;
pub use mcp::collect_mcp_snapshot_from_manager_with_detail;
pub use mcp::collect_mcp_snapshot_with_detail;
pub use mcp::collect_missing_mcp_dependencies;
pub use mcp::compute_auth_statuses;
pub use mcp::configured_mcp_servers;
pub use mcp::discover_supported_scopes;
pub use mcp::effective_mcp_servers;
pub use mcp::group_tools_by_server;
pub use mcp::mcp_permission_prompt_is_auto_approved;
pub use mcp::oauth_login_support;
pub use mcp::qualified_mcp_tool_name_prefix;
pub use mcp::read_mcp_resource;
pub use mcp::resolve_oauth_scopes;
pub use mcp::should_retry_without_scopes;
pub use apps::filter_non_codex_apps_mcp_tools_only;
pub use mcp::mcp_permission_prompt_is_auto_approved;
pub use mcp::qualified_mcp_tool_name_prefix;
pub use tools::declared_openai_file_input_param_names;
pub(crate) mod apps;
pub(crate) mod client;
pub(crate) mod elicitation;
pub(crate) mod manager;
pub(crate) mod mcp;
pub(crate) mod runtime;
pub(crate) mod tools;
pub use mcp::split_qualified_tool_name;
pub use mcp::tool_plugin_provenance;
pub use mcp::with_codex_apps_mcp;
pub use mcp_connection_manager::CodexAppsToolsCacheKey;
pub use mcp_connection_manager::DEFAULT_STARTUP_TIMEOUT;
pub use mcp_connection_manager::MCP_SANDBOX_STATE_META_CAPABILITY;
pub use mcp_connection_manager::McpConnectionManager;
pub use mcp_connection_manager::McpRuntimeEnvironment;
pub use mcp_connection_manager::SandboxState;
pub use mcp_connection_manager::ToolInfo;
pub use mcp_connection_manager::codex_apps_tools_cache_key;
pub use mcp_connection_manager::declared_openai_file_input_param_names;
pub use mcp_connection_manager::filter_non_codex_apps_mcp_tools_only;

View File

@@ -1,692 +0,0 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use crate::McpAuthStatusEntry;
use crate::apps::CodexAppsToolsCacheContext;
use crate::apps::CodexAppsToolsCacheKey;
use crate::apps::write_cached_codex_apps_tools_if_needed;
use crate::client::AsyncManagedClient;
use crate::client::DEFAULT_STARTUP_TIMEOUT;
use crate::client::MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC;
use crate::client::MCP_TOOLS_LIST_DURATION_METRIC;
use crate::client::ManagedClient;
use crate::client::StartupOutcomeError;
use crate::client::list_tools_for_client_uncached;
use crate::elicitation::ElicitationRequestManager;
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
use crate::mcp::ToolPluginProvenance;
use crate::runtime::McpRuntimeEnvironment;
use crate::runtime::emit_duration;
use crate::tools::ToolInfo;
use crate::tools::filter_tools;
use crate::tools::qualify_tools;
use crate::tools::tool_with_model_visible_input_schema;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use async_channel::Sender;
use codex_config::Constrained;
use codex_config::McpServerConfig;
use codex_config::McpServerTransportConfig;
use codex_config::types::OAuthCredentialsStoreMode;
use codex_login::CodexAuth;
use codex_protocol::ToolName;
use codex_protocol::mcp::CallToolResult;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::McpStartupCompleteEvent;
use codex_protocol::protocol::McpStartupFailure;
use codex_protocol::protocol::McpStartupStatus;
use codex_protocol::protocol::McpStartupUpdateEvent;
use codex_protocol::protocol::SandboxPolicy;
use codex_rmcp_client::ElicitationResponse;
use rmcp::model::ListResourceTemplatesResult;
use rmcp::model::ListResourcesResult;
use rmcp::model::PaginatedRequestParams;
use rmcp::model::ReadResourceRequestParams;
use rmcp::model::ReadResourceResult;
use rmcp::model::RequestId;
use rmcp::model::Resource;
use rmcp::model::ResourceTemplate;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use tracing::warn;
use url::Url;
/// A thin wrapper around a set of running [`RmcpClient`] instances.
pub struct McpConnectionManager {
    // One lazily-started client per configured server, keyed by server name.
    clients: HashMap<String, AsyncManagedClient>,
    // Per-server origin captured at startup: the URL origin for HTTP
    // transports, or the literal "stdio" for stdio transports.
    server_origins: HashMap<String, String>,
    // Shared elicitation state (approval + sandbox policy) handed to every client.
    elicitation_requests: ElicitationRequestManager,
}
/// Forward a single MCP startup progress update over the event channel,
/// tagging it with the submitting request's id.
async fn emit_update(
    submit_id: &str,
    tx_event: &Sender<Event>,
    update: McpStartupUpdateEvent,
) -> Result<(), async_channel::SendError<Event>> {
    let event = Event {
        id: submit_id.to_string(),
        msg: EventMsg::McpStartupUpdate(update),
    };
    tx_event.send(event).await
}
/// Derive a stable origin string for a transport: the ASCII serialization of
/// the URL's origin for streamable-HTTP servers (None if the URL fails to
/// parse), or the literal "stdio" for stdio servers.
fn transport_origin(transport: &McpServerTransportConfig) -> Option<String> {
    if let McpServerTransportConfig::StreamableHttp { url, .. } = transport {
        Url::parse(url)
            .ok()
            .map(|parsed| parsed.origin().ascii_serialization())
    } else {
        Some("stdio".to_string())
    }
}
/// Render a user-facing message for an MCP client startup failure.
///
/// Special-cases three known failure modes before falling back to the raw
/// error: the GitHub Copilot MCP endpoint configured without any token or
/// headers, servers that require an OAuth login, and startup timeouts.
fn mcp_init_error_display(
    server_name: &str,
    entry: Option<&McpAuthStatusEntry>,
    err: &StartupOutcomeError,
) -> String {
    // GitHub's endpoint with neither a bearer-token env var nor custom
    // headers: steer the user toward a personal access token, since this
    // endpoint is handled specially here rather than via OAuth.
    if let Some(McpServerTransportConfig::StreamableHttp {
        url,
        bearer_token_env_var,
        http_headers,
        ..
    }) = &entry.map(|entry| &entry.config.transport)
        && url == "https://api.githubcopilot.com/mcp/"
        && bearer_token_env_var.is_none()
        && http_headers.as_ref().map(HashMap::is_empty).unwrap_or(true)
    {
        format!(
            "GitHub MCP does not support OAuth. Log in by adding a personal access token (https://github.com/settings/personal-access-tokens) to your environment and config.toml:\n[mcp_servers.{server_name}]\nbearer_token_env_var = CODEX_GITHUB_PERSONAL_ACCESS_TOKEN"
        )
    } else if is_mcp_client_auth_required_error(err) {
        format!(
            "The {server_name} MCP server is not logged in. Run `codex mcp login {server_name}`."
        )
    } else if is_mcp_client_startup_timeout_error(err) {
        // Report the timeout the server was actually given: its configured
        // startup_timeout_sec when present, otherwise the default.
        let startup_timeout_secs = match entry {
            Some(entry) => match entry.config.startup_timeout_sec {
                Some(timeout) => timeout,
                None => DEFAULT_STARTUP_TIMEOUT,
            },
            None => DEFAULT_STARTUP_TIMEOUT,
        }
        .as_secs();
        format!(
            "MCP client for `{server_name}` timed out after {startup_timeout_secs} seconds. Add or adjust `startup_timeout_sec` in your config.toml:\n[mcp_servers.{server_name}]\nstartup_timeout_sec = XX"
        )
    } else {
        format!("MCP client for `{server_name}` failed to start: {err:#}")
    }
}
fn is_mcp_client_auth_required_error(error: &StartupOutcomeError) -> bool {
match error {
StartupOutcomeError::Failed { error } => error.contains("Auth required"),
_ => false,
}
}
fn is_mcp_client_startup_timeout_error(error: &StartupOutcomeError) -> bool {
match error {
StartupOutcomeError::Failed { error } => {
error.contains("request timed out")
|| error.contains("timed out handshaking with MCP server")
}
_ => false,
}
}
/// Convert a startup outcome error into its display string, consuming it.
fn startup_outcome_error_message(error: StartupOutcomeError) -> String {
    match error {
        StartupOutcomeError::Failed { error } => error,
        StartupOutcomeError::Cancelled => String::from("MCP startup cancelled"),
    }
}
impl McpConnectionManager {
    /// Build a manager with no clients registered. The elicitation state is
    /// still seeded from the given policies so it can be updated later.
    pub fn new_uninitialized(
        approval_policy: &Constrained<AskForApproval>,
        sandbox_policy: &Constrained<SandboxPolicy>,
    ) -> Self {
        Self {
            clients: HashMap::new(),
            server_origins: HashMap::new(),
            elicitation_requests: ElicitationRequestManager::new(
                approval_policy.value(),
                sandbox_policy.get().clone(),
            ),
        }
    }

    /// True when at least one MCP server client has been registered.
    pub fn has_servers(&self) -> bool {
        !self.clients.is_empty()
    }

    /// Origin string recorded for `server_name` at startup, if any.
    pub fn server_origin(&self, server_name: &str) -> Option<&str> {
        self.server_origins.get(server_name).map(String::as_str)
    }

    /// Propagate an approval-policy change to the shared elicitation state.
    /// A poisoned lock is silently ignored.
    pub fn set_approval_policy(&self, approval_policy: &Constrained<AskForApproval>) {
        if let Ok(mut policy) = self.elicitation_requests.approval_policy.lock() {
            *policy = approval_policy.value();
        }
    }

    /// Propagate a sandbox-policy change to the shared elicitation state.
    /// A poisoned lock is silently ignored.
    pub fn set_sandbox_policy(&self, sandbox_policy: &SandboxPolicy) {
        if let Ok(mut policy) = self.elicitation_requests.sandbox_policy.lock() {
            *policy = sandbox_policy.clone();
        }
    }

    /// Spawn clients for every enabled server and return the manager plus a
    /// cancellation token that aborts any still-starting servers.
    ///
    /// Startup is asynchronous: each server gets a `Starting` update
    /// immediately, then a per-server `Ready`/`Failed`/`Cancelled` update as
    /// its client resolves, and finally one `McpStartupComplete` summary event
    /// once all servers have settled. Send failures on `tx_event` are ignored.
    #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
    pub async fn new(
        mcp_servers: &HashMap<String, McpServerConfig>,
        store_mode: OAuthCredentialsStoreMode,
        auth_entries: HashMap<String, McpAuthStatusEntry>,
        approval_policy: &Constrained<AskForApproval>,
        submit_id: String,
        tx_event: Sender<Event>,
        initial_sandbox_policy: SandboxPolicy,
        runtime_environment: McpRuntimeEnvironment,
        codex_home: PathBuf,
        codex_apps_tools_cache_key: CodexAppsToolsCacheKey,
        tool_plugin_provenance: ToolPluginProvenance,
        auth: Option<&CodexAuth>,
    ) -> (Self, CancellationToken) {
        let cancel_token = CancellationToken::new();
        let mut clients = HashMap::new();
        let mut server_origins = HashMap::new();
        let mut join_set = JoinSet::new();
        let elicitation_requests =
            ElicitationRequestManager::new(approval_policy.value(), initial_sandbox_policy);
        let tool_plugin_provenance = Arc::new(tool_plugin_provenance);
        let startup_submit_id = submit_id.clone();
        // Only auth against the Codex backend is usable as an apps auth provider.
        let codex_apps_auth_provider = auth
            .filter(|auth| auth.uses_codex_backend())
            .map(codex_model_provider::auth_provider_from_auth);
        let mcp_servers = mcp_servers.clone();
        // Disabled servers are skipped entirely (no client, no events).
        for (server_name, cfg) in mcp_servers.into_iter().filter(|(_, cfg)| cfg.enabled) {
            if let Some(origin) = transport_origin(&cfg.transport) {
                server_origins.insert(server_name.clone(), origin);
            }
            // Child token: cancelling the parent cancels every server's startup.
            let cancel_token = cancel_token.child_token();
            let _ = emit_update(
                startup_submit_id.as_str(),
                &tx_event,
                McpStartupUpdateEvent {
                    server: server_name.clone(),
                    status: McpStartupStatus::Starting,
                },
            )
            .await;
            // Only the built-in codex apps server gets an on-disk tools cache.
            let codex_apps_tools_cache_context = if server_name == CODEX_APPS_MCP_SERVER_NAME {
                Some(CodexAppsToolsCacheContext {
                    codex_home: codex_home.clone(),
                    user_key: codex_apps_tools_cache_key.clone(),
                })
            } else {
                None
            };
            let uses_env_bearer_token = match &cfg.transport {
                McpServerTransportConfig::StreamableHttp {
                    bearer_token_env_var,
                    ..
                } => bearer_token_env_var.is_some(),
                McpServerTransportConfig::Stdio { .. } => false,
            };
            // An explicit env bearer token takes precedence over Codex auth for
            // the codex apps server.
            let runtime_auth_provider =
                if server_name == CODEX_APPS_MCP_SERVER_NAME && !uses_env_bearer_token {
                    codex_apps_auth_provider.clone()
                } else {
                    None
                };
            let async_managed_client = AsyncManagedClient::new(
                server_name.clone(),
                cfg,
                store_mode,
                cancel_token.clone(),
                tx_event.clone(),
                elicitation_requests.clone(),
                codex_apps_tools_cache_context,
                Arc::clone(&tool_plugin_provenance),
                runtime_environment.clone(),
                runtime_auth_provider,
            );
            clients.insert(server_name.clone(), async_managed_client.clone());
            let tx_event = tx_event.clone();
            let submit_id = startup_submit_id.clone();
            let auth_entry = auth_entries.get(&server_name).cloned();
            // Await each client's startup in the background and report its status.
            join_set.spawn(async move {
                let mut outcome = async_managed_client.client().await;
                // A cancellation that raced the startup wins over its result.
                if cancel_token.is_cancelled() {
                    outcome = Err(StartupOutcomeError::Cancelled);
                }
                let status = match &outcome {
                    Ok(_) => McpStartupStatus::Ready,
                    Err(StartupOutcomeError::Cancelled) => McpStartupStatus::Cancelled,
                    Err(error) => {
                        let error_str = mcp_init_error_display(
                            server_name.as_str(),
                            auth_entry.as_ref(),
                            error,
                        );
                        McpStartupStatus::Failed { error: error_str }
                    }
                };
                let _ = emit_update(
                    submit_id.as_str(),
                    &tx_event,
                    McpStartupUpdateEvent {
                        server: server_name.clone(),
                        status,
                    },
                )
                .await;
                (server_name, outcome)
            });
        }
        let manager = Self {
            clients,
            server_origins,
            elicitation_requests: elicitation_requests.clone(),
        };
        // Detached task: once every startup settles, emit a single summary event.
        tokio::spawn(async move {
            let outcomes = join_set.join_all().await;
            let mut summary = McpStartupCompleteEvent::default();
            for (server_name, outcome) in outcomes {
                match outcome {
                    Ok(_) => summary.ready.push(server_name),
                    Err(StartupOutcomeError::Cancelled) => summary.cancelled.push(server_name),
                    Err(StartupOutcomeError::Failed { error }) => {
                        summary.failed.push(McpStartupFailure {
                            server: server_name,
                            error,
                        })
                    }
                }
            }
            let _ = tx_event
                .send(Event {
                    id: startup_submit_id,
                    msg: EventMsg::McpStartupComplete(summary),
                })
                .await;
        });
        (manager, cancel_token)
    }

    /// Resolve a pending elicitation request for `server_name` with the
    /// user's response.
    pub async fn resolve_elicitation(
        &self,
        server_name: String,
        id: RequestId,
        response: ElicitationResponse,
    ) -> Result<()> {
        self.elicitation_requests
            .resolve(server_name, id, response)
            .await
    }

    /// Wait up to `timeout` for the named server's client to become ready.
    /// Returns false for unknown servers, failed startups, or timeouts.
    pub async fn wait_for_server_ready(&self, server_name: &str, timeout: Duration) -> bool {
        let Some(async_managed_client) = self.clients.get(server_name) else {
            return false;
        };
        match tokio::time::timeout(timeout, async_managed_client.client()).await {
            Ok(Ok(_)) => true,
            Ok(Err(_)) | Err(_) => false,
        }
    }

    /// Await each required server's startup and collect a failure entry for
    /// every one that is unknown or failed to start.
    pub async fn required_startup_failures(
        &self,
        required_servers: &[String],
    ) -> Vec<McpStartupFailure> {
        let mut failures = Vec::new();
        for server_name in required_servers {
            let Some(async_managed_client) = self.clients.get(server_name).cloned() else {
                failures.push(McpStartupFailure {
                    server: server_name.clone(),
                    error: format!("required MCP server `{server_name}` was not initialized"),
                });
                continue;
            };
            match async_managed_client.client().await {
                Ok(_) => {}
                Err(error) => failures.push(McpStartupFailure {
                    server: server_name.clone(),
                    error: startup_outcome_error_message(error),
                }),
            }
        }
        failures
    }

    /// Returns a single map that contains all tools. Each key is the
    /// fully-qualified name for the tool.
    #[instrument(level = "trace", skip_all)]
    pub async fn list_all_tools(&self) -> HashMap<String, ToolInfo> {
        let mut tools = Vec::new();
        for managed_client in self.clients.values() {
            // Servers that never produced a tool listing are skipped.
            let Some(server_tools) = managed_client.listed_tools().await else {
                continue;
            };
            tools.extend(server_tools);
        }
        qualify_tools(tools)
    }

    /// Force-refresh codex apps tools by bypassing the in-process cache.
    ///
    /// On success, the refreshed tools replace the cache contents and the
    /// latest filtered tool map is returned directly to the caller. On
    /// failure, the existing cache remains unchanged.
    pub async fn hard_refresh_codex_apps_tools_cache(&self) -> Result<HashMap<String, ToolInfo>> {
        let managed_client = self
            .clients
            .get(CODEX_APPS_MCP_SERVER_NAME)
            .ok_or_else(|| anyhow!("unknown MCP server '{CODEX_APPS_MCP_SERVER_NAME}'"))?
            .client()
            .await
            .context("failed to get client")?;
        let list_start = Instant::now();
        let fetch_start = Instant::now();
        let tools = list_tools_for_client_uncached(
            CODEX_APPS_MCP_SERVER_NAME,
            &managed_client.client,
            managed_client.tool_timeout,
            managed_client.server_instructions.as_deref(),
        )
        .await
        .with_context(|| {
            format!("failed to refresh tools for MCP server '{CODEX_APPS_MCP_SERVER_NAME}'")
        })?;
        emit_duration(
            MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC,
            fetch_start.elapsed(),
            &[],
        );
        // Persist the fresh listing; the earlier `?` returns mean a failed
        // fetch never reaches this point, leaving the cache untouched.
        write_cached_codex_apps_tools_if_needed(
            CODEX_APPS_MCP_SERVER_NAME,
            managed_client.codex_apps_tools_cache_context.as_ref(),
            &tools,
        );
        emit_duration(
            MCP_TOOLS_LIST_DURATION_METRIC,
            list_start.elapsed(),
            &[("cache", "miss")],
        );
        // Apply the server's tool filter and strip schemas down to what the
        // model is allowed to see before qualifying names.
        let tools = filter_tools(tools, &managed_client.tool_filter)
            .into_iter()
            .map(|mut tool| {
                tool.tool = tool_with_model_visible_input_schema(&tool.tool);
                tool
            });
        Ok(qualify_tools(tools))
    }

    /// Returns a single map that contains all resources. Each key is the
    /// server name and the value is a vector of resources.
    pub async fn list_all_resources(&self) -> HashMap<String, Vec<Resource>> {
        let mut join_set = JoinSet::new();
        let clients_snapshot = &self.clients;
        for (server_name, async_managed_client) in clients_snapshot {
            let server_name = server_name.clone();
            // Servers whose startup failed are skipped silently.
            let Ok(managed_client) = async_managed_client.client().await else {
                continue;
            };
            let timeout = managed_client.tool_timeout;
            let client = managed_client.client.clone();
            // One task per server: page through resources/list until exhausted.
            join_set.spawn(async move {
                let mut collected: Vec<Resource> = Vec::new();
                let mut cursor: Option<String> = None;
                loop {
                    let params = cursor.as_ref().map(|next| PaginatedRequestParams {
                        meta: None,
                        cursor: Some(next.clone()),
                    });
                    let response = match client.list_resources(params, timeout).await {
                        Ok(result) => result,
                        Err(err) => return (server_name, Err(err)),
                    };
                    collected.extend(response.resources);
                    match response.next_cursor {
                        Some(next) => {
                            // Guard against servers that loop on the same cursor.
                            if cursor.as_ref() == Some(&next) {
                                return (
                                    server_name,
                                    Err(anyhow!("resources/list returned duplicate cursor")),
                                );
                            }
                            cursor = Some(next);
                        }
                        None => return (server_name, Ok(collected)),
                    }
                }
            });
        }
        // Failures are logged and dropped; successful servers are aggregated.
        let mut aggregated: HashMap<String, Vec<Resource>> = HashMap::new();
        while let Some(join_res) = join_set.join_next().await {
            match join_res {
                Ok((server_name, Ok(resources))) => {
                    aggregated.insert(server_name, resources);
                }
                Ok((server_name, Err(err))) => {
                    warn!("Failed to list resources for MCP server '{server_name}': {err:#}");
                }
                Err(err) => {
                    warn!("Task panic when listing resources for MCP server: {err:#}");
                }
            }
        }
        aggregated
    }

    /// Returns a single map that contains all resource templates. Each key is the
    /// server name and the value is a vector of resource templates.
    pub async fn list_all_resource_templates(&self) -> HashMap<String, Vec<ResourceTemplate>> {
        let mut join_set = JoinSet::new();
        let clients_snapshot = &self.clients;
        for (server_name, async_managed_client) in clients_snapshot {
            let server_name_cloned = server_name.clone();
            // Servers whose startup failed are skipped silently.
            let Ok(managed_client) = async_managed_client.client().await else {
                continue;
            };
            let client = managed_client.client.clone();
            let timeout = managed_client.tool_timeout;
            // One task per server: page through resources/templates/list.
            join_set.spawn(async move {
                let mut collected: Vec<ResourceTemplate> = Vec::new();
                let mut cursor: Option<String> = None;
                loop {
                    let params = cursor.as_ref().map(|next| PaginatedRequestParams {
                        meta: None,
                        cursor: Some(next.clone()),
                    });
                    let response = match client.list_resource_templates(params, timeout).await {
                        Ok(result) => result,
                        Err(err) => return (server_name_cloned, Err(err)),
                    };
                    collected.extend(response.resource_templates);
                    match response.next_cursor {
                        Some(next) => {
                            // Guard against servers that loop on the same cursor.
                            if cursor.as_ref() == Some(&next) {
                                return (
                                    server_name_cloned,
                                    Err(anyhow!(
                                        "resources/templates/list returned duplicate cursor"
                                    )),
                                );
                            }
                            cursor = Some(next);
                        }
                        None => return (server_name_cloned, Ok(collected)),
                    }
                }
            });
        }
        // Failures are logged and dropped; successful servers are aggregated.
        let mut aggregated: HashMap<String, Vec<ResourceTemplate>> = HashMap::new();
        while let Some(join_res) = join_set.join_next().await {
            match join_res {
                Ok((server_name, Ok(templates))) => {
                    aggregated.insert(server_name, templates);
                }
                Ok((server_name, Err(err))) => {
                    warn!(
                        "Failed to list resource templates for MCP server '{server_name}': {err:#}"
                    );
                }
                Err(err) => {
                    warn!("Task panic when listing resource templates for MCP server: {err:#}");
                }
            }
        }
        aggregated
    }

    /// Invoke the tool indicated by the (server, tool) pair.
    pub async fn call_tool(
        &self,
        server: &str,
        tool: &str,
        arguments: Option<serde_json::Value>,
        meta: Option<serde_json::Value>,
    ) -> Result<CallToolResult> {
        let client = self.client_by_name(server).await?;
        // Respect per-server tool filtering even for direct invocations.
        if !client.tool_filter.allows(tool) {
            return Err(anyhow!(
                "tool '{tool}' is disabled for MCP server '{server}'"
            ));
        }
        let result: rmcp::model::CallToolResult = client
            .client
            .call_tool(tool.to_string(), arguments, meta, client.tool_timeout)
            .await
            .with_context(|| format!("tool call failed for `{server}/{tool}`"))?;
        // Convert rmcp content blocks to plain JSON; unserializable blocks
        // degrade to a "<content>" placeholder instead of failing the call.
        let content = result
            .content
            .into_iter()
            .map(|content| {
                serde_json::to_value(content)
                    .unwrap_or_else(|_| serde_json::Value::String("<content>".to_string()))
            })
            .collect();
        Ok(CallToolResult {
            content,
            structured_content: result.structured_content,
            is_error: result.is_error,
            meta: result.meta.and_then(|meta| serde_json::to_value(meta).ok()),
        })
    }

    /// Whether the named server advertised the sandbox-state meta capability.
    pub async fn server_supports_sandbox_state_meta_capability(
        &self,
        server: &str,
    ) -> Result<bool> {
        Ok(self
            .client_by_name(server)
            .await?
            .server_supports_sandbox_state_meta_capability)
    }

    /// List resources from the specified server.
    pub async fn list_resources(
        &self,
        server: &str,
        params: Option<PaginatedRequestParams>,
    ) -> Result<ListResourcesResult> {
        let managed = self.client_by_name(server).await?;
        let timeout = managed.tool_timeout;
        managed
            .client
            .list_resources(params, timeout)
            .await
            .with_context(|| format!("resources/list failed for `{server}`"))
    }

    /// List resource templates from the specified server.
    pub async fn list_resource_templates(
        &self,
        server: &str,
        params: Option<PaginatedRequestParams>,
    ) -> Result<ListResourceTemplatesResult> {
        let managed = self.client_by_name(server).await?;
        let client = managed.client.clone();
        let timeout = managed.tool_timeout;
        client
            .list_resource_templates(params, timeout)
            .await
            .with_context(|| format!("resources/templates/list failed for `{server}`"))
    }

    /// Read a resource from the specified server.
    pub async fn read_resource(
        &self,
        server: &str,
        params: ReadResourceRequestParams,
    ) -> Result<ReadResourceResult> {
        let managed = self.client_by_name(server).await?;
        let client = managed.client.clone();
        let timeout = managed.tool_timeout;
        let uri = params.uri.clone();
        client
            .read_resource(params, timeout)
            .await
            .with_context(|| format!("resources/read failed for `{server}` ({uri})"))
    }

    /// Find the tool whose canonical name matches `tool_name`, if any.
    /// NOTE(review): scans the full tool listing on every call.
    pub async fn resolve_tool_info(&self, tool_name: &ToolName) -> Option<ToolInfo> {
        let all_tools = self.list_all_tools().await;
        all_tools
            .into_values()
            .find(|tool| tool.canonical_tool_name() == *tool_name)
    }

    // Look up a server by name and await its startup, erroring on unknown
    // servers or failed startups.
    async fn client_by_name(&self, name: &str) -> Result<ManagedClient> {
        self.clients
            .get(name)
            .ok_or_else(|| anyhow!("unknown MCP server '{name}'"))?
            .client()
            .await
            .context("failed to get client")
    }
}
#[cfg(test)]
#[path = "mcp_connection_manager_tests.rs"]
mod tests;

View File

@@ -43,12 +43,6 @@ pub struct ResolvedMcpOAuthScopes {
pub source: McpOAuthScopesSource,
}
#[derive(Debug, Clone)]
pub struct McpAuthStatusEntry {
pub config: McpServerConfig,
pub auth_status: McpAuthStatus,
}
pub async fn oauth_login_support(transport: &McpServerTransportConfig) -> McpOAuthLoginSupport {
let McpServerTransportConfig::StreamableHttp {
url,
@@ -125,6 +119,12 @@ pub fn should_retry_without_scopes(scopes: &ResolvedMcpOAuthScopes, error: &anyh
&& error.downcast_ref::<OAuthProviderError>().is_some()
}
#[derive(Debug, Clone)]
pub struct McpAuthStatusEntry {
pub config: McpServerConfig,
pub auth_status: McpAuthStatus,
}
pub async fn compute_auth_statuses<'a, I>(
servers: I,
store_mode: OAuthCredentialsStoreMode,

View File

@@ -1,3 +1,5 @@
pub(crate) mod auth;
mod skill_dependencies;
pub use auth::McpAuthStatusEntry;
pub use auth::McpOAuthLoginConfig;
pub use auth::McpOAuthLoginSupport;
@@ -8,8 +10,8 @@ pub use auth::discover_supported_scopes;
pub use auth::oauth_login_support;
pub use auth::resolve_oauth_scopes;
pub use auth::should_retry_without_scopes;
pub(crate) mod auth;
pub use skill_dependencies::canonical_mcp_server_key;
pub use skill_dependencies::collect_missing_mcp_dependencies;
use std::collections::HashMap;
use std::env;
@@ -34,13 +36,14 @@ use rmcp::model::ReadResourceRequestParams;
use rmcp::model::ReadResourceResult;
use serde_json::Value;
use crate::apps::codex_apps_tools_cache_key;
use crate::manager::McpConnectionManager;
use crate::runtime::McpRuntimeEnvironment;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::mcp_connection_manager::McpRuntimeEnvironment;
use crate::mcp_connection_manager::codex_apps_tools_cache_key;
pub type McpManager = McpConnectionManager;
pub const CODEX_APPS_MCP_SERVER_NAME: &str = "codex_apps";
const MCP_TOOL_NAME_PREFIX: &str = "mcp";
const MCP_TOOL_NAME_DELIMITER: &str = "__";
pub const CODEX_APPS_MCP_SERVER_NAME: &str = "codex_apps";
const CODEX_CONNECTORS_TOKEN_ENV_VAR: &str = "CODEX_CONNECTORS_TOKEN";
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
@@ -56,6 +59,26 @@ impl McpSnapshotDetail {
}
}
/// The Responses API requires tool names to match `^[a-zA-Z0-9_-]+$`.
/// MCP server/tool names are user-controlled, so sanitize the fully-qualified
/// name we expose to the model by replacing any disallowed character with `_`.
pub(crate) fn sanitize_responses_api_tool_name(name: &str) -> String {
    // An empty name would produce an empty (invalid) identifier; use "_".
    if name.is_empty() {
        return "_".to_string();
    }
    name.chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '_' {
                c
            } else {
                '_'
            }
        })
        .collect()
}
pub fn qualified_mcp_tool_name_prefix(server_name: &str) -> String {
sanitize_responses_api_tool_name(&format!(
"{MCP_TOOL_NAME_PREFIX}{MCP_TOOL_NAME_DELIMITER}{server_name}{MCP_TOOL_NAME_DELIMITER}"
@@ -82,7 +105,7 @@ pub fn mcp_permission_prompt_is_auto_approved(
/// approval/sandbox policy, locate OAuth state, and merge plugin-provided MCP
/// servers. Request-scoped or auth-scoped state should not be stored here;
/// thread those values explicitly into runtime entry points such as
/// [`with_codex_apps_mcp`] and snapshot collection helpers so config objects do
/// [`with_codex_apps_mcp`] and [`collect_mcp_snapshot`] so config objects do
/// not go stale when auth changes.
#[derive(Debug, Clone)]
pub struct McpConfig {
@@ -173,6 +196,67 @@ impl ToolPluginProvenance {
}
}
// Return the env-var NAME (not its value) to use as a bearer token source for
// the codex apps server, when CODEX_CONNECTORS_TOKEN is set to a non-blank
// value. A present-but-blank value and an absent variable both yield None; a
// non-Unicode value still yields the name so the consumer surfaces the error.
fn codex_apps_mcp_bearer_token_env_var() -> Option<String> {
    match env::var(CODEX_CONNECTORS_TOKEN_ENV_VAR) {
        Ok(value) if !value.trim().is_empty() => Some(CODEX_CONNECTORS_TOKEN_ENV_VAR.to_string()),
        Ok(_) => None,
        Err(env::VarError::NotPresent) => None,
        Err(env::VarError::NotUnicode(_)) => Some(CODEX_CONNECTORS_TOKEN_ENV_VAR.to_string()),
    }
}
/// Normalize a ChatGPT base URL: strip trailing slashes, and append
/// "/backend-api" to bare chatgpt.com / chat.openai.com URLs that do not
/// already contain it.
fn normalize_codex_apps_base_url(base_url: &str) -> String {
    let trimmed = base_url.trim_end_matches('/');
    let is_chatgpt_host = trimmed.starts_with("https://chatgpt.com")
        || trimmed.starts_with("https://chat.openai.com");
    if is_chatgpt_host && !trimmed.contains("/backend-api") {
        format!("{trimmed}/backend-api")
    } else {
        trimmed.to_string()
    }
}
/// Map a (normalized) base URL to the codex apps MCP endpoint:
/// "/wham/apps" under backend-api, "/apps" under an /api/codex base,
/// otherwise the full "/api/codex/apps" path.
fn codex_apps_mcp_url_for_base_url(base_url: &str) -> String {
    let base = normalize_codex_apps_base_url(base_url);
    let suffix = if base.contains("/backend-api") {
        "/wham/apps"
    } else if base.contains("/api/codex") {
        "/apps"
    } else {
        "/api/codex/apps"
    };
    format!("{base}{suffix}")
}
// Resolve the codex apps MCP endpoint URL from the configured ChatGPT base URL.
pub(crate) fn codex_apps_mcp_url(config: &McpConfig) -> String {
    codex_apps_mcp_url_for_base_url(&config.chatgpt_base_url)
}
// Build the built-in server config for the codex apps MCP backend: a
// streamable-HTTP transport pointed at the apps endpoint, enabled but not
// required, with a 30-second startup timeout and all other knobs left at
// their unset defaults.
fn codex_apps_mcp_server_config(config: &McpConfig) -> McpServerConfig {
    let url = codex_apps_mcp_url(config);
    McpServerConfig {
        transport: McpServerTransportConfig::StreamableHttp {
            url,
            // Only set when CODEX_CONNECTORS_TOKEN is present and non-blank.
            bearer_token_env_var: codex_apps_mcp_bearer_token_env_var(),
            http_headers: None,
            env_http_headers: None,
        },
        experimental_environment: None,
        enabled: true,
        required: false,
        supports_parallel_tool_calls: false,
        disabled_reason: None,
        startup_timeout_sec: Some(Duration::from_secs(30)),
        tool_timeout_sec: None,
        default_tools_approval_mode: None,
        enabled_tools: None,
        disabled_tools: None,
        scopes: None,
        oauth_resource: None,
        tools: HashMap::new(),
    }
}
pub fn with_codex_apps_mcp(
mut servers: HashMap<String, McpServerConfig>,
auth: Option<&CodexAuth>,
@@ -251,6 +335,78 @@ pub async fn read_mcp_resource(
result
}
/// Collect an MCP snapshot (tools, resources, resource templates, and auth
/// statuses) at full detail. Convenience wrapper over
/// [`collect_mcp_snapshot_with_detail`] with [`McpSnapshotDetail::Full`].
pub async fn collect_mcp_snapshot(
    config: &McpConfig,
    auth: Option<&CodexAuth>,
    submit_id: String,
    runtime_environment: McpRuntimeEnvironment,
) -> McpListToolsResponseEvent {
    collect_mcp_snapshot_with_detail(
        config,
        auth,
        submit_id,
        runtime_environment,
        McpSnapshotDetail::Full,
    )
    .await
}
/// Collect an MCP snapshot at the requested detail level.
///
/// Spins up a short-lived [`McpConnectionManager`] with a read-only sandbox
/// policy, gathers the snapshot from it, then cancels the startup token so any
/// still-starting servers are torn down. Startup events go to a channel whose
/// receiver is dropped immediately, so they are discarded.
pub async fn collect_mcp_snapshot_with_detail(
    config: &McpConfig,
    auth: Option<&CodexAuth>,
    submit_id: String,
    runtime_environment: McpRuntimeEnvironment,
    detail: McpSnapshotDetail,
) -> McpListToolsResponseEvent {
    let mcp_servers = effective_mcp_servers(config, auth);
    let tool_plugin_provenance = tool_plugin_provenance(config);
    // Nothing configured: return an empty snapshot without starting anything.
    if mcp_servers.is_empty() {
        return McpListToolsResponseEvent {
            tools: HashMap::new(),
            resources: HashMap::new(),
            resource_templates: HashMap::new(),
            auth_statuses: HashMap::new(),
        };
    }
    let auth_status_entries = compute_auth_statuses(
        mcp_servers.iter(),
        config.mcp_oauth_credentials_store_mode,
        auth,
    )
    .await;
    // No UI consumes startup events here; drop the receiver right away.
    let (tx_event, rx_event) = unbounded();
    drop(rx_event);
    let (mcp_connection_manager, cancel_token) = McpConnectionManager::new(
        &mcp_servers,
        config.mcp_oauth_credentials_store_mode,
        auth_status_entries.clone(),
        &config.approval_policy,
        submit_id,
        tx_event,
        SandboxPolicy::new_read_only_policy(),
        runtime_environment,
        config.codex_home.clone(),
        codex_apps_tools_cache_key(auth),
        tool_plugin_provenance,
        auth,
    )
    .await;
    let snapshot = collect_mcp_snapshot_from_manager_with_detail(
        &mcp_connection_manager,
        auth_status_entries,
        detail,
    )
    .await;
    // Abort any servers still starting once the snapshot has been taken.
    cancel_token.cancel();
    snapshot
}
#[derive(Debug, Clone)]
pub struct McpServerStatusSnapshot {
pub tools_by_server: HashMap<String, HashMap<String, Tool>>,
@@ -259,6 +415,22 @@ pub struct McpServerStatusSnapshot {
pub auth_statuses: HashMap<String, McpAuthStatus>,
}
/// Collect a per-server status snapshot at full detail. Convenience wrapper
/// over `collect_mcp_server_status_snapshot_with_detail` with
/// [`McpSnapshotDetail::Full`].
pub async fn collect_mcp_server_status_snapshot(
    config: &McpConfig,
    auth: Option<&CodexAuth>,
    submit_id: String,
    runtime_environment: McpRuntimeEnvironment,
) -> McpServerStatusSnapshot {
    collect_mcp_server_status_snapshot_with_detail(
        config,
        auth,
        submit_id,
        runtime_environment,
        McpSnapshotDetail::Full,
    )
    .await
}
pub async fn collect_mcp_server_status_snapshot_with_detail(
config: &McpConfig,
auth: Option<&CodexAuth>,
@@ -315,97 +487,33 @@ pub async fn collect_mcp_server_status_snapshot_with_detail(
snapshot
}
pub async fn collect_mcp_snapshot_from_manager(
mcp_connection_manager: &McpConnectionManager,
auth_status_entries: HashMap<String, McpAuthStatusEntry>,
) -> McpListToolsResponseEvent {
collect_mcp_snapshot_from_manager_with_detail(
mcp_connection_manager,
auth_status_entries,
McpSnapshotDetail::Full,
)
.await
/// Split a fully-qualified tool name of the form `mcp__<server>__<tool>` into
/// its `(server, tool)` parts. Returns `None` when the prefix is not `mcp`,
/// or when the tool portion is missing/empty. The tool portion may itself
/// contain the `__` delimiter.
pub fn split_qualified_tool_name(qualified_name: &str) -> Option<(String, String)> {
    let rest = qualified_name
        .strip_prefix(MCP_TOOL_NAME_PREFIX)?
        .strip_prefix(MCP_TOOL_NAME_DELIMITER)?;
    // Only the first delimiter separates server from tool; the rest stays
    // part of the tool name.
    let (server_name, tool_name) = rest.split_once(MCP_TOOL_NAME_DELIMITER)?;
    if tool_name.is_empty() {
        return None;
    }
    Some((server_name.to_string(), tool_name.to_string()))
}
pub(crate) fn codex_apps_mcp_url(config: &McpConfig) -> String {
codex_apps_mcp_url_for_base_url(&config.chatgpt_base_url)
}
/// The Responses API requires tool names to match `^[a-zA-Z0-9_-]+$`.
/// MCP server/tool names are user-controlled, so sanitize the fully-qualified
/// name we expose to the model by replacing any disallowed character with `_`.
pub(crate) fn sanitize_responses_api_tool_name(name: &str) -> String {
let mut sanitized = String::with_capacity(name.len());
for c in name.chars() {
if c.is_ascii_alphanumeric() || c == '_' {
sanitized.push(c);
} else {
sanitized.push('_');
pub fn group_tools_by_server(
tools: &HashMap<String, Tool>,
) -> HashMap<String, HashMap<String, Tool>> {
let mut grouped = HashMap::new();
for (qualified_name, tool) in tools {
if let Some((server_name, tool_name)) = split_qualified_tool_name(qualified_name) {
grouped
.entry(server_name)
.or_insert_with(HashMap::new)
.insert(tool_name, tool.clone());
}
}
if sanitized.is_empty() {
"_".to_string()
} else {
sanitized
}
}
fn codex_apps_mcp_bearer_token_env_var() -> Option<String> {
match env::var(CODEX_CONNECTORS_TOKEN_ENV_VAR) {
Ok(value) if !value.trim().is_empty() => Some(CODEX_CONNECTORS_TOKEN_ENV_VAR.to_string()),
Ok(_) => None,
Err(env::VarError::NotPresent) => None,
Err(env::VarError::NotUnicode(_)) => Some(CODEX_CONNECTORS_TOKEN_ENV_VAR.to_string()),
}
}
fn normalize_codex_apps_base_url(base_url: &str) -> String {
let mut base_url = base_url.trim_end_matches('/').to_string();
if (base_url.starts_with("https://chatgpt.com")
|| base_url.starts_with("https://chat.openai.com"))
&& !base_url.contains("/backend-api")
{
base_url = format!("{base_url}/backend-api");
}
base_url
}
fn codex_apps_mcp_url_for_base_url(base_url: &str) -> String {
let base_url = normalize_codex_apps_base_url(base_url);
if base_url.contains("/backend-api") {
format!("{base_url}/wham/apps")
} else if base_url.contains("/api/codex") {
format!("{base_url}/apps")
} else {
format!("{base_url}/api/codex/apps")
}
}
fn codex_apps_mcp_server_config(config: &McpConfig) -> McpServerConfig {
let url = codex_apps_mcp_url(config);
McpServerConfig {
transport: McpServerTransportConfig::StreamableHttp {
url,
bearer_token_env_var: codex_apps_mcp_bearer_token_env_var(),
http_headers: None,
env_http_headers: None,
},
experimental_environment: None,
enabled: true,
required: false,
supports_parallel_tool_calls: false,
disabled_reason: None,
startup_timeout_sec: Some(Duration::from_secs(30)),
tool_timeout_sec: None,
default_tools_approval_mode: None,
enabled_tools: None,
disabled_tools: None,
scopes: None,
oauth_resource: None,
tools: HashMap::new(),
}
grouped
}
fn protocol_tool_from_rmcp_tool(name: &str, tool: &rmcp::model::Tool) -> Option<Tool> {
@@ -556,7 +664,19 @@ async fn collect_mcp_server_status_snapshot_from_manager(
}
}
async fn collect_mcp_snapshot_from_manager_with_detail(
/// Collect a full-detail snapshot from an already-running connection manager.
/// Convenience wrapper over [`collect_mcp_snapshot_from_manager_with_detail`].
pub async fn collect_mcp_snapshot_from_manager(
    mcp_connection_manager: &McpConnectionManager,
    auth_status_entries: HashMap<String, McpAuthStatusEntry>,
) -> McpListToolsResponseEvent {
    collect_mcp_snapshot_from_manager_with_detail(
        mcp_connection_manager,
        auth_status_entries,
        McpSnapshotDetail::Full,
    )
    .await
}
pub async fn collect_mcp_snapshot_from_manager_with_detail(
mcp_connection_manager: &McpConnectionManager,
auth_status_entries: HashMap<String, McpAuthStatusEntry>,
detail: McpSnapshotDetail,

View File

@@ -25,6 +25,27 @@ fn test_mcp_config(codex_home: PathBuf) -> McpConfig {
}
}
// Test fixture: a Tool with only `name` set and an empty object input schema.
fn make_tool(name: &str) -> Tool {
    Tool {
        name: name.to_string(),
        title: None,
        description: None,
        input_schema: serde_json::json!({"type": "object", "properties": {}}),
        output_schema: None,
        annotations: None,
        icons: None,
        meta: None,
    }
}
// `mcp__<server>__<tool>` splits into its server and tool components.
#[test]
fn split_qualified_tool_name_returns_server_and_tool() {
    assert_eq!(
        split_qualified_tool_name("mcp__alpha__do_thing"),
        Some(("alpha".to_string(), "do_thing".to_string()))
    );
}
#[test]
fn qualified_mcp_tool_name_prefix_sanitizes_server_names_without_lowercasing() {
assert_eq!(
@@ -33,6 +54,36 @@ fn qualified_mcp_tool_name_prefix_sanitizes_server_names_without_lowercasing() {
);
}
// A wrong prefix or an empty tool portion yields None.
#[test]
fn split_qualified_tool_name_rejects_invalid_names() {
    assert_eq!(split_qualified_tool_name("other__alpha__do_thing"), None);
    assert_eq!(split_qualified_tool_name("mcp__alpha__"), None);
}
// Grouping strips the `mcp__<server>__` prefix and keys tools by server;
// extra `__` delimiters stay part of the tool name.
#[test]
fn group_tools_by_server_strips_prefix_and_groups() {
    let mut tools = HashMap::new();
    tools.insert("mcp__alpha__do_thing".to_string(), make_tool("do_thing"));
    tools.insert(
        "mcp__alpha__nested__op".to_string(),
        make_tool("nested__op"),
    );
    tools.insert("mcp__beta__do_other".to_string(), make_tool("do_other"));
    let mut expected_alpha = HashMap::new();
    expected_alpha.insert("do_thing".to_string(), make_tool("do_thing"));
    expected_alpha.insert("nested__op".to_string(), make_tool("nested__op"));
    let mut expected_beta = HashMap::new();
    expected_beta.insert("do_other".to_string(), make_tool("do_other"));
    let mut expected = HashMap::new();
    expected.insert("alpha".to_string(), expected_alpha);
    expected.insert("beta".to_string(), expected_beta);
    assert_eq!(group_tools_by_server(&tools), expected);
}
#[test]
fn tool_plugin_provenance_collects_app_and_mcp_sources() {
let provenance = ToolPluginProvenance::from_capability_summaries(&[

View File

@@ -0,0 +1,172 @@
use std::collections::HashMap;
use std::collections::HashSet;
use codex_config::McpServerConfig;
use codex_config::McpServerTransportConfig;
use codex_protocol::protocol::SkillMetadata;
use codex_protocol::protocol::SkillToolDependency;
use tracing::warn;
/// Returns server configs for skill MCP tool dependencies that are not yet
/// installed, keyed by the dependency's declared name (`tool.value`).
///
/// Installed servers are matched by a canonical key (transport + identifier),
/// so a server registered under a different alias still satisfies the
/// dependency. Within the mentioned skills, the first dependency seen for a
/// given canonical key wins; later aliases for the same server are skipped.
/// Dependencies that cannot be keyed or converted are logged via `warn!` and
/// skipped rather than failing the whole collection.
pub fn collect_missing_mcp_dependencies(
mentioned_skills: &[SkillMetadata],
installed: &HashMap<String, McpServerConfig>,
) -> HashMap<String, McpServerConfig> {
let mut missing = HashMap::new();
// Canonical identities of everything already installed, regardless of the
// alias each server was registered under.
let installed_keys: HashSet<String> = installed
.iter()
.map(|(name, config)| canonical_mcp_server_key(name, config))
.collect();
// Canonical keys already emitted this pass, so duplicate aliases collapse
// onto the first one encountered.
let mut seen_canonical_keys = HashSet::new();
for skill in mentioned_skills {
let Some(dependencies) = skill.dependencies.as_ref() else {
continue;
};
for tool in &dependencies.tools {
// Only MCP-typed tool dependencies participate in auto-install.
if !tool.r#type.eq_ignore_ascii_case("mcp") {
continue;
}
let dependency_key = match canonical_mcp_dependency_key(tool) {
Ok(key) => key,
Err(err) => {
let dependency = tool.value.as_str();
let skill_name = skill.name.as_str();
warn!(
"unable to auto-install MCP dependency {dependency} for skill {skill_name}: {err}",
);
continue;
}
};
if installed_keys.contains(&dependency_key)
|| seen_canonical_keys.contains(&dependency_key)
{
continue;
}
let config = match mcp_dependency_to_server_config(tool) {
Ok(config) => config,
Err(err) => {
let dependency = dependency_key.as_str();
let skill_name = skill.name.as_str();
warn!(
"unable to auto-install MCP dependency {dependency} for skill {skill_name}: {err}",
);
continue;
}
};
// Key by the skill's original dependency name, but dedupe canonically.
missing.insert(tool.value.clone(), config);
seen_canonical_keys.insert(dependency_key);
}
}
missing
}
/// Builds a canonical `mcp__<transport>__<identifier>` key from the trimmed
/// identifier, or returns `fallback` verbatim when the identifier is blank.
fn canonical_mcp_key(transport: &str, identifier: &str, fallback: &str) -> String {
    match identifier.trim() {
        "" => fallback.to_string(),
        id => format!("mcp__{transport}__{id}"),
    }
}
/// Canonical identity of an installed MCP server: transport plus its command
/// (stdio) or URL (streamable_http), falling back to the configured alias
/// `name` when that identifier is blank.
pub fn canonical_mcp_server_key(name: &str, config: &McpServerConfig) -> String {
match &config.transport {
McpServerTransportConfig::Stdio { command, .. } => {
canonical_mcp_key("stdio", command, name)
}
McpServerTransportConfig::StreamableHttp { url, .. } => {
canonical_mcp_key("streamable_http", url, name)
}
}
}
/// Computes the canonical key for a skill MCP dependency, defaulting the
/// transport to `streamable_http` when unspecified.
///
/// Errors when the transport is unsupported or its required field is missing
/// (`url` for streamable_http, `command` for stdio).
fn canonical_mcp_dependency_key(dependency: &SkillToolDependency) -> Result<String, String> {
    let transport = dependency.transport.as_deref().unwrap_or("streamable_http");
    match transport.to_ascii_lowercase().as_str() {
        "streamable_http" => {
            let url = dependency
                .url
                .as_ref()
                .ok_or_else(|| "missing url for streamable_http dependency".to_string())?;
            Ok(canonical_mcp_key("streamable_http", url, &dependency.value))
        }
        "stdio" => {
            let command = dependency
                .command
                .as_ref()
                .ok_or_else(|| "missing command for stdio dependency".to_string())?;
            Ok(canonical_mcp_key("stdio", command, &dependency.value))
        }
        _ => Err(format!("unsupported transport {transport}")),
    }
}
fn mcp_dependency_to_server_config(
dependency: &SkillToolDependency,
) -> Result<McpServerConfig, String> {
let transport = dependency.transport.as_deref().unwrap_or("streamable_http");
if transport.eq_ignore_ascii_case("streamable_http") {
let url = dependency
.url
.as_ref()
.ok_or_else(|| "missing url for streamable_http dependency".to_string())?;
return Ok(McpServerConfig {
transport: McpServerTransportConfig::StreamableHttp {
url: url.clone(),
bearer_token_env_var: None,
http_headers: None,
env_http_headers: None,
},
experimental_environment: None,
enabled: true,
required: false,
supports_parallel_tool_calls: false,
disabled_reason: None,
startup_timeout_sec: None,
tool_timeout_sec: None,
default_tools_approval_mode: None,
enabled_tools: None,
disabled_tools: None,
scopes: None,
oauth_resource: None,
tools: HashMap::new(),
});
}
if transport.eq_ignore_ascii_case("stdio") {
let command = dependency
.command
.as_ref()
.ok_or_else(|| "missing command for stdio dependency".to_string())?;
return Ok(McpServerConfig {
transport: McpServerTransportConfig::Stdio {
command: command.clone(),
args: Vec::new(),
env: None,
env_vars: Vec::new(),
cwd: None,
},
experimental_environment: None,
enabled: true,
required: false,
supports_parallel_tool_calls: false,
disabled_reason: None,
startup_timeout_sec: None,
tool_timeout_sec: None,
default_tools_approval_mode: None,
enabled_tools: None,
disabled_tools: None,
scopes: None,
oauth_resource: None,
tools: HashMap::new(),
});
}
Err(format!("unsupported transport {transport}"))
}
#[cfg(test)]
#[path = "skill_dependencies_tests.rs"]
mod tests;

View File

@@ -0,0 +1,115 @@
use super::*;
use codex_protocol::protocol::SkillDependencies;
use codex_protocol::protocol::SkillMetadata;
use codex_protocol::protocol::SkillScope;
use codex_utils_absolute_path::test_support::PathBufExt as _;
use codex_utils_absolute_path::test_support::test_path_buf;
use pretty_assertions::assert_eq;
/// Builds a user-scoped, enabled `SkillMetadata` fixture carrying the given
/// tool dependencies; all other fields are fixed placeholder values.
fn skill_with_tools(tools: Vec<SkillToolDependency>) -> SkillMetadata {
SkillMetadata {
name: "skill".to_string(),
description: "skill".to_string(),
short_description: None,
interface: None,
dependencies: Some(SkillDependencies { tools }),
path: test_path_buf("/tmp/skill").abs(),
scope: SkillScope::User,
enabled: true,
}
}
// A server installed under a different alias ("alias") but with the same
// canonical identity (streamable_http + identical URL) satisfies the "github"
// dependency, so nothing is reported missing.
#[test]
fn collect_missing_respects_canonical_installed_key() {
let url = "https://example.com/mcp".to_string();
let skills = vec![skill_with_tools(vec![SkillToolDependency {
r#type: "mcp".to_string(),
value: "github".to_string(),
description: None,
transport: Some("streamable_http".to_string()),
command: None,
url: Some(url.clone()),
}])];
let installed = HashMap::from([(
"alias".to_string(),
McpServerConfig {
transport: McpServerTransportConfig::StreamableHttp {
url,
bearer_token_env_var: None,
http_headers: None,
env_http_headers: None,
},
experimental_environment: None,
enabled: true,
required: false,
supports_parallel_tool_calls: false,
disabled_reason: None,
startup_timeout_sec: None,
tool_timeout_sec: None,
default_tools_approval_mode: None,
enabled_tools: None,
disabled_tools: None,
scopes: None,
oauth_resource: None,
tools: HashMap::new(),
},
)]);
assert_eq!(
collect_missing_mcp_dependencies(&skills, &installed),
HashMap::new()
);
}
// Two dependencies that resolve to the same canonical key (same transport and
// URL) collapse to a single missing entry, keyed by the first alias seen
// ("alias-one"); the duplicate "alias-two" is skipped.
#[test]
fn collect_missing_dedupes_by_canonical_key_but_preserves_original_name() {
let url = "https://example.com/one".to_string();
let skills = vec![skill_with_tools(vec![
SkillToolDependency {
r#type: "mcp".to_string(),
value: "alias-one".to_string(),
description: None,
transport: Some("streamable_http".to_string()),
command: None,
url: Some(url.clone()),
},
SkillToolDependency {
r#type: "mcp".to_string(),
value: "alias-two".to_string(),
description: None,
transport: Some("streamable_http".to_string()),
command: None,
url: Some(url.clone()),
},
])];
let expected = HashMap::from([(
"alias-one".to_string(),
McpServerConfig {
transport: McpServerTransportConfig::StreamableHttp {
url,
bearer_token_env_var: None,
http_headers: None,
env_http_headers: None,
},
experimental_environment: None,
enabled: true,
required: false,
supports_parallel_tool_calls: false,
disabled_reason: None,
startup_timeout_sec: None,
tool_timeout_sec: None,
default_tools_approval_mode: None,
enabled_tools: None,
disabled_tools: None,
scopes: None,
oauth_resource: None,
tools: HashMap::new(),
},
)]);
assert_eq!(
collect_missing_mcp_dependencies(&skills, &HashMap::new()),
expected
);
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,35 +1,11 @@
use super::*;
use crate::apps::CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION;
use crate::apps::CodexAppsToolsCacheContext;
use crate::apps::load_startup_cached_codex_apps_tools_snapshot;
use crate::apps::read_cached_codex_apps_tools;
use crate::apps::write_cached_codex_apps_tools;
use crate::client::AsyncManagedClient;
use crate::client::ManagedClient;
use crate::client::StartupOutcomeError;
use crate::client::elicitation_capability_for_server;
use crate::declared_openai_file_input_param_names;
use crate::elicitation::ElicitationRequestManager;
use crate::elicitation::elicitation_is_rejected_by_policy;
use crate::tools::ToolFilter;
use crate::tools::ToolInfo;
use crate::tools::filter_tools;
use crate::tools::qualify_tools;
use crate::tools::tool_with_model_visible_input_schema;
use codex_config::Constrained;
use codex_protocol::ToolName;
use codex_protocol::protocol::GranularApprovalConfig;
use codex_protocol::protocol::McpAuthStatus;
use futures::FutureExt;
use pretty_assertions::assert_eq;
use rmcp::model::CreateElicitationRequestParams;
use rmcp::model::ElicitationAction;
use rmcp::model::ElicitationCapability;
use rmcp::model::FormElicitationCapability;
use rmcp::model::JsonObject;
use rmcp::model::Meta;
use rmcp::model::NumberOrString;
use rmcp::model::Tool;
use std::collections::HashSet;
use std::sync::Arc;
use tempfile::tempdir;

View File

@@ -2,177 +2,105 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use codex_config::McpServerConfig;
use codex_protocol::ToolName;
use rmcp::model::Tool;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Map;
use serde_json::Value as JsonValue;
use sha1::Digest;
use sha1::Sha1;
use tracing::warn;
use crate::mcp::sanitize_responses_api_tool_name;
use crate::mcp_connection_manager::ToolInfo;
const MCP_TOOL_NAME_DELIMITER: &str = "__";
const MAX_TOOL_NAME_LENGTH: usize = 64;
const CALLABLE_NAME_HASH_LEN: usize = 12;
const META_OPENAI_FILE_PARAMS: &str = "openai/fileParams";
pub(crate) const MCP_TOOLS_CACHE_WRITE_DURATION_METRIC: &str =
"codex.mcp.tools.cache_write.duration_ms";
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolInfo {
/// Raw MCP server name used for routing the tool call.
pub server_name: String,
/// Model-visible tool name used in Responses API tool declarations.
#[serde(rename = "tool_name", alias = "callable_name")]
pub callable_name: String,
/// Model-visible namespace used for deferred tool loading.
#[serde(rename = "tool_namespace", alias = "callable_namespace")]
pub callable_namespace: String,
/// Instructions from the MCP server initialize result.
#[serde(default)]
pub server_instructions: Option<String>,
/// Raw MCP tool definition; `tool.name` is sent back to the MCP server.
pub tool: Tool,
pub connector_id: Option<String>,
pub connector_name: Option<String>,
#[serde(default)]
pub plugin_display_names: Vec<String>,
pub connector_description: Option<String>,
fn sha1_hex(s: &str) -> String {
let mut hasher = Sha1::new();
hasher.update(s.as_bytes());
let sha1 = hasher.finalize();
format!("{sha1:x}")
}
impl ToolInfo {
pub fn canonical_tool_name(&self) -> ToolName {
ToolName::namespaced(self.callable_namespace.clone(), self.callable_name.clone())
}
fn callable_name_hash_suffix(raw_identity: &str) -> String {
let hash = sha1_hex(raw_identity);
format!("_{}", &hash[..CALLABLE_NAME_HASH_LEN])
}
pub fn declared_openai_file_input_param_names(
meta: Option<&Map<String, JsonValue>>,
) -> Vec<String> {
let Some(meta) = meta else {
return Vec::new();
};
meta.get(META_OPENAI_FILE_PARAMS)
.and_then(JsonValue::as_array)
.into_iter()
.flatten()
.filter_map(JsonValue::as_str)
.filter(|value| !value.is_empty())
.map(str::to_string)
.collect()
fn append_hash_suffix(value: &str, raw_identity: &str) -> String {
format!("{value}{}", callable_name_hash_suffix(raw_identity))
}
/// A tool is allowed to be used if both are true:
/// 1. enabled is None (no allowlist is set) or the tool is explicitly enabled.
/// 2. The tool is not explicitly disabled.
#[derive(Default, Clone)]
pub(crate) struct ToolFilter {
pub(crate) enabled: Option<HashSet<String>>,
pub(crate) disabled: HashSet<String>,
}
impl ToolFilter {
pub(crate) fn from_config(cfg: &McpServerConfig) -> Self {
let enabled = cfg
.enabled_tools
.as_ref()
.map(|tools| tools.iter().cloned().collect::<HashSet<_>>());
let disabled = cfg
.disabled_tools
.as_ref()
.map(|tools| tools.iter().cloned().collect::<HashSet<_>>())
.unwrap_or_default();
Self { enabled, disabled }
}
pub(crate) fn allows(&self, tool_name: &str) -> bool {
if let Some(enabled) = &self.enabled
&& !enabled.contains(tool_name)
{
return false;
}
!self.disabled.contains(tool_name)
}
}
/// Returns the model-visible view of a tool while preserving the raw metadata
/// used by execution. Keep cache entries raw and call this at manager return
/// boundaries.
pub(crate) fn tool_with_model_visible_input_schema(tool: &Tool) -> Tool {
let file_params = declared_openai_file_input_param_names(tool.meta.as_deref());
if file_params.is_empty() {
return tool.clone();
}
let mut tool = tool.clone();
let mut input_schema = JsonValue::Object(tool.input_schema.as_ref().clone());
mask_input_schema_for_file_path_params(&mut input_schema, &file_params);
if let JsonValue::Object(input_schema) = input_schema {
tool.input_schema = Arc::new(input_schema);
}
tool
}
fn mask_input_schema_for_file_path_params(input_schema: &mut JsonValue, file_params: &[String]) {
let Some(properties) = input_schema
.as_object_mut()
.and_then(|schema| schema.get_mut("properties"))
.and_then(JsonValue::as_object_mut)
else {
return;
};
for field_name in file_params {
let Some(property_schema) = properties.get_mut(field_name) else {
continue;
};
mask_input_property_schema(property_schema);
}
}
fn mask_input_property_schema(schema: &mut JsonValue) {
let Some(object) = schema.as_object_mut() else {
return;
};
let mut description = object
.get("description")
.and_then(JsonValue::as_str)
.map(str::to_string)
.unwrap_or_default();
let guidance = "This parameter expects an absolute local file path. If you want to upload a file, provide the absolute path to that file here.";
if description.is_empty() {
description = guidance.to_string();
} else if !description.contains(guidance) {
description = format!("{description} {guidance}");
}
let is_array = object.get("type").and_then(JsonValue::as_str) == Some("array")
|| object.get("items").is_some();
object.clear();
object.insert("description".to_string(), JsonValue::String(description));
if is_array {
object.insert("type".to_string(), JsonValue::String("array".to_string()));
object.insert("items".to_string(), serde_json::json!({ "type": "string" }));
fn append_namespace_hash_suffix(namespace: &str, raw_identity: &str) -> String {
if let Some(namespace) = namespace.strip_suffix(MCP_TOOL_NAME_DELIMITER) {
format!(
"{}{}{}",
namespace,
callable_name_hash_suffix(raw_identity),
MCP_TOOL_NAME_DELIMITER
)
} else {
object.insert("type".to_string(), JsonValue::String("string".to_string()));
append_hash_suffix(namespace, raw_identity)
}
}
pub(crate) fn filter_tools(tools: Vec<ToolInfo>, filter: &ToolFilter) -> Vec<ToolInfo> {
tools
.into_iter()
.filter(|tool| filter.allows(&tool.tool.name))
.collect()
fn truncate_name(value: &str, max_len: usize) -> String {
value.chars().take(max_len).collect()
}
fn fit_callable_parts_with_hash(
namespace: &str,
tool_name: &str,
raw_identity: &str,
) -> (String, String) {
let suffix = callable_name_hash_suffix(raw_identity);
let max_tool_len = MAX_TOOL_NAME_LENGTH.saturating_sub(namespace.len());
if max_tool_len >= suffix.len() {
let prefix_len = max_tool_len - suffix.len();
return (
namespace.to_string(),
format!("{}{}", truncate_name(tool_name, prefix_len), suffix),
);
}
let max_namespace_len = MAX_TOOL_NAME_LENGTH - suffix.len();
(truncate_name(namespace, max_namespace_len), suffix)
}
fn unique_callable_parts(
namespace: &str,
tool_name: &str,
raw_identity: &str,
used_names: &mut HashSet<String>,
) -> (String, String, String) {
let qualified_name = format!("{namespace}{tool_name}");
if qualified_name.len() <= MAX_TOOL_NAME_LENGTH && used_names.insert(qualified_name.clone()) {
return (namespace.to_string(), tool_name.to_string(), qualified_name);
}
let mut attempt = 0_u32;
loop {
let hash_input = if attempt == 0 {
raw_identity.to_string()
} else {
format!("{raw_identity}\0{attempt}")
};
let (namespace, tool_name) =
fit_callable_parts_with_hash(namespace, tool_name, &hash_input);
let qualified_name = format!("{namespace}{tool_name}");
if used_names.insert(qualified_name.clone()) {
return (namespace, tool_name, qualified_name);
}
attempt = attempt.saturating_add(1);
}
}
#[derive(Debug)]
struct CallableToolCandidate {
tool: ToolInfo,
raw_namespace_identity: String,
raw_tool_identity: String,
callable_namespace: String,
callable_name: String,
}
/// Returns a qualified-name lookup for MCP tools.
@@ -272,92 +200,3 @@ where
}
qualified_tools
}
/// Intermediate record built while assigning model-visible callable names to
/// MCP tools.
#[derive(Debug)]
struct CallableToolCandidate {
// Underlying tool metadata being renamed.
tool: ToolInfo,
// Raw identity strings — presumably the hash inputs used when names must be
// disambiguated or truncated (see `callable_name_hash_suffix`); confirm at
// the construction site.
raw_namespace_identity: String,
raw_tool_identity: String,
// Chosen model-visible namespace and tool name.
callable_namespace: String,
callable_name: String,
}
/// Returns the SHA-1 digest of `s` as a 40-character lowercase hex string.
fn sha1_hex(s: &str) -> String {
    let digest = Sha1::digest(s.as_bytes());
    format!("{digest:x}")
}
/// Returns `_` followed by the first `CALLABLE_NAME_HASH_LEN` hex chars of
/// `sha1(raw_identity)`, used to keep truncated callable names unique.
fn callable_name_hash_suffix(raw_identity: &str) -> String {
let hash = sha1_hex(raw_identity);
format!("_{}", &hash[..CALLABLE_NAME_HASH_LEN])
}
/// Appends the identity hash suffix directly to `value`.
fn append_hash_suffix(value: &str, raw_identity: &str) -> String {
format!("{value}{}", callable_name_hash_suffix(raw_identity))
}
/// Appends the identity hash to a namespace. When the namespace ends with the
/// `__` delimiter, the hash is inserted before it so the namespace still
/// terminates in the delimiter; otherwise the hash is simply appended.
fn append_namespace_hash_suffix(namespace: &str, raw_identity: &str) -> String {
    match namespace.strip_suffix(MCP_TOOL_NAME_DELIMITER) {
        Some(stem) => format!(
            "{stem}{}{MCP_TOOL_NAME_DELIMITER}",
            callable_name_hash_suffix(raw_identity)
        ),
        None => append_hash_suffix(namespace, raw_identity),
    }
}
/// Returns at most the first `max_len` characters (not bytes) of `value`.
fn truncate_name(value: &str, max_len: usize) -> String {
    match value.char_indices().nth(max_len) {
        Some((cut, _)) => value[..cut].to_string(),
        None => value.to_string(),
    }
}
/// Fits `(namespace, tool_name)` into the `MAX_TOOL_NAME_LENGTH` budget by
/// truncating and appending the identity hash suffix.
///
/// If the budget left after the namespace can hold the suffix, the tool name
/// is truncated to make room and the suffix appended to it. Otherwise the
/// namespace itself is truncated and the suffix becomes the entire tool name.
fn fit_callable_parts_with_hash(
namespace: &str,
tool_name: &str,
raw_identity: &str,
) -> (String, String) {
let suffix = callable_name_hash_suffix(raw_identity);
// Budget remaining for the tool-name part once the namespace is counted.
let max_tool_len = MAX_TOOL_NAME_LENGTH.saturating_sub(namespace.len());
if max_tool_len >= suffix.len() {
let prefix_len = max_tool_len - suffix.len();
return (
namespace.to_string(),
format!("{}{}", truncate_name(tool_name, prefix_len), suffix),
);
}
// Namespace alone overflows: shrink it and use the bare suffix as the name.
let max_namespace_len = MAX_TOOL_NAME_LENGTH - suffix.len();
(truncate_name(namespace, max_namespace_len), suffix)
}
/// Returns `(namespace, tool_name, qualified_name)` such that the qualified
/// name (namespace + tool name) is within `MAX_TOOL_NAME_LENGTH` and has not
/// been seen in `used_names`; the chosen name is recorded in `used_names`.
///
/// The unmodified parts are used when they already fit and are unique.
/// Otherwise the parts are re-fitted with a hash suffix derived from
/// `raw_identity`, salting the hash input with an attempt counter until an
/// unused name is found.
fn unique_callable_parts(
namespace: &str,
tool_name: &str,
raw_identity: &str,
used_names: &mut HashSet<String>,
) -> (String, String, String) {
let qualified_name = format!("{namespace}{tool_name}");
if qualified_name.len() <= MAX_TOOL_NAME_LENGTH && used_names.insert(qualified_name.clone()) {
return (namespace.to_string(), tool_name.to_string(), qualified_name);
}
let mut attempt = 0_u32;
loop {
// Attempt 0 hashes the raw identity; later attempts salt it with the
// counter (NUL-separated) to produce a fresh suffix.
let hash_input = if attempt == 0 {
raw_identity.to_string()
} else {
format!("{raw_identity}\0{attempt}")
};
let (namespace, tool_name) =
fit_callable_parts_with_hash(namespace, tool_name, &hash_input);
let qualified_name = format!("{namespace}{tool_name}");
if used_names.insert(qualified_name.clone()) {
return (namespace, tool_name, qualified_name);
}
attempt = attempt.saturating_add(1);
}
}

View File

@@ -1,63 +0,0 @@
//! Runtime support for Model Context Protocol (MCP) servers.
//!
//! This module contains shared types and helpers used by [`McpConnectionManager`].
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use codex_exec_server::Environment;
use codex_protocol::models::PermissionProfile;
use codex_protocol::protocol::SandboxPolicy;
use serde::Deserialize;
use serde::Serialize;
/// Serializable (camelCase JSON) snapshot of the sandbox configuration passed
/// to MCP server runtimes.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SandboxState {
// Optional permission profile; omitted from JSON when absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub permission_profile: Option<PermissionProfile>,
pub sandbox_policy: SandboxPolicy,
// Path to the Linux sandbox helper executable, when one is configured.
pub codex_linux_sandbox_exe: Option<PathBuf>,
pub sandbox_cwd: PathBuf,
// Defaults to false when missing from serialized input.
#[serde(default)]
pub use_legacy_landlock: bool,
}
/// Runtime placement information used when starting MCP server transports.
///
/// `McpConfig` describes what servers exist. This value describes where those
/// servers should run for the current caller. Keep it explicit at manager
/// construction time so status/snapshot paths and real sessions make the same
/// local-vs-remote decision. `fallback_cwd` is not a per-server override; it is
/// used when a stdio server omits `cwd` and the launcher needs a concrete
/// process working directory.
#[derive(Clone)]
pub struct McpRuntimeEnvironment {
// Shared execution environment for launched servers.
environment: Arc<Environment>,
// Working directory used when a stdio server config omits `cwd`.
fallback_cwd: PathBuf,
}
impl McpRuntimeEnvironment {
/// Creates a runtime environment from a shared `Environment` and the
/// fallback working directory for stdio servers without an explicit `cwd`.
pub fn new(environment: Arc<Environment>, fallback_cwd: PathBuf) -> Self {
Self {
environment,
fallback_cwd,
}
}
/// Returns a new handle (refcount bump) to the shared environment.
pub(crate) fn environment(&self) -> Arc<Environment> {
Arc::clone(&self.environment)
}
/// Returns an owned copy of the fallback working directory.
pub(crate) fn fallback_cwd(&self) -> PathBuf {
self.fallback_cwd.clone()
}
}
/// Records a duration metric via the global OTel handle, if one is installed.
/// Recording errors are deliberately ignored — telemetry is best-effort.
pub(crate) fn emit_duration(metric: &str, duration: Duration, tags: &[(&str, &str)]) {
if let Some(metrics) = codex_otel::global() {
let _ = metrics.record_duration(metric, duration, tags);
}
}

View File

@@ -7,6 +7,7 @@ use crate::context::PersonalitySpecInstructions;
use crate::context::RealtimeEndInstructions;
use crate::context::RealtimeStartInstructions;
use crate::context::RealtimeStartWithInstructions;
use crate::context::UserInstructions;
use crate::session::PreviousTurnSettings;
use crate::session::turn_context::TurnContext;
use crate::shell::Shell;
@@ -215,6 +216,17 @@ pub(crate) fn build_settings_update_items(
// inputs or add explicit replay events so fork/resume can diff everything
// deterministically.
let contextual_user_message = build_environment_update_item(previous, next, shell);
let user_instructions_message = if previous.and_then(|prev| prev.user_instructions.as_ref())
== next.user_instructions.as_ref()
{
None
} else {
let text = next.user_instructions.clone().unwrap_or_default();
Some(ContextualUserFragment::into(UserInstructions {
text,
directory: next.cwd.to_string_lossy().into_owned(),
}))
};
let developer_update_sections = [
// Keep model-switch instructions first so model-specific guidance is read before
// any other context diffs on this turn.
@@ -228,10 +240,13 @@ pub(crate) fn build_settings_update_items(
.flatten()
.collect();
let mut items = Vec::with_capacity(2);
let mut items = Vec::with_capacity(3);
if let Some(developer_message) = build_developer_update_item(developer_update_sections) {
items.push(developer_message);
}
if let Some(user_instructions_message) = user_instructions_message {
items.push(user_instructions_message);
}
if let Some(contextual_user_message) = contextual_user_message {
items.push(contextual_user_message);
}

View File

@@ -626,6 +626,9 @@ impl Session {
.map(|turn_environment| turn_environment.cwd.clone())
.unwrap_or_else(|| session_configuration.cwd.clone());
let per_turn_config = Self::build_per_turn_config(&session_configuration, cwd.clone());
let user_instructions = AgentsMdManager::new(&per_turn_config)
.user_instructions(environment.as_deref())
.await;
{
let mcp_connection_manager = self.services.mcp_connection_manager.read().await;
mcp_connection_manager.set_approval_policy(&session_configuration.approval_policy);
@@ -686,6 +689,7 @@ impl Session {
goal_tools_supported,
);
turn_context.realtime_active = self.conversation.running_state().await.is_some();
turn_context.user_instructions = user_instructions;
if let Some(final_schema) = final_output_json_schema {
turn_context.final_output_json_schema = final_schema;

View File

@@ -45,12 +45,12 @@ fn format_labeled_requests_snapshot(
)
}
fn user_instructions_wrapper_count(request: &ResponsesRequest) -> usize {
fn user_instructions_texts(request: &ResponsesRequest) -> Vec<String> {
request
.message_input_texts("user")
.iter()
.into_iter()
.filter(|text| text.starts_with("# AGENTS.md instructions for "))
.count()
.collect()
}
fn format_environment_context_subagents_snapshot(subagents: &[&str]) -> String {
@@ -181,9 +181,7 @@ async fn snapshot_model_visible_layout_turn_overrides() -> Result<()> {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// TODO(ccunningham): Diff `user_instructions` and emit updates when AGENTS.md content changes
// (for example after cwd changes), then update this test to assert refreshed AGENTS content.
async fn snapshot_model_visible_layout_cwd_change_does_not_refresh_agents() -> Result<()> {
async fn snapshot_model_visible_layout_cwd_change_refreshes_agents() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
@@ -273,20 +271,28 @@ async fn snapshot_model_visible_layout_cwd_change_does_not_refresh_agents() -> R
let requests = responses.requests();
assert_eq!(requests.len(), 2, "expected two requests");
let first_agents = user_instructions_texts(&requests[0]);
assert_eq!(
user_instructions_wrapper_count(&requests[0]),
0,
"expected first request to omit the serialized user-instructions wrapper when cwd-only project docs are introduced after session init"
first_agents.len(),
1,
"expected first request to include agents_one AGENTS instructions"
);
assert!(first_agents[0].contains("Turn one agents instructions."));
let second_agents = user_instructions_texts(&requests[1]);
assert_eq!(
user_instructions_wrapper_count(&requests[1]),
0,
"expected second request to keep omitting the serialized user-instructions wrapper after cwd change with the current session-scoped project doc behavior"
second_agents.len(),
2,
"expected second request to include original and refreshed AGENTS instructions"
);
assert!(
second_agents
.last()
.is_some_and(|text| text.contains("Turn two agents instructions."))
);
insta::assert_snapshot!(
"model_visible_layout_cwd_change_does_not_refresh_agents",
"model_visible_layout_cwd_change_refreshes_agents",
format_labeled_requests_snapshot(
"Second turn changes cwd to a directory with different AGENTS.md; current behavior does not emit refreshed AGENTS instructions.",
"Second turn changes cwd to a directory with different AGENTS.md; refreshed AGENTS instructions are emitted.",
&[
("First Request (agents_one)", &requests[0]),
("Second Request (agents_two cwd)", &requests[1]),

View File

@@ -1,22 +1,27 @@
---
source: core/tests/suite/model_visible_layout.rs
expression: "format_labeled_requests_snapshot(\"Second turn changes cwd to a directory with different AGENTS.md; current behavior does not emit refreshed AGENTS instructions.\",\n&[(\"First Request (agents_one)\", &requests[0]),\n(\"Second Request (agents_two cwd)\", &requests[1]),])"
expression: "format_labeled_requests_snapshot(\"Second turn changes cwd to a directory with different AGENTS.md; refreshed AGENTS instructions are emitted.\",\n&[(\"First Request (agents_one)\", &requests[0]),\n(\"Second Request (agents_two cwd)\", &requests[1]),])"
---
Scenario: Second turn changes cwd to a directory with different AGENTS.md; current behavior does not emit refreshed AGENTS instructions.
Scenario: Second turn changes cwd to a directory with different AGENTS.md; refreshed AGENTS instructions are emitted.
## First Request (agents_one)
00:message/developer[2]:
[01] <PERMISSIONS_INSTRUCTIONS>
[02] <SKILLS_INSTRUCTIONS>
01:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
01:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/user:first turn in agents_one
## Second Request (agents_two cwd)
00:message/developer[2]:
[01] <PERMISSIONS_INSTRUCTIONS>
[02] <SKILLS_INSTRUCTIONS>
01:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
01:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/user:first turn in agents_one
03:message/assistant:turn one complete
04:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
05:message/user:second turn in agents_two
04:message/user:<AGENTS_MD>
05:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
06:message/user:second turn in agents_two

View File

@@ -845,8 +845,8 @@ pub const FEATURES: &[FeatureSpec] = &[
FeatureSpec {
id: Feature::UnavailableDummyTools,
key: "unavailable_dummy_tools",
stage: Stage::Stable,
default_enabled: true,
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::ToolSuggest,

View File

@@ -142,6 +142,15 @@ fn browser_controls_are_stable_and_enabled_by_default() {
assert_eq!(feature_for_key("computer_use"), Some(Feature::ComputerUse));
}
#[test]
fn unavailable_dummy_tools_is_under_development_and_disabled_by_default() {
assert_eq!(
Feature::UnavailableDummyTools.stage(),
Stage::UnderDevelopment
);
assert_eq!(Feature::UnavailableDummyTools.default_enabled(), false);
}
#[test]
fn general_analytics_is_stable_and_enabled_by_default() {
assert_eq!(Feature::GeneralAnalytics.stage(), Stage::Stable);