mirror of
https://github.com/openai/codex.git
synced 2026-05-06 12:26:38 +00:00
Compare commits
1 Commits
abhinav/sk
...
xli-codex/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bbb0f2bb4f |
@@ -3,7 +3,65 @@ use crate::error_code::internal_error;
|
||||
use crate::error_code::invalid_request;
|
||||
use codex_app_server_protocol::PluginInstallPolicy;
|
||||
|
||||
fn maybe_start_remote_installed_plugin_bundle_sync_for_config(
|
||||
thread_manager: Arc<ThreadManager>,
|
||||
config: Config,
|
||||
auth: Option<CodexAuth>,
|
||||
on_effective_plugins_changed: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
|
||||
) {
|
||||
if !config.features.enabled(Feature::Plugins) || !config.features.enabled(Feature::RemotePlugin)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
let remote_plugin_service_config = RemotePluginServiceConfig {
|
||||
chatgpt_base_url: config.chatgpt_base_url.clone(),
|
||||
};
|
||||
let plugins_manager = thread_manager.plugins_manager();
|
||||
let config_for_refresh = config.clone();
|
||||
let auth_for_refresh = auth.clone();
|
||||
let on_local_cache_changed = Arc::new(move || {
|
||||
plugins_manager.maybe_start_remote_installed_plugins_cache_refresh_after_mutation(
|
||||
&config_for_refresh,
|
||||
auth_for_refresh.clone(),
|
||||
on_effective_plugins_changed.clone(),
|
||||
);
|
||||
});
|
||||
|
||||
codex_core_plugins::remote::maybe_start_remote_installed_plugin_bundle_sync(
|
||||
config.codex_home.to_path_buf(),
|
||||
remote_plugin_service_config,
|
||||
auth,
|
||||
Some(on_local_cache_changed),
|
||||
);
|
||||
}
|
||||
|
||||
impl CodexMessageProcessor {
|
||||
pub(crate) fn maybe_start_remote_installed_plugin_bundle_sync_with_auth_manager(
|
||||
&self,
|
||||
config: Config,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
) {
|
||||
if !config.features.enabled(Feature::Plugins)
|
||||
|| !config.features.enabled(Feature::RemotePlugin)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
let thread_manager = Arc::clone(&self.thread_manager);
|
||||
let on_effective_plugins_changed =
|
||||
Some(self.effective_plugins_changed_callback(config.clone()));
|
||||
tokio::spawn(async move {
|
||||
let auth = auth_manager.auth().await;
|
||||
maybe_start_remote_installed_plugin_bundle_sync_for_config(
|
||||
thread_manager,
|
||||
config,
|
||||
auth,
|
||||
on_effective_plugins_changed,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
pub(super) async fn plugin_list(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
@@ -37,11 +95,19 @@ impl CodexMessageProcessor {
|
||||
{
|
||||
return Ok(empty_response());
|
||||
}
|
||||
let on_effective_plugins_changed =
|
||||
Some(self.effective_plugins_changed_callback(config.clone()));
|
||||
plugins_manager.maybe_start_plugin_list_background_tasks_for_config(
|
||||
&config,
|
||||
auth.clone(),
|
||||
&roots,
|
||||
Some(self.effective_plugins_changed_callback(config.clone())),
|
||||
on_effective_plugins_changed.clone(),
|
||||
);
|
||||
maybe_start_remote_installed_plugin_bundle_sync_for_config(
|
||||
Arc::clone(&self.thread_manager),
|
||||
config.clone(),
|
||||
auth.clone(),
|
||||
on_effective_plugins_changed,
|
||||
);
|
||||
|
||||
let config_for_marketplace_listing = config.clone();
|
||||
|
||||
@@ -324,6 +324,11 @@ impl MessageProcessor {
|
||||
auth_manager.clone(),
|
||||
Some(on_effective_plugins_changed),
|
||||
);
|
||||
codex_message_processor
|
||||
.maybe_start_remote_installed_plugin_bundle_sync_with_auth_manager(
|
||||
(*config).clone(),
|
||||
auth_manager.clone(),
|
||||
);
|
||||
}
|
||||
let config_api = ConfigApi::new(
|
||||
config_manager,
|
||||
|
||||
@@ -132,6 +132,13 @@ impl McpProcess {
|
||||
Self::new_with_env_and_args(codex_home, &[], &[]).await
|
||||
}
|
||||
|
||||
pub async fn new_with_env_and_plugin_startup_tasks(
|
||||
codex_home: &Path,
|
||||
env_overrides: &[(&str, Option<&str>)],
|
||||
) -> anyhow::Result<Self> {
|
||||
Self::new_with_env_and_args(codex_home, env_overrides, &[]).await
|
||||
}
|
||||
|
||||
pub async fn new_with_args(codex_home: &Path, args: &[&str]) -> anyhow::Result<Self> {
|
||||
let mut all_args = vec![DISABLE_PLUGIN_STARTUP_TASKS_ARG];
|
||||
all_args.extend_from_slice(args);
|
||||
|
||||
@@ -19,6 +19,8 @@ use codex_config::types::AuthCredentialsStoreMode;
|
||||
use codex_core::config::set_project_trust_level;
|
||||
use codex_protocol::config_types::TrustLevel;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use flate2::Compression;
|
||||
use flate2::write::GzEncoder;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
@@ -33,6 +35,8 @@ use wiremock::matchers::query_param;
|
||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const TEST_CURATED_PLUGIN_SHA: &str = "0123456789abcdef0123456789abcdef01234567";
|
||||
const STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE: &str = ".tmp/app-server-remote-plugin-sync-v1";
|
||||
const TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS: &str =
|
||||
"CODEX_TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS";
|
||||
const ALTERNATE_MARKETPLACE_RELATIVE_PATH: &str = ".claude-plugin/marketplace.json";
|
||||
const ALTERNATE_PLUGIN_MANIFEST_RELATIVE_PATH: &str = ".claude-plugin/plugin.json";
|
||||
|
||||
@@ -1127,6 +1131,135 @@ async fn app_server_startup_remote_plugin_sync_runs_once() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn app_server_startup_sync_downloads_remote_installed_plugin_bundles() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let server = MockServer::start().await;
|
||||
write_remote_plugin_catalog_config(
|
||||
codex_home.path(),
|
||||
&format!("{}/backend-api/", server.uri()),
|
||||
)?;
|
||||
write_chatgpt_auth(
|
||||
codex_home.path(),
|
||||
ChatGptAuthFixture::new("chatgpt-token")
|
||||
.account_id("account-123")
|
||||
.chatgpt_user_id("user-123")
|
||||
.chatgpt_account_id("account-123"),
|
||||
AuthCredentialsStoreMode::File,
|
||||
)?;
|
||||
|
||||
let bundle_url = mount_remote_plugin_bundle(
|
||||
&server,
|
||||
"linear",
|
||||
remote_plugin_bundle_tar_gz_bytes("linear")?,
|
||||
)
|
||||
.await;
|
||||
let global_installed_body =
|
||||
remote_installed_plugin_body(&bundle_url, "1.2.3", /*enabled*/ true);
|
||||
mount_remote_installed_plugins(&server, "GLOBAL", &global_installed_body).await;
|
||||
mount_remote_installed_plugins(&server, "WORKSPACE", empty_remote_installed_plugins_body())
|
||||
.await;
|
||||
|
||||
let installed_path = codex_home
|
||||
.path()
|
||||
.join("plugins/cache/chatgpt-global/linear/1.2.3");
|
||||
let mut mcp = McpProcess::new_with_env_and_plugin_startup_tasks(
|
||||
codex_home.path(),
|
||||
&[(TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS, Some("1"))],
|
||||
)
|
||||
.await?;
|
||||
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
wait_for_path_exists(&installed_path.join(".codex-plugin/plugin.json")).await?;
|
||||
assert!(installed_path.join("skills/plan-work/SKILL.md").is_file());
|
||||
let config = std::fs::read_to_string(codex_home.path().join("config.toml"))?;
|
||||
assert!(!config.contains("linear@chatgpt-global"));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn plugin_list_sync_upgrades_and_removes_remote_installed_plugin_bundles() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let server = MockServer::start().await;
|
||||
write_remote_plugin_catalog_config(
|
||||
codex_home.path(),
|
||||
&format!("{}/backend-api/", server.uri()),
|
||||
)?;
|
||||
write_chatgpt_auth(
|
||||
codex_home.path(),
|
||||
ChatGptAuthFixture::new("chatgpt-token")
|
||||
.account_id("account-123")
|
||||
.chatgpt_user_id("user-123")
|
||||
.chatgpt_account_id("account-123"),
|
||||
AuthCredentialsStoreMode::File,
|
||||
)?;
|
||||
write_installed_plugin_with_version(&codex_home, "chatgpt-global", "linear", "1.0.0")?;
|
||||
write_installed_plugin_with_version(&codex_home, "chatgpt-global", "stale", "1.0.0")?;
|
||||
|
||||
let bundle_url = mount_remote_plugin_bundle(
|
||||
&server,
|
||||
"linear",
|
||||
remote_plugin_bundle_tar_gz_bytes("linear")?,
|
||||
)
|
||||
.await;
|
||||
let global_installed_body =
|
||||
remote_installed_plugin_body(&bundle_url, "1.2.3", /*enabled*/ true);
|
||||
mount_remote_plugin_list(&server, "GLOBAL", &global_installed_body).await;
|
||||
mount_remote_plugin_list(&server, "WORKSPACE", empty_remote_installed_plugins_body()).await;
|
||||
mount_remote_installed_plugins(&server, "GLOBAL", &global_installed_body).await;
|
||||
mount_remote_installed_plugins(&server, "WORKSPACE", empty_remote_installed_plugins_body())
|
||||
.await;
|
||||
|
||||
let old_path = codex_home
|
||||
.path()
|
||||
.join("plugins/cache/chatgpt-global/linear/1.0.0");
|
||||
let new_path = codex_home
|
||||
.path()
|
||||
.join("plugins/cache/chatgpt-global/linear/1.2.3");
|
||||
let stale_path = codex_home.path().join("plugins/cache/chatgpt-global/stale");
|
||||
|
||||
let mut mcp = McpProcess::new_with_env(
|
||||
codex_home.path(),
|
||||
&[(TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS, Some("1"))],
|
||||
)
|
||||
.await?;
|
||||
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let request_id = mcp
|
||||
.send_plugin_list_request(PluginListParams { cwds: None })
|
||||
.await?;
|
||||
let response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
let response: PluginListResponse = to_response(response)?;
|
||||
let remote_marketplace = response
|
||||
.marketplaces
|
||||
.into_iter()
|
||||
.find(|marketplace| marketplace.name == "chatgpt-global")
|
||||
.expect("expected chatgpt-global marketplace entry");
|
||||
assert_eq!(
|
||||
remote_marketplace
|
||||
.plugins
|
||||
.into_iter()
|
||||
.map(|plugin| (plugin.id, plugin.installed, plugin.enabled))
|
||||
.collect::<Vec<_>>(),
|
||||
vec![(
|
||||
"plugins~Plugin_00000000000000000000000000000000".to_string(),
|
||||
true,
|
||||
true
|
||||
)]
|
||||
);
|
||||
|
||||
wait_for_path_exists(&new_path.join(".codex-plugin/plugin.json")).await?;
|
||||
wait_for_path_missing(&old_path).await?;
|
||||
wait_for_path_missing(&stale_path).await?;
|
||||
let config = std::fs::read_to_string(codex_home.path().join("config.toml"))?;
|
||||
assert!(!config.contains("linear@chatgpt-global"));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn plugin_list_includes_remote_marketplaces_when_remote_plugin_enabled() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
@@ -1590,17 +1723,152 @@ async fn wait_for_path_exists(path: &std::path::Path) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_for_path_missing(path: &std::path::Path) -> Result<()> {
|
||||
timeout(DEFAULT_TIMEOUT, async {
|
||||
loop {
|
||||
if !path.exists() {
|
||||
return Ok::<(), anyhow::Error>(());
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn mount_remote_plugin_list(server: &MockServer, scope: &str, body: &str) {
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/backend-api/ps/plugins/list"))
|
||||
.and(query_param("scope", scope))
|
||||
.and(query_param("limit", "200"))
|
||||
.and(header("authorization", "Bearer chatgpt-token"))
|
||||
.and(header("chatgpt-account-id", "account-123"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_string(body))
|
||||
.mount(server)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn mount_remote_installed_plugins(server: &MockServer, scope: &str, body: &str) {
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/backend-api/ps/plugins/installed"))
|
||||
.and(query_param("scope", scope))
|
||||
.and(header("authorization", "Bearer chatgpt-token"))
|
||||
.and(header("chatgpt-account-id", "account-123"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_string(body))
|
||||
.mount(server)
|
||||
.await;
|
||||
}
|
||||
|
||||
fn empty_remote_installed_plugins_body() -> &'static str {
|
||||
r#"{
|
||||
"plugins": [],
|
||||
"pagination": {
|
||||
"limit": 50,
|
||||
"next_page_token": null
|
||||
}
|
||||
}"#
|
||||
}
|
||||
|
||||
fn remote_installed_plugin_body(
|
||||
bundle_download_url: &str,
|
||||
release_version: &str,
|
||||
enabled: bool,
|
||||
) -> String {
|
||||
format!(
|
||||
r#"{{
|
||||
"plugins": [
|
||||
{{
|
||||
"id": "plugins~Plugin_00000000000000000000000000000000",
|
||||
"name": "linear",
|
||||
"scope": "GLOBAL",
|
||||
"installation_policy": "AVAILABLE",
|
||||
"authentication_policy": "ON_USE",
|
||||
"release": {{
|
||||
"version": "{release_version}",
|
||||
"display_name": "Linear",
|
||||
"description": "Track work in Linear",
|
||||
"bundle_download_url": "{bundle_download_url}",
|
||||
"app_ids": [],
|
||||
"interface": {{}},
|
||||
"skills": []
|
||||
}},
|
||||
"enabled": {enabled},
|
||||
"disabled_skill_names": []
|
||||
}}
|
||||
],
|
||||
"pagination": {{
|
||||
"limit": 50,
|
||||
"next_page_token": null
|
||||
}}
|
||||
}}"#
|
||||
)
|
||||
}
|
||||
|
||||
async fn mount_remote_plugin_bundle(
|
||||
server: &MockServer,
|
||||
plugin_name: &str,
|
||||
body: Vec<u8>,
|
||||
) -> String {
|
||||
let bundle_path = format!("/bundles/{plugin_name}.tar.gz");
|
||||
Mock::given(method("GET"))
|
||||
.and(path(bundle_path.as_str()))
|
||||
.respond_with(
|
||||
ResponseTemplate::new(200)
|
||||
.insert_header("content-type", "application/gzip")
|
||||
.set_body_bytes(body),
|
||||
)
|
||||
.mount(server)
|
||||
.await;
|
||||
format!("{}{bundle_path}", server.uri())
|
||||
}
|
||||
|
||||
fn remote_plugin_bundle_tar_gz_bytes(plugin_name: &str) -> Result<Vec<u8>> {
|
||||
let manifest = format!(r#"{{"name":"{plugin_name}"}}"#);
|
||||
let skill = "---\nname: plan-work\ndescription: Track work in Linear.\n---\n\n# Plan Work\n";
|
||||
let encoder = GzEncoder::new(Vec::new(), Compression::default());
|
||||
let mut tar = tar::Builder::new(encoder);
|
||||
for (path, contents, mode) in [
|
||||
(
|
||||
".codex-plugin/plugin.json",
|
||||
manifest.as_bytes(),
|
||||
/*mode*/ 0o644,
|
||||
),
|
||||
(
|
||||
"skills/plan-work/SKILL.md",
|
||||
skill.as_bytes(),
|
||||
/*mode*/ 0o644,
|
||||
),
|
||||
] {
|
||||
let mut header = tar::Header::new_gnu();
|
||||
header.set_size(contents.len() as u64);
|
||||
header.set_mode(mode);
|
||||
header.set_cksum();
|
||||
tar.append_data(&mut header, path, contents)?;
|
||||
}
|
||||
Ok(tar.into_inner()?.finish()?)
|
||||
}
|
||||
|
||||
fn write_installed_plugin(
|
||||
codex_home: &TempDir,
|
||||
marketplace_name: &str,
|
||||
plugin_name: &str,
|
||||
) -> Result<()> {
|
||||
write_installed_plugin_with_version(codex_home, marketplace_name, plugin_name, "local")
|
||||
}
|
||||
|
||||
fn write_installed_plugin_with_version(
|
||||
codex_home: &TempDir,
|
||||
marketplace_name: &str,
|
||||
plugin_name: &str,
|
||||
plugin_version: &str,
|
||||
) -> Result<()> {
|
||||
let plugin_root = codex_home
|
||||
.path()
|
||||
.join("plugins/cache")
|
||||
.join(marketplace_name)
|
||||
.join(plugin_name)
|
||||
.join("local/.codex-plugin");
|
||||
.join(plugin_version)
|
||||
.join(".codex-plugin");
|
||||
std::fs::create_dir_all(&plugin_root)?;
|
||||
std::fs::write(
|
||||
plugin_root.join("plugin.json"),
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use crate::store::PLUGINS_CACHE_DIR;
|
||||
use crate::store::PluginStore;
|
||||
use crate::store::PluginStoreError;
|
||||
use codex_app_server_protocol::PluginAuthPolicy;
|
||||
use codex_app_server_protocol::PluginInstallPolicy;
|
||||
use codex_app_server_protocol::PluginInterface;
|
||||
@@ -13,8 +14,14 @@ use std::collections::BTreeMap;
|
||||
use std::collections::BTreeSet;
|
||||
use std::collections::HashSet;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::OnceLock;
|
||||
use std::time::Duration;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
pub const REMOTE_GLOBAL_MARKETPLACE_NAME: &str = "chatgpt-global";
|
||||
pub const REMOTE_WORKSPACE_MARKETPLACE_NAME: &str = "chatgpt-workspace";
|
||||
@@ -25,6 +32,10 @@ const REMOTE_PLUGIN_CATALOG_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const REMOTE_PLUGIN_LIST_PAGE_LIMIT: u32 = 200;
|
||||
const MAX_REMOTE_DEFAULT_PROMPT_LEN: usize = 128;
|
||||
|
||||
static REMOTE_INSTALLED_PLUGIN_BUNDLE_SYNC_IN_FLIGHT: OnceLock<
|
||||
Mutex<HashSet<RemoteInstalledPluginBundleSyncKey>>,
|
||||
> = OnceLock::new();
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct RemotePluginServiceConfig {
|
||||
pub chatgpt_base_url: String,
|
||||
@@ -141,6 +152,42 @@ pub enum RemotePluginCatalogError {
|
||||
CacheRemove(String),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Default)]
|
||||
pub struct RemoteInstalledPluginBundleSyncOutcome {
|
||||
pub installed_plugin_ids: Vec<String>,
|
||||
pub removed_cache_plugin_ids: Vec<String>,
|
||||
pub failed_remote_plugin_ids: Vec<String>,
|
||||
}
|
||||
|
||||
impl RemoteInstalledPluginBundleSyncOutcome {
|
||||
pub fn changed_local_cache(&self) -> bool {
|
||||
!self.installed_plugin_ids.is_empty() || !self.removed_cache_plugin_ids.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum RemoteInstalledPluginBundleSyncError {
|
||||
#[error("{0}")]
|
||||
Catalog(#[from] RemotePluginCatalogError),
|
||||
|
||||
#[error("{0}")]
|
||||
Store(#[from] PluginStoreError),
|
||||
|
||||
#[error("failed to join stale remote plugin cache cleanup task: {0}")]
|
||||
Join(#[from] tokio::task::JoinError),
|
||||
|
||||
#[error("failed to remove stale remote plugin cache entries: {0}")]
|
||||
CacheRemove(String),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
struct RemoteInstalledPluginBundleSyncKey {
|
||||
codex_home: PathBuf,
|
||||
chatgpt_base_url: String,
|
||||
account_id: Option<String>,
|
||||
chatgpt_user_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize)]
|
||||
enum RemotePluginScope {
|
||||
#[serde(rename = "GLOBAL")]
|
||||
@@ -408,6 +455,177 @@ pub async fn fetch_remote_installed_plugins(
|
||||
Ok(installed_plugins)
|
||||
}
|
||||
|
||||
pub fn maybe_start_remote_installed_plugin_bundle_sync(
|
||||
codex_home: PathBuf,
|
||||
config: RemotePluginServiceConfig,
|
||||
auth: Option<CodexAuth>,
|
||||
on_local_cache_changed: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
|
||||
) {
|
||||
let Some(auth) = auth else {
|
||||
return;
|
||||
};
|
||||
let key = RemoteInstalledPluginBundleSyncKey {
|
||||
codex_home: codex_home.clone(),
|
||||
chatgpt_base_url: config.chatgpt_base_url.clone(),
|
||||
account_id: auth.get_account_id(),
|
||||
chatgpt_user_id: auth.get_chatgpt_user_id(),
|
||||
};
|
||||
if !mark_remote_installed_plugin_bundle_sync_in_flight(key.clone()) {
|
||||
return;
|
||||
}
|
||||
|
||||
tokio::spawn(async move {
|
||||
let result =
|
||||
sync_remote_installed_plugin_bundles_once(codex_home, &config, Some(&auth)).await;
|
||||
match result {
|
||||
Ok(outcome) => {
|
||||
if outcome.changed_local_cache()
|
||||
&& let Some(on_local_cache_changed) = on_local_cache_changed
|
||||
{
|
||||
on_local_cache_changed();
|
||||
}
|
||||
info!(
|
||||
installed_plugin_ids = ?outcome.installed_plugin_ids,
|
||||
removed_cache_plugin_ids = ?outcome.removed_cache_plugin_ids,
|
||||
failed_remote_plugin_ids = ?outcome.failed_remote_plugin_ids,
|
||||
"completed remote installed plugin bundle sync"
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
error = %err,
|
||||
"remote installed plugin bundle sync failed"
|
||||
);
|
||||
}
|
||||
}
|
||||
clear_remote_installed_plugin_bundle_sync_in_flight(&key);
|
||||
});
|
||||
}
|
||||
|
||||
pub async fn sync_remote_installed_plugin_bundles_once(
|
||||
codex_home: PathBuf,
|
||||
config: &RemotePluginServiceConfig,
|
||||
auth: Option<&CodexAuth>,
|
||||
) -> Result<RemoteInstalledPluginBundleSyncOutcome, RemoteInstalledPluginBundleSyncError> {
|
||||
let auth = ensure_chatgpt_auth(auth)?;
|
||||
let global = async {
|
||||
let scope = RemotePluginScope::Global;
|
||||
let installed_plugins =
|
||||
fetch_installed_plugins_for_scope_with_download_urls(config, auth, scope).await?;
|
||||
Ok::<_, RemotePluginCatalogError>((scope, installed_plugins))
|
||||
};
|
||||
let workspace = async {
|
||||
let scope = RemotePluginScope::Workspace;
|
||||
let installed_plugins =
|
||||
fetch_installed_plugins_for_scope_with_download_urls(config, auth, scope).await?;
|
||||
Ok::<_, RemotePluginCatalogError>((scope, installed_plugins))
|
||||
};
|
||||
|
||||
let (global, workspace) = tokio::try_join!(global, workspace)?;
|
||||
let store = PluginStore::try_new(codex_home.clone())?;
|
||||
let mut installed_plugin_names_by_marketplace =
|
||||
BTreeMap::<String, BTreeSet<String>>::from_iter([
|
||||
(REMOTE_GLOBAL_MARKETPLACE_NAME.to_string(), BTreeSet::new()),
|
||||
(
|
||||
REMOTE_WORKSPACE_MARKETPLACE_NAME.to_string(),
|
||||
BTreeSet::new(),
|
||||
),
|
||||
]);
|
||||
let mut installed_plugin_ids = BTreeSet::new();
|
||||
let mut failed_remote_plugin_ids = BTreeSet::new();
|
||||
|
||||
for (scope, installed_plugins) in [global, workspace] {
|
||||
let marketplace_name = scope.marketplace_name().to_string();
|
||||
for installed_plugin in installed_plugins {
|
||||
let plugin = installed_plugin.plugin;
|
||||
installed_plugin_names_by_marketplace
|
||||
.entry(marketplace_name.clone())
|
||||
.or_default()
|
||||
.insert(plugin.name.clone());
|
||||
let plugin_id = match PluginId::new(plugin.name.clone(), marketplace_name.clone()) {
|
||||
Ok(plugin_id) => plugin_id,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
remote_plugin_id = %plugin.id,
|
||||
plugin = %plugin.name,
|
||||
marketplace = %marketplace_name,
|
||||
error = %err,
|
||||
"skipping remote installed plugin with invalid local cache id"
|
||||
);
|
||||
failed_remote_plugin_ids.insert(plugin.id);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let release_version = plugin
|
||||
.release
|
||||
.version
|
||||
.as_deref()
|
||||
.map(str::trim)
|
||||
.filter(|version| !version.is_empty());
|
||||
if store.active_plugin_version(&plugin_id).as_deref() == release_version {
|
||||
continue;
|
||||
}
|
||||
|
||||
let bundle = match crate::remote_bundle::validate_remote_plugin_bundle(
|
||||
&plugin.id,
|
||||
&marketplace_name,
|
||||
&plugin.name,
|
||||
release_version,
|
||||
plugin.release.bundle_download_url.as_deref(),
|
||||
) {
|
||||
Ok(bundle) => bundle,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
remote_plugin_id = %plugin.id,
|
||||
plugin = %plugin.name,
|
||||
marketplace = %marketplace_name,
|
||||
error = %err,
|
||||
"skipping remote installed plugin bundle download"
|
||||
);
|
||||
failed_remote_plugin_ids.insert(plugin.id);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match crate::remote_bundle::download_and_install_remote_plugin_bundle(
|
||||
codex_home.clone(),
|
||||
bundle,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) => {
|
||||
installed_plugin_ids.insert(result.plugin_id.as_key());
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
remote_plugin_id = %plugin.id,
|
||||
plugin = %plugin.name,
|
||||
marketplace = %marketplace_name,
|
||||
error = %err,
|
||||
"failed to download remote installed plugin bundle"
|
||||
);
|
||||
failed_remote_plugin_ids.insert(plugin.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let removed_cache_plugin_ids = tokio::task::spawn_blocking(move || {
|
||||
remove_stale_remote_plugin_caches(
|
||||
codex_home.as_path(),
|
||||
&installed_plugin_names_by_marketplace,
|
||||
)
|
||||
})
|
||||
.await?
|
||||
.map_err(RemoteInstalledPluginBundleSyncError::CacheRemove)?;
|
||||
|
||||
Ok(RemoteInstalledPluginBundleSyncOutcome {
|
||||
installed_plugin_ids: installed_plugin_ids.into_iter().collect(),
|
||||
removed_cache_plugin_ids,
|
||||
failed_remote_plugin_ids: failed_remote_plugin_ids.into_iter().collect(),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn fetch_remote_plugin_detail(
|
||||
config: &RemotePluginServiceConfig,
|
||||
auth: Option<&CodexAuth>,
|
||||
@@ -651,6 +869,96 @@ fn remove_remote_plugin_cache(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_stale_remote_plugin_caches(
|
||||
codex_home: &Path,
|
||||
installed_plugin_names_by_marketplace: &BTreeMap<String, BTreeSet<String>>,
|
||||
) -> Result<Vec<String>, String> {
|
||||
let mut removed_cache_plugin_ids = Vec::new();
|
||||
for marketplace_name in [
|
||||
REMOTE_GLOBAL_MARKETPLACE_NAME,
|
||||
REMOTE_WORKSPACE_MARKETPLACE_NAME,
|
||||
] {
|
||||
let marketplace_root = codex_home.join(PLUGINS_CACHE_DIR).join(marketplace_name);
|
||||
if !marketplace_root.exists() {
|
||||
continue;
|
||||
}
|
||||
let installed_plugin_names = installed_plugin_names_by_marketplace
|
||||
.get(marketplace_name)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
for entry in fs::read_dir(&marketplace_root).map_err(|err| {
|
||||
format!(
|
||||
"failed to read remote plugin cache directory {}: {err}",
|
||||
marketplace_root.display()
|
||||
)
|
||||
})? {
|
||||
let entry = entry.map_err(|err| {
|
||||
format!(
|
||||
"failed to enumerate remote plugin cache directory {}: {err}",
|
||||
marketplace_root.display()
|
||||
)
|
||||
})?;
|
||||
let plugin_name = entry.file_name().into_string().map_err(|file_name| {
|
||||
format!(
|
||||
"remote plugin cache entry under {} is not valid UTF-8: {:?}",
|
||||
marketplace_root.display(),
|
||||
file_name
|
||||
)
|
||||
})?;
|
||||
if installed_plugin_names.contains(&plugin_name) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let cache_path = entry.path();
|
||||
if cache_path.is_dir() {
|
||||
fs::remove_dir_all(&cache_path).map_err(|err| {
|
||||
format!(
|
||||
"failed to remove stale remote plugin cache entry {}: {err}",
|
||||
cache_path.display()
|
||||
)
|
||||
})?;
|
||||
} else {
|
||||
fs::remove_file(&cache_path).map_err(|err| {
|
||||
format!(
|
||||
"failed to remove stale remote plugin cache entry {}: {err}",
|
||||
cache_path.display()
|
||||
)
|
||||
})?;
|
||||
}
|
||||
let plugin_key = PluginId::new(plugin_name.clone(), marketplace_name.to_string())
|
||||
.map(|plugin_id| plugin_id.as_key())
|
||||
.unwrap_or_else(|_| format!("{plugin_name}@{marketplace_name}"));
|
||||
removed_cache_plugin_ids.push(plugin_key);
|
||||
}
|
||||
}
|
||||
|
||||
removed_cache_plugin_ids.sort();
|
||||
Ok(removed_cache_plugin_ids)
|
||||
}
|
||||
|
||||
fn mark_remote_installed_plugin_bundle_sync_in_flight(
|
||||
key: RemoteInstalledPluginBundleSyncKey,
|
||||
) -> bool {
|
||||
let syncs =
|
||||
REMOTE_INSTALLED_PLUGIN_BUNDLE_SYNC_IN_FLIGHT.get_or_init(|| Mutex::new(HashSet::new()));
|
||||
let mut syncs = match syncs.lock() {
|
||||
Ok(syncs) => syncs,
|
||||
Err(err) => err.into_inner(),
|
||||
};
|
||||
syncs.insert(key)
|
||||
}
|
||||
|
||||
fn clear_remote_installed_plugin_bundle_sync_in_flight(key: &RemoteInstalledPluginBundleSyncKey) {
|
||||
let Some(syncs) = REMOTE_INSTALLED_PLUGIN_BUNDLE_SYNC_IN_FLIGHT.get() else {
|
||||
return;
|
||||
};
|
||||
let mut syncs = match syncs.lock() {
|
||||
Ok(syncs) => syncs,
|
||||
Err(err) => err.into_inner(),
|
||||
};
|
||||
syncs.remove(key);
|
||||
}
|
||||
|
||||
fn build_remote_plugin_summary(
|
||||
plugin: &RemotePluginDirectoryItem,
|
||||
installed_plugin: Option<&RemotePluginInstalledItem>,
|
||||
@@ -791,12 +1099,41 @@ async fn fetch_installed_plugins_for_scope(
|
||||
config: &RemotePluginServiceConfig,
|
||||
auth: &CodexAuth,
|
||||
scope: RemotePluginScope,
|
||||
) -> Result<Vec<RemotePluginInstalledItem>, RemotePluginCatalogError> {
|
||||
fetch_installed_plugins_for_scope_with_download_url_option(
|
||||
config, auth, scope, /*include_download_urls*/ false,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn fetch_installed_plugins_for_scope_with_download_urls(
|
||||
config: &RemotePluginServiceConfig,
|
||||
auth: &CodexAuth,
|
||||
scope: RemotePluginScope,
|
||||
) -> Result<Vec<RemotePluginInstalledItem>, RemotePluginCatalogError> {
|
||||
fetch_installed_plugins_for_scope_with_download_url_option(
|
||||
config, auth, scope, /*include_download_urls*/ true,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn fetch_installed_plugins_for_scope_with_download_url_option(
|
||||
config: &RemotePluginServiceConfig,
|
||||
auth: &CodexAuth,
|
||||
scope: RemotePluginScope,
|
||||
include_download_urls: bool,
|
||||
) -> Result<Vec<RemotePluginInstalledItem>, RemotePluginCatalogError> {
|
||||
let mut plugins = Vec::new();
|
||||
let mut page_token = None;
|
||||
loop {
|
||||
let response =
|
||||
get_remote_plugin_installed_page(config, auth, scope, page_token.as_deref()).await?;
|
||||
let response = get_remote_plugin_installed_page(
|
||||
config,
|
||||
auth,
|
||||
scope,
|
||||
page_token.as_deref(),
|
||||
include_download_urls,
|
||||
)
|
||||
.await?;
|
||||
plugins.extend(response.plugins);
|
||||
let Some(next_page_token) = response.pagination.next_page_token else {
|
||||
break;
|
||||
@@ -829,12 +1166,16 @@ async fn get_remote_plugin_installed_page(
|
||||
auth: &CodexAuth,
|
||||
scope: RemotePluginScope,
|
||||
page_token: Option<&str>,
|
||||
include_download_urls: bool,
|
||||
) -> Result<RemotePluginInstalledResponse, RemotePluginCatalogError> {
|
||||
let base_url = config.chatgpt_base_url.trim_end_matches('/');
|
||||
let url = format!("{base_url}/ps/plugins/installed");
|
||||
let client = build_reqwest_client();
|
||||
let mut request = authenticated_request(client.get(&url), auth)?;
|
||||
request = request.query(&[("scope", scope.api_value())]);
|
||||
if include_download_urls {
|
||||
request = request.query(&[("includeDownloadUrls", true)]);
|
||||
}
|
||||
if let Some(page_token) = page_token {
|
||||
request = request.query(&[("pageToken", page_token)]);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user