diff --git a/codex-rs/exec-server/README.md b/codex-rs/exec-server/README.md index 8a705135b7..81664eaca0 100644 --- a/codex-rs/exec-server/README.md +++ b/codex-rs/exec-server/README.md @@ -22,7 +22,6 @@ the wire. The CLI entrypoint supports: - `ws://IP:PORT` (default) -- `ws+http://IP:PORT` for HTTP-upgrade websocket connections plus `/readyz` - `--remote URL --executor-id ID [--name NAME]` Remote mode registers the local exec-server with the executor registry, diff --git a/codex-rs/exec-server/src/client_transport.rs b/codex-rs/exec-server/src/client_transport.rs index 0d089622ca..8ca1eb0280 100644 --- a/codex-rs/exec-server/src/client_transport.rs +++ b/codex-rs/exec-server/src/client_transport.rs @@ -57,21 +57,17 @@ impl ExecServerClient { ) -> Result { ensure_rustls_crypto_provider(); let websocket_url = args.websocket_url.clone(); - let websocket_connect_url = websocket_connect_url(&websocket_url); let connect_timeout = args.connect_timeout; - let (stream, _) = timeout( - connect_timeout, - connect_async(websocket_connect_url.as_str()), - ) - .await - .map_err(|_| ExecServerError::WebSocketConnectTimeout { - url: websocket_url.clone(), - timeout: connect_timeout, - })? - .map_err(|source| ExecServerError::WebSocketConnect { - url: websocket_url.clone(), - source, - })?; + let (stream, _) = timeout(connect_timeout, connect_async(websocket_url.as_str())) + .await + .map_err(|_| ExecServerError::WebSocketConnectTimeout { + url: websocket_url.clone(), + timeout: connect_timeout, + })? + .map_err(|source| ExecServerError::WebSocketConnect { + url: websocket_url.clone(), + source, + })?; Self::connect( JsonRpcConnection::from_websocket( @@ -124,12 +120,6 @@ impl ExecServerClient { } } -fn websocket_connect_url(websocket_url: &str) -> String { - websocket_url - .strip_prefix("ws+http://") - .map_or_else(|| websocket_url.to_string(), |url| format!("ws://{url}")) -} - fn stdio_command_process(stdio_command: &StdioExecServerCommand) -> Command { let mut command = Command::new(&stdio_command.program); command.args(&stdio_command.args); diff --git a/codex-rs/exec-server/src/environment_toml.rs b/codex-rs/exec-server/src/environment_toml.rs index 2937bb5cd7..90f4c78262 100644 --- a/codex-rs/exec-server/src/environment_toml.rs +++ b/codex-rs/exec-server/src/environment_toml.rs @@ -276,15 +276,12 @@ fn validate_websocket_url(url: String) -> Result { "environment url cannot be empty".to_string(), )); } - if !url.starts_with("ws://") && !url.starts_with("wss://") && !url.starts_with("ws+http://") { + if !url.starts_with("ws://") && !url.starts_with("wss://") { return Err(ExecServerError::Protocol(format!( - "environment url `{url}` must use ws://, wss://, or ws+http://" + "environment url `{url}` must use ws:// or wss://" ))); } - let websocket_connect_url = url - .strip_prefix("ws+http://") - .map_or_else(|| url.to_string(), |url| format!("ws://{url}")); - websocket_connect_url.into_client_request().map_err(|err| { + url.into_client_request().map_err(|err| { ExecServerError::Protocol(format!("environment url `{url}` is invalid: {err}")) })?; Ok(url.to_string()) @@ -441,7 +438,7 @@ mod tests { url: Some("http://127.0.0.1:8765".to_string()), ..Default::default() }, - "environment url `http://127.0.0.1:8765` must use ws://, wss://, or ws+http://", + "environment url `http://127.0.0.1:8765` must use ws:// or wss://", ), ( EnvironmentToml { diff --git a/codex-rs/exec-server/src/server/transport.rs b/codex-rs/exec-server/src/server/transport.rs index db5128b62b..9deafa9b24 100644 --- a/codex-rs/exec-server/src/server/transport.rs +++ b/codex-rs/exec-server/src/server/transport.rs @@ -1,11 +1,3 @@ -use axum::Router; -use axum::extract::ConnectInfo; -use axum::extract::State; -use axum::extract::ws::WebSocketUpgrade; -use axum::http::StatusCode; -use axum::response::IntoResponse; -use axum::routing::any; -use axum::routing::get; use std::io::Write as _; use std::net::SocketAddr; use tokio::io; @@ -25,7 +17,6 @@ pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0"; #[derive(Debug, Clone, Eq, PartialEq)] pub(crate) enum ExecServerListenTransport { WebSocket(SocketAddr), - HttpUpgradeWebSocket(SocketAddr), Stdio, } @@ -40,11 +31,11 @@ impl std::fmt::Display for ExecServerListenUrlParseError { match self { ExecServerListenUrlParseError::UnsupportedListenUrl(listen_url) => write!( f, - "unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT`, `ws+http://IP:PORT`, or `stdio`" + "unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT` or `stdio`" ), ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url) => write!( f, - "invalid websocket --listen URL `{listen_url}`; expected `ws://IP:PORT` or `ws+http://IP:PORT`" + "invalid websocket --listen URL `{listen_url}`; expected `ws://IP:PORT`" ), } } @@ -68,15 +59,6 @@ pub(crate) fn parse_listen_url( }); } - if let Some(socket_addr) = listen_url.strip_prefix("ws+http://") { - return socket_addr - .parse::() - .map(ExecServerListenTransport::HttpUpgradeWebSocket) - .map_err(|_| { - ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url.to_string()) - }); - } - Err(ExecServerListenUrlParseError::UnsupportedListenUrl( listen_url.to_string(), )) @@ -90,9 +72,6 @@ pub(crate) async fn run_transport( ExecServerListenTransport::WebSocket(bind_address) => { run_websocket_listener(bind_address, runtime_paths).await } - ExecServerListenTransport::HttpUpgradeWebSocket(bind_address) => { - run_http_upgrade_websocket_listener(bind_address, runtime_paths).await - } ExecServerListenTransport::Stdio => run_stdio_connection(runtime_paths).await, } } @@ -158,55 +137,6 @@ async fn run_websocket_listener( } } -async fn run_http_upgrade_websocket_listener( - bind_address: SocketAddr, - runtime_paths: ExecServerRuntimePaths, -) -> Result<(), Box> { - let listener = TcpListener::bind(bind_address).await?; - let local_addr = listener.local_addr()?; - let processor = ConnectionProcessor::new(runtime_paths); - info!("codex-exec-server listening on ws+http://{local_addr}"); - println!("ws+http://{local_addr}"); - std::io::stdout().flush()?; - - let router = Router::new() - .route("/", any(websocket_upgrade_handler)) - .route("/readyz", get(readiness_handler)) - .with_state(ExecServerWebSocketState { processor }); - axum::serve( - listener, - router.into_make_service_with_connect_info::(), - ) - .await?; - Ok(()) -} - -#[derive(Clone)] -struct ExecServerWebSocketState { - processor: ConnectionProcessor, -} - -async fn readiness_handler() -> StatusCode { - StatusCode::OK -} - -async fn websocket_upgrade_handler( - websocket: WebSocketUpgrade, - ConnectInfo(peer_addr): ConnectInfo, - State(state): State, -) -> impl IntoResponse { - info!(%peer_addr, "exec-server HTTP-upgrade websocket client connected"); - websocket.on_upgrade(move |stream| async move { - state - .processor - .run_connection(JsonRpcConnection::from_axum_websocket( - stream, - format!("exec-server HTTP-upgrade websocket {peer_addr}"), - )) - .await; - }) -} - #[cfg(test)] #[path = "transport_tests.rs"] mod transport_tests; diff --git a/codex-rs/exec-server/src/server/transport_tests.rs b/codex-rs/exec-server/src/server/transport_tests.rs index 6916450a91..b9787d8a37 100644 --- a/codex-rs/exec-server/src/server/transport_tests.rs +++ b/codex-rs/exec-server/src/server/transport_tests.rs @@ -125,37 +125,13 @@ fn parse_listen_url_accepts_websocket_url() { ); } -#[test] -fn parse_listen_url_accepts_http_upgrade_websocket_url() { - let transport = parse_listen_url("ws+http://127.0.0.1:1234") - .expect("HTTP-upgrade websocket listen URL should parse"); - assert_eq!( - transport, - ExecServerListenTransport::HttpUpgradeWebSocket( - "127.0.0.1:1234" - .parse::() - .expect("valid socket address") - ) - ); -} - #[test] fn parse_listen_url_rejects_invalid_websocket_url() { let err = parse_listen_url("ws://localhost:1234") .expect_err("hostname bind address should be rejected"); assert_eq!( err.to_string(), - "invalid websocket --listen URL `ws://localhost:1234`; expected `ws://IP:PORT` or `ws+http://IP:PORT`" - ); -} - -#[test] -fn parse_listen_url_rejects_invalid_http_upgrade_websocket_url() { - let err = parse_listen_url("ws+http://localhost:1234") - .expect_err("hostname bind address should be rejected"); - assert_eq!( - err.to_string(), - "invalid websocket --listen URL `ws+http://localhost:1234`; expected `ws://IP:PORT` or `ws+http://IP:PORT`" + "invalid websocket --listen URL `ws://localhost:1234`; expected `ws://IP:PORT`" ); } @@ -165,7 +141,7 @@ fn parse_listen_url_rejects_unsupported_url() { parse_listen_url("http://127.0.0.1:1234").expect_err("unsupported scheme should fail"); assert_eq!( err.to_string(), - "unsupported --listen URL `http://127.0.0.1:1234`; expected `ws://IP:PORT`, `ws+http://IP:PORT`, or `stdio`" + "unsupported --listen URL `http://127.0.0.1:1234`; expected `ws://IP:PORT` or `stdio`" ); } diff --git a/codex-rs/exec-server/tests/common/exec_server.rs b/codex-rs/exec-server/tests/common/exec_server.rs index 0fc95dfbbb..4ff7408715 100644 --- a/codex-rs/exec-server/tests/common/exec_server.rs +++ b/codex-rs/exec-server/tests/common/exec_server.rs @@ -57,28 +57,10 @@ pub(crate) fn test_codex_helper_paths() -> anyhow::Result } pub(crate) async fn exec_server() -> anyhow::Result { - exec_server_with_listen_url_and_env("ws://127.0.0.1:0", std::iter::empty::<(&str, &str)>()) - .await -} - -pub(crate) async fn http_exec_server() -> anyhow::Result { - exec_server_with_listen_url_and_env("ws+http://127.0.0.1:0", std::iter::empty::<(&str, &str)>()) - .await + exec_server_with_env(std::iter::empty::<(&str, &str)>()).await } pub(crate) async fn exec_server_with_env(env: I) -> anyhow::Result -where - I: IntoIterator, - K: AsRef, - V: AsRef, -{ - exec_server_with_listen_url_and_env("ws://127.0.0.1:0", env).await -} - -async fn exec_server_with_listen_url_and_env( - listen_url: &str, - env: I, -) -> anyhow::Result where I: IntoIterator, K: AsRef, @@ -87,7 +69,7 @@ where let helper_paths = test_codex_helper_paths()?; let codex_home = TempDir::new()?; let mut child = Command::new(&helper_paths.codex_exe); - child.args(["exec-server", "--listen", listen_url]); + child.args(["exec-server", "--listen", "ws://127.0.0.1:0"]); child.stdin(Stdio::null()); child.stdout(Stdio::piped()); child.stderr(Stdio::inherit()); @@ -97,8 +79,7 @@ where let mut child = child.spawn()?; let websocket_url = read_listen_url_from_stdout(&mut child).await?; - let (websocket, _) = - connect_websocket_when_ready(&websocket_connect_url(&websocket_url)).await?; + let (websocket, _) = connect_websocket_when_ready(&websocket_url).await?; Ok(ExecServerHarness { _codex_home: codex_home, _helper_paths: helper_paths, @@ -120,8 +101,7 @@ impl ExecServerHarness { } pub(crate) async fn reconnect_websocket(&mut self) -> anyhow::Result<()> { - let (websocket, _) = - connect_websocket_when_ready(&websocket_connect_url(&self.websocket_url)).await?; + let (websocket, _) = connect_websocket_when_ready(&self.websocket_url).await?; self.websocket = websocket; Ok(()) } @@ -274,14 +254,8 @@ async fn read_listen_url_from_stdout(child: &mut Child) -> anyhow::Result String { - websocket_url - .strip_prefix("ws+http://") - .map_or_else(|| websocket_url.to_string(), |url| format!("ws://{url}")) -} diff --git a/codex-rs/exec-server/tests/health.rs b/codex-rs/exec-server/tests/health.rs deleted file mode 100644 index 6b366db44e..0000000000 --- a/codex-rs/exec-server/tests/health.rs +++ /dev/null @@ -1,30 +0,0 @@ -#![cfg(unix)] - -mod common; - -use codex_exec_server::ExecServerClient; -use codex_exec_server::RemoteExecServerConnectArgs; -use common::exec_server::http_exec_server; -use pretty_assertions::assert_eq; - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn http_upgrade_exec_server_serves_readyz_and_accepts_clients() -> anyhow::Result<()> { - let mut server = http_exec_server().await?; - let http_base_url = server - .websocket_url() - .strip_prefix("ws+http://") - .expect("HTTP-upgrade websocket URL should use ws+http://"); - - let response = reqwest::get(format!("http://{http_base_url}/readyz")).await?; - assert_eq!(response.status(), reqwest::StatusCode::OK); - - let client = ExecServerClient::connect_websocket(RemoteExecServerConnectArgs::new( - server.websocket_url().to_string(), - "exec-server-health-test".to_string(), - )) - .await?; - drop(client); - - server.shutdown().await?; - Ok(()) -}