diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index e1a97216bd..82f92febee 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2504,7 +2504,6 @@ version = "0.0.0" dependencies = [ "anyhow", "chrono", - "codex-analytics", "codex-app-server-protocol", "codex-config", "codex-core-skills", diff --git a/codex-rs/app-server/src/codex_message_processor/plugins.rs b/codex-rs/app-server/src/codex_message_processor/plugins.rs index edc0119f1b..e64267b279 100644 --- a/codex-rs/app-server/src/codex_message_processor/plugins.rs +++ b/codex-rs/app-server/src/codex_message_processor/plugins.rs @@ -3,6 +3,9 @@ use crate::error_code::internal_error; use crate::error_code::invalid_request; use codex_app_server_protocol::PluginInstallPolicy; use codex_core::config::edit::ConfigEditsBuilder; +use codex_core_plugins::loader::installed_plugin_telemetry_metadata; +use codex_core_plugins::loader::plugin_telemetry_metadata_from_root; +use codex_core_plugins::store::PluginStore; impl CodexMessageProcessor { pub(super) async fn plugin_list( @@ -488,6 +491,9 @@ impl CodexMessageProcessor { .map_err(|err| { internal_error(format!("failed to persist installed plugin config: {err}")) })?; + self.analytics_events_client.track_plugin_installed( + plugin_telemetry_metadata_from_root(&result.plugin_id, &result.installed_path).await, + ); let config = match self.load_latest_config(config_cwd).await { Ok(config) => config, Err(err) => { @@ -710,6 +716,19 @@ impl CodexMessageProcessor { if !plugin_id.is_empty() && is_valid_remote_plugin_id(&plugin_id) { return self.remote_plugin_uninstall_response(plugin_id).await; } + let parsed_plugin_id = codex_core::plugins::PluginId::parse(&plugin_id) + .map_err(|err| invalid_request(err.to_string()))?; + let plugin_telemetry = + match PluginStore::try_new(self.config.codex_home.as_path().to_path_buf()) { + Ok(store) if store.active_plugin_root(&parsed_plugin_id).is_some() => Some( + installed_plugin_telemetry_metadata( + self.config.codex_home.as_path(), + &parsed_plugin_id, + ) + .await, + ), + Ok(_) | Err(_) => None, + }; let plugins_manager = self.thread_manager.plugins_manager(); plugins_manager @@ -721,6 +740,10 @@ impl CodexMessageProcessor { .apply() .await .map_err(|err| internal_error(format!("failed to clear plugin config: {err}")))?; + if let Some(plugin_telemetry) = plugin_telemetry { + self.analytics_events_client + .track_plugin_uninstalled(plugin_telemetry); + } match self.load_latest_config(/*fallback_cwd*/ None).await { Ok(config) => self.on_effective_plugins_changed(config), Err(err) => { diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index c915e5a356..466a27179a 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -292,9 +292,6 @@ impl MessageProcessor { environment_manager, Some(analytics_events_client.clone()), )); - thread_manager - .plugins_manager() - .set_analytics_events_client(analytics_events_client.clone()); let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs { auth_manager: auth_manager.clone(), diff --git a/codex-rs/config/src/mcp_edit.rs b/codex-rs/config/src/mcp_edit.rs index f7241159a2..0b2c5bb7f8 100644 --- a/codex-rs/config/src/mcp_edit.rs +++ b/codex-rs/config/src/mcp_edit.rs @@ -12,6 +12,8 @@ use toml_edit::Item as TomlItem; use toml_edit::Table as TomlTable; use toml_edit::value; +use codex_utils_path::write_atomically; + use crate::AppToolApproval; use crate::CONFIG_TOML_FILE; use crate::McpServerConfig; @@ -117,8 +119,7 @@ impl ConfigEditsBuilder { for edit in &self.plugin_edits { apply_plugin_config_edit(&mut doc, edit); } - fs::create_dir_all(&self.codex_home)?; - fs::write(config_path, doc.to_string()) + write_atomically(&config_path, &doc.to_string()) } } diff --git a/codex-rs/core-plugins/Cargo.toml b/codex-rs/core-plugins/Cargo.toml index 7c8e8fb760..9555b40d42 100644 --- a/codex-rs/core-plugins/Cargo.toml +++ b/codex-rs/core-plugins/Cargo.toml @@ -14,7 +14,6 @@ workspace = true [dependencies] anyhow = { workspace = true } -codex-analytics = { workspace = true } codex-app-server-protocol = { workspace = true } codex-config = { workspace = true } codex-core-skills = { workspace = true } diff --git a/codex-rs/core-plugins/src/loader.rs b/codex-rs/core-plugins/src/loader.rs index d809dc61fb..bf28d10d50 100644 --- a/codex-rs/core-plugins/src/loader.rs +++ b/codex-rs/core-plugins/src/loader.rs @@ -170,13 +170,7 @@ pub fn remote_installed_plugins_to_config( return None; } }; - // Remote installed refresh materializes bundles before publishing state. Keep this - // version check so partial refresh failures do not make missing or stale local bundles - // effective. - let release_version = plugin.release_version.as_deref()?; - if store.active_plugin_version(&plugin_id).as_deref() != Some(release_version) { - return None; - } + store.active_plugin_root(&plugin_id)?; Some(( plugin_id.as_key(), PluginConfig { diff --git a/codex-rs/core-plugins/src/manager.rs b/codex-rs/core-plugins/src/manager.rs index 5e57896caa..fbeab960fa 100644 --- a/codex-rs/core-plugins/src/manager.rs +++ b/codex-rs/core-plugins/src/manager.rs @@ -3,14 +3,12 @@ use crate::PluginLoadOutcome; use crate::installed_marketplaces::installed_marketplace_roots_from_layer_stack; use crate::loader::configured_curated_plugin_ids_from_codex_home; use crate::loader::curated_plugin_cache_version; -use crate::loader::installed_plugin_telemetry_metadata; use crate::loader::load_plugin_apps; use crate::loader::load_plugin_mcp_servers; use crate::loader::load_plugin_skills; use crate::loader::load_plugins_from_layer_stack; use crate::loader::log_plugin_load_errors; use crate::loader::materialize_marketplace_plugin_source; -use crate::loader::plugin_telemetry_metadata_from_root; use crate::loader::refresh_curated_plugin_cache; use crate::loader::refresh_non_curated_plugin_cache; use crate::loader::refresh_non_curated_plugin_cache_force_reinstall; @@ -48,7 +46,7 @@ use crate::startup_sync::sync_openai_plugins_repo; use crate::store::PluginInstallResult as StorePluginInstallResult; use crate::store::PluginStore; use crate::store::PluginStoreError; -use codex_analytics::AnalyticsEventsClient; +use crate::store::validate_plugin_version_segment; use codex_config::ConfigEditsBuilder; use codex_config::ConfigLayerStack; use codex_config::types::PluginConfig; @@ -358,12 +356,12 @@ pub struct PluginsManager { remote_installed_plugins_cache_refresh_state: RwLock, remote_sync_lock: Semaphore, restriction_product: Option, - analytics_events_client: RwLock>, } #[derive(Clone)] struct CachedPluginLoadOutcome { config_version: String, + remote_plugins_enabled: bool, plugin_hooks_enabled: bool, outcome: PluginLoadOutcome, } @@ -399,18 +397,9 @@ impl PluginsManager { ), remote_sync_lock: Semaphore::new(/*permits*/ 1), restriction_product, - analytics_events_client: RwLock::new(None), } } - pub fn set_analytics_events_client(&self, analytics_events_client: AnalyticsEventsClient) { - let mut stored_client = match self.analytics_events_client.write() { - Ok(client_guard) => client_guard, - Err(err) => err.into_inner(), - }; - *stored_client = Some(analytics_events_client); - } - fn restriction_product_matches(&self, products: Option<&[Product]>) -> bool { match products { None => true, @@ -452,8 +441,11 @@ impl PluginsManager { let config_version = version_for_toml(&config_layer_stack.effective_config()); if !force_reload - && let Some(outcome) = - self.cached_enabled_outcome(&config_version, plugin_hooks_enabled) + && let Some(outcome) = self.cached_enabled_outcome( + &config_version, + remote_plugins_enabled, + plugin_hooks_enabled, + ) { return outcome; } @@ -473,6 +465,7 @@ impl PluginsManager { }; *cache = Some(CachedPluginLoadOutcome { config_version, + remote_plugins_enabled, plugin_hooks_enabled, outcome: outcome.clone(), }); @@ -538,6 +531,7 @@ impl PluginsManager { fn cached_enabled_outcome( &self, config_version: &str, + remote_plugins_enabled: bool, plugin_hooks_enabled: bool, ) -> Option { match self.cached_enabled_outcome.read() { @@ -545,6 +539,7 @@ impl PluginsManager { .as_ref() .filter(|cached| { cached.config_version == config_version + && cached.remote_plugins_enabled == remote_plugins_enabled && cached.plugin_hooks_enabled == plugin_hooks_enabled }) .map(|cached| cached.outcome.clone()), @@ -553,6 +548,7 @@ impl PluginsManager { .as_ref() .filter(|cached| { cached.config_version == config_version + && cached.remote_plugins_enabled == remote_plugins_enabled && cached.plugin_hooks_enabled == plugin_hooks_enabled }) .map(|cached| cached.outcome.clone()), @@ -828,17 +824,6 @@ impl PluginsManager { .await .map_err(PluginInstallError::join)??; - let analytics_events_client = match self.analytics_events_client.read() { - Ok(client) => client.clone(), - Err(err) => err.into_inner().clone(), - }; - if let Some(analytics_events_client) = analytics_events_client { - analytics_events_client.track_plugin_installed( - plugin_telemetry_metadata_from_root(&result.plugin_id, &result.installed_path) - .await, - ); - } - Ok(PluginInstallOutcome { plugin_id: result.plugin_id, plugin_version: result.plugin_version, @@ -874,27 +859,12 @@ impl PluginsManager { } async fn uninstall_plugin_id(&self, plugin_id: PluginId) -> Result<(), PluginUninstallError> { - let plugin_telemetry = if self.store.active_plugin_root(&plugin_id).is_some() { - Some(installed_plugin_telemetry_metadata(self.codex_home.as_path(), &plugin_id).await) - } else { - None - }; let store = self.store.clone(); let plugin_id_for_store = plugin_id.clone(); tokio::task::spawn_blocking(move || store.uninstall(&plugin_id_for_store)) .await .map_err(PluginUninstallError::join)??; - let analytics_events_client = match self.analytics_events_client.read() { - Ok(client) => client.clone(), - Err(err) => err.into_inner().clone(), - }; - if let Some(plugin_telemetry) = plugin_telemetry - && let Some(analytics_events_client) = analytics_events_client - { - analytics_events_client.track_plugin_uninstalled(plugin_telemetry); - } - Ok(()) } @@ -1708,18 +1678,112 @@ impl PluginsManager { ) -> Result { let installed_plugins = crate::remote::fetch_remote_installed_plugins(service_config, auth).await?; + let previous_plugins_by_key = { + let cache = match self.remote_installed_plugins_cache.read() { + Ok(cache) => cache, + Err(err) => err.into_inner(), + }; + cache + .as_ref() + .map(|plugins| { + plugins + .iter() + .map(|plugin| { + ( + ( + plugin.marketplace_name.clone(), + plugin.id.clone(), + plugin.name.clone(), + ), + plugin.clone(), + ) + }) + .collect::>() + }) + .unwrap_or_default() + }; let mut bundles_changed = false; let mut publishable_plugins = Vec::new(); for plugin in installed_plugins { + let previous_plugin = previous_plugins_by_key + .get(&( + plugin.marketplace_name.clone(), + plugin.id.clone(), + plugin.name.clone(), + )) + .cloned(); + let plugin_id = + match PluginId::new(plugin.name.clone(), plugin.marketplace_name.clone()) { + Ok(plugin_id) => plugin_id, + Err(err) => { + warn!( + remote_plugin_id = %plugin.id, + marketplace = %plugin.marketplace_name, + plugin = %plugin.name, + error = %err, + "ignoring remote installed plugin with invalid local plugin id" + ); + continue; + } + }; + + let release_version = plugin + .release_version + .as_deref() + .map(str::trim) + .filter(|version| !version.is_empty()); + + let Some(release_version) = release_version else { + if self.store.active_plugin_root(&plugin_id).is_some() { + publishable_plugins.push(plugin); + } else if let Some(mut previous_plugin) = previous_plugin { + previous_plugin.enabled = plugin.enabled; + publishable_plugins.push(previous_plugin); + } else { + warn!( + remote_plugin_id = %plugin.id, + marketplace = %plugin.marketplace_name, + plugin = %plugin.name, + "remote installed plugin is missing release metadata and no local bundle is available" + ); + } + continue; + }; + + if let Err(message) = validate_plugin_version_segment(release_version) { + if let Some(mut previous_plugin) = previous_plugin { + previous_plugin.enabled = plugin.enabled; + publishable_plugins.push(previous_plugin); + } + warn!( + remote_plugin_id = %plugin.id, + marketplace = %plugin.marketplace_name, + plugin = %plugin.name, + version = %release_version, + error = %message, + "ignoring remote installed plugin with invalid release version" + ); + continue; + } + + if self.store.active_plugin_version(&plugin_id).as_deref() == Some(release_version) { + publishable_plugins.push(plugin); + continue; + } + let validated_bundle = match validate_remote_plugin_bundle( &plugin.id, &plugin.marketplace_name, &plugin.name, - plugin.release_version.as_deref(), + Some(release_version), plugin.bundle_download_url.as_deref(), ) { Ok(bundle) => bundle, Err(err) => { + if let Some(mut previous_plugin) = previous_plugin { + previous_plugin.enabled = plugin.enabled; + publishable_plugins.push(previous_plugin); + } warn!( remote_plugin_id = %plugin.id, marketplace = %plugin.marketplace_name, @@ -1730,14 +1794,6 @@ impl PluginsManager { continue; } }; - let plugin_id = validated_bundle.plugin_id.clone(); - let plugin_version = validated_bundle.plugin_version.clone(); - if self.store.active_plugin_version(&plugin_id).as_deref() - == Some(plugin_version.as_str()) - { - publishable_plugins.push(plugin); - continue; - } match download_and_install_remote_plugin_bundle( self.codex_home.clone(), validated_bundle, @@ -1755,6 +1811,10 @@ impl PluginsManager { publishable_plugins.push(plugin); } Err(err) => { + if let Some(mut previous_plugin) = previous_plugin { + previous_plugin.enabled = plugin.enabled; + publishable_plugins.push(previous_plugin); + } warn!( remote_plugin_id = %plugin.id, marketplace = %plugin.marketplace_name, diff --git a/codex-rs/core-plugins/src/manager_tests.rs b/codex-rs/core-plugins/src/manager_tests.rs index 117ae3034b..53de77f44b 100644 --- a/codex-rs/core-plugins/src/manager_tests.rs +++ b/codex-rs/core-plugins/src/manager_tests.rs @@ -595,6 +595,46 @@ remote_plugin = true assert_eq!(outcome.plugins()[0].config_name, "linear@chatgpt-global"); } +#[tokio::test] +async fn remote_installed_cache_reloads_when_remote_feature_enablement_changes() { + let codex_home = TempDir::new().unwrap(); + let plugin_base = codex_home + .path() + .join("plugins/cache/chatgpt-global/linear"); + write_plugin(&plugin_base, "local", "linear"); + write_file( + &codex_home.path().join(CONFIG_TOML_FILE), + r#"[features] +plugins = true +remote_plugin = true +"#, + ); + + let config = load_config(codex_home.path(), codex_home.path()).await; + let manager = PluginsManager::new(codex_home.path().to_path_buf()); + manager.write_remote_installed_plugins_cache(vec![RemoteInstalledPlugin { + marketplace_name: "chatgpt-global".to_string(), + id: "plugins~Plugin_linear".to_string(), + name: "linear".to_string(), + enabled: true, + release_version: Some("local".to_string()), + bundle_download_url: Some("https://example.com/linear.tar.gz".to_string()), + }]); + + let enabled_outcome = manager.plugins_for_test_config(&config).await; + assert_eq!(enabled_outcome.plugins().len(), 1); + + let disabled_outcome = PluginsManager::plugins_for_config( + &manager, + &config.config_layer_stack, + config.plugins_enabled, + /*remote_plugins_enabled*/ false, + config.plugin_hooks_enabled, + ) + .await; + assert_eq!(disabled_outcome, PluginLoadOutcome::default()); +} + #[tokio::test] async fn remote_installed_cache_ignores_plugins_missing_local_cache() { let codex_home = TempDir::new().unwrap(); @@ -622,8 +662,11 @@ remote_plugin = true } #[tokio::test] -async fn remote_installed_cache_ignores_plugins_with_stale_local_version() { +async fn remote_installed_cache_uses_available_local_cache_without_version_gate() { let codex_home = TempDir::new().unwrap(); + let linear_root = codex_home + .path() + .join("plugins/cache/chatgpt-global/linear/1.0.0"); write_plugin( &codex_home.path().join("plugins/cache/chatgpt-global"), "linear/1.0.0", @@ -649,7 +692,51 @@ remote_plugin = true }]); let outcome = manager.plugins_for_test_config(&config).await; - assert_eq!(outcome, PluginLoadOutcome::default()); + assert_eq!( + outcome.effective_skill_roots(), + vec![AbsolutePathBuf::try_from(linear_root.join("skills")).unwrap()] + ); + assert_eq!(outcome.plugins().len(), 1); + assert_eq!(outcome.plugins()[0].config_name, "linear@chatgpt-global"); +} + +#[tokio::test] +async fn remote_installed_cache_uses_available_local_cache_without_release_metadata() { + let codex_home = TempDir::new().unwrap(); + let linear_root = codex_home + .path() + .join("plugins/cache/chatgpt-global/linear/local"); + write_plugin( + &codex_home.path().join("plugins/cache/chatgpt-global"), + "linear/local", + "linear", + ); + write_file( + &codex_home.path().join(CONFIG_TOML_FILE), + r#"[features] +plugins = true +remote_plugin = true +"#, + ); + + let config = load_config(codex_home.path(), codex_home.path()).await; + let manager = PluginsManager::new(codex_home.path().to_path_buf()); + manager.write_remote_installed_plugins_cache(vec![RemoteInstalledPlugin { + marketplace_name: "chatgpt-global".to_string(), + id: "plugins~Plugin_linear".to_string(), + name: "linear".to_string(), + enabled: true, + release_version: None, + bundle_download_url: None, + }]); + + let outcome = manager.plugins_for_test_config(&config).await; + assert_eq!( + outcome.effective_skill_roots(), + vec![AbsolutePathBuf::try_from(linear_root.join("skills")).unwrap()] + ); + assert_eq!(outcome.plugins().len(), 1); + assert_eq!(outcome.plugins()[0].config_name, "linear@chatgpt-global"); } async fn mount_remote_installed_plugin_pages( @@ -808,6 +895,62 @@ remote_plugin = true assert_eq!(outcome, PluginLoadOutcome::default()); } +#[tokio::test] +async fn remote_installed_plugins_cache_refresh_preserves_previous_bundle_when_upgrade_unavailable() +{ + let tmp = tempfile::tempdir().unwrap(); + let linear_root = tmp.path().join("plugins/cache/chatgpt-global/linear/1.0.0"); + write_plugin( + &tmp.path().join("plugins/cache/chatgpt-global"), + "linear/1.0.0", + "linear", + ); + write_file( + &tmp.path().join(CONFIG_TOML_FILE), + r#"[features] +plugins = true +remote_plugin = true +"#, + ); + + let server = MockServer::start().await; + mount_remote_installed_plugin_pages( + &server, + &remote_installed_plugin_json_with_release( + "linear", /*enabled*/ true, "2.0.0", /*bundle_download_url*/ None, + ), + "", + ) + .await; + + let config = load_config(tmp.path(), tmp.path()).await; + let manager = PluginsManager::new(tmp.path().to_path_buf()); + manager.write_remote_installed_plugins_cache(vec![RemoteInstalledPlugin { + marketplace_name: "chatgpt-global".to_string(), + id: "plugins~Plugin_linear".to_string(), + name: "linear".to_string(), + enabled: true, + release_version: Some("1.0.0".to_string()), + bundle_download_url: Some("https://example.com/linear-1.0.0.tar.gz".to_string()), + }]); + let changed = manager + .refresh_remote_installed_plugins_cache( + &remote_plugin_service_config(&format!("{}/backend-api/", server.uri())), + Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()), + ) + .await + .unwrap(); + + assert!(!changed); + let outcome = manager.plugins_for_test_config(&config).await; + assert_eq!( + outcome.effective_skill_roots(), + vec![AbsolutePathBuf::try_from(linear_root.join("skills")).unwrap()] + ); + assert_eq!(outcome.plugins().len(), 1); + assert_eq!(outcome.plugins()[0].config_name, "linear@chatgpt-global"); +} + #[tokio::test] async fn remote_installed_plugins_cache_refresh_reconciles_cached_bundles_without_config_writes() { let tmp = tempfile::tempdir().unwrap();