use super::PluginLoadOutcome; use super::curated_plugins_repo_path; use super::installed_marketplaces::installed_marketplace_roots_from_config; use super::read_curated_plugins_sha; use super::startup_sync::start_startup_remote_plugin_sync_once; use super::sync_openai_plugins_repo; use crate::SkillMetadata; use crate::config::Config; use crate::config::ConfigService; use crate::config::ConfigServiceError; use crate::config::edit::ConfigEdit; use crate::config::edit::ConfigEditsBuilder; use crate::config_loader::ConfigLayerStack; use codex_analytics::AnalyticsEventsClient; use codex_app_server_protocol::ConfigValueWriteParams; use codex_app_server_protocol::MergeStrategy; use codex_config::types::PluginConfig; use codex_core_plugins::loader::configured_curated_plugin_ids_from_codex_home; use codex_core_plugins::loader::installed_plugin_telemetry_metadata; use codex_core_plugins::loader::load_plugin_apps; use codex_core_plugins::loader::load_plugin_mcp_servers; use codex_core_plugins::loader::load_plugin_skills; use codex_core_plugins::loader::load_plugins_from_layer_stack; use codex_core_plugins::loader::log_plugin_load_errors; use codex_core_plugins::loader::materialize_marketplace_plugin_source; use codex_core_plugins::loader::plugin_telemetry_metadata_from_root; use codex_core_plugins::loader::refresh_curated_plugin_cache; use codex_core_plugins::loader::refresh_non_curated_plugin_cache; use codex_core_plugins::loader::refresh_non_curated_plugin_cache_force_reinstall; use codex_core_plugins::manifest::PluginManifestInterface; use codex_core_plugins::manifest::load_plugin_manifest; use codex_core_plugins::marketplace::MarketplaceError; use codex_core_plugins::marketplace::MarketplaceInterface; use codex_core_plugins::marketplace::MarketplaceListError; use codex_core_plugins::marketplace::MarketplacePluginAuthPolicy; use codex_core_plugins::marketplace::MarketplacePluginPolicy; use codex_core_plugins::marketplace::MarketplacePluginSource; use 
codex_core_plugins::marketplace::ResolvedMarketplacePlugin; use codex_core_plugins::marketplace::find_installable_marketplace_plugin; use codex_core_plugins::marketplace::find_marketplace_plugin; use codex_core_plugins::marketplace::list_marketplaces; use codex_core_plugins::marketplace::load_marketplace; use codex_core_plugins::marketplace::plugin_interface_with_marketplace_category; use codex_core_plugins::marketplace_upgrade::ConfiguredMarketplaceUpgradeError; use codex_core_plugins::marketplace_upgrade::ConfiguredMarketplaceUpgradeOutcome; use codex_core_plugins::marketplace_upgrade::configured_git_marketplace_names; use codex_core_plugins::marketplace_upgrade::upgrade_configured_git_marketplaces; use codex_core_plugins::remote::RemotePluginFetchError; use codex_core_plugins::remote::RemotePluginMutationError; use codex_core_plugins::remote::RemotePluginServiceConfig; use codex_core_plugins::store::PluginInstallResult as StorePluginInstallResult; use codex_core_plugins::store::PluginStore; use codex_core_plugins::store::PluginStoreError; use codex_features::Feature; use codex_login::AuthManager; use codex_login::CodexAuth; use codex_plugin::AppConnectorId; use codex_plugin::PluginCapabilitySummary; use codex_plugin::PluginId; use codex_plugin::PluginIdError; use codex_plugin::prompt_safe_plugin_description; use codex_protocol::protocol::Product; use codex_utils_absolute_path::AbsolutePathBuf; use serde_json::json; use std::collections::HashMap; use std::collections::HashSet; use std::path::PathBuf; use std::sync::Arc; use std::sync::RwLock; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Instant; use tokio::sync::Mutex; use toml_edit::value; use tracing::info; use tracing::warn; pub const OPENAI_CURATED_MARKETPLACE_NAME: &str = "openai-curated"; pub const OPENAI_CURATED_MARKETPLACE_DISPLAY_NAME: &str = "OpenAI Curated"; pub const OPENAI_BUNDLED_MARKETPLACE_NAME: &str = "openai-bundled"; static CURATED_REPO_SYNC_STARTED: 
AtomicBool = AtomicBool::new(false);

/// Lifetime of a cached featured-plugin id list: three hours.
const FEATURED_PLUGIN_IDS_CACHE_TTL: std::time::Duration =
    std::time::Duration::from_secs(60 * 60 * 3);

/// Identity a cached featured-plugin id list belongs to; a cached entry is only served while
/// the key computed for the current request compares equal to the stored one.
// NOTE(review): the `String` payloads of the two `Option` fields are reconstructed (their
// type arguments had been lost); confirm against the auth token data types.
#[derive(Clone, PartialEq, Eq)]
struct FeaturedPluginIdsCacheKey {
    chatgpt_base_url: String,
    account_id: Option<String>,
    chatgpt_user_id: Option<String>,
    is_workspace_account: bool,
}

/// A featured-plugin id list together with the key it was stored under and its expiry instant.
#[derive(Clone)]
struct CachedFeaturedPluginIds {
    key: FeaturedPluginIdsCacheKey,
    expires_at: Instant,
    featured_plugin_ids: Vec<String>,
}

// NOTE(review): the element type of `roots` is reconstructed (its type argument had been
// lost); confirm against the code that builds refresh requests.
#[derive(Clone, PartialEq, Eq)]
struct NonCuratedCacheRefreshRequest {
    roots: Vec<AbsolutePathBuf>,
    mode: NonCuratedCacheRefreshMode,
}

#[derive(Clone, Copy, PartialEq, Eq)]
enum NonCuratedCacheRefreshMode {
    IfVersionChanged,
    ForceReinstall,
}

// NOTE(review): the payload type of `requested` / `last_refreshed` is reconstructed (type
// arguments had been lost); confirm against the refresh scheduling code.
#[derive(Default)]
struct NonCuratedCacheRefreshState {
    requested: Option<NonCuratedCacheRefreshRequest>,
    last_refreshed: Option<NonCuratedCacheRefreshRequest>,
    in_flight: bool,
}

#[derive(Default)]
struct ConfiguredMarketplaceUpgradeState {
    in_flight: bool,
}

/// Builds the remote plugin service settings from `config`; only the ChatGPT base URL is copied.
fn remote_plugin_service_config(config: &Config) -> RemotePluginServiceConfig {
    RemotePluginServiceConfig {
        chatgpt_base_url: config.chatgpt_base_url.clone(),
    }
}

/// Derives the featured-plugin cache key for `config` and `auth`.
///
/// When `auth` is absent or its token data cannot be read, both ids are `None` and
/// `is_workspace_account` is `false`.
fn featured_plugin_ids_cache_key(
    config: &Config,
    auth: Option<&CodexAuth>,
) -> FeaturedPluginIdsCacheKey {
    let token_data = auth.and_then(|auth| auth.get_token_data().ok());
    let account_id = token_data
        .as_ref()
        .and_then(|token_data| token_data.account_id.clone());
    let chatgpt_user_id = token_data
        .as_ref()
        .and_then(|token_data| token_data.id_token.chatgpt_user_id.clone());
    let is_workspace_account = token_data
        .as_ref()
        .is_some_and(|token_data| token_data.id_token.is_workspace_account());
    FeaturedPluginIdsCacheKey {
        chatgpt_base_url: config.chatgpt_base_url.clone(),
        account_id,
        chatgpt_user_id,
        is_workspace_account,
    }
}

