fix(plugins) versioned install

This commit is contained in:
Dylan Hurd
2026-05-04 13:52:43 -04:00
parent 102366aa78
commit b6dd7ef4c9
5 changed files with 560 additions and 34 deletions

View File

@@ -507,6 +507,48 @@ impl PluginsManager {
*featured_plugin_ids_cache = None;
}
pub fn cleanup_inactive_plugin_versions_for_config(&self, config: &PluginsConfigInput) {
if !config.plugins_enabled {
return;
}
let mut plugin_ids = configured_plugins_from_stack(&config.config_layer_stack)
.into_keys()
.filter_map(|plugin_key| match PluginId::parse(&plugin_key) {
Ok(plugin_id) => Some(plugin_id),
Err(err) => {
warn!(
plugin_key,
error = %err,
"ignoring invalid plugin key during inactive plugin cache cleanup"
);
None
}
})
.collect::<Vec<_>>();
plugin_ids.extend(
self.remote_installed_plugin_configs(config)
.into_keys()
.filter_map(|plugin_key| match PluginId::parse(&plugin_key) {
Ok(plugin_id) => Some(plugin_id),
Err(err) => {
warn!(
plugin_key,
error = %err,
"ignoring invalid remote plugin key during inactive plugin cache cleanup"
);
None
}
}),
);
plugin_ids.sort_unstable_by_key(PluginId::as_key);
plugin_ids.dedup();
for plugin_id in plugin_ids {
self.store.cleanup_inactive_versions(&plugin_id);
}
}
fn clear_enabled_outcome_cache(&self) {
let mut cached_enabled_outcome = match self.cached_enabled_outcome.write() {
Ok(cache) => cache,

View File

@@ -2977,7 +2977,7 @@ plugins = true
}
#[test]
fn refresh_curated_plugin_cache_replaces_existing_local_version_with_short_sha_version() {
fn refresh_curated_plugin_cache_installs_short_sha_version_alongside_existing_local_version() {
let tmp = tempfile::tempdir().unwrap();
let curated_root = curated_plugins_repo_path(tmp.path());
write_openai_curated_marketplace(&curated_root, &["slack"]);
@@ -2999,9 +2999,9 @@ fn refresh_curated_plugin_cache_replaces_existing_local_version_with_short_sha_v
);
assert!(
!tmp.path()
tmp.path()
.join("plugins/cache/openai-curated/slack/local")
.exists()
.is_dir()
);
assert!(
tmp.path()
@@ -3010,6 +3010,14 @@ fn refresh_curated_plugin_cache_replaces_existing_local_version_with_short_sha_v
))
.is_dir()
);
assert_eq!(
fs::read_to_string(
tmp.path()
.join("plugins/cache/openai-curated/slack/.active-version"),
)
.unwrap(),
format!("{TEST_CURATED_PLUGIN_CACHE_VERSION}\n")
);
}
#[test]
@@ -3118,11 +3126,11 @@ fn refresh_curated_plugin_cache_migrates_full_sha_cache_version_to_short_version
.expect("cache refresh should migrate the full sha cache version")
);
assert!(
!tmp.path()
tmp.path()
.join(format!(
"plugins/cache/openai-curated/slack/{TEST_CURATED_PLUGIN_SHA}"
))
.exists()
.is_dir()
);
assert!(
tmp.path()
@@ -3131,10 +3139,18 @@ fn refresh_curated_plugin_cache_migrates_full_sha_cache_version_to_short_version
))
.is_dir()
);
assert_eq!(
fs::read_to_string(
tmp.path()
.join("plugins/cache/openai-curated/slack/.active-version"),
)
.unwrap(),
format!("{TEST_CURATED_PLUGIN_CACHE_VERSION}\n")
);
}
#[test]
fn refresh_non_curated_plugin_cache_replaces_existing_local_version_with_manifest_version() {
fn refresh_non_curated_plugin_cache_installs_manifest_version_alongside_existing_local_version() {
let tmp = tempfile::tempdir().unwrap();
let repo_root = tmp.path().join("repo");
fs::create_dir_all(repo_root.join(".git")).unwrap();
@@ -3179,15 +3195,23 @@ enabled = true
);
assert!(
!tmp.path()
tmp.path()
.join("plugins/cache/debug/sample-plugin/local")
.exists()
.is_dir()
);
assert!(
tmp.path()
.join("plugins/cache/debug/sample-plugin/1.2.3")
.is_dir()
);
assert_eq!(
fs::read_to_string(
tmp.path()
.join("plugins/cache/debug/sample-plugin/.active-version"),
)
.unwrap(),
"1.2.3\n"
);
}
#[test]

