Compare commits

...

1 Commits

Author SHA1 Message Date
xli-oai
0274a71984 Enforce remote plugin cache cleanup on account changes 2026-04-28 22:51:23 -07:00
5 changed files with 899 additions and 27 deletions

View File

@@ -287,6 +287,7 @@ use codex_core_plugins::remote::RemotePluginCatalogError;
use codex_core_plugins::remote::RemotePluginDetail as RemoteCatalogPluginDetail;
use codex_core_plugins::remote::RemotePluginServiceConfig;
use codex_core_plugins::remote::RemotePluginSummary as RemoteCatalogPluginSummary;
use codex_core_plugins::remote_cache;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::LOCAL_FS;
use codex_external_agent_sessions::ImportedExternalAgentSession;
@@ -506,6 +507,14 @@ fn thread_read_view_error(err: ThreadReadViewError) -> JSONRPCErrorError {
}
}
struct ChatgptLoginCompletionContext {
outgoing: Arc<OutgoingMessageSender>,
auth_manager: Arc<AuthManager>,
config_manager: ConfigManager,
fallback_config: Arc<Config>,
thread_manager: Arc<ThreadManager>,
}
impl Drop for ActiveLogin {
fn drop(&mut self) {
self.cancel();
@@ -727,6 +736,82 @@ impl CodexMessageProcessor {
self.thread_manager.skills_manager().clear_cache();
}
async fn enforce_remote_plugin_cache_after_auth_change(&self) -> Result<(), JSONRPCErrorError> {
let loaded_config = self
.config_manager
.load_latest_config(/*fallback_cwd*/ None)
.await;
let config = match loaded_config.as_ref() {
Ok(config) => config,
Err(err) => {
warn!(
error = %err,
"failed to reload config after auth change; using startup config for remote plugin cache cleanup"
);
self.config.as_ref()
}
};
let auth = self.auth_manager.auth().await;
let result = Self::sync_remote_plugin_cache_for_config(config, auth.as_ref()).await;
self.clear_plugin_related_caches();
result.map_err(|err| {
internal_error(format!(
"failed to clean remote plugin cache after auth change: {err}"
))
})
}
async fn sync_remote_plugin_cache_for_config(
config: &Config,
auth: Option<&CodexAuth>,
) -> Result<(), RemotePluginCatalogError> {
if config.features.enabled(Feature::Plugins)
&& config.features.enabled(Feature::RemotePlugin)
{
let remote_plugin_service_config = RemotePluginServiceConfig {
chatgpt_base_url: config.chatgpt_base_url.clone(),
};
remote_cache::prune_remote_plugin_cache_for_current_auth(
&remote_plugin_service_config,
auth,
config.codex_home.to_path_buf(),
)
.await
} else {
remote_cache::clear_remote_plugin_cache(config.codex_home.to_path_buf()).await
}
}
async fn enforce_remote_plugin_cache_after_async_chatgpt_login(
auth_manager: Arc<AuthManager>,
config_manager: ConfigManager,
fallback_config: Arc<Config>,
thread_manager: Arc<ThreadManager>,
) {
let loaded_config = config_manager
.load_latest_config(/*fallback_cwd*/ None)
.await;
let config = match loaded_config.as_ref() {
Ok(config) => config,
Err(err) => {
warn!(
error = %err,
"failed to reload config after ChatGPT login; using startup config for remote plugin cache cleanup"
);
fallback_config.as_ref()
}
};
let auth = auth_manager.auth().await;
if let Err(err) = Self::sync_remote_plugin_cache_for_config(config, auth.as_ref()).await {
warn!(
error = %err,
"failed to clean remote plugin cache after ChatGPT login"
);
}
thread_manager.plugins_manager().clear_cache();
thread_manager.skills_manager().clear_cache();
}
fn current_account_updated_notification(&self) -> AccountUpdatedNotification {
let auth = self.auth_manager.auth_cached();
AccountUpdatedNotification {
@@ -1384,6 +1469,7 @@ impl CodexMessageProcessor {
) {
Ok(()) => {
self.auth_manager.reload().await;
self.enforce_remote_plugin_cache_after_auth_change().await?;
Ok(())
}
Err(err) => Err(JSONRPCErrorError {
@@ -1490,11 +1576,14 @@ impl CodexMessageProcessor {
});
}
let outgoing_clone = self.outgoing.clone();
let active_login = self.active_login.clone();
let auth_manager = self.auth_manager.clone();
let config_manager = self.config_manager.clone();
let chatgpt_base_url = self.config.chatgpt_base_url.clone();
let completion_context = ChatgptLoginCompletionContext {
outgoing: self.outgoing.clone(),
auth_manager: self.auth_manager.clone(),
config_manager: self.config_manager.clone(),
fallback_config: self.config.clone(),
thread_manager: self.thread_manager.clone(),
};
let auth_url = server.auth_url.clone();
tokio::spawn(async move {
let (success, error_msg) = match tokio::time::timeout(
@@ -1512,10 +1601,7 @@ impl CodexMessageProcessor {
};
Self::send_chatgpt_login_completion_notifications(
&outgoing_clone,
auth_manager,
config_manager,
chatgpt_base_url,
completion_context,
login_id,
success,
error_msg,
@@ -1564,11 +1650,14 @@ impl CodexMessageProcessor {
let verification_url = device_code.verification_url.clone();
let user_code = device_code.user_code.clone();
let outgoing_clone = self.outgoing.clone();
let active_login = self.active_login.clone();
let auth_manager = self.auth_manager.clone();
let config_manager = self.config_manager.clone();
let chatgpt_base_url = self.config.chatgpt_base_url.clone();
let completion_context = ChatgptLoginCompletionContext {
outgoing: self.outgoing.clone(),
auth_manager: self.auth_manager.clone(),
config_manager: self.config_manager.clone(),
fallback_config: self.config.clone(),
thread_manager: self.thread_manager.clone(),
};
tokio::spawn(async move {
let (success, error_msg) = tokio::select! {
_ = cancel.cancelled() => {
@@ -1583,10 +1672,7 @@ impl CodexMessageProcessor {
};
Self::send_chatgpt_login_completion_notifications(
&outgoing_clone,
auth_manager,
config_manager,
chatgpt_base_url,
completion_context,
login_id,
success,
error_msg,
@@ -1709,6 +1795,7 @@ impl CodexMessageProcessor {
self.config_manager
.sync_default_client_residency_requirement()
.await;
self.enforce_remote_plugin_cache_after_auth_change().await?;
Ok(LoginAccountResponse::ChatgptAuthTokens {})
}
@@ -1733,14 +1820,36 @@ impl CodexMessageProcessor {
}
async fn send_chatgpt_login_completion_notifications(
outgoing: &OutgoingMessageSender,
auth_manager: Arc<AuthManager>,
config_manager: ConfigManager,
chatgpt_base_url: String,
context: ChatgptLoginCompletionContext,
login_id: Uuid,
success: bool,
error_msg: Option<String>,
) {
let ChatgptLoginCompletionContext {
outgoing,
auth_manager,
config_manager,
fallback_config,
thread_manager,
} = context;
if success {
auth_manager.reload().await;
config_manager.replace_cloud_requirements_loader(
auth_manager.clone(),
fallback_config.chatgpt_base_url.clone(),
);
config_manager
.sync_default_client_residency_requirement()
.await;
Self::enforce_remote_plugin_cache_after_async_chatgpt_login(
auth_manager.clone(),
config_manager,
fallback_config,
thread_manager,
)
.await;
}
let payload_v2 = AccountLoginCompletedNotification {
login_id: Some(login_id.to_string()),
success,
@@ -1751,13 +1860,6 @@ impl CodexMessageProcessor {
.await;
if success {
auth_manager.reload().await;
config_manager
.replace_cloud_requirements_loader(auth_manager.clone(), chatgpt_base_url);
config_manager
.sync_default_client_residency_requirement()
.await;
let auth = auth_manager.auth_cached();
let payload_v2 = AccountUpdatedNotification {
auth_mode: auth.as_ref().map(CodexAuth::api_auth_mode),
@@ -1788,6 +1890,14 @@ impl CodexMessageProcessor {
});
}
}
remote_cache::clear_remote_plugin_cache(self.config.codex_home.to_path_buf())
.await
.map_err(|err| {
internal_error(format!(
"failed to clean remote plugin cache after logout: {err}"
))
})?;
self.clear_plugin_related_caches();
// Reflect the current auth method after logout (likely None).
Ok(self

View File

@@ -49,6 +49,7 @@ use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
use wiremock::matchers::query_param;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const LOGIN_ISSUER_ENV_VAR: &str = "CODEX_APP_SERVER_LOGIN_ISSUER";
@@ -60,8 +61,10 @@ struct CreateConfigTomlParams {
forced_workspace_id: Option<String>,
requires_openai_auth: Option<bool>,
base_url: Option<String>,
chatgpt_base_url: Option<String>,
model_provider_id: Option<String>,
extra_provider_config: Option<String>,
remote_plugin_enabled: bool,
}
fn create_config_toml(codex_home: &Path, params: CreateConfigTomlParams) -> std::io::Result<()> {
@@ -84,6 +87,15 @@ fn create_config_toml(codex_home: &Path, params: CreateConfigTomlParams) -> std:
Some(false) => String::new(),
None => String::new(),
};
let chatgpt_base_url_line = params
.chatgpt_base_url
.map(|base_url| format!("chatgpt_base_url = \"{base_url}\"\n"))
.unwrap_or_default();
let remote_plugin_feature_lines = if params.remote_plugin_enabled {
"plugins = true\nremote_plugin = true\n"
} else {
""
};
let model_provider_id = params
.model_provider_id
.unwrap_or_else(|| "mock_provider".to_string());
@@ -106,6 +118,7 @@ stream_max_retries = 0
model = "mock-model"
approval_policy = "never"
sandbox_mode = "danger-full-access"
{chatgpt_base_url_line}
{forced_line}
{forced_workspace_line}
@@ -113,6 +126,7 @@ model_provider = "{model_provider_id}"
[features]
shell_snapshot = false
{remote_plugin_feature_lines}
{provider_section}
"#
@@ -172,6 +186,83 @@ async fn mock_device_code_oauth_token(server: &MockServer, id_token: &str) {
.await;
}
async fn mock_empty_remote_plugin_directory(server: &MockServer) {
for scope in ["GLOBAL", "WORKSPACE"] {
Mock::given(method("GET"))
.and(path("/backend-api/ps/plugins/list"))
.and(query_param("scope", scope))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"plugins": [],
"pagination": {
"next_page_token": null
}
})))
.mount(server)
.await;
}
}
async fn mock_remote_installed_plugins(
server: &MockServer,
scope: &'static str,
plugins: serde_json::Value,
) {
Mock::given(method("GET"))
.and(path("/backend-api/ps/plugins/installed"))
.and(query_param("scope", scope))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"plugins": plugins,
"pagination": {
"next_page_token": null
}
})))
.mount(server)
.await;
}
fn remote_plugin_item(
remote_plugin_id: &str,
plugin_name: &str,
scope: &str,
enabled: bool,
) -> serde_json::Value {
json!({
"id": remote_plugin_id,
"name": plugin_name,
"scope": scope,
"installation_policy": "AVAILABLE",
"authentication_policy": "ON_USE",
"enabled": enabled,
"release": {
"version": "1.0.0",
"display_name": plugin_name,
"description": plugin_name,
"interface": {},
"app_ids": [],
"skills": []
}
})
}
fn write_remote_plugin_cache(
codex_home: &TempDir,
marketplace_name: &str,
plugin_name: &str,
) -> Result<()> {
let plugin_root = codex_home
.path()
.join("plugins/cache")
.join(marketplace_name)
.join(plugin_name)
.join("1.0.0/.codex-plugin");
std::fs::create_dir_all(&plugin_root)?;
std::fs::write(
plugin_root.join("plugin.json"),
format!(r#"{{"name":"{plugin_name}","version":"1.0.0"}}"#),
)?;
Ok(())
}
#[tokio::test]
async fn logout_account_removes_auth_and_notifies() -> Result<()> {
let codex_home = TempDir::new()?;
@@ -230,6 +321,52 @@ async fn logout_account_removes_auth_and_notifies() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn logout_account_removes_remote_plugin_cache() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), CreateConfigTomlParams::default())?;
login_with_api_key(
codex_home.path(),
"sk-test-key",
AuthCredentialsStoreMode::File,
)?;
write_remote_plugin_cache(&codex_home, "chatgpt-global", "linear")?;
write_remote_plugin_cache(&codex_home, "chatgpt-workspace", "workspace-tool")?;
write_remote_plugin_cache(&codex_home, "openai-curated", "gmail")?;
let mut mcp = McpProcess::new_with_env(codex_home.path(), &[("OPENAI_API_KEY", None)]).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let id = mcp.send_logout_account_request().await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(id)),
)
.await??;
let _ok: LogoutAccountResponse = to_response(resp)?;
assert!(
!codex_home
.path()
.join("plugins/cache/chatgpt-global")
.exists()
);
assert!(
!codex_home
.path()
.join("plugins/cache/chatgpt-workspace")
.exists()
);
assert!(
codex_home
.path()
.join("plugins/cache/openai-curated/gmail")
.is_dir()
);
Ok(())
}
#[tokio::test]
async fn set_auth_token_updates_account_and_notifies() -> Result<()> {
let codex_home = TempDir::new()?;
@@ -306,6 +443,115 @@ async fn set_auth_token_updates_account_and_notifies() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn set_auth_token_prunes_remote_plugin_cache_to_current_account() -> Result<()> {
let codex_home = TempDir::new()?;
let mock_server = MockServer::start().await;
create_config_toml(
codex_home.path(),
CreateConfigTomlParams {
requires_openai_auth: Some(true),
base_url: Some(format!("{}/v1", mock_server.uri())),
chatgpt_base_url: Some(format!("{}/backend-api/", mock_server.uri())),
remote_plugin_enabled: true,
..Default::default()
},
)?;
write_models_cache(codex_home.path())?;
mock_empty_remote_plugin_directory(&mock_server).await;
mock_remote_installed_plugins(
&mock_server,
"GLOBAL",
json!([remote_plugin_item(
"plugins~Plugin_linear",
"linear",
"GLOBAL",
true
)]),
)
.await;
mock_remote_installed_plugins(
&mock_server,
"WORKSPACE",
json!([remote_plugin_item(
"plugins~Plugin_workspace",
"workspace-tool",
"WORKSPACE",
false
)]),
)
.await;
write_remote_plugin_cache(&codex_home, "chatgpt-global", "linear")?;
write_remote_plugin_cache(&codex_home, "chatgpt-global", "plugins~Plugin_linear")?;
write_remote_plugin_cache(&codex_home, "chatgpt-global", "stale-global")?;
write_remote_plugin_cache(&codex_home, "chatgpt-workspace", "workspace-tool")?;
write_remote_plugin_cache(&codex_home, "chatgpt-workspace", "stale-workspace")?;
write_remote_plugin_cache(&codex_home, "openai-curated", "gmail")?;
let access_token = encode_id_token(
&ChatGptIdTokenClaims::new()
.email("embedded@example.com")
.plan_type("pro")
.chatgpt_account_id("org-embedded"),
)?;
let mut mcp = McpProcess::new_with_env(codex_home.path(), &[("OPENAI_API_KEY", None)]).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let set_id = mcp
.send_chatgpt_auth_tokens_login_request(
access_token,
"org-embedded".to_string(),
Some("pro".to_string()),
)
.await?;
let set_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(set_id)),
)
.await??;
let response: LoginAccountResponse = to_response(set_resp)?;
assert_eq!(response, LoginAccountResponse::ChatgptAuthTokens {});
assert!(
codex_home
.path()
.join("plugins/cache/chatgpt-global/linear")
.is_dir()
);
assert!(
codex_home
.path()
.join("plugins/cache/chatgpt-global/plugins~Plugin_linear")
.is_dir()
);
assert!(
!codex_home
.path()
.join("plugins/cache/chatgpt-global/stale-global")
.exists()
);
assert!(
codex_home
.path()
.join("plugins/cache/chatgpt-workspace/workspace-tool")
.is_dir()
);
assert!(
!codex_home
.path()
.join("plugins/cache/chatgpt-workspace/stale-workspace")
.exists()
);
assert!(
codex_home
.path()
.join("plugins/cache/openai-curated/gmail")
.is_dir()
);
Ok(())
}
#[tokio::test]
async fn account_read_refresh_token_is_noop_in_external_mode() -> Result<()> {
let codex_home = TempDir::new()?;
@@ -928,6 +1174,49 @@ async fn login_account_api_key_succeeds_and_notifies() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn login_account_api_key_removes_remote_plugin_cache() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), CreateConfigTomlParams::default())?;
write_remote_plugin_cache(&codex_home, "chatgpt-global", "linear")?;
write_remote_plugin_cache(&codex_home, "chatgpt-workspace", "workspace-tool")?;
write_remote_plugin_cache(&codex_home, "openai-curated", "gmail")?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let req_id = mcp
.send_login_account_api_key_request("sk-test-key")
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
)
.await??;
let login: LoginAccountResponse = to_response(resp)?;
assert_eq!(login, LoginAccountResponse::ApiKey {});
assert!(
!codex_home
.path()
.join("plugins/cache/chatgpt-global")
.exists()
);
assert!(
!codex_home
.path()
.join("plugins/cache/chatgpt-workspace")
.exists()
);
assert!(
codex_home
.path()
.join("plugins/cache/openai-curated/gmail")
.is_dir()
);
Ok(())
}
#[tokio::test]
async fn login_account_api_key_rejected_when_forced_chatgpt() -> Result<()> {
let codex_home = TempDir::new()?;

View File

@@ -7,6 +7,7 @@ pub mod marketplace_remove;
pub mod marketplace_upgrade;
pub mod remote;
pub mod remote_bundle;
pub mod remote_cache;
pub mod remote_legacy;
pub mod startup_sync;
pub mod store;

View File

@@ -0,0 +1,181 @@
use crate::remote::REMOTE_GLOBAL_MARKETPLACE_NAME;
use crate::remote::REMOTE_WORKSPACE_MARKETPLACE_NAME;
use crate::remote::RemoteMarketplace;
use crate::remote::RemotePluginCatalogError;
use crate::remote::RemotePluginServiceConfig;
use crate::remote::fetch_remote_marketplaces;
use crate::store::PLUGINS_CACHE_DIR;
use codex_login::CodexAuth;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use tracing::warn;
const REMOTE_MARKETPLACE_NAMES: [&str; 2] = [
REMOTE_GLOBAL_MARKETPLACE_NAME,
REMOTE_WORKSPACE_MARKETPLACE_NAME,
];
#[derive(Debug, Default)]
struct RemotePluginCacheRetainSet {
plugin_names_by_marketplace: BTreeMap<String, BTreeSet<String>>,
}
impl RemotePluginCacheRetainSet {
fn from_marketplaces(marketplaces: Vec<RemoteMarketplace>) -> Self {
let mut retain = Self::default();
for marketplace in marketplaces {
if !REMOTE_MARKETPLACE_NAMES.contains(&marketplace.name.as_str()) {
continue;
}
let plugin_names = retain
.plugin_names_by_marketplace
.entry(marketplace.name)
.or_default();
for plugin in marketplace.plugins {
if plugin.installed {
plugin_names.insert(plugin.name);
plugin_names.insert(plugin.id);
}
}
}
retain
}
fn contains(&self, marketplace_name: &str, plugin_cache_name: &str) -> bool {
self.plugin_names_by_marketplace
.get(marketplace_name)
.is_some_and(|plugin_names| plugin_names.contains(plugin_cache_name))
}
}
/// Remove all locally cached remote plugin bundles.
///
/// This is used when there is no authenticated ChatGPT account whose installed
/// plugin set can be trusted, such as logout or API-key login.
pub async fn clear_remote_plugin_cache(
codex_home: PathBuf,
) -> Result<(), RemotePluginCatalogError> {
run_cache_mutation("remote plugin cache clear", move || {
clear_remote_plugin_cache_blocking(codex_home.as_path())
})
.await
}
/// Keep only remote plugin cache entries that belong to the authenticated account.
///
/// Disabled remote plugins are retained because the backend still reports them
/// as installed; disabled state controls availability, not local cache ownership.
/// If the account cannot be read, all remote plugin cache entries are removed so
/// stale bundles from a previous account cannot stay visible locally.
pub async fn prune_remote_plugin_cache_for_current_auth(
config: &RemotePluginServiceConfig,
auth: Option<&CodexAuth>,
codex_home: PathBuf,
) -> Result<(), RemotePluginCatalogError> {
let marketplaces = match fetch_remote_marketplaces(config, auth).await {
Ok(marketplaces) => marketplaces,
Err(err) => {
warn!(
error = %err,
"failed to fetch account remote plugin state; clearing all remote plugin cache entries"
);
return clear_remote_plugin_cache(codex_home).await;
}
};
let retain = RemotePluginCacheRetainSet::from_marketplaces(marketplaces);
run_cache_mutation("remote plugin cache prune", move || {
prune_remote_plugin_cache_blocking(codex_home.as_path(), &retain)
})
.await
}
async fn run_cache_mutation<F>(
context: &'static str,
mutation: F,
) -> Result<(), RemotePluginCatalogError>
where
F: FnOnce() -> Result<(), String> + Send + 'static,
{
tokio::task::spawn_blocking(mutation)
.await
.map_err(|err| {
RemotePluginCatalogError::CacheRemove(format!("failed to join {context} task: {err}"))
})?
.map_err(RemotePluginCatalogError::CacheRemove)
}
fn clear_remote_plugin_cache_blocking(codex_home: &Path) -> Result<(), String> {
for marketplace_name in REMOTE_MARKETPLACE_NAMES {
remove_path_if_exists(&remote_marketplace_cache_root(codex_home, marketplace_name))?;
}
Ok(())
}
fn prune_remote_plugin_cache_blocking(
codex_home: &Path,
retain: &RemotePluginCacheRetainSet,
) -> Result<(), String> {
for marketplace_name in REMOTE_MARKETPLACE_NAMES {
let marketplace_root = remote_marketplace_cache_root(codex_home, marketplace_name);
if !marketplace_root.exists() {
continue;
}
if !marketplace_root.is_dir() {
remove_path_if_exists(&marketplace_root)?;
continue;
}
let entries = fs::read_dir(&marketplace_root).map_err(|err| {
format!(
"failed to read remote plugin cache namespace {}: {err}",
marketplace_root.display()
)
})?;
for entry in entries {
let entry = entry.map_err(|err| {
format!(
"failed to enumerate remote plugin cache namespace {}: {err}",
marketplace_root.display()
)
})?;
let plugin_cache_name = entry.file_name();
let Some(plugin_cache_name) = plugin_cache_name.to_str() else {
remove_path_if_exists(&entry.path())?;
continue;
};
if !retain.contains(marketplace_name, plugin_cache_name) {
remove_path_if_exists(&entry.path())?;
}
}
}
Ok(())
}
fn remote_marketplace_cache_root(codex_home: &Path, marketplace_name: &str) -> PathBuf {
codex_home.join(PLUGINS_CACHE_DIR).join(marketplace_name)
}
fn remove_path_if_exists(path: &Path) -> Result<(), String> {
if !path.exists() {
return Ok(());
}
let result = if path.is_dir() {
fs::remove_dir_all(path)
} else {
fs::remove_file(path)
};
result.map_err(|err| {
format!(
"failed to remove remote plugin cache entry {}: {err}",
path.display()
)
})
}
#[cfg(test)]
#[path = "remote_cache_tests.rs"]
mod tests;

View File

@@ -0,0 +1,291 @@
use super::*;
use anyhow::Result;
use codex_login::CodexAuth;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
use wiremock::matchers::query_param;
#[tokio::test]
async fn clear_remote_plugin_cache_removes_only_remote_namespaces() -> Result<()> {
let codex_home = TempDir::new()?;
write_cached_plugin(&codex_home, REMOTE_GLOBAL_MARKETPLACE_NAME, "linear")?;
write_cached_plugin(
&codex_home,
REMOTE_WORKSPACE_MARKETPLACE_NAME,
"workspace-tool",
)?;
write_cached_plugin(&codex_home, "openai-curated", "gmail")?;
write_cached_plugin(&codex_home, "debug", "sample")?;
let plugin_data = codex_home.path().join("plugins/data/linear-chatgpt-global");
std::fs::create_dir_all(&plugin_data)?;
clear_remote_plugin_cache(codex_home.path().to_path_buf()).await?;
assert!(
!codex_home
.path()
.join("plugins/cache/chatgpt-global")
.exists()
);
assert!(
!codex_home
.path()
.join("plugins/cache/chatgpt-workspace")
.exists()
);
assert!(
codex_home
.path()
.join("plugins/cache/openai-curated/gmail")
.is_dir()
);
assert!(
codex_home
.path()
.join("plugins/cache/debug/sample")
.is_dir()
);
assert!(plugin_data.is_dir());
Ok(())
}
#[tokio::test]
async fn prune_remote_plugin_cache_preserves_current_account_installed_plugins() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
mount_empty_directory_plugins(&server).await;
mount_installed_plugins(
&server,
"GLOBAL",
json!([remote_plugin_item(
"plugins~Plugin_linear",
"linear",
"GLOBAL",
true
)]),
)
.await;
mount_installed_plugins(
&server,
"WORKSPACE",
json!([remote_plugin_item(
"plugins~Plugin_workspace",
"workspace-tool",
"WORKSPACE",
false
)]),
)
.await;
write_cached_plugin(&codex_home, REMOTE_GLOBAL_MARKETPLACE_NAME, "linear")?;
write_cached_plugin(
&codex_home,
REMOTE_GLOBAL_MARKETPLACE_NAME,
"plugins~Plugin_linear",
)?;
write_cached_plugin(&codex_home, REMOTE_GLOBAL_MARKETPLACE_NAME, "stale-global")?;
write_cached_plugin(
&codex_home,
REMOTE_WORKSPACE_MARKETPLACE_NAME,
"workspace-tool",
)?;
write_cached_plugin(
&codex_home,
REMOTE_WORKSPACE_MARKETPLACE_NAME,
"stale-workspace",
)?;
write_cached_plugin(&codex_home, "openai-curated", "gmail")?;
let config = RemotePluginServiceConfig {
chatgpt_base_url: format!("{}/backend-api", server.uri()),
};
let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing();
prune_remote_plugin_cache_for_current_auth(
&config,
Some(&auth),
codex_home.path().to_path_buf(),
)
.await?;
assert!(
codex_home
.path()
.join("plugins/cache/chatgpt-global/linear")
.is_dir()
);
assert!(
codex_home
.path()
.join("plugins/cache/chatgpt-global/plugins~Plugin_linear")
.is_dir()
);
assert!(
!codex_home
.path()
.join("plugins/cache/chatgpt-global/stale-global")
.exists()
);
assert!(
codex_home
.path()
.join("plugins/cache/chatgpt-workspace/workspace-tool")
.is_dir()
);
assert!(
!codex_home
.path()
.join("plugins/cache/chatgpt-workspace/stale-workspace")
.exists()
);
assert!(
codex_home
.path()
.join("plugins/cache/openai-curated/gmail")
.is_dir()
);
let requests = server.received_requests().await.unwrap_or_default();
let requested_paths = requests
.iter()
.map(|request| {
let query = request.url.query().unwrap_or_default();
format!("{}?{query}", request.url.path())
})
.collect::<Vec<_>>();
assert_eq!(
requested_paths
.iter()
.filter(|path| path.starts_with("/backend-api/ps/plugins/installed?"))
.count(),
2
);
Ok(())
}
#[tokio::test]
async fn prune_remote_plugin_cache_clears_all_remote_cache_when_account_fetch_fails() -> Result<()>
{
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
write_cached_plugin(&codex_home, REMOTE_GLOBAL_MARKETPLACE_NAME, "linear")?;
write_cached_plugin(
&codex_home,
REMOTE_WORKSPACE_MARKETPLACE_NAME,
"workspace-tool",
)?;
write_cached_plugin(&codex_home, "openai-curated", "gmail")?;
let config = RemotePluginServiceConfig {
chatgpt_base_url: format!("{}/backend-api", server.uri()),
};
let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing();
prune_remote_plugin_cache_for_current_auth(
&config,
Some(&auth),
codex_home.path().to_path_buf(),
)
.await?;
assert!(
!codex_home
.path()
.join("plugins/cache/chatgpt-global")
.exists()
);
assert!(
!codex_home
.path()
.join("plugins/cache/chatgpt-workspace")
.exists()
);
assert!(
codex_home
.path()
.join("plugins/cache/openai-curated/gmail")
.is_dir()
);
Ok(())
}
async fn mount_empty_directory_plugins(server: &MockServer) {
for scope in ["GLOBAL", "WORKSPACE"] {
Mock::given(method("GET"))
.and(path("/backend-api/ps/plugins/list"))
.and(query_param("scope", scope))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"plugins": [],
"pagination": {
"next_page_token": null
}
})))
.mount(server)
.await;
}
}
async fn mount_installed_plugins(
server: &MockServer,
scope: &'static str,
plugins: serde_json::Value,
) {
Mock::given(method("GET"))
.and(path("/backend-api/ps/plugins/installed"))
.and(query_param("scope", scope))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"plugins": plugins,
"pagination": {
"next_page_token": null
}
})))
.mount(server)
.await;
}
fn remote_plugin_item(
remote_plugin_id: &str,
plugin_name: &str,
scope: &str,
enabled: bool,
) -> serde_json::Value {
json!({
"id": remote_plugin_id,
"name": plugin_name,
"scope": scope,
"installation_policy": "AVAILABLE",
"authentication_policy": "ON_USE",
"enabled": enabled,
"release": {
"version": "1.0.0",
"display_name": plugin_name,
"description": plugin_name,
"interface": {},
"app_ids": [],
"skills": []
}
})
}
fn write_cached_plugin(
codex_home: &TempDir,
marketplace_name: &str,
plugin_name: &str,
) -> Result<()> {
let plugin_root = codex_home
.path()
.join("plugins/cache")
.join(marketplace_name)
.join(plugin_name)
.join("1.0.0/.codex-plugin");
std::fs::create_dir_all(&plugin_root)?;
std::fs::write(
plugin_root.join("plugin.json"),
format!(r#"{{"name":"{plugin_name}","version":"1.0.0"}}"#),
)?;
Ok(())
}