From 6c4a9ad19288181a2f0bf82fe4ffbe84bda2f662 Mon Sep 17 00:00:00 2001 From: Ruslan Nigmatullin Date: Mon, 11 May 2026 22:30:19 +0000 Subject: [PATCH] [exec-server] remove reverted HTTP upgrade residue --- codex-rs/exec-server/src/connection.rs | 136 +++++++------------------ 1 file changed, 36 insertions(+), 100 deletions(-) diff --git a/codex-rs/exec-server/src/connection.rs b/codex-rs/exec-server/src/connection.rs index 104d09f57d..cd0e72297f 100644 --- a/codex-rs/exec-server/src/connection.rs +++ b/codex-rs/exec-server/src/connection.rs @@ -5,12 +5,8 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Duration; -use axum::extract::ws::Message as AxumWebSocketMessage; -use axum::extract::ws::WebSocket as AxumWebSocket; use codex_app_server_protocol::JSONRPCMessage; -use futures::Sink; use futures::SinkExt; -use futures::Stream; use futures::StreamExt; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; @@ -318,30 +314,11 @@ impl JsonRpcConnection { pub(crate) fn from_websocket(stream: WebSocketStream, connection_label: String) -> Self where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, - { - let (websocket_writer, websocket_reader) = stream.split(); - Self::from_websocket_parts(websocket_writer, websocket_reader, connection_label) - } - - pub(crate) fn from_axum_websocket(stream: AxumWebSocket, connection_label: String) -> Self { - let (websocket_writer, websocket_reader) = stream.split(); - Self::from_websocket_parts(websocket_writer, websocket_reader, connection_label) - } - - fn from_websocket_parts( - mut websocket_writer: W, - mut websocket_reader: R, - connection_label: String, - ) -> Self - where - W: Sink + Unpin + Send + 'static, - R: Stream> + Unpin + Send + 'static, - M: JsonRpcWebSocketMessage, - E: std::fmt::Display + Send + 'static, { let (outgoing_tx, mut outgoing_rx) = mpsc::channel(CHANNEL_CAPACITY); let (incoming_tx, incoming_rx) = mpsc::channel(CHANNEL_CAPACITY); let (disconnected_tx, disconnected_rx) = watch::channel(false); + let (mut websocket_writer, mut websocket_reader) = stream.split(); let reader_label = connection_label.clone(); let incoming_tx_for_reader = incoming_tx.clone(); @@ -349,36 +326,41 @@ impl JsonRpcConnection { let reader_task = tokio::spawn(async move { loop { match websocket_reader.next().await { - Some(Ok(message)) => match message.parse_jsonrpc_frame() { - Ok(JsonRpcWebSocketFrame::Message(message)) => { - if incoming_tx_for_reader - .send(JsonRpcConnectionEvent::Message(message)) - .await - .is_err() - { - break; + Some(Ok(Message::Text(text))) => { + match serde_json::from_str::(text.as_ref()) { + Ok(message) => { + if incoming_tx_for_reader + .send(JsonRpcConnectionEvent::Message(message)) + .await + .is_err() + { + break; + } + } + Err(err) => { + send_malformed_message( + &incoming_tx_for_reader, + Some(format!( + "failed to parse websocket JSON-RPC message from {reader_label}: {err}" + )), + ) + .await; } } - Err(err) => { - send_malformed_message( - &incoming_tx_for_reader, - Some(format!( - "failed to parse websocket JSON-RPC message from {reader_label}: {err}" - )), - ) - .await; - } - Ok(JsonRpcWebSocketFrame::Close) => { - send_disconnected( - &incoming_tx_for_reader, - &disconnected_tx_for_reader, - /*reason*/ None, - ) - .await; - break; - } - Ok(JsonRpcWebSocketFrame::Ignore) => {} - }, + } + Some(Ok(Message::Close(_))) => { + send_disconnected( + &incoming_tx_for_reader, + &disconnected_tx_for_reader, + /*reason*/ None, + ) + .await; + break; + } + Some(Ok(Message::Binary(_))) + | Some(Ok(Message::Ping(_))) + | Some(Ok(Message::Pong(_))) + | Some(Ok(Message::Frame(_))) => {} Some(Err(err)) => { send_disconnected( &incoming_tx_for_reader, @@ -407,7 +389,8 @@ impl JsonRpcConnection { while let Some(message) = outgoing_rx.recv().await { match serialize_jsonrpc_message(&message) { Ok(encoded) => { - if let Err(err) = websocket_writer.send(M::from_text(encoded)).await { + if let Err(err) = websocket_writer.send(Message::Text(encoded.into())).await + { send_disconnected( &incoming_tx, &disconnected_tx, @@ -449,53 +432,6 @@ impl JsonRpcConnection { } } -enum JsonRpcWebSocketFrame { - Message(JSONRPCMessage), - Close, - Ignore, -} - -trait JsonRpcWebSocketMessage: Send + 'static { - fn parse_jsonrpc_frame(self) -> Result; - fn from_text(text: String) -> Self; -} - -impl JsonRpcWebSocketMessage for Message { - fn parse_jsonrpc_frame(self) -> Result { - match self { - Message::Text(text) => { - serde_json::from_str(text.as_ref()).map(JsonRpcWebSocketFrame::Message) - } - Message::Close(_) => Ok(JsonRpcWebSocketFrame::Close), - Message::Binary(_) | Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => { - Ok(JsonRpcWebSocketFrame::Ignore) - } - } - } - - fn from_text(text: String) -> Self { - Self::Text(text.into()) - } -} - -impl JsonRpcWebSocketMessage for AxumWebSocketMessage { - fn parse_jsonrpc_frame(self) -> Result { - match self { - AxumWebSocketMessage::Text(text) => { - serde_json::from_str(text.as_ref()).map(JsonRpcWebSocketFrame::Message) - } - AxumWebSocketMessage::Close(_) => Ok(JsonRpcWebSocketFrame::Close), - AxumWebSocketMessage::Binary(_) - | AxumWebSocketMessage::Ping(_) - | AxumWebSocketMessage::Pong(_) => Ok(JsonRpcWebSocketFrame::Ignore), - } - } - - fn from_text(text: String) -> Self { - Self::Text(text.into()) - } -} - async fn send_disconnected( incoming_tx: &mpsc::Sender, disconnected_tx: &watch::Sender,