From 86b218d3908dceaeb29d9de0135e0fdfb3cb14de Mon Sep 17 00:00:00 2001 From: Ruslan Nigmatullin Date: Mon, 11 May 2026 22:11:36 +0000 Subject: [PATCH] [exec-server] add HTTP-upgrade listen mode --- codex-rs/exec-server/README.md | 1 + codex-rs/exec-server/src/client_transport.rs | 30 +++++--- codex-rs/exec-server/src/environment_toml.rs | 11 ++- codex-rs/exec-server/src/server/transport.rs | 74 ++++++++++++++++++- .../exec-server/src/server/transport_tests.rs | 28 ++++++- .../exec-server/tests/common/exec_server.rs | 36 +++++++-- codex-rs/exec-server/tests/health.rs | 30 ++++++++ 7 files changed, 187 insertions(+), 23 deletions(-) create mode 100644 codex-rs/exec-server/tests/health.rs diff --git a/codex-rs/exec-server/README.md b/codex-rs/exec-server/README.md index 81664eaca0..8a705135b7 100644 --- a/codex-rs/exec-server/README.md +++ b/codex-rs/exec-server/README.md @@ -22,6 +22,7 @@ the wire. The CLI entrypoint supports: - `ws://IP:PORT` (default) +- `ws+http://IP:PORT` for HTTP-upgrade websocket connections plus `/readyz` - `--remote URL --executor-id ID [--name NAME]` Remote mode registers the local exec-server with the executor registry, diff --git a/codex-rs/exec-server/src/client_transport.rs b/codex-rs/exec-server/src/client_transport.rs index 8ca1eb0280..0d089622ca 100644 --- a/codex-rs/exec-server/src/client_transport.rs +++ b/codex-rs/exec-server/src/client_transport.rs @@ -57,17 +57,21 @@ impl ExecServerClient { ) -> Result { ensure_rustls_crypto_provider(); let websocket_url = args.websocket_url.clone(); + let websocket_connect_url = websocket_connect_url(&websocket_url); let connect_timeout = args.connect_timeout; - let (stream, _) = timeout(connect_timeout, connect_async(websocket_url.as_str())) - .await - .map_err(|_| ExecServerError::WebSocketConnectTimeout { - url: websocket_url.clone(), - timeout: connect_timeout, - })? - .map_err(|source| ExecServerError::WebSocketConnect { - url: websocket_url.clone(), - source, - })?; + let (stream, _) = timeout( + connect_timeout, + connect_async(websocket_connect_url.as_str()), + ) + .await + .map_err(|_| ExecServerError::WebSocketConnectTimeout { + url: websocket_url.clone(), + timeout: connect_timeout, + })? + .map_err(|source| ExecServerError::WebSocketConnect { + url: websocket_url.clone(), + source, + })?; Self::connect( JsonRpcConnection::from_websocket( @@ -120,6 +124,12 @@ impl ExecServerClient { } } +fn websocket_connect_url(websocket_url: &str) -> String { + websocket_url + .strip_prefix("ws+http://") + .map_or_else(|| websocket_url.to_string(), |url| format!("ws://{url}")) +} + fn stdio_command_process(stdio_command: &StdioExecServerCommand) -> Command { let mut command = Command::new(&stdio_command.program); command.args(&stdio_command.args); diff --git a/codex-rs/exec-server/src/environment_toml.rs b/codex-rs/exec-server/src/environment_toml.rs index 90f4c78262..2937bb5cd7 100644 --- a/codex-rs/exec-server/src/environment_toml.rs +++ b/codex-rs/exec-server/src/environment_toml.rs @@ -276,12 +276,15 @@ fn validate_websocket_url(url: String) -> Result { "environment url cannot be empty".to_string(), )); } - if !url.starts_with("ws://") && !url.starts_with("wss://") { + if !url.starts_with("ws://") && !url.starts_with("wss://") && !url.starts_with("ws+http://") { return Err(ExecServerError::Protocol(format!( - "environment url `{url}` must use ws:// or wss://" + "environment url `{url}` must use ws://, wss://, or ws+http://" ))); } - url.into_client_request().map_err(|err| { + let websocket_connect_url = url + .strip_prefix("ws+http://") + .map_or_else(|| url.to_string(), |url| format!("ws://{url}")); + websocket_connect_url.into_client_request().map_err(|err| { ExecServerError::Protocol(format!("environment url `{url}` is invalid: {err}")) })?; Ok(url.to_string()) @@ -438,7 +441,7 @@ mod tests { url: Some("http://127.0.0.1:8765".to_string()), ..Default::default() }, - "environment url `http://127.0.0.1:8765` must use ws:// or wss://", + "environment url `http://127.0.0.1:8765` must use ws://, wss://, or ws+http://", ), ( EnvironmentToml { diff --git a/codex-rs/exec-server/src/server/transport.rs b/codex-rs/exec-server/src/server/transport.rs index 9deafa9b24..db5128b62b 100644 --- a/codex-rs/exec-server/src/server/transport.rs +++ b/codex-rs/exec-server/src/server/transport.rs @@ -1,3 +1,11 @@ +use axum::Router; +use axum::extract::ConnectInfo; +use axum::extract::State; +use axum::extract::ws::WebSocketUpgrade; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use axum::routing::any; +use axum::routing::get; use std::io::Write as _; use std::net::SocketAddr; use tokio::io; @@ -17,6 +25,7 @@ pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0"; #[derive(Debug, Clone, Eq, PartialEq)] pub(crate) enum ExecServerListenTransport { WebSocket(SocketAddr), + HttpUpgradeWebSocket(SocketAddr), Stdio, } @@ -31,11 +40,11 @@ impl std::fmt::Display for ExecServerListenUrlParseError { match self { ExecServerListenUrlParseError::UnsupportedListenUrl(listen_url) => write!( f, - "unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT` or `stdio`" + "unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT`, `ws+http://IP:PORT`, or `stdio`" ), ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url) => write!( f, - "invalid websocket --listen URL `{listen_url}`; expected `ws://IP:PORT`" + "invalid websocket --listen URL `{listen_url}`; expected `ws://IP:PORT` or `ws+http://IP:PORT`" ), } } @@ -59,6 +68,15 @@ pub(crate) fn parse_listen_url( }); } + if let Some(socket_addr) = listen_url.strip_prefix("ws+http://") { + return socket_addr + .parse::() + .map(ExecServerListenTransport::HttpUpgradeWebSocket) + .map_err(|_| { + ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url.to_string()) + }); + } + Err(ExecServerListenUrlParseError::UnsupportedListenUrl( listen_url.to_string(), )) @@ -72,6 +90,9 @@ pub(crate) async fn run_transport( ExecServerListenTransport::WebSocket(bind_address) => { run_websocket_listener(bind_address, runtime_paths).await } + ExecServerListenTransport::HttpUpgradeWebSocket(bind_address) => { + run_http_upgrade_websocket_listener(bind_address, runtime_paths).await + } ExecServerListenTransport::Stdio => run_stdio_connection(runtime_paths).await, } } @@ -137,6 +158,55 @@ async fn run_websocket_listener( } } +async fn run_http_upgrade_websocket_listener( + bind_address: SocketAddr, + runtime_paths: ExecServerRuntimePaths, +) -> Result<(), Box> { + let listener = TcpListener::bind(bind_address).await?; + let local_addr = listener.local_addr()?; + let processor = ConnectionProcessor::new(runtime_paths); + info!("codex-exec-server listening on ws+http://{local_addr}"); + println!("ws+http://{local_addr}"); + std::io::stdout().flush()?; + + let router = Router::new() + .route("/", any(websocket_upgrade_handler)) + .route("/readyz", get(readiness_handler)) + .with_state(ExecServerWebSocketState { processor }); + axum::serve( + listener, + router.into_make_service_with_connect_info::(), + ) + .await?; + Ok(()) +} + +#[derive(Clone)] +struct ExecServerWebSocketState { + processor: ConnectionProcessor, +} + +async fn readiness_handler() -> StatusCode { + StatusCode::OK +} + +async fn websocket_upgrade_handler( + websocket: WebSocketUpgrade, + ConnectInfo(peer_addr): ConnectInfo, + State(state): State, +) -> impl IntoResponse { + info!(%peer_addr, "exec-server HTTP-upgrade websocket client connected"); + websocket.on_upgrade(move |stream| async move { + state + .processor + .run_connection(JsonRpcConnection::from_axum_websocket( + stream, + format!("exec-server HTTP-upgrade websocket {peer_addr}"), + )) + .await; + }) +} + #[cfg(test)] #[path = "transport_tests.rs"] mod transport_tests; diff --git a/codex-rs/exec-server/src/server/transport_tests.rs b/codex-rs/exec-server/src/server/transport_tests.rs index b9787d8a37..6916450a91 100644 --- a/codex-rs/exec-server/src/server/transport_tests.rs +++ b/codex-rs/exec-server/src/server/transport_tests.rs @@ -125,13 +125,37 @@ fn parse_listen_url_accepts_websocket_url() { ); } +#[test] +fn parse_listen_url_accepts_http_upgrade_websocket_url() { + let transport = parse_listen_url("ws+http://127.0.0.1:1234") + .expect("HTTP-upgrade websocket listen URL should parse"); + assert_eq!( + transport, + ExecServerListenTransport::HttpUpgradeWebSocket( + "127.0.0.1:1234" + .parse::() + .expect("valid socket address") + ) + ); +} + #[test] fn parse_listen_url_rejects_invalid_websocket_url() { let err = parse_listen_url("ws://localhost:1234") .expect_err("hostname bind address should be rejected"); assert_eq!( err.to_string(), - "invalid websocket --listen URL `ws://localhost:1234`; expected `ws://IP:PORT`" + "invalid websocket --listen URL `ws://localhost:1234`; expected `ws://IP:PORT` or `ws+http://IP:PORT`" + ); +} + +#[test] +fn parse_listen_url_rejects_invalid_http_upgrade_websocket_url() { + let err = parse_listen_url("ws+http://localhost:1234") + .expect_err("hostname bind address should be rejected"); + assert_eq!( + err.to_string(), + "invalid websocket --listen URL `ws+http://localhost:1234`; expected `ws://IP:PORT` or `ws+http://IP:PORT`" ); } @@ -141,7 +165,7 @@ fn parse_listen_url_rejects_unsupported_url() { parse_listen_url("http://127.0.0.1:1234").expect_err("unsupported scheme should fail"); assert_eq!( err.to_string(), - "unsupported --listen URL `http://127.0.0.1:1234`; expected `ws://IP:PORT` or `stdio`" + "unsupported --listen URL `http://127.0.0.1:1234`; expected `ws://IP:PORT`, `ws+http://IP:PORT`, or `stdio`" ); } diff --git a/codex-rs/exec-server/tests/common/exec_server.rs b/codex-rs/exec-server/tests/common/exec_server.rs index 4ff7408715..0fc95dfbbb 100644 --- a/codex-rs/exec-server/tests/common/exec_server.rs +++ b/codex-rs/exec-server/tests/common/exec_server.rs @@ -57,10 +57,28 @@ pub(crate) fn test_codex_helper_paths() -> anyhow::Result } pub(crate) async fn exec_server() -> anyhow::Result { - exec_server_with_env(std::iter::empty::<(&str, &str)>()).await + exec_server_with_listen_url_and_env("ws://127.0.0.1:0", std::iter::empty::<(&str, &str)>()) + .await +} + +pub(crate) async fn http_exec_server() -> anyhow::Result { + exec_server_with_listen_url_and_env("ws+http://127.0.0.1:0", std::iter::empty::<(&str, &str)>()) + .await } pub(crate) async fn exec_server_with_env(env: I) -> anyhow::Result +where + I: IntoIterator, + K: AsRef, + V: AsRef, +{ + exec_server_with_listen_url_and_env("ws://127.0.0.1:0", env).await +} + +async fn exec_server_with_listen_url_and_env( + listen_url: &str, + env: I, +) -> anyhow::Result where I: IntoIterator, K: AsRef, @@ -69,7 +87,7 @@ where let helper_paths = test_codex_helper_paths()?; let codex_home = TempDir::new()?; let mut child = Command::new(&helper_paths.codex_exe); - child.args(["exec-server", "--listen", "ws://127.0.0.1:0"]); + child.args(["exec-server", "--listen", listen_url]); child.stdin(Stdio::null()); child.stdout(Stdio::piped()); child.stderr(Stdio::inherit()); @@ -79,7 +97,8 @@ where let mut child = child.spawn()?; let websocket_url = read_listen_url_from_stdout(&mut child).await?; - let (websocket, _) = connect_websocket_when_ready(&websocket_url).await?; + let (websocket, _) = + connect_websocket_when_ready(&websocket_connect_url(&websocket_url)).await?; Ok(ExecServerHarness { _codex_home: codex_home, _helper_paths: helper_paths, @@ -101,7 +120,8 @@ impl ExecServerHarness { } pub(crate) async fn reconnect_websocket(&mut self) -> anyhow::Result<()> { - let (websocket, _) = connect_websocket_when_ready(&self.websocket_url).await?; + let (websocket, _) = + connect_websocket_when_ready(&websocket_connect_url(&self.websocket_url)).await?; self.websocket = websocket; Ok(()) } @@ -254,8 +274,14 @@ async fn read_listen_url_from_stdout(child: &mut Child) -> anyhow::Result String { + websocket_url + .strip_prefix("ws+http://") + .map_or_else(|| websocket_url.to_string(), |url| format!("ws://{url}")) +} diff --git a/codex-rs/exec-server/tests/health.rs b/codex-rs/exec-server/tests/health.rs new file mode 100644 index 0000000000..6b366db44e --- /dev/null +++ b/codex-rs/exec-server/tests/health.rs @@ -0,0 +1,30 @@ +#![cfg(unix)] + +mod common; + +use codex_exec_server::ExecServerClient; +use codex_exec_server::RemoteExecServerConnectArgs; +use common::exec_server::http_exec_server; +use pretty_assertions::assert_eq; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn http_upgrade_exec_server_serves_readyz_and_accepts_clients() -> anyhow::Result<()> { + let mut server = http_exec_server().await?; + let http_base_url = server + .websocket_url() + .strip_prefix("ws+http://") + .expect("HTTP-upgrade websocket URL should use ws+http://"); + + let response = reqwest::get(format!("http://{http_base_url}/readyz")).await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + + let client = ExecServerClient::connect_websocket(RemoteExecServerConnectArgs::new( + server.websocket_url().to_string(), + "exec-server-health-test".to_string(), + )) + .await?; + drop(client); + + server.shutdown().await?; + Ok(()) +}