Compare commits

...

1 Commits

Author SHA1 Message Date
Anton Panasenko
55ee42082f feat(exec): make executor support remote environments 2026-04-15 18:55:29 -07:00
11 changed files with 1272 additions and 70 deletions

7
codex-rs/Cargo.lock generated
View File

@@ -2102,18 +2102,24 @@ dependencies = [
"async-trait",
"base64 0.22.1",
"codex-app-server-protocol",
"codex-client",
"codex-config",
"codex-login",
"codex-protocol",
"codex-sandboxing",
"codex-test-binary-support",
"codex-utils-absolute-path",
"codex-utils-pty",
"codex-utils-rustls-provider",
"ctor 0.6.3",
"futures",
"gethostname",
"pretty_assertions",
"reqwest",
"serde",
"serde_json",
"serial_test",
"sha2",
"tempfile",
"test-case",
"thiserror 2.0.18",
@@ -2121,6 +2127,7 @@ dependencies = [
"tokio-tungstenite",
"tracing",
"uuid",
"wiremock",
]
[[package]]

View File

@@ -361,12 +361,10 @@ pub async fn run_main_with_transport(
session_source: SessionSource,
auth: AppServerWebsocketAuthSettings,
) -> IoResult<()> {
let environment_manager = Arc::new(EnvironmentManager::from_env_with_runtime_paths(Some(
ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?,
)));
let local_runtime_paths = ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?;
let (transport_event_tx, mut transport_event_rx) =
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
@@ -569,6 +567,12 @@ pub async fn run_main_with_transport(
let auth_manager =
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false);
let environment_manager = Arc::new(
EnvironmentManager::from_env_with_runtime_paths_and_auth_manager(
Some(local_runtime_paths),
Some(auth_manager.clone()),
),
);
let remote_control_enabled = config.features.enabled(Feature::RemoteControl);
if transport_accept_handles.is_empty() && !remote_control_enabled {

View File

@@ -30,6 +30,7 @@ use codex_tui::ExitReason;
use codex_tui::UpdateAction;
use codex_utils_cli::CliConfigOverrides;
use owo_colors::OwoColorize;
use std::collections::BTreeMap;
use std::io::IsTerminal;
use std::path::PathBuf;
use supports_color::Stream;
@@ -393,13 +394,45 @@ struct AppServerCommand {
#[derive(Debug, Parser)]
struct ExecServerCommand {
/// Transport endpoint URL. Supported values: `ws://IP:PORT` (default).
/// Transport endpoint URL. Supported values: ws://IP:PORT (default).
#[arg(
long = "listen",
value_name = "URL",
default_value = "ws://127.0.0.1:0"
)]
listen: String,
/// Register this exec-server as a cloud executor instead of listening locally.
#[arg(long = "cloud", default_value_t = false)]
cloud: bool,
/// Cloud environments service base URL.
#[arg(long = "cloud-base-url", value_name = "URL")]
cloud_base_url: Option<String>,
/// Existing cloud environment id to attach to. Omit to let the service create one.
#[arg(long = "cloud-environment-id", value_name = "ID")]
cloud_environment_id: Option<String>,
/// Existing cloud executor id to reconnect.
#[arg(long = "cloud-executor-id", value_name = "ID")]
cloud_executor_id: Option<String>,
/// Registration idempotency id. Defaults to a deterministic id for this executor.
#[arg(long = "cloud-idempotency-id", value_name = "ID")]
cloud_idempotency_id: Option<String>,
/// Human-readable executor name. Defaults to the hostname.
#[arg(long = "cloud-name", value_name = "NAME")]
cloud_name: Option<String>,
/// Executor label in KEY=VALUE form. Repeatable.
#[arg(long = "cloud-label", value_name = "KEY=VALUE", action = clap::ArgAction::Append)]
cloud_label: Vec<String>,
/// Executor metadata as a JSON object.
#[arg(long = "cloud-metadata-json", value_name = "JSON")]
cloud_metadata_json: Option<String>,
}
#[derive(Debug, clap::Subcommand)]
@@ -1046,7 +1079,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
root_remote_auth_token_env.as_deref(),
"exec-server",
)?;
run_exec_server_command(cmd, &arg0_paths).await?;
run_exec_server_command(cmd, &arg0_paths, root_config_overrides).await?;
}
Some(Subcommand::Features(FeaturesCli { sub })) => match sub {
FeaturesSubcommand::List => {
@@ -1121,6 +1154,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
async fn run_exec_server_command(
cmd: ExecServerCommand,
arg0_paths: &Arg0DispatchPaths,
root_config_overrides: CliConfigOverrides,
) -> anyhow::Result<()> {
let codex_self_exe = arg0_paths
.codex_self_exe
@@ -1130,11 +1164,63 @@ async fn run_exec_server_command(
codex_self_exe,
arg0_paths.codex_linux_sandbox_exe.clone(),
)?;
if cmd.cloud {
let cloud_base_url = cmd
.cloud_base_url
.or_else(|| std::env::var(codex_exec_server::CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR).ok())
.ok_or_else(|| {
anyhow::anyhow!(
"--cloud-base-url or CODEX_CLOUD_ENVIRONMENTS_BASE_URL is required in cloud mode"
)
})?;
let cli_overrides = root_config_overrides
.parse_overrides()
.map_err(anyhow::Error::msg)?;
let config = Config::load_with_cli_overrides(cli_overrides).await?;
let auth_manager = codex_login::AuthManager::shared_from_config(&config, false);
let mut cloud_config = codex_exec_server::CloudExecutorConfig::new(cloud_base_url);
cloud_config.cloud_environment_id = cmd.cloud_environment_id;
cloud_config.cloud_executor_id = cmd.cloud_executor_id;
cloud_config.cloud_idempotency_id = cmd.cloud_idempotency_id;
if let Some(name) = cmd.cloud_name {
cloud_config.cloud_name = name;
}
cloud_config.cloud_labels = parse_cloud_labels(cmd.cloud_label)?;
cloud_config.cloud_metadata = parse_cloud_metadata_json(cmd.cloud_metadata_json)?;
codex_exec_server::run_cloud_executor(cloud_config, auth_manager, runtime_paths).await?;
return Ok(());
}
codex_exec_server::run_main(&cmd.listen, runtime_paths)
.await
.map_err(anyhow::Error::from_boxed)
}
fn parse_cloud_labels(labels: Vec<String>) -> anyhow::Result<BTreeMap<String, String>> {
let mut parsed = BTreeMap::new();
for label in labels {
let (key, value) = label
.split_once('=')
.ok_or_else(|| anyhow::anyhow!("cloud labels must be in KEY=VALUE form"))?;
let key = key.trim();
if key.is_empty() {
anyhow::bail!("cloud label keys cannot be empty");
}
parsed.insert(key.to_string(), value.to_string());
}
Ok(parsed)
}
fn parse_cloud_metadata_json(metadata_json: Option<String>) -> anyhow::Result<serde_json::Value> {
let Some(metadata_json) = metadata_json else {
return Ok(serde_json::Value::Object(Default::default()));
};
let metadata = serde_json::from_str::<serde_json::Value>(&metadata_json)?;
if !metadata.is_object() {
anyhow::bail!("--cloud-metadata-json must be a JSON object");
}
Ok(metadata)
}
async fn enable_feature_in_config(interactive: &TuiCli, feature: &str) -> anyhow::Result<()> {
FeatureToggles::validate_feature(feature)?;
let codex_home = find_codex_home()?;

View File

@@ -15,14 +15,20 @@ arc-swap = { workspace = true }
async-trait = { workspace = true }
base64 = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-client = { workspace = true }
codex-config = { workspace = true }
codex-login = { workspace = true }
codex-protocol = { workspace = true }
codex-sandboxing = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-pty = { workspace = true }
codex-utils-rustls-provider = { workspace = true }
futures = { workspace = true }
gethostname = { workspace = true }
reqwest = { workspace = true, features = ["json", "rustls-tls"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sha2 = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = [
"fs",
@@ -47,3 +53,4 @@ pretty_assertions = { workspace = true }
serial_test = { workspace = true }
tempfile = { workspace = true }
test-case = "3.3.1"
wiremock = { workspace = true }

View File

@@ -4,6 +4,7 @@ use std::time::Duration;
use arc_swap::ArcSwap;
use codex_app_server_protocol::JSONRPCNotification;
use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
use serde_json::Value;
use tokio::sync::Mutex;
use tokio::sync::watch;
@@ -158,12 +159,25 @@ pub enum ExecServerError {
Protocol(String),
#[error("exec-server rejected request ({code}): {message}")]
Server { code: i64, message: String },
#[error("cloud environments request failed ({status}{code_suffix}): {message}", code_suffix = .code.as_ref().map(|code| format!(", {code}")).unwrap_or_default())]
CloudEnvironmentHttp {
status: reqwest::StatusCode,
code: Option<String>,
message: String,
},
#[error("cloud environment configuration error: {0}")]
CloudEnvironmentConfig(String),
#[error("cloud environment authentication error: {0}")]
CloudEnvironmentAuth(String),
#[error("cloud environments request failed: {0}")]
CloudEnvironmentRequest(#[from] reqwest::Error),
}
impl ExecServerClient {
pub async fn connect_websocket(
args: RemoteExecServerConnectArgs,
) -> Result<Self, ExecServerError> {
ensure_rustls_crypto_provider();
let websocket_url = args.websocket_url.clone();
let connect_timeout = args.connect_timeout;
let (stream, _) = timeout(connect_timeout, connect_async(websocket_url.as_str()))

View File

@@ -0,0 +1,640 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use codex_client::CodexHttpClient;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_login::default_client::create_client;
use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
use reqwest::StatusCode;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use sha2::Digest as _;
use tokio::time::sleep;
use tokio_tungstenite::connect_async;
use tracing::info;
use tracing::warn;
use crate::ExecServerError;
use crate::ExecServerRuntimePaths;
use crate::connection::JsonRpcConnection;
use crate::server::ConnectionProcessor;
pub const CODEX_CLOUD_ENVIRONMENT_ID_ENV_VAR: &str = "CODEX_CLOUD_ENVIRONMENT_ID";
pub const CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR: &str = "CODEX_CLOUD_ENVIRONMENTS_BASE_URL";
const PROTOCOL_VERSION: &str = "codex-exec-server-v1";
const ERROR_BODY_PREVIEW_BYTES: usize = 4096;
#[derive(Clone)]
pub struct CloudEnvironmentClient {
base_url: String,
http: CodexHttpClient,
auth_manager: Arc<AuthManager>,
}
impl std::fmt::Debug for CloudEnvironmentClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CloudEnvironmentClient")
.field("base_url", &self.base_url)
.finish_non_exhaustive()
}
}
impl CloudEnvironmentClient {
pub fn new(base_url: String, auth_manager: Arc<AuthManager>) -> Result<Self, ExecServerError> {
let base_url = normalize_base_url(base_url)?;
Ok(Self {
base_url,
http: create_client(),
auth_manager,
})
}
#[cfg(test)]
fn endpoint_url(&self, path: &str) -> String {
endpoint_url(&self.base_url, path)
}
pub async fn connect_environment(
&self,
environment_id: &str,
) -> Result<CloudEnvironmentConnectResponse, ExecServerError> {
let path = format!("/api/cloud/environment/{environment_id}");
self.post_json(&path, &EmptyRequest {}).await
}
pub async fn register_executor(
&self,
request: &CloudEnvironmentRegisterExecutorRequest,
) -> Result<CloudEnvironmentExecutorRegistrationResponse, ExecServerError> {
self.post_json("/api/cloud/executor", request).await
}
pub async fn reconnect_executor(
&self,
executor_id: &str,
) -> Result<CloudEnvironmentExecutorRegistrationResponse, ExecServerError> {
let path = format!("/api/cloud/executor/{executor_id}");
self.post_json(&path, &EmptyRequest {}).await
}
async fn post_json<T, R>(&self, path: &str, request: &T) -> Result<R, ExecServerError>
where
T: Serialize + Sync,
R: for<'de> Deserialize<'de>,
{
for attempt in 0..=1 {
let auth = cloud_environment_chatgpt_auth(&self.auth_manager).await?;
let response = self
.http
.post(endpoint_url(&self.base_url, path))
.bearer_auth(chatgpt_bearer_token(&auth)?)
.header("chatgpt-account-id", chatgpt_account_id(&auth)?)
.json(request)
.send()
.await?;
if response.status().is_success() {
return response.json::<R>().await.map_err(ExecServerError::from);
}
let status = response.status();
let body = response.text().await.unwrap_or_default();
if matches!(status, StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN)
&& attempt == 0
&& recover_unauthorized(&self.auth_manager).await
{
continue;
}
return Err(cloud_http_error(status, &body));
}
unreachable!("cloud environments request loop is bounded to two attempts")
}
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
pub struct CloudEnvironmentRegisterExecutorRequest {
pub idempotency_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub environment_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
pub labels: BTreeMap<String, String>,
pub metadata: Value,
}
#[derive(Debug, Clone, Eq, PartialEq, Deserialize)]
pub struct CloudEnvironmentConnectResponse {
pub environment_id: String,
pub executor_id: String,
pub url: String,
}
#[derive(Debug, Clone, Eq, PartialEq, Deserialize)]
pub struct CloudEnvironmentExecutorRegistrationResponse {
pub id: String,
pub environment_id: String,
pub url: String,
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct CloudExecutorConfig {
pub cloud_base_url: String,
pub cloud_environment_id: Option<String>,
pub cloud_executor_id: Option<String>,
pub cloud_idempotency_id: Option<String>,
pub cloud_name: String,
pub cloud_labels: BTreeMap<String, String>,
pub cloud_metadata: Value,
}
impl CloudExecutorConfig {
pub fn new(cloud_base_url: String) -> Self {
Self {
cloud_base_url,
cloud_environment_id: None,
cloud_executor_id: None,
cloud_idempotency_id: None,
cloud_name: default_executor_name(),
cloud_labels: BTreeMap::new(),
cloud_metadata: Value::Object(Default::default()),
}
}
fn registration_request(
&self,
auth: &CodexAuth,
) -> Result<CloudEnvironmentRegisterExecutorRequest, ExecServerError> {
let idempotency_id = match &self.cloud_idempotency_id {
Some(idempotency_id) => idempotency_id.clone(),
None => self.default_idempotency_id(auth)?,
};
Ok(CloudEnvironmentRegisterExecutorRequest {
idempotency_id,
environment_id: self.cloud_environment_id.clone(),
name: Some(self.cloud_name.clone()),
labels: self.cloud_labels.clone(),
metadata: self.cloud_metadata.clone(),
})
}
fn default_idempotency_id(&self, auth: &CodexAuth) -> Result<String, ExecServerError> {
let mut hasher = sha2::Sha256::new();
let account_id = chatgpt_account_id(auth)?;
hasher.update(account_id.as_bytes());
hasher.update(b"\0");
hasher.update(self.cloud_environment_id.as_deref().unwrap_or("auto"));
hasher.update(b"\0");
hasher.update(self.cloud_name.as_bytes());
hasher.update(b"\0");
hasher.update(serde_json::to_string(&self.cloud_labels).unwrap_or_default());
hasher.update(b"\0");
hasher.update(canonical_json(&self.cloud_metadata));
hasher.update(b"\0");
hasher.update(PROTOCOL_VERSION);
let digest = hasher.finalize();
Ok(format!("codex-exec-server-{digest:x}"))
}
}
pub async fn run_cloud_executor(
config: CloudExecutorConfig,
auth_manager: Arc<AuthManager>,
runtime_paths: ExecServerRuntimePaths,
) -> Result<(), ExecServerError> {
let client = CloudEnvironmentClient::new(config.cloud_base_url.clone(), auth_manager.clone())?;
let processor = ConnectionProcessor::new(runtime_paths);
let mut executor_id = config.cloud_executor_id.clone();
let mut backoff = Duration::from_secs(1);
loop {
let signed_url = if let Some(existing_executor_id) = executor_id.as_deref() {
let response = client.reconnect_executor(existing_executor_id).await?;
executor_id = Some(response.id.clone());
eprintln!(
"codex exec-server cloud executor {} connected to environment {}",
response.id, response.environment_id
);
response.url
} else {
let auth = cloud_environment_chatgpt_auth(&auth_manager).await?;
let request = config.registration_request(&auth)?;
let response = client.register_executor(&request).await?;
executor_id = Some(response.id.clone());
eprintln!(
"codex exec-server cloud executor {} registered in environment {}",
response.id, response.environment_id
);
response.url
};
ensure_rustls_crypto_provider();
match connect_async(signed_url.as_str()).await {
Ok((websocket, _)) => {
backoff = Duration::from_secs(1);
processor
.run_connection(JsonRpcConnection::from_websocket(
websocket,
"cloud exec-server websocket".to_string(),
))
.await;
}
Err(err) => {
warn!("failed to connect cloud exec-server websocket: {err}");
}
}
sleep(backoff).await;
backoff = (backoff * 2).min(Duration::from_secs(30));
}
}
async fn cloud_environment_chatgpt_auth(
auth_manager: &AuthManager,
) -> Result<CodexAuth, ExecServerError> {
let mut reloaded = false;
let auth = loop {
let Some(auth) = auth_manager.auth().await else {
if reloaded {
return Err(ExecServerError::CloudEnvironmentAuth(
"cloud environments require ChatGPT authentication".to_string(),
));
}
auth_manager.reload();
reloaded = true;
continue;
};
if !auth.is_chatgpt_auth() {
return Err(ExecServerError::CloudEnvironmentAuth(
"cloud environments require ChatGPT authentication; API key auth is not supported"
.to_string(),
));
}
if auth.get_account_id().is_none() && !reloaded {
auth_manager.reload();
reloaded = true;
continue;
}
break auth;
};
let _ = chatgpt_bearer_token(&auth)?;
let _ = chatgpt_account_id(&auth)?;
Ok(auth)
}
fn chatgpt_bearer_token(auth: &CodexAuth) -> Result<String, ExecServerError> {
auth.get_token()
.map_err(|err| ExecServerError::CloudEnvironmentAuth(err.to_string()))
}
fn chatgpt_account_id(auth: &CodexAuth) -> Result<String, ExecServerError> {
auth.get_account_id().ok_or_else(|| {
ExecServerError::CloudEnvironmentAuth(
"cloud environments are waiting for a ChatGPT account id".to_string(),
)
})
}
async fn recover_unauthorized(auth_manager: &Arc<AuthManager>) -> bool {
let mut recovery = auth_manager.unauthorized_recovery();
if !recovery.has_next() {
return false;
}
let mode = recovery.mode_name();
let step = recovery.step_name();
match recovery.next().await {
Ok(step_result) => {
info!(
"cloud environment auth recovery succeeded: mode={mode}, step={step}, auth_state_changed={:?}",
step_result.auth_state_changed()
);
true
}
Err(err) => {
warn!("cloud environment auth recovery failed: mode={mode}, step={step}: {err}");
false
}
}
}
#[derive(Serialize)]
struct EmptyRequest {}
#[derive(Deserialize)]
struct CloudErrorBody {
error: Option<CloudError>,
}
#[derive(Deserialize)]
struct CloudError {
code: Option<String>,
message: Option<String>,
}
fn normalize_base_url(base_url: String) -> Result<String, ExecServerError> {
let trimmed = base_url.trim().trim_end_matches('/').to_string();
if trimmed.is_empty() {
return Err(ExecServerError::CloudEnvironmentConfig(
"cloud environments base URL is required".to_string(),
));
}
Ok(trimmed)
}
fn endpoint_url(base_url: &str, path: &str) -> String {
format!("{base_url}/{}", path.trim_start_matches('/'))
}
fn cloud_http_error(status: StatusCode, body: &str) -> ExecServerError {
let parsed = serde_json::from_str::<CloudErrorBody>(body).ok();
let (code, message) = parsed
.and_then(|body| body.error)
.map(|error| {
(
error.code,
error.message.unwrap_or_else(|| {
preview_error_body(body).unwrap_or_else(|| "empty error body".to_string())
}),
)
})
.unwrap_or_else(|| {
(
None,
preview_error_body(body)
.unwrap_or_else(|| "empty or malformed error body".to_string()),
)
});
ExecServerError::CloudEnvironmentHttp {
status,
code,
message,
}
}
fn preview_error_body(body: &str) -> Option<String> {
let trimmed = body.trim();
if trimmed.is_empty() {
return None;
}
Some(trimmed.chars().take(ERROR_BODY_PREVIEW_BYTES).collect())
}
fn default_executor_name() -> String {
gethostname::gethostname()
.to_str()
.filter(|hostname| !hostname.is_empty())
.unwrap_or("codex-exec-server")
.to_string()
}
fn canonical_json(value: &Value) -> String {
match value {
Value::Object(map) => {
let sorted = map
.iter()
.map(|(key, value)| (key, sorted_json_value(value)))
.collect::<BTreeMap<_, _>>();
serde_json::to_string(&sorted).unwrap_or_default()
}
_ => serde_json::to_string(value).unwrap_or_default(),
}
}
fn sorted_json_value(value: &Value) -> Value {
match value {
Value::Array(values) => Value::Array(values.iter().map(sorted_json_value).collect()),
Value::Object(map) => Value::Object(
map.iter()
.map(|(key, value)| (key.clone(), sorted_json_value(value)))
.collect(),
),
value => value.clone(),
}
}
#[cfg(test)]
mod tests {
use base64::Engine as _;
use codex_config::types::AuthCredentialsStoreMode;
use codex_login::CodexAuth;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::body_json;
use wiremock::matchers::header;
use wiremock::matchers::method;
use wiremock::matchers::path;
use super::*;
const TEST_ACCESS_TOKEN: &str = "test-access-token";
const TEST_REFRESHED_ACCESS_TOKEN: &str = "test-refreshed-access-token";
const TEST_ACCOUNT_ID: &str = "acct-1";
fn auth_manager() -> Arc<AuthManager> {
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing())
}
fn auth_manager_with_stored_chatgpt_auth() -> (TempDir, Arc<AuthManager>) {
let codex_home = tempfile::tempdir().expect("create temp codex home");
write_auth_json(codex_home.path(), TEST_ACCESS_TOKEN, TEST_ACCOUNT_ID);
let auth_manager = AuthManager::shared(
codex_home.path().to_path_buf(),
/*enable_codex_api_key_env*/ false,
AuthCredentialsStoreMode::File,
);
(codex_home, auth_manager)
}
fn write_auth_json(codex_home: &std::path::Path, access_token: &str, account_id: &str) {
let auth_json = json!({
"auth_mode": "chatgpt",
"tokens": {
"id_token": fake_jwt(account_id),
"access_token": access_token,
"refresh_token": "test-refresh-token",
"account_id": account_id,
},
"last_refresh": "2999-01-01T00:00:00Z",
});
std::fs::write(
codex_home.join("auth.json"),
serde_json::to_string_pretty(&auth_json).expect("serialize auth json"),
)
.expect("write auth json");
}
fn fake_jwt(account_id: &str) -> String {
let header = json!({
"alg": "none",
"typ": "JWT",
});
let payload = json!({
"email": "user@example.com",
"https://api.openai.com/auth": {
"chatgpt_account_id": account_id,
"chatgpt_user_id": "user-12345",
},
});
let b64 = |value: &serde_json::Value| {
base64::engine::general_purpose::URL_SAFE_NO_PAD
.encode(serde_json::to_vec(value).expect("serialize jwt part"))
};
let signature = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(b"sig");
format!("{}.{}.{}", b64(&header), b64(&payload), signature)
}
#[test]
fn normalizes_base_url_and_builds_endpoints() {
let client = CloudEnvironmentClient::new(
"https://cloud.example.test/root/".to_string(),
auth_manager(),
)
.expect("client");
assert_eq!(
client.endpoint_url("/api/cloud/environment/env-1"),
"https://cloud.example.test/root/api/cloud/environment/env-1"
);
}
#[test]
fn cloud_response_serde_matches_service_shape() {
let connect: CloudEnvironmentConnectResponse = serde_json::from_value(json!({
"environment_id": "env-1",
"executor_id": "exec-1",
"url": "wss://rendezvous.test/executor/exec-1?role=harness&sig=abc"
}))
.expect("connect response");
let registration: CloudEnvironmentExecutorRegistrationResponse =
serde_json::from_value(json!({
"id": "exec-1",
"environment_id": "env-1",
"url": "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc"
}))
.expect("registration response");
assert_eq!(
connect,
CloudEnvironmentConnectResponse {
environment_id: "env-1".to_string(),
executor_id: "exec-1".to_string(),
url: "wss://rendezvous.test/executor/exec-1?role=harness&sig=abc".to_string(),
}
);
assert_eq!(
registration,
CloudEnvironmentExecutorRegistrationResponse {
id: "exec-1".to_string(),
environment_id: "env-1".to_string(),
url: "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc".to_string(),
}
);
}
#[test]
fn cloud_error_body_is_preserved() {
let err = cloud_http_error(
StatusCode::CONFLICT,
r#"{"error":{"code":"no_online_executor","message":"no executor is online"}}"#,
);
assert_eq!(
err.to_string(),
"cloud environments request failed (409 Conflict, no_online_executor): no executor is online"
);
}
#[tokio::test]
async fn connect_environment_posts_with_chatgpt_auth_headers() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/cloud/environment/env-1"))
.and(header(
"authorization",
format!("Bearer {TEST_ACCESS_TOKEN}"),
))
.and(header("chatgpt-account-id", TEST_ACCOUNT_ID))
.and(body_json(json!({})))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"environment_id": "env-1",
"executor_id": "exec-1",
"url": "ws://127.0.0.1:1234"
})))
.mount(&server)
.await;
let (_codex_home, auth_manager) = auth_manager_with_stored_chatgpt_auth();
let client = CloudEnvironmentClient::new(server.uri(), auth_manager).expect("client");
let response = client
.connect_environment("env-1")
.await
.expect("connect environment");
assert_eq!(
response,
CloudEnvironmentConnectResponse {
environment_id: "env-1".to_string(),
executor_id: "exec-1".to_string(),
url: "ws://127.0.0.1:1234".to_string(),
}
);
}
#[tokio::test]
async fn retries_once_after_unauthorized_recovery() {
let (codex_home, auth_manager) = auth_manager_with_stored_chatgpt_auth();
write_auth_json(
codex_home.path(),
TEST_REFRESHED_ACCESS_TOKEN,
TEST_ACCOUNT_ID,
);
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/cloud/environment/env-1"))
.and(header(
"authorization",
format!("Bearer {TEST_ACCESS_TOKEN}"),
))
.respond_with(ResponseTemplate::new(401).set_body_json(json!({
"error": {
"code": "unauthorized",
"message": "expired token"
}
})))
.expect(1)
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path("/api/cloud/environment/env-1"))
.and(header(
"authorization",
format!("Bearer {TEST_REFRESHED_ACCESS_TOKEN}"),
))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"environment_id": "env-1",
"executor_id": "exec-1",
"url": "ws://127.0.0.1:1234"
})))
.expect(1)
.mount(&server)
.await;
let client = CloudEnvironmentClient::new(server.uri(), auth_manager).expect("client");
client
.connect_environment("env-1")
.await
.expect("connect environment");
}
}

