Compare commits

...

4 Commits

Author SHA1 Message Date
xli-oai
f559d3cd95 codex: rename remote plugin cache fetch helper 2026-05-12 23:47:37 -07:00
xli-oai
6b54c1ab13 codex: rename remote marketplace cache helper 2026-05-12 23:47:37 -07:00
xli-oai
c2be76c6a9 codex: clarify remote marketplace cache helper 2026-05-12 23:47:37 -07:00
xli-oai
a0de847e6c Cache remote plugin marketplaces during startup 2026-05-12 23:47:37 -07:00
4 changed files with 230 additions and 19 deletions

View File

@@ -541,15 +541,9 @@ impl PluginRequestProcessor {
remote_sources.push(RemoteMarketplaceSource::SharedWithMe);
}
if !remote_sources.is_empty() {
let remote_plugin_service_config = RemotePluginServiceConfig {
chatgpt_base_url: config.chatgpt_base_url.clone(),
};
match codex_core_plugins::remote::fetch_remote_marketplaces(
&remote_plugin_service_config,
auth.as_ref(),
&remote_sources,
)
.await
match plugins_manager
.fetch_remote_plugin_with_caching(&plugins_input, auth.as_ref(), &remote_sources)
.await
{
Ok(remote_marketplaces) => {
for remote_marketplace in remote_marketplaces
@@ -1167,13 +1161,13 @@ impl PluginRequestProcessor {
.await
.map_err(|err| remote_plugin_catalog_error_to_jsonrpc(err, "install remote plugin"))?;
self.thread_manager
.plugins_manager()
.maybe_start_remote_installed_plugins_cache_refresh_after_mutation(
&config.plugins_config_input(),
auth.clone(),
Some(self.effective_plugins_changed_callback()),
);
let plugins_manager = self.thread_manager.plugins_manager();
plugins_manager.clear_remote_marketplaces_cache();
plugins_manager.maybe_start_remote_installed_plugins_cache_refresh_after_mutation(
&config.plugins_config_input(),
auth.clone(),
Some(self.effective_plugins_changed_callback()),
);
let mut plugin_metadata =
plugin_telemetry_metadata_from_root(&result.plugin_id, &result.installed_path).await;
@@ -1463,6 +1457,7 @@ impl PluginRequestProcessor {
Ok(()) | Err(RemotePluginCatalogError::CacheRemove(_))
) {
let plugins_manager = self.thread_manager.plugins_manager();
plugins_manager.clear_remote_marketplaces_cache();
if plugins_manager.clear_remote_installed_plugins_cache() {
self.on_effective_plugins_changed();
}

View File

@@ -1603,8 +1603,9 @@ async fn plugin_list_includes_remote_marketplaces_when_remote_plugin_enabled() -
.mount(&server)
.await;
let mut mcp = McpProcess::new(codex_home.path()).await?;
let mut mcp = McpProcess::new_with_plugin_startup_tasks(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
wait_for_remote_plugin_request_count(&server, "/ps/plugins/list", /*expected_count*/ 1).await?;
let request_id = mcp
.send_plugin_list_request(PluginListParams {
@@ -1662,6 +1663,19 @@ async fn plugin_list_includes_remote_marketplaces_when_remote_plugin_enabled() -
]
);
assert_eq!(response.featured_plugin_ids, Vec::<String>::new());
let request_id = mcp
.send_plugin_list_request(PluginListParams {
cwds: None,
marketplace_kinds: None,
})
.await?;
let _response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
wait_for_remote_plugin_request_count(&server, "/ps/plugins/list", /*expected_count*/ 1).await?;
Ok(())
}

View File

@@ -36,6 +36,8 @@ use crate::marketplace_upgrade::ConfiguredMarketplaceUpgradeOutcome;
use crate::marketplace_upgrade::configured_git_marketplace_names;
use crate::marketplace_upgrade::upgrade_configured_git_marketplaces;
use crate::remote::RemoteInstalledPlugin;
use crate::remote::RemoteMarketplace;
use crate::remote::RemoteMarketplaceSource;
use crate::remote::RemotePluginCatalogError;
use crate::remote::RemotePluginServiceConfig;
use crate::remote_legacy::RemotePluginFetchError;
@@ -71,10 +73,13 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Instant;
use tokio::sync::Notify;
use tokio::sync::Semaphore;
use tracing::info;
use tracing::warn;
@@ -82,6 +87,7 @@ use tracing::warn;
static CURATED_REPO_SYNC_STARTED: AtomicBool = AtomicBool::new(false);
const FEATURED_PLUGIN_IDS_CACHE_TTL: std::time::Duration =
std::time::Duration::from_secs(60 * 60 * 3);
const REMOTE_MARKETPLACES_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(60 * 5);
#[derive(Debug, Clone)]
pub struct PluginsConfigInput {
@@ -125,6 +131,45 @@ struct CachedFeaturedPluginIds {
featured_plugin_ids: Vec<String>,
}
#[derive(Clone, PartialEq, Eq, Hash)]
struct RemoteMarketplacesCacheKey {
chatgpt_base_url: String,
account_id: Option<String>,
chatgpt_user_id: Option<String>,
is_workspace_account: bool,
sources: Vec<RemoteMarketplaceSource>,
}
#[derive(Clone)]
struct CachedRemoteMarketplaces {
expires_at: Instant,
marketplaces: Vec<RemoteMarketplace>,
}
struct RemoteMarketplacesFetchGuard<'a> {
manager: &'a PluginsManager,
cache_key: RemoteMarketplacesCacheKey,
notify: Arc<Notify>,
active: bool,
}
impl RemoteMarketplacesFetchGuard<'_> {
fn finish(mut self) {
self.manager
.finish_remote_marketplaces_fetch(&self.cache_key, &self.notify);
self.active = false;
}
}
impl Drop for RemoteMarketplacesFetchGuard<'_> {
fn drop(&mut self) {
if self.active {
self.manager
.finish_remote_marketplaces_fetch(&self.cache_key, &self.notify);
}
}
}
struct RemoteInstalledPluginsCacheRefreshRequest {
service_config: RemotePluginServiceConfig,
auth: Option<CodexAuth>,
@@ -191,6 +236,20 @@ fn featured_plugin_ids_cache_key(
}
}
fn remote_marketplaces_cache_key(
config: &PluginsConfigInput,
auth: Option<&CodexAuth>,
sources: &[RemoteMarketplaceSource],
) -> RemoteMarketplacesCacheKey {
RemoteMarketplacesCacheKey {
chatgpt_base_url: config.chatgpt_base_url.clone(),
account_id: auth.and_then(CodexAuth::get_account_id),
chatgpt_user_id: auth.and_then(CodexAuth::get_chatgpt_user_id),
is_workspace_account: auth.is_some_and(CodexAuth::is_workspace_account),
sources: sources.to_vec(),
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PluginInstallRequest {
pub plugin_name: String,
@@ -399,6 +458,10 @@ pub struct PluginsManager {
codex_home: PathBuf,
store: PluginStore,
featured_plugin_ids_cache: RwLock<Option<CachedFeaturedPluginIds>>,
remote_marketplaces_cache:
RwLock<HashMap<RemoteMarketplacesCacheKey, CachedRemoteMarketplaces>>,
remote_marketplaces_cache_generation: AtomicU64,
remote_marketplaces_in_flight: Mutex<HashMap<RemoteMarketplacesCacheKey, Arc<Notify>>>,
configured_marketplace_upgrade_state: RwLock<ConfiguredMarketplaceUpgradeState>,
non_curated_cache_refresh_state: RwLock<NonCuratedCacheRefreshState>,
cached_enabled_outcome: RwLock<Option<CachedPluginLoadOutcome>>,
@@ -436,6 +499,9 @@ impl PluginsManager {
codex_home: codex_home.clone(),
store: PluginStore::new(codex_home),
featured_plugin_ids_cache: RwLock::new(None),
remote_marketplaces_cache: RwLock::new(HashMap::new()),
remote_marketplaces_cache_generation: AtomicU64::new(0),
remote_marketplaces_in_flight: Mutex::new(HashMap::new()),
configured_marketplace_upgrade_state: RwLock::new(
ConfiguredMarketplaceUpgradeState::default(),
),
@@ -778,6 +844,129 @@ impl PluginsManager {
});
}
fn prune_and_get_cached_remote_marketplaces(
&self,
cache_key: &RemoteMarketplacesCacheKey,
) -> Option<Vec<RemoteMarketplace>> {
let mut cache = match self.remote_marketplaces_cache.write() {
Ok(cache) => cache,
Err(err) => err.into_inner(),
};
let now = Instant::now();
cache.retain(|_, cached| now < cached.expires_at);
cache
.get(cache_key)
.map(|cached| cached.marketplaces.clone())
}
fn write_remote_marketplaces_cache(
&self,
cache_key: RemoteMarketplacesCacheKey,
marketplaces: &[RemoteMarketplace],
) {
let mut cache = match self.remote_marketplaces_cache.write() {
Ok(cache) => cache,
Err(err) => err.into_inner(),
};
cache.insert(
cache_key,
CachedRemoteMarketplaces {
expires_at: Instant::now() + REMOTE_MARKETPLACES_CACHE_TTL,
marketplaces: marketplaces.to_vec(),
},
);
}
pub fn clear_remote_marketplaces_cache(&self) {
self.remote_marketplaces_cache_generation
.fetch_add(1, Ordering::SeqCst);
let mut cache = match self.remote_marketplaces_cache.write() {
Ok(cache) => cache,
Err(err) => err.into_inner(),
};
cache.clear();
}
fn begin_remote_marketplaces_fetch(
&self,
cache_key: &RemoteMarketplacesCacheKey,
) -> Result<RemoteMarketplacesFetchGuard<'_>, Arc<Notify>> {
let mut in_flight = match self.remote_marketplaces_in_flight.lock() {
Ok(in_flight) => in_flight,
Err(err) => err.into_inner(),
};
if let Some(notify) = in_flight.get(cache_key) {
return Err(Arc::clone(notify));
}
let notify = Arc::new(Notify::new());
in_flight.insert(cache_key.clone(), Arc::clone(&notify));
Ok(RemoteMarketplacesFetchGuard {
manager: self,
cache_key: cache_key.clone(),
notify,
active: true,
})
}
fn finish_remote_marketplaces_fetch(
&self,
cache_key: &RemoteMarketplacesCacheKey,
notify: &Arc<Notify>,
) {
let mut in_flight = match self.remote_marketplaces_in_flight.lock() {
Ok(in_flight) => in_flight,
Err(err) => err.into_inner(),
};
if in_flight
.get(cache_key)
.is_some_and(|current| Arc::ptr_eq(current, notify))
{
in_flight.remove(cache_key);
}
drop(in_flight);
notify.notify_waiters();
}
pub async fn fetch_remote_plugin_with_caching(
&self,
config: &PluginsConfigInput,
auth: Option<&CodexAuth>,
sources: &[RemoteMarketplaceSource],
) -> Result<Vec<RemoteMarketplace>, RemotePluginCatalogError> {
let cache_key = remote_marketplaces_cache_key(config, auth, sources);
loop {
if let Some(marketplaces) = self.prune_and_get_cached_remote_marketplaces(&cache_key) {
return Ok(marketplaces);
}
let fetch_guard = match self.begin_remote_marketplaces_fetch(&cache_key) {
Ok(fetch_guard) => fetch_guard,
Err(notify) => {
notify.notified().await;
continue;
}
};
let cache_generation = self
.remote_marketplaces_cache_generation
.load(Ordering::SeqCst);
let marketplaces = crate::remote::fetch_remote_marketplaces(
&remote_plugin_service_config(config),
auth,
sources,
)
.await?;
if self
.remote_marketplaces_cache_generation
.load(Ordering::SeqCst)
== cache_generation
{
self.write_remote_marketplaces_cache(cache_key, &marketplaces);
}
fetch_guard.finish();
return Ok(marketplaces);
}
}
pub async fn featured_plugin_ids_for_config(
&self,
config: &PluginsConfigInput,
@@ -1518,9 +1707,22 @@ impl PluginsManager {
);
manager.maybe_start_remote_installed_plugin_bundle_sync(
&config,
auth,
auth.clone(),
on_effective_plugins_changed,
);
if let Err(err) = manager
.fetch_remote_plugin_with_caching(
&config,
auth.as_ref(),
&[RemoteMarketplaceSource::Global],
)
.await
{
warn!(
error = %err,
"failed to warm remote marketplaces cache"
);
}
});
}

View File

@@ -74,7 +74,7 @@ pub struct RemoteMarketplace {
pub plugins: Vec<RemotePluginSummary>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RemoteMarketplaceSource {
Global,
WorkspaceDirectory,