diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 941cb7e13a..e135c9b245 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -7,6 +7,7 @@ use std::sync::atomic::AtomicBool; use crate::attestation::app_server_attestation_provider; use crate::config_manager::ConfigManager; use crate::connection_rpc_gate::ConnectionRpcGate; +use crate::error_code::internal_error; use crate::error_code::invalid_request; use crate::extensions::guardian_agent_spawner; use crate::extensions::thread_extensions; @@ -984,8 +985,10 @@ impl MessageProcessor { })? } else { self.environment_manager - .default_environment() - .unwrap_or_else(|| self.environment_manager.local_environment()) + .default_or_local_environment() + .ok_or_else(|| { + internal_error("runtime install environment is not configured") + })? }; let response = environment.install_runtime(params).await?; let response = diff --git a/codex-rs/app-server/src/runtime_install.rs b/codex-rs/app-server/src/runtime_install.rs index 2dba4543c7..18bdfad9d9 100644 --- a/codex-rs/app-server/src/runtime_install.rs +++ b/codex-rs/app-server/src/runtime_install.rs @@ -101,9 +101,8 @@ async fn sync_primary_runtime_skills( return Ok(Vec::new()); } - move_legacy_primary_runtime_skills(Arc::clone(&fs), codex_home, skills_to_remove).await?; - if bundled_skill_paths.is_empty() { + move_legacy_primary_runtime_skills(fs, codex_home, skills_to_remove).await?; return Ok(Vec::new()); } @@ -113,43 +112,44 @@ async fn sync_primary_runtime_skills( .join("skills") .join(PUBLISHED_ARTIFACT_NAME), )?; - remove_if_exists( - Arc::clone(&fs), - &destination_root, - RemoveOptions { - recursive: true, - force: true, - }, - ) - .await?; - create_directory(Arc::clone(&fs), &destination_root).await?; + let staging_root = temporary_sibling_path(&destination_root, "staging")?; + let result = async { + create_directory(Arc::clone(&fs), &staging_root).await?; - let mut materialized = Vec::with_capacity(bundled_skill_paths.len()); - for bundled_skill_path in bundled_skill_paths { - let skill_root = absolute_path( - bundled_skill_path - .as_path() - .parent() - .ok_or_else(|| { - invalid_params(format!( - "bundled skill path {} has no parent directory", - bundled_skill_path.display() - )) - })? - .to_path_buf(), - )?; - let skill_name = skill_root.as_path().file_name().ok_or_else(|| { - invalid_params(format!( - "bundled skill path {} has no skill directory name", - bundled_skill_path.display() - )) - })?; - let destination = absolute_path(destination_root.as_path().join(skill_name))?; - replace_directory(Arc::clone(&fs), &skill_root, &destination).await?; - materialized.push(absolute_path(destination.as_path().join("SKILL.md"))?); + let mut materialized = Vec::with_capacity(bundled_skill_paths.len()); + for bundled_skill_path in bundled_skill_paths { + let skill_root = absolute_path( + bundled_skill_path + .as_path() + .parent() + .ok_or_else(|| { + invalid_params(format!( + "bundled skill path {} has no parent directory", + bundled_skill_path.display() + )) + })? + .to_path_buf(), + )?; + let skill_name = skill_root.as_path().file_name().ok_or_else(|| { + invalid_params(format!( + "bundled skill path {} has no skill directory name", + bundled_skill_path.display() + )) + })?; + let staged_skill_root = absolute_path(staging_root.as_path().join(skill_name))?; + copy_directory(Arc::clone(&fs), &skill_root, &staged_skill_root).await?; + materialized.push(absolute_path( + destination_root.as_path().join(skill_name).join("SKILL.md"), + )?); + } + + publish_staged_directory(Arc::clone(&fs), &staging_root, &destination_root).await?; + move_legacy_primary_runtime_skills(Arc::clone(&fs), codex_home, skills_to_remove).await?; + Ok(materialized) } - - Ok(materialized) + .await; + cleanup_directory(&fs, &staging_root, "staged primary runtime skills").await; + result } async fn move_legacy_primary_runtime_skills( @@ -241,19 +241,72 @@ async fn replace_directory( source: &AbsolutePathBuf, destination: &AbsolutePathBuf, ) -> Result<(), JSONRPCErrorError> { - remove_if_exists( - Arc::clone(&fs), - destination, - RemoveOptions { - recursive: true, - force: true, - }, - ) - .await?; if let Some(parent) = destination.as_path().parent() { create_directory(Arc::clone(&fs), &absolute_path(parent.to_path_buf())?).await?; } - copy_directory(fs, source, destination).await + let staging_path = temporary_sibling_path(destination, "staging")?; + let result = async { + copy_directory(Arc::clone(&fs), source, &staging_path).await?; + publish_staged_directory(Arc::clone(&fs), &staging_path, destination).await + } + .await; + cleanup_directory(&fs, &staging_path, "staged runtime directory").await; + result +} + +async fn publish_staged_directory( + fs: Arc, + staging_path: &AbsolutePathBuf, + destination: &AbsolutePathBuf, +) -> Result<(), JSONRPCErrorError> { + let backup_path = temporary_sibling_path(destination, "previous")?; + let result = async { + let destination_exists = path_exists(Arc::clone(&fs), destination).await?; + if destination_exists { + copy_directory(Arc::clone(&fs), destination, &backup_path).await?; + } + + remove_if_exists( + Arc::clone(&fs), + destination, + RemoveOptions { + recursive: true, + force: true, + }, + ) + .await?; + + if let Err(error) = copy_directory(Arc::clone(&fs), staging_path, destination).await { + remove_if_exists( + Arc::clone(&fs), + destination, + RemoveOptions { + recursive: true, + force: true, + }, + ) + .await?; + if destination_exists + && let Err(restore_error) = + copy_directory(Arc::clone(&fs), &backup_path, destination).await + { + return Err(internal_error(format!( + "failed to restore published runtime directory {} after replacement failed: {}; restore failed: {}", + destination.display(), + error.message, + restore_error.message + ))); + } + return Err(error); + } + + Ok(()) + } + .await; + if result.is_ok() { + cleanup_directory(&fs, &backup_path, "previous runtime directory").await; + } + result } async fn copy_directory( @@ -310,6 +363,57 @@ async fn remove_if_exists( } } +async fn path_exists( + fs: Arc, + path: &AbsolutePathBuf, +) -> Result { + match fs.get_metadata(path, /*sandbox*/ None).await { + Ok(_) => Ok(true), + Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(false), + Err(err) => Err(internal_error(format!( + "failed to inspect runtime path {}: {err}", + path.display() + ))), + } +} + +async fn cleanup_directory(fs: &Arc, path: &AbsolutePathBuf, label: &str) { + if let Err(error) = remove_if_exists( + Arc::clone(fs), + path, + RemoveOptions { + recursive: true, + force: true, + }, + ) + .await + { + tracing::warn!( + path = %path.display(), + error = %error.message, + "failed to clean up {label}" + ); + } +} + +fn temporary_sibling_path( + destination: &AbsolutePathBuf, + label: &str, +) -> Result { + let parent = destination.as_path().parent().ok_or_else(|| { + internal_error(format!( + "runtime destination {} has no parent directory", + destination.display() + )) + })?; + let destination_name = destination + .as_path() + .file_name() + .map(safe_path_segment) + .unwrap_or_else(|| "runtime-item".to_string()); + absolute_path(parent.join(format!(".{destination_name}-{label}-{}", Uuid::new_v4()))) +} + fn safe_path_segment(segment: &OsStr) -> String { let safe = segment .to_string_lossy() @@ -444,4 +548,76 @@ mod tests { 1 ); } + + #[tokio::test] + async fn materialize_bundled_plugin_marketplaces_preserves_existing_copy_on_copy_failure() { + let codex_home = tempfile::tempdir().expect("codex home"); + let runtime = tempfile::tempdir().expect("runtime"); + let missing_marketplace_root = runtime.path().join("market"); + let published_manifest = codex_home + .path() + .join("plugins") + .join(PUBLISHED_ARTIFACT_NAME) + .join("marketplaces") + .join("market") + .join(".agents/plugins/marketplace.json"); + fs::create_dir_all(published_manifest.parent().expect("manifest parent")) + .await + .expect("create published marketplace"); + fs::write(&published_manifest, "previous") + .await + .expect("write published marketplace"); + + let error = materialize_bundled_plugin_marketplaces( + Arc::new(LocalFileSystem::unsandboxed()), + &absolute_path(codex_home.path().to_path_buf()).expect("absolute codex home"), + &[absolute_path(missing_marketplace_root).expect("absolute marketplace path")], + ) + .await + .expect_err("missing marketplace should fail"); + + assert!(error.message.contains("failed to copy directory")); + assert_eq!( + fs::read_to_string(published_manifest) + .await + .expect("read published marketplace"), + "previous" + ); + } + + #[tokio::test] + async fn sync_primary_runtime_skills_preserves_existing_copy_on_copy_failure() { + let codex_home = tempfile::tempdir().expect("codex home"); + let runtime = tempfile::tempdir().expect("runtime"); + let missing_skill_path = runtime.path().join("skills").join("debug").join("SKILL.md"); + let published_skill_path = codex_home + .path() + .join("skills") + .join(PUBLISHED_ARTIFACT_NAME) + .join("existing") + .join("SKILL.md"); + fs::create_dir_all(published_skill_path.parent().expect("skill parent")) + .await + .expect("create published skill"); + fs::write(&published_skill_path, "previous") + .await + .expect("write published skill"); + + let error = sync_primary_runtime_skills( + Arc::new(LocalFileSystem::unsandboxed()), + &absolute_path(codex_home.path().to_path_buf()).expect("absolute codex home"), + &[absolute_path(missing_skill_path).expect("absolute skill path")], + &[], + ) + .await + .expect_err("missing skill should fail"); + + assert!(error.message.contains("failed to copy directory")); + assert_eq!( + fs::read_to_string(published_skill_path) + .await + .expect("read published skill"), + "previous" + ); + } }