Compare commits

...

3 Commits

Author SHA1 Message Date
starr-openai
bfdd8bcc65 Refactor exec-server client RPC plumbing
Extract a small generic JSON-RPC client core, move remote connection setup into the JSON-RPC backend, and make handler request execution concurrency-safe so out-of-order responses and duplicate process ID races work correctly.

Co-authored-by: Codex <noreply@openai.com>
2026-03-18 13:37:05 -07:00
starr-openai
51a656fc1e Add exec-server filesystem RPC implementation
Layer the filesystem RPC implementation and exec-server-only client cleanup on top of the process RPC follow-up.\n\nCo-authored-by: Codex <noreply@openai.com>
2026-03-18 13:01:16 -07:00
starr-openai
79b46e346d Add exec-server process RPC implementation
Restore the process RPC client/server implementation on top of the initialize-only base.\n\nCo-authored-by: Codex <noreply@openai.com>
2026-03-18 13:01:16 -07:00
25 changed files with 5908 additions and 447 deletions

4
codex-rs/Cargo.lock generated
View File

@@ -2009,14 +2009,18 @@ version = "0.0.0"
dependencies = [
"anyhow",
"base64 0.22.1",
"clap",
"codex-app-server-protocol",
"codex-environment",
"codex-utils-cargo-bin",
"codex-utils-pty",
"futures",
"pretty_assertions",
"serde",
"serde_json",
"thiserror 2.0.18",
"tokio",
"tokio-tungstenite",
"tracing",
]

View File

@@ -13,8 +13,11 @@ workspace = true
[dependencies]
base64 = { workspace = true }
clap = { workspace = true, features = ["derive"] }
codex-app-server-protocol = { workspace = true }
codex-environment = { workspace = true }
codex-utils-pty = { workspace = true }
futures = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
@@ -22,11 +25,13 @@ tokio = { workspace = true, features = [
"io-std",
"io-util",
"macros",
"net",
"process",
"rt-multi-thread",
"sync",
"time",
] }
tokio-tungstenite = { workspace = true }
tracing = { workspace = true }
[dev-dependencies]

View File

@@ -0,0 +1,242 @@
# exec-server design notes
This document sketches a likely direction for integrating `codex-exec-server`
with unified exec without baking the full tool-call policy stack into the
server.
The goal is:
- keep exec-server generic and reusable
- keep approval, sandbox, and retry policy in `core`
- preserve the unified-exec event flow the model already depends on
- support retained output caps so polling and snapshot-style APIs do not grow
memory without bound
## Unified exec today
Today the flow for LLM-visible interactive execution is:
1. The model sees the `exec_command` and `write_stdin` tools.
2. `UnifiedExecHandler` parses the tool arguments and allocates a process id.
3. `UnifiedExecProcessManager::exec_command(...)` calls
`open_session_with_sandbox(...)`.
4. `ToolOrchestrator` drives approval, sandbox selection, managed network
approval, and sandbox-denial retry behavior.
5. `UnifiedExecRuntime` builds a `CommandSpec`, asks the current
`SandboxAttempt` to transform it into an `ExecRequest`, and passes that
resolved request back to the process manager.
6. `open_session_with_exec_env(...)` spawns the process from that resolved
`ExecRequest`.
7. Unified exec emits an `ExecCommandBegin` event.
8. Unified exec starts a background output watcher that emits
`ExecCommandOutputDelta` events.
9. The initial tool call collects output until the requested yield deadline and
returns an `ExecCommandToolOutput` snapshot to the model.
10. If the process is still running, unified exec stores it and later emits
`ExecCommandEnd` when the exit watcher fires.
11. A later `write_stdin` tool call writes to the stored process, emits a
`TerminalInteraction` event, collects another bounded snapshot, and returns
that tool response to the model.
Important observation: the 250ms / 10s yield-window behavior is not really a
process-server concern. It is a client-side convenience layer for the LLM tool
API. The server should focus on raw process lifecycle and streaming events.
## Proposed boundary
The clean split is:
- exec-server server: process lifecycle, output streaming, retained output caps
- exec-server client: `wait`, `communicate`, yield-window helpers, session
bookkeeping
- unified exec in `core`: tool parsing, event emission, approvals, sandboxing,
managed networking, retry semantics
If exec-server is used by unified exec later, the boundary should sit between
step 5 and step 6 above: after policy has produced a resolved spawn request, but
before the actual PTY or pipe spawn.
## Suggested process API
Start simple and explicit:
- `process/start`
- `process/write`
- `process/closeStdin`
- `process/resize`
- `process/terminate`
- `process/wait`
- `process/snapshot`
Server notifications:
- `process/output`
- `process/exited`
- optionally `process/started`
- optionally `process/failed`
Suggested request shapes:
```rust
enum ProcessStartRequest {
Direct(DirectExecSpec),
Prepared(PreparedExecSpec),
}
struct DirectExecSpec {
process_id: String,
argv: Vec<String>,
cwd: PathBuf,
env: HashMap<String, String>,
arg0: Option<String>,
io: ProcessIo,
}
struct PreparedExecSpec {
process_id: String,
request: PreparedExecRequest,
io: ProcessIo,
}
enum ProcessIo {
Pty { rows: u16, cols: u16 },
Pipe { stdin: StdinMode },
}
enum StdinMode {
Open,
Closed,
}
enum TerminateMode {
Graceful { timeout_ms: u64 },
Force,
}
```
Notes:
- `processId` remains a protocol handle, not an OS pid.
- `wait` is a good generic API because many callers want process completion
without manually wiring notifications.
- `communicate` is also a reasonable API, but it should probably start as a
client helper built on top of `write + closeStdin + wait + snapshot`.
- If an RPC form of `communicate` is added later, it should be a convenience
wrapper rather than the primitive execution model.
## Output capping
Even with event streaming, the server should retain a bounded amount of output
per process so callers can poll, wait, or reconnect without unbounded memory
growth.
Suggested behavior:
- stream every output chunk live via `process/output`
- retain capped output per process in memory
- keep stdout and stderr separately for pipe-backed processes
- for PTY-backed processes, treat retained output as a single terminal stream
- expose truncation metadata on snapshots
Suggested snapshot response:
```rust
struct ProcessSnapshot {
stdout: Vec<u8>,
stderr: Vec<u8>,
terminal: Vec<u8>,
truncated: bool,
exit_code: Option<i32>,
running: bool,
}
```
Implementation-wise, the current `HeadTailBuffer` pattern used by unified exec
is a good fit. The cap should be server config, not request config, so memory
use stays predictable.
## Sandboxing and networking
### How unified exec does it today
Unified exec does not hand raw command args directly to the PTY layer for tool
calls. Instead, it:
1. computes approval requirements
2. chooses a sandbox attempt
3. applies managed-network policy if needed
4. transforms `CommandSpec` into `ExecRequest`
5. spawns from that resolved `ExecRequest`
That split is already valuable and should be preserved.
### Recommended exec-server design
Do not put approval policy into exec-server.
Instead, support two execution modes:
- `Direct`: raw command, intended for orchestrator-side or already-trusted use
- `Prepared`: already-resolved spawn request, intended for tool-call execution
For tool calls from the LLM side:
1. `core` runs the existing approval + sandbox + managed-network flow
2. `core` produces a resolved `ExecRequest`
3. the exec-server client sends `PreparedExecSpec`
4. exec-server spawns exactly that request and streams process events
For orchestrator-side execution:
1. caller sends `DirectExecSpec`
2. exec-server spawns directly without running approval or sandbox policy
This gives one generic process API while keeping the policy-sensitive logic in
the place that already owns it.
### Why not make exec-server own sandbox selection?
That would force exec-server to understand:
- approval policy
- exec policy / prefix rules
- managed-network approval flow
- sandbox retry semantics
- guardian routing
- feature-flag-driven sandbox selection
- platform-specific sandbox helper configuration
That is too opinionated for a reusable process service.
## Optional future server config
If exec-server grows beyond the current prototype, a config object like this
would be enough:
```rust
struct ExecServerConfig {
shutdown_grace_period_ms: u64,
max_processes_per_connection: usize,
retained_output_bytes_per_process: usize,
allow_direct_exec: bool,
allow_prepared_exec: bool,
}
```
That keeps policy surface small:
- lifecycle limits live in the server
- trust and sandbox policy stay with the caller
## Mapping back to LLM-visible events
If unified exec is later backed by exec-server, the `core` client wrapper should
keep owning the translation into the existing event model:
- `process/start` success -> `ExecCommandBegin`
- `process/output` -> `ExecCommandOutputDelta`
- local `process/write` call -> `TerminalInteraction`
- `process/exited` plus retained transcript -> `ExecCommandEnd`
That preserves the current LLM-facing contract while making the process backend
swappable.

View File

