Propagate inline auto-compact failures and simplify local compaction

This commit is contained in:
Charles Cunningham
2026-02-06 12:32:02 -08:00
parent fbaee85aa5
commit 14bb780e86
4 changed files with 92 additions and 16 deletions

View File

@@ -3908,8 +3908,7 @@ async fn run_auto_compact(
if should_use_remote_compact_task(&turn_context.provider) {
run_remote_compaction(Arc::clone(sess), Arc::clone(turn_context)).await
} else {
run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await;
Ok(())
run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await
}
}

View File

@@ -40,7 +40,7 @@ pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bo
pub(crate) async fn run_inline_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) {
) -> CodexResult<()> {
let prompt = turn_context.compact_prompt().to_string();
let input = vec![UserInput::Text {
text: prompt,
@@ -48,10 +48,10 @@ pub(crate) async fn run_inline_auto_compact_task(
text_elements: Vec::new(),
}];
run_compact_task_inner(sess, turn_context, input).await;
run_local_compaction(sess, turn_context, input).await
}
pub(crate) async fn run_compact_task(
pub(crate) async fn run_user_requested_local_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
@@ -61,14 +61,17 @@ pub(crate) async fn run_compact_task(
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, start_event).await;
run_compact_task_inner(sess.clone(), turn_context, input).await;
if let Err(err) = run_local_compaction(sess.clone(), turn_context.clone(), input).await {
let event = EventMsg::Error(err.to_error_event(None));
sess.send_event(&turn_context, event).await;
}
}
async fn run_compact_task_inner(
async fn run_local_compaction(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) {
) -> CodexResult<()> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(&turn_context, &compaction_item)
.await;
@@ -143,7 +146,7 @@ async fn run_compact_task_inner(
break;
}
Err(CodexErr::Interrupted) => {
return;
return Ok(());
}
Err(e @ CodexErr::ContextWindowExceeded) => {
if turn_input_len > 1 {
@@ -157,9 +160,7 @@ async fn run_compact_task_inner(
continue;
}
sess.set_total_tokens_full(turn_context.as_ref()).await;
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
return;
return Err(e);
}
Err(e) => {
if retries < max_retries {
@@ -174,9 +175,7 @@ async fn run_compact_task_inner(
tokio::time::sleep(delay).await;
continue;
} else {
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
return;
return Err(e);
}
}
}
@@ -211,6 +210,7 @@ async fn run_compact_task_inner(
message: "Heads up: Long threads and multiple compactions can cause the model to be less accurate. Start a new thread when possible to keep threads small and targeted.".to_string(),
});
sess.send_event(&turn_context, warning).await;
Ok(())
}
pub fn content_items_to_text(content: &[ContentItem]) -> Option<String> {

View File

@@ -38,7 +38,7 @@ impl SessionTask for CompactTask {
1,
&[("type", "local")],
);
crate::compact::run_compact_task(session, ctx, input).await
crate::compact::run_user_requested_local_compact_task(session, ctx, input).await
}
None

View File

@@ -30,6 +30,7 @@ use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_completed_with_tokens;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::mount_compact_json_once;
use core_test_support::responses::mount_response_once_match;
use core_test_support::responses::mount_response_sequence;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_once_match;
@@ -41,6 +42,8 @@ use core_test_support::responses::start_mock_server;
use pretty_assertions::assert_eq;
use serde_json::json;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::body_string_contains;
// --- Test helpers -----------------------------------------------------------
pub(super) const FIRST_REPLY: &str = "FIRST_REPLY";
@@ -1433,6 +1436,80 @@ async fn auto_compact_starts_after_turn_started() {
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_compact_failure_aborts_turn_before_sampling_for_local_provider() {
skip_if_no_network!();
let server = start_mock_server().await;
let second_turn_text = "this turn should fail before sampling";
let first_turn = sse(vec![
ev_assistant_message("m1", FIRST_REPLY),
ev_completed_with_tokens("r1", 1_000_000),
]);
let first_turn_mock = mount_sse_once(&server, first_turn).await;
let compact_failure_mock = mount_response_once_match(
&server,
body_string_contains(SUMMARIZATION_PROMPT),
ResponseTemplate::new(500)
.insert_header("content-type", "application/json")
.set_body_json(json!({
"error": {"type": "internal_server_error", "message": "synthetic compaction failure"}
})),
)
.await;
let model_provider = non_openai_model_provider(&server);
let mut builder = test_codex().with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
config.model_auto_compact_token_limit = Some(200);
config.model_provider.stream_max_retries = Some(0);
});
let codex = builder.build(&server).await.unwrap().codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "seed turn".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: second_turn_text.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
let EventMsg::Error(error) =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::Error(_))).await
else {
panic!("expected error event");
};
assert!(
error.message.contains("Error running auto compact task"),
"expected auto-compact failure prefix in error message: {}",
error.message
);
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(first_turn_mock.requests().len(), 1);
// The compact mock receives any unmatched `/responses` calls. Keeping this at 1
// verifies we only attempted the compact request and never proceeded to sampling.
assert_eq!(compact_failure_mock.requests().len(), 1);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_compact_runs_after_resume_when_token_usage_is_over_limit() {
skip_if_no_network!();