Files
codex/codex-rs/core/tests/suite/realtime_conversation.rs
2026-03-09 13:34:20 -07:00

2122 lines
71 KiB
Rust

use anyhow::Context;
use anyhow::Result;
use chrono::Utc;
use codex_core::CodexAuth;
use codex_core::auth::OPENAI_API_KEY_ENV_VAR;
use codex_protocol::ThreadId;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::ConversationAudioParams;
use codex_protocol::protocol::ConversationStartParams;
use codex_protocol::protocol::ConversationTextParams;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RealtimeAudioFrame;
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::SessionSource;
use codex_protocol::user_input::UserInput;
use core_test_support::responses;
use core_test_support::responses::start_mock_server;
use core_test_support::responses::start_websocket_server;
use core_test_support::skip_if_no_network;
use core_test_support::streaming_sse::StreamingSseChunk;
use core_test_support::streaming_sse::start_streaming_sse_server;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use serial_test::serial;
use std::ffi::OsString;
use std::fs;
use std::time::Duration;
use tokio::sync::oneshot;
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
const MEMORY_PROMPT_PHRASE: &str =
"You have access to a memory folder with guidance from prior runs.";
fn websocket_request_text(
request: &core_test_support::responses::WebSocketRequest,
) -> Option<String> {
request.body_json()["item"]["content"][0]["text"]
.as_str()
.map(str::to_owned)
}
fn websocket_request_instructions(
request: &core_test_support::responses::WebSocketRequest,
) -> Option<String> {
request.body_json()["session"]["instructions"]
.as_str()
.map(str::to_owned)
}
async fn wait_for_matching_websocket_request<F>(
server: &core_test_support::responses::WebSocketTestServer,
description: &str,
predicate: F,
) -> core_test_support::responses::WebSocketRequest
where
F: Fn(&core_test_support::responses::WebSocketRequest) -> bool,
{
let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
loop {
if let Some(request) = server
.connections()
.iter()
.flat_map(|connection| connection.iter())
.find(|request| predicate(request))
.cloned()
{
return request;
}
assert!(
tokio::time::Instant::now() < deadline,
"timed out waiting for {description}"
);
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
async fn seed_recent_thread(
test: &TestCodex,
title: &str,
first_user_message: &str,
slug: &str,
) -> Result<()> {
let db = test.codex.state_db().context("state db enabled")?;
let thread_id = ThreadId::new();
let updated_at = Utc::now();
let mut metadata_builder = codex_state::ThreadMetadataBuilder::new(
thread_id,
test.codex_home_path()
.join(format!("rollout-{thread_id}.jsonl")),
updated_at,
SessionSource::Cli,
);
metadata_builder.cwd = test.workspace_path(format!("workspace-{slug}"));
metadata_builder.model_provider = Some("test-provider".to_string());
metadata_builder.git_branch = Some(format!("branch-{slug}"));
let mut metadata = metadata_builder.build("test-provider");
metadata.title = title.to_string();
metadata.first_user_message = Some(first_user_message.to_string());
db.upsert_thread(&metadata).await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_websocket_server(vec![
vec![],
vec![
vec![json!({
"type": "session.updated",
"session": { "id": "sess_1", "instructions": "backend prompt" }
})],
vec![],
vec![
json!({
"type": "conversation.output_audio.delta",
"delta": "AQID",
"sample_rate": 24000,
"channels": 1
}),
json!({
"type": "conversation.item.added",
"item": {
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "hi"}]
}
}),
],
],
])
.await;
let mut builder = test_codex();
let test = builder.build_with_websocket_server(&server).await?;
assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await);
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let started = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())),
EventMsg::Error(err) => Some(Err(err.clone())),
_ => None,
})
.await
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
assert!(started.session_id.is_some());
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) => Some(session_id.clone()),
_ => None,
})
.await;
assert_eq!(session_updated, "sess_1");
test.codex
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
frame: RealtimeAudioFrame {
data: "AQID".to_string(),
sample_rate: 24000,
num_channels: 1,
samples_per_channel: Some(480),
},
}))
.await?;
test.codex
.submit(Op::RealtimeConversationText(ConversationTextParams {
text: "hello".to_string(),
}))
.await?;
let audio_out = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::AudioOut(frame),
}) => Some(frame.clone()),
_ => None,
})
.await;
assert_eq!(audio_out.data, "AQID");
let connections = server.connections();
assert_eq!(connections.len(), 2);
let connection = &connections[1];
assert_eq!(connection.len(), 3);
assert_eq!(
connection[0].body_json()["type"].as_str(),
Some("session.update")
);
let initial_instructions = websocket_request_instructions(&connection[0])
.expect("initial session update instructions");
assert!(initial_instructions.starts_with("backend prompt"));
assert_eq!(
server.handshakes()[1]
.header("x-session-id")
.expect("session.update x-session-id header"),
started
.session_id
.as_deref()
.expect("started session id should be present")
);
assert_eq!(
server.handshakes()[1].header("authorization").as_deref(),
Some("Bearer dummy")
);
assert_eq!(
server.handshakes()[1].uri(),
"/v1/realtime?intent=quicksilver&model=realtime-test-model"
);
let mut request_types = [
connection[1].body_json()["type"]
.as_str()
.expect("request type")
.to_string(),
connection[2].body_json()["type"]
.as_str()
.expect("request type")
.to_string(),
];
request_types.sort();
assert_eq!(
request_types,
[
"conversation.item.create".to_string(),
"input_audio_buffer.append".to_string(),
]
);
test.codex.submit(Op::RealtimeConversationClose).await?;
let closed = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
_ => None,
})
.await;
assert!(matches!(
closed.reason.as_deref(),
Some("requested" | "transport_closed")
));
server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial(openai_api_key_env)]
async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() -> Result<()> {
skip_if_no_network!(Ok(()));
let _env_guard = EnvGuard::set(OPENAI_API_KEY_ENV_VAR, "env-realtime-key");
let server = start_websocket_server(vec![
vec![],
vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_env", "instructions": "backend prompt" }
})]],
])
.await;
let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let test = builder.build_with_websocket_server(&server).await?;
assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await);
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let started = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())),
EventMsg::Error(err) => Some(Err(err.clone())),
_ => None,
})
.await
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
assert!(started.session_id.is_some());
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) => Some(session_id.clone()),
_ => None,
})
.await;
assert_eq!(session_updated, "sess_env");
assert_eq!(
server.handshakes()[1].header("authorization").as_deref(),
Some("Bearer env-realtime-key")
);
test.codex.submit(Op::RealtimeConversationClose).await?;
let _closed = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
_ => None,
})
.await;
server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_transport_close_emits_closed_event() -> Result<()> {
skip_if_no_network!(Ok(()));
let session_updated = vec![json!({
"type": "session.updated",
"session": { "id": "sess_1", "instructions": "backend prompt" }
})];
let server = start_websocket_server(vec![vec![], vec![session_updated]]).await;
let mut builder = test_codex();
let test = builder.build_with_websocket_server(&server).await?;
assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await);
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let started = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())),
EventMsg::Error(err) => Some(Err(err.clone())),
_ => None,
})
.await
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
assert!(started.session_id.is_some());
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) => Some(session_id.clone()),
_ => None,
})
.await;
assert_eq!(session_updated, "sess_1");
let closed = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
_ => None,
})
.await;
assert_eq!(closed.reason.as_deref(), Some("transport_closed"));
server.shutdown().await;
Ok(())
}
struct EnvGuard {
key: &'static str,
original: Option<OsString>,
}
impl EnvGuard {
fn set(key: &'static str, value: &str) -> Self {
let original = std::env::var_os(key);
// SAFETY: this guard restores the original value before the test exits.
unsafe {
std::env::set_var(key, value);
}
Self { key, original }
}
}
impl Drop for EnvGuard {
fn drop(&mut self) {
// SAFETY: this guard restores the original value for the modified env var.
unsafe {
match &self.original {
Some(value) => std::env::set_var(self.key, value),
None => std::env::remove_var(self.key),
}
}
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_audio_before_start_emits_error() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_websocket_server(vec![]).await;
let mut builder = test_codex();
let test = builder.build_with_websocket_server(&server).await?;
test.codex
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
frame: RealtimeAudioFrame {
data: "AQID".to_string(),
sample_rate: 24000,
num_channels: 1,
samples_per_channel: Some(480),
},
}))
.await?;
let err = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::Error(err) => Some(err.clone()),
_ => None,
})
.await;
assert_eq!(err.codex_error_info, Some(CodexErrorInfo::BadRequest));
assert_eq!(err.message, "conversation is not running");
server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_text_before_start_emits_error() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_websocket_server(vec![]).await;
let mut builder = test_codex();
let test = builder.build_with_websocket_server(&server).await?;
test.codex
.submit(Op::RealtimeConversationText(ConversationTextParams {
text: "hello".to_string(),
}))
.await?;
let err = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::Error(err) => Some(err.clone()),
_ => None,
})
.await;
assert_eq!(err.codex_error_info, Some(CodexErrorInfo::BadRequest));
assert_eq!(err.message, "conversation is not running");
server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_second_start_replaces_runtime() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_websocket_server(vec![
vec![],
vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_old", "instructions": "old" }
})]],
vec![
vec![json!({
"type": "session.updated",
"session": { "id": "sess_new", "instructions": "new" }
})],
vec![json!({
"type": "conversation.output_audio.delta",
"delta": "AQID",
"sample_rate": 24000,
"channels": 1
})],
],
])
.await;
let mut builder = test_codex();
let test = builder.build_with_websocket_server(&server).await?;
assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await);
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "old".to_string(),
session_id: Some("conv_old".to_string()),
}))
.await?;
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == "sess_old" => Some(Ok(())),
EventMsg::Error(err) => Some(Err(err.clone())),
_ => None,
})
.await
.unwrap_or_else(|err: ErrorEvent| panic!("first conversation start failed: {err:?}"));
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "new".to_string(),
session_id: Some("conv_new".to_string()),
}))
.await?;
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == "sess_new" => Some(Ok(())),
EventMsg::Error(err) => Some(Err(err.clone())),
_ => None,
})
.await
.unwrap_or_else(|err: ErrorEvent| panic!("second conversation start failed: {err:?}"));
test.codex
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
frame: RealtimeAudioFrame {
data: "AQID".to_string(),
sample_rate: 24000,
num_channels: 1,
samples_per_channel: Some(480),
},
}))
.await?;
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::AudioOut(frame),
}) if frame.data == "AQID" => Some(()),
_ => None,
})
.await;
let connections = server.connections();
assert_eq!(connections.len(), 3);
assert_eq!(connections[1].len(), 1);
let old_instructions =
websocket_request_instructions(&connections[1][0]).expect("old session instructions");
assert!(old_instructions.starts_with("old"));
assert_eq!(
server.handshakes()[1].header("x-session-id").as_deref(),
Some("conv_old")
);
assert_eq!(connections[2].len(), 2);
let new_instructions =
websocket_request_instructions(&connections[2][0]).expect("new session instructions");
assert!(new_instructions.starts_with("new"));
assert_eq!(
server.handshakes()[2].header("x-session-id").as_deref(),
Some("conv_new")
);
assert_eq!(
connections[2][1].body_json()["type"].as_str(),
Some("input_audio_buffer.append")
);
server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Result<()> {
skip_if_no_network!(Ok(()));
let startup_server = start_websocket_server(vec![vec![]]).await;
let realtime_server = start_websocket_server(vec![vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_override", "instructions": "backend prompt" }
})]]])
.await;
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build_with_websocket_server(&startup_server).await?;
assert!(
startup_server
.wait_for_handshakes(1, Duration::from_secs(2))
.await
);
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) => Some(session_id.clone()),
_ => None,
})
.await;
assert_eq!(session_updated, "sess_override");
let startup_connections = startup_server.connections();
assert_eq!(startup_connections.len(), 1);
let realtime_connections = realtime_server.connections();
assert_eq!(realtime_connections.len(), 1);
assert_eq!(
realtime_connections[0][0].body_json()["type"].as_str(),
Some("session.update")
);
startup_server.shutdown().await;
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_websocket_server(vec![
vec![],
vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_override", "instructions": "prompt from config" }
})]],
])
.await;
let mut builder = test_codex().with_config(|config| {
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
});
let test = builder.build_with_websocket_server(&server).await?;
assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await);
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "prompt from op".to_string(),
session_id: None,
}))
.await?;
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) => Some(session_id.clone()),
_ => None,
})
.await;
assert_eq!(session_updated, "sess_override");
let connections = server.connections();
assert_eq!(connections.len(), 2);
let overridden_instructions = websocket_request_instructions(&connections[1][0])
.expect("overridden session instructions");
assert!(overridden_instructions.starts_with("prompt from config"));
server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_uses_experimental_realtime_ws_startup_context_override() -> Result<()> {
skip_if_no_network!(Ok(()));
let startup_server = start_websocket_server(vec![vec![]]).await;
let realtime_server = start_websocket_server(vec![vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_custom_context", "instructions": "prompt from config" }
})]]])
.await;
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
config.experimental_realtime_ws_startup_context =
Some("custom startup context".to_string());
}
});
let test = builder.build_with_websocket_server(&startup_server).await?;
seed_recent_thread(
&test,
"Recent work: cleaned up startup flows and reviewed websocket routing.",
"Investigate realtime startup context",
"custom-context",
)
.await?;
fs::create_dir_all(test.workspace_path("docs"))?;
fs::write(test.workspace_path("README.md"), "workspace marker")?;
assert!(
startup_server
.wait_for_handshakes(1, Duration::from_secs(2))
.await
);
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "prompt from op".to_string(),
session_id: None,
}))
.await?;
let startup_context_request = wait_for_matching_websocket_request(
&realtime_server,
"startup context request with instructions",
|request| websocket_request_instructions(request).is_some(),
)
.await;
let instructions = websocket_request_instructions(&startup_context_request)
.expect("custom startup context request should contain instructions");
assert_eq!(instructions, "prompt from config\n\ncustom startup context");
assert!(!instructions.contains(STARTUP_CONTEXT_HEADER));
assert!(!instructions.contains("## Machine / Workspace Map"));
startup_server.shutdown().await;
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_disables_realtime_startup_context_with_empty_override() -> Result<()> {
skip_if_no_network!(Ok(()));
let startup_server = start_websocket_server(vec![vec![]]).await;
let realtime_server = start_websocket_server(vec![vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_no_context", "instructions": "prompt from config" }
})]]])
.await;
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
config.experimental_realtime_ws_startup_context = Some(String::new());
}
});
let test = builder.build_with_websocket_server(&startup_server).await?;
seed_recent_thread(
&test,
"Recent work: cleaned up startup flows and reviewed websocket routing.",
"Investigate realtime startup context",
"no-context",
)
.await?;
fs::create_dir_all(test.workspace_path("docs"))?;
fs::write(test.workspace_path("README.md"), "workspace marker")?;
assert!(
startup_server
.wait_for_handshakes(1, Duration::from_secs(2))
.await
);
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "prompt from op".to_string(),
session_id: None,
}))
.await?;
let startup_context_request = wait_for_matching_websocket_request(
&realtime_server,
"startup context disable request with instructions",
|request| websocket_request_instructions(request).is_some(),
)
.await;
let instructions = websocket_request_instructions(&startup_context_request)
.expect("startup context disable request should contain instructions");
assert_eq!(instructions, "prompt from config");
assert!(!instructions.contains(STARTUP_CONTEXT_HEADER));
assert!(!instructions.contains("## Machine / Workspace Map"));
startup_server.shutdown().await;
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_start_injects_startup_context_from_thread_history() -> Result<()> {
skip_if_no_network!(Ok(()));
let startup_server = start_websocket_server(vec![vec![]]).await;
let realtime_server = start_websocket_server(vec![vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_context", "instructions": "backend prompt" }
})]]])
.await;
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build_with_websocket_server(&startup_server).await?;
seed_recent_thread(
&test,
"Recent work: cleaned up startup flows and reviewed websocket routing.",
"Investigate realtime startup context",
"latest",
)
.await?;
fs::create_dir_all(test.workspace_path("docs"))?;
fs::write(test.workspace_path("README.md"), "workspace marker")?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let startup_context_request = wait_for_matching_websocket_request(
&realtime_server,
"startup context request with instructions",
|request| websocket_request_instructions(request).is_some(),
)
.await;
let startup_context = websocket_request_instructions(&startup_context_request)
.expect("startup context request should contain instructions");
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
assert!(!startup_context.contains("## User"));
assert!(startup_context.contains("### "));
assert!(startup_context.contains("Recent sessions: 1"));
assert!(startup_context.contains("Latest branch: branch-latest"));
assert!(startup_context.contains("User asks:"));
assert!(startup_context.contains("Investigate realtime startup context"));
assert!(startup_context.contains("## Machine / Workspace Map"));
assert!(startup_context.contains("README.md"));
assert!(!startup_context.contains(MEMORY_PROMPT_PHRASE));
startup_server.shutdown().await;
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()> {
skip_if_no_network!(Ok(()));
let startup_server = start_websocket_server(vec![vec![]]).await;
let realtime_server = start_websocket_server(vec![vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_workspace", "instructions": "backend prompt" }
})]]])
.await;
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build_with_websocket_server(&startup_server).await?;
fs::create_dir_all(test.workspace_path("codex-rs/core"))?;
fs::write(test.workspace_path("notes.txt"), "workspace marker")?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let startup_context_request = wait_for_matching_websocket_request(
&realtime_server,
"workspace-map startup context request with instructions",
|request| websocket_request_instructions(request).is_some(),
)
.await;
let startup_context = websocket_request_instructions(&startup_context_request)
.expect("startup context request should contain instructions");
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
assert!(startup_context.contains("## Machine / Workspace Map"));
assert!(startup_context.contains("notes.txt"));
assert!(startup_context.contains("codex-rs/"));
startup_server.shutdown().await;
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_startup_context_is_truncated_and_sent_once_per_start() -> Result<()> {
skip_if_no_network!(Ok(()));
let startup_server = start_websocket_server(vec![vec![]]).await;
let realtime_server = start_websocket_server(vec![vec![
vec![json!({
"type": "session.updated",
"session": { "id": "sess_truncated", "instructions": "backend prompt" }
})],
vec![],
]])
.await;
let oversized_summary = "recent work ".repeat(3_500);
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build_with_websocket_server(&startup_server).await?;
seed_recent_thread(&test, &oversized_summary, "summary", "oversized").await?;
fs::write(test.workspace_path("marker.txt"), "marker")?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let startup_context_request = wait_for_matching_websocket_request(
&realtime_server,
"truncated startup context request with instructions",
|request| websocket_request_instructions(request).is_some(),
)
.await;
let startup_context = websocket_request_instructions(&startup_context_request)
.expect("startup context request should contain instructions");
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
assert!(startup_context.len() <= 20_500);
test.codex
.submit(Op::RealtimeConversationText(ConversationTextParams {
text: "hello".to_string(),
}))
.await?;
let explicit_text_request = wait_for_matching_websocket_request(
&realtime_server,
"explicit realtime text request",
|request| websocket_request_text(request).as_deref() == Some("hello"),
)
.await;
assert_eq!(
websocket_request_text(&explicit_text_request),
Some("hello".to_string())
);
startup_server.shutdown().await;
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_mirrors_assistant_message_text_to_realtime_handoff() -> Result<()> {
skip_if_no_network!(Ok(()));
let api_server = start_mock_server().await;
let _response_mock = responses::mount_sse_once(
&api_server,
responses::sse(vec![
responses::ev_response_created("resp_1"),
responses::ev_assistant_message("msg_1", "assistant says hi"),
responses::ev_completed("resp_1"),
]),
)
.await;
let realtime_server = start_websocket_server(vec![vec![
vec![
json!({
"type": "session.updated",
"session": { "id": "sess_1", "instructions": "backend prompt" }
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": "delegate hello"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_1",
"item_id": "item_1",
"input_transcript": "delegate hello"
}),
],
vec![],
]])
.await;
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build(&api_server).await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) => Some(session_id.clone()),
_ => None,
})
.await;
assert_eq!(session_updated, "sess_1");
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::HandoffRequested(handoff),
}) if handoff.handoff_id == "handoff_1" => Some(()),
_ => None,
})
.await;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
while tokio::time::Instant::now() < deadline {
let connections = realtime_server.connections();
if connections.len() == 1 && connections[0].len() >= 2 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
let realtime_connections = realtime_server.connections();
assert_eq!(realtime_connections.len(), 1);
assert_eq!(realtime_connections[0].len(), 2);
assert_eq!(
realtime_connections[0][0].body_json()["type"].as_str(),
Some("session.update")
);
assert_eq!(
realtime_connections[0][1].body_json()["type"].as_str(),
Some("conversation.handoff.append")
);
assert_eq!(
realtime_connections[0][1].body_json()["handoff_id"].as_str(),
Some("handoff_1")
);
assert_eq!(
realtime_connections[0][1].body_json()["output_text"].as_str(),
Some("assistant says hi")
);
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_handoff_persists_across_item_done_until_turn_complete() -> Result<()> {
skip_if_no_network!(Ok(()));
let (gate_second_message_tx, gate_second_message_rx) = oneshot::channel();
let first_chunks = vec![
StreamingSseChunk {
gate: None,
body: sse_event(responses::ev_response_created("resp-1")),
},
StreamingSseChunk {
gate: None,
body: sse_event(responses::ev_assistant_message(
"msg-1",
"assistant message 1",
)),
},
StreamingSseChunk {
gate: Some(gate_second_message_rx),
body: sse_event(responses::ev_assistant_message(
"msg-2",
"assistant message 2",
)),
},
StreamingSseChunk {
gate: None,
body: sse_event(responses::ev_completed("resp-1")),
},
];
let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await;
let realtime_server = start_websocket_server(vec![vec![
vec![
json!({
"type": "session.updated",
"session": { "id": "sess_item_done", "instructions": "backend prompt" }
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": "delegate now"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_item_done",
"item_id": "item_item_done",
"input_transcript": "delegate now"
}),
],
vec![json!({
"type": "conversation.item.done",
"item": { "id": "item_item_done" }
})],
vec![],
]])
.await;
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build_with_streaming_server(&api_server).await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == "sess_item_done" => Some(()),
_ => None,
})
.await;
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::HandoffRequested(handoff),
}) if handoff.handoff_id == "handoff_item_done" => Some(()),
_ => None,
})
.await;
let first_append = realtime_server.wait_for_request(0, 1).await;
assert_eq!(
first_append.body_json()["type"].as_str(),
Some("conversation.handoff.append")
);
assert_eq!(
first_append.body_json()["handoff_id"].as_str(),
Some("handoff_item_done")
);
assert_eq!(
first_append.body_json()["output_text"].as_str(),
Some("assistant message 1")
);
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::ConversationItemDone { item_id },
}) if item_id == "item_item_done" => Some(()),
_ => None,
})
.await;
let _ = gate_second_message_tx.send(());
let second_append = realtime_server.wait_for_request(0, 2).await;
assert_eq!(
second_append.body_json()["type"].as_str(),
Some("conversation.handoff.append")
);
assert_eq!(
second_append.body_json()["handoff_id"].as_str(),
Some("handoff_item_done")
);
assert_eq!(
second_append.body_json()["output_text"].as_str(),
Some("assistant message 2")
);
let completion = completions
.into_iter()
.next()
.expect("missing delegated turn completion");
completion
.await
.expect("delegated turn request did not complete");
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
realtime_server.shutdown().await;
api_server.shutdown().await;
Ok(())
}
fn sse_event(event: Value) -> String {
responses::sse(vec![event])
}
fn message_input_texts(body: &Value, role: &str) -> Vec<String> {
body.get("input")
.and_then(Value::as_array)
.into_iter()
.flatten()
.filter(|item| item.get("type").and_then(Value::as_str) == Some("message"))
.filter(|item| item.get("role").and_then(Value::as_str) == Some(role))
.filter_map(|item| item.get("content").and_then(Value::as_array))
.flatten()
.filter(|span| span.get("type").and_then(Value::as_str) == Some("input_text"))
.filter_map(|span| span.get("text").and_then(Value::as_str).map(str::to_owned))
.collect()
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn inbound_handoff_request_starts_turn() -> Result<()> {
skip_if_no_network!(Ok(()));
let api_server = start_mock_server().await;
let response_mock = responses::mount_sse_once(
&api_server,
responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_assistant_message("msg-1", "ok"),
responses::ev_completed("resp-1"),
]),
)
.await;
let realtime_server = start_websocket_server(vec![vec![vec![
json!({
"type": "session.updated",
"session": { "id": "sess_inbound", "instructions": "backend prompt" }
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": "text from realtime"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_inbound",
"item_id": "item_inbound",
"input_transcript": "text from realtime"
}),
]]])
.await;
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build(&api_server).await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) => Some(session_id.clone()),
_ => None,
})
.await;
assert_eq!(session_updated, "sess_inbound");
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::HandoffRequested(handoff),
}) if handoff.handoff_id == "handoff_inbound"
&& handoff.input_transcript == "text from realtime" =>
{
Some(())
}
_ => None,
})
.await;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let request = response_mock.single_request();
let user_texts = request.message_input_texts("user");
assert!(
user_texts
.iter()
.any(|text| text == "user: text from realtime")
);
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn inbound_handoff_request_uses_active_transcript() -> Result<()> {
skip_if_no_network!(Ok(()));
let api_server = start_mock_server().await;
let response_mock = responses::mount_sse_once(
&api_server,
responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_assistant_message("msg-1", "ok"),
responses::ev_completed("resp-1"),
]),
)
.await;
let realtime_server = start_websocket_server(vec![vec![vec![
json!({
"type": "session.updated",
"session": { "id": "sess_inbound_multi", "instructions": "backend prompt" }
}),
json!({
"type": "conversation.output_transcript.delta",
"delta": "assistant context"
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": "delegated query"
}),
json!({
"type": "conversation.output_transcript.delta",
"delta": "assist confirm"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_inbound_multi",
"item_id": "item_inbound_multi",
"input_transcript": "ignored"
}),
]]])
.await;
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build(&api_server).await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) => Some(session_id.clone()),
_ => None,
})
.await;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let request = response_mock.single_request();
let user_texts = request.message_input_texts("user");
assert!(user_texts.iter().any(|text| text
== "assistant: assistant context\nuser: delegated query\nassistant: assist confirm"));
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn inbound_handoff_request_clears_active_transcript_after_each_handoff() -> Result<()> {
skip_if_no_network!(Ok(()));
let api_server = start_mock_server().await;
let response_mock = responses::mount_sse_sequence(
&api_server,
vec![
responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_assistant_message("msg-1", "first ok"),
responses::ev_completed("resp-1"),
]),
responses::sse(vec![
responses::ev_response_created("resp-2"),
responses::ev_assistant_message("msg-2", "second ok"),
responses::ev_completed("resp-2"),
]),
],
)
.await;
let realtime_server = start_websocket_server(vec![vec![
vec![
json!({
"type": "session.updated",
"session": { "id": "sess_inbound_clear", "instructions": "backend prompt" }
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": "first question"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_inbound_clear_1",
"item_id": "item_inbound_clear_1",
"input_transcript": "first question"
}),
],
vec![],
vec![
json!({
"type": "conversation.input_transcript.delta",
"delta": "second question"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_inbound_clear_2",
"item_id": "item_inbound_clear_2",
"input_transcript": "second question"
}),
],
]])
.await;
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build(&api_server).await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) => Some(session_id.clone()),
_ => None,
})
.await;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
test.codex
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
frame: RealtimeAudioFrame {
data: "AQID".to_string(),
sample_rate: 24000,
num_channels: 1,
samples_per_channel: Some(480),
},
}))
.await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let requests = response_mock.requests();
assert_eq!(requests.len(), 2);
let first_user_texts = requests[0].message_input_texts("user");
assert!(
first_user_texts
.iter()
.any(|text| text == "user: first question")
);
let second_user_texts = requests[1].message_input_texts("user");
assert!(
second_user_texts
.iter()
.any(|text| text == "user: second question")
);
assert!(
!second_user_texts
.iter()
.any(|text| text == "user: first question\nuser: second question")
);
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn inbound_conversation_item_does_not_start_turn_and_still_forwards_audio() -> Result<()> {
skip_if_no_network!(Ok(()));
let api_server = start_mock_server().await;
let realtime_server = start_websocket_server(vec![vec![vec![
json!({
"type": "session.updated",
"session": { "id": "sess_ignore_item", "instructions": "backend prompt" }
}),
json!({
"type": "conversation.item.added",
"item": {
"type": "message",
"role": "user",
"content": [{"type": "text", "text": "echoed local text"}]
}
}),
json!({
"type": "conversation.output_audio.delta",
"delta": "AQID",
"sample_rate": 24000,
"channels": 1
}),
]]])
.await;
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build(&api_server).await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == "sess_ignore_item" => Some(()),
_ => None,
})
.await;
let audio_out = tokio::time::timeout(
Duration::from_millis(500),
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::AudioOut(frame),
}) => Some(frame.clone()),
_ => None,
}),
)
.await
.expect("timed out waiting for realtime audio after conversation item");
assert_eq!(audio_out.data, "AQID");
let unexpected_turn_started = tokio::time::timeout(
Duration::from_millis(200),
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::TurnStarted(_) => Some(()),
_ => None,
}),
)
.await;
assert!(unexpected_turn_started.is_err());
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_audio() -> Result<()>
{
skip_if_no_network!(Ok(()));
let start = std::time::Instant::now();
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
let first_chunks = vec![
StreamingSseChunk {
gate: None,
body: sse_event(responses::ev_response_created("resp-1")),
},
StreamingSseChunk {
gate: None,
body: sse_event(responses::ev_assistant_message(
"msg-1",
"assistant says hi",
)),
},
StreamingSseChunk {
gate: Some(gate_completed_rx),
body: sse_event(responses::ev_completed("resp-1")),
},
];
let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await;
let realtime_server = start_websocket_server(vec![vec![
vec![
json!({
"type": "session.updated",
"session": { "id": "sess_echo_guard", "instructions": "backend prompt" }
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": "delegate now"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_echo_guard",
"item_id": "item_echo_guard",
"input_transcript": "delegate now"
}),
],
vec![
json!({
"type": "conversation.item.added",
"item": {
"type": "message",
"role": "user",
"content": [{"type": "text", "text": "assistant says hi"}]
}
}),
json!({
"type": "conversation.output_audio.delta",
"delta": "AQID",
"sample_rate": 24000,
"channels": 1
}),
],
]])
.await;
let mut builder = test_codex().with_model("gpt-5.1").with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build_with_streaming_server(&api_server).await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == "sess_echo_guard" => Some(()),
_ => None,
})
.await;
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::HandoffRequested(handoff),
}) if handoff.input_transcript == "delegate now" => Some(()),
_ => None,
})
.await;
eprintln!(
"[realtime test +{}ms] saw trigger text={:?}",
start.elapsed().as_millis(),
"delegate now"
);
let mirrored_request = realtime_server.wait_for_request(0, 1).await;
let mirrored_request_body = mirrored_request.body_json();
eprintln!(
"[realtime test +{}ms] saw mirrored request type={:?} handoff_id={:?} text={:?}",
start.elapsed().as_millis(),
mirrored_request_body["type"].as_str(),
mirrored_request_body["handoff_id"].as_str(),
mirrored_request_body["output_text"].as_str(),
);
assert_eq!(
mirrored_request_body["type"].as_str(),
Some("conversation.handoff.append")
);
assert_eq!(
mirrored_request_body["handoff_id"].as_str(),
Some("handoff_echo_guard")
);
assert_eq!(
mirrored_request_body["output_text"].as_str(),
Some("assistant says hi")
);
let audio_out = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::AudioOut(frame),
}) => Some(frame.clone()),
_ => None,
})
.await;
eprintln!(
"[realtime test +{}ms] saw audio out data={} sample_rate={} num_channels={}",
start.elapsed().as_millis(),
audio_out.data,
audio_out.sample_rate,
audio_out.num_channels
);
assert_eq!(audio_out.data, "AQID");
let completion = completions
.into_iter()
.next()
.expect("missing delegated turn completion");
let _ = gate_completed_tx.send(());
completion
.await
.expect("delegated turn request did not complete");
eprintln!(
"[realtime test +{}ms] delegated completion resolved",
start.elapsed().as_millis()
);
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let requests = api_server.requests().await;
assert_eq!(requests.len(), 1);
realtime_server.shutdown().await;
api_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn inbound_handoff_request_does_not_block_realtime_event_forwarding() -> Result<()> {
skip_if_no_network!(Ok(()));
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
let first_chunks = vec![
StreamingSseChunk {
gate: None,
body: sse_event(responses::ev_response_created("resp-1")),
},
StreamingSseChunk {
gate: Some(gate_completed_rx),
body: sse_event(responses::ev_completed("resp-1")),
},
];
let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await;
let realtime_server = start_websocket_server(vec![vec![vec![
json!({
"type": "session.updated",
"session": { "id": "sess_non_blocking", "instructions": "backend prompt" }
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": "delegate now"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_non_blocking",
"item_id": "item_non_blocking",
"input_transcript": "delegate now"
}),
json!({
"type": "conversation.output_audio.delta",
"delta": "AQID",
"sample_rate": 24000,
"channels": 1
}),
]]])
.await;
let mut builder = test_codex().with_model("gpt-5.1").with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build_with_streaming_server(&api_server).await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == "sess_non_blocking" => Some(()),
_ => None,
})
.await;
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::HandoffRequested(handoff),
}) if handoff.input_transcript == "delegate now" => Some(()),
_ => None,
})
.await;
let audio_out = tokio::time::timeout(
Duration::from_millis(500),
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::AudioOut(frame),
}) => Some(frame.clone()),
_ => None,
}),
)
.await
.expect("timed out waiting for realtime audio while delegated turn was still pending");
assert_eq!(audio_out.data, "AQID");
let completion = completions
.into_iter()
.next()
.expect("missing delegated turn completion");
let _ = gate_completed_tx.send(());
completion
.await
.expect("delegated turn request did not complete");
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
realtime_server.shutdown().await;
api_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn inbound_handoff_request_steers_active_turn() -> Result<()> {
skip_if_no_network!(Ok(()));
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
let first_chunks = vec![
StreamingSseChunk {
gate: None,
body: sse_event(responses::ev_response_created("resp-1")),
},
StreamingSseChunk {
gate: None,
body: sse_event(responses::ev_message_item_added("msg-1", "")),
},
StreamingSseChunk {
gate: None,
body: sse_event(responses::ev_output_text_delta("first ")),
},
StreamingSseChunk {
gate: None,
body: sse_event(responses::ev_output_text_delta("turn")),
},
StreamingSseChunk {
gate: None,
body: sse_event(responses::ev_assistant_message("msg-1", "first turn")),
},
StreamingSseChunk {
gate: Some(gate_completed_rx),
body: sse_event(responses::ev_completed("resp-1")),
},
];
let second_chunks = vec![
StreamingSseChunk {
gate: None,
body: sse_event(responses::ev_response_created("resp-2")),
},
StreamingSseChunk {
gate: None,
body: sse_event(responses::ev_completed("resp-2")),
},
];
let (api_server, completions) =
start_streaming_sse_server(vec![first_chunks, second_chunks]).await;
let realtime_server = start_websocket_server(vec![vec![
vec![json!({
"type": "session.updated",
"session": { "id": "sess_steer", "instructions": "backend prompt" }
})],
vec![
json!({
"type": "conversation.input_transcript.delta",
"delta": "steer via realtime"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_steer",
"item_id": "item_steer",
"input_transcript": "steer via realtime"
}),
],
]])
.await;
let mut builder = test_codex().with_model("gpt-5.1").with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build_with_streaming_server(&api_server).await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == "sess_steer" => Some(()),
_ => None,
})
.await;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "first prompt".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::AgentMessageContentDelta(_))
})
.await;
test.codex
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
frame: RealtimeAudioFrame {
data: "AQID".to_string(),
sample_rate: 24000,
num_channels: 1,
samples_per_channel: Some(480),
},
}))
.await?;
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::HandoffRequested(handoff),
}) if handoff.input_transcript == "steer via realtime" => Some(()),
_ => None,
})
.await;
let mut completion_iter = completions.into_iter();
let first_completion = completion_iter.next().expect("missing first completion");
let second_completion = completion_iter.next().expect("missing second completion");
let _ = gate_completed_tx.send(());
first_completion
.await
.expect("first request did not complete");
second_completion
.await
.expect("second request did not complete");
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let requests = api_server.requests().await;
assert_eq!(requests.len(), 2);
let first_body: Value = serde_json::from_slice(&requests[0]).expect("parse first request");
let second_body: Value = serde_json::from_slice(&requests[1]).expect("parse second request");
let first_texts = message_input_texts(&first_body, "user");
let second_texts = message_input_texts(&second_body, "user");
assert!(first_texts.iter().any(|text| text == "first prompt"));
assert!(
!first_texts
.iter()
.any(|text| text == "user: steer via realtime")
);
assert!(second_texts.iter().any(|text| text == "first prompt"));
assert!(
second_texts
.iter()
.any(|text| text == "user: steer via realtime")
);
realtime_server.shutdown().await;
api_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn inbound_handoff_request_starts_turn_and_does_not_block_realtime_audio() -> Result<()> {
skip_if_no_network!(Ok(()));
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
let first_chunks = vec![
StreamingSseChunk {
gate: None,
body: sse_event(responses::ev_response_created("resp-1")),
},
StreamingSseChunk {
gate: Some(gate_completed_rx),
body: sse_event(responses::ev_completed("resp-1")),
},
];
let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await;
let delegated_text = "delegate from handoff request";
let realtime_server = start_websocket_server(vec![vec![vec![
json!({
"type": "session.updated",
"session": { "id": "sess_handoff_request", "instructions": "backend prompt" }
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": delegated_text
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_audio",
"item_id": "item_audio",
"input_transcript": delegated_text
}),
json!({
"type": "conversation.output_audio.delta",
"delta": "AQID",
"sample_rate": 24000,
"channels": 1
}),
]]])
.await;
let mut builder = test_codex().with_model("gpt-5.1").with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build_with_streaming_server(&api_server).await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == "sess_handoff_request" => Some(()),
_ => None,
})
.await;
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::HandoffRequested(handoff),
}) => (handoff.handoff_id == "handoff_audio" && handoff.input_transcript == delegated_text)
.then_some(()),
_ => None,
})
.await;
let audio_out = tokio::time::timeout(
Duration::from_millis(500),
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::AudioOut(frame),
}) => Some(frame.clone()),
_ => None,
}),
)
.await
.expect("timed out waiting for realtime audio after handoff request");
assert_eq!(audio_out.data, "AQID");
let completion = completions
.into_iter()
.next()
.expect("missing delegated turn completion");
let _ = gate_completed_tx.send(());
completion
.await
.expect("delegated turn request did not complete");
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let requests = api_server.requests().await;
assert_eq!(requests.len(), 1);
let first_body: Value = serde_json::from_slice(&requests[0]).expect("parse first request");
let first_texts = message_input_texts(&first_body, "user");
let expected_text = format!("user: {delegated_text}");
assert!(first_texts.iter().any(|text| text == &expected_text));
realtime_server.shutdown().await;
api_server.shutdown().await;
Ok(())
}