Files
codex/codex-rs/exec-server/src/environment.rs
starr-openai cb273931f8 Remove environment provider trait
Co-authored-by: Codex <noreply@openai.com>
2026-04-16 13:58:05 -07:00

701 lines
24 KiB
Rust

use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::OnceCell;
use crate::ExecServerClient;
use crate::ExecServerError;
use crate::ExecServerRuntimePaths;
use crate::RemoteExecServerConnectArgs;
use crate::file_system::ExecutorFileSystem;
use crate::local_file_system::LocalFileSystem;
use crate::local_process::LocalProcess;
use crate::process::ExecBackend;
use crate::remote_file_system::RemoteFileSystem;
use crate::remote_process::RemoteProcess;
/// Name of the process environment variable that holds the raw exec-server
/// URL read by [`EnvironmentManager::from_env_with_runtime_paths`].
///
/// After trimming, an empty value means "no remote server" and the value
/// `none` (compared ASCII case-insensitively) disables environments.
pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
/// Name of the process environment variable that holds the default working
/// directory recorded for the remote environment.
pub const CODEX_EXEC_SERVER_CWD_ENV_VAR: &str = "CODEX_EXEC_SERVER_CWD";
/// Environment id that explicitly selects the local environment.
const LOCAL_ENVIRONMENT_ID: &str = "local";
/// Environment id that explicitly selects the remote environment.
const REMOTE_ENVIRONMENT_ID: &str = "remote";
/// Environment requested explicitly by id, as opposed to the session's
/// current (default) environment.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ExplicitEnvironmentId {
/// Requested with the `local` id.
Local,
/// Requested with the `remote` id.
Remote,
}
/// Settings from which a single [`Environment`] is created.
#[derive(Clone, Debug, PartialEq, Eq)]
struct EnvironmentConfig {
/// Remote exec-server URL, if any; without one the created environment
/// uses the local process backend.
exec_server_url: Option<String>,
/// Default working directory stored on the created environment, if any.
default_cwd: Option<PathBuf>,
}
/// Lazily creates and caches the active environment for a session.
///
/// The manager keeps the session's environment selection stable so subagents
/// and follow-up turns preserve an explicit disabled state.
#[derive(Debug)]
pub struct EnvironmentManager {
/// Config used when no environment id is given; `None` when disabled.
current_environment_config: Option<EnvironmentConfig>,
/// Config used for an explicit `local` selection; `None` when disabled.
local_environment_config: Option<EnvironmentConfig>,
/// Config used for an explicit `remote` selection; `None` when no remote
/// exec-server URL is configured.
remote_environment_config: Option<EnvironmentConfig>,
/// Runtime paths handed to every environment this manager creates.
local_runtime_paths: Option<ExecServerRuntimePaths>,
/// When true, no environment is ever created and explicit selections fail.
disabled: bool,
/// Cache for the current environment (`None` is cached in disabled mode).
current_environment: OnceCell<Option<Arc<Environment>>>,
/// Cache for the explicitly selected local environment.
local_environment: OnceCell<Arc<Environment>>,
/// Cache for the explicitly selected remote environment.
remote_environment: OnceCell<Arc<Environment>>,
}
impl Default for EnvironmentManager {
    /// Equivalent to [`EnvironmentManager::new`] without an exec-server URL.
    fn default() -> Self {
        let exec_server_url: Option<String> = None;
        Self::new(exec_server_url)
    }
}
impl EnvironmentManager {
/// Creates a manager from the raw `CODEX_EXEC_SERVER_URL` value, with no
/// remote default working directory and no local runtime paths.
pub fn new(exec_server_url: Option<String>) -> Self {
    let exec_server_cwd = None;
    let local_runtime_paths = None;
    Self::new_with_runtime_paths(exec_server_url, exec_server_cwd, local_runtime_paths)
}
/// Creates a manager from the raw `CODEX_EXEC_SERVER_URL` value, the default
/// working directory for the remote environment, and the local runtime paths
/// used when creating local filesystem helpers.
///
/// `exec_server_cwd` is only recorded on the remote configuration; it is
/// dropped when no remote URL is configured. When the URL normalizes to the
/// disabled state, no configuration is stored at all.
pub fn new_with_runtime_paths(
    exec_server_url: Option<String>,
    exec_server_cwd: Option<PathBuf>,
    local_runtime_paths: Option<ExecServerRuntimePaths>,
) -> Self {
    let (remote_url, disabled) = normalize_exec_server_url(exec_server_url);

    let local_environment_config = if disabled {
        None
    } else {
        Some(EnvironmentConfig {
            exec_server_url: None,
            default_cwd: None,
        })
    };
    let remote_environment_config = remote_url.map(|url| EnvironmentConfig {
        exec_server_url: Some(url),
        default_cwd: exec_server_cwd,
    });

    // A configured remote wins over local unless the session is disabled.
    let current_environment_config = match (disabled, &remote_environment_config) {
        (true, _) => None,
        (false, Some(remote_config)) => Some(remote_config.clone()),
        (false, None) => local_environment_config.clone(),
    };

    Self {
        current_environment_config,
        local_environment_config,
        remote_environment_config,
        local_runtime_paths,
        disabled,
        current_environment: OnceCell::new(),
        local_environment: OnceCell::new(),
        remote_environment: OnceCell::new(),
    }
}
/// Creates a manager from process environment variables, without local
/// runtime paths.
pub fn from_env() -> Self {
    let local_runtime_paths = None;
    Self::from_env_with_runtime_paths(local_runtime_paths)
}
/// Creates a manager from process environment variables and the local
/// runtime paths used when creating local filesystem helpers.
///
/// Reads `CODEX_EXEC_SERVER_URL` and `CODEX_EXEC_SERVER_CWD`; a variable that
/// is unset or cannot be read as Unicode is treated as absent.
pub fn from_env_with_runtime_paths(
    local_runtime_paths: Option<ExecServerRuntimePaths>,
) -> Self {
    let exec_server_url = std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok();
    let exec_server_cwd = std::env::var(CODEX_EXEC_SERVER_CWD_ENV_VAR)
        .ok()
        .map(PathBuf::from);
    Self::new_with_runtime_paths(exec_server_url, exec_server_cwd, local_runtime_paths)
}
/// Creates a manager that mirrors an already selected environment, or a
/// disabled manager when `environment` is `None`.
///
/// The current configuration copies the environment's URL and default
/// working directory. A remote configuration is recorded only when the
/// environment has a URL. The local configuration never carries a default
/// working directory.
pub fn from_environment(environment: Option<&Environment>) -> Self {
    let Some(environment) = environment else {
        return Self {
            current_environment_config: None,
            local_environment_config: None,
            remote_environment_config: None,
            local_runtime_paths: None,
            disabled: true,
            current_environment: OnceCell::new(),
            local_environment: OnceCell::new(),
            remote_environment: OnceCell::new(),
        };
    };

    let current_config = EnvironmentConfig {
        exec_server_url: environment.exec_server_url().map(str::to_owned),
        default_cwd: environment.default_cwd().map(PathBuf::from),
    };
    // With a URL present, the remote config is identical to the current one.
    let remote_config = if current_config.exec_server_url.is_some() {
        Some(current_config.clone())
    } else {
        None
    };

    Self {
        current_environment_config: Some(current_config),
        local_environment_config: Some(EnvironmentConfig {
            exec_server_url: None,
            default_cwd: None,
        }),
        remote_environment_config: remote_config,
        local_runtime_paths: environment.local_runtime_paths().cloned(),
        disabled: false,
        current_environment: OnceCell::new(),
        local_environment: OnceCell::new(),
        remote_environment: OnceCell::new(),
    }
}
/// Returns the exec-server URL of the current environment configuration, or
/// `None` when the current environment is local or the manager is disabled.
pub fn exec_server_url(&self) -> Option<&str> {
    let current_config = self.current_environment_config.as_ref()?;
    current_config.exec_server_url.as_deref()
}
/// Reports whether the current environment configuration points at a remote
/// exec server.
pub fn is_remote(&self) -> bool {
    let remote_url = self.exec_server_url();
    remote_url.is_some()
}
/// Resolves the configuration for `environment_id`.
///
/// With no id (or an empty one) this yields the current configuration, which
/// is `None` in disabled mode. An explicit `local` / `remote` id yields that
/// configuration.
///
/// # Errors
///
/// Returns `ExecServerError::Protocol` when the id is unknown, when an
/// explicit id is used while environments are disabled, or when the
/// explicitly selected environment is not configured.
fn environment_config(
    &self,
    environment_id: Option<&str>,
) -> Result<Option<&EnvironmentConfig>, ExecServerError> {
    let Some(explicit_id) = parse_environment_id(environment_id)? else {
        return Ok(self.current_environment_config.as_ref());
    };

    // Every explicit selection is rejected in disabled mode.
    if self.disabled {
        return Err(ExecServerError::Protocol(
            "environments are disabled for this session".to_string(),
        ));
    }

    let (selected_config, missing_message) = match explicit_id {
        ExplicitEnvironmentId::Local => (
            self.local_environment_config.as_ref(),
            "local environment is not configured",
        ),
        ExplicitEnvironmentId::Remote => (
            self.remote_environment_config.as_ref(),
            "remote environment is not configured",
        ),
    };

    match selected_config {
        Some(config) => Ok(Some(config)),
        None => Err(ExecServerError::Protocol(missing_message.to_string())),
    }
}
/// Returns the session's current environment, creating and caching it on the
/// first successful call.
///
/// Yields `Ok(None)` when the manager is disabled or has no current
/// configuration; that `None` is cached as well.
///
/// # Errors
///
/// Propagates the error from creating the environment; a failed attempt
/// leaves the cache uninitialized.
pub async fn current(&self) -> Result<Option<Arc<Environment>>, ExecServerError> {
    self.current_environment
        .get_or_try_init(|| async {
            let environment_config = match &self.current_environment_config {
                Some(config) if !self.disabled => config.clone(),
                _ => return Ok(None),
            };
            let environment = Environment::create_with_config(
                environment_config,
                self.local_runtime_paths.clone(),
            )
            .await?;
            Ok(Some(Arc::new(environment)))
        })
        .await
        .cloned()
}
/// Returns the environment selected by `environment_id`.
///
/// With no id (or an empty one) this is the current environment, which may
/// be `None`. The `local` and `remote` ids select those environments
/// explicitly and always yield `Some` on success.
///
/// # Errors
///
/// Fails when the id is unknown or when the selected environment cannot be
/// provided.
pub async fn environment(
    &self,
    environment_id: Option<&str>,
) -> Result<Option<Arc<Environment>>, ExecServerError> {
    let Some(explicit_id) = parse_environment_id(environment_id)? else {
        return self.current().await;
    };
    let selected = match explicit_id {
        ExplicitEnvironmentId::Local => self.local_environment().await?,
        ExplicitEnvironmentId::Remote => self.remote_environment().await?,
    };
    Ok(Some(selected))
}
/// Returns the default working directory configured for the environment
/// selected by `environment_id`, without creating that environment.
///
/// # Errors
///
/// Fails under the same conditions as resolving the environment
/// configuration for `environment_id`.
pub fn default_cwd(
    &self,
    environment_id: Option<&str>,
) -> Result<Option<&Path>, ExecServerError> {
    let selected_config = self.environment_config(environment_id)?;
    let cwd = match selected_config {
        Some(config) => config.default_cwd.as_deref(),
        None => None,
    };
    Ok(cwd)
}
/// Returns the explicitly selected local environment, creating and caching
/// it on the first successful call.
///
/// # Errors
///
/// Returns `ExecServerError::Protocol` when environments are disabled or no
/// local configuration exists, and propagates any error from creating the
/// environment.
async fn local_environment(&self) -> Result<Arc<Environment>, ExecServerError> {
    if self.disabled {
        return Err(ExecServerError::Protocol(
            "environments are disabled for this session".to_string(),
        ));
    }
    // Only borrow here: the config is cloned inside the initializer, so calls
    // that hit the already-initialized cell do not allocate.
    let Some(environment_config) = self.local_environment_config.as_ref() else {
        return Err(ExecServerError::Protocol(
            "local environment is not configured".to_string(),
        ));
    };
    self.local_environment
        .get_or_try_init(|| async {
            Environment::create_with_config(
                environment_config.clone(),
                self.local_runtime_paths.clone(),
            )
            .await
            .map(Arc::new)
        })
        .await
        .map(Arc::clone)
}
/// Returns the explicitly selected remote environment, creating and caching
/// it on the first successful call.
///
/// # Errors
///
/// Returns `ExecServerError::Protocol` when environments are disabled or no
/// remote configuration exists, and propagates any error from creating the
/// environment.
async fn remote_environment(&self) -> Result<Arc<Environment>, ExecServerError> {
    if self.disabled {
        return Err(ExecServerError::Protocol(
            "environments are disabled for this session".to_string(),
        ));
    }
    // Only borrow here: the config is cloned inside the initializer, so calls
    // that hit the already-initialized cell do not allocate.
    let Some(environment_config) = self.remote_environment_config.as_ref() else {
        return Err(ExecServerError::Protocol(
            "remote environment is not configured".to_string(),
        ));
    };
    self.remote_environment
        .get_or_try_init(|| async {
            Environment::create_with_config(
                environment_config.clone(),
                self.local_runtime_paths.clone(),
            )
            .await
            .map(Arc::new)
        })
        .await
        .map(Arc::clone)
}
}
/// Concrete execution/filesystem environment selected for a session.
///
/// This bundles the selected backend together with the corresponding remote
/// client, if any.
#[derive(Clone)]
pub struct Environment {
/// Normalized remote exec-server URL; `None` for a local environment.
exec_server_url: Option<String>,
/// Default working directory recorded for this environment, if any.
default_cwd: Option<PathBuf>,
/// Connected client; present only when an exec-server URL was configured.
remote_exec_server_client: Option<ExecServerClient>,
/// Process backend: remote when a client exists, local otherwise.
exec_backend: Arc<dyn ExecBackend>,
/// Runtime paths used to build the local filesystem helper, if provided.
local_runtime_paths: Option<ExecServerRuntimePaths>,
}
impl Default for Environment {
fn default() -> Self {
Self {
exec_server_url: None,
default_cwd: None,
remote_exec_server_client: None,
exec_backend: Arc::new(LocalProcess::default()),
local_runtime_paths: None,
}
}
}
impl std::fmt::Debug for Environment {
    /// Formats only `exec_server_url`; the remaining fields are elided and the
    /// output is marked non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut debug_struct = f.debug_struct("Environment");
        debug_struct.field("exec_server_url", &self.exec_server_url);
        debug_struct.finish_non_exhaustive()
    }
}
impl Environment {
/// Creates an environment from the raw `CODEX_EXEC_SERVER_URL` value, without
/// local runtime paths.
pub async fn create(exec_server_url: Option<String>) -> Result<Self, ExecServerError> {
    let local_runtime_paths = None;
    Self::create_with_runtime_paths(exec_server_url, local_runtime_paths).await
}
/// Creates an environment from the raw `CODEX_EXEC_SERVER_URL` value and the
/// local runtime paths used when creating local filesystem helpers.
///
/// The resulting environment has no default working directory.
pub async fn create_with_runtime_paths(
    exec_server_url: Option<String>,
    local_runtime_paths: Option<ExecServerRuntimePaths>,
) -> Result<Self, ExecServerError> {
    let environment_config = EnvironmentConfig {
        exec_server_url,
        default_cwd: None,
    };
    Self::create_with_config(environment_config, local_runtime_paths).await
}
async fn create_with_config(
environment_config: EnvironmentConfig,
local_runtime_paths: Option<ExecServerRuntimePaths>,
) -> Result<Self, ExecServerError> {
let (exec_server_url, disabled) =
normalize_exec_server_url(environment_config.exec_server_url);
if disabled {
return Err(ExecServerError::Protocol(
"disabled mode does not create an Environment".to_string(),
));
}
let remote_exec_server_client = if let Some(exec_server_url) = &exec_server_url {
Some(
ExecServerClient::connect_websocket(RemoteExecServerConnectArgs {
websocket_url: exec_server_url.clone(),
client_name: "codex-environment".to_string(),
connect_timeout: std::time::Duration::from_secs(5),
initialize_timeout: std::time::Duration::from_secs(5),
resume_session_id: None,
})
.await?,
)
} else {
None
};
let exec_backend: Arc<dyn ExecBackend> =
if let Some(client) = remote_exec_server_client.clone() {
Arc::new(RemoteProcess::new(client))
} else {
Arc::new(LocalProcess::default())
};
Ok(Self {
exec_server_url,
default_cwd: environment_config.default_cwd,
remote_exec_server_client,
exec_backend,
local_runtime_paths,
})
}
/// Reports whether this environment was created with an exec-server URL.
pub fn is_remote(&self) -> bool {
    let remote_url = self.exec_server_url();
    remote_url.is_some()
}
/// Returns the exec-server URL of a remote environment, or `None` for a local
/// one.
pub fn exec_server_url(&self) -> Option<&str> {
    let url = self.exec_server_url.as_ref()?;
    Some(url.as_str())
}
/// Returns the local runtime paths this environment was created with, if
/// any.
pub fn local_runtime_paths(&self) -> Option<&ExecServerRuntimePaths> {
self.local_runtime_paths.as_ref()
}
/// Returns the default working directory recorded for this environment, if
/// any.
pub fn default_cwd(&self) -> Option<&Path> {
    let cwd = self.default_cwd.as_ref()?;
    Some(cwd.as_path())
}
/// Returns a new shared handle to this environment's process backend.
pub fn get_exec_backend(&self) -> Arc<dyn ExecBackend> {
Arc::clone(&self.exec_backend)
}
/// Builds a filesystem handle for this environment.
///
/// A remote client takes precedence. Otherwise the local filesystem is
/// created with the stored runtime paths, or unsandboxed when there are
/// none. A new handle is constructed on every call.
pub fn get_filesystem(&self) -> Arc<dyn ExecutorFileSystem> {
    if let Some(client) = &self.remote_exec_server_client {
        return Arc::new(RemoteFileSystem::new(client.clone()));
    }
    match &self.local_runtime_paths {
        Some(runtime_paths) => {
            Arc::new(LocalFileSystem::with_runtime_paths(runtime_paths.clone()))
        }
        None => Arc::new(LocalFileSystem::unsandboxed()),
    }
}
}
/// Normalizes the raw exec-server URL into `(url, disabled)`.
///
/// The value is trimmed first. A missing or empty value yields
/// `(None, false)`; the value `none` (ASCII case-insensitive) yields
/// `(None, true)`; anything else yields the trimmed URL and `false`.
fn normalize_exec_server_url(exec_server_url: Option<String>) -> (Option<String>, bool) {
    let Some(raw_url) = exec_server_url else {
        return (None, false);
    };
    let trimmed_url = raw_url.trim();
    if trimmed_url.is_empty() {
        (None, false)
    } else if trimmed_url.eq_ignore_ascii_case("none") {
        (None, true)
    } else {
        (Some(trimmed_url.to_string()), false)
    }
}
/// Parses an optional environment id.
///
/// The id is trimmed first. A missing or empty id yields `Ok(None)`; `local`
/// and `remote` (ASCII case-insensitive) yield the matching explicit id.
///
/// # Errors
///
/// Returns `ExecServerError::Protocol` naming the trimmed id when it matches
/// neither known id.
fn parse_environment_id(
    environment_id: Option<&str>,
) -> Result<Option<ExplicitEnvironmentId>, ExecServerError> {
    let Some(raw_id) = environment_id else {
        return Ok(None);
    };
    let environment_id = raw_id.trim();
    if environment_id.is_empty() {
        return Ok(None);
    }

    if environment_id.eq_ignore_ascii_case(LOCAL_ENVIRONMENT_ID) {
        Ok(Some(ExplicitEnvironmentId::Local))
    } else if environment_id.eq_ignore_ascii_case(REMOTE_ENVIRONMENT_ID) {
        Ok(Some(ExplicitEnvironmentId::Remote))
    } else {
        Err(ExecServerError::Protocol(format!(
            "unknown environment id: {environment_id}"
        )))
    }
}
/// Unit tests for environment selection, caching, and normalization.
#[cfg(test)]
mod tests {
    // A child module does not inherit the parent's `use` declarations, so
    // every name used below is imported explicitly.
    use std::path::Path;
    use std::path::PathBuf;
    use std::sync::Arc;

