mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
Add WebRTC realtime app-server e2e tests
This commit is contained in:
@@ -1,12 +1,18 @@
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
use app_test_support::create_shell_command_sse_response;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::CommandExecutionStatus;
|
||||
use codex_app_server_protocol::ItemCompletedNotification;
|
||||
use codex_app_server_protocol::ItemStartedNotification;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::LoginAccountResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadRealtimeAppendAudioParams;
|
||||
use codex_app_server_protocol::ThreadRealtimeAppendAudioResponse;
|
||||
use codex_app_server_protocol::ThreadRealtimeAppendTextParams;
|
||||
@@ -26,28 +32,38 @@ use codex_app_server_protocol::ThreadRealtimeStopResponse;
|
||||
use codex_app_server_protocol::ThreadRealtimeTranscriptUpdatedNotification;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::TurnCompletedNotification;
|
||||
use codex_app_server_protocol::TurnStartedNotification;
|
||||
use codex_features::FEATURES;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::protocol::RealtimeConversationVersion;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::responses::WebSocketConnectionConfig;
|
||||
use core_test_support::responses::WebSocketRequest;
|
||||
use core_test_support::responses::WebSocketTestServer;
|
||||
use core_test_support::responses::start_websocket_server;
|
||||
use core_test_support::responses::start_websocket_server_with_headers;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::mpsc;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
use wiremock::Match;
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::Request as WiremockRequest;
|
||||
use wiremock::Respond;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
use wiremock::matchers::path_regex;
|
||||
|
||||
/// Upper bound used when waiting for a JSON-RPC response from the app server under test.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
/// Literal header text.
/// NOTE(review): presumably the prefix Codex places before injected startup context;
/// its users are outside this view — confirm.
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
|
||||
@@ -90,6 +106,246 @@ impl Match for RealtimeCallRequestCapture {
|
||||
}
|
||||
}
|
||||
|
||||
struct GatedSseResponse {
|
||||
gate_rx: Mutex<Option<mpsc::Receiver<()>>>,
|
||||
response: String,
|
||||
}
|
||||
|
||||
impl Respond for GatedSseResponse {
|
||||
fn respond(&self, _: &WiremockRequest) -> ResponseTemplate {
|
||||
let gate_rx = self
|
||||
.gate_rx
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner)
|
||||
.take();
|
||||
if let Some(gate_rx) = gate_rx {
|
||||
let _ = gate_rx.recv();
|
||||
}
|
||||
responses::sse_response(self.response.clone())
|
||||
}
|
||||
}
|
||||
|
||||
/// Realtime protocol version a test asks the generated config to select.
#[derive(Debug, Clone, Copy)]
enum RealtimeTestVersion {
    V1,
    V2,
}

