Compare commits

...

14 Commits

Author SHA1 Message Date
Ahmed Ibrahim
50830d2187 Fix realtime append-only app-server tests 2026-04-12 13:59:05 -07:00
Ahmed Ibrahim
e35c654b73 Update app-server realtime text tests 2026-04-12 13:39:05 -07:00
Ahmed Ibrahim
03606c72fd Route realtime handoff through inner turn handler 2026-04-12 12:50:51 -07:00
Ahmed Ibrahim
9d71ecf606 Trim realtime text mirror plumbing 2026-04-12 12:48:01 -07:00
Ahmed Ibrahim
c0b675291d Make realtime text input append only 2026-04-12 12:42:58 -07:00
Ahmed Ibrahim
70c065ae25 Keep realtime mirror outside turn handler 2026-04-12 12:37:48 -07:00
Ahmed Ibrahim
649a5f722e Expand realtime mirror wrappers 2026-04-12 12:31:11 -07:00
Ahmed Ibrahim
1826d488d0 Simplify realtime user input mirror 2026-04-12 12:18:54 -07:00
Ahmed Ibrahim
c5c82d2646 Remove realtime mirror routing wrapper 2026-04-12 12:14:19 -07:00
Ahmed Ibrahim
ae5f97f421 Simplify realtime text mirror routing 2026-04-12 12:07:02 -07:00
Ahmed Ibrahim
582c528fd7 Keep mirrored user text append-only in realtime 2026-04-12 11:59:57 -07:00
Ahmed Ibrahim
b2c7e3f668 codex: fix realtime handoff Bazel fixture 2026-04-11 23:52:19 -07:00
Ahmed Ibrahim
a232a9ab01 codex: fix CI failure on PR #17520 2026-04-11 23:39:41 -07:00
Ahmed Ibrahim
c9438f6891 Mirror user text into realtime 2026-04-11 23:28:13 -07:00
7 changed files with 200 additions and 102 deletions

View File

