[codex] support executor registry remote environments (#21323)

## Summary

Support registry-backed remote executors end to end so downstream
services can resolve an executor id into an exec-server URL and make
that environment available to Codex without relying on the legacy cloud
environments flow.

## What changed

- switch remote executor registration to the executor registry bootstrap
contract
- allow named remote environments to be inserted into
`EnvironmentManager` at runtime
- add the experimental app-server RPC `environment/add` so initialized
experimental clients can register those remote environments for later
`thread/start` and `turn/start` selection

## Validation

Ran focused validation locally:
- `cargo test -p codex-exec-server environment_manager_`
- `cargo test -p codex-exec-server
register_executor_posts_with_bearer_token_header`
- `cargo test -p codex-app-server-protocol`
This commit is contained in:
Michael Zeng
2026-05-08 16:30:07 -07:00
committed by GitHub
parent 80a408e201
commit 8f4020846e
11 changed files with 222 additions and 85 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -2737,7 +2737,6 @@ dependencies = [
"serde",
"serde_json",
"serial_test",
"sha2",
"tempfile",
"test-case",
"thiserror 2.0.18",

View File

@@ -808,6 +808,13 @@ client_request_definitions! {
serialization: None,
response: v2::MockExperimentalMethodResponse,
},
#[experimental("environment/add")]
/// Adds or replaces a remote environment by id for later selection.
EnvironmentAdd => "environment/add" {
params: v2::EnvironmentAddParams,
serialization: global("environment"),
response: v2::EnvironmentAddResponse,
},
McpServerOauthLogin => "mcpServer/oauth/login" {
params: v2::McpServerOauthLoginParams,
@@ -1796,6 +1803,18 @@ mod tests {
add_credits_nudge.serialization_scope(),
Some(ClientRequestSerializationScope::Global("account-auth"))
);
let environment_add = ClientRequest::EnvironmentAdd {
request_id: request_id(),
params: v2::EnvironmentAddParams {
environment_id: "remote-a".to_string(),
exec_server_url: "ws://127.0.0.1:8765".to_string(),
},
};
assert_eq!(
environment_add.serialization_scope(),
Some(ClientRequestSerializationScope::Global("environment"))
);
}
#[test]
@@ -2578,10 +2597,33 @@ mod tests {
Ok(())
}
#[test]
fn serialize_environment_add() -> Result<()> {
let request = ClientRequest::EnvironmentAdd {
request_id: RequestId::Integer(9),
params: v2::EnvironmentAddParams {
environment_id: "remote-a".to_string(),
exec_server_url: "ws://127.0.0.1:8765".to_string(),
},
};
assert_eq!(
json!({
"method": "environment/add",
"id": 9,
"params": {
"environmentId": "remote-a",
"execServerUrl": "ws://127.0.0.1:8765"
}
}),
serde_json::to_value(&request)?,
);
Ok(())
}
#[test]
fn serialize_fs_get_metadata() -> Result<()> {
let request = ClientRequest::FsGetMetadata {
request_id: RequestId::Integer(9),
request_id: RequestId::Integer(10),
params: v2::FsGetMetadataParams {
path: absolute_path("tmp/example"),
},
@@ -2589,7 +2631,7 @@ mod tests {
assert_eq!(
json!({
"method": "fs/getMetadata",
"id": 9,
"id": 10,
"params": {
"path": absolute_path_string("tmp/example")
}
@@ -2850,6 +2892,19 @@ mod tests {
assert_eq!(reason, Some("mock/experimentalMethod"));
}
#[test]
fn environment_add_is_marked_experimental() {
let request = ClientRequest::EnvironmentAdd {
request_id: RequestId::Integer(1),
params: v2::EnvironmentAddParams {
environment_id: "remote-a".to_string(),
exec_server_url: "ws://127.0.0.1:8765".to_string(),
},
};
let reason = crate::experimental_api::ExperimentalApi::experimental_reason(&request);
assert_eq!(reason, Some("environment/add"));
}
#[test]
fn command_exec_permission_profile_is_marked_experimental() {
let request = ClientRequest::OneOffCommandExec {

View File

@@ -0,0 +1,17 @@
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use ts_rs::TS;
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct EnvironmentAddParams {
pub environment_id: String,
pub exec_server_url: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct EnvironmentAddResponse {}

View File

@@ -6,6 +6,7 @@ mod attestation;
mod collaboration_mode;
mod command_exec;
mod config;
mod environment;
mod experimental_feature;
mod feedback;
mod fs;
@@ -31,6 +32,7 @@ pub use attestation::*;
pub use collaboration_mode::*;
pub use command_exec::*;
pub use config::*;
pub use environment::*;
pub use experimental_feature::*;
pub use feedback::*;
pub use fs::*;

View File

@@ -202,6 +202,7 @@ Example with notification opt-out:
- `modelProvider/capabilities/read` — read provider-level capabilities for the currently configured model provider.
- `experimentalFeature/list` — list feature flags with stage metadata (`beta`, `underDevelopment`, `stable`, etc.), enabled/default-enabled state, and cursor pagination. For non-beta flags, `displayName`/`description`/`announcement` are `null`.
- `experimentalFeature/enablement/set` — patch the in-memory process-wide runtime feature enablement for the currently supported feature keys (`apps`, `memories`, `plugins`, `remote_control`, `tool_search`, `tool_suggest`, `tool_call_mcp_elicitation`). For each feature, precedence is: cloud requirements > --enable <feature_name> > config.toml > experimentalFeature/enablement/set (new) > code default.
- `environment/add` — experimental; add or replace a named remote environment by `environmentId` and `execServerUrl` for later selection by `thread/start` or `turn/start`; returns `{}` and does not change the default environment.
- `collaborationMode/list` — list available collaboration mode presets (experimental, no pagination). Built-in presets do not select a model; the Plan preset selects medium reasoning effort. This response omits built-in developer instructions; clients should either pass `settings.developer_instructions: null` when setting a mode to use Codex's built-in instructions, or provide their own instructions explicitly.
- `skills/list` — list skills for one or more `cwd` values (optional `forceReload`).
- `hooks/list` — list discovered hooks for one or more `cwd` values.

View File

@@ -18,6 +18,7 @@ use crate::request_processors::AppsRequestProcessor;
use crate::request_processors::CatalogRequestProcessor;
use crate::request_processors::CommandExecRequestProcessor;
use crate::request_processors::ConfigRequestProcessor;
use crate::request_processors::EnvironmentRequestProcessor;
use crate::request_processors::ExternalAgentConfigRequestProcessor;
use crate::request_processors::FeedbackRequestProcessor;
use crate::request_processors::FsRequestProcessor;
@@ -161,6 +162,7 @@ pub(crate) struct MessageProcessor {
command_exec_processor: CommandExecRequestProcessor,
process_exec_processor: ProcessExecRequestProcessor,
config_processor: ConfigRequestProcessor,
environment_processor: EnvironmentRequestProcessor,
external_agent_config_processor: ExternalAgentConfigRequestProcessor,
feedback_processor: FeedbackRequestProcessor,
fs_processor: FsRequestProcessor,
@@ -446,6 +448,8 @@ impl MessageProcessor {
arg0_paths,
config.codex_home.to_path_buf(),
);
let environment_processor =
EnvironmentRequestProcessor::new(thread_manager.environment_manager());
let fs_processor = FsRequestProcessor::new(
thread_manager
.environment_manager()
@@ -467,6 +471,7 @@ impl MessageProcessor {
command_exec_processor,
process_exec_processor,
config_processor,
environment_processor,
external_agent_config_processor,
feedback_processor,
fs_processor,
@@ -878,6 +883,9 @@ impl MessageProcessor {
.config_requirements_read()
.await
.map(|response| Some(response.into())),
ClientRequest::EnvironmentAdd { params, .. } => {
self.environment_processor.environment_add(params).await
}
ClientRequest::FsReadFile { params, .. } => self
.fs_processor
.read_file(params)

View File

@@ -49,6 +49,8 @@ use codex_app_server_protocol::ConversationGitInfo;
use codex_app_server_protocol::ConversationSummary;
use codex_app_server_protocol::DeprecationNoticeNotification;
use codex_app_server_protocol::DynamicToolSpec as ApiDynamicToolSpec;
use codex_app_server_protocol::EnvironmentAddParams;
use codex_app_server_protocol::EnvironmentAddResponse;
use codex_app_server_protocol::ExperimentalFeature as ApiExperimentalFeature;
use codex_app_server_protocol::ExperimentalFeatureListParams;
use codex_app_server_protocol::ExperimentalFeatureListResponse;
@@ -434,6 +436,7 @@ mod apps_processor;
mod catalog_processor;
mod command_exec_processor;
mod config_processor;
mod environment_processor;
mod external_agent_config_processor;
mod feedback_processor;
mod fs_processor;
@@ -454,6 +457,7 @@ pub(crate) use apps_processor::AppsRequestProcessor;
pub(crate) use catalog_processor::CatalogRequestProcessor;
pub(crate) use command_exec_processor::CommandExecRequestProcessor;
pub(crate) use config_processor::ConfigRequestProcessor;
pub(crate) use environment_processor::EnvironmentRequestProcessor;
pub(crate) use external_agent_config_processor::ExternalAgentConfigRequestProcessor;
pub(crate) use feedback_processor::FeedbackRequestProcessor;
pub(crate) use fs_processor::FsRequestProcessor;

View File

@@ -0,0 +1,24 @@
use super::*;
#[derive(Clone)]
pub(crate) struct EnvironmentRequestProcessor {
environment_manager: Arc<EnvironmentManager>,
}
impl EnvironmentRequestProcessor {
pub(crate) fn new(environment_manager: Arc<EnvironmentManager>) -> Self {
Self {
environment_manager,
}
}
pub(crate) async fn environment_add(
&self,
params: EnvironmentAddParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
self.environment_manager
.upsert_environment(params.environment_id, params.exec_server_url)
.map_err(|err| invalid_request(err.to_string()))?;
Ok(Some(EnvironmentAddResponse {}.into()))
}
}

View File

@@ -26,7 +26,6 @@ futures = { workspace = true }
reqwest = { workspace = true, features = ["json", "rustls-tls", "stream"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sha2 = { workspace = true }
thiserror = { workspace = true }
toml = { workspace = true }
tokio = { workspace = true, features = [

View File

@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use crate::ExecServerError;
use crate::ExecServerRuntimePaths;
@@ -40,7 +41,7 @@ pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
#[derive(Debug)]
pub struct EnvironmentManager {
default_environment: Option<String>,
environments: HashMap<String, Arc<Environment>>,
environments: RwLock<HashMap<String, Arc<Environment>>>,
local_environment: Arc<Environment>,
}
@@ -65,10 +66,10 @@ impl EnvironmentManager {
pub fn default_for_tests() -> Self {
Self {
default_environment: Some(LOCAL_ENVIRONMENT_ID.to_string()),
environments: HashMap::from([(
environments: RwLock::new(HashMap::from([(
LOCAL_ENVIRONMENT_ID.to_string(),
Arc::new(Environment::default_for_tests()),
)]),
)])),
local_environment: Arc::new(Environment::default_for_tests()),
}
}
@@ -77,7 +78,7 @@ impl EnvironmentManager {
pub fn disabled_for_tests(local_runtime_paths: ExecServerRuntimePaths) -> Self {
Self {
default_environment: None,
environments: HashMap::new(),
environments: RwLock::new(HashMap::new()),
local_environment: Arc::new(Environment::local(local_runtime_paths)),
}
}
@@ -196,7 +197,7 @@ impl EnvironmentManager {
};
Ok(Self {
default_environment,
environments: environment_map,
environments: RwLock::new(environment_map),
local_environment,
})
}
@@ -218,10 +219,14 @@ impl EnvironmentManager {
let Some(default_environment_id) = self.default_environment.as_ref() else {
return Vec::new();
};
let mut environment_ids = Vec::with_capacity(self.environments.len());
let environments = self
.environments
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let mut environment_ids = Vec::with_capacity(environments.len());
environment_ids.push(default_environment_id.clone());
environment_ids.extend(
self.environments
environments
.keys()
.filter(|environment_id| *environment_id != default_environment_id)
.cloned(),
@@ -236,7 +241,45 @@ impl EnvironmentManager {
/// Returns a named environment instance.
pub fn get_environment(&self, environment_id: &str) -> Option<Arc<Environment>> {
self.environments.get(environment_id).cloned()
self.environments
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.get(environment_id)
.cloned()
}
/// Adds or replaces a named remote environment without changing the
/// manager's default environment selection.
pub fn upsert_environment(
&self,
environment_id: String,
exec_server_url: String,
) -> Result<(), ExecServerError> {
if environment_id.is_empty() {
return Err(ExecServerError::Protocol(
"environment id cannot be empty".to_string(),
));
}
let (exec_server_url, disabled) = normalize_exec_server_url(Some(exec_server_url));
if disabled {
return Err(ExecServerError::Protocol(
"remote environment cannot use disabled exec-server url".to_string(),
));
}
let Some(exec_server_url) = exec_server_url else {
return Err(ExecServerError::Protocol(
"remote environment requires an exec-server url".to_string(),
));
};
let environment = Environment::remote_inner(
exec_server_url,
self.local_environment.local_runtime_paths.clone(),
);
self.environments
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.insert(environment_id, Arc::new(environment));
Ok(())
}
}
@@ -717,6 +760,45 @@ mod tests {
assert!(manager.get_environment("does-not-exist").is_none());
}
#[tokio::test]
async fn environment_manager_upserts_named_remote_environment() {
let manager = EnvironmentManager::disabled_for_tests(test_runtime_paths());
manager
.upsert_environment("executor-a".to_string(), "ws://127.0.0.1:8765".to_string())
.expect("remote environment");
let first = manager
.get_environment("executor-a")
.expect("first remote environment");
assert!(first.is_remote());
assert_eq!(first.exec_server_url(), Some("ws://127.0.0.1:8765"));
assert_eq!(manager.default_environment_id(), None);
manager
.upsert_environment("executor-a".to_string(), "ws://127.0.0.1:9876".to_string())
.expect("updated remote environment");
let second = manager
.get_environment("executor-a")
.expect("second remote environment");
assert!(second.is_remote());
assert_eq!(second.exec_server_url(), Some("ws://127.0.0.1:9876"));
assert!(!Arc::ptr_eq(&first, &second));
}
#[tokio::test]
async fn environment_manager_rejects_empty_remote_environment_url() {
let manager = EnvironmentManager::disabled_for_tests(test_runtime_paths());
let err = manager
.upsert_environment("executor-a".to_string(), String::new())
.expect_err("empty URL should fail");
assert_eq!(
err.to_string(),
"exec-server protocol error: remote environment requires an exec-server url"
);
}
#[tokio::test]
async fn default_environment_has_ready_local_executor() {
let environment = Environment::default_for_tests();

View File

@@ -1,16 +1,11 @@
use std::collections::BTreeMap;
use std::env;
use std::time::Duration;
use reqwest::StatusCode;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use sha2::Digest as _;
use tokio::time::sleep;
use tokio_tungstenite::connect_async;
use tracing::warn;
use uuid::Uuid;
use crate::ExecServerError;
use crate::ExecServerRuntimePaths;
@@ -20,7 +15,6 @@ use crate::server::ConnectionProcessor;
pub const CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR: &str =
"CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN";
const PROTOCOL_VERSION: &str = "codex-exec-server-v1";
const ERROR_BODY_PREVIEW_BYTES: usize = 4096;
#[derive(Clone)]
@@ -51,28 +45,27 @@ impl ExecutorRegistryClient {
async fn register_executor(
&self,
request: &ExecutorRegistryRegisterExecutorRequest,
executor_id: &str,
) -> Result<ExecutorRegistryExecutorRegistrationResponse, ExecServerError> {
self.post_json(
&format!("/cloud/executor/{}/register", request.executor_id),
request,
)
.await
}
async fn post_json<T, R>(&self, path: &str, request: &T) -> Result<R, ExecServerError>
where
T: Serialize + Sync,
R: for<'de> Deserialize<'de>,
{
let response = self
.http
.post(endpoint_url(&self.base_url, path))
.post(endpoint_url(
&self.base_url,
&format!("/cloud/executor/{executor_id}/register"),
))
.bearer_auth(&self.bearer_token)
.json(request)
.send()
.await?;
self.parse_json_response(response).await
}
async fn parse_json_response<R>(
&self,
response: reqwest::Response,
) -> Result<R, ExecServerError>
where
R: for<'de> Deserialize<'de>,
{
if response.status().is_success() {
return response.json::<R>().await.map_err(ExecServerError::from);
}
@@ -87,19 +80,8 @@ impl ExecutorRegistryClient {
}
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
struct ExecutorRegistryRegisterExecutorRequest {
idempotency_id: String,
executor_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
name: Option<String>,
labels: BTreeMap<String, String>,
metadata: Value,
}
#[derive(Debug, Clone, Eq, PartialEq, Deserialize)]
struct ExecutorRegistryExecutorRegistrationResponse {
id: String,
executor_id: String,
url: String,
}
@@ -143,32 +125,6 @@ impl RemoteExecutorConfig {
bearer_token,
})
}
fn registration_request(
&self,
registration_id: Uuid,
) -> ExecutorRegistryRegisterExecutorRequest {
ExecutorRegistryRegisterExecutorRequest {
idempotency_id: self.default_idempotency_id(registration_id),
executor_id: self.executor_id.clone(),
name: Some(self.name.clone()),
labels: BTreeMap::new(),
metadata: Value::Object(Default::default()),
}
}
fn default_idempotency_id(&self, registration_id: Uuid) -> String {
let mut hasher = sha2::Sha256::new();
hasher.update(self.executor_id.as_bytes());
hasher.update(b"\0");
hasher.update(self.name.as_bytes());
hasher.update(b"\0");
hasher.update(PROTOCOL_VERSION);
hasher.update(b"\0");
hasher.update(registration_id.as_bytes());
let digest = hasher.finalize();
format!("codex-exec-server-{digest:x}")
}
}
/// Register an exec-server for remote use and serve requests over the returned
@@ -179,15 +135,13 @@ pub async fn run_remote_executor(
) -> Result<(), ExecServerError> {
let client = ExecutorRegistryClient::new(config.base_url.clone(), config.bearer_token.clone())?;
let processor = ConnectionProcessor::new(runtime_paths);
let registration_id = Uuid::new_v4();
let mut backoff = Duration::from_secs(1);
loop {
let request = config.registration_request(registration_id);
let response = client.register_executor(&request).await?;
let response = client.register_executor(&config.executor_id).await?;
eprintln!(
"codex exec-server remote executor {} registered with executor_id {}",
response.id, response.executor_id
"codex exec-server remote executor registered with executor_id {}",
response.executor_id
);
match connect_async(response.url.as_str()).await {
@@ -323,11 +277,9 @@ fn preview_error_body(body: &str) -> Option<String> {
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use serde_json::json;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::body_json;
use wiremock::matchers::header;
use wiremock::matchers::method;
use wiremock::matchers::path;
@@ -337,21 +289,16 @@ mod tests {
#[tokio::test]
async fn register_executor_posts_with_bearer_token_header() {
let server = MockServer::start().await;
let registration_id = Uuid::from_u128(1);
let config = RemoteExecutorConfig::with_bearer_token(
server.uri(),
"exec-requested".to_string(),
"registry-token".to_string(),
)
.expect("config");
let request = config.registration_request(registration_id);
let expected_request = serde_json::to_value(&request).expect("serialize request");
Mock::given(method("POST"))
.and(path("/cloud/executor/exec-requested/register"))
.and(header("authorization", "Bearer registry-token"))
.and(body_json(expected_request))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"id": "registration-1",
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"executor_id": "exec-1",
"url": "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc"
})))
@@ -361,14 +308,13 @@ mod tests {
.expect("client");
let response = client
.register_executor(&request)
.register_executor(&config.executor_id)
.await
.expect("register executor");
assert_eq!(
response,
ExecutorRegistryExecutorRegistrationResponse {
id: "registration-1".to_string(),
executor_id: "exec-1".to_string(),
url: "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc".to_string(),
}