Fail agent loop eagerly on auto compaction failure

This commit is contained in:
Charles Cunningham
2026-02-06 12:08:16 -08:00
parent 36c16e0c58
commit fbaee85aa5
5 changed files with 241 additions and 24 deletions

View File

@@ -17,7 +17,7 @@ use crate::analytics_client::build_track_events_context;
use crate::compact;
use crate::compact::run_inline_auto_compact_task;
use crate::compact::should_use_remote_compact_task;
use crate::compact_remote::run_inline_remote_auto_compact_task;
use crate::compact_remote::run_remote_compaction;
use crate::connectors;
use crate::exec_policy::ExecPolicyManager;
use crate::features::FEATURES;
@@ -3654,8 +3654,11 @@ pub(crate) async fn run_turn(
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, event).await;
if total_usage_tokens >= auto_compact_limit {
run_auto_compact(&sess, &turn_context).await;
if total_usage_tokens >= auto_compact_limit
&& let Err(err) = run_auto_compact(&sess, &turn_context).await
{
emit_auto_compact_error(&sess, &turn_context, err).await;
return None;
}
let skills_outcome = Some(
@@ -3837,7 +3840,10 @@ pub(crate) async fn run_turn(
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
if token_limit_reached && needs_follow_up {
run_auto_compact(&sess, &turn_context).await;
if let Err(err) = run_auto_compact(&sess, &turn_context).await {
emit_auto_compact_error(&sess, &turn_context, err).await;
break;
}
continue;
}
@@ -3895,14 +3901,28 @@ pub(crate) async fn run_turn(
last_agent_message
}
async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) {
async fn run_auto_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> crate::error::Result<()> {
if should_use_remote_compact_task(&turn_context.provider) {
run_inline_remote_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await;
run_remote_compaction(Arc::clone(sess), Arc::clone(turn_context)).await
} else {
run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await;
Ok(())
}
}
async fn emit_auto_compact_error(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
err: CodexErr,
) {
let event =
EventMsg::Error(err.to_error_event(Some("Error running auto compact task".to_string())));
sess.send_event(turn_context, event).await;
}
fn filter_connectors_for_input(
connectors: Vec<connectors::AppInfo>,
input: &[ResponseItem],

View File

@@ -16,38 +16,32 @@ use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ResponseItem;
use tracing::info;
pub(crate) async fn run_inline_remote_auto_compact_task(
pub(crate) async fn run_user_requested_remote_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) {
run_remote_compact_task_inner(&sess, &turn_context).await;
}
pub(crate) async fn run_remote_compact_task(sess: Arc<Session>, turn_context: Arc<TurnContext>) {
let start_event = EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: turn_context.model_context_window(),
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, start_event).await;
run_remote_compact_task_inner(&sess, &turn_context).await;
}
async fn run_remote_compact_task_inner(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) {
if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await {
if let Err(err) = run_remote_compaction(Arc::clone(&sess), Arc::clone(&turn_context)).await {
let event = EventMsg::Error(
err.to_error_event(Some("Error running remote compact task".to_string())),
);
sess.send_event(turn_context, event).await;
// This path is for manual `/compact`, where we report the error and keep the session
// alive. Auto-compact callers handle the error themselves to fail early.
sess.send_event(&turn_context, event).await;
}
}
async fn run_remote_compact_task_inner_impl(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
pub(crate) async fn run_remote_compaction(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) -> CodexResult<()> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(turn_context, &compaction_item)
sess.emit_turn_item_started(&turn_context, &compaction_item)
.await;
let mut history = sess.clone_history().await;
let base_instructions = sess.get_base_instructions().await;
@@ -95,7 +89,7 @@ async fn run_remote_compact_task_inner_impl(
new_history.extend(ghost_snapshots);
}
sess.replace_history(new_history.clone()).await;
sess.recompute_token_usage(turn_context).await;
sess.recompute_token_usage(&turn_context).await;
let compacted_item = CompactedItem {
message: String::new(),
@@ -104,7 +98,7 @@ async fn run_remote_compact_task_inner_impl(
sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)])
.await;
sess.emit_turn_item_completed(turn_context, compaction_item)
sess.emit_turn_item_completed(&turn_context, compaction_item)
.await;
Ok(())
}

View File

@@ -31,7 +31,7 @@ impl SessionTask for CompactTask {
1,
&[("type", "remote")],
);
crate::compact_remote::run_remote_compact_task(session, ctx).await
crate::compact_remote::run_user_requested_remote_compact_task(session, ctx).await
} else {
let _ = session.services.otel_manager.counter(
"codex.task.compact",

View File

@@ -788,6 +788,19 @@ pub async fn mount_compact_json_once(server: &MockServer, body: serde_json::Valu
response_mock
}
// Mounts the compact-endpoint mock on `server` with an arbitrary caller-built
// `ResponseTemplate`, serving it at most `max_times` times. Returns the
// `ResponseMock` handle so tests can inspect the requests that were received.
pub async fn mount_compact_response(
    server: &MockServer,
    response: ResponseTemplate,
    max_times: u64,
) -> ResponseMock {
    let (mock, handle) = compact_mock();
    mock.respond_with(response)
        .up_to_n_times(max_times)
        .mount(server)
        .await;
    handle
}
pub async fn mount_models_once(server: &MockServer, body: ModelsResponse) -> ModelsMock {
let (mock, models_mock) = models_mock();
mock.respond_with(

View File

@@ -23,6 +23,8 @@ use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
use wiremock::ResponseTemplate;
use wiremock::matchers::body_string_contains;
fn approx_token_count(text: &str) -> i64 {
i64::try_from(text.len().saturating_add(3) / 4).unwrap_or(i64::MAX)
@@ -228,6 +230,194 @@ async fn remote_compact_runs_automatically() -> Result<()> {
Ok(())
}
// Forces the remote compact endpoint to return HTTP 500 and verifies that the
// turn fails eagerly: an `EventMsg::Error` with the "Error running auto compact
// task" prefix is emitted, the follow-up sampling request never fires, and the
// retryable 500 consumes exactly the configured retry budget (initial attempt
// plus `compact_retries` retries).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_auto_compact_500_fails_turn_before_followup_sampling() -> Result<()> {
// wiremock-backed test; skipped when no network is available.
skip_if_no_network!(Ok(()));
let compact_retries = 2;
let second_turn_text = "this turn should fail before sampling";
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(move |config| {
// Tiny limit so the 1_000_000-token first response (below) trips
// auto compaction before the second turn samples.
config.model_auto_compact_token_limit = Some(200);
config.model_provider.request_max_retries = Some(compact_retries);
config.model_provider.supports_websockets = false;
}),
)
.await?;
let codex = harness.test().codex.clone();
// First turn's sampling response reports token usage far above the
// 200-token auto-compact limit.
let initial_sampling = mount_sse_once(
harness.server(),
sse(vec![
responses::ev_assistant_message("m1", "initial"),
responses::ev_completed_with_tokens("resp-1", 1_000_000),
]),
)
.await;
// Mounted only to prove it is never hit: matches the second turn's body text.
let follow_up_sampling = responses::mount_sse_once_match(
harness.server(),
body_string_contains(second_turn_text),
sse(vec![
responses::ev_assistant_message("m2", "should not run"),
responses::ev_completed("resp-2"),
]),
)
.await;
// Compact endpoint always answers with a synthetic 500; capacity is
// compact_retries + 1 so every allowed attempt is served.
let compact_failure = responses::mount_compact_response(
harness.server(),
ResponseTemplate::new(500)
.insert_header("content-type", "application/json")
.set_body_json(serde_json::json!({
"error": {"type": "internal_server_error", "message": "synthetic 500"}
})),
compact_retries + 1,
)
.await;
// Seed turn: completes normally but leaves token usage above the limit.
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "seed turn".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
// Second turn: auto compaction runs first and fails with the mocked 500.
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: second_turn_text.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
let EventMsg::Error(error) =
wait_for_event(&codex, |event| matches!(event, EventMsg::Error(_))).await
else {
panic!("expected error event");
};
assert!(
error.message.contains("Error running auto compact task"),
"expected auto-compact failure prefix in error message: {}",
error.message
);
// The turn still terminates (with the error) rather than hanging.
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
assert_eq!(initial_sampling.requests().len(), 1);
assert!(
follow_up_sampling.requests().is_empty(),
"follow-up sampling should never run when auto compact fails"
);
assert_eq!(
compact_failure.requests().len(),
usize::try_from(compact_retries + 1).expect("retry count fits usize"),
"retryable compact failures should use the request retry budget"
);
Ok(())
}
// Forces the remote compact endpoint to return HTTP 400 with a
// `context_length_exceeded` error and verifies the failure is treated as
// non-retryable: exactly one compact request is made despite a retry budget of
// 4, the turn fails with the "Error running auto compact task" prefix, and the
// follow-up sampling request never fires.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_auto_compact_context_error_does_not_retry_or_sample() -> Result<()> {
// wiremock-backed test; skipped when no network is available.
skip_if_no_network!(Ok(()));
let second_turn_text = "this turn should fail before sampling with context error";
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
// Tiny limit so the 1_000_000-token first response (below) trips
// auto compaction; generous retry budget to show it is NOT used.
config.model_auto_compact_token_limit = Some(200);
config.model_provider.request_max_retries = Some(4);
config.model_provider.supports_websockets = false;
}),
)
.await?;
let codex = harness.test().codex.clone();
// First turn's sampling response reports token usage far above the limit.
let initial_sampling = mount_sse_once(
harness.server(),
sse(vec![
responses::ev_assistant_message("m1", "initial"),
responses::ev_completed_with_tokens("resp-1", 1_000_000),
]),
)
.await;
// Mounted only to prove it is never hit: matches the second turn's body text.
let follow_up_sampling = responses::mount_sse_once_match(
harness.server(),
body_string_contains(second_turn_text),
sse(vec![
responses::ev_assistant_message("m2", "should not run"),
responses::ev_completed("resp-2"),
]),
)
.await;
// Capacity of 10 is deliberately larger than any plausible retry count so
// the final assertion (exactly 1 request) is meaningful.
let compact_failure = responses::mount_compact_response(
harness.server(),
ResponseTemplate::new(400)
.insert_header("content-type", "application/json")
.set_body_json(serde_json::json!({
"error": {
"type": "context_length_exceeded",
"message": "Context window exceeded while compacting"
}
})),
10,
)
.await;
// Seed turn: completes normally but leaves token usage above the limit.
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "seed turn".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
// Second turn: auto compaction runs first and fails with the mocked 400.
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: second_turn_text.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
let EventMsg::Error(error) =
wait_for_event(&codex, |event| matches!(event, EventMsg::Error(_))).await
else {
panic!("expected error event");
};
assert!(
error.message.contains("Error running auto compact task"),
"expected auto-compact failure prefix in error message: {}",
error.message
);
// The turn still terminates (with the error) rather than hanging.
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
assert_eq!(initial_sampling.requests().len(), 1);
assert!(
follow_up_sampling.requests().is_empty(),
"follow-up sampling should never run when auto compact fails"
);
assert_eq!(
compact_failure.requests().len(),
1,
"non-retryable compact failures should not be retried"
);
Ok(())
}
#[cfg_attr(target_os = "windows", ignore)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_trims_function_call_history_to_fit_context_window() -> Result<()> {