From 74e96987b8ea486987bf27c40a7c8c4e0d12adf7 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Mon, 4 May 2026 12:52:40 -0700 Subject: [PATCH] Simplify exec-server transport internals Keep environment transport connection policy on ExecServerClient instead of the transport enum, and replace the JSON-RPC connection tuple alias with named connection parts. Co-authored-by: Codex --- codex-rs/exec-server/src/client.rs | 2 +- codex-rs/exec-server/src/client_transport.rs | 19 ++++++------- codex-rs/exec-server/src/connection.rs | 29 ++++++++++---------- codex-rs/exec-server/src/rpc.rs | 8 ++++-- codex-rs/exec-server/src/server/processor.rs | 13 ++++----- 5 files changed, 37 insertions(+), 34 deletions(-) diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index c04c57eb25..54a67fea3c 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -207,7 +207,7 @@ impl LazyRemoteExecServerClient { self.client .get_or_try_init(|| { let transport = self.transport.clone(); - async move { transport.connect_for_environment().await } + async move { ExecServerClient::connect_for_environment(transport).await } }) .await .cloned() diff --git a/codex-rs/exec-server/src/client_transport.rs b/codex-rs/exec-server/src/client_transport.rs index d877aae3df..00ecc2bd2e 100644 --- a/codex-rs/exec-server/src/client_transport.rs +++ b/codex-rs/exec-server/src/client_transport.rs @@ -21,7 +21,6 @@ use tracing::warn; use crate::ExecServerClient; use crate::ExecServerError; -use crate::client_api::ExecServerTransport; use crate::client_api::RemoteExecServerConnectArgs; use crate::client_api::StdioExecServerConnectArgs; use crate::connection::JsonRpcConnection; @@ -32,11 +31,13 @@ const ENVIRONMENT_INITIALIZE_TIMEOUT: Duration = Duration::from_secs(5); #[cfg(unix)] const STDIO_CHILD_TERM_GRACE_PERIOD: Duration = Duration::from_millis(500); -impl ExecServerTransport { - pub(crate) async fn connect_for_environment(self) -> Result { - match self { - ExecServerTransport::WebSocketUrl(websocket_url) => { - ExecServerClient::connect_websocket(RemoteExecServerConnectArgs { +impl ExecServerClient { + pub(crate) async fn connect_for_environment( + transport: crate::client_api::ExecServerTransport, + ) -> Result { + match transport { + crate::client_api::ExecServerTransport::WebSocketUrl(websocket_url) => { + Self::connect_websocket(RemoteExecServerConnectArgs { websocket_url, client_name: ENVIRONMENT_CLIENT_NAME.to_string(), connect_timeout: ENVIRONMENT_CONNECT_TIMEOUT, @@ -45,8 +46,8 @@ impl ExecServerTransport { }) .await } - ExecServerTransport::StdioShellCommand(shell_command) => { - ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs { + crate::client_api::ExecServerTransport::StdioShellCommand(shell_command) => { + Self::connect_stdio_command(StdioExecServerConnectArgs { shell_command, client_name: ENVIRONMENT_CLIENT_NAME.to_string(), initialize_timeout: ENVIRONMENT_INITIALIZE_TIMEOUT, @@ -56,9 +57,7 @@ impl ExecServerTransport { } } } -} -impl ExecServerClient { pub async fn connect_websocket( args: RemoteExecServerConnectArgs, ) -> Result { diff --git a/codex-rs/exec-server/src/connection.rs b/codex-rs/exec-server/src/connection.rs index 0ec4c6bc5f..9d06f0841d 100644 --- a/codex-rs/exec-server/src/connection.rs +++ b/codex-rs/exec-server/src/connection.rs @@ -16,13 +16,14 @@ use tokio::io::BufWriter; pub(crate) const CHANNEL_CAPACITY: usize = 128; pub(crate) type JsonRpcTransportLifetime = Box; -pub(crate) type JsonRpcConnectionParts = ( - mpsc::Sender, - mpsc::Receiver, - watch::Receiver, - Vec>, - Option, -); + +pub(crate) struct JsonRpcConnectionParts { + pub(crate) outgoing_tx: mpsc::Sender, + pub(crate) incoming_rx: mpsc::Receiver, + pub(crate) disconnected_rx: watch::Receiver, + pub(crate) task_handles: Vec>, + pub(crate) transport_lifetime: Option, +} #[derive(Debug)] pub(crate) enum JsonRpcConnectionEvent { @@ -272,13 +273,13 @@ impl JsonRpcConnection { } pub(crate) fn into_parts(self) -> JsonRpcConnectionParts { - ( - self.outgoing_tx, - self.incoming_rx, - self.disconnected_rx, - self.task_handles, - self.transport_lifetime, - ) + JsonRpcConnectionParts { + outgoing_tx: self.outgoing_tx, + incoming_rx: self.incoming_rx, + disconnected_rx: self.disconnected_rx, + task_handles: self.task_handles, + transport_lifetime: self.transport_lifetime, + } } } diff --git a/codex-rs/exec-server/src/rpc.rs b/codex-rs/exec-server/src/rpc.rs index d9d8fbbf72..3b155c08a2 100644 --- a/codex-rs/exec-server/src/rpc.rs +++ b/codex-rs/exec-server/src/rpc.rs @@ -243,8 +243,12 @@ struct TransportLifetime { impl RpcClient { pub(crate) fn new(connection: JsonRpcConnection) -> (Self, mpsc::Receiver) { - let (write_tx, mut incoming_rx, disconnected_rx, transport_tasks, transport_lifetime) = - connection.into_parts(); + let connection_parts = connection.into_parts(); + let write_tx = connection_parts.outgoing_tx; + let mut incoming_rx = connection_parts.incoming_rx; + let disconnected_rx = connection_parts.disconnected_rx; + let transport_tasks = connection_parts.task_handles; + let transport_lifetime = connection_parts.transport_lifetime; let pending = Arc::new(Mutex::new(HashMap::::new())); let (event_tx, event_rx) = mpsc::channel(128); diff --git a/codex-rs/exec-server/src/server/processor.rs b/codex-rs/exec-server/src/server/processor.rs index b7e1a03bd5..11472dc626 100644 --- a/codex-rs/exec-server/src/server/processor.rs +++ b/codex-rs/exec-server/src/server/processor.rs @@ -47,13 +47,12 @@ async fn run_connection( runtime_paths: ExecServerRuntimePaths, ) { let router = Arc::new(build_router()); - let ( - json_outgoing_tx, - mut incoming_rx, - mut disconnected_rx, - connection_tasks, - _transport_lifetime, - ) = connection.into_parts(); + let connection_parts = connection.into_parts(); + let json_outgoing_tx = connection_parts.outgoing_tx; + let mut incoming_rx = connection_parts.incoming_rx; + let mut disconnected_rx = connection_parts.disconnected_rx; + let connection_tasks = connection_parts.task_handles; + let _transport_lifetime = connection_parts.transport_lifetime; let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); let notifications = RpcNotificationSender::new(outgoing_tx.clone());