[codex] Wait for MCP readiness in core integration tests (#24964)

Ensures MCP-backed `codex-core` integration tests exercise initialized
servers instead of racing server startup.

I've been idly investigating a few flakes and the failure modes are much
more confusing when a tool call fails because of a failed server start
than when the failed server start causes the test to fail directly.
This commit is contained in:
Adam Perry @ OpenAI
2026-05-29 10:22:27 -07:00
committed by GitHub
parent e29bbb5368
commit 3e666dd32a
8 changed files with 73 additions and 85 deletions

View File

@@ -248,6 +248,39 @@ where
wait_for_event_with_timeout(codex, predicate, Duration::from_secs(1)).await
}
/// Waits for a configured MCP server to finish startup and requires it to be ready.
pub async fn wait_for_mcp_server(codex: &CodexThread, server_name: &str) -> anyhow::Result<()> {
use codex_protocol::protocol::EventMsg;
// Wait for the startup summary regardless of outcome, then interpret the
// requested server's ready, failed, or cancelled entry below.
let summary = loop {
let event = codex
.next_event()
.await
.expect("stream ended unexpectedly while waiting for MCP startup");
if let EventMsg::McpStartupComplete(summary) = event.msg {
break summary;
}
};
if let Some(failure) = summary
.failed
.iter()
.find(|failure| failure.server == server_name)
{
let error = &failure.error;
anyhow::bail!("MCP server {server_name} failed to start: {error}");
}
if summary.cancelled.iter().any(|server| server == server_name) {
anyhow::bail!("MCP server {server_name} startup was cancelled");
}
assert!(
summary.ready.iter().any(|server| server == server_name),
"expected MCP server {server_name} to be ready; startup summary: {summary:?}"
);
Ok(())
}
pub async fn submit_thread_settings(
codex: &CodexThread,
thread_settings: codex_protocol::protocol::ThreadSettingsOverrides,

View File

@@ -38,6 +38,7 @@ use core_test_support::test_codex::test_codex;
use core_test_support::test_codex::turn_permission_fields;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use core_test_support::wait_for_mcp_server;
use pretty_assertions::assert_eq;
use serde_json::Value;
use std::collections::HashMap;
@@ -283,6 +284,7 @@ async fn run_code_mode_turn_with_rmcp_config(
.expect("test mcp servers should accept any configuration");
});
let test = builder.build(server).await?;
wait_for_mcp_server(&test.codex, "rmcp").await?;
responses::mount_sse_once(
server,

View File

@@ -22,6 +22,7 @@ use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::stdio_server_bin;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_mcp_server;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
@@ -284,6 +285,7 @@ async fn pre_tool_use_blocks_mcp_tool_before_execution(
})
.build(&server)
.await?;
wait_for_mcp_server(&test.codex, RMCP_SERVER).await?;
test.submit_turn("call the rmcp echo tool with the MCP pre hook")
.await?;
@@ -375,6 +377,7 @@ async fn pre_tool_use_rewrites_mcp_tool_before_execution() -> Result<()> {
})
.build(&server)
.await?;
wait_for_mcp_server(&test.codex, RMCP_SERVER).await?;
test.submit_turn("call the rmcp echo tool with the MCP pre hook rewrite")
.await?;
@@ -471,6 +474,7 @@ async fn post_tool_use_records_mcp_tool_payload_and_context(
})
.build(&server)
.await?;
wait_for_mcp_server(&test.codex, RMCP_SERVER).await?;
test.submit_turn("call the rmcp echo tool with the MCP post hook")
.await?;

View File