@@ -1,28 +1,50 @@
# codex-exec-server
`codex-exec-server` is a small standalone stdio JSON-RPC server for spawning
and controlling subprocesses through `codex-utils-pty`.
This PR intentionally lands only the standalone binary, client, wire protocol,
and docs. Exec and filesystem methods are stubbed server-side here and are
implemented in follow-up PRs.
`codex-exec-server` is a small standalone JSON-RPC server for spawning and
controlling subprocesses through `codex-utils-pty`.
It currently provides:
- a standalone binary: `codex-exec-server`
- a transport-agnostic server runtime with stdio and websocket entrypoints
- a Rust client: `ExecServerClient`
- a direct in-process client mode: `ExecServerClient::connect_in_process`
- a separate local launch helper: `spawn_local_exec_server`
- a small protocol module with shared request/response types
This crate is intentionally narrow. It is not wired into the main Codex CLI or
unified-exec in this PR; it is only the standalone transport layer.
The internal shape is intentionally closer to `app-server` than the first cut:
- transport adapters are separate from the per-connection request processor
- JSON-RPC route matching is separate from the stateful exec handler
- the client only speaks the protocol; it does not spawn a server subprocess
- the client can also bypass the JSON-RPC transport/routing layer in local
in-process mode and call the typed handler directly
- local child-process launch is handled by a separate helper/factory layer
That split is meant to leave reusable seams if exec-server and app-server later
share transport or JSON-RPC connection utilities. It also keeps the core
handler testable without the RPC server implementation itself.
Design notes for a likely future integration with unified exec, including
rough call flow, buffering, and sandboxing boundaries, live in
[DESIGN.md](./DESIGN.md).
## Transport
The server speaks newline-delimited JSON-RPC 2.0 over stdio.
The server speaks the same JSON-RPC message shapes over multiple transports.
- `stdin`: one JSON-RPC message per line
- `stdout`: one JSON-RPC message per line
- `stderr`: reserved for logs / process errors
The standalone binary supports:
- `stdio://` (default)
- `ws://IP:PORT`
Wire framing:
- stdio: one newline-delimited JSON-RPC message per line on stdin/stdout
- websocket: one JSON-RPC message per websocket text frame
Like the app-server transport, messages on the wire omit the `"jsonrpc":"2.0"`
field and use the shared `codex-app-server-protocol` envelope types.
@@ -40,13 +62,19 @@ Each connection follows this sequence:
1. Send `initialize`.
2. Wait for the `initialize` response.
3. Send `initialized`.
4. Call exec or filesystem RPCs once the follow-up implementation PRs land.
4. Start and manage processes with `process/start`, `process/read`,
`process/write`, and `process/terminate`.
5. Read streaming notifications from `process/output` and
`process/exited`.
If the server receives any notification other than `initialized`, it replies
with an error using request id `-1`.
If the client sends exec methods before completing the `initialize` /
`initialized` handshake, the server rejects them.
If the stdio connection closes, the server terminates any remaining managed
processes before exiting.
If a connection closes, the server terminates any remaining managed processes
for that connection.
TODO: add authentication to the `initialize` setup before this is used across a
trust boundary.
## API
@@ -73,12 +101,12 @@ Response:
### `initialized`
Handshake acknowledgement notification sent by the client after a successful
`initialize` response.
`initialize` response. Exec methods are rejected until this arrives.
Params are currently ignored. Sending any other notification method is treated
as an invalid request.
Params are currently ignored. Sending any other client notification method is a
protocol error.
### `command/exec`
### `process/start`
Starts a new managed process.
@@ -93,44 +121,40 @@ Request params:
"PATH": "/usr/bin:/bin"
},
"tty": true,
"outputBytesCap": 16384,
"arg0": null
"arg0": null,
"sandbox": null
}
```
Field definitions:
- `processId`: caller-chosen stable id for this process within the connection.
- `argv`: command vector. It must be non-empty.
- `cwd`: absolute working directory used for the child process.
- `env`: environment variables passed to the child process.
- `tty`: when `true`, spawn a PTY-backed interactive process; when `false`,
spawn a pipe-backed process with closed stdin.
- `outputBytesCap`: maximum retained stdout/stderr bytes per stream for the
in-memory buffer. Defaults to `codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP`.
- `arg0`: optional argv0 override forwarded to `codex-utils-pty`.
- `sandbox`: optional sandbox config. Omit it for the current direct-spawn
behavior. Explicit `{"mode":"none"}` is accepted; `{"mode":"hostDefault"}`
is currently rejected until host-local sandbox materialization is wired up.
Response:
```json
{
"processId": "proc-1",
"running": true,
"exitCode": null,
"stdout": null,
"stderr": null
"processId": "proc-1"
}
```
Behavior notes:
- Reusing an existing `processId` is rejected.
- PTY-backed processes accept later writes through `command/exec/write`.
- `processId` is chosen by the client and must be unique for the connection.
- PTY-backed processes accept later writes through `process/write`.
- Pipe-backed processes are launched with stdin closed and reject writes.
- Output is streamed asynchronously via `command/exec/outputDelta`.
- Exit is reported asynchronously via `command/exec/exited`.
- Output is streamed asynchronously via `process/output`.
- Exit is reported asynchronously via `process/exited`.
### `command/exec/write`
### `process/write`
Writes raw bytes to a running PTY-backed process stdin.
@@ -158,7 +182,48 @@ Behavior notes:
- Writes to an unknown `processId` are rejected.
- Writes to a non-PTY process are rejected because stdin is already closed.
### `command/exec/terminate`
### `process/read`
Reads retained output from a managed process by sequence number.
Request params:
```json
{
"processId": "proc-1",
"afterSeq": 0,
"maxBytes": 65536,
"waitMs": 250
}
```
Response:
```json
{
"chunks": [
{
"seq": 1,
"stream": "pty",
"chunk": "aGVsbG8K"
}
],
"nextSeq": 2,
"exited": false,
"exitCode": null
}
```
Behavior notes:
- Output is retained in bounded server memory so callers can poll without
relying only on notifications.
- `afterSeq` is exclusive: `0` reads from the beginning of the retained buffer.
- `waitMs` waits briefly for new output or exit if nothing is currently
available.
- Once retained output exceeds the per-process cap, oldest chunks are dropped.
### `process/terminate`
Terminates a running managed process.
@@ -188,7 +253,7 @@ If the process is already unknown or already removed, the server responds with:
## Notifications
### `command/exec/outputDelta`
### `process/output`
Streaming output chunk from a running process.
@@ -205,10 +270,10 @@ Params:
Fields:
- `processId`: process identifier
- `stream`: `"stdout"` or `"stderr"`
- `stream`: `"stdout"`, `"stderr"`, or `"pty"` for PTY-backed processes
- `chunk`: base64-encoded output bytes
### `command/exec/exited`
### `process/exited`
Final process exit notification.
@@ -243,13 +308,58 @@ Typical error cases:
The crate exports:
- `ExecServerClient`
- `ExecServerClientConnectOptions`
- `RemoteExecServerConnectArgs`
- `ExecServerLaunchCommand`
- `ExecServerProcess`
- `ExecServerEvent`
- `SpawnedExecServer`
- `ExecServerError`
- `ExecServerTransport`
- `spawn_local_exec_server(...)`
- protocol structs such as `ExecParams`, `ExecResponse`,
`WriteParams`, `TerminateParams`, `ExecOutputDeltaNotification`, and
`ExecExitedNotification`
- `run_main()` for embedding the stdio server in a binary
- `run_main()` and `run_main_with_transport(...)`
### Binary
Run over stdio:
```text
codex-exec-server
```
Run as a websocket server:
```text
codex-exec-server --listen ws://127.0.0.1:8080
```
### Client
Connect the client to an existing server transport:
- `ExecServerClient::connect_stdio(...)`
- `ExecServerClient::connect_websocket(...)`
- `ExecServerClient::connect_in_process(...)` for a local no-transport mode
backed directly by the typed handler
Timeout behavior:
- stdio and websocket clients both enforce an initialize-handshake timeout
- websocket clients also enforce a connect timeout before the handshake begins
Events:
- `ExecServerClient::event_receiver()` yields `ExecServerEvent`
- output events include both `stream` (`stdout`, `stderr`, or `pty`) and raw
bytes
- process lifetime is tracked by server notifications such as
`process/exited`, not by a client-side process registry
Spawning a local child process is deliberately separate:
- `spawn_local_exec_server(...)`
## Example session
@@ -264,23 +374,23 @@ Initialize:
Start a process:
```json
{"id":2,"method":"command/exec","params":{"processId":"proc-1","argv":["bash","-lc","printf 'ready\\n'; while IFS= read -r line; do printf 'echo:%s\\n' \"$line\"; done"],"cwd":"/tmp","env":{"PATH":"/usr/bin:/bin"},"tty":true,"outputBytesCap":4096,"arg0":null}}
{"id":2,"result":{"processId":"proc-1","running":true,"exitCode":null,"stdout":null,"stderr":null}}
{"method":"command/exec/outputDelta","params":{"processId":"proc-1","stream":"stdout","chunk":"cmVhZHkK"}}
{"id":2,"method":"process/start","params":{"processId":"proc-1","argv":["bash","-lc","printf 'ready\\n'; while IFS= read -r line; do printf 'echo:%s\\n' \"$line\"; done"],"cwd":"/tmp","env":{"PATH":"/usr/bin:/bin"},"tty":true,"arg0":null}}
{"id":2,"result":{"processId":"proc-1"}}
{"method":"process/output","params":{"processId":"proc-1","stream":"pty","chunk":"cmVhZHkK"}}
```
Write to the process:
```json
{"id":3,"method":"command/exec/write","params":{"processId":"proc-1","chunk":"aGVsbG8K"}}
{"id":3,"method":"process/write","params":{"processId":"proc-1","chunk":"aGVsbG8K"}}
{"id":3,"result":{"accepted":true}}
{"method":"command/exec/outputDelta","params":{"processId":"proc-1","stream":"stdout","chunk":"ZWNobzpoZWxsbwo="}}
{"method":"process/output","params":{"processId":"proc-1","stream":"pty","chunk":"ZWNobzpoZWxsbwo="}}
```
Terminate it:
```json
{"id":4,"method":"command/exec/terminate","params":{"processId":"proc-1"}}
{"id":4,"method":"process/terminate","params":{"processId":"proc-1"}}
{"id":4,"result":{"running":true}}
{"method":"command/exec/exited","params":{"processId":"proc-1","exitCode":0}}
{"method":"process/exited","params":{"processId":"proc-1","exitCode":0}}
```

View File

@@ -1,6 +1,22 @@
use clap::Parser;
use codex_exec_server::ExecServerTransport;
#[derive(Debug, Parser)]
struct ExecServerArgs {
/// Transport endpoint URL. Supported values: `stdio://` (default),
/// `ws://IP:PORT`.
#[arg(
long = "listen",
value_name = "URL",
default_value = ExecServerTransport::DEFAULT_LISTEN_URL
)]
listen: ExecServerTransport,
}
#[tokio::main]
async fn main() {
if let Err(err) = codex_exec_server::run_main().await {
let args = ExecServerArgs::parse();
if let Err(err) = codex_exec_server::run_main_with_transport(args.listen).await {
eprintln!("{err}");
std::process::exit(1);
}

View File

@@ -1,54 +1,143 @@
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::FsCopyParams;
use codex_app_server_protocol::FsCopyResponse;
use codex_app_server_protocol::FsCreateDirectoryParams;
use codex_app_server_protocol::FsCreateDirectoryResponse;
use codex_app_server_protocol::FsGetMetadataParams;
use codex_app_server_protocol::FsGetMetadataResponse;
use codex_app_server_protocol::FsReadDirectoryParams;
use codex_app_server_protocol::FsReadDirectoryResponse;
use codex_app_server_protocol::FsReadFileParams;
use codex_app_server_protocol::FsReadFileResponse;
use codex_app_server_protocol::FsRemoveParams;
use codex_app_server_protocol::FsRemoveResponse;
use codex_app_server_protocol::FsWriteFileParams;
use codex_app_server_protocol::FsWriteFileResponse;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde_json::Value;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::process::Child;
use tokio::sync::Mutex;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tracing::debug;
use tracing::warn;
use crate::client_api::ExecServerClientConnectOptions;
use crate::client_api::ExecServerEvent;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_METHOD;
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
use crate::protocol::EXEC_READ_METHOD;
use crate::protocol::EXEC_TERMINATE_METHOD;
use crate::protocol::EXEC_WRITE_METHOD;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
use crate::protocol::ExecParams;
use crate::protocol::ExecResponse;
use crate::protocol::FS_COPY_METHOD;
use crate::protocol::FS_CREATE_DIRECTORY_METHOD;
use crate::protocol::FS_GET_METADATA_METHOD;
use crate::protocol::FS_READ_DIRECTORY_METHOD;
use crate::protocol::FS_READ_FILE_METHOD;
use crate::protocol::FS_REMOVE_METHOD;
use crate::protocol::FS_WRITE_FILE_METHOD;
use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeParams;
use crate::protocol::InitializeResponse;
use crate::server_process::ExecServerLaunchCommand;
use crate::server_process::spawn_stdio_exec_server;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
use crate::protocol::WriteResponse;
use crate::rpc::RpcClientEvent;
use crate::server::ExecServerOutboundMessage;
use crate::server::ExecServerServerNotification;
mod jsonrpc_backend;
mod local_backend;
#[cfg(test)]
mod process;
use jsonrpc_backend::JsonRpcBackend;
use local_backend::LocalBackend;
#[cfg(test)]
use process::ExecServerOutput;
#[cfg(test)]
use process::ExecServerProcess;
#[cfg(test)]
use process::RemoteProcessStatus;
impl Default for ExecServerClientConnectOptions {
fn default() -> Self {
Self {
client_name: "codex-core".to_string(),
initialize_timeout: INITIALIZE_TIMEOUT,
}
}
}
impl From<RemoteExecServerConnectArgs> for ExecServerClientConnectOptions {
fn from(value: RemoteExecServerConnectArgs) -> Self {
Self {
client_name: value.client_name,
initialize_timeout: value.initialize_timeout,
}
}
}
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10);
impl RemoteExecServerConnectArgs {
pub fn new(websocket_url: String, client_name: String) -> Self {
Self {
websocket_url,
client_name,
connect_timeout: CONNECT_TIMEOUT,
initialize_timeout: INITIALIZE_TIMEOUT,
}
}
}
enum ClientBackend {
JsonRpc(JsonRpcBackend),
InProcess(LocalBackend),
}
impl ClientBackend {
fn as_local(&self) -> Option<&LocalBackend> {
match self {
ClientBackend::JsonRpc(_) => None,
ClientBackend::InProcess(backend) => Some(backend),
}
}
}
struct Inner {
child: StdMutex<Option<Child>>,
write_tx: mpsc::UnboundedSender<JSONRPCMessage>,
pending: Mutex<HashMap<RequestId, oneshot::Sender<Result<Value, JSONRPCErrorError>>>>,
next_request_id: AtomicI64,
backend: ClientBackend,
events_tx: broadcast::Sender<ExecServerEvent>,
reader_task: JoinHandle<()>,
writer_task: JoinHandle<()>,
}
impl Drop for Inner {
fn drop(&mut self) {
self.reader_task.abort();
self.writer_task.abort();
if let Ok(mut child_guard) = self.child.lock()
&& let Some(child) = child_guard.as_mut()
if let Some(backend) = self.backend.as_local()
&& let Ok(handle) = tokio::runtime::Handle::try_current()
{
let _ = child.start_kill();
let backend = backend.clone();
handle.spawn(async move {
backend.shutdown().await;
});
}
self.reader_task.abort();
}
}
@@ -61,6 +150,16 @@ pub struct ExecServerClient {
pub enum ExecServerError {
#[error("failed to spawn exec-server: {0}")]
Spawn(#[source] std::io::Error),
#[error("timed out connecting to exec-server websocket `{url}` after {timeout:?}")]
WebSocketConnectTimeout { url: String, timeout: Duration },
#[error("failed to connect to exec-server websocket `{url}`: {source}")]
WebSocketConnect {
url: String,
#[source]
source: tokio_tungstenite::tungstenite::Error,
},
#[error("timed out waiting for exec-server initialize handshake after {timeout:?}")]
InitializeTimedOut { timeout: Duration },
#[error("exec-server transport closed")]
Closed,
#[error("failed to serialize or deserialize exec-server JSON: {0}")]
@@ -72,205 +171,398 @@ pub enum ExecServerError {
}
impl ExecServerClient {
pub async fn spawn(command: ExecServerLaunchCommand) -> Result<Self, ExecServerError> {
let crate::server_process::SpawnedStdioExecServer {
child,
stdin,
stdout,
} = spawn_stdio_exec_server(command)?;
pub async fn connect_in_process(
options: ExecServerClientConnectOptions,
) -> Result<Self, ExecServerError> {
let (outbound_tx, mut outgoing_rx) = mpsc::channel::<ExecServerOutboundMessage>(256);
let backend = LocalBackend::new(crate::server::ExecServerHandler::new(outbound_tx));
let (write_tx, mut write_rx) = mpsc::unbounded_channel::<JSONRPCMessage>();
let writer_task = tokio::spawn(async move {
let mut stdin = stdin;
while let Some(message) = write_rx.recv().await {
let encoded = match serde_json::to_vec(&message) {
Ok(encoded) => encoded,
Err(err) => {
warn!("failed to encode exec-server message: {err}");
break;
}
};
if stdin.write_all(&encoded).await.is_err() {
break;
}
if stdin.write_all(b"\n").await.is_err() {
break;
}
if stdin.flush().await.is_err() {
break;
}
}
});
let pending = Mutex::new(HashMap::<
RequestId,
oneshot::Sender<Result<Value, JSONRPCErrorError>>,
>::new());
let inner = Arc::new_cyclic(move |weak| {
let inner = Arc::new_cyclic(|weak| {
let weak = weak.clone();
let reader_task = tokio::spawn(async move {
let mut lines = BufReader::new(stdout).lines();
loop {
let Some(inner) = weak.upgrade() else {
break;
};
match lines.next_line().await {
Ok(Some(line)) => {
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<JSONRPCMessage>(&line) {
Ok(message) => {
if let Err(err) = handle_server_message(&inner, message).await {
warn!("failed to handle exec-server message: {err}");
break;
}
}
Err(err) => {
warn!("failed to parse exec-server message: {err}");
break;
}
}
}
Ok(None) => break,
Err(err) => {
warn!("failed to read exec-server stdout: {err}");
break;
}
while let Some(message) = outgoing_rx.recv().await {
if let Some(inner) = weak.upgrade()
&& let Err(err) = handle_in_process_outbound_message(&inner, message).await
{
warn!(
"in-process exec-server client closing after unexpected response: {err}"
);
return;
}
}
if let Some(inner) = weak.upgrade() {
fail_pending_requests(&inner).await;
}
});
Inner {
child: StdMutex::new(Some(child)),
write_tx,
pending,
next_request_id: AtomicI64::new(1),
backend: ClientBackend::InProcess(backend),
events_tx: broadcast::channel(256).0,
reader_task,
writer_task,
}
});
let client = Self { inner };
client
.initialize(InitializeParams {
client_name: "codex-core".to_string(),
})
.await?;
client.send_notification(INITIALIZED_METHOD, serde_json::json!({}))?;
client.initialize(options).await?;
Ok(client)
}
pub async fn initialize(
&self,
params: InitializeParams,
) -> Result<InitializeResponse, ExecServerError> {
self.send_request(INITIALIZE_METHOD, params).await
pub async fn connect_stdio<R, W>(
stdin: W,
stdout: R,
options: ExecServerClientConnectOptions,
) -> Result<Self, ExecServerError>
where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
{
let (backend, events_rx) = JsonRpcBackend::connect_stdio(stdin, stdout);
Self::connect(backend, events_rx, options).await
}
async fn send_request<P, R>(&self, method: &str, params: P) -> Result<R, ExecServerError>
pub async fn connect_websocket(
args: RemoteExecServerConnectArgs,
) -> Result<Self, ExecServerError> {
let options = args.clone().into();
let (backend, events_rx) = JsonRpcBackend::connect_websocket(&args).await?;
Self::connect(backend, events_rx, options).await
}
async fn connect(
backend: JsonRpcBackend,
mut rpc_events_rx: mpsc::Receiver<RpcClientEvent>,
options: ExecServerClientConnectOptions,
) -> Result<Self, ExecServerError> {
let inner = Arc::new_cyclic(|weak| {
let weak = weak.clone();
let reader_task = tokio::spawn(async move {
while let Some(event) = rpc_events_rx.recv().await {
match event {
RpcClientEvent::Notification(notification) => {
if let Some(inner) = weak.upgrade()
&& let Err(err) =
handle_server_notification(&inner, notification).await
{
warn!("exec-server client closing after protocol error: {err}");
return;
}
}
RpcClientEvent::Disconnected { reason } => {
if let Some(reason) = reason {
warn!("exec-server client transport disconnected: {reason}");
}
return;
}
}
}
});
Inner {
backend: ClientBackend::JsonRpc(backend),
events_tx: broadcast::channel(256).0,
reader_task,
}
});
let client = Self { inner };
client.initialize(options).await?;
Ok(client)
}
pub fn event_receiver(&self) -> broadcast::Receiver<ExecServerEvent> {
self.inner.events_tx.subscribe()
}
#[cfg(test)]
async fn start_process(
&self,
params: ExecParams,
) -> Result<ExecServerProcess, ExecServerError> {
let response = self.exec(params).await?;
let process_id = response.process_id;
let status = Arc::new(RemoteProcessStatus::new());
let (output_tx, output_rx) = broadcast::channel(256);
let mut events_rx = self.event_receiver();
let status_watcher = Arc::clone(&status);
let watch_process_id = process_id.clone();
tokio::spawn(async move {
while let Ok(event) = events_rx.recv().await {
match event {
ExecServerEvent::OutputDelta(notification)
if notification.process_id == watch_process_id =>
{
let _ = output_tx.send(ExecServerOutput {
stream: notification.stream,
chunk: notification.chunk.into_inner(),
});
}
ExecServerEvent::Exited(notification)
if notification.process_id == watch_process_id =>
{
status_watcher.mark_exited(Some(notification.exit_code));
break;
}
ExecServerEvent::OutputDelta(_) | ExecServerEvent::Exited(_) => {}
}
}
});
Ok(ExecServerProcess {
process_id,
output_rx,
status,
client: self.clone(),
})
}
pub async fn exec(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError> {
self.request_or_local(EXEC_METHOD, params, |backend, params| async move {
backend.exec(params).await
})
.await
}
pub async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError> {
self.request_or_local(EXEC_READ_METHOD, params, |backend, params| async move {
backend.exec_read(params).await
})
.await
}
pub async fn write(
&self,
process_id: &str,
chunk: Vec<u8>,
) -> Result<WriteResponse, ExecServerError> {
let params = WriteParams {
process_id: process_id.to_string(),
chunk: chunk.into(),
};
self.request_or_local(EXEC_WRITE_METHOD, params, |backend, params| async move {
backend.exec_write(params).await
})
.await
}
pub async fn terminate(&self, process_id: &str) -> Result<TerminateResponse, ExecServerError> {
let params = TerminateParams {
process_id: process_id.to_string(),
};
self.request_or_local(
EXEC_TERMINATE_METHOD,
params,
|backend, params| async move { backend.terminate(params).await },
)
.await
}
pub async fn fs_read_file(
&self,
params: FsReadFileParams,
) -> Result<FsReadFileResponse, ExecServerError> {
self.request_or_local(FS_READ_FILE_METHOD, params, |backend, params| async move {
backend.fs_read_file(params).await
})
.await
}
pub async fn fs_write_file(
&self,
params: FsWriteFileParams,
) -> Result<FsWriteFileResponse, ExecServerError> {
self.request_or_local(FS_WRITE_FILE_METHOD, params, |backend, params| async move {
backend.fs_write_file(params).await
})
.await
}
pub async fn fs_create_directory(
&self,
params: FsCreateDirectoryParams,
) -> Result<FsCreateDirectoryResponse, ExecServerError> {
self.request_or_local(
FS_CREATE_DIRECTORY_METHOD,
params,
|backend, params| async move { backend.fs_create_directory(params).await },
)
.await
}
pub async fn fs_get_metadata(
&self,
params: FsGetMetadataParams,
) -> Result<FsGetMetadataResponse, ExecServerError> {
self.request_or_local(
FS_GET_METADATA_METHOD,
params,
|backend, params| async move { backend.fs_get_metadata(params).await },
)
.await
}
pub async fn fs_read_directory(
&self,
params: FsReadDirectoryParams,
) -> Result<FsReadDirectoryResponse, ExecServerError> {
self.request_or_local(
FS_READ_DIRECTORY_METHOD,
params,
|backend, params| async move { backend.fs_read_directory(params).await },
)
.await
}
pub async fn fs_remove(
&self,
params: FsRemoveParams,
) -> Result<FsRemoveResponse, ExecServerError> {
self.request_or_local(FS_REMOVE_METHOD, params, |backend, params| async move {
backend.fs_remove(params).await
})
.await
}
pub async fn fs_copy(&self, params: FsCopyParams) -> Result<FsCopyResponse, ExecServerError> {
self.request_or_local(FS_COPY_METHOD, params, |backend, params| async move {
backend.fs_copy(params).await
})
.await
}
async fn initialize(
&self,
options: ExecServerClientConnectOptions,
) -> Result<(), ExecServerError> {
let ExecServerClientConnectOptions {
client_name,
initialize_timeout,
} = options;
timeout(initialize_timeout, async {
if let Some(backend) = self.inner.backend.as_local() {
backend.initialize().await?;
} else {
let params = crate::protocol::InitializeParams { client_name };
let _: InitializeResponse =
self.send_remote_request(INITIALIZE_METHOD, &params).await?;
}
self.notify(INITIALIZED_METHOD, &serde_json::json!({}))
.await
})
.await
.map_err(|_| ExecServerError::InitializeTimedOut {
timeout: initialize_timeout,
})?
}
async fn notify<P: Serialize>(&self, method: &str, params: &P) -> Result<(), ExecServerError> {
match &self.inner.backend {
ClientBackend::JsonRpc(backend) => backend.notify(method, params).await,
ClientBackend::InProcess(backend) => backend.notify(method).await,
}
}
async fn send_remote_request<P, T>(
&self,
method: &str,
params: &P,
) -> Result<T, ExecServerError>
where
P: Serialize,
R: DeserializeOwned,
T: serde::de::DeserializeOwned,
{
let id = RequestId::Integer(self.inner.next_request_id.fetch_add(1, Ordering::SeqCst));
let params = serde_json::to_value(params)?;
let (tx, rx) = oneshot::channel();
self.inner.pending.lock().await.insert(id.clone(), tx);
let ClientBackend::JsonRpc(backend) = &self.inner.backend else {
unreachable!("in-process requests return before JSON-RPC setup");
};
backend.call(method, params).await
}
if let Err(err) = self
.inner
.write_tx
.send(JSONRPCMessage::Request(JSONRPCRequest {
id: id.clone(),
method: method.to_string(),
params: Some(params),
trace: None,
}))
{
let _ = self.inner.pending.lock().await.remove(&id);
return Err(ExecServerError::Protocol(format!(
"failed to queue exec-server request: {err}"
)));
#[cfg(test)]
async fn pending_request_count(&self) -> usize {
match &self.inner.backend {
ClientBackend::JsonRpc(backend) => backend.pending_request_count().await,
ClientBackend::InProcess(_) => 0,
}
}
async fn request_or_local<P, T, Fut>(
&self,
method: &str,
params: P,
call_local: impl FnOnce(LocalBackend, P) -> Fut,
) -> Result<T, ExecServerError>
where
P: Serialize,
T: serde::de::DeserializeOwned,
Fut: Future<Output = Result<T, ExecServerError>>,
{
if let Some(backend) = self.inner.backend.as_local() {
return call_local(backend.clone(), params).await;
}
let result = rx.await.map_err(|_| ExecServerError::Closed)??;
Ok(serde_json::from_value(result)?)
}
fn send_notification<P>(&self, method: &str, params: P) -> Result<(), ExecServerError>
where
P: Serialize,
{
let params = serde_json::to_value(params)?;
self.inner
.write_tx
.send(JSONRPCMessage::Notification(JSONRPCNotification {
method: method.to_string(),
params: Some(params),
}))
.map_err(|err| {
ExecServerError::Protocol(format!(
"failed to queue exec-server notification: {err}"
))
})
self.send_remote_request(method, &params).await
}
}
impl From<JSONRPCErrorError> for ExecServerError {
fn from(error: JSONRPCErrorError) -> Self {
Self::Server {
fn server_result_to_client<T>(result: Result<T, JSONRPCErrorError>) -> Result<T, ExecServerError> {
match result {
Ok(response) => Ok(response),
Err(error) => Err(ExecServerError::Server {
code: error.code,
message: error.message,
}
}),
}
}
async fn handle_server_message(
inner: &Inner,
message: JSONRPCMessage,
async fn handle_in_process_outbound_message(
inner: &Arc<Inner>,
message: ExecServerOutboundMessage,
) -> Result<(), ExecServerError> {
match message {
JSONRPCMessage::Response(JSONRPCResponse { id, result }) => {
if let Some(tx) = inner.pending.lock().await.remove(&id) {
let _ = tx.send(Ok(result));
}
Ok(())
ExecServerOutboundMessage::Response { .. } | ExecServerOutboundMessage::Error { .. } => {
return Err(ExecServerError::Protocol(
"unexpected in-process RPC response".to_string(),
));
}
JSONRPCMessage::Error(JSONRPCError { id, error }) => {
if let Some(tx) = inner.pending.lock().await.remove(&id) {
let _ = tx.send(Err(error));
Ok(())
} else {
Err(ExecServerError::Server {
code: error.code,
message: error.message,
})
}
ExecServerOutboundMessage::Notification(notification) => {
handle_in_process_notification(inner, notification).await;
}
}
Ok(())
}
async fn handle_in_process_notification(
inner: &Arc<Inner>,
notification: ExecServerServerNotification,
) {
match notification {
ExecServerServerNotification::OutputDelta(params) => {
let _ = inner.events_tx.send(ExecServerEvent::OutputDelta(params));
}
ExecServerServerNotification::Exited(params) => {
let _ = inner.events_tx.send(ExecServerEvent::Exited(params));
}
JSONRPCMessage::Notification(notification) => Err(ExecServerError::Protocol(format!(
"unexpected exec-server notification: {}",
notification.method
))),
JSONRPCMessage::Request(request) => Err(ExecServerError::Protocol(format!(
"unexpected exec-server request: {}",
request.method
))),
}
}
async fn fail_pending_requests(inner: &Inner) {
let mut pending = inner.pending.lock().await;
for (_, tx) in pending.drain() {
let _ = tx.send(Err(JSONRPCErrorError {
code: -32000,
message: "exec-server transport closed".to_string(),
data: None,
}));
async fn handle_server_notification(
inner: &Arc<Inner>,
notification: JSONRPCNotification,
) -> Result<(), ExecServerError> {
match notification.method.as_str() {
EXEC_OUTPUT_DELTA_METHOD => {
let params: ExecOutputDeltaNotification =
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
let _ = inner.events_tx.send(ExecServerEvent::OutputDelta(params));
}
EXEC_EXITED_METHOD => {
let params: ExecExitedNotification =
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
let _ = inner.events_tx.send(ExecServerEvent::Exited(params));
}
other => {
debug!("ignoring unknown exec-server notification: {other}");
}
}
Ok(())
}
#[cfg(test)]
mod tests;

View File

@@ -0,0 +1,98 @@
use serde::Serialize;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
use crate::rpc::RpcCallError;
use crate::rpc::RpcClient;
use crate::rpc::RpcClientEvent;
use super::ExecServerError;
pub(super) struct JsonRpcBackend {
rpc: RpcClient,
}
impl JsonRpcBackend {
pub(super) fn connect_stdio<R, W>(stdin: W, stdout: R) -> (Self, mpsc::Receiver<RpcClientEvent>)
where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
{
Self::connect(JsonRpcConnection::from_stdio(
stdout,
stdin,
"exec-server stdio".to_string(),
))
}
pub(super) async fn connect_websocket(
args: &RemoteExecServerConnectArgs,
) -> Result<(Self, mpsc::Receiver<RpcClientEvent>), ExecServerError> {
let websocket_url = args.websocket_url.clone();
let connect_timeout = args.connect_timeout;
let (stream, _) = timeout(connect_timeout, connect_async(websocket_url.as_str()))
.await
.map_err(|_| ExecServerError::WebSocketConnectTimeout {
url: websocket_url.clone(),
timeout: connect_timeout,
})?
.map_err(|source| ExecServerError::WebSocketConnect {
url: websocket_url.clone(),
source,
})?;
Ok(Self::connect(JsonRpcConnection::from_websocket(
stream,
format!("exec-server websocket {websocket_url}"),
)))
}
fn connect(connection: JsonRpcConnection) -> (Self, mpsc::Receiver<RpcClientEvent>) {
let (rpc, events_rx) = RpcClient::new(connection);
(Self { rpc }, events_rx)
}
pub(super) async fn notify<P: Serialize>(
&self,
method: &str,
params: &P,
) -> Result<(), ExecServerError> {
self.rpc
.notify(method, params)
.await
.map_err(|err| match err.classify() {
serde_json::error::Category::Io => ExecServerError::Closed,
serde_json::error::Category::Syntax
| serde_json::error::Category::Data
| serde_json::error::Category::Eof => ExecServerError::Json(err),
})
}
pub(super) async fn call<P, T>(&self, method: &str, params: &P) -> Result<T, ExecServerError>
where
P: Serialize,
T: serde::de::DeserializeOwned,
{
self.rpc
.call(method, params)
.await
.map_err(|err| match err {
RpcCallError::Closed => ExecServerError::Closed,
RpcCallError::Json(err) => ExecServerError::Json(err),
RpcCallError::Server(error) => ExecServerError::Server {
code: error.code,
message: error.message,
},
})
}
#[cfg(test)]
pub(super) async fn pending_request_count(&self) -> usize {
self.rpc.pending_request_count().await
}
}

View File

@@ -0,0 +1,137 @@
use std::sync::Arc;
use crate::protocol::ExecParams;
use crate::protocol::ExecResponse;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
use crate::protocol::WriteResponse;
use crate::server::ExecServerHandler;
use codex_app_server_protocol::FsCopyParams;
use codex_app_server_protocol::FsCopyResponse;
use codex_app_server_protocol::FsCreateDirectoryParams;
use codex_app_server_protocol::FsCreateDirectoryResponse;
use codex_app_server_protocol::FsGetMetadataParams;
use codex_app_server_protocol::FsGetMetadataResponse;
use codex_app_server_protocol::FsReadDirectoryParams;
use codex_app_server_protocol::FsReadDirectoryResponse;
use codex_app_server_protocol::FsReadFileParams;
use codex_app_server_protocol::FsReadFileResponse;
use codex_app_server_protocol::FsRemoveParams;
use codex_app_server_protocol::FsRemoveResponse;
use codex_app_server_protocol::FsWriteFileParams;
use codex_app_server_protocol::FsWriteFileResponse;
use super::ExecServerError;
use super::server_result_to_client;
#[derive(Clone)]
pub(super) struct LocalBackend {
handler: Arc<ExecServerHandler>,
}
impl LocalBackend {
pub(super) fn new(handler: ExecServerHandler) -> Self {
Self {
handler: Arc::new(handler),
}
}
pub(super) async fn shutdown(&self) {
self.handler.shutdown().await;
}
pub(super) async fn initialize(&self) -> Result<InitializeResponse, ExecServerError> {
server_result_to_client(self.handler.initialize())
}
pub(super) async fn notify(&self, method: &str) -> Result<(), ExecServerError> {
match method {
INITIALIZED_METHOD => self
.handler
.initialized()
.map_err(ExecServerError::Protocol),
other => Err(ExecServerError::Protocol(format!(
"unsupported in-process notification method `{other}`"
))),
}
}
pub(super) async fn exec(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError> {
server_result_to_client(self.handler.exec(params).await)
}
pub(super) async fn exec_read(
&self,
params: ReadParams,
) -> Result<ReadResponse, ExecServerError> {
server_result_to_client(self.handler.exec_read(params).await)
}
pub(super) async fn exec_write(
&self,
params: WriteParams,
) -> Result<WriteResponse, ExecServerError> {
server_result_to_client(self.handler.exec_write(params).await)
}
pub(super) async fn terminate(
&self,
params: TerminateParams,
) -> Result<TerminateResponse, ExecServerError> {
server_result_to_client(self.handler.terminate(params).await)
}
pub(super) async fn fs_read_file(
&self,
params: FsReadFileParams,
) -> Result<FsReadFileResponse, ExecServerError> {
server_result_to_client(self.handler.fs_read_file(params).await)
}
pub(super) async fn fs_write_file(
&self,
params: FsWriteFileParams,
) -> Result<FsWriteFileResponse, ExecServerError> {
server_result_to_client(self.handler.fs_write_file(params).await)
}
pub(super) async fn fs_create_directory(
&self,
params: FsCreateDirectoryParams,
) -> Result<FsCreateDirectoryResponse, ExecServerError> {
server_result_to_client(self.handler.fs_create_directory(params).await)
}
pub(super) async fn fs_get_metadata(
&self,
params: FsGetMetadataParams,
) -> Result<FsGetMetadataResponse, ExecServerError> {
server_result_to_client(self.handler.fs_get_metadata(params).await)
}
pub(super) async fn fs_read_directory(
&self,
params: FsReadDirectoryParams,
) -> Result<FsReadDirectoryResponse, ExecServerError> {
server_result_to_client(self.handler.fs_read_directory(params).await)
}
pub(super) async fn fs_remove(
&self,
params: FsRemoveParams,
) -> Result<FsRemoveResponse, ExecServerError> {
server_result_to_client(self.handler.fs_remove(params).await)
}
pub(super) async fn fs_copy(
&self,
params: FsCopyParams,
) -> Result<FsCopyResponse, ExecServerError> {
server_result_to_client(self.handler.fs_copy(params).await)
}
}

View File

@@ -0,0 +1,72 @@
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use tokio::sync::broadcast;
use super::ExecServerClient;
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct ExecServerOutput {
pub(super) stream: crate::protocol::ExecOutputStream,
pub(super) chunk: Vec<u8>,
}
pub(super) struct ExecServerProcess {
pub(super) process_id: String,
pub(super) output_rx: broadcast::Receiver<ExecServerOutput>,
pub(super) status: Arc<RemoteProcessStatus>,
pub(super) client: ExecServerClient,
}
impl ExecServerProcess {
pub(super) fn output_receiver(&self) -> broadcast::Receiver<ExecServerOutput> {
self.output_rx.resubscribe()
}
pub(super) fn has_exited(&self) -> bool {
self.status.has_exited()
}
pub(super) fn exit_code(&self) -> Option<i32> {
self.status.exit_code()
}
pub(super) fn terminate(&self) {
let client = self.client.clone();
let process_id = self.process_id.clone();
tokio::spawn(async move {
let _ = client.terminate(&process_id).await;
});
}
}
pub(super) struct RemoteProcessStatus {
exited: AtomicBool,
exit_code: StdMutex<Option<i32>>,
}
impl RemoteProcessStatus {
pub(super) fn new() -> Self {
Self {
exited: AtomicBool::new(false),
exit_code: StdMutex::new(None),
}
}
pub(super) fn has_exited(&self) -> bool {
self.exited.load(Ordering::SeqCst)
}
pub(super) fn exit_code(&self) -> Option<i32> {
self.exit_code.lock().ok().and_then(|guard| *guard)
}
pub(super) fn mark_exited(&self, exit_code: Option<i32>) {
self.exited.store(true, Ordering::SeqCst);
if let Ok(mut guard) = self.exit_code.lock() {
*guard = exit_code;
}
}
}

View File

@@ -0,0 +1,981 @@
use std::collections::HashMap;
use std::time::Duration;
use pretty_assertions::assert_eq;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::time::timeout;
use super::ExecServerClient;
use super::ExecServerClientConnectOptions;
use super::ExecServerError;
use super::ExecServerOutput;
use crate::protocol::EXEC_METHOD;
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
use crate::protocol::EXEC_READ_METHOD;
use crate::protocol::EXEC_TERMINATE_METHOD;
use crate::protocol::ExecOutputStream;
use crate::protocol::ExecParams;
use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::PROTOCOL_VERSION;
use crate::protocol::ReadParams;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
fn test_options() -> ExecServerClientConnectOptions {
ExecServerClientConnectOptions {
client_name: "test-client".to_string(),
initialize_timeout: Duration::from_secs(1),
}
}
async fn read_jsonrpc_line<R>(lines: &mut tokio::io::Lines<BufReader<R>>) -> JSONRPCMessage
where
R: tokio::io::AsyncRead + Unpin,
{
let next_line = timeout(Duration::from_secs(1), lines.next_line()).await;
let line_result = match next_line {
Ok(line_result) => line_result,
Err(err) => panic!("timed out waiting for JSON-RPC line: {err}"),
};
let maybe_line = match line_result {
Ok(maybe_line) => maybe_line,
Err(err) => panic!("failed to read JSON-RPC line: {err}"),
};
let line = match maybe_line {
Some(line) => line,
None => panic!("server connection closed before JSON-RPC line arrived"),
};
match serde_json::from_str::<JSONRPCMessage>(&line) {
Ok(message) => message,
Err(err) => panic!("failed to parse JSON-RPC line: {err}"),
}
}
async fn write_jsonrpc_line<W>(writer: &mut W, message: JSONRPCMessage)
where
W: tokio::io::AsyncWrite + Unpin,
{
let encoded = match serde_json::to_string(&message) {
Ok(encoded) => encoded,
Err(err) => panic!("failed to encode JSON-RPC message: {err}"),
};
if let Err(err) = writer.write_all(format!("{encoded}\n").as_bytes()).await {
panic!("failed to write JSON-RPC line: {err}");
}
}
#[tokio::test]
async fn connect_stdio_performs_initialize_handshake() {
let (client_stdin, server_reader) = tokio::io::duplex(4096);
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
let server = tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
let initialize = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(request) = initialize else {
panic!("expected initialize request");
};
assert_eq!(request.method, INITIALIZE_METHOD);
assert_eq!(
request.params,
Some(serde_json::json!({ "clientName": "test-client" }))
);
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: request.id,
result: serde_json::json!({ "protocolVersion": PROTOCOL_VERSION }),
}),
)
.await;
let initialized = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Notification(JSONRPCNotification { method, params }) = initialized
else {
panic!("expected initialized notification");
};
assert_eq!(method, INITIALIZED_METHOD);
assert_eq!(params, Some(serde_json::json!({})));
});
let client = ExecServerClient::connect_stdio(client_stdin, client_stdout, test_options()).await;
if let Err(err) = client {
panic!("failed to connect test client: {err}");
}
if let Err(err) = server.await {
panic!("server task failed: {err}");
}
}
#[tokio::test]
async fn connect_stdio_matches_out_of_order_responses_by_request_id() {
let (client_stdin, server_reader) = tokio::io::duplex(4096);
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
let server = tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
let initialize = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(initialize) = initialize else {
panic!("expected initialize request");
};
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: initialize.id,
result: serde_json::json!({ "protocolVersion": PROTOCOL_VERSION }),
}),
)
.await;
let initialized = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Notification(_) = initialized else {
panic!("expected initialized notification");
};
let first = read_jsonrpc_line(&mut lines).await;
let second = read_jsonrpc_line(&mut lines).await;
let (read_request, terminate_request) = match (first, second) {
(JSONRPCMessage::Request(first_request), JSONRPCMessage::Request(second_request))
if first_request.method == EXEC_READ_METHOD
&& second_request.method == EXEC_TERMINATE_METHOD =>
{
(first_request, second_request)
}
(JSONRPCMessage::Request(first_request), JSONRPCMessage::Request(second_request))
if first_request.method == EXEC_TERMINATE_METHOD
&& second_request.method == EXEC_READ_METHOD =>
{
(second_request, first_request)
}
_ => panic!("expected read and terminate requests"),
};
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: terminate_request.id,
result: serde_json::json!({ "running": false }),
}),
)
.await;
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: read_request.id,
result: serde_json::json!({
"chunks": [],
"nextSeq": 1,
"exited": false,
"exitCode": null,
}),
}),
)
.await;
});
let client = ExecServerClient::connect_stdio(client_stdin, client_stdout, test_options())
.await
.unwrap_or_else(|err| panic!("failed to connect test client: {err}"));
let (read, terminate) = tokio::join!(
client.read(ReadParams {
process_id: "proc-1".to_string(),
after_seq: None,
max_bytes: None,
wait_ms: Some(0),
}),
client.terminate("proc-1"),
);
let read = read.unwrap_or_else(|err| panic!("read failed: {err}"));
let terminate = terminate.unwrap_or_else(|err| panic!("terminate failed: {err}"));
assert_eq!(read.next_seq, 1);
assert!(!read.exited);
assert!(!terminate.running);
if let Err(err) = server.await {
panic!("server task failed: {err}");
}
}
#[tokio::test]
async fn connect_in_process_starts_processes_without_jsonrpc_transport() {
let client = match ExecServerClient::connect_in_process(test_options()).await {
Ok(client) => client,
Err(err) => panic!("failed to connect in-process client: {err}"),
};
let process = match client
.start_process(ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["printf".to_string(), "hello".to_string()],
cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")),
env: HashMap::new(),
tty: false,
arg0: None,
sandbox: None,
})
.await
{
Ok(process) => process,
Err(err) => panic!("failed to start in-process child: {err}"),
};
let mut output = process.output_receiver();
let output = timeout(Duration::from_secs(1), output.recv())
.await
.unwrap_or_else(|err| panic!("timed out waiting for process output: {err}"))
.unwrap_or_else(|err| panic!("failed to receive process output: {err}"));
assert_eq!(
output,
ExecServerOutput {
stream: crate::protocol::ExecOutputStream::Stdout,
chunk: b"hello".to_vec(),
}
);
}
#[tokio::test]
async fn connect_in_process_read_returns_retained_output_and_exit_state() {
let client = match ExecServerClient::connect_in_process(test_options()).await {
Ok(client) => client,
Err(err) => panic!("failed to connect in-process client: {err}"),
};
let response = match client
.exec(ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["printf".to_string(), "hello".to_string()],
cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")),
env: HashMap::new(),
tty: false,
arg0: None,
sandbox: None,
})
.await
{
Ok(response) => response,
Err(err) => panic!("failed to start in-process child: {err}"),
};
let process_id = response.process_id.clone();
let read = match client
.read(ReadParams {
process_id: process_id.clone(),
after_seq: None,
max_bytes: None,
wait_ms: Some(1000),
})
.await
{
Ok(read) => read,
Err(err) => panic!("failed to read in-process child output: {err}"),
};
assert_eq!(read.chunks.len(), 1);
assert_eq!(read.chunks[0].seq, 1);
assert_eq!(read.chunks[0].stream, ExecOutputStream::Stdout);
assert_eq!(read.chunks[0].chunk.clone().into_inner(), b"hello".to_vec());
assert_eq!(read.next_seq, 2);
let read = if read.exited {
read
} else {
match client
.read(ReadParams {
process_id,
after_seq: Some(read.next_seq - 1),
max_bytes: None,
wait_ms: Some(1000),
})
.await
{
Ok(read) => read,
Err(err) => panic!("failed to wait for in-process child exit: {err}"),
}
};
assert!(read.exited);
assert_eq!(read.exit_code, Some(0));
}
#[tokio::test]
async fn connect_in_process_rejects_invalid_exec_params_from_handler() {
let client = match ExecServerClient::connect_in_process(test_options()).await {
Ok(client) => client,
Err(err) => panic!("failed to connect in-process client: {err}"),
};
let result = client
.start_process(ExecParams {
process_id: "proc-1".to_string(),
argv: Vec::new(),
cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")),
env: HashMap::new(),
tty: false,
arg0: None,
sandbox: None,
})
.await;
match result {
Err(ExecServerError::Server { code, message }) => {
assert_eq!(code, -32602);
assert_eq!(message, "argv must not be empty");
}
Err(err) => panic!("unexpected in-process exec failure: {err}"),
Ok(_) => panic!("expected invalid params error"),
}
}
#[tokio::test]
async fn connect_in_process_rejects_writes_to_unknown_processes() {
let client = match ExecServerClient::connect_in_process(test_options()).await {
Ok(client) => client,
Err(err) => panic!("failed to connect in-process client: {err}"),
};
let result = client.write("missing", b"input".to_vec()).await;
match result {
Err(ExecServerError::Server { code, message }) => {
assert_eq!(code, -32600);
assert_eq!(message, "unknown process id missing");
}
Err(err) => panic!("unexpected in-process write failure: {err}"),
Ok(_) => panic!("expected unknown process error"),
}
}
#[tokio::test]
async fn connect_in_process_terminate_marks_process_exited() {
let client = match ExecServerClient::connect_in_process(test_options()).await {
Ok(client) => client,
Err(err) => panic!("failed to connect in-process client: {err}"),
};
let process = match client
.start_process(ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["sleep".to_string(), "30".to_string()],
cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")),
env: HashMap::new(),
tty: false,
arg0: None,
sandbox: None,
})
.await
{
Ok(process) => process,
Err(err) => panic!("failed to start in-process child: {err}"),
};
if let Err(err) = client.terminate(&process.process_id).await {
panic!("failed to terminate in-process child: {err}");
}
timeout(Duration::from_secs(2), async {
loop {
if process.has_exited() {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap_or_else(|err| panic!("timed out waiting for in-process child to exit: {err}"));
assert!(process.has_exited());
}
#[tokio::test]
async fn dropping_in_process_client_terminates_running_processes() {
let marker_path = std::env::temp_dir().join(format!(
"codex-exec-server-inprocess-drop-{}-{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("system time")
.as_nanos()
));
let _ = std::fs::remove_file(&marker_path);
{
let client = match ExecServerClient::connect_in_process(test_options()).await {
Ok(client) => client,
Err(err) => panic!("failed to connect in-process client: {err}"),
};
let _ = client
.exec(ExecParams {
process_id: "proc-1".to_string(),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
format!("sleep 2; printf dropped > {}", marker_path.display()),
],
cwd: std::env::current_dir().expect("cwd"),
env: HashMap::new(),
tty: false,
arg0: None,
sandbox: None,
})
.await
.unwrap_or_else(|err| panic!("failed to start in-process child: {err}"));
}
tokio::time::sleep(Duration::from_secs(3)).await;
assert!(
!marker_path.exists(),
"dropping the in-process client should terminate managed children"
);
let _ = std::fs::remove_file(&marker_path);
}
#[tokio::test]
async fn connect_stdio_returns_initialize_errors() {
let (client_stdin, server_reader) = tokio::io::duplex(4096);
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
let initialize = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(request) = initialize else {
panic!("expected initialize request");
};
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Error(JSONRPCError {
id: request.id,
error: JSONRPCErrorError {
code: -32600,
message: "rejected".to_string(),
data: None,
},
}),
)
.await;
});
let result = ExecServerClient::connect_stdio(client_stdin, client_stdout, test_options()).await;
match result {
Err(ExecServerError::Server { code, message }) => {
assert_eq!(code, -32600);
assert_eq!(message, "rejected");
}
Err(err) => panic!("unexpected initialize failure: {err}"),
Ok(_) => panic!("expected initialize failure"),
}
}
#[tokio::test]
async fn start_process_cleans_up_registered_process_after_request_error() {
let (client_stdin, server_reader) = tokio::io::duplex(4096);
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
let initialize = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(initialize_request) = initialize else {
panic!("expected initialize request");
};
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: initialize_request.id,
result: serde_json::json!({ "protocolVersion": PROTOCOL_VERSION }),
}),
)
.await;
let initialized = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Notification(notification) = initialized else {
panic!("expected initialized notification");
};
assert_eq!(notification.method, INITIALIZED_METHOD);
let exec_request = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(JSONRPCRequest { id, method, .. }) = exec_request else {
panic!("expected exec request");
};
assert_eq!(method, EXEC_METHOD);
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Error(JSONRPCError {
id,
error: JSONRPCErrorError {
code: -32600,
message: "duplicate process".to_string(),
data: None,
},
}),
)
.await;
});
let client =
match ExecServerClient::connect_stdio(client_stdin, client_stdout, test_options()).await {
Ok(client) => client,
Err(err) => panic!("failed to connect test client: {err}"),
};
let result = client
.start_process(ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")),
env: HashMap::new(),
tty: true,
arg0: None,
sandbox: None,
})
.await;
match result {
Err(ExecServerError::Server { code, message }) => {
assert_eq!(code, -32600);
assert_eq!(message, "duplicate process");
}
Err(err) => panic!("unexpected start_process failure: {err}"),
Ok(_) => panic!("expected start_process failure"),
}
assert!(
client.pending_request_count().await == 0,
"failed requests should not leave pending request state behind"
);
}
#[tokio::test]
async fn connect_stdio_times_out_during_initialize_handshake() {
let (client_stdin, server_reader) = tokio::io::duplex(4096);
let (_server_writer, client_stdout) = tokio::io::duplex(4096);
tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
let _ = read_jsonrpc_line(&mut lines).await;
tokio::time::sleep(Duration::from_millis(200)).await;
});
let result = ExecServerClient::connect_stdio(
client_stdin,
client_stdout,
ExecServerClientConnectOptions {
client_name: "test-client".to_string(),
initialize_timeout: Duration::from_millis(25),
},
)
.await;
match result {
Err(ExecServerError::InitializeTimedOut { timeout }) => {
assert_eq!(timeout, Duration::from_millis(25));
}
Err(err) => panic!("unexpected initialize timeout failure: {err}"),
Ok(_) => panic!("expected initialize timeout"),
}
}
#[tokio::test]
async fn start_process_preserves_output_stream_metadata() {
let (client_stdin, server_reader) = tokio::io::duplex(4096);
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
let initialize = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(initialize_request) = initialize else {
panic!("expected initialize request");
};
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: initialize_request.id,
result: serde_json::json!({ "protocolVersion": PROTOCOL_VERSION }),
}),
)
.await;
let initialized = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Notification(notification) = initialized else {
panic!("expected initialized notification");
};
assert_eq!(notification.method, INITIALIZED_METHOD);
let exec_request = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(JSONRPCRequest { id, method, .. }) = exec_request else {
panic!("expected exec request");
};
assert_eq!(method, EXEC_METHOD);
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id,
result: serde_json::json!({ "processId": "proc-1" }),
}),
)
.await;
tokio::time::sleep(Duration::from_millis(25)).await;
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Notification(JSONRPCNotification {
method: EXEC_OUTPUT_DELTA_METHOD.to_string(),
params: Some(serde_json::json!({
"processId": "proc-1",
"stream": "stderr",
"chunk": "ZXJyb3IK"
})),
}),
)
.await;
tokio::time::sleep(Duration::from_millis(100)).await;
});
let client =
match ExecServerClient::connect_stdio(client_stdin, client_stdout, test_options()).await {
Ok(client) => client,
Err(err) => panic!("failed to connect test client: {err}"),
};
let process = match client
.start_process(ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")),
env: HashMap::new(),
tty: true,
arg0: None,
sandbox: None,
})
.await
{
Ok(process) => process,
Err(err) => panic!("failed to start process: {err}"),
};
let mut output = process.output_receiver();
let output = timeout(Duration::from_secs(1), output.recv())
.await
.unwrap_or_else(|err| panic!("timed out waiting for process output: {err}"))
.unwrap_or_else(|err| panic!("failed to receive process output: {err}"));
assert_eq!(output.stream, ExecOutputStream::Stderr);
assert_eq!(output.chunk, b"error\n".to_vec());
}
#[tokio::test]
async fn terminate_does_not_mark_process_exited_before_exit_notification() {
let (client_stdin, server_reader) = tokio::io::duplex(4096);
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
let initialize = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(initialize_request) = initialize else {
panic!("expected initialize request");
};
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: initialize_request.id,
result: serde_json::json!({ "protocolVersion": PROTOCOL_VERSION }),
}),
)
.await;
let initialized = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Notification(notification) = initialized else {
panic!("expected initialized notification");
};
assert_eq!(notification.method, INITIALIZED_METHOD);
let exec_request = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(JSONRPCRequest { id, method, .. }) = exec_request else {
panic!("expected exec request");
};
assert_eq!(method, EXEC_METHOD);
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id,
result: serde_json::json!({ "processId": "proc-1" }),
}),
)
.await;
let terminate_request = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(JSONRPCRequest { id, method, .. }) = terminate_request else {
panic!("expected terminate request");
};
assert_eq!(method, EXEC_TERMINATE_METHOD);
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id,
result: serde_json::json!({ "running": true }),
}),
)
.await;
tokio::time::sleep(Duration::from_millis(100)).await;
});
let client =
match ExecServerClient::connect_stdio(client_stdin, client_stdout, test_options()).await {
Ok(client) => client,
Err(err) => panic!("failed to connect test client: {err}"),
};
let process = match client
.start_process(ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")),
env: HashMap::new(),
tty: true,
arg0: None,
sandbox: None,
})
.await
{
Ok(process) => process,
Err(err) => panic!("failed to start process: {err}"),
};
process.terminate();
tokio::time::sleep(Duration::from_millis(25)).await;
assert!(!process.has_exited(), "terminate should not imply exit");
assert_eq!(process.exit_code(), None);
}
#[tokio::test]
async fn start_process_uses_protocol_process_ids() {
let (client_stdin, server_reader) = tokio::io::duplex(4096);
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
let initialize = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(initialize_request) = initialize else {
panic!("expected initialize request");
};
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: initialize_request.id,
result: serde_json::json!({ "protocolVersion": PROTOCOL_VERSION }),
}),
)
.await;
let initialized = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Notification(notification) = initialized else {
panic!("expected initialized notification");
};
assert_eq!(notification.method, INITIALIZED_METHOD);
let exec_request = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(JSONRPCRequest { id, method, .. }) = exec_request else {
panic!("expected exec request");
};
assert_eq!(method, EXEC_METHOD);
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id,
result: serde_json::json!({ "processId": "other-proc" }),
}),
)
.await;
});
let client =
match ExecServerClient::connect_stdio(client_stdin, client_stdout, test_options()).await {
Ok(client) => client,
Err(err) => panic!("failed to connect test client: {err}"),
};
let process = match client
.start_process(ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")),
env: HashMap::new(),
tty: true,
arg0: None,
sandbox: None,
})
.await
{
Ok(process) => process,
Err(err) => panic!("failed to start process: {err}"),
};
assert_eq!(process.process_id, "other-proc");
}
#[tokio::test]
async fn start_process_routes_output_for_protocol_process_ids() {
let (client_stdin, server_reader) = tokio::io::duplex(4096);
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
let initialize = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(initialize_request) = initialize else {
panic!("expected initialize request");
};
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: initialize_request.id,
result: serde_json::json!({ "protocolVersion": PROTOCOL_VERSION }),
}),
)
.await;
let initialized = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Notification(notification) = initialized else {
panic!("expected initialized notification");
};
assert_eq!(notification.method, INITIALIZED_METHOD);
let exec_request = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(JSONRPCRequest { id, method, .. }) = exec_request else {
panic!("expected exec request");
};
assert_eq!(method, EXEC_METHOD);
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id,
result: serde_json::json!({ "processId": "proc-1" }),
}),
)
.await;
tokio::time::sleep(Duration::from_millis(25)).await;
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Notification(JSONRPCNotification {
method: EXEC_OUTPUT_DELTA_METHOD.to_string(),
params: Some(serde_json::json!({
"processId": "proc-1",
"stream": "stdout",
"chunk": "YWxpdmUK"
})),
}),
)
.await;
});
let client =
match ExecServerClient::connect_stdio(client_stdin, client_stdout, test_options()).await {
Ok(client) => client,
Err(err) => panic!("failed to connect test client: {err}"),
};
let first_process = match client
.start_process(ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")),
env: HashMap::new(),
tty: true,
arg0: None,
sandbox: None,
})
.await
{
Ok(process) => process,
Err(err) => panic!("failed to start first process: {err}"),
};
let mut output = first_process.output_receiver();
let output = timeout(Duration::from_secs(1), output.recv())
.await
.unwrap_or_else(|err| panic!("timed out waiting for process output: {err}"))
.unwrap_or_else(|err| panic!("failed to receive process output: {err}"));
assert_eq!(output.stream, ExecOutputStream::Stdout);
assert_eq!(output.chunk, b"alive\n".to_vec());
}
#[tokio::test]
async fn transport_shutdown_marks_processes_exited_without_exit_codes() {
let (client_stdin, server_reader) = tokio::io::duplex(4096);
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
let initialize = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(initialize_request) = initialize else {
panic!("expected initialize request");
};
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: initialize_request.id,
result: serde_json::json!({ "protocolVersion": PROTOCOL_VERSION }),
}),
)
.await;
let initialized = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Notification(notification) = initialized else {
panic!("expected initialized notification");
};
assert_eq!(notification.method, INITIALIZED_METHOD);
let exec_request = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(JSONRPCRequest { id, method, .. }) = exec_request else {
panic!("expected exec request");
};
assert_eq!(method, EXEC_METHOD);
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id,
result: serde_json::json!({ "processId": "proc-1" }),
}),
)
.await;
drop(server_writer);
});
let client =
match ExecServerClient::connect_stdio(client_stdin, client_stdout, test_options()).await {
Ok(client) => client,
Err(err) => panic!("failed to connect test client: {err}"),
};
let process = match client
.start_process(ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")),
env: HashMap::new(),
tty: true,
arg0: None,
sandbox: None,
})
.await
{
Ok(process) => process,
Err(err) => panic!("failed to start process: {err}"),
};
let _ = process;
}

