Compare commits

...

10 Commits

Author SHA1 Message Date
Ahmed Ibrahim
093406c67f feedback 2025-10-17 08:06:14 -07:00
Ahmed Ibrahim
daf762f607 tests 2025-10-16 22:23:57 -07:00
Ahmed Ibrahim
cafd08ab41 tests 2025-10-16 17:23:20 -07:00
Ahmed Ibrahim
a4e042b671 tests 2025-10-16 17:22:54 -07:00
Ahmed Ibrahim
2994b63fe5 tests 2025-10-16 17:19:36 -07:00
Ahmed Ibrahim
0cf9bd6aa7 tests 2025-10-16 17:12:14 -07:00
Ahmed Ibrahim
f2444893ca tests 2025-10-16 17:05:34 -07:00
Ahmed Ibrahim
d4e59dedd8 fix_compact 2025-10-16 16:47:42 -07:00
Ahmed Ibrahim
e0f7c32217 fix_compact 2025-10-16 16:27:15 -07:00
Ahmed Ibrahim
578a6bc9e1 fix_compact 2025-10-16 16:09:56 -07:00
3 changed files with 495 additions and 41 deletions

View File

@@ -71,13 +71,15 @@ async fn run_compact_task_inner(
input: Vec<InputItem>,
) {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let mut turn_input = sess
.turn_input_with_history(vec![initial_input_for_turn.clone().into()])
.await;
// Track the items we append for this compact prompt so trimming does not drop them.
let extra_items: Vec<ResponseItem> = vec![initial_input_for_turn.clone().into()];
let mut turn_input = sess.turn_input_with_history(extra_items.clone()).await;
let mut truncated_count = 0usize;
let mut trimmed_tails: Vec<Vec<ResponseItem>> = Vec::new();
let max_retries = turn_context.client.get_provider().stream_max_retries();
let mut retries = 0;
let mut context_retries = 0;
let mut stream_retries = 0;
let rollout_item = RolloutItem::TurnContext(TurnContextItem {
cwd: turn_context.cwd.clone(),
@@ -114,11 +116,32 @@ async fn run_compact_task_inner(
return;
}
Err(e @ CodexErr::ContextWindowExceeded) => {
if turn_input.len() > 1 {
turn_input.remove(0);
truncated_count += 1;
retries = 0;
continue;
// Drop the most recent user turn (its message plus ensuing traffic) and retry.
if turn_input.len() > extra_items.len() {
let history_len = turn_input.len() - extra_items.len();
let mut prompt_items = turn_input.split_off(history_len);
let trimmed = trim_recent_history_to_previous_user_message(&mut turn_input);
turn_input.append(&mut prompt_items);
if !trimmed.is_empty() {
truncated_count += trimmed.len();
trimmed_tails.push(trimmed);
if context_retries >= max_retries {
sess.set_total_tokens_full(&sub_id, turn_context.as_ref())
.await;
let event = Event {
id: sub_id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: e.to_string(),
}),
};
sess.send_event(event).await;
return;
}
context_retries += 1;
stream_retries = 0;
// Keep stream retry budget untouched; we trimmed context successfully.
continue;
}
}
sess.set_total_tokens_full(&sub_id, turn_context.as_ref())
.await;
@@ -132,12 +155,12 @@ async fn run_compact_task_inner(
return;
}
Err(e) => {
if retries < max_retries {
retries += 1;
let delay = backoff(retries);
if stream_retries < max_retries {
stream_retries += 1;
let delay = backoff(stream_retries);
sess.notify_stream_error(
&sub_id,
format!("Re-connecting... {retries}/{max_retries}"),
format!("Re-connecting... {stream_retries}/{max_retries}"),
)
.await;
tokio::time::sleep(delay).await;
@@ -160,7 +183,10 @@ async fn run_compact_task_inner(
let summary_text = get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default();
let user_messages = collect_user_messages(&history_snapshot);
let initial_context = sess.build_initial_context(turn_context.as_ref());
let new_history = build_compacted_history(initial_context, &user_messages, &summary_text);
let mut new_history = build_compacted_history(initial_context, &user_messages, &summary_text);
for mut trimmed in trimmed_tails.into_iter().rev() {
new_history.append(&mut trimmed);
}
sess.replace_history(new_history).await;
let rollout_item = RolloutItem::Compacted(CompactedItem {
@@ -177,6 +203,27 @@ async fn run_compact_task_inner(
sess.send_event(event).await;
}
/// Remove the most recent user turn from `turn_input`: the last user message
/// and every item that follows it (assistant replies, tool traffic, etc.).
///
/// The removed items are returned in their original order so the caller can
/// restore them later. If no user message exists, the entire history is
/// drained and returned.
fn trim_recent_history_to_previous_user_message(
    turn_input: &mut Vec<ResponseItem>,
) -> Vec<ResponseItem> {
    // Locate the last user-authored message; everything from that point on
    // belongs to the most recent user turn.
    let last_user_index = turn_input.iter().rposition(|item| {
        matches!(item, ResponseItem::Message { role, .. } if role == "user")
    });
    match last_user_index {
        Some(index) => turn_input.split_off(index),
        // No user boundary found: drain the whole history (take leaves an
        // empty Vec behind, which also covers the already-empty case).
        None => std::mem::take(turn_input),
    }
}
pub fn content_items_to_text(content: &[ContentItem]) -> Option<String> {
let mut pieces = Vec::new();
for item in content {

View File

@@ -239,6 +239,20 @@ pub fn ev_apply_patch_function_call(call_id: &str, patch: &str) -> Value {
})
}
/// Build a `response.output_item.done` SSE event payload carrying a
/// successful function-call output for the given `call_id`.
pub fn ev_function_call_output(call_id: &str, content: &str) -> Value {
    // Assemble the tool output object first, then wrap it in the event item.
    let output = serde_json::json!({
        "content": content,
        "success": true
    });
    let item = serde_json::json!({
        "type": "function_call_output",
        "call_id": call_id,
        "output": output
    });
    serde_json::json!({
        "type": "response.output_item.done",
        "item": item
    })
}
pub fn sse_failed(id: &str, code: &str, message: &str) -> String {
sse(vec![serde_json::json!({
"type": "response.failed",

View File

@@ -19,17 +19,20 @@ use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_completed_with_tokens;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_function_call_output;
use core_test_support::responses::mount_sse_once_match;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::sse_failed;
use core_test_support::responses::start_mock_server;
use pretty_assertions::assert_eq;
use serde_json::Value;
// --- Test helpers -----------------------------------------------------------
pub(super) const FIRST_REPLY: &str = "FIRST_REPLY";
pub(super) const SUMMARY_TEXT: &str = "SUMMARY_ONLY_CONTEXT";
const THIRD_USER_MSG: &str = "next turn";
const THIRD_ASSISTANT_MSG: &str = "post compact assistant";
const AUTO_SUMMARY_TEXT: &str = "AUTO_SUMMARY";
const FIRST_AUTO_MSG: &str = "token limit start";
const SECOND_AUTO_MSG: &str = "token limit push";
@@ -644,6 +647,10 @@ async fn manual_compact_retries_after_context_window_error() {
ev_assistant_message("m2", SUMMARY_TEXT),
ev_completed("r2"),
]);
let third_turn = sse(vec![
ev_assistant_message("m3", THIRD_ASSISTANT_MSG),
ev_completed("r3"),
]);
let request_log = mount_sse_sequence(
&server,
@@ -651,6 +658,7 @@ async fn manual_compact_retries_after_context_window_error() {
user_turn.clone(),
compact_failed.clone(),
compact_succeeds.clone(),
third_turn,
],
)
.await;
@@ -688,17 +696,29 @@ async fn manual_compact_retries_after_context_window_error() {
panic!("expected background event after compact retry");
};
assert!(
event.message.contains("Trimmed 1 older conversation item"),
event
.message
.contains("Trimmed 2 older conversation item(s)"),
"background event should mention trimmed item count: {}",
event.message
);
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: THIRD_USER_MSG.into(),
}],
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let requests = request_log.requests();
assert_eq!(
requests.len(),
3,
"expected user turn and two compact attempts"
4,
"expected user turn, two compact attempts, and one follow-up turn"
);
let compact_attempt = requests[1].body_json();
@@ -710,42 +730,415 @@ async fn manual_compact_retries_after_context_window_error() {
let retry_input = retry_attempt["input"]
.as_array()
.unwrap_or_else(|| panic!("retry attempt missing input array: {retry_attempt}"));
assert_eq!(
compact_input
.last()
.and_then(|item| item.get("content"))
.and_then(|v| v.as_array())
fn extract_text(item: &Value) -> Option<String> {
item.get("content")
.and_then(Value::as_array)
.and_then(|items| items.first())
.and_then(|entry| entry.get("text"))
.and_then(|text| text.as_str()),
.and_then(Value::as_str)
.map(str::to_string)
}
assert_eq!(
extract_text(compact_input.last().expect("compact input empty")).as_deref(),
Some(SUMMARIZATION_PROMPT),
"compact attempt should include summarization prompt"
"compact attempt should include summarization prompt",
);
assert_eq!(
retry_input
.last()
.and_then(|item| item.get("content"))
.and_then(|v| v.as_array())
.and_then(|items| items.first())
.and_then(|entry| entry.get("text"))
.and_then(|text| text.as_str()),
extract_text(retry_input.last().expect("retry input empty")).as_deref(),
Some(SUMMARIZATION_PROMPT),
"retry attempt should include summarization prompt"
"retry attempt should include summarization prompt",
);
let contains_text = |items: &[Value], needle: &str| {
items
.iter()
.any(|item| extract_text(item).is_some_and(|text| text == needle))
};
assert!(
contains_text(compact_input, "first turn"),
"compact attempt should include original user message",
);
assert!(
contains_text(compact_input, FIRST_REPLY),
"compact attempt should include original assistant reply",
);
assert!(
!contains_text(retry_input, "first turn"),
"retry should drop original user message",
);
assert!(
!contains_text(retry_input, FIRST_REPLY),
"retry should drop assistant reply tied to original user message",
);
assert_eq!(
retry_input.len(),
compact_input.len().saturating_sub(1),
"retry should drop exactly one history item (before {} vs after {})",
compact_input.len().saturating_sub(retry_input.len()),
2,
"retry should drop the most recent user turn (before {} vs after {})",
compact_input.len(),
retry_input.len()
);
if let (Some(first_before), Some(first_after)) = (compact_input.first(), retry_input.first()) {
assert_ne!(
first_before, first_after,
"retry should drop the oldest conversation item"
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn manual_compact_trims_last_user_turn_with_function_calls_on_context_error() {
skip_if_no_network!();
// Scenario 1: ensure the retry trims the most recent turn when function calls are involved.
const FIRST_USER_MSG: &str = "first user turn";
const SECOND_USER_MSG: &str = "second user turn";
const FIRST_CALL_A: &str = "call-first-a";
const FIRST_CALL_B: &str = "call-first-b";
const SECOND_CALL_A: &str = "call-second-a";
const SECOND_CALL_B: &str = "call-second-b";
{
let server = start_mock_server().await;
let first_turn_initial = sse(vec![ev_function_call(FIRST_CALL_A, "tool.first.a", "{}")]);
let first_turn_second_call = sse(vec![
ev_function_call_output(FIRST_CALL_A, "first-call-a output"),
ev_function_call(FIRST_CALL_B, "tool.first.b", "{}"),
]);
let first_turn_complete = sse(vec![
ev_function_call_output(FIRST_CALL_B, "first-call-b output"),
ev_assistant_message("assistant-first", "first turn complete"),
ev_completed("resp-first"),
]);
let second_turn_initial = sse(vec![ev_function_call(SECOND_CALL_A, "tool.second.a", "{}")]);
let second_turn_second_call = sse(vec![
ev_function_call_output(SECOND_CALL_A, "second-call-a output"),
ev_function_call(SECOND_CALL_B, "tool.second.b", "{}"),
]);
let second_turn_complete = sse(vec![
ev_function_call_output(SECOND_CALL_B, "second-call-b output"),
ev_assistant_message("assistant-second", "second turn complete"),
ev_completed("resp-second"),
]);
let compact_failed = sse_failed(
"resp-fail",
"context_length_exceeded",
CONTEXT_LIMIT_MESSAGE,
);
let compact_retry = sse(vec![
ev_assistant_message("assistant-summary", SUMMARY_TEXT),
ev_completed("resp-summary"),
]);
let request_log = mount_sse_sequence(
&server,
vec![
first_turn_initial,
first_turn_second_call,
first_turn_complete,
second_turn_initial,
second_turn_second_call,
second_turn_complete,
compact_failed,
compact_retry,
],
)
.await;
let model_provider = ModelProviderInfo {
base_url: Some(format!("{}/v1", server.uri())),
..built_in_model_providers()["openai"].clone()
};
let home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&home);
config.model_provider = model_provider;
config.model_auto_compact_token_limit = Some(200_000);
let codex = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"))
.new_conversation(config)
.await
.unwrap()
.conversation;
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: FIRST_USER_MSG.into(),
}],
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: SECOND_USER_MSG.into(),
}],
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
codex.submit(Op::Compact).await.unwrap();
let EventMsg::BackgroundEvent(event) =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::BackgroundEvent(_))).await
else {
panic!("expected background event after compact retry");
};
assert!(
event
.message
.contains("Trimmed 2 older conversation item(s)"),
"background event should report trimming chunked user turn: {}",
event.message
);
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let requests = request_log.requests();
assert_eq!(
requests.len(),
8,
"expected two user turns (with tool call round-trips) followed by compact attempt + retry"
);
let compact_attempt = requests[6].body_json();
let retry_attempt = requests[7].body_json();
fn extract_text(item: &Value) -> Option<String> {
item.get("content")
.and_then(Value::as_array)
.and_then(|items| items.first())
.and_then(|entry| entry.get("text"))
.and_then(Value::as_str)
.map(str::to_string)
}
let contains_text = |items: &[Value], needle: &str| {
items
.iter()
.any(|item| extract_text(item).is_some_and(|text| text == needle))
};
assert!(
contains_text(
compact_attempt["input"].as_array().unwrap(),
SECOND_USER_MSG
),
"initial compact attempt should include most recent user message",
);
assert!(
!contains_text(retry_attempt["input"].as_array().unwrap(), SECOND_USER_MSG),
"retry should drop the most recent user message",
);
assert!(
contains_text(
compact_attempt["input"].as_array().unwrap(),
"second turn complete"
),
"initial compact attempt should include assistant reply for most recent turn",
);
assert!(
!contains_text(
retry_attempt["input"].as_array().unwrap(),
"second turn complete"
),
"retry should drop assistant reply for most recent turn",
);
assert_eq!(
compact_attempt["input"]
.as_array()
.unwrap()
.len()
.saturating_sub(retry_attempt["input"].as_array().unwrap().len()),
2,
"retry should drop the most recent user turn from the prompt",
);
let retry_call_ids: std::collections::HashSet<_> = retry_attempt["input"]
.as_array()
.unwrap()
.iter()
.filter_map(|item| item.get("call_id").and_then(|v| v.as_str()))
.collect();
assert!(
!retry_call_ids.contains(SECOND_CALL_A),
"retry should remove function call {SECOND_CALL_A}"
);
assert!(
!retry_call_ids.contains(SECOND_CALL_B),
"retry should remove function call {SECOND_CALL_B}"
);
}
// Scenario 2: after a retry succeeds, the trimmed turn is restored to history for the next user input.
{
const SIMPLE_FIRST_USER_MSG: &str = "first user turn";
const SIMPLE_FIRST_ASSISTANT_MSG: &str = "first assistant reply";
const SIMPLE_SECOND_USER_MSG: &str = "second user turn";
const SIMPLE_SECOND_ASSISTANT_MSG: &str = "second assistant reply";
const SIMPLE_THIRD_USER_MSG: &str = "post compact user";
const SIMPLE_THIRD_ASSISTANT_MSG: &str = "post compact assistant";
let server = start_mock_server().await;
let first_turn = sse(vec![
ev_assistant_message("assistant-first", SIMPLE_FIRST_ASSISTANT_MSG),
ev_completed("resp-first"),
]);
let second_turn = sse(vec![
ev_assistant_message("assistant-second", SIMPLE_SECOND_ASSISTANT_MSG),
ev_completed("resp-second"),
]);
let compact_failed = sse_failed(
"resp-fail",
"context_length_exceeded",
CONTEXT_LIMIT_MESSAGE,
);
let compact_retry = sse(vec![
ev_assistant_message("assistant-summary", SUMMARY_TEXT),
ev_completed("resp-summary"),
]);
let third_turn = sse(vec![
ev_assistant_message("assistant-third", SIMPLE_THIRD_ASSISTANT_MSG),
ev_completed("resp-third"),
]);
let request_log = mount_sse_sequence(
&server,
vec![
first_turn,
second_turn,
compact_failed,
compact_retry,
third_turn,
],
)
.await;
let model_provider = ModelProviderInfo {
base_url: Some(format!("{}/v1", server.uri())),
..built_in_model_providers()["openai"].clone()
};
let home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&home);
config.model_provider = model_provider;
config.model_auto_compact_token_limit = Some(200_000);
let codex = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"))
.new_conversation(config)
.await
.unwrap()
.conversation;
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: SIMPLE_FIRST_USER_MSG.into(),
}],
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: SIMPLE_SECOND_USER_MSG.into(),
}],
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
codex.submit(Op::Compact).await.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: SIMPLE_THIRD_USER_MSG.into(),
}],
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let requests = request_log.requests();
assert_eq!(
requests.len(),
5,
"expected two user turns, two compact attempts, and a post-compact turn",
);
let retry_request = &requests[3];
let retry_body = retry_request.body_json();
let retry_input = retry_body
.get("input")
.and_then(Value::as_array)
.expect("retry request missing input array");
assert!(
retry_input.iter().all(|item| {
item.get("content")
.and_then(Value::as_array)
.and_then(|entries| entries.first())
.and_then(|entry| entry.get("text"))
.and_then(Value::as_str)
.map(|text| {
text != SIMPLE_SECOND_USER_MSG && text != SIMPLE_SECOND_ASSISTANT_MSG
})
.unwrap_or(true)
}),
"retry compact input should omit trimmed second turn",
);
let final_request = &requests[4];
let body = final_request.body_json();
let input_items = body
.get("input")
.and_then(Value::as_array)
.expect("final request missing input array");
fn message_index(items: &[Value], needle: &str) -> Option<usize> {
items.iter().position(|item| {
item.get("type").and_then(Value::as_str) == Some("message")
&& item
.get("content")
.and_then(Value::as_array)
.and_then(|entries| entries.first())
.and_then(|entry| entry.get("text"))
.and_then(Value::as_str)
.is_some_and(|text| text == needle)
})
}
let summary_index = input_items
.iter()
.position(|item| {
item.get("content")
.and_then(Value::as_array)
.and_then(|entries| entries.first())
.and_then(|entry| entry.get("text"))
.and_then(Value::as_str)
.map(|text| text.contains(SUMMARY_TEXT))
.unwrap_or(false)
})
.expect("final request should include summary bridge");
let second_user_index = message_index(input_items, SIMPLE_SECOND_USER_MSG)
.expect("trimmed second user message should remain in history");
let second_assistant_index = message_index(input_items, SIMPLE_SECOND_ASSISTANT_MSG)
.expect("trimmed assistant reply should remain in history");
let third_user_index = message_index(input_items, SIMPLE_THIRD_USER_MSG)
.expect("post-compact user turn should be present");
assert!(
summary_index < second_user_index,
"summary bridge should precede restored user message"
);
assert!(
second_user_index < second_assistant_index,
"restored user message should precede assistant reply"
);
assert!(
second_assistant_index < third_user_index,
"restored assistant reply should precede new user turn"
);
} else {
panic!("expected non-empty compact inputs");
}
}