compact: split core logic changes from snapshot test coverage

This commit is contained in:
Charles Cunningham
2026-02-11 12:05:26 -08:00
parent 7255c6b03c
commit b774ea09ad
8 changed files with 1213 additions and 87 deletions

View File

@@ -17,6 +17,8 @@ use crate::analytics_client::AnalyticsEventsClient;
use crate::analytics_client::build_track_events_context;
use crate::apps::render_apps_section;
use crate::compact;
use crate::compact::AutoCompactCallsite;
use crate::compact::TurnContextReinjection;
use crate::compact::run_inline_auto_compact_task;
use crate::compact::should_use_remote_compact_task;
use crate::compact_remote::run_inline_remote_auto_compact_task;
@@ -123,6 +125,7 @@ use crate::config::types::McpServerConfig;
use crate::config::types::ShellEnvironmentPolicy;
use crate::context_manager::ContextManager;
use crate::context_manager::TotalTokenUsageBreakdown;
use crate::context_manager::estimate_item_token_count;
use crate::environment_context::EnvironmentContext;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
@@ -2166,11 +2169,11 @@ impl Session {
history.replace(replacement.clone());
} else {
let user_messages = collect_user_messages(history.raw_items());
let rebuilt = compact::build_compacted_history(
self.build_initial_context(turn_context).await,
let mut rebuilt = self.build_initial_context(turn_context).await;
rebuilt.extend(compact::build_compacted_history(
&user_messages,
&compacted.message,
);
));
history.replace(rebuilt);
}
}
@@ -2187,9 +2190,14 @@ impl Session {
&self,
turn_context: &TurnContext,
compacted_history: Vec<ResponseItem>,
turn_context_reinjection: TurnContextReinjection,
) -> Vec<ResponseItem> {
let initial_context = self.build_initial_context(turn_context).await;
compact::process_compacted_history(compacted_history, &initial_context)
compact::process_compacted_history(
compacted_history,
&initial_context,
turn_context_reinjection,
)
}
/// Append ResponseItems to the in-memory conversation history only.
@@ -2239,6 +2247,11 @@ impl Session {
self.flush_rollout().await;
}
pub(crate) async fn mark_initial_context_unseeded_for_next_turn(&self) {
let mut state = self.state.lock().await;
state.initial_context_seeded = false;
}
async fn persist_rollout_response_items(&self, items: &[ResponseItem]) {
let rollout_items: Vec<RolloutItem> = items
.iter()
@@ -2295,14 +2308,6 @@ impl Session {
DeveloperInstructions::new(SEARCH_TOOL_DEVELOPER_INSTRUCTIONS.to_string()).into(),
);
}
// Add developer instructions for memories.
if let Some(memory_prompt) =
memories::build_memory_tool_developer_instructions(&turn_context.config.codex_home)
.await
&& turn_context.features.enabled(Feature::MemoryTool)
{
items.push(DeveloperInstructions::new(memory_prompt).into());
}
// Add developer instructions from collaboration_mode if they exist and are non-empty
let (collaboration_mode, base_instructions) = {
let state = self.state.lock().await;
@@ -3122,19 +3127,29 @@ mod handlers {
if let Err(SteerInputError::NoActiveTurn(items)) = sess.steer_input(items, None).await {
sess.seed_initial_context_if_needed(&current_context).await;
let resumed_model = sess.take_pending_resume_previous_model().await;
let update_items = sess.build_settings_update_items(
let pre_turn_context_items = sess.build_settings_update_items(
previous_context.as_ref(),
resumed_model.as_deref(),
&current_context,
);
if !update_items.is_empty() {
sess.record_conversation_items(&current_context, &update_items)
let has_user_input = !items.is_empty();
if !has_user_input && !pre_turn_context_items.is_empty() {
// Empty-input UserTurn still needs these model-visible updates persisted now.
// Otherwise `previous_context` advances and the next non-empty turn computes no diff.
sess.record_conversation_items(&current_context, &pre_turn_context_items)
.await;
}
sess.refresh_mcp_servers_if_requested(&current_context)
.await;
let regular_task = sess.take_startup_regular_task().await.unwrap_or_default();
let regular_task = if has_user_input {
sess.take_startup_regular_task()
.await
.unwrap_or_default()
.with_pre_turn_context_items(pre_turn_context_items)
} else {
sess.take_startup_regular_task().await.unwrap_or_default()
};
sess.spawn_task(Arc::clone(&current_context), items, regular_task)
.await;
*previous_context = Some(current_context);
@@ -3861,6 +3876,7 @@ pub(crate) async fn run_turn(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
pre_turn_context_items: Vec<ResponseItem>,
prewarmed_client_session: Option<ModelClientSession>,
cancellation_token: CancellationToken,
) -> Option<String> {
@@ -3870,7 +3886,9 @@ pub(crate) async fn run_turn(
let model_info = turn_context.model_info.clone();
let auto_compact_limit = model_info.auto_compact_token_limit().unwrap_or(i64::MAX);
let total_usage_tokens = sess.get_total_token_usage().await;
let response_item: ResponseItem = ResponseInputItem::from(input.clone()).into();
let mut incoming_turn_items = pre_turn_context_items.clone();
incoming_turn_items.push(response_item.clone());
let event = EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_context.sub_id.clone(),
@@ -3878,11 +3896,34 @@ pub(crate) async fn run_turn(
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, event).await;
if total_usage_tokens >= auto_compact_limit
&& run_auto_compact(&sess, &turn_context).await.is_err()
let pre_turn_compaction_outcome = match run_pre_turn_auto_compaction_if_needed(
&sess,
&turn_context,
auto_compact_limit,
&incoming_turn_items,
)
.await
{
return None;
}
Ok(outcome) => outcome,
Err(CodexErr::ContextWindowExceeded) => {
let incoming_items_tokens_estimate = incoming_turn_items
.iter()
.map(estimate_item_token_count)
.fold(0_i64, i64::saturating_add);
let message = format!(
"Incoming user message and/or turn context is too large to fit in context window. Please reduce the size of your message and try again. (incoming_items_tokens_estimate={incoming_items_tokens_estimate})"
);
let event =
EventMsg::Error(CodexErr::ContextWindowExceeded.to_error_event(Some(message)));
sess.send_event(&turn_context, event).await;
return None;
}
Err(err) => {
let event = EventMsg::Error(err.to_error_event(None));
sess.send_event(&turn_context, event).await;
return None;
}
};
let skills_outcome = Some(
sess.services
@@ -3954,10 +3995,15 @@ pub(crate) async fn run_turn(
.await;
}
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
let response_item: ResponseItem = initial_input_for_turn.clone().into();
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
.await;
persist_pre_turn_items_for_compaction_outcome(
&sess,
&turn_context,
pre_turn_compaction_outcome,
&pre_turn_context_items,
&input,
response_item,
)
.await;
if !skill_items.is_empty() {
sess.record_conversation_items(&turn_context, &skill_items)
@@ -4059,7 +4105,19 @@ pub(crate) async fn run_turn(
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
if token_limit_reached && needs_follow_up {
if run_auto_compact(&sess, &turn_context).await.is_err() {
if let Err(err) = run_auto_compact(
&sess,
&turn_context,
AutoCompactCallsite::MidTurnContinuation,
TurnContextReinjection::ReinjectAboveLastRealUser,
None,
)
.await
{
let event = EventMsg::Error(
err.to_error_event(Some("Error running auto compact task".to_string())),
);
sess.send_event(&turn_context, event).await;
return None;
}
continue;
@@ -4119,13 +4177,158 @@ pub(crate) async fn run_turn(
last_agent_message
}
async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) -> CodexResult<()> {
if should_use_remote_compact_task(&turn_context.provider) {
run_inline_remote_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await?;
} else {
run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await?;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PreTurnCompactionOutcome {
/// Pre-turn input fits without compaction.
NotNeeded,
/// Pre-turn compaction succeeded with incoming turn context + user message included.
CompactedWithIncomingItems,
/// Pre-turn compaction succeeded without incoming turn items
/// (incoming user message should be appended after the compaction summary).
/// This compaction strategy is currently out of distribution for our compaction model,
/// but is planned to be trained on in the future.
#[cfg(test)]
CompactedWithoutIncomingItems,
}
async fn persist_pre_turn_items_for_compaction_outcome(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
outcome: PreTurnCompactionOutcome,
pre_turn_context_items: &[ResponseItem],
input: &[UserInput],
response_item: ResponseItem,
) {
match outcome {
PreTurnCompactionOutcome::CompactedWithIncomingItems => {
// Incoming turn items were already part of pre-turn compaction input, and the
// user prompt is already persisted in history after compaction. Emit lifecycle events
// only so UI/consumers still observe a normal user turn item transition.
let turn_item = TurnItem::UserMessage(UserMessageItem::new(input));
sess.emit_turn_item_started(turn_context.as_ref(), &turn_item)
.await;
sess.emit_turn_item_completed(turn_context.as_ref(), turn_item)
.await;
sess.ensure_rollout_materialized().await;
}
PreTurnCompactionOutcome::NotNeeded => {
if !pre_turn_context_items.is_empty() {
sess.record_conversation_items(turn_context, pre_turn_context_items)
.await;
}
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), input, response_item)
.await;
}
#[cfg(test)]
PreTurnCompactionOutcome::CompactedWithoutIncomingItems => {
// Reserved path for future models that compact pre-turn history without incoming turn
// items; reseed canonical initial context above the incoming user message.
let initial_context = sess.build_initial_context(turn_context.as_ref()).await;
if !initial_context.is_empty() {
sess.record_conversation_items(turn_context, &initial_context)
.await;
}
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), input, response_item)
.await;
}
}
Ok(())
}
/// Runs pre-turn auto-compaction with incoming turn context + user message included.
async fn run_pre_turn_auto_compaction_if_needed(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
auto_compact_limit: i64,
incoming_turn_items: &[ResponseItem],
) -> CodexResult<PreTurnCompactionOutcome> {
let total_usage_tokens = sess.get_total_token_usage().await;
let incoming_items_tokens_estimate = incoming_turn_items
.iter()
.map(estimate_item_token_count)
.fold(0_i64, i64::saturating_add);
if !is_projected_submission_over_auto_compact_limit(
total_usage_tokens,
incoming_items_tokens_estimate,
auto_compact_limit,
) {
return Ok(PreTurnCompactionOutcome::NotNeeded);
}
let compact_result = run_auto_compact(
sess,
turn_context,
AutoCompactCallsite::PreTurnIncludingIncomingUserMessage,
TurnContextReinjection::ReinjectAboveLastRealUser,
Some(incoming_turn_items.to_vec()),
)
.await;
if let Err(err) = compact_result {
if matches!(err, CodexErr::ContextWindowExceeded) {
error!(
turn_id = %turn_context.sub_id,
auto_compact_callsite = ?AutoCompactCallsite::PreTurnIncludingIncomingUserMessage,
incoming_items_tokens_estimate,
auto_compact_limit,
reason = "pre-turn compaction exceeded context window",
"incoming user/context is too large for pre-turn auto-compaction flow"
);
}
return Err(err);
}
Ok(PreTurnCompactionOutcome::CompactedWithIncomingItems)
}
fn is_projected_submission_over_auto_compact_limit(
total_usage_tokens: i64,
incoming_user_tokens_estimate: i64,
auto_compact_limit: i64,
) -> bool {
if auto_compact_limit == i64::MAX {
return false;
}
total_usage_tokens.saturating_add(incoming_user_tokens_estimate) >= auto_compact_limit
}
async fn run_auto_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
auto_compact_callsite: AutoCompactCallsite,
turn_context_reinjection: TurnContextReinjection,
incoming_items: Option<Vec<ResponseItem>>,
) -> CodexResult<()> {
let result = if should_use_remote_compact_task(&turn_context.provider) {
run_inline_remote_auto_compact_task(
Arc::clone(sess),
Arc::clone(turn_context),
auto_compact_callsite,
turn_context_reinjection,
incoming_items,
)
.await
} else {
run_inline_auto_compact_task(
Arc::clone(sess),
Arc::clone(turn_context),
auto_compact_callsite,
turn_context_reinjection,
incoming_items,
)
.await
};
if let Err(err) = &result {
error!(
turn_id = %turn_context.sub_id,
auto_compact_callsite = ?auto_compact_callsite,
compact_error = %err,
"auto compaction failed"
);
}
result
}
fn filter_connectors_for_input(
@@ -4996,7 +5199,7 @@ async fn try_run_sampling_request(
ResponseEvent::Completed {
response_id: _,
token_usage,
can_append: _,
..
} => {
if let Some(state) = plan_mode_state.as_mut() {
flush_proposed_plan_segments_all(&sess, &turn_context, state).await;
@@ -5208,6 +5411,151 @@ mod tests {
}
}
#[test]
fn pre_turn_projection_uses_incoming_user_tokens_for_compaction() {
assert!(is_projected_submission_over_auto_compact_limit(90, 15, 100));
assert!(!is_projected_submission_over_auto_compact_limit(90, 9, 100));
}
#[test]
fn pre_turn_projection_does_not_compact_with_unbounded_limit() {
assert!(!is_projected_submission_over_auto_compact_limit(
i64::MAX - 1,
100,
i64::MAX,
));
}
#[test]
fn post_compaction_projection_triggers_error_when_still_over_limit() {
assert!(is_projected_submission_over_auto_compact_limit(95, 10, 100));
assert!(is_projected_submission_over_auto_compact_limit(
100, 10, 100
));
assert!(!is_projected_submission_over_auto_compact_limit(
80, 10, 100
));
}
#[tokio::test]
async fn reserved_compacted_without_incoming_items_records_initial_context_and_prompt() {
let (session, turn_context) = make_session_and_context().await;
let session = Arc::new(session);
let turn_context = Arc::new(turn_context);
let input = vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}];
let response_item: ResponseItem = ResponseInputItem::from(input.clone()).into();
let stale_pre_turn_context_items = vec![ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "stale context diff".to_string(),
}],
end_turn: None,
phase: None,
}];
persist_pre_turn_items_for_compaction_outcome(
&session,
&turn_context,
PreTurnCompactionOutcome::CompactedWithoutIncomingItems,
&stale_pre_turn_context_items,
&input,
response_item.clone(),
)
.await;
let mut expected = session.build_initial_context(turn_context.as_ref()).await;
expected.push(response_item);
let actual = session.clone_history().await.raw_items().to_vec();
assert_eq!(actual, expected);
}
#[tokio::test]
async fn compacted_with_incoming_items_emits_lifecycle_without_history_writes() {
let (session, turn_context) = make_session_and_context().await;
let session = Arc::new(session);
let turn_context = Arc::new(turn_context);
let input = vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}];
let response_item: ResponseItem = ResponseInputItem::from(input.clone()).into();
let stale_pre_turn_context_items = vec![ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "stale context diff".to_string(),
}],
end_turn: None,
phase: None,
}];
persist_pre_turn_items_for_compaction_outcome(
&session,
&turn_context,
PreTurnCompactionOutcome::CompactedWithIncomingItems,
&stale_pre_turn_context_items,
&input,
response_item,
)
.await;
let actual = session.clone_history().await.raw_items().to_vec();
assert_eq!(actual, Vec::<ResponseItem>::new());
}
#[test]
fn estimate_user_input_token_count_is_positive_for_text_input() {
let input = vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}];
let response_input_item = ResponseInputItem::from(input);
let response_item: ResponseItem = response_input_item.into();
let estimated_tokens = estimate_item_token_count(&response_item);
assert!(estimated_tokens > 0);
}
#[test]
fn estimate_user_input_token_count_ignores_skill_and_mention_payload_lengths() {
let short = vec![
UserInput::Skill {
name: "s".to_string(),
path: PathBuf::from("/s"),
},
UserInput::Mention {
name: "m".to_string(),
path: "app://m".to_string(),
},
];
let long = vec![
UserInput::Skill {
name: "very-long-skill-name-that-should-not-affect-prompt-serialization"
.to_string(),
path: PathBuf::from(
"/very/long/skill/path/that/should/not/affect/prompt/serialization/SKILL.md",
),
},
UserInput::Mention {
name: "very-long-mention-name-that-should-not-affect-prompt-serialization"
.to_string(),
path: "app://very-long-connector-path-that-should-not-affect-prompt-serialization"
.to_string(),
},
];
let short_response_input_item = ResponseInputItem::from(short);
let long_response_input_item = ResponseInputItem::from(long);
let short_response_item: ResponseItem = short_response_input_item.into();
let long_response_item: ResponseItem = long_response_input_item.into();
let short_tokens = estimate_item_token_count(&short_response_item);
let long_tokens = estimate_item_token_count(&long_response_item);
assert_eq!(short_tokens, long_tokens);
}
fn make_connector(id: &str, name: &str) -> AppInfo {
AppInfo {
id: id.to_string(),
@@ -6969,8 +7317,8 @@ mod tests {
.clone()
.for_prompt(&reconstruction_turn.model_info.input_modalities);
let user_messages1 = collect_user_messages(&snapshot1);
let rebuilt1 =
compact::build_compacted_history(initial_context.clone(), &user_messages1, summary1);
let mut rebuilt1 = initial_context.clone();
rebuilt1.extend(compact::build_compacted_history(&user_messages1, summary1));
live_history.replace(rebuilt1);
rollout_items.push(RolloutItem::Compacted(CompactedItem {
message: summary1.to_string(),
@@ -7012,8 +7360,8 @@ mod tests {
.clone()
.for_prompt(&reconstruction_turn.model_info.input_modalities);
let user_messages2 = collect_user_messages(&snapshot2);
let rebuilt2 =
compact::build_compacted_history(initial_context.clone(), &user_messages2, summary2);
let mut rebuilt2 = initial_context.clone();
rebuilt2.extend(compact::build_compacted_history(&user_messages2, summary2));
live_history.replace(rebuilt2);
rollout_items.push(RolloutItem::Compacted(CompactedItem {
message: summary2.to_string(),

View File

@@ -7,6 +7,7 @@ use crate::client_common::ResponseEvent;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::codex::get_last_assistant_message_from_turn;
use crate::context_manager::is_user_turn_boundary;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::protocol::CompactedItem;
@@ -32,6 +33,32 @@ pub const SUMMARIZATION_PROMPT: &str = include_str!("../templates/compact/prompt
pub const SUMMARY_PREFIX: &str = include_str!("../templates/compact/summary_prefix.md");
const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum AutoCompactCallsite {
/// Pre-turn auto-compaction where the incoming turn context + user message are included in
/// the compaction request.
PreTurnIncludingIncomingUserMessage,
/// Reserved pre-turn auto-compaction strategy that compacts from the end of the previous turn
/// only, excluding incoming turn context + user message. This is currently unused by the
/// default pre-turn flow and retained for future model-specific strategies.
#[allow(dead_code)]
PreTurnExcludingIncomingUserMessage,
/// Mid-turn compaction between assistant responses in a follow-up loop.
MidTurnContinuation,
}
/// Controls whether compacted-history processing should reinsert canonical turn context.
///
/// When callers exclude incoming user/context from the compaction request, they should typically
/// set reinjection to `Skip` and append canonical context together with the next user message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum TurnContextReinjection {
/// Insert canonical context immediately above the last real user message in compacted history.
ReinjectAboveLastRealUser,
/// Do not reinsert canonical context while processing compacted history.
Skip,
}
pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bool {
provider.is_openai()
}
@@ -39,6 +66,9 @@ pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bo
pub(crate) async fn run_inline_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
auto_compact_callsite: AutoCompactCallsite,
turn_context_reinjection: TurnContextReinjection,
incoming_items: Option<Vec<ResponseItem>>,
) -> CodexResult<()> {
let prompt = turn_context.compact_prompt().to_string();
let input = vec![UserInput::Text {
@@ -47,7 +77,15 @@ pub(crate) async fn run_inline_auto_compact_task(
text_elements: Vec::new(),
}];
run_compact_task_inner(sess, turn_context, input).await?;
run_compact_task_inner(
sess,
turn_context,
input,
Some(auto_compact_callsite),
turn_context_reinjection,
incoming_items,
)
.await?;
Ok(())
}
@@ -62,13 +100,24 @@ pub(crate) async fn run_compact_task(
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, start_event).await;
run_compact_task_inner(sess.clone(), turn_context, input).await
run_compact_task_inner(
sess,
turn_context,
input,
None,
TurnContextReinjection::Skip,
None,
)
.await
}
async fn run_compact_task_inner(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
auto_compact_callsite: Option<AutoCompactCallsite>,
turn_context_reinjection: TurnContextReinjection,
incoming_items: Option<Vec<ResponseItem>>,
) -> CodexResult<()> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(&turn_context, &compaction_item)
@@ -76,6 +125,15 @@ async fn run_compact_task_inner(
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let mut history = sess.clone_history().await;
if let Some(incoming_items) = incoming_items.as_ref() {
history.record_items(incoming_items.iter(), turn_context.truncation_policy);
}
if !history.raw_items().iter().any(is_user_turn_boundary) {
// Nothing to compact: do not rewrite history when there is no user-turn boundary.
sess.emit_turn_item_completed(&turn_context, compaction_item)
.await;
return Ok(());
}
history.record_items(
&[initial_input_for_turn.into()],
turn_context.truncation_policy,
@@ -151,8 +209,11 @@ async fn run_compact_task_inner(
}
Err(e @ CodexErr::ContextWindowExceeded) => {
if turn_input_len > 1 {
// Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact.
// Trim from the beginning to preserve cache (prefix-based) and keep recent
// messages intact.
error!(
turn_id = %turn_context.sub_id,
auto_compact_callsite = ?auto_compact_callsite,
"Context window exceeded while compacting; removing oldest history item. Error: {e}"
);
history.remove_first_item();
@@ -161,8 +222,12 @@ async fn run_compact_task_inner(
continue;
}
sess.set_total_tokens_full(turn_context.as_ref()).await;
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
error!(
turn_id = %turn_context.sub_id,
auto_compact_callsite = ?auto_compact_callsite,
compact_error = %e,
"compaction failed after history truncation could not proceed"
);
return Err(e);
}
Err(e) => {
@@ -177,11 +242,16 @@ async fn run_compact_task_inner(
.await;
tokio::time::sleep(delay).await;
continue;
} else {
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
return Err(e);
}
error!(
turn_id = %turn_context.sub_id,
auto_compact_callsite = ?auto_compact_callsite,
retries,
max_retries,
compact_error = %e,
"compaction failed after retry exhaustion"
);
return Err(e);
}
}
}
@@ -191,9 +261,32 @@ async fn run_compact_task_inner(
let summary_suffix = get_last_assistant_message_from_turn(history_items).unwrap_or_default();
let summary_text = format!("{SUMMARY_PREFIX}\n{summary_suffix}");
let user_messages = collect_user_messages(history_items);
let incoming_user_items = match incoming_items.as_ref() {
Some(items) => items
.iter()
.filter(|item| real_user_message_text(item).is_some())
.cloned()
.collect(),
None => Vec::new(),
};
let initial_context = sess.build_initial_context(turn_context.as_ref()).await;
let mut new_history = build_compacted_history(initial_context, &user_messages, &summary_text);
let initial_context = match turn_context_reinjection {
TurnContextReinjection::ReinjectAboveLastRealUser => {
sess.build_initial_context(turn_context.as_ref()).await
}
TurnContextReinjection::Skip => Vec::new(),
};
let compacted_history = build_compacted_history_with_limit(
&user_messages,
&incoming_user_items,
&summary_text,
COMPACT_USER_MESSAGE_MAX_TOKENS,
);
let mut new_history = process_compacted_history(
compacted_history,
&initial_context,
turn_context_reinjection,
);
let ghost_snapshots: Vec<ResponseItem> = history_items
.iter()
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
@@ -205,7 +298,7 @@ async fn run_compact_task_inner(
let rollout_item = RolloutItem::Compacted(CompactedItem {
message: summary_text.clone(),
replacement_history: None,
replacement_history: Some(sess.clone_history().await.raw_items().to_vec()),
});
sess.persist_rollout_items(&[rollout_item]).await;
@@ -260,28 +353,39 @@ pub(crate) fn is_summary_message(message: &str) -> bool {
pub(crate) fn process_compacted_history(
mut compacted_history: Vec<ResponseItem>,
initial_context: &[ResponseItem],
turn_context_reinjection: TurnContextReinjection,
) -> Vec<ResponseItem> {
// Keep only model-visible transcript items that we allow from remote compaction output.
compacted_history.retain(should_keep_compacted_history_item);
let initial_context = initial_context.to_vec();
// Re-inject canonical context from the current session since we stripped it
// from the pre-compaction history. Keep it right before the last user
// message so older user messages remain earlier in the transcript.
if let Some(last_user_index) = compacted_history.iter().rposition(|item| {
matches!(
crate::event_mapping::parse_turn_item(item),
Some(TurnItem::UserMessage(_))
)
}) {
compacted_history.splice(last_user_index..last_user_index, initial_context);
} else {
compacted_history.extend(initial_context);
match turn_context_reinjection {
TurnContextReinjection::ReinjectAboveLastRealUser => {
// Insert immediately above the last real user message so turn context applies to that
// user input rather than an earlier turn.
if let Some(insertion_index) = compacted_history
.iter()
.rposition(|item| real_user_message_text(item).is_some())
{
compacted_history
.splice(insertion_index..insertion_index, initial_context.to_vec());
}
}
TurnContextReinjection::Skip => {}
}
compacted_history
}
fn real_user_message_text(item: &ResponseItem) -> Option<String> {
match crate::event_mapping::parse_turn_item(item) {
Some(TurnItem::UserMessage(user_message)) => {
let message = user_message.message();
(!is_summary_message(&message)).then_some(message)
}
_ => None,
}
}
/// Returns whether an item from remote compaction output should be preserved.
///
/// Called while processing the model-provided compacted transcript, before we
@@ -307,24 +411,24 @@ fn should_keep_compacted_history_item(item: &ResponseItem) -> bool {
}
pub(crate) fn build_compacted_history(
initial_context: Vec<ResponseItem>,
user_messages: &[String],
summary_text: &str,
) -> Vec<ResponseItem> {
build_compacted_history_with_limit(
initial_context,
user_messages,
&[],
summary_text,
COMPACT_USER_MESSAGE_MAX_TOKENS,
)
}
fn build_compacted_history_with_limit(
mut history: Vec<ResponseItem>,
user_messages: &[String],
incoming_user_items: &[ResponseItem],
summary_text: &str,
max_tokens: usize,
) -> Vec<ResponseItem> {
let mut history = Vec::new();
let mut selected_messages: Vec<String> = Vec::new();
if max_tokens > 0 {
let mut remaining = max_tokens;
@@ -357,6 +461,8 @@ fn build_compacted_history_with_limit(
});
}
history.extend(incoming_user_items.iter().cloned());
let summary_text = if summary_text.is_empty() {
"(no summary available)".to_string()
} else {
@@ -535,8 +641,8 @@ do things
let max_tokens = 16;
let big = "word ".repeat(200);
let history = super::build_compacted_history_with_limit(
Vec::new(),
std::slice::from_ref(&big),
&[],
"SUMMARY",
max_tokens,
);
@@ -572,11 +678,10 @@ do things
#[test]
fn build_token_limited_compacted_history_appends_summary_message() {
let initial_context: Vec<ResponseItem> = Vec::new();
let user_messages = vec!["first user message".to_string()];
let summary_text = "summary text";
let history = build_compacted_history(initial_context, &user_messages, summary_text);
let history = build_compacted_history(&user_messages, summary_text);
assert!(
!history.is_empty(),
"expected compacted history to include summary"
@@ -592,6 +697,55 @@ do things
assert_eq!(summary, summary_text);
}
#[test]
fn build_compacted_history_preserves_incoming_user_item_structure() {
let preserved_user_item = ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![
ContentItem::InputImage {
image_url: "data:image/png;base64,AAAA".to_string(),
},
ContentItem::InputText {
text: "latest user with image".to_string(),
},
],
end_turn: None,
phase: None,
};
let history = super::build_compacted_history_with_limit(
&["older user".to_string()],
std::slice::from_ref(&preserved_user_item),
"SUMMARY",
128,
);
let expected = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "older user".to_string(),
}],
end_turn: None,
phase: None,
},
preserved_user_item,
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "SUMMARY".to_string(),
}],
end_turn: None,
phase: None,
},
];
assert_eq!(history, expected);
}
#[test]
fn process_compacted_history_replaces_developer_messages() {
let compacted_history = vec![
@@ -657,7 +811,11 @@ do things
},
];
let refreshed = process_compacted_history(compacted_history, &initial_context);
let refreshed = process_compacted_history(
compacted_history,
&initial_context,
TurnContextReinjection::ReinjectAboveLastRealUser,
);
let expected = vec![
ResponseItem::Message {
id: None,
@@ -766,7 +924,11 @@ keep me updated
},
];
let refreshed = process_compacted_history(compacted_history, &initial_context);
let refreshed = process_compacted_history(
compacted_history,
&initial_context,
TurnContextReinjection::ReinjectAboveLastRealUser,
);
let expected = vec![
ResponseItem::Message {
id: None,
@@ -902,7 +1064,11 @@ keep me updated
phase: None,
}];
let refreshed = process_compacted_history(compacted_history, &initial_context);
let refreshed = process_compacted_history(
compacted_history,
&initial_context,
TurnContextReinjection::ReinjectAboveLastRealUser,
);
let expected = vec![
ResponseItem::Message {
id: None,
@@ -967,7 +1133,11 @@ keep me updated
phase: None,
}];
let refreshed = process_compacted_history(compacted_history, &initial_context);
let refreshed = process_compacted_history(
compacted_history,
&initial_context,
TurnContextReinjection::ReinjectAboveLastRealUser,
);
let expected = vec![
ResponseItem::Message {
id: None,
@@ -1008,4 +1178,320 @@ keep me updated
];
assert_eq!(refreshed, expected);
}
#[test]
fn process_compacted_history_pre_turn_places_summary_last() {
let compacted_history = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "older user".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nsummary text"),
}],
end_turn: None,
phase: None,
},
];
let initial_context = vec![ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "fresh permissions".to_string(),
}],
end_turn: None,
phase: None,
}];
let refreshed = process_compacted_history(
compacted_history,
&initial_context,
TurnContextReinjection::ReinjectAboveLastRealUser,
);
let expected = vec![
ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "fresh permissions".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "older user".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nsummary text"),
}],
end_turn: None,
phase: None,
},
];
assert_eq!(refreshed, expected);
}
#[test]
fn process_compacted_history_preserves_summary_order() {
let compacted_history = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "older user".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nolder summary"),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "newer user".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nlatest summary"),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: "assistant after latest summary".to_string(),
}],
end_turn: None,
phase: None,
},
];
let refreshed =
process_compacted_history(compacted_history, &[], TurnContextReinjection::Skip);
let expected = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "older user".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nolder summary"),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "newer user".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nlatest summary"),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: "assistant after latest summary".to_string(),
}],
end_turn: None,
phase: None,
},
];
assert_eq!(refreshed, expected);
}
#[test]
fn process_compacted_history_skips_context_insertion_without_real_user_message() {
let compacted_history = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nsummary text"),
}],
end_turn: None,
phase: None,
}];
let initial_context = vec![ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "fresh permissions".to_string(),
}],
end_turn: None,
phase: None,
}];
let refreshed = process_compacted_history(
compacted_history,
&initial_context,
TurnContextReinjection::Skip,
);
let expected = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nsummary text"),
}],
end_turn: None,
phase: None,
}];
assert_eq!(refreshed, expected);
}
#[test]
fn process_compacted_history_reinject_noops_without_real_user_message() {
let compacted_history = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nsummary text"),
}],
end_turn: None,
phase: None,
}];
let initial_context = vec![ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "fresh permissions".to_string(),
}],
end_turn: None,
phase: None,
}];
let refreshed = process_compacted_history(
compacted_history,
&initial_context,
TurnContextReinjection::ReinjectAboveLastRealUser,
);
let expected = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nsummary text"),
}],
end_turn: None,
phase: None,
}];
assert_eq!(refreshed, expected);
}
#[test]
fn process_compacted_history_mid_turn_without_orphan_user_places_summary_last() {
let compacted_history = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "older user".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nsummary text"),
}],
end_turn: None,
phase: None,
},
];
let initial_context = vec![ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "fresh permissions".to_string(),
}],
end_turn: None,
phase: None,
}];
let refreshed = process_compacted_history(
compacted_history,
&initial_context,
TurnContextReinjection::ReinjectAboveLastRealUser,
);
let expected = vec![
ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "fresh permissions".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "older user".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nsummary text"),
}],
end_turn: None,
phase: None,
},
];
assert_eq!(refreshed, expected);
}
}

