Compare commits

...

2 Commits

Author SHA1 Message Date
Ahmed Ibrahim
4b0e7ac4e7 Propagate auto-compact errors with Result 2026-02-06 10:21:59 -08:00
Ahmed Ibrahim
d839bfe680 Stop auto-compact loop on compact failure 2026-02-06 10:14:14 -08:00
5 changed files with 115 additions and 27 deletions

View File

@@ -3654,8 +3654,12 @@ pub(crate) async fn run_turn(
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, event).await;
if total_usage_tokens >= auto_compact_limit {
run_auto_compact(&sess, &turn_context).await;
if total_usage_tokens >= auto_compact_limit
&& let Err(e) = run_auto_compact(&sess, &turn_context).await
{
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
return None;
}
let skills_outcome = Some(
@@ -3837,7 +3841,11 @@ pub(crate) async fn run_turn(
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
if token_limit_reached && needs_follow_up {
run_auto_compact(&sess, &turn_context).await;
if let Err(e) = run_auto_compact(&sess, &turn_context).await {
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
break;
}
continue;
}
@@ -3895,12 +3903,16 @@ pub(crate) async fn run_turn(
last_agent_message
}
async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) {
async fn run_auto_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> crate::error::Result<()> {
if should_use_remote_compact_task(&turn_context.provider) {
run_inline_remote_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await;
run_inline_remote_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await?;
} else {
run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await;
run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await?;
}
Ok(())
}
fn filter_connectors_for_input(

View File

@@ -40,7 +40,7 @@ pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bo
pub(crate) async fn run_inline_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) {
) -> CodexResult<()> {
let prompt = turn_context.compact_prompt().to_string();
let input = vec![UserInput::Text {
text: prompt,
@@ -48,7 +48,7 @@ pub(crate) async fn run_inline_auto_compact_task(
text_elements: Vec::new(),
}];
run_compact_task_inner(sess, turn_context, input).await;
run_compact_task_inner(sess, turn_context, input).await
}
pub(crate) async fn run_compact_task(
@@ -61,14 +61,17 @@ pub(crate) async fn run_compact_task(
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, start_event).await;
run_compact_task_inner(sess.clone(), turn_context, input).await;
if let Err(err) = run_compact_task_inner(sess.clone(), turn_context.clone(), input).await {
let event = EventMsg::Error(err.to_error_event(None));
sess.send_event(&turn_context, event).await;
}
}
async fn run_compact_task_inner(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) {
) -> CodexResult<()> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(&turn_context, &compaction_item)
.await;
@@ -142,9 +145,7 @@ async fn run_compact_task_inner(
}
break;
}
Err(CodexErr::Interrupted) => {
return;
}
Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
Err(e @ CodexErr::ContextWindowExceeded) => {
if turn_input_len > 1 {
// Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact.
@@ -157,9 +158,7 @@ async fn run_compact_task_inner(
continue;
}
sess.set_total_tokens_full(turn_context.as_ref()).await;
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
return;
return Err(e);
}
Err(e) => {
if retries < max_retries {
@@ -174,9 +173,7 @@ async fn run_compact_task_inner(
tokio::time::sleep(delay).await;
continue;
} else {
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
return;
return Err(e);
}
}
}
@@ -211,6 +208,7 @@ async fn run_compact_task_inner(
message: "Heads up: Long threads and multiple compactions can cause the model to be less accurate. Start a new thread when possible to keep threads small and targeted.".to_string(),
});
sess.send_event(&turn_context, warning).await;
Ok(())
}
pub fn content_items_to_text(content: &[ContentItem]) -> Option<String> {

View File

@@ -19,8 +19,8 @@ use tracing::info;
pub(crate) async fn run_inline_remote_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) {
run_remote_compact_task_inner(&sess, &turn_context).await;
) -> CodexResult<()> {
run_remote_compact_task_inner(&sess, &turn_context).await
}
pub(crate) async fn run_remote_compact_task(sess: Arc<Session>, turn_context: Arc<TurnContext>) {
@@ -30,18 +30,22 @@ pub(crate) async fn run_remote_compact_task(sess: Arc<Session>, turn_context: Ar
});
sess.send_event(&turn_context, start_event).await;
run_remote_compact_task_inner(&sess, &turn_context).await;
}
async fn run_remote_compact_task_inner(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) {
if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await {
if let Err(err) = run_remote_compact_task_inner(&sess, &turn_context).await {
let event = EventMsg::Error(
err.to_error_event(Some("Error running remote compact task".to_string())),
);
sess.send_event(turn_context, event).await;
sess.send_event(&turn_context, event).await;
}
}
async fn run_remote_compact_task_inner(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> CodexResult<()> {
run_remote_compact_task_inner_impl(sess, turn_context).await?;
Ok(())
}
async fn run_remote_compact_task_inner_impl(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,

View File

@@ -2226,6 +2226,79 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() {
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_compact_failure_stops_follow_up_sampling_loop() {
skip_if_no_network!();
let server = start_mock_server().await;
let context_window = 100;
let limit = context_window * 90 / 100;
let over_limit_tokens = context_window * 95 / 100 + 1;
let first_turn = sse(vec![
ev_function_call(DUMMY_CALL_ID, DUMMY_FUNCTION_NAME, "{}"),
ev_completed_with_tokens("r1", over_limit_tokens),
]);
let follow_up_turn = sse(vec![
ev_assistant_message("m2", "SHOULD_NOT_BE_REQUESTED"),
ev_completed_with_tokens("r2", 1),
]);
mount_sse_once(&server, first_turn).await;
let follow_up_mock = mount_sse_once(&server, follow_up_turn).await;
let compact_mock =
mount_compact_json_once(&server, serde_json::json!({ "bad": "shape" })).await;
let codex = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(move |config| {
set_test_compact_prompt(config);
config.model_context_window = Some(context_window);
config.model_auto_compact_token_limit = Some(limit);
})
.build(&server)
.await
.expect("build codex")
.codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: FUNCTION_CALL_LIMIT_MSG.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
let error_message = wait_for_event_match(&codex, |msg| match msg {
EventMsg::Error(err) => Some(err.message.clone()),
_ => None,
})
.await;
assert!(
!error_message.contains("Error running remote compact task"),
"expected propagated compact error instead of wrapper message, got: {error_message}"
);
assert!(
!error_message.is_empty(),
"expected non-empty compact error message"
);
wait_for_event(&codex, |msg| matches!(msg, EventMsg::TurnComplete(_))).await;
assert_eq!(
compact_mock.requests().len(),
1,
"auto compact should run once before failing"
);
assert!(
follow_up_mock.requests().is_empty(),
"turn should stop after auto compact failure and avoid follow-up sampling"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
skip_if_no_network!();

View File

@@ -245,6 +245,7 @@ async fn remote_compact_trims_function_call_history_to_fit_context_window() -> R
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_context_window = Some(2_000);
config.model_auto_compact_token_limit = Some(i64::MAX);
}),
)
.await?;