From b6dd7ef4c917fa42c081346949b0697012faa9d2 Mon Sep 17 00:00:00 2001 From: Dylan Hurd Date: Mon, 4 May 2026 13:52:43 -0400 Subject: [PATCH] fix(plugins) versioned install --- codex-rs/core-plugins/src/manager.rs | 42 +++ codex-rs/core-plugins/src/manager_tests.rs | 40 ++- codex-rs/core-plugins/src/store.rs | 307 +++++++++++++++++++-- codex-rs/core-plugins/src/store_tests.rs | 202 +++++++++++++- codex-rs/core/src/session/mcp.rs | 3 + 5 files changed, 560 insertions(+), 34 deletions(-) diff --git a/codex-rs/core-plugins/src/manager.rs b/codex-rs/core-plugins/src/manager.rs index aecbd76e5c..a6451e3eab 100644 --- a/codex-rs/core-plugins/src/manager.rs +++ b/codex-rs/core-plugins/src/manager.rs @@ -507,6 +507,48 @@ impl PluginsManager { *featured_plugin_ids_cache = None; } + pub fn cleanup_inactive_plugin_versions_for_config(&self, config: &PluginsConfigInput) { + if !config.plugins_enabled { + return; + } + + let mut plugin_ids = configured_plugins_from_stack(&config.config_layer_stack) + .into_keys() + .filter_map(|plugin_key| match PluginId::parse(&plugin_key) { + Ok(plugin_id) => Some(plugin_id), + Err(err) => { + warn!( + plugin_key, + error = %err, + "ignoring invalid plugin key during inactive plugin cache cleanup" + ); + None + } + }) + .collect::>(); + plugin_ids.extend( + self.remote_installed_plugin_configs(config) + .into_keys() + .filter_map(|plugin_key| match PluginId::parse(&plugin_key) { + Ok(plugin_id) => Some(plugin_id), + Err(err) => { + warn!( + plugin_key, + error = %err, + "ignoring invalid remote plugin key during inactive plugin cache cleanup" + ); + None + } + }), + ); + plugin_ids.sort_unstable_by_key(PluginId::as_key); + plugin_ids.dedup(); + + for plugin_id in plugin_ids { + self.store.cleanup_inactive_versions(&plugin_id); + } + } + fn clear_enabled_outcome_cache(&self) { let mut cached_enabled_outcome = match self.cached_enabled_outcome.write() { Ok(cache) => cache, diff --git a/codex-rs/core-plugins/src/manager_tests.rs b/codex-rs/core-plugins/src/manager_tests.rs index 8abff7700b..d98db1ede7 100644 --- a/codex-rs/core-plugins/src/manager_tests.rs +++ b/codex-rs/core-plugins/src/manager_tests.rs @@ -2977,7 +2977,7 @@ plugins = true } #[test] -fn refresh_curated_plugin_cache_replaces_existing_local_version_with_short_sha_version() { +fn refresh_curated_plugin_cache_installs_short_sha_version_alongside_existing_local_version() { let tmp = tempfile::tempdir().unwrap(); let curated_root = curated_plugins_repo_path(tmp.path()); write_openai_curated_marketplace(&curated_root, &["slack"]); @@ -2999,9 +2999,9 @@ fn refresh_curated_plugin_cache_replaces_existing_local_version_with_short_sha_v ); assert!( - !tmp.path() + tmp.path() .join("plugins/cache/openai-curated/slack/local") - .exists() + .is_dir() ); assert!( tmp.path() @@ -3010,6 +3010,14 @@ fn refresh_curated_plugin_cache_replaces_existing_local_version_with_short_sha_v )) .is_dir() ); + assert_eq!( + fs::read_to_string( + tmp.path() + .join("plugins/cache/openai-curated/slack/.active-version"), + ) + .unwrap(), + format!("{TEST_CURATED_PLUGIN_CACHE_VERSION}\n") + ); } #[test] @@ -3118,11 +3126,11 @@ fn refresh_curated_plugin_cache_migrates_full_sha_cache_version_to_short_version .expect("cache refresh should migrate the full sha cache version") ); assert!( - !tmp.path() + tmp.path() .join(format!( "plugins/cache/openai-curated/slack/{TEST_CURATED_PLUGIN_SHA}" )) - .exists() + .is_dir() ); assert!( tmp.path() @@ -3131,10 +3139,18 @@ fn refresh_curated_plugin_cache_migrates_full_sha_cache_version_to_short_version )) .is_dir() ); + assert_eq!( + fs::read_to_string( + tmp.path() + .join("plugins/cache/openai-curated/slack/.active-version"), + ) + .unwrap(), + format!("{TEST_CURATED_PLUGIN_CACHE_VERSION}\n") + ); } #[test] -fn refresh_non_curated_plugin_cache_replaces_existing_local_version_with_manifest_version() { +fn refresh_non_curated_plugin_cache_installs_manifest_version_alongside_existing_local_version() { let tmp = tempfile::tempdir().unwrap(); let repo_root = tmp.path().join("repo"); fs::create_dir_all(repo_root.join(".git")).unwrap(); @@ -3179,15 +3195,23 @@ enabled = true ); assert!( - !tmp.path() + tmp.path() .join("plugins/cache/debug/sample-plugin/local") - .exists() + .is_dir() ); assert!( tmp.path() .join("plugins/cache/debug/sample-plugin/1.2.3") .is_dir() ); + assert_eq!( + fs::read_to_string( + tmp.path() + .join("plugins/cache/debug/sample-plugin/.active-version"), + ) + .unwrap(), + "1.2.3\n" + ); } #[test] diff --git a/codex-rs/core-plugins/src/store.rs b/codex-rs/core-plugins/src/store.rs index fe662a142e..41307055ef 100644 --- a/codex-rs/core-plugins/src/store.rs +++ b/codex-rs/core-plugins/src/store.rs @@ -7,13 +7,21 @@ use codex_utils_plugins::find_plugin_manifest_path; use serde::Deserialize; use serde_json::Value as JsonValue; use std::fs; +use std::fs::File; use std::io; +use std::io::Write; use std::path::Path; use std::path::PathBuf; +use std::time::Duration; +use tracing::warn; pub const DEFAULT_PLUGIN_VERSION: &str = "local"; pub const PLUGINS_CACHE_DIR: &str = "plugins/cache"; pub const PLUGINS_DATA_DIR: &str = "plugins/data"; +const ACTIVE_PLUGIN_VERSION_FILE: &str = ".active-version"; +const ACTIVE_PLUGIN_VERSION_LOCK_FILE: &str = ".active-version.lock"; +const ACTIVE_PLUGIN_VERSION_LOCK_RETRIES: usize = 20; +const ACTIVE_PLUGIN_VERSION_LOCK_RETRY_SLEEP: Duration = Duration::from_millis(10); #[derive(Debug, Clone, PartialEq, Eq)] pub struct PluginInstallResult { @@ -66,26 +74,21 @@ impl PluginStore { } pub fn active_plugin_version(&self, plugin_id: &PluginId) -> Option { - let mut discovered_versions = fs::read_dir(self.plugin_base_root(plugin_id).as_path()) - .ok()? - .filter_map(Result::ok) - .filter_map(|entry| { - entry.file_type().ok().filter(std::fs::FileType::is_dir)?; - entry.file_name().into_string().ok() - }) - .filter(|version| validate_plugin_version_segment(version).is_ok()) - .collect::>(); - discovered_versions.sort_unstable(); - if discovered_versions.is_empty() { - None - } else if discovered_versions - .iter() - .any(|version| version == DEFAULT_PLUGIN_VERSION) - { - Some(DEFAULT_PLUGIN_VERSION.to_string()) - } else { - discovered_versions.pop() + let plugin_base_root = self.plugin_base_root(plugin_id); + match active_plugin_version_marker(plugin_base_root.as_path()) { + Ok(Some(active_version)) => return Some(active_version), + Ok(None) => {} + Err(err) => { + warn!( + plugin = %plugin_id.as_key(), + path = %plugin_base_root.display(), + error = %err, + "failed to read active plugin version marker" + ); + } } + + legacy_active_plugin_version(plugin_base_root.as_path()) } pub fn active_plugin_root(&self, plugin_id: &PluginId) -> Option { @@ -97,6 +100,14 @@ impl PluginStore { self.active_plugin_version(plugin_id).is_some() } + pub fn cleanup_inactive_versions(&self, plugin_id: &PluginId) { + cleanup_inactive_versions_with_remover( + plugin_id, + self.plugin_base_root(plugin_id).as_path(), + |path| fs::remove_dir_all(path), + ); + } + pub fn install( &self, source_path: AbsolutePathBuf, @@ -127,12 +138,26 @@ impl PluginStore { ))); } validate_plugin_version_segment(&plugin_version).map_err(PluginStoreError::Invalid)?; - let installed_path = self.plugin_root(&plugin_id, &plugin_version); - replace_plugin_root_atomically( - source_path.as_path(), - self.plugin_base_root(&plugin_id).as_path(), - &plugin_version, - )?; + let plugin_base_root = self.plugin_base_root(&plugin_id); + let installed_path = plugin_base_root.join(&plugin_version); + + if plugin_version == DEFAULT_PLUGIN_VERSION { + replace_plugin_root_atomically( + source_path.as_path(), + plugin_base_root.as_path(), + &plugin_version, + )?; + } else { + self.cleanup_inactive_versions(&plugin_id); + if !installed_path.as_path().is_dir() { + install_plugin_version_into_existing_base( + source_path.as_path(), + plugin_base_root.as_path(), + &plugin_version, + )?; + } + write_active_plugin_version_marker(plugin_base_root.as_path(), &plugin_version)?; + } Ok(PluginInstallResult { plugin_id, @@ -172,6 +197,63 @@ pub fn plugin_version_for_source(source_path: &Path) -> Result io::Result> { + if !plugin_base_root.is_dir() { + return Ok(None); + } + + let _lock_file = + lock_active_plugin_version_marker(plugin_base_root, ActivePluginVersionLockKind::Shared)?; + let marker_path = plugin_base_root.join(ACTIVE_PLUGIN_VERSION_FILE); + let version = match fs::read_to_string(&marker_path) { + Ok(version) => version, + Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(None), + Err(err) => return Err(err), + }; + let version = version.trim(); + if validate_plugin_version_segment(version).is_err() { + warn!( + marker_path = %marker_path.display(), + "ignoring invalid active plugin version marker" + ); + return Ok(None); + } + + if plugin_base_root.join(version).is_dir() { + Ok(Some(version.to_string())) + } else { + warn!( + marker_path = %marker_path.display(), + plugin_version = version, + "ignoring active plugin version marker for missing version directory" + ); + Ok(None) + } +} + +fn legacy_active_plugin_version(plugin_base_root: &Path) -> Option { + let mut discovered_versions = fs::read_dir(plugin_base_root) + .ok()? + .filter_map(Result::ok) + .filter_map(|entry| { + entry.file_type().ok().filter(std::fs::FileType::is_dir)?; + entry.file_name().into_string().ok() + }) + .filter(|version| validate_plugin_version_segment(version).is_ok()) + .collect::>(); + discovered_versions.sort_unstable(); + if discovered_versions.is_empty() { + None + } else if discovered_versions + .iter() + .any(|version| version == DEFAULT_PLUGIN_VERSION) + { + Some(DEFAULT_PLUGIN_VERSION.to_string()) + } else { + discovered_versions.pop() + } +} + pub fn validate_plugin_version_segment(plugin_version: &str) -> Result<(), String> { if plugin_version.is_empty() { return Err("invalid plugin version: must not be empty".to_string()); @@ -255,6 +337,181 @@ fn remove_existing_target(path: &Path) -> Result<(), PluginStoreError> { } } +fn write_active_plugin_version_marker( + plugin_base_root: &Path, + plugin_version: &str, +) -> Result<(), PluginStoreError> { + fs::create_dir_all(plugin_base_root) + .map_err(|err| PluginStoreError::io("failed to create plugin cache directory", err))?; + let _lock_file = + lock_active_plugin_version_marker(plugin_base_root, ActivePluginVersionLockKind::Exclusive) + .map_err(|err| { + PluginStoreError::io("failed to lock active plugin version marker", err) + })?; + + let marker_path = plugin_base_root.join(ACTIVE_PLUGIN_VERSION_FILE); + let mut marker_file = fs::OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(marker_path) + .map_err(|err| PluginStoreError::io("failed to open active plugin version marker", err))?; + marker_file + .write_all(format!("{plugin_version}\n").as_bytes()) + .map_err(|err| PluginStoreError::io("failed to write active plugin version marker", err))?; + marker_file + .flush() + .map_err(|err| PluginStoreError::io("failed to flush active plugin version marker", err)) +} + +#[derive(Clone, Copy)] +enum ActivePluginVersionLockKind { + Shared, + Exclusive, +} + +fn lock_active_plugin_version_marker( + plugin_base_root: &Path, + lock_kind: ActivePluginVersionLockKind, +) -> io::Result { + let lock_path = plugin_base_root.join(ACTIVE_PLUGIN_VERSION_LOCK_FILE); + let lock_file = fs::OpenOptions::new() + .create(true) + .read(true) + .write(true) + .truncate(false) + .open(&lock_path)?; + + for _ in 0..ACTIVE_PLUGIN_VERSION_LOCK_RETRIES { + let lock_result = match lock_kind { + ActivePluginVersionLockKind::Shared => lock_file.try_lock_shared(), + ActivePluginVersionLockKind::Exclusive => lock_file.try_lock(), + }; + match lock_result { + Ok(()) => return Ok(lock_file), + Err(fs::TryLockError::WouldBlock) => { + std::thread::sleep(ACTIVE_PLUGIN_VERSION_LOCK_RETRY_SLEEP); + } + Err(err) => return Err(err.into()), + } + } + + Err(io::Error::new( + io::ErrorKind::WouldBlock, + format!( + "could not acquire active plugin version marker lock: {}", + lock_path.display() + ), + )) +} + +fn install_plugin_version_into_existing_base( + source: &Path, + plugin_base_root: &Path, + plugin_version: &str, +) -> Result<(), PluginStoreError> { + let Some(parent) = plugin_base_root.parent() else { + return Err(PluginStoreError::Invalid(format!( + "plugin cache path has no parent: {}", + plugin_base_root.display() + ))); + }; + + fs::create_dir_all(plugin_base_root) + .map_err(|err| PluginStoreError::io("failed to create plugin cache directory", err))?; + + let staged_dir = tempfile::Builder::new() + .prefix("plugin-install-") + .tempdir_in(parent) + .map_err(|err| { + PluginStoreError::io("failed to create temporary plugin cache directory", err) + })?; + let staged_version_root = staged_dir.path().join(plugin_version); + copy_dir_recursive(source, &staged_version_root)?; + + fs::rename(&staged_version_root, plugin_base_root.join(plugin_version)).map_err(|err| { + PluginStoreError::io("failed to activate plugin cache version entry", err) + })?; + + Ok(()) +} + +fn cleanup_inactive_versions_with_remover( + plugin_id: &PluginId, + plugin_base_root: &Path, + mut remove_dir_all: F, +) where + F: FnMut(&Path) -> io::Result<()>, +{ + let active_version = match active_plugin_version_marker(plugin_base_root) { + Ok(Some(active_version)) => Some(active_version), + Ok(None) => legacy_active_plugin_version(plugin_base_root), + Err(err) => { + warn!( + plugin = %plugin_id.as_key(), + path = %plugin_base_root.display(), + error = %err, + "failed to read active plugin version marker while cleaning inactive versions" + ); + None + } + }; + let Some(active_version) = active_version else { + return; + }; + + let entries = match fs::read_dir(plugin_base_root) { + Ok(entries) => entries, + Err(err) => { + warn!( + plugin = %plugin_id.as_key(), + path = %plugin_base_root.display(), + error = %err, + "failed to read plugin cache while cleaning inactive versions" + ); + return; + } + }; + + for entry in entries { + let entry = match entry { + Ok(entry) => entry, + Err(err) => { + warn!( + plugin = %plugin_id.as_key(), + path = %plugin_base_root.display(), + error = %err, + "failed to enumerate plugin cache while cleaning inactive versions" + ); + continue; + } + }; + let Ok(file_type) = entry.file_type() else { + continue; + }; + if !file_type.is_dir() { + continue; + } + let Ok(version) = entry.file_name().into_string() else { + continue; + }; + if version == active_version || validate_plugin_version_segment(&version).is_err() { + continue; + } + + let path = entry.path(); + if let Err(err) = remove_dir_all(&path) { + warn!( + plugin = %plugin_id.as_key(), + plugin_version = %version, + path = %path.display(), + error = %err, + "failed to remove inactive plugin cache version" + ); + } + } +} + fn replace_plugin_root_atomically( source: &Path, target_root: &Path, diff --git a/codex-rs/core-plugins/src/store_tests.rs b/codex-rs/core-plugins/src/store_tests.rs index 0ba6b0d2c6..384c3270c2 100644 --- a/codex-rs/core-plugins/src/store_tests.rs +++ b/codex-rs/core-plugins/src/store_tests.rs @@ -144,11 +144,112 @@ fn install_with_version_uses_requested_cache_version() { result, PluginInstallResult { plugin_id, - plugin_version, + plugin_version: plugin_version.clone(), installed_path: AbsolutePathBuf::try_from(installed_path.clone()).unwrap(), } ); assert!(installed_path.join(".codex-plugin/plugin.json").is_file()); + assert_eq!( + std::fs::read_to_string( + tmp.path() + .join("plugins/cache/openai-curated/sample-plugin/.active-version"), + ) + .unwrap(), + format!("{plugin_version}\n") + ); + assert!( + tmp.path() + .join("plugins/cache/openai-curated/sample-plugin/.active-version.lock") + .is_file() + ); +} + +#[test] +fn install_with_existing_version_reuses_cache_directory() { + let tmp = tempdir().unwrap(); + write_plugin(tmp.path(), "source-one", "sample-plugin"); + write_plugin(tmp.path(), "source-two", "sample-plugin"); + fs::write(tmp.path().join("source-two/new-file"), "new").unwrap(); + let plugin_id = + PluginId::new("sample-plugin".to_string(), "openai-curated".to_string()).unwrap(); + let plugin_version = "0123456789abcdef".to_string(); + let store = PluginStore::new(tmp.path().to_path_buf()); + + let result = store + .install_with_version( + AbsolutePathBuf::try_from(tmp.path().join("source-one")).unwrap(), + plugin_id.clone(), + plugin_version.clone(), + ) + .unwrap(); + fs::write(result.installed_path.as_path().join("sentinel"), "old").unwrap(); + + let reinstall_result = store + .install_with_version( + AbsolutePathBuf::try_from(tmp.path().join("source-two")).unwrap(), + plugin_id.clone(), + plugin_version.clone(), + ) + .unwrap(); + + assert_eq!(reinstall_result, result); + assert_eq!( + fs::read_to_string(result.installed_path.as_path().join("sentinel")).unwrap(), + "old" + ); + assert!(!result.installed_path.as_path().join("new-file").exists()); + assert_eq!( + store.active_plugin_version(&plugin_id), + Some(plugin_version) + ); +} + +#[test] +fn install_with_version_adds_new_version_without_deleting_old_version() { + let tmp = tempdir().unwrap(); + write_plugin(tmp.path(), "source-one", "sample-plugin"); + write_plugin(tmp.path(), "source-two", "sample-plugin"); + let plugin_id = + PluginId::new("sample-plugin".to_string(), "openai-curated".to_string()).unwrap(); + let store = PluginStore::new(tmp.path().to_path_buf()); + + store + .install_with_version( + AbsolutePathBuf::try_from(tmp.path().join("source-one")).unwrap(), + plugin_id.clone(), + "1.0.0".to_string(), + ) + .unwrap(); + store + .install_with_version( + AbsolutePathBuf::try_from(tmp.path().join("source-two")).unwrap(), + plugin_id.clone(), + "2.0.0".to_string(), + ) + .unwrap(); + + assert!( + tmp.path() + .join("plugins/cache/openai-curated/sample-plugin/1.0.0") + .is_dir() + ); + assert!( + tmp.path() + .join("plugins/cache/openai-curated/sample-plugin/2.0.0") + .is_dir() + ); + assert_eq!( + std::fs::read_to_string( + tmp.path() + .join("plugins/cache/openai-curated/sample-plugin/.active-version"), + ) + .unwrap(), + "2.0.0\n" + ); + assert_eq!( + store.active_plugin_version(&plugin_id), + Some("2.0.0".to_string()) + ); } #[test] @@ -203,6 +304,34 @@ fn install_rejects_blank_manifest_version() { ); } +#[test] +fn active_plugin_version_prefers_active_marker_over_sorted_versions() { + let tmp = tempdir().unwrap(); + write_plugin( + &tmp.path().join("plugins/cache/debug"), + "sample-plugin/1.0.0", + "sample-plugin", + ); + write_plugin( + &tmp.path().join("plugins/cache/debug"), + "sample-plugin/9.0.0", + "sample-plugin", + ); + fs::write( + tmp.path() + .join("plugins/cache/debug/sample-plugin/.active-version"), + "1.0.0\n", + ) + .unwrap(); + let store = PluginStore::new(tmp.path().to_path_buf()); + let plugin_id = PluginId::new("sample-plugin".to_string(), "debug".to_string()).unwrap(); + + assert_eq!( + store.active_plugin_version(&plugin_id), + Some("1.0.0".to_string()) + ); +} + #[test] fn active_plugin_version_reads_version_directory_name() { let tmp = tempdir().unwrap(); @@ -224,6 +353,77 @@ fn active_plugin_version_reads_version_directory_name() { ); } +#[test] +fn cleanup_inactive_versions_removes_versions_except_active_marker_version() { + let tmp = tempdir().unwrap(); + write_plugin( + &tmp.path().join("plugins/cache/debug"), + "sample-plugin/1.0.0", + "sample-plugin", + ); + write_plugin( + &tmp.path().join("plugins/cache/debug"), + "sample-plugin/2.0.0", + "sample-plugin", + ); + fs::write( + tmp.path() + .join("plugins/cache/debug/sample-plugin/.active-version"), + "2.0.0\n", + ) + .unwrap(); + let store = PluginStore::new(tmp.path().to_path_buf()); + let plugin_id = PluginId::new("sample-plugin".to_string(), "debug".to_string()).unwrap(); + + store.cleanup_inactive_versions(&plugin_id); + + assert!( + !tmp.path() + .join("plugins/cache/debug/sample-plugin/1.0.0") + .exists() + ); + assert!( + tmp.path() + .join("plugins/cache/debug/sample-plugin/2.0.0") + .is_dir() + ); +} + +#[test] +fn cleanup_inactive_versions_logs_and_continues_after_remove_failures() { + let tmp = tempdir().unwrap(); + let plugin_base_root = tmp.path().join("plugins/cache/debug/sample-plugin"); + write_plugin( + &tmp.path().join("plugins/cache/debug"), + "sample-plugin/1.0.0", + "sample-plugin", + ); + write_plugin( + &tmp.path().join("plugins/cache/debug"), + "sample-plugin/2.0.0", + "sample-plugin", + ); + write_plugin( + &tmp.path().join("plugins/cache/debug"), + "sample-plugin/3.0.0", + "sample-plugin", + ); + fs::write(plugin_base_root.join(".active-version"), "3.0.0\n").unwrap(); + let plugin_id = PluginId::new("sample-plugin".to_string(), "debug".to_string()).unwrap(); + + cleanup_inactive_versions_with_remover(&plugin_id, &plugin_base_root, |path| { + if path.ends_with("1.0.0") { + Err(io::Error::new(io::ErrorKind::PermissionDenied, "held open")) + } else { + fs::remove_dir_all(path) + } + }); + + assert!(plugin_base_root.join("1.0.0").is_dir()); + assert!(!plugin_base_root.join("2.0.0").exists()); + assert!(plugin_base_root.join("3.0.0").is_dir()); +} + #[test] fn active_plugin_version_prefers_default_local_version_when_multiple_versions_exist() { let tmp = tempdir().unwrap(); diff --git a/codex-rs/core/src/session/mcp.rs b/codex-rs/core/src/session/mcp.rs index 2aa5adee28..227daf0a91 100644 --- a/codex-rs/core/src/session/mcp.rs +++ b/codex-rs/core/src/session/mcp.rs @@ -267,6 +267,9 @@ impl Session { std::mem::replace(&mut *manager, refreshed_manager) }; old_manager.shutdown().await; + self.services + .plugins_manager + .cleanup_inactive_plugin_versions_for_config(&config.plugins_config_input()); } pub(crate) async fn refresh_mcp_servers_if_requested(&self, turn_context: &TurnContext) {