mirror of
https://github.com/openai/codex.git
synced 2026-04-28 08:34:54 +00:00
Select Current Thread startup context by budget from newest turns, cap each rendered turn at 300 approximate tokens, and add formatter plus integration snapshot coverage.
3002 lines
101 KiB
Rust
3002 lines
101 KiB
Rust
use anyhow::Context;
|
|
use anyhow::Result;
|
|
use chrono::Utc;
|
|
use codex_config::config_toml::RealtimeWsVersion;
|
|
use codex_core::test_support::auth_manager_from_auth;
|
|
use codex_login::CodexAuth;
|
|
use codex_login::OPENAI_API_KEY_ENV_VAR;
|
|
use codex_protocol::ThreadId;
|
|
use codex_protocol::models::ContentItem;
|
|
use codex_protocol::models::ResponseItem;
|
|
use codex_protocol::protocol::CodexErrorInfo;
|
|
use codex_protocol::protocol::ConversationAudioParams;
|
|
use codex_protocol::protocol::ConversationStartParams;
|
|
use codex_protocol::protocol::ConversationStartTransport;
|
|
use codex_protocol::protocol::ConversationTextParams;
|
|
use codex_protocol::protocol::ErrorEvent;
|
|
use codex_protocol::protocol::EventMsg;
|
|
use codex_protocol::protocol::InitialHistory;
|
|
use codex_protocol::protocol::Op;
|
|
use codex_protocol::protocol::RealtimeAudioFrame;
|
|
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
|
|
use codex_protocol::protocol::RealtimeConversationVersion;
|
|
use codex_protocol::protocol::RealtimeEvent;
|
|
use codex_protocol::protocol::RealtimeVoice;
|
|
use codex_protocol::protocol::RolloutItem;
|
|
use codex_protocol::protocol::SessionSource;
|
|
use codex_protocol::user_input::UserInput;
|
|
use core_test_support::responses;
|
|
use core_test_support::responses::WebSocketConnectionConfig;
|
|
use core_test_support::responses::start_mock_server;
|
|
use core_test_support::responses::start_websocket_server;
|
|
use core_test_support::responses::start_websocket_server_with_headers;
|
|
use core_test_support::skip_if_no_network;
|
|
use core_test_support::streaming_sse::StreamingSseChunk;
|
|
use core_test_support::streaming_sse::start_streaming_sse_server;
|
|
use core_test_support::test_codex::TestCodex;
|
|
use core_test_support::test_codex::test_codex;
|
|
use core_test_support::wait_for_event;
|
|
use core_test_support::wait_for_event_match;
|
|
use pretty_assertions::assert_eq;
|
|
use serde_json::Value;
|
|
use serde_json::json;
|
|
use std::fs;
|
|
use std::process::Command;
|
|
use std::sync::Arc;
|
|
use std::sync::Mutex;
|
|
use std::time::Duration;
|
|
use tokio::sync::oneshot;
|
|
use tokio::time::timeout;
|
|
use wiremock::Match;
|
|
use wiremock::Mock;
|
|
use wiremock::Request as WiremockRequest;
|
|
use wiremock::ResponseTemplate;
|
|
use wiremock::matchers::method;
|
|
use wiremock::matchers::path_regex;
|
|
|
|
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
|
|
const REALTIME_BACKEND_PROMPT: &str = include_str!("../../templates/realtime/backend_prompt.md");
|
|
const USER_FIRST_NAME_PLACEHOLDER: &str = "{{ user_first_name }}";
|
|
const MEMORY_PROMPT_PHRASE: &str =
|
|
"You have access to a memory folder with guidance from prior runs.";
|
|
const REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR: &str =
|
|
"CODEX_REALTIME_CONVERSATION_TEST_SUBPROCESS";
|
|
|
|
#[derive(Debug, Clone)]
|
|
struct RealtimeCallRequestCapture {
|
|
requests: Arc<Mutex<Vec<WiremockRequest>>>,
|
|
}
|
|
|
|
impl RealtimeCallRequestCapture {
|
|
fn new() -> Self {
|
|
Self {
|
|
requests: Arc::new(Mutex::new(Vec::new())),
|
|
}
|
|
}
|
|
|
|
fn single_request(&self) -> WiremockRequest {
|
|
let requests = self
|
|
.requests
|
|
.lock()
|
|
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
|
assert_eq!(requests.len(), 1, "expected one realtime call request");
|
|
requests[0].clone()
|
|
}
|
|
}
|
|
|
|
impl Match for RealtimeCallRequestCapture {
|
|
fn matches(&self, request: &WiremockRequest) -> bool {
|
|
self.requests
|
|
.lock()
|
|
.unwrap_or_else(std::sync::PoisonError::into_inner)
|
|
.push(request.clone());
|
|
true
|
|
}
|
|
}
|
|
|
|
fn websocket_request_text(
|
|
request: &core_test_support::responses::WebSocketRequest,
|
|
) -> Option<String> {
|
|
request.body_json()["item"]["content"][0]["text"]
|
|
.as_str()
|
|
.map(str::to_owned)
|
|
}
|
|
|
|
fn websocket_request_instructions(
|
|
request: &core_test_support::responses::WebSocketRequest,
|
|
) -> Option<String> {
|
|
request.body_json()["session"]["instructions"]
|
|
.as_str()
|
|
.map(str::to_owned)
|
|
}
|
|
|
|
fn expected_realtime_backend_prompt() -> String {
|
|
REALTIME_BACKEND_PROMPT
|
|
.trim_end()
|
|
.replace(USER_FIRST_NAME_PLACEHOLDER, &test_user_first_name())
|
|
}
|
|
|
|
fn test_user_first_name() -> String {
|
|
[whoami::realname(), whoami::username()]
|
|
.into_iter()
|
|
.filter_map(|name| name.split_whitespace().next().map(str::to_string))
|
|
.find(|name| !name.is_empty())
|
|
.unwrap_or_else(|| "there".to_string())
|
|
}
|
|
|
|
async fn wait_for_matching_websocket_request<F>(
|
|
server: &core_test_support::responses::WebSocketTestServer,
|
|
description: &str,
|
|
predicate: F,
|
|
) -> core_test_support::responses::WebSocketRequest
|
|
where
|
|
F: Fn(&core_test_support::responses::WebSocketRequest) -> bool,
|
|
{
|
|
let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
|
|
loop {
|
|
if let Some(request) = server
|
|
.connections()
|
|
.iter()
|
|
.flat_map(|connection| connection.iter())
|
|
.find(|request| predicate(request))
|
|
.cloned()
|
|
{
|
|
return request;
|
|
}
|
|
|
|
assert!(
|
|
tokio::time::Instant::now() < deadline,
|
|
"timed out waiting for {description}"
|
|
);
|
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
|
}
|
|
}
|
|
|
|
fn run_realtime_conversation_test_in_subprocess(
|
|
test_name: &str,
|
|
openai_api_key: Option<&str>,
|
|
) -> Result<()> {
|
|
let mut command = Command::new(std::env::current_exe()?);
|
|
command
|
|
.arg("--exact")
|
|
.arg(test_name)
|
|
.env(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR, "1");
|
|
match openai_api_key {
|
|
Some(openai_api_key) => {
|
|
command.env(OPENAI_API_KEY_ENV_VAR, openai_api_key);
|
|
}
|
|
None => {
|
|
command.env_remove(OPENAI_API_KEY_ENV_VAR);
|
|
}
|
|
}
|
|
let output = command.output()?;
|
|
assert!(
|
|
output.status.success(),
|
|
"subprocess test `{test_name}` failed\nstdout:\n{}\nstderr:\n{}",
|
|
String::from_utf8_lossy(&output.stdout),
|
|
String::from_utf8_lossy(&output.stderr),
|
|
);
|
|
Ok(())
|
|
}
|
|
async fn seed_recent_thread(
|
|
test: &TestCodex,
|
|
title: &str,
|
|
first_user_message: &str,
|
|
slug: &str,
|
|
) -> Result<()> {
|
|
let db = test.codex.state_db().context("state db enabled")?;
|
|
let thread_id = ThreadId::new();
|
|
let updated_at = Utc::now();
|
|
let mut metadata_builder = codex_state::ThreadMetadataBuilder::new(
|
|
thread_id,
|
|
test.codex_home_path()
|
|
.join(format!("rollout-{thread_id}.jsonl")),
|
|
updated_at,
|
|
SessionSource::Cli,
|
|
);
|
|
metadata_builder.cwd = test.workspace_path(format!("workspace-{slug}"));
|
|
metadata_builder.model_provider = Some("test-provider".to_string());
|
|
metadata_builder.git_branch = Some(format!("branch-{slug}"));
|
|
let mut metadata = metadata_builder.build("test-provider");
|
|
metadata.title = title.to_string();
|
|
metadata.first_user_message = Some(first_user_message.to_string());
|
|
db.upsert_thread(&metadata).await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = start_websocket_server(vec![
|
|
vec![],
|
|
vec![
|
|
vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_1", "instructions": "backend prompt" }
|
|
})],
|
|
vec![],
|
|
vec![
|
|
json!({
|
|
"type": "conversation.output_audio.delta",
|
|
"delta": "AQID",
|
|
"sample_rate": 24000,
|
|
"channels": 1
|
|
}),
|
|
json!({
|
|
"type": "conversation.item.added",
|
|
"item": {
|
|
"type": "message",
|
|
"role": "assistant",
|
|
"content": [{"type": "text", "text": "hi"}]
|
|
}
|
|
}),
|
|
],
|
|
],
|
|
])
|
|
.await;
|
|
|
|
let mut builder = test_codex();
|
|
let test = builder.build_with_websocket_server(&server).await?;
|
|
assert!(
|
|
server
|
|
.wait_for_handshakes(/*expected*/ 1, Duration::from_secs(2))
|
|
.await
|
|
);
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let started = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())),
|
|
EventMsg::Error(err) => Some(Err(err.clone())),
|
|
_ => None,
|
|
})
|
|
.await
|
|
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
|
assert!(started.session_id.is_some());
|
|
assert_eq!(started.version, RealtimeConversationVersion::V1);
|
|
|
|
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) => Some(session_id.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert_eq!(session_updated, "sess_1");
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
|
|
frame: RealtimeAudioFrame {
|
|
data: "AQID".to_string(),
|
|
sample_rate: 24000,
|
|
num_channels: 1,
|
|
samples_per_channel: Some(480),
|
|
item_id: None,
|
|
},
|
|
}))
|
|
.await?;
|
|
test.codex
|
|
.submit(Op::RealtimeConversationText(ConversationTextParams {
|
|
text: "hello".to_string(),
|
|
}))
|
|
.await?;
|
|
|
|
let audio_out = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::AudioOut(frame),
|
|
}) => Some(frame.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert_eq!(audio_out.data, "AQID");
|
|
|
|
let connections = server.connections();
|
|
assert_eq!(connections.len(), 2);
|
|
let connection = &connections[1];
|
|
assert_eq!(connection.len(), 3);
|
|
assert_eq!(
|
|
connection[0].body_json()["type"].as_str(),
|
|
Some("session.update")
|
|
);
|
|
assert_eq!(
|
|
connection[0].body_json()["session"]["audio"]["output"]["voice"],
|
|
"cove"
|
|
);
|
|
let initial_instructions = websocket_request_instructions(&connection[0])
|
|
.expect("initial session update instructions");
|
|
assert!(initial_instructions.starts_with("backend prompt"));
|
|
assert_eq!(
|
|
server.handshakes()[1]
|
|
.header("x-session-id")
|
|
.expect("session.update x-session-id header"),
|
|
started
|
|
.session_id
|
|
.as_deref()
|
|
.expect("started session id should be present")
|
|
);
|
|
assert_eq!(
|
|
server.handshakes()[1].header("authorization").as_deref(),
|
|
Some("Bearer dummy")
|
|
);
|
|
assert_eq!(
|
|
server.handshakes()[1].uri(),
|
|
"/v1/realtime?intent=quicksilver&model=realtime-test-model"
|
|
);
|
|
let mut request_types = [
|
|
connection[1].body_json()["type"]
|
|
.as_str()
|
|
.expect("request type")
|
|
.to_string(),
|
|
connection[2].body_json()["type"]
|
|
.as_str()
|
|
.expect("request type")
|
|
.to_string(),
|
|
];
|
|
request_types.sort();
|
|
assert_eq!(
|
|
request_types,
|
|
[
|
|
"conversation.item.create".to_string(),
|
|
"input_audio_buffer.append".to_string(),
|
|
]
|
|
);
|
|
|
|
test.codex.submit(Op::RealtimeConversationClose).await?;
|
|
let closed = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert!(matches!(
|
|
closed.reason.as_deref(),
|
|
Some("requested" | "transport_closed")
|
|
));
|
|
|
|
server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_start_defaults_to_v2_and_gpt_realtime_1_5() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let api_server = start_mock_server().await;
|
|
let realtime_server = start_websocket_server(vec![vec![vec![]]]).await;
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
let mut builder = test_codex().with_config(move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.experimental_realtime_ws_startup_context = Some(String::new());
|
|
});
|
|
let test = builder.build(&api_server).await?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let started = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())),
|
|
EventMsg::Error(err) => Some(Err(err.clone())),
|
|
_ => None,
|
|
})
|
|
.await
|
|
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
|
|
|
assert!(
|
|
realtime_server
|
|
.wait_for_handshakes(/*expected*/ 1, Duration::from_secs(2))
|
|
.await
|
|
);
|
|
|
|
let session_update = realtime_server
|
|
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 0)
|
|
.await;
|
|
let body = session_update.body_json();
|
|
assert_eq!(
|
|
json!({
|
|
"startedVersion": started.version,
|
|
"handshakeUri": realtime_server.single_handshake().uri(),
|
|
"voice": body["session"]["audio"]["output"]["voice"],
|
|
"instructions": body["session"]["instructions"],
|
|
}),
|
|
json!({
|
|
"startedVersion": RealtimeConversationVersion::V2,
|
|
"handshakeUri": "/v1/realtime?model=gpt-realtime-1.5",
|
|
"voice": "marin",
|
|
"instructions": "backend prompt",
|
|
})
|
|
);
|
|
|
|
realtime_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_webrtc_start_posts_generated_session() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = start_mock_server().await;
|
|
let capture = RealtimeCallRequestCapture::new();
|
|
Mock::given(method("POST"))
|
|
.and(path_regex(".*/realtime/calls$"))
|
|
.and(capture.clone())
|
|
.respond_with(
|
|
ResponseTemplate::new(200)
|
|
.insert_header("Location", "/v1/realtime/calls/calls/rtc_core_test")
|
|
.set_body_string("v=answer\r\n"),
|
|
)
|
|
.mount(&server)
|
|
.await;
|
|
let realtime_server = start_websocket_server_with_headers(vec![WebSocketConnectionConfig {
|
|
requests: vec![vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_webrtc", "instructions": "backend prompt" }
|
|
})]],
|
|
response_headers: Vec::new(),
|
|
accept_delay: None,
|
|
close_after_requests: false,
|
|
}])
|
|
.await;
|
|
|
|
let realtime_ws_base_url = realtime_server.uri().to_string();
|
|
let mut builder = test_codex().with_config(move |config| {
|
|
config.experimental_realtime_ws_backend_prompt = Some("backend prompt".to_string());
|
|
config.experimental_realtime_ws_model = Some("realtime-test-model".to_string());
|
|
config.experimental_realtime_ws_startup_context = Some("startup context".to_string());
|
|
config.experimental_realtime_ws_base_url = Some(realtime_ws_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
});
|
|
let test = builder.build(&server).await?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: Some(ConversationStartTransport::Webrtc {
|
|
sdp: "v=offer\r\n".to_string(),
|
|
}),
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
// Phase 1: the client gets the SDP answer that configures its peer connection, and then the
|
|
// normal realtime event stream from the joined sideband WebSocket.
|
|
let created = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationSdp(created) => Some(Ok(created.clone())),
|
|
EventMsg::Error(err) => Some(Err(err.clone())),
|
|
_ => None,
|
|
})
|
|
.await
|
|
.unwrap_or_else(|err: ErrorEvent| panic!("conversation call create failed: {err:?}"));
|
|
assert_eq!(created.sdp, "v=answer\r\n");
|
|
|
|
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) => Some(session_id.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert_eq!(session_updated, "sess_webrtc");
|
|
|
|
// Phase 2: call creation posts the offer and generated session together, so the media leg can
|
|
// begin inference before the sideband WebSocket is ready.
|
|
let request = capture.single_request();
|
|
assert_eq!(request.url.path(), "/v1/realtime/calls");
|
|
assert_eq!(request.url.query(), None);
|
|
assert_eq!(
|
|
request
|
|
.headers
|
|
.get("authorization")
|
|
.and_then(|value| value.to_str().ok()),
|
|
Some("Bearer dummy")
|
|
);
|
|
assert_eq!(
|
|
request
|
|
.headers
|
|
.get("content-type")
|
|
.and_then(|value| value.to_str().ok()),
|
|
Some("multipart/form-data; boundary=codex-realtime-call-boundary")
|
|
);
|
|
let body = String::from_utf8(request.body).context("multipart body should be utf-8")?;
|
|
let session = r#"{"audio":{"input":{"format":{"type":"audio/pcm","rate":24000}},"output":{"voice":"cove"}},"type":"quicksilver","model":"realtime-test-model","instructions":"backend prompt\n\nstartup context"}"#;
|
|
assert_eq!(
|
|
body,
|
|
format!(
|
|
"--codex-realtime-call-boundary\r\n\
|
|
Content-Disposition: form-data; name=\"sdp\"\r\n\
|
|
Content-Type: application/sdp\r\n\
|
|
\r\n\
|
|
v=offer\r\n\
|
|
\r\n\
|
|
--codex-realtime-call-boundary\r\n\
|
|
Content-Disposition: form-data; name=\"session\"\r\n\
|
|
Content-Type: application/json\r\n\
|
|
\r\n\
|
|
{session}\r\n\
|
|
--codex-realtime-call-boundary--\r\n"
|
|
)
|
|
);
|
|
|
|
// Phase 3: the server joins that same call over the direct sideband WebSocket, sends the
|
|
// ordinary session.update, and keeps the conversation alive until the client closes it.
|
|
let session_update = realtime_server
|
|
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 0)
|
|
.await;
|
|
assert_eq!(
|
|
session_update.body_json()["type"].as_str(),
|
|
Some("session.update")
|
|
);
|
|
assert!(
|
|
websocket_request_instructions(&session_update)
|
|
.context("session.update should include instructions")?
|
|
.contains("startup context")
|
|
);
|
|
let handshake = realtime_server.single_handshake();
|
|
assert_eq!(
|
|
handshake.uri(),
|
|
"/v1/realtime?intent=quicksilver&call_id=rtc_core_test"
|
|
);
|
|
assert_eq!(
|
|
handshake.header("authorization").as_deref(),
|
|
Some("Bearer dummy")
|
|
);
|
|
|
|
test.codex.submit(Op::RealtimeConversationClose).await?;
|
|
let closed = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert!(matches!(
|
|
closed.reason.as_deref(),
|
|
Some("requested" | "transport_closed")
|
|
));
|
|
|
|
realtime_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() -> Result<()> {
|
|
if std::env::var_os(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR).is_none() {
|
|
return run_realtime_conversation_test_in_subprocess(
|
|
"suite::realtime_conversation::conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth",
|
|
Some("env-realtime-key"),
|
|
);
|
|
}
|
|
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = start_websocket_server(vec![
|
|
vec![],
|
|
vec![vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_env", "instructions": "backend prompt" }
|
|
})]],
|
|
])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
|
let test = builder.build_with_websocket_server(&server).await?;
|
|
assert!(
|
|
server
|
|
.wait_for_handshakes(/*expected*/ 1, Duration::from_secs(2))
|
|
.await
|
|
);
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let started = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())),
|
|
EventMsg::Error(err) => Some(Err(err.clone())),
|
|
_ => None,
|
|
})
|
|
.await
|
|
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
|
assert!(started.session_id.is_some());
|
|
|
|
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) => Some(session_id.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert_eq!(session_updated, "sess_env");
|
|
|
|
assert_eq!(
|
|
server.handshakes()[1].header("authorization").as_deref(),
|
|
Some("Bearer env-realtime-key")
|
|
);
|
|
|
|
test.codex.submit(Op::RealtimeConversationClose).await?;
|
|
let _closed = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
|
|
server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_transport_close_emits_closed_event() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let session_updated = vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_1", "instructions": "backend prompt" }
|
|
})];
|
|
let server = start_websocket_server(vec![vec![], vec![session_updated]]).await;
|
|
|
|
let mut builder = test_codex();
|
|
let test = builder.build_with_websocket_server(&server).await?;
|
|
assert!(
|
|
server
|
|
.wait_for_handshakes(/*expected*/ 1, Duration::from_secs(2))
|
|
.await
|
|
);
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let started = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())),
|
|
EventMsg::Error(err) => Some(Err(err.clone())),
|
|
_ => None,
|
|
})
|
|
.await
|
|
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
|
assert!(started.session_id.is_some());
|
|
|
|
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) => Some(session_id.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert_eq!(session_updated, "sess_1");
|
|
|
|
let closed = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert_eq!(closed.reason.as_deref(), Some("transport_closed"));
|
|
|
|
server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_audio_before_start_emits_error() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = start_websocket_server(vec![]).await;
|
|
let mut builder = test_codex();
|
|
let test = builder.build_with_websocket_server(&server).await?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
|
|
frame: RealtimeAudioFrame {
|
|
data: "AQID".to_string(),
|
|
sample_rate: 24000,
|
|
num_channels: 1,
|
|
samples_per_channel: Some(480),
|
|
item_id: None,
|
|
},
|
|
}))
|
|
.await?;
|
|
|
|
let err = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::Error(err) => Some(err.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert_eq!(err.codex_error_info, Some(CodexErrorInfo::BadRequest));
|
|
assert_eq!(err.message, "conversation is not running");
|
|
|
|
server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_start_preflight_failure_emits_realtime_error_only() -> Result<()> {
|
|
if std::env::var_os(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR).is_none() {
|
|
return run_realtime_conversation_test_in_subprocess(
|
|
"suite::realtime_conversation::conversation_start_preflight_failure_emits_realtime_error_only",
|
|
/*openai_api_key*/ None,
|
|
);
|
|
}
|
|
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = start_websocket_server(vec![]).await;
|
|
let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
|
let test = builder.build_with_websocket_server(&server).await?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let err = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::Error(message),
|
|
}) => Some(message.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert_eq!(err, "realtime conversation requires API key auth");
|
|
|
|
let closed = timeout(Duration::from_millis(200), async {
|
|
wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
|
|
_ => None,
|
|
})
|
|
.await
|
|
})
|
|
.await;
|
|
assert!(closed.is_err(), "preflight failure should not emit closed");
|
|
|
|
server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_start_connect_failure_emits_realtime_error_only() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = start_websocket_server(vec![]).await;
|
|
let mut builder = test_codex().with_config(|config| {
|
|
config.experimental_realtime_ws_base_url = Some("http://127.0.0.1:1".to_string());
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
});
|
|
let test = builder.build_with_websocket_server(&server).await?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let err = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::Error(message),
|
|
}) => Some(message.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert!(!err.is_empty());
|
|
|
|
let closed = timeout(Duration::from_millis(200), async {
|
|
wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
|
|
_ => None,
|
|
})
|
|
.await
|
|
})
|
|
.await;
|
|
assert!(closed.is_err(), "connect failure should not emit closed");
|
|
|
|
server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_text_before_start_emits_error() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = start_websocket_server(vec![]).await;
|
|
let mut builder = test_codex();
|
|
let test = builder.build_with_websocket_server(&server).await?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationText(ConversationTextParams {
|
|
text: "hello".to_string(),
|
|
}))
|
|
.await?;
|
|
|
|
let err = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::Error(err) => Some(err.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert_eq!(err.codex_error_info, Some(CodexErrorInfo::BadRequest));
|
|
assert_eq!(err.message, "conversation is not running");
|
|
|
|
server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_second_start_replaces_runtime() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = start_websocket_server(vec![
|
|
vec![],
|
|
vec![vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_old", "instructions": "old" }
|
|
})]],
|
|
vec![
|
|
vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_new", "instructions": "new" }
|
|
})],
|
|
vec![json!({
|
|
"type": "conversation.output_audio.delta",
|
|
"delta": "AQID",
|
|
"sample_rate": 24000,
|
|
"channels": 1
|
|
})],
|
|
],
|
|
])
|
|
.await;
|
|
let mut builder = test_codex();
|
|
let test = builder.build_with_websocket_server(&server).await?;
|
|
assert!(
|
|
server
|
|
.wait_for_handshakes(/*expected*/ 1, Duration::from_secs(2))
|
|
.await
|
|
);
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("old".to_string())),
|
|
session_id: Some("conv_old".to_string()),
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) if session_id == "sess_old" => Some(Ok(())),
|
|
EventMsg::Error(err) => Some(Err(err.clone())),
|
|
_ => None,
|
|
})
|
|
.await
|
|
.unwrap_or_else(|err: ErrorEvent| panic!("first conversation start failed: {err:?}"));
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("new".to_string())),
|
|
session_id: Some("conv_new".to_string()),
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) if session_id == "sess_new" => Some(Ok(())),
|
|
EventMsg::Error(err) => Some(Err(err.clone())),
|
|
_ => None,
|
|
})
|
|
.await
|
|
.unwrap_or_else(|err: ErrorEvent| panic!("second conversation start failed: {err:?}"));
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
|
|
frame: RealtimeAudioFrame {
|
|
data: "AQID".to_string(),
|
|
sample_rate: 24000,
|
|
num_channels: 1,
|
|
samples_per_channel: Some(480),
|
|
item_id: None,
|
|
},
|
|
}))
|
|
.await?;
|
|
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::AudioOut(frame),
|
|
}) if frame.data == "AQID" => Some(()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
|
|
let connections = server.connections();
|
|
assert_eq!(connections.len(), 3);
|
|
assert_eq!(connections[1].len(), 1);
|
|
let old_instructions =
|
|
websocket_request_instructions(&connections[1][0]).expect("old session instructions");
|
|
assert!(old_instructions.starts_with("old"));
|
|
assert_eq!(
|
|
server.handshakes()[1].header("x-session-id").as_deref(),
|
|
Some("conv_old")
|
|
);
|
|
assert_eq!(connections[2].len(), 2);
|
|
let new_instructions =
|
|
websocket_request_instructions(&connections[2][0]).expect("new session instructions");
|
|
assert!(new_instructions.starts_with("new"));
|
|
assert_eq!(
|
|
server.handshakes()[2].header("x-session-id").as_deref(),
|
|
Some("conv_new")
|
|
);
|
|
assert_eq!(
|
|
connections[2][1].body_json()["type"].as_str(),
|
|
Some("input_audio_buffer.append")
|
|
);
|
|
|
|
server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let startup_server = start_websocket_server(vec![vec![]]).await;
|
|
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_override", "instructions": "backend prompt" }
|
|
})]]])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config({
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
}
|
|
});
|
|
let test = builder.build_with_websocket_server(&startup_server).await?;
|
|
assert!(
|
|
startup_server
|
|
.wait_for_handshakes(/*expected*/ 1, Duration::from_secs(2))
|
|
.await
|
|
);
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) => Some(session_id.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert_eq!(session_updated, "sess_override");
|
|
|
|
let startup_connections = startup_server.connections();
|
|
assert_eq!(startup_connections.len(), 1);
|
|
|
|
let realtime_connections = realtime_server.connections();
|
|
assert_eq!(realtime_connections.len(), 1);
|
|
assert_eq!(
|
|
realtime_connections[0][0].body_json()["type"].as_str(),
|
|
Some("session.update")
|
|
);
|
|
|
|
startup_server.shutdown().await;
|
|
realtime_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_uses_default_realtime_backend_prompt() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = start_websocket_server(vec![
|
|
vec![],
|
|
vec![vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_default", "instructions": "default" }
|
|
})]],
|
|
])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config(|config| {
|
|
config.experimental_realtime_ws_startup_context =
|
|
Some("controlled startup context".to_string());
|
|
});
|
|
let test = builder.build_with_websocket_server(&server).await?;
|
|
assert!(
|
|
server
|
|
.wait_for_handshakes(/*expected*/ 1, Duration::from_secs(2))
|
|
.await
|
|
);
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: None,
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) => Some(session_id.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert_eq!(session_updated, "sess_default");
|
|
|
|
let connections = server.connections();
|
|
assert_eq!(connections.len(), 2);
|
|
let instructions =
|
|
websocket_request_instructions(&connections[1][0]).expect("default session instructions");
|
|
assert_eq!(
|
|
instructions,
|
|
format!(
|
|
"{}\n\ncontrolled startup context",
|
|
expected_realtime_backend_prompt()
|
|
)
|
|
);
|
|
|
|
server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_uses_empty_instructions_for_null_or_empty_prompt() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = start_websocket_server(vec![
|
|
vec![],
|
|
vec![vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_null", "instructions": "" }
|
|
})]],
|
|
vec![vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_empty", "instructions": "" }
|
|
})]],
|
|
])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config(|config| {
|
|
config.experimental_realtime_ws_startup_context = Some(String::new());
|
|
});
|
|
let test = builder.build_with_websocket_server(&server).await?;
|
|
assert!(
|
|
server
|
|
.wait_for_handshakes(/*expected*/ 1, Duration::from_secs(2))
|
|
.await
|
|
);
|
|
|
|
for (prompt, expected_session_id) in [
|
|
(Some(None), "sess_null"),
|
|
(Some(Some(String::new())), "sess_empty"),
|
|
] {
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt,
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) => Some(session_id.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert_eq!(session_updated, expected_session_id);
|
|
|
|
test.codex.submit(Op::RealtimeConversationClose).await?;
|
|
let _closed = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
}
|
|
|
|
let connections = server.connections();
|
|
assert_eq!(connections.len(), 3);
|
|
let null_instructions =
|
|
websocket_request_instructions(&connections[1][0]).expect("null prompt instructions");
|
|
let empty_instructions =
|
|
websocket_request_instructions(&connections[2][0]).expect("empty prompt instructions");
|
|
assert_eq!(null_instructions, "");
|
|
assert_eq!(empty_instructions, "");
|
|
|
|
server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_uses_explicit_start_voice() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = start_websocket_server(vec![
|
|
vec![],
|
|
vec![vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_voice", "instructions": "backend prompt" }
|
|
})]],
|
|
])
|
|
.await;
|
|
let test = test_codex().build_with_websocket_server(&server).await?;
|
|
assert!(
|
|
server
|
|
.wait_for_handshakes(/*expected*/ 1, Duration::from_secs(2))
|
|
.await
|
|
);
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: Some(RealtimeVoice::Breeze),
|
|
}))
|
|
.await?;
|
|
|
|
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) => Some(session_id.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert_eq!(session_updated, "sess_voice");
|
|
|
|
let connections = server.connections();
|
|
assert_eq!(
|
|
connections[1][0].body_json()["session"]["audio"]["output"]["voice"],
|
|
"breeze"
|
|
);
|
|
|
|
server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_uses_configured_realtime_voice() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = start_websocket_server(vec![
|
|
vec![],
|
|
vec![vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_config_voice", "instructions": "backend prompt" }
|
|
})]],
|
|
])
|
|
.await;
|
|
let mut builder = test_codex().with_config(|config| {
|
|
config.realtime.voice = Some(RealtimeVoice::Cove);
|
|
});
|
|
let test = builder.build_with_websocket_server(&server).await?;
|
|
assert!(
|
|
server
|
|
.wait_for_handshakes(/*expected*/ 1, Duration::from_secs(2))
|
|
.await
|
|
);
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) => Some(session_id.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert_eq!(session_updated, "sess_config_voice");
|
|
|
|
let connections = server.connections();
|
|
assert_eq!(
|
|
connections[1][0].body_json()["session"]["audio"]["output"]["voice"],
|
|
"cove"
|
|
);
|
|
|
|
server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_rejects_voice_for_wrong_realtime_version() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let api_server = start_mock_server().await;
|
|
let mut builder = test_codex().with_config(|config| {
|
|
config.realtime.version = RealtimeWsVersion::V2;
|
|
});
|
|
let test = builder.build(&api_server).await?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: Some(RealtimeVoice::Cove),
|
|
}))
|
|
.await?;
|
|
|
|
let error = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::Error(message),
|
|
}) => Some(message.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert!(error.contains("realtime voice `cove` is not supported for v2"));
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = start_websocket_server(vec![
|
|
vec![],
|
|
vec![vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_override", "instructions": "prompt from config" }
|
|
})]],
|
|
])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config(|config| {
|
|
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
|
|
});
|
|
let test = builder.build_with_websocket_server(&server).await?;
|
|
assert!(
|
|
server
|
|
.wait_for_handshakes(/*expected*/ 1, Duration::from_secs(2))
|
|
.await
|
|
);
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("prompt from op".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) => Some(session_id.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert_eq!(session_updated, "sess_override");
|
|
|
|
let connections = server.connections();
|
|
assert_eq!(connections.len(), 2);
|
|
let overridden_instructions = websocket_request_instructions(&connections[1][0])
|
|
.expect("overridden session instructions");
|
|
assert!(overridden_instructions.starts_with("prompt from config"));
|
|
|
|
server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_uses_experimental_realtime_ws_startup_context_override() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let startup_server = start_websocket_server(vec![vec![]]).await;
|
|
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_custom_context", "instructions": "prompt from config" }
|
|
})]]])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config({
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
|
|
config.experimental_realtime_ws_startup_context =
|
|
Some("custom startup context".to_string());
|
|
}
|
|
});
|
|
let test = builder.build_with_websocket_server(&startup_server).await?;
|
|
seed_recent_thread(
|
|
&test,
|
|
"Recent work: cleaned up startup flows and reviewed websocket routing.",
|
|
"Investigate realtime startup context",
|
|
"custom-context",
|
|
)
|
|
.await?;
|
|
fs::create_dir_all(test.workspace_path("docs"))?;
|
|
fs::write(test.workspace_path("README.md"), "workspace marker")?;
|
|
assert!(
|
|
startup_server
|
|
.wait_for_handshakes(/*expected*/ 1, Duration::from_secs(2))
|
|
.await
|
|
);
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("prompt from op".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let startup_context_request = wait_for_matching_websocket_request(
|
|
&realtime_server,
|
|
"startup context request with instructions",
|
|
|request| websocket_request_instructions(request).is_some(),
|
|
)
|
|
.await;
|
|
let instructions = websocket_request_instructions(&startup_context_request)
|
|
.expect("custom startup context request should contain instructions");
|
|
|
|
assert_eq!(instructions, "prompt from config\n\ncustom startup context");
|
|
assert!(!instructions.contains(STARTUP_CONTEXT_HEADER));
|
|
assert!(!instructions.contains("## Machine / Workspace Map"));
|
|
|
|
startup_server.shutdown().await;
|
|
realtime_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_disables_realtime_startup_context_with_empty_override() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let startup_server = start_websocket_server(vec![vec![]]).await;
|
|
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_no_context", "instructions": "prompt from config" }
|
|
})]]])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config({
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
|
|
config.experimental_realtime_ws_startup_context = Some(String::new());
|
|
}
|
|
});
|
|
let test = builder.build_with_websocket_server(&startup_server).await?;
|
|
seed_recent_thread(
|
|
&test,
|
|
"Recent work: cleaned up startup flows and reviewed websocket routing.",
|
|
"Investigate realtime startup context",
|
|
"no-context",
|
|
)
|
|
.await?;
|
|
fs::create_dir_all(test.workspace_path("docs"))?;
|
|
fs::write(test.workspace_path("README.md"), "workspace marker")?;
|
|
assert!(
|
|
startup_server
|
|
.wait_for_handshakes(/*expected*/ 1, Duration::from_secs(2))
|
|
.await
|
|
);
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("prompt from op".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let startup_context_request = wait_for_matching_websocket_request(
|
|
&realtime_server,
|
|
"startup context disable request with instructions",
|
|
|request| websocket_request_instructions(request).is_some(),
|
|
)
|
|
.await;
|
|
let instructions = websocket_request_instructions(&startup_context_request)
|
|
.expect("startup context disable request should contain instructions");
|
|
|
|
assert_eq!(instructions, "prompt from config");
|
|
assert!(!instructions.contains(STARTUP_CONTEXT_HEADER));
|
|
assert!(!instructions.contains("## Machine / Workspace Map"));
|
|
|
|
startup_server.shutdown().await;
|
|
realtime_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_start_injects_startup_context_from_thread_history() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let startup_server = start_websocket_server(vec![vec![]]).await;
|
|
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_context", "instructions": "backend prompt" }
|
|
})]]])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config({
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
}
|
|
});
|
|
let test = builder.build_with_websocket_server(&startup_server).await?;
|
|
seed_recent_thread(
|
|
&test,
|
|
"Recent work: cleaned up startup flows and reviewed websocket routing.",
|
|
"Investigate realtime startup context",
|
|
"latest",
|
|
)
|
|
.await?;
|
|
fs::create_dir_all(test.workspace_path("docs"))?;
|
|
fs::write(test.workspace_path("README.md"), "workspace marker")?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let startup_context_request = wait_for_matching_websocket_request(
|
|
&realtime_server,
|
|
"startup context request with instructions",
|
|
|request| websocket_request_instructions(request).is_some(),
|
|
)
|
|
.await;
|
|
let startup_context = websocket_request_instructions(&startup_context_request)
|
|
.expect("startup context request should contain instructions");
|
|
|
|
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
|
|
assert!(!startup_context.contains("## User"));
|
|
assert!(startup_context.contains("### "));
|
|
assert!(startup_context.contains("Recent sessions: 1"));
|
|
assert!(startup_context.contains("Latest branch: branch-latest"));
|
|
assert!(startup_context.contains("User asks:"));
|
|
assert!(startup_context.contains("Investigate realtime startup context"));
|
|
assert!(startup_context.contains("## Machine / Workspace Map"));
|
|
assert!(startup_context.contains("README.md"));
|
|
assert!(!startup_context.contains(MEMORY_PROMPT_PHRASE));
|
|
|
|
startup_server.shutdown().await;
|
|
realtime_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_startup_context_current_thread_selects_many_turns_by_budget() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let api_server = start_mock_server().await;
|
|
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_current_thread_budget", "instructions": "backend prompt" }
|
|
})]]])
|
|
.await;
|
|
|
|
let latest_long_user_turn = format!(
|
|
"latest-long-start {} latest-long-middle {} latest-long-end",
|
|
"head detail ".repeat(120),
|
|
"tail detail ".repeat(170),
|
|
);
|
|
let mut user_turns = (1..=7)
|
|
.map(|index| {
|
|
format!(
|
|
"short-turn-{index}-start {} short-turn-{index}-end",
|
|
"detail ".repeat(86)
|
|
)
|
|
})
|
|
.collect::<Vec<_>>();
|
|
user_turns.push(latest_long_user_turn.clone());
|
|
|
|
let mut builder = test_codex().with_config({
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
}
|
|
});
|
|
let test = builder.build(&api_server).await?;
|
|
|
|
// Seed completed turns through a resumed thread so this remains an
|
|
// end-to-end startup-context test without paying for a model turn per
|
|
// fixture entry in platform CI.
|
|
let history = user_turns
|
|
.into_iter()
|
|
.enumerate()
|
|
.flat_map(|(index, user_turn)| {
|
|
let turn_number = index + 1;
|
|
let assistant_turn = format!("assistant turn {turn_number}");
|
|
[
|
|
RolloutItem::ResponseItem(ResponseItem::Message {
|
|
id: None,
|
|
role: "user".to_string(),
|
|
content: vec![ContentItem::InputText { text: user_turn }],
|
|
end_turn: None,
|
|
phase: None,
|
|
}),
|
|
RolloutItem::ResponseItem(ResponseItem::Message {
|
|
id: None,
|
|
role: "assistant".to_string(),
|
|
content: vec![ContentItem::OutputText {
|
|
text: assistant_turn,
|
|
}],
|
|
end_turn: None,
|
|
phase: None,
|
|
}),
|
|
]
|
|
})
|
|
.collect::<Vec<_>>();
|
|
test.codex.shutdown_and_wait().await?;
|
|
let resumed_thread = test
|
|
.thread_manager
|
|
.resume_thread_with_history(
|
|
test.config.clone(),
|
|
InitialHistory::Forked(history),
|
|
auth_manager_from_auth(CodexAuth::from_api_key("dummy")),
|
|
/*persist_extended_history*/ false,
|
|
/*parent_trace*/ None,
|
|
)
|
|
.await?;
|
|
let codex = resumed_thread.thread;
|
|
|
|
codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let startup_context_request = wait_for_matching_websocket_request(
|
|
&realtime_server,
|
|
"current thread budget startup context request with instructions",
|
|
|request| websocket_request_instructions(request).is_some(),
|
|
)
|
|
.await;
|
|
let startup_context = websocket_request_instructions(&startup_context_request)
|
|
.expect("startup context request should contain instructions");
|
|
|
|
// Isolate only the Current Thread section; the startup prompt may also include
|
|
// workspace and notes sections after it.
|
|
let current_thread_start = startup_context
|
|
.find("## Current Thread")
|
|
.expect("startup context should include current thread section");
|
|
let current_thread_and_rest = &startup_context[current_thread_start..];
|
|
let current_thread_end = [
|
|
"\n## Recent Work",
|
|
"\n## Machine / Workspace Map",
|
|
"\n## Notes",
|
|
]
|
|
.iter()
|
|
.filter_map(|marker| current_thread_and_rest.find(marker))
|
|
.min()
|
|
.unwrap_or(current_thread_and_rest.len());
|
|
let current_thread = ¤t_thread_and_rest[..current_thread_end];
|
|
|
|
let rendered_turns = current_thread
|
|
.split("\n### ")
|
|
.skip(1)
|
|
.map(|turn| format!("### {turn}"))
|
|
.collect::<Vec<_>>();
|
|
let over_budget_turns = rendered_turns
|
|
.iter()
|
|
.filter_map(|turn| {
|
|
let token_count = turn.len().div_ceil(4);
|
|
(token_count > 300).then(|| {
|
|
(
|
|
turn.lines().next().unwrap_or_default().to_string(),
|
|
token_count,
|
|
)
|
|
})
|
|
})
|
|
.collect::<Vec<_>>();
|
|
let latest_rendered_source =
|
|
format!("### Latest turn\nUser:\n{latest_long_user_turn}\n\nAssistant:\nassistant turn 8");
|
|
|
|
// Snapshot the actual section so turn order, oldest-first omission, and
|
|
// start/end truncation behavior are reviewed together.
|
|
let snapshot = format!(
|
|
"latest_source_tokens: {}\nrendered_turn_count: {}\nover_budget_turns: {over_budget_turns:?}\n\n{current_thread}",
|
|
latest_rendered_source.len().div_ceil(4),
|
|
rendered_turns.len(),
|
|
);
|
|
insta::assert_snapshot!(
|
|
"conversation_startup_context_current_thread_selects_many_turns_by_budget",
|
|
snapshot
|
|
);
|
|
|
|
// The input includes a turn over 300 approximate tokens, and every rendered
|
|
// turn still fits the per-turn cap after labels and truncation markers.
|
|
assert_eq!(
|
|
(
|
|
latest_rendered_source.len().div_ceil(4) > 300,
|
|
over_budget_turns,
|
|
),
|
|
(true, Vec::<(String, usize)>::new()),
|
|
);
|
|
|
|
codex.shutdown_and_wait().await?;
|
|
realtime_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let startup_server = start_websocket_server(vec![vec![]]).await;
|
|
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_workspace", "instructions": "backend prompt" }
|
|
})]]])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config({
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
}
|
|
});
|
|
let test = builder.build_with_websocket_server(&startup_server).await?;
|
|
fs::create_dir_all(test.workspace_path("codex-rs/core"))?;
|
|
fs::write(test.workspace_path("notes.txt"), "workspace marker")?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let startup_context_request = wait_for_matching_websocket_request(
|
|
&realtime_server,
|
|
"workspace-map startup context request with instructions",
|
|
|request| websocket_request_instructions(request).is_some(),
|
|
)
|
|
.await;
|
|
let startup_context = websocket_request_instructions(&startup_context_request)
|
|
.expect("startup context request should contain instructions");
|
|
|
|
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
|
|
assert!(startup_context.contains("## Machine / Workspace Map"));
|
|
assert!(startup_context.contains("notes.txt"));
|
|
assert!(startup_context.contains("codex-rs/"));
|
|
|
|
startup_server.shutdown().await;
|
|
realtime_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_startup_context_is_truncated_and_sent_once_per_start() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let startup_server = start_websocket_server(vec![vec![]]).await;
|
|
let realtime_server = start_websocket_server(vec![vec![
|
|
vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_truncated", "instructions": "backend prompt" }
|
|
})],
|
|
vec![],
|
|
]])
|
|
.await;
|
|
|
|
let oversized_summary = "recent work ".repeat(3_500);
|
|
let mut builder = test_codex().with_config({
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
}
|
|
});
|
|
let test = builder.build_with_websocket_server(&startup_server).await?;
|
|
seed_recent_thread(&test, &oversized_summary, "summary", "oversized").await?;
|
|
fs::write(test.workspace_path("marker.txt"), "marker")?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let startup_context_request = wait_for_matching_websocket_request(
|
|
&realtime_server,
|
|
"truncated startup context request with instructions",
|
|
|request| websocket_request_instructions(request).is_some(),
|
|
)
|
|
.await;
|
|
let startup_context = websocket_request_instructions(&startup_context_request)
|
|
.expect("startup context request should contain instructions");
|
|
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
|
|
assert!(startup_context.len() <= 20_500);
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationText(ConversationTextParams {
|
|
text: "hello".to_string(),
|
|
}))
|
|
.await?;
|
|
|
|
let explicit_text_request = wait_for_matching_websocket_request(
|
|
&realtime_server,
|
|
"explicit realtime text request",
|
|
|request| websocket_request_text(request).as_deref() == Some("hello"),
|
|
)
|
|
.await;
|
|
assert_eq!(
|
|
websocket_request_text(&explicit_text_request),
|
|
Some("hello".to_string())
|
|
);
|
|
|
|
startup_server.shutdown().await;
|
|
realtime_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_mirrors_assistant_message_text_to_realtime_handoff() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let api_server = start_mock_server().await;
|
|
let _response_mock = responses::mount_sse_once(
|
|
&api_server,
|
|
responses::sse(vec![
|
|
responses::ev_response_created("resp_1"),
|
|
responses::ev_assistant_message("msg_1", "assistant says hi"),
|
|
responses::ev_completed("resp_1"),
|
|
]),
|
|
)
|
|
.await;
|
|
|
|
let realtime_server = start_websocket_server(vec![vec![
|
|
vec![
|
|
json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_1", "instructions": "backend prompt" }
|
|
}),
|
|
json!({
|
|
"type": "conversation.input_transcript.delta",
|
|
"delta": "delegate hello"
|
|
}),
|
|
json!({
|
|
"type": "conversation.handoff.requested",
|
|
"handoff_id": "handoff_1",
|
|
"item_id": "item_1",
|
|
"input_transcript": "delegate hello"
|
|
}),
|
|
],
|
|
vec![],
|
|
]])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config({
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
}
|
|
});
|
|
let test = builder.build(&api_server).await?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) => Some(session_id.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert_eq!(session_updated, "sess_1");
|
|
|
|
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::HandoffRequested(handoff),
|
|
}) if handoff.handoff_id == "handoff_1" => Some(()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
|
|
wait_for_event(&test.codex, |event| {
|
|
matches!(event, EventMsg::TurnComplete(_))
|
|
})
|
|
.await;
|
|
|
|
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
|
|
while tokio::time::Instant::now() < deadline {
|
|
let connections = realtime_server.connections();
|
|
if connections.len() == 1 && connections[0].len() >= 2 {
|
|
break;
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
|
}
|
|
|
|
let realtime_connections = realtime_server.connections();
|
|
assert_eq!(realtime_connections.len(), 1);
|
|
assert_eq!(realtime_connections[0].len(), 2);
|
|
assert_eq!(
|
|
realtime_connections[0][0].body_json()["type"].as_str(),
|
|
Some("session.update")
|
|
);
|
|
assert_eq!(
|
|
realtime_connections[0][1].body_json()["type"].as_str(),
|
|
Some("conversation.handoff.append")
|
|
);
|
|
assert_eq!(
|
|
realtime_connections[0][1].body_json()["handoff_id"].as_str(),
|
|
Some("handoff_1")
|
|
);
|
|
assert_eq!(
|
|
realtime_connections[0][1].body_json()["output_text"].as_str(),
|
|
Some("\"Agent Final Message\":\n\nassistant says hi")
|
|
);
|
|
|
|
realtime_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn conversation_handoff_persists_across_item_done_until_turn_complete() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let (gate_second_message_tx, gate_second_message_rx) = oneshot::channel();
|
|
let first_chunks = vec![
|
|
StreamingSseChunk {
|
|
gate: None,
|
|
body: sse_event(responses::ev_response_created("resp-1")),
|
|
},
|
|
StreamingSseChunk {
|
|
gate: None,
|
|
body: sse_event(responses::ev_assistant_message(
|
|
"msg-1",
|
|
"assistant message 1",
|
|
)),
|
|
},
|
|
StreamingSseChunk {
|
|
gate: Some(gate_second_message_rx),
|
|
body: sse_event(responses::ev_assistant_message(
|
|
"msg-2",
|
|
"assistant message 2",
|
|
)),
|
|
},
|
|
StreamingSseChunk {
|
|
gate: None,
|
|
body: sse_event(responses::ev_completed("resp-1")),
|
|
},
|
|
];
|
|
let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await;
|
|
|
|
let realtime_server = start_websocket_server(vec![vec![
|
|
vec![
|
|
json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_item_done", "instructions": "backend prompt" }
|
|
}),
|
|
json!({
|
|
"type": "conversation.input_transcript.delta",
|
|
"delta": "delegate now"
|
|
}),
|
|
json!({
|
|
"type": "conversation.handoff.requested",
|
|
"handoff_id": "handoff_item_done",
|
|
"item_id": "item_item_done",
|
|
"input_transcript": "delegate now"
|
|
}),
|
|
],
|
|
vec![json!({
|
|
"type": "conversation.item.done",
|
|
"item": { "id": "item_item_done" }
|
|
})],
|
|
vec![],
|
|
]])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config({
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
}
|
|
});
|
|
let test = builder.build_with_streaming_server(&api_server).await?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) if session_id == "sess_item_done" => Some(()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
|
|
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::HandoffRequested(handoff),
|
|
}) if handoff.handoff_id == "handoff_item_done" => Some(()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
|
|
let first_append = realtime_server
|
|
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 1)
|
|
.await;
|
|
assert_eq!(
|
|
first_append.body_json()["type"].as_str(),
|
|
Some("conversation.handoff.append")
|
|
);
|
|
assert_eq!(
|
|
first_append.body_json()["handoff_id"].as_str(),
|
|
Some("handoff_item_done")
|
|
);
|
|
assert_eq!(
|
|
first_append.body_json()["output_text"].as_str(),
|
|
Some("\"Agent Final Message\":\n\nassistant message 1")
|
|
);
|
|
|
|
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::ConversationItemDone { item_id },
|
|
}) if item_id == "item_item_done" => Some(()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
|
|
let _ = gate_second_message_tx.send(());
|
|
|
|
let second_append = realtime_server
|
|
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 2)
|
|
.await;
|
|
assert_eq!(
|
|
second_append.body_json()["type"].as_str(),
|
|
Some("conversation.handoff.append")
|
|
);
|
|
assert_eq!(
|
|
second_append.body_json()["handoff_id"].as_str(),
|
|
Some("handoff_item_done")
|
|
);
|
|
assert_eq!(
|
|
second_append.body_json()["output_text"].as_str(),
|
|
Some("\"Agent Final Message\":\n\nassistant message 2")
|
|
);
|
|
|
|
let completion = completions
|
|
.into_iter()
|
|
.next()
|
|
.expect("missing delegated turn completion");
|
|
completion
|
|
.await
|
|
.expect("delegated turn request did not complete");
|
|
wait_for_event(&test.codex, |event| {
|
|
matches!(event, EventMsg::TurnComplete(_))
|
|
})
|
|
.await;
|
|
|
|
realtime_server.shutdown().await;
|
|
api_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
fn sse_event(event: Value) -> String {
|
|
responses::sse(vec![event])
|
|
}
|
|
|
|
fn message_input_texts(body: &Value, role: &str) -> Vec<String> {
|
|
body.get("input")
|
|
.and_then(Value::as_array)
|
|
.into_iter()
|
|
.flatten()
|
|
.filter(|item| item.get("type").and_then(Value::as_str) == Some("message"))
|
|
.filter(|item| item.get("role").and_then(Value::as_str) == Some(role))
|
|
.filter_map(|item| item.get("content").and_then(Value::as_array))
|
|
.flatten()
|
|
.filter(|span| span.get("type").and_then(Value::as_str) == Some("input_text"))
|
|
.filter_map(|span| span.get("text").and_then(Value::as_str).map(str::to_owned))
|
|
.collect()
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn inbound_handoff_request_starts_turn() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let api_server = start_mock_server().await;
|
|
let response_mock = responses::mount_sse_once(
|
|
&api_server,
|
|
responses::sse(vec![
|
|
responses::ev_response_created("resp-1"),
|
|
responses::ev_assistant_message("msg-1", "ok"),
|
|
responses::ev_completed("resp-1"),
|
|
]),
|
|
)
|
|
.await;
|
|
|
|
let realtime_server = start_websocket_server(vec![vec![vec![
|
|
json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_inbound", "instructions": "backend prompt" }
|
|
}),
|
|
json!({
|
|
"type": "conversation.input_transcript.delta",
|
|
"delta": "text from realtime"
|
|
}),
|
|
json!({
|
|
"type": "conversation.handoff.requested",
|
|
"handoff_id": "handoff_inbound",
|
|
"item_id": "item_inbound",
|
|
"input_transcript": "text from realtime"
|
|
}),
|
|
]]])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config({
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
}
|
|
});
|
|
let test = builder.build(&api_server).await?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) => Some(session_id.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
assert_eq!(session_updated, "sess_inbound");
|
|
|
|
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::HandoffRequested(handoff),
|
|
}) if handoff.handoff_id == "handoff_inbound"
|
|
&& handoff.input_transcript == "text from realtime" =>
|
|
{
|
|
Some(())
|
|
}
|
|
_ => None,
|
|
})
|
|
.await;
|
|
|
|
wait_for_event(&test.codex, |event| {
|
|
matches!(event, EventMsg::TurnComplete(_))
|
|
})
|
|
.await;
|
|
|
|
let request = response_mock.single_request();
|
|
let user_texts = request.message_input_texts("user");
|
|
assert!(
|
|
user_texts
|
|
.iter()
|
|
.any(|text| text == "user: text from realtime")
|
|
);
|
|
|
|
realtime_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn inbound_handoff_request_uses_active_transcript() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let api_server = start_mock_server().await;
|
|
let response_mock = responses::mount_sse_once(
|
|
&api_server,
|
|
responses::sse(vec![
|
|
responses::ev_response_created("resp-1"),
|
|
responses::ev_assistant_message("msg-1", "ok"),
|
|
responses::ev_completed("resp-1"),
|
|
]),
|
|
)
|
|
.await;
|
|
|
|
let realtime_server = start_websocket_server(vec![vec![vec![
|
|
json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_inbound_multi", "instructions": "backend prompt" }
|
|
}),
|
|
json!({
|
|
"type": "conversation.output_transcript.delta",
|
|
"delta": "assistant context"
|
|
}),
|
|
json!({
|
|
"type": "conversation.input_transcript.delta",
|
|
"delta": "delegated query"
|
|
}),
|
|
json!({
|
|
"type": "conversation.output_transcript.delta",
|
|
"delta": "assist confirm"
|
|
}),
|
|
json!({
|
|
"type": "conversation.handoff.requested",
|
|
"handoff_id": "handoff_inbound_multi",
|
|
"item_id": "item_inbound_multi",
|
|
"input_transcript": "ignored"
|
|
}),
|
|
]]])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config({
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
}
|
|
});
|
|
let test = builder.build(&api_server).await?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) => Some(session_id.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
|
|
wait_for_event(&test.codex, |event| {
|
|
matches!(event, EventMsg::TurnComplete(_))
|
|
})
|
|
.await;
|
|
|
|
let request = response_mock.single_request();
|
|
let user_texts = request.message_input_texts("user");
|
|
assert!(user_texts.iter().any(|text| text
|
|
== "assistant: assistant context\nuser: delegated query\nassistant: assist confirm"));
|
|
|
|
realtime_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn inbound_handoff_request_clears_active_transcript_after_each_handoff() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let api_server = start_mock_server().await;
|
|
let response_mock = responses::mount_sse_sequence(
|
|
&api_server,
|
|
vec![
|
|
responses::sse(vec![
|
|
responses::ev_response_created("resp-1"),
|
|
responses::ev_assistant_message("msg-1", "first ok"),
|
|
responses::ev_completed("resp-1"),
|
|
]),
|
|
responses::sse(vec![
|
|
responses::ev_response_created("resp-2"),
|
|
responses::ev_assistant_message("msg-2", "second ok"),
|
|
responses::ev_completed("resp-2"),
|
|
]),
|
|
],
|
|
)
|
|
.await;
|
|
|
|
let realtime_server = start_websocket_server(vec![vec![
|
|
vec![
|
|
json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_inbound_clear", "instructions": "backend prompt" }
|
|
}),
|
|
json!({
|
|
"type": "conversation.input_transcript.delta",
|
|
"delta": "first question"
|
|
}),
|
|
json!({
|
|
"type": "conversation.handoff.requested",
|
|
"handoff_id": "handoff_inbound_clear_1",
|
|
"item_id": "item_inbound_clear_1",
|
|
"input_transcript": "first question"
|
|
}),
|
|
],
|
|
vec![],
|
|
vec![
|
|
json!({
|
|
"type": "conversation.input_transcript.delta",
|
|
"delta": "second question"
|
|
}),
|
|
json!({
|
|
"type": "conversation.handoff.requested",
|
|
"handoff_id": "handoff_inbound_clear_2",
|
|
"item_id": "item_inbound_clear_2",
|
|
"input_transcript": "second question"
|
|
}),
|
|
],
|
|
]])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config({
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
}
|
|
});
|
|
let test = builder.build(&api_server).await?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) => Some(session_id.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
|
|
wait_for_event(&test.codex, |event| {
|
|
matches!(event, EventMsg::TurnComplete(_))
|
|
})
|
|
.await;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
|
|
frame: RealtimeAudioFrame {
|
|
data: "AQID".to_string(),
|
|
sample_rate: 24000,
|
|
num_channels: 1,
|
|
samples_per_channel: Some(480),
|
|
item_id: None,
|
|
},
|
|
}))
|
|
.await?;
|
|
|
|
wait_for_event(&test.codex, |event| {
|
|
matches!(event, EventMsg::TurnComplete(_))
|
|
})
|
|
.await;
|
|
|
|
let requests = response_mock.requests();
|
|
assert_eq!(requests.len(), 2);
|
|
|
|
let first_user_texts = requests[0].message_input_texts("user");
|
|
assert!(
|
|
first_user_texts
|
|
.iter()
|
|
.any(|text| text == "user: first question")
|
|
);
|
|
|
|
let second_user_texts = requests[1].message_input_texts("user");
|
|
assert!(
|
|
second_user_texts
|
|
.iter()
|
|
.any(|text| text == "user: second question")
|
|
);
|
|
assert!(
|
|
!second_user_texts
|
|
.iter()
|
|
.any(|text| text == "user: first question\nuser: second question")
|
|
);
|
|
|
|
realtime_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn inbound_conversation_item_does_not_start_turn_and_still_forwards_audio() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let api_server = start_mock_server().await;
|
|
|
|
let realtime_server = start_websocket_server(vec![vec![vec![
|
|
json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_ignore_item", "instructions": "backend prompt" }
|
|
}),
|
|
json!({
|
|
"type": "conversation.item.added",
|
|
"item": {
|
|
"type": "message",
|
|
"role": "user",
|
|
"content": [{"type": "text", "text": "echoed local text"}]
|
|
}
|
|
}),
|
|
json!({
|
|
"type": "conversation.output_audio.delta",
|
|
"delta": "AQID",
|
|
"sample_rate": 24000,
|
|
"channels": 1
|
|
}),
|
|
]]])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config({
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
}
|
|
});
|
|
let test = builder.build(&api_server).await?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) if session_id == "sess_ignore_item" => Some(()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
|
|
let audio_out = tokio::time::timeout(
|
|
Duration::from_millis(500),
|
|
wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::AudioOut(frame),
|
|
}) => Some(frame.clone()),
|
|
_ => None,
|
|
}),
|
|
)
|
|
.await
|
|
.expect("timed out waiting for realtime audio after conversation item");
|
|
assert_eq!(audio_out.data, "AQID");
|
|
|
|
let unexpected_turn_started = tokio::time::timeout(
|
|
Duration::from_millis(200),
|
|
wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::TurnStarted(_) => Some(()),
|
|
_ => None,
|
|
}),
|
|
)
|
|
.await;
|
|
assert!(unexpected_turn_started.is_err());
|
|
|
|
realtime_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_audio() -> Result<()>
|
|
{
|
|
skip_if_no_network!(Ok(()));
|
|
let start = std::time::Instant::now();
|
|
|
|
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
|
|
let first_chunks = vec![
|
|
StreamingSseChunk {
|
|
gate: None,
|
|
body: sse_event(responses::ev_response_created("resp-1")),
|
|
},
|
|
StreamingSseChunk {
|
|
gate: None,
|
|
body: sse_event(responses::ev_assistant_message(
|
|
"msg-1",
|
|
"assistant says hi",
|
|
)),
|
|
},
|
|
StreamingSseChunk {
|
|
gate: Some(gate_completed_rx),
|
|
body: sse_event(responses::ev_completed("resp-1")),
|
|
},
|
|
];
|
|
let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await;
|
|
|
|
let realtime_server = start_websocket_server(vec![vec![
|
|
vec![
|
|
json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_echo_guard", "instructions": "backend prompt" }
|
|
}),
|
|
json!({
|
|
"type": "conversation.input_transcript.delta",
|
|
"delta": "delegate now"
|
|
}),
|
|
json!({
|
|
"type": "conversation.handoff.requested",
|
|
"handoff_id": "handoff_echo_guard",
|
|
"item_id": "item_echo_guard",
|
|
"input_transcript": "delegate now"
|
|
}),
|
|
],
|
|
vec![
|
|
json!({
|
|
"type": "conversation.item.added",
|
|
"item": {
|
|
"type": "message",
|
|
"role": "user",
|
|
"content": [{"type": "text", "text": "assistant says hi"}]
|
|
}
|
|
}),
|
|
json!({
|
|
"type": "conversation.output_audio.delta",
|
|
"delta": "AQID",
|
|
"sample_rate": 24000,
|
|
"channels": 1
|
|
}),
|
|
],
|
|
]])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_model("gpt-5.1").with_config({
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
}
|
|
});
|
|
let test = builder.build_with_streaming_server(&api_server).await?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) if session_id == "sess_echo_guard" => Some(()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
|
|
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::HandoffRequested(handoff),
|
|
}) if handoff.input_transcript == "delegate now" => Some(()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
eprintln!(
|
|
"[realtime test +{}ms] saw trigger text={:?}",
|
|
start.elapsed().as_millis(),
|
|
"delegate now"
|
|
);
|
|
|
|
let mirrored_request = realtime_server
|
|
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 1)
|
|
.await;
|
|
let mirrored_request_body = mirrored_request.body_json();
|
|
eprintln!(
|
|
"[realtime test +{}ms] saw mirrored request type={:?} handoff_id={:?} text={:?}",
|
|
start.elapsed().as_millis(),
|
|
mirrored_request_body["type"].as_str(),
|
|
mirrored_request_body["handoff_id"].as_str(),
|
|
mirrored_request_body["output_text"].as_str(),
|
|
);
|
|
assert_eq!(
|
|
mirrored_request_body["type"].as_str(),
|
|
Some("conversation.handoff.append")
|
|
);
|
|
assert_eq!(
|
|
mirrored_request_body["handoff_id"].as_str(),
|
|
Some("handoff_echo_guard")
|
|
);
|
|
assert_eq!(
|
|
mirrored_request_body["output_text"].as_str(),
|
|
Some("\"Agent Final Message\":\n\nassistant says hi")
|
|
);
|
|
|
|
let audio_out = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::AudioOut(frame),
|
|
}) => Some(frame.clone()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
eprintln!(
|
|
"[realtime test +{}ms] saw audio out data={} sample_rate={} num_channels={}",
|
|
start.elapsed().as_millis(),
|
|
audio_out.data,
|
|
audio_out.sample_rate,
|
|
audio_out.num_channels
|
|
);
|
|
assert_eq!(audio_out.data, "AQID");
|
|
|
|
let completion = completions
|
|
.into_iter()
|
|
.next()
|
|
.expect("missing delegated turn completion");
|
|
let _ = gate_completed_tx.send(());
|
|
completion
|
|
.await
|
|
.expect("delegated turn request did not complete");
|
|
eprintln!(
|
|
"[realtime test +{}ms] delegated completion resolved",
|
|
start.elapsed().as_millis()
|
|
);
|
|
wait_for_event(&test.codex, |event| {
|
|
matches!(event, EventMsg::TurnComplete(_))
|
|
})
|
|
.await;
|
|
|
|
let requests = api_server.requests().await;
|
|
assert_eq!(requests.len(), 1);
|
|
|
|
realtime_server.shutdown().await;
|
|
api_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn inbound_handoff_request_does_not_block_realtime_event_forwarding() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
|
|
let first_chunks = vec![
|
|
StreamingSseChunk {
|
|
gate: None,
|
|
body: sse_event(responses::ev_response_created("resp-1")),
|
|
},
|
|
StreamingSseChunk {
|
|
gate: Some(gate_completed_rx),
|
|
body: sse_event(responses::ev_completed("resp-1")),
|
|
},
|
|
];
|
|
let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await;
|
|
|
|
let realtime_server = start_websocket_server(vec![vec![vec![
|
|
json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_non_blocking", "instructions": "backend prompt" }
|
|
}),
|
|
json!({
|
|
"type": "conversation.input_transcript.delta",
|
|
"delta": "delegate now"
|
|
}),
|
|
json!({
|
|
"type": "conversation.handoff.requested",
|
|
"handoff_id": "handoff_non_blocking",
|
|
"item_id": "item_non_blocking",
|
|
"input_transcript": "delegate now"
|
|
}),
|
|
json!({
|
|
"type": "conversation.output_audio.delta",
|
|
"delta": "AQID",
|
|
"sample_rate": 24000,
|
|
"channels": 1
|
|
}),
|
|
]]])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_model("gpt-5.1").with_config({
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
}
|
|
});
|
|
let test = builder.build_with_streaming_server(&api_server).await?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) if session_id == "sess_non_blocking" => Some(()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
|
|
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::HandoffRequested(handoff),
|
|
}) if handoff.input_transcript == "delegate now" => Some(()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
|
|
let audio_out = tokio::time::timeout(
|
|
Duration::from_millis(500),
|
|
wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::AudioOut(frame),
|
|
}) => Some(frame.clone()),
|
|
_ => None,
|
|
}),
|
|
)
|
|
.await
|
|
.expect("timed out waiting for realtime audio while delegated turn was still pending");
|
|
assert_eq!(audio_out.data, "AQID");
|
|
|
|
let completion = completions
|
|
.into_iter()
|
|
.next()
|
|
.expect("missing delegated turn completion");
|
|
let _ = gate_completed_tx.send(());
|
|
completion
|
|
.await
|
|
.expect("delegated turn request did not complete");
|
|
wait_for_event(&test.codex, |event| {
|
|
matches!(event, EventMsg::TurnComplete(_))
|
|
})
|
|
.await;
|
|
|
|
realtime_server.shutdown().await;
|
|
api_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn inbound_handoff_request_steers_active_turn() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
|
|
let first_chunks = vec![
|
|
StreamingSseChunk {
|
|
gate: None,
|
|
body: sse_event(responses::ev_response_created("resp-1")),
|
|
},
|
|
StreamingSseChunk {
|
|
gate: None,
|
|
body: sse_event(responses::ev_message_item_added("msg-1", "")),
|
|
},
|
|
StreamingSseChunk {
|
|
gate: None,
|
|
body: sse_event(responses::ev_output_text_delta("first ")),
|
|
},
|
|
StreamingSseChunk {
|
|
gate: None,
|
|
body: sse_event(responses::ev_output_text_delta("turn")),
|
|
},
|
|
StreamingSseChunk {
|
|
gate: None,
|
|
body: sse_event(responses::ev_assistant_message("msg-1", "first turn")),
|
|
},
|
|
StreamingSseChunk {
|
|
gate: Some(gate_completed_rx),
|
|
body: sse_event(responses::ev_completed("resp-1")),
|
|
},
|
|
];
|
|
let second_chunks = vec![
|
|
StreamingSseChunk {
|
|
gate: None,
|
|
body: sse_event(responses::ev_response_created("resp-2")),
|
|
},
|
|
StreamingSseChunk {
|
|
gate: None,
|
|
body: sse_event(responses::ev_completed("resp-2")),
|
|
},
|
|
];
|
|
let (api_server, completions) =
|
|
start_streaming_sse_server(vec![first_chunks, second_chunks]).await;
|
|
|
|
let realtime_server = start_websocket_server(vec![vec![
|
|
vec![json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_steer", "instructions": "backend prompt" }
|
|
})],
|
|
vec![
|
|
json!({
|
|
"type": "conversation.input_transcript.delta",
|
|
"delta": "steer via realtime"
|
|
}),
|
|
json!({
|
|
"type": "conversation.handoff.requested",
|
|
"handoff_id": "handoff_steer",
|
|
"item_id": "item_steer",
|
|
"input_transcript": "steer via realtime"
|
|
}),
|
|
],
|
|
]])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_model("gpt-5.1").with_config({
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
}
|
|
});
|
|
let test = builder.build_with_streaming_server(&api_server).await?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) if session_id == "sess_steer" => Some(()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
|
|
test.codex
|
|
.submit(Op::UserInput {
|
|
items: vec![UserInput::Text {
|
|
text: "first prompt".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
final_output_json_schema: None,
|
|
responsesapi_client_metadata: None,
|
|
})
|
|
.await?;
|
|
|
|
wait_for_event(&test.codex, |event| {
|
|
matches!(event, EventMsg::AgentMessageContentDelta(_))
|
|
})
|
|
.await;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
|
|
frame: RealtimeAudioFrame {
|
|
data: "AQID".to_string(),
|
|
sample_rate: 24000,
|
|
num_channels: 1,
|
|
samples_per_channel: Some(480),
|
|
item_id: None,
|
|
},
|
|
}))
|
|
.await?;
|
|
|
|
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::HandoffRequested(handoff),
|
|
}) if handoff.input_transcript == "steer via realtime" => Some(()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
|
|
let mut completion_iter = completions.into_iter();
|
|
let first_completion = completion_iter.next().expect("missing first completion");
|
|
let second_completion = completion_iter.next().expect("missing second completion");
|
|
|
|
let _ = gate_completed_tx.send(());
|
|
first_completion
|
|
.await
|
|
.expect("first request did not complete");
|
|
second_completion
|
|
.await
|
|
.expect("second request did not complete");
|
|
wait_for_event(&test.codex, |event| {
|
|
matches!(event, EventMsg::TurnComplete(_))
|
|
})
|
|
.await;
|
|
|
|
let requests = api_server.requests().await;
|
|
assert_eq!(requests.len(), 2);
|
|
|
|
let first_body: Value = serde_json::from_slice(&requests[0]).expect("parse first request");
|
|
let second_body: Value = serde_json::from_slice(&requests[1]).expect("parse second request");
|
|
let first_texts = message_input_texts(&first_body, "user");
|
|
let second_texts = message_input_texts(&second_body, "user");
|
|
|
|
assert!(first_texts.iter().any(|text| text == "first prompt"));
|
|
assert!(
|
|
!first_texts
|
|
.iter()
|
|
.any(|text| text == "user: steer via realtime")
|
|
);
|
|
assert!(second_texts.iter().any(|text| text == "first prompt"));
|
|
assert!(
|
|
second_texts
|
|
.iter()
|
|
.any(|text| text == "user: steer via realtime")
|
|
);
|
|
|
|
realtime_server.shutdown().await;
|
|
api_server.shutdown().await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn inbound_handoff_request_starts_turn_and_does_not_block_realtime_audio() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
|
|
let first_chunks = vec![
|
|
StreamingSseChunk {
|
|
gate: None,
|
|
body: sse_event(responses::ev_response_created("resp-1")),
|
|
},
|
|
StreamingSseChunk {
|
|
gate: Some(gate_completed_rx),
|
|
body: sse_event(responses::ev_completed("resp-1")),
|
|
},
|
|
];
|
|
let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await;
|
|
|
|
let delegated_text = "delegate from handoff request";
|
|
let realtime_server = start_websocket_server(vec![vec![vec![
|
|
json!({
|
|
"type": "session.updated",
|
|
"session": { "id": "sess_handoff_request", "instructions": "backend prompt" }
|
|
}),
|
|
json!({
|
|
"type": "conversation.input_transcript.delta",
|
|
"delta": delegated_text
|
|
}),
|
|
json!({
|
|
"type": "conversation.handoff.requested",
|
|
"handoff_id": "handoff_audio",
|
|
"item_id": "item_audio",
|
|
"input_transcript": delegated_text
|
|
}),
|
|
json!({
|
|
"type": "conversation.output_audio.delta",
|
|
"delta": "AQID",
|
|
"sample_rate": 24000,
|
|
"channels": 1
|
|
}),
|
|
]]])
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_model("gpt-5.1").with_config({
|
|
let realtime_base_url = realtime_server.uri().to_string();
|
|
move |config| {
|
|
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
|
config.realtime.version = RealtimeWsVersion::V1;
|
|
}
|
|
});
|
|
let test = builder.build_with_streaming_server(&api_server).await?;
|
|
|
|
test.codex
|
|
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
|
prompt: Some(Some("backend prompt".to_string())),
|
|
session_id: None,
|
|
transport: None,
|
|
voice: None,
|
|
}))
|
|
.await?;
|
|
|
|
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
|
}) if session_id == "sess_handoff_request" => Some(()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
|
|
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::HandoffRequested(handoff),
|
|
}) => (handoff.handoff_id == "handoff_audio" && handoff.input_transcript == delegated_text)
|
|
.then_some(()),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
|
|
let audio_out = tokio::time::timeout(
|
|
Duration::from_millis(500),
|
|
wait_for_event_match(&test.codex, |msg| match msg {
|
|
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
|
payload: RealtimeEvent::AudioOut(frame),
|
|
}) => Some(frame.clone()),
|
|
_ => None,
|
|
}),
|
|
)
|
|
.await
|
|
.expect("timed out waiting for realtime audio after handoff request");
|
|
assert_eq!(audio_out.data, "AQID");
|
|
|
|
let completion = completions
|
|
.into_iter()
|
|
.next()
|
|
.expect("missing delegated turn completion");
|
|
let _ = gate_completed_tx.send(());
|
|
completion
|
|
.await
|
|
.expect("delegated turn request did not complete");
|
|
wait_for_event(&test.codex, |event| {
|
|
matches!(event, EventMsg::TurnComplete(_))
|
|
})
|
|
.await;
|
|
|
|
let requests = api_server.requests().await;
|
|
assert_eq!(requests.len(), 1);
|
|
let first_body: Value = serde_json::from_slice(&requests[0]).expect("parse first request");
|
|
let first_texts = message_input_texts(&first_body, "user");
|
|
let expected_text = format!("user: {delegated_text}");
|
|
assert!(first_texts.iter().any(|text| text == &expected_text));
|
|
|
|
realtime_server.shutdown().await;
|
|
api_server.shutdown().await;
|
|
Ok(())
|
|
}
|