[stack 2/4] Align main realtime v2 wire and runtime flow (#14830)

## Stack Position
2/4. Built on top of #14828.

## Base
- #14828

## Unblocks
- #14829
- #14827

## Scope
- Port the realtime v2 wire parsing, session, app-server, and
conversation runtime behavior onto the split websocket-method base.
- Branch runtime behavior directly on the current realtime session kind
instead of parser-derived flow flags.
- Keep regression coverage in the existing e2e suites.

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-03-16 21:38:07 -07:00
committed by GitHub
parent 1d85fe79ed
commit fbd7f9b986
28 changed files with 807 additions and 140 deletions

View File

@@ -272,12 +272,12 @@ impl RealtimeWebsocketConnection {
impl RealtimeWebsocketWriter {
pub async fn send_audio_frame(&self, frame: RealtimeAudioFrame) -> Result<(), ApiError> {
self.send_json(RealtimeOutboundMessage::InputAudioBufferAppend { audio: frame.data })
self.send_json(&RealtimeOutboundMessage::InputAudioBufferAppend { audio: frame.data })
.await
}
pub async fn send_conversation_item_create(&self, text: String) -> Result<(), ApiError> {
self.send_json(conversation_item_create_message(self.event_parser, text))
self.send_json(&conversation_item_create_message(self.event_parser, text))
.await
}
@@ -286,7 +286,7 @@ impl RealtimeWebsocketWriter {
handoff_id: String,
output_text: String,
) -> Result<(), ApiError> {
self.send_json(conversation_handoff_append_message(
self.send_json(&conversation_handoff_append_message(
self.event_parser,
handoff_id,
output_text,
@@ -294,6 +294,11 @@ impl RealtimeWebsocketWriter {
.await
}
pub async fn send_response_create(&self) -> Result<(), ApiError> {
self.send_json(&RealtimeOutboundMessage::ResponseCreate)
.await
}
pub async fn send_session_update(
&self,
instructions: String,
@@ -301,7 +306,7 @@ impl RealtimeWebsocketWriter {
) -> Result<(), ApiError> {
let session_mode = normalized_session_mode(self.event_parser, session_mode);
let session = session_update_session(self.event_parser, instructions, session_mode);
self.send_json(RealtimeOutboundMessage::SessionUpdate { session })
self.send_json(&RealtimeOutboundMessage::SessionUpdate { session })
.await
}
@@ -319,11 +324,14 @@ impl RealtimeWebsocketWriter {
Ok(())
}
async fn send_json(&self, message: RealtimeOutboundMessage) -> Result<(), ApiError> {
let payload = serde_json::to_string(&message)
async fn send_json(&self, message: &RealtimeOutboundMessage) -> Result<(), ApiError> {
let payload = serde_json::to_string(message)
.map_err(|err| ApiError::Stream(format!("failed to encode realtime request: {err}")))?;
debug!(?message, "realtime websocket request");
self.send_payload(payload).await
}
pub async fn send_payload(&self, payload: String) -> Result<(), ApiError> {
if self.is_closed.load(Ordering::SeqCst) {
return Err(ApiError::Stream(
"realtime websocket connection is closed".to_string(),
@@ -392,6 +400,7 @@ impl RealtimeWebsocketEvents {
async fn update_active_transcript(&self, event: &mut RealtimeEvent) {
let mut active_transcript = self.active_transcript.lock().await;
match event {
RealtimeEvent::InputAudioSpeechStarted(_) => {}
RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta }) => {
append_transcript_delta(&mut active_transcript.entries, "user", delta);
}
@@ -403,6 +412,7 @@ impl RealtimeWebsocketEvents {
}
RealtimeEvent::SessionUpdated { .. }
| RealtimeEvent::AudioOut(_)
| RealtimeEvent::ResponseCancelled(_)
| RealtimeEvent::ConversationItemAdded(_)
| RealtimeEvent::ConversationItemDone { .. }
| RealtimeEvent::Error(_) => {}
@@ -616,6 +626,8 @@ mod tests {
use crate::endpoint::realtime_websocket::protocol::RealtimeHandoffRequested;
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptDelta;
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptEntry;
use codex_protocol::protocol::RealtimeInputAudioSpeechStarted;
use codex_protocol::protocol::RealtimeResponseCancelled;
use http::HeaderValue;
use pretty_assertions::assert_eq;
use serde_json::Value;
@@ -660,6 +672,7 @@ mod tests {
sample_rate: 48000,
num_channels: 1,
samples_per_channel: Some(960),
item_id: None,
}))
);
}
@@ -809,10 +822,112 @@ mod tests {
sample_rate: 24_000,
num_channels: 1,
samples_per_channel: None,
item_id: None,
}))
);
}
#[test]
fn parse_realtime_v2_response_audio_delta_with_item_id() {
let payload = json!({
"type": "response.audio.delta",
"delta": "AQID",
"item_id": "item_audio_1"
})
.to_string();
assert_eq!(
parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2),
Some(RealtimeEvent::AudioOut(RealtimeAudioFrame {
data: "AQID".to_string(),
sample_rate: 24_000,
num_channels: 1,
samples_per_channel: None,
item_id: Some("item_audio_1".to_string()),
}))
);
}
#[test]
fn parse_realtime_v2_speech_started_event() {
let payload = json!({
"type": "input_audio_buffer.speech_started",
"item_id": "item_input_1"
})
.to_string();
assert_eq!(
parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2),
Some(RealtimeEvent::InputAudioSpeechStarted(
RealtimeInputAudioSpeechStarted {
item_id: Some("item_input_1".to_string()),
}
))
);
}
#[test]
fn parse_realtime_v2_response_cancelled_event() {
let payload = json!({
"type": "response.cancelled",
"response": {"id": "resp_cancelled_1"}
})
.to_string();
assert_eq!(
parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2),
Some(RealtimeEvent::ResponseCancelled(
RealtimeResponseCancelled {
response_id: Some("resp_cancelled_1".to_string()),
}
))
);
}
#[test]
fn parse_realtime_v2_response_done_handoff_event() {
let payload = json!({
"type": "response.done",
"response": {
"output": [{
"id": "item_123",
"type": "function_call",
"name": "codex",
"call_id": "call_123",
"arguments": "{\"prompt\":\"delegate from done\"}"
}]
}
})
.to_string();
assert_eq!(
parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2),
Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
handoff_id: "call_123".to_string(),
item_id: "item_123".to_string(),
input_transcript: "delegate from done".to_string(),
active_transcript: Vec::new(),
}))
);
}
#[test]
fn parse_realtime_v2_response_created_event() {
let payload = json!({
"type": "response.created",
"response": {"id": "resp_created_1"}
})
.to_string();
assert_eq!(
parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2),
Some(RealtimeEvent::ConversationItemAdded(json!({
"type": "response.created",
"response": {"id": "resp_created_1"}
})))
);
}
#[test]
fn merge_request_headers_matches_http_precedence() {
let mut provider_headers = HeaderMap::new();
@@ -1169,6 +1284,7 @@ mod tests {
sample_rate: 48000,
num_channels: 1,
samples_per_channel: Some(960),
item_id: None,
})
.await
.expect("send audio");
@@ -1196,6 +1312,7 @@ mod tests {
sample_rate: 48000,
num_channels: 1,
samples_per_channel: None,
item_id: None,
})
);
@@ -1285,9 +1402,38 @@ mod tests {
first_json["session"]["type"],
Value::String("realtime".to_string())
);
assert_eq!(first_json["session"]["output_modalities"], json!(["audio"]));
assert_eq!(
first_json["session"]["audio"]["input"]["format"],
json!({
"type": "audio/pcm",
"rate": 24_000,
})
);
assert_eq!(
first_json["session"]["audio"]["input"]["noise_reduction"],
json!({
"type": "near_field",
})
);
assert_eq!(
first_json["session"]["audio"]["input"]["turn_detection"],
json!({
"type": "server_vad",
"interrupt_response": true,
"create_response": true,
})
);
assert_eq!(
first_json["session"]["audio"]["output"]["format"],
json!({
"type": "audio/pcm",
"rate": 24_000,
})
);
assert_eq!(
first_json["session"]["audio"]["output"]["voice"],
Value::String("alloy".to_string())
Value::String("marin".to_string())
);
assert_eq!(
first_json["session"]["tools"][0]["type"],
@@ -1301,6 +1447,10 @@ mod tests {
first_json["session"]["tools"][0]["parameters"]["required"],
json!(["prompt"])
);
assert_eq!(
first_json["session"]["tool_choice"],
Value::String("auto".to_string())
);
ws.send(Message::Text(
json!({
@@ -1511,6 +1661,7 @@ mod tests {
sample_rate: 24_000,
num_channels: 1,
samples_per_channel: Some(480),
item_id: None,
})
.await
.expect("send audio");
@@ -1690,6 +1841,7 @@ mod tests {
sample_rate: 48000,
num_channels: 1,
samples_per_channel: Some(960),
item_id: None,
}),
)
.await