Align realtime error close handling

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-03-17 09:33:52 -07:00
parent 5eb7e86114
commit 5f2320342e
2 changed files with 11 additions and 14 deletions

View File

@@ -188,7 +188,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
read_notification::<ThreadRealtimeClosedNotification>(&mut mcp, "thread/realtime/closed")
.await?;
assert_eq!(closed.thread_id, output_audio.thread_id);
assert_eq!(closed.reason.as_deref(), Some("transport_closed"));
assert_eq!(closed.reason.as_deref(), Some("error"));
let connections = realtime_server.connections();
assert_eq!(connections.len(), 1);

View File

@@ -60,13 +60,8 @@ const ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX: &str =
enum RealtimeConversationEnd {
Requested,
TransportClosed,
Error(RealtimeConversationError),
}
#[derive(Debug)]
enum RealtimeConversationError {
Emit(String),
AlreadySent,
Error(String),
ErrorAlreadySent,
}
pub(crate) struct RealtimeConversationManager {
@@ -363,7 +358,7 @@ pub(crate) async fn handle_start(
end_realtime_conversation(
sess,
sub_id,
RealtimeConversationEnd::Error(RealtimeConversationError::Emit(err.to_string())),
RealtimeConversationEnd::Error(err.to_string()),
)
.await;
}
@@ -454,7 +449,7 @@ async fn handle_start_inner(
);
}
if matches!(event, RealtimeEvent::Error(_)) {
end = RealtimeConversationEnd::Error(RealtimeConversationError::AlreadySent);
end = RealtimeConversationEnd::ErrorAlreadySent;
}
let maybe_routed_text = match &event {
RealtimeEvent::HandoffRequested(handoff) => {
@@ -497,7 +492,7 @@ pub(crate) async fn handle_audio(
end_realtime_conversation(
sess,
sub_id,
RealtimeConversationEnd::Error(RealtimeConversationError::Emit(err.to_string())),
RealtimeConversationEnd::Error(err.to_string()),
)
.await;
} else {
@@ -582,7 +577,7 @@ pub(crate) async fn handle_text(
end_realtime_conversation(
sess,
sub_id,
RealtimeConversationEnd::Error(RealtimeConversationError::Emit(err.to_string())),
RealtimeConversationEnd::Error(err.to_string()),
)
.await;
} else {
@@ -900,7 +895,7 @@ async fn end_realtime_conversation(
) {
let _ = sess.conversation.shutdown().await;
if let RealtimeConversationEnd::Error(RealtimeConversationError::Emit(message)) = &end {
if let RealtimeConversationEnd::Error(message) = &end {
sess.send_event_raw(Event {
id: sub_id.clone(),
msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
@@ -913,7 +908,9 @@ async fn end_realtime_conversation(
let reason = match end {
RealtimeConversationEnd::Requested => Some("requested".to_string()),
RealtimeConversationEnd::TransportClosed => Some("transport_closed".to_string()),
RealtimeConversationEnd::Error(_) => Some("error".to_string()),
RealtimeConversationEnd::Error(_) | RealtimeConversationEnd::ErrorAlreadySent => {
Some("error".to_string())
}
};
sess.send_event_raw(Event {