exec-server: add in-process client mode

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-03-17 20:10:22 +00:00
parent adfc5aa0c5
commit 9c53cce1d1
9 changed files with 1023 additions and 347 deletions

View File

@@ -70,7 +70,7 @@ Start simple and explicit:
Server notifications:
- `process/outputDelta`
- `process/output`
- `process/exited`
- optionally `process/started`
- optionally `process/failed`
@@ -132,7 +132,7 @@ growth.
Suggested behavior:
- stream every output chunk live via `process/outputDelta`
- stream every output chunk live via `process/output`
- retain capped output per process in memory
- keep stdout and stderr separately for pipe-backed processes
- for PTY-backed processes, treat retained output as a single terminal stream
@@ -234,7 +234,7 @@ If unified exec is later backed by exec-server, the `core` client wrapper should
keep owning the translation into the existing event model:
- `process/start` success -> `ExecCommandBegin`
- `process/outputDelta` -> `ExecCommandOutputDelta`
- `process/output` -> `ExecCommandOutputDelta`
- local `process/write` call -> `TerminalInteraction`
- `process/exited` plus retained transcript -> `ExecCommandEnd`

View File

@@ -8,6 +8,7 @@ It currently provides:
- a standalone binary: `codex-exec-server`
- a transport-agnostic server runtime with stdio and websocket entrypoints
- a Rust client: `ExecServerClient`
- a direct in-process client mode: `ExecServerClient::connect_in_process`
- a separate local launch helper: `spawn_local_exec_server`
- a small protocol module with shared request/response types
@@ -19,6 +20,8 @@ The internal shape is intentionally closer to `app-server` than the first cut:
- transport adapters are separate from the per-connection request processor
- JSON-RPC route matching is separate from the stateful exec handler
- the client only speaks the protocol; it does not spawn a server subprocess
- the client can also bypass the JSON-RPC transport/routing layer in local
in-process mode and call the typed handler directly
- local child-process launch is handled by a separate helper/factory layer
That split is meant to leave reusable seams if exec-server and app-server later
@@ -59,10 +62,10 @@ Each connection follows this sequence:
1. Send `initialize`.
2. Wait for the `initialize` response.
3. Send `initialized`.
4. Start and manage processes with `command/exec`, `command/exec/write`, and
`command/exec/terminate`.
5. Read streaming notifications from `command/exec/outputDelta` and
`command/exec/exited`.
4. Start and manage processes with `process/start`, `process/read`,
`process/write`, and `process/terminate`.
5. Read streaming notifications from `process/output` and
`process/exited`.
If the client sends exec methods before completing the `initialize` /
`initialized` handshake, the server rejects them.
@@ -100,7 +103,7 @@ Handshake acknowledgement notification sent by the client after a successful
Params are currently ignored. Sending any other client notification method is a
protocol error.
### `command/exec`
### `process/start`
Starts a new managed process.
@@ -121,7 +124,6 @@ Request params:
Field definitions:
- `processId`: caller-chosen stable id for this process within the connection.
- `argv`: command vector. It must be non-empty.
- `cwd`: absolute working directory used for the child process.
- `env`: environment variables passed to the child process.
@@ -139,13 +141,13 @@ Response:
Behavior notes:
- Reusing an existing `processId` is rejected.
- PTY-backed processes accept later writes through `command/exec/write`.
- `processId` is chosen by the client and must be unique for the connection.
- PTY-backed processes accept later writes through `process/write`.
- Pipe-backed processes are launched with stdin closed and reject writes.
- Output is streamed asynchronously via `command/exec/outputDelta`.
- Exit is reported asynchronously via `command/exec/exited`.
- Output is streamed asynchronously via `process/output`.
- Exit is reported asynchronously via `process/exited`.
### `command/exec/write`
### `process/write`
Writes raw bytes to a running PTY-backed process stdin.
@@ -173,7 +175,48 @@ Behavior notes:
- Writes to an unknown `processId` are rejected.
- Writes to a non-PTY process are rejected because stdin is already closed.
### `command/exec/terminate`
### `process/read`
Reads retained output from a managed process by sequence number.
Request params:
```json
{
"processId": "proc-1",
"afterSeq": 0,
"maxBytes": 65536,
"waitMs": 250
}
```
Response:
```json
{
"chunks": [
{
"seq": 1,
"stream": "pty",
"chunk": "aGVsbG8K"
}
],
"nextSeq": 2,
"exited": false,
"exitCode": null
}
```
Behavior notes:
- Output is retained in bounded server memory so callers can poll without
relying only on notifications.
- `afterSeq` is exclusive: `0` reads from the beginning of the retained buffer.
- `waitMs` waits briefly for new output or exit if nothing is currently
available.
- Once retained output exceeds the per-process cap, oldest chunks are dropped.
### `process/terminate`
Terminates a running managed process.
@@ -203,7 +246,7 @@ If the process is already unknown or already removed, the server responds with:
## Notifications
### `command/exec/outputDelta`
### `process/output`
Streaming output chunk from a running process.
@@ -220,10 +263,10 @@ Params:
Fields:
- `processId`: process identifier
- `stream`: `"stdout"` or `"stderr"`
- `stream`: `"stdout"`, `"stderr"`, or `"pty"` for PTY-backed processes
- `chunk`: base64-encoded output bytes
### `command/exec/exited`
### `process/exited`
Final process exit notification.
@@ -261,8 +304,8 @@ The crate exports:
- `ExecServerClientConnectOptions`
- `RemoteExecServerConnectArgs`
- `ExecServerLaunchCommand`
- `ExecServerEvent`
- `ExecServerOutput`
- `ExecServerProcess`
- `SpawnedExecServer`
- `ExecServerError`
- `ExecServerTransport`
@@ -292,18 +335,21 @@ Connect the client to an existing server transport:
- `ExecServerClient::connect_stdio(...)`
- `ExecServerClient::connect_websocket(...)`
- `ExecServerClient::connect_in_process(...)` for a local no-transport mode
backed directly by the typed handler
Timeout behavior:
- stdio and websocket clients both enforce an initialize-handshake timeout
- websocket clients also enforce a connect timeout before the handshake begins
Process output:
Events:
- `ExecServerProcess::output_receiver()` yields `ExecServerOutput`
- each output event includes both `stream` (`stdout` or `stderr`) and raw bytes
- `ExecServerProcess::has_exited()` is only updated from an actual exit
notification or transport shutdown, not from `terminate()` alone
- `ExecServerClient::event_receiver()` yields `ExecServerEvent`
- output events include both `stream` (`stdout`, `stderr`, or `pty`) and raw
bytes
- process lifetime is tracked by server notifications such as
`process/exited`, not by a client-side process registry
Spawning a local child process is deliberately separate:
@@ -322,23 +368,23 @@ Initialize:
Start a process:
```json
{"id":2,"method":"command/exec","params":{"processId":"proc-1","argv":["bash","-lc","printf 'ready\\n'; while IFS= read -r line; do printf 'echo:%s\\n' \"$line\"; done"],"cwd":"/tmp","env":{"PATH":"/usr/bin:/bin"},"tty":true,"arg0":null}}
{"id":2,"method":"process/start","params":{"processId":"proc-1","argv":["bash","-lc","printf 'ready\\n'; while IFS= read -r line; do printf 'echo:%s\\n' \"$line\"; done"],"cwd":"/tmp","env":{"PATH":"/usr/bin:/bin"},"tty":true,"arg0":null}}
{"id":2,"result":{"processId":"proc-1"}}
{"method":"command/exec/outputDelta","params":{"processId":"proc-1","stream":"stdout","chunk":"cmVhZHkK"}}
{"method":"process/output","params":{"processId":"proc-1","stream":"pty","chunk":"cmVhZHkK"}}
```
Write to the process:
```json
{"id":3,"method":"command/exec/write","params":{"processId":"proc-1","chunk":"aGVsbG8K"}}
{"id":3,"method":"process/write","params":{"processId":"proc-1","chunk":"aGVsbG8K"}}
{"id":3,"result":{"accepted":true}}
{"method":"command/exec/outputDelta","params":{"processId":"proc-1","stream":"stdout","chunk":"ZWNobzpoZWxsbwo="}}
{"method":"process/output","params":{"processId":"proc-1","stream":"pty","chunk":"ZWNobzpoZWxsbwo="}}
```
Terminate it:
```json
{"id":4,"method":"command/exec/terminate","params":{"processId":"proc-1"}}
{"id":4,"method":"process/terminate","params":{"processId":"proc-1"}}
{"id":4,"result":{"running":true}}
{"method":"command/exec/exited","params":{"processId":"proc-1","exitCode":0}}
{"method":"process/exited","params":{"processId":"proc-1","exitCode":0}}
```

