From ebca8c2aa992b6c1614bcbc5dc4e4e00b17cb1ba Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Fri, 20 Mar 2026 14:09:45 -0700 Subject: [PATCH] codex: send realtime transcript deltas only Emit only the live transcript delta entries from realtime transcript notifications instead of the accumulated transcript snapshot, and update the v2 test and docs to match. Co-authored-by: Codex --- .../app-server-protocol/src/protocol/v2.rs | 2 +- codex-rs/app-server/README.md | 2 +- .../app-server/src/bespoke_event_handling.rs | 21 ++++++------- codex-rs/app-server/src/thread_state.rs | 31 ------------------- .../tests/suite/v2/realtime_conversation.rs | 14 +++------ 5 files changed, 15 insertions(+), 55 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index d14d23b62e..565adfd7ea 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -3787,7 +3787,7 @@ pub struct ThreadRealtimeItemAddedNotification { pub item: JsonValue, } -/// EXPERIMENTAL - flat transcript snapshot emitted whenever realtime +/// EXPERIMENTAL - flat transcript delta emitted whenever realtime /// transcript text changes. #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 349090e786..d82e1c6770 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -826,7 +826,7 @@ The thread realtime API emits thread-scoped notifications for session lifecycle - `thread/realtime/started` — `{ threadId, sessionId }` once realtime starts for the thread (experimental). - `thread/realtime/itemAdded` — `{ threadId, item }` for raw non-audio realtime items that do not have a dedicated typed app-server notification (experimental). `item` is forwarded as raw JSON while the upstream websocket item schema remains unstable. -- `thread/realtime/transcriptUpdated` — `{ threadId, transcript }` whenever realtime transcript text changes (experimental). `transcript` is a flat array of `{ role, text }` entries accumulated from the live user/assistant transcript deltas. +- `thread/realtime/transcriptUpdated` — `{ threadId, transcript }` whenever realtime transcript text changes (experimental). `transcript` is a flat array of `{ role, text }` entries for the live delta(s) from that realtime event, not the full accumulated transcript. - `thread/realtime/outputAudio/delta` — `{ threadId, audio }` for streamed output audio chunks (experimental). `audio` uses camelCase fields (`data`, `sampleRate`, `numChannels`, `samplesPerChannel`). - `thread/realtime/error` — `{ threadId, message }` when realtime encounters a transport or backend error (experimental). - `thread/realtime/closed` — `{ threadId, reason }` when the realtime transport closes (experimental). diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 1ef000f356..c8d109edef 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -86,6 +86,7 @@ use codex_app_server_protocol::ThreadRealtimeErrorNotification; use codex_app_server_protocol::ThreadRealtimeItemAddedNotification; use codex_app_server_protocol::ThreadRealtimeOutputAudioDeltaNotification; use codex_app_server_protocol::ThreadRealtimeStartedNotification; +use codex_app_server_protocol::ThreadRealtimeTranscriptEntry; use codex_app_server_protocol::ThreadRealtimeTranscriptUpdatedNotification; use codex_app_server_protocol::ThreadRollbackResponse; use codex_app_server_protocol::ThreadTokenUsage; @@ -367,7 +368,6 @@ pub(crate) async fn apply_bespoke_event_handling( } } EventMsg::RealtimeConversationStarted(event) => { - thread_state.lock().await.reset_realtime_transcript(); if let ApiVersion::V2 = api_version { let notification = ThreadRealtimeStartedNotification { thread_id: conversation_id.to_string(), @@ -400,13 +400,12 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } RealtimeEvent::InputTranscriptDelta(event) => { - let transcript = thread_state - .lock() - .await - .append_realtime_transcript_delta("user", &event.delta); let notification = ThreadRealtimeTranscriptUpdatedNotification { thread_id: conversation_id.to_string(), - transcript, + transcript: vec![ThreadRealtimeTranscriptEntry { + role: "user".to_string(), + text: event.delta, + }], }; outgoing .send_server_notification( @@ -415,13 +414,12 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } RealtimeEvent::OutputTranscriptDelta(event) => { - let transcript = thread_state - .lock() - .await - .append_realtime_transcript_delta("assistant", &event.delta); let notification = ThreadRealtimeTranscriptUpdatedNotification { thread_id: conversation_id.to_string(), - transcript, + transcript: vec![ThreadRealtimeTranscriptEntry { + role: "assistant".to_string(), + text: event.delta, + }], }; outgoing .send_server_notification( @@ -482,7 +480,6 @@ pub(crate) async fn apply_bespoke_event_handling( } } EventMsg::RealtimeConversationClosed(event) => { - thread_state.lock().await.reset_realtime_transcript(); if let ApiVersion::V2 = api_version { let notification = ThreadRealtimeClosedNotification { thread_id: conversation_id.to_string(), diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index 1815481915..be5478dd51 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -2,7 +2,6 @@ use crate::outgoing_message::ConnectionId; use crate::outgoing_message::ConnectionRequestId; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ThreadHistoryBuilder; -use codex_app_server_protocol::ThreadRealtimeTranscriptEntry; use codex_app_server_protocol::Turn; use codex_app_server_protocol::TurnError; use codex_core::CodexThread; @@ -55,7 +54,6 @@ pub(crate) struct ThreadState { pub(crate) pending_interrupts: PendingInterruptQueue, pub(crate) pending_rollbacks: Option, pub(crate) turn_summary: TurnSummary, - pub(crate) realtime_transcript: Vec, pub(crate) cancel_tx: Option>, pub(crate) experimental_raw_events: bool, pub(crate) listener_generation: u64, @@ -93,7 +91,6 @@ impl ThreadState { } self.listener_command_tx = None; self.current_turn_history.reset(); - self.realtime_transcript.clear(); self.listener_thread = None; } @@ -117,34 +114,6 @@ impl ThreadState { self.current_turn_history.reset(); } } - - pub(crate) fn reset_realtime_transcript(&mut self) { - self.realtime_transcript.clear(); - } - - pub(crate) fn append_realtime_transcript_delta( - &mut self, - role: &str, - delta: &str, - ) -> Vec { - if delta.is_empty() { - return self.realtime_transcript.clone(); - } - - if let Some(last_entry) = self.realtime_transcript.last_mut() - && last_entry.role == role - { - last_entry.text.push_str(delta); - return self.realtime_transcript.clone(); - } - - self.realtime_transcript - .push(ThreadRealtimeTranscriptEntry { - role: role.to_string(), - text: delta.to_string(), - }); - self.realtime_transcript.clone() - } } struct ThreadEntry { diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index 016076bcb9..9b6f119056 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -213,16 +213,10 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { assert_eq!(second_transcript_update.thread_id, output_audio.thread_id); assert_eq!( second_transcript_update.transcript, - vec![ - ThreadRealtimeTranscriptEntry { - role: "user".to_string(), - text: "delegate now".to_string(), - }, - ThreadRealtimeTranscriptEntry { - role: "assistant".to_string(), - text: "working".to_string(), - }, - ] + vec![ThreadRealtimeTranscriptEntry { + role: "assistant".to_string(), + text: "working".to_string(), + }] ); let realtime_error =