diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 40ca5c809d..5914f0cbc8 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2441,7 +2441,11 @@ dependencies = [ "codex-app-server-protocol", "pretty_assertions", "serde", + "serde_json", + "sha1", + "tempfile", "tokio", + "tracing", "urlencoding", ] diff --git a/codex-rs/chatgpt/src/connectors.rs b/codex-rs/chatgpt/src/connectors.rs index cbeb4fd1b7..2e54192e85 100644 --- a/codex-rs/chatgpt/src/connectors.rs +++ b/codex-rs/chatgpt/src/connectors.rs @@ -4,7 +4,8 @@ use std::time::Duration; use crate::chatgpt_client::chatgpt_get_request_with_timeout; use codex_app_server_protocol::AppInfo; -use codex_connectors::AllConnectorsCacheKey; +use codex_connectors::ConnectorDirectoryCacheContext; +use codex_connectors::ConnectorDirectoryCacheKey; use codex_connectors::DirectoryListResponse; use codex_connectors::filter::filter_disallowed_connectors; use codex_connectors::merge::merge_connectors; @@ -75,8 +76,8 @@ pub async fn list_cached_all_connectors(config: &Config) -> Option> } let auth = connector_auth(config).await.ok()?; - let cache_key = all_connectors_cache_key(config, &auth); - let connectors = codex_connectors::cached_all_connectors(&cache_key)?; + let cache_context = connector_directory_cache_context(config, &auth); + let connectors = codex_connectors::cached_directory_connectors(&cache_context)?; let connectors = merge_plugin_connectors( connectors, plugin_apps_for_config(config) @@ -98,9 +99,9 @@ pub async fn list_all_connectors_with_options( return Ok(Vec::new()); } let auth = connector_auth(config).await?; - let cache_key = all_connectors_cache_key(config, &auth); + let cache_context = connector_directory_cache_context(config, &auth); let connectors = codex_connectors::list_all_connectors_with_options( - cache_key, + cache_context, auth.is_workspace_account(), force_refetch, |path| async move { @@ -126,12 +127,18 @@ pub async fn list_all_connectors_with_options( )) } -fn all_connectors_cache_key(config: &Config, auth: &CodexAuth) -> AllConnectorsCacheKey { - AllConnectorsCacheKey::new( - config.chatgpt_base_url.clone(), - auth.get_account_id(), - auth.get_chatgpt_user_id(), - auth.is_workspace_account(), +fn connector_directory_cache_context( + config: &Config, + auth: &CodexAuth, +) -> ConnectorDirectoryCacheContext { + ConnectorDirectoryCacheContext::new( + config.codex_home.to_path_buf(), + ConnectorDirectoryCacheKey::new( + config.chatgpt_base_url.clone(), + auth.get_account_id(), + auth.get_chatgpt_user_id(), + auth.is_workspace_account(), + ), ) } diff --git a/codex-rs/connectors/Cargo.toml b/codex-rs/connectors/Cargo.toml index c0094102c3..1ebdae32dc 100644 --- a/codex-rs/connectors/Cargo.toml +++ b/codex-rs/connectors/Cargo.toml @@ -11,10 +11,14 @@ workspace = true anyhow = { workspace = true } codex-app-server-protocol = { workspace = true } serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +sha1 = { workspace = true } +tracing = { workspace = true } urlencoding = { workspace = true } [dev-dependencies] pretty_assertions = { workspace = true } +tempfile = { workspace = true } tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } [lib] diff --git a/codex-rs/connectors/src/directory_cache.rs b/codex-rs/connectors/src/directory_cache.rs new file mode 100644 index 0000000000..581193b87c --- /dev/null +++ b/codex-rs/connectors/src/directory_cache.rs @@ -0,0 +1,112 @@ +use std::path::PathBuf; + +use codex_app_server_protocol::AppInfo; +use serde::Deserialize; +use serde::Serialize; +use sha1::Digest; +use sha1::Sha1; +use tracing::warn; + +use crate::ConnectorDirectoryCacheKey; + +pub(crate) const CONNECTOR_DIRECTORY_DISK_CACHE_SCHEMA_VERSION: u8 = 1; +const CONNECTOR_DIRECTORY_DISK_CACHE_DIR: &str = "cache/codex_app_directory"; + +#[derive(Clone)] +pub struct ConnectorDirectoryCacheContext { + pub(crate) codex_home: PathBuf, + pub(crate) cache_key: ConnectorDirectoryCacheKey, +} + +impl ConnectorDirectoryCacheContext { + pub fn new(codex_home: PathBuf, cache_key: ConnectorDirectoryCacheKey) -> Self { + Self { + codex_home, + cache_key, + } + } + + pub(crate) fn cache_path(&self) -> PathBuf { + let cache_key_json = serde_json::to_string(&self.cache_key).unwrap_or_default(); + let cache_key_hash = sha1_hex(&cache_key_json); + self.codex_home + .join(CONNECTOR_DIRECTORY_DISK_CACHE_DIR) + .join(format!("{cache_key_hash}.json")) + } +} + +pub(crate) enum CachedConnectorDirectoryDiskLoad { + Hit { connectors: Vec }, + Missing, + Invalid, +} + +pub(crate) fn load_cached_directory_connectors_from_disk( + cache_context: &ConnectorDirectoryCacheContext, +) -> CachedConnectorDirectoryDiskLoad { + let cache_path = cache_context.cache_path(); + let bytes = match std::fs::read(&cache_path) { + Ok(bytes) => bytes, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + return CachedConnectorDirectoryDiskLoad::Missing; + } + Err(err) => { + warn!( + cache_path = %cache_path.display(), + "failed to read connector directory disk cache: {err}" + ); + return CachedConnectorDirectoryDiskLoad::Invalid; + } + }; + let cache: ConnectorDirectoryDiskCache = match serde_json::from_slice(&bytes) { + Ok(cache) => cache, + Err(err) => { + warn!( + cache_path = %cache_path.display(), + "failed to parse connector directory disk cache: {err}" + ); + let _ = std::fs::remove_file(cache_path); + return CachedConnectorDirectoryDiskLoad::Invalid; + } + }; + if cache.schema_version != CONNECTOR_DIRECTORY_DISK_CACHE_SCHEMA_VERSION { + let _ = std::fs::remove_file(cache_path); + return CachedConnectorDirectoryDiskLoad::Invalid; + } + + CachedConnectorDirectoryDiskLoad::Hit { + connectors: cache.connectors, + } +} + +pub(crate) fn write_cached_directory_connectors_to_disk( + cache_context: &ConnectorDirectoryCacheContext, + connectors: &[AppInfo], +) { + let cache_path = cache_context.cache_path(); + if let Some(parent) = cache_path.parent() + && std::fs::create_dir_all(parent).is_err() + { + return; + } + let Ok(bytes) = serde_json::to_vec_pretty(&ConnectorDirectoryDiskCache { + schema_version: CONNECTOR_DIRECTORY_DISK_CACHE_SCHEMA_VERSION, + connectors: connectors.to_vec(), + }) else { + return; + }; + let _ = std::fs::write(cache_path, bytes); +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct ConnectorDirectoryDiskCache { + schema_version: u8, + connectors: Vec, +} + +fn sha1_hex(value: &str) -> String { + let mut hasher = Sha1::new(); + hasher.update(value.as_bytes()); + let sha1 = hasher.finalize(); + format!("{sha1:x}") +} diff --git a/codex-rs/connectors/src/lib.rs b/codex-rs/connectors/src/lib.rs index e6260d0e1d..c2bf891115 100644 --- a/codex-rs/connectors/src/lib.rs +++ b/codex-rs/connectors/src/lib.rs @@ -9,23 +9,27 @@ use codex_app_server_protocol::AppBranding; use codex_app_server_protocol::AppInfo; use codex_app_server_protocol::AppMetadata; use serde::Deserialize; +use serde::Serialize; pub mod accessible; +mod directory_cache; pub mod filter; pub mod merge; pub mod metadata; +pub use directory_cache::ConnectorDirectoryCacheContext; + pub const CONNECTORS_CACHE_TTL: Duration = Duration::from_secs(3600); -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct AllConnectorsCacheKey { +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ConnectorDirectoryCacheKey { chatgpt_base_url: String, account_id: Option, chatgpt_user_id: Option, is_workspace_account: bool, } -impl AllConnectorsCacheKey { +impl ConnectorDirectoryCacheKey { pub fn new( chatgpt_base_url: String, account_id: Option, @@ -42,13 +46,13 @@ impl AllConnectorsCacheKey { } #[derive(Clone)] -struct CachedAllConnectors { - key: AllConnectorsCacheKey, +struct CachedConnectorDirectory { + key: ConnectorDirectoryCacheKey, expires_at: Instant, connectors: Vec, } -static ALL_CONNECTORS_CACHE: LazyLock>> = +static CONNECTOR_DIRECTORY_CACHE: LazyLock>> = LazyLock::new(|| StdMutex::new(None)); #[derive(Debug, Deserialize)] @@ -76,26 +80,54 @@ pub struct DirectoryApp { visibility: Option, } -pub fn cached_all_connectors(cache_key: &AllConnectorsCacheKey) -> Option> { - let mut cache_guard = ALL_CONNECTORS_CACHE - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); - let now = Instant::now(); - - if let Some(cached) = cache_guard.as_ref() { - if now < cached.expires_at && cached.key == *cache_key { - return Some(cached.connectors.clone()); - } - if now >= cached.expires_at { - *cache_guard = None; - } +pub fn cached_directory_connectors( + cache_context: &ConnectorDirectoryCacheContext, +) -> Option> { + if let Some(cached_connectors) = cached_directory_connectors_in_memory(&cache_context.cache_key) + { + return Some(cached_connectors); } + let directory_cache::CachedConnectorDirectoryDiskLoad::Hit { connectors } = + directory_cache::load_cached_directory_connectors_from_disk(cache_context) + else { + return None; + }; + write_cached_directory_connectors_in_memory( + cache_context.cache_key.clone(), + &connectors, + Duration::ZERO, + ); + Some(connectors) +} + +fn cached_directory_connectors_in_memory( + cache_key: &ConnectorDirectoryCacheKey, +) -> Option> { + let cache_guard = CONNECTOR_DIRECTORY_CACHE + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + cache_guard + .as_ref() + .filter(|cached| cached.key == *cache_key) + .map(|cached| cached.connectors.clone()) +} + +fn unexpired_directory_connectors_in_memory( + cache_key: &ConnectorDirectoryCacheKey, +) -> Option> { + let cache_guard = CONNECTOR_DIRECTORY_CACHE + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let cached = cache_guard.as_ref()?; + if cached.key == *cache_key && Instant::now() < cached.expires_at { + return Some(cached.connectors.clone()); + } None } pub async fn list_all_connectors_with_options( - cache_key: AllConnectorsCacheKey, + cache_context: ConnectorDirectoryCacheContext, is_workspace_account: bool, force_refetch: bool, mut fetch_page: F, @@ -104,7 +136,10 @@ where F: FnMut(String) -> Fut, Fut: Future>, { - if !force_refetch && let Some(cached_connectors) = cached_all_connectors(&cache_key) { + if !force_refetch + && let Some(cached_connectors) = + unexpired_directory_connectors_in_memory(&cache_context.cache_key) + { return Ok(cached_connectors); } @@ -132,17 +167,33 @@ where .cmp(&right.name) .then_with(|| left.id.cmp(&right.id)) }); - write_cached_all_connectors(cache_key, &connectors); + write_cached_directory_connectors(&cache_context, &connectors); Ok(connectors) } -fn write_cached_all_connectors(cache_key: AllConnectorsCacheKey, connectors: &[AppInfo]) { - let mut cache_guard = ALL_CONNECTORS_CACHE +fn write_cached_directory_connectors( + cache_context: &ConnectorDirectoryCacheContext, + connectors: &[AppInfo], +) { + write_cached_directory_connectors_in_memory( + cache_context.cache_key.clone(), + connectors, + CONNECTORS_CACHE_TTL, + ); + directory_cache::write_cached_directory_connectors_to_disk(cache_context, connectors); +} + +fn write_cached_directory_connectors_in_memory( + cache_key: ConnectorDirectoryCacheKey, + connectors: &[AppInfo], + ttl: Duration, +) { + let mut cache_guard = CONNECTOR_DIRECTORY_CACHE .lock() .unwrap_or_else(std::sync::PoisonError::into_inner); - *cache_guard = Some(CachedAllConnectors { + *cache_guard = Some(CachedConnectorDirectory { key: cache_key, - expires_at: Instant::now() + CONNECTORS_CACHE_TTL, + expires_at: Instant::now() + ttl, connectors: connectors.to_vec(), }); } @@ -417,12 +468,13 @@ mod tests { use std::sync::Mutex; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; + use tempfile::TempDir; - static ALL_CONNECTORS_CACHE_TEST_LOCK: LazyLock> = + static CONNECTOR_DIRECTORY_CACHE_TEST_LOCK: LazyLock> = LazyLock::new(|| tokio::sync::Mutex::new(())); - fn cache_key(id: &str) -> AllConnectorsCacheKey { - AllConnectorsCacheKey::new( + fn cache_key(id: &str) -> ConnectorDirectoryCacheKey { + ConnectorDirectoryCacheKey::new( "https://chatgpt.example".to_string(), Some(format!("account-{id}")), Some(format!("user-{id}")), @@ -430,6 +482,17 @@ mod tests { ) } + fn cache_context(codex_home: &TempDir, id: &str) -> ConnectorDirectoryCacheContext { + ConnectorDirectoryCacheContext::new(codex_home.path().to_path_buf(), cache_key(id)) + } + + fn clear_directory_memory_cache() { + let mut cache_guard = CONNECTOR_DIRECTORY_CACHE + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + *cache_guard = None; + } + fn app(id: &str, name: &str) -> DirectoryApp { DirectoryApp { id: id.to_string(), @@ -450,15 +513,16 @@ mod tests { clippy::await_holding_invalid_type, reason = "test serializes access to the shared connector cache for its full duration" )] - async fn list_all_connectors_uses_shared_cache() -> anyhow::Result<()> { - let _cache_guard = ALL_CONNECTORS_CACHE_TEST_LOCK.lock().await; + async fn list_all_connectors_uses_shared_directory_cache() -> anyhow::Result<()> { + let _cache_guard = CONNECTOR_DIRECTORY_CACHE_TEST_LOCK.lock().await; let calls = Arc::new(AtomicUsize::new(0)); let call_counter = Arc::clone(&calls); - let key = cache_key("shared"); + let codex_home = TempDir::new()?; + let cache_context = cache_context(&codex_home, "shared"); let first = list_all_connectors_with_options( - key.clone(), + cache_context.clone(), /*is_workspace_account*/ false, /*force_refetch*/ false, move |_path| { @@ -475,7 +539,7 @@ mod tests { .await?; let second = list_all_connectors_with_options( - key, + cache_context, /*is_workspace_account*/ false, /*force_refetch*/ false, move |_path| async move { @@ -495,14 +559,15 @@ mod tests { reason = "test serializes access to the shared connector cache for its full duration" )] async fn list_all_connectors_merges_and_normalizes_directory_apps() -> anyhow::Result<()> { - let _cache_guard = ALL_CONNECTORS_CACHE_TEST_LOCK.lock().await; + let _cache_guard = CONNECTOR_DIRECTORY_CACHE_TEST_LOCK.lock().await; - let key = cache_key("merged"); + let codex_home = TempDir::new()?; + let cache_context = cache_context(&codex_home, "merged"); let calls = Arc::new(AtomicUsize::new(0)); let call_counter = Arc::clone(&calls); let connectors = list_all_connectors_with_options( - key, + cache_context, /*is_workspace_account*/ true, /*force_refetch*/ true, move |path| { @@ -566,6 +631,134 @@ mod tests { Ok(()) } + #[tokio::test] + #[expect( + clippy::await_holding_invalid_type, + reason = "test serializes access to the shared connector cache for its full duration" + )] + async fn cached_directory_connectors_reads_directory_disk_cache() -> anyhow::Result<()> { + let _cache_guard = CONNECTOR_DIRECTORY_CACHE_TEST_LOCK.lock().await; + + let codex_home = TempDir::new()?; + let cache_context = cache_context(&codex_home, "disk"); + let calls = Arc::new(AtomicUsize::new(0)); + let call_counter = Arc::clone(&calls); + + let first = list_all_connectors_with_options( + cache_context.clone(), + /*is_workspace_account*/ false, + /*force_refetch*/ false, + move |_path| { + let call_counter = Arc::clone(&call_counter); + async move { + call_counter.fetch_add(1, Ordering::SeqCst); + Ok(DirectoryListResponse { + apps: vec![app("alpha", "Alpha")], + next_token: None, + }) + } + }, + ) + .await?; + + clear_directory_memory_cache(); + + let second = cached_directory_connectors(&cache_context).expect("disk cache should load"); + + assert_eq!(calls.load(Ordering::SeqCst), 1); + assert_eq!(first, second); + Ok(()) + } + + #[tokio::test] + #[expect( + clippy::await_holding_invalid_type, + reason = "test serializes access to the shared connector cache for its full duration" + )] + async fn list_all_connectors_refreshes_when_only_directory_disk_cache_exists() + -> anyhow::Result<()> { + let _cache_guard = CONNECTOR_DIRECTORY_CACHE_TEST_LOCK.lock().await; + + let codex_home = TempDir::new()?; + let cache_context = cache_context(&codex_home, "disk-refresh"); + let calls = Arc::new(AtomicUsize::new(0)); + let call_counter = Arc::clone(&calls); + + list_all_connectors_with_options( + cache_context.clone(), + /*is_workspace_account*/ false, + /*force_refetch*/ false, + move |_path| { + let call_counter = Arc::clone(&call_counter); + async move { + call_counter.fetch_add(1, Ordering::SeqCst); + Ok(DirectoryListResponse { + apps: vec![app("alpha", "Alpha")], + next_token: None, + }) + } + }, + ) + .await?; + + clear_directory_memory_cache(); + let mut cached_expected = directory_app_to_app_info(app("alpha", "Alpha")); + cached_expected.install_url = Some(connector_install_url( + &cached_expected.name, + &cached_expected.id, + )); + assert_eq!( + cached_directory_connectors(&cache_context), + Some(vec![cached_expected]) + ); + let refreshed_calls = Arc::clone(&calls); + + let refreshed = list_all_connectors_with_options( + cache_context, + /*is_workspace_account*/ false, + /*force_refetch*/ false, + move |_path| { + let call_counter = Arc::clone(&refreshed_calls); + async move { + call_counter.fetch_add(1, Ordering::SeqCst); + Ok(DirectoryListResponse { + apps: vec![app("beta", "Beta")], + next_token: None, + }) + } + }, + ) + .await?; + + let mut expected = directory_app_to_app_info(app("beta", "Beta")); + expected.install_url = Some(connector_install_url(&expected.name, &expected.id)); + assert_eq!(calls.load(Ordering::SeqCst), 2); + assert_eq!(refreshed, vec![expected]); + Ok(()) + } + + #[tokio::test] + async fn cached_directory_connectors_drops_stale_disk_schema() -> anyhow::Result<()> { + let _cache_guard = CONNECTOR_DIRECTORY_CACHE_TEST_LOCK.lock().await; + + clear_directory_memory_cache(); + let codex_home = TempDir::new()?; + let cache_context = cache_context(&codex_home, "stale-schema"); + let cache_path = cache_context.cache_path(); + std::fs::create_dir_all(cache_path.parent().expect("cache parent"))?; + std::fs::write( + &cache_path, + serde_json::to_vec_pretty(&serde_json::json!({ + "schema_version": 0, + "connectors": [], + }))?, + )?; + + assert_eq!(cached_directory_connectors(&cache_context), None); + assert!(!cache_path.exists()); + Ok(()) + } + #[tokio::test] async fn list_directory_connectors_omits_tier_for_all_pages() -> anyhow::Result<()> { let requested_paths: Arc>> = Arc::new(Mutex::new(Vec::new())); diff --git a/codex-rs/core/src/connectors.rs b/codex-rs/core/src/connectors.rs index 718b2d402a..7a66e4ffa6 100644 --- a/codex-rs/core/src/connectors.rs +++ b/codex-rs/core/src/connectors.rs @@ -6,21 +6,18 @@ use std::sync::Mutex as StdMutex; use std::time::Duration; use std::time::Instant; -use anyhow::Context; use async_channel::unbounded; -use codex_api::SharedAuthProvider; pub use codex_app_server_protocol::AppBranding; pub use codex_app_server_protocol::AppInfo; pub use codex_app_server_protocol::AppMetadata; -use codex_connectors::AllConnectorsCacheKey; -use codex_connectors::DirectoryListResponse; +use codex_connectors::ConnectorDirectoryCacheContext; +use codex_connectors::ConnectorDirectoryCacheKey; use codex_exec_server::EnvironmentManager; use codex_exec_server::ExecServerRuntimePaths; use codex_protocol::models::PermissionProfile; use codex_tools::DiscoverableTool; use rmcp::model::ToolAnnotations; use serde::Deserialize; -use serde::de::DeserializeOwned; use tracing::warn; use crate::config::Config; @@ -35,7 +32,6 @@ use codex_core_plugins::PluginsManager; use codex_features::Feature; use codex_login::AuthManager; use codex_login::CodexAuth; -use codex_login::default_client::create_client; use codex_login::default_client::originator; use codex_mcp::CODEX_APPS_MCP_SERVER_NAME; use codex_mcp::McpConnectionManager; @@ -48,7 +44,6 @@ use codex_mcp::host_owned_codex_apps_enabled; use codex_mcp::with_codex_apps_mcp; const CONNECTORS_READY_TIMEOUT_ON_EMPTY_TOOLS: Duration = Duration::from_secs(30); -const DIRECTORY_CONNECTORS_TIMEOUT: Duration = Duration::from_secs(60); #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) struct AppToolPolicy { @@ -119,9 +114,11 @@ pub(crate) async fn list_tool_suggest_discoverable_tools_with_auth( auth: Option<&CodexAuth>, accessible_connectors: &[AppInfo], ) -> anyhow::Result> { - let directory_connectors = - list_directory_connectors_for_tool_suggest_with_auth(config, auth).await?; let connector_ids = tool_suggest_connector_ids(config).await; + let directory_connectors = codex_connectors::merge::merge_plugin_connectors( + cached_directory_connectors_for_tool_suggest_with_auth(config, auth).await, + connector_ids.iter().cloned(), + ); let discoverable_connectors = codex_connectors::filter::filter_tool_suggest_discoverable_connectors( directory_connectors, @@ -435,12 +432,12 @@ async fn tool_suggest_connector_ids(config: &Config) -> HashSet { connector_ids } -async fn list_directory_connectors_for_tool_suggest_with_auth( +async fn cached_directory_connectors_for_tool_suggest_with_auth( config: &Config, auth: Option<&CodexAuth>, -) -> anyhow::Result> { +) -> Vec { if !config.features.enabled(Feature::Apps) { - return Ok(Vec::new()); + return Vec::new(); } let loaded_auth; @@ -453,67 +450,25 @@ async fn list_directory_connectors_for_tool_suggest_with_auth( loaded_auth.as_ref() }; let Some(auth) = auth.filter(|auth| auth.uses_codex_backend()) else { - return Ok(Vec::new()); + return Vec::new(); }; let account_id = match auth.get_account_id() { Some(account_id) if !account_id.is_empty() => account_id, - _ => return Ok(Vec::new()), + _ => return Vec::new(), }; - let auth_provider = codex_model_provider::auth_provider_from_auth(auth); let is_workspace_account = auth.is_workspace_account(); - let cache_key = AllConnectorsCacheKey::new( - config.chatgpt_base_url.clone(), - Some(account_id.clone()), - auth.get_chatgpt_user_id(), - is_workspace_account, + let cache_context = ConnectorDirectoryCacheContext::new( + config.codex_home.to_path_buf(), + ConnectorDirectoryCacheKey::new( + config.chatgpt_base_url.clone(), + Some(account_id), + auth.get_chatgpt_user_id(), + is_workspace_account, + ), ); - codex_connectors::list_all_connectors_with_options( - cache_key, - is_workspace_account, - /*force_refetch*/ false, - |path| { - let auth_provider = auth_provider.clone(); - async move { - chatgpt_get_request_with_auth_provider::( - config, - path, - auth_provider, - ) - .await - } - }, - ) - .await -} - -async fn chatgpt_get_request_with_auth_provider( - config: &Config, - path: String, - auth_provider: SharedAuthProvider, -) -> anyhow::Result { - let client = create_client(); - let url = format!("{}{}", config.chatgpt_base_url, path); - let response = client - .get(&url) - .headers(auth_provider.to_auth_headers()) - .header("Content-Type", "application/json") - .timeout(DIRECTORY_CONNECTORS_TIMEOUT) - .send() - .await - .context("failed to send request")?; - - if response.status().is_success() { - response - .json() - .await - .context("failed to parse JSON response") - } else { - let status = response.status(); - let body = response.text().await.unwrap_or_default(); - anyhow::bail!("request failed with status {status}: {body}"); - } + codex_connectors::cached_directory_connectors(&cache_context).unwrap_or_default() } pub(crate) fn accessible_connectors_from_mcp_tools(mcp_tools: &[ToolInfo]) -> Vec { diff --git a/codex-rs/core/src/connectors_tests.rs b/codex-rs/core/src/connectors_tests.rs index 014ab1cad8..6ded1610af 100644 --- a/codex-rs/core/src/connectors_tests.rs +++ b/codex-rs/core/src/connectors_tests.rs @@ -19,6 +19,7 @@ use codex_connectors::metadata::connector_install_url; use codex_connectors::metadata::connector_mention_slug; use codex_connectors::metadata::sanitize_name; use codex_features::Feature; +use codex_login::CodexAuth; use codex_mcp::CODEX_APPS_MCP_SERVER_NAME; use codex_mcp::ToolInfo; use codex_utils_absolute_path::AbsolutePathBuf; @@ -1120,6 +1121,42 @@ disabled_tools = [ ); } +#[tokio::test] +async fn tool_suggest_uses_connector_id_fallback_when_directory_cache_is_empty() { + let codex_home = tempdir().expect("tempdir should succeed"); + std::fs::write( + codex_home.path().join(CONFIG_TOML_FILE), + r#" +[features] +apps = true + +[tool_suggest] +discoverables = [ + { type = "connector", id = "connector_gmail" } +] +"#, + ) + .expect("write config"); + let config = ConfigBuilder::default() + .codex_home(codex_home.path().to_path_buf()) + .build() + .await + .expect("config should load"); + let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing(); + + let discoverable_tools = + list_tool_suggest_discoverable_tools_with_auth(&config, Some(&auth), &[]) + .await + .expect("discoverable tools should load"); + + assert_eq!( + discoverable_tools, + vec![DiscoverableTool::from(plugin_connector_to_app_info( + "connector_gmail".to_string(), + ))] + ); +} + #[test] fn filter_tool_suggest_discoverable_connectors_keeps_only_plugin_backed_uninstalled_apps() { let filtered = filter_tool_suggest_discoverable_connectors(