Add realtime output modality and transcript events (#17701)

- Add outputModality to thread/realtime/start and wire text/audio output
selection through app-server, core, API, and TUI.\n- Rename the realtime
transcript delta notification and add a separate transcript done
notification that forwards final text from item done without correlating
it with deltas.
This commit is contained in:
Ahmed Ibrahim
2026-04-14 00:13:13 -07:00
committed by GitHub
parent a6b03a22cc
commit 2f6fc7c137
38 changed files with 711 additions and 77 deletions

View File

@@ -31,7 +31,8 @@ use codex_app_server_protocol::ThreadRealtimeStartTransport;
use codex_app_server_protocol::ThreadRealtimeStartedNotification;
use codex_app_server_protocol::ThreadRealtimeStopParams;
use codex_app_server_protocol::ThreadRealtimeStopResponse;
use codex_app_server_protocol::ThreadRealtimeTranscriptUpdatedNotification;
use codex_app_server_protocol::ThreadRealtimeTranscriptDeltaNotification;
use codex_app_server_protocol::ThreadRealtimeTranscriptDoneNotification;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnCompletedNotification;
@@ -39,6 +40,7 @@ use codex_app_server_protocol::TurnStartedNotification;
use codex_features::FEATURES;
use codex_features::Feature;
use codex_protocol::protocol::RealtimeConversationVersion;
use codex_protocol::protocol::RealtimeOutputModality;
use codex_protocol::protocol::RealtimeVoice;
use codex_protocol::protocol::RealtimeVoicesList;
use core_test_support::responses;
@@ -301,6 +303,7 @@ impl RealtimeE2eHarness {
.mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
thread_id: self.thread_id.clone(),
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
session_id: None,
transport: Some(ThreadRealtimeStartTransport::Webrtc {
@@ -478,6 +481,15 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
"type": "response.output_text.delta",
"delta": "working"
}),
json!({
"type": "conversation.item.done",
"item": {
"id": "item_assistant_1",
"type": "message",
"role": "assistant",
"content": [{ "type": "output_text", "text": "working on it" }]
}
}),
json!({
"type": "conversation.item.done",
"item": {
@@ -523,6 +535,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
let start_request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
thread_id: thread_start.thread.id.clone(),
output_modality: RealtimeOutputModality::Audio,
prompt: None,
session_id: None,
transport: None,
@@ -554,6 +567,10 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
startup_context_request.body_json()["session"]["audio"]["output"]["voice"],
"cedar"
);
assert_eq!(
startup_context_request.body_json()["session"]["output_modalities"],
json!(["audio"])
);
let startup_context_instructions =
startup_context_request.body_json()["session"]["instructions"]
.as_str()
@@ -612,24 +629,32 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
assert_eq!(item_added.thread_id, output_audio.thread_id);
assert_eq!(item_added.item["type"], json!("message"));
let first_transcript_update = read_notification::<ThreadRealtimeTranscriptUpdatedNotification>(
let first_transcript_delta = read_notification::<ThreadRealtimeTranscriptDeltaNotification>(
&mut mcp,
"thread/realtime/transcriptUpdated",
"thread/realtime/transcript/delta",
)
.await?;
assert_eq!(first_transcript_update.thread_id, output_audio.thread_id);
assert_eq!(first_transcript_update.role, "user");
assert_eq!(first_transcript_update.text, "delegate now");
assert_eq!(first_transcript_delta.thread_id, output_audio.thread_id);
assert_eq!(first_transcript_delta.role, "user");
assert_eq!(first_transcript_delta.delta, "delegate now");
let second_transcript_update =
read_notification::<ThreadRealtimeTranscriptUpdatedNotification>(
&mut mcp,
"thread/realtime/transcriptUpdated",
)
.await?;
assert_eq!(second_transcript_update.thread_id, output_audio.thread_id);
assert_eq!(second_transcript_update.role, "assistant");
assert_eq!(second_transcript_update.text, "working");
let second_transcript_delta = read_notification::<ThreadRealtimeTranscriptDeltaNotification>(
&mut mcp,
"thread/realtime/transcript/delta",
)
.await?;
assert_eq!(second_transcript_delta.thread_id, output_audio.thread_id);
assert_eq!(second_transcript_delta.role, "assistant");
assert_eq!(second_transcript_delta.delta, "working");
let final_transcript_done = read_notification::<ThreadRealtimeTranscriptDoneNotification>(
&mut mcp,
"thread/realtime/transcript/done",
)
.await?;
assert_eq!(final_transcript_done.thread_id, output_audio.thread_id);
assert_eq!(final_transcript_done.role, "assistant");
assert_eq!(final_transcript_done.text, "working on it");
let handoff_item_added = read_notification::<ThreadRealtimeItemAddedNotification>(
&mut mcp,
@@ -693,6 +718,140 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn realtime_text_output_modality_requests_text_output_and_final_transcript() -> Result<()> {
skip_if_no_network!(Ok(()));
let responses_server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
let realtime_server = start_websocket_server(vec![vec![vec![
json!({
"type": "session.updated",
"session": { "id": "sess_text", "instructions": "backend prompt" }
}),
json!({
"type": "response.output_text.delta",
"delta": "hello "
}),
json!({
"type": "response.output_text.delta",
"delta": "world"
}),
json!({
"type": "response.output_audio_transcript.done",
"transcript": "hello world"
}),
json!({
"type": "conversation.item.done",
"item": {
"id": "item_output_1",
"type": "message",
"role": "assistant",
"content": [{"type": "output_text", "text": "hello world"}]
}
}),
]]])
.await;
let codex_home = TempDir::new()?;
create_config_toml(
codex_home.path(),
&responses_server.uri(),
realtime_server.uri(),
/*realtime_enabled*/ true,
StartupContextConfig::Generated,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
mcp.initialize().await?;
login_with_api_key(&mut mcp, "sk-test-key").await?;
let thread_start_request_id = mcp
.send_thread_start_request(ThreadStartParams::default())
.await?;
let thread_start_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_start_request_id)),
)
.await??;
let thread_start: ThreadStartResponse = to_response(thread_start_response)?;
let start_request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
thread_id: thread_start.thread.id.clone(),
output_modality: RealtimeOutputModality::Text,
prompt: None,
session_id: None,
transport: None,
voice: None,
})
.await?;
let start_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_request_id)),
)
.await??;
let _: ThreadRealtimeStartResponse = to_response(start_response)?;
let session_update = realtime_server
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 0)
.await;
assert_eq!(
session_update.body_json()["session"]["output_modalities"],
json!(["text"])
);
let first_delta = read_notification::<ThreadRealtimeTranscriptDeltaNotification>(
&mut mcp,
"thread/realtime/transcript/delta",
)
.await?;
let second_delta = read_notification::<ThreadRealtimeTranscriptDeltaNotification>(
&mut mcp,
"thread/realtime/transcript/delta",
)
.await?;
let done = read_notification::<ThreadRealtimeTranscriptDoneNotification>(
&mut mcp,
"thread/realtime/transcript/done",
)
.await?;
assert_eq!(
vec![first_delta, second_delta],
vec![
ThreadRealtimeTranscriptDeltaNotification {
thread_id: thread_start.thread.id.clone(),
role: "assistant".to_string(),
delta: "hello ".to_string(),
},
ThreadRealtimeTranscriptDeltaNotification {
thread_id: thread_start.thread.id.clone(),
role: "assistant".to_string(),
delta: "world".to_string(),
},
]
);
assert_eq!(
done,
ThreadRealtimeTranscriptDoneNotification {
thread_id: thread_start.thread.id,
role: "assistant".to_string(),
text: "hello world".to_string(),
}
);
assert!(
timeout(
Duration::from_millis(200),
mcp.read_stream_until_notification_message("thread/realtime/transcript/done"),
)
.await
.is_err(),
"should not emit duplicate transcript done from audio transcript done"
);
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test]
async fn realtime_list_voices_returns_supported_names() -> Result<()> {
let codex_home = TempDir::new()?;
@@ -793,6 +952,7 @@ async fn realtime_conversation_stop_emits_closed_notification() -> Result<()> {
let start_request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
thread_id: thread_start.thread.id.clone(),
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
session_id: None,
transport: None,
@@ -889,6 +1049,7 @@ async fn realtime_webrtc_start_emits_sdp_notification() -> Result<()> {
let start_request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
thread_id: thread_id.clone(),
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
session_id: None,
transport: Some(ThreadRealtimeStartTransport::Webrtc {
@@ -1163,11 +1324,11 @@ async fn webrtc_v2_forwards_audio_and_text_between_client_and_sideband() -> Resu
harness.append_text(thread_id, "hello").await?;
let transcript = harness
.read_notification::<ThreadRealtimeTranscriptUpdatedNotification>(
"thread/realtime/transcriptUpdated",
.read_notification::<ThreadRealtimeTranscriptDeltaNotification>(
"thread/realtime/transcript/delta",
)
.await?;
assert_eq!(transcript.text, "transcribed audio");
assert_eq!(transcript.delta, "transcribed audio");
let output_audio = harness
.read_notification::<ThreadRealtimeOutputAudioDeltaNotification>(
"thread/realtime/outputAudio/delta",
@@ -1252,11 +1413,11 @@ async fn webrtc_v2_text_input_is_append_only_while_response_is_active() -> Resul
"first",
);
let transcript = harness
.read_notification::<ThreadRealtimeTranscriptUpdatedNotification>(
"thread/realtime/transcriptUpdated",
.read_notification::<ThreadRealtimeTranscriptDeltaNotification>(
"thread/realtime/transcript/delta",
)
.await?;
assert_eq!(transcript.text, "active response started");
assert_eq!(transcript.delta, "active response started");
// Phase 3: send a second text turn while `resp_active` is still open. The
// user message must reach realtime without requesting another response.
@@ -1736,6 +1897,7 @@ async fn realtime_webrtc_start_surfaces_backend_error() -> Result<()> {
let start_request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
thread_id: thread_start.thread.id,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
session_id: None,
transport: Some(ThreadRealtimeStartTransport::Webrtc {
@@ -1794,6 +1956,7 @@ async fn realtime_conversation_requires_feature_flag() -> Result<()> {
let start_request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
thread_id: thread_start.thread.id.clone(),
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
session_id: None,
transport: None,