Fix plugin manager move parity issues

This commit is contained in:
xli-oai
2026-04-29 23:08:18 -07:00
parent bd3f4edd48
commit 0faefd1327
8 changed files with 282 additions and 66 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -2504,7 +2504,6 @@ version = "0.0.0"
dependencies = [
"anyhow",
"chrono",
"codex-analytics",
"codex-app-server-protocol",
"codex-config",
"codex-core-skills",

View File

@@ -3,6 +3,9 @@ use crate::error_code::internal_error;
use crate::error_code::invalid_request;
use codex_app_server_protocol::PluginInstallPolicy;
use codex_core::config::edit::ConfigEditsBuilder;
use codex_core_plugins::loader::installed_plugin_telemetry_metadata;
use codex_core_plugins::loader::plugin_telemetry_metadata_from_root;
use codex_core_plugins::store::PluginStore;
impl CodexMessageProcessor {
pub(super) async fn plugin_list(
@@ -488,6 +491,9 @@ impl CodexMessageProcessor {
.map_err(|err| {
internal_error(format!("failed to persist installed plugin config: {err}"))
})?;
self.analytics_events_client.track_plugin_installed(
plugin_telemetry_metadata_from_root(&result.plugin_id, &result.installed_path).await,
);
let config = match self.load_latest_config(config_cwd).await {
Ok(config) => config,
Err(err) => {
@@ -710,6 +716,19 @@ impl CodexMessageProcessor {
if !plugin_id.is_empty() && is_valid_remote_plugin_id(&plugin_id) {
return self.remote_plugin_uninstall_response(plugin_id).await;
}
let parsed_plugin_id = codex_core::plugins::PluginId::parse(&plugin_id)
.map_err(|err| invalid_request(err.to_string()))?;
let plugin_telemetry =
match PluginStore::try_new(self.config.codex_home.as_path().to_path_buf()) {
Ok(store) if store.active_plugin_root(&parsed_plugin_id).is_some() => Some(
installed_plugin_telemetry_metadata(
self.config.codex_home.as_path(),
&parsed_plugin_id,
)
.await,
),
Ok(_) | Err(_) => None,
};
let plugins_manager = self.thread_manager.plugins_manager();
plugins_manager
@@ -721,6 +740,10 @@ impl CodexMessageProcessor {
.apply()
.await
.map_err(|err| internal_error(format!("failed to clear plugin config: {err}")))?;
if let Some(plugin_telemetry) = plugin_telemetry {
self.analytics_events_client
.track_plugin_uninstalled(plugin_telemetry);
}
match self.load_latest_config(/*fallback_cwd*/ None).await {
Ok(config) => self.on_effective_plugins_changed(config),
Err(err) => {

View File

@@ -292,9 +292,6 @@ impl MessageProcessor {
environment_manager,
Some(analytics_events_client.clone()),
));
thread_manager
.plugins_manager()
.set_analytics_events_client(analytics_events_client.clone());
let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
auth_manager: auth_manager.clone(),

View File

@@ -12,6 +12,8 @@ use toml_edit::Item as TomlItem;
use toml_edit::Table as TomlTable;
use toml_edit::value;
use codex_utils_path::write_atomically;
use crate::AppToolApproval;
use crate::CONFIG_TOML_FILE;
use crate::McpServerConfig;
@@ -117,8 +119,7 @@ impl ConfigEditsBuilder {
for edit in &self.plugin_edits {
apply_plugin_config_edit(&mut doc, edit);
}
fs::create_dir_all(&self.codex_home)?;
fs::write(config_path, doc.to_string())
write_atomically(&config_path, &doc.to_string())
}
}

View File

@@ -14,7 +14,6 @@ workspace = true
[dependencies]
anyhow = { workspace = true }
codex-analytics = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-config = { workspace = true }
codex-core-skills = { workspace = true }

View File

@@ -170,13 +170,7 @@ pub fn remote_installed_plugins_to_config(
return None;
}
};
// Remote installed refresh materializes bundles before publishing state. Keep this
// version check so partial refresh failures do not make missing or stale local bundles
// effective.
let release_version = plugin.release_version.as_deref()?;
if store.active_plugin_version(&plugin_id).as_deref() != Some(release_version) {
return None;
}
store.active_plugin_root(&plugin_id)?;
Some((
plugin_id.as_key(),
PluginConfig {

View File

@@ -3,14 +3,12 @@ use crate::PluginLoadOutcome;
use crate::installed_marketplaces::installed_marketplace_roots_from_layer_stack;
use crate::loader::configured_curated_plugin_ids_from_codex_home;
use crate::loader::curated_plugin_cache_version;
use crate::loader::installed_plugin_telemetry_metadata;
use crate::loader::load_plugin_apps;
use crate::loader::load_plugin_mcp_servers;
use crate::loader::load_plugin_skills;
use crate::loader::load_plugins_from_layer_stack;
use crate::loader::log_plugin_load_errors;
use crate::loader::materialize_marketplace_plugin_source;
use crate::loader::plugin_telemetry_metadata_from_root;
use crate::loader::refresh_curated_plugin_cache;
use crate::loader::refresh_non_curated_plugin_cache;
use crate::loader::refresh_non_curated_plugin_cache_force_reinstall;
@@ -48,7 +46,7 @@ use crate::startup_sync::sync_openai_plugins_repo;
use crate::store::PluginInstallResult as StorePluginInstallResult;
use crate::store::PluginStore;
use crate::store::PluginStoreError;
use codex_analytics::AnalyticsEventsClient;
use crate::store::validate_plugin_version_segment;
use codex_config::ConfigEditsBuilder;
use codex_config::ConfigLayerStack;
use codex_config::types::PluginConfig;
@@ -358,12 +356,12 @@ pub struct PluginsManager {
remote_installed_plugins_cache_refresh_state: RwLock<RemoteInstalledPluginsCacheRefreshState>,
remote_sync_lock: Semaphore,
restriction_product: Option<Product>,
analytics_events_client: RwLock<Option<AnalyticsEventsClient>>,
}
#[derive(Clone)]
struct CachedPluginLoadOutcome {
config_version: String,
remote_plugins_enabled: bool,
plugin_hooks_enabled: bool,
outcome: PluginLoadOutcome,
}
@@ -399,18 +397,9 @@ impl PluginsManager {
),
remote_sync_lock: Semaphore::new(/*permits*/ 1),
restriction_product,
analytics_events_client: RwLock::new(None),
}
}
pub fn set_analytics_events_client(&self, analytics_events_client: AnalyticsEventsClient) {
let mut stored_client = match self.analytics_events_client.write() {
Ok(client_guard) => client_guard,
Err(err) => err.into_inner(),
};
*stored_client = Some(analytics_events_client);
}
fn restriction_product_matches(&self, products: Option<&[Product]>) -> bool {
match products {
None => true,
@@ -452,8 +441,11 @@ impl PluginsManager {
let config_version = version_for_toml(&config_layer_stack.effective_config());
if !force_reload
&& let Some(outcome) =
self.cached_enabled_outcome(&config_version, plugin_hooks_enabled)
&& let Some(outcome) = self.cached_enabled_outcome(
&config_version,
remote_plugins_enabled,
plugin_hooks_enabled,
)
{
return outcome;
}
@@ -473,6 +465,7 @@ impl PluginsManager {
};
*cache = Some(CachedPluginLoadOutcome {
config_version,
remote_plugins_enabled,
plugin_hooks_enabled,
outcome: outcome.clone(),
});
@@ -538,6 +531,7 @@ impl PluginsManager {
fn cached_enabled_outcome(
&self,
config_version: &str,
remote_plugins_enabled: bool,
plugin_hooks_enabled: bool,
) -> Option<PluginLoadOutcome> {
match self.cached_enabled_outcome.read() {
@@ -545,6 +539,7 @@ impl PluginsManager {
.as_ref()
.filter(|cached| {
cached.config_version == config_version
&& cached.remote_plugins_enabled == remote_plugins_enabled
&& cached.plugin_hooks_enabled == plugin_hooks_enabled
})
.map(|cached| cached.outcome.clone()),
@@ -553,6 +548,7 @@ impl PluginsManager {
.as_ref()
.filter(|cached| {
cached.config_version == config_version
&& cached.remote_plugins_enabled == remote_plugins_enabled
&& cached.plugin_hooks_enabled == plugin_hooks_enabled
})
.map(|cached| cached.outcome.clone()),
@@ -828,17 +824,6 @@ impl PluginsManager {
.await
.map_err(PluginInstallError::join)??;
let analytics_events_client = match self.analytics_events_client.read() {
Ok(client) => client.clone(),
Err(err) => err.into_inner().clone(),
};
if let Some(analytics_events_client) = analytics_events_client {
analytics_events_client.track_plugin_installed(
plugin_telemetry_metadata_from_root(&result.plugin_id, &result.installed_path)
.await,
);
}
Ok(PluginInstallOutcome {
plugin_id: result.plugin_id,
plugin_version: result.plugin_version,
@@ -874,27 +859,12 @@ impl PluginsManager {
}
async fn uninstall_plugin_id(&self, plugin_id: PluginId) -> Result<(), PluginUninstallError> {
let plugin_telemetry = if self.store.active_plugin_root(&plugin_id).is_some() {
Some(installed_plugin_telemetry_metadata(self.codex_home.as_path(), &plugin_id).await)
} else {
None
};
let store = self.store.clone();
let plugin_id_for_store = plugin_id.clone();
tokio::task::spawn_blocking(move || store.uninstall(&plugin_id_for_store))
.await
.map_err(PluginUninstallError::join)??;
let analytics_events_client = match self.analytics_events_client.read() {
Ok(client) => client.clone(),
Err(err) => err.into_inner().clone(),
};
if let Some(plugin_telemetry) = plugin_telemetry
&& let Some(analytics_events_client) = analytics_events_client
{
analytics_events_client.track_plugin_uninstalled(plugin_telemetry);
}
Ok(())
}
@@ -1708,18 +1678,112 @@ impl PluginsManager {
) -> Result<bool, RemotePluginCatalogError> {
let installed_plugins =
crate::remote::fetch_remote_installed_plugins(service_config, auth).await?;
let previous_plugins_by_key = {
let cache = match self.remote_installed_plugins_cache.read() {
Ok(cache) => cache,
Err(err) => err.into_inner(),
};
cache
.as_ref()
.map(|plugins| {
plugins
.iter()
.map(|plugin| {
(
(
plugin.marketplace_name.clone(),
plugin.id.clone(),
plugin.name.clone(),
),
plugin.clone(),
)
})
.collect::<HashMap<_, _>>()
})
.unwrap_or_default()
};
let mut bundles_changed = false;
let mut publishable_plugins = Vec::new();
for plugin in installed_plugins {
let previous_plugin = previous_plugins_by_key
.get(&(
plugin.marketplace_name.clone(),
plugin.id.clone(),
plugin.name.clone(),
))
.cloned();
let plugin_id =
match PluginId::new(plugin.name.clone(), plugin.marketplace_name.clone()) {
Ok(plugin_id) => plugin_id,
Err(err) => {
warn!(
remote_plugin_id = %plugin.id,
marketplace = %plugin.marketplace_name,
plugin = %plugin.name,
error = %err,
"ignoring remote installed plugin with invalid local plugin id"
);
continue;
}
};
let release_version = plugin
.release_version
.as_deref()
.map(str::trim)
.filter(|version| !version.is_empty());
let Some(release_version) = release_version else {
if self.store.active_plugin_root(&plugin_id).is_some() {
publishable_plugins.push(plugin);
} else if let Some(mut previous_plugin) = previous_plugin {
previous_plugin.enabled = plugin.enabled;
publishable_plugins.push(previous_plugin);
} else {
warn!(
remote_plugin_id = %plugin.id,
marketplace = %plugin.marketplace_name,
plugin = %plugin.name,
"remote installed plugin is missing release metadata and no local bundle is available"
);
}
continue;
};
if let Err(message) = validate_plugin_version_segment(release_version) {
if let Some(mut previous_plugin) = previous_plugin {
previous_plugin.enabled = plugin.enabled;
publishable_plugins.push(previous_plugin);
}
warn!(
remote_plugin_id = %plugin.id,
marketplace = %plugin.marketplace_name,
plugin = %plugin.name,
version = %release_version,
error = %message,
"ignoring remote installed plugin with invalid release version"
);
continue;
}
if self.store.active_plugin_version(&plugin_id).as_deref() == Some(release_version) {
publishable_plugins.push(plugin);
continue;
}
let validated_bundle = match validate_remote_plugin_bundle(
&plugin.id,
&plugin.marketplace_name,
&plugin.name,
plugin.release_version.as_deref(),
Some(release_version),
plugin.bundle_download_url.as_deref(),
) {
Ok(bundle) => bundle,
Err(err) => {
if let Some(mut previous_plugin) = previous_plugin {
previous_plugin.enabled = plugin.enabled;
publishable_plugins.push(previous_plugin);
}
warn!(
remote_plugin_id = %plugin.id,
marketplace = %plugin.marketplace_name,
@@ -1730,14 +1794,6 @@ impl PluginsManager {
continue;
}
};
let plugin_id = validated_bundle.plugin_id.clone();
let plugin_version = validated_bundle.plugin_version.clone();
if self.store.active_plugin_version(&plugin_id).as_deref()
== Some(plugin_version.as_str())
{
publishable_plugins.push(plugin);
continue;
}
match download_and_install_remote_plugin_bundle(
self.codex_home.clone(),
validated_bundle,
@@ -1755,6 +1811,10 @@ impl PluginsManager {
publishable_plugins.push(plugin);
}
Err(err) => {
if let Some(mut previous_plugin) = previous_plugin {
previous_plugin.enabled = plugin.enabled;
publishable_plugins.push(previous_plugin);
}
warn!(
remote_plugin_id = %plugin.id,
marketplace = %plugin.marketplace_name,

View File

@@ -595,6 +595,46 @@ remote_plugin = true
assert_eq!(outcome.plugins()[0].config_name, "linear@chatgpt-global");
}
#[tokio::test]
async fn remote_installed_cache_reloads_when_remote_feature_enablement_changes() {
let codex_home = TempDir::new().unwrap();
let plugin_base = codex_home
.path()
.join("plugins/cache/chatgpt-global/linear");
write_plugin(&plugin_base, "local", "linear");
write_file(
&codex_home.path().join(CONFIG_TOML_FILE),
r#"[features]
plugins = true
remote_plugin = true
"#,
);
let config = load_config(codex_home.path(), codex_home.path()).await;
let manager = PluginsManager::new(codex_home.path().to_path_buf());
manager.write_remote_installed_plugins_cache(vec![RemoteInstalledPlugin {
marketplace_name: "chatgpt-global".to_string(),
id: "plugins~Plugin_linear".to_string(),
name: "linear".to_string(),
enabled: true,
release_version: Some("local".to_string()),
bundle_download_url: Some("https://example.com/linear.tar.gz".to_string()),
}]);
let enabled_outcome = manager.plugins_for_test_config(&config).await;
assert_eq!(enabled_outcome.plugins().len(), 1);
let disabled_outcome = PluginsManager::plugins_for_config(
&manager,
&config.config_layer_stack,
config.plugins_enabled,
/*remote_plugins_enabled*/ false,
config.plugin_hooks_enabled,
)
.await;
assert_eq!(disabled_outcome, PluginLoadOutcome::default());
}
#[tokio::test]
async fn remote_installed_cache_ignores_plugins_missing_local_cache() {
let codex_home = TempDir::new().unwrap();
@@ -622,8 +662,11 @@ remote_plugin = true
}
#[tokio::test]
async fn remote_installed_cache_ignores_plugins_with_stale_local_version() {
async fn remote_installed_cache_uses_available_local_cache_without_version_gate() {
let codex_home = TempDir::new().unwrap();
let linear_root = codex_home
.path()
.join("plugins/cache/chatgpt-global/linear/1.0.0");
write_plugin(
&codex_home.path().join("plugins/cache/chatgpt-global"),
"linear/1.0.0",
@@ -649,7 +692,51 @@ remote_plugin = true
}]);
let outcome = manager.plugins_for_test_config(&config).await;
assert_eq!(outcome, PluginLoadOutcome::default());
assert_eq!(
outcome.effective_skill_roots(),
vec![AbsolutePathBuf::try_from(linear_root.join("skills")).unwrap()]
);
assert_eq!(outcome.plugins().len(), 1);
assert_eq!(outcome.plugins()[0].config_name, "linear@chatgpt-global");
}
#[tokio::test]
async fn remote_installed_cache_uses_available_local_cache_without_release_metadata() {
let codex_home = TempDir::new().unwrap();
let linear_root = codex_home
.path()
.join("plugins/cache/chatgpt-global/linear/local");
write_plugin(
&codex_home.path().join("plugins/cache/chatgpt-global"),
"linear/local",
"linear",
);
write_file(
&codex_home.path().join(CONFIG_TOML_FILE),
r#"[features]
plugins = true
remote_plugin = true
"#,
);
let config = load_config(codex_home.path(), codex_home.path()).await;
let manager = PluginsManager::new(codex_home.path().to_path_buf());
manager.write_remote_installed_plugins_cache(vec![RemoteInstalledPlugin {
marketplace_name: "chatgpt-global".to_string(),
id: "plugins~Plugin_linear".to_string(),
name: "linear".to_string(),
enabled: true,
release_version: None,
bundle_download_url: None,
}]);
let outcome = manager.plugins_for_test_config(&config).await;
assert_eq!(
outcome.effective_skill_roots(),
vec![AbsolutePathBuf::try_from(linear_root.join("skills")).unwrap()]
);
assert_eq!(outcome.plugins().len(), 1);
assert_eq!(outcome.plugins()[0].config_name, "linear@chatgpt-global");
}
async fn mount_remote_installed_plugin_pages(
@@ -808,6 +895,62 @@ remote_plugin = true
assert_eq!(outcome, PluginLoadOutcome::default());
}
#[tokio::test]
async fn remote_installed_plugins_cache_refresh_preserves_previous_bundle_when_upgrade_unavailable()
{
let tmp = tempfile::tempdir().unwrap();
let linear_root = tmp.path().join("plugins/cache/chatgpt-global/linear/1.0.0");
write_plugin(
&tmp.path().join("plugins/cache/chatgpt-global"),
"linear/1.0.0",
"linear",
);
write_file(
&tmp.path().join(CONFIG_TOML_FILE),
r#"[features]
plugins = true
remote_plugin = true
"#,
);
let server = MockServer::start().await;
mount_remote_installed_plugin_pages(
&server,
&remote_installed_plugin_json_with_release(
"linear", /*enabled*/ true, "2.0.0", /*bundle_download_url*/ None,
),
"",
)
.await;
let config = load_config(tmp.path(), tmp.path()).await;
let manager = PluginsManager::new(tmp.path().to_path_buf());
manager.write_remote_installed_plugins_cache(vec![RemoteInstalledPlugin {
marketplace_name: "chatgpt-global".to_string(),
id: "plugins~Plugin_linear".to_string(),
name: "linear".to_string(),
enabled: true,
release_version: Some("1.0.0".to_string()),
bundle_download_url: Some("https://example.com/linear-1.0.0.tar.gz".to_string()),
}]);
let changed = manager
.refresh_remote_installed_plugins_cache(
&remote_plugin_service_config(&format!("{}/backend-api/", server.uri())),
Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()),
)
.await
.unwrap();
assert!(!changed);
let outcome = manager.plugins_for_test_config(&config).await;
assert_eq!(
outcome.effective_skill_roots(),
vec![AbsolutePathBuf::try_from(linear_root.join("skills")).unwrap()]
);
assert_eq!(outcome.plugins().len(), 1);
assert_eq!(outcome.plugins()[0].config_name, "linear@chatgpt-global");
}
#[tokio::test]
async fn remote_installed_plugins_cache_refresh_reconciles_cached_bundles_without_config_writes() {
let tmp = tempfile::tempdir().unwrap();