Propagate auto-compact errors with Result

This commit is contained in:
Ahmed Ibrahim
2026-02-06 10:21:59 -08:00
parent d839bfe680
commit 4b0e7ac4e7
4 changed files with 41 additions and 31 deletions

View File

@@ -3654,7 +3654,11 @@ pub(crate) async fn run_turn(
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, event).await;
if total_usage_tokens >= auto_compact_limit && !run_auto_compact(&sess, &turn_context).await {
if total_usage_tokens >= auto_compact_limit
&& let Err(e) = run_auto_compact(&sess, &turn_context).await
{
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
return None;
}
@@ -3837,7 +3841,9 @@ pub(crate) async fn run_turn(
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
if token_limit_reached && needs_follow_up {
if !run_auto_compact(&sess, &turn_context).await {
if let Err(e) = run_auto_compact(&sess, &turn_context).await {
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
break;
}
continue;
@@ -3897,12 +3903,16 @@ pub(crate) async fn run_turn(
last_agent_message
}
async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) -> bool {
async fn run_auto_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> crate::error::Result<()> {
if should_use_remote_compact_task(&turn_context.provider) {
run_inline_remote_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await
run_inline_remote_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await?;
} else {
run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await
run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await?;
}
Ok(())
}
fn filter_connectors_for_input(

View File

@@ -40,7 +40,7 @@ pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bo
pub(crate) async fn run_inline_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) -> bool {
) -> CodexResult<()> {
let prompt = turn_context.compact_prompt().to_string();
let input = vec![UserInput::Text {
text: prompt,
@@ -61,14 +61,17 @@ pub(crate) async fn run_compact_task(
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, start_event).await;
let _ = run_compact_task_inner(sess.clone(), turn_context, input).await;
if let Err(err) = run_compact_task_inner(sess.clone(), turn_context.clone(), input).await {
let event = EventMsg::Error(err.to_error_event(None));
sess.send_event(&turn_context, event).await;
}
}
async fn run_compact_task_inner(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) -> bool {
) -> CodexResult<()> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(&turn_context, &compaction_item)
.await;
@@ -142,9 +145,7 @@ async fn run_compact_task_inner(
}
break;
}
Err(CodexErr::Interrupted) => {
return false;
}
Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
Err(e @ CodexErr::ContextWindowExceeded) => {
if turn_input_len > 1 {
// Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact.
@@ -157,9 +158,7 @@ async fn run_compact_task_inner(
continue;
}
sess.set_total_tokens_full(turn_context.as_ref()).await;
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
return false;
return Err(e);
}
Err(e) => {
if retries < max_retries {
@@ -174,9 +173,7 @@ async fn run_compact_task_inner(
tokio::time::sleep(delay).await;
continue;
} else {
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
return false;
return Err(e);
}
}
}
@@ -211,7 +208,7 @@ async fn run_compact_task_inner(
message: "Heads up: Long threads and multiple compactions can cause the model to be less accurate. Start a new thread when possible to keep threads small and targeted.".to_string(),
});
sess.send_event(&turn_context, warning).await;
true
Ok(())
}
pub fn content_items_to_text(content: &[ContentItem]) -> Option<String> {

View File

@@ -19,7 +19,7 @@ use tracing::info;
pub(crate) async fn run_inline_remote_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) -> bool {
) -> CodexResult<()> {
run_remote_compact_task_inner(&sess, &turn_context).await
}
@@ -30,21 +30,20 @@ pub(crate) async fn run_remote_compact_task(sess: Arc<Session>, turn_context: Ar
});
sess.send_event(&turn_context, start_event).await;
let _ = run_remote_compact_task_inner(&sess, &turn_context).await;
if let Err(err) = run_remote_compact_task_inner(&sess, &turn_context).await {
let event = EventMsg::Error(
err.to_error_event(Some("Error running remote compact task".to_string())),
);
sess.send_event(&turn_context, event).await;
}
}
async fn run_remote_compact_task_inner(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> bool {
if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await {
let event = EventMsg::Error(
err.to_error_event(Some("Error running remote compact task".to_string())),
);
sess.send_event(turn_context, event).await;
return false;
}
true
) -> CodexResult<()> {
run_remote_compact_task_inner_impl(sess, turn_context).await?;
Ok(())
}
async fn run_remote_compact_task_inner_impl(

View File

@@ -2279,8 +2279,12 @@ async fn auto_compact_failure_stops_follow_up_sampling_loop() {
})
.await;
assert!(
error_message.contains("Error running remote compact task"),
"expected remote auto compact failure message, got: {error_message}"
!error_message.contains("Error running remote compact task"),
"expected propagated compact error instead of wrapper message, got: {error_message}"
);
assert!(
!error_message.is_empty(),
"expected non-empty compact error message"
);
wait_for_event(&codex, |msg| matches!(msg, EventMsg::TurnComplete(_))).await;