[exec-server] add HTTP-upgrade listen mode

This commit is contained in:
Ruslan Nigmatullin
2026-05-11 22:11:36 +00:00
parent b97c510ad7
commit 86b218d390
7 changed files with 187 additions and 23 deletions

View File

@@ -22,6 +22,7 @@ the wire.
The CLI entrypoint supports:
- `ws://IP:PORT` (default)
- `ws+http://IP:PORT` for HTTP-upgrade websocket connections plus `/readyz`
- `--remote URL --executor-id ID [--name NAME]`
Remote mode registers the local exec-server with the executor registry,

View File

@@ -57,17 +57,21 @@ impl ExecServerClient {
) -> Result<Self, ExecServerError> {
ensure_rustls_crypto_provider();
let websocket_url = args.websocket_url.clone();
let websocket_connect_url = websocket_connect_url(&websocket_url);
let connect_timeout = args.connect_timeout;
let (stream, _) = timeout(connect_timeout, connect_async(websocket_url.as_str()))
.await
.map_err(|_| ExecServerError::WebSocketConnectTimeout {
url: websocket_url.clone(),
timeout: connect_timeout,
})?
.map_err(|source| ExecServerError::WebSocketConnect {
url: websocket_url.clone(),
source,
})?;
let (stream, _) = timeout(
connect_timeout,
connect_async(websocket_connect_url.as_str()),
)
.await
.map_err(|_| ExecServerError::WebSocketConnectTimeout {
url: websocket_url.clone(),
timeout: connect_timeout,
})?
.map_err(|source| ExecServerError::WebSocketConnect {
url: websocket_url.clone(),
source,
})?;
Self::connect(
JsonRpcConnection::from_websocket(
@@ -120,6 +124,12 @@ impl ExecServerClient {
}
}
fn websocket_connect_url(websocket_url: &str) -> String {
websocket_url
.strip_prefix("ws+http://")
.map_or_else(|| websocket_url.to_string(), |url| format!("ws://{url}"))
}
fn stdio_command_process(stdio_command: &StdioExecServerCommand) -> Command {
let mut command = Command::new(&stdio_command.program);
command.args(&stdio_command.args);

View File

@@ -276,12 +276,15 @@ fn validate_websocket_url(url: String) -> Result<String, ExecServerError> {
"environment url cannot be empty".to_string(),
));
}
if !url.starts_with("ws://") && !url.starts_with("wss://") {
if !url.starts_with("ws://") && !url.starts_with("wss://") && !url.starts_with("ws+http://") {
return Err(ExecServerError::Protocol(format!(
"environment url `{url}` must use ws:// or wss://"
"environment url `{url}` must use ws://, wss://, or ws+http://"
)));
}
url.into_client_request().map_err(|err| {
let websocket_connect_url = url
.strip_prefix("ws+http://")
.map_or_else(|| url.to_string(), |url| format!("ws://{url}"));
websocket_connect_url.into_client_request().map_err(|err| {
ExecServerError::Protocol(format!("environment url `{url}` is invalid: {err}"))
})?;
Ok(url.to_string())
@@ -438,7 +441,7 @@ mod tests {
url: Some("http://127.0.0.1:8765".to_string()),
..Default::default()
},
"environment url `http://127.0.0.1:8765` must use ws:// or wss://",
"environment url `http://127.0.0.1:8765` must use ws://, wss://, or ws+http://",
),
(
EnvironmentToml {

View File

@@ -1,3 +1,11 @@
use axum::Router;
use axum::extract::ConnectInfo;
use axum::extract::State;
use axum::extract::ws::WebSocketUpgrade;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::any;
use axum::routing::get;
use std::io::Write as _;
use std::net::SocketAddr;
use tokio::io;
@@ -17,6 +25,7 @@ pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0";
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) enum ExecServerListenTransport {
WebSocket(SocketAddr),
HttpUpgradeWebSocket(SocketAddr),
Stdio,
}
@@ -31,11 +40,11 @@ impl std::fmt::Display for ExecServerListenUrlParseError {
match self {
ExecServerListenUrlParseError::UnsupportedListenUrl(listen_url) => write!(
f,
"unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT` or `stdio`"
"unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT`, `ws+http://IP:PORT`, or `stdio`"
),
ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url) => write!(
f,
"invalid websocket --listen URL `{listen_url}`; expected `ws://IP:PORT`"
"invalid websocket --listen URL `{listen_url}`; expected `ws://IP:PORT` or `ws+http://IP:PORT`"
),
}
}
@@ -59,6 +68,15 @@ pub(crate) fn parse_listen_url(
});
}
if let Some(socket_addr) = listen_url.strip_prefix("ws+http://") {
return socket_addr
.parse::<SocketAddr>()
.map(ExecServerListenTransport::HttpUpgradeWebSocket)
.map_err(|_| {
ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url.to_string())
});
}
Err(ExecServerListenUrlParseError::UnsupportedListenUrl(
listen_url.to_string(),
))
@@ -72,6 +90,9 @@ pub(crate) async fn run_transport(
ExecServerListenTransport::WebSocket(bind_address) => {
run_websocket_listener(bind_address, runtime_paths).await
}
ExecServerListenTransport::HttpUpgradeWebSocket(bind_address) => {
run_http_upgrade_websocket_listener(bind_address, runtime_paths).await
}
ExecServerListenTransport::Stdio => run_stdio_connection(runtime_paths).await,
}
}
@@ -137,6 +158,55 @@ async fn run_websocket_listener(
}
}
async fn run_http_upgrade_websocket_listener(
bind_address: SocketAddr,
runtime_paths: ExecServerRuntimePaths,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let listener = TcpListener::bind(bind_address).await?;
let local_addr = listener.local_addr()?;
let processor = ConnectionProcessor::new(runtime_paths);
info!("codex-exec-server listening on ws+http://{local_addr}");
println!("ws+http://{local_addr}");
std::io::stdout().flush()?;
let router = Router::new()
.route("/", any(websocket_upgrade_handler))
.route("/readyz", get(readiness_handler))
.with_state(ExecServerWebSocketState { processor });
axum::serve(
listener,
router.into_make_service_with_connect_info::<SocketAddr>(),
)
.await?;
Ok(())
}
#[derive(Clone)]
struct ExecServerWebSocketState {
processor: ConnectionProcessor,
}
async fn readiness_handler() -> StatusCode {
StatusCode::OK
}
async fn websocket_upgrade_handler(
websocket: WebSocketUpgrade,
ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
State(state): State<ExecServerWebSocketState>,
) -> impl IntoResponse {
info!(%peer_addr, "exec-server HTTP-upgrade websocket client connected");
websocket.on_upgrade(move |stream| async move {
state
.processor
.run_connection(JsonRpcConnection::from_axum_websocket(
stream,
format!("exec-server HTTP-upgrade websocket {peer_addr}"),
))
.await;
})
}
#[cfg(test)]
#[path = "transport_tests.rs"]
mod transport_tests;

