mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
codex: fix realtime startup websocket test race
This commit is contained in:
@@ -2,7 +2,6 @@ use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use chrono::Utc;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::CodexThread;
|
||||
use codex_core::auth::OPENAI_API_KEY_ENV_VAR;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
@@ -39,9 +38,6 @@ use tokio::sync::oneshot;
|
||||
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
|
||||
const MEMORY_PROMPT_PHRASE: &str =
|
||||
"You have access to a memory folder with guidance from prior runs.";
|
||||
const REALTIME_NORMAL_CLOSE_MESSAGE: &str =
|
||||
"failed to send realtime request: Connection closed normally";
|
||||
|
||||
fn websocket_request_text(
|
||||
request: &core_test_support::responses::WebSocketRequest,
|
||||
) -> Option<String> {
|
||||
@@ -58,32 +54,33 @@ fn websocket_request_instructions(
|
||||
.map(str::to_owned)
|
||||
}
|
||||
|
||||
async fn wait_for_session_updated_allowing_clean_close(
|
||||
codex: &CodexThread,
|
||||
expected_session_id: &str,
|
||||
) {
|
||||
tokio::time::timeout(Duration::from_secs(3), async {
|
||||
loop {
|
||||
let event = match codex.next_event().await {
|
||||
Ok(event) => event,
|
||||
Err(err) => panic!("realtime conversation event should arrive: {err:?}"),
|
||||
};
|
||||
match event.msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == expected_session_id => return,
|
||||
EventMsg::Error(err) if err.message.contains(REALTIME_NORMAL_CLOSE_MESSAGE) => {}
|
||||
EventMsg::Error(err) => panic!("conversation start failed: {err:?}"),
|
||||
_ => {}
|
||||
}
|
||||
async fn wait_for_matching_websocket_request<F>(
|
||||
server: &core_test_support::responses::WebSocketTestServer,
|
||||
description: &str,
|
||||
predicate: F,
|
||||
) -> core_test_support::responses::WebSocketRequest
|
||||
where
|
||||
F: Fn(&core_test_support::responses::WebSocketRequest) -> bool,
|
||||
{
|
||||
let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
|
||||
loop {
|
||||
if let Some(request) = server
|
||||
.connections()
|
||||
.iter()
|
||||
.flat_map(|connection| connection.iter())
|
||||
.find(|request| predicate(request))
|
||||
.cloned()
|
||||
{
|
||||
return request;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|_| {
|
||||
panic!("timeout waiting for session.updated event for {expected_session_id}")
|
||||
});
|
||||
}
|
||||
|
||||
assert!(
|
||||
tokio::time::Instant::now() < deadline,
|
||||
"timed out waiting for {description}"
|
||||
);
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
async fn seed_recent_thread(
|
||||
test: &TestCodex,
|
||||
title: &str,
|
||||
@@ -696,9 +693,12 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_session_updated_allowing_clean_close(&test.codex, "sess_custom_context").await;
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context_request = wait_for_matching_websocket_request(
|
||||
&server,
|
||||
"startup context request with instructions",
|
||||
|request| websocket_request_instructions(request).is_some(),
|
||||
)
|
||||
.await;
|
||||
let instructions = websocket_request_instructions(&startup_context_request)
|
||||
.expect("custom startup context request should contain instructions");
|
||||
|
||||
@@ -746,9 +746,12 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_session_updated_allowing_clean_close(&test.codex, "sess_no_context").await;
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context_request = wait_for_matching_websocket_request(
|
||||
&server,
|
||||
"startup context disable request with instructions",
|
||||
|request| websocket_request_instructions(request).is_some(),
|
||||
)
|
||||
.await;
|
||||
let instructions = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context disable request should contain instructions");
|
||||
|
||||
@@ -792,9 +795,12 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_session_updated_allowing_clean_close(&test.codex, "sess_context").await;
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context_request = wait_for_matching_websocket_request(
|
||||
&server,
|
||||
"startup context request with instructions",
|
||||
|request| websocket_request_instructions(request).is_some(),
|
||||
)
|
||||
.await;
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
|
||||
@@ -838,9 +844,12 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_session_updated_allowing_clean_close(&test.codex, "sess_workspace").await;
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context_request = wait_for_matching_websocket_request(
|
||||
&server,
|
||||
"workspace-map startup context request with instructions",
|
||||
|request| websocket_request_instructions(request).is_some(),
|
||||
)
|
||||
.await;
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
|
||||
@@ -882,17 +891,12 @@ async fn conversation_startup_context_is_truncated_and_sent_once_per_start() ->
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_truncated" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context_request = wait_for_matching_websocket_request(
|
||||
&server,
|
||||
"truncated startup context request with instructions",
|
||||
|request| websocket_request_instructions(request).is_some(),
|
||||
)
|
||||
.await;
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
|
||||
@@ -904,7 +908,11 @@ async fn conversation_startup_context_is_truncated_and_sent_once_per_start() ->
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let explicit_text_request = server.wait_for_request(1, 1).await;
|
||||
let explicit_text_request =
|
||||
wait_for_matching_websocket_request(&server, "explicit realtime text request", |request| {
|
||||
websocket_request_text(request).as_deref() == Some("hello")
|
||||
})
|
||||
.await;
|
||||
assert_eq!(
|
||||
websocket_request_text(&explicit_text_request),
|
||||
Some("hello".to_string())
|
||||
|
||||
Reference in New Issue
Block a user