mirror of
https://github.com/openai/codex.git
synced 2026-04-17 19:24:47 +00:00
Compare commits
1 Commits
dev/remote
...
anton_pana
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
55ee42082f |
7
codex-rs/Cargo.lock
generated
7
codex-rs/Cargo.lock
generated
@@ -2102,18 +2102,24 @@ dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.22.1",
|
||||
"codex-app-server-protocol",
|
||||
"codex-client",
|
||||
"codex-config",
|
||||
"codex-login",
|
||||
"codex-protocol",
|
||||
"codex-sandboxing",
|
||||
"codex-test-binary-support",
|
||||
"codex-utils-absolute-path",
|
||||
"codex-utils-pty",
|
||||
"codex-utils-rustls-provider",
|
||||
"ctor 0.6.3",
|
||||
"futures",
|
||||
"gethostname",
|
||||
"pretty_assertions",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serial_test",
|
||||
"sha2",
|
||||
"tempfile",
|
||||
"test-case",
|
||||
"thiserror 2.0.18",
|
||||
@@ -2121,6 +2127,7 @@ dependencies = [
|
||||
"tokio-tungstenite",
|
||||
"tracing",
|
||||
"uuid",
|
||||
"wiremock",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -361,12 +361,10 @@ pub async fn run_main_with_transport(
|
||||
session_source: SessionSource,
|
||||
auth: AppServerWebsocketAuthSettings,
|
||||
) -> IoResult<()> {
|
||||
let environment_manager = Arc::new(EnvironmentManager::from_env_with_runtime_paths(Some(
|
||||
ExecServerRuntimePaths::from_optional_paths(
|
||||
arg0_paths.codex_self_exe.clone(),
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?,
|
||||
)));
|
||||
let local_runtime_paths = ExecServerRuntimePaths::from_optional_paths(
|
||||
arg0_paths.codex_self_exe.clone(),
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?;
|
||||
let (transport_event_tx, mut transport_event_rx) =
|
||||
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
|
||||
@@ -569,6 +567,12 @@ pub async fn run_main_with_transport(
|
||||
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false);
|
||||
let environment_manager = Arc::new(
|
||||
EnvironmentManager::from_env_with_runtime_paths_and_auth_manager(
|
||||
Some(local_runtime_paths),
|
||||
Some(auth_manager.clone()),
|
||||
),
|
||||
);
|
||||
|
||||
let remote_control_enabled = config.features.enabled(Feature::RemoteControl);
|
||||
if transport_accept_handles.is_empty() && !remote_control_enabled {
|
||||
|
||||
@@ -30,6 +30,7 @@ use codex_tui::ExitReason;
|
||||
use codex_tui::UpdateAction;
|
||||
use codex_utils_cli::CliConfigOverrides;
|
||||
use owo_colors::OwoColorize;
|
||||
use std::collections::BTreeMap;
|
||||
use std::io::IsTerminal;
|
||||
use std::path::PathBuf;
|
||||
use supports_color::Stream;
|
||||
@@ -393,13 +394,45 @@ struct AppServerCommand {
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
struct ExecServerCommand {
|
||||
/// Transport endpoint URL. Supported values: `ws://IP:PORT` (default).
|
||||
/// Transport endpoint URL. Supported values: ws://IP:PORT (default).
|
||||
#[arg(
|
||||
long = "listen",
|
||||
value_name = "URL",
|
||||
default_value = "ws://127.0.0.1:0"
|
||||
)]
|
||||
listen: String,
|
||||
|
||||
/// Register this exec-server as a cloud executor instead of listening locally.
|
||||
#[arg(long = "cloud", default_value_t = false)]
|
||||
cloud: bool,
|
||||
|
||||
/// Cloud environments service base URL.
|
||||
#[arg(long = "cloud-base-url", value_name = "URL")]
|
||||
cloud_base_url: Option<String>,
|
||||
|
||||
/// Existing cloud environment id to attach to. Omit to let the service create one.
|
||||
#[arg(long = "cloud-environment-id", value_name = "ID")]
|
||||
cloud_environment_id: Option<String>,
|
||||
|
||||
/// Existing cloud executor id to reconnect.
|
||||
#[arg(long = "cloud-executor-id", value_name = "ID")]
|
||||
cloud_executor_id: Option<String>,
|
||||
|
||||
/// Registration idempotency id. Defaults to a deterministic id for this executor.
|
||||
#[arg(long = "cloud-idempotency-id", value_name = "ID")]
|
||||
cloud_idempotency_id: Option<String>,
|
||||
|
||||
/// Human-readable executor name. Defaults to the hostname.
|
||||
#[arg(long = "cloud-name", value_name = "NAME")]
|
||||
cloud_name: Option<String>,
|
||||
|
||||
/// Executor label in KEY=VALUE form. Repeatable.
|
||||
#[arg(long = "cloud-label", value_name = "KEY=VALUE", action = clap::ArgAction::Append)]
|
||||
cloud_label: Vec<String>,
|
||||
|
||||
/// Executor metadata as a JSON object.
|
||||
#[arg(long = "cloud-metadata-json", value_name = "JSON")]
|
||||
cloud_metadata_json: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, clap::Subcommand)]
|
||||
@@ -1046,7 +1079,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
|
||||
root_remote_auth_token_env.as_deref(),
|
||||
"exec-server",
|
||||
)?;
|
||||
run_exec_server_command(cmd, &arg0_paths).await?;
|
||||
run_exec_server_command(cmd, &arg0_paths, root_config_overrides).await?;
|
||||
}
|
||||
Some(Subcommand::Features(FeaturesCli { sub })) => match sub {
|
||||
FeaturesSubcommand::List => {
|
||||
@@ -1121,6 +1154,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
|
||||
async fn run_exec_server_command(
|
||||
cmd: ExecServerCommand,
|
||||
arg0_paths: &Arg0DispatchPaths,
|
||||
root_config_overrides: CliConfigOverrides,
|
||||
) -> anyhow::Result<()> {
|
||||
let codex_self_exe = arg0_paths
|
||||
.codex_self_exe
|
||||
@@ -1130,11 +1164,63 @@ async fn run_exec_server_command(
|
||||
codex_self_exe,
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?;
|
||||
if cmd.cloud {
|
||||
let cloud_base_url = cmd
|
||||
.cloud_base_url
|
||||
.or_else(|| std::env::var(codex_exec_server::CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR).ok())
|
||||
.ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"--cloud-base-url or CODEX_CLOUD_ENVIRONMENTS_BASE_URL is required in cloud mode"
|
||||
)
|
||||
})?;
|
||||
let cli_overrides = root_config_overrides
|
||||
.parse_overrides()
|
||||
.map_err(anyhow::Error::msg)?;
|
||||
let config = Config::load_with_cli_overrides(cli_overrides).await?;
|
||||
let auth_manager = codex_login::AuthManager::shared_from_config(&config, false);
|
||||
let mut cloud_config = codex_exec_server::CloudExecutorConfig::new(cloud_base_url);
|
||||
cloud_config.cloud_environment_id = cmd.cloud_environment_id;
|
||||
cloud_config.cloud_executor_id = cmd.cloud_executor_id;
|
||||
cloud_config.cloud_idempotency_id = cmd.cloud_idempotency_id;
|
||||
if let Some(name) = cmd.cloud_name {
|
||||
cloud_config.cloud_name = name;
|
||||
}
|
||||
cloud_config.cloud_labels = parse_cloud_labels(cmd.cloud_label)?;
|
||||
cloud_config.cloud_metadata = parse_cloud_metadata_json(cmd.cloud_metadata_json)?;
|
||||
codex_exec_server::run_cloud_executor(cloud_config, auth_manager, runtime_paths).await?;
|
||||
return Ok(());
|
||||
}
|
||||
codex_exec_server::run_main(&cmd.listen, runtime_paths)
|
||||
.await
|
||||
.map_err(anyhow::Error::from_boxed)
|
||||
}
|
||||
|
||||
fn parse_cloud_labels(labels: Vec<String>) -> anyhow::Result<BTreeMap<String, String>> {
|
||||
let mut parsed = BTreeMap::new();
|
||||
for label in labels {
|
||||
let (key, value) = label
|
||||
.split_once('=')
|
||||
.ok_or_else(|| anyhow::anyhow!("cloud labels must be in KEY=VALUE form"))?;
|
||||
let key = key.trim();
|
||||
if key.is_empty() {
|
||||
anyhow::bail!("cloud label keys cannot be empty");
|
||||
}
|
||||
parsed.insert(key.to_string(), value.to_string());
|
||||
}
|
||||
Ok(parsed)
|
||||
}
|
||||
|
||||
fn parse_cloud_metadata_json(metadata_json: Option<String>) -> anyhow::Result<serde_json::Value> {
|
||||
let Some(metadata_json) = metadata_json else {
|
||||
return Ok(serde_json::Value::Object(Default::default()));
|
||||
};
|
||||
let metadata = serde_json::from_str::<serde_json::Value>(&metadata_json)?;
|
||||
if !metadata.is_object() {
|
||||
anyhow::bail!("--cloud-metadata-json must be a JSON object");
|
||||
}
|
||||
Ok(metadata)
|
||||
}
|
||||
|
||||
async fn enable_feature_in_config(interactive: &TuiCli, feature: &str) -> anyhow::Result<()> {
|
||||
FeatureToggles::validate_feature(feature)?;
|
||||
let codex_home = find_codex_home()?;
|
||||
|
||||
@@ -15,14 +15,20 @@ arc-swap = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
base64 = { workspace = true }
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
codex-client = { workspace = true }
|
||||
codex-config = { workspace = true }
|
||||
codex-login = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-sandboxing = { workspace = true }
|
||||
codex-utils-absolute-path = { workspace = true }
|
||||
codex-utils-pty = { workspace = true }
|
||||
codex-utils-rustls-provider = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
gethostname = { workspace = true }
|
||||
reqwest = { workspace = true, features = ["json", "rustls-tls"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
sha2 = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = [
|
||||
"fs",
|
||||
@@ -47,3 +53,4 @@ pretty_assertions = { workspace = true }
|
||||
serial_test = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
test-case = "3.3.1"
|
||||
wiremock = { workspace = true }
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::time::Duration;
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::watch;
|
||||
@@ -158,12 +159,25 @@ pub enum ExecServerError {
|
||||
Protocol(String),
|
||||
#[error("exec-server rejected request ({code}): {message}")]
|
||||
Server { code: i64, message: String },
|
||||
#[error("cloud environments request failed ({status}{code_suffix}): {message}", code_suffix = .code.as_ref().map(|code| format!(", {code}")).unwrap_or_default())]
|
||||
CloudEnvironmentHttp {
|
||||
status: reqwest::StatusCode,
|
||||
code: Option<String>,
|
||||
message: String,
|
||||
},
|
||||
#[error("cloud environment configuration error: {0}")]
|
||||
CloudEnvironmentConfig(String),
|
||||
#[error("cloud environment authentication error: {0}")]
|
||||
CloudEnvironmentAuth(String),
|
||||
#[error("cloud environments request failed: {0}")]
|
||||
CloudEnvironmentRequest(#[from] reqwest::Error),
|
||||
}
|
||||
|
||||
impl ExecServerClient {
|
||||
pub async fn connect_websocket(
|
||||
args: RemoteExecServerConnectArgs,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
ensure_rustls_crypto_provider();
|
||||
let websocket_url = args.websocket_url.clone();
|
||||
let connect_timeout = args.connect_timeout;
|
||||
let (stream, _) = timeout(connect_timeout, connect_async(websocket_url.as_str()))
|
||||
|
||||
640
codex-rs/exec-server/src/cloud.rs
Normal file
640
codex-rs/exec-server/src/cloud.rs
Normal file
@@ -0,0 +1,640 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_client::CodexHttpClient;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_login::default_client::create_client;
|
||||
use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
|
||||
use reqwest::StatusCode;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
use sha2::Digest as _;
|
||||
use tokio::time::sleep;
|
||||
use tokio_tungstenite::connect_async;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::ExecServerError;
|
||||
use crate::ExecServerRuntimePaths;
|
||||
use crate::connection::JsonRpcConnection;
|
||||
use crate::server::ConnectionProcessor;
|
||||
|
||||
pub const CODEX_CLOUD_ENVIRONMENT_ID_ENV_VAR: &str = "CODEX_CLOUD_ENVIRONMENT_ID";
|
||||
pub const CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR: &str = "CODEX_CLOUD_ENVIRONMENTS_BASE_URL";
|
||||
|
||||
const PROTOCOL_VERSION: &str = "codex-exec-server-v1";
|
||||
const ERROR_BODY_PREVIEW_BYTES: usize = 4096;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct CloudEnvironmentClient {
|
||||
base_url: String,
|
||||
http: CodexHttpClient,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for CloudEnvironmentClient {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("CloudEnvironmentClient")
|
||||
.field("base_url", &self.base_url)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl CloudEnvironmentClient {
|
||||
pub fn new(base_url: String, auth_manager: Arc<AuthManager>) -> Result<Self, ExecServerError> {
|
||||
let base_url = normalize_base_url(base_url)?;
|
||||
Ok(Self {
|
||||
base_url,
|
||||
http: create_client(),
|
||||
auth_manager,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn endpoint_url(&self, path: &str) -> String {
|
||||
endpoint_url(&self.base_url, path)
|
||||
}
|
||||
|
||||
pub async fn connect_environment(
|
||||
&self,
|
||||
environment_id: &str,
|
||||
) -> Result<CloudEnvironmentConnectResponse, ExecServerError> {
|
||||
let path = format!("/api/cloud/environment/{environment_id}");
|
||||
self.post_json(&path, &EmptyRequest {}).await
|
||||
}
|
||||
|
||||
pub async fn register_executor(
|
||||
&self,
|
||||
request: &CloudEnvironmentRegisterExecutorRequest,
|
||||
) -> Result<CloudEnvironmentExecutorRegistrationResponse, ExecServerError> {
|
||||
self.post_json("/api/cloud/executor", request).await
|
||||
}
|
||||
|
||||
pub async fn reconnect_executor(
|
||||
&self,
|
||||
executor_id: &str,
|
||||
) -> Result<CloudEnvironmentExecutorRegistrationResponse, ExecServerError> {
|
||||
let path = format!("/api/cloud/executor/{executor_id}");
|
||||
self.post_json(&path, &EmptyRequest {}).await
|
||||
}
|
||||
|
||||
async fn post_json<T, R>(&self, path: &str, request: &T) -> Result<R, ExecServerError>
|
||||
where
|
||||
T: Serialize + Sync,
|
||||
R: for<'de> Deserialize<'de>,
|
||||
{
|
||||
for attempt in 0..=1 {
|
||||
let auth = cloud_environment_chatgpt_auth(&self.auth_manager).await?;
|
||||
let response = self
|
||||
.http
|
||||
.post(endpoint_url(&self.base_url, path))
|
||||
.bearer_auth(chatgpt_bearer_token(&auth)?)
|
||||
.header("chatgpt-account-id", chatgpt_account_id(&auth)?)
|
||||
.json(request)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if response.status().is_success() {
|
||||
return response.json::<R>().await.map_err(ExecServerError::from);
|
||||
}
|
||||
|
||||
let status = response.status();
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
if matches!(status, StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN)
|
||||
&& attempt == 0
|
||||
&& recover_unauthorized(&self.auth_manager).await
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
return Err(cloud_http_error(status, &body));
|
||||
}
|
||||
|
||||
unreachable!("cloud environments request loop is bounded to two attempts")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
|
||||
pub struct CloudEnvironmentRegisterExecutorRequest {
|
||||
pub idempotency_id: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub environment_id: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub name: Option<String>,
|
||||
pub labels: BTreeMap<String, String>,
|
||||
pub metadata: Value,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Deserialize)]
|
||||
pub struct CloudEnvironmentConnectResponse {
|
||||
pub environment_id: String,
|
||||
pub executor_id: String,
|
||||
pub url: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Deserialize)]
|
||||
pub struct CloudEnvironmentExecutorRegistrationResponse {
|
||||
pub id: String,
|
||||
pub environment_id: String,
|
||||
pub url: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub struct CloudExecutorConfig {
|
||||
pub cloud_base_url: String,
|
||||
pub cloud_environment_id: Option<String>,
|
||||
pub cloud_executor_id: Option<String>,
|
||||
pub cloud_idempotency_id: Option<String>,
|
||||
pub cloud_name: String,
|
||||
pub cloud_labels: BTreeMap<String, String>,
|
||||
pub cloud_metadata: Value,
|
||||
}
|
||||
|
||||
impl CloudExecutorConfig {
|
||||
pub fn new(cloud_base_url: String) -> Self {
|
||||
Self {
|
||||
cloud_base_url,
|
||||
cloud_environment_id: None,
|
||||
cloud_executor_id: None,
|
||||
cloud_idempotency_id: None,
|
||||
cloud_name: default_executor_name(),
|
||||
cloud_labels: BTreeMap::new(),
|
||||
cloud_metadata: Value::Object(Default::default()),
|
||||
}
|
||||
}
|
||||
|
||||
fn registration_request(
|
||||
&self,
|
||||
auth: &CodexAuth,
|
||||
) -> Result<CloudEnvironmentRegisterExecutorRequest, ExecServerError> {
|
||||
let idempotency_id = match &self.cloud_idempotency_id {
|
||||
Some(idempotency_id) => idempotency_id.clone(),
|
||||
None => self.default_idempotency_id(auth)?,
|
||||
};
|
||||
|
||||
Ok(CloudEnvironmentRegisterExecutorRequest {
|
||||
idempotency_id,
|
||||
environment_id: self.cloud_environment_id.clone(),
|
||||
name: Some(self.cloud_name.clone()),
|
||||
labels: self.cloud_labels.clone(),
|
||||
metadata: self.cloud_metadata.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn default_idempotency_id(&self, auth: &CodexAuth) -> Result<String, ExecServerError> {
|
||||
let mut hasher = sha2::Sha256::new();
|
||||
let account_id = chatgpt_account_id(auth)?;
|
||||
hasher.update(account_id.as_bytes());
|
||||
hasher.update(b"\0");
|
||||
hasher.update(self.cloud_environment_id.as_deref().unwrap_or("auto"));
|
||||
hasher.update(b"\0");
|
||||
hasher.update(self.cloud_name.as_bytes());
|
||||
hasher.update(b"\0");
|
||||
hasher.update(serde_json::to_string(&self.cloud_labels).unwrap_or_default());
|
||||
hasher.update(b"\0");
|
||||
hasher.update(canonical_json(&self.cloud_metadata));
|
||||
hasher.update(b"\0");
|
||||
hasher.update(PROTOCOL_VERSION);
|
||||
let digest = hasher.finalize();
|
||||
Ok(format!("codex-exec-server-{digest:x}"))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_cloud_executor(
|
||||
config: CloudExecutorConfig,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
) -> Result<(), ExecServerError> {
|
||||
let client = CloudEnvironmentClient::new(config.cloud_base_url.clone(), auth_manager.clone())?;
|
||||
let processor = ConnectionProcessor::new(runtime_paths);
|
||||
let mut executor_id = config.cloud_executor_id.clone();
|
||||
let mut backoff = Duration::from_secs(1);
|
||||
|
||||
loop {
|
||||
let signed_url = if let Some(existing_executor_id) = executor_id.as_deref() {
|
||||
let response = client.reconnect_executor(existing_executor_id).await?;
|
||||
executor_id = Some(response.id.clone());
|
||||
eprintln!(
|
||||
"codex exec-server cloud executor {} connected to environment {}",
|
||||
response.id, response.environment_id
|
||||
);
|
||||
response.url
|
||||
} else {
|
||||
let auth = cloud_environment_chatgpt_auth(&auth_manager).await?;
|
||||
let request = config.registration_request(&auth)?;
|
||||
let response = client.register_executor(&request).await?;
|
||||
executor_id = Some(response.id.clone());
|
||||
eprintln!(
|
||||
"codex exec-server cloud executor {} registered in environment {}",
|
||||
response.id, response.environment_id
|
||||
);
|
||||
response.url
|
||||
};
|
||||
|
||||
ensure_rustls_crypto_provider();
|
||||
match connect_async(signed_url.as_str()).await {
|
||||
Ok((websocket, _)) => {
|
||||
backoff = Duration::from_secs(1);
|
||||
processor
|
||||
.run_connection(JsonRpcConnection::from_websocket(
|
||||
websocket,
|
||||
"cloud exec-server websocket".to_string(),
|
||||
))
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("failed to connect cloud exec-server websocket: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
sleep(backoff).await;
|
||||
backoff = (backoff * 2).min(Duration::from_secs(30));
|
||||
}
|
||||
}
|
||||
|
||||
async fn cloud_environment_chatgpt_auth(
|
||||
auth_manager: &AuthManager,
|
||||
) -> Result<CodexAuth, ExecServerError> {
|
||||
let mut reloaded = false;
|
||||
let auth = loop {
|
||||
let Some(auth) = auth_manager.auth().await else {
|
||||
if reloaded {
|
||||
return Err(ExecServerError::CloudEnvironmentAuth(
|
||||
"cloud environments require ChatGPT authentication".to_string(),
|
||||
));
|
||||
}
|
||||
auth_manager.reload();
|
||||
reloaded = true;
|
||||
continue;
|
||||
};
|
||||
if !auth.is_chatgpt_auth() {
|
||||
return Err(ExecServerError::CloudEnvironmentAuth(
|
||||
"cloud environments require ChatGPT authentication; API key auth is not supported"
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
if auth.get_account_id().is_none() && !reloaded {
|
||||
auth_manager.reload();
|
||||
reloaded = true;
|
||||
continue;
|
||||
}
|
||||
break auth;
|
||||
};
|
||||
|
||||
let _ = chatgpt_bearer_token(&auth)?;
|
||||
let _ = chatgpt_account_id(&auth)?;
|
||||
Ok(auth)
|
||||
}
|
||||
|
||||
fn chatgpt_bearer_token(auth: &CodexAuth) -> Result<String, ExecServerError> {
|
||||
auth.get_token()
|
||||
.map_err(|err| ExecServerError::CloudEnvironmentAuth(err.to_string()))
|
||||
}
|
||||
|
||||
fn chatgpt_account_id(auth: &CodexAuth) -> Result<String, ExecServerError> {
|
||||
auth.get_account_id().ok_or_else(|| {
|
||||
ExecServerError::CloudEnvironmentAuth(
|
||||
"cloud environments are waiting for a ChatGPT account id".to_string(),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
async fn recover_unauthorized(auth_manager: &Arc<AuthManager>) -> bool {
|
||||
let mut recovery = auth_manager.unauthorized_recovery();
|
||||
if !recovery.has_next() {
|
||||
return false;
|
||||
}
|
||||
|
||||
let mode = recovery.mode_name();
|
||||
let step = recovery.step_name();
|
||||
match recovery.next().await {
|
||||
Ok(step_result) => {
|
||||
info!(
|
||||
"cloud environment auth recovery succeeded: mode={mode}, step={step}, auth_state_changed={:?}",
|
||||
step_result.auth_state_changed()
|
||||
);
|
||||
true
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("cloud environment auth recovery failed: mode={mode}, step={step}: {err}");
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct EmptyRequest {}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct CloudErrorBody {
|
||||
error: Option<CloudError>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct CloudError {
|
||||
code: Option<String>,
|
||||
message: Option<String>,
|
||||
}
|
||||
|
||||
fn normalize_base_url(base_url: String) -> Result<String, ExecServerError> {
|
||||
let trimmed = base_url.trim().trim_end_matches('/').to_string();
|
||||
if trimmed.is_empty() {
|
||||
return Err(ExecServerError::CloudEnvironmentConfig(
|
||||
"cloud environments base URL is required".to_string(),
|
||||
));
|
||||
}
|
||||
Ok(trimmed)
|
||||
}
|
||||
|
||||
fn endpoint_url(base_url: &str, path: &str) -> String {
|
||||
format!("{base_url}/{}", path.trim_start_matches('/'))
|
||||
}
|
||||
|
||||
fn cloud_http_error(status: StatusCode, body: &str) -> ExecServerError {
|
||||
let parsed = serde_json::from_str::<CloudErrorBody>(body).ok();
|
||||
let (code, message) = parsed
|
||||
.and_then(|body| body.error)
|
||||
.map(|error| {
|
||||
(
|
||||
error.code,
|
||||
error.message.unwrap_or_else(|| {
|
||||
preview_error_body(body).unwrap_or_else(|| "empty error body".to_string())
|
||||
}),
|
||||
)
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
(
|
||||
None,
|
||||
preview_error_body(body)
|
||||
.unwrap_or_else(|| "empty or malformed error body".to_string()),
|
||||
)
|
||||
});
|
||||
ExecServerError::CloudEnvironmentHttp {
|
||||
status,
|
||||
code,
|
||||
message,
|
||||
}
|
||||
}
|
||||
|
||||
fn preview_error_body(body: &str) -> Option<String> {
|
||||
let trimmed = body.trim();
|
||||
if trimmed.is_empty() {
|
||||
return None;
|
||||
}
|
||||
Some(trimmed.chars().take(ERROR_BODY_PREVIEW_BYTES).collect())
|
||||
}
|
||||
|
||||
fn default_executor_name() -> String {
|
||||
gethostname::gethostname()
|
||||
.to_str()
|
||||
.filter(|hostname| !hostname.is_empty())
|
||||
.unwrap_or("codex-exec-server")
|
||||
.to_string()
|
||||
}
|
||||
|
||||
fn canonical_json(value: &Value) -> String {
|
||||
match value {
|
||||
Value::Object(map) => {
|
||||
let sorted = map
|
||||
.iter()
|
||||
.map(|(key, value)| (key, sorted_json_value(value)))
|
||||
.collect::<BTreeMap<_, _>>();
|
||||
serde_json::to_string(&sorted).unwrap_or_default()
|
||||
}
|
||||
_ => serde_json::to_string(value).unwrap_or_default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn sorted_json_value(value: &Value) -> Value {
|
||||
match value {
|
||||
Value::Array(values) => Value::Array(values.iter().map(sorted_json_value).collect()),
|
||||
Value::Object(map) => Value::Object(
|
||||
map.iter()
|
||||
.map(|(key, value)| (key.clone(), sorted_json_value(value)))
|
||||
.collect(),
|
||||
),
|
||||
value => value.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use base64::Engine as _;
|
||||
use codex_config::types::AuthCredentialsStoreMode;
|
||||
use codex_login::CodexAuth;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use tempfile::TempDir;
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::body_json;
|
||||
use wiremock::matchers::header;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
use super::*;
|
||||
|
||||
const TEST_ACCESS_TOKEN: &str = "test-access-token";
|
||||
const TEST_REFRESHED_ACCESS_TOKEN: &str = "test-refreshed-access-token";
|
||||
const TEST_ACCOUNT_ID: &str = "acct-1";
|
||||
|
||||
fn auth_manager() -> Arc<AuthManager> {
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
}
|
||||
|
||||
fn auth_manager_with_stored_chatgpt_auth() -> (TempDir, Arc<AuthManager>) {
|
||||
let codex_home = tempfile::tempdir().expect("create temp codex home");
|
||||
write_auth_json(codex_home.path(), TEST_ACCESS_TOKEN, TEST_ACCOUNT_ID);
|
||||
let auth_manager = AuthManager::shared(
|
||||
codex_home.path().to_path_buf(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
AuthCredentialsStoreMode::File,
|
||||
);
|
||||
(codex_home, auth_manager)
|
||||
}
|
||||
|
||||
fn write_auth_json(codex_home: &std::path::Path, access_token: &str, account_id: &str) {
|
||||
let auth_json = json!({
|
||||
"auth_mode": "chatgpt",
|
||||
"tokens": {
|
||||
"id_token": fake_jwt(account_id),
|
||||
"access_token": access_token,
|
||||
"refresh_token": "test-refresh-token",
|
||||
"account_id": account_id,
|
||||
},
|
||||
"last_refresh": "2999-01-01T00:00:00Z",
|
||||
});
|
||||
std::fs::write(
|
||||
codex_home.join("auth.json"),
|
||||
serde_json::to_string_pretty(&auth_json).expect("serialize auth json"),
|
||||
)
|
||||
.expect("write auth json");
|
||||
}
|
||||
|
||||
fn fake_jwt(account_id: &str) -> String {
|
||||
let header = json!({
|
||||
"alg": "none",
|
||||
"typ": "JWT",
|
||||
});
|
||||
let payload = json!({
|
||||
"email": "user@example.com",
|
||||
"https://api.openai.com/auth": {
|
||||
"chatgpt_account_id": account_id,
|
||||
"chatgpt_user_id": "user-12345",
|
||||
},
|
||||
});
|
||||
let b64 = |value: &serde_json::Value| {
|
||||
base64::engine::general_purpose::URL_SAFE_NO_PAD
|
||||
.encode(serde_json::to_vec(value).expect("serialize jwt part"))
|
||||
};
|
||||
let signature = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(b"sig");
|
||||
format!("{}.{}.{}", b64(&header), b64(&payload), signature)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn normalizes_base_url_and_builds_endpoints() {
|
||||
let client = CloudEnvironmentClient::new(
|
||||
"https://cloud.example.test/root/".to_string(),
|
||||
auth_manager(),
|
||||
)
|
||||
.expect("client");
|
||||
|
||||
assert_eq!(
|
||||
client.endpoint_url("/api/cloud/environment/env-1"),
|
||||
"https://cloud.example.test/root/api/cloud/environment/env-1"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cloud_response_serde_matches_service_shape() {
|
||||
let connect: CloudEnvironmentConnectResponse = serde_json::from_value(json!({
|
||||
"environment_id": "env-1",
|
||||
"executor_id": "exec-1",
|
||||
"url": "wss://rendezvous.test/executor/exec-1?role=harness&sig=abc"
|
||||
}))
|
||||
.expect("connect response");
|
||||
let registration: CloudEnvironmentExecutorRegistrationResponse =
|
||||
serde_json::from_value(json!({
|
||||
"id": "exec-1",
|
||||
"environment_id": "env-1",
|
||||
"url": "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc"
|
||||
}))
|
||||
.expect("registration response");
|
||||
|
||||
assert_eq!(
|
||||
connect,
|
||||
CloudEnvironmentConnectResponse {
|
||||
environment_id: "env-1".to_string(),
|
||||
executor_id: "exec-1".to_string(),
|
||||
url: "wss://rendezvous.test/executor/exec-1?role=harness&sig=abc".to_string(),
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
registration,
|
||||
CloudEnvironmentExecutorRegistrationResponse {
|
||||
id: "exec-1".to_string(),
|
||||
environment_id: "env-1".to_string(),
|
||||
url: "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc".to_string(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cloud_error_body_is_preserved() {
|
||||
let err = cloud_http_error(
|
||||
StatusCode::CONFLICT,
|
||||
r#"{"error":{"code":"no_online_executor","message":"no executor is online"}}"#,
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"cloud environments request failed (409 Conflict, no_online_executor): no executor is online"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_environment_posts_with_chatgpt_auth_headers() {
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/api/cloud/environment/env-1"))
|
||||
.and(header(
|
||||
"authorization",
|
||||
format!("Bearer {TEST_ACCESS_TOKEN}"),
|
||||
))
|
||||
.and(header("chatgpt-account-id", TEST_ACCOUNT_ID))
|
||||
.and(body_json(json!({})))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"environment_id": "env-1",
|
||||
"executor_id": "exec-1",
|
||||
"url": "ws://127.0.0.1:1234"
|
||||
})))
|
||||
.mount(&server)
|
||||
.await;
|
||||
let (_codex_home, auth_manager) = auth_manager_with_stored_chatgpt_auth();
|
||||
let client = CloudEnvironmentClient::new(server.uri(), auth_manager).expect("client");
|
||||
|
||||
let response = client
|
||||
.connect_environment("env-1")
|
||||
.await
|
||||
.expect("connect environment");
|
||||
|
||||
assert_eq!(
|
||||
response,
|
||||
CloudEnvironmentConnectResponse {
|
||||
environment_id: "env-1".to_string(),
|
||||
executor_id: "exec-1".to_string(),
|
||||
url: "ws://127.0.0.1:1234".to_string(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn retries_once_after_unauthorized_recovery() {
|
||||
let (codex_home, auth_manager) = auth_manager_with_stored_chatgpt_auth();
|
||||
write_auth_json(
|
||||
codex_home.path(),
|
||||
TEST_REFRESHED_ACCESS_TOKEN,
|
||||
TEST_ACCOUNT_ID,
|
||||
);
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/api/cloud/environment/env-1"))
|
||||
.and(header(
|
||||
"authorization",
|
||||
format!("Bearer {TEST_ACCESS_TOKEN}"),
|
||||
))
|
||||
.respond_with(ResponseTemplate::new(401).set_body_json(json!({
|
||||
"error": {
|
||||
"code": "unauthorized",
|
||||
"message": "expired token"
|
||||
}
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/api/cloud/environment/env-1"))
|
||||
.and(header(
|
||||
"authorization",
|
||||
format!("Bearer {TEST_REFRESHED_ACCESS_TOKEN}"),
|
||||
))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"environment_id": "env-1",
|
||||
"executor_id": "exec-1",
|
||||
"url": "ws://127.0.0.1:1234"
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
let client = CloudEnvironmentClient::new(server.uri(), auth_manager).expect("client");
|
||||
|
||||
client
|
||||
.connect_environment("env-1")
|
||||
.await
|
||||
.expect("connect environment");
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,13 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::AuthManagerConfig;
|
||||
use tokio::sync::OnceCell;
|
||||
|
||||
use crate::CODEX_CLOUD_ENVIRONMENT_ID_ENV_VAR;
|
||||
use crate::CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR;
|
||||
use crate::CloudEnvironmentClient;
|
||||
use crate::ExecServerClient;
|
||||
use crate::ExecServerError;
|
||||
use crate::ExecServerRuntimePaths;
|
||||
@@ -15,18 +21,160 @@ use crate::remote_process::RemoteProcess;
|
||||
|
||||
pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
|
||||
|
||||
/// Resolves the execution/filesystem environment for a session.
|
||||
///
|
||||
/// Implementations own the selection-specific details, such as whether a remote
|
||||
/// websocket URL is used directly or resolved from cloud environment metadata.
|
||||
#[async_trait]
|
||||
pub trait EnvironmentResolver: std::fmt::Debug + Send + Sync {
|
||||
fn exec_server_url(&self) -> Option<&str>;
|
||||
fn is_remote(&self) -> bool;
|
||||
async fn create_environment(&self) -> Result<Option<Environment>, ExecServerError>;
|
||||
}
|
||||
|
||||
/// Resolver for the existing direct/local/disabled behavior.
|
||||
#[derive(Debug)]
|
||||
pub struct DefaultEnvironmentResolver {
|
||||
exec_server_url: Option<String>,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
disabled: bool,
|
||||
}
|
||||
|
||||
impl DefaultEnvironmentResolver {
|
||||
pub fn new(
|
||||
exec_server_url: Option<String>,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
) -> Self {
|
||||
let (exec_server_url, disabled) = normalize_exec_server_url(exec_server_url);
|
||||
Self {
|
||||
exec_server_url,
|
||||
local_runtime_paths,
|
||||
disabled,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn disabled() -> Self {
|
||||
Self {
|
||||
exec_server_url: None,
|
||||
local_runtime_paths: None,
|
||||
disabled: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EnvironmentResolver for DefaultEnvironmentResolver {
|
||||
fn exec_server_url(&self) -> Option<&str> {
|
||||
self.exec_server_url.as_deref()
|
||||
}
|
||||
|
||||
fn is_remote(&self) -> bool {
|
||||
self.exec_server_url.is_some()
|
||||
}
|
||||
|
||||
async fn create_environment(&self) -> Result<Option<Environment>, ExecServerError> {
|
||||
if self.disabled {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Ok(Some(
|
||||
Environment::create_with_runtime_paths(
|
||||
self.exec_server_url.clone(),
|
||||
/*cloud_environment_id*/ None,
|
||||
/*cloud_environments_base_url*/ None,
|
||||
/*auth_manager*/ None,
|
||||
self.local_runtime_paths.clone(),
|
||||
)
|
||||
.await?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolver for cloud environment id selection.
|
||||
pub struct CloudEnvironmentResolver {
|
||||
cloud_environment_id: String,
|
||||
cloud_environments_base_url: Option<String>,
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
}
|
||||
|
||||
impl CloudEnvironmentResolver {
|
||||
pub fn new(
|
||||
cloud_environment_id: String,
|
||||
cloud_environments_base_url: Option<String>,
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
) -> Self {
|
||||
Self {
|
||||
cloud_environment_id,
|
||||
cloud_environments_base_url: normalize_optional_env_value(cloud_environments_base_url),
|
||||
auth_manager,
|
||||
local_runtime_paths,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cloud_environment_id(&self) -> &str {
|
||||
&self.cloud_environment_id
|
||||
}
|
||||
|
||||
pub fn cloud_environments_base_url(&self) -> Option<&str> {
|
||||
self.cloud_environments_base_url.as_deref()
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for CloudEnvironmentResolver {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("CloudEnvironmentResolver")
|
||||
.field("cloud_environment_id", &self.cloud_environment_id)
|
||||
.field(
|
||||
"cloud_environments_base_url",
|
||||
&self.cloud_environments_base_url,
|
||||
)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EnvironmentResolver for CloudEnvironmentResolver {
|
||||
fn exec_server_url(&self) -> Option<&str> {
|
||||
None
|
||||
}
|
||||
|
||||
fn is_remote(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
async fn create_environment(&self) -> Result<Option<Environment>, ExecServerError> {
|
||||
Ok(Some(
|
||||
Environment::create_with_runtime_paths(
|
||||
/*exec_server_url*/ None,
|
||||
Some(self.cloud_environment_id.clone()),
|
||||
self.cloud_environments_base_url.clone(),
|
||||
self.auth_manager.clone(),
|
||||
self.local_runtime_paths.clone(),
|
||||
)
|
||||
.await?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Lazily creates and caches the active environment for a session.
|
||||
///
|
||||
/// The manager keeps the session's environment selection stable so subagents
|
||||
/// and follow-up turns preserve an explicit disabled state.
|
||||
#[derive(Debug)]
|
||||
pub struct EnvironmentManager {
|
||||
exec_server_url: Option<String>,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
disabled: bool,
|
||||
resolver: Box<dyn EnvironmentResolver>,
|
||||
current_environment: OnceCell<Option<Arc<Environment>>>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for EnvironmentManager {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("EnvironmentManager")
|
||||
.field("resolver", &self.resolver)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for EnvironmentManager {
|
||||
fn default() -> Self {
|
||||
Self::new(/*exec_server_url*/ None)
|
||||
@@ -45,11 +193,48 @@ impl EnvironmentManager {
|
||||
exec_server_url: Option<String>,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
) -> Self {
|
||||
let (exec_server_url, disabled) = normalize_exec_server_url(exec_server_url);
|
||||
Self {
|
||||
Self::from_resolver(DefaultEnvironmentResolver::new(
|
||||
exec_server_url,
|
||||
local_runtime_paths,
|
||||
disabled,
|
||||
))
|
||||
}
|
||||
|
||||
fn new_with_options(options: EnvironmentManagerOptions) -> Self {
|
||||
let EnvironmentManagerOptions {
|
||||
exec_server_url,
|
||||
cloud_environment_id,
|
||||
cloud_environments_base_url,
|
||||
auth_manager,
|
||||
local_runtime_paths,
|
||||
} = options;
|
||||
let (exec_server_url, disabled) = normalize_exec_server_url(exec_server_url);
|
||||
if exec_server_url.is_some() || disabled {
|
||||
return Self::from_resolver(DefaultEnvironmentResolver {
|
||||
exec_server_url,
|
||||
local_runtime_paths,
|
||||
disabled,
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(cloud_environment_id) = normalize_optional_env_value(cloud_environment_id) {
|
||||
Self::from_resolver(CloudEnvironmentResolver::new(
|
||||
cloud_environment_id,
|
||||
cloud_environments_base_url,
|
||||
auth_manager,
|
||||
local_runtime_paths,
|
||||
))
|
||||
} else {
|
||||
Self::from_resolver(DefaultEnvironmentResolver {
|
||||
exec_server_url: None,
|
||||
local_runtime_paths,
|
||||
disabled: false,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_resolver(resolver: impl EnvironmentResolver + 'static) -> Self {
|
||||
Self {
|
||||
resolver: Box::new(resolver),
|
||||
current_environment: OnceCell::new(),
|
||||
}
|
||||
}
|
||||
@@ -64,56 +249,86 @@ impl EnvironmentManager {
|
||||
pub fn from_env_with_runtime_paths(
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
) -> Self {
|
||||
Self::new_with_runtime_paths(
|
||||
std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok(),
|
||||
Self::new_with_options(EnvironmentManagerOptions {
|
||||
exec_server_url: std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok(),
|
||||
cloud_environment_id: std::env::var(CODEX_CLOUD_ENVIRONMENT_ID_ENV_VAR).ok(),
|
||||
cloud_environments_base_url: std::env::var(CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR)
|
||||
.ok(),
|
||||
auth_manager: None,
|
||||
local_runtime_paths,
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/// Builds a manager from process environment variables and an auth manager.
|
||||
/// This is the production constructor used after config/auth are
|
||||
/// available.
|
||||
pub fn from_env_with_runtime_paths_and_auth_manager(
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
) -> Self {
|
||||
Self::new_with_options(EnvironmentManagerOptions {
|
||||
exec_server_url: std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok(),
|
||||
cloud_environment_id: std::env::var(CODEX_CLOUD_ENVIRONMENT_ID_ENV_VAR).ok(),
|
||||
cloud_environments_base_url: std::env::var(CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR)
|
||||
.ok(),
|
||||
auth_manager,
|
||||
local_runtime_paths,
|
||||
})
|
||||
}
|
||||
|
||||
/// Builds a manager from process environment variables and the resolved
|
||||
/// login config. Cloud environments use ChatGPT credentials, so API-key
|
||||
/// auth from environment variables is intentionally disabled here.
|
||||
pub fn from_env_with_runtime_paths_and_chatgpt_login_config(
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
config: &impl AuthManagerConfig,
|
||||
) -> Self {
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(config, /*enable_codex_api_key_env*/ false);
|
||||
Self::from_env_with_runtime_paths_and_auth_manager(local_runtime_paths, Some(auth_manager))
|
||||
}
|
||||
|
||||
/// Builds a manager from the currently selected environment, or from the
|
||||
/// disabled mode when no environment is available.
|
||||
pub fn from_environment(environment: Option<&Environment>) -> Self {
|
||||
match environment {
|
||||
Some(environment) => Self {
|
||||
exec_server_url: environment.exec_server_url().map(str::to_owned),
|
||||
local_runtime_paths: environment.local_runtime_paths().cloned(),
|
||||
disabled: false,
|
||||
current_environment: OnceCell::new(),
|
||||
},
|
||||
None => Self {
|
||||
exec_server_url: None,
|
||||
local_runtime_paths: None,
|
||||
disabled: true,
|
||||
current_environment: OnceCell::new(),
|
||||
},
|
||||
Some(environment) => {
|
||||
if let Some(cloud_environment_id) = &environment.cloud_environment_id {
|
||||
return Self::from_resolver(CloudEnvironmentResolver::new(
|
||||
cloud_environment_id.clone(),
|
||||
environment.cloud_environments_base_url.clone(),
|
||||
environment.auth_manager.clone(),
|
||||
environment.local_runtime_paths().cloned(),
|
||||
));
|
||||
}
|
||||
|
||||
Self::from_resolver(DefaultEnvironmentResolver::new(
|
||||
environment.exec_server_url().map(str::to_owned),
|
||||
environment.local_runtime_paths().cloned(),
|
||||
))
|
||||
}
|
||||
None => Self::from_resolver(DefaultEnvironmentResolver::disabled()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the remote exec-server URL when one is configured.
|
||||
pub fn exec_server_url(&self) -> Option<&str> {
|
||||
self.exec_server_url.as_deref()
|
||||
self.resolver.exec_server_url()
|
||||
}
|
||||
|
||||
/// Returns true when this manager is configured to use a remote exec server.
|
||||
pub fn is_remote(&self) -> bool {
|
||||
self.exec_server_url.is_some()
|
||||
self.resolver.is_remote()
|
||||
}
|
||||
|
||||
/// Returns the cached environment, creating it on first access.
|
||||
pub async fn current(&self) -> Result<Option<Arc<Environment>>, ExecServerError> {
|
||||
self.current_environment
|
||||
.get_or_try_init(|| async {
|
||||
if self.disabled {
|
||||
Ok(None)
|
||||
} else {
|
||||
Ok(Some(Arc::new(
|
||||
Environment::create_with_runtime_paths(
|
||||
self.exec_server_url.clone(),
|
||||
self.local_runtime_paths.clone(),
|
||||
)
|
||||
.await?,
|
||||
)))
|
||||
}
|
||||
self.resolver
|
||||
.create_environment()
|
||||
.await
|
||||
.map(|environment| environment.map(Arc::new))
|
||||
})
|
||||
.await
|
||||
.map(Option::as_ref)
|
||||
@@ -128,6 +343,9 @@ impl EnvironmentManager {
|
||||
#[derive(Clone)]
|
||||
pub struct Environment {
|
||||
exec_server_url: Option<String>,
|
||||
cloud_environment_id: Option<String>,
|
||||
cloud_environments_base_url: Option<String>,
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
remote_exec_server_client: Option<ExecServerClient>,
|
||||
exec_backend: Arc<dyn ExecBackend>,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
@@ -137,6 +355,9 @@ impl Default for Environment {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
exec_server_url: None,
|
||||
cloud_environment_id: None,
|
||||
cloud_environments_base_url: None,
|
||||
auth_manager: None,
|
||||
remote_exec_server_client: None,
|
||||
exec_backend: Arc::new(LocalProcess::default()),
|
||||
local_runtime_paths: None,
|
||||
@@ -148,6 +369,7 @@ impl std::fmt::Debug for Environment {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("Environment")
|
||||
.field("exec_server_url", &self.exec_server_url)
|
||||
.field("cloud_environment_id", &self.cloud_environment_id)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
@@ -155,13 +377,23 @@ impl std::fmt::Debug for Environment {
|
||||
impl Environment {
|
||||
/// Builds an environment from the raw `CODEX_EXEC_SERVER_URL` value.
|
||||
pub async fn create(exec_server_url: Option<String>) -> Result<Self, ExecServerError> {
|
||||
Self::create_with_runtime_paths(exec_server_url, /*local_runtime_paths*/ None).await
|
||||
Self::create_with_runtime_paths(
|
||||
exec_server_url,
|
||||
/*cloud_environment_id*/ None,
|
||||
/*cloud_environments_base_url*/ None,
|
||||
/*auth_manager*/ None,
|
||||
/*local_runtime_paths*/ None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Builds an environment from the raw `CODEX_EXEC_SERVER_URL` value and
|
||||
/// local runtime paths used when creating local filesystem helpers.
|
||||
pub async fn create_with_runtime_paths(
|
||||
exec_server_url: Option<String>,
|
||||
cloud_environment_id: Option<String>,
|
||||
cloud_environments_base_url: Option<String>,
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
let (exec_server_url, disabled) = normalize_exec_server_url(exec_server_url);
|
||||
@@ -171,17 +403,24 @@ impl Environment {
|
||||
));
|
||||
}
|
||||
|
||||
let cloud_environment_id = normalize_optional_env_value(cloud_environment_id);
|
||||
let cloud_environments_base_url = normalize_optional_env_value(cloud_environments_base_url);
|
||||
let remote_exec_server_client = if let Some(exec_server_url) = &exec_server_url {
|
||||
Some(
|
||||
ExecServerClient::connect_websocket(RemoteExecServerConnectArgs {
|
||||
websocket_url: exec_server_url.clone(),
|
||||
client_name: "codex-environment".to_string(),
|
||||
connect_timeout: std::time::Duration::from_secs(5),
|
||||
initialize_timeout: std::time::Duration::from_secs(5),
|
||||
resume_session_id: None,
|
||||
})
|
||||
.await?,
|
||||
)
|
||||
Some(connect_remote_exec_server(exec_server_url.clone()).await?)
|
||||
} else if let Some(environment_id) = &cloud_environment_id {
|
||||
let base_url = cloud_environments_base_url.clone().ok_or_else(|| {
|
||||
ExecServerError::CloudEnvironmentConfig(format!(
|
||||
"{CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR} is required when {CODEX_CLOUD_ENVIRONMENT_ID_ENV_VAR} is set"
|
||||
))
|
||||
})?;
|
||||
let auth_manager = auth_manager.clone().ok_or_else(|| {
|
||||
ExecServerError::CloudEnvironmentAuth(
|
||||
"cloud environment selection requires ChatGPT authentication".to_string(),
|
||||
)
|
||||
})?;
|
||||
let client = CloudEnvironmentClient::new(base_url, auth_manager)?;
|
||||
let response = client.connect_environment(environment_id).await?;
|
||||
Some(connect_remote_exec_server(response.url).await?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -195,6 +434,9 @@ impl Environment {
|
||||
|
||||
Ok(Self {
|
||||
exec_server_url,
|
||||
cloud_environment_id,
|
||||
cloud_environments_base_url,
|
||||
auth_manager,
|
||||
remote_exec_server_client,
|
||||
exec_backend,
|
||||
local_runtime_paths,
|
||||
@@ -202,7 +444,7 @@ impl Environment {
|
||||
}
|
||||
|
||||
pub fn is_remote(&self) -> bool {
|
||||
self.exec_server_url.is_some()
|
||||
self.exec_server_url.is_some() || self.cloud_environment_id.is_some()
|
||||
}
|
||||
|
||||
/// Returns the remote exec-server URL when this environment is remote.
|
||||
@@ -236,12 +478,43 @@ fn normalize_exec_server_url(exec_server_url: Option<String>) -> (Option<String>
|
||||
Some(url) => (Some(url.to_string()), false),
|
||||
}
|
||||
}
|
||||
|
||||
fn normalize_optional_env_value(value: Option<String>) -> Option<String> {
|
||||
value
|
||||
.as_deref()
|
||||
.map(str::trim)
|
||||
.filter(|value| !value.is_empty())
|
||||
.map(str::to_string)
|
||||
}
|
||||
|
||||
async fn connect_remote_exec_server(
|
||||
websocket_url: String,
|
||||
) -> Result<ExecServerClient, ExecServerError> {
|
||||
ExecServerClient::connect_websocket(RemoteExecServerConnectArgs {
|
||||
websocket_url,
|
||||
client_name: "codex-environment".to_string(),
|
||||
connect_timeout: std::time::Duration::from_secs(5),
|
||||
initialize_timeout: std::time::Duration::from_secs(5),
|
||||
resume_session_id: None,
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
struct EnvironmentManagerOptions {
|
||||
exec_server_url: Option<String>,
|
||||
cloud_environment_id: Option<String>,
|
||||
cloud_environments_base_url: Option<String>,
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::CloudEnvironmentResolver;
|
||||
use super::Environment;
|
||||
use super::EnvironmentManager;
|
||||
use super::EnvironmentManagerOptions;
|
||||
use crate::ExecServerRuntimePaths;
|
||||
use crate::ProcessId;
|
||||
use pretty_assertions::assert_eq;
|
||||
@@ -260,7 +533,6 @@ mod tests {
|
||||
fn environment_manager_normalizes_empty_url() {
|
||||
let manager = EnvironmentManager::new(Some(String::new()));
|
||||
|
||||
assert!(!manager.disabled);
|
||||
assert_eq!(manager.exec_server_url(), None);
|
||||
assert!(!manager.is_remote());
|
||||
}
|
||||
@@ -269,7 +541,6 @@ mod tests {
|
||||
fn environment_manager_treats_none_value_as_disabled() {
|
||||
let manager = EnvironmentManager::new(Some("none".to_string()));
|
||||
|
||||
assert!(manager.disabled);
|
||||
assert_eq!(manager.exec_server_url(), None);
|
||||
assert!(!manager.is_remote());
|
||||
}
|
||||
@@ -282,6 +553,88 @@ mod tests {
|
||||
assert_eq!(manager.exec_server_url(), Some("ws://127.0.0.1:8765"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn direct_remote_url_takes_precedence_over_cloud_environment() {
|
||||
let manager = EnvironmentManager::new_with_options(EnvironmentManagerOptions {
|
||||
exec_server_url: Some("ws://127.0.0.1:8765".to_string()),
|
||||
cloud_environment_id: Some("env-1".to_string()),
|
||||
cloud_environments_base_url: Some("https://cloud.example.test".to_string()),
|
||||
auth_manager: None,
|
||||
local_runtime_paths: None,
|
||||
});
|
||||
|
||||
assert!(manager.is_remote());
|
||||
assert_eq!(manager.exec_server_url(), Some("ws://127.0.0.1:8765"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn none_disables_cloud_environment_selection() {
|
||||
let manager = EnvironmentManager::new_with_options(EnvironmentManagerOptions {
|
||||
exec_server_url: Some("none".to_string()),
|
||||
cloud_environment_id: Some("env-1".to_string()),
|
||||
cloud_environments_base_url: Some("https://cloud.example.test".to_string()),
|
||||
auth_manager: None,
|
||||
local_runtime_paths: None,
|
||||
});
|
||||
|
||||
assert!(!manager.is_remote());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cloud_environment_id_is_remote_without_direct_url() {
|
||||
let cloud_resolver = CloudEnvironmentResolver::new(
|
||||
"env-1".to_string(),
|
||||
Some(" https://cloud.example.test ".to_string()),
|
||||
/*auth_manager*/ None,
|
||||
/*local_runtime_paths*/ None,
|
||||
);
|
||||
|
||||
assert_eq!(cloud_resolver.cloud_environment_id(), "env-1");
|
||||
assert_eq!(
|
||||
cloud_resolver.cloud_environments_base_url(),
|
||||
Some("https://cloud.example.test")
|
||||
);
|
||||
let manager = EnvironmentManager::from_resolver(cloud_resolver);
|
||||
assert!(manager.is_remote());
|
||||
assert_eq!(manager.exec_server_url(), None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cloud_environment_requires_base_url() {
|
||||
let err = Environment::create_with_runtime_paths(
|
||||
/*exec_server_url*/ None,
|
||||
Some("env-1".to_string()),
|
||||
/*cloud_environments_base_url*/ None,
|
||||
/*auth_manager*/ None,
|
||||
/*local_runtime_paths*/ None,
|
||||
)
|
||||
.await
|
||||
.expect_err("missing base URL should fail");
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"cloud environment configuration error: CODEX_CLOUD_ENVIRONMENTS_BASE_URL is required when CODEX_CLOUD_ENVIRONMENT_ID is set"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cloud_environment_requires_auth_manager() {
|
||||
let err = Environment::create_with_runtime_paths(
|
||||
/*exec_server_url*/ None,
|
||||
Some("env-1".to_string()),
|
||||
Some("https://cloud.example.test".to_string()),
|
||||
/*auth_manager*/ None,
|
||||
/*local_runtime_paths*/ None,
|
||||
)
|
||||
.await
|
||||
.expect_err("missing auth should fail");
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"cloud environment authentication error: cloud environment selection requires ChatGPT authentication"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_current_caches_environment() {
|
||||
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
|
||||
@@ -314,9 +667,14 @@ mod tests {
|
||||
.expect("local environment");
|
||||
|
||||
assert_eq!(environment.local_runtime_paths(), Some(&runtime_paths));
|
||||
let inherited_environment = EnvironmentManager::from_environment(Some(&environment))
|
||||
.current()
|
||||
.await
|
||||
.expect("get inherited current environment")
|
||||
.expect("inherited local environment");
|
||||
assert_eq!(
|
||||
EnvironmentManager::from_environment(Some(&environment)).local_runtime_paths,
|
||||
Some(runtime_paths)
|
||||
inherited_environment.local_runtime_paths(),
|
||||
Some(&runtime_paths)
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
mod client;
|
||||
mod client_api;
|
||||
mod cloud;
|
||||
mod connection;
|
||||
mod environment;
|
||||
mod file_system;
|
||||
@@ -22,9 +23,20 @@ pub use client::ExecServerClient;
|
||||
pub use client::ExecServerError;
|
||||
pub use client_api::ExecServerClientConnectOptions;
|
||||
pub use client_api::RemoteExecServerConnectArgs;
|
||||
pub use cloud::CODEX_CLOUD_ENVIRONMENT_ID_ENV_VAR;
|
||||
pub use cloud::CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR;
|
||||
pub use cloud::CloudEnvironmentClient;
|
||||
pub use cloud::CloudEnvironmentConnectResponse;
|
||||
pub use cloud::CloudEnvironmentExecutorRegistrationResponse;
|
||||
pub use cloud::CloudEnvironmentRegisterExecutorRequest;
|
||||
pub use cloud::CloudExecutorConfig;
|
||||
pub use cloud::run_cloud_executor;
|
||||
pub use environment::CODEX_EXEC_SERVER_URL_ENV_VAR;
|
||||
pub use environment::CloudEnvironmentResolver;
|
||||
pub use environment::DefaultEnvironmentResolver;
|
||||
pub use environment::Environment;
|
||||
pub use environment::EnvironmentManager;
|
||||
pub use environment::EnvironmentResolver;
|
||||
pub use file_system::CopyOptions;
|
||||
pub use file_system::CreateDirectoryOptions;
|
||||
pub use file_system::ExecutorFileSystem;
|
||||
|
||||
@@ -7,6 +7,7 @@ mod session_registry;
|
||||
mod transport;
|
||||
|
||||
pub(crate) use handler::ExecServerHandler;
|
||||
pub(crate) use processor::ConnectionProcessor;
|
||||
pub use transport::DEFAULT_LISTEN_URL;
|
||||
pub use transport::ExecServerListenUrlParseError;
|
||||
|
||||
|
||||
@@ -67,6 +67,7 @@ use codex_core::path_utils;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_git_utils::get_git_repo_root;
|
||||
use codex_login::AuthConfig;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::default_client::set_default_client_residency_requirement;
|
||||
use codex_login::default_client::set_default_originator;
|
||||
use codex_login::enforce_login_restrictions;
|
||||
@@ -474,6 +475,8 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
|
||||
arg0_paths.codex_self_exe.clone(),
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?;
|
||||
let environment_auth_manager =
|
||||
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false);
|
||||
let in_process_start_args = InProcessClientStartArgs {
|
||||
arg0_paths,
|
||||
config: std::sync::Arc::new(config.clone()),
|
||||
@@ -481,9 +484,12 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
|
||||
loader_overrides: run_loader_overrides,
|
||||
cloud_requirements: run_cloud_requirements,
|
||||
feedback: CodexFeedback::new(),
|
||||
environment_manager: std::sync::Arc::new(EnvironmentManager::from_env_with_runtime_paths(
|
||||
Some(local_runtime_paths),
|
||||
)),
|
||||
environment_manager: std::sync::Arc::new(
|
||||
EnvironmentManager::from_env_with_runtime_paths_and_auth_manager(
|
||||
Some(local_runtime_paths),
|
||||
Some(environment_auth_manager),
|
||||
),
|
||||
),
|
||||
config_warnings,
|
||||
session_source: SessionSource::Exec,
|
||||
enable_codex_api_key_env: true,
|
||||
|
||||
@@ -723,15 +723,19 @@ pub async fn run_main(
|
||||
}
|
||||
};
|
||||
|
||||
let environment_manager = Arc::new(EnvironmentManager::from_env_with_runtime_paths(Some(
|
||||
ExecServerRuntimePaths::from_optional_paths(
|
||||
arg0_paths.codex_self_exe.clone(),
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?,
|
||||
)));
|
||||
let local_runtime_paths = ExecServerRuntimePaths::from_optional_paths(
|
||||
arg0_paths.codex_self_exe.clone(),
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?;
|
||||
let preliminary_environment_manager = Arc::new(
|
||||
EnvironmentManager::from_env_with_runtime_paths(Some(local_runtime_paths.clone())),
|
||||
);
|
||||
let cwd = cli.cwd.clone();
|
||||
let config_cwd =
|
||||
config_cwd_for_app_server_target(cwd.as_deref(), &app_server_target, &environment_manager)?;
|
||||
let config_cwd = config_cwd_for_app_server_target(
|
||||
cwd.as_deref(),
|
||||
&app_server_target,
|
||||
&preliminary_environment_manager,
|
||||
)?;
|
||||
|
||||
#[allow(clippy::print_stderr)]
|
||||
let config_toml = match load_config_as_toml_with_cli_overrides(
|
||||
@@ -982,6 +986,13 @@ pub async fn run_main(
|
||||
.with(otel_tracing_layer)
|
||||
.try_init();
|
||||
|
||||
let environment_manager = Arc::new(
|
||||
EnvironmentManager::from_env_with_runtime_paths_and_chatgpt_login_config(
|
||||
Some(local_runtime_paths),
|
||||
&config,
|
||||
),
|
||||
);
|
||||
|
||||
run_ratatui_app(
|
||||
cli,
|
||||
arg0_paths,
|
||||
@@ -1755,6 +1766,34 @@ mod tests {
|
||||
use serial_test::serial;
|
||||
use tempfile::TempDir;
|
||||
|
||||
struct EnvVarGuard {
|
||||
key: &'static str,
|
||||
previous: Option<String>,
|
||||
}
|
||||
|
||||
impl EnvVarGuard {
|
||||
fn set(key: &'static str, value: &str) -> Self {
|
||||
let previous = std::env::var(key).ok();
|
||||
unsafe { std::env::set_var(key, value) };
|
||||
Self { key, previous }
|
||||
}
|
||||
|
||||
fn remove(key: &'static str) -> Self {
|
||||
let previous = std::env::var(key).ok();
|
||||
unsafe { std::env::remove_var(key) };
|
||||
Self { key, previous }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for EnvVarGuard {
|
||||
fn drop(&mut self) {
|
||||
match &self.previous {
|
||||
Some(value) => unsafe { std::env::set_var(self.key, value) },
|
||||
None => unsafe { std::env::remove_var(self.key) },
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn build_config(temp_dir: &TempDir) -> std::io::Result<Config> {
|
||||
ConfigBuilder::default()
|
||||
.codex_home(temp_dir.path().to_path_buf())
|
||||
@@ -1990,6 +2029,34 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn config_cwd_for_app_server_target_omits_cwd_for_cloud_exec_server() -> std::io::Result<()> {
|
||||
let _exec_server_url =
|
||||
EnvVarGuard::remove(codex_exec_server::CODEX_EXEC_SERVER_URL_ENV_VAR);
|
||||
let _cloud_environment_id = EnvVarGuard::set(
|
||||
codex_exec_server::CODEX_CLOUD_ENVIRONMENT_ID_ENV_VAR,
|
||||
"env-1",
|
||||
);
|
||||
let _cloud_base_url = EnvVarGuard::set(
|
||||
codex_exec_server::CODEX_CLOUD_ENVIRONMENTS_BASE_URL_ENV_VAR,
|
||||
"https://cloud.example.test",
|
||||
);
|
||||
let remote_only_cwd = if cfg!(windows) {
|
||||
Path::new(r"C:\definitely\not\local\to\this\test")
|
||||
} else {
|
||||
Path::new("/definitely/not/local/to/this/test")
|
||||
};
|
||||
let target = AppServerTarget::Embedded;
|
||||
let environment_manager = EnvironmentManager::from_env_with_runtime_paths(None);
|
||||
|
||||
let config_cwd =
|
||||
config_cwd_for_app_server_target(Some(remote_only_cwd), &target, &environment_manager)?;
|
||||
|
||||
assert_eq!(config_cwd, None);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_session_cwd_returns_none_without_sqlite_or_rollout_path() -> std::io::Result<()> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
|
||||
Reference in New Issue
Block a user