diff --git a/codex-rs/app-server/tests/suite/v2/thread_start.rs b/codex-rs/app-server/tests/suite/v2/thread_start.rs index 7b9ca39422..4fb4b763b4 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_start.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_start.rs @@ -20,6 +20,7 @@ use codex_app_server_protocol::ThreadStatus; use codex_app_server_protocol::ThreadStatusChangedNotification; use codex_config::types::AuthCredentialsStoreMode; use codex_core::config::set_project_trust_level; +use codex_exec_server::ExecServerRuntimePaths; use codex_git_utils::resolve_root_git_project_for_trust; use codex_login::REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR; use codex_protocol::config_types::ServiceTier; @@ -28,11 +29,16 @@ use codex_protocol::openai_models::ReasoningEffort; use pretty_assertions::assert_eq; use serde_json::Value; use serde_json::json; +use std::net::TcpListener; use std::path::Path; use std::path::PathBuf; use std::time::Duration; use tempfile::TempDir; +use tokio::task::JoinHandle; +use tokio::time::Instant; +use tokio::time::sleep; use tokio::time::timeout; +use tokio_tungstenite::connect_async; use wiremock::Mock; use wiremock::MockServer; use wiremock::ResponseTemplate; @@ -47,6 +53,7 @@ use super::analytics::wait_for_analytics_payload; const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL"; const CODEX_EXEC_SERVER_CWD_ENV_VAR: &str = "CODEX_EXEC_SERVER_CWD"; +const EXEC_SERVER_CONNECT_RETRY_INTERVAL: Duration = Duration::from_millis(25); #[tokio::test] async fn thread_start_creates_thread_and_emits_started() -> Result<()> { @@ -203,6 +210,7 @@ async fn thread_start_uses_remote_default_cwd_when_request_omits_cwd() -> Result let codex_home = TempDir::new()?; create_config_toml_without_approval_policy(codex_home.path(), &server.uri())?; + let exec_server = TestExecServer::spawn().await?; let remote_default_cwd = TempDir::new()?; let remote_default_cwd = remote_default_cwd.path().to_string_lossy().into_owned(); @@ -210,7 +218,10 @@ async fn thread_start_uses_remote_default_cwd_when_request_omits_cwd() -> Result let mut mcp = McpProcess::new_with_env( codex_home.path(), &[ - (CODEX_EXEC_SERVER_URL_ENV_VAR, Some("ws://127.0.0.1:1")), + ( + CODEX_EXEC_SERVER_URL_ENV_VAR, + Some(exec_server.websocket_url.as_str()), + ), ( CODEX_EXEC_SERVER_CWD_ENV_VAR, Some(remote_default_cwd.as_str()), @@ -241,6 +252,7 @@ async fn thread_start_explicit_cwd_overrides_remote_default_cwd() -> Result<()> let codex_home = TempDir::new()?; create_config_toml_without_approval_policy(codex_home.path(), &server.uri())?; + let exec_server = TestExecServer::spawn().await?; let remote_default_cwd = TempDir::new()?; let remote_default_cwd = remote_default_cwd.path().to_string_lossy().into_owned(); @@ -250,7 +262,10 @@ async fn thread_start_explicit_cwd_overrides_remote_default_cwd() -> Result<()> let mut mcp = McpProcess::new_with_env( codex_home.path(), &[ - (CODEX_EXEC_SERVER_URL_ENV_VAR, Some("ws://127.0.0.1:1")), + ( + CODEX_EXEC_SERVER_URL_ENV_VAR, + Some(exec_server.websocket_url.as_str()), + ), ( CODEX_EXEC_SERVER_CWD_ENV_VAR, Some(remote_default_cwd.as_str()), @@ -407,6 +422,64 @@ fn normalize_path_for_comparison(path: impl AsRef) -> PathBuf { path.as_ref().to_path_buf() } +struct TestExecServer { + websocket_url: String, + task: JoinHandle<()>, +} + +impl TestExecServer { + async fn spawn() -> Result { + let bind_addr = TcpListener::bind("127.0.0.1:0")?.local_addr()?; + let websocket_url = format!("ws://{bind_addr}"); + let runtime_paths = ExecServerRuntimePaths::new( + std::env::current_exe()?, + /*codex_linux_sandbox_exe*/ None, + )?; + let listen_url = websocket_url.clone(); + let task = tokio::spawn(async move { + codex_exec_server::run_main(&listen_url, runtime_paths) + .await + .expect("test exec-server should run"); + }); + wait_for_exec_server(websocket_url.as_str()).await?; + Ok(Self { + websocket_url, + task, + }) + } +} + +impl Drop for TestExecServer { + fn drop(&mut self) { + self.task.abort(); + } +} + +async fn wait_for_exec_server(websocket_url: &str) -> Result<()> { + let deadline = Instant::now() + DEFAULT_READ_TIMEOUT; + loop { + match connect_async(websocket_url).await { + Ok((mut websocket, _)) => { + websocket.close(None).await?; + return Ok(()); + } + Err(err) if Instant::now() < deadline => { + let is_connection_refused = matches!( + err, + tokio_tungstenite::tungstenite::Error::Io(ref io_err) + if io_err.kind() == std::io::ErrorKind::ConnectionRefused + ); + if is_connection_refused { + sleep(EXEC_SERVER_CONNECT_RETRY_INTERVAL).await; + continue; + } + return Err(err.into()); + } + Err(err) => return Err(err.into()), + } + } +} + #[tokio::test] async fn thread_start_tracks_thread_initialized_analytics() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await;