use crate::app_event::AppEvent; use crate::app_event_sender::AppEventSender; use base64::Engine; use codex_core::auth::AuthCredentialsStoreMode; use codex_core::config::Config; use codex_core::config::find_codex_home; use codex_core::default_client::get_codex_user_agent; use codex_login::AuthMode; use codex_login::CodexAuth; use codex_protocol::protocol::ConversationAudioParams; use codex_protocol::protocol::Op; use codex_protocol::protocol::RealtimeAudioFrame; use cpal::traits::DeviceTrait; use cpal::traits::HostTrait; use cpal::traits::StreamTrait; use hound::SampleFormat; use hound::WavSpec; use hound::WavWriter; use std::collections::VecDeque; use std::io::Cursor; use std::sync::Arc; use std::sync::Mutex; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicU16; use std::sync::atomic::Ordering; use tracing::error; use tracing::info; use tracing::trace; const AUDIO_MODEL: &str = "gpt-4o-mini-transcribe"; const MODEL_AUDIO_SAMPLE_RATE: u32 = 24_000; const MODEL_AUDIO_CHANNELS: u16 = 1; struct TranscriptionAuthContext { mode: AuthMode, bearer_token: String, chatgpt_account_id: Option, chatgpt_base_url: String, } pub struct RecordedAudio { pub data: Vec, pub sample_rate: u32, pub channels: u16, } pub struct VoiceCapture { stream: Option, sample_rate: u32, channels: u16, data: Arc>>, stopped: Arc, last_peak: Arc, } impl VoiceCapture { pub fn start() -> Result { let (device, config) = select_default_input_device_and_config()?; let sample_rate = config.sample_rate().0; let channels = config.channels(); let data: Arc>> = Arc::new(Mutex::new(Vec::new())); let stopped = Arc::new(AtomicBool::new(false)); let last_peak = Arc::new(AtomicU16::new(0)); let stream = build_input_stream(&device, &config, data.clone(), last_peak.clone())?; stream .play() .map_err(|e| format!("failed to start input stream: {e}"))?; Ok(Self { stream: Some(stream), sample_rate, channels, data, stopped, last_peak, }) } pub fn start_realtime(config: &Config, tx: AppEventSender) -> Result { let (device, config) = select_realtime_input_device_and_config(config)?; let sample_rate = config.sample_rate().0; let channels = config.channels(); let data: Arc>> = Arc::new(Mutex::new(Vec::new())); let stopped = Arc::new(AtomicBool::new(false)); let last_peak = Arc::new(AtomicU16::new(0)); let stream = build_realtime_input_stream( &device, &config, sample_rate, channels, tx, last_peak.clone(), )?; stream .play() .map_err(|e| format!("failed to start input stream: {e}"))?; Ok(Self { stream: Some(stream), sample_rate, channels, data, stopped, last_peak, }) } pub fn stop(mut self) -> Result { // Mark stopped so any metering task can exit cleanly. self.stopped.store(true, Ordering::SeqCst); // Dropping the stream stops capture. self.stream.take(); let data = self .data .lock() .map_err(|_| "failed to lock audio buffer".to_string())? .clone(); Ok(RecordedAudio { data, sample_rate: self.sample_rate, channels: self.channels, }) } pub fn data_arc(&self) -> Arc>> { self.data.clone() } pub fn stopped_flag(&self) -> Arc { self.stopped.clone() } pub fn sample_rate(&self) -> u32 { self.sample_rate } pub fn channels(&self) -> u16 { self.channels } pub fn last_peak_arc(&self) -> Arc { self.last_peak.clone() } } pub(crate) struct RecordingMeterState { history: VecDeque, noise_ema: f64, env: f64, } impl RecordingMeterState { pub(crate) fn new() -> Self { let mut history = VecDeque::with_capacity(4); while history.len() < 4 { history.push_back('⠤'); } Self { history, noise_ema: 0.02, env: 0.0, } } pub(crate) fn next_text(&mut self, peak: u16) -> String { const SYMBOLS: [char; 7] = ['⠤', '⠴', '⠶', '⠷', '⡷', '⡿', '⣿']; const ALPHA_NOISE: f64 = 0.05; const ATTACK: f64 = 0.80; const RELEASE: f64 = 0.25; let latest_peak = peak as f64 / (i16::MAX as f64); if latest_peak > self.env { self.env = ATTACK * latest_peak + (1.0 - ATTACK) * self.env; } else { self.env = RELEASE * latest_peak + (1.0 - RELEASE) * self.env; } let rms_approx = self.env * 0.7; self.noise_ema = (1.0 - ALPHA_NOISE) * self.noise_ema + ALPHA_NOISE * rms_approx; let ref_level = self.noise_ema.max(0.01); let fast_signal = 0.8 * latest_peak + 0.2 * self.env; let target = 2.0f64; let raw = (fast_signal / (ref_level * target)).max(0.0); let k = 1.6f64; let compressed = (raw.ln_1p() / k.ln_1p()).min(1.0); let idx = (compressed * (SYMBOLS.len() as f64 - 1.0)) .round() .clamp(0.0, SYMBOLS.len() as f64 - 1.0) as usize; let level_char = SYMBOLS[idx]; if self.history.len() >= 4 { self.history.pop_front(); } self.history.push_back(level_char); let mut text = String::with_capacity(4); for ch in &self.history { text.push(*ch); } text } } pub fn transcribe_async( id: String, audio: RecordedAudio, context: Option, tx: AppEventSender, ) { std::thread::spawn(move || { // Enforce minimum duration to avoid garbage outputs. const MIN_DURATION_SECONDS: f32 = 1.0; let duration_seconds = clip_duration_seconds(&audio); if duration_seconds < MIN_DURATION_SECONDS { let msg = format!( "recording too short ({duration_seconds:.2}s); minimum is {MIN_DURATION_SECONDS:.2}s" ); info!("{msg}"); tx.send(AppEvent::TranscriptionFailed { id, error: msg }); return; } // Encode entire clip as normalized WAV. let wav_bytes = match encode_wav_normalized(&audio) { Ok(b) => b, Err(e) => { error!("failed to encode wav: {e}"); tx.send(AppEvent::TranscriptionFailed { id, error: e }); return; } }; // Run the HTTP request on a small, dedicated runtime. let rt = match tokio::runtime::Runtime::new() { Ok(rt) => rt, Err(e) => { error!("failed to create tokio runtime: {e}"); return; } }; let tx2 = tx.clone(); let id2 = id.clone(); let res: Result = rt .block_on(async move { transcribe_bytes(wav_bytes, context, duration_seconds).await }); match res { Ok(text) => { tx2.send(AppEvent::TranscriptionComplete { id: id2, text }); info!("voice transcription succeeded"); } Err(e) => { error!("voice transcription error: {e}"); tx.send(AppEvent::TranscriptionFailed { id, error: e }); } } }); } // ------------------------- // Voice input helpers // ------------------------- fn select_default_input_device_and_config() -> Result<(cpal::Device, cpal::SupportedStreamConfig), String> { let host = cpal::default_host(); let device = host .default_input_device() .ok_or_else(|| "no input audio device available".to_string())?; let config = crate::audio_device::preferred_input_config(&device)?; Ok((device, config)) } fn select_realtime_input_device_and_config( config: &Config, ) -> Result<(cpal::Device, cpal::SupportedStreamConfig), String> { crate::audio_device::select_configured_input_device_and_config(config) } fn build_input_stream( device: &cpal::Device, config: &cpal::SupportedStreamConfig, data: Arc>>, last_peak: Arc, ) -> Result { match config.sample_format() { cpal::SampleFormat::F32 => device .build_input_stream( &config.clone().into(), move |input: &[f32], _| { let peak = peak_f32(input); last_peak.store(peak, Ordering::Relaxed); if let Ok(mut buf) = data.lock() { for &s in input { buf.push(f32_to_i16(s)); } } }, move |err| error!("audio input error: {err}"), None, ) .map_err(|e| format!("failed to build input stream: {e}")), cpal::SampleFormat::I16 => device .build_input_stream( &config.clone().into(), move |input: &[i16], _| { let peak = peak_i16(input); last_peak.store(peak, Ordering::Relaxed); if let Ok(mut buf) = data.lock() { buf.extend_from_slice(input); } }, move |err| error!("audio input error: {err}"), None, ) .map_err(|e| format!("failed to build input stream: {e}")), cpal::SampleFormat::U16 => device .build_input_stream( &config.clone().into(), move |input: &[u16], _| { if let Ok(mut buf) = data.lock() { let peak = convert_u16_to_i16_and_peak(input, &mut buf); last_peak.store(peak, Ordering::Relaxed); } }, move |err| error!("audio input error: {err}"), None, ) .map_err(|e| format!("failed to build input stream: {e}")), _ => Err("unsupported input sample format".to_string()), } } fn build_realtime_input_stream( device: &cpal::Device, config: &cpal::SupportedStreamConfig, sample_rate: u32, channels: u16, tx: AppEventSender, last_peak: Arc, ) -> Result { match config.sample_format() { cpal::SampleFormat::F32 => device .build_input_stream( &config.clone().into(), move |input: &[f32], _| { let peak = peak_f32(input); last_peak.store(peak, Ordering::Relaxed); let samples = input.iter().copied().map(f32_to_i16).collect::>(); send_realtime_audio_chunk(&tx, samples, sample_rate, channels); }, move |err| error!("audio input error: {err}"), None, ) .map_err(|e| format!("failed to build input stream: {e}")), cpal::SampleFormat::I16 => device .build_input_stream( &config.clone().into(), move |input: &[i16], _| { let peak = peak_i16(input); last_peak.store(peak, Ordering::Relaxed); send_realtime_audio_chunk(&tx, input.to_vec(), sample_rate, channels); }, move |err| error!("audio input error: {err}"), None, ) .map_err(|e| format!("failed to build input stream: {e}")), cpal::SampleFormat::U16 => device .build_input_stream( &config.clone().into(), move |input: &[u16], _| { let mut samples = Vec::with_capacity(input.len()); let peak = convert_u16_to_i16_and_peak(input, &mut samples); last_peak.store(peak, Ordering::Relaxed); send_realtime_audio_chunk(&tx, samples, sample_rate, channels); }, move |err| error!("audio input error: {err}"), None, ) .map_err(|e| format!("failed to build input stream: {e}")), _ => Err("unsupported input sample format".to_string()), } } fn send_realtime_audio_chunk( tx: &AppEventSender, samples: Vec, sample_rate: u32, channels: u16, ) { if samples.is_empty() || sample_rate == 0 || channels == 0 { return; } let samples = if sample_rate == MODEL_AUDIO_SAMPLE_RATE && channels == MODEL_AUDIO_CHANNELS { samples } else { convert_pcm16( &samples, sample_rate, channels, MODEL_AUDIO_SAMPLE_RATE, MODEL_AUDIO_CHANNELS, ) }; if samples.is_empty() { return; } let mut bytes = Vec::with_capacity(samples.len() * 2); for sample in &samples { bytes.extend_from_slice(&sample.to_le_bytes()); } let encoded = base64::engine::general_purpose::STANDARD.encode(bytes); let samples_per_channel = (samples.len() / usize::from(MODEL_AUDIO_CHANNELS)) as u32; tx.send(AppEvent::CodexOp(Op::RealtimeConversationAudio( ConversationAudioParams { frame: RealtimeAudioFrame { data: encoded, sample_rate: MODEL_AUDIO_SAMPLE_RATE, num_channels: MODEL_AUDIO_CHANNELS, samples_per_channel: Some(samples_per_channel), }, }, ))); } #[inline] fn f32_abs_to_u16(x: f32) -> u16 { let peak_u = (x.abs().min(1.0) * i16::MAX as f32) as i32; peak_u.max(0) as u16 } #[inline] fn f32_to_i16(s: f32) -> i16 { (s.clamp(-1.0, 1.0) * i16::MAX as f32) as i16 } fn peak_f32(input: &[f32]) -> u16 { let mut peak: f32 = 0.0; for &s in input { let a = s.abs(); if a > peak { peak = a; } } f32_abs_to_u16(peak) } fn peak_i16(input: &[i16]) -> u16 { let mut peak: i32 = 0; for &s in input { let a = (s as i32).unsigned_abs() as i32; if a > peak { peak = a; } } peak as u16 } fn convert_u16_to_i16_and_peak(input: &[u16], out: &mut Vec) -> u16 { let mut peak: i32 = 0; for &s in input { let v_i16 = (s as i32 - 32768) as i16; let a = (v_i16 as i32).unsigned_abs() as i32; if a > peak { peak = a; } out.push(v_i16); } peak as u16 } // ------------------------- // Realtime audio playback helpers // ------------------------- pub(crate) struct RealtimeAudioPlayer { _stream: cpal::Stream, queue: Arc>>, output_sample_rate: u32, output_channels: u16, } impl RealtimeAudioPlayer { pub(crate) fn start(config: &Config) -> Result { let (device, config) = crate::audio_device::select_configured_output_device_and_config(config)?; let output_sample_rate = config.sample_rate().0; let output_channels = config.channels(); let queue = Arc::new(Mutex::new(VecDeque::new())); let stream = build_output_stream(&device, &config, Arc::clone(&queue))?; stream .play() .map_err(|e| format!("failed to start output stream: {e}"))?; Ok(Self { _stream: stream, queue, output_sample_rate, output_channels, }) } pub(crate) fn enqueue_frame(&self, frame: &RealtimeAudioFrame) -> Result<(), String> { if frame.num_channels == 0 || frame.sample_rate == 0 { return Err("invalid realtime audio frame format".to_string()); } let raw_bytes = base64::engine::general_purpose::STANDARD .decode(&frame.data) .map_err(|e| format!("failed to decode realtime audio: {e}"))?; if raw_bytes.len() % 2 != 0 { return Err("realtime audio frame had odd byte length".to_string()); } let mut pcm = Vec::with_capacity(raw_bytes.len() / 2); for pair in raw_bytes.chunks_exact(2) { pcm.push(i16::from_le_bytes([pair[0], pair[1]])); } let converted = convert_pcm16( &pcm, frame.sample_rate, frame.num_channels, self.output_sample_rate, self.output_channels, ); if converted.is_empty() { return Ok(()); } let mut guard = self .queue .lock() .map_err(|_| "failed to lock output audio queue".to_string())?; // TODO(aibrahim): Cap or trim this queue if we observe producer bursts outrunning playback. guard.extend(converted); Ok(()) } pub(crate) fn clear(&self) { if let Ok(mut guard) = self.queue.lock() { guard.clear(); } } } fn build_output_stream( device: &cpal::Device, config: &cpal::SupportedStreamConfig, queue: Arc>>, ) -> Result { let config_any: cpal::StreamConfig = config.clone().into(); match config.sample_format() { cpal::SampleFormat::F32 => device .build_output_stream( &config_any, move |output: &mut [f32], _| fill_output_f32(output, &queue), move |err| error!("audio output error: {err}"), None, ) .map_err(|e| format!("failed to build f32 output stream: {e}")), cpal::SampleFormat::I16 => device .build_output_stream( &config_any, move |output: &mut [i16], _| fill_output_i16(output, &queue), move |err| error!("audio output error: {err}"), None, ) .map_err(|e| format!("failed to build i16 output stream: {e}")), cpal::SampleFormat::U16 => device .build_output_stream( &config_any, move |output: &mut [u16], _| fill_output_u16(output, &queue), move |err| error!("audio output error: {err}"), None, ) .map_err(|e| format!("failed to build u16 output stream: {e}")), other => Err(format!("unsupported output sample format: {other:?}")), } } fn fill_output_i16(output: &mut [i16], queue: &Arc>>) { if let Ok(mut guard) = queue.lock() { for sample in output { *sample = guard.pop_front().unwrap_or(0); } return; } output.fill(0); } fn fill_output_f32(output: &mut [f32], queue: &Arc>>) { if let Ok(mut guard) = queue.lock() { for sample in output { let v = guard.pop_front().unwrap_or(0); *sample = (v as f32) / (i16::MAX as f32); } return; } output.fill(0.0); } fn fill_output_u16(output: &mut [u16], queue: &Arc>>) { if let Ok(mut guard) = queue.lock() { for sample in output { let v = guard.pop_front().unwrap_or(0); *sample = (v as i32 + 32768).clamp(0, u16::MAX as i32) as u16; } return; } output.fill(32768); } fn convert_pcm16( input: &[i16], input_sample_rate: u32, input_channels: u16, output_sample_rate: u32, output_channels: u16, ) -> Vec { if input.is_empty() || input_channels == 0 || output_channels == 0 { return Vec::new(); } let in_channels = input_channels as usize; let out_channels = output_channels as usize; let in_frames = input.len() / in_channels; if in_frames == 0 { return Vec::new(); } let out_frames = if input_sample_rate == output_sample_rate { in_frames } else { (((in_frames as u64) * (output_sample_rate as u64)) / (input_sample_rate as u64)).max(1) as usize }; let mut out = Vec::with_capacity(out_frames.saturating_mul(out_channels)); for out_frame_idx in 0..out_frames { let src_frame_idx = if out_frames <= 1 || in_frames <= 1 { 0 } else { ((out_frame_idx as u64) * ((in_frames - 1) as u64) / ((out_frames - 1) as u64)) as usize }; let src_start = src_frame_idx.saturating_mul(in_channels); let src = &input[src_start..src_start + in_channels]; match (in_channels, out_channels) { (1, 1) => out.push(src[0]), (1, n) => { for _ in 0..n { out.push(src[0]); } } (n, 1) if n >= 2 => { let sum: i32 = src.iter().map(|s| *s as i32).sum(); out.push((sum / (n as i32)) as i16); } (n, m) if n == m => out.extend_from_slice(src), (n, m) if n > m => out.extend_from_slice(&src[..m]), (n, m) => { out.extend_from_slice(src); let last = *src.last().unwrap_or(&0); for _ in n..m { out.push(last); } } } } out } // ------------------------- // Transcription helpers // ------------------------- fn clip_duration_seconds(audio: &RecordedAudio) -> f32 { let total_samples = audio.data.len() as f32; let samples_per_second = (audio.sample_rate as f32) * (audio.channels as f32); if samples_per_second > 0.0 { total_samples / samples_per_second } else { 0.0 } } fn encode_wav_normalized(audio: &RecordedAudio) -> Result, String> { let converted; let (channels, sample_rate, segment) = if audio.channels == MODEL_AUDIO_CHANNELS && audio.sample_rate == MODEL_AUDIO_SAMPLE_RATE { (audio.channels, audio.sample_rate, audio.data.as_slice()) } else { converted = convert_pcm16( &audio.data, audio.sample_rate, audio.channels, MODEL_AUDIO_SAMPLE_RATE, MODEL_AUDIO_CHANNELS, ); ( MODEL_AUDIO_CHANNELS, MODEL_AUDIO_SAMPLE_RATE, converted.as_slice(), ) }; let mut wav_bytes: Vec = Vec::new(); let spec = WavSpec { channels, sample_rate, bits_per_sample: 16, sample_format: SampleFormat::Int, }; let mut cursor = Cursor::new(&mut wav_bytes); let mut writer = WavWriter::new(&mut cursor, spec).map_err(|_| "failed to create wav writer".to_string())?; // Simple peak normalization with headroom to improve audibility on quiet inputs. let mut peak: i16 = 0; for &s in segment { let a = s.unsigned_abs(); if a > peak.unsigned_abs() { peak = s; } } let peak_abs = (peak as i32).unsigned_abs() as i32; let target = (i16::MAX as f32) * 0.9; // leave some headroom let gain: f32 = if peak_abs > 0 { target / (peak_abs as f32) } else { 1.0 }; for &s in segment { let v = ((s as f32) * gain) .round() .clamp(i16::MIN as f32, i16::MAX as f32) as i16; writer .write_sample(v) .map_err(|_| "failed writing wav sample".to_string())?; } writer .finalize() .map_err(|_| "failed to finalize wav".to_string())?; Ok(wav_bytes) } fn normalize_chatgpt_base_url(input: &str) -> String { let mut base_url = input.to_string(); while base_url.ends_with('/') { base_url.pop(); } if (base_url.starts_with("https://chatgpt.com") || base_url.starts_with("https://chat.openai.com")) && !base_url.contains("/backend-api") { base_url = format!("{base_url}/backend-api"); } base_url } async fn resolve_auth() -> Result { let codex_home = find_codex_home().map_err(|e| format!("failed to find codex home: {e}"))?; let auth = CodexAuth::from_auth_storage(&codex_home, AuthCredentialsStoreMode::Auto) .map_err(|e| format!("failed to read auth.json: {e}"))? .ok_or_else(|| "No Codex auth is configured; please run `codex login`".to_string())?; let chatgpt_account_id = auth.get_account_id(); let token = auth .get_token() .map_err(|e| format!("failed to get auth token: {e}"))?; let config = Config::load_with_cli_overrides(Vec::new()) .await .map_err(|e| format!("failed to load config: {e}"))?; Ok(TranscriptionAuthContext { mode: auth.api_auth_mode(), bearer_token: token, chatgpt_account_id, chatgpt_base_url: normalize_chatgpt_base_url(&config.chatgpt_base_url), }) } async fn transcribe_bytes( wav_bytes: Vec, context: Option, duration_seconds: f32, ) -> Result { let auth = resolve_auth().await?; let client = reqwest::Client::new(); let audio_bytes = wav_bytes.len(); let prompt_for_log = context.as_deref().unwrap_or("").to_string(); let (endpoint, request) = if matches!(auth.mode, AuthMode::Chatgpt | AuthMode::ChatgptAuthTokens) { let part = reqwest::multipart::Part::bytes(wav_bytes) .file_name("audio.wav") .mime_str("audio/wav") .map_err(|e| format!("failed to set mime: {e}"))?; let form = reqwest::multipart::Form::new().part("file", part); let endpoint = format!("{}/transcribe", auth.chatgpt_base_url); let mut req = client .post(&endpoint) .bearer_auth(&auth.bearer_token) .multipart(form) .header("User-Agent", get_codex_user_agent()); if let Some(acc) = auth.chatgpt_account_id { req = req.header("ChatGPT-Account-Id", acc); } (endpoint, req) } else { let part = reqwest::multipart::Part::bytes(wav_bytes) .file_name("audio.wav") .mime_str("audio/wav") .map_err(|e| format!("failed to set mime: {e}"))?; let mut form = reqwest::multipart::Form::new() .text("model", AUDIO_MODEL) .part("file", part); if let Some(context) = context { form = form.text("prompt", context); } let endpoint = "https://api.openai.com/v1/audio/transcriptions".to_string(); ( endpoint, client .post("https://api.openai.com/v1/audio/transcriptions") .bearer_auth(&auth.bearer_token) .multipart(form) .header("User-Agent", get_codex_user_agent()), ) }; let audio_kib = audio_bytes as f32 / 1024.0; let mode = auth.mode; trace!( "sending transcription request: mode={mode:?} endpoint={endpoint} duration={duration_seconds:.2}s audio={audio_kib:.1}KiB prompt={prompt_for_log}" ); let resp = request .send() .await .map_err(|e| format!("transcription request failed: {e}"))?; if !resp.status().is_success() { let status = resp.status(); let body = resp .text() .await .unwrap_or_else(|_| "".to_string()); return Err(format!("transcription failed: {status} {body}")); } let v: serde_json::Value = resp .json() .await .map_err(|e| format!("failed to parse json: {e}"))?; let text = v .get("text") .and_then(|t| t.as_str()) .unwrap_or("") .to_string(); if text.is_empty() { Err("empty transcription result".to_string()) } else { Ok(text) } } #[cfg(test)] mod tests { use super::RecordedAudio; use super::convert_pcm16; use super::encode_wav_normalized; use pretty_assertions::assert_eq; use std::io::Cursor; #[test] fn convert_pcm16_downmixes_and_resamples_for_model_input() { let input = vec![100, 300, 200, 400, 500, 700, 600, 800]; let converted = convert_pcm16(&input, 48_000, 2, 24_000, 1); assert_eq!(converted, vec![200, 700]); } #[test] fn encode_wav_normalized_outputs_24khz_mono_audio() { let audio = RecordedAudio { data: vec![100, 300, 200, 400, 500, 700, 600, 800], sample_rate: 48_000, channels: 2, }; let wav = encode_wav_normalized(&audio).expect("wav should encode"); let reader = hound::WavReader::new(Cursor::new(wav)).expect("wav should parse"); let spec = reader.spec(); let samples = reader .into_samples::() .collect::, _>>() .expect("samples should decode"); assert_eq!(spec.channels, 1); assert_eq!(spec.sample_rate, 24_000); assert_eq!(samples, vec![8_426, 29_490]); } }