mirror of
https://github.com/openai/codex.git
synced 2026-04-29 17:06:51 +00:00
Update realtime websocket API (#13265)
- Migrate the realtime websocket transport to the new session and handoff flow.
- Make the realtime model configurable in config.toml and use API-key auth for the websocket.

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -1,4 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::auth::OPENAI_API_KEY_ENV_VAR;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
use codex_protocol::protocol::ConversationAudioParams;
|
||||
use codex_protocol::protocol::ConversationStartParams;
|
||||
@@ -22,6 +24,7 @@ use core_test_support::wait_for_event_match;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::ffi::OsString;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
@@ -33,16 +36,16 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
|
||||
vec![],
|
||||
vec![
|
||||
vec![json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_1" }
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_1", "instructions": "backend prompt" }
|
||||
})],
|
||||
vec![],
|
||||
vec![
|
||||
json!({
|
||||
"type": "response.output_audio.delta",
|
||||
"type": "conversation.output_audio.delta",
|
||||
"delta": "AQID",
|
||||
"sample_rate": 24000,
|
||||
"num_channels": 1
|
||||
"channels": 1
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.item.added",
|
||||
@@ -77,14 +80,14 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
assert!(started.session_id.is_some());
|
||||
|
||||
let session_created = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) => Some(session_id.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
assert_eq!(session_created, "sess_1");
|
||||
assert_eq!(session_updated, "sess_1");
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
|
||||
@@ -117,17 +120,29 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
|
||||
assert_eq!(connection.len(), 3);
|
||||
assert_eq!(
|
||||
connection[0].body_json()["type"].as_str(),
|
||||
Some("session.create")
|
||||
Some("session.update")
|
||||
);
|
||||
assert_eq!(
|
||||
connection[0].body_json()["session"]["conversation_id"]
|
||||
.as_str()
|
||||
.expect("session.create conversation_id"),
|
||||
connection[0].body_json()["session"]["instructions"].as_str(),
|
||||
Some("backend prompt")
|
||||
);
|
||||
assert_eq!(
|
||||
server.handshakes()[1]
|
||||
.header("x-session-id")
|
||||
.expect("session.update x-session-id header"),
|
||||
started
|
||||
.session_id
|
||||
.as_deref()
|
||||
.expect("started session id should be present")
|
||||
);
|
||||
assert_eq!(
|
||||
server.handshakes()[1].header("authorization").as_deref(),
|
||||
Some("Bearer dummy")
|
||||
);
|
||||
assert_eq!(
|
||||
server.handshakes()[1].uri(),
|
||||
"/v1/realtime?intent=quicksilver&model=realtime-test-model"
|
||||
);
|
||||
let mut request_types = [
|
||||
connection[1].body_json()["type"]
|
||||
.as_str()
|
||||
@@ -143,7 +158,7 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
|
||||
request_types,
|
||||
[
|
||||
"conversation.item.create".to_string(),
|
||||
"response.input_audio.delta".to_string(),
|
||||
"input_audio_buffer.append".to_string(),
|
||||
]
|
||||
);
|
||||
|
||||
@@ -162,15 +177,74 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let _env_guard = EnvGuard::set(OPENAI_API_KEY_ENV_VAR, "env-realtime-key");
|
||||
let server = start_websocket_server(vec![
|
||||
vec![],
|
||||
vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_env", "instructions": "backend prompt" }
|
||||
})]],
|
||||
])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await);
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let started = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
assert!(started.session_id.is_some());
|
||||
|
||||
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) => Some(session_id.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
assert_eq!(session_updated, "sess_env");
|
||||
|
||||
assert_eq!(
|
||||
server.handshakes()[1].header("authorization").as_deref(),
|
||||
Some("Bearer env-realtime-key")
|
||||
);
|
||||
|
||||
test.codex.submit(Op::RealtimeConversationClose).await?;
|
||||
let _closed = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_transport_close_emits_closed_event() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let session_created = vec![json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_1" }
|
||||
let session_updated = vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_1", "instructions": "backend prompt" }
|
||||
})];
|
||||
let server = start_websocket_server(vec![vec![], vec![session_created]]).await;
|
||||
let server = start_websocket_server(vec![vec![], vec![session_updated]]).await;
|
||||
|
||||
let mut builder = test_codex();
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
@@ -192,14 +266,14 @@ async fn conversation_transport_close_emits_closed_event() -> Result<()> {
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
assert!(started.session_id.is_some());
|
||||
|
||||
let session_created = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) => Some(session_id.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
assert_eq!(session_created, "sess_1");
|
||||
assert_eq!(session_updated, "sess_1");
|
||||
|
||||
let closed = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
|
||||
@@ -212,6 +286,34 @@ async fn conversation_transport_close_emits_closed_event() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test helper that overrides one process environment variable and puts the previous
/// state back (old value, or unset) when the guard is dropped.
#[must_use = "the environment override is reverted as soon as the guard is dropped"]
struct EnvGuard {
    /// Name of the overridden environment variable.
    key: &'static str,
    /// What the variable held before the override; `None` means it was not set.
    original: Option<OsString>,
}

impl EnvGuard {
    /// Sets `key` to `value` and returns a guard that remembers the prior state.
    ///
    /// Keep the returned guard bound to a named local for as long as the override
    /// is needed; dropping it restores the prior state.
    fn set(key: &'static str, value: &str) -> Self {
        // Capture the previous state first so `Drop` can restore it exactly.
        let original = std::env::var_os(key);
        // SAFETY: mutating the process environment is only sound while no other thread
        // reads or writes it concurrently.
        // NOTE(review): tests in this file run on a multi-threaded runtime and the test
        // harness may run tests in parallel; confirm nothing else touches the
        // environment while a guard is alive.
        unsafe {
            std::env::set_var(key, value);
        }
        Self { key, original }
    }
}

impl Drop for EnvGuard {
    /// Restores the variable to the state captured by [`EnvGuard::set`].
    fn drop(&mut self) {
        // SAFETY: same requirement as in `set`: no other thread may access the
        // environment concurrently. NOTE(review): not enforced here; see `set`.
        unsafe {
            match &self.original {
                // The variable existed before: write the old value back.
                Some(value) => std::env::set_var(self.key, value),
                // The variable did not exist before: remove the override entirely.
                None => std::env::remove_var(self.key),
            }
        }
    }
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_audio_before_start_emits_error() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
@@ -276,19 +378,19 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> {
|
||||
let server = start_websocket_server(vec![
|
||||
vec![],
|
||||
vec![vec![json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_old" }
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_old", "instructions": "old" }
|
||||
})]],
|
||||
vec![
|
||||
vec![json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_new" }
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_new", "instructions": "new" }
|
||||
})],
|
||||
vec![json!({
|
||||
"type": "response.output_audio.delta",
|
||||
"type": "conversation.output_audio.delta",
|
||||
"delta": "AQID",
|
||||
"sample_rate": 24000,
|
||||
"num_channels": 1
|
||||
"channels": 1
|
||||
})],
|
||||
],
|
||||
])
|
||||
@@ -305,7 +407,7 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> {
|
||||
.await?;
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_old" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
@@ -321,7 +423,7 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> {
|
||||
.await?;
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_new" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
@@ -351,17 +453,25 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> {
|
||||
assert_eq!(connections.len(), 3);
|
||||
assert_eq!(connections[1].len(), 1);
|
||||
assert_eq!(
|
||||
connections[1][0].body_json()["session"]["conversation_id"].as_str(),
|
||||
connections[1][0].body_json()["session"]["instructions"].as_str(),
|
||||
Some("old")
|
||||
);
|
||||
assert_eq!(
|
||||
server.handshakes()[1].header("x-session-id").as_deref(),
|
||||
Some("conv_old")
|
||||
);
|
||||
assert_eq!(connections[2].len(), 2);
|
||||
assert_eq!(
|
||||
connections[2][0].body_json()["session"]["conversation_id"].as_str(),
|
||||
connections[2][0].body_json()["session"]["instructions"].as_str(),
|
||||
Some("new")
|
||||
);
|
||||
assert_eq!(
|
||||
server.handshakes()[2].header("x-session-id").as_deref(),
|
||||
Some("conv_new")
|
||||
);
|
||||
assert_eq!(
|
||||
connections[2][1].body_json()["type"].as_str(),
|
||||
Some("response.input_audio.delta")
|
||||
Some("input_audio_buffer.append")
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
@@ -374,8 +484,8 @@ async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Resul
|
||||
|
||||
let startup_server = start_websocket_server(vec![vec![]]).await;
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_override" }
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_override", "instructions": "backend prompt" }
|
||||
})]]])
|
||||
.await;
|
||||
|
||||
@@ -399,14 +509,14 @@ async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Resul
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let session_created = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) => Some(session_id.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
assert_eq!(session_created, "sess_override");
|
||||
assert_eq!(session_updated, "sess_override");
|
||||
|
||||
let startup_connections = startup_server.connections();
|
||||
assert_eq!(startup_connections.len(), 1);
|
||||
@@ -415,7 +525,7 @@ async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Resul
|
||||
assert_eq!(realtime_connections.len(), 1);
|
||||
assert_eq!(
|
||||
realtime_connections[0][0].body_json()["type"].as_str(),
|
||||
Some("session.create")
|
||||
Some("session.update")
|
||||
);
|
||||
|
||||
startup_server.shutdown().await;
|
||||
@@ -430,8 +540,8 @@ async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() ->
|
||||
let server = start_websocket_server(vec![
|
||||
vec![],
|
||||
vec![vec![json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_override" }
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_override", "instructions": "prompt from config" }
|
||||
})]],
|
||||
])
|
||||
.await;
|
||||
@@ -449,19 +559,19 @@ async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() ->
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let session_created = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) => Some(session_id.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
assert_eq!(session_created, "sess_override");
|
||||
assert_eq!(session_updated, "sess_override");
|
||||
|
||||
let connections = server.connections();
|
||||
assert_eq!(connections.len(), 2);
|
||||
assert_eq!(
|
||||
connections[1][0].body_json()["session"]["backend_prompt"].as_str(),
|
||||
connections[1][0].body_json()["session"]["instructions"].as_str(),
|
||||
Some("prompt from config")
|
||||
);
|
||||
|
||||
@@ -470,7 +580,7 @@ async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() ->
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_mirrors_assistant_message_text_to_realtime_websocket() -> Result<()> {
|
||||
async fn conversation_mirrors_assistant_message_text_to_realtime_handoff() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let api_server = start_mock_server().await;
|
||||
@@ -485,10 +595,19 @@ async fn conversation_mirrors_assistant_message_text_to_realtime_websocket() ->
|
||||
.await;
|
||||
|
||||
let realtime_server = start_websocket_server(vec![vec![
|
||||
vec![json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_1" }
|
||||
})],
|
||||
vec![
|
||||
json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_1", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_1",
|
||||
"item_id": "item_1",
|
||||
"input_transcript": "delegate hello",
|
||||
"messages": [{ "role": "user", "text": "delegate hello" }]
|
||||
}),
|
||||
],
|
||||
vec![],
|
||||
]])
|
||||
.await;
|
||||
@@ -508,16 +627,27 @@ async fn conversation_mirrors_assistant_message_text_to_realtime_websocket() ->
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let session_created = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) => Some(session_id.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
assert_eq!(session_created, "sess_1");
|
||||
assert_eq!(session_updated, "sess_1");
|
||||
|
||||
test.submit_turn("hello").await?;
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::HandoffRequested(handoff),
|
||||
}) if handoff.handoff_id == "handoff_1" => Some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
|
||||
while tokio::time::Instant::now() < deadline {
|
||||
@@ -533,14 +663,18 @@ async fn conversation_mirrors_assistant_message_text_to_realtime_websocket() ->
|
||||
assert_eq!(realtime_connections[0].len(), 2);
|
||||
assert_eq!(
|
||||
realtime_connections[0][0].body_json()["type"].as_str(),
|
||||
Some("session.create")
|
||||
Some("session.update")
|
||||
);
|
||||
assert_eq!(
|
||||
realtime_connections[0][1].body_json()["type"].as_str(),
|
||||
Some("conversation.item.create")
|
||||
Some("conversation.handoff.append")
|
||||
);
|
||||
assert_eq!(
|
||||
realtime_connections[0][1].body_json()["item"]["content"][0]["text"].as_str(),
|
||||
realtime_connections[0][1].body_json()["handoff_id"].as_str(),
|
||||
Some("handoff_1")
|
||||
);
|
||||
assert_eq!(
|
||||
realtime_connections[0][1].body_json()["output_text"].as_str(),
|
||||
Some("assistant says hi")
|
||||
);
|
||||
|
||||
@@ -548,6 +682,145 @@ async fn conversation_mirrors_assistant_message_text_to_realtime_websocket() ->
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_handoff_persists_across_item_done_until_turn_complete() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let (gate_second_message_tx, gate_second_message_rx) = oneshot::channel();
|
||||
let first_chunks = vec![
|
||||
StreamingSseChunk {
|
||||
gate: None,
|
||||
body: sse_event(responses::ev_response_created("resp-1")),
|
||||
},
|
||||
StreamingSseChunk {
|
||||
gate: None,
|
||||
body: sse_event(responses::ev_assistant_message(
|
||||
"msg-1",
|
||||
"assistant message 1",
|
||||
)),
|
||||
},
|
||||
StreamingSseChunk {
|
||||
gate: Some(gate_second_message_rx),
|
||||
body: sse_event(responses::ev_assistant_message(
|
||||
"msg-2",
|
||||
"assistant message 2",
|
||||
)),
|
||||
},
|
||||
StreamingSseChunk {
|
||||
gate: None,
|
||||
body: sse_event(responses::ev_completed("resp-1")),
|
||||
},
|
||||
];
|
||||
let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await;
|
||||
|
||||
let realtime_server = start_websocket_server(vec![vec![
|
||||
vec![
|
||||
json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_item_done", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_item_done",
|
||||
"item_id": "item_item_done",
|
||||
"input_transcript": "delegate now",
|
||||
"messages": [{ "role": "user", "text": "delegate now" }]
|
||||
}),
|
||||
],
|
||||
vec![json!({
|
||||
"type": "conversation.item.done",
|
||||
"item": { "id": "item_item_done" }
|
||||
})],
|
||||
vec![],
|
||||
]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
}
|
||||
});
|
||||
let test = builder.build_with_streaming_server(&api_server).await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_item_done" => Some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::HandoffRequested(handoff),
|
||||
}) if handoff.handoff_id == "handoff_item_done" => Some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
let first_append = realtime_server.wait_for_request(0, 1).await;
|
||||
assert_eq!(
|
||||
first_append.body_json()["type"].as_str(),
|
||||
Some("conversation.handoff.append")
|
||||
);
|
||||
assert_eq!(
|
||||
first_append.body_json()["handoff_id"].as_str(),
|
||||
Some("handoff_item_done")
|
||||
);
|
||||
assert_eq!(
|
||||
first_append.body_json()["output_text"].as_str(),
|
||||
Some("assistant message 1")
|
||||
);
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::ConversationItemDone { item_id },
|
||||
}) if item_id == "item_item_done" => Some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
let _ = gate_second_message_tx.send(());
|
||||
|
||||
let second_append = realtime_server.wait_for_request(0, 2).await;
|
||||
assert_eq!(
|
||||
second_append.body_json()["type"].as_str(),
|
||||
Some("conversation.handoff.append")
|
||||
);
|
||||
assert_eq!(
|
||||
second_append.body_json()["handoff_id"].as_str(),
|
||||
Some("handoff_item_done")
|
||||
);
|
||||
assert_eq!(
|
||||
second_append.body_json()["output_text"].as_str(),
|
||||
Some("assistant message 2")
|
||||
);
|
||||
|
||||
let completion = completions
|
||||
.into_iter()
|
||||
.next()
|
||||
.expect("missing delegated turn completion");
|
||||
completion
|
||||
.await
|
||||
.expect("delegated turn request did not complete");
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
api_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sse_event(event: Value) -> String {
|
||||
responses::sse(vec![event])
|
||||
}
|
||||
@@ -567,7 +840,7 @@ fn message_input_texts(body: &Value, role: &str) -> Vec<String> {
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn inbound_realtime_text_starts_turn_for_assistant_role() -> Result<()> {
|
||||
async fn inbound_handoff_request_starts_turn() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let api_server = start_mock_server().await;
|
||||
@@ -583,16 +856,15 @@ async fn inbound_realtime_text_starts_turn_for_assistant_role() -> Result<()> {
|
||||
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![
|
||||
json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_inbound" }
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_inbound", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.item.added",
|
||||
"item": {
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "text", "text": "text from realtime"}]
|
||||
}
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_inbound",
|
||||
"item_id": "item_inbound",
|
||||
"input_transcript": "text from realtime",
|
||||
"messages": [{ "role": "user", "text": "text from realtime" }]
|
||||
}),
|
||||
]]])
|
||||
.await;
|
||||
@@ -612,14 +884,26 @@ async fn inbound_realtime_text_starts_turn_for_assistant_role() -> Result<()> {
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let session_created = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) => Some(session_id.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
assert_eq!(session_created, "sess_inbound");
|
||||
assert_eq!(session_updated, "sess_inbound");
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::HandoffRequested(handoff),
|
||||
}) if handoff.handoff_id == "handoff_inbound"
|
||||
&& handoff.input_transcript == "text from realtime" =>
|
||||
{
|
||||
Some(())
|
||||
}
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
@@ -635,15 +919,15 @@ async fn inbound_realtime_text_starts_turn_for_assistant_role() -> Result<()> {
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn inbound_realtime_text_ignores_user_role_and_still_forwards_audio() -> Result<()> {
|
||||
async fn inbound_conversation_item_does_not_start_turn_and_still_forwards_audio() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let api_server = start_mock_server().await;
|
||||
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![
|
||||
json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_ignore_user_role" }
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_ignore_item", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.item.added",
|
||||
@@ -654,10 +938,10 @@ async fn inbound_realtime_text_ignores_user_role_and_still_forwards_audio() -> R
|
||||
}
|
||||
}),
|
||||
json!({
|
||||
"type": "response.output_audio.delta",
|
||||
"type": "conversation.output_audio.delta",
|
||||
"delta": "AQID",
|
||||
"sample_rate": 24000,
|
||||
"num_channels": 1
|
||||
"channels": 1
|
||||
}),
|
||||
]]])
|
||||
.await;
|
||||
@@ -679,8 +963,8 @@ async fn inbound_realtime_text_ignores_user_role_and_still_forwards_audio() -> R
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
}) if session_id == "sess_ignore_user_role" => Some(()),
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_ignore_item" => Some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
@@ -695,7 +979,7 @@ async fn inbound_realtime_text_ignores_user_role_and_still_forwards_audio() -> R
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.expect("timed out waiting for realtime audio after user-role conversation item");
|
||||
.expect("timed out waiting for realtime audio after conversation item");
|
||||
assert_eq!(audio_out.data, "AQID");
|
||||
|
||||
let unexpected_turn_started = tokio::time::timeout(
|
||||
@@ -741,16 +1025,15 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
|
||||
let realtime_server = start_websocket_server(vec![vec![
|
||||
vec![
|
||||
json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_echo_guard" }
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_echo_guard", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.item.added",
|
||||
"item": {
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "text", "text": "delegate now"}]
|
||||
}
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_echo_guard",
|
||||
"item_id": "item_echo_guard",
|
||||
"input_transcript": "delegate now",
|
||||
"messages": [{"role": "user", "text": "delegate now"}]
|
||||
}),
|
||||
],
|
||||
vec![
|
||||
@@ -763,10 +1046,10 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
|
||||
}
|
||||
}),
|
||||
json!({
|
||||
"type": "response.output_audio.delta",
|
||||
"type": "conversation.output_audio.delta",
|
||||
"delta": "AQID",
|
||||
"sample_rate": 24000,
|
||||
"num_channels": 1
|
||||
"channels": 1
|
||||
}),
|
||||
],
|
||||
]])
|
||||
@@ -789,7 +1072,7 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_echo_guard" => Some(()),
|
||||
_ => None,
|
||||
})
|
||||
@@ -797,14 +1080,8 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::ConversationItemAdded(item),
|
||||
}) => item
|
||||
.get("content")
|
||||
.and_then(Value::as_array)
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.any(|content| content.get("text").and_then(Value::as_str) == Some("delegate now"))
|
||||
.then_some(()),
|
||||
payload: RealtimeEvent::HandoffRequested(handoff),
|
||||
}) if handoff.input_transcript == "delegate now" => Some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
@@ -817,19 +1094,22 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
|
||||
let mirrored_request = realtime_server.wait_for_request(0, 1).await;
|
||||
let mirrored_request_body = mirrored_request.body_json();
|
||||
eprintln!(
|
||||
"[realtime test +{}ms] saw mirrored request type={:?} role={:?} text={:?} data={:?}",
|
||||
"[realtime test +{}ms] saw mirrored request type={:?} handoff_id={:?} text={:?}",
|
||||
start.elapsed().as_millis(),
|
||||
mirrored_request_body["type"].as_str(),
|
||||
mirrored_request_body["item"]["role"].as_str(),
|
||||
mirrored_request_body["item"]["content"][0]["text"].as_str(),
|
||||
mirrored_request_body["item"]["content"][0]["data"].as_str(),
|
||||
mirrored_request_body["handoff_id"].as_str(),
|
||||
mirrored_request_body["output_text"].as_str(),
|
||||
);
|
||||
assert_eq!(
|
||||
mirrored_request_body["type"].as_str(),
|
||||
Some("conversation.item.create")
|
||||
Some("conversation.handoff.append")
|
||||
);
|
||||
assert_eq!(
|
||||
mirrored_request_body["item"]["content"][0]["text"].as_str(),
|
||||
mirrored_request_body["handoff_id"].as_str(),
|
||||
Some("handoff_echo_guard")
|
||||
);
|
||||
assert_eq!(
|
||||
mirrored_request_body["output_text"].as_str(),
|
||||
Some("assistant says hi")
|
||||
);
|
||||
|
||||
@@ -875,7 +1155,7 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn inbound_realtime_text_does_not_block_realtime_event_forwarding() -> Result<()> {
|
||||
async fn inbound_handoff_request_does_not_block_realtime_event_forwarding() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
|
||||
@@ -893,22 +1173,21 @@ async fn inbound_realtime_text_does_not_block_realtime_event_forwarding() -> Res
|
||||
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![
|
||||
json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_non_blocking" }
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_non_blocking", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.item.added",
|
||||
"item": {
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "text", "text": "delegate now"}]
|
||||
}
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_non_blocking",
|
||||
"item_id": "item_non_blocking",
|
||||
"input_transcript": "delegate now",
|
||||
"messages": [{"role": "user", "text": "delegate now"}]
|
||||
}),
|
||||
json!({
|
||||
"type": "response.output_audio.delta",
|
||||
"type": "conversation.output_audio.delta",
|
||||
"delta": "AQID",
|
||||
"sample_rate": 24000,
|
||||
"num_channels": 1
|
||||
"channels": 1
|
||||
}),
|
||||
]]])
|
||||
.await;
|
||||
@@ -930,7 +1209,7 @@ async fn inbound_realtime_text_does_not_block_realtime_event_forwarding() -> Res
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_non_blocking" => Some(()),
|
||||
_ => None,
|
||||
})
|
||||
@@ -938,14 +1217,8 @@ async fn inbound_realtime_text_does_not_block_realtime_event_forwarding() -> Res
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::ConversationItemAdded(item),
|
||||
}) => item
|
||||
.get("content")
|
||||
.and_then(Value::as_array)
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.any(|content| content.get("text").and_then(Value::as_str) == Some("delegate now"))
|
||||
.then_some(()),
|
||||
payload: RealtimeEvent::HandoffRequested(handoff),
|
||||
}) if handoff.input_transcript == "delegate now" => Some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
@@ -982,7 +1255,7 @@ async fn inbound_realtime_text_does_not_block_realtime_event_forwarding() -> Res
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn inbound_realtime_text_steers_active_turn() -> Result<()> {
|
||||
async fn inbound_handoff_request_steers_active_turn() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
|
||||
@@ -1027,17 +1300,15 @@ async fn inbound_realtime_text_steers_active_turn() -> Result<()> {
|
||||
|
||||
let realtime_server = start_websocket_server(vec![vec![
|
||||
vec![json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_steer" }
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_steer", "instructions": "backend prompt" }
|
||||
})],
|
||||
vec![],
|
||||
vec![json!({
|
||||
"type": "conversation.item.added",
|
||||
"item": {
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "text", "text": "steer via realtime"}]
|
||||
}
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_steer",
|
||||
"item_id": "item_steer",
|
||||
"input_transcript": "steer via realtime",
|
||||
"messages": [{ "role": "user", "text": "steer via realtime" }]
|
||||
})],
|
||||
]])
|
||||
.await;
|
||||
@@ -1058,7 +1329,7 @@ async fn inbound_realtime_text_steers_active_turn() -> Result<()> {
|
||||
.await?;
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_steer" => Some(()),
|
||||
_ => None,
|
||||
})
|
||||
@@ -1092,16 +1363,8 @@ async fn inbound_realtime_text_steers_active_turn() -> Result<()> {
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::ConversationItemAdded(item),
|
||||
}) => item
|
||||
.get("content")
|
||||
.and_then(Value::as_array)
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.any(|content| {
|
||||
content.get("text").and_then(Value::as_str) == Some("steer via realtime")
|
||||
})
|
||||
.then_some(()),
|
||||
payload: RealtimeEvent::HandoffRequested(handoff),
|
||||
}) if handoff.input_transcript == "steer via realtime" => Some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
@@ -1141,7 +1404,7 @@ async fn inbound_realtime_text_steers_active_turn() -> Result<()> {
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn inbound_spawn_transcript_starts_turn_and_does_not_block_realtime_audio() -> Result<()> {
|
||||
async fn inbound_handoff_request_starts_turn_and_does_not_block_realtime_audio() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
|
||||
@@ -1157,33 +1420,24 @@ async fn inbound_spawn_transcript_starts_turn_and_does_not_block_realtime_audio(
|
||||
];
|
||||
let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await;
|
||||
|
||||
let delegated_text = "delegate from spawn transcript";
|
||||
let delegated_text = "delegate from handoff request";
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![
|
||||
json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_spawn_transcript" }
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_handoff_request", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.item.added",
|
||||
"item": {
|
||||
"type": "spawn_transcript",
|
||||
"seq": 1,
|
||||
"full_user_transcript": delegated_text,
|
||||
"delta_user_transcript": delegated_text,
|
||||
"backend_prompt_messages": [{
|
||||
"role": "user",
|
||||
"channel": null,
|
||||
"content": delegated_text,
|
||||
"content_type": "text"
|
||||
}],
|
||||
"transcript_source": "backend_prompt_messages"
|
||||
}
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_audio",
|
||||
"item_id": "item_audio",
|
||||
"input_transcript": delegated_text,
|
||||
"messages": [{ "role": "user", "text": delegated_text }]
|
||||
}),
|
||||
json!({
|
||||
"type": "response.output_audio.delta",
|
||||
"type": "conversation.output_audio.delta",
|
||||
"delta": "AQID",
|
||||
"sample_rate": 24000,
|
||||
"num_channels": 1
|
||||
"channels": 1
|
||||
}),
|
||||
]]])
|
||||
.await;
|
||||
@@ -1205,18 +1459,17 @@ async fn inbound_spawn_transcript_starts_turn_and_does_not_block_realtime_audio(
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
}) if session_id == "sess_spawn_transcript" => Some(()),
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_handoff_request" => Some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::ConversationItemAdded(item),
|
||||
}) => (item.get("type").and_then(Value::as_str) == Some("spawn_transcript")
|
||||
&& item.get("delta_user_transcript").and_then(Value::as_str) == Some(delegated_text))
|
||||
.then_some(()),
|
||||
payload: RealtimeEvent::HandoffRequested(handoff),
|
||||
}) => (handoff.handoff_id == "handoff_audio" && handoff.input_transcript == delegated_text)
|
||||
.then_some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
@@ -1231,7 +1484,7 @@ async fn inbound_spawn_transcript_starts_turn_and_does_not_block_realtime_audio(
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.expect("timed out waiting for realtime audio after spawn_transcript");
|
||||
.expect("timed out waiting for realtime audio after handoff request");
|
||||
assert_eq!(audio_out.data, "AQID");
|
||||
|
||||
let completion = completions
|
||||
|
||||
Reference in New Issue
Block a user