Cap realtime mirrored user turns (#17685)

Cap mirrored user text sent to realtime with the existing 300-token turn
budget while preserving the full model turn.

Adds integration coverage for capped realtime mirror payloads.

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-04-13 14:31:18 -07:00
committed by GitHub
parent ecdd733a48
commit ec0133f5f8
4 changed files with 164 additions and 24 deletions

View File

@@ -4904,6 +4904,8 @@ mod handlers {
use crate::config_loader::CloudRequirementsLoader;
use crate::config_loader::LoaderOverrides;
use crate::config_loader::load_config_layers_state;
use crate::realtime_context::REALTIME_TURN_TOKEN_BUDGET;
use crate::realtime_context::truncate_realtime_text_to_token_budget;
use codex_features::Feature;
use codex_utils_absolute_path::AbsolutePathBuf;
@@ -5124,6 +5126,10 @@ mod handlers {
if text.is_empty() {
return;
}
let text = truncate_realtime_text_to_token_budget(&text, REALTIME_TURN_TOKEN_BUDGET);
if text.is_empty() {
return;
}
if sess.conversation.running_state().await.is_none() {
return;
}

View File

@@ -27,7 +27,7 @@ const CURRENT_THREAD_SECTION_TOKEN_BUDGET: usize = 1_200;
const RECENT_WORK_SECTION_TOKEN_BUDGET: usize = 2_200;
const WORKSPACE_SECTION_TOKEN_BUDGET: usize = 1_600;
const NOTES_SECTION_TOKEN_BUDGET: usize = 300;
const CURRENT_THREAD_TURN_TOKEN_BUDGET: usize = 300;
pub(crate) const REALTIME_TURN_TOKEN_BUDGET: usize = 300;
const MAX_RECENT_THREADS: usize = 40;
const MAX_RECENT_WORK_GROUPS: usize = 8;
const MAX_CURRENT_CWD_ASKS: usize = 8;
@@ -262,30 +262,9 @@ fn build_current_thread_section(items: &[ResponseItem]) -> Option<String> {
turn_lines.push(assistant_messages.join("\n\n"));
}
let turn_budget = CURRENT_THREAD_TURN_TOKEN_BUDGET.min(remaining_budget);
let turn_budget = REALTIME_TURN_TOKEN_BUDGET.min(remaining_budget);
let turn_text = turn_lines.join("\n");
let mut truncation_budget = turn_budget;
let turn_text = loop {
let candidate = truncate_text(&turn_text, TruncationPolicy::Tokens(truncation_budget));
let candidate_tokens = approx_token_count(&candidate);
if candidate_tokens <= turn_budget {
break candidate;
}
// The shared truncator adds its marker after choosing preserved
// content, so tighten the content budget until the rendered turn
// itself fits the per-turn cap.
let excess_tokens = candidate_tokens.saturating_sub(turn_budget);
let next_budget = truncation_budget.saturating_sub(excess_tokens.max(1));
if next_budget == 0 {
let candidate = truncate_text(&turn_text, TruncationPolicy::Tokens(0));
if approx_token_count(&candidate) <= turn_budget {
break candidate;
}
break String::new();
}
truncation_budget = next_budget;
};
let turn_text = truncate_realtime_text_to_token_budget(&turn_text, turn_budget);
let turn_tokens = approx_token_count(&turn_text);
if turn_tokens == 0 {
continue;
@@ -300,6 +279,31 @@ fn build_current_thread_section(items: &[ResponseItem]) -> Option<String> {
(retained_turn_count > 0).then(|| lines.join("\n"))
}
/// Shrink `text` so that its *rendered* form fits within `budget_tokens`.
///
/// The shared truncator appends its truncation marker after choosing which
/// content to preserve, so a single pass can overshoot the cap. We therefore
/// repeatedly tighten the content budget by the measured overshoot until the
/// rendered result fits. If even a zero content budget renders over the cap
/// (the marker alone can exceed a tiny budget), the text is dropped entirely
/// and an empty string is returned.
pub(crate) fn truncate_realtime_text_to_token_budget(text: &str, budget_tokens: usize) -> String {
    let mut content_budget = budget_tokens;
    loop {
        let rendered = truncate_text(text, TruncationPolicy::Tokens(content_budget));
        let rendered_tokens = approx_token_count(&rendered);
        if rendered_tokens <= budget_tokens {
            return rendered;
        }
        // We only get here when rendered_tokens > budget_tokens, so the
        // overshoot is at least 1 and the budget strictly decreases each
        // iteration — guaranteeing termination.
        let overshoot = rendered_tokens.saturating_sub(budget_tokens).max(1);
        match content_budget.saturating_sub(overshoot) {
            0 => {
                // Last resort: render with no content budget at all and keep
                // it only if the marker-bearing result still fits the cap.
                let minimal = truncate_text(text, TruncationPolicy::Tokens(0));
                return if approx_token_count(&minimal) <= budget_tokens {
                    minimal
                } else {
                    String::new()
                };
            }
            tightened => content_budget = tightened,
        }
    }
}
fn build_workspace_section_with_user_root(
cwd: &Path,
user_root: Option<PathBuf>,

View File

@@ -25,6 +25,7 @@ use codex_protocol::protocol::RealtimeVoice;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use codex_protocol::user_input::UserInput;
use codex_utils_output_truncation::approx_token_count;
use core_test_support::responses;
use core_test_support::responses::WebSocketConnectionConfig;
use core_test_support::responses::start_mock_server;
@@ -1910,6 +1911,123 @@ async fn conversation_user_text_turn_is_sent_to_realtime_when_active() -> Result
Ok(())
}
// Integration test: a user text turn far over the realtime token budget must
// reach the model request in full, while the copy mirrored over the realtime
// WebSocket is capped — the snapshot at the end pins both facts alongside the
// exact capped payload (the snapshot file records ~300 approx tokens).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_user_text_turn_is_capped_when_mirrored_to_realtime() -> Result<()> {
skip_if_no_network!(Ok(()));
// Mock model backend: one canned SSE exchange so the normal turn completes
// deterministically with a short assistant "ack".
let api_server = start_mock_server().await;
let response_mock = responses::mount_sse_once(
&api_server,
responses::sse(vec![
responses::ev_response_created("resp_long_user_text"),
responses::ev_assistant_message("msg_long_user_text", "ack"),
responses::ev_completed("resp_long_user_text"),
]),
)
.await;
// Mock realtime WebSocket server: first reply acknowledges the session;
// subsequent client requests are recorded for inspection below.
let realtime_server = start_websocket_server(vec![vec![
vec![json!({
"type": "session.updated",
"session": { "id": "sess_long_user_text", "instructions": "backend prompt" }
})],
vec![],
]])
.await;
// Point the experimental realtime transport at the mock server; an empty
// startup context keeps the mirrored turn as the only interesting payload.
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
config.experimental_realtime_ws_startup_context = Some(String::new());
}
});
let test = builder.build(&api_server).await?;
// Phase 1: start realtime so the next normal user turn mirrors over the
// active WebSocket session.
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: Some(Some("backend prompt".to_string())),
session_id: None,
transport: None,
voice: None,
}))
.await?;
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) => Some(session_id.clone()),
_ => None,
})
.await;
assert_eq!(session_updated, "sess_long_user_text");
// Phase 2: submit one oversized text turn. The model request should keep
// the exact user text, while the realtime mirror should get the capped copy.
// Distinct head/middle/tail markers make it visible in the snapshot which
// parts the truncator preserves; 900 repetitions of each filler word puts
// the text well over the per-turn cap.
let user_text = format!(
"mirror-head {} mirror-middle {} mirror-tail",
"alpha ".repeat(900),
"omega ".repeat(900),
);
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: user_text.clone(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
// Wait for the full turn to finish so both the model request and the
// mirrored WebSocket item have been sent before we inspect them.
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
// Phase 3: capture the mirrored WebSocket item; the snapshot below records
// the capped payload shape. Matching on "mirror-head" picks out the user
// text item among the recorded requests.
let realtime_text_request = wait_for_matching_websocket_request(
&realtime_server,
"capped normal user turn text mirrored to realtime",
|request| websocket_request_text(request).is_some_and(|text| text.contains("mirror-head")),
)
.await;
let realtime_text =
websocket_request_text(&realtime_text_request).expect("realtime request text");
// The model-side request must contain the user text verbatim (uncapped).
let model_user_texts = response_mock.single_request().message_input_texts("user");
let realtime_request_body = realtime_text_request.body_json();
let content = &realtime_request_body["item"]["content"][0];
// Snapshot the request envelope and capped text together so reviewers can
// see the preserved head/tail and truncation marker in one place.
let snapshot = format!(
"type: {}\nitem.type: {}\nitem.role: {}\ncontent[0].type: {}\nmodel_has_full_user_text: {}\nrealtime_text_equal_full_user_text: {}\nrealtime_text_approx_tokens: {}\ncontent[0].text: {}",
realtime_request_body["type"].as_str().unwrap_or_default(),
realtime_request_body["item"]["type"]
.as_str()
.unwrap_or_default(),
realtime_request_body["item"]["role"]
.as_str()
.unwrap_or_default(),
content["type"].as_str().unwrap_or_default(),
model_user_texts.iter().any(|text| text == &user_text),
realtime_text == user_text,
approx_token_count(&realtime_text),
realtime_text,
);
insta::assert_snapshot!(
"conversation_user_text_turn_is_capped_when_mirrored_to_realtime",
snapshot
);
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_mirrors_assistant_message_text_to_realtime_handoff() -> Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -0,0 +1,12 @@
---
source: core/tests/suite/realtime_conversation.rs
expression: snapshot
---
type: conversation.item.create
item.type: message
item.role: user
content[0].type: input_text
model_has_full_user_text: true
realtime_text_equal_full_user_text: false
realtime_text_approx_tokens: 300
content[0].text: mirror-head alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alpha alph…2417 tokens truncated…ega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega omega mirror-tail