From 4fabb56c820f936f6e92eba6839bfa577bcc7893 Mon Sep 17 00:00:00 2001 From: Michael Zeng Date: Sun, 17 May 2026 02:35:40 -0700 Subject: [PATCH] Rename remote registry contract to environment --- codex-rs/cli/src/main.rs | 27 +++---- codex-rs/exec-server/src/client.rs | 16 ++--- codex-rs/exec-server/src/lib.rs | 4 +- codex-rs/exec-server/src/process.rs | 2 +- codex-rs/exec-server/src/relay.rs | 2 +- codex-rs/exec-server/src/remote.rs | 105 ++++++++++++++-------------- codex-rs/exec-server/tests/relay.rs | 32 +++++---- 7 files changed, 97 insertions(+), 91 deletions(-) diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs index 91ff4d33e8..64f33248a8 100644 --- a/codex-rs/cli/src/main.rs +++ b/codex-rs/cli/src/main.rs @@ -480,15 +480,15 @@ struct ExecServerCommand { #[arg(long = "listen", value_name = "URL", conflicts_with = "remote")] listen: Option, - /// Register this exec-server as a remote executor using the given base URL. - #[arg(long = "remote", value_name = "URL", requires = "executor_id")] + /// Register this exec-server as a remote environment using the given base URL. + #[arg(long = "remote", value_name = "URL", requires = "environment_id")] remote: Option, - /// Executor id to attach to when registering remotely. - #[arg(long = "executor-id", value_name = "ID")] - executor_id: Option, + /// Environment id to attach to when registering remotely. + #[arg(long = "environment-id", value_name = "ID")] + environment_id: Option, - /// Human-readable executor name. + /// Human-readable environment name. #[arg(long = "name", value_name = "NAME")] name: Option, @@ -1509,21 +1509,24 @@ async fn run_exec_server_command( arg0_paths.codex_linux_sandbox_exe.clone(), )?; if let Some(base_url) = cmd.remote { - let executor_id = cmd - .executor_id - .ok_or_else(|| anyhow::anyhow!("--executor-id is required when --remote is set"))?; + let environment_id = cmd + .environment_id + .ok_or_else(|| anyhow::anyhow!("--environment-id is required when --remote is set"))?; let auth_provider = load_exec_server_remote_auth_provider( root_config_overrides, config_profile, cmd.use_agent_identity_auth, ) .await?; - let mut remote_config = - codex_exec_server::RemoteExecutorConfig::new(base_url, executor_id, auth_provider)?; + let mut remote_config = codex_exec_server::RemoteEnvironmentConfig::new( + base_url, + environment_id, + auth_provider, + )?; if let Some(name) = cmd.name { remote_config.name = name; } - codex_exec_server::run_remote_executor(remote_config, runtime_paths).await?; + codex_exec_server::run_remote_environment(remote_config, runtime_paths).await?; return Ok(()); } let listen_url = cmd diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index 9261a59542..615592bfa5 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -261,18 +261,18 @@ pub enum ExecServerError { Protocol(String), #[error("exec-server rejected request ({code}): {message}")] Server { code: i64, message: String }, - #[error("executor registry request failed ({status}{code_suffix}): {message}", code_suffix = .code.as_ref().map(|code| format!(", {code}")).unwrap_or_default())] - ExecutorRegistryHttp { + #[error("environment registry request failed ({status}{code_suffix}): {message}", code_suffix = .code.as_ref().map(|code| format!(", {code}")).unwrap_or_default())] + EnvironmentRegistryHttp { status: reqwest::StatusCode, code: Option, message: String, }, - #[error("executor registry configuration error: {0}")] - ExecutorRegistryConfig(String), - #[error("executor registry authentication error: {0}")] - ExecutorRegistryAuth(String), - #[error("executor registry request failed: {0}")] - ExecutorRegistryRequest(#[from] reqwest::Error), + #[error("environment registry configuration error: {0}")] + EnvironmentRegistryConfig(String), + #[error("environment registry authentication error: {0}")] + EnvironmentRegistryAuth(String), + #[error("environment registry request failed: {0}")] + EnvironmentRegistryRequest(#[from] reqwest::Error), } impl ExecServerClient { diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index bd556638ea..f2d16f8fac 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -91,8 +91,8 @@ pub use protocol::TerminateResponse; pub use protocol::WriteParams; pub use protocol::WriteResponse; pub use protocol::WriteStatus; -pub use remote::RemoteExecutorConfig; -pub use remote::run_remote_executor; +pub use remote::RemoteEnvironmentConfig; +pub use remote::run_remote_environment; pub use runtime_paths::ExecServerRuntimePaths; pub use server::DEFAULT_LISTEN_URL; pub use server::ExecServerListenUrlParseError; diff --git a/codex-rs/exec-server/src/process.rs b/codex-rs/exec-server/src/process.rs index 052e4aa643..cb6c832138 100644 --- a/codex-rs/exec-server/src/process.rs +++ b/codex-rs/exec-server/src/process.rs @@ -24,7 +24,7 @@ pub struct StartedExecProcess { /// stdout, stderr, or pty bytes. `Exited` reports the process exit status, while /// `Closed` means all output streams have ended and no more output events will /// arrive. `Failed` is used when the process session cannot continue, for -/// example because the remote executor connection disconnected. +/// example because the remote environment connection disconnected. #[derive(Debug, Clone, PartialEq, Eq)] pub enum ExecProcessEvent { Output(ProcessOutputChunk), diff --git a/codex-rs/exec-server/src/relay.rs b/codex-rs/exec-server/src/relay.rs index bce787cfc2..26cda67d22 100644 --- a/codex-rs/exec-server/src/relay.rs +++ b/codex-rs/exec-server/src/relay.rs @@ -333,7 +333,7 @@ pub(crate) async fn run_multiplexed_executor( continue; } Some(Err(err)) => { - debug!("multiplexed executor websocket read failed: {err}"); + debug!("multiplexed environment websocket read failed: {err}"); break; } }; diff --git a/codex-rs/exec-server/src/remote.rs b/codex-rs/exec-server/src/remote.rs index 2493135c9a..3753d95b91 100644 --- a/codex-rs/exec-server/src/remote.rs +++ b/codex-rs/exec-server/src/remote.rs @@ -17,22 +17,22 @@ use crate::server::ConnectionProcessor; const ERROR_BODY_PREVIEW_BYTES: usize = 4096; #[derive(Clone)] -struct ExecutorRegistryClient { +struct EnvironmentRegistryClient { base_url: String, auth_provider: SharedAuthProvider, http: reqwest::Client, } -impl std::fmt::Debug for ExecutorRegistryClient { +impl std::fmt::Debug for EnvironmentRegistryClient { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ExecutorRegistryClient") + f.debug_struct("EnvironmentRegistryClient") .field("base_url", &self.base_url) .field("auth_provider", &"") .finish_non_exhaustive() } } -impl ExecutorRegistryClient { +impl EnvironmentRegistryClient { fn new(base_url: String, auth_provider: SharedAuthProvider) -> Result { let base_url = normalize_base_url(base_url)?; Ok(Self { @@ -42,15 +42,15 @@ impl ExecutorRegistryClient { }) } - async fn register_executor( + async fn register_environment( &self, - executor_id: &str, - ) -> Result { + environment_id: &str, + ) -> Result { let response = self .http .post(endpoint_url( &self.base_url, - &format!("/cloud/executor/{executor_id}/register"), + &format!("/cloud/environment/{environment_id}/register"), )) .headers(self.auth_provider.to_auth_headers()) .send() @@ -72,49 +72,49 @@ impl ExecutorRegistryClient { let status = response.status(); let body = response.text().await.unwrap_or_default(); if matches!(status, StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN) { - return Err(executor_registry_auth_error(status, &body)); + return Err(environment_registry_auth_error(status, &body)); } - Err(executor_registry_http_error(status, &body)) + Err(environment_registry_http_error(status, &body)) } } #[derive(Debug, Clone, Eq, PartialEq, Deserialize)] -struct ExecutorRegistryExecutorRegistrationResponse { - executor_id: String, +struct EnvironmentRegistryEnvironmentRegistrationResponse { + environment_id: String, url: String, } /// Configuration for registering an exec-server for remote use. #[derive(Clone)] -pub struct RemoteExecutorConfig { +pub struct RemoteEnvironmentConfig { pub base_url: String, - pub executor_id: String, + pub environment_id: String, pub name: String, auth_provider: SharedAuthProvider, } -impl std::fmt::Debug for RemoteExecutorConfig { +impl std::fmt::Debug for RemoteEnvironmentConfig { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RemoteExecutorConfig") + f.debug_struct("RemoteEnvironmentConfig") .field("base_url", &self.base_url) - .field("executor_id", &self.executor_id) + .field("environment_id", &self.environment_id) .field("name", &self.name) .field("auth_provider", &"") .finish() } } -impl RemoteExecutorConfig { +impl RemoteEnvironmentConfig { pub fn new( base_url: String, - executor_id: String, + environment_id: String, auth_provider: SharedAuthProvider, ) -> Result { - let executor_id = normalize_executor_id(executor_id)?; + let environment_id = normalize_environment_id(environment_id)?; Ok(Self { base_url, - executor_id, + environment_id, name: "codex-exec-server".to_string(), auth_provider, }) @@ -123,21 +123,21 @@ impl RemoteExecutorConfig { /// Register an exec-server for remote use and serve requests over the returned /// rendezvous websocket. -pub async fn run_remote_executor( - config: RemoteExecutorConfig, +pub async fn run_remote_environment( + config: RemoteEnvironmentConfig, runtime_paths: ExecServerRuntimePaths, ) -> Result<(), ExecServerError> { ensure_rustls_crypto_provider(); let client = - ExecutorRegistryClient::new(config.base_url.clone(), config.auth_provider.clone())?; + EnvironmentRegistryClient::new(config.base_url.clone(), config.auth_provider.clone())?; let processor = ConnectionProcessor::new(runtime_paths); let mut backoff = Duration::from_secs(1); loop { - let response = client.register_executor(&config.executor_id).await?; + let response = client.register_environment(&config.environment_id).await?; eprintln!( - "codex exec-server remote executor registered with executor_id {}", - response.executor_id + "codex exec-server remote environment registered with environment_id {}", + response.environment_id ); match connect_async(response.url.as_str()).await { @@ -155,14 +155,14 @@ pub async fn run_remote_executor( } } -fn normalize_executor_id(executor_id: String) -> Result { - let executor_id = executor_id.trim().to_string(); - if executor_id.is_empty() { - return Err(ExecServerError::ExecutorRegistryConfig( - "executor id is required for remote exec-server registration".to_string(), +fn normalize_environment_id(environment_id: String) -> Result { + let environment_id = environment_id.trim().to_string(); + if environment_id.is_empty() { + return Err(ExecServerError::EnvironmentRegistryConfig( + "environment id is required for remote exec-server registration".to_string(), )); } - Ok(executor_id) + Ok(environment_id) } #[derive(Deserialize)] @@ -179,8 +179,8 @@ struct RegistryError { fn normalize_base_url(base_url: String) -> Result { let trimmed = base_url.trim().trim_end_matches('/').to_string(); if trimmed.is_empty() { - return Err(ExecServerError::ExecutorRegistryConfig( - "executor registry base URL is required".to_string(), + return Err(ExecServerError::EnvironmentRegistryConfig( + "environment registry base URL is required".to_string(), )); } Ok(trimmed) @@ -190,14 +190,14 @@ fn endpoint_url(base_url: &str, path: &str) -> String { format!("{base_url}/{}", path.trim_start_matches('/')) } -fn executor_registry_auth_error(status: StatusCode, body: &str) -> ExecServerError { +fn environment_registry_auth_error(status: StatusCode, body: &str) -> ExecServerError { let message = registry_error_message(body).unwrap_or_else(|| "empty error body".to_string()); - ExecServerError::ExecutorRegistryAuth(format!( - "executor registry authentication failed ({status}): {message}" + ExecServerError::EnvironmentRegistryAuth(format!( + "environment registry authentication failed ({status}): {message}" )) } -fn executor_registry_http_error(status: StatusCode, body: &str) -> ExecServerError { +fn environment_registry_http_error(status: StatusCode, body: &str) -> ExecServerError { let parsed = serde_json::from_str::(body).ok(); let (code, message) = parsed .and_then(|body| body.error) @@ -216,7 +216,7 @@ fn executor_registry_http_error(status: StatusCode, body: &str) -> ExecServerErr .unwrap_or_else(|| "empty or malformed error body".to_string()), ) }); - ExecServerError::ExecutorRegistryHttp { + ExecServerError::EnvironmentRegistryHttp { status, code, message, @@ -277,44 +277,45 @@ mod tests { } #[tokio::test] - async fn register_executor_posts_with_auth_provider_headers() { + async fn register_environment_posts_with_auth_provider_headers() { let server = MockServer::start().await; - let config = RemoteExecutorConfig::new( + let config = RemoteEnvironmentConfig::new( server.uri(), "exec-requested".to_string(), static_registry_auth_provider(), ) .expect("config"); Mock::given(method("POST")) - .and(path("/cloud/executor/exec-requested/register")) + .and(path("/cloud/environment/exec-requested/register")) .and(header("authorization", "Bearer registry-token")) .and(header("chatgpt-account-id", "workspace-123")) .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ - "executor_id": "exec-1", - "url": "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc" + "environment_id": "exec-1", + "url": "wss://rendezvous.test/environment/exec-1?role=environment&sig=abc" }))) .mount(&server) .await; - let client = ExecutorRegistryClient::new(server.uri(), static_registry_auth_provider()) + let client = EnvironmentRegistryClient::new(server.uri(), static_registry_auth_provider()) .expect("client"); let response = client - .register_executor(&config.executor_id) + .register_environment(&config.environment_id) .await - .expect("register executor"); + .expect("register environment"); assert_eq!( response, - ExecutorRegistryExecutorRegistrationResponse { - executor_id: "exec-1".to_string(), - url: "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc".to_string(), + EnvironmentRegistryEnvironmentRegistrationResponse { + environment_id: "exec-1".to_string(), + url: "wss://rendezvous.test/environment/exec-1?role=environment&sig=abc" + .to_string(), } ); } #[test] fn debug_output_redacts_auth_provider() { - let config = RemoteExecutorConfig::new( + let config = RemoteEnvironmentConfig::new( "https://registry.example".to_string(), "exec-1".to_string(), static_registry_auth_provider(), diff --git a/codex-rs/exec-server/tests/relay.rs b/codex-rs/exec-server/tests/relay.rs index 9f985e6982..80ac3697ff 100644 --- a/codex-rs/exec-server/tests/relay.rs +++ b/codex-rs/exec-server/tests/relay.rs @@ -20,7 +20,7 @@ use codex_app_server_protocol::RequestId; use codex_exec_server::ExecServerRuntimePaths; use codex_exec_server::InitializeParams; use codex_exec_server::InitializeResponse; -use codex_exec_server::RemoteExecutorConfig; +use codex_exec_server::RemoteEnvironmentConfig; use futures::SinkExt; use futures::StreamExt; use http::HeaderMap; @@ -45,7 +45,7 @@ use wiremock::matchers::header; use wiremock::matchers::method; use wiremock::matchers::path; -const EXECUTOR_ID: &str = "exec-mux-test"; +const ENVIRONMENT_ID: &str = "exec-mux-test"; const REGISTRY_TOKEN: &str = "registry-token"; const RELAY_MESSAGE_FRAME_VERSION: u32 = 1; const TEST_TIMEOUT: Duration = Duration::from_secs(5); @@ -67,15 +67,17 @@ fn static_registry_auth_provider() -> codex_api::SharedAuthProvider { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn multiplexed_remote_executor_routes_independent_virtual_streams() -> Result<()> { +async fn multiplexed_remote_environment_routes_independent_virtual_streams() -> Result<()> { let listener = TcpListener::bind("127.0.0.1:0").await?; let rendezvous_url = format!("ws://{}", listener.local_addr()?); let registry = MockServer::start().await; Mock::given(method("POST")) - .and(path(format!("/cloud/executor/{EXECUTOR_ID}/register"))) + .and(path(format!( + "/cloud/environment/{ENVIRONMENT_ID}/register" + ))) .and(header("authorization", format!("Bearer {REGISTRY_TOKEN}"))) .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ - "executor_id": EXECUTOR_ID, + "environment_id": ENVIRONMENT_ID, "url": rendezvous_url, }))) .mount(®istry) @@ -83,22 +85,22 @@ async fn multiplexed_remote_executor_routes_independent_virtual_streams() -> Res let (codex_exe, codex_linux_sandbox_exe) = common::current_test_binary_helper_paths()?; let runtime_paths = ExecServerRuntimePaths::new(codex_exe, codex_linux_sandbox_exe)?; - let config = RemoteExecutorConfig::new( + let config = RemoteEnvironmentConfig::new( registry.uri(), - EXECUTOR_ID.to_string(), + ENVIRONMENT_ID.to_string(), static_registry_auth_provider(), )?; - let remote_executor = tokio::spawn(codex_exec_server::run_remote_executor( + let remote_environment = tokio::spawn(codex_exec_server::run_remote_environment( config, runtime_paths, )); let (socket, _peer_addr) = timeout(TEST_TIMEOUT, listener.accept()) .await - .context("remote executor should connect to fake rendezvous")??; + .context("remote environment should connect to fake rendezvous")??; let mut websocket = timeout(TEST_TIMEOUT, accept_async(socket)) .await - .context("fake rendezvous should accept executor websocket")??; + .context("fake rendezvous should accept environment websocket")??; let stream_a = "stream-a"; let stream_b = "stream-b"; @@ -192,8 +194,8 @@ async fn multiplexed_remote_executor_routes_independent_virtual_streams() -> Res )?; websocket.close(None).await?; - remote_executor.abort(); - let _ = remote_executor.await; + remote_environment.abort(); + let _ = remote_environment.await; Ok(()) } @@ -285,7 +287,7 @@ where let frame = timeout(TEST_TIMEOUT, websocket.next()) .await .context("timed out waiting for relay frame")? - .ok_or_else(|| anyhow!("executor websocket closed"))??; + .ok_or_else(|| anyhow!("environment websocket closed"))??; match frame { Message::Binary(bytes) => { let frame = RelayMessageFrame::decode(bytes.as_ref())?; @@ -297,8 +299,8 @@ where return Ok((stream_id, message)); } Message::Ping(_) | Message::Pong(_) => {} - Message::Close(_) => bail!("executor websocket closed"), - Message::Text(_) => bail!("executor sent text frame on relay websocket"), + Message::Close(_) => bail!("environment websocket closed"), + Message::Text(_) => bail!("environment sent text frame on relay websocket"), Message::Frame(_) => {} } }