/// Names the plugin to install and the marketplace file it is listed in.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PluginInstallRequest {
    pub plugin_name: String,
    pub marketplace_path: AbsolutePathBuf,
}

/// Names the plugin to read and the marketplace file it is listed in.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PluginReadRequest {
    pub plugin_name: String,
    pub marketplace_path: AbsolutePathBuf,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct
PluginInstallOutcome {
    pub plugin_id: PluginId,
    pub plugin_version: String,
    pub installed_path: AbsolutePathBuf,
    pub auth_policy: MarketplacePluginAuthPolicy,
}

/// A plugin's details plus the marketplace name and path they were read from.
#[derive(Debug, Clone, PartialEq)]
pub struct PluginReadOutcome {
    pub marketplace_name: String,
    pub marketplace_path: AbsolutePathBuf,
    pub plugin: PluginDetail,
}

/// Everything known about one marketplace plugin, including its local install state.
// NOTE(review): the field type arguments below are reconstructed (they had been lost).
// `disabled_skill_paths` must hold whatever type `SkillMetadata::path_to_skills_md` has;
// `AbsolutePathBuf` is assumed — confirm.
#[derive(Debug, Clone, PartialEq)]
pub struct PluginDetail {
    pub id: String,
    pub name: String,
    pub description: Option<String>,
    pub source: MarketplacePluginSource,
    pub policy: MarketplacePluginPolicy,
    pub interface: Option<PluginManifestInterface>,
    pub installed: bool,
    pub enabled: bool,
    pub skills: Vec<SkillMetadata>,
    pub disabled_skill_paths: HashSet<AbsolutePathBuf>,
    pub apps: Vec<AppConnectorId>,
    pub mcp_server_names: Vec<String>,
    pub details_unavailable_reason: Option<PluginDetailsUnavailableReason>,
}

/// Why the detail fields of a [`PluginDetail`] were left empty.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PluginDetailsUnavailableReason {
    InstallRequiredForRemoteSource,
}

/// A marketplace together with the plugins listed for it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConfiguredMarketplace {
    pub name: String,
    pub path: AbsolutePathBuf,
    pub interface: Option<MarketplaceInterface>,
    pub plugins: Vec<ConfiguredMarketplacePlugin>,
}

/// One marketplace plugin entry annotated with its installed/enabled state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConfiguredMarketplacePlugin {
    pub id: String,
    pub name: String,
    pub source: MarketplacePluginSource,
    pub policy: MarketplacePluginPolicy,
    pub interface: Option<PluginManifestInterface>,
    pub installed: bool,
    pub enabled: bool,
}

/// Marketplaces that could be listed, plus the errors reported while listing.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ConfiguredMarketplaceListOutcome {
    pub marketplaces: Vec<ConfiguredMarketplace>,
    pub errors: Vec<MarketplaceListError>,
}

impl From<PluginDetail> for PluginCapabilitySummary {
    /// Summarizes a plugin's capabilities; `has_skills` is true only when at least one skill
    /// is not listed in `disabled_skill_paths`.
    fn from(value: PluginDetail) -> Self {
        let has_skills = value.skills.iter().any(|skill| {
            !value
                .disabled_skill_paths
                .contains(&skill.path_to_skills_md)
        });
        Self {
            config_name: value.id,
            display_name: value.name,
            description: prompt_safe_plugin_description(value.description.as_deref()),
            has_skills,
            mcp_server_names: value.mcp_server_names,
            app_connector_ids: value.apps,
        }
    }
}

#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RemotePluginSyncResult {
    /// Plugin ids newly installed into the local plugin cache.
pub installed_plugin_ids: Vec, /// Plugin ids whose local config was changed to enabled. pub enabled_plugin_ids: Vec, /// Plugin ids whose local config was changed to disabled. /// This is not populated by `sync_plugins_from_remote`. pub disabled_plugin_ids: Vec, /// Plugin ids removed from local cache or plugin config. pub uninstalled_plugin_ids: Vec, } #[derive(Debug, thiserror::Error)] pub enum PluginRemoteSyncError { #[error("chatgpt authentication required to sync remote plugins")] AuthRequired, #[error( "chatgpt authentication required to sync remote plugins; api key auth is not supported" )] UnsupportedAuthMode, #[error("failed to read auth token for remote plugin sync: {0}")] AuthToken(#[source] std::io::Error), #[error("failed to send remote plugin sync request to {url}: {source}")] Request { url: String, #[source] source: reqwest::Error, }, #[error("remote plugin sync request to {url} failed with status {status}: {body}")] UnexpectedStatus { url: String, status: reqwest::StatusCode, body: String, }, #[error("failed to parse remote plugin sync response from {url}: {source}")] Decode { url: String, #[source] source: serde_json::Error, }, #[error("local curated marketplace is not available")] LocalMarketplaceNotFound, #[error("remote marketplace `{marketplace_name}` is not available locally")] UnknownRemoteMarketplace { marketplace_name: String }, #[error("duplicate remote plugin `{plugin_name}` in sync response")] DuplicateRemotePlugin { plugin_name: String }, #[error( "remote plugin `{plugin_name}` was not found in local marketplace `{marketplace_name}`" )] UnknownRemotePlugin { plugin_name: String, marketplace_name: String, }, #[error("{0}")] InvalidPluginId(#[from] PluginIdError), #[error("{0}")] Marketplace(#[from] MarketplaceError), #[error("{0}")] Store(#[from] PluginStoreError), #[error("{0}")] Config(#[from] anyhow::Error), #[error("failed to join remote plugin sync task: {0}")] Join(#[from] tokio::task::JoinError), } impl PluginRemoteSyncError { fn 
join(source: tokio::task::JoinError) -> Self {
        Self::Join(source)
    }
}

impl From<RemotePluginFetchError> for PluginRemoteSyncError {
    /// Maps each fetch error onto the sync error variant of the same name.
    fn from(value: RemotePluginFetchError) -> Self {
        match value {
            RemotePluginFetchError::AuthRequired => Self::AuthRequired,
            RemotePluginFetchError::UnsupportedAuthMode => Self::UnsupportedAuthMode,
            RemotePluginFetchError::AuthToken(source) => Self::AuthToken(source),
            RemotePluginFetchError::Request { url, source } => Self::Request { url, source },
            RemotePluginFetchError::UnexpectedStatus { url, status, body } => {
                Self::UnexpectedStatus { url, status, body }
            }
            RemotePluginFetchError::Decode { url, source } => Self::Decode { url, source },
        }
    }
}

/// Owns the plugin store for one `codex_home` together with the in-memory caches and
/// bookkeeping state used by the plugin operations below.
pub struct PluginsManager {
    codex_home: PathBuf,
    store: PluginStore,
    featured_plugin_ids_cache: RwLock<Option<CachedFeaturedPluginIds>>,
    configured_marketplace_upgrade_state: RwLock<ConfiguredMarketplaceUpgradeState>,
    non_curated_cache_refresh_state: RwLock<NonCuratedCacheRefreshState>,
    cached_enabled_outcome: RwLock<Option<PluginLoadOutcome>>,
    remote_sync_lock: Mutex<()>,
    restriction_product: Option<Product>,
    analytics_events_client: RwLock<Option<AnalyticsEventsClient>>,
}