View File

@@ -0,0 +1,27 @@
use std::time::Duration;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
/// Connection options for any exec-server client transport.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecServerClientConnectOptions {
pub client_name: String,
pub initialize_timeout: Duration,
}
/// WebSocket connection arguments for a remote exec-server.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RemoteExecServerConnectArgs {
pub websocket_url: String,
pub client_name: String,
pub connect_timeout: Duration,
pub initialize_timeout: Duration,
}
/// Connection-level server events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecServerEvent {
OutputDelta(ExecOutputDeltaNotification),
Exited(ExecExitedNotification),
}

View File

@@ -0,0 +1,421 @@
use codex_app_server_protocol::JSONRPCMessage;
use futures::SinkExt;
use futures::StreamExt;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::io::BufWriter;
use tokio::sync::mpsc;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::tungstenite::Message;
pub(crate) const CHANNEL_CAPACITY: usize = 128;
#[derive(Debug)]
pub(crate) enum JsonRpcConnectionEvent {
Message(JSONRPCMessage),
Disconnected { reason: Option<String> },
}
pub(crate) struct JsonRpcConnection {
outgoing_tx: mpsc::Sender<JSONRPCMessage>,
incoming_rx: mpsc::Receiver<JsonRpcConnectionEvent>,
task_handles: Vec<tokio::task::JoinHandle<()>>,
}
impl JsonRpcConnection {
pub(crate) fn from_stdio<R, W>(reader: R, writer: W, connection_label: String) -> Self
where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
{
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (incoming_tx, incoming_rx) = mpsc::channel(CHANNEL_CAPACITY);
let reader_label = connection_label.clone();
let incoming_tx_for_reader = incoming_tx.clone();
let reader_task = tokio::spawn(async move {
let mut lines = BufReader::new(reader).lines();
loop {
match lines.next_line().await {
Ok(Some(line)) => {
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<JSONRPCMessage>(&line) {
Ok(message) => {
if incoming_tx_for_reader
.send(JsonRpcConnectionEvent::Message(message))
.await
.is_err()
{
break;
}
}
Err(err) => {
send_disconnected(
&incoming_tx_for_reader,
Some(format!(
"failed to parse JSON-RPC message from {reader_label}: {err}"
)),
)
.await;
break;
}
}
}
Ok(None) => {
send_disconnected(&incoming_tx_for_reader, /*reason*/ None).await;
break;
}
Err(err) => {
send_disconnected(
&incoming_tx_for_reader,
Some(format!(
"failed to read JSON-RPC message from {reader_label}: {err}"
)),
)
.await;
break;
}
}
}
});
let writer_task = tokio::spawn(async move {
let mut writer = BufWriter::new(writer);
while let Some(message) = outgoing_rx.recv().await {
if let Err(err) = write_jsonrpc_line_message(&mut writer, &message).await {
send_disconnected(
&incoming_tx,
Some(format!(
"failed to write JSON-RPC message to {connection_label}: {err}"
)),
)
.await;
break;
}
}
});
Self {
outgoing_tx,
incoming_rx,
task_handles: vec![reader_task, writer_task],
}
}
pub(crate) fn from_websocket<S>(stream: WebSocketStream<S>, connection_label: String) -> Self
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (incoming_tx, incoming_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (mut websocket_writer, mut websocket_reader) = stream.split();
let reader_label = connection_label.clone();
let incoming_tx_for_reader = incoming_tx.clone();
let reader_task = tokio::spawn(async move {
loop {
match websocket_reader.next().await {
Some(Ok(Message::Text(text))) => {
match serde_json::from_str::<JSONRPCMessage>(text.as_ref()) {
Ok(message) => {
if incoming_tx_for_reader
.send(JsonRpcConnectionEvent::Message(message))
.await
.is_err()
{
break;
}
}
Err(err) => {
send_disconnected(
&incoming_tx_for_reader,
Some(format!(
"failed to parse websocket JSON-RPC message from {reader_label}: {err}"
)),
)
.await;
break;
}
}
}
Some(Ok(Message::Binary(bytes))) => {
match serde_json::from_slice::<JSONRPCMessage>(bytes.as_ref()) {
Ok(message) => {
if incoming_tx_for_reader
.send(JsonRpcConnectionEvent::Message(message))
.await
.is_err()
{
break;
}
}
Err(err) => {
send_disconnected(
&incoming_tx_for_reader,
Some(format!(
"failed to parse websocket JSON-RPC message from {reader_label}: {err}"
)),
)
.await;
break;
}
}
}
Some(Ok(Message::Close(_))) => {
send_disconnected(&incoming_tx_for_reader, /*reason*/ None).await;
break;
}
Some(Ok(Message::Ping(_))) | Some(Ok(Message::Pong(_))) => {}
Some(Ok(_)) => {}
Some(Err(err)) => {
send_disconnected(
&incoming_tx_for_reader,
Some(format!(
"failed to read websocket JSON-RPC message from {reader_label}: {err}"
)),
)
.await;
break;
}
None => {
send_disconnected(&incoming_tx_for_reader, /*reason*/ None).await;
break;
}
}
}
});
let writer_task = tokio::spawn(async move {
while let Some(message) = outgoing_rx.recv().await {
match serialize_jsonrpc_message(&message) {
Ok(encoded) => {
if let Err(err) = websocket_writer.send(Message::Text(encoded.into())).await
{
send_disconnected(
&incoming_tx,
Some(format!(
"failed to write websocket JSON-RPC message to {connection_label}: {err}"
)),
)
.await;
break;
}
}
Err(err) => {
send_disconnected(
&incoming_tx,
Some(format!(
"failed to serialize JSON-RPC message for {connection_label}: {err}"
)),
)
.await;
break;
}
}
}
});
Self {
outgoing_tx,
incoming_rx,
task_handles: vec![reader_task, writer_task],
}
}
pub(crate) fn into_parts(
self,
) -> (
mpsc::Sender<JSONRPCMessage>,
mpsc::Receiver<JsonRpcConnectionEvent>,
Vec<tokio::task::JoinHandle<()>>,
) {
(self.outgoing_tx, self.incoming_rx, self.task_handles)
}
}
async fn send_disconnected(
incoming_tx: &mpsc::Sender<JsonRpcConnectionEvent>,
reason: Option<String>,
) {
let _ = incoming_tx
.send(JsonRpcConnectionEvent::Disconnected { reason })
.await;
}
async fn write_jsonrpc_line_message<W>(
writer: &mut BufWriter<W>,
message: &JSONRPCMessage,
) -> std::io::Result<()>
where
W: AsyncWrite + Unpin,
{
let encoded =
serialize_jsonrpc_message(message).map_err(|err| std::io::Error::other(err.to_string()))?;
writer.write_all(encoded.as_bytes()).await?;
writer.write_all(b"\n").await?;
writer.flush().await
}
fn serialize_jsonrpc_message(message: &JSONRPCMessage) -> Result<String, serde_json::Error> {
serde_json::to_string(message)
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use pretty_assertions::assert_eq;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::sync::mpsc;
use tokio::time::timeout;
use super::JsonRpcConnection;
use super::JsonRpcConnectionEvent;
use super::serialize_jsonrpc_message;
async fn recv_event(
incoming_rx: &mut mpsc::Receiver<JsonRpcConnectionEvent>,
) -> JsonRpcConnectionEvent {
let recv_result = timeout(Duration::from_secs(1), incoming_rx.recv()).await;
let maybe_event = match recv_result {
Ok(maybe_event) => maybe_event,
Err(err) => panic!("timed out waiting for connection event: {err}"),
};
match maybe_event {
Some(event) => event,
None => panic!("connection event stream ended unexpectedly"),
}
}
async fn read_jsonrpc_line<R>(lines: &mut tokio::io::Lines<BufReader<R>>) -> JSONRPCMessage
where
R: tokio::io::AsyncRead + Unpin,
{
let next_line = timeout(Duration::from_secs(1), lines.next_line()).await;
let line_result = match next_line {
Ok(line_result) => line_result,
Err(err) => panic!("timed out waiting for JSON-RPC line: {err}"),
};
let maybe_line = match line_result {
Ok(maybe_line) => maybe_line,
Err(err) => panic!("failed to read JSON-RPC line: {err}"),
};
let line = match maybe_line {
Some(line) => line,
None => panic!("connection closed before JSON-RPC line arrived"),
};
match serde_json::from_str::<JSONRPCMessage>(&line) {
Ok(message) => message,
Err(err) => panic!("failed to parse JSON-RPC line: {err}"),
}
}
#[tokio::test]
async fn stdio_connection_reads_and_writes_jsonrpc_messages() {
let (mut writer_to_connection, connection_reader) = tokio::io::duplex(1024);
let (connection_writer, reader_from_connection) = tokio::io::duplex(1024);
let connection =
JsonRpcConnection::from_stdio(connection_reader, connection_writer, "test".to_string());
let (outgoing_tx, mut incoming_rx, _task_handles) = connection.into_parts();
let incoming_message = JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(7),
method: "initialize".to_string(),
params: Some(serde_json::json!({ "clientName": "test-client" })),
trace: None,
});
let encoded = match serialize_jsonrpc_message(&incoming_message) {
Ok(encoded) => encoded,
Err(err) => panic!("failed to serialize incoming message: {err}"),
};
if let Err(err) = writer_to_connection
.write_all(format!("{encoded}\n").as_bytes())
.await
{
panic!("failed to write to connection: {err}");
}
let event = recv_event(&mut incoming_rx).await;
match event {
JsonRpcConnectionEvent::Message(message) => {
assert_eq!(message, incoming_message);
}
JsonRpcConnectionEvent::Disconnected { reason } => {
panic!("unexpected disconnect event: {reason:?}");
}
}
let outgoing_message = JSONRPCMessage::Response(JSONRPCResponse {
id: RequestId::Integer(7),
result: serde_json::json!({ "protocolVersion": "exec-server.v0" }),
});
if let Err(err) = outgoing_tx.send(outgoing_message.clone()).await {
panic!("failed to queue outgoing message: {err}");
}
let mut lines = BufReader::new(reader_from_connection).lines();
let message = read_jsonrpc_line(&mut lines).await;
assert_eq!(message, outgoing_message);
}
#[tokio::test]
async fn stdio_connection_reports_parse_errors() {
let (mut writer_to_connection, connection_reader) = tokio::io::duplex(1024);
let (connection_writer, _reader_from_connection) = tokio::io::duplex(1024);
let connection =
JsonRpcConnection::from_stdio(connection_reader, connection_writer, "test".to_string());
let (_outgoing_tx, mut incoming_rx, _task_handles) = connection.into_parts();
if let Err(err) = writer_to_connection.write_all(b"not-json\n").await {
panic!("failed to write invalid JSON: {err}");
}
let event = recv_event(&mut incoming_rx).await;
match event {
JsonRpcConnectionEvent::Disconnected { reason } => {
let reason = match reason {
Some(reason) => reason,
None => panic!("expected a parse error reason"),
};
assert!(
reason.contains("failed to parse JSON-RPC message from test"),
"unexpected disconnect reason: {reason}"
);
}
JsonRpcConnectionEvent::Message(message) => {
panic!("unexpected JSON-RPC message: {message:?}");
}
}
}
#[tokio::test]
async fn stdio_connection_reports_clean_disconnect() {
let (writer_to_connection, connection_reader) = tokio::io::duplex(1024);
let (connection_writer, _reader_from_connection) = tokio::io::duplex(1024);
let connection =
JsonRpcConnection::from_stdio(connection_reader, connection_writer, "test".to_string());
let (_outgoing_tx, mut incoming_rx, _task_handles) = connection.into_parts();
drop(writer_to_connection);
let event = recv_event(&mut incoming_rx).await;
match event {
JsonRpcConnectionEvent::Disconnected { reason } => {
assert_eq!(reason, None);
}
JsonRpcConnectionEvent::Message(message) => {
panic!("unexpected JSON-RPC message: {message:?}");
}
}
}
}

