mirror of
https://github.com/openai/codex.git
synced 2026-04-07 22:34:49 +00:00
Compare commits
1 Commits
dev/window
...
realtime-r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
96fee71d24 |
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -8239,6 +8239,7 @@ dependencies = [
|
||||
"js-sys",
|
||||
"log",
|
||||
"mime",
|
||||
"mime_guess",
|
||||
"native-tls",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
|
||||
@@ -2815,6 +2815,47 @@
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadRealtimeStartProtocol": {
|
||||
"description": "EXPERIMENTAL - realtime start protocol selected by the client.",
|
||||
"oneOf": [
|
||||
{
|
||||
"properties": {
|
||||
"type": {
|
||||
"enum": [
|
||||
"jsonRpcPcm"
|
||||
],
|
||||
"title": "JsonRpcPcmThreadRealtimeStartProtocolType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"type"
|
||||
],
|
||||
"title": "JsonRpcPcmThreadRealtimeStartProtocol",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"offerSdp": {
|
||||
"type": "string"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"rtc"
|
||||
],
|
||||
"title": "RtcThreadRealtimeStartProtocolType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"offerSdp",
|
||||
"type"
|
||||
],
|
||||
"title": "RtcThreadRealtimeStartProtocol",
|
||||
"type": "object"
|
||||
}
|
||||
]
|
||||
},
|
||||
"ThreadResumeParams": {
|
||||
"description": "There are three ways to resume a thread: 1. By thread_id: load the thread from disk by thread_id and resume it. 2. By history: instantiate the thread from memory and resume it. 3. By path: load the thread from disk by path and resume it.\n\nThe precedence is: history > path > thread_id. If using history or path, the thread_id param will be ignored.\n\nPrefer using thread_id whenever possible.",
|
||||
"properties": {
|
||||
|
||||
@@ -13518,6 +13518,88 @@
|
||||
"title": "ThreadRealtimeOutputAudioDeltaNotification",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadRealtimeStartProtocol": {
|
||||
"description": "EXPERIMENTAL - realtime start protocol selected by the client.",
|
||||
"oneOf": [
|
||||
{
|
||||
"properties": {
|
||||
"type": {
|
||||
"enum": [
|
||||
"jsonRpcPcm"
|
||||
],
|
||||
"title": "JsonRpcPcmThreadRealtimeStartProtocolType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"type"
|
||||
],
|
||||
"title": "JsonRpcPcmThreadRealtimeStartProtocol",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"offerSdp": {
|
||||
"type": "string"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"rtc"
|
||||
],
|
||||
"title": "RtcThreadRealtimeStartProtocolType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"offerSdp",
|
||||
"type"
|
||||
],
|
||||
"title": "RtcThreadRealtimeStartProtocol",
|
||||
"type": "object"
|
||||
}
|
||||
]
|
||||
},
|
||||
"ThreadRealtimeStartResponseProtocol": {
|
||||
"description": "EXPERIMENTAL - realtime start response protocol payload.",
|
||||
"oneOf": [
|
||||
{
|
||||
"properties": {
|
||||
"type": {
|
||||
"enum": [
|
||||
"jsonRpcPcm"
|
||||
],
|
||||
"title": "JsonRpcPcmThreadRealtimeStartResponseProtocolType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"type"
|
||||
],
|
||||
"title": "JsonRpcPcmThreadRealtimeStartResponseProtocol",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"answerSdp": {
|
||||
"type": "string"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"rtc"
|
||||
],
|
||||
"title": "RtcThreadRealtimeStartResponseProtocolType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"answerSdp",
|
||||
"type"
|
||||
],
|
||||
"title": "RtcThreadRealtimeStartResponseProtocol",
|
||||
"type": "object"
|
||||
}
|
||||
]
|
||||
},
|
||||
"ThreadRealtimeStartedNotification": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"description": "EXPERIMENTAL - emitted when thread realtime startup is accepted.",
|
||||
|
||||
@@ -11373,6 +11373,88 @@
|
||||
"title": "ThreadRealtimeOutputAudioDeltaNotification",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadRealtimeStartProtocol": {
|
||||
"description": "EXPERIMENTAL - realtime start protocol selected by the client.",
|
||||
"oneOf": [
|
||||
{
|
||||
"properties": {
|
||||
"type": {
|
||||
"enum": [
|
||||
"jsonRpcPcm"
|
||||
],
|
||||
"title": "JsonRpcPcmThreadRealtimeStartProtocolType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"type"
|
||||
],
|
||||
"title": "JsonRpcPcmThreadRealtimeStartProtocol",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"offerSdp": {
|
||||
"type": "string"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"rtc"
|
||||
],
|
||||
"title": "RtcThreadRealtimeStartProtocolType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"offerSdp",
|
||||
"type"
|
||||
],
|
||||
"title": "RtcThreadRealtimeStartProtocol",
|
||||
"type": "object"
|
||||
}
|
||||
]
|
||||
},
|
||||
"ThreadRealtimeStartResponseProtocol": {
|
||||
"description": "EXPERIMENTAL - realtime start response protocol payload.",
|
||||
"oneOf": [
|
||||
{
|
||||
"properties": {
|
||||
"type": {
|
||||
"enum": [
|
||||
"jsonRpcPcm"
|
||||
],
|
||||
"title": "JsonRpcPcmThreadRealtimeStartResponseProtocolType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"type"
|
||||
],
|
||||
"title": "JsonRpcPcmThreadRealtimeStartResponseProtocol",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"answerSdp": {
|
||||
"type": "string"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"rtc"
|
||||
],
|
||||
"title": "RtcThreadRealtimeStartResponseProtocolType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"answerSdp",
|
||||
"type"
|
||||
],
|
||||
"title": "RtcThreadRealtimeStartResponseProtocol",
|
||||
"type": "object"
|
||||
}
|
||||
]
|
||||
},
|
||||
"ThreadRealtimeStartedNotification": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"description": "EXPERIMENTAL - emitted when thread realtime startup is accepted.",
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
|
||||
/**
|
||||
* EXPERIMENTAL - realtime start protocol selected by the client.
|
||||
*/
|
||||
export type ThreadRealtimeStartProtocol = { "type": "jsonRpcPcm" } | { "type": "rtc", offerSdp: string, };
|
||||
@@ -0,0 +1,8 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
|
||||
/**
|
||||
* EXPERIMENTAL - realtime start response protocol payload.
|
||||
*/
|
||||
export type ThreadRealtimeStartResponseProtocol = { "type": "jsonRpcPcm" } | { "type": "rtc", answerSdp: string, };
|
||||
@@ -291,6 +291,8 @@ export type { ThreadRealtimeClosedNotification } from "./ThreadRealtimeClosedNot
|
||||
export type { ThreadRealtimeErrorNotification } from "./ThreadRealtimeErrorNotification";
|
||||
export type { ThreadRealtimeItemAddedNotification } from "./ThreadRealtimeItemAddedNotification";
|
||||
export type { ThreadRealtimeOutputAudioDeltaNotification } from "./ThreadRealtimeOutputAudioDeltaNotification";
|
||||
export type { ThreadRealtimeStartProtocol } from "./ThreadRealtimeStartProtocol";
|
||||
export type { ThreadRealtimeStartResponseProtocol } from "./ThreadRealtimeStartResponseProtocol";
|
||||
export type { ThreadRealtimeStartedNotification } from "./ThreadRealtimeStartedNotification";
|
||||
export type { ThreadRealtimeTranscriptUpdatedNotification } from "./ThreadRealtimeTranscriptUpdatedNotification";
|
||||
export type { ThreadResumeParams } from "./ThreadResumeParams";
|
||||
|
||||
@@ -1695,6 +1695,7 @@ mod tests {
|
||||
thread_id: "thr_123".to_string(),
|
||||
prompt: "You are on a call".to_string(),
|
||||
session_id: Some("sess_456".to_string()),
|
||||
protocol: v2::ThreadRealtimeStartProtocol::JsonRpcPcm,
|
||||
},
|
||||
};
|
||||
assert_eq!(
|
||||
@@ -1704,7 +1705,37 @@ mod tests {
|
||||
"params": {
|
||||
"threadId": "thr_123",
|
||||
"prompt": "You are on a call",
|
||||
"sessionId": "sess_456"
|
||||
"sessionId": "sess_456",
|
||||
"protocol": { "type": "jsonRpcPcm" }
|
||||
}
|
||||
}),
|
||||
serde_json::to_value(&request)?,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialize_thread_realtime_start_rtc_protocol() -> Result<()> {
|
||||
let request = ClientRequest::ThreadRealtimeStart {
|
||||
request_id: RequestId::Integer(9),
|
||||
params: v2::ThreadRealtimeStartParams {
|
||||
thread_id: "thr_123".to_string(),
|
||||
prompt: "You are on a call".to_string(),
|
||||
session_id: None,
|
||||
protocol: v2::ThreadRealtimeStartProtocol::Rtc {
|
||||
offer_sdp: "v=0".to_string(),
|
||||
},
|
||||
},
|
||||
};
|
||||
assert_eq!(
|
||||
json!({
|
||||
"method": "thread/realtime/start",
|
||||
"id": 9,
|
||||
"params": {
|
||||
"threadId": "thr_123",
|
||||
"prompt": "You are on a call",
|
||||
"sessionId": null,
|
||||
"protocol": { "type": "rtc", "offerSdp": "v=0" }
|
||||
}
|
||||
}),
|
||||
serde_json::to_value(&request)?,
|
||||
@@ -1784,6 +1815,7 @@ mod tests {
|
||||
thread_id: "thr_123".to_string(),
|
||||
prompt: "You are on a call".to_string(),
|
||||
session_id: None,
|
||||
protocol: v2::ThreadRealtimeStartProtocol::JsonRpcPcm,
|
||||
},
|
||||
};
|
||||
let reason = crate::experimental_api::ExperimentalApi::experimental_reason(&request);
|
||||
|
||||
@@ -3807,7 +3807,7 @@ impl From<ThreadRealtimeAudioChunk> for CoreRealtimeAudioFrame {
|
||||
}
|
||||
|
||||
/// EXPERIMENTAL - start a thread-scoped realtime session.
|
||||
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadRealtimeStartParams {
|
||||
@@ -3815,13 +3815,44 @@ pub struct ThreadRealtimeStartParams {
|
||||
pub prompt: String,
|
||||
#[ts(optional = nullable)]
|
||||
pub session_id: Option<String>,
|
||||
pub protocol: ThreadRealtimeStartProtocol,
|
||||
}
|
||||
|
||||
/// EXPERIMENTAL - realtime start protocol selected by the client.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(tag = "type", rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
#[ts(tag = "type")]
|
||||
pub enum ThreadRealtimeStartProtocol {
|
||||
JsonRpcPcm,
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
Rtc {
|
||||
offer_sdp: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// EXPERIMENTAL - response for starting thread realtime.
|
||||
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadRealtimeStartResponse {}
|
||||
pub struct ThreadRealtimeStartResponse {
|
||||
pub protocol: ThreadRealtimeStartResponseProtocol,
|
||||
}
|
||||
|
||||
/// EXPERIMENTAL - realtime start response protocol payload.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(tag = "type", rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
#[ts(tag = "type")]
|
||||
pub enum ThreadRealtimeStartResponseProtocol {
|
||||
JsonRpcPcm,
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
Rtc {
|
||||
answer_sdp: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// EXPERIMENTAL - append audio input to thread realtime.
|
||||
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
|
||||
|
||||
@@ -151,8 +151,8 @@ Example with notification opt-out:
|
||||
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
|
||||
- `turn/steer` — add user input to an already in-flight regular turn without starting a new turn; returns the active `turnId` that accepted the input. Review and manual compaction turns reject `turn/steer`.
|
||||
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
|
||||
- `thread/realtime/start` — start a thread-scoped realtime session (experimental); returns `{}` and streams `thread/realtime/*` notifications.
|
||||
- `thread/realtime/appendAudio` — append an input audio chunk to the active realtime session (experimental); returns `{}`.
|
||||
- `thread/realtime/start` — start a thread-scoped realtime session (experimental). With `features.realtime_rtc=false`, clients send `{ protocol: { type: "jsonRpcPcm" } }`, receive `{ protocol: { type: "jsonRpcPcm" } }`, and stream audio over JSON-RPC. With `features.realtime_rtc=true`, clients send `{ protocol: { type: "rtc", offerSdp } }`, receive `{ protocol: { type: "rtc", answerSdp } }`, and media flows over WebRTC while the harness keeps handling realtime events and tool calls.
|
||||
- `thread/realtime/appendAudio` — append an input audio chunk to the active JSON-RPC PCM realtime session (experimental); returns `{}`. Unavailable when `features.realtime_rtc=true`.
|
||||
- `thread/realtime/appendText` — append text input to the active realtime session (experimental); returns `{}`.
|
||||
- `thread/realtime/stop` — stop the active realtime session for the thread (experimental); returns `{}`.
|
||||
- `review/start` — kick off Codex’s automated reviewer for a thread; responds like `turn/start` and emits `item/started`/`item/completed` notifications with `enteredReviewMode` and `exitedReviewMode` items, plus a final assistant `agentMessage` containing the review.
|
||||
|
||||
@@ -142,7 +142,9 @@ use codex_app_server_protocol::ThreadRealtimeAppendAudioResponse;
|
||||
use codex_app_server_protocol::ThreadRealtimeAppendTextParams;
|
||||
use codex_app_server_protocol::ThreadRealtimeAppendTextResponse;
|
||||
use codex_app_server_protocol::ThreadRealtimeStartParams;
|
||||
use codex_app_server_protocol::ThreadRealtimeStartProtocol;
|
||||
use codex_app_server_protocol::ThreadRealtimeStartResponse;
|
||||
use codex_app_server_protocol::ThreadRealtimeStartResponseProtocol;
|
||||
use codex_app_server_protocol::ThreadRealtimeStopParams;
|
||||
use codex_app_server_protocol::ThreadRealtimeStopResponse;
|
||||
use codex_app_server_protocol::ThreadResumeParams;
|
||||
@@ -273,6 +275,7 @@ use codex_protocol::protocol::McpAuthStatus as CoreMcpAuthStatus;
|
||||
use codex_protocol::protocol::McpServerRefreshConfig;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot;
|
||||
use codex_protocol::protocol::RealtimeConversationTransport;
|
||||
use codex_protocol::protocol::ReviewDelivery as CoreReviewDelivery;
|
||||
use codex_protocol::protocol::ReviewRequest;
|
||||
use codex_protocol::protocol::ReviewTarget as CoreReviewTarget;
|
||||
@@ -6795,30 +6798,101 @@ impl CodexMessageProcessor {
|
||||
return;
|
||||
};
|
||||
|
||||
let submit = self
|
||||
.submit_core_op(
|
||||
&request_id,
|
||||
thread.as_ref(),
|
||||
Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: params.prompt,
|
||||
session_id: params.session_id,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
match submit {
|
||||
Ok(_) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadRealtimeStartResponse::default())
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
let realtime_rtc_enabled = thread.enabled(Feature::RealtimeRtc);
|
||||
match params.protocol {
|
||||
ThreadRealtimeStartProtocol::JsonRpcPcm if realtime_rtc_enabled => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to start realtime conversation: {err}"),
|
||||
"thread/realtime/start requires rtc protocol when features.realtime_rtc is enabled"
|
||||
.to_string(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ThreadRealtimeStartProtocol::JsonRpcPcm => {
|
||||
let submit = self
|
||||
.submit_core_op(
|
||||
&request_id,
|
||||
thread.as_ref(),
|
||||
Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: params.prompt,
|
||||
session_id: params.session_id,
|
||||
transport: RealtimeConversationTransport::Websocket,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
match submit {
|
||||
Ok(_) => {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ThreadRealtimeStartResponse {
|
||||
protocol: ThreadRealtimeStartResponseProtocol::JsonRpcPcm,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!("failed to start realtime conversation: {err}"),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
ThreadRealtimeStartProtocol::Rtc { .. } if !realtime_rtc_enabled => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
"thread/realtime/start rtc protocol requires features.realtime_rtc".to_string(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ThreadRealtimeStartProtocol::Rtc { offer_sdp } => {
|
||||
let sub_id = format!("{}:{}", request_id.connection_id, request_id.request_id);
|
||||
let trace = self.request_trace_context(&request_id).await;
|
||||
let start = thread
|
||||
.start_realtime_conversation(
|
||||
sub_id,
|
||||
ConversationStartParams {
|
||||
prompt: params.prompt,
|
||||
session_id: params.session_id,
|
||||
transport: RealtimeConversationTransport::Rtc { offer_sdp },
|
||||
},
|
||||
trace,
|
||||
)
|
||||
.await;
|
||||
|
||||
match start {
|
||||
Ok(result) => {
|
||||
let Some(answer_sdp) = result.answer_sdp else {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
"realtime rtc start did not return an SDP answer".to_string(),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ThreadRealtimeStartResponse {
|
||||
protocol: ThreadRealtimeStartResponseProtocol::Rtc {
|
||||
answer_sdp,
|
||||
},
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!("failed to start realtime conversation: {err}"),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6834,6 +6908,16 @@ impl CodexMessageProcessor {
|
||||
return;
|
||||
};
|
||||
|
||||
if thread.enabled(Feature::RealtimeRtc) {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
"thread/realtime/appendAudio is unavailable when features.realtime_rtc is enabled"
|
||||
.to_string(),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
let submit = self
|
||||
.submit_core_op(
|
||||
&request_id,
|
||||
|
||||
@@ -12,6 +12,7 @@ use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::MockExperimentalMethodParams;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ThreadRealtimeStartParams;
|
||||
use codex_app_server_protocol::ThreadRealtimeStartProtocol;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use pretty_assertions::assert_eq;
|
||||
@@ -75,6 +76,7 @@ async fn realtime_conversation_start_requires_experimental_api_capability() -> R
|
||||
thread_id: "thr_123".to_string(),
|
||||
prompt: "hello".to_string(),
|
||||
session_id: None,
|
||||
protocol: ThreadRealtimeStartProtocol::JsonRpcPcm,
|
||||
})
|
||||
.await?;
|
||||
let error = timeout(
|
||||
|
||||
@@ -17,6 +17,7 @@ use codex_app_server_protocol::ThreadRealtimeErrorNotification;
|
||||
use codex_app_server_protocol::ThreadRealtimeItemAddedNotification;
|
||||
use codex_app_server_protocol::ThreadRealtimeOutputAudioDeltaNotification;
|
||||
use codex_app_server_protocol::ThreadRealtimeStartParams;
|
||||
use codex_app_server_protocol::ThreadRealtimeStartProtocol;
|
||||
use codex_app_server_protocol::ThreadRealtimeStartResponse;
|
||||
use codex_app_server_protocol::ThreadRealtimeStartedNotification;
|
||||
use codex_app_server_protocol::ThreadRealtimeStopParams;
|
||||
@@ -121,6 +122,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
|
||||
thread_id: thread_start.thread.id.clone(),
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
protocol: ThreadRealtimeStartProtocol::JsonRpcPcm,
|
||||
})
|
||||
.await?;
|
||||
let start_response: JSONRPCResponse = timeout(
|
||||
@@ -330,6 +332,7 @@ async fn realtime_conversation_stop_emits_closed_notification() -> Result<()> {
|
||||
thread_id: thread_start.thread.id.clone(),
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
protocol: ThreadRealtimeStartProtocol::JsonRpcPcm,
|
||||
})
|
||||
.await?;
|
||||
let start_response: JSONRPCResponse = timeout(
|
||||
@@ -401,6 +404,7 @@ async fn realtime_conversation_requires_feature_flag() -> Result<()> {
|
||||
thread_id: thread_start.thread.id.clone(),
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
protocol: ThreadRealtimeStartProtocol::JsonRpcPcm,
|
||||
})
|
||||
.await?;
|
||||
let error = timeout(
|
||||
|
||||
@@ -23,6 +23,7 @@ tungstenite = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
eventsource-stream = { workspace = true }
|
||||
regex-lite = { workspace = true }
|
||||
reqwest = { workspace = true, features = ["multipart"] }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
url = { workspace = true }
|
||||
|
||||
@@ -32,7 +33,6 @@ assert_matches = { workspace = true }
|
||||
pretty_assertions = { workspace = true }
|
||||
tokio-test = { workspace = true }
|
||||
wiremock = { workspace = true }
|
||||
reqwest = { workspace = true }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
189
codex-rs/codex-api/src/endpoint/realtime_websocket/calls.rs
Normal file
189
codex-rs/codex-api/src/endpoint/realtime_websocket/calls.rs
Normal file
@@ -0,0 +1,189 @@
|
||||
use crate::endpoint::realtime_websocket::methods::merge_request_headers;
|
||||
use crate::endpoint::realtime_websocket::methods_common::session_update_session;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionConfig;
|
||||
use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession;
|
||||
use crate::error::ApiError;
|
||||
use crate::provider::Provider;
|
||||
use codex_client::build_reqwest_client_with_custom_ca;
|
||||
use http::HeaderMap;
|
||||
use http::header::LOCATION;
|
||||
use reqwest::multipart::Form;
|
||||
use serde::Serialize;
|
||||
use url::Url;
|
||||
|
||||
const REALTIME_CALLS_PATH: &str = "/v1/realtime/calls";
|
||||
|
||||
pub struct RealtimeCallCreateResponse {
|
||||
pub call_id: String,
|
||||
pub answer_sdp: String,
|
||||
}
|
||||
|
||||
pub struct RealtimeCallClient {
|
||||
provider: Provider,
|
||||
}
|
||||
|
||||
impl RealtimeCallClient {
|
||||
pub fn new(provider: Provider) -> Self {
|
||||
Self { provider }
|
||||
}
|
||||
|
||||
pub async fn create(
|
||||
&self,
|
||||
config: &RealtimeSessionConfig,
|
||||
offer_sdp: String,
|
||||
extra_headers: HeaderMap,
|
||||
default_headers: HeaderMap,
|
||||
) -> Result<RealtimeCallCreateResponse, ApiError> {
|
||||
let client = build_reqwest_client_with_custom_ca(reqwest::Client::builder())
|
||||
.map_err(|err| ApiError::Stream(format!("failed to configure realtime HTTP: {err}")))?;
|
||||
let url = realtime_calls_url(&self.provider.base_url)?;
|
||||
let headers = merge_request_headers(&self.provider.headers, extra_headers, default_headers);
|
||||
let response = client
|
||||
.post(url)
|
||||
.headers(headers)
|
||||
.multipart(session_form(config, offer_sdp)?)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|err| ApiError::Stream(format!("failed to create realtime call: {err}")))?;
|
||||
let status = response.status();
|
||||
let headers = response.headers().clone();
|
||||
let answer_sdp = response.text().await.map_err(|err| {
|
||||
ApiError::Stream(format!("failed to read realtime call answer: {err}"))
|
||||
})?;
|
||||
if !status.is_success() {
|
||||
return Err(ApiError::Stream(format!(
|
||||
"realtime call failed with HTTP {status}: {answer_sdp}"
|
||||
)));
|
||||
}
|
||||
let call_id =
|
||||
call_id_from_location(headers.get(LOCATION).and_then(|value| value.to_str().ok()))?;
|
||||
Ok(RealtimeCallCreateResponse {
|
||||
call_id,
|
||||
answer_sdp,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn realtime_calls_url(base_url: &str) -> Result<Url, ApiError> {
|
||||
let mut url =
|
||||
Url::parse(base_url).map_err(|err| ApiError::Stream(format!("invalid base URL: {err}")))?;
|
||||
match url.scheme() {
|
||||
"http" | "https" => {}
|
||||
"ws" => {
|
||||
let _ = url.set_scheme("http");
|
||||
}
|
||||
"wss" => {
|
||||
let _ = url.set_scheme("https");
|
||||
}
|
||||
scheme => {
|
||||
return Err(ApiError::Stream(format!(
|
||||
"unsupported realtime calls URL scheme: {scheme}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
url.set_path(REALTIME_CALLS_PATH);
|
||||
url.set_query(None);
|
||||
Ok(url)
|
||||
}
|
||||
|
||||
fn session_form(config: &RealtimeSessionConfig, offer_sdp: String) -> Result<Form, ApiError> {
|
||||
let session_json = serde_json::to_string(&session_payload(config)).map_err(|err| {
|
||||
ApiError::Stream(format!("failed to serialize realtime call session: {err}"))
|
||||
})?;
|
||||
Ok(Form::new()
|
||||
.text("sdp", offer_sdp)
|
||||
.text("session", session_json))
|
||||
}
|
||||
|
||||
fn session_payload(config: &RealtimeSessionConfig) -> RealtimeCallSession {
|
||||
let session = session_update_session(
|
||||
config.event_parser,
|
||||
config.instructions.clone(),
|
||||
config.session_mode,
|
||||
);
|
||||
RealtimeCallSession {
|
||||
session,
|
||||
model: config.model.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct RealtimeCallSession {
|
||||
#[serde(flatten)]
|
||||
session: SessionUpdateSession,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
model: Option<String>,
|
||||
}
|
||||
|
||||
fn call_id_from_location(location: Option<&str>) -> Result<String, ApiError> {
|
||||
let Some(location) = location else {
|
||||
return Err(ApiError::Stream(
|
||||
"realtime call response missing Location header".to_string(),
|
||||
));
|
||||
};
|
||||
let call_id = location
|
||||
.trim_end_matches('/')
|
||||
.rsplit('/')
|
||||
.next()
|
||||
.filter(|call_id| !call_id.is_empty())
|
||||
.ok_or_else(|| ApiError::Stream("invalid realtime call Location header".to_string()))?;
|
||||
Ok(call_id.to_string())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn realtime_calls_url_uses_calls_path() {
|
||||
assert_eq!(
|
||||
realtime_calls_url("wss://api.openai.com/v1/realtime")
|
||||
.expect("url")
|
||||
.as_str(),
|
||||
"https://api.openai.com/v1/realtime/calls"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn call_id_from_location_extracts_last_path_segment() {
|
||||
assert_eq!(
|
||||
call_id_from_location(Some("/v1/realtime/calls/rtc_123")).expect("call id"),
|
||||
"rtc_123"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn session_form_contains_offer_and_session() {
|
||||
let form = session_form(
|
||||
&RealtimeSessionConfig {
|
||||
instructions: "backend prompt".to_string(),
|
||||
model: Some("gpt-realtime".to_string()),
|
||||
session_id: Some("session".to_string()),
|
||||
event_parser: RealtimeEventParser::RealtimeV2,
|
||||
session_mode: RealtimeSessionMode::Conversational,
|
||||
},
|
||||
"v=0".to_string(),
|
||||
)
|
||||
.expect("form");
|
||||
assert!(!form.boundary().is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn session_payload_includes_model() {
|
||||
let payload = serde_json::to_value(session_payload(&RealtimeSessionConfig {
|
||||
instructions: "backend prompt".to_string(),
|
||||
model: Some("gpt-realtime".to_string()),
|
||||
session_id: Some("session".to_string()),
|
||||
event_parser: RealtimeEventParser::RealtimeV2,
|
||||
session_mode: RealtimeSessionMode::Conversational,
|
||||
}))
|
||||
.expect("session payload");
|
||||
|
||||
assert_eq!(payload["type"], "realtime");
|
||||
assert_eq!(payload["model"], "gpt-realtime");
|
||||
assert_eq!(payload["instructions"], "backend prompt");
|
||||
}
|
||||
}
|
||||
@@ -463,7 +463,34 @@ impl RealtimeWebsocketClient {
|
||||
config.event_parser,
|
||||
config.session_mode,
|
||||
)?;
|
||||
self.connect_with_url(ws_url, config, extra_headers, default_headers)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn connect_to_call_id(
|
||||
&self,
|
||||
config: RealtimeSessionConfig,
|
||||
call_id: &str,
|
||||
extra_headers: HeaderMap,
|
||||
default_headers: HeaderMap,
|
||||
) -> Result<RealtimeWebsocketConnection, ApiError> {
|
||||
ensure_rustls_crypto_provider();
|
||||
let ws_url = websocket_url_from_api_url_with_call_id(
|
||||
self.provider.base_url.as_str(),
|
||||
self.provider.query_params.as_ref(),
|
||||
call_id,
|
||||
)?;
|
||||
self.connect_with_url(ws_url, config, extra_headers, default_headers)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn connect_with_url(
|
||||
&self,
|
||||
ws_url: Url,
|
||||
config: RealtimeSessionConfig,
|
||||
extra_headers: HeaderMap,
|
||||
default_headers: HeaderMap,
|
||||
) -> Result<RealtimeWebsocketConnection, ApiError> {
|
||||
let mut request = ws_url
|
||||
.as_str()
|
||||
.into_client_request()
|
||||
@@ -509,7 +536,7 @@ impl RealtimeWebsocketClient {
|
||||
}
|
||||
}
|
||||
|
||||
fn merge_request_headers(
|
||||
pub(super) fn merge_request_headers(
|
||||
provider_headers: &HeaderMap,
|
||||
extra_headers: HeaderMap,
|
||||
default_headers: HeaderMap,
|
||||
@@ -596,6 +623,46 @@ fn websocket_url_from_api_url(
|
||||
Ok(url)
|
||||
}
|
||||
|
||||
fn websocket_url_from_api_url_with_call_id(
|
||||
api_url: &str,
|
||||
query_params: Option<&HashMap<String, String>>,
|
||||
call_id: &str,
|
||||
) -> Result<Url, ApiError> {
|
||||
let mut url = Url::parse(api_url)
|
||||
.map_err(|err| ApiError::Stream(format!("failed to parse realtime api_url: {err}")))?;
|
||||
|
||||
normalize_realtime_path(&mut url);
|
||||
|
||||
match url.scheme() {
|
||||
"ws" | "wss" => {}
|
||||
"http" | "https" => {
|
||||
let scheme = if url.scheme() == "http" { "ws" } else { "wss" };
|
||||
let _ = url.set_scheme(scheme);
|
||||
}
|
||||
scheme => {
|
||||
return Err(ApiError::Stream(format!(
|
||||
"unsupported realtime api_url scheme: {scheme}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
url.set_query(None);
|
||||
{
|
||||
let mut query = url.query_pairs_mut();
|
||||
query.append_pair("call_id", call_id);
|
||||
if let Some(query_params) = query_params {
|
||||
for (key, value) in query_params {
|
||||
if matches!(key.as_str(), "call_id" | "intent" | "model") {
|
||||
continue;
|
||||
}
|
||||
query.append_pair(key, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(url)
|
||||
}
|
||||
|
||||
fn normalize_realtime_path(url: &mut Url) {
|
||||
let path = url.path().to_string();
|
||||
if path.is_empty() || path == "/" {
|
||||
@@ -1094,6 +1161,25 @@ mod tests {
|
||||
assert_eq!(url.as_str(), "wss://example.com/v1/realtime");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn websocket_url_with_call_id_uses_call_id_only_for_session_selectors() {
|
||||
let url = websocket_url_from_api_url_with_call_id(
|
||||
"https://example.com/v1/realtime?model=old&foo=bar",
|
||||
Some(&HashMap::from([
|
||||
("trace".to_string(), "1".to_string()),
|
||||
("intent".to_string(), "ignored".to_string()),
|
||||
("model".to_string(), "ignored".to_string()),
|
||||
])),
|
||||
"rtc_123",
|
||||
)
|
||||
.expect("build ws url");
|
||||
|
||||
assert_eq!(
|
||||
url.as_str(),
|
||||
"wss://example.com/v1/realtime?call_id=rtc_123&trace=1"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn e2e_connect_and_exchange_events_against_mock_ws_server() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
mod calls;
|
||||
pub mod methods;
|
||||
mod methods_common;
|
||||
mod methods_v1;
|
||||
@@ -7,6 +8,8 @@ mod protocol_common;
|
||||
mod protocol_v1;
|
||||
mod protocol_v2;
|
||||
|
||||
pub use calls::RealtimeCallClient;
|
||||
pub use calls::RealtimeCallCreateResponse;
|
||||
pub use codex_protocol::protocol::RealtimeAudioFrame;
|
||||
pub use codex_protocol::protocol::RealtimeEvent;
|
||||
pub use methods::RealtimeWebsocketClient;
|
||||
|
||||
@@ -31,6 +31,8 @@ pub use crate::common::response_create_client_metadata;
|
||||
pub use crate::endpoint::compact::CompactClient;
|
||||
pub use crate::endpoint::memories::MemoriesClient;
|
||||
pub use crate::endpoint::models::ModelsClient;
|
||||
pub use crate::endpoint::realtime_websocket::RealtimeCallClient;
|
||||
pub use crate::endpoint::realtime_websocket::RealtimeCallCreateResponse;
|
||||
pub use crate::endpoint::realtime_websocket::RealtimeEventParser;
|
||||
pub use crate::endpoint::realtime_websocket::RealtimeSessionConfig;
|
||||
pub use crate::endpoint::realtime_websocket::RealtimeSessionMode;
|
||||
|
||||
@@ -437,6 +437,9 @@
|
||||
"realtime_conversation": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"realtime_rtc": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"remote_models": {
|
||||
"type": "boolean"
|
||||
},
|
||||
@@ -2144,6 +2147,9 @@
|
||||
"realtime_conversation": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"realtime_rtc": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"remote_models": {
|
||||
"type": "boolean"
|
||||
},
|
||||
|
||||
@@ -27,10 +27,12 @@ use crate::exec_policy::ExecPolicyManager;
|
||||
use crate::parse_turn_item;
|
||||
use crate::path_utils::normalize_for_native_workdir;
|
||||
use crate::realtime_conversation::RealtimeConversationManager;
|
||||
use crate::realtime_conversation::RealtimeConversationStartResult;
|
||||
use crate::realtime_conversation::handle_audio as handle_realtime_conversation_audio;
|
||||
use crate::realtime_conversation::handle_close as handle_realtime_conversation_close;
|
||||
use crate::realtime_conversation::handle_start as handle_realtime_conversation_start;
|
||||
use crate::realtime_conversation::handle_text as handle_realtime_conversation_text;
|
||||
use crate::realtime_conversation::send_start_error as send_realtime_conversation_start_error;
|
||||
use crate::realtime_conversation::start_and_send_events as start_realtime_conversation_and_send_events;
|
||||
use crate::render_skills_section;
|
||||
use crate::rollout::session_index;
|
||||
use crate::session_prefix::format_subagent_notification_message;
|
||||
@@ -346,6 +348,7 @@ use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::BackgroundEventEvent;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
use codex_protocol::protocol::CompactedItem;
|
||||
use codex_protocol::protocol::ConversationStartParams;
|
||||
use codex_protocol::protocol::DeprecationNoticeEvent;
|
||||
use codex_protocol::protocol::ErrorEvent;
|
||||
use codex_protocol::protocol::Event;
|
||||
@@ -734,6 +737,36 @@ impl Codex {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn start_realtime_conversation(
|
||||
&self,
|
||||
sub_id: String,
|
||||
params: ConversationStartParams,
|
||||
trace: Option<W3cTraceContext>,
|
||||
) -> CodexResult<RealtimeConversationStartResult> {
|
||||
let (tx_result, rx_result) = oneshot::channel();
|
||||
self.session
|
||||
.realtime_start_waiters
|
||||
.lock()
|
||||
.await
|
||||
.insert(sub_id.clone(), tx_result);
|
||||
let submit_result = self
|
||||
.submit_with_id(Submission {
|
||||
id: sub_id.clone(),
|
||||
op: Op::RealtimeConversationStart(params),
|
||||
trace,
|
||||
})
|
||||
.await;
|
||||
if let Err(err) = submit_result {
|
||||
self.session
|
||||
.realtime_start_waiters
|
||||
.lock()
|
||||
.await
|
||||
.remove(&sub_id);
|
||||
return Err(err);
|
||||
}
|
||||
rx_result.await.map_err(|_| CodexErr::InternalAgentDied)?
|
||||
}
|
||||
|
||||
pub async fn shutdown_and_wait(&self) -> CodexResult<()> {
|
||||
let session_loop_termination = self.session_loop_termination.clone();
|
||||
match self.submit(Op::Shutdown).await {
|
||||
@@ -823,6 +856,8 @@ pub(crate) struct Session {
|
||||
features: ManagedFeatures,
|
||||
pending_mcp_server_refresh_config: Mutex<Option<McpServerRefreshConfig>>,
|
||||
pub(crate) conversation: Arc<RealtimeConversationManager>,
|
||||
realtime_start_waiters:
|
||||
Mutex<HashMap<String, oneshot::Sender<CodexResult<RealtimeConversationStartResult>>>>,
|
||||
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
|
||||
mailbox: Mailbox,
|
||||
mailbox_rx: Mutex<MailboxReceiver>,
|
||||
@@ -1978,6 +2013,7 @@ impl Session {
|
||||
features: config.features.clone(),
|
||||
pending_mcp_server_refresh_config: Mutex::new(None),
|
||||
conversation: Arc::new(RealtimeConversationManager::new()),
|
||||
realtime_start_waiters: Mutex::new(HashMap::new()),
|
||||
active_turn: Mutex::new(None),
|
||||
mailbox,
|
||||
mailbox_rx: Mutex::new(mailbox_rx),
|
||||
@@ -4512,17 +4548,18 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
||||
false
|
||||
}
|
||||
Op::RealtimeConversationStart(params) => {
|
||||
if let Err(err) =
|
||||
handle_realtime_conversation_start(&sess, sub.id.clone(), params).await
|
||||
{
|
||||
sess.send_event_raw(Event {
|
||||
id: sub.id.clone(),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: err.to_string(),
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
let result =
|
||||
start_realtime_conversation_and_send_events(&sess, &sub.id, params).await;
|
||||
match sess.realtime_start_waiters.lock().await.remove(&sub.id) {
|
||||
Some(tx_result) => {
|
||||
let _ = tx_result.send(result);
|
||||
}
|
||||
None => {
|
||||
if let Err(err) = result {
|
||||
send_realtime_conversation_start_error(&sess, sub.id.clone(), err)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ use crate::codex::Codex;
|
||||
use crate::codex::SteerInputError;
|
||||
use crate::config::ConstraintResult;
|
||||
use crate::file_watcher::WatchRegistration;
|
||||
use crate::realtime_conversation::RealtimeConversationStartResult;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::config_types::ApprovalsReviewer;
|
||||
use codex_protocol::config_types::Personality;
|
||||
@@ -14,6 +15,7 @@ use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::ConversationStartParams;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
@@ -92,6 +94,17 @@ impl CodexThread {
|
||||
self.codex.submit_with_trace(op, trace).await
|
||||
}
|
||||
|
||||
pub async fn start_realtime_conversation(
|
||||
&self,
|
||||
sub_id: String,
|
||||
params: ConversationStartParams,
|
||||
trace: Option<W3cTraceContext>,
|
||||
) -> CodexResult<RealtimeConversationStartResult> {
|
||||
self.codex
|
||||
.start_realtime_conversation(sub_id, params, trace)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn steer_input(
|
||||
&self,
|
||||
input: Vec<UserInput>,
|
||||
|
||||
@@ -14,6 +14,7 @@ pub mod codex;
|
||||
mod realtime_context;
|
||||
mod realtime_conversation;
|
||||
pub use codex::SteerInputError;
|
||||
pub use realtime_conversation::RealtimeConversationStartResult;
|
||||
mod codex_thread;
|
||||
mod compact_remote;
|
||||
pub use codex_thread::CodexThread;
|
||||
|
||||
@@ -9,6 +9,7 @@ use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
||||
use codex_api::Provider as ApiProvider;
|
||||
use codex_api::RealtimeAudioFrame;
|
||||
use codex_api::RealtimeCallClient;
|
||||
use codex_api::RealtimeEvent;
|
||||
use codex_api::RealtimeEventParser;
|
||||
use codex_api::RealtimeSessionConfig;
|
||||
@@ -34,6 +35,7 @@ use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RealtimeConversationClosedEvent;
|
||||
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeConversationStartedEvent;
|
||||
use codex_protocol::protocol::RealtimeConversationTransport;
|
||||
use codex_protocol::protocol::RealtimeHandoffRequested;
|
||||
use http::HeaderMap;
|
||||
use http::HeaderValue;
|
||||
@@ -117,6 +119,10 @@ struct RealtimeInputTask {
|
||||
session_kind: RealtimeSessionKind,
|
||||
}
|
||||
|
||||
pub struct RealtimeConversationStartResult {
|
||||
pub answer_sdp: Option<String>,
|
||||
}
|
||||
|
||||
impl RealtimeHandoffState {
|
||||
fn new(output_tx: Sender<HandoffOutput>, session_kind: RealtimeSessionKind) -> Self {
|
||||
Self {
|
||||
@@ -159,6 +165,7 @@ impl RealtimeConversationManager {
|
||||
api_provider: ApiProvider,
|
||||
extra_headers: Option<HeaderMap>,
|
||||
session_config: RealtimeSessionConfig,
|
||||
sideband_call_id: Option<String>,
|
||||
) -> CodexResult<(Receiver<RealtimeEvent>, Arc<AtomicBool>)> {
|
||||
let previous_state = {
|
||||
let mut guard = self.state.lock().await;
|
||||
@@ -173,14 +180,20 @@ impl RealtimeConversationManager {
|
||||
};
|
||||
|
||||
let client = RealtimeWebsocketClient::new(api_provider);
|
||||
let connection = client
|
||||
.connect(
|
||||
session_config,
|
||||
extra_headers.unwrap_or_default(),
|
||||
default_headers(),
|
||||
)
|
||||
.await
|
||||
.map_err(map_api_error)?;
|
||||
let headers = extra_headers.unwrap_or_default();
|
||||
let connection = match sideband_call_id {
|
||||
Some(call_id) => {
|
||||
client
|
||||
.connect_to_call_id(session_config, &call_id, headers, default_headers())
|
||||
.await
|
||||
}
|
||||
None => {
|
||||
client
|
||||
.connect(session_config, headers, default_headers())
|
||||
.await
|
||||
}
|
||||
}
|
||||
.map_err(map_api_error)?;
|
||||
|
||||
let writer = connection.writer();
|
||||
let events = connection.events();
|
||||
@@ -406,39 +419,32 @@ async fn stop_conversation_state(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_start(
|
||||
pub(crate) async fn send_start_error(sess: &Arc<Session>, sub_id: String, err: CodexErr) {
|
||||
error!("failed to start realtime conversation: {err}");
|
||||
let message = err.to_string();
|
||||
sess.send_event_raw(Event {
|
||||
id: sub_id,
|
||||
msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::Error(message),
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
pub(crate) async fn start_and_send_events(
|
||||
sess: &Arc<Session>,
|
||||
sub_id: String,
|
||||
sub_id: &str,
|
||||
params: ConversationStartParams,
|
||||
) -> CodexResult<()> {
|
||||
) -> CodexResult<RealtimeConversationStartResult> {
|
||||
let prepared_start = match prepare_realtime_start(sess, params).await {
|
||||
Ok(prepared_start) => prepared_start,
|
||||
Err(err) => {
|
||||
error!("failed to prepare realtime conversation: {err}");
|
||||
let message = err.to_string();
|
||||
sess.send_event_raw(Event {
|
||||
id: sub_id,
|
||||
msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::Error(message),
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
return Ok(());
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = handle_start_inner(sess, &sub_id, prepared_start).await {
|
||||
error!("failed to start realtime conversation: {err}");
|
||||
let message = err.to_string();
|
||||
sess.send_event_raw(Event {
|
||||
id: sub_id.clone(),
|
||||
msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::Error(message),
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
Ok(())
|
||||
handle_start_inner(sess, sub_id, prepared_start).await
|
||||
}
|
||||
|
||||
struct PreparedRealtimeConversationStart {
|
||||
@@ -447,6 +453,7 @@ struct PreparedRealtimeConversationStart {
|
||||
requested_session_id: Option<String>,
|
||||
version: RealtimeWsVersion,
|
||||
session_config: RealtimeSessionConfig,
|
||||
transport: RealtimeConversationTransport,
|
||||
}
|
||||
|
||||
async fn prepare_realtime_start(
|
||||
@@ -484,7 +491,11 @@ async fn prepare_realtime_start(
|
||||
format!("{prompt}\n\n{startup_context}")
|
||||
};
|
||||
let model = config.experimental_realtime_ws_model.clone();
|
||||
let version = config.realtime.version;
|
||||
let transport = params.transport;
|
||||
let version = match transport {
|
||||
RealtimeConversationTransport::Rtc { .. } => RealtimeWsVersion::V2,
|
||||
RealtimeConversationTransport::Websocket => config.realtime.version,
|
||||
};
|
||||
let event_parser = match version {
|
||||
RealtimeWsVersion::V1 => RealtimeEventParser::V1,
|
||||
RealtimeWsVersion::V2 => RealtimeEventParser::RealtimeV2,
|
||||
@@ -509,6 +520,7 @@ async fn prepare_realtime_start(
|
||||
requested_session_id,
|
||||
version,
|
||||
session_config,
|
||||
transport,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -516,18 +528,39 @@ async fn handle_start_inner(
|
||||
sess: &Arc<Session>,
|
||||
sub_id: &str,
|
||||
prepared_start: PreparedRealtimeConversationStart,
|
||||
) -> CodexResult<()> {
|
||||
) -> CodexResult<RealtimeConversationStartResult> {
|
||||
let PreparedRealtimeConversationStart {
|
||||
api_provider,
|
||||
extra_headers,
|
||||
requested_session_id,
|
||||
version,
|
||||
session_config,
|
||||
transport,
|
||||
} = prepared_start;
|
||||
let (answer_sdp, sideband_call_id) = match transport {
|
||||
RealtimeConversationTransport::Websocket => (None, None),
|
||||
RealtimeConversationTransport::Rtc { offer_sdp } => {
|
||||
let call = RealtimeCallClient::new(api_provider.clone())
|
||||
.create(
|
||||
&session_config,
|
||||
offer_sdp,
|
||||
extra_headers.clone().unwrap_or_default(),
|
||||
default_headers(),
|
||||
)
|
||||
.await
|
||||
.map_err(map_api_error)?;
|
||||
(Some(call.answer_sdp), Some(call.call_id))
|
||||
}
|
||||
};
|
||||
info!("starting realtime conversation");
|
||||
let (events_rx, realtime_active) = sess
|
||||
.conversation
|
||||
.start(api_provider, extra_headers, session_config)
|
||||
.start(
|
||||
api_provider,
|
||||
extra_headers,
|
||||
session_config,
|
||||
sideband_call_id,
|
||||
)
|
||||
.await?;
|
||||
|
||||
info!("realtime conversation started");
|
||||
@@ -601,7 +634,7 @@ async fn handle_start_inner(
|
||||
.register_fanout_task(&realtime_active, fanout_task)
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
Ok(RealtimeConversationStartResult { answer_sdp })
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_audio(
|
||||
|
||||
@@ -118,6 +118,7 @@ async fn start_realtime_conversation(codex: &codex_core::CodexThread) -> Result<
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -182,6 +182,7 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -324,6 +325,7 @@ async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() ->
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -383,6 +385,7 @@ async fn conversation_transport_close_emits_closed_event() -> Result<()> {
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -466,6 +469,7 @@ async fn conversation_start_preflight_failure_emits_realtime_error_only() -> Res
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -506,6 +510,7 @@ async fn conversation_start_connect_failure_emits_realtime_error_only() -> Resul
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -594,6 +599,7 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> {
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "old".to_string(),
|
||||
session_id: Some("conv_old".to_string()),
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
@@ -610,6 +616,7 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> {
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "new".to_string(),
|
||||
session_id: Some("conv_new".to_string()),
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
@@ -696,6 +703,7 @@ async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Resul
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -750,6 +758,7 @@ async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() ->
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "prompt from op".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -812,6 +821,7 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "prompt from op".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -872,6 +882,7 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "prompt from op".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -925,6 +936,7 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -978,6 +990,7 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -1029,6 +1042,7 @@ async fn conversation_startup_context_is_truncated_and_sent_once_per_start() ->
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -1113,6 +1127,7 @@ async fn conversation_mirrors_assistant_message_text_to_realtime_handoff() -> Re
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -1239,6 +1254,7 @@ async fn conversation_handoff_persists_across_item_done_until_turn_complete() ->
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -1380,6 +1396,7 @@ async fn inbound_handoff_request_starts_turn() -> Result<()> {
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -1474,6 +1491,7 @@ async fn inbound_handoff_request_uses_active_transcript() -> Result<()> {
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -1566,6 +1584,7 @@ async fn inbound_handoff_request_clears_active_transcript_after_each_handoff() -
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -1665,6 +1684,7 @@ async fn inbound_conversation_item_does_not_start_turn_and_still_forwards_audio(
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -1777,6 +1797,7 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -1919,6 +1940,7 @@ async fn inbound_handoff_request_does_not_block_realtime_event_forwarding() -> R
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -2045,6 +2067,7 @@ async fn inbound_handoff_request_steers_active_turn() -> Result<()> {
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
@@ -2186,6 +2209,7 @@ async fn inbound_handoff_request_starts_turn_and_does_not_block_realtime_audio()
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -176,6 +176,8 @@ pub enum Feature {
|
||||
FastMode,
|
||||
/// Enable experimental realtime voice conversation mode in the TUI.
|
||||
RealtimeConversation,
|
||||
/// Switch realtime conversation setup to the WebRTC sideband protocol.
|
||||
RealtimeRtc,
|
||||
/// Removed compatibility flag. The TUI now always uses the app-server implementation.
|
||||
TuiAppServer,
|
||||
/// Prevent idle system sleep while a turn is actively running.
|
||||
@@ -825,6 +827,12 @@ pub const FEATURES: &[FeatureSpec] = &[
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::RealtimeRtc,
|
||||
key: "realtime_rtc",
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::TuiAppServer,
|
||||
key: "tui_app_server",
|
||||
|
||||
@@ -135,6 +135,18 @@ pub struct ConversationStartParams {
|
||||
pub prompt: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub session_id: Option<String>,
|
||||
#[serde(default)]
|
||||
pub transport: RealtimeConversationTransport,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum RealtimeConversationTransport {
|
||||
#[default]
|
||||
Websocket,
|
||||
Rtc {
|
||||
offer_sdp: String,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
|
||||
@@ -4395,6 +4407,7 @@ mod tests {
|
||||
let start = Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "be helpful".to_string(),
|
||||
session_id: Some("conv_1".to_string()),
|
||||
transport: Default::default(),
|
||||
});
|
||||
let text = Op::RealtimeConversationText(ConversationTextParams {
|
||||
text: "hello".to_string(),
|
||||
|
||||
@@ -41,6 +41,7 @@ use codex_app_server_protocol::ThreadRealtimeAppendAudioResponse;
|
||||
use codex_app_server_protocol::ThreadRealtimeAppendTextParams;
|
||||
use codex_app_server_protocol::ThreadRealtimeAppendTextResponse;
|
||||
use codex_app_server_protocol::ThreadRealtimeStartParams;
|
||||
use codex_app_server_protocol::ThreadRealtimeStartProtocol;
|
||||
use codex_app_server_protocol::ThreadRealtimeStartResponse;
|
||||
use codex_app_server_protocol::ThreadRealtimeStopParams;
|
||||
use codex_app_server_protocol::ThreadRealtimeStopResponse;
|
||||
@@ -658,6 +659,7 @@ impl AppServerSession {
|
||||
thread_id: thread_id.to_string(),
|
||||
prompt: params.prompt,
|
||||
session_id: params.session_id,
|
||||
protocol: ThreadRealtimeStartProtocol::JsonRpcPcm,
|
||||
},
|
||||
})
|
||||
.await
|
||||
|
||||
@@ -229,6 +229,7 @@ impl ChatWidget {
|
||||
ConversationStartParams {
|
||||
prompt: REALTIME_CONVERSATION_PROMPT.to_string(),
|
||||
session_id: None,
|
||||
transport: Default::default(),
|
||||
},
|
||||
));
|
||||
self.request_redraw();
|
||||
|
||||
Reference in New Issue
Block a user