mirror of
https://github.com/openai/codex.git
synced 2026-05-22 03:54:18 +00:00
Compare commits
8 Commits
starr/veri
...
codex/bisc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4fabb56c82 | ||
|
|
3094e2b6b0 | ||
|
|
a1f02c63e0 | ||
|
|
9dd3444b78 | ||
|
|
f5fe716060 | ||
|
|
cfb2a16b13 | ||
|
|
f18f987a98 | ||
|
|
7fc8bdbaba |
2
codex-rs/Cargo.lock
generated
2
codex-rs/Cargo.lock
generated
@@ -2747,6 +2747,7 @@ dependencies = [
|
||||
"axum",
|
||||
"base64 0.22.1",
|
||||
"bytes",
|
||||
"codex-api",
|
||||
"codex-app-server-protocol",
|
||||
"codex-client",
|
||||
"codex-file-system",
|
||||
@@ -2758,6 +2759,7 @@ dependencies = [
|
||||
"codex-utils-rustls-provider",
|
||||
"ctor 0.6.3",
|
||||
"futures",
|
||||
"http 1.4.0",
|
||||
"pretty_assertions",
|
||||
"prost 0.14.3",
|
||||
"reqwest",
|
||||
|
||||
@@ -73,6 +73,8 @@ use codex_features::FEATURES;
|
||||
use codex_features::Stage;
|
||||
use codex_features::is_known_feature_key;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_login::read_codex_access_token_from_env;
|
||||
use codex_memories_write::clear_memory_roots_contents;
|
||||
use codex_models_manager::bundled_models_response;
|
||||
use codex_models_manager::manager::RefreshStrategy;
|
||||
@@ -478,17 +480,21 @@ struct ExecServerCommand {
|
||||
#[arg(long = "listen", value_name = "URL", conflicts_with = "remote")]
|
||||
listen: Option<String>,
|
||||
|
||||
/// Register this exec-server as a remote executor using the given base URL.
|
||||
#[arg(long = "remote", value_name = "URL", requires = "executor_id")]
|
||||
/// Register this exec-server as a remote environment using the given base URL.
|
||||
#[arg(long = "remote", value_name = "URL", requires = "environment_id")]
|
||||
remote: Option<String>,
|
||||
|
||||
/// Executor id to attach to when registering remotely.
|
||||
#[arg(long = "executor-id", value_name = "ID")]
|
||||
executor_id: Option<String>,
|
||||
/// Environment id to attach to when registering remotely.
|
||||
#[arg(long = "environment-id", value_name = "ID")]
|
||||
environment_id: Option<String>,
|
||||
|
||||
/// Human-readable executor name.
|
||||
/// Human-readable environment name.
|
||||
#[arg(long = "name", value_name = "NAME")]
|
||||
name: Option<String>,
|
||||
|
||||
/// Use Agent Identity auth from CODEX_ACCESS_TOKEN for remote registration.
|
||||
#[arg(long = "use-agent-identity-auth", requires = "remote")]
|
||||
use_agent_identity_auth: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, clap::Subcommand)]
|
||||
@@ -1388,7 +1394,13 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
|
||||
root_remote_auth_token_env.as_deref(),
|
||||
"exec-server",
|
||||
)?;
|
||||
run_exec_server_command(cmd, &arg0_paths).await?;
|
||||
run_exec_server_command(
|
||||
cmd,
|
||||
&arg0_paths,
|
||||
&root_config_overrides,
|
||||
interactive.config_profile.clone(),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Some(Subcommand::Features(FeaturesCli { sub })) => match sub {
|
||||
FeaturesSubcommand::List => {
|
||||
@@ -1485,6 +1497,8 @@ fn profile_v2_for_subcommand<'a>(
|
||||
async fn run_exec_server_command(
|
||||
cmd: ExecServerCommand,
|
||||
arg0_paths: &Arg0DispatchPaths,
|
||||
root_config_overrides: &CliConfigOverrides,
|
||||
config_profile: Option<String>,
|
||||
) -> anyhow::Result<()> {
|
||||
let codex_self_exe = arg0_paths
|
||||
.codex_self_exe
|
||||
@@ -1495,15 +1509,24 @@ async fn run_exec_server_command(
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?;
|
||||
if let Some(base_url) = cmd.remote {
|
||||
let executor_id = cmd
|
||||
.executor_id
|
||||
.ok_or_else(|| anyhow::anyhow!("--executor-id is required when --remote is set"))?;
|
||||
let mut remote_config =
|
||||
codex_exec_server::RemoteExecutorConfig::new(base_url, executor_id)?;
|
||||
let environment_id = cmd
|
||||
.environment_id
|
||||
.ok_or_else(|| anyhow::anyhow!("--environment-id is required when --remote is set"))?;
|
||||
let auth_provider = load_exec_server_remote_auth_provider(
|
||||
root_config_overrides,
|
||||
config_profile,
|
||||
cmd.use_agent_identity_auth,
|
||||
)
|
||||
.await?;
|
||||
let mut remote_config = codex_exec_server::RemoteEnvironmentConfig::new(
|
||||
base_url,
|
||||
environment_id,
|
||||
auth_provider,
|
||||
)?;
|
||||
if let Some(name) = cmd.name {
|
||||
remote_config.name = name;
|
||||
}
|
||||
codex_exec_server::run_remote_executor(remote_config, runtime_paths).await?;
|
||||
codex_exec_server::run_remote_environment(remote_config, runtime_paths).await?;
|
||||
return Ok(());
|
||||
}
|
||||
let listen_url = cmd
|
||||
@@ -1515,6 +1538,75 @@ async fn run_exec_server_command(
|
||||
.map_err(anyhow::Error::from_boxed)
|
||||
}
|
||||
|
||||
async fn load_exec_server_remote_auth_provider(
|
||||
root_config_overrides: &CliConfigOverrides,
|
||||
config_profile: Option<String>,
|
||||
use_agent_identity_auth: bool,
|
||||
) -> anyhow::Result<codex_api::SharedAuthProvider> {
|
||||
let config = load_exec_server_remote_config(root_config_overrides, config_profile).await?;
|
||||
if use_agent_identity_auth {
|
||||
let agent_identity_jwt = read_codex_access_token_from_env().ok_or_else(|| {
|
||||
anyhow::anyhow!("CODEX_ACCESS_TOKEN is required when --use-agent-identity-auth is set")
|
||||
})?;
|
||||
let auth =
|
||||
CodexAuth::from_agent_identity_jwt(&agent_identity_jwt, Some(&config.chatgpt_base_url))
|
||||
.await?;
|
||||
return Ok(codex_model_provider::auth_provider_from_auth(&auth));
|
||||
}
|
||||
|
||||
let auth = load_exec_server_remote_auth(
|
||||
&config,
|
||||
"remote exec-server registration requires ChatGPT authentication; run `codex login` first",
|
||||
)
|
||||
.await?;
|
||||
|
||||
if !auth.is_chatgpt_auth() {
|
||||
anyhow::bail!(
|
||||
"remote exec-server registration requires ChatGPT authentication; API key and Agent Identity auth are not supported"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(codex_model_provider::auth_provider_from_auth(&auth))
|
||||
}
|
||||
|
||||
async fn load_exec_server_remote_config(
|
||||
root_config_overrides: &CliConfigOverrides,
|
||||
config_profile: Option<String>,
|
||||
) -> anyhow::Result<codex_core::config::Config> {
|
||||
let cli_kv_overrides = root_config_overrides
|
||||
.parse_overrides()
|
||||
.map_err(anyhow::Error::msg)?;
|
||||
Ok(ConfigBuilder::default()
|
||||
.cli_overrides(cli_kv_overrides)
|
||||
.harness_overrides(ConfigOverrides {
|
||||
config_profile,
|
||||
..Default::default()
|
||||
})
|
||||
.build()
|
||||
.await?)
|
||||
}
|
||||
|
||||
async fn load_exec_server_remote_auth(
|
||||
config: &codex_core::config::Config,
|
||||
missing_auth_error: &'static str,
|
||||
) -> anyhow::Result<codex_login::CodexAuth> {
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(config, /*enable_codex_api_key_env*/ true).await;
|
||||
|
||||
let auth = match auth_manager.auth().await {
|
||||
Some(auth) => auth,
|
||||
None => {
|
||||
auth_manager.reload().await;
|
||||
auth_manager
|
||||
.auth()
|
||||
.await
|
||||
.ok_or_else(|| anyhow::anyhow!(missing_auth_error))?
|
||||
}
|
||||
};
|
||||
|
||||
Ok(auth)
|
||||
}
|
||||
|
||||
async fn enable_feature_in_config(interactive: &TuiCli, feature: &str) -> anyhow::Result<()> {
|
||||
FeatureToggles::validate_feature(feature)?;
|
||||
let codex_home = find_codex_home()?;
|
||||
|
||||
@@ -17,6 +17,7 @@ axum = { workspace = true, features = ["http1", "tokio", "ws"] }
|
||||
base64 = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
codex-api = { workspace = true }
|
||||
codex-client = { workspace = true }
|
||||
codex-file-system = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
@@ -51,6 +52,7 @@ uuid = { workspace = true, features = ["v4"] }
|
||||
anyhow = { workspace = true }
|
||||
codex-test-binary-support = { workspace = true }
|
||||
ctor = { workspace = true }
|
||||
http = { workspace = true }
|
||||
pretty_assertions = { workspace = true }
|
||||
serial_test = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
|
||||
@@ -26,7 +26,11 @@ The CLI entrypoint supports:
|
||||
|
||||
Remote mode registers the local exec-server with the executor registry,
|
||||
then reconnects to the service-provided rendezvous websocket as the executor.
|
||||
It requires a bearer token in `CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN`.
|
||||
It uses the standard Codex ChatGPT sign-in state; run `codex login` first when
|
||||
remote registration needs authentication. Containerized callers that receive an
|
||||
Agent Identity JWT in `CODEX_ACCESS_TOKEN` can opt into that auth path with
|
||||
`--use-agent-identity-auth`; Codex then registers an Agent task and sends the
|
||||
derived AgentAssertion headers on the registry request.
|
||||
|
||||
Wire framing:
|
||||
|
||||
@@ -376,7 +380,9 @@ The crate exports:
|
||||
|
||||
Callers must pass `ExecServerRuntimePaths` to `run_main()`. The top-level
|
||||
`codex exec-server` command builds these paths from the `codex` arg0 dispatch
|
||||
state.
|
||||
state. `RemoteExecutorConfig::new(...)` also takes the auth provider that
|
||||
remote registration should use; the CLI builds that provider from Codex auth
|
||||
state before starting remote mode.
|
||||
|
||||
## Example session
|
||||
|
||||
|
||||
@@ -261,18 +261,18 @@ pub enum ExecServerError {
|
||||
Protocol(String),
|
||||
#[error("exec-server rejected request ({code}): {message}")]
|
||||
Server { code: i64, message: String },
|
||||
#[error("executor registry request failed ({status}{code_suffix}): {message}", code_suffix = .code.as_ref().map(|code| format!(", {code}")).unwrap_or_default())]
|
||||
ExecutorRegistryHttp {
|
||||
#[error("environment registry request failed ({status}{code_suffix}): {message}", code_suffix = .code.as_ref().map(|code| format!(", {code}")).unwrap_or_default())]
|
||||
EnvironmentRegistryHttp {
|
||||
status: reqwest::StatusCode,
|
||||
code: Option<String>,
|
||||
message: String,
|
||||
},
|
||||
#[error("executor registry configuration error: {0}")]
|
||||
ExecutorRegistryConfig(String),
|
||||
#[error("executor registry authentication error: {0}")]
|
||||
ExecutorRegistryAuth(String),
|
||||
#[error("executor registry request failed: {0}")]
|
||||
ExecutorRegistryRequest(#[from] reqwest::Error),
|
||||
#[error("environment registry configuration error: {0}")]
|
||||
EnvironmentRegistryConfig(String),
|
||||
#[error("environment registry authentication error: {0}")]
|
||||
EnvironmentRegistryAuth(String),
|
||||
#[error("environment registry request failed: {0}")]
|
||||
EnvironmentRegistryRequest(#[from] reqwest::Error),
|
||||
}
|
||||
|
||||
impl ExecServerClient {
|
||||
|
||||
@@ -91,9 +91,8 @@ pub use protocol::TerminateResponse;
|
||||
pub use protocol::WriteParams;
|
||||
pub use protocol::WriteResponse;
|
||||
pub use protocol::WriteStatus;
|
||||
pub use remote::CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR;
|
||||
pub use remote::RemoteExecutorConfig;
|
||||
pub use remote::run_remote_executor;
|
||||
pub use remote::RemoteEnvironmentConfig;
|
||||
pub use remote::run_remote_environment;
|
||||
pub use runtime_paths::ExecServerRuntimePaths;
|
||||
pub use server::DEFAULT_LISTEN_URL;
|
||||
pub use server::ExecServerListenUrlParseError;
|
||||
|
||||
@@ -24,7 +24,7 @@ pub struct StartedExecProcess {
|
||||
/// stdout, stderr, or pty bytes. `Exited` reports the process exit status, while
|
||||
/// `Closed` means all output streams have ended and no more output events will
|
||||
/// arrive. `Failed` is used when the process session cannot continue, for
|
||||
/// example because the remote executor connection disconnected.
|
||||
/// example because the remote environment connection disconnected.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum ExecProcessEvent {
|
||||
Output(ProcessOutputChunk),
|
||||
|
||||
@@ -333,7 +333,7 @@ pub(crate) async fn run_multiplexed_executor<S>(
|
||||
continue;
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
debug!("multiplexed executor websocket read failed: {err}");
|
||||
debug!("multiplexed environment websocket read failed: {err}");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::env;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_api::SharedAuthProvider;
|
||||
use reqwest::StatusCode;
|
||||
use serde::Deserialize;
|
||||
use tokio::time::sleep;
|
||||
@@ -14,48 +14,45 @@ use crate::ExecServerRuntimePaths;
|
||||
use crate::relay::run_multiplexed_executor;
|
||||
use crate::server::ConnectionProcessor;
|
||||
|
||||
pub const CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR: &str =
|
||||
"CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN";
|
||||
|
||||
const ERROR_BODY_PREVIEW_BYTES: usize = 4096;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ExecutorRegistryClient {
|
||||
struct EnvironmentRegistryClient {
|
||||
base_url: String,
|
||||
bearer_token: String,
|
||||
auth_provider: SharedAuthProvider,
|
||||
http: reqwest::Client,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ExecutorRegistryClient {
|
||||
impl std::fmt::Debug for EnvironmentRegistryClient {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("ExecutorRegistryClient")
|
||||
f.debug_struct("EnvironmentRegistryClient")
|
||||
.field("base_url", &self.base_url)
|
||||
.field("bearer_token", &"<redacted>")
|
||||
.field("auth_provider", &"<redacted>")
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl ExecutorRegistryClient {
|
||||
fn new(base_url: String, bearer_token: String) -> Result<Self, ExecServerError> {
|
||||
impl EnvironmentRegistryClient {
|
||||
fn new(base_url: String, auth_provider: SharedAuthProvider) -> Result<Self, ExecServerError> {
|
||||
let base_url = normalize_base_url(base_url)?;
|
||||
Ok(Self {
|
||||
base_url,
|
||||
bearer_token,
|
||||
auth_provider,
|
||||
http: reqwest::Client::new(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn register_executor(
|
||||
async fn register_environment(
|
||||
&self,
|
||||
executor_id: &str,
|
||||
) -> Result<ExecutorRegistryExecutorRegistrationResponse, ExecServerError> {
|
||||
environment_id: &str,
|
||||
) -> Result<EnvironmentRegistryEnvironmentRegistrationResponse, ExecServerError> {
|
||||
let response = self
|
||||
.http
|
||||
.post(endpoint_url(
|
||||
&self.base_url,
|
||||
&format!("/cloud/executor/{executor_id}/register"),
|
||||
&format!("/cloud/environment/{environment_id}/register"),
|
||||
))
|
||||
.bearer_auth(&self.bearer_token)
|
||||
.headers(self.auth_provider.to_auth_headers())
|
||||
.send()
|
||||
.await?;
|
||||
self.parse_json_response(response).await
|
||||
@@ -75,76 +72,72 @@ impl ExecutorRegistryClient {
|
||||
let status = response.status();
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
if matches!(status, StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN) {
|
||||
return Err(executor_registry_auth_error(status, &body));
|
||||
return Err(environment_registry_auth_error(status, &body));
|
||||
}
|
||||
|
||||
Err(executor_registry_http_error(status, &body))
|
||||
Err(environment_registry_http_error(status, &body))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Deserialize)]
|
||||
struct ExecutorRegistryExecutorRegistrationResponse {
|
||||
executor_id: String,
|
||||
struct EnvironmentRegistryEnvironmentRegistrationResponse {
|
||||
environment_id: String,
|
||||
url: String,
|
||||
}
|
||||
|
||||
/// Configuration for registering an exec-server for remote use.
|
||||
#[derive(Clone, Eq, PartialEq)]
|
||||
pub struct RemoteExecutorConfig {
|
||||
#[derive(Clone)]
|
||||
pub struct RemoteEnvironmentConfig {
|
||||
pub base_url: String,
|
||||
pub executor_id: String,
|
||||
pub environment_id: String,
|
||||
pub name: String,
|
||||
bearer_token: String,
|
||||
auth_provider: SharedAuthProvider,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for RemoteExecutorConfig {
|
||||
impl std::fmt::Debug for RemoteEnvironmentConfig {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("RemoteExecutorConfig")
|
||||
f.debug_struct("RemoteEnvironmentConfig")
|
||||
.field("base_url", &self.base_url)
|
||||
.field("executor_id", &self.executor_id)
|
||||
.field("environment_id", &self.environment_id)
|
||||
.field("name", &self.name)
|
||||
.field("bearer_token", &"<redacted>")
|
||||
.field("auth_provider", &"<redacted>")
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteExecutorConfig {
|
||||
pub fn new(base_url: String, executor_id: String) -> Result<Self, ExecServerError> {
|
||||
Self::with_bearer_token(base_url, executor_id, read_remote_bearer_token_from_env()?)
|
||||
}
|
||||
|
||||
pub fn with_bearer_token(
|
||||
impl RemoteEnvironmentConfig {
|
||||
pub fn new(
|
||||
base_url: String,
|
||||
executor_id: String,
|
||||
bearer_token: String,
|
||||
environment_id: String,
|
||||
auth_provider: SharedAuthProvider,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
let executor_id = normalize_executor_id(executor_id)?;
|
||||
let bearer_token = normalize_bearer_token(bearer_token)?;
|
||||
let environment_id = normalize_environment_id(environment_id)?;
|
||||
Ok(Self {
|
||||
base_url,
|
||||
executor_id,
|
||||
environment_id,
|
||||
name: "codex-exec-server".to_string(),
|
||||
bearer_token,
|
||||
auth_provider,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Register an exec-server for remote use and serve requests over the returned
|
||||
/// rendezvous websocket.
|
||||
pub async fn run_remote_executor(
|
||||
config: RemoteExecutorConfig,
|
||||
pub async fn run_remote_environment(
|
||||
config: RemoteEnvironmentConfig,
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
) -> Result<(), ExecServerError> {
|
||||
ensure_rustls_crypto_provider();
|
||||
let client = ExecutorRegistryClient::new(config.base_url.clone(), config.bearer_token.clone())?;
|
||||
let client =
|
||||
EnvironmentRegistryClient::new(config.base_url.clone(), config.auth_provider.clone())?;
|
||||
let processor = ConnectionProcessor::new(runtime_paths);
|
||||
let mut backoff = Duration::from_secs(1);
|
||||
|
||||
loop {
|
||||
let response = client.register_executor(&config.executor_id).await?;
|
||||
let response = client.register_environment(&config.environment_id).await?;
|
||||
eprintln!(
|
||||
"codex exec-server remote executor registered with executor_id {}",
|
||||
response.executor_id
|
||||
"codex exec-server remote environment registered with environment_id {}",
|
||||
response.environment_id
|
||||
);
|
||||
|
||||
match connect_async(response.url.as_str()).await {
|
||||
@@ -162,40 +155,14 @@ pub async fn run_remote_executor(
|
||||
}
|
||||
}
|
||||
|
||||
fn read_remote_bearer_token_from_env() -> Result<String, ExecServerError> {
|
||||
read_remote_bearer_token_from_env_with(|name| env::var(name))
|
||||
}
|
||||
|
||||
fn read_remote_bearer_token_from_env_with<F>(get_var: F) -> Result<String, ExecServerError>
|
||||
where
|
||||
F: FnOnce(&str) -> Result<String, env::VarError>,
|
||||
{
|
||||
let bearer_token = get_var(CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR).map_err(|_| {
|
||||
ExecServerError::ExecutorRegistryAuth(format!(
|
||||
"executor registry bearer token environment variable `{CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR}` is not set"
|
||||
))
|
||||
})?;
|
||||
normalize_bearer_token(bearer_token)
|
||||
}
|
||||
|
||||
fn normalize_bearer_token(bearer_token: String) -> Result<String, ExecServerError> {
|
||||
let bearer_token = bearer_token.trim().to_string();
|
||||
if bearer_token.is_empty() {
|
||||
return Err(ExecServerError::ExecutorRegistryAuth(format!(
|
||||
"executor registry bearer token environment variable `{CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR}` is empty"
|
||||
)));
|
||||
}
|
||||
Ok(bearer_token)
|
||||
}
|
||||
|
||||
fn normalize_executor_id(executor_id: String) -> Result<String, ExecServerError> {
|
||||
let executor_id = executor_id.trim().to_string();
|
||||
if executor_id.is_empty() {
|
||||
return Err(ExecServerError::ExecutorRegistryConfig(
|
||||
"executor id is required for remote exec-server registration".to_string(),
|
||||
fn normalize_environment_id(environment_id: String) -> Result<String, ExecServerError> {
|
||||
let environment_id = environment_id.trim().to_string();
|
||||
if environment_id.is_empty() {
|
||||
return Err(ExecServerError::EnvironmentRegistryConfig(
|
||||
"environment id is required for remote exec-server registration".to_string(),
|
||||
));
|
||||
}
|
||||
Ok(executor_id)
|
||||
Ok(environment_id)
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
@@ -212,8 +179,8 @@ struct RegistryError {
|
||||
fn normalize_base_url(base_url: String) -> Result<String, ExecServerError> {
|
||||
let trimmed = base_url.trim().trim_end_matches('/').to_string();
|
||||
if trimmed.is_empty() {
|
||||
return Err(ExecServerError::ExecutorRegistryConfig(
|
||||
"executor registry base URL is required".to_string(),
|
||||
return Err(ExecServerError::EnvironmentRegistryConfig(
|
||||
"environment registry base URL is required".to_string(),
|
||||
));
|
||||
}
|
||||
Ok(trimmed)
|
||||
@@ -223,14 +190,14 @@ fn endpoint_url(base_url: &str, path: &str) -> String {
|
||||
format!("{base_url}/{}", path.trim_start_matches('/'))
|
||||
}
|
||||
|
||||
fn executor_registry_auth_error(status: StatusCode, body: &str) -> ExecServerError {
|
||||
fn environment_registry_auth_error(status: StatusCode, body: &str) -> ExecServerError {
|
||||
let message = registry_error_message(body).unwrap_or_else(|| "empty error body".to_string());
|
||||
ExecServerError::ExecutorRegistryAuth(format!(
|
||||
"executor registry authentication failed ({status}): {message}"
|
||||
ExecServerError::EnvironmentRegistryAuth(format!(
|
||||
"environment registry authentication failed ({status}): {message}"
|
||||
))
|
||||
}
|
||||
|
||||
fn executor_registry_http_error(status: StatusCode, body: &str) -> ExecServerError {
|
||||
fn environment_registry_http_error(status: StatusCode, body: &str) -> ExecServerError {
|
||||
let parsed = serde_json::from_str::<RegistryErrorBody>(body).ok();
|
||||
let (code, message) = parsed
|
||||
.and_then(|body| body.error)
|
||||
@@ -249,7 +216,7 @@ fn executor_registry_http_error(status: StatusCode, body: &str) -> ExecServerErr
|
||||
.unwrap_or_else(|| "empty or malformed error body".to_string()),
|
||||
)
|
||||
});
|
||||
ExecServerError::ExecutorRegistryHttp {
|
||||
ExecServerError::EnvironmentRegistryHttp {
|
||||
status,
|
||||
code,
|
||||
message,
|
||||
@@ -274,6 +241,11 @@ fn preview_error_body(body: &str) -> Option<String> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_api::AuthProvider;
|
||||
use http::HeaderMap;
|
||||
use http::HeaderValue;
|
||||
use pretty_assertions::assert_eq;
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
@@ -284,53 +256,75 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct StaticRegistryAuthProvider;
|
||||
|
||||
impl AuthProvider for StaticRegistryAuthProvider {
|
||||
fn add_auth_headers(&self, headers: &mut HeaderMap) {
|
||||
let _ = headers.insert(
|
||||
http::header::AUTHORIZATION,
|
||||
HeaderValue::from_static("Bearer registry-token"),
|
||||
);
|
||||
let _ = headers.insert(
|
||||
"ChatGPT-Account-ID",
|
||||
HeaderValue::from_static("workspace-123"),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn static_registry_auth_provider() -> SharedAuthProvider {
|
||||
Arc::new(StaticRegistryAuthProvider)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn register_executor_posts_with_bearer_token_header() {
|
||||
async fn register_environment_posts_with_auth_provider_headers() {
|
||||
let server = MockServer::start().await;
|
||||
let config = RemoteExecutorConfig::with_bearer_token(
|
||||
let config = RemoteEnvironmentConfig::new(
|
||||
server.uri(),
|
||||
"exec-requested".to_string(),
|
||||
"registry-token".to_string(),
|
||||
static_registry_auth_provider(),
|
||||
)
|
||||
.expect("config");
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/cloud/executor/exec-requested/register"))
|
||||
.and(path("/cloud/environment/exec-requested/register"))
|
||||
.and(header("authorization", "Bearer registry-token"))
|
||||
.and(header("chatgpt-account-id", "workspace-123"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
|
||||
"executor_id": "exec-1",
|
||||
"url": "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc"
|
||||
"environment_id": "exec-1",
|
||||
"url": "wss://rendezvous.test/environment/exec-1?role=environment&sig=abc"
|
||||
})))
|
||||
.mount(&server)
|
||||
.await;
|
||||
let client = ExecutorRegistryClient::new(server.uri(), "registry-token".to_string())
|
||||
let client = EnvironmentRegistryClient::new(server.uri(), static_registry_auth_provider())
|
||||
.expect("client");
|
||||
|
||||
let response = client
|
||||
.register_executor(&config.executor_id)
|
||||
.register_environment(&config.environment_id)
|
||||
.await
|
||||
.expect("register executor");
|
||||
.expect("register environment");
|
||||
|
||||
assert_eq!(
|
||||
response,
|
||||
ExecutorRegistryExecutorRegistrationResponse {
|
||||
executor_id: "exec-1".to_string(),
|
||||
url: "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc".to_string(),
|
||||
EnvironmentRegistryEnvironmentRegistrationResponse {
|
||||
environment_id: "exec-1".to_string(),
|
||||
url: "wss://rendezvous.test/environment/exec-1?role=environment&sig=abc"
|
||||
.to_string(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn debug_output_redacts_bearer_token() {
|
||||
let config = RemoteExecutorConfig::with_bearer_token(
|
||||
fn debug_output_redacts_auth_provider() {
|
||||
let config = RemoteEnvironmentConfig::new(
|
||||
"https://registry.example".to_string(),
|
||||
"exec-1".to_string(),
|
||||
"secret-token".to_string(),
|
||||
static_registry_auth_provider(),
|
||||
)
|
||||
.expect("config");
|
||||
|
||||
let debug = format!("{config:?}");
|
||||
|
||||
assert!(debug.contains("<redacted>"));
|
||||
assert!(!debug.contains("secret-token"));
|
||||
assert!(!debug.contains("workspace-123"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use anyhow::bail;
|
||||
use codex_api::AuthProvider;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
@@ -19,15 +20,18 @@ use codex_app_server_protocol::RequestId;
|
||||
use codex_exec_server::ExecServerRuntimePaths;
|
||||
use codex_exec_server::InitializeParams;
|
||||
use codex_exec_server::InitializeResponse;
|
||||
use codex_exec_server::RemoteExecutorConfig;
|
||||
use codex_exec_server::RemoteEnvironmentConfig;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use http::HeaderMap;
|
||||
use http::HeaderValue;
|
||||
use pretty_assertions::assert_eq;
|
||||
use prost::Message as ProstMessage;
|
||||
use relay_proto::RelayData;
|
||||
use relay_proto::RelayMessageFrame;
|
||||
use relay_proto::RelayReset;
|
||||
use relay_proto::relay_message_frame;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
@@ -41,21 +45,39 @@ use wiremock::matchers::header;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
const EXECUTOR_ID: &str = "exec-mux-test";
|
||||
const ENVIRONMENT_ID: &str = "exec-mux-test";
|
||||
const REGISTRY_TOKEN: &str = "registry-token";
|
||||
const RELAY_MESSAGE_FRAME_VERSION: u32 = 1;
|
||||
const TEST_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
#[derive(Debug)]
|
||||
struct StaticRegistryAuthProvider;
|
||||
|
||||
impl AuthProvider for StaticRegistryAuthProvider {
|
||||
fn add_auth_headers(&self, headers: &mut HeaderMap) {
|
||||
let _ = headers.insert(
|
||||
http::header::AUTHORIZATION,
|
||||
HeaderValue::from_static("Bearer registry-token"),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn static_registry_auth_provider() -> codex_api::SharedAuthProvider {
|
||||
Arc::new(StaticRegistryAuthProvider)
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn multiplexed_remote_executor_routes_independent_virtual_streams() -> Result<()> {
|
||||
async fn multiplexed_remote_environment_routes_independent_virtual_streams() -> Result<()> {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await?;
|
||||
let rendezvous_url = format!("ws://{}", listener.local_addr()?);
|
||||
let registry = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path(format!("/cloud/executor/{EXECUTOR_ID}/register")))
|
||||
.and(path(format!(
|
||||
"/cloud/environment/{ENVIRONMENT_ID}/register"
|
||||
)))
|
||||
.and(header("authorization", format!("Bearer {REGISTRY_TOKEN}")))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
|
||||
"executor_id": EXECUTOR_ID,
|
||||
"environment_id": ENVIRONMENT_ID,
|
||||
"url": rendezvous_url,
|
||||
})))
|
||||
.mount(®istry)
|
||||
@@ -63,22 +85,22 @@ async fn multiplexed_remote_executor_routes_independent_virtual_streams() -> Res
|
||||
|
||||
let (codex_exe, codex_linux_sandbox_exe) = common::current_test_binary_helper_paths()?;
|
||||
let runtime_paths = ExecServerRuntimePaths::new(codex_exe, codex_linux_sandbox_exe)?;
|
||||
let config = RemoteExecutorConfig::with_bearer_token(
|
||||
let config = RemoteEnvironmentConfig::new(
|
||||
registry.uri(),
|
||||
EXECUTOR_ID.to_string(),
|
||||
REGISTRY_TOKEN.to_string(),
|
||||
ENVIRONMENT_ID.to_string(),
|
||||
static_registry_auth_provider(),
|
||||
)?;
|
||||
let remote_executor = tokio::spawn(codex_exec_server::run_remote_executor(
|
||||
let remote_environment = tokio::spawn(codex_exec_server::run_remote_environment(
|
||||
config,
|
||||
runtime_paths,
|
||||
));
|
||||
|
||||
let (socket, _peer_addr) = timeout(TEST_TIMEOUT, listener.accept())
|
||||
.await
|
||||
.context("remote executor should connect to fake rendezvous")??;
|
||||
.context("remote environment should connect to fake rendezvous")??;
|
||||
let mut websocket = timeout(TEST_TIMEOUT, accept_async(socket))
|
||||
.await
|
||||
.context("fake rendezvous should accept executor websocket")??;
|
||||
.context("fake rendezvous should accept environment websocket")??;
|
||||
|
||||
let stream_a = "stream-a";
|
||||
let stream_b = "stream-b";
|
||||
@@ -172,8 +194,8 @@ async fn multiplexed_remote_executor_routes_independent_virtual_streams() -> Res
|
||||
)?;
|
||||
|
||||
websocket.close(None).await?;
|
||||
remote_executor.abort();
|
||||
let _ = remote_executor.await;
|
||||
remote_environment.abort();
|
||||
let _ = remote_environment.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -265,7 +287,7 @@ where
|
||||
let frame = timeout(TEST_TIMEOUT, websocket.next())
|
||||
.await
|
||||
.context("timed out waiting for relay frame")?
|
||||
.ok_or_else(|| anyhow!("executor websocket closed"))??;
|
||||
.ok_or_else(|| anyhow!("environment websocket closed"))??;
|
||||
match frame {
|
||||
Message::Binary(bytes) => {
|
||||
let frame = RelayMessageFrame::decode(bytes.as_ref())?;
|
||||
@@ -277,8 +299,8 @@ where
|
||||
return Ok((stream_id, message));
|
||||
}
|
||||
Message::Ping(_) | Message::Pong(_) => {}
|
||||
Message::Close(_) => bail!("executor websocket closed"),
|
||||
Message::Text(_) => bail!("executor sent text frame on relay websocket"),
|
||||
Message::Close(_) => bail!("environment websocket closed"),
|
||||
Message::Text(_) => bail!("environment sent text frame on relay websocket"),
|
||||
Message::Frame(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user