mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
Stabilize websocket test server binding (#14002)
## Summary - stop reserving a localhost port in the websocket tests before spawning the server - let the app-server bind `127.0.0.1:0` itself and read back the actual bound websocket address from stderr - update the websocket test helpers and callers to use the discovered address ## Why this fixes the flake The previous harness reserved a port in the test process, dropped it, and then asked the server process to bind that same address. On busy runners there is a race between releasing the reservation and the child process rebinding it, which can produce sporadic startup failures. Binding to port `0` inside the server removes that race entirely, and waiting for the server to report the real bound address makes the tests connect only after the listener is actually ready.
This commit is contained in:
@@ -29,7 +29,6 @@ use super::connection_handling_websocket::assert_no_message;
|
||||
use super::connection_handling_websocket::connect_websocket;
|
||||
use super::connection_handling_websocket::create_config_toml;
|
||||
use super::connection_handling_websocket::read_jsonrpc_message;
|
||||
use super::connection_handling_websocket::reserve_local_addr;
|
||||
use super::connection_handling_websocket::send_initialize_request;
|
||||
use super::connection_handling_websocket::send_request;
|
||||
use super::connection_handling_websocket::spawn_websocket_server;
|
||||
@@ -712,8 +711,7 @@ async fn command_exec_process_ids_are_connection_scoped_and_disconnect_terminate
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
|
||||
let mut ws1 = connect_websocket(bind_addr).await?;
|
||||
let mut ws2 = connect_websocket(bind_addr).await?;
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::path::Path;
|
||||
use std::process::Stdio;
|
||||
use tempfile::TempDir;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::process::Child;
|
||||
use tokio::process::Command;
|
||||
use tokio::time::Duration;
|
||||
@@ -40,8 +41,7 @@ async fn websocket_transport_routes_per_connection_handshake_and_responses() ->
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
|
||||
let mut ws1 = connect_websocket(bind_addr).await?;
|
||||
let mut ws2 = connect_websocket(bind_addr).await?;
|
||||
@@ -86,8 +86,7 @@ async fn websocket_transport_serves_health_endpoints_on_same_listener() -> Resul
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
let readyz = http_get(&client, bind_addr, "/readyz").await?;
|
||||
@@ -108,15 +107,12 @@ async fn websocket_transport_serves_health_endpoints_on_same_listener() -> Resul
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) async fn spawn_websocket_server(
|
||||
codex_home: &Path,
|
||||
bind_addr: SocketAddr,
|
||||
) -> Result<Child> {
|
||||
pub(super) async fn spawn_websocket_server(codex_home: &Path) -> Result<(Child, SocketAddr)> {
|
||||
let program = codex_utils_cargo_bin::cargo_bin("codex-app-server")
|
||||
.context("should find app-server binary")?;
|
||||
let mut cmd = Command::new(program);
|
||||
cmd.arg("--listen")
|
||||
.arg(format!("ws://{bind_addr}"))
|
||||
.arg("ws://127.0.0.1:0")
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::null())
|
||||
.stderr(Stdio::piped())
|
||||
@@ -127,23 +123,57 @@ pub(super) async fn spawn_websocket_server(
|
||||
.spawn()
|
||||
.context("failed to spawn websocket app-server process")?;
|
||||
|
||||
if let Some(stderr) = process.stderr.take() {
|
||||
let mut stderr_reader = tokio::io::BufReader::new(stderr).lines();
|
||||
tokio::spawn(async move {
|
||||
while let Ok(Some(line)) = stderr_reader.next_line().await {
|
||||
eprintln!("[websocket app-server stderr] {line}");
|
||||
let stderr = process
|
||||
.stderr
|
||||
.take()
|
||||
.context("failed to capture websocket app-server stderr")?;
|
||||
let mut stderr_reader = BufReader::new(stderr).lines();
|
||||
let deadline = Instant::now() + Duration::from_secs(10);
|
||||
let bind_addr = loop {
|
||||
let line = timeout(
|
||||
deadline.saturating_duration_since(Instant::now()),
|
||||
stderr_reader.next_line(),
|
||||
)
|
||||
.await
|
||||
.context("timed out waiting for websocket app-server to report bound websocket address")?
|
||||
.context("failed to read websocket app-server stderr")?
|
||||
.context("websocket app-server exited before reporting bound websocket address")?;
|
||||
eprintln!("[websocket app-server stderr] {line}");
|
||||
|
||||
let stripped_line = {
|
||||
let mut stripped = String::with_capacity(line.len());
|
||||
let mut chars = line.chars().peekable();
|
||||
while let Some(ch) = chars.next() {
|
||||
if ch == '\u{1b}' && matches!(chars.peek(), Some(&'[')) {
|
||||
chars.next();
|
||||
for next in chars.by_ref() {
|
||||
if ('@'..='~').contains(&next) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
stripped.push(ch);
|
||||
}
|
||||
});
|
||||
}
|
||||
stripped
|
||||
};
|
||||
|
||||
Ok(process)
|
||||
}
|
||||
if let Some(bind_addr) = stripped_line
|
||||
.split_whitespace()
|
||||
.find_map(|token| token.strip_prefix("ws://"))
|
||||
.and_then(|addr| addr.parse::<SocketAddr>().ok())
|
||||
{
|
||||
break bind_addr;
|
||||
}
|
||||
};
|
||||
|
||||
pub(super) fn reserve_local_addr() -> Result<SocketAddr> {
|
||||
let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
|
||||
let addr = listener.local_addr()?;
|
||||
drop(listener);
|
||||
Ok(addr)
|
||||
tokio::spawn(async move {
|
||||
while let Ok(Some(line)) = stderr_reader.next_line().await {
|
||||
eprintln!("[websocket app-server stderr] {line}");
|
||||
}
|
||||
});
|
||||
|
||||
Ok((process, bind_addr))
|
||||
}
|
||||
|
||||
pub(super) async fn connect_websocket(bind_addr: SocketAddr) -> Result<WsClient> {
|
||||
|
||||
@@ -3,7 +3,6 @@ use super::connection_handling_websocket::WsClient;
|
||||
use super::connection_handling_websocket::connect_websocket;
|
||||
use super::connection_handling_websocket::create_config_toml;
|
||||
use super::connection_handling_websocket::read_response_for_id;
|
||||
use super::connection_handling_websocket::reserve_local_addr;
|
||||
use super::connection_handling_websocket::send_initialize_request;
|
||||
use super::connection_handling_websocket::send_request;
|
||||
use super::connection_handling_websocket::spawn_websocket_server;
|
||||
@@ -154,8 +153,7 @@ async fn start_ctrl_c_restart_fixture(turn_delay: Duration) -> Result<GracefulCt
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
let (process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
let mut ws = connect_websocket(bind_addr).await?;
|
||||
|
||||
send_initialize_request(&mut ws, 1, "ws_graceful_shutdown").await?;
|
||||
|
||||
@@ -6,7 +6,6 @@ use super::connection_handling_websocket::create_config_toml;
|
||||
use super::connection_handling_websocket::read_notification_for_method;
|
||||
use super::connection_handling_websocket::read_response_and_notification_for_method;
|
||||
use super::connection_handling_websocket::read_response_for_id;
|
||||
use super::connection_handling_websocket::reserve_local_addr;
|
||||
use super::connection_handling_websocket::send_initialize_request;
|
||||
use super::connection_handling_websocket::send_request;
|
||||
use super::connection_handling_websocket::spawn_websocket_server;
|
||||
@@ -34,8 +33,7 @@ async fn thread_name_updated_broadcasts_for_loaded_threads() -> Result<()> {
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
let conversation_id = create_rollout(codex_home.path(), "2025-01-05T12-00-00")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
|
||||
let result = async {
|
||||
let mut ws1 = connect_websocket(bind_addr).await?;
|
||||
@@ -96,8 +94,7 @@ async fn thread_name_updated_broadcasts_for_not_loaded_threads() -> Result<()> {
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
let conversation_id = create_rollout(codex_home.path(), "2025-01-05T12-05-00")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
|
||||
let result = async {
|
||||
let mut ws1 = connect_websocket(bind_addr).await?;
|
||||
|
||||
Reference in New Issue
Block a user