From d94300d782c45ea66aa95afacf789f0b9779c03a Mon Sep 17 00:00:00 2001 From: starr-openai Date: Mon, 18 May 2026 10:42:43 -0700 Subject: [PATCH] Preserve exec-server websocket keepalive ownership --- codex-rs/exec-server/src/connection.rs | 4 ++-- codex-rs/exec-server/src/relay.rs | 23 +++++++++++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/codex-rs/exec-server/src/connection.rs b/codex-rs/exec-server/src/connection.rs index ca1b70b85d..d1e2ad7720 100644 --- a/codex-rs/exec-server/src/connection.rs +++ b/codex-rs/exec-server/src/connection.rs @@ -323,11 +323,11 @@ impl JsonRpcConnection { where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - Self::from_websocket_stream(stream, connection_label, /*ping_interval*/ None) + Self::from_websocket_stream(stream, connection_label, Some(WEBSOCKET_KEEPALIVE_INTERVAL)) } pub(crate) fn from_axum_websocket(stream: AxumWebSocket, connection_label: String) -> Self { - Self::from_websocket_stream(stream, connection_label, Some(WEBSOCKET_KEEPALIVE_INTERVAL)) + Self::from_websocket_stream(stream, connection_label, /*ping_interval*/ None) } fn from_websocket_stream( diff --git a/codex-rs/exec-server/src/relay.rs b/codex-rs/exec-server/src/relay.rs index 6a6fef9d62..7a676e3469 100644 --- a/codex-rs/exec-server/src/relay.rs +++ b/codex-rs/exec-server/src/relay.rs @@ -19,6 +19,7 @@ use crate::connection::CHANNEL_CAPACITY; use crate::connection::JsonRpcConnection; use crate::connection::JsonRpcConnectionEvent; use crate::connection::JsonRpcTransport; +use crate::connection::WEBSOCKET_KEEPALIVE_INTERVAL; use crate::relay_proto::RelayData; use crate::relay_proto::RelayMessageFrame; use crate::relay_proto::RelayResume; @@ -166,6 +167,11 @@ where } let mut next_seq = 0u32; + let mut keepalive = tokio::time::interval_at( + tokio::time::Instant::now() + WEBSOCKET_KEEPALIVE_INTERVAL, + WEBSOCKET_KEEPALIVE_INTERVAL, + ); + keepalive.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { tokio::select! { maybe_message = outgoing_rx.recv() => { @@ -190,6 +196,12 @@ where break; } } + _ = keepalive.tick() => { + if websocket.send(Message::Ping(Vec::new().into())).await.is_err() { + let _ = disconnected_tx.send(true); + break; + } + } incoming_message = websocket.next() => { match incoming_message { Some(Ok(Message::Binary(payload))) => { @@ -312,6 +324,11 @@ pub(crate) async fn run_multiplexed_executor( mpsc::channel::>(CHANNEL_CAPACITY); let mut streams: HashMap = HashMap::new(); + let mut keepalive = tokio::time::interval_at( + tokio::time::Instant::now() + WEBSOCKET_KEEPALIVE_INTERVAL, + WEBSOCKET_KEEPALIVE_INTERVAL, + ); + keepalive.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { let frame = tokio::select! { maybe_encoded = physical_outgoing_rx.recv() => { @@ -323,6 +340,12 @@ pub(crate) async fn run_multiplexed_executor( } continue; } + _ = keepalive.tick() => { + if websocket.send(Message::Ping(Vec::new().into())).await.is_err() { + break; + } + continue; + } incoming_message = websocket.next() => match incoming_message { Some(Ok(Message::Binary(payload))) => { match decode_relay_message_frame(payload.as_ref()) {