mirror of
https://github.com/openai/codex.git
synced 2026-05-07 04:47:13 +00:00
Compare commits
1 Commits
pr21236
...
exec-env-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d4612c73c5 |
@@ -7,7 +7,6 @@ use codex_config::NoopThreadConfigLoader;
|
||||
use codex_config::RemoteThreadConfigLoader;
|
||||
use codex_config::ThreadConfigLoader;
|
||||
use codex_core::config::Config;
|
||||
use codex_exec_server::EnvironmentManagerArgs;
|
||||
use codex_features::Feature;
|
||||
use codex_login::AuthManager;
|
||||
use codex_utils_cli::CliConfigOverrides;
|
||||
@@ -51,6 +50,7 @@ use codex_core::ExecPolicyError;
|
||||
use codex_core::check_execpolicy_for_warnings;
|
||||
use codex_core::config::find_codex_home;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_exec_server::EnvironmentManagerArgs;
|
||||
use codex_exec_server::ExecServerRuntimePaths;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
@@ -420,15 +420,6 @@ pub async fn run_main_with_transport_options(
|
||||
auth: AppServerWebsocketAuthSettings,
|
||||
runtime_options: AppServerRuntimeOptions,
|
||||
) -> IoResult<()> {
|
||||
let environment_manager = Arc::new(
|
||||
EnvironmentManager::new(EnvironmentManagerArgs::new(
|
||||
ExecServerRuntimePaths::from_optional_paths(
|
||||
arg0_paths.codex_self_exe.clone(),
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?,
|
||||
))
|
||||
.await,
|
||||
);
|
||||
let (transport_event_tx, mut transport_event_rx) =
|
||||
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
|
||||
@@ -444,6 +435,17 @@ pub async fn run_main_with_transport_options(
|
||||
)
|
||||
})?;
|
||||
let codex_home = find_codex_home()?;
|
||||
let environment_manager = Arc::new(
|
||||
EnvironmentManager::new(EnvironmentManagerArgs::new(
|
||||
codex_home.clone(),
|
||||
ExecServerRuntimePaths::from_optional_paths(
|
||||
arg0_paths.codex_self_exe.clone(),
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?,
|
||||
))
|
||||
.await
|
||||
.map_err(std::io::Error::other)?,
|
||||
);
|
||||
let config_manager = ConfigManager::new(
|
||||
codex_home.to_path_buf(),
|
||||
cli_kv_overrides.clone(),
|
||||
|
||||
@@ -446,7 +446,7 @@ struct AppServerCommand {
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
struct ExecServerCommand {
|
||||
/// Transport endpoint URL. Supported values: `ws://IP:PORT` (default).
|
||||
/// Transport endpoint URL. Supported values: `ws://IP:PORT` (default), `stdio`, `stdio://`.
|
||||
#[arg(
|
||||
long = "listen",
|
||||
value_name = "URL",
|
||||
|
||||
@@ -200,8 +200,11 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options_and_status(
|
||||
config.codex_self_exe.clone(),
|
||||
config.codex_linux_sandbox_exe.clone(),
|
||||
)?;
|
||||
let environment_manager =
|
||||
EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await;
|
||||
let environment_manager = EnvironmentManager::new(EnvironmentManagerArgs::new(
|
||||
config.codex_home.clone(),
|
||||
local_runtime_paths,
|
||||
))
|
||||
.await?;
|
||||
list_accessible_connectors_from_mcp_tools_with_environment_manager(
|
||||
config,
|
||||
force_refetch,
|
||||
|
||||
@@ -5,6 +5,7 @@ use codex_exec_server::EnvironmentManager;
|
||||
use codex_exec_server::EnvironmentManagerArgs;
|
||||
use codex_exec_server::ExecServerRuntimePaths;
|
||||
use codex_login::AuthManager;
|
||||
use codex_protocol::error::CodexErr;
|
||||
use codex_protocol::error::Result as CodexResult;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
@@ -34,12 +35,18 @@ pub async fn build_prompt_input(
|
||||
config.codex_self_exe.clone(),
|
||||
config.codex_linux_sandbox_exe.clone(),
|
||||
)?;
|
||||
|
||||
let thread_manager = ThreadManager::new(
|
||||
&config,
|
||||
Arc::clone(&auth_manager),
|
||||
SessionSource::Exec,
|
||||
Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await),
|
||||
Arc::new(
|
||||
EnvironmentManager::new(EnvironmentManagerArgs::new(
|
||||
config.codex_home.clone(),
|
||||
local_runtime_paths,
|
||||
))
|
||||
.await
|
||||
.map_err(|err| CodexErr::Fatal(err.to_string()))?,
|
||||
),
|
||||
/*analytics_events_client*/ None,
|
||||
);
|
||||
let thread_store = thread_store_from_config(&config);
|
||||
|
||||
@@ -3,6 +3,10 @@ load("//:defs.bzl", "codex_rust_crate")
|
||||
codex_rust_crate(
|
||||
name = "exec-server",
|
||||
crate_name = "codex_exec_server",
|
||||
deps_extra = [
|
||||
"@crates//:schemars",
|
||||
"@crates//:toml",
|
||||
],
|
||||
# Keep the crate's integration tests single-threaded under Bazel because
|
||||
# they install process-global test-binary dispatch state, and the remote
|
||||
# exec-server cases already rely on serialization around the full CLI path.
|
||||
|
||||
@@ -24,9 +24,11 @@ codex-utils-absolute-path = { workspace = true }
|
||||
codex-utils-pty = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
reqwest = { workspace = true, features = ["rustls-tls", "stream"] }
|
||||
schemars = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
toml = { workspace = true }
|
||||
tokio = { workspace = true, features = [
|
||||
"fs",
|
||||
"io-std",
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::process::Stdio;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
use std::sync::OnceLock;
|
||||
@@ -11,6 +12,11 @@ use codex_app_server_protocol::JSONRPCNotification;
|
||||
use futures::FutureExt;
|
||||
use futures::future::BoxFuture;
|
||||
use serde_json::Value;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::process::Child;
|
||||
use tokio::process::Command;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -19,11 +25,14 @@ use tokio::sync::watch;
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::connect_async;
|
||||
use tracing::debug;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::ProcessId;
|
||||
use crate::client_api::ExecServerClientConnectOptions;
|
||||
use crate::client_api::ExecServerTransport;
|
||||
use crate::client_api::HttpClient;
|
||||
use crate::client_api::RemoteExecServerConnectArgs;
|
||||
use crate::client_api::StdioExecServerConnectArgs;
|
||||
use crate::connection::JsonRpcConnection;
|
||||
use crate::process::ExecProcessEvent;
|
||||
use crate::process::ExecProcessEventLog;
|
||||
@@ -105,6 +114,16 @@ impl From<RemoteExecServerConnectArgs> for ExecServerClientConnectOptions {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<StdioExecServerConnectArgs> for ExecServerClientConnectOptions {
|
||||
fn from(value: StdioExecServerConnectArgs) -> Self {
|
||||
Self {
|
||||
client_name: value.client_name,
|
||||
initialize_timeout: value.initialize_timeout,
|
||||
resume_session_id: value.resume_session_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteExecServerConnectArgs {
|
||||
pub fn new(websocket_url: String, client_name: String) -> Self {
|
||||
Self {
|
||||
@@ -180,29 +199,45 @@ pub struct ExecServerClient {
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct LazyRemoteExecServerClient {
|
||||
websocket_url: String,
|
||||
transport: ExecServerTransport,
|
||||
client: Arc<OnceCell<ExecServerClient>>,
|
||||
}
|
||||
|
||||
impl LazyRemoteExecServerClient {
|
||||
pub(crate) fn new(websocket_url: String) -> Self {
|
||||
pub(crate) fn new(transport: ExecServerTransport) -> Self {
|
||||
Self {
|
||||
websocket_url,
|
||||
transport,
|
||||
client: Arc::new(OnceCell::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn get(&self) -> Result<ExecServerClient, ExecServerError> {
|
||||
self.client
|
||||
.get_or_try_init(|| async {
|
||||
ExecServerClient::connect_websocket(RemoteExecServerConnectArgs {
|
||||
websocket_url: self.websocket_url.clone(),
|
||||
client_name: "codex-environment".to_string(),
|
||||
connect_timeout: Duration::from_secs(5),
|
||||
initialize_timeout: Duration::from_secs(5),
|
||||
resume_session_id: None,
|
||||
})
|
||||
.await
|
||||
.get_or_try_init(|| {
|
||||
let transport = self.transport.clone();
|
||||
async move {
|
||||
match transport {
|
||||
ExecServerTransport::WebSocketUrl(websocket_url) => {
|
||||
ExecServerClient::connect_websocket(RemoteExecServerConnectArgs {
|
||||
websocket_url,
|
||||
client_name: "codex-environment".to_string(),
|
||||
connect_timeout: Duration::from_secs(5),
|
||||
initialize_timeout: Duration::from_secs(5),
|
||||
resume_session_id: None,
|
||||
})
|
||||
.await
|
||||
}
|
||||
ExecServerTransport::StdioShellCommand(shell_command) => {
|
||||
ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs {
|
||||
shell_command,
|
||||
client_name: "codex-environment".to_string(),
|
||||
initialize_timeout: Duration::from_secs(5),
|
||||
resume_session_id: None,
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.cloned()
|
||||
@@ -283,6 +318,51 @@ impl ExecServerClient {
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn connect_stdio_command(
|
||||
args: StdioExecServerConnectArgs,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
let shell_command = args.shell_command.clone();
|
||||
let mut child = shell_command_process(&shell_command)
|
||||
.stdin(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
.map_err(ExecServerError::Spawn)?;
|
||||
|
||||
let stdin = child.stdin.take().ok_or_else(|| {
|
||||
ExecServerError::Protocol("spawned exec-server command has no stdin".to_string())
|
||||
})?;
|
||||
let stdout = child.stdout.take().ok_or_else(|| {
|
||||
ExecServerError::Protocol("spawned exec-server command has no stdout".to_string())
|
||||
})?;
|
||||
if let Some(stderr) = child.stderr.take() {
|
||||
tokio::spawn(async move {
|
||||
let mut lines = BufReader::new(stderr).lines();
|
||||
loop {
|
||||
match lines.next_line().await {
|
||||
Ok(Some(line)) => debug!("exec-server stdio stderr: {line}"),
|
||||
Ok(None) => break,
|
||||
Err(err) => {
|
||||
warn!("failed to read exec-server stdio stderr: {err}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Self::connect(
|
||||
JsonRpcConnection::from_stdio(
|
||||
stdout,
|
||||
stdin,
|
||||
format!("exec-server stdio command `{shell_command}`"),
|
||||
)
|
||||
.with_drop_resource(Box::new(StdioChildGuard { child: Some(child) })),
|
||||
args.into(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn initialize(
|
||||
&self,
|
||||
options: ExecServerClientConnectOptions,
|
||||
@@ -526,6 +606,60 @@ impl ExecServerClient {
|
||||
}
|
||||
}
|
||||
|
||||
struct StdioChildGuard {
|
||||
child: Option<Child>,
|
||||
}
|
||||
|
||||
impl Drop for StdioChildGuard {
|
||||
fn drop(&mut self) {
|
||||
let Some(child) = self.child.take() else {
|
||||
return;
|
||||
};
|
||||
|
||||
match Handle::try_current() {
|
||||
Ok(handle) => {
|
||||
let _terminate_task = handle.spawn(terminate_stdio_child(child));
|
||||
}
|
||||
Err(_) => {
|
||||
terminate_stdio_child_now(child);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn terminate_stdio_child(mut child: Child) {
|
||||
kill_stdio_child(&mut child);
|
||||
if let Err(err) = child.wait().await {
|
||||
debug!("failed to wait for exec-server stdio child: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
fn terminate_stdio_child_now(mut child: Child) {
|
||||
kill_stdio_child(&mut child);
|
||||
}
|
||||
|
||||
fn kill_stdio_child(child: &mut Child) {
|
||||
if let Err(err) = child.start_kill() {
|
||||
debug!("failed to terminate exec-server stdio child: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
fn shell_command_process(shell_command: &str) -> Command {
|
||||
#[cfg(windows)]
|
||||
{
|
||||
let mut command = Command::new("cmd");
|
||||
command.arg("/C").arg(shell_command);
|
||||
command
|
||||
}
|
||||
|
||||
#[cfg(not(windows))]
|
||||
{
|
||||
let mut command = Command::new("sh");
|
||||
command.arg("-lc").arg(shell_command);
|
||||
command
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RpcCallError> for ExecServerError {
|
||||
fn from(value: RpcCallError) -> Self {
|
||||
match value {
|
||||
@@ -893,6 +1027,7 @@ mod tests {
|
||||
use super::ExecServerClient;
|
||||
use super::ExecServerClientConnectOptions;
|
||||
use crate::ProcessId;
|
||||
use crate::client_api::StdioExecServerConnectArgs;
|
||||
use crate::connection::JsonRpcConnection;
|
||||
use crate::process::ExecProcessEvent;
|
||||
use crate::protocol::EXEC_CLOSED_METHOD;
|
||||
@@ -930,6 +1065,21 @@ mod tests {
|
||||
.expect("json-rpc line should write");
|
||||
}
|
||||
|
||||
#[cfg(not(windows))]
|
||||
#[tokio::test]
|
||||
async fn connect_stdio_command_initializes_json_rpc_client() {
|
||||
let client = ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs {
|
||||
shell_command: "read _line; printf '%s\\n' '{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\"}}'; read _line; sleep 60".to_string(),
|
||||
client_name: "stdio-test-client".to_string(),
|
||||
initialize_timeout: Duration::from_secs(1),
|
||||
resume_session_id: None,
|
||||
})
|
||||
.await
|
||||
.expect("stdio client should connect");
|
||||
|
||||
assert_eq!(client.session_id().as_deref(), Some("stdio-test"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn process_events_are_delivered_in_seq_order_when_notifications_are_reordered() {
|
||||
let (client_stdin, server_reader) = duplex(1 << 20);
|
||||
|
||||
@@ -25,6 +25,22 @@ pub struct RemoteExecServerConnectArgs {
|
||||
pub resume_session_id: Option<String>,
|
||||
}
|
||||
|
||||
/// Stdio connection arguments for a command-backed exec-server.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct StdioExecServerConnectArgs {
|
||||
pub shell_command: String,
|
||||
pub client_name: String,
|
||||
pub initialize_timeout: Duration,
|
||||
pub resume_session_id: Option<String>,
|
||||
}
|
||||
|
||||
/// Transport used to connect to a remote exec-server environment.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum ExecServerTransport {
|
||||
WebSocketUrl(String),
|
||||
StdioShellCommand(String),
|
||||
}
|
||||
|
||||
/// Sends HTTP requests through a runtime-selected transport.
|
||||
///
|
||||
/// This is the HTTP capability counterpart to [`crate::ExecBackend`]. Callers
|
||||
|
||||
@@ -8,13 +8,9 @@ use tokio::sync::watch;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
||||
#[cfg(test)]
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
#[cfg(test)]
|
||||
use tokio::io::AsyncWriteExt;
|
||||
#[cfg(test)]
|
||||
use tokio::io::BufReader;
|
||||
#[cfg(test)]
|
||||
use tokio::io::BufWriter;
|
||||
|
||||
pub(crate) const CHANNEL_CAPACITY: usize = 128;
|
||||
@@ -31,10 +27,10 @@ pub(crate) struct JsonRpcConnection {
|
||||
incoming_rx: mpsc::Receiver<JsonRpcConnectionEvent>,
|
||||
disconnected_rx: watch::Receiver<bool>,
|
||||
task_handles: Vec<tokio::task::JoinHandle<()>>,
|
||||
drop_resource: Option<Box<dyn Send>>,
|
||||
}
|
||||
|
||||
impl JsonRpcConnection {
|
||||
#[cfg(test)]
|
||||
pub(crate) fn from_stdio<R, W>(reader: R, writer: W, connection_label: String) -> Self
|
||||
where
|
||||
R: AsyncRead + Unpin + Send + 'static,
|
||||
@@ -122,6 +118,7 @@ impl JsonRpcConnection {
|
||||
incoming_rx,
|
||||
disconnected_rx,
|
||||
task_handles: vec![reader_task, writer_task],
|
||||
drop_resource: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -256,9 +253,15 @@ impl JsonRpcConnection {
|
||||
incoming_rx,
|
||||
disconnected_rx,
|
||||
task_handles: vec![reader_task, writer_task],
|
||||
drop_resource: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn with_drop_resource(mut self, drop_resource: Box<dyn Send>) -> Self {
|
||||
self.drop_resource = Some(drop_resource);
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn into_parts(
|
||||
self,
|
||||
) -> (
|
||||
@@ -266,12 +269,14 @@ impl JsonRpcConnection {
|
||||
mpsc::Receiver<JsonRpcConnectionEvent>,
|
||||
watch::Receiver<bool>,
|
||||
Vec<tokio::task::JoinHandle<()>>,
|
||||
Option<Box<dyn Send>>,
|
||||
) {
|
||||
(
|
||||
self.outgoing_tx,
|
||||
self.incoming_rx,
|
||||
self.disconnected_rx,
|
||||
self.task_handles,
|
||||
self.drop_resource,
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -298,7 +303,6 @@ async fn send_malformed_message(
|
||||
.await;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
async fn write_jsonrpc_line_message<W>(
|
||||
writer: &mut BufWriter<W>,
|
||||
message: &JSONRPCMessage,
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::ExecServerError;
|
||||
@@ -7,8 +8,11 @@ use crate::ExecutorFileSystem;
|
||||
use crate::HttpClient;
|
||||
use crate::client::LazyRemoteExecServerClient;
|
||||
use crate::client::http_client::ReqwestHttpClient;
|
||||
use crate::client_api::ExecServerTransport;
|
||||
use crate::environment_provider::DefaultEnvironmentProvider;
|
||||
use crate::environment_provider::DefaultEnvironmentSelection;
|
||||
use crate::environment_provider::EnvironmentProvider;
|
||||
use crate::environment_provider::environment_provider_from_codex_home;
|
||||
use crate::environment_provider::normalize_exec_server_url;
|
||||
use crate::local_file_system::LocalFileSystem;
|
||||
use crate::local_process::LocalProcess;
|
||||
@@ -31,8 +35,8 @@ pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
|
||||
/// shell/filesystem tool availability.
|
||||
///
|
||||
/// Remote environments create remote filesystem and execution backends that
|
||||
/// lazy-connect to the configured exec-server on first use. The websocket is
|
||||
/// not opened when the manager or environment is constructed.
|
||||
/// lazy-connect to the configured exec-server on first use. The remote
|
||||
/// transport is not opened when the manager or environment is constructed.
|
||||
#[derive(Debug)]
|
||||
pub struct EnvironmentManager {
|
||||
default_environment: Option<String>,
|
||||
@@ -45,12 +49,14 @@ pub const REMOTE_ENVIRONMENT_ID: &str = "remote";
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct EnvironmentManagerArgs {
|
||||
pub codex_home: std::path::PathBuf,
|
||||
pub local_runtime_paths: ExecServerRuntimePaths,
|
||||
}
|
||||
|
||||
impl EnvironmentManagerArgs {
|
||||
pub fn new(local_runtime_paths: ExecServerRuntimePaths) -> Self {
|
||||
pub fn new(codex_home: impl AsRef<Path>, local_runtime_paths: ExecServerRuntimePaths) -> Self {
|
||||
Self {
|
||||
codex_home: codex_home.as_ref().to_path_buf(),
|
||||
local_runtime_paths,
|
||||
}
|
||||
}
|
||||
@@ -71,9 +77,12 @@ impl EnvironmentManager {
|
||||
|
||||
/// Builds a test-only manager with environment access disabled.
|
||||
pub fn disabled_for_tests(local_runtime_paths: ExecServerRuntimePaths) -> Self {
|
||||
let mut manager = Self::from_environments(HashMap::new(), local_runtime_paths);
|
||||
manager.default_environment = None;
|
||||
manager
|
||||
Self::from_environments(
|
||||
HashMap::new(),
|
||||
local_runtime_paths,
|
||||
DefaultEnvironmentSelection::Disabled,
|
||||
)
|
||||
.expect("disabled test environment manager")
|
||||
}
|
||||
|
||||
/// Builds a test-only manager from a raw exec-server URL value.
|
||||
@@ -84,30 +93,29 @@ impl EnvironmentManager {
|
||||
Self::from_default_provider_url(exec_server_url, local_runtime_paths).await
|
||||
}
|
||||
|
||||
/// Builds a manager from `CODEX_EXEC_SERVER_URL` and local runtime paths
|
||||
/// used when creating local filesystem helpers.
|
||||
pub async fn new(args: EnvironmentManagerArgs) -> Self {
|
||||
/// Builds a manager from `CODEX_HOME` and local runtime paths used when
|
||||
/// creating local filesystem helpers.
|
||||
pub async fn new(args: EnvironmentManagerArgs) -> Result<Self, ExecServerError> {
|
||||
let EnvironmentManagerArgs {
|
||||
codex_home,
|
||||
local_runtime_paths,
|
||||
} = args;
|
||||
let exec_server_url = std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok();
|
||||
Self::from_default_provider_url(exec_server_url, local_runtime_paths).await
|
||||
let provider = environment_provider_from_codex_home(codex_home.as_path())?;
|
||||
Self::from_provider(provider.as_ref(), local_runtime_paths).await
|
||||
}
|
||||
|
||||
async fn from_default_provider_url(
|
||||
exec_server_url: Option<String>,
|
||||
local_runtime_paths: ExecServerRuntimePaths,
|
||||
) -> Self {
|
||||
let environment_disabled = normalize_exec_server_url(exec_server_url.clone()).1;
|
||||
let provider = DefaultEnvironmentProvider::new(exec_server_url);
|
||||
let provider_environments = provider.environments(&local_runtime_paths);
|
||||
let mut manager = Self::from_environments(provider_environments, local_runtime_paths);
|
||||
if environment_disabled {
|
||||
// TODO: Remove this legacy `CODEX_EXEC_SERVER_URL=none` crutch once
|
||||
// environment attachment defaulting moves out of EnvironmentManager.
|
||||
manager.default_environment = None;
|
||||
}
|
||||
manager
|
||||
Self::from_environments(
|
||||
provider_environments,
|
||||
local_runtime_paths,
|
||||
provider.default_environment_selection(),
|
||||
)
|
||||
.expect("default provider should create valid environments")
|
||||
}
|
||||
|
||||
/// Builds a manager from a provider-supplied startup snapshot.
|
||||
@@ -121,12 +129,14 @@ impl EnvironmentManager {
|
||||
Self::from_provider_environments(
|
||||
provider.get_environments(&local_runtime_paths).await?,
|
||||
local_runtime_paths,
|
||||
provider.default_environment_selection(),
|
||||
)
|
||||
}
|
||||
|
||||
fn from_provider_environments(
|
||||
environments: HashMap<String, Environment>,
|
||||
local_runtime_paths: ExecServerRuntimePaths,
|
||||
default_selection: DefaultEnvironmentSelection,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
for id in environments.keys() {
|
||||
if id.is_empty() {
|
||||
@@ -136,21 +146,35 @@ impl EnvironmentManager {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Self::from_environments(environments, local_runtime_paths))
|
||||
Self::from_environments(environments, local_runtime_paths, default_selection)
|
||||
}
|
||||
|
||||
fn from_environments(
|
||||
environments: HashMap<String, Environment>,
|
||||
local_runtime_paths: ExecServerRuntimePaths,
|
||||
) -> Self {
|
||||
default_selection: DefaultEnvironmentSelection,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
// TODO: Stop deriving a default environment here once omitted
|
||||
// environment attachment is owned by thread/session setup.
|
||||
let default_environment = if environments.contains_key(REMOTE_ENVIRONMENT_ID) {
|
||||
Some(REMOTE_ENVIRONMENT_ID.to_string())
|
||||
} else if environments.contains_key(LOCAL_ENVIRONMENT_ID) {
|
||||
Some(LOCAL_ENVIRONMENT_ID.to_string())
|
||||
} else {
|
||||
None
|
||||
let default_environment = match default_selection {
|
||||
DefaultEnvironmentSelection::Derived => {
|
||||
if environments.contains_key(REMOTE_ENVIRONMENT_ID) {
|
||||
Some(REMOTE_ENVIRONMENT_ID.to_string())
|
||||
} else if environments.contains_key(LOCAL_ENVIRONMENT_ID) {
|
||||
Some(LOCAL_ENVIRONMENT_ID.to_string())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
DefaultEnvironmentSelection::Environment(environment_id) => {
|
||||
if !environments.contains_key(&environment_id) {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"default environment `{environment_id}` is not configured"
|
||||
)));
|
||||
}
|
||||
Some(environment_id)
|
||||
}
|
||||
DefaultEnvironmentSelection::Disabled => None,
|
||||
};
|
||||
let local_environment = Arc::new(Environment::local(local_runtime_paths));
|
||||
let environments = environments
|
||||
@@ -158,11 +182,11 @@ impl EnvironmentManager {
|
||||
.map(|(id, environment)| (id, Arc::new(environment)))
|
||||
.collect();
|
||||
|
||||
Self {
|
||||
Ok(Self {
|
||||
default_environment,
|
||||
environments,
|
||||
local_environment,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the default environment instance.
|
||||
@@ -195,6 +219,7 @@ impl EnvironmentManager {
|
||||
#[derive(Clone)]
|
||||
pub struct Environment {
|
||||
exec_server_url: Option<String>,
|
||||
remote_transport: Option<ExecServerTransport>,
|
||||
exec_backend: Arc<dyn ExecBackend>,
|
||||
filesystem: Arc<dyn ExecutorFileSystem>,
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
@@ -206,6 +231,7 @@ impl Environment {
|
||||
pub fn default_for_tests() -> Self {
|
||||
Self {
|
||||
exec_server_url: None,
|
||||
remote_transport: None,
|
||||
exec_backend: Arc::new(LocalProcess::default()),
|
||||
filesystem: Arc::new(LocalFileSystem::unsandboxed()),
|
||||
http_client: Arc::new(ReqwestHttpClient),
|
||||
@@ -261,6 +287,7 @@ impl Environment {
|
||||
pub(crate) fn local(local_runtime_paths: ExecServerRuntimePaths) -> Self {
|
||||
Self {
|
||||
exec_server_url: None,
|
||||
remote_transport: None,
|
||||
exec_backend: Arc::new(LocalProcess::default()),
|
||||
filesystem: Arc::new(LocalFileSystem::with_runtime_paths(
|
||||
local_runtime_paths.clone(),
|
||||
@@ -274,13 +301,38 @@ impl Environment {
|
||||
exec_server_url: String,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
) -> Self {
|
||||
let client = LazyRemoteExecServerClient::new(exec_server_url.clone());
|
||||
Self::remote_with_transport(
|
||||
ExecServerTransport::WebSocketUrl(exec_server_url),
|
||||
local_runtime_paths,
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn remote_stdio_shell_command(
|
||||
shell_command: String,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
) -> Self {
|
||||
Self::remote_with_transport(
|
||||
ExecServerTransport::StdioShellCommand(shell_command),
|
||||
local_runtime_paths,
|
||||
)
|
||||
}
|
||||
|
||||
fn remote_with_transport(
|
||||
transport: ExecServerTransport,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
) -> Self {
|
||||
let exec_server_url = match &transport {
|
||||
ExecServerTransport::WebSocketUrl(url) => Some(url.clone()),
|
||||
ExecServerTransport::StdioShellCommand(_) => None,
|
||||
};
|
||||
let client = LazyRemoteExecServerClient::new(transport.clone());
|
||||
let exec_backend: Arc<dyn ExecBackend> = Arc::new(RemoteProcess::new(client.clone()));
|
||||
let filesystem: Arc<dyn ExecutorFileSystem> =
|
||||
Arc::new(RemoteFileSystem::new(client.clone()));
|
||||
|
||||
Self {
|
||||
exec_server_url: Some(exec_server_url),
|
||||
exec_server_url,
|
||||
remote_transport: Some(transport),
|
||||
exec_backend,
|
||||
filesystem,
|
||||
http_client: Arc::new(client),
|
||||
@@ -289,7 +341,7 @@ impl Environment {
|
||||
}
|
||||
|
||||
pub fn is_remote(&self) -> bool {
|
||||
self.exec_server_url.is_some()
|
||||
self.remote_transport.is_some()
|
||||
}
|
||||
|
||||
/// Returns the remote exec-server URL when this environment is remote.
|
||||
@@ -319,6 +371,7 @@ mod tests {
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::DefaultEnvironmentSelection;
|
||||
use super::Environment;
|
||||
use super::EnvironmentManager;
|
||||
use super::LOCAL_ENVIRONMENT_ID;
|
||||
@@ -425,7 +478,9 @@ mod tests {
|
||||
.expect("remote environment"),
|
||||
)]),
|
||||
test_runtime_paths(),
|
||||
);
|
||||
DefaultEnvironmentSelection::Derived,
|
||||
)
|
||||
.expect("environment manager");
|
||||
|
||||
assert_eq!(
|
||||
manager.default_environment_id(),
|
||||
@@ -446,6 +501,7 @@ mod tests {
|
||||
let err = EnvironmentManager::from_provider_environments(
|
||||
HashMap::from([("".to_string(), Environment::default_for_tests())]),
|
||||
test_runtime_paths(),
|
||||
DefaultEnvironmentSelection::Derived,
|
||||
)
|
||||
.expect_err("empty id should fail");
|
||||
|
||||
@@ -455,6 +511,64 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_uses_explicit_provider_default() {
|
||||
let manager = EnvironmentManager::from_provider_environments(
|
||||
HashMap::from([
|
||||
(
|
||||
LOCAL_ENVIRONMENT_ID.to_string(),
|
||||
Environment::default_for_tests(),
|
||||
),
|
||||
(
|
||||
"devbox".to_string(),
|
||||
Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string()))
|
||||
.expect("remote environment"),
|
||||
),
|
||||
]),
|
||||
test_runtime_paths(),
|
||||
DefaultEnvironmentSelection::Environment("devbox".to_string()),
|
||||
)
|
||||
.expect("manager");
|
||||
|
||||
assert_eq!(manager.default_environment_id(), Some("devbox"));
|
||||
assert!(manager.default_environment().expect("default").is_remote());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_disables_provider_default() {
|
||||
let manager = EnvironmentManager::from_provider_environments(
|
||||
HashMap::from([(
|
||||
LOCAL_ENVIRONMENT_ID.to_string(),
|
||||
Environment::default_for_tests(),
|
||||
)]),
|
||||
test_runtime_paths(),
|
||||
DefaultEnvironmentSelection::Disabled,
|
||||
)
|
||||
.expect("manager");
|
||||
|
||||
assert_eq!(manager.default_environment_id(), None);
|
||||
assert!(manager.default_environment().is_none());
|
||||
assert!(manager.get_environment(LOCAL_ENVIRONMENT_ID).is_some());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_rejects_unknown_provider_default() {
|
||||
let err = EnvironmentManager::from_provider_environments(
|
||||
HashMap::from([(
|
||||
LOCAL_ENVIRONMENT_ID.to_string(),
|
||||
Environment::default_for_tests(),
|
||||
)]),
|
||||
test_runtime_paths(),
|
||||
DefaultEnvironmentSelection::Environment("missing".to_string()),
|
||||
)
|
||||
.expect_err("unknown default should fail");
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"exec-server protocol error: default environment `missing` is not configured"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_uses_provider_supplied_local_environment() {
|
||||
let manager = EnvironmentManager::create_for_tests(
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::path::Path;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use schemars::JsonSchema;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::Environment;
|
||||
use crate::ExecServerError;
|
||||
@@ -21,6 +26,17 @@ pub trait EnvironmentProvider: Send + Sync {
|
||||
&self,
|
||||
local_runtime_paths: &ExecServerRuntimePaths,
|
||||
) -> Result<HashMap<String, Environment>, ExecServerError>;
|
||||
|
||||
fn default_environment_selection(&self) -> DefaultEnvironmentSelection {
|
||||
DefaultEnvironmentSelection::Derived
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub enum DefaultEnvironmentSelection {
|
||||
Derived,
|
||||
Environment(String),
|
||||
Disabled,
|
||||
}
|
||||
|
||||
/// Default provider backed by `CODEX_EXEC_SERVER_URL`.
|
||||
@@ -69,6 +85,164 @@ impl EnvironmentProvider for DefaultEnvironmentProvider {
|
||||
) -> Result<HashMap<String, Environment>, ExecServerError> {
|
||||
Ok(self.environments(local_runtime_paths))
|
||||
}
|
||||
|
||||
fn default_environment_selection(&self) -> DefaultEnvironmentSelection {
|
||||
if normalize_exec_server_url(self.exec_server_url.clone()).1 {
|
||||
DefaultEnvironmentSelection::Disabled
|
||||
} else {
|
||||
DefaultEnvironmentSelection::Derived
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq, JsonSchema)]
|
||||
#[schemars(deny_unknown_fields)]
|
||||
pub struct EnvironmentsToml {
|
||||
pub default: Option<String>,
|
||||
|
||||
#[serde(default)]
|
||||
pub items: Vec<EnvironmentToml>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema)]
|
||||
#[schemars(deny_unknown_fields)]
|
||||
pub struct EnvironmentToml {
|
||||
pub id: String,
|
||||
pub url: Option<String>,
|
||||
pub command: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct TomlEnvironmentProvider {
|
||||
config: EnvironmentsToml,
|
||||
}
|
||||
|
||||
impl TomlEnvironmentProvider {
|
||||
pub fn new(config: EnvironmentsToml) -> Result<Self, ExecServerError> {
|
||||
let mut ids = HashSet::from([LOCAL_ENVIRONMENT_ID.to_string()]);
|
||||
for item in &config.items {
|
||||
validate_environment_item(item)?;
|
||||
if !ids.insert(item.id.clone()) {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"environment id `{}` is duplicated",
|
||||
item.id
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(default) = config.default.as_deref() {
|
||||
let default = default.trim();
|
||||
if default.is_empty() {
|
||||
return Err(ExecServerError::Protocol(
|
||||
"default environment id cannot be empty".to_string(),
|
||||
));
|
||||
}
|
||||
if !default.eq_ignore_ascii_case("none") && !ids.contains(default) {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"default environment `{default}` is not configured"
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Self { config })
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EnvironmentProvider for TomlEnvironmentProvider {
|
||||
async fn get_environments(
|
||||
&self,
|
||||
local_runtime_paths: &ExecServerRuntimePaths,
|
||||
) -> Result<HashMap<String, Environment>, ExecServerError> {
|
||||
let mut environments = HashMap::from([(
|
||||
LOCAL_ENVIRONMENT_ID.to_string(),
|
||||
Environment::local(local_runtime_paths.clone()),
|
||||
)]);
|
||||
|
||||
for item in &self.config.items {
|
||||
let environment = match (item.url.as_deref(), item.command.as_deref()) {
|
||||
(Some(url), None) => Environment::remote_inner(
|
||||
url.trim().to_string(),
|
||||
Some(local_runtime_paths.clone()),
|
||||
),
|
||||
(None, Some(command)) => Environment::remote_stdio_shell_command(
|
||||
command.trim().to_string(),
|
||||
Some(local_runtime_paths.clone()),
|
||||
),
|
||||
_ => unreachable!("transport shape validated by TomlEnvironmentProvider::new"),
|
||||
};
|
||||
environments.insert(item.id.clone(), environment);
|
||||
}
|
||||
|
||||
Ok(environments)
|
||||
}
|
||||
|
||||
fn default_environment_selection(&self) -> DefaultEnvironmentSelection {
|
||||
match self.config.default.as_deref().map(str::trim) {
|
||||
None => DefaultEnvironmentSelection::Environment(LOCAL_ENVIRONMENT_ID.to_string()),
|
||||
Some(default) if default.eq_ignore_ascii_case("none") => {
|
||||
DefaultEnvironmentSelection::Disabled
|
||||
}
|
||||
Some(default) => DefaultEnvironmentSelection::Environment(default.to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_environment_item(item: &EnvironmentToml) -> Result<(), ExecServerError> {
|
||||
let id = item.id.trim();
|
||||
if id.is_empty() {
|
||||
return Err(ExecServerError::Protocol(
|
||||
"environment id cannot be empty".to_string(),
|
||||
));
|
||||
}
|
||||
if id != item.id {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"environment id `{}` must not contain surrounding whitespace",
|
||||
item.id
|
||||
)));
|
||||
}
|
||||
if item.id == LOCAL_ENVIRONMENT_ID || item.id.eq_ignore_ascii_case("none") {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"environment id `{}` is reserved",
|
||||
item.id
|
||||
)));
|
||||
}
|
||||
|
||||
match (item.url.as_deref(), item.command.as_deref()) {
|
||||
(Some(url), None) => validate_websocket_url(url),
|
||||
(None, Some(command)) => {
|
||||
if command.trim().is_empty() {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"environment `{}` command cannot be empty",
|
||||
item.id
|
||||
)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
(None, None) => Err(ExecServerError::Protocol(format!(
|
||||
"environment `{}` must set exactly one of url or command",
|
||||
item.id
|
||||
))),
|
||||
(Some(_), Some(_)) => Err(ExecServerError::Protocol(format!(
|
||||
"environment `{}` must set exactly one of url or command",
|
||||
item.id
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_websocket_url(url: &str) -> Result<(), ExecServerError> {
|
||||
let url = url.trim();
|
||||
if url.is_empty() {
|
||||
return Err(ExecServerError::Protocol(
|
||||
"environment url cannot be empty".to_string(),
|
||||
));
|
||||
}
|
||||
if !url.starts_with("ws://") && !url.starts_with("wss://") {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"environment url `{url}` must use ws:// or wss://"
|
||||
)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn normalize_exec_server_url(exec_server_url: Option<String>) -> (Option<String>, bool) {
|
||||
@@ -79,9 +253,45 @@ pub(crate) fn normalize_exec_server_url(exec_server_url: Option<String>) -> (Opt
|
||||
}
|
||||
}
|
||||
|
||||
const ENVIRONMENTS_TOML_FILE: &str = "environments.toml";
|
||||
|
||||
pub fn environment_provider_from_codex_home(
|
||||
codex_home: &Path,
|
||||
) -> Result<Box<dyn EnvironmentProvider>, ExecServerError> {
|
||||
let path = codex_home.join(ENVIRONMENTS_TOML_FILE);
|
||||
if path.try_exists().map_err(|err| {
|
||||
ExecServerError::Protocol(format!(
|
||||
"failed to inspect environment config `{}`: {err}",
|
||||
path.display()
|
||||
))
|
||||
})? {
|
||||
let environments = load_environments_toml(&path)?;
|
||||
Ok(Box::new(TomlEnvironmentProvider::new(environments)?))
|
||||
} else {
|
||||
Ok(Box::new(DefaultEnvironmentProvider::from_env()))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn load_environments_toml(path: &Path) -> Result<EnvironmentsToml, ExecServerError> {
|
||||
let contents = std::fs::read_to_string(path).map_err(|err| {
|
||||
ExecServerError::Protocol(format!(
|
||||
"failed to read environment config `{}`: {err}",
|
||||
path.display()
|
||||
))
|
||||
})?;
|
||||
|
||||
toml::from_str(&contents).map_err(|err| {
|
||||
ExecServerError::Protocol(format!(
|
||||
"failed to parse environment config `{}`: {err}",
|
||||
path.display()
|
||||
))
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::tempdir;
|
||||
|
||||
use super::*;
|
||||
use crate::ExecServerRuntimePaths;
|
||||
@@ -135,6 +345,10 @@ mod tests {
|
||||
|
||||
assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote());
|
||||
assert!(!environments.contains_key(REMOTE_ENVIRONMENT_ID));
|
||||
assert_eq!(
|
||||
provider.default_environment_selection(),
|
||||
DefaultEnvironmentSelection::Disabled
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -169,4 +383,214 @@ mod tests {
|
||||
Some("ws://127.0.0.1:8765")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn toml_provider_adds_implicit_local_and_configured_environments() {
|
||||
let provider = TomlEnvironmentProvider::new(EnvironmentsToml {
|
||||
default: Some("ssh-dev".to_string()),
|
||||
items: vec![
|
||||
EnvironmentToml {
|
||||
id: "devbox".to_string(),
|
||||
url: Some(" ws://127.0.0.1:8765 ".to_string()),
|
||||
command: None,
|
||||
},
|
||||
EnvironmentToml {
|
||||
id: "ssh-dev".to_string(),
|
||||
url: None,
|
||||
command: Some(" ssh dev \"codex exec-server --listen stdio\" ".to_string()),
|
||||
},
|
||||
],
|
||||
})
|
||||
.expect("provider");
|
||||
let runtime_paths = test_runtime_paths();
|
||||
|
||||
let environments = provider
|
||||
.get_environments(&runtime_paths)
|
||||
.await
|
||||
.expect("environments");
|
||||
|
||||
assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote());
|
||||
assert_eq!(
|
||||
environments["devbox"].exec_server_url(),
|
||||
Some("ws://127.0.0.1:8765")
|
||||
);
|
||||
assert!(environments["ssh-dev"].is_remote());
|
||||
assert_eq!(
|
||||
provider.default_environment_selection(),
|
||||
DefaultEnvironmentSelection::Environment("ssh-dev".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn toml_provider_default_omitted_selects_local() {
|
||||
let provider = TomlEnvironmentProvider::new(EnvironmentsToml::default()).expect("provider");
|
||||
|
||||
assert_eq!(
|
||||
provider.default_environment_selection(),
|
||||
DefaultEnvironmentSelection::Environment(LOCAL_ENVIRONMENT_ID.to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn toml_provider_default_none_disables_default() {
|
||||
let provider = TomlEnvironmentProvider::new(EnvironmentsToml {
|
||||
default: Some("none".to_string()),
|
||||
items: Vec::new(),
|
||||
})
|
||||
.expect("provider");
|
||||
|
||||
assert_eq!(
|
||||
provider.default_environment_selection(),
|
||||
DefaultEnvironmentSelection::Disabled
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn toml_provider_rejects_invalid_items() {
|
||||
let cases = [
|
||||
(
|
||||
EnvironmentToml {
|
||||
id: "local".to_string(),
|
||||
url: Some("ws://127.0.0.1:8765".to_string()),
|
||||
command: None,
|
||||
},
|
||||
"environment id `local` is reserved",
|
||||
),
|
||||
(
|
||||
EnvironmentToml {
|
||||
id: " devbox ".to_string(),
|
||||
url: Some("ws://127.0.0.1:8765".to_string()),
|
||||
command: None,
|
||||
},
|
||||
"environment id ` devbox ` must not contain surrounding whitespace",
|
||||
),
|
||||
(
|
||||
EnvironmentToml {
|
||||
id: "devbox".to_string(),
|
||||
url: Some("http://127.0.0.1:8765".to_string()),
|
||||
command: None,
|
||||
},
|
||||
"environment url `http://127.0.0.1:8765` must use ws:// or wss://",
|
||||
),
|
||||
(
|
||||
EnvironmentToml {
|
||||
id: "devbox".to_string(),
|
||||
url: Some("ws://127.0.0.1:8765".to_string()),
|
||||
command: Some("codex exec-server --listen stdio".to_string()),
|
||||
},
|
||||
"environment `devbox` must set exactly one of url or command",
|
||||
),
|
||||
(
|
||||
EnvironmentToml {
|
||||
id: "devbox".to_string(),
|
||||
url: None,
|
||||
command: Some(" ".to_string()),
|
||||
},
|
||||
"environment `devbox` command cannot be empty",
|
||||
),
|
||||
];
|
||||
|
||||
for (item, expected) in cases {
|
||||
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
|
||||
default: None,
|
||||
items: vec![item],
|
||||
})
|
||||
.expect_err("invalid item should fail");
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
format!("exec-server protocol error: {expected}")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn toml_provider_rejects_duplicate_ids() {
|
||||
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
|
||||
default: None,
|
||||
items: vec![
|
||||
EnvironmentToml {
|
||||
id: "devbox".to_string(),
|
||||
url: Some("ws://127.0.0.1:8765".to_string()),
|
||||
command: None,
|
||||
},
|
||||
EnvironmentToml {
|
||||
id: "devbox".to_string(),
|
||||
url: None,
|
||||
command: Some("codex exec-server --listen stdio".to_string()),
|
||||
},
|
||||
],
|
||||
})
|
||||
.expect_err("duplicate id should fail");
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"exec-server protocol error: environment id `devbox` is duplicated"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn toml_provider_rejects_unknown_default() {
|
||||
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
|
||||
default: Some("missing".to_string()),
|
||||
items: Vec::new(),
|
||||
})
|
||||
.expect_err("unknown default should fail");
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"exec-server protocol error: default environment `missing` is not configured"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn load_environments_toml_reads_root_environment_list() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
let path = codex_home.path().join(ENVIRONMENTS_TOML_FILE);
|
||||
std::fs::write(
|
||||
&path,
|
||||
r#"
|
||||
default = "ssh-dev"
|
||||
|
||||
[[items]]
|
||||
id = "devbox"
|
||||
url = "ws://127.0.0.1:4512"
|
||||
|
||||
[[items]]
|
||||
id = "ssh-dev"
|
||||
command = 'ssh dev "codex exec-server --listen stdio"'
|
||||
"#,
|
||||
)
|
||||
.expect("write environments.toml");
|
||||
|
||||
let environments = load_environments_toml(&path).expect("environments.toml");
|
||||
|
||||
assert_eq!(environments.default.as_deref(), Some("ssh-dev"));
|
||||
assert_eq!(environments.items.len(), 2);
|
||||
assert_eq!(environments.items[0].id, "devbox");
|
||||
assert_eq!(
|
||||
environments.items[1].command.as_deref(),
|
||||
Some("ssh dev \"codex exec-server --listen stdio\"")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn environment_provider_from_codex_home_uses_present_environments_file() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
std::fs::write(
|
||||
codex_home.path().join(ENVIRONMENTS_TOML_FILE),
|
||||
r#"
|
||||
default = "none"
|
||||
"#,
|
||||
)
|
||||
.expect("write environments.toml");
|
||||
|
||||
let provider =
|
||||
environment_provider_from_codex_home(codex_home.path()).expect("environment provider");
|
||||
|
||||
assert_eq!(
|
||||
provider.default_environment_selection(),
|
||||
DefaultEnvironmentSelection::Disabled
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,8 +23,10 @@ pub use client::ExecServerError;
|
||||
pub use client::http_client::HttpResponseBodyStream;
|
||||
pub use client::http_client::ReqwestHttpClient;
|
||||
pub use client_api::ExecServerClientConnectOptions;
|
||||
pub use client_api::ExecServerTransport;
|
||||
pub use client_api::HttpClient;
|
||||
pub use client_api::RemoteExecServerConnectArgs;
|
||||
pub use client_api::StdioExecServerConnectArgs;
|
||||
pub use codex_file_system::CopyOptions;
|
||||
pub use codex_file_system::CreateDirectoryOptions;
|
||||
pub use codex_file_system::ExecutorFileSystem;
|
||||
@@ -40,7 +42,13 @@ pub use environment::EnvironmentManagerArgs;
|
||||
pub use environment::LOCAL_ENVIRONMENT_ID;
|
||||
pub use environment::REMOTE_ENVIRONMENT_ID;
|
||||
pub use environment_provider::DefaultEnvironmentProvider;
|
||||
pub use environment_provider::DefaultEnvironmentSelection;
|
||||
pub use environment_provider::EnvironmentProvider;
|
||||
pub use environment_provider::EnvironmentToml;
|
||||
pub use environment_provider::EnvironmentsToml;
|
||||
pub use environment_provider::TomlEnvironmentProvider;
|
||||
pub use environment_provider::environment_provider_from_codex_home;
|
||||
pub use environment_provider::load_environments_toml;
|
||||
pub use fs_helper::CODEX_FS_HELPER_ARG1;
|
||||
pub use fs_helper_main::main as run_fs_helper_main;
|
||||
pub use local_file_system::LOCAL_FS;
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::collections::HashMap;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
use std::sync::atomic::AtomicI64;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
@@ -229,12 +230,14 @@ pub(crate) struct RpcClient {
|
||||
disconnected_rx: watch::Receiver<bool>,
|
||||
next_request_id: AtomicI64,
|
||||
transport_tasks: Vec<JoinHandle<()>>,
|
||||
transport_drop_resource: StdMutex<Option<Box<dyn Send>>>,
|
||||
reader_task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl RpcClient {
|
||||
pub(crate) fn new(connection: JsonRpcConnection) -> (Self, mpsc::Receiver<RpcClientEvent>) {
|
||||
let (write_tx, mut incoming_rx, disconnected_rx, transport_tasks) = connection.into_parts();
|
||||
let (write_tx, mut incoming_rx, disconnected_rx, transport_tasks, transport_drop_resource) =
|
||||
connection.into_parts();
|
||||
let pending = Arc::new(Mutex::new(HashMap::<RequestId, PendingRequest>::new()));
|
||||
let (event_tx, event_rx) = mpsc::channel(128);
|
||||
|
||||
@@ -275,6 +278,7 @@ impl RpcClient {
|
||||
disconnected_rx,
|
||||
next_request_id: AtomicI64::new(1),
|
||||
transport_tasks,
|
||||
transport_drop_resource: StdMutex::new(transport_drop_resource),
|
||||
reader_task,
|
||||
},
|
||||
event_rx,
|
||||
@@ -369,6 +373,9 @@ impl Drop for RpcClient {
|
||||
task.abort();
|
||||
}
|
||||
self.reader_task.abort();
|
||||
if let Ok(drop_resource) = self.transport_drop_resource.get_mut() {
|
||||
let _ = drop_resource.take();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -47,8 +47,13 @@ async fn run_connection(
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
) {
|
||||
let router = Arc::new(build_router());
|
||||
let (json_outgoing_tx, mut incoming_rx, mut disconnected_rx, connection_tasks) =
|
||||
connection.into_parts();
|
||||
let (
|
||||
json_outgoing_tx,
|
||||
mut incoming_rx,
|
||||
mut disconnected_rx,
|
||||
connection_tasks,
|
||||
_drop_resource,
|
||||
) = connection.into_parts();
|
||||
let (outgoing_tx, mut outgoing_rx) =
|
||||
mpsc::channel::<RpcServerOutboundMessage>(CHANNEL_CAPACITY);
|
||||
let notifications = RpcNotificationSender::new(outgoing_tx.clone());
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::io::Write as _;
|
||||
use std::net::SocketAddr;
|
||||
use tokio::io;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_tungstenite::accept_async;
|
||||
use tracing::warn;
|
||||
@@ -10,6 +11,12 @@ use crate::server::processor::ConnectionProcessor;
|
||||
|
||||
pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0";
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub(crate) enum ExecServerListenTransport {
|
||||
WebSocket(SocketAddr),
|
||||
Stdio,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub enum ExecServerListenUrlParseError {
|
||||
UnsupportedListenUrl(String),
|
||||
@@ -21,7 +28,7 @@ impl std::fmt::Display for ExecServerListenUrlParseError {
|
||||
match self {
|
||||
ExecServerListenUrlParseError::UnsupportedListenUrl(listen_url) => write!(
|
||||
f,
|
||||
"unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT`"
|
||||
"unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT` or `stdio`"
|
||||
),
|
||||
ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url) => write!(
|
||||
f,
|
||||
@@ -35,11 +42,18 @@ impl std::error::Error for ExecServerListenUrlParseError {}
|
||||
|
||||
pub(crate) fn parse_listen_url(
|
||||
listen_url: &str,
|
||||
) -> Result<SocketAddr, ExecServerListenUrlParseError> {
|
||||
) -> Result<ExecServerListenTransport, ExecServerListenUrlParseError> {
|
||||
if matches!(listen_url, "stdio" | "stdio://") {
|
||||
return Ok(ExecServerListenTransport::Stdio);
|
||||
}
|
||||
|
||||
if let Some(socket_addr) = listen_url.strip_prefix("ws://") {
|
||||
return socket_addr.parse::<SocketAddr>().map_err(|_| {
|
||||
ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url.to_string())
|
||||
});
|
||||
return socket_addr
|
||||
.parse::<SocketAddr>()
|
||||
.map(ExecServerListenTransport::WebSocket)
|
||||
.map_err(|_| {
|
||||
ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url.to_string())
|
||||
});
|
||||
}
|
||||
|
||||
Err(ExecServerListenUrlParseError::UnsupportedListenUrl(
|
||||
@@ -51,8 +65,27 @@ pub(crate) async fn run_transport(
|
||||
listen_url: &str,
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let bind_address = parse_listen_url(listen_url)?;
|
||||
run_websocket_listener(bind_address, runtime_paths).await
|
||||
match parse_listen_url(listen_url)? {
|
||||
ExecServerListenTransport::WebSocket(bind_address) => {
|
||||
run_websocket_listener(bind_address, runtime_paths).await
|
||||
}
|
||||
ExecServerListenTransport::Stdio => run_stdio_connection(runtime_paths).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_stdio_connection(
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let processor = ConnectionProcessor::new(runtime_paths);
|
||||
tracing::info!("codex-exec-server listening on stdio");
|
||||
processor
|
||||
.run_connection(JsonRpcConnection::from_stdio(
|
||||
io::stdin(),
|
||||
io::stdout(),
|
||||
"exec-server stdio".to_string(),
|
||||
))
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_websocket_listener(
|
||||
|
||||
@@ -3,29 +3,45 @@ use std::net::SocketAddr;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::DEFAULT_LISTEN_URL;
|
||||
use super::ExecServerListenTransport;
|
||||
use super::parse_listen_url;
|
||||
|
||||
#[test]
|
||||
fn parse_listen_url_accepts_default_websocket_url() {
|
||||
let bind_address =
|
||||
parse_listen_url(DEFAULT_LISTEN_URL).expect("default listen URL should parse");
|
||||
let transport = parse_listen_url(DEFAULT_LISTEN_URL).expect("default listen URL should parse");
|
||||
assert_eq!(
|
||||
bind_address,
|
||||
"127.0.0.1:0"
|
||||
.parse::<SocketAddr>()
|
||||
.expect("valid socket address")
|
||||
transport,
|
||||
ExecServerListenTransport::WebSocket(
|
||||
"127.0.0.1:0"
|
||||
.parse::<SocketAddr>()
|
||||
.expect("valid socket address")
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_listen_url_accepts_stdio() {
|
||||
let transport = parse_listen_url("stdio").expect("stdio listen URL should parse");
|
||||
assert_eq!(transport, ExecServerListenTransport::Stdio);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_listen_url_accepts_stdio_url() {
|
||||
let transport = parse_listen_url("stdio://").expect("stdio listen URL should parse");
|
||||
assert_eq!(transport, ExecServerListenTransport::Stdio);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_listen_url_accepts_websocket_url() {
|
||||
let bind_address =
|
||||
let transport =
|
||||
parse_listen_url("ws://127.0.0.1:1234").expect("websocket listen URL should parse");
|
||||
assert_eq!(
|
||||
bind_address,
|
||||
"127.0.0.1:1234"
|
||||
.parse::<SocketAddr>()
|
||||
.expect("valid socket address")
|
||||
transport,
|
||||
ExecServerListenTransport::WebSocket(
|
||||
"127.0.0.1:1234"
|
||||
.parse::<SocketAddr>()
|
||||
.expect("valid socket address")
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@@ -45,6 +61,6 @@ fn parse_listen_url_rejects_unsupported_url() {
|
||||
parse_listen_url("http://127.0.0.1:1234").expect_err("unsupported scheme should fail");
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"unsupported --listen URL `http://127.0.0.1:1234`; expected `ws://IP:PORT`"
|
||||
"unsupported --listen URL `http://127.0.0.1:1234`; expected `ws://IP:PORT` or `stdio`"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ codex-app-server-protocol = { workspace = true }
|
||||
codex-cloud-requirements = { workspace = true }
|
||||
codex-config = { workspace = true }
|
||||
codex-core = { workspace = true }
|
||||
codex-exec-server = { workspace = true }
|
||||
codex-feedback = { workspace = true }
|
||||
codex-git-utils = { workspace = true }
|
||||
codex-login = { workspace = true }
|
||||
|
||||
@@ -512,7 +512,11 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
|
||||
feedback: CodexFeedback::new(),
|
||||
log_db: None,
|
||||
environment_manager: std::sync::Arc::new(
|
||||
EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await,
|
||||
EnvironmentManager::new(EnvironmentManagerArgs::new(
|
||||
config.codex_home.clone(),
|
||||
local_runtime_paths,
|
||||
))
|
||||
.await?,
|
||||
),
|
||||
config_warnings,
|
||||
session_source: SessionSource::Exec,
|
||||
|
||||
@@ -60,15 +60,6 @@ pub async fn run_main(
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
cli_config_overrides: CliConfigOverrides,
|
||||
) -> IoResult<()> {
|
||||
let environment_manager = Arc::new(
|
||||
EnvironmentManager::new(EnvironmentManagerArgs::new(
|
||||
ExecServerRuntimePaths::from_optional_paths(
|
||||
arg0_paths.codex_self_exe.clone(),
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?,
|
||||
))
|
||||
.await,
|
||||
);
|
||||
// Parse CLI overrides once and derive the base Config eagerly so later
|
||||
// components do not need to work with raw TOML values.
|
||||
let cli_kv_overrides = cli_config_overrides.parse_overrides().map_err(|e| {
|
||||
@@ -83,6 +74,17 @@ pub async fn run_main(
|
||||
std::io::Error::new(ErrorKind::InvalidData, format!("error loading config: {e}"))
|
||||
})?;
|
||||
set_default_client_residency_requirement(config.enforce_residency.value());
|
||||
let environment_manager = Arc::new(
|
||||
EnvironmentManager::new(EnvironmentManagerArgs::new(
|
||||
config.codex_home.clone(),
|
||||
ExecServerRuntimePaths::from_optional_paths(
|
||||
arg0_paths.codex_self_exe.clone(),
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?,
|
||||
))
|
||||
.await
|
||||
.map_err(std::io::Error::other)?,
|
||||
);
|
||||
|
||||
let otel = codex_core::otel_init::build_provider(
|
||||
&config,
|
||||
|
||||
@@ -110,8 +110,13 @@ async fn run_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
|
||||
config.codex_linux_sandbox_exe.clone(),
|
||||
)?;
|
||||
let thread_store = thread_store_from_config(&config);
|
||||
let environment_manager =
|
||||
Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await);
|
||||
let environment_manager = Arc::new(
|
||||
EnvironmentManager::new(EnvironmentManagerArgs::new(
|
||||
config.codex_home.clone(),
|
||||
local_runtime_paths,
|
||||
))
|
||||
.await?,
|
||||
);
|
||||
let thread_manager = ThreadManager::new(
|
||||
&config,
|
||||
auth_manager,
|
||||
|
||||
@@ -747,12 +747,14 @@ pub async fn run_main(
|
||||
|
||||
let environment_manager = Arc::new(
|
||||
EnvironmentManager::new(EnvironmentManagerArgs::new(
|
||||
codex_home.clone(),
|
||||
ExecServerRuntimePaths::from_optional_paths(
|
||||
arg0_paths.codex_self_exe.clone(),
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?,
|
||||
))
|
||||
.await,
|
||||
.await
|
||||
.map_err(std::io::Error::other)?,
|
||||
);
|
||||
let cwd = cli.cwd.clone();
|
||||
let config_cwd =
|
||||
|
||||
Reference in New Issue
Block a user