View File

@@ -1,11 +1,31 @@
mod client;
mod client_api;
mod connection;
mod local;
mod protocol;
mod rpc;
mod server;
mod server_process;
pub use client::ExecServerClient;
pub use client::ExecServerError;
pub use client_api::ExecServerClientConnectOptions;
pub use client_api::ExecServerEvent;
pub use client_api::RemoteExecServerConnectArgs;
pub use local::ExecServerLaunchCommand;
pub use local::SpawnedExecServer;
pub use local::spawn_local_exec_server;
pub use protocol::ExecExitedNotification;
pub use protocol::ExecOutputDeltaNotification;
pub use protocol::ExecOutputStream;
pub use protocol::ExecParams;
pub use protocol::ExecResponse;
pub use protocol::InitializeParams;
pub use protocol::InitializeResponse;
pub use protocol::TerminateParams;
pub use protocol::TerminateResponse;
pub use protocol::WriteParams;
pub use protocol::WriteResponse;
pub use server::ExecServerTransport;
pub use server::ExecServerTransportParseError;
pub use server::run_main;
pub use server_process::ExecServerLaunchCommand;
pub use server::run_main_with_transport;

View File

@@ -0,0 +1,70 @@
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Mutex as StdMutex;
use tokio::process::Child;
use tokio::process::Command;
use crate::client::ExecServerClient;
use crate::client::ExecServerError;
use crate::client_api::ExecServerClientConnectOptions;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecServerLaunchCommand {
pub program: PathBuf,
pub args: Vec<String>,
}
pub struct SpawnedExecServer {
client: ExecServerClient,
child: StdMutex<Option<Child>>,
}
impl SpawnedExecServer {
pub fn client(&self) -> &ExecServerClient {
&self.client
}
}
impl Drop for SpawnedExecServer {
fn drop(&mut self) {
if let Ok(mut child_guard) = self.child.lock()
&& let Some(child) = child_guard.as_mut()
{
let _ = child.start_kill();
}
}
}
pub async fn spawn_local_exec_server(
command: ExecServerLaunchCommand,
options: ExecServerClientConnectOptions,
) -> Result<SpawnedExecServer, ExecServerError> {
let mut child = Command::new(&command.program);
child.args(&command.args);
child.stdin(Stdio::piped());
child.stdout(Stdio::piped());
child.stderr(Stdio::inherit());
child.kill_on_drop(true);
let mut child = child.spawn().map_err(ExecServerError::Spawn)?;
let stdin = child.stdin.take().ok_or_else(|| {
ExecServerError::Protocol("exec-server stdin was not captured".to_string())
})?;
let stdout = child.stdout.take().ok_or_else(|| {
ExecServerError::Protocol("exec-server stdout was not captured".to_string())
})?;
let client = match ExecServerClient::connect_stdio(stdin, stdout, options).await {
Ok(client) => client,
Err(err) => {
let _ = child.start_kill();
return Err(err);
}
};
Ok(SpawnedExecServer {
client,
child: StdMutex::new(Some(child)),
})
}

View File

