feat: refactor on openai-curated plugins. (#14427)

- Curated repo sync now uses GitHub HTTP, not local git.
- Curated plugin cache/versioning now uses commit SHA instead of local.
- Startup sync now always repairs or refreshes curated plugin cache from
tmp (auto update to the lastest)
This commit is contained in:
xl-openai
2026-03-11 23:18:58 -07:00
committed by GitHub
parent f6c6128fc7
commit b5f927b973
6 changed files with 820 additions and 259 deletions

View File

@@ -1,30 +1,67 @@
use crate::default_client::build_reqwest_client;
use reqwest::Client;
use serde::Deserialize;
use std::fs;
use std::io::Cursor;
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
use std::path::Component;
use std::path::Path;
use std::path::PathBuf;
use std::process::Command;
use std::process::Output;
use std::process::Stdio;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use zip::ZipArchive;
const OPENAI_PLUGINS_REPO_URL: &str = "https://github.com/openai/plugins.git";
const GITHUB_API_BASE_URL: &str = "https://api.github.com";
const GITHUB_API_ACCEPT_HEADER: &str = "application/vnd.github+json";
const GITHUB_API_VERSION_HEADER: &str = "2022-11-28";
const OPENAI_PLUGINS_OWNER: &str = "openai";
const OPENAI_PLUGINS_REPO: &str = "plugins";
const CURATED_PLUGINS_RELATIVE_DIR: &str = ".tmp/plugins";
const CURATED_PLUGINS_SHA_FILE: &str = ".tmp/plugins.sha";
const CURATED_PLUGINS_GIT_TIMEOUT: Duration = Duration::from_secs(30);
const CURATED_PLUGINS_HTTP_TIMEOUT: Duration = Duration::from_secs(30);
#[derive(Debug, Deserialize)]
struct GitHubRepositorySummary {
default_branch: String,
}
#[derive(Debug, Deserialize)]
struct GitHubGitRefSummary {
object: GitHubGitRefObject,
}
#[derive(Debug, Deserialize)]
struct GitHubGitRefObject {
sha: String,
}
pub(crate) fn curated_plugins_repo_path(codex_home: &Path) -> PathBuf {
codex_home.join(CURATED_PLUGINS_RELATIVE_DIR)
}
pub(crate) fn sync_openai_plugins_repo(codex_home: &Path) -> Result<(), String> {
pub(crate) fn read_curated_plugins_sha(codex_home: &Path) -> Option<String> {
read_sha_file(codex_home.join(CURATED_PLUGINS_SHA_FILE).as_path())
}
pub(crate) fn sync_openai_plugins_repo(codex_home: &Path) -> Result<String, String> {
sync_openai_plugins_repo_with_api_base_url(codex_home, GITHUB_API_BASE_URL)
}
fn sync_openai_plugins_repo_with_api_base_url(
codex_home: &Path,
api_base_url: &str,
) -> Result<String, String> {
let repo_path = curated_plugins_repo_path(codex_home);
let sha_path = codex_home.join(CURATED_PLUGINS_SHA_FILE);
let remote_sha = git_ls_remote_head_sha()?;
let local_sha = read_local_sha(&repo_path, &sha_path);
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|err| format!("failed to create curated plugins sync runtime: {err}"))?;
let remote_sha = runtime.block_on(fetch_curated_repo_remote_sha(api_base_url))?;
let local_sha = read_sha_file(&sha_path);
if local_sha.as_deref() == Some(remote_sha.as_str()) && repo_path.join(".git").is_dir() {
return Ok(());
if local_sha.as_deref() == Some(remote_sha.as_str()) && repo_path.is_dir() {
return Ok(remote_sha);
}
let Some(parent) = repo_path.parent() else {
@@ -50,23 +87,18 @@ pub(crate) fn sync_openai_plugins_repo(codex_home: &Path) -> Result<(), String>
)
})?;
let cloned_repo_path = clone_dir.path().join("repo");
let clone_output = run_git_command_with_timeout(
Command::new("git")
.env("GIT_OPTIONAL_LOCKS", "0")
.arg("clone")
.arg("--depth")
.arg("1")
.arg(OPENAI_PLUGINS_REPO_URL)
.arg(&cloned_repo_path),
"git clone curated plugins repo",
CURATED_PLUGINS_GIT_TIMEOUT,
)?;
ensure_git_success(&clone_output, "git clone curated plugins repo")?;
let zipball_bytes = runtime.block_on(fetch_curated_repo_zipball(api_base_url, &remote_sha))?;
extract_zipball_to_dir(&zipball_bytes, &cloned_repo_path)?;
let cloned_sha = git_head_sha(&cloned_repo_path)?;
if cloned_sha != remote_sha {
if !cloned_repo_path
.join(".agents/plugins/marketplace.json")
.is_file()
{
return Err(format!(
"curated plugins clone HEAD mismatch: expected {remote_sha}, got {cloned_sha}"
"curated plugins archive missing marketplace manifest at {}",
cloned_repo_path
.join(".agents/plugins/marketplace.json")
.display()
));
}
@@ -123,156 +155,215 @@ pub(crate) fn sync_openai_plugins_repo(codex_home: &Path) -> Result<(), String>
)
})?;
}
fs::write(&sha_path, format!("{cloned_sha}\n")).map_err(|err| {
fs::write(&sha_path, format!("{remote_sha}\n")).map_err(|err| {
format!(
"failed to write curated plugins sha file {}: {err}",
sha_path.display()
)
})?;
Ok(())
Ok(remote_sha)
}
fn read_local_sha(repo_path: &Path, sha_path: &Path) -> Option<String> {
if repo_path.join(".git").is_dir()
&& let Ok(sha) = git_head_sha(repo_path)
{
return Some(sha);
async fn fetch_curated_repo_remote_sha(api_base_url: &str) -> Result<String, String> {
let api_base_url = api_base_url.trim_end_matches('/');
let repo_url = format!("{api_base_url}/repos/{OPENAI_PLUGINS_OWNER}/{OPENAI_PLUGINS_REPO}");
let client = build_reqwest_client();
let repo_body = fetch_github_text(&client, &repo_url, "get curated plugins repository").await?;
let repo_summary: GitHubRepositorySummary =
serde_json::from_str(&repo_body).map_err(|err| {
format!("failed to parse curated plugins repository response from {repo_url}: {err}")
})?;
if repo_summary.default_branch.is_empty() {
return Err(format!(
"curated plugins repository response from {repo_url} did not include a default branch"
));
}
let git_ref_url = format!("{repo_url}/git/ref/heads/{}", repo_summary.default_branch);
let git_ref_body =
fetch_github_text(&client, &git_ref_url, "get curated plugins HEAD ref").await?;
let git_ref: GitHubGitRefSummary = serde_json::from_str(&git_ref_body).map_err(|err| {
format!("failed to parse curated plugins ref response from {git_ref_url}: {err}")
})?;
if git_ref.object.sha.is_empty() {
return Err(format!(
"curated plugins ref response from {git_ref_url} did not include a HEAD sha"
));
}
Ok(git_ref.object.sha)
}
async fn fetch_curated_repo_zipball(
api_base_url: &str,
remote_sha: &str,
) -> Result<Vec<u8>, String> {
let api_base_url = api_base_url.trim_end_matches('/');
let repo_url = format!("{api_base_url}/repos/{OPENAI_PLUGINS_OWNER}/{OPENAI_PLUGINS_REPO}");
let zipball_url = format!("{repo_url}/zipball/{remote_sha}");
let client = build_reqwest_client();
fetch_github_bytes(&client, &zipball_url, "download curated plugins archive").await
}
async fn fetch_github_text(client: &Client, url: &str, context: &str) -> Result<String, String> {
let response = github_request(client, url)
.send()
.await
.map_err(|err| format!("failed to {context} from {url}: {err}"))?;
let status = response.status();
let body = response.text().await.unwrap_or_default();
if !status.is_success() {
return Err(format!(
"{context} from {url} failed with status {status}: {body}"
));
}
Ok(body)
}
async fn fetch_github_bytes(client: &Client, url: &str, context: &str) -> Result<Vec<u8>, String> {
let response = github_request(client, url)
.send()
.await
.map_err(|err| format!("failed to {context} from {url}: {err}"))?;
let status = response.status();
let body = response
.bytes()
.await
.map_err(|err| format!("failed to read {context} response from {url}: {err}"))?;
if !status.is_success() {
let body_text = String::from_utf8_lossy(&body);
return Err(format!(
"{context} from {url} failed with status {status}: {body_text}"
));
}
Ok(body.to_vec())
}
fn github_request(client: &Client, url: &str) -> reqwest::RequestBuilder {
client
.get(url)
.timeout(CURATED_PLUGINS_HTTP_TIMEOUT)
.header("accept", GITHUB_API_ACCEPT_HEADER)
.header("x-github-api-version", GITHUB_API_VERSION_HEADER)
}
fn read_sha_file(sha_path: &Path) -> Option<String> {
fs::read_to_string(sha_path)
.ok()
.map(|sha| sha.trim().to_string())
.filter(|sha| !sha.is_empty())
}
fn git_ls_remote_head_sha() -> Result<String, String> {
let output = run_git_command_with_timeout(
Command::new("git")
.env("GIT_OPTIONAL_LOCKS", "0")
.arg("ls-remote")
.arg(OPENAI_PLUGINS_REPO_URL)
.arg("HEAD"),
"git ls-remote curated plugins repo",
CURATED_PLUGINS_GIT_TIMEOUT,
)?;
ensure_git_success(&output, "git ls-remote curated plugins repo")?;
fn extract_zipball_to_dir(bytes: &[u8], destination: &Path) -> Result<(), String> {
fs::create_dir_all(destination).map_err(|err| {
format!(
"failed to create curated plugins extraction directory {}: {err}",
destination.display()
)
})?;
let stdout = String::from_utf8_lossy(&output.stdout);
let Some(first_line) = stdout.lines().next() else {
return Err("git ls-remote returned empty output for curated plugins repo".to_string());
};
let Some((sha, _)) = first_line.split_once('\t') else {
return Err(format!(
"unexpected git ls-remote output for curated plugins repo: {first_line}"
));
};
if sha.is_empty() {
return Err("git ls-remote returned empty sha for curated plugins repo".to_string());
}
Ok(sha.to_string())
}
let cursor = Cursor::new(bytes);
let mut archive = ZipArchive::new(cursor)
.map_err(|err| format!("failed to open curated plugins zip archive: {err}"))?;
fn git_head_sha(repo_path: &Path) -> Result<String, String> {
let output = Command::new("git")
.env("GIT_OPTIONAL_LOCKS", "0")
.arg("-C")
.arg(repo_path)
.arg("rev-parse")
.arg("HEAD")
.output()
.map_err(|err| {
for index in 0..archive.len() {
let mut entry = archive
.by_index(index)
.map_err(|err| format!("failed to read curated plugins zip entry: {err}"))?;
let Some(relative_path) = entry.enclosed_name() else {
return Err(format!(
"curated plugins zip entry `{}` escapes extraction root",
entry.name()
));
};
let mut components = relative_path.components();
let Some(Component::Normal(_)) = components.next() else {
continue;
};
let output_relative = components.fold(PathBuf::new(), |mut path, component| {
if let Component::Normal(segment) = component {
path.push(segment);
}
path
});
if output_relative.as_os_str().is_empty() {
continue;
}
let output_path = destination.join(&output_relative);
if entry.is_dir() {
fs::create_dir_all(&output_path).map_err(|err| {
format!(
"failed to create curated plugins directory {}: {err}",
output_path.display()
)
})?;
continue;
}
if let Some(parent) = output_path.parent() {
fs::create_dir_all(parent).map_err(|err| {
format!(
"failed to create curated plugins directory {}: {err}",
parent.display()
)
})?;
}
let mut output = fs::File::create(&output_path).map_err(|err| {
format!(
"failed to run git rev-parse HEAD in {}: {err}",
repo_path.display()
"failed to create curated plugins file {}: {err}",
output_path.display()
)
})?;
ensure_git_success(&output, "git rev-parse HEAD")?;
let sha = String::from_utf8_lossy(&output.stdout).trim().to_string();
if sha.is_empty() {
return Err(format!(
"git rev-parse HEAD returned empty output in {}",
repo_path.display()
));
std::io::copy(&mut entry, &mut output).map_err(|err| {
format!(
"failed to write curated plugins file {}: {err}",
output_path.display()
)
})?;
apply_zip_permissions(&entry, &output_path)?;
}
Ok(sha)
Ok(())
}
fn run_git_command_with_timeout(
command: &mut Command,
context: &str,
timeout: Duration,
) -> Result<Output, String> {
let mut child = command
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|err| format!("failed to run {context}: {err}"))?;
let start = Instant::now();
loop {
match child.try_wait() {
Ok(Some(_)) => {
return child
.wait_with_output()
.map_err(|err| format!("failed to wait for {context}: {err}"));
}
Ok(None) => {}
Err(err) => return Err(format!("failed to poll {context}: {err}")),
}
if start.elapsed() >= timeout {
match child.try_wait() {
Ok(Some(_)) => {
return child
.wait_with_output()
.map_err(|err| format!("failed to wait for {context}: {err}"));
}
Ok(None) => {}
Err(err) => return Err(format!("failed to poll {context}: {err}")),
}
let _ = child.kill();
let output = child
.wait_with_output()
.map_err(|err| format!("failed to wait for {context} after timeout: {err}"))?;
let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
return if stderr.is_empty() {
Err(format!("{context} timed out after {}s", timeout.as_secs()))
} else {
Err(format!(
"{context} timed out after {}s: {stderr}",
timeout.as_secs()
))
};
}
thread::sleep(Duration::from_millis(100));
}
}
fn ensure_git_success(output: &Output, context: &str) -> Result<(), String> {
if output.status.success() {
#[cfg(unix)]
fn apply_zip_permissions(entry: &zip::read::ZipFile<'_>, output_path: &Path) -> Result<(), String> {
let Some(mode) = entry.unix_mode() else {
return Ok(());
}
let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
if stderr.is_empty() {
Err(format!("{context} failed with status {}", output.status))
} else {
Err(format!(
"{context} failed with status {}: {stderr}",
output.status
))
}
};
fs::set_permissions(output_path, fs::Permissions::from_mode(mode)).map_err(|err| {
format!(
"failed to set permissions on curated plugins file {}: {err}",
output_path.display()
)
})
}
#[cfg(not(unix))]
fn apply_zip_permissions(
_entry: &zip::read::ZipFile<'_>,
_output_path: &Path,
) -> Result<(), String> {
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use std::io::Write;
use tempfile::tempdir;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
use zip::ZipWriter;
use zip::write::SimpleFileOptions;
#[test]
fn curated_plugins_repo_path_uses_codex_home_tmp_dir() {
@@ -284,70 +375,145 @@ mod tests {
}
#[test]
fn read_local_sha_prefers_repo_head_when_available() {
fn read_curated_plugins_sha_reads_trimmed_sha_file() {
let tmp = tempdir().expect("tempdir");
let repo_path = tmp.path().join("repo");
let sha_path = tmp.path().join("plugins.sha");
fs::create_dir_all(tmp.path().join(".tmp")).expect("create tmp");
fs::write(tmp.path().join(".tmp/plugins.sha"), "abc123\n").expect("write sha");
fs::create_dir_all(&repo_path).expect("create repo dir");
fs::write(&sha_path, "abc123\n").expect("write sha");
let init_output = Command::new("git")
.arg("init")
.arg(&repo_path)
.output()
.expect("git init should run");
ensure_git_success(&init_output, "git init").expect("git init should succeed");
let config_name_output = Command::new("git")
.arg("-C")
.arg(&repo_path)
.arg("config")
.arg("user.name")
.arg("Codex")
.output()
.expect("git config user.name should run");
ensure_git_success(&config_name_output, "git config user.name")
.expect("git config user.name should succeed");
let config_email_output = Command::new("git")
.arg("-C")
.arg(&repo_path)
.arg("config")
.arg("user.email")
.arg("codex@example.com")
.output()
.expect("git config user.email should run");
ensure_git_success(&config_email_output, "git config user.email")
.expect("git config user.email should succeed");
fs::write(repo_path.join("README.md"), "demo\n").expect("write file");
let add_output = Command::new("git")
.arg("-C")
.arg(&repo_path)
.arg("add")
.arg(".")
.output()
.expect("git add should run");
ensure_git_success(&add_output, "git add").expect("git add should succeed");
let commit_output = Command::new("git")
.arg("-C")
.arg(&repo_path)
.arg("commit")
.arg("-m")
.arg("init")
.output()
.expect("git commit should run");
ensure_git_success(&commit_output, "git commit").expect("git commit should succeed");
let sha = read_local_sha(&repo_path, &sha_path);
assert_eq!(sha, Some(git_head_sha(&repo_path).expect("repo head sha")));
assert_eq!(
read_curated_plugins_sha(tmp.path()).as_deref(),
Some("abc123")
);
}
#[test]
fn read_local_sha_falls_back_to_sha_file() {
#[tokio::test]
async fn sync_openai_plugins_repo_downloads_zipball_and_records_sha() {
let tmp = tempdir().expect("tempdir");
let repo_path = tmp.path().join("repo");
let sha_path = tmp.path().join("plugins.sha");
fs::write(&sha_path, "abc123\n").expect("write sha");
let server = MockServer::start().await;
let sha = "0123456789abcdef0123456789abcdef01234567";
let sha = read_local_sha(&repo_path, &sha_path);
assert_eq!(sha.as_deref(), Some("abc123"));
Mock::given(method("GET"))
.and(path("/repos/openai/plugins"))
.respond_with(
ResponseTemplate::new(200).set_body_string(r#"{"default_branch":"main"}"#),
)
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/repos/openai/plugins/git/ref/heads/main"))
.respond_with(
ResponseTemplate::new(200)
.set_body_string(format!(r#"{{"object":{{"sha":"{sha}"}}}}"#)),
)
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path(format!("/repos/openai/plugins/zipball/{sha}")))
.respond_with(
ResponseTemplate::new(200)
.insert_header("content-type", "application/zip")
.set_body_bytes(curated_repo_zipball_bytes(sha)),
)
.mount(&server)
.await;
let server_uri = server.uri();
let tmp_path = tmp.path().to_path_buf();
tokio::task::spawn_blocking(move || {
sync_openai_plugins_repo_with_api_base_url(tmp_path.as_path(), &server_uri)
})
.await
.expect("sync task should join")
.expect("sync should succeed");
let repo_path = curated_plugins_repo_path(tmp.path());
assert!(repo_path.join(".agents/plugins/marketplace.json").is_file());
assert!(
repo_path
.join("plugins/gmail/.codex-plugin/plugin.json")
.is_file()
);
assert_eq!(read_curated_plugins_sha(tmp.path()).as_deref(), Some(sha));
}
#[tokio::test]
async fn sync_openai_plugins_repo_skips_archive_download_when_sha_matches() {
let tmp = tempdir().expect("tempdir");
let repo_path = curated_plugins_repo_path(tmp.path());
fs::create_dir_all(repo_path.join(".agents/plugins")).expect("create repo");
fs::write(
repo_path.join(".agents/plugins/marketplace.json"),
r#"{"name":"openai-curated","plugins":[]}"#,
)
.expect("write marketplace");
fs::create_dir_all(tmp.path().join(".tmp")).expect("create tmp");
let sha = "fedcba9876543210fedcba9876543210fedcba98";
fs::write(tmp.path().join(".tmp/plugins.sha"), format!("{sha}\n")).expect("write sha");
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/repos/openai/plugins"))
.respond_with(
ResponseTemplate::new(200).set_body_string(r#"{"default_branch":"main"}"#),
)
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/repos/openai/plugins/git/ref/heads/main"))
.respond_with(
ResponseTemplate::new(200)
.set_body_string(format!(r#"{{"object":{{"sha":"{sha}"}}}}"#)),
)
.mount(&server)
.await;
let server_uri = server.uri();
let tmp_path = tmp.path().to_path_buf();
tokio::task::spawn_blocking(move || {
sync_openai_plugins_repo_with_api_base_url(tmp_path.as_path(), &server_uri)
})
.await
.expect("sync task should join")
.expect("sync should succeed");
assert_eq!(read_curated_plugins_sha(tmp.path()).as_deref(), Some(sha));
assert!(repo_path.join(".agents/plugins/marketplace.json").is_file());
}
fn curated_repo_zipball_bytes(sha: &str) -> Vec<u8> {
let cursor = Cursor::new(Vec::new());
let mut writer = ZipWriter::new(cursor);
let options = SimpleFileOptions::default();
let root = format!("openai-plugins-{sha}");
writer
.start_file(format!("{root}/.agents/plugins/marketplace.json"), options)
.expect("start marketplace entry");
writer
.write_all(
br#"{
"name": "openai-curated",
"plugins": [
{
"name": "gmail",
"source": {
"source": "local",
"path": "./plugins/gmail"
}
}
]
}"#,
)
.expect("write marketplace");
writer
.start_file(
format!("{root}/plugins/gmail/.codex-plugin/plugin.json"),
options,
)
.expect("start plugin manifest entry");
writer
.write_all(br#"{"name":"gmail"}"#)
.expect("write plugin manifest");
writer.finish().expect("finish zip writer").into_inner()
}
}

View File

@@ -11,6 +11,7 @@ use super::marketplace::load_marketplace_summary;
use super::marketplace::resolve_marketplace_plugin;
use super::plugin_manifest_name;
use super::plugin_manifest_paths;
use super::read_curated_plugins_sha;
use super::store::DEFAULT_PLUGIN_VERSION;
use super::store::PluginId;
use super::store::PluginIdError;
@@ -45,6 +46,7 @@ use std::collections::HashSet;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
@@ -56,7 +58,6 @@ use tracing::warn;
const DEFAULT_SKILLS_DIR_NAME: &str = "skills";
const DEFAULT_MCP_CONFIG_FILE: &str = ".mcp.json";
const DEFAULT_APP_CONFIG_FILE: &str = ".app.json";
const DISABLE_CURATED_PLUGIN_SYNC_ENV_VAR: &str = "CODEX_DISABLE_CURATED_PLUGIN_SYNC";
const OPENAI_CURATED_MARKETPLACE_NAME: &str = "openai-curated";
const REMOTE_PLUGIN_SYNC_TIMEOUT: Duration = Duration::from_secs(30);
static CURATED_REPO_SYNC_STARTED: AtomicBool = AtomicBool::new(false);
@@ -395,9 +396,25 @@ impl PluginsManager {
) -> Result<PluginInstallOutcome, PluginInstallError> {
let resolved = resolve_marketplace_plugin(&request.marketplace_path, &request.plugin_name)?;
let auth_policy = resolved.auth_policy;
let plugin_version =
if resolved.plugin_id.marketplace_name == OPENAI_CURATED_MARKETPLACE_NAME {
Some(
read_curated_plugins_sha(self.codex_home.as_path()).ok_or_else(|| {
PluginStoreError::Invalid(
"local curated marketplace sha is not available".to_string(),
)
})?,
)
} else {
None
};
let store = self.store.clone();
let result: StorePluginInstallResult = tokio::task::spawn_blocking(move || {
store.install(resolved.source_path, resolved.plugin_id)
if let Some(plugin_version) = plugin_version {
store.install_with_version(resolved.source_path, resolved.plugin_id, plugin_version)
} else {
store.install(resolved.source_path, resolved.plugin_id)
}
})
.await
.map_err(PluginInstallError::join)??;
@@ -464,8 +481,19 @@ impl PluginsManager {
};
let marketplace_name = curated_marketplace.name.clone();
let mut local_plugins =
Vec::<(String, PluginId, AbsolutePathBuf, Option<bool>, bool)>::new();
let curated_plugin_version = read_curated_plugins_sha(self.codex_home.as_path())
.ok_or_else(|| {
PluginStoreError::Invalid(
"local curated marketplace sha is not available".to_string(),
)
})?;
let mut local_plugins = Vec::<(
String,
PluginId,
AbsolutePathBuf,
Option<bool>,
Option<String>,
)>::new();
let mut local_plugin_names = HashSet::new();
for plugin in curated_marketplace.plugins {
let plugin_name = plugin.name;
@@ -486,13 +514,13 @@ impl PluginsManager {
let current_enabled = configured_plugins
.get(&plugin_key)
.map(|plugin| plugin.enabled);
let is_installed = self.store.is_installed(&plugin_id);
let installed_version = self.store.active_plugin_version(&plugin_id);
local_plugins.push((
plugin_name,
plugin_id,
source_path,
current_enabled,
is_installed,
installed_version,
));
}
@@ -528,11 +556,20 @@ impl PluginsManager {
let remote_plugin_count = remote_enabled_by_name.len();
let local_plugin_count = local_plugins.len();
for (plugin_name, plugin_id, source_path, current_enabled, is_installed) in local_plugins {
for (plugin_name, plugin_id, source_path, current_enabled, installed_version) in
local_plugins
{
let plugin_key = plugin_id.as_key();
let is_installed = installed_version.is_some();
if let Some(enabled) = remote_enabled_by_name.get(&plugin_name).copied() {
if !is_installed {
installs.push((source_path, plugin_id.clone()));
installs.push((
source_path,
plugin_id.clone(),
curated_plugin_version.clone(),
));
}
if !is_installed {
result.installed_plugin_ids.push(plugin_key.clone());
}
@@ -565,8 +602,8 @@ impl PluginsManager {
let store = self.store.clone();
let store_result = tokio::task::spawn_blocking(move || {
for (source_path, plugin_id) in installs {
store.install(source_path, plugin_id)?;
for (source_path, plugin_id, plugin_version) in installs {
store.install_with_version(source_path, plugin_id, plugin_version)?;
}
for plugin_id in uninstalls {
store.uninstall(&plugin_id)?;
@@ -668,28 +705,67 @@ impl PluginsManager {
.collect())
}
pub fn maybe_start_curated_repo_sync_for_config(&self, config: &Config) {
pub fn maybe_start_curated_repo_sync_for_config(self: &Arc<Self>, config: &Config) {
if plugins_feature_enabled_from_stack(&config.config_layer_stack) {
self.start_curated_repo_sync();
let mut configured_curated_plugin_ids =
configured_plugins_from_stack(&config.config_layer_stack)
.into_keys()
.filter_map(|plugin_key| match PluginId::parse(&plugin_key) {
Ok(plugin_id)
if plugin_id.marketplace_name == OPENAI_CURATED_MARKETPLACE_NAME =>
{
Some(plugin_id)
}
Ok(_) => None,
Err(err) => {
warn!(
plugin_key,
error = %err,
"ignoring invalid configured plugin key during curated sync setup"
);
None
}
})
.collect::<Vec<_>>();
configured_curated_plugin_ids.sort_unstable_by_key(super::store::PluginId::as_key);
self.start_curated_repo_sync(configured_curated_plugin_ids);
}
}
pub fn start_curated_repo_sync(&self) {
if std::env::var_os(DISABLE_CURATED_PLUGIN_SYNC_ENV_VAR).is_some() {
return;
}
fn start_curated_repo_sync(self: &Arc<Self>, configured_curated_plugin_ids: Vec<PluginId>) {
if CURATED_REPO_SYNC_STARTED.swap(true, Ordering::SeqCst) {
return;
}
let manager = Arc::clone(self);
let codex_home = self.codex_home.clone();
if let Err(err) = std::thread::Builder::new()
.name("plugins-curated-repo-sync".to_string())
.spawn(move || {
if let Err(err) = sync_openai_plugins_repo(codex_home.as_path()) {
CURATED_REPO_SYNC_STARTED.store(false, Ordering::SeqCst);
warn!("failed to sync curated plugins repo: {err}");
}
})
.spawn(
move || match sync_openai_plugins_repo(codex_home.as_path()) {
Ok(curated_plugin_version) => {
match refresh_curated_plugin_cache(
codex_home.as_path(),
&curated_plugin_version,
&configured_curated_plugin_ids,
) {
Ok(cache_refreshed) => {
if cache_refreshed {
manager.clear_cache();
}
}
Err(err) => {
manager.clear_cache();
CURATED_REPO_SYNC_STARTED.store(false, Ordering::SeqCst);
warn!("failed to refresh curated plugin cache after sync: {err}");
}
}
}
Err(err) => {
CURATED_REPO_SYNC_STARTED.store(false, Ordering::SeqCst);
warn!("failed to sync curated plugins repo: {err}");
}
},
)
{
CURATED_REPO_SYNC_STARTED.store(false, Ordering::SeqCst);
warn!("failed to start curated plugins repo sync task: {err}");
@@ -906,6 +982,65 @@ pub(crate) fn plugin_namespace_for_skill_path(path: &Path) -> Option<String> {
None
}
fn refresh_curated_plugin_cache(
codex_home: &Path,
plugin_version: &str,
configured_curated_plugin_ids: &[PluginId],
) -> Result<bool, String> {
let store = PluginStore::new(codex_home.to_path_buf());
let curated_marketplace_path = AbsolutePathBuf::try_from(
curated_plugins_repo_path(codex_home).join(".agents/plugins/marketplace.json"),
)
.map_err(|_| "local curated marketplace is not available".to_string())?;
let curated_marketplace = load_marketplace_summary(&curated_marketplace_path)
.map_err(|err| format!("failed to load curated marketplace for cache refresh: {err}"))?;
let mut plugin_sources = HashMap::<String, AbsolutePathBuf>::new();
for plugin in curated_marketplace.plugins {
let plugin_name = plugin.name;
if plugin_sources.contains_key(&plugin_name) {
warn!(
plugin = plugin_name,
marketplace = OPENAI_CURATED_MARKETPLACE_NAME,
"ignoring duplicate curated plugin entry during cache refresh"
);
continue;
}
let source_path = match plugin.source {
MarketplacePluginSourceSummary::Local { path } => path,
};
plugin_sources.insert(plugin_name, source_path);
}
let mut cache_refreshed = false;
for plugin_id in configured_curated_plugin_ids {
if store.active_plugin_version(plugin_id).as_deref() == Some(plugin_version) {
continue;
}
let Some(source_path) = plugin_sources.get(&plugin_id.plugin_name).cloned() else {
warn!(
plugin = plugin_id.plugin_name,
marketplace = OPENAI_CURATED_MARKETPLACE_NAME,
"configured curated plugin no longer exists in curated marketplace during cache refresh"
);
continue;
};
store
.install_with_version(source_path, plugin_id.clone(), plugin_version.to_string())
.map_err(|err| {
format!(
"failed to refresh curated plugin cache for {}: {err}",
plugin_id.as_key()
)
})?;
cache_refreshed = true;
}
Ok(cache_refreshed)
}
fn configured_plugins_from_stack(
config_layer_stack: &ConfigLayerStack,
) -> HashMap<String, PluginConfig> {
@@ -926,9 +1061,11 @@ fn configured_plugins_from_stack(
}
fn load_plugin(config_name: String, plugin: &PluginConfig, store: &PluginStore) -> LoadedPlugin {
let plugin_version = DEFAULT_PLUGIN_VERSION.to_string();
let plugin_root = PluginId::parse(&config_name)
.map(|plugin_id| store.plugin_root(&plugin_id, &plugin_version));
let plugin_root = PluginId::parse(&config_name).map(|plugin_id| {
store
.active_plugin_root(&plugin_id)
.unwrap_or_else(|| store.plugin_root(&plugin_id, DEFAULT_PLUGIN_VERSION))
});
let root = match &plugin_root {
Ok(plugin_root) => plugin_root.clone(),
Err(_) => store.root().clone(),
@@ -1227,6 +1364,8 @@ mod tests {
use wiremock::matchers::method;
use wiremock::matchers::path;
const TEST_CURATED_PLUGIN_SHA: &str = "0123456789abcdef0123456789abcdef01234567";
fn write_file(path: &Path, contents: &str) {
fs::create_dir_all(path.parent().expect("file should have a parent")).unwrap();
fs::write(path, contents).unwrap();
@@ -1246,7 +1385,6 @@ mod tests {
}
fn write_openai_curated_marketplace(root: &Path, plugin_names: &[&str]) {
fs::create_dir_all(root.join(".git")).unwrap();
fs::create_dir_all(root.join(".agents/plugins")).unwrap();
let plugins = plugin_names
.iter()
@@ -1280,6 +1418,10 @@ mod tests {
}
}
fn write_curated_plugin_sha(codex_home: &Path, sha: &str) {
write_file(&codex_home.join(".tmp/plugins.sha"), &format!("{sha}\n"));
}
fn plugin_config_toml(enabled: bool, plugins_feature_enabled: bool) -> String {
let mut root = toml::map::Map::new();
@@ -2134,7 +2276,6 @@ enabled = false
let curated_root = curated_plugins_repo_path(tmp.path());
let plugin_root = curated_root.join("plugins/linear");
fs::create_dir_all(curated_root.join(".git")).unwrap();
fs::create_dir_all(curated_root.join(".agents/plugins")).unwrap();
fs::create_dir_all(plugin_root.join(".codex-plugin")).unwrap();
fs::write(
@@ -2404,6 +2545,7 @@ enabled = true
let tmp = tempfile::tempdir().unwrap();
let curated_root = curated_plugins_repo_path(tmp.path());
write_openai_curated_marketplace(&curated_root, &["linear", "gmail", "calendar"]);
write_curated_plugin_sha(tmp.path(), TEST_CURATED_PLUGIN_SHA);
write_plugin(
&tmp.path().join("plugins/cache/openai-curated"),
"linear/local",
@@ -2469,7 +2611,9 @@ enabled = true
);
assert!(
tmp.path()
.join("plugins/cache/openai-curated/gmail/local")
.join(format!(
"plugins/cache/openai-curated/gmail/{TEST_CURATED_PLUGIN_SHA}"
))
.is_dir()
);
assert!(
@@ -2511,6 +2655,7 @@ enabled = true
let tmp = tempfile::tempdir().unwrap();
let curated_root = curated_plugins_repo_path(tmp.path());
write_openai_curated_marketplace(&curated_root, &["linear"]);
write_curated_plugin_sha(tmp.path(), TEST_CURATED_PLUGIN_SHA);
write_file(
&tmp.path().join(CONFIG_TOML_FILE),
r#"[features]
@@ -2566,6 +2711,7 @@ enabled = false
let tmp = tempfile::tempdir().unwrap();
let curated_root = curated_plugins_repo_path(tmp.path());
write_openai_curated_marketplace(&curated_root, &["linear", "gmail"]);
write_curated_plugin_sha(tmp.path(), TEST_CURATED_PLUGIN_SHA);
fs::remove_dir_all(curated_root.join("plugins/gmail")).unwrap();
write_plugin(
&tmp.path().join("plugins/cache/openai-curated"),
@@ -2630,7 +2776,7 @@ enabled = false
async fn sync_plugins_from_remote_uses_first_duplicate_local_plugin_entry() {
let tmp = tempfile::tempdir().unwrap();
let curated_root = curated_plugins_repo_path(tmp.path());
fs::create_dir_all(curated_root.join(".git")).unwrap();
write_curated_plugin_sha(tmp.path(), TEST_CURATED_PLUGIN_SHA);
fs::create_dir_all(curated_root.join(".agents/plugins")).unwrap();
fs::write(
curated_root.join(".agents/plugins/marketplace.json"),
@@ -2702,15 +2848,98 @@ plugins = true
}
);
assert_eq!(
fs::read_to_string(
tmp.path()
.join("plugins/cache/openai-curated/gmail/local/marker.txt")
)
fs::read_to_string(tmp.path().join(format!(
"plugins/cache/openai-curated/gmail/{TEST_CURATED_PLUGIN_SHA}/marker.txt"
)))
.unwrap(),
"first"
);
}
#[test]
fn refresh_curated_plugin_cache_replaces_existing_local_version_with_sha() {
let tmp = tempfile::tempdir().unwrap();
let curated_root = curated_plugins_repo_path(tmp.path());
write_openai_curated_marketplace(&curated_root, &["slack"]);
write_curated_plugin_sha(tmp.path(), TEST_CURATED_PLUGIN_SHA);
let plugin_id = PluginId::new(
"slack".to_string(),
OPENAI_CURATED_MARKETPLACE_NAME.to_string(),
)
.unwrap();
write_plugin(
&tmp.path().join("plugins/cache/openai-curated"),
"slack/local",
"slack",
);
assert!(
refresh_curated_plugin_cache(tmp.path(), TEST_CURATED_PLUGIN_SHA, &[plugin_id])
.expect("cache refresh should succeed")
);
assert!(
!tmp.path()
.join("plugins/cache/openai-curated/slack/local")
.exists()
);
assert!(
tmp.path()
.join(format!(
"plugins/cache/openai-curated/slack/{TEST_CURATED_PLUGIN_SHA}"
))
.is_dir()
);
}
#[test]
fn refresh_curated_plugin_cache_reinstalls_missing_configured_plugin_with_current_sha() {
let tmp = tempfile::tempdir().unwrap();
let curated_root = curated_plugins_repo_path(tmp.path());
write_openai_curated_marketplace(&curated_root, &["slack"]);
write_curated_plugin_sha(tmp.path(), TEST_CURATED_PLUGIN_SHA);
let plugin_id = PluginId::new(
"slack".to_string(),
OPENAI_CURATED_MARKETPLACE_NAME.to_string(),
)
.unwrap();
assert!(
refresh_curated_plugin_cache(tmp.path(), TEST_CURATED_PLUGIN_SHA, &[plugin_id])
.expect("cache refresh should recreate missing configured plugin")
);
assert!(
tmp.path()
.join(format!(
"plugins/cache/openai-curated/slack/{TEST_CURATED_PLUGIN_SHA}"
))
.is_dir()
);
}
#[test]
fn refresh_curated_plugin_cache_returns_false_when_configured_plugins_are_current() {
let tmp = tempfile::tempdir().unwrap();
let curated_root = curated_plugins_repo_path(tmp.path());
write_openai_curated_marketplace(&curated_root, &["slack"]);
let plugin_id = PluginId::new(
"slack".to_string(),
OPENAI_CURATED_MARKETPLACE_NAME.to_string(),
)
.unwrap();
write_plugin(
&tmp.path().join("plugins/cache/openai-curated"),
&format!("slack/{TEST_CURATED_PLUGIN_SHA}"),
"slack",
);
assert!(
!refresh_curated_plugin_cache(tmp.path(), TEST_CURATED_PLUGIN_SHA, &[plugin_id])
.expect("cache refresh should be a no-op when configured plugins are current")
);
}
#[test]
fn load_plugins_ignores_project_config_files() {
let codex_home = TempDir::new().unwrap();

View File

@@ -245,6 +245,15 @@ fn discover_marketplace_paths_from_roots(
}
for root in additional_roots {
// Curated marketplaces can now come from an HTTP-downloaded directory that is not a git
// checkout, so check the root directly before falling back to repo-root discovery.
if let Ok(path) = root.join(MARKETPLACE_RELATIVE_PATH)
&& path.as_path().is_file()
&& !paths.contains(&path)
{
paths.push(path);
continue;
}
if let Some(repo_root) = get_git_repo_root(root.as_path())
&& let Ok(repo_root) = AbsolutePathBuf::try_from(repo_root)
&& let Ok(path) = repo_root.join(MARKETPLACE_RELATIVE_PATH)

View File

@@ -7,6 +7,7 @@ mod render;
mod store;
pub(crate) use curated_repo::curated_plugins_repo_path;
pub(crate) use curated_repo::read_curated_plugins_sha;
pub(crate) use curated_repo::sync_openai_plugins_repo;
pub(crate) use injection::build_plugin_injections;
pub use manager::AppConnectorId;

View File

@@ -81,27 +81,65 @@ impl PluginStore {
&self.root
}
pub fn plugin_root(&self, plugin_id: &PluginId, plugin_version: &str) -> AbsolutePathBuf {
pub fn plugin_base_root(&self, plugin_id: &PluginId) -> AbsolutePathBuf {
AbsolutePathBuf::try_from(
self.root
.as_path()
.join(&plugin_id.marketplace_name)
.join(&plugin_id.plugin_name)
.join(&plugin_id.plugin_name),
)
.unwrap_or_else(|err| panic!("plugin cache path should resolve to an absolute path: {err}"))
}
pub fn plugin_root(&self, plugin_id: &PluginId, plugin_version: &str) -> AbsolutePathBuf {
AbsolutePathBuf::try_from(
self.plugin_base_root(plugin_id)
.as_path()
.join(plugin_version),
)
.unwrap_or_else(|err| panic!("plugin cache path should resolve to an absolute path: {err}"))
}
pub fn active_plugin_version(&self, plugin_id: &PluginId) -> Option<String> {
let mut discovered_versions = fs::read_dir(self.plugin_base_root(plugin_id).as_path())
.ok()?
.filter_map(Result::ok)
.filter_map(|entry| {
entry.file_type().ok().filter(std::fs::FileType::is_dir)?;
entry.file_name().into_string().ok()
})
.filter(|version| validate_plugin_segment(version, "plugin version").is_ok())
.collect::<Vec<_>>();
discovered_versions.sort_unstable();
if discovered_versions.len() == 1 {
discovered_versions.pop()
} else {
None
}
}
pub fn active_plugin_root(&self, plugin_id: &PluginId) -> Option<AbsolutePathBuf> {
self.active_plugin_version(plugin_id)
.map(|plugin_version| self.plugin_root(plugin_id, &plugin_version))
}
pub fn is_installed(&self, plugin_id: &PluginId) -> bool {
self.plugin_root(plugin_id, DEFAULT_PLUGIN_VERSION)
.as_path()
.is_dir()
self.active_plugin_version(plugin_id).is_some()
}
pub fn install(
&self,
source_path: AbsolutePathBuf,
plugin_id: PluginId,
) -> Result<PluginInstallResult, PluginStoreError> {
self.install_with_version(source_path, plugin_id, DEFAULT_PLUGIN_VERSION.to_string())
}
pub fn install_with_version(
&self,
source_path: AbsolutePathBuf,
plugin_id: PluginId,
plugin_version: String,
) -> Result<PluginInstallResult, PluginStoreError> {
if !source_path.as_path().is_dir() {
return Err(PluginStoreError::Invalid(format!(
@@ -117,17 +155,14 @@ impl PluginStore {
plugin_id.plugin_name
)));
}
let plugin_version = DEFAULT_PLUGIN_VERSION.to_string();
validate_plugin_segment(&plugin_version, "plugin version")
.map_err(PluginStoreError::Invalid)?;
let installed_path = self.plugin_root(&plugin_id, &plugin_version);
if let Some(parent) = installed_path.parent() {
fs::create_dir_all(parent.as_path()).map_err(|err| {
PluginStoreError::io("failed to create plugin cache directory", err)
})?;
}
remove_existing_target(installed_path.as_path())?;
copy_dir_recursive(source_path.as_path(), installed_path.as_path())?;
replace_plugin_root_atomically(
source_path.as_path(),
self.plugin_base_root(&plugin_id).as_path(),
&plugin_version,
)?;
Ok(PluginInstallResult {
plugin_id,
@@ -137,12 +172,7 @@ impl PluginStore {
}
pub fn uninstall(&self, plugin_id: &PluginId) -> Result<(), PluginStoreError> {
let plugin_path = self
.root
.as_path()
.join(&plugin_id.marketplace_name)
.join(&plugin_id.plugin_name);
remove_existing_target(&plugin_path)
remove_existing_target(self.plugin_base_root(plugin_id).as_path())
}
}
@@ -218,6 +248,73 @@ fn remove_existing_target(path: &Path) -> Result<(), PluginStoreError> {
}
}
fn replace_plugin_root_atomically(
source: &Path,
target_root: &Path,
plugin_version: &str,
) -> Result<(), PluginStoreError> {
let Some(parent) = target_root.parent() else {
return Err(PluginStoreError::Invalid(format!(
"plugin cache path has no parent: {}",
target_root.display()
)));
};
fs::create_dir_all(parent)
.map_err(|err| PluginStoreError::io("failed to create plugin cache directory", err))?;
let Some(plugin_dir_name) = target_root.file_name() else {
return Err(PluginStoreError::Invalid(format!(
"plugin cache path has no directory name: {}",
target_root.display()
)));
};
let staged_dir = tempfile::Builder::new()
.prefix("plugin-install-")
.tempdir_in(parent)
.map_err(|err| {
PluginStoreError::io("failed to create temporary plugin cache directory", err)
})?;
let staged_root = staged_dir.path().join(plugin_dir_name);
let staged_version_root = staged_root.join(plugin_version);
copy_dir_recursive(source, &staged_version_root)?;
if target_root.exists() {
let backup_dir = tempfile::Builder::new()
.prefix("plugin-backup-")
.tempdir_in(parent)
.map_err(|err| {
PluginStoreError::io("failed to create plugin cache backup directory", err)
})?;
let backup_root = backup_dir.path().join(plugin_dir_name);
fs::rename(target_root, &backup_root)
.map_err(|err| PluginStoreError::io("failed to back up plugin cache entry", err))?;
if let Err(err) = fs::rename(&staged_root, target_root) {
let rollback_result = fs::rename(&backup_root, target_root);
return match rollback_result {
Ok(()) => Err(PluginStoreError::io(
"failed to activate updated plugin cache entry",
err,
)),
Err(rollback_err) => {
let backup_path = backup_dir.keep().join(plugin_dir_name);
Err(PluginStoreError::Invalid(format!(
"failed to activate updated plugin cache entry at {}: {err}; failed to restore previous cache entry (left at {}): {rollback_err}",
target_root.display(),
backup_path.display()
)))
}
};
}
} else {
fs::rename(&staged_root, target_root)
.map_err(|err| PluginStoreError::io("failed to activate plugin cache entry", err))?;
}
Ok(())
}
fn copy_dir_recursive(source: &Path, target: &Path) -> Result<(), PluginStoreError> {
fs::create_dir_all(target)
.map_err(|err| PluginStoreError::io("failed to create plugin target directory", err))?;
@@ -327,6 +424,57 @@ mod tests {
);
}
#[test]
fn install_with_version_uses_requested_cache_version() {
let tmp = tempdir().unwrap();
write_plugin(tmp.path(), "sample-plugin", "sample-plugin");
let plugin_id =
PluginId::new("sample-plugin".to_string(), "openai-curated".to_string()).unwrap();
let plugin_version = "0123456789abcdef".to_string();
let result = PluginStore::new(tmp.path().to_path_buf())
.install_with_version(
AbsolutePathBuf::try_from(tmp.path().join("sample-plugin")).unwrap(),
plugin_id.clone(),
plugin_version.clone(),
)
.unwrap();
let installed_path = tmp.path().join(format!(
"plugins/cache/openai-curated/sample-plugin/{plugin_version}"
));
assert_eq!(
result,
PluginInstallResult {
plugin_id,
plugin_version,
installed_path: AbsolutePathBuf::try_from(installed_path.clone()).unwrap(),
}
);
assert!(installed_path.join(".codex-plugin/plugin.json").is_file());
}
#[test]
fn active_plugin_version_reads_version_directory_name() {
let tmp = tempdir().unwrap();
write_plugin(
&tmp.path().join("plugins/cache/debug"),
"sample-plugin/local",
"sample-plugin",
);
let store = PluginStore::new(tmp.path().to_path_buf());
let plugin_id = PluginId::new("sample-plugin".to_string(), "debug".to_string()).unwrap();
assert_eq!(
store.active_plugin_version(&plugin_id),
Some("local".to_string())
);
assert_eq!(
store.active_plugin_root(&plugin_id).unwrap().as_path(),
tmp.path().join("plugins/cache/debug/sample-plugin/local")
);
}
#[test]
fn plugin_root_rejects_path_separators_in_key_segments() {
let err = PluginId::parse("../../etc@debug").unwrap_err();