Files
codex/codex-rs/exec-server/src/client_transport.rs
Ruslan Nigmatullin 95d8669ab2 [exec-server] serve websocket listener via HTTP upgrade (#21963)
## Why

`codex exec-server` should keep the existing public `ws://IP:PORT` URL
shape while serving that websocket connection through an HTTP upgrade
path internally. That keeps the client-facing configuration simple and
allows the listener to work through intermediate HTTP-aware
infrastructure.

## What changed

- keep the emitted and configured exec-server URL as `ws://IP:PORT`
- serve that websocket endpoint through Axum HTTP upgrade handling on
`/`
- expose `GET /readyz` from the same listener for readiness checks
- route upgraded Axum websocket streams through the shared JSON-RPC
connection machinery
- initialize the rustls crypto provider before websocket client
connections
- preserve inbound binary websocket JSON-RPC parsing for compatibility
with the prior transport behavior

## Verification

- `cargo test -p codex-exec-server --test health --test process --test
websocket --test initialize --test exec_process`
2026-05-11 17:04:21 -07:00

134 lines
4.5 KiB
Rust

use std::process::Stdio;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::process::Command;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use tracing::debug;
use tracing::warn;
use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
use crate::ExecServerClient;
use crate::ExecServerError;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::client_api::StdioExecServerCommand;
use crate::client_api::StdioExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
/// Client name that `ExecServerClient::connect_for_transport` places in the
/// connect args for both the websocket and the stdio transport.
const ENVIRONMENT_CLIENT_NAME: &str = "codex-environment";
impl ExecServerClient {
    /// Connects to an exec-server using whichever transport `transport_params`
    /// selects.
    ///
    /// Both variants identify the client as [`ENVIRONMENT_CLIENT_NAME`] and never
    /// request a session resume (`resume_session_id` is always `None`).
    ///
    /// # Errors
    ///
    /// Returns whatever the selected transport-specific connect function returns.
    pub(crate) async fn connect_for_transport(
        transport_params: crate::client_api::ExecServerTransportParams,
    ) -> Result<Self, ExecServerError> {
        let client_name = ENVIRONMENT_CLIENT_NAME.to_string();
        match transport_params {
            crate::client_api::ExecServerTransportParams::WebSocketUrl {
                websocket_url,
                connect_timeout,
                initialize_timeout,
            } => {
                let remote_args = RemoteExecServerConnectArgs {
                    websocket_url,
                    client_name,
                    connect_timeout,
                    initialize_timeout,
                    resume_session_id: None,
                };
                Self::connect_websocket(remote_args).await
            }
            crate::client_api::ExecServerTransportParams::StdioCommand {
                command,
                initialize_timeout,
            } => {
                let stdio_args = StdioExecServerConnectArgs {
                    command,
                    client_name,
                    initialize_timeout,
                    resume_session_id: None,
                };
                Self::connect_stdio_command(stdio_args).await
            }
        }
    }

    /// Opens a websocket to `args.websocket_url` and hands the resulting
    /// JSON-RPC connection to `Self::connect`.
    ///
    /// The rustls crypto provider is ensured before the websocket connection is
    /// attempted. Only the websocket connect itself is bounded by
    /// `args.connect_timeout`.
    ///
    /// # Errors
    ///
    /// * [`ExecServerError::WebSocketConnectTimeout`] if the connect does not
    ///   finish within `args.connect_timeout`.
    /// * [`ExecServerError::WebSocketConnect`] if the websocket connect fails.
    /// * Any error returned by `Self::connect`.
    pub async fn connect_websocket(
        args: RemoteExecServerConnectArgs,
    ) -> Result<Self, ExecServerError> {
        ensure_rustls_crypto_provider();

        // Keep our own copy of the URL: `args` is consumed by `into()` below, but the
        // URL is still needed for error values and the connection label.
        let url = args.websocket_url.clone();
        let limit = args.connect_timeout;

        let handshake = timeout(limit, connect_async(url.as_str())).await;
        let (stream, _response) = match handshake {
            // Outer `Err` means the timer fired before the connect completed.
            Err(_elapsed) => {
                return Err(ExecServerError::WebSocketConnectTimeout {
                    url,
                    timeout: limit,
                });
            }
            // Inner `Err` means the connect finished in time but failed.
            Ok(Err(source)) => {
                return Err(ExecServerError::WebSocketConnect { url, source });
            }
            Ok(Ok(connected)) => connected,
        };

        let label = format!("exec-server websocket {url}");
        let connection = JsonRpcConnection::from_websocket(stream, label);
        Self::connect(connection, args.into()).await
    }

    /// Spawns the configured command with piped stdio and hands a JSON-RPC
    /// connection over the child's stdout/stdin to `Self::connect`.
    ///
    /// The child's stderr is drained on a background task and logged line by
    /// line at `debug` level. The child handle is attached to the connection via
    /// `with_child_process`.
    ///
    /// # Errors
    ///
    /// * [`ExecServerError::Spawn`] if the command cannot be spawned.
    /// * [`ExecServerError::Protocol`] if the spawned child exposes no stdin or
    ///   no stdout handle.
    /// * Any error returned by `Self::connect`.
    pub(crate) async fn connect_stdio_command(
        args: StdioExecServerConnectArgs,
    ) -> Result<Self, ExecServerError> {
        let mut process = stdio_command_process(&args.command);
        process
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped());
        let mut child = process.spawn().map_err(ExecServerError::Spawn)?;

        let Some(stdin) = child.stdin.take() else {
            return Err(ExecServerError::Protocol(
                "spawned exec-server command has no stdin".to_string(),
            ));
        };
        let Some(stdout) = child.stdout.take() else {
            return Err(ExecServerError::Protocol(
                "spawned exec-server command has no stdout".to_string(),
            ));
        };

        if let Some(stderr) = child.stderr.take() {
            // Forward the child's stderr into our logs until EOF or a read error.
            tokio::spawn(async move {
                let mut stderr_lines = BufReader::new(stderr).lines();
                loop {
                    let line = match stderr_lines.next_line().await {
                        Ok(Some(line)) => line,
                        Ok(None) => break,
                        Err(err) => {
                            warn!("failed to read exec-server stdio stderr: {err}");
                            break;
                        }
                    };
                    debug!("exec-server stdio stderr: {line}");
                }
            });
        }

        let connection =
            JsonRpcConnection::from_stdio(stdout, stdin, "exec-server stdio command".to_string())
                .with_child_process(child);
        Self::connect(connection, args.into()).await
    }
}
/// Builds the (not yet spawned) [`Command`] described by `stdio_command`.
///
/// The program, arguments and environment entries are copied from
/// `stdio_command`; the working directory is only set when `cwd` is present.
/// On Unix the command is additionally configured with `process_group(0)`.
/// Stdio wiring is left to the caller.
fn stdio_command_process(stdio_command: &StdioExecServerCommand) -> Command {
    let mut process = Command::new(&stdio_command.program);
    process
        .args(&stdio_command.args)
        .envs(&stdio_command.env);

    if let Some(working_dir) = stdio_command.cwd.as_ref() {
        process.current_dir(working_dir);
    }

    #[cfg(unix)]
    process.process_group(0);

    process
}