From 90804bb2eb03b096470641c55f823142de7613bc Mon Sep 17 00:00:00 2001 From: starr-openai Date: Mon, 18 May 2026 16:14:53 -0700 Subject: [PATCH] Remove manual websocket pong handling --- codex-rs/exec-server/src/connection.rs | 57 +++--------------- codex-rs/exec-server/src/relay.rs | 82 +------------------------- 2 files changed, 10 insertions(+), 129 deletions(-) diff --git a/codex-rs/exec-server/src/connection.rs b/codex-rs/exec-server/src/connection.rs index 750c9e0995..b211c504ac 100644 --- a/codex-rs/exec-server/src/connection.rs +++ b/codex-rs/exec-server/src/connection.rs @@ -401,19 +401,6 @@ impl JsonRpcConnection { break; } } - Ok(JsonRpcWebSocketFrame::Ping(payload)) => { - if let Err(err) = websocket.send(M::pong(payload)).await { - send_disconnected( - &incoming_tx, - &disconnected_tx, - Some(format!( - "failed to write websocket pong to {connection_label}: {err}" - )), - ) - .await; - break; - } - } Ok(JsonRpcWebSocketFrame::Close) => { send_disconnected( &incoming_tx, @@ -477,7 +464,6 @@ impl JsonRpcConnection { enum JsonRpcWebSocketFrame { Message(JSONRPCMessage), - Ping(bytes::Bytes), Close, Ignore, } @@ -486,7 +472,6 @@ trait JsonRpcWebSocketMessage: Send + 'static { fn parse_jsonrpc_frame(self) -> Result; fn from_text(text: String) -> Self; fn ping() -> Self; - fn pong(payload: bytes::Bytes) -> Self; } impl JsonRpcWebSocketMessage for Message { @@ -499,8 +484,9 @@ impl JsonRpcWebSocketMessage for Message { serde_json::from_slice(bytes.as_ref()).map(JsonRpcWebSocketFrame::Message) } Message::Close(_) => Ok(JsonRpcWebSocketFrame::Close), - Message::Ping(payload) => Ok(JsonRpcWebSocketFrame::Ping(payload)), - Message::Pong(_) | Message::Frame(_) => Ok(JsonRpcWebSocketFrame::Ignore), + Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => { + Ok(JsonRpcWebSocketFrame::Ignore) + } } } @@ -511,10 +497,6 @@ impl JsonRpcWebSocketMessage for Message { fn ping() -> Self { Self::Ping(Vec::new().into()) } - - fn pong(payload: bytes::Bytes) -> Self { - Self::Pong(payload) - } } impl JsonRpcWebSocketMessage for AxumWebSocketMessage { @@ -527,8 +509,9 @@ impl JsonRpcWebSocketMessage for AxumWebSocketMessage { serde_json::from_slice(bytes.as_ref()).map(JsonRpcWebSocketFrame::Message) } AxumWebSocketMessage::Close(_) => Ok(JsonRpcWebSocketFrame::Close), - AxumWebSocketMessage::Ping(payload) => Ok(JsonRpcWebSocketFrame::Ping(payload)), - AxumWebSocketMessage::Pong(_) => Ok(JsonRpcWebSocketFrame::Ignore), + AxumWebSocketMessage::Ping(_) | AxumWebSocketMessage::Pong(_) => { + Ok(JsonRpcWebSocketFrame::Ignore) + } } } @@ -539,10 +522,6 @@ impl JsonRpcWebSocketMessage for AxumWebSocketMessage { fn ping() -> Self { Self::Ping(Vec::new().into()) } - - fn pong(payload: bytes::Bytes) -> Self { - Self::Pong(payload) - } } async fn send_disconnected( @@ -628,23 +607,6 @@ mod tests { use super::*; - #[tokio::test] - async fn websocket_connection_pongs_server_ping() -> anyhow::Result<()> { - let (client_websocket, mut server_websocket) = websocket_pair().await?; - let connection = JsonRpcConnection::from_websocket(client_websocket, "test".into()); - - server_websocket - .send(Message::Ping(b"check".to_vec().into())) - .await?; - let message = timeout(Duration::from_secs(1), server_websocket.next()) - .await? - .expect("websocket should stay open")?; - assert_eq!(message, Message::Pong(b"check".to_vec().into())); - - drop(connection); - Ok(()) - } - #[tokio::test] async fn websocket_connection_sends_configured_ping() -> anyhow::Result<()> { let (client_websocket, mut server_websocket) = websocket_pair().await?; @@ -733,7 +695,7 @@ mod tests { connection.outgoing_tx.send(message.clone()).await?; control.wait_for_blocked_write().await?; - control.send_inbound(Message::Ping(b"check".to_vec().into()))?; + control.send_inbound(Message::Pong(b"check".to_vec().into()))?; assert!( timeout(Duration::from_millis(50), connection.incoming_rx.recv()) .await @@ -745,11 +707,6 @@ mod tests { timeout(Duration::from_secs(1), outbound_rx.next()).await?, Some(Message::Text(text)) if serde_json::from_str::(&text)? == message )); - assert!(matches!( - timeout(Duration::from_secs(1), outbound_rx.next()).await?, - Some(Message::Pong(payload)) if payload.as_ref() == b"check" - )); - drop(connection); Ok(()) } diff --git a/codex-rs/exec-server/src/relay.rs b/codex-rs/exec-server/src/relay.rs index 4dd607a305..da7a7422bb 100644 --- a/codex-rs/exec-server/src/relay.rs +++ b/codex-rs/exec-server/src/relay.rs @@ -268,12 +268,6 @@ where | RelayFrameBodyKind::Heartbeat => {} } } - Some(Ok(Message::Ping(payload))) => { - if websocket.send(Message::Pong(payload)).await.is_err() { - let _ = disconnected_tx.send(true); - break; - } - } Some(Ok(Message::Close(_))) | None => { let _ = disconnected_tx.send(true); let _ = incoming_tx @@ -281,7 +275,7 @@ where .await; break; } - Some(Ok(Message::Pong(_) | Message::Frame(_))) => {} + Some(Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => {} Some(Ok(Message::Text(_))) => { let _ = incoming_tx .send(JsonRpcConnectionEvent::MalformedMessage { @@ -359,14 +353,8 @@ pub(crate) async fn run_multiplexed_executor( } } } - Some(Ok(Message::Ping(payload))) => { - if websocket.send(Message::Pong(payload)).await.is_err() { - break; - } - continue; - } Some(Ok(Message::Close(_))) | None => break, - Some(Ok(Message::Pong(_) | Message::Frame(_))) => continue, + Some(Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => continue, Some(Ok(Message::Text(_))) => { warn!("dropping non-binary relay message frame from harness"); continue; @@ -517,46 +505,6 @@ mod tests { use super::*; - fn test_runtime_paths() -> anyhow::Result { - crate::ExecServerRuntimePaths::new( - std::env::current_exe()?, - /*codex_linux_sandbox_exe*/ None, - ) - .map_err(anyhow::Error::from) - } - - #[tokio::test] - async fn multiplexed_executor_pongs_server_ping() -> anyhow::Result<()> { - let (client_websocket, mut server_websocket) = websocket_pair().await?; - let executor_task = tokio::spawn(run_multiplexed_executor( - client_websocket, - ConnectionProcessor::new(test_runtime_paths()?), - )); - - server_websocket - .send(Message::Ping(b"check".to_vec().into())) - .await?; - read_pong(&mut server_websocket).await?; - - executor_task.abort(); - let _ = executor_task.await; - Ok(()) - } - - #[tokio::test] - async fn harness_connection_pongs_server_ping() -> anyhow::Result<()> { - let (client_websocket, mut server_websocket) = websocket_pair().await?; - let connection = harness_connection_from_websocket(client_websocket, "test".to_string()); - - server_websocket - .send(Message::Ping(b"check".to_vec().into())) - .await?; - read_pong(&mut server_websocket).await?; - - drop(connection); - Ok(()) - } - #[tokio::test] async fn harness_connection_receives_relay_data() -> anyhow::Result<()> { let (client_websocket, mut server_websocket) = websocket_pair().await?; @@ -637,7 +585,7 @@ mod tests { control.set_write_blocked(); connection.outgoing_tx.send(message.clone()).await?; control.wait_for_blocked_write().await?; - control.send_inbound(Message::Ping(b"check".to_vec().into()))?; + control.send_inbound(Message::Pong(b"check".to_vec().into()))?; assert!( timeout(Duration::from_millis(50), connection.incoming_rx.recv()) .await @@ -654,11 +602,6 @@ mod tests { let frame = decode_relay_message_frame(data_payload.as_ref())?; assert_eq!(frame.stream_id, stream_id); assert_eq!(frame.into_jsonrpc_message()?, message); - assert!(matches!( - timeout(Duration::from_secs(1), outbound_rx.next()).await?, - Some(Message::Pong(payload)) if payload.as_ref() == b"check" - )); - drop(connection); Ok(()) } @@ -701,25 +644,6 @@ mod tests { }) } - async fn read_pong( - websocket: &mut WebSocketStream, - ) -> anyhow::Result<()> { - loop { - let Some(message) = timeout(Duration::from_secs(1), websocket.next()).await? else { - anyhow::bail!("websocket closed before pong"); - }; - match message? { - Message::Pong(payload) if payload.as_ref() == b"check" => return Ok(()), - Message::Binary(_) - | Message::Text(_) - | Message::Ping(_) - | Message::Pong(_) - | Message::Frame(_) => {} - Message::Close(_) => anyhow::bail!("websocket closed before pong"), - } - } - } - struct ControlledWebSocket { inbound_rx: futures_mpsc::UnboundedReceiver>, outbound_tx: futures_mpsc::UnboundedSender,