File diff suppressed because it is too large Load Diff

View File

@@ -7,8 +7,8 @@ mod server;
pub use client::ExecServerClient;
pub use client::ExecServerClientConnectOptions;
pub use client::ExecServerError;
pub use client::ExecServerEvent;
pub use client::ExecServerOutput;
pub use client::ExecServerProcess;
pub use client::RemoteExecServerConnectArgs;
pub use local::ExecServerLaunchCommand;
pub use local::SpawnedExecServer;

View File

@@ -7,11 +7,12 @@ use serde::Serialize;
pub const INITIALIZE_METHOD: &str = "initialize";
pub const INITIALIZED_METHOD: &str = "initialized";
pub const EXEC_METHOD: &str = "command/exec";
pub const EXEC_WRITE_METHOD: &str = "command/exec/write";
pub const EXEC_TERMINATE_METHOD: &str = "command/exec/terminate";
pub const EXEC_OUTPUT_DELTA_METHOD: &str = "command/exec/outputDelta";
pub const EXEC_EXITED_METHOD: &str = "command/exec/exited";
pub const EXEC_METHOD: &str = "process/start";
pub const EXEC_READ_METHOD: &str = "process/read";
pub const EXEC_WRITE_METHOD: &str = "process/write";
pub const EXEC_TERMINATE_METHOD: &str = "process/terminate";
pub const EXEC_OUTPUT_DELTA_METHOD: &str = "process/output";
pub const EXEC_EXITED_METHOD: &str = "process/exited";
pub const PROTOCOL_VERSION: &str = "exec-server.v0";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -45,8 +46,8 @@ pub struct InitializeResponse {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExecParams {
/// Caller-chosen stable process identifier scoped to a single exec-server
/// connection. This is a protocol handle, not an OS pid.
/// Client-chosen logical process handle scoped to this connection/session.
/// This is a protocol key, not an OS pid.
pub process_id: String,
pub argv: Vec<String>,
pub cwd: PathBuf,
@@ -61,6 +62,32 @@ pub struct ExecResponse {
pub process_id: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReadParams {
pub process_id: String,
pub after_seq: Option<u64>,
pub max_bytes: Option<usize>,
pub wait_ms: Option<u64>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProcessOutputChunk {
pub seq: u64,
pub stream: ExecOutputStream,
pub chunk: ByteChunk,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReadResponse {
pub chunks: Vec<ProcessOutputChunk>,
pub next_seq: u64,
pub exited: bool,
pub exit_code: Option<i32>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WriteParams {
@@ -91,6 +118,7 @@ pub struct TerminateResponse {
pub enum ExecOutputStream {
Stdout,
Stderr,
Pty,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]

View File

@@ -3,6 +3,13 @@ mod processor;
mod routing;
mod transport;
pub(crate) use handler::ExecServerHandler;
pub(crate) use routing::ExecServerClientNotification;
pub(crate) use routing::ExecServerInboundMessage;
pub(crate) use routing::ExecServerOutboundMessage;
pub(crate) use routing::ExecServerRequest;
pub(crate) use routing::ExecServerResponseMessage;
pub(crate) use routing::ExecServerServerNotification;
pub use transport::ExecServerTransport;
pub use transport::ExecServerTransportParseError;

View File

@@ -1,5 +1,7 @@
use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use codex_utils_pty::ExecCommandSession;
use codex_utils_pty::TerminalSize;
@@ -12,6 +14,8 @@ use crate::protocol::ExecOutputStream;
use crate::protocol::ExecResponse;
use crate::protocol::InitializeResponse;
use crate::protocol::PROTOCOL_VERSION;
use crate::protocol::ProcessOutputChunk;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteResponse;
use crate::server::routing::ExecServerClientNotification;
@@ -24,15 +28,28 @@ use crate::server::routing::internal_error;
use crate::server::routing::invalid_params;
use crate::server::routing::invalid_request;
const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024;
#[derive(Clone)]
struct RetainedOutputChunk {
seq: u64,
stream: ExecOutputStream,
chunk: Vec<u8>,
}
struct RunningProcess {
session: ExecCommandSession,
tty: bool,
output: VecDeque<RetainedOutputChunk>,
retained_bytes: usize,
next_seq: u64,
exit_code: Option<i32>,
}
pub(crate) struct ExecServerHandler {
outbound_tx: mpsc::Sender<ExecServerOutboundMessage>,
// Keyed by the protocol `processId`, which is caller-assigned and scoped to
// a single client connection rather than an OS pid.
// Keyed by client-chosen logical `processId` scoped to this connection.
// This is a protocol handle, not an OS pid.
processes: Arc<Mutex<HashMap<String, RunningProcess>>>,
initialize_requested: bool,
initialized: bool,
@@ -94,6 +111,19 @@ impl ExecServerHandler {
)
.await;
}
ExecServerRequest::Read { request_id, params } => {
self.send_request_result(
request_id,
match self.require_initialized() {
Ok(()) => self
.handle_read_request(params)
.await
.map(ExecServerResponseMessage::Read),
Err(err) => Err(err),
},
)
.await;
}
ExecServerRequest::Write { request_id, params } => {
self.send_request_result(
request_id,
@@ -177,14 +207,6 @@ impl ExecServerHandler {
.ok_or_else(|| invalid_params("argv must not be empty".to_string()))?;
let process_id = params.process_id.clone();
{
let process_map = self.processes.lock().await;
if process_map.contains_key(&process_id) {
return Err(invalid_request(format!(
"process {process_id} already exists"
)));
}
}
let spawned = if params.tty {
codex_utils_pty::spawn_pty_process(
@@ -221,21 +243,35 @@ impl ExecServerHandler {
RunningProcess {
session: spawned.session,
tty: params.tty,
output: std::collections::VecDeque::new(),
retained_bytes: 0,
next_seq: 1,
exit_code: None,
},
);
}
tokio::spawn(stream_output(
process_id.clone(),
ExecOutputStream::Stdout,
if params.tty {
ExecOutputStream::Pty
} else {
ExecOutputStream::Stdout
},
spawned.stdout_rx,
self.outbound_tx.clone(),
Arc::clone(&self.processes),
));
tokio::spawn(stream_output(
process_id.clone(),
ExecOutputStream::Stderr,
if params.tty {
ExecOutputStream::Pty
} else {
ExecOutputStream::Stderr
},
spawned.stderr_rx,
self.outbound_tx.clone(),
Arc::clone(&self.processes),
));
tokio::spawn(watch_exit(
process_id.clone(),
@@ -247,6 +283,59 @@ impl ExecServerHandler {
Ok(ExecResponse { process_id })
}
async fn handle_read_request(
&self,
params: crate::protocol::ReadParams,
) -> Result<ReadResponse, codex_app_server_protocol::JSONRPCErrorError> {
let after_seq = params.after_seq.unwrap_or(0);
let max_bytes = params.max_bytes.unwrap_or(usize::MAX);
let wait = Duration::from_millis(params.wait_ms.unwrap_or(0));
let deadline = tokio::time::Instant::now() + wait;
loop {
let response = {
let process_map = self.processes.lock().await;
let process = process_map.get(&params.process_id).ok_or_else(|| {
invalid_request(format!("unknown process id {}", params.process_id))
})?;
let mut chunks = Vec::new();
let mut total_bytes = 0;
for retained in process.output.iter().filter(|chunk| chunk.seq > after_seq) {
let chunk_len = retained.chunk.len();
if !chunks.is_empty() && total_bytes + chunk_len > max_bytes {
break;
}
total_bytes += chunk_len;
chunks.push(ProcessOutputChunk {
seq: retained.seq,
stream: retained.stream,
chunk: retained.chunk.clone().into(),
});
if total_bytes >= max_bytes {
break;
}
}
ReadResponse {
chunks,
next_seq: process.next_seq,
exited: process.exit_code.is_some(),
exit_code: process.exit_code,
}
};
if !response.chunks.is_empty()
|| response.exited
|| tokio::time::Instant::now() >= deadline
{
return Ok(response);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
async fn handle_write_request(
&self,
params: crate::protocol::WriteParams,
@@ -315,15 +404,38 @@ async fn stream_output(
stream: ExecOutputStream,
mut receiver: tokio::sync::mpsc::Receiver<Vec<u8>>,
outbound_tx: mpsc::Sender<ExecServerOutboundMessage>,
processes: Arc<Mutex<HashMap<String, RunningProcess>>>,
) {
while let Some(chunk) = receiver.recv().await {
let notification = {
let mut processes = processes.lock().await;
let Some(process) = processes.get_mut(&process_id) else {
break;
};
let seq = process.next_seq;
process.next_seq += 1;
process.retained_bytes += chunk.len();
process.output.push_back(RetainedOutputChunk {
seq,
stream,
chunk: chunk.clone(),
});
while process.retained_bytes > RETAINED_OUTPUT_BYTES_PER_PROCESS {
let Some(evicted) = process.output.pop_front() else {
break;
};
process.retained_bytes = process.retained_bytes.saturating_sub(evicted.chunk.len());
}
ExecOutputDeltaNotification {
process_id: process_id.clone(),
stream,
chunk: chunk.into(),
}
};
if outbound_tx
.send(ExecServerOutboundMessage::Notification(
ExecServerServerNotification::OutputDelta(ExecOutputDeltaNotification {
process_id: process_id.clone(),
stream,
chunk: chunk.into(),
}),
ExecServerServerNotification::OutputDelta(notification),
))
.await
.is_err()
@@ -342,7 +454,9 @@ async fn watch_exit(
let exit_code = exit_rx.await.unwrap_or(-1);
{
let mut processes = processes.lock().await;
processes.remove(&process_id);
if let Some(process) = processes.get_mut(&process_id) {
process.exit_code = Some(exit_code);
}
}
let _ = outbound_tx
.send(ExecServerOutboundMessage::Notification(
@@ -574,18 +688,9 @@ mod tests {
}
#[tokio::test]
async fn duplicate_process_ids_are_rejected_per_connection() {
async fn exec_echoes_client_process_ids() {
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(4);
let mut handler = ExecServerHandler::new(outgoing_tx);
let marker_path = std::env::temp_dir().join(format!(
"codex-exec-server-duplicate-{}-{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("system clock before unix epoch")
.as_nanos()
));
let _ = std::fs::remove_file(&marker_path);
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
@@ -631,48 +736,41 @@ mod tests {
{
panic!("first exec should succeed: {err}");
}
assert_eq!(
recv_outbound(&mut outgoing_rx).await,
ExecServerOutboundMessage::Response {
request_id: RequestId::Integer(2),
response: ExecServerResponseMessage::Exec(crate::protocol::ExecResponse {
process_id: "proc-1".to_string(),
}),
}
);
let ExecServerOutboundMessage::Response {
request_id,
response: ExecServerResponseMessage::Exec(first_exec),
} = recv_outbound(&mut outgoing_rx).await
else {
panic!("expected first exec response");
};
assert_eq!(request_id, RequestId::Integer(2));
assert_eq!(first_exec.process_id, "proc-1");
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec {
request_id: RequestId::Integer(3),
params: crate::protocol::ExecParams {
argv: vec![
"bash".to_string(),
"-lc".to_string(),
format!("printf duplicate > {}", marker_path.display()),
],
process_id: "proc-2".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
..params
},
}))
.await
{
panic!("duplicate exec should not fail the handler: {err}");
panic!("second exec should succeed: {err}");
}
let ExecServerOutboundMessage::Error { request_id, error } =
recv_outbound(&mut outgoing_rx).await
let ExecServerOutboundMessage::Response {
request_id,
response: ExecServerResponseMessage::Exec(second_exec),
} = recv_outbound(&mut outgoing_rx).await
else {
panic!("expected duplicate-process error");
panic!("expected second exec response");
};
assert_eq!(request_id, RequestId::Integer(3));
assert_eq!(error.code, -32600);
assert_eq!(error.message, "process proc-1 already exists");
assert!(
!marker_path.exists(),
"duplicate process ids must be rejected before spawning the command"
);
assert_eq!(second_exec.process_id, "proc-2");
handler.shutdown().await;
let _ = std::fs::remove_file(&marker_path);
}
#[tokio::test]
@@ -707,7 +805,7 @@ mod tests {
.handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec {
request_id: RequestId::Integer(2),
params: crate::protocol::ExecParams {
process_id: "proc-2".to_string(),
process_id: "proc-1".to_string(),
argv: vec![
"bash".to_string(),
"-lc".to_string(),
@@ -723,14 +821,20 @@ mod tests {
{
panic!("exec should succeed: {err}");
}
let _ = recv_outbound(&mut outgoing_rx).await;
let ExecServerOutboundMessage::Response {
response: ExecServerResponseMessage::Exec(exec_response),
..
} = recv_outbound(&mut outgoing_rx).await
else {
panic!("expected exec response");
};
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Write {
request_id: RequestId::Integer(3),
params: WriteParams {
process_id: "proc-2".to_string(),
process_id: exec_response.process_id,
chunk: b"hello\n".to_vec().into(),
},
},
@@ -747,7 +851,7 @@ mod tests {
};
assert_eq!(request_id, RequestId::Integer(3));
assert_eq!(error.code, -32600);
assert_eq!(error.message, "stdin is closed for process proc-2");
assert_eq!(error.message, "stdin is closed for process proc-1");
handler.shutdown().await;
}
@@ -859,7 +963,7 @@ mod tests {
}
#[tokio::test]
async fn terminate_keeps_process_ids_reserved_until_exit_cleanup() {
async fn terminate_keeps_process_ids_reserved() {
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(2);
let mut handler = ExecServerHandler::new(outgoing_tx);
@@ -902,6 +1006,10 @@ mod tests {
super::RunningProcess {
session: spawned.session,
tty: false,
output: std::collections::VecDeque::new(),
retained_bytes: 0,
next_seq: 1,
exit_code: None,
},
);
}

View File

@@ -10,6 +10,7 @@ use serde::de::DeserializeOwned;
use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_METHOD;
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
use crate::protocol::EXEC_READ_METHOD;
use crate::protocol::EXEC_TERMINATE_METHOD;
use crate::protocol::EXEC_WRITE_METHOD;
use crate::protocol::ExecExitedNotification;
@@ -20,6 +21,8 @@ use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeParams;
use crate::protocol::InitializeResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
@@ -41,6 +44,10 @@ pub(crate) enum ExecServerRequest {
request_id: RequestId,
params: ExecParams,
},
Read {
request_id: RequestId,
params: ReadParams,
},
Write {
request_id: RequestId,
params: WriteParams,
@@ -73,6 +80,7 @@ pub(crate) enum ExecServerOutboundMessage {
pub(crate) enum ExecServerResponseMessage {
Initialize(InitializeResponse),
Exec(ExecResponse),
Read(ReadResponse),
Write(WriteResponse),
Terminate(TerminateResponse),
}
@@ -161,6 +169,9 @@ fn route_request(request: JSONRPCRequest) -> Result<RoutedExecServerMessage, Str
EXEC_METHOD => Ok(parse_request_params(request, |request_id, params| {
ExecServerRequest::Exec { request_id, params }
})),
EXEC_READ_METHOD => Ok(parse_request_params(request, |request_id, params| {
ExecServerRequest::Read { request_id, params }
})),
EXEC_WRITE_METHOD => Ok(parse_request_params(request, |request_id, params| {
ExecServerRequest::Write { request_id, params }
})),
@@ -210,6 +221,7 @@ fn serialize_response(
match response {
ExecServerResponseMessage::Initialize(response) => serde_json::to_value(response),
ExecServerResponseMessage::Exec(response) => serde_json::to_value(response),
ExecServerResponseMessage::Read(response) => serde_json::to_value(response),
ExecServerResponseMessage::Write(response) => serde_json::to_value(response),
ExecServerResponseMessage::Terminate(response) => serde_json::to_value(response),
}
@@ -421,12 +433,12 @@ mod tests {
#[test]
fn unexpected_client_notifications_are_rejected() {
let err = route_jsonrpc_message(JSONRPCMessage::Notification(JSONRPCNotification {
method: "process/outputDelta".to_string(),
method: "process/output".to_string(),
params: Some(json!({})),
}))
.expect_err("unexpected client notification should fail");
assert_eq!(err, "unexpected notification method: process/outputDelta");
assert_eq!(err, "unexpected notification method: process/output");
}
#[test]

View File

@@ -13,6 +13,7 @@ use codex_exec_server::ExecOutputStream;
use codex_exec_server::ExecParams;
use codex_exec_server::ExecServerClient;
use codex_exec_server::ExecServerClientConnectOptions;
use codex_exec_server::ExecServerEvent;
use codex_exec_server::ExecServerLaunchCommand;
use codex_exec_server::InitializeParams;
use codex_exec_server::InitializeResponse;
@@ -93,10 +94,11 @@ async fn exec_server_client_streams_output_and_accepts_writes() -> anyhow::Resul
)
.await?;
let process = server
.client()
.start_process(ExecParams {
process_id: "2001".to_string(),
let client = server.client();
let mut events = client.event_receiver();
let response = client
.exec(ExecParams {
process_id: "proc-1".to_string(),
argv: vec![
"bash".to_string(),
"-lc".to_string(),
@@ -109,29 +111,26 @@ async fn exec_server_client_streams_output_and_accepts_writes() -> anyhow::Resul
arg0: None,
})
.await?;
let process_id = response.process_id;
let mut output = process.output_receiver();
let (stream, ready_output) = recv_until_contains(&mut output, "ready").await?;
assert_eq!(stream, ExecOutputStream::Stdout);
let (stream, ready_output) = recv_until_contains(&mut events, &process_id, "ready").await?;
assert_eq!(stream, ExecOutputStream::Pty);
assert!(
ready_output.contains("ready"),
"expected initial ready output"
);
process
.writer_sender()
.send(b"hello\n".to_vec())
.await
.expect("write should succeed");
client.write(&process_id, b"hello\n".to_vec()).await?;
let (stream, echoed_output) = recv_until_contains(&mut output, "echo:hello").await?;
assert_eq!(stream, ExecOutputStream::Stdout);
let (stream, echoed_output) =
recv_until_contains(&mut events, &process_id, "echo:hello").await?;
assert_eq!(stream, ExecOutputStream::Pty);
assert!(
echoed_output.contains("echo:hello"),
"expected echoed output"
);
process.terminate();
client.terminate(&process_id).await?;
Ok(())
}
@@ -161,9 +160,10 @@ async fn exec_server_client_connects_over_websocket() -> anyhow::Result<()> {
})
.await?;
let process = client
.start_process(ExecParams {
process_id: "2002".to_string(),
let mut events = client.event_receiver();
let response = client
.exec(ExecParams {
process_id: "proc-1".to_string(),
argv: vec![
"bash".to_string(),
"-lc".to_string(),
@@ -176,29 +176,26 @@ async fn exec_server_client_connects_over_websocket() -> anyhow::Result<()> {
arg0: None,
})
.await?;
let process_id = response.process_id;
let mut output = process.output_receiver();
let (stream, ready_output) = recv_until_contains(&mut output, "ready").await?;
assert_eq!(stream, ExecOutputStream::Stdout);
let (stream, ready_output) = recv_until_contains(&mut events, &process_id, "ready").await?;
assert_eq!(stream, ExecOutputStream::Pty);
assert!(
ready_output.contains("ready"),
"expected initial ready output"
);
process
.writer_sender()
.send(b"hello\n".to_vec())
.await
.expect("write should succeed");
client.write(&process_id, b"hello\n".to_vec()).await?;
let (stream, echoed_output) = recv_until_contains(&mut output, "echo:hello").await?;
assert_eq!(stream, ExecOutputStream::Stdout);
let (stream, echoed_output) =
recv_until_contains(&mut events, &process_id, "echo:hello").await?;
assert_eq!(stream, ExecOutputStream::Pty);
assert!(
echoed_output.contains("echo:hello"),
"expected echoed output"
);
process.terminate();
client.terminate(&process_id).await?;
child.start_kill()?;
Ok(())
}
@@ -239,9 +236,9 @@ async fn websocket_disconnect_terminates_processes_for_that_connection() -> anyh
})
.await?;
let _process = client
.start_process(ExecParams {
process_id: "2003".to_string(),
let _response = client
.exec(ExecParams {
process_id: "proc-1".to_string(),
argv: vec![
"bash".to_string(),
"-lc".to_string(),
@@ -280,17 +277,22 @@ where
}
async fn recv_until_contains(
output: &mut broadcast::Receiver<codex_exec_server::ExecServerOutput>,
events: &mut broadcast::Receiver<ExecServerEvent>,
process_id: &str,
needle: &str,
) -> anyhow::Result<(ExecOutputStream, String)> {
let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
let mut collected = String::new();
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
let output_event = timeout(remaining, output.recv()).await??;
collected.push_str(&String::from_utf8_lossy(&output_event.chunk));
if collected.contains(needle) {
return Ok((output_event.stream, collected));
let event = timeout(remaining, events.recv()).await??;
if let ExecServerEvent::OutputDelta(output_event) = event
&& output_event.process_id == process_id
{
collected.push_str(&String::from_utf8_lossy(&output_event.chunk.into_inner()));
if collected.contains(needle) {
return Ok((output_event.stream, collected));
}
}
}
}