Route local-only app-server gating through processors (#23551)

## Summary
- move local-only app-server gating out of `MessageProcessor`
- let `fs/*`, `command/exec`, and `process/spawn` resolve local
availability inside their owning processors
- keep `fs/*` mounted for the future environment-param path while
preserving current no-local error behavior

## Validation
- not run locally per Codex repo guidance
This commit is contained in:
starr-openai
2026-05-19 14:38:03 -07:00
committed by GitHub
parent 954a9c8579
commit 1509ae6d8d
4 changed files with 73 additions and 64 deletions

View File

@@ -7,7 +7,6 @@ use std::sync::atomic::AtomicBool;
use crate::attestation::app_server_attestation_provider;
use crate::config_manager::ConfigManager;
use crate::connection_rpc_gate::ConnectionRpcGate;
use crate::error_code::internal_error;
use crate::error_code::invalid_request;
use crate::extensions::guardian_agent_spawner;
use crate::extensions::thread_extensions;
@@ -166,11 +165,10 @@ pub(crate) struct MessageProcessor {
command_exec_processor: CommandExecRequestProcessor,
process_exec_processor: ProcessExecRequestProcessor,
config_processor: ConfigRequestProcessor,
environment_manager: Arc<EnvironmentManager>,
environment_processor: EnvironmentRequestProcessor,
external_agent_config_processor: ExternalAgentConfigRequestProcessor,
feedback_processor: FeedbackRequestProcessor,
fs_processor: Option<FsRequestProcessor>,
fs_processor: FsRequestProcessor,
git_processor: GitRequestProcessor,
initialize_processor: InitializeRequestProcessor,
marketplace_processor: MarketplaceRequestProcessor,
@@ -274,23 +272,6 @@ pub(crate) struct MessageProcessorArgs {
}
impl MessageProcessor {
fn fs_processor(&self) -> Result<&FsRequestProcessor, JSONRPCErrorError> {
self.fs_processor
.as_ref()
.ok_or_else(|| internal_error("local filesystem is not configured"))
}
fn require_local_environment(&self) -> Result<(), JSONRPCErrorError> {
// CCA filters these local-only RPCs before they reach app-server, but
// keep a Codex-side backstop so no-local app-server modes fail safely
// if a client still invokes one directly.
self.environment_manager
.try_local_environment()
.is_some()
.then_some(())
.ok_or_else(|| internal_error("local environment is not configured"))
}
/// Create a new `MessageProcessor`, retaining a handle to the outgoing
/// `Sender` so handlers can enqueue messages to be written to stdout.
pub(crate) fn new(args: MessageProcessorArgs) -> Self {
@@ -375,8 +356,12 @@ impl MessageProcessor {
Arc::clone(&config),
outgoing.clone(),
config_manager.clone(),
Arc::clone(&environment_manager_for_requests),
);
let process_exec_processor = ProcessExecRequestProcessor::new(
outgoing.clone(),
Arc::clone(&environment_manager_for_requests),
);
let process_exec_processor = ProcessExecRequestProcessor::new(outgoing.clone());
let feedback_processor = FeedbackRequestProcessor::new(
auth_manager.clone(),
Arc::clone(&thread_manager),
@@ -480,17 +465,10 @@ impl MessageProcessor {
);
let environment_processor =
EnvironmentRequestProcessor::new(thread_manager.environment_manager());
// `fs/*` is a local-host filesystem surface. Do not construct it when
// the manager intentionally has no local environment.
let fs_processor = thread_manager
.environment_manager()
.try_local_environment()
.map(|environment| {
FsRequestProcessor::new(
environment.get_filesystem(),
FsWatchManager::new(outgoing.clone()),
)
});
let fs_processor = FsRequestProcessor::new(
Arc::clone(&environment_manager_for_requests),
FsWatchManager::new(outgoing.clone()),
);
let windows_sandbox_processor = WindowsSandboxRequestProcessor::new(
outgoing.clone(),
Arc::clone(&config),
@@ -505,7 +483,6 @@ impl MessageProcessor {
command_exec_processor,
process_exec_processor,
config_processor,
environment_manager: environment_manager_for_requests,
environment_processor,
external_agent_config_processor,
feedback_processor,
@@ -729,9 +706,7 @@ impl MessageProcessor {
) {
session_state.rpc_gate.shutdown().await;
self.outgoing.connection_closed(connection_id).await;
if let Some(fs_processor) = &self.fs_processor {
fs_processor.connection_closed(connection_id).await;
}
self.fs_processor.connection_closed(connection_id).await;
self.command_exec_processor
.connection_closed(connection_id)
.await;
@@ -937,47 +912,47 @@ impl MessageProcessor {
self.environment_processor.environment_add(params).await
}
ClientRequest::FsReadFile { params, .. } => self
.fs_processor()?
.fs_processor
.read_file(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsWriteFile { params, .. } => self
.fs_processor()?
.fs_processor
.write_file(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsCreateDirectory { params, .. } => self
.fs_processor()?
.fs_processor
.create_directory(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsGetMetadata { params, .. } => self
.fs_processor()?
.fs_processor
.get_metadata(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsReadDirectory { params, .. } => self
.fs_processor()?
.fs_processor
.read_directory(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsRemove { params, .. } => self
.fs_processor()?
.fs_processor
.remove(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsCopy { params, .. } => self
.fs_processor()?
.fs_processor
.copy(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsWatch { params, .. } => self
.fs_processor()?
.fs_processor
.watch(connection_id, params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsUnwatch { params, .. } => self
.fs_processor()?
.fs_processor
.unwatch(connection_id, params)
.await
.map(|response| Some(response.into())),
@@ -1306,7 +1281,6 @@ impl MessageProcessor {
.await
.map(|response| Some(response.into())),
ClientRequest::OneOffCommandExec { params, .. } => {
self.require_local_environment()?;
self.command_exec_processor
.one_off_command_exec(&request_id, params)
.await
@@ -1326,13 +1300,11 @@ impl MessageProcessor {
.command_exec_terminate(request_id.clone(), params)
.await
}
ClientRequest::ProcessSpawn { params, .. } => {
self.require_local_environment()?;
self.process_exec_processor
.process_spawn(request_id.clone(), params)
.await
.map(|()| None)
}
ClientRequest::ProcessSpawn { params, .. } => self
.process_exec_processor
.process_spawn(request_id.clone(), params)
.await
.map(|()| None),
ClientRequest::ProcessWriteStdin { params, .. } => {
self.process_exec_processor
.process_write_stdin(request_id.clone(), params)

View File

@@ -6,6 +6,7 @@ pub(crate) struct CommandExecRequestProcessor {
config: Arc<Config>,
outgoing: Arc<OutgoingMessageSender>,
config_manager: ConfigManager,
environment_manager: Arc<EnvironmentManager>,
command_exec_manager: CommandExecManager,
}
@@ -15,12 +16,14 @@ impl CommandExecRequestProcessor {
config: Arc<Config>,
outgoing: Arc<OutgoingMessageSender>,
config_manager: ConfigManager,
environment_manager: Arc<EnvironmentManager>,
) -> Self {
Self {
arg0_paths,
config,
outgoing,
config_manager,
environment_manager,
command_exec_manager: CommandExecManager::default(),
}
}
@@ -30,6 +33,7 @@ impl CommandExecRequestProcessor {
request_id: &ConnectionRequestId,
params: CommandExecParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
self.require_local_environment()?;
self.exec_one_off_command(request_id, params)
.await
.map(|()| None)
@@ -74,6 +78,14 @@ impl CommandExecRequestProcessor {
.await;
}
fn require_local_environment(&self) -> Result<(), JSONRPCErrorError> {
self.environment_manager
.try_local_environment()
.is_some()
.then_some(())
.ok_or_else(|| internal_error("local environment is not configured"))
}
async fn exec_one_off_command(
&self,
request_id: &ConnectionRequestId,

View File

@@ -26,6 +26,7 @@ use codex_app_server_protocol::FsWriteFileResponse;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_exec_server::CopyOptions;
use codex_exec_server::CreateDirectoryOptions;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::ExecutorFileSystem;
use codex_exec_server::RemoveOptions;
use std::io;
@@ -33,21 +34,28 @@ use std::sync::Arc;
#[derive(Clone)]
pub(crate) struct FsRequestProcessor {
file_system: Arc<dyn ExecutorFileSystem>,
environment_manager: Arc<EnvironmentManager>,
fs_watch_manager: FsWatchManager,
}
impl FsRequestProcessor {
pub(crate) fn new(
file_system: Arc<dyn ExecutorFileSystem>,
environment_manager: Arc<EnvironmentManager>,
fs_watch_manager: FsWatchManager,
) -> Self {
Self {
file_system,
environment_manager,
fs_watch_manager,
}
}
fn file_system(&self) -> Result<Arc<dyn ExecutorFileSystem>, JSONRPCErrorError> {
self.environment_manager
.try_local_environment()
.map(|environment| environment.get_filesystem())
.ok_or_else(|| internal_error("local filesystem is not configured"))
}
pub(crate) async fn connection_closed(&self, connection_id: ConnectionId) {
self.fs_watch_manager.connection_closed(connection_id).await;
}
@@ -57,7 +65,7 @@ impl FsRequestProcessor {
params: FsReadFileParams,
) -> Result<FsReadFileResponse, JSONRPCErrorError> {
let bytes = self
.file_system
.file_system()?
.read_file(&params.path, /*sandbox*/ None)
.await
.map_err(map_fs_error)?;
@@ -75,7 +83,7 @@ impl FsRequestProcessor {
"fs/writeFile requires valid base64 dataBase64: {err}"
))
})?;
self.file_system
self.file_system()?
.write_file(&params.path, bytes, /*sandbox*/ None)
.await
.map_err(map_fs_error)?;
@@ -86,7 +94,7 @@ impl FsRequestProcessor {
&self,
params: FsCreateDirectoryParams,
) -> Result<FsCreateDirectoryResponse, JSONRPCErrorError> {
self.file_system
self.file_system()?
.create_directory(
&params.path,
CreateDirectoryOptions {
@@ -104,7 +112,7 @@ impl FsRequestProcessor {
params: FsGetMetadataParams,
) -> Result<FsGetMetadataResponse, JSONRPCErrorError> {
let metadata = self
.file_system
.file_system()?
.get_metadata(&params.path, /*sandbox*/ None)
.await
.map_err(map_fs_error)?;
@@ -122,7 +130,7 @@ impl FsRequestProcessor {
params: FsReadDirectoryParams,
) -> Result<FsReadDirectoryResponse, JSONRPCErrorError> {
let entries = self
.file_system
.file_system()?
.read_directory(&params.path, /*sandbox*/ None)
.await
.map_err(map_fs_error)?;
@@ -142,7 +150,7 @@ impl FsRequestProcessor {
&self,
params: FsRemoveParams,
) -> Result<FsRemoveResponse, JSONRPCErrorError> {
self.file_system
self.file_system()?
.remove(
&params.path,
RemoveOptions {
@@ -160,7 +168,7 @@ impl FsRequestProcessor {
&self,
params: FsCopyParams,
) -> Result<FsCopyResponse, JSONRPCErrorError> {
self.file_system
self.file_system()?
.copy(
&params.source_path,
&params.destination_path,
@@ -179,6 +187,7 @@ impl FsRequestProcessor {
connection_id: ConnectionId,
params: FsWatchParams,
) -> Result<FsWatchResponse, JSONRPCErrorError> {
self.file_system()?;
self.fs_watch_manager.watch(connection_id, params).await
}
@@ -187,6 +196,7 @@ impl FsRequestProcessor {
connection_id: ConnectionId,
params: FsUnwatchParams,
) -> Result<FsUnwatchResponse, JSONRPCErrorError> {
self.file_system()?;
self.fs_watch_manager.unwatch(connection_id, params).await
}
}

View File

@@ -23,6 +23,7 @@ use codex_app_server_protocol::ServerNotification;
use codex_core::exec::ExecExpiration;
use codex_core::exec::ExecExpirationOutcome;
use codex_core::exec::IO_DRAIN_TIMEOUT_MS;
use codex_exec_server::EnvironmentManager;
use codex_protocol::exec_output::bytes_to_string_smart;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
@@ -48,13 +49,18 @@ const OUTPUT_CHUNK_SIZE_HINT: usize = 64 * 1024;
#[derive(Clone)]
pub(crate) struct ProcessExecRequestProcessor {
outgoing: Arc<OutgoingMessageSender>,
environment_manager: Arc<EnvironmentManager>,
process_exec_manager: ProcessExecManager,
}
impl ProcessExecRequestProcessor {
pub(crate) fn new(outgoing: Arc<OutgoingMessageSender>) -> Self {
pub(crate) fn new(
outgoing: Arc<OutgoingMessageSender>,
environment_manager: Arc<EnvironmentManager>,
) -> Self {
Self {
outgoing,
environment_manager,
process_exec_manager: ProcessExecManager::default(),
}
}
@@ -64,6 +70,7 @@ impl ProcessExecRequestProcessor {
request_id: ConnectionRequestId,
params: ProcessSpawnParams,
) -> Result<(), JSONRPCErrorError> {
self.require_local_environment()?;
let ProcessSpawnParams {
command,
process_handle,
@@ -173,6 +180,14 @@ impl ProcessExecRequestProcessor {
.connection_closed(connection_id)
.await;
}
fn require_local_environment(&self) -> Result<(), JSONRPCErrorError> {
self.environment_manager
.try_local_environment()
.is_some()
.then_some(())
.ok_or_else(|| internal_error("local environment is not configured"))
}
}
#[derive(Clone, Default)]