Mirror user text into realtime (#17520)

- Let typed user messages submit while realtime is active and mirror
accepted text into the realtime text stream.
- Add integration coverage and snapshot for outbound realtime text.
This commit is contained in:
Ahmed Ibrahim
2026-04-12 15:03:14 -07:00
committed by GitHub
parent cb870a169a
commit d840b247d7
7 changed files with 200 additions and 102 deletions

View File

@@ -1790,6 +1790,126 @@ async fn conversation_startup_context_is_truncated_and_sent_once_per_start() ->
Ok(())
}
/// Checks that text typed by the user while a realtime conversation is active
/// reaches the model request and is also mirrored to the realtime websocket,
/// and that no realtime `response.create` request shows up within 200ms of it.
///
/// The shape of the mirrored websocket request is pinned by an `insta`
/// snapshot named after this test.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_user_text_turn_is_sent_to_realtime_when_active() -> Result<()> {
    skip_if_no_network!(Ok(()));

    // Mock model backend: one SSE response that answers the user turn with "ack".
    let api_server = start_mock_server().await;
    let sse_events = vec![
        responses::ev_response_created("resp_user_text"),
        responses::ev_assistant_message("msg_user_text", "ack"),
        responses::ev_completed("resp_user_text"),
    ];
    let response_mock = responses::mount_sse_once(&api_server, responses::sse(sse_events)).await;

    // Mock realtime backend: a single connection that first emits
    // `session.updated`, followed by an empty batch.
    // NOTE(review): presumably each inner vec is the reply to one inbound
    // request — confirm against `start_websocket_server`.
    let session_updated_event = json!({
        "type": "session.updated",
        "session": { "id": "sess_user_text", "instructions": "backend prompt" }
    });
    let realtime_server =
        start_websocket_server(vec![vec![vec![session_updated_event], vec![]]]).await;

    // Point the realtime client at the mock server and use an empty startup context.
    let realtime_base_url = realtime_server.uri().to_string();
    let mut builder = test_codex().with_config(move |config| {
        config.experimental_realtime_ws_base_url = Some(realtime_base_url);
        config.experimental_realtime_ws_startup_context = Some(String::new());
    });
    let test = builder.build(&api_server).await?;

    // Start the realtime conversation and wait until the session is reported as updated.
    let start_params = ConversationStartParams {
        prompt: Some(Some("backend prompt".to_string())),
        session_id: None,
        transport: None,
        voice: None,
    };
    test.codex
        .submit(Op::RealtimeConversationStart(start_params))
        .await?;
    let session_updated = wait_for_event_match(&test.codex, |msg| {
        if let EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
            payload: RealtimeEvent::SessionUpdated { session_id, .. },
        }) = msg
        {
            Some(session_id.clone())
        } else {
            None
        }
    })
    .await;
    assert_eq!(session_updated, "sess_user_text");

    // Submit an ordinary typed user turn while realtime is active and let it finish.
    let user_text = "typed follow-up for realtime";
    let typed_turn = Op::UserInput {
        items: vec![UserInput::Text {
            text: user_text.to_string(),
            text_elements: Vec::new(),
        }],
        final_output_json_schema: None,
        responsesapi_client_metadata: None,
    };
    test.codex.submit(typed_turn).await?;
    wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;

    // The same text must appear in a websocket request sent to the realtime server...
    let realtime_text_request = wait_for_matching_websocket_request(
        &realtime_server,
        "normal user turn text mirrored to realtime",
        |req| websocket_request_text(req).as_deref() == Some(user_text),
    )
    .await;

    // ...and among the user-role inputs of the single model request.
    let model_user_texts = response_mock.single_request().message_input_texts("user");
    let model_received_text = model_user_texts.iter().any(|text| text == user_text);
    let mirrored_text = websocket_request_text(&realtime_text_request);
    assert_eq!(
        (model_received_text, mirrored_text),
        (true, Some(user_text.to_string())),
    );

    // Negative check: waiting for a `response.create` request is expected to
    // time out, so an `Err` from `timeout` is the passing outcome here.
    let response_create_wait = wait_for_matching_websocket_request(
        &realtime_server,
        "unexpected realtime response request for mirrored user text",
        |req| req.body_json()["type"].as_str() == Some("response.create"),
    );
    let realtime_response_create =
        timeout(Duration::from_millis(200), response_create_wait).await;
    assert!(
        realtime_response_create.is_err(),
        "mirrored user text should not request a realtime response"
    );

    // Snapshot the fields of the mirrored request; absent fields render as "".
    let realtime_request_body = realtime_text_request.body_json();
    let item = &realtime_request_body["item"];
    let first_content = &item["content"][0];
    let snapshot = format!(
        "type: {}\nitem.type: {}\nitem.role: {}\ncontent[0].type: {}\ncontent[0].text: {}\nresponse.create: {}",
        realtime_request_body["type"].as_str().unwrap_or_default(),
        item["type"].as_str().unwrap_or_default(),
        item["role"].as_str().unwrap_or_default(),
        first_content["type"].as_str().unwrap_or_default(),
        first_content["text"].as_str().unwrap_or_default(),
        realtime_response_create.is_ok(),
    );
    insta::assert_snapshot!(
        "conversation_user_text_turn_is_sent_to_realtime_when_active",
        snapshot
    );

    realtime_server.shutdown().await;
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_mirrors_assistant_message_text_to_realtime_handoff() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -2767,6 +2887,7 @@ async fn inbound_handoff_request_steers_active_turn() -> Result<()> {
"type": "session.updated",
"session": { "id": "sess_steer", "instructions": "backend prompt" }
})],
vec![],
vec![
json!({
"type": "conversation.input_transcript.delta",
@@ -2822,6 +2943,12 @@ async fn inbound_handoff_request_steers_active_turn() -> Result<()> {
matches!(event, EventMsg::AgentMessageContentDelta(_))
})
.await;
let _ = wait_for_matching_websocket_request(
&realtime_server,
"first prompt mirrored to realtime",
|request| websocket_request_text(request).as_deref() == Some("first prompt"),
)
.await;
test.codex
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {