exec-server: remove public process wrapper from client

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-03-17 19:49:42 +00:00
parent 3a8b3f1b07
commit ae3d5d1c61
4 changed files with 91 additions and 139 deletions

View File

@@ -262,8 +262,8 @@ The crate exports:
- `ExecServerClientConnectOptions`
- `RemoteExecServerConnectArgs`
- `ExecServerLaunchCommand`
- `ExecServerEvent`
- `ExecServerOutput`
- `ExecServerProcess`
- `SpawnedExecServer`
- `ExecServerError`
- `ExecServerTransport`
@@ -301,12 +301,12 @@ Timeout behavior:
- stdio and websocket clients both enforce an initialize-handshake timeout
- websocket clients also enforce a connect timeout before the handshake begins
Process output:
Events:
- `ExecServerProcess::output_receiver()` yields `ExecServerOutput`
- each output event includes both `stream` (`stdout` or `stderr`) and raw bytes
- `ExecServerProcess::has_exited()` is only updated from an actual exit
notification or transport shutdown, not from `terminate()` alone
- `ExecServerClient::event_receiver()` yields `ExecServerEvent`
- output events include both `stream` (`stdout` or `stderr`) and raw bytes
- process lifetime is tracked by server notifications such as
`command/exec/exited`, not by a client-side process registry
Spawning a local child process is deliberately separate:

View File

