Introduce exec process capability traits

Add a narrow ExecProcess trait for process lifecycle RPCs and expose it
from Environment behind an ExecutorEnvironment trait. Keep the first cut
behavior-preserving by delegating remote mode to the existing
ExecServerClient and returning an unavailable process stub for default
local Environment values.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-03-19 14:18:10 -07:00
parent 668330acc1
commit 952b7212b3
4 changed files with 126 additions and 0 deletions

View File

@@ -1,6 +1,7 @@
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use codex_app_server_protocol::FsCopyParams;
use codex_app_server_protocol::FsCopyResponse;
use codex_app_server_protocol::FsCreateDirectoryParams;
@@ -26,6 +27,7 @@ use tracing::warn;
use crate::client_api::ExecServerClientConnectOptions;
use crate::client_api::ExecServerEvent;
use crate::client_api::ExecProcess;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
use crate::protocol::EXEC_EXITED_METHOD;
@@ -504,6 +506,33 @@ impl ExecServerClient {
}
}
#[async_trait]
impl ExecProcess for ExecServerClient {
async fn start(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError> {
self.exec(params).await
}
async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError> {
self.read(params).await
}
async fn write(
&self,
process_id: &str,
chunk: Vec<u8>,
) -> Result<WriteResponse, ExecServerError> {
self.write(process_id, chunk).await
}
async fn terminate(&self, process_id: &str) -> Result<TerminateResponse, ExecServerError> {
self.terminate(process_id).await
}
fn subscribe_events(&self) -> broadcast::Receiver<ExecServerEvent> {
self.event_receiver()
}
}
impl From<RpcCallError> for ExecServerError {
fn from(value: RpcCallError) -> Self {
match value {

View File

@@ -1,7 +1,17 @@
use std::time::Duration;
use async_trait::async_trait;
use tokio::sync::broadcast;
use crate::client::ExecServerError;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
use crate::protocol::ExecParams;
use crate::protocol::ExecResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteResponse;
/// Connection options for any exec-server client transport.
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -25,3 +35,26 @@ pub enum ExecServerEvent {
OutputDelta(ExecOutputDeltaNotification),
Exited(ExecExitedNotification),
}
/// Process lifecycle capability for an execution environment.
#[async_trait]
pub trait ExecProcess: Send + Sync {
async fn start(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError>;
async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError>;
async fn write(
&self,
process_id: &str,
chunk: Vec<u8>,
) -> Result<WriteResponse, ExecServerError>;
async fn terminate(&self, process_id: &str) -> Result<TerminateResponse, ExecServerError>;
fn subscribe_events(&self) -> broadcast::Receiver<ExecServerEvent>;
}
/// Capability bundle exposed by an execution environment.
pub trait ExecutorEnvironment: Send + Sync {
fn process(&self) -> &(dyn ExecProcess + '_);
}

View File

@@ -1,8 +1,24 @@
use crate::ExecServerClient;
use crate::ExecServerError;
use crate::ExecServerEvent;
use crate::ExecProcess;
use crate::ExecutorEnvironment;
use crate::RemoteExecServerConnectArgs;
use crate::protocol::ExecParams;
use crate::protocol::ExecResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteResponse;
use crate::fs;
use crate::fs::ExecutorFileSystem;
use async_trait::async_trait;
use tokio::sync::broadcast;
#[derive(Debug)]
struct UnavailableExecProcess;
static UNAVAILABLE_EXEC_PROCESS: UnavailableExecProcess = UnavailableExecProcess;
#[derive(Clone, Default)]
pub struct Environment {
@@ -56,11 +72,57 @@ impl Environment {
self.remote_exec_server_client.as_ref()
}
pub fn process(&self) -> &(dyn ExecProcess + '_) {
self.remote_exec_server_client
.as_ref()
.map_or(&UNAVAILABLE_EXEC_PROCESS as &dyn ExecProcess, |client| client)
}
pub fn get_filesystem(&self) -> impl ExecutorFileSystem + use<> {
fs::LocalFileSystem
}
}
impl ExecutorEnvironment for Environment {
fn process(&self) -> &(dyn ExecProcess + '_) {
self.process()
}
}
#[async_trait]
impl ExecProcess for UnavailableExecProcess {
async fn start(&self, _params: ExecParams) -> Result<ExecResponse, ExecServerError> {
Err(unavailable_exec_process_error())
}
async fn read(&self, _params: ReadParams) -> Result<ReadResponse, ExecServerError> {
Err(unavailable_exec_process_error())
}
async fn write(
&self,
_process_id: &str,
_chunk: Vec<u8>,
) -> Result<WriteResponse, ExecServerError> {
Err(unavailable_exec_process_error())
}
async fn terminate(&self, _process_id: &str) -> Result<TerminateResponse, ExecServerError> {
Err(unavailable_exec_process_error())
}
fn subscribe_events(&self) -> broadcast::Receiver<ExecServerEvent> {
let (_tx, rx) = broadcast::channel(1);
rx
}
}
fn unavailable_exec_process_error() -> ExecServerError {
ExecServerError::Protocol(
"exec process capability is unavailable for a local default Environment".to_string(),
)
}
#[cfg(test)]
mod tests {
use super::Environment;

View File

@@ -11,6 +11,8 @@ pub use client::ExecServerClient;
pub use client::ExecServerError;
pub use client_api::ExecServerClientConnectOptions;
pub use client_api::ExecServerEvent;
pub use client_api::ExecProcess;
pub use client_api::ExecutorEnvironment;
pub use client_api::RemoteExecServerConnectArgs;
pub use codex_app_server_protocol::FsCopyParams;
pub use codex_app_server_protocol::FsCopyResponse;