Compare commits

...

1 Commits

Author SHA1 Message Date
Ahmed Ibrahim
86c8044e87 Use realtime transcript for handoff context 2026-03-09 13:34:20 -07:00
15 changed files with 596 additions and 156 deletions

View File

@@ -4483,6 +4483,32 @@
"title": "SessionUpdatedRealtimeEvent",
"type": "object"
},
{
"additionalProperties": false,
"properties": {
"InputTranscriptDelta": {
"$ref": "#/definitions/RealtimeTranscriptDelta"
}
},
"required": [
"InputTranscriptDelta"
],
"title": "InputTranscriptDeltaRealtimeEvent",
"type": "object"
},
{
"additionalProperties": false,
"properties": {
"OutputTranscriptDelta": {
"$ref": "#/definitions/RealtimeTranscriptDelta"
}
},
"required": [
"OutputTranscriptDelta"
],
"title": "OutputTranscriptDeltaRealtimeEvent",
"type": "object"
},
{
"additionalProperties": false,
"properties": {
@@ -4556,7 +4582,44 @@
}
]
},
"RealtimeHandoffMessage": {
"RealtimeHandoffRequested": {
"properties": {
"active_transcript": {
"items": {
"$ref": "#/definitions/RealtimeTranscriptEntry"
},
"type": "array"
},
"handoff_id": {
"type": "string"
},
"input_transcript": {
"type": "string"
},
"item_id": {
"type": "string"
}
},
"required": [
"active_transcript",
"handoff_id",
"input_transcript",
"item_id"
],
"type": "object"
},
"RealtimeTranscriptDelta": {
"properties": {
"delta": {
"type": "string"
}
},
"required": [
"delta"
],
"type": "object"
},
"RealtimeTranscriptEntry": {
"properties": {
"role": {
"type": "string"
@@ -4571,32 +4634,6 @@
],
"type": "object"
},
"RealtimeHandoffRequested": {
"properties": {
"handoff_id": {
"type": "string"
},
"input_transcript": {
"type": "string"
},
"item_id": {
"type": "string"
},
"messages": {
"items": {
"$ref": "#/definitions/RealtimeHandoffMessage"
},
"type": "array"
}
},
"required": [
"handoff_id",
"input_transcript",
"item_id",
"messages"
],
"type": "object"
},
"ReasoningEffort": {
"description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning",
"enum": [

View File

@@ -6593,6 +6593,32 @@
"title": "SessionUpdatedRealtimeEvent",
"type": "object"
},
{
"additionalProperties": false,
"properties": {
"InputTranscriptDelta": {
"$ref": "#/definitions/RealtimeTranscriptDelta"
}
},
"required": [
"InputTranscriptDelta"
],
"title": "InputTranscriptDeltaRealtimeEvent",
"type": "object"
},
{
"additionalProperties": false,
"properties": {
"OutputTranscriptDelta": {
"$ref": "#/definitions/RealtimeTranscriptDelta"
}
},
"required": [
"OutputTranscriptDelta"
],
"title": "OutputTranscriptDeltaRealtimeEvent",
"type": "object"
},
{
"additionalProperties": false,
"properties": {
@@ -6666,7 +6692,44 @@
}
]
},
"RealtimeHandoffMessage": {
"RealtimeHandoffRequested": {
"properties": {
"active_transcript": {
"items": {
"$ref": "#/definitions/RealtimeTranscriptEntry"
},
"type": "array"
},
"handoff_id": {
"type": "string"
},
"input_transcript": {
"type": "string"
},
"item_id": {
"type": "string"
}
},
"required": [
"active_transcript",
"handoff_id",
"input_transcript",
"item_id"
],
"type": "object"
},
"RealtimeTranscriptDelta": {
"properties": {
"delta": {
"type": "string"
}
},
"required": [
"delta"
],
"type": "object"
},
"RealtimeTranscriptEntry": {
"properties": {
"role": {
"type": "string"
@@ -6681,32 +6744,6 @@
],
"type": "object"
},
"RealtimeHandoffRequested": {
"properties": {
"handoff_id": {
"type": "string"
},
"input_transcript": {
"type": "string"
},
"item_id": {
"type": "string"
},
"messages": {
"items": {
"$ref": "#/definitions/RealtimeHandoffMessage"
},
"type": "array"
}
},
"required": [
"handoff_id",
"input_transcript",
"item_id",
"messages"
],
"type": "object"
},
"RejectConfig": {
"properties": {
"mcp_elicitations": {

View File

@@ -9363,6 +9363,32 @@
"title": "SessionUpdatedRealtimeEvent",
"type": "object"
},
{
"additionalProperties": false,
"properties": {
"InputTranscriptDelta": {
"$ref": "#/definitions/RealtimeTranscriptDelta"
}
},
"required": [
"InputTranscriptDelta"
],
"title": "InputTranscriptDeltaRealtimeEvent",
"type": "object"
},
{
"additionalProperties": false,
"properties": {
"OutputTranscriptDelta": {
"$ref": "#/definitions/RealtimeTranscriptDelta"
}
},
"required": [
"OutputTranscriptDelta"
],
"title": "OutputTranscriptDeltaRealtimeEvent",
"type": "object"
},
{
"additionalProperties": false,
"properties": {
@@ -9436,7 +9462,44 @@
}
]
},
"RealtimeHandoffMessage": {
"RealtimeHandoffRequested": {
"properties": {
"active_transcript": {
"items": {
"$ref": "#/definitions/RealtimeTranscriptEntry"
},
"type": "array"
},
"handoff_id": {
"type": "string"
},
"input_transcript": {
"type": "string"
},
"item_id": {
"type": "string"
}
},
"required": [
"active_transcript",
"handoff_id",
"input_transcript",
"item_id"
],
"type": "object"
},
"RealtimeTranscriptDelta": {
"properties": {
"delta": {
"type": "string"
}
},
"required": [
"delta"
],
"type": "object"
},
"RealtimeTranscriptEntry": {
"properties": {
"role": {
"type": "string"
@@ -9451,32 +9514,6 @@
],
"type": "object"
},
"RealtimeHandoffRequested": {
"properties": {
"handoff_id": {
"type": "string"
},
"input_transcript": {
"type": "string"
},
"item_id": {
"type": "string"
},
"messages": {
"items": {
"$ref": "#/definitions/RealtimeHandoffMessage"
},
"type": "array"
}
},
"required": [
"handoff_id",
"input_transcript",
"item_id",
"messages"
],
"type": "object"
},
"ReasoningEffort": {
"description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning",
"enum": [

View File

@@ -3,6 +3,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { RealtimeAudioFrame } from "./RealtimeAudioFrame";
import type { RealtimeHandoffRequested } from "./RealtimeHandoffRequested";
import type { RealtimeTranscriptDelta } from "./RealtimeTranscriptDelta";
import type { JsonValue } from "./serde_json/JsonValue";
export type RealtimeEvent = { "SessionUpdated": { session_id: string, instructions: string | null, } } | { "AudioOut": RealtimeAudioFrame } | { "ConversationItemAdded": JsonValue } | { "ConversationItemDone": { item_id: string, } } | { "HandoffRequested": RealtimeHandoffRequested } | { "Error": string };
export type RealtimeEvent = { "SessionUpdated": { session_id: string, instructions: string | null, } } | { "InputTranscriptDelta": RealtimeTranscriptDelta } | { "OutputTranscriptDelta": RealtimeTranscriptDelta } | { "AudioOut": RealtimeAudioFrame } | { "ConversationItemAdded": JsonValue } | { "ConversationItemDone": { item_id: string, } } | { "HandoffRequested": RealtimeHandoffRequested } | { "Error": string };

View File

@@ -1,6 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { RealtimeHandoffMessage } from "./RealtimeHandoffMessage";
import type { RealtimeTranscriptEntry } from "./RealtimeTranscriptEntry";
export type RealtimeHandoffRequested = { handoff_id: string, item_id: string, input_transcript: string, messages: Array<RealtimeHandoffMessage>, };
export type RealtimeHandoffRequested = { handoff_id: string, item_id: string, input_transcript: string, active_transcript: Array<RealtimeTranscriptEntry>, };

View File

@@ -2,4 +2,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type RealtimeHandoffMessage = { role: string, text: string, };
export type RealtimeTranscriptDelta = { delta: string, };

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type RealtimeTranscriptEntry = { role: string, text: string, };

View File

@@ -142,8 +142,9 @@ export type { RealtimeConversationClosedEvent } from "./RealtimeConversationClos
export type { RealtimeConversationRealtimeEvent } from "./RealtimeConversationRealtimeEvent";
export type { RealtimeConversationStartedEvent } from "./RealtimeConversationStartedEvent";
export type { RealtimeEvent } from "./RealtimeEvent";
export type { RealtimeHandoffMessage } from "./RealtimeHandoffMessage";
export type { RealtimeHandoffRequested } from "./RealtimeHandoffRequested";
export type { RealtimeTranscriptDelta } from "./RealtimeTranscriptDelta";
export type { RealtimeTranscriptEntry } from "./RealtimeTranscriptEntry";
export type { ReasoningContentDeltaEvent } from "./ReasoningContentDeltaEvent";
export type { ReasoningEffort } from "./ReasoningEffort";
export type { ReasoningItem } from "./ReasoningItem";

View File

@@ -272,6 +272,8 @@ pub(crate) async fn apply_bespoke_event_handling(
if let ApiVersion::V2 = api_version {
match event.payload {
RealtimeEvent::SessionUpdated { .. } => {}
RealtimeEvent::InputTranscriptDelta(_) => {}
RealtimeEvent::OutputTranscriptDelta(_) => {}
RealtimeEvent::AudioOut(audio) => {
let notification = ThreadRealtimeOutputAudioDeltaNotification {
thread_id: conversation_id.to_string(),
@@ -303,7 +305,7 @@ pub(crate) async fn apply_bespoke_event_handling(
"handoff_id": handoff.handoff_id,
"item_id": handoff.item_id,
"input_transcript": handoff.input_transcript,
"messages": handoff.messages,
"active_transcript": handoff.active_transcript,
}),
};
outgoing

View File

@@ -4,6 +4,8 @@ use crate::endpoint::realtime_websocket::protocol::RealtimeAudioFrame;
use crate::endpoint::realtime_websocket::protocol::RealtimeEvent;
use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionConfig;
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptDelta;
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptEntry;
use crate::endpoint::realtime_websocket::protocol::SessionAudio;
use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat;
use crate::endpoint::realtime_websocket::protocol::SessionAudioInput;
@@ -198,9 +200,15 @@ pub struct RealtimeWebsocketWriter {
#[derive(Clone)]
pub struct RealtimeWebsocketEvents {
rx_message: Arc<Mutex<mpsc::UnboundedReceiver<Result<Message, WsError>>>>,
active_transcript: Arc<Mutex<ActiveTranscriptState>>,
is_closed: Arc<AtomicBool>,
}
/// Transcript built up from realtime transcript delta events.
///
/// The entries are handed over to (and cleared by) the next handoff request.
#[derive(Default)]
struct ActiveTranscriptState {
// Ordered transcript entries; consecutive deltas for the same role are merged into one entry.
entries: Vec<RealtimeTranscriptEntry>,
}
impl RealtimeWebsocketConnection {
pub async fn send_audio_frame(&self, frame: RealtimeAudioFrame) -> Result<(), ApiError> {
self.writer.send_audio_frame(frame).await
@@ -249,6 +257,7 @@ impl RealtimeWebsocketConnection {
},
events: RealtimeWebsocketEvents {
rx_message: Arc::new(Mutex::new(rx_message)),
active_transcript: Arc::new(Mutex::new(ActiveTranscriptState::default())),
is_closed,
},
}
@@ -366,7 +375,8 @@ impl RealtimeWebsocketEvents {
match msg {
Message::Text(text) => {
if let Some(event) = parse_realtime_event(&text) {
if let Some(mut event) = parse_realtime_event(&text) {
self.update_active_transcript(&mut event).await;
debug!(?event, "realtime websocket parsed event");
return Ok(Some(event));
}
@@ -390,6 +400,44 @@ impl RealtimeWebsocketEvents {
}
}
}
/// Keeps the stored transcript in sync with an incoming realtime event.
///
/// Input and output transcript deltas are appended under the "user" and
/// "assistant" roles respectively. A handoff request receives everything
/// accumulated so far, which leaves the stored transcript empty for the next
/// handoff. Every other event leaves the transcript untouched.
async fn update_active_transcript(&self, event: &mut RealtimeEvent) {
    let mut state = self.active_transcript.lock().await;
    let entries = &mut state.entries;
    match event {
        RealtimeEvent::HandoffRequested(handoff) => {
            // Move rather than clone: the accumulated entries now belong to this handoff.
            handoff.active_transcript = std::mem::take(entries);
        }
        RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta }) => {
            append_transcript_delta(entries, "user", delta);
        }
        RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta { delta }) => {
            append_transcript_delta(entries, "assistant", delta);
        }
        // Spelled out instead of `_` so a newly added variant must be handled here explicitly.
        RealtimeEvent::SessionUpdated { .. }
        | RealtimeEvent::AudioOut(_)
        | RealtimeEvent::ConversationItemAdded(_)
        | RealtimeEvent::ConversationItemDone { .. }
        | RealtimeEvent::Error(_) => {}
    }
}
}
/// Appends `delta` to the transcript on behalf of `role`.
///
/// An empty `delta` is ignored. When the most recent entry already belongs to
/// `role`, the text is extended in place; otherwise a new entry is started, so
/// the transcript alternates speakers instead of holding one entry per delta.
fn append_transcript_delta(entries: &mut Vec<RealtimeTranscriptEntry>, role: &str, delta: &str) {
    if !delta.is_empty() {
        match entries.last_mut() {
            // Same speaker is still talking: grow the current entry.
            Some(tail) if tail.role == role => tail.text.push_str(delta),
            // Empty transcript or a different speaker: open a new entry.
            _ => entries.push(RealtimeTranscriptEntry {
                role: role.to_string(),
                text: delta.to_string(),
            }),
        }
    }
}
pub struct RealtimeWebsocketClient {
@@ -558,8 +606,9 @@ fn normalize_realtime_path(url: &mut Url) {
#[cfg(test)]
mod tests {
use super::*;
use crate::endpoint::realtime_websocket::protocol::RealtimeHandoffMessage;
use crate::endpoint::realtime_websocket::protocol::RealtimeHandoffRequested;
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptDelta;
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptEntry;
use http::HeaderValue;
use pretty_assertions::assert_eq;
use serde_json::Value;
@@ -644,10 +693,7 @@ mod tests {
"type": "conversation.handoff.requested",
"handoff_id": "handoff_123",
"item_id": "item_123",
"input_transcript": "delegate this",
"messages": [
{"role": "user", "text": "delegate this"}
]
"input_transcript": "delegate this"
})
.to_string();
@@ -657,14 +703,47 @@ mod tests {
handoff_id: "handoff_123".to_string(),
item_id: "item_123".to_string(),
input_transcript: "delegate this".to_string(),
messages: vec![RealtimeHandoffMessage {
role: "user".to_string(),
text: "delegate this".to_string(),
}],
active_transcript: Vec::new(),
}))
);
}
#[test]
fn parse_input_transcript_delta_event() {
    // An input transcript delta frame must parse into `InputTranscriptDelta`
    // with the delta text kept verbatim, trailing space included.
    let raw = json!({
        "type": "conversation.input_transcript.delta",
        "delta": "hello "
    })
    .to_string();
    let expected = RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta {
        delta: "hello ".to_string(),
    });

    assert_eq!(parse_realtime_event(&raw), Some(expected));
}
#[test]
fn parse_output_transcript_delta_event() {
    // An output transcript delta frame must parse into `OutputTranscriptDelta`
    // with the delta text kept verbatim.
    let raw = json!({
        "type": "conversation.output_transcript.delta",
        "delta": "hi"
    })
    .to_string();
    let expected = RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta {
        delta: "hi".to_string(),
    });

    assert_eq!(parse_realtime_event(&raw), Some(expected));
}
#[test]
fn merge_request_headers_matches_http_precedence() {
let mut provider_headers = HeaderMap::new();
@@ -853,13 +932,45 @@ mod tests {
.await
.expect("send audio");
ws.send(Message::Text(
json!({
"type": "conversation.input_transcript.delta",
"delta": "delegate "
})
.to_string()
.into(),
))
.await
.expect("send input transcript delta");
ws.send(Message::Text(
json!({
"type": "conversation.input_transcript.delta",
"delta": "now"
})
.to_string()
.into(),
))
.await
.expect("send input transcript delta");
ws.send(Message::Text(
json!({
"type": "conversation.output_transcript.delta",
"delta": "working"
})
.to_string()
.into(),
))
.await
.expect("send output transcript delta");
ws.send(Message::Text(
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_1",
"item_id": "item_2",
"input_transcript": "delegate now",
"messages": [{"role": "user", "text": "delegate now"}]
"input_transcript": "delegate now"
})
.to_string()
.into(),
@@ -945,6 +1056,42 @@ mod tests {
})
);
let input_delta_event = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
input_delta_event,
RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta {
delta: "delegate ".to_string(),
})
);
let input_delta_event = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
input_delta_event,
RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta {
delta: "now".to_string(),
})
);
let output_delta_event = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
output_delta_event,
RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta {
delta: "working".to_string(),
})
);
let added_event = connection
.next_event()
.await
@@ -956,10 +1103,16 @@ mod tests {
handoff_id: "handoff_1".to_string(),
item_id: "item_2".to_string(),
input_transcript: "delegate now".to_string(),
messages: vec![RealtimeHandoffMessage {
role: "user".to_string(),
text: "delegate now".to_string(),
}],
active_transcript: vec![
RealtimeTranscriptEntry {
role: "user".to_string(),
text: "delegate now".to_string(),
},
RealtimeTranscriptEntry {
role: "assistant".to_string(),
text: "working".to_string(),
},
],
})
);

