diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 59cbf57dd5..47384dcdfd 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -149,6 +149,7 @@ use crate::mcp::effective_mcp_servers; use crate::mcp::maybe_prompt_and_install_mcp_dependencies; use crate::mcp::with_codex_apps_mcp; use crate::mcp_connection_manager::McpConnectionManager; +use crate::mcp_connection_manager::codex_apps_tools_cache_key; use crate::mcp_connection_manager::filter_codex_apps_mcp_tools_only; use crate::mcp_connection_manager::filter_mcp_tools_by_name; use crate::memories; @@ -1336,6 +1337,8 @@ impl Session { tx_event.clone(), cancel_token, sandbox_state, + config.codex_home.clone(), + codex_apps_tools_cache_key(auth), ) .await; if !required_mcp_servers.is_empty() { @@ -2939,6 +2942,8 @@ impl Session { self.get_tx_event(), cancel_token, sandbox_state, + config.codex_home.clone(), + codex_apps_tools_cache_key(auth.as_ref()), ) .await; diff --git a/codex-rs/core/src/connectors.rs b/codex-rs/core/src/connectors.rs index 35c2dc5ceb..5e625aa13b 100644 --- a/codex-rs/core/src/connectors.rs +++ b/codex-rs/core/src/connectors.rs @@ -22,8 +22,8 @@ use crate::features::Feature; use crate::mcp::CODEX_APPS_MCP_SERVER_NAME; use crate::mcp::auth::compute_auth_statuses; use crate::mcp::with_codex_apps_mcp; -use crate::mcp_connection_manager::DEFAULT_STARTUP_TIMEOUT; use crate::mcp_connection_manager::McpConnectionManager; +use crate::mcp_connection_manager::codex_apps_tools_cache_key; use crate::token_data::TokenData; pub const CONNECTORS_CACHE_TTL: Duration = Duration::from_secs(3600); @@ -109,6 +109,8 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options( tx_event, cancel_token.clone(), sandbox_state, + config.codex_home.clone(), + codex_apps_tools_cache_key(auth.as_ref()), ) .await; @@ -122,16 +124,14 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options( ); } - let codex_apps_ready = if let Some(cfg) = mcp_servers.get(CODEX_APPS_MCP_SERVER_NAME) { - let timeout = cfg.startup_timeout_sec.unwrap_or(DEFAULT_STARTUP_TIMEOUT); + let tools = mcp_connection_manager.list_all_tools().await; + let codex_apps_ready = if mcp_servers.contains_key(CODEX_APPS_MCP_SERVER_NAME) { mcp_connection_manager - .wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, timeout) + .wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, Duration::ZERO) .await } else { false }; - - let tools = mcp_connection_manager.list_all_tools().await; cancel_token.cancel(); let accessible_connectors = accessible_connectors_from_mcp_tools(&tools); diff --git a/codex-rs/core/src/mcp/mod.rs b/codex-rs/core/src/mcp/mod.rs index 1365b5da8b..ba4894a266 100644 --- a/codex-rs/core/src/mcp/mod.rs +++ b/codex-rs/core/src/mcp/mod.rs @@ -25,6 +25,7 @@ use crate::features::Feature; use crate::mcp::auth::compute_auth_statuses; use crate::mcp_connection_manager::McpConnectionManager; use crate::mcp_connection_manager::SandboxState; +use crate::mcp_connection_manager::codex_apps_tools_cache_key; const MCP_TOOL_NAME_PREFIX: &str = "mcp"; const MCP_TOOL_NAME_DELIMITER: &str = "__"; @@ -212,6 +213,8 @@ pub async fn collect_mcp_snapshot(config: &Config) -> McpListToolsResponseEvent tx_event, cancel_token.clone(), sandbox_state, + config.codex_home.clone(), + codex_apps_tools_cache_key(auth.as_ref()), ) .await; diff --git a/codex-rs/core/src/mcp_connection_manager.rs b/codex-rs/core/src/mcp_connection_manager.rs index af2003375a..44eefb0d38 100644 --- a/codex-rs/core/src/mcp_connection_manager.rs +++ b/codex-rs/core/src/mcp_connection_manager.rs @@ -12,10 +12,7 @@ use std::env; use std::ffi::OsString; use std::path::PathBuf; use std::sync::Arc; -use std::sync::LazyLock; -use std::sync::Mutex as StdMutex; use std::time::Duration; -use std::time::Instant; use crate::mcp::CODEX_APPS_MCP_SERVER_NAME; use crate::mcp::auth::McpAuthStatusEntry; @@ -88,7 +85,8 @@ pub const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(10); /// Default timeout for individual tool calls. const DEFAULT_TOOL_TIMEOUT: Duration = Duration::from_secs(60); -const CODEX_APPS_TOOLS_CACHE_TTL: Duration = Duration::from_secs(3600); +const CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION: u8 = 1; +const CODEX_APPS_TOOLS_CACHE_DIR: &str = "cache/codex_apps_tools"; /// The Responses API requires tool names to match `^[a-zA-Z0-9_-]+$`. /// MCP server/tool names are user-controlled, so sanitize the fully-qualified @@ -117,6 +115,27 @@ fn sha1_hex(s: &str) -> String { format!("{sha1:x}") } +pub(crate) fn codex_apps_tools_cache_key( + auth: Option<&crate::CodexAuth>, +) -> CodexAppsToolsCacheKey { + let token_data = auth.and_then(|auth| auth.get_token_data().ok()); + let account_id = token_data + .as_ref() + .and_then(|token_data| token_data.account_id.clone()); + let chatgpt_user_id = token_data + .as_ref() + .and_then(|token_data| token_data.id_token.chatgpt_user_id.clone()); + let is_workspace_account = token_data + .as_ref() + .is_some_and(|token_data| token_data.id_token.is_workspace_account()); + + CodexAppsToolsCacheKey { + account_id, + chatgpt_user_id, + is_workspace_account, + } +} + fn qualify_tools(tools: I) -> HashMap where I: IntoIterator, @@ -159,7 +178,7 @@ where qualified_tools } -#[derive(Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct ToolInfo { pub(crate) server_name: String, pub(crate) tool_name: String, @@ -168,14 +187,40 @@ pub(crate) struct ToolInfo { pub(crate) connector_name: Option, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub(crate) struct CodexAppsToolsCacheKey { + account_id: Option, + chatgpt_user_id: Option, + is_workspace_account: bool, +} + #[derive(Clone)] -struct CachedCodexAppsTools { - expires_at: Instant, +struct CodexAppsToolsCacheContext { + codex_home: PathBuf, + user_key: CodexAppsToolsCacheKey, +} + +impl CodexAppsToolsCacheContext { + fn cache_path(&self) -> PathBuf { + let user_key_json = serde_json::to_string(&self.user_key).unwrap_or_default(); + let user_key_hash = sha1_hex(&user_key_json); + self.codex_home + .join(CODEX_APPS_TOOLS_CACHE_DIR) + .join(format!("{user_key_hash}.json")) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct CodexAppsToolsDiskCache { + schema_version: u8, tools: Vec, } -static CODEX_APPS_TOOLS_CACHE: LazyLock>> = - LazyLock::new(|| StdMutex::new(None)); +enum CachedCodexAppsToolsLoad { + Hit(Vec), + Missing, + Invalid, +} type ResponderMap = HashMap<(String, RequestId), oneshot::Sender>; @@ -253,6 +298,7 @@ struct ManagedClient { tool_filter: ToolFilter, tool_timeout: Option, server_supports_sandbox_state_capability: bool, + codex_apps_tools_cache_context: Option, } impl ManagedClient { @@ -276,6 +322,13 @@ impl ManagedClient { #[derive(Clone)] struct AsyncManagedClient { client: Shared>>, + startup_snapshot: StartupToolSnapshot, +} + +#[derive(Clone)] +enum StartupToolSnapshot { + CacheHit(Vec), + Unavailable, } impl AsyncManagedClient { @@ -286,8 +339,19 @@ impl AsyncManagedClient { cancel_token: CancellationToken, tx_event: Sender, elicitation_requests: ElicitationRequestManager, + codex_apps_tools_cache_context: Option, ) -> Self { let tool_filter = ToolFilter::from_config(&config); + let startup_snapshot = match load_startup_cached_codex_apps_tools_snapshot( + &server_name, + codex_apps_tools_cache_context.as_ref(), + ) { + StartupToolSnapshot::CacheHit(tools) => { + StartupToolSnapshot::CacheHit(filter_tools(tools, tool_filter.clone())) + } + StartupToolSnapshot::Unavailable => StartupToolSnapshot::Unavailable, + }; + let startup_tool_filter = tool_filter.clone(); let fut = async move { if let Err(error) = validate_mcp_server_name(&server_name) { return Err(error.into()); @@ -300,9 +364,10 @@ impl AsyncManagedClient { client, config.startup_timeout_sec.or(Some(DEFAULT_STARTUP_TIMEOUT)), config.tool_timeout_sec.unwrap_or(DEFAULT_TOOL_TIMEOUT), - tool_filter, + startup_tool_filter, tx_event, elicitation_requests, + codex_apps_tools_cache_context, ) .or_cancel(&cancel_token) .await @@ -313,6 +378,7 @@ impl AsyncManagedClient { }; Self { client: fut.boxed().shared(), + startup_snapshot, } } @@ -320,6 +386,10 @@ impl AsyncManagedClient { self.client.clone().await } + fn try_client(&self) -> Option> { + self.client.clone().now_or_never() + } + async fn notify_sandbox_state_change(&self, sandbox_state: &SandboxState) -> Result<()> { let managed = self.client().await?; managed.notify_sandbox_state_change(sandbox_state).await @@ -358,6 +428,8 @@ impl McpConnectionManager { tx_event: Sender, cancel_token: CancellationToken, initial_sandbox_state: SandboxState, + codex_home: PathBuf, + codex_apps_tools_cache_key: CodexAppsToolsCacheKey, ) { if cancel_token.is_cancelled() { return; @@ -376,6 +448,14 @@ impl McpConnectionManager { }, ) .await; + let codex_apps_tools_cache_context = if server_name == CODEX_APPS_MCP_SERVER_NAME { + Some(CodexAppsToolsCacheContext { + codex_home: codex_home.clone(), + user_key: codex_apps_tools_cache_key.clone(), + }) + } else { + None + }; let async_managed_client = AsyncManagedClient::new( server_name.clone(), cfg, @@ -383,6 +463,7 @@ impl McpConnectionManager { cancel_token.clone(), tx_event.clone(), elicitation_requests.clone(), + codex_apps_tools_cache_context, ); clients.insert(server_name.clone(), async_managed_client.clone()); let tx_event = tx_event.clone(); @@ -510,34 +591,62 @@ impl McpConnectionManager { failures } + async fn list_tools_for_ready_client( + server_name: &str, + client: ManagedClient, + ) -> Vec { + let rmcp_client = client.client; + let tool_timeout = client.tool_timeout; + let tool_filter = client.tool_filter; + let mut server_tools = client.tools; + + if server_name == CODEX_APPS_MCP_SERVER_NAME { + match list_tools_for_client( + server_name, + &rmcp_client, + tool_timeout, + client.codex_apps_tools_cache_context.as_ref(), + false, + ) + .await + { + Ok(fresh_or_cached_tools) => { + server_tools = fresh_or_cached_tools; + } + Err(err) => { + warn!( + "Failed to refresh tools for MCP server '{server_name}', using startup snapshot: {err:#}" + ); + } + } + } + + filter_tools(server_tools, tool_filter) + } + /// Returns a single map that contains all tools. Each key is the /// fully-qualified name for the tool. #[instrument(level = "trace", skip_all)] pub async fn list_all_tools(&self) -> HashMap { let mut tools = HashMap::new(); for (server_name, managed_client) in &self.clients { - let client = managed_client.client().await.ok(); - if let Some(client) = client { - let rmcp_client = client.client; - let tool_timeout = client.tool_timeout; - let tool_filter = client.tool_filter; - let mut server_tools = client.tools; - - if server_name == CODEX_APPS_MCP_SERVER_NAME { - match list_tools_for_client(server_name, &rmcp_client, tool_timeout).await { - Ok(fresh_or_cached_tools) => { - server_tools = fresh_or_cached_tools; - } - Err(err) => { - warn!( - "Failed to refresh tools for MCP server '{server_name}', using startup snapshot: {err:#}" - ); - } + let ready_client = match managed_client.try_client() { + Some(Ok(client)) => Some(client), + Some(Err(_)) => None, + None => match &managed_client.startup_snapshot { + StartupToolSnapshot::CacheHit(tools_from_cache) => { + tools.extend(qualify_tools(tools_from_cache.clone())); + None } - } + StartupToolSnapshot::Unavailable => managed_client.client().await.ok(), + }, + }; - tools.extend(qualify_tools(filter_tools(server_tools, tool_filter))); - } + let Some(ready_client) = ready_client else { + continue; + }; + let server_tools = Self::list_tools_for_ready_client(server_name, ready_client).await; + tools.extend(qualify_tools(server_tools)); } tools } @@ -565,7 +674,9 @@ impl McpConnectionManager { format!("failed to refresh tools for MCP server '{CODEX_APPS_MCP_SERVER_NAME}'") })?; - write_cached_codex_apps_tools(&tools); + if let Some(cache_context) = managed_client.codex_apps_tools_cache_context.as_ref() { + write_cached_codex_apps_tools(cache_context, &tools); + } Ok(()) } @@ -997,6 +1108,7 @@ async fn start_server_task( tool_filter: ToolFilter, tx_event: Sender, elicitation_requests: ElicitationRequestManager, + codex_apps_tools_cache_context: Option, ) -> Result { let params = InitializeRequestParams { meta: None, @@ -1033,9 +1145,15 @@ async fn start_server_task( .await .map_err(StartupOutcomeError::from)?; - let tools = list_tools_for_client(&server_name, &client, startup_timeout) - .await - .map_err(StartupOutcomeError::from)?; + let tools = list_tools_for_client( + &server_name, + &client, + startup_timeout, + codex_apps_tools_cache_context.as_ref(), + true, + ) + .await + .map_err(StartupOutcomeError::from)?; let server_supports_sandbox_state_capability = initialize_result .capabilities @@ -1050,6 +1168,7 @@ async fn start_server_task( tool_timeout: Some(tool_timeout), tool_filter, server_supports_sandbox_state_capability, + codex_apps_tools_cache_context, }; Ok(managed) @@ -1103,51 +1222,122 @@ async fn list_tools_for_client( server_name: &str, client: &Arc, timeout: Option, + codex_apps_tools_cache_context: Option<&CodexAppsToolsCacheContext>, + refresh_in_background: bool, ) -> Result> { - if server_name == CODEX_APPS_MCP_SERVER_NAME - && let Some(cached_tools) = read_cached_codex_apps_tools() - { - return Ok(cached_tools); + if server_name != CODEX_APPS_MCP_SERVER_NAME { + return list_tools_for_client_uncached(server_name, client, timeout).await; } - let tools = list_tools_for_client_uncached(server_name, client, timeout).await?; - if server_name == CODEX_APPS_MCP_SERVER_NAME { - write_cached_codex_apps_tools(&tools); + let Some(cache_context) = codex_apps_tools_cache_context.cloned() else { + return list_tools_for_client_uncached(server_name, client, timeout).await; + }; + + let cached = load_cached_codex_apps_tools(&cache_context); + if refresh_in_background + && matches!( + cached, + CachedCodexAppsToolsLoad::Hit(_) | CachedCodexAppsToolsLoad::Invalid + ) + { + refresh_codex_apps_tools_cache_async(Arc::clone(client), timeout, cache_context.clone()); + } + + match cached { + CachedCodexAppsToolsLoad::Hit(tools) => Ok(tools), + CachedCodexAppsToolsLoad::Invalid if refresh_in_background => Ok(Vec::new()), + CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => { + let tools = list_tools_for_client_uncached(server_name, client, timeout).await?; + write_cached_codex_apps_tools(&cache_context, &tools); + Ok(tools) + } } - Ok(tools) } -fn read_cached_codex_apps_tools() -> Option> { - let mut cache_guard = CODEX_APPS_TOOLS_CACHE - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); - let now = Instant::now(); - - if let Some(cached) = cache_guard.as_ref() - && now < cached.expires_at - { - return Some(cached.tools.clone()); - } - - if cache_guard - .as_ref() - .is_some_and(|cached| now >= cached.expires_at) - { - *cache_guard = None; - } - None -} - -fn write_cached_codex_apps_tools(tools: &[ToolInfo]) { - let mut cache_guard = CODEX_APPS_TOOLS_CACHE - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); - *cache_guard = Some(CachedCodexAppsTools { - expires_at: Instant::now() + CODEX_APPS_TOOLS_CACHE_TTL, - tools: tools.to_vec(), +fn refresh_codex_apps_tools_cache_async( + client: Arc, + timeout: Option, + cache_context: CodexAppsToolsCacheContext, +) { + tokio::spawn(async move { + match list_tools_for_client_uncached(CODEX_APPS_MCP_SERVER_NAME, &client, timeout).await { + Ok(tools) => write_cached_codex_apps_tools(&cache_context, &tools), + Err(err) => { + warn!( + "Failed to refresh tools for MCP server '{CODEX_APPS_MCP_SERVER_NAME}': {err:#}" + ); + } + } }); } +fn load_startup_cached_codex_apps_tools_snapshot( + server_name: &str, + cache_context: Option<&CodexAppsToolsCacheContext>, +) -> StartupToolSnapshot { + if server_name != CODEX_APPS_MCP_SERVER_NAME { + return StartupToolSnapshot::Unavailable; + } + + let Some(cache_context) = cache_context else { + return StartupToolSnapshot::Unavailable; + }; + + match load_cached_codex_apps_tools(cache_context) { + CachedCodexAppsToolsLoad::Hit(tools) => StartupToolSnapshot::CacheHit(tools), + CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => { + StartupToolSnapshot::Unavailable + } + } +} + +#[cfg(test)] +fn read_cached_codex_apps_tools( + cache_context: &CodexAppsToolsCacheContext, +) -> Option> { + match load_cached_codex_apps_tools(cache_context) { + CachedCodexAppsToolsLoad::Hit(tools) => Some(tools), + CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => None, + } +} + +fn load_cached_codex_apps_tools( + cache_context: &CodexAppsToolsCacheContext, +) -> CachedCodexAppsToolsLoad { + let cache_path = cache_context.cache_path(); + let bytes = match std::fs::read(cache_path) { + Ok(bytes) => bytes, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + return CachedCodexAppsToolsLoad::Missing; + } + Err(_) => return CachedCodexAppsToolsLoad::Invalid, + }; + let cache: CodexAppsToolsDiskCache = match serde_json::from_slice(&bytes) { + Ok(cache) => cache, + Err(_) => return CachedCodexAppsToolsLoad::Invalid, + }; + if cache.schema_version != CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION { + return CachedCodexAppsToolsLoad::Invalid; + } + CachedCodexAppsToolsLoad::Hit(cache.tools) +} + +fn write_cached_codex_apps_tools(cache_context: &CodexAppsToolsCacheContext, tools: &[ToolInfo]) { + let cache_path = cache_context.cache_path(); + if let Some(parent) = cache_path.parent() + && std::fs::create_dir_all(parent).is_err() + { + return; + } + let Ok(bytes) = serde_json::to_vec_pretty(&CodexAppsToolsDiskCache { + schema_version: CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION, + tools: tools.to_vec(), + }) else { + return; + }; + let _ = std::fs::write(cache_path, bytes); +} + async fn list_tools_for_client_uncached( server_name: &str, client: &Arc, @@ -1262,6 +1452,7 @@ mod tests { use rmcp::model::JsonObject; use std::collections::HashSet; use std::sync::Arc; + use tempfile::tempdir; fn create_test_tool(server_name: &str, tool_name: &str) -> ToolInfo { ToolInfo { @@ -1283,19 +1474,19 @@ mod tests { } } - fn with_clean_codex_apps_tools_cache(f: impl FnOnce() -> T) -> T { - let previous_cache = { - let mut cache_guard = CODEX_APPS_TOOLS_CACHE - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); - cache_guard.take() - }; - let result = f(); - let mut cache_guard = CODEX_APPS_TOOLS_CACHE - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); - *cache_guard = previous_cache; - result + fn create_codex_apps_tools_cache_context( + codex_home: PathBuf, + account_id: Option<&str>, + chatgpt_user_id: Option<&str>, + ) -> CodexAppsToolsCacheContext { + CodexAppsToolsCacheContext { + codex_home, + user_key: CodexAppsToolsCacheKey { + account_id: account_id.map(ToOwned::to_owned), + chatgpt_user_id: chatgpt_user_id.map(ToOwned::to_owned), + is_workspace_account: false, + }, + } } #[test] @@ -1452,43 +1643,191 @@ mod tests { #[test] fn codex_apps_tools_cache_is_overwritten_by_last_write() { - with_clean_codex_apps_tools_cache(|| { - let tools_gateway_1 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")]; - let tools_gateway_2 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "two")]; + let codex_home = tempdir().expect("tempdir"); + let cache_context = create_codex_apps_tools_cache_context( + codex_home.path().to_path_buf(), + Some("account-one"), + Some("user-one"), + ); + let tools_gateway_1 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")]; + let tools_gateway_2 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "two")]; - write_cached_codex_apps_tools(&tools_gateway_1); - let cached_gateway_1 = - read_cached_codex_apps_tools().expect("cache entry exists for first write"); - assert_eq!(cached_gateway_1[0].tool_name, "one"); + write_cached_codex_apps_tools(&cache_context, &tools_gateway_1); + let cached_gateway_1 = read_cached_codex_apps_tools(&cache_context) + .expect("cache entry exists for first write"); + assert_eq!(cached_gateway_1[0].tool_name, "one"); - write_cached_codex_apps_tools(&tools_gateway_2); - let cached_gateway_2 = - read_cached_codex_apps_tools().expect("cache entry exists for second write"); - assert_eq!(cached_gateway_2[0].tool_name, "two"); - }); + write_cached_codex_apps_tools(&cache_context, &tools_gateway_2); + let cached_gateway_2 = read_cached_codex_apps_tools(&cache_context) + .expect("cache entry exists for second write"); + assert_eq!(cached_gateway_2[0].tool_name, "two"); } #[test] - fn codex_apps_tools_cache_is_cleared_when_expired() { - with_clean_codex_apps_tools_cache(|| { - let tools = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "stale_tool")]; - write_cached_codex_apps_tools(&tools); + fn codex_apps_tools_cache_is_scoped_per_user() { + let codex_home = tempdir().expect("tempdir"); + let cache_context_user_1 = create_codex_apps_tools_cache_context( + codex_home.path().to_path_buf(), + Some("account-one"), + Some("user-one"), + ); + let cache_context_user_2 = create_codex_apps_tools_cache_context( + codex_home.path().to_path_buf(), + Some("account-two"), + Some("user-two"), + ); + let tools_user_1 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")]; + let tools_user_2 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "two")]; - { - let mut cache_guard = CODEX_APPS_TOOLS_CACHE - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); - cache_guard.as_mut().expect("cache exists").expires_at = - Instant::now() - Duration::from_secs(1); - } + write_cached_codex_apps_tools(&cache_context_user_1, &tools_user_1); + write_cached_codex_apps_tools(&cache_context_user_2, &tools_user_2); - assert!(read_cached_codex_apps_tools().is_none()); + let read_user_1 = + read_cached_codex_apps_tools(&cache_context_user_1).expect("cache entry for user one"); + let read_user_2 = + read_cached_codex_apps_tools(&cache_context_user_2).expect("cache entry for user two"); - let cache_guard = CODEX_APPS_TOOLS_CACHE - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); - assert!(cache_guard.is_none()); - }); + assert_eq!(read_user_1[0].tool_name, "one"); + assert_eq!(read_user_2[0].tool_name, "two"); + assert_ne!( + cache_context_user_1.cache_path(), + cache_context_user_2.cache_path(), + "each user should get an isolated cache file" + ); + } + + #[test] + fn codex_apps_tools_cache_is_ignored_when_schema_version_mismatches() { + let codex_home = tempdir().expect("tempdir"); + let cache_context = create_codex_apps_tools_cache_context( + codex_home.path().to_path_buf(), + Some("account-one"), + Some("user-one"), + ); + let cache_path = cache_context.cache_path(); + if let Some(parent) = cache_path.parent() { + std::fs::create_dir_all(parent).expect("create parent"); + } + let bytes = serde_json::to_vec_pretty(&serde_json::json!({ + "schema_version": CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION + 1, + "tools": [create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")], + })) + .expect("serialize"); + std::fs::write(cache_path, bytes).expect("write"); + + assert!(read_cached_codex_apps_tools(&cache_context).is_none()); + } + + #[test] + fn codex_apps_tools_cache_is_ignored_when_json_is_invalid() { + let codex_home = tempdir().expect("tempdir"); + let cache_context = create_codex_apps_tools_cache_context( + codex_home.path().to_path_buf(), + Some("account-one"), + Some("user-one"), + ); + let cache_path = cache_context.cache_path(); + if let Some(parent) = cache_path.parent() { + std::fs::create_dir_all(parent).expect("create parent"); + } + std::fs::write(cache_path, b"{not json").expect("write"); + + assert!(read_cached_codex_apps_tools(&cache_context).is_none()); + } + + #[test] + fn startup_cached_codex_apps_tools_loads_from_disk_cache() { + let codex_home = tempdir().expect("tempdir"); + let cache_context = create_codex_apps_tools_cache_context( + codex_home.path().to_path_buf(), + Some("account-one"), + Some("user-one"), + ); + let cached_tools = vec![create_test_tool( + CODEX_APPS_MCP_SERVER_NAME, + "calendar_search", + )]; + write_cached_codex_apps_tools(&cache_context, &cached_tools); + + let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot( + CODEX_APPS_MCP_SERVER_NAME, + Some(&cache_context), + ); + let StartupToolSnapshot::CacheHit(startup_tools) = startup_snapshot else { + panic!("expected startup snapshot to load from cache"); + }; + + assert_eq!(startup_tools.len(), 1); + assert_eq!(startup_tools[0].server_name, CODEX_APPS_MCP_SERVER_NAME); + assert_eq!(startup_tools[0].tool_name, "calendar_search"); + } + + #[tokio::test] + async fn list_all_tools_uses_startup_snapshot_while_client_is_pending() { + let startup_tools = vec![create_test_tool( + CODEX_APPS_MCP_SERVER_NAME, + "calendar_create_event", + )]; + let pending_client = + futures::future::pending::>() + .boxed() + .shared(); + let mut manager = McpConnectionManager::default(); + manager.clients.insert( + CODEX_APPS_MCP_SERVER_NAME.to_string(), + AsyncManagedClient { + client: pending_client, + startup_snapshot: StartupToolSnapshot::CacheHit(startup_tools), + }, + ); + + let tools = manager.list_all_tools().await; + let tool = tools + .get("mcp__codex_apps__calendar_create_event") + .expect("tool from startup cache"); + assert_eq!(tool.server_name, CODEX_APPS_MCP_SERVER_NAME); + assert_eq!(tool.tool_name, "calendar_create_event"); + } + + #[tokio::test] + async fn list_all_tools_blocks_while_client_is_pending_without_startup_snapshot() { + let pending_client = + futures::future::pending::>() + .boxed() + .shared(); + let mut manager = McpConnectionManager::default(); + manager.clients.insert( + CODEX_APPS_MCP_SERVER_NAME.to_string(), + AsyncManagedClient { + client: pending_client, + startup_snapshot: StartupToolSnapshot::Unavailable, + }, + ); + + let timeout_result = + tokio::time::timeout(Duration::from_millis(10), manager.list_all_tools()).await; + assert!(timeout_result.is_err()); + } + + #[tokio::test] + async fn list_all_tools_does_not_block_when_startup_snapshot_cache_hit_is_empty() { + let pending_client = + futures::future::pending::>() + .boxed() + .shared(); + let mut manager = McpConnectionManager::default(); + manager.clients.insert( + CODEX_APPS_MCP_SERVER_NAME.to_string(), + AsyncManagedClient { + client: pending_client, + startup_snapshot: StartupToolSnapshot::CacheHit(Vec::new()), + }, + ); + + let timeout_result = + tokio::time::timeout(Duration::from_millis(10), manager.list_all_tools()).await; + let tools = timeout_result.expect("cache-hit startup snapshot should not block"); + assert!(tools.is_empty()); } #[test]