Budget realtime current thread context (#17519)

Select the Current Thread startup context by budget, starting from the
newest turns; cap each rendered turn at roughly 300 approximate tokens;
and add formatter and integration snapshot coverage.
This commit is contained in:
Ahmed Ibrahim
2026-04-12 11:59:09 -07:00
committed by GitHub
parent 1288bb60a1
commit 4db60d5d8b
4 changed files with 389 additions and 26 deletions

View File

@@ -2,9 +2,12 @@ use anyhow::Context;
use anyhow::Result;
use chrono::Utc;
use codex_config::config_toml::RealtimeWsVersion;
use codex_core::test_support::auth_manager_from_auth;
use codex_login::CodexAuth;
use codex_login::OPENAI_API_KEY_ENV_VAR;
use codex_protocol::ThreadId;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::ConversationAudioParams;
use codex_protocol::protocol::ConversationStartParams;
@@ -12,12 +15,14 @@ use codex_protocol::protocol::ConversationStartTransport;
use codex_protocol::protocol::ConversationTextParams;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RealtimeAudioFrame;
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
use codex_protocol::protocol::RealtimeConversationVersion;
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::RealtimeVoice;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use codex_protocol::user_input::UserInput;
use core_test_support::responses;
@@ -1508,6 +1513,165 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_startup_context_current_thread_selects_many_turns_by_budget() -> Result<()> {
    skip_if_no_network!(Ok(()));
    let api_server = start_mock_server().await;
    let realtime_server = start_websocket_server(vec![vec![vec![json!({
        "type": "session.updated",
        "session": { "id": "sess_current_thread_budget", "instructions": "backend prompt" }
    })]]])
    .await;

    // The newest user turn is deliberately oversized so the per-turn
    // truncation path is exercised by this fixture.
    let oversized_latest_turn = format!(
        "latest-long-start {} latest-long-middle {} latest-long-end",
        "head detail ".repeat(120),
        "tail detail ".repeat(170),
    );
    let mut seeded_user_turns: Vec<String> = Vec::with_capacity(8);
    for index in 1..=7 {
        seeded_user_turns.push(format!(
            "short-turn-{index}-start {} short-turn-{index}-end",
            "detail ".repeat(86)
        ));
    }
    seeded_user_turns.push(oversized_latest_turn.clone());

    let mut builder = test_codex().with_config({
        let realtime_base_url = realtime_server.uri().to_string();
        move |config| {
            config.experimental_realtime_ws_base_url = Some(realtime_base_url);
            config.realtime.version = RealtimeWsVersion::V1;
        }
    });
    let test = builder.build(&api_server).await?;

    // Seed completed turns through a resumed thread so this remains an
    // end-to-end startup-context test without paying for a model turn per
    // fixture entry in platform CI.
    let mut history = Vec::with_capacity(seeded_user_turns.len() * 2);
    for (index, user_turn) in seeded_user_turns.into_iter().enumerate() {
        let turn_number = index + 1;
        history.push(RolloutItem::ResponseItem(ResponseItem::Message {
            id: None,
            role: "user".to_string(),
            content: vec![ContentItem::InputText { text: user_turn }],
            end_turn: None,
            phase: None,
        }));
        history.push(RolloutItem::ResponseItem(ResponseItem::Message {
            id: None,
            role: "assistant".to_string(),
            content: vec![ContentItem::OutputText {
                text: format!("assistant turn {turn_number}"),
            }],
            end_turn: None,
            phase: None,
        }));
    }
    test.codex.shutdown_and_wait().await?;

    let resumed_thread = test
        .thread_manager
        .resume_thread_with_history(
            test.config.clone(),
            InitialHistory::Forked(history),
            auth_manager_from_auth(CodexAuth::from_api_key("dummy")),
            /*persist_extended_history*/ false,
            /*parent_trace*/ None,
        )
        .await?;
    let codex = resumed_thread.thread;
    codex
        .submit(Op::RealtimeConversationStart(ConversationStartParams {
            prompt: Some(Some("backend prompt".to_string())),
            session_id: None,
            transport: None,
            voice: None,
        }))
        .await?;

    let startup_context_request = wait_for_matching_websocket_request(
        &realtime_server,
        "current thread budget startup context request with instructions",
        |request| websocket_request_instructions(request).is_some(),
    )
    .await;
    let startup_context = websocket_request_instructions(&startup_context_request)
        .expect("startup context request should contain instructions");

    // Isolate only the Current Thread section; the startup prompt may also include
    // workspace and notes sections after it.
    let section_start = startup_context
        .find("## Current Thread")
        .expect("startup context should include current thread section");
    let tail = &startup_context[section_start..];
    let section_end_markers = [
        "\n## Recent Work",
        "\n## Machine / Workspace Map",
        "\n## Notes",
    ];
    let section_end = section_end_markers
        .iter()
        .filter_map(|marker| tail.find(marker))
        .min()
        .unwrap_or(tail.len());
    let current_thread = &tail[..section_end];

    // Each "### " heading inside the section is one rendered turn.
    let rendered_turns: Vec<String> = current_thread
        .split("\n### ")
        .skip(1)
        .map(|turn| format!("### {turn}"))
        .collect();
    // Collect (heading, approx-token-count) for any turn exceeding the
    // 300-approximate-token per-turn cap; expected to stay empty.
    let mut over_budget_turns: Vec<(String, usize)> = Vec::new();
    for turn in &rendered_turns {
        let token_count = turn.len().div_ceil(4);
        if token_count > 300 {
            over_budget_turns.push((
                turn.lines().next().unwrap_or_default().to_string(),
                token_count,
            ));
        }
    }

    let latest_rendered_source =
        format!("### Latest turn\nUser:\n{oversized_latest_turn}\n\nAssistant:\nassistant turn 8");
    // Snapshot the actual section so turn order, oldest-first omission, and
    // start/end truncation behavior are reviewed together.
    let snapshot = format!(
        "latest_source_tokens: {}\nrendered_turn_count: {}\nover_budget_turns: {over_budget_turns:?}\n\n{current_thread}",
        latest_rendered_source.len().div_ceil(4),
        rendered_turns.len(),
    );
    insta::assert_snapshot!(
        "conversation_startup_context_current_thread_selects_many_turns_by_budget",
        snapshot
    );
    // The input includes a turn over 300 approximate tokens, and every rendered
    // turn still fits the per-turn cap after labels and truncation markers.
    assert_eq!(
        (
            latest_rendered_source.len().div_ceil(4) > 300,
            over_budget_turns,
        ),
        (true, Vec::<(String, usize)>::new()),
    );

    codex.shutdown_and_wait().await?;
    realtime_server.shutdown().await;
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()> {
skip_if_no_network!(Ok(()));