Compare commits

...

1 Commits

Author SHA1 Message Date
Michael Zeng
205d9335af exec-server: archive agent identity registry handoff experiment 2026-05-14 21:36:21 -07:00
6 changed files with 152 additions and 45 deletions

2
codex-rs/Cargo.lock generated
View File

@@ -2749,6 +2749,8 @@ dependencies = [
"codex-app-server-protocol",
"codex-client",
"codex-file-system",
"codex-login",
"codex-model-provider",
"codex-protocol",
"codex-sandboxing",
"codex-test-binary-support",

View File

@@ -19,6 +19,8 @@ bytes = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-client = { workspace = true }
codex-file-system = { workspace = true }
codex-login = { workspace = true }
codex-model-provider = { workspace = true }
codex-protocol = { workspace = true }
codex-sandboxing = { workspace = true }
codex-utils-absolute-path = { workspace = true }

View File

@@ -26,7 +26,8 @@ The CLI entrypoint supports:
Remote mode registers the local exec-server with the executor registry,
then reconnects to the service-provided rendezvous websocket as the executor.
It requires a bearer token in `CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN`.
It requires an Agent Identity JWT in
`CODEX_EXEC_SERVER_REMOTE_AGENT_IDENTITY_JWT`.
Wire framing:

View File

@@ -91,7 +91,7 @@ pub use protocol::TerminateResponse;
pub use protocol::WriteParams;
pub use protocol::WriteResponse;
pub use protocol::WriteStatus;
pub use remote::CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR;
pub use remote::CODEX_EXEC_SERVER_REMOTE_AGENT_IDENTITY_JWT_ENV_VAR;
pub use remote::RemoteExecutorConfig;
pub use remote::run_remote_executor;
pub use runtime_paths::ExecServerRuntimePaths;

View File

@@ -1,6 +1,8 @@
use std::env;
use std::time::Duration;
use codex_login::CodexAuth;
use codex_model_provider::auth_provider_from_auth;
use reqwest::StatusCode;
use serde::Deserialize;
use tokio::time::sleep;
@@ -14,33 +16,73 @@ use crate::ExecServerRuntimePaths;
use crate::relay::run_multiplexed_executor;
use crate::server::ConnectionProcessor;
pub const CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR: &str =
"CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN";
pub const CODEX_EXEC_SERVER_REMOTE_AGENT_IDENTITY_JWT_ENV_VAR: &str =
"CODEX_EXEC_SERVER_REMOTE_AGENT_IDENTITY_JWT";
const ERROR_BODY_PREVIEW_BYTES: usize = 4096;
#[derive(Clone)]
struct ExecutorRegistryClient {
base_url: String,
bearer_token: String,
auth: ExecutorRegistryAuth,
http: reqwest::Client,
}
#[derive(Clone)]
enum ExecutorRegistryAuth {
AgentIdentity(CodexAuth),
StaticHeaders(reqwest::header::HeaderMap),
}
impl std::fmt::Debug for ExecutorRegistryClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ExecutorRegistryClient")
.field("base_url", &self.base_url)
.field("bearer_token", &"<redacted>")
.field("agent_identity_jwt", &"<redacted>")
.finish_non_exhaustive()
}
}
impl ExecutorRegistryClient {
fn new(base_url: String, bearer_token: String) -> Result<Self, ExecServerError> {
async fn new(base_url: String, agent_identity_jwt: String) -> Result<Self, ExecServerError> {
let base_url = normalize_base_url(base_url)?;
let auth = CodexAuth::from_agent_identity_jwt(&agent_identity_jwt, None)
.await
.map_err(|err| {
ExecServerError::ExecutorRegistryAuth(format!(
"failed to load executor registry Agent Identity JWT: {err}"
))
})?;
Ok(Self {
base_url,
bearer_token,
auth: ExecutorRegistryAuth::AgentIdentity(auth),
http: reqwest::Client::new(),
})
}
fn with_static_headers(
base_url: String,
headers: Vec<(String, String)>,
) -> Result<Self, ExecServerError> {
let mut normalized_headers = reqwest::header::HeaderMap::new();
for (name, value) in headers {
let header_name =
reqwest::header::HeaderName::from_bytes(name.as_bytes()).map_err(|err| {
ExecServerError::ExecutorRegistryConfig(format!(
"invalid executor registry test header name `{name}`: {err}"
))
})?;
let header_value =
reqwest::header::HeaderValue::from_bytes(value.as_bytes()).map_err(|err| {
ExecServerError::ExecutorRegistryConfig(format!(
"invalid executor registry test header value for `{name}`: {err}"
))
})?;
normalized_headers.insert(header_name, header_value);
}
Ok(Self {
base_url: normalize_base_url(base_url)?,
auth: ExecutorRegistryAuth::StaticHeaders(normalized_headers),
http: reqwest::Client::new(),
})
}
@@ -55,12 +97,21 @@ impl ExecutorRegistryClient {
&self.base_url,
&format!("/cloud/executor/{executor_id}/register"),
))
.bearer_auth(&self.bearer_token)
.headers(self.registry_auth_headers())
.send()
.await?;
self.parse_json_response(response).await
}
fn registry_auth_headers(&self) -> reqwest::header::HeaderMap {
match &self.auth {
ExecutorRegistryAuth::AgentIdentity(auth) => {
auth_provider_from_auth(auth).to_auth_headers()
}
ExecutorRegistryAuth::StaticHeaders(headers) => headers.clone(),
}
}
async fn parse_json_response<R>(
&self,
response: reqwest::Response,
@@ -94,7 +145,8 @@ pub struct RemoteExecutorConfig {
pub base_url: String,
pub executor_id: String,
pub name: String,
bearer_token: String,
agent_identity_jwt: String,
test_registration_headers: Option<Vec<(String, String)>>,
}
impl std::fmt::Debug for RemoteExecutorConfig {
@@ -103,28 +155,56 @@ impl std::fmt::Debug for RemoteExecutorConfig {
.field("base_url", &self.base_url)
.field("executor_id", &self.executor_id)
.field("name", &self.name)
.field("bearer_token", &"<redacted>")
.field("agent_identity_jwt", &"<redacted>")
.field(
"test_registration_headers",
&self
.test_registration_headers
.as_ref()
.map(|_| "<redacted>"),
)
.finish()
}
}
impl RemoteExecutorConfig {
pub fn new(base_url: String, executor_id: String) -> Result<Self, ExecServerError> {
Self::with_bearer_token(base_url, executor_id, read_remote_bearer_token_from_env()?)
Self::with_agent_identity_jwt(
base_url,
executor_id,
read_remote_agent_identity_jwt_from_env()?,
)
}
pub fn with_bearer_token(
pub fn with_agent_identity_jwt(
base_url: String,
executor_id: String,
bearer_token: String,
agent_identity_jwt: String,
) -> Result<Self, ExecServerError> {
let executor_id = normalize_executor_id(executor_id)?;
let bearer_token = normalize_bearer_token(bearer_token)?;
let agent_identity_jwt = normalize_agent_identity_jwt(agent_identity_jwt)?;
Ok(Self {
base_url,
executor_id,
name: "codex-exec-server".to_string(),
bearer_token,
agent_identity_jwt,
test_registration_headers: None,
})
}
#[doc(hidden)]
pub fn with_registration_headers_for_tests(
base_url: String,
executor_id: String,
headers: Vec<(String, String)>,
) -> Result<Self, ExecServerError> {
let executor_id = normalize_executor_id(executor_id)?;
Ok(Self {
base_url,
executor_id,
name: "codex-exec-server".to_string(),
agent_identity_jwt: String::new(),
test_registration_headers: Some(headers),
})
}
}
@@ -136,7 +216,12 @@ pub async fn run_remote_executor(
runtime_paths: ExecServerRuntimePaths,
) -> Result<(), ExecServerError> {
ensure_rustls_crypto_provider();
let client = ExecutorRegistryClient::new(config.base_url.clone(), config.bearer_token.clone())?;
let client = if let Some(headers) = config.test_registration_headers.clone() {
ExecutorRegistryClient::with_static_headers(config.base_url.clone(), headers)?
} else {
ExecutorRegistryClient::new(config.base_url.clone(), config.agent_identity_jwt.clone())
.await?
};
let processor = ConnectionProcessor::new(runtime_paths);
let mut backoff = Duration::from_secs(1);
@@ -162,30 +247,31 @@ pub async fn run_remote_executor(
}
}
fn read_remote_bearer_token_from_env() -> Result<String, ExecServerError> {
read_remote_bearer_token_from_env_with(|name| env::var(name))
fn read_remote_agent_identity_jwt_from_env() -> Result<String, ExecServerError> {
read_remote_agent_identity_jwt_from_env_with(|name| env::var(name))
}
fn read_remote_bearer_token_from_env_with<F>(get_var: F) -> Result<String, ExecServerError>
fn read_remote_agent_identity_jwt_from_env_with<F>(get_var: F) -> Result<String, ExecServerError>
where
F: FnOnce(&str) -> Result<String, env::VarError>,
{
let bearer_token = get_var(CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR).map_err(|_| {
ExecServerError::ExecutorRegistryAuth(format!(
"executor registry bearer token environment variable `{CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR}` is not set"
))
})?;
normalize_bearer_token(bearer_token)
let agent_identity_jwt = get_var(CODEX_EXEC_SERVER_REMOTE_AGENT_IDENTITY_JWT_ENV_VAR)
.map_err(|_| {
ExecServerError::ExecutorRegistryAuth(format!(
"executor registry Agent Identity JWT environment variable `{CODEX_EXEC_SERVER_REMOTE_AGENT_IDENTITY_JWT_ENV_VAR}` is not set"
))
})?;
normalize_agent_identity_jwt(agent_identity_jwt)
}
fn normalize_bearer_token(bearer_token: String) -> Result<String, ExecServerError> {
let bearer_token = bearer_token.trim().to_string();
if bearer_token.is_empty() {
fn normalize_agent_identity_jwt(agent_identity_jwt: String) -> Result<String, ExecServerError> {
let agent_identity_jwt = agent_identity_jwt.trim().to_string();
if agent_identity_jwt.is_empty() {
return Err(ExecServerError::ExecutorRegistryAuth(format!(
"executor registry bearer token environment variable `{CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR}` is empty"
"executor registry Agent Identity JWT environment variable `{CODEX_EXEC_SERVER_REMOTE_AGENT_IDENTITY_JWT_ENV_VAR}` is empty"
)));
}
Ok(bearer_token)
Ok(agent_identity_jwt)
}
fn normalize_executor_id(executor_id: String) -> Result<String, ExecServerError> {
@@ -285,25 +371,35 @@ mod tests {
use super::*;
#[tokio::test]
async fn register_executor_posts_with_bearer_token_header() {
async fn register_executor_posts_with_agent_assertion_headers() {
let server = MockServer::start().await;
let config = RemoteExecutorConfig::with_bearer_token(
let config = RemoteExecutorConfig::with_agent_identity_jwt(
server.uri(),
"exec-requested".to_string(),
"registry-token".to_string(),
"agent-identity-jwt".to_string(),
)
.expect("config");
Mock::given(method("POST"))
.and(path("/cloud/executor/exec-requested/register"))
.and(header("authorization", "Bearer registry-token"))
.and(header("authorization", "AgentAssertion registry-assertion"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"executor_id": "exec-1",
"url": "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc"
})))
.mount(&server)
.await;
let client = ExecutorRegistryClient::new(server.uri(), "registry-token".to_string())
.expect("client");
let client = ExecutorRegistryClient::with_static_headers(
server.uri(),
vec![
(
"Authorization".to_string(),
"AgentAssertion registry-assertion".to_string(),
),
("ChatGPT-Account-ID".to_string(), "account-123".to_string()),
],
)
.expect("client");
let response = client
.register_executor(&config.executor_id)
@@ -320,17 +416,17 @@ mod tests {
}
#[test]
fn debug_output_redacts_bearer_token() {
let config = RemoteExecutorConfig::with_bearer_token(
fn debug_output_redacts_agent_identity_jwt() {
let config = RemoteExecutorConfig::with_agent_identity_jwt(
"https://registry.example".to_string(),
"exec-1".to_string(),
"secret-token".to_string(),
"secret-agent-identity-jwt".to_string(),
)
.expect("config");
let debug = format!("{config:?}");
assert!(debug.contains("<redacted>"));
assert!(!debug.contains("secret-token"));
assert!(!debug.contains("secret-agent-identity-jwt"));
}
}

View File

@@ -42,7 +42,7 @@ use wiremock::matchers::method;
use wiremock::matchers::path;
const EXECUTOR_ID: &str = "exec-mux-test";
const REGISTRY_TOKEN: &str = "registry-token";
const REGISTRY_ASSERTION: &str = "registry-assertion";
const RELAY_MESSAGE_FRAME_VERSION: u32 = 1;
const TEST_TIMEOUT: Duration = Duration::from_secs(5);
@@ -53,7 +53,10 @@ async fn multiplexed_remote_executor_routes_independent_virtual_streams() -> Res
let registry = MockServer::start().await;
Mock::given(method("POST"))
.and(path(format!("/cloud/executor/{EXECUTOR_ID}/register")))
.and(header("authorization", format!("Bearer {REGISTRY_TOKEN}")))
.and(header(
"authorization",
format!("AgentAssertion {REGISTRY_ASSERTION}"),
))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"executor_id": EXECUTOR_ID,
"url": rendezvous_url,
@@ -63,10 +66,13 @@ async fn multiplexed_remote_executor_routes_independent_virtual_streams() -> Res
let (codex_exe, codex_linux_sandbox_exe) = common::current_test_binary_helper_paths()?;
let runtime_paths = ExecServerRuntimePaths::new(codex_exe, codex_linux_sandbox_exe)?;
let config = RemoteExecutorConfig::with_bearer_token(
let config = RemoteExecutorConfig::with_registration_headers_for_tests(
registry.uri(),
EXECUTOR_ID.to_string(),
REGISTRY_TOKEN.to_string(),
vec![(
"Authorization".to_string(),
format!("AgentAssertion {REGISTRY_ASSERTION}"),
)],
)?;
let remote_executor = tokio::spawn(codex_exec_server::run_remote_executor(
config,