Support ChatGPT realtime calls auth

Route realtime call auth through CoreAuthProvider and send JSON payloads for non-v1 realtime calls while preserving the v1 multipart request shape.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-04-04 12:28:59 -07:00
parent 0a623f5bb4
commit d4a29146d2
5 changed files with 168 additions and 67 deletions

View File

@@ -1,7 +1,10 @@
use crate::api_bridge::CoreAuthProvider;
use crate::auth::add_auth_headers_to_header_map;
use crate::endpoint::realtime_websocket::methods_common::conversation_handoff_append_message;
use crate::endpoint::realtime_websocket::methods_common::conversation_item_create_message;
use crate::endpoint::realtime_websocket::methods_common::normalized_session_mode;
use crate::endpoint::realtime_websocket::methods_common::session_update_session;
use crate::endpoint::realtime_websocket::methods_common::webrtc_intent;
use crate::endpoint::realtime_websocket::protocol::RealtimeAudioFrame;
use crate::endpoint::realtime_websocket::protocol::RealtimeEvent;
use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser;
@@ -25,6 +28,9 @@ use opus::Application;
use opus::Channels;
use opus::Decoder as OpusDecoder;
use opus::Encoder as OpusEncoder;
use reqwest::multipart::Form;
use reqwest::multipart::Part;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
@@ -362,12 +368,13 @@ fn append_transcript_delta(entries: &mut Vec<RealtimeTranscriptEntry>, role: &st
}
pub struct RealtimeWebRtcClient {
auth: CoreAuthProvider,
provider: Provider,
}
impl RealtimeWebRtcClient {
pub fn new(provider: Provider) -> Self {
Self { provider }
pub fn new(provider: Provider, auth: CoreAuthProvider) -> Self {
Self { auth, provider }
}
pub async fn connect(
@@ -380,9 +387,12 @@ impl RealtimeWebRtcClient {
let calls_url = calls_url_from_api_url(
self.provider.base_url.as_str(),
self.provider.query_params.as_ref(),
config.model.as_deref(),
config.event_parser,
)?;
let headers = merge_request_headers(
&self.auth,
&self.provider.headers,
with_session_id_header(extra_headers, config.session_id.as_deref())?,
default_headers,
@@ -412,12 +422,14 @@ impl RealtimeWebRtcClient {
}
fn merge_request_headers(
auth: &CoreAuthProvider,
provider_headers: &HeaderMap,
extra_headers: HeaderMap,
default_headers: HeaderMap,
) -> HeaderMap {
let mut headers = provider_headers.clone();
headers.extend(extra_headers);
add_auth_headers_to_header_map(auth, &mut headers);
for (name, value) in &default_headers {
if let http::header::Entry::Vacant(entry) = headers.entry(name) {
entry.insert(value.clone());
@@ -445,6 +457,8 @@ fn with_session_id_header(
fn calls_url_from_api_url(
api_url: &str,
query_params: Option<&HashMap<String, String>>,
model: Option<&str>,
event_parser: RealtimeEventParser,
) -> Result<Url, ApiError> {
let mut url = Url::parse(api_url)
.map_err(|err| ApiError::Stream(format!("failed to parse realtime api_url: {err}")))?;
@@ -466,10 +480,27 @@ fn calls_url_from_api_url(
}
}
if let Some(query_params) = query_params {
let intent = webrtc_intent(event_parser);
let has_extra_query_params = query_params.is_some_and(|query_params| {
query_params
.iter()
.any(|(key, _)| key != "intent" && !(key == "model" && model.is_some()))
});
if intent.is_some() || model.is_some() || has_extra_query_params {
let mut query = url.query_pairs_mut();
for (key, value) in query_params {
query.append_pair(key, value);
if let Some(intent) = intent {
query.append_pair("intent", intent);
}
if let Some(model) = model {
query.append_pair("model", model);
}
if let Some(query_params) = query_params {
for (key, value) in query_params {
if key == "intent" || (key == "model" && model.is_some()) {
continue;
}
query.append_pair(key, value);
}
}
}
@@ -509,7 +540,10 @@ fn normalize_realtime_calls_path(url: &mut Url) {
if path.ends_with("/v1/") {
url.set_path(&format!("{path}realtime/calls"));
return;
}
url.set_path(&format!("{}/realtime/calls", path.trim_end_matches('/')));
}
async fn connect_webrtc_transport(
@@ -814,18 +848,44 @@ async fn fetch_realtime_answer(
{
session_json.insert("model".to_string(), serde_json::Value::String(model));
}
let session_payload = serde_json::to_string(&session_json)
.map_err(|err| ApiError::Stream(format!("failed to encode realtime session: {err}")))?;
let form = reqwest::multipart::Form::new()
.text("sdp", offer_sdp)
.text("session", session_payload);
let response = client
.post(calls_url)
.headers(headers)
.multipart(form)
.send()
.await
.map_err(|err| ApiError::Stream(format!("failed to create realtime WebRTC call: {err}")))?;
let response = if calls_url.path().ends_with("/v1/realtime/calls") {
let session_payload = serde_json::to_string(&session_json)
.map_err(|err| ApiError::Stream(format!("failed to encode realtime session: {err}")))?;
let form = Form::new()
.part(
"sdp",
Part::text(offer_sdp)
.mime_str("application/sdp")
.map_err(|err| {
ApiError::Stream(format!("failed to encode realtime SDP part: {err}"))
})?,
)
.part(
"session",
Part::text(session_payload)
.mime_str("application/json")
.map_err(|err| {
ApiError::Stream(format!("failed to encode realtime session part: {err}"))
})?,
);
client
.post(calls_url)
.headers(headers)
.multipart(form)
.send()
.await
} else {
client
.post(calls_url)
.headers(headers)
.json(&json!({
"sdp": offer_sdp,
"session": session_json,
}))
.send()
.await
}
.map_err(|err| ApiError::Stream(format!("failed to create realtime WebRTC call: {err}")))?;
let status = response.status();
let body = response.text().await.map_err(|err| {
ApiError::Stream(format!("failed to read realtime WebRTC answer SDP: {err}"))
@@ -893,8 +953,13 @@ mod tests {
#[test]
fn calls_url_from_api_url_normalizes_http_root() {
let query_params = HashMap::from([("model".to_string(), "gpt-realtime".to_string())]);
let calls_url =
calls_url_from_api_url("http://example.com", Some(&query_params)).expect("calls url");
let calls_url = calls_url_from_api_url(
"http://example.com",
Some(&query_params),
Some("gpt-realtime"),
RealtimeEventParser::RealtimeV2,
)
.expect("calls url");
assert_eq!(
calls_url.as_str(),
@@ -905,9 +970,13 @@ mod tests {
#[test]
fn calls_url_from_api_url_preserves_v1_realtime_path_and_query() {
let query_params = HashMap::from([("model".to_string(), "gpt-realtime".to_string())]);
let calls_url =
calls_url_from_api_url("wss://example.com/v1/realtime?foo=bar", Some(&query_params))
.expect("calls url");
let calls_url = calls_url_from_api_url(
"wss://example.com/v1/realtime?foo=bar",
Some(&query_params),
Some("gpt-realtime"),
RealtimeEventParser::RealtimeV2,
)
.expect("calls url");
assert_eq!(
calls_url.as_str(),
@@ -915,6 +984,54 @@ mod tests {
);
}
#[test]
fn calls_url_from_api_url_appends_quicksilver_intent_for_v1() {
let calls_url = calls_url_from_api_url(
"wss://example.com/v1/realtime",
None,
Some("quicksilver-test-model"),
RealtimeEventParser::V1,
)
.expect("calls url");
assert_eq!(
calls_url.as_str(),
"https://example.com/v1/realtime/calls?intent=quicksilver&model=quicksilver-test-model"
);
}
#[test]
fn calls_url_from_api_url_omits_intent_for_v2() {
let calls_url = calls_url_from_api_url(
"wss://example.com/v1/realtime",
None,
Some("gpt-realtime"),
RealtimeEventParser::RealtimeV2,
)
.expect("calls url");
assert_eq!(
calls_url.as_str(),
"https://example.com/v1/realtime/calls?model=gpt-realtime"
);
}
#[test]
fn calls_url_from_api_url_appends_calls_path_to_chatgpt_base_url() {
let calls_url = calls_url_from_api_url(
"https://chatgpt.com/backend-api/codex",
None,
Some("gpt-realtime"),
RealtimeEventParser::RealtimeV2,
)
.expect("calls url");
assert_eq!(
calls_url.as_str(),
"https://chatgpt.com/backend-api/codex/realtime/calls?model=gpt-realtime"
);
}
#[test]
fn parse_session_updated_event() {
let payload = json!({

View File

@@ -1,9 +1,11 @@
use crate::endpoint::realtime_websocket::methods_v1::conversation_handoff_append_message as v1_conversation_handoff_append_message;
use crate::endpoint::realtime_websocket::methods_v1::conversation_item_create_message as v1_conversation_item_create_message;
use crate::endpoint::realtime_websocket::methods_v1::session_update_session as v1_session_update_session;
use crate::endpoint::realtime_websocket::methods_v1::webrtc_intent as v1_webrtc_intent;
use crate::endpoint::realtime_websocket::methods_v2::conversation_handoff_append_message as v2_conversation_handoff_append_message;
use crate::endpoint::realtime_websocket::methods_v2::conversation_item_create_message as v2_conversation_item_create_message;
use crate::endpoint::realtime_websocket::methods_v2::session_update_session as v2_session_update_session;
use crate::endpoint::realtime_websocket::methods_v2::webrtc_intent as v2_webrtc_intent;
use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser;
use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode;
@@ -57,3 +59,10 @@ pub(super) fn session_update_session(
RealtimeEventParser::RealtimeV2 => v2_session_update_session(instructions, session_mode),
}
}
pub(super) fn webrtc_intent(event_parser: RealtimeEventParser) -> Option<&'static str> {
match event_parser {
RealtimeEventParser::V1 => v1_webrtc_intent(),
RealtimeEventParser::RealtimeV2 => v2_webrtc_intent(),
}
}

View File

@@ -61,3 +61,7 @@ pub(super) fn session_update_session(instructions: String) -> SessionUpdateSessi
tool_choice: None,
}
}
pub(super) fn webrtc_intent() -> Option<&'static str> {
Some("quicksilver")
}

View File

@@ -126,3 +126,7 @@ pub(super) fn session_update_session(
},
}
}
pub(super) fn webrtc_intent() -> Option<&'static str> {
None
}

View File

@@ -14,14 +14,13 @@ use codex_api::RealtimeEventParser;
use codex_api::RealtimeSessionConfig;
use codex_api::RealtimeSessionMode;
use codex_api::RealtimeWebRtcClient;
use codex_api::api_bridge::CoreAuthProvider;
use codex_api::api_bridge::map_api_error;
use codex_api::endpoint::realtime_websocket::RealtimeWebRtcEvents;
use codex_api::endpoint::realtime_websocket::RealtimeWebRtcWriter;
use codex_app_server_protocol::AuthMode;
use codex_login::CodexAuth;
use codex_login::api_bridge::auth_provider_from_auth;
use codex_login::default_client::default_headers;
use codex_login::read_openai_api_key_from_env;
use codex_model_provider_info::ModelProviderInfo;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::protocol::CodexErrorInfo;
@@ -37,7 +36,6 @@ use codex_protocol::protocol::RealtimeConversationStartedEvent;
use codex_protocol::protocol::RealtimeHandoffRequested;
use http::HeaderMap;
use http::HeaderValue;
use http::header::AUTHORIZATION;
use serde_json::Value;
use serde_json::json;
use std::sync::Arc;
@@ -157,6 +155,7 @@ impl RealtimeConversationManager {
pub(crate) async fn start(
&self,
api_provider: ApiProvider,
api_auth: CoreAuthProvider,
extra_headers: Option<HeaderMap>,
session_config: RealtimeSessionConfig,
) -> CodexResult<(Receiver<RealtimeEvent>, Arc<AtomicBool>)> {
@@ -172,7 +171,7 @@ impl RealtimeConversationManager {
RealtimeEventParser::RealtimeV2 => RealtimeSessionKind::V2,
};
let client = RealtimeWebRtcClient::new(api_provider);
let client = RealtimeWebRtcClient::new(api_provider, api_auth);
let connection = client
.connect(
session_config,
@@ -444,6 +443,7 @@ pub(crate) async fn handle_start(
struct PreparedRealtimeConversationStart {
api_provider: ApiProvider,
api_auth: CoreAuthProvider,
extra_headers: Option<HeaderMap>,
requested_session_id: Option<String>,
version: RealtimeWsVersion,
@@ -461,8 +461,8 @@ async fn prepare_realtime_start(
.auth_manager()
.unwrap_or_else(|| Arc::clone(&sess.services.auth_manager));
let auth = auth_manager.auth().await;
let realtime_api_key = realtime_api_key(auth.as_ref(), &provider)?;
let mut api_provider = provider.to_api_provider(Some(AuthMode::ApiKey))?;
let api_auth = auth_provider_from_auth(auth.clone(), &provider)?;
let mut api_provider = provider.to_api_provider(auth.as_ref().map(CodexAuth::auth_mode))?;
let config = sess.get_config().await;
if let Some(realtime_ws_base_url) = &config.experimental_realtime_ws_base_url {
api_provider.base_url = realtime_ws_base_url.clone();
@@ -502,10 +502,10 @@ async fn prepare_realtime_start(
event_parser,
session_mode,
};
let extra_headers =
realtime_request_headers(requested_session_id.as_deref(), realtime_api_key.as_str())?;
let extra_headers = realtime_request_headers(requested_session_id.as_deref())?;
Ok(PreparedRealtimeConversationStart {
api_provider,
api_auth,
extra_headers,
requested_session_id,
version,
@@ -520,6 +520,7 @@ async fn handle_start_inner(
) -> CodexResult<()> {
let PreparedRealtimeConversationStart {
api_provider,
api_auth,
extra_headers,
requested_session_id,
version,
@@ -528,7 +529,7 @@ async fn handle_start_inner(
info!("starting realtime conversation");
let (events_rx, realtime_active) = sess
.conversation
.start(api_provider, extra_headers, session_config)
.start(api_provider, api_auth, extra_headers, session_config)
.await?;
info!("realtime conversation started");
@@ -633,36 +634,7 @@ fn realtime_text_from_handoff_request(handoff: &RealtimeHandoffRequested) -> Opt
.or((!handoff.input_transcript.is_empty()).then_some(handoff.input_transcript.clone()))
}
fn realtime_api_key(auth: Option<&CodexAuth>, provider: &ModelProviderInfo) -> CodexResult<String> {
if let Some(api_key) = provider.api_key()? {
return Ok(api_key);
}
if let Some(token) = provider.experimental_bearer_token.clone() {
return Ok(token);
}
if let Some(api_key) = auth.and_then(CodexAuth::api_key) {
return Ok(api_key.to_string());
}
// TODO(aibrahim): Remove this temporary fallback once realtime auth no longer
// requires API key auth for ChatGPT/SIWC sessions.
if provider.is_openai()
&& let Some(api_key) = read_openai_api_key_from_env()
{
return Ok(api_key);
}
Err(CodexErr::InvalidRequest(
"realtime conversation requires API key auth".to_string(),
))
}
fn realtime_request_headers(
session_id: Option<&str>,
api_key: &str,
) -> CodexResult<Option<HeaderMap>> {
fn realtime_request_headers(session_id: Option<&str>) -> CodexResult<Option<HeaderMap>> {
let mut headers = HeaderMap::new();
if let Some(session_id) = session_id
@@ -671,11 +643,6 @@ fn realtime_request_headers(
headers.insert("x-session-id", session_id);
}
let auth_value = HeaderValue::from_str(&format!("Bearer {api_key}")).map_err(|err| {
CodexErr::InvalidRequest(format!("invalid realtime api key header: {err}"))
})?;
headers.insert(AUTHORIZATION, auth_value);
Ok(Some(headers))
}