diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index 113d82c94c..718cd40332 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -3280,6 +3280,13 @@ "null" ] }, + "environmentId": { + "description": "Optional environment id to use for this thread. Supported values are `\"local\"` and `\"remote\"`. When omitted, the default environment is used.", + "type": [ + "string", + "null" + ] + }, "ephemeral": { "type": [ "boolean", diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index e87fa17928..745e3712e3 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -14583,6 +14583,13 @@ "null" ] }, + "environmentId": { + "description": "Optional environment id to use for this thread. Supported values are `\"local\"` and `\"remote\"`. When omitted, the default environment is used.", + "type": [ + "string", + "null" + ] + }, "ephemeral": { "type": [ "boolean", diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index 8254ad0127..ee2423992f 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -12427,6 +12427,13 @@ "null" ] }, + "environmentId": { + "description": "Optional environment id to use for this thread. Supported values are `\"local\"` and `\"remote\"`. When omitted, the default environment is used.", + "type": [ + "string", + "null" + ] + }, "ephemeral": { "type": [ "boolean", diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartParams.json index 21f4d7ef74..c8af2a2364 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartParams.json @@ -157,6 +157,13 @@ "null" ] }, + "environmentId": { + "description": "Optional environment id to use for this thread. Supported values are `\"local\"` and `\"remote\"`. When omitted, the default environment is used.", + "type": [ + "string", + "null" + ] + }, "ephemeral": { "type": [ "boolean", diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartParams.ts index 904487e81c..96da5fa0c2 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartParams.ts @@ -14,6 +14,11 @@ export type ThreadStartParams = {model?: string | null, modelProvider?: string | * and subsequent turns. */ approvalsReviewer?: ApprovalsReviewer | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, serviceName?: string | null, baseInstructions?: string | null, developerInstructions?: string | null, personality?: Personality | null, ephemeral?: boolean | null, sessionStartSource?: ThreadStartSource | null, /** + * Optional environment id to use for this thread. Supported values are + * `"local"` and `"remote"`. When omitted, the default environment is + * used. + */ +environmentId?: string | null, /** * If true, opt into emitting raw Responses API items on the event stream. * This is for internal use only (e.g. Codex Cloud). */ diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 5173549326..88ef34777a 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -2722,6 +2722,11 @@ pub struct ThreadStartParams { pub ephemeral: Option, #[ts(optional = nullable)] pub session_start_source: Option, + /// Optional environment id to use for this thread. Supported values are + /// `"local"` and `"remote"`. When omitted, the default environment is + /// used. + #[ts(optional = nullable)] + pub environment_id: Option, #[experimental("thread/start.dynamicTools")] #[ts(optional = nullable)] pub dynamic_tools: Option>, diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 410f22ece8..d297c04203 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -2267,6 +2267,7 @@ impl CodexMessageProcessor { personality, ephemeral, session_start_source, + environment_id, persist_extended_history, } = params; let mut typesafe_overrides = self.build_thread_config_overrides( @@ -2314,6 +2315,7 @@ impl CodexMessageProcessor { service_name, experimental_raw_events, request_trace, + environment_id, ) .await; }; @@ -2390,6 +2392,7 @@ impl CodexMessageProcessor { service_name: Option, experimental_raw_events: bool, request_trace: Option, + environment_id: Option, ) { let requested_cwd = typesafe_overrides.cwd.clone(); let mut config = match derive_config_from_params( @@ -2538,6 +2541,7 @@ impl CodexMessageProcessor { persist_extended_history, service_name, request_trace, + environment_id, ) .instrument(tracing::info_span!( "app_server.thread_start.create_thread", diff --git a/codex-rs/app-server/tests/suite/v2/skills_list.rs b/codex-rs/app-server/tests/suite/v2/skills_list.rs index 8675b3a429..38ffe4d1a7 100644 --- a/codex-rs/app-server/tests/suite/v2/skills_list.rs +++ b/codex-rs/app-server/tests/suite/v2/skills_list.rs @@ -322,6 +322,7 @@ async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<( personality: None, ephemeral: None, session_start_source: None, + environment_id: None, dynamic_tools: None, mock_experimental_field: None, experimental_raw_events: false, diff --git a/codex-rs/app-server/tests/suite/v2/thread_start.rs b/codex-rs/app-server/tests/suite/v2/thread_start.rs index ab514252e4..10ec30e778 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_start.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_start.rs @@ -45,6 +45,7 @@ use super::analytics::thread_initialized_event; use super::analytics::wait_for_analytics_payload; const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); +const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL"; #[tokio::test] async fn thread_start_creates_thread_and_emits_started() -> Result<()> { @@ -163,6 +164,108 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_start_accepts_explicit_local_environment_when_default_is_remote() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + + let codex_home = TempDir::new()?; + create_config_toml_without_approval_policy(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new_with_env( + codex_home.path(), + &[(CODEX_EXEC_SERVER_URL_ENV_VAR, Some("ws://127.0.0.1:1"))], + ) + .await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let req_id = mcp + .send_thread_start_request(ThreadStartParams { + environment_id: Some("local".to_string()), + ..Default::default() + }) + .await?; + + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(req_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(resp)?; + + assert!(!thread.id.is_empty(), "thread id should not be empty"); + Ok(()) +} + +#[tokio::test] +async fn thread_start_rejects_explicit_remote_environment_when_not_configured() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + + let codex_home = TempDir::new()?; + create_config_toml_without_approval_policy(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let req_id = mcp + .send_thread_start_request(ThreadStartParams { + environment_id: Some("remote".to_string()), + ..Default::default() + }) + .await?; + + let err: JSONRPCError = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(req_id)), + ) + .await??; + + assert!( + err.error + .message + .contains("remote environment is not configured"), + "unexpected error message: {}", + err.error.message + ); + Ok(()) +} + +#[tokio::test] +async fn thread_start_rejects_explicit_environment_when_disabled() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + + let codex_home = TempDir::new()?; + create_config_toml_without_approval_policy(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new_with_env( + codex_home.path(), + &[(CODEX_EXEC_SERVER_URL_ENV_VAR, Some("none"))], + ) + .await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let req_id = mcp + .send_thread_start_request(ThreadStartParams { + environment_id: Some("local".to_string()), + ..Default::default() + }) + .await?; + + let err: JSONRPCError = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(req_id)), + ) + .await??; + + assert!( + err.error + .message + .contains("environments are disabled for this session"), + "unexpected error message: {}", + err.error.message + ); + Ok(()) +} + #[tokio::test] async fn thread_start_response_includes_loaded_instruction_sources() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 844d585be5..abcb23f314 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -430,6 +430,7 @@ pub(crate) struct CodexSpawnArgs { pub(crate) auth_manager: Arc, pub(crate) models_manager: Arc, pub(crate) environment_manager: Arc, + pub(crate) environment_id: Option, pub(crate) skills_manager: Arc, pub(crate) plugins_manager: Arc, pub(crate) mcp_manager: Arc, @@ -483,6 +484,7 @@ impl Codex { auth_manager, models_manager, environment_manager, + environment_id, skills_manager, plugins_manager, mcp_manager, @@ -503,7 +505,7 @@ impl Codex { let (tx_event, rx_event) = async_channel::unbounded(); let environment = environment_manager - .current() + .environment(environment_id.as_deref()) .await .map_err(|err| CodexErr::Fatal(format!("failed to create environment: {err}")))?; let fs = environment diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 9cd58e044f..eda1c13d18 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -82,6 +82,7 @@ pub(crate) async fn run_codex_thread_interactive( environment_manager: Arc::new(EnvironmentManager::from_environment( parent_ctx.environment.as_deref(), )), + environment_id: None, skills_manager: Arc::clone(&parent_session.services.skills_manager), plugins_manager: Arc::clone(&parent_session.services.plugins_manager), mcp_manager: Arc::clone(&parent_session.services.mcp_manager), diff --git a/codex-rs/core/src/codex_tests_guardian.rs b/codex-rs/core/src/codex_tests_guardian.rs index cad67dcc8f..2de4ed613f 100644 --- a/codex-rs/core/src/codex_tests_guardian.rs +++ b/codex-rs/core/src/codex_tests_guardian.rs @@ -435,6 +435,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { auth_manager, models_manager, environment_manager: Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)), + environment_id: None, skills_manager, plugins_manager, mcp_manager, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index b8ad75b49e..9b37d61c67 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -488,6 +488,7 @@ impl ThreadManager { persist_extended_history, /*metrics_service_name*/ None, /*parent_trace*/ None, + /*environment_id*/ None, )) .await } @@ -500,6 +501,7 @@ impl ThreadManager { persist_extended_history: bool, metrics_service_name: Option, parent_trace: Option, + environment_id: Option, ) -> CodexResult { Box::pin(self.state.spawn_thread( config, @@ -511,6 +513,7 @@ impl ThreadManager { metrics_service_name, parent_trace, /*user_shell_override*/ None, + environment_id, )) .await } @@ -551,6 +554,7 @@ impl ThreadManager { /*metrics_service_name*/ None, parent_trace, /*user_shell_override*/ None, + /*environment_id*/ None, )) .await } @@ -570,6 +574,7 @@ impl ThreadManager { /*metrics_service_name*/ None, /*parent_trace*/ None, /*user_shell_override*/ Some(user_shell_override), + /*environment_id*/ None, )) .await } @@ -592,6 +597,7 @@ impl ThreadManager { /*metrics_service_name*/ None, /*parent_trace*/ None, /*user_shell_override*/ Some(user_shell_override), + /*environment_id*/ None, )) .await } @@ -700,6 +706,7 @@ impl ThreadManager { /*metrics_service_name*/ None, parent_trace, /*user_shell_override*/ None, + /*environment_id*/ None, )) .await } @@ -801,6 +808,7 @@ impl ThreadManagerState { inherited_exec_policy, /*parent_trace*/ None, /*user_shell_override*/ None, + /*environment_id*/ None, )) .await } @@ -828,6 +836,7 @@ impl ThreadManagerState { inherited_exec_policy, /*parent_trace*/ None, /*user_shell_override*/ None, + /*environment_id*/ None, )) .await } @@ -856,6 +865,7 @@ impl ThreadManagerState { inherited_exec_policy, /*parent_trace*/ None, /*user_shell_override*/ None, + /*environment_id*/ None, )) .await } @@ -873,6 +883,7 @@ impl ThreadManagerState { metrics_service_name: Option, parent_trace: Option, user_shell_override: Option, + environment_id: Option, ) -> CodexResult { Box::pin(self.spawn_thread_with_source( config, @@ -887,6 +898,7 @@ impl ThreadManagerState { /*inherited_exec_policy*/ None, parent_trace, user_shell_override, + environment_id, )) .await } @@ -906,10 +918,11 @@ impl ThreadManagerState { inherited_exec_policy: Option>, parent_trace: Option, user_shell_override: Option, + environment_id: Option, ) -> CodexResult { let environment = self .environment_manager - .current() + .environment(environment_id.as_deref()) .await .map_err(|err| CodexErr::Fatal(format!("failed to create environment: {err}")))?; let watch_registration = match environment.as_ref() { @@ -932,6 +945,7 @@ impl ThreadManagerState { auth_manager, models_manager: Arc::clone(&self.models_manager), environment_manager: Arc::clone(&self.environment_manager), + environment_id, skills_manager: Arc::clone(&self.skills_manager), plugins_manager: Arc::clone(&self.plugins_manager), mcp_manager: Arc::clone(&self.mcp_manager), diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index afe0720196..a5efc6925f 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -15,6 +15,15 @@ use crate::remote_process::RemoteProcess; pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL"; +const LOCAL_ENVIRONMENT_ID: &str = "local"; +const REMOTE_ENVIRONMENT_ID: &str = "remote"; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum ExplicitEnvironmentId { + Local, + Remote, +} + /// Lazily creates and caches the active environment for a session. /// /// The manager keeps the session's environment selection stable so subagents @@ -25,6 +34,8 @@ pub struct EnvironmentManager { local_runtime_paths: Option, disabled: bool, current_environment: OnceCell>>, + local_environment: OnceCell>, + remote_environment: OnceCell>, } impl Default for EnvironmentManager { @@ -51,6 +62,8 @@ impl EnvironmentManager { local_runtime_paths, disabled, current_environment: OnceCell::new(), + local_environment: OnceCell::new(), + remote_environment: OnceCell::new(), } } @@ -79,12 +92,16 @@ impl EnvironmentManager { local_runtime_paths: environment.local_runtime_paths().cloned(), disabled: false, current_environment: OnceCell::new(), + local_environment: OnceCell::new(), + remote_environment: OnceCell::new(), }, None => Self { exec_server_url: None, local_runtime_paths: None, disabled: true, current_environment: OnceCell::new(), + local_environment: OnceCell::new(), + remote_environment: OnceCell::new(), }, } } @@ -119,6 +136,62 @@ impl EnvironmentManager { .map(Option::as_ref) .map(std::option::Option::<&Arc>::cloned) } + + pub async fn environment( + &self, + environment_id: Option<&str>, + ) -> Result>, ExecServerError> { + match parse_environment_id(environment_id)? { + None => self.current().await, + Some(ExplicitEnvironmentId::Local) => self.local_environment().await.map(Some), + Some(ExplicitEnvironmentId::Remote) => self.remote_environment().await.map(Some), + } + } + + async fn local_environment(&self) -> Result, ExecServerError> { + if self.disabled { + return Err(ExecServerError::Protocol( + "environments are disabled for this session".to_string(), + )); + } + + self.local_environment + .get_or_try_init(|| async { + Environment::create_with_runtime_paths( + /*exec_server_url*/ None, + self.local_runtime_paths.clone(), + ) + .await + .map(Arc::new) + }) + .await + .map(Arc::clone) + } + + async fn remote_environment(&self) -> Result, ExecServerError> { + if self.disabled { + return Err(ExecServerError::Protocol( + "environments are disabled for this session".to_string(), + )); + } + let Some(exec_server_url) = &self.exec_server_url else { + return Err(ExecServerError::Protocol( + "remote environment is not configured".to_string(), + )); + }; + + self.remote_environment + .get_or_try_init(|| async { + Environment::create_with_runtime_paths( + Some(exec_server_url.clone()), + self.local_runtime_paths.clone(), + ) + .await + .map(Arc::new) + }) + .await + .map(Arc::clone) + } } /// Concrete execution/filesystem environment selected for a session. @@ -236,6 +309,23 @@ fn normalize_exec_server_url(exec_server_url: Option) -> (Option Some(url) => (Some(url.to_string()), false), } } + +fn parse_environment_id( + environment_id: Option<&str>, +) -> Result, ExecServerError> { + match environment_id.map(str::trim) { + None | Some("") => Ok(None), + Some(environment_id) if environment_id.eq_ignore_ascii_case(LOCAL_ENVIRONMENT_ID) => { + Ok(Some(ExplicitEnvironmentId::Local)) + } + Some(environment_id) if environment_id.eq_ignore_ascii_case(REMOTE_ENVIRONMENT_ID) => { + Ok(Some(ExplicitEnvironmentId::Remote)) + } + Some(environment_id) => Err(ExecServerError::Protocol(format!( + "unknown environment id: {environment_id}" + ))), + } +} #[cfg(test)] mod tests { use std::sync::Arc; @@ -333,6 +423,65 @@ mod tests { ); } + #[tokio::test] + async fn environment_manager_explicit_local_selection_bypasses_remote_default() { + let manager = EnvironmentManager::new(Some("ws://127.0.0.1:8765".to_string())); + + let environment = manager + .environment(Some("local")) + .await + .expect("get explicit local environment") + .expect("local environment"); + + assert!(!environment.is_remote()); + assert_eq!(environment.exec_server_url(), None); + } + + #[tokio::test] + async fn environment_manager_rejects_remote_selection_when_not_configured() { + let manager = EnvironmentManager::new(/*exec_server_url*/ None); + + let err = manager + .environment(Some("remote")) + .await + .expect_err("remote selection should fail"); + + assert_eq!( + err.to_string(), + "exec-server protocol error: remote environment is not configured" + ); + } + + #[tokio::test] + async fn environment_manager_rejects_explicit_selection_when_disabled() { + let manager = EnvironmentManager::new(Some("none".to_string())); + + let err = manager + .environment(Some("local")) + .await + .expect_err("explicit local selection should fail"); + + assert_eq!( + err.to_string(), + "exec-server protocol error: environments are disabled for this session" + ); + } + + #[tokio::test] + async fn environment_manager_rejects_unknown_environment_id() { + let manager = EnvironmentManager::new(/*exec_server_url*/ None); + + let err = manager + .environment(Some("mystery")) + .await + .expect_err("unknown environment should fail"); + + assert_eq!( + err.to_string(), + "exec-server protocol error: unknown environment id: mystery" + ); + } + #[tokio::test] async fn default_environment_has_ready_local_executor() { let environment = Environment::default();