@@ -6,7 +6,6 @@ use std::time::Duration;
use std::time::Instant;
use anyhow::Result;
use anyhow::bail;
use codex_features::Feature;
use codex_login::CodexAuth;
use codex_protocol::protocol::EventMsg;
@@ -21,7 +20,7 @@ use core_test_support::skip_if_no_network;
use core_test_support::stdio_server_bin;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_with_timeout;
use core_test_support::wait_for_mcp_server;
use tempfile::TempDir;
use wiremock::MockServer;
@@ -140,45 +139,6 @@ async fn build_apps_enabled_plugin_test_codex(
.codex)
}
async fn wait_for_sample_mcp_ready(codex: &codex_core::CodexThread) -> Result<()> {
let startup_event = wait_for_event_with_timeout(
codex,
|ev| match ev {
EventMsg::McpStartupComplete(summary) => {
summary.ready.iter().any(|server| server == "sample")
|| summary
.failed
.iter()
.any(|failure| failure.server == "sample")
|| summary.cancelled.iter().any(|server| server == "sample")
}
_ => false,
},
Duration::from_secs(70),
)
.await;
let EventMsg::McpStartupComplete(startup) = startup_event else {
unreachable!("event guard guarantees McpStartupComplete");
};
if let Some(failure) = startup
.failed
.iter()
.find(|failure| failure.server == "sample")
{
let error = &failure.error;
bail!("plugin MCP server failed to start: {error}");
}
if startup.cancelled.iter().any(|server| server == "sample") {
bail!("plugin MCP server startup was cancelled");
}
assert!(
startup.ready.iter().any(|server| server == "sample"),
"expected plugin MCP server to be ready; startup summary: {startup:?}"
);
Ok(())
}
fn tool_names(body: &serde_json::Value) -> Vec<String> {
body.get("tools")
.and_then(serde_json::Value::as_array)
@@ -296,7 +256,7 @@ async fn explicit_plugin_mentions_inject_plugin_guidance() -> Result<()> {
let codex =
build_apps_enabled_plugin_test_codex(&server, codex_home, apps_server.chatgpt_base_url)
.await?;
wait_for_sample_mcp_ready(&codex).await?;
wait_for_mcp_server(&codex, "sample").await?;
codex
.submit(Op::UserInput {

View File

@@ -55,7 +55,7 @@ use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::test_codex::turn_permission_fields;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_with_timeout;
use core_test_support::wait_for_mcp_server;
use reqwest::Client;
use reqwest::StatusCode;
use serde_json::Value;
@@ -268,44 +268,6 @@ fn copy_binary_to_remote_env(
Ok(remote_path)
}
async fn wait_for_mcp_server(fixture: &TestCodex, server_name: &str) -> anyhow::Result<()> {
let startup_event = wait_for_event_with_timeout(
&fixture.codex,
|ev| match ev {
EventMsg::McpStartupComplete(summary) => {
summary.ready.iter().any(|server| server == server_name)
|| summary
.failed
.iter()
.any(|failure| failure.server == server_name)
|| summary.cancelled.iter().any(|server| server == server_name)
}
_ => false,
},
Duration::from_secs(70),
)
.await;
let EventMsg::McpStartupComplete(summary) = startup_event else {
unreachable!("event guard guarantees McpStartupComplete");
};
if let Some(failure) = summary
.failed
.iter()
.find(|failure| failure.server == server_name)
{
let error = &failure.error;
anyhow::bail!("MCP server {server_name} failed to start: {error}");
}
if summary.cancelled.iter().any(|server| server == server_name) {
anyhow::bail!("MCP server {server_name} startup was cancelled");
}
ensure!(
summary.ready.iter().any(|server| server == server_name),
"expected MCP server {server_name} to be ready; startup summary: {summary:?}"
);
Ok(())
}
struct TestMcpServerOptions {
environment_id: String,
supports_parallel_tool_calls: bool,
@@ -517,6 +479,8 @@ async fn stdio_server_round_trip() -> anyhow::Result<()> {
})
.build_with_remote_env(&server)
.await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
fixture
.codex
.submit(read_only_user_turn(&fixture, "call the rmcp echo tool"))
@@ -640,6 +604,7 @@ async fn stdio_server_uses_configured_cwd_before_runtime_fallback() -> anyhow::R
})
.build_with_remote_env(&server)
.await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
let expected_cwd = expected_cwd
.lock()
@@ -702,6 +667,7 @@ async fn local_stdio_server_uses_runtime_fallback_cwd_when_config_omits_cwd() ->
})
.build(&server)
.await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
let expected_cwd = expected_cwd
.lock()
@@ -760,7 +726,7 @@ async fn stdio_mcp_tool_call_includes_sandbox_state_meta() -> anyhow::Result<()>
.build_with_remote_env(&server)
.await?;
wait_for_mcp_server(&fixture, server_name).await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
fixture
.submit_turn_with_permission_profile(
@@ -857,6 +823,8 @@ async fn stdio_mcp_parallel_tool_calls_default_false_runs_serially() -> anyhow::
})
.build_with_remote_env(&server)
.await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
fixture
.codex
// Keep this baseline on the mutable sync tool so read-only hints do not
@@ -991,6 +959,8 @@ async fn stdio_mcp_read_only_tool_calls_run_concurrently_without_server_opt_in()
})
.build_with_remote_env(&server)
.await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
fixture
.codex
.submit(read_only_user_turn(
@@ -1073,6 +1043,8 @@ async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Res
})
.build_with_remote_env(&server)
.await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
fixture
.codex
// Exercise the server opt-in with the mutable sync tool rather than the
@@ -1157,7 +1129,7 @@ async fn stdio_image_responses_round_trip() -> anyhow::Result<()> {
})
.build_with_remote_env(&server)
.await?;
wait_for_mcp_server(&fixture, server_name).await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
fixture
.codex
@@ -1290,7 +1262,7 @@ async fn stdio_image_responses_preserve_original_detail_metadata() -> anyhow::Re
})
.build_with_remote_env(&server)
.await?;
wait_for_mcp_server(&fixture, server_name).await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
fixture
.codex
@@ -1427,6 +1399,7 @@ async fn stdio_image_responses_are_sanitized_for_text_only_model() -> anyhow::Re
})
.build_with_remote_env(&server)
.await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
fixture
.thread_manager
@@ -1529,6 +1502,8 @@ async fn stdio_server_propagates_whitelisted_env_vars() -> anyhow::Result<()> {
})
.build_with_remote_env(&server)
.await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
fixture
.codex
.submit(read_only_user_turn(&fixture, "call the rmcp echo tool"))
@@ -1647,6 +1622,7 @@ async fn stdio_server_propagates_explicit_local_env_var_source() -> anyhow::Resu
})
.build_with_remote_env(&server)
.await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
fixture
.codex
@@ -1739,6 +1715,7 @@ async fn remote_stdio_env_var_source_does_not_copy_local_env() -> anyhow::Result
})
.build_with_remote_env(&server)
.await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
fixture
.codex
@@ -1922,6 +1899,8 @@ async fn streamable_http_tool_call_round_trip() -> anyhow::Result<()> {
})
.build_with_remote_env(&server)
.await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
// Phase 4: submit the user turn that should trigger the MCP tool call.
fixture
.codex
@@ -2110,7 +2089,7 @@ async fn streamable_http_with_oauth_round_trip_impl() -> anyhow::Result<()> {
.await?;
// Phase 5: wait for MCP startup before the turn is submitted, which keeps
// failures tied to server startup/discovery.
wait_for_mcp_server(&fixture, server_name).await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
// Phase 6: submit the user turn that should invoke the OAuth-backed tool.
fixture

View File

@@ -47,6 +47,7 @@ use core_test_support::skip_if_no_network;
use core_test_support::stdio_server_bin;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_mcp_server;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
@@ -1114,6 +1115,7 @@ async fn tool_search_indexes_only_enabled_non_app_mcp_tools() -> Result<()> {
.expect("test mcp servers should accept any configuration");
});
let test = builder.build(&server).await?;
wait_for_mcp_server(&test.codex, "rmcp").await?;
test.submit_turn_with_approval_and_permission_profile(
"Find the rmcp echo and image tools.",
@@ -1243,6 +1245,7 @@ async fn tool_search_surfaced_mcp_tool_errors_are_returned_to_model() -> Result<
.expect("test mcp servers should accept any configuration");
});
let test = builder.build(&server).await?;
wait_for_mcp_server(&test.codex, "rmcp").await?;
test.codex
.submit(Op::UserInput {
@@ -1391,6 +1394,7 @@ async fn tool_search_uses_non_app_mcp_server_instructions_as_namespace_descripti
.expect("test mcp servers should accept any configuration");
});
let test = builder.build(&server).await?;
wait_for_mcp_server(&test.codex, "rmcp").await?;
test.submit_turn_with_approval_and_permission_profile(
"Find the rmcp echo tool.",

View File

@@ -29,6 +29,7 @@ use core_test_support::test_codex::test_codex;
use core_test_support::test_codex::turn_permission_fields;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use core_test_support::wait_for_mcp_server;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::collections::HashMap;
@@ -460,6 +461,7 @@ async fn mcp_call_marks_thread_memory_mode_polluted_when_configured() -> Result<
.expect("test mcp servers should accept any configuration");
});
let test = builder.build(&server).await?;
wait_for_mcp_server(&test.codex, server_name).await?;
let db = test.codex.state_db().expect("state db enabled");
let thread_id = test.session_configured.thread_id;
let cwd = test.cwd_path().to_path_buf();

View File

@@ -23,6 +23,7 @@ use core_test_support::skip_if_no_network;
use core_test_support::stdio_server_bin;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_mcp_server;
use serde_json::Value;
use serde_json::json;
use std::collections::HashMap;
@@ -410,6 +411,7 @@ async fn mcp_tool_call_output_exceeds_limit_truncated_for_model() -> Result<()>
config.tool_output_token_limit = Some(500);
});
let fixture = builder.build(&server).await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
fixture
.submit_turn_with_permission_profile(
@@ -509,6 +511,7 @@ async fn mcp_image_output_preserves_image_and_no_text_summary() -> Result<()> {
.expect("test mcp servers should accept any configuration");
});
let fixture = builder.build(&server).await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
let session_model = fixture.session_configured.model.clone();
let permission_profile = PermissionProfile::read_only();
let sandbox_policy = permission_profile.to_legacy_sandbox_policy(fixture.cwd.path())?;
@@ -798,6 +801,7 @@ async fn mcp_tool_call_output_not_truncated_with_custom_limit() -> Result<()> {
.expect("test mcp servers should accept any configuration");
});
let fixture = builder.build(&server).await?;
wait_for_mcp_server(&fixture.codex, server_name).await?;
fixture
.submit_turn_with_permission_profile(