impl PluginsManager {
    /// Creates a manager for `codex_home` restricted to `Product::Codex`.
    pub fn new(codex_home: PathBuf) -> Self {
        Self::new_with_restriction_product(codex_home, Some(Product::Codex))
    }

    /// Creates a manager for `codex_home` with an explicit (possibly absent) restriction
    /// product; all caches start empty.
    pub fn new_with_restriction_product(
        codex_home: PathBuf,
        restriction_product: Option<Product>,
    ) -> Self {
        // Product restrictions are enforced at marketplace admission time for a given CODEX_HOME:
        // listing, install, and curated refresh all consult this restriction context before new
        // plugins enter local config or cache. After admission, runtime plugin loading trusts the
        // contents of that CODEX_HOME and does not re-filter configured plugins by product, so
        // already-admitted plugins may continue exposing MCP servers/tools from shared local state.
        //
        // This assumes a single CODEX_HOME is only used by one product.
Self {
            codex_home: codex_home.clone(),
            store: PluginStore::new(codex_home),
            featured_plugin_ids_cache: RwLock::new(None),
            configured_marketplace_upgrade_state: RwLock::new(
                ConfiguredMarketplaceUpgradeState::default(),
            ),
            non_curated_cache_refresh_state: RwLock::new(NonCuratedCacheRefreshState::default()),
            cached_enabled_outcome: RwLock::new(None),
            remote_sync_lock: Mutex::new(()),
            restriction_product,
            analytics_events_client: RwLock::new(None),
        }
    }

    /// Stores `analytics_events_client`, replacing any client stored earlier.
    pub fn set_analytics_events_client(&self, analytics_events_client: AnalyticsEventsClient) {
        // A poisoned lock is recovered instead of propagating the panic.
        let mut slot = self
            .analytics_events_client
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *slot = Some(analytics_events_client);
    }

    /// Reports whether a plugin restricted to `products` is admitted by this manager.
    ///
    /// No restriction (`None`) admits the plugin, an empty list rejects it, and otherwise the
    /// manager's restriction product must be set and match the list.
    fn restriction_product_matches(&self, products: Option<&[Product]>) -> bool {
        let Some(allowed) = products else {
            return true;
        };
        if allowed.is_empty() {
            return false;
        }
        self.restriction_product
            .is_some_and(|product| product.matches_product_restriction(allowed))
    }

    /// Returns the plugin load outcome for `config`, serving the cached outcome when present.
    pub async fn plugins_for_config(&self, config: &Config) -> PluginLoadOutcome {
        let force_reload = false;
        self.plugins_for_config_with_force_reload(config, force_reload)
            .await
    }

    /// Returns the plugin load outcome for `config`.
    ///
    /// Yields the default outcome when the plugins feature is off. Otherwise the cached
    /// outcome is returned unless `force_reload` is set or nothing is cached, in which case
    /// plugins are loaded from the config layer stack, load errors are logged, and the fresh
    /// outcome replaces the cache.
    pub(crate) async fn plugins_for_config_with_force_reload(
        &self,
        config: &Config,
        force_reload: bool,
    ) -> PluginLoadOutcome {
        let plugins_enabled = config.features.enabled(Feature::Plugins);
        if !plugins_enabled {
            return PluginLoadOutcome::default();
        }

        // The cache is not even consulted when a reload is forced.
        let cached = if force_reload {
            None
        } else {
            self.cached_enabled_outcome()
        };
        if let Some(cached) = cached {
            return cached;
        }

        let loaded = load_plugins_from_layer_stack(
            &config.config_layer_stack,
            &self.store,
            self.restriction_product,
        )
        .await;
        log_plugin_load_errors(&loaded);

        let mut slot = self
            .cached_enabled_outcome
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *slot = Some(loaded.clone());
        loaded
    }

    /// Empties the cached plugin load outcome and the cached featured plugin ids.
    pub fn clear_cache(&self) {
        let mut cached_enabled_outcome = match self.cached_enabled_outcome.write() {
            Ok(cache) => cache,
            Err(err) => err.into_inner(),
        };
        let mut featured_plugin_ids_cache = match self.featured_plugin_ids_cache.write() {
Ok(cache) => cache,
            Err(err) => err.into_inner(),
        };
        *featured_plugin_ids_cache = None;
        *cached_enabled_outcome = None;
    }

    /// Resolve plugin skill roots for a config layer stack without touching the plugins cache.
    ///
    /// Returns an empty list when `plugins_feature_enabled` is false.
    // NOTE(review): the element type of the returned `Vec` is reconstructed (its type argument
    // had been lost); it must match `PluginLoadOutcome::effective_skill_roots` — confirm.
    pub async fn effective_skill_roots_for_layer_stack(
        &self,
        config_layer_stack: &ConfigLayerStack,
        plugins_feature_enabled: bool,
    ) -> Vec<AbsolutePathBuf> {
        if !plugins_feature_enabled {
            return Vec::new();
        }
        load_plugins_from_layer_stack(config_layer_stack, &self.store, self.restriction_product)
            .await
            .effective_skill_roots()
    }

    /// Returns a clone of the cached plugin load outcome, if any; a poisoned lock is recovered.
    fn cached_enabled_outcome(&self) -> Option<PluginLoadOutcome> {
        match self.cached_enabled_outcome.read() {
            Ok(cache) => cache.clone(),
            Err(err) => err.into_inner().clone(),
        }
    }

    /// Returns the cached featured plugin ids when the entry is unexpired and was stored under
    /// `cache_key`; otherwise clears a stale or mismatching entry and returns `None`.
    fn cached_featured_plugin_ids(
        &self,
        cache_key: &FeaturedPluginIdsCacheKey,
    ) -> Option<Vec<String>> {
        // Fast path under the read lock; the guard is released at the end of this scope before
        // the write lock is requested below.
        {
            let cache = match self.featured_plugin_ids_cache.read() {
                Ok(cache) => cache,
                Err(err) => err.into_inner(),
            };
            let now = Instant::now();
            if let Some(cached) = cache.as_ref()
                && now < cached.expires_at
                && cached.key == *cache_key
            {
                return Some(cached.featured_plugin_ids.clone());
            }
        }

        let mut cache = match self.featured_plugin_ids_cache.write() {
            Ok(cache) => cache,
            Err(err) => err.into_inner(),
        };
        // Re-check under the write lock: the entry may have been replaced in between.
        let now = Instant::now();
        if cache
            .as_ref()
            .is_some_and(|cached| now >= cached.expires_at || cached.key != *cache_key)
        {
            *cache = None;
        }
        None
    }

    /// Stores `featured_plugin_ids` under `cache_key`, expiring after
    /// `FEATURED_PLUGIN_IDS_CACHE_TTL`.
    fn write_featured_plugin_ids_cache(
        &self,
        cache_key: FeaturedPluginIdsCacheKey,
        featured_plugin_ids: &[String],
    ) {
        let mut cache = match self.featured_plugin_ids_cache.write() {
            Ok(cache) => cache,
            Err(err) => err.into_inner(),
        };
        *cache = Some(CachedFeaturedPluginIds {
            key: cache_key,
            expires_at: Instant::now() + FEATURED_PLUGIN_IDS_CACHE_TTL,
            featured_plugin_ids: featured_plugin_ids.to_vec(),
        });
    }

