diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index a7680e73e8..6a5bbbf7a2 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use std::time::Duration; +use async_trait::async_trait; use codex_app_server_protocol::FsCopyParams; use codex_app_server_protocol::FsCopyResponse; use codex_app_server_protocol::FsCreateDirectoryParams; @@ -26,6 +27,7 @@ use tracing::warn; use crate::client_api::ExecServerClientConnectOptions; use crate::client_api::ExecServerEvent; +use crate::client_api::ExecProcess; use crate::client_api::RemoteExecServerConnectArgs; use crate::connection::JsonRpcConnection; use crate::protocol::EXEC_EXITED_METHOD; @@ -504,6 +506,33 @@ impl ExecServerClient { } } +#[async_trait] +impl ExecProcess for ExecServerClient { + async fn start(&self, params: ExecParams) -> Result { + self.exec(params).await + } + + async fn read(&self, params: ReadParams) -> Result { + self.read(params).await + } + + async fn write( + &self, + process_id: &str, + chunk: Vec, + ) -> Result { + self.write(process_id, chunk).await + } + + async fn terminate(&self, process_id: &str) -> Result { + self.terminate(process_id).await + } + + fn subscribe_events(&self) -> broadcast::Receiver { + self.event_receiver() + } +} + impl From for ExecServerError { fn from(value: RpcCallError) -> Self { match value { diff --git a/codex-rs/exec-server/src/client_api.rs b/codex-rs/exec-server/src/client_api.rs index 962d3ba364..852ade566a 100644 --- a/codex-rs/exec-server/src/client_api.rs +++ b/codex-rs/exec-server/src/client_api.rs @@ -1,7 +1,17 @@ use std::time::Duration; +use async_trait::async_trait; +use tokio::sync::broadcast; + +use crate::client::ExecServerError; use crate::protocol::ExecExitedNotification; use crate::protocol::ExecOutputDeltaNotification; +use crate::protocol::ExecParams; +use crate::protocol::ExecResponse; +use crate::protocol::ReadParams; +use crate::protocol::ReadResponse; +use crate::protocol::TerminateResponse; +use crate::protocol::WriteResponse; /// Connection options for any exec-server client transport. #[derive(Debug, Clone, PartialEq, Eq)] @@ -25,3 +35,26 @@ pub enum ExecServerEvent { OutputDelta(ExecOutputDeltaNotification), Exited(ExecExitedNotification), } + +/// Process lifecycle capability for an execution environment. +#[async_trait] +pub trait ExecProcess: Send + Sync { + async fn start(&self, params: ExecParams) -> Result; + + async fn read(&self, params: ReadParams) -> Result; + + async fn write( + &self, + process_id: &str, + chunk: Vec, + ) -> Result; + + async fn terminate(&self, process_id: &str) -> Result; + + fn subscribe_events(&self) -> broadcast::Receiver; +} + +/// Capability bundle exposed by an execution environment. +pub trait ExecutorEnvironment: Send + Sync { + fn process(&self) -> &(dyn ExecProcess + '_); +} diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index c8635ec03a..a91c355fff 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -1,8 +1,24 @@ use crate::ExecServerClient; use crate::ExecServerError; +use crate::ExecServerEvent; +use crate::ExecProcess; +use crate::ExecutorEnvironment; use crate::RemoteExecServerConnectArgs; +use crate::protocol::ExecParams; +use crate::protocol::ExecResponse; +use crate::protocol::ReadParams; +use crate::protocol::ReadResponse; +use crate::protocol::TerminateResponse; +use crate::protocol::WriteResponse; use crate::fs; use crate::fs::ExecutorFileSystem; +use async_trait::async_trait; +use tokio::sync::broadcast; + +#[derive(Debug)] +struct UnavailableExecProcess; + +static UNAVAILABLE_EXEC_PROCESS: UnavailableExecProcess = UnavailableExecProcess; #[derive(Clone, Default)] pub struct Environment { @@ -56,11 +72,57 @@ impl Environment { self.remote_exec_server_client.as_ref() } + pub fn process(&self) -> &(dyn ExecProcess + '_) { + self.remote_exec_server_client + .as_ref() + .map_or(&UNAVAILABLE_EXEC_PROCESS as &dyn ExecProcess, |client| client) + } + pub fn get_filesystem(&self) -> impl ExecutorFileSystem + use<> { fs::LocalFileSystem } } +impl ExecutorEnvironment for Environment { + fn process(&self) -> &(dyn ExecProcess + '_) { + self.process() + } +} + +#[async_trait] +impl ExecProcess for UnavailableExecProcess { + async fn start(&self, _params: ExecParams) -> Result { + Err(unavailable_exec_process_error()) + } + + async fn read(&self, _params: ReadParams) -> Result { + Err(unavailable_exec_process_error()) + } + + async fn write( + &self, + _process_id: &str, + _chunk: Vec, + ) -> Result { + Err(unavailable_exec_process_error()) + } + + async fn terminate(&self, _process_id: &str) -> Result { + Err(unavailable_exec_process_error()) + } + + fn subscribe_events(&self) -> broadcast::Receiver { + let (_tx, rx) = broadcast::channel(1); + rx + } +} + +fn unavailable_exec_process_error() -> ExecServerError { + ExecServerError::Protocol( + "exec process capability is unavailable for a local default Environment".to_string(), + ) +} + #[cfg(test)] mod tests { use super::Environment; diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 3c50d0ec59..19f0642ad0 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -11,6 +11,8 @@ pub use client::ExecServerClient; pub use client::ExecServerError; pub use client_api::ExecServerClientConnectOptions; pub use client_api::ExecServerEvent; +pub use client_api::ExecProcess; +pub use client_api::ExecutorEnvironment; pub use client_api::RemoteExecServerConnectArgs; pub use codex_app_server_protocol::FsCopyParams; pub use codex_app_server_protocol::FsCopyResponse;