Add WebRTC realtime app-server e2e tests

This commit is contained in:
Ahmed Ibrahim
2026-04-07 23:14:40 -07:00
parent 6122b7734e
commit d89965dcc9

View File

@@ -1,12 +1,18 @@
use anyhow::Context;
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::CommandExecutionStatus;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::LoginAccountResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadRealtimeAppendAudioParams;
use codex_app_server_protocol::ThreadRealtimeAppendAudioResponse;
use codex_app_server_protocol::ThreadRealtimeAppendTextParams;
@@ -26,28 +32,38 @@ use codex_app_server_protocol::ThreadRealtimeStopResponse;
use codex_app_server_protocol::ThreadRealtimeTranscriptUpdatedNotification;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnStartedNotification;
use codex_features::FEATURES;
use codex_features::Feature;
use codex_protocol::protocol::RealtimeConversationVersion;
use core_test_support::responses;
use core_test_support::responses::WebSocketConnectionConfig;
use core_test_support::responses::WebSocketRequest;
use core_test_support::responses::WebSocketTestServer;
use core_test_support::responses::start_websocket_server;
use core_test_support::responses::start_websocket_server_with_headers;
use core_test_support::skip_if_no_network;
use pretty_assertions::assert_eq;
use serde::de::DeserializeOwned;
use serde_json::Value;
use serde_json::json;
use std::path::Path;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::mpsc;
use std::time::Duration;
use tempfile::TempDir;
use tokio::time::timeout;
use wiremock::Match;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::Request as WiremockRequest;
use wiremock::Respond;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
use wiremock::matchers::path_regex;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
@@ -90,6 +106,246 @@ impl Match for RealtimeCallRequestCapture {
}
}
struct GatedSseResponse {
gate_rx: Mutex<Option<mpsc::Receiver<()>>>,
response: String,
}
impl Respond for GatedSseResponse {
fn respond(&self, _: &WiremockRequest) -> ResponseTemplate {
let gate_rx = self
.gate_rx
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.take();
if let Some(gate_rx) = gate_rx {
let _ = gate_rx.recv();
}
responses::sse_response(self.response.clone())
}
}
#[derive(Debug, Clone, Copy)]
enum RealtimeTestVersion {
V1,
V2,
}
impl RealtimeTestVersion {
fn config_value(self) -> &'static str {
match self {
RealtimeTestVersion::V1 => "v1",
RealtimeTestVersion::V2 => "v2",
}
}
}
#[derive(Debug, PartialEq)]
struct StartedWebrtcRealtime {
started: ThreadRealtimeStartedNotification,
sdp: ThreadRealtimeSdpNotification,
}
struct RealtimeE2eHarness {
mcp: McpProcess,
_codex_home: TempDir,
responses_server: MockServer,
realtime_server: WebSocketTestServer,
call_capture: RealtimeCallRequestCapture,
thread_id: String,
}
impl RealtimeE2eHarness {
async fn new(
realtime_version: RealtimeTestVersion,
responses: Vec<String>,
sideband_connections: Vec<WebSocketConnectionConfig>,
) -> Result<Self> {
let responses_server = create_mock_responses_server_sequence_unchecked(responses).await;
Self::new_with_responses_server(realtime_version, responses_server, sideband_connections)
.await
}
async fn new_with_responses_server(
realtime_version: RealtimeTestVersion,
responses_server: MockServer,
sideband_connections: Vec<WebSocketConnectionConfig>,
) -> Result<Self> {
let call_capture = RealtimeCallRequestCapture::new();
Mock::given(method("POST"))
.and(path("/v1/realtime/calls"))
.and(call_capture.clone())
.respond_with(
ResponseTemplate::new(200)
.insert_header("Location", "/v1/realtime/calls/rtc_e2e")
.set_body_string("v=answer\r\n"),
)
.mount(&responses_server)
.await;
let realtime_server = start_websocket_server_with_headers(sideband_connections).await;
let codex_home = TempDir::new()?;
create_config_toml_with_realtime_version(
codex_home.path(),
&responses_server.uri(),
realtime_server.uri(),
/*realtime_enabled*/ true,
StartupContextConfig::Override("startup context"),
realtime_version,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
mcp.initialize().await?;
login_with_api_key(&mut mcp, "sk-test-key").await?;
let thread_start_request_id = mcp
.send_thread_start_request(ThreadStartParams::default())
.await?;
let thread_start_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_start_request_id)),
)
.await??;
let thread_start: ThreadStartResponse = to_response(thread_start_response)?;
Ok(Self {
mcp,
_codex_home: codex_home,
responses_server,
realtime_server,
call_capture,
thread_id: thread_start.thread.id,
})
}
async fn start_webrtc_realtime(&mut self, offer_sdp: &str) -> Result<StartedWebrtcRealtime> {
let start_request_id = self
.mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
thread_id: self.thread_id.clone(),
prompt: "backend prompt".to_string(),
session_id: None,
transport: Some(ThreadRealtimeStartTransport::Webrtc {
sdp: offer_sdp.to_string(),
}),
})
.await?;
let start_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
self.mcp
.read_stream_until_response_message(RequestId::Integer(start_request_id)),
)
.await??;
let _: ThreadRealtimeStartResponse = to_response(start_response)?;
let started = self
.read_notification::<ThreadRealtimeStartedNotification>("thread/realtime/started")
.await?;
let sdp = self
.read_notification::<ThreadRealtimeSdpNotification>("thread/realtime/sdp")
.await?;
Ok(StartedWebrtcRealtime { started, sdp })
}
async fn read_notification<T: DeserializeOwned>(&mut self, method: &str) -> Result<T> {
read_notification(&mut self.mcp, method).await
}
async fn sideband_outbound_request(&self, request_index: usize) -> Value {
self.realtime_server
.wait_for_request(/*connection_index*/ 0, request_index)
.await
.body_json()
}
async fn append_audio(&mut self, thread_id: String) -> Result<()> {
let request_id = self
.mcp
.send_thread_realtime_append_audio_request(ThreadRealtimeAppendAudioParams {
thread_id,
audio: ThreadRealtimeAudioChunk {
data: "BQYH".to_string(),
sample_rate: 24_000,
num_channels: 1,
samples_per_channel: Some(480),
item_id: None,
},
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
self.mcp
.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let _: ThreadRealtimeAppendAudioResponse = to_response(response)?;
Ok(())
}
async fn append_text(&mut self, thread_id: String, text: &str) -> Result<()> {
let request_id = self
.mcp
.send_thread_realtime_append_text_request(ThreadRealtimeAppendTextParams {
thread_id,
text: text.to_string(),
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
self.mcp
.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let _: ThreadRealtimeAppendTextResponse = to_response(response)?;
Ok(())
}
async fn responses_requests(&self) -> Result<Vec<Value>> {
responses_requests(&self.responses_server).await
}
async fn shutdown(self) {
self.realtime_server.shutdown().await;
}
}
fn sideband_connection(requests: Vec<Vec<Value>>) -> WebSocketConnectionConfig {
WebSocketConnectionConfig {
requests,
response_headers: Vec::new(),
accept_delay: None,
close_after_requests: true,
}
}
fn open_sideband_connection(requests: Vec<Vec<Value>>) -> WebSocketConnectionConfig {
WebSocketConnectionConfig {
close_after_requests: false,
..sideband_connection(requests)
}
}
fn session_updated(session_id: &str) -> Value {
json!({
"type": "session.updated",
"session": { "id": session_id, "instructions": "backend prompt" }
})
}
fn v2_codex_tool_call(call_id: &str, prompt: &str) -> Value {
json!({
"type": "conversation.item.done",
"item": {
"id": format!("item_{call_id}"),
"type": "function_call",
"name": "codex",
"call_id": call_id,
"arguments": json!({ "prompt": prompt }).to_string()
}
})
}
#[tokio::test]
async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -582,6 +838,381 @@ async fn realtime_webrtc_start_emits_sdp_notification() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn webrtc_v1_start_posts_offer_returns_sdp_and_joins_sideband() -> Result<()> {
skip_if_no_network!(Ok(()));
let mut harness = RealtimeE2eHarness::new(
RealtimeTestVersion::V1,
Vec::new(),
vec![open_sideband_connection(vec![vec![session_updated(
"sess_v1_webrtc",
)]])],
)
.await?;
let started = harness.start_webrtc_realtime("v=offer\r\n").await?;
assert_eq!(
started,
StartedWebrtcRealtime {
started: ThreadRealtimeStartedNotification {
thread_id: harness.thread_id.clone(),
session_id: Some("sess_v1_webrtc".to_string()),
version: RealtimeConversationVersion::V1,
},
sdp: ThreadRealtimeSdpNotification {
thread_id: harness.thread_id.clone(),
sdp: "v=answer\r\n".to_string(),
},
}
);
assert_call_create_multipart(
harness.call_capture.single_request(),
"v=offer\r\n",
v1_session_create_json(),
)?;
assert_eq!(
harness.realtime_server.single_handshake().uri(),
"/v1/realtime?call_id=rtc_e2e"
);
let session_update = harness.sideband_outbound_request(/*request_index*/ 0).await;
assert_v1_session_update(&session_update)?;
let closed = timeout(
Duration::from_millis(100),
harness
.mcp
.read_stream_until_notification_message("thread/realtime/closed"),
)
.await;
assert!(closed.is_err(), "WebRTC start should not close immediately");
harness.shutdown().await;
Ok(())
}
#[tokio::test]
async fn webrtc_v1_handoff_request_delegates_and_appends_result() -> Result<()> {
skip_if_no_network!(Ok(()));
let mut harness = RealtimeE2eHarness::new(
RealtimeTestVersion::V1,
vec![create_final_assistant_message_sse_response(
"delegated from v1",
)?],
vec![sideband_connection(vec![
vec![
session_updated("sess_v1_handoff"),
json!({
"type": "conversation.input_transcript.delta",
"delta": "delegate from v1"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_v1",
"item_id": "item_v1",
"input_transcript": "delegate from v1"
}),
],
vec![],
])],
)
.await?;
let started = harness.start_webrtc_realtime("v=offer\r\n").await?;
assert_eq!(started.started.version, RealtimeConversationVersion::V1);
let turn_started = harness
.read_notification::<TurnStartedNotification>("turn/started")
.await?;
assert_eq!(turn_started.thread_id, harness.thread_id);
let turn_completed = harness
.read_notification::<TurnCompletedNotification>("turn/completed")
.await?;
assert_eq!(turn_completed.thread_id, harness.thread_id);
let requests = harness.responses_requests().await?;
assert_eq!(requests.len(), 1);
assert!(
response_request_contains_text(&requests[0], "user: delegate from v1"),
"delegated Responses request should contain realtime text: {}",
requests[0]
);
let handoff_append = harness.sideband_outbound_request(/*request_index*/ 1).await;
assert_eq!(
handoff_append,
json!({
"type": "conversation.handoff.append",
"handoff_id": "handoff_v1",
"output_text": "\"Agent Final Message\":\n\ndelegated from v1",
})
);
harness.shutdown().await;
Ok(())
}
#[tokio::test]
async fn webrtc_v2_forwards_audio_and_text_between_client_and_sideband() -> Result<()> {
skip_if_no_network!(Ok(()));
let mut harness = RealtimeE2eHarness::new(
RealtimeTestVersion::V2,
Vec::new(),
vec![sideband_connection(vec![
vec![session_updated("sess_v2_stream")],
vec![],
vec![
json!({
"type": "conversation.item.input_audio_transcription.delta",
"delta": "transcribed audio"
}),
json!({
"type": "response.output_audio.delta",
"delta": "AQID",
"sample_rate": 24_000,
"channels": 1,
"samples_per_channel": 512
}),
],
vec![],
])],
)
.await?;
let started = harness.start_webrtc_realtime("v=offer\r\n").await?;
assert_eq!(started.started.version, RealtimeConversationVersion::V2);
assert_v2_session_update(&harness.sideband_outbound_request(/*request_index*/ 0).await)?;
let thread_id = started.started.thread_id.clone();
harness.append_audio(thread_id.clone()).await?;
harness.append_text(thread_id).await?;
let transcript = harness
.read_notification::<ThreadRealtimeTranscriptUpdatedNotification>(
"thread/realtime/transcriptUpdated",
)
.await?;
assert_eq!(transcript.text, "transcribed audio");
let output_audio = harness
.read_notification::<ThreadRealtimeOutputAudioDeltaNotification>(
"thread/realtime/outputAudio/delta",
)
.await?;
assert_eq!(output_audio.audio.data, "AQID");
let requests = [
harness.sideband_outbound_request(/*request_index*/ 1).await,
harness.sideband_outbound_request(/*request_index*/ 2).await,
harness.sideband_outbound_request(/*request_index*/ 3).await,
];
assert!(
requests
.iter()
.any(|request| request["type"] == "input_audio_buffer.append"
&& request["audio"] == "BQYH"),
"sideband requests should include audio append: {requests:?}"
);
assert!(
requests.iter().any(|request| {
request["type"] == "conversation.item.create"
&& request["item"]["type"] == "message"
&& request["item"]["role"] == "user"
&& request["item"]["content"][0]["text"] == "hello"
}),
"sideband requests should include user text item: {requests:?}"
);
harness.shutdown().await;
Ok(())
}
#[tokio::test]
async fn webrtc_v2_codex_tool_call_delegates_and_returns_function_output() -> Result<()> {
skip_if_no_network!(Ok(()));
let mut harness = RealtimeE2eHarness::new(
RealtimeTestVersion::V2,
vec![create_final_assistant_message_sse_response(
"delegated from v2",
)?],
vec![sideband_connection(vec![
vec![
session_updated("sess_v2_tool"),
v2_codex_tool_call("call_v2", "delegate from v2"),
],
vec![],
])],
)
.await?;
let started = harness.start_webrtc_realtime("v=offer\r\n").await?;
assert_eq!(started.started.version, RealtimeConversationVersion::V2);
let turn_started = harness
.read_notification::<TurnStartedNotification>("turn/started")
.await?;
assert_eq!(turn_started.thread_id, harness.thread_id);
let turn_completed = harness
.read_notification::<TurnCompletedNotification>("turn/completed")
.await?;
assert_eq!(turn_completed.thread_id, harness.thread_id);
let requests = harness.responses_requests().await?;
assert_eq!(requests.len(), 1);
assert!(
response_request_contains_text(&requests[0], "delegate from v2"),
"delegated Responses request should contain tool prompt: {}",
requests[0]
);
let tool_output = harness.sideband_outbound_request(/*request_index*/ 1).await;
assert_v2_function_call_output(&tool_output, "call_v2", "delegated from v2");
harness.shutdown().await;
Ok(())
}
#[tokio::test]
async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<()> {
skip_if_no_network!(Ok(()));
let mut harness = RealtimeE2eHarness::new(
RealtimeTestVersion::V2,
vec![
create_shell_command_sse_response(
vec!["printf".to_string(), "realtime-tool-ok".to_string()],
/*workdir*/ None,
Some(5000),
"shell_call",
)?,
create_final_assistant_message_sse_response("shell tool finished")?,
],
vec![sideband_connection(vec![
vec![
session_updated("sess_v2_shell"),
v2_codex_tool_call("call_shell", "run shell through delegated turn"),
],
vec![],
])],
)
.await?;
let _ = harness.start_webrtc_realtime("v=offer\r\n").await?;
let started_command = wait_for_started_command_execution(&mut harness.mcp).await?;
let ThreadItem::CommandExecution { id, status, .. } = started_command.item else {
unreachable!("helper returns command execution items");
};
assert_eq!(
(id.as_str(), status),
("shell_call", CommandExecutionStatus::InProgress)
);
let completed_command = wait_for_completed_command_execution(&mut harness.mcp).await?;
let ThreadItem::CommandExecution {
id,
status,
aggregated_output,
..
} = completed_command.item
else {
unreachable!("helper returns command execution items");
};
assert_eq!(id.as_str(), "shell_call");
assert_eq!(status, CommandExecutionStatus::Completed);
assert_eq!(aggregated_output.as_deref(), Some("realtime-tool-ok"));
let turn_completed = harness
.read_notification::<TurnCompletedNotification>("turn/completed")
.await?;
assert_eq!(turn_completed.thread_id, harness.thread_id);
let requests = harness.responses_requests().await?;
assert_eq!(requests.len(), 2);
assert!(
response_request_contains_text(&requests[1], "realtime-tool-ok"),
"follow-up Responses request should contain shell output: {}",
requests[1]
);
let function_outputs = function_call_output_sideband_requests(&harness.realtime_server);
assert_eq!(function_outputs.len(), 1);
assert_v2_function_call_output(&function_outputs[0], "call_shell", "shell tool finished");
harness.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn webrtc_v2_tool_call_does_not_block_sideband_audio() -> Result<()> {
skip_if_no_network!(Ok(()));
let responses_server = responses::start_mock_server().await;
let (gate_completed_tx, gate_completed_rx) = mpsc::channel();
let gated_response = responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_assistant_message("msg-1", "late delegated result"),
responses::ev_completed("resp-1"),
]);
Mock::given(method("POST"))
.and(path_regex(".*/responses$"))
.respond_with(GatedSseResponse {
gate_rx: Mutex::new(Some(gate_completed_rx)),
response: gated_response,
})
.expect(1)
.mount(&responses_server)
.await;
let mut harness = RealtimeE2eHarness::new_with_responses_server(
RealtimeTestVersion::V2,
responses_server,
vec![sideband_connection(vec![
vec![
session_updated("sess_v2_nonblocking"),
v2_codex_tool_call("call_audio", "delegate while audio continues"),
json!({
"type": "response.output_audio.delta",
"delta": "CQoL",
"sample_rate": 24_000,
"channels": 1,
"samples_per_channel": 256
}),
],
vec![],
])],
)
.await?;
let _ = harness.start_webrtc_realtime("v=offer\r\n").await?;
let _ = harness
.read_notification::<TurnStartedNotification>("turn/started")
.await?;
let audio = harness
.read_notification::<ThreadRealtimeOutputAudioDeltaNotification>(
"thread/realtime/outputAudio/delta",
)
.await?;
assert_eq!(audio.audio.data, "CQoL");
let _ = gate_completed_tx.send(());
let turn_completed = harness
.read_notification::<TurnCompletedNotification>("turn/completed")
.await?;
assert_eq!(turn_completed.thread_id, harness.thread_id);
let tool_output = harness.sideband_outbound_request(/*request_index*/ 1).await;
assert_v2_function_call_output(&tool_output, "call_audio", "late delegated result");
harness.shutdown().await;
Ok(())
}
#[tokio::test]
async fn realtime_webrtc_start_surfaces_backend_error() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -722,18 +1353,176 @@ async fn login_with_api_key(mcp: &mut McpProcess, api_key: &str) -> Result<()> {
Ok(())
}
async fn wait_for_started_command_execution(
mcp: &mut McpProcess,
) -> Result<ItemStartedNotification> {
loop {
let started = read_notification::<ItemStartedNotification>(mcp, "item/started").await?;
if let ThreadItem::CommandExecution { .. } = &started.item {
return Ok(started);
}
}
}
async fn wait_for_completed_command_execution(
mcp: &mut McpProcess,
) -> Result<ItemCompletedNotification> {
loop {
let completed =
read_notification::<ItemCompletedNotification>(mcp, "item/completed").await?;
if let ThreadItem::CommandExecution { .. } = &completed.item {
return Ok(completed);
}
}
}
async fn responses_requests(server: &MockServer) -> Result<Vec<Value>> {
server
.received_requests()
.await
.context("failed to fetch received requests")?
.into_iter()
.filter(|request| request.url.path().ends_with("/responses"))
.map(|request| {
request
.body_json::<Value>()
.context("Responses request body should be JSON")
})
.collect()
}
fn response_request_contains_text(request: &Value, text: &str) -> bool {
request.to_string().contains(text)
}
fn function_call_output_sideband_requests(server: &WebSocketTestServer) -> Vec<Value> {
server
.single_connection()
.iter()
.map(WebSocketRequest::body_json)
.filter(|request| {
request["type"] == "conversation.item.create"
&& request["item"]["type"] == "function_call_output"
})
.collect()
}
fn assert_v2_function_call_output(request: &Value, call_id: &str, expected_output: &str) {
assert_eq!(
request,
&json!({
"type": "conversation.item.create",
"item": {
"type": "function_call_output",
"call_id": call_id,
"output": format!("\"Agent Final Message\":\n\n{expected_output}"),
}
})
);
}
fn assert_v1_session_update(request: &Value) -> Result<()> {
assert_eq!(request["type"].as_str(), Some("session.update"));
assert_eq!(request["session"]["type"].as_str(), Some("quicksilver"));
assert!(
request["session"]["instructions"]
.as_str()
.context("v1 session.update instructions")?
.contains("startup context")
);
assert_eq!(
request["session"]["audio"]["output"]["voice"].as_str(),
Some("fathom")
);
assert_eq!(request["session"]["tools"], Value::Null);
Ok(())
}
fn assert_v2_session_update(request: &Value) -> Result<()> {
assert_eq!(request["type"].as_str(), Some("session.update"));
assert_eq!(request["session"]["type"].as_str(), Some("realtime"));
assert!(
request["session"]["instructions"]
.as_str()
.context("v2 session.update instructions")?
.contains("startup context")
);
assert_eq!(
request["session"]["tools"][0]["name"].as_str(),
Some("codex")
);
Ok(())
}
fn assert_call_create_multipart(
request: WiremockRequest,
offer_sdp: &str,
session: &str,
) -> Result<()> {
assert_eq!(request.url.path(), "/v1/realtime/calls");
assert_eq!(request.url.query(), None);
assert_eq!(
request
.headers
.get("content-type")
.and_then(|value| value.to_str().ok()),
Some("multipart/form-data; boundary=codex-realtime-call-boundary")
);
let body = String::from_utf8(request.body).context("multipart body should be utf-8")?;
assert_eq!(
body,
format!(
"--codex-realtime-call-boundary\r\n\
Content-Disposition: form-data; name=\"sdp\"\r\n\
Content-Type: application/sdp\r\n\
\r\n\
{offer_sdp}\r\n\
--codex-realtime-call-boundary\r\n\
Content-Disposition: form-data; name=\"session\"\r\n\
Content-Type: application/json\r\n\
\r\n\
{session}\r\n\
--codex-realtime-call-boundary--\r\n"
)
);
Ok(())
}
fn v1_session_create_json() -> &'static str {
r#"{"audio":{"input":{"format":{"type":"audio/pcm","rate":24000}},"output":{"voice":"fathom"}},"type":"quicksilver","instructions":"backend prompt\n\nstartup context"}"#
}
fn create_config_toml(
codex_home: &Path,
responses_server_uri: &str,
realtime_server_uri: &str,
realtime_enabled: bool,
startup_context: StartupContextConfig<'_>,
) -> std::io::Result<()> {
create_config_toml_with_realtime_version(
codex_home,
responses_server_uri,
realtime_server_uri,
realtime_enabled,
startup_context,
RealtimeTestVersion::V2,
)
}
fn create_config_toml_with_realtime_version(
codex_home: &Path,
responses_server_uri: &str,
realtime_server_uri: &str,
realtime_enabled: bool,
startup_context: StartupContextConfig<'_>,
realtime_version: RealtimeTestVersion,
) -> std::io::Result<()> {
let realtime_feature_key = FEATURES
.iter()
.find(|spec| spec.id == Feature::RealtimeConversation)
.map(|spec| spec.key)
.unwrap_or("realtime_conversation");
let realtime_version = realtime_version.config_value();
let startup_context = match startup_context {
StartupContextConfig::Generated => String::new(),
StartupContextConfig::Override(context) => {
@@ -754,7 +1543,7 @@ experimental_realtime_ws_backend_prompt = "backend prompt"
{startup_context}
[realtime]
version = "v2"
version = "{realtime_version}"
type = "conversational"
[features]