    use super::Environment;
    use super::EnvironmentConfig;
    use super::EnvironmentManager;
    use crate::ExecServerRuntimePaths;
    use crate::ProcessId;
    use pretty_assertions::assert_eq;

    #[tokio::test]
    async fn create_local_environment_does_not_connect() {
        let environment = Environment::create(/*exec_server_url*/ None)
            .await
            .expect("create environment");
        assert_eq!(environment.exec_server_url(), None);
        assert!(environment.remote_exec_server_client.is_none());
    }

    #[test]
    fn environment_manager_normalizes_empty_url() {
        let manager = EnvironmentManager::new(Some(String::new()));
        assert!(!manager.disabled);
        assert_eq!(manager.exec_server_url(), None);
        assert!(!manager.is_remote());
    }

    #[test]
    fn environment_manager_treats_none_value_as_disabled() {
        let manager = EnvironmentManager::new(Some("none".to_string()));
        assert!(manager.disabled);
        assert_eq!(manager.exec_server_url(), None);
        assert!(!manager.is_remote());
    }

    #[test]
    fn environment_manager_reports_remote_url() {
        let manager = EnvironmentManager::new(Some("ws://127.0.0.1:8765".to_string()));
        assert!(manager.is_remote());
        assert_eq!(manager.exec_server_url(), Some("ws://127.0.0.1:8765"));
    }