    /// Returns the featured plugin ids for `config` and `auth`, using the in-memory cache when
    /// it holds a fresh entry for the same key and fetching remotely otherwise.
    ///
    /// Returns an empty list when the plugins feature is disabled.
    ///
    /// # Errors
    ///
    /// Propagates the error of the remote fetch.
    pub async fn featured_plugin_ids_for_config(
        &self,
        config: &Config,
        auth: Option<&CodexAuth>,
    ) -> Result<Vec<String>, RemotePluginFetchError> {
        if !config.features.enabled(Feature::Plugins) {
            return Ok(Vec::new());
        }

        let cache_key = featured_plugin_ids_cache_key(config,
auth);
        if let Some(featured_plugin_ids) = self.cached_featured_plugin_ids(&cache_key) {
            return Ok(featured_plugin_ids);
        }
        let featured_plugin_ids = codex_core_plugins::remote::fetch_remote_featured_plugin_ids(
            &remote_plugin_service_config(config),
            auth,
            self.restriction_product,
        )
        .await?;
        self.write_featured_plugin_ids_cache(cache_key, &featured_plugin_ids);
        Ok(featured_plugin_ids)
    }

    /// Resolves `request` against its marketplace (honouring the product restriction) and
    /// installs the resolved plugin locally.
    pub async fn install_plugin(
        &self,
        request: PluginInstallRequest,
    ) -> Result<PluginInstallOutcome, PluginInstallError> {
        let resolved = find_installable_marketplace_plugin(
            &request.marketplace_path,
            &request.plugin_name,
            self.restriction_product,
        )?;
        self.install_resolved_plugin(resolved).await
    }

    /// Like [`Self::install_plugin`], but first asks the remote service to enable the plugin;
    /// a failure of that remote call aborts before anything is installed locally.
    pub async fn install_plugin_with_remote_sync(
        &self,
        config: &Config,
        auth: Option<&CodexAuth>,
        request: PluginInstallRequest,
    ) -> Result<PluginInstallOutcome, PluginInstallError> {
        let resolved = find_installable_marketplace_plugin(
            &request.marketplace_path,
            &request.plugin_name,
            self.restriction_product,
        )?;
        let plugin_id = resolved.plugin_id.as_key();
        // This only forwards the backend mutation before the local install flow. We rely on
        // `plugin/list(forceRemoteSync=true)` to sync local state rather than doing an extra
        // reconcile pass here.
codex_core_plugins::remote::enable_remote_plugin(
            &remote_plugin_service_config(config),
            auth,
            &plugin_id,
        )
        .await
        .map_err(PluginInstallError::from)?;
        self.install_resolved_plugin(resolved).await
    }

    /// Installs an already-resolved marketplace plugin.
    ///
    /// Steps, in order: pick the version (the local curated sha for the curated marketplace,
    /// otherwise none), materialize the source and install it into the store on a blocking
    /// thread, write `plugins.<key> = { enabled = true }` to the config, and report the
    /// install to the analytics client when one is set.
    async fn install_resolved_plugin(
        &self,
        resolved: ResolvedMarketplacePlugin,
    ) -> Result<PluginInstallOutcome, PluginInstallError> {
        let auth_policy = resolved.policy.authentication;
        let plugin_version =
            if resolved.plugin_id.marketplace_name == OPENAI_CURATED_MARKETPLACE_NAME {
                Some(
                    read_curated_plugins_sha(self.codex_home.as_path()).ok_or_else(|| {
                        PluginStoreError::Invalid(
                            "local curated marketplace sha is not available".to_string(),
                        )
                    })?,
                )
            } else {
                None
            };
        let store = self.store.clone();
        let codex_home = self.codex_home.clone();
        // The first `?` surfaces a failed join of the blocking task, the second the store error.
        let result: StorePluginInstallResult = tokio::task::spawn_blocking(move || {
            let materialized =
                materialize_marketplace_plugin_source(codex_home.as_path(), &resolved.source)
                    .map_err(PluginStoreError::Invalid)?;
            let source_path = materialized.path;
            if let Some(plugin_version) = plugin_version {
                store.install_with_version(source_path, resolved.plugin_id, plugin_version)
            } else {
                store.install(source_path, resolved.plugin_id)
            }
        })
        .await
        .map_err(PluginInstallError::join)??;

        ConfigService::new_with_defaults(self.codex_home.clone())
            .write_value(ConfigValueWriteParams {
                key_path: format!("plugins.{}", result.plugin_id.as_key()),
                value: json!({
                    "enabled": true,
                }),
                merge_strategy: MergeStrategy::Replace,
                file_path: None,
                expected_version: None,
            })
            .await
            .map(|_| ())
            .map_err(PluginInstallError::from)?;

        // Clone the client out of the lock so the guard is not held across the `.await` below.
        let analytics_events_client = match self.analytics_events_client.read() {
            Ok(client) => client.clone(),
            Err(err) => err.into_inner().clone(),
        };
        if let Some(analytics_events_client) = analytics_events_client {
            analytics_events_client.track_plugin_installed(
                plugin_telemetry_metadata_from_root(&result.plugin_id, &result.installed_path)
                    .await,
            );
        }

        Ok(PluginInstallOutcome {
            plugin_id: result.plugin_id,
            plugin_version: result.plugin_version,
            installed_path: result.installed_path,
auth_policy,
        })
    }

    /// Parses `plugin_id` and uninstalls that plugin locally.
    pub async fn uninstall_plugin(&self, plugin_id: String) -> Result<(), PluginUninstallError> {
        let plugin_id = PluginId::parse(&plugin_id)?;
        self.uninstall_plugin_id(plugin_id).await
    }

    /// Like [`Self::uninstall_plugin`], but first asks the remote service to uninstall the
    /// plugin; a failure of that remote call aborts before anything is removed locally.
    pub async fn uninstall_plugin_with_remote_sync(
        &self,
        config: &Config,
        auth: Option<&CodexAuth>,
        plugin_id: String,
    ) -> Result<(), PluginUninstallError> {
        let plugin_id = PluginId::parse(&plugin_id)?;
        let plugin_key = plugin_id.as_key();

        // This only forwards the backend mutation before the local uninstall flow. We rely on
        // `plugin/list(forceRemoteSync=true)` to sync local state rather than doing an extra
        // reconcile pass here.
        codex_core_plugins::remote::uninstall_remote_plugin(
            &remote_plugin_service_config(config),
            auth,
            &plugin_key,
        )
        .await
        .map_err(PluginUninstallError::from)?;
        self.uninstall_plugin_id(plugin_id).await
    }

