Compare commits

...

1 Commits

Author SHA1 Message Date
Ahmed Ibrahim
96fee71d24 Add realtime rtc protocol support
Adds the realtime_rtc protocol path so app-server clients can negotiate WebRTC media while core keeps realtime events and tool handling. Regenerates protocol/config schemas for the new request and response shapes.

Co-authored-by: Codex <noreply@openai.com>
2026-04-06 15:59:12 -07:00
29 changed files with 872 additions and 76 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -8239,6 +8239,7 @@ dependencies = [
"js-sys",
"log",
"mime",
"mime_guess",
"native-tls",
"percent-encoding",
"pin-project-lite",

View File

@@ -2815,6 +2815,47 @@
],
"type": "object"
},
"ThreadRealtimeStartProtocol": {
"description": "EXPERIMENTAL - realtime start protocol selected by the client.",
"oneOf": [
{
"properties": {
"type": {
"enum": [
"jsonRpcPcm"
],
"title": "JsonRpcPcmThreadRealtimeStartProtocolType",
"type": "string"
}
},
"required": [
"type"
],
"title": "JsonRpcPcmThreadRealtimeStartProtocol",
"type": "object"
},
{
"properties": {
"offerSdp": {
"type": "string"
},
"type": {
"enum": [
"rtc"
],
"title": "RtcThreadRealtimeStartProtocolType",
"type": "string"
}
},
"required": [
"offerSdp",
"type"
],
"title": "RtcThreadRealtimeStartProtocol",
"type": "object"
}
]
},
"ThreadResumeParams": {
"description": "There are three ways to resume a thread: 1. By thread_id: load the thread from disk by thread_id and resume it. 2. By history: instantiate the thread from memory and resume it. 3. By path: load the thread from disk by path and resume it.\n\nThe precedence is: history > path > thread_id. If using history or path, the thread_id param will be ignored.\n\nPrefer using thread_id whenever possible.",
"properties": {

View File

@@ -13518,6 +13518,88 @@
"title": "ThreadRealtimeOutputAudioDeltaNotification",
"type": "object"
},
"ThreadRealtimeStartProtocol": {
"description": "EXPERIMENTAL - realtime start protocol selected by the client.",
"oneOf": [
{
"properties": {
"type": {
"enum": [
"jsonRpcPcm"
],
"title": "JsonRpcPcmThreadRealtimeStartProtocolType",
"type": "string"
}
},
"required": [
"type"
],
"title": "JsonRpcPcmThreadRealtimeStartProtocol",
"type": "object"
},
{
"properties": {
"offerSdp": {
"type": "string"
},
"type": {
"enum": [
"rtc"
],
"title": "RtcThreadRealtimeStartProtocolType",
"type": "string"
}
},
"required": [
"offerSdp",
"type"
],
"title": "RtcThreadRealtimeStartProtocol",
"type": "object"
}
]
},
"ThreadRealtimeStartResponseProtocol": {
"description": "EXPERIMENTAL - realtime start response protocol payload.",
"oneOf": [
{
"properties": {
"type": {
"enum": [
"jsonRpcPcm"
],
"title": "JsonRpcPcmThreadRealtimeStartResponseProtocolType",
"type": "string"
}
},
"required": [
"type"
],
"title": "JsonRpcPcmThreadRealtimeStartResponseProtocol",
"type": "object"
},
{
"properties": {
"answerSdp": {
"type": "string"
},
"type": {
"enum": [
"rtc"
],
"title": "RtcThreadRealtimeStartResponseProtocolType",
"type": "string"
}
},
"required": [
"answerSdp",
"type"
],
"title": "RtcThreadRealtimeStartResponseProtocol",
"type": "object"
}
]
},
"ThreadRealtimeStartedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "EXPERIMENTAL - emitted when thread realtime startup is accepted.",

View File

@@ -11373,6 +11373,88 @@
"title": "ThreadRealtimeOutputAudioDeltaNotification",
"type": "object"
},
"ThreadRealtimeStartProtocol": {
"description": "EXPERIMENTAL - realtime start protocol selected by the client.",
"oneOf": [
{
"properties": {
"type": {
"enum": [
"jsonRpcPcm"
],
"title": "JsonRpcPcmThreadRealtimeStartProtocolType",
"type": "string"
}
},
"required": [
"type"
],
"title": "JsonRpcPcmThreadRealtimeStartProtocol",
"type": "object"
},
{
"properties": {
"offerSdp": {
"type": "string"
},
"type": {
"enum": [
"rtc"
],
"title": "RtcThreadRealtimeStartProtocolType",
"type": "string"
}
},
"required": [
"offerSdp",
"type"
],
"title": "RtcThreadRealtimeStartProtocol",
"type": "object"
}
]
},
"ThreadRealtimeStartResponseProtocol": {
"description": "EXPERIMENTAL - realtime start response protocol payload.",
"oneOf": [
{
"properties": {
"type": {
"enum": [
"jsonRpcPcm"
],
"title": "JsonRpcPcmThreadRealtimeStartResponseProtocolType",
"type": "string"
}
},
"required": [
"type"
],
"title": "JsonRpcPcmThreadRealtimeStartResponseProtocol",
"type": "object"
},
{
"properties": {
"answerSdp": {
"type": "string"
},
"type": {
"enum": [
"rtc"
],
"title": "RtcThreadRealtimeStartResponseProtocolType",
"type": "string"
}
},
"required": [
"answerSdp",
"type"
],
"title": "RtcThreadRealtimeStartResponseProtocol",
"type": "object"
}
]
},
"ThreadRealtimeStartedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "EXPERIMENTAL - emitted when thread realtime startup is accepted.",

View File

@@ -0,0 +1,8 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
/**
* EXPERIMENTAL - realtime start protocol selected by the client.
*/
export type ThreadRealtimeStartProtocol = { "type": "jsonRpcPcm" } | { "type": "rtc", offerSdp: string, };

View File

@@ -0,0 +1,8 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
/**
* EXPERIMENTAL - realtime start response protocol payload.
*/
export type ThreadRealtimeStartResponseProtocol = { "type": "jsonRpcPcm" } | { "type": "rtc", answerSdp: string, };

View File

@@ -291,6 +291,8 @@ export type { ThreadRealtimeClosedNotification } from "./ThreadRealtimeClosedNot
export type { ThreadRealtimeErrorNotification } from "./ThreadRealtimeErrorNotification";
export type { ThreadRealtimeItemAddedNotification } from "./ThreadRealtimeItemAddedNotification";
export type { ThreadRealtimeOutputAudioDeltaNotification } from "./ThreadRealtimeOutputAudioDeltaNotification";
export type { ThreadRealtimeStartProtocol } from "./ThreadRealtimeStartProtocol";
export type { ThreadRealtimeStartResponseProtocol } from "./ThreadRealtimeStartResponseProtocol";
export type { ThreadRealtimeStartedNotification } from "./ThreadRealtimeStartedNotification";
export type { ThreadRealtimeTranscriptUpdatedNotification } from "./ThreadRealtimeTranscriptUpdatedNotification";
export type { ThreadResumeParams } from "./ThreadResumeParams";