@@ -1,6 +1,8 @@
use std::collections::HashMap;
use std::sync::Arc;
#[cfg(test)]
use std::sync::Mutex as StdMutex;
#[cfg(test)]
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
@@ -106,42 +108,35 @@ pub struct ExecServerOutput {
pub chunk: Vec<u8>,
}
pub struct ExecServerProcess {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecServerEvent {
OutputDelta(ExecOutputDeltaNotification),
Exited(ExecExitedNotification),
}
#[cfg(test)]
struct ExecServerProcess {
session_id: String,
output_rx: broadcast::Receiver<ExecServerOutput>,
writer_tx: mpsc::Sender<Vec<u8>>,
status: Arc<RemoteProcessStatus>,
client: ExecServerClient,
}
impl std::fmt::Debug for ExecServerProcess {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ExecServerProcess")
.field("session_id", &self.session_id)
.field("has_exited", &self.has_exited())
.field("exit_code", &self.exit_code())
.finish()
}
}
#[cfg(test)]
impl ExecServerProcess {
pub fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
self.writer_tx.clone()
}
pub fn output_receiver(&self) -> broadcast::Receiver<ExecServerOutput> {
fn output_receiver(&self) -> broadcast::Receiver<ExecServerOutput> {
self.output_rx.resubscribe()
}
pub fn has_exited(&self) -> bool {
fn has_exited(&self) -> bool {
self.status.has_exited()
}
pub fn exit_code(&self) -> Option<i32> {
fn exit_code(&self) -> Option<i32> {
self.status.exit_code()
}
pub fn terminate(&self) {
fn terminate(&self) {
let client = self.client.clone();
let session_id = self.session_id.clone();
tokio::spawn(async move {
@@ -150,20 +145,13 @@ impl ExecServerProcess {
}
}
impl std::fmt::Debug for RemoteProcessStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RemoteProcessStatus")
.field("exited", &self.has_exited())
.field("exit_code", &self.exit_code())
.finish()
}
}
#[cfg(test)]
struct RemoteProcessStatus {
exited: AtomicBool,
exit_code: StdMutex<Option<i32>>,
}
#[cfg(test)]
impl RemoteProcessStatus {
fn new() -> Self {
Self {
@@ -188,11 +176,6 @@ impl RemoteProcessStatus {
}
}
struct RegisteredProcess {
output_tx: broadcast::Sender<ExecServerOutput>,
status: Arc<RemoteProcessStatus>,
}
enum PendingRequest {
Initialize(oneshot::Sender<Result<InitializeResponse, JSONRPCErrorError>>),
Exec(oneshot::Sender<Result<ExecResponse, JSONRPCErrorError>>),
@@ -272,7 +255,7 @@ enum ClientBackend {
struct Inner {
backend: ClientBackend,
pending: Mutex<HashMap<RequestId, PendingRequest>>,
processes: Mutex<HashMap<String, RegisteredProcess>>,
events_tx: broadcast::Sender<ExecServerEvent>,
next_request_id: AtomicI64,
reader_task: JoinHandle<()>,
server_task: Option<JoinHandle<()>>,
@@ -355,7 +338,7 @@ impl ExecServerClient {
Inner {
backend: ClientBackend::InProcess { write_tx },
pending: Mutex::new(HashMap::new()),
processes: Mutex::new(HashMap::new()),
events_tx: broadcast::channel(256).0,
next_request_id: AtomicI64::new(1),
reader_task,
server_task: Some(server_task),
@@ -448,7 +431,7 @@ impl ExecServerClient {
Inner {
backend: ClientBackend::JsonRpc { write_tx },
pending: Mutex::new(HashMap::new()),
processes: Mutex::new(HashMap::new()),
events_tx: broadcast::channel(256).0,
next_request_id: AtomicI64::new(1),
reader_task,
server_task: None,
@@ -460,7 +443,12 @@ impl ExecServerClient {
Ok(client)
}
pub async fn start_process(
pub fn event_receiver(&self) -> broadcast::Receiver<ExecServerEvent> {
self.inner.events_tx.subscribe()
}
#[cfg(test)]
async fn start_process(
&self,
params: ExecParams,
) -> Result<ExecServerProcess, ExecServerError> {
@@ -468,33 +456,27 @@ impl ExecServerClient {
let session_id = response.session_id;
let status = Arc::new(RemoteProcessStatus::new());
let (output_tx, output_rx) = broadcast::channel(256);
{
let mut processes = self.inner.processes.lock().await;
if processes.contains_key(&session_id) {
return Err(ExecServerError::Protocol(format!(
"session `{session_id}` already exists"
)));
}
processes.insert(
session_id.clone(),
RegisteredProcess {
output_tx,
status: Arc::clone(&status),
},
);
}
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
let client = self.clone();
let write_session_id = session_id.clone();
let mut events_rx = self.event_receiver();
let status_watcher = Arc::clone(&status);
let watch_session_id = session_id.clone();
tokio::spawn(async move {
while let Some(chunk) = writer_rx.recv().await {
let request = WriteParams {
session_id: write_session_id.clone(),
chunk: chunk.into(),
};
if client.write_process(request).await.is_err() {
break;
while let Ok(event) = events_rx.recv().await {
match event {
ExecServerEvent::OutputDelta(notification)
if notification.session_id == watch_session_id =>
{
let _ = output_tx.send(ExecServerOutput {
stream: notification.stream,
chunk: notification.chunk.into_inner(),
});
}
ExecServerEvent::Exited(notification)
if notification.session_id == watch_session_id =>
{
status_watcher.mark_exited(Some(notification.exit_code));
break;
}
ExecServerEvent::OutputDelta(_) | ExecServerEvent::Exited(_) => {}
}
}
});
@@ -502,7 +484,6 @@ impl ExecServerClient {
Ok(ExecServerProcess {
session_id,
output_rx,
writer_tx,
status,
client: self.clone(),
})
@@ -788,20 +769,10 @@ async fn handle_in_process_notification(
) {
match notification {
ExecServerServerNotification::OutputDelta(params) => {
let output = ExecServerOutput {
stream: params.stream,
chunk: params.chunk.into_inner(),
};
let processes = inner.processes.lock().await;
if let Some(process) = processes.get(&params.session_id) {
let _ = process.output_tx.send(output);
}
let _ = inner.events_tx.send(ExecServerEvent::OutputDelta(params));
}
ExecServerServerNotification::Exited(params) => {
let mut processes = inner.processes.lock().await;
if let Some(process) = processes.remove(&params.session_id) {
process.status.mark_exited(Some(params.exit_code));
}
let _ = inner.events_tx.send(ExecServerEvent::Exited(params));
}
}
}
@@ -843,22 +814,12 @@ async fn handle_server_notification(
EXEC_OUTPUT_DELTA_METHOD => {
let params: ExecOutputDeltaNotification =
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
let output = ExecServerOutput {
stream: params.stream,
chunk: params.chunk.into_inner(),
};
let processes = inner.processes.lock().await;
if let Some(process) = processes.get(&params.session_id) {
let _ = process.output_tx.send(output);
}
let _ = inner.events_tx.send(ExecServerEvent::OutputDelta(params));
}
EXEC_EXITED_METHOD => {
let params: ExecExitedNotification =
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
let mut processes = inner.processes.lock().await;
if let Some(process) = processes.remove(&params.session_id) {
process.status.mark_exited(Some(params.exit_code));
}
let _ = inner.events_tx.send(ExecServerEvent::Exited(params));
}
other => {
debug!("ignoring unknown exec-server notification: {other}");
@@ -882,17 +843,6 @@ async fn handle_transport_shutdown(inner: &Arc<Inner>) {
message: "exec-server transport closed".to_string(),
});
}
let processes = {
let mut processes = inner.processes.lock().await;
processes
.drain()
.map(|(_, process)| process)
.collect::<Vec<_>>()
};
for process in processes {
process.status.mark_exited(None);
}
}
#[cfg(test)]
@@ -1255,8 +1205,8 @@ mod tests {
}
assert!(
client.inner.processes.lock().await.is_empty(),
"failed requests should not leave registered process state behind"
client.inner.pending.lock().await.is_empty(),
"failed requests should not leave pending request state behind"
);
}

View File

@@ -7,8 +7,8 @@ mod server;
pub use client::ExecServerClient;
pub use client::ExecServerClientConnectOptions;
pub use client::ExecServerError;
pub use client::ExecServerEvent;
pub use client::ExecServerOutput;
pub use client::ExecServerProcess;
pub use client::RemoteExecServerConnectArgs;
pub use local::ExecServerLaunchCommand;
pub use local::SpawnedExecServer;

View File

@@ -13,6 +13,7 @@ use codex_exec_server::ExecOutputStream;
use codex_exec_server::ExecParams;
use codex_exec_server::ExecServerClient;
use codex_exec_server::ExecServerClientConnectOptions;
use codex_exec_server::ExecServerEvent;
use codex_exec_server::ExecServerLaunchCommand;
use codex_exec_server::InitializeParams;
use codex_exec_server::InitializeResponse;
@@ -93,9 +94,10 @@ async fn exec_server_client_streams_output_and_accepts_writes() -> anyhow::Resul
)
.await?;
let process = server
.client()
.start_process(ExecParams {
let client = server.client();
let mut events = client.event_receiver();
let response = client
.exec(ExecParams {
argv: vec![
"bash".to_string(),
"-lc".to_string(),
@@ -108,29 +110,26 @@ async fn exec_server_client_streams_output_and_accepts_writes() -> anyhow::Resul
arg0: None,
})
.await?;
let session_id = response.session_id;
let mut output = process.output_receiver();
let (stream, ready_output) = recv_until_contains(&mut output, "ready").await?;
let (stream, ready_output) = recv_until_contains(&mut events, &session_id, "ready").await?;
assert_eq!(stream, ExecOutputStream::Stdout);
assert!(
ready_output.contains("ready"),
"expected initial ready output"
);
process
.writer_sender()
.send(b"hello\n".to_vec())
.await
.expect("write should succeed");
client.write(&session_id, b"hello\n".to_vec()).await?;
let (stream, echoed_output) = recv_until_contains(&mut output, "echo:hello").await?;
let (stream, echoed_output) =
recv_until_contains(&mut events, &session_id, "echo:hello").await?;
assert_eq!(stream, ExecOutputStream::Stdout);
assert!(
echoed_output.contains("echo:hello"),
"expected echoed output"
);
process.terminate();
client.terminate(&session_id).await?;
Ok(())
}
@@ -160,8 +159,9 @@ async fn exec_server_client_connects_over_websocket() -> anyhow::Result<()> {
})
.await?;
let process = client
.start_process(ExecParams {
let mut events = client.event_receiver();
let response = client
.exec(ExecParams {
argv: vec![
"bash".to_string(),
"-lc".to_string(),
@@ -174,29 +174,26 @@ async fn exec_server_client_connects_over_websocket() -> anyhow::Result<()> {
arg0: None,
})
.await?;
let session_id = response.session_id;
let mut output = process.output_receiver();
let (stream, ready_output) = recv_until_contains(&mut output, "ready").await?;
let (stream, ready_output) = recv_until_contains(&mut events, &session_id, "ready").await?;
assert_eq!(stream, ExecOutputStream::Stdout);
assert!(
ready_output.contains("ready"),
"expected initial ready output"
);
process
.writer_sender()
.send(b"hello\n".to_vec())
.await
.expect("write should succeed");
client.write(&session_id, b"hello\n".to_vec()).await?;
let (stream, echoed_output) = recv_until_contains(&mut output, "echo:hello").await?;
let (stream, echoed_output) =
recv_until_contains(&mut events, &session_id, "echo:hello").await?;
assert_eq!(stream, ExecOutputStream::Stdout);
assert!(
echoed_output.contains("echo:hello"),
"expected echoed output"
);
process.terminate();
client.terminate(&session_id).await?;
child.start_kill()?;
Ok(())
}
@@ -237,8 +234,8 @@ async fn websocket_disconnect_terminates_processes_for_that_connection() -> anyh
})
.await?;
let _process = client
.start_process(ExecParams {
let _response = client
.exec(ExecParams {
argv: vec![
"bash".to_string(),
"-lc".to_string(),
@@ -277,17 +274,22 @@ where
}
async fn recv_until_contains(
output: &mut broadcast::Receiver<codex_exec_server::ExecServerOutput>,
events: &mut broadcast::Receiver<ExecServerEvent>,
session_id: &str,
needle: &str,
) -> anyhow::Result<(ExecOutputStream, String)> {
let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
let mut collected = String::new();
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
let output_event = timeout(remaining, output.recv()).await??;
collected.push_str(&String::from_utf8_lossy(&output_event.chunk));
if collected.contains(needle) {
return Ok((output_event.stream, collected));
let event = timeout(remaining, events.recv()).await??;
if let ExecServerEvent::OutputDelta(output_event) = event
&& output_event.session_id == session_id
{
collected.push_str(&String::from_utf8_lossy(&output_event.chunk.into_inner()));
if collected.contains(needle) {
return Ok((output_event.stream, collected));
}
}
}
}