@@ -1,10 +1,43 @@
use std::collections::HashMap;
use std::path::PathBuf;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use serde::Deserialize;
use serde::Serialize;
pub const INITIALIZE_METHOD: &str = "initialize";
pub const INITIALIZED_METHOD: &str = "initialized";
pub const EXEC_METHOD: &str = "process/start";
pub const EXEC_READ_METHOD: &str = "process/read";
pub const EXEC_WRITE_METHOD: &str = "process/write";
pub const EXEC_TERMINATE_METHOD: &str = "process/terminate";
pub const EXEC_OUTPUT_DELTA_METHOD: &str = "process/output";
pub const EXEC_EXITED_METHOD: &str = "process/exited";
pub const FS_READ_FILE_METHOD: &str = "fs/readFile";
pub const FS_WRITE_FILE_METHOD: &str = "fs/writeFile";
pub const FS_CREATE_DIRECTORY_METHOD: &str = "fs/createDirectory";
pub const FS_GET_METADATA_METHOD: &str = "fs/getMetadata";
pub const FS_READ_DIRECTORY_METHOD: &str = "fs/readDirectory";
pub const FS_REMOVE_METHOD: &str = "fs/remove";
pub const FS_COPY_METHOD: &str = "fs/copy";
pub const PROTOCOL_VERSION: &str = "exec-server.v0";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ByteChunk(#[serde(with = "base64_bytes")] pub Vec<u8>);
impl ByteChunk {
pub fn into_inner(self) -> Vec<u8> {
self.0
}
}
impl From<Vec<u8>> for ByteChunk {
fn from(value: Vec<u8>) -> Self {
Self(value)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InitializeParams {
@@ -16,3 +49,136 @@ pub struct InitializeParams {
pub struct InitializeResponse {
pub protocol_version: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExecParams {
/// Client-chosen logical process handle scoped to this connection/session.
/// This is a protocol key, not an OS pid.
pub process_id: String,
pub argv: Vec<String>,
pub cwd: PathBuf,
pub env: HashMap<String, String>,
pub tty: bool,
pub arg0: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sandbox: Option<ExecSandboxConfig>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExecSandboxConfig {
pub mode: ExecSandboxMode,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ExecSandboxMode {
None,
HostDefault,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExecResponse {
pub process_id: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReadParams {
pub process_id: String,
pub after_seq: Option<u64>,
pub max_bytes: Option<usize>,
pub wait_ms: Option<u64>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProcessOutputChunk {
pub seq: u64,
pub stream: ExecOutputStream,
pub chunk: ByteChunk,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReadResponse {
pub chunks: Vec<ProcessOutputChunk>,
pub next_seq: u64,
pub exited: bool,
pub exit_code: Option<i32>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WriteParams {
pub process_id: String,
pub chunk: ByteChunk,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WriteResponse {
pub accepted: bool,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TerminateParams {
pub process_id: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TerminateResponse {
pub running: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ExecOutputStream {
Stdout,
Stderr,
Pty,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExecOutputDeltaNotification {
pub process_id: String,
pub stream: ExecOutputStream,
pub chunk: ByteChunk,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExecExitedNotification {
pub process_id: String,
pub exit_code: i32,
}
mod base64_bytes {
use super::BASE64_STANDARD;
use base64::Engine as _;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serializer;
pub fn serialize<S>(bytes: &[u8], serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64_STANDARD.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let encoded = String::deserialize(deserializer)?;
BASE64_STANDARD
.decode(encoded)
.map_err(serde::de::Error::custom)
}
}

View File

@@ -0,0 +1,216 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde_json::Value;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tracing::warn;
use crate::connection::JsonRpcConnection;
use crate::connection::JsonRpcConnectionEvent;
type PendingRequest = oneshot::Sender<Result<Value, JSONRPCErrorError>>;
#[derive(Debug)]
pub(crate) enum RpcClientEvent {
Notification(JSONRPCNotification),
Disconnected { reason: Option<String> },
}
pub(crate) struct RpcClient {
write_tx: mpsc::Sender<JSONRPCMessage>,
pending: Arc<Mutex<HashMap<RequestId, PendingRequest>>>,
next_request_id: AtomicI64,
transport_tasks: Vec<JoinHandle<()>>,
reader_task: JoinHandle<()>,
}
impl RpcClient {
pub(crate) fn new(connection: JsonRpcConnection) -> (Self, mpsc::Receiver<RpcClientEvent>) {
let (write_tx, mut incoming_rx, transport_tasks) = connection.into_parts();
let pending = Arc::new(Mutex::new(HashMap::<RequestId, PendingRequest>::new()));
let (event_tx, event_rx) = mpsc::channel(128);
let pending_for_reader = Arc::clone(&pending);
let reader_task = tokio::spawn(async move {
while let Some(event) = incoming_rx.recv().await {
match event {
JsonRpcConnectionEvent::Message(message) => {
if let Err(err) =
handle_server_message(&pending_for_reader, &event_tx, message).await
{
warn!("JSON-RPC client closing after protocol error: {err}");
break;
}
}
JsonRpcConnectionEvent::Disconnected { reason } => {
let _ = event_tx.send(RpcClientEvent::Disconnected { reason }).await;
drain_pending(&pending_for_reader).await;
return;
}
}
}
let _ = event_tx
.send(RpcClientEvent::Disconnected { reason: None })
.await;
drain_pending(&pending_for_reader).await;
});
(
Self {
write_tx,
pending,
next_request_id: AtomicI64::new(1),
transport_tasks,
reader_task,
},
event_rx,
)
}
pub(crate) async fn notify<P: Serialize>(
&self,
method: &str,
params: &P,
) -> Result<(), serde_json::Error> {
let params = serde_json::to_value(params)?;
self.write_tx
.send(JSONRPCMessage::Notification(JSONRPCNotification {
method: method.to_string(),
params: Some(params),
}))
.await
.map_err(|_| {
serde_json::Error::io(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"JSON-RPC transport closed",
))
})
}
pub(crate) async fn call<P, T>(&self, method: &str, params: &P) -> Result<T, RpcCallError>
where
P: Serialize,
T: DeserializeOwned,
{
let request_id = RequestId::Integer(self.next_request_id.fetch_add(1, Ordering::SeqCst));
let (response_tx, response_rx) = oneshot::channel();
self.pending
.lock()
.await
.insert(request_id.clone(), response_tx);
let params = match serde_json::to_value(params) {
Ok(params) => params,
Err(err) => {
self.pending.lock().await.remove(&request_id);
return Err(RpcCallError::Json(err));
}
};
if self
.write_tx
.send(JSONRPCMessage::Request(JSONRPCRequest {
id: request_id.clone(),
method: method.to_string(),
params: Some(params),
trace: None,
}))
.await
.is_err()
{
self.pending.lock().await.remove(&request_id);
return Err(RpcCallError::Closed);
}
let result = response_rx.await.map_err(|_| RpcCallError::Closed)?;
let response = match result {
Ok(response) => response,
Err(error) => return Err(RpcCallError::Server(error)),
};
serde_json::from_value(response).map_err(RpcCallError::Json)
}
#[cfg(test)]
pub(crate) async fn pending_request_count(&self) -> usize {
self.pending.lock().await.len()
}
}
impl Drop for RpcClient {
fn drop(&mut self) {
for task in &self.transport_tasks {
task.abort();
}
self.reader_task.abort();
}
}
#[derive(Debug)]
pub(crate) enum RpcCallError {
Closed,
Json(serde_json::Error),
Server(JSONRPCErrorError),
}
async fn handle_server_message(
pending: &Mutex<HashMap<RequestId, PendingRequest>>,
event_tx: &mpsc::Sender<RpcClientEvent>,
message: JSONRPCMessage,
) -> Result<(), String> {
match message {
JSONRPCMessage::Response(JSONRPCResponse { id, result }) => {
if let Some(pending) = pending.lock().await.remove(&id) {
let _ = pending.send(Ok(result));
}
}
JSONRPCMessage::Error(JSONRPCError { id, error }) => {
if let Some(pending) = pending.lock().await.remove(&id) {
let _ = pending.send(Err(error));
}
}
JSONRPCMessage::Notification(notification) => {
let _ = event_tx
.send(RpcClientEvent::Notification(notification))
.await;
}
JSONRPCMessage::Request(request) => {
return Err(format!(
"unexpected JSON-RPC request from remote server: {}",
request.method
));
}
}
Ok(())
}
async fn drain_pending(pending: &Mutex<HashMap<RequestId, PendingRequest>>) {
let pending = {
let mut pending = pending.lock().await;
pending
.drain()
.map(|(_, pending)| pending)
.collect::<Vec<_>>()
};
for pending in pending {
let _ = pending.send(Err(JSONRPCErrorError {
code: -32000,
data: None,
message: "JSON-RPC transport closed".to_string(),
}));
}
}

View File

@@ -1,137 +1,21 @@
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
mod filesystem;
mod handler;
mod processor;
mod routing;
mod transport;
use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeResponse;
use crate::protocol::PROTOCOL_VERSION;
pub(crate) use handler::ExecServerHandler;
pub(crate) use routing::ExecServerOutboundMessage;
pub(crate) use routing::ExecServerServerNotification;
pub use transport::ExecServerTransport;
pub use transport::ExecServerTransportParseError;
pub async fn run_main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut stdin = BufReader::new(tokio::io::stdin()).lines();
let mut stdout = tokio::io::stdout();
while let Some(line) = stdin.next_line().await? {
if line.trim().is_empty() {
continue;
}
let message = serde_json::from_str::<JSONRPCMessage>(&line)?;
match message {
JSONRPCMessage::Request(request) => {
handle_request(request, &mut stdout).await?;
}
JSONRPCMessage::Notification(notification) => {
if notification.method != INITIALIZED_METHOD {
send_error(
&mut stdout,
RequestId::Integer(-1),
invalid_request(format!(
"unexpected notification method: {}",
notification.method
)),
)
.await?;
}
}
JSONRPCMessage::Response(response) => {
send_error(
&mut stdout,
response.id,
invalid_request("unexpected response from client".to_string()),
)
.await?;
}
JSONRPCMessage::Error(error) => {
send_error(
&mut stdout,
error.id,
invalid_request("unexpected error from client".to_string()),
)
.await?;
}
}
}
Ok(())
run_main_with_transport(ExecServerTransport::Stdio).await
}
async fn handle_request(
request: JSONRPCRequest,
stdout: &mut tokio::io::Stdout,
) -> Result<(), std::io::Error> {
match request.method.as_str() {
INITIALIZE_METHOD => {
let result = serde_json::to_value(InitializeResponse {
protocol_version: PROTOCOL_VERSION.to_string(),
})
.map_err(std::io::Error::other)?;
send_response(
stdout,
JSONRPCResponse {
id: request.id,
result,
},
)
.await
}
method => {
send_error(
stdout,
request.id,
method_not_implemented(format!(
"exec-server stub does not implement `{method}` yet"
)),
)
.await
}
}
}
async fn send_response(
stdout: &mut tokio::io::Stdout,
response: JSONRPCResponse,
) -> Result<(), std::io::Error> {
send_message(stdout, &JSONRPCMessage::Response(response)).await
}
async fn send_error(
stdout: &mut tokio::io::Stdout,
id: RequestId,
error: JSONRPCErrorError,
) -> Result<(), std::io::Error> {
send_message(stdout, &JSONRPCMessage::Error(JSONRPCError { id, error })).await
}
async fn send_message(
stdout: &mut tokio::io::Stdout,
message: &JSONRPCMessage,
) -> Result<(), std::io::Error> {
let encoded = serde_json::to_vec(message).map_err(std::io::Error::other)?;
stdout.write_all(&encoded).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await
}
fn invalid_request(message: String) -> JSONRPCErrorError {
JSONRPCErrorError {
code: -32600,
message,
data: None,
}
}
fn method_not_implemented(message: String) -> JSONRPCErrorError {
JSONRPCErrorError {
code: -32601,
message,
data: None,
}
pub async fn run_main_with_transport(
transport: ExecServerTransport,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
transport::run_transport(transport).await
}

View File

@@ -0,0 +1,170 @@
use std::io;
use std::sync::Arc;
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD;
use codex_app_server_protocol::FsCopyParams;
use codex_app_server_protocol::FsCopyResponse;
use codex_app_server_protocol::FsCreateDirectoryParams;
use codex_app_server_protocol::FsCreateDirectoryResponse;
use codex_app_server_protocol::FsGetMetadataParams;
use codex_app_server_protocol::FsGetMetadataResponse;
use codex_app_server_protocol::FsReadDirectoryEntry;
use codex_app_server_protocol::FsReadDirectoryParams;
use codex_app_server_protocol::FsReadDirectoryResponse;
use codex_app_server_protocol::FsReadFileParams;
use codex_app_server_protocol::FsReadFileResponse;
use codex_app_server_protocol::FsRemoveParams;
use codex_app_server_protocol::FsRemoveResponse;
use codex_app_server_protocol::FsWriteFileParams;
use codex_app_server_protocol::FsWriteFileResponse;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_environment::CopyOptions;
use codex_environment::CreateDirectoryOptions;
use codex_environment::Environment;
use codex_environment::ExecutorFileSystem;
use codex_environment::RemoveOptions;
use crate::server::routing::internal_error;
use crate::server::routing::invalid_request;
#[derive(Clone)]
pub(crate) struct ExecServerFileSystem {
file_system: Arc<dyn ExecutorFileSystem>,
}
impl Default for ExecServerFileSystem {
fn default() -> Self {
Self {
file_system: Arc::new(Environment.get_filesystem()),
}
}
}
impl ExecServerFileSystem {
pub(crate) async fn read_file(
&self,
params: FsReadFileParams,
) -> Result<FsReadFileResponse, JSONRPCErrorError> {
let bytes = self
.file_system
.read_file(&params.path)
.await
.map_err(map_fs_error)?;
Ok(FsReadFileResponse {
data_base64: STANDARD.encode(bytes),
})
}
pub(crate) async fn write_file(
&self,
params: FsWriteFileParams,
) -> Result<FsWriteFileResponse, JSONRPCErrorError> {
let bytes = STANDARD.decode(params.data_base64).map_err(|err| {
invalid_request(format!(
"fs/writeFile requires valid base64 dataBase64: {err}"
))
})?;
self.file_system
.write_file(&params.path, bytes)
.await
.map_err(map_fs_error)?;
Ok(FsWriteFileResponse {})
}
pub(crate) async fn create_directory(
&self,
params: FsCreateDirectoryParams,
) -> Result<FsCreateDirectoryResponse, JSONRPCErrorError> {
self.file_system
.create_directory(
&params.path,
CreateDirectoryOptions {
recursive: params.recursive.unwrap_or(true),
},
)
.await
.map_err(map_fs_error)?;
Ok(FsCreateDirectoryResponse {})
}
pub(crate) async fn get_metadata(
&self,
params: FsGetMetadataParams,
) -> Result<FsGetMetadataResponse, JSONRPCErrorError> {
let metadata = self
.file_system
.get_metadata(&params.path)
.await
.map_err(map_fs_error)?;
Ok(FsGetMetadataResponse {
is_directory: metadata.is_directory,
is_file: metadata.is_file,
created_at_ms: metadata.created_at_ms,
modified_at_ms: metadata.modified_at_ms,
})
}
pub(crate) async fn read_directory(
&self,
params: FsReadDirectoryParams,
) -> Result<FsReadDirectoryResponse, JSONRPCErrorError> {
let entries = self
.file_system
.read_directory(&params.path)
.await
.map_err(map_fs_error)?;
Ok(FsReadDirectoryResponse {
entries: entries
.into_iter()
.map(|entry| FsReadDirectoryEntry {
file_name: entry.file_name,
is_directory: entry.is_directory,
is_file: entry.is_file,
})
.collect(),
})
}
pub(crate) async fn remove(
&self,
params: FsRemoveParams,
) -> Result<FsRemoveResponse, JSONRPCErrorError> {
self.file_system
.remove(
&params.path,
RemoveOptions {
recursive: params.recursive.unwrap_or(true),
force: params.force.unwrap_or(true),
},
)
.await
.map_err(map_fs_error)?;
Ok(FsRemoveResponse {})
}
pub(crate) async fn copy(
&self,
params: FsCopyParams,
) -> Result<FsCopyResponse, JSONRPCErrorError> {
self.file_system
.copy(
&params.source_path,
&params.destination_path,
CopyOptions {
recursive: params.recursive,
},
)
.await
.map_err(map_fs_error)?;
Ok(FsCopyResponse {})
}
}
fn map_fs_error(err: io::Error) -> JSONRPCErrorError {
if err.kind() == io::ErrorKind::InvalidInput {
invalid_request(err.to_string())
} else {
internal_error(err.to_string())
}
}

View File

@@ -0,0 +1,630 @@
use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use codex_app_server_protocol::FsCopyParams;
use codex_app_server_protocol::FsCopyResponse;
use codex_app_server_protocol::FsCreateDirectoryParams;
use codex_app_server_protocol::FsCreateDirectoryResponse;
use codex_app_server_protocol::FsGetMetadataParams;
use codex_app_server_protocol::FsGetMetadataResponse;
use codex_app_server_protocol::FsReadDirectoryParams;
use codex_app_server_protocol::FsReadDirectoryResponse;
use codex_app_server_protocol::FsReadFileParams;
use codex_app_server_protocol::FsReadFileResponse;
use codex_app_server_protocol::FsRemoveParams;
use codex_app_server_protocol::FsRemoveResponse;
use codex_app_server_protocol::FsWriteFileParams;
use codex_app_server_protocol::FsWriteFileResponse;
use codex_utils_pty::ExecCommandSession;
use codex_utils_pty::TerminalSize;
use tokio::sync::Mutex;
use tokio::sync::Notify;
use tokio::sync::mpsc;
use tracing::warn;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
use crate::protocol::ExecOutputStream;
use crate::protocol::ExecResponse;
use crate::protocol::ExecSandboxMode;
use crate::protocol::InitializeResponse;
use crate::protocol::PROTOCOL_VERSION;
use crate::protocol::ProcessOutputChunk;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteResponse;
use crate::server::filesystem::ExecServerFileSystem;
use crate::server::routing::ExecServerClientNotification;
use crate::server::routing::ExecServerInboundMessage;
use crate::server::routing::ExecServerOutboundMessage;
use crate::server::routing::ExecServerRequest;
use crate::server::routing::ExecServerResponseMessage;
use crate::server::routing::ExecServerServerNotification;
use crate::server::routing::internal_error;
use crate::server::routing::invalid_params;
use crate::server::routing::invalid_request;
const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024;
#[cfg(test)]
const EXITED_PROCESS_RETENTION: Duration = Duration::from_millis(25);
#[cfg(not(test))]
const EXITED_PROCESS_RETENTION: Duration = Duration::from_secs(30);
#[derive(Clone)]
struct RetainedOutputChunk {
seq: u64,
stream: ExecOutputStream,
chunk: Vec<u8>,
}
struct RunningProcess {
session: ExecCommandSession,
tty: bool,
output: VecDeque<RetainedOutputChunk>,
retained_bytes: usize,
next_seq: u64,
exit_code: Option<i32>,
output_notify: Arc<Notify>,
}
enum ProcessEntry {
Starting,
Running(Box<RunningProcess>),
}
pub(crate) struct ExecServerHandler {
outbound_tx: mpsc::Sender<ExecServerOutboundMessage>,
file_system: ExecServerFileSystem,
// Keyed by client-chosen logical `processId` scoped to this connection.
// This is a protocol handle, not an OS pid.
processes: Arc<Mutex<HashMap<String, ProcessEntry>>>,
initialize_requested: AtomicBool,
initialized: AtomicBool,
}
impl ExecServerHandler {
pub(crate) fn new(outbound_tx: mpsc::Sender<ExecServerOutboundMessage>) -> Self {
Self {
outbound_tx,
file_system: ExecServerFileSystem::default(),
processes: Arc::new(Mutex::new(HashMap::new())),
initialize_requested: AtomicBool::new(false),
initialized: AtomicBool::new(false),
}
}
pub(crate) async fn shutdown(&self) {
let remaining = {
let mut processes = self.processes.lock().await;
processes
.drain()
.filter_map(|(_, process)| match process {
ProcessEntry::Starting => None,
ProcessEntry::Running(process) => Some(process),
})
.collect::<Vec<_>>()
};
for process in remaining {
process.session.terminate();
}
}
pub(crate) fn initialized(&self) -> Result<(), String> {
if !self.initialize_requested.load(Ordering::SeqCst) {
return Err("received `initialized` notification before `initialize`".into());
}
self.initialized.store(true, Ordering::SeqCst);
Ok(())
}
pub(crate) fn initialize(
&self,
) -> Result<InitializeResponse, codex_app_server_protocol::JSONRPCErrorError> {
if self.initialize_requested.swap(true, Ordering::SeqCst) {
return Err(invalid_request(
"initialize may only be sent once per connection".to_string(),
));
}
Ok(InitializeResponse {
protocol_version: PROTOCOL_VERSION.to_string(),
})
}
fn require_initialized(&self) -> Result<(), codex_app_server_protocol::JSONRPCErrorError> {
if !self.initialize_requested.load(Ordering::SeqCst) {
return Err(invalid_request(
"client must call initialize before using exec methods".to_string(),
));
}
if !self.initialized.load(Ordering::SeqCst) {
return Err(invalid_request(
"client must send initialized before using exec methods".to_string(),
));
}
Ok(())
}
pub(crate) async fn exec(
&self,
params: crate::protocol::ExecParams,
) -> Result<ExecResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
let process_id = params.process_id.clone();
if matches!(
params.sandbox.as_ref().map(|sandbox| sandbox.mode),
Some(ExecSandboxMode::HostDefault)
) {
return Err(invalid_request(
"sandbox mode `hostDefault` is not supported by exec-server yet".to_string(),
));
}
let (program, args) = params
.argv
.split_first()
.ok_or_else(|| invalid_params("argv must not be empty".to_string()))?;
{
let mut process_map = self.processes.lock().await;
if process_map.contains_key(&process_id) {
return Err(invalid_request(format!(
"process {process_id} already exists"
)));
}
process_map.insert(process_id.clone(), ProcessEntry::Starting);
}
let spawned_result = if params.tty {
codex_utils_pty::spawn_pty_process(
program,
args,
params.cwd.as_path(),
&params.env,
&params.arg0,
TerminalSize::default(),
)
.await
} else {
codex_utils_pty::spawn_pipe_process_no_stdin(
program,
args,
params.cwd.as_path(),
&params.env,
&params.arg0,
)
.await
};
let spawned = match spawned_result {
Ok(spawned) => spawned,
Err(err) => {
let mut process_map = self.processes.lock().await;
if matches!(process_map.get(&process_id), Some(ProcessEntry::Starting)) {
process_map.remove(&process_id);
}
return Err(internal_error(err.to_string()));
}
};
let output_notify = Arc::new(Notify::new());
{
let mut process_map = self.processes.lock().await;
process_map.insert(
process_id.clone(),
ProcessEntry::Running(Box::new(RunningProcess {
session: spawned.session,
tty: params.tty,
output: std::collections::VecDeque::new(),
retained_bytes: 0,
next_seq: 1,
exit_code: None,
output_notify: Arc::clone(&output_notify),
})),
);
}
tokio::spawn(stream_output(
process_id.clone(),
if params.tty {
ExecOutputStream::Pty
} else {
ExecOutputStream::Stdout
},
spawned.stdout_rx,
self.outbound_tx.clone(),
Arc::clone(&self.processes),
Arc::clone(&output_notify),
));
tokio::spawn(stream_output(
process_id.clone(),
if params.tty {
ExecOutputStream::Pty
} else {
ExecOutputStream::Stderr
},
spawned.stderr_rx,
self.outbound_tx.clone(),
Arc::clone(&self.processes),
Arc::clone(&output_notify),
));
tokio::spawn(watch_exit(
process_id.clone(),
spawned.exit_rx,
self.outbound_tx.clone(),
Arc::clone(&self.processes),
output_notify,
));
Ok(ExecResponse { process_id })
}
pub(crate) async fn fs_read_file(
&self,
params: FsReadFileParams,
) -> Result<FsReadFileResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
self.file_system.read_file(params).await
}
pub(crate) async fn fs_write_file(
&self,
params: FsWriteFileParams,
) -> Result<FsWriteFileResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
self.file_system.write_file(params).await
}
pub(crate) async fn fs_create_directory(
&self,
params: FsCreateDirectoryParams,
) -> Result<FsCreateDirectoryResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
self.file_system.create_directory(params).await
}
pub(crate) async fn fs_get_metadata(
&self,
params: FsGetMetadataParams,
) -> Result<FsGetMetadataResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
self.file_system.get_metadata(params).await
}
pub(crate) async fn fs_read_directory(
&self,
params: FsReadDirectoryParams,
) -> Result<FsReadDirectoryResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
self.file_system.read_directory(params).await
}
pub(crate) async fn fs_remove(
&self,
params: FsRemoveParams,
) -> Result<FsRemoveResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
self.file_system.remove(params).await
}
pub(crate) async fn fs_copy(
&self,
params: FsCopyParams,
) -> Result<FsCopyResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
self.file_system.copy(params).await
}
pub(crate) async fn exec_read(
&self,
params: crate::protocol::ReadParams,
) -> Result<ReadResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
let after_seq = params.after_seq.unwrap_or(0);
let max_bytes = params.max_bytes.unwrap_or(usize::MAX);
let wait = Duration::from_millis(params.wait_ms.unwrap_or(0));
let deadline = tokio::time::Instant::now() + wait;
loop {
let (response, output_notify) = {
let process_map = self.processes.lock().await;
let process = process_map.get(&params.process_id).ok_or_else(|| {
invalid_request(format!("unknown process id {}", params.process_id))
})?;
let ProcessEntry::Running(process) = process else {
return Err(invalid_request(format!(
"process id {} is starting",
params.process_id
)));
};
let mut chunks = Vec::new();
let mut total_bytes = 0;
let mut next_seq = process.next_seq;
for retained in process.output.iter().filter(|chunk| chunk.seq > after_seq) {
let chunk_len = retained.chunk.len();
if !chunks.is_empty() && total_bytes + chunk_len > max_bytes {
break;
}
total_bytes += chunk_len;
chunks.push(ProcessOutputChunk {
seq: retained.seq,
stream: retained.stream,
chunk: retained.chunk.clone().into(),
});
next_seq = retained.seq + 1;
if total_bytes >= max_bytes {
break;
}
}
(
ReadResponse {
chunks,
next_seq,
exited: process.exit_code.is_some(),
exit_code: process.exit_code,
},
Arc::clone(&process.output_notify),
)
};
if !response.chunks.is_empty()
|| response.exited
|| tokio::time::Instant::now() >= deadline
{
return Ok(response);
}
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return Ok(response);
}
let _ = tokio::time::timeout(remaining, output_notify.notified()).await;
}
}
pub(crate) async fn exec_write(
&self,
params: crate::protocol::WriteParams,
) -> Result<WriteResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
let writer_tx = {
let process_map = self.processes.lock().await;
let process = process_map.get(&params.process_id).ok_or_else(|| {
invalid_request(format!("unknown process id {}", params.process_id))
})?;
let ProcessEntry::Running(process) = process else {
return Err(invalid_request(format!(
"process id {} is starting",
params.process_id
)));
};
if !process.tty {
return Err(invalid_request(format!(
"stdin is closed for process {}",
params.process_id
)));
}
process.session.writer_sender()
};
writer_tx
.send(params.chunk.into_inner())
.await
.map_err(|_| internal_error("failed to write to process stdin".to_string()))?;
Ok(WriteResponse { accepted: true })
}
pub(crate) async fn terminate(
&self,
params: crate::protocol::TerminateParams,
) -> Result<TerminateResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
let running = {
let process_map = self.processes.lock().await;
match process_map.get(&params.process_id) {
Some(ProcessEntry::Running(process)) => {
process.session.terminate();
true
}
Some(ProcessEntry::Starting) | None => false,
}
};
Ok(TerminateResponse { running })
}
pub(crate) async fn handle_message(
&self,
message: ExecServerInboundMessage,
) -> Result<(), String> {
match message {
ExecServerInboundMessage::Request(request) => self.handle_request(request).await,
ExecServerInboundMessage::Notification(ExecServerClientNotification::Initialized) => {
self.initialized()
}
}
}
async fn handle_request(&self, request: ExecServerRequest) -> Result<(), String> {
let outbound = match request {
ExecServerRequest::Initialize { request_id, .. } => Self::request_outbound(
request_id,
self.initialize().map(ExecServerResponseMessage::Initialize),
),
ExecServerRequest::Exec { request_id, params } => Self::request_outbound(
request_id,
self.exec(params).await.map(ExecServerResponseMessage::Exec),
),
ExecServerRequest::Read { request_id, params } => Self::request_outbound(
request_id,
self.exec_read(params)
.await
.map(ExecServerResponseMessage::Read),
),
ExecServerRequest::Write { request_id, params } => Self::request_outbound(
request_id,
self.exec_write(params)
.await
.map(ExecServerResponseMessage::Write),
),
ExecServerRequest::Terminate { request_id, params } => Self::request_outbound(
request_id,
self.terminate(params)
.await
.map(ExecServerResponseMessage::Terminate),
),
ExecServerRequest::FsReadFile { request_id, params } => Self::request_outbound(
request_id,
self.fs_read_file(params)
.await
.map(ExecServerResponseMessage::FsReadFile),
),
ExecServerRequest::FsWriteFile { request_id, params } => Self::request_outbound(
request_id,
self.fs_write_file(params)
.await
.map(ExecServerResponseMessage::FsWriteFile),
),
ExecServerRequest::FsCreateDirectory { request_id, params } => Self::request_outbound(
request_id,
self.fs_create_directory(params)
.await
.map(ExecServerResponseMessage::FsCreateDirectory),
),
ExecServerRequest::FsGetMetadata { request_id, params } => Self::request_outbound(
request_id,
self.fs_get_metadata(params)
.await
.map(ExecServerResponseMessage::FsGetMetadata),
),
ExecServerRequest::FsReadDirectory { request_id, params } => Self::request_outbound(
request_id,
self.fs_read_directory(params)
.await
.map(ExecServerResponseMessage::FsReadDirectory),
),
ExecServerRequest::FsRemove { request_id, params } => Self::request_outbound(
request_id,
self.fs_remove(params)
.await
.map(ExecServerResponseMessage::FsRemove),
),
ExecServerRequest::FsCopy { request_id, params } => Self::request_outbound(
request_id,
self.fs_copy(params)
.await
.map(ExecServerResponseMessage::FsCopy),
),
};
self.outbound_tx
.send(outbound)
.await
.map_err(|_| "outbound channel closed".to_string())
}
fn request_outbound(
request_id: codex_app_server_protocol::RequestId,
result: Result<ExecServerResponseMessage, codex_app_server_protocol::JSONRPCErrorError>,
) -> ExecServerOutboundMessage {
match result {
Ok(response) => ExecServerOutboundMessage::Response {
request_id,
response,
},
Err(error) => ExecServerOutboundMessage::Error { request_id, error },
}
}
}
async fn stream_output(
process_id: String,
stream: ExecOutputStream,
mut receiver: tokio::sync::mpsc::Receiver<Vec<u8>>,
outbound_tx: mpsc::Sender<ExecServerOutboundMessage>,
processes: Arc<Mutex<HashMap<String, ProcessEntry>>>,
output_notify: Arc<Notify>,
) {
while let Some(chunk) = receiver.recv().await {
let notification = {
let mut processes = processes.lock().await;
let Some(entry) = processes.get_mut(&process_id) else {
break;
};
let ProcessEntry::Running(process) = entry else {
break;
};
let seq = process.next_seq;
process.next_seq += 1;
process.retained_bytes += chunk.len();
process.output.push_back(RetainedOutputChunk {
seq,
stream,
chunk: chunk.clone(),
});
while process.retained_bytes > RETAINED_OUTPUT_BYTES_PER_PROCESS {
let Some(evicted) = process.output.pop_front() else {
break;
};
process.retained_bytes = process.retained_bytes.saturating_sub(evicted.chunk.len());
warn!(
"retained output cap exceeded for process {process_id}; dropping oldest output"
);
}
ExecOutputDeltaNotification {
process_id: process_id.clone(),
stream,
chunk: chunk.into(),
}
};
output_notify.notify_waiters();
if outbound_tx
.send(ExecServerOutboundMessage::Notification(
ExecServerServerNotification::OutputDelta(notification),
))
.await
.is_err()
{
break;
}
}
}
async fn watch_exit(
process_id: String,
exit_rx: tokio::sync::oneshot::Receiver<i32>,
outbound_tx: mpsc::Sender<ExecServerOutboundMessage>,
processes: Arc<Mutex<HashMap<String, ProcessEntry>>>,
output_notify: Arc<Notify>,
) {
let exit_code = exit_rx.await.unwrap_or(-1);
{
let mut processes = processes.lock().await;
if let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) {
process.exit_code = Some(exit_code);
}
}
output_notify.notify_waiters();
let _ = outbound_tx
.send(ExecServerOutboundMessage::Notification(
ExecServerServerNotification::Exited(ExecExitedNotification {
process_id: process_id.clone(),
exit_code,
}),
))
.await;
tokio::spawn(async move {
tokio::time::sleep(EXITED_PROCESS_RETENTION).await;
let mut processes = processes.lock().await;
processes.remove(&process_id);
});
}
#[cfg(test)]
mod tests;

View File

@@ -0,0 +1,789 @@
use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use pretty_assertions::assert_eq;
use tokio::sync::Notify;
use tokio::time::timeout;
use super::ExecServerHandler;
use super::ProcessEntry;
use super::RetainedOutputChunk;
use super::RunningProcess;
use crate::protocol::ExecOutputStream;
use crate::protocol::ExecSandboxConfig;
use crate::protocol::ExecSandboxMode;
use crate::protocol::InitializeParams;
use crate::protocol::InitializeResponse;
use crate::protocol::PROTOCOL_VERSION;
use crate::protocol::ReadParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
use crate::server::routing::ExecServerClientNotification;
use crate::server::routing::ExecServerInboundMessage;
use crate::server::routing::ExecServerOutboundMessage;
use crate::server::routing::ExecServerRequest;
use crate::server::routing::ExecServerResponseMessage;
use codex_app_server_protocol::RequestId;
async fn recv_outbound(
outgoing_rx: &mut tokio::sync::mpsc::Receiver<ExecServerOutboundMessage>,
) -> ExecServerOutboundMessage {
let recv_result = timeout(Duration::from_secs(1), outgoing_rx.recv()).await;
let maybe_message = match recv_result {
Ok(maybe_message) => maybe_message,
Err(err) => panic!("timed out waiting for handler output: {err}"),
};
match maybe_message {
Some(message) => message,
None => panic!("handler output channel closed unexpectedly"),
}
}
#[tokio::test]
async fn initialize_response_reports_protocol_version() {
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(1);
let handler = ExecServerHandler::new(outgoing_tx);
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Initialize {
request_id: RequestId::Integer(1),
params: InitializeParams {
client_name: "test".to_string(),
},
},
))
.await
{
panic!("initialize should succeed: {err}");
}
assert_eq!(
recv_outbound(&mut outgoing_rx).await,
ExecServerOutboundMessage::Response {
request_id: RequestId::Integer(1),
response: ExecServerResponseMessage::Initialize(InitializeResponse {
protocol_version: PROTOCOL_VERSION.to_string(),
}),
}
);
}
#[tokio::test]
async fn exec_methods_require_initialize() {
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(1);
let handler = ExecServerHandler::new(outgoing_tx);
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec {
request_id: RequestId::Integer(7),
params: crate::protocol::ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
cwd: std::env::current_dir().expect("cwd"),
env: HashMap::new(),
tty: true,
arg0: None,
sandbox: None,
},
}))
.await
{
panic!("request handling should not fail the handler: {err}");
}
let ExecServerOutboundMessage::Error { request_id, error } =
recv_outbound(&mut outgoing_rx).await
else {
panic!("expected invalid-request error");
};
assert_eq!(request_id, RequestId::Integer(7));
assert_eq!(error.code, -32600);
assert_eq!(
error.message,
"client must call initialize before using exec methods"
);
}
#[tokio::test]
async fn exec_methods_require_initialized_notification_after_initialize() {
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(2);
let handler = ExecServerHandler::new(outgoing_tx);
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Initialize {
request_id: RequestId::Integer(1),
params: InitializeParams {
client_name: "test".to_string(),
},
},
))
.await
{
panic!("initialize should succeed: {err}");
}
let _ = recv_outbound(&mut outgoing_rx).await;
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec {
request_id: RequestId::Integer(2),
params: crate::protocol::ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
cwd: std::env::current_dir().expect("cwd"),
env: HashMap::new(),
tty: true,
arg0: None,
sandbox: None,
},
}))
.await
{
panic!("request handling should not fail the handler: {err}");
}
let ExecServerOutboundMessage::Error { request_id, error } =
recv_outbound(&mut outgoing_rx).await
else {
panic!("expected invalid-request error");
};
assert_eq!(request_id, RequestId::Integer(2));
assert_eq!(error.code, -32600);
assert_eq!(
error.message,
"client must send initialized before using exec methods"
);
}
#[tokio::test]
async fn initialized_before_initialize_is_a_protocol_error() {
let (outgoing_tx, _outgoing_rx) = tokio::sync::mpsc::channel(1);
let handler = ExecServerHandler::new(outgoing_tx);
let result = handler
.handle_message(ExecServerInboundMessage::Notification(
ExecServerClientNotification::Initialized,
))
.await;
match result {
Err(err) => {
assert_eq!(
err,
"received `initialized` notification before `initialize`"
);
}
Ok(()) => panic!("expected protocol error for early initialized notification"),
}
}
#[tokio::test]
async fn initialize_may_only_be_sent_once_per_connection() {
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(2);
let handler = ExecServerHandler::new(outgoing_tx);
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Initialize {
request_id: RequestId::Integer(1),
params: InitializeParams {
client_name: "test".to_string(),
},
},
))
.await
{
panic!("initialize should succeed: {err}");
}
let _ = recv_outbound(&mut outgoing_rx).await;
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Initialize {
request_id: RequestId::Integer(2),
params: InitializeParams {
client_name: "test".to_string(),
},
},
))
.await
{
panic!("duplicate initialize should not fail the handler: {err}");
}
let ExecServerOutboundMessage::Error { request_id, error } =
recv_outbound(&mut outgoing_rx).await
else {
panic!("expected invalid-request error");
};
assert_eq!(request_id, RequestId::Integer(2));
assert_eq!(error.code, -32600);
assert_eq!(
error.message,
"initialize may only be sent once per connection"
);
}
#[tokio::test]
async fn host_default_sandbox_requests_are_rejected_until_supported() {
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(3);
let handler = ExecServerHandler::new(outgoing_tx);
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Initialize {
request_id: RequestId::Integer(1),
params: InitializeParams {
client_name: "test".to_string(),
},
},
))
.await
{
panic!("initialize should succeed: {err}");
}
let _ = recv_outbound(&mut outgoing_rx).await;
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Notification(
ExecServerClientNotification::Initialized,
))
.await
{
panic!("initialized should succeed: {err}");
}
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec {
request_id: RequestId::Integer(2),
params: crate::protocol::ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
cwd: std::env::current_dir().expect("cwd"),
env: HashMap::new(),
tty: false,
arg0: None,
sandbox: Some(ExecSandboxConfig {
mode: ExecSandboxMode::HostDefault,
}),
},
}))
.await
{
panic!("request handling should not fail the handler: {err}");
}
let ExecServerOutboundMessage::Error { request_id, error } =
recv_outbound(&mut outgoing_rx).await
else {
panic!("expected unsupported sandbox error");
};
assert_eq!(request_id, RequestId::Integer(2));
assert_eq!(error.code, -32600);
assert_eq!(
error.message,
"sandbox mode `hostDefault` is not supported by exec-server yet"
);
}
#[tokio::test]
async fn exec_echoes_client_process_ids() {
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(4);
let handler = ExecServerHandler::new(outgoing_tx);
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Initialize {
request_id: RequestId::Integer(1),
params: InitializeParams {
client_name: "test".to_string(),
},
},
))
.await
{
panic!("initialize should succeed: {err}");
}
let _ = recv_outbound(&mut outgoing_rx).await;
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Notification(
ExecServerClientNotification::Initialized,
))
.await
{
panic!("initialized should succeed: {err}");
}
let params = crate::protocol::ExecParams {
process_id: "proc-1".to_string(),
argv: vec![
"bash".to_string(),
"-lc".to_string(),
"sleep 30".to_string(),
],
cwd: std::env::current_dir().expect("cwd"),
env: HashMap::new(),
tty: false,
arg0: None,
sandbox: None,
};
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec {
request_id: RequestId::Integer(2),
params: params.clone(),
}))
.await
{
panic!("first exec should succeed: {err}");
}
let ExecServerOutboundMessage::Response {
request_id,
response: ExecServerResponseMessage::Exec(first_exec),
} = recv_outbound(&mut outgoing_rx).await
else {
panic!("expected first exec response");
};
assert_eq!(request_id, RequestId::Integer(2));
assert_eq!(first_exec.process_id, "proc-1");
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec {
request_id: RequestId::Integer(3),
params: crate::protocol::ExecParams {
process_id: "proc-2".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
..params
},
}))
.await
{
panic!("second exec should succeed: {err}");
}
let ExecServerOutboundMessage::Response {
request_id,
response: ExecServerResponseMessage::Exec(second_exec),
} = recv_outbound(&mut outgoing_rx).await
else {
panic!("expected second exec response");
};
assert_eq!(request_id, RequestId::Integer(3));
assert_eq!(second_exec.process_id, "proc-2");
handler.shutdown().await;
}
#[tokio::test]
async fn concurrent_duplicate_execs_reserve_process_ids_atomically() {
let (outgoing_tx, _outgoing_rx) = tokio::sync::mpsc::channel(4);
let handler = Arc::new(ExecServerHandler::new(outgoing_tx));
let _ = handler.initialize().expect("initialize should succeed");
handler.initialized().expect("initialized should succeed");
let params = crate::protocol::ExecParams {
process_id: "proc-1".to_string(),
argv: vec![
"bash".to_string(),
"-lc".to_string(),
"sleep 30".to_string(),
],
cwd: std::env::current_dir().expect("cwd"),
env: HashMap::new(),
tty: false,
arg0: None,
sandbox: None,
};
let first = {
let handler = Arc::clone(&handler);
let params = params.clone();
tokio::spawn(async move { handler.exec(params).await })
};
let second = {
let handler = Arc::clone(&handler);
let params = params.clone();
tokio::spawn(async move { handler.exec(params).await })
};
let (first, second) = tokio::join!(first, second);
let results = [
first.expect("first task should complete"),
second.expect("second task should complete"),
];
let successes = results.iter().filter(|result| result.is_ok()).count();
let duplicate_errors = results
.iter()
.filter(|result| {
result
.as_ref()
.err()
.is_some_and(|err| err.message == "process proc-1 already exists")
})
.count();
assert_eq!(successes, 1);
assert_eq!(duplicate_errors, 1);
handler.shutdown().await;
}
#[tokio::test]
async fn writes_to_pipe_backed_processes_are_rejected() {
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(4);
let handler = ExecServerHandler::new(outgoing_tx);
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Initialize {
request_id: RequestId::Integer(1),
params: InitializeParams {
client_name: "test".to_string(),
},
},
))
.await
{
panic!("initialize should succeed: {err}");
}
let _ = recv_outbound(&mut outgoing_rx).await;
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Notification(
ExecServerClientNotification::Initialized,
))
.await
{
panic!("initialized should succeed: {err}");
}
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec {
request_id: RequestId::Integer(2),
params: crate::protocol::ExecParams {
process_id: "proc-1".to_string(),
argv: vec![
"bash".to_string(),
"-lc".to_string(),
"sleep 30".to_string(),
],
cwd: std::env::current_dir().expect("cwd"),
env: HashMap::new(),
tty: false,
arg0: None,
sandbox: None,
},
}))
.await
{
panic!("exec should succeed: {err}");
}
let ExecServerOutboundMessage::Response {
response: ExecServerResponseMessage::Exec(exec_response),
..
} = recv_outbound(&mut outgoing_rx).await
else {
panic!("expected exec response");
};
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Write {
request_id: RequestId::Integer(3),
params: WriteParams {
process_id: exec_response.process_id,
chunk: b"hello\n".to_vec().into(),
},
},
))
.await
{
panic!("write should not fail the handler: {err}");
}
let ExecServerOutboundMessage::Error { request_id, error } =
recv_outbound(&mut outgoing_rx).await
else {
panic!("expected stdin-closed error");
};
assert_eq!(request_id, RequestId::Integer(3));
assert_eq!(error.code, -32600);
assert_eq!(error.message, "stdin is closed for process proc-1");
handler.shutdown().await;
}
#[tokio::test]
async fn writes_to_unknown_processes_are_rejected() {
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(2);
let handler = ExecServerHandler::new(outgoing_tx);
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Initialize {
request_id: RequestId::Integer(1),
params: InitializeParams {
client_name: "test".to_string(),
},
},
))
.await
{
panic!("initialize should succeed: {err}");
}
let _ = recv_outbound(&mut outgoing_rx).await;
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Notification(
ExecServerClientNotification::Initialized,
))
.await
{
panic!("initialized should succeed: {err}");
}
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Write {
request_id: RequestId::Integer(2),
params: WriteParams {
process_id: "missing".to_string(),
chunk: b"hello\n".to_vec().into(),
},
},
))
.await
{
panic!("write should not fail the handler: {err}");
}
let ExecServerOutboundMessage::Error { request_id, error } =
recv_outbound(&mut outgoing_rx).await
else {
panic!("expected unknown-process error");
};
assert_eq!(request_id, RequestId::Integer(2));
assert_eq!(error.code, -32600);
assert_eq!(error.message, "unknown process id missing");
}
#[tokio::test]
async fn terminate_unknown_processes_report_running_false() {
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(2);
let handler = ExecServerHandler::new(outgoing_tx);
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Initialize {
request_id: RequestId::Integer(1),
params: InitializeParams {
client_name: "test".to_string(),
},
},
))
.await
{
panic!("initialize should succeed: {err}");
}
let _ = recv_outbound(&mut outgoing_rx).await;
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Notification(
ExecServerClientNotification::Initialized,
))
.await
{
panic!("initialized should succeed: {err}");
}
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Terminate {
request_id: RequestId::Integer(2),
params: crate::protocol::TerminateParams {
process_id: "missing".to_string(),
},
},
))
.await
{
panic!("terminate should not fail the handler: {err}");
}
assert_eq!(
recv_outbound(&mut outgoing_rx).await,
ExecServerOutboundMessage::Response {
request_id: RequestId::Integer(2),
response: ExecServerResponseMessage::Terminate(TerminateResponse { running: false }),
}
);
}
#[tokio::test]
async fn terminate_keeps_process_ids_reserved() {
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(2);
let handler = ExecServerHandler::new(outgoing_tx);
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Initialize {
request_id: RequestId::Integer(1),
params: InitializeParams {
client_name: "test".to_string(),
},
},
))
.await
{
panic!("initialize should succeed: {err}");
}
let _ = recv_outbound(&mut outgoing_rx).await;
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Notification(
ExecServerClientNotification::Initialized,
))
.await
{
panic!("initialized should succeed: {err}");
}
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec {
request_id: RequestId::Integer(2),
params: crate::protocol::ExecParams {
process_id: "proc-1".to_string(),
argv: vec![
"bash".to_string(),
"-lc".to_string(),
"sleep 30".to_string(),
],
cwd: std::env::current_dir().expect("cwd"),
env: HashMap::new(),
tty: false,
arg0: None,
sandbox: None,
},
}))
.await
{
panic!("exec should not fail the handler: {err}");
}
let _ = recv_outbound(&mut outgoing_rx).await;
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Terminate {
request_id: RequestId::Integer(3),
params: crate::protocol::TerminateParams {
process_id: "proc-1".to_string(),
},
},
))
.await
{
panic!("terminate should not fail the handler: {err}");
}
assert_eq!(
recv_outbound(&mut outgoing_rx).await,
ExecServerOutboundMessage::Response {
request_id: RequestId::Integer(3),
response: ExecServerResponseMessage::Terminate(TerminateResponse { running: true }),
}
);
assert!(
handler.processes.lock().await.contains_key("proc-1"),
"terminated ids should stay reserved until exit cleanup removes them"
);
let deadline = tokio::time::Instant::now() + Duration::from_secs(1);
loop {
if !handler.processes.lock().await.contains_key("proc-1") {
break;
}
assert!(
tokio::time::Instant::now() < deadline,
"terminated ids should be removed after the exit retention window"
);
tokio::time::sleep(Duration::from_millis(25)).await;
}
handler.shutdown().await;
}
#[tokio::test]
async fn read_paginates_retained_output_without_skipping_omitted_chunks() {
let (outgoing_tx, _outgoing_rx) = tokio::sync::mpsc::channel(1);
let handler = ExecServerHandler::new(outgoing_tx);
let _ = handler.initialize().expect("initialize should succeed");
handler.initialized().expect("initialized should succeed");
let spawned = codex_utils_pty::spawn_pipe_process_no_stdin(
"bash",
&["-lc".to_string(), "true".to_string()],
std::env::current_dir().expect("cwd").as_path(),
&HashMap::new(),
&None,
)
.await
.expect("spawn test process");
{
let mut process_map = handler.processes.lock().await;
process_map.insert(
"proc-1".to_string(),
ProcessEntry::Running(Box::new(RunningProcess {
session: spawned.session,
tty: false,
output: VecDeque::from([
RetainedOutputChunk {
seq: 1,
stream: ExecOutputStream::Stdout,
chunk: b"abc".to_vec(),
},
RetainedOutputChunk {
seq: 2,
stream: ExecOutputStream::Stderr,
chunk: b"def".to_vec(),
},
]),
retained_bytes: 6,
next_seq: 3,
exit_code: None,
output_notify: Arc::new(Notify::new()),
})),
);
}
let first = handler
.exec_read(ReadParams {
process_id: "proc-1".to_string(),
after_seq: Some(0),
max_bytes: Some(3),
wait_ms: Some(0),
})
.await
.expect("first read should succeed");
assert_eq!(first.chunks.len(), 1);
assert_eq!(first.chunks[0].seq, 1);
assert_eq!(first.chunks[0].stream, ExecOutputStream::Stdout);
assert_eq!(first.chunks[0].chunk.clone().into_inner(), b"abc".to_vec());
assert_eq!(first.next_seq, 2);
let second = handler
.exec_read(ReadParams {
process_id: "proc-1".to_string(),
after_seq: Some(first.next_seq - 1),
max_bytes: Some(3),
wait_ms: Some(0),
})
.await
.expect("second read should succeed");
assert_eq!(second.chunks.len(), 1);
assert_eq!(second.chunks[0].seq, 2);
assert_eq!(second.chunks[0].stream, ExecOutputStream::Stderr);
assert_eq!(second.chunks[0].chunk.clone().into_inner(), b"def".to_vec());
assert_eq!(second.next_seq, 3);
handler.shutdown().await;
}