@@ -661,7 +661,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
let connections = realtime_server.connections();
assert_eq!(connections.len(), 1);
let connection = &connections[0];
assert_eq!(connection.len(), 4);
assert_eq!(connection.len(), 3);
assert_eq!(
connection[0].body_json()["type"].as_str(),
Some("session.update")
@@ -679,10 +679,6 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
.as_str()
.context("expected websocket request type")?
.to_string(),
connection[3].body_json()["type"]
.as_str()
.context("expected websocket request type")?
.to_string(),
];
request_types.sort();
assert_eq!(
@@ -690,7 +686,6 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
[
"conversation.item.create".to_string(),
"input_audio_buffer.append".to_string(),
"response.create".to_string(),
]
);
@@ -1153,7 +1148,6 @@ async fn webrtc_v2_forwards_audio_and_text_between_client_and_sideband() -> Resu
"samples_per_channel": 512
}),
],
vec![],
])]),
)
.await?;
@@ -1185,7 +1179,6 @@ async fn webrtc_v2_forwards_audio_and_text_between_client_and_sideband() -> Resu
let requests = [
harness.sideband_outbound_request(/*request_index*/ 1).await,
harness.sideband_outbound_request(/*request_index*/ 2).await,
harness.sideband_outbound_request(/*request_index*/ 3).await,
];
assert!(
requests
@@ -1208,13 +1201,12 @@ async fn webrtc_v2_forwards_audio_and_text_between_client_and_sideband() -> Resu
Ok(())
}
/// Regression coverage for Realtime V2's single-active-response rule.
/// Regression coverage for Realtime V2 text input while a response is active.
///
/// The Realtime API rejects a new `response.create` while a default response is
/// still active, so the input task should queue the second create and flush it
/// only after the server sends `response.done` for the active response.
/// Text input is append-only, so app-server should send the user message without
/// requesting a new realtime response.
#[tokio::test]
async fn webrtc_v2_queues_text_response_create_while_response_is_active() -> Result<()> {
async fn webrtc_v2_text_input_is_append_only_while_response_is_active() -> Result<()> {
skip_if_no_network!(Ok(()));
// Phase 1: script a server-side response that becomes active after the first
@@ -1224,7 +1216,6 @@ async fn webrtc_v2_queues_text_response_create_while_response_is_active() -> Res
no_main_loop_responses(),
realtime_sideband(vec![realtime_sideband_connection(vec![
vec![session_updated("sess_v2_response_queue")],
vec![],
vec![
json!({
"type": "response.created",
@@ -1240,7 +1231,6 @@ async fn webrtc_v2_queues_text_response_create_while_response_is_active() -> Res
"type": "response.done",
"response": { "id": "resp_active" }
})],
vec![],
])]),
)
.await?;
@@ -1253,15 +1243,14 @@ async fn webrtc_v2_queues_text_response_create_while_response_is_active() -> Res
// notifications; they are the protocol frames app-server sends upstream.
assert_v2_session_update(&harness.sideband_outbound_request(/*request_index*/ 0).await)?;
// Phase 2: send the first text turn. It is safe to emit `response.create`
// immediately because no default response is active yet.
// Phase 2: send the first text turn. Text input is append-only, so this
// sends only the user text item.
let thread_id = started.started.thread_id.clone();
harness.append_text(thread_id.clone(), "first").await?;
assert_v2_user_text_item(
&harness.sideband_outbound_request(/*request_index*/ 1).await,
"first",
);
assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 2).await);
let transcript = harness
.read_notification::<ThreadRealtimeTranscriptUpdatedNotification>(
"thread/realtime/transcriptUpdated",
@@ -1270,39 +1259,28 @@ async fn webrtc_v2_queues_text_response_create_while_response_is_active() -> Res
assert_eq!(transcript.text, "active response started");
// Phase 3: send a second text turn while `resp_active` is still open. The
// user message must reach realtime, but `response.create` must not be sent
// yet or the Realtime API rejects it as an active-response conflict.
// user message must reach realtime without requesting another response.
harness.append_text(thread_id.clone(), "second").await?;
assert_v2_user_text_item(
&harness.sideband_outbound_request(/*request_index*/ 3).await,
&harness.sideband_outbound_request(/*request_index*/ 2).await,
"second",
);
// Phase 4: the audio input causes the scripted sideband stream to send
// `response.done`, which clears the active response and flushes the queued
// `response.create` for the second text turn.
// Phase 4: audio still forwards normally after text input.
harness.append_audio(thread_id).await?;
// This is the negative check: if the second text turn had emitted
// `response.create` immediately, request 4 would be that create instead of
// the audio append.
let audio = harness.sideband_outbound_request(/*request_index*/ 4).await;
let audio = harness.sideband_outbound_request(/*request_index*/ 3).await;
assert_eq!(audio["type"], "input_audio_buffer.append");
assert_eq!(audio["audio"], "BQYH");
assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 5).await);
harness.shutdown().await;
Ok(())
}
/// Regression coverage for the same queued `response.create` path when the
/// active Realtime V2 response is cancelled instead of completed.
///
/// `response.cancelled` should clear the active-response guard exactly like
/// `response.done`, so a text turn queued during the active response still gets
/// one deferred `response.create`.
/// Regression coverage for append-only Realtime V2 text input when the active
/// response is cancelled instead of completed.
#[tokio::test]
async fn webrtc_v2_flushes_queued_text_response_create_when_response_is_cancelled() -> Result<()> {
async fn webrtc_v2_text_input_is_append_only_when_response_is_cancelled() -> Result<()> {
skip_if_no_network!(Ok(()));
// Phase 1: script a server-side response that becomes active after the first
@@ -1312,7 +1290,6 @@ async fn webrtc_v2_flushes_queued_text_response_create_when_response_is_cancelle
no_main_loop_responses(),
realtime_sideband(vec![realtime_sideband_connection(vec![
vec![session_updated("sess_v2_response_cancel_queue")],
vec![],
vec![json!({
"type": "response.created",
"response": { "id": "resp_cancelled" }
@@ -1322,7 +1299,6 @@ async fn webrtc_v2_flushes_queued_text_response_create_when_response_is_cancelle
"type": "response.cancelled",
"response": { "id": "resp_cancelled" }
})],
vec![],
])]),
)
.await?;
@@ -1331,36 +1307,29 @@ async fn webrtc_v2_flushes_queued_text_response_create_when_response_is_cancelle
assert_eq!(started.started.version, RealtimeConversationVersion::V2);
assert_v2_session_update(&harness.sideband_outbound_request(/*request_index*/ 0).await)?;
// Phase 2: send the first text turn. It is safe to emit `response.create`
// immediately because no default response is active yet.
// Phase 2: send the first text turn. Text input is append-only, so this
// sends only the user text item.
let thread_id = started.started.thread_id.clone();
harness.append_text(thread_id.clone(), "first").await?;
assert_v2_user_text_item(
&harness.sideband_outbound_request(/*request_index*/ 1).await,
"first",
);
assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 2).await);
// Phase 3: send a second text turn while `resp_cancelled` is still open.
// The user message must reach realtime, but `response.create` stays queued.
// The user message must reach realtime without requesting another response.
harness.append_text(thread_id.clone(), "second").await?;
assert_v2_user_text_item(
&harness.sideband_outbound_request(/*request_index*/ 3).await,
&harness.sideband_outbound_request(/*request_index*/ 2).await,
"second",
);
// Phase 4: the audio input causes the scripted sideband stream to send
// `response.cancelled`, which clears the active response and flushes the
// queued `response.create` for the second text turn.
// Phase 4: audio still forwards normally after text input.
harness.append_audio(thread_id).await?;
// This is the negative check: if the second text turn had emitted
// `response.create` immediately, request 4 would be that create instead of
// the audio append.
let audio = harness.sideband_outbound_request(/*request_index*/ 4).await;
let audio = harness.sideband_outbound_request(/*request_index*/ 3).await;
assert_eq!(audio["type"], "input_audio_buffer.append");
assert_eq!(audio["audio"], "BQYH");
assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 5).await);
harness.shutdown().await;
Ok(())

