mirror of
https://github.com/openai/codex.git
synced 2026-05-14 08:12:36 +00:00
Compare commits
58 Commits
starr/wind
...
starr/exec
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0c5b771e94 | ||
|
|
5f78998ddf | ||
|
|
da52fe6f43 | ||
|
|
f69cd0bf99 | ||
|
|
f4f9839a79 | ||
|
|
8436adb8e3 | ||
|
|
72453d70e3 | ||
|
|
394b50eafb | ||
|
|
9f2a9b32e7 | ||
|
|
5f009d27c1 | ||
|
|
90e4520bf7 | ||
|
|
b49ff33dc9 | ||
|
|
e9f0eaf8ed | ||
|
|
525ed52b18 | ||
|
|
74df186f8e | ||
|
|
bea492f54d | ||
|
|
84c8e21b31 | ||
|
|
ef74ece7e6 | ||
|
|
1fa0fec4dd | ||
|
|
0a7006cebc | ||
|
|
0db1e4b4d9 | ||
|
|
dc926a56c7 | ||
|
|
a2ef8e05b5 | ||
|
|
5086768859 | ||
|
|
b8f4ee4439 | ||
|
|
7a8bed96eb | ||
|
|
93f68577ed | ||
|
|
a970e46442 | ||
|
|
729d8109a3 | ||
|
|
1bfe59e42d | ||
|
|
26899a2d5b | ||
|
|
9f125d25cb | ||
|
|
256760d6b9 | ||
|
|
e58b331d8f | ||
|
|
dd1c9ff41a | ||
|
|
62bd368d38 | ||
|
|
28b23c5cd3 | ||
|
|
3ff901257a | ||
|
|
c72f484068 | ||
|
|
7557a7307a | ||
|
|
08795f1b65 | ||
|
|
f47954caef | ||
|
|
c317a66c61 | ||
|
|
d4b347176a | ||
|
|
6a7112ad21 | ||
|
|
b4269e85ff | ||
|
|
29f8812a83 | ||
|
|
942a674042 | ||
|
|
6ed49d62d7 | ||
|
|
045c740618 | ||
|
|
21297834ed | ||
|
|
c00a36e727 | ||
|
|
74e96987b8 | ||
|
|
caea51d3b7 | ||
|
|
c956939cc6 | ||
|
|
0bb3f728e1 | ||
|
|
995a669971 | ||
|
|
9face2bcbf |
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -2730,6 +2730,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-tungstenite",
|
||||
"tokio-util",
|
||||
"toml 0.9.11+spec-1.1.0",
|
||||
"tracing",
|
||||
"uuid",
|
||||
"wiremock",
|
||||
|
||||
@@ -49,7 +49,6 @@ use codex_config::ThreadConfigLoader;
|
||||
pub use codex_core::StateDbHandle;
|
||||
use codex_core::config::Config;
|
||||
pub use codex_exec_server::EnvironmentManager;
|
||||
pub use codex_exec_server::EnvironmentManagerArgs;
|
||||
pub use codex_exec_server::ExecServerRuntimePaths;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
|
||||
@@ -8,7 +8,6 @@ use codex_config::RemoteThreadConfigLoader;
|
||||
use codex_config::ThreadConfigLoader;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::resolve_installation_id;
|
||||
use codex_exec_server::EnvironmentManagerArgs;
|
||||
use codex_features::Feature;
|
||||
use codex_login::AuthManager;
|
||||
use codex_utils_cli::CliConfigOverrides;
|
||||
@@ -420,15 +419,6 @@ pub async fn run_main_with_transport_options(
|
||||
auth: AppServerWebsocketAuthSettings,
|
||||
runtime_options: AppServerRuntimeOptions,
|
||||
) -> IoResult<()> {
|
||||
let environment_manager = Arc::new(
|
||||
EnvironmentManager::new(EnvironmentManagerArgs::new(
|
||||
ExecServerRuntimePaths::from_optional_paths(
|
||||
arg0_paths.codex_self_exe.clone(),
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?,
|
||||
))
|
||||
.await,
|
||||
);
|
||||
let (transport_event_tx, mut transport_event_rx) =
|
||||
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
|
||||
@@ -444,6 +434,17 @@ pub async fn run_main_with_transport_options(
|
||||
)
|
||||
})?;
|
||||
let codex_home = find_codex_home()?;
|
||||
let local_runtime_paths = ExecServerRuntimePaths::from_optional_paths(
|
||||
arg0_paths.codex_self_exe.clone(),
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?;
|
||||
let environment_manager = if loader_overrides.ignore_user_config {
|
||||
EnvironmentManager::from_env(local_runtime_paths).await
|
||||
} else {
|
||||
EnvironmentManager::from_codex_home(codex_home.clone(), local_runtime_paths).await
|
||||
}
|
||||
.map(Arc::new)
|
||||
.map_err(std::io::Error::other)?;
|
||||
let config_manager = ConfigManager::new(
|
||||
codex_home.to_path_buf(),
|
||||
cli_kv_overrides.clone(),
|
||||
|
||||
@@ -46,7 +46,6 @@ pub use codex_core::resolve_installation_id;
|
||||
pub use codex_core::skills::SkillsManager;
|
||||
pub use codex_core::thread_store_from_config;
|
||||
pub use codex_exec_server::EnvironmentManager;
|
||||
pub use codex_exec_server::EnvironmentManagerArgs;
|
||||
pub use codex_exec_server::ExecServerRuntimePaths;
|
||||
pub use codex_features::Feature;
|
||||
pub use codex_features::Features;
|
||||
|
||||
@@ -15,7 +15,6 @@ pub use codex_app_server_protocol::AppMetadata;
|
||||
use codex_connectors::AllConnectorsCacheKey;
|
||||
use codex_connectors::DirectoryListResponse;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_exec_server::EnvironmentManagerArgs;
|
||||
use codex_exec_server::ExecServerRuntimePaths;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
use codex_tools::DiscoverableTool;
|
||||
@@ -202,7 +201,7 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options_and_status(
|
||||
config.codex_linux_sandbox_exe.clone(),
|
||||
)?;
|
||||
let environment_manager =
|
||||
EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await;
|
||||
EnvironmentManager::from_codex_home(config.codex_home.clone(), local_runtime_paths).await?;
|
||||
list_accessible_connectors_from_mcp_tools_with_environment_manager(
|
||||
config,
|
||||
force_refetch,
|
||||
|
||||
@@ -15,12 +15,12 @@ pub(crate) fn default_thread_environment_selections(
|
||||
cwd: &AbsolutePathBuf,
|
||||
) -> Vec<TurnEnvironmentSelection> {
|
||||
environment_manager
|
||||
.default_environment_id()
|
||||
.default_environment_ids()
|
||||
.into_iter()
|
||||
.map(|environment_id| TurnEnvironmentSelection {
|
||||
environment_id: environment_id.to_string(),
|
||||
environment_id,
|
||||
cwd: cwd.clone(),
|
||||
})
|
||||
.into_iter()
|
||||
.collect()
|
||||
}
|
||||
|
||||
|
||||
@@ -2,9 +2,9 @@ use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_exec_server::EnvironmentManagerArgs;
|
||||
use codex_exec_server::ExecServerRuntimePaths;
|
||||
use codex_login::AuthManager;
|
||||
use codex_protocol::error::CodexErr;
|
||||
use codex_protocol::error::Result as CodexResult;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
@@ -48,7 +48,11 @@ pub async fn build_prompt_input(
|
||||
&config,
|
||||
Arc::clone(&auth_manager),
|
||||
SessionSource::Exec,
|
||||
Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await),
|
||||
Arc::new(
|
||||
EnvironmentManager::from_codex_home(config.codex_home.clone(), local_runtime_paths)
|
||||
.await
|
||||
.map_err(|err| CodexErr::Fatal(err.to_string()))?,
|
||||
),
|
||||
/*analytics_events_client*/ None,
|
||||
state_db,
|
||||
thread_store,
|
||||
|
||||
@@ -19,6 +19,7 @@ use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::ThreadSource;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::PathBufExt;
|
||||
use core_test_support::PathExt;
|
||||
use core_test_support::responses::mount_models_once;
|
||||
@@ -354,6 +355,115 @@ async fn start_thread_accepts_explicit_environment_when_default_environment_is_d
|
||||
assert_eq!(manager.list_thread_ids().await, vec![thread.thread_id]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn start_thread_uses_all_default_environments_from_codex_home() {
|
||||
let temp_dir = tempdir().expect("tempdir");
|
||||
let mut config = test_config().await;
|
||||
config.codex_home = temp_dir.path().join("codex-home").abs();
|
||||
config.cwd = config.codex_home.abs();
|
||||
std::fs::create_dir_all(&config.codex_home).expect("create codex home");
|
||||
std::fs::write(
|
||||
config.codex_home.join("environments.toml"),
|
||||
r#"
|
||||
default = "dev"
|
||||
|
||||
[[environments]]
|
||||
id = "dev"
|
||||
program = "ssh"
|
||||
args = ["dev", "cd /tmp && true"]
|
||||
"#,
|
||||
)
|
||||
.expect("write environments.toml");
|
||||
|
||||
let runtime_paths = codex_exec_server::ExecServerRuntimePaths::new(
|
||||
std::env::current_exe().expect("current exe path"),
|
||||
/*codex_linux_sandbox_exe*/ None,
|
||||
)
|
||||
.expect("runtime paths");
|
||||
let environment_manager = Arc::new(
|
||||
codex_exec_server::EnvironmentManager::from_codex_home(
|
||||
config.codex_home.clone(),
|
||||
runtime_paths,
|
||||
)
|
||||
.await
|
||||
.expect("environment manager"),
|
||||
);
|
||||
assert_eq!(
|
||||
environment_manager.default_environment_ids(),
|
||||
vec!["dev".to_string(), "local".to_string()]
|
||||
);
|
||||
|
||||
let manager = ThreadManager::with_models_provider_and_home_for_tests(
|
||||
CodexAuth::from_api_key("dummy"),
|
||||
config.model_provider.clone(),
|
||||
config.codex_home.to_path_buf(),
|
||||
environment_manager,
|
||||
)
|
||||
.await;
|
||||
|
||||
let thread = manager
|
||||
.start_thread(config)
|
||||
.await
|
||||
.expect("thread should start");
|
||||
|
||||
let selections = thread.thread.environment_selections().await;
|
||||
assert_eq!(
|
||||
selections,
|
||||
vec![
|
||||
TurnEnvironmentSelection {
|
||||
environment_id: "dev".to_string(),
|
||||
cwd: thread.session_configured.cwd.abs(),
|
||||
},
|
||||
TurnEnvironmentSelection {
|
||||
environment_id: "local".to_string(),
|
||||
cwd: thread.session_configured.cwd.abs(),
|
||||
},
|
||||
]
|
||||
);
|
||||
|
||||
let prompt_items = crate::prompt_debug::build_prompt_input_from_session(
|
||||
thread.thread.codex.session.as_ref(),
|
||||
Vec::<UserInput>::new(),
|
||||
)
|
||||
.await
|
||||
.expect("prompt input");
|
||||
let environment_context = prompt_items
|
||||
.iter()
|
||||
.filter_map(|item| match item {
|
||||
ResponseItem::Message { content, .. } => Some(content),
|
||||
_ => None,
|
||||
})
|
||||
.flatten()
|
||||
.find_map(|content| match content {
|
||||
ContentItem::InputText { text } if text.contains("<environment_context>") => {
|
||||
Some(text.as_str())
|
||||
}
|
||||
_ => None,
|
||||
})
|
||||
.expect("environment context prompt item");
|
||||
assert!(environment_context.contains("<environments>"));
|
||||
let cwd = thread.session_configured.cwd.display().to_string();
|
||||
let dev_entry = format!(
|
||||
r#"<environment id="dev">
|
||||
<cwd>{cwd}</cwd>
|
||||
<shell>"#
|
||||
);
|
||||
let local_entry = format!(
|
||||
r#"<environment id="local">
|
||||
<cwd>{cwd}</cwd>
|
||||
<shell>"#
|
||||
);
|
||||
let dev_position = environment_context
|
||||
.find(&dev_entry)
|
||||
.expect("dev environment entry");
|
||||
let local_position = environment_context
|
||||
.find(&local_entry)
|
||||
.expect("local environment entry");
|
||||
assert!(dev_position < local_position);
|
||||
assert!(!environment_context.contains("\n <cwd>"));
|
||||
assert!(!environment_context.contains("\n <shell>"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn start_thread_keeps_internal_threads_hidden_from_normal_lookups() {
|
||||
let temp_dir = tempdir().expect("tempdir");
|
||||
|
||||
@@ -74,6 +74,7 @@ const SUBMIT_TURN_COMPLETE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
#[derive(Debug)]
|
||||
pub struct TestEnv {
|
||||
environment: codex_exec_server::Environment,
|
||||
exec_server_url: Option<String>,
|
||||
cwd: AbsolutePathBuf,
|
||||
local_cwd_temp_dir: Option<Arc<TempDir>>,
|
||||
remote_container_name: Option<String>,
|
||||
@@ -87,6 +88,7 @@ impl TestEnv {
|
||||
codex_exec_server::Environment::create_for_tests(/*exec_server_url*/ None)?;
|
||||
Ok(Self {
|
||||
environment,
|
||||
exec_server_url: None,
|
||||
cwd,
|
||||
local_cwd_temp_dir: Some(local_cwd_temp_dir),
|
||||
remote_container_name: None,
|
||||
@@ -101,10 +103,6 @@ impl TestEnv {
|
||||
&self.environment
|
||||
}
|
||||
|
||||
pub fn exec_server_url(&self) -> Option<&str> {
|
||||
self.environment.exec_server_url()
|
||||
}
|
||||
|
||||
fn local_cwd_temp_dir(&self) -> Option<Arc<TempDir>> {
|
||||
self.local_cwd_temp_dir.clone()
|
||||
}
|
||||
@@ -124,7 +122,7 @@ pub async fn test_env() -> Result<TestEnv> {
|
||||
Some(remote_env) => {
|
||||
let websocket_url = remote_exec_server_url()?;
|
||||
let environment =
|
||||
codex_exec_server::Environment::create_for_tests(Some(websocket_url))?;
|
||||
codex_exec_server::Environment::create_for_tests(Some(websocket_url.clone()))?;
|
||||
let cwd = remote_aware_cwd_path();
|
||||
environment
|
||||
.get_filesystem()
|
||||
@@ -136,6 +134,7 @@ pub async fn test_env() -> Result<TestEnv> {
|
||||
.await?;
|
||||
Ok(TestEnv {
|
||||
environment,
|
||||
exec_server_url: Some(websocket_url),
|
||||
cwd,
|
||||
local_cwd_temp_dir: None,
|
||||
remote_container_name: Some(remote_env.container_name),
|
||||
@@ -386,7 +385,7 @@ impl TestCodexBuilder {
|
||||
let exec_server_url = self
|
||||
.exec_server_url
|
||||
.clone()
|
||||
.or_else(|| test_env.exec_server_url().map(str::to_owned));
|
||||
.or_else(|| test_env.exec_server_url.clone());
|
||||
let local_runtime_paths = codex_exec_server::ExecServerRuntimePaths::new(
|
||||
std::env::current_exe()?,
|
||||
/*codex_linux_sandbox_exe*/ None,
|
||||
|
||||
@@ -3,6 +3,9 @@ load("//:defs.bzl", "codex_rust_crate")
|
||||
codex_rust_crate(
|
||||
name = "exec-server",
|
||||
crate_name = "codex_exec_server",
|
||||
deps_extra = [
|
||||
"@crates//:toml",
|
||||
],
|
||||
# Keep the crate's integration tests single-threaded under Bazel because
|
||||
# they install process-global test-binary dispatch state, and the remote
|
||||
# exec-server cases already rely on serialization around the full CLI path.
|
||||
|
||||
@@ -28,6 +28,7 @@ serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
sha2 = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
toml = { workspace = true }
|
||||
tokio = { workspace = true, features = [
|
||||
"fs",
|
||||
"io-std",
|
||||
|
||||
@@ -17,13 +17,14 @@ use tokio::sync::mpsc;
|
||||
use tokio::sync::watch;
|
||||
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::connect_async;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::ProcessId;
|
||||
use crate::client_api::ExecServerClientConnectOptions;
|
||||
use crate::client_api::ExecServerTransportParams;
|
||||
use crate::client_api::HttpClient;
|
||||
use crate::client_api::RemoteExecServerConnectArgs;
|
||||
use crate::client_api::StdioExecServerConnectArgs;
|
||||
use crate::connection::JsonRpcConnection;
|
||||
use crate::process::ExecProcessEvent;
|
||||
use crate::process::ExecProcessEventLog;
|
||||
@@ -105,6 +106,16 @@ impl From<RemoteExecServerConnectArgs> for ExecServerClientConnectOptions {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<StdioExecServerConnectArgs> for ExecServerClientConnectOptions {
|
||||
fn from(value: StdioExecServerConnectArgs) -> Self {
|
||||
Self {
|
||||
client_name: value.client_name,
|
||||
initialize_timeout: value.initialize_timeout,
|
||||
resume_session_id: value.resume_session_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteExecServerConnectArgs {
|
||||
pub fn new(websocket_url: String, client_name: String) -> Self {
|
||||
Self {
|
||||
@@ -180,29 +191,25 @@ pub struct ExecServerClient {
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct LazyRemoteExecServerClient {
|
||||
websocket_url: String,
|
||||
transport_params: ExecServerTransportParams,
|
||||
client: Arc<OnceCell<ExecServerClient>>,
|
||||
}
|
||||
|
||||
impl LazyRemoteExecServerClient {
|
||||
pub(crate) fn new(websocket_url: String) -> Self {
|
||||
pub(crate) fn new(transport_params: ExecServerTransportParams) -> Self {
|
||||
Self {
|
||||
websocket_url,
|
||||
transport_params,
|
||||
client: Arc::new(OnceCell::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn get(&self) -> Result<ExecServerClient, ExecServerError> {
|
||||
self.client
|
||||
.get_or_try_init(|| async {
|
||||
ExecServerClient::connect_websocket(RemoteExecServerConnectArgs {
|
||||
websocket_url: self.websocket_url.clone(),
|
||||
client_name: "codex-environment".to_string(),
|
||||
connect_timeout: Duration::from_secs(5),
|
||||
initialize_timeout: Duration::from_secs(5),
|
||||
resume_session_id: None,
|
||||
})
|
||||
.await
|
||||
// TODO: Add reconnect/disconnect handling here instead of reusing
|
||||
// the first successfully initialized connection forever.
|
||||
.get_or_try_init(|| {
|
||||
let transport_params = self.transport_params.clone();
|
||||
async move { ExecServerClient::connect_for_transport(transport_params).await }
|
||||
})
|
||||
.await
|
||||
.cloned()
|
||||
@@ -269,32 +276,6 @@ pub enum ExecServerError {
|
||||
}
|
||||
|
||||
impl ExecServerClient {
|
||||
pub async fn connect_websocket(
|
||||
args: RemoteExecServerConnectArgs,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
let websocket_url = args.websocket_url.clone();
|
||||
let connect_timeout = args.connect_timeout;
|
||||
let (stream, _) = timeout(connect_timeout, connect_async(websocket_url.as_str()))
|
||||
.await
|
||||
.map_err(|_| ExecServerError::WebSocketConnectTimeout {
|
||||
url: websocket_url.clone(),
|
||||
timeout: connect_timeout,
|
||||
})?
|
||||
.map_err(|source| ExecServerError::WebSocketConnect {
|
||||
url: websocket_url.clone(),
|
||||
source,
|
||||
})?;
|
||||
|
||||
Self::connect(
|
||||
JsonRpcConnection::from_websocket(
|
||||
stream,
|
||||
format!("exec-server websocket {websocket_url}"),
|
||||
),
|
||||
args.into(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn initialize(
|
||||
&self,
|
||||
options: ExecServerClientConnectOptions,
|
||||
@@ -443,7 +424,7 @@ impl ExecServerClient {
|
||||
.clone()
|
||||
}
|
||||
|
||||
async fn connect(
|
||||
pub(crate) async fn connect(
|
||||
connection: JsonRpcConnection,
|
||||
options: ExecServerClientConnectOptions,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
@@ -893,18 +874,30 @@ mod tests {
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::HashMap;
|
||||
#[cfg(unix)]
|
||||
use std::path::Path;
|
||||
#[cfg(unix)]
|
||||
use std::process::Command;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::io::duplex;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::Duration;
|
||||
#[cfg(unix)]
|
||||
use tokio::time::sleep;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use super::ExecServerClient;
|
||||
use super::ExecServerClientConnectOptions;
|
||||
use crate::ProcessId;
|
||||
#[cfg(not(windows))]
|
||||
use crate::client_api::ExecServerTransportParams;
|
||||
use crate::client_api::StdioExecServerCommand;
|
||||
use crate::client_api::StdioExecServerConnectArgs;
|
||||
use crate::connection::JsonRpcConnection;
|
||||
use crate::process::ExecProcessEvent;
|
||||
use crate::protocol::EXEC_CLOSED_METHOD;
|
||||
@@ -942,6 +935,191 @@ mod tests {
|
||||
.expect("json-rpc line should write");
|
||||
}
|
||||
|
||||
#[cfg(not(windows))]
|
||||
#[tokio::test]
|
||||
async fn connect_stdio_command_initializes_json_rpc_client() {
|
||||
let client = ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs {
|
||||
command: StdioExecServerCommand {
|
||||
program: "sh".to_string(),
|
||||
args: vec![
|
||||
"-c".to_string(),
|
||||
"read _line; printf '%s\\n' '{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\"}}'; read _line; sleep 60".to_string(),
|
||||
],
|
||||
env: HashMap::new(),
|
||||
cwd: None,
|
||||
},
|
||||
client_name: "stdio-test-client".to_string(),
|
||||
initialize_timeout: Duration::from_secs(1),
|
||||
resume_session_id: None,
|
||||
})
|
||||
.await
|
||||
.expect("stdio client should connect");
|
||||
|
||||
assert_eq!(client.session_id().as_deref(), Some("stdio-test"));
|
||||
}
|
||||
|
||||
#[cfg(not(windows))]
|
||||
#[tokio::test]
|
||||
async fn connect_for_transport_initializes_stdio_command() {
|
||||
let client = ExecServerClient::connect_for_transport(
|
||||
ExecServerTransportParams::StdioCommand(StdioExecServerCommand {
|
||||
program: "sh".to_string(),
|
||||
args: vec![
|
||||
"-c".to_string(),
|
||||
"read _line; printf '%s\\n' '{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\"}}'; read _line; sleep 60".to_string(),
|
||||
],
|
||||
env: HashMap::new(),
|
||||
cwd: None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.expect("stdio transport should connect");
|
||||
|
||||
assert_eq!(client.session_id().as_deref(), Some("stdio-test"));
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
#[tokio::test]
|
||||
async fn connect_stdio_command_initializes_json_rpc_client_on_windows() {
|
||||
let client = ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs {
|
||||
command: StdioExecServerCommand {
|
||||
program: "powershell".to_string(),
|
||||
args: vec![
|
||||
"-NoProfile".to_string(),
|
||||
"-Command".to_string(),
|
||||
"$null = [Console]::In.ReadLine(); [Console]::Out.WriteLine('{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\"}}'); $null = [Console]::In.ReadLine(); Start-Sleep -Seconds 60".to_string(),
|
||||
],
|
||||
env: HashMap::new(),
|
||||
cwd: None,
|
||||
},
|
||||
client_name: "stdio-test-client".to_string(),
|
||||
initialize_timeout: Duration::from_secs(1),
|
||||
resume_session_id: None,
|
||||
})
|
||||
.await
|
||||
.expect("stdio client should connect");
|
||||
|
||||
assert_eq!(client.session_id().as_deref(), Some("stdio-test"));
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test]
|
||||
async fn dropping_stdio_client_terminates_spawned_process() {
|
||||
let tempdir = tempfile::tempdir().expect("tempdir should be created");
|
||||
let pid_file = tempdir.path().join("server.pid");
|
||||
let child_pid_file = tempdir.path().join("server-child.pid");
|
||||
let stdio_script = format!(
|
||||
"read _line; \
|
||||
echo \"$$\" > {}; \
|
||||
sleep 60 >/dev/null 2>&1 & echo \"$!\" > {}; \
|
||||
printf '%s\\n' '{{\"id\":1,\"result\":{{\"sessionId\":\"stdio-test\"}}}}'; \
|
||||
read _line; \
|
||||
wait",
|
||||
shell_quote(pid_file.as_path()),
|
||||
shell_quote(child_pid_file.as_path()),
|
||||
);
|
||||
|
||||
let client = ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs {
|
||||
command: StdioExecServerCommand {
|
||||
program: "sh".to_string(),
|
||||
args: vec!["-c".to_string(), stdio_script],
|
||||
env: HashMap::new(),
|
||||
cwd: None,
|
||||
},
|
||||
client_name: "stdio-test-client".to_string(),
|
||||
initialize_timeout: Duration::from_secs(1),
|
||||
resume_session_id: None,
|
||||
})
|
||||
.await
|
||||
.expect("stdio client should connect");
|
||||
let server_pid = read_pid_file(pid_file.as_path()).await;
|
||||
let child_pid = read_pid_file(child_pid_file.as_path()).await;
|
||||
assert!(
|
||||
process_exists(server_pid),
|
||||
"spawned stdio process should be running before client drop"
|
||||
);
|
||||
assert!(
|
||||
process_exists(child_pid),
|
||||
"spawned stdio child process should be running before client drop"
|
||||
);
|
||||
|
||||
drop(client);
|
||||
|
||||
wait_for_process_exit(server_pid).await;
|
||||
wait_for_process_exit(child_pid).await;
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test]
|
||||
async fn malformed_stdio_message_terminates_spawned_process() {
|
||||
let tempdir = tempfile::tempdir().expect("tempdir should be created");
|
||||
let pid_file = tempdir.path().join("server.pid");
|
||||
let stdio_script = format!(
|
||||
"read _line; \
|
||||
echo \"$$\" > {}; \
|
||||
printf '%s\\n' 'not-json'; \
|
||||
sleep 60",
|
||||
shell_quote(pid_file.as_path()),
|
||||
);
|
||||
|
||||
let result = ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs {
|
||||
command: StdioExecServerCommand {
|
||||
program: "sh".to_string(),
|
||||
args: vec!["-c".to_string(), stdio_script],
|
||||
env: HashMap::new(),
|
||||
cwd: None,
|
||||
},
|
||||
client_name: "stdio-test-client".to_string(),
|
||||
initialize_timeout: Duration::from_secs(1),
|
||||
resume_session_id: None,
|
||||
})
|
||||
.await;
|
||||
assert!(result.is_err(), "malformed stdio server should not connect");
|
||||
|
||||
let server_pid = read_pid_file(pid_file.as_path()).await;
|
||||
wait_for_process_exit(server_pid).await;
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
async fn read_pid_file(path: &Path) -> u32 {
|
||||
for _ in 0..20 {
|
||||
if let Ok(contents) = std::fs::read_to_string(path) {
|
||||
return contents
|
||||
.trim()
|
||||
.parse()
|
||||
.expect("pid file should contain a pid");
|
||||
}
|
||||
sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
panic!("pid file {} should be written", path.display());
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
async fn wait_for_process_exit(pid: u32) {
|
||||
for _ in 0..20 {
|
||||
if !process_exists(pid) {
|
||||
return;
|
||||
}
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
panic!("process {pid} should exit");
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn process_exists(pid: u32) -> bool {
|
||||
Command::new("kill")
|
||||
.arg("-0")
|
||||
.arg(pid.to_string())
|
||||
.status()
|
||||
.is_ok_and(|status| status.success())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn shell_quote(path: &Path) -> String {
|
||||
let value = path.to_string_lossy();
|
||||
format!("'{}'", value.replace('\'', "'\\''"))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn process_events_are_delivered_in_seq_order_when_notifications_are_reordered() {
|
||||
let (client_stdin, server_reader) = duplex(1 << 20);
|
||||
@@ -1085,6 +1263,92 @@ mod tests {
|
||||
server.await.expect("server task should finish");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn transport_disconnect_fails_sessions_and_rejects_new_sessions() {
|
||||
let (client_stdin, server_reader) = duplex(1 << 20);
|
||||
let (mut server_writer, client_stdout) = duplex(1 << 20);
|
||||
let (disconnect_tx, disconnect_rx) = oneshot::channel();
|
||||
let server = tokio::spawn(async move {
|
||||
let mut lines = BufReader::new(server_reader).lines();
|
||||
let initialize = read_jsonrpc_line(&mut lines).await;
|
||||
let request = match initialize {
|
||||
JSONRPCMessage::Request(request) if request.method == INITIALIZE_METHOD => request,
|
||||
other => panic!("expected initialize request, got {other:?}"),
|
||||
};
|
||||
write_jsonrpc_line(
|
||||
&mut server_writer,
|
||||
JSONRPCMessage::Response(JSONRPCResponse {
|
||||
id: request.id,
|
||||
result: serde_json::to_value(InitializeResponse {
|
||||
session_id: "session-1".to_string(),
|
||||
})
|
||||
.expect("initialize response should serialize"),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let initialized = read_jsonrpc_line(&mut lines).await;
|
||||
match initialized {
|
||||
JSONRPCMessage::Notification(notification)
|
||||
if notification.method == INITIALIZED_METHOD => {}
|
||||
other => panic!("expected initialized notification, got {other:?}"),
|
||||
}
|
||||
|
||||
let _ = disconnect_rx.await;
|
||||
drop(server_writer);
|
||||
});
|
||||
|
||||
let client = ExecServerClient::connect(
|
||||
JsonRpcConnection::from_stdio(
|
||||
client_stdout,
|
||||
client_stdin,
|
||||
"test-exec-server-client".to_string(),
|
||||
),
|
||||
ExecServerClientConnectOptions::default(),
|
||||
)
|
||||
.await
|
||||
.expect("client should connect");
|
||||
|
||||
let process_id = ProcessId::from("disconnect");
|
||||
let session = client
|
||||
.register_session(&process_id)
|
||||
.await
|
||||
.expect("session should register");
|
||||
let mut events = session.subscribe_events();
|
||||
|
||||
disconnect_tx.send(()).expect("disconnect should signal");
|
||||
|
||||
let event = timeout(Duration::from_secs(1), events.recv())
|
||||
.await
|
||||
.expect("session failure should not time out")
|
||||
.expect("session event stream should stay open");
|
||||
let ExecProcessEvent::Failed(message) = event else {
|
||||
panic!("expected session failure after disconnect, got {event:?}");
|
||||
};
|
||||
assert_eq!(message, "exec-server transport disconnected");
|
||||
|
||||
let response = session
|
||||
.read(
|
||||
/*after_seq*/ None, /*max_bytes*/ None, /*wait_ms*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("disconnected session read should synthesize a response");
|
||||
assert_eq!(
|
||||
response.failure.as_deref(),
|
||||
Some("exec-server transport disconnected")
|
||||
);
|
||||
assert!(response.closed);
|
||||
|
||||
let new_session = client.register_session(&ProcessId::from("new")).await;
|
||||
assert!(matches!(
|
||||
new_session,
|
||||
Err(super::ExecServerError::Disconnected(_))
|
||||
));
|
||||
|
||||
drop(client);
|
||||
server.await.expect("server task should finish");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wake_notifications_do_not_block_other_sessions() {
|
||||
let (client_stdin, server_reader) = duplex(1 << 20);
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::future::BoxFuture;
|
||||
@@ -25,6 +27,32 @@ pub struct RemoteExecServerConnectArgs {
|
||||
pub resume_session_id: Option<String>,
|
||||
}
|
||||
|
||||
/// Stdio connection arguments for a command-backed exec-server.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) struct StdioExecServerConnectArgs {
|
||||
pub command: StdioExecServerCommand,
|
||||
pub client_name: String,
|
||||
pub initialize_timeout: Duration,
|
||||
pub resume_session_id: Option<String>,
|
||||
}
|
||||
|
||||
/// Structured process command used to start an exec-server over stdio.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) struct StdioExecServerCommand {
|
||||
pub program: String,
|
||||
pub args: Vec<String>,
|
||||
pub env: HashMap<String, String>,
|
||||
pub cwd: Option<PathBuf>,
|
||||
}
|
||||
|
||||
/// Parameters used to connect to a remote exec-server environment.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) enum ExecServerTransportParams {
|
||||
WebSocketUrl(String),
|
||||
#[allow(dead_code)]
|
||||
StdioCommand(StdioExecServerCommand),
|
||||
}
|
||||
|
||||
/// Sends HTTP requests through a runtime-selected transport.
|
||||
///
|
||||
/// This is the HTTP capability counterpart to [`crate::ExecBackend`]. Callers
|
||||
|
||||
127
codex-rs/exec-server/src/client_transport.rs
Normal file
127
codex-rs/exec-server/src/client_transport.rs
Normal file
@@ -0,0 +1,127 @@
|
||||
use std::process::Stdio;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::process::Command;
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::connect_async;
|
||||
use tracing::debug;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::ExecServerClient;
|
||||
use crate::ExecServerError;
|
||||
use crate::client_api::RemoteExecServerConnectArgs;
|
||||
use crate::client_api::StdioExecServerCommand;
|
||||
use crate::client_api::StdioExecServerConnectArgs;
|
||||
use crate::connection::JsonRpcConnection;
|
||||
|
||||
const ENVIRONMENT_CLIENT_NAME: &str = "codex-environment";
|
||||
const ENVIRONMENT_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const ENVIRONMENT_INITIALIZE_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
impl ExecServerClient {
|
||||
pub(crate) async fn connect_for_transport(
|
||||
transport_params: crate::client_api::ExecServerTransportParams,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
match transport_params {
|
||||
crate::client_api::ExecServerTransportParams::WebSocketUrl(websocket_url) => {
|
||||
Self::connect_websocket(RemoteExecServerConnectArgs {
|
||||
websocket_url,
|
||||
client_name: ENVIRONMENT_CLIENT_NAME.to_string(),
|
||||
connect_timeout: ENVIRONMENT_CONNECT_TIMEOUT,
|
||||
initialize_timeout: ENVIRONMENT_INITIALIZE_TIMEOUT,
|
||||
resume_session_id: None,
|
||||
})
|
||||
.await
|
||||
}
|
||||
crate::client_api::ExecServerTransportParams::StdioCommand(command) => {
|
||||
Self::connect_stdio_command(StdioExecServerConnectArgs {
|
||||
command,
|
||||
client_name: ENVIRONMENT_CLIENT_NAME.to_string(),
|
||||
initialize_timeout: ENVIRONMENT_INITIALIZE_TIMEOUT,
|
||||
resume_session_id: None,
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn connect_websocket(
|
||||
args: RemoteExecServerConnectArgs,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
let websocket_url = args.websocket_url.clone();
|
||||
let connect_timeout = args.connect_timeout;
|
||||
let (stream, _) = timeout(connect_timeout, connect_async(websocket_url.as_str()))
|
||||
.await
|
||||
.map_err(|_| ExecServerError::WebSocketConnectTimeout {
|
||||
url: websocket_url.clone(),
|
||||
timeout: connect_timeout,
|
||||
})?
|
||||
.map_err(|source| ExecServerError::WebSocketConnect {
|
||||
url: websocket_url.clone(),
|
||||
source,
|
||||
})?;
|
||||
|
||||
Self::connect(
|
||||
JsonRpcConnection::from_websocket(
|
||||
stream,
|
||||
format!("exec-server websocket {websocket_url}"),
|
||||
),
|
||||
args.into(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn connect_stdio_command(
|
||||
args: StdioExecServerConnectArgs,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
let mut child = stdio_command_process(&args.command)
|
||||
.stdin(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
.map_err(ExecServerError::Spawn)?;
|
||||
|
||||
let stdin = child.stdin.take().ok_or_else(|| {
|
||||
ExecServerError::Protocol("spawned exec-server command has no stdin".to_string())
|
||||
})?;
|
||||
let stdout = child.stdout.take().ok_or_else(|| {
|
||||
ExecServerError::Protocol("spawned exec-server command has no stdout".to_string())
|
||||
})?;
|
||||
if let Some(stderr) = child.stderr.take() {
|
||||
tokio::spawn(async move {
|
||||
let mut lines = BufReader::new(stderr).lines();
|
||||
loop {
|
||||
match lines.next_line().await {
|
||||
Ok(Some(line)) => debug!("exec-server stdio stderr: {line}"),
|
||||
Ok(None) => break,
|
||||
Err(err) => {
|
||||
warn!("failed to read exec-server stdio stderr: {err}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Self::connect(
|
||||
JsonRpcConnection::from_stdio(stdout, stdin, "exec-server stdio command".to_string())
|
||||
.with_child_process(child),
|
||||
args.into(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
fn stdio_command_process(stdio_command: &StdioExecServerCommand) -> Command {
|
||||
let mut command = Command::new(&stdio_command.program);
|
||||
command.args(&stdio_command.args);
|
||||
command.envs(&stdio_command.env);
|
||||
if let Some(cwd) = &stdio_command.cwd {
|
||||
command.current_dir(cwd);
|
||||
}
|
||||
#[cfg(unix)]
|
||||
command.process_group(0);
|
||||
command
|
||||
}
|
||||
@@ -1,12 +1,21 @@
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::process::Child;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::watch;
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tracing::debug;
|
||||
use tracing::warn;
|
||||
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
@@ -14,6 +23,7 @@ use tokio::io::BufReader;
|
||||
use tokio::io::BufWriter;
|
||||
|
||||
pub(crate) const CHANNEL_CAPACITY: usize = 128;
|
||||
const STDIO_TERMINATION_GRACE_PERIOD: Duration = Duration::from_secs(2);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum JsonRpcConnectionEvent {
|
||||
@@ -22,11 +32,186 @@ pub(crate) enum JsonRpcConnectionEvent {
|
||||
Disconnected { reason: Option<String> },
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum JsonRpcTransport {
|
||||
Plain,
|
||||
Stdio { transport: StdioTransport },
|
||||
}
|
||||
|
||||
impl JsonRpcTransport {
|
||||
fn from_child_process(child_process: Child) -> Self {
|
||||
Self::Stdio {
|
||||
transport: StdioTransport::spawn(child_process),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn terminate(&self) {
|
||||
match self {
|
||||
Self::Plain => {}
|
||||
Self::Stdio { transport } => transport.terminate(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct StdioTransport {
|
||||
handle: Arc<StdioTransportHandle>,
|
||||
}
|
||||
|
||||
struct StdioTransportHandle {
|
||||
terminate_tx: watch::Sender<bool>,
|
||||
terminate_requested: AtomicBool,
|
||||
}
|
||||
|
||||
impl StdioTransport {
|
||||
fn spawn(child_process: Child) -> Self {
|
||||
let (terminate_tx, terminate_rx) = watch::channel(false);
|
||||
let handle = Arc::new(StdioTransportHandle {
|
||||
terminate_tx,
|
||||
terminate_requested: AtomicBool::new(false),
|
||||
});
|
||||
spawn_stdio_child_supervisor(child_process, terminate_rx);
|
||||
Self { handle }
|
||||
}
|
||||
|
||||
fn terminate(&self) {
|
||||
self.handle.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
impl StdioTransportHandle {
|
||||
fn terminate(&self) {
|
||||
if !self.terminate_requested.swap(true, Ordering::AcqRel) {
|
||||
let _ = self.terminate_tx.send(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for StdioTransportHandle {
|
||||
fn drop(&mut self) {
|
||||
self.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_stdio_child_supervisor(mut child_process: Child, mut terminate_rx: watch::Receiver<bool>) {
|
||||
let process_group_id = child_process.id();
|
||||
tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
result = child_process.wait() => {
|
||||
log_stdio_child_wait_result(result);
|
||||
kill_process_tree(&mut child_process, process_group_id);
|
||||
}
|
||||
() = wait_for_stdio_termination(&mut terminate_rx) => {
|
||||
terminate_stdio_child(&mut child_process, process_group_id).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn wait_for_stdio_termination(terminate_rx: &mut watch::Receiver<bool>) {
|
||||
loop {
|
||||
if *terminate_rx.borrow() {
|
||||
return;
|
||||
}
|
||||
if terminate_rx.changed().await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn terminate_stdio_child(child_process: &mut Child, process_group_id: Option<u32>) {
|
||||
terminate_process_tree(child_process, process_group_id);
|
||||
match timeout(STDIO_TERMINATION_GRACE_PERIOD, child_process.wait()).await {
|
||||
Ok(result) => {
|
||||
log_stdio_child_wait_result(result);
|
||||
}
|
||||
Err(_) => {
|
||||
kill_process_tree(child_process, process_group_id);
|
||||
log_stdio_child_wait_result(child_process.wait().await);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn terminate_process_tree(child_process: &mut Child, process_group_id: Option<u32>) {
|
||||
let Some(process_group_id) = process_group_id else {
|
||||
kill_direct_child(child_process, "terminate");
|
||||
return;
|
||||
};
|
||||
|
||||
#[cfg(unix)]
|
||||
if let Err(err) = codex_utils_pty::process_group::terminate_process_group(process_group_id) {
|
||||
warn!("failed to terminate exec-server stdio process group {process_group_id}: {err}");
|
||||
kill_direct_child(child_process, "terminate");
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
if !kill_windows_process_tree(process_group_id) {
|
||||
kill_direct_child(child_process, "terminate");
|
||||
}
|
||||
|
||||
#[cfg(not(any(unix, windows)))]
|
||||
{
|
||||
let _ = process_group_id;
|
||||
kill_direct_child(child_process, "terminate");
|
||||
}
|
||||
}
|
||||
|
||||
fn kill_process_tree(child_process: &mut Child, process_group_id: Option<u32>) {
|
||||
let Some(process_group_id) = process_group_id else {
|
||||
kill_direct_child(child_process, "kill");
|
||||
return;
|
||||
};
|
||||
|
||||
#[cfg(unix)]
|
||||
if let Err(err) = codex_utils_pty::process_group::kill_process_group(process_group_id) {
|
||||
warn!("failed to kill exec-server stdio process group {process_group_id}: {err}");
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
if !kill_windows_process_tree(process_group_id) {
|
||||
kill_direct_child(child_process, "kill");
|
||||
}
|
||||
|
||||
#[cfg(not(any(unix, windows)))]
|
||||
{
|
||||
let _ = process_group_id;
|
||||
kill_direct_child(child_process, "kill");
|
||||
}
|
||||
}
|
||||
|
||||
fn kill_direct_child(child_process: &mut Child, action: &str) {
|
||||
if let Err(err) = child_process.start_kill() {
|
||||
debug!("failed to {action} exec-server stdio child: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn kill_windows_process_tree(pid: u32) -> bool {
|
||||
let pid = pid.to_string();
|
||||
match std::process::Command::new("taskkill")
|
||||
.args(["/PID", pid.as_str(), "/T", "/F"])
|
||||
.status()
|
||||
{
|
||||
Ok(status) => status.success(),
|
||||
Err(err) => {
|
||||
warn!("failed to run taskkill for exec-server stdio process tree {pid}: {err}");
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn log_stdio_child_wait_result(result: std::io::Result<std::process::ExitStatus>) {
|
||||
if let Err(err) = result {
|
||||
debug!("failed to wait for exec-server stdio child: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct JsonRpcConnection {
|
||||
outgoing_tx: mpsc::Sender<JSONRPCMessage>,
|
||||
incoming_rx: mpsc::Receiver<JsonRpcConnectionEvent>,
|
||||
disconnected_rx: watch::Receiver<bool>,
|
||||
task_handles: Vec<tokio::task::JoinHandle<()>>,
|
||||
pub(crate) outgoing_tx: mpsc::Sender<JSONRPCMessage>,
|
||||
pub(crate) incoming_rx: mpsc::Receiver<JsonRpcConnectionEvent>,
|
||||
pub(crate) disconnected_rx: watch::Receiver<bool>,
|
||||
pub(crate) task_handles: Vec<tokio::task::JoinHandle<()>>,
|
||||
pub(crate) transport: JsonRpcTransport,
|
||||
}
|
||||
|
||||
impl JsonRpcConnection {
|
||||
@@ -61,13 +246,15 @@ impl JsonRpcConnection {
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
send_malformed_message(
|
||||
send_disconnected(
|
||||
&incoming_tx_for_reader,
|
||||
&disconnected_tx_for_reader,
|
||||
Some(format!(
|
||||
"failed to parse JSON-RPC message from {reader_label}: {err}"
|
||||
)),
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -117,6 +304,7 @@ impl JsonRpcConnection {
|
||||
incoming_rx,
|
||||
disconnected_rx,
|
||||
task_handles: vec![reader_task, writer_task],
|
||||
transport: JsonRpcTransport::Plain,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -251,23 +439,13 @@ impl JsonRpcConnection {
|
||||
incoming_rx,
|
||||
disconnected_rx,
|
||||
task_handles: vec![reader_task, writer_task],
|
||||
transport: JsonRpcTransport::Plain,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn into_parts(
|
||||
self,
|
||||
) -> (
|
||||
mpsc::Sender<JSONRPCMessage>,
|
||||
mpsc::Receiver<JsonRpcConnectionEvent>,
|
||||
watch::Receiver<bool>,
|
||||
Vec<tokio::task::JoinHandle<()>>,
|
||||
) {
|
||||
(
|
||||
self.outgoing_tx,
|
||||
self.incoming_rx,
|
||||
self.disconnected_rx,
|
||||
self.task_handles,
|
||||
)
|
||||
pub(crate) fn with_child_process(mut self, child_process: Child) -> Self {
|
||||
self.transport = JsonRpcTransport::from_child_process(child_process);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -7,9 +7,13 @@ use crate::ExecutorFileSystem;
|
||||
use crate::HttpClient;
|
||||
use crate::client::LazyRemoteExecServerClient;
|
||||
use crate::client::http_client::ReqwestHttpClient;
|
||||
use crate::client_api::ExecServerTransportParams;
|
||||
use crate::environment_provider::DefaultEnvironmentProvider;
|
||||
use crate::environment_provider::EnvironmentDefault;
|
||||
use crate::environment_provider::EnvironmentProvider;
|
||||
use crate::environment_provider::EnvironmentProviderSnapshot;
|
||||
use crate::environment_provider::normalize_exec_server_url;
|
||||
use crate::environment_toml::environment_provider_from_codex_home;
|
||||
use crate::local_file_system::LocalFileSystem;
|
||||
use crate::local_process::LocalProcess;
|
||||
use crate::process::ExecBackend;
|
||||
@@ -20,9 +24,10 @@ pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
|
||||
|
||||
/// Owns the execution/filesystem environments available to the Codex runtime.
|
||||
///
|
||||
/// `EnvironmentManager` is a shared registry for concrete environments. Its
|
||||
/// default constructor preserves the legacy `CODEX_EXEC_SERVER_URL` behavior
|
||||
/// while provider-based construction accepts a provider-supplied snapshot.
|
||||
/// `EnvironmentManager` is a shared registry for concrete environments.
|
||||
/// `from_codex_home` preserves the legacy `CODEX_EXEC_SERVER_URL` behavior when
|
||||
/// no `environments.toml` file is present, while provider-based construction
|
||||
/// accepts a provider-supplied snapshot.
|
||||
///
|
||||
/// Setting `CODEX_EXEC_SERVER_URL=none` disables environment access by leaving
|
||||
/// the default environment unset while still keeping an explicit local
|
||||
@@ -31,11 +36,12 @@ pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
|
||||
/// shell/filesystem tool availability.
|
||||
///
|
||||
/// Remote environments create remote filesystem and execution backends that
|
||||
/// lazy-connect to the configured exec-server on first use. The websocket is
|
||||
/// not opened when the manager or environment is constructed.
|
||||
/// lazy-connect to the configured exec-server on first use. The remote
|
||||
/// transport is not opened when the manager or environment is constructed.
|
||||
#[derive(Debug)]
|
||||
pub struct EnvironmentManager {
|
||||
default_environment: Option<String>,
|
||||
startup_environment_ids: Vec<String>,
|
||||
environments: HashMap<String, Arc<Environment>>,
|
||||
local_environment: Arc<Environment>,
|
||||
}
|
||||
@@ -43,24 +49,12 @@ pub struct EnvironmentManager {
|
||||
pub const LOCAL_ENVIRONMENT_ID: &str = "local";
|
||||
pub const REMOTE_ENVIRONMENT_ID: &str = "remote";
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct EnvironmentManagerArgs {
|
||||
pub local_runtime_paths: ExecServerRuntimePaths,
|
||||
}
|
||||
|
||||
impl EnvironmentManagerArgs {
|
||||
pub fn new(local_runtime_paths: ExecServerRuntimePaths) -> Self {
|
||||
Self {
|
||||
local_runtime_paths,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl EnvironmentManager {
|
||||
/// Builds a test-only manager without configured sandbox helper paths.
|
||||
pub fn default_for_tests() -> Self {
|
||||
Self {
|
||||
default_environment: Some(LOCAL_ENVIRONMENT_ID.to_string()),
|
||||
startup_environment_ids: vec![LOCAL_ENVIRONMENT_ID.to_string()],
|
||||
environments: HashMap::from([(
|
||||
LOCAL_ENVIRONMENT_ID.to_string(),
|
||||
Arc::new(Environment::default_for_tests()),
|
||||
@@ -71,9 +65,12 @@ impl EnvironmentManager {
|
||||
|
||||
/// Builds a test-only manager with environment access disabled.
|
||||
pub fn disabled_for_tests(local_runtime_paths: ExecServerRuntimePaths) -> Self {
|
||||
let mut manager = Self::from_environments(HashMap::new(), local_runtime_paths);
|
||||
manager.default_environment = None;
|
||||
manager
|
||||
Self {
|
||||
default_environment: None,
|
||||
startup_environment_ids: Vec::new(),
|
||||
environments: HashMap::new(),
|
||||
local_environment: Arc::new(Environment::local(local_runtime_paths)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds a test-only manager from a raw exec-server URL value.
|
||||
@@ -84,85 +81,116 @@ impl EnvironmentManager {
|
||||
Self::from_default_provider_url(exec_server_url, local_runtime_paths).await
|
||||
}
|
||||
|
||||
/// Builds a manager from `CODEX_EXEC_SERVER_URL` and local runtime paths
|
||||
/// used when creating local filesystem helpers.
|
||||
pub async fn new(args: EnvironmentManagerArgs) -> Self {
|
||||
let EnvironmentManagerArgs {
|
||||
local_runtime_paths,
|
||||
} = args;
|
||||
let exec_server_url = std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok();
|
||||
Self::from_default_provider_url(exec_server_url, local_runtime_paths).await
|
||||
/// Builds a manager from `CODEX_HOME` and local runtime paths used when
|
||||
/// creating local filesystem helpers.
|
||||
///
|
||||
/// If `CODEX_HOME/environments.toml` is present, it defines the configured
|
||||
/// environments. Otherwise this preserves the legacy
|
||||
/// `CODEX_EXEC_SERVER_URL` behavior. Callers that ignore user config
|
||||
/// should use [`Self::from_env`] instead.
|
||||
pub async fn from_codex_home(
|
||||
codex_home: impl AsRef<std::path::Path>,
|
||||
local_runtime_paths: ExecServerRuntimePaths,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
let provider = environment_provider_from_codex_home(codex_home.as_ref())?;
|
||||
Self::from_provider(provider.as_ref(), local_runtime_paths).await
|
||||
}
|
||||
|
||||
/// Builds a manager from the legacy environment-variable provider without
|
||||
/// reading user config files from `CODEX_HOME`.
|
||||
pub async fn from_env(
|
||||
local_runtime_paths: ExecServerRuntimePaths,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
let provider = DefaultEnvironmentProvider::from_env();
|
||||
Self::from_provider(&provider, local_runtime_paths).await
|
||||
}
|
||||
|
||||
async fn from_default_provider_url(
|
||||
exec_server_url: Option<String>,
|
||||
local_runtime_paths: ExecServerRuntimePaths,
|
||||
) -> Self {
|
||||
let environment_disabled = normalize_exec_server_url(exec_server_url.clone()).1;
|
||||
let provider = DefaultEnvironmentProvider::new(exec_server_url);
|
||||
let provider_environments = provider.environments(&local_runtime_paths);
|
||||
let mut manager = Self::from_environments(provider_environments, local_runtime_paths);
|
||||
if environment_disabled {
|
||||
// TODO: Remove this legacy `CODEX_EXEC_SERVER_URL=none` crutch once
|
||||
// environment attachment defaulting moves out of EnvironmentManager.
|
||||
manager.default_environment = None;
|
||||
match Self::from_provider(&provider, local_runtime_paths).await {
|
||||
Ok(manager) => manager,
|
||||
Err(err) => panic!("default provider should create valid environments: {err}"),
|
||||
}
|
||||
manager
|
||||
}
|
||||
|
||||
/// Builds a manager from a provider-supplied startup snapshot.
|
||||
pub async fn from_provider<P>(
|
||||
pub(crate) async fn from_provider<P>(
|
||||
provider: &P,
|
||||
local_runtime_paths: ExecServerRuntimePaths,
|
||||
) -> Result<Self, ExecServerError>
|
||||
where
|
||||
P: EnvironmentProvider + ?Sized,
|
||||
{
|
||||
Self::from_provider_environments(
|
||||
provider.get_environments(&local_runtime_paths).await?,
|
||||
Self::from_provider_snapshot(
|
||||
provider.snapshot(&local_runtime_paths).await?,
|
||||
local_runtime_paths,
|
||||
)
|
||||
}
|
||||
|
||||
fn from_provider_environments(
|
||||
environments: HashMap<String, Environment>,
|
||||
fn from_provider_snapshot(
|
||||
snapshot: EnvironmentProviderSnapshot,
|
||||
local_runtime_paths: ExecServerRuntimePaths,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
for id in environments.keys() {
|
||||
let EnvironmentProviderSnapshot {
|
||||
environments,
|
||||
default,
|
||||
include_all_environments_by_default,
|
||||
} = snapshot;
|
||||
let mut configured_environment_ids = Vec::with_capacity(environments.len());
|
||||
let mut environment_map = HashMap::with_capacity(environments.len());
|
||||
for (id, environment) in environments {
|
||||
if id.is_empty() {
|
||||
return Err(ExecServerError::Protocol(
|
||||
"environment id cannot be empty".to_string(),
|
||||
));
|
||||
}
|
||||
if environment_map
|
||||
.insert(id.clone(), Arc::new(environment))
|
||||
.is_some()
|
||||
{
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"environment id `{id}` is duplicated"
|
||||
)));
|
||||
}
|
||||
configured_environment_ids.push(id);
|
||||
}
|
||||
|
||||
Ok(Self::from_environments(environments, local_runtime_paths))
|
||||
}
|
||||
|
||||
fn from_environments(
|
||||
environments: HashMap<String, Environment>,
|
||||
local_runtime_paths: ExecServerRuntimePaths,
|
||||
) -> Self {
|
||||
// TODO: Stop deriving a default environment here once omitted
|
||||
// environment attachment is owned by thread/session setup.
|
||||
let default_environment = if environments.contains_key(REMOTE_ENVIRONMENT_ID) {
|
||||
Some(REMOTE_ENVIRONMENT_ID.to_string())
|
||||
} else if environments.contains_key(LOCAL_ENVIRONMENT_ID) {
|
||||
Some(LOCAL_ENVIRONMENT_ID.to_string())
|
||||
} else {
|
||||
None
|
||||
let default_environment = match default {
|
||||
EnvironmentDefault::Disabled => None,
|
||||
EnvironmentDefault::EnvironmentId(environment_id) => {
|
||||
if !environment_map.contains_key(&environment_id) {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"default environment `{environment_id}` is not configured"
|
||||
)));
|
||||
}
|
||||
Some(environment_id)
|
||||
}
|
||||
};
|
||||
let local_environment = Arc::new(Environment::local(local_runtime_paths));
|
||||
let environments = environments
|
||||
.into_iter()
|
||||
.map(|(id, environment)| (id, Arc::new(environment)))
|
||||
.collect();
|
||||
let startup_environment_ids = match default_environment.as_ref() {
|
||||
None => Vec::new(),
|
||||
Some(default_environment_id) if include_all_environments_by_default => {
|
||||
let mut environment_ids = Vec::with_capacity(configured_environment_ids.len());
|
||||
environment_ids.push(default_environment_id.clone());
|
||||
environment_ids.extend(
|
||||
configured_environment_ids
|
||||
.iter()
|
||||
.filter(|environment_id| environment_id.as_str() != default_environment_id)
|
||||
.cloned(),
|
||||
);
|
||||
environment_ids
|
||||
}
|
||||
Some(default_environment_id) => vec![default_environment_id.clone()],
|
||||
};
|
||||
|
||||
Self {
|
||||
Ok(Self {
|
||||
default_environment,
|
||||
environments,
|
||||
startup_environment_ids,
|
||||
environments: environment_map,
|
||||
local_environment,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the default environment instance.
|
||||
@@ -177,6 +205,11 @@ impl EnvironmentManager {
|
||||
self.default_environment.as_deref()
|
||||
}
|
||||
|
||||
/// Returns the ordered environment ids used for new thread startup.
|
||||
pub fn default_environment_ids(&self) -> Vec<String> {
|
||||
self.startup_environment_ids.clone()
|
||||
}
|
||||
|
||||
/// Returns the local environment instance used for internal runtime work.
|
||||
pub fn local_environment(&self) -> Arc<Environment> {
|
||||
Arc::clone(&self.local_environment)
|
||||
@@ -194,7 +227,7 @@ impl EnvironmentManager {
|
||||
/// paths used by filesystem helpers.
|
||||
#[derive(Clone)]
|
||||
pub struct Environment {
|
||||
exec_server_url: Option<String>,
|
||||
remote_transport: Option<ExecServerTransportParams>,
|
||||
exec_backend: Arc<dyn ExecBackend>,
|
||||
filesystem: Arc<dyn ExecutorFileSystem>,
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
@@ -205,7 +238,7 @@ impl Environment {
|
||||
/// Builds a test-only local environment without configured sandbox helper paths.
|
||||
pub fn default_for_tests() -> Self {
|
||||
Self {
|
||||
exec_server_url: None,
|
||||
remote_transport: None,
|
||||
exec_backend: Arc::new(LocalProcess::default()),
|
||||
filesystem: Arc::new(LocalFileSystem::unsandboxed()),
|
||||
http_client: Arc::new(ReqwestHttpClient),
|
||||
@@ -217,7 +250,7 @@ impl Environment {
|
||||
impl std::fmt::Debug for Environment {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("Environment")
|
||||
.field("exec_server_url", &self.exec_server_url)
|
||||
.field("exec_server_url", &self.exec_server_url())
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
@@ -260,7 +293,7 @@ impl Environment {
|
||||
|
||||
pub(crate) fn local(local_runtime_paths: ExecServerRuntimePaths) -> Self {
|
||||
Self {
|
||||
exec_server_url: None,
|
||||
remote_transport: None,
|
||||
exec_backend: Arc::new(LocalProcess::default()),
|
||||
filesystem: Arc::new(LocalFileSystem::with_runtime_paths(
|
||||
local_runtime_paths.clone(),
|
||||
@@ -274,13 +307,23 @@ impl Environment {
|
||||
exec_server_url: String,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
) -> Self {
|
||||
let client = LazyRemoteExecServerClient::new(exec_server_url.clone());
|
||||
Self::remote_with_transport(
|
||||
ExecServerTransportParams::WebSocketUrl(exec_server_url),
|
||||
local_runtime_paths,
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn remote_with_transport(
|
||||
transport_params: ExecServerTransportParams,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
) -> Self {
|
||||
let client = LazyRemoteExecServerClient::new(transport_params.clone());
|
||||
let exec_backend: Arc<dyn ExecBackend> = Arc::new(RemoteProcess::new(client.clone()));
|
||||
let filesystem: Arc<dyn ExecutorFileSystem> =
|
||||
Arc::new(RemoteFileSystem::new(client.clone()));
|
||||
|
||||
Self {
|
||||
exec_server_url: Some(exec_server_url),
|
||||
remote_transport: Some(transport_params),
|
||||
exec_backend,
|
||||
filesystem,
|
||||
http_client: Arc::new(client),
|
||||
@@ -289,12 +332,15 @@ impl Environment {
|
||||
}
|
||||
|
||||
pub fn is_remote(&self) -> bool {
|
||||
self.exec_server_url.is_some()
|
||||
self.remote_transport.is_some()
|
||||
}
|
||||
|
||||
/// Returns the remote exec-server URL when this environment is remote.
|
||||
pub fn exec_server_url(&self) -> Option<&str> {
|
||||
self.exec_server_url.as_deref()
|
||||
pub(crate) fn exec_server_url(&self) -> Option<&str> {
|
||||
match self.remote_transport.as_ref() {
|
||||
Some(ExecServerTransportParams::WebSocketUrl(url)) => Some(url.as_str()),
|
||||
Some(ExecServerTransportParams::StdioCommand(_)) | None => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn local_runtime_paths(&self) -> Option<&ExecServerRuntimePaths> {
|
||||
@@ -316,17 +362,34 @@ impl Environment {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::Environment;
|
||||
use super::EnvironmentManager;
|
||||
use super::LOCAL_ENVIRONMENT_ID;
|
||||
use super::REMOTE_ENVIRONMENT_ID;
|
||||
use crate::ExecServerError;
|
||||
use crate::ExecServerRuntimePaths;
|
||||
use crate::ProcessId;
|
||||
use crate::environment_provider::EnvironmentDefault;
|
||||
use crate::environment_provider::EnvironmentProvider;
|
||||
use crate::environment_provider::EnvironmentProviderSnapshot;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
struct TestEnvironmentProvider {
|
||||
snapshot: EnvironmentProviderSnapshot,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl EnvironmentProvider for TestEnvironmentProvider {
|
||||
async fn snapshot(
|
||||
&self,
|
||||
_local_runtime_paths: &ExecServerRuntimePaths,
|
||||
) -> Result<EnvironmentProviderSnapshot, ExecServerError> {
|
||||
Ok(self.snapshot.clone())
|
||||
}
|
||||
}
|
||||
|
||||
fn test_runtime_paths() -> ExecServerRuntimePaths {
|
||||
ExecServerRuntimePaths::new(
|
||||
std::env::current_exe().expect("current exe"),
|
||||
@@ -417,15 +480,21 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_builds_from_provider_environments() {
|
||||
let manager = EnvironmentManager::from_environments(
|
||||
HashMap::from([(
|
||||
REMOTE_ENVIRONMENT_ID.to_string(),
|
||||
Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string()))
|
||||
.expect("remote environment"),
|
||||
)]),
|
||||
test_runtime_paths(),
|
||||
);
|
||||
async fn environment_manager_builds_from_provider() {
|
||||
let provider = TestEnvironmentProvider {
|
||||
snapshot: EnvironmentProviderSnapshot {
|
||||
environments: vec![(
|
||||
REMOTE_ENVIRONMENT_ID.to_string(),
|
||||
Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string()))
|
||||
.expect("remote environment"),
|
||||
)],
|
||||
default: EnvironmentDefault::EnvironmentId(REMOTE_ENVIRONMENT_ID.to_string()),
|
||||
include_all_environments_by_default: false,
|
||||
},
|
||||
};
|
||||
let manager = EnvironmentManager::from_provider(&provider, test_runtime_paths())
|
||||
.await
|
||||
.expect("environment manager");
|
||||
|
||||
assert_eq!(
|
||||
manager.default_environment_id(),
|
||||
@@ -443,11 +512,16 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_rejects_empty_environment_id() {
|
||||
let err = EnvironmentManager::from_provider_environments(
|
||||
HashMap::from([("".to_string(), Environment::default_for_tests())]),
|
||||
test_runtime_paths(),
|
||||
)
|
||||
.expect_err("empty id should fail");
|
||||
let provider = TestEnvironmentProvider {
|
||||
snapshot: EnvironmentProviderSnapshot {
|
||||
environments: vec![("".to_string(), Environment::default_for_tests())],
|
||||
default: EnvironmentDefault::Disabled,
|
||||
include_all_environments_by_default: false,
|
||||
},
|
||||
};
|
||||
let err = EnvironmentManager::from_provider(&provider, test_runtime_paths())
|
||||
.await
|
||||
.expect_err("empty id should fail");
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
@@ -455,6 +529,80 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_uses_explicit_provider_default() {
|
||||
let provider = TestEnvironmentProvider {
|
||||
snapshot: EnvironmentProviderSnapshot {
|
||||
environments: vec![
|
||||
(
|
||||
LOCAL_ENVIRONMENT_ID.to_string(),
|
||||
Environment::default_for_tests(),
|
||||
),
|
||||
(
|
||||
"devbox".to_string(),
|
||||
Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string()))
|
||||
.expect("remote environment"),
|
||||
),
|
||||
],
|
||||
default: EnvironmentDefault::EnvironmentId("devbox".to_string()),
|
||||
include_all_environments_by_default: true,
|
||||
},
|
||||
};
|
||||
let manager = EnvironmentManager::from_provider(&provider, test_runtime_paths())
|
||||
.await
|
||||
.expect("manager");
|
||||
|
||||
assert_eq!(manager.default_environment_id(), Some("devbox"));
|
||||
assert_eq!(
|
||||
manager.default_environment_ids(),
|
||||
vec!["devbox".to_string(), LOCAL_ENVIRONMENT_ID.to_string()]
|
||||
);
|
||||
assert!(manager.default_environment().expect("default").is_remote());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_disables_provider_default() {
|
||||
let provider = TestEnvironmentProvider {
|
||||
snapshot: EnvironmentProviderSnapshot {
|
||||
environments: vec![(
|
||||
LOCAL_ENVIRONMENT_ID.to_string(),
|
||||
Environment::default_for_tests(),
|
||||
)],
|
||||
default: EnvironmentDefault::Disabled,
|
||||
include_all_environments_by_default: true,
|
||||
},
|
||||
};
|
||||
let manager = EnvironmentManager::from_provider(&provider, test_runtime_paths())
|
||||
.await
|
||||
.expect("manager");
|
||||
|
||||
assert_eq!(manager.default_environment_id(), None);
|
||||
assert!(manager.default_environment().is_none());
|
||||
assert!(manager.get_environment(LOCAL_ENVIRONMENT_ID).is_some());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_rejects_unknown_provider_default() {
|
||||
let provider = TestEnvironmentProvider {
|
||||
snapshot: EnvironmentProviderSnapshot {
|
||||
environments: vec![(
|
||||
LOCAL_ENVIRONMENT_ID.to_string(),
|
||||
Environment::default_for_tests(),
|
||||
)],
|
||||
default: EnvironmentDefault::EnvironmentId("missing".to_string()),
|
||||
include_all_environments_by_default: true,
|
||||
},
|
||||
};
|
||||
let err = EnvironmentManager::from_provider(&provider, test_runtime_paths())
|
||||
.await
|
||||
.expect_err("unknown default should fail");
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"exec-server protocol error: default environment `missing` is not configured"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_uses_provider_supplied_local_environment() {
|
||||
let manager = EnvironmentManager::create_for_tests(
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::Environment;
|
||||
@@ -11,16 +9,30 @@ use crate::environment::REMOTE_ENVIRONMENT_ID;
|
||||
|
||||
/// Lists the concrete environments available to Codex.
|
||||
///
|
||||
/// Implementations should return the provider-owned startup snapshot that
|
||||
/// `EnvironmentManager` will cache. Providers that want the local environment to
|
||||
/// be addressable by id should include it explicitly in the returned map.
|
||||
/// Implementations own a startup snapshot containing both the available
|
||||
/// environment list in configured order and the default environment
|
||||
/// selection. Providers that want the local environment to be addressable by
|
||||
/// id should include it explicitly in the returned list.
|
||||
#[async_trait]
|
||||
pub trait EnvironmentProvider: Send + Sync {
|
||||
/// Returns the environments available for a new manager.
|
||||
async fn get_environments(
|
||||
pub(crate) trait EnvironmentProvider: Send + Sync {
|
||||
/// Returns the provider-owned environment startup snapshot.
|
||||
async fn snapshot(
|
||||
&self,
|
||||
local_runtime_paths: &ExecServerRuntimePaths,
|
||||
) -> Result<HashMap<String, Environment>, ExecServerError>;
|
||||
) -> Result<EnvironmentProviderSnapshot, ExecServerError>;
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct EnvironmentProviderSnapshot {
|
||||
pub environments: Vec<(String, Environment)>,
|
||||
pub default: EnvironmentDefault,
|
||||
pub include_all_environments_by_default: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub(crate) enum EnvironmentDefault {
|
||||
Disabled,
|
||||
EnvironmentId(String),
|
||||
}
|
||||
|
||||
/// Default provider backed by `CODEX_EXEC_SERVER_URL`.
|
||||
@@ -40,34 +52,49 @@ impl DefaultEnvironmentProvider {
|
||||
Self::new(std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok())
|
||||
}
|
||||
|
||||
pub(crate) fn environments(
|
||||
pub(crate) fn snapshot_inner(
|
||||
&self,
|
||||
local_runtime_paths: &ExecServerRuntimePaths,
|
||||
) -> HashMap<String, Environment> {
|
||||
let mut environments = HashMap::from([(
|
||||
) -> EnvironmentProviderSnapshot {
|
||||
let mut environments = vec![(
|
||||
LOCAL_ENVIRONMENT_ID.to_string(),
|
||||
Environment::local(local_runtime_paths.clone()),
|
||||
)]);
|
||||
let exec_server_url = normalize_exec_server_url(self.exec_server_url.clone()).0;
|
||||
)];
|
||||
let (exec_server_url, disabled) = normalize_exec_server_url(self.exec_server_url.clone());
|
||||
|
||||
if let Some(exec_server_url) = exec_server_url {
|
||||
environments.insert(
|
||||
environments.push((
|
||||
REMOTE_ENVIRONMENT_ID.to_string(),
|
||||
Environment::remote_inner(exec_server_url, Some(local_runtime_paths.clone())),
|
||||
);
|
||||
));
|
||||
}
|
||||
|
||||
environments
|
||||
let has_remote = environments
|
||||
.iter()
|
||||
.any(|(id, _environment)| id == REMOTE_ENVIRONMENT_ID);
|
||||
let default = if disabled {
|
||||
EnvironmentDefault::Disabled
|
||||
} else if has_remote {
|
||||
EnvironmentDefault::EnvironmentId(REMOTE_ENVIRONMENT_ID.to_string())
|
||||
} else {
|
||||
EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string())
|
||||
};
|
||||
|
||||
EnvironmentProviderSnapshot {
|
||||
environments,
|
||||
default,
|
||||
include_all_environments_by_default: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EnvironmentProvider for DefaultEnvironmentProvider {
|
||||
async fn get_environments(
|
||||
async fn snapshot(
|
||||
&self,
|
||||
local_runtime_paths: &ExecServerRuntimePaths,
|
||||
) -> Result<HashMap<String, Environment>, ExecServerError> {
|
||||
Ok(self.environments(local_runtime_paths))
|
||||
) -> Result<EnvironmentProviderSnapshot, ExecServerError> {
|
||||
Ok(self.snapshot_inner(local_runtime_paths))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,6 +108,8 @@ pub(crate) fn normalize_exec_server_url(exec_server_url: Option<String>) -> (Opt
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::*;
|
||||
@@ -98,10 +127,16 @@ mod tests {
|
||||
async fn default_provider_returns_local_environment_when_url_is_missing() {
|
||||
let provider = DefaultEnvironmentProvider::new(/*exec_server_url*/ None);
|
||||
let runtime_paths = test_runtime_paths();
|
||||
let environments = provider
|
||||
.get_environments(&runtime_paths)
|
||||
let snapshot = provider
|
||||
.snapshot(&runtime_paths)
|
||||
.await
|
||||
.expect("environments");
|
||||
let EnvironmentProviderSnapshot {
|
||||
environments,
|
||||
default,
|
||||
include_all_environments_by_default,
|
||||
} = snapshot;
|
||||
let environments: HashMap<_, _> = environments.into_iter().collect();
|
||||
|
||||
assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote());
|
||||
assert_eq!(
|
||||
@@ -109,42 +144,72 @@ mod tests {
|
||||
Some(&runtime_paths)
|
||||
);
|
||||
assert!(!environments.contains_key(REMOTE_ENVIRONMENT_ID));
|
||||
assert_eq!(
|
||||
default,
|
||||
EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string())
|
||||
);
|
||||
assert!(!include_all_environments_by_default);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn default_provider_returns_local_environment_when_url_is_empty() {
|
||||
let provider = DefaultEnvironmentProvider::new(Some(String::new()));
|
||||
let runtime_paths = test_runtime_paths();
|
||||
let environments = provider
|
||||
.get_environments(&runtime_paths)
|
||||
let snapshot = provider
|
||||
.snapshot(&runtime_paths)
|
||||
.await
|
||||
.expect("environments");
|
||||
let EnvironmentProviderSnapshot {
|
||||
environments,
|
||||
default,
|
||||
include_all_environments_by_default,
|
||||
} = snapshot;
|
||||
let environments: HashMap<_, _> = environments.into_iter().collect();
|
||||
|
||||
assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote());
|
||||
assert!(!environments.contains_key(REMOTE_ENVIRONMENT_ID));
|
||||
assert_eq!(
|
||||
default,
|
||||
EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string())
|
||||
);
|
||||
assert!(!include_all_environments_by_default);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn default_provider_returns_local_environment_for_none_value() {
|
||||
let provider = DefaultEnvironmentProvider::new(Some("none".to_string()));
|
||||
let runtime_paths = test_runtime_paths();
|
||||
let environments = provider
|
||||
.get_environments(&runtime_paths)
|
||||
let snapshot = provider
|
||||
.snapshot(&runtime_paths)
|
||||
.await
|
||||
.expect("environments");
|
||||
let EnvironmentProviderSnapshot {
|
||||
environments,
|
||||
default,
|
||||
include_all_environments_by_default,
|
||||
} = snapshot;
|
||||
let environments: HashMap<_, _> = environments.into_iter().collect();
|
||||
|
||||
assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote());
|
||||
assert!(!environments.contains_key(REMOTE_ENVIRONMENT_ID));
|
||||
assert_eq!(default, EnvironmentDefault::Disabled);
|
||||
assert!(!include_all_environments_by_default);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn default_provider_adds_remote_environment_for_websocket_url() {
|
||||
let provider = DefaultEnvironmentProvider::new(Some("ws://127.0.0.1:8765".to_string()));
|
||||
let runtime_paths = test_runtime_paths();
|
||||
let environments = provider
|
||||
.get_environments(&runtime_paths)
|
||||
let snapshot = provider
|
||||
.snapshot(&runtime_paths)
|
||||
.await
|
||||
.expect("environments");
|
||||
let EnvironmentProviderSnapshot {
|
||||
environments,
|
||||
default,
|
||||
include_all_environments_by_default,
|
||||
} = snapshot;
|
||||
let environments: HashMap<_, _> = environments.into_iter().collect();
|
||||
|
||||
assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote());
|
||||
let remote_environment = &environments[REMOTE_ENVIRONMENT_ID];
|
||||
@@ -153,16 +218,22 @@ mod tests {
|
||||
remote_environment.exec_server_url(),
|
||||
Some("ws://127.0.0.1:8765")
|
||||
);
|
||||
assert_eq!(
|
||||
default,
|
||||
EnvironmentDefault::EnvironmentId(REMOTE_ENVIRONMENT_ID.to_string())
|
||||
);
|
||||
assert!(!include_all_environments_by_default);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn default_provider_normalizes_exec_server_url() {
|
||||
let provider = DefaultEnvironmentProvider::new(Some(" ws://127.0.0.1:8765 ".to_string()));
|
||||
let runtime_paths = test_runtime_paths();
|
||||
let environments = provider
|
||||
.get_environments(&runtime_paths)
|
||||
let snapshot = provider
|
||||
.snapshot(&runtime_paths)
|
||||
.await
|
||||
.expect("environments");
|
||||
let environments: HashMap<_, _> = snapshot.environments.into_iter().collect();
|
||||
|
||||
assert_eq!(
|
||||
environments[REMOTE_ENVIRONMENT_ID].exec_server_url(),
|
||||
|
||||
720
codex-rs/exec-server/src/environment_toml.rs
Normal file
720
codex-rs/exec-server/src/environment_toml.rs
Normal file
@@ -0,0 +1,720 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use serde::Deserialize;
|
||||
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
|
||||
|
||||
use crate::DefaultEnvironmentProvider;
|
||||
use crate::Environment;
|
||||
use crate::ExecServerError;
|
||||
use crate::ExecServerRuntimePaths;
|
||||
use crate::client_api::ExecServerTransportParams;
|
||||
use crate::client_api::StdioExecServerCommand;
|
||||
use crate::environment::LOCAL_ENVIRONMENT_ID;
|
||||
use crate::environment_provider::EnvironmentDefault;
|
||||
use crate::environment_provider::EnvironmentProvider;
|
||||
use crate::environment_provider::EnvironmentProviderSnapshot;
|
||||
|
||||
const ENVIRONMENTS_TOML_FILE: &str = "environments.toml";
|
||||
const MAX_ENVIRONMENT_ID_LEN: usize = 64;
|
||||
|
||||
#[derive(Deserialize, Debug, Default)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
struct EnvironmentsToml {
|
||||
default: Option<String>,
|
||||
|
||||
#[serde(default)]
|
||||
environments: Vec<EnvironmentToml>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Default, PartialEq, Eq)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
struct EnvironmentToml {
|
||||
id: String,
|
||||
url: Option<String>,
|
||||
program: Option<String>,
|
||||
args: Option<Vec<String>>,
|
||||
env: Option<HashMap<String, String>>,
|
||||
cwd: Option<PathBuf>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
struct TomlEnvironmentProvider {
|
||||
default: EnvironmentDefault,
|
||||
environments: Vec<(String, ExecServerTransportParams)>,
|
||||
}
|
||||
|
||||
impl TomlEnvironmentProvider {
|
||||
#[cfg(test)]
|
||||
fn new(config: EnvironmentsToml) -> Result<Self, ExecServerError> {
|
||||
Self::new_with_config_dir(config, /*config_dir*/ None)
|
||||
}
|
||||
|
||||
fn new_with_config_dir(
|
||||
config: EnvironmentsToml,
|
||||
config_dir: Option<&Path>,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
let mut ids = HashSet::from([LOCAL_ENVIRONMENT_ID.to_string()]);
|
||||
let mut environments = Vec::with_capacity(config.environments.len());
|
||||
for item in config.environments {
|
||||
let (id, transport) = parse_environment_toml(item, config_dir)?;
|
||||
if !ids.insert(id.clone()) {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"environment id `{id}` is duplicated"
|
||||
)));
|
||||
}
|
||||
environments.push((id, transport));
|
||||
}
|
||||
let default = normalize_default_environment_id(config.default.as_deref(), &ids)?;
|
||||
Ok(Self {
|
||||
default,
|
||||
environments,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EnvironmentProvider for TomlEnvironmentProvider {
|
||||
async fn snapshot(
|
||||
&self,
|
||||
local_runtime_paths: &ExecServerRuntimePaths,
|
||||
) -> Result<EnvironmentProviderSnapshot, ExecServerError> {
|
||||
let mut environments = Vec::with_capacity(self.environments.len() + 1);
|
||||
environments.push((
|
||||
LOCAL_ENVIRONMENT_ID.to_string(),
|
||||
Environment::local(local_runtime_paths.clone()),
|
||||
));
|
||||
for (id, transport_params) in &self.environments {
|
||||
environments.push((
|
||||
id.clone(),
|
||||
Environment::remote_with_transport(
|
||||
transport_params.clone(),
|
||||
Some(local_runtime_paths.clone()),
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(EnvironmentProviderSnapshot {
|
||||
environments,
|
||||
default: self.default.clone(),
|
||||
include_all_environments_by_default: true,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_environment_toml(
|
||||
item: EnvironmentToml,
|
||||
config_dir: Option<&Path>,
|
||||
) -> Result<(String, ExecServerTransportParams), ExecServerError> {
|
||||
let EnvironmentToml {
|
||||
id,
|
||||
url,
|
||||
program,
|
||||
args,
|
||||
env,
|
||||
cwd,
|
||||
} = item;
|
||||
validate_environment_id(&id)?;
|
||||
if program.is_none() && (args.is_some() || env.is_some() || cwd.is_some()) {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"environment `{id}` args, env, and cwd require program"
|
||||
)));
|
||||
}
|
||||
|
||||
let transport_params = match (url, program) {
|
||||
(Some(url), None) => {
|
||||
let url = validate_websocket_url(url)?;
|
||||
ExecServerTransportParams::WebSocketUrl(url)
|
||||
}
|
||||
(None, Some(program)) => {
|
||||
let program = program.trim().to_string();
|
||||
if program.is_empty() {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"environment `{id}` program cannot be empty"
|
||||
)));
|
||||
}
|
||||
let cwd = normalize_stdio_cwd(&id, cwd, config_dir)?;
|
||||
ExecServerTransportParams::StdioCommand(StdioExecServerCommand {
|
||||
program,
|
||||
args: args.unwrap_or_default(),
|
||||
env: env.unwrap_or_default(),
|
||||
cwd,
|
||||
})
|
||||
}
|
||||
(None, None) | (Some(_), Some(_)) => {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"environment `{id}` must set exactly one of url or program"
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
Ok((id, transport_params))
|
||||
}
|
||||
|
||||
fn normalize_stdio_cwd(
|
||||
id: &str,
|
||||
cwd: Option<PathBuf>,
|
||||
config_dir: Option<&Path>,
|
||||
) -> Result<Option<PathBuf>, ExecServerError> {
|
||||
let Some(cwd) = cwd else {
|
||||
return Ok(None);
|
||||
};
|
||||
if cwd.is_absolute() {
|
||||
return Ok(Some(cwd));
|
||||
}
|
||||
let Some(config_dir) = config_dir else {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"environment `{id}` cwd must be absolute"
|
||||
)));
|
||||
};
|
||||
Ok(Some(config_dir.join(cwd)))
|
||||
}
|
||||
|
||||
pub(crate) fn environment_provider_from_codex_home(
|
||||
codex_home: &Path,
|
||||
) -> Result<Box<dyn EnvironmentProvider>, ExecServerError> {
|
||||
let path = codex_home.join(ENVIRONMENTS_TOML_FILE);
|
||||
if !path.try_exists().map_err(|err| {
|
||||
ExecServerError::Protocol(format!(
|
||||
"failed to inspect environment config `{}`: {err}",
|
||||
path.display()
|
||||
))
|
||||
})? {
|
||||
return Ok(Box::new(DefaultEnvironmentProvider::from_env()));
|
||||
}
|
||||
|
||||
let environments = load_environments_toml(&path)?;
|
||||
Ok(Box::new(TomlEnvironmentProvider::new_with_config_dir(
|
||||
environments,
|
||||
Some(codex_home),
|
||||
)?))
|
||||
}
|
||||
|
||||
fn normalize_default_environment_id(
|
||||
default: Option<&str>,
|
||||
ids: &HashSet<String>,
|
||||
) -> Result<EnvironmentDefault, ExecServerError> {
|
||||
let Some(default) = default.map(str::trim) else {
|
||||
return Ok(EnvironmentDefault::EnvironmentId(
|
||||
LOCAL_ENVIRONMENT_ID.to_string(),
|
||||
));
|
||||
};
|
||||
if default.is_empty() {
|
||||
return Err(ExecServerError::Protocol(
|
||||
"default environment id cannot be empty".to_string(),
|
||||
));
|
||||
}
|
||||
if !default.eq_ignore_ascii_case("none") && !ids.contains(default) {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"default environment `{default}` is not configured"
|
||||
)));
|
||||
}
|
||||
if default.eq_ignore_ascii_case("none") {
|
||||
Ok(EnvironmentDefault::Disabled)
|
||||
} else {
|
||||
Ok(EnvironmentDefault::EnvironmentId(default.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_environment_id(id: &str) -> Result<(), ExecServerError> {
|
||||
let trimmed_id = id.trim();
|
||||
if trimmed_id.is_empty() {
|
||||
return Err(ExecServerError::Protocol(
|
||||
"environment id cannot be empty".to_string(),
|
||||
));
|
||||
}
|
||||
if trimmed_id != id {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"environment id `{id}` must not contain surrounding whitespace"
|
||||
)));
|
||||
}
|
||||
if id == LOCAL_ENVIRONMENT_ID || id.eq_ignore_ascii_case("none") {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"environment id `{id}` is reserved"
|
||||
)));
|
||||
}
|
||||
if id.len() > MAX_ENVIRONMENT_ID_LEN {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"environment id `{id}` cannot be longer than {MAX_ENVIRONMENT_ID_LEN} characters"
|
||||
)));
|
||||
}
|
||||
if !id
|
||||
.chars()
|
||||
.all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_')
|
||||
{
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"environment id `{id}` must contain only ASCII letters, numbers, '-' or '_'"
|
||||
)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn validate_websocket_url(url: String) -> Result<String, ExecServerError> {
|
||||
let url = url.trim();
|
||||
if url.is_empty() {
|
||||
return Err(ExecServerError::Protocol(
|
||||
"environment url cannot be empty".to_string(),
|
||||
));
|
||||
}
|
||||
if !url.starts_with("ws://") && !url.starts_with("wss://") {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"environment url `{url}` must use ws:// or wss://"
|
||||
)));
|
||||
}
|
||||
url.into_client_request().map_err(|err| {
|
||||
ExecServerError::Protocol(format!("environment url `{url}` is invalid: {err}"))
|
||||
})?;
|
||||
Ok(url.to_string())
|
||||
}
|
||||
|
||||
fn load_environments_toml(path: &Path) -> Result<EnvironmentsToml, ExecServerError> {
|
||||
let contents = std::fs::read_to_string(path).map_err(|err| {
|
||||
ExecServerError::Protocol(format!(
|
||||
"failed to read environment config `{}`: {err}",
|
||||
path.display()
|
||||
))
|
||||
})?;
|
||||
|
||||
toml::from_str(&contents).map_err(|err| {
|
||||
ExecServerError::Protocol(format!(
|
||||
"failed to parse environment config `{}`: {err}",
|
||||
path.display()
|
||||
))
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::tempdir;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn test_runtime_paths() -> ExecServerRuntimePaths {
|
||||
ExecServerRuntimePaths::new(
|
||||
std::env::current_exe().expect("current exe"),
|
||||
/*codex_linux_sandbox_exe*/ None,
|
||||
)
|
||||
.expect("runtime paths")
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn toml_provider_adds_implicit_local_and_configured_environments() {
|
||||
let provider = TomlEnvironmentProvider::new(EnvironmentsToml {
|
||||
default: Some("ssh-dev".to_string()),
|
||||
environments: vec![
|
||||
EnvironmentToml {
|
||||
id: "devbox".to_string(),
|
||||
url: Some(" ws://127.0.0.1:8765 ".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
EnvironmentToml {
|
||||
id: "ssh-dev".to_string(),
|
||||
program: Some(" ssh ".to_string()),
|
||||
args: Some(vec![
|
||||
"dev".to_string(),
|
||||
"codex exec-server --listen stdio".to_string(),
|
||||
]),
|
||||
env: Some(HashMap::from([(
|
||||
"CODEX_LOG".to_string(),
|
||||
"debug".to_string(),
|
||||
)])),
|
||||
..Default::default()
|
||||
},
|
||||
],
|
||||
})
|
||||
.expect("provider");
|
||||
let runtime_paths = test_runtime_paths();
|
||||
|
||||
let snapshot = provider
|
||||
.snapshot(&runtime_paths)
|
||||
.await
|
||||
.expect("environments");
|
||||
let EnvironmentProviderSnapshot {
|
||||
environments,
|
||||
default,
|
||||
include_all_environments_by_default,
|
||||
} = snapshot;
|
||||
let environment_ids: Vec<_> = environments
|
||||
.iter()
|
||||
.map(|(id, _environment)| id.as_str())
|
||||
.collect();
|
||||
assert_eq!(
|
||||
environment_ids,
|
||||
vec![LOCAL_ENVIRONMENT_ID, "devbox", "ssh-dev"]
|
||||
);
|
||||
let environments: HashMap<_, _> = environments.into_iter().collect();
|
||||
|
||||
assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote());
|
||||
assert_eq!(
|
||||
environments["devbox"].exec_server_url(),
|
||||
Some("ws://127.0.0.1:8765")
|
||||
);
|
||||
assert!(environments["ssh-dev"].is_remote());
|
||||
assert_eq!(environments["ssh-dev"].exec_server_url(), None);
|
||||
assert_eq!(
|
||||
default,
|
||||
EnvironmentDefault::EnvironmentId("ssh-dev".to_string())
|
||||
);
|
||||
assert!(include_all_environments_by_default);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn toml_provider_default_omitted_selects_local() {
|
||||
let provider = TomlEnvironmentProvider::new(EnvironmentsToml::default()).expect("provider");
|
||||
let snapshot = provider
|
||||
.snapshot(&test_runtime_paths())
|
||||
.await
|
||||
.expect("environments");
|
||||
|
||||
assert_eq!(
|
||||
snapshot.default,
|
||||
EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn toml_provider_default_none_disables_default() {
|
||||
let provider = TomlEnvironmentProvider::new(EnvironmentsToml {
|
||||
default: Some("none".to_string()),
|
||||
environments: Vec::new(),
|
||||
})
|
||||
.expect("provider");
|
||||
let snapshot = provider
|
||||
.snapshot(&test_runtime_paths())
|
||||
.await
|
||||
.expect("environments");
|
||||
|
||||
assert_eq!(snapshot.default, EnvironmentDefault::Disabled);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn toml_provider_rejects_invalid_environments() {
|
||||
let cases = [
|
||||
(
|
||||
EnvironmentToml {
|
||||
id: "local".to_string(),
|
||||
url: Some("ws://127.0.0.1:8765".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
"environment id `local` is reserved",
|
||||
),
|
||||
(
|
||||
EnvironmentToml {
|
||||
id: " devbox ".to_string(),
|
||||
url: Some("ws://127.0.0.1:8765".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
"environment id ` devbox ` must not contain surrounding whitespace",
|
||||
),
|
||||
(
|
||||
EnvironmentToml {
|
||||
id: "dev box".to_string(),
|
||||
url: Some("ws://127.0.0.1:8765".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
"environment id `dev box` must contain only ASCII letters, numbers, '-' or '_'",
|
||||
),
|
||||
(
|
||||
EnvironmentToml {
|
||||
id: "devbox".to_string(),
|
||||
url: Some("http://127.0.0.1:8765".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
"environment url `http://127.0.0.1:8765` must use ws:// or wss://",
|
||||
),
|
||||
(
|
||||
EnvironmentToml {
|
||||
id: "devbox".to_string(),
|
||||
url: Some("ws://127.0.0.1:8765".to_string()),
|
||||
program: Some("codex".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
"environment `devbox` must set exactly one of url or program",
|
||||
),
|
||||
(
|
||||
EnvironmentToml {
|
||||
id: "devbox".to_string(),
|
||||
program: Some(" ".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
"environment `devbox` program cannot be empty",
|
||||
),
|
||||
(
|
||||
EnvironmentToml {
|
||||
id: "devbox".to_string(),
|
||||
args: Some(Vec::new()),
|
||||
..Default::default()
|
||||
},
|
||||
"environment `devbox` args, env, and cwd require program",
|
||||
),
|
||||
];
|
||||
|
||||
for (item, expected) in cases {
|
||||
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
|
||||
default: None,
|
||||
environments: vec![item],
|
||||
})
|
||||
.expect_err("invalid item should fail");
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
format!("exec-server protocol error: {expected}")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn toml_provider_resolves_relative_stdio_cwd_from_config_dir() {
|
||||
let config_dir = tempdir().expect("tempdir");
|
||||
let provider = TomlEnvironmentProvider::new_with_config_dir(
|
||||
EnvironmentsToml {
|
||||
default: None,
|
||||
environments: vec![EnvironmentToml {
|
||||
id: "ssh-dev".to_string(),
|
||||
program: Some("ssh".to_string()),
|
||||
cwd: Some(PathBuf::from("workspace")),
|
||||
..Default::default()
|
||||
}],
|
||||
},
|
||||
Some(config_dir.path()),
|
||||
)
|
||||
.expect("provider");
|
||||
|
||||
assert_eq!(
|
||||
provider.environments[0].1,
|
||||
ExecServerTransportParams::StdioCommand(StdioExecServerCommand {
|
||||
program: "ssh".to_string(),
|
||||
args: Vec::new(),
|
||||
env: HashMap::new(),
|
||||
cwd: Some(config_dir.path().join("workspace")),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn toml_provider_rejects_relative_stdio_cwd_without_config_dir() {
|
||||
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
|
||||
default: None,
|
||||
environments: vec![EnvironmentToml {
|
||||
id: "ssh-dev".to_string(),
|
||||
program: Some("ssh".to_string()),
|
||||
cwd: Some(PathBuf::from("workspace")),
|
||||
..Default::default()
|
||||
}],
|
||||
})
|
||||
.expect_err("relative cwd without config dir should fail");
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"exec-server protocol error: environment `ssh-dev` cwd must be absolute"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn toml_provider_rejects_duplicate_ids() {
|
||||
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
|
||||
default: None,
|
||||
environments: vec![
|
||||
EnvironmentToml {
|
||||
id: "devbox".to_string(),
|
||||
url: Some("ws://127.0.0.1:8765".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
EnvironmentToml {
|
||||
id: "devbox".to_string(),
|
||||
program: Some("codex".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
],
|
||||
})
|
||||
.expect_err("duplicate id should fail");
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"exec-server protocol error: environment id `devbox` is duplicated"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn toml_provider_rejects_overlong_id() {
|
||||
let id = "a".repeat(MAX_ENVIRONMENT_ID_LEN + 1);
|
||||
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
|
||||
default: None,
|
||||
environments: vec![EnvironmentToml {
|
||||
id: id.clone(),
|
||||
url: Some("ws://127.0.0.1:8765".to_string()),
|
||||
..Default::default()
|
||||
}],
|
||||
})
|
||||
.expect_err("overlong id should fail");
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
format!(
|
||||
"exec-server protocol error: environment id `{id}` cannot be longer than {MAX_ENVIRONMENT_ID_LEN} characters"
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn toml_provider_rejects_unknown_default() {
|
||||
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
|
||||
default: Some("missing".to_string()),
|
||||
environments: Vec::new(),
|
||||
})
|
||||
.expect_err("unknown default should fail");
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"exec-server protocol error: default environment `missing` is not configured"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn load_environments_toml_reads_root_environment_list() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
let path = codex_home.path().join(ENVIRONMENTS_TOML_FILE);
|
||||
std::fs::write(
|
||||
&path,
|
||||
r#"
|
||||
default = "ssh-dev"
|
||||
|
||||
[[environments]]
|
||||
id = "devbox"
|
||||
url = "ws://127.0.0.1:4512"
|
||||
|
||||
[[environments]]
|
||||
id = "ssh-dev"
|
||||
program = "ssh"
|
||||
args = ["dev", "codex exec-server --listen stdio"]
|
||||
cwd = "/tmp"
|
||||
[environments.env]
|
||||
CODEX_LOG = "debug"
|
||||
"#,
|
||||
)
|
||||
.expect("write environments.toml");
|
||||
|
||||
let environments = load_environments_toml(&path).expect("environments.toml");
|
||||
|
||||
assert_eq!(environments.default.as_deref(), Some("ssh-dev"));
|
||||
assert_eq!(environments.environments.len(), 2);
|
||||
assert_eq!(environments.environments[0].id, "devbox");
|
||||
assert_eq!(
|
||||
environments.environments[1],
|
||||
EnvironmentToml {
|
||||
id: "ssh-dev".to_string(),
|
||||
program: Some("ssh".to_string()),
|
||||
args: Some(vec![
|
||||
"dev".to_string(),
|
||||
"codex exec-server --listen stdio".to_string(),
|
||||
]),
|
||||
env: Some(HashMap::from([(
|
||||
"CODEX_LOG".to_string(),
|
||||
"debug".to_string(),
|
||||
)])),
|
||||
cwd: Some(PathBuf::from("/tmp")),
|
||||
..Default::default()
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn load_environments_toml_rejects_unknown_fields() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
let cases = [
|
||||
("unknown = true\n", "unknown field `unknown`"),
|
||||
(
|
||||
r#"
|
||||
[[environments]]
|
||||
id = "devbox"
|
||||
url = "ws://127.0.0.1:4512"
|
||||
unknown = true
|
||||
"#,
|
||||
"unknown field `unknown`",
|
||||
),
|
||||
];
|
||||
|
||||
for (index, (contents, expected)) in cases.into_iter().enumerate() {
|
||||
let path = codex_home.path().join(format!("environments-{index}.toml"));
|
||||
std::fs::write(&path, contents).expect("write environments.toml");
|
||||
|
||||
let err = load_environments_toml(&path).expect_err("unknown field should fail");
|
||||
|
||||
assert!(
|
||||
err.to_string().contains(expected),
|
||||
"expected `{err}` to contain `{expected}`"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn toml_provider_rejects_malformed_websocket_url() {
|
||||
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
|
||||
default: None,
|
||||
environments: vec![EnvironmentToml {
|
||||
id: "devbox".to_string(),
|
||||
url: Some("ws://".to_string()),
|
||||
..Default::default()
|
||||
}],
|
||||
})
|
||||
.expect_err("malformed websocket url should fail");
|
||||
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("environment url `ws://` is invalid"),
|
||||
"expected malformed URL error, got `{err}`"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_provider_from_codex_home_uses_present_environments_file() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
std::fs::write(
|
||||
codex_home.path().join(ENVIRONMENTS_TOML_FILE),
|
||||
r#"
|
||||
default = "none"
|
||||
"#,
|
||||
)
|
||||
.expect("write environments.toml");
|
||||
|
||||
let provider =
|
||||
environment_provider_from_codex_home(codex_home.path()).expect("environment provider");
|
||||
|
||||
let snapshot = provider
|
||||
.snapshot(&test_runtime_paths())
|
||||
.await
|
||||
.expect("environments");
|
||||
let environment_ids: Vec<_> = snapshot
|
||||
.environments
|
||||
.into_iter()
|
||||
.map(|(id, _environment)| id)
|
||||
.collect();
|
||||
|
||||
assert!(environment_ids.contains(&LOCAL_ENVIRONMENT_ID.to_string()));
|
||||
assert_eq!(snapshot.default, EnvironmentDefault::Disabled);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_provider_from_codex_home_falls_back_when_file_is_missing() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
|
||||
let provider =
|
||||
environment_provider_from_codex_home(codex_home.path()).expect("environment provider");
|
||||
|
||||
let snapshot = provider
|
||||
.snapshot(&test_runtime_paths())
|
||||
.await
|
||||
.expect("environments");
|
||||
let environment_ids: Vec<_> = snapshot
|
||||
.environments
|
||||
.into_iter()
|
||||
.map(|(id, _environment)| id)
|
||||
.collect();
|
||||
|
||||
assert!(environment_ids.contains(&LOCAL_ENVIRONMENT_ID.to_string()));
|
||||
}
|
||||
}
|
||||
@@ -1,8 +1,10 @@
|
||||
mod client;
|
||||
mod client_api;
|
||||
mod client_transport;
|
||||
mod connection;
|
||||
mod environment;
|
||||
mod environment_provider;
|
||||
mod environment_toml;
|
||||
mod fs_helper;
|
||||
mod fs_helper_main;
|
||||
mod fs_sandbox;
|
||||
@@ -37,11 +39,9 @@ pub use codex_file_system::RemoveOptions;
|
||||
pub use environment::CODEX_EXEC_SERVER_URL_ENV_VAR;
|
||||
pub use environment::Environment;
|
||||
pub use environment::EnvironmentManager;
|
||||
pub use environment::EnvironmentManagerArgs;
|
||||
pub use environment::LOCAL_ENVIRONMENT_ID;
|
||||
pub use environment::REMOTE_ENVIRONMENT_ID;
|
||||
pub use environment_provider::DefaultEnvironmentProvider;
|
||||
pub use environment_provider::EnvironmentProvider;
|
||||
pub use fs_helper::CODEX_FS_HELPER_ARG1;
|
||||
pub use fs_helper_main::main as run_fs_helper_main;
|
||||
pub use local_file_system::LOCAL_FS;
|
||||
|
||||
@@ -23,6 +23,7 @@ use tokio::task::JoinHandle;
|
||||
|
||||
use crate::connection::JsonRpcConnection;
|
||||
use crate::connection::JsonRpcConnectionEvent;
|
||||
use crate::connection::JsonRpcTransport;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum RpcCallError {
|
||||
@@ -58,11 +59,9 @@ pub(crate) enum RpcServerOutboundMessage {
|
||||
request_id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
},
|
||||
#[allow(dead_code)]
|
||||
Notification(JSONRPCNotification),
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct RpcNotificationSender {
|
||||
outgoing_tx: mpsc::Sender<RpcServerOutboundMessage>,
|
||||
@@ -84,7 +83,6 @@ impl RpcNotificationSender {
|
||||
.map_err(|_| internal_error("RPC connection closed while sending response".into()))
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn notify<P: Serialize>(
|
||||
&self,
|
||||
method: &str,
|
||||
@@ -229,43 +227,55 @@ pub(crate) struct RpcClient {
|
||||
disconnected_rx: watch::Receiver<bool>,
|
||||
next_request_id: AtomicI64,
|
||||
transport_tasks: Vec<JoinHandle<()>>,
|
||||
transport: JsonRpcTransport,
|
||||
reader_task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl RpcClient {
|
||||
pub(crate) fn new(connection: JsonRpcConnection) -> (Self, mpsc::Receiver<RpcClientEvent>) {
|
||||
let (write_tx, mut incoming_rx, disconnected_rx, transport_tasks) = connection.into_parts();
|
||||
let JsonRpcConnection {
|
||||
outgoing_tx: write_tx,
|
||||
mut incoming_rx,
|
||||
disconnected_rx,
|
||||
task_handles: transport_tasks,
|
||||
transport,
|
||||
} = connection;
|
||||
let pending = Arc::new(Mutex::new(HashMap::<RequestId, PendingRequest>::new()));
|
||||
let (event_tx, event_rx) = mpsc::channel(128);
|
||||
|
||||
let pending_for_reader = Arc::clone(&pending);
|
||||
let transport_for_reader = transport.clone();
|
||||
let reader_task = tokio::spawn(async move {
|
||||
while let Some(event) = incoming_rx.recv().await {
|
||||
let disconnect_reason = loop {
|
||||
let Some(event) = incoming_rx.recv().await else {
|
||||
break None;
|
||||
};
|
||||
match event {
|
||||
JsonRpcConnectionEvent::Message(message) => {
|
||||
if let Err(err) =
|
||||
handle_server_message(&pending_for_reader, &event_tx, message).await
|
||||
{
|
||||
let _ = err;
|
||||
break;
|
||||
break None;
|
||||
}
|
||||
}
|
||||
JsonRpcConnectionEvent::MalformedMessage { reason } => {
|
||||
let _ = reason;
|
||||
break;
|
||||
break None;
|
||||
}
|
||||
JsonRpcConnectionEvent::Disconnected { reason } => {
|
||||
let _ = event_tx.send(RpcClientEvent::Disconnected { reason }).await;
|
||||
drain_pending(&pending_for_reader).await;
|
||||
return;
|
||||
break reason;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let _ = event_tx
|
||||
.send(RpcClientEvent::Disconnected { reason: None })
|
||||
.send(RpcClientEvent::Disconnected {
|
||||
reason: disconnect_reason,
|
||||
})
|
||||
.await;
|
||||
drain_pending(&pending_for_reader).await;
|
||||
transport_for_reader.terminate();
|
||||
});
|
||||
|
||||
(
|
||||
@@ -275,6 +285,7 @@ impl RpcClient {
|
||||
disconnected_rx,
|
||||
next_request_id: AtomicI64::new(1),
|
||||
transport_tasks,
|
||||
transport,
|
||||
reader_task,
|
||||
},
|
||||
event_rx,
|
||||
@@ -357,7 +368,6 @@ impl RpcClient {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn pending_request_count(&self) -> usize {
|
||||
self.pending.lock().await.len()
|
||||
}
|
||||
@@ -365,6 +375,7 @@ impl RpcClient {
|
||||
|
||||
impl Drop for RpcClient {
|
||||
fn drop(&mut self) {
|
||||
self.transport.terminate();
|
||||
for task in &self.transport_tasks {
|
||||
task.abort();
|
||||
}
|
||||
@@ -522,7 +533,9 @@ mod tests {
|
||||
use tokio::io::BufReader;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use super::RpcCallError;
|
||||
use super::RpcClient;
|
||||
use super::RpcClientEvent;
|
||||
use crate::connection::JsonRpcConnection;
|
||||
|
||||
async fn read_jsonrpc_line<R>(lines: &mut tokio::io::Lines<BufReader<R>>) -> JSONRPCMessage
|
||||
@@ -565,11 +578,9 @@ mod tests {
|
||||
async fn rpc_client_matches_out_of_order_responses_by_request_id() {
|
||||
let (client_stdin, server_reader) = tokio::io::duplex(4096);
|
||||
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
|
||||
let (client, _events_rx) = RpcClient::new(JsonRpcConnection::from_stdio(
|
||||
client_stdout,
|
||||
client_stdin,
|
||||
"test-rpc".to_string(),
|
||||
));
|
||||
let connection =
|
||||
JsonRpcConnection::from_stdio(client_stdout, client_stdin, "test-rpc".to_string());
|
||||
let (client, _events_rx) = RpcClient::new(connection);
|
||||
|
||||
let server = tokio::spawn(async move {
|
||||
let mut lines = BufReader::new(server_reader).lines();
|
||||
@@ -628,4 +639,41 @@ mod tests {
|
||||
panic!("server task failed: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn stdio_malformed_message_closes_client_and_rejects_later_calls() {
|
||||
let (client_stdin, _server_reader) = tokio::io::duplex(4096);
|
||||
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
|
||||
let connection =
|
||||
JsonRpcConnection::from_stdio(client_stdout, client_stdin, "test-rpc".to_string());
|
||||
let (client, mut events_rx) = RpcClient::new(connection);
|
||||
|
||||
server_writer
|
||||
.write_all(b"not-json\n")
|
||||
.await
|
||||
.expect("malformed line should write");
|
||||
|
||||
let event = timeout(Duration::from_secs(1), events_rx.recv())
|
||||
.await
|
||||
.expect("disconnect event should not time out")
|
||||
.expect("disconnect event should be sent");
|
||||
let RpcClientEvent::Disconnected { reason } = event else {
|
||||
panic!("expected disconnect event after malformed stdio message");
|
||||
};
|
||||
assert!(
|
||||
reason
|
||||
.as_deref()
|
||||
.is_some_and(|reason| reason.contains("failed to parse JSON-RPC message")),
|
||||
"disconnect should explain malformed stdio message, got {reason:?}"
|
||||
);
|
||||
|
||||
let result = timeout(
|
||||
Duration::from_secs(1),
|
||||
client.call::<_, serde_json::Value>("after-malformed", &serde_json::json!({})),
|
||||
)
|
||||
.await
|
||||
.expect("later call should fail instead of hanging");
|
||||
assert!(matches!(result, Err(RpcCallError::Closed)));
|
||||
assert_eq!(client.pending_request_count().await, 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,8 +47,13 @@ async fn run_connection(
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
) {
|
||||
let router = Arc::new(build_router());
|
||||
let (json_outgoing_tx, mut incoming_rx, mut disconnected_rx, connection_tasks) =
|
||||
connection.into_parts();
|
||||
let JsonRpcConnection {
|
||||
outgoing_tx: json_outgoing_tx,
|
||||
mut incoming_rx,
|
||||
mut disconnected_rx,
|
||||
task_handles: connection_tasks,
|
||||
transport: _transport,
|
||||
} = connection;
|
||||
let (outgoing_tx, mut outgoing_rx) =
|
||||
mpsc::channel::<RpcServerOutboundMessage>(CHANNEL_CAPACITY);
|
||||
let notifications = RpcNotificationSender::new(outgoing_tx.clone());
|
||||
|
||||
@@ -15,7 +15,6 @@ pub use cli::Command;
|
||||
pub use cli::ReviewArgs;
|
||||
use codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
|
||||
use codex_app_server_client::EnvironmentManager;
|
||||
use codex_app_server_client::EnvironmentManagerArgs;
|
||||
use codex_app_server_client::ExecServerRuntimePaths;
|
||||
use codex_app_server_client::InProcessAppServerClient;
|
||||
use codex_app_server_client::InProcessClientStartArgs;
|
||||
@@ -509,6 +508,11 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?;
|
||||
let state_db = codex_core::init_state_db(&config).await;
|
||||
let environment_manager = if run_loader_overrides.ignore_user_config {
|
||||
EnvironmentManager::from_env(local_runtime_paths).await?
|
||||
} else {
|
||||
EnvironmentManager::from_codex_home(config.codex_home.clone(), local_runtime_paths).await?
|
||||
};
|
||||
let in_process_start_args = InProcessClientStartArgs {
|
||||
arg0_paths,
|
||||
config: std::sync::Arc::new(config.clone()),
|
||||
@@ -518,9 +522,7 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
|
||||
feedback: CodexFeedback::new(),
|
||||
log_db: None,
|
||||
state_db: state_db.clone(),
|
||||
environment_manager: std::sync::Arc::new(
|
||||
EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await,
|
||||
),
|
||||
environment_manager: std::sync::Arc::new(environment_manager),
|
||||
config_warnings,
|
||||
session_source: SessionSource::Exec,
|
||||
enable_codex_api_key_env: true,
|
||||
|
||||
@@ -9,7 +9,6 @@ use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::resolve_installation_id;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_exec_server::EnvironmentManagerArgs;
|
||||
use codex_exec_server::ExecServerRuntimePaths;
|
||||
use codex_login::default_client::set_default_client_residency_requirement;
|
||||
use codex_utils_cli::CliConfigOverrides;
|
||||
@@ -61,15 +60,6 @@ pub async fn run_main(
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
cli_config_overrides: CliConfigOverrides,
|
||||
) -> IoResult<()> {
|
||||
let environment_manager = Arc::new(
|
||||
EnvironmentManager::new(EnvironmentManagerArgs::new(
|
||||
ExecServerRuntimePaths::from_optional_paths(
|
||||
arg0_paths.codex_self_exe.clone(),
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?,
|
||||
))
|
||||
.await,
|
||||
);
|
||||
// Parse CLI overrides once and derive the base Config eagerly so later
|
||||
// components do not need to work with raw TOML values.
|
||||
let cli_kv_overrides = cli_config_overrides.parse_overrides().map_err(|e| {
|
||||
@@ -84,6 +74,17 @@ pub async fn run_main(
|
||||
std::io::Error::new(ErrorKind::InvalidData, format!("error loading config: {e}"))
|
||||
})?;
|
||||
set_default_client_residency_requirement(config.enforce_residency.value());
|
||||
let environment_manager = Arc::new(
|
||||
EnvironmentManager::from_codex_home(
|
||||
config.codex_home.clone(),
|
||||
ExecServerRuntimePaths::from_optional_paths(
|
||||
arg0_paths.codex_self_exe.clone(),
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?,
|
||||
)
|
||||
.await
|
||||
.map_err(std::io::Error::other)?,
|
||||
);
|
||||
|
||||
let otel = codex_core::otel_init::build_provider(
|
||||
&config,
|
||||
|
||||
@@ -20,7 +20,6 @@ use codex_core_api::Config;
|
||||
use codex_core_api::ConfigLayerStack;
|
||||
use codex_core_api::Constrained;
|
||||
use codex_core_api::EnvironmentManager;
|
||||
use codex_core_api::EnvironmentManagerArgs;
|
||||
use codex_core_api::EventMsg;
|
||||
use codex_core_api::ExecServerRuntimePaths;
|
||||
use codex_core_api::Features;
|
||||
@@ -118,8 +117,9 @@ async fn run_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
|
||||
};
|
||||
let thread_store = thread_store_from_config(&config, state_db.clone());
|
||||
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
|
||||
let environment_manager =
|
||||
Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await);
|
||||
let environment_manager = Arc::new(
|
||||
EnvironmentManager::from_codex_home(config.codex_home.clone(), local_runtime_paths).await?,
|
||||
);
|
||||
let installation_id = resolve_installation_id(&config.codex_home).await?;
|
||||
let thread_manager = ThreadManager::new(
|
||||
&config,
|
||||
|
||||
@@ -575,6 +575,7 @@ impl App {
|
||||
) -> crate::chatwidget::ChatWidgetInit {
|
||||
crate::chatwidget::ChatWidgetInit {
|
||||
config: cfg,
|
||||
environment_manager: self.environment_manager.clone(),
|
||||
frame_requester: tui.frame_requester(),
|
||||
app_event_tx: self.app_event_tx.clone(),
|
||||
workspace_command_runner: self.workspace_command_runner.clone(),
|
||||
@@ -739,6 +740,7 @@ impl App {
|
||||
.await;
|
||||
let init = crate::chatwidget::ChatWidgetInit {
|
||||
config: config.clone(),
|
||||
environment_manager: environment_manager.clone(),
|
||||
frame_requester: tui.frame_requester(),
|
||||
app_event_tx: app_event_tx.clone(),
|
||||
workspace_command_runner: Some(workspace_command_runner.clone()),
|
||||
@@ -775,6 +777,7 @@ impl App {
|
||||
})?;
|
||||
let init = crate::chatwidget::ChatWidgetInit {
|
||||
config: config.clone(),
|
||||
environment_manager: environment_manager.clone(),
|
||||
frame_requester: tui.frame_requester(),
|
||||
app_event_tx: app_event_tx.clone(),
|
||||
workspace_command_runner: Some(workspace_command_runner.clone()),
|
||||
@@ -816,6 +819,7 @@ impl App {
|
||||
})?;
|
||||
let init = crate::chatwidget::ChatWidgetInit {
|
||||
config: config.clone(),
|
||||
environment_manager: environment_manager.clone(),
|
||||
frame_requester: tui.frame_requester(),
|
||||
app_event_tx: app_event_tx.clone(),
|
||||
workspace_command_runner: Some(workspace_command_runner.clone()),
|
||||
|
||||
@@ -435,6 +435,7 @@ async fn enqueue_primary_thread_session_replays_turns_before_initial_prompt_subm
|
||||
let model = crate::legacy_core::test_support::get_model_offline(config.model.as_deref());
|
||||
app.chat_widget = ChatWidget::new_with_app_event(ChatWidgetInit {
|
||||
config,
|
||||
environment_manager: app.environment_manager.clone(),
|
||||
frame_requester: crate::tui::FrameRequester::test_dummy(),
|
||||
app_event_tx: app.app_event_tx.clone(),
|
||||
workspace_command_runner: None,
|
||||
@@ -4825,6 +4826,7 @@ async fn replace_chat_widget_reseeds_collab_agent_metadata_for_replay() {
|
||||
|
||||
let replacement = ChatWidget::new_with_app_event(ChatWidgetInit {
|
||||
config: app.config.clone(),
|
||||
environment_manager: app.environment_manager.clone(),
|
||||
frame_requester: crate::tui::FrameRequester::test_dummy(),
|
||||
app_event_tx: app.app_event_tx.clone(),
|
||||
workspace_command_runner: None,
|
||||
|
||||
@@ -129,6 +129,7 @@ use codex_config::types::ApprovalsReviewer;
|
||||
use codex_config::types::Notifications;
|
||||
use codex_config::types::WindowsSandboxModeToml;
|
||||
use codex_core_skills::model::SkillMetadata;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_features::FEATURES;
|
||||
use codex_features::Feature;
|
||||
#[cfg(test)]
|
||||
@@ -558,6 +559,7 @@ pub(crate) fn get_limits_duration(windows_minutes: i64) -> String {
|
||||
/// Common initialization parameters shared by all `ChatWidget` constructors.
|
||||
pub(crate) struct ChatWidgetInit {
|
||||
pub(crate) config: Config,
|
||||
pub(crate) environment_manager: Arc<EnvironmentManager>,
|
||||
pub(crate) frame_requester: FrameRequester,
|
||||
pub(crate) app_event_tx: AppEventSender,
|
||||
/// App-server-backed runner used by status surfaces for workspace metadata probes.
|
||||
@@ -759,6 +761,7 @@ pub(crate) struct ChatWidget {
|
||||
/// where the overlay may briefly treat new tail content as already cached.
|
||||
active_cell_revision: u64,
|
||||
config: Config,
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
raw_output_mode: bool,
|
||||
/// Runtime value resolved by core. `config.service_tier` remains the explicit user choice.
|
||||
effective_service_tier: Option<ServiceTier>,
|
||||
@@ -4841,6 +4844,7 @@ impl ChatWidget {
|
||||
fn new_with_op_target(common: ChatWidgetInit, codex_op_target: CodexOpTarget) -> Self {
|
||||
let ChatWidgetInit {
|
||||
config,
|
||||
environment_manager,
|
||||
frame_requester,
|
||||
app_event_tx,
|
||||
workspace_command_runner,
|
||||
@@ -4929,6 +4933,7 @@ impl ChatWidget {
|
||||
active_cell_revision: 0,
|
||||
raw_output_mode: config.tui_raw_output_mode,
|
||||
config,
|
||||
environment_manager,
|
||||
effective_service_tier,
|
||||
skills_all: Vec::new(),
|
||||
skills_initial_state: None,
|
||||
@@ -7138,12 +7143,14 @@ impl ChatWidget {
|
||||
}
|
||||
|
||||
let config = self.config.clone();
|
||||
let environment_manager = Arc::clone(&self.environment_manager);
|
||||
let app_event_tx = self.app_event_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let accessible_result =
|
||||
match connectors::list_accessible_connectors_from_mcp_tools_with_options_and_status(
|
||||
match connectors::list_accessible_connectors_from_mcp_tools_with_environment_manager(
|
||||
&config,
|
||||
force_refetch,
|
||||
&environment_manager,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -195,6 +195,7 @@ pub(super) async fn make_chatwidget_manual(
|
||||
raw_output_mode: cfg.tui_raw_output_mode,
|
||||
config: cfg,
|
||||
effective_service_tier,
|
||||
environment_manager: Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
current_collaboration_mode,
|
||||
active_collaboration_mask,
|
||||
has_chatgpt_account: false,
|
||||
|
||||
@@ -1536,6 +1536,7 @@ async fn make_startup_chat_with_cli_overrides(
|
||||
let session_telemetry = test_session_telemetry(&cfg, resolved_model.as_str());
|
||||
let init = ChatWidgetInit {
|
||||
config: cfg.clone(),
|
||||
environment_manager: Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
frame_requester: FrameRequester::test_dummy(),
|
||||
app_event_tx: AppEventSender::new(unbounded_channel::<AppEvent>().0),
|
||||
workspace_command_runner: None,
|
||||
|
||||
@@ -70,6 +70,7 @@ async fn experimental_mode_plan_is_ignored_on_startup() {
|
||||
let session_telemetry = test_session_telemetry(&cfg, resolved_model.as_str());
|
||||
let init = ChatWidgetInit {
|
||||
config: cfg.clone(),
|
||||
environment_manager: Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
frame_requester: FrameRequester::test_dummy(),
|
||||
app_event_tx: AppEventSender::new(unbounded_channel::<AppEvent>().0),
|
||||
workspace_command_runner: None,
|
||||
|
||||
@@ -246,6 +246,7 @@ async fn helpers_are_available_and_do_not_panic() {
|
||||
let session_telemetry = test_session_telemetry(&cfg, resolved_model.as_str());
|
||||
let init = ChatWidgetInit {
|
||||
config: cfg.clone(),
|
||||
environment_manager: Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
frame_requester: FrameRequester::test_dummy(),
|
||||
app_event_tx: tx,
|
||||
workspace_command_runner: None,
|
||||
|
||||
@@ -8,7 +8,7 @@ use crate::legacy_core::config::Config;
|
||||
use crate::legacy_core::config::ConfigBuilder;
|
||||
use crate::legacy_core::config::ConfigOverrides;
|
||||
use crate::legacy_core::config::find_codex_home;
|
||||
use crate::legacy_core::config::load_config_as_toml_with_cli_overrides;
|
||||
use crate::legacy_core::config::load_config_as_toml_with_cli_and_loader_overrides;
|
||||
use crate::legacy_core::config::resolve_oss_provider;
|
||||
use crate::legacy_core::format_exec_policy_error_with_source;
|
||||
use crate::legacy_core::windows_sandbox::WindowsSandboxLevelExt;
|
||||
@@ -40,7 +40,6 @@ use codex_config::ConfigLoadError;
|
||||
use codex_config::LoaderOverrides;
|
||||
use codex_config::format_config_error_with_source;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_exec_server::EnvironmentManagerArgs;
|
||||
use codex_exec_server::ExecServerRuntimePaths;
|
||||
use codex_login::AuthConfig;
|
||||
use codex_login::default_client::set_default_client_residency_requirement;
|
||||
@@ -661,10 +660,10 @@ fn config_cwd_for_app_server_target(
|
||||
app_server_target: &AppServerTarget,
|
||||
environment_manager: &EnvironmentManager,
|
||||
) -> std::io::Result<Option<AbsolutePathBuf>> {
|
||||
if environment_manager
|
||||
.default_environment()
|
||||
.is_some_and(|environment| environment.is_remote())
|
||||
|| matches!(app_server_target, AppServerTarget::Remote { .. })
|
||||
if matches!(app_server_target, AppServerTarget::Remote { .. })
|
||||
|| environment_manager
|
||||
.default_environment()
|
||||
.is_some_and(|environment| environment.is_remote())
|
||||
{
|
||||
return Ok(None);
|
||||
}
|
||||
@@ -678,6 +677,14 @@ fn config_cwd_for_app_server_target(
|
||||
Ok(Some(cwd))
|
||||
}
|
||||
|
||||
fn should_load_configured_environments(
|
||||
loader_overrides: &LoaderOverrides,
|
||||
app_server_target: &AppServerTarget,
|
||||
) -> bool {
|
||||
!loader_overrides.ignore_user_config
|
||||
&& !matches!(app_server_target, AppServerTarget::Remote { .. })
|
||||
}
|
||||
|
||||
fn latest_session_cwd_filter<'a>(
|
||||
remote_mode: bool,
|
||||
remote_cwd_override: Option<&'a Path>,
|
||||
@@ -761,24 +768,28 @@ pub async fn run_main(
|
||||
}
|
||||
};
|
||||
|
||||
let environment_manager = Arc::new(
|
||||
EnvironmentManager::new(EnvironmentManagerArgs::new(
|
||||
ExecServerRuntimePaths::from_optional_paths(
|
||||
arg0_paths.codex_self_exe.clone(),
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?,
|
||||
))
|
||||
.await,
|
||||
);
|
||||
let local_runtime_paths = ExecServerRuntimePaths::from_optional_paths(
|
||||
arg0_paths.codex_self_exe.clone(),
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?;
|
||||
let environment_manager =
|
||||
if should_load_configured_environments(&loader_overrides, &app_server_target) {
|
||||
EnvironmentManager::from_codex_home(codex_home.clone(), local_runtime_paths).await
|
||||
} else {
|
||||
EnvironmentManager::from_env(local_runtime_paths).await
|
||||
}
|
||||
.map(Arc::new)
|
||||
.map_err(std::io::Error::other)?;
|
||||
let cwd = cli.cwd.clone();
|
||||
let config_cwd =
|
||||
config_cwd_for_app_server_target(cwd.as_deref(), &app_server_target, &environment_manager)?;
|
||||
|
||||
#[allow(clippy::print_stderr)]
|
||||
let config_toml = match load_config_as_toml_with_cli_overrides(
|
||||
let config_toml = match load_config_as_toml_with_cli_and_loader_overrides(
|
||||
&codex_home,
|
||||
config_cwd.as_ref(),
|
||||
cli_kv_overrides.clone(),
|
||||
loader_overrides.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user