View File

@@ -0,0 +1,71 @@
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::debug;
use tracing::warn;
use crate::connection::CHANNEL_CAPACITY;
use crate::connection::JsonRpcConnection;
use crate::connection::JsonRpcConnectionEvent;
use crate::server::handler::ExecServerHandler;
use crate::server::routing::ExecServerOutboundMessage;
use crate::server::routing::RoutedExecServerMessage;
use crate::server::routing::encode_outbound_message;
use crate::server::routing::route_jsonrpc_message;
pub(crate) async fn run_connection(connection: JsonRpcConnection) {
let (json_outgoing_tx, mut incoming_rx, _connection_tasks) = connection.into_parts();
let (outgoing_tx, mut outgoing_rx) =
mpsc::channel::<ExecServerOutboundMessage>(CHANNEL_CAPACITY);
let handler = Arc::new(ExecServerHandler::new(outgoing_tx.clone()));
let outbound_task = tokio::spawn(async move {
while let Some(message) = outgoing_rx.recv().await {
let json_message = match encode_outbound_message(message) {
Ok(json_message) => json_message,
Err(err) => {
warn!("failed to serialize exec-server outbound message: {err}");
break;
}
};
if json_outgoing_tx.send(json_message).await.is_err() {
break;
}
}
});
while let Some(event) = incoming_rx.recv().await {
match event {
JsonRpcConnectionEvent::Message(message) => match route_jsonrpc_message(message) {
Ok(RoutedExecServerMessage::Inbound(message)) => {
let handler = Arc::clone(&handler);
tokio::spawn(async move {
if let Err(err) = handler.handle_message(message).await {
warn!("exec-server request failed after protocol error: {err}");
}
});
}
Ok(RoutedExecServerMessage::ImmediateOutbound(message)) => {
if outgoing_tx.send(message).await.is_err() {
break;
}
}
Err(err) => {
warn!("closing exec-server connection after protocol error: {err}");
break;
}
},
JsonRpcConnectionEvent::Disconnected { reason } => {
if let Some(reason) = reason {
debug!("exec-server connection disconnected: {reason}");
}
break;
}
}
}
handler.shutdown().await;
drop(handler);
drop(outgoing_tx);
let _ = outbound_task.await;
}