View File

@@ -7,13 +7,21 @@ use codex_utils_plugins::find_plugin_manifest_path;
use serde::Deserialize;
use serde_json::Value as JsonValue;
use std::fs;
use std::fs::File;
use std::io;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use tracing::warn;
pub const DEFAULT_PLUGIN_VERSION: &str = "local";
pub const PLUGINS_CACHE_DIR: &str = "plugins/cache";
pub const PLUGINS_DATA_DIR: &str = "plugins/data";
const ACTIVE_PLUGIN_VERSION_FILE: &str = ".active-version";
const ACTIVE_PLUGIN_VERSION_LOCK_FILE: &str = ".active-version.lock";
const ACTIVE_PLUGIN_VERSION_LOCK_RETRIES: usize = 20;
const ACTIVE_PLUGIN_VERSION_LOCK_RETRY_SLEEP: Duration = Duration::from_millis(10);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PluginInstallResult {
@@ -66,26 +74,21 @@ impl PluginStore {
}
pub fn active_plugin_version(&self, plugin_id: &PluginId) -> Option<String> {
let mut discovered_versions = fs::read_dir(self.plugin_base_root(plugin_id).as_path())
.ok()?
.filter_map(Result::ok)
.filter_map(|entry| {
entry.file_type().ok().filter(std::fs::FileType::is_dir)?;
entry.file_name().into_string().ok()
})
.filter(|version| validate_plugin_version_segment(version).is_ok())
.collect::<Vec<_>>();
discovered_versions.sort_unstable();
if discovered_versions.is_empty() {
None
} else if discovered_versions
.iter()
.any(|version| version == DEFAULT_PLUGIN_VERSION)
{
Some(DEFAULT_PLUGIN_VERSION.to_string())
} else {
discovered_versions.pop()
let plugin_base_root = self.plugin_base_root(plugin_id);
match active_plugin_version_marker(plugin_base_root.as_path()) {
Ok(Some(active_version)) => return Some(active_version),
Ok(None) => {}
Err(err) => {
warn!(
plugin = %plugin_id.as_key(),
path = %plugin_base_root.display(),
error = %err,
"failed to read active plugin version marker"
);
}
}
legacy_active_plugin_version(plugin_base_root.as_path())
}
pub fn active_plugin_root(&self, plugin_id: &PluginId) -> Option<AbsolutePathBuf> {
@@ -97,6 +100,14 @@ impl PluginStore {
self.active_plugin_version(plugin_id).is_some()
}
pub fn cleanup_inactive_versions(&self, plugin_id: &PluginId) {
cleanup_inactive_versions_with_remover(
plugin_id,
self.plugin_base_root(plugin_id).as_path(),
|path| fs::remove_dir_all(path),
);
}
pub fn install(
&self,
source_path: AbsolutePathBuf,
@@ -127,12 +138,26 @@ impl PluginStore {
)));
}
validate_plugin_version_segment(&plugin_version).map_err(PluginStoreError::Invalid)?;
let installed_path = self.plugin_root(&plugin_id, &plugin_version);
replace_plugin_root_atomically(
source_path.as_path(),
self.plugin_base_root(&plugin_id).as_path(),
&plugin_version,
)?;
let plugin_base_root = self.plugin_base_root(&plugin_id);
let installed_path = plugin_base_root.join(&plugin_version);
if plugin_version == DEFAULT_PLUGIN_VERSION {
replace_plugin_root_atomically(
source_path.as_path(),
plugin_base_root.as_path(),
&plugin_version,
)?;
} else {
self.cleanup_inactive_versions(&plugin_id);
if !installed_path.as_path().is_dir() {
install_plugin_version_into_existing_base(
source_path.as_path(),
plugin_base_root.as_path(),
&plugin_version,
)?;
}
write_active_plugin_version_marker(plugin_base_root.as_path(), &plugin_version)?;
}
Ok(PluginInstallResult {
plugin_id,
@@ -172,6 +197,63 @@ pub fn plugin_version_for_source(source_path: &Path) -> Result<String, PluginSto
Ok(plugin_version)
}
fn active_plugin_version_marker(plugin_base_root: &Path) -> io::Result<Option<String>> {
if !plugin_base_root.is_dir() {
return Ok(None);
}
let _lock_file =
lock_active_plugin_version_marker(plugin_base_root, ActivePluginVersionLockKind::Shared)?;
let marker_path = plugin_base_root.join(ACTIVE_PLUGIN_VERSION_FILE);
let version = match fs::read_to_string(&marker_path) {
Ok(version) => version,
Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(None),
Err(err) => return Err(err),
};
let version = version.trim();
if validate_plugin_version_segment(version).is_err() {
warn!(
marker_path = %marker_path.display(),
"ignoring invalid active plugin version marker"
);
return Ok(None);
}
if plugin_base_root.join(version).is_dir() {
Ok(Some(version.to_string()))
} else {
warn!(
marker_path = %marker_path.display(),
plugin_version = version,
"ignoring active plugin version marker for missing version directory"
);
Ok(None)
}
}
fn legacy_active_plugin_version(plugin_base_root: &Path) -> Option<String> {
let mut discovered_versions = fs::read_dir(plugin_base_root)
.ok()?
.filter_map(Result::ok)
.filter_map(|entry| {
entry.file_type().ok().filter(std::fs::FileType::is_dir)?;
entry.file_name().into_string().ok()
})
.filter(|version| validate_plugin_version_segment(version).is_ok())
.collect::<Vec<_>>();
discovered_versions.sort_unstable();
if discovered_versions.is_empty() {
None
} else if discovered_versions
.iter()
.any(|version| version == DEFAULT_PLUGIN_VERSION)
{
Some(DEFAULT_PLUGIN_VERSION.to_string())
} else {
discovered_versions.pop()
}
}
pub fn validate_plugin_version_segment(plugin_version: &str) -> Result<(), String> {
if plugin_version.is_empty() {
return Err("invalid plugin version: must not be empty".to_string());
@@ -255,6 +337,181 @@ fn remove_existing_target(path: &Path) -> Result<(), PluginStoreError> {
}
}
fn write_active_plugin_version_marker(
plugin_base_root: &Path,
plugin_version: &str,
) -> Result<(), PluginStoreError> {
fs::create_dir_all(plugin_base_root)
.map_err(|err| PluginStoreError::io("failed to create plugin cache directory", err))?;
let _lock_file =
lock_active_plugin_version_marker(plugin_base_root, ActivePluginVersionLockKind::Exclusive)
.map_err(|err| {
PluginStoreError::io("failed to lock active plugin version marker", err)
})?;
let marker_path = plugin_base_root.join(ACTIVE_PLUGIN_VERSION_FILE);
let mut marker_file = fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(marker_path)
.map_err(|err| PluginStoreError::io("failed to open active plugin version marker", err))?;
marker_file
.write_all(format!("{plugin_version}\n").as_bytes())
.map_err(|err| PluginStoreError::io("failed to write active plugin version marker", err))?;
marker_file
.flush()
.map_err(|err| PluginStoreError::io("failed to flush active plugin version marker", err))
}
#[derive(Clone, Copy)]
enum ActivePluginVersionLockKind {
Shared,
Exclusive,
}
fn lock_active_plugin_version_marker(
plugin_base_root: &Path,
lock_kind: ActivePluginVersionLockKind,
) -> io::Result<File> {
let lock_path = plugin_base_root.join(ACTIVE_PLUGIN_VERSION_LOCK_FILE);
let lock_file = fs::OpenOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(false)
.open(&lock_path)?;
for _ in 0..ACTIVE_PLUGIN_VERSION_LOCK_RETRIES {
let lock_result = match lock_kind {
ActivePluginVersionLockKind::Shared => lock_file.try_lock_shared(),
ActivePluginVersionLockKind::Exclusive => lock_file.try_lock(),
};
match lock_result {
Ok(()) => return Ok(lock_file),
Err(fs::TryLockError::WouldBlock) => {
std::thread::sleep(ACTIVE_PLUGIN_VERSION_LOCK_RETRY_SLEEP);
}
Err(err) => return Err(err.into()),
}
}
Err(io::Error::new(
io::ErrorKind::WouldBlock,
format!(
"could not acquire active plugin version marker lock: {}",
lock_path.display()
),
))
}
fn install_plugin_version_into_existing_base(
source: &Path,
plugin_base_root: &Path,
plugin_version: &str,
) -> Result<(), PluginStoreError> {
let Some(parent) = plugin_base_root.parent() else {
return Err(PluginStoreError::Invalid(format!(
"plugin cache path has no parent: {}",
plugin_base_root.display()
)));
};
fs::create_dir_all(plugin_base_root)
.map_err(|err| PluginStoreError::io("failed to create plugin cache directory", err))?;
let staged_dir = tempfile::Builder::new()
.prefix("plugin-install-")
.tempdir_in(parent)
.map_err(|err| {
PluginStoreError::io("failed to create temporary plugin cache directory", err)
})?;
let staged_version_root = staged_dir.path().join(plugin_version);
copy_dir_recursive(source, &staged_version_root)?;
fs::rename(&staged_version_root, plugin_base_root.join(plugin_version)).map_err(|err| {
PluginStoreError::io("failed to activate plugin cache version entry", err)
})?;
Ok(())
}
fn cleanup_inactive_versions_with_remover<F>(
plugin_id: &PluginId,
plugin_base_root: &Path,
mut remove_dir_all: F,
) where
F: FnMut(&Path) -> io::Result<()>,
{
let active_version = match active_plugin_version_marker(plugin_base_root) {
Ok(Some(active_version)) => Some(active_version),
Ok(None) => legacy_active_plugin_version(plugin_base_root),
Err(err) => {
warn!(
plugin = %plugin_id.as_key(),
path = %plugin_base_root.display(),
error = %err,
"failed to read active plugin version marker while cleaning inactive versions"
);
None
}
};
let Some(active_version) = active_version else {
return;
};
let entries = match fs::read_dir(plugin_base_root) {
Ok(entries) => entries,
Err(err) => {
warn!(
plugin = %plugin_id.as_key(),
path = %plugin_base_root.display(),
error = %err,
"failed to read plugin cache while cleaning inactive versions"
);
return;
}
};
for entry in entries {
let entry = match entry {
Ok(entry) => entry,
Err(err) => {
warn!(
plugin = %plugin_id.as_key(),
path = %plugin_base_root.display(),
error = %err,
"failed to enumerate plugin cache while cleaning inactive versions"
);
continue;
}
};
let Ok(file_type) = entry.file_type() else {
continue;
};
if !file_type.is_dir() {
continue;
}
let Ok(version) = entry.file_name().into_string() else {
continue;
};
if version == active_version || validate_plugin_version_segment(&version).is_err() {
continue;
}
let path = entry.path();
if let Err(err) = remove_dir_all(&path) {
warn!(
plugin = %plugin_id.as_key(),
plugin_version = %version,
path = %path.display(),
error = %err,
"failed to remove inactive plugin cache version"
);
}
}
}
fn replace_plugin_root_atomically(
source: &Path,
target_root: &Path,

View File

@@ -144,11 +144,112 @@ fn install_with_version_uses_requested_cache_version() {
result,
PluginInstallResult {
plugin_id,
plugin_version,
plugin_version: plugin_version.clone(),
installed_path: AbsolutePathBuf::try_from(installed_path.clone()).unwrap(),
}
);
assert!(installed_path.join(".codex-plugin/plugin.json").is_file());
assert_eq!(
std::fs::read_to_string(
tmp.path()
.join("plugins/cache/openai-curated/sample-plugin/.active-version"),
)
.unwrap(),
format!("{plugin_version}\n")
);
assert!(
tmp.path()
.join("plugins/cache/openai-curated/sample-plugin/.active-version.lock")
.is_file()
);
}
#[test]
fn install_with_existing_version_reuses_cache_directory() {
let tmp = tempdir().unwrap();
write_plugin(tmp.path(), "source-one", "sample-plugin");
write_plugin(tmp.path(), "source-two", "sample-plugin");
fs::write(tmp.path().join("source-two/new-file"), "new").unwrap();
let plugin_id =
PluginId::new("sample-plugin".to_string(), "openai-curated".to_string()).unwrap();
let plugin_version = "0123456789abcdef".to_string();
let store = PluginStore::new(tmp.path().to_path_buf());
let result = store
.install_with_version(
AbsolutePathBuf::try_from(tmp.path().join("source-one")).unwrap(),
plugin_id.clone(),
plugin_version.clone(),
)
.unwrap();
fs::write(result.installed_path.as_path().join("sentinel"), "old").unwrap();
let reinstall_result = store
.install_with_version(
AbsolutePathBuf::try_from(tmp.path().join("source-two")).unwrap(),
plugin_id.clone(),
plugin_version.clone(),
)
.unwrap();
assert_eq!(reinstall_result, result);
assert_eq!(
fs::read_to_string(result.installed_path.as_path().join("sentinel")).unwrap(),
"old"
);
assert!(!result.installed_path.as_path().join("new-file").exists());
assert_eq!(
store.active_plugin_version(&plugin_id),
Some(plugin_version)
);
}
#[test]
fn install_with_version_adds_new_version_without_deleting_old_version() {
let tmp = tempdir().unwrap();
write_plugin(tmp.path(), "source-one", "sample-plugin");
write_plugin(tmp.path(), "source-two", "sample-plugin");
let plugin_id =
PluginId::new("sample-plugin".to_string(), "openai-curated".to_string()).unwrap();
let store = PluginStore::new(tmp.path().to_path_buf());
store
.install_with_version(
AbsolutePathBuf::try_from(tmp.path().join("source-one")).unwrap(),
plugin_id.clone(),
"1.0.0".to_string(),
)
.unwrap();
store
.install_with_version(
AbsolutePathBuf::try_from(tmp.path().join("source-two")).unwrap(),
plugin_id.clone(),
"2.0.0".to_string(),
)
.unwrap();
assert!(
tmp.path()
.join("plugins/cache/openai-curated/sample-plugin/1.0.0")
.is_dir()
);
assert!(
tmp.path()
.join("plugins/cache/openai-curated/sample-plugin/2.0.0")
.is_dir()
);
assert_eq!(
std::fs::read_to_string(
tmp.path()
.join("plugins/cache/openai-curated/sample-plugin/.active-version"),
)
.unwrap(),
"2.0.0\n"
);
assert_eq!(
store.active_plugin_version(&plugin_id),
Some("2.0.0".to_string())
);
}
#[test]
@@ -203,6 +304,34 @@ fn install_rejects_blank_manifest_version() {
);
}
#[test]
fn active_plugin_version_prefers_active_marker_over_sorted_versions() {
let tmp = tempdir().unwrap();
write_plugin(
&tmp.path().join("plugins/cache/debug"),
"sample-plugin/1.0.0",
"sample-plugin",
);
write_plugin(
&tmp.path().join("plugins/cache/debug"),
"sample-plugin/9.0.0",
"sample-plugin",
);
fs::write(
tmp.path()
.join("plugins/cache/debug/sample-plugin/.active-version"),
"1.0.0\n",
)
.unwrap();
let store = PluginStore::new(tmp.path().to_path_buf());
let plugin_id = PluginId::new("sample-plugin".to_string(), "debug".to_string()).unwrap();
assert_eq!(
store.active_plugin_version(&plugin_id),
Some("1.0.0".to_string())
);
}
#[test]
fn active_plugin_version_reads_version_directory_name() {
let tmp = tempdir().unwrap();
@@ -224,6 +353,77 @@ fn active_plugin_version_reads_version_directory_name() {
);
}
#[test]
fn cleanup_inactive_versions_removes_versions_except_active_marker_version() {
let tmp = tempdir().unwrap();
write_plugin(
&tmp.path().join("plugins/cache/debug"),
"sample-plugin/1.0.0",
"sample-plugin",
);
write_plugin(
&tmp.path().join("plugins/cache/debug"),
"sample-plugin/2.0.0",
"sample-plugin",
);
fs::write(
tmp.path()
.join("plugins/cache/debug/sample-plugin/.active-version"),
"2.0.0\n",
)
.unwrap();
let store = PluginStore::new(tmp.path().to_path_buf());
let plugin_id = PluginId::new("sample-plugin".to_string(), "debug".to_string()).unwrap();
store.cleanup_inactive_versions(&plugin_id);
assert!(
!tmp.path()
.join("plugins/cache/debug/sample-plugin/1.0.0")
.exists()
);
assert!(
tmp.path()
.join("plugins/cache/debug/sample-plugin/2.0.0")
.is_dir()
);
}
#[test]
fn cleanup_inactive_versions_logs_and_continues_after_remove_failures() {
let tmp = tempdir().unwrap();
let plugin_base_root = tmp.path().join("plugins/cache/debug/sample-plugin");
write_plugin(
&tmp.path().join("plugins/cache/debug"),
"sample-plugin/1.0.0",
"sample-plugin",
);
write_plugin(
&tmp.path().join("plugins/cache/debug"),
"sample-plugin/2.0.0",
"sample-plugin",
);
write_plugin(
&tmp.path().join("plugins/cache/debug"),
"sample-plugin/3.0.0",
"sample-plugin",
);
fs::write(plugin_base_root.join(".active-version"), "3.0.0\n").unwrap();
let plugin_id = PluginId::new("sample-plugin".to_string(), "debug".to_string()).unwrap();
cleanup_inactive_versions_with_remover(&plugin_id, &plugin_base_root, |path| {
if path.ends_with("1.0.0") {
Err(io::Error::new(io::ErrorKind::PermissionDenied, "held open"))
} else {
fs::remove_dir_all(path)
}
});
assert!(plugin_base_root.join("1.0.0").is_dir());
assert!(!plugin_base_root.join("2.0.0").exists());
assert!(plugin_base_root.join("3.0.0").is_dir());
}
#[test]
fn active_plugin_version_prefers_default_local_version_when_multiple_versions_exist() {
let tmp = tempdir().unwrap();

View File

@@ -267,6 +267,9 @@ impl Session {
std::mem::replace(&mut *manager, refreshed_manager)
};
old_manager.shutdown().await;
self.services
.plugins_manager
.cleanup_inactive_plugin_versions_for_config(&config.plugins_config_input());
}
pub(crate) async fn refresh_mcp_servers_if_requested(&self, turn_context: &TurnContext) {