    #[test]
    fn environment_manager_carries_remote_default_cwd() {
        let manager = EnvironmentManager::new_with_runtime_paths(
            Some("ws://127.0.0.1:8765".to_string()),
            Some(PathBuf::from("/tmp/devbox")),
            /*local_runtime_paths*/ None,
        );
        assert_eq!(
            manager
                .remote_environment_config
                .as_ref()
                .and_then(|config| config.default_cwd.as_deref()),
            Some(Path::new("/tmp/devbox"))
        );
    }

    #[tokio::test]
    async fn environment_manager_current_caches_environment() {
        let manager = EnvironmentManager::new(/*exec_server_url*/ None);
        let first = manager.current().await.expect("get current environment");
        let second = manager.current().await.expect("get current environment");
        let first = first.expect("local environment");
        let second = second.expect("local environment");
        assert!(Arc::ptr_eq(&first, &second));
    }

    #[tokio::test]
    async fn environment_manager_carries_local_runtime_paths() {
        let runtime_paths = ExecServerRuntimePaths::new(
            std::env::current_exe().expect("current exe"),
            /*codex_linux_sandbox_exe*/ None,
        )
        .expect("runtime paths");
        let manager = EnvironmentManager::new_with_runtime_paths(
            /*exec_server_url*/ None,
            /*exec_server_cwd*/ None,
            Some(runtime_paths.clone()),
        );
        let environment = manager
            .current()
            .await
            .expect("get current environment")
            .expect("local environment");
        assert_eq!(environment.local_runtime_paths(), Some(&runtime_paths));
        assert_eq!(
            EnvironmentManager::from_environment(Some(&environment)).local_runtime_paths,
            Some(runtime_paths)
        );
    }