impl RealtimeTestVersion {
    /// Returns the string form of this version as written into the test config.
    fn config_value(self) -> &'static str {
        match self {
            Self::V1 => "v1",
            Self::V2 => "v2",
        }
    }
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
struct StartedWebrtcRealtime {
|
||||
started: ThreadRealtimeStartedNotification,
|
||||
sdp: ThreadRealtimeSdpNotification,
|
||||
}
|
||||
|
||||
struct RealtimeE2eHarness {
|
||||
mcp: McpProcess,
|
||||
_codex_home: TempDir,
|
||||
responses_server: MockServer,
|
||||
realtime_server: WebSocketTestServer,
|
||||
call_capture: RealtimeCallRequestCapture,
|
||||
thread_id: String,
|
||||
}
|
||||
|
||||
impl RealtimeE2eHarness {
|
||||
async fn new(
|
||||
realtime_version: RealtimeTestVersion,
|
||||
responses: Vec<String>,
|
||||
sideband_connections: Vec<WebSocketConnectionConfig>,
|
||||
) -> Result<Self> {
|
||||
let responses_server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
Self::new_with_responses_server(realtime_version, responses_server, sideband_connections)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn new_with_responses_server(
|
||||
realtime_version: RealtimeTestVersion,
|
||||
responses_server: MockServer,
|
||||
sideband_connections: Vec<WebSocketConnectionConfig>,
|
||||
) -> Result<Self> {
|
||||
let call_capture = RealtimeCallRequestCapture::new();
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/realtime/calls"))
|
||||
.and(call_capture.clone())
|
||||
.respond_with(
|
||||
ResponseTemplate::new(200)
|
||||
.insert_header("Location", "/v1/realtime/calls/rtc_e2e")
|
||||
.set_body_string("v=answer\r\n"),
|
||||
)
|
||||
.mount(&responses_server)
|
||||
.await;
|
||||
|
||||
let realtime_server = start_websocket_server_with_headers(sideband_connections).await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml_with_realtime_version(
|
||||
codex_home.path(),
|
||||
&responses_server.uri(),
|
||||
realtime_server.uri(),
|
||||
/*realtime_enabled*/ true,
|
||||
StartupContextConfig::Override("startup context"),
|
||||
realtime_version,
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
mcp.initialize().await?;
|
||||
login_with_api_key(&mut mcp, "sk-test-key").await?;
|
||||
|
||||
let thread_start_request_id = mcp
|
||||
.send_thread_start_request(ThreadStartParams::default())
|
||||
.await?;
|
||||
let thread_start_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_start_request_id)),
|
||||
)
|
||||
.await??;
|
||||
let thread_start: ThreadStartResponse = to_response(thread_start_response)?;
|
||||
|
||||
Ok(Self {
|
||||
mcp,
|
||||
_codex_home: codex_home,
|
||||
responses_server,
|
||||
realtime_server,
|
||||
call_capture,
|
||||
thread_id: thread_start.thread.id,
|
||||
})
|
||||
}
|
||||
|
||||
async fn start_webrtc_realtime(&mut self, offer_sdp: &str) -> Result<StartedWebrtcRealtime> {
|
||||
let start_request_id = self
|
||||
.mcp
|
||||
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
|
||||
thread_id: self.thread_id.clone(),
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Some(ThreadRealtimeStartTransport::Webrtc {
|
||||
sdp: offer_sdp.to_string(),
|
||||
}),
|
||||
})
|
||||
.await?;
|
||||
let start_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
self.mcp
|
||||
.read_stream_until_response_message(RequestId::Integer(start_request_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: ThreadRealtimeStartResponse = to_response(start_response)?;
|
||||
|
||||
let started = self
|
||||
.read_notification::<ThreadRealtimeStartedNotification>("thread/realtime/started")
|
||||
.await?;
|
||||
let sdp = self
|
||||
.read_notification::<ThreadRealtimeSdpNotification>("thread/realtime/sdp")
|
||||
.await?;
|
||||
|
||||
Ok(StartedWebrtcRealtime { started, sdp })
|
||||
}
|
||||
|
||||
async fn read_notification<T: DeserializeOwned>(&mut self, method: &str) -> Result<T> {
|
||||
read_notification(&mut self.mcp, method).await
|
||||
}
|
||||
|
||||
async fn sideband_outbound_request(&self, request_index: usize) -> Value {
|
||||
self.realtime_server
|
||||
.wait_for_request(/*connection_index*/ 0, request_index)
|
||||
.await
|
||||
.body_json()
|
||||
}
|
||||
|
||||
async fn append_audio(&mut self, thread_id: String) -> Result<()> {
|
||||
let request_id = self
|
||||
.mcp
|
||||
.send_thread_realtime_append_audio_request(ThreadRealtimeAppendAudioParams {
|
||||
thread_id,
|
||||
audio: ThreadRealtimeAudioChunk {
|
||||
data: "BQYH".to_string(),
|
||||
sample_rate: 24_000,
|
||||
num_channels: 1,
|
||||
samples_per_channel: Some(480),
|
||||
item_id: None,
|
||||
},
|
||||
})
|
||||
.await?;
|
||||
let response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
self.mcp
|
||||
.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: ThreadRealtimeAppendAudioResponse = to_response(response)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn append_text(&mut self, thread_id: String, text: &str) -> Result<()> {
|
||||
let request_id = self
|
||||
.mcp
|
||||
.send_thread_realtime_append_text_request(ThreadRealtimeAppendTextParams {
|
||||
thread_id,
|
||||
text: text.to_string(),
|
||||
})
|
||||
.await?;
|
||||
let response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
self.mcp
|
||||
.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: ThreadRealtimeAppendTextResponse = to_response(response)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn responses_requests(&self) -> Result<Vec<Value>> {
|
||||
responses_requests(&self.responses_server).await
|
||||
}
|
||||
|
||||
async fn shutdown(self) {
|
||||
self.realtime_server.shutdown().await;
|
||||
}
|
||||
}
|
||||
|
||||
fn sideband_connection(requests: Vec<Vec<Value>>) -> WebSocketConnectionConfig {
|
||||
WebSocketConnectionConfig {
|
||||
requests,
|
||||
response_headers: Vec::new(),
|
||||
accept_delay: None,
|
||||
close_after_requests: true,
|
||||
}
|
||||
}
|
||||
|
||||
fn open_sideband_connection(requests: Vec<Vec<Value>>) -> WebSocketConnectionConfig {
|
||||
WebSocketConnectionConfig {
|
||||
close_after_requests: false,
|
||||
..sideband_connection(requests)
|
||||
}
|
||||
}
|
||||
|
||||
fn session_updated(session_id: &str) -> Value {
|
||||
json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": session_id, "instructions": "backend prompt" }
|
||||
})
|
||||
}
|
||||
|
||||
fn v2_codex_tool_call(call_id: &str, prompt: &str) -> Value {
|
||||
json!({
|
||||
"type": "conversation.item.done",
|
||||
"item": {
|
||||
"id": format!("item_{call_id}"),
|
||||
"type": "function_call",
|
||||
"name": "codex",
|
||||
"call_id": call_id,
|
||||
"arguments": json!({ "prompt": prompt }).to_string()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
@@ -582,6 +838,381 @@ async fn realtime_webrtc_start_emits_sdp_notification() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn webrtc_v1_start_posts_offer_returns_sdp_and_joins_sideband() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let mut harness = RealtimeE2eHarness::new(
|
||||
RealtimeTestVersion::V1,
|
||||
Vec::new(),
|
||||
vec![open_sideband_connection(vec![vec![session_updated(
|
||||
"sess_v1_webrtc",
|
||||
)]])],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let started = harness.start_webrtc_realtime("v=offer\r\n").await?;
|
||||
assert_eq!(
|
||||
started,
|
||||
StartedWebrtcRealtime {
|
||||
started: ThreadRealtimeStartedNotification {
|
||||
thread_id: harness.thread_id.clone(),
|
||||
session_id: Some("sess_v1_webrtc".to_string()),
|
||||
version: RealtimeConversationVersion::V1,
|
||||
},
|
||||
sdp: ThreadRealtimeSdpNotification {
|
||||
thread_id: harness.thread_id.clone(),
|
||||
sdp: "v=answer\r\n".to_string(),
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
assert_call_create_multipart(
|
||||
harness.call_capture.single_request(),
|
||||
"v=offer\r\n",
|
||||
v1_session_create_json(),
|
||||
)?;
|
||||
assert_eq!(
|
||||
harness.realtime_server.single_handshake().uri(),
|
||||
"/v1/realtime?call_id=rtc_e2e"
|
||||
);
|
||||
|
||||
let session_update = harness.sideband_outbound_request(/*request_index*/ 0).await;
|
||||
assert_v1_session_update(&session_update)?;
|
||||
|
||||
let closed = timeout(
|
||||
Duration::from_millis(100),
|
||||
harness
|
||||
.mcp
|
||||
.read_stream_until_notification_message("thread/realtime/closed"),
|
||||
)
|
||||
.await;
|
||||
assert!(closed.is_err(), "WebRTC start should not close immediately");
|
||||
|
||||
harness.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn webrtc_v1_handoff_request_delegates_and_appends_result() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let mut harness = RealtimeE2eHarness::new(
|
||||
RealtimeTestVersion::V1,
|
||||
vec![create_final_assistant_message_sse_response(
|
||||
"delegated from v1",
|
||||
)?],
|
||||
vec![sideband_connection(vec![
|
||||
vec![
|
||||
session_updated("sess_v1_handoff"),
|
||||
json!({
|
||||
"type": "conversation.input_transcript.delta",
|
||||
"delta": "delegate from v1"
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_v1",
|
||||
"item_id": "item_v1",
|
||||
"input_transcript": "delegate from v1"
|
||||
}),
|
||||
],
|
||||
vec![],
|
||||
])],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let started = harness.start_webrtc_realtime("v=offer\r\n").await?;
|
||||
assert_eq!(started.started.version, RealtimeConversationVersion::V1);
|
||||
|
||||
let turn_started = harness
|
||||
.read_notification::<TurnStartedNotification>("turn/started")
|
||||
.await?;
|
||||
assert_eq!(turn_started.thread_id, harness.thread_id);
|
||||
let turn_completed = harness
|
||||
.read_notification::<TurnCompletedNotification>("turn/completed")
|
||||
.await?;
|
||||
assert_eq!(turn_completed.thread_id, harness.thread_id);
|
||||
|
||||
let requests = harness.responses_requests().await?;
|
||||
assert_eq!(requests.len(), 1);
|
||||
assert!(
|
||||
response_request_contains_text(&requests[0], "user: delegate from v1"),
|
||||
"delegated Responses request should contain realtime text: {}",
|
||||
requests[0]
|
||||
);
|
||||
|
||||
let handoff_append = harness.sideband_outbound_request(/*request_index*/ 1).await;
|
||||
assert_eq!(
|
||||
handoff_append,
|
||||
json!({
|
||||
"type": "conversation.handoff.append",
|
||||
"handoff_id": "handoff_v1",
|
||||
"output_text": "\"Agent Final Message\":\n\ndelegated from v1",
|
||||
})
|
||||
);
|
||||
|
||||
harness.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn webrtc_v2_forwards_audio_and_text_between_client_and_sideband() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let mut harness = RealtimeE2eHarness::new(
|
||||
RealtimeTestVersion::V2,
|
||||
Vec::new(),
|
||||
vec![sideband_connection(vec![
|
||||
vec![session_updated("sess_v2_stream")],
|
||||
vec![],
|
||||
vec![
|
||||
json!({
|
||||
"type": "conversation.item.input_audio_transcription.delta",
|
||||
"delta": "transcribed audio"
|
||||
}),
|
||||
json!({
|
||||
"type": "response.output_audio.delta",
|
||||
"delta": "AQID",
|
||||
"sample_rate": 24_000,
|
||||
"channels": 1,
|
||||
"samples_per_channel": 512
|
||||
}),
|
||||
],
|
||||
vec![],
|
||||
])],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let started = harness.start_webrtc_realtime("v=offer\r\n").await?;
|
||||
assert_eq!(started.started.version, RealtimeConversationVersion::V2);
|
||||
assert_v2_session_update(&harness.sideband_outbound_request(/*request_index*/ 0).await)?;
|
||||
|
||||
let thread_id = started.started.thread_id.clone();
|
||||
harness.append_audio(thread_id.clone()).await?;
|
||||
harness.append_text(thread_id).await?;
|
||||
|
||||
let transcript = harness
|
||||
.read_notification::<ThreadRealtimeTranscriptUpdatedNotification>(
|
||||
"thread/realtime/transcriptUpdated",
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(transcript.text, "transcribed audio");
|
||||
let output_audio = harness
|
||||
.read_notification::<ThreadRealtimeOutputAudioDeltaNotification>(
|
||||
"thread/realtime/outputAudio/delta",
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(output_audio.audio.data, "AQID");
|
||||
|
||||
let requests = [
|
||||
harness.sideband_outbound_request(/*request_index*/ 1).await,
|
||||
harness.sideband_outbound_request(/*request_index*/ 2).await,
|
||||
harness.sideband_outbound_request(/*request_index*/ 3).await,
|
||||
];
|
||||
assert!(
|
||||
requests
|
||||
.iter()
|
||||
.any(|request| request["type"] == "input_audio_buffer.append"
|
||||
&& request["audio"] == "BQYH"),
|
||||
"sideband requests should include audio append: {requests:?}"
|
||||
);
|
||||
assert!(
|
||||
requests.iter().any(|request| {
|
||||
request["type"] == "conversation.item.create"
|
||||
&& request["item"]["type"] == "message"
|
||||
&& request["item"]["role"] == "user"
|
||||
&& request["item"]["content"][0]["text"] == "hello"
|
||||
}),
|
||||
"sideband requests should include user text item: {requests:?}"
|
||||
);
|
||||
|
||||
harness.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn webrtc_v2_codex_tool_call_delegates_and_returns_function_output() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let mut harness = RealtimeE2eHarness::new(
|
||||
RealtimeTestVersion::V2,
|
||||
vec![create_final_assistant_message_sse_response(
|
||||
"delegated from v2",
|
||||
)?],
|
||||
vec![sideband_connection(vec![
|
||||
vec![
|
||||
session_updated("sess_v2_tool"),
|
||||
v2_codex_tool_call("call_v2", "delegate from v2"),
|
||||
],
|
||||
vec![],
|
||||
])],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let started = harness.start_webrtc_realtime("v=offer\r\n").await?;
|
||||
assert_eq!(started.started.version, RealtimeConversationVersion::V2);
|
||||
|
||||
let turn_started = harness
|
||||
.read_notification::<TurnStartedNotification>("turn/started")
|
||||
.await?;
|
||||
assert_eq!(turn_started.thread_id, harness.thread_id);
|
||||
let turn_completed = harness
|
||||
.read_notification::<TurnCompletedNotification>("turn/completed")
|
||||
.await?;
|
||||
assert_eq!(turn_completed.thread_id, harness.thread_id);
|
||||
|
||||
let requests = harness.responses_requests().await?;
|
||||
assert_eq!(requests.len(), 1);
|
||||
assert!(
|
||||
response_request_contains_text(&requests[0], "delegate from v2"),
|
||||
"delegated Responses request should contain tool prompt: {}",
|
||||
requests[0]
|
||||
);
|
||||
|
||||
let tool_output = harness.sideband_outbound_request(/*request_index*/ 1).await;
|
||||
assert_v2_function_call_output(&tool_output, "call_v2", "delegated from v2");
|
||||
|
||||
harness.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let mut harness = RealtimeE2eHarness::new(
|
||||
RealtimeTestVersion::V2,
|
||||
vec![
|
||||
create_shell_command_sse_response(
|
||||
vec!["printf".to_string(), "realtime-tool-ok".to_string()],
|
||||
/*workdir*/ None,
|
||||
Some(5000),
|
||||
"shell_call",
|
||||
)?,
|
||||
create_final_assistant_message_sse_response("shell tool finished")?,
|
||||
],
|
||||
vec![sideband_connection(vec![
|
||||
vec![
|
||||
session_updated("sess_v2_shell"),
|
||||
v2_codex_tool_call("call_shell", "run shell through delegated turn"),
|
||||
],
|
||||
vec![],
|
||||
])],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let _ = harness.start_webrtc_realtime("v=offer\r\n").await?;
|
||||
|
||||
let started_command = wait_for_started_command_execution(&mut harness.mcp).await?;
|
||||
let ThreadItem::CommandExecution { id, status, .. } = started_command.item else {
|
||||
unreachable!("helper returns command execution items");
|
||||
};
|
||||
assert_eq!(
|
||||
(id.as_str(), status),
|
||||
("shell_call", CommandExecutionStatus::InProgress)
|
||||
);
|
||||
|
||||
let completed_command = wait_for_completed_command_execution(&mut harness.mcp).await?;
|
||||
let ThreadItem::CommandExecution {
|
||||
id,
|
||||
status,
|
||||
aggregated_output,
|
||||
..
|
||||
} = completed_command.item
|
||||
else {
|
||||
unreachable!("helper returns command execution items");
|
||||
};
|
||||
assert_eq!(id.as_str(), "shell_call");
|
||||
assert_eq!(status, CommandExecutionStatus::Completed);
|
||||
assert_eq!(aggregated_output.as_deref(), Some("realtime-tool-ok"));
|
||||
|
||||
let turn_completed = harness
|
||||
.read_notification::<TurnCompletedNotification>("turn/completed")
|
||||
.await?;
|
||||
assert_eq!(turn_completed.thread_id, harness.thread_id);
|
||||
|
||||
let requests = harness.responses_requests().await?;
|
||||
assert_eq!(requests.len(), 2);
|
||||
assert!(
|
||||
response_request_contains_text(&requests[1], "realtime-tool-ok"),
|
||||
"follow-up Responses request should contain shell output: {}",
|
||||
requests[1]
|
||||
);
|
||||
|
||||
let function_outputs = function_call_output_sideband_requests(&harness.realtime_server);
|
||||
assert_eq!(function_outputs.len(), 1);
|
||||
assert_v2_function_call_output(&function_outputs[0], "call_shell", "shell tool finished");
|
||||
|
||||
harness.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn webrtc_v2_tool_call_does_not_block_sideband_audio() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let responses_server = responses::start_mock_server().await;
|
||||
let (gate_completed_tx, gate_completed_rx) = mpsc::channel();
|
||||
let gated_response = responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_assistant_message("msg-1", "late delegated result"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]);
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(GatedSseResponse {
|
||||
gate_rx: Mutex::new(Some(gate_completed_rx)),
|
||||
response: gated_response,
|
||||
})
|
||||
.expect(1)
|
||||
.mount(&responses_server)
|
||||
.await;
|
||||
|
||||
let mut harness = RealtimeE2eHarness::new_with_responses_server(
|
||||
RealtimeTestVersion::V2,
|
||||
responses_server,
|
||||
vec![sideband_connection(vec![
|
||||
vec![
|
||||
session_updated("sess_v2_nonblocking"),
|
||||
v2_codex_tool_call("call_audio", "delegate while audio continues"),
|
||||
json!({
|
||||
"type": "response.output_audio.delta",
|
||||
"delta": "CQoL",
|
||||
"sample_rate": 24_000,
|
||||
"channels": 1,
|
||||
"samples_per_channel": 256
|
||||
}),
|
||||
],
|
||||
vec![],
|
||||
])],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let _ = harness.start_webrtc_realtime("v=offer\r\n").await?;
|
||||
let _ = harness
|
||||
.read_notification::<TurnStartedNotification>("turn/started")
|
||||
.await?;
|
||||
|
||||
let audio = harness
|
||||
.read_notification::<ThreadRealtimeOutputAudioDeltaNotification>(
|
||||
"thread/realtime/outputAudio/delta",
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(audio.audio.data, "CQoL");
|
||||
|
||||
let _ = gate_completed_tx.send(());
|
||||
let turn_completed = harness
|
||||
.read_notification::<TurnCompletedNotification>("turn/completed")
|
||||
.await?;
|
||||
assert_eq!(turn_completed.thread_id, harness.thread_id);
|
||||
|
||||
let tool_output = harness.sideband_outbound_request(/*request_index*/ 1).await;
|
||||
assert_v2_function_call_output(&tool_output, "call_audio", "late delegated result");
|
||||
|
||||
harness.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn realtime_webrtc_start_surfaces_backend_error() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
@@ -722,18 +1353,176 @@ async fn login_with_api_key(mcp: &mut McpProcess, api_key: &str) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_for_started_command_execution(
|
||||
mcp: &mut McpProcess,
|
||||
) -> Result<ItemStartedNotification> {
|
||||
loop {
|
||||
let started = read_notification::<ItemStartedNotification>(mcp, "item/started").await?;
|
||||
if let ThreadItem::CommandExecution { .. } = &started.item {
|
||||
return Ok(started);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_completed_command_execution(
|
||||
mcp: &mut McpProcess,
|
||||
) -> Result<ItemCompletedNotification> {
|
||||
loop {
|
||||
let completed =
|
||||
read_notification::<ItemCompletedNotification>(mcp, "item/completed").await?;
|
||||
if let ThreadItem::CommandExecution { .. } = &completed.item {
|
||||
return Ok(completed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the JSON bodies of every request `server` received whose URL path
/// ends with `/responses`.
///
/// Fails if the received requests cannot be fetched or if a matching body is
/// not valid JSON.
async fn responses_requests(server: &MockServer) -> Result<Vec<Value>> {
    let received = server
        .received_requests()
        .await
        .context("failed to fetch received requests")?;

    let mut bodies = Vec::new();
    for request in received {
        if !request.url.path().ends_with("/responses") {
            continue;
        }
        let body = request
            .body_json::<Value>()
            .context("Responses request body should be JSON")?;
        bodies.push(body);
    }
    Ok(bodies)
}
|
||||
|
||||
fn response_request_contains_text(request: &Value, text: &str) -> bool {
|
||||
request.to_string().contains(text)
|
||||
}
|
||||
|
||||
fn function_call_output_sideband_requests(server: &WebSocketTestServer) -> Vec<Value> {
|
||||
server
|
||||
.single_connection()
|
||||
.iter()
|
||||
.map(WebSocketRequest::body_json)
|
||||
.filter(|request| {
|
||||
request["type"] == "conversation.item.create"
|
||||
&& request["item"]["type"] == "function_call_output"
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn assert_v2_function_call_output(request: &Value, call_id: &str, expected_output: &str) {
|
||||
assert_eq!(
|
||||
request,
|
||||
&json!({
|
||||
"type": "conversation.item.create",
|
||||
"item": {
|
||||
"type": "function_call_output",
|
||||
"call_id": call_id,
|
||||
"output": format!("\"Agent Final Message\":\n\n{expected_output}"),
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
fn assert_v1_session_update(request: &Value) -> Result<()> {
|
||||
assert_eq!(request["type"].as_str(), Some("session.update"));
|
||||
assert_eq!(request["session"]["type"].as_str(), Some("quicksilver"));
|
||||
assert!(
|
||||
request["session"]["instructions"]
|
||||
.as_str()
|
||||
.context("v1 session.update instructions")?
|
||||
.contains("startup context")
|
||||
);
|
||||
assert_eq!(
|
||||
request["session"]["audio"]["output"]["voice"].as_str(),
|
||||
Some("fathom")
|
||||
);
|
||||
assert_eq!(request["session"]["tools"], Value::Null);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn assert_v2_session_update(request: &Value) -> Result<()> {
|
||||
assert_eq!(request["type"].as_str(), Some("session.update"));
|
||||
assert_eq!(request["session"]["type"].as_str(), Some("realtime"));
|
||||
assert!(
|
||||
request["session"]["instructions"]
|
||||
.as_str()
|
||||
.context("v2 session.update instructions")?
|
||||
.contains("startup context")
|
||||
);
|
||||
assert_eq!(
|
||||
request["session"]["tools"][0]["name"].as_str(),
|
||||
Some("codex")
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn assert_call_create_multipart(
|
||||
request: WiremockRequest,
|
||||
offer_sdp: &str,
|
||||
session: &str,
|
||||
) -> Result<()> {
|
||||
assert_eq!(request.url.path(), "/v1/realtime/calls");
|
||||
assert_eq!(request.url.query(), None);
|
||||
assert_eq!(
|
||||
request
|
||||
.headers
|
||||
.get("content-type")
|
||||
.and_then(|value| value.to_str().ok()),
|
||||
Some("multipart/form-data; boundary=codex-realtime-call-boundary")
|
||||
);
|
||||
let body = String::from_utf8(request.body).context("multipart body should be utf-8")?;
|
||||
assert_eq!(
|
||||
body,
|
||||
format!(
|
||||
"--codex-realtime-call-boundary\r\n\
|
||||
Content-Disposition: form-data; name=\"sdp\"\r\n\
|
||||
Content-Type: application/sdp\r\n\
|
||||
\r\n\
|
||||
{offer_sdp}\r\n\
|
||||
--codex-realtime-call-boundary\r\n\
|
||||
Content-Disposition: form-data; name=\"session\"\r\n\
|
||||
Content-Type: application/json\r\n\
|
||||
\r\n\
|
||||
{session}\r\n\
|
||||
--codex-realtime-call-boundary--\r\n"
|
||||
)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the exact JSON text expected in the `session` part of a v1
/// call-creation request.
fn v1_session_create_json() -> &'static str {
    // Raw string: the `\n` sequences below are JSON escapes, not real newlines.
    const SESSION_JSON: &str = r#"{"audio":{"input":{"format":{"type":"audio/pcm","rate":24000}},"output":{"voice":"fathom"}},"type":"quicksilver","instructions":"backend prompt\n\nstartup context"}"#;
    SESSION_JSON
}
|
||||
|
||||
fn create_config_toml(
|
||||
codex_home: &Path,
|
||||
responses_server_uri: &str,
|
||||
realtime_server_uri: &str,
|
||||
realtime_enabled: bool,
|
||||
startup_context: StartupContextConfig<'_>,
|
||||
) -> std::io::Result<()> {
|
||||
create_config_toml_with_realtime_version(
|
||||
codex_home,
|
||||
responses_server_uri,
|
||||
realtime_server_uri,
|
||||
realtime_enabled,
|
||||
startup_context,
|
||||
RealtimeTestVersion::V2,
|
||||
)
|
||||
}
|
||||
|
||||
fn create_config_toml_with_realtime_version(
|
||||
codex_home: &Path,
|
||||
responses_server_uri: &str,
|
||||
realtime_server_uri: &str,
|
||||
realtime_enabled: bool,
|
||||
startup_context: StartupContextConfig<'_>,
|
||||
realtime_version: RealtimeTestVersion,
|
||||
) -> std::io::Result<()> {
|
||||
let realtime_feature_key = FEATURES
|
||||
.iter()
|
||||
.find(|spec| spec.id == Feature::RealtimeConversation)
|
||||
.map(|spec| spec.key)
|
||||
.unwrap_or("realtime_conversation");
|
||||
let realtime_version = realtime_version.config_value();
|
||||
let startup_context = match startup_context {
|
||||
StartupContextConfig::Generated => String::new(),
|
||||
StartupContextConfig::Override(context) => {
|
||||
@@ -754,7 +1543,7 @@ experimental_realtime_ws_backend_prompt = "backend prompt"
|
||||
{startup_context}
|
||||
|
||||
[realtime]
|
||||
version = "v2"
|
||||
version = "{realtime_version}"
|
||||
type = "conversational"
|
||||
|
||||
[features]
|
||||
|
||||
Reference in New Issue
Block a user