limit automatic compactions per turn

This commit is contained in:
Roy Han
2026-05-19 17:31:37 -07:00
parent 162a6e746b
commit 0bb3a2bbd5
4 changed files with 496 additions and 27 deletions

View File

@@ -114,6 +114,13 @@ use tracing::trace;
use tracing::trace_span;
use tracing::warn;
const MAX_AUTO_COMPACTIONS_PER_TURN: usize = 3;
pub(crate) enum RunTurnResult {
Continue(Option<String>),
Terminal,
}
/// Takes a user message as input and runs a loop where, at each sampling request, the model
/// replies with either:
///
@@ -132,33 +139,44 @@ pub(crate) async fn run_turn(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
turn_extension_data: Arc<codex_extension_api::ExtensionData>,
auto_compact_limiter: &mut AutoCompactTurnLimiter,
input: Vec<UserInput>,
prewarmed_client_session: Option<ModelClientSession>,
cancellation_token: CancellationToken,
) -> Option<String> {
) -> RunTurnResult {
let mut client_session =
prewarmed_client_session.unwrap_or_else(|| sess.services.model_client.new_session());
// TODO(ccunningham): Pre-turn compaction runs before context updates and the
// new user message are recorded. Estimate pending incoming items (context
// diffs/full reinjection + user input) and trigger compaction preemptively
// when they would push the thread over the compaction threshold.
let pre_sampling_compact =
match run_pre_sampling_compact(&sess, &turn_context, &mut client_session).await {
Ok(pre_sampling_compact) => pre_sampling_compact,
Err(err) => {
if err.to_codex_protocol_error() == CodexErrorInfo::UsageLimitExceeded
&& let Err(err) = sess
.goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached {
turn_context: turn_context.as_ref(),
})
.await
{
warn!("failed to usage-limit active goal after usage-limit error: {err}");
}
error!("Failed to run pre-sampling compact");
return None;
let pre_sampling_compact = match run_pre_sampling_compact(
&sess,
&turn_context,
&mut client_session,
auto_compact_limiter,
)
.await
{
Ok(pre_sampling_compact) => pre_sampling_compact,
Err(err) => {
if err.to_codex_protocol_error() == CodexErrorInfo::UsageLimitExceeded
&& let Err(err) = sess
.goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached {
turn_context: turn_context.as_ref(),
})
.await
{
warn!("failed to usage-limit active goal after usage-limit error: {err}");
}
};
error!("Failed to run pre-sampling compact");
return if err.to_codex_protocol_error() == CodexErrorInfo::ContextWindowExceeded {
RunTurnResult::Terminal
} else {
RunTurnResult::Continue(None)
};
}
};
if pre_sampling_compact.reset_client_session {
client_session.reset_websocket_session();
}
@@ -166,11 +184,14 @@ pub(crate) async fn run_turn(
sess.record_context_updates_and_set_reference_context_item(turn_context.as_ref())
.await;
let (injection_items, explicitly_enabled_connectors) =
build_skills_and_plugins(&sess, turn_context.as_ref(), &input, &cancellation_token).await?;
let Some((injection_items, explicitly_enabled_connectors)) =
build_skills_and_plugins(&sess, turn_context.as_ref(), &input, &cancellation_token).await
else {
return RunTurnResult::Continue(None);
};
if run_pending_session_start_hooks(&sess, &turn_context).await {
return None;
return RunTurnResult::Continue(None);
}
if !input.is_empty() {
let initial_turn_input = TurnInput::UserInput(input.clone());
@@ -183,7 +204,7 @@ pub(crate) async fn run_turn(
user_prompt_submit_outcome.additional_contexts,
)
.await;
return None;
return RunTurnResult::Continue(None);
}
record_pending_input(
&sess,
@@ -249,6 +270,7 @@ pub(crate) async fn run_turn(
let mut blocked_pending_input = false;
let mut accepted_pending_input = false;
for pending_input_item in pending_input {
let is_user_input = matches!(&pending_input_item, TurnInput::UserInput(_));
let hook_outcome =
inspect_pending_input(&sess, &turn_context, &pending_input_item).await;
if hook_outcome.should_stop {
@@ -264,6 +286,9 @@ pub(crate) async fn run_turn(
hook_outcome.additional_contexts,
)
.await;
if is_user_input {
auto_compact_limiter.reset_after_user_input();
}
}
}
if blocked_pending_input && !accepted_pending_input {
@@ -329,6 +354,7 @@ pub(crate) async fn run_turn(
&sess,
&turn_context,
&mut client_session,
auto_compact_limiter,
InitialContextInjection::BeforeLastUserMessage,
CompactionReason::ContextLimit,
CompactionPhase::MidTurn,
@@ -348,7 +374,13 @@ pub(crate) async fn run_turn(
"failed to usage-limit active goal after usage-limit error: {err}"
);
}
return None;
return if err.to_codex_protocol_error()
== CodexErrorInfo::ContextWindowExceeded
{
RunTurnResult::Terminal
} else {
RunTurnResult::Continue(None)
};
}
};
if reset_client_session {
@@ -399,7 +431,7 @@ pub(crate) async fn run_turn(
)
.await
{
return None;
return RunTurnResult::Continue(None);
}
break;
}
@@ -447,7 +479,7 @@ pub(crate) async fn run_turn(
}
}
last_agent_message
RunTurnResult::Continue(last_agent_message)
}
#[expect(
@@ -652,6 +684,56 @@ struct AutoCompactTokenStatus {
token_limit_reached: bool,
}
#[derive(Default)]
pub(crate) struct AutoCompactTurnLimiter {
attempts: usize,
}
impl AutoCompactTurnLimiter {
fn reset_after_user_input(&mut self) {
self.attempts = 0;
}
async fn acquire(
&mut self,
sess: &Session,
turn_context: &TurnContext,
reason: CompactionReason,
phase: CompactionPhase,
) -> CodexResult<()> {
if self.attempts >= MAX_AUTO_COMPACTIONS_PER_TURN {
let token_status = auto_compact_token_status(sess, turn_context).await;
warn!(
turn_id = %turn_context.sub_id,
auto_compactions_attempted = self.attempts,
auto_compaction_limit = MAX_AUTO_COMPACTIONS_PER_TURN,
reason = ?reason,
phase = ?phase,
active_context_tokens = token_status.active_context_tokens,
auto_compact_scope_tokens = token_status.auto_compact_scope_tokens,
auto_compact_scope_limit = token_status.auto_compact_scope_limit,
full_context_window_limit = ?token_status.full_context_window_limit,
full_context_window_limit_reached = token_status.full_context_window_limit_reached,
"automatic compaction limit reached; stopping turn"
);
sess.send_event(
turn_context,
EventMsg::Error(ErrorEvent {
message: format!(
"Codex stopped after {MAX_AUTO_COMPACTIONS_PER_TURN} automatic compactions in this turn because the context remained too large. Queued input for this turn was not processed; start a new thread or reduce earlier history before retrying."
),
codex_error_info: Some(CodexErrorInfo::ContextWindowExceeded),
}),
)
.await;
return Err(CodexErr::ContextWindowExceeded);
}
self.attempts += 1;
Ok(())
}
}
async fn auto_compact_token_status(
sess: &Session,
turn_context: &TurnContext,
@@ -708,9 +790,15 @@ async fn run_pre_sampling_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
client_session: &mut ModelClientSession,
auto_compact_limiter: &mut AutoCompactTurnLimiter,
) -> CodexResult<PreSamplingCompactResult> {
let mut pre_sampling_compacted =
maybe_run_previous_model_inline_compact(sess, turn_context, client_session).await?;
let mut pre_sampling_compacted = maybe_run_previous_model_inline_compact(
sess,
turn_context,
client_session,
auto_compact_limiter,
)
.await?;
let mut reset_client_session = pre_sampling_compacted;
let token_status = auto_compact_token_status(sess.as_ref(), turn_context.as_ref()).await;
// Compact if the configured auto-compaction budget or usable context window is exhausted.
@@ -719,6 +807,7 @@ async fn run_pre_sampling_compact(
sess,
turn_context,
client_session,
auto_compact_limiter,
InitialContextInjection::DoNotInject,
CompactionReason::ContextLimit,
CompactionPhase::PreTurn,
@@ -741,6 +830,7 @@ async fn maybe_run_previous_model_inline_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
client_session: &mut ModelClientSession,
auto_compact_limiter: &mut AutoCompactTurnLimiter,
) -> CodexResult<bool> {
let Some(previous_turn_settings) = sess.previous_turn_settings().await else {
return Ok(false);
@@ -780,6 +870,7 @@ async fn maybe_run_previous_model_inline_compact(
sess,
&previous_model_turn_context,
client_session,
auto_compact_limiter,
InitialContextInjection::DoNotInject,
CompactionReason::ModelDownshift,
CompactionPhase::PreTurn,
@@ -794,10 +885,15 @@ async fn run_auto_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
client_session: &mut ModelClientSession,
auto_compact_limiter: &mut AutoCompactTurnLimiter,
initial_context_injection: InitialContextInjection,
reason: CompactionReason,
phase: CompactionPhase,
) -> CodexResult<bool> {
auto_compact_limiter
.acquire(sess.as_ref(), turn_context.as_ref(), reason, phase)
.await?;
if should_use_remote_compact_task(turn_context.provider.info()) {
if turn_context.features.enabled(Feature::RemoteCompactionV2) {
run_inline_remote_auto_compact_task_v2(

View File

@@ -2,6 +2,8 @@ use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use crate::session::turn::AutoCompactTurnLimiter;
use crate::session::turn::RunTurnResult;
use crate::session::turn::run_turn;
use crate::session::turn_context::TurnContext;
use crate::session_startup_prewarm::SessionStartupPrewarmResolution;
@@ -69,17 +71,42 @@ impl SessionTask for RegularTask {
};
let mut next_input = input;
let mut prewarmed_client_session = prewarmed_client_session;
let mut auto_compact_limiter = AutoCompactTurnLimiter::default();
loop {
let last_agent_message = run_turn(
let turn_result = run_turn(
Arc::clone(&sess),
Arc::clone(&ctx),
Arc::clone(&turn_extension_data),
&mut auto_compact_limiter,
next_input,
prewarmed_client_session.take(),
cancellation_token.child_token(),
)
.instrument(run_turn_span.clone())
.await;
let (last_agent_message, can_restart_with_pending_input) = match turn_result {
RunTurnResult::Continue(last_agent_message) => (last_agent_message, true),
RunTurnResult::Terminal => {
let turn_state = {
let active_turn = sess.active_turn.lock().await;
active_turn
.as_ref()
.map(|active_turn| Arc::clone(&active_turn.turn_state))
};
if let Some(turn_state) = turn_state {
turn_state.lock().await.clear_pending_waiters();
drop(
sess.input_queue
.take_pending_input_for_turn_state(turn_state.as_ref())
.await,
);
}
(None, false)
}
};
if !can_restart_with_pending_input {
return last_agent_message;
}
if !sess.input_queue.has_pending_input(&sess.active_turn).await {
return last_agent_message;
}

View File

@@ -13,6 +13,7 @@ use codex_protocol::models::PermissionProfile;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ModelsResponse;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::HookEventName;
use codex_protocol::protocol::HookRunStatus;
@@ -1490,6 +1491,119 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() {
assert_eq!(requests_payloads.len(), 7);
}
// Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts.
#[cfg_attr(windows, tokio::test(flavor = "multi_thread", worker_threads = 4))]
#[cfg_attr(not(windows), tokio::test(flavor = "multi_thread", worker_threads = 2))]
async fn auto_compact_stops_after_per_turn_limit() {
skip_if_no_network!();
let server = start_mock_server().await;
let token_count_used = 270_000;
let token_count_used_after_compaction = 80_000;
let sampling_response = |idx: usize| {
let reasoning = ev_reasoning_item(
&format!("limit-reasoning-{idx}"),
&[&format!("step {idx}")],
&[],
);
sse(vec![
reasoning,
ev_shell_command_call(
&format!("limit-shell-{idx}"),
&format!("echo auto-compact-limit-{idx}"),
),
ev_completed_with_tokens(&format!("limit-response-{idx}"), token_count_used),
])
};
let compact_response = |idx: usize| {
sse(vec![
ev_assistant_message(
&format!("limit-compact-message-{idx}"),
&format!("AUTO_LIMIT_SUMMARY_{idx}"),
),
ev_completed_with_tokens(
&format!("limit-compact-response-{idx}"),
token_count_used_after_compaction,
),
])
};
let request_log = mount_sse_sequence(
&server,
vec![
sampling_response(1),
compact_response(1),
sampling_response(2),
compact_response(2),
sampling_response(3),
compact_response(3),
sampling_response(4),
],
)
.await;
let model_provider = non_openai_model_provider(&server);
let codex = test_codex()
.with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
config.model_auto_compact_token_limit = Some(200_000);
})
.build(&server)
.await
.expect("build codex")
.codex;
codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "trigger repeated auto compaction".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
thread_settings: Default::default(),
})
.await
.expect("submit user input");
let error = wait_for_event_match(&codex, |event| match event {
EventMsg::Error(err)
if err
.message
.contains("Codex stopped after 3 automatic compactions in this turn") =>
{
Some(err.clone())
}
_ => None,
})
.await;
assert_eq!(
error.codex_error_info,
Some(CodexErrorInfo::ContextWindowExceeded)
);
let requests = request_log.requests();
let compaction_requests = requests
.iter()
.filter(|request| {
body_contains_text(&request.body_json().to_string(), SUMMARIZATION_PROMPT)
})
.count();
assert_eq!(
compaction_requests, 3,
"fourth automatic compaction should be blocked before dispatch"
);
assert_eq!(
requests.len(),
7,
"expected four sampling requests and three dispatched compaction requests"
);
}
// Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts.
#[cfg_attr(windows, tokio::test(flavor = "multi_thread", worker_threads = 4))]
#[cfg_attr(not(windows), tokio::test(flavor = "multi_thread", worker_threads = 2))]

View File

@@ -20,6 +20,7 @@ use core_test_support::responses::ev_output_text_delta;
use core_test_support::responses::ev_reasoning_item;
use core_test_support::responses::ev_reasoning_item_added;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::ev_shell_command_call;
use core_test_support::streaming_sse::StreamingSseChunk;
use core_test_support::streaming_sse::StreamingSseServer;
use core_test_support::streaming_sse::start_streaming_sse_server;
@@ -679,6 +680,237 @@ async fn steered_user_input_follows_compact_when_only_the_steer_needs_follow_up(
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn steered_user_input_resets_auto_compact_limit() {
let (gate_first_completed_tx, gate_first_completed_rx) = oneshot::channel();
let (gate_second_completed_tx, gate_second_completed_rx) = oneshot::channel();
let (gate_third_completed_tx, gate_third_completed_rx) = oneshot::channel();
let (gate_fourth_completed_tx, gate_fourth_completed_rx) = oneshot::channel();
let sampling_chunks = |idx: usize, text: &'static str, gate| {
vec![
chunk(ev_response_created(&format!("resp-{idx}"))),
chunk(ev_message_item_added(&format!("msg-{idx}"), "")),
chunk(ev_output_text_delta(text)),
chunk(json!({
"type": "response.output_item.done",
"item": {
"type": "message",
"role": "assistant",
"id": format!("msg-{idx}"),
"content": [{"type": "output_text", "text": text}],
"phase": "commentary",
}
})),
gated_chunk(
gate,
vec![ev_completed_with_tokens(
&format!("resp-{idx}"),
/*total_tokens*/ 500,
)],
),
]
};
let compact_chunks = |idx: usize| {
vec![
chunk(ev_response_created(&format!("resp-compact-{idx}"))),
chunk(ev_message_item_done(
&format!("msg-compact-{idx}"),
&format!("AUTO_COMPACT_SUMMARY_{idx}"),
)),
chunk(ev_completed_with_tokens(
&format!("resp-compact-{idx}"),
/*total_tokens*/ 50,
)),
]
};
let final_chunks = vec![
chunk(ev_response_created("resp-final")),
chunk(ev_message_item_done("msg-final", "processed fifth prompt")),
chunk(ev_completed_with_tokens(
"resp-final",
/*total_tokens*/ 70,
)),
];
let (server, _completions) = start_streaming_sse_server(vec![
sampling_chunks(1, "first answer", gate_first_completed_rx),
compact_chunks(1),
sampling_chunks(2, "processed second prompt", gate_second_completed_rx),
compact_chunks(2),
sampling_chunks(3, "processed third prompt", gate_third_completed_rx),
compact_chunks(3),
sampling_chunks(4, "processed fourth prompt", gate_fourth_completed_rx),
compact_chunks(4),
final_chunks,
])
.await;
let codex = test_codex()
.with_model("gpt-5.4")
.with_config(|config| {
config.model_provider.name = "OpenAI (test)".to_string();
config.model_provider.supports_websockets = false;
config.model_auto_compact_token_limit = Some(200);
})
.build_with_streaming_server(&server)
.await
.unwrap_or_else(|err| panic!("build streaming Codex test session: {err}"))
.codex;
submit_user_input(&codex, "first prompt").await;
wait_for_agent_message(&codex, "first answer").await;
steer_user_input(&codex, "second prompt").await;
let _ = gate_first_completed_tx.send(());
wait_for_agent_message(&codex, "processed second prompt").await;
steer_user_input(&codex, "third prompt").await;
let _ = gate_second_completed_tx.send(());
wait_for_agent_message(&codex, "processed third prompt").await;
steer_user_input(&codex, "fourth prompt").await;
let _ = gate_third_completed_tx.send(());
wait_for_agent_message(&codex, "processed fourth prompt").await;
steer_user_input(&codex, "fifth prompt").await;
let _ = gate_fourth_completed_tx.send(());
wait_for_agent_message(&codex, "processed fifth prompt").await;
wait_for_turn_complete(&codex).await;
let requests = server.requests().await;
assert_eq!(
requests.len(),
9,
"four auto compactions should be allowed when separated by accepted steered input"
);
let final_body: Value =
from_slice(&requests[8]).unwrap_or_else(|err| panic!("parse final request: {err}"));
let final_user_texts = message_input_texts(&final_body, "user");
assert!(
final_user_texts.iter().any(|text| text == "fifth prompt"),
"steered input after the fourth compaction should be processed"
);
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn steered_user_input_is_discarded_when_auto_compact_guard_fires() {
let (gate_fourth_completed_tx, gate_fourth_completed_rx) = oneshot::channel();
let token_count_used = 270_000;
let token_count_used_after_compaction = 80_000;
let tool_sampling_chunks = |idx: usize| {
vec![
chunk(ev_response_created(&format!("resp-{idx}"))),
chunk(ev_shell_command_call(
&format!("call-{idx}"),
&format!("echo step-{idx}"),
)),
chunk(ev_completed_with_tokens(
&format!("resp-{idx}"),
token_count_used,
)),
]
};
let compact_chunks = |idx: usize| {
vec![
chunk(ev_response_created(&format!("resp-compact-{idx}"))),
chunk(ev_message_item_done(
&format!("msg-compact-{idx}"),
&format!("AUTO_COMPACT_SUMMARY_{idx}"),
)),
chunk(ev_completed_with_tokens(
&format!("resp-compact-{idx}"),
token_count_used_after_compaction,
)),
]
};
let fourth_sampling_chunks = vec![
chunk(ev_response_created("resp-4")),
chunk(ev_message_item_added("msg-4", "")),
chunk(ev_output_text_delta("ready for late steer")),
chunk(ev_message_item_done("msg-4", "ready for late steer")),
gated_chunk(
gate_fourth_completed_rx,
vec![ev_completed_with_tokens("resp-4", token_count_used)],
),
];
let next_turn_chunks = vec![
chunk(ev_response_created("resp-next")),
chunk(ev_message_item_done("msg-next", "processed next prompt")),
chunk(ev_completed_with_tokens(
"resp-next",
/*total_tokens*/ 70,
)),
];
let mut responses = Vec::new();
for idx in 1..=3 {
responses.push(tool_sampling_chunks(idx));
responses.push(compact_chunks(idx));
}
responses.extend([fourth_sampling_chunks, compact_chunks(4), next_turn_chunks]);
let (server, _completions) = start_streaming_sse_server(responses).await;
let test = test_codex()
.with_model("gpt-5.4")
.with_config(|config| {
config.model_provider.name = "OpenAI (test)".to_string();
config.model_provider.supports_websockets = false;
config.model_auto_compact_token_limit = Some(200_000);
})
.build_with_streaming_server(&server)
.await
.unwrap_or_else(|err| panic!("build streaming Codex test session: {err}"));
let codex = test.codex.clone();
submit_danger_full_access_user_turn(&test, "first prompt").await;
wait_for_agent_message(&codex, "ready for late steer").await;
steer_user_input(&codex, "late steer").await;
let _ = gate_fourth_completed_tx.send(());
wait_for_turn_complete(&codex).await;
assert_eq!(
server.requests().await.len(),
7,
"guarded turn should stop before dispatching a fourth automatic compaction"
);
submit_user_input(&codex, "next prompt").await;
wait_for_agent_message(&codex, "processed next prompt").await;
wait_for_turn_complete(&codex).await;
let requests = server.requests().await;
assert_eq!(
requests.len(),
9,
"next turn should compact and sample without replaying the discarded steer"
);
let all_user_texts: Vec<String> = requests
.iter()
.flat_map(|request| {
let body: Value =
from_slice(request).unwrap_or_else(|err| panic!("parse request body: {err}"));
message_input_texts(&body, "user")
})
.collect();
assert!(
!all_user_texts.iter().any(|text| text == "late steer"),
"late steer should be discarded after the terminal compaction guard"
);
assert!(
all_user_texts.iter().any(|text| text == "next prompt"),
"new input after the terminal guard should still start a normal turn"
);
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn steered_user_input_waits_when_tool_output_triggers_compact_before_next_request() {
let (gate_first_completed_tx, gate_first_completed_rx) = oneshot::channel();