Use a direct local ExecProcess implementation

Replace the in-process client wrapper with a local ExecProcess that owns
ExecServerHandler directly and forwards process notifications to the
trait event stream. This keeps the default Environment process path
entirely local instead of routing through ExecServerClient.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-03-19 14:36:47 -07:00
parent 2d52d35c09
commit 7bbc0a4e15

View File

@@ -4,22 +4,57 @@ use crate::ExecServerEvent;
use crate::ExecProcess;
use crate::ExecutorEnvironment;
use crate::RemoteExecServerConnectArgs;
use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
use crate::protocol::ExecParams;
use crate::protocol::ExecResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
use crate::protocol::WriteResponse;
use crate::rpc::RpcNotificationSender;
use crate::rpc::RpcServerOutboundMessage;
use crate::server::ExecServerHandler;
use crate::fs;
use crate::fs::ExecutorFileSystem;
use async_trait::async_trait;
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::sync::OnceCell;
#[derive(Clone, Default)]
#[derive(Clone)]
struct LocalExecProcess {
client: Arc<OnceCell<ExecServerClient>>,
handler: Arc<ExecServerHandler>,
events_tx: broadcast::Sender<ExecServerEvent>,
outgoing_rx: Arc<Mutex<Option<mpsc::Receiver<RpcServerOutboundMessage>>>>,
reader_task_started: Arc<OnceCell<()>>,
}
impl Default for LocalExecProcess {
fn default() -> Self {
let (outgoing_tx, outgoing_rx) = mpsc::channel(256);
let handler = Arc::new(ExecServerHandler::new(RpcNotificationSender::new(outgoing_tx)));
let (_initialize_response) = handler
.initialize()
.expect("new local exec process should initialize once");
handler
.initialized()
.expect("new local exec process should accept initialized notification");
Self {
handler,
events_tx: broadcast::channel(256).0,
outgoing_rx: Arc::new(Mutex::new(Some(outgoing_rx))),
reader_task_started: Arc::new(OnceCell::new()),
}
}
}
#[derive(Clone)]
@@ -106,11 +141,13 @@ impl ExecutorEnvironment for Environment {
#[async_trait]
impl ExecProcess for LocalExecProcess {
async fn start(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError> {
self.client().await?.start(params).await
self.ensure_reader_task().await;
self.handler.exec(params).await.map_err(local_server_error)
}
async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError> {
self.client().await?.read(params).await
self.ensure_reader_task().await;
self.handler.exec_read(params).await.map_err(local_server_error)
}
async fn write(
@@ -118,30 +155,77 @@ impl ExecProcess for LocalExecProcess {
process_id: &str,
chunk: Vec<u8>,
) -> Result<WriteResponse, ExecServerError> {
self.client().await?.write(process_id, chunk).await
self.ensure_reader_task().await;
self.handler
.exec_write(WriteParams {
process_id: process_id.to_string(),
chunk: chunk.into(),
})
.await
.map_err(local_server_error)
}
async fn terminate(&self, process_id: &str) -> Result<TerminateResponse, ExecServerError> {
self.client().await?.terminate(process_id).await
self.ensure_reader_task().await;
self.handler
.terminate(TerminateParams {
process_id: process_id.to_string(),
})
.await
.map_err(local_server_error)
}
fn subscribe_events(&self) -> broadcast::Receiver<ExecServerEvent> {
if let Some(client) = self.client.get() {
client.event_receiver()
} else {
let (_tx, rx) = broadcast::channel(1);
rx
}
self.events_tx.subscribe()
}
}
impl LocalExecProcess {
async fn client(&self) -> Result<&ExecServerClient, ExecServerError> {
self.client
.get_or_try_init(|| async {
ExecServerClient::connect_in_process(Default::default()).await
async fn ensure_reader_task(&self) {
let _ = self
.reader_task_started
.get_or_init(|| async {
let mut outgoing_rx = self.outgoing_rx.lock().await;
let Some(mut outgoing_rx) = outgoing_rx.take() else {
return;
};
let events_tx = self.events_tx.clone();
tokio::spawn(async move {
while let Some(message) = outgoing_rx.recv().await {
if let RpcServerOutboundMessage::Notification(notification) = message {
match notification.method.as_str() {
EXEC_OUTPUT_DELTA_METHOD => {
if let Ok(params) = serde_json::from_value::<
ExecOutputDeltaNotification,
>(
notification.params.unwrap_or(Value::Null)
) {
let _ = events_tx.send(ExecServerEvent::OutputDelta(params));
}
}
EXEC_EXITED_METHOD => {
if let Ok(params) =
serde_json::from_value::<ExecExitedNotification>(
notification.params.unwrap_or(Value::Null),
)
{
let _ = events_tx.send(ExecServerEvent::Exited(params));
}
}
_ => {}
}
}
}
});
})
.await
.await;
}
}
fn local_server_error(error: codex_app_server_protocol::JSONRPCErrorError) -> ExecServerError {
ExecServerError::Server {
code: error.code,
message: error.message,
}
}