diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index d14d23b62e..565adfd7ea 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -3787,7 +3787,7 @@ pub struct ThreadRealtimeItemAddedNotification { pub item: JsonValue, } -/// EXPERIMENTAL - flat transcript snapshot emitted whenever realtime +/// EXPERIMENTAL - flat transcript delta emitted whenever realtime /// transcript text changes. #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 349090e786..d82e1c6770 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -826,7 +826,7 @@ The thread realtime API emits thread-scoped notifications for session lifecycle - `thread/realtime/started` — `{ threadId, sessionId }` once realtime starts for the thread (experimental). - `thread/realtime/itemAdded` — `{ threadId, item }` for raw non-audio realtime items that do not have a dedicated typed app-server notification (experimental). `item` is forwarded as raw JSON while the upstream websocket item schema remains unstable. -- `thread/realtime/transcriptUpdated` — `{ threadId, transcript }` whenever realtime transcript text changes (experimental). `transcript` is a flat array of `{ role, text }` entries accumulated from the live user/assistant transcript deltas. +- `thread/realtime/transcriptUpdated` — `{ threadId, transcript }` whenever realtime transcript text changes (experimental). `transcript` is a flat array of `{ role, text }` entries for the live delta(s) from that realtime event, not the full accumulated transcript. - `thread/realtime/outputAudio/delta` — `{ threadId, audio }` for streamed output audio chunks (experimental). `audio` uses camelCase fields (`data`, `sampleRate`, `numChannels`, `samplesPerChannel`). - `thread/realtime/error` — `{ threadId, message }` when realtime encounters a transport or backend error (experimental). - `thread/realtime/closed` — `{ threadId, reason }` when the realtime transport closes (experimental). diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 1ef000f356..c8d109edef 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -86,6 +86,7 @@ use codex_app_server_protocol::ThreadRealtimeErrorNotification; use codex_app_server_protocol::ThreadRealtimeItemAddedNotification; use codex_app_server_protocol::ThreadRealtimeOutputAudioDeltaNotification; use codex_app_server_protocol::ThreadRealtimeStartedNotification; +use codex_app_server_protocol::ThreadRealtimeTranscriptEntry; use codex_app_server_protocol::ThreadRealtimeTranscriptUpdatedNotification; use codex_app_server_protocol::ThreadRollbackResponse; use codex_app_server_protocol::ThreadTokenUsage; @@ -367,7 +368,6 @@ pub(crate) async fn apply_bespoke_event_handling( } } EventMsg::RealtimeConversationStarted(event) => { - thread_state.lock().await.reset_realtime_transcript(); if let ApiVersion::V2 = api_version { let notification = ThreadRealtimeStartedNotification { thread_id: conversation_id.to_string(), @@ -400,13 +400,12 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } RealtimeEvent::InputTranscriptDelta(event) => { - let transcript = thread_state - .lock() - .await - .append_realtime_transcript_delta("user", &event.delta); let notification = ThreadRealtimeTranscriptUpdatedNotification { thread_id: conversation_id.to_string(), - transcript, + transcript: vec![ThreadRealtimeTranscriptEntry { + role: "user".to_string(), + text: event.delta, + }], }; outgoing .send_server_notification( @@ -415,13 +414,12 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } RealtimeEvent::OutputTranscriptDelta(event) => { - let transcript = thread_state - .lock() - .await - .append_realtime_transcript_delta("assistant", &event.delta); let notification = ThreadRealtimeTranscriptUpdatedNotification { thread_id: conversation_id.to_string(), - transcript, + transcript: vec![ThreadRealtimeTranscriptEntry { + role: "assistant".to_string(), + text: event.delta, + }], }; outgoing .send_server_notification( @@ -482,7 +480,6 @@ pub(crate) async fn apply_bespoke_event_handling( } } EventMsg::RealtimeConversationClosed(event) => { - thread_state.lock().await.reset_realtime_transcript(); if let ApiVersion::V2 = api_version { let notification = ThreadRealtimeClosedNotification { thread_id: conversation_id.to_string(), diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index 1815481915..be5478dd51 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -2,7 +2,6 @@ use crate::outgoing_message::ConnectionId; use crate::outgoing_message::ConnectionRequestId; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ThreadHistoryBuilder; -use codex_app_server_protocol::ThreadRealtimeTranscriptEntry; use codex_app_server_protocol::Turn; use codex_app_server_protocol::TurnError; use codex_core::CodexThread; @@ -55,7 +54,6 @@ pub(crate) struct ThreadState { pub(crate) pending_interrupts: PendingInterruptQueue, pub(crate) pending_rollbacks: Option, pub(crate) turn_summary: TurnSummary, - pub(crate) realtime_transcript: Vec, pub(crate) cancel_tx: Option>, pub(crate) experimental_raw_events: bool, pub(crate) listener_generation: u64, @@ -93,7 +91,6 @@ impl ThreadState { } self.listener_command_tx = None; self.current_turn_history.reset(); - self.realtime_transcript.clear(); self.listener_thread = None; } @@ -117,34 +114,6 @@ impl ThreadState { self.current_turn_history.reset(); } } - - pub(crate) fn reset_realtime_transcript(&mut self) { - self.realtime_transcript.clear(); - } - - pub(crate) fn append_realtime_transcript_delta( - &mut self, - role: &str, - delta: &str, - ) -> Vec { - if delta.is_empty() { - return self.realtime_transcript.clone(); - } - - if let Some(last_entry) = self.realtime_transcript.last_mut() - && last_entry.role == role - { - last_entry.text.push_str(delta); - return self.realtime_transcript.clone(); - } - - self.realtime_transcript - .push(ThreadRealtimeTranscriptEntry { - role: role.to_string(), - text: delta.to_string(), - }); - self.realtime_transcript.clone() - } } struct ThreadEntry { diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index 016076bcb9..9b6f119056 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -213,16 +213,10 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { assert_eq!(second_transcript_update.thread_id, output_audio.thread_id); assert_eq!( second_transcript_update.transcript, - vec![ - ThreadRealtimeTranscriptEntry { - role: "user".to_string(), - text: "delegate now".to_string(), - }, - ThreadRealtimeTranscriptEntry { - role: "assistant".to_string(), - text: "working".to_string(), - }, - ] + vec![ThreadRealtimeTranscriptEntry { + role: "assistant".to_string(), + text: "working".to_string(), + }] ); let realtime_error =