From 0bb3a2bbd59620f7ac4e92ba6a047ba668a60b68 Mon Sep 17 00:00:00 2001 From: Roy Han Date: Tue, 19 May 2026 17:31:37 -0700 Subject: [PATCH] limit automatic compactions per turn --- codex-rs/core/src/session/turn.rs | 148 ++++++++++--- codex-rs/core/src/tasks/regular.rs | 29 ++- codex-rs/core/tests/suite/compact.rs | 114 ++++++++++ codex-rs/core/tests/suite/pending_input.rs | 232 +++++++++++++++++++++ 4 files changed, 496 insertions(+), 27 deletions(-) diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 26ba845460..3df8052902 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -114,6 +114,13 @@ use tracing::trace; use tracing::trace_span; use tracing::warn; +const MAX_AUTO_COMPACTIONS_PER_TURN: usize = 3; + +pub(crate) enum RunTurnResult { + Continue(Option), + Terminal, +} + /// Takes a user message as input and runs a loop where, at each sampling request, the model /// replies with either: /// @@ -132,33 +139,44 @@ pub(crate) async fn run_turn( sess: Arc, turn_context: Arc, turn_extension_data: Arc, + auto_compact_limiter: &mut AutoCompactTurnLimiter, input: Vec, prewarmed_client_session: Option, cancellation_token: CancellationToken, -) -> Option { +) -> RunTurnResult { let mut client_session = prewarmed_client_session.unwrap_or_else(|| sess.services.model_client.new_session()); // TODO(ccunningham): Pre-turn compaction runs before context updates and the // new user message are recorded. Estimate pending incoming items (context // diffs/full reinjection + user input) and trigger compaction preemptively // when they would push the thread over the compaction threshold. - let pre_sampling_compact = - match run_pre_sampling_compact(&sess, &turn_context, &mut client_session).await { - Ok(pre_sampling_compact) => pre_sampling_compact, - Err(err) => { - if err.to_codex_protocol_error() == CodexErrorInfo::UsageLimitExceeded - && let Err(err) = sess - .goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached { - turn_context: turn_context.as_ref(), - }) - .await - { - warn!("failed to usage-limit active goal after usage-limit error: {err}"); - } - error!("Failed to run pre-sampling compact"); - return None; + let pre_sampling_compact = match run_pre_sampling_compact( + &sess, + &turn_context, + &mut client_session, + auto_compact_limiter, + ) + .await + { + Ok(pre_sampling_compact) => pre_sampling_compact, + Err(err) => { + if err.to_codex_protocol_error() == CodexErrorInfo::UsageLimitExceeded + && let Err(err) = sess + .goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached { + turn_context: turn_context.as_ref(), + }) + .await + { + warn!("failed to usage-limit active goal after usage-limit error: {err}"); } - }; + error!("Failed to run pre-sampling compact"); + return if err.to_codex_protocol_error() == CodexErrorInfo::ContextWindowExceeded { + RunTurnResult::Terminal + } else { + RunTurnResult::Continue(None) + }; + } + }; if pre_sampling_compact.reset_client_session { client_session.reset_websocket_session(); } @@ -166,11 +184,14 @@ pub(crate) async fn run_turn( sess.record_context_updates_and_set_reference_context_item(turn_context.as_ref()) .await; - let (injection_items, explicitly_enabled_connectors) = - build_skills_and_plugins(&sess, turn_context.as_ref(), &input, &cancellation_token).await?; + let Some((injection_items, explicitly_enabled_connectors)) = + build_skills_and_plugins(&sess, turn_context.as_ref(), &input, &cancellation_token).await + else { + return RunTurnResult::Continue(None); + }; if run_pending_session_start_hooks(&sess, &turn_context).await { - return None; + return RunTurnResult::Continue(None); } if !input.is_empty() { let initial_turn_input = TurnInput::UserInput(input.clone()); @@ -183,7 +204,7 @@ pub(crate) async fn run_turn( user_prompt_submit_outcome.additional_contexts, ) .await; - return None; + return RunTurnResult::Continue(None); } record_pending_input( &sess, @@ -249,6 +270,7 @@ pub(crate) async fn run_turn( let mut blocked_pending_input = false; let mut accepted_pending_input = false; for pending_input_item in pending_input { + let is_user_input = matches!(&pending_input_item, TurnInput::UserInput(_)); let hook_outcome = inspect_pending_input(&sess, &turn_context, &pending_input_item).await; if hook_outcome.should_stop { @@ -264,6 +286,9 @@ pub(crate) async fn run_turn( hook_outcome.additional_contexts, ) .await; + if is_user_input { + auto_compact_limiter.reset_after_user_input(); + } } } if blocked_pending_input && !accepted_pending_input { @@ -329,6 +354,7 @@ pub(crate) async fn run_turn( &sess, &turn_context, &mut client_session, + auto_compact_limiter, InitialContextInjection::BeforeLastUserMessage, CompactionReason::ContextLimit, CompactionPhase::MidTurn, @@ -348,7 +374,13 @@ pub(crate) async fn run_turn( "failed to usage-limit active goal after usage-limit error: {err}" ); } - return None; + return if err.to_codex_protocol_error() + == CodexErrorInfo::ContextWindowExceeded + { + RunTurnResult::Terminal + } else { + RunTurnResult::Continue(None) + }; } }; if reset_client_session { @@ -399,7 +431,7 @@ pub(crate) async fn run_turn( ) .await { - return None; + return RunTurnResult::Continue(None); } break; } @@ -447,7 +479,7 @@ pub(crate) async fn run_turn( } } - last_agent_message + RunTurnResult::Continue(last_agent_message) } #[expect( @@ -652,6 +684,56 @@ struct AutoCompactTokenStatus { token_limit_reached: bool, } +#[derive(Default)] +pub(crate) struct AutoCompactTurnLimiter { + attempts: usize, +} + +impl AutoCompactTurnLimiter { + fn reset_after_user_input(&mut self) { + self.attempts = 0; + } + + async fn acquire( + &mut self, + sess: &Session, + turn_context: &TurnContext, + reason: CompactionReason, + phase: CompactionPhase, + ) -> CodexResult<()> { + if self.attempts >= MAX_AUTO_COMPACTIONS_PER_TURN { + let token_status = auto_compact_token_status(sess, turn_context).await; + warn!( + turn_id = %turn_context.sub_id, + auto_compactions_attempted = self.attempts, + auto_compaction_limit = MAX_AUTO_COMPACTIONS_PER_TURN, + reason = ?reason, + phase = ?phase, + active_context_tokens = token_status.active_context_tokens, + auto_compact_scope_tokens = token_status.auto_compact_scope_tokens, + auto_compact_scope_limit = token_status.auto_compact_scope_limit, + full_context_window_limit = ?token_status.full_context_window_limit, + full_context_window_limit_reached = token_status.full_context_window_limit_reached, + "automatic compaction limit reached; stopping turn" + ); + sess.send_event( + turn_context, + EventMsg::Error(ErrorEvent { + message: format!( + "Codex stopped after {MAX_AUTO_COMPACTIONS_PER_TURN} automatic compactions in this turn because the context remained too large. Queued input for this turn was not processed; start a new thread or reduce earlier history before retrying." + ), + codex_error_info: Some(CodexErrorInfo::ContextWindowExceeded), + }), + ) + .await; + return Err(CodexErr::ContextWindowExceeded); + } + + self.attempts += 1; + Ok(()) + } +} + async fn auto_compact_token_status( sess: &Session, turn_context: &TurnContext, @@ -708,9 +790,15 @@ async fn run_pre_sampling_compact( sess: &Arc, turn_context: &Arc, client_session: &mut ModelClientSession, + auto_compact_limiter: &mut AutoCompactTurnLimiter, ) -> CodexResult { - let mut pre_sampling_compacted = - maybe_run_previous_model_inline_compact(sess, turn_context, client_session).await?; + let mut pre_sampling_compacted = maybe_run_previous_model_inline_compact( + sess, + turn_context, + client_session, + auto_compact_limiter, + ) + .await?; let mut reset_client_session = pre_sampling_compacted; let token_status = auto_compact_token_status(sess.as_ref(), turn_context.as_ref()).await; // Compact if the configured auto-compaction budget or usable context window is exhausted. @@ -719,6 +807,7 @@ async fn run_pre_sampling_compact( sess, turn_context, client_session, + auto_compact_limiter, InitialContextInjection::DoNotInject, CompactionReason::ContextLimit, CompactionPhase::PreTurn, @@ -741,6 +830,7 @@ async fn maybe_run_previous_model_inline_compact( sess: &Arc, turn_context: &Arc, client_session: &mut ModelClientSession, + auto_compact_limiter: &mut AutoCompactTurnLimiter, ) -> CodexResult { let Some(previous_turn_settings) = sess.previous_turn_settings().await else { return Ok(false); @@ -780,6 +870,7 @@ async fn maybe_run_previous_model_inline_compact( sess, &previous_model_turn_context, client_session, + auto_compact_limiter, InitialContextInjection::DoNotInject, CompactionReason::ModelDownshift, CompactionPhase::PreTurn, @@ -794,10 +885,15 @@ async fn run_auto_compact( sess: &Arc, turn_context: &Arc, client_session: &mut ModelClientSession, + auto_compact_limiter: &mut AutoCompactTurnLimiter, initial_context_injection: InitialContextInjection, reason: CompactionReason, phase: CompactionPhase, ) -> CodexResult { + auto_compact_limiter + .acquire(sess.as_ref(), turn_context.as_ref(), reason, phase) + .await?; + if should_use_remote_compact_task(turn_context.provider.info()) { if turn_context.features.enabled(Feature::RemoteCompactionV2) { run_inline_remote_auto_compact_task_v2( diff --git a/codex-rs/core/src/tasks/regular.rs b/codex-rs/core/src/tasks/regular.rs index 50414df278..69447bc8d7 100644 --- a/codex-rs/core/src/tasks/regular.rs +++ b/codex-rs/core/src/tasks/regular.rs @@ -2,6 +2,8 @@ use std::sync::Arc; use tokio_util::sync::CancellationToken; +use crate::session::turn::AutoCompactTurnLimiter; +use crate::session::turn::RunTurnResult; use crate::session::turn::run_turn; use crate::session::turn_context::TurnContext; use crate::session_startup_prewarm::SessionStartupPrewarmResolution; @@ -69,17 +71,42 @@ impl SessionTask for RegularTask { }; let mut next_input = input; let mut prewarmed_client_session = prewarmed_client_session; + let mut auto_compact_limiter = AutoCompactTurnLimiter::default(); loop { - let last_agent_message = run_turn( + let turn_result = run_turn( Arc::clone(&sess), Arc::clone(&ctx), Arc::clone(&turn_extension_data), + &mut auto_compact_limiter, next_input, prewarmed_client_session.take(), cancellation_token.child_token(), ) .instrument(run_turn_span.clone()) .await; + let (last_agent_message, can_restart_with_pending_input) = match turn_result { + RunTurnResult::Continue(last_agent_message) => (last_agent_message, true), + RunTurnResult::Terminal => { + let turn_state = { + let active_turn = sess.active_turn.lock().await; + active_turn + .as_ref() + .map(|active_turn| Arc::clone(&active_turn.turn_state)) + }; + if let Some(turn_state) = turn_state { + turn_state.lock().await.clear_pending_waiters(); + drop( + sess.input_queue + .take_pending_input_for_turn_state(turn_state.as_ref()) + .await, + ); + } + (None, false) + } + }; + if !can_restart_with_pending_input { + return last_agent_message; + } if !sess.input_queue.has_pending_input(&sess.active_turn).await { return last_agent_message; } diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index 140b4ec2b1..938046e30b 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -13,6 +13,7 @@ use codex_protocol::models::PermissionProfile; use codex_protocol::openai_models::ModelInfo; use codex_protocol::openai_models::ModelsResponse; use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::CodexErrorInfo; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::HookEventName; use codex_protocol::protocol::HookRunStatus; @@ -1490,6 +1491,119 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() { assert_eq!(requests_payloads.len(), 7); } +// Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts. +#[cfg_attr(windows, tokio::test(flavor = "multi_thread", worker_threads = 4))] +#[cfg_attr(not(windows), tokio::test(flavor = "multi_thread", worker_threads = 2))] +async fn auto_compact_stops_after_per_turn_limit() { + skip_if_no_network!(); + + let server = start_mock_server().await; + + let token_count_used = 270_000; + let token_count_used_after_compaction = 80_000; + let sampling_response = |idx: usize| { + let reasoning = ev_reasoning_item( + &format!("limit-reasoning-{idx}"), + &[&format!("step {idx}")], + &[], + ); + sse(vec![ + reasoning, + ev_shell_command_call( + &format!("limit-shell-{idx}"), + &format!("echo auto-compact-limit-{idx}"), + ), + ev_completed_with_tokens(&format!("limit-response-{idx}"), token_count_used), + ]) + }; + let compact_response = |idx: usize| { + sse(vec![ + ev_assistant_message( + &format!("limit-compact-message-{idx}"), + &format!("AUTO_LIMIT_SUMMARY_{idx}"), + ), + ev_completed_with_tokens( + &format!("limit-compact-response-{idx}"), + token_count_used_after_compaction, + ), + ]) + }; + + let request_log = mount_sse_sequence( + &server, + vec![ + sampling_response(1), + compact_response(1), + sampling_response(2), + compact_response(2), + sampling_response(3), + compact_response(3), + sampling_response(4), + ], + ) + .await; + + let model_provider = non_openai_model_provider(&server); + let codex = test_codex() + .with_config(move |config| { + config.model_provider = model_provider; + set_test_compact_prompt(config); + config.model_auto_compact_token_limit = Some(200_000); + }) + .build(&server) + .await + .expect("build codex") + .codex; + + codex + .submit(Op::UserInput { + environments: None, + items: vec![UserInput::Text { + text: "trigger repeated auto compaction".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + thread_settings: Default::default(), + }) + .await + .expect("submit user input"); + + let error = wait_for_event_match(&codex, |event| match event { + EventMsg::Error(err) + if err + .message + .contains("Codex stopped after 3 automatic compactions in this turn") => + { + Some(err.clone()) + } + _ => None, + }) + .await; + + assert_eq!( + error.codex_error_info, + Some(CodexErrorInfo::ContextWindowExceeded) + ); + + let requests = request_log.requests(); + let compaction_requests = requests + .iter() + .filter(|request| { + body_contains_text(&request.body_json().to_string(), SUMMARIZATION_PROMPT) + }) + .count(); + assert_eq!( + compaction_requests, 3, + "fourth automatic compaction should be blocked before dispatch" + ); + assert_eq!( + requests.len(), + 7, + "expected four sampling requests and three dispatched compaction requests" + ); +} + // Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts. #[cfg_attr(windows, tokio::test(flavor = "multi_thread", worker_threads = 4))] #[cfg_attr(not(windows), tokio::test(flavor = "multi_thread", worker_threads = 2))] diff --git a/codex-rs/core/tests/suite/pending_input.rs b/codex-rs/core/tests/suite/pending_input.rs index b057b86171..81979117e3 100644 --- a/codex-rs/core/tests/suite/pending_input.rs +++ b/codex-rs/core/tests/suite/pending_input.rs @@ -20,6 +20,7 @@ use core_test_support::responses::ev_output_text_delta; use core_test_support::responses::ev_reasoning_item; use core_test_support::responses::ev_reasoning_item_added; use core_test_support::responses::ev_response_created; +use core_test_support::responses::ev_shell_command_call; use core_test_support::streaming_sse::StreamingSseChunk; use core_test_support::streaming_sse::StreamingSseServer; use core_test_support::streaming_sse::start_streaming_sse_server; @@ -679,6 +680,237 @@ async fn steered_user_input_follows_compact_when_only_the_steer_needs_follow_up( server.shutdown().await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn steered_user_input_resets_auto_compact_limit() { + let (gate_first_completed_tx, gate_first_completed_rx) = oneshot::channel(); + let (gate_second_completed_tx, gate_second_completed_rx) = oneshot::channel(); + let (gate_third_completed_tx, gate_third_completed_rx) = oneshot::channel(); + let (gate_fourth_completed_tx, gate_fourth_completed_rx) = oneshot::channel(); + + let sampling_chunks = |idx: usize, text: &'static str, gate| { + vec![ + chunk(ev_response_created(&format!("resp-{idx}"))), + chunk(ev_message_item_added(&format!("msg-{idx}"), "")), + chunk(ev_output_text_delta(text)), + chunk(json!({ + "type": "response.output_item.done", + "item": { + "type": "message", + "role": "assistant", + "id": format!("msg-{idx}"), + "content": [{"type": "output_text", "text": text}], + "phase": "commentary", + } + })), + gated_chunk( + gate, + vec![ev_completed_with_tokens( + &format!("resp-{idx}"), + /*total_tokens*/ 500, + )], + ), + ] + }; + let compact_chunks = |idx: usize| { + vec![ + chunk(ev_response_created(&format!("resp-compact-{idx}"))), + chunk(ev_message_item_done( + &format!("msg-compact-{idx}"), + &format!("AUTO_COMPACT_SUMMARY_{idx}"), + )), + chunk(ev_completed_with_tokens( + &format!("resp-compact-{idx}"), + /*total_tokens*/ 50, + )), + ] + }; + let final_chunks = vec![ + chunk(ev_response_created("resp-final")), + chunk(ev_message_item_done("msg-final", "processed fifth prompt")), + chunk(ev_completed_with_tokens( + "resp-final", + /*total_tokens*/ 70, + )), + ]; + + let (server, _completions) = start_streaming_sse_server(vec![ + sampling_chunks(1, "first answer", gate_first_completed_rx), + compact_chunks(1), + sampling_chunks(2, "processed second prompt", gate_second_completed_rx), + compact_chunks(2), + sampling_chunks(3, "processed third prompt", gate_third_completed_rx), + compact_chunks(3), + sampling_chunks(4, "processed fourth prompt", gate_fourth_completed_rx), + compact_chunks(4), + final_chunks, + ]) + .await; + + let codex = test_codex() + .with_model("gpt-5.4") + .with_config(|config| { + config.model_provider.name = "OpenAI (test)".to_string(); + config.model_provider.supports_websockets = false; + config.model_auto_compact_token_limit = Some(200); + }) + .build_with_streaming_server(&server) + .await + .unwrap_or_else(|err| panic!("build streaming Codex test session: {err}")) + .codex; + + submit_user_input(&codex, "first prompt").await; + + wait_for_agent_message(&codex, "first answer").await; + steer_user_input(&codex, "second prompt").await; + let _ = gate_first_completed_tx.send(()); + + wait_for_agent_message(&codex, "processed second prompt").await; + steer_user_input(&codex, "third prompt").await; + let _ = gate_second_completed_tx.send(()); + + wait_for_agent_message(&codex, "processed third prompt").await; + steer_user_input(&codex, "fourth prompt").await; + let _ = gate_third_completed_tx.send(()); + + wait_for_agent_message(&codex, "processed fourth prompt").await; + steer_user_input(&codex, "fifth prompt").await; + let _ = gate_fourth_completed_tx.send(()); + + wait_for_agent_message(&codex, "processed fifth prompt").await; + wait_for_turn_complete(&codex).await; + + let requests = server.requests().await; + assert_eq!( + requests.len(), + 9, + "four auto compactions should be allowed when separated by accepted steered input" + ); + + let final_body: Value = + from_slice(&requests[8]).unwrap_or_else(|err| panic!("parse final request: {err}")); + let final_user_texts = message_input_texts(&final_body, "user"); + assert!( + final_user_texts.iter().any(|text| text == "fifth prompt"), + "steered input after the fourth compaction should be processed" + ); + + server.shutdown().await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn steered_user_input_is_discarded_when_auto_compact_guard_fires() { + let (gate_fourth_completed_tx, gate_fourth_completed_rx) = oneshot::channel(); + let token_count_used = 270_000; + let token_count_used_after_compaction = 80_000; + + let tool_sampling_chunks = |idx: usize| { + vec![ + chunk(ev_response_created(&format!("resp-{idx}"))), + chunk(ev_shell_command_call( + &format!("call-{idx}"), + &format!("echo step-{idx}"), + )), + chunk(ev_completed_with_tokens( + &format!("resp-{idx}"), + token_count_used, + )), + ] + }; + let compact_chunks = |idx: usize| { + vec![ + chunk(ev_response_created(&format!("resp-compact-{idx}"))), + chunk(ev_message_item_done( + &format!("msg-compact-{idx}"), + &format!("AUTO_COMPACT_SUMMARY_{idx}"), + )), + chunk(ev_completed_with_tokens( + &format!("resp-compact-{idx}"), + token_count_used_after_compaction, + )), + ] + }; + let fourth_sampling_chunks = vec![ + chunk(ev_response_created("resp-4")), + chunk(ev_message_item_added("msg-4", "")), + chunk(ev_output_text_delta("ready for late steer")), + chunk(ev_message_item_done("msg-4", "ready for late steer")), + gated_chunk( + gate_fourth_completed_rx, + vec![ev_completed_with_tokens("resp-4", token_count_used)], + ), + ]; + let next_turn_chunks = vec![ + chunk(ev_response_created("resp-next")), + chunk(ev_message_item_done("msg-next", "processed next prompt")), + chunk(ev_completed_with_tokens( + "resp-next", + /*total_tokens*/ 70, + )), + ]; + + let mut responses = Vec::new(); + for idx in 1..=3 { + responses.push(tool_sampling_chunks(idx)); + responses.push(compact_chunks(idx)); + } + responses.extend([fourth_sampling_chunks, compact_chunks(4), next_turn_chunks]); + let (server, _completions) = start_streaming_sse_server(responses).await; + + let test = test_codex() + .with_model("gpt-5.4") + .with_config(|config| { + config.model_provider.name = "OpenAI (test)".to_string(); + config.model_provider.supports_websockets = false; + config.model_auto_compact_token_limit = Some(200_000); + }) + .build_with_streaming_server(&server) + .await + .unwrap_or_else(|err| panic!("build streaming Codex test session: {err}")); + let codex = test.codex.clone(); + + submit_danger_full_access_user_turn(&test, "first prompt").await; + wait_for_agent_message(&codex, "ready for late steer").await; + steer_user_input(&codex, "late steer").await; + let _ = gate_fourth_completed_tx.send(()); + + wait_for_turn_complete(&codex).await; + assert_eq!( + server.requests().await.len(), + 7, + "guarded turn should stop before dispatching a fourth automatic compaction" + ); + + submit_user_input(&codex, "next prompt").await; + wait_for_agent_message(&codex, "processed next prompt").await; + wait_for_turn_complete(&codex).await; + + let requests = server.requests().await; + assert_eq!( + requests.len(), + 9, + "next turn should compact and sample without replaying the discarded steer" + ); + + let all_user_texts: Vec = requests + .iter() + .flat_map(|request| { + let body: Value = + from_slice(request).unwrap_or_else(|err| panic!("parse request body: {err}")); + message_input_texts(&body, "user") + }) + .collect(); + assert!( + !all_user_texts.iter().any(|text| text == "late steer"), + "late steer should be discarded after the terminal compaction guard" + ); + assert!( + all_user_texts.iter().any(|text| text == "next prompt"), + "new input after the terminal guard should still start a normal turn" + ); + + server.shutdown().await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn steered_user_input_waits_when_tool_output_triggers_compact_before_next_request() { let (gate_first_completed_tx, gate_first_completed_rx) = oneshot::channel();