mirror of
https://github.com/openai/codex.git
synced 2026-04-08 14:54:47 +00:00
Compare commits
19 Commits
dev/window
...
codex/real
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
077a9b4cfb | ||
|
|
c27c30699b | ||
|
|
548f90dc48 | ||
|
|
57f2d0c7f0 | ||
|
|
b8d83bb0ef | ||
|
|
a49c15ba07 | ||
|
|
de9f3d3835 | ||
|
|
6d61e60a39 | ||
|
|
55994eeaed | ||
|
|
ced98986ed | ||
|
|
847245d9fb | ||
|
|
ed3454c385 | ||
|
|
0e0481ba0f | ||
|
|
51108dccaa | ||
|
|
24ffd8d6f8 | ||
|
|
daf3c8b1f8 | ||
|
|
f4fe74b190 | ||
|
|
5f2320342e | ||
|
|
5eb7e86114 |
@@ -188,7 +188,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
|
||||
read_notification::<ThreadRealtimeClosedNotification>(&mut mcp, "thread/realtime/closed")
|
||||
.await?;
|
||||
assert_eq!(closed.thread_id, output_audio.thread_id);
|
||||
assert_eq!(closed.reason.as_deref(), Some("transport_closed"));
|
||||
assert_eq!(closed.reason.as_deref(), Some("error"));
|
||||
|
||||
let connections = realtime_server.connections();
|
||||
assert_eq!(connections.len(), 1);
|
||||
|
||||
@@ -56,6 +56,18 @@ const REALTIME_STARTUP_CONTEXT_TOKEN_BUDGET: usize = 5_000;
|
||||
const ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX: &str =
|
||||
"Conversation already has an active response in progress:";
|
||||
|
||||
#[derive(Debug)]
|
||||
enum RealtimeConversationEnd {
|
||||
Requested,
|
||||
TransportClosed,
|
||||
Error,
|
||||
}
|
||||
|
||||
enum RealtimeFanoutTaskStop {
|
||||
Abort,
|
||||
Detach,
|
||||
}
|
||||
|
||||
pub(crate) struct RealtimeConversationManager {
|
||||
state: Mutex<Option<ConversationState>>,
|
||||
}
|
||||
@@ -120,7 +132,8 @@ struct ConversationState {
|
||||
user_text_tx: Sender<String>,
|
||||
writer: RealtimeWebsocketWriter,
|
||||
handoff: RealtimeHandoffState,
|
||||
task: JoinHandle<()>,
|
||||
input_task: JoinHandle<()>,
|
||||
fanout_task: Option<JoinHandle<()>>,
|
||||
realtime_active: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
@@ -150,9 +163,7 @@ impl RealtimeConversationManager {
|
||||
guard.take()
|
||||
};
|
||||
if let Some(state) = previous_state {
|
||||
state.realtime_active.store(false, Ordering::Relaxed);
|
||||
state.task.abort();
|
||||
let _ = state.task.await;
|
||||
stop_conversation_state(state, RealtimeFanoutTaskStop::Abort).await;
|
||||
}
|
||||
let session_kind = match session_config.event_parser {
|
||||
RealtimeEventParser::V1 => RealtimeSessionKind::V1,
|
||||
@@ -199,12 +210,48 @@ impl RealtimeConversationManager {
|
||||
user_text_tx,
|
||||
writer,
|
||||
handoff,
|
||||
task,
|
||||
input_task: task,
|
||||
fanout_task: None,
|
||||
realtime_active: Arc::clone(&realtime_active),
|
||||
});
|
||||
Ok((events_rx, realtime_active))
|
||||
}
|
||||
|
||||
pub(crate) async fn register_fanout_task(
|
||||
&self,
|
||||
realtime_active: &Arc<AtomicBool>,
|
||||
fanout_task: JoinHandle<()>,
|
||||
) {
|
||||
let mut fanout_task = Some(fanout_task);
|
||||
{
|
||||
let mut guard = self.state.lock().await;
|
||||
if let Some(state) = guard.as_mut()
|
||||
&& Arc::ptr_eq(&state.realtime_active, realtime_active)
|
||||
{
|
||||
state.fanout_task = fanout_task.take();
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(fanout_task) = fanout_task {
|
||||
fanout_task.abort();
|
||||
let _ = fanout_task.await;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn finish_if_active(&self, realtime_active: &Arc<AtomicBool>) {
|
||||
let state = {
|
||||
let mut guard = self.state.lock().await;
|
||||
match guard.as_ref() {
|
||||
Some(state) if Arc::ptr_eq(&state.realtime_active, realtime_active) => guard.take(),
|
||||
_ => None,
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(state) = state {
|
||||
stop_conversation_state(state, RealtimeFanoutTaskStop::Detach).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn audio_in(&self, frame: RealtimeAudioFrame) -> CodexResult<()> {
|
||||
let sender = {
|
||||
let guard = self.state.lock().await;
|
||||
@@ -332,19 +379,77 @@ impl RealtimeConversationManager {
|
||||
};
|
||||
|
||||
if let Some(state) = state {
|
||||
state.realtime_active.store(false, Ordering::Relaxed);
|
||||
state.task.abort();
|
||||
let _ = state.task.await;
|
||||
stop_conversation_state(state, RealtimeFanoutTaskStop::Abort).await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn stop_conversation_state(
|
||||
mut state: ConversationState,
|
||||
fanout_task_stop: RealtimeFanoutTaskStop,
|
||||
) {
|
||||
state.realtime_active.store(false, Ordering::Relaxed);
|
||||
state.input_task.abort();
|
||||
let _ = state.input_task.await;
|
||||
|
||||
if let Some(fanout_task) = state.fanout_task.take() {
|
||||
match fanout_task_stop {
|
||||
RealtimeFanoutTaskStop::Abort => {
|
||||
fanout_task.abort();
|
||||
let _ = fanout_task.await;
|
||||
}
|
||||
RealtimeFanoutTaskStop::Detach => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_start(
|
||||
sess: &Arc<Session>,
|
||||
sub_id: String,
|
||||
params: ConversationStartParams,
|
||||
) -> CodexResult<()> {
|
||||
let prepared_start = match prepare_realtime_start(sess, params).await {
|
||||
Ok(prepared_start) => prepared_start,
|
||||
Err(err) => {
|
||||
error!("failed to prepare realtime conversation: {err}");
|
||||
let message = err.to_string();
|
||||
sess.send_event_raw(Event {
|
||||
id: sub_id,
|
||||
msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::Error(message),
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = handle_start_inner(sess, &sub_id, prepared_start).await {
|
||||
error!("failed to start realtime conversation: {err}");
|
||||
let message = err.to_string();
|
||||
sess.send_event_raw(Event {
|
||||
id: sub_id.clone(),
|
||||
msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::Error(message),
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct PreparedRealtimeConversationStart {
|
||||
api_provider: ApiProvider,
|
||||
extra_headers: Option<HeaderMap>,
|
||||
requested_session_id: Option<String>,
|
||||
session_config: RealtimeSessionConfig,
|
||||
}
|
||||
|
||||
async fn prepare_realtime_start(
|
||||
sess: &Arc<Session>,
|
||||
params: ConversationStartParams,
|
||||
) -> CodexResult<PreparedRealtimeConversationStart> {
|
||||
let provider = sess.provider().await;
|
||||
let auth = sess.services.auth_manager.auth().await;
|
||||
let realtime_api_key = realtime_api_key(auth.as_ref(), &provider)?;
|
||||
@@ -379,9 +484,7 @@ pub(crate) async fn handle_start(
|
||||
RealtimeWsMode::Conversational => RealtimeSessionMode::Conversational,
|
||||
RealtimeWsMode::Transcription => RealtimeSessionMode::Transcription,
|
||||
};
|
||||
let requested_session_id = params
|
||||
.session_id
|
||||
.or_else(|| Some(sess.conversation_id.to_string()));
|
||||
let requested_session_id = params.session_id.or(Some(sess.conversation_id.to_string()));
|
||||
let session_config = RealtimeSessionConfig {
|
||||
instructions: prompt,
|
||||
model,
|
||||
@@ -391,24 +494,35 @@ pub(crate) async fn handle_start(
|
||||
};
|
||||
let extra_headers =
|
||||
realtime_request_headers(requested_session_id.as_deref(), realtime_api_key.as_str())?;
|
||||
Ok(PreparedRealtimeConversationStart {
|
||||
api_provider,
|
||||
extra_headers,
|
||||
requested_session_id,
|
||||
session_config,
|
||||
})
|
||||
}
|
||||
|
||||
async fn handle_start_inner(
|
||||
sess: &Arc<Session>,
|
||||
sub_id: &str,
|
||||
prepared_start: PreparedRealtimeConversationStart,
|
||||
) -> CodexResult<()> {
|
||||
let PreparedRealtimeConversationStart {
|
||||
api_provider,
|
||||
extra_headers,
|
||||
requested_session_id,
|
||||
session_config,
|
||||
} = prepared_start;
|
||||
info!("starting realtime conversation");
|
||||
let (events_rx, realtime_active) = match sess
|
||||
let (events_rx, realtime_active) = sess
|
||||
.conversation
|
||||
.start(api_provider, extra_headers, session_config)
|
||||
.await
|
||||
{
|
||||
Ok(events_rx) => events_rx,
|
||||
Err(err) => {
|
||||
error!("failed to start realtime conversation: {err}");
|
||||
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::Other).await;
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
.await?;
|
||||
|
||||
info!("realtime conversation started");
|
||||
|
||||
sess.send_event_raw(Event {
|
||||
id: sub_id.clone(),
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::RealtimeConversationStarted(RealtimeConversationStartedEvent {
|
||||
session_id: requested_session_id,
|
||||
}),
|
||||
@@ -416,12 +530,18 @@ pub(crate) async fn handle_start(
|
||||
.await;
|
||||
|
||||
let sess_clone = Arc::clone(sess);
|
||||
tokio::spawn(async move {
|
||||
let sub_id = sub_id.to_string();
|
||||
let fanout_realtime_active = Arc::clone(&realtime_active);
|
||||
let fanout_task = tokio::spawn(async move {
|
||||
let ev = |msg| Event {
|
||||
id: sub_id.clone(),
|
||||
msg,
|
||||
};
|
||||
let mut end = RealtimeConversationEnd::TransportClosed;
|
||||
while let Ok(event) = events_rx.recv().await {
|
||||
if !fanout_realtime_active.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
// if not audio out, log the event
|
||||
if !matches!(event, RealtimeEvent::AudioOut(_)) {
|
||||
info!(
|
||||
@@ -429,6 +549,9 @@ pub(crate) async fn handle_start(
|
||||
"received realtime conversation event"
|
||||
);
|
||||
}
|
||||
if matches!(event, RealtimeEvent::Error(_)) {
|
||||
end = RealtimeConversationEnd::Error;
|
||||
}
|
||||
let maybe_routed_text = match &event {
|
||||
RealtimeEvent::HandoffRequested(handoff) => {
|
||||
realtime_text_from_handoff_request(handoff)
|
||||
@@ -440,6 +563,9 @@ pub(crate) async fn handle_start(
|
||||
let sess_for_routed_text = Arc::clone(&sess_clone);
|
||||
sess_for_routed_text.route_realtime_text_input(text).await;
|
||||
}
|
||||
if !fanout_realtime_active.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
sess_clone
|
||||
.send_event_raw(ev(EventMsg::RealtimeConversationRealtime(
|
||||
RealtimeConversationRealtimeEvent {
|
||||
@@ -448,17 +574,20 @@ pub(crate) async fn handle_start(
|
||||
)))
|
||||
.await;
|
||||
}
|
||||
if realtime_active.swap(false, Ordering::Relaxed) {
|
||||
info!("realtime conversation transport closed");
|
||||
if fanout_realtime_active.swap(false, Ordering::Relaxed) {
|
||||
if matches!(end, RealtimeConversationEnd::TransportClosed) {
|
||||
info!("realtime conversation transport closed");
|
||||
}
|
||||
sess_clone
|
||||
.send_event_raw(ev(EventMsg::RealtimeConversationClosed(
|
||||
RealtimeConversationClosedEvent {
|
||||
reason: Some("transport_closed".to_string()),
|
||||
},
|
||||
)))
|
||||
.conversation
|
||||
.finish_if_active(&fanout_realtime_active)
|
||||
.await;
|
||||
send_realtime_conversation_closed(&sess_clone, sub_id, end).await;
|
||||
}
|
||||
});
|
||||
sess.conversation
|
||||
.register_fanout_task(&realtime_active, fanout_task)
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -470,7 +599,12 @@ pub(crate) async fn handle_audio(
|
||||
) {
|
||||
if let Err(err) = sess.conversation.audio_in(params.frame).await {
|
||||
error!("failed to append realtime audio: {err}");
|
||||
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await;
|
||||
if sess.conversation.running_state().await.is_some() {
|
||||
warn!("realtime audio input failed while the session was already ending");
|
||||
} else {
|
||||
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -478,14 +612,12 @@ fn realtime_text_from_handoff_request(handoff: &RealtimeHandoffRequested) -> Opt
|
||||
let active_transcript = handoff
|
||||
.active_transcript
|
||||
.iter()
|
||||
.map(|entry| format!("{}: {}", entry.role, entry.text))
|
||||
.map(|entry| format!("{role}: {text}", role = entry.role, text = entry.text))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
(!active_transcript.is_empty())
|
||||
.then_some(active_transcript)
|
||||
.or_else(|| {
|
||||
(!handoff.input_transcript.is_empty()).then(|| handoff.input_transcript.clone())
|
||||
})
|
||||
.or((!handoff.input_transcript.is_empty()).then_some(handoff.input_transcript.clone()))
|
||||
}
|
||||
|
||||
fn realtime_api_key(
|
||||
@@ -545,25 +677,17 @@ pub(crate) async fn handle_text(
|
||||
debug!(text = %params.text, "[realtime-text] appending realtime conversation text input");
|
||||
if let Err(err) = sess.conversation.text_in(params.text).await {
|
||||
error!("failed to append realtime text: {err}");
|
||||
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await;
|
||||
if sess.conversation.running_state().await.is_some() {
|
||||
warn!("realtime text input failed while the session was already ending");
|
||||
} else {
|
||||
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_close(sess: &Arc<Session>, sub_id: String) {
|
||||
match sess.conversation.shutdown().await {
|
||||
Ok(()) => {
|
||||
sess.send_event_raw(Event {
|
||||
id: sub_id,
|
||||
msg: EventMsg::RealtimeConversationClosed(RealtimeConversationClosedEvent {
|
||||
reason: Some("requested".to_string()),
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::Other).await;
|
||||
}
|
||||
}
|
||||
end_realtime_conversation(sess, sub_id, RealtimeConversationEnd::Requested).await;
|
||||
}
|
||||
|
||||
fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
|
||||
@@ -591,6 +715,9 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
|
||||
if let Err(err) = writer.send_conversation_item_create(text).await {
|
||||
let mapped_error = map_api_error(err);
|
||||
warn!("failed to send input text: {mapped_error}");
|
||||
let _ = events_tx
|
||||
.send(RealtimeEvent::Error(mapped_error.to_string()))
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
if matches!(session_kind, RealtimeSessionKind::V2) {
|
||||
@@ -599,6 +726,9 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
|
||||
} else if let Err(err) = writer.send_response_create().await {
|
||||
let mapped_error = map_api_error(err);
|
||||
warn!("failed to send text response.create: {mapped_error}");
|
||||
let _ = events_tx
|
||||
.send(RealtimeEvent::Error(mapped_error.to_string()))
|
||||
.await;
|
||||
break;
|
||||
} else {
|
||||
pending_response_create = false;
|
||||
@@ -623,6 +753,9 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
|
||||
{
|
||||
let mapped_error = map_api_error(err);
|
||||
warn!("failed to send handoff output: {mapped_error}");
|
||||
let _ = events_tx
|
||||
.send(RealtimeEvent::Error(mapped_error.to_string()))
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -636,6 +769,9 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
|
||||
{
|
||||
let mapped_error = map_api_error(err);
|
||||
warn!("failed to send handoff output: {mapped_error}");
|
||||
let _ = events_tx
|
||||
.send(RealtimeEvent::Error(mapped_error.to_string()))
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
if matches!(session_kind, RealtimeSessionKind::V2) {
|
||||
@@ -646,6 +782,9 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
|
||||
warn!(
|
||||
"failed to send handoff response.create: {mapped_error}"
|
||||
);
|
||||
let _ = events_tx
|
||||
.send(RealtimeEvent::Error(mapped_error.to_string()))
|
||||
.await;
|
||||
break;
|
||||
} else {
|
||||
pending_response_create = false;
|
||||
@@ -683,6 +822,11 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
|
||||
warn!(
|
||||
"failed to send deferred response.create: {mapped_error}"
|
||||
);
|
||||
let _ = events_tx
|
||||
.send(RealtimeEvent::Error(
|
||||
mapped_error.to_string(),
|
||||
))
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
pending_response_create = false;
|
||||
@@ -730,6 +874,9 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
|
||||
warn!(
|
||||
"failed to send deferred response.create after cancellation: {mapped_error}"
|
||||
);
|
||||
let _ = events_tx
|
||||
.send(RealtimeEvent::Error(mapped_error.to_string()))
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
pending_response_create = false;
|
||||
@@ -771,11 +918,6 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
let _ = events_tx
|
||||
.send(RealtimeEvent::Error(
|
||||
"realtime websocket connection is closed".to_string(),
|
||||
))
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -798,6 +940,9 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
|
||||
if let Err(err) = writer.send_audio_frame(frame).await {
|
||||
let mapped_error = map_api_error(err);
|
||||
error!("failed to send input audio: {mapped_error}");
|
||||
let _ = events_tx
|
||||
.send(RealtimeEvent::Error(mapped_error.to_string()))
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -837,7 +982,7 @@ fn update_output_audio_state(
|
||||
fn audio_duration_ms(frame: &RealtimeAudioFrame) -> u32 {
|
||||
let Some(samples_per_channel) = frame
|
||||
.samples_per_channel
|
||||
.or_else(|| decoded_samples_per_channel(frame))
|
||||
.or(decoded_samples_per_channel(frame))
|
||||
else {
|
||||
return 0;
|
||||
};
|
||||
@@ -868,6 +1013,33 @@ async fn send_conversation_error(
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn end_realtime_conversation(
|
||||
sess: &Arc<Session>,
|
||||
sub_id: String,
|
||||
end: RealtimeConversationEnd,
|
||||
) {
|
||||
let _ = sess.conversation.shutdown().await;
|
||||
send_realtime_conversation_closed(sess, sub_id, end).await;
|
||||
}
|
||||
|
||||
async fn send_realtime_conversation_closed(
|
||||
sess: &Arc<Session>,
|
||||
sub_id: String,
|
||||
end: RealtimeConversationEnd,
|
||||
) {
|
||||
let reason = match end {
|
||||
RealtimeConversationEnd::Requested => Some("requested".to_string()),
|
||||
RealtimeConversationEnd::TransportClosed => Some("transport_closed".to_string()),
|
||||
RealtimeConversationEnd::Error => Some("error".to_string()),
|
||||
};
|
||||
|
||||
sess.send_event_raw(Event {
|
||||
id: sub_id,
|
||||
msg: EventMsg::RealtimeConversationClosed(RealtimeConversationClosedEvent { reason }),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "realtime_conversation_tests.rs"]
|
||||
mod tests;
|
||||
|
||||
@@ -29,15 +29,17 @@ use core_test_support::wait_for_event_match;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use serial_test::serial;
|
||||
use std::ffi::OsString;
|
||||
use std::fs;
|
||||
use std::process::Command;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
|
||||
const MEMORY_PROMPT_PHRASE: &str =
|
||||
"You have access to a memory folder with guidance from prior runs.";
|
||||
const REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR: &str =
|
||||
"CODEX_REALTIME_CONVERSATION_TEST_SUBPROCESS";
|
||||
fn websocket_request_text(
|
||||
request: &core_test_support::responses::WebSocketRequest,
|
||||
) -> Option<String> {
|
||||
@@ -81,6 +83,33 @@ where
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
fn run_realtime_conversation_test_in_subprocess(
|
||||
test_name: &str,
|
||||
openai_api_key: Option<&str>,
|
||||
) -> Result<()> {
|
||||
let mut command = Command::new(std::env::current_exe()?);
|
||||
command
|
||||
.arg("--exact")
|
||||
.arg(test_name)
|
||||
.env(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR, "1");
|
||||
match openai_api_key {
|
||||
Some(openai_api_key) => {
|
||||
command.env(OPENAI_API_KEY_ENV_VAR, openai_api_key);
|
||||
}
|
||||
None => {
|
||||
command.env_remove(OPENAI_API_KEY_ENV_VAR);
|
||||
}
|
||||
}
|
||||
let output = command.output()?;
|
||||
assert!(
|
||||
output.status.success(),
|
||||
"subprocess test `{test_name}` failed\nstdout:\n{}\nstderr:\n{}",
|
||||
String::from_utf8_lossy(&output.stdout),
|
||||
String::from_utf8_lossy(&output.stderr),
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
async fn seed_recent_thread(
|
||||
test: &TestCodex,
|
||||
title: &str,
|
||||
@@ -258,11 +287,16 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[serial(openai_api_key_env)]
|
||||
async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() -> Result<()> {
|
||||
if std::env::var_os(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR).is_none() {
|
||||
return run_realtime_conversation_test_in_subprocess(
|
||||
"suite::realtime_conversation::conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth",
|
||||
Some("env-realtime-key"),
|
||||
);
|
||||
}
|
||||
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let _env_guard = EnvGuard::set(OPENAI_API_KEY_ENV_VAR, "env-realtime-key");
|
||||
let server = start_websocket_server(vec![
|
||||
vec![],
|
||||
vec![vec![json!({
|
||||
@@ -367,34 +401,6 @@ async fn conversation_transport_close_emits_closed_event() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct EnvGuard {
|
||||
key: &'static str,
|
||||
original: Option<OsString>,
|
||||
}
|
||||
|
||||
impl EnvGuard {
|
||||
fn set(key: &'static str, value: &str) -> Self {
|
||||
let original = std::env::var_os(key);
|
||||
// SAFETY: this guard restores the original value before the test exits.
|
||||
unsafe {
|
||||
std::env::set_var(key, value);
|
||||
}
|
||||
Self { key, original }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for EnvGuard {
|
||||
fn drop(&mut self) {
|
||||
// SAFETY: this guard restores the original value for the modified env var.
|
||||
unsafe {
|
||||
match &self.original {
|
||||
Some(value) => std::env::set_var(self.key, value),
|
||||
None => std::env::remove_var(self.key),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_audio_before_start_emits_error() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
@@ -427,6 +433,91 @@ async fn conversation_audio_before_start_emits_error() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_start_preflight_failure_emits_realtime_error_only() -> Result<()> {
|
||||
if std::env::var_os(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR).is_none() {
|
||||
return run_realtime_conversation_test_in_subprocess(
|
||||
"suite::realtime_conversation::conversation_start_preflight_failure_emits_realtime_error_only",
|
||||
None,
|
||||
);
|
||||
}
|
||||
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![]).await;
|
||||
let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let err = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::Error(message),
|
||||
}) => Some(message.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
assert_eq!(err, "realtime conversation requires API key auth");
|
||||
|
||||
let closed = timeout(Duration::from_millis(200), async {
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
})
|
||||
.await;
|
||||
assert!(closed.is_err(), "preflight failure should not emit closed");
|
||||
|
||||
server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_start_connect_failure_emits_realtime_error_only() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![]).await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.experimental_realtime_ws_base_url = Some("http://127.0.0.1:1".to_string());
|
||||
});
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let err = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::Error(message),
|
||||
}) => Some(message.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
assert!(!err.is_empty());
|
||||
|
||||
let closed = timeout(Duration::from_millis(200), async {
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
})
|
||||
.await;
|
||||
assert!(closed.is_err(), "connect failure should not emit closed");
|
||||
|
||||
server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_text_before_start_emits_error() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -7866,7 +7866,6 @@ impl ChatWidget {
|
||||
self.request_realtime_conversation_close(Some(
|
||||
"Realtime voice mode was closed because the feature was disabled.".to_string(),
|
||||
));
|
||||
self.reset_realtime_conversation_state();
|
||||
}
|
||||
}
|
||||
if feature == Feature::FastMode {
|
||||
@@ -8053,7 +8052,7 @@ impl ChatWidget {
|
||||
}
|
||||
|
||||
pub(crate) fn realtime_conversation_is_live(&self) -> bool {
|
||||
self.realtime_conversation.is_active()
|
||||
self.realtime_conversation.is_live()
|
||||
}
|
||||
|
||||
fn current_realtime_audio_device_name(&self, kind: RealtimeAudioDeviceKind) -> Option<String> {
|
||||
@@ -8665,6 +8664,13 @@ impl ChatWidget {
|
||||
}
|
||||
return;
|
||||
}
|
||||
if self.realtime_conversation.is_live() {
|
||||
self.bottom_pane.clear_quit_shortcut_hint();
|
||||
self.quit_shortcut_expires_at = None;
|
||||
self.quit_shortcut_key = None;
|
||||
self.request_realtime_conversation_close(/*info_message*/ None);
|
||||
return;
|
||||
}
|
||||
|
||||
if !DOUBLE_PRESS_QUIT_SHORTCUT_ENABLED {
|
||||
if self.is_cancellable_work_active() {
|
||||
@@ -9256,6 +9262,12 @@ impl ChatWidget {
|
||||
}
|
||||
|
||||
pub(crate) fn remove_transcription_placeholder(&mut self, id: &str) {
|
||||
if self.realtime_conversation.is_live()
|
||||
&& self.realtime_conversation.meter_placeholder_id.as_deref() == Some(id)
|
||||
{
|
||||
self.realtime_conversation.meter_placeholder_id = None;
|
||||
self.request_realtime_conversation_close(/*info_message*/ None);
|
||||
}
|
||||
self.bottom_pane.remove_transcription_placeholder(id);
|
||||
// Ensure the UI redraws to reflect placeholder removal.
|
||||
self.request_redraw();
|
||||
|
||||
@@ -21,11 +21,11 @@ pub(super) enum RealtimeConversationPhase {
|
||||
|
||||
#[derive(Default)]
|
||||
pub(super) struct RealtimeConversationUiState {
|
||||
phase: RealtimeConversationPhase,
|
||||
pub(super) phase: RealtimeConversationPhase,
|
||||
requested_close: bool,
|
||||
session_id: Option<String>,
|
||||
warned_audio_only_submission: bool,
|
||||
meter_placeholder_id: Option<String>,
|
||||
pub(super) meter_placeholder_id: Option<String>,
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
capture_stop_flag: Option<Arc<AtomicBool>>,
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
@@ -244,13 +244,22 @@ impl ChatWidget {
|
||||
self.realtime_conversation.warned_audio_only_submission = false;
|
||||
}
|
||||
|
||||
fn fail_realtime_conversation(&mut self, message: String) {
|
||||
self.add_error_message(message);
|
||||
if self.realtime_conversation.is_live() {
|
||||
self.request_realtime_conversation_close(/*info_message*/ None);
|
||||
} else {
|
||||
self.reset_realtime_conversation_state();
|
||||
self.request_redraw();
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn on_realtime_conversation_started(
|
||||
&mut self,
|
||||
ev: RealtimeConversationStartedEvent,
|
||||
) {
|
||||
if !self.realtime_conversation_enabled() {
|
||||
self.submit_op(Op::RealtimeConversationClose);
|
||||
self.reset_realtime_conversation_state();
|
||||
self.request_realtime_conversation_close(/*info_message*/ None);
|
||||
return;
|
||||
}
|
||||
self.realtime_conversation.phase = RealtimeConversationPhase::Active;
|
||||
@@ -287,8 +296,7 @@ impl ChatWidget {
|
||||
RealtimeEvent::ConversationItemDone { .. } => {}
|
||||
RealtimeEvent::HandoffRequested(_) => {}
|
||||
RealtimeEvent::Error(message) => {
|
||||
self.add_error_message(format!("Realtime voice error: {message}"));
|
||||
self.reset_realtime_conversation_state();
|
||||
self.fail_realtime_conversation(format!("Realtime voice error: {message}"));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -297,7 +305,10 @@ impl ChatWidget {
|
||||
let requested = self.realtime_conversation.requested_close;
|
||||
let reason = ev.reason;
|
||||
self.reset_realtime_conversation_state();
|
||||
if !requested && let Some(reason) = reason {
|
||||
if !requested
|
||||
&& let Some(reason) = reason
|
||||
&& reason != "error"
|
||||
{
|
||||
self.add_info_message(
|
||||
format!("Realtime voice mode closed: {reason}"),
|
||||
/*hint*/ None,
|
||||
@@ -345,9 +356,11 @@ impl ChatWidget {
|
||||
) {
|
||||
Ok(capture) => capture,
|
||||
Err(err) => {
|
||||
self.remove_transcription_placeholder(&placeholder_id);
|
||||
self.realtime_conversation.meter_placeholder_id = None;
|
||||
self.add_error_message(format!("Failed to start microphone capture: {err}"));
|
||||
self.remove_transcription_placeholder(&placeholder_id);
|
||||
self.fail_realtime_conversation(format!(
|
||||
"Failed to start microphone capture: {err}"
|
||||
));
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -410,7 +423,9 @@ impl ChatWidget {
|
||||
self.realtime_conversation.audio_player = Some(player);
|
||||
}
|
||||
Err(err) => {
|
||||
self.add_error_message(format!("Failed to start speaker output: {err}"));
|
||||
self.fail_realtime_conversation(format!(
|
||||
"Failed to start speaker output: {err}"
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ use crate::app_event_sender::AppEventSender;
|
||||
use crate::bottom_pane::FeedbackAudience;
|
||||
use crate::bottom_pane::LocalImageAttachment;
|
||||
use crate::bottom_pane::MentionBinding;
|
||||
use crate::chatwidget::realtime::RealtimeConversationPhase;
|
||||
use crate::history_cell::UserHistoryCell;
|
||||
use crate::test_backend::VT100Backend;
|
||||
use crate::tui::FrameRequester;
|
||||
@@ -95,6 +96,9 @@ use codex_protocol::protocol::PatchApplyEndEvent;
|
||||
use codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus;
|
||||
use codex_protocol::protocol::RateLimitWindow;
|
||||
use codex_protocol::protocol::ReadOnlyAccess;
|
||||
use codex_protocol::protocol::RealtimeConversationClosedEvent;
|
||||
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_protocol::protocol::ReviewRequest;
|
||||
use codex_protocol::protocol::ReviewTarget;
|
||||
use codex_protocol::protocol::SessionConfiguredEvent;
|
||||
@@ -1957,6 +1961,21 @@ fn next_interrupt_op(op_rx: &mut tokio::sync::mpsc::UnboundedReceiver<Op>) {
|
||||
}
|
||||
}
|
||||
|
||||
fn next_realtime_close_op(op_rx: &mut tokio::sync::mpsc::UnboundedReceiver<Op>) {
|
||||
loop {
|
||||
match op_rx.try_recv() {
|
||||
Ok(Op::RealtimeConversationClose) => return,
|
||||
Ok(_) => continue,
|
||||
Err(TryRecvError::Empty) => {
|
||||
panic!("expected realtime close op but queue was empty")
|
||||
}
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
panic!("expected realtime close op but channel closed")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn assert_no_submit_op(op_rx: &mut tokio::sync::mpsc::UnboundedReceiver<Op>) {
|
||||
while let Ok(op) = op_rx.try_recv() {
|
||||
assert!(
|
||||
@@ -4744,6 +4763,22 @@ async fn ctrl_c_shutdown_works_with_caps_lock() {
|
||||
assert_matches!(rx.try_recv(), Ok(AppEvent::Exit(ExitMode::ShutdownFirst)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ctrl_c_closes_realtime_conversation_before_interrupt_or_quit() {
|
||||
let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(None).await;
|
||||
chat.realtime_conversation.phase = RealtimeConversationPhase::Active;
|
||||
|
||||
chat.handle_key_event(KeyEvent::new(KeyCode::Char('c'), KeyModifiers::CONTROL));
|
||||
|
||||
next_realtime_close_op(&mut op_rx);
|
||||
assert_eq!(
|
||||
chat.realtime_conversation.phase,
|
||||
RealtimeConversationPhase::Stopping
|
||||
);
|
||||
assert!(!chat.bottom_pane.quit_shortcut_hint_visible());
|
||||
assert_matches!(rx.try_recv(), Err(TryRecvError::Empty));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ctrl_d_quits_without_prompt() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
|
||||
@@ -4793,6 +4828,45 @@ async fn ctrl_c_cleared_prompt_is_recoverable_via_history() {
|
||||
assert_eq!(vec![PathBuf::from("/tmp/preview.png")], images);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn realtime_error_closes_without_followup_closed_info() {
|
||||
let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(None).await;
|
||||
chat.realtime_conversation.phase = RealtimeConversationPhase::Active;
|
||||
|
||||
chat.on_realtime_conversation_realtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::Error("boom".to_string()),
|
||||
});
|
||||
next_realtime_close_op(&mut op_rx);
|
||||
|
||||
chat.on_realtime_conversation_closed(RealtimeConversationClosedEvent {
|
||||
reason: Some("error".to_string()),
|
||||
});
|
||||
|
||||
let rendered = drain_insert_history(&mut rx)
|
||||
.into_iter()
|
||||
.map(|lines| lines_to_single_string(&lines))
|
||||
.collect::<Vec<_>>();
|
||||
assert_snapshot!(rendered.join("\n\n"), @"■ Realtime voice error: boom");
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
#[tokio::test]
|
||||
async fn removing_active_realtime_placeholder_closes_realtime_conversation() {
|
||||
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await;
|
||||
chat.realtime_conversation.phase = RealtimeConversationPhase::Active;
|
||||
let placeholder_id = chat.bottom_pane.insert_transcription_placeholder("⠤⠤⠤⠤");
|
||||
chat.realtime_conversation.meter_placeholder_id = Some(placeholder_id.clone());
|
||||
|
||||
chat.remove_transcription_placeholder(&placeholder_id);
|
||||
|
||||
next_realtime_close_op(&mut op_rx);
|
||||
assert_eq!(chat.realtime_conversation.meter_placeholder_id, None);
|
||||
assert_eq!(
|
||||
chat.realtime_conversation.phase,
|
||||
RealtimeConversationPhase::Stopping
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn exec_history_cell_shows_working_then_completed() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
|
||||
|
||||
@@ -7652,7 +7652,6 @@ impl ChatWidget {
|
||||
self.request_realtime_conversation_close(Some(
|
||||
"Realtime voice mode was closed because the feature was disabled.".to_string(),
|
||||
));
|
||||
self.reset_realtime_conversation_state();
|
||||
}
|
||||
}
|
||||
if feature == Feature::FastMode {
|
||||
@@ -7867,7 +7866,7 @@ impl ChatWidget {
|
||||
}
|
||||
|
||||
pub(crate) fn realtime_conversation_is_live(&self) -> bool {
|
||||
self.realtime_conversation.is_active()
|
||||
self.realtime_conversation.is_live()
|
||||
}
|
||||
|
||||
fn current_realtime_audio_device_name(&self, kind: RealtimeAudioDeviceKind) -> Option<String> {
|
||||
@@ -8482,6 +8481,13 @@ impl ChatWidget {
|
||||
}
|
||||
return;
|
||||
}
|
||||
if self.realtime_conversation.is_live() {
|
||||
self.bottom_pane.clear_quit_shortcut_hint();
|
||||
self.quit_shortcut_expires_at = None;
|
||||
self.quit_shortcut_key = None;
|
||||
self.request_realtime_conversation_close(/*info_message*/ None);
|
||||
return;
|
||||
}
|
||||
|
||||
if !DOUBLE_PRESS_QUIT_SHORTCUT_ENABLED {
|
||||
if self.is_cancellable_work_active() {
|
||||
@@ -9068,6 +9074,12 @@ impl ChatWidget {
|
||||
}
|
||||
|
||||
pub(crate) fn remove_transcription_placeholder(&mut self, id: &str) {
|
||||
if self.realtime_conversation.is_live()
|
||||
&& self.realtime_conversation.meter_placeholder_id.as_deref() == Some(id)
|
||||
{
|
||||
self.realtime_conversation.meter_placeholder_id = None;
|
||||
self.request_realtime_conversation_close(/*info_message*/ None);
|
||||
}
|
||||
self.bottom_pane.remove_transcription_placeholder(id);
|
||||
// Ensure the UI redraws to reflect placeholder removal.
|
||||
self.request_redraw();
|
||||
|
||||
@@ -21,11 +21,11 @@ pub(super) enum RealtimeConversationPhase {
|
||||
|
||||
#[derive(Default)]
|
||||
pub(super) struct RealtimeConversationUiState {
|
||||
phase: RealtimeConversationPhase,
|
||||
pub(super) phase: RealtimeConversationPhase,
|
||||
requested_close: bool,
|
||||
session_id: Option<String>,
|
||||
warned_audio_only_submission: bool,
|
||||
meter_placeholder_id: Option<String>,
|
||||
pub(super) meter_placeholder_id: Option<String>,
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
capture_stop_flag: Option<Arc<AtomicBool>>,
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
@@ -243,13 +243,22 @@ impl ChatWidget {
|
||||
self.realtime_conversation.warned_audio_only_submission = false;
|
||||
}
|
||||
|
||||
fn fail_realtime_conversation(&mut self, message: String) {
|
||||
self.add_error_message(message);
|
||||
if self.realtime_conversation.is_live() {
|
||||
self.request_realtime_conversation_close(/*info_message*/ None);
|
||||
} else {
|
||||
self.reset_realtime_conversation_state();
|
||||
self.request_redraw();
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn on_realtime_conversation_started(
|
||||
&mut self,
|
||||
ev: RealtimeConversationStartedEvent,
|
||||
) {
|
||||
if !self.realtime_conversation_enabled() {
|
||||
self.submit_op(AppCommand::realtime_conversation_close());
|
||||
self.reset_realtime_conversation_state();
|
||||
self.request_realtime_conversation_close(/*info_message*/ None);
|
||||
return;
|
||||
}
|
||||
self.realtime_conversation.phase = RealtimeConversationPhase::Active;
|
||||
@@ -277,8 +286,7 @@ impl ChatWidget {
|
||||
RealtimeEvent::ConversationItemDone { .. } => {}
|
||||
RealtimeEvent::HandoffRequested(_) => {}
|
||||
RealtimeEvent::Error(message) => {
|
||||
self.add_error_message(format!("Realtime voice error: {message}"));
|
||||
self.reset_realtime_conversation_state();
|
||||
self.fail_realtime_conversation(format!("Realtime voice error: {message}"));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -287,7 +295,10 @@ impl ChatWidget {
|
||||
let requested = self.realtime_conversation.requested_close;
|
||||
let reason = ev.reason;
|
||||
self.reset_realtime_conversation_state();
|
||||
if !requested && let Some(reason) = reason {
|
||||
if !requested
|
||||
&& let Some(reason) = reason
|
||||
&& reason != "error"
|
||||
{
|
||||
self.add_info_message(
|
||||
format!("Realtime voice mode closed: {reason}"),
|
||||
/*hint*/ None,
|
||||
@@ -341,9 +352,11 @@ impl ChatWidget {
|
||||
) {
|
||||
Ok(capture) => capture,
|
||||
Err(err) => {
|
||||
self.remove_transcription_placeholder(&placeholder_id);
|
||||
self.realtime_conversation.meter_placeholder_id = None;
|
||||
self.add_error_message(format!("Failed to start microphone capture: {err}"));
|
||||
self.remove_transcription_placeholder(&placeholder_id);
|
||||
self.fail_realtime_conversation(format!(
|
||||
"Failed to start microphone capture: {err}"
|
||||
));
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -400,7 +413,9 @@ impl ChatWidget {
|
||||
self.realtime_conversation.audio_player = Some(player);
|
||||
}
|
||||
Err(err) => {
|
||||
self.add_error_message(format!("Failed to start speaker output: {err}"));
|
||||
self.fail_realtime_conversation(format!(
|
||||
"Failed to start speaker output: {err}"
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ use crate::app_event_sender::AppEventSender;
|
||||
use crate::bottom_pane::FeedbackAudience;
|
||||
use crate::bottom_pane::LocalImageAttachment;
|
||||
use crate::bottom_pane::MentionBinding;
|
||||
use crate::chatwidget::realtime::RealtimeConversationPhase;
|
||||
use crate::history_cell::UserHistoryCell;
|
||||
use crate::model_catalog::ModelCatalog;
|
||||
use crate::test_backend::VT100Backend;
|
||||
@@ -94,6 +95,9 @@ use codex_protocol::protocol::PatchApplyEndEvent;
|
||||
use codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus;
|
||||
use codex_protocol::protocol::RateLimitWindow;
|
||||
use codex_protocol::protocol::ReadOnlyAccess;
|
||||
use codex_protocol::protocol::RealtimeConversationClosedEvent;
|
||||
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_protocol::protocol::ReviewRequest;
|
||||
use codex_protocol::protocol::ReviewTarget;
|
||||
use codex_protocol::protocol::SessionConfiguredEvent;
|
||||
@@ -1954,6 +1958,21 @@ fn next_interrupt_op(op_rx: &mut tokio::sync::mpsc::UnboundedReceiver<Op>) {
|
||||
}
|
||||
}
|
||||
|
||||
fn next_realtime_close_op(op_rx: &mut tokio::sync::mpsc::UnboundedReceiver<Op>) {
|
||||
loop {
|
||||
match op_rx.try_recv() {
|
||||
Ok(Op::RealtimeConversationClose) => return,
|
||||
Ok(_) => continue,
|
||||
Err(TryRecvError::Empty) => {
|
||||
panic!("expected realtime close op but queue was empty")
|
||||
}
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
panic!("expected realtime close op but channel closed")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn assert_no_submit_op(op_rx: &mut tokio::sync::mpsc::UnboundedReceiver<Op>) {
|
||||
while let Ok(op) = op_rx.try_recv() {
|
||||
assert!(
|
||||
@@ -4706,6 +4725,22 @@ async fn ctrl_c_shutdown_works_with_caps_lock() {
|
||||
assert_matches!(rx.try_recv(), Ok(AppEvent::Exit(ExitMode::ShutdownFirst)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ctrl_c_closes_realtime_conversation_before_interrupt_or_quit() {
|
||||
let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(None).await;
|
||||
chat.realtime_conversation.phase = RealtimeConversationPhase::Active;
|
||||
|
||||
chat.handle_key_event(KeyEvent::new(KeyCode::Char('c'), KeyModifiers::CONTROL));
|
||||
|
||||
next_realtime_close_op(&mut op_rx);
|
||||
assert_eq!(
|
||||
chat.realtime_conversation.phase,
|
||||
RealtimeConversationPhase::Stopping
|
||||
);
|
||||
assert!(!chat.bottom_pane.quit_shortcut_hint_visible());
|
||||
assert_matches!(rx.try_recv(), Err(TryRecvError::Empty));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ctrl_d_quits_without_prompt() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
|
||||
@@ -4755,6 +4790,45 @@ async fn ctrl_c_cleared_prompt_is_recoverable_via_history() {
|
||||
assert_eq!(vec![PathBuf::from("/tmp/preview.png")], images);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn realtime_error_closes_without_followup_closed_info() {
|
||||
let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(None).await;
|
||||
chat.realtime_conversation.phase = RealtimeConversationPhase::Active;
|
||||
|
||||
chat.on_realtime_conversation_realtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::Error("boom".to_string()),
|
||||
});
|
||||
next_realtime_close_op(&mut op_rx);
|
||||
|
||||
chat.on_realtime_conversation_closed(RealtimeConversationClosedEvent {
|
||||
reason: Some("error".to_string()),
|
||||
});
|
||||
|
||||
let rendered = drain_insert_history(&mut rx)
|
||||
.into_iter()
|
||||
.map(|lines| lines_to_single_string(&lines))
|
||||
.collect::<Vec<_>>();
|
||||
assert_snapshot!(rendered.join("\n\n"), @"■ Realtime voice error: boom");
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
#[tokio::test]
|
||||
async fn removing_active_realtime_placeholder_closes_realtime_conversation() {
|
||||
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await;
|
||||
chat.realtime_conversation.phase = RealtimeConversationPhase::Active;
|
||||
let placeholder_id = chat.bottom_pane.insert_transcription_placeholder("⠤⠤⠤⠤");
|
||||
chat.realtime_conversation.meter_placeholder_id = Some(placeholder_id.clone());
|
||||
|
||||
chat.remove_transcription_placeholder(&placeholder_id);
|
||||
|
||||
next_realtime_close_op(&mut op_rx);
|
||||
assert_eq!(chat.realtime_conversation.meter_placeholder_id, None);
|
||||
assert_eq!(
|
||||
chat.realtime_conversation.phase,
|
||||
RealtimeConversationPhase::Stopping
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn exec_history_cell_shows_working_then_completed() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
|
||||
|
||||
Reference in New Issue
Block a user