From f3ba2aa4f87009222a9be3117d46659ea46937e0 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Tue, 5 May 2026 13:36:02 -0700 Subject: [PATCH] Model retained JSON-RPC transport generically Co-authored-by: Codex --- codex-rs/exec-server/src/client_transport.rs | 2 +- codex-rs/exec-server/src/connection.rs | 102 +++++++++++-------- 2 files changed, 61 insertions(+), 43 deletions(-) diff --git a/codex-rs/exec-server/src/client_transport.rs b/codex-rs/exec-server/src/client_transport.rs index 7543d283b4..2f8f1b8a05 100644 --- a/codex-rs/exec-server/src/client_transport.rs +++ b/codex-rs/exec-server/src/client_transport.rs @@ -107,7 +107,7 @@ impl ExecServerClient { Self::connect( JsonRpcConnection::from_stdio(stdout, stdin, "exec-server stdio command".to_string()) - .with_stdio_child(child), + .with_child_process(child), args.into(), ) .await diff --git a/codex-rs/exec-server/src/connection.rs b/codex-rs/exec-server/src/connection.rs index f7832e5a34..07b3d7f4d5 100644 --- a/codex-rs/exec-server/src/connection.rs +++ b/codex-rs/exec-server/src/connection.rs @@ -24,24 +24,40 @@ pub(crate) enum JsonRpcConnectionEvent { Disconnected { reason: Option }, } -struct StdioTransport { - child: Child, +#[derive(Default)] +struct JsonRpcTransport { + child_process: Option, } -impl Drop for StdioTransport { +impl JsonRpcTransport { + fn with_child_process(child_process: Child) -> Self { + Self { + child_process: Some(child_process), + } + } +} + +impl Drop for JsonRpcTransport { fn drop(&mut self) { - if let Err(err) = self.child.start_kill() { + if let Some(child_process) = self.child_process.as_mut() + && let Err(err) = child_process.start_kill() + { debug!("failed to terminate exec-server stdio child: {err}"); } } } -pub(crate) struct JsonRpcConnection { - outgoing_tx: Option>, - incoming_rx: Option>, - disconnected_rx: Option>, +struct JsonRpcConnectionRuntime { + outgoing_tx: mpsc::Sender, + incoming_rx: mpsc::Receiver, + disconnected_rx: watch::Receiver, task_handles: Vec>, - _stdio_transport: Option, +} + +pub(crate) struct JsonRpcConnection { + runtime: Option, + #[allow(dead_code)] + transport: JsonRpcTransport, } impl JsonRpcConnection { @@ -128,11 +144,13 @@ impl JsonRpcConnection { }); Self { - outgoing_tx: Some(outgoing_tx), - incoming_rx: Some(incoming_rx), - disconnected_rx: Some(disconnected_rx), - task_handles: vec![reader_task, writer_task], - _stdio_transport: None, + runtime: Some(JsonRpcConnectionRuntime { + outgoing_tx, + incoming_rx, + disconnected_rx, + task_handles: vec![reader_task, writer_task], + }), + transport: JsonRpcTransport::default(), } } @@ -263,11 +281,13 @@ impl JsonRpcConnection { }); Self { - outgoing_tx: Some(outgoing_tx), - incoming_rx: Some(incoming_rx), - disconnected_rx: Some(disconnected_rx), - task_handles: vec![reader_task, writer_task], - _stdio_transport: None, + runtime: Some(JsonRpcConnectionRuntime { + outgoing_tx, + incoming_rx, + disconnected_rx, + task_handles: vec![reader_task, writer_task], + }), + transport: JsonRpcTransport::default(), } } @@ -279,22 +299,20 @@ impl JsonRpcConnection { watch::Receiver, Vec>, ) { - ( - self.outgoing_tx - .take() - .expect("JSON-RPC client runtime already taken"), - self.incoming_rx - .take() - .expect("JSON-RPC client runtime already taken"), - self.disconnected_rx - .take() - .expect("JSON-RPC client runtime already taken"), - std::mem::take(&mut self.task_handles), - ) + let JsonRpcConnectionRuntime { + outgoing_tx, + incoming_rx, + disconnected_rx, + task_handles, + } = self + .runtime + .take() + .expect("JSON-RPC client runtime already taken"); + (outgoing_tx, incoming_rx, disconnected_rx, task_handles) } - pub(crate) fn with_stdio_child(mut self, child: Child) -> Self { - self._stdio_transport = Some(StdioTransport { child }); + pub(crate) fn with_child_process(mut self, child_process: Child) -> Self { + self.transport = JsonRpcTransport::with_child_process(child_process); self } @@ -306,15 +324,15 @@ impl JsonRpcConnection { watch::Receiver, Vec>, ) { - ( - self.outgoing_tx - .expect("JSON-RPC connection parts already taken"), - self.incoming_rx - .expect("JSON-RPC connection parts already taken"), - self.disconnected_rx - .expect("JSON-RPC connection parts already taken"), - self.task_handles, - ) + let JsonRpcConnectionRuntime { + outgoing_tx, + incoming_rx, + disconnected_rx, + task_handles, + } = self + .runtime + .expect("JSON-RPC connection parts already taken"); + (outgoing_tx, incoming_rx, disconnected_rx, task_handles) } }