From 686953b63befb3231cb0e472cea51fd211e05584 Mon Sep 17 00:00:00 2001 From: Ruslan Nigmatullin Date: Mon, 11 May 2026 22:08:49 +0000 Subject: [PATCH] [exec-server] preserve raw websocket listen mode --- codex-rs/exec-server/src/server/transport.rs | 67 +++++++------------- codex-rs/exec-server/tests/health.rs | 21 ------ 2 files changed, 23 insertions(+), 65 deletions(-) delete mode 100644 codex-rs/exec-server/tests/health.rs diff --git a/codex-rs/exec-server/src/server/transport.rs b/codex-rs/exec-server/src/server/transport.rs index 92b2c77aaa..9deafa9b24 100644 --- a/codex-rs/exec-server/src/server/transport.rs +++ b/codex-rs/exec-server/src/server/transport.rs @@ -1,18 +1,12 @@ -use axum::Router; -use axum::extract::ConnectInfo; -use axum::extract::State; -use axum::extract::ws::WebSocketUpgrade; -use axum::http::StatusCode; -use axum::response::IntoResponse; -use axum::routing::any; -use axum::routing::get; use std::io::Write as _; use std::net::SocketAddr; use tokio::io; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::net::TcpListener; +use tokio_tungstenite::accept_async; use tracing::info; +use tracing::warn; use crate::ExecServerRuntimePaths; use crate::connection::JsonRpcConnection; @@ -120,42 +114,27 @@ async fn run_websocket_listener( println!("ws://{local_addr}"); std::io::stdout().flush()?; - let router = Router::new() - .route("/", any(websocket_upgrade_handler)) - .route("/readyz", get(readiness_handler)) - .with_state(ExecServerWebSocketState { processor }); - axum::serve( - listener, - router.into_make_service_with_connect_info::(), - ) - .await?; - Ok(()) -} - -#[derive(Clone)] -struct ExecServerWebSocketState { - processor: ConnectionProcessor, -} - -async fn readiness_handler() -> StatusCode { - StatusCode::OK -} - -async fn websocket_upgrade_handler( - websocket: WebSocketUpgrade, - ConnectInfo(peer_addr): ConnectInfo, - State(state): State, -) -> impl IntoResponse { - info!(%peer_addr, "exec-server websocket client connected"); - websocket.on_upgrade(move |stream| async move { - state - .processor - .run_connection(JsonRpcConnection::from_axum_websocket( - stream, - format!("exec-server websocket {peer_addr}"), - )) - .await; - }) + loop { + let (stream, peer_addr) = listener.accept().await?; + let processor = processor.clone(); + tokio::spawn(async move { + match accept_async(stream).await { + Ok(websocket) => { + processor + .run_connection(JsonRpcConnection::from_websocket( + websocket, + format!("exec-server websocket {peer_addr}"), + )) + .await; + } + Err(err) => { + warn!( + "failed to accept exec-server websocket connection from {peer_addr}: {err}" + ); + } + } + }); + } } #[cfg(test)] diff --git a/codex-rs/exec-server/tests/health.rs b/codex-rs/exec-server/tests/health.rs deleted file mode 100644 index 91b3806a22..0000000000 --- a/codex-rs/exec-server/tests/health.rs +++ /dev/null @@ -1,21 +0,0 @@ -#![cfg(unix)] - -mod common; - -use common::exec_server::exec_server; -use pretty_assertions::assert_eq; - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn exec_server_serves_readyz_alongside_websocket_endpoint() -> anyhow::Result<()> { - let mut server = exec_server().await?; - let http_base_url = server - .websocket_url() - .strip_prefix("ws://") - .expect("websocket URL should use ws://"); - - let response = reqwest::get(format!("http://{http_base_url}/readyz")).await?; - assert_eq!(response.status(), reqwest::StatusCode::OK); - - server.shutdown().await?; - Ok(()) -}