feat(exec-server): support executor registry environments

This commit is contained in:
Michael Zeng
2026-05-06 01:03:52 -07:00
parent f9a907aebe
commit c07b2cb9c7
2 changed files with 102 additions and 78 deletions

View File

@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use crate::ExecServerError;
use crate::ExecServerRuntimePaths;
@@ -36,7 +37,7 @@ pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
#[derive(Debug)]
pub struct EnvironmentManager {
default_environment: Option<String>,
environments: HashMap<String, Arc<Environment>>,
environments: RwLock<HashMap<String, Arc<Environment>>>,
local_environment: Arc<Environment>,
}
@@ -61,10 +62,10 @@ impl EnvironmentManager {
pub fn default_for_tests() -> Self {
Self {
default_environment: Some(LOCAL_ENVIRONMENT_ID.to_string()),
environments: HashMap::from([(
environments: RwLock::new(HashMap::from([(
LOCAL_ENVIRONMENT_ID.to_string(),
Arc::new(Environment::default_for_tests()),
)]),
)])),
local_environment: Arc::new(Environment::default_for_tests()),
}
}
@@ -160,7 +161,7 @@ impl EnvironmentManager {
Self {
default_environment,
environments,
environments: RwLock::new(environments),
local_environment,
}
}
@@ -184,7 +185,45 @@ impl EnvironmentManager {
/// Returns a named environment instance.
pub fn get_environment(&self, environment_id: &str) -> Option<Arc<Environment>> {
self.environments.get(environment_id).cloned()
self.environments
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.get(environment_id)
.cloned()
}
/// Adds or replaces a named remote environment without changing the
/// manager's default environment selection.
pub fn upsert_remote_environment(
&self,
environment_id: String,
exec_server_url: String,
) -> Result<(), ExecServerError> {
if environment_id.is_empty() {
return Err(ExecServerError::Protocol(
"environment id cannot be empty".to_string(),
));
}
let (exec_server_url, disabled) = normalize_exec_server_url(Some(exec_server_url));
if disabled {
return Err(ExecServerError::Protocol(
"remote environment cannot use disabled exec-server url".to_string(),
));
}
let Some(exec_server_url) = exec_server_url else {
return Err(ExecServerError::Protocol(
"remote environment requires an exec-server url".to_string(),
));
};
let environment = Environment::remote_inner(
exec_server_url,
self.local_environment.local_runtime_paths.clone(),
);
self.environments
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.insert(environment_id, Arc::new(environment));
Ok(())
}
}
@@ -528,6 +567,45 @@ mod tests {
assert!(manager.get_environment("does-not-exist").is_none());
}
#[tokio::test]
async fn environment_manager_upserts_named_remote_environment() {
let manager = EnvironmentManager::disabled_for_tests(test_runtime_paths());
manager
.upsert_remote_environment("executor-a".to_string(), "ws://127.0.0.1:8765".to_string())
.expect("remote environment");
let first = manager
.get_environment("executor-a")
.expect("first remote environment");
assert!(first.is_remote());
assert_eq!(first.exec_server_url(), Some("ws://127.0.0.1:8765"));
assert_eq!(manager.default_environment_id(), None);
manager
.upsert_remote_environment("executor-a".to_string(), "ws://127.0.0.1:9876".to_string())
.expect("updated remote environment");
let second = manager
.get_environment("executor-a")
.expect("second remote environment");
assert!(second.is_remote());
assert_eq!(second.exec_server_url(), Some("ws://127.0.0.1:9876"));
assert!(!Arc::ptr_eq(&first, &second));
}
#[tokio::test]
async fn environment_manager_rejects_empty_remote_environment_url() {
let manager = EnvironmentManager::disabled_for_tests(test_runtime_paths());
let err = manager
.upsert_remote_environment("executor-a".to_string(), String::new())
.expect_err("empty URL should fail");
assert_eq!(
err.to_string(),
"exec-server protocol error: remote environment requires an exec-server url"
);
}
#[tokio::test]
async fn default_environment_has_ready_local_executor() {
let environment = Environment::default_for_tests();

View File

@@ -1,16 +1,11 @@
use std::collections::BTreeMap;
use std::env;
use std::time::Duration;
use reqwest::StatusCode;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use sha2::Digest as _;
use tokio::time::sleep;
use tokio_tungstenite::connect_async;
use tracing::warn;
use uuid::Uuid;
use crate::ExecServerError;
use crate::ExecServerRuntimePaths;
@@ -20,7 +15,6 @@ use crate::server::ConnectionProcessor;
pub const CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR: &str =
"CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN";
const PROTOCOL_VERSION: &str = "codex-exec-server-v1";
const ERROR_BODY_PREVIEW_BYTES: usize = 4096;
#[derive(Clone)]
@@ -51,28 +45,27 @@ impl ExecutorRegistryClient {
async fn register_executor(
&self,
request: &ExecutorRegistryRegisterExecutorRequest,
executor_id: &str,
) -> Result<ExecutorRegistryExecutorRegistrationResponse, ExecServerError> {
self.post_json(
&format!("/cloud/executor/{}/register", request.executor_id),
request,
)
.await
}
async fn post_json<T, R>(&self, path: &str, request: &T) -> Result<R, ExecServerError>
where
T: Serialize + Sync,
R: for<'de> Deserialize<'de>,
{
let response = self
.http
.post(endpoint_url(&self.base_url, path))
.post(endpoint_url(
&self.base_url,
&format!("/cloud/executor/{executor_id}/register"),
))
.bearer_auth(&self.bearer_token)
.json(request)
.send()
.await?;
self.parse_json_response(response).await
}
async fn parse_json_response<R>(
&self,
response: reqwest::Response,
) -> Result<R, ExecServerError>
where
R: for<'de> Deserialize<'de>,
{
if response.status().is_success() {
return response.json::<R>().await.map_err(ExecServerError::from);
}
@@ -87,19 +80,8 @@ impl ExecutorRegistryClient {
}
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
struct ExecutorRegistryRegisterExecutorRequest {
idempotency_id: String,
executor_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
name: Option<String>,
labels: BTreeMap<String, String>,
metadata: Value,
}
#[derive(Debug, Clone, Eq, PartialEq, Deserialize)]
struct ExecutorRegistryExecutorRegistrationResponse {
id: String,
executor_id: String,
url: String,
}
@@ -143,32 +125,6 @@ impl RemoteExecutorConfig {
bearer_token,
})
}
fn registration_request(
&self,
registration_id: Uuid,
) -> ExecutorRegistryRegisterExecutorRequest {
ExecutorRegistryRegisterExecutorRequest {
idempotency_id: self.default_idempotency_id(registration_id),
executor_id: self.executor_id.clone(),
name: Some(self.name.clone()),
labels: BTreeMap::new(),
metadata: Value::Object(Default::default()),
}
}
fn default_idempotency_id(&self, registration_id: Uuid) -> String {
let mut hasher = sha2::Sha256::new();
hasher.update(self.executor_id.as_bytes());
hasher.update(b"\0");
hasher.update(self.name.as_bytes());
hasher.update(b"\0");
hasher.update(PROTOCOL_VERSION);
hasher.update(b"\0");
hasher.update(registration_id.as_bytes());
let digest = hasher.finalize();
format!("codex-exec-server-{digest:x}")
}
}
/// Register an exec-server for remote use and serve requests over the returned
@@ -179,15 +135,13 @@ pub async fn run_remote_executor(
) -> Result<(), ExecServerError> {
let client = ExecutorRegistryClient::new(config.base_url.clone(), config.bearer_token.clone())?;
let processor = ConnectionProcessor::new(runtime_paths);
let registration_id = Uuid::new_v4();
let mut backoff = Duration::from_secs(1);
loop {
let request = config.registration_request(registration_id);
let response = client.register_executor(&request).await?;
let response = client.register_executor(&config.executor_id).await?;
eprintln!(
"codex exec-server remote executor {} registered with executor_id {}",
response.id, response.executor_id
"codex exec-server remote executor registered with executor_id {}",
response.executor_id
);
match connect_async(response.url.as_str()).await {
@@ -323,11 +277,9 @@ fn preview_error_body(body: &str) -> Option<String> {
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use serde_json::json;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::body_json;
use wiremock::matchers::header;
use wiremock::matchers::method;
use wiremock::matchers::path;
@@ -337,21 +289,16 @@ mod tests {
#[tokio::test]
async fn register_executor_posts_with_bearer_token_header() {
let server = MockServer::start().await;
let registration_id = Uuid::from_u128(1);
let config = RemoteExecutorConfig::with_bearer_token(
server.uri(),
"exec-requested".to_string(),
"registry-token".to_string(),
)
.expect("config");
let request = config.registration_request(registration_id);
let expected_request = serde_json::to_value(&request).expect("serialize request");
Mock::given(method("POST"))
.and(path("/cloud/executor/exec-requested/register"))
.and(header("authorization", "Bearer registry-token"))
.and(body_json(expected_request))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"id": "registration-1",
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"executor_id": "exec-1",
"url": "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc"
})))
@@ -361,14 +308,13 @@ mod tests {
.expect("client");
let response = client
.register_executor(&request)
.register_executor(&config.executor_id)
.await
.expect("register executor");
assert_eq!(
response,
ExecutorRegistryExecutorRegistrationResponse {
id: "registration-1".to_string(),
executor_id: "exec-1".to_string(),
url: "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc".to_string(),
}