mirror of
https://github.com/openai/codex.git
synced 2026-03-13 10:13:49 +00:00
Compare commits
4 Commits
dev/cc/mul
...
codex/fix-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cbe0a9592f | ||
|
|
c7e847aaeb | ||
|
|
2253a9d1d7 | ||
|
|
eaf81d3f6f |
@@ -1,16 +1,20 @@
|
||||
use crate::endpoint::realtime_websocket::protocol::ConversationItem;
|
||||
use crate::endpoint::realtime_websocket::protocol::ConversationFunctionCallOutputItem;
|
||||
use crate::endpoint::realtime_websocket::protocol::ConversationItemContent;
|
||||
use crate::endpoint::realtime_websocket::protocol::ConversationItemPayload;
|
||||
use crate::endpoint::realtime_websocket::protocol::ConversationMessageItem;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeAudioFrame;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeEvent;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionConfig;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptDelta;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptEntry;
|
||||
use crate::endpoint::realtime_websocket::protocol::SessionAudio;
|
||||
use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat;
|
||||
use crate::endpoint::realtime_websocket::protocol::SessionAudioInput;
|
||||
use crate::endpoint::realtime_websocket::protocol::SessionAudioOutput;
|
||||
use crate::endpoint::realtime_websocket::protocol::SessionFunctionTool;
|
||||
use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession;
|
||||
use crate::endpoint::realtime_websocket::protocol::parse_realtime_event;
|
||||
use crate::error::ApiError;
|
||||
@@ -21,6 +25,7 @@ use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use http::HeaderMap;
|
||||
use http::HeaderValue;
|
||||
use serde_json::json;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
@@ -41,6 +46,23 @@ use tracing::trace;
|
||||
use tungstenite::protocol::WebSocketConfig;
|
||||
use url::Url;
|
||||
|
||||
const REALTIME_AUDIO_SAMPLE_RATE: u32 = 24_000;
|
||||
const REALTIME_AUDIO_VOICE: &str = "fathom";
|
||||
const REALTIME_V1_SESSION_TYPE: &str = "quicksilver";
|
||||
const REALTIME_V2_SESSION_TYPE: &str = "realtime";
|
||||
const REALTIME_V2_CODEX_TOOL_NAME: &str = "codex";
|
||||
const REALTIME_V2_CODEX_TOOL_DESCRIPTION: &str = "Delegate work to Codex and return the result.";
|
||||
|
||||
fn normalized_session_mode(
|
||||
event_parser: RealtimeEventParser,
|
||||
session_mode: RealtimeSessionMode,
|
||||
) -> RealtimeSessionMode {
|
||||
match event_parser {
|
||||
RealtimeEventParser::V1 => RealtimeSessionMode::Conversational,
|
||||
RealtimeEventParser::RealtimeV2 => session_mode,
|
||||
}
|
||||
}
|
||||
|
||||
struct WsStream {
|
||||
tx_command: mpsc::Sender<WsCommand>,
|
||||
pump_task: tokio::task::JoinHandle<()>,
|
||||
@@ -197,6 +219,7 @@ pub struct RealtimeWebsocketConnection {
|
||||
pub struct RealtimeWebsocketWriter {
|
||||
stream: Arc<WsStream>,
|
||||
is_closed: Arc<AtomicBool>,
|
||||
event_parser: RealtimeEventParser,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -258,6 +281,7 @@ impl RealtimeWebsocketConnection {
|
||||
writer: RealtimeWebsocketWriter {
|
||||
stream: Arc::clone(&stream),
|
||||
is_closed: Arc::clone(&is_closed),
|
||||
event_parser,
|
||||
},
|
||||
events: RealtimeWebsocketEvents {
|
||||
rx_message: Arc::new(Mutex::new(rx_message)),
|
||||
@@ -276,15 +300,19 @@ impl RealtimeWebsocketWriter {
|
||||
}
|
||||
|
||||
pub async fn send_conversation_item_create(&self, text: String) -> Result<(), ApiError> {
|
||||
let content_kind = match self.event_parser {
|
||||
RealtimeEventParser::V1 => "text",
|
||||
RealtimeEventParser::RealtimeV2 => "input_text",
|
||||
};
|
||||
self.send_json(RealtimeOutboundMessage::ConversationItemCreate {
|
||||
item: ConversationItem {
|
||||
item: ConversationItemPayload::Message(ConversationMessageItem {
|
||||
kind: "message".to_string(),
|
||||
role: "user".to_string(),
|
||||
content: vec![ConversationItemContent {
|
||||
kind: "text".to_string(),
|
||||
kind: content_kind.to_string(),
|
||||
text,
|
||||
}],
|
||||
},
|
||||
}),
|
||||
})
|
||||
.await
|
||||
}
|
||||
@@ -294,29 +322,80 @@ impl RealtimeWebsocketWriter {
|
||||
handoff_id: String,
|
||||
output_text: String,
|
||||
) -> Result<(), ApiError> {
|
||||
self.send_json(RealtimeOutboundMessage::ConversationHandoffAppend {
|
||||
handoff_id,
|
||||
output_text,
|
||||
})
|
||||
.await
|
||||
let message = match self.event_parser {
|
||||
RealtimeEventParser::V1 => RealtimeOutboundMessage::ConversationHandoffAppend {
|
||||
handoff_id,
|
||||
output_text,
|
||||
},
|
||||
RealtimeEventParser::RealtimeV2 => RealtimeOutboundMessage::ConversationItemCreate {
|
||||
item: ConversationItemPayload::FunctionCallOutput(
|
||||
ConversationFunctionCallOutputItem {
|
||||
kind: "function_call_output".to_string(),
|
||||
call_id: handoff_id,
|
||||
output: output_text,
|
||||
},
|
||||
),
|
||||
},
|
||||
};
|
||||
|
||||
self.send_json(message).await
|
||||
}
|
||||
|
||||
pub async fn send_session_update(&self, instructions: String) -> Result<(), ApiError> {
|
||||
pub async fn send_session_update(
|
||||
&self,
|
||||
instructions: String,
|
||||
session_mode: RealtimeSessionMode,
|
||||
) -> Result<(), ApiError> {
|
||||
let session_mode = normalized_session_mode(self.event_parser, session_mode);
|
||||
let (session_kind, session_instructions, output_audio) = match session_mode {
|
||||
RealtimeSessionMode::Conversational => {
|
||||
let kind = match self.event_parser {
|
||||
RealtimeEventParser::V1 => REALTIME_V1_SESSION_TYPE.to_string(),
|
||||
RealtimeEventParser::RealtimeV2 => REALTIME_V2_SESSION_TYPE.to_string(),
|
||||
};
|
||||
(
|
||||
kind,
|
||||
Some(instructions),
|
||||
Some(SessionAudioOutput {
|
||||
voice: REALTIME_AUDIO_VOICE.to_string(),
|
||||
}),
|
||||
)
|
||||
}
|
||||
RealtimeSessionMode::Transcription => ("transcription".to_string(), None, None),
|
||||
};
|
||||
let tools = match self.event_parser {
|
||||
RealtimeEventParser::RealtimeV2 => Some(vec![SessionFunctionTool {
|
||||
kind: "function".to_string(),
|
||||
name: REALTIME_V2_CODEX_TOOL_NAME.to_string(),
|
||||
description: REALTIME_V2_CODEX_TOOL_DESCRIPTION.to_string(),
|
||||
parameters: json!({
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"prompt": {
|
||||
"type": "string",
|
||||
"description": "Prompt text for the delegated Codex task."
|
||||
}
|
||||
},
|
||||
"required": ["prompt"],
|
||||
"additionalProperties": false
|
||||
}),
|
||||
}]),
|
||||
RealtimeEventParser::V1 => None,
|
||||
};
|
||||
self.send_json(RealtimeOutboundMessage::SessionUpdate {
|
||||
session: SessionUpdateSession {
|
||||
kind: "quicksilver".to_string(),
|
||||
instructions,
|
||||
kind: session_kind,
|
||||
instructions: session_instructions,
|
||||
audio: SessionAudio {
|
||||
input: SessionAudioInput {
|
||||
format: SessionAudioFormat {
|
||||
kind: "audio/pcm".to_string(),
|
||||
rate: 24_000,
|
||||
rate: REALTIME_AUDIO_SAMPLE_RATE,
|
||||
},
|
||||
},
|
||||
output: SessionAudioOutput {
|
||||
voice: "fathom".to_string(),
|
||||
},
|
||||
output: output_audio,
|
||||
},
|
||||
tools,
|
||||
},
|
||||
})
|
||||
.await
|
||||
@@ -465,6 +544,8 @@ impl RealtimeWebsocketClient {
|
||||
self.provider.base_url.as_str(),
|
||||
self.provider.query_params.as_ref(),
|
||||
config.model.as_deref(),
|
||||
config.event_parser,
|
||||
config.session_mode,
|
||||
)?;
|
||||
|
||||
let mut request = ws_url
|
||||
@@ -506,7 +587,7 @@ impl RealtimeWebsocketClient {
|
||||
);
|
||||
connection
|
||||
.writer
|
||||
.send_session_update(config.instructions)
|
||||
.send_session_update(config.instructions, config.session_mode)
|
||||
.await?;
|
||||
Ok(connection)
|
||||
}
|
||||
@@ -551,6 +632,8 @@ fn websocket_url_from_api_url(
|
||||
api_url: &str,
|
||||
query_params: Option<&HashMap<String, String>>,
|
||||
model: Option<&str>,
|
||||
event_parser: RealtimeEventParser,
|
||||
_session_mode: RealtimeSessionMode,
|
||||
) -> Result<Url, ApiError> {
|
||||
let mut url = Url::parse(api_url)
|
||||
.map_err(|err| ApiError::Stream(format!("failed to parse realtime api_url: {err}")))?;
|
||||
@@ -570,9 +653,20 @@ fn websocket_url_from_api_url(
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
let intent = match event_parser {
|
||||
RealtimeEventParser::V1 => Some("quicksilver"),
|
||||
RealtimeEventParser::RealtimeV2 => None,
|
||||
};
|
||||
let has_extra_query_params = query_params.is_some_and(|query_params| {
|
||||
query_params
|
||||
.iter()
|
||||
.any(|(key, _)| key != "intent" && !(key == "model" && model.is_some()))
|
||||
});
|
||||
if intent.is_some() || model.is_some() || has_extra_query_params {
|
||||
let mut query = url.query_pairs_mut();
|
||||
query.append_pair("intent", "quicksilver");
|
||||
if let Some(intent) = intent {
|
||||
query.append_pair("intent", intent);
|
||||
}
|
||||
if let Some(model) = model {
|
||||
query.append_pair("model", model);
|
||||
}
|
||||
@@ -853,8 +947,14 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn websocket_url_from_http_base_defaults_to_ws_path() {
|
||||
let url =
|
||||
websocket_url_from_api_url("http://127.0.0.1:8011", None, None).expect("build ws url");
|
||||
let url = websocket_url_from_api_url(
|
||||
"http://127.0.0.1:8011",
|
||||
None,
|
||||
None,
|
||||
RealtimeEventParser::V1,
|
||||
RealtimeSessionMode::Conversational,
|
||||
)
|
||||
.expect("build ws url");
|
||||
assert_eq!(
|
||||
url.as_str(),
|
||||
"ws://127.0.0.1:8011/v1/realtime?intent=quicksilver"
|
||||
@@ -863,9 +963,14 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn websocket_url_from_ws_base_defaults_to_ws_path() {
|
||||
let url =
|
||||
websocket_url_from_api_url("wss://example.com", None, Some("realtime-test-model"))
|
||||
.expect("build ws url");
|
||||
let url = websocket_url_from_api_url(
|
||||
"wss://example.com",
|
||||
None,
|
||||
Some("realtime-test-model"),
|
||||
RealtimeEventParser::V1,
|
||||
RealtimeSessionMode::Conversational,
|
||||
)
|
||||
.expect("build ws url");
|
||||
assert_eq!(
|
||||
url.as_str(),
|
||||
"wss://example.com/v1/realtime?intent=quicksilver&model=realtime-test-model"
|
||||
@@ -874,8 +979,14 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn websocket_url_from_v1_base_appends_realtime_path() {
|
||||
let url = websocket_url_from_api_url("https://api.openai.com/v1", None, Some("snapshot"))
|
||||
.expect("build ws url");
|
||||
let url = websocket_url_from_api_url(
|
||||
"https://api.openai.com/v1",
|
||||
None,
|
||||
Some("snapshot"),
|
||||
RealtimeEventParser::V1,
|
||||
RealtimeSessionMode::Conversational,
|
||||
)
|
||||
.expect("build ws url");
|
||||
assert_eq!(
|
||||
url.as_str(),
|
||||
"wss://api.openai.com/v1/realtime?intent=quicksilver&model=snapshot"
|
||||
@@ -884,9 +995,14 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn websocket_url_from_nested_v1_base_appends_realtime_path() {
|
||||
let url =
|
||||
websocket_url_from_api_url("https://example.com/openai/v1", None, Some("snapshot"))
|
||||
.expect("build ws url");
|
||||
let url = websocket_url_from_api_url(
|
||||
"https://example.com/openai/v1",
|
||||
None,
|
||||
Some("snapshot"),
|
||||
RealtimeEventParser::V1,
|
||||
RealtimeSessionMode::Conversational,
|
||||
)
|
||||
.expect("build ws url");
|
||||
assert_eq!(
|
||||
url.as_str(),
|
||||
"wss://example.com/openai/v1/realtime?intent=quicksilver&model=snapshot"
|
||||
@@ -902,6 +1018,8 @@ mod tests {
|
||||
("intent".to_string(), "ignored".to_string()),
|
||||
])),
|
||||
Some("snapshot"),
|
||||
RealtimeEventParser::V1,
|
||||
RealtimeSessionMode::Conversational,
|
||||
)
|
||||
.expect("build ws url");
|
||||
assert_eq!(
|
||||
@@ -910,6 +1028,54 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn websocket_url_v1_ignores_transcription_mode() {
|
||||
let url = websocket_url_from_api_url(
|
||||
"https://example.com",
|
||||
None,
|
||||
None,
|
||||
RealtimeEventParser::V1,
|
||||
RealtimeSessionMode::Transcription,
|
||||
)
|
||||
.expect("build ws url");
|
||||
assert_eq!(
|
||||
url.as_str(),
|
||||
"wss://example.com/v1/realtime?intent=quicksilver"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn websocket_url_omits_intent_for_realtime_v2_conversational_mode() {
|
||||
let url = websocket_url_from_api_url(
|
||||
"https://example.com/v1/realtime?foo=bar",
|
||||
Some(&HashMap::from([
|
||||
("trace".to_string(), "1".to_string()),
|
||||
("intent".to_string(), "ignored".to_string()),
|
||||
])),
|
||||
Some("snapshot"),
|
||||
RealtimeEventParser::RealtimeV2,
|
||||
RealtimeSessionMode::Conversational,
|
||||
)
|
||||
.expect("build ws url");
|
||||
assert_eq!(
|
||||
url.as_str(),
|
||||
"wss://example.com/v1/realtime?foo=bar&model=snapshot&trace=1"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn websocket_url_omits_intent_for_realtime_v2_transcription_mode() {
|
||||
let url = websocket_url_from_api_url(
|
||||
"https://example.com",
|
||||
None,
|
||||
None,
|
||||
RealtimeEventParser::RealtimeV2,
|
||||
RealtimeSessionMode::Transcription,
|
||||
)
|
||||
.expect("build ws url");
|
||||
assert_eq!(url.as_str(), "wss://example.com/v1/realtime");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn e2e_connect_and_exchange_events_against_mock_ws_server() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
|
||||
@@ -1075,6 +1241,7 @@ mod tests {
|
||||
model: Some("realtime-test-model".to_string()),
|
||||
session_id: Some("conv_1".to_string()),
|
||||
event_parser: RealtimeEventParser::V1,
|
||||
session_mode: RealtimeSessionMode::Conversational,
|
||||
},
|
||||
HeaderMap::new(),
|
||||
HeaderMap::new(),
|
||||
@@ -1195,6 +1362,352 @@ mod tests {
|
||||
server.await.expect("server task");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn realtime_v2_session_update_includes_codex_tool_and_handoff_output_item() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
|
||||
let addr = listener.local_addr().expect("local addr");
|
||||
|
||||
let server = tokio::spawn(async move {
|
||||
let (stream, _) = listener.accept().await.expect("accept");
|
||||
let mut ws = accept_async(stream).await.expect("accept ws");
|
||||
|
||||
let first = ws
|
||||
.next()
|
||||
.await
|
||||
.expect("first msg")
|
||||
.expect("first msg ok")
|
||||
.into_text()
|
||||
.expect("text");
|
||||
let first_json: Value = serde_json::from_str(&first).expect("json");
|
||||
assert_eq!(first_json["type"], "session.update");
|
||||
assert_eq!(
|
||||
first_json["session"]["type"],
|
||||
Value::String("realtime".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
first_json["session"]["tools"][0]["type"],
|
||||
Value::String("function".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
first_json["session"]["tools"][0]["name"],
|
||||
Value::String("codex".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
first_json["session"]["tools"][0]["parameters"]["required"],
|
||||
json!(["prompt"])
|
||||
);
|
||||
|
||||
ws.send(Message::Text(
|
||||
json!({
|
||||
"type": "session.updated",
|
||||
"session": {"id": "sess_v2", "instructions": "backend prompt"}
|
||||
})
|
||||
.to_string()
|
||||
.into(),
|
||||
))
|
||||
.await
|
||||
.expect("send session.updated");
|
||||
|
||||
let second = ws
|
||||
.next()
|
||||
.await
|
||||
.expect("second msg")
|
||||
.expect("second msg ok")
|
||||
.into_text()
|
||||
.expect("text");
|
||||
let second_json: Value = serde_json::from_str(&second).expect("json");
|
||||
assert_eq!(second_json["type"], "conversation.item.create");
|
||||
assert_eq!(
|
||||
second_json["item"]["type"],
|
||||
Value::String("message".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
second_json["item"]["content"][0]["type"],
|
||||
Value::String("input_text".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
second_json["item"]["content"][0]["text"],
|
||||
Value::String("delegate this".to_string())
|
||||
);
|
||||
|
||||
let third = ws
|
||||
.next()
|
||||
.await
|
||||
.expect("third msg")
|
||||
.expect("third msg ok")
|
||||
.into_text()
|
||||
.expect("text");
|
||||
let third_json: Value = serde_json::from_str(&third).expect("json");
|
||||
assert_eq!(third_json["type"], "conversation.item.create");
|
||||
assert_eq!(
|
||||
third_json["item"]["type"],
|
||||
Value::String("function_call_output".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
third_json["item"]["call_id"],
|
||||
Value::String("call_1".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
third_json["item"]["output"],
|
||||
Value::String("delegated result".to_string())
|
||||
);
|
||||
});
|
||||
|
||||
let provider = Provider {
|
||||
name: "test".to_string(),
|
||||
base_url: format!("http://{addr}"),
|
||||
query_params: Some(HashMap::new()),
|
||||
headers: HeaderMap::new(),
|
||||
retry: crate::provider::RetryConfig {
|
||||
max_attempts: 1,
|
||||
base_delay: Duration::from_millis(1),
|
||||
retry_429: false,
|
||||
retry_5xx: false,
|
||||
retry_transport: false,
|
||||
},
|
||||
stream_idle_timeout: Duration::from_secs(5),
|
||||
};
|
||||
let client = RealtimeWebsocketClient::new(provider);
|
||||
let connection = client
|
||||
.connect(
|
||||
RealtimeSessionConfig {
|
||||
instructions: "backend prompt".to_string(),
|
||||
model: Some("realtime-test-model".to_string()),
|
||||
session_id: Some("conv_1".to_string()),
|
||||
event_parser: RealtimeEventParser::RealtimeV2,
|
||||
session_mode: RealtimeSessionMode::Conversational,
|
||||
},
|
||||
HeaderMap::new(),
|
||||
HeaderMap::new(),
|
||||
)
|
||||
.await
|
||||
.expect("connect");
|
||||
|
||||
let created = connection
|
||||
.next_event()
|
||||
.await
|
||||
.expect("next event")
|
||||
.expect("event");
|
||||
assert_eq!(
|
||||
created,
|
||||
RealtimeEvent::SessionUpdated {
|
||||
session_id: "sess_v2".to_string(),
|
||||
instructions: Some("backend prompt".to_string()),
|
||||
}
|
||||
);
|
||||
|
||||
connection
|
||||
.send_conversation_item_create("delegate this".to_string())
|
||||
.await
|
||||
.expect("send text item");
|
||||
connection
|
||||
.send_conversation_handoff_append("call_1".to_string(), "delegated result".to_string())
|
||||
.await
|
||||
.expect("send handoff output");
|
||||
|
||||
connection.close().await.expect("close");
|
||||
server.await.expect("server task");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn transcription_mode_session_update_omits_output_audio_and_instructions() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
|
||||
let addr = listener.local_addr().expect("local addr");
|
||||
|
||||
let server = tokio::spawn(async move {
|
||||
let (stream, _) = listener.accept().await.expect("accept");
|
||||
let mut ws = accept_async(stream).await.expect("accept ws");
|
||||
|
||||
let first = ws
|
||||
.next()
|
||||
.await
|
||||
.expect("first msg")
|
||||
.expect("first msg ok")
|
||||
.into_text()
|
||||
.expect("text");
|
||||
let first_json: Value = serde_json::from_str(&first).expect("json");
|
||||
assert_eq!(first_json["type"], "session.update");
|
||||
assert_eq!(
|
||||
first_json["session"]["type"],
|
||||
Value::String("transcription".to_string())
|
||||
);
|
||||
assert!(first_json["session"].get("instructions").is_none());
|
||||
assert!(first_json["session"]["audio"].get("output").is_none());
|
||||
assert_eq!(
|
||||
first_json["session"]["tools"][0]["name"],
|
||||
Value::String("codex".to_string())
|
||||
);
|
||||
|
||||
ws.send(Message::Text(
|
||||
json!({
|
||||
"type": "session.updated",
|
||||
"session": {"id": "sess_transcription"}
|
||||
})
|
||||
.to_string()
|
||||
.into(),
|
||||
))
|
||||
.await
|
||||
.expect("send session.updated");
|
||||
|
||||
let second = ws
|
||||
.next()
|
||||
.await
|
||||
.expect("second msg")
|
||||
.expect("second msg ok")
|
||||
.into_text()
|
||||
.expect("text");
|
||||
let second_json: Value = serde_json::from_str(&second).expect("json");
|
||||
assert_eq!(second_json["type"], "input_audio_buffer.append");
|
||||
});
|
||||
|
||||
let provider = Provider {
|
||||
name: "test".to_string(),
|
||||
base_url: format!("http://{addr}"),
|
||||
query_params: Some(HashMap::new()),
|
||||
headers: HeaderMap::new(),
|
||||
retry: crate::provider::RetryConfig {
|
||||
max_attempts: 1,
|
||||
base_delay: Duration::from_millis(1),
|
||||
retry_429: false,
|
||||
retry_5xx: false,
|
||||
retry_transport: false,
|
||||
},
|
||||
stream_idle_timeout: Duration::from_secs(5),
|
||||
};
|
||||
let client = RealtimeWebsocketClient::new(provider);
|
||||
let connection = client
|
||||
.connect(
|
||||
RealtimeSessionConfig {
|
||||
instructions: "backend prompt".to_string(),
|
||||
model: Some("realtime-test-model".to_string()),
|
||||
session_id: Some("conv_1".to_string()),
|
||||
event_parser: RealtimeEventParser::RealtimeV2,
|
||||
session_mode: RealtimeSessionMode::Transcription,
|
||||
},
|
||||
HeaderMap::new(),
|
||||
HeaderMap::new(),
|
||||
)
|
||||
.await
|
||||
.expect("connect");
|
||||
|
||||
let created = connection
|
||||
.next_event()
|
||||
.await
|
||||
.expect("next event")
|
||||
.expect("event");
|
||||
assert_eq!(
|
||||
created,
|
||||
RealtimeEvent::SessionUpdated {
|
||||
session_id: "sess_transcription".to_string(),
|
||||
instructions: None,
|
||||
}
|
||||
);
|
||||
|
||||
connection
|
||||
.send_audio_frame(RealtimeAudioFrame {
|
||||
data: "AQID".to_string(),
|
||||
sample_rate: 24_000,
|
||||
num_channels: 1,
|
||||
samples_per_channel: Some(480),
|
||||
})
|
||||
.await
|
||||
.expect("send audio");
|
||||
|
||||
connection.close().await.expect("close");
|
||||
server.await.expect("server task");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn v1_transcription_mode_is_treated_as_conversational() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
|
||||
let addr = listener.local_addr().expect("local addr");
|
||||
|
||||
let server = tokio::spawn(async move {
|
||||
let (stream, _) = listener.accept().await.expect("accept");
|
||||
let mut ws = accept_async(stream).await.expect("accept ws");
|
||||
|
||||
let first = ws
|
||||
.next()
|
||||
.await
|
||||
.expect("first msg")
|
||||
.expect("first msg ok")
|
||||
.into_text()
|
||||
.expect("text");
|
||||
let first_json: Value = serde_json::from_str(&first).expect("json");
|
||||
assert_eq!(first_json["type"], "session.update");
|
||||
assert_eq!(
|
||||
first_json["session"]["type"],
|
||||
Value::String("quicksilver".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
first_json["session"]["instructions"],
|
||||
Value::String("backend prompt".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
first_json["session"]["audio"]["output"]["voice"],
|
||||
Value::String("fathom".to_string())
|
||||
);
|
||||
assert!(first_json["session"].get("tools").is_none());
|
||||
|
||||
ws.send(Message::Text(
|
||||
json!({
|
||||
"type": "session.updated",
|
||||
"session": {"id": "sess_v1_mode"}
|
||||
})
|
||||
.to_string()
|
||||
.into(),
|
||||
))
|
||||
.await
|
||||
.expect("send session.updated");
|
||||
});
|
||||
|
||||
let provider = Provider {
|
||||
name: "test".to_string(),
|
||||
base_url: format!("http://{addr}"),
|
||||
query_params: Some(HashMap::new()),
|
||||
headers: HeaderMap::new(),
|
||||
retry: crate::provider::RetryConfig {
|
||||
max_attempts: 1,
|
||||
base_delay: Duration::from_millis(1),
|
||||
retry_429: false,
|
||||
retry_5xx: false,
|
||||
retry_transport: false,
|
||||
},
|
||||
stream_idle_timeout: Duration::from_secs(5),
|
||||
};
|
||||
let client = RealtimeWebsocketClient::new(provider);
|
||||
let connection = client
|
||||
.connect(
|
||||
RealtimeSessionConfig {
|
||||
instructions: "backend prompt".to_string(),
|
||||
model: Some("realtime-test-model".to_string()),
|
||||
session_id: Some("conv_1".to_string()),
|
||||
event_parser: RealtimeEventParser::V1,
|
||||
session_mode: RealtimeSessionMode::Transcription,
|
||||
},
|
||||
HeaderMap::new(),
|
||||
HeaderMap::new(),
|
||||
)
|
||||
.await
|
||||
.expect("connect");
|
||||
|
||||
let created = connection
|
||||
.next_event()
|
||||
.await
|
||||
.expect("next event")
|
||||
.expect("event");
|
||||
assert_eq!(
|
||||
created,
|
||||
RealtimeEvent::SessionUpdated {
|
||||
session_id: "sess_v1_mode".to_string(),
|
||||
instructions: None,
|
||||
}
|
||||
);
|
||||
|
||||
connection.close().await.expect("close");
|
||||
server.await.expect("server task");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_does_not_block_while_next_event_waits_for_inbound_data() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
|
||||
@@ -1258,6 +1771,7 @@ mod tests {
|
||||
model: Some("realtime-test-model".to_string()),
|
||||
session_id: Some("conv_1".to_string()),
|
||||
event_parser: RealtimeEventParser::V1,
|
||||
session_mode: RealtimeSessionMode::Conversational,
|
||||
},
|
||||
HeaderMap::new(),
|
||||
HeaderMap::new(),
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
pub mod methods;
|
||||
pub mod protocol;
|
||||
mod protocol_common;
|
||||
mod protocol_v1;
|
||||
mod protocol_v2;
|
||||
|
||||
pub use codex_protocol::protocol::RealtimeAudioFrame;
|
||||
@@ -10,3 +12,4 @@ pub use methods::RealtimeWebsocketEvents;
|
||||
pub use methods::RealtimeWebsocketWriter;
|
||||
pub use protocol::RealtimeEventParser;
|
||||
pub use protocol::RealtimeSessionConfig;
|
||||
pub use protocol::RealtimeSessionMode;
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use crate::endpoint::realtime_websocket::protocol_v1::parse_realtime_event_v1;
|
||||
use crate::endpoint::realtime_websocket::protocol_v2::parse_realtime_event_v2;
|
||||
pub use codex_protocol::protocol::RealtimeAudioFrame;
|
||||
pub use codex_protocol::protocol::RealtimeEvent;
|
||||
@@ -6,7 +7,6 @@ pub use codex_protocol::protocol::RealtimeTranscriptDelta;
|
||||
pub use codex_protocol::protocol::RealtimeTranscriptEntry;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
use tracing::debug;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum RealtimeEventParser {
|
||||
@@ -14,12 +14,19 @@ pub enum RealtimeEventParser {
|
||||
RealtimeV2,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum RealtimeSessionMode {
|
||||
Conversational,
|
||||
Transcription,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct RealtimeSessionConfig {
|
||||
pub instructions: String,
|
||||
pub model: Option<String>,
|
||||
pub session_id: Option<String>,
|
||||
pub event_parser: RealtimeEventParser,
|
||||
pub session_mode: RealtimeSessionMode,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
@@ -35,21 +42,25 @@ pub(super) enum RealtimeOutboundMessage {
|
||||
#[serde(rename = "session.update")]
|
||||
SessionUpdate { session: SessionUpdateSession },
|
||||
#[serde(rename = "conversation.item.create")]
|
||||
ConversationItemCreate { item: ConversationItem },
|
||||
ConversationItemCreate { item: ConversationItemPayload },
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub(super) struct SessionUpdateSession {
|
||||
#[serde(rename = "type")]
|
||||
pub(super) kind: String,
|
||||
pub(super) instructions: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(super) instructions: Option<String>,
|
||||
pub(super) audio: SessionAudio,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(super) tools: Option<Vec<SessionFunctionTool>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub(super) struct SessionAudio {
|
||||
pub(super) input: SessionAudioInput,
|
||||
pub(super) output: SessionAudioOutput,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(super) output: Option<SessionAudioOutput>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
@@ -70,13 +81,28 @@ pub(super) struct SessionAudioOutput {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub(super) struct ConversationItem {
|
||||
pub(super) struct ConversationMessageItem {
|
||||
#[serde(rename = "type")]
|
||||
pub(super) kind: String,
|
||||
pub(super) role: String,
|
||||
pub(super) content: Vec<ConversationItemContent>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(untagged)]
|
||||
pub(super) enum ConversationItemPayload {
|
||||
Message(ConversationMessageItem),
|
||||
FunctionCallOutput(ConversationFunctionCallOutputItem),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub(super) struct ConversationFunctionCallOutputItem {
|
||||
#[serde(rename = "type")]
|
||||
pub(super) kind: String,
|
||||
pub(super) call_id: String,
|
||||
pub(super) output: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub(super) struct ConversationItemContent {
|
||||
#[serde(rename = "type")]
|
||||
@@ -84,6 +110,15 @@ pub(super) struct ConversationItemContent {
|
||||
pub(super) text: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub(super) struct SessionFunctionTool {
|
||||
#[serde(rename = "type")]
|
||||
pub(super) kind: String,
|
||||
pub(super) name: String,
|
||||
pub(super) description: String,
|
||||
pub(super) parameters: Value,
|
||||
}
|
||||
|
||||
pub(super) fn parse_realtime_event(
|
||||
payload: &str,
|
||||
event_parser: RealtimeEventParser,
|
||||
@@ -93,125 +128,3 @@ pub(super) fn parse_realtime_event(
|
||||
RealtimeEventParser::RealtimeV2 => parse_realtime_event_v2(payload),
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_realtime_event_v1(payload: &str) -> Option<RealtimeEvent> {
|
||||
let parsed: Value = match serde_json::from_str(payload) {
|
||||
Ok(msg) => msg,
|
||||
Err(err) => {
|
||||
debug!("failed to parse realtime event: {err}, data: {payload}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let message_type = match parsed.get("type").and_then(Value::as_str) {
|
||||
Some(message_type) => message_type,
|
||||
None => {
|
||||
debug!("received realtime event without type field: {payload}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
match message_type {
|
||||
"session.updated" => {
|
||||
let session_id = parsed
|
||||
.get("session")
|
||||
.and_then(Value::as_object)
|
||||
.and_then(|session| session.get("id"))
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string);
|
||||
let instructions = parsed
|
||||
.get("session")
|
||||
.and_then(Value::as_object)
|
||||
.and_then(|session| session.get("instructions"))
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string);
|
||||
session_id.map(|session_id| RealtimeEvent::SessionUpdated {
|
||||
session_id,
|
||||
instructions,
|
||||
})
|
||||
}
|
||||
"conversation.output_audio.delta" => {
|
||||
let data = parsed
|
||||
.get("delta")
|
||||
.and_then(Value::as_str)
|
||||
.or_else(|| parsed.get("data").and_then(Value::as_str))
|
||||
.map(str::to_string)?;
|
||||
let sample_rate = parsed
|
||||
.get("sample_rate")
|
||||
.and_then(Value::as_u64)
|
||||
.and_then(|v| u32::try_from(v).ok())?;
|
||||
let num_channels = parsed
|
||||
.get("channels")
|
||||
.or_else(|| parsed.get("num_channels"))
|
||||
.and_then(Value::as_u64)
|
||||
.and_then(|v| u16::try_from(v).ok())?;
|
||||
Some(RealtimeEvent::AudioOut(RealtimeAudioFrame {
|
||||
data,
|
||||
sample_rate,
|
||||
num_channels,
|
||||
samples_per_channel: parsed
|
||||
.get("samples_per_channel")
|
||||
.and_then(Value::as_u64)
|
||||
.and_then(|v| u32::try_from(v).ok()),
|
||||
}))
|
||||
}
|
||||
"conversation.input_transcript.delta" => parsed
|
||||
.get("delta")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.map(|delta| RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta })),
|
||||
"conversation.output_transcript.delta" => parsed
|
||||
.get("delta")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.map(|delta| RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta { delta })),
|
||||
"conversation.item.added" => parsed
|
||||
.get("item")
|
||||
.cloned()
|
||||
.map(RealtimeEvent::ConversationItemAdded),
|
||||
"conversation.item.done" => parsed
|
||||
.get("item")
|
||||
.and_then(Value::as_object)
|
||||
.and_then(|item| item.get("id"))
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.map(|item_id| RealtimeEvent::ConversationItemDone { item_id }),
|
||||
"conversation.handoff.requested" => {
|
||||
let handoff_id = parsed
|
||||
.get("handoff_id")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)?;
|
||||
let item_id = parsed
|
||||
.get("item_id")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)?;
|
||||
let input_transcript = parsed
|
||||
.get("input_transcript")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)?;
|
||||
Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
|
||||
handoff_id,
|
||||
item_id,
|
||||
input_transcript,
|
||||
active_transcript: Vec::new(),
|
||||
}))
|
||||
}
|
||||
"error" => parsed
|
||||
.get("message")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.or_else(|| {
|
||||
parsed
|
||||
.get("error")
|
||||
.and_then(Value::as_object)
|
||||
.and_then(|error| error.get("message"))
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
})
|
||||
.or_else(|| parsed.get("error").map(std::string::ToString::to_string))
|
||||
.map(RealtimeEvent::Error),
|
||||
_ => {
|
||||
debug!("received unsupported realtime event type: {message_type}, data: {payload}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,71 @@
|
||||
use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeTranscriptDelta;
|
||||
use serde_json::Value;
|
||||
use tracing::debug;
|
||||
|
||||
pub(super) fn parse_realtime_payload(payload: &str, parser_name: &str) -> Option<(Value, String)> {
|
||||
let parsed: Value = match serde_json::from_str(payload) {
|
||||
Ok(message) => message,
|
||||
Err(err) => {
|
||||
debug!("failed to parse {parser_name} event: {err}, data: {payload}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let message_type = match parsed.get("type").and_then(Value::as_str) {
|
||||
Some(message_type) => message_type.to_string(),
|
||||
None => {
|
||||
debug!("received {parser_name} event without type field: {payload}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
Some((parsed, message_type))
|
||||
}
|
||||
|
||||
pub(super) fn parse_session_updated_event(parsed: &Value) -> Option<RealtimeEvent> {
|
||||
let session_id = parsed
|
||||
.get("session")
|
||||
.and_then(Value::as_object)
|
||||
.and_then(|session| session.get("id"))
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)?;
|
||||
let instructions = parsed
|
||||
.get("session")
|
||||
.and_then(Value::as_object)
|
||||
.and_then(|session| session.get("instructions"))
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string);
|
||||
Some(RealtimeEvent::SessionUpdated {
|
||||
session_id,
|
||||
instructions,
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) fn parse_transcript_delta_event(
|
||||
parsed: &Value,
|
||||
field: &str,
|
||||
) -> Option<RealtimeTranscriptDelta> {
|
||||
parsed
|
||||
.get(field)
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.map(|delta| RealtimeTranscriptDelta { delta })
|
||||
}
|
||||
|
||||
pub(super) fn parse_error_event(parsed: &Value) -> Option<RealtimeEvent> {
|
||||
parsed
|
||||
.get("message")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.or_else(|| {
|
||||
parsed
|
||||
.get("error")
|
||||
.and_then(Value::as_object)
|
||||
.and_then(|error| error.get("message"))
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
})
|
||||
.or_else(|| parsed.get("error").map(ToString::to_string))
|
||||
.map(RealtimeEvent::Error)
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
use crate::endpoint::realtime_websocket::protocol_common::parse_error_event;
|
||||
use crate::endpoint::realtime_websocket::protocol_common::parse_realtime_payload;
|
||||
use crate::endpoint::realtime_websocket::protocol_common::parse_session_updated_event;
|
||||
use crate::endpoint::realtime_websocket::protocol_common::parse_transcript_delta_event;
|
||||
use codex_protocol::protocol::RealtimeAudioFrame;
|
||||
use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeHandoffRequested;
|
||||
use serde_json::Value;
|
||||
use tracing::debug;
|
||||
|
||||
pub(super) fn parse_realtime_event_v1(payload: &str) -> Option<RealtimeEvent> {
|
||||
let (parsed, message_type) = parse_realtime_payload(payload, "realtime v1")?;
|
||||
match message_type.as_str() {
|
||||
"session.updated" => parse_session_updated_event(&parsed),
|
||||
"conversation.output_audio.delta" => {
|
||||
let data = parsed
|
||||
.get("delta")
|
||||
.and_then(Value::as_str)
|
||||
.or_else(|| parsed.get("data").and_then(Value::as_str))
|
||||
.map(str::to_string)?;
|
||||
let sample_rate = parsed
|
||||
.get("sample_rate")
|
||||
.and_then(Value::as_u64)
|
||||
.and_then(|value| u32::try_from(value).ok())?;
|
||||
let num_channels = parsed
|
||||
.get("channels")
|
||||
.or_else(|| parsed.get("num_channels"))
|
||||
.and_then(Value::as_u64)
|
||||
.and_then(|value| u16::try_from(value).ok())?;
|
||||
Some(RealtimeEvent::AudioOut(RealtimeAudioFrame {
|
||||
data,
|
||||
sample_rate,
|
||||
num_channels,
|
||||
samples_per_channel: parsed
|
||||
.get("samples_per_channel")
|
||||
.and_then(Value::as_u64)
|
||||
.and_then(|value| u32::try_from(value).ok()),
|
||||
}))
|
||||
}
|
||||
"conversation.input_transcript.delta" => {
|
||||
parse_transcript_delta_event(&parsed, "delta").map(RealtimeEvent::InputTranscriptDelta)
|
||||
}
|
||||
"conversation.output_transcript.delta" => {
|
||||
parse_transcript_delta_event(&parsed, "delta").map(RealtimeEvent::OutputTranscriptDelta)
|
||||
}
|
||||
"conversation.item.added" => parsed
|
||||
.get("item")
|
||||
.cloned()
|
||||
.map(RealtimeEvent::ConversationItemAdded),
|
||||
"conversation.item.done" => parsed
|
||||
.get("item")
|
||||
.and_then(Value::as_object)
|
||||
.and_then(|item| item.get("id"))
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.map(|item_id| RealtimeEvent::ConversationItemDone { item_id }),
|
||||
"conversation.handoff.requested" => {
|
||||
let handoff_id = parsed
|
||||
.get("handoff_id")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)?;
|
||||
let item_id = parsed
|
||||
.get("item_id")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)?;
|
||||
let input_transcript = parsed
|
||||
.get("input_transcript")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)?;
|
||||
Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
|
||||
handoff_id,
|
||||
item_id,
|
||||
input_transcript,
|
||||
active_transcript: Vec::new(),
|
||||
}))
|
||||
}
|
||||
"error" => parse_error_event(&parsed),
|
||||
_ => {
|
||||
debug!("received unsupported realtime v1 event type: {message_type}, data: {payload}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,157 +1,130 @@
|
||||
use crate::endpoint::realtime_websocket::protocol_common::parse_error_event;
|
||||
use crate::endpoint::realtime_websocket::protocol_common::parse_realtime_payload;
|
||||
use crate::endpoint::realtime_websocket::protocol_common::parse_session_updated_event;
|
||||
use crate::endpoint::realtime_websocket::protocol_common::parse_transcript_delta_event;
|
||||
use codex_protocol::protocol::RealtimeAudioFrame;
|
||||
use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeHandoffRequested;
|
||||
use codex_protocol::protocol::RealtimeTranscriptDelta;
|
||||
use serde_json::Map as JsonMap;
|
||||
use serde_json::Value;
|
||||
use tracing::debug;
|
||||
|
||||
const CODEX_TOOL_NAME: &str = "codex";
|
||||
const DEFAULT_AUDIO_SAMPLE_RATE: u32 = 24_000;
|
||||
const DEFAULT_AUDIO_CHANNELS: u16 = 1;
|
||||
const TOOL_ARGUMENT_KEYS: [&str; 5] = ["input_transcript", "input", "text", "prompt", "query"];
|
||||
|
||||
pub(super) fn parse_realtime_event_v2(payload: &str) -> Option<RealtimeEvent> {
|
||||
let parsed: Value = match serde_json::from_str(payload) {
|
||||
Ok(msg) => msg,
|
||||
Err(err) => {
|
||||
debug!("failed to parse realtime v2 event: {err}, data: {payload}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
let (parsed, message_type) = parse_realtime_payload(payload, "realtime v2")?;
|
||||
|
||||
let message_type = match parsed.get("type").and_then(Value::as_str) {
|
||||
Some(message_type) => message_type,
|
||||
None => {
|
||||
debug!("received realtime v2 event without type field: {payload}");
|
||||
return None;
|
||||
match message_type.as_str() {
|
||||
"session.updated" => parse_session_updated_event(&parsed),
|
||||
"response.output_audio.delta" => parse_output_audio_delta_event(&parsed),
|
||||
"conversation.item.input_audio_transcription.delta" => {
|
||||
parse_transcript_delta_event(&parsed, "delta").map(RealtimeEvent::InputTranscriptDelta)
|
||||
}
|
||||
};
|
||||
|
||||
match message_type {
|
||||
"session.updated" => {
|
||||
let session_id = parsed
|
||||
.get("session")
|
||||
.and_then(Value::as_object)
|
||||
.and_then(|session| session.get("id"))
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string);
|
||||
let instructions = parsed
|
||||
.get("session")
|
||||
.and_then(Value::as_object)
|
||||
.and_then(|session| session.get("instructions"))
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string);
|
||||
session_id.map(|session_id| RealtimeEvent::SessionUpdated {
|
||||
session_id,
|
||||
instructions,
|
||||
})
|
||||
"conversation.item.input_audio_transcription.completed" => {
|
||||
parse_transcript_delta_event(&parsed, "transcript")
|
||||
.map(RealtimeEvent::InputTranscriptDelta)
|
||||
}
|
||||
"response.output_audio.delta" => {
|
||||
let data = parsed
|
||||
.get("delta")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)?;
|
||||
let sample_rate = parsed
|
||||
.get("sample_rate")
|
||||
.and_then(Value::as_u64)
|
||||
.and_then(|value| u32::try_from(value).ok())
|
||||
.unwrap_or(24_000);
|
||||
let num_channels = parsed
|
||||
.get("channels")
|
||||
.or_else(|| parsed.get("num_channels"))
|
||||
.and_then(Value::as_u64)
|
||||
.and_then(|value| u16::try_from(value).ok())
|
||||
.unwrap_or(1);
|
||||
Some(RealtimeEvent::AudioOut(RealtimeAudioFrame {
|
||||
data,
|
||||
sample_rate,
|
||||
num_channels,
|
||||
samples_per_channel: parsed
|
||||
.get("samples_per_channel")
|
||||
.and_then(Value::as_u64)
|
||||
.and_then(|value| u32::try_from(value).ok()),
|
||||
}))
|
||||
"response.output_text.delta" | "response.output_audio_transcript.delta" => {
|
||||
parse_transcript_delta_event(&parsed, "delta").map(RealtimeEvent::OutputTranscriptDelta)
|
||||
}
|
||||
"conversation.item.input_audio_transcription.delta" => parsed
|
||||
.get("delta")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.map(|delta| RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta })),
|
||||
"conversation.item.input_audio_transcription.completed" => parsed
|
||||
.get("transcript")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.map(|delta| RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta })),
|
||||
"response.output_text.delta" | "response.output_audio_transcript.delta" => parsed
|
||||
.get("delta")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.map(|delta| RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta { delta })),
|
||||
"conversation.item.added" => parsed
|
||||
.get("item")
|
||||
.cloned()
|
||||
.map(RealtimeEvent::ConversationItemAdded),
|
||||
"conversation.item.done" => {
|
||||
let item = parsed.get("item")?.as_object()?;
|
||||
let item_type = item.get("type").and_then(Value::as_str);
|
||||
let item_name = item.get("name").and_then(Value::as_str);
|
||||
|
||||
if item_type == Some("function_call") && item_name == Some("codex") {
|
||||
let call_id = item
|
||||
.get("call_id")
|
||||
.and_then(Value::as_str)
|
||||
.or_else(|| item.get("id").and_then(Value::as_str))?;
|
||||
let item_id = item
|
||||
.get("id")
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or(call_id)
|
||||
.to_string();
|
||||
let arguments = item.get("arguments").and_then(Value::as_str).unwrap_or("");
|
||||
let mut input_transcript = String::new();
|
||||
if !arguments.is_empty() {
|
||||
if let Ok(arguments_json) = serde_json::from_str::<Value>(arguments)
|
||||
&& let Some(arguments_object) = arguments_json.as_object()
|
||||
{
|
||||
for key in ["input_transcript", "input", "text", "prompt", "query"] {
|
||||
if let Some(value) = arguments_object.get(key).and_then(Value::as_str) {
|
||||
let trimmed = value.trim();
|
||||
if !trimmed.is_empty() {
|
||||
input_transcript = trimmed.to_string();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if input_transcript.is_empty() {
|
||||
input_transcript = arguments.to_string();
|
||||
}
|
||||
}
|
||||
|
||||
return Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
|
||||
handoff_id: call_id.to_string(),
|
||||
item_id,
|
||||
input_transcript,
|
||||
active_transcript: Vec::new(),
|
||||
}));
|
||||
}
|
||||
|
||||
item.get("id")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.map(|item_id| RealtimeEvent::ConversationItemDone { item_id })
|
||||
}
|
||||
"error" => parsed
|
||||
.get("message")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.or_else(|| {
|
||||
parsed
|
||||
.get("error")
|
||||
.and_then(Value::as_object)
|
||||
.and_then(|error| error.get("message"))
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
})
|
||||
.or_else(|| parsed.get("error").map(ToString::to_string))
|
||||
.map(RealtimeEvent::Error),
|
||||
"conversation.item.done" => parse_conversation_item_done_event(&parsed),
|
||||
"error" => parse_error_event(&parsed),
|
||||
_ => {
|
||||
debug!("received unsupported realtime v2 event type: {message_type}, data: {payload}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_output_audio_delta_event(parsed: &Value) -> Option<RealtimeEvent> {
|
||||
let data = parsed
|
||||
.get("delta")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)?;
|
||||
let sample_rate = parsed
|
||||
.get("sample_rate")
|
||||
.and_then(Value::as_u64)
|
||||
.and_then(|value| u32::try_from(value).ok())
|
||||
.unwrap_or(DEFAULT_AUDIO_SAMPLE_RATE);
|
||||
let num_channels = parsed
|
||||
.get("channels")
|
||||
.or_else(|| parsed.get("num_channels"))
|
||||
.and_then(Value::as_u64)
|
||||
.and_then(|value| u16::try_from(value).ok())
|
||||
.unwrap_or(DEFAULT_AUDIO_CHANNELS);
|
||||
Some(RealtimeEvent::AudioOut(RealtimeAudioFrame {
|
||||
data,
|
||||
sample_rate,
|
||||
num_channels,
|
||||
samples_per_channel: parsed
|
||||
.get("samples_per_channel")
|
||||
.and_then(Value::as_u64)
|
||||
.and_then(|value| u32::try_from(value).ok()),
|
||||
}))
|
||||
}
|
||||
|
||||
fn parse_conversation_item_done_event(parsed: &Value) -> Option<RealtimeEvent> {
|
||||
let item = parsed.get("item")?.as_object()?;
|
||||
if let Some(handoff) = parse_handoff_requested_event(item) {
|
||||
return Some(handoff);
|
||||
}
|
||||
|
||||
item.get("id")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.map(|item_id| RealtimeEvent::ConversationItemDone { item_id })
|
||||
}
|
||||
|
||||
fn parse_handoff_requested_event(item: &JsonMap<String, Value>) -> Option<RealtimeEvent> {
|
||||
let item_type = item.get("type").and_then(Value::as_str);
|
||||
let item_name = item.get("name").and_then(Value::as_str);
|
||||
if item_type != Some("function_call") || item_name != Some(CODEX_TOOL_NAME) {
|
||||
return None;
|
||||
}
|
||||
|
||||
let call_id = item
|
||||
.get("call_id")
|
||||
.and_then(Value::as_str)
|
||||
.or_else(|| item.get("id").and_then(Value::as_str))?;
|
||||
let item_id = item
|
||||
.get("id")
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or(call_id)
|
||||
.to_string();
|
||||
let arguments = item.get("arguments").and_then(Value::as_str).unwrap_or("");
|
||||
|
||||
Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
|
||||
handoff_id: call_id.to_string(),
|
||||
item_id,
|
||||
input_transcript: extract_input_transcript(arguments),
|
||||
active_transcript: Vec::new(),
|
||||
}))
|
||||
}
|
||||
|
||||
fn extract_input_transcript(arguments: &str) -> String {
|
||||
if arguments.is_empty() {
|
||||
return String::new();
|
||||
}
|
||||
|
||||
if let Ok(arguments_json) = serde_json::from_str::<Value>(arguments)
|
||||
&& let Some(arguments_object) = arguments_json.as_object()
|
||||
{
|
||||
for key in TOOL_ARGUMENT_KEYS {
|
||||
if let Some(value) = arguments_object.get(key).and_then(Value::as_str) {
|
||||
let trimmed = value.trim();
|
||||
if !trimmed.is_empty() {
|
||||
return trimmed.to_string();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
arguments.to_string()
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ pub use crate::endpoint::memories::MemoriesClient;
|
||||
pub use crate::endpoint::models::ModelsClient;
|
||||
pub use crate::endpoint::realtime_websocket::RealtimeEventParser;
|
||||
pub use crate::endpoint::realtime_websocket::RealtimeSessionConfig;
|
||||
pub use crate::endpoint::realtime_websocket::RealtimeSessionMode;
|
||||
pub use crate::endpoint::realtime_websocket::RealtimeWebsocketClient;
|
||||
pub use crate::endpoint::realtime_websocket::RealtimeWebsocketConnection;
|
||||
pub use crate::endpoint::responses::ResponsesClient;
|
||||
|
||||
@@ -6,6 +6,7 @@ use codex_api::RealtimeAudioFrame;
|
||||
use codex_api::RealtimeEvent;
|
||||
use codex_api::RealtimeEventParser;
|
||||
use codex_api::RealtimeSessionConfig;
|
||||
use codex_api::RealtimeSessionMode;
|
||||
use codex_api::RealtimeWebsocketClient;
|
||||
use codex_api::provider::Provider;
|
||||
use codex_api::provider::RetryConfig;
|
||||
@@ -142,6 +143,7 @@ async fn realtime_ws_e2e_session_create_and_event_flow() {
|
||||
model: Some("realtime-test-model".to_string()),
|
||||
session_id: Some("conv_123".to_string()),
|
||||
event_parser: RealtimeEventParser::V1,
|
||||
session_mode: RealtimeSessionMode::Conversational,
|
||||
},
|
||||
HeaderMap::new(),
|
||||
HeaderMap::new(),
|
||||
@@ -235,6 +237,7 @@ async fn realtime_ws_e2e_send_while_next_event_waits() {
|
||||
model: Some("realtime-test-model".to_string()),
|
||||
session_id: Some("conv_123".to_string()),
|
||||
event_parser: RealtimeEventParser::V1,
|
||||
session_mode: RealtimeSessionMode::Conversational,
|
||||
},
|
||||
HeaderMap::new(),
|
||||
HeaderMap::new(),
|
||||
@@ -299,6 +302,7 @@ async fn realtime_ws_e2e_disconnected_emitted_once() {
|
||||
model: Some("realtime-test-model".to_string()),
|
||||
session_id: Some("conv_123".to_string()),
|
||||
event_parser: RealtimeEventParser::V1,
|
||||
session_mode: RealtimeSessionMode::Conversational,
|
||||
},
|
||||
HeaderMap::new(),
|
||||
HeaderMap::new(),
|
||||
@@ -360,6 +364,7 @@ async fn realtime_ws_e2e_ignores_unknown_text_events() {
|
||||
model: Some("realtime-test-model".to_string()),
|
||||
session_id: Some("conv_123".to_string()),
|
||||
event_parser: RealtimeEventParser::V1,
|
||||
session_mode: RealtimeSessionMode::Conversational,
|
||||
},
|
||||
HeaderMap::new(),
|
||||
HeaderMap::new(),
|
||||
@@ -424,6 +429,7 @@ async fn realtime_ws_e2e_realtime_v2_parser_emits_handoff_requested() {
|
||||
model: Some("realtime-test-model".to_string()),
|
||||
session_id: Some("conv_123".to_string()),
|
||||
event_parser: RealtimeEventParser::RealtimeV2,
|
||||
session_mode: RealtimeSessionMode::Conversational,
|
||||
},
|
||||
HeaderMap::new(),
|
||||
HeaderMap::new(),
|
||||
|
||||
@@ -1342,6 +1342,13 @@
|
||||
},
|
||||
"type": "object"
|
||||
},
|
||||
"RealtimeWsMode": {
|
||||
"enum": [
|
||||
"conversational",
|
||||
"transcription"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"ReasoningEffort": {
|
||||
"description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning",
|
||||
"enum": [
|
||||
@@ -1816,6 +1823,14 @@
|
||||
"description": "Experimental / do not use. Overrides only the realtime conversation websocket transport base URL (the `Op::RealtimeConversation` `/v1/realtime` connection) without changing normal provider HTTP requests.",
|
||||
"type": "string"
|
||||
},
|
||||
"experimental_realtime_ws_mode": {
|
||||
"allOf": [
|
||||
{
|
||||
"$ref": "#/definitions/RealtimeWsMode"
|
||||
}
|
||||
],
|
||||
"description": "Experimental / do not use. Selects the realtime websocket intent mode. `conversational` is speech-to-speech while `transcription` is transcript-only."
|
||||
},
|
||||
"experimental_realtime_ws_model": {
|
||||
"description": "Experimental / do not use. Selects the realtime websocket model/snapshot used for the `Op::RealtimeConversation` connection.",
|
||||
"type": "string"
|
||||
|
||||
@@ -4129,6 +4129,7 @@ fn test_precedence_fixture_with_o3_profile() -> std::io::Result<()> {
|
||||
experimental_realtime_start_instructions: None,
|
||||
experimental_realtime_ws_base_url: None,
|
||||
experimental_realtime_ws_model: None,
|
||||
experimental_realtime_ws_mode: RealtimeWsMode::Conversational,
|
||||
experimental_realtime_ws_backend_prompt: None,
|
||||
experimental_realtime_ws_startup_context: None,
|
||||
base_instructions: None,
|
||||
@@ -4265,6 +4266,7 @@ fn test_precedence_fixture_with_gpt3_profile() -> std::io::Result<()> {
|
||||
experimental_realtime_start_instructions: None,
|
||||
experimental_realtime_ws_base_url: None,
|
||||
experimental_realtime_ws_model: None,
|
||||
experimental_realtime_ws_mode: RealtimeWsMode::Conversational,
|
||||
experimental_realtime_ws_backend_prompt: None,
|
||||
experimental_realtime_ws_startup_context: None,
|
||||
base_instructions: None,
|
||||
@@ -4399,6 +4401,7 @@ fn test_precedence_fixture_with_zdr_profile() -> std::io::Result<()> {
|
||||
experimental_realtime_start_instructions: None,
|
||||
experimental_realtime_ws_base_url: None,
|
||||
experimental_realtime_ws_model: None,
|
||||
experimental_realtime_ws_mode: RealtimeWsMode::Conversational,
|
||||
experimental_realtime_ws_backend_prompt: None,
|
||||
experimental_realtime_ws_startup_context: None,
|
||||
base_instructions: None,
|
||||
@@ -4519,6 +4522,7 @@ fn test_precedence_fixture_with_gpt5_profile() -> std::io::Result<()> {
|
||||
experimental_realtime_start_instructions: None,
|
||||
experimental_realtime_ws_base_url: None,
|
||||
experimental_realtime_ws_model: None,
|
||||
experimental_realtime_ws_mode: RealtimeWsMode::Conversational,
|
||||
experimental_realtime_ws_backend_prompt: None,
|
||||
experimental_realtime_ws_startup_context: None,
|
||||
base_instructions: None,
|
||||
@@ -5566,6 +5570,34 @@ experimental_realtime_ws_model = "realtime-test-model"
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn experimental_realtime_ws_mode_loads_from_config_toml() -> std::io::Result<()> {
|
||||
let cfg: ConfigToml = toml::from_str(
|
||||
r#"
|
||||
experimental_realtime_ws_mode = "transcription"
|
||||
"#,
|
||||
)
|
||||
.expect("TOML deserialization should succeed");
|
||||
|
||||
assert_eq!(
|
||||
cfg.experimental_realtime_ws_mode,
|
||||
Some(RealtimeWsMode::Transcription)
|
||||
);
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
let config = Config::load_from_base_config_with_overrides(
|
||||
cfg,
|
||||
ConfigOverrides::default(),
|
||||
codex_home.path().to_path_buf(),
|
||||
)?;
|
||||
|
||||
assert_eq!(
|
||||
config.experimental_realtime_ws_mode,
|
||||
RealtimeWsMode::Transcription
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn realtime_audio_loads_from_config_toml() -> std::io::Result<()> {
|
||||
let cfg: ConfigToml = toml::from_str(
|
||||
|
||||
@@ -463,6 +463,9 @@ pub struct Config {
|
||||
/// Experimental / do not use. Selects the realtime websocket model/snapshot
|
||||
/// used for the `Op::RealtimeConversation` connection.
|
||||
pub experimental_realtime_ws_model: Option<String>,
|
||||
/// Experimental / do not use. Selects the realtime websocket intent mode.
|
||||
/// `conversational` is speech-to-speech while `transcription` is transcript-only.
|
||||
pub experimental_realtime_ws_mode: RealtimeWsMode,
|
||||
/// Experimental / do not use. Overrides only the realtime conversation
|
||||
/// websocket transport instructions (the `Op::RealtimeConversation`
|
||||
/// `/ws` session.update instructions) without changing normal prompts.
|
||||
@@ -1238,6 +1241,9 @@ pub struct ConfigToml {
|
||||
/// Experimental / do not use. Selects the realtime websocket model/snapshot
|
||||
/// used for the `Op::RealtimeConversation` connection.
|
||||
pub experimental_realtime_ws_model: Option<String>,
|
||||
/// Experimental / do not use. Selects the realtime websocket intent mode.
|
||||
/// `conversational` is speech-to-speech while `transcription` is transcript-only.
|
||||
pub experimental_realtime_ws_mode: Option<RealtimeWsMode>,
|
||||
/// Experimental / do not use. Overrides only the realtime conversation
|
||||
/// websocket transport instructions (the `Op::RealtimeConversation`
|
||||
/// `/ws` session.update instructions) without changing normal prompts.
|
||||
@@ -1383,6 +1389,14 @@ pub struct RealtimeAudioConfig {
|
||||
pub speaker: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Default, PartialEq, Eq, JsonSchema)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum RealtimeWsMode {
|
||||
#[default]
|
||||
Conversational,
|
||||
Transcription,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq, JsonSchema)]
|
||||
#[schemars(deny_unknown_fields)]
|
||||
pub struct RealtimeAudioToml {
|
||||
@@ -2462,6 +2476,7 @@ impl Config {
|
||||
}),
|
||||
experimental_realtime_ws_base_url: cfg.experimental_realtime_ws_base_url,
|
||||
experimental_realtime_ws_model: cfg.experimental_realtime_ws_model,
|
||||
experimental_realtime_ws_mode: cfg.experimental_realtime_ws_mode.unwrap_or_default(),
|
||||
experimental_realtime_ws_backend_prompt: cfg.experimental_realtime_ws_backend_prompt,
|
||||
experimental_realtime_ws_startup_context: cfg.experimental_realtime_ws_startup_context,
|
||||
experimental_realtime_start_instructions: cfg.experimental_realtime_start_instructions,
|
||||
|
||||
@@ -15,6 +15,7 @@ use codex_api::RealtimeAudioFrame;
|
||||
use codex_api::RealtimeEvent;
|
||||
use codex_api::RealtimeEventParser;
|
||||
use codex_api::RealtimeSessionConfig;
|
||||
use codex_api::RealtimeSessionMode;
|
||||
use codex_api::RealtimeWebsocketClient;
|
||||
use codex_api::endpoint::realtime_websocket::RealtimeWebsocketEvents;
|
||||
use codex_api::endpoint::realtime_websocket::RealtimeWebsocketWriter;
|
||||
@@ -116,10 +117,7 @@ impl RealtimeConversationManager {
|
||||
&self,
|
||||
api_provider: ApiProvider,
|
||||
extra_headers: Option<HeaderMap>,
|
||||
prompt: String,
|
||||
model: Option<String>,
|
||||
session_id: Option<String>,
|
||||
event_parser: RealtimeEventParser,
|
||||
session_config: RealtimeSessionConfig,
|
||||
) -> CodexResult<(Receiver<RealtimeEvent>, Arc<AtomicBool>)> {
|
||||
let previous_state = {
|
||||
let mut guard = self.state.lock().await;
|
||||
@@ -131,12 +129,6 @@ impl RealtimeConversationManager {
|
||||
let _ = state.task.await;
|
||||
}
|
||||
|
||||
let session_config = RealtimeSessionConfig {
|
||||
instructions: prompt,
|
||||
model,
|
||||
session_id,
|
||||
event_parser,
|
||||
};
|
||||
let client = RealtimeWebsocketClient::new(api_provider);
|
||||
let connection = client
|
||||
.connect(
|
||||
@@ -307,23 +299,26 @@ pub(crate) async fn handle_start(
|
||||
} else {
|
||||
RealtimeEventParser::V1
|
||||
};
|
||||
|
||||
let session_mode = match config.experimental_realtime_ws_mode {
|
||||
crate::config::RealtimeWsMode::Conversational => RealtimeSessionMode::Conversational,
|
||||
crate::config::RealtimeWsMode::Transcription => RealtimeSessionMode::Transcription,
|
||||
};
|
||||
let requested_session_id = params
|
||||
.session_id
|
||||
.or_else(|| Some(sess.conversation_id.to_string()));
|
||||
let session_config = RealtimeSessionConfig {
|
||||
instructions: prompt,
|
||||
model,
|
||||
session_id: requested_session_id.clone(),
|
||||
event_parser,
|
||||
session_mode,
|
||||
};
|
||||
let extra_headers =
|
||||
realtime_request_headers(requested_session_id.as_deref(), realtime_api_key.as_str())?;
|
||||
info!("starting realtime conversation");
|
||||
let (events_rx, realtime_active) = match sess
|
||||
.conversation
|
||||
.start(
|
||||
api_provider,
|
||||
extra_headers,
|
||||
prompt,
|
||||
model,
|
||||
requested_session_id.clone(),
|
||||
event_parser,
|
||||
)
|
||||
.start(api_provider, extra_headers, session_config)
|
||||
.await
|
||||
{
|
||||
Ok(events_rx) => events_rx,
|
||||
|
||||
@@ -123,7 +123,7 @@ impl ActionKind {
|
||||
let (path, _) = target.resolve_for_patch(test);
|
||||
let _ = fs::remove_file(&path);
|
||||
let command = format!("printf {content:?} > {path:?} && cat {path:?}");
|
||||
let event = shell_event(call_id, &command, 1_000, sandbox_permissions)?;
|
||||
let event = shell_event(call_id, &command, 5_000, sandbox_permissions)?;
|
||||
Ok((event, Some(command)))
|
||||
}
|
||||
ActionKind::FetchUrl {
|
||||
|
||||
Reference in New Issue
Block a user