Compare commits

...

3 Commits

Author SHA1 Message Date
Kshitijh Meelu
c8b00ac21a Add realtime startup timing logs 2026-05-04 08:49:07 -07:00
Kshitijh Meelu
df4a90740f Test async realtime sideband SDP timing 2026-05-04 08:17:43 -07:00
Kshitijh Meelu
73d6392f4f Make realtime sideband startup async
Move the WebRTC sideband websocket connect out of the start critical path. The call-create request still returns the SDP answer synchronously, while the sideband input task connects in the background and uses the existing input channels to queue text, handoff output, and audio until the websocket is ready.

Add coverage that a delayed sideband accepts queued text after the SDP answer has already been emitted.

Co-authored-by: Codex <noreply@openai.com>
2026-05-01 21:48:35 -07:00
2 changed files with 248 additions and 95 deletions

View File

@@ -52,6 +52,7 @@ use serde_json::json;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Instant;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::debug;
@@ -196,6 +197,12 @@ struct RealtimeInputTask {
event_parser: RealtimeEventParser,
}
struct RealtimeInputChannels {
user_text_rx: Receiver<String>,
handoff_output_rx: Receiver<HandoffOutput>,
audio_rx: Receiver<RealtimeAudioFrame>,
}
impl RealtimeHandoffState {
fn new(output_tx: Sender<HandoffOutput>, session_kind: RealtimeSessionKind) -> Self {
Self {
@@ -212,7 +219,6 @@ struct ConversationState {
audio_tx: Sender<RealtimeAudioFrame>,
user_text_tx: Sender<String>,
session_kind: RealtimeSessionKind,
writer: RealtimeWebsocketWriter,
handoff: RealtimeHandoffState,
input_task: JoinHandle<()>,
fanout_task: Option<JoinHandle<()>>,
@@ -271,6 +277,7 @@ impl RealtimeConversationManager {
}
async fn start_inner(&self, start: RealtimeStart) -> CodexResult<RealtimeStartOutput> {
let startup_started_at = Instant::now();
let RealtimeStart {
api_provider,
extra_headers,
@@ -284,39 +291,6 @@ impl RealtimeConversationManager {
RealtimeEventParser::RealtimeV2 => RealtimeSessionKind::V2,
};
let client = RealtimeWebsocketClient::new(api_provider);
let (connection, sdp) = if let Some(sdp) = sdp {
let call = model_client
.create_realtime_call_with_headers(
sdp,
session_config.clone(),
extra_headers.unwrap_or_default(),
)
.await?;
let connection = client
.connect_webrtc_sideband(
session_config,
&call.call_id,
call.sideband_headers,
default_headers(),
)
.await
.map_err(map_api_error)?;
(connection, Some(call.sdp))
} else {
let connection = client
.connect(
session_config,
extra_headers.unwrap_or_default(),
default_headers(),
)
.await
.map_err(map_api_error)?;
(connection, None)
};
let writer = connection.writer();
let events = connection.events();
let (audio_tx, audio_rx) =
async_channel::bounded::<RealtimeAudioFrame>(AUDIO_IN_QUEUE_CAPACITY);
let (user_text_tx, user_text_rx) =
@@ -328,24 +302,85 @@ impl RealtimeConversationManager {
let realtime_active = Arc::new(AtomicBool::new(true));
let handoff = RealtimeHandoffState::new(handoff_output_tx, session_kind);
let task = spawn_realtime_input_task(RealtimeInputTask {
writer: writer.clone(),
events,
let input_channels = RealtimeInputChannels {
user_text_rx,
handoff_output_rx,
audio_rx,
events_tx,
handoff_state: handoff.clone(),
session_kind,
event_parser,
});
};
let client = RealtimeWebsocketClient::new(api_provider);
let (task, sdp) = if let Some(sdp) = sdp {
info!(transport = "webrtc", "creating realtime call");
let call_started_at = Instant::now();
let call = model_client
.create_realtime_call_with_headers(
sdp,
session_config.clone(),
extra_headers.unwrap_or_default(),
)
.await?;
info!(
transport = "webrtc",
call_id = %call.call_id,
elapsed_ms = call_started_at.elapsed().as_millis() as u64,
total_elapsed_ms = startup_started_at.elapsed().as_millis() as u64,
"realtime call created; sdp answer ready"
);
let task = spawn_webrtc_sideband_input_task(RealtimeWebrtcSidebandInputTask {
client,
session_config,
call_id: call.call_id,
sideband_headers: call.sideband_headers,
input_channels,
events_tx,
handoff_state: handoff.clone(),
session_kind,
event_parser,
realtime_active: Arc::clone(&realtime_active),
startup_started_at,
});
info!(
transport = "webrtc",
total_elapsed_ms = startup_started_at.elapsed().as_millis() as u64,
"spawned realtime sideband connection task"
);
(task, Some(call.sdp))
} else {
info!(transport = "websocket", "connecting realtime websocket");
let connect_started_at = Instant::now();
let connection = client
.connect(
session_config,
extra_headers.unwrap_or_default(),
default_headers(),
)
.await
.map_err(map_api_error)?;
info!(
transport = "websocket",
elapsed_ms = connect_started_at.elapsed().as_millis() as u64,
total_elapsed_ms = startup_started_at.elapsed().as_millis() as u64,
"connected realtime websocket"
);
let task = spawn_realtime_input_task(RealtimeInputTask {
writer: connection.writer(),
events: connection.events(),
user_text_rx: input_channels.user_text_rx,
handoff_output_rx: input_channels.handoff_output_rx,
audio_rx: input_channels.audio_rx,
events_tx,
handoff_state: handoff.clone(),
session_kind,
event_parser,
});
(task, None)
};
let mut guard = self.state.lock().await;
*guard = Some(ConversationState {
audio_tx,
user_text_tx,
session_kind,
writer,
handoff,
input_task: task,
fanout_task: None,
@@ -805,6 +840,7 @@ async fn handle_start_inner(
msg: EventMsg::RealtimeConversationSdp(RealtimeConversationSdpEvent { sdp }),
})
.await;
info!("sent realtime sdp answer to client");
}
let sess_clone = Arc::clone(sess);
@@ -1004,6 +1040,100 @@ pub(crate) async fn handle_close(sess: &Arc<Session>, sub_id: String) {
}
fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
tokio::spawn(run_realtime_input_task(input))
}
struct RealtimeWebrtcSidebandInputTask {
client: RealtimeWebsocketClient,
session_config: RealtimeSessionConfig,
call_id: String,
sideband_headers: HeaderMap,
input_channels: RealtimeInputChannels,
events_tx: Sender<RealtimeEvent>,
handoff_state: RealtimeHandoffState,
session_kind: RealtimeSessionKind,
event_parser: RealtimeEventParser,
realtime_active: Arc<AtomicBool>,
startup_started_at: Instant,
}
fn spawn_webrtc_sideband_input_task(input: RealtimeWebrtcSidebandInputTask) -> JoinHandle<()> {
let RealtimeWebrtcSidebandInputTask {
client,
session_config,
call_id,
sideband_headers,
input_channels,
events_tx,
handoff_state,
session_kind,
event_parser,
realtime_active,
startup_started_at,
} = input;
tokio::spawn(async move {
if !realtime_active.load(Ordering::Relaxed) {
return;
}
info!(%call_id, "connecting realtime sideband websocket");
let sideband_started_at = Instant::now();
let connection = match client
.connect_webrtc_sideband(
session_config,
&call_id,
sideband_headers,
default_headers(),
)
.await
{
Ok(connection) => {
info!(
%call_id,
elapsed_ms = sideband_started_at.elapsed().as_millis() as u64,
total_elapsed_ms = startup_started_at.elapsed().as_millis() as u64,
"connected realtime sideband websocket"
);
connection
}
Err(err) => {
if realtime_active.load(Ordering::Relaxed) {
let mapped_error = map_api_error(err);
warn!(
%call_id,
elapsed_ms = sideband_started_at.elapsed().as_millis() as u64,
total_elapsed_ms = startup_started_at.elapsed().as_millis() as u64,
"failed to connect realtime sideband: {mapped_error}"
);
let _ = events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await;
}
return;
}
};
if !realtime_active.load(Ordering::Relaxed) {
return;
}
run_realtime_input_task(RealtimeInputTask {
writer: connection.writer(),
events: connection.events(),
user_text_rx: input_channels.user_text_rx,
handoff_output_rx: input_channels.handoff_output_rx,
audio_rx: input_channels.audio_rx,
events_tx,
handoff_state,
session_kind,
event_parser,
})
.await;
})
}
async fn run_realtime_input_task(input: RealtimeInputTask) {
let RealtimeInputTask {
writer,
events,
@@ -1016,57 +1146,55 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
event_parser,
} = input;
tokio::spawn(async move {
let mut output_audio_state: Option<OutputAudioState> = None;
let mut response_create_queue = RealtimeResponseCreateQueue::default();
let mut output_audio_state: Option<OutputAudioState> = None;
let mut response_create_queue = RealtimeResponseCreateQueue::default();
loop {
let result = tokio::select! {
// Text typed by the user that should be sent into realtime.
user_text = user_text_rx.recv() => {
handle_user_text_input(
user_text,
&writer,
&events_tx,
)
.await
}
// Background agent progress or final output that should be sent back to realtime.
background_agent_output = handoff_output_rx.recv() => {
handle_handoff_output(
background_agent_output,
&writer,
&events_tx,
&handoff_state,
event_parser,
&mut response_create_queue,
)
.await
}
// Events received from the realtime server.
realtime_event = events.next_event() => {
handle_realtime_server_event(
realtime_event,
&writer,
&events_tx,
&handoff_state,
session_kind,
&mut output_audio_state,
&mut response_create_queue,
)
loop {
let result = tokio::select! {
// Text typed by the user that should be sent into realtime.
user_text = user_text_rx.recv() => {
handle_user_text_input(
user_text,
&writer,
&events_tx,
)
.await
}
// Audio frames captured from the user microphone.
user_audio_frame = audio_rx.recv() => {
handle_user_audio_input(user_audio_frame, &writer, &events_tx)
.await
}
};
if result.is_err() {
break;
}
// Background agent progress or final output that should be sent back to realtime.
background_agent_output = handoff_output_rx.recv() => {
handle_handoff_output(
background_agent_output,
&writer,
&events_tx,
&handoff_state,
event_parser,
&mut response_create_queue,
)
.await
}
// Events received from the realtime server.
realtime_event = events.next_event() => {
handle_realtime_server_event(
realtime_event,
&writer,
&events_tx,
&handoff_state,
session_kind,
&mut output_audio_state,
&mut response_create_queue,
)
.await
}
// Audio frames captured from the user microphone.
user_audio_frame = audio_rx.recv() => {
handle_user_audio_input(user_audio_frame, &writer, &events_tx)
.await
}
};
if result.is_err() {
break;
}
})
}
}
async fn handle_user_text_input(

View File

@@ -48,6 +48,7 @@ use std::process::Command;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::oneshot;
use tokio::time::timeout;
use wiremock::Match;
@@ -456,6 +457,7 @@ async fn conversation_webrtc_start_posts_generated_session() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let sideband_accept_delay = Duration::from_millis(1000);
let capture = RealtimeCallRequestCapture::new();
Mock::given(method("POST"))
.and(path_regex(".*/realtime/calls$"))
@@ -468,12 +470,15 @@ async fn conversation_webrtc_start_posts_generated_session() -> Result<()> {
.mount(&server)
.await;
let realtime_server = start_websocket_server_with_headers(vec![WebSocketConnectionConfig {
requests: vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_webrtc", "instructions": "backend prompt" }
})]],
requests: vec![
vec![json!({
"type": "session.updated",
"session": { "id": "sess_webrtc", "instructions": "backend prompt" }
})],
vec![],
],
response_headers: Vec::new(),
accept_delay: None,
accept_delay: Some(sideband_accept_delay),
close_after_requests: false,
}])
.await;
@@ -488,6 +493,7 @@ async fn conversation_webrtc_start_posts_generated_session() -> Result<()> {
});
let test = builder.build(&server).await?;
let start = Instant::now();
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
output_modality: RealtimeOutputModality::Audio,
@@ -509,7 +515,19 @@ async fn conversation_webrtc_start_posts_generated_session() -> Result<()> {
})
.await
.unwrap_or_else(|err: ErrorEvent| panic!("conversation call create failed: {err:?}"));
let sdp_elapsed = start.elapsed();
assert_eq!(created.sdp, "v=answer\r\n");
assert!(
sdp_elapsed < sideband_accept_delay,
"SDP answer should arrive before sideband accept delay; elapsed={sdp_elapsed:?}, delay={sideband_accept_delay:?}"
);
assert!(realtime_server.handshakes().is_empty());
test.codex
.submit(Op::RealtimeConversationText(ConversationTextParams {
text: "queued before sideband".to_string(),
}))
.await?;
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
@@ -578,6 +596,13 @@ async fn conversation_webrtc_start_posts_generated_session() -> Result<()> {
.context("session.update should include instructions")?
.contains("startup context")
);
let queued_text = realtime_server
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 1)
.await;
assert_eq!(
websocket_request_text(&queued_text).as_deref(),
Some("queued before sideband")
);
let handshake = realtime_server.single_handshake();
assert_eq!(
handshake.uri(),