mirror of
https://github.com/openai/codex.git
synced 2026-05-21 19:45:26 +00:00
Compare commits
14 Commits
dev/winsto
...
aibrahim/h
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6afe612b70 | ||
|
|
2648a7c52b | ||
|
|
60dc38b962 | ||
|
|
21e634da6b | ||
|
|
5f8024e9e4 | ||
|
|
f7c740fff1 | ||
|
|
a7a812e7bf | ||
|
|
7f126df60e | ||
|
|
128a7d2ae5 | ||
|
|
5498a8a3e3 | ||
|
|
19d9d7c6a5 | ||
|
|
091fa0b980 | ||
|
|
968f3fc9de | ||
|
|
7e0bcc0812 |
256
MODULE.bazel.lock
generated
256
MODULE.bazel.lock
generated
File diff suppressed because one or more lines are too long
677
codex-rs/Cargo.lock
generated
677
codex-rs/Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -163,6 +163,7 @@ clap_complete = "4"
|
||||
color-eyre = "0.6.3"
|
||||
crossbeam-channel = "0.5.15"
|
||||
crossterm = "0.28.1"
|
||||
cpal = "0.15.3"
|
||||
ctor = "0.6.3"
|
||||
derive_more = "2"
|
||||
diffy = "0.4.2"
|
||||
|
||||
@@ -367,6 +367,9 @@
|
||||
"prevent_idle_sleep": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"realtime_conversation": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"remote_models": {
|
||||
"type": "boolean"
|
||||
},
|
||||
@@ -1651,6 +1654,9 @@
|
||||
"prevent_idle_sleep": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"realtime_conversation": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"remote_models": {
|
||||
"type": "boolean"
|
||||
},
|
||||
|
||||
@@ -146,6 +146,8 @@ pub enum Feature {
|
||||
ResponsesWebsockets,
|
||||
/// Enable Responses API websocket v2 mode.
|
||||
ResponsesWebsocketsV2,
|
||||
/// Enable hidden TUI `/realtime` low-level realtime conversation controls.
|
||||
RealtimeConversation,
|
||||
}
|
||||
|
||||
impl Feature {
|
||||
@@ -661,6 +663,12 @@ pub const FEATURES: &[FeatureSpec] = &[
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::RealtimeConversation,
|
||||
key: "realtime_conversation",
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: false,
|
||||
},
|
||||
];
|
||||
|
||||
/// Push a warning event if any under-development features are enabled.
|
||||
|
||||
@@ -254,26 +254,31 @@ pub(crate) async fn handle_audio(
|
||||
|
||||
fn realtime_text_from_conversation_item(item: &Value) -> Option<String> {
|
||||
match item.get("type").and_then(Value::as_str) {
|
||||
Some("message") => {
|
||||
if item.get("role").and_then(Value::as_str) != Some("assistant") {
|
||||
return None;
|
||||
}
|
||||
let content = item.get("content")?.as_array()?;
|
||||
let text = content
|
||||
.iter()
|
||||
.filter(|entry| entry.get("type").and_then(Value::as_str) == Some("text"))
|
||||
.filter_map(|entry| entry.get("text").and_then(Value::as_str))
|
||||
.collect::<String>();
|
||||
if text.is_empty() { None } else { Some(text) }
|
||||
}
|
||||
Some("spawn_transcript") => item
|
||||
.get("delta_user_transcript")
|
||||
.and_then(Value::as_str)
|
||||
.and_then(|text| (!text.is_empty()).then(|| text.to_string())),
|
||||
Some("message") => extract_assistant_message_text(item),
|
||||
Some("spawn_transcript") => extract_spawn_transcript_delta(item),
|
||||
Some(_) | None => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_assistant_message_text(item: &Value) -> Option<String> {
|
||||
if item.get("role").and_then(Value::as_str) != Some("assistant") {
|
||||
return None;
|
||||
}
|
||||
let content = item.get("content")?.as_array()?;
|
||||
let text = content
|
||||
.iter()
|
||||
.filter(|entry| entry.get("type").and_then(Value::as_str) == Some("text"))
|
||||
.filter_map(|entry| entry.get("text").and_then(Value::as_str))
|
||||
.collect::<String>();
|
||||
if text.is_empty() { None } else { Some(text) }
|
||||
}
|
||||
|
||||
fn extract_spawn_transcript_delta(item: &Value) -> Option<String> {
|
||||
item.get("delta_user_transcript")
|
||||
.and_then(Value::as_str)
|
||||
.and_then(|text| (!text.is_empty()).then(|| text.to_string()))
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_text(
|
||||
sess: &Arc<Session>,
|
||||
sub_id: String,
|
||||
|
||||
@@ -1002,8 +1002,13 @@ impl App {
|
||||
let codex_op_tx = if let Some(thread) = live_thread {
|
||||
crate::chatwidget::spawn_op_forwarder(thread)
|
||||
} else {
|
||||
let (tx, _rx) = unbounded_channel();
|
||||
tx
|
||||
let (codex_op_tx, codex_op_rx) = unbounded_channel();
|
||||
let (realtime_audio_op_tx, realtime_audio_op_rx) = tokio::sync::mpsc::channel(1);
|
||||
drop((codex_op_rx, realtime_audio_op_rx));
|
||||
crate::chatwidget::ChatWidgetOpSenders {
|
||||
codex_op_tx,
|
||||
realtime_audio_op_tx,
|
||||
}
|
||||
};
|
||||
self.chat_widget = ChatWidget::new_with_op_sender(init, codex_op_tx);
|
||||
|
||||
@@ -2713,7 +2718,6 @@ impl App {
|
||||
AppEvent::TranscriptionFailed { id, error: _ } => {
|
||||
self.chat_widget.remove_transcription_placeholder(&id);
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
AppEvent::UpdateRecordingMeter { id, text } => {
|
||||
// Update in place to preserve the element id for subsequent frames.
|
||||
let updated = self.chat_widget.update_transcription_in_place(&id, &text);
|
||||
|
||||
@@ -321,9 +321,9 @@ pub(crate) enum AppEvent {
|
||||
/// Re-open the permissions presets popup.
|
||||
OpenPermissionsPopup,
|
||||
|
||||
/// Live update for the in-progress voice recording placeholder. Carries
|
||||
/// the placeholder `id` and the text to display (e.g., an ASCII meter).
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
/// Live update for an in-progress named composer placeholder (used by
|
||||
/// voice recording/transcription and realtime mic metering). Carries the
|
||||
/// placeholder `id` and the text to display.
|
||||
UpdateRecordingMeter {
|
||||
id: String,
|
||||
text: String,
|
||||
|
||||
@@ -215,8 +215,6 @@ use std::collections::VecDeque;
|
||||
use std::ops::Range;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
use std::sync::Mutex;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
@@ -389,6 +387,7 @@ pub(crate) struct ChatComposer {
|
||||
collaboration_mode_indicator: Option<CollaborationModeIndicator>,
|
||||
connectors_enabled: bool,
|
||||
personality_command_enabled: bool,
|
||||
realtime_command_enabled: bool,
|
||||
windows_degraded_sandbox_active: bool,
|
||||
status_line_value: Option<Line<'static>>,
|
||||
status_line_enabled: bool,
|
||||
@@ -494,6 +493,7 @@ impl ChatComposer {
|
||||
collaboration_mode_indicator: None,
|
||||
connectors_enabled: false,
|
||||
personality_command_enabled: false,
|
||||
realtime_command_enabled: false,
|
||||
windows_degraded_sandbox_active: false,
|
||||
status_line_value: None,
|
||||
status_line_enabled: false,
|
||||
@@ -577,7 +577,9 @@ impl ChatComposer {
|
||||
pub fn set_personality_command_enabled(&mut self, enabled: bool) {
|
||||
self.personality_command_enabled = enabled;
|
||||
}
|
||||
|
||||
pub fn set_realtime_command_enabled(&mut self, enabled: bool) {
|
||||
self.realtime_command_enabled = enabled;
|
||||
}
|
||||
pub fn set_voice_transcription_enabled(&mut self, enabled: bool) {
|
||||
self.voice_state.transcription_enabled = enabled;
|
||||
if !enabled {
|
||||
@@ -2245,6 +2247,7 @@ impl ChatComposer {
|
||||
self.collaboration_modes_enabled,
|
||||
self.connectors_enabled,
|
||||
self.personality_command_enabled,
|
||||
self.realtime_command_enabled,
|
||||
self.windows_degraded_sandbox_active,
|
||||
)
|
||||
.is_some();
|
||||
@@ -2444,6 +2447,7 @@ impl ChatComposer {
|
||||
self.collaboration_modes_enabled,
|
||||
self.connectors_enabled,
|
||||
self.personality_command_enabled,
|
||||
self.realtime_command_enabled,
|
||||
self.windows_degraded_sandbox_active,
|
||||
)
|
||||
{
|
||||
@@ -2478,6 +2482,7 @@ impl ChatComposer {
|
||||
self.collaboration_modes_enabled,
|
||||
self.connectors_enabled,
|
||||
self.personality_command_enabled,
|
||||
self.realtime_command_enabled,
|
||||
self.windows_degraded_sandbox_active,
|
||||
)?;
|
||||
|
||||
@@ -3310,6 +3315,7 @@ impl ChatComposer {
|
||||
self.collaboration_modes_enabled,
|
||||
self.connectors_enabled,
|
||||
self.personality_command_enabled,
|
||||
self.realtime_command_enabled,
|
||||
self.windows_degraded_sandbox_active,
|
||||
)
|
||||
.is_some();
|
||||
@@ -3371,6 +3377,7 @@ impl ChatComposer {
|
||||
self.collaboration_modes_enabled,
|
||||
self.connectors_enabled,
|
||||
self.personality_command_enabled,
|
||||
self.realtime_command_enabled,
|
||||
self.windows_degraded_sandbox_active,
|
||||
) {
|
||||
return true;
|
||||
@@ -3424,12 +3431,14 @@ impl ChatComposer {
|
||||
let collaboration_modes_enabled = self.collaboration_modes_enabled;
|
||||
let connectors_enabled = self.connectors_enabled;
|
||||
let personality_command_enabled = self.personality_command_enabled;
|
||||
let realtime_command_enabled = self.realtime_command_enabled;
|
||||
let mut command_popup = CommandPopup::new(
|
||||
self.custom_prompts.clone(),
|
||||
CommandPopupFlags {
|
||||
collaboration_modes_enabled,
|
||||
connectors_enabled,
|
||||
personality_command_enabled,
|
||||
realtime_command_enabled,
|
||||
windows_degraded_sandbox_active: self.windows_degraded_sandbox_active,
|
||||
},
|
||||
);
|
||||
@@ -3831,13 +3840,10 @@ impl ChatComposer {
|
||||
self.voice_state.recording_placeholder_id = Some(id);
|
||||
// Spawn metering animation
|
||||
if let Some(v) = &self.voice_state.voice {
|
||||
let data = v.data_arc();
|
||||
let stop = v.stopped_flag();
|
||||
let sr = v.sample_rate();
|
||||
let ch = v.channels();
|
||||
let peak = v.last_peak_arc();
|
||||
if let Some(idref) = &self.voice_state.recording_placeholder_id {
|
||||
self.spawn_recording_meter(idref.clone(), sr, ch, data, peak, stop);
|
||||
self.spawn_live_meter_updates(idref.clone(), None, peak, stop);
|
||||
}
|
||||
}
|
||||
true
|
||||
@@ -3851,12 +3857,10 @@ impl ChatComposer {
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_recording_meter(
|
||||
pub(crate) fn spawn_live_meter_updates(
|
||||
&self,
|
||||
id: String,
|
||||
_sample_rate: u32,
|
||||
_channels: u16,
|
||||
_data: Arc<Mutex<Vec<i16>>>,
|
||||
prefix: Option<String>,
|
||||
last_peak: Arc<std::sync::atomic::AtomicU16>,
|
||||
stop: Arc<std::sync::atomic::AtomicBool>,
|
||||
) {
|
||||
@@ -3868,7 +3872,11 @@ impl ChatComposer {
|
||||
if stop.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
let text = meter.next_text(last_peak.load(Ordering::Relaxed));
|
||||
let meter_text = meter.next_text(last_peak.load(Ordering::Relaxed));
|
||||
let text = match &prefix {
|
||||
Some(prefix) => format!("{prefix}{meter_text}"),
|
||||
None => meter_text,
|
||||
};
|
||||
tx.send(crate::app_event::AppEvent::UpdateRecordingMeter {
|
||||
id: id.clone(),
|
||||
text,
|
||||
@@ -3946,6 +3954,38 @@ impl ChatComposer {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
impl ChatComposer {
|
||||
pub(crate) fn spawn_live_meter_updates(
|
||||
&self,
|
||||
id: String,
|
||||
prefix: Option<String>,
|
||||
last_peak: Arc<std::sync::atomic::AtomicU16>,
|
||||
stop: Arc<std::sync::atomic::AtomicBool>,
|
||||
) {
|
||||
let tx = self.app_event_tx.clone();
|
||||
std::thread::spawn(move || {
|
||||
use std::time::Duration;
|
||||
let mut meter = crate::voice::RecordingMeterState::new();
|
||||
loop {
|
||||
if stop.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
let meter_text = meter.next_text(last_peak.load(Ordering::Relaxed));
|
||||
let text = match &prefix {
|
||||
Some(prefix) => format!("{prefix}{meter_text}"),
|
||||
None => meter_text,
|
||||
};
|
||||
tx.send(crate::app_event::AppEvent::UpdateRecordingMeter {
|
||||
id: id.clone(),
|
||||
text,
|
||||
});
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn skill_display_name(skill: &SkillMetadata) -> &str {
|
||||
skill
|
||||
.interface
|
||||
|
||||
@@ -39,6 +39,7 @@ pub(crate) struct CommandPopupFlags {
|
||||
pub(crate) collaboration_modes_enabled: bool,
|
||||
pub(crate) connectors_enabled: bool,
|
||||
pub(crate) personality_command_enabled: bool,
|
||||
pub(crate) realtime_command_enabled: bool,
|
||||
pub(crate) windows_degraded_sandbox_active: bool,
|
||||
}
|
||||
|
||||
@@ -49,6 +50,7 @@ impl CommandPopup {
|
||||
flags.collaboration_modes_enabled,
|
||||
flags.connectors_enabled,
|
||||
flags.personality_command_enabled,
|
||||
flags.realtime_command_enabled,
|
||||
flags.windows_degraded_sandbox_active,
|
||||
)
|
||||
.into_iter()
|
||||
@@ -495,6 +497,7 @@ mod tests {
|
||||
collaboration_modes_enabled: true,
|
||||
connectors_enabled: false,
|
||||
personality_command_enabled: true,
|
||||
realtime_command_enabled: false,
|
||||
windows_degraded_sandbox_active: false,
|
||||
},
|
||||
);
|
||||
@@ -514,6 +517,7 @@ mod tests {
|
||||
collaboration_modes_enabled: true,
|
||||
connectors_enabled: false,
|
||||
personality_command_enabled: true,
|
||||
realtime_command_enabled: false,
|
||||
windows_degraded_sandbox_active: false,
|
||||
},
|
||||
);
|
||||
@@ -533,6 +537,7 @@ mod tests {
|
||||
collaboration_modes_enabled: true,
|
||||
connectors_enabled: false,
|
||||
personality_command_enabled: false,
|
||||
realtime_command_enabled: false,
|
||||
windows_degraded_sandbox_active: false,
|
||||
},
|
||||
);
|
||||
@@ -560,6 +565,7 @@ mod tests {
|
||||
collaboration_modes_enabled: true,
|
||||
connectors_enabled: false,
|
||||
personality_command_enabled: true,
|
||||
realtime_command_enabled: false,
|
||||
windows_degraded_sandbox_active: false,
|
||||
},
|
||||
);
|
||||
@@ -571,6 +577,54 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
/// With the realtime flag off, filtering on `/rea` must not list the `realtime` builtin.
#[test]
fn realtime_command_hidden_when_disabled() {
    let flags = CommandPopupFlags {
        collaboration_modes_enabled: true,
        connectors_enabled: false,
        personality_command_enabled: true,
        realtime_command_enabled: false,
        windows_degraded_sandbox_active: false,
    };
    let mut popup = CommandPopup::new(Vec::new(), flags);
    popup.on_composer_text_change("/rea".to_string());

    // Collect only the builtin command names; user prompts are irrelevant here.
    let mut cmds: Vec<&str> = Vec::new();
    for item in popup.filtered_items() {
        match item {
            CommandItem::Builtin(cmd) => cmds.push(cmd.command()),
            CommandItem::UserPrompt(_) => {}
        }
    }

    assert!(
        !cmds.contains(&"realtime"),
        "expected '/realtime' to be hidden when disabled, got {cmds:?}"
    );
}
|
||||
|
||||
/// With the realtime flag on, typing the exact `/realtime` text selects that builtin.
#[test]
fn realtime_command_visible_when_enabled() {
    let flags = CommandPopupFlags {
        collaboration_modes_enabled: true,
        connectors_enabled: false,
        personality_command_enabled: true,
        realtime_command_enabled: true,
        windows_degraded_sandbox_active: false,
    };
    let mut popup = CommandPopup::new(Vec::new(), flags);
    popup.on_composer_text_change("/realtime".to_string());

    let selected = popup.selected_item();
    match selected {
        Some(CommandItem::Builtin(cmd)) => {
            assert_eq!(cmd.command(), "realtime");
        }
        other => {
            panic!("expected realtime to be selected for exact match, got {other:?}")
        }
    }
}
|
||||
|
||||
#[test]
|
||||
fn debug_commands_are_hidden_from_popup() {
|
||||
let popup = CommandPopup::new(Vec::new(), CommandPopupFlags::default());
|
||||
|
||||
@@ -292,11 +292,14 @@ impl BottomPane {
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
pub fn set_realtime_command_enabled(&mut self, enabled: bool) {
|
||||
self.composer.set_realtime_command_enabled(enabled);
|
||||
self.request_redraw();
|
||||
}
|
||||
pub fn set_voice_transcription_enabled(&mut self, enabled: bool) {
|
||||
self.composer.set_voice_transcription_enabled(enabled);
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
/// Update the key hint shown next to queued messages so it matches the
|
||||
/// binding that `ChatWidget` actually listens for.
|
||||
pub(crate) fn set_queued_message_edit_binding(&mut self, binding: KeyBinding) {
|
||||
@@ -563,6 +566,17 @@ impl BottomPane {
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_live_meter_updates(
|
||||
&self,
|
||||
id: String,
|
||||
prefix: Option<String>,
|
||||
last_peak: std::sync::Arc<std::sync::atomic::AtomicU16>,
|
||||
stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
|
||||
) {
|
||||
self.composer
|
||||
.spawn_live_meter_updates(id, prefix, last_peak, stop);
|
||||
}
|
||||
|
||||
pub(crate) fn set_remote_image_urls(&mut self, urls: Vec<String>) {
|
||||
self.composer.set_remote_image_urls(urls);
|
||||
self.request_redraw();
|
||||
@@ -1005,7 +1019,6 @@ impl BottomPane {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
impl BottomPane {
|
||||
pub(crate) fn replace_transcription(&mut self, id: &str, text: &str) {
|
||||
self.composer.replace_transcription(id, text);
|
||||
|
||||
@@ -13,6 +13,7 @@ pub(crate) fn builtins_for_input(
|
||||
collaboration_modes_enabled: bool,
|
||||
connectors_enabled: bool,
|
||||
personality_command_enabled: bool,
|
||||
realtime_command_enabled: bool,
|
||||
allow_elevate_sandbox: bool,
|
||||
) -> Vec<(&'static str, SlashCommand)> {
|
||||
built_in_slash_commands()
|
||||
@@ -24,6 +25,7 @@ pub(crate) fn builtins_for_input(
|
||||
})
|
||||
.filter(|(_, cmd)| connectors_enabled || *cmd != SlashCommand::Apps)
|
||||
.filter(|(_, cmd)| personality_command_enabled || *cmd != SlashCommand::Personality)
|
||||
.filter(|(_, cmd)| realtime_command_enabled || *cmd != SlashCommand::Realtime)
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -33,12 +35,14 @@ pub(crate) fn find_builtin_command(
|
||||
collaboration_modes_enabled: bool,
|
||||
connectors_enabled: bool,
|
||||
personality_command_enabled: bool,
|
||||
realtime_command_enabled: bool,
|
||||
allow_elevate_sandbox: bool,
|
||||
) -> Option<SlashCommand> {
|
||||
builtins_for_input(
|
||||
collaboration_modes_enabled,
|
||||
connectors_enabled,
|
||||
personality_command_enabled,
|
||||
realtime_command_enabled,
|
||||
allow_elevate_sandbox,
|
||||
)
|
||||
.into_iter()
|
||||
@@ -52,12 +56,14 @@ pub(crate) fn has_builtin_prefix(
|
||||
collaboration_modes_enabled: bool,
|
||||
connectors_enabled: bool,
|
||||
personality_command_enabled: bool,
|
||||
realtime_command_enabled: bool,
|
||||
allow_elevate_sandbox: bool,
|
||||
) -> bool {
|
||||
builtins_for_input(
|
||||
collaboration_modes_enabled,
|
||||
connectors_enabled,
|
||||
personality_command_enabled,
|
||||
realtime_command_enabled,
|
||||
allow_elevate_sandbox,
|
||||
)
|
||||
.into_iter()
|
||||
@@ -71,14 +77,14 @@ mod tests {
|
||||
|
||||
/// `debug-config` must resolve to `SlashCommand::DebugConfig` through
/// `find_builtin_command`.
#[test]
fn debug_command_still_resolves_for_dispatch() {
    // Flags, in order: collaboration modes, connectors, personality, realtime,
    // allow-elevate-sandbox. The former five-argument call no longer matches the
    // signature and has been dropped.
    let cmd = find_builtin_command("debug-config", true, true, true, true, false);
    assert_eq!(cmd, Some(SlashCommand::DebugConfig));
}
|
||||
|
||||
/// `clear` must resolve to `SlashCommand::Clear` through `find_builtin_command`.
#[test]
fn clear_command_resolves_for_dispatch() {
    // Flags, in order: collaboration modes, connectors, personality, realtime,
    // allow-elevate-sandbox. `assert_eq!` takes exactly two operands here; the stale
    // five-argument lookup that used to sit beside this one has been removed.
    assert_eq!(
        find_builtin_command("clear", true, true, true, true, false),
        Some(SlashCommand::Clear)
    );
}
|
||||
|
||||
@@ -92,6 +92,7 @@ use codex_protocol::protocol::AgentReasoningRawContentEvent;
|
||||
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
|
||||
use codex_protocol::protocol::BackgroundEventEvent;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
use codex_protocol::protocol::ConversationStartParams;
|
||||
use codex_protocol::protocol::CreditsSnapshot;
|
||||
use codex_protocol::protocol::DeprecationNoticeEvent;
|
||||
use codex_protocol::protocol::ErrorEvent;
|
||||
@@ -114,6 +115,10 @@ use codex_protocol::protocol::McpToolCallEndEvent;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::PatchApplyBeginEvent;
|
||||
use codex_protocol::protocol::RateLimitSnapshot;
|
||||
use codex_protocol::protocol::RealtimeConversationClosedEvent;
|
||||
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeConversationStartedEvent;
|
||||
use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_protocol::protocol::ReviewRequest;
|
||||
use codex_protocol::protocol::ReviewTarget;
|
||||
use codex_protocol::protocol::SkillMetadata as ProtocolSkillMetadata;
|
||||
@@ -163,6 +168,11 @@ const PLAN_MODE_REASONING_SCOPE_TITLE: &str = "Apply reasoning change";
|
||||
const PLAN_MODE_REASONING_SCOPE_PLAN_ONLY: &str = "Apply to Plan mode override";
|
||||
const PLAN_MODE_REASONING_SCOPE_ALL_MODES: &str = "Apply to global default and Plan mode override";
|
||||
const CONNECTORS_SELECTION_VIEW_ID: &str = "connectors-selection";
|
||||
const REALTIME_CONVERSATION_PROMPT: &str =
|
||||
"Low-level realtime conversation for TUI audio loop testing.";
|
||||
const REALTIME_METER_PLACEHOLDER_ID: &str = "realtime-meter";
|
||||
const REALTIME_LISTENING_PLACEHOLDER_TEXT: &str = "Realtime listening ⠤⠤⠤⠤";
|
||||
const REALTIME_LISTENING_PREFIX_TEXT: &str = "Realtime listening ";
|
||||
|
||||
/// Choose the keybinding used to edit the most-recently queued message.
|
||||
///
|
||||
@@ -235,6 +245,8 @@ use crate::key_hint;
|
||||
use crate::key_hint::KeyBinding;
|
||||
use crate::markdown::append_markdown;
|
||||
use crate::multi_agents;
|
||||
use crate::realtime_audio::RealtimeAudioController;
|
||||
use crate::realtime_audio::RealtimeMicMeterHandles;
|
||||
use crate::render::Insets;
|
||||
use crate::render::renderable::ColumnRenderable;
|
||||
use crate::render::renderable::FlexRenderable;
|
||||
@@ -250,6 +262,7 @@ use crate::tui::FrameRequester;
|
||||
mod interrupts;
|
||||
use self::interrupts::InterruptManager;
|
||||
mod agent;
|
||||
pub(crate) use self::agent::ChatWidgetOpSenders;
|
||||
use self::agent::spawn_agent;
|
||||
use self::agent::spawn_agent_from_existing;
|
||||
pub(crate) use self::agent::spawn_op_forwarder;
|
||||
@@ -503,6 +516,13 @@ pub(crate) enum ExternalEditorState {
|
||||
Active,
|
||||
}
|
||||
|
||||
/// UI-side view of the realtime conversation toggle.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum RealtimeConversationUiState {
    /// No realtime conversation is active. This is the default state.
    #[default]
    Stopped,
    /// A realtime conversation has been requested or is running.
    Working,
}
|
||||
|
||||
/// Maintains the per-session UI state and interaction state machines for the chat screen.
|
||||
///
|
||||
/// `ChatWidget` owns the state derived from the protocol event stream (history cells, streaming
|
||||
@@ -518,6 +538,7 @@ pub(crate) enum ExternalEditorState {
|
||||
pub(crate) struct ChatWidget {
|
||||
app_event_tx: AppEventSender,
|
||||
codex_op_tx: UnboundedSender<Op>,
|
||||
realtime_audio_op_tx: tokio::sync::mpsc::Sender<Op>,
|
||||
bottom_pane: BottomPane,
|
||||
active_cell: Option<Box<dyn HistoryCell>>,
|
||||
/// Monotonic-ish counter used to invalidate transcript overlay caching.
|
||||
@@ -665,6 +686,9 @@ pub(crate) struct ChatWidget {
|
||||
// True once we've attempted a branch lookup for the current CWD.
|
||||
status_line_branch_lookup_complete: bool,
|
||||
external_editor_state: ExternalEditorState,
|
||||
realtime_conversation_state: RealtimeConversationUiState,
|
||||
realtime_audio_controller: Option<RealtimeAudioController>,
|
||||
realtime_meter_placeholder_id: Option<String>,
|
||||
}
|
||||
|
||||
/// Snapshot of active-cell state that affects transcript overlay rendering.
|
||||
@@ -2733,7 +2757,10 @@ impl ChatWidget {
|
||||
let prevent_idle_sleep = config.features.enabled(Feature::PreventIdleSleep);
|
||||
let mut rng = rand::rng();
|
||||
let placeholder = PLACEHOLDERS[rng.random_range(0..PLACEHOLDERS.len())].to_string();
|
||||
let codex_op_tx = spawn_agent(config.clone(), app_event_tx.clone(), thread_manager);
|
||||
let ChatWidgetOpSenders {
|
||||
codex_op_tx,
|
||||
realtime_audio_op_tx,
|
||||
} = spawn_agent(config.clone(), app_event_tx.clone(), thread_manager);
|
||||
|
||||
let model_override = model.as_deref();
|
||||
let model_for_header = model
|
||||
@@ -2765,6 +2792,7 @@ impl ChatWidget {
|
||||
app_event_tx: app_event_tx.clone(),
|
||||
frame_requester: frame_requester.clone(),
|
||||
codex_op_tx,
|
||||
realtime_audio_op_tx,
|
||||
bottom_pane: BottomPane::new(BottomPaneParams {
|
||||
frame_requester,
|
||||
app_event_tx,
|
||||
@@ -2846,6 +2874,9 @@ impl ChatWidget {
|
||||
status_line_branch_pending: false,
|
||||
status_line_branch_lookup_complete: false,
|
||||
external_editor_state: ExternalEditorState::Closed,
|
||||
realtime_conversation_state: RealtimeConversationUiState::Stopped,
|
||||
realtime_audio_controller: None,
|
||||
realtime_meter_placeholder_id: None,
|
||||
};
|
||||
|
||||
widget.prefetch_rate_limits();
|
||||
@@ -2860,6 +2891,7 @@ impl ChatWidget {
|
||||
.set_status_line_enabled(!widget.configured_status_line_items().is_empty());
|
||||
widget.bottom_pane.set_collaboration_modes_enabled(true);
|
||||
widget.sync_personality_command_enabled();
|
||||
widget.sync_realtime_command_enabled();
|
||||
widget
|
||||
.bottom_pane
|
||||
.set_queued_message_edit_binding(widget.queued_message_edit_binding);
|
||||
@@ -2880,10 +2912,7 @@ impl ChatWidget {
|
||||
widget
|
||||
}
|
||||
|
||||
pub(crate) fn new_with_op_sender(
|
||||
common: ChatWidgetInit,
|
||||
codex_op_tx: UnboundedSender<Op>,
|
||||
) -> Self {
|
||||
pub(crate) fn new_with_op_sender(common: ChatWidgetInit, senders: ChatWidgetOpSenders) -> Self {
|
||||
let ChatWidgetInit {
|
||||
config,
|
||||
frame_requester,
|
||||
@@ -2905,6 +2934,10 @@ impl ChatWidget {
|
||||
let prevent_idle_sleep = config.features.enabled(Feature::PreventIdleSleep);
|
||||
let mut rng = rand::rng();
|
||||
let placeholder = PLACEHOLDERS[rng.random_range(0..PLACEHOLDERS.len())].to_string();
|
||||
let ChatWidgetOpSenders {
|
||||
codex_op_tx,
|
||||
realtime_audio_op_tx,
|
||||
} = senders;
|
||||
|
||||
let model_override = model.as_deref();
|
||||
let model_for_header = model
|
||||
@@ -2936,6 +2969,7 @@ impl ChatWidget {
|
||||
app_event_tx: app_event_tx.clone(),
|
||||
frame_requester: frame_requester.clone(),
|
||||
codex_op_tx,
|
||||
realtime_audio_op_tx,
|
||||
bottom_pane: BottomPane::new(BottomPaneParams {
|
||||
frame_requester,
|
||||
app_event_tx,
|
||||
@@ -3017,6 +3051,9 @@ impl ChatWidget {
|
||||
status_line_branch_pending: false,
|
||||
status_line_branch_lookup_complete: false,
|
||||
external_editor_state: ExternalEditorState::Closed,
|
||||
realtime_conversation_state: RealtimeConversationUiState::Stopped,
|
||||
realtime_audio_controller: None,
|
||||
realtime_meter_placeholder_id: None,
|
||||
};
|
||||
|
||||
widget.prefetch_rate_limits();
|
||||
@@ -3031,6 +3068,7 @@ impl ChatWidget {
|
||||
.set_status_line_enabled(!widget.configured_status_line_items().is_empty());
|
||||
widget.bottom_pane.set_collaboration_modes_enabled(true);
|
||||
widget.sync_personality_command_enabled();
|
||||
widget.sync_realtime_command_enabled();
|
||||
widget
|
||||
.bottom_pane
|
||||
.set_queued_message_edit_binding(widget.queued_message_edit_binding);
|
||||
@@ -3076,8 +3114,10 @@ impl ChatWidget {
|
||||
.unwrap_or(header_model);
|
||||
|
||||
let current_cwd = Some(session_configured.cwd.clone());
|
||||
let codex_op_tx =
|
||||
spawn_agent_from_existing(conversation, session_configured, app_event_tx.clone());
|
||||
let ChatWidgetOpSenders {
|
||||
codex_op_tx,
|
||||
realtime_audio_op_tx,
|
||||
} = spawn_agent_from_existing(conversation, session_configured, app_event_tx.clone());
|
||||
|
||||
let fallback_default = Settings {
|
||||
model: header_model.clone(),
|
||||
@@ -3096,6 +3136,7 @@ impl ChatWidget {
|
||||
app_event_tx: app_event_tx.clone(),
|
||||
frame_requester: frame_requester.clone(),
|
||||
codex_op_tx,
|
||||
realtime_audio_op_tx,
|
||||
bottom_pane: BottomPane::new(BottomPaneParams {
|
||||
frame_requester,
|
||||
app_event_tx,
|
||||
@@ -3177,6 +3218,9 @@ impl ChatWidget {
|
||||
status_line_branch_pending: false,
|
||||
status_line_branch_lookup_complete: false,
|
||||
external_editor_state: ExternalEditorState::Closed,
|
||||
realtime_conversation_state: RealtimeConversationUiState::Stopped,
|
||||
realtime_audio_controller: None,
|
||||
realtime_meter_placeholder_id: None,
|
||||
};
|
||||
|
||||
widget.prefetch_rate_limits();
|
||||
@@ -3191,6 +3235,7 @@ impl ChatWidget {
|
||||
.set_status_line_enabled(!widget.configured_status_line_items().is_empty());
|
||||
widget.bottom_pane.set_collaboration_modes_enabled(true);
|
||||
widget.sync_personality_command_enabled();
|
||||
widget.sync_realtime_command_enabled();
|
||||
widget
|
||||
.bottom_pane
|
||||
.set_queued_message_edit_binding(widget.queued_message_edit_binding);
|
||||
@@ -3402,6 +3447,140 @@ impl ChatWidget {
|
||||
self.bottom_pane.can_launch_external_editor()
|
||||
}
|
||||
|
||||
fn enable_realtime_ui(&mut self) {
|
||||
self.disable_realtime_ui();
|
||||
self.set_footer_hint_override(Some(vec![(
|
||||
"/realtime".to_string(),
|
||||
"Stop voice chat".to_string(),
|
||||
)]));
|
||||
let id = REALTIME_METER_PLACEHOLDER_ID.to_string();
|
||||
self.replace_transcription(&id, REALTIME_LISTENING_PLACEHOLDER_TEXT);
|
||||
|
||||
if let Some(controller) = &self.realtime_audio_controller
|
||||
&& let Some(RealtimeMicMeterHandles { last_peak, stop }) = controller.meter_handles()
|
||||
{
|
||||
self.bottom_pane.spawn_live_meter_updates(
|
||||
id.clone(),
|
||||
Some(REALTIME_LISTENING_PREFIX_TEXT.to_string()),
|
||||
last_peak,
|
||||
stop,
|
||||
);
|
||||
}
|
||||
|
||||
self.realtime_meter_placeholder_id = Some(id);
|
||||
}
|
||||
|
||||
fn disable_realtime_ui(&mut self) {
|
||||
self.set_footer_hint_override(None);
|
||||
if let Some(id) = self.realtime_meter_placeholder_id.take() {
|
||||
self.remove_transcription_placeholder(&id);
|
||||
}
|
||||
}
|
||||
|
||||
fn toggle_realtime_conversation(&mut self) {
|
||||
if !self.realtime_command_enabled() {
|
||||
self.add_info_message(
|
||||
"Realtime conversation is disabled.".to_string(),
|
||||
Some(
|
||||
"Enable `[features].realtime_conversation = true` in config.toml.".to_string(),
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
match self.realtime_conversation_state {
|
||||
RealtimeConversationUiState::Stopped => {
|
||||
self.realtime_conversation_state = RealtimeConversationUiState::Working;
|
||||
self.add_info_message("Starting realtime conversation...".to_string(), None);
|
||||
self.submit_op(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: REALTIME_CONVERSATION_PROMPT.to_string(),
|
||||
session_id: None,
|
||||
}));
|
||||
}
|
||||
RealtimeConversationUiState::Working => {
|
||||
self.realtime_conversation_state = RealtimeConversationUiState::Stopped;
|
||||
if let Some(a) = self.realtime_audio_controller.take() {
|
||||
RealtimeAudioController::shutdown(a)
|
||||
}
|
||||
self.disable_realtime_ui();
|
||||
self.add_info_message("Stopping realtime conversation...".to_string(), None);
|
||||
self.submit_op(Op::RealtimeConversationClose);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn on_realtime_conversation_started(&mut self, event: RealtimeConversationStartedEvent) {
|
||||
if self.realtime_conversation_state == RealtimeConversationUiState::Stopped {
|
||||
self.submit_op(Op::RealtimeConversationClose);
|
||||
return;
|
||||
}
|
||||
|
||||
match RealtimeAudioController::start(self.realtime_audio_op_tx.clone()) {
|
||||
Ok(controller) => {
|
||||
self.realtime_audio_controller = Some(controller);
|
||||
self.realtime_conversation_state = RealtimeConversationUiState::Working;
|
||||
self.enable_realtime_ui();
|
||||
let message = match event.session_id {
|
||||
Some(session_id) => format!("Realtime conversation started ({session_id})."),
|
||||
None => "Realtime conversation started.".to_string(),
|
||||
};
|
||||
self.add_info_message(message, None);
|
||||
}
|
||||
Err(err) => {
|
||||
self.realtime_conversation_state = RealtimeConversationUiState::Stopped;
|
||||
self.disable_realtime_ui();
|
||||
self.add_error_message(format!("Failed to start realtime audio IO: {err}"));
|
||||
self.submit_op(Op::RealtimeConversationClose);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn on_realtime_conversation_event(&mut self, event: RealtimeConversationRealtimeEvent) {
|
||||
match event.payload {
|
||||
RealtimeEvent::AudioOut(frame) => {
|
||||
if let Some(controller) = &self.realtime_audio_controller
|
||||
&& let Err(err) = controller.enqueue_audio_out(frame)
|
||||
{
|
||||
warn!("failed to queue realtime audio playback frame: {err}");
|
||||
}
|
||||
}
|
||||
RealtimeEvent::Error(message) => {
|
||||
if let Some(a) = self.realtime_audio_controller.take() {
|
||||
RealtimeAudioController::shutdown(a)
|
||||
}
|
||||
if self.realtime_conversation_state != RealtimeConversationUiState::Stopped {
|
||||
self.realtime_conversation_state = RealtimeConversationUiState::Stopped;
|
||||
}
|
||||
self.disable_realtime_ui();
|
||||
self.add_error_message(format!("Realtime conversation error: {message}"));
|
||||
}
|
||||
RealtimeEvent::SessionCreated { session_id } => {
|
||||
debug!("realtime conversation session created: {session_id}");
|
||||
}
|
||||
RealtimeEvent::SessionUpdated { backend_prompt } => {
|
||||
debug!(
|
||||
"realtime conversation session updated: {:?}",
|
||||
backend_prompt
|
||||
);
|
||||
}
|
||||
RealtimeEvent::ConversationItemAdded(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn on_realtime_conversation_closed(&mut self, event: RealtimeConversationClosedEvent) {
|
||||
if let Some(a) = self.realtime_audio_controller.take() {
|
||||
RealtimeAudioController::shutdown(a)
|
||||
}
|
||||
self.realtime_conversation_state = RealtimeConversationUiState::Stopped;
|
||||
self.disable_realtime_ui();
|
||||
|
||||
let message = match event.reason {
|
||||
Some(reason) => format!("Realtime conversation closed ({reason})."),
|
||||
None => "Realtime conversation closed.".to_string(),
|
||||
};
|
||||
self.add_info_message(message, None);
|
||||
}
|
||||
|
||||
fn dispatch_command(&mut self, cmd: SlashCommand) {
|
||||
if !cmd.available_during_task() && self.bottom_pane.is_task_running() {
|
||||
let message = format!(
|
||||
@@ -3559,6 +3738,9 @@ impl ChatWidget {
|
||||
SlashCommand::Experimental => {
|
||||
self.open_experimental_popup();
|
||||
}
|
||||
SlashCommand::Realtime => {
|
||||
self.toggle_realtime_conversation();
|
||||
}
|
||||
SlashCommand::Quit | SlashCommand::Exit => {
|
||||
self.request_quit_without_confirmation();
|
||||
}
|
||||
@@ -4302,7 +4484,9 @@ impl ChatWidget {
|
||||
}
|
||||
}
|
||||
EventMsg::UserMessage(ev) => {
|
||||
if from_replay {
|
||||
if from_replay
|
||||
|| self.realtime_conversation_state == RealtimeConversationUiState::Working
|
||||
{
|
||||
self.on_user_message_event(ev);
|
||||
}
|
||||
}
|
||||
@@ -4332,14 +4516,26 @@ impl ChatWidget {
|
||||
});
|
||||
}
|
||||
}
|
||||
EventMsg::RealtimeConversationStarted(ev) => {
|
||||
if !from_replay {
|
||||
self.on_realtime_conversation_started(ev);
|
||||
}
|
||||
}
|
||||
EventMsg::RealtimeConversationRealtime(ev) => {
|
||||
if !from_replay {
|
||||
self.on_realtime_conversation_event(ev);
|
||||
}
|
||||
}
|
||||
EventMsg::RealtimeConversationClosed(ev) => {
|
||||
if !from_replay {
|
||||
self.on_realtime_conversation_closed(ev);
|
||||
}
|
||||
}
|
||||
EventMsg::RawResponseItem(_)
|
||||
| EventMsg::ItemStarted(_)
|
||||
| EventMsg::AgentMessageContentDelta(_)
|
||||
| EventMsg::ReasoningContentDelta(_)
|
||||
| EventMsg::ReasoningRawContentDelta(_)
|
||||
| EventMsg::RealtimeConversationStarted(_)
|
||||
| EventMsg::RealtimeConversationRealtime(_)
|
||||
| EventMsg::RealtimeConversationClosed(_)
|
||||
| EventMsg::DynamicToolCallRequest(_) => {}
|
||||
EventMsg::ItemCompleted(event) => {
|
||||
let item = event.item;
|
||||
@@ -6389,6 +6585,19 @@ impl ChatWidget {
|
||||
if feature == Feature::Personality {
|
||||
self.sync_personality_command_enabled();
|
||||
}
|
||||
if feature == Feature::RealtimeConversation {
|
||||
self.sync_realtime_command_enabled();
|
||||
if !enabled {
|
||||
if self.realtime_conversation_state != RealtimeConversationUiState::Stopped {
|
||||
self.submit_op(Op::RealtimeConversationClose);
|
||||
}
|
||||
if let Some(a) = self.realtime_audio_controller.take() {
|
||||
RealtimeAudioController::shutdown(a)
|
||||
}
|
||||
self.realtime_conversation_state = RealtimeConversationUiState::Stopped;
|
||||
self.disable_realtime_ui();
|
||||
}
|
||||
}
|
||||
if feature == Feature::PreventIdleSleep {
|
||||
self.turn_sleep_inhibitor = SleepInhibitor::new(enabled);
|
||||
self.turn_sleep_inhibitor
|
||||
@@ -6501,6 +6710,16 @@ impl ChatWidget {
|
||||
.set_personality_command_enabled(self.config.features.enabled(Feature::Personality));
|
||||
}
|
||||
|
||||
fn sync_realtime_command_enabled(&mut self) {
|
||||
self.bottom_pane.set_realtime_command_enabled(
|
||||
self.config.features.enabled(Feature::RealtimeConversation),
|
||||
);
|
||||
}
|
||||
|
||||
fn realtime_command_enabled(&self) -> bool {
|
||||
self.config.features.enabled(Feature::RealtimeConversation)
|
||||
}
|
||||
|
||||
fn current_model_supports_personality(&self) -> bool {
|
||||
let model = self.current_model();
|
||||
self.models_manager
|
||||
@@ -7537,7 +7756,6 @@ impl ChatWidget {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
impl ChatWidget {
|
||||
pub(crate) fn replace_transcription(&mut self, id: &str, text: &str) {
|
||||
self.bottom_pane.replace_transcription(id, text);
|
||||
|
||||
@@ -7,20 +7,31 @@ use codex_core::config::Config;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::Op;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::mpsc::UnboundedReceiver;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tokio::sync::mpsc::channel;
|
||||
use tokio::sync::mpsc::unbounded_channel;
|
||||
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event_sender::AppEventSender;
|
||||
|
||||
const REALTIME_MIC_AUDIO_QUEUE_CAPACITY: usize = 64;
|
||||
|
||||
pub(crate) struct ChatWidgetOpSenders {
|
||||
pub(crate) codex_op_tx: UnboundedSender<Op>,
|
||||
pub(crate) realtime_audio_op_tx: Sender<Op>,
|
||||
}
|
||||
|
||||
/// Spawn the agent bootstrapper and op forwarding loop, returning the
|
||||
/// `UnboundedSender<Op>` used by the UI to submit operations.
|
||||
pub(crate) fn spawn_agent(
|
||||
config: Config,
|
||||
app_event_tx: AppEventSender,
|
||||
server: Arc<ThreadManager>,
|
||||
) -> UnboundedSender<Op> {
|
||||
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
|
||||
) -> ChatWidgetOpSenders {
|
||||
let (codex_op_tx, codex_op_rx, realtime_audio_op_tx, realtime_audio_op_rx) = op_channels();
|
||||
|
||||
let app_event_tx_clone = app_event_tx;
|
||||
tokio::spawn(async move {
|
||||
@@ -51,15 +62,7 @@ pub(crate) fn spawn_agent(
|
||||
};
|
||||
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
|
||||
|
||||
let thread_clone = thread.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Some(op) = codex_op_rx.recv().await {
|
||||
let id = thread_clone.submit(op).await;
|
||||
if let Err(e) = id {
|
||||
tracing::error!("failed to submit op: {e}");
|
||||
}
|
||||
}
|
||||
});
|
||||
spawn_bounded_op_forwarder_threads(thread, codex_op_rx, realtime_audio_op_rx);
|
||||
|
||||
while let Ok(event) = thread.next_event().await {
|
||||
let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
|
||||
@@ -72,18 +75,21 @@ pub(crate) fn spawn_agent(
|
||||
}
|
||||
});
|
||||
|
||||
codex_op_tx
|
||||
ChatWidgetOpSenders {
|
||||
codex_op_tx,
|
||||
realtime_audio_op_tx,
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn agent loops for an existing thread (e.g., a forked thread).
|
||||
/// Sends the provided `SessionConfiguredEvent` immediately, then forwards subsequent
|
||||
/// events and accepts Ops for submission.
|
||||
pub(crate) fn spawn_agent_from_existing(
|
||||
thread: std::sync::Arc<CodexThread>,
|
||||
thread: Arc<CodexThread>,
|
||||
session_configured: codex_protocol::protocol::SessionConfiguredEvent,
|
||||
app_event_tx: AppEventSender,
|
||||
) -> UnboundedSender<Op> {
|
||||
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
|
||||
) -> ChatWidgetOpSenders {
|
||||
let (codex_op_tx, codex_op_rx, realtime_audio_op_tx, realtime_audio_op_rx) = op_channels();
|
||||
|
||||
let app_event_tx_clone = app_event_tx;
|
||||
tokio::spawn(async move {
|
||||
@@ -94,15 +100,7 @@ pub(crate) fn spawn_agent_from_existing(
|
||||
};
|
||||
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
|
||||
|
||||
let thread_clone = thread.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Some(op) = codex_op_rx.recv().await {
|
||||
let id = thread_clone.submit(op).await;
|
||||
if let Err(e) = id {
|
||||
tracing::error!("failed to submit op: {e}");
|
||||
}
|
||||
}
|
||||
});
|
||||
spawn_bounded_op_forwarder_threads(thread, codex_op_rx, realtime_audio_op_rx);
|
||||
|
||||
while let Ok(event) = thread.next_event().await {
|
||||
let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
|
||||
@@ -115,13 +113,56 @@ pub(crate) fn spawn_agent_from_existing(
|
||||
}
|
||||
});
|
||||
|
||||
codex_op_tx
|
||||
ChatWidgetOpSenders {
|
||||
codex_op_tx,
|
||||
realtime_audio_op_tx,
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn an op-forwarding loop for an existing thread without subscribing to events.
|
||||
pub(crate) fn spawn_op_forwarder(thread: std::sync::Arc<CodexThread>) -> UnboundedSender<Op> {
|
||||
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
|
||||
pub(crate) fn spawn_op_forwarder(thread: Arc<CodexThread>) -> ChatWidgetOpSenders {
|
||||
let (codex_op_tx, codex_op_rx, realtime_audio_op_tx, realtime_audio_op_rx) = op_channels();
|
||||
spawn_bounded_op_forwarder_threads(thread, codex_op_rx, realtime_audio_op_rx);
|
||||
|
||||
ChatWidgetOpSenders {
|
||||
codex_op_tx,
|
||||
realtime_audio_op_tx,
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_bounded_op_forwarder(thread: Arc<CodexThread>, mut op_rx: Receiver<Op>) {
|
||||
tokio::spawn(async move {
|
||||
while let Some(op) = op_rx.recv().await {
|
||||
if let Err(e) = thread.submit(op).await {
|
||||
tracing::error!("failed to submit realtime audio op: {e}");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn op_channels() -> (
|
||||
UnboundedSender<Op>,
|
||||
UnboundedReceiver<Op>,
|
||||
Sender<Op>,
|
||||
Receiver<Op>,
|
||||
) {
|
||||
let (codex_op_tx, codex_op_rx) = unbounded_channel::<Op>();
|
||||
let (realtime_audio_op_tx, realtime_audio_op_rx) =
|
||||
channel::<Op>(REALTIME_MIC_AUDIO_QUEUE_CAPACITY);
|
||||
(
|
||||
codex_op_tx,
|
||||
codex_op_rx,
|
||||
realtime_audio_op_tx,
|
||||
realtime_audio_op_rx,
|
||||
)
|
||||
}
|
||||
|
||||
fn spawn_bounded_op_forwarder_threads(
|
||||
thread: Arc<CodexThread>,
|
||||
mut codex_op_rx: UnboundedReceiver<Op>,
|
||||
realtime_audio_op_rx: Receiver<Op>,
|
||||
) {
|
||||
spawn_bounded_op_forwarder(thread.clone(), realtime_audio_op_rx);
|
||||
tokio::spawn(async move {
|
||||
while let Some(op) = codex_op_rx.recv().await {
|
||||
if let Err(e) = thread.submit(op).await {
|
||||
@@ -129,6 +170,4 @@ pub(crate) fn spawn_op_forwarder(thread: std::sync::Arc<CodexThread>) -> Unbound
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
codex_op_tx
|
||||
}
|
||||
|
||||
@@ -53,6 +53,7 @@ use codex_protocol::protocol::AgentReasoningEvent;
|
||||
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
|
||||
use codex_protocol::protocol::BackgroundEventEvent;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
use codex_protocol::protocol::ConversationStartParams;
|
||||
use codex_protocol::protocol::CreditsSnapshot;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
@@ -73,6 +74,11 @@ use codex_protocol::protocol::PatchApplyBeginEvent;
|
||||
use codex_protocol::protocol::PatchApplyEndEvent;
|
||||
use codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus;
|
||||
use codex_protocol::protocol::RateLimitWindow;
|
||||
use codex_protocol::protocol::RealtimeAudioFrame;
|
||||
use codex_protocol::protocol::RealtimeConversationClosedEvent;
|
||||
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeConversationStartedEvent;
|
||||
use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_protocol::protocol::ReviewRequest;
|
||||
use codex_protocol::protocol::ReviewTarget;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
@@ -1643,10 +1649,13 @@ async fn make_chatwidget_manual(
|
||||
},
|
||||
};
|
||||
let current_collaboration_mode = base_mode;
|
||||
let (realtime_audio_op_tx, realtime_audio_op_rx) = tokio::sync::mpsc::channel(1);
|
||||
drop(realtime_audio_op_rx);
|
||||
let active_collaboration_mask = collaboration_modes::default_mask(models_manager.as_ref());
|
||||
let mut widget = ChatWidget {
|
||||
app_event_tx,
|
||||
codex_op_tx: op_tx,
|
||||
realtime_audio_op_tx,
|
||||
bottom_pane: bottom,
|
||||
active_cell: None,
|
||||
active_cell_revision: 0,
|
||||
@@ -1720,6 +1729,9 @@ async fn make_chatwidget_manual(
|
||||
status_line_branch_pending: false,
|
||||
status_line_branch_lookup_complete: false,
|
||||
external_editor_state: ExternalEditorState::Closed,
|
||||
realtime_conversation_state: RealtimeConversationUiState::Stopped,
|
||||
realtime_audio_controller: None,
|
||||
realtime_meter_placeholder_id: None,
|
||||
};
|
||||
widget.set_model(&resolved_model);
|
||||
(widget, rx, op_rx)
|
||||
@@ -3379,7 +3391,7 @@ async fn unified_exec_begin_restores_working_status_snapshot() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn steer_enter_queues_while_plan_stream_is_active() {
|
||||
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await;
|
||||
let (mut chat, _, mut op_rx) = make_chatwidget_manual(None).await;
|
||||
chat.thread_id = Some(ThreadId::new());
|
||||
chat.set_feature_enabled(Feature::CollaborationModes, true);
|
||||
let plan_mask =
|
||||
@@ -3404,7 +3416,7 @@ async fn steer_enter_queues_while_plan_stream_is_active() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn steer_enter_queues_while_final_answer_stream_is_active() {
|
||||
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await;
|
||||
let (mut chat, _, mut op_rx) = make_chatwidget_manual(None).await;
|
||||
chat.thread_id = Some(ThreadId::new());
|
||||
chat.on_task_started();
|
||||
// Keep the assistant stream open (no commit tick/finalize) to model the repro window:
|
||||
@@ -3528,7 +3540,7 @@ async fn steer_enter_submits_when_plan_stream_is_not_active() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn ctrl_c_shutdown_works_with_caps_lock() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
|
||||
let (mut chat, mut rx, _) = make_chatwidget_manual(None).await;
|
||||
|
||||
chat.handle_key_event(KeyEvent::new(KeyCode::Char('C'), KeyModifiers::CONTROL));
|
||||
|
||||
@@ -4616,6 +4628,196 @@ async fn slash_quit_requests_exit() {
|
||||
assert_matches!(rx.try_recv(), Ok(AppEvent::Exit(ExitMode::ShutdownFirst)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn slash_realtime_shows_error_when_feature_disabled() {
|
||||
let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(None).await;
|
||||
chat.set_feature_enabled(Feature::RealtimeConversation, false);
|
||||
|
||||
chat.dispatch_command(SlashCommand::Realtime);
|
||||
|
||||
match op_rx.try_recv() {
|
||||
Err(TryRecvError::Empty) => {}
|
||||
other => panic!("expected no realtime op when feature is disabled, got {other:?}"),
|
||||
}
|
||||
|
||||
let rendered = drain_insert_history(&mut rx)
|
||||
.iter()
|
||||
.map(|lines| lines_to_single_string(lines))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
assert!(
|
||||
rendered.contains("Realtime conversation is disabled."),
|
||||
"expected disabled message, got {rendered:?}"
|
||||
);
|
||||
assert!(
|
||||
rendered.contains("realtime_conversation = true"),
|
||||
"expected config hint, got {rendered:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn slash_realtime_toggle_sends_start_then_close_ops() {
|
||||
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await;
|
||||
chat.set_feature_enabled(Feature::RealtimeConversation, true);
|
||||
|
||||
chat.dispatch_command(SlashCommand::Realtime);
|
||||
|
||||
let start = match op_rx.try_recv() {
|
||||
Ok(Op::RealtimeConversationStart(params)) => params,
|
||||
other => panic!("expected realtime start op, got {other:?}"),
|
||||
};
|
||||
assert_eq!(
|
||||
start,
|
||||
ConversationStartParams {
|
||||
prompt: REALTIME_CONVERSATION_PROMPT.to_string(),
|
||||
session_id: None,
|
||||
}
|
||||
);
|
||||
|
||||
chat.dispatch_command(SlashCommand::Realtime);
|
||||
|
||||
assert_matches!(op_rx.try_recv(), Ok(Op::RealtimeConversationClose));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn disabling_realtime_feature_closes_active_session() {
|
||||
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await;
|
||||
chat.set_feature_enabled(Feature::RealtimeConversation, true);
|
||||
|
||||
chat.dispatch_command(SlashCommand::Realtime);
|
||||
assert_matches!(
|
||||
op_rx.try_recv(),
|
||||
Ok(Op::RealtimeConversationStart(
|
||||
ConversationStartParams { .. }
|
||||
))
|
||||
);
|
||||
assert_eq!(
|
||||
chat.realtime_conversation_state,
|
||||
RealtimeConversationUiState::Working
|
||||
);
|
||||
|
||||
chat.set_feature_enabled(Feature::RealtimeConversation, false);
|
||||
|
||||
assert_eq!(
|
||||
chat.realtime_conversation_state,
|
||||
RealtimeConversationUiState::Stopped
|
||||
);
|
||||
assert_matches!(op_rx.try_recv(), Ok(Op::RealtimeConversationClose));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn realtime_events_update_lifecycle_state() {
|
||||
let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(None).await;
|
||||
chat.set_feature_enabled(Feature::RealtimeConversation, true);
|
||||
|
||||
chat.dispatch_command(SlashCommand::Realtime);
|
||||
assert_matches!(
|
||||
op_rx.try_recv(),
|
||||
Ok(Op::RealtimeConversationStart(
|
||||
ConversationStartParams { .. }
|
||||
))
|
||||
);
|
||||
assert_eq!(
|
||||
chat.realtime_conversation_state,
|
||||
RealtimeConversationUiState::Working
|
||||
);
|
||||
|
||||
chat.handle_codex_event(Event {
|
||||
id: "rt-started".into(),
|
||||
msg: EventMsg::RealtimeConversationStarted(RealtimeConversationStartedEvent {
|
||||
session_id: Some("sess-123".to_string()),
|
||||
}),
|
||||
});
|
||||
assert_eq!(
|
||||
chat.realtime_conversation_state,
|
||||
RealtimeConversationUiState::Working
|
||||
);
|
||||
|
||||
chat.handle_codex_event(Event {
|
||||
id: "rt-audio".into(),
|
||||
msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::AudioOut(RealtimeAudioFrame {
|
||||
data: String::new(),
|
||||
sample_rate: 24_000,
|
||||
num_channels: 1,
|
||||
samples_per_channel: Some(0),
|
||||
}),
|
||||
}),
|
||||
});
|
||||
assert_eq!(
|
||||
chat.realtime_conversation_state,
|
||||
RealtimeConversationUiState::Working
|
||||
);
|
||||
|
||||
chat.handle_codex_event(Event {
|
||||
id: "rt-closed".into(),
|
||||
msg: EventMsg::RealtimeConversationClosed(RealtimeConversationClosedEvent {
|
||||
reason: Some("requested".to_string()),
|
||||
}),
|
||||
});
|
||||
assert_eq!(
|
||||
chat.realtime_conversation_state,
|
||||
RealtimeConversationUiState::Stopped
|
||||
);
|
||||
|
||||
let rendered = drain_insert_history(&mut rx)
|
||||
.iter()
|
||||
.map(|lines| lines_to_single_string(lines))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
assert!(
|
||||
rendered.contains("Starting realtime conversation..."),
|
||||
"expected start status message, got {rendered:?}"
|
||||
);
|
||||
assert!(
|
||||
rendered.contains("Realtime conversation started (sess-123)."),
|
||||
"expected started status message, got {rendered:?}"
|
||||
);
|
||||
assert!(
|
||||
rendered.contains("Realtime conversation closed (requested)."),
|
||||
"expected closed status message, got {rendered:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn live_user_message_renders_only_during_realtime_mode() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
|
||||
|
||||
let user_message_event = || {
|
||||
EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "delegated user text".to_string(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
})
|
||||
};
|
||||
|
||||
chat.handle_codex_event(Event {
|
||||
id: "live-user-normal".into(),
|
||||
msg: user_message_event(),
|
||||
});
|
||||
assert!(
|
||||
drain_insert_history(&mut rx).is_empty(),
|
||||
"expected live user message to remain hidden outside realtime mode"
|
||||
);
|
||||
|
||||
chat.realtime_conversation_state = RealtimeConversationUiState::Working;
|
||||
chat.handle_codex_event(Event {
|
||||
id: "live-user-realtime".into(),
|
||||
msg: user_message_event(),
|
||||
});
|
||||
|
||||
let rendered = drain_insert_history(&mut rx)
|
||||
.iter()
|
||||
.map(|lines| lines_to_single_string(lines))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
assert!(
|
||||
rendered.contains("delegated user text"),
|
||||
"expected live user message in realtime mode, got {rendered:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn slash_exit_requests_exit() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
|
||||
|
||||
@@ -93,6 +93,7 @@ pub mod onboarding;
|
||||
mod oss_selection;
|
||||
mod pager_overlay;
|
||||
pub mod public_widgets;
|
||||
mod realtime_audio;
|
||||
mod render;
|
||||
mod resume_picker;
|
||||
mod selection_list;
|
||||
@@ -116,7 +117,10 @@ mod updates;
|
||||
mod version;
|
||||
#[cfg(all(not(target_os = "linux"), feature = "voice-input"))]
|
||||
mod voice;
|
||||
#[cfg(all(not(target_os = "linux"), not(feature = "voice-input")))]
|
||||
#[cfg(any(
|
||||
target_os = "linux",
|
||||
all(not(target_os = "linux"), not(feature = "voice-input"))
|
||||
))]
|
||||
mod voice {
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event_sender::AppEventSender;
|
||||
|
||||
569
codex-rs/tui/src/realtime_audio.rs
Normal file
569
codex-rs/tui/src/realtime_audio.rs
Normal file
@@ -0,0 +1,569 @@
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use base64::Engine;
|
||||
use codex_protocol::protocol::ConversationAudioParams;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::RealtimeAudioFrame;
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
use cpal::traits::DeviceTrait;
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
use cpal::traits::HostTrait;
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
use cpal::traits::StreamTrait;
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::AtomicU16;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tracing::warn;
|
||||
/// Sample rate (Hz) microphone audio is resampled to and stamped on every
/// outgoing microphone frame.
const TARGET_SAMPLE_RATE: u32 = 24_000;
/// Channel count stamped on every outgoing microphone frame (mono).
const TARGET_NUM_CHANNELS: u16 = 1;
/// Number of samples carried by each outgoing microphone frame.
const TARGET_SAMPLES_PER_CHANNEL: u32 = 480;
/// Seconds of audio used to size the playback queue limit.
const PLAYBACK_BUFFER_SECONDS: usize = 5;
// NOTE(review): usage is outside this part of the file; presumably the limit
// on a decoded playback frame's size — confirm.
const MAX_AUDIO_FRAME_DECODED_BYTES: usize = 128 * 1024;
/// Incoming playback frames whose base64 payload is longer than this are
/// dropped before decoding.
const MAX_AUDIO_FRAME_ENCODED_BYTES: usize = 192 * 1024;
// NOTE(review): usage is outside this part of the file; presumably the lowest
// accepted sample rate for incoming playback frames — confirm.
const MIN_REALTIME_PLAYBACK_SAMPLE_RATE: u32 = 8_000;
// NOTE(review): usage is outside this part of the file; presumably the highest
// accepted sample rate for incoming playback frames — confirm.
const MAX_REALTIME_PLAYBACK_SAMPLE_RATE: u32 = 192_000;
// NOTE(review): usage is outside this part of the file; presumably a cap on
// the samples produced when resampling one playback frame — confirm.
const MAX_RESAMPLED_MONO_SAMPLES_PER_FRAME: usize = 480_000;
|
||||
|
||||
pub(crate) struct RealtimeAudioController {
|
||||
backend: RealtimeAudioBackend,
|
||||
}
|
||||
|
||||
/// Shared handles a microphone level meter can read.
pub(crate) struct RealtimeMicMeterHandles {
    /// Peak value shared with the microphone capture.
    pub(crate) last_peak: Arc<AtomicU16>,
    /// Stop flag shared with the microphone capture; the input polling thread
    /// exits once it reads `true`.
    pub(crate) stop: Arc<AtomicBool>,
}
|
||||
|
||||
/// Backend selected when a `RealtimeAudioController` is started.
enum RealtimeAudioBackend {
    /// Real microphone capture and speaker playback; not compiled on Linux.
    #[cfg(not(target_os = "linux"))]
    Live(LiveRealtimeAudioController),
    /// No audio IO at all; used in test builds and on Linux.
    Stub,
}
|
||||
struct LiveRealtimeAudioController {
|
||||
input_capture: crate::voice::VoiceCapture,
|
||||
input_capture_thread: Option<thread::JoinHandle<()>>,
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
output_stream: cpal::Stream,
|
||||
playback_state: Arc<Mutex<PlaybackState>>,
|
||||
last_input_peak: Arc<AtomicU16>,
|
||||
meter_stop: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl RealtimeAudioController {
|
||||
pub(crate) fn start(realtime_audio_op_tx: Sender<Op>) -> Result<Self> {
|
||||
if cfg!(test) {
|
||||
drop(realtime_audio_op_tx);
|
||||
return Ok(Self {
|
||||
backend: RealtimeAudioBackend::Stub,
|
||||
});
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
drop(realtime_audio_op_tx);
|
||||
return Ok(Self {
|
||||
backend: RealtimeAudioBackend::Stub,
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let host = cpal::default_host();
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let output_device = host
|
||||
.default_output_device()
|
||||
.context("no default output device available")?;
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let output_supported = output_device
|
||||
.default_output_config()
|
||||
.context("failed to query default output config")?;
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let output_config = output_supported.config();
|
||||
// TODO(aibrahim): Add persisted audio device + sample-rate selection/config for TUI
|
||||
// realtime conversations instead of always using defaults.
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let input_capture = crate::voice::VoiceCapture::start().map_err(anyhow::Error::msg)?;
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let source_sample_rate = input_capture.sample_rate();
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let source_channels = input_capture.channels();
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
if source_sample_rate == 0 || source_channels == 0 {
|
||||
return Err(anyhow::anyhow!(
|
||||
"unsupported realtime microphone format from VoiceCapture"
|
||||
));
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let source_data = input_capture.data_arc();
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let last_input_peak = input_capture.last_peak_arc();
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let meter_stop = input_capture.stopped_flag();
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let mic_state = Arc::new(Mutex::new(MicCaptureState::new(
|
||||
realtime_audio_op_tx,
|
||||
source_sample_rate,
|
||||
source_channels,
|
||||
)));
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let source_data_thread = source_data;
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let input_state = Arc::clone(&mic_state);
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let input_capture_thread = thread::spawn({
|
||||
let stop = Arc::clone(&meter_stop);
|
||||
move || {
|
||||
while !stop.load(Ordering::Relaxed) {
|
||||
let samples = match source_data_thread.lock() {
|
||||
Ok(mut data) => {
|
||||
if data.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(std::mem::take(&mut *data))
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("failed to lock realtime microphone buffer");
|
||||
None
|
||||
}
|
||||
};
|
||||
if let Some(samples) = samples {
|
||||
if let Ok(mut state) = input_state.lock() {
|
||||
state.push_input_samples_i16(&samples);
|
||||
} else {
|
||||
warn!("failed to lock realtime microphone processing state");
|
||||
}
|
||||
}
|
||||
thread::sleep(Duration::from_millis(5));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let playback_state = Arc::new(Mutex::new(PlaybackState::new(
|
||||
output_config.sample_rate.0,
|
||||
output_config.channels,
|
||||
)));
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let output_stream = build_output_stream(
|
||||
&output_device,
|
||||
output_supported.sample_format(),
|
||||
&output_config,
|
||||
Arc::clone(&playback_state),
|
||||
)
|
||||
.context("failed to open speaker output stream")?;
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
output_stream
|
||||
.play()
|
||||
.context("failed to start speaker output stream")?;
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
Ok(Self {
|
||||
backend: RealtimeAudioBackend::Live(LiveRealtimeAudioController {
|
||||
input_capture,
|
||||
input_capture_thread: Some(input_capture_thread),
|
||||
output_stream,
|
||||
playback_state,
|
||||
last_input_peak,
|
||||
meter_stop,
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn enqueue_audio_out(&self, frame: RealtimeAudioFrame) -> Result<()> {
|
||||
match &self.backend {
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
RealtimeAudioBackend::Live(controller) => {
|
||||
let mut state = controller
|
||||
.playback_state
|
||||
.lock()
|
||||
.map_err(|_| anyhow::anyhow!("playback state lock poisoned"))?;
|
||||
state.enqueue(frame)?;
|
||||
}
|
||||
RealtimeAudioBackend::Stub => {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn shutdown(self) {
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
if let RealtimeAudioBackend::Live(controller) = self.backend {
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
if let Err(err) = controller.input_capture.stop().map(|_| ()) {
|
||||
warn!("failed to stop realtime microphone capture: {err}");
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
if let Some(handle) = controller.input_capture_thread {
|
||||
if let Err(err) = handle.join() {
|
||||
warn!("failed to join realtime microphone input thread: {err:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn meter_handles(&self) -> Option<RealtimeMicMeterHandles> {
|
||||
match &self.backend {
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
RealtimeAudioBackend::Live(controller) => Some(RealtimeMicMeterHandles {
|
||||
last_peak: Arc::clone(&controller.last_input_peak),
|
||||
stop: Arc::clone(&controller.meter_stop),
|
||||
}),
|
||||
RealtimeAudioBackend::Stub => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
#[derive(Debug)]
|
||||
struct MicCaptureState {
|
||||
realtime_audio_op_tx: Sender<Op>,
|
||||
source_sample_rate: u32,
|
||||
source_channels: u16,
|
||||
source_mono: Vec<f32>,
|
||||
source_position: f64,
|
||||
resampled: Vec<f32>,
|
||||
}
|
||||
impl MicCaptureState {
|
||||
fn new(
|
||||
realtime_audio_op_tx: Sender<Op>,
|
||||
source_sample_rate: u32,
|
||||
source_channels: u16,
|
||||
) -> Self {
|
||||
Self {
|
||||
realtime_audio_op_tx,
|
||||
source_sample_rate,
|
||||
source_channels,
|
||||
source_mono: Vec::new(),
|
||||
source_position: 0.0,
|
||||
resampled: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn push_input_samples_i16(&mut self, data: &[i16]) {
|
||||
self.push_mono_samples_from_frames(
|
||||
data.iter().map(|sample| *sample as f32 / i16::MAX as f32),
|
||||
);
|
||||
}
|
||||
|
||||
fn push_mono_samples_from_frames<I>(&mut self, mut samples: I)
|
||||
where
|
||||
I: Iterator<Item = f32>,
|
||||
{
|
||||
let channels = usize::from(self.source_channels.max(1));
|
||||
loop {
|
||||
let mut sum = 0.0f32;
|
||||
let mut count = 0usize;
|
||||
for _ in 0..channels {
|
||||
let Some(sample) = samples.next() else {
|
||||
self.process_and_send_ready_frames();
|
||||
return;
|
||||
};
|
||||
sum += sample;
|
||||
count += 1;
|
||||
}
|
||||
self.source_mono.push(sum / count as f32);
|
||||
}
|
||||
}
|
||||
|
||||
fn process_and_send_ready_frames(&mut self) {
|
||||
if self.source_mono.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let step = self.source_sample_rate as f64 / TARGET_SAMPLE_RATE as f64;
|
||||
while self.source_position + 1.0 < self.source_mono.len() as f64 {
|
||||
let idx = self.source_position.floor() as usize;
|
||||
let frac = (self.source_position - idx as f64) as f32;
|
||||
let a = self.source_mono[idx];
|
||||
let b = self.source_mono[idx + 1];
|
||||
self.resampled.push(a + (b - a) * frac);
|
||||
self.source_position += step;
|
||||
}
|
||||
|
||||
let consumed = self.source_position.floor() as usize;
|
||||
if consumed > 0 {
|
||||
self.source_mono.drain(..consumed);
|
||||
self.source_position -= consumed as f64;
|
||||
}
|
||||
|
||||
let chunk_len = TARGET_SAMPLES_PER_CHANNEL as usize;
|
||||
while self.resampled.len() >= chunk_len {
|
||||
let samples: Vec<f32> = self.resampled.drain(..chunk_len).collect();
|
||||
let data = encode_pcm16_le_base64(&samples);
|
||||
let op = Op::RealtimeConversationAudio(ConversationAudioParams {
|
||||
frame: RealtimeAudioFrame {
|
||||
data,
|
||||
sample_rate: TARGET_SAMPLE_RATE,
|
||||
num_channels: TARGET_NUM_CHANNELS,
|
||||
samples_per_channel: Some(TARGET_SAMPLES_PER_CHANNEL),
|
||||
},
|
||||
});
|
||||
match self.realtime_audio_op_tx.try_send(op) {
|
||||
Ok(()) => {}
|
||||
Err(TrySendError::Full(_)) => {
|
||||
warn!("dropping realtime microphone frame due to full TUI audio queue");
|
||||
}
|
||||
Err(TrySendError::Closed(_)) => {
|
||||
warn!("failed to send realtime microphone frame: channel closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
#[derive(Debug)]
|
||||
struct PlaybackState {
|
||||
output_sample_rate: u32,
|
||||
output_channels: u16,
|
||||
queue: VecDeque<f32>,
|
||||
max_queue_samples: usize,
|
||||
}
|
||||
impl PlaybackState {
|
||||
fn new(output_sample_rate: u32, output_channels: u16) -> Self {
|
||||
let max_queue_samples =
|
||||
output_sample_rate as usize * usize::from(output_channels) * PLAYBACK_BUFFER_SECONDS;
|
||||
Self {
|
||||
output_sample_rate,
|
||||
output_channels,
|
||||
queue: VecDeque::new(),
|
||||
max_queue_samples,
|
||||
}
|
||||
}
|
||||
|
||||
fn enqueue(&mut self, frame: RealtimeAudioFrame) -> Result<()> {
|
||||
if frame.num_channels == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
if frame.data.len() > MAX_AUDIO_FRAME_ENCODED_BYTES {
|
||||
warn!(
|
||||
encoded_len = frame.data.len(),
|
||||
"dropping oversized realtime audio frame before base64 decode"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
let decoded = base64::engine::general_purpose::STANDARD
|
||||
.decode(frame.data.as_bytes())
|
||||
.context("failed to decode realtime audio base64")?;
|
||||
if decoded.len() > MAX_AUDIO_FRAME_DECODED_BYTES {
|
||||
warn!(
|
||||
decoded_len = decoded.len(),
|
||||
"dropping oversized realtime audio frame after base64 decode"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
if decoded.len() % 2 != 0 {
|
||||
return Err(anyhow::anyhow!(
|
||||
"realtime audio payload has odd byte length"
|
||||
));
|
||||
}
|
||||
|
||||
let pcm: Vec<i16> = decoded
|
||||
.chunks_exact(2)
|
||||
.map(|chunk| i16::from_le_bytes([chunk[0], chunk[1]]))
|
||||
.collect();
|
||||
if pcm.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mono = interleaved_i16_to_mono_f32(&pcm, frame.num_channels);
|
||||
if frame.sample_rate == 0 {
|
||||
warn!("dropping realtime audio frame with zero sample rate");
|
||||
return Ok(());
|
||||
}
|
||||
if !is_supported_realtime_playback_sample_rate(frame.sample_rate) {
|
||||
warn!(
|
||||
sample_rate = frame.sample_rate,
|
||||
min_sample_rate = MIN_REALTIME_PLAYBACK_SAMPLE_RATE,
|
||||
max_sample_rate = MAX_REALTIME_PLAYBACK_SAMPLE_RATE,
|
||||
"dropping realtime audio frame with unsupported sample rate"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let Some(resampled) =
|
||||
resample_linear_mono(&mono, frame.sample_rate, self.output_sample_rate.max(1))
|
||||
else {
|
||||
warn!(
|
||||
src_rate = frame.sample_rate,
|
||||
dst_rate = self.output_sample_rate.max(1),
|
||||
max_samples = MAX_RESAMPLED_MONO_SAMPLES_PER_FRAME,
|
||||
"dropping realtime audio frame due to oversized resampled output"
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
for sample in resampled {
|
||||
for _ in 0..self.output_channels {
|
||||
self.queue.push_back(sample);
|
||||
}
|
||||
}
|
||||
|
||||
if self.queue.len() > self.max_queue_samples {
|
||||
let drop_count = self.queue.len() - self.max_queue_samples;
|
||||
self.queue.drain(..drop_count);
|
||||
warn!("dropping old playback samples due to realtime audio buffer overflow");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn next_sample(&mut self) -> f32 {
|
||||
self.queue.pop_front().unwrap_or(0.0)
|
||||
}
|
||||
}
|
||||
/// Downmixes interleaved `i16` PCM into mono `f32` samples.
///
/// Each sample is scaled by `i16::MAX` and every group of `num_channels`
/// samples is averaged. A channel count of zero is treated as one, and a
/// trailing partial group is averaged over the samples it actually contains.
fn interleaved_i16_to_mono_f32(samples: &[i16], num_channels: u16) -> Vec<f32> {
    let group_size = usize::from(num_channels.max(1));
    samples
        .chunks(group_size)
        .map(|group| {
            let total: f32 = group
                .iter()
                .map(|&value| f32::from(value) / f32::from(i16::MAX))
                .sum();
            total / group.len() as f32
        })
        .collect()
}
|
||||
|
||||
fn is_supported_realtime_playback_sample_rate(sample_rate: u32) -> bool {
|
||||
(MIN_REALTIME_PLAYBACK_SAMPLE_RATE..=MAX_REALTIME_PLAYBACK_SAMPLE_RATE).contains(&sample_rate)
|
||||
}
|
||||
|
||||
fn resample_linear_mono(input: &[f32], src_rate: u32, dst_rate: u32) -> Option<Vec<f32>> {
|
||||
if input.is_empty() || src_rate == 0 || dst_rate == 0 {
|
||||
return Some(Vec::new());
|
||||
}
|
||||
if src_rate == dst_rate || input.len() == 1 {
|
||||
return Some(input.to_vec());
|
||||
}
|
||||
|
||||
let out_len = ((input.len() as u64 * dst_rate as u64) / src_rate as u64).max(1);
|
||||
if out_len > MAX_RESAMPLED_MONO_SAMPLES_PER_FRAME as u64 {
|
||||
return None;
|
||||
}
|
||||
let out_len = usize::try_from(out_len).ok()?;
|
||||
let step = src_rate as f64 / dst_rate as f64;
|
||||
let mut pos = 0.0f64;
|
||||
let mut out = Vec::with_capacity(out_len);
|
||||
for _ in 0..out_len {
|
||||
let idx = pos.floor() as usize;
|
||||
if idx + 1 >= input.len() {
|
||||
out.push(*input.last().unwrap_or(&0.0));
|
||||
} else {
|
||||
let frac = (pos - idx as f64) as f32;
|
||||
let a = input[idx];
|
||||
let b = input[idx + 1];
|
||||
out.push(a + (b - a) * frac);
|
||||
}
|
||||
pos += step;
|
||||
}
|
||||
Some(out)
|
||||
}
|
||||
fn encode_pcm16_le_base64(samples: &[f32]) -> String {
|
||||
let mut bytes = Vec::with_capacity(samples.len() * 2);
|
||||
for sample in samples {
|
||||
let clamped = sample.clamp(-1.0, 1.0);
|
||||
let scaled = (clamped * i16::MAX as f32).round() as i16;
|
||||
bytes.extend_from_slice(&scaled.to_le_bytes());
|
||||
}
|
||||
base64::engine::general_purpose::STANDARD.encode(bytes)
|
||||
}
|
||||
/// Builds a speaker output stream on `device` whose callback pulls samples
/// from `playback_state`, converting them to the device's `sample_format`.
///
/// Supports `F32`, `I16`, and `U16`; any other format yields an error, as
/// does a failure to build the stream.
#[cfg(not(target_os = "linux"))]
fn build_output_stream(
    device: &cpal::Device,
    sample_format: cpal::SampleFormat,
    config: &cpal::StreamConfig,
    playback_state: Arc<Mutex<PlaybackState>>,
) -> Result<cpal::Stream> {
    let on_stream_error = |err| warn!("realtime speaker stream error: {err}");
    let built = match sample_format {
        cpal::SampleFormat::F32 => device.build_output_stream(
            config,
            move |buffer: &mut [f32], _| write_output_f32(buffer, &playback_state),
            on_stream_error,
            None,
        ),
        cpal::SampleFormat::I16 => device.build_output_stream(
            config,
            move |buffer: &mut [i16], _| write_output_i16(buffer, &playback_state),
            on_stream_error,
            None,
        ),
        cpal::SampleFormat::U16 => device.build_output_stream(
            config,
            move |buffer: &mut [u16], _| write_output_u16(buffer, &playback_state),
            on_stream_error,
            None,
        ),
        other => anyhow::bail!("unsupported speaker sample format: {other:?}"),
    };
    Ok(built?)
}
|
||||
/// Fills an `f32` output buffer with queued playback samples, unmodified.
#[cfg(not(target_os = "linux"))]
fn write_output_f32(data: &mut [f32], playback_state: &Arc<Mutex<PlaybackState>>) {
    fill_output_buffer(data, playback_state, std::convert::identity);
}
|
||||
/// Fills an `i16` output buffer with queued playback samples, clamping each
/// to `[-1.0, 1.0]` and scaling by `i16::MAX`.
#[cfg(not(target_os = "linux"))]
fn write_output_i16(data: &mut [i16], playback_state: &Arc<Mutex<PlaybackState>>) {
    let to_i16 = |sample: f32| (sample.clamp(-1.0, 1.0) * i16::MAX as f32).round() as i16;
    fill_output_buffer(data, playback_state, to_i16);
}
|
||||
/// Fills a `u16` output buffer with queued playback samples, mapping the
/// clamped `[-1.0, 1.0]` range onto `[0, u16::MAX]`.
#[cfg(not(target_os = "linux"))]
fn write_output_u16(data: &mut [u16], playback_state: &Arc<Mutex<PlaybackState>>) {
    let to_u16 = |sample: f32| {
        // Shift into [0.0, 1.0] before scaling to the unsigned range.
        let unit = (sample.clamp(-1.0, 1.0) + 1.0) * 0.5;
        (unit * u16::MAX as f32).round() as u16
    };
    fill_output_buffer(data, playback_state, to_u16);
}
|
||||
/// Writes one converted playback sample into every slot of `data`.
///
/// The playback lock is taken once for the whole buffer. If the lock is
/// poisoned, every slot receives `convert(0.0)` instead.
#[cfg(not(target_os = "linux"))]
fn fill_output_buffer<T>(
    data: &mut [T],
    playback_state: &Arc<Mutex<PlaybackState>>,
    mut convert: impl FnMut(f32) -> T,
) {
    // `.ok()` discards a poison error (and the guard inside it) immediately,
    // so the fallback path runs without holding the lock.
    match playback_state.lock().ok() {
        Some(mut state) => data.fill_with(|| convert(state.next_sample())),
        None => data.fill_with(|| convert(0.0)),
    }
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::MAX_RESAMPLED_MONO_SAMPLES_PER_FRAME;
    use super::is_supported_realtime_playback_sample_rate;
    use super::resample_linear_mono;
    use pretty_assertions::assert_eq;

    /// Identical source and destination rates return the input unchanged.
    #[test]
    fn resample_linear_passthrough_when_same_rate() {
        let samples = vec![0.1, -0.2, 0.3];
        let resampled = resample_linear_mono(&samples, 24_000, 24_000);
        assert_eq!(resampled, Some(samples));
    }

    /// Checks the accepted range at both boundaries and well outside it.
    #[test]
    fn playback_sample_rate_validation_rejects_unrealistic_rates() {
        let cases = [
            (1, false),
            (7_999, false),
            (8_000, true),
            (24_000, true),
            (192_000, true),
            (192_001, false),
        ];
        for (rate, expected) in cases {
            assert_eq!(
                is_supported_realtime_playback_sample_rate(rate),
                expected,
                "sample rate {rate}"
            );
        }
    }

    /// Doubling just over half the cap would overflow it, so `None` is expected.
    #[test]
    fn resample_linear_returns_none_when_output_would_exceed_cap() {
        let oversized = vec![0.0; (MAX_RESAMPLED_MONO_SAMPLES_PER_FRAME / 2) + 1];
        let resampled = resample_linear_mono(&oversized, 1, 2);
        assert_eq!(resampled, None);
    }
}
|
||||
@@ -20,6 +20,7 @@ pub enum SlashCommand {
|
||||
#[strum(serialize = "sandbox-add-read-dir")]
|
||||
SandboxReadRoot,
|
||||
Experimental,
|
||||
Realtime,
|
||||
Skills,
|
||||
Review,
|
||||
Rename,
|
||||
@@ -79,6 +80,7 @@ impl SlashCommand {
|
||||
SlashCommand::DebugConfig => "show config layers and requirement sources for debugging",
|
||||
SlashCommand::Statusline => "configure which items appear in the status line",
|
||||
SlashCommand::Theme => "choose a syntax highlighting theme",
|
||||
SlashCommand::Realtime => "toggle hidden low-level realtime mic/speaker loop",
|
||||
SlashCommand::Ps => "list background terminals",
|
||||
SlashCommand::Clean => "stop all background terminals",
|
||||
SlashCommand::MemoryDrop => "DO NOT USE",
|
||||
@@ -155,6 +157,7 @@ impl SlashCommand {
|
||||
| SlashCommand::Feedback
|
||||
| SlashCommand::Quit
|
||||
| SlashCommand::Exit => true,
|
||||
SlashCommand::Realtime => true,
|
||||
SlashCommand::Rollout => true,
|
||||
SlashCommand::TestApproval => true,
|
||||
SlashCommand::Collab => true,
|
||||
|
||||
Reference in New Issue
Block a user