mirror of
https://github.com/openai/codex.git
synced 2026-04-24 22:54:54 +00:00
feat: proxy context left after compaction (#6597)
This commit is contained in:
@@ -96,6 +96,7 @@ use crate::protocol::StreamErrorEvent;
|
||||
use crate::protocol::Submission;
|
||||
use crate::protocol::TokenCountEvent;
|
||||
use crate::protocol::TokenUsage;
|
||||
use crate::protocol::TokenUsageInfo;
|
||||
use crate::protocol::TurnDiffEvent;
|
||||
use crate::protocol::WarningEvent;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
@@ -1082,6 +1083,36 @@ impl Session {
|
||||
self.send_token_count_event(turn_context).await;
|
||||
}
|
||||
|
||||
pub(crate) async fn override_last_token_usage_estimate(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
estimated_total_tokens: i64,
|
||||
) {
|
||||
{
|
||||
let mut state = self.state.lock().await;
|
||||
let mut info = state.token_info().unwrap_or(TokenUsageInfo {
|
||||
total_token_usage: TokenUsage::default(),
|
||||
last_token_usage: TokenUsage::default(),
|
||||
model_context_window: None,
|
||||
});
|
||||
|
||||
info.last_token_usage = TokenUsage {
|
||||
input_tokens: 0,
|
||||
cached_input_tokens: 0,
|
||||
output_tokens: 0,
|
||||
reasoning_output_tokens: 0,
|
||||
total_tokens: estimated_total_tokens.max(0),
|
||||
};
|
||||
|
||||
if info.model_context_window.is_none() {
|
||||
info.model_context_window = turn_context.client.get_model_context_window();
|
||||
}
|
||||
|
||||
state.set_token_info(Some(info));
|
||||
}
|
||||
self.send_token_count_event(turn_context).await;
|
||||
}
|
||||
|
||||
pub(crate) async fn update_rate_limits(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
|
||||
@@ -153,6 +153,15 @@ async fn run_compact_task_inner(
|
||||
new_history.extend(ghost_snapshots);
|
||||
sess.replace_history(new_history).await;
|
||||
|
||||
if let Some(estimated_tokens) = sess
|
||||
.clone_history()
|
||||
.await
|
||||
.estimate_token_count(&turn_context)
|
||||
{
|
||||
sess.override_last_token_usage_estimate(&turn_context, estimated_tokens)
|
||||
.await;
|
||||
}
|
||||
|
||||
let rollout_item = RolloutItem::Compacted(CompactedItem {
|
||||
message: summary_text.clone(),
|
||||
});
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
use crate::codex::TurnContext;
|
||||
use crate::context_manager::normalize;
|
||||
use crate::context_manager::truncate::format_output_for_model_body;
|
||||
use crate::context_manager::truncate::globally_truncate_function_output_items;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::protocol::TokenUsageInfo;
|
||||
use codex_utils_tokenizer::Tokenizer;
|
||||
use std::ops::Deref;
|
||||
|
||||
use crate::context_manager::normalize;
|
||||
use crate::context_manager::truncate::format_output_for_model_body;
|
||||
use crate::context_manager::truncate::globally_truncate_function_output_items;
|
||||
|
||||
/// Transcript of conversation history
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(crate) struct ContextManager {
|
||||
@@ -28,6 +29,10 @@ impl ContextManager {
|
||||
self.token_info.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn set_token_info(&mut self, info: Option<TokenUsageInfo>) {
|
||||
self.token_info = info;
|
||||
}
|
||||
|
||||
pub(crate) fn set_token_usage_full(&mut self, context_window: i64) {
|
||||
match &mut self.token_info {
|
||||
Some(info) => info.fill_to_context_window(context_window),
|
||||
@@ -68,6 +73,28 @@ impl ContextManager {
|
||||
history
|
||||
}
|
||||
|
||||
// Estimate the number of tokens in the history. Return None if no tokenizer
|
||||
// is available. This does not consider the reasoning traces.
|
||||
// /!\ The value is a lower bound estimate and does not represent the exact
|
||||
// context length.
|
||||
pub(crate) fn estimate_token_count(&self, turn_context: &TurnContext) -> Option<i64> {
|
||||
let model = turn_context.client.get_model();
|
||||
let tokenizer = Tokenizer::for_model(model.as_str()).ok()?;
|
||||
let model_family = turn_context.client.get_model_family();
|
||||
|
||||
Some(
|
||||
self.items
|
||||
.iter()
|
||||
.map(|item| {
|
||||
serde_json::to_string(&item)
|
||||
.map(|item| tokenizer.count(&item))
|
||||
.unwrap_or_default()
|
||||
})
|
||||
.sum::<i64>()
|
||||
+ tokenizer.count(model_family.base_instructions.as_str()),
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn remove_first_item(&mut self) {
|
||||
if !self.items.is_empty() {
|
||||
// Remove the oldest item (front of the list). Items are ordered from
|
||||
|
||||
@@ -42,6 +42,10 @@ impl SessionState {
|
||||
self.history.replace(items);
|
||||
}
|
||||
|
||||
pub(crate) fn set_token_info(&mut self, info: Option<TokenUsageInfo>) {
|
||||
self.history.set_token_info(info);
|
||||
}
|
||||
|
||||
// Token/rate limit helpers
|
||||
pub(crate) fn update_token_info_from_usage(
|
||||
&mut self,
|
||||
|
||||
@@ -14,6 +14,7 @@ use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::load_default_config_for_test;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::wait_for_event;
|
||||
use core_test_support::wait_for_event_match;
|
||||
use std::collections::VecDeque;
|
||||
use tempfile::TempDir;
|
||||
|
||||
@@ -366,6 +367,72 @@ async fn manual_compact_uses_custom_prompt() {
|
||||
assert!(!found_default_prompt, "default prompt should be replaced");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn manual_compact_emits_estimated_token_usage_event() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
// Compact run where the API reports zero tokens in usage. Our local
|
||||
// estimator should still compute a non-zero context size for the compacted
|
||||
// history.
|
||||
let sse_compact = sse(vec![
|
||||
ev_assistant_message("m1", SUMMARY_TEXT),
|
||||
ev_completed_with_tokens("r1", 0),
|
||||
]);
|
||||
mount_sse_once(&server, sse_compact).await;
|
||||
|
||||
let model_provider = ModelProviderInfo {
|
||||
base_url: Some(format!("{}/v1", server.uri())),
|
||||
..built_in_model_providers()["openai"].clone()
|
||||
};
|
||||
let home = TempDir::new().unwrap();
|
||||
let mut config = load_default_config_for_test(&home);
|
||||
config.model_provider = model_provider;
|
||||
set_test_compact_prompt(&mut config);
|
||||
|
||||
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"));
|
||||
let NewConversation {
|
||||
conversation: codex,
|
||||
..
|
||||
} = conversation_manager.new_conversation(config).await.unwrap();
|
||||
|
||||
// Trigger manual compact and collect TokenCount events for the compact turn.
|
||||
codex.submit(Op::Compact).await.unwrap();
|
||||
|
||||
// First TokenCount: from the compact API call (usage.total_tokens = 0).
|
||||
let first = wait_for_event_match(&codex, |ev| match ev {
|
||||
EventMsg::TokenCount(tc) => tc
|
||||
.info
|
||||
.as_ref()
|
||||
.map(|info| info.last_token_usage.total_tokens),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
// Second TokenCount: from the local post-compaction estimate.
|
||||
let last = wait_for_event_match(&codex, |ev| match ev {
|
||||
EventMsg::TokenCount(tc) => tc
|
||||
.info
|
||||
.as_ref()
|
||||
.map(|info| info.last_token_usage.total_tokens),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
// Ensure the compact task itself completes.
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
|
||||
|
||||
assert_eq!(
|
||||
first, 0,
|
||||
"expected first TokenCount from compact API usage to be zero"
|
||||
);
|
||||
assert!(
|
||||
last > 0,
|
||||
"second TokenCount should reflect a non-zero estimated context size after compaction"
|
||||
);
|
||||
}
|
||||
|
||||
// Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts.
|
||||
#[cfg_attr(windows, tokio::test(flavor = "multi_thread", worker_threads = 4))]
|
||||
#[cfg_attr(not(windows), tokio::test(flavor = "multi_thread", worker_threads = 2))]
|
||||
|
||||
Reference in New Issue
Block a user