View File

@@ -1,7 +1,8 @@
pub use codex_protocol::protocol::RealtimeAudioFrame;
pub use codex_protocol::protocol::RealtimeEvent;
pub use codex_protocol::protocol::RealtimeHandoffMessage;
pub use codex_protocol::protocol::RealtimeHandoffRequested;
pub use codex_protocol::protocol::RealtimeTranscriptDelta;
pub use codex_protocol::protocol::RealtimeTranscriptEntry;
use serde::Serialize;
use serde_json::Value;
use tracing::debug;
@@ -135,6 +136,16 @@ pub(super) fn parse_realtime_event(payload: &str) -> Option<RealtimeEvent> {
.and_then(|v| u32::try_from(v).ok()),
}))
}
"conversation.input_transcript.delta" => parsed
.get("delta")
.and_then(Value::as_str)
.map(str::to_string)
.map(|delta| RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta })),
"conversation.output_transcript.delta" => parsed
.get("delta")
.and_then(Value::as_str)
.map(str::to_string)
.map(|delta| RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta { delta })),
"conversation.item.added" => parsed
.get("item")
.cloned()
@@ -159,21 +170,11 @@ pub(super) fn parse_realtime_event(payload: &str) -> Option<RealtimeEvent> {
.get("input_transcript")
.and_then(Value::as_str)
.map(str::to_string)?;
let messages = parsed
.get("messages")
.and_then(Value::as_array)?
.iter()
.filter_map(|message| {
let role = message.get("role").and_then(Value::as_str)?.to_string();
let text = message.get("text").and_then(Value::as_str)?.to_string();
Some(RealtimeHandoffMessage { role, text })
})
.collect();
Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
handoff_id,
item_id,
input_transcript,
messages,
active_transcript: Vec::new(),
}))
}
"error" => parsed

