From a787408a06cc7b4d10138d53c355237206427bbe Mon Sep 17 00:00:00 2001 From: acrognale-oai Date: Fri, 22 May 2026 11:43:19 -0400 Subject: [PATCH] feat(app-server): publish installed runtime resources safely --- codex-rs/app-server/src/lib.rs | 1 + codex-rs/app-server/src/message_processor.rs | 13 +- codex-rs/app-server/src/runtime_install.rs | 655 +++++++++++++++++++ codex-rs/exec-server/src/client.rs | 44 +- codex-rs/exec-server/src/codex_home.rs | 26 + codex-rs/exec-server/src/environment.rs | 61 +- codex-rs/exec-server/src/lib.rs | 1 + codex-rs/exec-server/src/protocol.rs | 1 + codex-rs/exec-server/src/runtime_install.rs | 375 ----------- codex-rs/exec-server/src/server/handler.rs | 5 +- codex-rs/exec-server/tests/http_client.rs | 2 + 11 files changed, 798 insertions(+), 386 deletions(-) create mode 100644 codex-rs/app-server/src/runtime_install.rs create mode 100644 codex-rs/exec-server/src/codex_home.rs diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 80305d2d9f..9393330c1d 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -95,6 +95,7 @@ mod models; mod outgoing_message; mod request_processors; mod request_serialization; +mod runtime_install; mod server_request_error; mod skills_watcher; mod thread_state; diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 801f3bb593..e135c9b245 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -180,6 +180,7 @@ pub(crate) struct MessageProcessor { plugin_processor: PluginRequestProcessor, remote_control_processor: RemoteControlRequestProcessor, search_processor: SearchRequestProcessor, + thread_manager: Arc, thread_goal_processor: ThreadGoalRequestProcessor, thread_processor: ThreadRequestProcessor, turn_processor: TurnRequestProcessor, @@ -502,6 +503,7 @@ impl MessageProcessor { plugin_processor, remote_control_processor, search_processor, + thread_manager, thread_goal_processor, thread_processor, turn_processor, @@ -988,10 +990,13 @@ impl MessageProcessor { internal_error("runtime install environment is not configured") })? }; - environment - .install_runtime(params) - .await - .map(|response| Some(response.into())) + let response = environment.install_runtime(params).await?; + let response = + crate::runtime_install::finalize_runtime_install(&environment, response) + .await?; + self.thread_manager.plugins_manager().clear_cache(); + self.thread_manager.skills_manager().clear_cache(); + Ok(Some(response.into())) } ClientRequest::ThreadStart { params, .. } => { self.thread_processor diff --git a/codex-rs/app-server/src/runtime_install.rs b/codex-rs/app-server/src/runtime_install.rs new file mode 100644 index 0000000000..e89db560ae --- /dev/null +++ b/codex-rs/app-server/src/runtime_install.rs @@ -0,0 +1,655 @@ +use std::ffi::OsStr; +use std::io; +use std::path::Component; +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; + +use codex_app_server_protocol::JSONRPCErrorError; +use codex_app_server_protocol::RuntimeInstallPaths; +use codex_app_server_protocol::RuntimeInstallResponse; +use codex_exec_server::CopyOptions; +use codex_exec_server::CreateDirectoryOptions; +use codex_exec_server::Environment; +use codex_exec_server::ExecutorFileSystem; +use codex_exec_server::RemoveOptions; +use codex_utils_absolute_path::AbsolutePathBuf; +use uuid::Uuid; + +use crate::error_code::internal_error; +use crate::error_code::invalid_params; + +const PUBLISHED_ARTIFACT_NAME: &str = "codex-primary-runtime"; + +pub(crate) async fn finalize_runtime_install( + environment: &Environment, + mut response: RuntimeInstallResponse, +) -> Result { + if response.paths.bundled_plugin_marketplace_paths.is_empty() + && response.paths.bundled_skill_paths.is_empty() + && response.paths.skills_to_remove.is_empty() + { + return Ok(response); + } + + let codex_home = environment.codex_home().await?; + response.paths = + finalize_runtime_paths(environment.get_filesystem(), &codex_home, response.paths).await?; + Ok(response) +} + +async fn finalize_runtime_paths( + fs: Arc, + codex_home: &AbsolutePathBuf, + mut paths: RuntimeInstallPaths, +) -> Result { + paths.bundled_plugin_marketplace_paths = materialize_bundled_plugin_marketplaces( + Arc::clone(&fs), + codex_home, + &paths.bundled_plugin_marketplace_paths, + ) + .await?; + paths.bundled_skill_paths = sync_primary_runtime_skills( + fs, + codex_home, + &paths.bundled_skill_paths, + &paths.skills_to_remove, + ) + .await?; + Ok(paths) +} + +async fn materialize_bundled_plugin_marketplaces( + fs: Arc, + codex_home: &AbsolutePathBuf, + marketplace_roots: &[AbsolutePathBuf], +) -> Result, JSONRPCErrorError> { + if marketplace_roots.is_empty() { + return Ok(Vec::new()); + } + + let destination_root = absolute_path( + codex_home + .as_path() + .join("plugins") + .join(PUBLISHED_ARTIFACT_NAME) + .join("marketplaces"), + )?; + let mut materialized = Vec::with_capacity(marketplace_roots.len()); + for marketplace_root in marketplace_roots { + let marketplace_name = marketplace_root.as_path().file_name().ok_or_else(|| { + invalid_params("bundled plugin marketplace path has no directory name") + })?; + let destination = absolute_path( + destination_root + .as_path() + .join(safe_path_segment(marketplace_name)), + )?; + replace_directory(Arc::clone(&fs), marketplace_root, &destination).await?; + materialized.push(destination); + } + Ok(materialized) +} + +async fn sync_primary_runtime_skills( + fs: Arc, + codex_home: &AbsolutePathBuf, + bundled_skill_paths: &[AbsolutePathBuf], + skills_to_remove: &[String], +) -> Result, JSONRPCErrorError> { + if bundled_skill_paths.is_empty() && skills_to_remove.is_empty() { + return Ok(Vec::new()); + } + + if bundled_skill_paths.is_empty() { + move_legacy_primary_runtime_skills(fs, codex_home, skills_to_remove).await?; + return Ok(Vec::new()); + } + + let destination_root = absolute_path( + codex_home + .as_path() + .join("skills") + .join(PUBLISHED_ARTIFACT_NAME), + )?; + let staging_root = temporary_sibling_path(&destination_root, "staging")?; + let result = async { + create_directory(Arc::clone(&fs), &staging_root).await?; + + let mut materialized = Vec::with_capacity(bundled_skill_paths.len()); + for bundled_skill_path in bundled_skill_paths { + let skill_root = absolute_path( + bundled_skill_path + .as_path() + .parent() + .ok_or_else(|| { + invalid_params(format!( + "bundled skill path {} has no parent directory", + bundled_skill_path.display() + )) + })? + .to_path_buf(), + )?; + let skill_name = skill_root.as_path().file_name().ok_or_else(|| { + invalid_params(format!( + "bundled skill path {} has no skill directory name", + bundled_skill_path.display() + )) + })?; + let staged_skill_root = absolute_path(staging_root.as_path().join(skill_name))?; + copy_directory(Arc::clone(&fs), &skill_root, &staged_skill_root).await?; + materialized.push(absolute_path( + destination_root.as_path().join(skill_name).join("SKILL.md"), + )?); + } + + publish_staged_directory(Arc::clone(&fs), &staging_root, &destination_root).await?; + move_legacy_primary_runtime_skills(Arc::clone(&fs), codex_home, skills_to_remove).await?; + Ok(materialized) + } + .await; + cleanup_directory(&fs, &staging_root, "staged primary runtime skills").await; + result +} + +async fn move_legacy_primary_runtime_skills( + fs: Arc, + codex_home: &AbsolutePathBuf, + skills_to_remove: &[String], +) -> Result<(), JSONRPCErrorError> { + if skills_to_remove.is_empty() { + return Ok(()); + } + + let skills_root = absolute_path(codex_home.as_path().join("skills"))?; + for skill_dir in skills_to_remove { + let skill_root = resolve_legacy_skill_directory(&skills_root, skill_dir)?; + let metadata = match fs.get_metadata(&skill_root, /*sandbox*/ None).await { + Ok(metadata) => metadata, + Err(err) if err.kind() == io::ErrorKind::NotFound => continue, + Err(err) => { + return Err(internal_error(format!( + "failed to inspect legacy skill directory {}: {err}", + skill_root.display() + ))); + } + }; + if !metadata.is_directory { + continue; + } + + let backup_path = absolute_path( + codex_home + .as_path() + .join(".tmp") + .join("legacy-primary-runtime-skills") + .join(format!( + "{}-{}", + skill_root + .as_path() + .file_name() + .and_then(OsStr::to_str) + .unwrap_or("skill"), + Uuid::new_v4() + )), + )?; + if let Some(parent) = backup_path.as_path().parent() { + create_directory(Arc::clone(&fs), &absolute_path(parent.to_path_buf())?).await?; + } + copy_directory(Arc::clone(&fs), &skill_root, &backup_path).await?; + remove_if_exists( + Arc::clone(&fs), + &skill_root, + RemoveOptions { + recursive: true, + force: true, + }, + ) + .await?; + tracing::info!( + skill_dir = %skill_dir, + skill_root = %skill_root.display(), + backup_path = %backup_path.display(), + "moved legacy primary runtime skill" + ); + } + Ok(()) +} + +fn resolve_legacy_skill_directory( + skills_root: &AbsolutePathBuf, + skill_dir: &str, +) -> Result { + let relative = Path::new(skill_dir); + if !relative + .components() + .all(|component| matches!(component, Component::Normal(_))) + { + return Err(invalid_params(format!( + "legacy primary runtime skill path must stay within the skills directory: {skill_dir}" + ))); + } + absolute_path(skills_root.as_path().join(relative)) +} + +async fn replace_directory( + fs: Arc, + source: &AbsolutePathBuf, + destination: &AbsolutePathBuf, +) -> Result<(), JSONRPCErrorError> { + if let Some(parent) = destination.as_path().parent() { + create_directory(Arc::clone(&fs), &absolute_path(parent.to_path_buf())?).await?; + } + let staging_path = temporary_sibling_path(destination, "staging")?; + let result = async { + copy_directory(Arc::clone(&fs), source, &staging_path).await?; + publish_staged_directory(Arc::clone(&fs), &staging_path, destination).await + } + .await; + cleanup_directory(&fs, &staging_path, "staged runtime directory").await; + result +} + +async fn publish_staged_directory( + fs: Arc, + staging_path: &AbsolutePathBuf, + destination: &AbsolutePathBuf, +) -> Result<(), JSONRPCErrorError> { + let backup_path = temporary_sibling_path(destination, "previous")?; + let result = async { + let destination_exists = path_exists(Arc::clone(&fs), destination).await?; + if destination_exists { + copy_directory(Arc::clone(&fs), destination, &backup_path).await?; + } + + remove_if_exists( + Arc::clone(&fs), + destination, + RemoveOptions { + recursive: true, + force: true, + }, + ) + .await?; + + if let Err(error) = copy_directory(Arc::clone(&fs), staging_path, destination).await { + remove_if_exists( + Arc::clone(&fs), + destination, + RemoveOptions { + recursive: true, + force: true, + }, + ) + .await?; + if destination_exists + && let Err(restore_error) = + copy_directory(Arc::clone(&fs), &backup_path, destination).await + { + return Err(internal_error(format!( + "failed to restore published runtime directory {} after replacement failed: {}; restore failed: {}", + destination.display(), + error.message, + restore_error.message + ))); + } + return Err(error); + } + + Ok(()) + } + .await; + if result.is_ok() { + cleanup_directory(&fs, &backup_path, "previous runtime directory").await; + } + result +} + +async fn copy_directory( + fs: Arc, + source: &AbsolutePathBuf, + destination: &AbsolutePathBuf, +) -> Result<(), JSONRPCErrorError> { + fs.copy( + source, + destination, + CopyOptions { recursive: true }, + /*sandbox*/ None, + ) + .await + .map_err(|err| { + internal_error(format!( + "failed to copy directory {} to {}: {err}", + source.display(), + destination.display() + )) + }) +} + +async fn create_directory( + fs: Arc, + path: &AbsolutePathBuf, +) -> Result<(), JSONRPCErrorError> { + fs.create_directory( + path, + CreateDirectoryOptions { recursive: true }, + /*sandbox*/ None, + ) + .await + .map_err(|err| { + internal_error(format!( + "failed to create directory {}: {err}", + path.display() + )) + }) +} + +async fn remove_if_exists( + fs: Arc, + path: &AbsolutePathBuf, + options: RemoveOptions, +) -> Result<(), JSONRPCErrorError> { + match fs.remove(path, options, /*sandbox*/ None).await { + Ok(()) => Ok(()), + Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(()), + Err(err) => Err(internal_error(format!( + "failed to remove directory {}: {err}", + path.display() + ))), + } +} + +async fn path_exists( + fs: Arc, + path: &AbsolutePathBuf, +) -> Result { + match fs.get_metadata(path, /*sandbox*/ None).await { + Ok(_) => Ok(true), + Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(false), + Err(err) => Err(internal_error(format!( + "failed to inspect runtime path {}: {err}", + path.display() + ))), + } +} + +async fn cleanup_directory(fs: &Arc, path: &AbsolutePathBuf, label: &str) { + if let Err(error) = remove_if_exists( + Arc::clone(fs), + path, + RemoveOptions { + recursive: true, + force: true, + }, + ) + .await + { + tracing::warn!( + path = %path.display(), + error = %error.message, + "failed to clean up {label}" + ); + } +} + +fn temporary_sibling_path( + destination: &AbsolutePathBuf, + label: &str, +) -> Result { + let parent = destination.as_path().parent().ok_or_else(|| { + internal_error(format!( + "runtime destination {} has no parent directory", + destination.display() + )) + })?; + let destination_name = destination + .as_path() + .file_name() + .map(safe_path_segment) + .unwrap_or_else(|| "runtime-item".to_string()); + absolute_path(parent.join(format!(".{destination_name}-{label}-{}", Uuid::new_v4()))) +} + +fn safe_path_segment(segment: &OsStr) -> String { + let safe = segment + .to_string_lossy() + .chars() + .map(|ch| { + if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') { + ch + } else { + '-' + } + }) + .collect::(); + let safe = safe.trim_matches('.').to_string(); + if safe.is_empty() || safe == ".." { + "runtime-item".to_string() + } else { + safe + } +} + +fn absolute_path(path: PathBuf) -> Result { + AbsolutePathBuf::from_absolute_path_checked(path) + .map_err(|err| internal_error(format!("runtime path is not absolute: {err}"))) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use codex_app_server_protocol::RuntimeInstallPaths; + use codex_exec_server::LocalFileSystem; + use pretty_assertions::assert_eq; + use tokio::fs; + + use super::*; + + #[tokio::test] + async fn finalize_runtime_paths_materializes_marketplaces_and_skills() { + let codex_home = tempfile::tempdir().expect("codex home"); + let runtime = tempfile::tempdir().expect("runtime"); + let marketplace_root = runtime.path().join("market"); + fs::create_dir_all(marketplace_root.join(".agents/plugins")) + .await + .expect("create marketplace manifest dir"); + fs::write( + marketplace_root.join(".agents/plugins/marketplace.json"), + r#"{"name":"debug","plugins":[]}"#, + ) + .await + .expect("write marketplace"); + + let bundled_skill_root = runtime.path().join("skills").join("debug"); + fs::create_dir_all(&bundled_skill_root) + .await + .expect("create bundled skill"); + fs::write(bundled_skill_root.join("SKILL.md"), "debug") + .await + .expect("write bundled skill"); + + let legacy_skill_root = codex_home.path().join("skills").join("legacy"); + fs::create_dir_all(&legacy_skill_root) + .await + .expect("create legacy skill"); + fs::write(legacy_skill_root.join("SKILL.md"), "legacy") + .await + .expect("write legacy skill"); + + let paths = RuntimeInstallPaths { + bundled_plugin_marketplace_paths: vec![ + absolute_path(marketplace_root).expect("absolute marketplace path"), + ], + bundled_skill_paths: vec![ + absolute_path(bundled_skill_root.join("SKILL.md")).expect("absolute skill path"), + ], + node_modules_path: absolute_path(runtime.path().join("node_modules")) + .expect("absolute node modules path"), + node_path: absolute_path(runtime.path().join("node")).expect("absolute node path"), + python_path: absolute_path(runtime.path().join("python")) + .expect("absolute python path"), + skills_to_remove: vec!["legacy".to_string()], + }; + + let finalized = finalize_runtime_paths( + Arc::new(LocalFileSystem::unsandboxed()), + &absolute_path(codex_home.path().to_path_buf()).expect("absolute codex home"), + paths, + ) + .await + .expect("finalize runtime paths"); + + let expected_marketplace_root = codex_home + .path() + .join("plugins") + .join(PUBLISHED_ARTIFACT_NAME) + .join("marketplaces") + .join("market"); + let expected_skill_path = codex_home + .path() + .join("skills") + .join(PUBLISHED_ARTIFACT_NAME) + .join("debug") + .join("SKILL.md"); + assert_eq!( + finalized.bundled_plugin_marketplace_paths, + vec![absolute_path(expected_marketplace_root.clone()).expect("absolute path")] + ); + assert_eq!( + finalized.bundled_skill_paths, + vec![absolute_path(expected_skill_path.clone()).expect("absolute path")] + ); + assert!( + expected_marketplace_root + .join(".agents/plugins/marketplace.json") + .is_file() + ); + assert_eq!( + fs::read_to_string(expected_skill_path) + .await + .expect("read materialized skill"), + "debug" + ); + assert!(!legacy_skill_root.exists()); + assert_eq!( + std::fs::read_dir( + codex_home + .path() + .join(".tmp") + .join("legacy-primary-runtime-skills") + ) + .expect("read legacy backups") + .count(), + 1 + ); + } + + #[tokio::test] + async fn move_legacy_primary_runtime_skills_rejects_parent_path_without_removing_skill() { + let codex_home = tempfile::tempdir().expect("codex home"); + let existing_skill_path = codex_home + .path() + .join("skills") + .join("existing") + .join("SKILL.md"); + fs::create_dir_all(existing_skill_path.parent().expect("skill parent")) + .await + .expect("create existing skill"); + fs::write(&existing_skill_path, "existing") + .await + .expect("write existing skill"); + + let error = move_legacy_primary_runtime_skills( + Arc::new(LocalFileSystem::unsandboxed()), + &absolute_path(codex_home.path().to_path_buf()).expect("absolute codex home"), + &["../existing".to_string()], + ) + .await + .expect_err("parent path should fail"); + + assert!( + error.message.contains( + "legacy primary runtime skill path must stay within the skills directory" + ) + ); + assert_eq!( + fs::read_to_string(existing_skill_path) + .await + .expect("read existing skill"), + "existing" + ); + } + + #[tokio::test] + async fn materialize_bundled_plugin_marketplaces_preserves_existing_copy_on_copy_failure() { + let codex_home = tempfile::tempdir().expect("codex home"); + let runtime = tempfile::tempdir().expect("runtime"); + let missing_marketplace_root = runtime.path().join("market"); + let published_manifest = codex_home + .path() + .join("plugins") + .join(PUBLISHED_ARTIFACT_NAME) + .join("marketplaces") + .join("market") + .join(".agents/plugins/marketplace.json"); + fs::create_dir_all(published_manifest.parent().expect("manifest parent")) + .await + .expect("create published marketplace"); + fs::write(&published_manifest, "previous") + .await + .expect("write published marketplace"); + + let error = materialize_bundled_plugin_marketplaces( + Arc::new(LocalFileSystem::unsandboxed()), + &absolute_path(codex_home.path().to_path_buf()).expect("absolute codex home"), + &[absolute_path(missing_marketplace_root).expect("absolute marketplace path")], + ) + .await + .expect_err("missing marketplace should fail"); + + assert!(error.message.contains("failed to copy directory")); + assert_eq!( + fs::read_to_string(published_manifest) + .await + .expect("read published marketplace"), + "previous" + ); + } + + #[tokio::test] + async fn sync_primary_runtime_skills_preserves_existing_copy_on_copy_failure() { + let codex_home = tempfile::tempdir().expect("codex home"); + let runtime = tempfile::tempdir().expect("runtime"); + let missing_skill_path = runtime.path().join("skills").join("debug").join("SKILL.md"); + let published_skill_path = codex_home + .path() + .join("skills") + .join(PUBLISHED_ARTIFACT_NAME) + .join("existing") + .join("SKILL.md"); + fs::create_dir_all(published_skill_path.parent().expect("skill parent")) + .await + .expect("create published skill"); + fs::write(&published_skill_path, "previous") + .await + .expect("write published skill"); + + let error = sync_primary_runtime_skills( + Arc::new(LocalFileSystem::unsandboxed()), + &absolute_path(codex_home.path().to_path_buf()).expect("absolute codex home"), + &[absolute_path(missing_skill_path).expect("absolute skill path")], + &[], + ) + .await + .expect_err("missing skill should fail"); + + assert!(error.message.contains("failed to copy directory")); + assert_eq!( + fs::read_to_string(published_skill_path) + .await + .expect("read published skill"), + "previous" + ); + } +} diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index 8566dbbc6d..cbe6a0fc79 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -10,6 +10,7 @@ use arc_swap::ArcSwap; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::RuntimeInstallParams; use codex_app_server_protocol::RuntimeInstallResponse; +use codex_utils_absolute_path::AbsolutePathBuf; use futures::FutureExt; use futures::future::BoxFuture; use serde_json::Value; @@ -178,6 +179,7 @@ struct Inner { http_body_streams_write_lock: Mutex<()>, http_body_stream_next_id: AtomicU64, session_id: std::sync::RwLock>, + codex_home: std::sync::RwLock>, reader_task: tokio::task::JoinHandle<()>, } @@ -344,6 +346,14 @@ impl ExecServerClient { .unwrap_or_else(std::sync::PoisonError::into_inner); *session_id = Some(response.session_id.clone()); } + { + let mut codex_home = self + .inner + .codex_home + .write() + .unwrap_or_else(std::sync::PoisonError::into_inner); + *codex_home = Some(response.codex_home.clone()); + } self.notify_initialized().await?; Ok(response) }) @@ -473,6 +483,14 @@ impl ExecServerClient { self.inner.disconnected.get().is_some() || self.inner.client.is_disconnected() } + pub fn codex_home(&self) -> Option { + self.inner + .codex_home + .read() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .clone() + } + pub(crate) async fn connect( connection: JsonRpcConnection, options: ExecServerClientConnectOptions, @@ -520,6 +538,7 @@ impl ExecServerClient { http_body_streams_write_lock: Mutex::new(()), http_body_stream_next_id: AtomicU64::new(1), session_id: std::sync::RwLock::new(None), + codex_home: std::sync::RwLock::new(None), reader_task, } }); @@ -922,6 +941,7 @@ mod tests { use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCResponse; + use codex_utils_absolute_path::AbsolutePathBuf; use futures::SinkExt; use futures::StreamExt; use pretty_assertions::assert_eq; @@ -1057,6 +1077,10 @@ mod tests { id: request.id, result: serde_json::to_value(InitializeResponse { session_id: session_id.to_string(), + codex_home: AbsolutePathBuf::try_from( + std::env::current_dir().expect("current dir"), + ) + .expect("absolute current dir"), }) .expect("initialize response should serialize"), }), @@ -1092,7 +1116,7 @@ mod tests { program: "sh".to_string(), args: vec![ "-c".to_string(), - "read _line; printf '%s\\n' '{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\"}}'; read _line; sleep 60".to_string(), + "read _line; printf '%s\\n' '{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\",\"codexHome\":\"/tmp\"}}'; read _line; sleep 60".to_string(), ], env: HashMap::new(), cwd: None, @@ -1116,7 +1140,7 @@ mod tests { program: "sh".to_string(), args: vec![ "-c".to_string(), - "read _line; printf '%s\\n' '{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\"}}'; read _line; sleep 60".to_string(), + "read _line; printf '%s\\n' '{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\",\"codexHome\":\"/tmp\"}}'; read _line; sleep 60".to_string(), ], env: HashMap::new(), cwd: None, @@ -1139,7 +1163,7 @@ mod tests { args: vec![ "-NoProfile".to_string(), "-Command".to_string(), - "$null = [Console]::In.ReadLine(); [Console]::Out.WriteLine('{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\"}}'); $null = [Console]::In.ReadLine(); Start-Sleep -Seconds 60".to_string(), + "$null = [Console]::In.ReadLine(); [Console]::Out.WriteLine('{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\",\"codexHome\":\"C:\\\\Users\\\\codex\\\\.codex\"}}'); $null = [Console]::In.ReadLine(); Start-Sleep -Seconds 60".to_string(), ], env: HashMap::new(), cwd: None, @@ -1164,7 +1188,7 @@ mod tests { "read _line; \ echo \"$$\" > {}; \ sleep 60 >/dev/null 2>&1 & echo \"$!\" > {}; \ - printf '%s\\n' '{{\"id\":1,\"result\":{{\"sessionId\":\"stdio-test\"}}}}'; \ + printf '%s\\n' '{{\"id\":1,\"result\":{{\"sessionId\":\"stdio-test\",\"codexHome\":\"/tmp\"}}}}'; \ read _line; \ wait", shell_quote(pid_file.as_path()), @@ -1290,6 +1314,10 @@ mod tests { id: request.id, result: serde_json::to_value(InitializeResponse { session_id: "session-1".to_string(), + codex_home: AbsolutePathBuf::try_from( + std::env::current_dir().expect("current dir"), + ) + .expect("absolute current dir"), }) .expect("initialize response should serialize"), }), @@ -1433,6 +1461,10 @@ mod tests { id: request.id, result: serde_json::to_value(InitializeResponse { session_id: "session-1".to_string(), + codex_home: AbsolutePathBuf::try_from( + std::env::current_dir().expect("current dir"), + ) + .expect("absolute current dir"), }) .expect("initialize response should serialize"), }), @@ -1570,6 +1602,10 @@ mod tests { id: request.id, result: serde_json::to_value(InitializeResponse { session_id: "session-1".to_string(), + codex_home: AbsolutePathBuf::try_from( + std::env::current_dir().expect("current dir"), + ) + .expect("absolute current dir"), }) .expect("initialize response should serialize"), }), diff --git a/codex-rs/exec-server/src/codex_home.rs b/codex-rs/exec-server/src/codex_home.rs new file mode 100644 index 0000000000..c64f909757 --- /dev/null +++ b/codex-rs/exec-server/src/codex_home.rs @@ -0,0 +1,26 @@ +use std::path::PathBuf; + +use codex_app_server_protocol::JSONRPCErrorError; +use codex_utils_absolute_path::AbsolutePathBuf; + +use crate::rpc::internal_error; + +pub(crate) fn default_codex_home() -> Result { + default_codex_home_path() + .and_then(|path| { + AbsolutePathBuf::from_absolute_path_checked(path) + .map_err(|err| format!("runtime codex home is not absolute: {err}")) + }) + .map_err(internal_error) +} + +pub(crate) fn default_codex_home_path() -> Result { + if let Some(codex_home) = std::env::var_os("CODEX_HOME") { + return Ok(PathBuf::from(codex_home)); + } + let home = std::env::var_os("HOME") + .or_else(|| std::env::var_os("USERPROFILE")) + .map(PathBuf::from) + .ok_or_else(|| "failed to locate home directory".to_string())?; + Ok(home.join(".codex")) +} diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index af886b5e20..5566acb1a2 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -5,6 +5,7 @@ use std::sync::RwLock; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::RuntimeInstallParams; use codex_app_server_protocol::RuntimeInstallResponse; +use codex_utils_absolute_path::AbsolutePathBuf; use crate::ExecServerError; use crate::ExecServerRuntimePaths; @@ -13,6 +14,7 @@ use crate::HttpClient; use crate::client::LazyRemoteExecServerClient; use crate::client::http_client::ReqwestHttpClient; use crate::client_api::ExecServerTransportParams; +use crate::codex_home::default_codex_home_path; use crate::environment_provider::DefaultEnvironmentProvider; use crate::environment_provider::EnvironmentDefault; use crate::environment_provider::EnvironmentProvider; @@ -96,7 +98,13 @@ impl EnvironmentManager { local_runtime_paths: Option, ) -> Result { let provider = environment_provider_from_codex_home(codex_home.as_ref())?; - Self::from_snapshot(provider.snapshot().await?, local_runtime_paths) + let local_codex_home = AbsolutePathBuf::from_absolute_path_checked(codex_home.as_ref()) + .map_err(|err| ExecServerError::Protocol(err.to_string()))?; + Self::from_snapshot_with_codex_home( + provider.snapshot().await?, + local_runtime_paths, + Some(local_codex_home), + ) } /// Builds a manager from the legacy environment-variable provider without @@ -136,6 +144,18 @@ impl EnvironmentManager { fn from_snapshot( snapshot: EnvironmentProviderSnapshot, local_runtime_paths: Option, + ) -> Result { + Self::from_snapshot_with_codex_home( + snapshot, + local_runtime_paths, + default_local_codex_home(), + ) + } + + fn from_snapshot_with_codex_home( + snapshot: EnvironmentProviderSnapshot, + local_runtime_paths: Option, + local_codex_home: Option, ) -> Result { let EnvironmentProviderSnapshot { environments, @@ -150,7 +170,10 @@ impl EnvironmentManager { "local environment requires configured runtime paths".to_string(), ) })?; - let local_environment = Arc::new(Environment::local(local_runtime_paths)); + let local_environment = Arc::new(Environment::local_with_codex_home( + local_runtime_paths, + local_codex_home, + )); environment_map.insert( LOCAL_ENVIRONMENT_ID.to_string(), Arc::clone(&local_environment), @@ -296,6 +319,7 @@ pub struct Environment { http_client: Arc, runtime_installer: RuntimeInstaller, local_runtime_paths: Option, + codex_home: Option, } #[derive(Clone)] @@ -344,6 +368,7 @@ impl Environment { http_client: Arc::new(ReqwestHttpClient), runtime_installer: RuntimeInstaller::Local, local_runtime_paths: None, + codex_home: default_local_codex_home(), } } } @@ -393,6 +418,13 @@ impl Environment { } pub(crate) fn local(local_runtime_paths: ExecServerRuntimePaths) -> Self { + Self::local_with_codex_home(local_runtime_paths, default_local_codex_home()) + } + + fn local_with_codex_home( + local_runtime_paths: ExecServerRuntimePaths, + codex_home: Option, + ) -> Self { Self { exec_server_url: None, remote_transport: None, @@ -403,6 +435,7 @@ impl Environment { http_client: Arc::new(ReqwestHttpClient), runtime_installer: RuntimeInstaller::Local, local_runtime_paths: Some(local_runtime_paths), + codex_home, } } @@ -441,6 +474,7 @@ impl Environment { http_client: Arc::new(http_client), runtime_installer: RuntimeInstaller::Remote(client), local_runtime_paths, + codex_home: None, } } @@ -475,6 +509,29 @@ impl Environment { ) -> Result { self.runtime_installer.install_runtime(params).await } + + pub async fn codex_home(&self) -> Result { + if let Some(codex_home) = self.codex_home.clone() { + return Ok(codex_home); + } + match &self.runtime_installer { + RuntimeInstaller::Local => default_local_codex_home().ok_or_else(|| { + internal_error("failed to locate local codex home for runtime install") + }), + RuntimeInstaller::Remote(client) => { + let client = client.get().await.map_err(exec_server_error_to_jsonrpc)?; + client + .codex_home() + .ok_or_else(|| internal_error("remote exec-server did not report a codex home")) + } + } + } +} + +fn default_local_codex_home() -> Option { + default_codex_home_path() + .ok() + .and_then(|path| AbsolutePathBuf::from_absolute_path_checked(path).ok()) } #[cfg(test)] diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 1e9dcd1600..9b65d85f73 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -1,6 +1,7 @@ mod client; mod client_api; mod client_transport; +mod codex_home; mod connection; mod environment; mod environment_provider; diff --git a/codex-rs/exec-server/src/protocol.rs b/codex-rs/exec-server/src/protocol.rs index 629c1da767..5ecd6a34d6 100644 --- a/codex-rs/exec-server/src/protocol.rs +++ b/codex-rs/exec-server/src/protocol.rs @@ -60,6 +60,7 @@ pub struct InitializeParams { #[serde(rename_all = "camelCase")] pub struct InitializeResponse { pub session_id: String, + pub codex_home: AbsolutePathBuf, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] diff --git a/codex-rs/exec-server/src/runtime_install.rs b/codex-rs/exec-server/src/runtime_install.rs index aaf100e01e..565d566029 100644 --- a/codex-rs/exec-server/src/runtime_install.rs +++ b/codex-rs/exec-server/src/runtime_install.rs @@ -1,7 +1,5 @@ -use std::ffi::OsStr; use std::future::Future; use std::io::ErrorKind; -use std::path::Component; use std::path::Path; use std::path::PathBuf; use std::process::Stdio; @@ -102,7 +100,6 @@ async fn install_runtime_from_archive( ) .await { - let paths = configure_runtime_paths(paths).await?; return Ok(RuntimeInstallResponse { bundle_version: Some(bundle_version.clone()), paths, @@ -179,8 +176,6 @@ async fn install_runtime_from_archive( return Err(error); } }; - let paths = configure_runtime_paths(paths).await?; - remove_dir_if_exists(&previous_runtime_root).await?; Ok(RuntimeInstallResponse { bundle_version: manifest.bundle_version.clone(), @@ -661,276 +656,6 @@ fn runtime_contained_paths( .collect() } -async fn configure_runtime_paths( - mut paths: RuntimeInstallPaths, -) -> Result { - if paths.bundled_plugin_marketplace_paths.is_empty() - && paths.bundled_skill_paths.is_empty() - && paths.skills_to_remove.is_empty() - { - return Ok(paths); - } - - let codex_home = default_codex_home()?; - let marketplace_roots = materialize_bundled_plugin_marketplaces( - &codex_home, - &paths.bundled_plugin_marketplace_paths, - ) - .await?; - let skill_paths = sync_primary_runtime_skills( - &codex_home, - &paths.bundled_skill_paths, - &paths.skills_to_remove, - ) - .await?; - - paths.bundled_plugin_marketplace_paths = marketplace_roots; - paths.bundled_skill_paths = skill_paths; - Ok(paths) -} - -fn default_codex_home() -> Result { - if let Some(codex_home) = std::env::var_os("CODEX_HOME") { - return Ok(PathBuf::from(codex_home)); - } - let home = std::env::var_os("HOME") - .or_else(|| std::env::var_os("USERPROFILE")) - .map(PathBuf::from) - .ok_or_else(|| internal_error("failed to locate home directory for runtime install"))?; - Ok(home.join(".codex")) -} - -async fn materialize_bundled_plugin_marketplaces( - codex_home: &Path, - marketplace_roots: &[AbsolutePathBuf], -) -> Result, JSONRPCErrorError> { - if marketplace_roots.is_empty() { - return Ok(Vec::new()); - } - let destination_root = codex_home - .join("plugins") - .join(PUBLISHED_ARTIFACT_NAME) - .join("marketplaces"); - let mut materialized = Vec::with_capacity(marketplace_roots.len()); - for marketplace_root in marketplace_roots { - let marketplace_name = marketplace_root.as_path().file_name().ok_or_else(|| { - invalid_params("bundled plugin marketplace path has no directory name") - })?; - let destination = destination_root.join(safe_path_segment(marketplace_name)); - copy_dir(marketplace_root.as_path(), &destination).await?; - materialized.push(absolute_path(destination)?); - } - Ok(materialized) -} - -async fn sync_primary_runtime_skills( - codex_home: &Path, - bundled_skill_paths: &[AbsolutePathBuf], - skills_to_remove: &[String], -) -> Result, JSONRPCErrorError> { - if bundled_skill_paths.is_empty() && skills_to_remove.is_empty() { - return Ok(Vec::new()); - } - - move_legacy_primary_runtime_skills(codex_home, skills_to_remove).await?; - - if bundled_skill_paths.is_empty() { - return Ok(Vec::new()); - } - - let destination_root = codex_home.join("skills").join(PUBLISHED_ARTIFACT_NAME); - remove_dir_if_exists(&destination_root).await?; - fs::create_dir_all(&destination_root).await.map_err(|err| { - internal_error(format!( - "failed to create bundled skills directory {}: {err}", - destination_root.display() - )) - })?; - - let mut materialized = Vec::with_capacity(bundled_skill_paths.len()); - for bundled_skill_path in bundled_skill_paths { - let skill_root = bundled_skill_path.as_path().parent().ok_or_else(|| { - invalid_params(format!( - "bundled skill path {} has no parent directory", - bundled_skill_path.display() - )) - })?; - let skill_name = skill_root.file_name().ok_or_else(|| { - invalid_params(format!( - "bundled skill path {} has no skill directory name", - bundled_skill_path.display() - )) - })?; - let destination = destination_root.join(skill_name); - copy_dir(skill_root, &destination).await?; - materialized.push(absolute_path(destination.join("SKILL.md"))?); - } - - Ok(materialized) -} - -async fn move_legacy_primary_runtime_skills( - codex_home: &Path, - skills_to_remove: &[String], -) -> Result<(), JSONRPCErrorError> { - if skills_to_remove.is_empty() { - return Ok(()); - } - - let skills_root = codex_home.join("skills"); - for skill_dir in skills_to_remove { - let skill_root = resolve_legacy_skill_directory(&skills_root, skill_dir); - let metadata = match fs::metadata(&skill_root).await { - Ok(metadata) => metadata, - Err(err) if err.kind() == ErrorKind::NotFound => continue, - Err(err) => { - return Err(internal_error(format!( - "failed to inspect legacy skill directory {}: {err}", - skill_root.display() - ))); - } - }; - if !metadata.is_dir() { - continue; - } - - let backup_path = codex_home - .join(".tmp") - .join("legacy-primary-runtime-skills") - .join(format!( - "{}-{}", - skill_root - .file_name() - .and_then(OsStr::to_str) - .unwrap_or("skill"), - uuid::Uuid::new_v4() - )); - if let Some(parent) = backup_path.parent() { - fs::create_dir_all(parent).await.map_err(|err| { - internal_error(format!( - "failed to create legacy skill backup directory {}: {err}", - parent.display() - )) - })?; - } - match fs::rename(&skill_root, &backup_path).await { - Ok(()) => { - tracing::info!( - skill_dir = %skill_dir, - skill_root = %skill_root.display(), - backup_path = %backup_path.display(), - "moved legacy primary runtime skill" - ); - } - Err(err) if err.kind() == ErrorKind::NotFound => {} - Err(err) => { - return Err(internal_error(format!( - "failed to move legacy skill directory {} to {}: {err}", - skill_root.display(), - backup_path.display() - ))); - } - } - } - Ok(()) -} - -fn resolve_legacy_skill_directory(skills_root: &Path, skill_dir: &str) -> PathBuf { - let relative = Path::new(skill_dir); - if relative - .components() - .all(|component| matches!(component, Component::Normal(_))) - { - return skills_root.join(relative); - } - skills_root.join( - relative - .file_name() - .unwrap_or_else(|| OsStr::new(skill_dir.trim_matches(['/', '\\']))), - ) -} - -async fn copy_dir(source: &Path, destination: &Path) -> Result<(), JSONRPCErrorError> { - remove_dir_if_exists(destination).await?; - fs::create_dir_all(destination).await.map_err(|err| { - internal_error(format!( - "failed to create destination directory {}: {err}", - destination.display() - )) - })?; - - let mut pending = vec![(source.to_path_buf(), destination.to_path_buf())]; - while let Some((source_dir, destination_dir)) = pending.pop() { - let mut entries = fs::read_dir(&source_dir).await.map_err(|err| { - internal_error(format!( - "failed to read directory {}: {err}", - source_dir.display() - )) - })?; - while let Some(entry) = entries.next_entry().await.map_err(|err| { - internal_error(format!( - "failed to read directory entry in {}: {err}", - source_dir.display() - )) - })? { - let file_type = entry.file_type().await.map_err(|err| { - internal_error(format!( - "failed to inspect directory entry {}: {err}", - entry.path().display() - )) - })?; - let destination_path = destination_dir.join(entry.file_name()); - if file_type.is_dir() { - fs::create_dir_all(&destination_path).await.map_err(|err| { - internal_error(format!( - "failed to create destination directory {}: {err}", - destination_path.display() - )) - })?; - pending.push((entry.path(), destination_path)); - } else if file_type.is_file() { - if let Some(parent) = destination_path.parent() { - fs::create_dir_all(parent).await.map_err(|err| { - internal_error(format!( - "failed to create destination directory {}: {err}", - parent.display() - )) - })?; - } - fs::copy(entry.path(), &destination_path) - .await - .map_err(|err| { - internal_error(format!( - "failed to copy file to {}: {err}", - destination_path.display() - )) - })?; - } - } - } - Ok(()) -} - -fn safe_path_segment(segment: &OsStr) -> String { - let safe = segment - .to_string_lossy() - .chars() - .map(|ch| { - if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') { - ch - } else { - '-' - } - }) - .collect::(); - let safe = safe.trim_matches('.').to_string(); - if safe.is_empty() || safe == ".." { - "runtime-item".to_string() - } else { - safe - } -} - fn absolute_path(path: PathBuf) -> Result { AbsolutePathBuf::from_absolute_path_checked(path) .map_err(|err| internal_error(format!("runtime path is not absolute: {err}"))) @@ -1032,106 +757,6 @@ mod tests { assert!(error.message.contains("would extract outside target")); } - #[tokio::test] - async fn materialize_bundled_plugin_marketplaces_copies_to_codex_home() { - let codex_home = tempfile::tempdir().expect("codex home"); - let runtime = tempfile::tempdir().expect("runtime"); - let marketplace_root = runtime.path().join("market"); - fs::create_dir_all(marketplace_root.join(".agents/plugins")) - .await - .expect("create marketplace manifest dir"); - fs::write( - marketplace_root.join(".agents/plugins/marketplace.json"), - r#"{"name":"debug","plugins":[]}"#, - ) - .await - .expect("write marketplace"); - - let materialized = materialize_bundled_plugin_marketplaces( - codex_home.path(), - &[AbsolutePathBuf::try_from(marketplace_root).expect("absolute path")], - ) - .await - .expect("materialize marketplaces"); - - let expected_root = codex_home - .path() - .join("plugins") - .join(PUBLISHED_ARTIFACT_NAME) - .join("marketplaces") - .join("market"); - assert_eq!( - materialized, - vec![AbsolutePathBuf::try_from(expected_root.clone()).expect("absolute path")] - ); - assert!( - expected_root - .join(".agents/plugins/marketplace.json") - .is_file() - ); - } - - #[tokio::test] - async fn sync_primary_runtime_skills_copies_bundled_and_moves_legacy() { - let codex_home = tempfile::tempdir().expect("codex home"); - let runtime = tempfile::tempdir().expect("runtime"); - let bundled_skill_root = runtime.path().join("skills").join("debug"); - fs::create_dir_all(&bundled_skill_root) - .await - .expect("create bundled skill"); - fs::write(bundled_skill_root.join("SKILL.md"), "debug") - .await - .expect("write bundled skill"); - - let legacy_skill_root = codex_home.path().join("skills").join("legacy"); - fs::create_dir_all(&legacy_skill_root) - .await - .expect("create legacy skill"); - fs::write(legacy_skill_root.join("SKILL.md"), "legacy") - .await - .expect("write legacy skill"); - - let materialized = sync_primary_runtime_skills( - codex_home.path(), - &[ - AbsolutePathBuf::try_from(bundled_skill_root.join("SKILL.md")) - .expect("absolute path"), - ], - &["legacy".to_string()], - ) - .await - .expect("sync skills"); - - let expected_skill_path = codex_home - .path() - .join("skills") - .join(PUBLISHED_ARTIFACT_NAME) - .join("debug") - .join("SKILL.md"); - assert_eq!( - materialized, - vec![AbsolutePathBuf::try_from(expected_skill_path.clone()).expect("absolute path")] - ); - assert_eq!( - fs::read_to_string(expected_skill_path) - .await - .expect("read materialized skill"), - "debug" - ); - assert!(!legacy_skill_root.exists()); - assert_eq!( - std::fs::read_dir( - codex_home - .path() - .join(".tmp") - .join("legacy-primary-runtime-skills") - ) - .expect("read legacy backups") - .count(), - 1 - ); - } - #[tokio::test] async fn install_from_archive_reuses_current_runtime() { let temp_dir = tempfile::tempdir().expect("tempdir"); diff --git a/codex-rs/exec-server/src/server/handler.rs b/codex-rs/exec-server/src/server/handler.rs index 5e68ebd315..5dd6ea537a 100644 --- a/codex-rs/exec-server/src/server/handler.rs +++ b/codex-rs/exec-server/src/server/handler.rs @@ -125,7 +125,10 @@ impl ExecServerHandler { .session .lock() .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(session); - Ok(InitializeResponse { session_id }) + Ok(InitializeResponse { + session_id, + codex_home: crate::codex_home::default_codex_home()?, + }) } pub(crate) fn initialized(&self) -> Result<(), String> { diff --git a/codex-rs/exec-server/tests/http_client.rs b/codex-rs/exec-server/tests/http_client.rs index 6a087dd115..5d473d162a 100644 --- a/codex-rs/exec-server/tests/http_client.rs +++ b/codex-rs/exec-server/tests/http_client.rs @@ -17,6 +17,7 @@ use codex_exec_server::HttpRequestResponse; use codex_exec_server::InitializeParams; use codex_exec_server::InitializeResponse; use codex_exec_server::RemoteExecServerConnectArgs; +use codex_utils_absolute_path::AbsolutePathBuf; use futures::SinkExt; use futures::StreamExt; use pretty_assertions::assert_eq; @@ -1013,6 +1014,7 @@ impl JsonRpcPeer { request.id, InitializeResponse { session_id: "session-1".to_string(), + codex_home: AbsolutePathBuf::try_from(std::env::current_dir()?)?, }, ) .await?;