mirror of
https://github.com/openai/codex.git
synced 2026-05-24 13:04:29 +00:00
## Why

Config loading had become split across crates: `codex-config` owned the config types and merge logic, while `codex-core` still owned the loader that assembled the layer stack. This change consolidates that responsibility in `codex-config`, so the crate that defines config behavior also owns how configs are discovered and loaded.

To make that move possible without reintroducing the old dependency cycle, the shell-environment policy types and helpers that `codex-exec-server` needs now live in `codex-protocol` instead of flowing through `codex-config`.

This also makes the migrated loader tests more deterministic on machines that already have managed or system Codex config installed by letting tests override the system config and requirements paths instead of reading the host's `/etc/codex`.

## What Changed

- moved the config loader implementation from `codex-core` into `codex-config::loader` and deleted the old `core::config_loader` module instead of leaving a compatibility shim
- moved shell-environment policy types and helpers into `codex-protocol`, then updated `codex-exec-server` and other downstream crates to import them from their new home
- updated downstream callers to use loader/config APIs from `codex-config`
- added test-only loader overrides for system config and requirements paths so loader-focused tests do not depend on host-managed config state
- cleaned up now-unused dependency entries and platform-specific cfgs that were surfaced by post-push CI

## Testing

- `cargo test -p codex-config`
- `cargo test -p codex-core config_loader_tests::`
- `cargo test -p codex-protocol -p codex-exec-server -p codex-cloud-requirements -p codex-rmcp-client --lib`
- `cargo test --lib -p codex-app-server-client -p codex-exec`
- `cargo test --no-run --lib -p codex-app-server`
- `cargo test -p codex-linux-sandbox --lib`
- `cargo shear`
- `just bazel-lock-check`

## Notes

- I did not chase unrelated full-suite failures outside the migrated loader surface.
- `cargo test -p codex-core --lib` still hits unrelated proxy-sensitive failures on this machine, and Windows CI still shows unrelated long-running/timeouting test noise outside the loader migration itself.
1593 lines
58 KiB
Rust
1593 lines
58 KiB
Rust
use super::PluginLoadOutcome;
|
|
use super::startup_sync::start_startup_remote_plugin_sync_once;
|
|
use crate::SkillMetadata;
|
|
use crate::config::Config;
|
|
use crate::config::edit::ConfigEdit;
|
|
use crate::config::edit::ConfigEditsBuilder;
|
|
use codex_analytics::AnalyticsEventsClient;
|
|
use codex_config::ConfigLayerStack;
|
|
use codex_config::types::PluginConfig;
|
|
use codex_core_plugins::OPENAI_CURATED_MARKETPLACE_NAME;
|
|
use codex_core_plugins::installed_marketplaces::installed_marketplace_roots_from_layer_stack;
|
|
use codex_core_plugins::loader::configured_curated_plugin_ids_from_codex_home;
|
|
use codex_core_plugins::loader::curated_plugin_cache_version;
|
|
use codex_core_plugins::loader::installed_plugin_telemetry_metadata;
|
|
use codex_core_plugins::loader::load_plugin_apps;
|
|
use codex_core_plugins::loader::load_plugin_mcp_servers;
|
|
use codex_core_plugins::loader::load_plugin_skills;
|
|
use codex_core_plugins::loader::load_plugins_from_layer_stack;
|
|
use codex_core_plugins::loader::log_plugin_load_errors;
|
|
use codex_core_plugins::loader::materialize_marketplace_plugin_source;
|
|
use codex_core_plugins::loader::plugin_telemetry_metadata_from_root;
|
|
use codex_core_plugins::loader::refresh_curated_plugin_cache;
|
|
use codex_core_plugins::loader::refresh_non_curated_plugin_cache;
|
|
use codex_core_plugins::loader::refresh_non_curated_plugin_cache_force_reinstall;
|
|
use codex_core_plugins::manifest::PluginManifestInterface;
|
|
use codex_core_plugins::manifest::load_plugin_manifest;
|
|
use codex_core_plugins::marketplace::MarketplaceError;
|
|
use codex_core_plugins::marketplace::MarketplaceInterface;
|
|
use codex_core_plugins::marketplace::MarketplaceListError;
|
|
use codex_core_plugins::marketplace::MarketplacePluginAuthPolicy;
|
|
use codex_core_plugins::marketplace::MarketplacePluginPolicy;
|
|
use codex_core_plugins::marketplace::MarketplacePluginSource;
|
|
use codex_core_plugins::marketplace::ResolvedMarketplacePlugin;
|
|
use codex_core_plugins::marketplace::find_installable_marketplace_plugin;
|
|
use codex_core_plugins::marketplace::find_marketplace_plugin;
|
|
use codex_core_plugins::marketplace::list_marketplaces;
|
|
use codex_core_plugins::marketplace::load_marketplace;
|
|
use codex_core_plugins::marketplace::plugin_interface_with_marketplace_category;
|
|
use codex_core_plugins::marketplace_upgrade::ConfiguredMarketplaceUpgradeError;
|
|
use codex_core_plugins::marketplace_upgrade::ConfiguredMarketplaceUpgradeOutcome;
|
|
use codex_core_plugins::marketplace_upgrade::configured_git_marketplace_names;
|
|
use codex_core_plugins::marketplace_upgrade::upgrade_configured_git_marketplaces;
|
|
use codex_core_plugins::remote::RemotePluginServiceConfig;
|
|
use codex_core_plugins::remote_legacy::RemotePluginFetchError;
|
|
use codex_core_plugins::remote_legacy::RemotePluginMutationError;
|
|
use codex_core_plugins::startup_sync::curated_plugins_repo_path;
|
|
use codex_core_plugins::startup_sync::read_curated_plugins_sha;
|
|
use codex_core_plugins::startup_sync::sync_openai_plugins_repo;
|
|
use codex_core_plugins::store::PluginInstallResult as StorePluginInstallResult;
|
|
use codex_core_plugins::store::PluginStore;
|
|
use codex_core_plugins::store::PluginStoreError;
|
|
use codex_features::Feature;
|
|
use codex_login::AuthManager;
|
|
use codex_login::CodexAuth;
|
|
use codex_plugin::AppConnectorId;
|
|
use codex_plugin::PluginCapabilitySummary;
|
|
use codex_plugin::PluginId;
|
|
use codex_plugin::PluginIdError;
|
|
use codex_plugin::prompt_safe_plugin_description;
|
|
use codex_protocol::protocol::Product;
|
|
use codex_utils_absolute_path::AbsolutePathBuf;
|
|
use std::collections::HashMap;
|
|
use std::collections::HashSet;
|
|
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
use std::sync::RwLock;
|
|
use std::sync::atomic::AtomicBool;
|
|
use std::sync::atomic::Ordering;
|
|
use std::time::Instant;
|
|
use tokio::sync::Semaphore;
|
|
use toml_edit::value;
|
|
use tracing::info;
|
|
use tracing::warn;
|
|
|
|
/// Process-wide flag, initially `false`.
///
/// NOTE(review): presumably flipped once the curated plugin repo sync has been kicked off so it
/// is started at most once per process; the writer is outside this view, confirm there.
static CURATED_REPO_SYNC_STARTED: AtomicBool = AtomicBool::new(false);

/// Lifetime of a cached featured-plugin-id list: three hours.
const FEATURED_PLUGIN_IDS_CACHE_TTL: std::time::Duration =
    std::time::Duration::from_secs(3 * 60 * 60);
