diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index fdcd44d01a..e37c9afd22 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -7,7 +7,6 @@ use std::sync::atomic::AtomicBool; use crate::attestation::app_server_attestation_provider; use crate::config_manager::ConfigManager; use crate::connection_rpc_gate::ConnectionRpcGate; -use crate::error_code::internal_error; use crate::error_code::invalid_request; use crate::extensions::guardian_agent_spawner; use crate::extensions::thread_extensions; @@ -166,11 +165,10 @@ pub(crate) struct MessageProcessor { command_exec_processor: CommandExecRequestProcessor, process_exec_processor: ProcessExecRequestProcessor, config_processor: ConfigRequestProcessor, - environment_manager: Arc, environment_processor: EnvironmentRequestProcessor, external_agent_config_processor: ExternalAgentConfigRequestProcessor, feedback_processor: FeedbackRequestProcessor, - fs_processor: Option, + fs_processor: FsRequestProcessor, git_processor: GitRequestProcessor, initialize_processor: InitializeRequestProcessor, marketplace_processor: MarketplaceRequestProcessor, @@ -274,23 +272,6 @@ pub(crate) struct MessageProcessorArgs { } impl MessageProcessor { - fn fs_processor(&self) -> Result<&FsRequestProcessor, JSONRPCErrorError> { - self.fs_processor - .as_ref() - .ok_or_else(|| internal_error("local filesystem is not configured")) - } - - fn require_local_environment(&self) -> Result<(), JSONRPCErrorError> { - // CCA filters these local-only RPCs before they reach app-server, but - // keep a Codex-side backstop so no-local app-server modes fail safely - // if a client still invokes one directly. - self.environment_manager - .try_local_environment() - .is_some() - .then_some(()) - .ok_or_else(|| internal_error("local environment is not configured")) - } - /// Create a new `MessageProcessor`, retaining a handle to the outgoing /// `Sender` so handlers can enqueue messages to be written to stdout. pub(crate) fn new(args: MessageProcessorArgs) -> Self { @@ -375,8 +356,12 @@ impl MessageProcessor { Arc::clone(&config), outgoing.clone(), config_manager.clone(), + Arc::clone(&environment_manager_for_requests), + ); + let process_exec_processor = ProcessExecRequestProcessor::new( + outgoing.clone(), + Arc::clone(&environment_manager_for_requests), ); - let process_exec_processor = ProcessExecRequestProcessor::new(outgoing.clone()); let feedback_processor = FeedbackRequestProcessor::new( auth_manager.clone(), Arc::clone(&thread_manager), @@ -480,17 +465,10 @@ impl MessageProcessor { ); let environment_processor = EnvironmentRequestProcessor::new(thread_manager.environment_manager()); - // `fs/*` is a local-host filesystem surface. Do not construct it when - // the manager intentionally has no local environment. - let fs_processor = thread_manager - .environment_manager() - .try_local_environment() - .map(|environment| { - FsRequestProcessor::new( - environment.get_filesystem(), - FsWatchManager::new(outgoing.clone()), - ) - }); + let fs_processor = FsRequestProcessor::new( + Arc::clone(&environment_manager_for_requests), + FsWatchManager::new(outgoing.clone()), + ); let windows_sandbox_processor = WindowsSandboxRequestProcessor::new( outgoing.clone(), Arc::clone(&config), @@ -505,7 +483,6 @@ impl MessageProcessor { command_exec_processor, process_exec_processor, config_processor, - environment_manager: environment_manager_for_requests, environment_processor, external_agent_config_processor, feedback_processor, @@ -729,9 +706,7 @@ impl MessageProcessor { ) { session_state.rpc_gate.shutdown().await; self.outgoing.connection_closed(connection_id).await; - if let Some(fs_processor) = &self.fs_processor { - fs_processor.connection_closed(connection_id).await; - } + self.fs_processor.connection_closed(connection_id).await; self.command_exec_processor .connection_closed(connection_id) .await; @@ -937,47 +912,47 @@ impl MessageProcessor { self.environment_processor.environment_add(params).await } ClientRequest::FsReadFile { params, .. } => self - .fs_processor()? + .fs_processor .read_file(params) .await .map(|response| Some(response.into())), ClientRequest::FsWriteFile { params, .. } => self - .fs_processor()? + .fs_processor .write_file(params) .await .map(|response| Some(response.into())), ClientRequest::FsCreateDirectory { params, .. } => self - .fs_processor()? + .fs_processor .create_directory(params) .await .map(|response| Some(response.into())), ClientRequest::FsGetMetadata { params, .. } => self - .fs_processor()? + .fs_processor .get_metadata(params) .await .map(|response| Some(response.into())), ClientRequest::FsReadDirectory { params, .. } => self - .fs_processor()? + .fs_processor .read_directory(params) .await .map(|response| Some(response.into())), ClientRequest::FsRemove { params, .. } => self - .fs_processor()? + .fs_processor .remove(params) .await .map(|response| Some(response.into())), ClientRequest::FsCopy { params, .. } => self - .fs_processor()? + .fs_processor .copy(params) .await .map(|response| Some(response.into())), ClientRequest::FsWatch { params, .. } => self - .fs_processor()? + .fs_processor .watch(connection_id, params) .await .map(|response| Some(response.into())), ClientRequest::FsUnwatch { params, .. } => self - .fs_processor()? + .fs_processor .unwatch(connection_id, params) .await .map(|response| Some(response.into())), @@ -1306,7 +1281,6 @@ impl MessageProcessor { .await .map(|response| Some(response.into())), ClientRequest::OneOffCommandExec { params, .. } => { - self.require_local_environment()?; self.command_exec_processor .one_off_command_exec(&request_id, params) .await @@ -1326,13 +1300,11 @@ impl MessageProcessor { .command_exec_terminate(request_id.clone(), params) .await } - ClientRequest::ProcessSpawn { params, .. } => { - self.require_local_environment()?; - self.process_exec_processor - .process_spawn(request_id.clone(), params) - .await - .map(|()| None) - } + ClientRequest::ProcessSpawn { params, .. } => self + .process_exec_processor + .process_spawn(request_id.clone(), params) + .await + .map(|()| None), ClientRequest::ProcessWriteStdin { params, .. } => { self.process_exec_processor .process_write_stdin(request_id.clone(), params) diff --git a/codex-rs/app-server/src/request_processors/command_exec_processor.rs b/codex-rs/app-server/src/request_processors/command_exec_processor.rs index 1219eb17f9..930cc6d18f 100644 --- a/codex-rs/app-server/src/request_processors/command_exec_processor.rs +++ b/codex-rs/app-server/src/request_processors/command_exec_processor.rs @@ -6,6 +6,7 @@ pub(crate) struct CommandExecRequestProcessor { config: Arc, outgoing: Arc, config_manager: ConfigManager, + environment_manager: Arc, command_exec_manager: CommandExecManager, } @@ -15,12 +16,14 @@ impl CommandExecRequestProcessor { config: Arc, outgoing: Arc, config_manager: ConfigManager, + environment_manager: Arc, ) -> Self { Self { arg0_paths, config, outgoing, config_manager, + environment_manager, command_exec_manager: CommandExecManager::default(), } } @@ -30,6 +33,7 @@ impl CommandExecRequestProcessor { request_id: &ConnectionRequestId, params: CommandExecParams, ) -> Result, JSONRPCErrorError> { + self.require_local_environment()?; self.exec_one_off_command(request_id, params) .await .map(|()| None) @@ -74,6 +78,14 @@ impl CommandExecRequestProcessor { .await; } + fn require_local_environment(&self) -> Result<(), JSONRPCErrorError> { + self.environment_manager + .try_local_environment() + .is_some() + .then_some(()) + .ok_or_else(|| internal_error("local environment is not configured")) + } + async fn exec_one_off_command( &self, request_id: &ConnectionRequestId, diff --git a/codex-rs/app-server/src/request_processors/fs_processor.rs b/codex-rs/app-server/src/request_processors/fs_processor.rs index 01b9b20bfd..99a8620b4f 100644 --- a/codex-rs/app-server/src/request_processors/fs_processor.rs +++ b/codex-rs/app-server/src/request_processors/fs_processor.rs @@ -26,6 +26,7 @@ use codex_app_server_protocol::FsWriteFileResponse; use codex_app_server_protocol::JSONRPCErrorError; use codex_exec_server::CopyOptions; use codex_exec_server::CreateDirectoryOptions; +use codex_exec_server::EnvironmentManager; use codex_exec_server::ExecutorFileSystem; use codex_exec_server::RemoveOptions; use std::io; @@ -33,21 +34,28 @@ use std::sync::Arc; #[derive(Clone)] pub(crate) struct FsRequestProcessor { - file_system: Arc, + environment_manager: Arc, fs_watch_manager: FsWatchManager, } impl FsRequestProcessor { pub(crate) fn new( - file_system: Arc, + environment_manager: Arc, fs_watch_manager: FsWatchManager, ) -> Self { Self { - file_system, + environment_manager, fs_watch_manager, } } + fn file_system(&self) -> Result, JSONRPCErrorError> { + self.environment_manager + .try_local_environment() + .map(|environment| environment.get_filesystem()) + .ok_or_else(|| internal_error("local filesystem is not configured")) + } + pub(crate) async fn connection_closed(&self, connection_id: ConnectionId) { self.fs_watch_manager.connection_closed(connection_id).await; } @@ -57,7 +65,7 @@ impl FsRequestProcessor { params: FsReadFileParams, ) -> Result { let bytes = self - .file_system + .file_system()? .read_file(¶ms.path, /*sandbox*/ None) .await .map_err(map_fs_error)?; @@ -75,7 +83,7 @@ impl FsRequestProcessor { "fs/writeFile requires valid base64 dataBase64: {err}" )) })?; - self.file_system + self.file_system()? .write_file(¶ms.path, bytes, /*sandbox*/ None) .await .map_err(map_fs_error)?; @@ -86,7 +94,7 @@ impl FsRequestProcessor { &self, params: FsCreateDirectoryParams, ) -> Result { - self.file_system + self.file_system()? .create_directory( ¶ms.path, CreateDirectoryOptions { @@ -104,7 +112,7 @@ impl FsRequestProcessor { params: FsGetMetadataParams, ) -> Result { let metadata = self - .file_system + .file_system()? .get_metadata(¶ms.path, /*sandbox*/ None) .await .map_err(map_fs_error)?; @@ -122,7 +130,7 @@ impl FsRequestProcessor { params: FsReadDirectoryParams, ) -> Result { let entries = self - .file_system + .file_system()? .read_directory(¶ms.path, /*sandbox*/ None) .await .map_err(map_fs_error)?; @@ -142,7 +150,7 @@ impl FsRequestProcessor { &self, params: FsRemoveParams, ) -> Result { - self.file_system + self.file_system()? .remove( ¶ms.path, RemoveOptions { @@ -160,7 +168,7 @@ impl FsRequestProcessor { &self, params: FsCopyParams, ) -> Result { - self.file_system + self.file_system()? .copy( ¶ms.source_path, ¶ms.destination_path, @@ -179,6 +187,7 @@ impl FsRequestProcessor { connection_id: ConnectionId, params: FsWatchParams, ) -> Result { + self.file_system()?; self.fs_watch_manager.watch(connection_id, params).await } @@ -187,6 +196,7 @@ impl FsRequestProcessor { connection_id: ConnectionId, params: FsUnwatchParams, ) -> Result { + self.file_system()?; self.fs_watch_manager.unwatch(connection_id, params).await } } diff --git a/codex-rs/app-server/src/request_processors/process_exec_processor.rs b/codex-rs/app-server/src/request_processors/process_exec_processor.rs index 5742d0e4d5..0b84c7f7b9 100644 --- a/codex-rs/app-server/src/request_processors/process_exec_processor.rs +++ b/codex-rs/app-server/src/request_processors/process_exec_processor.rs @@ -23,6 +23,7 @@ use codex_app_server_protocol::ServerNotification; use codex_core::exec::ExecExpiration; use codex_core::exec::ExecExpirationOutcome; use codex_core::exec::IO_DRAIN_TIMEOUT_MS; +use codex_exec_server::EnvironmentManager; use codex_protocol::exec_output::bytes_to_string_smart; use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP; @@ -48,13 +49,18 @@ const OUTPUT_CHUNK_SIZE_HINT: usize = 64 * 1024; #[derive(Clone)] pub(crate) struct ProcessExecRequestProcessor { outgoing: Arc, + environment_manager: Arc, process_exec_manager: ProcessExecManager, } impl ProcessExecRequestProcessor { - pub(crate) fn new(outgoing: Arc) -> Self { + pub(crate) fn new( + outgoing: Arc, + environment_manager: Arc, + ) -> Self { Self { outgoing, + environment_manager, process_exec_manager: ProcessExecManager::default(), } } @@ -64,6 +70,7 @@ impl ProcessExecRequestProcessor { request_id: ConnectionRequestId, params: ProcessSpawnParams, ) -> Result<(), JSONRPCErrorError> { + self.require_local_environment()?; let ProcessSpawnParams { command, process_handle, @@ -173,6 +180,14 @@ impl ProcessExecRequestProcessor { .connection_closed(connection_id) .await; } + + fn require_local_environment(&self) -> Result<(), JSONRPCErrorError> { + self.environment_manager + .try_local_environment() + .is_some() + .then_some(()) + .ok_or_else(|| internal_error("local environment is not configured")) + } } #[derive(Clone, Default)]