diff --git a/codex-rs/app-server/src/request_processors/plugins.rs b/codex-rs/app-server/src/request_processors/plugins.rs index 61336aa713..f8a9be6423 100644 --- a/codex-rs/app-server/src/request_processors/plugins.rs +++ b/codex-rs/app-server/src/request_processors/plugins.rs @@ -541,15 +541,9 @@ impl PluginRequestProcessor { remote_sources.push(RemoteMarketplaceSource::SharedWithMe); } if !remote_sources.is_empty() { - let remote_plugin_service_config = RemotePluginServiceConfig { - chatgpt_base_url: config.chatgpt_base_url.clone(), - }; - match codex_core_plugins::remote::fetch_remote_marketplaces( - &remote_plugin_service_config, - auth.as_ref(), - &remote_sources, - ) - .await + match plugins_manager + .remote_marketplaces_for_config(&plugins_input, auth.as_ref(), &remote_sources) + .await { Ok(remote_marketplaces) => { for remote_marketplace in remote_marketplaces @@ -1167,13 +1161,13 @@ impl PluginRequestProcessor { .await .map_err(|err| remote_plugin_catalog_error_to_jsonrpc(err, "install remote plugin"))?; - self.thread_manager - .plugins_manager() - .maybe_start_remote_installed_plugins_cache_refresh_after_mutation( - &config.plugins_config_input(), - auth.clone(), - Some(self.effective_plugins_changed_callback()), - ); + let plugins_manager = self.thread_manager.plugins_manager(); + plugins_manager.clear_remote_marketplaces_cache(); + plugins_manager.maybe_start_remote_installed_plugins_cache_refresh_after_mutation( + &config.plugins_config_input(), + auth.clone(), + Some(self.effective_plugins_changed_callback()), + ); let mut plugin_metadata = plugin_telemetry_metadata_from_root(&result.plugin_id, &result.installed_path).await; @@ -1463,6 +1457,7 @@ impl PluginRequestProcessor { Ok(()) | Err(RemotePluginCatalogError::CacheRemove(_)) ) { let plugins_manager = self.thread_manager.plugins_manager(); + plugins_manager.clear_remote_marketplaces_cache(); if plugins_manager.clear_remote_installed_plugins_cache() { self.on_effective_plugins_changed(); } diff --git a/codex-rs/app-server/tests/suite/v2/plugin_list.rs b/codex-rs/app-server/tests/suite/v2/plugin_list.rs index 0b2200a9d5..96c7a8a33f 100644 --- a/codex-rs/app-server/tests/suite/v2/plugin_list.rs +++ b/codex-rs/app-server/tests/suite/v2/plugin_list.rs @@ -1603,8 +1603,9 @@ async fn plugin_list_includes_remote_marketplaces_when_remote_plugin_enabled() - .mount(&server) .await; - let mut mcp = McpProcess::new(codex_home.path()).await?; + let mut mcp = McpProcess::new_with_plugin_startup_tasks(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + wait_for_remote_plugin_request_count(&server, "/ps/plugins/list", /*expected_count*/ 1).await?; let request_id = mcp .send_plugin_list_request(PluginListParams { @@ -1662,6 +1663,19 @@ async fn plugin_list_includes_remote_marketplaces_when_remote_plugin_enabled() - ] ); assert_eq!(response.featured_plugin_ids, Vec::::new()); + + let request_id = mcp + .send_plugin_list_request(PluginListParams { + cwds: None, + marketplace_kinds: None, + }) + .await?; + let _response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + wait_for_remote_plugin_request_count(&server, "/ps/plugins/list", /*expected_count*/ 1).await?; Ok(()) } diff --git a/codex-rs/core-plugins/src/manager.rs b/codex-rs/core-plugins/src/manager.rs index 2df35b401f..2317289ad8 100644 --- a/codex-rs/core-plugins/src/manager.rs +++ b/codex-rs/core-plugins/src/manager.rs @@ -36,6 +36,8 @@ use crate::marketplace_upgrade::ConfiguredMarketplaceUpgradeOutcome; use crate::marketplace_upgrade::configured_git_marketplace_names; use crate::marketplace_upgrade::upgrade_configured_git_marketplaces; use crate::remote::RemoteInstalledPlugin; +use crate::remote::RemoteMarketplace; +use crate::remote::RemoteMarketplaceSource; use crate::remote::RemotePluginCatalogError; use crate::remote::RemotePluginServiceConfig; use crate::remote_legacy::RemotePluginFetchError; @@ -71,10 +73,13 @@ use std::collections::HashMap; use std::collections::HashSet; use std::path::PathBuf; use std::sync::Arc; +use std::sync::Mutex; use std::sync::RwLock; use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::time::Instant; +use tokio::sync::Notify; use tokio::sync::Semaphore; use tracing::info; use tracing::warn; @@ -82,6 +87,7 @@ use tracing::warn; static CURATED_REPO_SYNC_STARTED: AtomicBool = AtomicBool::new(false); const FEATURED_PLUGIN_IDS_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 3); +const REMOTE_MARKETPLACES_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(60 * 5); #[derive(Debug, Clone)] pub struct PluginsConfigInput { @@ -125,6 +131,45 @@ struct CachedFeaturedPluginIds { featured_plugin_ids: Vec, } +#[derive(Clone, PartialEq, Eq, Hash)] +struct RemoteMarketplacesCacheKey { + chatgpt_base_url: String, + account_id: Option, + chatgpt_user_id: Option, + is_workspace_account: bool, + sources: Vec, +} + +#[derive(Clone)] +struct CachedRemoteMarketplaces { + expires_at: Instant, + marketplaces: Vec, +} + +struct RemoteMarketplacesFetchGuard<'a> { + manager: &'a PluginsManager, + cache_key: RemoteMarketplacesCacheKey, + notify: Arc, + active: bool, +} + +impl RemoteMarketplacesFetchGuard<'_> { + fn finish(mut self) { + self.manager + .finish_remote_marketplaces_fetch(&self.cache_key, &self.notify); + self.active = false; + } +} + +impl Drop for RemoteMarketplacesFetchGuard<'_> { + fn drop(&mut self) { + if self.active { + self.manager + .finish_remote_marketplaces_fetch(&self.cache_key, &self.notify); + } + } +} + struct RemoteInstalledPluginsCacheRefreshRequest { service_config: RemotePluginServiceConfig, auth: Option, @@ -191,6 +236,20 @@ fn featured_plugin_ids_cache_key( } } +fn remote_marketplaces_cache_key( + config: &PluginsConfigInput, + auth: Option<&CodexAuth>, + sources: &[RemoteMarketplaceSource], +) -> RemoteMarketplacesCacheKey { + RemoteMarketplacesCacheKey { + chatgpt_base_url: config.chatgpt_base_url.clone(), + account_id: auth.and_then(CodexAuth::get_account_id), + chatgpt_user_id: auth.and_then(CodexAuth::get_chatgpt_user_id), + is_workspace_account: auth.is_some_and(CodexAuth::is_workspace_account), + sources: sources.to_vec(), + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct PluginInstallRequest { pub plugin_name: String, @@ -399,6 +458,10 @@ pub struct PluginsManager { codex_home: PathBuf, store: PluginStore, featured_plugin_ids_cache: RwLock>, + remote_marketplaces_cache: + RwLock>, + remote_marketplaces_cache_generation: AtomicU64, + remote_marketplaces_in_flight: Mutex>>, configured_marketplace_upgrade_state: RwLock, non_curated_cache_refresh_state: RwLock, cached_enabled_outcome: RwLock>, @@ -436,6 +499,9 @@ impl PluginsManager { codex_home: codex_home.clone(), store: PluginStore::new(codex_home), featured_plugin_ids_cache: RwLock::new(None), + remote_marketplaces_cache: RwLock::new(HashMap::new()), + remote_marketplaces_cache_generation: AtomicU64::new(0), + remote_marketplaces_in_flight: Mutex::new(HashMap::new()), configured_marketplace_upgrade_state: RwLock::new( ConfiguredMarketplaceUpgradeState::default(), ), @@ -778,6 +844,129 @@ impl PluginsManager { }); } + fn cached_remote_marketplaces( + &self, + cache_key: &RemoteMarketplacesCacheKey, + ) -> Option> { + let mut cache = match self.remote_marketplaces_cache.write() { + Ok(cache) => cache, + Err(err) => err.into_inner(), + }; + let now = Instant::now(); + cache.retain(|_, cached| now < cached.expires_at); + cache + .get(cache_key) + .map(|cached| cached.marketplaces.clone()) + } + + fn write_remote_marketplaces_cache( + &self, + cache_key: RemoteMarketplacesCacheKey, + marketplaces: &[RemoteMarketplace], + ) { + let mut cache = match self.remote_marketplaces_cache.write() { + Ok(cache) => cache, + Err(err) => err.into_inner(), + }; + cache.insert( + cache_key, + CachedRemoteMarketplaces { + expires_at: Instant::now() + REMOTE_MARKETPLACES_CACHE_TTL, + marketplaces: marketplaces.to_vec(), + }, + ); + } + + pub fn clear_remote_marketplaces_cache(&self) { + self.remote_marketplaces_cache_generation + .fetch_add(1, Ordering::SeqCst); + let mut cache = match self.remote_marketplaces_cache.write() { + Ok(cache) => cache, + Err(err) => err.into_inner(), + }; + cache.clear(); + } + + fn begin_remote_marketplaces_fetch( + &self, + cache_key: &RemoteMarketplacesCacheKey, + ) -> Result, Arc> { + let mut in_flight = match self.remote_marketplaces_in_flight.lock() { + Ok(in_flight) => in_flight, + Err(err) => err.into_inner(), + }; + if let Some(notify) = in_flight.get(cache_key) { + return Err(Arc::clone(notify)); + } + let notify = Arc::new(Notify::new()); + in_flight.insert(cache_key.clone(), Arc::clone(¬ify)); + Ok(RemoteMarketplacesFetchGuard { + manager: self, + cache_key: cache_key.clone(), + notify, + active: true, + }) + } + + fn finish_remote_marketplaces_fetch( + &self, + cache_key: &RemoteMarketplacesCacheKey, + notify: &Arc, + ) { + let mut in_flight = match self.remote_marketplaces_in_flight.lock() { + Ok(in_flight) => in_flight, + Err(err) => err.into_inner(), + }; + if in_flight + .get(cache_key) + .is_some_and(|current| Arc::ptr_eq(current, notify)) + { + in_flight.remove(cache_key); + } + drop(in_flight); + notify.notify_waiters(); + } + + pub async fn remote_marketplaces_for_config( + &self, + config: &PluginsConfigInput, + auth: Option<&CodexAuth>, + sources: &[RemoteMarketplaceSource], + ) -> Result, RemotePluginCatalogError> { + let cache_key = remote_marketplaces_cache_key(config, auth, sources); + loop { + if let Some(marketplaces) = self.cached_remote_marketplaces(&cache_key) { + return Ok(marketplaces); + } + + let fetch_guard = match self.begin_remote_marketplaces_fetch(&cache_key) { + Ok(fetch_guard) => fetch_guard, + Err(notify) => { + notify.notified().await; + continue; + } + }; + let cache_generation = self + .remote_marketplaces_cache_generation + .load(Ordering::SeqCst); + let marketplaces = crate::remote::fetch_remote_marketplaces( + &remote_plugin_service_config(config), + auth, + sources, + ) + .await?; + if self + .remote_marketplaces_cache_generation + .load(Ordering::SeqCst) + == cache_generation + { + self.write_remote_marketplaces_cache(cache_key, &marketplaces); + } + fetch_guard.finish(); + return Ok(marketplaces); + } + } + pub async fn featured_plugin_ids_for_config( &self, config: &PluginsConfigInput, @@ -1518,9 +1707,22 @@ impl PluginsManager { ); manager.maybe_start_remote_installed_plugin_bundle_sync( &config, - auth, + auth.clone(), on_effective_plugins_changed, ); + if let Err(err) = manager + .remote_marketplaces_for_config( + &config, + auth.as_ref(), + &[RemoteMarketplaceSource::Global], + ) + .await + { + warn!( + error = %err, + "failed to warm remote marketplaces cache" + ); + } }); } diff --git a/codex-rs/core-plugins/src/remote.rs b/codex-rs/core-plugins/src/remote.rs index 47563b95b8..b7217f6e87 100644 --- a/codex-rs/core-plugins/src/remote.rs +++ b/codex-rs/core-plugins/src/remote.rs @@ -74,7 +74,7 @@ pub struct RemoteMarketplace { pub plugins: Vec, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum RemoteMarketplaceSource { Global, WorkspaceDirectory,