Compare commits

...

58 Commits

Author SHA1 Message Date
starr-openai
0c5b771e94 codex: format review-fix rebase (#21439)
Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:53 -07:00
starr-openai
5f78998ddf Tighten exec-server review followups
Treat malformed stdio JSON-RPC output as a terminal disconnect so later calls fail through the existing closed path instead of waiting after the reader exits. Keep the environment provider snapshot/default contract internal by removing the public trait surface from the crate root.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:53 -07:00
starr-openai
da52fe6f43 codex: await test environment manager setup (#20667)
Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:42 -07:00
starr-openai
f69cd0bf99 Fix environment startup defaults and sample build
Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:42 -07:00
starr-openai
f4f9839a79 Strengthen multi-environment startup regression
Exercise a non-local configured default so the test proves default-first ordering rather than relying on the implicit local environment already being first.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:42 -07:00
starr-openai
8436adb8e3 Attach configured environments on thread startup
Preserve provider environment order and derive startup selections from the configured default plus remaining environments. This lets multi-environment CODEX_HOME setups attach every configured environment by default while keeping the default first as primary.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:42 -07:00
starr-openai
72453d70e3 Fix prompt debug formatting
Apply the rustfmt indentation reported by the PR20667 Format / etc CI job after the stack rebase.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:41 -07:00
starr-openai
394b50eafb Use test environment manager in chat widget helper
Keep the direct ChatWidget test constructor aligned with the production initializer after wiring the active environment manager through the TUI.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:41 -07:00
starr-openai
9f2a9b32e7 Use active environment manager for TUI connectors
Pass the app-owned EnvironmentManager into ChatWidget so connector prefetch uses the same environment selection that the session was initialized with, instead of reconstructing it from config.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:41 -07:00
starr-openai
5f009d27c1 Load configured environments from CODEX_HOME
Thread codex_home into EnvironmentManager construction so app entrypoints load environments.toml when present and continue falling back to the legacy CODEX_EXEC_SERVER_URL provider otherwise.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:41 -07:00
starr-openai
90e4520bf7 Simplify environment provider defaults
Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:41 -07:00
starr-openai
b49ff33dc9 codex: remove duplicate environment manager constructors (#20666)
Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:06 -07:00
starr-openai
e9f0eaf8ed Expose CODEX_HOME environment manager constructor
Make the environments.toml provider reachable from the exec-server crate API so the provider PR passes clippy before entrypoint wiring lands in the next stack PR.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:06 -07:00
starr-openai
525ed52b18 Align TOML provider with snapshot trait
Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:06 -07:00
starr-openai
74df186f8e Narrow exec server URL accessor
Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:06 -07:00
starr-openai
bea492f54d Limit TOML provider test constructor to tests
Avoid keeping the test-only constructor in normal builds now that production construction uses the config-dir aware path.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:05 -07:00
starr-openai
84c8e21b31 Fix environments TOML lint coverage
Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:05 -07:00
starr-openai
ef74ece7e6 Add CODEX_HOME environments TOML provider
Add the environments.toml schema, parser, validation, and provider implementation for configured websocket and stdio-command environments. This keeps the provider load helper available but does not make product entrypoints use it yet.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:05 -07:00
starr-openai
1fa0fec4dd Represent provider defaults with snapshots
Keep EnvironmentManager construction async to preserve caller behavior while moving provider-owned default selection into a single snapshot object.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:05 -07:00
starr-openai
0a7006cebc Simplify environment provider defaults
Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:05 -07:00
starr-openai
0db1e4b4d9 Fix exec-server transport CI failures
Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:22:05 -07:00
starr-openai
dc926a56c7 Represent provider defaults with snapshots
Keep EnvironmentManager construction async to preserve caller behavior while moving provider-owned default selection into a single snapshot object.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:21:56 -07:00
starr-openai
a2ef8e05b5 Simplify environment provider defaults
Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:21:56 -07:00
starr-openai
5086768859 Inline provider manager construction
Remove the private from_provider_parts helper. EnvironmentManager::from_provider now performs the provider read, validation, and manager construction directly, and tests use a small provider implementation instead of bypassing that path.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:21:56 -07:00
starr-openai
b8f4ee4439 Split provider environments from default id
Remove the EnvironmentProviderSnapshot wrapper. Providers now expose environments and the selected default id directly, while EnvironmentManager validates that the default id exists in the returned environment map.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:21:56 -07:00
starr-openai
7a8bed96eb Return provider environment snapshots
Make environment providers return the environment map and default id together. This keeps provider-owned startup state in one boundary and removes the separate default callback over a map.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:21:55 -07:00
starr-openai
93f68577ed Simplify provider default environment selection
Have providers return a concrete default environment id after constructing their environment map, using None to disable the default. This removes the DefaultEnvironmentSelection tri-state while preserving legacy derived defaults through the trait's default implementation.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:21:55 -07:00
starr-openai
a970e46442 Fix environment manager clippy lints
Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:21:55 -07:00
starr-openai
729d8109a3 Make environment providers own default selection
Let environment providers return an explicit default selection and let remote environments track the underlying transport instead of treating only websocket URLs as remote. This prepares the environment layer for stdio-backed remotes without introducing config-file loading.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:21:55 -07:00
starr-openai
1bfe59e42d codex: fix Windows stdio transport clippy (#20664)
Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:21:42 -07:00
starr-openai
26899a2d5b codex: address stdio transport review feedback (#20664)
Co-authored-by: Codex <noreply@openai.com>
2026-05-07 16:06:20 -07:00
starr-openai
9f125d25cb Use PowerShell for Windows stdio test helper
Avoid cmd.exe echo quoting semantics in the Windows stdio client test by reading stdin and writing the JSON-RPC initialize response from PowerShell.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:46 -07:00
starr-openai
256760d6b9 Fix Windows stdio test JSON quoting
Escape the JSON-RPC response quotes in the cmd.exe stdio test command so Windows emits valid JSON before the client initialize timeout.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:46 -07:00
starr-openai
e58b331d8f Apply rustfmt to stdio transport guard
Match the rustfmt shape reported by the PR20664 Format / etc CI job after boxing the retained stdio transport guard.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:46 -07:00
starr-openai
dd1c9ff41a Box retained stdio transport guard
Avoid the Windows clippy large-enum-variant failure while preserving the retained stdio child cleanup guard behavior.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:46 -07:00
starr-openai
62bd368d38 Fix stdio transport clippy issues
Keep the stack-introduced stdio transport variant explicit while avoiding dead-code and redundant-pattern lints reported by PR20664 CI.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:45 -07:00
starr-openai
28b23c5cd3 Narrow stdio client lifetime handling
Keep the retained transport ownership needed for stdio child cleanup, but drop the broader AtomicBool closed-state behavior and its targeted tests from this PR.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:45 -07:00
starr-openai
3ff901257a Flatten JSON-RPC connection state
Drop the separate JsonRpcConnectionRuntime wrapper so JsonRpcConnection directly owns the channels, disconnect watch, transport tasks, and transport guard. This keeps the lifetime model explicit without helper extraction methods.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:45 -07:00
starr-openai
c72f484068 Simplify exec-server connection ownership
Remove the runtime extraction helpers and make JsonRpcConnection ownership explicit at the destructuring sites. Let the stdio transport clean up through Drop so ExecServerClient no longer needs to call an explicit shutdown hook.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:45 -07:00
starr-openai
7557a7307a Restore exec-server processor ownership boundary
Keep the server-side connection processor on the original by-value parts API, and move the compatibility needed for that shape into JsonRpcConnection. The client still borrows the connection mutably so it can keep transport ownership with ExecServerClient.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:45 -07:00
starr-openai
08795f1b65 Simplify exec-server transport ownership
Remove the Option wrapper used only to force connection drop order and call transport shutdown explicitly instead. Also drop dead-code allowances that are no longer needed.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:45 -07:00
starr-openai
f47954caef Remove server disconnect race test
The stdio transport no longer adds a processor-side disconnect side channel, so drop the test that asserted that removed behavior. Client cleanup is covered at the RPC/client transport boundary instead.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:44 -07:00
starr-openai
c317a66c61 Simplify exec-server disconnect plumbing
Keep transport shutdown responsible for stdio child cleanup, and remove the separate disconnect watch channel from the JSON-RPC connection/runtime. The RPC client now keeps a single closed flag for rejecting calls after the ordered reader exits.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:44 -07:00
starr-openai
d4b347176a Fix exec-server transport CI failures
Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:44 -07:00
starr-openai
6a7112ad21 Rename exec-server transport input params
Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:44 -07:00
starr-openai
b4269e85ff Split JSON-RPC transport variants
Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:44 -07:00
starr-openai
29f8812a83 Model retained JSON-RPC transport generically
Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:43 -07:00
starr-openai
942a674042 Name retained exec-server connection field
Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:43 -07:00
starr-openai
6ed49d62d7 Order exec-server transport teardown before RPC teardown
Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:43 -07:00
starr-openai
045c740618 Clarify exec-server transport connect naming
Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:43 -07:00
starr-openai
21297834ed Simplify stdio exec-server transport ownership
Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:42 -07:00
starr-openai
c00a36e727 Address stdio exec-server review feedback
Spawn stdio exec-server commands directly from structured argv/env/cwd instead of wrapping a shell string, redact the connection label, and tie the stdio child guard to transport disconnect.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:42 -07:00
starr-openai
74e96987b8 Simplify exec-server transport internals
Keep environment transport connection policy on ExecServerClient instead of the transport enum, and replace the JSON-RPC connection tuple alias with named connection parts.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:42 -07:00
starr-openai
caea51d3b7 Clean up stdio client process groups
Use the existing process-group cleanup pattern for stdio command transports so wrapper shell children are terminated with the client lifetime. Add a regression test that drops the client after spawning a background shell child through the command-backed transport.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:42 -07:00
starr-openai
c956939cc6 Clarify exec-server transport lifetime ownership
Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:42 -07:00
starr-openai
0bb3f728e1 Remove duplicate stdio client test import
Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:42 -07:00
starr-openai
995a669971 Make exec-server RPC client Send-safe
Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:42 -07:00
starr-openai
9face2bcbf Add stdio exec-server client transport
Allow exec-server clients to connect through a shell command over stdio. The connection can now retain a drop resource so the spawned child is terminated when the JSON-RPC client is dropped.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 19:19:41 -07:00
32 changed files with 1996 additions and 260 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -2730,6 +2730,7 @@ dependencies = [
"tokio",
"tokio-tungstenite",
"tokio-util",
"toml 0.9.11+spec-1.1.0",
"tracing",
"uuid",
"wiremock",

View File

@@ -49,7 +49,6 @@ use codex_config::ThreadConfigLoader;
pub use codex_core::StateDbHandle;
use codex_core::config::Config;
pub use codex_exec_server::EnvironmentManager;
pub use codex_exec_server::EnvironmentManagerArgs;
pub use codex_exec_server::ExecServerRuntimePaths;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;

View File

@@ -8,7 +8,6 @@ use codex_config::RemoteThreadConfigLoader;
use codex_config::ThreadConfigLoader;
use codex_core::config::Config;
use codex_core::resolve_installation_id;
use codex_exec_server::EnvironmentManagerArgs;
use codex_features::Feature;
use codex_login::AuthManager;
use codex_utils_cli::CliConfigOverrides;
@@ -420,15 +419,6 @@ pub async fn run_main_with_transport_options(
auth: AppServerWebsocketAuthSettings,
runtime_options: AppServerRuntimeOptions,
) -> IoResult<()> {
let environment_manager = Arc::new(
EnvironmentManager::new(EnvironmentManagerArgs::new(
ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?,
))
.await,
);
let (transport_event_tx, mut transport_event_rx) =
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
@@ -444,6 +434,17 @@ pub async fn run_main_with_transport_options(
)
})?;
let codex_home = find_codex_home()?;
let local_runtime_paths = ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?;
let environment_manager = if loader_overrides.ignore_user_config {
EnvironmentManager::from_env(local_runtime_paths).await
} else {
EnvironmentManager::from_codex_home(codex_home.clone(), local_runtime_paths).await
}
.map(Arc::new)
.map_err(std::io::Error::other)?;
let config_manager = ConfigManager::new(
codex_home.to_path_buf(),
cli_kv_overrides.clone(),

View File

@@ -46,7 +46,6 @@ pub use codex_core::resolve_installation_id;
pub use codex_core::skills::SkillsManager;
pub use codex_core::thread_store_from_config;
pub use codex_exec_server::EnvironmentManager;
pub use codex_exec_server::EnvironmentManagerArgs;
pub use codex_exec_server::ExecServerRuntimePaths;
pub use codex_features::Feature;
pub use codex_features::Features;

View File

@@ -15,7 +15,6 @@ pub use codex_app_server_protocol::AppMetadata;
use codex_connectors::AllConnectorsCacheKey;
use codex_connectors::DirectoryListResponse;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::EnvironmentManagerArgs;
use codex_exec_server::ExecServerRuntimePaths;
use codex_protocol::models::PermissionProfile;
use codex_tools::DiscoverableTool;
@@ -202,7 +201,7 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options_and_status(
config.codex_linux_sandbox_exe.clone(),
)?;
let environment_manager =
EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await;
EnvironmentManager::from_codex_home(config.codex_home.clone(), local_runtime_paths).await?;
list_accessible_connectors_from_mcp_tools_with_environment_manager(
config,
force_refetch,

View File

@@ -15,12 +15,12 @@ pub(crate) fn default_thread_environment_selections(
cwd: &AbsolutePathBuf,
) -> Vec<TurnEnvironmentSelection> {
environment_manager
.default_environment_id()
.default_environment_ids()
.into_iter()
.map(|environment_id| TurnEnvironmentSelection {
environment_id: environment_id.to_string(),
environment_id,
cwd: cwd.clone(),
})
.into_iter()
.collect()
}

View File

@@ -2,9 +2,9 @@ use std::collections::HashSet;
use std::sync::Arc;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::EnvironmentManagerArgs;
use codex_exec_server::ExecServerRuntimePaths;
use codex_login::AuthManager;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
@@ -48,7 +48,11 @@ pub async fn build_prompt_input(
&config,
Arc::clone(&auth_manager),
SessionSource::Exec,
Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await),
Arc::new(
EnvironmentManager::from_codex_home(config.codex_home.clone(), local_runtime_paths)
.await
.map_err(|err| CodexErr::Fatal(err.to_string()))?,
),
/*analytics_events_client*/ None,
state_db,
thread_store,

View File

@@ -19,6 +19,7 @@ use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::ThreadSource;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::protocol::UserMessageEvent;
use codex_protocol::user_input::UserInput;
use core_test_support::PathBufExt;
use core_test_support::PathExt;
use core_test_support::responses::mount_models_once;
@@ -354,6 +355,115 @@ async fn start_thread_accepts_explicit_environment_when_default_environment_is_d
assert_eq!(manager.list_thread_ids().await, vec![thread.thread_id]);
}
#[tokio::test]
async fn start_thread_uses_all_default_environments_from_codex_home() {
let temp_dir = tempdir().expect("tempdir");
let mut config = test_config().await;
config.codex_home = temp_dir.path().join("codex-home").abs();
config.cwd = config.codex_home.abs();
std::fs::create_dir_all(&config.codex_home).expect("create codex home");
std::fs::write(
config.codex_home.join("environments.toml"),
r#"
default = "dev"
[[environments]]
id = "dev"
program = "ssh"
args = ["dev", "cd /tmp && true"]
"#,
)
.expect("write environments.toml");
let runtime_paths = codex_exec_server::ExecServerRuntimePaths::new(
std::env::current_exe().expect("current exe path"),
/*codex_linux_sandbox_exe*/ None,
)
.expect("runtime paths");
let environment_manager = Arc::new(
codex_exec_server::EnvironmentManager::from_codex_home(
config.codex_home.clone(),
runtime_paths,
)
.await
.expect("environment manager"),
);
assert_eq!(
environment_manager.default_environment_ids(),
vec!["dev".to_string(), "local".to_string()]
);
let manager = ThreadManager::with_models_provider_and_home_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
environment_manager,
)
.await;
let thread = manager
.start_thread(config)
.await
.expect("thread should start");
let selections = thread.thread.environment_selections().await;
assert_eq!(
selections,
vec![
TurnEnvironmentSelection {
environment_id: "dev".to_string(),
cwd: thread.session_configured.cwd.abs(),
},
TurnEnvironmentSelection {
environment_id: "local".to_string(),
cwd: thread.session_configured.cwd.abs(),
},
]
);
let prompt_items = crate::prompt_debug::build_prompt_input_from_session(
thread.thread.codex.session.as_ref(),
Vec::<UserInput>::new(),
)
.await
.expect("prompt input");
let environment_context = prompt_items
.iter()
.filter_map(|item| match item {
ResponseItem::Message { content, .. } => Some(content),
_ => None,
})
.flatten()
.find_map(|content| match content {
ContentItem::InputText { text } if text.contains("<environment_context>") => {
Some(text.as_str())
}
_ => None,
})
.expect("environment context prompt item");
assert!(environment_context.contains("<environments>"));
let cwd = thread.session_configured.cwd.display().to_string();
let dev_entry = format!(
r#"<environment id="dev">
<cwd>{cwd}</cwd>
<shell>"#
);
let local_entry = format!(
r#"<environment id="local">
<cwd>{cwd}</cwd>
<shell>"#
);
let dev_position = environment_context
.find(&dev_entry)
.expect("dev environment entry");
let local_position = environment_context
.find(&local_entry)
.expect("local environment entry");
assert!(dev_position < local_position);
assert!(!environment_context.contains("\n <cwd>"));
assert!(!environment_context.contains("\n <shell>"));
}
#[tokio::test]
async fn start_thread_keeps_internal_threads_hidden_from_normal_lookups() {
let temp_dir = tempdir().expect("tempdir");

View File

@@ -74,6 +74,7 @@ const SUBMIT_TURN_COMPLETE_TIMEOUT: Duration = Duration::from_secs(30);
#[derive(Debug)]
pub struct TestEnv {
environment: codex_exec_server::Environment,
exec_server_url: Option<String>,
cwd: AbsolutePathBuf,
local_cwd_temp_dir: Option<Arc<TempDir>>,
remote_container_name: Option<String>,
@@ -87,6 +88,7 @@ impl TestEnv {
codex_exec_server::Environment::create_for_tests(/*exec_server_url*/ None)?;
Ok(Self {
environment,
exec_server_url: None,
cwd,
local_cwd_temp_dir: Some(local_cwd_temp_dir),
remote_container_name: None,
@@ -101,10 +103,6 @@ impl TestEnv {
&self.environment
}
pub fn exec_server_url(&self) -> Option<&str> {
self.environment.exec_server_url()
}
fn local_cwd_temp_dir(&self) -> Option<Arc<TempDir>> {
self.local_cwd_temp_dir.clone()
}
@@ -124,7 +122,7 @@ pub async fn test_env() -> Result<TestEnv> {
Some(remote_env) => {
let websocket_url = remote_exec_server_url()?;
let environment =
codex_exec_server::Environment::create_for_tests(Some(websocket_url))?;
codex_exec_server::Environment::create_for_tests(Some(websocket_url.clone()))?;
let cwd = remote_aware_cwd_path();
environment
.get_filesystem()
@@ -136,6 +134,7 @@ pub async fn test_env() -> Result<TestEnv> {
.await?;
Ok(TestEnv {
environment,
exec_server_url: Some(websocket_url),
cwd,
local_cwd_temp_dir: None,
remote_container_name: Some(remote_env.container_name),
@@ -386,7 +385,7 @@ impl TestCodexBuilder {
let exec_server_url = self
.exec_server_url
.clone()
.or_else(|| test_env.exec_server_url().map(str::to_owned));
.or_else(|| test_env.exec_server_url.clone());
let local_runtime_paths = codex_exec_server::ExecServerRuntimePaths::new(
std::env::current_exe()?,
/*codex_linux_sandbox_exe*/ None,

View File

@@ -3,6 +3,9 @@ load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "exec-server",
crate_name = "codex_exec_server",
deps_extra = [
"@crates//:toml",
],
# Keep the crate's integration tests single-threaded under Bazel because
# they install process-global test-binary dispatch state, and the remote
# exec-server cases already rely on serialization around the full CLI path.

View File

@@ -28,6 +28,7 @@ serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sha2 = { workspace = true }
thiserror = { workspace = true }
toml = { workspace = true }
tokio = { workspace = true, features = [
"fs",
"io-std",

View File

@@ -17,13 +17,14 @@ use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use tracing::debug;
use crate::ProcessId;
use crate::client_api::ExecServerClientConnectOptions;
use crate::client_api::ExecServerTransportParams;
use crate::client_api::HttpClient;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::client_api::StdioExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
use crate::process::ExecProcessEvent;
use crate::process::ExecProcessEventLog;
@@ -105,6 +106,16 @@ impl From<RemoteExecServerConnectArgs> for ExecServerClientConnectOptions {
}
}
impl From<StdioExecServerConnectArgs> for ExecServerClientConnectOptions {
fn from(value: StdioExecServerConnectArgs) -> Self {
Self {
client_name: value.client_name,
initialize_timeout: value.initialize_timeout,
resume_session_id: value.resume_session_id,
}
}
}
impl RemoteExecServerConnectArgs {
pub fn new(websocket_url: String, client_name: String) -> Self {
Self {
@@ -180,29 +191,25 @@ pub struct ExecServerClient {
#[derive(Clone)]
pub(crate) struct LazyRemoteExecServerClient {
websocket_url: String,
transport_params: ExecServerTransportParams,
client: Arc<OnceCell<ExecServerClient>>,
}
impl LazyRemoteExecServerClient {
pub(crate) fn new(websocket_url: String) -> Self {
pub(crate) fn new(transport_params: ExecServerTransportParams) -> Self {
Self {
websocket_url,
transport_params,
client: Arc::new(OnceCell::new()),
}
}
pub(crate) async fn get(&self) -> Result<ExecServerClient, ExecServerError> {
self.client
.get_or_try_init(|| async {
ExecServerClient::connect_websocket(RemoteExecServerConnectArgs {
websocket_url: self.websocket_url.clone(),
client_name: "codex-environment".to_string(),
connect_timeout: Duration::from_secs(5),
initialize_timeout: Duration::from_secs(5),
resume_session_id: None,
})
.await
// TODO: Add reconnect/disconnect handling here instead of reusing
// the first successfully initialized connection forever.
.get_or_try_init(|| {
let transport_params = self.transport_params.clone();
async move { ExecServerClient::connect_for_transport(transport_params).await }
})
.await
.cloned()
@@ -269,32 +276,6 @@ pub enum ExecServerError {
}
impl ExecServerClient {
pub async fn connect_websocket(
args: RemoteExecServerConnectArgs,
) -> Result<Self, ExecServerError> {
let websocket_url = args.websocket_url.clone();
let connect_timeout = args.connect_timeout;
let (stream, _) = timeout(connect_timeout, connect_async(websocket_url.as_str()))
.await
.map_err(|_| ExecServerError::WebSocketConnectTimeout {
url: websocket_url.clone(),
timeout: connect_timeout,
})?
.map_err(|source| ExecServerError::WebSocketConnect {
url: websocket_url.clone(),
source,
})?;
Self::connect(
JsonRpcConnection::from_websocket(
stream,
format!("exec-server websocket {websocket_url}"),
),
args.into(),
)
.await
}
pub async fn initialize(
&self,
options: ExecServerClientConnectOptions,
@@ -443,7 +424,7 @@ impl ExecServerClient {
.clone()
}
async fn connect(
pub(crate) async fn connect(
connection: JsonRpcConnection,
options: ExecServerClientConnectOptions,
) -> Result<Self, ExecServerError> {
@@ -893,18 +874,30 @@ mod tests {
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
#[cfg(unix)]
use std::path::Path;
#[cfg(unix)]
use std::process::Command;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::io::duplex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::Duration;
#[cfg(unix)]
use tokio::time::sleep;
use tokio::time::timeout;
use super::ExecServerClient;
use super::ExecServerClientConnectOptions;
use crate::ProcessId;
#[cfg(not(windows))]
use crate::client_api::ExecServerTransportParams;
use crate::client_api::StdioExecServerCommand;
use crate::client_api::StdioExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
use crate::process::ExecProcessEvent;
use crate::protocol::EXEC_CLOSED_METHOD;
@@ -942,6 +935,191 @@ mod tests {
.expect("json-rpc line should write");
}
#[cfg(not(windows))]
#[tokio::test]
async fn connect_stdio_command_initializes_json_rpc_client() {
let client = ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs {
command: StdioExecServerCommand {
program: "sh".to_string(),
args: vec![
"-c".to_string(),
"read _line; printf '%s\\n' '{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\"}}'; read _line; sleep 60".to_string(),
],
env: HashMap::new(),
cwd: None,
},
client_name: "stdio-test-client".to_string(),
initialize_timeout: Duration::from_secs(1),
resume_session_id: None,
})
.await
.expect("stdio client should connect");
assert_eq!(client.session_id().as_deref(), Some("stdio-test"));
}
#[cfg(not(windows))]
#[tokio::test]
async fn connect_for_transport_initializes_stdio_command() {
let client = ExecServerClient::connect_for_transport(
ExecServerTransportParams::StdioCommand(StdioExecServerCommand {
program: "sh".to_string(),
args: vec![
"-c".to_string(),
"read _line; printf '%s\\n' '{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\"}}'; read _line; sleep 60".to_string(),
],
env: HashMap::new(),
cwd: None,
}),
)
.await
.expect("stdio transport should connect");
assert_eq!(client.session_id().as_deref(), Some("stdio-test"));
}
#[cfg(windows)]
#[tokio::test]
async fn connect_stdio_command_initializes_json_rpc_client_on_windows() {
let client = ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs {
command: StdioExecServerCommand {
program: "powershell".to_string(),
args: vec![
"-NoProfile".to_string(),
"-Command".to_string(),
"$null = [Console]::In.ReadLine(); [Console]::Out.WriteLine('{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\"}}'); $null = [Console]::In.ReadLine(); Start-Sleep -Seconds 60".to_string(),
],
env: HashMap::new(),
cwd: None,
},
client_name: "stdio-test-client".to_string(),
initialize_timeout: Duration::from_secs(1),
resume_session_id: None,
})
.await
.expect("stdio client should connect");
assert_eq!(client.session_id().as_deref(), Some("stdio-test"));
}
#[cfg(unix)]
#[tokio::test]
async fn dropping_stdio_client_terminates_spawned_process() {
let tempdir = tempfile::tempdir().expect("tempdir should be created");
let pid_file = tempdir.path().join("server.pid");
let child_pid_file = tempdir.path().join("server-child.pid");
let stdio_script = format!(
"read _line; \
echo \"$$\" > {}; \
sleep 60 >/dev/null 2>&1 & echo \"$!\" > {}; \
printf '%s\\n' '{{\"id\":1,\"result\":{{\"sessionId\":\"stdio-test\"}}}}'; \
read _line; \
wait",
shell_quote(pid_file.as_path()),
shell_quote(child_pid_file.as_path()),
);
let client = ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs {
command: StdioExecServerCommand {
program: "sh".to_string(),
args: vec!["-c".to_string(), stdio_script],
env: HashMap::new(),
cwd: None,
},
client_name: "stdio-test-client".to_string(),
initialize_timeout: Duration::from_secs(1),
resume_session_id: None,
})
.await
.expect("stdio client should connect");
let server_pid = read_pid_file(pid_file.as_path()).await;
let child_pid = read_pid_file(child_pid_file.as_path()).await;
assert!(
process_exists(server_pid),
"spawned stdio process should be running before client drop"
);
assert!(
process_exists(child_pid),
"spawned stdio child process should be running before client drop"
);
drop(client);
wait_for_process_exit(server_pid).await;
wait_for_process_exit(child_pid).await;
}
#[cfg(unix)]
#[tokio::test]
async fn malformed_stdio_message_terminates_spawned_process() {
let tempdir = tempfile::tempdir().expect("tempdir should be created");
let pid_file = tempdir.path().join("server.pid");
let stdio_script = format!(
"read _line; \
echo \"$$\" > {}; \
printf '%s\\n' 'not-json'; \
sleep 60",
shell_quote(pid_file.as_path()),
);
let result = ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs {
command: StdioExecServerCommand {
program: "sh".to_string(),
args: vec!["-c".to_string(), stdio_script],
env: HashMap::new(),
cwd: None,
},
client_name: "stdio-test-client".to_string(),
initialize_timeout: Duration::from_secs(1),
resume_session_id: None,
})
.await;
assert!(result.is_err(), "malformed stdio server should not connect");
let server_pid = read_pid_file(pid_file.as_path()).await;
wait_for_process_exit(server_pid).await;
}
#[cfg(unix)]
async fn read_pid_file(path: &Path) -> u32 {
for _ in 0..20 {
if let Ok(contents) = std::fs::read_to_string(path) {
return contents
.trim()
.parse()
.expect("pid file should contain a pid");
}
sleep(Duration::from_millis(50)).await;
}
panic!("pid file {} should be written", path.display());
}
#[cfg(unix)]
async fn wait_for_process_exit(pid: u32) {
for _ in 0..20 {
if !process_exists(pid) {
return;
}
sleep(Duration::from_millis(100)).await;
}
panic!("process {pid} should exit");
}
#[cfg(unix)]
fn process_exists(pid: u32) -> bool {
Command::new("kill")
.arg("-0")
.arg(pid.to_string())
.status()
.is_ok_and(|status| status.success())
}
#[cfg(unix)]
fn shell_quote(path: &Path) -> String {
let value = path.to_string_lossy();
format!("'{}'", value.replace('\'', "'\\''"))
}
#[tokio::test]
async fn process_events_are_delivered_in_seq_order_when_notifications_are_reordered() {
let (client_stdin, server_reader) = duplex(1 << 20);
@@ -1085,6 +1263,92 @@ mod tests {
server.await.expect("server task should finish");
}
#[tokio::test]
async fn transport_disconnect_fails_sessions_and_rejects_new_sessions() {
let (client_stdin, server_reader) = duplex(1 << 20);
let (mut server_writer, client_stdout) = duplex(1 << 20);
let (disconnect_tx, disconnect_rx) = oneshot::channel();
let server = tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
let initialize = read_jsonrpc_line(&mut lines).await;
let request = match initialize {
JSONRPCMessage::Request(request) if request.method == INITIALIZE_METHOD => request,
other => panic!("expected initialize request, got {other:?}"),
};
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: request.id,
result: serde_json::to_value(InitializeResponse {
session_id: "session-1".to_string(),
})
.expect("initialize response should serialize"),
}),
)
.await;
let initialized = read_jsonrpc_line(&mut lines).await;
match initialized {
JSONRPCMessage::Notification(notification)
if notification.method == INITIALIZED_METHOD => {}
other => panic!("expected initialized notification, got {other:?}"),
}
let _ = disconnect_rx.await;
drop(server_writer);
});
let client = ExecServerClient::connect(
JsonRpcConnection::from_stdio(
client_stdout,
client_stdin,
"test-exec-server-client".to_string(),
),
ExecServerClientConnectOptions::default(),
)
.await
.expect("client should connect");
let process_id = ProcessId::from("disconnect");
let session = client
.register_session(&process_id)
.await
.expect("session should register");
let mut events = session.subscribe_events();
disconnect_tx.send(()).expect("disconnect should signal");
let event = timeout(Duration::from_secs(1), events.recv())
.await
.expect("session failure should not time out")
.expect("session event stream should stay open");
let ExecProcessEvent::Failed(message) = event else {
panic!("expected session failure after disconnect, got {event:?}");
};
assert_eq!(message, "exec-server transport disconnected");
let response = session
.read(
/*after_seq*/ None, /*max_bytes*/ None, /*wait_ms*/ None,
)
.await
.expect("disconnected session read should synthesize a response");
assert_eq!(
response.failure.as_deref(),
Some("exec-server transport disconnected")
);
assert!(response.closed);
let new_session = client.register_session(&ProcessId::from("new")).await;
assert!(matches!(
new_session,
Err(super::ExecServerError::Disconnected(_))
));
drop(client);
server.await.expect("server task should finish");
}
#[tokio::test]
async fn wake_notifications_do_not_block_other_sessions() {
let (client_stdin, server_reader) = duplex(1 << 20);

View File

@@ -1,3 +1,5 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Duration;
use futures::future::BoxFuture;
@@ -25,6 +27,32 @@ pub struct RemoteExecServerConnectArgs {
pub resume_session_id: Option<String>,
}
/// Stdio connection arguments for a command-backed exec-server.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct StdioExecServerConnectArgs {
pub command: StdioExecServerCommand,
pub client_name: String,
pub initialize_timeout: Duration,
pub resume_session_id: Option<String>,
}
/// Structured process command used to start an exec-server over stdio.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct StdioExecServerCommand {
pub program: String,
pub args: Vec<String>,
pub env: HashMap<String, String>,
pub cwd: Option<PathBuf>,
}
/// Parameters used to connect to a remote exec-server environment.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum ExecServerTransportParams {
WebSocketUrl(String),
#[allow(dead_code)]
StdioCommand(StdioExecServerCommand),
}
/// Sends HTTP requests through a runtime-selected transport.
///
/// This is the HTTP capability counterpart to [`crate::ExecBackend`]. Callers

View File

@@ -0,0 +1,127 @@
use std::process::Stdio;
use std::time::Duration;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::process::Command;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use tracing::debug;
use tracing::warn;
use crate::ExecServerClient;
use crate::ExecServerError;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::client_api::StdioExecServerCommand;
use crate::client_api::StdioExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
const ENVIRONMENT_CLIENT_NAME: &str = "codex-environment";
const ENVIRONMENT_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
const ENVIRONMENT_INITIALIZE_TIMEOUT: Duration = Duration::from_secs(5);
impl ExecServerClient {
pub(crate) async fn connect_for_transport(
transport_params: crate::client_api::ExecServerTransportParams,
) -> Result<Self, ExecServerError> {
match transport_params {
crate::client_api::ExecServerTransportParams::WebSocketUrl(websocket_url) => {
Self::connect_websocket(RemoteExecServerConnectArgs {
websocket_url,
client_name: ENVIRONMENT_CLIENT_NAME.to_string(),
connect_timeout: ENVIRONMENT_CONNECT_TIMEOUT,
initialize_timeout: ENVIRONMENT_INITIALIZE_TIMEOUT,
resume_session_id: None,
})
.await
}
crate::client_api::ExecServerTransportParams::StdioCommand(command) => {
Self::connect_stdio_command(StdioExecServerConnectArgs {
command,
client_name: ENVIRONMENT_CLIENT_NAME.to_string(),
initialize_timeout: ENVIRONMENT_INITIALIZE_TIMEOUT,
resume_session_id: None,
})
.await
}
}
}
pub async fn connect_websocket(
args: RemoteExecServerConnectArgs,
) -> Result<Self, ExecServerError> {
let websocket_url = args.websocket_url.clone();
let connect_timeout = args.connect_timeout;
let (stream, _) = timeout(connect_timeout, connect_async(websocket_url.as_str()))
.await
.map_err(|_| ExecServerError::WebSocketConnectTimeout {
url: websocket_url.clone(),
timeout: connect_timeout,
})?
.map_err(|source| ExecServerError::WebSocketConnect {
url: websocket_url.clone(),
source,
})?;
Self::connect(
JsonRpcConnection::from_websocket(
stream,
format!("exec-server websocket {websocket_url}"),
),
args.into(),
)
.await
}
pub(crate) async fn connect_stdio_command(
args: StdioExecServerConnectArgs,
) -> Result<Self, ExecServerError> {
let mut child = stdio_command_process(&args.command)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(ExecServerError::Spawn)?;
let stdin = child.stdin.take().ok_or_else(|| {
ExecServerError::Protocol("spawned exec-server command has no stdin".to_string())
})?;
let stdout = child.stdout.take().ok_or_else(|| {
ExecServerError::Protocol("spawned exec-server command has no stdout".to_string())
})?;
if let Some(stderr) = child.stderr.take() {
tokio::spawn(async move {
let mut lines = BufReader::new(stderr).lines();
loop {
match lines.next_line().await {
Ok(Some(line)) => debug!("exec-server stdio stderr: {line}"),
Ok(None) => break,
Err(err) => {
warn!("failed to read exec-server stdio stderr: {err}");
break;
}
}
}
});
}
Self::connect(
JsonRpcConnection::from_stdio(stdout, stdin, "exec-server stdio command".to_string())
.with_child_process(child),
args.into(),
)
.await
}
}
fn stdio_command_process(stdio_command: &StdioExecServerCommand) -> Command {
let mut command = Command::new(&stdio_command.program);
command.args(&stdio_command.args);
command.envs(&stdio_command.env);
if let Some(cwd) = &stdio_command.cwd {
command.current_dir(cwd);
}
#[cfg(unix)]
command.process_group(0);
command
}

View File

@@ -1,12 +1,21 @@
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use codex_app_server_protocol::JSONRPCMessage;
use futures::SinkExt;
use futures::StreamExt;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::process::Child;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::time::timeout;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::tungstenite::Message;
use tracing::debug;
use tracing::warn;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
@@ -14,6 +23,7 @@ use tokio::io::BufReader;
use tokio::io::BufWriter;
pub(crate) const CHANNEL_CAPACITY: usize = 128;
const STDIO_TERMINATION_GRACE_PERIOD: Duration = Duration::from_secs(2);
#[derive(Debug)]
pub(crate) enum JsonRpcConnectionEvent {
@@ -22,11 +32,186 @@ pub(crate) enum JsonRpcConnectionEvent {
Disconnected { reason: Option<String> },
}
#[derive(Clone)]
pub(crate) enum JsonRpcTransport {
Plain,
Stdio { transport: StdioTransport },
}
impl JsonRpcTransport {
fn from_child_process(child_process: Child) -> Self {
Self::Stdio {
transport: StdioTransport::spawn(child_process),
}
}
pub(crate) fn terminate(&self) {
match self {
Self::Plain => {}
Self::Stdio { transport } => transport.terminate(),
}
}
}
#[derive(Clone)]
pub(crate) struct StdioTransport {
handle: Arc<StdioTransportHandle>,
}
struct StdioTransportHandle {
terminate_tx: watch::Sender<bool>,
terminate_requested: AtomicBool,
}
impl StdioTransport {
fn spawn(child_process: Child) -> Self {
let (terminate_tx, terminate_rx) = watch::channel(false);
let handle = Arc::new(StdioTransportHandle {
terminate_tx,
terminate_requested: AtomicBool::new(false),
});
spawn_stdio_child_supervisor(child_process, terminate_rx);
Self { handle }
}
fn terminate(&self) {
self.handle.terminate();
}
}
impl StdioTransportHandle {
fn terminate(&self) {
if !self.terminate_requested.swap(true, Ordering::AcqRel) {
let _ = self.terminate_tx.send(true);
}
}
}
impl Drop for StdioTransportHandle {
fn drop(&mut self) {
self.terminate();
}
}
fn spawn_stdio_child_supervisor(mut child_process: Child, mut terminate_rx: watch::Receiver<bool>) {
let process_group_id = child_process.id();
tokio::spawn(async move {
tokio::select! {
result = child_process.wait() => {
log_stdio_child_wait_result(result);
kill_process_tree(&mut child_process, process_group_id);
}
() = wait_for_stdio_termination(&mut terminate_rx) => {
terminate_stdio_child(&mut child_process, process_group_id).await;
}
}
});
}
async fn wait_for_stdio_termination(terminate_rx: &mut watch::Receiver<bool>) {
loop {
if *terminate_rx.borrow() {
return;
}
if terminate_rx.changed().await.is_err() {
return;
}
}
}
async fn terminate_stdio_child(child_process: &mut Child, process_group_id: Option<u32>) {
terminate_process_tree(child_process, process_group_id);
match timeout(STDIO_TERMINATION_GRACE_PERIOD, child_process.wait()).await {
Ok(result) => {
log_stdio_child_wait_result(result);
}
Err(_) => {
kill_process_tree(child_process, process_group_id);
log_stdio_child_wait_result(child_process.wait().await);
}
}
}
fn terminate_process_tree(child_process: &mut Child, process_group_id: Option<u32>) {
let Some(process_group_id) = process_group_id else {
kill_direct_child(child_process, "terminate");
return;
};
#[cfg(unix)]
if let Err(err) = codex_utils_pty::process_group::terminate_process_group(process_group_id) {
warn!("failed to terminate exec-server stdio process group {process_group_id}: {err}");
kill_direct_child(child_process, "terminate");
}
#[cfg(windows)]
if !kill_windows_process_tree(process_group_id) {
kill_direct_child(child_process, "terminate");
}
#[cfg(not(any(unix, windows)))]
{
let _ = process_group_id;
kill_direct_child(child_process, "terminate");
}
}
fn kill_process_tree(child_process: &mut Child, process_group_id: Option<u32>) {
let Some(process_group_id) = process_group_id else {
kill_direct_child(child_process, "kill");
return;
};
#[cfg(unix)]
if let Err(err) = codex_utils_pty::process_group::kill_process_group(process_group_id) {
warn!("failed to kill exec-server stdio process group {process_group_id}: {err}");
}
#[cfg(windows)]
if !kill_windows_process_tree(process_group_id) {
kill_direct_child(child_process, "kill");
}
#[cfg(not(any(unix, windows)))]
{
let _ = process_group_id;
kill_direct_child(child_process, "kill");
}
}
fn kill_direct_child(child_process: &mut Child, action: &str) {
if let Err(err) = child_process.start_kill() {
debug!("failed to {action} exec-server stdio child: {err}");
}
}
#[cfg(windows)]
fn kill_windows_process_tree(pid: u32) -> bool {
let pid = pid.to_string();
match std::process::Command::new("taskkill")
.args(["/PID", pid.as_str(), "/T", "/F"])
.status()
{
Ok(status) => status.success(),
Err(err) => {
warn!("failed to run taskkill for exec-server stdio process tree {pid}: {err}");
false
}
}
}
fn log_stdio_child_wait_result(result: std::io::Result<std::process::ExitStatus>) {
if let Err(err) = result {
debug!("failed to wait for exec-server stdio child: {err}");
}
}
pub(crate) struct JsonRpcConnection {
outgoing_tx: mpsc::Sender<JSONRPCMessage>,
incoming_rx: mpsc::Receiver<JsonRpcConnectionEvent>,
disconnected_rx: watch::Receiver<bool>,
task_handles: Vec<tokio::task::JoinHandle<()>>,
pub(crate) outgoing_tx: mpsc::Sender<JSONRPCMessage>,
pub(crate) incoming_rx: mpsc::Receiver<JsonRpcConnectionEvent>,
pub(crate) disconnected_rx: watch::Receiver<bool>,
pub(crate) task_handles: Vec<tokio::task::JoinHandle<()>>,
pub(crate) transport: JsonRpcTransport,
}
impl JsonRpcConnection {
@@ -61,13 +246,15 @@ impl JsonRpcConnection {
}
}
Err(err) => {
send_malformed_message(
send_disconnected(
&incoming_tx_for_reader,
&disconnected_tx_for_reader,
Some(format!(
"failed to parse JSON-RPC message from {reader_label}: {err}"
)),
)
.await;
break;
}
}
}
@@ -117,6 +304,7 @@ impl JsonRpcConnection {
incoming_rx,
disconnected_rx,
task_handles: vec![reader_task, writer_task],
transport: JsonRpcTransport::Plain,
}
}
@@ -251,23 +439,13 @@ impl JsonRpcConnection {
incoming_rx,
disconnected_rx,
task_handles: vec![reader_task, writer_task],
transport: JsonRpcTransport::Plain,
}
}
pub(crate) fn into_parts(
self,
) -> (
mpsc::Sender<JSONRPCMessage>,
mpsc::Receiver<JsonRpcConnectionEvent>,
watch::Receiver<bool>,
Vec<tokio::task::JoinHandle<()>>,
) {
(
self.outgoing_tx,
self.incoming_rx,
self.disconnected_rx,
self.task_handles,
)
pub(crate) fn with_child_process(mut self, child_process: Child) -> Self {
self.transport = JsonRpcTransport::from_child_process(child_process);
self
}
}

View File

@@ -7,9 +7,13 @@ use crate::ExecutorFileSystem;
use crate::HttpClient;
use crate::client::LazyRemoteExecServerClient;
use crate::client::http_client::ReqwestHttpClient;
use crate::client_api::ExecServerTransportParams;
use crate::environment_provider::DefaultEnvironmentProvider;
use crate::environment_provider::EnvironmentDefault;
use crate::environment_provider::EnvironmentProvider;
use crate::environment_provider::EnvironmentProviderSnapshot;
use crate::environment_provider::normalize_exec_server_url;
use crate::environment_toml::environment_provider_from_codex_home;
use crate::local_file_system::LocalFileSystem;
use crate::local_process::LocalProcess;
use crate::process::ExecBackend;
@@ -20,9 +24,10 @@ pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
/// Owns the execution/filesystem environments available to the Codex runtime.
///
/// `EnvironmentManager` is a shared registry for concrete environments. Its
/// default constructor preserves the legacy `CODEX_EXEC_SERVER_URL` behavior
/// while provider-based construction accepts a provider-supplied snapshot.
/// `EnvironmentManager` is a shared registry for concrete environments.
/// `from_codex_home` preserves the legacy `CODEX_EXEC_SERVER_URL` behavior when
/// no `environments.toml` file is present, while provider-based construction
/// accepts a provider-supplied snapshot.
///
/// Setting `CODEX_EXEC_SERVER_URL=none` disables environment access by leaving
/// the default environment unset while still keeping an explicit local
@@ -31,11 +36,12 @@ pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
/// shell/filesystem tool availability.
///
/// Remote environments create remote filesystem and execution backends that
/// lazy-connect to the configured exec-server on first use. The websocket is
/// not opened when the manager or environment is constructed.
/// lazy-connect to the configured exec-server on first use. The remote
/// transport is not opened when the manager or environment is constructed.
#[derive(Debug)]
pub struct EnvironmentManager {
default_environment: Option<String>,
startup_environment_ids: Vec<String>,
environments: HashMap<String, Arc<Environment>>,
local_environment: Arc<Environment>,
}
@@ -43,24 +49,12 @@ pub struct EnvironmentManager {
pub const LOCAL_ENVIRONMENT_ID: &str = "local";
pub const REMOTE_ENVIRONMENT_ID: &str = "remote";
#[derive(Clone, Debug)]
pub struct EnvironmentManagerArgs {
pub local_runtime_paths: ExecServerRuntimePaths,
}
impl EnvironmentManagerArgs {
pub fn new(local_runtime_paths: ExecServerRuntimePaths) -> Self {
Self {
local_runtime_paths,
}
}
}
impl EnvironmentManager {
/// Builds a test-only manager without configured sandbox helper paths.
pub fn default_for_tests() -> Self {
Self {
default_environment: Some(LOCAL_ENVIRONMENT_ID.to_string()),
startup_environment_ids: vec![LOCAL_ENVIRONMENT_ID.to_string()],
environments: HashMap::from([(
LOCAL_ENVIRONMENT_ID.to_string(),
Arc::new(Environment::default_for_tests()),
@@ -71,9 +65,12 @@ impl EnvironmentManager {
/// Builds a test-only manager with environment access disabled.
pub fn disabled_for_tests(local_runtime_paths: ExecServerRuntimePaths) -> Self {
let mut manager = Self::from_environments(HashMap::new(), local_runtime_paths);
manager.default_environment = None;
manager
Self {
default_environment: None,
startup_environment_ids: Vec::new(),
environments: HashMap::new(),
local_environment: Arc::new(Environment::local(local_runtime_paths)),
}
}
/// Builds a test-only manager from a raw exec-server URL value.
@@ -84,85 +81,116 @@ impl EnvironmentManager {
Self::from_default_provider_url(exec_server_url, local_runtime_paths).await
}
/// Builds a manager from `CODEX_EXEC_SERVER_URL` and local runtime paths
/// used when creating local filesystem helpers.
pub async fn new(args: EnvironmentManagerArgs) -> Self {
let EnvironmentManagerArgs {
local_runtime_paths,
} = args;
let exec_server_url = std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok();
Self::from_default_provider_url(exec_server_url, local_runtime_paths).await
/// Builds a manager from `CODEX_HOME` and local runtime paths used when
/// creating local filesystem helpers.
///
/// If `CODEX_HOME/environments.toml` is present, it defines the configured
/// environments. Otherwise this preserves the legacy
/// `CODEX_EXEC_SERVER_URL` behavior. Callers that ignore user config
/// should use [`Self::from_env`] instead.
pub async fn from_codex_home(
codex_home: impl AsRef<std::path::Path>,
local_runtime_paths: ExecServerRuntimePaths,
) -> Result<Self, ExecServerError> {
let provider = environment_provider_from_codex_home(codex_home.as_ref())?;
Self::from_provider(provider.as_ref(), local_runtime_paths).await
}
/// Builds a manager from the legacy environment-variable provider without
/// reading user config files from `CODEX_HOME`.
pub async fn from_env(
local_runtime_paths: ExecServerRuntimePaths,
) -> Result<Self, ExecServerError> {
let provider = DefaultEnvironmentProvider::from_env();
Self::from_provider(&provider, local_runtime_paths).await
}
async fn from_default_provider_url(
exec_server_url: Option<String>,
local_runtime_paths: ExecServerRuntimePaths,
) -> Self {
let environment_disabled = normalize_exec_server_url(exec_server_url.clone()).1;
let provider = DefaultEnvironmentProvider::new(exec_server_url);
let provider_environments = provider.environments(&local_runtime_paths);
let mut manager = Self::from_environments(provider_environments, local_runtime_paths);
if environment_disabled {
// TODO: Remove this legacy `CODEX_EXEC_SERVER_URL=none` crutch once
// environment attachment defaulting moves out of EnvironmentManager.
manager.default_environment = None;
match Self::from_provider(&provider, local_runtime_paths).await {
Ok(manager) => manager,
Err(err) => panic!("default provider should create valid environments: {err}"),
}
manager
}
/// Builds a manager from a provider-supplied startup snapshot.
pub async fn from_provider<P>(
pub(crate) async fn from_provider<P>(
provider: &P,
local_runtime_paths: ExecServerRuntimePaths,
) -> Result<Self, ExecServerError>
where
P: EnvironmentProvider + ?Sized,
{
Self::from_provider_environments(
provider.get_environments(&local_runtime_paths).await?,
Self::from_provider_snapshot(
provider.snapshot(&local_runtime_paths).await?,
local_runtime_paths,
)
}
fn from_provider_environments(
environments: HashMap<String, Environment>,
fn from_provider_snapshot(
snapshot: EnvironmentProviderSnapshot,
local_runtime_paths: ExecServerRuntimePaths,
) -> Result<Self, ExecServerError> {
for id in environments.keys() {
let EnvironmentProviderSnapshot {
environments,
default,
include_all_environments_by_default,
} = snapshot;
let mut configured_environment_ids = Vec::with_capacity(environments.len());
let mut environment_map = HashMap::with_capacity(environments.len());
for (id, environment) in environments {
if id.is_empty() {
return Err(ExecServerError::Protocol(
"environment id cannot be empty".to_string(),
));
}
if environment_map
.insert(id.clone(), Arc::new(environment))
.is_some()
{
return Err(ExecServerError::Protocol(format!(
"environment id `{id}` is duplicated"
)));
}
configured_environment_ids.push(id);
}
Ok(Self::from_environments(environments, local_runtime_paths))
}
fn from_environments(
environments: HashMap<String, Environment>,
local_runtime_paths: ExecServerRuntimePaths,
) -> Self {
// TODO: Stop deriving a default environment here once omitted
// environment attachment is owned by thread/session setup.
let default_environment = if environments.contains_key(REMOTE_ENVIRONMENT_ID) {
Some(REMOTE_ENVIRONMENT_ID.to_string())
} else if environments.contains_key(LOCAL_ENVIRONMENT_ID) {
Some(LOCAL_ENVIRONMENT_ID.to_string())
} else {
None
let default_environment = match default {
EnvironmentDefault::Disabled => None,
EnvironmentDefault::EnvironmentId(environment_id) => {
if !environment_map.contains_key(&environment_id) {
return Err(ExecServerError::Protocol(format!(
"default environment `{environment_id}` is not configured"
)));
}
Some(environment_id)
}
};
let local_environment = Arc::new(Environment::local(local_runtime_paths));
let environments = environments
.into_iter()
.map(|(id, environment)| (id, Arc::new(environment)))
.collect();
let startup_environment_ids = match default_environment.as_ref() {
None => Vec::new(),
Some(default_environment_id) if include_all_environments_by_default => {
let mut environment_ids = Vec::with_capacity(configured_environment_ids.len());
environment_ids.push(default_environment_id.clone());
environment_ids.extend(
configured_environment_ids
.iter()
.filter(|environment_id| environment_id.as_str() != default_environment_id)
.cloned(),
);
environment_ids
}
Some(default_environment_id) => vec![default_environment_id.clone()],
};
Self {
Ok(Self {
default_environment,
environments,
startup_environment_ids,
environments: environment_map,
local_environment,
}
})
}
/// Returns the default environment instance.
@@ -177,6 +205,11 @@ impl EnvironmentManager {
self.default_environment.as_deref()
}
/// Returns the ordered environment ids used for new thread startup.
pub fn default_environment_ids(&self) -> Vec<String> {
self.startup_environment_ids.clone()
}
/// Returns the local environment instance used for internal runtime work.
pub fn local_environment(&self) -> Arc<Environment> {
Arc::clone(&self.local_environment)
@@ -194,7 +227,7 @@ impl EnvironmentManager {
/// paths used by filesystem helpers.
#[derive(Clone)]
pub struct Environment {
exec_server_url: Option<String>,
remote_transport: Option<ExecServerTransportParams>,
exec_backend: Arc<dyn ExecBackend>,
filesystem: Arc<dyn ExecutorFileSystem>,
http_client: Arc<dyn HttpClient>,
@@ -205,7 +238,7 @@ impl Environment {
/// Builds a test-only local environment without configured sandbox helper paths.
pub fn default_for_tests() -> Self {
Self {
exec_server_url: None,
remote_transport: None,
exec_backend: Arc::new(LocalProcess::default()),
filesystem: Arc::new(LocalFileSystem::unsandboxed()),
http_client: Arc::new(ReqwestHttpClient),
@@ -217,7 +250,7 @@ impl Environment {
impl std::fmt::Debug for Environment {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Environment")
.field("exec_server_url", &self.exec_server_url)
.field("exec_server_url", &self.exec_server_url())
.finish_non_exhaustive()
}
}
@@ -260,7 +293,7 @@ impl Environment {
pub(crate) fn local(local_runtime_paths: ExecServerRuntimePaths) -> Self {
Self {
exec_server_url: None,
remote_transport: None,
exec_backend: Arc::new(LocalProcess::default()),
filesystem: Arc::new(LocalFileSystem::with_runtime_paths(
local_runtime_paths.clone(),
@@ -274,13 +307,23 @@ impl Environment {
exec_server_url: String,
local_runtime_paths: Option<ExecServerRuntimePaths>,
) -> Self {
let client = LazyRemoteExecServerClient::new(exec_server_url.clone());
Self::remote_with_transport(
ExecServerTransportParams::WebSocketUrl(exec_server_url),
local_runtime_paths,
)
}
pub(crate) fn remote_with_transport(
transport_params: ExecServerTransportParams,
local_runtime_paths: Option<ExecServerRuntimePaths>,
) -> Self {
let client = LazyRemoteExecServerClient::new(transport_params.clone());
let exec_backend: Arc<dyn ExecBackend> = Arc::new(RemoteProcess::new(client.clone()));
let filesystem: Arc<dyn ExecutorFileSystem> =
Arc::new(RemoteFileSystem::new(client.clone()));
Self {
exec_server_url: Some(exec_server_url),
remote_transport: Some(transport_params),
exec_backend,
filesystem,
http_client: Arc::new(client),
@@ -289,12 +332,15 @@ impl Environment {
}
pub fn is_remote(&self) -> bool {
self.exec_server_url.is_some()
self.remote_transport.is_some()
}
/// Returns the remote exec-server URL when this environment is remote.
pub fn exec_server_url(&self) -> Option<&str> {
self.exec_server_url.as_deref()
pub(crate) fn exec_server_url(&self) -> Option<&str> {
match self.remote_transport.as_ref() {
Some(ExecServerTransportParams::WebSocketUrl(url)) => Some(url.as_str()),
Some(ExecServerTransportParams::StdioCommand(_)) | None => None,
}
}
pub fn local_runtime_paths(&self) -> Option<&ExecServerRuntimePaths> {
@@ -316,17 +362,34 @@ impl Environment {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use super::Environment;
use super::EnvironmentManager;
use super::LOCAL_ENVIRONMENT_ID;
use super::REMOTE_ENVIRONMENT_ID;
use crate::ExecServerError;
use crate::ExecServerRuntimePaths;
use crate::ProcessId;
use crate::environment_provider::EnvironmentDefault;
use crate::environment_provider::EnvironmentProvider;
use crate::environment_provider::EnvironmentProviderSnapshot;
use pretty_assertions::assert_eq;
struct TestEnvironmentProvider {
snapshot: EnvironmentProviderSnapshot,
}
#[async_trait::async_trait]
impl EnvironmentProvider for TestEnvironmentProvider {
async fn snapshot(
&self,
_local_runtime_paths: &ExecServerRuntimePaths,
) -> Result<EnvironmentProviderSnapshot, ExecServerError> {
Ok(self.snapshot.clone())
}
}
fn test_runtime_paths() -> ExecServerRuntimePaths {
ExecServerRuntimePaths::new(
std::env::current_exe().expect("current exe"),
@@ -417,15 +480,21 @@ mod tests {
}
#[tokio::test]
async fn environment_manager_builds_from_provider_environments() {
let manager = EnvironmentManager::from_environments(
HashMap::from([(
REMOTE_ENVIRONMENT_ID.to_string(),
Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string()))
.expect("remote environment"),
)]),
test_runtime_paths(),
);
async fn environment_manager_builds_from_provider() {
let provider = TestEnvironmentProvider {
snapshot: EnvironmentProviderSnapshot {
environments: vec![(
REMOTE_ENVIRONMENT_ID.to_string(),
Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string()))
.expect("remote environment"),
)],
default: EnvironmentDefault::EnvironmentId(REMOTE_ENVIRONMENT_ID.to_string()),
include_all_environments_by_default: false,
},
};
let manager = EnvironmentManager::from_provider(&provider, test_runtime_paths())
.await
.expect("environment manager");
assert_eq!(
manager.default_environment_id(),
@@ -443,11 +512,16 @@ mod tests {
#[tokio::test]
async fn environment_manager_rejects_empty_environment_id() {
let err = EnvironmentManager::from_provider_environments(
HashMap::from([("".to_string(), Environment::default_for_tests())]),
test_runtime_paths(),
)
.expect_err("empty id should fail");
let provider = TestEnvironmentProvider {
snapshot: EnvironmentProviderSnapshot {
environments: vec![("".to_string(), Environment::default_for_tests())],
default: EnvironmentDefault::Disabled,
include_all_environments_by_default: false,
},
};
let err = EnvironmentManager::from_provider(&provider, test_runtime_paths())
.await
.expect_err("empty id should fail");
assert_eq!(
err.to_string(),
@@ -455,6 +529,80 @@ mod tests {
);
}
#[tokio::test]
async fn environment_manager_uses_explicit_provider_default() {
let provider = TestEnvironmentProvider {
snapshot: EnvironmentProviderSnapshot {
environments: vec![
(
LOCAL_ENVIRONMENT_ID.to_string(),
Environment::default_for_tests(),
),
(
"devbox".to_string(),
Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string()))
.expect("remote environment"),
),
],
default: EnvironmentDefault::EnvironmentId("devbox".to_string()),
include_all_environments_by_default: true,
},
};
let manager = EnvironmentManager::from_provider(&provider, test_runtime_paths())
.await
.expect("manager");
assert_eq!(manager.default_environment_id(), Some("devbox"));
assert_eq!(
manager.default_environment_ids(),
vec!["devbox".to_string(), LOCAL_ENVIRONMENT_ID.to_string()]
);
assert!(manager.default_environment().expect("default").is_remote());
}
#[tokio::test]
async fn environment_manager_disables_provider_default() {
let provider = TestEnvironmentProvider {
snapshot: EnvironmentProviderSnapshot {
environments: vec![(
LOCAL_ENVIRONMENT_ID.to_string(),
Environment::default_for_tests(),
)],
default: EnvironmentDefault::Disabled,
include_all_environments_by_default: true,
},
};
let manager = EnvironmentManager::from_provider(&provider, test_runtime_paths())
.await
.expect("manager");
assert_eq!(manager.default_environment_id(), None);
assert!(manager.default_environment().is_none());
assert!(manager.get_environment(LOCAL_ENVIRONMENT_ID).is_some());
}
#[tokio::test]
async fn environment_manager_rejects_unknown_provider_default() {
let provider = TestEnvironmentProvider {
snapshot: EnvironmentProviderSnapshot {
environments: vec![(
LOCAL_ENVIRONMENT_ID.to_string(),
Environment::default_for_tests(),
)],
default: EnvironmentDefault::EnvironmentId("missing".to_string()),
include_all_environments_by_default: true,
},
};
let err = EnvironmentManager::from_provider(&provider, test_runtime_paths())
.await
.expect_err("unknown default should fail");
assert_eq!(
err.to_string(),
"exec-server protocol error: default environment `missing` is not configured"
);
}
#[tokio::test]
async fn environment_manager_uses_provider_supplied_local_environment() {
let manager = EnvironmentManager::create_for_tests(

View File

@@ -1,5 +1,3 @@
use std::collections::HashMap;
use async_trait::async_trait;
use crate::Environment;
@@ -11,16 +9,30 @@ use crate::environment::REMOTE_ENVIRONMENT_ID;
/// Lists the concrete environments available to Codex.
///
/// Implementations should return the provider-owned startup snapshot that
/// `EnvironmentManager` will cache. Providers that want the local environment to
/// be addressable by id should include it explicitly in the returned map.
/// Implementations own a startup snapshot containing both the available
/// environment list in configured order and the default environment
/// selection. Providers that want the local environment to be addressable by
/// id should include it explicitly in the returned list.
#[async_trait]
pub trait EnvironmentProvider: Send + Sync {
/// Returns the environments available for a new manager.
async fn get_environments(
pub(crate) trait EnvironmentProvider: Send + Sync {
/// Returns the provider-owned environment startup snapshot.
async fn snapshot(
&self,
local_runtime_paths: &ExecServerRuntimePaths,
) -> Result<HashMap<String, Environment>, ExecServerError>;
) -> Result<EnvironmentProviderSnapshot, ExecServerError>;
}
#[derive(Clone, Debug)]
pub(crate) struct EnvironmentProviderSnapshot {
pub environments: Vec<(String, Environment)>,
pub default: EnvironmentDefault,
pub include_all_environments_by_default: bool,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum EnvironmentDefault {
Disabled,
EnvironmentId(String),
}
/// Default provider backed by `CODEX_EXEC_SERVER_URL`.
@@ -40,34 +52,49 @@ impl DefaultEnvironmentProvider {
Self::new(std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok())
}
pub(crate) fn environments(
pub(crate) fn snapshot_inner(
&self,
local_runtime_paths: &ExecServerRuntimePaths,
) -> HashMap<String, Environment> {
let mut environments = HashMap::from([(
) -> EnvironmentProviderSnapshot {
let mut environments = vec![(
LOCAL_ENVIRONMENT_ID.to_string(),
Environment::local(local_runtime_paths.clone()),
)]);
let exec_server_url = normalize_exec_server_url(self.exec_server_url.clone()).0;
)];
let (exec_server_url, disabled) = normalize_exec_server_url(self.exec_server_url.clone());
if let Some(exec_server_url) = exec_server_url {
environments.insert(
environments.push((
REMOTE_ENVIRONMENT_ID.to_string(),
Environment::remote_inner(exec_server_url, Some(local_runtime_paths.clone())),
);
));
}
environments
let has_remote = environments
.iter()
.any(|(id, _environment)| id == REMOTE_ENVIRONMENT_ID);
let default = if disabled {
EnvironmentDefault::Disabled
} else if has_remote {
EnvironmentDefault::EnvironmentId(REMOTE_ENVIRONMENT_ID.to_string())
} else {
EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string())
};
EnvironmentProviderSnapshot {
environments,
default,
include_all_environments_by_default: false,
}
}
}
#[async_trait]
impl EnvironmentProvider for DefaultEnvironmentProvider {
async fn get_environments(
async fn snapshot(
&self,
local_runtime_paths: &ExecServerRuntimePaths,
) -> Result<HashMap<String, Environment>, ExecServerError> {
Ok(self.environments(local_runtime_paths))
) -> Result<EnvironmentProviderSnapshot, ExecServerError> {
Ok(self.snapshot_inner(local_runtime_paths))
}
}
@@ -81,6 +108,8 @@ pub(crate) fn normalize_exec_server_url(exec_server_url: Option<String>) -> (Opt
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use pretty_assertions::assert_eq;
use super::*;
@@ -98,10 +127,16 @@ mod tests {
async fn default_provider_returns_local_environment_when_url_is_missing() {
let provider = DefaultEnvironmentProvider::new(/*exec_server_url*/ None);
let runtime_paths = test_runtime_paths();
let environments = provider
.get_environments(&runtime_paths)
let snapshot = provider
.snapshot(&runtime_paths)
.await
.expect("environments");
let EnvironmentProviderSnapshot {
environments,
default,
include_all_environments_by_default,
} = snapshot;
let environments: HashMap<_, _> = environments.into_iter().collect();
assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote());
assert_eq!(
@@ -109,42 +144,72 @@ mod tests {
Some(&runtime_paths)
);
assert!(!environments.contains_key(REMOTE_ENVIRONMENT_ID));
assert_eq!(
default,
EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string())
);
assert!(!include_all_environments_by_default);
}
#[tokio::test]
async fn default_provider_returns_local_environment_when_url_is_empty() {
let provider = DefaultEnvironmentProvider::new(Some(String::new()));
let runtime_paths = test_runtime_paths();
let environments = provider
.get_environments(&runtime_paths)
let snapshot = provider
.snapshot(&runtime_paths)
.await
.expect("environments");
let EnvironmentProviderSnapshot {
environments,
default,
include_all_environments_by_default,
} = snapshot;
let environments: HashMap<_, _> = environments.into_iter().collect();
assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote());
assert!(!environments.contains_key(REMOTE_ENVIRONMENT_ID));
assert_eq!(
default,
EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string())
);
assert!(!include_all_environments_by_default);
}
#[tokio::test]
async fn default_provider_returns_local_environment_for_none_value() {
let provider = DefaultEnvironmentProvider::new(Some("none".to_string()));
let runtime_paths = test_runtime_paths();
let environments = provider
.get_environments(&runtime_paths)
let snapshot = provider
.snapshot(&runtime_paths)
.await
.expect("environments");
let EnvironmentProviderSnapshot {
environments,
default,
include_all_environments_by_default,
} = snapshot;
let environments: HashMap<_, _> = environments.into_iter().collect();
assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote());
assert!(!environments.contains_key(REMOTE_ENVIRONMENT_ID));
assert_eq!(default, EnvironmentDefault::Disabled);
assert!(!include_all_environments_by_default);
}
#[tokio::test]
async fn default_provider_adds_remote_environment_for_websocket_url() {
let provider = DefaultEnvironmentProvider::new(Some("ws://127.0.0.1:8765".to_string()));
let runtime_paths = test_runtime_paths();
let environments = provider
.get_environments(&runtime_paths)
let snapshot = provider
.snapshot(&runtime_paths)
.await
.expect("environments");
let EnvironmentProviderSnapshot {
environments,
default,
include_all_environments_by_default,
} = snapshot;
let environments: HashMap<_, _> = environments.into_iter().collect();
assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote());
let remote_environment = &environments[REMOTE_ENVIRONMENT_ID];
@@ -153,16 +218,22 @@ mod tests {
remote_environment.exec_server_url(),
Some("ws://127.0.0.1:8765")
);
assert_eq!(
default,
EnvironmentDefault::EnvironmentId(REMOTE_ENVIRONMENT_ID.to_string())
);
assert!(!include_all_environments_by_default);
}
#[tokio::test]
async fn default_provider_normalizes_exec_server_url() {
let provider = DefaultEnvironmentProvider::new(Some(" ws://127.0.0.1:8765 ".to_string()));
let runtime_paths = test_runtime_paths();
let environments = provider
.get_environments(&runtime_paths)
let snapshot = provider
.snapshot(&runtime_paths)
.await
.expect("environments");
let environments: HashMap<_, _> = snapshot.environments.into_iter().collect();
assert_eq!(
environments[REMOTE_ENVIRONMENT_ID].exec_server_url(),

View File

@@ -0,0 +1,720 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use async_trait::async_trait;
use serde::Deserialize;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use crate::DefaultEnvironmentProvider;
use crate::Environment;
use crate::ExecServerError;
use crate::ExecServerRuntimePaths;
use crate::client_api::ExecServerTransportParams;
use crate::client_api::StdioExecServerCommand;
use crate::environment::LOCAL_ENVIRONMENT_ID;
use crate::environment_provider::EnvironmentDefault;
use crate::environment_provider::EnvironmentProvider;
use crate::environment_provider::EnvironmentProviderSnapshot;
const ENVIRONMENTS_TOML_FILE: &str = "environments.toml";
const MAX_ENVIRONMENT_ID_LEN: usize = 64;
#[derive(Deserialize, Debug, Default)]
#[serde(deny_unknown_fields)]
struct EnvironmentsToml {
default: Option<String>,
#[serde(default)]
environments: Vec<EnvironmentToml>,
}
#[derive(Deserialize, Debug, Default, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
struct EnvironmentToml {
id: String,
url: Option<String>,
program: Option<String>,
args: Option<Vec<String>>,
env: Option<HashMap<String, String>>,
cwd: Option<PathBuf>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
struct TomlEnvironmentProvider {
default: EnvironmentDefault,
environments: Vec<(String, ExecServerTransportParams)>,
}
impl TomlEnvironmentProvider {
#[cfg(test)]
fn new(config: EnvironmentsToml) -> Result<Self, ExecServerError> {
Self::new_with_config_dir(config, /*config_dir*/ None)
}
fn new_with_config_dir(
config: EnvironmentsToml,
config_dir: Option<&Path>,
) -> Result<Self, ExecServerError> {
let mut ids = HashSet::from([LOCAL_ENVIRONMENT_ID.to_string()]);
let mut environments = Vec::with_capacity(config.environments.len());
for item in config.environments {
let (id, transport) = parse_environment_toml(item, config_dir)?;
if !ids.insert(id.clone()) {
return Err(ExecServerError::Protocol(format!(
"environment id `{id}` is duplicated"
)));
}
environments.push((id, transport));
}
let default = normalize_default_environment_id(config.default.as_deref(), &ids)?;
Ok(Self {
default,
environments,
})
}
}
#[async_trait]
impl EnvironmentProvider for TomlEnvironmentProvider {
async fn snapshot(
&self,
local_runtime_paths: &ExecServerRuntimePaths,
) -> Result<EnvironmentProviderSnapshot, ExecServerError> {
let mut environments = Vec::with_capacity(self.environments.len() + 1);
environments.push((
LOCAL_ENVIRONMENT_ID.to_string(),
Environment::local(local_runtime_paths.clone()),
));
for (id, transport_params) in &self.environments {
environments.push((
id.clone(),
Environment::remote_with_transport(
transport_params.clone(),
Some(local_runtime_paths.clone()),
),
));
}
Ok(EnvironmentProviderSnapshot {
environments,
default: self.default.clone(),
include_all_environments_by_default: true,
})
}
}
fn parse_environment_toml(
item: EnvironmentToml,
config_dir: Option<&Path>,
) -> Result<(String, ExecServerTransportParams), ExecServerError> {
let EnvironmentToml {
id,
url,
program,
args,
env,
cwd,
} = item;
validate_environment_id(&id)?;
if program.is_none() && (args.is_some() || env.is_some() || cwd.is_some()) {
return Err(ExecServerError::Protocol(format!(
"environment `{id}` args, env, and cwd require program"
)));
}
let transport_params = match (url, program) {
(Some(url), None) => {
let url = validate_websocket_url(url)?;
ExecServerTransportParams::WebSocketUrl(url)
}
(None, Some(program)) => {
let program = program.trim().to_string();
if program.is_empty() {
return Err(ExecServerError::Protocol(format!(
"environment `{id}` program cannot be empty"
)));
}
let cwd = normalize_stdio_cwd(&id, cwd, config_dir)?;
ExecServerTransportParams::StdioCommand(StdioExecServerCommand {
program,
args: args.unwrap_or_default(),
env: env.unwrap_or_default(),
cwd,
})
}
(None, None) | (Some(_), Some(_)) => {
return Err(ExecServerError::Protocol(format!(
"environment `{id}` must set exactly one of url or program"
)));
}
};
Ok((id, transport_params))
}
fn normalize_stdio_cwd(
id: &str,
cwd: Option<PathBuf>,
config_dir: Option<&Path>,
) -> Result<Option<PathBuf>, ExecServerError> {
let Some(cwd) = cwd else {
return Ok(None);
};
if cwd.is_absolute() {
return Ok(Some(cwd));
}
let Some(config_dir) = config_dir else {
return Err(ExecServerError::Protocol(format!(
"environment `{id}` cwd must be absolute"
)));
};
Ok(Some(config_dir.join(cwd)))
}
pub(crate) fn environment_provider_from_codex_home(
codex_home: &Path,
) -> Result<Box<dyn EnvironmentProvider>, ExecServerError> {
let path = codex_home.join(ENVIRONMENTS_TOML_FILE);
if !path.try_exists().map_err(|err| {
ExecServerError::Protocol(format!(
"failed to inspect environment config `{}`: {err}",
path.display()
))
})? {
return Ok(Box::new(DefaultEnvironmentProvider::from_env()));
}
let environments = load_environments_toml(&path)?;
Ok(Box::new(TomlEnvironmentProvider::new_with_config_dir(
environments,
Some(codex_home),
)?))
}
fn normalize_default_environment_id(
default: Option<&str>,
ids: &HashSet<String>,
) -> Result<EnvironmentDefault, ExecServerError> {
let Some(default) = default.map(str::trim) else {
return Ok(EnvironmentDefault::EnvironmentId(
LOCAL_ENVIRONMENT_ID.to_string(),
));
};
if default.is_empty() {
return Err(ExecServerError::Protocol(
"default environment id cannot be empty".to_string(),
));
}
if !default.eq_ignore_ascii_case("none") && !ids.contains(default) {
return Err(ExecServerError::Protocol(format!(
"default environment `{default}` is not configured"
)));
}
if default.eq_ignore_ascii_case("none") {
Ok(EnvironmentDefault::Disabled)
} else {
Ok(EnvironmentDefault::EnvironmentId(default.to_string()))
}
}
fn validate_environment_id(id: &str) -> Result<(), ExecServerError> {
let trimmed_id = id.trim();
if trimmed_id.is_empty() {
return Err(ExecServerError::Protocol(
"environment id cannot be empty".to_string(),
));
}
if trimmed_id != id {
return Err(ExecServerError::Protocol(format!(
"environment id `{id}` must not contain surrounding whitespace"
)));
}
if id == LOCAL_ENVIRONMENT_ID || id.eq_ignore_ascii_case("none") {
return Err(ExecServerError::Protocol(format!(
"environment id `{id}` is reserved"
)));
}
if id.len() > MAX_ENVIRONMENT_ID_LEN {
return Err(ExecServerError::Protocol(format!(
"environment id `{id}` cannot be longer than {MAX_ENVIRONMENT_ID_LEN} characters"
)));
}
if !id
.chars()
.all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_')
{
return Err(ExecServerError::Protocol(format!(
"environment id `{id}` must contain only ASCII letters, numbers, '-' or '_'"
)));
}
Ok(())
}
fn validate_websocket_url(url: String) -> Result<String, ExecServerError> {
let url = url.trim();
if url.is_empty() {
return Err(ExecServerError::Protocol(
"environment url cannot be empty".to_string(),
));
}
if !url.starts_with("ws://") && !url.starts_with("wss://") {
return Err(ExecServerError::Protocol(format!(
"environment url `{url}` must use ws:// or wss://"
)));
}
url.into_client_request().map_err(|err| {
ExecServerError::Protocol(format!("environment url `{url}` is invalid: {err}"))
})?;
Ok(url.to_string())
}
fn load_environments_toml(path: &Path) -> Result<EnvironmentsToml, ExecServerError> {
let contents = std::fs::read_to_string(path).map_err(|err| {
ExecServerError::Protocol(format!(
"failed to read environment config `{}`: {err}",
path.display()
))
})?;
toml::from_str(&contents).map_err(|err| {
ExecServerError::Protocol(format!(
"failed to parse environment config `{}`: {err}",
path.display()
))
})
}
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use tempfile::tempdir;
use super::*;
fn test_runtime_paths() -> ExecServerRuntimePaths {
ExecServerRuntimePaths::new(
std::env::current_exe().expect("current exe"),
/*codex_linux_sandbox_exe*/ None,
)
.expect("runtime paths")
}
#[tokio::test]
async fn toml_provider_adds_implicit_local_and_configured_environments() {
let provider = TomlEnvironmentProvider::new(EnvironmentsToml {
default: Some("ssh-dev".to_string()),
environments: vec![
EnvironmentToml {
id: "devbox".to_string(),
url: Some(" ws://127.0.0.1:8765 ".to_string()),
..Default::default()
},
EnvironmentToml {
id: "ssh-dev".to_string(),
program: Some(" ssh ".to_string()),
args: Some(vec![
"dev".to_string(),
"codex exec-server --listen stdio".to_string(),
]),
env: Some(HashMap::from([(
"CODEX_LOG".to_string(),
"debug".to_string(),
)])),
..Default::default()
},
],
})
.expect("provider");
let runtime_paths = test_runtime_paths();
let snapshot = provider
.snapshot(&runtime_paths)
.await
.expect("environments");
let EnvironmentProviderSnapshot {
environments,
default,
include_all_environments_by_default,
} = snapshot;
let environment_ids: Vec<_> = environments
.iter()
.map(|(id, _environment)| id.as_str())
.collect();
assert_eq!(
environment_ids,
vec![LOCAL_ENVIRONMENT_ID, "devbox", "ssh-dev"]
);
let environments: HashMap<_, _> = environments.into_iter().collect();
assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote());
assert_eq!(
environments["devbox"].exec_server_url(),
Some("ws://127.0.0.1:8765")
);
assert!(environments["ssh-dev"].is_remote());
assert_eq!(environments["ssh-dev"].exec_server_url(), None);
assert_eq!(
default,
EnvironmentDefault::EnvironmentId("ssh-dev".to_string())
);
assert!(include_all_environments_by_default);
}
#[tokio::test]
async fn toml_provider_default_omitted_selects_local() {
let provider = TomlEnvironmentProvider::new(EnvironmentsToml::default()).expect("provider");
let snapshot = provider
.snapshot(&test_runtime_paths())
.await
.expect("environments");
assert_eq!(
snapshot.default,
EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string())
);
}
#[tokio::test]
async fn toml_provider_default_none_disables_default() {
let provider = TomlEnvironmentProvider::new(EnvironmentsToml {
default: Some("none".to_string()),
environments: Vec::new(),
})
.expect("provider");
let snapshot = provider
.snapshot(&test_runtime_paths())
.await
.expect("environments");
assert_eq!(snapshot.default, EnvironmentDefault::Disabled);
}
#[test]
fn toml_provider_rejects_invalid_environments() {
let cases = [
(
EnvironmentToml {
id: "local".to_string(),
url: Some("ws://127.0.0.1:8765".to_string()),
..Default::default()
},
"environment id `local` is reserved",
),
(
EnvironmentToml {
id: " devbox ".to_string(),
url: Some("ws://127.0.0.1:8765".to_string()),
..Default::default()
},
"environment id ` devbox ` must not contain surrounding whitespace",
),
(
EnvironmentToml {
id: "dev box".to_string(),
url: Some("ws://127.0.0.1:8765".to_string()),
..Default::default()
},
"environment id `dev box` must contain only ASCII letters, numbers, '-' or '_'",
),
(
EnvironmentToml {
id: "devbox".to_string(),
url: Some("http://127.0.0.1:8765".to_string()),
..Default::default()
},
"environment url `http://127.0.0.1:8765` must use ws:// or wss://",
),
(
EnvironmentToml {
id: "devbox".to_string(),
url: Some("ws://127.0.0.1:8765".to_string()),
program: Some("codex".to_string()),
..Default::default()
},
"environment `devbox` must set exactly one of url or program",
),
(
EnvironmentToml {
id: "devbox".to_string(),
program: Some(" ".to_string()),
..Default::default()
},
"environment `devbox` program cannot be empty",
),
(
EnvironmentToml {
id: "devbox".to_string(),
args: Some(Vec::new()),
..Default::default()
},
"environment `devbox` args, env, and cwd require program",
),
];
for (item, expected) in cases {
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
default: None,
environments: vec![item],
})
.expect_err("invalid item should fail");
assert_eq!(
err.to_string(),
format!("exec-server protocol error: {expected}")
);
}
}
#[test]
fn toml_provider_resolves_relative_stdio_cwd_from_config_dir() {
let config_dir = tempdir().expect("tempdir");
let provider = TomlEnvironmentProvider::new_with_config_dir(
EnvironmentsToml {
default: None,
environments: vec![EnvironmentToml {
id: "ssh-dev".to_string(),
program: Some("ssh".to_string()),
cwd: Some(PathBuf::from("workspace")),
..Default::default()
}],
},
Some(config_dir.path()),
)
.expect("provider");
assert_eq!(
provider.environments[0].1,
ExecServerTransportParams::StdioCommand(StdioExecServerCommand {
program: "ssh".to_string(),
args: Vec::new(),
env: HashMap::new(),
cwd: Some(config_dir.path().join("workspace")),
})
);
}
#[test]
fn toml_provider_rejects_relative_stdio_cwd_without_config_dir() {
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
default: None,
environments: vec![EnvironmentToml {
id: "ssh-dev".to_string(),
program: Some("ssh".to_string()),
cwd: Some(PathBuf::from("workspace")),
..Default::default()
}],
})
.expect_err("relative cwd without config dir should fail");
assert_eq!(
err.to_string(),
"exec-server protocol error: environment `ssh-dev` cwd must be absolute"
);
}
#[test]
fn toml_provider_rejects_duplicate_ids() {
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
default: None,
environments: vec![
EnvironmentToml {
id: "devbox".to_string(),
url: Some("ws://127.0.0.1:8765".to_string()),
..Default::default()
},
EnvironmentToml {
id: "devbox".to_string(),
program: Some("codex".to_string()),
..Default::default()
},
],
})
.expect_err("duplicate id should fail");
assert_eq!(
err.to_string(),
"exec-server protocol error: environment id `devbox` is duplicated"
);
}
#[test]
fn toml_provider_rejects_overlong_id() {
let id = "a".repeat(MAX_ENVIRONMENT_ID_LEN + 1);
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
default: None,
environments: vec![EnvironmentToml {
id: id.clone(),
url: Some("ws://127.0.0.1:8765".to_string()),
..Default::default()
}],
})
.expect_err("overlong id should fail");
assert_eq!(
err.to_string(),
format!(
"exec-server protocol error: environment id `{id}` cannot be longer than {MAX_ENVIRONMENT_ID_LEN} characters"
)
);
}
#[test]
fn toml_provider_rejects_unknown_default() {
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
default: Some("missing".to_string()),
environments: Vec::new(),
})
.expect_err("unknown default should fail");
assert_eq!(
err.to_string(),
"exec-server protocol error: default environment `missing` is not configured"
);
}
#[test]
fn load_environments_toml_reads_root_environment_list() {
let codex_home = tempdir().expect("tempdir");
let path = codex_home.path().join(ENVIRONMENTS_TOML_FILE);
std::fs::write(
&path,
r#"
default = "ssh-dev"
[[environments]]
id = "devbox"
url = "ws://127.0.0.1:4512"
[[environments]]
id = "ssh-dev"
program = "ssh"
args = ["dev", "codex exec-server --listen stdio"]
cwd = "/tmp"
[environments.env]
CODEX_LOG = "debug"
"#,
)
.expect("write environments.toml");
let environments = load_environments_toml(&path).expect("environments.toml");
assert_eq!(environments.default.as_deref(), Some("ssh-dev"));
assert_eq!(environments.environments.len(), 2);
assert_eq!(environments.environments[0].id, "devbox");
assert_eq!(
environments.environments[1],
EnvironmentToml {
id: "ssh-dev".to_string(),
program: Some("ssh".to_string()),
args: Some(vec![
"dev".to_string(),
"codex exec-server --listen stdio".to_string(),
]),
env: Some(HashMap::from([(
"CODEX_LOG".to_string(),
"debug".to_string(),
)])),
cwd: Some(PathBuf::from("/tmp")),
..Default::default()
}
);
}
#[test]
fn load_environments_toml_rejects_unknown_fields() {
let codex_home = tempdir().expect("tempdir");
let cases = [
("unknown = true\n", "unknown field `unknown`"),
(
r#"
[[environments]]
id = "devbox"
url = "ws://127.0.0.1:4512"
unknown = true
"#,
"unknown field `unknown`",
),
];
for (index, (contents, expected)) in cases.into_iter().enumerate() {
let path = codex_home.path().join(format!("environments-{index}.toml"));
std::fs::write(&path, contents).expect("write environments.toml");
let err = load_environments_toml(&path).expect_err("unknown field should fail");
assert!(
err.to_string().contains(expected),
"expected `{err}` to contain `{expected}`"
);
}
}
#[test]
fn toml_provider_rejects_malformed_websocket_url() {
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
default: None,
environments: vec![EnvironmentToml {
id: "devbox".to_string(),
url: Some("ws://".to_string()),
..Default::default()
}],
})
.expect_err("malformed websocket url should fail");
assert!(
err.to_string()
.contains("environment url `ws://` is invalid"),
"expected malformed URL error, got `{err}`"
);
}
#[tokio::test]
async fn environment_provider_from_codex_home_uses_present_environments_file() {
let codex_home = tempdir().expect("tempdir");
std::fs::write(
codex_home.path().join(ENVIRONMENTS_TOML_FILE),
r#"
default = "none"
"#,
)
.expect("write environments.toml");
let provider =
environment_provider_from_codex_home(codex_home.path()).expect("environment provider");
let snapshot = provider
.snapshot(&test_runtime_paths())
.await
.expect("environments");
let environment_ids: Vec<_> = snapshot
.environments
.into_iter()
.map(|(id, _environment)| id)
.collect();
assert!(environment_ids.contains(&LOCAL_ENVIRONMENT_ID.to_string()));
assert_eq!(snapshot.default, EnvironmentDefault::Disabled);
}
#[tokio::test]
async fn environment_provider_from_codex_home_falls_back_when_file_is_missing() {
let codex_home = tempdir().expect("tempdir");
let provider =
environment_provider_from_codex_home(codex_home.path()).expect("environment provider");
let snapshot = provider
.snapshot(&test_runtime_paths())
.await
.expect("environments");
let environment_ids: Vec<_> = snapshot
.environments
.into_iter()
.map(|(id, _environment)| id)
.collect();
assert!(environment_ids.contains(&LOCAL_ENVIRONMENT_ID.to_string()));
}
}

View File

@@ -1,8 +1,10 @@
mod client;
mod client_api;
mod client_transport;
mod connection;
mod environment;
mod environment_provider;
mod environment_toml;
mod fs_helper;
mod fs_helper_main;
mod fs_sandbox;
@@ -37,11 +39,9 @@ pub use codex_file_system::RemoveOptions;
pub use environment::CODEX_EXEC_SERVER_URL_ENV_VAR;
pub use environment::Environment;
pub use environment::EnvironmentManager;
pub use environment::EnvironmentManagerArgs;
pub use environment::LOCAL_ENVIRONMENT_ID;
pub use environment::REMOTE_ENVIRONMENT_ID;
pub use environment_provider::DefaultEnvironmentProvider;
pub use environment_provider::EnvironmentProvider;
pub use fs_helper::CODEX_FS_HELPER_ARG1;
pub use fs_helper_main::main as run_fs_helper_main;
pub use local_file_system::LOCAL_FS;

View File

@@ -23,6 +23,7 @@ use tokio::task::JoinHandle;
use crate::connection::JsonRpcConnection;
use crate::connection::JsonRpcConnectionEvent;
use crate::connection::JsonRpcTransport;
#[derive(Debug)]
pub(crate) enum RpcCallError {
@@ -58,11 +59,9 @@ pub(crate) enum RpcServerOutboundMessage {
request_id: RequestId,
error: JSONRPCErrorError,
},
#[allow(dead_code)]
Notification(JSONRPCNotification),
}
#[allow(dead_code)]
#[derive(Clone)]
pub(crate) struct RpcNotificationSender {
outgoing_tx: mpsc::Sender<RpcServerOutboundMessage>,
@@ -84,7 +83,6 @@ impl RpcNotificationSender {
.map_err(|_| internal_error("RPC connection closed while sending response".into()))
}
#[allow(dead_code)]
pub(crate) async fn notify<P: Serialize>(
&self,
method: &str,
@@ -229,43 +227,55 @@ pub(crate) struct RpcClient {
disconnected_rx: watch::Receiver<bool>,
next_request_id: AtomicI64,
transport_tasks: Vec<JoinHandle<()>>,
transport: JsonRpcTransport,
reader_task: JoinHandle<()>,
}
impl RpcClient {
pub(crate) fn new(connection: JsonRpcConnection) -> (Self, mpsc::Receiver<RpcClientEvent>) {
let (write_tx, mut incoming_rx, disconnected_rx, transport_tasks) = connection.into_parts();
let JsonRpcConnection {
outgoing_tx: write_tx,
mut incoming_rx,
disconnected_rx,
task_handles: transport_tasks,
transport,
} = connection;
let pending = Arc::new(Mutex::new(HashMap::<RequestId, PendingRequest>::new()));
let (event_tx, event_rx) = mpsc::channel(128);
let pending_for_reader = Arc::clone(&pending);
let transport_for_reader = transport.clone();
let reader_task = tokio::spawn(async move {
while let Some(event) = incoming_rx.recv().await {
let disconnect_reason = loop {
let Some(event) = incoming_rx.recv().await else {
break None;
};
match event {
JsonRpcConnectionEvent::Message(message) => {
if let Err(err) =
handle_server_message(&pending_for_reader, &event_tx, message).await
{
let _ = err;
break;
break None;
}
}
JsonRpcConnectionEvent::MalformedMessage { reason } => {
let _ = reason;
break;
break None;
}
JsonRpcConnectionEvent::Disconnected { reason } => {
let _ = event_tx.send(RpcClientEvent::Disconnected { reason }).await;
drain_pending(&pending_for_reader).await;
return;
break reason;
}
}
}
};
let _ = event_tx
.send(RpcClientEvent::Disconnected { reason: None })
.send(RpcClientEvent::Disconnected {
reason: disconnect_reason,
})
.await;
drain_pending(&pending_for_reader).await;
transport_for_reader.terminate();
});
(
@@ -275,6 +285,7 @@ impl RpcClient {
disconnected_rx,
next_request_id: AtomicI64::new(1),
transport_tasks,
transport,
reader_task,
},
event_rx,
@@ -357,7 +368,6 @@ impl RpcClient {
}
#[cfg(test)]
#[allow(dead_code)]
pub(crate) async fn pending_request_count(&self) -> usize {
self.pending.lock().await.len()
}
@@ -365,6 +375,7 @@ impl RpcClient {
impl Drop for RpcClient {
fn drop(&mut self) {
self.transport.terminate();
for task in &self.transport_tasks {
task.abort();
}
@@ -522,7 +533,9 @@ mod tests {
use tokio::io::BufReader;
use tokio::time::timeout;
use super::RpcCallError;
use super::RpcClient;
use super::RpcClientEvent;
use crate::connection::JsonRpcConnection;
async fn read_jsonrpc_line<R>(lines: &mut tokio::io::Lines<BufReader<R>>) -> JSONRPCMessage
@@ -565,11 +578,9 @@ mod tests {
async fn rpc_client_matches_out_of_order_responses_by_request_id() {
let (client_stdin, server_reader) = tokio::io::duplex(4096);
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
let (client, _events_rx) = RpcClient::new(JsonRpcConnection::from_stdio(
client_stdout,
client_stdin,
"test-rpc".to_string(),
));
let connection =
JsonRpcConnection::from_stdio(client_stdout, client_stdin, "test-rpc".to_string());
let (client, _events_rx) = RpcClient::new(connection);
let server = tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
@@ -628,4 +639,41 @@ mod tests {
panic!("server task failed: {err}");
}
}
#[tokio::test]
async fn stdio_malformed_message_closes_client_and_rejects_later_calls() {
let (client_stdin, _server_reader) = tokio::io::duplex(4096);
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
let connection =
JsonRpcConnection::from_stdio(client_stdout, client_stdin, "test-rpc".to_string());
let (client, mut events_rx) = RpcClient::new(connection);
server_writer
.write_all(b"not-json\n")
.await
.expect("malformed line should write");
let event = timeout(Duration::from_secs(1), events_rx.recv())
.await
.expect("disconnect event should not time out")
.expect("disconnect event should be sent");
let RpcClientEvent::Disconnected { reason } = event else {
panic!("expected disconnect event after malformed stdio message");
};
assert!(
reason
.as_deref()
.is_some_and(|reason| reason.contains("failed to parse JSON-RPC message")),
"disconnect should explain malformed stdio message, got {reason:?}"
);
let result = timeout(
Duration::from_secs(1),
client.call::<_, serde_json::Value>("after-malformed", &serde_json::json!({})),
)
.await
.expect("later call should fail instead of hanging");
assert!(matches!(result, Err(RpcCallError::Closed)));
assert_eq!(client.pending_request_count().await, 0);
}
}

View File

@@ -47,8 +47,13 @@ async fn run_connection(
runtime_paths: ExecServerRuntimePaths,
) {
let router = Arc::new(build_router());
let (json_outgoing_tx, mut incoming_rx, mut disconnected_rx, connection_tasks) =
connection.into_parts();
let JsonRpcConnection {
outgoing_tx: json_outgoing_tx,
mut incoming_rx,
mut disconnected_rx,
task_handles: connection_tasks,
transport: _transport,
} = connection;
let (outgoing_tx, mut outgoing_rx) =
mpsc::channel::<RpcServerOutboundMessage>(CHANNEL_CAPACITY);
let notifications = RpcNotificationSender::new(outgoing_tx.clone());

View File

@@ -15,7 +15,6 @@ pub use cli::Command;
pub use cli::ReviewArgs;
use codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
use codex_app_server_client::EnvironmentManager;
use codex_app_server_client::EnvironmentManagerArgs;
use codex_app_server_client::ExecServerRuntimePaths;
use codex_app_server_client::InProcessAppServerClient;
use codex_app_server_client::InProcessClientStartArgs;
@@ -509,6 +508,11 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
arg0_paths.codex_linux_sandbox_exe.clone(),
)?;
let state_db = codex_core::init_state_db(&config).await;
let environment_manager = if run_loader_overrides.ignore_user_config {
EnvironmentManager::from_env(local_runtime_paths).await?
} else {
EnvironmentManager::from_codex_home(config.codex_home.clone(), local_runtime_paths).await?
};
let in_process_start_args = InProcessClientStartArgs {
arg0_paths,
config: std::sync::Arc::new(config.clone()),
@@ -518,9 +522,7 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
feedback: CodexFeedback::new(),
log_db: None,
state_db: state_db.clone(),
environment_manager: std::sync::Arc::new(
EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await,
),
environment_manager: std::sync::Arc::new(environment_manager),
config_warnings,
session_source: SessionSource::Exec,
enable_codex_api_key_env: true,

View File

@@ -9,7 +9,6 @@ use codex_arg0::Arg0DispatchPaths;
use codex_core::config::Config;
use codex_core::resolve_installation_id;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::EnvironmentManagerArgs;
use codex_exec_server::ExecServerRuntimePaths;
use codex_login::default_client::set_default_client_residency_requirement;
use codex_utils_cli::CliConfigOverrides;
@@ -61,15 +60,6 @@ pub async fn run_main(
arg0_paths: Arg0DispatchPaths,
cli_config_overrides: CliConfigOverrides,
) -> IoResult<()> {
let environment_manager = Arc::new(
EnvironmentManager::new(EnvironmentManagerArgs::new(
ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?,
))
.await,
);
// Parse CLI overrides once and derive the base Config eagerly so later
// components do not need to work with raw TOML values.
let cli_kv_overrides = cli_config_overrides.parse_overrides().map_err(|e| {
@@ -84,6 +74,17 @@ pub async fn run_main(
std::io::Error::new(ErrorKind::InvalidData, format!("error loading config: {e}"))
})?;
set_default_client_residency_requirement(config.enforce_residency.value());
let environment_manager = Arc::new(
EnvironmentManager::from_codex_home(
config.codex_home.clone(),
ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?,
)
.await
.map_err(std::io::Error::other)?,
);
let otel = codex_core::otel_init::build_provider(
&config,

View File

@@ -20,7 +20,6 @@ use codex_core_api::Config;
use codex_core_api::ConfigLayerStack;
use codex_core_api::Constrained;
use codex_core_api::EnvironmentManager;
use codex_core_api::EnvironmentManagerArgs;
use codex_core_api::EventMsg;
use codex_core_api::ExecServerRuntimePaths;
use codex_core_api::Features;
@@ -118,8 +117,9 @@ async fn run_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
};
let thread_store = thread_store_from_config(&config, state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
let environment_manager =
Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await);
let environment_manager = Arc::new(
EnvironmentManager::from_codex_home(config.codex_home.clone(), local_runtime_paths).await?,
);
let installation_id = resolve_installation_id(&config.codex_home).await?;
let thread_manager = ThreadManager::new(
&config,

View File

@@ -575,6 +575,7 @@ impl App {
) -> crate::chatwidget::ChatWidgetInit {
crate::chatwidget::ChatWidgetInit {
config: cfg,
environment_manager: self.environment_manager.clone(),
frame_requester: tui.frame_requester(),
app_event_tx: self.app_event_tx.clone(),
workspace_command_runner: self.workspace_command_runner.clone(),
@@ -739,6 +740,7 @@ impl App {
.await;
let init = crate::chatwidget::ChatWidgetInit {
config: config.clone(),
environment_manager: environment_manager.clone(),
frame_requester: tui.frame_requester(),
app_event_tx: app_event_tx.clone(),
workspace_command_runner: Some(workspace_command_runner.clone()),
@@ -775,6 +777,7 @@ impl App {
})?;
let init = crate::chatwidget::ChatWidgetInit {
config: config.clone(),
environment_manager: environment_manager.clone(),
frame_requester: tui.frame_requester(),
app_event_tx: app_event_tx.clone(),
workspace_command_runner: Some(workspace_command_runner.clone()),
@@ -816,6 +819,7 @@ impl App {
})?;
let init = crate::chatwidget::ChatWidgetInit {
config: config.clone(),
environment_manager: environment_manager.clone(),
frame_requester: tui.frame_requester(),
app_event_tx: app_event_tx.clone(),
workspace_command_runner: Some(workspace_command_runner.clone()),

View File

@@ -435,6 +435,7 @@ async fn enqueue_primary_thread_session_replays_turns_before_initial_prompt_subm
let model = crate::legacy_core::test_support::get_model_offline(config.model.as_deref());
app.chat_widget = ChatWidget::new_with_app_event(ChatWidgetInit {
config,
environment_manager: app.environment_manager.clone(),
frame_requester: crate::tui::FrameRequester::test_dummy(),
app_event_tx: app.app_event_tx.clone(),
workspace_command_runner: None,
@@ -4825,6 +4826,7 @@ async fn replace_chat_widget_reseeds_collab_agent_metadata_for_replay() {
let replacement = ChatWidget::new_with_app_event(ChatWidgetInit {
config: app.config.clone(),
environment_manager: app.environment_manager.clone(),
frame_requester: crate::tui::FrameRequester::test_dummy(),
app_event_tx: app.app_event_tx.clone(),
workspace_command_runner: None,

View File

@@ -129,6 +129,7 @@ use codex_config::types::ApprovalsReviewer;
use codex_config::types::Notifications;
use codex_config::types::WindowsSandboxModeToml;
use codex_core_skills::model::SkillMetadata;
use codex_exec_server::EnvironmentManager;
use codex_features::FEATURES;
use codex_features::Feature;
#[cfg(test)]
@@ -558,6 +559,7 @@ pub(crate) fn get_limits_duration(windows_minutes: i64) -> String {
/// Common initialization parameters shared by all `ChatWidget` constructors.
pub(crate) struct ChatWidgetInit {
pub(crate) config: Config,
pub(crate) environment_manager: Arc<EnvironmentManager>,
pub(crate) frame_requester: FrameRequester,
pub(crate) app_event_tx: AppEventSender,
/// App-server-backed runner used by status surfaces for workspace metadata probes.
@@ -759,6 +761,7 @@ pub(crate) struct ChatWidget {
/// where the overlay may briefly treat new tail content as already cached.
active_cell_revision: u64,
config: Config,
environment_manager: Arc<EnvironmentManager>,
raw_output_mode: bool,
/// Runtime value resolved by core. `config.service_tier` remains the explicit user choice.
effective_service_tier: Option<ServiceTier>,
@@ -4841,6 +4844,7 @@ impl ChatWidget {
fn new_with_op_target(common: ChatWidgetInit, codex_op_target: CodexOpTarget) -> Self {
let ChatWidgetInit {
config,
environment_manager,
frame_requester,
app_event_tx,
workspace_command_runner,
@@ -4929,6 +4933,7 @@ impl ChatWidget {
active_cell_revision: 0,
raw_output_mode: config.tui_raw_output_mode,
config,
environment_manager,
effective_service_tier,
skills_all: Vec::new(),
skills_initial_state: None,
@@ -7138,12 +7143,14 @@ impl ChatWidget {
}
let config = self.config.clone();
let environment_manager = Arc::clone(&self.environment_manager);
let app_event_tx = self.app_event_tx.clone();
tokio::spawn(async move {
let accessible_result =
match connectors::list_accessible_connectors_from_mcp_tools_with_options_and_status(
match connectors::list_accessible_connectors_from_mcp_tools_with_environment_manager(
&config,
force_refetch,
&environment_manager,
)
.await
{

View File

@@ -195,6 +195,7 @@ pub(super) async fn make_chatwidget_manual(
raw_output_mode: cfg.tui_raw_output_mode,
config: cfg,
effective_service_tier,
environment_manager: Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
current_collaboration_mode,
active_collaboration_mask,
has_chatgpt_account: false,

View File

@@ -1536,6 +1536,7 @@ async fn make_startup_chat_with_cli_overrides(
let session_telemetry = test_session_telemetry(&cfg, resolved_model.as_str());
let init = ChatWidgetInit {
config: cfg.clone(),
environment_manager: Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
frame_requester: FrameRequester::test_dummy(),
app_event_tx: AppEventSender::new(unbounded_channel::<AppEvent>().0),
workspace_command_runner: None,

View File

@@ -70,6 +70,7 @@ async fn experimental_mode_plan_is_ignored_on_startup() {
let session_telemetry = test_session_telemetry(&cfg, resolved_model.as_str());
let init = ChatWidgetInit {
config: cfg.clone(),
environment_manager: Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
frame_requester: FrameRequester::test_dummy(),
app_event_tx: AppEventSender::new(unbounded_channel::<AppEvent>().0),
workspace_command_runner: None,

View File

@@ -246,6 +246,7 @@ async fn helpers_are_available_and_do_not_panic() {
let session_telemetry = test_session_telemetry(&cfg, resolved_model.as_str());
let init = ChatWidgetInit {
config: cfg.clone(),
environment_manager: Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
frame_requester: FrameRequester::test_dummy(),
app_event_tx: tx,
workspace_command_runner: None,

View File

@@ -8,7 +8,7 @@ use crate::legacy_core::config::Config;
use crate::legacy_core::config::ConfigBuilder;
use crate::legacy_core::config::ConfigOverrides;
use crate::legacy_core::config::find_codex_home;
use crate::legacy_core::config::load_config_as_toml_with_cli_overrides;
use crate::legacy_core::config::load_config_as_toml_with_cli_and_loader_overrides;
use crate::legacy_core::config::resolve_oss_provider;
use crate::legacy_core::format_exec_policy_error_with_source;
use crate::legacy_core::windows_sandbox::WindowsSandboxLevelExt;
@@ -40,7 +40,6 @@ use codex_config::ConfigLoadError;
use codex_config::LoaderOverrides;
use codex_config::format_config_error_with_source;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::EnvironmentManagerArgs;
use codex_exec_server::ExecServerRuntimePaths;
use codex_login::AuthConfig;
use codex_login::default_client::set_default_client_residency_requirement;
@@ -661,10 +660,10 @@ fn config_cwd_for_app_server_target(
app_server_target: &AppServerTarget,
environment_manager: &EnvironmentManager,
) -> std::io::Result<Option<AbsolutePathBuf>> {
if environment_manager
.default_environment()
.is_some_and(|environment| environment.is_remote())
|| matches!(app_server_target, AppServerTarget::Remote { .. })
if matches!(app_server_target, AppServerTarget::Remote { .. })
|| environment_manager
.default_environment()
.is_some_and(|environment| environment.is_remote())
{
return Ok(None);
}
@@ -678,6 +677,14 @@ fn config_cwd_for_app_server_target(
Ok(Some(cwd))
}
fn should_load_configured_environments(
loader_overrides: &LoaderOverrides,
app_server_target: &AppServerTarget,
) -> bool {
!loader_overrides.ignore_user_config
&& !matches!(app_server_target, AppServerTarget::Remote { .. })
}
fn latest_session_cwd_filter<'a>(
remote_mode: bool,
remote_cwd_override: Option<&'a Path>,
@@ -761,24 +768,28 @@ pub async fn run_main(
}
};
let environment_manager = Arc::new(
EnvironmentManager::new(EnvironmentManagerArgs::new(
ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?,
))
.await,
);
let local_runtime_paths = ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?;
let environment_manager =
if should_load_configured_environments(&loader_overrides, &app_server_target) {
EnvironmentManager::from_codex_home(codex_home.clone(), local_runtime_paths).await
} else {
EnvironmentManager::from_env(local_runtime_paths).await
}
.map(Arc::new)
.map_err(std::io::Error::other)?;
let cwd = cli.cwd.clone();
let config_cwd =
config_cwd_for_app_server_target(cwd.as_deref(), &app_server_target, &environment_manager)?;
#[allow(clippy::print_stderr)]
let config_toml = match load_config_as_toml_with_cli_overrides(
let config_toml = match load_config_as_toml_with_cli_and_loader_overrides(
&codex_home,
config_cwd.as_ref(),
cli_kv_overrides.clone(),
loader_overrides.clone(),
)
.await
{