Send events to realtime api (#12423)

- Send assistant messages, ExecCommandBegin, and
PatchApplyBegin/PatchApplyEnd
This commit is contained in:
Ahmed Ibrahim
2026-02-21 23:24:51 -08:00
committed by GitHub
parent 85b00ae8de
commit 55fc075723
2 changed files with 146 additions and 0 deletions

View File

@@ -9,6 +9,8 @@ use codex_protocol::protocol::Op;
use codex_protocol::protocol::RealtimeAudioFrame;
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
use codex_protocol::protocol::RealtimeEvent;
use core_test_support::responses;
use core_test_support::responses::start_mock_server;
use core_test_support::responses::start_websocket_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
@@ -459,3 +461,82 @@ async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() ->
server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_mirrors_assistant_message_text_to_realtime_websocket() -> Result<()> {
skip_if_no_network!(Ok(()));
let api_server = start_mock_server().await;
let _response_mock = responses::mount_sse_once(
&api_server,
responses::sse(vec![
responses::ev_response_created("resp_1"),
responses::ev_assistant_message("msg_1", "assistant says hi"),
responses::ev_completed("resp_1"),
]),
)
.await;
let realtime_server = start_websocket_server(vec![vec![
vec![json!({
"type": "session.created",
"session": { "id": "sess_1" }
})],
vec![],
]])
.await;
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build(&api_server).await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let session_created = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionCreated { session_id },
}) => Some(session_id.clone()),
_ => None,
})
.await;
assert_eq!(session_created, "sess_1");
test.submit_turn("hello").await?;
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
while tokio::time::Instant::now() < deadline {
let connections = realtime_server.connections();
if connections.len() == 1 && connections[0].len() >= 2 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
let realtime_connections = realtime_server.connections();
assert_eq!(realtime_connections.len(), 1);
assert_eq!(realtime_connections[0].len(), 2);
assert_eq!(
realtime_connections[0][0].body_json()["type"].as_str(),
Some("session.create")
);
assert_eq!(
realtime_connections[0][1].body_json()["type"].as_str(),
Some("conversation.item.create")
);
assert_eq!(
realtime_connections[0][1].body_json()["item"]["content"][0]["text"].as_str(),
Some("assistant says hi")
);
realtime_server.shutdown().await;
Ok(())
}