diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 47a4354e2d..71c273ea0e 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -254,26 +254,31 @@ pub(crate) async fn handle_audio( fn realtime_text_from_conversation_item(item: &Value) -> Option { match item.get("type").and_then(Value::as_str) { - Some("message") => { - if item.get("role").and_then(Value::as_str) != Some("assistant") { - return None; - } - let content = item.get("content")?.as_array()?; - let text = content - .iter() - .filter(|entry| entry.get("type").and_then(Value::as_str) == Some("text")) - .filter_map(|entry| entry.get("text").and_then(Value::as_str)) - .collect::(); - if text.is_empty() { None } else { Some(text) } - } - Some("spawn_transcript") => item - .get("delta_user_transcript") - .and_then(Value::as_str) - .and_then(|text| (!text.is_empty()).then(|| text.to_string())), + Some("message") => extract_assistant_message_text(item), + Some("spawn_transcript") => extract_spawn_transcript_delta(item), Some(_) | None => None, } } +fn extract_assistant_message_text(item: &Value) -> Option { + if item.get("role").and_then(Value::as_str) != Some("assistant") { + return None; + } + let content = item.get("content")?.as_array()?; + let text = content + .iter() + .filter(|entry| entry.get("type").and_then(Value::as_str) == Some("text")) + .filter_map(|entry| entry.get("text").and_then(Value::as_str)) + .collect::(); + if text.is_empty() { None } else { Some(text) } +} + +fn extract_spawn_transcript_delta(item: &Value) -> Option { + item.get("delta_user_transcript") + .and_then(Value::as_str) + .and_then(|text| (!text.is_empty()).then(|| text.to_string())) +} + pub(crate) async fn handle_text( sess: &Arc, sub_id: String, diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index ec5730260d..d5ac7275b5 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -171,6 +171,8 @@ const CONNECTORS_SELECTION_VIEW_ID: &str = "connectors-selection"; const REALTIME_CONVERSATION_PROMPT: &str = "Low-level realtime conversation for TUI audio loop testing."; const REALTIME_METER_PLACEHOLDER_ID: &str = "realtime-meter"; +const REALTIME_LISTENING_PLACEHOLDER_TEXT: &str = "Realtime listening ⠤⠤⠤⠤"; +const REALTIME_LISTENING_PREFIX_TEXT: &str = "Realtime listening "; /// Choose the keybinding used to edit the most-recently queued message. /// @@ -3452,14 +3454,14 @@ impl ChatWidget { "Stop voice chat".to_string(), )])); let id = REALTIME_METER_PLACEHOLDER_ID.to_string(); - self.replace_transcription(&id, "Realtime listening ⠤⠤⠤⠤"); + self.replace_transcription(&id, REALTIME_LISTENING_PLACEHOLDER_TEXT); if let Some(controller) = &self.realtime_audio_controller && let Some(RealtimeMicMeterHandles { last_peak, stop }) = controller.meter_handles() { self.bottom_pane.spawn_live_meter_updates( id.clone(), - Some("Realtime listening ".to_string()), + Some(REALTIME_LISTENING_PREFIX_TEXT.to_string()), last_peak, stop, ); diff --git a/codex-rs/tui/src/chatwidget/agent.rs b/codex-rs/tui/src/chatwidget/agent.rs index e8121f788a..aaf0b295b5 100644 --- a/codex-rs/tui/src/chatwidget/agent.rs +++ b/codex-rs/tui/src/chatwidget/agent.rs @@ -9,6 +9,7 @@ use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::Op; use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::channel; use tokio::sync::mpsc::unbounded_channel; @@ -30,9 +31,7 @@ pub(crate) fn spawn_agent( app_event_tx: AppEventSender, server: Arc, ) -> ChatWidgetOpSenders { - let (codex_op_tx, mut codex_op_rx) = unbounded_channel::(); - let (realtime_audio_op_tx, realtime_audio_op_rx) = - channel::(REALTIME_MIC_AUDIO_QUEUE_CAPACITY); + let (codex_op_tx, codex_op_rx, realtime_audio_op_tx, realtime_audio_op_rx) = op_channels(); let app_event_tx_clone = app_event_tx; tokio::spawn(async move { @@ -63,16 +62,7 @@ pub(crate) fn spawn_agent( }; app_event_tx_clone.send(AppEvent::CodexEvent(ev)); - let thread_clone = thread.clone(); - spawn_bounded_op_forwarder(thread.clone(), realtime_audio_op_rx); - tokio::spawn(async move { - while let Some(op) = codex_op_rx.recv().await { - let id = thread_clone.submit(op).await; - if let Err(e) = id { - tracing::error!("failed to submit op: {e}"); - } - } - }); + spawn_bounded_op_forwarder_threads(thread, codex_op_rx, realtime_audio_op_rx); while let Ok(event) = thread.next_event().await { let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete); @@ -95,13 +85,11 @@ pub(crate) fn spawn_agent( /// Sends the provided `SessionConfiguredEvent` immediately, then forwards subsequent /// events and accepts Ops for submission. pub(crate) fn spawn_agent_from_existing( - thread: std::sync::Arc, + thread: Arc, session_configured: codex_protocol::protocol::SessionConfiguredEvent, app_event_tx: AppEventSender, ) -> ChatWidgetOpSenders { - let (codex_op_tx, mut codex_op_rx) = unbounded_channel::(); - let (realtime_audio_op_tx, realtime_audio_op_rx) = - channel::(REALTIME_MIC_AUDIO_QUEUE_CAPACITY); + let (codex_op_tx, codex_op_rx, realtime_audio_op_tx, realtime_audio_op_rx) = op_channels(); let app_event_tx_clone = app_event_tx; tokio::spawn(async move { @@ -112,16 +100,7 @@ pub(crate) fn spawn_agent_from_existing( }; app_event_tx_clone.send(AppEvent::CodexEvent(ev)); - let thread_clone = thread.clone(); - spawn_bounded_op_forwarder(thread.clone(), realtime_audio_op_rx); - tokio::spawn(async move { - while let Some(op) = codex_op_rx.recv().await { - let id = thread_clone.submit(op).await; - if let Err(e) = id { - tracing::error!("failed to submit op: {e}"); - } - } - }); + spawn_bounded_op_forwarder_threads(thread, codex_op_rx, realtime_audio_op_rx); while let Ok(event) = thread.next_event().await { let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete); @@ -141,19 +120,9 @@ pub(crate) fn spawn_agent_from_existing( } /// Spawn an op-forwarding loop for an existing thread without subscribing to events. -pub(crate) fn spawn_op_forwarder(thread: std::sync::Arc) -> ChatWidgetOpSenders { - let (codex_op_tx, mut codex_op_rx) = unbounded_channel::(); - let (realtime_audio_op_tx, realtime_audio_op_rx) = - channel::(REALTIME_MIC_AUDIO_QUEUE_CAPACITY); - - spawn_bounded_op_forwarder(thread.clone(), realtime_audio_op_rx); - tokio::spawn(async move { - while let Some(op) = codex_op_rx.recv().await { - if let Err(e) = thread.submit(op).await { - tracing::error!("failed to submit op: {e}"); - } - } - }); +pub(crate) fn spawn_op_forwarder(thread: Arc) -> ChatWidgetOpSenders { + let (codex_op_tx, codex_op_rx, realtime_audio_op_tx, realtime_audio_op_rx) = op_channels(); + spawn_bounded_op_forwarder_threads(thread, codex_op_rx, realtime_audio_op_rx); ChatWidgetOpSenders { codex_op_tx, @@ -161,7 +130,7 @@ pub(crate) fn spawn_op_forwarder(thread: std::sync::Arc) -> ChatWid } } -fn spawn_bounded_op_forwarder(thread: std::sync::Arc, mut op_rx: Receiver) { +fn spawn_bounded_op_forwarder(thread: Arc, mut op_rx: Receiver) { tokio::spawn(async move { while let Some(op) = op_rx.recv().await { if let Err(e) = thread.submit(op).await { @@ -170,3 +139,35 @@ fn spawn_bounded_op_forwarder(thread: std::sync::Arc, mut op_rx: Re } }); } + +fn op_channels() -> ( + UnboundedSender, + UnboundedReceiver, + Sender, + Receiver, +) { + let (codex_op_tx, codex_op_rx) = unbounded_channel::(); + let (realtime_audio_op_tx, realtime_audio_op_rx) = + channel::(REALTIME_MIC_AUDIO_QUEUE_CAPACITY); + ( + codex_op_tx, + codex_op_rx, + realtime_audio_op_tx, + realtime_audio_op_rx, + ) +} + +fn spawn_bounded_op_forwarder_threads( + thread: Arc, + mut codex_op_rx: UnboundedReceiver, + realtime_audio_op_rx: Receiver, +) { + spawn_bounded_op_forwarder(thread.clone(), realtime_audio_op_rx); + tokio::spawn(async move { + while let Some(op) = codex_op_rx.recv().await { + if let Err(e) = thread.submit(op).await { + tracing::error!("failed to submit op: {e}"); + } + } + }); +}