Compare commits

...

1 Commits

Author SHA1 Message Date
starr-openai
d4612c73c5 Add config-backed stdio exec-server environments
Add a config-backed environment provider that loads environments from CODEX_HOME, supports both websocket and stdio-backed exec-server endpoints, and threads provider selection through EnvironmentManager. This also teaches the exec-server transport layer to own stdio subprocess lifecycle and adds stdio server listen support.

Validated remotely on dev with Bazel build targets for //codex-rs/cli:codex and //codex-rs/app-server:app-server, plus exec-server unit and integration test targets.

Co-authored-by: Codex <noreply@openai.com>
2026-04-30 16:24:52 -07:00
21 changed files with 910 additions and 101 deletions

View File

@@ -7,7 +7,6 @@ use codex_config::NoopThreadConfigLoader;
use codex_config::RemoteThreadConfigLoader;
use codex_config::ThreadConfigLoader;
use codex_core::config::Config;
use codex_exec_server::EnvironmentManagerArgs;
use codex_features::Feature;
use codex_login::AuthManager;
use codex_utils_cli::CliConfigOverrides;
@@ -51,6 +50,7 @@ use codex_core::ExecPolicyError;
use codex_core::check_execpolicy_for_warnings;
use codex_core::config::find_codex_home;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::EnvironmentManagerArgs;
use codex_exec_server::ExecServerRuntimePaths;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
@@ -420,15 +420,6 @@ pub async fn run_main_with_transport_options(
auth: AppServerWebsocketAuthSettings,
runtime_options: AppServerRuntimeOptions,
) -> IoResult<()> {
let environment_manager = Arc::new(
EnvironmentManager::new(EnvironmentManagerArgs::new(
ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?,
))
.await,
);
let (transport_event_tx, mut transport_event_rx) =
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
@@ -444,6 +435,17 @@ pub async fn run_main_with_transport_options(
)
})?;
let codex_home = find_codex_home()?;
let environment_manager = Arc::new(
EnvironmentManager::new(EnvironmentManagerArgs::new(
codex_home.clone(),
ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?,
))
.await
.map_err(std::io::Error::other)?,
);
let config_manager = ConfigManager::new(
codex_home.to_path_buf(),
cli_kv_overrides.clone(),

View File

@@ -446,7 +446,7 @@ struct AppServerCommand {
#[derive(Debug, Parser)]
struct ExecServerCommand {
/// Transport endpoint URL. Supported values: `ws://IP:PORT` (default).
/// Transport endpoint URL. Supported values: `ws://IP:PORT` (default), `stdio`, `stdio://`.
#[arg(
long = "listen",
value_name = "URL",

View File

@@ -200,8 +200,11 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options_and_status(
config.codex_self_exe.clone(),
config.codex_linux_sandbox_exe.clone(),
)?;
let environment_manager =
EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await;
let environment_manager = EnvironmentManager::new(EnvironmentManagerArgs::new(
config.codex_home.clone(),
local_runtime_paths,
))
.await?;
list_accessible_connectors_from_mcp_tools_with_environment_manager(
config,
force_refetch,

View File

@@ -5,6 +5,7 @@ use codex_exec_server::EnvironmentManager;
use codex_exec_server::EnvironmentManagerArgs;
use codex_exec_server::ExecServerRuntimePaths;
use codex_login::AuthManager;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
@@ -34,12 +35,18 @@ pub async fn build_prompt_input(
config.codex_self_exe.clone(),
config.codex_linux_sandbox_exe.clone(),
)?;
let thread_manager = ThreadManager::new(
&config,
Arc::clone(&auth_manager),
SessionSource::Exec,
Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await),
Arc::new(
EnvironmentManager::new(EnvironmentManagerArgs::new(
config.codex_home.clone(),
local_runtime_paths,
))
.await
.map_err(|err| CodexErr::Fatal(err.to_string()))?,
),
/*analytics_events_client*/ None,
);
let thread_store = thread_store_from_config(&config);

View File

@@ -3,6 +3,10 @@ load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "exec-server",
crate_name = "codex_exec_server",
deps_extra = [
"@crates//:schemars",
"@crates//:toml",
],
# Keep the crate's integration tests single-threaded under Bazel because
# they install process-global test-binary dispatch state, and the remote
# exec-server cases already rely on serialization around the full CLI path.

View File

@@ -24,9 +24,11 @@ codex-utils-absolute-path = { workspace = true }
codex-utils-pty = { workspace = true }
futures = { workspace = true }
reqwest = { workspace = true, features = ["rustls-tls", "stream"] }
schemars = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
toml = { workspace = true }
tokio = { workspace = true, features = [
"fs",
"io-std",

View File

@@ -1,5 +1,6 @@
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::process::Stdio;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::OnceLock;
@@ -11,6 +12,11 @@ use codex_app_server_protocol::JSONRPCNotification;
use futures::FutureExt;
use futures::future::BoxFuture;
use serde_json::Value;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::process::Child;
use tokio::process::Command;
use tokio::runtime::Handle;
use tokio::sync::Mutex;
use tokio::sync::OnceCell;
use tokio::sync::mpsc;
@@ -19,11 +25,14 @@ use tokio::sync::watch;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use tracing::debug;
use tracing::warn;
use crate::ProcessId;
use crate::client_api::ExecServerClientConnectOptions;
use crate::client_api::ExecServerTransport;
use crate::client_api::HttpClient;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::client_api::StdioExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
use crate::process::ExecProcessEvent;
use crate::process::ExecProcessEventLog;
@@ -105,6 +114,16 @@ impl From<RemoteExecServerConnectArgs> for ExecServerClientConnectOptions {
}
}
impl From<StdioExecServerConnectArgs> for ExecServerClientConnectOptions {
fn from(value: StdioExecServerConnectArgs) -> Self {
Self {
client_name: value.client_name,
initialize_timeout: value.initialize_timeout,
resume_session_id: value.resume_session_id,
}
}
}
impl RemoteExecServerConnectArgs {
pub fn new(websocket_url: String, client_name: String) -> Self {
Self {
@@ -180,29 +199,45 @@ pub struct ExecServerClient {
#[derive(Clone)]
pub(crate) struct LazyRemoteExecServerClient {
websocket_url: String,
transport: ExecServerTransport,
client: Arc<OnceCell<ExecServerClient>>,
}
impl LazyRemoteExecServerClient {
pub(crate) fn new(websocket_url: String) -> Self {
pub(crate) fn new(transport: ExecServerTransport) -> Self {
Self {
websocket_url,
transport,
client: Arc::new(OnceCell::new()),
}
}
pub(crate) async fn get(&self) -> Result<ExecServerClient, ExecServerError> {
self.client
.get_or_try_init(|| async {
ExecServerClient::connect_websocket(RemoteExecServerConnectArgs {
websocket_url: self.websocket_url.clone(),
client_name: "codex-environment".to_string(),
connect_timeout: Duration::from_secs(5),
initialize_timeout: Duration::from_secs(5),
resume_session_id: None,
})
.await
.get_or_try_init(|| {
let transport = self.transport.clone();
async move {
match transport {
ExecServerTransport::WebSocketUrl(websocket_url) => {
ExecServerClient::connect_websocket(RemoteExecServerConnectArgs {
websocket_url,
client_name: "codex-environment".to_string(),
connect_timeout: Duration::from_secs(5),
initialize_timeout: Duration::from_secs(5),
resume_session_id: None,
})
.await
}
ExecServerTransport::StdioShellCommand(shell_command) => {
ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs {
shell_command,
client_name: "codex-environment".to_string(),
initialize_timeout: Duration::from_secs(5),
resume_session_id: None,
})
.await
}
}
}
})
.await
.cloned()
@@ -283,6 +318,51 @@ impl ExecServerClient {
.await
}
pub async fn connect_stdio_command(
args: StdioExecServerConnectArgs,
) -> Result<Self, ExecServerError> {
let shell_command = args.shell_command.clone();
let mut child = shell_command_process(&shell_command)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(ExecServerError::Spawn)?;
let stdin = child.stdin.take().ok_or_else(|| {
ExecServerError::Protocol("spawned exec-server command has no stdin".to_string())
})?;
let stdout = child.stdout.take().ok_or_else(|| {
ExecServerError::Protocol("spawned exec-server command has no stdout".to_string())
})?;
if let Some(stderr) = child.stderr.take() {
tokio::spawn(async move {
let mut lines = BufReader::new(stderr).lines();
loop {
match lines.next_line().await {
Ok(Some(line)) => debug!("exec-server stdio stderr: {line}"),
Ok(None) => break,
Err(err) => {
warn!("failed to read exec-server stdio stderr: {err}");
break;
}
}
}
});
}
Self::connect(
JsonRpcConnection::from_stdio(
stdout,
stdin,
format!("exec-server stdio command `{shell_command}`"),
)
.with_drop_resource(Box::new(StdioChildGuard { child: Some(child) })),
args.into(),
)
.await
}
pub async fn initialize(
&self,
options: ExecServerClientConnectOptions,
@@ -526,6 +606,60 @@ impl ExecServerClient {
}
}
struct StdioChildGuard {
child: Option<Child>,
}
impl Drop for StdioChildGuard {
fn drop(&mut self) {
let Some(child) = self.child.take() else {
return;
};
match Handle::try_current() {
Ok(handle) => {
let _terminate_task = handle.spawn(terminate_stdio_child(child));
}
Err(_) => {
terminate_stdio_child_now(child);
}
}
}
}
async fn terminate_stdio_child(mut child: Child) {
kill_stdio_child(&mut child);
if let Err(err) = child.wait().await {
debug!("failed to wait for exec-server stdio child: {err}");
}
}
fn terminate_stdio_child_now(mut child: Child) {
kill_stdio_child(&mut child);
}
fn kill_stdio_child(child: &mut Child) {
if let Err(err) = child.start_kill() {
debug!("failed to terminate exec-server stdio child: {err}");
}
}
fn shell_command_process(shell_command: &str) -> Command {
#[cfg(windows)]
{
let mut command = Command::new("cmd");
command.arg("/C").arg(shell_command);
command
}
#[cfg(not(windows))]
{
let mut command = Command::new("sh");
command.arg("-lc").arg(shell_command);
command
}
}
impl From<RpcCallError> for ExecServerError {
fn from(value: RpcCallError) -> Self {
match value {
@@ -893,6 +1027,7 @@ mod tests {
use super::ExecServerClient;
use super::ExecServerClientConnectOptions;
use crate::ProcessId;
use crate::client_api::StdioExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
use crate::process::ExecProcessEvent;
use crate::protocol::EXEC_CLOSED_METHOD;
@@ -930,6 +1065,21 @@ mod tests {
.expect("json-rpc line should write");
}
#[cfg(not(windows))]
#[tokio::test]
async fn connect_stdio_command_initializes_json_rpc_client() {
let client = ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs {
shell_command: "read _line; printf '%s\\n' '{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\"}}'; read _line; sleep 60".to_string(),
client_name: "stdio-test-client".to_string(),
initialize_timeout: Duration::from_secs(1),
resume_session_id: None,
})
.await
.expect("stdio client should connect");
assert_eq!(client.session_id().as_deref(), Some("stdio-test"));
}
#[tokio::test]
async fn process_events_are_delivered_in_seq_order_when_notifications_are_reordered() {
let (client_stdin, server_reader) = duplex(1 << 20);

View File

@@ -25,6 +25,22 @@ pub struct RemoteExecServerConnectArgs {
pub resume_session_id: Option<String>,
}
/// Stdio connection arguments for a command-backed exec-server.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StdioExecServerConnectArgs {
pub shell_command: String,
pub client_name: String,
pub initialize_timeout: Duration,
pub resume_session_id: Option<String>,
}
/// Transport used to connect to a remote exec-server environment.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecServerTransport {
WebSocketUrl(String),
StdioShellCommand(String),
}
/// Sends HTTP requests through a runtime-selected transport.
///
/// This is the HTTP capability counterpart to [`crate::ExecBackend`]. Callers

View File

@@ -8,13 +8,9 @@ use tokio::sync::watch;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::tungstenite::Message;
#[cfg(test)]
use tokio::io::AsyncBufReadExt;
#[cfg(test)]
use tokio::io::AsyncWriteExt;
#[cfg(test)]
use tokio::io::BufReader;
#[cfg(test)]
use tokio::io::BufWriter;
pub(crate) const CHANNEL_CAPACITY: usize = 128;
@@ -31,10 +27,10 @@ pub(crate) struct JsonRpcConnection {
incoming_rx: mpsc::Receiver<JsonRpcConnectionEvent>,
disconnected_rx: watch::Receiver<bool>,
task_handles: Vec<tokio::task::JoinHandle<()>>,
drop_resource: Option<Box<dyn Send>>,
}
impl JsonRpcConnection {
#[cfg(test)]
pub(crate) fn from_stdio<R, W>(reader: R, writer: W, connection_label: String) -> Self
where
R: AsyncRead + Unpin + Send + 'static,
@@ -122,6 +118,7 @@ impl JsonRpcConnection {
incoming_rx,
disconnected_rx,
task_handles: vec![reader_task, writer_task],
drop_resource: None,
}
}
@@ -256,9 +253,15 @@ impl JsonRpcConnection {
incoming_rx,
disconnected_rx,
task_handles: vec![reader_task, writer_task],
drop_resource: None,
}
}
pub(crate) fn with_drop_resource(mut self, drop_resource: Box<dyn Send>) -> Self {
self.drop_resource = Some(drop_resource);
self
}
pub(crate) fn into_parts(
self,
) -> (
@@ -266,12 +269,14 @@ impl JsonRpcConnection {
mpsc::Receiver<JsonRpcConnectionEvent>,
watch::Receiver<bool>,
Vec<tokio::task::JoinHandle<()>>,
Option<Box<dyn Send>>,
) {
(
self.outgoing_tx,
self.incoming_rx,
self.disconnected_rx,
self.task_handles,
self.drop_resource,
)
}
}
@@ -298,7 +303,6 @@ async fn send_malformed_message(
.await;
}
#[cfg(test)]
async fn write_jsonrpc_line_message<W>(
writer: &mut BufWriter<W>,
message: &JSONRPCMessage,

View File

@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use crate::ExecServerError;
@@ -7,8 +8,11 @@ use crate::ExecutorFileSystem;
use crate::HttpClient;
use crate::client::LazyRemoteExecServerClient;
use crate::client::http_client::ReqwestHttpClient;
use crate::client_api::ExecServerTransport;
use crate::environment_provider::DefaultEnvironmentProvider;
use crate::environment_provider::DefaultEnvironmentSelection;
use crate::environment_provider::EnvironmentProvider;
use crate::environment_provider::environment_provider_from_codex_home;
use crate::environment_provider::normalize_exec_server_url;
use crate::local_file_system::LocalFileSystem;
use crate::local_process::LocalProcess;
@@ -31,8 +35,8 @@ pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
/// shell/filesystem tool availability.
///
/// Remote environments create remote filesystem and execution backends that
/// lazy-connect to the configured exec-server on first use. The websocket is
/// not opened when the manager or environment is constructed.
/// lazy-connect to the configured exec-server on first use. The remote
/// transport is not opened when the manager or environment is constructed.
#[derive(Debug)]
pub struct EnvironmentManager {
default_environment: Option<String>,
@@ -45,12 +49,14 @@ pub const REMOTE_ENVIRONMENT_ID: &str = "remote";
#[derive(Clone, Debug)]
pub struct EnvironmentManagerArgs {
pub codex_home: std::path::PathBuf,
pub local_runtime_paths: ExecServerRuntimePaths,
}
impl EnvironmentManagerArgs {
pub fn new(local_runtime_paths: ExecServerRuntimePaths) -> Self {
pub fn new(codex_home: impl AsRef<Path>, local_runtime_paths: ExecServerRuntimePaths) -> Self {
Self {
codex_home: codex_home.as_ref().to_path_buf(),
local_runtime_paths,
}
}
@@ -71,9 +77,12 @@ impl EnvironmentManager {
/// Builds a test-only manager with environment access disabled.
pub fn disabled_for_tests(local_runtime_paths: ExecServerRuntimePaths) -> Self {
let mut manager = Self::from_environments(HashMap::new(), local_runtime_paths);
manager.default_environment = None;
manager
Self::from_environments(
HashMap::new(),
local_runtime_paths,
DefaultEnvironmentSelection::Disabled,
)
.expect("disabled test environment manager")
}
/// Builds a test-only manager from a raw exec-server URL value.
@@ -84,30 +93,29 @@ impl EnvironmentManager {
Self::from_default_provider_url(exec_server_url, local_runtime_paths).await
}
/// Builds a manager from `CODEX_EXEC_SERVER_URL` and local runtime paths
/// used when creating local filesystem helpers.
pub async fn new(args: EnvironmentManagerArgs) -> Self {
/// Builds a manager from `CODEX_HOME` and local runtime paths used when
/// creating local filesystem helpers.
pub async fn new(args: EnvironmentManagerArgs) -> Result<Self, ExecServerError> {
let EnvironmentManagerArgs {
codex_home,
local_runtime_paths,
} = args;
let exec_server_url = std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok();
Self::from_default_provider_url(exec_server_url, local_runtime_paths).await
let provider = environment_provider_from_codex_home(codex_home.as_path())?;
Self::from_provider(provider.as_ref(), local_runtime_paths).await
}
async fn from_default_provider_url(
exec_server_url: Option<String>,
local_runtime_paths: ExecServerRuntimePaths,
) -> Self {
let environment_disabled = normalize_exec_server_url(exec_server_url.clone()).1;
let provider = DefaultEnvironmentProvider::new(exec_server_url);
let provider_environments = provider.environments(&local_runtime_paths);
let mut manager = Self::from_environments(provider_environments, local_runtime_paths);
if environment_disabled {
// TODO: Remove this legacy `CODEX_EXEC_SERVER_URL=none` crutch once
// environment attachment defaulting moves out of EnvironmentManager.
manager.default_environment = None;
}
manager
Self::from_environments(
provider_environments,
local_runtime_paths,
provider.default_environment_selection(),
)
.expect("default provider should create valid environments")
}
/// Builds a manager from a provider-supplied startup snapshot.
@@ -121,12 +129,14 @@ impl EnvironmentManager {
Self::from_provider_environments(
provider.get_environments(&local_runtime_paths).await?,
local_runtime_paths,
provider.default_environment_selection(),
)
}
fn from_provider_environments(
environments: HashMap<String, Environment>,
local_runtime_paths: ExecServerRuntimePaths,
default_selection: DefaultEnvironmentSelection,
) -> Result<Self, ExecServerError> {
for id in environments.keys() {
if id.is_empty() {
@@ -136,21 +146,35 @@ impl EnvironmentManager {
}
}
Ok(Self::from_environments(environments, local_runtime_paths))
Self::from_environments(environments, local_runtime_paths, default_selection)
}
fn from_environments(
environments: HashMap<String, Environment>,
local_runtime_paths: ExecServerRuntimePaths,
) -> Self {
default_selection: DefaultEnvironmentSelection,
) -> Result<Self, ExecServerError> {
// TODO: Stop deriving a default environment here once omitted
// environment attachment is owned by thread/session setup.
let default_environment = if environments.contains_key(REMOTE_ENVIRONMENT_ID) {
Some(REMOTE_ENVIRONMENT_ID.to_string())
} else if environments.contains_key(LOCAL_ENVIRONMENT_ID) {
Some(LOCAL_ENVIRONMENT_ID.to_string())
} else {
None
let default_environment = match default_selection {
DefaultEnvironmentSelection::Derived => {
if environments.contains_key(REMOTE_ENVIRONMENT_ID) {
Some(REMOTE_ENVIRONMENT_ID.to_string())
} else if environments.contains_key(LOCAL_ENVIRONMENT_ID) {
Some(LOCAL_ENVIRONMENT_ID.to_string())
} else {
None
}
}
DefaultEnvironmentSelection::Environment(environment_id) => {
if !environments.contains_key(&environment_id) {
return Err(ExecServerError::Protocol(format!(
"default environment `{environment_id}` is not configured"
)));
}
Some(environment_id)
}
DefaultEnvironmentSelection::Disabled => None,
};
let local_environment = Arc::new(Environment::local(local_runtime_paths));
let environments = environments
@@ -158,11 +182,11 @@ impl EnvironmentManager {
.map(|(id, environment)| (id, Arc::new(environment)))
.collect();
Self {
Ok(Self {
default_environment,
environments,
local_environment,
}
})
}
/// Returns the default environment instance.
@@ -195,6 +219,7 @@ impl EnvironmentManager {
#[derive(Clone)]
pub struct Environment {
exec_server_url: Option<String>,
remote_transport: Option<ExecServerTransport>,
exec_backend: Arc<dyn ExecBackend>,
filesystem: Arc<dyn ExecutorFileSystem>,
http_client: Arc<dyn HttpClient>,
@@ -206,6 +231,7 @@ impl Environment {
pub fn default_for_tests() -> Self {
Self {
exec_server_url: None,
remote_transport: None,
exec_backend: Arc::new(LocalProcess::default()),
filesystem: Arc::new(LocalFileSystem::unsandboxed()),
http_client: Arc::new(ReqwestHttpClient),
@@ -261,6 +287,7 @@ impl Environment {
pub(crate) fn local(local_runtime_paths: ExecServerRuntimePaths) -> Self {
Self {
exec_server_url: None,
remote_transport: None,
exec_backend: Arc::new(LocalProcess::default()),
filesystem: Arc::new(LocalFileSystem::with_runtime_paths(
local_runtime_paths.clone(),
@@ -274,13 +301,38 @@ impl Environment {
exec_server_url: String,
local_runtime_paths: Option<ExecServerRuntimePaths>,
) -> Self {
let client = LazyRemoteExecServerClient::new(exec_server_url.clone());
Self::remote_with_transport(
ExecServerTransport::WebSocketUrl(exec_server_url),
local_runtime_paths,
)
}
pub(crate) fn remote_stdio_shell_command(
shell_command: String,
local_runtime_paths: Option<ExecServerRuntimePaths>,
) -> Self {
Self::remote_with_transport(
ExecServerTransport::StdioShellCommand(shell_command),
local_runtime_paths,
)
}
fn remote_with_transport(
transport: ExecServerTransport,
local_runtime_paths: Option<ExecServerRuntimePaths>,
) -> Self {
let exec_server_url = match &transport {
ExecServerTransport::WebSocketUrl(url) => Some(url.clone()),
ExecServerTransport::StdioShellCommand(_) => None,
};
let client = LazyRemoteExecServerClient::new(transport.clone());
let exec_backend: Arc<dyn ExecBackend> = Arc::new(RemoteProcess::new(client.clone()));
let filesystem: Arc<dyn ExecutorFileSystem> =
Arc::new(RemoteFileSystem::new(client.clone()));
Self {
exec_server_url: Some(exec_server_url),
exec_server_url,
remote_transport: Some(transport),
exec_backend,
filesystem,
http_client: Arc::new(client),
@@ -289,7 +341,7 @@ impl Environment {
}
pub fn is_remote(&self) -> bool {
self.exec_server_url.is_some()
self.remote_transport.is_some()
}
/// Returns the remote exec-server URL when this environment is remote.
@@ -319,6 +371,7 @@ mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use super::DefaultEnvironmentSelection;
use super::Environment;
use super::EnvironmentManager;
use super::LOCAL_ENVIRONMENT_ID;
@@ -425,7 +478,9 @@ mod tests {
.expect("remote environment"),
)]),
test_runtime_paths(),
);
DefaultEnvironmentSelection::Derived,
)
.expect("environment manager");
assert_eq!(
manager.default_environment_id(),
@@ -446,6 +501,7 @@ mod tests {
let err = EnvironmentManager::from_provider_environments(
HashMap::from([("".to_string(), Environment::default_for_tests())]),
test_runtime_paths(),
DefaultEnvironmentSelection::Derived,
)
.expect_err("empty id should fail");
@@ -455,6 +511,64 @@ mod tests {
);
}
#[tokio::test]
async fn environment_manager_uses_explicit_provider_default() {
let manager = EnvironmentManager::from_provider_environments(
HashMap::from([
(
LOCAL_ENVIRONMENT_ID.to_string(),
Environment::default_for_tests(),
),
(
"devbox".to_string(),
Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string()))
.expect("remote environment"),
),
]),
test_runtime_paths(),
DefaultEnvironmentSelection::Environment("devbox".to_string()),
)
.expect("manager");
assert_eq!(manager.default_environment_id(), Some("devbox"));
assert!(manager.default_environment().expect("default").is_remote());
}
#[tokio::test]
async fn environment_manager_disables_provider_default() {
let manager = EnvironmentManager::from_provider_environments(
HashMap::from([(
LOCAL_ENVIRONMENT_ID.to_string(),
Environment::default_for_tests(),
)]),
test_runtime_paths(),
DefaultEnvironmentSelection::Disabled,
)
.expect("manager");
assert_eq!(manager.default_environment_id(), None);
assert!(manager.default_environment().is_none());
assert!(manager.get_environment(LOCAL_ENVIRONMENT_ID).is_some());
}
#[tokio::test]
async fn environment_manager_rejects_unknown_provider_default() {
let err = EnvironmentManager::from_provider_environments(
HashMap::from([(
LOCAL_ENVIRONMENT_ID.to_string(),
Environment::default_for_tests(),
)]),
test_runtime_paths(),
DefaultEnvironmentSelection::Environment("missing".to_string()),
)
.expect_err("unknown default should fail");
assert_eq!(
err.to_string(),
"exec-server protocol error: default environment `missing` is not configured"
);
}
#[tokio::test]
async fn environment_manager_uses_provider_supplied_local_environment() {
let manager = EnvironmentManager::create_for_tests(

View File

@@ -1,6 +1,11 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::Path;
use async_trait::async_trait;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use crate::Environment;
use crate::ExecServerError;
@@ -21,6 +26,17 @@ pub trait EnvironmentProvider: Send + Sync {
&self,
local_runtime_paths: &ExecServerRuntimePaths,
) -> Result<HashMap<String, Environment>, ExecServerError>;
fn default_environment_selection(&self) -> DefaultEnvironmentSelection {
DefaultEnvironmentSelection::Derived
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DefaultEnvironmentSelection {
Derived,
Environment(String),
Disabled,
}
/// Default provider backed by `CODEX_EXEC_SERVER_URL`.
@@ -69,6 +85,164 @@ impl EnvironmentProvider for DefaultEnvironmentProvider {
) -> Result<HashMap<String, Environment>, ExecServerError> {
Ok(self.environments(local_runtime_paths))
}
fn default_environment_selection(&self) -> DefaultEnvironmentSelection {
if normalize_exec_server_url(self.exec_server_url.clone()).1 {
DefaultEnvironmentSelection::Disabled
} else {
DefaultEnvironmentSelection::Derived
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq, JsonSchema)]
#[schemars(deny_unknown_fields)]
pub struct EnvironmentsToml {
pub default: Option<String>,
#[serde(default)]
pub items: Vec<EnvironmentToml>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema)]
#[schemars(deny_unknown_fields)]
pub struct EnvironmentToml {
pub id: String,
pub url: Option<String>,
pub command: Option<String>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TomlEnvironmentProvider {
config: EnvironmentsToml,
}
impl TomlEnvironmentProvider {
pub fn new(config: EnvironmentsToml) -> Result<Self, ExecServerError> {
let mut ids = HashSet::from([LOCAL_ENVIRONMENT_ID.to_string()]);
for item in &config.items {
validate_environment_item(item)?;
if !ids.insert(item.id.clone()) {
return Err(ExecServerError::Protocol(format!(
"environment id `{}` is duplicated",
item.id
)));
}
}
if let Some(default) = config.default.as_deref() {
let default = default.trim();
if default.is_empty() {
return Err(ExecServerError::Protocol(
"default environment id cannot be empty".to_string(),
));
}
if !default.eq_ignore_ascii_case("none") && !ids.contains(default) {
return Err(ExecServerError::Protocol(format!(
"default environment `{default}` is not configured"
)));
}
}
Ok(Self { config })
}
}
#[async_trait]
impl EnvironmentProvider for TomlEnvironmentProvider {
async fn get_environments(
&self,
local_runtime_paths: &ExecServerRuntimePaths,
) -> Result<HashMap<String, Environment>, ExecServerError> {
let mut environments = HashMap::from([(
LOCAL_ENVIRONMENT_ID.to_string(),
Environment::local(local_runtime_paths.clone()),
)]);
for item in &self.config.items {
let environment = match (item.url.as_deref(), item.command.as_deref()) {
(Some(url), None) => Environment::remote_inner(
url.trim().to_string(),
Some(local_runtime_paths.clone()),
),
(None, Some(command)) => Environment::remote_stdio_shell_command(
command.trim().to_string(),
Some(local_runtime_paths.clone()),
),
_ => unreachable!("transport shape validated by TomlEnvironmentProvider::new"),
};
environments.insert(item.id.clone(), environment);
}
Ok(environments)
}
fn default_environment_selection(&self) -> DefaultEnvironmentSelection {
match self.config.default.as_deref().map(str::trim) {
None => DefaultEnvironmentSelection::Environment(LOCAL_ENVIRONMENT_ID.to_string()),
Some(default) if default.eq_ignore_ascii_case("none") => {
DefaultEnvironmentSelection::Disabled
}
Some(default) => DefaultEnvironmentSelection::Environment(default.to_string()),
}
}
}
fn validate_environment_item(item: &EnvironmentToml) -> Result<(), ExecServerError> {
let id = item.id.trim();
if id.is_empty() {
return Err(ExecServerError::Protocol(
"environment id cannot be empty".to_string(),
));
}
if id != item.id {
return Err(ExecServerError::Protocol(format!(
"environment id `{}` must not contain surrounding whitespace",
item.id
)));
}
if item.id == LOCAL_ENVIRONMENT_ID || item.id.eq_ignore_ascii_case("none") {
return Err(ExecServerError::Protocol(format!(
"environment id `{}` is reserved",
item.id
)));
}
match (item.url.as_deref(), item.command.as_deref()) {
(Some(url), None) => validate_websocket_url(url),
(None, Some(command)) => {
if command.trim().is_empty() {
return Err(ExecServerError::Protocol(format!(
"environment `{}` command cannot be empty",
item.id
)));
}
Ok(())
}
(None, None) => Err(ExecServerError::Protocol(format!(
"environment `{}` must set exactly one of url or command",
item.id
))),
(Some(_), Some(_)) => Err(ExecServerError::Protocol(format!(
"environment `{}` must set exactly one of url or command",
item.id
))),
}
}
fn validate_websocket_url(url: &str) -> Result<(), ExecServerError> {
let url = url.trim();
if url.is_empty() {
return Err(ExecServerError::Protocol(
"environment url cannot be empty".to_string(),
));
}
if !url.starts_with("ws://") && !url.starts_with("wss://") {
return Err(ExecServerError::Protocol(format!(
"environment url `{url}` must use ws:// or wss://"
)));
}
Ok(())
}
pub(crate) fn normalize_exec_server_url(exec_server_url: Option<String>) -> (Option<String>, bool) {
@@ -79,9 +253,45 @@ pub(crate) fn normalize_exec_server_url(exec_server_url: Option<String>) -> (Opt
}
}
const ENVIRONMENTS_TOML_FILE: &str = "environments.toml";
pub fn environment_provider_from_codex_home(
codex_home: &Path,
) -> Result<Box<dyn EnvironmentProvider>, ExecServerError> {
let path = codex_home.join(ENVIRONMENTS_TOML_FILE);
if path.try_exists().map_err(|err| {
ExecServerError::Protocol(format!(
"failed to inspect environment config `{}`: {err}",
path.display()
))
})? {
let environments = load_environments_toml(&path)?;
Ok(Box::new(TomlEnvironmentProvider::new(environments)?))
} else {
Ok(Box::new(DefaultEnvironmentProvider::from_env()))
}
}
pub fn load_environments_toml(path: &Path) -> Result<EnvironmentsToml, ExecServerError> {
let contents = std::fs::read_to_string(path).map_err(|err| {
ExecServerError::Protocol(format!(
"failed to read environment config `{}`: {err}",
path.display()
))
})?;
toml::from_str(&contents).map_err(|err| {
ExecServerError::Protocol(format!(
"failed to parse environment config `{}`: {err}",
path.display()
))
})
}
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use tempfile::tempdir;
use super::*;
use crate::ExecServerRuntimePaths;
@@ -135,6 +345,10 @@ mod tests {
assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote());
assert!(!environments.contains_key(REMOTE_ENVIRONMENT_ID));
assert_eq!(
provider.default_environment_selection(),
DefaultEnvironmentSelection::Disabled
);
}
#[tokio::test]
@@ -169,4 +383,214 @@ mod tests {
Some("ws://127.0.0.1:8765")
);
}
#[tokio::test]
async fn toml_provider_adds_implicit_local_and_configured_environments() {
let provider = TomlEnvironmentProvider::new(EnvironmentsToml {
default: Some("ssh-dev".to_string()),
items: vec![
EnvironmentToml {
id: "devbox".to_string(),
url: Some(" ws://127.0.0.1:8765 ".to_string()),
command: None,
},
EnvironmentToml {
id: "ssh-dev".to_string(),
url: None,
command: Some(" ssh dev \"codex exec-server --listen stdio\" ".to_string()),
},
],
})
.expect("provider");
let runtime_paths = test_runtime_paths();
let environments = provider
.get_environments(&runtime_paths)
.await
.expect("environments");
assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote());
assert_eq!(
environments["devbox"].exec_server_url(),
Some("ws://127.0.0.1:8765")
);
assert!(environments["ssh-dev"].is_remote());
assert_eq!(
provider.default_environment_selection(),
DefaultEnvironmentSelection::Environment("ssh-dev".to_string())
);
}
#[test]
fn toml_provider_default_omitted_selects_local() {
let provider = TomlEnvironmentProvider::new(EnvironmentsToml::default()).expect("provider");
assert_eq!(
provider.default_environment_selection(),
DefaultEnvironmentSelection::Environment(LOCAL_ENVIRONMENT_ID.to_string())
);
}
#[test]
fn toml_provider_default_none_disables_default() {
let provider = TomlEnvironmentProvider::new(EnvironmentsToml {
default: Some("none".to_string()),
items: Vec::new(),
})
.expect("provider");
assert_eq!(
provider.default_environment_selection(),
DefaultEnvironmentSelection::Disabled
);
}
#[test]
fn toml_provider_rejects_invalid_items() {
let cases = [
(
EnvironmentToml {
id: "local".to_string(),
url: Some("ws://127.0.0.1:8765".to_string()),
command: None,
},
"environment id `local` is reserved",
),
(
EnvironmentToml {
id: " devbox ".to_string(),
url: Some("ws://127.0.0.1:8765".to_string()),
command: None,
},
"environment id ` devbox ` must not contain surrounding whitespace",
),
(
EnvironmentToml {
id: "devbox".to_string(),
url: Some("http://127.0.0.1:8765".to_string()),
command: None,
},
"environment url `http://127.0.0.1:8765` must use ws:// or wss://",
),
(
EnvironmentToml {
id: "devbox".to_string(),
url: Some("ws://127.0.0.1:8765".to_string()),
command: Some("codex exec-server --listen stdio".to_string()),
},
"environment `devbox` must set exactly one of url or command",
),
(
EnvironmentToml {
id: "devbox".to_string(),
url: None,
command: Some(" ".to_string()),
},
"environment `devbox` command cannot be empty",
),
];
for (item, expected) in cases {
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
default: None,
items: vec![item],
})
.expect_err("invalid item should fail");
assert_eq!(
err.to_string(),
format!("exec-server protocol error: {expected}")
);
}
}
#[test]
fn toml_provider_rejects_duplicate_ids() {
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
default: None,
items: vec![
EnvironmentToml {
id: "devbox".to_string(),
url: Some("ws://127.0.0.1:8765".to_string()),
command: None,
},
EnvironmentToml {
id: "devbox".to_string(),
url: None,
command: Some("codex exec-server --listen stdio".to_string()),
},
],
})
.expect_err("duplicate id should fail");
assert_eq!(
err.to_string(),
"exec-server protocol error: environment id `devbox` is duplicated"
);
}
#[test]
fn toml_provider_rejects_unknown_default() {
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
default: Some("missing".to_string()),
items: Vec::new(),
})
.expect_err("unknown default should fail");
assert_eq!(
err.to_string(),
"exec-server protocol error: default environment `missing` is not configured"
);
}
#[test]
fn load_environments_toml_reads_root_environment_list() {
let codex_home = tempdir().expect("tempdir");
let path = codex_home.path().join(ENVIRONMENTS_TOML_FILE);
std::fs::write(
&path,
r#"
default = "ssh-dev"
[[items]]
id = "devbox"
url = "ws://127.0.0.1:4512"
[[items]]
id = "ssh-dev"
command = 'ssh dev "codex exec-server --listen stdio"'
"#,
)
.expect("write environments.toml");
let environments = load_environments_toml(&path).expect("environments.toml");
assert_eq!(environments.default.as_deref(), Some("ssh-dev"));
assert_eq!(environments.items.len(), 2);
assert_eq!(environments.items[0].id, "devbox");
assert_eq!(
environments.items[1].command.as_deref(),
Some("ssh dev \"codex exec-server --listen stdio\"")
);
}
#[test]
fn environment_provider_from_codex_home_uses_present_environments_file() {
let codex_home = tempdir().expect("tempdir");
std::fs::write(
codex_home.path().join(ENVIRONMENTS_TOML_FILE),
r#"
default = "none"
"#,
)
.expect("write environments.toml");
let provider =
environment_provider_from_codex_home(codex_home.path()).expect("environment provider");
assert_eq!(
provider.default_environment_selection(),
DefaultEnvironmentSelection::Disabled
);
}
}

View File

@@ -23,8 +23,10 @@ pub use client::ExecServerError;
pub use client::http_client::HttpResponseBodyStream;
pub use client::http_client::ReqwestHttpClient;
pub use client_api::ExecServerClientConnectOptions;
pub use client_api::ExecServerTransport;
pub use client_api::HttpClient;
pub use client_api::RemoteExecServerConnectArgs;
pub use client_api::StdioExecServerConnectArgs;
pub use codex_file_system::CopyOptions;
pub use codex_file_system::CreateDirectoryOptions;
pub use codex_file_system::ExecutorFileSystem;
@@ -40,7 +42,13 @@ pub use environment::EnvironmentManagerArgs;
pub use environment::LOCAL_ENVIRONMENT_ID;
pub use environment::REMOTE_ENVIRONMENT_ID;
pub use environment_provider::DefaultEnvironmentProvider;
pub use environment_provider::DefaultEnvironmentSelection;
pub use environment_provider::EnvironmentProvider;
pub use environment_provider::EnvironmentToml;
pub use environment_provider::EnvironmentsToml;
pub use environment_provider::TomlEnvironmentProvider;
pub use environment_provider::environment_provider_from_codex_home;
pub use environment_provider::load_environments_toml;
pub use fs_helper::CODEX_FS_HELPER_ARG1;
pub use fs_helper_main::main as run_fs_helper_main;
pub use local_file_system::LOCAL_FS;

View File

@@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
@@ -229,12 +230,14 @@ pub(crate) struct RpcClient {
disconnected_rx: watch::Receiver<bool>,
next_request_id: AtomicI64,
transport_tasks: Vec<JoinHandle<()>>,
transport_drop_resource: StdMutex<Option<Box<dyn Send>>>,
reader_task: JoinHandle<()>,
}
impl RpcClient {
pub(crate) fn new(connection: JsonRpcConnection) -> (Self, mpsc::Receiver<RpcClientEvent>) {
let (write_tx, mut incoming_rx, disconnected_rx, transport_tasks) = connection.into_parts();
let (write_tx, mut incoming_rx, disconnected_rx, transport_tasks, transport_drop_resource) =
connection.into_parts();
let pending = Arc::new(Mutex::new(HashMap::<RequestId, PendingRequest>::new()));
let (event_tx, event_rx) = mpsc::channel(128);
@@ -275,6 +278,7 @@ impl RpcClient {
disconnected_rx,
next_request_id: AtomicI64::new(1),
transport_tasks,
transport_drop_resource: StdMutex::new(transport_drop_resource),
reader_task,
},
event_rx,
@@ -369,6 +373,9 @@ impl Drop for RpcClient {
task.abort();
}
self.reader_task.abort();
if let Ok(drop_resource) = self.transport_drop_resource.get_mut() {
let _ = drop_resource.take();
}
}
}

View File

@@ -47,8 +47,13 @@ async fn run_connection(
runtime_paths: ExecServerRuntimePaths,
) {
let router = Arc::new(build_router());
let (json_outgoing_tx, mut incoming_rx, mut disconnected_rx, connection_tasks) =
connection.into_parts();
let (
json_outgoing_tx,
mut incoming_rx,
mut disconnected_rx,
connection_tasks,
_drop_resource,
) = connection.into_parts();
let (outgoing_tx, mut outgoing_rx) =
mpsc::channel::<RpcServerOutboundMessage>(CHANNEL_CAPACITY);
let notifications = RpcNotificationSender::new(outgoing_tx.clone());

View File

@@ -1,5 +1,6 @@
use std::io::Write as _;
use std::net::SocketAddr;
use tokio::io;
use tokio::net::TcpListener;
use tokio_tungstenite::accept_async;
use tracing::warn;
@@ -10,6 +11,12 @@ use crate::server::processor::ConnectionProcessor;
pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0";
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) enum ExecServerListenTransport {
WebSocket(SocketAddr),
Stdio,
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ExecServerListenUrlParseError {
UnsupportedListenUrl(String),
@@ -21,7 +28,7 @@ impl std::fmt::Display for ExecServerListenUrlParseError {
match self {
ExecServerListenUrlParseError::UnsupportedListenUrl(listen_url) => write!(
f,
"unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT`"
"unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT` or `stdio`"
),
ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url) => write!(
f,
@@ -35,11 +42,18 @@ impl std::error::Error for ExecServerListenUrlParseError {}
pub(crate) fn parse_listen_url(
listen_url: &str,
) -> Result<SocketAddr, ExecServerListenUrlParseError> {
) -> Result<ExecServerListenTransport, ExecServerListenUrlParseError> {
if matches!(listen_url, "stdio" | "stdio://") {
return Ok(ExecServerListenTransport::Stdio);
}
if let Some(socket_addr) = listen_url.strip_prefix("ws://") {
return socket_addr.parse::<SocketAddr>().map_err(|_| {
ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url.to_string())
});
return socket_addr
.parse::<SocketAddr>()
.map(ExecServerListenTransport::WebSocket)
.map_err(|_| {
ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url.to_string())
});
}
Err(ExecServerListenUrlParseError::UnsupportedListenUrl(
@@ -51,8 +65,27 @@ pub(crate) async fn run_transport(
listen_url: &str,
runtime_paths: ExecServerRuntimePaths,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let bind_address = parse_listen_url(listen_url)?;
run_websocket_listener(bind_address, runtime_paths).await
match parse_listen_url(listen_url)? {
ExecServerListenTransport::WebSocket(bind_address) => {
run_websocket_listener(bind_address, runtime_paths).await
}
ExecServerListenTransport::Stdio => run_stdio_connection(runtime_paths).await,
}
}
async fn run_stdio_connection(
runtime_paths: ExecServerRuntimePaths,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let processor = ConnectionProcessor::new(runtime_paths);
tracing::info!("codex-exec-server listening on stdio");
processor
.run_connection(JsonRpcConnection::from_stdio(
io::stdin(),
io::stdout(),
"exec-server stdio".to_string(),
))
.await;
Ok(())
}
async fn run_websocket_listener(

View File

@@ -3,29 +3,45 @@ use std::net::SocketAddr;
use pretty_assertions::assert_eq;
use super::DEFAULT_LISTEN_URL;
use super::ExecServerListenTransport;
use super::parse_listen_url;
#[test]
fn parse_listen_url_accepts_default_websocket_url() {
let bind_address =
parse_listen_url(DEFAULT_LISTEN_URL).expect("default listen URL should parse");
let transport = parse_listen_url(DEFAULT_LISTEN_URL).expect("default listen URL should parse");
assert_eq!(
bind_address,
"127.0.0.1:0"
.parse::<SocketAddr>()
.expect("valid socket address")
transport,
ExecServerListenTransport::WebSocket(
"127.0.0.1:0"
.parse::<SocketAddr>()
.expect("valid socket address")
)
);
}
#[test]
fn parse_listen_url_accepts_stdio() {
let transport = parse_listen_url("stdio").expect("stdio listen URL should parse");
assert_eq!(transport, ExecServerListenTransport::Stdio);
}
#[test]
fn parse_listen_url_accepts_stdio_url() {
let transport = parse_listen_url("stdio://").expect("stdio listen URL should parse");
assert_eq!(transport, ExecServerListenTransport::Stdio);
}
#[test]
fn parse_listen_url_accepts_websocket_url() {
let bind_address =
let transport =
parse_listen_url("ws://127.0.0.1:1234").expect("websocket listen URL should parse");
assert_eq!(
bind_address,
"127.0.0.1:1234"
.parse::<SocketAddr>()
.expect("valid socket address")
transport,
ExecServerListenTransport::WebSocket(
"127.0.0.1:1234"
.parse::<SocketAddr>()
.expect("valid socket address")
)
);
}
@@ -45,6 +61,6 @@ fn parse_listen_url_rejects_unsupported_url() {
parse_listen_url("http://127.0.0.1:1234").expect_err("unsupported scheme should fail");
assert_eq!(
err.to_string(),
"unsupported --listen URL `http://127.0.0.1:1234`; expected `ws://IP:PORT`"
"unsupported --listen URL `http://127.0.0.1:1234`; expected `ws://IP:PORT` or `stdio`"
);
}

View File

@@ -29,6 +29,7 @@ codex-app-server-protocol = { workspace = true }
codex-cloud-requirements = { workspace = true }
codex-config = { workspace = true }
codex-core = { workspace = true }
codex-exec-server = { workspace = true }
codex-feedback = { workspace = true }
codex-git-utils = { workspace = true }
codex-login = { workspace = true }

View File

@@ -512,7 +512,11 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
feedback: CodexFeedback::new(),
log_db: None,
environment_manager: std::sync::Arc::new(
EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await,
EnvironmentManager::new(EnvironmentManagerArgs::new(
config.codex_home.clone(),
local_runtime_paths,
))
.await?,
),
config_warnings,
session_source: SessionSource::Exec,

View File

@@ -60,15 +60,6 @@ pub async fn run_main(
arg0_paths: Arg0DispatchPaths,
cli_config_overrides: CliConfigOverrides,
) -> IoResult<()> {
let environment_manager = Arc::new(
EnvironmentManager::new(EnvironmentManagerArgs::new(
ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?,
))
.await,
);
// Parse CLI overrides once and derive the base Config eagerly so later
// components do not need to work with raw TOML values.
let cli_kv_overrides = cli_config_overrides.parse_overrides().map_err(|e| {
@@ -83,6 +74,17 @@ pub async fn run_main(
std::io::Error::new(ErrorKind::InvalidData, format!("error loading config: {e}"))
})?;
set_default_client_residency_requirement(config.enforce_residency.value());
let environment_manager = Arc::new(
EnvironmentManager::new(EnvironmentManagerArgs::new(
config.codex_home.clone(),
ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?,
))
.await
.map_err(std::io::Error::other)?,
);
let otel = codex_core::otel_init::build_provider(
&config,

View File

@@ -110,8 +110,13 @@ async fn run_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
config.codex_linux_sandbox_exe.clone(),
)?;
let thread_store = thread_store_from_config(&config);
let environment_manager =
Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await);
let environment_manager = Arc::new(
EnvironmentManager::new(EnvironmentManagerArgs::new(
config.codex_home.clone(),
local_runtime_paths,
))
.await?,
);
let thread_manager = ThreadManager::new(
&config,
auth_manager,

View File

@@ -747,12 +747,14 @@ pub async fn run_main(
let environment_manager = Arc::new(
EnvironmentManager::new(EnvironmentManagerArgs::new(
codex_home.clone(),
ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?,
))
.await,
.await
.map_err(std::io::Error::other)?,
);
let cwd = cli.cwd.clone();
let config_cwd =