|
|
|
|
/// Identity of a featured-plugin-ids cache entry.
///
/// Built by `featured_plugin_ids_cache_key` from the config and the current auth; a cached entry
/// is only served when its key equals the requested one.
#[derive(Clone, Eq, PartialEq)]
struct FeaturedPluginIdsCacheKey {
    /// Copy of `Config::chatgpt_base_url`.
    chatgpt_base_url: String,
    /// Account id reported by the auth, if any.
    account_id: Option<String>,
    /// ChatGPT user id reported by the auth, if any.
    chatgpt_user_id: Option<String>,
    /// Whether the auth reports a workspace account (`false` without auth).
    is_workspace_account: bool,
}
|
|
|
|
#[derive(Clone)]
|
|
struct CachedFeaturedPluginIds {
|
|
key: FeaturedPluginIdsCacheKey,
|
|
expires_at: Instant,
|
|
featured_plugin_ids: Vec<String>,
|
|
}
|
|
|
|
#[derive(Clone, PartialEq, Eq)]
|
|
struct NonCuratedCacheRefreshRequest {
|
|
roots: Vec<AbsolutePathBuf>,
|
|
mode: NonCuratedCacheRefreshMode,
|
|
}
|
|
|
|
/// Strategy selector for a non-curated plugin cache refresh.
///
/// NOTE(review): variant semantics are inferred from their names; the code that interprets them
/// is outside this view.
#[derive(Clone, Copy, Eq, PartialEq)]
enum NonCuratedCacheRefreshMode {
    /// Presumably: refresh only plugins whose version changed.
    IfVersionChanged,
    /// Presumably: reinstall regardless of version.
    ForceReinstall,
}
|
|
|
|
#[derive(Default)]
|
|
struct NonCuratedCacheRefreshState {
|
|
requested: Option<NonCuratedCacheRefreshRequest>,
|
|
last_refreshed: Option<NonCuratedCacheRefreshRequest>,
|
|
in_flight: bool,
|
|
}
|
|
|
|
/// Bookkeeping for configured-marketplace upgrades; defaults to "not running".
#[derive(Default)]
struct ConfiguredMarketplaceUpgradeState {
    /// Presumably `true` while an upgrade is running (the writer is outside this view).
    in_flight: bool,
}
|
|
|
|
fn remote_plugin_service_config(config: &Config) -> RemotePluginServiceConfig {
|
|
RemotePluginServiceConfig {
|
|
chatgpt_base_url: config.chatgpt_base_url.clone(),
|
|
}
|
|
}
|
|
|
|
fn featured_plugin_ids_cache_key(
|
|
config: &Config,
|
|
auth: Option<&CodexAuth>,
|
|
) -> FeaturedPluginIdsCacheKey {
|
|
FeaturedPluginIdsCacheKey {
|
|
chatgpt_base_url: config.chatgpt_base_url.clone(),
|
|
account_id: auth.and_then(CodexAuth::get_account_id),
|
|
chatgpt_user_id: auth.and_then(CodexAuth::get_chatgpt_user_id),
|
|
is_workspace_account: auth.is_some_and(CodexAuth::is_workspace_account),
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
|
pub struct PluginInstallRequest {
|
|
pub plugin_name: String,
|
|
pub marketplace_path: AbsolutePathBuf,
|
|
}
|
|
|
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
|
pub struct PluginReadRequest {
|
|
pub plugin_name: String,
|
|
pub marketplace_path: AbsolutePathBuf,
|
|
}
|
|
|
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
|
pub struct PluginInstallOutcome {
|
|
pub plugin_id: PluginId,
|
|
pub plugin_version: String,
|
|
pub installed_path: AbsolutePathBuf,
|
|
pub auth_policy: MarketplacePluginAuthPolicy,
|
|
}
|
|
|
|
#[derive(Debug, Clone, PartialEq)]
|
|
pub struct PluginReadOutcome {
|
|
pub marketplace_name: String,
|
|
pub marketplace_path: Option<AbsolutePathBuf>,
|
|
pub plugin: PluginDetail,
|
|
}
|
|
|
|
#[derive(Debug, Clone, PartialEq)]
|
|
pub struct PluginDetail {
|
|
pub id: String,
|
|
pub name: String,
|
|
pub description: Option<String>,
|
|
pub source: MarketplacePluginSource,
|
|
pub policy: MarketplacePluginPolicy,
|
|
pub interface: Option<PluginManifestInterface>,
|
|
pub installed: bool,
|
|
pub enabled: bool,
|
|
pub skills: Vec<SkillMetadata>,
|
|
pub disabled_skill_paths: HashSet<AbsolutePathBuf>,
|
|
pub apps: Vec<AppConnectorId>,
|
|
pub mcp_server_names: Vec<String>,
|
|
pub details_unavailable_reason: Option<PluginDetailsUnavailableReason>,
|
|
}
|
|
|
|
/// Why a plugin's details could not be provided.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PluginDetailsUnavailableReason {
    /// The plugin comes from a remote source and has to be installed before details are available.
    InstallRequiredForRemoteSource,
}
|
|
|
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
|
pub struct ConfiguredMarketplace {
|
|
pub name: String,
|
|
pub path: AbsolutePathBuf,
|
|
pub interface: Option<MarketplaceInterface>,
|
|
pub plugins: Vec<ConfiguredMarketplacePlugin>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
|
pub struct ConfiguredMarketplacePlugin {
|
|
pub id: String,
|
|
pub name: String,
|
|
pub source: MarketplacePluginSource,
|
|
pub policy: MarketplacePluginPolicy,
|
|
pub interface: Option<PluginManifestInterface>,
|
|
pub installed: bool,
|
|
pub enabled: bool,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Default, PartialEq, Eq)]
|
|
pub struct ConfiguredMarketplaceListOutcome {
|
|
pub marketplaces: Vec<ConfiguredMarketplace>,
|
|
pub errors: Vec<MarketplaceListError>,
|
|
}
|
|
|
|
impl From<PluginDetail> for PluginCapabilitySummary {
|
|
fn from(value: PluginDetail) -> Self {
|
|
let has_skills = value.skills.iter().any(|skill| {
|
|
!value
|
|
.disabled_skill_paths
|
|
.contains(&skill.path_to_skills_md)
|
|
});
|
|
Self {
|
|
config_name: value.id,
|
|
display_name: value.name,
|
|
description: prompt_safe_plugin_description(value.description.as_deref()),
|
|
has_skills,
|
|
mcp_server_names: value.mcp_server_names,
|
|
app_connector_ids: value.apps,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Summary of the local changes made by a remote plugin sync; empty by default.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct RemotePluginSyncResult {
    /// Plugin ids newly installed into the local plugin cache.
    pub installed_plugin_ids: Vec<String>,
    /// Plugin ids whose local config was changed to enabled.
    pub enabled_plugin_ids: Vec<String>,
    /// Plugin ids whose local config was changed to disabled.
    /// This is not populated by `sync_plugins_from_remote`.
    pub disabled_plugin_ids: Vec<String>,
    /// Plugin ids removed from local cache or plugin config.
    pub uninstalled_plugin_ids: Vec<String>,
}
|
|
|
|
#[derive(Debug, thiserror::Error)]
|
|
pub enum PluginRemoteSyncError {
|
|
#[error("chatgpt authentication required to sync remote plugins")]
|
|
AuthRequired,
|
|
|
|
#[error(
|
|
"chatgpt authentication required to sync remote plugins; api key auth is not supported"
|
|
)]
|
|
UnsupportedAuthMode,
|
|
|
|
#[error("failed to read auth token for remote plugin sync: {0}")]
|
|
AuthToken(#[source] std::io::Error),
|
|
|
|
#[error("failed to send remote plugin sync request to {url}: {source}")]
|
|
Request {
|
|
url: String,
|
|
#[source]
|
|
source: reqwest::Error,
|
|
},
|
|
|
|
#[error("remote plugin sync request to {url} failed with status {status}: {body}")]
|
|
UnexpectedStatus {
|
|
url: String,
|
|
status: reqwest::StatusCode,
|
|
body: String,
|
|
},
|
|
|
|
#[error("failed to parse remote plugin sync response from {url}: {source}")]
|
|
Decode {
|
|
url: String,
|
|
#[source]
|
|
source: serde_json::Error,
|
|
},
|
|
|
|
#[error("local curated marketplace is not available")]
|
|
LocalMarketplaceNotFound,
|
|
|
|
#[error("remote marketplace `{marketplace_name}` is not available locally")]
|
|
UnknownRemoteMarketplace { marketplace_name: String },
|
|
|
|
#[error("duplicate remote plugin `{plugin_name}` in sync response")]
|
|
DuplicateRemotePlugin { plugin_name: String },
|
|
|
|
#[error(
|
|
"remote plugin `{plugin_name}` was not found in local marketplace `{marketplace_name}`"
|
|
)]
|
|
UnknownRemotePlugin {
|
|
plugin_name: String,
|
|
marketplace_name: String,
|
|
},
|
|
|
|
#[error("{0}")]
|
|
InvalidPluginId(#[from] PluginIdError),
|
|
|
|
#[error("{0}")]
|
|
Marketplace(#[from] MarketplaceError),
|
|
|
|
#[error("{0}")]
|
|
Store(#[from] PluginStoreError),
|
|
|
|
#[error("{0}")]
|
|
Config(#[from] anyhow::Error),
|
|
|
|
#[error("failed to join remote plugin sync task: {0}")]
|
|
Join(#[from] tokio::task::JoinError),
|
|
}
|
|
|
|
impl PluginRemoteSyncError {
|
|
fn join(source: tokio::task::JoinError) -> Self {
|
|
Self::Join(source)
|
|
}
|
|
}
|
|
|
|
impl From<RemotePluginFetchError> for PluginRemoteSyncError {
|
|
fn from(value: RemotePluginFetchError) -> Self {
|
|
match value {
|
|
RemotePluginFetchError::AuthRequired => Self::AuthRequired,
|
|
RemotePluginFetchError::UnsupportedAuthMode => Self::UnsupportedAuthMode,
|
|
RemotePluginFetchError::AuthToken(source) => Self::AuthToken(source),
|
|
RemotePluginFetchError::Request { url, source } => Self::Request { url, source },
|
|
RemotePluginFetchError::UnexpectedStatus { url, status, body } => {
|
|
Self::UnexpectedStatus { url, status, body }
|
|
}
|
|
RemotePluginFetchError::Decode { url, source } => Self::Decode { url, source },
|
|
}
|
|
}
|
|
}
|
|
|
|
pub struct PluginsManager {
|
|
codex_home: PathBuf,
|
|
store: PluginStore,
|
|
featured_plugin_ids_cache: RwLock<Option<CachedFeaturedPluginIds>>,
|
|
configured_marketplace_upgrade_state: RwLock<ConfiguredMarketplaceUpgradeState>,
|
|
non_curated_cache_refresh_state: RwLock<NonCuratedCacheRefreshState>,
|
|
cached_enabled_outcome: RwLock<Option<PluginLoadOutcome>>,
|
|
remote_sync_lock: Semaphore,
|
|
restriction_product: Option<Product>,
|
|
analytics_events_client: RwLock<Option<AnalyticsEventsClient>>,
|
|
}
|
|
|
|
impl PluginsManager {
|
|
pub fn new(codex_home: PathBuf) -> Self {
|
|
Self::new_with_restriction_product(codex_home, Some(Product::Codex))
|
|
}
|
|
|
|
pub fn new_with_restriction_product(
|
|
codex_home: PathBuf,
|
|
restriction_product: Option<Product>,
|
|
) -> Self {
|
|
// Product restrictions are enforced at marketplace admission time for a given CODEX_HOME:
|
|
// listing, install, and curated refresh all consult this restriction context before new
|
|
// plugins enter local config or cache. After admission, runtime plugin loading trusts the
|
|
// contents of that CODEX_HOME and does not re-filter configured plugins by product, so
|
|
// already-admitted plugins may continue exposing MCP servers/tools from shared local state.
|
|
//
|
|
// This assumes a single CODEX_HOME is only used by one product.
|
|
Self {
|
|
codex_home: codex_home.clone(),
|
|
store: PluginStore::new(codex_home),
|
|
featured_plugin_ids_cache: RwLock::new(None),
|
|
configured_marketplace_upgrade_state: RwLock::new(
|
|
ConfiguredMarketplaceUpgradeState::default(),
|
|
),
|
|
non_curated_cache_refresh_state: RwLock::new(NonCuratedCacheRefreshState::default()),
|
|
cached_enabled_outcome: RwLock::new(None),
|
|
remote_sync_lock: Semaphore::new(/*permits*/ 1),
|
|
restriction_product,
|
|
analytics_events_client: RwLock::new(None),
|
|
}
|
|
}
|
|
|
|
/// Stores the analytics client used for install/uninstall events, replacing any previous one.
///
/// A poisoned lock is recovered rather than propagated.
pub fn set_analytics_events_client(&self, analytics_events_client: AnalyticsEventsClient) {
    let mut slot = self
        .analytics_events_client
        .write()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    *slot = Some(analytics_events_client);
}
|
|
|
|
/// Decides whether a plugin with the given product restriction list is admissible.
///
/// * no list (`None`): always allowed;
/// * empty list: never allowed;
/// * otherwise: allowed only when this manager has a restriction product and it matches the list.
fn restriction_product_matches(&self, products: Option<&[Product]>) -> bool {
    let Some(allowed) = products else {
        return true;
    };
    if allowed.is_empty() {
        return false;
    }
    match self.restriction_product {
        Some(product) => product.matches_product_restriction(allowed),
        None => false,
    }
}
|
|
|
|
/// Returns the plugin load outcome for `config`, reusing the cached outcome when there is one.
pub async fn plugins_for_config(&self, config: &Config) -> PluginLoadOutcome {
    let force_reload = false;
    self.plugins_for_config_with_force_reload(config, force_reload)
        .await
}
|
|
|
|
pub(crate) async fn plugins_for_config_with_force_reload(
|
|
&self,
|
|
config: &Config,
|
|
force_reload: bool,
|
|
) -> PluginLoadOutcome {
|
|
if !config.features.enabled(Feature::Plugins) {
|
|
return PluginLoadOutcome::default();
|
|
}
|
|
|
|
if !force_reload && let Some(outcome) = self.cached_enabled_outcome() {
|
|
return outcome;
|
|
}
|
|
|
|
let outcome = load_plugins_from_layer_stack(
|
|
&config.config_layer_stack,
|
|
&self.store,
|
|
self.restriction_product,
|
|
)
|
|
.await;
|
|
log_plugin_load_errors(&outcome);
|
|
let mut cache = match self.cached_enabled_outcome.write() {
|
|
Ok(cache) => cache,
|
|
Err(err) => err.into_inner(),
|
|
};
|
|
*cache = Some(outcome.clone());
|
|
outcome
|
|
}
|
|
|
|
pub fn clear_cache(&self) {
|
|
let mut cached_enabled_outcome = match self.cached_enabled_outcome.write() {
|
|
Ok(cache) => cache,
|
|
Err(err) => err.into_inner(),
|
|
};
|
|
let mut featured_plugin_ids_cache = match self.featured_plugin_ids_cache.write() {
|
|
Ok(cache) => cache,
|
|
Err(err) => err.into_inner(),
|
|
};
|
|
*featured_plugin_ids_cache = None;
|
|
*cached_enabled_outcome = None;
|
|
}
|
|
|
|
/// Resolve plugin skill roots for a config layer stack without touching the plugins cache.
|
|
pub async fn effective_skill_roots_for_layer_stack(
|
|
&self,
|
|
config_layer_stack: &ConfigLayerStack,
|
|
plugins_feature_enabled: bool,
|
|
) -> Vec<AbsolutePathBuf> {
|
|
if !plugins_feature_enabled {
|
|
return Vec::new();
|
|
}
|
|
load_plugins_from_layer_stack(config_layer_stack, &self.store, self.restriction_product)
|
|
.await
|
|
.effective_skill_roots()
|
|
}
|
|
|
|
/// Returns a clone of the cached plugin load outcome, if any; a poisoned lock is recovered.
fn cached_enabled_outcome(&self) -> Option<PluginLoadOutcome> {
    let slot = self
        .cached_enabled_outcome
        .read()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    slot.clone()
}
|
|
|
|
fn cached_featured_plugin_ids(
|
|
&self,
|
|
cache_key: &FeaturedPluginIdsCacheKey,
|
|
) -> Option<Vec<String>> {
|
|
{
|
|
let cache = match self.featured_plugin_ids_cache.read() {
|
|
Ok(cache) => cache,
|
|
Err(err) => err.into_inner(),
|
|
};
|
|
let now = Instant::now();
|
|
if let Some(cached) = cache.as_ref()
|
|
&& now < cached.expires_at
|
|
&& cached.key == *cache_key
|
|
{
|
|
return Some(cached.featured_plugin_ids.clone());
|
|
}
|
|
}
|
|
|
|
let mut cache = match self.featured_plugin_ids_cache.write() {
|
|
Ok(cache) => cache,
|
|
Err(err) => err.into_inner(),
|
|
};
|
|
let now = Instant::now();
|
|
if cache
|
|
.as_ref()
|
|
.is_some_and(|cached| now >= cached.expires_at || cached.key != *cache_key)
|
|
{
|
|
*cache = None;
|
|
}
|
|
None
|
|
}
|
|
|
|
fn write_featured_plugin_ids_cache(
|
|
&self,
|
|
cache_key: FeaturedPluginIdsCacheKey,
|
|
featured_plugin_ids: &[String],
|
|
) {
|
|
let mut cache = match self.featured_plugin_ids_cache.write() {
|
|
Ok(cache) => cache,
|
|
Err(err) => err.into_inner(),
|
|
};
|
|
*cache = Some(CachedFeaturedPluginIds {
|
|
key: cache_key,
|
|
expires_at: Instant::now() + FEATURED_PLUGIN_IDS_CACHE_TTL,
|
|
featured_plugin_ids: featured_plugin_ids.to_vec(),
|
|
});
|
|
}
|
|
|
|
pub async fn featured_plugin_ids_for_config(
|
|
&self,
|
|
config: &Config,
|
|
auth: Option<&CodexAuth>,
|
|
) -> Result<Vec<String>, RemotePluginFetchError> {
|
|
if !config.features.enabled(Feature::Plugins) {
|
|
return Ok(Vec::new());
|
|
}
|
|
|
|
let cache_key = featured_plugin_ids_cache_key(config, auth);
|
|
if let Some(featured_plugin_ids) = self.cached_featured_plugin_ids(&cache_key) {
|
|
return Ok(featured_plugin_ids);
|
|
}
|
|
let featured_plugin_ids =
|
|
codex_core_plugins::remote_legacy::fetch_remote_featured_plugin_ids(
|
|
&remote_plugin_service_config(config),
|
|
auth,
|
|
self.restriction_product,
|
|
)
|
|
.await?;
|
|
self.write_featured_plugin_ids_cache(cache_key, &featured_plugin_ids);
|
|
Ok(featured_plugin_ids)
|
|
}
|
|
|
|
pub async fn install_plugin(
|
|
&self,
|
|
request: PluginInstallRequest,
|
|
) -> Result<PluginInstallOutcome, PluginInstallError> {
|
|
let resolved = find_installable_marketplace_plugin(
|
|
&request.marketplace_path,
|
|
&request.plugin_name,
|
|
self.restriction_product,
|
|
)?;
|
|
self.install_resolved_plugin(resolved).await
|
|
}
|
|
|
|
pub async fn install_plugin_with_remote_sync(
|
|
&self,
|
|
config: &Config,
|
|
auth: Option<&CodexAuth>,
|
|
request: PluginInstallRequest,
|
|
) -> Result<PluginInstallOutcome, PluginInstallError> {
|
|
let resolved = find_installable_marketplace_plugin(
|
|
&request.marketplace_path,
|
|
&request.plugin_name,
|
|
self.restriction_product,
|
|
)?;
|
|
let plugin_id = resolved.plugin_id.as_key();
|
|
// This only forwards the backend mutation before the local install flow.
|
|
codex_core_plugins::remote_legacy::enable_remote_plugin(
|
|
&remote_plugin_service_config(config),
|
|
auth,
|
|
&plugin_id,
|
|
)
|
|
.await
|
|
.map_err(PluginInstallError::from)?;
|
|
self.install_resolved_plugin(resolved).await
|
|
}
|
|
|
|
async fn install_resolved_plugin(
|
|
&self,
|
|
resolved: ResolvedMarketplacePlugin,
|
|
) -> Result<PluginInstallOutcome, PluginInstallError> {
|
|
let auth_policy = resolved.policy.authentication;
|
|
let plugin_version =
|
|
if resolved.plugin_id.marketplace_name == OPENAI_CURATED_MARKETPLACE_NAME {
|
|
let curated_plugin_version = read_curated_plugins_sha(self.codex_home.as_path())
|
|
.ok_or_else(|| {
|
|
PluginStoreError::Invalid(
|
|
"local curated marketplace sha is not available".to_string(),
|
|
)
|
|
})?;
|
|
Some(curated_plugin_cache_version(&curated_plugin_version))
|
|
} else {
|
|
None
|
|
};
|
|
let store = self.store.clone();
|
|
let codex_home = self.codex_home.clone();
|
|
let result: StorePluginInstallResult = tokio::task::spawn_blocking(move || {
|
|
let materialized =
|
|
materialize_marketplace_plugin_source(codex_home.as_path(), &resolved.source)
|
|
.map_err(PluginStoreError::Invalid)?;
|
|
let source_path = materialized.path;
|
|
if let Some(plugin_version) = plugin_version {
|
|
store.install_with_version(source_path, resolved.plugin_id, plugin_version)
|
|
} else {
|
|
store.install(source_path, resolved.plugin_id)
|
|
}
|
|
})
|
|
.await
|
|
.map_err(PluginInstallError::join)??;
|
|
|
|
ConfigEditsBuilder::new(&self.codex_home)
|
|
.with_edits([ConfigEdit::SetPath {
|
|
segments: vec![
|
|
"plugins".to_string(),
|
|
result.plugin_id.as_key(),
|
|
"enabled".to_string(),
|
|
],
|
|
value: value(true),
|
|
}])
|
|
.apply()
|
|
.await
|
|
.map_err(PluginInstallError::from)?;
|
|
|
|
let analytics_events_client = match self.analytics_events_client.read() {
|
|
Ok(client) => client.clone(),
|
|
Err(err) => err.into_inner().clone(),
|
|
};
|
|
if let Some(analytics_events_client) = analytics_events_client {
|
|
analytics_events_client.track_plugin_installed(
|
|
plugin_telemetry_metadata_from_root(&result.plugin_id, &result.installed_path)
|
|
.await,
|
|
);
|
|
}
|
|
|
|
Ok(PluginInstallOutcome {
|
|
plugin_id: result.plugin_id,
|
|
plugin_version: result.plugin_version,
|
|
installed_path: result.installed_path,
|
|
auth_policy,
|
|
})
|
|
}
|
|
|
|
pub async fn uninstall_plugin(&self, plugin_id: String) -> Result<(), PluginUninstallError> {
|
|
let plugin_id = PluginId::parse(&plugin_id)?;
|
|
self.uninstall_plugin_id(plugin_id).await
|
|
}
|
|
|
|
pub async fn uninstall_plugin_with_remote_sync(
|
|
&self,
|
|
config: &Config,
|
|
auth: Option<&CodexAuth>,
|
|
plugin_id: String,
|
|
) -> Result<(), PluginUninstallError> {
|
|
// TODO: Remove this legacy remote-sync path once remote plugins have
|
|
// their own manager and installed-state API.
|
|
let plugin_id = PluginId::parse(&plugin_id)?;
|
|
let plugin_key = plugin_id.as_key();
|
|
// This only forwards the backend mutation before the local uninstall flow.
|
|
codex_core_plugins::remote_legacy::uninstall_remote_plugin(
|
|
&remote_plugin_service_config(config),
|
|
auth,
|
|
&plugin_key,
|
|
)
|
|
.await
|
|
.map_err(PluginUninstallError::from)?;
|
|
self.uninstall_plugin_id(plugin_id).await
|
|
}
|
|
|
|
async fn uninstall_plugin_id(&self, plugin_id: PluginId) -> Result<(), PluginUninstallError> {
|
|
let plugin_telemetry = if self.store.active_plugin_root(&plugin_id).is_some() {
|
|
Some(installed_plugin_telemetry_metadata(self.codex_home.as_path(), &plugin_id).await)
|
|
} else {
|
|
None
|
|
};
|
|
let store = self.store.clone();
|
|
let plugin_id_for_store = plugin_id.clone();
|
|
tokio::task::spawn_blocking(move || store.uninstall(&plugin_id_for_store))
|
|
.await
|
|
.map_err(PluginUninstallError::join)??;
|
|
|
|
ConfigEditsBuilder::new(&self.codex_home)
|
|
.with_edits([ConfigEdit::ClearPath {
|
|
segments: vec!["plugins".to_string(), plugin_id.as_key()],
|
|
}])
|
|
.apply()
|
|
.await?;
|
|
|
|
let analytics_events_client = match self.analytics_events_client.read() {
|
|
Ok(client) => client.clone(),
|
|
Err(err) => err.into_inner().clone(),
|
|
};
|
|
if let Some(plugin_telemetry) = plugin_telemetry
|
|
&& let Some(analytics_events_client) = analytics_events_client
|
|
{
|
|
analytics_events_client.track_plugin_uninstalled(plugin_telemetry);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn sync_plugins_from_remote(
|
|
&self,
|
|
config: &Config,
|
|
auth: Option<&CodexAuth>,
|
|
additive_only: bool,
|
|
) -> Result<RemotePluginSyncResult, PluginRemoteSyncError> {
|
|
let _remote_sync_guard = self.remote_sync_lock.acquire().await.map_err(|_| {
|
|
PluginRemoteSyncError::Config(anyhow::anyhow!("remote plugin sync semaphore closed"))
|
|
})?;
|
|
|
|
if !config.features.enabled(Feature::Plugins) {
|
|
return Ok(RemotePluginSyncResult::default());
|
|
}
|
|
|
|
info!("starting remote plugin sync");
|
|
let remote_plugins = codex_core_plugins::remote_legacy::fetch_remote_plugin_status(
|
|
&remote_plugin_service_config(config),
|
|
auth,
|
|
)
|
|
.await
|
|
.map_err(PluginRemoteSyncError::from)?;
|
|
let configured_plugins = configured_plugins_from_stack(&config.config_layer_stack);
|
|
let curated_marketplace_root = curated_plugins_repo_path(self.codex_home.as_path());
|
|
let curated_marketplace_path = AbsolutePathBuf::try_from(
|
|
curated_marketplace_root.join(".agents/plugins/marketplace.json"),
|
|
)
|
|
.map_err(|_| PluginRemoteSyncError::LocalMarketplaceNotFound)?;
|
|
let curated_marketplace = match load_marketplace(&curated_marketplace_path) {
|
|
Ok(marketplace) => marketplace,
|
|
Err(MarketplaceError::MarketplaceNotFound { .. }) => {
|
|
return Err(PluginRemoteSyncError::LocalMarketplaceNotFound);
|
|
}
|
|
Err(err) => return Err(err.into()),
|
|
};
|
|
|
|
let marketplace_name = curated_marketplace.name.clone();
|
|
let curated_plugin_version = read_curated_plugins_sha(self.codex_home.as_path())
|
|
.ok_or_else(|| {
|
|
PluginStoreError::Invalid(
|
|
"local curated marketplace sha is not available".to_string(),
|
|
)
|
|
})?;
|
|
let cache_plugin_version = curated_plugin_cache_version(&curated_plugin_version);
|
|
let mut local_plugins = Vec::<(
|
|
String,
|
|
PluginId,
|
|
AbsolutePathBuf,
|
|
Option<bool>,
|
|
Option<String>,
|
|
bool,
|
|
)>::new();
|
|
let mut local_plugin_names = HashSet::new();
|
|
for plugin in curated_marketplace.plugins {
|
|
let plugin_name = plugin.name;
|
|
if !local_plugin_names.insert(plugin_name.clone()) {
|
|
warn!(
|
|
plugin = plugin_name,
|
|
marketplace = %marketplace_name,
|
|
"ignoring duplicate local plugin entry during remote sync"
|
|
);
|
|
continue;
|
|
}
|
|
|
|
let plugin_id = PluginId::new(plugin_name.clone(), marketplace_name.clone())?;
|
|
let plugin_key = plugin_id.as_key();
|
|
let source_path = match plugin.source {
|
|
MarketplacePluginSource::Local { path } => path,
|
|
MarketplacePluginSource::Git { .. } => {
|
|
warn!(
|
|
plugin = plugin_name,
|
|
marketplace = %marketplace_name,
|
|
"skipping remote plugin source during remote sync"
|
|
);
|
|
continue;
|
|
}
|
|
};
|
|
let current_enabled = configured_plugins
|
|
.get(&plugin_key)
|
|
.map(|plugin| plugin.enabled);
|
|
let installed_version = self.store.active_plugin_version(&plugin_id);
|
|
let product_allowed =
|
|
self.restriction_product_matches(plugin.policy.products.as_deref());
|
|
local_plugins.push((
|
|
plugin_name,
|
|
plugin_id,
|
|
source_path,
|
|
current_enabled,
|
|
installed_version,
|
|
product_allowed,
|
|
));
|
|
}
|
|
|
|
let mut missing_remote_plugins = Vec::<String>::new();
|
|
let mut remote_installed_plugin_names = HashSet::<String>::new();
|
|
for plugin in remote_plugins {
|
|
if plugin.marketplace_name != marketplace_name {
|
|
return Err(PluginRemoteSyncError::UnknownRemoteMarketplace {
|
|
marketplace_name: plugin.marketplace_name,
|
|
});
|
|
}
|
|
if !local_plugin_names.contains(&plugin.name) {
|
|
missing_remote_plugins.push(plugin.name);
|
|
continue;
|
|
}
|
|
// For now, sync treats remote `enabled = false` as uninstall rather than a distinct
|
|
// disabled state.
|
|
// TODO: Switch sync to `plugins/installed` so install and enable states stay distinct.
|
|
if !plugin.enabled {
|
|
continue;
|
|
}
|
|
if !remote_installed_plugin_names.insert(plugin.name.clone()) {
|
|
return Err(PluginRemoteSyncError::DuplicateRemotePlugin {
|
|
plugin_name: plugin.name,
|
|
});
|
|
}
|
|
}
|
|
|
|
let mut config_edits = Vec::new();
|
|
let mut installs = Vec::new();
|
|
let mut uninstalls = Vec::new();
|
|
let mut result = RemotePluginSyncResult::default();
|
|
let remote_plugin_count = remote_installed_plugin_names.len();
|
|
let local_plugin_count = local_plugins.len();
|
|
if !missing_remote_plugins.is_empty() {
|
|
let sample_missing_plugins = missing_remote_plugins
|
|
.iter()
|
|
.take(10)
|
|
.cloned()
|
|
.collect::<Vec<_>>();
|
|
warn!(
|
|
marketplace = %marketplace_name,
|
|
missing_remote_plugin_count = missing_remote_plugins.len(),
|
|
missing_remote_plugin_examples = ?sample_missing_plugins,
|
|
"ignoring remote plugins missing from local marketplace during sync"
|
|
);
|
|
}
|
|
|
|
for (
|
|
plugin_name,
|
|
plugin_id,
|
|
source_path,
|
|
current_enabled,
|
|
installed_version,
|
|
product_allowed,
|
|
) in local_plugins
|
|
{
|
|
let plugin_key = plugin_id.as_key();
|
|
let is_installed = installed_version.is_some();
|
|
if !product_allowed {
|
|
continue;
|
|
}
|
|
if remote_installed_plugin_names.contains(&plugin_name) {
|
|
if !is_installed {
|
|
installs.push((source_path, plugin_id.clone(), cache_plugin_version.clone()));
|
|
}
|
|
if !is_installed {
|
|
result.installed_plugin_ids.push(plugin_key.clone());
|
|
}
|
|
|
|
if current_enabled != Some(true) {
|
|
result.enabled_plugin_ids.push(plugin_key.clone());
|
|
config_edits.push(ConfigEdit::SetPath {
|
|
segments: vec!["plugins".to_string(), plugin_key, "enabled".to_string()],
|
|
value: value(true),
|
|
});
|
|
}
|
|
} else if !additive_only {
|
|
if is_installed {
|
|
uninstalls.push(plugin_id);
|
|
}
|
|
if is_installed || current_enabled.is_some() {
|
|
result.uninstalled_plugin_ids.push(plugin_key.clone());
|
|
}
|
|
if current_enabled.is_some() {
|
|
config_edits.push(ConfigEdit::ClearPath {
|
|
segments: vec!["plugins".to_string(), plugin_key],
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
let store = self.store.clone();
|
|
let store_result = tokio::task::spawn_blocking(move || {
|
|
for (source_path, plugin_id, plugin_version) in installs {
|
|
store.install_with_version(source_path, plugin_id, plugin_version)?;
|
|
}
|
|
for plugin_id in uninstalls {
|
|
store.uninstall(&plugin_id)?;
|
|
}
|
|
Ok::<(), PluginStoreError>(())
|
|
})
|
|
.await
|
|
.map_err(PluginRemoteSyncError::join)?;
|
|
if let Err(err) = store_result {
|
|
self.clear_cache();
|
|
return Err(err.into());
|
|
}
|
|
|
|
let config_result = if config_edits.is_empty() {
|
|
Ok(())
|
|
} else {
|
|
ConfigEditsBuilder::new(&self.codex_home)
|
|
.with_edits(config_edits)
|
|
.apply()
|
|
.await
|
|
};
|
|
self.clear_cache();
|
|
config_result?;
|
|
|
|
info!(
|
|
marketplace = %marketplace_name,
|
|
remote_plugin_count,
|
|
local_plugin_count,
|
|
installed_plugin_ids = ?result.installed_plugin_ids,
|
|
enabled_plugin_ids = ?result.enabled_plugin_ids,
|
|
disabled_plugin_ids = ?result.disabled_plugin_ids,
|
|
uninstalled_plugin_ids = ?result.uninstalled_plugin_ids,
|
|
"completed remote plugin sync"
|
|
);
|
|
|
|
Ok(result)
|
|
}
|
|
|
|
pub fn list_marketplaces_for_config(
|
|
&self,
|
|
config: &Config,
|
|
additional_roots: &[AbsolutePathBuf],
|
|
) -> Result<ConfiguredMarketplaceListOutcome, MarketplaceError> {
|
|
if !config.features.enabled(Feature::Plugins) {
|
|
return Ok(ConfiguredMarketplaceListOutcome::default());
|
|
}
|
|
|
|
let (installed_plugins, enabled_plugins) = self.configured_plugin_states(config);
|
|
let marketplace_outcome =
|
|
list_marketplaces(&self.marketplace_roots(config, additional_roots))?;
|
|
let mut seen_plugin_keys = HashSet::new();
|
|
let marketplaces = marketplace_outcome
|
|
.marketplaces
|
|
.into_iter()
|
|
.filter_map(|marketplace| {
|
|
let marketplace_name = marketplace.name.clone();
|
|
let plugins = marketplace
|
|
.plugins
|
|
.into_iter()
|
|
.filter_map(|plugin| {
|
|
let plugin_key = format!("{}@{marketplace_name}", plugin.name);
|
|
if !seen_plugin_keys.insert(plugin_key.clone()) {
|
|
return None;
|
|
}
|
|
if !self.restriction_product_matches(plugin.policy.products.as_deref()) {
|
|
return None;
|
|
}
|
|
|
|
Some(ConfiguredMarketplacePlugin {
|
|
// Enabled state is keyed by `<plugin>@<marketplace>`, so duplicate
|
|
// plugin entries from duplicate marketplace files intentionally
|
|
// resolve to the first discovered source.
|
|
id: plugin_key.clone(),
|
|
installed: installed_plugins.contains(&plugin_key),
|
|
enabled: enabled_plugins.contains(&plugin_key),
|
|
name: plugin.name,
|
|
source: plugin.source,
|
|
policy: plugin.policy,
|
|
interface: plugin.interface,
|
|
})
|
|
})
|
|
.collect::<Vec<_>>();
|
|
|
|
(!plugins.is_empty()).then_some(ConfiguredMarketplace {
|
|
name: marketplace.name,
|
|
path: marketplace.path,
|
|
interface: marketplace.interface,
|
|
plugins,
|
|
})
|
|
})
|
|
.collect();
|
|
|
|
Ok(ConfiguredMarketplaceListOutcome {
|
|
marketplaces,
|
|
errors: marketplace_outcome.errors,
|
|
})
|
|
}
|
|
|
|
pub async fn read_plugin_for_config(
|
|
&self,
|
|
config: &Config,
|
|
request: &PluginReadRequest,
|
|
) -> Result<PluginReadOutcome, MarketplaceError> {
|
|
if !config.features.enabled(Feature::Plugins) {
|
|
return Err(MarketplaceError::PluginsDisabled);
|
|
}
|
|
|
|
let plugin = find_marketplace_plugin(&request.marketplace_path, &request.plugin_name)?;
|
|
if !self.restriction_product_matches(plugin.policy.products.as_deref()) {
|
|
return Err(MarketplaceError::PluginNotFound {
|
|
plugin_name: plugin.plugin_id.plugin_name,
|
|
marketplace_name: plugin.plugin_id.marketplace_name,
|
|
});
|
|
}
|
|
|
|
let marketplace_name = plugin.plugin_id.marketplace_name.clone();
|
|
let plugin_key = plugin.plugin_id.as_key();
|
|
let (installed_plugins, enabled_plugins) = self.configured_plugin_states(config);
|
|
let plugin = self
|
|
.read_plugin_detail_for_marketplace_plugin(
|
|
config,
|
|
&marketplace_name,
|
|
ConfiguredMarketplacePlugin {
|
|
id: plugin_key.clone(),
|
|
name: plugin.plugin_id.plugin_name,
|
|
source: plugin.source,
|
|
policy: plugin.policy,
|
|
interface: plugin.interface,
|
|
installed: installed_plugins.contains(&plugin_key),
|
|
enabled: enabled_plugins.contains(&plugin_key),
|
|
},
|
|
)
|
|
.await?;
|
|
|
|
Ok(PluginReadOutcome {
|
|
marketplace_name,
|
|
marketplace_path: Some(request.marketplace_path.clone()),
|
|
plugin,
|
|
})
|
|
}
|
|
|
|
pub(crate) async fn read_plugin_detail_for_marketplace_plugin(
|
|
&self,
|
|
config: &Config,
|
|
marketplace_name: &str,
|
|
plugin: ConfiguredMarketplacePlugin,
|
|
) -> Result<PluginDetail, MarketplaceError> {
|
|
if !self.restriction_product_matches(plugin.policy.products.as_deref()) {
|
|
return Err(MarketplaceError::PluginNotFound {
|
|
plugin_name: plugin.name,
|
|
marketplace_name: marketplace_name.to_string(),
|
|
});
|
|
}
|
|
|
|
let plugin_id =
|
|
PluginId::new(plugin.name.clone(), marketplace_name.to_string()).map_err(|err| {
|
|
match err {
|
|
PluginIdError::Invalid(message) => MarketplaceError::InvalidPlugin(message),
|
|
}
|
|
})?;
|
|
let plugin_key = plugin_id.as_key();
|
|
if matches!(plugin.source, MarketplacePluginSource::Git { .. }) && !plugin.installed {
|
|
let description = remote_plugin_install_required_description(&plugin.source);
|
|
return Ok(PluginDetail {
|
|
id: plugin_key,
|
|
name: plugin.name,
|
|
description: Some(description),
|
|
source: plugin.source,
|
|
policy: plugin.policy,
|
|
interface: plugin.interface,
|
|
installed: plugin.installed,
|
|
enabled: plugin.enabled,
|
|
skills: Vec::new(),
|
|
disabled_skill_paths: HashSet::new(),
|
|
apps: Vec::new(),
|
|
mcp_server_names: Vec::new(),
|
|
details_unavailable_reason: Some(
|
|
PluginDetailsUnavailableReason::InstallRequiredForRemoteSource,
|
|
),
|
|
});
|
|
}
|
|
|
|
let source_path =
|
|
if matches!(plugin.source, MarketplacePluginSource::Git { .. }) && plugin.installed {
|
|
self.store.active_plugin_root(&plugin_id).ok_or_else(|| {
|
|
MarketplaceError::InvalidPlugin(format!(
|
|
"installed plugin cache entry is missing for {plugin_key}"
|
|
))
|
|
})?
|
|
} else {
|
|
let codex_home = self.codex_home.clone();
|
|
let source = plugin.source.clone();
|
|
let materialized = tokio::task::spawn_blocking(move || {
|
|
materialize_marketplace_plugin_source(codex_home.as_path(), &source)
|
|
})
|
|
.await
|
|
.map_err(|err| {
|
|
MarketplaceError::InvalidPlugin(format!(
|
|
"failed to materialize plugin source: {err}"
|
|
))
|
|
})?
|
|
.map_err(MarketplaceError::InvalidPlugin)?;
|
|
materialized.path.clone()
|
|
};
|
|
if !source_path.as_path().is_dir() {
|
|
return Err(MarketplaceError::InvalidPlugin(
|
|
"path does not exist or is not a directory".to_string(),
|
|
));
|
|
}
|
|
let manifest = load_plugin_manifest(source_path.as_path()).ok_or_else(|| {
|
|
MarketplaceError::InvalidPlugin("missing or invalid plugin.json".to_string())
|
|
})?;
|
|
let description = manifest.description.clone();
|
|
let marketplace_category = plugin
|
|
.interface
|
|
.as_ref()
|
|
.and_then(|interface| interface.category.clone());
|
|
let interface = plugin_interface_with_marketplace_category(
|
|
manifest.interface.clone(),
|
|
marketplace_category,
|
|
);
|
|
let resolved_skills = load_plugin_skills(
|
|
&source_path,
|
|
&manifest.paths,
|
|
self.restriction_product,
|
|
&codex_core_skills::config_rules::skill_config_rules_from_stack(
|
|
&config.config_layer_stack,
|
|
),
|
|
)
|
|
.await;
|
|
let apps = load_plugin_apps(source_path.as_path()).await;
|
|
let mut mcp_server_names = load_plugin_mcp_servers(source_path.as_path())
|
|
.await
|
|
.into_keys()
|
|
.collect::<Vec<_>>();
|
|
mcp_server_names.sort_unstable();
|
|
mcp_server_names.dedup();
|
|
|
|
Ok(PluginDetail {
|
|
id: plugin.id,
|
|
name: plugin.name,
|
|
description,
|
|
source: plugin.source,
|
|
policy: plugin.policy,
|
|
interface,
|
|
installed: plugin.installed,
|
|
enabled: plugin.enabled,
|
|
skills: resolved_skills.skills,
|
|
disabled_skill_paths: resolved_skills.disabled_skill_paths,
|
|
apps,
|
|
mcp_server_names,
|
|
details_unavailable_reason: None,
|
|
})
|
|
}
|
|
|
|
pub fn maybe_start_plugin_startup_tasks_for_config(
|
|
self: &Arc<Self>,
|
|
config: &Config,
|
|
auth_manager: Arc<AuthManager>,
|
|
) {
|
|
if config.features.enabled(Feature::Plugins) {
|
|
self.start_curated_repo_sync();
|
|
let should_spawn_marketplace_auto_upgrade = {
|
|
let mut state = match self.configured_marketplace_upgrade_state.write() {
|
|
Ok(state) => state,
|
|
Err(err) => err.into_inner(),
|
|
};
|
|
if state.in_flight {
|
|
false
|
|
} else {
|
|
state.in_flight = true;
|
|
true
|
|
}
|
|
};
|
|
if should_spawn_marketplace_auto_upgrade {
|
|
let manager = Arc::clone(self);
|
|
let config = config.clone();
|
|
if let Err(err) = std::thread::Builder::new()
|
|
.name("plugins-marketplace-auto-upgrade".to_string())
|
|
.spawn(move || {
|
|
let outcome = manager.upgrade_configured_marketplaces_for_config(
|
|
&config, /*marketplace_name*/ None,
|
|
);
|
|
match outcome {
|
|
Ok(outcome) => {
|
|
for error in outcome.errors {
|
|
warn!(
|
|
marketplace = error.marketplace_name,
|
|
error = %error.message,
|
|
"failed to auto-upgrade configured marketplace"
|
|
);
|
|
}
|
|
}
|
|
Err(err) => {
|
|
warn!("failed to auto-upgrade configured marketplaces: {err}");
|
|
}
|
|
}
|
|
|
|
let mut state = match manager.configured_marketplace_upgrade_state.write() {
|
|
Ok(state) => state,
|
|
Err(err) => err.into_inner(),
|
|
};
|
|
state.in_flight = false;
|
|
})
|
|
{
|
|
let mut state = match self.configured_marketplace_upgrade_state.write() {
|
|
Ok(state) => state,
|
|
Err(err) => err.into_inner(),
|
|
};
|
|
state.in_flight = false;
|
|
warn!("failed to start configured marketplace auto-upgrade task: {err}");
|
|
}
|
|
}
|
|
start_startup_remote_plugin_sync_once(
|
|
Arc::clone(self),
|
|
self.codex_home.clone(),
|
|
config.clone(),
|
|
auth_manager.clone(),
|
|
);
|
|
|
|
let config = config.clone();
|
|
let manager = Arc::clone(self);
|
|
tokio::spawn(async move {
|
|
let auth = auth_manager.auth().await;
|
|
if let Err(err) = manager
|
|
.featured_plugin_ids_for_config(&config, auth.as_ref())
|
|
.await
|
|
{
|
|
warn!(
|
|
error = %err,
|
|
"failed to warm featured plugin ids cache"
|
|
);
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
pub fn upgrade_configured_marketplaces_for_config(
|
|
&self,
|
|
config: &Config,
|
|
marketplace_name: Option<&str>,
|
|
) -> Result<ConfiguredMarketplaceUpgradeOutcome, String> {
|
|
if let Some(marketplace_name) = marketplace_name
|
|
&& !configured_git_marketplace_names(&config.config_layer_stack)
|
|
.iter()
|
|
.any(|name| name == marketplace_name)
|
|
{
|
|
return Err(format!(
|
|
"marketplace `{marketplace_name}` is not configured as a Git marketplace"
|
|
));
|
|
}
|
|
|
|
let mut outcome = upgrade_configured_git_marketplaces(
|
|
self.codex_home.as_path(),
|
|
&config.config_layer_stack,
|
|
marketplace_name,
|
|
);
|
|
if !outcome.upgraded_roots.is_empty() {
|
|
match refresh_non_curated_plugin_cache_force_reinstall(
|
|
self.codex_home.as_path(),
|
|
&outcome.upgraded_roots,
|
|
) {
|
|
Ok(cache_refreshed) => {
|
|
if cache_refreshed {
|
|
self.clear_cache();
|
|
}
|
|
}
|
|
Err(err) => {
|
|
self.clear_cache();
|
|
outcome.errors.push(ConfiguredMarketplaceUpgradeError {
|
|
marketplace_name: marketplace_name
|
|
.unwrap_or("all configured marketplaces")
|
|
.to_string(),
|
|
message: format!(
|
|
"failed to refresh installed plugin cache after marketplace upgrade: {err}"
|
|
),
|
|
});
|
|
}
|
|
}
|
|
}
|
|
Ok(outcome)
|
|
}
|
|
|
|
pub fn maybe_start_non_curated_plugin_cache_refresh(
|
|
self: &Arc<Self>,
|
|
roots: &[AbsolutePathBuf],
|
|
) {
|
|
self.schedule_non_curated_plugin_cache_refresh(
|
|
roots,
|
|
NonCuratedCacheRefreshMode::IfVersionChanged,
|
|
);
|
|
}
|
|
|
|
fn schedule_non_curated_plugin_cache_refresh(
|
|
self: &Arc<Self>,
|
|
roots: &[AbsolutePathBuf],
|
|
mode: NonCuratedCacheRefreshMode,
|
|
) {
|
|
let mut roots = roots.to_vec();
|
|
roots.sort_unstable();
|
|
roots.dedup();
|
|
if roots.is_empty() {
|
|
return;
|
|
}
|
|
let request = NonCuratedCacheRefreshRequest { roots, mode };
|
|
|
|
let should_spawn = {
|
|
let mut state = match self.non_curated_cache_refresh_state.write() {
|
|
Ok(state) => state,
|
|
Err(err) => err.into_inner(),
|
|
};
|
|
// Collapse repeated plugin/list requests onto one worker and only queue another pass
|
|
// when the requested roots set actually changes. Forced reinstall requests are not
|
|
// deduped against the last completed pass because the same marketplace root path can
|
|
// point at newly activated files after an auto-upgrade.
|
|
if state.requested.as_ref() == Some(&request)
|
|
|| (mode == NonCuratedCacheRefreshMode::IfVersionChanged
|
|
&& !state.in_flight
|
|
&& state.last_refreshed.as_ref() == Some(&request))
|
|
{
|
|
return;
|
|
}
|
|
if mode == NonCuratedCacheRefreshMode::IfVersionChanged
|
|
&& state.requested.as_ref().is_some_and(|requested| {
|
|
requested.mode == NonCuratedCacheRefreshMode::ForceReinstall
|
|
&& requested.roots == request.roots
|
|
})
|
|
{
|
|
return;
|
|
}
|
|
state.requested = Some(request);
|
|
if state.in_flight {
|
|
false
|
|
} else {
|
|
state.in_flight = true;
|
|
true
|
|
}
|
|
};
|
|
if !should_spawn {
|
|
return;
|
|
}
|
|
|
|
let manager = Arc::clone(self);
|
|
if let Err(err) = std::thread::Builder::new()
|
|
.name("plugins-non-curated-cache-refresh".to_string())
|
|
.spawn(move || manager.run_non_curated_plugin_cache_refresh_loop())
|
|
{
|
|
let mut state = match self.non_curated_cache_refresh_state.write() {
|
|
Ok(state) => state,
|
|
Err(err) => err.into_inner(),
|
|
};
|
|
state.in_flight = false;
|
|
state.requested = None;
|
|
warn!("failed to start non-curated plugin cache refresh task: {err}");
|
|
}
|
|
}
|
|
|
|
fn start_curated_repo_sync(self: &Arc<Self>) {
|
|
if CURATED_REPO_SYNC_STARTED.swap(true, Ordering::SeqCst) {
|
|
return;
|
|
}
|
|
let manager = Arc::clone(self);
|
|
let codex_home = self.codex_home.clone();
|
|
if let Err(err) = std::thread::Builder::new()
|
|
.name("plugins-curated-repo-sync".to_string())
|
|
.spawn(
|
|
move || match sync_openai_plugins_repo(codex_home.as_path()) {
|
|
Ok(curated_plugin_version) => {
|
|
let configured_curated_plugin_ids =
|
|
configured_curated_plugin_ids_from_codex_home(codex_home.as_path());
|
|
match refresh_curated_plugin_cache(
|
|
codex_home.as_path(),
|
|
&curated_plugin_version,
|
|
&configured_curated_plugin_ids,
|
|
) {
|
|
Ok(cache_refreshed) => {
|
|
if cache_refreshed {
|
|
manager.clear_cache();
|
|
}
|
|
}
|
|
Err(err) => {
|
|
manager.clear_cache();
|
|
CURATED_REPO_SYNC_STARTED.store(false, Ordering::SeqCst);
|
|
warn!("failed to refresh curated plugin cache after sync: {err}");
|
|
}
|
|
}
|
|
}
|
|
Err(err) => {
|
|
CURATED_REPO_SYNC_STARTED.store(false, Ordering::SeqCst);
|
|
warn!("failed to sync curated plugins repo: {err}");
|
|
}
|
|
},
|
|
)
|
|
{
|
|
CURATED_REPO_SYNC_STARTED.store(false, Ordering::SeqCst);
|
|
warn!("failed to start curated plugins repo sync task: {err}");
|
|
}
|
|
}
|
|
|
|
fn run_non_curated_plugin_cache_refresh_loop(self: Arc<Self>) {
|
|
loop {
|
|
let request = {
|
|
let state = match self.non_curated_cache_refresh_state.read() {
|
|
Ok(state) => state,
|
|
Err(err) => err.into_inner(),
|
|
};
|
|
state.requested.clone()
|
|
};
|
|
|
|
let Some(request) = request else {
|
|
let mut state = match self.non_curated_cache_refresh_state.write() {
|
|
Ok(state) => state,
|
|
Err(err) => err.into_inner(),
|
|
};
|
|
state.in_flight = false;
|
|
return;
|
|
};
|
|
|
|
let refresh_result = match request.mode {
|
|
NonCuratedCacheRefreshMode::IfVersionChanged => {
|
|
refresh_non_curated_plugin_cache(self.codex_home.as_path(), &request.roots)
|
|
}
|
|
NonCuratedCacheRefreshMode::ForceReinstall => {
|
|
refresh_non_curated_plugin_cache_force_reinstall(
|
|
self.codex_home.as_path(),
|
|
&request.roots,
|
|
)
|
|
}
|
|
};
|
|
let refreshed = match refresh_result {
|
|
Ok(cache_refreshed) => {
|
|
if cache_refreshed {
|
|
self.clear_cache();
|
|
}
|
|
true
|
|
}
|
|
Err(err) => {
|
|
self.clear_cache();
|
|
warn!("failed to refresh non-curated plugin cache: {err}");
|
|
false
|
|
}
|
|
};
|
|
|
|
let mut state = match self.non_curated_cache_refresh_state.write() {
|
|
Ok(state) => state,
|
|
Err(err) => err.into_inner(),
|
|
};
|
|
if refreshed {
|
|
state.last_refreshed = Some(request.clone());
|
|
}
|
|
if state.requested.as_ref() == Some(&request) {
|
|
state.requested = None;
|
|
state.in_flight = false;
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
fn configured_plugin_states(&self, config: &Config) -> (HashSet<String>, HashSet<String>) {
|
|
let configured_plugins = configured_plugins_from_stack(&config.config_layer_stack);
|
|
let installed_plugins = configured_plugins
|
|
.keys()
|
|
.filter(|plugin_key| {
|
|
PluginId::parse(plugin_key)
|
|
.ok()
|
|
.is_some_and(|plugin_id| self.store.is_installed(&plugin_id))
|
|
})
|
|
.cloned()
|
|
.collect::<HashSet<_>>();
|
|
let enabled_plugins = configured_plugins
|
|
.into_iter()
|
|
.filter_map(|(plugin_key, plugin)| plugin.enabled.then_some(plugin_key))
|
|
.collect::<HashSet<_>>();
|
|
(installed_plugins, enabled_plugins)
|
|
}
|
|
|
|
fn marketplace_roots(
|
|
&self,
|
|
config: &Config,
|
|
additional_roots: &[AbsolutePathBuf],
|
|
) -> Vec<AbsolutePathBuf> {
|
|
// Treat the curated catalog as an extra marketplace root so plugin listing can surface it
|
|
// without requiring every caller to know where it is stored.
|
|
let mut roots = additional_roots.to_vec();
|
|
roots.extend(installed_marketplace_roots_from_layer_stack(
|
|
&config.config_layer_stack,
|
|
self.codex_home.as_path(),
|
|
));
|
|
let curated_repo_root = curated_plugins_repo_path(self.codex_home.as_path());
|
|
if curated_repo_root.is_dir()
|
|
&& let Ok(curated_repo_root) = AbsolutePathBuf::try_from(curated_repo_root)
|
|
{
|
|
roots.push(curated_repo_root);
|
|
}
|
|
roots.sort_unstable();
|
|
roots.dedup();
|
|
roots
|
|
}
|
|
}
|
|
|
|
fn remote_plugin_install_required_description(source: &MarketplacePluginSource) -> String {
|
|
let source_description = match source {
|
|
MarketplacePluginSource::Git {
|
|
url,
|
|
path,
|
|
ref_name,
|
|
sha,
|
|
} => {
|
|
let mut parts = vec![url.clone()];
|
|
if let Some(path) = path {
|
|
parts.push(format!("path `{path}`"));
|
|
}
|
|
if let Some(ref_name) = ref_name {
|
|
parts.push(format!("ref `{ref_name}`"));
|
|
}
|
|
if let Some(sha) = sha {
|
|
parts.push(format!("sha `{sha}`"));
|
|
}
|
|
parts.join(", ")
|
|
}
|
|
MarketplacePluginSource::Local { path } => path.as_path().display().to_string(),
|
|
};
|
|
|
|
format!(
|
|
"This is a cross-repo plugin. Install it to view more detailed information. The source of the plugin is {source_description}."
|
|
)
|
|
}
|
|
|
|
#[derive(Debug, thiserror::Error)]
|
|
pub enum PluginInstallError {
|
|
#[error("{0}")]
|
|
Marketplace(#[from] MarketplaceError),
|
|
|
|
#[error("{0}")]
|
|
Remote(#[from] RemotePluginMutationError),
|
|
|
|
#[error("{0}")]
|
|
Store(#[from] PluginStoreError),
|
|
|
|
#[error("{0}")]
|
|
Config(#[from] anyhow::Error),
|
|
|
|
#[error("failed to join plugin install task: {0}")]
|
|
Join(#[from] tokio::task::JoinError),
|
|
}
|
|
|
|
impl PluginInstallError {
|
|
fn join(source: tokio::task::JoinError) -> Self {
|
|
Self::Join(source)
|
|
}
|
|
|
|
pub fn is_invalid_request(&self) -> bool {
|
|
matches!(
|
|
self,
|
|
Self::Marketplace(
|
|
MarketplaceError::MarketplaceNotFound { .. }
|
|
| MarketplaceError::InvalidMarketplaceFile { .. }
|
|
| MarketplaceError::PluginNotFound { .. }
|
|
| MarketplaceError::PluginNotAvailable { .. }
|
|
| MarketplaceError::InvalidPlugin(_)
|
|
) | Self::Store(PluginStoreError::Invalid(_))
|
|
)
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, thiserror::Error)]
|
|
pub enum PluginUninstallError {
|
|
#[error("{0}")]
|
|
InvalidPluginId(#[from] PluginIdError),
|
|
|
|
#[error("{0}")]
|
|
Remote(#[from] RemotePluginMutationError),
|
|
|
|
#[error("{0}")]
|
|
Store(#[from] PluginStoreError),
|
|
|
|
#[error("{0}")]
|
|
Config(#[from] anyhow::Error),
|
|
|
|
#[error("failed to join plugin uninstall task: {0}")]
|
|
Join(#[from] tokio::task::JoinError),
|
|
}
|
|
|
|
impl PluginUninstallError {
|
|
fn join(source: tokio::task::JoinError) -> Self {
|
|
Self::Join(source)
|
|
}
|
|
|
|
pub fn is_invalid_request(&self) -> bool {
|
|
matches!(self, Self::InvalidPluginId(_))
|
|
}
|
|
}
|
|
|
|
pub(crate) fn configured_plugins_from_stack(
|
|
config_layer_stack: &ConfigLayerStack,
|
|
) -> HashMap<String, PluginConfig> {
|
|
// Plugin entries remain persisted user config only.
|
|
let Some(user_layer) = config_layer_stack.get_user_layer() else {
|
|
return HashMap::new();
|
|
};
|
|
configured_plugins_from_user_config_value(&user_layer.config)
|
|
}
|
|
|
|
fn configured_plugins_from_user_config_value(
|
|
user_config: &toml::Value,
|
|
) -> HashMap<String, PluginConfig> {
|
|
let Some(plugins_value) = user_config.get("plugins") else {
|
|
return HashMap::new();
|
|
};
|
|
match plugins_value.clone().try_into() {
|
|
Ok(plugins) => plugins,
|
|
Err(err) => {
|
|
warn!("invalid plugins config: {err}");
|
|
HashMap::new()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Unit tests for this module are kept in the sibling file `manager_tests.rs` and are only
// compiled for test builds.
#[cfg(test)]
|
|
#[path = "manager_tests.rs"]
|
|
mod tests;
|