mirror of
https://github.com/openai/codex.git
synced 2026-03-03 13:13:18 +00:00
Compare commits
6 Commits
fix/notify
...
fix/ws-mid
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f86a74b4f5 | ||
|
|
b20528ccba | ||
|
|
214abf86bb | ||
|
|
d6ca9cd092 | ||
|
|
d32d8d0124 | ||
|
|
5cf1dd10a2 |
@@ -3754,25 +3754,19 @@ pub(crate) async fn run_turn(
|
||||
needs_follow_up,
|
||||
last_agent_message: sampling_request_last_agent_message,
|
||||
} = sampling_request_output;
|
||||
let total_usage_tokens = sess.get_total_token_usage().await;
|
||||
let token_limit_reached = total_usage_tokens >= auto_compact_limit;
|
||||
|
||||
let estimated_token_count =
|
||||
sess.get_estimated_token_count(turn_context.as_ref()).await;
|
||||
|
||||
info!(
|
||||
turn_id = %turn_context.sub_id,
|
||||
total_usage_tokens,
|
||||
estimated_token_count = ?estimated_token_count,
|
||||
let token_limit_reached = log_post_sampling_token_usage_and_maybe_compact(
|
||||
&sess,
|
||||
&turn_context,
|
||||
auto_compact_limit,
|
||||
token_limit_reached,
|
||||
needs_follow_up,
|
||||
"post sampling token usage"
|
||||
);
|
||||
true,
|
||||
"run_turn",
|
||||
)
|
||||
.await;
|
||||
|
||||
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
|
||||
// As long as compaction works well in getting us way below the token
|
||||
// limit, we should not worry about being in an infinite loop.
|
||||
if token_limit_reached && needs_follow_up {
|
||||
run_auto_compact(&sess, &turn_context).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -3838,6 +3832,38 @@ async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>)
|
||||
}
|
||||
}
|
||||
|
||||
async fn log_post_sampling_token_usage_and_maybe_compact(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
auto_compact_limit: i64,
|
||||
needs_follow_up: bool,
|
||||
allow_auto_compact: bool,
|
||||
checkpoint: &'static str,
|
||||
) -> bool {
|
||||
let total_usage_tokens = sess.get_total_token_usage().await;
|
||||
let estimated_token_count = sess.get_estimated_token_count(turn_context.as_ref()).await;
|
||||
let token_limit_reached = total_usage_tokens >= auto_compact_limit
|
||||
|| estimated_token_count.is_some_and(|count| count >= auto_compact_limit);
|
||||
|
||||
info!(
|
||||
turn_id = %turn_context.sub_id,
|
||||
total_usage_tokens,
|
||||
estimated_token_count = ?estimated_token_count,
|
||||
auto_compact_limit,
|
||||
token_limit_reached,
|
||||
needs_follow_up,
|
||||
allow_auto_compact,
|
||||
checkpoint,
|
||||
"post sampling token usage"
|
||||
);
|
||||
|
||||
if allow_auto_compact && token_limit_reached && needs_follow_up {
|
||||
run_auto_compact(sess, turn_context).await;
|
||||
}
|
||||
|
||||
token_limit_reached
|
||||
}
|
||||
|
||||
fn filter_connectors_for_input(
|
||||
connectors: Vec<connectors::AppInfo>,
|
||||
input: &[ResponseItem],
|
||||
@@ -4465,12 +4491,23 @@ async fn drain_in_flight(
|
||||
in_flight: &mut FuturesOrdered<BoxFuture<'static, CodexResult<ResponseInputItem>>>,
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
auto_compact_limit: i64,
|
||||
needs_follow_up: bool,
|
||||
) -> CodexResult<()> {
|
||||
while let Some(res) = in_flight.next().await {
|
||||
match res {
|
||||
Ok(response_input) => {
|
||||
sess.record_conversation_items(&turn_context, &[response_input.into()])
|
||||
.await;
|
||||
log_post_sampling_token_usage_and_maybe_compact(
|
||||
&sess,
|
||||
&turn_context,
|
||||
auto_compact_limit,
|
||||
needs_follow_up,
|
||||
in_flight.is_empty(),
|
||||
"drain_in_flight",
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
error_or_panic(format!("in-flight tool future failed during drain: {err}"));
|
||||
@@ -4546,6 +4583,10 @@ async fn try_run_sampling_request(
|
||||
let mut in_flight: FuturesOrdered<BoxFuture<'static, CodexResult<ResponseInputItem>>> =
|
||||
FuturesOrdered::new();
|
||||
let mut needs_follow_up = false;
|
||||
let auto_compact_limit = turn_context
|
||||
.model_info
|
||||
.auto_compact_token_limit()
|
||||
.unwrap_or(i64::MAX);
|
||||
let mut last_agent_message: Option<String> = None;
|
||||
let mut active_item: Option<TurnItem> = None;
|
||||
let mut should_emit_turn_diff = false;
|
||||
@@ -4761,7 +4802,14 @@ async fn try_run_sampling_request(
|
||||
}
|
||||
};
|
||||
|
||||
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;
|
||||
drain_in_flight(
|
||||
&mut in_flight,
|
||||
sess.clone(),
|
||||
turn_context.clone(),
|
||||
auto_compact_limit,
|
||||
needs_follow_up,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if should_emit_turn_diff {
|
||||
let unified_diff = {
|
||||
|
||||
@@ -126,7 +126,12 @@ fn trim_function_call_history_to_fit_context_window(
|
||||
let Some(last_item) = history.raw_items().last() else {
|
||||
break;
|
||||
};
|
||||
if !is_codex_generated_item(last_item) {
|
||||
// Keep a trailing tool call until its output is present; trimming the
|
||||
// call first can orphan a later-arriving output during mid-stream auto-compaction.
|
||||
if is_pending_tool_call_without_output(last_item, history.raw_items()) {
|
||||
break;
|
||||
}
|
||||
if !is_remote_compaction_trim_candidate(last_item) {
|
||||
break;
|
||||
}
|
||||
if !history.remove_last_item() {
|
||||
@@ -137,3 +142,105 @@ fn trim_function_call_history_to_fit_context_window(
|
||||
|
||||
deleted_items
|
||||
}
|
||||
|
||||
fn is_remote_compaction_trim_candidate(item: &ResponseItem) -> bool {
|
||||
is_codex_generated_item(item)
|
||||
|| matches!(
|
||||
item,
|
||||
ResponseItem::FunctionCall { .. }
|
||||
| ResponseItem::CustomToolCall { .. }
|
||||
| ResponseItem::LocalShellCall { .. }
|
||||
)
|
||||
}
|
||||
|
||||
fn is_pending_tool_call_without_output(item: &ResponseItem, items: &[ResponseItem]) -> bool {
|
||||
match item {
|
||||
ResponseItem::FunctionCall { call_id, .. } => !items.iter().any(|candidate| {
|
||||
matches!(
|
||||
candidate,
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: existing, ..
|
||||
} if existing == call_id
|
||||
)
|
||||
}),
|
||||
ResponseItem::CustomToolCall { call_id, .. } => !items.iter().any(|candidate| {
|
||||
matches!(
|
||||
candidate,
|
||||
ResponseItem::CustomToolCallOutput {
|
||||
call_id: existing, ..
|
||||
} if existing == call_id
|
||||
)
|
||||
}),
|
||||
ResponseItem::LocalShellCall { call_id, .. } => {
|
||||
let Some(call_id) = call_id.as_ref() else {
|
||||
return true;
|
||||
};
|
||||
!items.iter().any(|candidate| {
|
||||
matches!(
|
||||
candidate,
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: existing, ..
|
||||
} if existing == call_id
|
||||
)
|
||||
})
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::codex::make_session_and_context;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[tokio::test]
|
||||
async fn trim_keeps_trailing_function_call_without_output() {
|
||||
let (_session, mut turn_context) = make_session_and_context().await;
|
||||
turn_context.model_info.context_window = Some(200);
|
||||
turn_context.model_info.effective_context_window_percent = 100;
|
||||
|
||||
let mut history = ContextManager::new();
|
||||
history.record_items(
|
||||
&[
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: "user question".to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
},
|
||||
ResponseItem::FunctionCall {
|
||||
id: None,
|
||||
name: "shell_command".to_string(),
|
||||
arguments: serde_json::json!({
|
||||
"command": format!("echo {}", "x".repeat(2_000)),
|
||||
})
|
||||
.to_string(),
|
||||
call_id: "pending-call".to_string(),
|
||||
},
|
||||
],
|
||||
turn_context.truncation_policy,
|
||||
);
|
||||
|
||||
let deleted_items = trim_function_call_history_to_fit_context_window(
|
||||
&mut history,
|
||||
&turn_context,
|
||||
&BaseInstructions {
|
||||
text: "base".to_string(),
|
||||
},
|
||||
);
|
||||
|
||||
assert_eq!(deleted_items, 0);
|
||||
assert!(
|
||||
history
|
||||
.raw_items()
|
||||
.iter()
|
||||
.any(|item| matches!(item, ResponseItem::FunctionCall { call_id, .. } if call_id == "pending-call")),
|
||||
"expected trailing function_call without output to be preserved"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -237,6 +237,74 @@ async fn remote_compact_runs_automatically() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(target_os = "windows", ignore)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn remote_auto_compact_runs_after_multi_tool_output_drain() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let first_call_id = "first-large-call";
|
||||
let second_call_id = "second-small-call";
|
||||
|
||||
let harness = TestCodexHarness::with_builder(
|
||||
test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(|config| {
|
||||
config.features.enable(Feature::RemoteCompaction);
|
||||
config.base_instructions = Some("test instructions".to_string());
|
||||
config.model_auto_compact_token_limit = Some(500);
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
let codex = harness.test().codex.clone();
|
||||
|
||||
responses::mount_sse_sequence(
|
||||
harness.server(),
|
||||
vec![
|
||||
sse(vec![
|
||||
responses::ev_shell_command_call(first_call_id, "yes compact-me | head -c 9000"),
|
||||
responses::ev_shell_command_call(second_call_id, "printf small-output"),
|
||||
responses::ev_completed_with_tokens("resp-1", 10),
|
||||
]),
|
||||
sse(vec![
|
||||
responses::ev_assistant_message("m2", "after compact"),
|
||||
responses::ev_completed_with_tokens("resp-2", 10),
|
||||
]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let compact_mock =
|
||||
responses::mount_compact_json_once(harness.server(), serde_json::json!({ "output": [] }))
|
||||
.await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "trigger remote compact during drain".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let compact_request = compact_mock.single_request();
|
||||
assert!(
|
||||
compact_request
|
||||
.function_call_output_text(first_call_id)
|
||||
.is_some(),
|
||||
"expected compaction request to include the first drained tool output"
|
||||
);
|
||||
assert!(
|
||||
compact_request
|
||||
.function_call_output_text(second_call_id)
|
||||
.is_some(),
|
||||
"expected compaction request to include the second drained tool output"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(target_os = "windows", ignore)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn remote_compact_trims_function_call_history_to_fit_context_window() -> Result<()> {
|
||||
|
||||
Reference in New Issue
Block a user