mirror of
https://github.com/openai/codex.git
synced 2026-03-19 13:13:48 +00:00
Compare commits
6 Commits
starr/exec
...
starr/exec
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
84a6cbe944 | ||
|
|
43b112c263 | ||
|
|
66f49ea604 | ||
|
|
c5dbe421bb | ||
|
|
0a846a2625 | ||
|
|
16ff474725 |
@@ -1,242 +0,0 @@
|
||||
# exec-server design notes
|
||||
|
||||
This document sketches a likely direction for integrating `codex-exec-server`
|
||||
with unified exec without baking the full tool-call policy stack into the
|
||||
server.
|
||||
|
||||
The goal is:
|
||||
|
||||
- keep exec-server generic and reusable
|
||||
- keep approval, sandbox, and retry policy in `core`
|
||||
- preserve the unified-exec event flow the model already depends on
|
||||
- support retained output caps so polling and snapshot-style APIs do not grow
|
||||
memory without bound
|
||||
|
||||
## Unified exec today
|
||||
|
||||
Today the flow for LLM-visible interactive execution is:
|
||||
|
||||
1. The model sees the `exec_command` and `write_stdin` tools.
|
||||
2. `UnifiedExecHandler` parses the tool arguments and allocates a process id.
|
||||
3. `UnifiedExecProcessManager::exec_command(...)` calls
|
||||
`open_session_with_sandbox(...)`.
|
||||
4. `ToolOrchestrator` drives approval, sandbox selection, managed network
|
||||
approval, and sandbox-denial retry behavior.
|
||||
5. `UnifiedExecRuntime` builds a `CommandSpec`, asks the current
|
||||
`SandboxAttempt` to transform it into an `ExecRequest`, and passes that
|
||||
resolved request back to the process manager.
|
||||
6. `open_session_with_exec_env(...)` spawns the process from that resolved
|
||||
`ExecRequest`.
|
||||
7. Unified exec emits an `ExecCommandBegin` event.
|
||||
8. Unified exec starts a background output watcher that emits
|
||||
`ExecCommandOutputDelta` events.
|
||||
9. The initial tool call collects output until the requested yield deadline and
|
||||
returns an `ExecCommandToolOutput` snapshot to the model.
|
||||
10. If the process is still running, unified exec stores it and later emits
|
||||
`ExecCommandEnd` when the exit watcher fires.
|
||||
11. A later `write_stdin` tool call writes to the stored process, emits a
|
||||
`TerminalInteraction` event, collects another bounded snapshot, and returns
|
||||
that tool response to the model.
|
||||
|
||||
Important observation: the 250ms / 10s yield-window behavior is not really a
|
||||
process-server concern. It is a client-side convenience layer for the LLM tool
|
||||
API. The server should focus on raw process lifecycle and streaming events.
|
||||
|
||||
## Proposed boundary
|
||||
|
||||
The clean split is:
|
||||
|
||||
- exec-server server: process lifecycle, output streaming, retained output caps
|
||||
- exec-server client: `wait`, `communicate`, yield-window helpers, session
|
||||
bookkeeping
|
||||
- unified exec in `core`: tool parsing, event emission, approvals, sandboxing,
|
||||
managed networking, retry semantics
|
||||
|
||||
If exec-server is used by unified exec later, the boundary should sit between
|
||||
step 5 and step 6 above: after policy has produced a resolved spawn request, but
|
||||
before the actual PTY or pipe spawn.
|
||||
|
||||
## Suggested process API
|
||||
|
||||
Start simple and explicit:
|
||||
|
||||
- `process/start`
|
||||
- `process/write`
|
||||
- `process/closeStdin`
|
||||
- `process/resize`
|
||||
- `process/terminate`
|
||||
- `process/wait`
|
||||
- `process/snapshot`
|
||||
|
||||
Server notifications:
|
||||
|
||||
- `process/output`
|
||||
- `process/exited`
|
||||
- optionally `process/started`
|
||||
- optionally `process/failed`
|
||||
|
||||
Suggested request shapes:
|
||||
|
||||
```rust
|
||||
enum ProcessStartRequest {
|
||||
Direct(DirectExecSpec),
|
||||
Prepared(PreparedExecSpec),
|
||||
}
|
||||
|
||||
struct DirectExecSpec {
|
||||
process_id: String,
|
||||
argv: Vec<String>,
|
||||
cwd: PathBuf,
|
||||
env: HashMap<String, String>,
|
||||
arg0: Option<String>,
|
||||
io: ProcessIo,
|
||||
}
|
||||
|
||||
struct PreparedExecSpec {
|
||||
process_id: String,
|
||||
request: PreparedExecRequest,
|
||||
io: ProcessIo,
|
||||
}
|
||||
|
||||
enum ProcessIo {
|
||||
Pty { rows: u16, cols: u16 },
|
||||
Pipe { stdin: StdinMode },
|
||||
}
|
||||
|
||||
enum StdinMode {
|
||||
Open,
|
||||
Closed,
|
||||
}
|
||||
|
||||
enum TerminateMode {
|
||||
Graceful { timeout_ms: u64 },
|
||||
Force,
|
||||
}
|
||||
```
|
||||
|
||||
Notes:
|
||||
|
||||
- `processId` remains a protocol handle, not an OS pid.
|
||||
- `wait` is a good generic API because many callers want process completion
|
||||
without manually wiring notifications.
|
||||
- `communicate` is also a reasonable API, but it should probably start as a
|
||||
client helper built on top of `write + closeStdin + wait + snapshot`.
|
||||
- If an RPC form of `communicate` is added later, it should be a convenience
|
||||
wrapper rather than the primitive execution model.
|
||||
|
||||
## Output capping
|
||||
|
||||
Even with event streaming, the server should retain a bounded amount of output
|
||||
per process so callers can poll, wait, or reconnect without unbounded memory
|
||||
growth.
|
||||
|
||||
Suggested behavior:
|
||||
|
||||
- stream every output chunk live via `process/output`
|
||||
- retain capped output per process in memory
|
||||
- keep stdout and stderr separately for pipe-backed processes
|
||||
- for PTY-backed processes, treat retained output as a single terminal stream
|
||||
- expose truncation metadata on snapshots
|
||||
|
||||
Suggested snapshot response:
|
||||
|
||||
```rust
|
||||
struct ProcessSnapshot {
|
||||
stdout: Vec<u8>,
|
||||
stderr: Vec<u8>,
|
||||
terminal: Vec<u8>,
|
||||
truncated: bool,
|
||||
exit_code: Option<i32>,
|
||||
running: bool,
|
||||
}
|
||||
```
|
||||
|
||||
Implementation-wise, the current `HeadTailBuffer` pattern used by unified exec
|
||||
is a good fit. The cap should be server config, not request config, so memory
|
||||
use stays predictable.
|
||||
|
||||
## Sandboxing and networking
|
||||
|
||||
### How unified exec does it today
|
||||
|
||||
Unified exec does not hand raw command args directly to the PTY layer for tool
|
||||
calls. Instead, it:
|
||||
|
||||
1. computes approval requirements
|
||||
2. chooses a sandbox attempt
|
||||
3. applies managed-network policy if needed
|
||||
4. transforms `CommandSpec` into `ExecRequest`
|
||||
5. spawns from that resolved `ExecRequest`
|
||||
|
||||
That split is already valuable and should be preserved.
|
||||
|
||||
### Recommended exec-server design
|
||||
|
||||
Do not put approval policy into exec-server.
|
||||
|
||||
Instead, support two execution modes:
|
||||
|
||||
- `Direct`: raw command, intended for orchestrator-side or already-trusted use
|
||||
- `Prepared`: already-resolved spawn request, intended for tool-call execution
|
||||
|
||||
For tool calls from the LLM side:
|
||||
|
||||
1. `core` runs the existing approval + sandbox + managed-network flow
|
||||
2. `core` produces a resolved `ExecRequest`
|
||||
3. the exec-server client sends `PreparedExecSpec`
|
||||
4. exec-server spawns exactly that request and streams process events
|
||||
|
||||
For orchestrator-side execution:
|
||||
|
||||
1. caller sends `DirectExecSpec`
|
||||
2. exec-server spawns directly without running approval or sandbox policy
|
||||
|
||||
This gives one generic process API while keeping the policy-sensitive logic in
|
||||
the place that already owns it.
|
||||
|
||||
### Why not make exec-server own sandbox selection?
|
||||
|
||||
That would force exec-server to understand:
|
||||
|
||||
- approval policy
|
||||
- exec policy / prefix rules
|
||||
- managed-network approval flow
|
||||
- sandbox retry semantics
|
||||
- guardian routing
|
||||
- feature-flag-driven sandbox selection
|
||||
- platform-specific sandbox helper configuration
|
||||
|
||||
That is too opinionated for a reusable process service.
|
||||
|
||||
## Optional future server config
|
||||
|
||||
If exec-server grows beyond the current prototype, a config object like this
|
||||
would be enough:
|
||||
|
||||
```rust
|
||||
struct ExecServerConfig {
|
||||
shutdown_grace_period_ms: u64,
|
||||
max_processes_per_connection: usize,
|
||||
retained_output_bytes_per_process: usize,
|
||||
allow_direct_exec: bool,
|
||||
allow_prepared_exec: bool,
|
||||
}
|
||||
```
|
||||
|
||||
That keeps policy surface small:
|
||||
|
||||
- lifecycle limits live in the server
|
||||
- trust and sandbox policy stay with the caller
|
||||
|
||||
## Mapping back to LLM-visible events
|
||||
|
||||
If unified exec is later backed by exec-server, the `core` client wrapper should
|
||||
keep owning the translation into the existing event model:
|
||||
|
||||
- `process/start` success -> `ExecCommandBegin`
|
||||
- `process/output` -> `ExecCommandOutputDelta`
|
||||
- local `process/write` call -> `TerminalInteraction`
|
||||
- `process/exited` plus retained transcript -> `ExecCommandEnd`
|
||||
|
||||
That preserves the current LLM-facing contract while making the process backend
|
||||
swappable.
|
||||
@@ -1,50 +1,28 @@
|
||||
# codex-exec-server
|
||||
|
||||
`codex-exec-server` is a small standalone JSON-RPC server for spawning and
|
||||
controlling subprocesses through `codex-utils-pty`.
|
||||
`codex-exec-server` is a small standalone stdio JSON-RPC server for spawning
|
||||
and controlling subprocesses through `codex-utils-pty`.
|
||||
|
||||
This PR intentionally lands only the standalone binary, client, wire protocol,
|
||||
and docs. Exec and filesystem methods are stubbed server-side here and are
|
||||
implemented in follow-up PRs.
|
||||
|
||||
It currently provides:
|
||||
|
||||
- a standalone binary: `codex-exec-server`
|
||||
- a transport-agnostic server runtime with stdio and websocket entrypoints
|
||||
- a Rust client: `ExecServerClient`
|
||||
- a direct in-process client mode: `ExecServerClient::connect_in_process`
|
||||
- a separate local launch helper: `spawn_local_exec_server`
|
||||
- a small protocol module with shared request/response types
|
||||
|
||||
This crate is intentionally narrow. It is not wired into the main Codex CLI or
|
||||
unified-exec in this PR; it is only the standalone transport layer.
|
||||
|
||||
The internal shape is intentionally closer to `app-server` than the first cut:
|
||||
|
||||
- transport adapters are separate from the per-connection request processor
|
||||
- JSON-RPC route matching is separate from the stateful exec handler
|
||||
- the client only speaks the protocol; it does not spawn a server subprocess
|
||||
- the client can also bypass the JSON-RPC transport/routing layer in local
|
||||
in-process mode and call the typed handler directly
|
||||
- local child-process launch is handled by a separate helper/factory layer
|
||||
|
||||
That split is meant to leave reusable seams if exec-server and app-server later
|
||||
share transport or JSON-RPC connection utilities. It also keeps the core
|
||||
handler testable without the RPC server implementation itself.
|
||||
|
||||
Design notes for a likely future integration with unified exec, including
|
||||
rough call flow, buffering, and sandboxing boundaries, live in
|
||||
[DESIGN.md](./DESIGN.md).
|
||||
|
||||
## Transport
|
||||
|
||||
The server speaks the same JSON-RPC message shapes over multiple transports.
|
||||
The server speaks newline-delimited JSON-RPC 2.0 over stdio.
|
||||
|
||||
The standalone binary supports:
|
||||
|
||||
- `stdio://` (default)
|
||||
- `ws://IP:PORT`
|
||||
|
||||
Wire framing:
|
||||
|
||||
- stdio: one newline-delimited JSON-RPC message per line on stdin/stdout
|
||||
- websocket: one JSON-RPC message per websocket text frame
|
||||
- `stdin`: one JSON-RPC message per line
|
||||
- `stdout`: one JSON-RPC message per line
|
||||
- `stderr`: reserved for logs / process errors
|
||||
|
||||
Like the app-server transport, messages on the wire omit the `"jsonrpc":"2.0"`
|
||||
field and use the shared `codex-app-server-protocol` envelope types.
|
||||
@@ -62,19 +40,13 @@ Each connection follows this sequence:
|
||||
1. Send `initialize`.
|
||||
2. Wait for the `initialize` response.
|
||||
3. Send `initialized`.
|
||||
4. Start and manage processes with `process/start`, `process/read`,
|
||||
`process/write`, and `process/terminate`.
|
||||
5. Read streaming notifications from `process/output` and
|
||||
`process/exited`.
|
||||
4. Call exec or filesystem RPCs once the follow-up implementation PRs land.
|
||||
|
||||
If the client sends exec methods before completing the `initialize` /
|
||||
`initialized` handshake, the server rejects them.
|
||||
If the server receives any notification other than `initialized`, it replies
|
||||
with an error using request id `-1`.
|
||||
|
||||
If a connection closes, the server terminates any remaining managed processes
|
||||
for that connection.
|
||||
|
||||
TODO: add authentication to the `initialize` setup before this is used across a
|
||||
trust boundary.
|
||||
If the stdio connection closes, the server terminates any remaining managed
|
||||
processes before exiting.
|
||||
|
||||
## API
|
||||
|
||||
@@ -101,12 +73,12 @@ Response:
|
||||
### `initialized`
|
||||
|
||||
Handshake acknowledgement notification sent by the client after a successful
|
||||
`initialize` response. Exec methods are rejected until this arrives.
|
||||
`initialize` response.
|
||||
|
||||
Params are currently ignored. Sending any other client notification method is a
|
||||
protocol error.
|
||||
Params are currently ignored. Sending any other notification method is treated
|
||||
as an invalid request.
|
||||
|
||||
### `process/start`
|
||||
### `command/exec`
|
||||
|
||||
Starts a new managed process.
|
||||
|
||||
@@ -121,36 +93,44 @@ Request params:
|
||||
"PATH": "/usr/bin:/bin"
|
||||
},
|
||||
"tty": true,
|
||||
"outputBytesCap": 16384,
|
||||
"arg0": null
|
||||
}
|
||||
```
|
||||
|
||||
Field definitions:
|
||||
|
||||
- `processId`: caller-chosen stable id for this process within the connection.
|
||||
- `argv`: command vector. It must be non-empty.
|
||||
- `cwd`: absolute working directory used for the child process.
|
||||
- `env`: environment variables passed to the child process.
|
||||
- `tty`: when `true`, spawn a PTY-backed interactive process; when `false`,
|
||||
spawn a pipe-backed process with closed stdin.
|
||||
- `outputBytesCap`: maximum retained stdout/stderr bytes per stream for the
|
||||
in-memory buffer. Defaults to `codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP`.
|
||||
- `arg0`: optional argv0 override forwarded to `codex-utils-pty`.
|
||||
|
||||
Response:
|
||||
|
||||
```json
|
||||
{
|
||||
"processId": "proc-1"
|
||||
"processId": "proc-1",
|
||||
"running": true,
|
||||
"exitCode": null,
|
||||
"stdout": null,
|
||||
"stderr": null
|
||||
}
|
||||
```
|
||||
|
||||
Behavior notes:
|
||||
|
||||
- `processId` is chosen by the client and must be unique for the connection.
|
||||
- PTY-backed processes accept later writes through `process/write`.
|
||||
- Reusing an existing `processId` is rejected.
|
||||
- PTY-backed processes accept later writes through `command/exec/write`.
|
||||
- Pipe-backed processes are launched with stdin closed and reject writes.
|
||||
- Output is streamed asynchronously via `process/output`.
|
||||
- Exit is reported asynchronously via `process/exited`.
|
||||
- Output is streamed asynchronously via `command/exec/outputDelta`.
|
||||
- Exit is reported asynchronously via `command/exec/exited`.
|
||||
|
||||
### `process/write`
|
||||
### `command/exec/write`
|
||||
|
||||
Writes raw bytes to a running PTY-backed process stdin.
|
||||
|
||||
@@ -178,48 +158,7 @@ Behavior notes:
|
||||
- Writes to an unknown `processId` are rejected.
|
||||
- Writes to a non-PTY process are rejected because stdin is already closed.
|
||||
|
||||
### `process/read`
|
||||
|
||||
Reads retained output from a managed process by sequence number.
|
||||
|
||||
Request params:
|
||||
|
||||
```json
|
||||
{
|
||||
"processId": "proc-1",
|
||||
"afterSeq": 0,
|
||||
"maxBytes": 65536,
|
||||
"waitMs": 250
|
||||
}
|
||||
```
|
||||
|
||||
Response:
|
||||
|
||||
```json
|
||||
{
|
||||
"chunks": [
|
||||
{
|
||||
"seq": 1,
|
||||
"stream": "pty",
|
||||
"chunk": "aGVsbG8K"
|
||||
}
|
||||
],
|
||||
"nextSeq": 2,
|
||||
"exited": false,
|
||||
"exitCode": null
|
||||
}
|
||||
```
|
||||
|
||||
Behavior notes:
|
||||
|
||||
- Output is retained in bounded server memory so callers can poll without
|
||||
relying only on notifications.
|
||||
- `afterSeq` is exclusive: `0` reads from the beginning of the retained buffer.
|
||||
- `waitMs` waits briefly for new output or exit if nothing is currently
|
||||
available.
|
||||
- Once retained output exceeds the per-process cap, oldest chunks are dropped.
|
||||
|
||||
### `process/terminate`
|
||||
### `command/exec/terminate`
|
||||
|
||||
Terminates a running managed process.
|
||||
|
||||
@@ -249,7 +188,7 @@ If the process is already unknown or already removed, the server responds with:
|
||||
|
||||
## Notifications
|
||||
|
||||
### `process/output`
|
||||
### `command/exec/outputDelta`
|
||||
|
||||
Streaming output chunk from a running process.
|
||||
|
||||
@@ -266,10 +205,10 @@ Params:
|
||||
Fields:
|
||||
|
||||
- `processId`: process identifier
|
||||
- `stream`: `"stdout"`, `"stderr"`, or `"pty"` for PTY-backed processes
|
||||
- `stream`: `"stdout"` or `"stderr"`
|
||||
- `chunk`: base64-encoded output bytes
|
||||
|
||||
### `process/exited`
|
||||
### `command/exec/exited`
|
||||
|
||||
Final process exit notification.
|
||||
|
||||
@@ -304,58 +243,13 @@ Typical error cases:
|
||||
The crate exports:
|
||||
|
||||
- `ExecServerClient`
|
||||
- `ExecServerClientConnectOptions`
|
||||
- `RemoteExecServerConnectArgs`
|
||||
- `ExecServerLaunchCommand`
|
||||
- `ExecServerEvent`
|
||||
- `SpawnedExecServer`
|
||||
- `ExecServerProcess`
|
||||
- `ExecServerError`
|
||||
- `ExecServerTransport`
|
||||
- `spawn_local_exec_server(...)`
|
||||
- protocol structs such as `ExecParams`, `ExecResponse`,
|
||||
`WriteParams`, `TerminateParams`, `ExecOutputDeltaNotification`, and
|
||||
`ExecExitedNotification`
|
||||
- `run_main()` and `run_main_with_transport(...)`
|
||||
|
||||
### Binary
|
||||
|
||||
Run over stdio:
|
||||
|
||||
```text
|
||||
codex-exec-server
|
||||
```
|
||||
|
||||
Run as a websocket server:
|
||||
|
||||
```text
|
||||
codex-exec-server --listen ws://127.0.0.1:8080
|
||||
```
|
||||
|
||||
### Client
|
||||
|
||||
Connect the client to an existing server transport:
|
||||
|
||||
- `ExecServerClient::connect_stdio(...)`
|
||||
- `ExecServerClient::connect_websocket(...)`
|
||||
- `ExecServerClient::connect_in_process(...)` for a local no-transport mode
|
||||
backed directly by the typed handler
|
||||
|
||||
Timeout behavior:
|
||||
|
||||
- stdio and websocket clients both enforce an initialize-handshake timeout
|
||||
- websocket clients also enforce a connect timeout before the handshake begins
|
||||
|
||||
Events:
|
||||
|
||||
- `ExecServerClient::event_receiver()` yields `ExecServerEvent`
|
||||
- output events include both `stream` (`stdout`, `stderr`, or `pty`) and raw
|
||||
bytes
|
||||
- process lifetime is tracked by server notifications such as
|
||||
`process/exited`, not by a client-side process registry
|
||||
|
||||
Spawning a local child process is deliberately separate:
|
||||
|
||||
- `spawn_local_exec_server(...)`
|
||||
- `run_main()` for embedding the stdio server in a binary
|
||||
|
||||
## Example session
|
||||
|
||||
@@ -370,23 +264,23 @@ Initialize:
|
||||
Start a process:
|
||||
|
||||
```json
|
||||
{"id":2,"method":"process/start","params":{"processId":"proc-1","argv":["bash","-lc","printf 'ready\\n'; while IFS= read -r line; do printf 'echo:%s\\n' \"$line\"; done"],"cwd":"/tmp","env":{"PATH":"/usr/bin:/bin"},"tty":true,"arg0":null}}
|
||||
{"id":2,"result":{"processId":"proc-1"}}
|
||||
{"method":"process/output","params":{"processId":"proc-1","stream":"pty","chunk":"cmVhZHkK"}}
|
||||
{"id":2,"method":"command/exec","params":{"processId":"proc-1","argv":["bash","-lc","printf 'ready\\n'; while IFS= read -r line; do printf 'echo:%s\\n' \"$line\"; done"],"cwd":"/tmp","env":{"PATH":"/usr/bin:/bin"},"tty":true,"outputBytesCap":4096,"arg0":null}}
|
||||
{"id":2,"result":{"processId":"proc-1","running":true,"exitCode":null,"stdout":null,"stderr":null}}
|
||||
{"method":"command/exec/outputDelta","params":{"processId":"proc-1","stream":"stdout","chunk":"cmVhZHkK"}}
|
||||
```
|
||||
|
||||
Write to the process:
|
||||
|
||||
```json
|
||||
{"id":3,"method":"process/write","params":{"processId":"proc-1","chunk":"aGVsbG8K"}}
|
||||
{"id":3,"method":"command/exec/write","params":{"processId":"proc-1","chunk":"aGVsbG8K"}}
|
||||
{"id":3,"result":{"accepted":true}}
|
||||
{"method":"process/output","params":{"processId":"proc-1","stream":"pty","chunk":"ZWNobzpoZWxsbwo="}}
|
||||
{"method":"command/exec/outputDelta","params":{"processId":"proc-1","stream":"stdout","chunk":"ZWNobzpoZWxsbwo="}}
|
||||
```
|
||||
|
||||
Terminate it:
|
||||
|
||||
```json
|
||||
{"id":4,"method":"process/terminate","params":{"processId":"proc-1"}}
|
||||
{"id":4,"method":"command/exec/terminate","params":{"processId":"proc-1"}}
|
||||
{"id":4,"result":{"running":true}}
|
||||
{"method":"process/exited","params":{"processId":"proc-1","exitCode":0}}
|
||||
{"method":"command/exec/exited","params":{"processId":"proc-1","exitCode":0}}
|
||||
```
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
95
codex-rs/exec-server/src/client/local_backend.rs
Normal file
95
codex-rs/exec-server/src/client/local_backend.rs
Normal file
@@ -0,0 +1,95 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::protocol::ExecParams;
|
||||
use crate::protocol::ExecResponse;
|
||||
use crate::protocol::InitializeResponse;
|
||||
use crate::protocol::ReadParams;
|
||||
use crate::protocol::ReadResponse;
|
||||
use crate::protocol::TerminateParams;
|
||||
use crate::protocol::TerminateResponse;
|
||||
use crate::protocol::WriteParams;
|
||||
use crate::protocol::WriteResponse;
|
||||
use crate::server::ExecServerHandler;
|
||||
|
||||
use super::ExecServerError;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(super) struct LocalBackend {
|
||||
handler: Arc<ExecServerHandler>,
|
||||
}
|
||||
|
||||
impl LocalBackend {
|
||||
pub(super) fn new(handler: ExecServerHandler) -> Self {
|
||||
Self {
|
||||
handler: Arc::new(handler),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn shutdown(&self) {
|
||||
self.handler.shutdown().await;
|
||||
}
|
||||
|
||||
pub(super) async fn initialize(&self) -> Result<InitializeResponse, ExecServerError> {
|
||||
self.handler
|
||||
.initialize()
|
||||
.map_err(|error| ExecServerError::Server {
|
||||
code: error.code,
|
||||
message: error.message,
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) async fn initialized(&self) -> Result<(), ExecServerError> {
|
||||
self.handler
|
||||
.initialized()
|
||||
.map_err(ExecServerError::Protocol)
|
||||
}
|
||||
|
||||
pub(super) async fn exec(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError> {
|
||||
self.handler
|
||||
.exec(params)
|
||||
.await
|
||||
.map_err(|error| ExecServerError::Server {
|
||||
code: error.code,
|
||||
message: error.message,
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) async fn exec_read(
|
||||
&self,
|
||||
params: ReadParams,
|
||||
) -> Result<ReadResponse, ExecServerError> {
|
||||
self.handler
|
||||
.exec_read(params)
|
||||
.await
|
||||
.map_err(|error| ExecServerError::Server {
|
||||
code: error.code,
|
||||
message: error.message,
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) async fn exec_write(
|
||||
&self,
|
||||
params: WriteParams,
|
||||
) -> Result<WriteResponse, ExecServerError> {
|
||||
self.handler
|
||||
.exec_write(params)
|
||||
.await
|
||||
.map_err(|error| ExecServerError::Server {
|
||||
code: error.code,
|
||||
message: error.message,
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) async fn terminate(
|
||||
&self,
|
||||
params: TerminateParams,
|
||||
) -> Result<TerminateResponse, ExecServerError> {
|
||||
self.handler
|
||||
.terminate(params)
|
||||
.await
|
||||
.map_err(|error| ExecServerError::Server {
|
||||
code: error.code,
|
||||
message: error.message,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,7 @@ pub(crate) enum JsonRpcConnectionEvent {
|
||||
pub(crate) struct JsonRpcConnection {
|
||||
outgoing_tx: mpsc::Sender<JSONRPCMessage>,
|
||||
incoming_rx: mpsc::Receiver<JsonRpcConnectionEvent>,
|
||||
task_handles: Vec<tokio::task::JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl JsonRpcConnection {
|
||||
@@ -35,7 +36,7 @@ impl JsonRpcConnection {
|
||||
|
||||
let reader_label = connection_label.clone();
|
||||
let incoming_tx_for_reader = incoming_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let reader_task = tokio::spawn(async move {
|
||||
let mut lines = BufReader::new(reader).lines();
|
||||
loop {
|
||||
match lines.next_line().await {
|
||||
@@ -66,7 +67,7 @@ impl JsonRpcConnection {
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
send_disconnected(&incoming_tx_for_reader, None).await;
|
||||
send_disconnected(&incoming_tx_for_reader, /*reason*/ None).await;
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -83,7 +84,7 @@ impl JsonRpcConnection {
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
let writer_task = tokio::spawn(async move {
|
||||
let mut writer = BufWriter::new(writer);
|
||||
while let Some(message) = outgoing_rx.recv().await {
|
||||
if let Err(err) = write_jsonrpc_line_message(&mut writer, &message).await {
|
||||
@@ -102,6 +103,7 @@ impl JsonRpcConnection {
|
||||
Self {
|
||||
outgoing_tx,
|
||||
incoming_rx,
|
||||
task_handles: vec![reader_task, writer_task],
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,7 +117,7 @@ impl JsonRpcConnection {
|
||||
|
||||
let reader_label = connection_label.clone();
|
||||
let incoming_tx_for_reader = incoming_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let reader_task = tokio::spawn(async move {
|
||||
loop {
|
||||
match websocket_reader.next().await {
|
||||
Some(Ok(Message::Text(text))) => {
|
||||
@@ -165,7 +167,7 @@ impl JsonRpcConnection {
|
||||
}
|
||||
}
|
||||
Some(Ok(Message::Close(_))) => {
|
||||
send_disconnected(&incoming_tx_for_reader, None).await;
|
||||
send_disconnected(&incoming_tx_for_reader, /*reason*/ None).await;
|
||||
break;
|
||||
}
|
||||
Some(Ok(Message::Ping(_))) | Some(Ok(Message::Pong(_))) => {}
|
||||
@@ -181,14 +183,14 @@ impl JsonRpcConnection {
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
send_disconnected(&incoming_tx_for_reader, None).await;
|
||||
send_disconnected(&incoming_tx_for_reader, /*reason*/ None).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
let writer_task = tokio::spawn(async move {
|
||||
while let Some(message) = outgoing_rx.recv().await {
|
||||
match serialize_jsonrpc_message(&message) {
|
||||
Ok(encoded) => {
|
||||
@@ -221,6 +223,7 @@ impl JsonRpcConnection {
|
||||
Self {
|
||||
outgoing_tx,
|
||||
incoming_rx,
|
||||
task_handles: vec![reader_task, writer_task],
|
||||
}
|
||||
}
|
||||
|
||||
@@ -229,8 +232,9 @@ impl JsonRpcConnection {
|
||||
) -> (
|
||||
mpsc::Sender<JSONRPCMessage>,
|
||||
mpsc::Receiver<JsonRpcConnectionEvent>,
|
||||
Vec<tokio::task::JoinHandle<()>>,
|
||||
) {
|
||||
(self.outgoing_tx, self.incoming_rx)
|
||||
(self.outgoing_tx, self.incoming_rx, self.task_handles)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -260,158 +264,3 @@ where
|
||||
fn serialize_jsonrpc_message(message: &JSONRPCMessage) -> Result<String, serde_json::Error> {
|
||||
serde_json::to_string(message)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use super::JsonRpcConnection;
|
||||
use super::JsonRpcConnectionEvent;
|
||||
use super::serialize_jsonrpc_message;
|
||||
|
||||
async fn recv_event(
|
||||
incoming_rx: &mut mpsc::Receiver<JsonRpcConnectionEvent>,
|
||||
) -> JsonRpcConnectionEvent {
|
||||
let recv_result = timeout(Duration::from_secs(1), incoming_rx.recv()).await;
|
||||
let maybe_event = match recv_result {
|
||||
Ok(maybe_event) => maybe_event,
|
||||
Err(err) => panic!("timed out waiting for connection event: {err}"),
|
||||
};
|
||||
match maybe_event {
|
||||
Some(event) => event,
|
||||
None => panic!("connection event stream ended unexpectedly"),
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_jsonrpc_line<R>(lines: &mut tokio::io::Lines<BufReader<R>>) -> JSONRPCMessage
|
||||
where
|
||||
R: tokio::io::AsyncRead + Unpin,
|
||||
{
|
||||
let next_line = timeout(Duration::from_secs(1), lines.next_line()).await;
|
||||
let line_result = match next_line {
|
||||
Ok(line_result) => line_result,
|
||||
Err(err) => panic!("timed out waiting for JSON-RPC line: {err}"),
|
||||
};
|
||||
let maybe_line = match line_result {
|
||||
Ok(maybe_line) => maybe_line,
|
||||
Err(err) => panic!("failed to read JSON-RPC line: {err}"),
|
||||
};
|
||||
let line = match maybe_line {
|
||||
Some(line) => line,
|
||||
None => panic!("connection closed before JSON-RPC line arrived"),
|
||||
};
|
||||
match serde_json::from_str::<JSONRPCMessage>(&line) {
|
||||
Ok(message) => message,
|
||||
Err(err) => panic!("failed to parse JSON-RPC line: {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn stdio_connection_reads_and_writes_jsonrpc_messages() {
|
||||
let (mut writer_to_connection, connection_reader) = tokio::io::duplex(1024);
|
||||
let (connection_writer, reader_from_connection) = tokio::io::duplex(1024);
|
||||
let connection =
|
||||
JsonRpcConnection::from_stdio(connection_reader, connection_writer, "test".to_string());
|
||||
let (outgoing_tx, mut incoming_rx) = connection.into_parts();
|
||||
|
||||
let incoming_message = JSONRPCMessage::Request(JSONRPCRequest {
|
||||
id: RequestId::Integer(7),
|
||||
method: "initialize".to_string(),
|
||||
params: Some(serde_json::json!({ "clientName": "test-client" })),
|
||||
trace: None,
|
||||
});
|
||||
let encoded = match serialize_jsonrpc_message(&incoming_message) {
|
||||
Ok(encoded) => encoded,
|
||||
Err(err) => panic!("failed to serialize incoming message: {err}"),
|
||||
};
|
||||
if let Err(err) = writer_to_connection
|
||||
.write_all(format!("{encoded}\n").as_bytes())
|
||||
.await
|
||||
{
|
||||
panic!("failed to write to connection: {err}");
|
||||
}
|
||||
|
||||
let event = recv_event(&mut incoming_rx).await;
|
||||
match event {
|
||||
JsonRpcConnectionEvent::Message(message) => {
|
||||
assert_eq!(message, incoming_message);
|
||||
}
|
||||
JsonRpcConnectionEvent::Disconnected { reason } => {
|
||||
panic!("unexpected disconnect event: {reason:?}");
|
||||
}
|
||||
}
|
||||
|
||||
let outgoing_message = JSONRPCMessage::Response(JSONRPCResponse {
|
||||
id: RequestId::Integer(7),
|
||||
result: serde_json::json!({ "protocolVersion": "exec-server.v0" }),
|
||||
});
|
||||
if let Err(err) = outgoing_tx.send(outgoing_message.clone()).await {
|
||||
panic!("failed to queue outgoing message: {err}");
|
||||
}
|
||||
|
||||
let mut lines = BufReader::new(reader_from_connection).lines();
|
||||
let message = read_jsonrpc_line(&mut lines).await;
|
||||
assert_eq!(message, outgoing_message);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn stdio_connection_reports_parse_errors() {
|
||||
let (mut writer_to_connection, connection_reader) = tokio::io::duplex(1024);
|
||||
let (connection_writer, _reader_from_connection) = tokio::io::duplex(1024);
|
||||
let connection =
|
||||
JsonRpcConnection::from_stdio(connection_reader, connection_writer, "test".to_string());
|
||||
let (_outgoing_tx, mut incoming_rx) = connection.into_parts();
|
||||
|
||||
if let Err(err) = writer_to_connection.write_all(b"not-json\n").await {
|
||||
panic!("failed to write invalid JSON: {err}");
|
||||
}
|
||||
|
||||
let event = recv_event(&mut incoming_rx).await;
|
||||
match event {
|
||||
JsonRpcConnectionEvent::Disconnected { reason } => {
|
||||
let reason = match reason {
|
||||
Some(reason) => reason,
|
||||
None => panic!("expected a parse error reason"),
|
||||
};
|
||||
assert!(
|
||||
reason.contains("failed to parse JSON-RPC message from test"),
|
||||
"unexpected disconnect reason: {reason}"
|
||||
);
|
||||
}
|
||||
JsonRpcConnectionEvent::Message(message) => {
|
||||
panic!("unexpected JSON-RPC message: {message:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn stdio_connection_reports_clean_disconnect() {
|
||||
let (writer_to_connection, connection_reader) = tokio::io::duplex(1024);
|
||||
let (connection_writer, _reader_from_connection) = tokio::io::duplex(1024);
|
||||
let connection =
|
||||
JsonRpcConnection::from_stdio(connection_reader, connection_writer, "test".to_string());
|
||||
let (_outgoing_tx, mut incoming_rx) = connection.into_parts();
|
||||
drop(writer_to_connection);
|
||||
|
||||
let event = recv_event(&mut incoming_rx).await;
|
||||
match event {
|
||||
JsonRpcConnectionEvent::Disconnected { reason } => {
|
||||
assert_eq!(reason, None);
|
||||
}
|
||||
JsonRpcConnectionEvent::Message(message) => {
|
||||
panic!("unexpected JSON-RPC message: {message:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ mod client_api;
|
||||
mod connection;
|
||||
mod local;
|
||||
mod protocol;
|
||||
mod rpc;
|
||||
mod server;
|
||||
|
||||
pub use client::ExecServerClient;
|
||||
@@ -20,6 +21,8 @@ pub use protocol::ExecParams;
|
||||
pub use protocol::ExecResponse;
|
||||
pub use protocol::InitializeParams;
|
||||
pub use protocol::InitializeResponse;
|
||||
pub use protocol::ReadParams;
|
||||
pub use protocol::ReadResponse;
|
||||
pub use protocol::TerminateParams;
|
||||
pub use protocol::TerminateResponse;
|
||||
pub use protocol::WriteParams;
|
||||
|
||||
562
codex-rs/exec-server/src/rpc.rs
Normal file
562
codex-rs/exec-server/src/rpc.rs
Normal file
@@ -0,0 +1,562 @@
|
||||
use std::collections::HashMap;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicI64;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::connection::JsonRpcConnection;
|
||||
use crate::connection::JsonRpcConnectionEvent;
|
||||
|
||||
type PendingRequest = oneshot::Sender<Result<Value, JSONRPCErrorError>>;
|
||||
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
|
||||
type RequestRoute<S> =
|
||||
Box<dyn Fn(Arc<S>, JSONRPCRequest) -> BoxFuture<RpcServerOutboundMessage> + Send + Sync>;
|
||||
type NotificationRoute<S> =
|
||||
Box<dyn Fn(Arc<S>, JSONRPCNotification) -> BoxFuture<Result<(), String>> + Send + Sync>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum RpcClientEvent {
|
||||
Notification(JSONRPCNotification),
|
||||
Disconnected { reason: Option<String> },
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub(crate) enum RpcServerOutboundMessage {
|
||||
Response {
|
||||
request_id: RequestId,
|
||||
result: Value,
|
||||
},
|
||||
Error {
|
||||
request_id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
},
|
||||
#[allow(dead_code)]
|
||||
Notification(JSONRPCNotification),
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct RpcNotificationSender {
|
||||
outgoing_tx: mpsc::Sender<RpcServerOutboundMessage>,
|
||||
}
|
||||
|
||||
impl RpcNotificationSender {
|
||||
pub(crate) fn new(outgoing_tx: mpsc::Sender<RpcServerOutboundMessage>) -> Self {
|
||||
Self { outgoing_tx }
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn notify<P: Serialize>(
|
||||
&self,
|
||||
method: &str,
|
||||
params: &P,
|
||||
) -> Result<(), JSONRPCErrorError> {
|
||||
let params = serde_json::to_value(params).map_err(|err| internal_error(err.to_string()))?;
|
||||
self.outgoing_tx
|
||||
.send(RpcServerOutboundMessage::Notification(
|
||||
JSONRPCNotification {
|
||||
method: method.to_string(),
|
||||
params: Some(params),
|
||||
},
|
||||
))
|
||||
.await
|
||||
.map_err(|_| internal_error("RPC connection closed while sending notification".into()))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct RpcRouter<S> {
|
||||
request_routes: HashMap<&'static str, RequestRoute<S>>,
|
||||
notification_routes: HashMap<&'static str, NotificationRoute<S>>,
|
||||
}
|
||||
|
||||
impl<S> Default for RpcRouter<S> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
request_routes: HashMap::new(),
|
||||
notification_routes: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> RpcRouter<S>
|
||||
where
|
||||
S: Send + Sync + 'static,
|
||||
{
|
||||
pub(crate) fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub(crate) fn request<P, R, F, Fut>(&mut self, method: &'static str, handler: F)
|
||||
where
|
||||
P: DeserializeOwned + Send + 'static,
|
||||
R: Serialize + Send + 'static,
|
||||
F: Fn(Arc<S>, P) -> Fut + Send + Sync + 'static,
|
||||
Fut: Future<Output = Result<R, JSONRPCErrorError>> + Send + 'static,
|
||||
{
|
||||
self.request_routes.insert(
|
||||
method,
|
||||
Box::new(move |state, request| {
|
||||
let request_id = request.id;
|
||||
let params = request.params;
|
||||
let response =
|
||||
decode_request_params::<P>(params).map(|params| handler(state, params));
|
||||
Box::pin(async move {
|
||||
let response = match response {
|
||||
Ok(response) => response.await,
|
||||
Err(error) => {
|
||||
return RpcServerOutboundMessage::Error { request_id, error };
|
||||
}
|
||||
};
|
||||
match response {
|
||||
Ok(result) => match serde_json::to_value(result) {
|
||||
Ok(result) => RpcServerOutboundMessage::Response { request_id, result },
|
||||
Err(err) => RpcServerOutboundMessage::Error {
|
||||
request_id,
|
||||
error: internal_error(err.to_string()),
|
||||
},
|
||||
},
|
||||
Err(error) => RpcServerOutboundMessage::Error { request_id, error },
|
||||
}
|
||||
})
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) fn notification<P, F, Fut>(&mut self, method: &'static str, handler: F)
|
||||
where
|
||||
P: DeserializeOwned + Send + 'static,
|
||||
F: Fn(Arc<S>, P) -> Fut + Send + Sync + 'static,
|
||||
Fut: Future<Output = Result<(), String>> + Send + 'static,
|
||||
{
|
||||
self.notification_routes.insert(
|
||||
method,
|
||||
Box::new(move |state, notification| {
|
||||
let params = decode_notification_params::<P>(notification.params)
|
||||
.map(|params| handler(state, params));
|
||||
Box::pin(async move {
|
||||
let handler = match params {
|
||||
Ok(handler) => handler,
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
handler.await
|
||||
})
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) fn request_route(&self, method: &str) -> Option<&RequestRoute<S>> {
|
||||
self.request_routes.get(method)
|
||||
}
|
||||
|
||||
pub(crate) fn notification_route(&self, method: &str) -> Option<&NotificationRoute<S>> {
|
||||
self.notification_routes.get(method)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct RpcClient {
|
||||
write_tx: mpsc::Sender<JSONRPCMessage>,
|
||||
pending: Arc<Mutex<HashMap<RequestId, PendingRequest>>>,
|
||||
next_request_id: AtomicI64,
|
||||
transport_tasks: Vec<JoinHandle<()>>,
|
||||
reader_task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl RpcClient {
|
||||
pub(crate) fn new(connection: JsonRpcConnection) -> (Self, mpsc::Receiver<RpcClientEvent>) {
|
||||
let (write_tx, mut incoming_rx, transport_tasks) = connection.into_parts();
|
||||
let pending = Arc::new(Mutex::new(HashMap::<RequestId, PendingRequest>::new()));
|
||||
let (event_tx, event_rx) = mpsc::channel(128);
|
||||
|
||||
let pending_for_reader = Arc::clone(&pending);
|
||||
let reader_task = tokio::spawn(async move {
|
||||
while let Some(event) = incoming_rx.recv().await {
|
||||
match event {
|
||||
JsonRpcConnectionEvent::Message(message) => {
|
||||
if let Err(err) =
|
||||
handle_server_message(&pending_for_reader, &event_tx, message).await
|
||||
{
|
||||
warn!("JSON-RPC client closing after protocol error: {err}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
JsonRpcConnectionEvent::Disconnected { reason } => {
|
||||
let _ = event_tx.send(RpcClientEvent::Disconnected { reason }).await;
|
||||
drain_pending(&pending_for_reader).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let _ = event_tx
|
||||
.send(RpcClientEvent::Disconnected { reason: None })
|
||||
.await;
|
||||
drain_pending(&pending_for_reader).await;
|
||||
});
|
||||
|
||||
(
|
||||
Self {
|
||||
write_tx,
|
||||
pending,
|
||||
next_request_id: AtomicI64::new(1),
|
||||
transport_tasks,
|
||||
reader_task,
|
||||
},
|
||||
event_rx,
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn notify<P: Serialize>(
|
||||
&self,
|
||||
method: &str,
|
||||
params: &P,
|
||||
) -> Result<(), serde_json::Error> {
|
||||
let params = serde_json::to_value(params)?;
|
||||
self.write_tx
|
||||
.send(JSONRPCMessage::Notification(JSONRPCNotification {
|
||||
method: method.to_string(),
|
||||
params: Some(params),
|
||||
}))
|
||||
.await
|
||||
.map_err(|_| {
|
||||
serde_json::Error::io(std::io::Error::new(
|
||||
std::io::ErrorKind::BrokenPipe,
|
||||
"JSON-RPC transport closed",
|
||||
))
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn call<P, T>(&self, method: &str, params: &P) -> Result<T, RpcCallError>
|
||||
where
|
||||
P: Serialize,
|
||||
T: DeserializeOwned,
|
||||
{
|
||||
let request_id = RequestId::Integer(self.next_request_id.fetch_add(1, Ordering::SeqCst));
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.pending
|
||||
.lock()
|
||||
.await
|
||||
.insert(request_id.clone(), response_tx);
|
||||
|
||||
let params = match serde_json::to_value(params) {
|
||||
Ok(params) => params,
|
||||
Err(err) => {
|
||||
self.pending.lock().await.remove(&request_id);
|
||||
return Err(RpcCallError::Json(err));
|
||||
}
|
||||
};
|
||||
if self
|
||||
.write_tx
|
||||
.send(JSONRPCMessage::Request(JSONRPCRequest {
|
||||
id: request_id.clone(),
|
||||
method: method.to_string(),
|
||||
params: Some(params),
|
||||
trace: None,
|
||||
}))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
self.pending.lock().await.remove(&request_id);
|
||||
return Err(RpcCallError::Closed);
|
||||
}
|
||||
|
||||
let result = response_rx.await.map_err(|_| RpcCallError::Closed)?;
|
||||
let response = match result {
|
||||
Ok(response) => response,
|
||||
Err(error) => return Err(RpcCallError::Server(error)),
|
||||
};
|
||||
serde_json::from_value(response).map_err(RpcCallError::Json)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn pending_request_count(&self) -> usize {
|
||||
self.pending.lock().await.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RpcClient {
|
||||
fn drop(&mut self) {
|
||||
for task in &self.transport_tasks {
|
||||
task.abort();
|
||||
}
|
||||
self.reader_task.abort();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum RpcCallError {
|
||||
Closed,
|
||||
Json(serde_json::Error),
|
||||
Server(JSONRPCErrorError),
|
||||
}
|
||||
|
||||
pub(crate) fn encode_server_message(
|
||||
message: RpcServerOutboundMessage,
|
||||
) -> Result<JSONRPCMessage, serde_json::Error> {
|
||||
match message {
|
||||
RpcServerOutboundMessage::Response { request_id, result } => {
|
||||
Ok(JSONRPCMessage::Response(JSONRPCResponse {
|
||||
id: request_id,
|
||||
result,
|
||||
}))
|
||||
}
|
||||
RpcServerOutboundMessage::Error { request_id, error } => {
|
||||
Ok(JSONRPCMessage::Error(JSONRPCError {
|
||||
id: request_id,
|
||||
error,
|
||||
}))
|
||||
}
|
||||
RpcServerOutboundMessage::Notification(notification) => {
|
||||
Ok(JSONRPCMessage::Notification(notification))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn invalid_request(message: String) -> JSONRPCErrorError {
|
||||
JSONRPCErrorError {
|
||||
code: -32600,
|
||||
data: None,
|
||||
message,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn method_not_found(message: String) -> JSONRPCErrorError {
|
||||
JSONRPCErrorError {
|
||||
code: -32601,
|
||||
data: None,
|
||||
message,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn invalid_params(message: String) -> JSONRPCErrorError {
|
||||
JSONRPCErrorError {
|
||||
code: -32602,
|
||||
data: None,
|
||||
message,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn internal_error(message: String) -> JSONRPCErrorError {
|
||||
JSONRPCErrorError {
|
||||
code: -32603,
|
||||
data: None,
|
||||
message,
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_request_params<P>(params: Option<Value>) -> Result<P, JSONRPCErrorError>
|
||||
where
|
||||
P: DeserializeOwned,
|
||||
{
|
||||
decode_params(params).map_err(|err| invalid_params(err.to_string()))
|
||||
}
|
||||
|
||||
fn decode_notification_params<P>(params: Option<Value>) -> Result<P, String>
|
||||
where
|
||||
P: DeserializeOwned,
|
||||
{
|
||||
decode_params(params).map_err(|err| err.to_string())
|
||||
}
|
||||
|
||||
fn decode_params<P>(params: Option<Value>) -> Result<P, serde_json::Error>
|
||||
where
|
||||
P: DeserializeOwned,
|
||||
{
|
||||
let params = params.unwrap_or(Value::Null);
|
||||
match serde_json::from_value(params.clone()) {
|
||||
Ok(params) => Ok(params),
|
||||
Err(err) => {
|
||||
if matches!(params, Value::Object(ref map) if map.is_empty()) {
|
||||
serde_json::from_value(Value::Null).map_err(|_| err)
|
||||
} else {
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_server_message(
|
||||
pending: &Mutex<HashMap<RequestId, PendingRequest>>,
|
||||
event_tx: &mpsc::Sender<RpcClientEvent>,
|
||||
message: JSONRPCMessage,
|
||||
) -> Result<(), String> {
|
||||
match message {
|
||||
JSONRPCMessage::Response(JSONRPCResponse { id, result }) => {
|
||||
if let Some(pending) = pending.lock().await.remove(&id) {
|
||||
let _ = pending.send(Ok(result));
|
||||
}
|
||||
}
|
||||
JSONRPCMessage::Error(JSONRPCError { id, error }) => {
|
||||
if let Some(pending) = pending.lock().await.remove(&id) {
|
||||
let _ = pending.send(Err(error));
|
||||
}
|
||||
}
|
||||
JSONRPCMessage::Notification(notification) => {
|
||||
let _ = event_tx
|
||||
.send(RpcClientEvent::Notification(notification))
|
||||
.await;
|
||||
}
|
||||
JSONRPCMessage::Request(request) => {
|
||||
return Err(format!(
|
||||
"unexpected JSON-RPC request from remote server: {}",
|
||||
request.method
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn drain_pending(pending: &Mutex<HashMap<RequestId, PendingRequest>>) {
|
||||
let pending = {
|
||||
let mut pending = pending.lock().await;
|
||||
pending
|
||||
.drain()
|
||||
.map(|(_, pending)| pending)
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
for pending in pending {
|
||||
let _ = pending.send(Err(JSONRPCErrorError {
|
||||
code: -32000,
|
||||
data: None,
|
||||
message: "JSON-RPC transport closed".to_string(),
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use super::RpcClient;
|
||||
use crate::connection::JsonRpcConnection;
|
||||
|
||||
async fn read_jsonrpc_line<R>(lines: &mut tokio::io::Lines<BufReader<R>>) -> JSONRPCMessage
|
||||
where
|
||||
R: tokio::io::AsyncRead + Unpin,
|
||||
{
|
||||
let next_line = timeout(Duration::from_secs(1), lines.next_line()).await;
|
||||
let line_result = match next_line {
|
||||
Ok(line_result) => line_result,
|
||||
Err(err) => panic!("timed out waiting for JSON-RPC line: {err}"),
|
||||
};
|
||||
let maybe_line = match line_result {
|
||||
Ok(maybe_line) => maybe_line,
|
||||
Err(err) => panic!("failed to read JSON-RPC line: {err}"),
|
||||
};
|
||||
let line = match maybe_line {
|
||||
Some(line) => line,
|
||||
None => panic!("server connection closed before JSON-RPC line arrived"),
|
||||
};
|
||||
match serde_json::from_str::<JSONRPCMessage>(&line) {
|
||||
Ok(message) => message,
|
||||
Err(err) => panic!("failed to parse JSON-RPC line: {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_jsonrpc_line<W>(writer: &mut W, message: JSONRPCMessage)
|
||||
where
|
||||
W: tokio::io::AsyncWrite + Unpin,
|
||||
{
|
||||
let encoded = match serde_json::to_string(&message) {
|
||||
Ok(encoded) => encoded,
|
||||
Err(err) => panic!("failed to encode JSON-RPC message: {err}"),
|
||||
};
|
||||
if let Err(err) = writer.write_all(format!("{encoded}\n").as_bytes()).await {
|
||||
panic!("failed to write JSON-RPC line: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rpc_client_matches_out_of_order_responses_by_request_id() {
|
||||
let (client_stdin, server_reader) = tokio::io::duplex(4096);
|
||||
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
|
||||
let (client, _events_rx) = RpcClient::new(JsonRpcConnection::from_stdio(
|
||||
client_stdout,
|
||||
client_stdin,
|
||||
"test-rpc".to_string(),
|
||||
));
|
||||
|
||||
let server = tokio::spawn(async move {
|
||||
let mut lines = BufReader::new(server_reader).lines();
|
||||
|
||||
let first = read_jsonrpc_line(&mut lines).await;
|
||||
let second = read_jsonrpc_line(&mut lines).await;
|
||||
let (slow_request, fast_request) = match (first, second) {
|
||||
(
|
||||
JSONRPCMessage::Request(first_request),
|
||||
JSONRPCMessage::Request(second_request),
|
||||
) if first_request.method == "slow" && second_request.method == "fast" => {
|
||||
(first_request, second_request)
|
||||
}
|
||||
(
|
||||
JSONRPCMessage::Request(first_request),
|
||||
JSONRPCMessage::Request(second_request),
|
||||
) if first_request.method == "fast" && second_request.method == "slow" => {
|
||||
(second_request, first_request)
|
||||
}
|
||||
_ => panic!("expected slow and fast requests"),
|
||||
};
|
||||
|
||||
write_jsonrpc_line(
|
||||
&mut server_writer,
|
||||
JSONRPCMessage::Response(JSONRPCResponse {
|
||||
id: fast_request.id,
|
||||
result: serde_json::json!({ "value": "fast" }),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
write_jsonrpc_line(
|
||||
&mut server_writer,
|
||||
JSONRPCMessage::Response(JSONRPCResponse {
|
||||
id: slow_request.id,
|
||||
result: serde_json::json!({ "value": "slow" }),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
let slow_params = serde_json::json!({ "n": 1 });
|
||||
let fast_params = serde_json::json!({ "n": 2 });
|
||||
let (slow, fast) = tokio::join!(
|
||||
client.call::<_, serde_json::Value>("slow", &slow_params),
|
||||
client.call::<_, serde_json::Value>("fast", &fast_params),
|
||||
);
|
||||
|
||||
let slow = slow.unwrap_or_else(|err| panic!("slow request failed: {err:?}"));
|
||||
let fast = fast.unwrap_or_else(|err| panic!("fast request failed: {err:?}"));
|
||||
assert_eq!(slow, serde_json::json!({ "value": "slow" }));
|
||||
assert_eq!(fast, serde_json::json!({ "value": "fast" }));
|
||||
|
||||
assert_eq!(client.pending_request_count().await, 0);
|
||||
|
||||
if let Err(err) = server.await {
|
||||
panic!("server task failed: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,11 +1,9 @@
|
||||
mod handler;
|
||||
mod processor;
|
||||
mod routing;
|
||||
mod registry;
|
||||
mod transport;
|
||||
|
||||
pub(crate) use handler::ExecServerHandler;
|
||||
pub(crate) use routing::ExecServerOutboundMessage;
|
||||
pub(crate) use routing::ExecServerServerNotification;
|
||||
pub use transport::ExecServerTransport;
|
||||
pub use transport::ExecServerTransportParseError;
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
74
codex-rs/exec-server/src/server/handler/tests.rs
Normal file
74
codex-rs/exec-server/src/server/handler/tests.rs
Normal file
@@ -0,0 +1,74 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use super::ExecServerHandler;
|
||||
use crate::protocol::ExecParams;
|
||||
use crate::protocol::InitializeResponse;
|
||||
use crate::protocol::PROTOCOL_VERSION;
|
||||
use crate::rpc::RpcNotificationSender;
|
||||
|
||||
fn exec_params(process_id: &str) -> ExecParams {
|
||||
let mut env = HashMap::new();
|
||||
if let Some(path) = std::env::var_os("PATH") {
|
||||
env.insert("PATH".to_string(), path.to_string_lossy().into_owned());
|
||||
}
|
||||
ExecParams {
|
||||
process_id: process_id.to_string(),
|
||||
argv: vec![
|
||||
"bash".to_string(),
|
||||
"-lc".to_string(),
|
||||
"sleep 0.1".to_string(),
|
||||
],
|
||||
cwd: std::env::current_dir().expect("cwd"),
|
||||
env,
|
||||
tty: false,
|
||||
arg0: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn initialized_handler() -> Arc<ExecServerHandler> {
|
||||
let (outgoing_tx, _outgoing_rx) = mpsc::channel(16);
|
||||
let handler = Arc::new(ExecServerHandler::new(RpcNotificationSender::new(
|
||||
outgoing_tx,
|
||||
)));
|
||||
assert_eq!(
|
||||
handler.initialize().expect("initialize"),
|
||||
InitializeResponse {
|
||||
protocol_version: PROTOCOL_VERSION.to_string(),
|
||||
}
|
||||
);
|
||||
handler.initialized().expect("initialized");
|
||||
handler
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn duplicate_process_ids_allow_only_one_successful_start() {
|
||||
let handler = initialized_handler().await;
|
||||
let first_handler = Arc::clone(&handler);
|
||||
let second_handler = Arc::clone(&handler);
|
||||
|
||||
let (first, second) = tokio::join!(
|
||||
first_handler.exec(exec_params("proc-1")),
|
||||
second_handler.exec(exec_params("proc-1")),
|
||||
);
|
||||
|
||||
let (successes, failures): (Vec<_>, Vec<_>) =
|
||||
[first, second].into_iter().partition(Result::is_ok);
|
||||
assert_eq!(successes.len(), 1);
|
||||
assert_eq!(failures.len(), 1);
|
||||
|
||||
let error = failures
|
||||
.into_iter()
|
||||
.next()
|
||||
.expect("one failed request")
|
||||
.expect_err("expected duplicate process error");
|
||||
assert_eq!(error.code, -32600);
|
||||
assert_eq!(error.message, "process proc-1 already exists");
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(150)).await;
|
||||
handler.shutdown().await;
|
||||
}
|
||||
@@ -1,3 +1,5 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::debug;
|
||||
use tracing::warn;
|
||||
@@ -5,25 +7,24 @@ use tracing::warn;
|
||||
use crate::connection::CHANNEL_CAPACITY;
|
||||
use crate::connection::JsonRpcConnection;
|
||||
use crate::connection::JsonRpcConnectionEvent;
|
||||
use crate::server::handler::ExecServerHandler;
|
||||
use crate::server::routing::ExecServerClientNotification;
|
||||
use crate::server::routing::ExecServerInboundMessage;
|
||||
use crate::server::routing::ExecServerOutboundMessage;
|
||||
use crate::server::routing::ExecServerRequest;
|
||||
use crate::server::routing::ExecServerResponseMessage;
|
||||
use crate::server::routing::RoutedExecServerMessage;
|
||||
use crate::server::routing::encode_outbound_message;
|
||||
use crate::server::routing::route_jsonrpc_message;
|
||||
use crate::rpc::RpcNotificationSender;
|
||||
use crate::rpc::RpcServerOutboundMessage;
|
||||
use crate::rpc::encode_server_message;
|
||||
use crate::rpc::method_not_found;
|
||||
use crate::server::ExecServerHandler;
|
||||
use crate::server::registry::build_router;
|
||||
|
||||
pub(crate) async fn run_connection(connection: JsonRpcConnection) {
|
||||
let (json_outgoing_tx, mut incoming_rx) = connection.into_parts();
|
||||
let router = Arc::new(build_router());
|
||||
let (json_outgoing_tx, mut incoming_rx, _connection_tasks) = connection.into_parts();
|
||||
let (outgoing_tx, mut outgoing_rx) =
|
||||
mpsc::channel::<ExecServerOutboundMessage>(CHANNEL_CAPACITY);
|
||||
let mut handler = ExecServerHandler::new(outgoing_tx.clone());
|
||||
mpsc::channel::<RpcServerOutboundMessage>(CHANNEL_CAPACITY);
|
||||
let notifications = RpcNotificationSender::new(outgoing_tx.clone());
|
||||
let handler = Arc::new(ExecServerHandler::new(notifications));
|
||||
|
||||
let outbound_task = tokio::spawn(async move {
|
||||
while let Some(message) = outgoing_rx.recv().await {
|
||||
let json_message = match encode_outbound_message(message) {
|
||||
let json_message = match encode_server_message(message) {
|
||||
Ok(json_message) => json_message,
|
||||
Err(err) => {
|
||||
warn!("failed to serialize exec-server outbound message: {err}");
|
||||
@@ -38,21 +39,55 @@ pub(crate) async fn run_connection(connection: JsonRpcConnection) {
|
||||
|
||||
while let Some(event) = incoming_rx.recv().await {
|
||||
match event {
|
||||
JsonRpcConnectionEvent::Message(message) => match route_jsonrpc_message(message) {
|
||||
Ok(RoutedExecServerMessage::Inbound(message)) => {
|
||||
if let Err(err) = dispatch_to_handler(&mut handler, message, &outgoing_tx).await
|
||||
JsonRpcConnectionEvent::Message(message) => match message {
|
||||
codex_app_server_protocol::JSONRPCMessage::Request(request) => {
|
||||
if let Some(route) = router.request_route(request.method.as_str()) {
|
||||
let route = route(handler.clone(), request);
|
||||
let outgoing_tx = outgoing_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let message = route.await;
|
||||
let _ = outgoing_tx.send(message).await;
|
||||
});
|
||||
} else if outgoing_tx
|
||||
.send(RpcServerOutboundMessage::Error {
|
||||
request_id: request.id,
|
||||
error: method_not_found(format!(
|
||||
"exec-server stub does not implement `{}` yet",
|
||||
request.method
|
||||
)),
|
||||
})
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
codex_app_server_protocol::JSONRPCMessage::Notification(notification) => {
|
||||
let Some(route) = router.notification_route(notification.method.as_str())
|
||||
else {
|
||||
warn!(
|
||||
"closing exec-server connection after unexpected notification: {}",
|
||||
notification.method
|
||||
);
|
||||
break;
|
||||
};
|
||||
if let Err(err) = route(handler.clone(), notification).await {
|
||||
warn!("closing exec-server connection after protocol error: {err}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(RoutedExecServerMessage::ImmediateOutbound(message)) => {
|
||||
if outgoing_tx.send(message).await.is_err() {
|
||||
break;
|
||||
}
|
||||
codex_app_server_protocol::JSONRPCMessage::Response(response) => {
|
||||
warn!(
|
||||
"closing exec-server connection after unexpected client response: {:?}",
|
||||
response.id
|
||||
);
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("closing exec-server connection after protocol error: {err}");
|
||||
codex_app_server_protocol::JSONRPCMessage::Error(error) => {
|
||||
warn!(
|
||||
"closing exec-server connection after unexpected client error: {:?}",
|
||||
error.id
|
||||
);
|
||||
break;
|
||||
}
|
||||
},
|
||||
@@ -66,74 +101,6 @@ pub(crate) async fn run_connection(connection: JsonRpcConnection) {
|
||||
}
|
||||
|
||||
handler.shutdown().await;
|
||||
drop(handler);
|
||||
drop(outgoing_tx);
|
||||
let _ = outbound_task.await;
|
||||
}
|
||||
|
||||
async fn dispatch_to_handler(
|
||||
handler: &mut ExecServerHandler,
|
||||
message: ExecServerInboundMessage,
|
||||
outgoing_tx: &mpsc::Sender<ExecServerOutboundMessage>,
|
||||
) -> Result<(), String> {
|
||||
match message {
|
||||
ExecServerInboundMessage::Request(request) => {
|
||||
let outbound = match request {
|
||||
ExecServerRequest::Initialize { request_id, .. } => request_outbound(
|
||||
request_id,
|
||||
handler
|
||||
.initialize()
|
||||
.map(ExecServerResponseMessage::Initialize),
|
||||
),
|
||||
ExecServerRequest::Exec { request_id, params } => request_outbound(
|
||||
request_id,
|
||||
handler
|
||||
.exec(params)
|
||||
.await
|
||||
.map(ExecServerResponseMessage::Exec),
|
||||
),
|
||||
ExecServerRequest::Read { request_id, params } => request_outbound(
|
||||
request_id,
|
||||
handler
|
||||
.read(params)
|
||||
.await
|
||||
.map(ExecServerResponseMessage::Read),
|
||||
),
|
||||
ExecServerRequest::Write { request_id, params } => request_outbound(
|
||||
request_id,
|
||||
handler
|
||||
.write(params)
|
||||
.await
|
||||
.map(ExecServerResponseMessage::Write),
|
||||
),
|
||||
ExecServerRequest::Terminate { request_id, params } => request_outbound(
|
||||
request_id,
|
||||
handler
|
||||
.terminate(params)
|
||||
.await
|
||||
.map(ExecServerResponseMessage::Terminate),
|
||||
),
|
||||
};
|
||||
outgoing_tx
|
||||
.send(outbound)
|
||||
.await
|
||||
.map_err(|_| "outbound channel closed".to_string())
|
||||
}
|
||||
ExecServerInboundMessage::Notification(ExecServerClientNotification::Initialized) => {
|
||||
handler.initialized()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn request_outbound(
|
||||
request_id: codex_app_server_protocol::RequestId,
|
||||
result: Result<ExecServerResponseMessage, codex_app_server_protocol::JSONRPCErrorError>,
|
||||
) -> ExecServerOutboundMessage {
|
||||
match result {
|
||||
Ok(response) => ExecServerOutboundMessage::Response {
|
||||
request_id,
|
||||
response,
|
||||
},
|
||||
Err(error) => ExecServerOutboundMessage::Error { request_id, error },
|
||||
}
|
||||
}
|
||||
|
||||
52
codex-rs/exec-server/src/server/registry.rs
Normal file
52
codex-rs/exec-server/src/server/registry.rs
Normal file
@@ -0,0 +1,52 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::protocol::EXEC_METHOD;
|
||||
use crate::protocol::EXEC_READ_METHOD;
|
||||
use crate::protocol::EXEC_TERMINATE_METHOD;
|
||||
use crate::protocol::EXEC_WRITE_METHOD;
|
||||
use crate::protocol::ExecParams;
|
||||
use crate::protocol::INITIALIZE_METHOD;
|
||||
use crate::protocol::INITIALIZED_METHOD;
|
||||
use crate::protocol::InitializeParams;
|
||||
use crate::protocol::ReadParams;
|
||||
use crate::protocol::TerminateParams;
|
||||
use crate::protocol::WriteParams;
|
||||
use crate::rpc::RpcRouter;
|
||||
use crate::server::ExecServerHandler;
|
||||
|
||||
pub(crate) fn build_router() -> RpcRouter<ExecServerHandler> {
|
||||
let mut router = RpcRouter::new();
|
||||
router.request(
|
||||
INITIALIZE_METHOD,
|
||||
|handler: Arc<ExecServerHandler>, _params: InitializeParams| async move {
|
||||
handler.initialize()
|
||||
},
|
||||
);
|
||||
router.notification(
|
||||
INITIALIZED_METHOD,
|
||||
|handler: Arc<ExecServerHandler>, (): ()| async move { handler.initialized() },
|
||||
);
|
||||
router.request(
|
||||
EXEC_METHOD,
|
||||
|handler: Arc<ExecServerHandler>, params: ExecParams| async move { handler.exec(params).await },
|
||||
);
|
||||
router.request(
|
||||
EXEC_READ_METHOD,
|
||||
|handler: Arc<ExecServerHandler>, params: ReadParams| async move {
|
||||
handler.exec_read(params).await
|
||||
},
|
||||
);
|
||||
router.request(
|
||||
EXEC_WRITE_METHOD,
|
||||
|handler: Arc<ExecServerHandler>, params: WriteParams| async move {
|
||||
handler.exec_write(params).await
|
||||
},
|
||||
);
|
||||
router.request(
|
||||
EXEC_TERMINATE_METHOD,
|
||||
|handler: Arc<ExecServerHandler>, params: TerminateParams| async move {
|
||||
handler.terminate(params).await
|
||||
},
|
||||
);
|
||||
router
|
||||
}
|
||||
@@ -1,454 +0,0 @@
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
use crate::protocol::EXEC_EXITED_METHOD;
|
||||
use crate::protocol::EXEC_METHOD;
|
||||
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
|
||||
use crate::protocol::EXEC_READ_METHOD;
|
||||
use crate::protocol::EXEC_TERMINATE_METHOD;
|
||||
use crate::protocol::EXEC_WRITE_METHOD;
|
||||
use crate::protocol::ExecExitedNotification;
|
||||
use crate::protocol::ExecOutputDeltaNotification;
|
||||
use crate::protocol::ExecParams;
|
||||
use crate::protocol::ExecResponse;
|
||||
use crate::protocol::INITIALIZE_METHOD;
|
||||
use crate::protocol::INITIALIZED_METHOD;
|
||||
use crate::protocol::InitializeParams;
|
||||
use crate::protocol::InitializeResponse;
|
||||
use crate::protocol::ReadParams;
|
||||
use crate::protocol::ReadResponse;
|
||||
use crate::protocol::TerminateParams;
|
||||
use crate::protocol::TerminateResponse;
|
||||
use crate::protocol::WriteParams;
|
||||
use crate::protocol::WriteResponse;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) enum ExecServerInboundMessage {
|
||||
Request(ExecServerRequest),
|
||||
Notification(ExecServerClientNotification),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) enum ExecServerRequest {
|
||||
Initialize {
|
||||
request_id: RequestId,
|
||||
params: InitializeParams,
|
||||
},
|
||||
Exec {
|
||||
request_id: RequestId,
|
||||
params: ExecParams,
|
||||
},
|
||||
Read {
|
||||
request_id: RequestId,
|
||||
params: ReadParams,
|
||||
},
|
||||
Write {
|
||||
request_id: RequestId,
|
||||
params: WriteParams,
|
||||
},
|
||||
Terminate {
|
||||
request_id: RequestId,
|
||||
params: TerminateParams,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) enum ExecServerClientNotification {
|
||||
Initialized,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub(crate) enum ExecServerOutboundMessage {
|
||||
Response {
|
||||
request_id: RequestId,
|
||||
response: ExecServerResponseMessage,
|
||||
},
|
||||
Error {
|
||||
request_id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
},
|
||||
Notification(ExecServerServerNotification),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) enum ExecServerResponseMessage {
|
||||
Initialize(InitializeResponse),
|
||||
Exec(ExecResponse),
|
||||
Read(ReadResponse),
|
||||
Write(WriteResponse),
|
||||
Terminate(TerminateResponse),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) enum ExecServerServerNotification {
|
||||
OutputDelta(ExecOutputDeltaNotification),
|
||||
Exited(ExecExitedNotification),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub(crate) enum RoutedExecServerMessage {
|
||||
Inbound(ExecServerInboundMessage),
|
||||
ImmediateOutbound(ExecServerOutboundMessage),
|
||||
}
|
||||
|
||||
pub(crate) fn route_jsonrpc_message(
|
||||
message: JSONRPCMessage,
|
||||
) -> Result<RoutedExecServerMessage, String> {
|
||||
match message {
|
||||
JSONRPCMessage::Request(request) => route_request(request),
|
||||
JSONRPCMessage::Notification(notification) => route_notification(notification),
|
||||
JSONRPCMessage::Response(response) => Err(format!(
|
||||
"unexpected client response for request id {:?}",
|
||||
response.id
|
||||
)),
|
||||
JSONRPCMessage::Error(error) => Err(format!(
|
||||
"unexpected client error for request id {:?}",
|
||||
error.id
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn encode_outbound_message(
|
||||
message: ExecServerOutboundMessage,
|
||||
) -> Result<JSONRPCMessage, serde_json::Error> {
|
||||
match message {
|
||||
ExecServerOutboundMessage::Response {
|
||||
request_id,
|
||||
response,
|
||||
} => Ok(JSONRPCMessage::Response(JSONRPCResponse {
|
||||
id: request_id,
|
||||
result: serialize_response(response)?,
|
||||
})),
|
||||
ExecServerOutboundMessage::Error { request_id, error } => {
|
||||
Ok(JSONRPCMessage::Error(JSONRPCError {
|
||||
id: request_id,
|
||||
error,
|
||||
}))
|
||||
}
|
||||
ExecServerOutboundMessage::Notification(notification) => Ok(JSONRPCMessage::Notification(
|
||||
serialize_notification(notification)?,
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn invalid_request(message: String) -> JSONRPCErrorError {
|
||||
JSONRPCErrorError {
|
||||
code: -32600,
|
||||
data: None,
|
||||
message,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn invalid_params(message: String) -> JSONRPCErrorError {
|
||||
JSONRPCErrorError {
|
||||
code: -32602,
|
||||
data: None,
|
||||
message,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn internal_error(message: String) -> JSONRPCErrorError {
|
||||
JSONRPCErrorError {
|
||||
code: -32603,
|
||||
data: None,
|
||||
message,
|
||||
}
|
||||
}
|
||||
|
||||
fn route_request(request: JSONRPCRequest) -> Result<RoutedExecServerMessage, String> {
|
||||
match request.method.as_str() {
|
||||
INITIALIZE_METHOD => Ok(parse_request_params(request, |request_id, params| {
|
||||
ExecServerRequest::Initialize { request_id, params }
|
||||
})),
|
||||
EXEC_METHOD => Ok(parse_request_params(request, |request_id, params| {
|
||||
ExecServerRequest::Exec { request_id, params }
|
||||
})),
|
||||
EXEC_READ_METHOD => Ok(parse_request_params(request, |request_id, params| {
|
||||
ExecServerRequest::Read { request_id, params }
|
||||
})),
|
||||
EXEC_WRITE_METHOD => Ok(parse_request_params(request, |request_id, params| {
|
||||
ExecServerRequest::Write { request_id, params }
|
||||
})),
|
||||
EXEC_TERMINATE_METHOD => Ok(parse_request_params(request, |request_id, params| {
|
||||
ExecServerRequest::Terminate { request_id, params }
|
||||
})),
|
||||
other => Ok(RoutedExecServerMessage::ImmediateOutbound(
|
||||
ExecServerOutboundMessage::Error {
|
||||
request_id: request.id,
|
||||
error: invalid_request(format!("unknown method: {other}")),
|
||||
},
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn route_notification(
|
||||
notification: JSONRPCNotification,
|
||||
) -> Result<RoutedExecServerMessage, String> {
|
||||
match notification.method.as_str() {
|
||||
INITIALIZED_METHOD => Ok(RoutedExecServerMessage::Inbound(
|
||||
ExecServerInboundMessage::Notification(ExecServerClientNotification::Initialized),
|
||||
)),
|
||||
other => Err(format!("unexpected notification method: {other}")),
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_request_params<P, F>(request: JSONRPCRequest, build: F) -> RoutedExecServerMessage
|
||||
where
|
||||
P: DeserializeOwned,
|
||||
F: FnOnce(RequestId, P) -> ExecServerRequest,
|
||||
{
|
||||
let request_id = request.id;
|
||||
match serde_json::from_value::<P>(request.params.unwrap_or(serde_json::Value::Null)) {
|
||||
Ok(params) => RoutedExecServerMessage::Inbound(ExecServerInboundMessage::Request(build(
|
||||
request_id, params,
|
||||
))),
|
||||
Err(err) => RoutedExecServerMessage::ImmediateOutbound(ExecServerOutboundMessage::Error {
|
||||
request_id,
|
||||
error: invalid_params(err.to_string()),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn serialize_response(
|
||||
response: ExecServerResponseMessage,
|
||||
) -> Result<serde_json::Value, serde_json::Error> {
|
||||
match response {
|
||||
ExecServerResponseMessage::Initialize(response) => serde_json::to_value(response),
|
||||
ExecServerResponseMessage::Exec(response) => serde_json::to_value(response),
|
||||
ExecServerResponseMessage::Read(response) => serde_json::to_value(response),
|
||||
ExecServerResponseMessage::Write(response) => serde_json::to_value(response),
|
||||
ExecServerResponseMessage::Terminate(response) => serde_json::to_value(response),
|
||||
}
|
||||
}
|
||||
|
||||
fn serialize_notification(
|
||||
notification: ExecServerServerNotification,
|
||||
) -> Result<JSONRPCNotification, serde_json::Error> {
|
||||
match notification {
|
||||
ExecServerServerNotification::OutputDelta(params) => Ok(JSONRPCNotification {
|
||||
method: EXEC_OUTPUT_DELTA_METHOD.to_string(),
|
||||
params: Some(serde_json::to_value(params)?),
|
||||
}),
|
||||
ExecServerServerNotification::Exited(params) => Ok(JSONRPCNotification {
|
||||
method: EXEC_EXITED_METHOD.to_string(),
|
||||
params: Some(serde_json::to_value(params)?),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
|
||||
use super::ExecServerClientNotification;
|
||||
use super::ExecServerInboundMessage;
|
||||
use super::ExecServerOutboundMessage;
|
||||
use super::ExecServerRequest;
|
||||
use super::ExecServerResponseMessage;
|
||||
use super::ExecServerServerNotification;
|
||||
use super::RoutedExecServerMessage;
|
||||
use super::encode_outbound_message;
|
||||
use super::route_jsonrpc_message;
|
||||
use crate::protocol::EXEC_EXITED_METHOD;
|
||||
use crate::protocol::EXEC_METHOD;
|
||||
use crate::protocol::ExecExitedNotification;
|
||||
use crate::protocol::ExecParams;
|
||||
use crate::protocol::ExecResponse;
|
||||
use crate::protocol::INITIALIZE_METHOD;
|
||||
use crate::protocol::INITIALIZED_METHOD;
|
||||
use crate::protocol::InitializeParams;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
|
||||
#[test]
|
||||
fn routes_initialize_requests_to_typed_variants() {
|
||||
let routed = route_jsonrpc_message(JSONRPCMessage::Request(JSONRPCRequest {
|
||||
id: RequestId::Integer(1),
|
||||
method: INITIALIZE_METHOD.to_string(),
|
||||
params: Some(json!({ "clientName": "test-client" })),
|
||||
trace: None,
|
||||
}))
|
||||
.expect("initialize request should route");
|
||||
|
||||
assert_eq!(
|
||||
routed,
|
||||
RoutedExecServerMessage::Inbound(ExecServerInboundMessage::Request(
|
||||
ExecServerRequest::Initialize {
|
||||
request_id: RequestId::Integer(1),
|
||||
params: InitializeParams {
|
||||
client_name: "test-client".to_string(),
|
||||
},
|
||||
},
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn malformed_exec_params_return_immediate_error_outbound() {
|
||||
let routed = route_jsonrpc_message(JSONRPCMessage::Request(JSONRPCRequest {
|
||||
id: RequestId::Integer(2),
|
||||
method: EXEC_METHOD.to_string(),
|
||||
params: Some(json!({ "processId": "proc-1" })),
|
||||
trace: None,
|
||||
}))
|
||||
.expect("exec request should route");
|
||||
|
||||
let RoutedExecServerMessage::ImmediateOutbound(ExecServerOutboundMessage::Error {
|
||||
request_id,
|
||||
error,
|
||||
}) = routed
|
||||
else {
|
||||
panic!("expected invalid-params error outbound");
|
||||
};
|
||||
assert_eq!(request_id, RequestId::Integer(2));
|
||||
assert_eq!(error.code, -32602);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn routes_initialized_notifications_to_typed_variants() {
|
||||
let routed = route_jsonrpc_message(JSONRPCMessage::Notification(JSONRPCNotification {
|
||||
method: INITIALIZED_METHOD.to_string(),
|
||||
params: Some(json!({})),
|
||||
}))
|
||||
.expect("initialized notification should route");
|
||||
|
||||
assert_eq!(
|
||||
routed,
|
||||
RoutedExecServerMessage::Inbound(ExecServerInboundMessage::Notification(
|
||||
ExecServerClientNotification::Initialized,
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serializes_typed_notifications_back_to_jsonrpc() {
|
||||
let message = encode_outbound_message(ExecServerOutboundMessage::Notification(
|
||||
ExecServerServerNotification::Exited(ExecExitedNotification {
|
||||
process_id: "proc-1".to_string(),
|
||||
exit_code: 0,
|
||||
}),
|
||||
))
|
||||
.expect("notification should serialize");
|
||||
|
||||
assert_eq!(
|
||||
message,
|
||||
JSONRPCMessage::Notification(JSONRPCNotification {
|
||||
method: EXEC_EXITED_METHOD.to_string(),
|
||||
params: Some(json!({
|
||||
"processId": "proc-1",
|
||||
"exitCode": 0,
|
||||
})),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serializes_typed_responses_back_to_jsonrpc() {
|
||||
let message = encode_outbound_message(ExecServerOutboundMessage::Response {
|
||||
request_id: RequestId::Integer(3),
|
||||
response: ExecServerResponseMessage::Exec(ExecResponse {
|
||||
process_id: "proc-1".to_string(),
|
||||
}),
|
||||
})
|
||||
.expect("response should serialize");
|
||||
|
||||
assert_eq!(
|
||||
message,
|
||||
JSONRPCMessage::Response(codex_app_server_protocol::JSONRPCResponse {
|
||||
id: RequestId::Integer(3),
|
||||
result: json!({
|
||||
"processId": "proc-1",
|
||||
}),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn routes_exec_requests_with_typed_params() {
|
||||
let cwd = std::env::current_dir().expect("cwd");
|
||||
let routed = route_jsonrpc_message(JSONRPCMessage::Request(JSONRPCRequest {
|
||||
id: RequestId::Integer(4),
|
||||
method: EXEC_METHOD.to_string(),
|
||||
params: Some(json!({
|
||||
"processId": "proc-1",
|
||||
"argv": ["bash", "-lc", "true"],
|
||||
"cwd": cwd,
|
||||
"env": {},
|
||||
"tty": true,
|
||||
"arg0": null,
|
||||
})),
|
||||
trace: None,
|
||||
}))
|
||||
.expect("exec request should route");
|
||||
|
||||
let RoutedExecServerMessage::Inbound(ExecServerInboundMessage::Request(
|
||||
ExecServerRequest::Exec { request_id, params },
|
||||
)) = routed
|
||||
else {
|
||||
panic!("expected typed exec request");
|
||||
};
|
||||
assert_eq!(request_id, RequestId::Integer(4));
|
||||
assert_eq!(
|
||||
params,
|
||||
ExecParams {
|
||||
process_id: "proc-1".to_string(),
|
||||
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
|
||||
cwd: std::env::current_dir().expect("cwd"),
|
||||
env: std::collections::HashMap::new(),
|
||||
tty: true,
|
||||
arg0: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unknown_request_methods_return_immediate_invalid_request_errors() {
|
||||
let routed = route_jsonrpc_message(JSONRPCMessage::Request(JSONRPCRequest {
|
||||
id: RequestId::Integer(5),
|
||||
method: "process/unknown".to_string(),
|
||||
params: Some(json!({})),
|
||||
trace: None,
|
||||
}))
|
||||
.expect("unknown request should still route");
|
||||
|
||||
assert_eq!(
|
||||
routed,
|
||||
RoutedExecServerMessage::ImmediateOutbound(ExecServerOutboundMessage::Error {
|
||||
request_id: RequestId::Integer(5),
|
||||
error: super::invalid_request("unknown method: process/unknown".to_string()),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unexpected_client_notifications_are_rejected() {
|
||||
let err = route_jsonrpc_message(JSONRPCMessage::Notification(JSONRPCNotification {
|
||||
method: "process/output".to_string(),
|
||||
params: Some(json!({})),
|
||||
}))
|
||||
.expect_err("unexpected client notification should fail");
|
||||
|
||||
assert_eq!(err, "unexpected notification method: process/output");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unexpected_client_responses_are_rejected() {
|
||||
let err = route_jsonrpc_message(JSONRPCMessage::Response(JSONRPCResponse {
|
||||
id: RequestId::Integer(6),
|
||||
result: json!({}),
|
||||
}))
|
||||
.expect_err("unexpected client response should fail");
|
||||
|
||||
assert_eq!(err, "unexpected client response for request id Integer(6)");
|
||||
}
|
||||
}
|
||||
@@ -200,69 +200,6 @@ async fn exec_server_client_connects_over_websocket() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn websocket_disconnect_terminates_processes_for_that_connection() -> anyhow::Result<()> {
|
||||
let mut env = std::collections::HashMap::new();
|
||||
if let Some(path) = std::env::var_os("PATH") {
|
||||
env.insert("PATH".to_string(), path.to_string_lossy().into_owned());
|
||||
}
|
||||
|
||||
let marker_path = std::env::temp_dir().join(format!(
|
||||
"codex-exec-server-disconnect-{}-{}",
|
||||
std::process::id(),
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)?
|
||||
.as_nanos()
|
||||
));
|
||||
let _ = std::fs::remove_file(&marker_path);
|
||||
|
||||
let binary = cargo_bin("codex-exec-server")?;
|
||||
let mut child = Command::new(binary);
|
||||
child.args(["--listen", "ws://127.0.0.1:0"]);
|
||||
child.stdin(Stdio::null());
|
||||
child.stdout(Stdio::null());
|
||||
child.stderr(Stdio::piped());
|
||||
let mut child = child.spawn()?;
|
||||
let stderr = child.stderr.take().expect("stderr");
|
||||
let mut stderr_lines = BufReader::new(stderr).lines();
|
||||
let websocket_url = read_websocket_url(&mut stderr_lines).await?;
|
||||
|
||||
{
|
||||
let client = ExecServerClient::connect_websocket(RemoteExecServerConnectArgs {
|
||||
websocket_url,
|
||||
client_name: "exec-server-test".to_string(),
|
||||
connect_timeout: Duration::from_secs(5),
|
||||
initialize_timeout: Duration::from_secs(5),
|
||||
})
|
||||
.await?;
|
||||
|
||||
let _response = client
|
||||
.exec(ExecParams {
|
||||
process_id: "proc-1".to_string(),
|
||||
argv: vec![
|
||||
"bash".to_string(),
|
||||
"-lc".to_string(),
|
||||
format!("sleep 2; printf disconnected > {}", marker_path.display()),
|
||||
],
|
||||
cwd: std::env::current_dir()?,
|
||||
env,
|
||||
tty: false,
|
||||
arg0: None,
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
assert!(
|
||||
!marker_path.exists(),
|
||||
"managed process should be terminated when the websocket client disconnects"
|
||||
);
|
||||
|
||||
child.start_kill()?;
|
||||
let _ = std::fs::remove_file(&marker_path);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn read_websocket_url<R>(lines: &mut tokio::io::Lines<BufReader<R>>) -> anyhow::Result<String>
|
||||
where
|
||||
R: tokio::io::AsyncRead + Unpin,
|
||||
|
||||
Reference in New Issue
Block a user