mirror of
https://github.com/openai/codex.git
synced 2026-05-04 19:36:45 +00:00
Stabilize realtime startup context tests (#13876)
## What changed

- The realtime startup-context tests no longer assume the interesting websocket payload is always `connection 1 / request 0`.
- Instead, they now wait for the first outbound websocket request that actually carries `session.instructions`, regardless of which websocket connection won the accept-order race on the runner.
- The env-key fallback test stays serialized because it mutates the process environment.

## Why this fixes the flake

- The old test synchronized on the mirrored `session.updated` client event and then inspected a fixed websocket slot.
- On CI, the response websocket and the realtime websocket can race each other during startup. When the response websocket wins that race, the fixed slot can contain `response.create` instead of the startup-context-bearing `session.update` request the test actually cares about.
- That made the test fail nondeterministically by inspecting the wrong request, or by timing out waiting on a secondary event even though the real outbound request path was correct.
- Waiting directly on the first request whose payload includes `session.instructions` removes both ordering assumptions and makes the assertion line up with the actual contract under test.
- Separately, serializing the environment-mutating fallback case prevents unrelated tests from seeing partially updated auth state.

## Scope

- Test-only change.
This commit is contained in:
@@ -29,6 +29,7 @@ use core_test_support::wait_for_event_match;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use serial_test::serial;
|
||||
use std::ffi::OsString;
|
||||
use std::fs;
|
||||
use std::time::Duration;
|
||||
@@ -37,7 +38,6 @@ use tokio::sync::oneshot;
|
||||
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
|
||||
const MEMORY_PROMPT_PHRASE: &str =
|
||||
"You have access to a memory folder with guidance from prior runs.";
|
||||
|
||||
fn websocket_request_text(
|
||||
request: &core_test_support::responses::WebSocketRequest,
|
||||
) -> Option<String> {
|
||||
@@ -54,6 +54,33 @@ fn websocket_request_instructions(
|
||||
.map(str::to_owned)
|
||||
}
|
||||
|
||||
async fn wait_for_matching_websocket_request<F>(
|
||||
server: &core_test_support::responses::WebSocketTestServer,
|
||||
description: &str,
|
||||
predicate: F,
|
||||
) -> core_test_support::responses::WebSocketRequest
|
||||
where
|
||||
F: Fn(&core_test_support::responses::WebSocketRequest) -> bool,
|
||||
{
|
||||
let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
|
||||
loop {
|
||||
if let Some(request) = server
|
||||
.connections()
|
||||
.iter()
|
||||
.flat_map(|connection| connection.iter())
|
||||
.find(|request| predicate(request))
|
||||
.cloned()
|
||||
{
|
||||
return request;
|
||||
}
|
||||
|
||||
assert!(
|
||||
tokio::time::Instant::now() < deadline,
|
||||
"timed out waiting for {description}"
|
||||
);
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
async fn seed_recent_thread(
|
||||
test: &TestCodex,
|
||||
title: &str,
|
||||
@@ -230,6 +257,7 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[serial(openai_api_key_env)]
|
||||
async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -632,21 +660,23 @@ async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() ->
|
||||
async fn conversation_uses_experimental_realtime_ws_startup_context_override() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![
|
||||
vec![],
|
||||
vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_custom_context", "instructions": "prompt from config" }
|
||||
})]],
|
||||
])
|
||||
let startup_server = start_websocket_server(vec![vec![]]).await;
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_custom_context", "instructions": "prompt from config" }
|
||||
})]]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
|
||||
config.experimental_realtime_ws_startup_context =
|
||||
Some("custom startup context".to_string());
|
||||
let mut builder = test_codex().with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
|
||||
config.experimental_realtime_ws_startup_context =
|
||||
Some("custom startup context".to_string());
|
||||
}
|
||||
});
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
let test = builder.build_with_websocket_server(&startup_server).await?;
|
||||
seed_recent_thread(
|
||||
&test,
|
||||
"Recent work: cleaned up startup flows and reviewed websocket routing.",
|
||||
@@ -656,7 +686,11 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
|
||||
.await?;
|
||||
fs::create_dir_all(test.workspace_path("docs"))?;
|
||||
fs::write(test.workspace_path("README.md"), "workspace marker")?;
|
||||
assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await);
|
||||
assert!(
|
||||
startup_server
|
||||
.wait_for_handshakes(1, Duration::from_secs(2))
|
||||
.await
|
||||
);
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
@@ -665,17 +699,12 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_custom_context" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context_request = wait_for_matching_websocket_request(
|
||||
&realtime_server,
|
||||
"startup context request with instructions",
|
||||
|request| websocket_request_instructions(request).is_some(),
|
||||
)
|
||||
.await;
|
||||
let instructions = websocket_request_instructions(&startup_context_request)
|
||||
.expect("custom startup context request should contain instructions");
|
||||
|
||||
@@ -683,7 +712,8 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
|
||||
assert!(!instructions.contains(STARTUP_CONTEXT_HEADER));
|
||||
assert!(!instructions.contains("## Machine / Workspace Map"));
|
||||
|
||||
server.shutdown().await;
|
||||
startup_server.shutdown().await;
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -691,20 +721,22 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
|
||||
async fn conversation_disables_realtime_startup_context_with_empty_override() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![
|
||||
vec![],
|
||||
vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_no_context", "instructions": "prompt from config" }
|
||||
})]],
|
||||
])
|
||||
let startup_server = start_websocket_server(vec![vec![]]).await;
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_no_context", "instructions": "prompt from config" }
|
||||
})]]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
|
||||
config.experimental_realtime_ws_startup_context = Some(String::new());
|
||||
let mut builder = test_codex().with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
|
||||
config.experimental_realtime_ws_startup_context = Some(String::new());
|
||||
}
|
||||
});
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
let test = builder.build_with_websocket_server(&startup_server).await?;
|
||||
seed_recent_thread(
|
||||
&test,
|
||||
"Recent work: cleaned up startup flows and reviewed websocket routing.",
|
||||
@@ -714,7 +746,11 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
|
||||
.await?;
|
||||
fs::create_dir_all(test.workspace_path("docs"))?;
|
||||
fs::write(test.workspace_path("README.md"), "workspace marker")?;
|
||||
assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await);
|
||||
assert!(
|
||||
startup_server
|
||||
.wait_for_handshakes(1, Duration::from_secs(2))
|
||||
.await
|
||||
);
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
@@ -723,17 +759,12 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_no_context" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context_request = wait_for_matching_websocket_request(
|
||||
&realtime_server,
|
||||
"startup context disable request with instructions",
|
||||
|request| websocket_request_instructions(request).is_some(),
|
||||
)
|
||||
.await;
|
||||
let instructions = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context disable request should contain instructions");
|
||||
|
||||
@@ -741,7 +772,8 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
|
||||
assert!(!instructions.contains(STARTUP_CONTEXT_HEADER));
|
||||
assert!(!instructions.contains("## Machine / Workspace Map"));
|
||||
|
||||
server.shutdown().await;
|
||||
startup_server.shutdown().await;
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -749,17 +781,20 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
|
||||
async fn conversation_start_injects_startup_context_from_thread_history() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![
|
||||
vec![],
|
||||
vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_context", "instructions": "backend prompt" }
|
||||
})]],
|
||||
])
|
||||
let startup_server = start_websocket_server(vec![vec![]]).await;
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_context", "instructions": "backend prompt" }
|
||||
})]]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex();
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
let mut builder = test_codex().with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
}
|
||||
});
|
||||
let test = builder.build_with_websocket_server(&startup_server).await?;
|
||||
seed_recent_thread(
|
||||
&test,
|
||||
"Recent work: cleaned up startup flows and reviewed websocket routing.",
|
||||
@@ -777,17 +812,12 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_context" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context_request = wait_for_matching_websocket_request(
|
||||
&realtime_server,
|
||||
"startup context request with instructions",
|
||||
|request| websocket_request_instructions(request).is_some(),
|
||||
)
|
||||
.await;
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
|
||||
@@ -802,7 +832,8 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
|
||||
assert!(startup_context.contains("README.md"));
|
||||
assert!(!startup_context.contains(MEMORY_PROMPT_PHRASE));
|
||||
|
||||
server.shutdown().await;
|
||||
startup_server.shutdown().await;
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -810,17 +841,20 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
|
||||
async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![
|
||||
vec![],
|
||||
vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_workspace", "instructions": "backend prompt" }
|
||||
})]],
|
||||
])
|
||||
let startup_server = start_websocket_server(vec![vec![]]).await;
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_workspace", "instructions": "backend prompt" }
|
||||
})]]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex();
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
let mut builder = test_codex().with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
}
|
||||
});
|
||||
let test = builder.build_with_websocket_server(&startup_server).await?;
|
||||
fs::create_dir_all(test.workspace_path("codex-rs/core"))?;
|
||||
fs::write(test.workspace_path("notes.txt"), "workspace marker")?;
|
||||
|
||||
@@ -831,17 +865,12 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_workspace" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context_request = wait_for_matching_websocket_request(
|
||||
&realtime_server,
|
||||
"workspace-map startup context request with instructions",
|
||||
|request| websocket_request_instructions(request).is_some(),
|
||||
)
|
||||
.await;
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
|
||||
@@ -850,7 +879,8 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()
|
||||
assert!(startup_context.contains("notes.txt"));
|
||||
assert!(startup_context.contains("codex-rs/"));
|
||||
|
||||
server.shutdown().await;
|
||||
startup_server.shutdown().await;
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -858,21 +888,24 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()
|
||||
async fn conversation_startup_context_is_truncated_and_sent_once_per_start() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![
|
||||
let startup_server = start_websocket_server(vec![vec![]]).await;
|
||||
let realtime_server = start_websocket_server(vec![vec![
|
||||
vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_truncated", "instructions": "backend prompt" }
|
||||
})],
|
||||
vec![],
|
||||
vec![
|
||||
vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_truncated", "instructions": "backend prompt" }
|
||||
})],
|
||||
vec![],
|
||||
],
|
||||
])
|
||||
]])
|
||||
.await;
|
||||
|
||||
let oversized_summary = "recent work ".repeat(3_500);
|
||||
let mut builder = test_codex();
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
let mut builder = test_codex().with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
}
|
||||
});
|
||||
let test = builder.build_with_websocket_server(&startup_server).await?;
|
||||
seed_recent_thread(&test, &oversized_summary, "summary", "oversized").await?;
|
||||
fs::write(test.workspace_path("marker.txt"), "marker")?;
|
||||
|
||||
@@ -883,17 +916,12 @@ async fn conversation_startup_context_is_truncated_and_sent_once_per_start() ->
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_truncated" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context_request = wait_for_matching_websocket_request(
|
||||
&realtime_server,
|
||||
"truncated startup context request with instructions",
|
||||
|request| websocket_request_instructions(request).is_some(),
|
||||
)
|
||||
.await;
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
|
||||
@@ -905,13 +933,19 @@ async fn conversation_startup_context_is_truncated_and_sent_once_per_start() ->
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let explicit_text_request = server.wait_for_request(1, 1).await;
|
||||
let explicit_text_request = wait_for_matching_websocket_request(
|
||||
&realtime_server,
|
||||
"explicit realtime text request",
|
||||
|request| websocket_request_text(request).as_deref() == Some("hello"),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(
|
||||
websocket_request_text(&explicit_text_request),
|
||||
Some("hello".to_string())
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
startup_server.shutdown().await;
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user