View File

@@ -125,13 +125,37 @@ fn parse_listen_url_accepts_websocket_url() {
);
}
#[test]
fn parse_listen_url_accepts_http_upgrade_websocket_url() {
let transport = parse_listen_url("ws+http://127.0.0.1:1234")
.expect("HTTP-upgrade websocket listen URL should parse");
assert_eq!(
transport,
ExecServerListenTransport::HttpUpgradeWebSocket(
"127.0.0.1:1234"
.parse::<SocketAddr>()
.expect("valid socket address")
)
);
}
#[test]
fn parse_listen_url_rejects_invalid_websocket_url() {
let err = parse_listen_url("ws://localhost:1234")
.expect_err("hostname bind address should be rejected");
assert_eq!(
err.to_string(),
"invalid websocket --listen URL `ws://localhost:1234`; expected `ws://IP:PORT`"
"invalid websocket --listen URL `ws://localhost:1234`; expected `ws://IP:PORT` or `ws+http://IP:PORT`"
);
}
#[test]
fn parse_listen_url_rejects_invalid_http_upgrade_websocket_url() {
let err = parse_listen_url("ws+http://localhost:1234")
.expect_err("hostname bind address should be rejected");
assert_eq!(
err.to_string(),
"invalid websocket --listen URL `ws+http://localhost:1234`; expected `ws://IP:PORT` or `ws+http://IP:PORT`"
);
}
@@ -141,7 +165,7 @@ fn parse_listen_url_rejects_unsupported_url() {
parse_listen_url("http://127.0.0.1:1234").expect_err("unsupported scheme should fail");
assert_eq!(
err.to_string(),
"unsupported --listen URL `http://127.0.0.1:1234`; expected `ws://IP:PORT` or `stdio`"
"unsupported --listen URL `http://127.0.0.1:1234`; expected `ws://IP:PORT`, `ws+http://IP:PORT`, or `stdio`"
);
}

