Compare commits

...

8 Commits

Author SHA1 Message Date
Michael Zeng
c1d9533bdc Clean up remote exec-server replay 2026-04-27 18:00:11 -07:00
Michael Zeng
850d481b57 Restore exec-server auth handling 2026-04-27 17:52:03 -07:00
Michael Zeng
dd4c55ecad Address exec-server remote mode review feedback 2026-04-27 17:51:47 -07:00
Michael Zeng
8a01ea0560 Rename remote exec-server options and API 2026-04-27 17:51:38 -07:00
Michael Zeng
e6d20646a1 Rename exec-server --cloud flag to --remote 2026-04-27 17:51:29 -07:00
Michael Zeng
303b5ea9f6 Remove cloud executor idempotency test 2026-04-27 17:51:28 -07:00
Michael Zeng
350b2679b4 Use per-launch idempotency for cloud executor registration 2026-04-27 17:51:28 -07:00
Michael Zeng
07182cdc04 Add cloud executor registration to exec-server 2026-04-27 17:51:26 -07:00
8 changed files with 467 additions and 9 deletions

3
codex-rs/Cargo.lock generated
View File

@@ -2610,6 +2610,7 @@ dependencies = [
"codex-app-server-protocol",
"codex-client",
"codex-file-system",
"codex-login",
"codex-protocol",
"codex-sandboxing",
"codex-test-binary-support",
@@ -2622,6 +2623,7 @@ dependencies = [
"serde",
"serde_json",
"serial_test",
"sha2",
"tempfile",
"test-case",
"thiserror 2.0.18",
@@ -2630,6 +2632,7 @@ dependencies = [
"tokio-util",
"tracing",
"uuid",
"wiremock",
]
[[package]]

View File

@@ -445,12 +445,20 @@ struct AppServerCommand {
#[derive(Debug, Parser)]
struct ExecServerCommand {
/// Transport endpoint URL. Supported values: `ws://IP:PORT` (default).
#[arg(
long = "listen",
value_name = "URL",
default_value = "ws://127.0.0.1:0"
)]
listen: String,
#[arg(long = "listen", value_name = "URL", conflicts_with = "remote")]
listen: Option<String>,
/// Register this exec-server as a remote executor using the given base URL.
#[arg(long = "remote", value_name = "URL")]
remote: Option<String>,
/// Existing environment id to attach to. Omit to let the service create one.
#[arg(long = "environment-id", value_name = "ID")]
environment_id: Option<String>,
/// Human-readable executor name.
#[arg(long = "name", value_name = "NAME")]
name: Option<String>,
}
#[derive(Debug, clap::Subcommand)]
@@ -1155,7 +1163,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
root_remote_auth_token_env.as_deref(),
"exec-server",
)?;
run_exec_server_command(cmd, &arg0_paths).await?;
run_exec_server_command(cmd, &arg0_paths, root_config_overrides.clone()).await?;
}
Some(Subcommand::Features(FeaturesCli { sub })) => match sub {
FeaturesSubcommand::List => {
@@ -1230,6 +1238,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
async fn run_exec_server_command(
cmd: ExecServerCommand,
arg0_paths: &Arg0DispatchPaths,
root_config_overrides: CliConfigOverrides,
) -> anyhow::Result<()> {
let codex_self_exe = arg0_paths
.codex_self_exe
@@ -1239,7 +1248,29 @@ async fn run_exec_server_command(
codex_self_exe,
arg0_paths.codex_linux_sandbox_exe.clone(),
)?;
codex_exec_server::run_main(&cmd.listen, runtime_paths)
if let Some(base_url) = cmd.remote {
// `-c` overrides can affect auth loading (for example ChatGPT base URL
// or credential store settings), so load config before constructing the
// auth manager for remote registration.
let cli_overrides = root_config_overrides
.parse_overrides()
.map_err(anyhow::Error::msg)?;
let config = Config::load_with_cli_overrides(cli_overrides).await?;
let auth_manager =
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;
let mut remote_config = codex_exec_server::RemoteExecutorConfig::new(base_url);
remote_config.environment_id = cmd.environment_id;
if let Some(name) = cmd.name {
remote_config.name = name;
}
codex_exec_server::run_remote_executor(remote_config, auth_manager, runtime_paths).await?;
return Ok(());
}
let listen_url = cmd
.listen
.as_deref()
.unwrap_or(codex_exec_server::DEFAULT_LISTEN_URL);
codex_exec_server::run_main(listen_url, runtime_paths)
.await
.map_err(anyhow::Error::from_boxed)
}

View File

@@ -18,14 +18,16 @@ bytes = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-client = { workspace = true }
codex-file-system = { workspace = true }
codex-login = { workspace = true }
codex-protocol = { workspace = true }
codex-sandboxing = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-pty = { workspace = true }
futures = { workspace = true }
reqwest = { workspace = true, features = ["rustls-tls", "stream"] }
reqwest = { workspace = true, features = ["json", "rustls-tls", "stream"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sha2 = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = [
"fs",
@@ -51,3 +53,4 @@ pretty_assertions = { workspace = true }
serial_test = { workspace = true }
tempfile = { workspace = true }
test-case = "3.3.1"
wiremock = { workspace = true }

View File

@@ -22,6 +22,11 @@ the wire.
The CLI entrypoint supports:
- `ws://IP:PORT` (default)
- `--remote URL [--environment-id ID] [--name NAME]`
Remote mode registers the local exec-server with the environment service,
then reconnects to the service-provided rendezvous websocket as the executor.
It requires ChatGPT auth.
Wire framing:
@@ -308,6 +313,8 @@ The crate exports:
- `DEFAULT_LISTEN_URL` and `ExecServerListenUrlParseError`
- `ExecServerRuntimePaths`
- `run_main()` for embedding the websocket server
- `RemoteExecutorConfig` and `run_remote_executor()` for embedding remote
registration mode
Callers must pass `ExecServerRuntimePaths` to `run_main()`. The top-level
`codex exec-server` command builds these paths from the `codex` arg0 dispatch

View File

@@ -254,6 +254,18 @@ pub enum ExecServerError {
Protocol(String),
#[error("exec-server rejected request ({code}): {message}")]
Server { code: i64, message: String },
#[error("cloud environments request failed ({status}{code_suffix}): {message}", code_suffix = .code.as_ref().map(|code| format!(", {code}")).unwrap_or_default())]
CloudEnvironmentHttp {
status: reqwest::StatusCode,
code: Option<String>,
message: String,
},
#[error("cloud environment configuration error: {0}")]
CloudEnvironmentConfig(String),
#[error("cloud environment authentication error: {0}")]
CloudEnvironmentAuth(String),
#[error("cloud environments request failed: {0}")]
CloudEnvironmentRequest(#[from] reqwest::Error),
}
impl ExecServerClient {

View File

@@ -0,0 +1,398 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use reqwest::StatusCode;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use sha2::Digest as _;
use tokio::time::sleep;
use tokio_tungstenite::connect_async;
use tracing::info;
use tracing::warn;
use uuid::Uuid;
use crate::ExecServerError;
use crate::ExecServerRuntimePaths;
use crate::connection::JsonRpcConnection;
use crate::server::ConnectionProcessor;
const PROTOCOL_VERSION: &str = "codex-exec-server-v1";
const ERROR_BODY_PREVIEW_BYTES: usize = 4096;
#[derive(Clone)]
struct CloudEnvironmentClient {
base_url: String,
http: reqwest::Client,
auth_manager: Arc<AuthManager>,
}
impl std::fmt::Debug for CloudEnvironmentClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CloudEnvironmentClient")
.field("base_url", &self.base_url)
.finish_non_exhaustive()
}
}
impl CloudEnvironmentClient {
fn new(base_url: String, auth_manager: Arc<AuthManager>) -> Result<Self, ExecServerError> {
let base_url = normalize_base_url(base_url)?;
Ok(Self {
base_url,
http: reqwest::Client::new(),
auth_manager,
})
}
async fn register_executor(
&self,
request: &CloudEnvironmentRegisterExecutorRequest,
) -> Result<CloudEnvironmentExecutorRegistrationResponse, ExecServerError> {
self.post_json("/api/cloud/executor", request).await
}
async fn post_json<T, R>(&self, path: &str, request: &T) -> Result<R, ExecServerError>
where
T: Serialize + Sync,
R: for<'de> Deserialize<'de>,
{
for attempt in 0..=1 {
let auth = cloud_environment_chatgpt_auth(&self.auth_manager).await?;
let response = self
.http
.post(endpoint_url(&self.base_url, path))
.bearer_auth(chatgpt_bearer_token(&auth)?)
.header("chatgpt-account-id", chatgpt_account_id(&auth)?)
.json(request)
.send()
.await?;
if response.status().is_success() {
return response.json::<R>().await.map_err(ExecServerError::from);
}
let status = response.status();
let body = response.text().await.unwrap_or_default();
if matches!(status, StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN)
&& attempt == 0
&& recover_unauthorized(&self.auth_manager).await
{
continue;
}
return Err(cloud_http_error(status, &body));
}
unreachable!("cloud environments request loop is bounded to two attempts")
}
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
struct CloudEnvironmentRegisterExecutorRequest {
idempotency_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
environment_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
name: Option<String>,
labels: BTreeMap<String, String>,
metadata: Value,
}
#[derive(Debug, Clone, Eq, PartialEq, Deserialize)]
struct CloudEnvironmentExecutorRegistrationResponse {
id: String,
environment_id: String,
url: String,
}
/// Configuration for registering an exec-server for remote use.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RemoteExecutorConfig {
pub base_url: String,
pub environment_id: Option<String>,
pub name: String,
}
impl RemoteExecutorConfig {
pub fn new(base_url: String) -> Self {
Self {
base_url,
environment_id: None,
name: "codex-exec-server".to_string(),
}
}
fn registration_request(
&self,
auth: &CodexAuth,
registration_id: Uuid,
) -> Result<CloudEnvironmentRegisterExecutorRequest, ExecServerError> {
Ok(CloudEnvironmentRegisterExecutorRequest {
idempotency_id: self.default_idempotency_id(auth, registration_id)?,
environment_id: self.environment_id.clone(),
name: Some(self.name.clone()),
labels: BTreeMap::new(),
metadata: Value::Object(Default::default()),
})
}
fn default_idempotency_id(
&self,
auth: &CodexAuth,
registration_id: Uuid,
) -> Result<String, ExecServerError> {
let mut hasher = sha2::Sha256::new();
hasher.update(chatgpt_account_id(auth)?.as_bytes());
hasher.update(b"\0");
hasher.update(self.environment_id.as_deref().unwrap_or("auto"));
hasher.update(b"\0");
hasher.update(self.name.as_bytes());
hasher.update(b"\0");
hasher.update(PROTOCOL_VERSION);
hasher.update(b"\0");
hasher.update(registration_id.as_bytes());
let digest = hasher.finalize();
Ok(format!("codex-exec-server-{digest:x}"))
}
}
/// Register an exec-server for remote use and serve requests over the returned
/// rendezvous websocket.
pub async fn run_remote_executor(
config: RemoteExecutorConfig,
auth_manager: Arc<AuthManager>,
runtime_paths: ExecServerRuntimePaths,
) -> Result<(), ExecServerError> {
let client = CloudEnvironmentClient::new(config.base_url.clone(), auth_manager.clone())?;
let processor = ConnectionProcessor::new(runtime_paths);
let registration_id = Uuid::new_v4();
let mut backoff = Duration::from_secs(1);
loop {
let auth = cloud_environment_chatgpt_auth(&auth_manager).await?;
let request = config.registration_request(&auth, registration_id)?;
let response = client.register_executor(&request).await?;
eprintln!(
"codex exec-server remote executor {} registered in environment {}",
response.id, response.environment_id
);
match connect_async(response.url.as_str()).await {
Ok((websocket, _)) => {
backoff = Duration::from_secs(1);
processor
.run_connection(JsonRpcConnection::from_websocket(
websocket,
"cloud exec-server websocket".to_string(),
))
.await;
}
Err(err) => {
warn!("failed to connect cloud exec-server websocket: {err}");
}
}
sleep(backoff).await;
backoff = (backoff * 2).min(Duration::from_secs(30));
}
}
async fn cloud_environment_chatgpt_auth(
auth_manager: &AuthManager,
) -> Result<CodexAuth, ExecServerError> {
let mut reloaded = false;
let auth = loop {
let Some(auth) = auth_manager.auth().await else {
if reloaded {
return Err(ExecServerError::CloudEnvironmentAuth(
"cloud environments require ChatGPT authentication".to_string(),
));
}
auth_manager.reload().await;
reloaded = true;
continue;
};
if !auth.is_chatgpt_auth() {
return Err(ExecServerError::CloudEnvironmentAuth(
"cloud environments require ChatGPT authentication; API key auth is not supported"
.to_string(),
));
}
if auth.get_account_id().is_none() && !reloaded {
auth_manager.reload().await;
reloaded = true;
continue;
}
break auth;
};
let _ = chatgpt_bearer_token(&auth)?;
let _ = chatgpt_account_id(&auth)?;
Ok(auth)
}
fn chatgpt_bearer_token(auth: &CodexAuth) -> Result<String, ExecServerError> {
auth.get_token()
.map_err(|err| ExecServerError::CloudEnvironmentAuth(err.to_string()))
.and_then(|token| {
if token.is_empty() {
Err(ExecServerError::CloudEnvironmentAuth(
"cloud environments require a non-empty ChatGPT bearer token".to_string(),
))
} else {
Ok(token)
}
})
}
fn chatgpt_account_id(auth: &CodexAuth) -> Result<String, ExecServerError> {
auth.get_account_id().ok_or_else(|| {
ExecServerError::CloudEnvironmentAuth(
"cloud environments are waiting for a ChatGPT account id".to_string(),
)
})
}
async fn recover_unauthorized(auth_manager: &Arc<AuthManager>) -> bool {
let mut recovery = auth_manager.unauthorized_recovery();
if !recovery.has_next() {
return false;
}
let mode = recovery.mode_name();
let step = recovery.step_name();
match recovery.next().await {
Ok(step_result) => {
info!(
"cloud environment auth recovery succeeded: mode={mode}, step={step}, auth_state_changed={:?}",
step_result.auth_state_changed()
);
true
}
Err(err) => {
warn!("cloud environment auth recovery failed: mode={mode}, step={step}: {err}");
false
}
}
}
#[derive(Deserialize)]
struct CloudErrorBody {
error: Option<CloudError>,
}
#[derive(Deserialize)]
struct CloudError {
code: Option<String>,
message: Option<String>,
}
fn normalize_base_url(base_url: String) -> Result<String, ExecServerError> {
let trimmed = base_url.trim().trim_end_matches('/').to_string();
if trimmed.is_empty() {
return Err(ExecServerError::CloudEnvironmentConfig(
"cloud environments base URL is required".to_string(),
));
}
Ok(trimmed)
}
fn endpoint_url(base_url: &str, path: &str) -> String {
format!("{base_url}/{}", path.trim_start_matches('/'))
}
fn cloud_http_error(status: StatusCode, body: &str) -> ExecServerError {
let parsed = serde_json::from_str::<CloudErrorBody>(body).ok();
let (code, message) = parsed
.and_then(|body| body.error)
.map(|error| {
(
error.code,
error.message.unwrap_or_else(|| {
preview_error_body(body).unwrap_or_else(|| "empty error body".to_string())
}),
)
})
.unwrap_or_else(|| {
(
None,
preview_error_body(body)
.unwrap_or_else(|| "empty or malformed error body".to_string()),
)
});
ExecServerError::CloudEnvironmentHttp {
status,
code,
message,
}
}
fn preview_error_body(body: &str) -> Option<String> {
let trimmed = body.trim();
if trimmed.is_empty() {
return None;
}
Some(trimmed.chars().take(ERROR_BODY_PREVIEW_BYTES).collect())
}
#[cfg(test)]
mod tests {
use codex_login::CodexAuth;
use pretty_assertions::assert_eq;
use serde_json::json;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::body_json;
use wiremock::matchers::header;
use wiremock::matchers::method;
use wiremock::matchers::path;
use super::*;
fn auth_manager() -> Arc<AuthManager> {
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing())
}
#[tokio::test]
async fn register_executor_posts_with_chatgpt_auth_headers() {
let server = MockServer::start().await;
let registration_id = Uuid::from_u128(1);
let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing();
let request = RemoteExecutorConfig::new(server.uri())
.registration_request(&auth, registration_id)
.expect("registration request");
let expected_request = serde_json::to_value(&request).expect("serialize request");
Mock::given(method("POST"))
.and(path("/api/cloud/executor"))
.and(header("authorization", "Bearer Access Token"))
.and(header("chatgpt-account-id", "account_id"))
.and(body_json(expected_request))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"id": "exec-1",
"environment_id": "env-1",
"url": "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc"
})))
.mount(&server)
.await;
let client = CloudEnvironmentClient::new(server.uri(), auth_manager()).expect("client");
let response = client
.register_executor(&request)
.await
.expect("register executor");
assert_eq!(
response,
CloudEnvironmentExecutorRegistrationResponse {
id: "exec-1".to_string(),
environment_id: "env-1".to_string(),
url: "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc".to_string(),
}
);
}
}

View File

@@ -1,5 +1,6 @@
mod client;
mod client_api;
mod cloud;
mod connection;
mod environment;
mod fs_helper;
@@ -24,6 +25,8 @@ pub use client::http_client::ReqwestHttpClient;
pub use client_api::ExecServerClientConnectOptions;
pub use client_api::HttpClient;
pub use client_api::RemoteExecServerConnectArgs;
pub use cloud::RemoteExecutorConfig;
pub use cloud::run_remote_executor;
pub use codex_file_system::CopyOptions;
pub use codex_file_system::CreateDirectoryOptions;
pub use codex_file_system::ExecutorFileSystem;

View File

@@ -7,6 +7,7 @@ mod session_registry;
mod transport;
pub(crate) use handler::ExecServerHandler;
pub(crate) use processor::ConnectionProcessor;
pub use transport::DEFAULT_LISTEN_URL;
pub use transport::ExecServerListenUrlParseError;