View File

@@ -1695,6 +1695,7 @@ mod tests {
thread_id: "thr_123".to_string(),
prompt: "You are on a call".to_string(),
session_id: Some("sess_456".to_string()),
protocol: v2::ThreadRealtimeStartProtocol::JsonRpcPcm,
},
};
assert_eq!(
@@ -1704,7 +1705,37 @@ mod tests {
"params": {
"threadId": "thr_123",
"prompt": "You are on a call",
"sessionId": "sess_456"
"sessionId": "sess_456",
"protocol": { "type": "jsonRpcPcm" }
}
}),
serde_json::to_value(&request)?,
);
Ok(())
}
#[test]
fn serialize_thread_realtime_start_rtc_protocol() -> Result<()> {
let request = ClientRequest::ThreadRealtimeStart {
request_id: RequestId::Integer(9),
params: v2::ThreadRealtimeStartParams {
thread_id: "thr_123".to_string(),
prompt: "You are on a call".to_string(),
session_id: None,
protocol: v2::ThreadRealtimeStartProtocol::Rtc {
offer_sdp: "v=0".to_string(),
},
},
};
assert_eq!(
json!({
"method": "thread/realtime/start",
"id": 9,
"params": {
"threadId": "thr_123",
"prompt": "You are on a call",
"sessionId": null,
"protocol": { "type": "rtc", "offerSdp": "v=0" }
}
}),
serde_json::to_value(&request)?,
@@ -1784,6 +1815,7 @@ mod tests {
thread_id: "thr_123".to_string(),
prompt: "You are on a call".to_string(),
session_id: None,
protocol: v2::ThreadRealtimeStartProtocol::JsonRpcPcm,
},
};
let reason = crate::experimental_api::ExperimentalApi::experimental_reason(&request);

View File

@@ -3807,7 +3807,7 @@ impl From<ThreadRealtimeAudioChunk> for CoreRealtimeAudioFrame {
}
/// EXPERIMENTAL - start a thread-scoped realtime session.
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadRealtimeStartParams {
@@ -3815,13 +3815,44 @@ pub struct ThreadRealtimeStartParams {
pub prompt: String,
#[ts(optional = nullable)]
pub session_id: Option<String>,
pub protocol: ThreadRealtimeStartProtocol,
}
/// EXPERIMENTAL - realtime start protocol selected by the client.
// NOTE: variant notes below are plain `//` comments on purpose. The item-level
// doc comment above appears verbatim as the `description` in the generated
// JSON schema, so adding `///` docs here would presumably change generated
// artifacts — TODO confirm before converting these to doc comments.
//
// Wire format is internally tagged on `type` with camelCase names, e.g.
// `{ "type": "jsonRpcPcm" }` or `{ "type": "rtc", "offerSdp": "..." }`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
#[serde(tag = "type", rename_all = "camelCase")]
#[ts(export_to = "v2/")]
#[ts(tag = "type")]
pub enum ThreadRealtimeStartProtocol {
// Audio is exchanged over JSON-RPC; carries no extra payload.
JsonRpcPcm,
// Client negotiates WebRTC media and supplies its SDP offer.
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
Rtc {
// SDP offer produced by the client (serialized as `offerSdp`).
offer_sdp: String,
},
}
/// EXPERIMENTAL - response for starting thread realtime.
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadRealtimeStartResponse {}
pub struct ThreadRealtimeStartResponse {
pub protocol: ThreadRealtimeStartResponseProtocol,
}
/// EXPERIMENTAL - realtime start response protocol payload.
// NOTE: variant notes below are plain `//` comments on purpose. The item-level
// doc comment above appears verbatim as the `description` in the generated
// JSON schema, so adding `///` docs here would presumably change generated
// artifacts — TODO confirm before converting these to doc comments.
//
// Wire format is internally tagged on `type` with camelCase names, e.g.
// `{ "type": "jsonRpcPcm" }` or `{ "type": "rtc", "answerSdp": "..." }`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
#[serde(tag = "type", rename_all = "camelCase")]
#[ts(export_to = "v2/")]
#[ts(tag = "type")]
pub enum ThreadRealtimeStartResponseProtocol {
// Acknowledges a JSON-RPC PCM start; carries no extra payload.
JsonRpcPcm,
// Acknowledges an RTC start and returns the SDP answer for the client's offer.
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
Rtc {
// SDP answer to hand back to the client (serialized as `answerSdp`).
answer_sdp: String,
},
}
/// EXPERIMENTAL - append audio input to thread realtime.
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]

View File

@@ -151,8 +151,8 @@ Example with notification opt-out:
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
- `turn/steer` — add user input to an already in-flight regular turn without starting a new turn; returns the active `turnId` that accepted the input. Review and manual compaction turns reject `turn/steer`.
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
- `thread/realtime/start` — start a thread-scoped realtime session (experimental); returns `{}` and streams `thread/realtime/*` notifications.
- `thread/realtime/appendAudio` — append an input audio chunk to the active realtime session (experimental); returns `{}`.
- `thread/realtime/start` — start a thread-scoped realtime session (experimental). With `features.realtime_rtc=false`, clients send `{ protocol: { type: "jsonRpcPcm" } }`, receive `{ protocol: { type: "jsonRpcPcm" } }`, and stream audio over JSON-RPC. With `features.realtime_rtc=true`, clients send `{ protocol: { type: "rtc", offerSdp } }`, receive `{ protocol: { type: "rtc", answerSdp } }`, and media flows over WebRTC while the harness keeps handling realtime events and tool calls.
- `thread/realtime/appendAudio` — append an input audio chunk to the active JSON-RPC PCM realtime session (experimental); returns `{}`. Unavailable when `features.realtime_rtc=true`.
- `thread/realtime/appendText` — append text input to the active realtime session (experimental); returns `{}`.
- `thread/realtime/stop` — stop the active realtime session for the thread (experimental); returns `{}`.
- `review/start` — kick off Codex's automated reviewer for a thread; responds like `turn/start` and emits `item/started`/`item/completed` notifications with `enteredReviewMode` and `exitedReviewMode` items, plus a final assistant `agentMessage` containing the review.

View File

@@ -142,7 +142,9 @@ use codex_app_server_protocol::ThreadRealtimeAppendAudioResponse;
use codex_app_server_protocol::ThreadRealtimeAppendTextParams;
use codex_app_server_protocol::ThreadRealtimeAppendTextResponse;
use codex_app_server_protocol::ThreadRealtimeStartParams;
use codex_app_server_protocol::ThreadRealtimeStartProtocol;
use codex_app_server_protocol::ThreadRealtimeStartResponse;
use codex_app_server_protocol::ThreadRealtimeStartResponseProtocol;
use codex_app_server_protocol::ThreadRealtimeStopParams;
use codex_app_server_protocol::ThreadRealtimeStopResponse;
use codex_app_server_protocol::ThreadResumeParams;
@@ -273,6 +275,7 @@ use codex_protocol::protocol::McpAuthStatus as CoreMcpAuthStatus;
use codex_protocol::protocol::McpServerRefreshConfig;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot;
use codex_protocol::protocol::RealtimeConversationTransport;
use codex_protocol::protocol::ReviewDelivery as CoreReviewDelivery;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::ReviewTarget as CoreReviewTarget;
@@ -6795,30 +6798,101 @@ impl CodexMessageProcessor {
return;
};
let submit = self
.submit_core_op(
&request_id,
thread.as_ref(),
Op::RealtimeConversationStart(ConversationStartParams {
prompt: params.prompt,
session_id: params.session_id,
}),
)
.await;
match submit {
Ok(_) => {
self.outgoing
.send_response(request_id, ThreadRealtimeStartResponse::default())
.await;
}
Err(err) => {
self.send_internal_error(
let realtime_rtc_enabled = thread.enabled(Feature::RealtimeRtc);
match params.protocol {
ThreadRealtimeStartProtocol::JsonRpcPcm if realtime_rtc_enabled => {
self.send_invalid_request_error(
request_id,
format!("failed to start realtime conversation: {err}"),
"thread/realtime/start requires rtc protocol when features.realtime_rtc is enabled"
.to_string(),
)
.await;
}
ThreadRealtimeStartProtocol::JsonRpcPcm => {
let submit = self
.submit_core_op(
&request_id,
thread.as_ref(),
Op::RealtimeConversationStart(ConversationStartParams {
prompt: params.prompt,
session_id: params.session_id,
transport: RealtimeConversationTransport::Websocket,
}),
)
.await;
match submit {
Ok(_) => {
self.outgoing
.send_response(
request_id,
ThreadRealtimeStartResponse {
protocol: ThreadRealtimeStartResponseProtocol::JsonRpcPcm,
},
)
.await;
}
Err(err) => {
self.send_internal_error(
request_id,
format!("failed to start realtime conversation: {err}"),
)
.await;
}
}
}
ThreadRealtimeStartProtocol::Rtc { .. } if !realtime_rtc_enabled => {
self.send_invalid_request_error(
request_id,
"thread/realtime/start rtc protocol requires features.realtime_rtc".to_string(),
)
.await;
}
ThreadRealtimeStartProtocol::Rtc { offer_sdp } => {
let sub_id = format!("{}:{}", request_id.connection_id, request_id.request_id);
let trace = self.request_trace_context(&request_id).await;
let start = thread
.start_realtime_conversation(
sub_id,
ConversationStartParams {
prompt: params.prompt,
session_id: params.session_id,
transport: RealtimeConversationTransport::Rtc { offer_sdp },
},
trace,
)
.await;
match start {
Ok(result) => {
let Some(answer_sdp) = result.answer_sdp else {
self.send_internal_error(
request_id,
"realtime rtc start did not return an SDP answer".to_string(),
)
.await;
return;
};
self.outgoing
.send_response(
request_id,
ThreadRealtimeStartResponse {
protocol: ThreadRealtimeStartResponseProtocol::Rtc {
answer_sdp,
},
},
)
.await;
}
Err(err) => {
self.send_internal_error(
request_id,
format!("failed to start realtime conversation: {err}"),
)
.await;
}
}
}
}
}
@@ -6834,6 +6908,16 @@ impl CodexMessageProcessor {
return;
};
if thread.enabled(Feature::RealtimeRtc) {
self.send_invalid_request_error(
request_id,
"thread/realtime/appendAudio is unavailable when features.realtime_rtc is enabled"
.to_string(),
)
.await;
return;
}
let submit = self
.submit_core_op(
&request_id,

View File

@@ -12,6 +12,7 @@ use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::MockExperimentalMethodParams;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadRealtimeStartParams;
use codex_app_server_protocol::ThreadRealtimeStartProtocol;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use pretty_assertions::assert_eq;
@@ -75,6 +76,7 @@ async fn realtime_conversation_start_requires_experimental_api_capability() -> R
thread_id: "thr_123".to_string(),
prompt: "hello".to_string(),
session_id: None,
protocol: ThreadRealtimeStartProtocol::JsonRpcPcm,
})
.await?;
let error = timeout(

View File

@@ -17,6 +17,7 @@ use codex_app_server_protocol::ThreadRealtimeErrorNotification;
use codex_app_server_protocol::ThreadRealtimeItemAddedNotification;
use codex_app_server_protocol::ThreadRealtimeOutputAudioDeltaNotification;
use codex_app_server_protocol::ThreadRealtimeStartParams;
use codex_app_server_protocol::ThreadRealtimeStartProtocol;
use codex_app_server_protocol::ThreadRealtimeStartResponse;
use codex_app_server_protocol::ThreadRealtimeStartedNotification;
use codex_app_server_protocol::ThreadRealtimeStopParams;
@@ -121,6 +122,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
thread_id: thread_start.thread.id.clone(),
prompt: "backend prompt".to_string(),
session_id: None,
protocol: ThreadRealtimeStartProtocol::JsonRpcPcm,
})
.await?;
let start_response: JSONRPCResponse = timeout(
@@ -330,6 +332,7 @@ async fn realtime_conversation_stop_emits_closed_notification() -> Result<()> {
thread_id: thread_start.thread.id.clone(),
prompt: "backend prompt".to_string(),
session_id: None,
protocol: ThreadRealtimeStartProtocol::JsonRpcPcm,
})
.await?;
let start_response: JSONRPCResponse = timeout(
@@ -401,6 +404,7 @@ async fn realtime_conversation_requires_feature_flag() -> Result<()> {
thread_id: thread_start.thread.id.clone(),
prompt: "backend prompt".to_string(),
session_id: None,
protocol: ThreadRealtimeStartProtocol::JsonRpcPcm,
})
.await?;
let error = timeout(

View File

@@ -23,6 +23,7 @@ tungstenite = { workspace = true }
tracing = { workspace = true }
eventsource-stream = { workspace = true }
regex-lite = { workspace = true }
reqwest = { workspace = true, features = ["multipart"] }
tokio-util = { workspace = true, features = ["codec"] }
url = { workspace = true }
@@ -32,7 +33,6 @@ assert_matches = { workspace = true }
pretty_assertions = { workspace = true }
tokio-test = { workspace = true }
wiremock = { workspace = true }
reqwest = { workspace = true }
[lints]
workspace = true

View File

@@ -0,0 +1,189 @@
use crate::endpoint::realtime_websocket::methods::merge_request_headers;
use crate::endpoint::realtime_websocket::methods_common::session_update_session;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionConfig;
use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession;
use crate::error::ApiError;
use crate::provider::Provider;
use codex_client::build_reqwest_client_with_custom_ca;
use http::HeaderMap;
use http::header::LOCATION;
use reqwest::multipart::Form;
use serde::Serialize;
use url::Url;
const REALTIME_CALLS_PATH: &str = "/v1/realtime/calls";
/// Result of successfully creating a realtime call via [`RealtimeCallClient::create`].
pub struct RealtimeCallCreateResponse {
/// Call identifier taken from the last path segment of the response's
/// `Location` header.
pub call_id: String,
/// Response body text, returned to the caller as the SDP answer.
pub answer_sdp: String,
}
/// HTTP client for creating realtime calls against a provider's calls endpoint.
pub struct RealtimeCallClient {
    provider: Provider,
}

impl RealtimeCallClient {
    /// Wraps `provider`, whose base URL and headers are used for every request.
    pub fn new(provider: Provider) -> Self {
        Self { provider }
    }

    /// Creates a realtime call by POSTing the SDP offer and session payload as
    /// a multipart form.
    ///
    /// On success the response body is returned as the SDP answer and the call
    /// id is taken from the `Location` response header.
    ///
    /// # Errors
    ///
    /// Returns `ApiError::Stream` when the HTTP client cannot be configured,
    /// the calls URL cannot be derived, the session cannot be serialized, the
    /// request or body read fails, the status is not a success, or the
    /// `Location` header is missing or unusable.
    pub async fn create(
        &self,
        config: &RealtimeSessionConfig,
        offer_sdp: String,
        extra_headers: HeaderMap,
        default_headers: HeaderMap,
    ) -> Result<RealtimeCallCreateResponse, ApiError> {
        let http = match build_reqwest_client_with_custom_ca(reqwest::Client::builder()) {
            Ok(http) => http,
            Err(err) => {
                return Err(ApiError::Stream(format!(
                    "failed to configure realtime HTTP: {err}"
                )));
            }
        };
        let endpoint = realtime_calls_url(&self.provider.base_url)?;
        let request_headers =
            merge_request_headers(&self.provider.headers, extra_headers, default_headers);
        let form = session_form(config, offer_sdp)?;

        let response = match http
            .post(endpoint)
            .headers(request_headers)
            .multipart(form)
            .send()
            .await
        {
            Ok(response) => response,
            Err(err) => {
                return Err(ApiError::Stream(format!(
                    "failed to create realtime call: {err}"
                )));
            }
        };

        // Capture everything needed from the response head before `text()`
        // consumes the response.
        let status = response.status();
        let location = response
            .headers()
            .get(LOCATION)
            .and_then(|value| value.to_str().ok())
            .map(str::to_owned);

        // The body is read even on failure so it can be included in the error.
        let answer_sdp = match response.text().await {
            Ok(body) => body,
            Err(err) => {
                return Err(ApiError::Stream(format!(
                    "failed to read realtime call answer: {err}"
                )));
            }
        };
        if !status.is_success() {
            return Err(ApiError::Stream(format!(
                "realtime call failed with HTTP {status}: {answer_sdp}"
            )));
        }

        let call_id = call_id_from_location(location.as_deref())?;
        Ok(RealtimeCallCreateResponse {
            call_id,
            answer_sdp,
        })
    }
}
/// Derives the HTTP(S) realtime calls endpoint from a provider base URL.
///
/// `ws`/`wss` schemes are mapped to `http`/`https`; `http`/`https` are kept.
/// The path is replaced with `REALTIME_CALLS_PATH` and any query is dropped.
///
/// # Errors
///
/// Returns `ApiError::Stream` if `base_url` does not parse or uses any other
/// scheme.
fn realtime_calls_url(base_url: &str) -> Result<Url, ApiError> {
    let mut url = match Url::parse(base_url) {
        Ok(parsed) => parsed,
        Err(err) => return Err(ApiError::Stream(format!("invalid base URL: {err}"))),
    };

    // `None` means the scheme is already an HTTP one and needs no rewrite.
    let http_scheme = match url.scheme() {
        "http" | "https" => None,
        "ws" => Some("http"),
        "wss" => Some("https"),
        scheme => {
            return Err(ApiError::Stream(format!(
                "unsupported realtime calls URL scheme: {scheme}"
            )));
        }
    };
    if let Some(http_scheme) = http_scheme {
        let _ = url.set_scheme(http_scheme);
    }

    url.set_path(REALTIME_CALLS_PATH);
    url.set_query(None);
    Ok(url)
}
/// Builds the multipart form for a call-create request.
///
/// The form has two text parts: `sdp` (the client's offer) and `session`
/// (the JSON-serialized session payload for `config`).
///
/// # Errors
///
/// Returns `ApiError::Stream` if the session payload cannot be serialized.
fn session_form(config: &RealtimeSessionConfig, offer_sdp: String) -> Result<Form, ApiError> {
    let payload = session_payload(config);
    let session_json = match serde_json::to_string(&payload) {
        Ok(json) => json,
        Err(err) => {
            return Err(ApiError::Stream(format!(
                "failed to serialize realtime call session: {err}"
            )));
        }
    };
    let form = Form::new()
        .text("sdp", offer_sdp)
        .text("session", session_json);
    Ok(form)
}
/// Assembles the session payload sent with a call-create request: the
/// session-update fields built by `session_update_session`, plus the
/// configured model.
fn session_payload(config: &RealtimeSessionConfig) -> RealtimeCallSession {
    let instructions = config.instructions.clone();
    let model = config.model.clone();
    RealtimeCallSession {
        session: session_update_session(config.event_parser, instructions, config.session_mode),
        model,
    }
}
/// JSON body of the `session` form part sent when creating a realtime call.
#[derive(Serialize)]
struct RealtimeCallSession {
/// Session-update fields, flattened so they appear at the top level of the
/// serialized object alongside `model`.
#[serde(flatten)]
session: SessionUpdateSession,
/// Model name; omitted from the JSON entirely when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
model: Option<String>,
}
/// Extracts the call id from a `Location` header value.
///
/// The call id is the last non-empty path segment of `location`; trailing
/// slashes are ignored. Any query string (`?...`) or fragment (`#...`) is
/// discarded first, so a value such as `/v1/realtime/calls/rtc_123?x=y`
/// yields `rtc_123` rather than `rtc_123?x=y`.
///
/// # Errors
///
/// Returns `ApiError::Stream` when `location` is `None` or when no non-empty
/// path segment can be found.
fn call_id_from_location(location: Option<&str>) -> Result<String, ApiError> {
    let Some(location) = location else {
        return Err(ApiError::Stream(
            "realtime call response missing Location header".to_string(),
        ));
    };

    // A Location value is a URI reference and may carry a query or fragment;
    // only the path part can contain the call id.
    let path = location.split(['?', '#']).next().unwrap_or(location);

    path.trim_end_matches('/')
        .rsplit('/')
        .next()
        .filter(|call_id| !call_id.is_empty())
        .map(str::to_owned)
        .ok_or_else(|| ApiError::Stream("invalid realtime call Location header".to_string()))
}
// Unit tests for the realtime calls helpers in this module.
#[cfg(test)]
mod tests {
use super::*;
use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode;
use pretty_assertions::assert_eq;
// A `wss` base URL is rewritten to `https` and its path replaced with the
// calls path.
#[test]
fn realtime_calls_url_uses_calls_path() {
assert_eq!(
realtime_calls_url("wss://api.openai.com/v1/realtime")
.expect("url")
.as_str(),
"https://api.openai.com/v1/realtime/calls"
);
}
// The call id is the final path segment of a relative Location value.
#[test]
fn call_id_from_location_extracts_last_path_segment() {
assert_eq!(
call_id_from_location(Some("/v1/realtime/calls/rtc_123")).expect("call id"),
"rtc_123"
);
}
// NOTE(review): despite its name, this test only checks that the form was
// built and has a non-empty multipart boundary; it does not inspect the
// `sdp` or `session` parts.
#[test]
fn session_form_contains_offer_and_session() {
let form = session_form(
&RealtimeSessionConfig {
instructions: "backend prompt".to_string(),
model: Some("gpt-realtime".to_string()),
session_id: Some("session".to_string()),
event_parser: RealtimeEventParser::RealtimeV2,
session_mode: RealtimeSessionMode::Conversational,
},
"v=0".to_string(),
)
.expect("form");
assert!(!form.boundary().is_empty());
}
// The serialized payload exposes the flattened session fields (`type`,
// `instructions`) next to `model` at the top level.
#[test]
fn session_payload_includes_model() {
let payload = serde_json::to_value(session_payload(&RealtimeSessionConfig {
instructions: "backend prompt".to_string(),
model: Some("gpt-realtime".to_string()),
session_id: Some("session".to_string()),
event_parser: RealtimeEventParser::RealtimeV2,
session_mode: RealtimeSessionMode::Conversational,
}))
.expect("session payload");
assert_eq!(payload["type"], "realtime");
assert_eq!(payload["model"], "gpt-realtime");
assert_eq!(payload["instructions"], "backend prompt");
}
}

View File

@@ -463,7 +463,34 @@ impl RealtimeWebsocketClient {
config.event_parser,
config.session_mode,
)?;
self.connect_with_url(ws_url, config, extra_headers, default_headers)
.await
}
/// Opens a realtime websocket attached to an existing call.
///
/// Calls `ensure_rustls_crypto_provider()`, builds the websocket URL from
/// the provider's base URL and query parameters with `call_id` as the
/// session selector, then connects through `connect_with_url`.
///
/// # Errors
///
/// Returns an `ApiError` if the URL cannot be built or the connection fails.
pub async fn connect_to_call_id(
    &self,
    config: RealtimeSessionConfig,
    call_id: &str,
    extra_headers: HeaderMap,
    default_headers: HeaderMap,
) -> Result<RealtimeWebsocketConnection, ApiError> {
    ensure_rustls_crypto_provider();

    let provider = &self.provider;
    let call_url = websocket_url_from_api_url_with_call_id(
        provider.base_url.as_str(),
        provider.query_params.as_ref(),
        call_id,
    )?;

    let connection = self
        .connect_with_url(call_url, config, extra_headers, default_headers)
        .await;
    connection
}
async fn connect_with_url(
&self,
ws_url: Url,
config: RealtimeSessionConfig,
extra_headers: HeaderMap,
default_headers: HeaderMap,
) -> Result<RealtimeWebsocketConnection, ApiError> {
let mut request = ws_url
.as_str()
.into_client_request()
@@ -509,7 +536,7 @@ impl RealtimeWebsocketClient {
}
}
fn merge_request_headers(
pub(super) fn merge_request_headers(
provider_headers: &HeaderMap,
extra_headers: HeaderMap,
default_headers: HeaderMap,
@@ -596,6 +623,46 @@ fn websocket_url_from_api_url(
Ok(url)
}
/// Builds the websocket URL used to attach to an existing realtime call.
///
/// The path is normalized with `normalize_realtime_path`, `http`/`https`
/// schemes are mapped to `ws`/`wss`, and any query already present on
/// `api_url` is discarded. The resulting query starts with `call_id`,
/// followed by every entry of `query_params` except the keys `call_id`,
/// `intent` and `model`.
///
/// # Errors
///
/// Returns `ApiError::Stream` if `api_url` does not parse or uses a scheme
/// other than `ws`, `wss`, `http` or `https`.
fn websocket_url_from_api_url_with_call_id(
    api_url: &str,
    query_params: Option<&HashMap<String, String>>,
    call_id: &str,
) -> Result<Url, ApiError> {
    let mut url = match Url::parse(api_url) {
        Ok(parsed) => parsed,
        Err(err) => {
            return Err(ApiError::Stream(format!(
                "failed to parse realtime api_url: {err}"
            )));
        }
    };
    normalize_realtime_path(&mut url);

    // `None` means the scheme is already a websocket one and needs no rewrite.
    let ws_scheme = match url.scheme() {
        "ws" | "wss" => None,
        "http" => Some("ws"),
        "https" => Some("wss"),
        scheme => {
            return Err(ApiError::Stream(format!(
                "unsupported realtime api_url scheme: {scheme}"
            )));
        }
    };
    if let Some(ws_scheme) = ws_scheme {
        let _ = url.set_scheme(ws_scheme);
    }

    url.set_query(None);
    {
        let mut pairs = url.query_pairs_mut();
        pairs.append_pair("call_id", call_id);

        // Forward provider-level params, minus the keys skipped below.
        let forwarded = query_params
            .into_iter()
            .flatten()
            .filter(|(key, _)| !matches!(key.as_str(), "call_id" | "intent" | "model"));
        for (key, value) in forwarded {
            pairs.append_pair(key, value);
        }
    }
    Ok(url)
}
fn normalize_realtime_path(url: &mut Url) {
let path = url.path().to_string();
if path.is_empty() || path == "/" {
@@ -1094,6 +1161,25 @@ mod tests {
assert_eq!(url.as_str(), "wss://example.com/v1/realtime");
}
// The query on the input URL (`model=old&foo=bar`) is dropped entirely, the
// provider-level `intent` and `model` params are filtered out, and only
// `call_id` plus the unrelated `trace` param end up on the websocket URL.
#[test]
fn websocket_url_with_call_id_uses_call_id_only_for_session_selectors() {
let url = websocket_url_from_api_url_with_call_id(
"https://example.com/v1/realtime?model=old&foo=bar",
Some(&HashMap::from([
("trace".to_string(), "1".to_string()),
("intent".to_string(), "ignored".to_string()),
("model".to_string(), "ignored".to_string()),
])),
"rtc_123",
)
.expect("build ws url");
assert_eq!(
url.as_str(),
"wss://example.com/v1/realtime?call_id=rtc_123&trace=1"
);
}
#[tokio::test]
async fn e2e_connect_and_exchange_events_against_mock_ws_server() {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");

View File

@@ -1,3 +1,4 @@
mod calls;
pub mod methods;
mod methods_common;
mod methods_v1;
@@ -7,6 +8,8 @@ mod protocol_common;
mod protocol_v1;
mod protocol_v2;
pub use calls::RealtimeCallClient;
pub use calls::RealtimeCallCreateResponse;
pub use codex_protocol::protocol::RealtimeAudioFrame;
pub use codex_protocol::protocol::RealtimeEvent;
pub use methods::RealtimeWebsocketClient;

View File

@@ -31,6 +31,8 @@ pub use crate::common::response_create_client_metadata;
pub use crate::endpoint::compact::CompactClient;
pub use crate::endpoint::memories::MemoriesClient;
pub use crate::endpoint::models::ModelsClient;
pub use crate::endpoint::realtime_websocket::RealtimeCallClient;
pub use crate::endpoint::realtime_websocket::RealtimeCallCreateResponse;
pub use crate::endpoint::realtime_websocket::RealtimeEventParser;
pub use crate::endpoint::realtime_websocket::RealtimeSessionConfig;
pub use crate::endpoint::realtime_websocket::RealtimeSessionMode;

View File

@@ -437,6 +437,9 @@
"realtime_conversation": {
"type": "boolean"
},
"realtime_rtc": {
"type": "boolean"
},
"remote_models": {
"type": "boolean"
},
@@ -2144,6 +2147,9 @@
"realtime_conversation": {
"type": "boolean"
},
"realtime_rtc": {
"type": "boolean"
},
"remote_models": {
"type": "boolean"
},

View File

@@ -27,10 +27,12 @@ use crate::exec_policy::ExecPolicyManager;
use crate::parse_turn_item;
use crate::path_utils::normalize_for_native_workdir;
use crate::realtime_conversation::RealtimeConversationManager;
use crate::realtime_conversation::RealtimeConversationStartResult;
use crate::realtime_conversation::handle_audio as handle_realtime_conversation_audio;
use crate::realtime_conversation::handle_close as handle_realtime_conversation_close;
use crate::realtime_conversation::handle_start as handle_realtime_conversation_start;
use crate::realtime_conversation::handle_text as handle_realtime_conversation_text;
use crate::realtime_conversation::send_start_error as send_realtime_conversation_start_error;
use crate::realtime_conversation::start_and_send_events as start_realtime_conversation_and_send_events;
use crate::render_skills_section;
use crate::rollout::session_index;
use crate::session_prefix::format_subagent_notification_message;
@@ -346,6 +348,7 @@ use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::BackgroundEventEvent;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::ConversationStartParams;
use codex_protocol::protocol::DeprecationNoticeEvent;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::Event;
@@ -734,6 +737,36 @@ impl Codex {
Ok(())
}
/// Submits a realtime conversation start and waits for its outcome.
///
/// A one-shot waiter is registered under `sub_id` before the
/// `Op::RealtimeConversationStart` submission is sent; the submission loop
/// removes the waiter for that id and sends the start result through it.
///
/// # Errors
///
/// Returns the submission error if the op cannot be submitted (the waiter is
/// removed again first), the start error reported through the waiter, or
/// `CodexErr::InternalAgentDied` if the waiter is dropped without a result.
pub async fn start_realtime_conversation(
    &self,
    sub_id: String,
    params: ConversationStartParams,
    trace: Option<W3cTraceContext>,
) -> CodexResult<RealtimeConversationStartResult> {
    let (result_tx, result_rx) = oneshot::channel();

    // Register first so the waiter is already in place when the submission
    // is processed; the lock is released at the end of this scope.
    {
        let mut waiters = self.session.realtime_start_waiters.lock().await;
        waiters.insert(sub_id.clone(), result_tx);
    }

    let submission = Submission {
        id: sub_id.clone(),
        op: Op::RealtimeConversationStart(params),
        trace,
    };
    if let Err(err) = self.submit_with_id(submission).await {
        // Nothing will ever answer this waiter; remove it before bailing out.
        let mut waiters = self.session.realtime_start_waiters.lock().await;
        waiters.remove(&sub_id);
        return Err(err);
    }

    match result_rx.await {
        Ok(outcome) => outcome,
        Err(_) => Err(CodexErr::InternalAgentDied),
    }
}
pub async fn shutdown_and_wait(&self) -> CodexResult<()> {
let session_loop_termination = self.session_loop_termination.clone();
match self.submit(Op::Shutdown).await {
@@ -823,6 +856,8 @@ pub(crate) struct Session {
features: ManagedFeatures,
pending_mcp_server_refresh_config: Mutex<Option<McpServerRefreshConfig>>,
pub(crate) conversation: Arc<RealtimeConversationManager>,
realtime_start_waiters:
Mutex<HashMap<String, oneshot::Sender<CodexResult<RealtimeConversationStartResult>>>>,
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
mailbox: Mailbox,
mailbox_rx: Mutex<MailboxReceiver>,
@@ -1978,6 +2013,7 @@ impl Session {
features: config.features.clone(),
pending_mcp_server_refresh_config: Mutex::new(None),
conversation: Arc::new(RealtimeConversationManager::new()),
realtime_start_waiters: Mutex::new(HashMap::new()),
active_turn: Mutex::new(None),
mailbox,
mailbox_rx: Mutex::new(mailbox_rx),
@@ -4512,17 +4548,18 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
false
}
Op::RealtimeConversationStart(params) => {
if let Err(err) =
handle_realtime_conversation_start(&sess, sub.id.clone(), params).await
{
sess.send_event_raw(Event {
id: sub.id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: err.to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
}),
})
.await;
let result =
start_realtime_conversation_and_send_events(&sess, &sub.id, params).await;
match sess.realtime_start_waiters.lock().await.remove(&sub.id) {
Some(tx_result) => {
let _ = tx_result.send(result);
}
None => {
if let Err(err) = result {
send_realtime_conversation_start_error(&sess, sub.id.clone(), err)
.await;
}
}
}
false
}

View File

@@ -3,6 +3,7 @@ use crate::codex::Codex;
use crate::codex::SteerInputError;
use crate::config::ConstraintResult;
use crate::file_watcher::WatchRegistration;
use crate::realtime_conversation::RealtimeConversationStartResult;
use codex_features::Feature;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::config_types::Personality;
@@ -14,6 +15,7 @@ use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::ConversationStartParams;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SandboxPolicy;
@@ -92,6 +94,17 @@ impl CodexThread {
self.codex.submit_with_trace(op, trace).await
}
/// Starts a realtime conversation on this thread and returns the start result.
///
/// This is a thin forwarding wrapper: `sub_id`, `params`, and the optional
/// `trace` context are passed unchanged to `self.codex`, and its awaited
/// result is returned as-is (including any error).
pub async fn start_realtime_conversation(
&self,
sub_id: String,
params: ConversationStartParams,
trace: Option<W3cTraceContext>,
) -> CodexResult<RealtimeConversationStartResult> {
self.codex
.start_realtime_conversation(sub_id, params, trace)
.await
}
pub async fn steer_input(
&self,
input: Vec<UserInput>,

View File

@@ -14,6 +14,7 @@ pub mod codex;
mod realtime_context;
mod realtime_conversation;
pub use codex::SteerInputError;
pub use realtime_conversation::RealtimeConversationStartResult;
mod codex_thread;
mod compact_remote;
pub use codex_thread::CodexThread;

View File

@@ -9,6 +9,7 @@ use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use codex_api::Provider as ApiProvider;
use codex_api::RealtimeAudioFrame;
use codex_api::RealtimeCallClient;
use codex_api::RealtimeEvent;
use codex_api::RealtimeEventParser;
use codex_api::RealtimeSessionConfig;
@@ -34,6 +35,7 @@ use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RealtimeConversationClosedEvent;
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
use codex_protocol::protocol::RealtimeConversationStartedEvent;
use codex_protocol::protocol::RealtimeConversationTransport;
use codex_protocol::protocol::RealtimeHandoffRequested;
use http::HeaderMap;
use http::HeaderValue;
@@ -117,6 +119,10 @@ struct RealtimeInputTask {
session_kind: RealtimeSessionKind,
}
/// Outcome of a successfully started realtime conversation.
///
/// Derives the common traits so the value can be logged, compared in tests,
/// and cloned by callers without hand-written impls.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RealtimeConversationStartResult {
    /// SDP answer produced while starting the conversation, when there is one.
    /// `None` means the start produced no SDP answer.
    pub answer_sdp: Option<String>,
}
impl RealtimeHandoffState {
fn new(output_tx: Sender<HandoffOutput>, session_kind: RealtimeSessionKind) -> Self {
Self {
@@ -159,6 +165,7 @@ impl RealtimeConversationManager {
api_provider: ApiProvider,
extra_headers: Option<HeaderMap>,
session_config: RealtimeSessionConfig,
sideband_call_id: Option<String>,
) -> CodexResult<(Receiver<RealtimeEvent>, Arc<AtomicBool>)> {
let previous_state = {
let mut guard = self.state.lock().await;
@@ -173,14 +180,20 @@ impl RealtimeConversationManager {
};
let client = RealtimeWebsocketClient::new(api_provider);
let connection = client
.connect(
session_config,
extra_headers.unwrap_or_default(),
default_headers(),
)
.await
.map_err(map_api_error)?;
let headers = extra_headers.unwrap_or_default();
let connection = match sideband_call_id {
Some(call_id) => {
client
.connect_to_call_id(session_config, &call_id, headers, default_headers())
.await
}
None => {
client
.connect(session_config, headers, default_headers())
.await
}
}
.map_err(map_api_error)?;
let writer = connection.writer();
let events = connection.events();
@@ -406,39 +419,32 @@ async fn stop_conversation_state(
}
}
pub(crate) async fn handle_start(
pub(crate) async fn send_start_error(sess: &Arc<Session>, sub_id: String, err: CodexErr) {
error!("failed to start realtime conversation: {err}");
let message = err.to_string();
sess.send_event_raw(Event {
id: sub_id,
msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::Error(message),
}),
})
.await;
}
pub(crate) async fn start_and_send_events(
sess: &Arc<Session>,
sub_id: String,
sub_id: &str,
params: ConversationStartParams,
) -> CodexResult<()> {
) -> CodexResult<RealtimeConversationStartResult> {
let prepared_start = match prepare_realtime_start(sess, params).await {
Ok(prepared_start) => prepared_start,
Err(err) => {
error!("failed to prepare realtime conversation: {err}");
let message = err.to_string();
sess.send_event_raw(Event {
id: sub_id,
msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::Error(message),
}),
})
.await;
return Ok(());
return Err(err);
}
};
if let Err(err) = handle_start_inner(sess, &sub_id, prepared_start).await {
error!("failed to start realtime conversation: {err}");
let message = err.to_string();
sess.send_event_raw(Event {
id: sub_id.clone(),
msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::Error(message),
}),
})
.await;
}
Ok(())
handle_start_inner(sess, sub_id, prepared_start).await
}
struct PreparedRealtimeConversationStart {
@@ -447,6 +453,7 @@ struct PreparedRealtimeConversationStart {
requested_session_id: Option<String>,
version: RealtimeWsVersion,
session_config: RealtimeSessionConfig,
transport: RealtimeConversationTransport,
}
async fn prepare_realtime_start(
@@ -484,7 +491,11 @@ async fn prepare_realtime_start(
format!("{prompt}\n\n{startup_context}")
};
let model = config.experimental_realtime_ws_model.clone();
let version = config.realtime.version;
let transport = params.transport;
let version = match transport {
RealtimeConversationTransport::Rtc { .. } => RealtimeWsVersion::V2,
RealtimeConversationTransport::Websocket => config.realtime.version,
};
let event_parser = match version {
RealtimeWsVersion::V1 => RealtimeEventParser::V1,
RealtimeWsVersion::V2 => RealtimeEventParser::RealtimeV2,
@@ -509,6 +520,7 @@ async fn prepare_realtime_start(
requested_session_id,
version,
session_config,
transport,
})
}
@@ -516,18 +528,39 @@ async fn handle_start_inner(
sess: &Arc<Session>,
sub_id: &str,
prepared_start: PreparedRealtimeConversationStart,
) -> CodexResult<()> {
) -> CodexResult<RealtimeConversationStartResult> {
let PreparedRealtimeConversationStart {
api_provider,
extra_headers,
requested_session_id,
version,
session_config,
transport,
} = prepared_start;
let (answer_sdp, sideband_call_id) = match transport {
RealtimeConversationTransport::Websocket => (None, None),
RealtimeConversationTransport::Rtc { offer_sdp } => {
let call = RealtimeCallClient::new(api_provider.clone())
.create(
&session_config,
offer_sdp,
extra_headers.clone().unwrap_or_default(),
default_headers(),
)
.await
.map_err(map_api_error)?;
(Some(call.answer_sdp), Some(call.call_id))
}
};
info!("starting realtime conversation");
let (events_rx, realtime_active) = sess
.conversation
.start(api_provider, extra_headers, session_config)
.start(
api_provider,
extra_headers,
session_config,
sideband_call_id,
)
.await?;
info!("realtime conversation started");
@@ -601,7 +634,7 @@ async fn handle_start_inner(
.register_fanout_task(&realtime_active, fanout_task)
.await;
Ok(())
Ok(RealtimeConversationStartResult { answer_sdp })
}
pub(crate) async fn handle_audio(

View File

@@ -118,6 +118,7 @@ async fn start_realtime_conversation(codex: &codex_core::CodexThread) -> Result<
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;

View File

@@ -182,6 +182,7 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -324,6 +325,7 @@ async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() ->
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -383,6 +385,7 @@ async fn conversation_transport_close_emits_closed_event() -> Result<()> {
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -466,6 +469,7 @@ async fn conversation_start_preflight_failure_emits_realtime_error_only() -> Res
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -506,6 +510,7 @@ async fn conversation_start_connect_failure_emits_realtime_error_only() -> Resul
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -594,6 +599,7 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> {
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "old".to_string(),
session_id: Some("conv_old".to_string()),
transport: Default::default(),
}))
.await?;
wait_for_event_match(&test.codex, |msg| match msg {
@@ -610,6 +616,7 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> {
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "new".to_string(),
session_id: Some("conv_new".to_string()),
transport: Default::default(),
}))
.await?;
wait_for_event_match(&test.codex, |msg| match msg {
@@ -696,6 +703,7 @@ async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Resul
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -750,6 +758,7 @@ async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() ->
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "prompt from op".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -812,6 +821,7 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "prompt from op".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -872,6 +882,7 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "prompt from op".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -925,6 +936,7 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -978,6 +990,7 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -1029,6 +1042,7 @@ async fn conversation_startup_context_is_truncated_and_sent_once_per_start() ->
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -1113,6 +1127,7 @@ async fn conversation_mirrors_assistant_message_text_to_realtime_handoff() -> Re
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -1239,6 +1254,7 @@ async fn conversation_handoff_persists_across_item_done_until_turn_complete() ->
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -1380,6 +1396,7 @@ async fn inbound_handoff_request_starts_turn() -> Result<()> {
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -1474,6 +1491,7 @@ async fn inbound_handoff_request_uses_active_transcript() -> Result<()> {
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -1566,6 +1584,7 @@ async fn inbound_handoff_request_clears_active_transcript_after_each_handoff() -
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -1665,6 +1684,7 @@ async fn inbound_conversation_item_does_not_start_turn_and_still_forwards_audio(
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -1777,6 +1797,7 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -1919,6 +1940,7 @@ async fn inbound_handoff_request_does_not_block_realtime_event_forwarding() -> R
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
@@ -2045,6 +2067,7 @@ async fn inbound_handoff_request_steers_active_turn() -> Result<()> {
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;
let _ = wait_for_event_match(&test.codex, |msg| match msg {
@@ -2186,6 +2209,7 @@ async fn inbound_handoff_request_starts_turn_and_does_not_block_realtime_audio()
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
transport: Default::default(),
}))
.await?;

View File

@@ -176,6 +176,8 @@ pub enum Feature {
FastMode,
/// Enable experimental realtime voice conversation mode in the TUI.
RealtimeConversation,
/// Switch realtime conversation setup to the WebRTC sideband protocol.
RealtimeRtc,
/// Removed compatibility flag. The TUI now always uses the app-server implementation.
TuiAppServer,
/// Prevent idle system sleep while a turn is actively running.
@@ -825,6 +827,12 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::RealtimeRtc,
key: "realtime_rtc",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::TuiAppServer,
key: "tui_app_server",

View File

@@ -135,6 +135,18 @@ pub struct ConversationStartParams {
pub prompt: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
#[serde(default)]
pub transport: RealtimeConversationTransport,
}
// Transport requested for a realtime conversation start.
//
// Serialized as an internally tagged value: the variant is carried in a
// `type` field and variant names are written in snake_case
// (`websocket`, `rtc`).
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RealtimeConversationTransport {
// Default variant; carries no additional data.
#[default]
Websocket,
// RTC variant; carries the SDP offer text supplied with the start request.
Rtc {
offer_sdp: String,
},
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
@@ -4395,6 +4407,7 @@ mod tests {
let start = Op::RealtimeConversationStart(ConversationStartParams {
prompt: "be helpful".to_string(),
session_id: Some("conv_1".to_string()),
transport: Default::default(),
});
let text = Op::RealtimeConversationText(ConversationTextParams {
text: "hello".to_string(),

View File

@@ -41,6 +41,7 @@ use codex_app_server_protocol::ThreadRealtimeAppendAudioResponse;
use codex_app_server_protocol::ThreadRealtimeAppendTextParams;
use codex_app_server_protocol::ThreadRealtimeAppendTextResponse;
use codex_app_server_protocol::ThreadRealtimeStartParams;
use codex_app_server_protocol::ThreadRealtimeStartProtocol;
use codex_app_server_protocol::ThreadRealtimeStartResponse;
use codex_app_server_protocol::ThreadRealtimeStopParams;
use codex_app_server_protocol::ThreadRealtimeStopResponse;
@@ -658,6 +659,7 @@ impl AppServerSession {
thread_id: thread_id.to_string(),
prompt: params.prompt,
session_id: params.session_id,
protocol: ThreadRealtimeStartProtocol::JsonRpcPcm,
},
})
.await

View File

@@ -229,6 +229,7 @@ impl ChatWidget {
ConversationStartParams {
prompt: REALTIME_CONVERSATION_PROMPT.to_string(),
session_id: None,
transport: Default::default(),
},
));
self.request_redraw();