diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock
index cb359a45c7..e988f4b2ba 100644
--- a/codex-rs/Cargo.lock
+++ b/codex-rs/Cargo.lock
@@ -2610,6 +2610,7 @@ dependencies = [
  "codex-app-server-protocol",
  "codex-client",
  "codex-file-system",
+ "codex-login",
  "codex-protocol",
  "codex-sandboxing",
  "codex-test-binary-support",
@@ -2622,6 +2623,7 @@ dependencies = [
  "serde",
  "serde_json",
  "serial_test",
+ "sha2",
  "tempfile",
  "test-case",
  "thiserror 2.0.18",
@@ -2630,6 +2632,7 @@ dependencies = [
  "tokio-util",
  "tracing",
  "uuid",
+ "wiremock",
 ]

 [[package]]
diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs
index 852dff616d..b51f1aa5b9 100644
--- a/codex-rs/cli/src/main.rs
+++ b/codex-rs/cli/src/main.rs
@@ -451,6 +451,22 @@ struct ExecServerCommand {
         default_value = "ws://127.0.0.1:0"
     )]
     listen: String,
+
+    /// Register this exec-server as a cloud executor instead of listening locally.
+    #[arg(long = "cloud", default_value_t = false)]
+    cloud: bool,
+
+    /// Cloud environments service base URL.
+    #[arg(long = "cloud-base-url", value_name = "URL", requires = "cloud")]
+    cloud_base_url: Option<String>,
+
+    /// Existing cloud environment id to attach to. Omit to let the service create one.
+    #[arg(long = "cloud-environment-id", value_name = "ID", requires = "cloud")]
+    cloud_environment_id: Option<String>,
+
+    /// Human-readable executor name.
+    #[arg(long = "cloud-name", value_name = "NAME", requires = "cloud")]
+    cloud_name: Option<String>,
 }

 #[derive(Debug, clap::Subcommand)]
@@ -1155,7 +1171,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
                 root_remote_auth_token_env.as_deref(),
                 "exec-server",
             )?;