View File

@@ -0,0 +1,585 @@
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use serde::de::DeserializeOwned;
use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_METHOD;
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
use crate::protocol::EXEC_READ_METHOD;
use crate::protocol::EXEC_TERMINATE_METHOD;
use crate::protocol::EXEC_WRITE_METHOD;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
use crate::protocol::ExecParams;
use crate::protocol::ExecResponse;
use crate::protocol::FS_COPY_METHOD;
use crate::protocol::FS_CREATE_DIRECTORY_METHOD;
use crate::protocol::FS_GET_METADATA_METHOD;
use crate::protocol::FS_READ_DIRECTORY_METHOD;
use crate::protocol::FS_READ_FILE_METHOD;
use crate::protocol::FS_REMOVE_METHOD;
use crate::protocol::FS_WRITE_FILE_METHOD;
use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeParams;
use crate::protocol::InitializeResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
use crate::protocol::WriteResponse;
use codex_app_server_protocol::FsCopyParams;
use codex_app_server_protocol::FsCopyResponse;
use codex_app_server_protocol::FsCreateDirectoryParams;
use codex_app_server_protocol::FsCreateDirectoryResponse;
use codex_app_server_protocol::FsGetMetadataParams;
use codex_app_server_protocol::FsGetMetadataResponse;
use codex_app_server_protocol::FsReadDirectoryParams;
use codex_app_server_protocol::FsReadDirectoryResponse;
use codex_app_server_protocol::FsReadFileParams;
use codex_app_server_protocol::FsReadFileResponse;
use codex_app_server_protocol::FsRemoveParams;
use codex_app_server_protocol::FsRemoveResponse;
use codex_app_server_protocol::FsWriteFileParams;
use codex_app_server_protocol::FsWriteFileResponse;
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum ExecServerInboundMessage {
Request(ExecServerRequest),
Notification(ExecServerClientNotification),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum ExecServerRequest {
Initialize {
request_id: RequestId,
params: InitializeParams,
},
Exec {
request_id: RequestId,
params: ExecParams,
},
Read {
request_id: RequestId,
params: ReadParams,
},
Write {
request_id: RequestId,
params: WriteParams,
},
Terminate {
request_id: RequestId,
params: TerminateParams,
},
FsReadFile {
request_id: RequestId,
params: FsReadFileParams,
},
FsWriteFile {
request_id: RequestId,
params: FsWriteFileParams,
},
FsCreateDirectory {
request_id: RequestId,
params: FsCreateDirectoryParams,
},
FsGetMetadata {
request_id: RequestId,
params: FsGetMetadataParams,
},
FsReadDirectory {
request_id: RequestId,
params: FsReadDirectoryParams,
},
FsRemove {
request_id: RequestId,
params: FsRemoveParams,
},
FsCopy {
request_id: RequestId,
params: FsCopyParams,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum ExecServerClientNotification {
Initialized,
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum ExecServerOutboundMessage {
Response {
request_id: RequestId,
response: ExecServerResponseMessage,
},
Error {
request_id: RequestId,
error: JSONRPCErrorError,
},
Notification(ExecServerServerNotification),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum ExecServerResponseMessage {
Initialize(InitializeResponse),
Exec(ExecResponse),
Read(ReadResponse),
Write(WriteResponse),
Terminate(TerminateResponse),
FsReadFile(FsReadFileResponse),
FsWriteFile(FsWriteFileResponse),
FsCreateDirectory(FsCreateDirectoryResponse),
FsGetMetadata(FsGetMetadataResponse),
FsReadDirectory(FsReadDirectoryResponse),
FsRemove(FsRemoveResponse),
FsCopy(FsCopyResponse),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum ExecServerServerNotification {
OutputDelta(ExecOutputDeltaNotification),
Exited(ExecExitedNotification),
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum RoutedExecServerMessage {
Inbound(ExecServerInboundMessage),
ImmediateOutbound(ExecServerOutboundMessage),
}
pub(crate) fn route_jsonrpc_message(
message: JSONRPCMessage,
) -> Result<RoutedExecServerMessage, String> {
match message {
JSONRPCMessage::Request(request) => route_request(request),
JSONRPCMessage::Notification(notification) => route_notification(notification),
JSONRPCMessage::Response(response) => Err(format!(
"unexpected client response for request id {:?}",
response.id
)),
JSONRPCMessage::Error(error) => Err(format!(
"unexpected client error for request id {:?}",
error.id
)),
}
}
pub(crate) fn encode_outbound_message(
message: ExecServerOutboundMessage,
) -> Result<JSONRPCMessage, serde_json::Error> {
match message {
ExecServerOutboundMessage::Response {
request_id,
response,
} => Ok(JSONRPCMessage::Response(JSONRPCResponse {
id: request_id,
result: serialize_response(response)?,
})),
ExecServerOutboundMessage::Error { request_id, error } => {
Ok(JSONRPCMessage::Error(JSONRPCError {
id: request_id,
error,
}))
}
ExecServerOutboundMessage::Notification(notification) => Ok(JSONRPCMessage::Notification(
serialize_notification(notification)?,
)),
}
}
pub(crate) fn invalid_request(message: String) -> JSONRPCErrorError {
JSONRPCErrorError {
code: -32600,
data: None,
message,
}
}
pub(crate) fn invalid_params(message: String) -> JSONRPCErrorError {
JSONRPCErrorError {
code: -32602,
data: None,
message,
}
}
pub(crate) fn internal_error(message: String) -> JSONRPCErrorError {
JSONRPCErrorError {
code: -32603,
data: None,
message,
}
}
fn route_request(request: JSONRPCRequest) -> Result<RoutedExecServerMessage, String> {
match request.method.as_str() {
INITIALIZE_METHOD => Ok(parse_request_params(request, |request_id, params| {
ExecServerRequest::Initialize { request_id, params }
})),
EXEC_METHOD => Ok(parse_request_params(request, |request_id, params| {
ExecServerRequest::Exec { request_id, params }
})),
EXEC_READ_METHOD => Ok(parse_request_params(request, |request_id, params| {
ExecServerRequest::Read { request_id, params }
})),
EXEC_WRITE_METHOD => Ok(parse_request_params(request, |request_id, params| {
ExecServerRequest::Write { request_id, params }
})),
EXEC_TERMINATE_METHOD => Ok(parse_request_params(request, |request_id, params| {
ExecServerRequest::Terminate { request_id, params }
})),
FS_READ_FILE_METHOD => Ok(parse_request_params(request, |request_id, params| {
ExecServerRequest::FsReadFile { request_id, params }
})),
FS_WRITE_FILE_METHOD => Ok(parse_request_params(request, |request_id, params| {
ExecServerRequest::FsWriteFile { request_id, params }
})),
FS_CREATE_DIRECTORY_METHOD => Ok(parse_request_params(request, |request_id, params| {
ExecServerRequest::FsCreateDirectory { request_id, params }
})),
FS_GET_METADATA_METHOD => Ok(parse_request_params(request, |request_id, params| {
ExecServerRequest::FsGetMetadata { request_id, params }
})),
FS_READ_DIRECTORY_METHOD => Ok(parse_request_params(request, |request_id, params| {
ExecServerRequest::FsReadDirectory { request_id, params }
})),
FS_REMOVE_METHOD => Ok(parse_request_params(request, |request_id, params| {
ExecServerRequest::FsRemove { request_id, params }
})),
FS_COPY_METHOD => Ok(parse_request_params(request, |request_id, params| {
ExecServerRequest::FsCopy { request_id, params }
})),
other => Ok(RoutedExecServerMessage::ImmediateOutbound(
ExecServerOutboundMessage::Error {
request_id: request.id,
error: invalid_request(format!("unknown method: {other}")),
},
)),
}
}
fn route_notification(
notification: JSONRPCNotification,
) -> Result<RoutedExecServerMessage, String> {
match notification.method.as_str() {
INITIALIZED_METHOD => Ok(RoutedExecServerMessage::Inbound(
ExecServerInboundMessage::Notification(ExecServerClientNotification::Initialized),
)),
other => Err(format!("unexpected notification method: {other}")),
}
}
fn parse_request_params<P, F>(request: JSONRPCRequest, build: F) -> RoutedExecServerMessage
where
P: DeserializeOwned,
F: FnOnce(RequestId, P) -> ExecServerRequest,
{
let request_id = request.id;
match serde_json::from_value::<P>(request.params.unwrap_or(serde_json::Value::Null)) {
Ok(params) => RoutedExecServerMessage::Inbound(ExecServerInboundMessage::Request(build(
request_id, params,
))),
Err(err) => RoutedExecServerMessage::ImmediateOutbound(ExecServerOutboundMessage::Error {
request_id,
error: invalid_params(err.to_string()),
}),
}
}
fn serialize_response(
response: ExecServerResponseMessage,
) -> Result<serde_json::Value, serde_json::Error> {
match response {
ExecServerResponseMessage::Initialize(response) => serde_json::to_value(response),
ExecServerResponseMessage::Exec(response) => serde_json::to_value(response),
ExecServerResponseMessage::Read(response) => serde_json::to_value(response),
ExecServerResponseMessage::Write(response) => serde_json::to_value(response),
ExecServerResponseMessage::Terminate(response) => serde_json::to_value(response),
ExecServerResponseMessage::FsReadFile(response) => serde_json::to_value(response),
ExecServerResponseMessage::FsWriteFile(response) => serde_json::to_value(response),
ExecServerResponseMessage::FsCreateDirectory(response) => serde_json::to_value(response),
ExecServerResponseMessage::FsGetMetadata(response) => serde_json::to_value(response),
ExecServerResponseMessage::FsReadDirectory(response) => serde_json::to_value(response),
ExecServerResponseMessage::FsRemove(response) => serde_json::to_value(response),
ExecServerResponseMessage::FsCopy(response) => serde_json::to_value(response),
}
}
fn serialize_notification(
notification: ExecServerServerNotification,
) -> Result<JSONRPCNotification, serde_json::Error> {
match notification {
ExecServerServerNotification::OutputDelta(params) => Ok(JSONRPCNotification {
method: EXEC_OUTPUT_DELTA_METHOD.to_string(),
params: Some(serde_json::to_value(params)?),
}),
ExecServerServerNotification::Exited(params) => Ok(JSONRPCNotification {
method: EXEC_EXITED_METHOD.to_string(),
params: Some(serde_json::to_value(params)?),
}),
}
}
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use serde_json::json;
use super::ExecServerClientNotification;
use super::ExecServerInboundMessage;
use super::ExecServerOutboundMessage;
use super::ExecServerRequest;
use super::ExecServerResponseMessage;
use super::ExecServerServerNotification;
use super::RoutedExecServerMessage;
use super::encode_outbound_message;
use super::route_jsonrpc_message;
use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_METHOD;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecParams;
use crate::protocol::ExecResponse;
use crate::protocol::ExecSandboxConfig;
use crate::protocol::ExecSandboxMode;
use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
#[test]
fn routes_initialize_requests_to_typed_variants() {
let routed = route_jsonrpc_message(JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(1),
method: INITIALIZE_METHOD.to_string(),
params: Some(json!({ "clientName": "test-client" })),
trace: None,
}))
.expect("initialize request should route");
assert_eq!(
routed,
RoutedExecServerMessage::Inbound(ExecServerInboundMessage::Request(
ExecServerRequest::Initialize {
request_id: RequestId::Integer(1),
params: InitializeParams {
client_name: "test-client".to_string(),
},
},
))
);
}
#[test]
fn malformed_exec_params_return_immediate_error_outbound() {
let routed = route_jsonrpc_message(JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(2),
method: EXEC_METHOD.to_string(),
params: Some(json!({ "processId": "proc-1" })),
trace: None,
}))
.expect("exec request should route");
let RoutedExecServerMessage::ImmediateOutbound(ExecServerOutboundMessage::Error {
request_id,
error,
}) = routed
else {
panic!("expected invalid-params error outbound");
};
assert_eq!(request_id, RequestId::Integer(2));
assert_eq!(error.code, -32602);
}
#[test]
fn routes_initialized_notifications_to_typed_variants() {
let routed = route_jsonrpc_message(JSONRPCMessage::Notification(JSONRPCNotification {
method: INITIALIZED_METHOD.to_string(),
params: Some(json!({})),
}))
.expect("initialized notification should route");
assert_eq!(
routed,
RoutedExecServerMessage::Inbound(ExecServerInboundMessage::Notification(
ExecServerClientNotification::Initialized,
))
);
}
#[test]
fn serializes_typed_notifications_back_to_jsonrpc() {
let message = encode_outbound_message(ExecServerOutboundMessage::Notification(
ExecServerServerNotification::Exited(ExecExitedNotification {
process_id: "proc-1".to_string(),
exit_code: 0,
}),
))
.expect("notification should serialize");
assert_eq!(
message,
JSONRPCMessage::Notification(JSONRPCNotification {
method: EXEC_EXITED_METHOD.to_string(),
params: Some(json!({
"processId": "proc-1",
"exitCode": 0,
})),
})
);
}
#[test]
fn serializes_typed_responses_back_to_jsonrpc() {
let message = encode_outbound_message(ExecServerOutboundMessage::Response {
request_id: RequestId::Integer(3),
response: ExecServerResponseMessage::Exec(ExecResponse {
process_id: "proc-1".to_string(),
}),
})
.expect("response should serialize");
assert_eq!(
message,
JSONRPCMessage::Response(codex_app_server_protocol::JSONRPCResponse {
id: RequestId::Integer(3),
result: json!({
"processId": "proc-1",
}),
})
);
}
#[test]
fn routes_exec_requests_with_typed_params() {
let cwd = std::env::current_dir().expect("cwd");
let routed = route_jsonrpc_message(JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(4),
method: EXEC_METHOD.to_string(),
params: Some(json!({
"processId": "proc-1",
"argv": ["bash", "-lc", "true"],
"cwd": cwd,
"env": {},
"tty": true,
"arg0": null,
})),
trace: None,
}))
.expect("exec request should route");
let RoutedExecServerMessage::Inbound(ExecServerInboundMessage::Request(
ExecServerRequest::Exec { request_id, params },
)) = routed
else {
panic!("expected typed exec request");
};
assert_eq!(request_id, RequestId::Integer(4));
assert_eq!(
params,
ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
cwd: std::env::current_dir().expect("cwd"),
env: std::collections::HashMap::new(),
tty: true,
arg0: None,
sandbox: None,
}
);
}
#[test]
fn routes_exec_requests_with_optional_sandbox_config() {
let cwd = std::env::current_dir().expect("cwd");
let routed = route_jsonrpc_message(JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(4),
method: EXEC_METHOD.to_string(),
params: Some(json!({
"processId": "proc-1",
"argv": ["bash", "-lc", "true"],
"cwd": cwd,
"env": {},
"tty": true,
"arg0": null,
"sandbox": {
"mode": "none",
},
})),
trace: None,
}))
.expect("exec request with sandbox should route");
let RoutedExecServerMessage::Inbound(ExecServerInboundMessage::Request(
ExecServerRequest::Exec { request_id, params },
)) = routed
else {
panic!("expected typed exec request");
};
assert_eq!(request_id, RequestId::Integer(4));
assert_eq!(
params,
ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
cwd: std::env::current_dir().expect("cwd"),
env: std::collections::HashMap::new(),
tty: true,
arg0: None,
sandbox: Some(ExecSandboxConfig {
mode: ExecSandboxMode::None,
}),
}
);
}
#[test]
fn unknown_request_methods_return_immediate_invalid_request_errors() {
let routed = route_jsonrpc_message(JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(5),
method: "process/unknown".to_string(),
params: Some(json!({})),
trace: None,
}))
.expect("unknown request should still route");
assert_eq!(
routed,
RoutedExecServerMessage::ImmediateOutbound(ExecServerOutboundMessage::Error {
request_id: RequestId::Integer(5),
error: super::invalid_request("unknown method: process/unknown".to_string()),
})
);
}
#[test]
fn unexpected_client_notifications_are_rejected() {
let err = route_jsonrpc_message(JSONRPCMessage::Notification(JSONRPCNotification {
method: "process/output".to_string(),
params: Some(json!({})),
}))
.expect_err("unexpected client notification should fail");
assert_eq!(err, "unexpected notification method: process/output");
}
#[test]
fn unexpected_client_responses_are_rejected() {
let err = route_jsonrpc_message(JSONRPCMessage::Response(JSONRPCResponse {
id: RequestId::Integer(6),
result: json!({}),
}))
.expect_err("unexpected client response should fail");
assert_eq!(err, "unexpected client response for request id Integer(6)");
}
}

