From d0f9d5eba2004c9a2057bc66baff5fe67ff60ff6 Mon Sep 17 00:00:00 2001 From: Michael Zeng Date: Tue, 5 May 2026 15:01:48 -0700 Subject: [PATCH] Add cloud executor registration to exec-server (#19575) ## Summary This PR adds the first `codex-rs` milestone for remote-exec e2e: a local `codex exec-server` can now register itself with `codex-cloud-environments` and attach to the returned rendezvous websocket. At a high level, `codex exec-server --remote URL --executor-id ID` now: - reads a bearer token from the `CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN` environment variable - registers an executor with `codex-cloud-environments` - receives a signed rendezvous websocket URL - serves the existing exec-server JSON-RPC protocol over that websocket ## What Changed - Added `--remote`, `--executor-id`, and `--name` to `codex exec-server`; `--listen` is now optional and conflicts with `--remote` - Added a new `exec-server/src/remote.rs` module that handles: - registration requests - auth/header setup - mapping `401/403` responses to a dedicated authentication error (no automatic retry) - re-registration and reconnect with exponential backoff (1s, capped at 30s) after websocket disconnects - Reused the existing `ConnectionProcessor` / `ExecServerHandler` path so remote mode serves the same exec/filesystem RPC surface as local websocket mode - Added executor-registry error variants and minimal docs for the new mode ## Testing Manual e2e test that fully goes through exec server flow with our codex cloud agent as orchestrator, plus unit tests in `remote.rs` covering the registration request (wiremock) and bearer-token redaction in `Debug` output --- codex-rs/Cargo.lock | 2 + codex-rs/cli/src/main.rs | 38 ++- codex-rs/exec-server/Cargo.toml | 4 +- codex-rs/exec-server/README.md | 7 + codex-rs/exec-server/src/client.rs | 12 + codex-rs/exec-server/src/lib.rs | 4 + codex-rs/exec-server/src/remote.rs | 392 +++++++++++++++++++++++++++++ codex-rs/exec-server/src/server.rs | 1 + 8 files changed, 452 insertions(+), 8 deletions(-) create mode 100644 codex-rs/exec-server/src/remote.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 7819fa223a..cd3e93c9cb 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2701,6 +2701,7 @@ dependencies = [ "serde", "serde_json", "serial_test", + "sha2", 
"tempfile", "test-case", "thiserror 2.0.18", @@ -2709,6 +2710,7 @@ dependencies = [ "tokio-util", "tracing", "uuid", + "wiremock", ] [[package]] diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs index 5b5ad7a2a3..5a8ffab36a 100644 --- a/codex-rs/cli/src/main.rs +++ b/codex-rs/cli/src/main.rs @@ -447,12 +447,20 @@ struct AppServerCommand { #[derive(Debug, Parser)] struct ExecServerCommand { /// Transport endpoint URL. Supported values: `ws://IP:PORT` (default), `stdio`, `stdio://`. - #[arg( - long = "listen", - value_name = "URL", - default_value = "ws://127.0.0.1:0" - )] - listen: String, + #[arg(long = "listen", value_name = "URL", conflicts_with = "remote")] + listen: Option, + + /// Register this exec-server as a remote executor using the given base URL. + #[arg(long = "remote", value_name = "URL", requires = "executor_id")] + remote: Option, + + /// Executor id to attach to when registering remotely. + #[arg(long = "executor-id", value_name = "ID")] + executor_id: Option, + + /// Human-readable executor name. 
+ #[arg(long = "name", value_name = "NAME")] + name: Option, } #[derive(Debug, clap::Subcommand)] @@ -1264,7 +1272,23 @@ async fn run_exec_server_command( codex_self_exe, arg0_paths.codex_linux_sandbox_exe.clone(), )?; - codex_exec_server::run_main(&cmd.listen, runtime_paths) + if let Some(base_url) = cmd.remote { + let executor_id = cmd + .executor_id + .ok_or_else(|| anyhow::anyhow!("--executor-id is required when --remote is set"))?; + let mut remote_config = + codex_exec_server::RemoteExecutorConfig::new(base_url, executor_id)?; + if let Some(name) = cmd.name { + remote_config.name = name; + } + codex_exec_server::run_remote_executor(remote_config, runtime_paths).await?; + return Ok(()); + } + let listen_url = cmd + .listen + .as_deref() + .unwrap_or(codex_exec_server::DEFAULT_LISTEN_URL); + codex_exec_server::run_main(listen_url, runtime_paths) .await .map_err(anyhow::Error::from_boxed) } diff --git a/codex-rs/exec-server/Cargo.toml b/codex-rs/exec-server/Cargo.toml index 5f31ca4329..1495397c78 100644 --- a/codex-rs/exec-server/Cargo.toml +++ b/codex-rs/exec-server/Cargo.toml @@ -23,9 +23,10 @@ codex-sandboxing = { workspace = true } codex-utils-absolute-path = { workspace = true } codex-utils-pty = { workspace = true } futures = { workspace = true } -reqwest = { workspace = true, features = ["rustls-tls", "stream"] } +reqwest = { workspace = true, features = ["json", "rustls-tls", "stream"] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +sha2 = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = [ "fs", @@ -51,3 +52,4 @@ pretty_assertions = { workspace = true } serial_test = { workspace = true } tempfile = { workspace = true } test-case = "3.3.1" +wiremock = { workspace = true } diff --git a/codex-rs/exec-server/README.md b/codex-rs/exec-server/README.md index 78b92e1a71..81664eaca0 100644 --- a/codex-rs/exec-server/README.md +++ b/codex-rs/exec-server/README.md @@ -22,6 +22,11 
@@ the wire. The CLI entrypoint supports: - `ws://IP:PORT` (default) +- `--remote URL --executor-id ID [--name NAME]` + +Remote mode registers the local exec-server with the executor registry, +then reconnects to the service-provided rendezvous websocket as the executor. +It requires a bearer token in `CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN`. Wire framing: @@ -308,6 +313,8 @@ The crate exports: - `DEFAULT_LISTEN_URL` and `ExecServerListenUrlParseError` - `ExecServerRuntimePaths` - `run_main()` for embedding the websocket server +- `RemoteExecutorConfig` and `run_remote_executor()` for embedding remote + registration mode Callers must pass `ExecServerRuntimePaths` to `run_main()`. The top-level `codex exec-server` command builds these paths from the `codex` arg0 dispatch diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index f26069ac7a..47359393d3 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -254,6 +254,18 @@ pub enum ExecServerError { Protocol(String), #[error("exec-server rejected request ({code}): {message}")] Server { code: i64, message: String }, + #[error("executor registry request failed ({status}{code_suffix}): {message}", code_suffix = .code.as_ref().map(|code| format!(", {code}")).unwrap_or_default())] + ExecutorRegistryHttp { + status: reqwest::StatusCode, + code: Option, + message: String, + }, + #[error("executor registry configuration error: {0}")] + ExecutorRegistryConfig(String), + #[error("executor registry authentication error: {0}")] + ExecutorRegistryAuth(String), + #[error("executor registry request failed: {0}")] + ExecutorRegistryRequest(#[from] reqwest::Error), } impl ExecServerClient { diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 1550653d94..d860d59aba 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -11,6 +11,7 @@ mod local_process; mod process; mod process_id; mod protocol; +mod 
remote; mod remote_file_system; mod remote_process; mod rpc; @@ -87,6 +88,9 @@ pub use protocol::TerminateResponse; pub use protocol::WriteParams; pub use protocol::WriteResponse; pub use protocol::WriteStatus; +pub use remote::CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR; +pub use remote::RemoteExecutorConfig; +pub use remote::run_remote_executor; pub use runtime_paths::ExecServerRuntimePaths; pub use server::DEFAULT_LISTEN_URL; pub use server::ExecServerListenUrlParseError; diff --git a/codex-rs/exec-server/src/remote.rs b/codex-rs/exec-server/src/remote.rs new file mode 100644 index 0000000000..b574ced72f --- /dev/null +++ b/codex-rs/exec-server/src/remote.rs @@ -0,0 +1,392 @@ +use std::collections::BTreeMap; +use std::env; +use std::time::Duration; + +use reqwest::StatusCode; +use serde::Deserialize; +use serde::Serialize; +use serde_json::Value; +use sha2::Digest as _; +use tokio::time::sleep; +use tokio_tungstenite::connect_async; +use tracing::warn; +use uuid::Uuid; + +use crate::ExecServerError; +use crate::ExecServerRuntimePaths; +use crate::connection::JsonRpcConnection; +use crate::server::ConnectionProcessor; + +pub const CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR: &str = + "CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN"; + +const PROTOCOL_VERSION: &str = "codex-exec-server-v1"; +const ERROR_BODY_PREVIEW_BYTES: usize = 4096; + +#[derive(Clone)] +struct ExecutorRegistryClient { + base_url: String, + bearer_token: String, + http: reqwest::Client, +} + +impl std::fmt::Debug for ExecutorRegistryClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ExecutorRegistryClient") + .field("base_url", &self.base_url) + .field("bearer_token", &"") + .finish_non_exhaustive() + } +} + +impl ExecutorRegistryClient { + fn new(base_url: String, bearer_token: String) -> Result { + let base_url = normalize_base_url(base_url)?; + Ok(Self { + base_url, + bearer_token, + http: reqwest::Client::new(), + }) + } + + async fn 
register_executor( + &self, + request: &ExecutorRegistryRegisterExecutorRequest, + ) -> Result { + self.post_json( + &format!("/cloud/executor/{}/register", request.executor_id), + request, + ) + .await + } + + async fn post_json(&self, path: &str, request: &T) -> Result + where + T: Serialize + Sync, + R: for<'de> Deserialize<'de>, + { + let response = self + .http + .post(endpoint_url(&self.base_url, path)) + .bearer_auth(&self.bearer_token) + .json(request) + .send() + .await?; + + if response.status().is_success() { + return response.json::().await.map_err(ExecServerError::from); + } + + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + if matches!(status, StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN) { + return Err(executor_registry_auth_error(status, &body)); + } + + Err(executor_registry_http_error(status, &body)) + } +} + +#[derive(Debug, Clone, Eq, PartialEq, Serialize)] +struct ExecutorRegistryRegisterExecutorRequest { + idempotency_id: String, + executor_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + name: Option, + labels: BTreeMap, + metadata: Value, +} + +#[derive(Debug, Clone, Eq, PartialEq, Deserialize)] +struct ExecutorRegistryExecutorRegistrationResponse { + id: String, + executor_id: String, + url: String, +} + +/// Configuration for registering an exec-server for remote use. 
+#[derive(Clone, Eq, PartialEq)] +pub struct RemoteExecutorConfig { + pub base_url: String, + pub executor_id: String, + pub name: String, + bearer_token: String, +} + +impl std::fmt::Debug for RemoteExecutorConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RemoteExecutorConfig") + .field("base_url", &self.base_url) + .field("executor_id", &self.executor_id) + .field("name", &self.name) + .field("bearer_token", &"") + .finish() + } +} + +impl RemoteExecutorConfig { + pub fn new(base_url: String, executor_id: String) -> Result { + Self::with_bearer_token(base_url, executor_id, read_remote_bearer_token_from_env()?) + } + + fn with_bearer_token( + base_url: String, + executor_id: String, + bearer_token: String, + ) -> Result { + let executor_id = normalize_executor_id(executor_id)?; + let bearer_token = normalize_bearer_token(bearer_token)?; + Ok(Self { + base_url, + executor_id, + name: "codex-exec-server".to_string(), + bearer_token, + }) + } + + fn registration_request( + &self, + registration_id: Uuid, + ) -> ExecutorRegistryRegisterExecutorRequest { + ExecutorRegistryRegisterExecutorRequest { + idempotency_id: self.default_idempotency_id(registration_id), + executor_id: self.executor_id.clone(), + name: Some(self.name.clone()), + labels: BTreeMap::new(), + metadata: Value::Object(Default::default()), + } + } + + fn default_idempotency_id(&self, registration_id: Uuid) -> String { + let mut hasher = sha2::Sha256::new(); + hasher.update(self.executor_id.as_bytes()); + hasher.update(b"\0"); + hasher.update(self.name.as_bytes()); + hasher.update(b"\0"); + hasher.update(PROTOCOL_VERSION); + hasher.update(b"\0"); + hasher.update(registration_id.as_bytes()); + let digest = hasher.finalize(); + format!("codex-exec-server-{digest:x}") + } +} + +/// Register an exec-server for remote use and serve requests over the returned +/// rendezvous websocket. 
+pub async fn run_remote_executor( + config: RemoteExecutorConfig, + runtime_paths: ExecServerRuntimePaths, +) -> Result<(), ExecServerError> { + let client = ExecutorRegistryClient::new(config.base_url.clone(), config.bearer_token.clone())?; + let processor = ConnectionProcessor::new(runtime_paths); + let registration_id = Uuid::new_v4(); + let mut backoff = Duration::from_secs(1); + + loop { + let request = config.registration_request(registration_id); + let response = client.register_executor(&request).await?; + eprintln!( + "codex exec-server remote executor {} registered with executor_id {}", + response.id, response.executor_id + ); + + match connect_async(response.url.as_str()).await { + Ok((websocket, _)) => { + backoff = Duration::from_secs(1); + processor + .run_connection(JsonRpcConnection::from_websocket( + websocket, + "remote exec-server websocket".to_string(), + )) + .await; + } + Err(err) => { + warn!("failed to connect remote exec-server websocket: {err}"); + } + } + + sleep(backoff).await; + backoff = (backoff * 2).min(Duration::from_secs(30)); + } +} + +fn read_remote_bearer_token_from_env() -> Result { + read_remote_bearer_token_from_env_with(|name| env::var(name)) +} + +fn read_remote_bearer_token_from_env_with(get_var: F) -> Result +where + F: FnOnce(&str) -> Result, +{ + let bearer_token = get_var(CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR).map_err(|_| { + ExecServerError::ExecutorRegistryAuth(format!( + "executor registry bearer token environment variable `{CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR}` is not set" + )) + })?; + normalize_bearer_token(bearer_token) +} + +fn normalize_bearer_token(bearer_token: String) -> Result { + let bearer_token = bearer_token.trim().to_string(); + if bearer_token.is_empty() { + return Err(ExecServerError::ExecutorRegistryAuth(format!( + "executor registry bearer token environment variable `{CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR}` is empty" + ))); + } + Ok(bearer_token) +} + +fn 
normalize_executor_id(executor_id: String) -> Result { + let executor_id = executor_id.trim().to_string(); + if executor_id.is_empty() { + return Err(ExecServerError::ExecutorRegistryConfig( + "executor id is required for remote exec-server registration".to_string(), + )); + } + Ok(executor_id) +} + +#[derive(Deserialize)] +struct RegistryErrorBody { + error: Option, +} + +#[derive(Deserialize)] +struct RegistryError { + code: Option, + message: Option, +} + +fn normalize_base_url(base_url: String) -> Result { + let trimmed = base_url.trim().trim_end_matches('/').to_string(); + if trimmed.is_empty() { + return Err(ExecServerError::ExecutorRegistryConfig( + "executor registry base URL is required".to_string(), + )); + } + Ok(trimmed) +} + +fn endpoint_url(base_url: &str, path: &str) -> String { + format!("{base_url}/{}", path.trim_start_matches('/')) +} + +fn executor_registry_auth_error(status: StatusCode, body: &str) -> ExecServerError { + let message = registry_error_message(body).unwrap_or_else(|| "empty error body".to_string()); + ExecServerError::ExecutorRegistryAuth(format!( + "executor registry authentication failed ({status}): {message}" + )) +} + +fn executor_registry_http_error(status: StatusCode, body: &str) -> ExecServerError { + let parsed = serde_json::from_str::(body).ok(); + let (code, message) = parsed + .and_then(|body| body.error) + .map(|error| { + ( + error.code, + error.message.unwrap_or_else(|| { + preview_error_body(body).unwrap_or_else(|| "empty error body".to_string()) + }), + ) + }) + .unwrap_or_else(|| { + ( + None, + preview_error_body(body) + .unwrap_or_else(|| "empty or malformed error body".to_string()), + ) + }); + ExecServerError::ExecutorRegistryHttp { + status, + code, + message, + } +} + +fn registry_error_message(body: &str) -> Option { + serde_json::from_str::(body) + .ok() + .and_then(|body| body.error) + .and_then(|error| error.message) + .or_else(|| preview_error_body(body)) +} + +fn preview_error_body(body: &str) -> Option 
{ + let trimmed = body.trim(); + if trimmed.is_empty() { + return None; + } + Some(trimmed.chars().take(ERROR_BODY_PREVIEW_BYTES).collect()) +} + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + use serde_json::json; + use wiremock::Mock; + use wiremock::MockServer; + use wiremock::ResponseTemplate; + use wiremock::matchers::body_json; + use wiremock::matchers::header; + use wiremock::matchers::method; + use wiremock::matchers::path; + + use super::*; + + #[tokio::test] + async fn register_executor_posts_with_bearer_token_header() { + let server = MockServer::start().await; + let registration_id = Uuid::from_u128(1); + let config = RemoteExecutorConfig::with_bearer_token( + server.uri(), + "exec-requested".to_string(), + "registry-token".to_string(), + ) + .expect("config"); + let request = config.registration_request(registration_id); + let expected_request = serde_json::to_value(&request).expect("serialize request"); + Mock::given(method("POST")) + .and(path("/cloud/executor/exec-requested/register")) + .and(header("authorization", "Bearer registry-token")) + .and(body_json(expected_request)) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "id": "registration-1", + "executor_id": "exec-1", + "url": "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc" + }))) + .mount(&server) + .await; + let client = ExecutorRegistryClient::new(server.uri(), "registry-token".to_string()) + .expect("client"); + + let response = client + .register_executor(&request) + .await + .expect("register executor"); + + assert_eq!( + response, + ExecutorRegistryExecutorRegistrationResponse { + id: "registration-1".to_string(), + executor_id: "exec-1".to_string(), + url: "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc".to_string(), + } + ); + } + + #[test] + fn debug_output_redacts_bearer_token() { + let config = RemoteExecutorConfig::with_bearer_token( + "https://registry.example".to_string(), + "exec-1".to_string(), + 
"secret-token".to_string(), + ) + .expect("config"); + + let debug = format!("{config:?}"); + + assert!(debug.contains("")); + assert!(!debug.contains("secret-token")); + } +} diff --git a/codex-rs/exec-server/src/server.rs b/codex-rs/exec-server/src/server.rs index 62c1787381..bf33eb77ba 100644 --- a/codex-rs/exec-server/src/server.rs +++ b/codex-rs/exec-server/src/server.rs @@ -7,6 +7,7 @@ mod session_registry; mod transport; pub(crate) use handler::ExecServerHandler; +pub(crate) use processor::ConnectionProcessor; pub use transport::DEFAULT_LISTEN_URL; pub use transport::ExecServerListenUrlParseError;