View File

@@ -2265,7 +2265,7 @@ impl Session {
}
pub(crate) async fn route_realtime_text_input(self: &Arc<Self>, text: String) {
handlers::user_input_or_turn(
handlers::user_input_or_turn_inner(
self,
self.next_internal_sub_id(),
Op::UserInput {
@@ -2276,6 +2276,7 @@ impl Session {
final_output_json_schema: None,
responsesapi_client_metadata: None,
},
/*mirror_user_text_to_realtime*/ None,
)
.await;
}
@@ -4910,6 +4911,7 @@ mod handlers {
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Settings;
use codex_protocol::dynamic_tools::DynamicToolResponse;
use codex_protocol::items::UserMessageItem;
use codex_protocol::mcp::RequestId as ProtocolRequestId;
use codex_protocol::user_input::UserInput;
use codex_rmcp_client::ElicitationAction;
@@ -4917,6 +4919,7 @@ mod handlers {
use serde_json::Value;
use std::path::PathBuf;
use std::sync::Arc;
use tracing::debug;
use tracing::info;
use tracing::warn;
@@ -4958,6 +4961,21 @@ mod handlers {
}
/// Handle a user-submitted input or turn request.
///
/// Thin wrapper over `user_input_or_turn_inner` that opts into mirroring:
/// typed user text is also forwarded to a live realtime conversation.
/// The realtime-originated entry point calls the inner handler with `None`
/// instead, so text that already came from the realtime channel is not
/// echoed back into it.
pub async fn user_input_or_turn(sess: &Arc<Session>, sub_id: String, op: Op) {
    user_input_or_turn_inner(sess, sub_id, op, /*mirror_user_text_to_realtime*/ Some(())).await;
}
pub(super) async fn user_input_or_turn_inner(
sess: &Arc<Session>,
sub_id: String,
op: Op,
mirror_user_text_to_realtime: Option<()>,
) {
let (items, updates, responsesapi_client_metadata) = match op {
Op::UserTurn {
cwd,
@@ -5023,7 +5041,7 @@ mod handlers {
};
sess.maybe_emit_unknown_model_warning_for_turn(current_context.as_ref())
.await;
match sess
let accepted_items = match sess
.steer_input(
items.clone(),
/*expected_turn_id*/ None,
@@ -5031,7 +5049,10 @@ mod handlers {
)
.await
{
Ok(_) => current_context.session_telemetry.user_prompt(&items),
Ok(_) => {
current_context.session_telemetry.user_prompt(&items);
Some(items)
}
Err(SteerInputError::NoActiveTurn(items)) => {
if let Some(responsesapi_client_metadata) = responsesapi_client_metadata {
current_context
@@ -5041,12 +5062,14 @@ mod handlers {
current_context.session_telemetry.user_prompt(&items);
sess.refresh_mcp_servers_if_requested(&current_context)
.await;
let accepted_items = items.clone();
sess.spawn_task(
Arc::clone(&current_context),
items,
crate::tasks::RegularTask::new(),
)
.await;
Some(accepted_items)
}
Err(err) => {
sess.send_event_raw(Event {
@@ -5054,7 +5077,24 @@ mod handlers {
msg: EventMsg::Error(err.to_error_event()),
})
.await;
None
}
};
if let (Some(items), Some(())) = (accepted_items, mirror_user_text_to_realtime) {
self::mirror_user_text_to_realtime(sess, &items).await;
}
}
/// Best-effort forwarding of accepted user input text into an active
/// realtime conversation.
///
/// Does nothing when the combined message text is empty or when no realtime
/// conversation is currently running. Failures are logged at debug level
/// rather than surfaced, so mirroring never fails the user's turn.
async fn mirror_user_text_to_realtime(sess: &Arc<Session>, items: &[UserInput]) {
    let text = UserMessageItem::new(items).message();
    // Skip attachment-only submissions, and skip entirely when no realtime
    // session is live (`||` short-circuits, so we only query running state
    // when there is actually text to send).
    if text.is_empty() || sess.conversation.running_state().await.is_none() {
        return;
    }
    match sess.conversation.text_in(text).await {
        Ok(_) => {}
        Err(err) => debug!("failed to mirror user text to realtime conversation: {err}"),
    }
}

View File

@@ -945,8 +945,6 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
user_text,
&writer,
&events_tx,
session_kind,
&mut response_create_queue,
)
.await
}
@@ -992,8 +990,6 @@ async fn handle_user_text_input(
text: Result<String, RecvError>,
writer: &RealtimeWebsocketWriter,
events_tx: &Sender<RealtimeEvent>,
session_kind: RealtimeSessionKind,
response_create_queue: &mut RealtimeResponseCreateQueue,
) -> anyhow::Result<()> {
let text = text.context("user text input channel closed")?;
@@ -1005,14 +1001,6 @@ async fn handle_user_text_input(
.await;
return Err(mapped_error.into());
}
match session_kind {
RealtimeSessionKind::V1 => {}
RealtimeSessionKind::V2 => {
response_create_queue
.request_create(writer, events_tx, "text")
.await?;
}
}
Ok(())
}

View File

@@ -1626,6 +1626,126 @@ async fn conversation_startup_context_is_truncated_and_sent_once_per_start() ->
Ok(())
}
/// Verifies that a normal typed user turn is mirrored into an active realtime
/// conversation: the text reaches both the model request and the realtime
/// websocket as a user message item, and no `response.create` is issued for
/// the mirrored text (text input is append-only).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_user_text_turn_is_sent_to_realtime_when_active() -> Result<()> {
skip_if_no_network!(Ok(()));
// Mock model backend: one scripted SSE response acknowledging the turn.
let api_server = start_mock_server().await;
let response_mock = responses::mount_sse_once(
&api_server,
responses::sse(vec![
responses::ev_response_created("resp_user_text"),
responses::ev_assistant_message("msg_user_text", "ack"),
responses::ev_completed("resp_user_text"),
]),
)
.await;
// Mock realtime websocket: one connection that announces a session and then
// stays silent (the empty vec is a scripted no-reply step).
let realtime_server = start_websocket_server(vec![vec![
vec![json!({
"type": "session.updated",
"session": { "id": "sess_user_text", "instructions": "backend prompt" }
})],
vec![],
]])
.await;
// Point the realtime client at the mock server; empty startup context keeps
// the session.update payload minimal.
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
config.experimental_realtime_ws_startup_context = Some(String::new());
}
});
let test = builder.build(&api_server).await?;
// Start the realtime conversation and wait until the session is confirmed
// active, so the subsequent user turn has a live session to mirror into.
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: Some(Some("backend prompt".to_string())),
session_id: None,
transport: None,
voice: None,
}))
.await?;
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) => Some(session_id.clone()),
_ => None,
})
.await;
assert_eq!(session_updated, "sess_user_text");
// Submit a normal (non-realtime) typed user turn and let it run to completion.
let user_text = "typed follow-up for realtime";
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: user_text.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
// The typed text must show up on the realtime websocket as a mirrored item.
let realtime_text_request = wait_for_matching_websocket_request(
&realtime_server,
"normal user turn text mirrored to realtime",
|request| websocket_request_text(request).as_deref() == Some(user_text),
)
.await;
// Assert both destinations at once: the model request carried the user text
// AND the realtime request carried the same text.
let model_user_texts = response_mock.single_request().message_input_texts("user");
assert_eq!(
(
model_user_texts.iter().any(|text| text == user_text),
websocket_request_text(&realtime_text_request),
),
(true, Some(user_text.to_string())),
);
// Negative check: give the client 200ms to (incorrectly) send a
// `response.create`; the timeout expiring (Err) is the expected outcome.
let realtime_response_create = timeout(Duration::from_millis(200), async {
wait_for_matching_websocket_request(
&realtime_server,
"unexpected realtime response request for mirrored user text",
|request| request.body_json()["type"].as_str() == Some("response.create"),
)
.await
})
.await;
assert!(
realtime_response_create.is_err(),
"mirrored user text should not request a realtime response"
);
// Snapshot the shape of the mirrored request (conversation.item.create with a
// user message containing input_text) plus the response.create outcome.
let realtime_request_body = realtime_text_request.body_json();
let content = &realtime_request_body["item"]["content"][0];
let snapshot = format!(
"type: {}\nitem.type: {}\nitem.role: {}\ncontent[0].type: {}\ncontent[0].text: {}\nresponse.create: {}",
realtime_request_body["type"].as_str().unwrap_or_default(),
realtime_request_body["item"]["type"]
.as_str()
.unwrap_or_default(),
realtime_request_body["item"]["role"]
.as_str()
.unwrap_or_default(),
content["type"].as_str().unwrap_or_default(),
content["text"].as_str().unwrap_or_default(),
realtime_response_create.is_ok(),
);
insta::assert_snapshot!(
"conversation_user_text_turn_is_sent_to_realtime_when_active",
snapshot
);
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_mirrors_assistant_message_text_to_realtime_handoff() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -2603,6 +2723,7 @@ async fn inbound_handoff_request_steers_active_turn() -> Result<()> {
"type": "session.updated",
"session": { "id": "sess_steer", "instructions": "backend prompt" }
})],
vec![],
vec![
json!({
"type": "conversation.input_transcript.delta",
@@ -2658,6 +2779,12 @@ async fn inbound_handoff_request_steers_active_turn() -> Result<()> {
matches!(event, EventMsg::AgentMessageContentDelta(_))
})
.await;
let _ = wait_for_matching_websocket_request(
&realtime_server,
"first prompt mirrored to realtime",
|request| websocket_request_text(request).as_deref() == Some("first prompt"),
)
.await;
test.codex
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {

View File

@@ -0,0 +1,10 @@
---
source: core/tests/suite/realtime_conversation.rs
expression: snapshot
---
type: conversation.item.create
item.type: message
item.role: user
content[0].type: input_text
content[0].text: typed follow-up for realtime
response.create: false

View File

@@ -5050,11 +5050,6 @@ impl ChatWidget {
{
return;
}
let Some(user_message) =
self.maybe_defer_user_message_for_realtime(user_message)
else {
return;
};
let should_submit_now =
self.is_session_configured() && !self.is_plan_streaming_in_tui();
if should_submit_now {
@@ -5085,11 +5080,6 @@ impl ChatWidget {
.bottom_pane
.take_recent_submission_mention_bindings(),
};
let Some(user_message) =
self.maybe_defer_user_message_for_realtime(user_message)
else {
return;
};
self.queue_user_message(user_message);
}
InputResult::Command(cmd) => {

View File

@@ -29,7 +29,6 @@ pub(super) struct RealtimeConversationUiState {
pub(super) phase: RealtimeConversationPhase,
requested_close: bool,
session_id: Option<String>,
warned_audio_only_submission: bool,
transport: RealtimeConversationUiTransport,
#[cfg(not(target_os = "linux"))]
pub(super) meter_placeholder_id: Option<String>,
@@ -191,28 +190,6 @@ impl ChatWidget {
self.last_rendered_user_message_event.as_ref() != Some(&key)
}
pub(super) fn maybe_defer_user_message_for_realtime(
&mut self,
user_message: UserMessage,
) -> Option<UserMessage> {
if !self.realtime_conversation.is_live() {
return Some(user_message);
}
self.restore_user_message_to_composer(user_message);
if !self.realtime_conversation.warned_audio_only_submission {
self.realtime_conversation.warned_audio_only_submission = true;
self.add_info_message(
"Realtime voice mode is audio-only. Use /realtime to stop.".to_string(),
/*hint*/ None,
);
} else {
self.request_redraw();
}
None
}
fn realtime_footer_hint_items() -> Vec<(String, String)> {
vec![("/realtime".to_string(), "stop live voice".to_string())]
}
@@ -238,7 +215,6 @@ impl ChatWidget {
self.realtime_conversation.phase = RealtimeConversationPhase::Starting;
self.realtime_conversation.requested_close = false;
self.realtime_conversation.session_id = None;
self.realtime_conversation.warned_audio_only_submission = false;
self.set_footer_hint_override(Some(Self::realtime_footer_hint_items()));
match self.config.realtime.transport {
RealtimeTransport::Websocket => {
@@ -297,7 +273,6 @@ impl ChatWidget {
self.realtime_conversation.phase = RealtimeConversationPhase::Inactive;
self.realtime_conversation.requested_close = false;
self.realtime_conversation.session_id = None;
self.realtime_conversation.warned_audio_only_submission = false;
self.realtime_conversation.transport = RealtimeConversationUiTransport::Websocket;
}
@@ -320,7 +295,6 @@ impl ChatWidget {
return;
}
self.realtime_conversation.session_id = ev.session_id;
self.realtime_conversation.warned_audio_only_submission = false;
self.set_footer_hint_override(Some(Self::realtime_footer_hint_items()));
if self.realtime_conversation_uses_webrtc() {
self.realtime_conversation.phase = RealtimeConversationPhase::Starting;