mirror of
https://github.com/openai/codex.git
synced 2026-05-01 01:47:18 +00:00
feat: bind package manager (#13571)
This commit is contained in:
@@ -1,19 +1,27 @@
|
||||
use fd_lock::RwLock as FileRwLock;
|
||||
use flate2::read::GzDecoder;
|
||||
use reqwest::Client;
|
||||
use serde::de::DeserializeOwned;
|
||||
use sha2::Digest;
|
||||
use sha2::Sha256;
|
||||
use std::fs::File;
|
||||
use std::fs::OpenOptions;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::path::Component;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
use tar::Archive;
|
||||
use tempfile::tempdir_in;
|
||||
use thiserror::Error;
|
||||
use tokio::fs;
|
||||
use tokio::time::sleep;
|
||||
use url::Url;
|
||||
use zip::ZipArchive;
|
||||
|
||||
const INSTALL_LOCK_POLL_INTERVAL: Duration = Duration::from_millis(50);
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct PackageManagerConfig<P> {
|
||||
codex_home: PathBuf,
|
||||
@@ -86,6 +94,14 @@ impl<P: ManagedPackage> PackageManager<P> {
|
||||
.config
|
||||
.package()
|
||||
.install_dir(&self.config.cache_root(), platform);
|
||||
self.resolve_cached_at(platform, install_dir).await
|
||||
}
|
||||
|
||||
async fn resolve_cached_at(
|
||||
&self,
|
||||
platform: PackagePlatform,
|
||||
install_dir: PathBuf,
|
||||
) -> Result<Option<P::Installed>, P::Error> {
|
||||
if !fs::try_exists(&install_dir)
|
||||
.await
|
||||
.map_err(|source| PackageManagerError::Io {
|
||||
@@ -97,10 +113,10 @@ impl<P: ManagedPackage> PackageManager<P> {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let package = self
|
||||
.config
|
||||
.package()
|
||||
.load_installed(install_dir, platform)?;
|
||||
let package = match self.config.package().load_installed(install_dir, platform) {
|
||||
Ok(package) => package,
|
||||
Err(_) => return Ok(None),
|
||||
};
|
||||
if self.config.package().installed_version(&package) != self.config.package().version() {
|
||||
return Ok(None);
|
||||
}
|
||||
@@ -113,6 +129,61 @@ impl<P: ManagedPackage> PackageManager<P> {
|
||||
}
|
||||
|
||||
let platform = PackagePlatform::detect_current().map_err(P::Error::from)?;
|
||||
let cache_root = self.config.cache_root();
|
||||
let install_dir = self.config.package().install_dir(&cache_root, platform);
|
||||
if let Some(package) = self
|
||||
.resolve_cached_at(platform, install_dir.clone())
|
||||
.await?
|
||||
{
|
||||
return Ok(package);
|
||||
}
|
||||
|
||||
if let Some(parent) = install_dir.parent() {
|
||||
fs::create_dir_all(parent)
|
||||
.await
|
||||
.map_err(|source| PackageManagerError::Io {
|
||||
context: format!("failed to create {}", parent.display()),
|
||||
source,
|
||||
})
|
||||
.map_err(P::Error::from)?;
|
||||
}
|
||||
|
||||
let lock_path = install_dir.with_extension("lock");
|
||||
let lock_file = OpenOptions::new()
|
||||
.create(true)
|
||||
.read(true)
|
||||
.write(true)
|
||||
.truncate(false)
|
||||
.open(&lock_path)
|
||||
.map_err(|source| PackageManagerError::Io {
|
||||
context: format!("failed to open {}", lock_path.display()),
|
||||
source,
|
||||
})
|
||||
.map_err(P::Error::from)?;
|
||||
let mut install_lock = FileRwLock::new(lock_file);
|
||||
let _install_guard = loop {
|
||||
match install_lock.try_write() {
|
||||
Ok(guard) => break guard,
|
||||
Err(source) if source.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
sleep(INSTALL_LOCK_POLL_INTERVAL).await;
|
||||
}
|
||||
Err(source) => {
|
||||
return Err(PackageManagerError::Io {
|
||||
context: format!("failed to lock {}", lock_path.display()),
|
||||
source,
|
||||
}
|
||||
.into());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(package) = self
|
||||
.resolve_cached_at(platform, install_dir.clone())
|
||||
.await?
|
||||
{
|
||||
return Ok(package);
|
||||
}
|
||||
|
||||
let manifest = self.fetch_release_manifest().await?;
|
||||
if self.config.package().release_version(&manifest) != self.config.package().version() {
|
||||
return Err(PackageManagerError::UnexpectedPackageVersion {
|
||||
@@ -122,6 +193,22 @@ impl<P: ManagedPackage> PackageManager<P> {
|
||||
.into());
|
||||
}
|
||||
|
||||
fs::create_dir_all(&cache_root)
|
||||
.await
|
||||
.map_err(|source| PackageManagerError::Io {
|
||||
context: format!("failed to create {}", cache_root.display()),
|
||||
source,
|
||||
})
|
||||
.map_err(P::Error::from)?;
|
||||
let staging_root = cache_root.join(".staging");
|
||||
fs::create_dir_all(&staging_root)
|
||||
.await
|
||||
.map_err(|source| PackageManagerError::Io {
|
||||
context: format!("failed to create {}", staging_root.display()),
|
||||
source,
|
||||
})
|
||||
.map_err(P::Error::from)?;
|
||||
|
||||
let platform_archive = self
|
||||
.config
|
||||
.package()
|
||||
@@ -134,41 +221,11 @@ impl<P: ManagedPackage> PackageManager<P> {
|
||||
let archive_bytes = self.download_bytes(&archive_url).await?;
|
||||
verify_sha256(&archive_bytes, &platform_archive.sha256).map_err(P::Error::from)?;
|
||||
|
||||
let install_dir = self
|
||||
.config
|
||||
.package()
|
||||
.install_dir(&self.config.cache_root(), platform);
|
||||
if fs::try_exists(&install_dir)
|
||||
.await
|
||||
.map_err(|source| PackageManagerError::Io {
|
||||
context: format!("failed to read {}", install_dir.display()),
|
||||
source,
|
||||
})
|
||||
.map_err(P::Error::from)?
|
||||
{
|
||||
fs::remove_dir_all(&install_dir)
|
||||
.await
|
||||
.map_err(|source| PackageManagerError::Io {
|
||||
context: format!("failed to remove {}", install_dir.display()),
|
||||
source,
|
||||
})
|
||||
.map_err(P::Error::from)?;
|
||||
}
|
||||
|
||||
let cache_root = self.config.cache_root();
|
||||
fs::create_dir_all(&cache_root)
|
||||
.await
|
||||
.map_err(|source| PackageManagerError::Io {
|
||||
context: format!("failed to create {}", cache_root.display()),
|
||||
source,
|
||||
})
|
||||
.map_err(P::Error::from)?;
|
||||
|
||||
let staging_dir = tempdir_in(&cache_root)
|
||||
let staging_dir = tempdir_in(&staging_root)
|
||||
.map_err(|source| PackageManagerError::Io {
|
||||
context: format!(
|
||||
"failed to create staging directory in {}",
|
||||
cache_root.display()
|
||||
staging_root.display()
|
||||
),
|
||||
source,
|
||||
})
|
||||
@@ -222,17 +279,91 @@ impl<P: ManagedPackage> PackageManager<P> {
|
||||
.map_err(P::Error::from)?;
|
||||
}
|
||||
|
||||
fs::rename(&extracted_root, &install_dir)
|
||||
let mut replaced_install_dir = None;
|
||||
if fs::try_exists(&install_dir)
|
||||
.await
|
||||
.map_err(|source| PackageManagerError::Io {
|
||||
context: format!(
|
||||
"failed to move {} to {}",
|
||||
extracted_root.display(),
|
||||
install_dir.display()
|
||||
),
|
||||
context: format!("failed to read {}", install_dir.display()),
|
||||
source,
|
||||
})
|
||||
.map_err(P::Error::from)?;
|
||||
.map_err(P::Error::from)?
|
||||
{
|
||||
let install_name = install_dir.file_name().ok_or_else(|| {
|
||||
PackageManagerError::ArchiveExtraction(format!(
|
||||
"install path `{}` has no terminal component",
|
||||
install_dir.display()
|
||||
))
|
||||
})?;
|
||||
let install_name = install_name.to_string_lossy();
|
||||
let mut suffix = 0u32;
|
||||
loop {
|
||||
let quarantined_path = install_dir.with_file_name(format!(
|
||||
".{install_name}.replaced-{}-{suffix}",
|
||||
std::process::id()
|
||||
));
|
||||
match fs::rename(&install_dir, &quarantined_path).await {
|
||||
Ok(()) => {
|
||||
replaced_install_dir = Some(quarantined_path);
|
||||
break;
|
||||
}
|
||||
Err(source) if source.kind() == std::io::ErrorKind::AlreadyExists => {
|
||||
suffix += 1;
|
||||
}
|
||||
Err(source) => {
|
||||
return Err(PackageManagerError::Io {
|
||||
context: format!(
|
||||
"failed to quarantine {} to {}",
|
||||
install_dir.display(),
|
||||
quarantined_path.display()
|
||||
),
|
||||
source,
|
||||
}
|
||||
.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match fs::rename(&extracted_root, &install_dir).await {
|
||||
Ok(()) => {}
|
||||
Err(source)
|
||||
if matches!(
|
||||
source.kind(),
|
||||
std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::DirectoryNotEmpty
|
||||
) =>
|
||||
{
|
||||
if let Some(package) = self
|
||||
.resolve_cached_at(platform, install_dir.clone())
|
||||
.await?
|
||||
{
|
||||
return Ok(package);
|
||||
}
|
||||
return Err(PackageManagerError::Io {
|
||||
context: format!(
|
||||
"failed to move {} to {}",
|
||||
extracted_root.display(),
|
||||
install_dir.display()
|
||||
),
|
||||
source,
|
||||
}
|
||||
.into());
|
||||
}
|
||||
Err(source) => {
|
||||
return Err(PackageManagerError::Io {
|
||||
context: format!(
|
||||
"failed to move {} to {}",
|
||||
extracted_root.display(),
|
||||
install_dir.display()
|
||||
),
|
||||
source,
|
||||
}
|
||||
.into());
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(replaced_install_dir) = replaced_install_dir {
|
||||
let _ = fs::remove_dir_all(replaced_install_dir).await;
|
||||
}
|
||||
|
||||
self.config.package().load_installed(install_dir, platform)
|
||||
}
|
||||
@@ -505,10 +636,35 @@ fn extract_zip_archive(archive_path: &Path, destination: &Path) -> Result<(), Pa
|
||||
context: format!("failed to write {}", output_path.display()),
|
||||
source,
|
||||
})?;
|
||||
apply_zip_permissions(&entry, &output_path)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn apply_zip_permissions(
|
||||
entry: &zip::read::ZipFile<'_>,
|
||||
output_path: &Path,
|
||||
) -> Result<(), PackageManagerError> {
|
||||
let Some(mode) = entry.unix_mode() else {
|
||||
return Ok(());
|
||||
};
|
||||
std::fs::set_permissions(output_path, std::fs::Permissions::from_mode(mode)).map_err(|source| {
|
||||
PackageManagerError::Io {
|
||||
context: format!("failed to set permissions on {}", output_path.display()),
|
||||
source,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
fn apply_zip_permissions(
|
||||
_entry: &zip::read::ZipFile<'_>,
|
||||
_output_path: &Path,
|
||||
) -> Result<(), PackageManagerError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn extract_tar_gz_archive(
|
||||
archive_path: &Path,
|
||||
destination: &Path,
|
||||
@@ -573,7 +729,9 @@ mod tests {
|
||||
use std::collections::BTreeMap;
|
||||
use std::io::Cursor;
|
||||
use std::io::Write;
|
||||
use std::sync::Arc;
|
||||
use tempfile::TempDir;
|
||||
use tokio::sync::Barrier;
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::ResponseTemplate;
|
||||
@@ -732,6 +890,138 @@ mod tests {
|
||||
.join(platform.as_str()),
|
||||
}
|
||||
);
|
||||
|
||||
#[cfg(unix)]
|
||||
{
|
||||
let executable_mode = std::fs::metadata(installed.root_dir.join("bin/tool"))
|
||||
.unwrap_or_else(|error| panic!("{error}"))
|
||||
.permissions()
|
||||
.mode();
|
||||
assert_eq!(executable_mode & 0o111, 0o111);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ensure_installed_replaces_invalid_cached_install() {
|
||||
let server = MockServer::start().await;
|
||||
let version = "0.1.0";
|
||||
let platform = PackagePlatform::detect_current().unwrap_or_else(|error| panic!("{error}"));
|
||||
let archive_name = format!("test-package-v{version}-{}.zip", platform.as_str());
|
||||
let archive_bytes = build_zip_archive(version);
|
||||
let archive_sha = format!("{:x}", Sha256::digest(&archive_bytes));
|
||||
let manifest = serde_json::json!({
|
||||
"package_version": version,
|
||||
"platforms": {
|
||||
platform.as_str(): {
|
||||
"archive": archive_name,
|
||||
"sha256": archive_sha,
|
||||
"format": "zip",
|
||||
"size_bytes": archive_bytes.len(),
|
||||
}
|
||||
}
|
||||
});
|
||||
Mock::given(method("GET"))
|
||||
.and(path(format!("/test-package-v{version}-manifest.json")))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(&manifest))
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("GET"))
|
||||
.and(path(format!("/{archive_name}")))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_bytes(archive_bytes))
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let codex_home = TempDir::new().unwrap_or_else(|error| panic!("{error}"));
|
||||
let install_dir = codex_home
|
||||
.path()
|
||||
.join("packages")
|
||||
.join("test-package")
|
||||
.join(version)
|
||||
.join(platform.as_str());
|
||||
std::fs::create_dir_all(&install_dir).unwrap_or_else(|error| panic!("{error}"));
|
||||
std::fs::write(install_dir.join("broken.txt"), "stale")
|
||||
.unwrap_or_else(|error| panic!("{error}"));
|
||||
|
||||
let manager = PackageManager::new(PackageManagerConfig::new(
|
||||
codex_home.path().to_path_buf(),
|
||||
TestPackage {
|
||||
base_url: Url::parse(&format!("{}/", server.uri()))
|
||||
.unwrap_or_else(|error| panic!("{error}")),
|
||||
version: version.to_string(),
|
||||
},
|
||||
));
|
||||
|
||||
let installed = manager
|
||||
.ensure_installed()
|
||||
.await
|
||||
.unwrap_or_else(|error| panic!("{error}"));
|
||||
|
||||
assert_eq!(installed.version, version);
|
||||
assert!(installed.root_dir.join("manifest.json").exists());
|
||||
assert!(!installed.root_dir.join("broken.txt").exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ensure_installed_serializes_concurrent_installs() {
|
||||
let server = MockServer::start().await;
|
||||
let version = "0.1.0";
|
||||
let platform = PackagePlatform::detect_current().unwrap_or_else(|error| panic!("{error}"));
|
||||
let archive_name = format!("test-package-v{version}-{}.zip", platform.as_str());
|
||||
let archive_bytes = build_zip_archive(version);
|
||||
let archive_sha = format!("{:x}", Sha256::digest(&archive_bytes));
|
||||
let manifest = serde_json::json!({
|
||||
"package_version": version,
|
||||
"platforms": {
|
||||
platform.as_str(): {
|
||||
"archive": archive_name,
|
||||
"sha256": archive_sha,
|
||||
"format": "zip",
|
||||
"size_bytes": archive_bytes.len(),
|
||||
}
|
||||
}
|
||||
});
|
||||
Mock::given(method("GET"))
|
||||
.and(path(format!("/test-package-v{version}-manifest.json")))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(&manifest))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("GET"))
|
||||
.and(path(format!("/{archive_name}")))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_bytes(archive_bytes))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let codex_home = TempDir::new().unwrap_or_else(|error| panic!("{error}"));
|
||||
let config = PackageManagerConfig::new(
|
||||
codex_home.path().to_path_buf(),
|
||||
TestPackage {
|
||||
base_url: Url::parse(&format!("{}/", server.uri()))
|
||||
.unwrap_or_else(|error| panic!("{error}")),
|
||||
version: version.to_string(),
|
||||
},
|
||||
);
|
||||
let manager_one = PackageManager::new(config.clone());
|
||||
let manager_two = PackageManager::new(config);
|
||||
let barrier = Arc::new(Barrier::new(2));
|
||||
let barrier_one = Arc::clone(&barrier);
|
||||
let barrier_two = Arc::clone(&barrier);
|
||||
|
||||
let (first, second) = tokio::join!(
|
||||
async {
|
||||
barrier_one.wait().await;
|
||||
manager_one.ensure_installed().await
|
||||
},
|
||||
async {
|
||||
barrier_two.wait().await;
|
||||
manager_two.ensure_installed().await
|
||||
}
|
||||
);
|
||||
|
||||
let first = first.unwrap_or_else(|error| panic!("{error}"));
|
||||
let second = second.unwrap_or_else(|error| panic!("{error}"));
|
||||
assert_eq!(first, second);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -759,6 +1049,10 @@ mod tests {
|
||||
.unwrap_or_else(|error| panic!("{error}"));
|
||||
zip.write_all(version.as_bytes())
|
||||
.unwrap_or_else(|error| panic!("{error}"));
|
||||
zip.start_file("test-package/bin/tool", options.unix_permissions(0o755))
|
||||
.unwrap_or_else(|error| panic!("{error}"));
|
||||
zip.write_all(b"#!/bin/sh\n")
|
||||
.unwrap_or_else(|error| panic!("{error}"));
|
||||
zip.finish().unwrap_or_else(|error| panic!("{error}"));
|
||||
}
|
||||
bytes.into_inner()
|
||||
|
||||
Reference in New Issue
Block a user