Rename remote registry contract to environment

This commit is contained in:
Michael Zeng
2026-05-17 02:35:40 -07:00
parent 3094e2b6b0
commit 4fabb56c82
7 changed files with 97 additions and 91 deletions

View File

@@ -480,15 +480,15 @@ struct ExecServerCommand {
#[arg(long = "listen", value_name = "URL", conflicts_with = "remote")]
listen: Option<String>,
/// Register this exec-server as a remote executor using the given base URL.
#[arg(long = "remote", value_name = "URL", requires = "executor_id")]
/// Register this exec-server as a remote environment using the given base URL.
#[arg(long = "remote", value_name = "URL", requires = "environment_id")]
remote: Option<String>,
/// Executor id to attach to when registering remotely.
#[arg(long = "executor-id", value_name = "ID")]
executor_id: Option<String>,
/// Environment id to attach to when registering remotely.
#[arg(long = "environment-id", value_name = "ID")]
environment_id: Option<String>,
/// Human-readable executor name.
/// Human-readable environment name.
#[arg(long = "name", value_name = "NAME")]
name: Option<String>,
@@ -1509,21 +1509,24 @@ async fn run_exec_server_command(
arg0_paths.codex_linux_sandbox_exe.clone(),
)?;
if let Some(base_url) = cmd.remote {
let executor_id = cmd
.executor_id
.ok_or_else(|| anyhow::anyhow!("--executor-id is required when --remote is set"))?;
let environment_id = cmd
.environment_id
.ok_or_else(|| anyhow::anyhow!("--environment-id is required when --remote is set"))?;
let auth_provider = load_exec_server_remote_auth_provider(
root_config_overrides,
config_profile,
cmd.use_agent_identity_auth,
)
.await?;
let mut remote_config =
codex_exec_server::RemoteExecutorConfig::new(base_url, executor_id, auth_provider)?;
let mut remote_config = codex_exec_server::RemoteEnvironmentConfig::new(
base_url,
environment_id,
auth_provider,
)?;
if let Some(name) = cmd.name {
remote_config.name = name;
}
codex_exec_server::run_remote_executor(remote_config, runtime_paths).await?;
codex_exec_server::run_remote_environment(remote_config, runtime_paths).await?;
return Ok(());
}
let listen_url = cmd

View File

@@ -261,18 +261,18 @@ pub enum ExecServerError {
Protocol(String),
#[error("exec-server rejected request ({code}): {message}")]
Server { code: i64, message: String },
#[error("executor registry request failed ({status}{code_suffix}): {message}", code_suffix = .code.as_ref().map(|code| format!(", {code}")).unwrap_or_default())]
ExecutorRegistryHttp {
#[error("environment registry request failed ({status}{code_suffix}): {message}", code_suffix = .code.as_ref().map(|code| format!(", {code}")).unwrap_or_default())]
EnvironmentRegistryHttp {
status: reqwest::StatusCode,
code: Option<String>,
message: String,
},
#[error("executor registry configuration error: {0}")]
ExecutorRegistryConfig(String),
#[error("executor registry authentication error: {0}")]
ExecutorRegistryAuth(String),
#[error("executor registry request failed: {0}")]
ExecutorRegistryRequest(#[from] reqwest::Error),
#[error("environment registry configuration error: {0}")]
EnvironmentRegistryConfig(String),
#[error("environment registry authentication error: {0}")]
EnvironmentRegistryAuth(String),
#[error("environment registry request failed: {0}")]
EnvironmentRegistryRequest(#[from] reqwest::Error),
}
impl ExecServerClient {

View File

@@ -91,8 +91,8 @@ pub use protocol::TerminateResponse;
pub use protocol::WriteParams;
pub use protocol::WriteResponse;
pub use protocol::WriteStatus;
pub use remote::RemoteExecutorConfig;
pub use remote::run_remote_executor;
pub use remote::RemoteEnvironmentConfig;
pub use remote::run_remote_environment;
pub use runtime_paths::ExecServerRuntimePaths;
pub use server::DEFAULT_LISTEN_URL;
pub use server::ExecServerListenUrlParseError;

View File

@@ -24,7 +24,7 @@ pub struct StartedExecProcess {
/// stdout, stderr, or pty bytes. `Exited` reports the process exit status, while
/// `Closed` means all output streams have ended and no more output events will
/// arrive. `Failed` is used when the process session cannot continue, for
/// example because the remote executor connection disconnected.
/// example because the remote environment connection disconnected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecProcessEvent {
Output(ProcessOutputChunk),

View File

@@ -333,7 +333,7 @@ pub(crate) async fn run_multiplexed_executor<S>(
continue;
}
Some(Err(err)) => {
debug!("multiplexed executor websocket read failed: {err}");
debug!("multiplexed environment websocket read failed: {err}");
break;
}
};

View File

@@ -17,22 +17,22 @@ use crate::server::ConnectionProcessor;
const ERROR_BODY_PREVIEW_BYTES: usize = 4096;
#[derive(Clone)]
struct ExecutorRegistryClient {
struct EnvironmentRegistryClient {
base_url: String,
auth_provider: SharedAuthProvider,
http: reqwest::Client,
}
impl std::fmt::Debug for ExecutorRegistryClient {
impl std::fmt::Debug for EnvironmentRegistryClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ExecutorRegistryClient")
f.debug_struct("EnvironmentRegistryClient")
.field("base_url", &self.base_url)
.field("auth_provider", &"<redacted>")
.finish_non_exhaustive()
}
}
impl ExecutorRegistryClient {
impl EnvironmentRegistryClient {
fn new(base_url: String, auth_provider: SharedAuthProvider) -> Result<Self, ExecServerError> {
let base_url = normalize_base_url(base_url)?;
Ok(Self {
@@ -42,15 +42,15 @@ impl ExecutorRegistryClient {
})
}
async fn register_executor(
async fn register_environment(
&self,
executor_id: &str,
) -> Result<ExecutorRegistryExecutorRegistrationResponse, ExecServerError> {
environment_id: &str,
) -> Result<EnvironmentRegistryEnvironmentRegistrationResponse, ExecServerError> {
let response = self
.http
.post(endpoint_url(
&self.base_url,
&format!("/cloud/executor/{executor_id}/register"),
&format!("/cloud/environment/{environment_id}/register"),
))
.headers(self.auth_provider.to_auth_headers())
.send()
@@ -72,49 +72,49 @@ impl ExecutorRegistryClient {
let status = response.status();
let body = response.text().await.unwrap_or_default();
if matches!(status, StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN) {
return Err(executor_registry_auth_error(status, &body));
return Err(environment_registry_auth_error(status, &body));
}
Err(executor_registry_http_error(status, &body))
Err(environment_registry_http_error(status, &body))
}
}
#[derive(Debug, Clone, Eq, PartialEq, Deserialize)]
struct ExecutorRegistryExecutorRegistrationResponse {
executor_id: String,
struct EnvironmentRegistryEnvironmentRegistrationResponse {
environment_id: String,
url: String,
}
/// Configuration for registering an exec-server for remote use.
#[derive(Clone)]
pub struct RemoteExecutorConfig {
pub struct RemoteEnvironmentConfig {
pub base_url: String,
pub executor_id: String,
pub environment_id: String,
pub name: String,
auth_provider: SharedAuthProvider,
}
impl std::fmt::Debug for RemoteExecutorConfig {
impl std::fmt::Debug for RemoteEnvironmentConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RemoteExecutorConfig")
f.debug_struct("RemoteEnvironmentConfig")
.field("base_url", &self.base_url)
.field("executor_id", &self.executor_id)
.field("environment_id", &self.environment_id)
.field("name", &self.name)
.field("auth_provider", &"<redacted>")
.finish()
}
}
impl RemoteExecutorConfig {
impl RemoteEnvironmentConfig {
pub fn new(
base_url: String,
executor_id: String,
environment_id: String,
auth_provider: SharedAuthProvider,
) -> Result<Self, ExecServerError> {
let executor_id = normalize_executor_id(executor_id)?;
let environment_id = normalize_environment_id(environment_id)?;
Ok(Self {
base_url,
executor_id,
environment_id,
name: "codex-exec-server".to_string(),
auth_provider,
})
@@ -123,21 +123,21 @@ impl RemoteExecutorConfig {
/// Register an exec-server for remote use and serve requests over the returned
/// rendezvous websocket.
pub async fn run_remote_executor(
config: RemoteExecutorConfig,
pub async fn run_remote_environment(
config: RemoteEnvironmentConfig,
runtime_paths: ExecServerRuntimePaths,
) -> Result<(), ExecServerError> {
ensure_rustls_crypto_provider();
let client =
ExecutorRegistryClient::new(config.base_url.clone(), config.auth_provider.clone())?;
EnvironmentRegistryClient::new(config.base_url.clone(), config.auth_provider.clone())?;
let processor = ConnectionProcessor::new(runtime_paths);
let mut backoff = Duration::from_secs(1);
loop {
let response = client.register_executor(&config.executor_id).await?;
let response = client.register_environment(&config.environment_id).await?;
eprintln!(
"codex exec-server remote executor registered with executor_id {}",
response.executor_id
"codex exec-server remote environment registered with environment_id {}",
response.environment_id
);
match connect_async(response.url.as_str()).await {
@@ -155,14 +155,14 @@ pub async fn run_remote_executor(
}
}
fn normalize_executor_id(executor_id: String) -> Result<String, ExecServerError> {
let executor_id = executor_id.trim().to_string();
if executor_id.is_empty() {
return Err(ExecServerError::ExecutorRegistryConfig(
"executor id is required for remote exec-server registration".to_string(),
fn normalize_environment_id(environment_id: String) -> Result<String, ExecServerError> {
let environment_id = environment_id.trim().to_string();
if environment_id.is_empty() {
return Err(ExecServerError::EnvironmentRegistryConfig(
"environment id is required for remote exec-server registration".to_string(),
));
}
Ok(executor_id)
Ok(environment_id)
}
#[derive(Deserialize)]
@@ -179,8 +179,8 @@ struct RegistryError {
fn normalize_base_url(base_url: String) -> Result<String, ExecServerError> {
let trimmed = base_url.trim().trim_end_matches('/').to_string();
if trimmed.is_empty() {
return Err(ExecServerError::ExecutorRegistryConfig(
"executor registry base URL is required".to_string(),
return Err(ExecServerError::EnvironmentRegistryConfig(
"environment registry base URL is required".to_string(),
));
}
Ok(trimmed)
@@ -190,14 +190,14 @@ fn endpoint_url(base_url: &str, path: &str) -> String {
format!("{base_url}/{}", path.trim_start_matches('/'))
}
fn executor_registry_auth_error(status: StatusCode, body: &str) -> ExecServerError {
fn environment_registry_auth_error(status: StatusCode, body: &str) -> ExecServerError {
let message = registry_error_message(body).unwrap_or_else(|| "empty error body".to_string());
ExecServerError::ExecutorRegistryAuth(format!(
"executor registry authentication failed ({status}): {message}"
ExecServerError::EnvironmentRegistryAuth(format!(
"environment registry authentication failed ({status}): {message}"
))
}
fn executor_registry_http_error(status: StatusCode, body: &str) -> ExecServerError {
fn environment_registry_http_error(status: StatusCode, body: &str) -> ExecServerError {
let parsed = serde_json::from_str::<RegistryErrorBody>(body).ok();
let (code, message) = parsed
.and_then(|body| body.error)
@@ -216,7 +216,7 @@ fn executor_registry_http_error(status: StatusCode, body: &str) -> ExecServerErr
.unwrap_or_else(|| "empty or malformed error body".to_string()),
)
});
ExecServerError::ExecutorRegistryHttp {
ExecServerError::EnvironmentRegistryHttp {
status,
code,
message,
@@ -277,44 +277,45 @@ mod tests {
}
#[tokio::test]
async fn register_executor_posts_with_auth_provider_headers() {
async fn register_environment_posts_with_auth_provider_headers() {
let server = MockServer::start().await;
let config = RemoteExecutorConfig::new(
let config = RemoteEnvironmentConfig::new(
server.uri(),
"exec-requested".to_string(),
static_registry_auth_provider(),
)
.expect("config");
Mock::given(method("POST"))
.and(path("/cloud/executor/exec-requested/register"))
.and(path("/cloud/environment/exec-requested/register"))
.and(header("authorization", "Bearer registry-token"))
.and(header("chatgpt-account-id", "workspace-123"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"executor_id": "exec-1",
"url": "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc"
"environment_id": "exec-1",
"url": "wss://rendezvous.test/environment/exec-1?role=environment&sig=abc"
})))
.mount(&server)
.await;
let client = ExecutorRegistryClient::new(server.uri(), static_registry_auth_provider())
let client = EnvironmentRegistryClient::new(server.uri(), static_registry_auth_provider())
.expect("client");
let response = client
.register_executor(&config.executor_id)
.register_environment(&config.environment_id)
.await
.expect("register executor");
.expect("register environment");
assert_eq!(
response,
ExecutorRegistryExecutorRegistrationResponse {
executor_id: "exec-1".to_string(),
url: "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc".to_string(),
EnvironmentRegistryEnvironmentRegistrationResponse {
environment_id: "exec-1".to_string(),
url: "wss://rendezvous.test/environment/exec-1?role=environment&sig=abc"
.to_string(),
}
);
}
#[test]
fn debug_output_redacts_auth_provider() {
let config = RemoteExecutorConfig::new(
let config = RemoteEnvironmentConfig::new(
"https://registry.example".to_string(),
"exec-1".to_string(),
static_registry_auth_provider(),

View File

@@ -20,7 +20,7 @@ use codex_app_server_protocol::RequestId;
use codex_exec_server::ExecServerRuntimePaths;
use codex_exec_server::InitializeParams;
use codex_exec_server::InitializeResponse;
use codex_exec_server::RemoteExecutorConfig;
use codex_exec_server::RemoteEnvironmentConfig;
use futures::SinkExt;
use futures::StreamExt;
use http::HeaderMap;
@@ -45,7 +45,7 @@ use wiremock::matchers::header;
use wiremock::matchers::method;
use wiremock::matchers::path;
const EXECUTOR_ID: &str = "exec-mux-test";
const ENVIRONMENT_ID: &str = "exec-mux-test";
const REGISTRY_TOKEN: &str = "registry-token";
const RELAY_MESSAGE_FRAME_VERSION: u32 = 1;
const TEST_TIMEOUT: Duration = Duration::from_secs(5);
@@ -67,15 +67,17 @@ fn static_registry_auth_provider() -> codex_api::SharedAuthProvider {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn multiplexed_remote_executor_routes_independent_virtual_streams() -> Result<()> {
async fn multiplexed_remote_environment_routes_independent_virtual_streams() -> Result<()> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let rendezvous_url = format!("ws://{}", listener.local_addr()?);
let registry = MockServer::start().await;
Mock::given(method("POST"))
.and(path(format!("/cloud/executor/{EXECUTOR_ID}/register")))
.and(path(format!(
"/cloud/environment/{ENVIRONMENT_ID}/register"
)))
.and(header("authorization", format!("Bearer {REGISTRY_TOKEN}")))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"executor_id": EXECUTOR_ID,
"environment_id": ENVIRONMENT_ID,
"url": rendezvous_url,
})))
.mount(&registry)
@@ -83,22 +85,22 @@ async fn multiplexed_remote_executor_routes_independent_virtual_streams() -> Res
let (codex_exe, codex_linux_sandbox_exe) = common::current_test_binary_helper_paths()?;
let runtime_paths = ExecServerRuntimePaths::new(codex_exe, codex_linux_sandbox_exe)?;
let config = RemoteExecutorConfig::new(
let config = RemoteEnvironmentConfig::new(
registry.uri(),
EXECUTOR_ID.to_string(),
ENVIRONMENT_ID.to_string(),
static_registry_auth_provider(),
)?;
let remote_executor = tokio::spawn(codex_exec_server::run_remote_executor(
let remote_environment = tokio::spawn(codex_exec_server::run_remote_environment(
config,
runtime_paths,
));
let (socket, _peer_addr) = timeout(TEST_TIMEOUT, listener.accept())
.await
.context("remote executor should connect to fake rendezvous")??;
.context("remote environment should connect to fake rendezvous")??;
let mut websocket = timeout(TEST_TIMEOUT, accept_async(socket))
.await
.context("fake rendezvous should accept executor websocket")??;
.context("fake rendezvous should accept environment websocket")??;
let stream_a = "stream-a";
let stream_b = "stream-b";
@@ -192,8 +194,8 @@ async fn multiplexed_remote_executor_routes_independent_virtual_streams() -> Res
)?;
websocket.close(None).await?;
remote_executor.abort();
let _ = remote_executor.await;
remote_environment.abort();
let _ = remote_environment.await;
Ok(())
}
@@ -285,7 +287,7 @@ where
let frame = timeout(TEST_TIMEOUT, websocket.next())
.await
.context("timed out waiting for relay frame")?
.ok_or_else(|| anyhow!("executor websocket closed"))??;
.ok_or_else(|| anyhow!("environment websocket closed"))??;
match frame {
Message::Binary(bytes) => {
let frame = RelayMessageFrame::decode(bytes.as_ref())?;
@@ -297,8 +299,8 @@ where
return Ok((stream_id, message));
}
Message::Ping(_) | Message::Pong(_) => {}
Message::Close(_) => bail!("executor websocket closed"),
Message::Text(_) => bail!("executor sent text frame on relay websocket"),
Message::Close(_) => bail!("environment websocket closed"),
Message::Text(_) => bail!("environment sent text frame on relay websocket"),
Message::Frame(_) => {}
}
}