View File

@@ -1,7 +1,13 @@
use std::sync::Arc;
use async_trait::async_trait;
use codex_login::AuthManager;
use codex_login::AuthManagerConfig;
use tokio::sync::OnceCell;
use crate::CODEX_CLOUD_ENVIRONMENT_ID_ENV_VAR;
use crate::CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR;
use crate::CloudEnvironmentClient;
use crate::ExecServerClient;
use crate::ExecServerError;
use crate::ExecServerRuntimePaths;
@@ -15,18 +21,160 @@ use crate::remote_process::RemoteProcess;
pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
/// Resolves the execution/filesystem environment for a session.
///
/// Implementations own the selection-specific details, such as whether a remote
/// websocket URL is used directly or resolved from cloud environment metadata.
#[async_trait]
pub trait EnvironmentResolver: std::fmt::Debug + Send + Sync {
fn exec_server_url(&self) -> Option<&str>;
fn is_remote(&self) -> bool;
async fn create_environment(&self) -> Result<Option<Environment>, ExecServerError>;
}
/// Resolver for the existing direct/local/disabled behavior.
#[derive(Debug)]
pub struct DefaultEnvironmentResolver {
exec_server_url: Option<String>,
local_runtime_paths: Option<ExecServerRuntimePaths>,
disabled: bool,
}
impl DefaultEnvironmentResolver {
pub fn new(
exec_server_url: Option<String>,
local_runtime_paths: Option<ExecServerRuntimePaths>,
) -> Self {
let (exec_server_url, disabled) = normalize_exec_server_url(exec_server_url);
Self {
exec_server_url,
local_runtime_paths,
disabled,
}
}
pub fn disabled() -> Self {
Self {
exec_server_url: None,
local_runtime_paths: None,
disabled: true,
}
}
}
#[async_trait]
impl EnvironmentResolver for DefaultEnvironmentResolver {
fn exec_server_url(&self) -> Option<&str> {
self.exec_server_url.as_deref()
}
fn is_remote(&self) -> bool {
self.exec_server_url.is_some()
}
async fn create_environment(&self) -> Result<Option<Environment>, ExecServerError> {
if self.disabled {
return Ok(None);
}
Ok(Some(
Environment::create_with_runtime_paths(
self.exec_server_url.clone(),
/*cloud_environment_id*/ None,
/*cloud_environments_base_url*/ None,
/*auth_manager*/ None,
self.local_runtime_paths.clone(),
)
.await?,
))
}
}
/// Resolver for cloud environment id selection.
pub struct CloudEnvironmentResolver {
cloud_environment_id: String,
cloud_environments_base_url: Option<String>,
auth_manager: Option<Arc<AuthManager>>,
local_runtime_paths: Option<ExecServerRuntimePaths>,
}
impl CloudEnvironmentResolver {
pub fn new(
cloud_environment_id: String,
cloud_environments_base_url: Option<String>,
auth_manager: Option<Arc<AuthManager>>,
local_runtime_paths: Option<ExecServerRuntimePaths>,
) -> Self {
Self {
cloud_environment_id,
cloud_environments_base_url: normalize_optional_env_value(cloud_environments_base_url),
auth_manager,
local_runtime_paths,
}
}
pub fn cloud_environment_id(&self) -> &str {
&self.cloud_environment_id
}
pub fn cloud_environments_base_url(&self) -> Option<&str> {
self.cloud_environments_base_url.as_deref()
}
}
impl std::fmt::Debug for CloudEnvironmentResolver {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CloudEnvironmentResolver")
.field("cloud_environment_id", &self.cloud_environment_id)
.field(
"cloud_environments_base_url",
&self.cloud_environments_base_url,
)
.finish_non_exhaustive()
}
}
#[async_trait]
impl EnvironmentResolver for CloudEnvironmentResolver {
fn exec_server_url(&self) -> Option<&str> {
None
}
fn is_remote(&self) -> bool {
true
}
async fn create_environment(&self) -> Result<Option<Environment>, ExecServerError> {
Ok(Some(
Environment::create_with_runtime_paths(
/*exec_server_url*/ None,
Some(self.cloud_environment_id.clone()),
self.cloud_environments_base_url.clone(),
self.auth_manager.clone(),
self.local_runtime_paths.clone(),
)
.await?,
))
}
}
/// Lazily creates and caches the active environment for a session.
///
/// The manager keeps the session's environment selection stable so subagents
/// and follow-up turns preserve an explicit disabled state.
#[derive(Debug)]
pub struct EnvironmentManager {
exec_server_url: Option<String>,
local_runtime_paths: Option<ExecServerRuntimePaths>,
disabled: bool,
resolver: Box<dyn EnvironmentResolver>,
current_environment: OnceCell<Option<Arc<Environment>>>,
}
impl std::fmt::Debug for EnvironmentManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EnvironmentManager")
.field("resolver", &self.resolver)
.finish_non_exhaustive()
}
}
impl Default for EnvironmentManager {
fn default() -> Self {
Self::new(/*exec_server_url*/ None)
@@ -45,11 +193,48 @@ impl EnvironmentManager {
exec_server_url: Option<String>,
local_runtime_paths: Option<ExecServerRuntimePaths>,
) -> Self {
let (exec_server_url, disabled) = normalize_exec_server_url(exec_server_url);
Self {
Self::from_resolver(DefaultEnvironmentResolver::new(
exec_server_url,
local_runtime_paths,
disabled,
))
}
fn new_with_options(options: EnvironmentManagerOptions) -> Self {
let EnvironmentManagerOptions {
exec_server_url,
cloud_environment_id,
cloud_environments_base_url,
auth_manager,
local_runtime_paths,
} = options;
let (exec_server_url, disabled) = normalize_exec_server_url(exec_server_url);
if exec_server_url.is_some() || disabled {
return Self::from_resolver(DefaultEnvironmentResolver {
exec_server_url,
local_runtime_paths,
disabled,
});
}
if let Some(cloud_environment_id) = normalize_optional_env_value(cloud_environment_id) {
Self::from_resolver(CloudEnvironmentResolver::new(
cloud_environment_id,
cloud_environments_base_url,
auth_manager,
local_runtime_paths,
))
} else {
Self::from_resolver(DefaultEnvironmentResolver {
exec_server_url: None,
local_runtime_paths,
disabled: false,
})
}
}
pub fn from_resolver(resolver: impl EnvironmentResolver + 'static) -> Self {
Self {
resolver: Box::new(resolver),
current_environment: OnceCell::new(),
}
}
@@ -64,56 +249,86 @@ impl EnvironmentManager {
pub fn from_env_with_runtime_paths(
local_runtime_paths: Option<ExecServerRuntimePaths>,
) -> Self {
Self::new_with_runtime_paths(
std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok(),
Self::new_with_options(EnvironmentManagerOptions {
exec_server_url: std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok(),
cloud_environment_id: std::env::var(CODEX_CLOUD_ENVIRONMENT_ID_ENV_VAR).ok(),
cloud_environments_base_url: std::env::var(CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR)
.ok(),
auth_manager: None,
local_runtime_paths,
)
})
}
/// Builds a manager from process environment variables and an auth manager.
/// This is the production constructor used after config/auth are
/// available.
pub fn from_env_with_runtime_paths_and_auth_manager(
local_runtime_paths: Option<ExecServerRuntimePaths>,
auth_manager: Option<Arc<AuthManager>>,
) -> Self {
Self::new_with_options(EnvironmentManagerOptions {
exec_server_url: std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok(),
cloud_environment_id: std::env::var(CODEX_CLOUD_ENVIRONMENT_ID_ENV_VAR).ok(),
cloud_environments_base_url: std::env::var(CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR)
.ok(),
auth_manager,
local_runtime_paths,
})
}
/// Builds a manager from process environment variables and the resolved
/// login config. Cloud environments use ChatGPT credentials, so API-key
/// auth from environment variables is intentionally disabled here.
pub fn from_env_with_runtime_paths_and_chatgpt_login_config(
local_runtime_paths: Option<ExecServerRuntimePaths>,
config: &impl AuthManagerConfig,
) -> Self {
let auth_manager =
AuthManager::shared_from_config(config, /*enable_codex_api_key_env*/ false);
Self::from_env_with_runtime_paths_and_auth_manager(local_runtime_paths, Some(auth_manager))
}
/// Builds a manager from the currently selected environment, or from the
/// disabled mode when no environment is available.
pub fn from_environment(environment: Option<&Environment>) -> Self {
match environment {
Some(environment) => Self {
exec_server_url: environment.exec_server_url().map(str::to_owned),
local_runtime_paths: environment.local_runtime_paths().cloned(),
disabled: false,
current_environment: OnceCell::new(),
},
None => Self {
exec_server_url: None,
local_runtime_paths: None,
disabled: true,
current_environment: OnceCell::new(),
},
Some(environment) => {
if let Some(cloud_environment_id) = &environment.cloud_environment_id {
return Self::from_resolver(CloudEnvironmentResolver::new(
cloud_environment_id.clone(),
environment.cloud_environments_base_url.clone(),
environment.auth_manager.clone(),
environment.local_runtime_paths().cloned(),
));
}
Self::from_resolver(DefaultEnvironmentResolver::new(
environment.exec_server_url().map(str::to_owned),
environment.local_runtime_paths().cloned(),
))
}
None => Self::from_resolver(DefaultEnvironmentResolver::disabled()),
}
}
/// Returns the remote exec-server URL when one is configured.
pub fn exec_server_url(&self) -> Option<&str> {
self.exec_server_url.as_deref()
self.resolver.exec_server_url()
}
/// Returns true when this manager is configured to use a remote exec server.
pub fn is_remote(&self) -> bool {
self.exec_server_url.is_some()
self.resolver.is_remote()
}
/// Returns the cached environment, creating it on first access.
pub async fn current(&self) -> Result<Option<Arc<Environment>>, ExecServerError> {
self.current_environment
.get_or_try_init(|| async {
if self.disabled {
Ok(None)
} else {
Ok(Some(Arc::new(
Environment::create_with_runtime_paths(
self.exec_server_url.clone(),
self.local_runtime_paths.clone(),
)
.await?,
)))
}
self.resolver
.create_environment()
.await
.map(|environment| environment.map(Arc::new))
})
.await
.map(Option::as_ref)
@@ -128,6 +343,9 @@ impl EnvironmentManager {
#[derive(Clone)]
pub struct Environment {
exec_server_url: Option<String>,
cloud_environment_id: Option<String>,
cloud_environments_base_url: Option<String>,
auth_manager: Option<Arc<AuthManager>>,
remote_exec_server_client: Option<ExecServerClient>,
exec_backend: Arc<dyn ExecBackend>,
local_runtime_paths: Option<ExecServerRuntimePaths>,
@@ -137,6 +355,9 @@ impl Default for Environment {
fn default() -> Self {
Self {
exec_server_url: None,
cloud_environment_id: None,
cloud_environments_base_url: None,
auth_manager: None,
remote_exec_server_client: None,
exec_backend: Arc::new(LocalProcess::default()),
local_runtime_paths: None,
@@ -148,6 +369,7 @@ impl std::fmt::Debug for Environment {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Environment")
.field("exec_server_url", &self.exec_server_url)
.field("cloud_environment_id", &self.cloud_environment_id)
.finish_non_exhaustive()
}
}
@@ -155,13 +377,23 @@ impl std::fmt::Debug for Environment {
impl Environment {
/// Builds an environment from the raw `CODEX_EXEC_SERVER_URL` value.
pub async fn create(exec_server_url: Option<String>) -> Result<Self, ExecServerError> {
Self::create_with_runtime_paths(exec_server_url, /*local_runtime_paths*/ None).await
Self::create_with_runtime_paths(
exec_server_url,
/*cloud_environment_id*/ None,
/*cloud_environments_base_url*/ None,
/*auth_manager*/ None,
/*local_runtime_paths*/ None,
)
.await
}
/// Builds an environment from the raw `CODEX_EXEC_SERVER_URL` value and
/// local runtime paths used when creating local filesystem helpers.
pub async fn create_with_runtime_paths(
exec_server_url: Option<String>,
cloud_environment_id: Option<String>,
cloud_environments_base_url: Option<String>,
auth_manager: Option<Arc<AuthManager>>,
local_runtime_paths: Option<ExecServerRuntimePaths>,
) -> Result<Self, ExecServerError> {
let (exec_server_url, disabled) = normalize_exec_server_url(exec_server_url);
@@ -171,17 +403,24 @@ impl Environment {
));
}
let cloud_environment_id = normalize_optional_env_value(cloud_environment_id);
let cloud_environments_base_url = normalize_optional_env_value(cloud_environments_base_url);
let remote_exec_server_client = if let Some(exec_server_url) = &exec_server_url {
Some(
ExecServerClient::connect_websocket(RemoteExecServerConnectArgs {
websocket_url: exec_server_url.clone(),
client_name: "codex-environment".to_string(),
connect_timeout: std::time::Duration::from_secs(5),
initialize_timeout: std::time::Duration::from_secs(5),
resume_session_id: None,
})
.await?,
)
Some(connect_remote_exec_server(exec_server_url.clone()).await?)
} else if let Some(environment_id) = &cloud_environment_id {
let base_url = cloud_environments_base_url.clone().ok_or_else(|| {
ExecServerError::CloudEnvironmentConfig(format!(
"{CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR} is required when {CODEX_CLOUD_ENVIRONMENT_ID_ENV_VAR} is set"
))
})?;
let auth_manager = auth_manager.clone().ok_or_else(|| {
ExecServerError::CloudEnvironmentAuth(
"cloud environment selection requires ChatGPT authentication".to_string(),
)
})?;
let client = CloudEnvironmentClient::new(base_url, auth_manager)?;
let response = client.connect_environment(environment_id).await?;
Some(connect_remote_exec_server(response.url).await?)
} else {
None
};
@@ -195,6 +434,9 @@ impl Environment {
Ok(Self {
exec_server_url,
cloud_environment_id,
cloud_environments_base_url,
auth_manager,
remote_exec_server_client,
exec_backend,
local_runtime_paths,
@@ -202,7 +444,7 @@ impl Environment {
}
pub fn is_remote(&self) -> bool {
self.exec_server_url.is_some()
self.exec_server_url.is_some() || self.cloud_environment_id.is_some()
}
/// Returns the remote exec-server URL when this environment is remote.
@@ -236,12 +478,43 @@ fn normalize_exec_server_url(exec_server_url: Option<String>) -> (Option<String>
Some(url) => (Some(url.to_string()), false),
}
}
fn normalize_optional_env_value(value: Option<String>) -> Option<String> {
value
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
}
async fn connect_remote_exec_server(
websocket_url: String,
) -> Result<ExecServerClient, ExecServerError> {
ExecServerClient::connect_websocket(RemoteExecServerConnectArgs {
websocket_url,
client_name: "codex-environment".to_string(),
connect_timeout: std::time::Duration::from_secs(5),
initialize_timeout: std::time::Duration::from_secs(5),
resume_session_id: None,
})
.await
}
struct EnvironmentManagerOptions {
exec_server_url: Option<String>,
cloud_environment_id: Option<String>,
cloud_environments_base_url: Option<String>,
auth_manager: Option<Arc<AuthManager>>,
local_runtime_paths: Option<ExecServerRuntimePaths>,
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::CloudEnvironmentResolver;
use super::Environment;
use super::EnvironmentManager;
use super::EnvironmentManagerOptions;
use crate::ExecServerRuntimePaths;
use crate::ProcessId;
use pretty_assertions::assert_eq;
@@ -260,7 +533,6 @@ mod tests {
fn environment_manager_normalizes_empty_url() {
let manager = EnvironmentManager::new(Some(String::new()));
assert!(!manager.disabled);
assert_eq!(manager.exec_server_url(), None);
assert!(!manager.is_remote());
}
@@ -269,7 +541,6 @@ mod tests {
fn environment_manager_treats_none_value_as_disabled() {
let manager = EnvironmentManager::new(Some("none".to_string()));
assert!(manager.disabled);
assert_eq!(manager.exec_server_url(), None);
assert!(!manager.is_remote());
}
@@ -282,6 +553,88 @@ mod tests {
assert_eq!(manager.exec_server_url(), Some("ws://127.0.0.1:8765"));
}
#[test]
fn direct_remote_url_takes_precedence_over_cloud_environment() {
let manager = EnvironmentManager::new_with_options(EnvironmentManagerOptions {
exec_server_url: Some("ws://127.0.0.1:8765".to_string()),
cloud_environment_id: Some("env-1".to_string()),
cloud_environments_base_url: Some("https://cloud.example.test".to_string()),
auth_manager: None,
local_runtime_paths: None,
});
assert!(manager.is_remote());
assert_eq!(manager.exec_server_url(), Some("ws://127.0.0.1:8765"));
}
#[test]
fn none_disables_cloud_environment_selection() {
let manager = EnvironmentManager::new_with_options(EnvironmentManagerOptions {
exec_server_url: Some("none".to_string()),
cloud_environment_id: Some("env-1".to_string()),
cloud_environments_base_url: Some("https://cloud.example.test".to_string()),
auth_manager: None,
local_runtime_paths: None,
});
assert!(!manager.is_remote());
}
#[test]
fn cloud_environment_id_is_remote_without_direct_url() {
let cloud_resolver = CloudEnvironmentResolver::new(
"env-1".to_string(),
Some(" https://cloud.example.test ".to_string()),
/*auth_manager*/ None,
/*local_runtime_paths*/ None,
);
assert_eq!(cloud_resolver.cloud_environment_id(), "env-1");
assert_eq!(
cloud_resolver.cloud_environments_base_url(),
Some("https://cloud.example.test")
);
let manager = EnvironmentManager::from_resolver(cloud_resolver);
assert!(manager.is_remote());
assert_eq!(manager.exec_server_url(), None);
}
#[tokio::test]
async fn cloud_environment_requires_base_url() {
let err = Environment::create_with_runtime_paths(
/*exec_server_url*/ None,
Some("env-1".to_string()),
/*cloud_environments_base_url*/ None,
/*auth_manager*/ None,
/*local_runtime_paths*/ None,
)
.await
.expect_err("missing base URL should fail");
assert_eq!(
err.to_string(),
"cloud environment configuration error: CODEX_CLOUD_ENVIRONMENTS_BASE_URL is required when CODEX_CLOUD_ENVIRONMENT_ID is set"
);
}
#[tokio::test]
async fn cloud_environment_requires_auth_manager() {
let err = Environment::create_with_runtime_paths(
/*exec_server_url*/ None,
Some("env-1".to_string()),
Some("https://cloud.example.test".to_string()),
/*auth_manager*/ None,
/*local_runtime_paths*/ None,
)
.await
.expect_err("missing auth should fail");
assert_eq!(
err.to_string(),
"cloud environment authentication error: cloud environment selection requires ChatGPT authentication"
);
}
#[tokio::test]
async fn environment_manager_current_caches_environment() {
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
@@ -314,9 +667,14 @@ mod tests {
.expect("local environment");
assert_eq!(environment.local_runtime_paths(), Some(&runtime_paths));
let inherited_environment = EnvironmentManager::from_environment(Some(&environment))
.current()
.await
.expect("get inherited current environment")
.expect("inherited local environment");
assert_eq!(
EnvironmentManager::from_environment(Some(&environment)).local_runtime_paths,
Some(runtime_paths)
inherited_environment.local_runtime_paths(),
Some(&runtime_paths)
);
}

View File

@@ -1,5 +1,6 @@
mod client;
mod client_api;
mod cloud;
mod connection;
mod environment;
mod file_system;
@@ -22,9 +23,20 @@ pub use client::ExecServerClient;
pub use client::ExecServerError;
pub use client_api::ExecServerClientConnectOptions;
pub use client_api::RemoteExecServerConnectArgs;
pub use cloud::CODEX_CLOUD_ENVIRONMENT_ID_ENV_VAR;
pub use cloud::CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR;
pub use cloud::CloudEnvironmentClient;
pub use cloud::CloudEnvironmentConnectResponse;
pub use cloud::CloudEnvironmentExecutorRegistrationResponse;
pub use cloud::CloudEnvironmentRegisterExecutorRequest;
pub use cloud::CloudExecutorConfig;
pub use cloud::run_cloud_executor;
pub use environment::CODEX_EXEC_SERVER_URL_ENV_VAR;
pub use environment::CloudEnvironmentResolver;
pub use environment::DefaultEnvironmentResolver;
pub use environment::Environment;
pub use environment::EnvironmentManager;
pub use environment::EnvironmentResolver;
pub use file_system::CopyOptions;
pub use file_system::CreateDirectoryOptions;
pub use file_system::ExecutorFileSystem;

View File

@@ -7,6 +7,7 @@ mod session_registry;
mod transport;
pub(crate) use handler::ExecServerHandler;
pub(crate) use processor::ConnectionProcessor;
pub use transport::DEFAULT_LISTEN_URL;
pub use transport::ExecServerListenUrlParseError;

View File

@@ -67,6 +67,7 @@ use codex_core::path_utils;
use codex_feedback::CodexFeedback;
use codex_git_utils::get_git_repo_root;
use codex_login::AuthConfig;
use codex_login::AuthManager;
use codex_login::default_client::set_default_client_residency_requirement;
use codex_login::default_client::set_default_originator;
use codex_login::enforce_login_restrictions;
@@ -474,6 +475,8 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?;
let environment_auth_manager =
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false);
let in_process_start_args = InProcessClientStartArgs {
arg0_paths,
config: std::sync::Arc::new(config.clone()),
@@ -481,9 +484,12 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
loader_overrides: run_loader_overrides,
cloud_requirements: run_cloud_requirements,
feedback: CodexFeedback::new(),
environment_manager: std::sync::Arc::new(EnvironmentManager::from_env_with_runtime_paths(
Some(local_runtime_paths),
)),
environment_manager: std::sync::Arc::new(
EnvironmentManager::from_env_with_runtime_paths_and_auth_manager(
Some(local_runtime_paths),
Some(environment_auth_manager),
),
),
config_warnings,
session_source: SessionSource::Exec,
enable_codex_api_key_env: true,

View File

@@ -723,15 +723,19 @@ pub async fn run_main(
}
};
let environment_manager = Arc::new(EnvironmentManager::from_env_with_runtime_paths(Some(
ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?,
)));
let local_runtime_paths = ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?;
let preliminary_environment_manager = Arc::new(
EnvironmentManager::from_env_with_runtime_paths(Some(local_runtime_paths.clone())),
);
let cwd = cli.cwd.clone();
let config_cwd =
config_cwd_for_app_server_target(cwd.as_deref(), &app_server_target, &environment_manager)?;
let config_cwd = config_cwd_for_app_server_target(
cwd.as_deref(),
&app_server_target,
&preliminary_environment_manager,
)?;
#[allow(clippy::print_stderr)]
let config_toml = match load_config_as_toml_with_cli_overrides(
@@ -982,6 +986,13 @@ pub async fn run_main(
.with(otel_tracing_layer)
.try_init();
let environment_manager = Arc::new(
EnvironmentManager::from_env_with_runtime_paths_and_chatgpt_login_config(
Some(local_runtime_paths),
&config,
),
);
run_ratatui_app(
cli,
arg0_paths,
@@ -1755,6 +1766,34 @@ mod tests {
use serial_test::serial;
use tempfile::TempDir;
struct EnvVarGuard {
key: &'static str,
previous: Option<String>,
}
impl EnvVarGuard {
fn set(key: &'static str, value: &str) -> Self {
let previous = std::env::var(key).ok();
unsafe { std::env::set_var(key, value) };
Self { key, previous }
}
fn remove(key: &'static str) -> Self {
let previous = std::env::var(key).ok();
unsafe { std::env::remove_var(key) };
Self { key, previous }
}
}
impl Drop for EnvVarGuard {
fn drop(&mut self) {
match &self.previous {
Some(value) => unsafe { std::env::set_var(self.key, value) },
None => unsafe { std::env::remove_var(self.key) },
}
}
}
async fn build_config(temp_dir: &TempDir) -> std::io::Result<Config> {
ConfigBuilder::default()
.codex_home(temp_dir.path().to_path_buf())
@@ -1990,6 +2029,34 @@ mod tests {
Ok(())
}
#[test]
#[serial]
fn config_cwd_for_app_server_target_omits_cwd_for_cloud_exec_server() -> std::io::Result<()> {
let _exec_server_url =
EnvVarGuard::remove(codex_exec_server::CODEX_EXEC_SERVER_URL_ENV_VAR);
let _cloud_environment_id = EnvVarGuard::set(
codex_exec_server::CODEX_CLOUD_ENVIRONMENT_ID_ENV_VAR,
"env-1",
);
let _cloud_base_url = EnvVarGuard::set(
codex_exec_server::CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR,
"https://cloud.example.test",
);
let remote_only_cwd = if cfg!(windows) {
Path::new(r"C:\definitely\not\local\to\this\test")
} else {
Path::new("/definitely/not/local/to/this/test")
};
let target = AppServerTarget::Embedded;
let environment_manager = EnvironmentManager::from_env_with_runtime_paths(None);
let config_cwd =
config_cwd_for_app_server_target(Some(remote_only_cwd), &target, &environment_manager)?;
assert_eq!(config_cwd, None);
Ok(())
}
#[tokio::test]
async fn read_session_cwd_returns_none_without_sqlite_or_rollout_path() -> std::io::Result<()> {
let temp_dir = TempDir::new()?;