-            run_exec_server_command(cmd, &arg0_paths).await?;
+            run_exec_server_command(cmd, &arg0_paths, root_config_overrides.clone()).await?;
         }
         Some(Subcommand::Features(FeaturesCli { sub })) => match sub {
             FeaturesSubcommand::List => {
@@ -1230,6 +1246,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
 async fn run_exec_server_command(
     cmd: ExecServerCommand,
     arg0_paths: &Arg0DispatchPaths,
+    root_config_overrides: CliConfigOverrides,
 ) -> anyhow::Result<()> {
     let codex_self_exe = arg0_paths
         .codex_self_exe
@@ -1239,6 +1256,31 @@ async fn run_exec_server_command(
         codex_self_exe,
         arg0_paths.codex_linux_sandbox_exe.clone(),
     )?;
+    if cmd.cloud {
+        let cloud_base_url = cmd
+            .cloud_base_url
+            .or_else(|| {
+                std::env::var(codex_exec_server::CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR).ok()
+            })
+            .ok_or_else(|| {
+                anyhow::anyhow!(
+                    "--cloud-base-url or CODEX_CLOUD_ENVIRONMENTS_BASE_URL is required in cloud mode"
+                )
+            })?;
+        let cli_overrides = root_config_overrides
+            .parse_overrides()
+            .map_err(anyhow::Error::msg)?;
+        let config = Config::load_with_cli_overrides(cli_overrides).await?;
+        let auth_manager =
+            AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false);
+        let mut cloud_config = codex_exec_server::CloudExecutorConfig::new(cloud_base_url);
+        cloud_config.cloud_environment_id = cmd.cloud_environment_id;
+        if let Some(name) = cmd.cloud_name {
+            cloud_config.cloud_name = name;
+        }
+        codex_exec_server::run_cloud_executor(cloud_config, auth_manager, runtime_paths).await?;
+        return Ok(());
+    }
     codex_exec_server::run_main(&cmd.listen, runtime_paths)
         .await
         .map_err(anyhow::Error::from_boxed)
diff --git a/codex-rs/exec-server/Cargo.toml b/codex-rs/exec-server/Cargo.toml
index 5f31ca4329..3cf9e290a9 100644
--- a/codex-rs/exec-server/Cargo.toml
+++ b/codex-rs/exec-server/Cargo.toml
@@ -18,14 +18,16 @@ bytes = { workspace = true }
 codex-app-server-protocol = { workspace = true }
 codex-client = { workspace = true }
 codex-file-system = { workspace = true }
+codex-login = { workspace = true }
 codex-protocol = { workspace = true }
 codex-sandboxing = { workspace = true }
 codex-utils-absolute-path = { workspace = true }
 codex-utils-pty = { workspace = true }
 futures = { workspace = true }
-reqwest = { workspace = true, features = ["rustls-tls", "stream"] }
+reqwest = { workspace = true, features = ["json", "rustls-tls", "stream"] }
 serde = { workspace = true, features = ["derive"] }
 serde_json = { workspace = true }
+sha2 = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true, features = [
     "fs",
@@ -51,3 +53,4 @@ pretty_assertions = { workspace = true }
 serial_test = { workspace = true }
 tempfile = { workspace = true }
 test-case = "3.3.1"
+wiremock = { workspace = true }
diff --git a/codex-rs/exec-server/README.md b/codex-rs/exec-server/README.md
index 78b92e1a71..2a731158b5 100644
--- a/codex-rs/exec-server/README.md
+++ b/codex-rs/exec-server/README.md
@@ -22,6 +22,12 @@ the wire.
 The CLI entrypoint supports:

 - `ws://IP:PORT` (default)
+- `--cloud --cloud-base-url URL [--cloud-environment-id ID] [--cloud-name NAME]`
+
+Cloud mode registers the local exec-server with the cloud environments service,
+then reconnects to the service-provided rendezvous websocket as the executor.
+It requires ChatGPT auth, and `CODEX_CLOUD_ENVIRONMENTS_BASE_URL` can be used
+instead of `--cloud-base-url`.

Wire framing: @@ -308,6 +314,8 @@ The crate exports: - `DEFAULT_LISTEN_URL` and `ExecServerListenUrlParseError` - `ExecServerRuntimePaths` - `run_main()` for embedding the websocket server +- `CloudExecutorConfig` and `run_cloud_executor()` for embedding cloud + registration mode Callers must pass `ExecServerRuntimePaths` to `run_main()`. The top-level `codex exec-server` command builds these paths from the `codex` arg0 dispatch diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index f26069ac7a..f953bd83a0 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -254,6 +254,18 @@ pub enum ExecServerError { Protocol(String), #[error("exec-server rejected request ({code}): {message}")] Server { code: i64, message: String }, + #[error("cloud environments request failed ({status}{code_suffix}): {message}", code_suffix = .code.as_ref().map(|code| format!(", {code}")).unwrap_or_default())] + CloudEnvironmentHttp { + status: reqwest::StatusCode, + code: Option, + message: String, + }, + #[error("cloud environment configuration error: {0}")] + CloudEnvironmentConfig(String), + #[error("cloud environment authentication error: {0}")] + CloudEnvironmentAuth(String), + #[error("cloud environments request failed: {0}")] + CloudEnvironmentRequest(#[from] reqwest::Error), } impl ExecServerClient { diff --git a/codex-rs/exec-server/src/cloud.rs b/codex-rs/exec-server/src/cloud.rs new file mode 100644 index 0000000000..b4c635686d --- /dev/null +++ b/codex-rs/exec-server/src/cloud.rs @@ -0,0 +1,391 @@ +use std::collections::BTreeMap; +use std::sync::Arc; +use std::time::Duration; + +use codex_login::AuthManager; +use codex_login::CodexAuth; +use reqwest::StatusCode; +use serde::Deserialize; +use serde::Serialize; +use serde_json::Value; +use sha2::Digest as _; +use tokio::time::sleep; +use tokio_tungstenite::connect_async; +use tracing::info; +use tracing::warn; + +use crate::ExecServerError; +use 
crate::ExecServerRuntimePaths; +use crate::connection::JsonRpcConnection; +use crate::server::ConnectionProcessor; + +/// Environment variable fallback for the cloud environments base URL. +pub const CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR: &str = "CODEX_CLOUD_ENVIRONMENTS_BASE_URL"; + +const PROTOCOL_VERSION: &str = "codex-exec-server-v1"; +const ERROR_BODY_PREVIEW_BYTES: usize = 4096; + +#[derive(Clone)] +struct CloudEnvironmentClient { + base_url: String, + http: reqwest::Client, + auth_manager: Arc, +} + +impl std::fmt::Debug for CloudEnvironmentClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CloudEnvironmentClient") + .field("base_url", &self.base_url) + .finish_non_exhaustive() + } +} + +impl CloudEnvironmentClient { + fn new(base_url: String, auth_manager: Arc) -> Result { + let base_url = normalize_base_url(base_url)?; + Ok(Self { + base_url, + http: reqwest::Client::new(), + auth_manager, + }) + } + + async fn register_executor( + &self, + request: &CloudEnvironmentRegisterExecutorRequest, + ) -> Result { + self.post_json("/api/cloud/executor", request).await + } + + async fn post_json(&self, path: &str, request: &T) -> Result + where + T: Serialize + Sync, + R: for<'de> Deserialize<'de>, + { + for attempt in 0..=1 { + let auth = cloud_environment_chatgpt_auth(&self.auth_manager).await?; + let response = self + .http + .post(endpoint_url(&self.base_url, path)) + .bearer_auth(chatgpt_bearer_token(&auth)?) + .header("chatgpt-account-id", chatgpt_account_id(&auth)?) 
+ .json(request) + .send() + .await?; + + if response.status().is_success() { + return response.json::().await.map_err(ExecServerError::from); + } + + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + if matches!(status, StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN) + && attempt == 0 + && recover_unauthorized(&self.auth_manager).await + { + continue; + } + + return Err(cloud_http_error(status, &body)); + } + + unreachable!("cloud environments request loop is bounded to two attempts") + } +} + +#[derive(Debug, Clone, Eq, PartialEq, Serialize)] +struct CloudEnvironmentRegisterExecutorRequest { + idempotency_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + environment_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + name: Option, + labels: BTreeMap, + metadata: Value, +} + +#[derive(Debug, Clone, Eq, PartialEq, Deserialize)] +struct CloudEnvironmentExecutorRegistrationResponse { + id: String, + environment_id: String, + url: String, +} + +/// Configuration for registering an exec-server with cloud environments. 
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub struct CloudExecutorConfig {
+    pub cloud_base_url: String,
+    pub cloud_environment_id: Option<String>,
+    pub cloud_name: String,
+}
+
+impl CloudExecutorConfig {
+    pub fn new(cloud_base_url: String) -> Self {
+        Self {
+            cloud_base_url,
+            cloud_environment_id: None,
+            cloud_name: "codex-exec-server".to_string(),
+        }
+    }
+
+    fn registration_request(
+        &self,
+        auth: &CodexAuth,
+    ) -> Result<CloudEnvironmentRegisterExecutorRequest, ExecServerError> {
+        Ok(CloudEnvironmentRegisterExecutorRequest {
+            idempotency_id: self.default_idempotency_id(auth)?,
+            environment_id: self.cloud_environment_id.clone(),
+            name: Some(self.cloud_name.clone()),
+            labels: BTreeMap::new(),
+            metadata: Value::Object(Default::default()),
+        })
+    }
+
+    fn default_idempotency_id(&self, auth: &CodexAuth) -> Result<String, ExecServerError> {
+        let mut hasher = sha2::Sha256::new();
+        hasher.update(chatgpt_account_id(auth)?.as_bytes());
+        hasher.update(b"\0"); // NUL delimiter between hashed fields
+        hasher.update(self.cloud_environment_id.as_deref().unwrap_or("auto"));
+        hasher.update(b"\0");
+        hasher.update(self.cloud_name.as_bytes());
+        hasher.update(b"\0");
+        hasher.update(PROTOCOL_VERSION);
+        let digest = hasher.finalize();
+        Ok(format!("codex-exec-server-{digest:x}"))
+    }
+}
+
+/// Register an exec-server with cloud environments and serve requests over the
+/// returned rendezvous websocket.
+pub async fn run_cloud_executor(
+    config: CloudExecutorConfig,
+    auth_manager: Arc<AuthManager>,
+    runtime_paths: ExecServerRuntimePaths,
+) -> Result<(), ExecServerError> {
+    let client = CloudEnvironmentClient::new(config.cloud_base_url.clone(), auth_manager.clone())?;
+    let processor = ConnectionProcessor::new(runtime_paths);
+    let mut backoff = Duration::from_secs(1);
+
+    loop {
+        let auth = cloud_environment_chatgpt_auth(&auth_manager).await?;
+        let request = config.registration_request(&auth)?;
+        let response = client.register_executor(&request).await?;
+        eprintln!(
+            "codex exec-server cloud executor {} registered in environment {}",
+            response.id, response.environment_id
+        );
+
+        match connect_async(response.url.as_str()).await {
+            Ok((websocket, _)) => {
+                backoff = Duration::from_secs(1); // connected: restart backoff at 1s
+                processor
+                    .run_connection(JsonRpcConnection::from_websocket(
+                        websocket,
+                        "cloud exec-server websocket".to_string(),
+                    ))
+                    .await;
+            }
+            Err(err) => {
+                warn!("failed to connect cloud exec-server websocket: {err}");
+            }
+        }
+
+        sleep(backoff).await;
+        backoff = (backoff * 2).min(Duration::from_secs(30)); // double, capped at 30s
+    }
+}
+
+async fn cloud_environment_chatgpt_auth(
+    auth_manager: &AuthManager,
+) -> Result<CodexAuth, ExecServerError> {
+    let mut reloaded = false;
+    let auth = loop {
+        let Some(auth) = auth_manager.auth().await else {
+            if reloaded {
+                return Err(ExecServerError::CloudEnvironmentAuth(
+                    "cloud environments require ChatGPT authentication".to_string(),
+                ));
+            }
+            auth_manager.reload();
+            reloaded = true;
+            continue;
+        };
+        if !auth.is_chatgpt_auth() {
+            return Err(ExecServerError::CloudEnvironmentAuth(
+                "cloud environments require ChatGPT authentication; API key auth is not supported"
+                    .to_string(),
+            ));
+        }
+        if auth.get_account_id().is_none() && !reloaded {
+            auth_manager.reload();
+            reloaded = true;
+            continue;
+        }
+        break auth;
+    };
+
+    let _ = chatgpt_bearer_token(&auth)?;
+    let _ = chatgpt_account_id(&auth)?;
+    Ok(auth)
+}
+
+fn chatgpt_bearer_token(auth: &CodexAuth) -> Result<String, ExecServerError> {
+    auth.get_token()
+        .map_err(|err| ExecServerError::CloudEnvironmentAuth(err.to_string()))
+        .and_then(|token| {
+            if token.is_empty() {
+                Err(ExecServerError::CloudEnvironmentAuth(
+                    "cloud environments require a non-empty ChatGPT bearer token".to_string(),
+                ))
+            } else {
+                Ok(token)
+            }
+        })
+}
+
+fn chatgpt_account_id(auth: &CodexAuth) -> Result<String, ExecServerError> {
+    auth.get_account_id().ok_or_else(|| {
+        ExecServerError::CloudEnvironmentAuth(
+            "cloud environments are waiting for a ChatGPT account id".to_string(),
+        )
+    })
+}
+
+async fn recover_unauthorized(auth_manager: &Arc<AuthManager>) -> bool {
+    let mut recovery = auth_manager.unauthorized_recovery();
+    if !recovery.has_next() {
+        return false;
+    }
+
+    let mode = recovery.mode_name();
+    let step = recovery.step_name();
+    match recovery.next().await {
+        Ok(step_result) => {
+            info!(
+                "cloud environment auth recovery succeeded: mode={mode}, step={step}, auth_state_changed={:?}",
+                step_result.auth_state_changed()
+            );
+            true
+        }
+        Err(err) => {
+            warn!("cloud environment auth recovery failed: mode={mode}, step={step}: {err}");
+            false
+        }
+    }
+}
+
+#[derive(Deserialize)]
+struct CloudErrorBody {
+    error: Option<CloudError>,
+}
+
+#[derive(Deserialize)]
+struct CloudError {
+    code: Option<String>,
+    message: Option<String>,
+}
+
+fn normalize_base_url(base_url: String) -> Result<String, ExecServerError> {
+    let trimmed = base_url.trim().trim_end_matches('/').to_string();
+    if trimmed.is_empty() {
+        return Err(ExecServerError::CloudEnvironmentConfig(
+            "cloud environments base URL is required".to_string(),
+        ));
+    }
+    Ok(trimmed)
+}
+
+fn endpoint_url(base_url: &str, path: &str) -> String {
+    format!("{base_url}/{}", path.trim_start_matches('/'))
+}
+
+fn cloud_http_error(status: StatusCode, body: &str) -> ExecServerError {
+    let parsed = serde_json::from_str::<CloudErrorBody>(body).ok();
+    let (code, message) = parsed
+        .and_then(|body| body.error)
+        .map(|error| {
+            (
+                error.code,
+                error.message.unwrap_or_else(|| {
+                    preview_error_body(body).unwrap_or_else(|| "empty error body".to_string())
+                }),
+            )
+        })
+        .unwrap_or_else(|| {
+            (
+                None,
+                preview_error_body(body)
+                    .unwrap_or_else(|| "empty or malformed error body".to_string()),
+            )
+        });
+    ExecServerError::CloudEnvironmentHttp {
+        status,
+        code,
+        message,
+    }
+}
+
+fn preview_error_body(body: &str) -> Option<String> {
+    let trimmed = body.trim();
+    // The budget is in UTF-8 bytes (per the constant's name); cut on a char boundary.
+    let limit = ERROR_BODY_PREVIEW_BYTES.min(trimmed.len());
+    let end = (0..=limit).rev().find(|&i| trimmed.is_char_boundary(i))?;
+    (end > 0).then(|| trimmed[..end].to_string())
+}
+
+#[cfg(test)]
+mod tests {
+    use codex_login::CodexAuth;
+    use pretty_assertions::assert_eq;
+    use serde_json::json;
+    use wiremock::Mock;
+    use wiremock::MockServer;
+    use wiremock::ResponseTemplate;
+    use wiremock::matchers::body_json;
+    use wiremock::matchers::header;
+    use wiremock::matchers::method;
+    use wiremock::matchers::path;
+
+    use super::*;
+
+    fn auth_manager() -> Arc<AuthManager> {
+        AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing())
+    }
+
+    #[tokio::test]
+    async fn register_executor_posts_with_chatgpt_auth_headers() {
+        let server = MockServer::start().await;
+        let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing();
+        let request = CloudExecutorConfig::new(server.uri())
+            .registration_request(&auth)
+            .expect("registration request");
+        let expected_request = serde_json::to_value(&request).expect("serialize request");
+        Mock::given(method("POST"))
+            .and(path("/api/cloud/executor"))
+            .and(header("authorization", "Bearer Access Token"))
+            .and(header("chatgpt-account-id", "account_id"))
+            .and(body_json(expected_request))
+            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
+                "id": "exec-1",
+                "environment_id": "env-1",
+                "url": "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc"
+            })))
+            .mount(&server)
+            .await;
+        let client = CloudEnvironmentClient::new(server.uri(), auth_manager()).expect("client");
+
+        let response = client
+            .register_executor(&request)
+            .await
+            .expect("register executor");
+
+        assert_eq!(
+            response,
+            CloudEnvironmentExecutorRegistrationResponse {
+                id: "exec-1".to_string(),
+                environment_id: "env-1".to_string(),
+                url: "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc".to_string(),
+            }
+        );
+    }
+}
diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs
index e739b05644..e8d27f27d4 100644
--- a/codex-rs/exec-server/src/lib.rs
+++ b/codex-rs/exec-server/src/lib.rs
@@ -1,5 +1,6 @@
 mod client;
 mod client_api;
+mod cloud;
 mod connection;
 mod environment;
 mod fs_helper;
@@ -32,6 +33,9 @@ pub use codex_file_system::FileSystemResult;
 pub use codex_file_system::FileSystemSandboxContext;
 pub use codex_file_system::ReadDirectoryEntry;
 pub use codex_file_system::RemoveOptions;
+pub use cloud::CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR;
+pub use cloud::CloudExecutorConfig;
+pub use cloud::run_cloud_executor;
 pub use environment::CODEX_EXEC_SERVER_URL_ENV_VAR;
 pub use environment::Environment;
 pub use environment::EnvironmentManager;
diff --git a/codex-rs/exec-server/src/server.rs b/codex-rs/exec-server/src/server.rs
index 62c1787381..bf33eb77ba 100644
--- a/codex-rs/exec-server/src/server.rs
+++ b/codex-rs/exec-server/src/server.rs
@@ -7,6 +7,7 @@ mod session_registry;
 mod transport;

 pub(crate) use handler::ExecServerHandler;
+pub(crate) use processor::ConnectionProcessor;
 pub use transport::DEFAULT_LISTEN_URL;
 pub use transport::ExecServerListenUrlParseError;