mirror of
https://github.com/openai/codex.git
synced 2026-03-10 00:23:20 +00:00
Compare commits
1 Commits
codex-cli-
...
dev/handof
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
86c8044e87 |
@@ -4483,6 +4483,32 @@
|
||||
"title": "SessionUpdatedRealtimeEvent",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"InputTranscriptDelta": {
|
||||
"$ref": "#/definitions/RealtimeTranscriptDelta"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"InputTranscriptDelta"
|
||||
],
|
||||
"title": "InputTranscriptDeltaRealtimeEvent",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"OutputTranscriptDelta": {
|
||||
"$ref": "#/definitions/RealtimeTranscriptDelta"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"OutputTranscriptDelta"
|
||||
],
|
||||
"title": "OutputTranscriptDeltaRealtimeEvent",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
@@ -4556,7 +4582,44 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"RealtimeHandoffMessage": {
|
||||
"RealtimeHandoffRequested": {
|
||||
"properties": {
|
||||
"active_transcript": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/RealtimeTranscriptEntry"
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"handoff_id": {
|
||||
"type": "string"
|
||||
},
|
||||
"input_transcript": {
|
||||
"type": "string"
|
||||
},
|
||||
"item_id": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"active_transcript",
|
||||
"handoff_id",
|
||||
"input_transcript",
|
||||
"item_id"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"RealtimeTranscriptDelta": {
|
||||
"properties": {
|
||||
"delta": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"delta"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"RealtimeTranscriptEntry": {
|
||||
"properties": {
|
||||
"role": {
|
||||
"type": "string"
|
||||
@@ -4571,32 +4634,6 @@
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"RealtimeHandoffRequested": {
|
||||
"properties": {
|
||||
"handoff_id": {
|
||||
"type": "string"
|
||||
},
|
||||
"input_transcript": {
|
||||
"type": "string"
|
||||
},
|
||||
"item_id": {
|
||||
"type": "string"
|
||||
},
|
||||
"messages": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/RealtimeHandoffMessage"
|
||||
},
|
||||
"type": "array"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"handoff_id",
|
||||
"input_transcript",
|
||||
"item_id",
|
||||
"messages"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"ReasoningEffort": {
|
||||
"description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning",
|
||||
"enum": [
|
||||
|
||||
@@ -6593,6 +6593,32 @@
|
||||
"title": "SessionUpdatedRealtimeEvent",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"InputTranscriptDelta": {
|
||||
"$ref": "#/definitions/RealtimeTranscriptDelta"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"InputTranscriptDelta"
|
||||
],
|
||||
"title": "InputTranscriptDeltaRealtimeEvent",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"OutputTranscriptDelta": {
|
||||
"$ref": "#/definitions/RealtimeTranscriptDelta"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"OutputTranscriptDelta"
|
||||
],
|
||||
"title": "OutputTranscriptDeltaRealtimeEvent",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
@@ -6666,7 +6692,44 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"RealtimeHandoffMessage": {
|
||||
"RealtimeHandoffRequested": {
|
||||
"properties": {
|
||||
"active_transcript": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/RealtimeTranscriptEntry"
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"handoff_id": {
|
||||
"type": "string"
|
||||
},
|
||||
"input_transcript": {
|
||||
"type": "string"
|
||||
},
|
||||
"item_id": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"active_transcript",
|
||||
"handoff_id",
|
||||
"input_transcript",
|
||||
"item_id"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"RealtimeTranscriptDelta": {
|
||||
"properties": {
|
||||
"delta": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"delta"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"RealtimeTranscriptEntry": {
|
||||
"properties": {
|
||||
"role": {
|
||||
"type": "string"
|
||||
@@ -6681,32 +6744,6 @@
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"RealtimeHandoffRequested": {
|
||||
"properties": {
|
||||
"handoff_id": {
|
||||
"type": "string"
|
||||
},
|
||||
"input_transcript": {
|
||||
"type": "string"
|
||||
},
|
||||
"item_id": {
|
||||
"type": "string"
|
||||
},
|
||||
"messages": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/RealtimeHandoffMessage"
|
||||
},
|
||||
"type": "array"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"handoff_id",
|
||||
"input_transcript",
|
||||
"item_id",
|
||||
"messages"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"RejectConfig": {
|
||||
"properties": {
|
||||
"mcp_elicitations": {
|
||||
|
||||
@@ -9363,6 +9363,32 @@
|
||||
"title": "SessionUpdatedRealtimeEvent",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"InputTranscriptDelta": {
|
||||
"$ref": "#/definitions/RealtimeTranscriptDelta"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"InputTranscriptDelta"
|
||||
],
|
||||
"title": "InputTranscriptDeltaRealtimeEvent",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"OutputTranscriptDelta": {
|
||||
"$ref": "#/definitions/RealtimeTranscriptDelta"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"OutputTranscriptDelta"
|
||||
],
|
||||
"title": "OutputTranscriptDeltaRealtimeEvent",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
@@ -9436,7 +9462,44 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"RealtimeHandoffMessage": {
|
||||
"RealtimeHandoffRequested": {
|
||||
"properties": {
|
||||
"active_transcript": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/RealtimeTranscriptEntry"
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"handoff_id": {
|
||||
"type": "string"
|
||||
},
|
||||
"input_transcript": {
|
||||
"type": "string"
|
||||
},
|
||||
"item_id": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"active_transcript",
|
||||
"handoff_id",
|
||||
"input_transcript",
|
||||
"item_id"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"RealtimeTranscriptDelta": {
|
||||
"properties": {
|
||||
"delta": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"delta"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"RealtimeTranscriptEntry": {
|
||||
"properties": {
|
||||
"role": {
|
||||
"type": "string"
|
||||
@@ -9451,32 +9514,6 @@
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"RealtimeHandoffRequested": {
|
||||
"properties": {
|
||||
"handoff_id": {
|
||||
"type": "string"
|
||||
},
|
||||
"input_transcript": {
|
||||
"type": "string"
|
||||
},
|
||||
"item_id": {
|
||||
"type": "string"
|
||||
},
|
||||
"messages": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/RealtimeHandoffMessage"
|
||||
},
|
||||
"type": "array"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"handoff_id",
|
||||
"input_transcript",
|
||||
"item_id",
|
||||
"messages"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"ReasoningEffort": {
|
||||
"description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning",
|
||||
"enum": [
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
import type { RealtimeAudioFrame } from "./RealtimeAudioFrame";
|
||||
import type { RealtimeHandoffRequested } from "./RealtimeHandoffRequested";
|
||||
import type { RealtimeTranscriptDelta } from "./RealtimeTranscriptDelta";
|
||||
import type { JsonValue } from "./serde_json/JsonValue";
|
||||
|
||||
export type RealtimeEvent = { "SessionUpdated": { session_id: string, instructions: string | null, } } | { "AudioOut": RealtimeAudioFrame } | { "ConversationItemAdded": JsonValue } | { "ConversationItemDone": { item_id: string, } } | { "HandoffRequested": RealtimeHandoffRequested } | { "Error": string };
|
||||
export type RealtimeEvent = { "SessionUpdated": { session_id: string, instructions: string | null, } } | { "InputTranscriptDelta": RealtimeTranscriptDelta } | { "OutputTranscriptDelta": RealtimeTranscriptDelta } | { "AudioOut": RealtimeAudioFrame } | { "ConversationItemAdded": JsonValue } | { "ConversationItemDone": { item_id: string, } } | { "HandoffRequested": RealtimeHandoffRequested } | { "Error": string };
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
import type { RealtimeHandoffMessage } from "./RealtimeHandoffMessage";
|
||||
import type { RealtimeTranscriptEntry } from "./RealtimeTranscriptEntry";
|
||||
|
||||
export type RealtimeHandoffRequested = { handoff_id: string, item_id: string, input_transcript: string, messages: Array<RealtimeHandoffMessage>, };
|
||||
export type RealtimeHandoffRequested = { handoff_id: string, item_id: string, input_transcript: string, active_transcript: Array<RealtimeTranscriptEntry>, };
|
||||
|
||||
@@ -2,4 +2,4 @@
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
|
||||
export type RealtimeHandoffMessage = { role: string, text: string, };
|
||||
export type RealtimeTranscriptDelta = { delta: string, };
|
||||
@@ -0,0 +1,5 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
|
||||
export type RealtimeTranscriptEntry = { role: string, text: string, };
|
||||
@@ -142,8 +142,9 @@ export type { RealtimeConversationClosedEvent } from "./RealtimeConversationClos
|
||||
export type { RealtimeConversationRealtimeEvent } from "./RealtimeConversationRealtimeEvent";
|
||||
export type { RealtimeConversationStartedEvent } from "./RealtimeConversationStartedEvent";
|
||||
export type { RealtimeEvent } from "./RealtimeEvent";
|
||||
export type { RealtimeHandoffMessage } from "./RealtimeHandoffMessage";
|
||||
export type { RealtimeHandoffRequested } from "./RealtimeHandoffRequested";
|
||||
export type { RealtimeTranscriptDelta } from "./RealtimeTranscriptDelta";
|
||||
export type { RealtimeTranscriptEntry } from "./RealtimeTranscriptEntry";
|
||||
export type { ReasoningContentDeltaEvent } from "./ReasoningContentDeltaEvent";
|
||||
export type { ReasoningEffort } from "./ReasoningEffort";
|
||||
export type { ReasoningItem } from "./ReasoningItem";
|
||||
|
||||
@@ -272,6 +272,8 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
if let ApiVersion::V2 = api_version {
|
||||
match event.payload {
|
||||
RealtimeEvent::SessionUpdated { .. } => {}
|
||||
RealtimeEvent::InputTranscriptDelta(_) => {}
|
||||
RealtimeEvent::OutputTranscriptDelta(_) => {}
|
||||
RealtimeEvent::AudioOut(audio) => {
|
||||
let notification = ThreadRealtimeOutputAudioDeltaNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -303,7 +305,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
"handoff_id": handoff.handoff_id,
|
||||
"item_id": handoff.item_id,
|
||||
"input_transcript": handoff.input_transcript,
|
||||
"messages": handoff.messages,
|
||||
"active_transcript": handoff.active_transcript,
|
||||
}),
|
||||
};
|
||||
outgoing
|
||||
|
||||
@@ -4,6 +4,8 @@ use crate::endpoint::realtime_websocket::protocol::RealtimeAudioFrame;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeEvent;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionConfig;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptDelta;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptEntry;
|
||||
use crate::endpoint::realtime_websocket::protocol::SessionAudio;
|
||||
use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat;
|
||||
use crate::endpoint::realtime_websocket::protocol::SessionAudioInput;
|
||||
@@ -198,9 +200,15 @@ pub struct RealtimeWebsocketWriter {
|
||||
#[derive(Clone)]
|
||||
pub struct RealtimeWebsocketEvents {
|
||||
rx_message: Arc<Mutex<mpsc::UnboundedReceiver<Result<Message, WsError>>>>,
|
||||
active_transcript: Arc<Mutex<ActiveTranscriptState>>,
|
||||
is_closed: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct ActiveTranscriptState {
|
||||
entries: Vec<RealtimeTranscriptEntry>,
|
||||
}
|
||||
|
||||
impl RealtimeWebsocketConnection {
|
||||
pub async fn send_audio_frame(&self, frame: RealtimeAudioFrame) -> Result<(), ApiError> {
|
||||
self.writer.send_audio_frame(frame).await
|
||||
@@ -249,6 +257,7 @@ impl RealtimeWebsocketConnection {
|
||||
},
|
||||
events: RealtimeWebsocketEvents {
|
||||
rx_message: Arc::new(Mutex::new(rx_message)),
|
||||
active_transcript: Arc::new(Mutex::new(ActiveTranscriptState::default())),
|
||||
is_closed,
|
||||
},
|
||||
}
|
||||
@@ -366,7 +375,8 @@ impl RealtimeWebsocketEvents {
|
||||
|
||||
match msg {
|
||||
Message::Text(text) => {
|
||||
if let Some(event) = parse_realtime_event(&text) {
|
||||
if let Some(mut event) = parse_realtime_event(&text) {
|
||||
self.update_active_transcript(&mut event).await;
|
||||
debug!(?event, "realtime websocket parsed event");
|
||||
return Ok(Some(event));
|
||||
}
|
||||
@@ -390,6 +400,44 @@ impl RealtimeWebsocketEvents {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn update_active_transcript(&self, event: &mut RealtimeEvent) {
|
||||
let mut active_transcript = self.active_transcript.lock().await;
|
||||
match event {
|
||||
RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta }) => {
|
||||
append_transcript_delta(&mut active_transcript.entries, "user", delta);
|
||||
}
|
||||
RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta { delta }) => {
|
||||
append_transcript_delta(&mut active_transcript.entries, "assistant", delta);
|
||||
}
|
||||
RealtimeEvent::HandoffRequested(handoff) => {
|
||||
handoff.active_transcript = std::mem::take(&mut active_transcript.entries);
|
||||
}
|
||||
RealtimeEvent::SessionUpdated { .. }
|
||||
| RealtimeEvent::AudioOut(_)
|
||||
| RealtimeEvent::ConversationItemAdded(_)
|
||||
| RealtimeEvent::ConversationItemDone { .. }
|
||||
| RealtimeEvent::Error(_) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn append_transcript_delta(entries: &mut Vec<RealtimeTranscriptEntry>, role: &str, delta: &str) {
|
||||
if delta.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(last_entry) = entries.last_mut()
|
||||
&& last_entry.role == role
|
||||
{
|
||||
last_entry.text.push_str(delta);
|
||||
return;
|
||||
}
|
||||
|
||||
entries.push(RealtimeTranscriptEntry {
|
||||
role: role.to_string(),
|
||||
text: delta.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
pub struct RealtimeWebsocketClient {
|
||||
@@ -558,8 +606,9 @@ fn normalize_realtime_path(url: &mut Url) {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeHandoffMessage;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeHandoffRequested;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptDelta;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptEntry;
|
||||
use http::HeaderValue;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
@@ -644,10 +693,7 @@ mod tests {
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_123",
|
||||
"item_id": "item_123",
|
||||
"input_transcript": "delegate this",
|
||||
"messages": [
|
||||
{"role": "user", "text": "delegate this"}
|
||||
]
|
||||
"input_transcript": "delegate this"
|
||||
})
|
||||
.to_string();
|
||||
|
||||
@@ -657,14 +703,47 @@ mod tests {
|
||||
handoff_id: "handoff_123".to_string(),
|
||||
item_id: "item_123".to_string(),
|
||||
input_transcript: "delegate this".to_string(),
|
||||
messages: vec![RealtimeHandoffMessage {
|
||||
role: "user".to_string(),
|
||||
text: "delegate this".to_string(),
|
||||
}],
|
||||
active_transcript: Vec::new(),
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
/// A `conversation.input_transcript.delta` message parses into
/// `RealtimeEvent::InputTranscriptDelta` with the delta text kept verbatim
/// (the trailing space included).
#[test]
fn parse_input_transcript_delta_event() {
    let raw = json!({
        "type": "conversation.input_transcript.delta",
        "delta": "hello "
    })
    .to_string();

    let expected = RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta {
        delta: String::from("hello "),
    });

    assert_eq!(parse_realtime_event(&raw), Some(expected));
}
|
||||
|
||||
/// A `conversation.output_transcript.delta` message parses into
/// `RealtimeEvent::OutputTranscriptDelta` with the delta text kept verbatim.
#[test]
fn parse_output_transcript_delta_event() {
    let raw = json!({
        "type": "conversation.output_transcript.delta",
        "delta": "hi"
    })
    .to_string();

    let expected = RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta {
        delta: String::from("hi"),
    });

    assert_eq!(parse_realtime_event(&raw), Some(expected));
}
|
||||
|
||||
#[test]
|
||||
fn merge_request_headers_matches_http_precedence() {
|
||||
let mut provider_headers = HeaderMap::new();
|
||||
@@ -853,13 +932,45 @@ mod tests {
|
||||
.await
|
||||
.expect("send audio");
|
||||
|
||||
ws.send(Message::Text(
|
||||
json!({
|
||||
"type": "conversation.input_transcript.delta",
|
||||
"delta": "delegate "
|
||||
})
|
||||
.to_string()
|
||||
.into(),
|
||||
))
|
||||
.await
|
||||
.expect("send input transcript delta");
|
||||
|
||||
ws.send(Message::Text(
|
||||
json!({
|
||||
"type": "conversation.input_transcript.delta",
|
||||
"delta": "now"
|
||||
})
|
||||
.to_string()
|
||||
.into(),
|
||||
))
|
||||
.await
|
||||
.expect("send input transcript delta");
|
||||
|
||||
ws.send(Message::Text(
|
||||
json!({
|
||||
"type": "conversation.output_transcript.delta",
|
||||
"delta": "working"
|
||||
})
|
||||
.to_string()
|
||||
.into(),
|
||||
))
|
||||
.await
|
||||
.expect("send output transcript delta");
|
||||
|
||||
ws.send(Message::Text(
|
||||
json!({
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_1",
|
||||
"item_id": "item_2",
|
||||
"input_transcript": "delegate now",
|
||||
"messages": [{"role": "user", "text": "delegate now"}]
|
||||
"input_transcript": "delegate now"
|
||||
})
|
||||
.to_string()
|
||||
.into(),
|
||||
@@ -945,6 +1056,42 @@ mod tests {
|
||||
})
|
||||
);
|
||||
|
||||
let input_delta_event = connection
|
||||
.next_event()
|
||||
.await
|
||||
.expect("next event")
|
||||
.expect("event");
|
||||
assert_eq!(
|
||||
input_delta_event,
|
||||
RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta {
|
||||
delta: "delegate ".to_string(),
|
||||
})
|
||||
);
|
||||
|
||||
let input_delta_event = connection
|
||||
.next_event()
|
||||
.await
|
||||
.expect("next event")
|
||||
.expect("event");
|
||||
assert_eq!(
|
||||
input_delta_event,
|
||||
RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta {
|
||||
delta: "now".to_string(),
|
||||
})
|
||||
);
|
||||
|
||||
let output_delta_event = connection
|
||||
.next_event()
|
||||
.await
|
||||
.expect("next event")
|
||||
.expect("event");
|
||||
assert_eq!(
|
||||
output_delta_event,
|
||||
RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta {
|
||||
delta: "working".to_string(),
|
||||
})
|
||||
);
|
||||
|
||||
let added_event = connection
|
||||
.next_event()
|
||||
.await
|
||||
@@ -956,10 +1103,16 @@ mod tests {
|
||||
handoff_id: "handoff_1".to_string(),
|
||||
item_id: "item_2".to_string(),
|
||||
input_transcript: "delegate now".to_string(),
|
||||
messages: vec![RealtimeHandoffMessage {
|
||||
role: "user".to_string(),
|
||||
text: "delegate now".to_string(),
|
||||
}],
|
||||
active_transcript: vec![
|
||||
RealtimeTranscriptEntry {
|
||||
role: "user".to_string(),
|
||||
text: "delegate now".to_string(),
|
||||
},
|
||||
RealtimeTranscriptEntry {
|
||||
role: "assistant".to_string(),
|
||||
text: "working".to_string(),
|
||||
},
|
||||
],
|
||||
})
|
||||
);
|
||||
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
pub use codex_protocol::protocol::RealtimeAudioFrame;
|
||||
pub use codex_protocol::protocol::RealtimeEvent;
|
||||
pub use codex_protocol::protocol::RealtimeHandoffMessage;
|
||||
pub use codex_protocol::protocol::RealtimeHandoffRequested;
|
||||
pub use codex_protocol::protocol::RealtimeTranscriptDelta;
|
||||
pub use codex_protocol::protocol::RealtimeTranscriptEntry;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
use tracing::debug;
|
||||
@@ -135,6 +136,16 @@ pub(super) fn parse_realtime_event(payload: &str) -> Option<RealtimeEvent> {
|
||||
.and_then(|v| u32::try_from(v).ok()),
|
||||
}))
|
||||
}
|
||||
"conversation.input_transcript.delta" => parsed
|
||||
.get("delta")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.map(|delta| RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta })),
|
||||
"conversation.output_transcript.delta" => parsed
|
||||
.get("delta")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.map(|delta| RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta { delta })),
|
||||
"conversation.item.added" => parsed
|
||||
.get("item")
|
||||
.cloned()
|
||||
@@ -159,21 +170,11 @@ pub(super) fn parse_realtime_event(payload: &str) -> Option<RealtimeEvent> {
|
||||
.get("input_transcript")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)?;
|
||||
let messages = parsed
|
||||
.get("messages")
|
||||
.and_then(Value::as_array)?
|
||||
.iter()
|
||||
.filter_map(|message| {
|
||||
let role = message.get("role").and_then(Value::as_str)?.to_string();
|
||||
let text = message.get("text").and_then(Value::as_str)?.to_string();
|
||||
Some(RealtimeHandoffMessage { role, text })
|
||||
})
|
||||
.collect();
|
||||
Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
|
||||
handoff_id,
|
||||
item_id,
|
||||
input_transcript,
|
||||
messages,
|
||||
active_transcript: Vec::new(),
|
||||
}))
|
||||
}
|
||||
"error" => parsed
|
||||
|
||||
@@ -394,15 +394,17 @@ pub(crate) async fn handle_audio(
|
||||
}
|
||||
|
||||
fn realtime_text_from_handoff_request(handoff: &RealtimeHandoffRequested) -> Option<String> {
|
||||
let messages = handoff
|
||||
.messages
|
||||
let active_transcript = handoff
|
||||
.active_transcript
|
||||
.iter()
|
||||
.map(|message| format!("{}: {}", message.role, message.text))
|
||||
.map(|entry| format!("{}: {}", entry.role, entry.text))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
(!messages.is_empty()).then_some(messages).or_else(|| {
|
||||
(!handoff.input_transcript.is_empty()).then(|| handoff.input_transcript.clone())
|
||||
})
|
||||
(!active_transcript.is_empty())
|
||||
.then_some(active_transcript)
|
||||
.or_else(|| {
|
||||
(!handoff.input_transcript.is_empty()).then(|| handoff.input_transcript.clone())
|
||||
})
|
||||
}
|
||||
|
||||
fn realtime_api_key(
|
||||
@@ -603,22 +605,22 @@ mod tests {
|
||||
use super::RealtimeHandoffState;
|
||||
use super::realtime_text_from_handoff_request;
|
||||
use async_channel::bounded;
|
||||
use codex_protocol::protocol::RealtimeHandoffMessage;
|
||||
use codex_protocol::protocol::RealtimeHandoffRequested;
|
||||
use codex_protocol::protocol::RealtimeTranscriptEntry;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn extracts_text_from_handoff_request_messages() {
|
||||
fn extracts_text_from_handoff_request_active_transcript() {
|
||||
let handoff = RealtimeHandoffRequested {
|
||||
handoff_id: "handoff_1".to_string(),
|
||||
item_id: "item_1".to_string(),
|
||||
input_transcript: "ignored".to_string(),
|
||||
messages: vec![
|
||||
RealtimeHandoffMessage {
|
||||
active_transcript: vec![
|
||||
RealtimeTranscriptEntry {
|
||||
role: "user".to_string(),
|
||||
text: "hello".to_string(),
|
||||
},
|
||||
RealtimeHandoffMessage {
|
||||
RealtimeTranscriptEntry {
|
||||
role: "assistant".to_string(),
|
||||
text: "hi there".to_string(),
|
||||
},
|
||||
@@ -636,7 +638,7 @@ mod tests {
|
||||
handoff_id: "handoff_1".to_string(),
|
||||
item_id: "item_1".to_string(),
|
||||
input_transcript: "ignored".to_string(),
|
||||
messages: vec![],
|
||||
active_transcript: vec![],
|
||||
};
|
||||
assert_eq!(
|
||||
realtime_text_from_handoff_request(&handoff),
|
||||
@@ -650,7 +652,7 @@ mod tests {
|
||||
handoff_id: "handoff_1".to_string(),
|
||||
item_id: "item_1".to_string(),
|
||||
input_transcript: String::new(),
|
||||
messages: vec![],
|
||||
active_transcript: vec![],
|
||||
};
|
||||
assert_eq!(realtime_text_from_handoff_request(&handoff), None);
|
||||
}
|
||||
|
||||
@@ -970,12 +970,15 @@ async fn conversation_mirrors_assistant_message_text_to_realtime_handoff() -> Re
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_1", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.input_transcript.delta",
|
||||
"delta": "delegate hello"
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_1",
|
||||
"item_id": "item_1",
|
||||
"input_transcript": "delegate hello",
|
||||
"messages": [{ "role": "user", "text": "delegate hello" }]
|
||||
"input_transcript": "delegate hello"
|
||||
}),
|
||||
],
|
||||
vec![],
|
||||
@@ -1089,12 +1092,15 @@ async fn conversation_handoff_persists_across_item_done_until_turn_complete() ->
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_item_done", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.input_transcript.delta",
|
||||
"delta": "delegate now"
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_item_done",
|
||||
"item_id": "item_item_done",
|
||||
"input_transcript": "delegate now",
|
||||
"messages": [{ "role": "user", "text": "delegate now" }]
|
||||
"input_transcript": "delegate now"
|
||||
}),
|
||||
],
|
||||
vec![json!({
|
||||
@@ -1229,12 +1235,15 @@ async fn inbound_handoff_request_starts_turn() -> Result<()> {
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_inbound", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.input_transcript.delta",
|
||||
"delta": "text from realtime"
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_inbound",
|
||||
"item_id": "item_inbound",
|
||||
"input_transcript": "text from realtime",
|
||||
"messages": [{ "role": "user", "text": "text from realtime" }]
|
||||
"input_transcript": "text from realtime"
|
||||
}),
|
||||
]]])
|
||||
.await;
|
||||
@@ -1293,7 +1302,7 @@ async fn inbound_handoff_request_starts_turn() -> Result<()> {
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn inbound_handoff_request_uses_all_messages() -> Result<()> {
|
||||
async fn inbound_handoff_request_uses_active_transcript() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let api_server = start_mock_server().await;
|
||||
@@ -1312,16 +1321,23 @@ async fn inbound_handoff_request_uses_all_messages() -> Result<()> {
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_inbound_multi", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.output_transcript.delta",
|
||||
"delta": "assistant context"
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.input_transcript.delta",
|
||||
"delta": "delegated query"
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.output_transcript.delta",
|
||||
"delta": "assist confirm"
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_inbound_multi",
|
||||
"item_id": "item_inbound_multi",
|
||||
"input_transcript": "ignored",
|
||||
"messages": [
|
||||
{ "role": "assistant", "text": "assistant context" },
|
||||
{ "role": "user", "text": "delegated query" },
|
||||
{ "role": "assistant", "text": "assist confirm" },
|
||||
]
|
||||
"input_transcript": "ignored"
|
||||
}),
|
||||
]]])
|
||||
.await;
|
||||
@@ -1363,6 +1379,131 @@ async fn inbound_handoff_request_uses_all_messages() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn inbound_handoff_request_clears_active_transcript_after_each_handoff() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let api_server = start_mock_server().await;
|
||||
let response_mock = responses::mount_sse_sequence(
|
||||
&api_server,
|
||||
vec![
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_assistant_message("msg-1", "first ok"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-2"),
|
||||
responses::ev_assistant_message("msg-2", "second ok"),
|
||||
responses::ev_completed("resp-2"),
|
||||
]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let realtime_server = start_websocket_server(vec![vec![
|
||||
vec![
|
||||
json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_inbound_clear", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.input_transcript.delta",
|
||||
"delta": "first question"
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_inbound_clear_1",
|
||||
"item_id": "item_inbound_clear_1",
|
||||
"input_transcript": "first question"
|
||||
}),
|
||||
],
|
||||
vec![],
|
||||
vec![
|
||||
json!({
|
||||
"type": "conversation.input_transcript.delta",
|
||||
"delta": "second question"
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_inbound_clear_2",
|
||||
"item_id": "item_inbound_clear_2",
|
||||
"input_transcript": "second question"
|
||||
}),
|
||||
],
|
||||
]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
}
|
||||
});
|
||||
let test = builder.build(&api_server).await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) => Some(session_id.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
|
||||
frame: RealtimeAudioFrame {
|
||||
data: "AQID".to_string(),
|
||||
sample_rate: 24000,
|
||||
num_channels: 1,
|
||||
samples_per_channel: Some(480),
|
||||
},
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
let requests = response_mock.requests();
|
||||
assert_eq!(requests.len(), 2);
|
||||
|
||||
let first_user_texts = requests[0].message_input_texts("user");
|
||||
assert!(
|
||||
first_user_texts
|
||||
.iter()
|
||||
.any(|text| text == "user: first question")
|
||||
);
|
||||
|
||||
let second_user_texts = requests[1].message_input_texts("user");
|
||||
assert!(
|
||||
second_user_texts
|
||||
.iter()
|
||||
.any(|text| text == "user: second question")
|
||||
);
|
||||
assert!(
|
||||
!second_user_texts
|
||||
.iter()
|
||||
.any(|text| text == "user: first question\nuser: second question")
|
||||
);
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn inbound_conversation_item_does_not_start_turn_and_still_forwards_audio() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
@@ -1473,12 +1614,15 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_echo_guard", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.input_transcript.delta",
|
||||
"delta": "delegate now"
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_echo_guard",
|
||||
"item_id": "item_echo_guard",
|
||||
"input_transcript": "delegate now",
|
||||
"messages": [{"role": "user", "text": "delegate now"}]
|
||||
"input_transcript": "delegate now"
|
||||
}),
|
||||
],
|
||||
vec![
|
||||
@@ -1621,12 +1765,15 @@ async fn inbound_handoff_request_does_not_block_realtime_event_forwarding() -> R
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_non_blocking", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.input_transcript.delta",
|
||||
"delta": "delegate now"
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_non_blocking",
|
||||
"item_id": "item_non_blocking",
|
||||
"input_transcript": "delegate now",
|
||||
"messages": [{"role": "user", "text": "delegate now"}]
|
||||
"input_transcript": "delegate now"
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.output_audio.delta",
|
||||
@@ -1748,13 +1895,18 @@ async fn inbound_handoff_request_steers_active_turn() -> Result<()> {
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_steer", "instructions": "backend prompt" }
|
||||
})],
|
||||
vec![json!({
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_steer",
|
||||
"item_id": "item_steer",
|
||||
"input_transcript": "steer via realtime",
|
||||
"messages": [{ "role": "user", "text": "steer via realtime" }]
|
||||
})],
|
||||
vec![
|
||||
json!({
|
||||
"type": "conversation.input_transcript.delta",
|
||||
"delta": "steer via realtime"
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_steer",
|
||||
"item_id": "item_steer",
|
||||
"input_transcript": "steer via realtime"
|
||||
}),
|
||||
],
|
||||
]])
|
||||
.await;
|
||||
|
||||
@@ -1879,12 +2031,15 @@ async fn inbound_handoff_request_starts_turn_and_does_not_block_realtime_audio()
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_handoff_request", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.input_transcript.delta",
|
||||
"delta": delegated_text
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_audio",
|
||||
"item_id": "item_audio",
|
||||
"input_transcript": delegated_text,
|
||||
"messages": [{ "role": "user", "text": delegated_text }]
|
||||
"input_transcript": delegated_text
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.output_audio.delta",
|
||||
|
||||
@@ -132,7 +132,12 @@ pub struct RealtimeAudioFrame {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
|
||||
pub struct RealtimeHandoffMessage {
|
||||
pub struct RealtimeTranscriptDelta {
|
||||
pub delta: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
|
||||
pub struct RealtimeTranscriptEntry {
|
||||
pub role: String,
|
||||
pub text: String,
|
||||
}
|
||||
@@ -142,7 +147,7 @@ pub struct RealtimeHandoffRequested {
|
||||
pub handoff_id: String,
|
||||
pub item_id: String,
|
||||
pub input_transcript: String,
|
||||
pub messages: Vec<RealtimeHandoffMessage>,
|
||||
pub active_transcript: Vec<RealtimeTranscriptEntry>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
|
||||
@@ -151,6 +156,8 @@ pub enum RealtimeEvent {
|
||||
session_id: String,
|
||||
instructions: Option<String>,
|
||||
},
|
||||
InputTranscriptDelta(RealtimeTranscriptDelta),
|
||||
OutputTranscriptDelta(RealtimeTranscriptDelta),
|
||||
AudioOut(RealtimeAudioFrame),
|
||||
ConversationItemAdded(Value),
|
||||
ConversationItemDone {
|
||||
|
||||
@@ -264,6 +264,8 @@ impl ChatWidget {
|
||||
RealtimeEvent::SessionUpdated { session_id, .. } => {
|
||||
self.realtime_conversation.session_id = Some(session_id);
|
||||
}
|
||||
RealtimeEvent::InputTranscriptDelta(_) => {}
|
||||
RealtimeEvent::OutputTranscriptDelta(_) => {}
|
||||
RealtimeEvent::AudioOut(frame) => self.enqueue_realtime_audio_out(&frame),
|
||||
RealtimeEvent::ConversationItemAdded(_item) => {}
|
||||
RealtimeEvent::ConversationItemDone { .. } => {}
|
||||
|
||||
Reference in New Issue
Block a user