diff --git a/codex-rs/core/tests/common/lib.rs b/codex-rs/core/tests/common/lib.rs
index 194fb2ae0a..3d7fd21228 100644
--- a/codex-rs/core/tests/common/lib.rs
+++ b/codex-rs/core/tests/common/lib.rs
@@ -248,6 +248,49 @@ where
     wait_for_event_with_timeout(codex, predicate, Duration::from_secs(1)).await
 }
 
+/// Waits for MCP startup to finish and requires `server_name` to be ready.
+///
+/// Events are read through `wait_for_event_with_timeout` with a 70-second
+/// timeout, so a startup summary that never arrives is subject to that
+/// helper's timeout rather than an unbounded wait.
+///
+/// # Errors
+///
+/// Returns an error when the startup summary lists `server_name` as failed or
+/// cancelled, or does not list it as ready.
+pub async fn wait_for_mcp_server(codex: &CodexThread, server_name: &str) -> anyhow::Result<()> {
+    use codex_protocol::protocol::EventMsg;
+
+    // Accept the first startup summary regardless of outcome, then interpret
+    // the requested server's ready, failed, or cancelled entry below.
+    let startup_event = wait_for_event_with_timeout(
+        codex,
+        |ev| matches!(ev, EventMsg::McpStartupComplete(_)),
+        std::time::Duration::from_secs(70),
+    )
+    .await;
+    let EventMsg::McpStartupComplete(summary) = startup_event else {
+        unreachable!("event guard guarantees McpStartupComplete");
+    };
+    if let Some(failure) = summary
+        .failed
+        .iter()
+        .find(|failure| failure.server == server_name)
+    {
+        let error = &failure.error;
+        anyhow::bail!("MCP server {server_name} failed to start: {error}");
+    }
+    if summary.cancelled.iter().any(|server| server == server_name) {
+        anyhow::bail!("MCP server {server_name} startup was cancelled");
+    }
+    // Report a missing ready entry as an error, like the outcomes above,
+    // instead of panicking inside a function that returns `Result`.
+    anyhow::ensure!(
+        summary.ready.iter().any(|server| server == server_name),
+        "expected MCP server {server_name} to be ready; startup summary: {summary:?}"
+    );
+    Ok(())
+}
+
 pub async fn submit_thread_settings(
     codex: &CodexThread,
     thread_settings: codex_protocol::protocol::ThreadSettingsOverrides,
diff --git a/codex-rs/core/tests/suite/code_mode.rs b/codex-rs/core/tests/suite/code_mode.rs
index cbf6993bc9..9881869313 100644
--- a/codex-rs/core/tests/suite/code_mode.rs
+++ b/codex-rs/core/tests/suite/code_mode.rs
@@ -38,6 +38,7 @@ use core_test_support::test_codex::test_codex;
 use core_test_support::test_codex::turn_permission_fields;
 use core_test_support::wait_for_event;
 use core_test_support::wait_for_event_match;
+use
core_test_support::wait_for_mcp_server; use pretty_assertions::assert_eq; use serde_json::Value; use std::collections::HashMap; @@ -283,6 +284,7 @@ async fn run_code_mode_turn_with_rmcp_config( .expect("test mcp servers should accept any configuration"); }); let test = builder.build(server).await?; + wait_for_mcp_server(&test.codex, "rmcp").await?; responses::mount_sse_once( server, diff --git a/codex-rs/core/tests/suite/hooks_mcp.rs b/codex-rs/core/tests/suite/hooks_mcp.rs index 940db7cf4c..00f1546a7c 100644 --- a/codex-rs/core/tests/suite/hooks_mcp.rs +++ b/codex-rs/core/tests/suite/hooks_mcp.rs @@ -22,6 +22,7 @@ use core_test_support::responses::start_mock_server; use core_test_support::skip_if_no_network; use core_test_support::stdio_server_bin; use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_mcp_server; use pretty_assertions::assert_eq; use serde_json::Value; use serde_json::json; @@ -284,6 +285,7 @@ async fn pre_tool_use_blocks_mcp_tool_before_execution( }) .build(&server) .await?; + wait_for_mcp_server(&test.codex, RMCP_SERVER).await?; test.submit_turn("call the rmcp echo tool with the MCP pre hook") .await?; @@ -375,6 +377,7 @@ async fn pre_tool_use_rewrites_mcp_tool_before_execution() -> Result<()> { }) .build(&server) .await?; + wait_for_mcp_server(&test.codex, RMCP_SERVER).await?; test.submit_turn("call the rmcp echo tool with the MCP pre hook rewrite") .await?; @@ -471,6 +474,7 @@ async fn post_tool_use_records_mcp_tool_payload_and_context( }) .build(&server) .await?; + wait_for_mcp_server(&test.codex, RMCP_SERVER).await?; test.submit_turn("call the rmcp echo tool with the MCP post hook") .await?; diff --git a/codex-rs/core/tests/suite/plugins.rs b/codex-rs/core/tests/suite/plugins.rs index b89b611a83..d7d7c7e8f5 100644 --- a/codex-rs/core/tests/suite/plugins.rs +++ b/codex-rs/core/tests/suite/plugins.rs @@ -6,7 +6,6 @@ use std::time::Duration; use std::time::Instant; use anyhow::Result; -use anyhow::bail; use 
codex_features::Feature; use codex_login::CodexAuth; use codex_protocol::protocol::EventMsg; @@ -21,7 +20,7 @@ use core_test_support::skip_if_no_network; use core_test_support::stdio_server_bin; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; -use core_test_support::wait_for_event_with_timeout; +use core_test_support::wait_for_mcp_server; use tempfile::TempDir; use wiremock::MockServer; @@ -140,45 +139,6 @@ async fn build_apps_enabled_plugin_test_codex( .codex) } -async fn wait_for_sample_mcp_ready(codex: &codex_core::CodexThread) -> Result<()> { - let startup_event = wait_for_event_with_timeout( - codex, - |ev| match ev { - EventMsg::McpStartupComplete(summary) => { - summary.ready.iter().any(|server| server == "sample") - || summary - .failed - .iter() - .any(|failure| failure.server == "sample") - || summary.cancelled.iter().any(|server| server == "sample") - } - _ => false, - }, - Duration::from_secs(70), - ) - .await; - let EventMsg::McpStartupComplete(startup) = startup_event else { - unreachable!("event guard guarantees McpStartupComplete"); - }; - if let Some(failure) = startup - .failed - .iter() - .find(|failure| failure.server == "sample") - { - let error = &failure.error; - bail!("plugin MCP server failed to start: {error}"); - } - if startup.cancelled.iter().any(|server| server == "sample") { - bail!("plugin MCP server startup was cancelled"); - } - assert!( - startup.ready.iter().any(|server| server == "sample"), - "expected plugin MCP server to be ready; startup summary: {startup:?}" - ); - - Ok(()) -} - fn tool_names(body: &serde_json::Value) -> Vec { body.get("tools") .and_then(serde_json::Value::as_array) @@ -296,7 +256,7 @@ async fn explicit_plugin_mentions_inject_plugin_guidance() -> Result<()> { let codex = build_apps_enabled_plugin_test_codex(&server, codex_home, apps_server.chatgpt_base_url) .await?; - wait_for_sample_mcp_ready(&codex).await?; + wait_for_mcp_server(&codex, "sample").await?; codex 
.submit(Op::UserInput { diff --git a/codex-rs/core/tests/suite/rmcp_client.rs b/codex-rs/core/tests/suite/rmcp_client.rs index 8aa01a4a11..0d669e5dd7 100644 --- a/codex-rs/core/tests/suite/rmcp_client.rs +++ b/codex-rs/core/tests/suite/rmcp_client.rs @@ -55,7 +55,7 @@ use core_test_support::test_codex::TestCodex; use core_test_support::test_codex::test_codex; use core_test_support::test_codex::turn_permission_fields; use core_test_support::wait_for_event; -use core_test_support::wait_for_event_with_timeout; +use core_test_support::wait_for_mcp_server; use reqwest::Client; use reqwest::StatusCode; use serde_json::Value; @@ -268,44 +268,6 @@ fn copy_binary_to_remote_env( Ok(remote_path) } -async fn wait_for_mcp_server(fixture: &TestCodex, server_name: &str) -> anyhow::Result<()> { - let startup_event = wait_for_event_with_timeout( - &fixture.codex, - |ev| match ev { - EventMsg::McpStartupComplete(summary) => { - summary.ready.iter().any(|server| server == server_name) - || summary - .failed - .iter() - .any(|failure| failure.server == server_name) - || summary.cancelled.iter().any(|server| server == server_name) - } - _ => false, - }, - Duration::from_secs(70), - ) - .await; - let EventMsg::McpStartupComplete(summary) = startup_event else { - unreachable!("event guard guarantees McpStartupComplete"); - }; - if let Some(failure) = summary - .failed - .iter() - .find(|failure| failure.server == server_name) - { - let error = &failure.error; - anyhow::bail!("MCP server {server_name} failed to start: {error}"); - } - if summary.cancelled.iter().any(|server| server == server_name) { - anyhow::bail!("MCP server {server_name} startup was cancelled"); - } - ensure!( - summary.ready.iter().any(|server| server == server_name), - "expected MCP server {server_name} to be ready; startup summary: {summary:?}" - ); - Ok(()) -} - struct TestMcpServerOptions { environment_id: String, supports_parallel_tool_calls: bool, @@ -517,6 +479,8 @@ async fn stdio_server_round_trip() -> 
anyhow::Result<()> { }) .build_with_remote_env(&server) .await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; + fixture .codex .submit(read_only_user_turn(&fixture, "call the rmcp echo tool")) @@ -640,6 +604,7 @@ async fn stdio_server_uses_configured_cwd_before_runtime_fallback() -> anyhow::R }) .build_with_remote_env(&server) .await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; let expected_cwd = expected_cwd .lock() @@ -702,6 +667,7 @@ async fn local_stdio_server_uses_runtime_fallback_cwd_when_config_omits_cwd() -> }) .build(&server) .await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; let expected_cwd = expected_cwd .lock() @@ -760,7 +726,7 @@ async fn stdio_mcp_tool_call_includes_sandbox_state_meta() -> anyhow::Result<()> .build_with_remote_env(&server) .await?; - wait_for_mcp_server(&fixture, server_name).await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; fixture .submit_turn_with_permission_profile( @@ -857,6 +823,8 @@ async fn stdio_mcp_parallel_tool_calls_default_false_runs_serially() -> anyhow:: }) .build_with_remote_env(&server) .await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; + fixture .codex // Keep this baseline on the mutable sync tool so read-only hints do not @@ -991,6 +959,8 @@ async fn stdio_mcp_read_only_tool_calls_run_concurrently_without_server_opt_in() }) .build_with_remote_env(&server) .await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; + fixture .codex .submit(read_only_user_turn( @@ -1073,6 +1043,8 @@ async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Res }) .build_with_remote_env(&server) .await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; + fixture .codex // Exercise the server opt-in with the mutable sync tool rather than the @@ -1157,7 +1129,7 @@ async fn stdio_image_responses_round_trip() -> anyhow::Result<()> { }) .build_with_remote_env(&server) .await?; - wait_for_mcp_server(&fixture, 
server_name).await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; fixture .codex @@ -1290,7 +1262,7 @@ async fn stdio_image_responses_preserve_original_detail_metadata() -> anyhow::Re }) .build_with_remote_env(&server) .await?; - wait_for_mcp_server(&fixture, server_name).await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; fixture .codex @@ -1427,6 +1399,7 @@ async fn stdio_image_responses_are_sanitized_for_text_only_model() -> anyhow::Re }) .build_with_remote_env(&server) .await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; fixture .thread_manager @@ -1529,6 +1502,8 @@ async fn stdio_server_propagates_whitelisted_env_vars() -> anyhow::Result<()> { }) .build_with_remote_env(&server) .await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; + fixture .codex .submit(read_only_user_turn(&fixture, "call the rmcp echo tool")) @@ -1647,6 +1622,7 @@ async fn stdio_server_propagates_explicit_local_env_var_source() -> anyhow::Resu }) .build_with_remote_env(&server) .await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; fixture .codex @@ -1739,6 +1715,7 @@ async fn remote_stdio_env_var_source_does_not_copy_local_env() -> anyhow::Result }) .build_with_remote_env(&server) .await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; fixture .codex @@ -1922,6 +1899,8 @@ async fn streamable_http_tool_call_round_trip() -> anyhow::Result<()> { }) .build_with_remote_env(&server) .await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; + // Phase 4: submit the user turn that should trigger the MCP tool call. fixture .codex @@ -2110,7 +2089,7 @@ async fn streamable_http_with_oauth_round_trip_impl() -> anyhow::Result<()> { .await?; // Phase 5: wait for MCP startup before the turn is submitted, which keeps // failures tied to server startup/discovery. 
- wait_for_mcp_server(&fixture, server_name).await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; // Phase 6: submit the user turn that should invoke the OAuth-backed tool. fixture diff --git a/codex-rs/core/tests/suite/search_tool.rs b/codex-rs/core/tests/suite/search_tool.rs index 0dc956d951..42b1b6bb3f 100644 --- a/codex-rs/core/tests/suite/search_tool.rs +++ b/codex-rs/core/tests/suite/search_tool.rs @@ -47,6 +47,7 @@ use core_test_support::skip_if_no_network; use core_test_support::stdio_server_bin; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; +use core_test_support::wait_for_mcp_server; use pretty_assertions::assert_eq; use serde_json::Value; use serde_json::json; @@ -1114,6 +1115,7 @@ async fn tool_search_indexes_only_enabled_non_app_mcp_tools() -> Result<()> { .expect("test mcp servers should accept any configuration"); }); let test = builder.build(&server).await?; + wait_for_mcp_server(&test.codex, "rmcp").await?; test.submit_turn_with_approval_and_permission_profile( "Find the rmcp echo and image tools.", @@ -1243,6 +1245,7 @@ async fn tool_search_surfaced_mcp_tool_errors_are_returned_to_model() -> Result< .expect("test mcp servers should accept any configuration"); }); let test = builder.build(&server).await?; + wait_for_mcp_server(&test.codex, "rmcp").await?; test.codex .submit(Op::UserInput { @@ -1391,6 +1394,7 @@ async fn tool_search_uses_non_app_mcp_server_instructions_as_namespace_descripti .expect("test mcp servers should accept any configuration"); }); let test = builder.build(&server).await?; + wait_for_mcp_server(&test.codex, "rmcp").await?; test.submit_turn_with_approval_and_permission_profile( "Find the rmcp echo tool.", diff --git a/codex-rs/core/tests/suite/sqlite_state.rs b/codex-rs/core/tests/suite/sqlite_state.rs index 85c1e08c84..4d9c89638e 100644 --- a/codex-rs/core/tests/suite/sqlite_state.rs +++ b/codex-rs/core/tests/suite/sqlite_state.rs @@ -29,6 +29,7 @@ use 
core_test_support::test_codex::test_codex; use core_test_support::test_codex::turn_permission_fields; use core_test_support::wait_for_event; use core_test_support::wait_for_event_match; +use core_test_support::wait_for_mcp_server; use pretty_assertions::assert_eq; use serde_json::json; use std::collections::HashMap; @@ -460,6 +461,7 @@ async fn mcp_call_marks_thread_memory_mode_polluted_when_configured() -> Result< .expect("test mcp servers should accept any configuration"); }); let test = builder.build(&server).await?; + wait_for_mcp_server(&test.codex, server_name).await?; let db = test.codex.state_db().expect("state db enabled"); let thread_id = test.session_configured.thread_id; let cwd = test.cwd_path().to_path_buf(); diff --git a/codex-rs/core/tests/suite/truncation.rs b/codex-rs/core/tests/suite/truncation.rs index fe078977ff..db90b663e0 100644 --- a/codex-rs/core/tests/suite/truncation.rs +++ b/codex-rs/core/tests/suite/truncation.rs @@ -23,6 +23,7 @@ use core_test_support::skip_if_no_network; use core_test_support::stdio_server_bin; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; +use core_test_support::wait_for_mcp_server; use serde_json::Value; use serde_json::json; use std::collections::HashMap; @@ -410,6 +411,7 @@ async fn mcp_tool_call_output_exceeds_limit_truncated_for_model() -> Result<()> config.tool_output_token_limit = Some(500); }); let fixture = builder.build(&server).await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; fixture .submit_turn_with_permission_profile( @@ -509,6 +511,7 @@ async fn mcp_image_output_preserves_image_and_no_text_summary() -> Result<()> { .expect("test mcp servers should accept any configuration"); }); let fixture = builder.build(&server).await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; let session_model = fixture.session_configured.model.clone(); let permission_profile = PermissionProfile::read_only(); let sandbox_policy = 
permission_profile.to_legacy_sandbox_policy(fixture.cwd.path())?; @@ -798,6 +801,7 @@ async fn mcp_tool_call_output_not_truncated_with_custom_limit() -> Result<()> { .expect("test mcp servers should accept any configuration"); }); let fixture = builder.build(&server).await?; + wait_for_mcp_server(&fixture.codex, server_name).await?; fixture .submit_turn_with_permission_profile(