use super::PluginLoadOutcome; use super::startup_remote_sync::start_startup_remote_plugin_sync_once; use crate::OPENAI_CURATED_MARKETPLACE_NAME; use crate::installed_marketplaces::installed_marketplace_roots_from_layer_stack; use crate::loader::configured_curated_plugin_ids_from_codex_home; use crate::loader::curated_plugin_cache_version; use crate::loader::installed_plugin_telemetry_metadata; use crate::loader::load_plugin_apps; use crate::loader::load_plugin_hooks; use crate::loader::load_plugin_mcp_servers; use crate::loader::load_plugin_skills; use crate::loader::load_plugins_from_layer_stack; use crate::loader::log_plugin_load_errors; use crate::loader::materialize_marketplace_plugin_source; use crate::loader::plugin_telemetry_metadata_from_root; use crate::loader::refresh_curated_plugin_cache; use crate::loader::refresh_non_curated_plugin_cache; use crate::loader::refresh_non_curated_plugin_cache_force_reinstall; use crate::loader::remote_installed_plugins_to_config; use crate::manifest::PluginManifestInterface; use crate::manifest::load_plugin_manifest; use crate::marketplace::MarketplaceError; use crate::marketplace::MarketplaceInterface; use crate::marketplace::MarketplaceListError; use crate::marketplace::MarketplacePluginAuthPolicy; use crate::marketplace::MarketplacePluginPolicy; use crate::marketplace::MarketplacePluginSource; use crate::marketplace::ResolvedMarketplacePlugin; use crate::marketplace::find_installable_marketplace_plugin; use crate::marketplace::find_marketplace_plugin; use crate::marketplace::list_marketplaces; use crate::marketplace::load_marketplace; use crate::marketplace::plugin_interface_with_marketplace_category; use crate::marketplace_upgrade::ConfiguredMarketplaceUpgradeError; use crate::marketplace_upgrade::ConfiguredMarketplaceUpgradeOutcome; use crate::marketplace_upgrade::configured_git_marketplace_names; use crate::marketplace_upgrade::upgrade_configured_git_marketplaces; use crate::remote::RemoteInstalledPlugin; use crate::remote::RemotePluginCatalogError; use crate::remote::RemotePluginServiceConfig; use crate::remote_legacy::RemotePluginFetchError; use crate::remote_legacy::RemotePluginMutationError; use crate::startup_sync::curated_plugins_repo_path; use crate::startup_sync::read_curated_plugins_sha; use crate::startup_sync::sync_openai_plugins_repo; use crate::store::PluginInstallResult as StorePluginInstallResult; use crate::store::PluginStore; use crate::store::PluginStoreError; use codex_analytics::AnalyticsEventsClient; use codex_config::ConfigLayerStack; use codex_config::PluginConfigEdit; use codex_config::apply_user_plugin_config_edits; use codex_config::clear_user_plugin; use codex_config::set_user_plugin_enabled; use codex_config::types::PluginConfig; use codex_config::version_for_toml; use codex_core_skills::SkillMetadata; use codex_hooks::plugin_hook_declarations; use codex_login::AuthManager; use codex_login::CodexAuth; use codex_plugin::AppConnectorId; use codex_plugin::PluginCapabilitySummary; use codex_plugin::PluginId; use codex_plugin::PluginIdError; use codex_plugin::prompt_safe_plugin_description; use codex_protocol::protocol::HookEventName; use codex_protocol::protocol::Product; use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_plugins::PluginSkillRoot; use std::collections::HashMap; use std::collections::HashSet; use std::path::PathBuf; use std::sync::Arc; use std::sync::RwLock; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Instant; use tokio::sync::Semaphore; use tracing::info; use tracing::warn; static CURATED_REPO_SYNC_STARTED: AtomicBool = AtomicBool::new(false); const FEATURED_PLUGIN_IDS_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 3); #[derive(Debug, Clone)] pub struct PluginsConfigInput { pub config_layer_stack: ConfigLayerStack, pub plugins_enabled: bool, pub remote_plugin_enabled: bool, pub plugin_hooks_enabled: bool, pub chatgpt_base_url: String, } impl PluginsConfigInput { pub fn new( config_layer_stack: ConfigLayerStack, plugins_enabled: bool, remote_plugin_enabled: bool, plugin_hooks_enabled: bool, chatgpt_base_url: String, ) -> Self { Self { config_layer_stack, plugins_enabled, remote_plugin_enabled, plugin_hooks_enabled, chatgpt_base_url, } } } #[derive(Clone, PartialEq, Eq)] struct FeaturedPluginIdsCacheKey { chatgpt_base_url: String, account_id: Option, chatgpt_user_id: Option, is_workspace_account: bool, } #[derive(Clone)] struct CachedFeaturedPluginIds { key: FeaturedPluginIdsCacheKey, expires_at: Instant, featured_plugin_ids: Vec, } struct RemoteInstalledPluginsCacheRefreshRequest { service_config: RemotePluginServiceConfig, auth: Option, notify: RemoteInstalledPluginsCacheRefreshNotify, // App-server attaches side effects such as skills metadata invalidation and MCP refreshes when // remote installed state changes. on_effective_plugins_changed: Option>, } #[derive(Clone, Copy)] enum RemoteInstalledPluginsCacheRefreshNotify { IfCacheChanged, // Remote mutations may change local bundles or active MCP state even when the installed set is // unchanged. Notify after `/installed` succeeds so MCP refreshes are ordered after the remote // installed cache. AfterSuccessfulRefresh, } #[derive(Default)] struct RemoteInstalledPluginsCacheRefreshState { requested: Option, in_flight: bool, } #[derive(Clone, PartialEq, Eq)] struct NonCuratedCacheRefreshRequest { roots: Vec, mode: NonCuratedCacheRefreshMode, } #[derive(Clone, Copy, PartialEq, Eq)] enum NonCuratedCacheRefreshMode { IfVersionChanged, ForceReinstall, } #[derive(Default)] struct NonCuratedCacheRefreshState { requested: Option, last_refreshed: Option, in_flight: bool, } #[derive(Default)] struct ConfiguredMarketplaceUpgradeState { in_flight: bool, } fn remote_plugin_service_config(config: &PluginsConfigInput) -> RemotePluginServiceConfig { RemotePluginServiceConfig { chatgpt_base_url: config.chatgpt_base_url.clone(), } } fn featured_plugin_ids_cache_key( config: &PluginsConfigInput, auth: Option<&CodexAuth>, ) -> FeaturedPluginIdsCacheKey { FeaturedPluginIdsCacheKey { chatgpt_base_url: config.chatgpt_base_url.clone(), account_id: auth.and_then(CodexAuth::get_account_id), chatgpt_user_id: auth.and_then(CodexAuth::get_chatgpt_user_id), is_workspace_account: auth.is_some_and(CodexAuth::is_workspace_account), } } #[derive(Debug, Clone, PartialEq, Eq)] pub struct PluginInstallRequest { pub plugin_name: String, pub marketplace_path: AbsolutePathBuf, } #[derive(Debug, Clone, PartialEq, Eq)] pub struct PluginReadRequest { pub plugin_name: String, pub marketplace_path: AbsolutePathBuf, } #[derive(Debug, Clone, PartialEq, Eq)] pub struct PluginInstallOutcome { pub plugin_id: PluginId, pub plugin_version: String, pub installed_path: AbsolutePathBuf, pub auth_policy: MarketplacePluginAuthPolicy, } #[derive(Debug, Clone, PartialEq)] pub struct PluginReadOutcome { pub marketplace_name: String, pub marketplace_path: Option, pub plugin: PluginDetail, } #[derive(Debug, Clone, PartialEq)] pub struct PluginDetail { pub id: String, pub name: String, pub local_version: Option, pub description: Option, pub source: MarketplacePluginSource, pub policy: MarketplacePluginPolicy, pub interface: Option, pub keywords: Vec, pub installed: bool, pub enabled: bool, pub skills: Vec, pub disabled_skill_paths: HashSet, pub hooks: Vec, pub apps: Vec, pub mcp_server_names: Vec, pub details_unavailable_reason: Option, } #[derive(Debug, Clone, PartialEq)] pub struct PluginHookSummary { pub key: String, pub event_name: HookEventName, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PluginDetailsUnavailableReason { InstallRequiredForRemoteSource, } #[derive(Debug, Clone, PartialEq, Eq)] pub struct ConfiguredMarketplace { pub name: String, pub path: AbsolutePathBuf, pub interface: Option, pub plugins: Vec, } #[derive(Debug, Clone, PartialEq, Eq)] pub struct ConfiguredMarketplacePlugin { pub id: String, pub name: String, pub local_version: Option, pub source: MarketplacePluginSource, pub policy: MarketplacePluginPolicy, pub interface: Option, pub keywords: Vec, pub installed: bool, pub enabled: bool, } #[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct ConfiguredMarketplaceListOutcome { pub marketplaces: Vec, pub errors: Vec, } impl From for PluginCapabilitySummary { fn from(value: PluginDetail) -> Self { let has_skills = value.skills.iter().any(|skill| { !value .disabled_skill_paths .contains(&skill.path_to_skills_md) }); Self { config_name: value.id, display_name: value.name, description: prompt_safe_plugin_description(value.description.as_deref()), has_skills, mcp_server_names: value.mcp_server_names, app_connector_ids: value.apps, } } } #[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct RemotePluginSyncResult { /// Plugin ids newly installed into the local plugin cache. pub installed_plugin_ids: Vec, /// Plugin ids whose local config was changed to enabled. pub enabled_plugin_ids: Vec, /// Plugin ids whose local config was changed to disabled. /// This is not populated by `sync_plugins_from_remote`. pub disabled_plugin_ids: Vec, /// Plugin ids removed from local cache or plugin config. pub uninstalled_plugin_ids: Vec, } #[derive(Debug, thiserror::Error)] pub enum PluginRemoteSyncError { #[error("chatgpt authentication required to sync remote plugins")] AuthRequired, #[error( "chatgpt authentication required to sync remote plugins; api key auth is not supported" )] UnsupportedAuthMode, #[error("failed to read auth token for remote plugin sync: {0}")] AuthToken(#[source] std::io::Error), #[error("failed to send remote plugin sync request to {url}: {source}")] Request { url: String, #[source] source: reqwest::Error, }, #[error("remote plugin sync request to {url} failed with status {status}: {body}")] UnexpectedStatus { url: String, status: reqwest::StatusCode, body: String, }, #[error("failed to parse remote plugin sync response from {url}: {source}")] Decode { url: String, #[source] source: serde_json::Error, }, #[error("local curated marketplace is not available")] LocalMarketplaceNotFound, #[error("remote marketplace `{marketplace_name}` is not available locally")] UnknownRemoteMarketplace { marketplace_name: String }, #[error("duplicate remote plugin `{plugin_name}` in sync response")] DuplicateRemotePlugin { plugin_name: String }, #[error( "remote plugin `{plugin_name}` was not found in local marketplace `{marketplace_name}`" )] UnknownRemotePlugin { plugin_name: String, marketplace_name: String, }, #[error("{0}")] InvalidPluginId(#[from] PluginIdError), #[error("{0}")] Marketplace(#[from] MarketplaceError), #[error("{0}")] Store(#[from] PluginStoreError), #[error("{0}")] Config(#[from] anyhow::Error), #[error("failed to join remote plugin sync task: {0}")] Join(#[from] tokio::task::JoinError), } impl PluginRemoteSyncError { fn join(source: tokio::task::JoinError) -> Self { Self::Join(source) } } impl From for PluginRemoteSyncError { fn from(value: RemotePluginFetchError) -> Self { match value { RemotePluginFetchError::AuthRequired => Self::AuthRequired, RemotePluginFetchError::UnsupportedAuthMode => Self::UnsupportedAuthMode, RemotePluginFetchError::AuthToken(source) => Self::AuthToken(source), RemotePluginFetchError::Request { url, source } => Self::Request { url, source }, RemotePluginFetchError::UnexpectedStatus { url, status, body } => { Self::UnexpectedStatus { url, status, body } } RemotePluginFetchError::Decode { url, source } => Self::Decode { url, source }, } } } pub struct PluginsManager { codex_home: PathBuf, store: PluginStore, featured_plugin_ids_cache: RwLock>, configured_marketplace_upgrade_state: RwLock, non_curated_cache_refresh_state: RwLock, cached_enabled_outcome: RwLock>, remote_installed_plugins_cache: RwLock>>, remote_installed_plugins_cache_refresh_state: RwLock, remote_sync_lock: Semaphore, restriction_product: Option, analytics_events_client: RwLock>, } #[derive(Clone)] struct CachedPluginLoadOutcome { config_version: String, plugin_hooks_enabled: bool, outcome: PluginLoadOutcome, } impl PluginsManager { pub fn new(codex_home: PathBuf) -> Self { Self::new_with_restriction_product(codex_home, Some(Product::Codex)) } pub fn new_with_restriction_product( codex_home: PathBuf, restriction_product: Option, ) -> Self { // Product restrictions are enforced at marketplace admission time for a given CODEX_HOME: // listing, install, and curated refresh all consult this restriction context before new // plugins enter local config or cache. After admission, runtime plugin loading trusts the // contents of that CODEX_HOME and does not re-filter configured plugins by product, so // already-admitted plugins may continue exposing MCP servers/tools from shared local state. // // This assumes a single CODEX_HOME is only used by one product. Self { codex_home: codex_home.clone(), store: PluginStore::new(codex_home), featured_plugin_ids_cache: RwLock::new(None), configured_marketplace_upgrade_state: RwLock::new( ConfiguredMarketplaceUpgradeState::default(), ), non_curated_cache_refresh_state: RwLock::new(NonCuratedCacheRefreshState::default()), cached_enabled_outcome: RwLock::new(None), remote_installed_plugins_cache: RwLock::new(None), remote_installed_plugins_cache_refresh_state: RwLock::new( RemoteInstalledPluginsCacheRefreshState::default(), ), remote_sync_lock: Semaphore::new(/*permits*/ 1), restriction_product, analytics_events_client: RwLock::new(None), } } pub fn set_analytics_events_client(&self, analytics_events_client: AnalyticsEventsClient) { let mut stored_client = match self.analytics_events_client.write() { Ok(client_guard) => client_guard, Err(err) => err.into_inner(), }; *stored_client = Some(analytics_events_client); } fn restriction_product_matches(&self, products: Option<&[Product]>) -> bool { match products { None => true, Some([]) => false, Some(products) => self .restriction_product .is_some_and(|product| product.matches_product_restriction(products)), } } pub async fn plugins_for_config(&self, config: &PluginsConfigInput) -> PluginLoadOutcome { self.plugins_for_config_with_force_reload(config, /*force_reload*/ false) .await } pub(crate) async fn plugins_for_config_with_force_reload( &self, config: &PluginsConfigInput, force_reload: bool, ) -> PluginLoadOutcome { if !config.plugins_enabled { return PluginLoadOutcome::default(); } let plugin_hooks_enabled = config.plugin_hooks_enabled; let config_version = version_for_toml(&config.config_layer_stack.effective_config()); if !force_reload && let Some(outcome) = self.cached_enabled_outcome(&config_version, plugin_hooks_enabled) { return outcome; } let outcome = load_plugins_from_layer_stack( &config.config_layer_stack, self.remote_installed_plugin_configs(config), &self.store, self.restriction_product, plugin_hooks_enabled, ) .await; log_plugin_load_errors(&outcome); let mut cache = match self.cached_enabled_outcome.write() { Ok(cache) => cache, Err(err) => err.into_inner(), }; *cache = Some(CachedPluginLoadOutcome { config_version, plugin_hooks_enabled, outcome: outcome.clone(), }); outcome } pub fn clear_cache(&self) { self.clear_enabled_outcome_cache(); let mut featured_plugin_ids_cache = match self.featured_plugin_ids_cache.write() { Ok(cache) => cache, Err(err) => err.into_inner(), }; *featured_plugin_ids_cache = None; } fn clear_enabled_outcome_cache(&self) { let mut cached_enabled_outcome = match self.cached_enabled_outcome.write() { Ok(cache) => cache, Err(err) => err.into_inner(), }; *cached_enabled_outcome = None; } /// Load plugins for a config layer stack without touching the plugins cache. pub async fn plugins_for_layer_stack( &self, config_layer_stack: &ConfigLayerStack, config: &PluginsConfigInput, plugin_hooks_feature_enabled: bool, ) -> PluginLoadOutcome { if !config.plugins_enabled { return PluginLoadOutcome::default(); } load_plugins_from_layer_stack( config_layer_stack, self.remote_installed_plugin_configs(config), &self.store, self.restriction_product, plugin_hooks_feature_enabled, ) .await } /// Resolve plugin skill roots for a config layer stack without touching the plugins cache. pub async fn effective_skill_roots_for_layer_stack( &self, config_layer_stack: &ConfigLayerStack, config: &PluginsConfigInput, ) -> Vec { self.plugins_for_layer_stack(config_layer_stack, config, config.plugin_hooks_enabled) .await .effective_plugin_skill_roots() } fn cached_enabled_outcome( &self, config_version: &str, plugin_hooks_enabled: bool, ) -> Option { match self.cached_enabled_outcome.read() { Ok(cache) => cache .as_ref() .filter(|cached| { cached.config_version == config_version && cached.plugin_hooks_enabled == plugin_hooks_enabled }) .map(|cached| cached.outcome.clone()), Err(err) => err .into_inner() .as_ref() .filter(|cached| { cached.config_version == config_version && cached.plugin_hooks_enabled == plugin_hooks_enabled }) .map(|cached| cached.outcome.clone()), } } fn remote_installed_plugin_configs( &self, config: &PluginsConfigInput, ) -> HashMap { if !config.remote_plugin_enabled { return HashMap::new(); } let cache = match self.remote_installed_plugins_cache.read() { Ok(cache) => cache, Err(err) => err.into_inner(), }; let Some(plugins) = cache.as_ref() else { return HashMap::new(); }; remote_installed_plugins_to_config(plugins, &self.store) } fn write_remote_installed_plugins_cache(&self, plugins: Vec) -> bool { let mut cache = match self.remote_installed_plugins_cache.write() { Ok(cache) => cache, Err(err) => err.into_inner(), }; if cache.as_ref().is_some_and(|cache| cache.eq(&plugins)) { return false; } *cache = Some(plugins); drop(cache); self.clear_enabled_outcome_cache(); true } pub fn clear_remote_installed_plugins_cache(&self) -> bool { let mut cache = match self.remote_installed_plugins_cache.write() { Ok(cache) => cache, Err(err) => err.into_inner(), }; if cache.is_none() { return false; } *cache = None; drop(cache); self.clear_enabled_outcome_cache(); true } pub fn maybe_start_remote_installed_plugins_cache_refresh( self: &Arc, config: &PluginsConfigInput, auth: Option, on_effective_plugins_changed: Option>, ) { self.maybe_start_remote_installed_plugins_cache_refresh_with_notify( config, auth, RemoteInstalledPluginsCacheRefreshNotify::IfCacheChanged, on_effective_plugins_changed, ); } pub fn maybe_start_remote_installed_plugins_cache_refresh_after_mutation( self: &Arc, config: &PluginsConfigInput, auth: Option, on_effective_plugins_changed: Option>, ) { self.maybe_start_remote_installed_plugins_cache_refresh_with_notify( config, auth, RemoteInstalledPluginsCacheRefreshNotify::AfterSuccessfulRefresh, on_effective_plugins_changed, ); } fn maybe_start_remote_installed_plugins_cache_refresh_with_notify( self: &Arc, config: &PluginsConfigInput, auth: Option, notify: RemoteInstalledPluginsCacheRefreshNotify, on_effective_plugins_changed: Option>, ) { if !config.plugins_enabled || !config.remote_plugin_enabled { return; } self.schedule_remote_installed_plugins_cache_refresh( RemoteInstalledPluginsCacheRefreshRequest { service_config: remote_plugin_service_config(config), auth, notify, on_effective_plugins_changed, }, ); } fn maybe_start_remote_installed_plugin_bundle_sync( self: &Arc, config: &PluginsConfigInput, auth: Option, on_effective_plugins_changed: Option>, ) { if !config.plugins_enabled || !config.remote_plugin_enabled { return; } let manager = Arc::clone(self); let config_for_refresh = config.clone(); let auth_for_refresh = auth.clone(); let on_local_cache_changed = Arc::new(move || { manager.maybe_start_remote_installed_plugins_cache_refresh_after_mutation( &config_for_refresh, auth_for_refresh.clone(), on_effective_plugins_changed.clone(), ); }); crate::remote::maybe_start_remote_installed_plugin_bundle_sync( self.codex_home.clone(), remote_plugin_service_config(config), auth, Some(on_local_cache_changed), ); } pub fn maybe_start_plugin_list_background_tasks_for_config( self: &Arc, config: &PluginsConfigInput, auth: Option, roots: &[AbsolutePathBuf], on_effective_plugins_changed: Option>, ) { self.maybe_start_non_curated_plugin_cache_refresh(roots); self.maybe_start_remote_installed_plugins_cache_refresh( config, auth.clone(), on_effective_plugins_changed.clone(), ); self.maybe_start_remote_installed_plugin_bundle_sync( config, auth, on_effective_plugins_changed, ); } fn cached_featured_plugin_ids( &self, cache_key: &FeaturedPluginIdsCacheKey, ) -> Option> { { let cache = match self.featured_plugin_ids_cache.read() { Ok(cache) => cache, Err(err) => err.into_inner(), }; let now = Instant::now(); if let Some(cached) = cache.as_ref() && now < cached.expires_at && cached.key == *cache_key { return Some(cached.featured_plugin_ids.clone()); } } let mut cache = match self.featured_plugin_ids_cache.write() { Ok(cache) => cache, Err(err) => err.into_inner(), }; let now = Instant::now(); if cache .as_ref() .is_some_and(|cached| now >= cached.expires_at || cached.key != *cache_key) { *cache = None; } None } fn write_featured_plugin_ids_cache( &self, cache_key: FeaturedPluginIdsCacheKey, featured_plugin_ids: &[String], ) { let mut cache = match self.featured_plugin_ids_cache.write() { Ok(cache) => cache, Err(err) => err.into_inner(), }; *cache = Some(CachedFeaturedPluginIds { key: cache_key, expires_at: Instant::now() + FEATURED_PLUGIN_IDS_CACHE_TTL, featured_plugin_ids: featured_plugin_ids.to_vec(), }); } pub async fn featured_plugin_ids_for_config( &self, config: &PluginsConfigInput, auth: Option<&CodexAuth>, ) -> Result, RemotePluginFetchError> { if !config.plugins_enabled { return Ok(Vec::new()); } let cache_key = featured_plugin_ids_cache_key(config, auth); if let Some(featured_plugin_ids) = self.cached_featured_plugin_ids(&cache_key) { return Ok(featured_plugin_ids); } let featured_plugin_ids = crate::remote_legacy::fetch_remote_featured_plugin_ids( &remote_plugin_service_config(config), auth, self.restriction_product, ) .await?; self.write_featured_plugin_ids_cache(cache_key, &featured_plugin_ids); Ok(featured_plugin_ids) } pub async fn install_plugin( &self, request: PluginInstallRequest, ) -> Result { let resolved = find_installable_marketplace_plugin( &request.marketplace_path, &request.plugin_name, self.restriction_product, )?; self.install_resolved_plugin(resolved).await } pub async fn install_plugin_with_remote_sync( &self, config: &PluginsConfigInput, auth: Option<&CodexAuth>, request: PluginInstallRequest, ) -> Result { let resolved = find_installable_marketplace_plugin( &request.marketplace_path, &request.plugin_name, self.restriction_product, )?; let plugin_id = resolved.plugin_id.as_key(); // This only forwards the backend mutation before the local install flow. crate::remote_legacy::enable_remote_plugin( &remote_plugin_service_config(config), auth, &plugin_id, ) .await .map_err(PluginInstallError::from)?; self.install_resolved_plugin(resolved).await } async fn install_resolved_plugin( &self, resolved: ResolvedMarketplacePlugin, ) -> Result { let auth_policy = resolved.policy.authentication; let plugin_version = if resolved.plugin_id.marketplace_name == OPENAI_CURATED_MARKETPLACE_NAME { let curated_plugin_version = read_curated_plugins_sha(self.codex_home.as_path()) .ok_or_else(|| { PluginStoreError::Invalid( "local curated marketplace sha is not available".to_string(), ) })?; Some(curated_plugin_cache_version(&curated_plugin_version)) } else { None }; let store = self.store.clone(); let codex_home = self.codex_home.clone(); let result: StorePluginInstallResult = tokio::task::spawn_blocking(move || { let materialized = materialize_marketplace_plugin_source(codex_home.as_path(), &resolved.source) .map_err(PluginStoreError::Invalid)?; let source_path = materialized.path; if let Some(plugin_version) = plugin_version { store.install_with_version(source_path, resolved.plugin_id, plugin_version) } else { store.install(source_path, resolved.plugin_id) } }) .await .map_err(PluginInstallError::join)??; set_user_plugin_enabled( &self.codex_home, result.plugin_id.as_key(), /*enabled*/ true, ) .await .map_err(anyhow::Error::from)?; let analytics_events_client = match self.analytics_events_client.read() { Ok(client) => client.clone(), Err(err) => err.into_inner().clone(), }; if let Some(analytics_events_client) = analytics_events_client { analytics_events_client.track_plugin_installed( plugin_telemetry_metadata_from_root(&result.plugin_id, &result.installed_path) .await, ); } Ok(PluginInstallOutcome { plugin_id: result.plugin_id, plugin_version: result.plugin_version, installed_path: result.installed_path, auth_policy, }) } pub async fn uninstall_plugin(&self, plugin_id: String) -> Result<(), PluginUninstallError> { let plugin_id = PluginId::parse(&plugin_id)?; self.uninstall_plugin_id(plugin_id).await } pub async fn uninstall_plugin_with_remote_sync( &self, config: &PluginsConfigInput, auth: Option<&CodexAuth>, plugin_id: String, ) -> Result<(), PluginUninstallError> { // TODO: Remove this legacy remote-sync path once remote plugins have // their own manager and installed-state API. let plugin_id = PluginId::parse(&plugin_id)?; let plugin_key = plugin_id.as_key(); // This only forwards the backend mutation before the local uninstall flow. crate::remote_legacy::uninstall_remote_plugin( &remote_plugin_service_config(config), auth, &plugin_key, ) .await .map_err(PluginUninstallError::from)?; self.uninstall_plugin_id(plugin_id).await } async fn uninstall_plugin_id(&self, plugin_id: PluginId) -> Result<(), PluginUninstallError> { let plugin_telemetry = if self.store.active_plugin_root(&plugin_id).is_some() { Some(installed_plugin_telemetry_metadata(self.codex_home.as_path(), &plugin_id).await) } else { None }; let store = self.store.clone(); let plugin_id_for_store = plugin_id.clone(); tokio::task::spawn_blocking(move || store.uninstall(&plugin_id_for_store)) .await .map_err(PluginUninstallError::join)??; clear_user_plugin(&self.codex_home, plugin_id.as_key()) .await .map_err(anyhow::Error::from)?; let analytics_events_client = match self.analytics_events_client.read() { Ok(client) => client.clone(), Err(err) => err.into_inner().clone(), }; if let Some(plugin_telemetry) = plugin_telemetry && let Some(analytics_events_client) = analytics_events_client { analytics_events_client.track_plugin_uninstalled(plugin_telemetry); } Ok(()) } pub async fn sync_plugins_from_remote( &self, config: &PluginsConfigInput, auth: Option<&CodexAuth>, additive_only: bool, ) -> Result { let _remote_sync_guard = self.remote_sync_lock.acquire().await.map_err(|_| { PluginRemoteSyncError::Config(anyhow::anyhow!("remote plugin sync semaphore closed")) })?; if !config.plugins_enabled { return Ok(RemotePluginSyncResult::default()); } info!("starting remote plugin sync"); let remote_plugins = crate::remote_legacy::fetch_remote_plugin_status( &remote_plugin_service_config(config), auth, ) .await .map_err(PluginRemoteSyncError::from)?; let configured_plugins = configured_plugins_from_stack(&config.config_layer_stack); let curated_marketplace_root = curated_plugins_repo_path(self.codex_home.as_path()); let curated_marketplace_path = AbsolutePathBuf::try_from( curated_marketplace_root.join(".agents/plugins/marketplace.json"), ) .map_err(|_| PluginRemoteSyncError::LocalMarketplaceNotFound)?; let curated_marketplace = match load_marketplace(&curated_marketplace_path) { Ok(marketplace) => marketplace, Err(MarketplaceError::MarketplaceNotFound { .. }) => { return Err(PluginRemoteSyncError::LocalMarketplaceNotFound); } Err(err) => return Err(err.into()), }; let marketplace_name = curated_marketplace.name.clone(); let curated_plugin_version = read_curated_plugins_sha(self.codex_home.as_path()) .ok_or_else(|| { PluginStoreError::Invalid( "local curated marketplace sha is not available".to_string(), ) })?; let cache_plugin_version = curated_plugin_cache_version(&curated_plugin_version); let mut local_plugins = Vec::<( String, PluginId, AbsolutePathBuf, Option, Option, bool, )>::new(); let mut local_plugin_names = HashSet::new(); for plugin in curated_marketplace.plugins { let plugin_name = plugin.name; if !local_plugin_names.insert(plugin_name.clone()) { warn!( plugin = plugin_name, marketplace = %marketplace_name, "ignoring duplicate local plugin entry during remote sync" ); continue; } let plugin_id = PluginId::new(plugin_name.clone(), marketplace_name.clone())?; let plugin_key = plugin_id.as_key(); let source_path = match plugin.source { MarketplacePluginSource::Local { path } => path, MarketplacePluginSource::Git { .. } => { warn!( plugin = plugin_name, marketplace = %marketplace_name, "skipping remote plugin source during remote sync" ); continue; } }; let current_enabled = configured_plugins .get(&plugin_key) .map(|plugin| plugin.enabled); let installed_version = self.store.active_plugin_version(&plugin_id); let product_allowed = self.restriction_product_matches(plugin.policy.products.as_deref()); local_plugins.push(( plugin_name, plugin_id, source_path, current_enabled, installed_version, product_allowed, )); } let mut missing_remote_plugins = Vec::::new(); let mut remote_installed_plugin_names = HashSet::::new(); for plugin in remote_plugins { if plugin.marketplace_name != marketplace_name { return Err(PluginRemoteSyncError::UnknownRemoteMarketplace { marketplace_name: plugin.marketplace_name, }); } if !local_plugin_names.contains(&plugin.name) { missing_remote_plugins.push(plugin.name); continue; } // For now, sync treats remote `enabled = false` as uninstall rather than a distinct // disabled state. // TODO: Switch sync to `plugins/installed` so install and enable states stay distinct. if !plugin.enabled { continue; } if !remote_installed_plugin_names.insert(plugin.name.clone()) { return Err(PluginRemoteSyncError::DuplicateRemotePlugin { plugin_name: plugin.name, }); } } let mut config_edits = Vec::new(); let mut installs = Vec::new(); let mut uninstalls = Vec::new(); let mut result = RemotePluginSyncResult::default(); let remote_plugin_count = remote_installed_plugin_names.len(); let local_plugin_count = local_plugins.len(); if !missing_remote_plugins.is_empty() { let sample_missing_plugins = missing_remote_plugins .iter() .take(10) .cloned() .collect::>(); warn!( marketplace = %marketplace_name, missing_remote_plugin_count = missing_remote_plugins.len(), missing_remote_plugin_examples = ?sample_missing_plugins, "ignoring remote plugins missing from local marketplace during sync" ); } for ( plugin_name, plugin_id, source_path, current_enabled, installed_version, product_allowed, ) in local_plugins { let plugin_key = plugin_id.as_key(); let is_installed = installed_version.is_some(); if !product_allowed { continue; } if remote_installed_plugin_names.contains(&plugin_name) { if !is_installed { installs.push((source_path, plugin_id.clone(), cache_plugin_version.clone())); } if !is_installed { result.installed_plugin_ids.push(plugin_key.clone()); } if current_enabled != Some(true) { result.enabled_plugin_ids.push(plugin_key.clone()); config_edits.push(PluginConfigEdit::SetEnabled { plugin_key, enabled: true, }); } } else if !additive_only { if is_installed { uninstalls.push(plugin_id); } if is_installed || current_enabled.is_some() { result.uninstalled_plugin_ids.push(plugin_key.clone()); } if current_enabled.is_some() { config_edits.push(PluginConfigEdit::Clear { plugin_key }); } } } let store = self.store.clone(); let store_result = tokio::task::spawn_blocking(move || { for (source_path, plugin_id, plugin_version) in installs { store.install_with_version(source_path, plugin_id, plugin_version)?; } for plugin_id in uninstalls { store.uninstall(&plugin_id)?; } Ok::<(), PluginStoreError>(()) }) .await .map_err(PluginRemoteSyncError::join)?; if let Err(err) = store_result { self.clear_cache(); return Err(err.into()); } let config_result = if config_edits.is_empty() { Ok(()) } else { apply_user_plugin_config_edits(&self.codex_home, config_edits).await }; self.clear_cache(); config_result.map_err(anyhow::Error::from)?; info!( marketplace = %marketplace_name, remote_plugin_count, local_plugin_count, installed_plugin_ids = ?result.installed_plugin_ids, enabled_plugin_ids = ?result.enabled_plugin_ids, disabled_plugin_ids = ?result.disabled_plugin_ids, uninstalled_plugin_ids = ?result.uninstalled_plugin_ids, "completed remote plugin sync" ); Ok(result) } pub fn list_marketplaces_for_config( &self, config: &PluginsConfigInput, additional_roots: &[AbsolutePathBuf], ) -> Result { if !config.plugins_enabled { return Ok(ConfiguredMarketplaceListOutcome::default()); } let (installed_plugins, enabled_plugins) = self.configured_plugin_states(config); let marketplace_outcome = list_marketplaces(&self.marketplace_roots(config, additional_roots))?; let mut seen_plugin_keys = HashSet::new(); let marketplaces = marketplace_outcome .marketplaces .into_iter() .filter_map(|marketplace| { let marketplace_name = marketplace.name.clone(); let plugins = marketplace .plugins .into_iter() .filter_map(|plugin| { let plugin_key = format!("{}@{marketplace_name}", plugin.name); if !seen_plugin_keys.insert(plugin_key.clone()) { return None; } if !self.restriction_product_matches(plugin.policy.products.as_deref()) { return None; } let installed = installed_plugins.contains(&plugin_key); let enabled = enabled_plugins.contains(&plugin_key); let mut interface = plugin.interface; let mut local_version = plugin.local_version; if installed && matches!(&plugin.source, MarketplacePluginSource::Git { .. }) && let Ok(plugin_id) = PluginId::new(plugin.name.clone(), marketplace_name.clone()) && let Some(plugin_root) = self.store.active_plugin_root(&plugin_id) && let Some(manifest) = load_plugin_manifest(plugin_root.as_path()) { local_version = manifest.version.clone(); let marketplace_category = interface .as_ref() .and_then(|interface| interface.category.clone()); interface = plugin_interface_with_marketplace_category( manifest.interface, marketplace_category, ); } Some(ConfiguredMarketplacePlugin { // Enabled state is keyed by `@`, so duplicate // plugin entries from duplicate marketplace files intentionally // resolve to the first discovered source. id: plugin_key, installed, enabled, name: plugin.name, local_version, source: plugin.source, policy: plugin.policy, keywords: plugin.keywords, interface, }) }) .collect::>(); (!plugins.is_empty()).then_some(ConfiguredMarketplace { name: marketplace.name, path: marketplace.path, interface: marketplace.interface, plugins, }) }) .collect(); Ok(ConfiguredMarketplaceListOutcome { marketplaces, errors: marketplace_outcome.errors, }) } pub async fn read_plugin_for_config( &self, config: &PluginsConfigInput, request: &PluginReadRequest, ) -> Result { if !config.plugins_enabled { return Err(MarketplaceError::PluginsDisabled); } let plugin = find_marketplace_plugin(&request.marketplace_path, &request.plugin_name)?; if !self.restriction_product_matches(plugin.policy.products.as_deref()) { return Err(MarketplaceError::PluginNotFound { plugin_name: plugin.plugin_id.plugin_name, marketplace_name: plugin.plugin_id.marketplace_name, }); } let marketplace_name = plugin.plugin_id.marketplace_name.clone(); let plugin_key = plugin.plugin_id.as_key(); let (installed_plugins, enabled_plugins) = self.configured_plugin_states(config); let plugin = self .read_plugin_detail_for_marketplace_plugin( config, &marketplace_name, ConfiguredMarketplacePlugin { id: plugin_key.clone(), name: plugin.plugin_id.plugin_name, local_version: plugin .manifest .as_ref() .and_then(|manifest| manifest.version.clone()), source: plugin.source, policy: plugin.policy, interface: plugin.interface, keywords: plugin .manifest .as_ref() .map(|manifest| manifest.keywords.clone()) .unwrap_or_default(), installed: installed_plugins.contains(&plugin_key), enabled: enabled_plugins.contains(&plugin_key), }, ) .await?; Ok(PluginReadOutcome { marketplace_name, marketplace_path: Some(request.marketplace_path.clone()), plugin, }) } pub async fn read_plugin_detail_for_marketplace_plugin( &self, config: &PluginsConfigInput, marketplace_name: &str, plugin: ConfiguredMarketplacePlugin, ) -> Result { if !self.restriction_product_matches(plugin.policy.products.as_deref()) { return Err(MarketplaceError::PluginNotFound { plugin_name: plugin.name, marketplace_name: marketplace_name.to_string(), }); } let plugin_id = PluginId::new(plugin.name.clone(), marketplace_name.to_string()).map_err(|err| { match err { PluginIdError::Invalid(message) => MarketplaceError::InvalidPlugin(message), } })?; let plugin_key = plugin_id.as_key(); if matches!(plugin.source, MarketplacePluginSource::Git { .. }) && !plugin.installed { let description = remote_plugin_install_required_description(&plugin.source); return Ok(PluginDetail { id: plugin_key, name: plugin.name, local_version: None, description: Some(description), source: plugin.source, policy: plugin.policy, interface: plugin.interface, keywords: plugin.keywords, installed: plugin.installed, enabled: plugin.enabled, skills: Vec::new(), disabled_skill_paths: HashSet::new(), hooks: Vec::new(), apps: Vec::new(), mcp_server_names: Vec::new(), details_unavailable_reason: Some( PluginDetailsUnavailableReason::InstallRequiredForRemoteSource, ), }); } let source_path = if matches!(plugin.source, MarketplacePluginSource::Git { .. }) && plugin.installed { self.store.active_plugin_root(&plugin_id).ok_or_else(|| { MarketplaceError::InvalidPlugin(format!( "installed plugin cache entry is missing for {plugin_key}" )) })? } else { let codex_home = self.codex_home.clone(); let source = plugin.source.clone(); let materialized = tokio::task::spawn_blocking(move || { materialize_marketplace_plugin_source(codex_home.as_path(), &source) }) .await .map_err(|err| { MarketplaceError::InvalidPlugin(format!( "failed to materialize plugin source: {err}" )) })? .map_err(MarketplaceError::InvalidPlugin)?; materialized.path.clone() }; if !source_path.as_path().is_dir() { return Err(MarketplaceError::InvalidPlugin( "path does not exist or is not a directory".to_string(), )); } let manifest = load_plugin_manifest(source_path.as_path()).ok_or_else(|| { MarketplaceError::InvalidPlugin("missing or invalid plugin.json".to_string()) })?; let description = manifest.description.clone(); let marketplace_category = plugin .interface .as_ref() .and_then(|interface| interface.category.clone()); let interface = plugin_interface_with_marketplace_category( manifest.interface.clone(), marketplace_category, ); let resolved_skills = load_plugin_skills( &source_path, &plugin_id, &manifest.paths, self.restriction_product, &codex_core_skills::config_rules::skill_config_rules_from_stack( &config.config_layer_stack, ), ) .await; let hooks = if config.plugin_hooks_enabled { let plugin_data_root = self.store.plugin_data_root(&plugin_id); let (hook_sources, _hook_load_warnings) = load_plugin_hooks(&source_path, &plugin_id, &plugin_data_root, &manifest.paths); plugin_hook_declarations(&hook_sources) .into_iter() .map(|hook| PluginHookSummary { key: hook.key, event_name: hook.event_name, }) .collect() } else { Vec::new() }; let apps = load_plugin_apps(source_path.as_path()).await; let mut mcp_server_names = load_plugin_mcp_servers(source_path.as_path()) .await .into_keys() .collect::>(); mcp_server_names.sort_unstable(); mcp_server_names.dedup(); Ok(PluginDetail { id: plugin.id, name: plugin.name, local_version: manifest.version.clone(), description, source: plugin.source, policy: plugin.policy, interface, keywords: manifest.keywords, installed: plugin.installed, enabled: plugin.enabled, skills: resolved_skills.skills, disabled_skill_paths: resolved_skills.disabled_skill_paths, hooks, apps, mcp_server_names, details_unavailable_reason: None, }) } pub fn maybe_start_plugin_startup_tasks_for_config( self: &Arc, config: &PluginsConfigInput, auth_manager: Arc, on_effective_plugins_changed: Option>, ) { if config.plugins_enabled { self.start_curated_repo_sync(); let should_spawn_marketplace_auto_upgrade = { let mut state = match self.configured_marketplace_upgrade_state.write() { Ok(state) => state, Err(err) => err.into_inner(), }; if state.in_flight { false } else { state.in_flight = true; true } }; if should_spawn_marketplace_auto_upgrade { let manager = Arc::clone(self); let config = config.clone(); if let Err(err) = std::thread::Builder::new() .name("plugins-marketplace-auto-upgrade".to_string()) .spawn(move || { let outcome = manager.upgrade_configured_marketplaces_for_config( &config, /*marketplace_name*/ None, ); match outcome { Ok(outcome) => { for error in outcome.errors { warn!( marketplace = error.marketplace_name, error = %error.message, "failed to auto-upgrade configured marketplace" ); } } Err(err) => { warn!("failed to auto-upgrade configured marketplaces: {err}"); } } let mut state = match manager.configured_marketplace_upgrade_state.write() { Ok(state) => state, Err(err) => err.into_inner(), }; state.in_flight = false; }) { let mut state = match self.configured_marketplace_upgrade_state.write() { Ok(state) => state, Err(err) => err.into_inner(), }; state.in_flight = false; warn!("failed to start configured marketplace auto-upgrade task: {err}"); } } start_startup_remote_plugin_sync_once( Arc::clone(self), self.codex_home.clone(), config.clone(), auth_manager.clone(), ); if config.remote_plugin_enabled { let config = config.clone(); let manager = Arc::clone(self); let auth_manager = auth_manager.clone(); let on_effective_plugins_changed = on_effective_plugins_changed.clone(); tokio::spawn(async move { let auth = auth_manager.auth().await; manager.maybe_start_remote_installed_plugins_cache_refresh( &config, auth.clone(), on_effective_plugins_changed.clone(), ); manager.maybe_start_remote_installed_plugin_bundle_sync( &config, auth, on_effective_plugins_changed, ); }); } let config = config.clone(); let manager = Arc::clone(self); tokio::spawn(async move { let auth = auth_manager.auth().await; if let Err(err) = manager .featured_plugin_ids_for_config(&config, auth.as_ref()) .await { warn!( error = %err, "failed to warm featured plugin ids cache" ); } }); } } pub fn upgrade_configured_marketplaces_for_config( &self, config: &PluginsConfigInput, marketplace_name: Option<&str>, ) -> Result { if let Some(marketplace_name) = marketplace_name && !configured_git_marketplace_names(&config.config_layer_stack) .iter() .any(|name| name == marketplace_name) { return Err(format!( "marketplace `{marketplace_name}` is not configured as a Git marketplace" )); } let mut outcome = upgrade_configured_git_marketplaces( self.codex_home.as_path(), &config.config_layer_stack, marketplace_name, ); if !outcome.upgraded_roots.is_empty() { match refresh_non_curated_plugin_cache_force_reinstall( self.codex_home.as_path(), &outcome.upgraded_roots, ) { Ok(cache_refreshed) => { if cache_refreshed { self.clear_cache(); } } Err(err) => { self.clear_cache(); outcome.errors.push(ConfiguredMarketplaceUpgradeError { marketplace_name: marketplace_name .unwrap_or("all configured marketplaces") .to_string(), message: format!( "failed to refresh installed plugin cache after marketplace upgrade: {err}" ), }); } } } Ok(outcome) } pub fn maybe_start_non_curated_plugin_cache_refresh( self: &Arc, roots: &[AbsolutePathBuf], ) { self.schedule_non_curated_plugin_cache_refresh( roots, NonCuratedCacheRefreshMode::IfVersionChanged, ); } fn schedule_remote_installed_plugins_cache_refresh( self: &Arc, mut request: RemoteInstalledPluginsCacheRefreshRequest, ) { let should_spawn = { let mut state = match self.remote_installed_plugins_cache_refresh_state.write() { Ok(state) => state, Err(err) => err.into_inner(), }; if let Some(existing_request) = state.requested.as_ref() { if matches!( existing_request.notify, RemoteInstalledPluginsCacheRefreshNotify::AfterSuccessfulRefresh ) { request.notify = RemoteInstalledPluginsCacheRefreshNotify::AfterSuccessfulRefresh; } if request.on_effective_plugins_changed.is_none() { request.on_effective_plugins_changed = existing_request.on_effective_plugins_changed.clone(); } } state.requested = Some(request); if state.in_flight { false } else { state.in_flight = true; true } }; if !should_spawn { return; } let manager = Arc::clone(self); tokio::spawn(async move { manager .run_remote_installed_plugins_cache_refresh_loop() .await; }); } fn schedule_non_curated_plugin_cache_refresh( self: &Arc, roots: &[AbsolutePathBuf], mode: NonCuratedCacheRefreshMode, ) { let mut roots = roots.to_vec(); roots.sort_unstable(); roots.dedup(); if roots.is_empty() { return; } let request = NonCuratedCacheRefreshRequest { roots, mode }; let should_spawn = { let mut state = match self.non_curated_cache_refresh_state.write() { Ok(state) => state, Err(err) => err.into_inner(), }; // Collapse repeated plugin/list requests onto one worker and only queue another pass // when the requested roots set actually changes. Forced reinstall requests are not // deduped against the last completed pass because the same marketplace root path can // point at newly activated files after an auto-upgrade. if state.requested.as_ref() == Some(&request) || (mode == NonCuratedCacheRefreshMode::IfVersionChanged && !state.in_flight && state.last_refreshed.as_ref() == Some(&request)) { return; } if mode == NonCuratedCacheRefreshMode::IfVersionChanged && state.requested.as_ref().is_some_and(|requested| { requested.mode == NonCuratedCacheRefreshMode::ForceReinstall && requested.roots == request.roots }) { return; } state.requested = Some(request); if state.in_flight { false } else { state.in_flight = true; true } }; if !should_spawn { return; } let manager = Arc::clone(self); if let Err(err) = std::thread::Builder::new() .name("plugins-non-curated-cache-refresh".to_string()) .spawn(move || manager.run_non_curated_plugin_cache_refresh_loop()) { let mut state = match self.non_curated_cache_refresh_state.write() { Ok(state) => state, Err(err) => err.into_inner(), }; state.in_flight = false; state.requested = None; warn!("failed to start non-curated plugin cache refresh task: {err}"); } } fn start_curated_repo_sync(self: &Arc) { if CURATED_REPO_SYNC_STARTED.swap(true, Ordering::SeqCst) { return; } let manager = Arc::clone(self); let codex_home = self.codex_home.clone(); if let Err(err) = std::thread::Builder::new() .name("plugins-curated-repo-sync".to_string()) .spawn( move || match sync_openai_plugins_repo(codex_home.as_path()) { Ok(curated_plugin_version) => { let configured_curated_plugin_ids = configured_curated_plugin_ids_from_codex_home(codex_home.as_path()); match refresh_curated_plugin_cache( codex_home.as_path(), &curated_plugin_version, &configured_curated_plugin_ids, ) { Ok(cache_refreshed) => { if cache_refreshed { manager.clear_cache(); } } Err(err) => { manager.clear_cache(); CURATED_REPO_SYNC_STARTED.store(false, Ordering::SeqCst); warn!("failed to refresh curated plugin cache after sync: {err}"); } } } Err(err) => { CURATED_REPO_SYNC_STARTED.store(false, Ordering::SeqCst); warn!("failed to sync curated plugins repo: {err}"); } }, ) { CURATED_REPO_SYNC_STARTED.store(false, Ordering::SeqCst); warn!("failed to start curated plugins repo sync task: {err}"); } } async fn run_remote_installed_plugins_cache_refresh_loop(self: Arc) { loop { let request = { let mut state = match self.remote_installed_plugins_cache_refresh_state.write() { Ok(state) => state, Err(err) => err.into_inner(), }; match state.requested.take() { Some(request) => request, None => { state.in_flight = false; return; } } }; let installed_plugins = crate::remote::fetch_remote_installed_plugins( &request.service_config, request.auth.as_ref(), ) .await; match installed_plugins { Ok(installed_plugins) => { // TODO(remote plugins): reconcile missing or stale local bundles before // publishing remote installed state as effective local plugin config. let changed = self.write_remote_installed_plugins_cache(installed_plugins); let should_notify = changed || matches!( request.notify, RemoteInstalledPluginsCacheRefreshNotify::AfterSuccessfulRefresh ); if should_notify && let Some(on_effective_plugins_changed) = request.on_effective_plugins_changed { on_effective_plugins_changed(); } } Err( RemotePluginCatalogError::AuthRequired | RemotePluginCatalogError::UnsupportedAuthMode, ) => { let changed = self.clear_remote_installed_plugins_cache(); if changed && let Some(on_effective_plugins_changed) = request.on_effective_plugins_changed { on_effective_plugins_changed(); } } Err(err) => { warn!( error = %err, "failed to refresh remote installed plugins cache" ); } } } } fn run_non_curated_plugin_cache_refresh_loop(self: Arc) { loop { let request = { let state = match self.non_curated_cache_refresh_state.read() { Ok(state) => state, Err(err) => err.into_inner(), }; state.requested.clone() }; let Some(request) = request else { let mut state = match self.non_curated_cache_refresh_state.write() { Ok(state) => state, Err(err) => err.into_inner(), }; state.in_flight = false; return; }; let refresh_result = match request.mode { NonCuratedCacheRefreshMode::IfVersionChanged => { refresh_non_curated_plugin_cache(self.codex_home.as_path(), &request.roots) } NonCuratedCacheRefreshMode::ForceReinstall => { refresh_non_curated_plugin_cache_force_reinstall( self.codex_home.as_path(), &request.roots, ) } }; let refreshed = match refresh_result { Ok(cache_refreshed) => { if cache_refreshed { self.clear_cache(); } true } Err(err) => { self.clear_cache(); warn!("failed to refresh non-curated plugin cache: {err}"); false } }; let mut state = match self.non_curated_cache_refresh_state.write() { Ok(state) => state, Err(err) => err.into_inner(), }; if refreshed { state.last_refreshed = Some(request.clone()); } if state.requested.as_ref() == Some(&request) { state.requested = None; state.in_flight = false; return; } } } fn configured_plugin_states( &self, config: &PluginsConfigInput, ) -> (HashSet, HashSet) { let configured_plugins = configured_plugins_from_stack(&config.config_layer_stack); let installed_plugins = configured_plugins .keys() .filter(|plugin_key| { PluginId::parse(plugin_key) .ok() .is_some_and(|plugin_id| self.store.is_installed(&plugin_id)) }) .cloned() .collect::>(); let enabled_plugins = configured_plugins .into_iter() .filter_map(|(plugin_key, plugin)| plugin.enabled.then_some(plugin_key)) .collect::>(); (installed_plugins, enabled_plugins) } fn marketplace_roots( &self, config: &PluginsConfigInput, additional_roots: &[AbsolutePathBuf], ) -> Vec { // Treat the curated catalog as an extra marketplace root so plugin listing can surface it // without requiring every caller to know where it is stored. let mut roots = additional_roots.to_vec(); roots.extend(installed_marketplace_roots_from_layer_stack( &config.config_layer_stack, self.codex_home.as_path(), )); let curated_repo_root = curated_plugins_repo_path(self.codex_home.as_path()); if curated_repo_root.is_dir() && let Ok(curated_repo_root) = AbsolutePathBuf::try_from(curated_repo_root) { roots.push(curated_repo_root); } roots.sort_unstable(); roots.dedup(); roots } } fn remote_plugin_install_required_description(source: &MarketplacePluginSource) -> String { let source_description = match source { MarketplacePluginSource::Git { url, path, ref_name, sha, } => { let mut parts = vec![url.clone()]; if let Some(path) = path { parts.push(format!("path `{path}`")); } if let Some(ref_name) = ref_name { parts.push(format!("ref `{ref_name}`")); } if let Some(sha) = sha { parts.push(format!("sha `{sha}`")); } parts.join(", ") } MarketplacePluginSource::Local { path } => path.as_path().display().to_string(), }; format!( "This is a cross-repo plugin. Install it to view more detailed information. The source of the plugin is {source_description}." ) } #[derive(Debug, thiserror::Error)] pub enum PluginInstallError { #[error("{0}")] Marketplace(#[from] MarketplaceError), #[error("{0}")] Remote(#[from] RemotePluginMutationError), #[error("{0}")] Store(#[from] PluginStoreError), #[error("{0}")] Config(#[from] anyhow::Error), #[error("failed to join plugin install task: {0}")] Join(#[from] tokio::task::JoinError), } impl PluginInstallError { fn join(source: tokio::task::JoinError) -> Self { Self::Join(source) } pub fn is_invalid_request(&self) -> bool { matches!( self, Self::Marketplace( MarketplaceError::MarketplaceNotFound { .. } | MarketplaceError::InvalidMarketplaceFile { .. } | MarketplaceError::PluginNotFound { .. } | MarketplaceError::PluginNotAvailable { .. } | MarketplaceError::InvalidPlugin(_) ) | Self::Store(PluginStoreError::Invalid(_)) ) } } #[derive(Debug, thiserror::Error)] pub enum PluginUninstallError { #[error("{0}")] InvalidPluginId(#[from] PluginIdError), #[error("{0}")] Remote(#[from] RemotePluginMutationError), #[error("{0}")] Store(#[from] PluginStoreError), #[error("{0}")] Config(#[from] anyhow::Error), #[error("failed to join plugin uninstall task: {0}")] Join(#[from] tokio::task::JoinError), } impl PluginUninstallError { fn join(source: tokio::task::JoinError) -> Self { Self::Join(source) } pub fn is_invalid_request(&self) -> bool { matches!(self, Self::InvalidPluginId(_)) } } pub(crate) fn configured_plugins_from_stack( config_layer_stack: &ConfigLayerStack, ) -> HashMap { // Plugin entries remain persisted user config only. let Some(user_layer) = config_layer_stack.get_user_layer() else { return HashMap::new(); }; configured_plugins_from_user_config_value(&user_layer.config) } fn configured_plugins_from_user_config_value( user_config: &toml::Value, ) -> HashMap { let Some(plugins_value) = user_config.get("plugins") else { return HashMap::new(); }; match plugins_value.clone().try_into() { Ok(plugins) => plugins, Err(err) => { warn!("invalid plugins config: {err}"); HashMap::new() } } } #[cfg(test)] #[path = "manager_tests.rs"] mod tests;