Compare commits

...

4 Commits

Author SHA1 Message Date
root
cbe0a9592f codex: bootstrap flaky test triage
Co-authored-by: Codex <noreply@openai.com>
2026-03-13 06:59:01 +00:00
Ahmed Ibrahim
c7e847aaeb Add diagnostics for read_only_unless_trusted timeout flake (#14518)
## Summary
- add targeted diagnostic logging for the
read_only_unless_trusted_requires_approval scenarios in
approval_matrix_covers_all_modes
- add a scoped timeout buffer only for ro_unless_trusted write-file
scenarios: 1000ms -> 2000ms
- keep all other write-file scenarios at 1000ms

## Why
The last two main failures were both in codex-core::all
suite::approvals::approval_matrix_covers_all_modes with exit_code=124 in
the same scenario. This points to execution-time jitter in CI rather
than a semantic approval-policy mismatch.

## Notes
- This does not introduce any >5s timeout and does not
disable/quarantine tests.
- The timeout increase is tightly scoped to the single flaky path and
keeps the matrix deterministic under CI scheduling variance.
2026-03-12 23:51:03 -07:00
Ahmed Ibrahim
2253a9d1d7 Add realtime transcription mode for websocket sessions (#14556)
- add experimental_realtime_ws_mode (conversational/transcription) and
plumb it into realtime conversation session config
- switch realtime websocket intent and session.update payload shape
based on mode
- update config schema and realtime/config tests

---------

Co-authored-by: Codex <noreply@openai.com>
2026-03-12 23:50:30 -07:00
Ahmed Ibrahim
eaf81d3f6f Add codex tool support for realtime v2 handoff (#14554)
- Advertise a `codex` function tool in realtime v2 session updates.
- Emit handoff replies as `function_call_output` items while keeping v1
behavior unchanged.
- Split realtime event parsing into explicit v1/v2 modules with shared
common helpers.

---------

Co-authored-by: Codex <noreply@openai.com>
2026-03-12 23:30:02 -07:00
13 changed files with 933 additions and 312 deletions

View File

@@ -1,16 +1,20 @@
use crate::endpoint::realtime_websocket::protocol::ConversationItem;
use crate::endpoint::realtime_websocket::protocol::ConversationFunctionCallOutputItem;
use crate::endpoint::realtime_websocket::protocol::ConversationItemContent;
use crate::endpoint::realtime_websocket::protocol::ConversationItemPayload;
use crate::endpoint::realtime_websocket::protocol::ConversationMessageItem;
use crate::endpoint::realtime_websocket::protocol::RealtimeAudioFrame;
use crate::endpoint::realtime_websocket::protocol::RealtimeEvent;
use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser;
use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionConfig;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode;
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptDelta;
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptEntry;
use crate::endpoint::realtime_websocket::protocol::SessionAudio;
use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat;
use crate::endpoint::realtime_websocket::protocol::SessionAudioInput;
use crate::endpoint::realtime_websocket::protocol::SessionAudioOutput;
use crate::endpoint::realtime_websocket::protocol::SessionFunctionTool;
use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession;
use crate::endpoint::realtime_websocket::protocol::parse_realtime_event;
use crate::error::ApiError;
@@ -21,6 +25,7 @@ use futures::SinkExt;
use futures::StreamExt;
use http::HeaderMap;
use http::HeaderValue;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
@@ -41,6 +46,23 @@ use tracing::trace;
use tungstenite::protocol::WebSocketConfig;
use url::Url;
const REALTIME_AUDIO_SAMPLE_RATE: u32 = 24_000;
const REALTIME_AUDIO_VOICE: &str = "fathom";
const REALTIME_V1_SESSION_TYPE: &str = "quicksilver";
const REALTIME_V2_SESSION_TYPE: &str = "realtime";
const REALTIME_V2_CODEX_TOOL_NAME: &str = "codex";
const REALTIME_V2_CODEX_TOOL_DESCRIPTION: &str = "Delegate work to Codex and return the result.";
/// Resolves the session mode that is actually used for a given event parser.
///
/// The V1 parser always yields `Conversational`, whatever mode was requested;
/// the RealtimeV2 parser passes the requested mode through unchanged.
fn normalized_session_mode(
    event_parser: RealtimeEventParser,
    session_mode: RealtimeSessionMode,
) -> RealtimeSessionMode {
    match (event_parser, session_mode) {
        // V1 ignores the requested mode entirely.
        (RealtimeEventParser::V1, _) => RealtimeSessionMode::Conversational,
        (RealtimeEventParser::RealtimeV2, requested) => requested,
    }
}
struct WsStream {
tx_command: mpsc::Sender<WsCommand>,
pump_task: tokio::task::JoinHandle<()>,
@@ -197,6 +219,7 @@ pub struct RealtimeWebsocketConnection {
pub struct RealtimeWebsocketWriter {
stream: Arc<WsStream>,
is_closed: Arc<AtomicBool>,
event_parser: RealtimeEventParser,
}
#[derive(Clone)]
@@ -258,6 +281,7 @@ impl RealtimeWebsocketConnection {
writer: RealtimeWebsocketWriter {
stream: Arc::clone(&stream),
is_closed: Arc::clone(&is_closed),
event_parser,
},
events: RealtimeWebsocketEvents {
rx_message: Arc::new(Mutex::new(rx_message)),
@@ -276,15 +300,19 @@ impl RealtimeWebsocketWriter {
}
pub async fn send_conversation_item_create(&self, text: String) -> Result<(), ApiError> {
let content_kind = match self.event_parser {
RealtimeEventParser::V1 => "text",
RealtimeEventParser::RealtimeV2 => "input_text",
};
self.send_json(RealtimeOutboundMessage::ConversationItemCreate {
item: ConversationItem {
item: ConversationItemPayload::Message(ConversationMessageItem {
kind: "message".to_string(),
role: "user".to_string(),
content: vec![ConversationItemContent {
kind: "text".to_string(),
kind: content_kind.to_string(),
text,
}],
},
}),
})
.await
}
@@ -294,29 +322,80 @@ impl RealtimeWebsocketWriter {
handoff_id: String,
output_text: String,
) -> Result<(), ApiError> {
self.send_json(RealtimeOutboundMessage::ConversationHandoffAppend {
handoff_id,
output_text,
})
.await
let message = match self.event_parser {
RealtimeEventParser::V1 => RealtimeOutboundMessage::ConversationHandoffAppend {
handoff_id,
output_text,
},
RealtimeEventParser::RealtimeV2 => RealtimeOutboundMessage::ConversationItemCreate {
item: ConversationItemPayload::FunctionCallOutput(
ConversationFunctionCallOutputItem {
kind: "function_call_output".to_string(),
call_id: handoff_id,
output: output_text,
},
),
},
};
self.send_json(message).await
}
pub async fn send_session_update(&self, instructions: String) -> Result<(), ApiError> {
pub async fn send_session_update(
&self,
instructions: String,
session_mode: RealtimeSessionMode,
) -> Result<(), ApiError> {
let session_mode = normalized_session_mode(self.event_parser, session_mode);
let (session_kind, session_instructions, output_audio) = match session_mode {
RealtimeSessionMode::Conversational => {
let kind = match self.event_parser {
RealtimeEventParser::V1 => REALTIME_V1_SESSION_TYPE.to_string(),
RealtimeEventParser::RealtimeV2 => REALTIME_V2_SESSION_TYPE.to_string(),
};
(
kind,
Some(instructions),
Some(SessionAudioOutput {
voice: REALTIME_AUDIO_VOICE.to_string(),
}),
)
}
RealtimeSessionMode::Transcription => ("transcription".to_string(), None, None),
};
let tools = match self.event_parser {
RealtimeEventParser::RealtimeV2 => Some(vec![SessionFunctionTool {
kind: "function".to_string(),
name: REALTIME_V2_CODEX_TOOL_NAME.to_string(),
description: REALTIME_V2_CODEX_TOOL_DESCRIPTION.to_string(),
parameters: json!({
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": "Prompt text for the delegated Codex task."
}
},
"required": ["prompt"],
"additionalProperties": false
}),
}]),
RealtimeEventParser::V1 => None,
};
self.send_json(RealtimeOutboundMessage::SessionUpdate {
session: SessionUpdateSession {
kind: "quicksilver".to_string(),
instructions,
kind: session_kind,
instructions: session_instructions,
audio: SessionAudio {
input: SessionAudioInput {
format: SessionAudioFormat {
kind: "audio/pcm".to_string(),
rate: 24_000,
rate: REALTIME_AUDIO_SAMPLE_RATE,
},
},
output: SessionAudioOutput {
voice: "fathom".to_string(),
},
output: output_audio,
},
tools,
},
})
.await
@@ -465,6 +544,8 @@ impl RealtimeWebsocketClient {
self.provider.base_url.as_str(),
self.provider.query_params.as_ref(),
config.model.as_deref(),
config.event_parser,
config.session_mode,
)?;
let mut request = ws_url
@@ -506,7 +587,7 @@ impl RealtimeWebsocketClient {
);
connection
.writer
.send_session_update(config.instructions)
.send_session_update(config.instructions, config.session_mode)
.await?;
Ok(connection)
}
@@ -551,6 +632,8 @@ fn websocket_url_from_api_url(
api_url: &str,
query_params: Option<&HashMap<String, String>>,
model: Option<&str>,
event_parser: RealtimeEventParser,
_session_mode: RealtimeSessionMode,
) -> Result<Url, ApiError> {
let mut url = Url::parse(api_url)
.map_err(|err| ApiError::Stream(format!("failed to parse realtime api_url: {err}")))?;
@@ -570,9 +653,20 @@ fn websocket_url_from_api_url(
}
}
{
let intent = match event_parser {
RealtimeEventParser::V1 => Some("quicksilver"),
RealtimeEventParser::RealtimeV2 => None,
};
let has_extra_query_params = query_params.is_some_and(|query_params| {
query_params
.iter()
.any(|(key, _)| key != "intent" && !(key == "model" && model.is_some()))
});
if intent.is_some() || model.is_some() || has_extra_query_params {
let mut query = url.query_pairs_mut();
query.append_pair("intent", "quicksilver");
if let Some(intent) = intent {
query.append_pair("intent", intent);
}
if let Some(model) = model {
query.append_pair("model", model);
}
@@ -853,8 +947,14 @@ mod tests {
#[test]
fn websocket_url_from_http_base_defaults_to_ws_path() {
let url =
websocket_url_from_api_url("http://127.0.0.1:8011", None, None).expect("build ws url");
let url = websocket_url_from_api_url(
"http://127.0.0.1:8011",
None,
None,
RealtimeEventParser::V1,
RealtimeSessionMode::Conversational,
)
.expect("build ws url");
assert_eq!(
url.as_str(),
"ws://127.0.0.1:8011/v1/realtime?intent=quicksilver"
@@ -863,9 +963,14 @@ mod tests {
#[test]
fn websocket_url_from_ws_base_defaults_to_ws_path() {
let url =
websocket_url_from_api_url("wss://example.com", None, Some("realtime-test-model"))
.expect("build ws url");
let url = websocket_url_from_api_url(
"wss://example.com",
None,
Some("realtime-test-model"),
RealtimeEventParser::V1,
RealtimeSessionMode::Conversational,
)
.expect("build ws url");
assert_eq!(
url.as_str(),
"wss://example.com/v1/realtime?intent=quicksilver&model=realtime-test-model"
@@ -874,8 +979,14 @@ mod tests {
#[test]
fn websocket_url_from_v1_base_appends_realtime_path() {
let url = websocket_url_from_api_url("https://api.openai.com/v1", None, Some("snapshot"))
.expect("build ws url");
let url = websocket_url_from_api_url(
"https://api.openai.com/v1",
None,
Some("snapshot"),
RealtimeEventParser::V1,
RealtimeSessionMode::Conversational,
)
.expect("build ws url");
assert_eq!(
url.as_str(),
"wss://api.openai.com/v1/realtime?intent=quicksilver&model=snapshot"
@@ -884,9 +995,14 @@ mod tests {
#[test]
fn websocket_url_from_nested_v1_base_appends_realtime_path() {
let url =
websocket_url_from_api_url("https://example.com/openai/v1", None, Some("snapshot"))
.expect("build ws url");
let url = websocket_url_from_api_url(
"https://example.com/openai/v1",
None,
Some("snapshot"),
RealtimeEventParser::V1,
RealtimeSessionMode::Conversational,
)
.expect("build ws url");
assert_eq!(
url.as_str(),
"wss://example.com/openai/v1/realtime?intent=quicksilver&model=snapshot"
@@ -902,6 +1018,8 @@ mod tests {
("intent".to_string(), "ignored".to_string()),
])),
Some("snapshot"),
RealtimeEventParser::V1,
RealtimeSessionMode::Conversational,
)
.expect("build ws url");
assert_eq!(
@@ -910,6 +1028,54 @@ mod tests {
);
}
#[test]
fn websocket_url_v1_ignores_transcription_mode() {
    // With the V1 parser the URL still carries the quicksilver intent even
    // though transcription mode was requested.
    let built = websocket_url_from_api_url(
        "https://example.com",
        None,
        None,
        RealtimeEventParser::V1,
        RealtimeSessionMode::Transcription,
    );
    let ws_url = built.expect("build ws url");
    assert_eq!(
        ws_url.as_str(),
        "wss://example.com/v1/realtime?intent=quicksilver"
    );
}
#[test]
fn websocket_url_omits_intent_for_realtime_v2_conversational_mode() {
    // Provider-level query params; the "intent" entry is expected to be dropped.
    let provider_params = HashMap::from([
        ("trace".to_string(), "1".to_string()),
        ("intent".to_string(), "ignored".to_string()),
    ]);
    let built = websocket_url_from_api_url(
        "https://example.com/v1/realtime?foo=bar",
        Some(&provider_params),
        Some("snapshot"),
        RealtimeEventParser::RealtimeV2,
        RealtimeSessionMode::Conversational,
    );
    let ws_url = built.expect("build ws url");
    // No intent parameter is present for RealtimeV2.
    assert_eq!(
        ws_url.as_str(),
        "wss://example.com/v1/realtime?foo=bar&model=snapshot&trace=1"
    );
}
#[test]
fn websocket_url_omits_intent_for_realtime_v2_transcription_mode() {
    // RealtimeV2 with no model and no extra params yields a URL without a query string.
    let built = websocket_url_from_api_url(
        "https://example.com",
        None,
        None,
        RealtimeEventParser::RealtimeV2,
        RealtimeSessionMode::Transcription,
    );
    let ws_url = built.expect("build ws url");
    assert_eq!(ws_url.as_str(), "wss://example.com/v1/realtime");
}
#[tokio::test]
async fn e2e_connect_and_exchange_events_against_mock_ws_server() {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
@@ -1075,6 +1241,7 @@ mod tests {
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_1".to_string()),
event_parser: RealtimeEventParser::V1,
session_mode: RealtimeSessionMode::Conversational,
},
HeaderMap::new(),
HeaderMap::new(),
@@ -1195,6 +1362,352 @@ mod tests {
server.await.expect("server task");
}
// End-to-end check of the RealtimeV2 outbound wire format against a local mock
// websocket server: the initial session.update must advertise the `codex`
// function tool, user text must be sent as `input_text` content, and a handoff
// reply must be sent as a `function_call_output` conversation item.
#[tokio::test]
async fn realtime_v2_session_update_includes_codex_tool_and_handoff_output_item() {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
let addr = listener.local_addr().expect("local addr");
// Mock server: validates the three client messages in the order they arrive.
let server = tokio::spawn(async move {
let (stream, _) = listener.accept().await.expect("accept");
let mut ws = accept_async(stream).await.expect("accept ws");
// Message 1: session.update sent during connect.
let first = ws
.next()
.await
.expect("first msg")
.expect("first msg ok")
.into_text()
.expect("text");
let first_json: Value = serde_json::from_str(&first).expect("json");
assert_eq!(first_json["type"], "session.update");
assert_eq!(
first_json["session"]["type"],
Value::String("realtime".to_string())
);
assert_eq!(
first_json["session"]["tools"][0]["type"],
Value::String("function".to_string())
);
assert_eq!(
first_json["session"]["tools"][0]["name"],
Value::String("codex".to_string())
);
assert_eq!(
first_json["session"]["tools"][0]["parameters"]["required"],
json!(["prompt"])
);
// Acknowledge the session so the client observes a SessionUpdated event.
ws.send(Message::Text(
json!({
"type": "session.updated",
"session": {"id": "sess_v2", "instructions": "backend prompt"}
})
.to_string()
.into(),
))
.await
.expect("send session.updated");
// Message 2: user text item; v2 uses the "input_text" content type.
let second = ws
.next()
.await
.expect("second msg")
.expect("second msg ok")
.into_text()
.expect("text");
let second_json: Value = serde_json::from_str(&second).expect("json");
assert_eq!(second_json["type"], "conversation.item.create");
assert_eq!(
second_json["item"]["type"],
Value::String("message".to_string())
);
assert_eq!(
second_json["item"]["content"][0]["type"],
Value::String("input_text".to_string())
);
assert_eq!(
second_json["item"]["content"][0]["text"],
Value::String("delegate this".to_string())
);
// Message 3: handoff reply, emitted as a function_call_output item whose
// call_id is the handoff id passed by the client.
let third = ws
.next()
.await
.expect("third msg")
.expect("third msg ok")
.into_text()
.expect("text");
let third_json: Value = serde_json::from_str(&third).expect("json");
assert_eq!(third_json["type"], "conversation.item.create");
assert_eq!(
third_json["item"]["type"],
Value::String("function_call_output".to_string())
);
assert_eq!(
third_json["item"]["call_id"],
Value::String("call_1".to_string())
);
assert_eq!(
third_json["item"]["output"],
Value::String("delegated result".to_string())
);
});
// Client side: point a provider at the mock server.
let provider = Provider {
name: "test".to_string(),
base_url: format!("http://{addr}"),
query_params: Some(HashMap::new()),
headers: HeaderMap::new(),
retry: crate::provider::RetryConfig {
max_attempts: 1,
base_delay: Duration::from_millis(1),
retry_429: false,
retry_5xx: false,
retry_transport: false,
},
stream_idle_timeout: Duration::from_secs(5),
};
let client = RealtimeWebsocketClient::new(provider);
let connection = client
.connect(
RealtimeSessionConfig {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_1".to_string()),
event_parser: RealtimeEventParser::RealtimeV2,
session_mode: RealtimeSessionMode::Conversational,
},
HeaderMap::new(),
HeaderMap::new(),
)
.await
.expect("connect");
// The server's session.updated reply must surface as a SessionUpdated event.
let created = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
created,
RealtimeEvent::SessionUpdated {
session_id: "sess_v2".to_string(),
instructions: Some("backend prompt".to_string()),
}
);
connection
.send_conversation_item_create("delegate this".to_string())
.await
.expect("send text item");
connection
.send_conversation_handoff_append("call_1".to_string(), "delegated result".to_string())
.await
.expect("send handoff output");
connection.close().await.expect("close");
// Joining the server task propagates any assertion failure raised inside it.
server.await.expect("server task");
}
// Checks the RealtimeV2 transcription-mode session.update against a local mock
// websocket server: the session type is "transcription", the `instructions`
// and `audio.output` keys are absent, and the `codex` tool is still listed.
// Also confirms an audio frame is sent as `input_audio_buffer.append`.
#[tokio::test]
async fn transcription_mode_session_update_omits_output_audio_and_instructions() {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
let addr = listener.local_addr().expect("local addr");
// Mock server: validates the session.update, acknowledges it, then expects audio.
let server = tokio::spawn(async move {
let (stream, _) = listener.accept().await.expect("accept");
let mut ws = accept_async(stream).await.expect("accept ws");
let first = ws
.next()
.await
.expect("first msg")
.expect("first msg ok")
.into_text()
.expect("text");
let first_json: Value = serde_json::from_str(&first).expect("json");
assert_eq!(first_json["type"], "session.update");
assert_eq!(
first_json["session"]["type"],
Value::String("transcription".to_string())
);
// The keys must be missing altogether, not merely null.
assert!(first_json["session"].get("instructions").is_none());
assert!(first_json["session"]["audio"].get("output").is_none());
assert_eq!(
first_json["session"]["tools"][0]["name"],
Value::String("codex".to_string())
);
// The acknowledgement carries no instructions, so the client event reports None.
ws.send(Message::Text(
json!({
"type": "session.updated",
"session": {"id": "sess_transcription"}
})
.to_string()
.into(),
))
.await
.expect("send session.updated");
let second = ws
.next()
.await
.expect("second msg")
.expect("second msg ok")
.into_text()
.expect("text");
let second_json: Value = serde_json::from_str(&second).expect("json");
assert_eq!(second_json["type"], "input_audio_buffer.append");
});
// Client side: point a provider at the mock server.
let provider = Provider {
name: "test".to_string(),
base_url: format!("http://{addr}"),
query_params: Some(HashMap::new()),
headers: HeaderMap::new(),
retry: crate::provider::RetryConfig {
max_attempts: 1,
base_delay: Duration::from_millis(1),
retry_429: false,
retry_5xx: false,
retry_transport: false,
},
stream_idle_timeout: Duration::from_secs(5),
};
let client = RealtimeWebsocketClient::new(provider);
// Instructions are supplied in the config but must not reach the wire in this mode.
let connection = client
.connect(
RealtimeSessionConfig {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_1".to_string()),
event_parser: RealtimeEventParser::RealtimeV2,
session_mode: RealtimeSessionMode::Transcription,
},
HeaderMap::new(),
HeaderMap::new(),
)
.await
.expect("connect");
let created = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
created,
RealtimeEvent::SessionUpdated {
session_id: "sess_transcription".to_string(),
instructions: None,
}
);
connection
.send_audio_frame(RealtimeAudioFrame {
data: "AQID".to_string(),
sample_rate: 24_000,
num_channels: 1,
samples_per_channel: Some(480),
})
.await
.expect("send audio");
connection.close().await.expect("close");
// Joining the server task propagates any assertion failure raised inside it.
server.await.expect("server task");
}
// Checks that requesting transcription mode with the V1 parser still produces
// the conversational V1 session.update: type "quicksilver", instructions and
// the output voice present, and no `tools` key.
#[tokio::test]
async fn v1_transcription_mode_is_treated_as_conversational() {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
let addr = listener.local_addr().expect("local addr");
// Mock server: validates the session.update and acknowledges it.
let server = tokio::spawn(async move {
let (stream, _) = listener.accept().await.expect("accept");
let mut ws = accept_async(stream).await.expect("accept ws");
let first = ws
.next()
.await
.expect("first msg")
.expect("first msg ok")
.into_text()
.expect("text");
let first_json: Value = serde_json::from_str(&first).expect("json");
assert_eq!(first_json["type"], "session.update");
assert_eq!(
first_json["session"]["type"],
Value::String("quicksilver".to_string())
);
assert_eq!(
first_json["session"]["instructions"],
Value::String("backend prompt".to_string())
);
assert_eq!(
first_json["session"]["audio"]["output"]["voice"],
Value::String("fathom".to_string())
);
// V1 sessions do not advertise any tools.
assert!(first_json["session"].get("tools").is_none());
ws.send(Message::Text(
json!({
"type": "session.updated",
"session": {"id": "sess_v1_mode"}
})
.to_string()
.into(),
))
.await
.expect("send session.updated");
});
// Client side: point a provider at the mock server.
let provider = Provider {
name: "test".to_string(),
base_url: format!("http://{addr}"),
query_params: Some(HashMap::new()),
headers: HeaderMap::new(),
retry: crate::provider::RetryConfig {
max_attempts: 1,
base_delay: Duration::from_millis(1),
retry_429: false,
retry_5xx: false,
retry_transport: false,
},
stream_idle_timeout: Duration::from_secs(5),
};
let client = RealtimeWebsocketClient::new(provider);
// V1 parser combined with a Transcription request: the mode should be ignored.
let connection = client
.connect(
RealtimeSessionConfig {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_1".to_string()),
event_parser: RealtimeEventParser::V1,
session_mode: RealtimeSessionMode::Transcription,
},
HeaderMap::new(),
HeaderMap::new(),
)
.await
.expect("connect");
let created = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
created,
RealtimeEvent::SessionUpdated {
session_id: "sess_v1_mode".to_string(),
instructions: None,
}
);
connection.close().await.expect("close");
// Joining the server task propagates any assertion failure raised inside it.
server.await.expect("server task");
}
#[tokio::test]
async fn send_does_not_block_while_next_event_waits_for_inbound_data() {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
@@ -1258,6 +1771,7 @@ mod tests {
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_1".to_string()),
event_parser: RealtimeEventParser::V1,
session_mode: RealtimeSessionMode::Conversational,
},
HeaderMap::new(),
HeaderMap::new(),

View File

@@ -1,5 +1,7 @@
pub mod methods;
pub mod protocol;
mod protocol_common;
mod protocol_v1;
mod protocol_v2;
pub use codex_protocol::protocol::RealtimeAudioFrame;
@@ -10,3 +12,4 @@ pub use methods::RealtimeWebsocketEvents;
pub use methods::RealtimeWebsocketWriter;
pub use protocol::RealtimeEventParser;
pub use protocol::RealtimeSessionConfig;
pub use protocol::RealtimeSessionMode;

View File

@@ -1,3 +1,4 @@
use crate::endpoint::realtime_websocket::protocol_v1::parse_realtime_event_v1;
use crate::endpoint::realtime_websocket::protocol_v2::parse_realtime_event_v2;
pub use codex_protocol::protocol::RealtimeAudioFrame;
pub use codex_protocol::protocol::RealtimeEvent;
@@ -6,7 +7,6 @@ pub use codex_protocol::protocol::RealtimeTranscriptDelta;
pub use codex_protocol::protocol::RealtimeTranscriptEntry;
use serde::Serialize;
use serde_json::Value;
use tracing::debug;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RealtimeEventParser {
@@ -14,12 +14,19 @@ pub enum RealtimeEventParser {
RealtimeV2,
}
/// Kind of realtime session a caller requests when connecting.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RealtimeSessionMode {
/// Conversational session.
Conversational,
/// Transcription session.
Transcription,
}
/// Caller-supplied settings for opening a realtime websocket session.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RealtimeSessionConfig {
/// Instruction text for the session.
pub instructions: String,
/// Optional model name.
pub model: Option<String>,
// NOTE(review): how `session_id` is consumed is not visible in this file — confirm at the call site.
pub session_id: Option<String>,
/// Which protocol version's event parser to use.
pub event_parser: RealtimeEventParser,
/// Requested session mode.
pub session_mode: RealtimeSessionMode,
}
#[derive(Debug, Clone, Serialize)]
@@ -35,21 +42,25 @@ pub(super) enum RealtimeOutboundMessage {
#[serde(rename = "session.update")]
SessionUpdate { session: SessionUpdateSession },
#[serde(rename = "conversation.item.create")]
ConversationItemCreate { item: ConversationItem },
ConversationItemCreate { item: ConversationItemPayload },
}
#[derive(Debug, Clone, Serialize)]
pub(super) struct SessionUpdateSession {
#[serde(rename = "type")]
pub(super) kind: String,
pub(super) instructions: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) instructions: Option<String>,
pub(super) audio: SessionAudio,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) tools: Option<Vec<SessionFunctionTool>>,
}
#[derive(Debug, Clone, Serialize)]
pub(super) struct SessionAudio {
pub(super) input: SessionAudioInput,
pub(super) output: SessionAudioOutput,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) output: Option<SessionAudioOutput>,
}
#[derive(Debug, Clone, Serialize)]
@@ -70,13 +81,28 @@ pub(super) struct SessionAudioOutput {
}
#[derive(Debug, Clone, Serialize)]
pub(super) struct ConversationItem {
pub(super) struct ConversationMessageItem {
#[serde(rename = "type")]
pub(super) kind: String,
pub(super) role: String,
pub(super) content: Vec<ConversationItemContent>,
}
/// Item payload variants: a message or a function-call output.
///
/// `#[serde(untagged)]` makes each variant serialize as its inner struct with
/// no variant wrapper, so the inner struct's own `type` field is what
/// distinguishes the two shapes in the JSON.
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
pub(super) enum ConversationItemPayload {
/// A message item.
Message(ConversationMessageItem),
/// A function-call output item.
FunctionCallOutput(ConversationFunctionCallOutputItem),
}
/// Conversation item carrying the textual output of a function call.
#[derive(Debug, Clone, Serialize)]
pub(super) struct ConversationFunctionCallOutputItem {
/// Item type discriminator; serialized under the JSON key `type`.
#[serde(rename = "type")]
pub(super) kind: String,
/// Identifier of the call this output belongs to.
pub(super) call_id: String,
/// Output text for that call.
pub(super) output: String,
}
#[derive(Debug, Clone, Serialize)]
pub(super) struct ConversationItemContent {
#[serde(rename = "type")]
@@ -84,6 +110,15 @@ pub(super) struct ConversationItemContent {
pub(super) text: String,
}
/// Serializable description of a function tool.
#[derive(Debug, Clone, Serialize)]
pub(super) struct SessionFunctionTool {
/// Tool type discriminator; serialized under the JSON key `type`.
#[serde(rename = "type")]
pub(super) kind: String,
/// Tool name.
pub(super) name: String,
/// Human-readable description of what the tool does.
pub(super) description: String,
/// Arbitrary JSON value describing the tool's parameters.
pub(super) parameters: Value,
}
pub(super) fn parse_realtime_event(
payload: &str,
event_parser: RealtimeEventParser,
@@ -93,125 +128,3 @@ pub(super) fn parse_realtime_event(
RealtimeEventParser::RealtimeV2 => parse_realtime_event_v2(payload),
}
}
fn parse_realtime_event_v1(payload: &str) -> Option<RealtimeEvent> {
let parsed: Value = match serde_json::from_str(payload) {
Ok(msg) => msg,
Err(err) => {
debug!("failed to parse realtime event: {err}, data: {payload}");
return None;
}
};
let message_type = match parsed.get("type").and_then(Value::as_str) {
Some(message_type) => message_type,
None => {
debug!("received realtime event without type field: {payload}");
return None;
}
};
match message_type {
"session.updated" => {
let session_id = parsed
.get("session")
.and_then(Value::as_object)
.and_then(|session| session.get("id"))
.and_then(Value::as_str)
.map(str::to_string);
let instructions = parsed
.get("session")
.and_then(Value::as_object)
.and_then(|session| session.get("instructions"))
.and_then(Value::as_str)
.map(str::to_string);
session_id.map(|session_id| RealtimeEvent::SessionUpdated {
session_id,
instructions,
})
}
"conversation.output_audio.delta" => {
let data = parsed
.get("delta")
.and_then(Value::as_str)
.or_else(|| parsed.get("data").and_then(Value::as_str))
.map(str::to_string)?;
let sample_rate = parsed
.get("sample_rate")
.and_then(Value::as_u64)
.and_then(|v| u32::try_from(v).ok())?;
let num_channels = parsed
.get("channels")
.or_else(|| parsed.get("num_channels"))
.and_then(Value::as_u64)
.and_then(|v| u16::try_from(v).ok())?;
Some(RealtimeEvent::AudioOut(RealtimeAudioFrame {
data,
sample_rate,
num_channels,
samples_per_channel: parsed
.get("samples_per_channel")
.and_then(Value::as_u64)
.and_then(|v| u32::try_from(v).ok()),
}))
}
"conversation.input_transcript.delta" => parsed
.get("delta")
.and_then(Value::as_str)
.map(str::to_string)
.map(|delta| RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta })),
"conversation.output_transcript.delta" => parsed
.get("delta")
.and_then(Value::as_str)
.map(str::to_string)
.map(|delta| RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta { delta })),
"conversation.item.added" => parsed
.get("item")
.cloned()
.map(RealtimeEvent::ConversationItemAdded),
"conversation.item.done" => parsed
.get("item")
.and_then(Value::as_object)
.and_then(|item| item.get("id"))
.and_then(Value::as_str)
.map(str::to_string)
.map(|item_id| RealtimeEvent::ConversationItemDone { item_id }),
"conversation.handoff.requested" => {
let handoff_id = parsed
.get("handoff_id")
.and_then(Value::as_str)
.map(str::to_string)?;
let item_id = parsed
.get("item_id")
.and_then(Value::as_str)
.map(str::to_string)?;
let input_transcript = parsed
.get("input_transcript")
.and_then(Value::as_str)
.map(str::to_string)?;
Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
handoff_id,
item_id,
input_transcript,
active_transcript: Vec::new(),
}))
}
"error" => parsed
.get("message")
.and_then(Value::as_str)
.map(str::to_string)
.or_else(|| {
parsed
.get("error")
.and_then(Value::as_object)
.and_then(|error| error.get("message"))
.and_then(Value::as_str)
.map(str::to_string)
})
.or_else(|| parsed.get("error").map(std::string::ToString::to_string))
.map(RealtimeEvent::Error),
_ => {
debug!("received unsupported realtime event type: {message_type}, data: {payload}");
None
}
}
}

View File

@@ -0,0 +1,71 @@
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::RealtimeTranscriptDelta;
use serde_json::Value;
use tracing::debug;
/// Decodes a raw realtime payload and extracts its `type` discriminator.
///
/// Returns the parsed JSON document together with an owned copy of its `type`
/// string. Returns `None`, after emitting a debug log tagged with
/// `parser_name`, when the payload is not valid JSON or has no string `type`
/// field.
pub(super) fn parse_realtime_payload(payload: &str, parser_name: &str) -> Option<(Value, String)> {
    let document = match serde_json::from_str::<Value>(payload) {
        Ok(document) => document,
        Err(err) => {
            debug!("failed to parse {parser_name} event: {err}, data: {payload}");
            return None;
        }
    };

    let Some(event_type) = document
        .get("type")
        .and_then(Value::as_str)
        .map(str::to_string)
    else {
        debug!("received {parser_name} event without type field: {payload}");
        return None;
    };

    Some((document, event_type))
}
/// Builds a `RealtimeEvent::SessionUpdated` from a parsed `session.updated` message.
///
/// Returns `None` unless `session` is a JSON object with a string `id`.
/// `instructions` is optional and becomes `None` when absent or not a string.
pub(super) fn parse_session_updated_event(parsed: &Value) -> Option<RealtimeEvent> {
    let session = parsed.get("session")?.as_object()?;
    let session_id = session.get("id")?.as_str()?.to_string();
    let instructions = session
        .get("instructions")
        .and_then(Value::as_str)
        .map(str::to_string);

    Some(RealtimeEvent::SessionUpdated {
        session_id,
        instructions,
    })
}
/// Reads the string stored under `field` and wraps it in a `RealtimeTranscriptDelta`.
///
/// Returns `None` when `field` is missing or its value is not a string.
pub(super) fn parse_transcript_delta_event(
    parsed: &Value,
    field: &str,
) -> Option<RealtimeTranscriptDelta> {
    let delta = parsed.get(field)?.as_str()?.to_string();
    Some(RealtimeTranscriptDelta { delta })
}
pub(super) fn parse_error_event(parsed: &Value) -> Option<RealtimeEvent> {
parsed
.get("message")
.and_then(Value::as_str)
.map(str::to_string)
.or_else(|| {
parsed
.get("error")
.and_then(Value::as_object)
.and_then(|error| error.get("message"))
.and_then(Value::as_str)
.map(str::to_string)
})
.or_else(|| parsed.get("error").map(ToString::to_string))
.map(RealtimeEvent::Error)
}

View File

@@ -0,0 +1,83 @@
use crate::endpoint::realtime_websocket::protocol_common::parse_error_event;
use crate::endpoint::realtime_websocket::protocol_common::parse_realtime_payload;
use crate::endpoint::realtime_websocket::protocol_common::parse_session_updated_event;
use crate::endpoint::realtime_websocket::protocol_common::parse_transcript_delta_event;
use codex_protocol::protocol::RealtimeAudioFrame;
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::RealtimeHandoffRequested;
use serde_json::Value;
use tracing::debug;
/// Parses one realtime v1 websocket payload into a `RealtimeEvent`.
///
/// Returns `None` when the payload cannot be decoded, when a required field of
/// a recognized event is missing or has the wrong type, or when the event type
/// is not supported (the last case is logged at debug level).
pub(super) fn parse_realtime_event_v1(payload: &str) -> Option<RealtimeEvent> {
    let (parsed, message_type) = parse_realtime_payload(payload, "realtime v1")?;

    // Small readers for top-level fields of the decoded message.
    let string_field = |key: &str| parsed.get(key).and_then(Value::as_str).map(str::to_string);
    let u64_field = |key: &str| parsed.get(key).and_then(Value::as_u64);

    match message_type.as_str() {
        "session.updated" => parse_session_updated_event(&parsed),
        "conversation.output_audio.delta" => {
            // The audio data is read from `delta`, falling back to `data`.
            let data = string_field("delta").or_else(|| string_field("data"))?;
            let sample_rate = u32::try_from(u64_field("sample_rate")?).ok()?;
            // `num_channels` is consulted only when `channels` is absent.
            let num_channels = parsed
                .get("channels")
                .or_else(|| parsed.get("num_channels"))
                .and_then(Value::as_u64)
                .and_then(|count| u16::try_from(count).ok())?;
            let samples_per_channel =
                u64_field("samples_per_channel").and_then(|count| u32::try_from(count).ok());
            Some(RealtimeEvent::AudioOut(RealtimeAudioFrame {
                data,
                sample_rate,
                num_channels,
                samples_per_channel,
            }))
        }
        "conversation.input_transcript.delta" => {
            let delta = parse_transcript_delta_event(&parsed, "delta")?;
            Some(RealtimeEvent::InputTranscriptDelta(delta))
        }
        "conversation.output_transcript.delta" => {
            let delta = parse_transcript_delta_event(&parsed, "delta")?;
            Some(RealtimeEvent::OutputTranscriptDelta(delta))
        }
        "conversation.item.added" => {
            let item = parsed.get("item")?.clone();
            Some(RealtimeEvent::ConversationItemAdded(item))
        }
        "conversation.item.done" => {
            let item_id = parsed
                .get("item")?
                .as_object()?
                .get("id")?
                .as_str()?
                .to_string();
            Some(RealtimeEvent::ConversationItemDone { item_id })
        }
        "conversation.handoff.requested" => {
            let handoff_id = string_field("handoff_id")?;
            let item_id = string_field("item_id")?;
            let input_transcript = string_field("input_transcript")?;
            Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
                handoff_id,
                item_id,
                input_transcript,
                active_transcript: Vec::new(),
            }))
        }
        "error" => parse_error_event(&parsed),
        _ => {
            debug!("received unsupported realtime v1 event type: {message_type}, data: {payload}");
            None
        }
    }
}

View File

@@ -1,157 +1,130 @@
use crate::endpoint::realtime_websocket::protocol_common::parse_error_event;
use crate::endpoint::realtime_websocket::protocol_common::parse_realtime_payload;
use crate::endpoint::realtime_websocket::protocol_common::parse_session_updated_event;
use crate::endpoint::realtime_websocket::protocol_common::parse_transcript_delta_event;
use codex_protocol::protocol::RealtimeAudioFrame;
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::RealtimeHandoffRequested;
use codex_protocol::protocol::RealtimeTranscriptDelta;
use serde_json::Map as JsonMap;
use serde_json::Value;
use tracing::debug;
const CODEX_TOOL_NAME: &str = "codex";
const DEFAULT_AUDIO_SAMPLE_RATE: u32 = 24_000;
const DEFAULT_AUDIO_CHANNELS: u16 = 1;
const TOOL_ARGUMENT_KEYS: [&str; 5] = ["input_transcript", "input", "text", "prompt", "query"];
pub(super) fn parse_realtime_event_v2(payload: &str) -> Option<RealtimeEvent> {
let parsed: Value = match serde_json::from_str(payload) {
Ok(msg) => msg,
Err(err) => {
debug!("failed to parse realtime v2 event: {err}, data: {payload}");
return None;
}
};
let (parsed, message_type) = parse_realtime_payload(payload, "realtime v2")?;
let message_type = match parsed.get("type").and_then(Value::as_str) {
Some(message_type) => message_type,
None => {
debug!("received realtime v2 event without type field: {payload}");
return None;
match message_type.as_str() {
"session.updated" => parse_session_updated_event(&parsed),
"response.output_audio.delta" => parse_output_audio_delta_event(&parsed),
"conversation.item.input_audio_transcription.delta" => {
parse_transcript_delta_event(&parsed, "delta").map(RealtimeEvent::InputTranscriptDelta)
}
};
match message_type {
"session.updated" => {
let session_id = parsed
.get("session")
.and_then(Value::as_object)
.and_then(|session| session.get("id"))
.and_then(Value::as_str)
.map(str::to_string);
let instructions = parsed
.get("session")
.and_then(Value::as_object)
.and_then(|session| session.get("instructions"))
.and_then(Value::as_str)
.map(str::to_string);
session_id.map(|session_id| RealtimeEvent::SessionUpdated {
session_id,
instructions,
})
"conversation.item.input_audio_transcription.completed" => {
parse_transcript_delta_event(&parsed, "transcript")
.map(RealtimeEvent::InputTranscriptDelta)
}
"response.output_audio.delta" => {
let data = parsed
.get("delta")
.and_then(Value::as_str)
.map(str::to_string)?;
let sample_rate = parsed
.get("sample_rate")
.and_then(Value::as_u64)
.and_then(|value| u32::try_from(value).ok())
.unwrap_or(24_000);
let num_channels = parsed
.get("channels")
.or_else(|| parsed.get("num_channels"))
.and_then(Value::as_u64)
.and_then(|value| u16::try_from(value).ok())
.unwrap_or(1);
Some(RealtimeEvent::AudioOut(RealtimeAudioFrame {
data,
sample_rate,
num_channels,
samples_per_channel: parsed
.get("samples_per_channel")
.and_then(Value::as_u64)
.and_then(|value| u32::try_from(value).ok()),
}))
"response.output_text.delta" | "response.output_audio_transcript.delta" => {
parse_transcript_delta_event(&parsed, "delta").map(RealtimeEvent::OutputTranscriptDelta)
}
"conversation.item.input_audio_transcription.delta" => parsed
.get("delta")
.and_then(Value::as_str)
.map(str::to_string)
.map(|delta| RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta })),
"conversation.item.input_audio_transcription.completed" => parsed
.get("transcript")
.and_then(Value::as_str)
.map(str::to_string)
.map(|delta| RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta })),
"response.output_text.delta" | "response.output_audio_transcript.delta" => parsed
.get("delta")
.and_then(Value::as_str)
.map(str::to_string)
.map(|delta| RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta { delta })),
"conversation.item.added" => parsed
.get("item")
.cloned()
.map(RealtimeEvent::ConversationItemAdded),
"conversation.item.done" => {
let item = parsed.get("item")?.as_object()?;
let item_type = item.get("type").and_then(Value::as_str);
let item_name = item.get("name").and_then(Value::as_str);
if item_type == Some("function_call") && item_name == Some("codex") {
let call_id = item
.get("call_id")
.and_then(Value::as_str)
.or_else(|| item.get("id").and_then(Value::as_str))?;
let item_id = item
.get("id")
.and_then(Value::as_str)
.unwrap_or(call_id)
.to_string();
let arguments = item.get("arguments").and_then(Value::as_str).unwrap_or("");
let mut input_transcript = String::new();
if !arguments.is_empty() {
if let Ok(arguments_json) = serde_json::from_str::<Value>(arguments)
&& let Some(arguments_object) = arguments_json.as_object()
{
for key in ["input_transcript", "input", "text", "prompt", "query"] {
if let Some(value) = arguments_object.get(key).and_then(Value::as_str) {
let trimmed = value.trim();
if !trimmed.is_empty() {
input_transcript = trimmed.to_string();
break;
}
}
}
}
if input_transcript.is_empty() {
input_transcript = arguments.to_string();
}
}
return Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
handoff_id: call_id.to_string(),
item_id,
input_transcript,
active_transcript: Vec::new(),
}));
}
item.get("id")
.and_then(Value::as_str)
.map(str::to_string)
.map(|item_id| RealtimeEvent::ConversationItemDone { item_id })
}
"error" => parsed
.get("message")
.and_then(Value::as_str)
.map(str::to_string)
.or_else(|| {
parsed
.get("error")
.and_then(Value::as_object)
.and_then(|error| error.get("message"))
.and_then(Value::as_str)
.map(str::to_string)
})
.or_else(|| parsed.get("error").map(ToString::to_string))
.map(RealtimeEvent::Error),
"conversation.item.done" => parse_conversation_item_done_event(&parsed),
"error" => parse_error_event(&parsed),
_ => {
debug!("received unsupported realtime v2 event type: {message_type}, data: {payload}");
None
}
}
}
/// Builds a [`RealtimeEvent::AudioOut`] frame from a decoded audio delta event.
///
/// The `delta` string is mandatory; without it the function yields `None`.
/// `sample_rate` falls back to `DEFAULT_AUDIO_SAMPLE_RATE` and the channel
/// count (`channels`, else `num_channels`) falls back to
/// `DEFAULT_AUDIO_CHANNELS` whenever the field is absent, is not an unsigned
/// integer, or does not fit the target integer type. `samples_per_channel`
/// is carried through as an optional value.
fn parse_output_audio_delta_event(parsed: &Value) -> Option<RealtimeEvent> {
    // Reads a top-level field as an unsigned integer, if it is one.
    let unsigned_field = |key: &str| parsed.get(key).and_then(Value::as_u64);

    let data = parsed.get("delta")?.as_str()?.to_string();

    let sample_rate = unsigned_field("sample_rate")
        .and_then(|raw| u32::try_from(raw).ok())
        .unwrap_or(DEFAULT_AUDIO_SAMPLE_RATE);

    // `num_channels` is consulted only when `channels` is absent altogether;
    // a present-but-invalid `channels` goes straight to the default.
    let num_channels = parsed
        .get("channels")
        .or_else(|| parsed.get("num_channels"))
        .and_then(Value::as_u64)
        .and_then(|raw| u16::try_from(raw).ok())
        .unwrap_or(DEFAULT_AUDIO_CHANNELS);

    let samples_per_channel =
        unsigned_field("samples_per_channel").and_then(|raw| u32::try_from(raw).ok());

    let frame = RealtimeAudioFrame {
        data,
        sample_rate,
        num_channels,
        samples_per_channel,
    };
    Some(RealtimeEvent::AudioOut(frame))
}
/// Interprets a decoded `conversation.item.done` payload.
///
/// The `item` object is first offered to `parse_handoff_requested_event`; if
/// that recognises it, the handoff event wins. Otherwise the item's string
/// `id` is reported as [`RealtimeEvent::ConversationItemDone`]. Returns `None`
/// when `item` is missing or not an object, or when neither interpretation
/// applies.
fn parse_conversation_item_done_event(parsed: &Value) -> Option<RealtimeEvent> {
    let item = parsed.get("item").and_then(Value::as_object)?;
    parse_handoff_requested_event(item).or_else(|| {
        let item_id = item.get("id")?.as_str()?.to_string();
        Some(RealtimeEvent::ConversationItemDone { item_id })
    })
}
/// Recognises a completed `function_call` item aimed at the tool named by
/// `CODEX_TOOL_NAME` and turns it into a [`RealtimeEvent::HandoffRequested`].
///
/// Returns `None` when the item is not such a call, or when it carries
/// neither a string `call_id` nor a string `id`. The handoff id prefers
/// `call_id` and falls back to `id`; the item id prefers `id` and falls back
/// to the chosen call id. The transcript is derived from the item's
/// `arguments` string (empty when absent) via `extract_input_transcript`.
fn parse_handoff_requested_event(item: &JsonMap<String, Value>) -> Option<RealtimeEvent> {
    // Borrowed view of a string-valued field on the item.
    let str_field = |key: &str| item.get(key).and_then(Value::as_str);

    let is_codex_call =
        str_field("type") == Some("function_call") && str_field("name") == Some(CODEX_TOOL_NAME);
    if !is_codex_call {
        return None;
    }

    let raw_item_id = str_field("id");
    let call_id = str_field("call_id").or(raw_item_id)?;
    let item_id = raw_item_id.unwrap_or(call_id).to_string();
    let arguments = str_field("arguments").unwrap_or("");

    let handoff = RealtimeHandoffRequested {
        handoff_id: call_id.to_string(),
        item_id,
        input_transcript: extract_input_transcript(arguments),
        active_transcript: Vec::new(),
    };
    Some(RealtimeEvent::HandoffRequested(handoff))
}
/// Derives the handoff transcript from a tool call's raw `arguments` string.
///
/// When `arguments` decodes to a JSON object, the keys in
/// `TOOL_ARGUMENT_KEYS` are tried in order and the first string value that is
/// non-empty after trimming is returned (trimmed). In every other case —
/// invalid JSON, a non-object value, or no usable key — the raw `arguments`
/// text is returned unchanged. An empty input yields an empty string.
fn extract_input_transcript(arguments: &str) -> String {
    if arguments.is_empty() {
        return String::new();
    }

    // Decode failures are not errors here; they just select the raw fallback.
    let decoded = serde_json::from_str::<Value>(arguments).ok();
    decoded
        .as_ref()
        .and_then(Value::as_object)
        .and_then(|fields| {
            TOOL_ARGUMENT_KEYS
                .iter()
                .filter_map(|key| fields.get(*key).and_then(Value::as_str))
                .map(str::trim)
                .find(|candidate| !candidate.is_empty())
        })
        .map_or_else(|| arguments.to_string(), str::to_string)
}

View File

@@ -29,6 +29,7 @@ pub use crate::endpoint::memories::MemoriesClient;
pub use crate::endpoint::models::ModelsClient;
pub use crate::endpoint::realtime_websocket::RealtimeEventParser;
pub use crate::endpoint::realtime_websocket::RealtimeSessionConfig;
pub use crate::endpoint::realtime_websocket::RealtimeSessionMode;
pub use crate::endpoint::realtime_websocket::RealtimeWebsocketClient;
pub use crate::endpoint::realtime_websocket::RealtimeWebsocketConnection;
pub use crate::endpoint::responses::ResponsesClient;

View File

@@ -6,6 +6,7 @@ use codex_api::RealtimeAudioFrame;
use codex_api::RealtimeEvent;
use codex_api::RealtimeEventParser;
use codex_api::RealtimeSessionConfig;
use codex_api::RealtimeSessionMode;
use codex_api::RealtimeWebsocketClient;
use codex_api::provider::Provider;
use codex_api::provider::RetryConfig;
@@ -142,6 +143,7 @@ async fn realtime_ws_e2e_session_create_and_event_flow() {
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_123".to_string()),
event_parser: RealtimeEventParser::V1,
session_mode: RealtimeSessionMode::Conversational,
},
HeaderMap::new(),
HeaderMap::new(),
@@ -235,6 +237,7 @@ async fn realtime_ws_e2e_send_while_next_event_waits() {
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_123".to_string()),
event_parser: RealtimeEventParser::V1,
session_mode: RealtimeSessionMode::Conversational,
},
HeaderMap::new(),
HeaderMap::new(),
@@ -299,6 +302,7 @@ async fn realtime_ws_e2e_disconnected_emitted_once() {
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_123".to_string()),
event_parser: RealtimeEventParser::V1,
session_mode: RealtimeSessionMode::Conversational,
},
HeaderMap::new(),
HeaderMap::new(),
@@ -360,6 +364,7 @@ async fn realtime_ws_e2e_ignores_unknown_text_events() {
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_123".to_string()),
event_parser: RealtimeEventParser::V1,
session_mode: RealtimeSessionMode::Conversational,
},
HeaderMap::new(),
HeaderMap::new(),
@@ -424,6 +429,7 @@ async fn realtime_ws_e2e_realtime_v2_parser_emits_handoff_requested() {
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_123".to_string()),
event_parser: RealtimeEventParser::RealtimeV2,
session_mode: RealtimeSessionMode::Conversational,
},
HeaderMap::new(),
HeaderMap::new(),

View File

@@ -1342,6 +1342,13 @@
},
"type": "object"
},
"RealtimeWsMode": {
"enum": [
"conversational",
"transcription"
],
"type": "string"
},
"ReasoningEffort": {
"description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning",
"enum": [
@@ -1816,6 +1823,14 @@
"description": "Experimental / do not use. Overrides only the realtime conversation websocket transport base URL (the `Op::RealtimeConversation` `/v1/realtime` connection) without changing normal provider HTTP requests.",
"type": "string"
},
"experimental_realtime_ws_mode": {
"allOf": [
{
"$ref": "#/definitions/RealtimeWsMode"
}
],
"description": "Experimental / do not use. Selects the realtime websocket intent mode. `conversational` is speech-to-speech while `transcription` is transcript-only."
},
"experimental_realtime_ws_model": {
"description": "Experimental / do not use. Selects the realtime websocket model/snapshot used for the `Op::RealtimeConversation` connection.",
"type": "string"

View File

@@ -4129,6 +4129,7 @@ fn test_precedence_fixture_with_o3_profile() -> std::io::Result<()> {
experimental_realtime_start_instructions: None,
experimental_realtime_ws_base_url: None,
experimental_realtime_ws_model: None,
experimental_realtime_ws_mode: RealtimeWsMode::Conversational,
experimental_realtime_ws_backend_prompt: None,
experimental_realtime_ws_startup_context: None,
base_instructions: None,
@@ -4265,6 +4266,7 @@ fn test_precedence_fixture_with_gpt3_profile() -> std::io::Result<()> {
experimental_realtime_start_instructions: None,
experimental_realtime_ws_base_url: None,
experimental_realtime_ws_model: None,
experimental_realtime_ws_mode: RealtimeWsMode::Conversational,
experimental_realtime_ws_backend_prompt: None,
experimental_realtime_ws_startup_context: None,
base_instructions: None,
@@ -4399,6 +4401,7 @@ fn test_precedence_fixture_with_zdr_profile() -> std::io::Result<()> {
experimental_realtime_start_instructions: None,
experimental_realtime_ws_base_url: None,
experimental_realtime_ws_model: None,
experimental_realtime_ws_mode: RealtimeWsMode::Conversational,
experimental_realtime_ws_backend_prompt: None,
experimental_realtime_ws_startup_context: None,
base_instructions: None,
@@ -4519,6 +4522,7 @@ fn test_precedence_fixture_with_gpt5_profile() -> std::io::Result<()> {
experimental_realtime_start_instructions: None,
experimental_realtime_ws_base_url: None,
experimental_realtime_ws_model: None,
experimental_realtime_ws_mode: RealtimeWsMode::Conversational,
experimental_realtime_ws_backend_prompt: None,
experimental_realtime_ws_startup_context: None,
base_instructions: None,
@@ -5566,6 +5570,34 @@ experimental_realtime_ws_model = "realtime-test-model"
Ok(())
}
#[test]
fn experimental_realtime_ws_mode_loads_from_config_toml() -> std::io::Result<()> {
    // Step 1: the raw TOML layer must deserialize the explicit mode.
    let toml_source = r#"
experimental_realtime_ws_mode = "transcription"
"#;
    let parsed_toml: ConfigToml =
        toml::from_str(toml_source).expect("TOML deserialization should succeed");
    assert_eq!(
        parsed_toml.experimental_realtime_ws_mode,
        Some(RealtimeWsMode::Transcription)
    );

    // Step 2: the resolved config must carry the same mode through loading.
    let home_dir = TempDir::new()?;
    let resolved = Config::load_from_base_config_with_overrides(
        parsed_toml,
        ConfigOverrides::default(),
        home_dir.path().to_path_buf(),
    )?;
    assert_eq!(
        resolved.experimental_realtime_ws_mode,
        RealtimeWsMode::Transcription
    );

    Ok(())
}
#[test]
fn realtime_audio_loads_from_config_toml() -> std::io::Result<()> {
let cfg: ConfigToml = toml::from_str(

View File

@@ -463,6 +463,9 @@ pub struct Config {
/// Experimental / do not use. Selects the realtime websocket model/snapshot
/// used for the `Op::RealtimeConversation` connection.
pub experimental_realtime_ws_model: Option<String>,
/// Experimental / do not use. Selects the realtime websocket intent mode.
/// `conversational` is speech-to-speech while `transcription` is transcript-only.
pub experimental_realtime_ws_mode: RealtimeWsMode,
/// Experimental / do not use. Overrides only the realtime conversation
/// websocket transport instructions (the `Op::RealtimeConversation`
/// `/ws` session.update instructions) without changing normal prompts.
@@ -1238,6 +1241,9 @@ pub struct ConfigToml {
/// Experimental / do not use. Selects the realtime websocket model/snapshot
/// used for the `Op::RealtimeConversation` connection.
pub experimental_realtime_ws_model: Option<String>,
/// Experimental / do not use. Selects the realtime websocket intent mode.
/// `conversational` is speech-to-speech while `transcription` is transcript-only.
pub experimental_realtime_ws_mode: Option<RealtimeWsMode>,
/// Experimental / do not use. Overrides only the realtime conversation
/// websocket transport instructions (the `Op::RealtimeConversation`
/// `/ws` session.update instructions) without changing normal prompts.
@@ -1383,6 +1389,14 @@ pub struct RealtimeAudioConfig {
pub speaker: Option<String>,
}
// Intent mode for the experimental realtime websocket session.
//
// Values are (de)serialized in snake_case, i.e. `conversational` and
// `transcription`.
//
// NOTE(review): plain `//` comments are used on purpose. This type derives
// `JsonSchema`, and `///` doc comments presumably flow into the generated
// config schema as descriptions — confirm before converting these to docs.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Default, PartialEq, Eq, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum RealtimeWsMode {
// Speech-to-speech mode; this is the value produced by `Default`.
#[default]
Conversational,
// Transcript-only mode.
Transcription,
}
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq, JsonSchema)]
#[schemars(deny_unknown_fields)]
pub struct RealtimeAudioToml {
@@ -2462,6 +2476,7 @@ impl Config {
}),
experimental_realtime_ws_base_url: cfg.experimental_realtime_ws_base_url,
experimental_realtime_ws_model: cfg.experimental_realtime_ws_model,
experimental_realtime_ws_mode: cfg.experimental_realtime_ws_mode.unwrap_or_default(),
experimental_realtime_ws_backend_prompt: cfg.experimental_realtime_ws_backend_prompt,
experimental_realtime_ws_startup_context: cfg.experimental_realtime_ws_startup_context,
experimental_realtime_start_instructions: cfg.experimental_realtime_start_instructions,

View File

@@ -15,6 +15,7 @@ use codex_api::RealtimeAudioFrame;
use codex_api::RealtimeEvent;
use codex_api::RealtimeEventParser;
use codex_api::RealtimeSessionConfig;
use codex_api::RealtimeSessionMode;
use codex_api::RealtimeWebsocketClient;
use codex_api::endpoint::realtime_websocket::RealtimeWebsocketEvents;
use codex_api::endpoint::realtime_websocket::RealtimeWebsocketWriter;
@@ -116,10 +117,7 @@ impl RealtimeConversationManager {
&self,
api_provider: ApiProvider,
extra_headers: Option<HeaderMap>,
prompt: String,
model: Option<String>,
session_id: Option<String>,
event_parser: RealtimeEventParser,
session_config: RealtimeSessionConfig,
) -> CodexResult<(Receiver<RealtimeEvent>, Arc<AtomicBool>)> {
let previous_state = {
let mut guard = self.state.lock().await;
@@ -131,12 +129,6 @@ impl RealtimeConversationManager {
let _ = state.task.await;
}
let session_config = RealtimeSessionConfig {
instructions: prompt,
model,
session_id,
event_parser,
};
let client = RealtimeWebsocketClient::new(api_provider);
let connection = client
.connect(
@@ -307,23 +299,26 @@ pub(crate) async fn handle_start(
} else {
RealtimeEventParser::V1
};
let session_mode = match config.experimental_realtime_ws_mode {
crate::config::RealtimeWsMode::Conversational => RealtimeSessionMode::Conversational,
crate::config::RealtimeWsMode::Transcription => RealtimeSessionMode::Transcription,
};
let requested_session_id = params
.session_id
.or_else(|| Some(sess.conversation_id.to_string()));
let session_config = RealtimeSessionConfig {
instructions: prompt,
model,
session_id: requested_session_id.clone(),
event_parser,
session_mode,
};
let extra_headers =
realtime_request_headers(requested_session_id.as_deref(), realtime_api_key.as_str())?;
info!("starting realtime conversation");
let (events_rx, realtime_active) = match sess
.conversation
.start(
api_provider,
extra_headers,
prompt,
model,
requested_session_id.clone(),
event_parser,
)
.start(api_provider, extra_headers, session_config)
.await
{
Ok(events_rx) => events_rx,

View File

@@ -123,7 +123,7 @@ impl ActionKind {
let (path, _) = target.resolve_for_patch(test);
let _ = fs::remove_file(&path);
let command = format!("printf {content:?} > {path:?} && cat {path:?}");
let event = shell_event(call_id, &command, 1_000, sandbox_permissions)?;
let event = shell_event(call_id, &command, 5_000, sandbox_permissions)?;
Ok((event, Some(command)))
}
ActionKind::FetchUrl {