View File

@@ -3,10 +3,14 @@ use std::sync::Arc;
use crate::Prompt;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::compact::AutoCompactCallsite;
use crate::compact::TurnContextReinjection;
use crate::context_manager::ContextManager;
use crate::context_manager::TotalTokenUsageBreakdown;
use crate::context_manager::estimate_item_token_count;
use crate::context_manager::estimate_response_item_model_visible_bytes;
use crate::context_manager::is_codex_generated_item;
use crate::context_manager::is_user_turn_boundary;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::protocol::CompactedItem;
@@ -24,8 +28,19 @@ use tracing::info;
pub(crate) async fn run_inline_remote_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
auto_compact_callsite: AutoCompactCallsite,
// Controls whether canonical turn context should be reinserted into compacted history.
turn_context_reinjection: TurnContextReinjection,
incoming_items: Option<Vec<ResponseItem>>,
) -> CodexResult<()> {
run_remote_compact_task_inner(&sess, &turn_context).await?;
run_remote_compact_task_inner(
&sess,
&turn_context,
auto_compact_callsite,
turn_context_reinjection,
incoming_items,
)
.await?;
Ok(())
}
@@ -40,18 +55,40 @@ pub(crate) async fn run_remote_compact_task(
});
sess.send_event(&turn_context, start_event).await;
run_remote_compact_task_inner(&sess, &turn_context).await
run_remote_compact_task_inner(
&sess,
&turn_context,
AutoCompactCallsite::PreTurnExcludingIncomingUserMessage,
// Manual `/compact` should not reinsert turn context into compacted history; we reseed
// canonical initial context before the next user turn.
TurnContextReinjection::Skip,
None,
)
.await
}
async fn run_remote_compact_task_inner(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
auto_compact_callsite: AutoCompactCallsite,
turn_context_reinjection: TurnContextReinjection,
incoming_items: Option<Vec<ResponseItem>>,
) -> CodexResult<()> {
if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await {
let event = EventMsg::Error(
err.to_error_event(Some("Error running remote compact task".to_string())),
if let Err(err) = run_remote_compact_task_inner_impl(
sess,
turn_context,
auto_compact_callsite,
turn_context_reinjection,
incoming_items,
)
.await
{
error!(
turn_id = %turn_context.sub_id,
auto_compact_callsite = ?auto_compact_callsite,
compact_error = %err,
"remote compaction task failed"
);
sess.send_event(turn_context, event).await;
return Err(err);
}
Ok(())
@@ -60,6 +97,9 @@ async fn run_remote_compact_task_inner(
async fn run_remote_compact_task_inner_impl(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
auto_compact_callsite: AutoCompactCallsite,
turn_context_reinjection: TurnContextReinjection,
incoming_items: Option<Vec<ResponseItem>>,
) -> CodexResult<()> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(turn_context, &compaction_item)
@@ -70,10 +110,21 @@ async fn run_remote_compact_task_inner_impl(
&mut history,
turn_context.as_ref(),
&base_instructions,
incoming_items.as_deref(),
);
if let Some(incoming_items) = incoming_items {
history.record_items(incoming_items.iter(), turn_context.truncation_policy);
}
if !history.raw_items().iter().any(is_user_turn_boundary) {
// Nothing to compact: do not rewrite history when there is no user-turn boundary.
sess.emit_turn_item_completed(turn_context, compaction_item)
.await;
return Ok(());
}
if deleted_items > 0 {
info!(
turn_id = %turn_context.sub_id,
auto_compact_callsite = ?auto_compact_callsite,
deleted_items,
"trimmed history items before remote compaction"
);
@@ -110,6 +161,7 @@ async fn run_remote_compact_task_inner_impl(
build_compact_request_log_data(&prompt.input, &prompt.base_instructions.text);
log_remote_compact_failure(
turn_context,
auto_compact_callsite,
&compact_request_log_data,
total_usage_breakdown,
&err,
@@ -118,7 +170,7 @@ async fn run_remote_compact_task_inner_impl(
})
.await?;
new_history = sess
.process_compacted_history(turn_context, new_history)
.process_compacted_history(turn_context, new_history, turn_context_reinjection)
.await;
if !ghost_snapshots.is_empty() {
@@ -163,12 +215,14 @@ fn build_compact_request_log_data(
fn log_remote_compact_failure(
turn_context: &TurnContext,
auto_compact_callsite: AutoCompactCallsite,
log_data: &CompactRequestLogData,
total_usage_breakdown: TotalTokenUsageBreakdown,
err: &CodexErr,
) {
error!(
turn_id = %turn_context.sub_id,
auto_compact_callsite = ?auto_compact_callsite,
last_api_response_total_tokens = total_usage_breakdown.last_api_response_total_tokens,
all_history_items_model_visible_bytes = total_usage_breakdown.all_history_items_model_visible_bytes,
estimated_tokens_of_items_added_since_last_successful_api_response = total_usage_breakdown.estimated_tokens_of_items_added_since_last_successful_api_response,
@@ -184,15 +238,37 @@ fn trim_function_call_history_to_fit_context_window(
history: &mut ContextManager,
turn_context: &TurnContext,
base_instructions: &BaseInstructions,
incoming_items: Option<&[ResponseItem]>,
) -> usize {
let Some(context_window) = turn_context.model_context_window() else {
return 0;
};
let incoming_items_tokens = incoming_items
.unwrap_or_default()
.iter()
.map(estimate_item_token_count)
.fold(0_i64, i64::saturating_add);
trim_codex_generated_tail_items_to_fit_context_window(
history,
context_window,
base_instructions,
incoming_items_tokens,
)
}
fn trim_codex_generated_tail_items_to_fit_context_window(
history: &mut ContextManager,
context_window: i64,
base_instructions: &BaseInstructions,
incoming_items_tokens: i64,
) -> usize {
let mut deleted_items = 0usize;
let Some(context_window) = turn_context.model_context_window() else {
return deleted_items;
};
while history
.estimate_token_count_with_base_instructions(base_instructions)
.is_some_and(|estimated_tokens| estimated_tokens > context_window)
.is_some_and(|estimated_tokens| {
estimated_tokens.saturating_add(incoming_items_tokens) > context_window
})
{
let Some(last_item) = history.raw_items().last() else {
break;
@@ -208,3 +284,90 @@ fn trim_function_call_history_to_fit_context_window(
deleted_items
}
#[cfg(test)]
mod tests {
use super::*;
use crate::truncate::TruncationPolicy;
use codex_protocol::models::ContentItem;
use pretty_assertions::assert_eq;
fn user_message(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
fn developer_message(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
#[test]
fn trim_accounts_for_incoming_items_tokens() {
let base_instructions = BaseInstructions {
text: String::new(),
};
let incoming_items = vec![user_message(
"INCOMING_USER_MESSAGE_THAT_TIPS_OVER_THE_WINDOW",
)];
let incoming_items_tokens = incoming_items
.iter()
.map(estimate_item_token_count)
.fold(0_i64, i64::saturating_add);
assert!(
incoming_items_tokens > 0,
"expected incoming item token estimate to be positive"
);
let mut history = ContextManager::new();
let history_items = vec![
user_message("USER_ONE"),
developer_message("TRAILING_CODEX_GENERATED_CONTEXT"),
];
history.record_items(history_items.iter(), TruncationPolicy::Tokens(10_000));
let history_tokens = history
.estimate_token_count_with_base_instructions(&base_instructions)
.unwrap_or_default();
let context_window = history_tokens
.saturating_add(incoming_items_tokens)
.saturating_sub(1);
let mut without_incoming_projection = history.clone();
let deleted_without_incoming = trim_codex_generated_tail_items_to_fit_context_window(
&mut without_incoming_projection,
context_window,
&base_instructions,
0,
);
assert_eq!(
deleted_without_incoming, 0,
"history-only projection should not trim when currently under the limit"
);
let deleted_with_incoming = trim_codex_generated_tail_items_to_fit_context_window(
&mut history,
context_window,
&base_instructions,
incoming_items_tokens,
);
assert_eq!(
deleted_with_incoming, 1,
"incoming projection should trim trailing codex-generated history to fit pre-turn request"
);
assert_eq!(history.raw_items(), vec![user_message("USER_ONE")]);
}
}

View File

@@ -395,7 +395,7 @@ fn estimate_reasoning_length(encoded_len: usize) -> usize {
.saturating_sub(650)
}
fn estimate_item_token_count(item: &ResponseItem) -> i64 {
pub(crate) fn estimate_item_token_count(item: &ResponseItem) -> i64 {
let model_visible_bytes = estimate_response_item_model_visible_bytes(item);
approx_tokens_from_byte_count_i64(model_visible_bytes)
}

View File

@@ -3,6 +3,7 @@ mod normalize;
pub(crate) use history::ContextManager;
pub(crate) use history::TotalTokenUsageBreakdown;
pub(crate) use history::estimate_item_token_count;
pub(crate) use history::estimate_response_item_model_visible_bytes;
pub(crate) use history::is_codex_generated_item;
pub(crate) use history::is_user_turn_boundary;

View File

@@ -3,6 +3,8 @@ use std::sync::Arc;
use super::SessionTask;
use super::SessionTaskContext;
use crate::codex::TurnContext;
use crate::context_manager::is_user_turn_boundary;
use crate::protocol::EventMsg;
use crate::state::TaskKind;
use async_trait::async_trait;
use codex_protocol::user_input::UserInput;
@@ -25,20 +27,48 @@ impl SessionTask for CompactTask {
_cancellation_token: CancellationToken,
) -> Option<String> {
let session = session.clone_session();
let has_user_turn_boundary = session
.clone_history()
.await
.raw_items()
.iter()
.any(is_user_turn_boundary);
if crate::compact::should_use_remote_compact_task(&ctx.provider) {
let _ = session.services.otel_manager.counter(
"codex.task.compact",
1,
&[("type", "remote")],
);
let _ = crate::compact_remote::run_remote_compact_task(session, ctx).await;
if let Err(err) =
crate::compact_remote::run_remote_compact_task(session.clone(), ctx.clone()).await
{
let event = EventMsg::Error(
err.to_error_event(Some("Error running remote compact task".to_string())),
);
session.send_event(&ctx, event).await;
} else if has_user_turn_boundary {
// Manual `/compact` rewrites history to compacted transcript items and drops
// per-turn context entries. Force initial-context reseeding on the next user turn.
session.mark_initial_context_unseeded_for_next_turn().await;
}
} else {
let _ = session.services.otel_manager.counter(
"codex.task.compact",
1,
&[("type", "local")],
);
let _ = crate::compact::run_compact_task(session, ctx, input).await;
if let Err(err) =
crate::compact::run_compact_task(session.clone(), ctx.clone(), input).await
{
let event = EventMsg::Error(
err.to_error_event(Some("Error running local compact task".to_string())),
);
session.send_event(&ctx, event).await;
} else if has_user_turn_boundary {
// Manual `/compact` rewrites history to compacted transcript items and drops
// per-turn context entries. Force initial-context reseeding on the next user turn.
session.mark_initial_context_unseeded_for_next_turn().await;
}
}
None

View File

@@ -8,6 +8,7 @@ use crate::codex::run_turn;
use crate::state::TaskKind;
use async_trait::async_trait;
use codex_otel::OtelManager;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::user_input::UserInput;
use futures::future::BoxFuture;
@@ -24,12 +25,14 @@ type PrewarmedSessionTask = JoinHandle<Option<ModelClientSession>>;
pub(crate) struct RegularTask {
prewarmed_session_task: Mutex<Option<PrewarmedSessionTask>>,
pre_turn_context_items: Vec<ResponseItem>,
}
impl Default for RegularTask {
fn default() -> Self {
Self {
prewarmed_session_task: Mutex::new(None),
pre_turn_context_items: Vec::new(),
}
}
}
@@ -58,9 +61,15 @@ impl RegularTask {
Self {
prewarmed_session_task: Mutex::new(Some(prewarmed_session_task)),
pre_turn_context_items: Vec::new(),
}
}
pub(crate) fn with_pre_turn_context_items(mut self, items: Vec<ResponseItem>) -> Self {
self.pre_turn_context_items = items;
self
}
async fn take_prewarmed_session(&self) -> Option<ModelClientSession> {
let prewarmed_session_task = self
.prewarmed_session_task
@@ -104,6 +113,7 @@ impl SessionTask for RegularTask {
sess,
ctx,
input,
self.pre_turn_context_items.clone(),
prewarmed_client_session,
cancellation_token,
)

View File

@@ -113,6 +113,94 @@ async fn model_change_appends_model_instructions_developer_message() -> Result<(
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn settings_only_empty_turn_persists_updates_for_next_non_empty_turn() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let resp_mock = mount_sse_sequence(
&server,
vec![sse_completed("resp-1"), sse_completed("resp-2")],
)
.await;
let mut builder = test_codex().with_model("gpt-5.2-codex");
let test = builder.build(&server).await?;
let model = test.session_configured.model.clone();
test.codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "first".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: test.cwd_path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::ReadOnly,
model: model.clone(),
effort: test.config.model_reasoning_effort,
summary: ReasoningSummary::Auto,
collaboration_mode: None,
personality: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
// Settings-only turn with no user message.
test.codex
.submit(Op::UserTurn {
items: Vec::new(),
final_output_json_schema: None,
cwd: test.cwd_path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: model.clone(),
effort: test.config.model_reasoning_effort,
summary: ReasoningSummary::Auto,
collaboration_mode: None,
personality: None,
})
.await?;
test.codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "after settings-only turn".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: test.cwd_path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model,
effort: test.config.model_reasoning_effort,
summary: ReasoningSummary::Auto,
collaboration_mode: None,
personality: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let requests = resp_mock.requests();
assert_eq!(
requests.len(),
2,
"expected only first and third turns to hit the model"
);
let third_turn_request = requests.last().expect("expected third turn request");
let developer_texts = third_turn_request.message_input_texts("developer");
assert!(
developer_texts
.iter()
.any(|text| text.contains("sandbox_mode` is `danger-full-access`")),
"expected danger-full-access permissions update in next non-empty turn: {developer_texts:?}"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn model_and_personality_change_only_appends_model_instructions() -> Result<()> {
skip_if_no_network!(Ok(()));