View File

@@ -394,15 +394,17 @@ pub(crate) async fn handle_audio(
}
fn realtime_text_from_handoff_request(handoff: &RealtimeHandoffRequested) -> Option<String> {
let messages = handoff
.messages
let active_transcript = handoff
.active_transcript
.iter()
.map(|message| format!("{}: {}", message.role, message.text))
.map(|entry| format!("{}: {}", entry.role, entry.text))
.collect::<Vec<_>>()
.join("\n");
(!messages.is_empty()).then_some(messages).or_else(|| {
(!handoff.input_transcript.is_empty()).then(|| handoff.input_transcript.clone())
})
(!active_transcript.is_empty())
.then_some(active_transcript)
.or_else(|| {
(!handoff.input_transcript.is_empty()).then(|| handoff.input_transcript.clone())
})
}
fn realtime_api_key(
@@ -603,22 +605,22 @@ mod tests {
use super::RealtimeHandoffState;
use super::realtime_text_from_handoff_request;
use async_channel::bounded;
use codex_protocol::protocol::RealtimeHandoffMessage;
use codex_protocol::protocol::RealtimeHandoffRequested;
use codex_protocol::protocol::RealtimeTranscriptEntry;
use pretty_assertions::assert_eq;
#[test]
fn extracts_text_from_handoff_request_messages() {
fn extracts_text_from_handoff_request_active_transcript() {
let handoff = RealtimeHandoffRequested {
handoff_id: "handoff_1".to_string(),
item_id: "item_1".to_string(),
input_transcript: "ignored".to_string(),
messages: vec![
RealtimeHandoffMessage {
active_transcript: vec![
RealtimeTranscriptEntry {
role: "user".to_string(),
text: "hello".to_string(),
},
RealtimeHandoffMessage {
RealtimeTranscriptEntry {
role: "assistant".to_string(),
text: "hi there".to_string(),
},
@@ -636,7 +638,7 @@ mod tests {
handoff_id: "handoff_1".to_string(),
item_id: "item_1".to_string(),
input_transcript: "ignored".to_string(),
messages: vec![],
active_transcript: vec![],
};
assert_eq!(
realtime_text_from_handoff_request(&handoff),
@@ -650,7 +652,7 @@ mod tests {
handoff_id: "handoff_1".to_string(),
item_id: "item_1".to_string(),
input_transcript: String::new(),
messages: vec![],
active_transcript: vec![],
};
assert_eq!(realtime_text_from_handoff_request(&handoff), None);
}

View File

@@ -970,12 +970,15 @@ async fn conversation_mirrors_assistant_message_text_to_realtime_handoff() -> Re
"type": "session.updated",
"session": { "id": "sess_1", "instructions": "backend prompt" }
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": "delegate hello"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_1",
"item_id": "item_1",
"input_transcript": "delegate hello",
"messages": [{ "role": "user", "text": "delegate hello" }]
"input_transcript": "delegate hello"
}),
],
vec![],
@@ -1089,12 +1092,15 @@ async fn conversation_handoff_persists_across_item_done_until_turn_complete() ->
"type": "session.updated",
"session": { "id": "sess_item_done", "instructions": "backend prompt" }
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": "delegate now"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_item_done",
"item_id": "item_item_done",
"input_transcript": "delegate now",
"messages": [{ "role": "user", "text": "delegate now" }]
"input_transcript": "delegate now"
}),
],
vec![json!({
@@ -1229,12 +1235,15 @@ async fn inbound_handoff_request_starts_turn() -> Result<()> {
"type": "session.updated",
"session": { "id": "sess_inbound", "instructions": "backend prompt" }
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": "text from realtime"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_inbound",
"item_id": "item_inbound",
"input_transcript": "text from realtime",
"messages": [{ "role": "user", "text": "text from realtime" }]
"input_transcript": "text from realtime"
}),
]]])
.await;
@@ -1293,7 +1302,7 @@ async fn inbound_handoff_request_starts_turn() -> Result<()> {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn inbound_handoff_request_uses_all_messages() -> Result<()> {
async fn inbound_handoff_request_uses_active_transcript() -> Result<()> {
skip_if_no_network!(Ok(()));
let api_server = start_mock_server().await;
@@ -1312,16 +1321,23 @@ async fn inbound_handoff_request_uses_all_messages() -> Result<()> {
"type": "session.updated",
"session": { "id": "sess_inbound_multi", "instructions": "backend prompt" }
}),
json!({
"type": "conversation.output_transcript.delta",
"delta": "assistant context"
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": "delegated query"
}),
json!({
"type": "conversation.output_transcript.delta",
"delta": "assist confirm"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_inbound_multi",
"item_id": "item_inbound_multi",
"input_transcript": "ignored",
"messages": [
{ "role": "assistant", "text": "assistant context" },
{ "role": "user", "text": "delegated query" },
{ "role": "assistant", "text": "assist confirm" },
]
"input_transcript": "ignored"
}),
]]])
.await;
@@ -1363,6 +1379,131 @@ async fn inbound_handoff_request_uses_all_messages() -> Result<()> {
Ok(())
}
// Checks that transcript text delivered with one handoff does not reappear in the
// request produced for the following handoff.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn inbound_handoff_request_clears_active_transcript_after_each_handoff() -> Result<()> {
skip_if_no_network!(Ok(()));
let api_server = start_mock_server().await;
// Two scripted SSE responses, one for each delegated turn asserted on below.
let response_mock = responses::mount_sse_sequence(
&api_server,
vec![
responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_assistant_message("msg-1", "first ok"),
responses::ev_completed("resp-1"),
]),
responses::sse(vec![
responses::ev_response_created("resp-2"),
responses::ev_assistant_message("msg-2", "second ok"),
responses::ev_completed("resp-2"),
]),
],
)
.await;
// Scripted realtime traffic in three batches: session update + "first question" delta +
// first handoff, then an empty batch, then "second question" delta + second handoff.
// NOTE(review): presumably each batch is released in response to one client message —
// confirm against start_websocket_server.
let realtime_server = start_websocket_server(vec![vec![
vec![
json!({
"type": "session.updated",
"session": { "id": "sess_inbound_clear", "instructions": "backend prompt" }
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": "first question"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_inbound_clear_1",
"item_id": "item_inbound_clear_1",
"input_transcript": "first question"
}),
],
vec![],
vec![
json!({
"type": "conversation.input_transcript.delta",
"delta": "second question"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_inbound_clear_2",
"item_id": "item_inbound_clear_2",
"input_transcript": "second question"
}),
],
]])
.await;
// Point the client under test at the mock realtime websocket server.
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build(&api_server).await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
// Wait until the session update from the first batch has been surfaced.
let _ = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) => Some(session_id.clone()),
_ => None,
})
.await;
// Wait for a turn to complete (expected to be the one started by the first handoff).
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
// NOTE(review): sending an audio frame presumably advances the scripted server to the
// batch carrying the second delta and handoff — confirm against the websocket test helper.
test.codex
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
frame: RealtimeAudioFrame {
data: "AQID".to_string(),
sample_rate: 24000,
num_channels: 1,
samples_per_channel: Some(480),
},
}))
.await?;
// Wait for the next turn to complete (expected to be the one started by the second handoff).
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
// Exactly one model request per handoff.
let requests = response_mock.requests();
assert_eq!(requests.len(), 2);
let first_user_texts = requests[0].message_input_texts("user");
assert!(
first_user_texts
.iter()
.any(|text| text == "user: first question")
);
let second_user_texts = requests[1].message_input_texts("user");
assert!(
second_user_texts
.iter()
.any(|text| text == "user: second question")
);
// The first handoff's transcript must not be carried over into the second request.
assert!(
!second_user_texts
.iter()
.any(|text| text == "user: first question\nuser: second question")
);
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn inbound_conversation_item_does_not_start_turn_and_still_forwards_audio() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -1473,12 +1614,15 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
"type": "session.updated",
"session": { "id": "sess_echo_guard", "instructions": "backend prompt" }
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": "delegate now"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_echo_guard",
"item_id": "item_echo_guard",
"input_transcript": "delegate now",
"messages": [{"role": "user", "text": "delegate now"}]
"input_transcript": "delegate now"
}),
],
vec![
@@ -1621,12 +1765,15 @@ async fn inbound_handoff_request_does_not_block_realtime_event_forwarding() -> R
"type": "session.updated",
"session": { "id": "sess_non_blocking", "instructions": "backend prompt" }
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": "delegate now"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_non_blocking",
"item_id": "item_non_blocking",
"input_transcript": "delegate now",
"messages": [{"role": "user", "text": "delegate now"}]
"input_transcript": "delegate now"
}),
json!({
"type": "conversation.output_audio.delta",
@@ -1748,13 +1895,18 @@ async fn inbound_handoff_request_steers_active_turn() -> Result<()> {
"type": "session.updated",
"session": { "id": "sess_steer", "instructions": "backend prompt" }
})],
vec![json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_steer",
"item_id": "item_steer",
"input_transcript": "steer via realtime",
"messages": [{ "role": "user", "text": "steer via realtime" }]
})],
vec![
json!({
"type": "conversation.input_transcript.delta",
"delta": "steer via realtime"
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_steer",
"item_id": "item_steer",
"input_transcript": "steer via realtime"
}),
],
]])
.await;
@@ -1879,12 +2031,15 @@ async fn inbound_handoff_request_starts_turn_and_does_not_block_realtime_audio()
"type": "session.updated",
"session": { "id": "sess_handoff_request", "instructions": "backend prompt" }
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": delegated_text
}),
json!({
"type": "conversation.handoff.requested",
"handoff_id": "handoff_audio",
"item_id": "item_audio",
"input_transcript": delegated_text,
"messages": [{ "role": "user", "text": delegated_text }]
"input_transcript": delegated_text
}),
json!({
"type": "conversation.output_audio.delta",

View File

@@ -132,7 +132,12 @@ pub struct RealtimeAudioFrame {
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
pub struct RealtimeHandoffMessage {
pub struct RealtimeTranscriptDelta {
pub delta: String,
}
// One entry of a realtime transcript: a speaker role together with the text attributed to it.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
pub struct RealtimeTranscriptEntry {
// Speaker label; the websocket layer in this change writes "user" and "assistant".
pub role: String,
// Text attributed to `role`.
pub text: String,
}
@@ -142,7 +147,7 @@ pub struct RealtimeHandoffRequested {
pub handoff_id: String,
pub item_id: String,
pub input_transcript: String,
pub messages: Vec<RealtimeHandoffMessage>,
pub active_transcript: Vec<RealtimeTranscriptEntry>,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
@@ -151,6 +156,8 @@ pub enum RealtimeEvent {
session_id: String,
instructions: Option<String>,
},
InputTranscriptDelta(RealtimeTranscriptDelta),
OutputTranscriptDelta(RealtimeTranscriptDelta),
AudioOut(RealtimeAudioFrame),
ConversationItemAdded(Value),
ConversationItemDone {

View File

@@ -264,6 +264,8 @@ impl ChatWidget {
RealtimeEvent::SessionUpdated { session_id, .. } => {
self.realtime_conversation.session_id = Some(session_id);
}
RealtimeEvent::InputTranscriptDelta(_) => {}
RealtimeEvent::OutputTranscriptDelta(_) => {}
RealtimeEvent::AudioOut(frame) => self.enqueue_realtime_audio_out(&frame),
RealtimeEvent::ConversationItemAdded(_item) => {}
RealtimeEvent::ConversationItemDone { .. } => {}