mirror of
https://github.com/openai/codex.git
synced 2026-05-01 09:56:37 +00:00
Add WebRTC media transport to realtime TUI (#17058)
Adds the `[realtime].transport = "webrtc"` TUI media path using a new `codex-realtime-webrtc` crate, while leaving app-server as the signaling/event source.

Local checks: fmt, diff-check, dependency tree only; test signal should come from CI.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -1,10 +1,17 @@
|
||||
use super::*;
|
||||
use codex_config::config_toml::RealtimeTransport;
|
||||
use codex_protocol::protocol::ConversationStartParams;
|
||||
use codex_protocol::protocol::ConversationStartTransport;
|
||||
use codex_protocol::protocol::RealtimeAudioFrame;
|
||||
use codex_protocol::protocol::RealtimeConversationClosedEvent;
|
||||
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeConversationStartedEvent;
|
||||
use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_realtime_webrtc::RealtimeWebrtcEvent;
|
||||
use codex_realtime_webrtc::RealtimeWebrtcSession;
|
||||
use codex_realtime_webrtc::RealtimeWebrtcSessionHandle;
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
use std::sync::atomic::AtomicU16;
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -25,6 +32,7 @@ pub(super) struct RealtimeConversationUiState {
|
||||
requested_close: bool,
|
||||
session_id: Option<String>,
|
||||
warned_audio_only_submission: bool,
|
||||
transport: RealtimeConversationUiTransport,
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
pub(super) meter_placeholder_id: Option<String>,
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
@@ -35,6 +43,15 @@ pub(super) struct RealtimeConversationUiState {
|
||||
audio_player: Option<crate::voice::RealtimeAudioPlayer>,
|
||||
}
|
||||
|
||||
/// Media transport currently selected for the TUI's realtime conversation.
///
/// The state starts out as (and is reset back to) `Websocket`; `Webrtc` is
/// selected when the configured realtime transport is WebRTC.
#[derive(Debug, Default)]
|
||||
enum RealtimeConversationUiTransport {
|
||||
/// Default transport; no WebRTC session handle is tracked by the UI.
#[default]
|
||||
Websocket,
|
||||
/// WebRTC transport backed by a `codex-realtime-webrtc` session.
Webrtc {
|
||||
/// Session handle. `None` until the local offer has been created, and
/// `None` again once the transport has been closed (the handle is taken
/// out before `close()` is called on it).
handle: Option<RealtimeWebrtcSessionHandle>,
|
||||
},
|
||||
}
|
||||
|
||||
impl RealtimeConversationUiState {
|
||||
pub(super) fn is_live(&self) -> bool {
|
||||
matches!(
|
||||
@@ -225,14 +242,31 @@ impl ChatWidget {
|
||||
self.realtime_conversation.session_id = None;
|
||||
self.realtime_conversation.warned_audio_only_submission = false;
|
||||
self.set_footer_hint_override(Some(Self::realtime_footer_hint_items()));
|
||||
match self.config.realtime.transport {
|
||||
RealtimeTransport::Websocket => {
|
||||
self.realtime_conversation.transport = RealtimeConversationUiTransport::Websocket;
|
||||
self.submit_realtime_conversation_start(/*transport*/ None);
|
||||
}
|
||||
RealtimeTransport::WebRtc => {
|
||||
self.realtime_conversation.transport =
|
||||
RealtimeConversationUiTransport::Webrtc { handle: None };
|
||||
start_realtime_webrtc_offer_task(self.app_event_tx.clone());
|
||||
}
|
||||
}
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
fn submit_realtime_conversation_start(
|
||||
&mut self,
|
||||
transport: Option<ConversationStartTransport>,
|
||||
) {
|
||||
self.submit_op(AppCommand::realtime_conversation_start(
|
||||
ConversationStartParams {
|
||||
prompt: REALTIME_CONVERSATION_PROMPT.to_string(),
|
||||
session_id: None,
|
||||
transport: None,
|
||||
transport,
|
||||
},
|
||||
));
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
pub(super) fn request_realtime_conversation_close(&mut self, info_message: Option<String>) {
|
||||
@@ -247,6 +281,7 @@ impl ChatWidget {
|
||||
self.realtime_conversation.phase = RealtimeConversationPhase::Stopping;
|
||||
self.submit_op(AppCommand::realtime_conversation_close());
|
||||
self.stop_realtime_local_audio();
|
||||
self.close_realtime_webrtc_transport();
|
||||
self.set_footer_hint_override(/*items*/ None);
|
||||
|
||||
if let Some(message) = info_message {
|
||||
@@ -258,11 +293,13 @@ impl ChatWidget {
|
||||
|
||||
pub(super) fn reset_realtime_conversation_state(&mut self) {
|
||||
self.stop_realtime_local_audio();
|
||||
self.close_realtime_webrtc_transport();
|
||||
self.set_footer_hint_override(/*items*/ None);
|
||||
self.realtime_conversation.phase = RealtimeConversationPhase::Inactive;
|
||||
self.realtime_conversation.requested_close = false;
|
||||
self.realtime_conversation.session_id = None;
|
||||
self.realtime_conversation.warned_audio_only_submission = false;
|
||||
self.realtime_conversation.transport = RealtimeConversationUiTransport::Websocket;
|
||||
}
|
||||
|
||||
fn fail_realtime_conversation(&mut self, message: String) {
|
||||
@@ -283,11 +320,15 @@ impl ChatWidget {
|
||||
self.request_realtime_conversation_close(/*info_message*/ None);
|
||||
return;
|
||||
}
|
||||
self.realtime_conversation.phase = RealtimeConversationPhase::Active;
|
||||
self.realtime_conversation.session_id = ev.session_id;
|
||||
self.realtime_conversation.warned_audio_only_submission = false;
|
||||
self.set_footer_hint_override(Some(Self::realtime_footer_hint_items()));
|
||||
self.start_realtime_local_audio();
|
||||
if self.realtime_conversation_uses_webrtc() {
|
||||
self.realtime_conversation.phase = RealtimeConversationPhase::Starting;
|
||||
} else {
|
||||
self.realtime_conversation.phase = RealtimeConversationPhase::Active;
|
||||
self.start_realtime_local_audio();
|
||||
}
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
@@ -295,6 +336,16 @@ impl ChatWidget {
|
||||
&mut self,
|
||||
ev: RealtimeConversationRealtimeEvent,
|
||||
) {
|
||||
if self.realtime_conversation_uses_webrtc()
|
||||
&& matches!(
|
||||
ev.payload,
|
||||
RealtimeEvent::AudioOut(_)
|
||||
| RealtimeEvent::InputAudioSpeechStarted(_)
|
||||
| RealtimeEvent::ResponseCancelled(_)
|
||||
)
|
||||
{
|
||||
return;
|
||||
}
|
||||
match ev.payload {
|
||||
RealtimeEvent::SessionUpdated { session_id, .. } => {
|
||||
self.realtime_conversation.session_id = Some(session_id);
|
||||
@@ -314,6 +365,13 @@ impl ChatWidget {
|
||||
}
|
||||
|
||||
pub(super) fn on_realtime_conversation_closed(&mut self, ev: RealtimeConversationClosedEvent) {
|
||||
if self.realtime_conversation_uses_webrtc()
|
||||
&& self.realtime_conversation.is_live()
|
||||
&& ev.reason.as_deref() == Some("transport_closed")
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
let requested = self.realtime_conversation.requested_close;
|
||||
let reason = ev.reason;
|
||||
self.reset_realtime_conversation_state();
|
||||
@@ -329,6 +387,115 @@ impl ChatWidget {
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
pub(super) fn on_realtime_conversation_sdp(&mut self, sdp: String) {
|
||||
let RealtimeConversationUiTransport::Webrtc {
|
||||
handle: Some(handle),
|
||||
} = &self.realtime_conversation.transport
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
if let Err(err) = handle.apply_answer_sdp(sdp) {
|
||||
self.fail_realtime_conversation(format!("Failed to connect realtime WebRTC: {err}"));
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn on_realtime_webrtc_offer_created(
|
||||
&mut self,
|
||||
result: Result<crate::app_event::RealtimeWebrtcOffer, String>,
|
||||
) {
|
||||
if self.realtime_conversation.phase != RealtimeConversationPhase::Starting
|
||||
|| !matches!(
|
||||
self.realtime_conversation.transport,
|
||||
RealtimeConversationUiTransport::Webrtc { handle: None }
|
||||
)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
let offer = match result {
|
||||
Ok(offer) => offer,
|
||||
Err(err) => {
|
||||
self.fail_realtime_conversation(format!("Failed to start realtime WebRTC: {err}"));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
self.realtime_conversation.transport = RealtimeConversationUiTransport::Webrtc {
|
||||
handle: Some(offer.handle),
|
||||
};
|
||||
self.submit_realtime_conversation_start(Some(ConversationStartTransport::Webrtc {
|
||||
sdp: offer.offer_sdp,
|
||||
}));
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
pub(crate) fn on_realtime_webrtc_event(&mut self, event: RealtimeWebrtcEvent) {
|
||||
if !self.realtime_conversation_uses_webrtc() {
|
||||
return;
|
||||
}
|
||||
|
||||
match event {
|
||||
RealtimeWebrtcEvent::Connected => {
|
||||
if self.realtime_conversation.phase != RealtimeConversationPhase::Starting {
|
||||
return;
|
||||
}
|
||||
self.realtime_conversation.phase = RealtimeConversationPhase::Active;
|
||||
self.set_footer_hint_override(Some(Self::realtime_footer_hint_items()));
|
||||
self.request_redraw();
|
||||
}
|
||||
RealtimeWebrtcEvent::Closed => {
|
||||
self.reset_realtime_conversation_state();
|
||||
self.request_redraw();
|
||||
}
|
||||
RealtimeWebrtcEvent::Failed(message) => {
|
||||
self.fail_realtime_conversation(format!("Realtime WebRTC error: {message}"));
|
||||
}
|
||||
RealtimeWebrtcEvent::LocalAudioLevel(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn on_realtime_webrtc_local_audio_level(&mut self, peak: u16) {
|
||||
if !self.realtime_conversation_uses_webrtc() || peak == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
let _ = peak;
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
{
|
||||
let RealtimeConversationUiTransport::Webrtc {
|
||||
handle: Some(handle),
|
||||
} = &self.realtime_conversation.transport
|
||||
else {
|
||||
return;
|
||||
};
|
||||
let peak = handle.local_audio_peak();
|
||||
if self.realtime_conversation.meter_placeholder_id.is_none() {
|
||||
self.start_realtime_webrtc_meter(peak);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn realtime_conversation_uses_webrtc(&self) -> bool {
|
||||
matches!(
|
||||
self.realtime_conversation.transport,
|
||||
RealtimeConversationUiTransport::Webrtc { .. }
|
||||
)
|
||||
}
|
||||
|
||||
fn close_realtime_webrtc_transport(&mut self) {
|
||||
if let RealtimeConversationUiTransport::Webrtc { handle } =
|
||||
&mut self.realtime_conversation.transport
|
||||
&& let Some(handle) = handle.take()
|
||||
{
|
||||
handle.close();
|
||||
}
|
||||
}
|
||||
|
||||
fn enqueue_realtime_audio_out(&mut self, frame: &RealtimeAudioFrame) {
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
{
|
||||
@@ -364,18 +531,12 @@ impl ChatWidget {
|
||||
return;
|
||||
}
|
||||
|
||||
let placeholder_id = self.bottom_pane.insert_recording_meter_placeholder("⠤⠤⠤⠤");
|
||||
self.realtime_conversation.meter_placeholder_id = Some(placeholder_id.clone());
|
||||
self.request_redraw();
|
||||
|
||||
let capture = match crate::voice::VoiceCapture::start_realtime(
|
||||
&self.config,
|
||||
self.app_event_tx.clone(),
|
||||
) {
|
||||
Ok(capture) => capture,
|
||||
Err(err) => {
|
||||
self.realtime_conversation.meter_placeholder_id = None;
|
||||
self.remove_recording_meter_placeholder(&placeholder_id);
|
||||
self.fail_realtime_conversation(format!(
|
||||
"Failed to start microphone capture: {err}"
|
||||
));
|
||||
@@ -385,33 +546,33 @@ impl ChatWidget {
|
||||
|
||||
let stop_flag = capture.stopped_flag();
|
||||
let peak = capture.last_peak_arc();
|
||||
let meter_placeholder_id = placeholder_id;
|
||||
let app_event_tx = self.app_event_tx.clone();
|
||||
|
||||
self.realtime_conversation.capture_stop_flag = Some(stop_flag.clone());
|
||||
self.start_realtime_meter(stop_flag.clone(), peak);
|
||||
self.realtime_conversation.capture_stop_flag = Some(stop_flag);
|
||||
self.realtime_conversation.capture = Some(capture);
|
||||
if self.realtime_conversation.audio_player.is_none() {
|
||||
self.realtime_conversation.audio_player =
|
||||
crate::voice::RealtimeAudioPlayer::start(&self.config).ok();
|
||||
}
|
||||
}
|
||||
|
||||
std::thread::spawn(move || {
|
||||
let mut meter = crate::voice::RecordingMeterState::new();
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
fn start_realtime_webrtc_meter(&mut self, peak: Arc<AtomicU16>) {
|
||||
if self.realtime_conversation.capture_stop_flag.is_some() {
|
||||
return;
|
||||
}
|
||||
|
||||
loop {
|
||||
if stop_flag.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
let stop_flag = Arc::new(AtomicBool::new(false));
|
||||
self.start_realtime_meter(stop_flag.clone(), peak);
|
||||
self.realtime_conversation.capture_stop_flag = Some(stop_flag);
|
||||
}
|
||||
|
||||
let meter_text = meter.next_text(peak.load(Ordering::Relaxed));
|
||||
app_event_tx.send(AppEvent::UpdateRecordingMeter {
|
||||
id: meter_placeholder_id.clone(),
|
||||
text: meter_text,
|
||||
});
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
fn start_realtime_meter(&mut self, stop_flag: Arc<AtomicBool>, peak: Arc<AtomicU16>) {
|
||||
let placeholder_id = self.bottom_pane.insert_recording_meter_placeholder("⠤⠤⠤⠤");
|
||||
self.realtime_conversation.meter_placeholder_id = Some(placeholder_id.clone());
|
||||
self.request_redraw();
|
||||
|
||||
std::thread::sleep(Duration::from_millis(60));
|
||||
}
|
||||
});
|
||||
start_realtime_meter_task(placeholder_id, self.app_event_tx.clone(), stop_flag, peak);
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
@@ -479,3 +640,56 @@ impl ChatWidget {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn start_realtime_webrtc_offer_task(app_event_tx: AppEventSender) {
|
||||
std::thread::spawn(move || {
|
||||
let result = match RealtimeWebrtcSession::start() {
|
||||
Ok(started) => {
|
||||
let event_tx = app_event_tx.clone();
|
||||
let local_audio_peak = started.handle.local_audio_peak();
|
||||
std::thread::spawn(move || {
|
||||
for event in started.events {
|
||||
if let RealtimeWebrtcEvent::LocalAudioLevel(peak) = event {
|
||||
local_audio_peak.store(peak, Ordering::Relaxed);
|
||||
event_tx.send(AppEvent::RealtimeWebrtcLocalAudioLevel(peak));
|
||||
} else {
|
||||
event_tx.send(AppEvent::RealtimeWebrtcEvent(event));
|
||||
}
|
||||
}
|
||||
});
|
||||
Ok(crate::app_event::RealtimeWebrtcOffer {
|
||||
offer_sdp: started.offer_sdp,
|
||||
handle: started.handle,
|
||||
})
|
||||
}
|
||||
Err(err) => Err(err.to_string()),
|
||||
};
|
||||
app_event_tx.send(AppEvent::RealtimeWebrtcOfferCreated { result });
|
||||
});
|
||||
}
|
||||
|
||||
/// Spawns the background thread that animates the recording meter.
///
/// Until `stop_flag` is set, the thread samples `peak`, asks the meter state
/// for the next text frame, sends it as an `UpdateRecordingMeter` event for
/// `meter_placeholder_id`, and then sleeps for 60 ms.
#[cfg(not(target_os = "linux"))]
fn start_realtime_meter_task(
    meter_placeholder_id: String,
    app_event_tx: AppEventSender,
    stop_flag: Arc<AtomicBool>,
    peak: Arc<AtomicU16>,
) {
    let refresh_interval = Duration::from_millis(60);

    std::thread::spawn(move || {
        let mut meter = crate::voice::RecordingMeterState::new();

        while !stop_flag.load(Ordering::Relaxed) {
            let level = peak.load(Ordering::Relaxed);
            let text = meter.next_text(level);
            let id = meter_placeholder_id.clone();
            app_event_tx.send(AppEvent::UpdateRecordingMeter { id, text });

            std::thread::sleep(refresh_interval);
        }
    });
}
|
||||
|
||||
Reference in New Issue
Block a user