Compare commits

...

1 Commits

Author SHA1 Message Date
xli-oai
bbb0f2bb4f Sync remote installed plugin bundles 2026-04-29 14:48:33 -07:00
5 changed files with 691 additions and 4 deletions

View File

@@ -3,7 +3,65 @@ use crate::error_code::internal_error;
use crate::error_code::invalid_request;
use codex_app_server_protocol::PluginInstallPolicy;
fn maybe_start_remote_installed_plugin_bundle_sync_for_config(
thread_manager: Arc<ThreadManager>,
config: Config,
auth: Option<CodexAuth>,
on_effective_plugins_changed: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
) {
if !config.features.enabled(Feature::Plugins) || !config.features.enabled(Feature::RemotePlugin)
{
return;
}
let remote_plugin_service_config = RemotePluginServiceConfig {
chatgpt_base_url: config.chatgpt_base_url.clone(),
};
let plugins_manager = thread_manager.plugins_manager();
let config_for_refresh = config.clone();
let auth_for_refresh = auth.clone();
let on_local_cache_changed = Arc::new(move || {
plugins_manager.maybe_start_remote_installed_plugins_cache_refresh_after_mutation(
&config_for_refresh,
auth_for_refresh.clone(),
on_effective_plugins_changed.clone(),
);
});
codex_core_plugins::remote::maybe_start_remote_installed_plugin_bundle_sync(
config.codex_home.to_path_buf(),
remote_plugin_service_config,
auth,
Some(on_local_cache_changed),
);
}
impl CodexMessageProcessor {
pub(crate) fn maybe_start_remote_installed_plugin_bundle_sync_with_auth_manager(
&self,
config: Config,
auth_manager: Arc<AuthManager>,
) {
if !config.features.enabled(Feature::Plugins)
|| !config.features.enabled(Feature::RemotePlugin)
{
return;
}
let thread_manager = Arc::clone(&self.thread_manager);
let on_effective_plugins_changed =
Some(self.effective_plugins_changed_callback(config.clone()));
tokio::spawn(async move {
let auth = auth_manager.auth().await;
maybe_start_remote_installed_plugin_bundle_sync_for_config(
thread_manager,
config,
auth,
on_effective_plugins_changed,
);
});
}
pub(super) async fn plugin_list(
&self,
request_id: ConnectionRequestId,
@@ -37,11 +95,19 @@ impl CodexMessageProcessor {
{
return Ok(empty_response());
}
let on_effective_plugins_changed =
Some(self.effective_plugins_changed_callback(config.clone()));
plugins_manager.maybe_start_plugin_list_background_tasks_for_config(
&config,
auth.clone(),
&roots,
Some(self.effective_plugins_changed_callback(config.clone())),
on_effective_plugins_changed.clone(),
);
maybe_start_remote_installed_plugin_bundle_sync_for_config(
Arc::clone(&self.thread_manager),
config.clone(),
auth.clone(),
on_effective_plugins_changed,
);
let config_for_marketplace_listing = config.clone();

View File

@@ -324,6 +324,11 @@ impl MessageProcessor {
auth_manager.clone(),
Some(on_effective_plugins_changed),
);
codex_message_processor
.maybe_start_remote_installed_plugin_bundle_sync_with_auth_manager(
(*config).clone(),
auth_manager.clone(),
);
}
let config_api = ConfigApi::new(
config_manager,

View File

@@ -132,6 +132,13 @@ impl McpProcess {
Self::new_with_env_and_args(codex_home, &[], &[]).await
}
pub async fn new_with_env_and_plugin_startup_tasks(
codex_home: &Path,
env_overrides: &[(&str, Option<&str>)],
) -> anyhow::Result<Self> {
Self::new_with_env_and_args(codex_home, env_overrides, &[]).await
}
pub async fn new_with_args(codex_home: &Path, args: &[&str]) -> anyhow::Result<Self> {
let mut all_args = vec![DISABLE_PLUGIN_STARTUP_TASKS_ARG];
all_args.extend_from_slice(args);

View File

@@ -19,6 +19,8 @@ use codex_config::types::AuthCredentialsStoreMode;
use codex_core::config::set_project_trust_level;
use codex_protocol::config_types::TrustLevel;
use codex_utils_absolute_path::AbsolutePathBuf;
use flate2::Compression;
use flate2::write::GzEncoder;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::timeout;
@@ -33,6 +35,8 @@ use wiremock::matchers::query_param;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
const TEST_CURATED_PLUGIN_SHA: &str = "0123456789abcdef0123456789abcdef01234567";
const STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE: &str = ".tmp/app-server-remote-plugin-sync-v1";
const TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS: &str =
"CODEX_TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS";
const ALTERNATE_MARKETPLACE_RELATIVE_PATH: &str = ".claude-plugin/marketplace.json";
const ALTERNATE_PLUGIN_MANIFEST_RELATIVE_PATH: &str = ".claude-plugin/plugin.json";
@@ -1127,6 +1131,135 @@ async fn app_server_startup_remote_plugin_sync_runs_once() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn app_server_startup_sync_downloads_remote_installed_plugin_bundles() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
write_remote_plugin_catalog_config(
codex_home.path(),
&format!("{}/backend-api/", server.uri()),
)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let bundle_url = mount_remote_plugin_bundle(
&server,
"linear",
remote_plugin_bundle_tar_gz_bytes("linear")?,
)
.await;
let global_installed_body =
remote_installed_plugin_body(&bundle_url, "1.2.3", /*enabled*/ true);
mount_remote_installed_plugins(&server, "GLOBAL", &global_installed_body).await;
mount_remote_installed_plugins(&server, "WORKSPACE", empty_remote_installed_plugins_body())
.await;
let installed_path = codex_home
.path()
.join("plugins/cache/chatgpt-global/linear/1.2.3");
let mut mcp = McpProcess::new_with_env_and_plugin_startup_tasks(
codex_home.path(),
&[(TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS, Some("1"))],
)
.await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
wait_for_path_exists(&installed_path.join(".codex-plugin/plugin.json")).await?;
assert!(installed_path.join("skills/plan-work/SKILL.md").is_file());
let config = std::fs::read_to_string(codex_home.path().join("config.toml"))?;
assert!(!config.contains("linear@chatgpt-global"));
Ok(())
}
#[tokio::test]
async fn plugin_list_sync_upgrades_and_removes_remote_installed_plugin_bundles() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
write_remote_plugin_catalog_config(
codex_home.path(),
&format!("{}/backend-api/", server.uri()),
)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
write_installed_plugin_with_version(&codex_home, "chatgpt-global", "linear", "1.0.0")?;
write_installed_plugin_with_version(&codex_home, "chatgpt-global", "stale", "1.0.0")?;
let bundle_url = mount_remote_plugin_bundle(
&server,
"linear",
remote_plugin_bundle_tar_gz_bytes("linear")?,
)
.await;
let global_installed_body =
remote_installed_plugin_body(&bundle_url, "1.2.3", /*enabled*/ true);
mount_remote_plugin_list(&server, "GLOBAL", &global_installed_body).await;
mount_remote_plugin_list(&server, "WORKSPACE", empty_remote_installed_plugins_body()).await;
mount_remote_installed_plugins(&server, "GLOBAL", &global_installed_body).await;
mount_remote_installed_plugins(&server, "WORKSPACE", empty_remote_installed_plugins_body())
.await;
let old_path = codex_home
.path()
.join("plugins/cache/chatgpt-global/linear/1.0.0");
let new_path = codex_home
.path()
.join("plugins/cache/chatgpt-global/linear/1.2.3");
let stale_path = codex_home.path().join("plugins/cache/chatgpt-global/stale");
let mut mcp = McpProcess::new_with_env(
codex_home.path(),
&[(TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS, Some("1"))],
)
.await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_list_request(PluginListParams { cwds: None })
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginListResponse = to_response(response)?;
let remote_marketplace = response
.marketplaces
.into_iter()
.find(|marketplace| marketplace.name == "chatgpt-global")
.expect("expected chatgpt-global marketplace entry");
assert_eq!(
remote_marketplace
.plugins
.into_iter()
.map(|plugin| (plugin.id, plugin.installed, plugin.enabled))
.collect::<Vec<_>>(),
vec![(
"plugins~Plugin_00000000000000000000000000000000".to_string(),
true,
true
)]
);
wait_for_path_exists(&new_path.join(".codex-plugin/plugin.json")).await?;
wait_for_path_missing(&old_path).await?;
wait_for_path_missing(&stale_path).await?;
let config = std::fs::read_to_string(codex_home.path().join("config.toml"))?;
assert!(!config.contains("linear@chatgpt-global"));
Ok(())
}
#[tokio::test]
async fn plugin_list_includes_remote_marketplaces_when_remote_plugin_enabled() -> Result<()> {
let codex_home = TempDir::new()?;
@@ -1590,17 +1723,152 @@ async fn wait_for_path_exists(path: &std::path::Path) -> Result<()> {
Ok(())
}
async fn wait_for_path_missing(path: &std::path::Path) -> Result<()> {
timeout(DEFAULT_TIMEOUT, async {
loop {
if !path.exists() {
return Ok::<(), anyhow::Error>(());
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await??;
Ok(())
}
async fn mount_remote_plugin_list(server: &MockServer, scope: &str, body: &str) {
Mock::given(method("GET"))
.and(path("/backend-api/ps/plugins/list"))
.and(query_param("scope", scope))
.and(query_param("limit", "200"))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(ResponseTemplate::new(200).set_body_string(body))
.mount(server)
.await;
}
async fn mount_remote_installed_plugins(server: &MockServer, scope: &str, body: &str) {
Mock::given(method("GET"))
.and(path("/backend-api/ps/plugins/installed"))
.and(query_param("scope", scope))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(ResponseTemplate::new(200).set_body_string(body))
.mount(server)
.await;
}
fn empty_remote_installed_plugins_body() -> &'static str {
r#"{
"plugins": [],
"pagination": {
"limit": 50,
"next_page_token": null
}
}"#
}
fn remote_installed_plugin_body(
bundle_download_url: &str,
release_version: &str,
enabled: bool,
) -> String {
format!(
r#"{{
"plugins": [
{{
"id": "plugins~Plugin_00000000000000000000000000000000",
"name": "linear",
"scope": "GLOBAL",
"installation_policy": "AVAILABLE",
"authentication_policy": "ON_USE",
"release": {{
"version": "{release_version}",
"display_name": "Linear",
"description": "Track work in Linear",
"bundle_download_url": "{bundle_download_url}",
"app_ids": [],
"interface": {{}},
"skills": []
}},
"enabled": {enabled},
"disabled_skill_names": []
}}
],
"pagination": {{
"limit": 50,
"next_page_token": null
}}
}}"#
)
}
async fn mount_remote_plugin_bundle(
server: &MockServer,
plugin_name: &str,
body: Vec<u8>,
) -> String {
let bundle_path = format!("/bundles/{plugin_name}.tar.gz");
Mock::given(method("GET"))
.and(path(bundle_path.as_str()))
.respond_with(
ResponseTemplate::new(200)
.insert_header("content-type", "application/gzip")
.set_body_bytes(body),
)
.mount(server)
.await;
format!("{}{bundle_path}", server.uri())
}
fn remote_plugin_bundle_tar_gz_bytes(plugin_name: &str) -> Result<Vec<u8>> {
let manifest = format!(r#"{{"name":"{plugin_name}"}}"#);
let skill = "---\nname: plan-work\ndescription: Track work in Linear.\n---\n\n# Plan Work\n";
let encoder = GzEncoder::new(Vec::new(), Compression::default());
let mut tar = tar::Builder::new(encoder);
for (path, contents, mode) in [
(
".codex-plugin/plugin.json",
manifest.as_bytes(),
/*mode*/ 0o644,
),
(
"skills/plan-work/SKILL.md",
skill.as_bytes(),
/*mode*/ 0o644,
),
] {
let mut header = tar::Header::new_gnu();
header.set_size(contents.len() as u64);
header.set_mode(mode);
header.set_cksum();
tar.append_data(&mut header, path, contents)?;
}
Ok(tar.into_inner()?.finish()?)
}
fn write_installed_plugin(
codex_home: &TempDir,
marketplace_name: &str,
plugin_name: &str,
) -> Result<()> {
write_installed_plugin_with_version(codex_home, marketplace_name, plugin_name, "local")
}
fn write_installed_plugin_with_version(
codex_home: &TempDir,
marketplace_name: &str,
plugin_name: &str,
plugin_version: &str,
) -> Result<()> {
let plugin_root = codex_home
.path()
.join("plugins/cache")
.join(marketplace_name)
.join(plugin_name)
.join("local/.codex-plugin");
.join(plugin_version)
.join(".codex-plugin");
std::fs::create_dir_all(&plugin_root)?;
std::fs::write(
plugin_root.join("plugin.json"),

View File

@@ -1,5 +1,6 @@
use crate::store::PLUGINS_CACHE_DIR;
use crate::store::PluginStore;
use crate::store::PluginStoreError;
use codex_app_server_protocol::PluginAuthPolicy;
use codex_app_server_protocol::PluginInstallPolicy;
use codex_app_server_protocol::PluginInterface;
@@ -13,8 +14,14 @@ use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::HashSet;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::OnceLock;
use std::time::Duration;
use tracing::info;
use tracing::warn;
pub const REMOTE_GLOBAL_MARKETPLACE_NAME: &str = "chatgpt-global";
pub const REMOTE_WORKSPACE_MARKETPLACE_NAME: &str = "chatgpt-workspace";
@@ -25,6 +32,10 @@ const REMOTE_PLUGIN_CATALOG_TIMEOUT: Duration = Duration::from_secs(30);
const REMOTE_PLUGIN_LIST_PAGE_LIMIT: u32 = 200;
const MAX_REMOTE_DEFAULT_PROMPT_LEN: usize = 128;
static REMOTE_INSTALLED_PLUGIN_BUNDLE_SYNC_IN_FLIGHT: OnceLock<
Mutex<HashSet<RemoteInstalledPluginBundleSyncKey>>,
> = OnceLock::new();
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RemotePluginServiceConfig {
pub chatgpt_base_url: String,
@@ -141,6 +152,42 @@ pub enum RemotePluginCatalogError {
CacheRemove(String),
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct RemoteInstalledPluginBundleSyncOutcome {
pub installed_plugin_ids: Vec<String>,
pub removed_cache_plugin_ids: Vec<String>,
pub failed_remote_plugin_ids: Vec<String>,
}
impl RemoteInstalledPluginBundleSyncOutcome {
pub fn changed_local_cache(&self) -> bool {
!self.installed_plugin_ids.is_empty() || !self.removed_cache_plugin_ids.is_empty()
}
}
#[derive(Debug, thiserror::Error)]
pub enum RemoteInstalledPluginBundleSyncError {
#[error("{0}")]
Catalog(#[from] RemotePluginCatalogError),
#[error("{0}")]
Store(#[from] PluginStoreError),
#[error("failed to join stale remote plugin cache cleanup task: {0}")]
Join(#[from] tokio::task::JoinError),
#[error("failed to remove stale remote plugin cache entries: {0}")]
CacheRemove(String),
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct RemoteInstalledPluginBundleSyncKey {
codex_home: PathBuf,
chatgpt_base_url: String,
account_id: Option<String>,
chatgpt_user_id: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize)]
enum RemotePluginScope {
#[serde(rename = "GLOBAL")]
@@ -408,6 +455,177 @@ pub async fn fetch_remote_installed_plugins(
Ok(installed_plugins)
}
pub fn maybe_start_remote_installed_plugin_bundle_sync(
codex_home: PathBuf,
config: RemotePluginServiceConfig,
auth: Option<CodexAuth>,
on_local_cache_changed: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
) {
let Some(auth) = auth else {
return;
};
let key = RemoteInstalledPluginBundleSyncKey {
codex_home: codex_home.clone(),
chatgpt_base_url: config.chatgpt_base_url.clone(),
account_id: auth.get_account_id(),
chatgpt_user_id: auth.get_chatgpt_user_id(),
};
if !mark_remote_installed_plugin_bundle_sync_in_flight(key.clone()) {
return;
}
tokio::spawn(async move {
let result =
sync_remote_installed_plugin_bundles_once(codex_home, &config, Some(&auth)).await;
match result {
Ok(outcome) => {
if outcome.changed_local_cache()
&& let Some(on_local_cache_changed) = on_local_cache_changed
{
on_local_cache_changed();
}
info!(
installed_plugin_ids = ?outcome.installed_plugin_ids,
removed_cache_plugin_ids = ?outcome.removed_cache_plugin_ids,
failed_remote_plugin_ids = ?outcome.failed_remote_plugin_ids,
"completed remote installed plugin bundle sync"
);
}
Err(err) => {
warn!(
error = %err,
"remote installed plugin bundle sync failed"
);
}
}
clear_remote_installed_plugin_bundle_sync_in_flight(&key);
});
}
pub async fn sync_remote_installed_plugin_bundles_once(
codex_home: PathBuf,
config: &RemotePluginServiceConfig,
auth: Option<&CodexAuth>,
) -> Result<RemoteInstalledPluginBundleSyncOutcome, RemoteInstalledPluginBundleSyncError> {
let auth = ensure_chatgpt_auth(auth)?;
let global = async {
let scope = RemotePluginScope::Global;
let installed_plugins =
fetch_installed_plugins_for_scope_with_download_urls(config, auth, scope).await?;
Ok::<_, RemotePluginCatalogError>((scope, installed_plugins))
};
let workspace = async {
let scope = RemotePluginScope::Workspace;
let installed_plugins =
fetch_installed_plugins_for_scope_with_download_urls(config, auth, scope).await?;
Ok::<_, RemotePluginCatalogError>((scope, installed_plugins))
};
let (global, workspace) = tokio::try_join!(global, workspace)?;
let store = PluginStore::try_new(codex_home.clone())?;
let mut installed_plugin_names_by_marketplace =
BTreeMap::<String, BTreeSet<String>>::from_iter([
(REMOTE_GLOBAL_MARKETPLACE_NAME.to_string(), BTreeSet::new()),
(
REMOTE_WORKSPACE_MARKETPLACE_NAME.to_string(),
BTreeSet::new(),
),
]);
let mut installed_plugin_ids = BTreeSet::new();
let mut failed_remote_plugin_ids = BTreeSet::new();
for (scope, installed_plugins) in [global, workspace] {
let marketplace_name = scope.marketplace_name().to_string();
for installed_plugin in installed_plugins {
let plugin = installed_plugin.plugin;
installed_plugin_names_by_marketplace
.entry(marketplace_name.clone())
.or_default()
.insert(plugin.name.clone());
let plugin_id = match PluginId::new(plugin.name.clone(), marketplace_name.clone()) {
Ok(plugin_id) => plugin_id,
Err(err) => {
warn!(
remote_plugin_id = %plugin.id,
plugin = %plugin.name,
marketplace = %marketplace_name,
error = %err,
"skipping remote installed plugin with invalid local cache id"
);
failed_remote_plugin_ids.insert(plugin.id);
continue;
}
};
let release_version = plugin
.release
.version
.as_deref()
.map(str::trim)
.filter(|version| !version.is_empty());
if store.active_plugin_version(&plugin_id).as_deref() == release_version {
continue;
}
let bundle = match crate::remote_bundle::validate_remote_plugin_bundle(
&plugin.id,
&marketplace_name,
&plugin.name,
release_version,
plugin.release.bundle_download_url.as_deref(),
) {
Ok(bundle) => bundle,
Err(err) => {
warn!(
remote_plugin_id = %plugin.id,
plugin = %plugin.name,
marketplace = %marketplace_name,
error = %err,
"skipping remote installed plugin bundle download"
);
failed_remote_plugin_ids.insert(plugin.id);
continue;
}
};
match crate::remote_bundle::download_and_install_remote_plugin_bundle(
codex_home.clone(),
bundle,
)
.await
{
Ok(result) => {
installed_plugin_ids.insert(result.plugin_id.as_key());
}
Err(err) => {
warn!(
remote_plugin_id = %plugin.id,
plugin = %plugin.name,
marketplace = %marketplace_name,
error = %err,
"failed to download remote installed plugin bundle"
);
failed_remote_plugin_ids.insert(plugin.id);
}
}
}
}
let removed_cache_plugin_ids = tokio::task::spawn_blocking(move || {
remove_stale_remote_plugin_caches(
codex_home.as_path(),
&installed_plugin_names_by_marketplace,
)
})
.await?
.map_err(RemoteInstalledPluginBundleSyncError::CacheRemove)?;
Ok(RemoteInstalledPluginBundleSyncOutcome {
installed_plugin_ids: installed_plugin_ids.into_iter().collect(),
removed_cache_plugin_ids,
failed_remote_plugin_ids: failed_remote_plugin_ids.into_iter().collect(),
})
}
pub async fn fetch_remote_plugin_detail(
config: &RemotePluginServiceConfig,
auth: Option<&CodexAuth>,
@@ -651,6 +869,96 @@ fn remove_remote_plugin_cache(
Ok(())
}
fn remove_stale_remote_plugin_caches(
codex_home: &Path,
installed_plugin_names_by_marketplace: &BTreeMap<String, BTreeSet<String>>,
) -> Result<Vec<String>, String> {
let mut removed_cache_plugin_ids = Vec::new();
for marketplace_name in [
REMOTE_GLOBAL_MARKETPLACE_NAME,
REMOTE_WORKSPACE_MARKETPLACE_NAME,
] {
let marketplace_root = codex_home.join(PLUGINS_CACHE_DIR).join(marketplace_name);
if !marketplace_root.exists() {
continue;
}
let installed_plugin_names = installed_plugin_names_by_marketplace
.get(marketplace_name)
.cloned()
.unwrap_or_default();
for entry in fs::read_dir(&marketplace_root).map_err(|err| {
format!(
"failed to read remote plugin cache directory {}: {err}",
marketplace_root.display()
)
})? {
let entry = entry.map_err(|err| {
format!(
"failed to enumerate remote plugin cache directory {}: {err}",
marketplace_root.display()
)
})?;
let plugin_name = entry.file_name().into_string().map_err(|file_name| {
format!(
"remote plugin cache entry under {} is not valid UTF-8: {:?}",
marketplace_root.display(),
file_name
)
})?;
if installed_plugin_names.contains(&plugin_name) {
continue;
}
let cache_path = entry.path();
if cache_path.is_dir() {
fs::remove_dir_all(&cache_path).map_err(|err| {
format!(
"failed to remove stale remote plugin cache entry {}: {err}",
cache_path.display()
)
})?;
} else {
fs::remove_file(&cache_path).map_err(|err| {
format!(
"failed to remove stale remote plugin cache entry {}: {err}",
cache_path.display()
)
})?;
}
let plugin_key = PluginId::new(plugin_name.clone(), marketplace_name.to_string())
.map(|plugin_id| plugin_id.as_key())
.unwrap_or_else(|_| format!("{plugin_name}@{marketplace_name}"));
removed_cache_plugin_ids.push(plugin_key);
}
}
removed_cache_plugin_ids.sort();
Ok(removed_cache_plugin_ids)
}
fn mark_remote_installed_plugin_bundle_sync_in_flight(
key: RemoteInstalledPluginBundleSyncKey,
) -> bool {
let syncs =
REMOTE_INSTALLED_PLUGIN_BUNDLE_SYNC_IN_FLIGHT.get_or_init(|| Mutex::new(HashSet::new()));
let mut syncs = match syncs.lock() {
Ok(syncs) => syncs,
Err(err) => err.into_inner(),
};
syncs.insert(key)
}
fn clear_remote_installed_plugin_bundle_sync_in_flight(key: &RemoteInstalledPluginBundleSyncKey) {
let Some(syncs) = REMOTE_INSTALLED_PLUGIN_BUNDLE_SYNC_IN_FLIGHT.get() else {
return;
};
let mut syncs = match syncs.lock() {
Ok(syncs) => syncs,
Err(err) => err.into_inner(),
};
syncs.remove(key);
}
fn build_remote_plugin_summary(
plugin: &RemotePluginDirectoryItem,
installed_plugin: Option<&RemotePluginInstalledItem>,
@@ -791,12 +1099,41 @@ async fn fetch_installed_plugins_for_scope(
config: &RemotePluginServiceConfig,
auth: &CodexAuth,
scope: RemotePluginScope,
) -> Result<Vec<RemotePluginInstalledItem>, RemotePluginCatalogError> {
fetch_installed_plugins_for_scope_with_download_url_option(
config, auth, scope, /*include_download_urls*/ false,
)
.await
}
async fn fetch_installed_plugins_for_scope_with_download_urls(
config: &RemotePluginServiceConfig,
auth: &CodexAuth,
scope: RemotePluginScope,
) -> Result<Vec<RemotePluginInstalledItem>, RemotePluginCatalogError> {
fetch_installed_plugins_for_scope_with_download_url_option(
config, auth, scope, /*include_download_urls*/ true,
)
.await
}
async fn fetch_installed_plugins_for_scope_with_download_url_option(
config: &RemotePluginServiceConfig,
auth: &CodexAuth,
scope: RemotePluginScope,
include_download_urls: bool,
) -> Result<Vec<RemotePluginInstalledItem>, RemotePluginCatalogError> {
let mut plugins = Vec::new();
let mut page_token = None;
loop {
let response =
get_remote_plugin_installed_page(config, auth, scope, page_token.as_deref()).await?;
let response = get_remote_plugin_installed_page(
config,
auth,
scope,
page_token.as_deref(),
include_download_urls,
)
.await?;
plugins.extend(response.plugins);
let Some(next_page_token) = response.pagination.next_page_token else {
break;
@@ -829,12 +1166,16 @@ async fn get_remote_plugin_installed_page(
auth: &CodexAuth,
scope: RemotePluginScope,
page_token: Option<&str>,
include_download_urls: bool,
) -> Result<RemotePluginInstalledResponse, RemotePluginCatalogError> {
let base_url = config.chatgpt_base_url.trim_end_matches('/');
let url = format!("{base_url}/ps/plugins/installed");
let client = build_reqwest_client();
let mut request = authenticated_request(client.get(&url), auth)?;
request = request.query(&[("scope", scope.api_value())]);
if include_download_urls {
request = request.query(&[("includeDownloadUrls", true)]);
}
if let Some(page_token) = page_token {
request = request.query(&[("pageToken", page_token)]);
}