Compare commits

...

14 Commits

Author SHA1 Message Date
Ahmed Ibrahim
6afe612b70 Gate realtime cpal audio paths for linux builds
Avoid linux cpal/alsa build dependency by gating realtime audio cpal imports and controller paths to non-linux.

Co-authored-by: Codex <noreply@openai.com>
2026-02-23 21:29:54 -08:00
Ahmed Ibrahim
2648a7c52b tui: remove underscore placeholders and dead-code locals
remove leading-underscore placeholders introduced by realtime refactor

Co-authored-by: Codex <noreply@openai.com>
2026-02-23 19:59:51 -08:00
Ahmed Ibrahim
60dc38b962 Refactor realtime TUI op forwarding and transcript parsing 2026-02-23 19:45:28 -08:00
Ahmed Ibrahim
21e634da6b fix 2026-02-23 19:18:08 -08:00
Ahmed Ibrahim
5f8024e9e4 refactor(realtime): trim diff by reusing transcription placeholders
Co-authored-by: Codex <noreply@openai.com>
2026-02-23 16:48:21 -08:00
Ahmed Ibrahim
f7c740fff1 refactor(realtime): reuse voice meter placeholder plumbing
Co-authored-by: Codex <noreply@openai.com>
2026-02-23 16:46:51 -08:00
Ahmed Ibrahim
a7a812e7bf Merge origin/main into aibrahim/hidden-realtime-tui
Resolve latest-main conflicts by keeping upstream voice transcription infra and folding /realtime meter updates into the existing pre-draw lifecycle.

Co-authored-by: Codex <noreply@openai.com>
2026-02-23 16:14:29 -08:00
Ahmed Ibrahim
7f126df60e Merge origin/main into aibrahim/hidden-realtime-tui
Integrate the realtime TUI audio safety fixes and reuse the voice-style meter/composer lifecycle for /realtime.

Co-authored-by: Codex <noreply@openai.com>
2026-02-23 15:23:06 -08:00
Ahmed Ibrahim
128a7d2ae5 fix 2026-02-23 14:40:22 -08:00
Ahmed Ibrahim
5498a8a3e3 Merge branch 'main' into aibrahim/hidden-realtime-tui 2026-02-23 14:12:22 -08:00
Ahmed Ibrahim
19d9d7c6a5 fix 2026-02-23 14:10:51 -08:00
Ahmed Ibrahim
091fa0b980 fix 2026-02-23 14:04:37 -08:00
Ahmed Ibrahim
968f3fc9de fix 2026-02-23 14:00:26 -08:00
Ahmed Ibrahim
7e0bcc0812 Add hidden /realtime TUI conversation mode
Add a hidden under-development realtime_conversation feature and /realtime slash command gating.
Wire TUI realtime audio I/O, lifecycle handling, and live delegated user-message rendering during realtime mode.

Co-authored-by: Codex <noreply@openai.com>
2026-02-23 10:10:06 -08:00
18 changed files with 1656 additions and 615 deletions

256
MODULE.bazel.lock generated

File diff suppressed because one or more lines are too long

677
codex-rs/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -163,6 +163,7 @@ clap_complete = "4"
color-eyre = "0.6.3"
crossbeam-channel = "0.5.15"
crossterm = "0.28.1"
cpal = "0.15.3"
ctor = "0.6.3"
derive_more = "2"
diffy = "0.4.2"

View File

@@ -367,6 +367,9 @@
"prevent_idle_sleep": {
"type": "boolean"
},
"realtime_conversation": {
"type": "boolean"
},
"remote_models": {
"type": "boolean"
},
@@ -1651,6 +1654,9 @@
"prevent_idle_sleep": {
"type": "boolean"
},
"realtime_conversation": {
"type": "boolean"
},
"remote_models": {
"type": "boolean"
},

View File

@@ -146,6 +146,8 @@ pub enum Feature {
ResponsesWebsockets,
/// Enable Responses API websocket v2 mode.
ResponsesWebsocketsV2,
/// Enable hidden TUI `/realtime` low-level realtime conversation controls.
RealtimeConversation,
}
impl Feature {
@@ -661,6 +663,12 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::RealtimeConversation,
key: "realtime_conversation",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
];
/// Push a warning event if any under-development features are enabled.

View File

