From b774ea09ad2655717c9d00459b7729def65e1d59 Mon Sep 17 00:00:00 2001 From: Charles Cunningham Date: Wed, 11 Feb 2026 12:05:26 -0800 Subject: [PATCH] compact: split core logic changes from snapshot test coverage --- codex-rs/core/src/codex.rs | 422 ++++++++++++-- codex-rs/core/src/compact.rs | 558 +++++++++++++++++-- codex-rs/core/src/compact_remote.rs | 185 +++++- codex-rs/core/src/context_manager/history.rs | 2 +- codex-rs/core/src/context_manager/mod.rs | 1 + codex-rs/core/src/tasks/compact.rs | 34 +- codex-rs/core/src/tasks/regular.rs | 10 + codex-rs/core/tests/suite/model_switching.rs | 88 +++ 8 files changed, 1213 insertions(+), 87 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 2b9b04e2ed..38849c6e5a 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -17,6 +17,8 @@ use crate::analytics_client::AnalyticsEventsClient; use crate::analytics_client::build_track_events_context; use crate::apps::render_apps_section; use crate::compact; +use crate::compact::AutoCompactCallsite; +use crate::compact::TurnContextReinjection; use crate::compact::run_inline_auto_compact_task; use crate::compact::should_use_remote_compact_task; use crate::compact_remote::run_inline_remote_auto_compact_task; @@ -123,6 +125,7 @@ use crate::config::types::McpServerConfig; use crate::config::types::ShellEnvironmentPolicy; use crate::context_manager::ContextManager; use crate::context_manager::TotalTokenUsageBreakdown; +use crate::context_manager::estimate_item_token_count; use crate::environment_context::EnvironmentContext; use crate::error::CodexErr; use crate::error::Result as CodexResult; @@ -2166,11 +2169,11 @@ impl Session { history.replace(replacement.clone()); } else { let user_messages = collect_user_messages(history.raw_items()); - let rebuilt = compact::build_compacted_history( - self.build_initial_context(turn_context).await, + let mut rebuilt = self.build_initial_context(turn_context).await; + 
rebuilt.extend(compact::build_compacted_history( &user_messages, &compacted.message, - ); + )); history.replace(rebuilt); } } @@ -2187,9 +2190,14 @@ impl Session { &self, turn_context: &TurnContext, compacted_history: Vec, + turn_context_reinjection: TurnContextReinjection, ) -> Vec { let initial_context = self.build_initial_context(turn_context).await; - compact::process_compacted_history(compacted_history, &initial_context) + compact::process_compacted_history( + compacted_history, + &initial_context, + turn_context_reinjection, + ) } /// Append ResponseItems to the in-memory conversation history only. @@ -2239,6 +2247,11 @@ impl Session { self.flush_rollout().await; } + pub(crate) async fn mark_initial_context_unseeded_for_next_turn(&self) { + let mut state = self.state.lock().await; + state.initial_context_seeded = false; + } + async fn persist_rollout_response_items(&self, items: &[ResponseItem]) { let rollout_items: Vec = items .iter() @@ -2295,14 +2308,6 @@ impl Session { DeveloperInstructions::new(SEARCH_TOOL_DEVELOPER_INSTRUCTIONS.to_string()).into(), ); } - // Add developer instructions for memories. 
- if let Some(memory_prompt) = - memories::build_memory_tool_developer_instructions(&turn_context.config.codex_home) - .await - && turn_context.features.enabled(Feature::MemoryTool) - { - items.push(DeveloperInstructions::new(memory_prompt).into()); - } // Add developer instructions from collaboration_mode if they exist and are non-empty let (collaboration_mode, base_instructions) = { let state = self.state.lock().await; @@ -3122,19 +3127,29 @@ mod handlers { if let Err(SteerInputError::NoActiveTurn(items)) = sess.steer_input(items, None).await { sess.seed_initial_context_if_needed(&current_context).await; let resumed_model = sess.take_pending_resume_previous_model().await; - let update_items = sess.build_settings_update_items( + let pre_turn_context_items = sess.build_settings_update_items( previous_context.as_ref(), resumed_model.as_deref(), &current_context, ); - if !update_items.is_empty() { - sess.record_conversation_items(&current_context, &update_items) + let has_user_input = !items.is_empty(); + if !has_user_input && !pre_turn_context_items.is_empty() { + // Empty-input UserTurn still needs these model-visible updates persisted now. + // Otherwise `previous_context` advances and the next non-empty turn computes no diff. 
+ sess.record_conversation_items(&current_context, &pre_turn_context_items) .await; } sess.refresh_mcp_servers_if_requested(&current_context) .await; - let regular_task = sess.take_startup_regular_task().await.unwrap_or_default(); + let regular_task = if has_user_input { + sess.take_startup_regular_task() + .await + .unwrap_or_default() + .with_pre_turn_context_items(pre_turn_context_items) + } else { + sess.take_startup_regular_task().await.unwrap_or_default() + }; sess.spawn_task(Arc::clone(&current_context), items, regular_task) .await; *previous_context = Some(current_context); @@ -3861,6 +3876,7 @@ pub(crate) async fn run_turn( sess: Arc, turn_context: Arc, input: Vec, + pre_turn_context_items: Vec, prewarmed_client_session: Option, cancellation_token: CancellationToken, ) -> Option { @@ -3870,7 +3886,9 @@ pub(crate) async fn run_turn( let model_info = turn_context.model_info.clone(); let auto_compact_limit = model_info.auto_compact_token_limit().unwrap_or(i64::MAX); - let total_usage_tokens = sess.get_total_token_usage().await; + let response_item: ResponseItem = ResponseInputItem::from(input.clone()).into(); + let mut incoming_turn_items = pre_turn_context_items.clone(); + incoming_turn_items.push(response_item.clone()); let event = EventMsg::TurnStarted(TurnStartedEvent { turn_id: turn_context.sub_id.clone(), @@ -3878,11 +3896,34 @@ pub(crate) async fn run_turn( collaboration_mode_kind: turn_context.collaboration_mode.mode, }); sess.send_event(&turn_context, event).await; - if total_usage_tokens >= auto_compact_limit - && run_auto_compact(&sess, &turn_context).await.is_err() + let pre_turn_compaction_outcome = match run_pre_turn_auto_compaction_if_needed( + &sess, + &turn_context, + auto_compact_limit, + &incoming_turn_items, + ) + .await { - return None; - } + Ok(outcome) => outcome, + Err(CodexErr::ContextWindowExceeded) => { + let incoming_items_tokens_estimate = incoming_turn_items + .iter() + .map(estimate_item_token_count) + .fold(0_i64, i64::saturating_add); + let 
message = format!( + "Incoming user message and/or turn context is too large to fit in context window. Please reduce the size of your message and try again. (incoming_items_tokens_estimate={incoming_items_tokens_estimate})" + ); + let event = + EventMsg::Error(CodexErr::ContextWindowExceeded.to_error_event(Some(message))); + sess.send_event(&turn_context, event).await; + return None; + } + Err(err) => { + let event = EventMsg::Error(err.to_error_event(None)); + sess.send_event(&turn_context, event).await; + return None; + } + }; let skills_outcome = Some( sess.services @@ -3954,10 +3995,15 @@ pub(crate) async fn run_turn( .await; } - let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone()); - let response_item: ResponseItem = initial_input_for_turn.clone().into(); - sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item) - .await; + persist_pre_turn_items_for_compaction_outcome( + &sess, + &turn_context, + pre_turn_compaction_outcome, + &pre_turn_context_items, + &input, + response_item, + ) + .await; if !skill_items.is_empty() { sess.record_conversation_items(&turn_context, &skill_items) @@ -4059,7 +4105,19 @@ pub(crate) async fn run_turn( // as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop. 
if token_limit_reached && needs_follow_up { - if run_auto_compact(&sess, &turn_context).await.is_err() { + if let Err(err) = run_auto_compact( + &sess, + &turn_context, + AutoCompactCallsite::MidTurnContinuation, + TurnContextReinjection::ReinjectAboveLastRealUser, + None, + ) + .await + { + let event = EventMsg::Error( + err.to_error_event(Some("Error running auto compact task".to_string())), + ); + sess.send_event(&turn_context, event).await; return None; } continue; @@ -4119,13 +4177,158 @@ pub(crate) async fn run_turn( last_agent_message } -async fn run_auto_compact(sess: &Arc, turn_context: &Arc) -> CodexResult<()> { - if should_use_remote_compact_task(&turn_context.provider) { - run_inline_remote_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await?; - } else { - run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await?; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum PreTurnCompactionOutcome { + /// Pre-turn input fits without compaction. + NotNeeded, + /// Pre-turn compaction succeeded with incoming turn context + user message included. + CompactedWithIncomingItems, + /// Pre-turn compaction succeeded without incoming turn items + /// (incoming user message should be appended after the compaction summary). + /// This compaction strategy is currently out of distribution for our compaction model, + /// but is planned to be trained on in the future. + #[cfg(test)] + CompactedWithoutIncomingItems, +} + +async fn persist_pre_turn_items_for_compaction_outcome( + sess: &Arc, + turn_context: &Arc, + outcome: PreTurnCompactionOutcome, + pre_turn_context_items: &[ResponseItem], + input: &[UserInput], + response_item: ResponseItem, +) { + match outcome { + PreTurnCompactionOutcome::CompactedWithIncomingItems => { + // Incoming turn items were already part of pre-turn compaction input, and the + // user prompt is already persisted in history after compaction. 
Emit lifecycle events + // only so UI/consumers still observe a normal user turn item transition. + let turn_item = TurnItem::UserMessage(UserMessageItem::new(input)); + sess.emit_turn_item_started(turn_context.as_ref(), &turn_item) + .await; + sess.emit_turn_item_completed(turn_context.as_ref(), turn_item) + .await; + sess.ensure_rollout_materialized().await; + } + PreTurnCompactionOutcome::NotNeeded => { + if !pre_turn_context_items.is_empty() { + sess.record_conversation_items(turn_context, pre_turn_context_items) + .await; + } + sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), input, response_item) + .await; + } + #[cfg(test)] + PreTurnCompactionOutcome::CompactedWithoutIncomingItems => { + // Reserved path for future models that compact pre-turn history without incoming turn + // items; reseed canonical initial context above the incoming user message. + let initial_context = sess.build_initial_context(turn_context.as_ref()).await; + if !initial_context.is_empty() { + sess.record_conversation_items(turn_context, &initial_context) + .await; + } + sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), input, response_item) + .await; + } } - Ok(()) +} + +/// Runs pre-turn auto-compaction with incoming turn context + user message included. 
+async fn run_pre_turn_auto_compaction_if_needed( + sess: &Arc, + turn_context: &Arc, + auto_compact_limit: i64, + incoming_turn_items: &[ResponseItem], +) -> CodexResult { + let total_usage_tokens = sess.get_total_token_usage().await; + let incoming_items_tokens_estimate = incoming_turn_items + .iter() + .map(estimate_item_token_count) + .fold(0_i64, i64::saturating_add); + if !is_projected_submission_over_auto_compact_limit( + total_usage_tokens, + incoming_items_tokens_estimate, + auto_compact_limit, + ) { + return Ok(PreTurnCompactionOutcome::NotNeeded); + } + + let compact_result = run_auto_compact( + sess, + turn_context, + AutoCompactCallsite::PreTurnIncludingIncomingUserMessage, + TurnContextReinjection::ReinjectAboveLastRealUser, + Some(incoming_turn_items.to_vec()), + ) + .await; + + if let Err(err) = compact_result { + if matches!(err, CodexErr::ContextWindowExceeded) { + error!( + turn_id = %turn_context.sub_id, + auto_compact_callsite = ?AutoCompactCallsite::PreTurnIncludingIncomingUserMessage, + incoming_items_tokens_estimate, + auto_compact_limit, + reason = "pre-turn compaction exceeded context window", + "incoming user/context is too large for pre-turn auto-compaction flow" + ); + } + return Err(err); + } + + Ok(PreTurnCompactionOutcome::CompactedWithIncomingItems) +} + +fn is_projected_submission_over_auto_compact_limit( + total_usage_tokens: i64, + incoming_user_tokens_estimate: i64, + auto_compact_limit: i64, +) -> bool { + if auto_compact_limit == i64::MAX { + return false; + } + + total_usage_tokens.saturating_add(incoming_user_tokens_estimate) >= auto_compact_limit +} + +async fn run_auto_compact( + sess: &Arc, + turn_context: &Arc, + auto_compact_callsite: AutoCompactCallsite, + turn_context_reinjection: TurnContextReinjection, + incoming_items: Option>, +) -> CodexResult<()> { + let result = if should_use_remote_compact_task(&turn_context.provider) { + run_inline_remote_auto_compact_task( + Arc::clone(sess), + Arc::clone(turn_context), + 
auto_compact_callsite, + turn_context_reinjection, + incoming_items, + ) + .await + } else { + run_inline_auto_compact_task( + Arc::clone(sess), + Arc::clone(turn_context), + auto_compact_callsite, + turn_context_reinjection, + incoming_items, + ) + .await + }; + + if let Err(err) = &result { + error!( + turn_id = %turn_context.sub_id, + auto_compact_callsite = ?auto_compact_callsite, + compact_error = %err, + "auto compaction failed" + ); + } + + result } fn filter_connectors_for_input( @@ -4996,7 +5199,7 @@ async fn try_run_sampling_request( ResponseEvent::Completed { response_id: _, token_usage, - can_append: _, + .. } => { if let Some(state) = plan_mode_state.as_mut() { flush_proposed_plan_segments_all(&sess, &turn_context, state).await; @@ -5208,6 +5411,151 @@ mod tests { } } + #[test] + fn pre_turn_projection_uses_incoming_user_tokens_for_compaction() { + assert!(is_projected_submission_over_auto_compact_limit(90, 15, 100)); + assert!(!is_projected_submission_over_auto_compact_limit(90, 9, 100)); + } + + #[test] + fn pre_turn_projection_does_not_compact_with_unbounded_limit() { + assert!(!is_projected_submission_over_auto_compact_limit( + i64::MAX - 1, + 100, + i64::MAX, + )); + } + + #[test] + fn post_compaction_projection_triggers_error_when_still_over_limit() { + assert!(is_projected_submission_over_auto_compact_limit(95, 10, 100)); + assert!(is_projected_submission_over_auto_compact_limit( + 100, 10, 100 + )); + assert!(!is_projected_submission_over_auto_compact_limit( + 80, 10, 100 + )); + } + + #[tokio::test] + async fn reserved_compacted_without_incoming_items_records_initial_context_and_prompt() { + let (session, turn_context) = make_session_and_context().await; + let session = Arc::new(session); + let turn_context = Arc::new(turn_context); + let input = vec![UserInput::Text { + text: "hello".to_string(), + text_elements: Vec::new(), + }]; + let response_item: ResponseItem = ResponseInputItem::from(input.clone()).into(); + let 
stale_pre_turn_context_items = vec![ResponseItem::Message { + id: None, + role: "developer".to_string(), + content: vec![ContentItem::InputText { + text: "stale context diff".to_string(), + }], + end_turn: None, + phase: None, + }]; + + persist_pre_turn_items_for_compaction_outcome( + &session, + &turn_context, + PreTurnCompactionOutcome::CompactedWithoutIncomingItems, + &stale_pre_turn_context_items, + &input, + response_item.clone(), + ) + .await; + + let mut expected = session.build_initial_context(turn_context.as_ref()).await; + expected.push(response_item); + let actual = session.clone_history().await.raw_items().to_vec(); + assert_eq!(actual, expected); + } + + #[tokio::test] + async fn compacted_with_incoming_items_emits_lifecycle_without_history_writes() { + let (session, turn_context) = make_session_and_context().await; + let session = Arc::new(session); + let turn_context = Arc::new(turn_context); + let input = vec![UserInput::Text { + text: "hello".to_string(), + text_elements: Vec::new(), + }]; + let response_item: ResponseItem = ResponseInputItem::from(input.clone()).into(); + let stale_pre_turn_context_items = vec![ResponseItem::Message { + id: None, + role: "developer".to_string(), + content: vec![ContentItem::InputText { + text: "stale context diff".to_string(), + }], + end_turn: None, + phase: None, + }]; + + persist_pre_turn_items_for_compaction_outcome( + &session, + &turn_context, + PreTurnCompactionOutcome::CompactedWithIncomingItems, + &stale_pre_turn_context_items, + &input, + response_item, + ) + .await; + + let actual = session.clone_history().await.raw_items().to_vec(); + assert_eq!(actual, Vec::::new()); + } + + #[test] + fn estimate_user_input_token_count_is_positive_for_text_input() { + let input = vec![UserInput::Text { + text: "hello".to_string(), + text_elements: Vec::new(), + }]; + let response_input_item = ResponseInputItem::from(input); + let response_item: ResponseItem = response_input_item.into(); + let estimated_tokens = 
estimate_item_token_count(&response_item); + assert!(estimated_tokens > 0); + } + + #[test] + fn estimate_user_input_token_count_ignores_skill_and_mention_payload_lengths() { + let short = vec![ + UserInput::Skill { + name: "s".to_string(), + path: PathBuf::from("/s"), + }, + UserInput::Mention { + name: "m".to_string(), + path: "app://m".to_string(), + }, + ]; + let long = vec![ + UserInput::Skill { + name: "very-long-skill-name-that-should-not-affect-prompt-serialization" + .to_string(), + path: PathBuf::from( + "/very/long/skill/path/that/should/not/affect/prompt/serialization/SKILL.md", + ), + }, + UserInput::Mention { + name: "very-long-mention-name-that-should-not-affect-prompt-serialization" + .to_string(), + path: "app://very-long-connector-path-that-should-not-affect-prompt-serialization" + .to_string(), + }, + ]; + + let short_response_input_item = ResponseInputItem::from(short); + let long_response_input_item = ResponseInputItem::from(long); + let short_response_item: ResponseItem = short_response_input_item.into(); + let long_response_item: ResponseItem = long_response_input_item.into(); + let short_tokens = estimate_item_token_count(&short_response_item); + let long_tokens = estimate_item_token_count(&long_response_item); + assert_eq!(short_tokens, long_tokens); + } + fn make_connector(id: &str, name: &str) -> AppInfo { AppInfo { id: id.to_string(), @@ -6969,8 +7317,8 @@ mod tests { .clone() .for_prompt(&reconstruction_turn.model_info.input_modalities); let user_messages1 = collect_user_messages(&snapshot1); - let rebuilt1 = - compact::build_compacted_history(initial_context.clone(), &user_messages1, summary1); + let mut rebuilt1 = initial_context.clone(); + rebuilt1.extend(compact::build_compacted_history(&user_messages1, summary1)); live_history.replace(rebuilt1); rollout_items.push(RolloutItem::Compacted(CompactedItem { message: summary1.to_string(), @@ -7012,8 +7360,8 @@ mod tests { .clone() 
.for_prompt(&reconstruction_turn.model_info.input_modalities); let user_messages2 = collect_user_messages(&snapshot2); - let rebuilt2 = - compact::build_compacted_history(initial_context.clone(), &user_messages2, summary2); + let mut rebuilt2 = initial_context.clone(); + rebuilt2.extend(compact::build_compacted_history(&user_messages2, summary2)); live_history.replace(rebuilt2); rollout_items.push(RolloutItem::Compacted(CompactedItem { message: summary2.to_string(), diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index f7de93dfe5..d9eb42bfe2 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -7,6 +7,7 @@ use crate::client_common::ResponseEvent; use crate::codex::Session; use crate::codex::TurnContext; use crate::codex::get_last_assistant_message_from_turn; +use crate::context_manager::is_user_turn_boundary; use crate::error::CodexErr; use crate::error::Result as CodexResult; use crate::protocol::CompactedItem; @@ -32,6 +33,32 @@ pub const SUMMARIZATION_PROMPT: &str = include_str!("../templates/compact/prompt pub const SUMMARY_PREFIX: &str = include_str!("../templates/compact/summary_prefix.md"); const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum AutoCompactCallsite { + /// Pre-turn auto-compaction where the incoming turn context + user message are included in + /// the compaction request. + PreTurnIncludingIncomingUserMessage, + /// Reserved pre-turn auto-compaction strategy that compacts from the end of the previous turn + /// only, excluding incoming turn context + user message. This is currently unused by the + /// default pre-turn flow and retained for future model-specific strategies. + #[allow(dead_code)] + PreTurnExcludingIncomingUserMessage, + /// Mid-turn compaction between assistant responses in a follow-up loop. + MidTurnContinuation, +} + +/// Controls whether compacted-history processing should reinsert canonical turn context. 
+/// +/// When callers exclude incoming user/context from the compaction request, they should typically +/// set reinjection to `Skip` and append canonical context together with the next user message. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum TurnContextReinjection { + /// Insert canonical context immediately above the last real user message in compacted history. + ReinjectAboveLastRealUser, + /// Do not reinsert canonical context while processing compacted history. + Skip, +} + pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bool { provider.is_openai() } @@ -39,6 +66,9 @@ pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bo pub(crate) async fn run_inline_auto_compact_task( sess: Arc, turn_context: Arc, + auto_compact_callsite: AutoCompactCallsite, + turn_context_reinjection: TurnContextReinjection, + incoming_items: Option>, ) -> CodexResult<()> { let prompt = turn_context.compact_prompt().to_string(); let input = vec![UserInput::Text { @@ -47,7 +77,15 @@ pub(crate) async fn run_inline_auto_compact_task( text_elements: Vec::new(), }]; - run_compact_task_inner(sess, turn_context, input).await?; + run_compact_task_inner( + sess, + turn_context, + input, + Some(auto_compact_callsite), + turn_context_reinjection, + incoming_items, + ) + .await?; Ok(()) } @@ -62,13 +100,24 @@ pub(crate) async fn run_compact_task( collaboration_mode_kind: turn_context.collaboration_mode.mode, }); sess.send_event(&turn_context, start_event).await; - run_compact_task_inner(sess.clone(), turn_context, input).await + run_compact_task_inner( + sess, + turn_context, + input, + None, + TurnContextReinjection::Skip, + None, + ) + .await } async fn run_compact_task_inner( sess: Arc, turn_context: Arc, input: Vec, + auto_compact_callsite: Option, + turn_context_reinjection: TurnContextReinjection, + incoming_items: Option>, ) -> CodexResult<()> { let compaction_item = 
TurnItem::ContextCompaction(ContextCompactionItem::new()); sess.emit_turn_item_started(&turn_context, &compaction_item) @@ -76,6 +125,15 @@ async fn run_compact_task_inner( let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); let mut history = sess.clone_history().await; + if let Some(incoming_items) = incoming_items.as_ref() { + history.record_items(incoming_items.iter(), turn_context.truncation_policy); + } + if !history.raw_items().iter().any(is_user_turn_boundary) { + // Nothing to compact: do not rewrite history when there is no user-turn boundary. + sess.emit_turn_item_completed(&turn_context, compaction_item) + .await; + return Ok(()); + } history.record_items( &[initial_input_for_turn.into()], turn_context.truncation_policy, @@ -151,8 +209,11 @@ async fn run_compact_task_inner( } Err(e @ CodexErr::ContextWindowExceeded) => { if turn_input_len > 1 { - // Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact. + // Trim from the beginning to preserve cache (prefix-based) and keep recent + // messages intact. error!( + turn_id = %turn_context.sub_id, + auto_compact_callsite = ?auto_compact_callsite, "Context window exceeded while compacting; removing oldest history item. 
Error: {e}" ); history.remove_first_item(); @@ -161,8 +222,12 @@ async fn run_compact_task_inner( continue; } sess.set_total_tokens_full(turn_context.as_ref()).await; - let event = EventMsg::Error(e.to_error_event(None)); - sess.send_event(&turn_context, event).await; + error!( + turn_id = %turn_context.sub_id, + auto_compact_callsite = ?auto_compact_callsite, + compact_error = %e, + "compaction failed after history truncation could not proceed" + ); return Err(e); } Err(e) => { @@ -177,11 +242,16 @@ async fn run_compact_task_inner( .await; tokio::time::sleep(delay).await; continue; - } else { - let event = EventMsg::Error(e.to_error_event(None)); - sess.send_event(&turn_context, event).await; - return Err(e); } + error!( + turn_id = %turn_context.sub_id, + auto_compact_callsite = ?auto_compact_callsite, + retries, + max_retries, + compact_error = %e, + "compaction failed after retry exhaustion" + ); + return Err(e); } } } @@ -191,9 +261,32 @@ async fn run_compact_task_inner( let summary_suffix = get_last_assistant_message_from_turn(history_items).unwrap_or_default(); let summary_text = format!("{SUMMARY_PREFIX}\n{summary_suffix}"); let user_messages = collect_user_messages(history_items); + let incoming_user_items = match incoming_items.as_ref() { + Some(items) => items + .iter() + .filter(|item| real_user_message_text(item).is_some()) + .cloned() + .collect(), + None => Vec::new(), + }; - let initial_context = sess.build_initial_context(turn_context.as_ref()).await; - let mut new_history = build_compacted_history(initial_context, &user_messages, &summary_text); + let initial_context = match turn_context_reinjection { + TurnContextReinjection::ReinjectAboveLastRealUser => { + sess.build_initial_context(turn_context.as_ref()).await + } + TurnContextReinjection::Skip => Vec::new(), + }; + let compacted_history = build_compacted_history_with_limit( + &user_messages, + &incoming_user_items, + &summary_text, + COMPACT_USER_MESSAGE_MAX_TOKENS, + ); + let mut new_history 
= process_compacted_history( + compacted_history, + &initial_context, + turn_context_reinjection, + ); let ghost_snapshots: Vec = history_items .iter() .filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. })) @@ -205,7 +298,7 @@ async fn run_compact_task_inner( let rollout_item = RolloutItem::Compacted(CompactedItem { message: summary_text.clone(), - replacement_history: None, + replacement_history: Some(sess.clone_history().await.raw_items().to_vec()), }); sess.persist_rollout_items(&[rollout_item]).await; @@ -260,28 +353,39 @@ pub(crate) fn is_summary_message(message: &str) -> bool { pub(crate) fn process_compacted_history( mut compacted_history: Vec, initial_context: &[ResponseItem], + turn_context_reinjection: TurnContextReinjection, ) -> Vec { + // Keep only model-visible transcript items that we allow from remote compaction output. compacted_history.retain(should_keep_compacted_history_item); - let initial_context = initial_context.to_vec(); - - // Re-inject canonical context from the current session since we stripped it - // from the pre-compaction history. Keep it right before the last user - // message so older user messages remain earlier in the transcript. - if let Some(last_user_index) = compacted_history.iter().rposition(|item| { - matches!( - crate::event_mapping::parse_turn_item(item), - Some(TurnItem::UserMessage(_)) - ) - }) { - compacted_history.splice(last_user_index..last_user_index, initial_context); - } else { - compacted_history.extend(initial_context); + match turn_context_reinjection { + TurnContextReinjection::ReinjectAboveLastRealUser => { + // Insert immediately above the last real user message so turn context applies to that + // user input rather than an earlier turn. 
+ if let Some(insertion_index) = compacted_history + .iter() + .rposition(|item| real_user_message_text(item).is_some()) + { + compacted_history + .splice(insertion_index..insertion_index, initial_context.to_vec()); + } + } + TurnContextReinjection::Skip => {} } compacted_history } +fn real_user_message_text(item: &ResponseItem) -> Option { + match crate::event_mapping::parse_turn_item(item) { + Some(TurnItem::UserMessage(user_message)) => { + let message = user_message.message(); + (!is_summary_message(&message)).then_some(message) + } + _ => None, + } +} + /// Returns whether an item from remote compaction output should be preserved. /// /// Called while processing the model-provided compacted transcript, before we @@ -307,24 +411,24 @@ fn should_keep_compacted_history_item(item: &ResponseItem) -> bool { } pub(crate) fn build_compacted_history( - initial_context: Vec, user_messages: &[String], summary_text: &str, ) -> Vec { build_compacted_history_with_limit( - initial_context, user_messages, + &[], summary_text, COMPACT_USER_MESSAGE_MAX_TOKENS, ) } fn build_compacted_history_with_limit( - mut history: Vec, user_messages: &[String], + incoming_user_items: &[ResponseItem], summary_text: &str, max_tokens: usize, ) -> Vec { + let mut history = Vec::new(); let mut selected_messages: Vec = Vec::new(); if max_tokens > 0 { let mut remaining = max_tokens; @@ -357,6 +461,8 @@ fn build_compacted_history_with_limit( }); } + history.extend(incoming_user_items.iter().cloned()); + let summary_text = if summary_text.is_empty() { "(no summary available)".to_string() } else { @@ -535,8 +641,8 @@ do things let max_tokens = 16; let big = "word ".repeat(200); let history = super::build_compacted_history_with_limit( - Vec::new(), std::slice::from_ref(&big), + &[], "SUMMARY", max_tokens, ); @@ -572,11 +678,10 @@ do things #[test] fn build_token_limited_compacted_history_appends_summary_message() { - let initial_context: Vec = Vec::new(); let user_messages = vec!["first user 
message".to_string()]; let summary_text = "summary text"; - let history = build_compacted_history(initial_context, &user_messages, summary_text); + let history = build_compacted_history(&user_messages, summary_text); assert!( !history.is_empty(), "expected compacted history to include summary" @@ -592,6 +697,55 @@ do things assert_eq!(summary, summary_text); } + #[test] + fn build_compacted_history_preserves_incoming_user_item_structure() { + let preserved_user_item = ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ + ContentItem::InputImage { + image_url: "data:image/png;base64,AAAA".to_string(), + }, + ContentItem::InputText { + text: "latest user with image".to_string(), + }, + ], + end_turn: None, + phase: None, + }; + + let history = super::build_compacted_history_with_limit( + &["older user".to_string()], + std::slice::from_ref(&preserved_user_item), + "SUMMARY", + 128, + ); + + let expected = vec![ + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "older user".to_string(), + }], + end_turn: None, + phase: None, + }, + preserved_user_item, + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "SUMMARY".to_string(), + }], + end_turn: None, + phase: None, + }, + ]; + + assert_eq!(history, expected); + } + #[test] fn process_compacted_history_replaces_developer_messages() { let compacted_history = vec![ @@ -657,7 +811,11 @@ do things }, ]; - let refreshed = process_compacted_history(compacted_history, &initial_context); + let refreshed = process_compacted_history( + compacted_history, + &initial_context, + TurnContextReinjection::ReinjectAboveLastRealUser, + ); let expected = vec![ ResponseItem::Message { id: None, @@ -766,7 +924,11 @@ keep me updated }, ]; - let refreshed = process_compacted_history(compacted_history, &initial_context); + let refreshed = process_compacted_history( + compacted_history, + 
&initial_context, + TurnContextReinjection::ReinjectAboveLastRealUser, + ); let expected = vec![ ResponseItem::Message { id: None, @@ -902,7 +1064,11 @@ keep me updated phase: None, }]; - let refreshed = process_compacted_history(compacted_history, &initial_context); + let refreshed = process_compacted_history( + compacted_history, + &initial_context, + TurnContextReinjection::ReinjectAboveLastRealUser, + ); let expected = vec![ ResponseItem::Message { id: None, @@ -967,7 +1133,11 @@ keep me updated phase: None, }]; - let refreshed = process_compacted_history(compacted_history, &initial_context); + let refreshed = process_compacted_history( + compacted_history, + &initial_context, + TurnContextReinjection::ReinjectAboveLastRealUser, + ); let expected = vec![ ResponseItem::Message { id: None, @@ -1008,4 +1178,320 @@ keep me updated ]; assert_eq!(refreshed, expected); } + + #[test] + fn process_compacted_history_pre_turn_places_summary_last() { + let compacted_history = vec![ + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "older user".to_string(), + }], + end_turn: None, + phase: None, + }, + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: format!("{SUMMARY_PREFIX}\nsummary text"), + }], + end_turn: None, + phase: None, + }, + ]; + let initial_context = vec![ResponseItem::Message { + id: None, + role: "developer".to_string(), + content: vec![ContentItem::InputText { + text: "fresh permissions".to_string(), + }], + end_turn: None, + phase: None, + }]; + + let refreshed = process_compacted_history( + compacted_history, + &initial_context, + TurnContextReinjection::ReinjectAboveLastRealUser, + ); + let expected = vec![ + ResponseItem::Message { + id: None, + role: "developer".to_string(), + content: vec![ContentItem::InputText { + text: "fresh permissions".to_string(), + }], + end_turn: None, + phase: None, + }, + ResponseItem::Message { + 
id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "older user".to_string(), + }], + end_turn: None, + phase: None, + }, + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: format!("{SUMMARY_PREFIX}\nsummary text"), + }], + end_turn: None, + phase: None, + }, + ]; + assert_eq!(refreshed, expected); + } + + #[test] + fn process_compacted_history_preserves_summary_order() { + let compacted_history = vec![ + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "older user".to_string(), + }], + end_turn: None, + phase: None, + }, + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: format!("{SUMMARY_PREFIX}\nolder summary"), + }], + end_turn: None, + phase: None, + }, + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "newer user".to_string(), + }], + end_turn: None, + phase: None, + }, + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: format!("{SUMMARY_PREFIX}\nlatest summary"), + }], + end_turn: None, + phase: None, + }, + ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: "assistant after latest summary".to_string(), + }], + end_turn: None, + phase: None, + }, + ]; + + let refreshed = + process_compacted_history(compacted_history, &[], TurnContextReinjection::Skip); + let expected = vec![ + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "older user".to_string(), + }], + end_turn: None, + phase: None, + }, + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: format!("{SUMMARY_PREFIX}\nolder summary"), + }], + end_turn: None, + 
phase: None, + }, + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "newer user".to_string(), + }], + end_turn: None, + phase: None, + }, + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: format!("{SUMMARY_PREFIX}\nlatest summary"), + }], + end_turn: None, + phase: None, + }, + ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: "assistant after latest summary".to_string(), + }], + end_turn: None, + phase: None, + }, + ]; + assert_eq!(refreshed, expected); + } + + #[test] + fn process_compacted_history_skips_context_insertion_without_real_user_message() { + let compacted_history = vec![ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: format!("{SUMMARY_PREFIX}\nsummary text"), + }], + end_turn: None, + phase: None, + }]; + let initial_context = vec![ResponseItem::Message { + id: None, + role: "developer".to_string(), + content: vec![ContentItem::InputText { + text: "fresh permissions".to_string(), + }], + end_turn: None, + phase: None, + }]; + + let refreshed = process_compacted_history( + compacted_history, + &initial_context, + TurnContextReinjection::Skip, + ); + let expected = vec![ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: format!("{SUMMARY_PREFIX}\nsummary text"), + }], + end_turn: None, + phase: None, + }]; + assert_eq!(refreshed, expected); + } + + #[test] + fn process_compacted_history_reinject_noops_without_real_user_message() { + let compacted_history = vec![ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: format!("{SUMMARY_PREFIX}\nsummary text"), + }], + end_turn: None, + phase: None, + }]; + let initial_context = vec![ResponseItem::Message { + id: None, + role: 
"developer".to_string(), + content: vec![ContentItem::InputText { + text: "fresh permissions".to_string(), + }], + end_turn: None, + phase: None, + }]; + + let refreshed = process_compacted_history( + compacted_history, + &initial_context, + TurnContextReinjection::ReinjectAboveLastRealUser, + ); + let expected = vec![ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: format!("{SUMMARY_PREFIX}\nsummary text"), + }], + end_turn: None, + phase: None, + }]; + assert_eq!(refreshed, expected); + } + + #[test] + fn process_compacted_history_mid_turn_without_orphan_user_places_summary_last() { + let compacted_history = vec![ + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "older user".to_string(), + }], + end_turn: None, + phase: None, + }, + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: format!("{SUMMARY_PREFIX}\nsummary text"), + }], + end_turn: None, + phase: None, + }, + ]; + let initial_context = vec![ResponseItem::Message { + id: None, + role: "developer".to_string(), + content: vec![ContentItem::InputText { + text: "fresh permissions".to_string(), + }], + end_turn: None, + phase: None, + }]; + + let refreshed = process_compacted_history( + compacted_history, + &initial_context, + TurnContextReinjection::ReinjectAboveLastRealUser, + ); + let expected = vec![ + ResponseItem::Message { + id: None, + role: "developer".to_string(), + content: vec![ContentItem::InputText { + text: "fresh permissions".to_string(), + }], + end_turn: None, + phase: None, + }, + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "older user".to_string(), + }], + end_turn: None, + phase: None, + }, + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: 
format!("{SUMMARY_PREFIX}\nsummary text"), + }], + end_turn: None, + phase: None, + }, + ]; + assert_eq!(refreshed, expected); + } } diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index 4fe07560b9..55891c2b67 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -3,10 +3,14 @@ use std::sync::Arc; use crate::Prompt; use crate::codex::Session; use crate::codex::TurnContext; +use crate::compact::AutoCompactCallsite; +use crate::compact::TurnContextReinjection; use crate::context_manager::ContextManager; use crate::context_manager::TotalTokenUsageBreakdown; +use crate::context_manager::estimate_item_token_count; use crate::context_manager::estimate_response_item_model_visible_bytes; use crate::context_manager::is_codex_generated_item; +use crate::context_manager::is_user_turn_boundary; use crate::error::CodexErr; use crate::error::Result as CodexResult; use crate::protocol::CompactedItem; @@ -24,8 +28,19 @@ use tracing::info; pub(crate) async fn run_inline_remote_auto_compact_task( sess: Arc, turn_context: Arc, + auto_compact_callsite: AutoCompactCallsite, + // Controls whether canonical turn context should be reinserted into compacted history. 
+ turn_context_reinjection: TurnContextReinjection, + incoming_items: Option>, ) -> CodexResult<()> { - run_remote_compact_task_inner(&sess, &turn_context).await?; + run_remote_compact_task_inner( + &sess, + &turn_context, + auto_compact_callsite, + turn_context_reinjection, + incoming_items, + ) + .await?; Ok(()) } @@ -40,18 +55,40 @@ pub(crate) async fn run_remote_compact_task( }); sess.send_event(&turn_context, start_event).await; - run_remote_compact_task_inner(&sess, &turn_context).await + run_remote_compact_task_inner( + &sess, + &turn_context, + AutoCompactCallsite::PreTurnExcludingIncomingUserMessage, + // Manual `/compact` should not reinsert turn context into compacted history; we reseed + // canonical initial context before the next user turn. + TurnContextReinjection::Skip, + None, + ) + .await } async fn run_remote_compact_task_inner( sess: &Arc, turn_context: &Arc, + auto_compact_callsite: AutoCompactCallsite, + turn_context_reinjection: TurnContextReinjection, + incoming_items: Option>, ) -> CodexResult<()> { - if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await { - let event = EventMsg::Error( - err.to_error_event(Some("Error running remote compact task".to_string())), + if let Err(err) = run_remote_compact_task_inner_impl( + sess, + turn_context, + auto_compact_callsite, + turn_context_reinjection, + incoming_items, + ) + .await + { + error!( + turn_id = %turn_context.sub_id, + auto_compact_callsite = ?auto_compact_callsite, + compact_error = %err, + "remote compaction task failed" ); - sess.send_event(turn_context, event).await; return Err(err); } Ok(()) @@ -60,6 +97,9 @@ async fn run_remote_compact_task_inner( async fn run_remote_compact_task_inner_impl( sess: &Arc, turn_context: &Arc, + auto_compact_callsite: AutoCompactCallsite, + turn_context_reinjection: TurnContextReinjection, + incoming_items: Option>, ) -> CodexResult<()> { let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new()); 
sess.emit_turn_item_started(turn_context, &compaction_item) @@ -70,10 +110,21 @@ async fn run_remote_compact_task_inner_impl( &mut history, turn_context.as_ref(), &base_instructions, + incoming_items.as_deref(), ); + if let Some(incoming_items) = incoming_items { + history.record_items(incoming_items.iter(), turn_context.truncation_policy); + } + if !history.raw_items().iter().any(is_user_turn_boundary) { + // Nothing to compact: do not rewrite history when there is no user-turn boundary. + sess.emit_turn_item_completed(turn_context, compaction_item) + .await; + return Ok(()); + } if deleted_items > 0 { info!( turn_id = %turn_context.sub_id, + auto_compact_callsite = ?auto_compact_callsite, deleted_items, "trimmed history items before remote compaction" ); @@ -110,6 +161,7 @@ async fn run_remote_compact_task_inner_impl( build_compact_request_log_data(&prompt.input, &prompt.base_instructions.text); log_remote_compact_failure( turn_context, + auto_compact_callsite, &compact_request_log_data, total_usage_breakdown, &err, @@ -118,7 +170,7 @@ async fn run_remote_compact_task_inner_impl( }) .await?; new_history = sess - .process_compacted_history(turn_context, new_history) + .process_compacted_history(turn_context, new_history, turn_context_reinjection) .await; if !ghost_snapshots.is_empty() { @@ -163,12 +215,14 @@ fn build_compact_request_log_data( fn log_remote_compact_failure( turn_context: &TurnContext, + auto_compact_callsite: AutoCompactCallsite, log_data: &CompactRequestLogData, total_usage_breakdown: TotalTokenUsageBreakdown, err: &CodexErr, ) { error!( turn_id = %turn_context.sub_id, + auto_compact_callsite = ?auto_compact_callsite, last_api_response_total_tokens = total_usage_breakdown.last_api_response_total_tokens, all_history_items_model_visible_bytes = total_usage_breakdown.all_history_items_model_visible_bytes, estimated_tokens_of_items_added_since_last_successful_api_response = 
total_usage_breakdown.estimated_tokens_of_items_added_since_last_successful_api_response, @@ -184,15 +238,37 @@ fn trim_function_call_history_to_fit_context_window( history: &mut ContextManager, turn_context: &TurnContext, base_instructions: &BaseInstructions, + incoming_items: Option<&[ResponseItem]>, +) -> usize { + let Some(context_window) = turn_context.model_context_window() else { + return 0; + }; + let incoming_items_tokens = incoming_items + .unwrap_or_default() + .iter() + .map(estimate_item_token_count) + .fold(0_i64, i64::saturating_add); + trim_codex_generated_tail_items_to_fit_context_window( + history, + context_window, + base_instructions, + incoming_items_tokens, + ) +} + +fn trim_codex_generated_tail_items_to_fit_context_window( + history: &mut ContextManager, + context_window: i64, + base_instructions: &BaseInstructions, + incoming_items_tokens: i64, ) -> usize { let mut deleted_items = 0usize; - let Some(context_window) = turn_context.model_context_window() else { - return deleted_items; - }; while history .estimate_token_count_with_base_instructions(base_instructions) - .is_some_and(|estimated_tokens| estimated_tokens > context_window) + .is_some_and(|estimated_tokens| { + estimated_tokens.saturating_add(incoming_items_tokens) > context_window + }) { let Some(last_item) = history.raw_items().last() else { break; @@ -208,3 +284,90 @@ fn trim_function_call_history_to_fit_context_window( deleted_items } + +#[cfg(test)] +mod tests { + use super::*; + use crate::truncate::TruncationPolicy; + use codex_protocol::models::ContentItem; + use pretty_assertions::assert_eq; + + fn user_message(text: &str) -> ResponseItem { + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: text.to_string(), + }], + end_turn: None, + phase: None, + } + } + + fn developer_message(text: &str) -> ResponseItem { + ResponseItem::Message { + id: None, + role: "developer".to_string(), + content: 
vec![ContentItem::InputText { + text: text.to_string(), + }], + end_turn: None, + phase: None, + } + } + + #[test] + fn trim_accounts_for_incoming_items_tokens() { + let base_instructions = BaseInstructions { + text: String::new(), + }; + let incoming_items = vec![user_message( + "INCOMING_USER_MESSAGE_THAT_TIPS_OVER_THE_WINDOW", + )]; + let incoming_items_tokens = incoming_items + .iter() + .map(estimate_item_token_count) + .fold(0_i64, i64::saturating_add); + assert!( + incoming_items_tokens > 0, + "expected incoming item token estimate to be positive" + ); + + let mut history = ContextManager::new(); + let history_items = vec![ + user_message("USER_ONE"), + developer_message("TRAILING_CODEX_GENERATED_CONTEXT"), + ]; + history.record_items(history_items.iter(), TruncationPolicy::Tokens(10_000)); + let history_tokens = history + .estimate_token_count_with_base_instructions(&base_instructions) + .unwrap_or_default(); + let context_window = history_tokens + .saturating_add(incoming_items_tokens) + .saturating_sub(1); + let mut without_incoming_projection = history.clone(); + + let deleted_without_incoming = trim_codex_generated_tail_items_to_fit_context_window( + &mut without_incoming_projection, + context_window, + &base_instructions, + 0, + ); + assert_eq!( + deleted_without_incoming, 0, + "history-only projection should not trim when currently under the limit" + ); + + let deleted_with_incoming = trim_codex_generated_tail_items_to_fit_context_window( + &mut history, + context_window, + &base_instructions, + incoming_items_tokens, + ); + assert_eq!( + deleted_with_incoming, 1, + "incoming projection should trim trailing codex-generated history to fit pre-turn request" + ); + assert_eq!(history.raw_items(), vec![user_message("USER_ONE")]); + } +} diff --git a/codex-rs/core/src/context_manager/history.rs b/codex-rs/core/src/context_manager/history.rs index b6b3ceae5a..acd9778d96 100644 --- a/codex-rs/core/src/context_manager/history.rs +++ 
b/codex-rs/core/src/context_manager/history.rs @@ -395,7 +395,7 @@ fn estimate_reasoning_length(encoded_len: usize) -> usize { .saturating_sub(650) } -fn estimate_item_token_count(item: &ResponseItem) -> i64 { +pub(crate) fn estimate_item_token_count(item: &ResponseItem) -> i64 { let model_visible_bytes = estimate_response_item_model_visible_bytes(item); approx_tokens_from_byte_count_i64(model_visible_bytes) } diff --git a/codex-rs/core/src/context_manager/mod.rs b/codex-rs/core/src/context_manager/mod.rs index dcaf794c49..5de617be7d 100644 --- a/codex-rs/core/src/context_manager/mod.rs +++ b/codex-rs/core/src/context_manager/mod.rs @@ -3,6 +3,7 @@ mod normalize; pub(crate) use history::ContextManager; pub(crate) use history::TotalTokenUsageBreakdown; +pub(crate) use history::estimate_item_token_count; pub(crate) use history::estimate_response_item_model_visible_bytes; pub(crate) use history::is_codex_generated_item; pub(crate) use history::is_user_turn_boundary; diff --git a/codex-rs/core/src/tasks/compact.rs b/codex-rs/core/src/tasks/compact.rs index b56f7b1df5..9835c95bee 100644 --- a/codex-rs/core/src/tasks/compact.rs +++ b/codex-rs/core/src/tasks/compact.rs @@ -3,6 +3,8 @@ use std::sync::Arc; use super::SessionTask; use super::SessionTaskContext; use crate::codex::TurnContext; +use crate::context_manager::is_user_turn_boundary; +use crate::protocol::EventMsg; use crate::state::TaskKind; use async_trait::async_trait; use codex_protocol::user_input::UserInput; @@ -25,20 +27,48 @@ impl SessionTask for CompactTask { _cancellation_token: CancellationToken, ) -> Option { let session = session.clone_session(); + let has_user_turn_boundary = session + .clone_history() + .await + .raw_items() + .iter() + .any(is_user_turn_boundary); if crate::compact::should_use_remote_compact_task(&ctx.provider) { let _ = session.services.otel_manager.counter( "codex.task.compact", 1, &[("type", "remote")], ); - let _ = crate::compact_remote::run_remote_compact_task(session, 
ctx).await; + if let Err(err) = + crate::compact_remote::run_remote_compact_task(session.clone(), ctx.clone()).await + { + let event = EventMsg::Error( + err.to_error_event(Some("Error running remote compact task".to_string())), + ); + session.send_event(&ctx, event).await; + } else if has_user_turn_boundary { + // Manual `/compact` rewrites history to compacted transcript items and drops + // per-turn context entries. Force initial-context reseeding on the next user turn. + session.mark_initial_context_unseeded_for_next_turn().await; + } } else { let _ = session.services.otel_manager.counter( "codex.task.compact", 1, &[("type", "local")], ); - let _ = crate::compact::run_compact_task(session, ctx, input).await; + if let Err(err) = + crate::compact::run_compact_task(session.clone(), ctx.clone(), input).await + { + let event = EventMsg::Error( + err.to_error_event(Some("Error running local compact task".to_string())), + ); + session.send_event(&ctx, event).await; + } else if has_user_turn_boundary { + // Manual `/compact` rewrites history to compacted transcript items and drops + // per-turn context entries. Force initial-context reseeding on the next user turn. 
+ session.mark_initial_context_unseeded_for_next_turn().await; + } } None diff --git a/codex-rs/core/src/tasks/regular.rs b/codex-rs/core/src/tasks/regular.rs index 8782d7cedb..355e099369 100644 --- a/codex-rs/core/src/tasks/regular.rs +++ b/codex-rs/core/src/tasks/regular.rs @@ -8,6 +8,7 @@ use crate::codex::run_turn; use crate::state::TaskKind; use async_trait::async_trait; use codex_otel::OtelManager; +use codex_protocol::models::ResponseItem; use codex_protocol::openai_models::ModelInfo; use codex_protocol::user_input::UserInput; use futures::future::BoxFuture; @@ -24,12 +25,14 @@ type PrewarmedSessionTask = JoinHandle>; pub(crate) struct RegularTask { prewarmed_session_task: Mutex>, + pre_turn_context_items: Vec, } impl Default for RegularTask { fn default() -> Self { Self { prewarmed_session_task: Mutex::new(None), + pre_turn_context_items: Vec::new(), } } } @@ -58,9 +61,15 @@ impl RegularTask { Self { prewarmed_session_task: Mutex::new(Some(prewarmed_session_task)), + pre_turn_context_items: Vec::new(), } } + pub(crate) fn with_pre_turn_context_items(mut self, items: Vec) -> Self { + self.pre_turn_context_items = items; + self + } + async fn take_prewarmed_session(&self) -> Option { let prewarmed_session_task = self .prewarmed_session_task @@ -104,6 +113,7 @@ impl SessionTask for RegularTask { sess, ctx, input, + self.pre_turn_context_items.clone(), prewarmed_client_session, cancellation_token, ) diff --git a/codex-rs/core/tests/suite/model_switching.rs b/codex-rs/core/tests/suite/model_switching.rs index fddd3d282d..8c58f9b20f 100644 --- a/codex-rs/core/tests/suite/model_switching.rs +++ b/codex-rs/core/tests/suite/model_switching.rs @@ -113,6 +113,94 @@ async fn model_change_appends_model_instructions_developer_message() -> Result<( Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn settings_only_empty_turn_persists_updates_for_next_non_empty_turn() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = 
start_mock_server().await; + let resp_mock = mount_sse_sequence( + &server, + vec![sse_completed("resp-1"), sse_completed("resp-2")], + ) + .await; + + let mut builder = test_codex().with_model("gpt-5.2-codex"); + let test = builder.build(&server).await?; + let model = test.session_configured.model.clone(); + + test.codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "first".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + cwd: test.cwd_path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::ReadOnly, + model: model.clone(), + effort: test.config.model_reasoning_effort, + summary: ReasoningSummary::Auto, + collaboration_mode: None, + personality: None, + }) + .await?; + wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + // Settings-only turn with no user message. + test.codex + .submit(Op::UserTurn { + items: Vec::new(), + final_output_json_schema: None, + cwd: test.cwd_path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: model.clone(), + effort: test.config.model_reasoning_effort, + summary: ReasoningSummary::Auto, + collaboration_mode: None, + personality: None, + }) + .await?; + + test.codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "after settings-only turn".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + cwd: test.cwd_path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model, + effort: test.config.model_reasoning_effort, + summary: ReasoningSummary::Auto, + collaboration_mode: None, + personality: None, + }) + .await?; + wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + let requests = resp_mock.requests(); + assert_eq!( + requests.len(), + 2, + "expected only first and third turns to hit the model" + ); + + let 
third_turn_request = requests.last().expect("expected third turn request"); + let developer_texts = third_turn_request.message_input_texts("developer"); + assert!( + developer_texts + .iter() + .any(|text| text.contains("sandbox_mode` is `danger-full-access`")), + "expected danger-full-access permissions update in next non-empty turn: {developer_texts:?}" + ); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn model_and_personality_change_only_appends_model_instructions() -> Result<()> { skip_if_no_network!(Ok(()));