Refactor realtime TUI op forwarding and transcript parsing

This commit is contained in:
Ahmed Ibrahim
2026-02-23 19:45:28 -08:00
parent 21e634da6b
commit 60dc38b962
3 changed files with 67 additions and 59 deletions

View File

@@ -254,26 +254,31 @@ pub(crate) async fn handle_audio(
/// Extracts the displayable text carried by a realtime conversation item.
///
/// Dispatches on the item's `type` field:
/// - `"message"` is handled by [`extract_assistant_message_text`].
/// - `"spawn_transcript"` is handled by [`extract_spawn_transcript_delta`].
///
/// Returns `None` when `type` is missing, is not a string, names any other
/// item type, or when the selected helper yields no text.
fn realtime_text_from_conversation_item(item: &Value) -> Option<String> {
    // One arm per item type: the parsing itself lives in the helpers, so
    // there is no inline copy here that could shadow them or drift apart.
    match item.get("type").and_then(Value::as_str) {
        Some("message") => extract_assistant_message_text(item),
        Some("spawn_transcript") => extract_spawn_transcript_delta(item),
        Some(_) | None => None,
    }
}
/// Concatenates the text parts of an assistant message item.
///
/// Returns `None` when the item's `role` is not `"assistant"`, when `content`
/// is missing or is not an array, or when no text was collected. Only content
/// entries whose `type` is `"text"` and whose `text` field is a string
/// contribute; they are joined in order without any separator.
fn extract_assistant_message_text(item: &Value) -> Option<String> {
    let role = item.get("role").and_then(Value::as_str);
    if role != Some("assistant") {
        return None;
    }

    let entries = item.get("content")?.as_array()?;
    let mut combined = String::new();
    for entry in entries {
        // Skip anything that is not a text part.
        if entry.get("type").and_then(Value::as_str) != Some("text") {
            continue;
        }
        if let Some(part) = entry.get("text").and_then(Value::as_str) {
            combined.push_str(part);
        }
    }

    (!combined.is_empty()).then_some(combined)
}
fn extract_spawn_transcript_delta(item: &Value) -> Option<String> {
item.get("delta_user_transcript")
.and_then(Value::as_str)
.and_then(|text| (!text.is_empty()).then(|| text.to_string()))
}
pub(crate) async fn handle_text(
sess: &Arc<Session>,
sub_id: String,

View File

@@ -171,6 +171,8 @@ const CONNECTORS_SELECTION_VIEW_ID: &str = "connectors-selection";
/// Prompt text for the realtime conversation.
// NOTE(review): the consumer of this prompt is not visible in this section — confirm where it is sent.
const REALTIME_CONVERSATION_PROMPT: &str =
"Low-level realtime conversation for TUI audio loop testing.";
/// Identifier of the transcription placeholder that hosts the realtime mic meter.
const REALTIME_METER_PLACEHOLDER_ID: &str = "realtime-meter";
/// Initial text written into the meter placeholder via `replace_transcription`.
const REALTIME_LISTENING_PLACEHOLDER_TEXT: &str = "Realtime listening ⠤⠤⠤⠤";
/// Text passed to `spawn_live_meter_updates` alongside the placeholder id.
// NOTE(review): presumably prepended to each live meter rendering — verify in `spawn_live_meter_updates`.
const REALTIME_LISTENING_PREFIX_TEXT: &str = "Realtime listening ";
/// Choose the keybinding used to edit the most-recently queued message.
///
@@ -3452,14 +3454,14 @@ impl ChatWidget {
"Stop voice chat".to_string(),
)]));
let id = REALTIME_METER_PLACEHOLDER_ID.to_string();
self.replace_transcription(&id, "Realtime listening ⠤⠤⠤⠤");
self.replace_transcription(&id, REALTIME_LISTENING_PLACEHOLDER_TEXT);
if let Some(controller) = &self.realtime_audio_controller
&& let Some(RealtimeMicMeterHandles { last_peak, stop }) = controller.meter_handles()
{
self.bottom_pane.spawn_live_meter_updates(
id.clone(),
Some("Realtime listening ".to_string()),
Some(REALTIME_LISTENING_PREFIX_TEXT.to_string()),
last_peak,
stop,
);

View File

@@ -9,6 +9,7 @@ use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::unbounded_channel;
@@ -30,9 +31,7 @@ pub(crate) fn spawn_agent(
app_event_tx: AppEventSender,
server: Arc<ThreadManager>,
) -> ChatWidgetOpSenders {
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
let (realtime_audio_op_tx, realtime_audio_op_rx) =
channel::<Op>(REALTIME_MIC_AUDIO_QUEUE_CAPACITY);
let (codex_op_tx, codex_op_rx, realtime_audio_op_tx, realtime_audio_op_rx) = op_channels();
let app_event_tx_clone = app_event_tx;
tokio::spawn(async move {
@@ -63,16 +62,7 @@ pub(crate) fn spawn_agent(
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
let thread_clone = thread.clone();
spawn_bounded_op_forwarder(thread.clone(), realtime_audio_op_rx);
tokio::spawn(async move {
while let Some(op) = codex_op_rx.recv().await {
let id = thread_clone.submit(op).await;
if let Err(e) = id {
tracing::error!("failed to submit op: {e}");
}
}
});
spawn_bounded_op_forwarder_threads(thread, codex_op_rx, realtime_audio_op_rx);
while let Ok(event) = thread.next_event().await {
let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
@@ -95,13 +85,11 @@ pub(crate) fn spawn_agent(
/// Sends the provided `SessionConfiguredEvent` immediately, then forwards subsequent
/// events and accepts Ops for submission.
pub(crate) fn spawn_agent_from_existing(
thread: std::sync::Arc<CodexThread>,
thread: Arc<CodexThread>,
session_configured: codex_protocol::protocol::SessionConfiguredEvent,
app_event_tx: AppEventSender,
) -> ChatWidgetOpSenders {
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
let (realtime_audio_op_tx, realtime_audio_op_rx) =
channel::<Op>(REALTIME_MIC_AUDIO_QUEUE_CAPACITY);
let (codex_op_tx, codex_op_rx, realtime_audio_op_tx, realtime_audio_op_rx) = op_channels();
let app_event_tx_clone = app_event_tx;
tokio::spawn(async move {
@@ -112,16 +100,7 @@ pub(crate) fn spawn_agent_from_existing(
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
let thread_clone = thread.clone();
spawn_bounded_op_forwarder(thread.clone(), realtime_audio_op_rx);
tokio::spawn(async move {
while let Some(op) = codex_op_rx.recv().await {
let id = thread_clone.submit(op).await;
if let Err(e) = id {
tracing::error!("failed to submit op: {e}");
}
}
});
spawn_bounded_op_forwarder_threads(thread, codex_op_rx, realtime_audio_op_rx);
while let Ok(event) = thread.next_event().await {
let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
@@ -141,19 +120,9 @@ pub(crate) fn spawn_agent_from_existing(
}
/// Spawn an op-forwarding loop for an existing thread without subscribing to events.
pub(crate) fn spawn_op_forwarder(thread: std::sync::Arc<CodexThread>) -> ChatWidgetOpSenders {
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
let (realtime_audio_op_tx, realtime_audio_op_rx) =
channel::<Op>(REALTIME_MIC_AUDIO_QUEUE_CAPACITY);
spawn_bounded_op_forwarder(thread.clone(), realtime_audio_op_rx);
tokio::spawn(async move {
while let Some(op) = codex_op_rx.recv().await {
if let Err(e) = thread.submit(op).await {
tracing::error!("failed to submit op: {e}");
}
}
});
pub(crate) fn spawn_op_forwarder(thread: Arc<CodexThread>) -> ChatWidgetOpSenders {
let (codex_op_tx, codex_op_rx, realtime_audio_op_tx, realtime_audio_op_rx) = op_channels();
spawn_bounded_op_forwarder_threads(thread, codex_op_rx, realtime_audio_op_rx);
ChatWidgetOpSenders {
codex_op_tx,
@@ -161,7 +130,7 @@ pub(crate) fn spawn_op_forwarder(thread: std::sync::Arc<CodexThread>) -> ChatWid
}
}
fn spawn_bounded_op_forwarder(thread: std::sync::Arc<CodexThread>, mut op_rx: Receiver<Op>) {
fn spawn_bounded_op_forwarder(thread: Arc<CodexThread>, mut op_rx: Receiver<Op>) {
tokio::spawn(async move {
while let Some(op) = op_rx.recv().await {
if let Err(e) = thread.submit(op).await {
@@ -170,3 +139,35 @@ fn spawn_bounded_op_forwarder(thread: std::sync::Arc<CodexThread>, mut op_rx: Re
}
});
}
/// Creates the pair of channels used to forward `Op`s.
///
/// Returns, in order: the unbounded sender and receiver, followed by the
/// sender and receiver of a bounded channel whose capacity is
/// `REALTIME_MIC_AUDIO_QUEUE_CAPACITY`.
fn op_channels() -> (
    UnboundedSender<Op>,
    UnboundedReceiver<Op>,
    Sender<Op>,
    Receiver<Op>,
) {
    let unbounded = unbounded_channel::<Op>();
    let bounded = channel::<Op>(REALTIME_MIC_AUDIO_QUEUE_CAPACITY);
    (unbounded.0, unbounded.1, bounded.0, bounded.1)
}
fn spawn_bounded_op_forwarder_threads(
thread: Arc<CodexThread>,
mut codex_op_rx: UnboundedReceiver<Op>,
realtime_audio_op_rx: Receiver<Op>,
) {
spawn_bounded_op_forwarder(thread.clone(), realtime_audio_op_rx);
tokio::spawn(async move {
while let Some(op) = codex_op_rx.recv().await {
if let Err(e) = thread.submit(op).await {
tracing::error!("failed to submit op: {e}");
}
}
});
}