diff --git a/codex-rs/app-server/src/request_processors/plugins.rs b/codex-rs/app-server/src/request_processors/plugins.rs index a4c424d5af..d7394c5a50 100644 --- a/codex-rs/app-server/src/request_processors/plugins.rs +++ b/codex-rs/app-server/src/request_processors/plugins.rs @@ -696,6 +696,12 @@ impl PluginRequestProcessor { let plugins_input = config.plugins_config_input(); let remote_installed_plugin_scopes = remote_installed_plugin_scopes(&config); + plugins_manager.maybe_start_remote_installed_plugin_bundle_sync( + &plugins_input, + auth.clone(), + remote_installed_plugin_scopes.clone(), + Some(self.effective_plugins_changed_callback()), + ); let (mut data, marketplace_load_errors) = self .load_local_installed_and_suggested_plugins( diff --git a/codex-rs/app-server/tests/suite/v2/plugin_list.rs b/codex-rs/app-server/tests/suite/v2/plugin_list.rs index 9a34fbc007..51d5ae896a 100644 --- a/codex-rs/app-server/tests/suite/v2/plugin_list.rs +++ b/codex-rs/app-server/tests/suite/v2/plugin_list.rs @@ -1951,6 +1951,82 @@ plugin_sharing = false Ok(()) } +#[tokio::test] +async fn plugin_installed_starts_remote_installed_bundle_sync() -> Result<()> { + let codex_home = TempDir::new()?; + let server = MockServer::start().await; + std::fs::write( + codex_home.path().join("config.toml"), + format!( + r#"chatgpt_base_url = "{}/backend-api/" + +[features] +plugins = true +remote_plugin = true +plugin_sharing = false +"#, + server.uri() + ), + )?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .chatgpt_user_id("user-123") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + let bundle_url = mount_remote_plugin_bundle( + &server, + "linear", + remote_plugin_bundle_tar_gz_bytes("linear")?, + ) + .await; + let global_installed_body = + remote_installed_plugin_body(&bundle_url, "1.2.3", /*enabled*/ true); + mount_remote_installed_plugins(&server, "GLOBAL", &global_installed_body).await; + + let mut mcp = McpProcess::new_with_env( + codex_home.path(), + &[(TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS, Some("1"))], + ) + .await?; + timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + + let plugin_installed_request_id = mcp + .send_plugin_installed_request(PluginInstalledParams { + cwds: None, + install_suggestion_plugin_names: None, + }) + .await?; + let response: PluginInstalledResponse = to_response( + timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(plugin_installed_request_id)), + ) + .await??, + )?; + + assert_eq!(response.marketplaces.len(), 1); + assert_eq!(response.marketplaces[0].name, "chatgpt-global"); + assert_eq!( + response.marketplaces[0] + .plugins + .iter() + .map(|plugin| (plugin.id.clone(), plugin.installed, plugin.enabled)) + .collect::>(), + vec![("linear@chatgpt-global".to_string(), true, true)] + ); + let installed_path = codex_home + .path() + .join("plugins/cache/chatgpt-global/linear/1.2.3/.codex-plugin/plugin.json"); + wait_for_path_exists(&installed_path).await?; + wait_for_remote_installed_scope_request(&server, "GLOBAL").await?; + assert_no_remote_installed_scope_request(&server, "WORKSPACE").await?; + Ok(()) +} + #[tokio::test] async fn plugin_list_fetches_workspace_directory_kind_without_remote_plugin_flag() -> Result<()> { let codex_home = TempDir::new()?; diff --git a/codex-rs/core-plugins/src/manager.rs b/codex-rs/core-plugins/src/manager.rs index b3b262349f..c638bc9416 100644 --- a/codex-rs/core-plugins/src/manager.rs +++ b/codex-rs/core-plugins/src/manager.rs @@ -429,6 +429,7 @@ pub struct PluginsManager { struct CachedPluginLoadOutcome { config_version: String, plugin_hooks_enabled: bool, + remote_installed_plugin_scopes: Vec, outcome: PluginLoadOutcome, } @@ -500,17 +501,21 @@ impl PluginsManager { } let plugin_hooks_enabled = config.plugin_hooks_enabled; + let remote_installed_plugin_scopes = config.remote_installed_plugin_scopes(); let config_version = version_for_toml(&config.config_layer_stack.effective_config()); if !force_reload - && let Some(outcome) = - self.cached_enabled_outcome(&config_version, plugin_hooks_enabled) + && let Some(outcome) = self.cached_enabled_outcome( + &config_version, + plugin_hooks_enabled, + &remote_installed_plugin_scopes, + ) { return outcome; } let outcome = load_plugins_from_layer_stack( &config.config_layer_stack, - self.remote_installed_plugin_configs(), + self.remote_installed_plugin_configs(&remote_installed_plugin_scopes), &self.store, self.restriction_product, plugin_hooks_enabled, @@ -524,6 +529,7 @@ impl PluginsManager { *cache = Some(CachedPluginLoadOutcome { config_version, plugin_hooks_enabled, + remote_installed_plugin_scopes, outcome: outcome.clone(), }); outcome @@ -558,7 +564,7 @@ impl PluginsManager { } load_plugins_from_layer_stack( config_layer_stack, - self.remote_installed_plugin_configs(), + self.remote_installed_plugin_configs(&config.remote_installed_plugin_scopes()), &self.store, self.restriction_product, plugin_hooks_feature_enabled, @@ -581,6 +587,7 @@ impl PluginsManager { &self, config_version: &str, plugin_hooks_enabled: bool, + remote_installed_plugin_scopes: &[RemotePluginScope], ) -> Option { match self.cached_enabled_outcome.read() { Ok(cache) => cache @@ -588,6 +595,7 @@ impl PluginsManager { .filter(|cached| { cached.config_version == config_version && cached.plugin_hooks_enabled == plugin_hooks_enabled + && cached.remote_installed_plugin_scopes == remote_installed_plugin_scopes }) .map(|cached| cached.outcome.clone()), Err(err) => err @@ -596,12 +604,16 @@ impl PluginsManager { .filter(|cached| { cached.config_version == config_version && cached.plugin_hooks_enabled == plugin_hooks_enabled + && cached.remote_installed_plugin_scopes == remote_installed_plugin_scopes }) .map(|cached| cached.outcome.clone()), } } - fn remote_installed_plugin_configs(&self) -> HashMap { + fn remote_installed_plugin_configs( + &self, + scopes: &[RemotePluginScope], + ) -> HashMap { let cache = match self.remote_installed_plugins_cache.read() { Ok(cache) => cache, Err(err) => err.into_inner(), @@ -610,7 +622,16 @@ impl PluginsManager { return HashMap::new(); }; - remote_installed_plugins_to_config(&cache.plugins, &self.store) + let plugins = cache + .plugins + .iter() + .filter(|plugin| { + remote_installed_scope_allows_marketplace(scopes, &plugin.marketplace_name) + }) + .cloned() + .collect::>(); + + remote_installed_plugins_to_config(&plugins, &self.store) } pub fn build_remote_installed_plugin_marketplaces_from_cache( @@ -771,23 +792,29 @@ impl PluginsManager { ); } - fn maybe_start_remote_installed_plugin_bundle_sync( + pub fn maybe_start_remote_installed_plugin_bundle_sync( self: &Arc, config: &PluginsConfigInput, auth: Option, + scopes: Vec, on_effective_plugins_changed: Option>, ) { if !config.plugins_enabled { return; } + if scopes.is_empty() { + return; + } let manager = Arc::clone(self); let config_for_refresh = config.clone(); let auth_for_refresh = auth.clone(); + let scopes_for_refresh = scopes.clone(); let on_local_cache_changed = Arc::new(move || { - manager.maybe_start_remote_installed_plugins_cache_refresh_after_mutation( + manager.maybe_start_remote_installed_plugins_cache_refresh_after_mutation_for_scopes( &config_for_refresh, auth_for_refresh.clone(), + scopes_for_refresh.clone(), on_effective_plugins_changed.clone(), ); }); @@ -796,6 +823,7 @@ impl PluginsManager { self.codex_home.clone(), remote_plugin_service_config(config), auth, + scopes, Some(on_local_cache_changed), ); } @@ -816,6 +844,7 @@ impl PluginsManager { self.maybe_start_remote_installed_plugin_bundle_sync( config, auth, + config.remote_installed_plugin_scopes(), on_effective_plugins_changed, ); } @@ -1608,6 +1637,7 @@ impl PluginsManager { manager.maybe_start_remote_installed_plugin_bundle_sync( &config_for_remote_sync, auth, + config_for_remote_sync.remote_installed_plugin_scopes(), on_effective_plugins_changed, ); }); diff --git a/codex-rs/core-plugins/src/remote/remote_installed_plugin_sync.rs b/codex-rs/core-plugins/src/remote/remote_installed_plugin_sync.rs index b13ca7a630..5f6f45fa6d 100644 --- a/codex-rs/core-plugins/src/remote/remote_installed_plugin_sync.rs +++ b/codex-rs/core-plugins/src/remote/remote_installed_plugin_sync.rs @@ -65,6 +65,7 @@ pub enum RemoteInstalledPluginBundleSyncError { #[derive(Debug, Clone, PartialEq, Eq, Hash)] struct RemoteInstalledPluginBundleSyncKey { plugin_cache_root: PathBuf, + scopes: Vec, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -78,25 +79,35 @@ pub struct RemotePluginCacheMutationGuard { key: RemotePluginCacheMutationKey, } -pub fn maybe_start_remote_installed_plugin_bundle_sync( +pub(crate) fn maybe_start_remote_installed_plugin_bundle_sync( codex_home: PathBuf, config: RemotePluginServiceConfig, auth: Option, + scopes: Vec, on_local_cache_changed: Option>, ) { + if scopes.is_empty() { + return; + } let Some(auth) = auth else { return; }; let key = RemoteInstalledPluginBundleSyncKey { plugin_cache_root: remote_plugin_cache_root(&codex_home), + scopes: scopes.clone(), }; if !mark_remote_installed_plugin_bundle_sync_in_flight(key.clone()) { return; } tokio::spawn(async move { - let result = - sync_remote_installed_plugin_bundles_once(codex_home, &config, Some(&auth)).await; + let result = sync_remote_installed_plugin_bundles_once_for_scopes( + codex_home, + &config, + Some(&auth), + &scopes, + ) + .await; match result { Ok(outcome) => { if outcome.changed_local_cache() @@ -127,50 +138,55 @@ pub async fn sync_remote_installed_plugin_bundles_once( config: &RemotePluginServiceConfig, auth: Option<&CodexAuth>, ) -> Result { - let auth = ensure_chatgpt_auth(auth)?; - let global = async { - let scope = RemotePluginScope::Global; - let installed_plugins = fetch_installed_plugins_for_scope_with_download_url( - config, auth, scope, /*include_download_urls*/ true, - ) - .await?; - Ok::<_, RemotePluginCatalogError>((scope, installed_plugins)) - }; - let workspace = async { - let scope = RemotePluginScope::Workspace; - let installed_plugins = fetch_installed_plugins_for_scope_with_download_url( - config, auth, scope, /*include_download_urls*/ true, - ) - .await?; - Ok::<_, RemotePluginCatalogError>((scope, installed_plugins)) - }; + sync_remote_installed_plugin_bundles_once_for_scopes( + codex_home, + config, + auth, + &[RemotePluginScope::Global, RemotePluginScope::Workspace], + ) + .await +} + +async fn sync_remote_installed_plugin_bundles_once_for_scopes( + codex_home: PathBuf, + config: &RemotePluginServiceConfig, + auth: Option<&CodexAuth>, + scopes: &[RemotePluginScope], +) -> Result { + if scopes.is_empty() { + return Ok(RemoteInstalledPluginBundleSyncOutcome::default()); + } + let auth = ensure_chatgpt_auth(auth)?; + let mut installed_plugins_by_scope = Vec::new(); + for scope in scopes { + let installed_plugins = fetch_installed_plugins_for_scope_with_download_url( + config, auth, *scope, /*include_download_urls*/ true, + ) + .await?; + installed_plugins_by_scope.push((*scope, installed_plugins)); + } - let (global, workspace) = tokio::try_join!(global, workspace)?; let store = PluginStore::try_new(codex_home.clone())?; - let mut installed_plugin_names_by_marketplace = - BTreeMap::>::from_iter([ - (REMOTE_GLOBAL_MARKETPLACE_NAME.to_string(), BTreeSet::new()), - ( - REMOTE_WORKSPACE_MARKETPLACE_NAME.to_string(), - BTreeSet::new(), - ), - ( - REMOTE_WORKSPACE_SHARED_WITH_ME_MARKETPLACE_NAME.to_string(), - BTreeSet::new(), - ), - ( - REMOTE_WORKSPACE_SHARED_WITH_ME_PRIVATE_MARKETPLACE_NAME.to_string(), - BTreeSet::new(), - ), - ( - REMOTE_WORKSPACE_SHARED_WITH_ME_UNLISTED_MARKETPLACE_NAME.to_string(), - BTreeSet::new(), - ), - ]); + let mut installed_plugin_names_by_marketplace = BTreeMap::>::new(); + if scopes.contains(&RemotePluginScope::Global) { + installed_plugin_names_by_marketplace + .insert(REMOTE_GLOBAL_MARKETPLACE_NAME.to_string(), BTreeSet::new()); + } + if scopes.contains(&RemotePluginScope::Workspace) { + for marketplace_name in [ + REMOTE_WORKSPACE_MARKETPLACE_NAME, + REMOTE_WORKSPACE_SHARED_WITH_ME_MARKETPLACE_NAME, + REMOTE_WORKSPACE_SHARED_WITH_ME_PRIVATE_MARKETPLACE_NAME, + REMOTE_WORKSPACE_SHARED_WITH_ME_UNLISTED_MARKETPLACE_NAME, + ] { + installed_plugin_names_by_marketplace + .insert(marketplace_name.to_string(), BTreeSet::new()); + } + } let mut installed_plugin_ids = BTreeSet::new(); let mut failed_remote_plugin_ids = BTreeSet::new(); - for (_scope, installed_plugins) in [global, workspace] { + for (_scope, installed_plugins) in installed_plugins_by_scope { for installed_plugin in installed_plugins { let plugin = installed_plugin.plugin; let marketplace_name = remote_plugin_canonical_marketplace_name(&plugin)?.to_string(); @@ -305,21 +321,11 @@ fn remove_stale_remote_plugin_caches( installed_plugin_names_by_marketplace: &BTreeMap>, ) -> Result, String> { let mut removed_cache_plugin_ids = Vec::new(); - for marketplace_name in [ - REMOTE_GLOBAL_MARKETPLACE_NAME, - REMOTE_WORKSPACE_MARKETPLACE_NAME, - REMOTE_WORKSPACE_SHARED_WITH_ME_MARKETPLACE_NAME, - REMOTE_WORKSPACE_SHARED_WITH_ME_PRIVATE_MARKETPLACE_NAME, - REMOTE_WORKSPACE_SHARED_WITH_ME_UNLISTED_MARKETPLACE_NAME, - ] { + for (marketplace_name, installed_plugin_names) in installed_plugin_names_by_marketplace { let marketplace_root = codex_home.join(PLUGINS_CACHE_DIR).join(marketplace_name); if !marketplace_root.exists() { continue; } - let installed_plugin_names = installed_plugin_names_by_marketplace - .get(marketplace_name) - .cloned() - .unwrap_or_default(); for entry in fs::read_dir(&marketplace_root).map_err(|err| { format!( "failed to read remote plugin cache directory {}: {err}", @@ -430,6 +436,7 @@ mod tests { let codex_home = tempfile::tempdir().expect("create codex home"); let key = RemoteInstalledPluginBundleSyncKey { plugin_cache_root: remote_plugin_cache_root(codex_home.path()), + scopes: vec![RemotePluginScope::Global, RemotePluginScope::Workspace], }; assert!(mark_remote_installed_plugin_bundle_sync_in_flight(