[exec-server] remove reverted HTTP upgrade residue

This commit is contained in:
Ruslan Nigmatullin
2026-05-11 22:30:19 +00:00
parent db05316160
commit 6c4a9ad192

View File

@@ -5,12 +5,8 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use axum::extract::ws::Message as AxumWebSocketMessage;
use axum::extract::ws::WebSocket as AxumWebSocket;
use codex_app_server_protocol::JSONRPCMessage;
use futures::Sink;
use futures::SinkExt;
use futures::Stream;
use futures::StreamExt;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
@@ -318,30 +314,11 @@ impl JsonRpcConnection {
pub(crate) fn from_websocket<S>(stream: WebSocketStream<S>, connection_label: String) -> Self
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let (websocket_writer, websocket_reader) = stream.split();
Self::from_websocket_parts(websocket_writer, websocket_reader, connection_label)
}
pub(crate) fn from_axum_websocket(stream: AxumWebSocket, connection_label: String) -> Self {
let (websocket_writer, websocket_reader) = stream.split();
Self::from_websocket_parts(websocket_writer, websocket_reader, connection_label)
}
fn from_websocket_parts<W, R, M, E>(
mut websocket_writer: W,
mut websocket_reader: R,
connection_label: String,
) -> Self
where
W: Sink<M, Error = E> + Unpin + Send + 'static,
R: Stream<Item = Result<M, E>> + Unpin + Send + 'static,
M: JsonRpcWebSocketMessage,
E: std::fmt::Display + Send + 'static,
{
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (incoming_tx, incoming_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (disconnected_tx, disconnected_rx) = watch::channel(false);
let (mut websocket_writer, mut websocket_reader) = stream.split();
let reader_label = connection_label.clone();
let incoming_tx_for_reader = incoming_tx.clone();
@@ -349,36 +326,41 @@ impl JsonRpcConnection {
let reader_task = tokio::spawn(async move {
loop {
match websocket_reader.next().await {
Some(Ok(message)) => match message.parse_jsonrpc_frame() {
Ok(JsonRpcWebSocketFrame::Message(message)) => {
if incoming_tx_for_reader
.send(JsonRpcConnectionEvent::Message(message))
.await
.is_err()
{
break;
Some(Ok(Message::Text(text))) => {
match serde_json::from_str::<JSONRPCMessage>(text.as_ref()) {
Ok(message) => {
if incoming_tx_for_reader
.send(JsonRpcConnectionEvent::Message(message))
.await
.is_err()
{
break;
}
}
Err(err) => {
send_malformed_message(
&incoming_tx_for_reader,
Some(format!(
"failed to parse websocket JSON-RPC message from {reader_label}: {err}"
)),
)
.await;
}
}
Err(err) => {
send_malformed_message(
&incoming_tx_for_reader,
Some(format!(
"failed to parse websocket JSON-RPC message from {reader_label}: {err}"
)),
)
.await;
}
Ok(JsonRpcWebSocketFrame::Close) => {
send_disconnected(
&incoming_tx_for_reader,
&disconnected_tx_for_reader,
/*reason*/ None,
)
.await;
break;
}
Ok(JsonRpcWebSocketFrame::Ignore) => {}
},
}
Some(Ok(Message::Close(_))) => {
send_disconnected(
&incoming_tx_for_reader,
&disconnected_tx_for_reader,
/*reason*/ None,
)
.await;
break;
}
Some(Ok(Message::Binary(_)))
| Some(Ok(Message::Ping(_)))
| Some(Ok(Message::Pong(_)))
| Some(Ok(Message::Frame(_))) => {}
Some(Err(err)) => {
send_disconnected(
&incoming_tx_for_reader,
@@ -407,7 +389,8 @@ impl JsonRpcConnection {
while let Some(message) = outgoing_rx.recv().await {
match serialize_jsonrpc_message(&message) {
Ok(encoded) => {
if let Err(err) = websocket_writer.send(M::from_text(encoded)).await {
if let Err(err) = websocket_writer.send(Message::Text(encoded.into())).await
{
send_disconnected(
&incoming_tx,
&disconnected_tx,
@@ -449,53 +432,6 @@ impl JsonRpcConnection {
}
}
enum JsonRpcWebSocketFrame {
Message(JSONRPCMessage),
Close,
Ignore,
}
trait JsonRpcWebSocketMessage: Send + 'static {
fn parse_jsonrpc_frame(self) -> Result<JsonRpcWebSocketFrame, serde_json::Error>;
fn from_text(text: String) -> Self;
}
impl JsonRpcWebSocketMessage for Message {
fn parse_jsonrpc_frame(self) -> Result<JsonRpcWebSocketFrame, serde_json::Error> {
match self {
Message::Text(text) => {
serde_json::from_str(text.as_ref()).map(JsonRpcWebSocketFrame::Message)
}
Message::Close(_) => Ok(JsonRpcWebSocketFrame::Close),
Message::Binary(_) | Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => {
Ok(JsonRpcWebSocketFrame::Ignore)
}
}
}
fn from_text(text: String) -> Self {
Self::Text(text.into())
}
}
impl JsonRpcWebSocketMessage for AxumWebSocketMessage {
fn parse_jsonrpc_frame(self) -> Result<JsonRpcWebSocketFrame, serde_json::Error> {
match self {
AxumWebSocketMessage::Text(text) => {
serde_json::from_str(text.as_ref()).map(JsonRpcWebSocketFrame::Message)
}
AxumWebSocketMessage::Close(_) => Ok(JsonRpcWebSocketFrame::Close),
AxumWebSocketMessage::Binary(_)
| AxumWebSocketMessage::Ping(_)
| AxumWebSocketMessage::Pong(_) => Ok(JsonRpcWebSocketFrame::Ignore),
}
}
fn from_text(text: String) -> Self {
Self::Text(text.into())
}
}
async fn send_disconnected(
incoming_tx: &mpsc::Sender<JsonRpcConnectionEvent>,
disconnected_tx: &watch::Sender<bool>,