Compare commits

...

6 Commits

Author SHA1 Message Date
Charles Cunningham
f86a74b4f5 Gate drain-time compaction on actual follow-up 2026-02-05 23:33:49 -08:00
Charles Cunningham
b20528ccba core: remove stream item auto-compact checkpoints 2026-02-05 20:18:16 -08:00
Charles Cunningham
214abf86bb Clarify pending-call trim guard in remote compaction 2026-02-05 18:55:42 -08:00
Charles Cunningham
d6ca9cd092 Preserve pending tool calls during remote trim 2026-02-05 17:06:51 -08:00
Charles Cunningham
d32d8d0124 Avoid compaction while tool outputs are still in flight 2026-02-05 17:03:24 -08:00
Charles Cunningham
5cf1dd10a2 Run auto-compaction checks during streaming 2026-02-05 16:38:38 -08:00
3 changed files with 240 additions and 17 deletions

View File

@@ -3754,25 +3754,19 @@ pub(crate) async fn run_turn(
needs_follow_up,
last_agent_message: sampling_request_last_agent_message,
} = sampling_request_output;
let total_usage_tokens = sess.get_total_token_usage().await;
let token_limit_reached = total_usage_tokens >= auto_compact_limit;
let estimated_token_count =
sess.get_estimated_token_count(turn_context.as_ref()).await;
info!(
turn_id = %turn_context.sub_id,
total_usage_tokens,
estimated_token_count = ?estimated_token_count,
let token_limit_reached = log_post_sampling_token_usage_and_maybe_compact(
&sess,
&turn_context,
auto_compact_limit,
token_limit_reached,
needs_follow_up,
"post sampling token usage"
);
true,
"run_turn",
)
.await;
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
// As long as compaction works well in getting us way below the token
// limit, we should not worry about being in an infinite loop.
if token_limit_reached && needs_follow_up {
run_auto_compact(&sess, &turn_context).await;
continue;
}
@@ -3838,6 +3832,38 @@ async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>)
}
}
/// Records post-sampling token usage for `checkpoint` and, when allowed,
/// kicks off auto-compaction.
///
/// The limit counts as reached when either the recorded total usage or the
/// estimated count (when available) meets `auto_compact_limit`. Compaction
/// only runs when `allow_auto_compact` is set and the turn still needs a
/// follow-up request. Returns whether the token limit was reached.
async fn log_post_sampling_token_usage_and_maybe_compact(
    sess: &Arc<Session>,
    turn_context: &Arc<TurnContext>,
    auto_compact_limit: i64,
    needs_follow_up: bool,
    allow_auto_compact: bool,
    checkpoint: &'static str,
) -> bool {
    let used_tokens = sess.get_total_token_usage().await;
    let estimated = sess.get_estimated_token_count(turn_context.as_ref()).await;
    // Either signal crossing the limit is enough to consider it reached.
    let limit_reached = match estimated {
        Some(count) if count >= auto_compact_limit => true,
        _ => used_tokens >= auto_compact_limit,
    };
    info!(
        turn_id = %turn_context.sub_id,
        total_usage_tokens = used_tokens,
        estimated_token_count = ?estimated,
        auto_compact_limit,
        token_limit_reached = limit_reached,
        needs_follow_up,
        allow_auto_compact,
        checkpoint,
        "post sampling token usage"
    );
    if limit_reached && allow_auto_compact && needs_follow_up {
        run_auto_compact(sess, turn_context).await;
    }
    limit_reached
}
fn filter_connectors_for_input(
connectors: Vec<connectors::AppInfo>,
input: &[ResponseItem],
@@ -4465,12 +4491,23 @@ async fn drain_in_flight(
in_flight: &mut FuturesOrdered<BoxFuture<'static, CodexResult<ResponseInputItem>>>,
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
auto_compact_limit: i64,
needs_follow_up: bool,
) -> CodexResult<()> {
while let Some(res) = in_flight.next().await {
match res {
Ok(response_input) => {
sess.record_conversation_items(&turn_context, &[response_input.into()])
.await;
log_post_sampling_token_usage_and_maybe_compact(
&sess,
&turn_context,
auto_compact_limit,
needs_follow_up,
in_flight.is_empty(),
"drain_in_flight",
)
.await;
}
Err(err) => {
error_or_panic(format!("in-flight tool future failed during drain: {err}"));
@@ -4546,6 +4583,10 @@ async fn try_run_sampling_request(
let mut in_flight: FuturesOrdered<BoxFuture<'static, CodexResult<ResponseInputItem>>> =
FuturesOrdered::new();
let mut needs_follow_up = false;
let auto_compact_limit = turn_context
.model_info
.auto_compact_token_limit()
.unwrap_or(i64::MAX);
let mut last_agent_message: Option<String> = None;
let mut active_item: Option<TurnItem> = None;
let mut should_emit_turn_diff = false;
@@ -4761,7 +4802,14 @@ async fn try_run_sampling_request(
}
};
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;
drain_in_flight(
&mut in_flight,
sess.clone(),
turn_context.clone(),
auto_compact_limit,
needs_follow_up,
)
.await?;
if should_emit_turn_diff {
let unified_diff = {

View File

@@ -126,7 +126,12 @@ fn trim_function_call_history_to_fit_context_window(
let Some(last_item) = history.raw_items().last() else {
break;
};
if !is_codex_generated_item(last_item) {
// Keep a trailing tool call until its output is present; trimming the
// call first can orphan a later-arriving output during mid-stream auto-compaction.
if is_pending_tool_call_without_output(last_item, history.raw_items()) {
break;
}
if !is_remote_compaction_trim_candidate(last_item) {
break;
}
if !history.remove_last_item() {
@@ -137,3 +142,105 @@ fn trim_function_call_history_to_fit_context_window(
deleted_items
}
/// Returns true when `item` may be dropped from the tail of history while
/// trimming for remote compaction: codex-generated items and tool-call items.
fn is_remote_compaction_trim_candidate(item: &ResponseItem) -> bool {
    if is_codex_generated_item(item) {
        return true;
    }
    matches!(
        item,
        ResponseItem::FunctionCall { .. }
            | ResponseItem::CustomToolCall { .. }
            | ResponseItem::LocalShellCall { .. }
    )
}
/// Returns true when `item` is a tool call whose matching output is absent
/// from `items`. Items that are not tool calls are never considered pending.
fn is_pending_tool_call_without_output(item: &ResponseItem, items: &[ResponseItem]) -> bool {
    // Both function calls and local shell calls are answered by a
    // `FunctionCallOutput` carrying the same call id.
    let has_function_output = |id: &String| {
        items.iter().any(|candidate| {
            matches!(
                candidate,
                ResponseItem::FunctionCallOutput { call_id: existing, .. } if existing == id
            )
        })
    };
    match item {
        ResponseItem::FunctionCall { call_id, .. } => !has_function_output(call_id),
        ResponseItem::CustomToolCall { call_id, .. } => !items.iter().any(|candidate| {
            matches!(
                candidate,
                ResponseItem::CustomToolCallOutput { call_id: existing, .. } if existing == call_id
            )
        }),
        // Without a call id there is nothing to match an output against, so
        // conservatively treat the call as still pending.
        ResponseItem::LocalShellCall { call_id: None, .. } => true,
        ResponseItem::LocalShellCall {
            call_id: Some(id), ..
        } => !has_function_output(id),
        _ => false,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codex::make_session_and_context;
    use codex_protocol::models::ContentItem;
    use pretty_assertions::assert_eq;

    #[tokio::test]
    async fn trim_keeps_trailing_function_call_without_output() {
        let (_session, mut turn_context) = make_session_and_context().await;
        // Shrink the context window so the trim loop is forced to consider
        // removing the oversized trailing call.
        turn_context.model_info.context_window = Some(200);
        turn_context.model_info.effective_context_window_percent = 100;

        let user_message = ResponseItem::Message {
            id: None,
            role: "user".to_string(),
            content: vec![ContentItem::InputText {
                text: "user question".to_string(),
            }],
            end_turn: None,
            phase: None,
        };
        // Large arguments payload so this item alone exceeds the window.
        let pending_call = ResponseItem::FunctionCall {
            id: None,
            name: "shell_command".to_string(),
            arguments: serde_json::json!({
                "command": format!("echo {}", "x".repeat(2_000)),
            })
            .to_string(),
            call_id: "pending-call".to_string(),
        };

        let mut history = ContextManager::new();
        history.record_items(&[user_message, pending_call], turn_context.truncation_policy);

        let deleted_items = trim_function_call_history_to_fit_context_window(
            &mut history,
            &turn_context,
            &BaseInstructions {
                text: "base".to_string(),
            },
        );

        assert_eq!(deleted_items, 0);
        let call_preserved = history
            .raw_items()
            .iter()
            .any(|item| matches!(item, ResponseItem::FunctionCall { call_id, .. } if call_id == "pending-call"));
        assert!(
            call_preserved,
            "expected trailing function_call without output to be preserved"
        );
    }
}

View File

@@ -237,6 +237,74 @@ async fn remote_compact_runs_automatically() -> Result<()> {
Ok(())
}
#[cfg_attr(target_os = "windows", ignore)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_auto_compact_runs_after_multi_tool_output_drain() -> Result<()> {
    skip_if_no_network!(Ok(()));

    let first_call_id = "first-large-call";
    let second_call_id = "second-small-call";

    // A low auto-compact limit ensures draining the tool outputs below
    // trips compaction mid-turn.
    let harness = TestCodexHarness::with_builder(
        test_codex()
            .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
            .with_config(|config| {
                config.features.enable(Feature::RemoteCompaction);
                config.base_instructions = Some("test instructions".to_string());
                config.model_auto_compact_token_limit = Some(500);
            }),
    )
    .await?;
    let codex = harness.test().codex.clone();

    // First turn: two tool calls (one large, one small). Second turn: the
    // post-compaction assistant reply.
    let tool_call_turn = sse(vec![
        responses::ev_shell_command_call(first_call_id, "yes compact-me | head -c 9000"),
        responses::ev_shell_command_call(second_call_id, "printf small-output"),
        responses::ev_completed_with_tokens("resp-1", 10),
    ]);
    let follow_up_turn = sse(vec![
        responses::ev_assistant_message("m2", "after compact"),
        responses::ev_completed_with_tokens("resp-2", 10),
    ]);
    responses::mount_sse_sequence(harness.server(), vec![tool_call_turn, follow_up_turn]).await;

    let compact_mock =
        responses::mount_compact_json_once(harness.server(), serde_json::json!({ "output": [] }))
            .await;

    codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "trigger remote compact during drain".into(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
        })
        .await?;
    wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;

    // Both drained tool outputs must be visible to the compaction request.
    let compact_request = compact_mock.single_request();
    assert!(
        compact_request
            .function_call_output_text(first_call_id)
            .is_some(),
        "expected compaction request to include the first drained tool output"
    );
    assert!(
        compact_request
            .function_call_output_text(second_call_id)
            .is_some(),
        "expected compaction request to include the second drained tool output"
    );
    Ok(())
}
#[cfg_attr(target_os = "windows", ignore)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_trims_function_call_history_to_fit_context_window() -> Result<()> {