    /// Removes the plugin from the store, clears its `plugins.<key>` config entry, and reports
    /// the uninstall to the analytics client when one is set.
    async fn uninstall_plugin_id(&self, plugin_id: PluginId) -> Result<(), PluginUninstallError> {
        // Telemetry metadata is captured up front, and only when the store currently has an
        // active root for the plugin.
        let plugin_telemetry = if self.store.active_plugin_root(&plugin_id).is_some() {
            Some(installed_plugin_telemetry_metadata(self.codex_home.as_path(), &plugin_id).await)
        } else {
            None
        };
        let store = self.store.clone();
        let plugin_id_for_store = plugin_id.clone();
        tokio::task::spawn_blocking(move || store.uninstall(&plugin_id_for_store))
            .await
            .map_err(PluginUninstallError::join)??;

        ConfigEditsBuilder::new(&self.codex_home)
            .with_edits([ConfigEdit::ClearPath {
                segments: vec!["plugins".to_string(), plugin_id.as_key()],
            }])
            .apply()
            .await?;

        let analytics_events_client = match self.analytics_events_client.read() {
            Ok(client) => client.clone(),
            Err(err) => err.into_inner().clone(),
        };
        if let Some(plugin_telemetry) = plugin_telemetry
            && let Some(analytics_events_client) = analytics_events_client
        {
            analytics_events_client.track_plugin_uninstalled(plugin_telemetry);
        }

        Ok(())
    }

    /// Reconciles locally installed curated plugins with the remote plugin status.
    ///
    /// Remote plugins reported as enabled are installed and enabled locally. Unless
    /// `additive_only` is set, local curated plugins that are not in that set are uninstalled
    /// and their config entry is cleared. Returns the default (empty) result when the plugins
    /// feature is disabled.
    pub async fn sync_plugins_from_remote(
        &self,
        config: &Config,
        auth: Option<&CodexAuth>,
        additive_only: bool,
    ) -> Result<RemotePluginSyncResult, PluginRemoteSyncError> {
        let _remote_sync_guard =
self.remote_sync_lock.lock().await;

        if !config.features.enabled(Feature::Plugins) {
            return Ok(RemotePluginSyncResult::default());
        }

        info!("starting remote plugin sync");
        let remote_plugins = codex_core_plugins::remote::fetch_remote_plugin_status(
            &remote_plugin_service_config(config),
            auth,
        )
        .await
        .map_err(PluginRemoteSyncError::from)?;
        let configured_plugins = configured_plugins_from_stack(&config.config_layer_stack);
        let curated_marketplace_root = curated_plugins_repo_path(self.codex_home.as_path());
        let curated_marketplace_path = AbsolutePathBuf::try_from(
            curated_marketplace_root.join(".agents/plugins/marketplace.json"),
        )
        .map_err(|_| PluginRemoteSyncError::LocalMarketplaceNotFound)?;
        let curated_marketplace = match load_marketplace(&curated_marketplace_path) {
            Ok(marketplace) => marketplace,
            Err(MarketplaceError::MarketplaceNotFound { .. }) => {
                return Err(PluginRemoteSyncError::LocalMarketplaceNotFound);
            }
            Err(err) => return Err(err.into()),
        };

        let marketplace_name = curated_marketplace.name.clone();
        let curated_plugin_version = read_curated_plugins_sha(self.codex_home.as_path())
            .ok_or_else(|| {
                PluginStoreError::Invalid(
                    "local curated marketplace sha is not available".to_string(),
                )
            })?;
        // Tuple layout: (name, id, local source path, configured `enabled` flag if any,
        // installed version if any, whether the product restriction admits the plugin).
        // The installed-version payload is left to inference; only its presence is used below.
        let mut local_plugins = Vec::<(
            String,
            PluginId,
            AbsolutePathBuf,
            Option<bool>,
            Option<_>,
            bool,
        )>::new();
        let mut local_plugin_names = HashSet::new();

        for plugin in curated_marketplace.plugins {
            let plugin_name = plugin.name;
            if !local_plugin_names.insert(plugin_name.clone()) {
                warn!(
                    plugin = plugin_name,
                    marketplace = %marketplace_name,
                    "ignoring duplicate local plugin entry during remote sync"
                );
                continue;
            }

            let plugin_id = PluginId::new(plugin_name.clone(), marketplace_name.clone())?;
            let plugin_key = plugin_id.as_key();
            let source_path = match plugin.source {
                MarketplacePluginSource::Local { path } => path,
                MarketplacePluginSource::Git { .. } => {
                    warn!(
                        plugin = plugin_name,
                        marketplace = %marketplace_name,
                        "skipping remote plugin source during remote sync"
                    );
                    continue;
                }
            };
            let current_enabled = configured_plugins
                .get(&plugin_key)
                .map(|plugin| plugin.enabled);
            let installed_version = self.store.active_plugin_version(&plugin_id);
            let product_allowed =
                self.restriction_product_matches(plugin.policy.products.as_deref());
            local_plugins.push((
                plugin_name,
                plugin_id,
                source_path,
                current_enabled,
                installed_version,
                product_allowed,
            ));
        }

        let mut missing_remote_plugins = Vec::<String>::new();
        let mut remote_installed_plugin_names = HashSet::<String>::new();
        for plugin in remote_plugins {
            if plugin.marketplace_name != marketplace_name {
                return Err(PluginRemoteSyncError::UnknownRemoteMarketplace {
                    marketplace_name: plugin.marketplace_name,
                });
            }
            if !local_plugin_names.contains(&plugin.name) {
                missing_remote_plugins.push(plugin.name);
                continue;
            }
            // For now, sync treats remote `enabled = false` as uninstall rather than a distinct
            // disabled state.
            // TODO: Switch sync to `plugins/installed` so install and enable states stay distinct.
            if !plugin.enabled {
                continue;
            }
            if !remote_installed_plugin_names.insert(plugin.name.clone()) {
                return Err(PluginRemoteSyncError::DuplicateRemotePlugin {
                    plugin_name: plugin.name,
                });
            }
        }

        let mut config_edits = Vec::new();
        let mut installs = Vec::new();
        let mut uninstalls = Vec::new();
        let mut result = RemotePluginSyncResult::default();
        let remote_plugin_count = remote_installed_plugin_names.len();
        let local_plugin_count = local_plugins.len();
        if !missing_remote_plugins.is_empty() {
            // Log at most ten names so the warning stays bounded.
            let sample_missing_plugins = missing_remote_plugins
                .iter()
                .take(10)
                .cloned()
                .collect::<Vec<_>>();
            warn!(
                marketplace = %marketplace_name,
                missing_remote_plugin_count = missing_remote_plugins.len(),
                missing_remote_plugin_examples = ?sample_missing_plugins,
                "ignoring remote plugins missing from local marketplace during sync"
            );
        }

        for (
            plugin_name,
            plugin_id,
            source_path,
            current_enabled,
            installed_version,
            product_allowed,
        ) in local_plugins
        {
            let plugin_key = plugin_id.as_key();
            let is_installed = installed_version.is_some();
            // Plugins rejected by the product restriction are neither installed nor removed.
            if !product_allowed {
                continue;
            }
            if remote_installed_plugin_names.contains(&plugin_name) {
                if !is_installed {
                    installs.push((
                        source_path,
                        plugin_id.clone(),
                        curated_plugin_version.clone(),
                    ));
                    result.installed_plugin_ids.push(plugin_key.clone());
                }

                if current_enabled != Some(true) {
                    result.enabled_plugin_ids.push(plugin_key.clone());
                    config_edits.push(ConfigEdit::SetPath {
                        segments: vec!["plugins".to_string(), plugin_key, "enabled".to_string()],
                        value: value(true),
                    });
                }
            } else if !additive_only {
                if is_installed {
                    uninstalls.push(plugin_id);
                }
                if is_installed || current_enabled.is_some() {
                    result.uninstalled_plugin_ids.push(plugin_key.clone());
                }
                if current_enabled.is_some() {
                    config_edits.push(ConfigEdit::ClearPath {
                        segments: vec!["plugins".to_string(), plugin_key],
                    });
                }
            }
        }

        let store = self.store.clone();
        let store_result = tokio::task::spawn_blocking(move || {
            for (source_path, plugin_id, plugin_version) in installs {
store.install_with_version(source_path, plugin_id, plugin_version)?;
            }
            for plugin_id in uninstalls {
                store.uninstall(&plugin_id)?;
            }
            Ok::<(), PluginStoreError>(())
        })
        .await
        .map_err(PluginRemoteSyncError::join)?;

        if let Err(err) = store_result {
            // The store may have been partially modified before the failure, so cached state
            // is dropped before reporting the error.
            self.clear_cache();
            return Err(err.into());
        }

        let config_result = if config_edits.is_empty() {
            Ok(())
        } else {
            ConfigEditsBuilder::new(&self.codex_home)
                .with_edits(config_edits)
                .apply()
                .await
        };
        // Clear the cache before surfacing a config error: the store changes above have
        // already been applied either way.
        self.clear_cache();
        config_result?;

        info!(
            marketplace = %marketplace_name,
            remote_plugin_count,
            local_plugin_count,
            installed_plugin_ids = ?result.installed_plugin_ids,
            enabled_plugin_ids = ?result.enabled_plugin_ids,
            disabled_plugin_ids = ?result.disabled_plugin_ids,
            uninstalled_plugin_ids = ?result.uninstalled_plugin_ids,
            "completed remote plugin sync"
        );

        Ok(result)
    }