    #[tokio::test]
    async fn environment_manager_preserves_default_cwd_from_environment() {
        let environment = Environment::create_with_config(
            EnvironmentConfig {
                exec_server_url: None,
                default_cwd: Some(PathBuf::from("/tmp/session")),
            },
            /*local_runtime_paths*/ None,
        )
        .await
        .expect("create environment");
        let manager = EnvironmentManager::from_environment(Some(&environment));
        assert_eq!(
            manager
                .current()
                .await
                .expect("get current environment")
                .expect("local environment")
                .default_cwd(),
            Some(Path::new("/tmp/session"))
        );
    }

    #[test]
    fn environment_manager_reports_default_cwd_for_selected_environment() {
        let manager = EnvironmentManager::new_with_runtime_paths(
            Some("ws://127.0.0.1:8765".to_string()),
            Some(PathBuf::from("/tmp/devbox")),
            /*local_runtime_paths*/ None,
        );
        // Unwrap before comparing so the assertions do not require
        // `ExecServerError: PartialEq`.
        assert_eq!(
            manager
                .default_cwd(/*environment_id*/ None)
                .expect("default cwd for current environment"),
            Some(Path::new("/tmp/devbox"))
        );
        assert_eq!(
            manager
                .default_cwd(Some("remote"))
                .expect("default cwd for remote environment"),
            Some(Path::new("/tmp/devbox"))
        );
        assert_eq!(
            manager
                .default_cwd(Some("local"))
                .expect("default cwd for local environment"),
            None
        );
    }

