[stack 1/4] Split realtime websocket methods by version (#14828)

## Stack Position
1/4. Base PR in the realtime stack.

## Base
- `main`

## Unblocks
- #14830

## Scope
- Split the realtime websocket request builders into `common`, `v1`, and
`v2` modules.
- Keep runtime behavior unchanged in this PR.

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-03-16 16:00:59 -07:00
committed by GitHub
parent a3ba10b44b
commit 6f05d8d735
5 changed files with 250 additions and 118 deletions

View File

@@ -1,7 +1,8 @@
use crate::endpoint::realtime_websocket::protocol::ConversationFunctionCallOutputItem;
use crate::endpoint::realtime_websocket::protocol::ConversationItemContent;
use crate::endpoint::realtime_websocket::protocol::ConversationItemPayload;
use crate::endpoint::realtime_websocket::protocol::ConversationMessageItem;
use crate::endpoint::realtime_websocket::methods_common::conversation_handoff_append_message;
use crate::endpoint::realtime_websocket::methods_common::conversation_item_create_message;
use crate::endpoint::realtime_websocket::methods_common::normalized_session_mode;
use crate::endpoint::realtime_websocket::methods_common::session_update_session;
use crate::endpoint::realtime_websocket::methods_common::websocket_intent;
use crate::endpoint::realtime_websocket::protocol::RealtimeAudioFrame;
use crate::endpoint::realtime_websocket::protocol::RealtimeEvent;
use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser;
@@ -10,13 +11,6 @@ use crate::endpoint::realtime_websocket::protocol::RealtimeSessionConfig;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode;
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptDelta;
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptEntry;
use crate::endpoint::realtime_websocket::protocol::SessionAudio;
use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat;
use crate::endpoint::realtime_websocket::protocol::SessionAudioInput;
use crate::endpoint::realtime_websocket::protocol::SessionAudioOutput;
use crate::endpoint::realtime_websocket::protocol::SessionAudioVoice;
use crate::endpoint::realtime_websocket::protocol::SessionFunctionTool;
use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession;
use crate::endpoint::realtime_websocket::protocol::parse_realtime_event;
use crate::error::ApiError;
use crate::provider::Provider;
@@ -26,7 +20,6 @@ use futures::SinkExt;
use futures::StreamExt;
use http::HeaderMap;
use http::HeaderValue;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
@@ -47,22 +40,6 @@ use tracing::trace;
use tungstenite::protocol::WebSocketConfig;
use url::Url;
const REALTIME_AUDIO_SAMPLE_RATE: u32 = 24_000;
const REALTIME_V1_SESSION_TYPE: &str = "quicksilver";
const REALTIME_V2_SESSION_TYPE: &str = "realtime";
const REALTIME_V2_CODEX_TOOL_NAME: &str = "codex";
const REALTIME_V2_CODEX_TOOL_DESCRIPTION: &str = "Delegate work to Codex and return the result.";
fn normalized_session_mode(
event_parser: RealtimeEventParser,
session_mode: RealtimeSessionMode,
) -> RealtimeSessionMode {
match event_parser {
RealtimeEventParser::V1 => RealtimeSessionMode::Conversational,
RealtimeEventParser::RealtimeV2 => session_mode,
}
}
struct WsStream {
tx_command: mpsc::Sender<WsCommand>,
pump_task: tokio::task::JoinHandle<()>,
@@ -300,21 +277,8 @@ impl RealtimeWebsocketWriter {
}
pub async fn send_conversation_item_create(&self, text: String) -> Result<(), ApiError> {
let content_kind = match self.event_parser {
RealtimeEventParser::V1 => "text",
RealtimeEventParser::RealtimeV2 => "input_text",
};
self.send_json(RealtimeOutboundMessage::ConversationItemCreate {
item: ConversationItemPayload::Message(ConversationMessageItem {
kind: "message".to_string(),
role: "user".to_string(),
content: vec![ConversationItemContent {
kind: content_kind.to_string(),
text,
}],
}),
})
.await
self.send_json(conversation_item_create_message(self.event_parser, text))
.await
}
pub async fn send_conversation_handoff_append(
@@ -322,23 +286,12 @@ impl RealtimeWebsocketWriter {
handoff_id: String,
output_text: String,
) -> Result<(), ApiError> {
let message = match self.event_parser {
RealtimeEventParser::V1 => RealtimeOutboundMessage::ConversationHandoffAppend {
handoff_id,
output_text,
},
RealtimeEventParser::RealtimeV2 => RealtimeOutboundMessage::ConversationItemCreate {
item: ConversationItemPayload::FunctionCallOutput(
ConversationFunctionCallOutputItem {
kind: "function_call_output".to_string(),
call_id: handoff_id,
output: output_text,
},
),
},
};
self.send_json(message).await
self.send_json(conversation_handoff_append_message(
self.event_parser,
handoff_id,
output_text,
))
.await
}
pub async fn send_session_update(
@@ -347,60 +300,9 @@ impl RealtimeWebsocketWriter {
session_mode: RealtimeSessionMode,
) -> Result<(), ApiError> {
let session_mode = normalized_session_mode(self.event_parser, session_mode);
let (session_kind, session_instructions, output_audio) = match session_mode {
RealtimeSessionMode::Conversational => {
let kind = match self.event_parser {
RealtimeEventParser::V1 => REALTIME_V1_SESSION_TYPE.to_string(),
RealtimeEventParser::RealtimeV2 => REALTIME_V2_SESSION_TYPE.to_string(),
};
let voice = match self.event_parser {
RealtimeEventParser::V1 => SessionAudioVoice::Fathom,
RealtimeEventParser::RealtimeV2 => SessionAudioVoice::Alloy,
};
(kind, Some(instructions), Some(SessionAudioOutput { voice }))
}
RealtimeSessionMode::Transcription => ("transcription".to_string(), None, None),
};
let tools = match (self.event_parser, session_mode) {
(RealtimeEventParser::RealtimeV2, RealtimeSessionMode::Conversational) => {
Some(vec![SessionFunctionTool {
kind: "function".to_string(),
name: REALTIME_V2_CODEX_TOOL_NAME.to_string(),
description: REALTIME_V2_CODEX_TOOL_DESCRIPTION.to_string(),
parameters: json!({
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": "Prompt text for the delegated Codex task."
}
},
"required": ["prompt"],
"additionalProperties": false
}),
}])
}
(RealtimeEventParser::RealtimeV2, RealtimeSessionMode::Transcription)
| (RealtimeEventParser::V1, RealtimeSessionMode::Conversational)
| (RealtimeEventParser::V1, RealtimeSessionMode::Transcription) => None,
};
self.send_json(RealtimeOutboundMessage::SessionUpdate {
session: SessionUpdateSession {
kind: session_kind,
instructions: session_instructions,
audio: SessionAudio {
input: SessionAudioInput {
format: SessionAudioFormat {
kind: "audio/pcm".to_string(),
rate: REALTIME_AUDIO_SAMPLE_RATE,
},
},
output: output_audio,
},
tools,
},
})
.await
let session = session_update_session(self.event_parser, instructions, session_mode);
self.send_json(RealtimeOutboundMessage::SessionUpdate { session })
.await
}
pub async fn close(&self) -> Result<(), ApiError> {
@@ -655,10 +557,7 @@ fn websocket_url_from_api_url(
}
}
let intent = match event_parser {
RealtimeEventParser::V1 => Some("quicksilver"),
RealtimeEventParser::RealtimeV2 => None,
};
let intent = websocket_intent(event_parser);
let has_extra_query_params = query_params.is_some_and(|query_params| {
query_params
.iter()

View File

@@ -0,0 +1,67 @@
use crate::endpoint::realtime_websocket::methods_v1::conversation_handoff_append_message as v1_conversation_handoff_append_message;
use crate::endpoint::realtime_websocket::methods_v1::conversation_item_create_message as v1_conversation_item_create_message;
use crate::endpoint::realtime_websocket::methods_v1::session_update_session as v1_session_update_session;
use crate::endpoint::realtime_websocket::methods_v1::websocket_intent as v1_websocket_intent;
use crate::endpoint::realtime_websocket::methods_v2::conversation_handoff_append_message as v2_conversation_handoff_append_message;
use crate::endpoint::realtime_websocket::methods_v2::conversation_item_create_message as v2_conversation_item_create_message;
use crate::endpoint::realtime_websocket::methods_v2::session_update_session as v2_session_update_session;
use crate::endpoint::realtime_websocket::methods_v2::websocket_intent as v2_websocket_intent;
use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser;
use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode;
use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession;
pub(super) const REALTIME_AUDIO_SAMPLE_RATE: u32 = 24_000;
pub(super) const REALTIME_AUDIO_FORMAT: &str = "audio/pcm";
pub(super) fn normalized_session_mode(
event_parser: RealtimeEventParser,
session_mode: RealtimeSessionMode,
) -> RealtimeSessionMode {
match event_parser {
RealtimeEventParser::V1 => RealtimeSessionMode::Conversational,
RealtimeEventParser::RealtimeV2 => session_mode,
}
}
pub(super) fn conversation_item_create_message(
event_parser: RealtimeEventParser,
text: String,
) -> RealtimeOutboundMessage {
match event_parser {
RealtimeEventParser::V1 => v1_conversation_item_create_message(text),
RealtimeEventParser::RealtimeV2 => v2_conversation_item_create_message(text),
}
}
pub(super) fn conversation_handoff_append_message(
event_parser: RealtimeEventParser,
handoff_id: String,
output_text: String,
) -> RealtimeOutboundMessage {
match event_parser {
RealtimeEventParser::V1 => v1_conversation_handoff_append_message(handoff_id, output_text),
RealtimeEventParser::RealtimeV2 => {
v2_conversation_handoff_append_message(handoff_id, output_text)
}
}
}
pub(super) fn session_update_session(
event_parser: RealtimeEventParser,
instructions: String,
session_mode: RealtimeSessionMode,
) -> SessionUpdateSession {
let session_mode = normalized_session_mode(event_parser, session_mode);
match event_parser {
RealtimeEventParser::V1 => v1_session_update_session(instructions),
RealtimeEventParser::RealtimeV2 => v2_session_update_session(instructions, session_mode),
}
}
pub(super) fn websocket_intent(event_parser: RealtimeEventParser) -> Option<&'static str> {
match event_parser {
RealtimeEventParser::V1 => v1_websocket_intent(),
RealtimeEventParser::RealtimeV2 => v2_websocket_intent(),
}
}

View File

@@ -0,0 +1,60 @@
use crate::endpoint::realtime_websocket::methods_common::REALTIME_AUDIO_FORMAT;
use crate::endpoint::realtime_websocket::methods_common::REALTIME_AUDIO_SAMPLE_RATE;
use crate::endpoint::realtime_websocket::protocol::ConversationItemContent;
use crate::endpoint::realtime_websocket::protocol::ConversationItemPayload;
use crate::endpoint::realtime_websocket::protocol::ConversationMessageItem;
use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
use crate::endpoint::realtime_websocket::protocol::SessionAudio;
use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat;
use crate::endpoint::realtime_websocket::protocol::SessionAudioInput;
use crate::endpoint::realtime_websocket::protocol::SessionAudioOutput;
use crate::endpoint::realtime_websocket::protocol::SessionAudioVoice;
use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession;
const REALTIME_V1_SESSION_TYPE: &str = "quicksilver";
pub(super) fn conversation_item_create_message(text: String) -> RealtimeOutboundMessage {
RealtimeOutboundMessage::ConversationItemCreate {
item: ConversationItemPayload::Message(ConversationMessageItem {
kind: "message".to_string(),
role: "user".to_string(),
content: vec![ConversationItemContent {
kind: "text".to_string(),
text,
}],
}),
}
}
pub(super) fn conversation_handoff_append_message(
handoff_id: String,
output_text: String,
) -> RealtimeOutboundMessage {
RealtimeOutboundMessage::ConversationHandoffAppend {
handoff_id,
output_text,
}
}
pub(super) fn session_update_session(instructions: String) -> SessionUpdateSession {
SessionUpdateSession {
kind: REALTIME_V1_SESSION_TYPE.to_string(),
instructions: Some(instructions),
audio: SessionAudio {
input: SessionAudioInput {
format: SessionAudioFormat {
kind: REALTIME_AUDIO_FORMAT.to_string(),
rate: REALTIME_AUDIO_SAMPLE_RATE,
},
},
output: Some(SessionAudioOutput {
voice: SessionAudioVoice::Fathom,
}),
},
tools: None,
}
}
pub(super) fn websocket_intent() -> Option<&'static str> {
Some("quicksilver")
}

View File

@@ -0,0 +1,103 @@
use crate::endpoint::realtime_websocket::methods_common::REALTIME_AUDIO_FORMAT;
use crate::endpoint::realtime_websocket::methods_common::REALTIME_AUDIO_SAMPLE_RATE;
use crate::endpoint::realtime_websocket::protocol::ConversationFunctionCallOutputItem;
use crate::endpoint::realtime_websocket::protocol::ConversationItemContent;
use crate::endpoint::realtime_websocket::protocol::ConversationItemPayload;
use crate::endpoint::realtime_websocket::protocol::ConversationMessageItem;
use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode;
use crate::endpoint::realtime_websocket::protocol::SessionAudio;
use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat;
use crate::endpoint::realtime_websocket::protocol::SessionAudioInput;
use crate::endpoint::realtime_websocket::protocol::SessionAudioOutput;
use crate::endpoint::realtime_websocket::protocol::SessionAudioVoice;
use crate::endpoint::realtime_websocket::protocol::SessionFunctionTool;
use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession;
use serde_json::json;
const REALTIME_V2_SESSION_TYPE: &str = "realtime";
const REALTIME_V2_CODEX_TOOL_NAME: &str = "codex";
const REALTIME_V2_CODEX_TOOL_DESCRIPTION: &str = "Delegate work to Codex and return the result.";
pub(super) fn conversation_item_create_message(text: String) -> RealtimeOutboundMessage {
RealtimeOutboundMessage::ConversationItemCreate {
item: ConversationItemPayload::Message(ConversationMessageItem {
kind: "message".to_string(),
role: "user".to_string(),
content: vec![ConversationItemContent {
kind: "input_text".to_string(),
text,
}],
}),
}
}
pub(super) fn conversation_handoff_append_message(
handoff_id: String,
output_text: String,
) -> RealtimeOutboundMessage {
RealtimeOutboundMessage::ConversationItemCreate {
item: ConversationItemPayload::FunctionCallOutput(ConversationFunctionCallOutputItem {
kind: "function_call_output".to_string(),
call_id: handoff_id,
output: output_text,
}),
}
}
pub(super) fn session_update_session(
instructions: String,
session_mode: RealtimeSessionMode,
) -> SessionUpdateSession {
match session_mode {
RealtimeSessionMode::Conversational => SessionUpdateSession {
kind: REALTIME_V2_SESSION_TYPE.to_string(),
instructions: Some(instructions),
audio: SessionAudio {
input: SessionAudioInput {
format: SessionAudioFormat {
kind: REALTIME_AUDIO_FORMAT.to_string(),
rate: REALTIME_AUDIO_SAMPLE_RATE,
},
},
output: Some(SessionAudioOutput {
voice: SessionAudioVoice::Alloy,
}),
},
tools: Some(vec![SessionFunctionTool {
kind: "function".to_string(),
name: REALTIME_V2_CODEX_TOOL_NAME.to_string(),
description: REALTIME_V2_CODEX_TOOL_DESCRIPTION.to_string(),
parameters: json!({
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": "Prompt text for the delegated Codex task."
}
},
"required": ["prompt"],
"additionalProperties": false
}),
}]),
},
RealtimeSessionMode::Transcription => SessionUpdateSession {
kind: "transcription".to_string(),
instructions: None,
audio: SessionAudio {
input: SessionAudioInput {
format: SessionAudioFormat {
kind: REALTIME_AUDIO_FORMAT.to_string(),
rate: REALTIME_AUDIO_SAMPLE_RATE,
},
},
output: None,
},
tools: None,
},
}
}
pub(super) fn websocket_intent() -> Option<&'static str> {
None
}

View File

@@ -1,4 +1,7 @@
pub mod methods;
mod methods_common;
mod methods_v1;
mod methods_v2;
pub mod protocol;
mod protocol_common;
mod protocol_v1;