    /// Lists the marketplaces visible for `config` (plus `additional_roots`) with each plugin
    /// annotated by its installed/enabled state.
    ///
    /// Plugins rejected by the product restriction and repeated `<plugin>@<marketplace>` keys
    /// are dropped, and marketplaces left without plugins are omitted. Returns the default
    /// (empty) outcome when the plugins feature is disabled.
    // NOTE(review): the `Result` type arguments are reconstructed (they had been lost); the
    // error type must be what `list_marketplaces` returns — confirm it is `MarketplaceError`.
    pub fn list_marketplaces_for_config(
        &self,
        config: &Config,
        additional_roots: &[AbsolutePathBuf],
    ) -> Result<ConfiguredMarketplaceListOutcome, MarketplaceError> {
        if !config.features.enabled(Feature::Plugins) {
            return Ok(ConfiguredMarketplaceListOutcome::default());
        }

        let (installed_plugins, enabled_plugins) = self.configured_plugin_states(config);
        let marketplace_outcome =
            list_marketplaces(&self.marketplace_roots(config, additional_roots))?;
        let mut seen_plugin_keys = HashSet::new();
        let marketplaces = marketplace_outcome
            .marketplaces
            .into_iter()
            .filter_map(|marketplace| {
                let marketplace_name = marketplace.name.clone();
                let plugins = marketplace
                    .plugins
                    .into_iter()
                    .filter_map(|plugin| {
                        let plugin_key = format!("{}@{marketplace_name}", plugin.name);
                        if !seen_plugin_keys.insert(plugin_key.clone()) {
                            return None;
                        }
                        if !self.restriction_product_matches(plugin.policy.products.as_deref()) {
                            return None;
                        }

                        Some(ConfiguredMarketplacePlugin {
                            // Enabled state is keyed by `<plugin>@<marketplace>`, so duplicate
                            // plugin entries from duplicate marketplace files intentionally
                            // resolve to the first discovered source.
id: plugin_key.clone(),
                            installed: installed_plugins.contains(&plugin_key),
                            enabled: enabled_plugins.contains(&plugin_key),
                            name: plugin.name,
                            source: plugin.source,
                            policy: plugin.policy,
                            interface: plugin.interface,
                        })
                    })
                    .collect::<Vec<_>>();

                (!plugins.is_empty()).then_some(ConfiguredMarketplace {
                    name: marketplace.name,
                    path: marketplace.path,
                    // The curated marketplace always gets the fixed display name.
                    interface: if marketplace_name == OPENAI_CURATED_MARKETPLACE_NAME {
                        Some(MarketplaceInterface {
                            display_name: Some(OPENAI_CURATED_MARKETPLACE_DISPLAY_NAME.to_string()),
                        })
                    } else {
                        marketplace.interface
                    },
                    plugins,
                })
            })
            .collect();

        Ok(ConfiguredMarketplaceListOutcome {
            marketplaces,
            errors: marketplace_outcome.errors,
        })
    }

    /// Reads the details of the plugin named in `request`.
    ///
    /// # Errors
    ///
    /// Returns `MarketplaceError::PluginsDisabled` when the plugins feature is off and
    /// `MarketplaceError::PluginNotFound` when the product restriction rejects the plugin;
    /// lookup and detail-loading errors are propagated.
    pub async fn read_plugin_for_config(
        &self,
        config: &Config,
        request: &PluginReadRequest,
    ) -> Result<PluginReadOutcome, MarketplaceError> {
        if !config.features.enabled(Feature::Plugins) {
            return Err(MarketplaceError::PluginsDisabled);
        }

        let plugin = find_marketplace_plugin(&request.marketplace_path, &request.plugin_name)?;
        if !self.restriction_product_matches(plugin.policy.products.as_deref()) {
            return Err(MarketplaceError::PluginNotFound {
                plugin_name: plugin.plugin_id.plugin_name,
                marketplace_name: plugin.plugin_id.marketplace_name,
            });
        }

        let marketplace_name = plugin.plugin_id.marketplace_name.clone();
        let plugin_key = plugin.plugin_id.as_key();
        let (installed_plugins, enabled_plugins) = self.configured_plugin_states(config);
        let plugin = self
            .read_plugin_detail_for_marketplace_plugin(
                config,
                &marketplace_name,
                ConfiguredMarketplacePlugin {
                    id: plugin_key.clone(),
                    name: plugin.plugin_id.plugin_name,
                    source: plugin.source,
                    policy: plugin.policy,
                    interface: plugin.interface,
                    installed: installed_plugins.contains(&plugin_key),
                    enabled: enabled_plugins.contains(&plugin_key),
                },
            )
            .await?;

        Ok(PluginReadOutcome {
            // The curated marketplace is reported under its display name.
            marketplace_name: if marketplace_name == OPENAI_CURATED_MARKETPLACE_NAME {
                OPENAI_CURATED_MARKETPLACE_DISPLAY_NAME.to_string()
            } else {
                marketplace_name
            },
            marketplace_path: request.marketplace_path.clone(),
            plugin,
        })
    }

