Add WebRTC transport to realtime start (#16960)

Adds WebRTC startup to the experimental app-server
`thread/realtime/start` method with an optional transport enum. The
websocket path remains the default; WebRTC offers create the realtime
session through the shared start flow and emit the answer SDP via
`thread/realtime/sdp`.

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-04-07 15:43:38 -07:00
committed by GitHub
parent 6c36e7d688
commit fb3dcfde1d
42 changed files with 1574 additions and 85 deletions

View File

@@ -12,6 +12,7 @@ use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::MockExperimentalMethodParams;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadRealtimeStartParams;
use codex_app_server_protocol::ThreadRealtimeStartTransport;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use pretty_assertions::assert_eq;
@@ -75,6 +76,44 @@ async fn realtime_conversation_start_requires_experimental_api_capability() -> R
thread_id: "thr_123".to_string(),
prompt: "hello".to_string(),
session_id: None,
transport: None,
})
.await?;
let error = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_experimental_capability_error(error, "thread/realtime/start");
Ok(())
}
#[tokio::test]
async fn realtime_webrtc_start_requires_experimental_api_capability() -> Result<()> {
let codex_home = TempDir::new()?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
let init = mcp
.initialize_with_capabilities(
default_client_info(),
Some(InitializeCapabilities {
experimental_api: false,
opt_out_notification_methods: None,
}),
)
.await?;
let JSONRPCMessage::Response(_) = init else {
anyhow::bail!("expected initialize response, got {init:?}");
};
let request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
thread_id: "thr_123".to_string(),
prompt: "hello".to_string(),
session_id: None,
transport: Some(ThreadRealtimeStartTransport::Webrtc {
sdp: "v=offer\r\n".to_string(),
}),
})
.await?;
let error = timeout(

View File

@@ -16,8 +16,10 @@ use codex_app_server_protocol::ThreadRealtimeClosedNotification;
use codex_app_server_protocol::ThreadRealtimeErrorNotification;
use codex_app_server_protocol::ThreadRealtimeItemAddedNotification;
use codex_app_server_protocol::ThreadRealtimeOutputAudioDeltaNotification;
use codex_app_server_protocol::ThreadRealtimeSdpNotification;
use codex_app_server_protocol::ThreadRealtimeStartParams;
use codex_app_server_protocol::ThreadRealtimeStartResponse;
use codex_app_server_protocol::ThreadRealtimeStartTransport;
use codex_app_server_protocol::ThreadRealtimeStartedNotification;
use codex_app_server_protocol::ThreadRealtimeStopParams;
use codex_app_server_protocol::ThreadRealtimeStopResponse;
@@ -33,13 +35,59 @@ use pretty_assertions::assert_eq;
use serde::de::DeserializeOwned;
use serde_json::json;
use std::path::Path;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use tempfile::TempDir;
use tokio::time::timeout;
use wiremock::Match;
use wiremock::Mock;
use wiremock::Request as WiremockRequest;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
#[derive(Debug, Clone, Copy)]
enum StartupContextConfig<'a> {
Generated,
Override(&'a str),
}
#[derive(Debug, Clone)]
struct RealtimeCallRequestCapture {
requests: Arc<Mutex<Vec<WiremockRequest>>>,
}
impl RealtimeCallRequestCapture {
fn new() -> Self {
Self {
requests: Arc::new(Mutex::new(Vec::new())),
}
}
fn single_request(&self) -> WiremockRequest {
let requests = self
.requests
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
assert_eq!(requests.len(), 1, "expected one realtime call request");
requests[0].clone()
}
}
impl Match for RealtimeCallRequestCapture {
fn matches(&self, request: &WiremockRequest) -> bool {
self.requests
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push(request.clone());
true
}
}
#[tokio::test]
async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -100,6 +148,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
&responses_server.uri(),
realtime_server.uri(),
/*realtime_enabled*/ true,
StartupContextConfig::Generated,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
@@ -121,6 +170,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
thread_id: thread_start.thread.id.clone(),
prompt: "backend prompt".to_string(),
session_id: None,
transport: None,
})
.await?;
let start_response: JSONRPCResponse = timeout(
@@ -309,6 +359,7 @@ async fn realtime_conversation_stop_emits_closed_notification() -> Result<()> {
&responses_server.uri(),
realtime_server.uri(),
/*realtime_enabled*/ true,
StartupContextConfig::Generated,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
@@ -330,6 +381,7 @@ async fn realtime_conversation_stop_emits_closed_notification() -> Result<()> {
thread_id: thread_start.thread.id.clone(),
prompt: "backend prompt".to_string(),
session_id: None,
transport: None,
})
.await?;
let start_response: JSONRPCResponse = timeout(
@@ -368,6 +420,169 @@ async fn realtime_conversation_stop_emits_closed_notification() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn realtime_webrtc_start_emits_sdp_notification() -> Result<()> {
skip_if_no_network!(Ok(()));
let responses_server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
let call_capture = RealtimeCallRequestCapture::new();
Mock::given(method("POST"))
.and(path("/v1/realtime/calls"))
.and(call_capture.clone())
.respond_with(ResponseTemplate::new(200).set_body_string("v=answer\r\n"))
.mount(&responses_server)
.await;
let realtime_server = start_websocket_server(vec![vec![]]).await;
let codex_home = TempDir::new()?;
create_config_toml(
codex_home.path(),
&responses_server.uri(),
realtime_server.uri(),
/*realtime_enabled*/ true,
StartupContextConfig::Override("startup context"),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
mcp.initialize().await?;
login_with_api_key(&mut mcp, "sk-test-key").await?;
let thread_start_request_id = mcp
.send_thread_start_request(ThreadStartParams::default())
.await?;
let thread_start_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_start_request_id)),
)
.await??;
let thread_start: ThreadStartResponse = to_response(thread_start_response)?;
let thread_id = thread_start.thread.id;
let start_request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
thread_id: thread_id.clone(),
prompt: "backend prompt".to_string(),
session_id: None,
transport: Some(ThreadRealtimeStartTransport::Webrtc {
sdp: "v=offer\r\n".to_string(),
}),
})
.await?;
let start_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_request_id)),
)
.await??;
let _: ThreadRealtimeStartResponse = to_response(start_response)?;
let sdp_notification =
read_notification::<ThreadRealtimeSdpNotification>(&mut mcp, "thread/realtime/sdp").await?;
assert_eq!(
sdp_notification,
ThreadRealtimeSdpNotification {
thread_id: thread_id.clone(),
sdp: "v=answer\r\n".to_string()
}
);
let closed_notification =
read_notification::<ThreadRealtimeClosedNotification>(&mut mcp, "thread/realtime/closed")
.await?;
assert_eq!(
closed_notification,
ThreadRealtimeClosedNotification {
thread_id: thread_id.clone(),
reason: Some("transport_closed".to_string())
}
);
let request = call_capture.single_request();
assert_eq!(request.url.path(), "/v1/realtime/calls");
assert_eq!(request.url.query(), None);
let body = String::from_utf8(request.body).context("multipart body should be utf-8")?;
let session = r#"{"tool_choice":"auto","type":"realtime","instructions":"backend prompt\n\nstartup context","output_modalities":["audio"],"audio":{"input":{"format":{"type":"audio/pcm","rate":24000},"noise_reduction":{"type":"near_field"},"turn_detection":{"type":"server_vad","interrupt_response":true,"create_response":true}},"output":{"format":{"type":"audio/pcm","rate":24000},"voice":"marin"}},"tools":[{"type":"function","name":"codex","description":"Delegate a request to Codex and return the final result to the user. Use this as the default action. If the user asks to do something next, later, after this, or once current work finishes, call this tool so the work is actually queued instead of merely promising to do it later.","parameters":{"type":"object","properties":{"prompt":{"type":"string","description":"The user request to delegate to Codex."}},"required":["prompt"],"additionalProperties":false}}]}"#;
assert_eq!(
body,
format!(
"--codex-realtime-call-boundary\r\n\
Content-Disposition: form-data; name=\"sdp\"\r\n\
Content-Type: application/sdp\r\n\
\r\n\
v=offer\r\n\
\r\n\
--codex-realtime-call-boundary\r\n\
Content-Disposition: form-data; name=\"session\"\r\n\
Content-Type: application/json\r\n\
\r\n\
{session}\r\n\
--codex-realtime-call-boundary--\r\n"
)
);
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test]
async fn realtime_webrtc_start_surfaces_backend_error() -> Result<()> {
skip_if_no_network!(Ok(()));
let responses_server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
Mock::given(method("POST"))
.and(path("/v1/realtime/calls"))
.respond_with(ResponseTemplate::new(500).set_body_string("boom"))
.mount(&responses_server)
.await;
let realtime_server = start_websocket_server(vec![vec![]]).await;
let codex_home = TempDir::new()?;
create_config_toml(
codex_home.path(),
&responses_server.uri(),
realtime_server.uri(),
/*realtime_enabled*/ true,
StartupContextConfig::Override("startup context"),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
mcp.initialize().await?;
login_with_api_key(&mut mcp, "sk-test-key").await?;
let thread_start_request_id = mcp
.send_thread_start_request(ThreadStartParams::default())
.await?;
let thread_start_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_start_request_id)),
)
.await??;
let thread_start: ThreadStartResponse = to_response(thread_start_response)?;
let start_request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
thread_id: thread_start.thread.id,
prompt: "backend prompt".to_string(),
session_id: None,
transport: Some(ThreadRealtimeStartTransport::Webrtc {
sdp: "v=offer\r\n".to_string(),
}),
})
.await?;
let start_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_request_id)),
)
.await??;
let _: ThreadRealtimeStartResponse = to_response(start_response)?;
let error =
read_notification::<ThreadRealtimeErrorNotification>(&mut mcp, "thread/realtime/error")
.await?;
assert!(error.message.contains("currently experiencing high demand"));
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test]
async fn realtime_conversation_requires_feature_flag() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -381,6 +596,7 @@ async fn realtime_conversation_requires_feature_flag() -> Result<()> {
&responses_server.uri(),
realtime_server.uri(),
/*realtime_enabled*/ false,
StartupContextConfig::Generated,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
@@ -401,6 +617,7 @@ async fn realtime_conversation_requires_feature_flag() -> Result<()> {
thread_id: thread_start.thread.id.clone(),
prompt: "backend prompt".to_string(),
session_id: None,
transport: None,
})
.await?;
let error = timeout(
@@ -450,12 +667,19 @@ fn create_config_toml(
responses_server_uri: &str,
realtime_server_uri: &str,
realtime_enabled: bool,
startup_context: StartupContextConfig<'_>,
) -> std::io::Result<()> {
let realtime_feature_key = FEATURES
.iter()
.find(|spec| spec.id == Feature::RealtimeConversation)
.map(|spec| spec.key)
.unwrap_or("realtime_conversation");
let startup_context = match startup_context {
StartupContextConfig::Generated => String::new(),
StartupContextConfig::Override(context) => {
format!("experimental_realtime_ws_startup_context = {context:?}\n")
}
};
std::fs::write(
codex_home.join("config.toml"),
@@ -466,6 +690,8 @@ approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
experimental_realtime_ws_base_url = "{realtime_server_uri}"
experimental_realtime_ws_backend_prompt = "backend prompt"
{startup_context}
[realtime]
version = "v2"