@@ -254,26 +254,31 @@ pub(crate) async fn handle_audio(
fn realtime_text_from_conversation_item(item: &Value) -> Option<String> {
match item.get("type").and_then(Value::as_str) {
Some("message") => {
if item.get("role").and_then(Value::as_str) != Some("assistant") {
return None;
}
let content = item.get("content")?.as_array()?;
let text = content
.iter()
.filter(|entry| entry.get("type").and_then(Value::as_str) == Some("text"))
.filter_map(|entry| entry.get("text").and_then(Value::as_str))
.collect::<String>();
if text.is_empty() { None } else { Some(text) }
}
Some("spawn_transcript") => item
.get("delta_user_transcript")
.and_then(Value::as_str)
.and_then(|text| (!text.is_empty()).then(|| text.to_string())),
Some("message") => extract_assistant_message_text(item),
Some("spawn_transcript") => extract_spawn_transcript_delta(item),
Some(_) | None => None,
}
}
fn extract_assistant_message_text(item: &Value) -> Option<String> {
if item.get("role").and_then(Value::as_str) != Some("assistant") {
return None;
}
let content = item.get("content")?.as_array()?;
let text = content
.iter()
.filter(|entry| entry.get("type").and_then(Value::as_str) == Some("text"))
.filter_map(|entry| entry.get("text").and_then(Value::as_str))
.collect::<String>();
if text.is_empty() { None } else { Some(text) }
}
fn extract_spawn_transcript_delta(item: &Value) -> Option<String> {
item.get("delta_user_transcript")
.and_then(Value::as_str)
.and_then(|text| (!text.is_empty()).then(|| text.to_string()))
}
pub(crate) async fn handle_text(
sess: &Arc<Session>,
sub_id: String,

View File

@@ -1002,8 +1002,13 @@ impl App {
let codex_op_tx = if let Some(thread) = live_thread {
crate::chatwidget::spawn_op_forwarder(thread)
} else {
let (tx, _rx) = unbounded_channel();
tx
let (codex_op_tx, codex_op_rx) = unbounded_channel();
let (realtime_audio_op_tx, realtime_audio_op_rx) = tokio::sync::mpsc::channel(1);
drop((codex_op_rx, realtime_audio_op_rx));
crate::chatwidget::ChatWidgetOpSenders {
codex_op_tx,
realtime_audio_op_tx,
}
};
self.chat_widget = ChatWidget::new_with_op_sender(init, codex_op_tx);
@@ -2713,7 +2718,6 @@ impl App {
AppEvent::TranscriptionFailed { id, error: _ } => {
self.chat_widget.remove_transcription_placeholder(&id);
}
#[cfg(not(target_os = "linux"))]
AppEvent::UpdateRecordingMeter { id, text } => {
// Update in place to preserve the element id for subsequent frames.
let updated = self.chat_widget.update_transcription_in_place(&id, &text);

View File

@@ -321,9 +321,9 @@ pub(crate) enum AppEvent {
/// Re-open the permissions presets popup.
OpenPermissionsPopup,
/// Live update for the in-progress voice recording placeholder. Carries
/// the placeholder `id` and the text to display (e.g., an ASCII meter).
#[cfg(not(target_os = "linux"))]
/// Live update for an in-progress named composer placeholder (used by
/// voice recording/transcription and realtime mic metering). Carries the
/// placeholder `id` and the text to display.
UpdateRecordingMeter {
id: String,
text: String,

View File

@@ -215,8 +215,6 @@ use std::collections::VecDeque;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;
#[cfg(not(target_os = "linux"))]
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
#[cfg(not(target_os = "linux"))]
@@ -389,6 +387,7 @@ pub(crate) struct ChatComposer {
collaboration_mode_indicator: Option<CollaborationModeIndicator>,
connectors_enabled: bool,
personality_command_enabled: bool,
realtime_command_enabled: bool,
windows_degraded_sandbox_active: bool,
status_line_value: Option<Line<'static>>,
status_line_enabled: bool,
@@ -494,6 +493,7 @@ impl ChatComposer {
collaboration_mode_indicator: None,
connectors_enabled: false,
personality_command_enabled: false,
realtime_command_enabled: false,
windows_degraded_sandbox_active: false,
status_line_value: None,
status_line_enabled: false,
@@ -577,7 +577,9 @@ impl ChatComposer {
pub fn set_personality_command_enabled(&mut self, enabled: bool) {
self.personality_command_enabled = enabled;
}
pub fn set_realtime_command_enabled(&mut self, enabled: bool) {
self.realtime_command_enabled = enabled;
}
pub fn set_voice_transcription_enabled(&mut self, enabled: bool) {
self.voice_state.transcription_enabled = enabled;
if !enabled {
@@ -2245,6 +2247,7 @@ impl ChatComposer {
self.collaboration_modes_enabled,
self.connectors_enabled,
self.personality_command_enabled,
self.realtime_command_enabled,
self.windows_degraded_sandbox_active,
)
.is_some();
@@ -2444,6 +2447,7 @@ impl ChatComposer {
self.collaboration_modes_enabled,
self.connectors_enabled,
self.personality_command_enabled,
self.realtime_command_enabled,
self.windows_degraded_sandbox_active,
)
{
@@ -2478,6 +2482,7 @@ impl ChatComposer {
self.collaboration_modes_enabled,
self.connectors_enabled,
self.personality_command_enabled,
self.realtime_command_enabled,
self.windows_degraded_sandbox_active,
)?;
@@ -3310,6 +3315,7 @@ impl ChatComposer {
self.collaboration_modes_enabled,
self.connectors_enabled,
self.personality_command_enabled,
self.realtime_command_enabled,
self.windows_degraded_sandbox_active,
)
.is_some();
@@ -3371,6 +3377,7 @@ impl ChatComposer {
self.collaboration_modes_enabled,
self.connectors_enabled,
self.personality_command_enabled,
self.realtime_command_enabled,
self.windows_degraded_sandbox_active,
) {
return true;
@@ -3424,12 +3431,14 @@ impl ChatComposer {
let collaboration_modes_enabled = self.collaboration_modes_enabled;
let connectors_enabled = self.connectors_enabled;
let personality_command_enabled = self.personality_command_enabled;
let realtime_command_enabled = self.realtime_command_enabled;
let mut command_popup = CommandPopup::new(
self.custom_prompts.clone(),
CommandPopupFlags {
collaboration_modes_enabled,
connectors_enabled,
personality_command_enabled,
realtime_command_enabled,
windows_degraded_sandbox_active: self.windows_degraded_sandbox_active,
},
);
@@ -3831,13 +3840,10 @@ impl ChatComposer {
self.voice_state.recording_placeholder_id = Some(id);
// Spawn metering animation
if let Some(v) = &self.voice_state.voice {
let data = v.data_arc();
let stop = v.stopped_flag();
let sr = v.sample_rate();
let ch = v.channels();
let peak = v.last_peak_arc();
if let Some(idref) = &self.voice_state.recording_placeholder_id {
self.spawn_recording_meter(idref.clone(), sr, ch, data, peak, stop);
self.spawn_live_meter_updates(idref.clone(), None, peak, stop);
}
}
true
@@ -3851,12 +3857,10 @@ impl ChatComposer {
}
}
fn spawn_recording_meter(
pub(crate) fn spawn_live_meter_updates(
&self,
id: String,
_sample_rate: u32,
_channels: u16,
_data: Arc<Mutex<Vec<i16>>>,
prefix: Option<String>,
last_peak: Arc<std::sync::atomic::AtomicU16>,
stop: Arc<std::sync::atomic::AtomicBool>,
) {
@@ -3868,7 +3872,11 @@ impl ChatComposer {
if stop.load(Ordering::Relaxed) {
break;
}
let text = meter.next_text(last_peak.load(Ordering::Relaxed));
let meter_text = meter.next_text(last_peak.load(Ordering::Relaxed));
let text = match &prefix {
Some(prefix) => format!("{prefix}{meter_text}"),
None => meter_text,
};
tx.send(crate::app_event::AppEvent::UpdateRecordingMeter {
id: id.clone(),
text,
@@ -3946,6 +3954,38 @@ impl ChatComposer {
}
}
#[cfg(target_os = "linux")]
impl ChatComposer {
pub(crate) fn spawn_live_meter_updates(
&self,
id: String,
prefix: Option<String>,
last_peak: Arc<std::sync::atomic::AtomicU16>,
stop: Arc<std::sync::atomic::AtomicBool>,
) {
let tx = self.app_event_tx.clone();
std::thread::spawn(move || {
use std::time::Duration;
let mut meter = crate::voice::RecordingMeterState::new();
loop {
if stop.load(Ordering::Relaxed) {
break;
}
let meter_text = meter.next_text(last_peak.load(Ordering::Relaxed));
let text = match &prefix {
Some(prefix) => format!("{prefix}{meter_text}"),
None => meter_text,
};
tx.send(crate::app_event::AppEvent::UpdateRecordingMeter {
id: id.clone(),
text,
});
std::thread::sleep(Duration::from_millis(100));
}
});
}
}
fn skill_display_name(skill: &SkillMetadata) -> &str {
skill
.interface

View File

@@ -39,6 +39,7 @@ pub(crate) struct CommandPopupFlags {
pub(crate) collaboration_modes_enabled: bool,
pub(crate) connectors_enabled: bool,
pub(crate) personality_command_enabled: bool,
pub(crate) realtime_command_enabled: bool,
pub(crate) windows_degraded_sandbox_active: bool,
}
@@ -49,6 +50,7 @@ impl CommandPopup {
flags.collaboration_modes_enabled,
flags.connectors_enabled,
flags.personality_command_enabled,
flags.realtime_command_enabled,
flags.windows_degraded_sandbox_active,
)
.into_iter()
@@ -495,6 +497,7 @@ mod tests {
collaboration_modes_enabled: true,
connectors_enabled: false,
personality_command_enabled: true,
realtime_command_enabled: false,
windows_degraded_sandbox_active: false,
},
);
@@ -514,6 +517,7 @@ mod tests {
collaboration_modes_enabled: true,
connectors_enabled: false,
personality_command_enabled: true,
realtime_command_enabled: false,
windows_degraded_sandbox_active: false,
},
);
@@ -533,6 +537,7 @@ mod tests {
collaboration_modes_enabled: true,
connectors_enabled: false,
personality_command_enabled: false,
realtime_command_enabled: false,
windows_degraded_sandbox_active: false,
},
);
@@ -560,6 +565,7 @@ mod tests {
collaboration_modes_enabled: true,
connectors_enabled: false,
personality_command_enabled: true,
realtime_command_enabled: false,
windows_degraded_sandbox_active: false,
},
);
@@ -571,6 +577,54 @@ mod tests {
}
}
#[test]
fn realtime_command_hidden_when_disabled() {
let mut popup = CommandPopup::new(
Vec::new(),
CommandPopupFlags {
collaboration_modes_enabled: true,
connectors_enabled: false,
personality_command_enabled: true,
realtime_command_enabled: false,
windows_degraded_sandbox_active: false,
},
);
popup.on_composer_text_change("/rea".to_string());
let cmds: Vec<&str> = popup
.filtered_items()
.into_iter()
.filter_map(|item| match item {
CommandItem::Builtin(cmd) => Some(cmd.command()),
CommandItem::UserPrompt(_) => None,
})
.collect();
assert!(
!cmds.contains(&"realtime"),
"expected '/realtime' to be hidden when disabled, got {cmds:?}"
);
}
#[test]
fn realtime_command_visible_when_enabled() {
let mut popup = CommandPopup::new(
Vec::new(),
CommandPopupFlags {
collaboration_modes_enabled: true,
connectors_enabled: false,
personality_command_enabled: true,
realtime_command_enabled: true,
windows_degraded_sandbox_active: false,
},
);
popup.on_composer_text_change("/realtime".to_string());
match popup.selected_item() {
Some(CommandItem::Builtin(cmd)) => assert_eq!(cmd.command(), "realtime"),
other => panic!("expected realtime to be selected for exact match, got {other:?}"),
}
}
#[test]
fn debug_commands_are_hidden_from_popup() {
let popup = CommandPopup::new(Vec::new(), CommandPopupFlags::default());

View File

@@ -292,11 +292,14 @@ impl BottomPane {
self.request_redraw();
}
pub fn set_realtime_command_enabled(&mut self, enabled: bool) {
self.composer.set_realtime_command_enabled(enabled);
self.request_redraw();
}
pub fn set_voice_transcription_enabled(&mut self, enabled: bool) {
self.composer.set_voice_transcription_enabled(enabled);
self.request_redraw();
}
/// Update the key hint shown next to queued messages so it matches the
/// binding that `ChatWidget` actually listens for.
pub(crate) fn set_queued_message_edit_binding(&mut self, binding: KeyBinding) {
@@ -563,6 +566,17 @@ impl BottomPane {
self.request_redraw();
}
pub(crate) fn spawn_live_meter_updates(
&self,
id: String,
prefix: Option<String>,
last_peak: std::sync::Arc<std::sync::atomic::AtomicU16>,
stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
) {
self.composer
.spawn_live_meter_updates(id, prefix, last_peak, stop);
}
pub(crate) fn set_remote_image_urls(&mut self, urls: Vec<String>) {
self.composer.set_remote_image_urls(urls);
self.request_redraw();
@@ -1005,7 +1019,6 @@ impl BottomPane {
}
}
#[cfg(not(target_os = "linux"))]
impl BottomPane {
pub(crate) fn replace_transcription(&mut self, id: &str, text: &str) {
self.composer.replace_transcription(id, text);

View File

@@ -13,6 +13,7 @@ pub(crate) fn builtins_for_input(
collaboration_modes_enabled: bool,
connectors_enabled: bool,
personality_command_enabled: bool,
realtime_command_enabled: bool,
allow_elevate_sandbox: bool,
) -> Vec<(&'static str, SlashCommand)> {
built_in_slash_commands()
@@ -24,6 +25,7 @@ pub(crate) fn builtins_for_input(
})
.filter(|(_, cmd)| connectors_enabled || *cmd != SlashCommand::Apps)
.filter(|(_, cmd)| personality_command_enabled || *cmd != SlashCommand::Personality)
.filter(|(_, cmd)| realtime_command_enabled || *cmd != SlashCommand::Realtime)
.collect()
}
@@ -33,12 +35,14 @@ pub(crate) fn find_builtin_command(
collaboration_modes_enabled: bool,
connectors_enabled: bool,
personality_command_enabled: bool,
realtime_command_enabled: bool,
allow_elevate_sandbox: bool,
) -> Option<SlashCommand> {
builtins_for_input(
collaboration_modes_enabled,
connectors_enabled,
personality_command_enabled,
realtime_command_enabled,
allow_elevate_sandbox,
)
.into_iter()
@@ -52,12 +56,14 @@ pub(crate) fn has_builtin_prefix(
collaboration_modes_enabled: bool,
connectors_enabled: bool,
personality_command_enabled: bool,
realtime_command_enabled: bool,
allow_elevate_sandbox: bool,
) -> bool {
builtins_for_input(
collaboration_modes_enabled,
connectors_enabled,
personality_command_enabled,
realtime_command_enabled,
allow_elevate_sandbox,
)
.into_iter()
@@ -71,14 +77,14 @@ mod tests {
#[test]
fn debug_command_still_resolves_for_dispatch() {
let cmd = find_builtin_command("debug-config", true, true, true, false);
let cmd = find_builtin_command("debug-config", true, true, true, true, false);
assert_eq!(cmd, Some(SlashCommand::DebugConfig));
}
#[test]
fn clear_command_resolves_for_dispatch() {
assert_eq!(
find_builtin_command("clear", true, true, true, false),
find_builtin_command("clear", true, true, true, true, false),
Some(SlashCommand::Clear)
);
}

View File

@@ -92,6 +92,7 @@ use codex_protocol::protocol::AgentReasoningRawContentEvent;
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
use codex_protocol::protocol::BackgroundEventEvent;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::ConversationStartParams;
use codex_protocol::protocol::CreditsSnapshot;
use codex_protocol::protocol::DeprecationNoticeEvent;
use codex_protocol::protocol::ErrorEvent;
@@ -114,6 +115,10 @@ use codex_protocol::protocol::McpToolCallEndEvent;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::PatchApplyBeginEvent;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::RealtimeConversationClosedEvent;
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
use codex_protocol::protocol::RealtimeConversationStartedEvent;
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::ReviewTarget;
use codex_protocol::protocol::SkillMetadata as ProtocolSkillMetadata;
@@ -163,6 +168,11 @@ const PLAN_MODE_REASONING_SCOPE_TITLE: &str = "Apply reasoning change";
const PLAN_MODE_REASONING_SCOPE_PLAN_ONLY: &str = "Apply to Plan mode override";
const PLAN_MODE_REASONING_SCOPE_ALL_MODES: &str = "Apply to global default and Plan mode override";
const CONNECTORS_SELECTION_VIEW_ID: &str = "connectors-selection";
const REALTIME_CONVERSATION_PROMPT: &str =
"Low-level realtime conversation for TUI audio loop testing.";
const REALTIME_METER_PLACEHOLDER_ID: &str = "realtime-meter";
const REALTIME_LISTENING_PLACEHOLDER_TEXT: &str = "Realtime listening ⠤⠤⠤⠤";
const REALTIME_LISTENING_PREFIX_TEXT: &str = "Realtime listening ";
/// Choose the keybinding used to edit the most-recently queued message.
///
@@ -235,6 +245,8 @@ use crate::key_hint;
use crate::key_hint::KeyBinding;
use crate::markdown::append_markdown;
use crate::multi_agents;
use crate::realtime_audio::RealtimeAudioController;
use crate::realtime_audio::RealtimeMicMeterHandles;
use crate::render::Insets;
use crate::render::renderable::ColumnRenderable;
use crate::render::renderable::FlexRenderable;
@@ -250,6 +262,7 @@ use crate::tui::FrameRequester;
mod interrupts;
use self::interrupts::InterruptManager;
mod agent;
pub(crate) use self::agent::ChatWidgetOpSenders;
use self::agent::spawn_agent;
use self::agent::spawn_agent_from_existing;
pub(crate) use self::agent::spawn_op_forwarder;
@@ -503,6 +516,13 @@ pub(crate) enum ExternalEditorState {
Active,
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
enum RealtimeConversationUiState {
#[default]
Stopped,
Working,
}
/// Maintains the per-session UI state and interaction state machines for the chat screen.
///
/// `ChatWidget` owns the state derived from the protocol event stream (history cells, streaming
@@ -518,6 +538,7 @@ pub(crate) enum ExternalEditorState {
pub(crate) struct ChatWidget {
app_event_tx: AppEventSender,
codex_op_tx: UnboundedSender<Op>,
realtime_audio_op_tx: tokio::sync::mpsc::Sender<Op>,
bottom_pane: BottomPane,
active_cell: Option<Box<dyn HistoryCell>>,
/// Monotonic-ish counter used to invalidate transcript overlay caching.
@@ -665,6 +686,9 @@ pub(crate) struct ChatWidget {
// True once we've attempted a branch lookup for the current CWD.
status_line_branch_lookup_complete: bool,
external_editor_state: ExternalEditorState,
realtime_conversation_state: RealtimeConversationUiState,
realtime_audio_controller: Option<RealtimeAudioController>,
realtime_meter_placeholder_id: Option<String>,
}
/// Snapshot of active-cell state that affects transcript overlay rendering.
@@ -2733,7 +2757,10 @@ impl ChatWidget {
let prevent_idle_sleep = config.features.enabled(Feature::PreventIdleSleep);
let mut rng = rand::rng();
let placeholder = PLACEHOLDERS[rng.random_range(0..PLACEHOLDERS.len())].to_string();
let codex_op_tx = spawn_agent(config.clone(), app_event_tx.clone(), thread_manager);
let ChatWidgetOpSenders {
codex_op_tx,
realtime_audio_op_tx,
} = spawn_agent(config.clone(), app_event_tx.clone(), thread_manager);
let model_override = model.as_deref();
let model_for_header = model
@@ -2765,6 +2792,7 @@ impl ChatWidget {
app_event_tx: app_event_tx.clone(),
frame_requester: frame_requester.clone(),
codex_op_tx,
realtime_audio_op_tx,
bottom_pane: BottomPane::new(BottomPaneParams {
frame_requester,
app_event_tx,
@@ -2846,6 +2874,9 @@ impl ChatWidget {
status_line_branch_pending: false,
status_line_branch_lookup_complete: false,
external_editor_state: ExternalEditorState::Closed,
realtime_conversation_state: RealtimeConversationUiState::Stopped,
realtime_audio_controller: None,
realtime_meter_placeholder_id: None,
};
widget.prefetch_rate_limits();
@@ -2860,6 +2891,7 @@ impl ChatWidget {
.set_status_line_enabled(!widget.configured_status_line_items().is_empty());
widget.bottom_pane.set_collaboration_modes_enabled(true);
widget.sync_personality_command_enabled();
widget.sync_realtime_command_enabled();
widget
.bottom_pane
.set_queued_message_edit_binding(widget.queued_message_edit_binding);
@@ -2880,10 +2912,7 @@ impl ChatWidget {
widget
}
pub(crate) fn new_with_op_sender(
common: ChatWidgetInit,
codex_op_tx: UnboundedSender<Op>,
) -> Self {
pub(crate) fn new_with_op_sender(common: ChatWidgetInit, senders: ChatWidgetOpSenders) -> Self {
let ChatWidgetInit {
config,
frame_requester,
@@ -2905,6 +2934,10 @@ impl ChatWidget {
let prevent_idle_sleep = config.features.enabled(Feature::PreventIdleSleep);
let mut rng = rand::rng();
let placeholder = PLACEHOLDERS[rng.random_range(0..PLACEHOLDERS.len())].to_string();
let ChatWidgetOpSenders {
codex_op_tx,
realtime_audio_op_tx,
} = senders;
let model_override = model.as_deref();
let model_for_header = model
@@ -2936,6 +2969,7 @@ impl ChatWidget {
app_event_tx: app_event_tx.clone(),
frame_requester: frame_requester.clone(),
codex_op_tx,
realtime_audio_op_tx,
bottom_pane: BottomPane::new(BottomPaneParams {
frame_requester,
app_event_tx,
@@ -3017,6 +3051,9 @@ impl ChatWidget {
status_line_branch_pending: false,
status_line_branch_lookup_complete: false,
external_editor_state: ExternalEditorState::Closed,
realtime_conversation_state: RealtimeConversationUiState::Stopped,
realtime_audio_controller: None,
realtime_meter_placeholder_id: None,
};
widget.prefetch_rate_limits();
@@ -3031,6 +3068,7 @@ impl ChatWidget {
.set_status_line_enabled(!widget.configured_status_line_items().is_empty());
widget.bottom_pane.set_collaboration_modes_enabled(true);
widget.sync_personality_command_enabled();
widget.sync_realtime_command_enabled();
widget
.bottom_pane
.set_queued_message_edit_binding(widget.queued_message_edit_binding);
@@ -3076,8 +3114,10 @@ impl ChatWidget {
.unwrap_or(header_model);
let current_cwd = Some(session_configured.cwd.clone());
let codex_op_tx =
spawn_agent_from_existing(conversation, session_configured, app_event_tx.clone());
let ChatWidgetOpSenders {
codex_op_tx,
realtime_audio_op_tx,
} = spawn_agent_from_existing(conversation, session_configured, app_event_tx.clone());
let fallback_default = Settings {
model: header_model.clone(),
@@ -3096,6 +3136,7 @@ impl ChatWidget {
app_event_tx: app_event_tx.clone(),
frame_requester: frame_requester.clone(),
codex_op_tx,
realtime_audio_op_tx,
bottom_pane: BottomPane::new(BottomPaneParams {
frame_requester,
app_event_tx,
@@ -3177,6 +3218,9 @@ impl ChatWidget {
status_line_branch_pending: false,
status_line_branch_lookup_complete: false,
external_editor_state: ExternalEditorState::Closed,
realtime_conversation_state: RealtimeConversationUiState::Stopped,
realtime_audio_controller: None,
realtime_meter_placeholder_id: None,
};
widget.prefetch_rate_limits();
@@ -3191,6 +3235,7 @@ impl ChatWidget {
.set_status_line_enabled(!widget.configured_status_line_items().is_empty());
widget.bottom_pane.set_collaboration_modes_enabled(true);
widget.sync_personality_command_enabled();
widget.sync_realtime_command_enabled();
widget
.bottom_pane
.set_queued_message_edit_binding(widget.queued_message_edit_binding);
@@ -3402,6 +3447,140 @@ impl ChatWidget {
self.bottom_pane.can_launch_external_editor()
}
fn enable_realtime_ui(&mut self) {
self.disable_realtime_ui();
self.set_footer_hint_override(Some(vec![(
"/realtime".to_string(),
"Stop voice chat".to_string(),
)]));
let id = REALTIME_METER_PLACEHOLDER_ID.to_string();
self.replace_transcription(&id, REALTIME_LISTENING_PLACEHOLDER_TEXT);
if let Some(controller) = &self.realtime_audio_controller
&& let Some(RealtimeMicMeterHandles { last_peak, stop }) = controller.meter_handles()
{
self.bottom_pane.spawn_live_meter_updates(
id.clone(),
Some(REALTIME_LISTENING_PREFIX_TEXT.to_string()),
last_peak,
stop,
);
}
self.realtime_meter_placeholder_id = Some(id);
}
fn disable_realtime_ui(&mut self) {
self.set_footer_hint_override(None);
if let Some(id) = self.realtime_meter_placeholder_id.take() {
self.remove_transcription_placeholder(&id);
}
}
fn toggle_realtime_conversation(&mut self) {
if !self.realtime_command_enabled() {
self.add_info_message(
"Realtime conversation is disabled.".to_string(),
Some(
"Enable `[features].realtime_conversation = true` in config.toml.".to_string(),
),
);
return;
}
match self.realtime_conversation_state {
RealtimeConversationUiState::Stopped => {
self.realtime_conversation_state = RealtimeConversationUiState::Working;
self.add_info_message("Starting realtime conversation...".to_string(), None);
self.submit_op(Op::RealtimeConversationStart(ConversationStartParams {
prompt: REALTIME_CONVERSATION_PROMPT.to_string(),
session_id: None,
}));
}
RealtimeConversationUiState::Working => {
self.realtime_conversation_state = RealtimeConversationUiState::Stopped;
if let Some(a) = self.realtime_audio_controller.take() {
RealtimeAudioController::shutdown(a)
}
self.disable_realtime_ui();
self.add_info_message("Stopping realtime conversation...".to_string(), None);
self.submit_op(Op::RealtimeConversationClose);
}
}
}
fn on_realtime_conversation_started(&mut self, event: RealtimeConversationStartedEvent) {
if self.realtime_conversation_state == RealtimeConversationUiState::Stopped {
self.submit_op(Op::RealtimeConversationClose);
return;
}
match RealtimeAudioController::start(self.realtime_audio_op_tx.clone()) {
Ok(controller) => {
self.realtime_audio_controller = Some(controller);
self.realtime_conversation_state = RealtimeConversationUiState::Working;
self.enable_realtime_ui();
let message = match event.session_id {
Some(session_id) => format!("Realtime conversation started ({session_id})."),
None => "Realtime conversation started.".to_string(),
};
self.add_info_message(message, None);
}
Err(err) => {
self.realtime_conversation_state = RealtimeConversationUiState::Stopped;
self.disable_realtime_ui();
self.add_error_message(format!("Failed to start realtime audio IO: {err}"));
self.submit_op(Op::RealtimeConversationClose);
}
}
}
fn on_realtime_conversation_event(&mut self, event: RealtimeConversationRealtimeEvent) {
match event.payload {
RealtimeEvent::AudioOut(frame) => {
if let Some(controller) = &self.realtime_audio_controller
&& let Err(err) = controller.enqueue_audio_out(frame)
{
warn!("failed to queue realtime audio playback frame: {err}");
}
}
RealtimeEvent::Error(message) => {
if let Some(a) = self.realtime_audio_controller.take() {
RealtimeAudioController::shutdown(a)
}
if self.realtime_conversation_state != RealtimeConversationUiState::Stopped {
self.realtime_conversation_state = RealtimeConversationUiState::Stopped;
}
self.disable_realtime_ui();
self.add_error_message(format!("Realtime conversation error: {message}"));
}
RealtimeEvent::SessionCreated { session_id } => {
debug!("realtime conversation session created: {session_id}");
}
RealtimeEvent::SessionUpdated { backend_prompt } => {
debug!(
"realtime conversation session updated: {:?}",
backend_prompt
);
}
RealtimeEvent::ConversationItemAdded(_) => {}
}
}
fn on_realtime_conversation_closed(&mut self, event: RealtimeConversationClosedEvent) {
if let Some(a) = self.realtime_audio_controller.take() {
RealtimeAudioController::shutdown(a)
}
self.realtime_conversation_state = RealtimeConversationUiState::Stopped;
self.disable_realtime_ui();
let message = match event.reason {
Some(reason) => format!("Realtime conversation closed ({reason})."),
None => "Realtime conversation closed.".to_string(),
};
self.add_info_message(message, None);
}
fn dispatch_command(&mut self, cmd: SlashCommand) {
if !cmd.available_during_task() && self.bottom_pane.is_task_running() {
let message = format!(
@@ -3559,6 +3738,9 @@ impl ChatWidget {
SlashCommand::Experimental => {
self.open_experimental_popup();
}
SlashCommand::Realtime => {
self.toggle_realtime_conversation();
}
SlashCommand::Quit | SlashCommand::Exit => {
self.request_quit_without_confirmation();
}
@@ -4302,7 +4484,9 @@ impl ChatWidget {
}
}
EventMsg::UserMessage(ev) => {
if from_replay {
if from_replay
|| self.realtime_conversation_state == RealtimeConversationUiState::Working
{
self.on_user_message_event(ev);
}
}
@@ -4332,14 +4516,26 @@ impl ChatWidget {
});
}
}
EventMsg::RealtimeConversationStarted(ev) => {
if !from_replay {
self.on_realtime_conversation_started(ev);
}
}
EventMsg::RealtimeConversationRealtime(ev) => {
if !from_replay {
self.on_realtime_conversation_event(ev);
}
}
EventMsg::RealtimeConversationClosed(ev) => {
if !from_replay {
self.on_realtime_conversation_closed(ev);
}
}
EventMsg::RawResponseItem(_)
| EventMsg::ItemStarted(_)
| EventMsg::AgentMessageContentDelta(_)
| EventMsg::ReasoningContentDelta(_)
| EventMsg::ReasoningRawContentDelta(_)
| EventMsg::RealtimeConversationStarted(_)
| EventMsg::RealtimeConversationRealtime(_)
| EventMsg::RealtimeConversationClosed(_)
| EventMsg::DynamicToolCallRequest(_) => {}
EventMsg::ItemCompleted(event) => {
let item = event.item;
@@ -6389,6 +6585,19 @@ impl ChatWidget {
if feature == Feature::Personality {
self.sync_personality_command_enabled();
}
if feature == Feature::RealtimeConversation {
self.sync_realtime_command_enabled();
if !enabled {
if self.realtime_conversation_state != RealtimeConversationUiState::Stopped {
self.submit_op(Op::RealtimeConversationClose);
}
if let Some(a) = self.realtime_audio_controller.take() {
RealtimeAudioController::shutdown(a)
}
self.realtime_conversation_state = RealtimeConversationUiState::Stopped;
self.disable_realtime_ui();
}
}
if feature == Feature::PreventIdleSleep {
self.turn_sleep_inhibitor = SleepInhibitor::new(enabled);
self.turn_sleep_inhibitor
@@ -6501,6 +6710,16 @@ impl ChatWidget {
.set_personality_command_enabled(self.config.features.enabled(Feature::Personality));
}
fn sync_realtime_command_enabled(&mut self) {
self.bottom_pane.set_realtime_command_enabled(
self.config.features.enabled(Feature::RealtimeConversation),
);
}
fn realtime_command_enabled(&self) -> bool {
self.config.features.enabled(Feature::RealtimeConversation)
}
fn current_model_supports_personality(&self) -> bool {
let model = self.current_model();
self.models_manager
@@ -7537,7 +7756,6 @@ impl ChatWidget {
}
}
#[cfg(not(target_os = "linux"))]
impl ChatWidget {
pub(crate) fn replace_transcription(&mut self, id: &str, text: &str) {
self.bottom_pane.replace_transcription(id, text);

View File

@@ -7,20 +7,31 @@ use codex_core::config::Config;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::unbounded_channel;
use crate::app_event::AppEvent;
use crate::app_event_sender::AppEventSender;
const REALTIME_MIC_AUDIO_QUEUE_CAPACITY: usize = 64;
pub(crate) struct ChatWidgetOpSenders {
pub(crate) codex_op_tx: UnboundedSender<Op>,
pub(crate) realtime_audio_op_tx: Sender<Op>,
}
/// Spawn the agent bootstrapper and op forwarding loop, returning the
/// `UnboundedSender<Op>` used by the UI to submit operations.
pub(crate) fn spawn_agent(
config: Config,
app_event_tx: AppEventSender,
server: Arc<ThreadManager>,
) -> UnboundedSender<Op> {
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
) -> ChatWidgetOpSenders {
let (codex_op_tx, codex_op_rx, realtime_audio_op_tx, realtime_audio_op_rx) = op_channels();
let app_event_tx_clone = app_event_tx;
tokio::spawn(async move {
@@ -51,15 +62,7 @@ pub(crate) fn spawn_agent(
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
let thread_clone = thread.clone();
tokio::spawn(async move {
while let Some(op) = codex_op_rx.recv().await {
let id = thread_clone.submit(op).await;
if let Err(e) = id {
tracing::error!("failed to submit op: {e}");
}
}
});
spawn_bounded_op_forwarder_threads(thread, codex_op_rx, realtime_audio_op_rx);
while let Ok(event) = thread.next_event().await {
let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
@@ -72,18 +75,21 @@ pub(crate) fn spawn_agent(
}
});
codex_op_tx
ChatWidgetOpSenders {
codex_op_tx,
realtime_audio_op_tx,
}
}
/// Spawn agent loops for an existing thread (e.g., a forked thread).
/// Sends the provided `SessionConfiguredEvent` immediately, then forwards subsequent
/// events and accepts Ops for submission.
pub(crate) fn spawn_agent_from_existing(
thread: std::sync::Arc<CodexThread>,
thread: Arc<CodexThread>,
session_configured: codex_protocol::protocol::SessionConfiguredEvent,
app_event_tx: AppEventSender,
) -> UnboundedSender<Op> {
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
) -> ChatWidgetOpSenders {
let (codex_op_tx, codex_op_rx, realtime_audio_op_tx, realtime_audio_op_rx) = op_channels();
let app_event_tx_clone = app_event_tx;
tokio::spawn(async move {
@@ -94,15 +100,7 @@ pub(crate) fn spawn_agent_from_existing(
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
let thread_clone = thread.clone();
tokio::spawn(async move {
while let Some(op) = codex_op_rx.recv().await {
let id = thread_clone.submit(op).await;
if let Err(e) = id {
tracing::error!("failed to submit op: {e}");
}
}
});
spawn_bounded_op_forwarder_threads(thread, codex_op_rx, realtime_audio_op_rx);
while let Ok(event) = thread.next_event().await {
let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
@@ -115,13 +113,56 @@ pub(crate) fn spawn_agent_from_existing(
}
});
codex_op_tx
ChatWidgetOpSenders {
codex_op_tx,
realtime_audio_op_tx,
}
}
/// Spawn an op-forwarding loop for an existing thread without subscribing to events.
pub(crate) fn spawn_op_forwarder(thread: std::sync::Arc<CodexThread>) -> UnboundedSender<Op> {
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
pub(crate) fn spawn_op_forwarder(thread: Arc<CodexThread>) -> ChatWidgetOpSenders {
let (codex_op_tx, codex_op_rx, realtime_audio_op_tx, realtime_audio_op_rx) = op_channels();
spawn_bounded_op_forwarder_threads(thread, codex_op_rx, realtime_audio_op_rx);
ChatWidgetOpSenders {
codex_op_tx,
realtime_audio_op_tx,
}
}
fn spawn_bounded_op_forwarder(thread: Arc<CodexThread>, mut op_rx: Receiver<Op>) {
tokio::spawn(async move {
while let Some(op) = op_rx.recv().await {
if let Err(e) = thread.submit(op).await {
tracing::error!("failed to submit realtime audio op: {e}");
}
}
});
}
fn op_channels() -> (
UnboundedSender<Op>,
UnboundedReceiver<Op>,
Sender<Op>,
Receiver<Op>,
) {
let (codex_op_tx, codex_op_rx) = unbounded_channel::<Op>();
let (realtime_audio_op_tx, realtime_audio_op_rx) =
channel::<Op>(REALTIME_MIC_AUDIO_QUEUE_CAPACITY);
(
codex_op_tx,
codex_op_rx,
realtime_audio_op_tx,
realtime_audio_op_rx,
)
}
fn spawn_bounded_op_forwarder_threads(
thread: Arc<CodexThread>,
mut codex_op_rx: UnboundedReceiver<Op>,
realtime_audio_op_rx: Receiver<Op>,
) {
spawn_bounded_op_forwarder(thread.clone(), realtime_audio_op_rx);
tokio::spawn(async move {
while let Some(op) = codex_op_rx.recv().await {
if let Err(e) = thread.submit(op).await {
@@ -129,6 +170,4 @@ pub(crate) fn spawn_op_forwarder(thread: std::sync::Arc<CodexThread>) -> Unbound
}
}
});
codex_op_tx
}

View File

@@ -53,6 +53,7 @@ use codex_protocol::protocol::AgentReasoningEvent;
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
use codex_protocol::protocol::BackgroundEventEvent;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::ConversationStartParams;
use codex_protocol::protocol::CreditsSnapshot;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
@@ -73,6 +74,11 @@ use codex_protocol::protocol::PatchApplyBeginEvent;
use codex_protocol::protocol::PatchApplyEndEvent;
use codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus;
use codex_protocol::protocol::RateLimitWindow;
use codex_protocol::protocol::RealtimeAudioFrame;
use codex_protocol::protocol::RealtimeConversationClosedEvent;
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
use codex_protocol::protocol::RealtimeConversationStartedEvent;
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::ReviewTarget;
use codex_protocol::protocol::SessionSource;
@@ -1643,10 +1649,13 @@ async fn make_chatwidget_manual(
},
};
let current_collaboration_mode = base_mode;
let (realtime_audio_op_tx, realtime_audio_op_rx) = tokio::sync::mpsc::channel(1);
drop(realtime_audio_op_rx);
let active_collaboration_mask = collaboration_modes::default_mask(models_manager.as_ref());
let mut widget = ChatWidget {
app_event_tx,
codex_op_tx: op_tx,
realtime_audio_op_tx,
bottom_pane: bottom,
active_cell: None,
active_cell_revision: 0,
@@ -1720,6 +1729,9 @@ async fn make_chatwidget_manual(
status_line_branch_pending: false,
status_line_branch_lookup_complete: false,
external_editor_state: ExternalEditorState::Closed,
realtime_conversation_state: RealtimeConversationUiState::Stopped,
realtime_audio_controller: None,
realtime_meter_placeholder_id: None,
};
widget.set_model(&resolved_model);
(widget, rx, op_rx)
@@ -3379,7 +3391,7 @@ async fn unified_exec_begin_restores_working_status_snapshot() {
#[tokio::test]
async fn steer_enter_queues_while_plan_stream_is_active() {
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await;
let (mut chat, _, mut op_rx) = make_chatwidget_manual(None).await;
chat.thread_id = Some(ThreadId::new());
chat.set_feature_enabled(Feature::CollaborationModes, true);
let plan_mask =
@@ -3404,7 +3416,7 @@ async fn steer_enter_queues_while_plan_stream_is_active() {
#[tokio::test]
async fn steer_enter_queues_while_final_answer_stream_is_active() {
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await;
let (mut chat, _, mut op_rx) = make_chatwidget_manual(None).await;
chat.thread_id = Some(ThreadId::new());
chat.on_task_started();
// Keep the assistant stream open (no commit tick/finalize) to model the repro window:
@@ -3528,7 +3540,7 @@ async fn steer_enter_submits_when_plan_stream_is_not_active() {
#[tokio::test]
async fn ctrl_c_shutdown_works_with_caps_lock() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
let (mut chat, mut rx, _) = make_chatwidget_manual(None).await;
chat.handle_key_event(KeyEvent::new(KeyCode::Char('C'), KeyModifiers::CONTROL));
@@ -4616,6 +4628,196 @@ async fn slash_quit_requests_exit() {
assert_matches!(rx.try_recv(), Ok(AppEvent::Exit(ExitMode::ShutdownFirst)));
}
#[tokio::test]
async fn slash_realtime_shows_error_when_feature_disabled() {
let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(None).await;
chat.set_feature_enabled(Feature::RealtimeConversation, false);
chat.dispatch_command(SlashCommand::Realtime);
match op_rx.try_recv() {
Err(TryRecvError::Empty) => {}
other => panic!("expected no realtime op when feature is disabled, got {other:?}"),
}
let rendered = drain_insert_history(&mut rx)
.iter()
.map(|lines| lines_to_single_string(lines))
.collect::<Vec<_>>()
.join("\n");
assert!(
rendered.contains("Realtime conversation is disabled."),
"expected disabled message, got {rendered:?}"
);
assert!(
rendered.contains("realtime_conversation = true"),
"expected config hint, got {rendered:?}"
);
}
#[tokio::test]
async fn slash_realtime_toggle_sends_start_then_close_ops() {
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await;
chat.set_feature_enabled(Feature::RealtimeConversation, true);
chat.dispatch_command(SlashCommand::Realtime);
let start = match op_rx.try_recv() {
Ok(Op::RealtimeConversationStart(params)) => params,
other => panic!("expected realtime start op, got {other:?}"),
};
assert_eq!(
start,
ConversationStartParams {
prompt: REALTIME_CONVERSATION_PROMPT.to_string(),
session_id: None,
}
);
chat.dispatch_command(SlashCommand::Realtime);
assert_matches!(op_rx.try_recv(), Ok(Op::RealtimeConversationClose));
}
#[tokio::test]
async fn disabling_realtime_feature_closes_active_session() {
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await;
chat.set_feature_enabled(Feature::RealtimeConversation, true);
chat.dispatch_command(SlashCommand::Realtime);
assert_matches!(
op_rx.try_recv(),
Ok(Op::RealtimeConversationStart(
ConversationStartParams { .. }
))
);
assert_eq!(
chat.realtime_conversation_state,
RealtimeConversationUiState::Working
);
chat.set_feature_enabled(Feature::RealtimeConversation, false);
assert_eq!(
chat.realtime_conversation_state,
RealtimeConversationUiState::Stopped
);
assert_matches!(op_rx.try_recv(), Ok(Op::RealtimeConversationClose));
}
#[tokio::test]
async fn realtime_events_update_lifecycle_state() {
let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(None).await;
chat.set_feature_enabled(Feature::RealtimeConversation, true);
chat.dispatch_command(SlashCommand::Realtime);
assert_matches!(
op_rx.try_recv(),
Ok(Op::RealtimeConversationStart(
ConversationStartParams { .. }
))
);
assert_eq!(
chat.realtime_conversation_state,
RealtimeConversationUiState::Working
);
chat.handle_codex_event(Event {
id: "rt-started".into(),
msg: EventMsg::RealtimeConversationStarted(RealtimeConversationStartedEvent {
session_id: Some("sess-123".to_string()),
}),
});
assert_eq!(
chat.realtime_conversation_state,
RealtimeConversationUiState::Working
);
chat.handle_codex_event(Event {
id: "rt-audio".into(),
msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::AudioOut(RealtimeAudioFrame {
data: String::new(),
sample_rate: 24_000,
num_channels: 1,
samples_per_channel: Some(0),
}),
}),
});
assert_eq!(
chat.realtime_conversation_state,
RealtimeConversationUiState::Working
);
chat.handle_codex_event(Event {
id: "rt-closed".into(),
msg: EventMsg::RealtimeConversationClosed(RealtimeConversationClosedEvent {
reason: Some("requested".to_string()),
}),
});
assert_eq!(
chat.realtime_conversation_state,
RealtimeConversationUiState::Stopped
);
let rendered = drain_insert_history(&mut rx)
.iter()
.map(|lines| lines_to_single_string(lines))
.collect::<Vec<_>>()
.join("\n");
assert!(
rendered.contains("Starting realtime conversation..."),
"expected start status message, got {rendered:?}"
);
assert!(
rendered.contains("Realtime conversation started (sess-123)."),
"expected started status message, got {rendered:?}"
);
assert!(
rendered.contains("Realtime conversation closed (requested)."),
"expected closed status message, got {rendered:?}"
);
}
#[tokio::test]
async fn live_user_message_renders_only_during_realtime_mode() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
let user_message_event = || {
EventMsg::UserMessage(UserMessageEvent {
message: "delegated user text".to_string(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
})
};
chat.handle_codex_event(Event {
id: "live-user-normal".into(),
msg: user_message_event(),
});
assert!(
drain_insert_history(&mut rx).is_empty(),
"expected live user message to remain hidden outside realtime mode"
);
chat.realtime_conversation_state = RealtimeConversationUiState::Working;
chat.handle_codex_event(Event {
id: "live-user-realtime".into(),
msg: user_message_event(),
});
let rendered = drain_insert_history(&mut rx)
.iter()
.map(|lines| lines_to_single_string(lines))
.collect::<Vec<_>>()
.join("\n");
assert!(
rendered.contains("delegated user text"),
"expected live user message in realtime mode, got {rendered:?}"
);
}
#[tokio::test]
async fn slash_exit_requests_exit() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;

View File

@@ -93,6 +93,7 @@ pub mod onboarding;
mod oss_selection;
mod pager_overlay;
pub mod public_widgets;
mod realtime_audio;
mod render;
mod resume_picker;
mod selection_list;
@@ -116,7 +117,10 @@ mod updates;
mod version;
#[cfg(all(not(target_os = "linux"), feature = "voice-input"))]
mod voice;
#[cfg(all(not(target_os = "linux"), not(feature = "voice-input")))]
#[cfg(any(
target_os = "linux",
all(not(target_os = "linux"), not(feature = "voice-input"))
))]
mod voice {
use crate::app_event::AppEvent;
use crate::app_event_sender::AppEventSender;

View File

@@ -0,0 +1,569 @@
use anyhow::Context;
use anyhow::Result;
use base64::Engine;
use codex_protocol::protocol::ConversationAudioParams;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RealtimeAudioFrame;
#[cfg(not(target_os = "linux"))]
use cpal::traits::DeviceTrait;
#[cfg(not(target_os = "linux"))]
use cpal::traits::HostTrait;
#[cfg(not(target_os = "linux"))]
use cpal::traits::StreamTrait;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU16;
use std::sync::atomic::Ordering;
use std::thread;
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::error::TrySendError;
use tracing::warn;
const TARGET_SAMPLE_RATE: u32 = 24_000;
const TARGET_NUM_CHANNELS: u16 = 1;
const TARGET_SAMPLES_PER_CHANNEL: u32 = 480;
const PLAYBACK_BUFFER_SECONDS: usize = 5;
const MAX_AUDIO_FRAME_DECODED_BYTES: usize = 128 * 1024;
const MAX_AUDIO_FRAME_ENCODED_BYTES: usize = 192 * 1024;
const MIN_REALTIME_PLAYBACK_SAMPLE_RATE: u32 = 8_000;
const MAX_REALTIME_PLAYBACK_SAMPLE_RATE: u32 = 192_000;
const MAX_RESAMPLED_MONO_SAMPLES_PER_FRAME: usize = 480_000;
pub(crate) struct RealtimeAudioController {
backend: RealtimeAudioBackend,
}
pub(crate) struct RealtimeMicMeterHandles {
pub(crate) last_peak: Arc<AtomicU16>,
pub(crate) stop: Arc<AtomicBool>,
}
enum RealtimeAudioBackend {
#[cfg(not(target_os = "linux"))]
Live(LiveRealtimeAudioController),
Stub,
}
struct LiveRealtimeAudioController {
input_capture: crate::voice::VoiceCapture,
input_capture_thread: Option<thread::JoinHandle<()>>,
#[cfg(not(target_os = "linux"))]
output_stream: cpal::Stream,
playback_state: Arc<Mutex<PlaybackState>>,
last_input_peak: Arc<AtomicU16>,
meter_stop: Arc<AtomicBool>,
}
impl RealtimeAudioController {
pub(crate) fn start(realtime_audio_op_tx: Sender<Op>) -> Result<Self> {
if cfg!(test) {
drop(realtime_audio_op_tx);
return Ok(Self {
backend: RealtimeAudioBackend::Stub,
});
}
#[cfg(target_os = "linux")]
{
drop(realtime_audio_op_tx);
return Ok(Self {
backend: RealtimeAudioBackend::Stub,
});
}
#[cfg(not(target_os = "linux"))]
let host = cpal::default_host();
#[cfg(not(target_os = "linux"))]
let output_device = host
.default_output_device()
.context("no default output device available")?;
#[cfg(not(target_os = "linux"))]
let output_supported = output_device
.default_output_config()
.context("failed to query default output config")?;
#[cfg(not(target_os = "linux"))]
let output_config = output_supported.config();
// TODO(aibrahim): Add persisted audio device + sample-rate selection/config for TUI
// realtime conversations instead of always using defaults.
#[cfg(not(target_os = "linux"))]
let input_capture = crate::voice::VoiceCapture::start().map_err(anyhow::Error::msg)?;
#[cfg(not(target_os = "linux"))]
let source_sample_rate = input_capture.sample_rate();
#[cfg(not(target_os = "linux"))]
let source_channels = input_capture.channels();
#[cfg(not(target_os = "linux"))]
if source_sample_rate == 0 || source_channels == 0 {
return Err(anyhow::anyhow!(
"unsupported realtime microphone format from VoiceCapture"
));
}
#[cfg(not(target_os = "linux"))]
let source_data = input_capture.data_arc();
#[cfg(not(target_os = "linux"))]
let last_input_peak = input_capture.last_peak_arc();
#[cfg(not(target_os = "linux"))]
let meter_stop = input_capture.stopped_flag();
#[cfg(not(target_os = "linux"))]
let mic_state = Arc::new(Mutex::new(MicCaptureState::new(
realtime_audio_op_tx,
source_sample_rate,
source_channels,
)));
#[cfg(not(target_os = "linux"))]
let source_data_thread = source_data;
#[cfg(not(target_os = "linux"))]
let input_state = Arc::clone(&mic_state);
#[cfg(not(target_os = "linux"))]
let input_capture_thread = thread::spawn({
let stop = Arc::clone(&meter_stop);
move || {
while !stop.load(Ordering::Relaxed) {
let samples = match source_data_thread.lock() {
Ok(mut data) => {
if data.is_empty() {
None
} else {
Some(std::mem::take(&mut *data))
}
}
Err(_) => {
warn!("failed to lock realtime microphone buffer");
None
}
};
if let Some(samples) = samples {
if let Ok(mut state) = input_state.lock() {
state.push_input_samples_i16(&samples);
} else {
warn!("failed to lock realtime microphone processing state");
}
}
thread::sleep(Duration::from_millis(5));
}
}
});
#[cfg(not(target_os = "linux"))]
let playback_state = Arc::new(Mutex::new(PlaybackState::new(
output_config.sample_rate.0,
output_config.channels,
)));
#[cfg(not(target_os = "linux"))]
let output_stream = build_output_stream(
&output_device,
output_supported.sample_format(),
&output_config,
Arc::clone(&playback_state),
)
.context("failed to open speaker output stream")?;
#[cfg(not(target_os = "linux"))]
output_stream
.play()
.context("failed to start speaker output stream")?;
#[cfg(not(target_os = "linux"))]
Ok(Self {
backend: RealtimeAudioBackend::Live(LiveRealtimeAudioController {
input_capture,
input_capture_thread: Some(input_capture_thread),
output_stream,
playback_state,
last_input_peak,
meter_stop,
}),
})
}
pub(crate) fn enqueue_audio_out(&self, frame: RealtimeAudioFrame) -> Result<()> {
match &self.backend {
#[cfg(not(target_os = "linux"))]
RealtimeAudioBackend::Live(controller) => {
let mut state = controller
.playback_state
.lock()
.map_err(|_| anyhow::anyhow!("playback state lock poisoned"))?;
state.enqueue(frame)?;
}
RealtimeAudioBackend::Stub => {
return Ok(());
}
}
Ok(())
}
pub(crate) fn shutdown(self) {
#[cfg(not(target_os = "linux"))]
if let RealtimeAudioBackend::Live(controller) = self.backend {
#[cfg(not(target_os = "linux"))]
if let Err(err) = controller.input_capture.stop().map(|_| ()) {
warn!("failed to stop realtime microphone capture: {err}");
}
#[cfg(not(target_os = "linux"))]
if let Some(handle) = controller.input_capture_thread {
if let Err(err) = handle.join() {
warn!("failed to join realtime microphone input thread: {err:?}");
}
}
}
}
pub(crate) fn meter_handles(&self) -> Option<RealtimeMicMeterHandles> {
match &self.backend {
#[cfg(not(target_os = "linux"))]
RealtimeAudioBackend::Live(controller) => Some(RealtimeMicMeterHandles {
last_peak: Arc::clone(&controller.last_input_peak),
stop: Arc::clone(&controller.meter_stop),
}),
RealtimeAudioBackend::Stub => None,
}
}
}
#[derive(Debug)]
struct MicCaptureState {
realtime_audio_op_tx: Sender<Op>,
source_sample_rate: u32,
source_channels: u16,
source_mono: Vec<f32>,
source_position: f64,
resampled: Vec<f32>,
}
impl MicCaptureState {
fn new(
realtime_audio_op_tx: Sender<Op>,
source_sample_rate: u32,
source_channels: u16,
) -> Self {
Self {
realtime_audio_op_tx,
source_sample_rate,
source_channels,
source_mono: Vec::new(),
source_position: 0.0,
resampled: Vec::new(),
}
}
fn push_input_samples_i16(&mut self, data: &[i16]) {
self.push_mono_samples_from_frames(
data.iter().map(|sample| *sample as f32 / i16::MAX as f32),
);
}
fn push_mono_samples_from_frames<I>(&mut self, mut samples: I)
where
I: Iterator<Item = f32>,
{
let channels = usize::from(self.source_channels.max(1));
loop {
let mut sum = 0.0f32;
let mut count = 0usize;
for _ in 0..channels {
let Some(sample) = samples.next() else {
self.process_and_send_ready_frames();
return;
};
sum += sample;
count += 1;
}
self.source_mono.push(sum / count as f32);
}
}
fn process_and_send_ready_frames(&mut self) {
if self.source_mono.is_empty() {
return;
}
let step = self.source_sample_rate as f64 / TARGET_SAMPLE_RATE as f64;
while self.source_position + 1.0 < self.source_mono.len() as f64 {
let idx = self.source_position.floor() as usize;
let frac = (self.source_position - idx as f64) as f32;
let a = self.source_mono[idx];
let b = self.source_mono[idx + 1];
self.resampled.push(a + (b - a) * frac);
self.source_position += step;
}
let consumed = self.source_position.floor() as usize;
if consumed > 0 {
self.source_mono.drain(..consumed);
self.source_position -= consumed as f64;
}
let chunk_len = TARGET_SAMPLES_PER_CHANNEL as usize;
while self.resampled.len() >= chunk_len {
let samples: Vec<f32> = self.resampled.drain(..chunk_len).collect();
let data = encode_pcm16_le_base64(&samples);
let op = Op::RealtimeConversationAudio(ConversationAudioParams {
frame: RealtimeAudioFrame {
data,
sample_rate: TARGET_SAMPLE_RATE,
num_channels: TARGET_NUM_CHANNELS,
samples_per_channel: Some(TARGET_SAMPLES_PER_CHANNEL),
},
});
match self.realtime_audio_op_tx.try_send(op) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
warn!("dropping realtime microphone frame due to full TUI audio queue");
}
Err(TrySendError::Closed(_)) => {
warn!("failed to send realtime microphone frame: channel closed");
break;
}
}
}
}
}
#[derive(Debug)]
struct PlaybackState {
output_sample_rate: u32,
output_channels: u16,
queue: VecDeque<f32>,
max_queue_samples: usize,
}
impl PlaybackState {
fn new(output_sample_rate: u32, output_channels: u16) -> Self {
let max_queue_samples =
output_sample_rate as usize * usize::from(output_channels) * PLAYBACK_BUFFER_SECONDS;
Self {
output_sample_rate,
output_channels,
queue: VecDeque::new(),
max_queue_samples,
}
}
fn enqueue(&mut self, frame: RealtimeAudioFrame) -> Result<()> {
if frame.num_channels == 0 {
return Ok(());
}
if frame.data.len() > MAX_AUDIO_FRAME_ENCODED_BYTES {
warn!(
encoded_len = frame.data.len(),
"dropping oversized realtime audio frame before base64 decode"
);
return Ok(());
}
let decoded = base64::engine::general_purpose::STANDARD
.decode(frame.data.as_bytes())
.context("failed to decode realtime audio base64")?;
if decoded.len() > MAX_AUDIO_FRAME_DECODED_BYTES {
warn!(
decoded_len = decoded.len(),
"dropping oversized realtime audio frame after base64 decode"
);
return Ok(());
}
if decoded.len() % 2 != 0 {
return Err(anyhow::anyhow!(
"realtime audio payload has odd byte length"
));
}
let pcm: Vec<i16> = decoded
.chunks_exact(2)
.map(|chunk| i16::from_le_bytes([chunk[0], chunk[1]]))
.collect();
if pcm.is_empty() {
return Ok(());
}
let mono = interleaved_i16_to_mono_f32(&pcm, frame.num_channels);
if frame.sample_rate == 0 {
warn!("dropping realtime audio frame with zero sample rate");
return Ok(());
}
if !is_supported_realtime_playback_sample_rate(frame.sample_rate) {
warn!(
sample_rate = frame.sample_rate,
min_sample_rate = MIN_REALTIME_PLAYBACK_SAMPLE_RATE,
max_sample_rate = MAX_REALTIME_PLAYBACK_SAMPLE_RATE,
"dropping realtime audio frame with unsupported sample rate"
);
return Ok(());
}
let Some(resampled) =
resample_linear_mono(&mono, frame.sample_rate, self.output_sample_rate.max(1))
else {
warn!(
src_rate = frame.sample_rate,
dst_rate = self.output_sample_rate.max(1),
max_samples = MAX_RESAMPLED_MONO_SAMPLES_PER_FRAME,
"dropping realtime audio frame due to oversized resampled output"
);
return Ok(());
};
for sample in resampled {
for _ in 0..self.output_channels {
self.queue.push_back(sample);
}
}
if self.queue.len() > self.max_queue_samples {
let drop_count = self.queue.len() - self.max_queue_samples;
self.queue.drain(..drop_count);
warn!("dropping old playback samples due to realtime audio buffer overflow");
}
Ok(())
}
fn next_sample(&mut self) -> f32 {
self.queue.pop_front().unwrap_or(0.0)
}
}
fn interleaved_i16_to_mono_f32(samples: &[i16], num_channels: u16) -> Vec<f32> {
let channels = usize::from(num_channels.max(1));
let mut mono = Vec::with_capacity(samples.len() / channels.max(1));
for frame in samples.chunks(channels) {
let sum: f32 = frame
.iter()
.map(|sample| *sample as f32 / i16::MAX as f32)
.sum();
mono.push(sum / frame.len() as f32);
}
mono
}
fn is_supported_realtime_playback_sample_rate(sample_rate: u32) -> bool {
(MIN_REALTIME_PLAYBACK_SAMPLE_RATE..=MAX_REALTIME_PLAYBACK_SAMPLE_RATE).contains(&sample_rate)
}
fn resample_linear_mono(input: &[f32], src_rate: u32, dst_rate: u32) -> Option<Vec<f32>> {
if input.is_empty() || src_rate == 0 || dst_rate == 0 {
return Some(Vec::new());
}
if src_rate == dst_rate || input.len() == 1 {
return Some(input.to_vec());
}
let out_len = ((input.len() as u64 * dst_rate as u64) / src_rate as u64).max(1);
if out_len > MAX_RESAMPLED_MONO_SAMPLES_PER_FRAME as u64 {
return None;
}
let out_len = usize::try_from(out_len).ok()?;
let step = src_rate as f64 / dst_rate as f64;
let mut pos = 0.0f64;
let mut out = Vec::with_capacity(out_len);
for _ in 0..out_len {
let idx = pos.floor() as usize;
if idx + 1 >= input.len() {
out.push(*input.last().unwrap_or(&0.0));
} else {
let frac = (pos - idx as f64) as f32;
let a = input[idx];
let b = input[idx + 1];
out.push(a + (b - a) * frac);
}
pos += step;
}
Some(out)
}
fn encode_pcm16_le_base64(samples: &[f32]) -> String {
let mut bytes = Vec::with_capacity(samples.len() * 2);
for sample in samples {
let clamped = sample.clamp(-1.0, 1.0);
let scaled = (clamped * i16::MAX as f32).round() as i16;
bytes.extend_from_slice(&scaled.to_le_bytes());
}
base64::engine::general_purpose::STANDARD.encode(bytes)
}
#[cfg(not(target_os = "linux"))]
fn build_output_stream(
device: &cpal::Device,
sample_format: cpal::SampleFormat,
config: &cpal::StreamConfig,
playback_state: Arc<Mutex<PlaybackState>>,
) -> Result<cpal::Stream> {
let err_fn = |err| warn!("realtime speaker stream error: {err}");
let stream = match sample_format {
cpal::SampleFormat::F32 => device.build_output_stream(
config,
move |data: &mut [f32], _| write_output_f32(data, &playback_state),
err_fn,
None,
)?,
cpal::SampleFormat::I16 => device.build_output_stream(
config,
move |data: &mut [i16], _| write_output_i16(data, &playback_state),
err_fn,
None,
)?,
cpal::SampleFormat::U16 => device.build_output_stream(
config,
move |data: &mut [u16], _| write_output_u16(data, &playback_state),
err_fn,
None,
)?,
other => {
return Err(anyhow::anyhow!(
"unsupported speaker sample format: {other:?}"
));
}
};
Ok(stream)
}
#[cfg(not(target_os = "linux"))]
fn write_output_f32(data: &mut [f32], playback_state: &Arc<Mutex<PlaybackState>>) {
fill_output_buffer(data, playback_state, |sample| sample);
}
#[cfg(not(target_os = "linux"))]
fn write_output_i16(data: &mut [i16], playback_state: &Arc<Mutex<PlaybackState>>) {
fill_output_buffer(data, playback_state, |sample| {
(sample.clamp(-1.0, 1.0) * i16::MAX as f32).round() as i16
});
}
#[cfg(not(target_os = "linux"))]
fn write_output_u16(data: &mut [u16], playback_state: &Arc<Mutex<PlaybackState>>) {
fill_output_buffer(data, playback_state, |sample| {
let normalized = (sample.clamp(-1.0, 1.0) + 1.0) * 0.5;
(normalized * u16::MAX as f32).round() as u16
});
}
#[cfg(not(target_os = "linux"))]
fn fill_output_buffer<T>(
data: &mut [T],
playback_state: &Arc<Mutex<PlaybackState>>,
mut convert: impl FnMut(f32) -> T,
) {
let mut maybe_state = playback_state.lock().ok();
for slot in data.iter_mut() {
let sample = maybe_state
.as_mut()
.map_or(0.0, |state| state.next_sample());
*slot = convert(sample);
}
}
#[cfg(test)]
mod tests {
use super::MAX_RESAMPLED_MONO_SAMPLES_PER_FRAME;
use super::is_supported_realtime_playback_sample_rate;
use super::resample_linear_mono;
use pretty_assertions::assert_eq;
#[test]
fn resample_linear_passthrough_when_same_rate() {
let input = vec![0.1, -0.2, 0.3];
assert_eq!(resample_linear_mono(&input, 24_000, 24_000), Some(input));
}
#[test]
fn playback_sample_rate_validation_rejects_unrealistic_rates() {
assert!(!is_supported_realtime_playback_sample_rate(1));
assert!(!is_supported_realtime_playback_sample_rate(7_999));
assert!(is_supported_realtime_playback_sample_rate(8_000));
assert!(is_supported_realtime_playback_sample_rate(24_000));
assert!(is_supported_realtime_playback_sample_rate(192_000));
assert!(!is_supported_realtime_playback_sample_rate(192_001));
}
#[test]
fn resample_linear_returns_none_when_output_would_exceed_cap() {
let input = vec![0.0; (MAX_RESAMPLED_MONO_SAMPLES_PER_FRAME / 2) + 1];
assert_eq!(resample_linear_mono(&input, 1, 2), None);
}
}

View File

@@ -20,6 +20,7 @@ pub enum SlashCommand {
#[strum(serialize = "sandbox-add-read-dir")]
SandboxReadRoot,
Experimental,
Realtime,
Skills,
Review,
Rename,
@@ -79,6 +80,7 @@ impl SlashCommand {
SlashCommand::DebugConfig => "show config layers and requirement sources for debugging",
SlashCommand::Statusline => "configure which items appear in the status line",
SlashCommand::Theme => "choose a syntax highlighting theme",
SlashCommand::Realtime => "toggle hidden low-level realtime mic/speaker loop",
SlashCommand::Ps => "list background terminals",
SlashCommand::Clean => "stop all background terminals",
SlashCommand::MemoryDrop => "DO NOT USE",
@@ -155,6 +157,7 @@ impl SlashCommand {
| SlashCommand::Feedback
| SlashCommand::Quit
| SlashCommand::Exit => true,
SlashCommand::Realtime => true,
SlashCommand::Rollout => true,
SlashCommand::TestApproval => true,
SlashCommand::Collab => true,