    /// Builds the [`PluginDetail`] for one marketplace plugin.
    ///
    /// A `Git`-sourced plugin that is not installed yields a detail with empty skills, apps
    /// and MCP servers and `InstallRequiredForRemoteSource` as the reason. Otherwise the
    /// manifest, skills, apps and MCP server names are loaded from the installed plugin root
    /// (installed `Git` source) or from the materialized source.
    pub(crate)
async fn read_plugin_detail_for_marketplace_plugin( &self, config: &Config, marketplace_name: &str, plugin: ConfiguredMarketplacePlugin, ) -> Result { if !self.restriction_product_matches(plugin.policy.products.as_deref()) { return Err(MarketplaceError::PluginNotFound { plugin_name: plugin.name, marketplace_name: marketplace_name.to_string(), }); } let plugin_id = PluginId::new(plugin.name.clone(), marketplace_name.to_string()).map_err(|err| { match err { PluginIdError::Invalid(message) => MarketplaceError::InvalidPlugin(message), } })?; let plugin_key = plugin_id.as_key(); if matches!(plugin.source, MarketplacePluginSource::Git { .. }) && !plugin.installed { return Ok(PluginDetail { id: plugin_key, name: plugin.name, description: None, source: plugin.source, policy: plugin.policy, interface: plugin.interface, installed: plugin.installed, enabled: plugin.enabled, skills: Vec::new(), disabled_skill_paths: HashSet::new(), apps: Vec::new(), mcp_server_names: Vec::new(), details_unavailable_reason: Some( PluginDetailsUnavailableReason::InstallRequiredForRemoteSource, ), }); } let source_path = if matches!(plugin.source, MarketplacePluginSource::Git { .. }) && plugin.installed { self.store.active_plugin_root(&plugin_id).ok_or_else(|| { MarketplaceError::InvalidPlugin(format!( "installed plugin cache entry is missing for {plugin_key}" )) })? } else { let codex_home = self.codex_home.clone(); let source = plugin.source.clone(); let materialized = tokio::task::spawn_blocking(move || { materialize_marketplace_plugin_source(codex_home.as_path(), &source) }) .await .map_err(|err| { MarketplaceError::InvalidPlugin(format!( "failed to materialize plugin source: {err}" )) })? 
.map_err(MarketplaceError::InvalidPlugin)?; materialized.path.clone() }; if !source_path.as_path().is_dir() { return Err(MarketplaceError::InvalidPlugin( "path does not exist or is not a directory".to_string(), )); } let manifest = load_plugin_manifest(source_path.as_path()).ok_or_else(|| { MarketplaceError::InvalidPlugin("missing or invalid plugin.json".to_string()) })?; let description = manifest.description.clone(); let marketplace_category = plugin .interface .as_ref() .and_then(|interface| interface.category.clone()); let interface = plugin_interface_with_marketplace_category( manifest.interface.clone(), marketplace_category, ); let resolved_skills = load_plugin_skills( &source_path, &manifest.paths, self.restriction_product, &codex_core_skills::config_rules::skill_config_rules_from_stack( &config.config_layer_stack, ), ) .await; let apps = load_plugin_apps(source_path.as_path()).await; let mut mcp_server_names = load_plugin_mcp_servers(source_path.as_path()) .await .into_keys() .collect::>(); mcp_server_names.sort_unstable(); mcp_server_names.dedup(); Ok(PluginDetail { id: plugin.id, name: plugin.name, description, source: plugin.source, policy: plugin.policy, interface, installed: plugin.installed, enabled: plugin.enabled, skills: resolved_skills.skills, disabled_skill_paths: resolved_skills.disabled_skill_paths, apps, mcp_server_names, details_unavailable_reason: None, }) } pub fn maybe_start_plugin_startup_tasks_for_config( self: &Arc, config: &Config, auth_manager: Arc, ) { if config.features.enabled(Feature::Plugins) { self.start_curated_repo_sync(); let should_spawn_marketplace_auto_upgrade = { let mut state = match self.configured_marketplace_upgrade_state.write() { Ok(state) => state, Err(err) => err.into_inner(), }; if state.in_flight { false } else { state.in_flight = true; true } }; if should_spawn_marketplace_auto_upgrade { let manager = Arc::clone(self); let config = config.clone(); if let Err(err) = std::thread::Builder::new() 
.name("plugins-marketplace-auto-upgrade".to_string()) .spawn(move || { let outcome = manager.upgrade_configured_marketplaces_for_config( &config, /*marketplace_name*/ None, ); match outcome { Ok(outcome) => { for error in outcome.errors { warn!( marketplace = error.marketplace_name, error = %error.message, "failed to auto-upgrade configured marketplace" ); } } Err(err) => { warn!("failed to auto-upgrade configured marketplaces: {err}"); } } let mut state = match manager.configured_marketplace_upgrade_state.write() { Ok(state) => state, Err(err) => err.into_inner(), }; state.in_flight = false; }) { let mut state = match self.configured_marketplace_upgrade_state.write() { Ok(state) => state, Err(err) => err.into_inner(), }; state.in_flight = false; warn!("failed to start configured marketplace auto-upgrade task: {err}"); } } start_startup_remote_plugin_sync_once( Arc::clone(self), self.codex_home.clone(), config.clone(), auth_manager.clone(), ); let config = config.clone(); let manager = Arc::clone(self); tokio::spawn(async move { let auth = auth_manager.auth().await; if let Err(err) = manager .featured_plugin_ids_for_config(&config, auth.as_ref()) .await { warn!( error = %err, "failed to warm featured plugin ids cache" ); } }); } } pub fn upgrade_configured_marketplaces_for_config( &self, config: &Config, marketplace_name: Option<&str>, ) -> Result { if let Some(marketplace_name) = marketplace_name && !configured_git_marketplace_names(&config.config_layer_stack) .iter() .any(|name| name == marketplace_name) { return Err(format!( "marketplace `{marketplace_name}` is not configured as a Git marketplace" )); } let mut outcome = upgrade_configured_git_marketplaces( self.codex_home.as_path(), &config.config_layer_stack, marketplace_name, ); if !outcome.upgraded_roots.is_empty() { match refresh_non_curated_plugin_cache_force_reinstall( self.codex_home.as_path(), &outcome.upgraded_roots, ) { Ok(cache_refreshed) => { if cache_refreshed { self.clear_cache(); } } Err(err) => 
{ self.clear_cache(); outcome.errors.push(ConfiguredMarketplaceUpgradeError { marketplace_name: marketplace_name .unwrap_or("all configured marketplaces") .to_string(), message: format!( "failed to refresh installed plugin cache after marketplace upgrade: {err}" ), }); } } } Ok(outcome) } pub fn maybe_start_non_curated_plugin_cache_refresh( self: &Arc, roots: &[AbsolutePathBuf], ) { self.schedule_non_curated_plugin_cache_refresh( roots, NonCuratedCacheRefreshMode::IfVersionChanged, ); } fn schedule_non_curated_plugin_cache_refresh( self: &Arc, roots: &[AbsolutePathBuf], mode: NonCuratedCacheRefreshMode, ) { let mut roots = roots.to_vec(); roots.sort_unstable(); roots.dedup(); if roots.is_empty() { return; } let request = NonCuratedCacheRefreshRequest { roots, mode }; let should_spawn = { let mut state = match self.non_curated_cache_refresh_state.write() { Ok(state) => state, Err(err) => err.into_inner(), }; // Collapse repeated plugin/list requests onto one worker and only queue another pass // when the requested roots set actually changes. Forced reinstall requests are not // deduped against the last completed pass because the same marketplace root path can // point at newly activated files after an auto-upgrade. 
if state.requested.as_ref() == Some(&request) || (mode == NonCuratedCacheRefreshMode::IfVersionChanged && !state.in_flight && state.last_refreshed.as_ref() == Some(&request)) { return; } if mode == NonCuratedCacheRefreshMode::IfVersionChanged && state.requested.as_ref().is_some_and(|requested| { requested.mode == NonCuratedCacheRefreshMode::ForceReinstall && requested.roots == request.roots }) { return; } state.requested = Some(request); if state.in_flight { false } else { state.in_flight = true; true } }; if !should_spawn { return; } let manager = Arc::clone(self); if let Err(err) = std::thread::Builder::new() .name("plugins-non-curated-cache-refresh".to_string()) .spawn(move || manager.run_non_curated_plugin_cache_refresh_loop()) { let mut state = match self.non_curated_cache_refresh_state.write() { Ok(state) => state, Err(err) => err.into_inner(), }; state.in_flight = false; state.requested = None; warn!("failed to start non-curated plugin cache refresh task: {err}"); } } fn start_curated_repo_sync(self: &Arc) { if CURATED_REPO_SYNC_STARTED.swap(true, Ordering::SeqCst) { return; } let manager = Arc::clone(self); let codex_home = self.codex_home.clone(); if let Err(err) = std::thread::Builder::new() .name("plugins-curated-repo-sync".to_string()) .spawn( move || match sync_openai_plugins_repo(codex_home.as_path()) { Ok(curated_plugin_version) => { let configured_curated_plugin_ids = configured_curated_plugin_ids_from_codex_home(codex_home.as_path()); match refresh_curated_plugin_cache( codex_home.as_path(), &curated_plugin_version, &configured_curated_plugin_ids, ) { Ok(cache_refreshed) => { if cache_refreshed { manager.clear_cache(); } } Err(err) => { manager.clear_cache(); CURATED_REPO_SYNC_STARTED.store(false, Ordering::SeqCst); warn!("failed to refresh curated plugin cache after sync: {err}"); } } } Err(err) => { CURATED_REPO_SYNC_STARTED.store(false, Ordering::SeqCst); warn!("failed to sync curated plugins repo: {err}"); } }, ) { 
CURATED_REPO_SYNC_STARTED.store(false, Ordering::SeqCst); warn!("failed to start curated plugins repo sync task: {err}"); } } fn run_non_curated_plugin_cache_refresh_loop(self: Arc) { loop { let request = { let state = match self.non_curated_cache_refresh_state.read() { Ok(state) => state, Err(err) => err.into_inner(), }; state.requested.clone() }; let Some(request) = request else { let mut state = match self.non_curated_cache_refresh_state.write() { Ok(state) => state, Err(err) => err.into_inner(), }; state.in_flight = false; return; }; let refresh_result = match request.mode { NonCuratedCacheRefreshMode::IfVersionChanged => { refresh_non_curated_plugin_cache(self.codex_home.as_path(), &request.roots) } NonCuratedCacheRefreshMode::ForceReinstall => { refresh_non_curated_plugin_cache_force_reinstall( self.codex_home.as_path(), &request.roots, ) } }; let refreshed = match refresh_result { Ok(cache_refreshed) => { if cache_refreshed { self.clear_cache(); } true } Err(err) => { self.clear_cache(); warn!("failed to refresh non-curated plugin cache: {err}"); false } }; let mut state = match self.non_curated_cache_refresh_state.write() { Ok(state) => state, Err(err) => err.into_inner(), }; if refreshed { state.last_refreshed = Some(request.clone()); } if state.requested.as_ref() == Some(&request) { state.requested = None; state.in_flight = false; return; } } } fn configured_plugin_states(&self, config: &Config) -> (HashSet, HashSet) { let configured_plugins = configured_plugins_from_stack(&config.config_layer_stack); let installed_plugins = configured_plugins .keys() .filter(|plugin_key| { PluginId::parse(plugin_key) .ok() .is_some_and(|plugin_id| self.store.is_installed(&plugin_id)) }) .cloned() .collect::>(); let enabled_plugins = configured_plugins .into_iter() .filter_map(|(plugin_key, plugin)| plugin.enabled.then_some(plugin_key)) .collect::>(); (installed_plugins, enabled_plugins) } fn marketplace_roots( &self, config: &Config, additional_roots: &[AbsolutePathBuf], 
) -> Vec { // Treat the curated catalog as an extra marketplace root so plugin listing can surface it // without requiring every caller to know where it is stored. let mut roots = additional_roots.to_vec(); roots.extend(installed_marketplace_roots_from_config( config, self.codex_home.as_path(), )); let curated_repo_root = curated_plugins_repo_path(self.codex_home.as_path()); if curated_repo_root.is_dir() && let Ok(curated_repo_root) = AbsolutePathBuf::try_from(curated_repo_root) { roots.push(curated_repo_root); } roots.sort_unstable(); roots.dedup(); roots } } #[derive(Debug, thiserror::Error)] pub enum PluginInstallError { #[error("{0}")] Marketplace(#[from] MarketplaceError), #[error("{0}")] Remote(#[from] RemotePluginMutationError), #[error("{0}")] Store(#[from] PluginStoreError), #[error("{0}")] Config(#[from] ConfigServiceError), #[error("failed to join plugin install task: {0}")] Join(#[from] tokio::task::JoinError), } impl PluginInstallError { fn join(source: tokio::task::JoinError) -> Self { Self::Join(source) } pub fn is_invalid_request(&self) -> bool { matches!( self, Self::Marketplace( MarketplaceError::MarketplaceNotFound { .. } | MarketplaceError::InvalidMarketplaceFile { .. } | MarketplaceError::PluginNotFound { .. } | MarketplaceError::PluginNotAvailable { .. 
} | MarketplaceError::InvalidPlugin(_) ) | Self::Store(PluginStoreError::Invalid(_)) ) } } #[derive(Debug, thiserror::Error)] pub enum PluginUninstallError { #[error("{0}")] InvalidPluginId(#[from] PluginIdError), #[error("{0}")] Remote(#[from] RemotePluginMutationError), #[error("{0}")] Store(#[from] PluginStoreError), #[error("{0}")] Config(#[from] anyhow::Error), #[error("failed to join plugin uninstall task: {0}")] Join(#[from] tokio::task::JoinError), } impl PluginUninstallError { fn join(source: tokio::task::JoinError) -> Self { Self::Join(source) } pub fn is_invalid_request(&self) -> bool { matches!(self, Self::InvalidPluginId(_)) } } pub(crate) fn configured_plugins_from_stack( config_layer_stack: &ConfigLayerStack, ) -> HashMap { // Plugin entries remain persisted user config only. let Some(user_layer) = config_layer_stack.get_user_layer() else { return HashMap::new(); }; configured_plugins_from_user_config_value(&user_layer.config) } fn configured_plugins_from_user_config_value( user_config: &toml::Value, ) -> HashMap { let Some(plugins_value) = user_config.get("plugins") else { return HashMap::new(); }; match plugins_value.clone().try_into() { Ok(plugins) => plugins, Err(err) => { warn!("invalid plugins config: {err}"); HashMap::new() } } } #[cfg(test)] #[path = "manager_tests.rs"] mod tests;