View File

@@ -57,10 +57,28 @@ pub(crate) fn test_codex_helper_paths() -> anyhow::Result<TestCodexHelperPaths>
}
pub(crate) async fn exec_server() -> anyhow::Result<ExecServerHarness> {
exec_server_with_env(std::iter::empty::<(&str, &str)>()).await
exec_server_with_listen_url_and_env("ws://127.0.0.1:0", std::iter::empty::<(&str, &str)>())
.await
}
pub(crate) async fn http_exec_server() -> anyhow::Result<ExecServerHarness> {
exec_server_with_listen_url_and_env("ws+http://127.0.0.1:0", std::iter::empty::<(&str, &str)>())
.await
}
pub(crate) async fn exec_server_with_env<I, K, V>(env: I) -> anyhow::Result<ExecServerHarness>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<std::ffi::OsStr>,
V: AsRef<std::ffi::OsStr>,
{
exec_server_with_listen_url_and_env("ws://127.0.0.1:0", env).await
}
async fn exec_server_with_listen_url_and_env<I, K, V>(
listen_url: &str,
env: I,
) -> anyhow::Result<ExecServerHarness>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<std::ffi::OsStr>,
@@ -69,7 +87,7 @@ where
let helper_paths = test_codex_helper_paths()?;
let codex_home = TempDir::new()?;
let mut child = Command::new(&helper_paths.codex_exe);
child.args(["exec-server", "--listen", "ws://127.0.0.1:0"]);
child.args(["exec-server", "--listen", listen_url]);
child.stdin(Stdio::null());
child.stdout(Stdio::piped());
child.stderr(Stdio::inherit());
@@ -79,7 +97,8 @@ where
let mut child = child.spawn()?;
let websocket_url = read_listen_url_from_stdout(&mut child).await?;
let (websocket, _) = connect_websocket_when_ready(&websocket_url).await?;
let (websocket, _) =
connect_websocket_when_ready(&websocket_connect_url(&websocket_url)).await?;
Ok(ExecServerHarness {
_codex_home: codex_home,
_helper_paths: helper_paths,
@@ -101,7 +120,8 @@ impl ExecServerHarness {
}
pub(crate) async fn reconnect_websocket(&mut self) -> anyhow::Result<()> {
let (websocket, _) = connect_websocket_when_ready(&self.websocket_url).await?;
let (websocket, _) =
connect_websocket_when_ready(&websocket_connect_url(&self.websocket_url)).await?;
self.websocket = websocket;
Ok(())
}
@@ -254,8 +274,14 @@ async fn read_listen_url_from_stdout(child: &mut Child) -> anyhow::Result<String
.map_err(|_| anyhow!("timed out waiting for exec-server stdout"))??
.ok_or_else(|| anyhow!("exec-server stdout closed before emitting listen URL"))?;
let listen_url = line.trim();
if listen_url.starts_with("ws://") {
if listen_url.starts_with("ws://") || listen_url.starts_with("ws+http://") {
return Ok(listen_url.to_string());
}
}
}
fn websocket_connect_url(websocket_url: &str) -> String {
websocket_url
.strip_prefix("ws+http://")
.map_or_else(|| websocket_url.to_string(), |url| format!("ws://{url}"))
}

View File

@@ -0,0 +1,30 @@
#![cfg(unix)]
mod common;
use codex_exec_server::ExecServerClient;
use codex_exec_server::RemoteExecServerConnectArgs;
use common::exec_server::http_exec_server;
use pretty_assertions::assert_eq;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn http_upgrade_exec_server_serves_readyz_and_accepts_clients() -> anyhow::Result<()> {
let mut server = http_exec_server().await?;
let http_base_url = server
.websocket_url()
.strip_prefix("ws+http://")
.expect("HTTP-upgrade websocket URL should use ws+http://");
let response = reqwest::get(format!("http://{http_base_url}/readyz")).await?;
assert_eq!(response.status(), reqwest::StatusCode::OK);
let client = ExecServerClient::connect_websocket(RemoteExecServerConnectArgs::new(
server.websocket_url().to_string(),
"exec-server-health-test".to_string(),
))
.await?;
drop(client);
server.shutdown().await?;
Ok(())
}