View File

@@ -0,0 +1,166 @@
use std::net::SocketAddr;
use std::str::FromStr;
use tokio::net::TcpListener;
use tokio_tungstenite::accept_async;
use tracing::warn;
use crate::connection::JsonRpcConnection;
use crate::server::processor::run_connection;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecServerTransport {
Stdio,
WebSocket { bind_address: SocketAddr },
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ExecServerTransportParseError {
UnsupportedListenUrl(String),
InvalidWebSocketListenUrl(String),
}
impl std::fmt::Display for ExecServerTransportParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ExecServerTransportParseError::UnsupportedListenUrl(listen_url) => write!(
f,
"unsupported --listen URL `{listen_url}`; expected `stdio://` or `ws://IP:PORT`"
),
ExecServerTransportParseError::InvalidWebSocketListenUrl(listen_url) => write!(
f,
"invalid websocket --listen URL `{listen_url}`; expected `ws://IP:PORT`"
),
}
}
}
impl std::error::Error for ExecServerTransportParseError {}
impl ExecServerTransport {
pub const DEFAULT_LISTEN_URL: &str = "stdio://";
pub fn from_listen_url(listen_url: &str) -> Result<Self, ExecServerTransportParseError> {
if listen_url == Self::DEFAULT_LISTEN_URL {
return Ok(Self::Stdio);
}
if let Some(socket_addr) = listen_url.strip_prefix("ws://") {
let bind_address = socket_addr.parse::<SocketAddr>().map_err(|_| {
ExecServerTransportParseError::InvalidWebSocketListenUrl(listen_url.to_string())
})?;
return Ok(Self::WebSocket { bind_address });
}
Err(ExecServerTransportParseError::UnsupportedListenUrl(
listen_url.to_string(),
))
}
}
impl FromStr for ExecServerTransport {
type Err = ExecServerTransportParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Self::from_listen_url(s)
}
}
pub(crate) async fn run_transport(
transport: ExecServerTransport,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match transport {
ExecServerTransport::Stdio => {
run_connection(JsonRpcConnection::from_stdio(
tokio::io::stdin(),
tokio::io::stdout(),
"exec-server stdio".to_string(),
))
.await;
Ok(())
}
ExecServerTransport::WebSocket { bind_address } => {
run_websocket_listener(bind_address).await
}
}
}
async fn run_websocket_listener(
bind_address: SocketAddr,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let listener = TcpListener::bind(bind_address).await?;
let local_addr = listener.local_addr()?;
print_websocket_startup_banner(local_addr);
loop {
let (stream, peer_addr) = listener.accept().await?;
tokio::spawn(async move {
match accept_async(stream).await {
Ok(websocket) => {
run_connection(JsonRpcConnection::from_websocket(
websocket,
format!("exec-server websocket {peer_addr}"),
))
.await;
}
Err(err) => {
warn!(
"failed to accept exec-server websocket connection from {peer_addr}: {err}"
);
}
}
});
}
}
#[allow(clippy::print_stderr)]
fn print_websocket_startup_banner(addr: SocketAddr) {
eprintln!("codex-exec-server listening on ws://{addr}");
}
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use super::ExecServerTransport;
#[test]
fn exec_server_transport_parses_stdio_listen_url() {
let transport =
ExecServerTransport::from_listen_url(ExecServerTransport::DEFAULT_LISTEN_URL)
.expect("stdio listen URL should parse");
assert_eq!(transport, ExecServerTransport::Stdio);
}
#[test]
fn exec_server_transport_parses_websocket_listen_url() {
let transport = ExecServerTransport::from_listen_url("ws://127.0.0.1:1234")
.expect("websocket listen URL should parse");
assert_eq!(
transport,
ExecServerTransport::WebSocket {
bind_address: "127.0.0.1:1234".parse().expect("valid socket address"),
}
);
}
#[test]
fn exec_server_transport_rejects_invalid_websocket_listen_url() {
let err = ExecServerTransport::from_listen_url("ws://localhost:1234")
.expect_err("hostname bind address should be rejected");
assert_eq!(
err.to_string(),
"invalid websocket --listen URL `ws://localhost:1234`; expected `ws://IP:PORT`"
);
}
#[test]
fn exec_server_transport_rejects_unsupported_listen_url() {
let err = ExecServerTransport::from_listen_url("http://127.0.0.1:1234")
.expect_err("unsupported scheme should fail");
assert_eq!(
err.to_string(),
"unsupported --listen URL `http://127.0.0.1:1234`; expected `stdio://` or `ws://IP:PORT`"
);
}
}

View File

@@ -1,46 +0,0 @@
use std::path::PathBuf;
use std::process::Stdio;
use tokio::process::Child;
use tokio::process::ChildStdin;
use tokio::process::ChildStdout;
use tokio::process::Command;
use crate::client::ExecServerError;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecServerLaunchCommand {
pub program: PathBuf,
pub args: Vec<String>,
}
pub(crate) struct SpawnedStdioExecServer {
pub(crate) child: Child,
pub(crate) stdin: ChildStdin,
pub(crate) stdout: ChildStdout,
}
pub(crate) fn spawn_stdio_exec_server(
command: ExecServerLaunchCommand,
) -> Result<SpawnedStdioExecServer, ExecServerError> {
let mut child = Command::new(&command.program);
child.args(&command.args);
child.stdin(Stdio::piped());
child.stdout(Stdio::piped());
child.stderr(Stdio::inherit());
child.kill_on_drop(true);
let mut child = child.spawn().map_err(ExecServerError::Spawn)?;
let stdin = child.stdin.take().ok_or_else(|| {
ExecServerError::Protocol("exec-server stdin was not captured".to_string())
})?;
let stdout = child.stdout.take().ok_or_else(|| {
ExecServerError::Protocol("exec-server stdout was not captured".to_string())
})?;
Ok(SpawnedStdioExecServer {
child,
stdin,
stdout,
})
}

View File

@@ -3,19 +3,29 @@
use std::process::Stdio;
use std::time::Duration;
use anyhow::Context;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_exec_server::ExecOutputStream;
use codex_exec_server::ExecParams;
use codex_exec_server::ExecServerClient;
use codex_exec_server::ExecServerClientConnectOptions;
use codex_exec_server::ExecServerEvent;
use codex_exec_server::ExecServerLaunchCommand;
use codex_exec_server::InitializeParams;
use codex_exec_server::InitializeResponse;
use codex_exec_server::RemoteExecServerConnectArgs;
use codex_exec_server::spawn_local_exec_server;
use codex_utils_cargo_bin::cargo_bin;
use pretty_assertions::assert_eq;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::process::Command;
use tokio::sync::broadcast;
use tokio::time::timeout;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -66,7 +76,7 @@ async fn exec_server_accepts_initialize_over_stdio() -> anyhow::Result<()> {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_stubs_command_exec_over_stdio() -> anyhow::Result<()> {
async fn exec_server_accepts_explicit_none_sandbox_over_stdio() -> anyhow::Result<()> {
let binary = cargo_bin("codex-exec-server")?;
let mut child = Command::new(binary);
child.stdin(Stdio::piped());
@@ -78,6 +88,324 @@ async fn exec_server_stubs_command_exec_over_stdio() -> anyhow::Result<()> {
let stdout = child.stdout.take().expect("stdout");
let mut stdout = BufReader::new(stdout).lines();
send_initialize_over_stdio(&mut stdin, &mut stdout).await?;
let exec = JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(2),
method: "process/start".to_string(),
params: Some(serde_json::json!({
"processId": "proc-1",
"argv": ["printf", "sandbox-none"],
"cwd": std::env::current_dir()?,
"env": {},
"tty": false,
"arg0": null,
"sandbox": {
"mode": "none"
}
})),
trace: None,
});
stdin
.write_all(format!("{}\n", serde_json::to_string(&exec)?).as_bytes())
.await?;
let response_line = timeout(Duration::from_secs(5), stdout.next_line()).await??;
let response_line = response_line.expect("exec response line");
let response: JSONRPCMessage = serde_json::from_str(&response_line)?;
let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else {
panic!("expected process/start response");
};
assert_eq!(id, RequestId::Integer(2));
assert_eq!(result, serde_json::json!({ "processId": "proc-1" }));
let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
let mut saw_output = false;
while !saw_output {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
let line = timeout(remaining, stdout.next_line()).await??;
let line = line.context("missing process notification")?;
let message: JSONRPCMessage = serde_json::from_str(&line)?;
if let JSONRPCMessage::Notification(JSONRPCNotification { method, params }) = message
&& method == "process/output"
{
let params = params.context("missing process/output params")?;
assert_eq!(params["processId"], "proc-1");
assert_eq!(params["stream"], "stdout");
assert_eq!(params["chunk"], "c2FuZGJveC1ub25l");
saw_output = true;
}
}
child.start_kill()?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_rejects_host_default_sandbox_over_stdio() -> anyhow::Result<()> {
let binary = cargo_bin("codex-exec-server")?;
let mut child = Command::new(binary);
child.stdin(Stdio::piped());
child.stdout(Stdio::piped());
child.stderr(Stdio::inherit());
let mut child = child.spawn()?;
let mut stdin = child.stdin.take().expect("stdin");
let stdout = child.stdout.take().expect("stdout");
let mut stdout = BufReader::new(stdout).lines();
send_initialize_over_stdio(&mut stdin, &mut stdout).await?;
let exec = JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(2),
method: "process/start".to_string(),
params: Some(serde_json::json!({
"processId": "proc-1",
"argv": ["bash", "-lc", "true"],
"cwd": std::env::current_dir()?,
"env": {},
"tty": false,
"arg0": null,
"sandbox": {
"mode": "hostDefault"
}
})),
trace: None,
});
stdin
.write_all(format!("{}\n", serde_json::to_string(&exec)?).as_bytes())
.await?;
let response_line = timeout(Duration::from_secs(5), stdout.next_line()).await??;
let response_line = response_line.expect("exec error line");
let response: JSONRPCMessage = serde_json::from_str(&response_line)?;
let JSONRPCMessage::Error(codex_app_server_protocol::JSONRPCError { id, error }) = response
else {
panic!("expected process/start error");
};
assert_eq!(id, RequestId::Integer(2));
assert_eq!(error.code, -32600);
assert_eq!(
error.message,
"sandbox mode `hostDefault` is not supported by exec-server yet"
);
child.start_kill()?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_client_streams_output_and_accepts_writes() -> anyhow::Result<()> {
let mut env = std::collections::HashMap::new();
if let Some(path) = std::env::var_os("PATH") {
env.insert("PATH".to_string(), path.to_string_lossy().into_owned());
}
let server = spawn_local_exec_server(
ExecServerLaunchCommand {
program: cargo_bin("codex-exec-server")?,
args: Vec::new(),
},
ExecServerClientConnectOptions {
client_name: "exec-server-test".to_string(),
initialize_timeout: Duration::from_secs(5),
},
)
.await?;
let client = server.client();
let mut events = client.event_receiver();
let response = client
.exec(ExecParams {
process_id: "proc-1".to_string(),
argv: vec![
"bash".to_string(),
"-lc".to_string(),
"printf 'ready\\n'; while IFS= read -r line; do printf 'echo:%s\\n' \"$line\"; done"
.to_string(),
],
cwd: std::env::current_dir()?,
env,
tty: true,
arg0: None,
sandbox: None,
})
.await?;
let process_id = response.process_id;
let (stream, ready_output) = recv_until_contains(&mut events, &process_id, "ready").await?;
assert_eq!(stream, ExecOutputStream::Pty);
assert!(
ready_output.contains("ready"),
"expected initial ready output"
);
client.write(&process_id, b"hello\n".to_vec()).await?;
let (stream, echoed_output) =
recv_until_contains(&mut events, &process_id, "echo:hello").await?;
assert_eq!(stream, ExecOutputStream::Pty);
assert!(
echoed_output.contains("echo:hello"),
"expected echoed output"
);
client.terminate(&process_id).await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_client_connects_over_websocket() -> anyhow::Result<()> {
let mut env = std::collections::HashMap::new();
if let Some(path) = std::env::var_os("PATH") {
env.insert("PATH".to_string(), path.to_string_lossy().into_owned());
}
let binary = cargo_bin("codex-exec-server")?;
let mut child = Command::new(binary);
child.args(["--listen", "ws://127.0.0.1:0"]);
child.stdin(Stdio::null());
child.stdout(Stdio::null());
child.stderr(Stdio::piped());
let mut child = child.spawn()?;
let stderr = child.stderr.take().expect("stderr");
let mut stderr_lines = BufReader::new(stderr).lines();
let websocket_url = read_websocket_url(&mut stderr_lines).await?;
let client = ExecServerClient::connect_websocket(RemoteExecServerConnectArgs {
websocket_url,
client_name: "exec-server-test".to_string(),
connect_timeout: Duration::from_secs(5),
initialize_timeout: Duration::from_secs(5),
})
.await?;
let mut events = client.event_receiver();
let response = client
.exec(ExecParams {
process_id: "proc-1".to_string(),
argv: vec![
"bash".to_string(),
"-lc".to_string(),
"printf 'ready\\n'; while IFS= read -r line; do printf 'echo:%s\\n' \"$line\"; done"
.to_string(),
],
cwd: std::env::current_dir()?,
env,
tty: true,
arg0: None,
sandbox: None,
})
.await?;
let process_id = response.process_id;
let (stream, ready_output) = recv_until_contains(&mut events, &process_id, "ready").await?;
assert_eq!(stream, ExecOutputStream::Pty);
assert!(
ready_output.contains("ready"),
"expected initial ready output"
);
client.write(&process_id, b"hello\n".to_vec()).await?;
let (stream, echoed_output) =
recv_until_contains(&mut events, &process_id, "echo:hello").await?;
assert_eq!(stream, ExecOutputStream::Pty);
assert!(
echoed_output.contains("echo:hello"),
"expected echoed output"
);
client.terminate(&process_id).await?;
child.start_kill()?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn websocket_disconnect_terminates_processes_for_that_connection() -> anyhow::Result<()> {
let mut env = std::collections::HashMap::new();
if let Some(path) = std::env::var_os("PATH") {
env.insert("PATH".to_string(), path.to_string_lossy().into_owned());
}
let marker_path = std::env::temp_dir().join(format!(
"codex-exec-server-disconnect-{}-{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_nanos()
));
let _ = std::fs::remove_file(&marker_path);
let binary = cargo_bin("codex-exec-server")?;
let mut child = Command::new(binary);
child.args(["--listen", "ws://127.0.0.1:0"]);
child.stdin(Stdio::null());
child.stdout(Stdio::null());
child.stderr(Stdio::piped());
let mut child = child.spawn()?;
let stderr = child.stderr.take().expect("stderr");
let mut stderr_lines = BufReader::new(stderr).lines();
let websocket_url = read_websocket_url(&mut stderr_lines).await?;
{
let client = ExecServerClient::connect_websocket(RemoteExecServerConnectArgs {
websocket_url,
client_name: "exec-server-test".to_string(),
connect_timeout: Duration::from_secs(5),
initialize_timeout: Duration::from_secs(5),
})
.await?;
let _response = client
.exec(ExecParams {
process_id: "proc-1".to_string(),
argv: vec![
"bash".to_string(),
"-lc".to_string(),
format!("sleep 2; printf disconnected > {}", marker_path.display()),
],
cwd: std::env::current_dir()?,
env,
tty: false,
arg0: None,
sandbox: None,
})
.await?;
}
tokio::time::sleep(Duration::from_secs(3)).await;
assert!(
!marker_path.exists(),
"managed process should be terminated when the websocket client disconnects"
);
child.start_kill()?;
let _ = std::fs::remove_file(&marker_path);
Ok(())
}
async fn read_websocket_url<R>(lines: &mut tokio::io::Lines<BufReader<R>>) -> anyhow::Result<String>
where
R: tokio::io::AsyncRead + Unpin,
{
let line = timeout(Duration::from_secs(5), lines.next_line()).await??;
let line = line.context("missing websocket startup banner")?;
let websocket_url = line
.split_whitespace()
.find(|part| part.starts_with("ws://"))
.context("missing websocket URL in startup banner")?;
Ok(websocket_url.to_string())
}
async fn send_initialize_over_stdio<W, R>(
stdin: &mut W,
stdout: &mut tokio::io::Lines<BufReader<R>>,
) -> anyhow::Result<()>
where
W: tokio::io::AsyncWrite + Unpin,
R: tokio::io::AsyncRead + Unpin,
{
let initialize = JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(1),
method: "initialize".to_string(),
@@ -89,39 +417,46 @@ async fn exec_server_stubs_command_exec_over_stdio() -> anyhow::Result<()> {
stdin
.write_all(format!("{}\n", serde_json::to_string(&initialize)?).as_bytes())
.await?;
let _ = timeout(Duration::from_secs(5), stdout.next_line()).await??;
let exec = JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(2),
method: "command/exec".to_string(),
params: Some(serde_json::json!({
"processId": "proc-1",
"argv": ["true"],
"cwd": std::env::current_dir()?,
"env": {},
"tty": false,
"arg0": null
})),
trace: None,
});
stdin
.write_all(format!("{}\n", serde_json::to_string(&exec)?).as_bytes())
.await?;
let response_line = timeout(Duration::from_secs(5), stdout.next_line()).await??;
let response_line = response_line.expect("exec response line");
let response_line = response_line
.ok_or_else(|| anyhow::anyhow!("missing initialize response line from stdio server"))?;
let response: JSONRPCMessage = serde_json::from_str(&response_line)?;
let JSONRPCMessage::Error(codex_app_server_protocol::JSONRPCError { id, error }) = response
else {
panic!("expected command/exec stub error");
let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else {
panic!("expected initialize response");
};
assert_eq!(id, RequestId::Integer(2));
assert_eq!(error.code, -32601);
assert_eq!(
error.message,
"exec-server stub does not implement `command/exec` yet"
);
assert_eq!(id, RequestId::Integer(1));
let initialize_response: InitializeResponse = serde_json::from_value(result)?;
assert_eq!(initialize_response.protocol_version, "exec-server.v0");
let initialized = JSONRPCMessage::Notification(JSONRPCNotification {
method: "initialized".to_string(),
params: Some(serde_json::json!({})),
});
stdin
.write_all(format!("{}\n", serde_json::to_string(&initialized)?).as_bytes())
.await?;
child.start_kill()?;
Ok(())
}
async fn recv_until_contains(
events: &mut broadcast::Receiver<ExecServerEvent>,
process_id: &str,
needle: &str,
) -> anyhow::Result<(ExecOutputStream, String)> {
let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
let mut collected = String::new();
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
let event = timeout(remaining, events.recv()).await??;
if let ExecServerEvent::OutputDelta(output_event) = event
&& output_event.process_id == process_id
{
collected.push_str(&String::from_utf8_lossy(&output_event.chunk.into_inner()));
if collected.contains(needle) {
return Ok((output_event.stream, collected));
}
}
}
}