    #[tokio::test]
    async fn disabled_environment_manager_has_no_current_environment() {
        let manager = EnvironmentManager::new(Some("none".to_string()));
        assert!(
            manager
                .current()
                .await
                .expect("get current environment")
                .is_none()
        );
    }

    #[tokio::test]
    async fn environment_manager_explicit_local_selection_bypasses_remote_default() {
        let manager = EnvironmentManager::new(Some("ws://127.0.0.1:8765".to_string()));
        let environment = manager
            .environment(Some("local"))
            .await
            .expect("get explicit local environment")
            .expect("local environment");
        assert!(!environment.is_remote());
        assert_eq!(environment.exec_server_url(), None);
    }

    #[tokio::test]
    async fn environment_manager_rejects_remote_selection_when_not_configured() {
        let manager = EnvironmentManager::new(/*exec_server_url*/ None);
        let err = manager
            .environment(Some("remote"))
            .await
            .expect_err("remote selection should fail");
        assert_eq!(
            err.to_string(),
            "exec-server protocol error: remote environment is not configured"
        );
    }

    #[tokio::test]
    async fn environment_manager_rejects_explicit_selection_when_disabled() {
        let manager = EnvironmentManager::new(Some("none".to_string()));
        let err = manager
            .environment(Some("local"))
            .await
            .expect_err("explicit local selection should fail");
        assert_eq!(
            err.to_string(),
            "exec-server protocol error: environments are disabled for this session"
        );
    }

    #[tokio::test]
    async fn environment_manager_rejects_unknown_environment_id() {
        let manager = EnvironmentManager::new(/*exec_server_url*/ None);
        let err = manager
            .environment(Some("mystery"))
            .await
            .expect_err("unknown environment should fail");
        assert_eq!(
            err.to_string(),
            "exec-server protocol error: unknown environment id: mystery"
        );
    }

    #[tokio::test]
    async fn default_environment_has_ready_local_executor() {
        let environment = Environment::default();
        let response = environment
            .get_exec_backend()
            .start(crate::ExecParams {
                process_id: ProcessId::from("default-env-proc"),
                argv: vec!["true".to_string()],
                cwd: std::env::current_dir().expect("read current dir"),
                env_policy: None,
                env: Default::default(),
                tty: false,
                pipe_stdin: false,
                arg0: None,
            })
            .await
            .expect("start process");
        assert_eq!(response.process.process_id().as_str(), "default-env-proc");
    }
}