From e0856c650562818c6768d509f80a1d28aa68bdac Mon Sep 17 00:00:00 2001 From: xli-oai Date: Wed, 29 Apr 2026 21:42:46 -0700 Subject: [PATCH] Restore legacy remote plugin startup sync --- .../app-server/tests/suite/v2/plugin_list.rs | 99 ++++ codex-rs/config/src/mcp_edit.rs | 92 ++++ codex-rs/config/src/mcp_edit_tests.rs | 37 ++ codex-rs/core-plugins/Cargo.toml | 2 +- codex-rs/core-plugins/src/manager.rs | 345 ++++++++++++- codex-rs/core-plugins/src/manager_tests.rs | 461 ++++++++++++++++++ .../core-plugins/src/remote_startup_sync.rs | 84 +++- .../src/remote_startup_sync_tests.rs | 198 ++++---- 8 files changed, 1206 insertions(+), 112 deletions(-) diff --git a/codex-rs/app-server/tests/suite/v2/plugin_list.rs b/codex-rs/app-server/tests/suite/v2/plugin_list.rs index dad606040a..0d119694dc 100644 --- a/codex-rs/app-server/tests/suite/v2/plugin_list.rs +++ b/codex-rs/app-server/tests/suite/v2/plugin_list.rs @@ -34,6 +34,7 @@ use wiremock::matchers::query_param; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); const TEST_CURATED_PLUGIN_SHA: &str = "0123456789abcdef0123456789abcdef01234567"; +const STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE: &str = ".tmp/app-server-remote-plugin-sync-v1"; const TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS: &str = "CODEX_TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS"; const ALTERNATE_MARKETPLACE_RELATIVE_PATH: &str = ".claude-plugin/marketplace.json"; @@ -1047,6 +1048,91 @@ async fn plugin_list_accepts_legacy_string_default_prompt() -> Result<()> { Ok(()) } +#[tokio::test] +async fn app_server_startup_remote_plugin_sync_runs_once() -> Result<()> { + let codex_home = TempDir::new()?; + let server = MockServer::start().await; + write_plugin_sync_config(codex_home.path(), &format!("{}/backend-api/", server.uri()))?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .chatgpt_user_id("user-123") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + write_openai_curated_marketplace(codex_home.path(), &["linear"])?; + + Mock::given(method("GET")) + .and(path("/backend-api/plugins/list")) + .and(header("authorization", "Bearer chatgpt-token")) + .and(header("chatgpt-account-id", "account-123")) + .respond_with(ResponseTemplate::new(200).set_body_string( + r#"[ + {"id":"1","name":"linear","marketplace_name":"openai-curated","version":"1.0.0","enabled":true} +]"#, + )) + .mount(&server) + .await; + Mock::given(method("GET")) + .and(path("/backend-api/plugins/featured")) + .and(query_param("platform", "codex")) + .and(header("authorization", "Bearer chatgpt-token")) + .and(header("chatgpt-account-id", "account-123")) + .respond_with(ResponseTemplate::new(200).set_body_string(r#"["linear@openai-curated"]"#)) + .mount(&server) + .await; + + let marker_path = codex_home + .path() + .join(STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE); + + { + let mut mcp = McpProcess::new_with_plugin_startup_tasks(codex_home.path()).await?; + timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + + wait_for_path_exists(&marker_path).await?; + wait_for_remote_plugin_request_count(&server, "/plugins/list", /*expected_count*/ 1) + .await?; + let request_id = mcp + .send_plugin_list_request(PluginListParams { cwds: None }) + .await?; + let response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let response: PluginListResponse = to_response(response)?; + let curated_marketplace = response + .marketplaces + .into_iter() + .find(|marketplace| marketplace.name == "openai-curated") + .expect("expected openai-curated marketplace entry"); + assert_eq!( + curated_marketplace + .plugins + .into_iter() + .map(|plugin| (plugin.id, plugin.installed, plugin.enabled)) + .collect::>(), + vec![("linear@openai-curated".to_string(), true, true)] + ); + wait_for_remote_plugin_request_count(&server, "/plugins/list", /*expected_count*/ 1) + .await?; + } + + let config = std::fs::read_to_string(codex_home.path().join("config.toml"))?; + assert!(config.contains(r#"[plugins."linear@openai-curated"]"#)); + + { + let mut mcp = McpProcess::new_with_plugin_startup_tasks(codex_home.path()).await?; + timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + } + + tokio::time::sleep(Duration::from_millis(250)).await; + wait_for_remote_plugin_request_count(&server, "/plugins/list", /*expected_count*/ 1).await?; + Ok(()) +} + #[tokio::test] async fn app_server_startup_refreshes_remote_installed_cache_each_process() -> Result<()> { let codex_home = TempDir::new()?; @@ -1658,6 +1744,19 @@ async fn wait_for_remote_plugin_request_count( Ok(()) } +async fn wait_for_path_exists(path: &std::path::Path) -> Result<()> { + timeout(DEFAULT_TIMEOUT, async { + loop { + if path.exists() { + return Ok::<(), anyhow::Error>(()); + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await??; + Ok(()) +} + fn write_installed_plugin( codex_home: &TempDir, marketplace_name: &str, diff --git a/codex-rs/config/src/mcp_edit.rs b/codex-rs/config/src/mcp_edit.rs index f5881a1257..f7241159a2 100644 --- a/codex-rs/config/src/mcp_edit.rs +++ b/codex-rs/config/src/mcp_edit.rs @@ -7,6 +7,7 @@ use std::path::PathBuf; use tokio::task; use toml::Value as TomlValue; use toml_edit::DocumentMut; +use toml_edit::InlineTable; use toml_edit::Item as TomlItem; use toml_edit::Table as TomlTable; use toml_edit::value; @@ -62,6 +63,12 @@ fn ensure_no_inline_bearer_tokens(value: &TomlValue) -> std::io::Result<()> { pub struct ConfigEditsBuilder { codex_home: PathBuf, mcp_servers: Option>, + plugin_edits: Vec, +} + +enum PluginConfigEdit { + SetEnabled { plugin_id: String, enabled: bool }, + Clear { plugin_id: String }, } impl ConfigEditsBuilder { @@ -69,6 +76,7 @@ impl ConfigEditsBuilder { Self { codex_home: codex_home.to_path_buf(), mcp_servers: None, + plugin_edits: Vec::new(), } } @@ -77,6 +85,21 @@ impl ConfigEditsBuilder { self } + pub fn set_plugin_enabled(mut self, plugin_id: &str, enabled: bool) -> Self { + self.plugin_edits.push(PluginConfigEdit::SetEnabled { + plugin_id: plugin_id.to_string(), + enabled, + }); + self + } + + pub fn clear_plugin(mut self, plugin_id: &str) -> Self { + self.plugin_edits.push(PluginConfigEdit::Clear { + plugin_id: plugin_id.to_string(), + }); + self + } + pub async fn apply(self) -> std::io::Result<()> { task::spawn_blocking(move || self.apply_blocking()) .await @@ -91,6 +114,9 @@ impl ConfigEditsBuilder { if let Some(servers) = self.mcp_servers.as_ref() { replace_mcp_servers(&mut doc, servers); } + for edit in &self.plugin_edits { + apply_plugin_config_edit(&mut doc, edit); + } fs::create_dir_all(&self.codex_home)?; fs::write(config_path, doc.to_string()) } @@ -121,6 +147,72 @@ fn replace_mcp_servers(doc: &mut DocumentMut, servers: &BTreeMap { + set_plugin_enabled(doc, plugin_id, *enabled); + } + PluginConfigEdit::Clear { plugin_id } => { + clear_plugin(doc, plugin_id); + } + } +} + +fn set_plugin_enabled(doc: &mut DocumentMut, plugin_id: &str, enabled: bool) { + let root = doc.as_table_mut(); + let plugins = ensure_table(root, "plugins", /*implicit*/ true); + let plugin = ensure_table(plugins, plugin_id, /*implicit*/ false); + plugin["enabled"] = value(enabled); +} + +fn clear_plugin(doc: &mut DocumentMut, plugin_id: &str) { + let root = doc.as_table_mut(); + if !root.contains_key("plugins") { + return; + } + let plugins = ensure_table(root, "plugins", /*implicit*/ true); + plugins.remove(plugin_id); +} + +fn ensure_table<'a>(parent: &'a mut TomlTable, key: &str, implicit: bool) -> &'a mut TomlTable { + match parent.get_mut(key) { + Some(TomlItem::Table(_)) => {} + Some(item @ TomlItem::Value(_)) => { + if let Some(inline) = item.as_value().and_then(toml_edit::Value::as_inline_table) { + *item = TomlItem::Table(table_from_inline(inline, implicit)); + } else { + *item = TomlItem::Table(new_table(implicit)); + } + } + Some(item) => { + *item = TomlItem::Table(new_table(implicit)); + } + None => { + parent.insert(key, TomlItem::Table(new_table(implicit))); + } + } + let Some(TomlItem::Table(table)) = parent.get_mut(key) else { + unreachable!("inserted value should be a table"); + }; + table +} + +fn new_table(implicit: bool) -> TomlTable { + let mut table = TomlTable::new(); + table.set_implicit(implicit); + table +} + +fn table_from_inline(inline: &InlineTable, implicit: bool) -> TomlTable { + let mut table = new_table(implicit); + for (key, value) in inline.iter() { + let mut value = value.clone(); + value.decor_mut().set_suffix(""); + table.insert(key, TomlItem::Value(value)); + } + table +} + fn serialize_mcp_server(config: &McpServerConfig) -> TomlItem { let mut entry = TomlTable::new(); entry.set_implicit(false); diff --git a/codex-rs/config/src/mcp_edit_tests.rs b/codex-rs/config/src/mcp_edit_tests.rs index cfcd73c3e5..8ba6607a57 100644 --- a/codex-rs/config/src/mcp_edit_tests.rs +++ b/codex-rs/config/src/mcp_edit_tests.rs @@ -82,3 +82,40 @@ approval_mode = "approve" Ok(()) } + +#[tokio::test] +async fn plugin_edits_set_and_clear_enabled_entries() -> anyhow::Result<()> { + let unique_suffix = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos(); + let codex_home = std::env::temp_dir().join(format!( + "codex-config-plugin-edit-test-{}-{unique_suffix}", + std::process::id() + )); + std::fs::create_dir_all(&codex_home)?; + std::fs::write( + codex_home.join(CONFIG_TOML_FILE), + r#"[plugins."linear@openai-curated"] +enabled = false + +[plugins."gmail@openai-curated"] +enabled = true +"#, + )?; + + ConfigEditsBuilder::new(&codex_home) + .set_plugin_enabled("linear@openai-curated", true) + .clear_plugin("gmail@openai-curated") + .apply() + .await?; + + let serialized = std::fs::read_to_string(codex_home.join(CONFIG_TOML_FILE))?; + assert_eq!( + serialized, + r#"[plugins."linear@openai-curated"] +enabled = true +"# + ); + + std::fs::remove_dir_all(&codex_home)?; + + Ok(()) +} diff --git a/codex-rs/core-plugins/Cargo.toml b/codex-rs/core-plugins/Cargo.toml index be304a4f85..7c8e8fb760 100644 --- a/codex-rs/core-plugins/Cargo.toml +++ b/codex-rs/core-plugins/Cargo.toml @@ -37,7 +37,7 @@ serde_json = { workspace = true } tar = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } -tokio = { workspace = true, features = ["fs", "macros", "rt", "time"] } +tokio = { workspace = true, features = ["fs", "macros", "rt", "sync", "time"] } toml = { workspace = true } tracing = { workspace = true } url = { workspace = true } diff --git a/codex-rs/core-plugins/src/manager.rs b/codex-rs/core-plugins/src/manager.rs index 6596203a97..3eed4ca9d6 100644 --- a/codex-rs/core-plugins/src/manager.rs +++ b/codex-rs/core-plugins/src/manager.rs @@ -27,6 +27,7 @@ use crate::marketplace::ResolvedMarketplacePlugin; use crate::marketplace::find_installable_marketplace_plugin; use crate::marketplace::find_marketplace_plugin; use crate::marketplace::list_marketplaces; +use crate::marketplace::load_marketplace; use crate::marketplace::plugin_interface_with_marketplace_category; use crate::marketplace_upgrade::ConfiguredMarketplaceUpgradeError; use crate::marketplace_upgrade::ConfiguredMarketplaceUpgradeOutcome; @@ -48,6 +49,7 @@ use crate::store::PluginInstallResult as StorePluginInstallResult; use crate::store::PluginStore; use crate::store::PluginStoreError; use codex_analytics::AnalyticsEventsClient; +use codex_config::ConfigEditsBuilder; use codex_config::ConfigLayerStack; use codex_config::types::PluginConfig; use codex_config::version_for_toml; @@ -69,6 +71,8 @@ use std::sync::RwLock; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Instant; +use tokio::sync::Semaphore; +use tracing::info; use tracing::warn; static CURATED_REPO_SYNC_STARTED: AtomicBool = AtomicBool::new(false); @@ -230,6 +234,99 @@ pub struct ConfiguredMarketplaceListOutcome { pub errors: Vec, } +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct RemotePluginSyncResult { + /// Plugin ids newly installed into the local plugin cache. + pub installed_plugin_ids: Vec, + /// Plugin ids whose local config was changed to enabled. + pub enabled_plugin_ids: Vec, + /// Plugin ids whose local config was changed to disabled. + /// This is not populated by `sync_plugins_from_remote`. + pub disabled_plugin_ids: Vec, + /// Plugin ids removed from local cache or plugin config. + pub uninstalled_plugin_ids: Vec, +} + +#[derive(Debug, thiserror::Error)] +pub enum PluginRemoteSyncError { + #[error("chatgpt authentication required to sync remote plugins")] + AuthRequired, + + #[error( + "chatgpt authentication required to sync remote plugins; api key auth is not supported" + )] + UnsupportedAuthMode, + + #[error("failed to read auth token for remote plugin sync: {0}")] + AuthToken(#[source] std::io::Error), + + #[error("failed to send remote plugin sync request to {url}: {source}")] + Request { + url: String, + #[source] + source: reqwest::Error, + }, + + #[error("remote plugin sync request to {url} failed with status {status}: {body}")] + UnexpectedStatus { + url: String, + status: reqwest::StatusCode, + body: String, + }, + + #[error("failed to parse remote plugin sync response from {url}: {source}")] + Decode { + url: String, + #[source] + source: serde_json::Error, + }, + + #[error("local curated marketplace is not available")] + LocalMarketplaceNotFound, + + #[error("remote marketplace `{marketplace_name}` is not available locally")] + UnknownRemoteMarketplace { marketplace_name: String }, + + #[error("duplicate remote plugin `{plugin_name}` in sync response")] + DuplicateRemotePlugin { plugin_name: String }, + + #[error("{0}")] + InvalidPluginId(#[from] PluginIdError), + + #[error("{0}")] + Marketplace(#[from] MarketplaceError), + + #[error("{0}")] + Store(#[from] PluginStoreError), + + #[error("{0}")] + Config(#[from] anyhow::Error), + + #[error("failed to join remote plugin sync task: {0}")] + Join(#[from] tokio::task::JoinError), +} + +impl PluginRemoteSyncError { + fn join(source: tokio::task::JoinError) -> Self { + Self::Join(source) + } +} + +impl From for PluginRemoteSyncError { + fn from(value: RemotePluginFetchError) -> Self { + match value { + RemotePluginFetchError::AuthRequired => Self::AuthRequired, + RemotePluginFetchError::UnsupportedAuthMode => Self::UnsupportedAuthMode, + RemotePluginFetchError::AuthToken(source) => Self::AuthToken(source), + RemotePluginFetchError::Request { url, source } => Self::Request { url, source }, + RemotePluginFetchError::UnexpectedStatus { url, status, body } => { + Self::UnexpectedStatus { url, status, body } + } + RemotePluginFetchError::Decode { url, source } => Self::Decode { url, source }, + } + } +} + impl From for PluginCapabilitySummary { fn from(value: PluginDetail) -> Self { let has_skills = value.skills.iter().any(|skill| { @@ -259,6 +356,7 @@ pub struct PluginsManager { // remote installed state cannot remain effective for a different account. remote_installed_plugins_cache: RwLock>>, remote_installed_plugins_cache_refresh_state: RwLock, + remote_sync_lock: Semaphore, restriction_product: Option, analytics_events_client: RwLock>, } @@ -299,6 +397,7 @@ impl PluginsManager { remote_installed_plugins_cache_refresh_state: RwLock::new( RemoteInstalledPluginsCacheRefreshState::default(), ), + remote_sync_lock: Semaphore::new(/*permits*/ 1), restriction_product, analytics_events_client: RwLock::new(None), } @@ -799,6 +898,222 @@ impl PluginsManager { Ok(()) } + pub async fn sync_plugins_from_remote( + &self, + config_layer_stack: &ConfigLayerStack, + plugins_enabled: bool, + chatgpt_base_url: &str, + auth: Option<&CodexAuth>, + additive_only: bool, + ) -> Result { + let _remote_sync_guard = self.remote_sync_lock.acquire().await.map_err(|_| { + PluginRemoteSyncError::Config(anyhow::anyhow!("remote plugin sync semaphore closed")) + })?; + + if !plugins_enabled { + return Ok(RemotePluginSyncResult::default()); + } + + info!("starting remote plugin sync"); + let remote_plugins = crate::remote_legacy::fetch_remote_plugin_status( + &remote_plugin_service_config(chatgpt_base_url), + auth, + ) + .await + .map_err(PluginRemoteSyncError::from)?; + let configured_plugins = configured_plugins_from_stack(config_layer_stack); + let curated_marketplace_root = curated_plugins_repo_path(self.codex_home.as_path()); + let curated_marketplace_path = AbsolutePathBuf::try_from( + curated_marketplace_root.join(".agents/plugins/marketplace.json"), + ) + .map_err(|_| PluginRemoteSyncError::LocalMarketplaceNotFound)?; + let curated_marketplace = match load_marketplace(&curated_marketplace_path) { + Ok(marketplace) => marketplace, + Err(MarketplaceError::MarketplaceNotFound { .. }) => { + return Err(PluginRemoteSyncError::LocalMarketplaceNotFound); + } + Err(err) => return Err(err.into()), + }; + + let marketplace_name = curated_marketplace.name.clone(); + let curated_plugin_version = read_curated_plugins_sha(self.codex_home.as_path()) + .ok_or_else(|| { + PluginStoreError::Invalid( + "local curated marketplace sha is not available".to_string(), + ) + })?; + let cache_plugin_version = curated_plugin_cache_version(&curated_plugin_version); + let mut local_plugins = Vec::<( + String, + PluginId, + AbsolutePathBuf, + Option, + Option, + bool, + )>::new(); + let mut local_plugin_names = HashSet::new(); + for plugin in curated_marketplace.plugins { + let plugin_name = plugin.name; + if !local_plugin_names.insert(plugin_name.clone()) { + warn!( + plugin = plugin_name, + marketplace = %marketplace_name, + "ignoring duplicate local plugin entry during remote sync" + ); + continue; + } + + let plugin_id = PluginId::new(plugin_name.clone(), marketplace_name.clone())?; + let plugin_key = plugin_id.as_key(); + let source_path = match plugin.source { + MarketplacePluginSource::Local { path } => path, + MarketplacePluginSource::Git { .. } => { + warn!( + plugin = plugin_name, + marketplace = %marketplace_name, + "skipping remote plugin source during remote sync" + ); + continue; + } + }; + let current_enabled = configured_plugins + .get(&plugin_key) + .map(|plugin| plugin.enabled); + let installed_version = self.store.active_plugin_version(&plugin_id); + let product_allowed = + self.restriction_product_matches(plugin.policy.products.as_deref()); + local_plugins.push(( + plugin_name, + plugin_id, + source_path, + current_enabled, + installed_version, + product_allowed, + )); + } + + let mut missing_remote_plugins = Vec::::new(); + let mut remote_installed_plugin_names = HashSet::::new(); + for plugin in remote_plugins { + if plugin.marketplace_name != marketplace_name { + return Err(PluginRemoteSyncError::UnknownRemoteMarketplace { + marketplace_name: plugin.marketplace_name, + }); + } + if !local_plugin_names.contains(&plugin.name) { + missing_remote_plugins.push(plugin.name); + continue; + } + // For this legacy sync path, remote `enabled = false` means "not installed". + if !plugin.enabled { + continue; + } + if !remote_installed_plugin_names.insert(plugin.name.clone()) { + return Err(PluginRemoteSyncError::DuplicateRemotePlugin { + plugin_name: plugin.name, + }); + } + } + + let mut config_edits = ConfigEditsBuilder::new(&self.codex_home); + let mut has_config_edits = false; + let mut installs = Vec::new(); + let mut uninstalls = Vec::new(); + let mut result = RemotePluginSyncResult::default(); + let remote_plugin_count = remote_installed_plugin_names.len(); + let local_plugin_count = local_plugins.len(); + if !missing_remote_plugins.is_empty() { + let sample_missing_plugins = missing_remote_plugins + .iter() + .take(10) + .cloned() + .collect::>(); + warn!( + marketplace = %marketplace_name, + missing_remote_plugin_count = missing_remote_plugins.len(), + missing_remote_plugin_examples = ?sample_missing_plugins, + "ignoring remote plugins missing from local marketplace during sync" + ); + } + + for ( + plugin_name, + plugin_id, + source_path, + current_enabled, + installed_version, + product_allowed, + ) in local_plugins + { + let plugin_key = plugin_id.as_key(); + let is_installed = installed_version.is_some(); + if !product_allowed { + continue; + } + if remote_installed_plugin_names.contains(&plugin_name) { + if !is_installed { + installs.push((source_path, plugin_id.clone(), cache_plugin_version.clone())); + result.installed_plugin_ids.push(plugin_key.clone()); + } + + if current_enabled != Some(true) { + result.enabled_plugin_ids.push(plugin_key.clone()); + config_edits = config_edits.set_plugin_enabled(&plugin_key, true); + has_config_edits = true; + } + } else if !additive_only { + if is_installed { + uninstalls.push(plugin_id); + } + if is_installed || current_enabled.is_some() { + result.uninstalled_plugin_ids.push(plugin_key.clone()); + } + if current_enabled.is_some() { + config_edits = config_edits.clear_plugin(&plugin_key); + has_config_edits = true; + } + } + } + + let store = self.store.clone(); + let store_result = tokio::task::spawn_blocking(move || { + for (source_path, plugin_id, plugin_version) in installs { + store.install_with_version(source_path, plugin_id, plugin_version)?; + } + for plugin_id in uninstalls { + store.uninstall(&plugin_id)?; + } + Ok::<(), PluginStoreError>(()) + }) + .await + .map_err(PluginRemoteSyncError::join)?; + if let Err(err) = store_result { + self.clear_cache(); + return Err(err.into()); + } + + let config_result = if has_config_edits { + config_edits.apply().await.map_err(anyhow::Error::from) + } else { + Ok(()) + }; + self.clear_cache(); + config_result?; + + info!( + marketplace = %marketplace_name, + remote_plugin_count, + local_plugin_count, + installed_plugin_ids = ?result.installed_plugin_ids, + enabled_plugin_ids = ?result.enabled_plugin_ids, + disabled_plugin_ids = ?result.disabled_plugin_ids, + uninstalled_plugin_ids = ?result.uninstalled_plugin_ids, + "completed remote plugin sync" + ); + + Ok(result) + } + pub fn list_marketplaces_for_config( &self, config_layer_stack: &ConfigLayerStack, @@ -1043,7 +1358,7 @@ impl PluginsManager { }; if should_spawn_marketplace_auto_upgrade { let manager = Arc::clone(self); - let config_layer_stack_for_upgrade = config_layer_stack; + let config_layer_stack_for_upgrade = config_layer_stack.clone(); if let Err(err) = std::thread::Builder::new() .name("plugins-marketplace-auto-upgrade".to_string()) .spawn(move || { @@ -1082,14 +1397,28 @@ impl PluginsManager { } } + start_startup_remote_plugin_sync_once(RemoteStartupPluginSyncRequest { + manager: Arc::clone(self), + codex_home: self.codex_home.clone(), + config_layer_stack, + plugins_enabled, + chatgpt_base_url: chatgpt_base_url.clone(), + auth_manager: auth_manager.clone(), + }); + if remote_plugins_enabled { - start_startup_remote_plugin_sync_once(RemoteStartupPluginSyncRequest { - manager: Arc::clone(self), - plugins_enabled, - remote_plugins_enabled, - chatgpt_base_url: chatgpt_base_url.clone(), - auth_manager: auth_manager.clone(), - on_effective_plugins_changed: on_effective_plugins_changed.clone(), + let manager = Arc::clone(self); + let auth_manager = auth_manager.clone(); + let chatgpt_base_url_for_remote_refresh = chatgpt_base_url.clone(); + tokio::spawn(async move { + let auth = auth_manager.auth().await; + manager.maybe_start_remote_installed_plugins_cache_refresh( + plugins_enabled, + remote_plugins_enabled, + chatgpt_base_url_for_remote_refresh, + auth, + on_effective_plugins_changed, + ); }); } diff --git a/codex-rs/core-plugins/src/manager_tests.rs b/codex-rs/core-plugins/src/manager_tests.rs index 4d27f210f7..9ef975fedf 100644 --- a/codex-rs/core-plugins/src/manager_tests.rs +++ b/codex-rs/core-plugins/src/manager_tests.rs @@ -19,6 +19,7 @@ use crate::remote::RemoteInstalledPlugin; use crate::remote_legacy::RemotePluginFetchError; use crate::startup_sync::curated_plugins_repo_path; use crate::store::PluginStore; +use crate::store::PluginStoreError; use codex_app_server_protocol::ConfigLayerSource; use codex_config::AppToolApproval; use codex_config::CONFIG_TOML_FILE; @@ -2447,6 +2448,31 @@ enabled = true ); } +#[tokio::test] +async fn sync_plugins_from_remote_returns_default_when_feature_disabled() { + let tmp = tempfile::tempdir().unwrap(); + write_file( + &tmp.path().join(CONFIG_TOML_FILE), + r#"[features] +plugins = false +"#, + ); + + let config = load_config(tmp.path(), tmp.path()).await; + let outcome = PluginsManager::new(tmp.path().to_path_buf()) + .sync_plugins_from_remote( + &config.config_layer_stack, + config.plugins_enabled, + &config.chatgpt_base_url, + /*auth*/ None, + /*additive_only*/ false, + ) + .await + .unwrap(); + + assert_eq!(outcome, RemotePluginSyncResult::default()); +} + #[tokio::test] async fn list_marketplaces_includes_curated_repo_marketplace() { let tmp = tempfile::tempdir().unwrap(); @@ -2928,6 +2954,441 @@ enabled = true ); } +#[tokio::test] +async fn sync_plugins_from_remote_reconciles_cache_and_config() { + let tmp = tempfile::tempdir().unwrap(); + let curated_root = curated_plugins_repo_path(tmp.path()); + write_openai_curated_marketplace(&curated_root, &["linear", "gmail", "calendar"]); + write_curated_plugin_sha(tmp.path(), TEST_CURATED_PLUGIN_SHA); + write_plugin( + &tmp.path().join("plugins/cache/openai-curated"), + "linear/local", + "linear", + ); + write_plugin( + &tmp.path().join("plugins/cache/openai-curated"), + "gmail/local", + "gmail", + ); + write_plugin( + &tmp.path().join("plugins/cache/openai-curated"), + "calendar/local", + "calendar", + ); + write_file( + &tmp.path().join(CONFIG_TOML_FILE), + r#"[features] +plugins = true + +[plugins."linear@openai-curated"] +enabled = false + +[plugins."gmail@openai-curated"] +enabled = false + +[plugins."calendar@openai-curated"] +enabled = true +"#, + ); + + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/backend-api/plugins/list")) + .and(header("authorization", "Bearer Access Token")) + .and(header("chatgpt-account-id", "account_id")) + .respond_with(ResponseTemplate::new(200).set_body_string( + r#"[ + {"id":"1","name":"linear","marketplace_name":"openai-curated","version":"1.0.0","enabled":true}, + {"id":"2","name":"gmail","marketplace_name":"openai-curated","version":"1.0.0","enabled":false} +]"#, + )) + .mount(&server) + .await; + + let mut config = load_config(tmp.path(), tmp.path()).await; + config.chatgpt_base_url = format!("{}/backend-api/", server.uri()); + let manager = PluginsManager::new(tmp.path().to_path_buf()); + let result = manager + .sync_plugins_from_remote( + &config.config_layer_stack, + config.plugins_enabled, + &config.chatgpt_base_url, + Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()), + /*additive_only*/ false, + ) + .await + .unwrap(); + + assert_eq!( + result, + RemotePluginSyncResult { + installed_plugin_ids: Vec::new(), + enabled_plugin_ids: vec!["linear@openai-curated".to_string()], + disabled_plugin_ids: Vec::new(), + uninstalled_plugin_ids: vec![ + "gmail@openai-curated".to_string(), + "calendar@openai-curated".to_string(), + ], + } + ); + + assert!( + tmp.path() + .join("plugins/cache/openai-curated/linear/local") + .is_dir() + ); + assert!( + !tmp.path() + .join("plugins/cache/openai-curated/gmail") + .exists() + ); + assert!( + !tmp.path() + .join("plugins/cache/openai-curated/calendar") + .exists() + ); + + let config_toml = fs::read_to_string(tmp.path().join(CONFIG_TOML_FILE)).unwrap(); + assert!(config_toml.contains(r#"[plugins."linear@openai-curated"]"#)); + assert!(config_toml.contains("enabled = true")); + assert!(!config_toml.contains(r#"[plugins."gmail@openai-curated"]"#)); + assert!(!config_toml.contains(r#"[plugins."calendar@openai-curated"]"#)); + + let synced_config = load_config(tmp.path(), tmp.path()).await; + let curated_marketplace = manager + .list_marketplaces_for_test_config(&synced_config, &[]) + .unwrap() + .marketplaces + .into_iter() + .find(|marketplace| marketplace.name == OPENAI_CURATED_MARKETPLACE_NAME) + .unwrap(); + assert_eq!( + curated_marketplace + .plugins + .into_iter() + .map(|plugin| (plugin.id, plugin.installed, plugin.enabled)) + .collect::>(), + vec![ + ("linear@openai-curated".to_string(), true, true), + ("gmail@openai-curated".to_string(), false, false), + ("calendar@openai-curated".to_string(), false, false), + ] + ); +} + +#[tokio::test] +async fn sync_plugins_from_remote_additive_only_keeps_existing_plugins() { + let tmp = tempfile::tempdir().unwrap(); + let curated_root = curated_plugins_repo_path(tmp.path()); + write_openai_curated_marketplace(&curated_root, &["linear", "gmail", "calendar"]); + write_curated_plugin_sha(tmp.path(), TEST_CURATED_PLUGIN_SHA); + write_plugin( + &tmp.path().join("plugins/cache/openai-curated"), + "linear/local", + "linear", + ); + write_plugin( + &tmp.path().join("plugins/cache/openai-curated"), + "gmail/local", + "gmail", + ); + write_plugin( + &tmp.path().join("plugins/cache/openai-curated"), + "calendar/local", + "calendar", + ); + write_file( + &tmp.path().join(CONFIG_TOML_FILE), + r#"[features] +plugins = true + +[plugins."linear@openai-curated"] +enabled = false + +[plugins."gmail@openai-curated"] +enabled = false + +[plugins."calendar@openai-curated"] +enabled = true +"#, + ); + + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/backend-api/plugins/list")) + .and(header("authorization", "Bearer Access Token")) + .and(header("chatgpt-account-id", "account_id")) + .respond_with(ResponseTemplate::new(200).set_body_string( + r#"[ + {"id":"1","name":"linear","marketplace_name":"openai-curated","version":"1.0.0","enabled":true}, + {"id":"2","name":"gmail","marketplace_name":"openai-curated","version":"1.0.0","enabled":false} +]"#, + )) + .mount(&server) + .await; + + let mut config = load_config(tmp.path(), tmp.path()).await; + config.chatgpt_base_url = format!("{}/backend-api/", server.uri()); + let manager = PluginsManager::new(tmp.path().to_path_buf()); + let result = manager + .sync_plugins_from_remote( + &config.config_layer_stack, + config.plugins_enabled, + &config.chatgpt_base_url, + Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()), + /*additive_only*/ true, + ) + .await + .unwrap(); + + assert_eq!( + result, + RemotePluginSyncResult { + installed_plugin_ids: Vec::new(), + enabled_plugin_ids: vec!["linear@openai-curated".to_string()], + disabled_plugin_ids: Vec::new(), + uninstalled_plugin_ids: Vec::new(), + } + ); + + assert!( + tmp.path() + .join("plugins/cache/openai-curated/linear/local") + .is_dir() + ); + assert!( + tmp.path() + .join("plugins/cache/openai-curated/gmail/local") + .is_dir() + ); + assert!( + tmp.path() + .join("plugins/cache/openai-curated/calendar/local") + .is_dir() + ); + + let config_toml = fs::read_to_string(tmp.path().join(CONFIG_TOML_FILE)).unwrap(); + assert!(config_toml.contains(r#"[plugins."linear@openai-curated"]"#)); + assert!(config_toml.contains(r#"[plugins."gmail@openai-curated"]"#)); + assert!(config_toml.contains(r#"[plugins."calendar@openai-curated"]"#)); + assert!(config_toml.contains("enabled = true")); +} + +#[tokio::test] +async fn sync_plugins_from_remote_ignores_unknown_remote_plugins() { + let tmp = tempfile::tempdir().unwrap(); + let curated_root = curated_plugins_repo_path(tmp.path()); + write_openai_curated_marketplace(&curated_root, &["linear"]); + write_curated_plugin_sha(tmp.path(), TEST_CURATED_PLUGIN_SHA); + write_file( + &tmp.path().join(CONFIG_TOML_FILE), + r#"[features] +plugins = true + +[plugins."linear@openai-curated"] +enabled = false +"#, + ); + + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/backend-api/plugins/list")) + .respond_with(ResponseTemplate::new(200).set_body_string( + r#"[ + {"id":"1","name":"plugin-one","marketplace_name":"openai-curated","version":"1.0.0","enabled":true} +]"#, + )) + .mount(&server) + .await; + + let mut config = load_config(tmp.path(), tmp.path()).await; + config.chatgpt_base_url = format!("{}/backend-api/", server.uri()); + let manager = PluginsManager::new(tmp.path().to_path_buf()); + let result = manager + .sync_plugins_from_remote( + &config.config_layer_stack, + config.plugins_enabled, + &config.chatgpt_base_url, + Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()), + /*additive_only*/ false, + ) + .await + .unwrap(); + + assert_eq!( + result, + RemotePluginSyncResult { + installed_plugin_ids: Vec::new(), + enabled_plugin_ids: Vec::new(), + disabled_plugin_ids: Vec::new(), + uninstalled_plugin_ids: vec!["linear@openai-curated".to_string()], + } + ); + let config_toml = fs::read_to_string(tmp.path().join(CONFIG_TOML_FILE)).unwrap(); + assert!(!config_toml.contains(r#"[plugins."linear@openai-curated"]"#)); + assert!( + !tmp.path() + .join("plugins/cache/openai-curated/linear") + .exists() + ); +} + +#[tokio::test] +async fn sync_plugins_from_remote_keeps_existing_plugins_when_install_fails() { + let tmp = tempfile::tempdir().unwrap(); + let curated_root = curated_plugins_repo_path(tmp.path()); + write_openai_curated_marketplace(&curated_root, &["linear", "gmail"]); + write_curated_plugin_sha(tmp.path(), TEST_CURATED_PLUGIN_SHA); + fs::remove_dir_all(curated_root.join("plugins/gmail")).unwrap(); + write_plugin( + &tmp.path().join("plugins/cache/openai-curated"), + "linear/local", + "linear", + ); + write_file( + &tmp.path().join(CONFIG_TOML_FILE), + r#"[features] +plugins = true + +[plugins."linear@openai-curated"] +enabled = false +"#, + ); + + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/backend-api/plugins/list")) + .respond_with(ResponseTemplate::new(200).set_body_string( + r#"[ + {"id":"1","name":"gmail","marketplace_name":"openai-curated","version":"1.0.0","enabled":true} +]"#, + )) + .mount(&server) + .await; + + let mut config = load_config(tmp.path(), tmp.path()).await; + config.chatgpt_base_url = format!("{}/backend-api/", server.uri()); + let manager = PluginsManager::new(tmp.path().to_path_buf()); + let err = manager + .sync_plugins_from_remote( + &config.config_layer_stack, + config.plugins_enabled, + &config.chatgpt_base_url, + Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()), + /*additive_only*/ false, + ) + .await + .unwrap_err(); + + assert!(matches!( + err, + PluginRemoteSyncError::Store(PluginStoreError::Invalid(ref message)) + if message.contains("plugin source path is not a directory") + )); + assert!( + tmp.path() + .join("plugins/cache/openai-curated/linear/local") + .is_dir() + ); + assert!( + !tmp.path() + .join("plugins/cache/openai-curated/gmail") + .exists() + ); + + let config_toml = fs::read_to_string(tmp.path().join(CONFIG_TOML_FILE)).unwrap(); + assert!(config_toml.contains(r#"[plugins."linear@openai-curated"]"#)); + assert!(!config_toml.contains(r#"[plugins."gmail@openai-curated"]"#)); + assert!(config_toml.contains("enabled = false")); +} + +#[tokio::test] +async fn sync_plugins_from_remote_uses_first_duplicate_local_plugin_entry() { + let tmp = tempfile::tempdir().unwrap(); + let curated_root = curated_plugins_repo_path(tmp.path()); + write_curated_plugin_sha(tmp.path(), TEST_CURATED_PLUGIN_SHA); + fs::create_dir_all(curated_root.join(".agents/plugins")).unwrap(); + fs::write( + curated_root.join(".agents/plugins/marketplace.json"), + r#"{ + "name": "openai-curated", + "plugins": [ + { + "name": "gmail", + "source": { + "source": "local", + "path": "./plugins/gmail-first" + } + }, + { + "name": "gmail", + "source": { + "source": "local", + "path": "./plugins/gmail-second" + } + } + ] +}"#, + ) + .unwrap(); + write_plugin(&curated_root, "plugins/gmail-first", "gmail"); + write_plugin(&curated_root, "plugins/gmail-second", "gmail"); + fs::write(curated_root.join("plugins/gmail-first/marker.txt"), "first").unwrap(); + fs::write( + curated_root.join("plugins/gmail-second/marker.txt"), + "second", + ) + .unwrap(); + write_file( + &tmp.path().join(CONFIG_TOML_FILE), + r#"[features] +plugins = true +"#, + ); + + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/backend-api/plugins/list")) + .respond_with(ResponseTemplate::new(200).set_body_string( + r#"[ + {"id":"1","name":"gmail","marketplace_name":"openai-curated","version":"1.0.0","enabled":true} +]"#, + )) + .mount(&server) + .await; + + let mut config = load_config(tmp.path(), tmp.path()).await; + config.chatgpt_base_url = format!("{}/backend-api/", server.uri()); + let manager = PluginsManager::new(tmp.path().to_path_buf()); + let result = manager + .sync_plugins_from_remote( + &config.config_layer_stack, + config.plugins_enabled, + &config.chatgpt_base_url, + Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()), + /*additive_only*/ false, + ) + .await + .unwrap(); + + assert_eq!( + result, + RemotePluginSyncResult { + installed_plugin_ids: vec!["gmail@openai-curated".to_string()], + enabled_plugin_ids: vec!["gmail@openai-curated".to_string()], + disabled_plugin_ids: Vec::new(), + uninstalled_plugin_ids: Vec::new(), + } + ); + assert_eq!( + fs::read_to_string(tmp.path().join(format!( + "plugins/cache/openai-curated/gmail/{TEST_CURATED_PLUGIN_CACHE_VERSION}/marker.txt" + ))) + .unwrap(), + "first" + ); +} + #[tokio::test] async fn featured_plugin_ids_for_test_config_uses_restriction_product_query_param() { let tmp = tempfile::tempdir().unwrap(); diff --git a/codex-rs/core-plugins/src/remote_startup_sync.rs b/codex-rs/core-plugins/src/remote_startup_sync.rs index 9ed0b5f12b..5528354fb9 100644 --- a/codex-rs/core-plugins/src/remote_startup_sync.rs +++ b/codex-rs/core-plugins/src/remote_startup_sync.rs @@ -1,48 +1,81 @@ +use std::path::Path; +use std::path::PathBuf; use std::sync::Arc; +use std::time::Duration; use crate::manager::PluginsManager; -use crate::manager::remote_plugin_service_config; +use crate::startup_sync::has_local_curated_plugins_snapshot; +use codex_config::ConfigLayerStack; use codex_login::AuthManager; use tracing::info; use tracing::warn; +const STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE: &str = ".tmp/app-server-remote-plugin-sync-v1"; +const STARTUP_REMOTE_PLUGIN_SYNC_PREREQUISITE_TIMEOUT: Duration = Duration::from_secs(10); + pub(crate) struct RemoteStartupPluginSyncRequest { pub(crate) manager: Arc, + pub(crate) codex_home: PathBuf, + pub(crate) config_layer_stack: ConfigLayerStack, pub(crate) plugins_enabled: bool, - pub(crate) remote_plugins_enabled: bool, pub(crate) chatgpt_base_url: String, pub(crate) auth_manager: Arc, - pub(crate) on_effective_plugins_changed: Option>, } pub(crate) fn start_startup_remote_plugin_sync_once(request: RemoteStartupPluginSyncRequest) { let RemoteStartupPluginSyncRequest { manager, + codex_home, + config_layer_stack, plugins_enabled, - remote_plugins_enabled, chatgpt_base_url, auth_manager, - on_effective_plugins_changed, } = request; - if !plugins_enabled || !remote_plugins_enabled { + let marker_path = startup_remote_plugin_sync_marker_path(codex_home.as_path()); + if marker_path.is_file() { return; } tokio::spawn(async move { + if marker_path.is_file() { + return; + } + + if !wait_for_startup_remote_plugin_sync_prerequisites(codex_home.as_path()).await { + warn!( + codex_home = %codex_home.display(), + "skipping startup remote plugin sync because curated marketplace is not ready" + ); + return; + } + let auth = auth_manager.auth().await; match manager - .refresh_remote_installed_plugins_cache( - &remote_plugin_service_config(&chatgpt_base_url), + .sync_plugins_from_remote( + &config_layer_stack, + plugins_enabled, + &chatgpt_base_url, auth.as_ref(), + /*additive_only*/ true, ) .await { - Ok(cache_changed) => { - info!(cache_changed, "completed startup remote plugin sync"); - if cache_changed - && let Some(on_effective_plugins_changed) = on_effective_plugins_changed + Ok(sync_result) => { + info!( + installed_plugin_ids = ?sync_result.installed_plugin_ids, + enabled_plugin_ids = ?sync_result.enabled_plugin_ids, + disabled_plugin_ids = ?sync_result.disabled_plugin_ids, + uninstalled_plugin_ids = ?sync_result.uninstalled_plugin_ids, + "completed startup remote plugin sync" + ); + if let Err(err) = + write_startup_remote_plugin_sync_marker(codex_home.as_path()).await { - on_effective_plugins_changed(); + warn!( + error = %err, + path = %marker_path.display(), + "failed to persist startup remote plugin sync marker" + ); } } Err(err) => { @@ -55,6 +88,31 @@ pub(crate) fn start_startup_remote_plugin_sync_once(request: RemoteStartupPlugin }); } +fn startup_remote_plugin_sync_marker_path(codex_home: &Path) -> PathBuf { + codex_home.join(STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE) +} + +async fn wait_for_startup_remote_plugin_sync_prerequisites(codex_home: &Path) -> bool { + let deadline = tokio::time::Instant::now() + STARTUP_REMOTE_PLUGIN_SYNC_PREREQUISITE_TIMEOUT; + loop { + if has_local_curated_plugins_snapshot(codex_home) { + return true; + } + if tokio::time::Instant::now() >= deadline { + return false; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } +} + +async fn write_startup_remote_plugin_sync_marker(codex_home: &Path) -> std::io::Result<()> { + let marker_path = startup_remote_plugin_sync_marker_path(codex_home); + if let Some(parent) = marker_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + tokio::fs::write(marker_path, b"ok\n").await +} + #[cfg(test)] #[path = "remote_startup_sync_tests.rs"] mod tests; diff --git a/codex-rs/core-plugins/src/remote_startup_sync_tests.rs b/codex-rs/core-plugins/src/remote_startup_sync_tests.rs index 90f54fe07b..8d3812c329 100644 --- a/codex-rs/core-plugins/src/remote_startup_sync_tests.rs +++ b/codex-rs/core-plugins/src/remote_startup_sync_tests.rs @@ -1,14 +1,19 @@ use super::*; +use crate::OPENAI_CURATED_MARKETPLACE_NAME; use crate::manager::PluginsManager; -use crate::remote::REMOTE_GLOBAL_MARKETPLACE_NAME; +use crate::startup_sync::curated_plugins_repo_path; +use codex_config::CONFIG_TOML_FILE; +use codex_config::ConfigLayerEntry; +use codex_config::ConfigLayerSource; use codex_config::ConfigLayerStack; +use codex_config::ConfigRequirements; +use codex_config::ConfigRequirementsToml; use codex_login::AuthManager; use codex_login::CodexAuth; +use codex_utils_absolute_path::AbsolutePathBuf; use pretty_assertions::assert_eq; use std::path::Path; use std::sync::Arc; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; use std::time::Duration; use tempfile::tempdir; use wiremock::Mock; @@ -17,136 +22,149 @@ use wiremock::ResponseTemplate; use wiremock::matchers::header; use wiremock::matchers::method; use wiremock::matchers::path; -use wiremock::matchers::query_param; -const LEGACY_STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE: &str = ".tmp/app-server-remote-plugin-sync-v1"; +const TEST_CURATED_PLUGIN_SHA: &str = "0123456789abcdef0123456789abcdef01234567"; +const TEST_CURATED_PLUGIN_CACHE_VERSION: &str = "01234567"; fn write_file(path: &Path, contents: &str) { std::fs::create_dir_all(path.parent().expect("file should have a parent")).unwrap(); std::fs::write(path, contents).unwrap(); } -fn write_cached_plugin(codex_home: &Path, marketplace_name: &str, plugin_name: &str) { - let plugin_root = codex_home - .join("plugins/cache") - .join(marketplace_name) - .join(plugin_name) - .join("local"); +fn write_curated_plugin(root: &Path, plugin_name: &str) { + let plugin_root = root.join("plugins").join(plugin_name); write_file( &plugin_root.join(".codex-plugin/plugin.json"), &format!(r#"{{"name":"{plugin_name}"}}"#), ); - write_file(&plugin_root.join("skills/SKILL.md"), "skill"); + write_file( + &plugin_root.join("skills/SKILL.md"), + "---\nname: sample\ndescription: sample\n---\n", + ); } -async fn mount_installed_plugins(server: &MockServer) { - let empty_page_body = r#"{ - "plugins": [], - "pagination": { - "limit": 50, - "next_page_token": null - } -}"#; - let global_installed_body = r#"{ +fn write_openai_curated_marketplace(root: &Path, plugin_names: &[&str]) { + let plugins = plugin_names + .iter() + .map(|plugin_name| { + format!( + r#"{{ + "name": "{plugin_name}", + "source": {{ + "source": "local", + "path": "./plugins/{plugin_name}" + }} + }}"# + ) + }) + .collect::>() + .join(",\n"); + write_file( + &root.join(".agents/plugins/marketplace.json"), + &format!( + r#"{{ + "name": "{OPENAI_CURATED_MARKETPLACE_NAME}", "plugins": [ - { - "id": "plugins~Plugin_linear", - "name": "linear", - "scope": "GLOBAL", - "installation_policy": "AVAILABLE", - "authentication_policy": "ON_USE", - "release": { - "version": "local", - "bundle_download_url": "https://example.com/linear.tar.gz", - "display_name": "Linear", - "description": "Track work in Linear", - "app_ids": [], - "interface": { - "short_description": "Plan and track work", - "capabilities": ["Read", "Write"] - }, - "skills": [] - }, - "enabled": true, - "disabled_skill_names": [] +{plugins} + ] +}}"# + ), + ); + for plugin_name in plugin_names { + write_curated_plugin(root, plugin_name); } - ], - "pagination": { - "limit": 50, - "next_page_token": null - } -}"#; +} - Mock::given(method("GET")) - .and(path("/backend-api/ps/plugins/installed")) - .and(query_param("scope", "GLOBAL")) - .and(query_param("includeDownloadUrls", "true")) - .and(header("authorization", "Bearer Access Token")) - .and(header("chatgpt-account-id", "account_id")) - .respond_with(ResponseTemplate::new(200).set_body_string(global_installed_body)) - .mount(server) - .await; - Mock::given(method("GET")) - .and(path("/backend-api/ps/plugins/installed")) - .and(query_param("scope", "WORKSPACE")) - .and(query_param("includeDownloadUrls", "true")) - .and(header("authorization", "Bearer Access Token")) - .and(header("chatgpt-account-id", "account_id")) - .respond_with(ResponseTemplate::new(200).set_body_string(empty_page_body)) - .mount(server) - .await; +fn write_curated_plugin_sha(codex_home: &Path) { + write_file( + &codex_home.join(".tmp/plugins.sha"), + &format!("{TEST_CURATED_PLUGIN_SHA}\n"), + ); +} + +fn load_config_layer_stack(codex_home: &Path) -> ConfigLayerStack { + let config_path = codex_home.join(CONFIG_TOML_FILE); + let config_path = + AbsolutePathBuf::try_from(config_path).expect("config path should be absolute"); + let raw_config = std::fs::read_to_string(config_path.as_path()).expect("config should exist"); + let user_config = toml::from_str::(&raw_config).expect("config should parse"); + ConfigLayerStack::new( + vec![ConfigLayerEntry::new( + ConfigLayerSource::User { file: config_path }, + user_config, + )], + ConfigRequirements::default(), + ConfigRequirementsToml::default(), + ) + .expect("config layer stack should build") } #[tokio::test] -async fn startup_remote_plugin_sync_refreshes_remote_installed_cache() { +async fn startup_remote_plugin_sync_writes_marker_and_reconciles_state() { let tmp = tempdir().expect("tempdir"); - write_cached_plugin(tmp.path(), REMOTE_GLOBAL_MARKETPLACE_NAME, "linear"); + let curated_root = curated_plugins_repo_path(tmp.path()); + write_openai_curated_marketplace(&curated_root, &["linear"]); + write_curated_plugin_sha(tmp.path()); write_file( - &tmp.path() - .join(LEGACY_STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE), - "ok\n", + &tmp.path().join(CONFIG_TOML_FILE), + r#"[features] +plugins = true + +[plugins."linear@openai-curated"] +enabled = false +"#, ); let server = MockServer::start().await; - mount_installed_plugins(&server).await; + Mock::given(method("GET")) + .and(path("/backend-api/plugins/list")) + .and(header("authorization", "Bearer Access Token")) + .and(header("chatgpt-account-id", "account_id")) + .respond_with(ResponseTemplate::new(200).set_body_string( + r#"[ + {"id":"1","name":"linear","marketplace_name":"openai-curated","version":"1.0.0","enabled":true} +]"#, + )) + .mount(&server) + .await; let manager = Arc::new(PluginsManager::new(tmp.path().to_path_buf())); let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); - let notification_count = Arc::new(AtomicUsize::new(0)); - let notification_count_for_callback = Arc::clone(¬ification_count); start_startup_remote_plugin_sync_once(RemoteStartupPluginSyncRequest { manager: Arc::clone(&manager), + codex_home: tmp.path().to_path_buf(), + config_layer_stack: load_config_layer_stack(tmp.path()), plugins_enabled: true, - remote_plugins_enabled: true, chatgpt_base_url: format!("{}/backend-api/", server.uri()), auth_manager, - on_effective_plugins_changed: Some(Arc::new(move || { - notification_count_for_callback.fetch_add(1, Ordering::SeqCst); - })), }); + let marker_path = tmp.path().join(STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE); tokio::time::timeout(Duration::from_secs(5), async { loop { - if notification_count.load(Ordering::SeqCst) == 1 { + if marker_path.is_file() { break; } tokio::time::sleep(Duration::from_millis(10)).await; } }) .await - .expect("remote installed cache should refresh"); + .expect("marker should be written"); - let outcome = manager - .plugins_for_config( - &ConfigLayerStack::default(), - /*plugins_enabled*/ true, - /*remote_plugins_enabled*/ true, - /*plugin_hooks_enabled*/ true, - ) - .await; - assert_eq!(outcome.plugins().len(), 1); - assert_eq!(outcome.plugins()[0].config_name, "linear@chatgpt-global"); - assert_eq!(notification_count.load(Ordering::SeqCst), 1); + assert!( + tmp.path() + .join(format!( + "plugins/cache/openai-curated/linear/{TEST_CURATED_PLUGIN_CACHE_VERSION}" + )) + .is_dir() + ); + let config = + std::fs::read_to_string(tmp.path().join(CONFIG_TOML_FILE)).expect("config should exist"); + assert!(config.contains(r#"[plugins."linear@openai-curated"]"#)); + assert!(config.contains("enabled = true")); + + let marker_contents = std::fs::read_to_string(marker_path).expect("marker should be readable"); + assert_eq!(marker_contents, "ok\n"); }