mirror of
https://github.com/openai/codex.git
synced 2026-05-22 03:54:18 +00:00
prevent oversized turns from poisoning history
This commit is contained in:
@@ -18,12 +18,14 @@ use codex_app_server::INPUT_TOO_LARGE_ERROR_CODE;
|
||||
use codex_app_server::INVALID_PARAMS_ERROR_CODE;
|
||||
use codex_app_server_protocol::ByteRange;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::CodexErrorInfo;
|
||||
use codex_app_server_protocol::CollabAgentStatus;
|
||||
use codex_app_server_protocol::CollabAgentTool;
|
||||
use codex_app_server_protocol::CollabAgentToolCallStatus;
|
||||
use codex_app_server_protocol::CommandExecutionApprovalDecision;
|
||||
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
|
||||
use codex_app_server_protocol::CommandExecutionStatus;
|
||||
use codex_app_server_protocol::ErrorNotification;
|
||||
use codex_app_server_protocol::FileChangeApprovalDecision;
|
||||
use codex_app_server_protocol::FileChangePatchUpdatedNotification;
|
||||
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
|
||||
@@ -954,6 +956,117 @@ async fn turn_start_rejects_combined_oversized_text_input() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_oversized_input_emits_input_too_large_error_notification_v2() -> Result<()> {
|
||||
let server = create_mock_responses_server_sequence_unchecked(vec![
|
||||
create_final_assistant_message_sse_response("unexpected model request")?,
|
||||
])
|
||||
.await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
"never",
|
||||
&BTreeMap::default(),
|
||||
)?;
|
||||
let config_path = codex_home.path().join("config.toml");
|
||||
let config_toml = std::fs::read_to_string(&config_path)?;
|
||||
std::fs::write(
|
||||
&config_path,
|
||||
config_toml.replace(
|
||||
"model_provider = \"mock_provider\"\n",
|
||||
"model_provider = \"mock_provider\"\nmodel_context_window = 100\n",
|
||||
),
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let thread_req = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("mock-model".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let thread_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
||||
|
||||
let turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: format!("oversized app-server sentinel {}", "x ".repeat(1_000)),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
|
||||
)
|
||||
.await??;
|
||||
let turn_start: TurnStartResponse = to_response(turn_resp)?;
|
||||
|
||||
let expected_message =
|
||||
"This message is too large to send. Split it into smaller chunks before retrying.";
|
||||
let mut error = None;
|
||||
let completed = timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
loop {
|
||||
let message = mcp.read_next_message().await?;
|
||||
let JSONRPCMessage::Notification(notification) = message else {
|
||||
continue;
|
||||
};
|
||||
match notification.method.as_str() {
|
||||
"error" => {
|
||||
let params = notification.params.ok_or_else(|| {
|
||||
anyhow::anyhow!("error notifications must include params")
|
||||
})?;
|
||||
error = Some(serde_json::from_value::<ErrorNotification>(params)?);
|
||||
}
|
||||
"turn/completed" => {
|
||||
let params = notification.params.ok_or_else(|| {
|
||||
anyhow::anyhow!("turn/completed notifications must include params")
|
||||
})?;
|
||||
let completed: TurnCompletedNotification = serde_json::from_value(params)?;
|
||||
if completed.thread_id == thread.id && completed.turn.id == turn_start.turn.id {
|
||||
return Ok::<TurnCompletedNotification, anyhow::Error>(completed);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
|
||||
let error = error.expect("expected error notification before turn/completed");
|
||||
assert_eq!(error.thread_id, thread.id);
|
||||
assert_eq!(error.turn_id, turn_start.turn.id);
|
||||
assert!(!error.will_retry);
|
||||
assert_eq!(error.error.message, expected_message);
|
||||
assert_eq!(
|
||||
error.error.codex_error_info,
|
||||
Some(CodexErrorInfo::ContextWindowExceeded)
|
||||
);
|
||||
assert_eq!(completed.turn.status, TurnStatus::Failed);
|
||||
let completed_error = completed
|
||||
.turn
|
||||
.error
|
||||
.expect("failed turn should carry error");
|
||||
assert_eq!(completed_error.message, expected_message);
|
||||
assert_eq!(
|
||||
completed_error.codex_error_info,
|
||||
Some(CodexErrorInfo::ContextWindowExceeded)
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_rejects_invalid_permission_selection_before_starting_turn() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
|
||||
@@ -508,7 +508,7 @@ fn estimate_encrypted_function_output_length(encoded_len: usize) -> usize {
|
||||
encoded_len.saturating_mul(9).div_ceil(16)
|
||||
}
|
||||
|
||||
fn estimate_item_token_count(item: &ResponseItem) -> i64 {
|
||||
pub(crate) fn estimate_item_token_count(item: &ResponseItem) -> i64 {
|
||||
let model_visible_bytes = estimate_response_item_model_visible_bytes(item);
|
||||
approx_tokens_from_byte_count_i64(model_visible_bytes)
|
||||
}
|
||||
|
||||
@@ -4,7 +4,9 @@ pub(crate) mod updates;
|
||||
|
||||
pub(crate) use history::ContextManager;
|
||||
pub(crate) use history::TotalTokenUsageBreakdown;
|
||||
pub(crate) use history::estimate_item_token_count;
|
||||
pub(crate) use history::estimate_response_item_model_visible_bytes;
|
||||
pub(crate) use history::is_codex_generated_item;
|
||||
pub(crate) use history::is_user_turn_boundary;
|
||||
pub(crate) use history::truncate_function_output_payload;
|
||||
pub(crate) use normalize::strip_images_when_unsupported;
|
||||
|
||||
@@ -16,6 +16,8 @@ use crate::compact_remote::run_inline_remote_auto_compact_task;
|
||||
use crate::compact_remote_v2::run_inline_remote_auto_compact_task as run_inline_remote_auto_compact_task_v2;
|
||||
use crate::connectors;
|
||||
use crate::context::ContextualUserFragment;
|
||||
use crate::context_manager::estimate_item_token_count;
|
||||
use crate::context_manager::strip_images_when_unsupported;
|
||||
use crate::feedback_tags;
|
||||
use crate::goals::GoalRuntimeEvent;
|
||||
use crate::hook_runtime::inspect_pending_input;
|
||||
@@ -83,6 +85,7 @@ use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::MessagePhase;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::openai_models::InputModality;
|
||||
use codex_protocol::protocol::AgentMessageContentDeltaEvent;
|
||||
use codex_protocol::protocol::AgentReasoningSectionBreakEvent;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
@@ -96,6 +99,7 @@ use codex_protocol::protocol::WarningEvent;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_tools::ToolName;
|
||||
use codex_tools::filter_request_plugin_install_discoverable_tools_for_client;
|
||||
use codex_utils_output_truncation::approx_token_count;
|
||||
use codex_utils_stream_parser::AssistantTextChunk;
|
||||
use codex_utils_stream_parser::AssistantTextStreamParser;
|
||||
use codex_utils_stream_parser::ProposedPlanSegment;
|
||||
@@ -136,32 +140,50 @@ pub(crate) async fn run_turn(
|
||||
prewarmed_client_session: Option<ModelClientSession>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
let estimated_incoming_tokens =
|
||||
estimate_incoming_user_input_tokens(&input, &turn_context.model_info.input_modalities);
|
||||
if incoming_input_exceeds_context_window(turn_context.as_ref(), estimated_incoming_tokens) {
|
||||
send_oversized_input_error(&sess, &turn_context).await;
|
||||
return None;
|
||||
}
|
||||
let mut client_session =
|
||||
prewarmed_client_session.unwrap_or_else(|| sess.services.model_client.new_session());
|
||||
// TODO(ccunningham): Pre-turn compaction runs before context updates and the
|
||||
// new user message are recorded. Estimate pending incoming items (context
|
||||
// diffs/full reinjection + user input) and trigger compaction preemptively
|
||||
// when they would push the thread over the compaction threshold.
|
||||
let pre_sampling_compact =
|
||||
match run_pre_sampling_compact(&sess, &turn_context, &mut client_session).await {
|
||||
Ok(pre_sampling_compact) => pre_sampling_compact,
|
||||
Err(err) => {
|
||||
if err.to_codex_protocol_error() == CodexErrorInfo::UsageLimitExceeded
|
||||
&& let Err(err) = sess
|
||||
.goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached {
|
||||
turn_context: turn_context.as_ref(),
|
||||
})
|
||||
.await
|
||||
{
|
||||
warn!("failed to usage-limit active goal after usage-limit error: {err}");
|
||||
}
|
||||
error!("Failed to run pre-sampling compact");
|
||||
return None;
|
||||
let pre_sampling_compact = match run_pre_sampling_compact(
|
||||
&sess,
|
||||
&turn_context,
|
||||
&mut client_session,
|
||||
estimated_incoming_tokens,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(pre_sampling_compact) => pre_sampling_compact,
|
||||
Err(err) => {
|
||||
if err.to_codex_protocol_error() == CodexErrorInfo::UsageLimitExceeded
|
||||
&& let Err(err) = sess
|
||||
.goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached {
|
||||
turn_context: turn_context.as_ref(),
|
||||
})
|
||||
.await
|
||||
{
|
||||
warn!("failed to usage-limit active goal after usage-limit error: {err}");
|
||||
}
|
||||
};
|
||||
error!("Failed to run pre-sampling compact");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
if pre_sampling_compact.reset_client_session {
|
||||
client_session.reset_websocket_session();
|
||||
}
|
||||
if incoming_turn_exceeds_context_window(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
estimated_incoming_tokens,
|
||||
)
|
||||
.await
|
||||
{
|
||||
send_oversized_input_error(&sess, &turn_context).await;
|
||||
return None;
|
||||
}
|
||||
|
||||
sess.record_context_updates_and_set_reference_context_item(turn_context.as_ref())
|
||||
.await;
|
||||
@@ -298,8 +320,12 @@ pub(crate) async fn run_turn(
|
||||
can_drain_pending_input = true;
|
||||
let has_pending_input = sess.input_queue.has_pending_input(&sess.active_turn).await;
|
||||
let needs_follow_up = model_needs_follow_up || has_pending_input;
|
||||
let token_status =
|
||||
auto_compact_token_status(sess.as_ref(), turn_context.as_ref()).await;
|
||||
let token_status = auto_compact_token_status(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
/*estimated_incoming_tokens*/ 0,
|
||||
)
|
||||
.await;
|
||||
let token_limit_reached = token_status.token_limit_reached;
|
||||
|
||||
let estimated_token_count =
|
||||
@@ -640,7 +666,10 @@ struct PreSamplingCompactResult {
|
||||
|
||||
#[derive(Debug)]
|
||||
struct AutoCompactTokenStatus {
|
||||
// Current active context usage before adding any unrecorded incoming input.
|
||||
current_context_tokens: i64,
|
||||
// Full active context usage, independent of the configured auto-compact scope.
|
||||
// Includes any unrecorded incoming input passed to the projection helper.
|
||||
active_context_tokens: i64,
|
||||
// Usage counted against `model_auto_compact_token_limit` for the current scope.
|
||||
auto_compact_scope_tokens: i64,
|
||||
@@ -652,17 +681,40 @@ struct AutoCompactTokenStatus {
|
||||
token_limit_reached: bool,
|
||||
}
|
||||
|
||||
impl AutoCompactTokenStatus {
|
||||
fn auto_compact_scope_limit_reached(&self) -> bool {
|
||||
self.auto_compact_scope_tokens >= self.auto_compact_scope_limit
|
||||
}
|
||||
|
||||
fn current_context_window_limit_reached(&self) -> bool {
|
||||
self.full_context_window_limit
|
||||
.is_some_and(|full_context_window_limit| {
|
||||
self.current_context_tokens >= full_context_window_limit
|
||||
})
|
||||
}
|
||||
|
||||
fn should_compact_before_recording_incoming(&self) -> bool {
|
||||
self.current_context_window_limit_reached()
|
||||
|| (self.current_context_tokens > 0
|
||||
&& (self.auto_compact_scope_limit_reached()
|
||||
|| self.full_context_window_limit_reached))
|
||||
}
|
||||
}
|
||||
|
||||
async fn auto_compact_token_status(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
estimated_incoming_tokens: i64,
|
||||
) -> AutoCompactTokenStatus {
|
||||
let active_context_tokens = sess.get_total_token_usage().await;
|
||||
let current_context_tokens = sess.get_total_token_usage().await;
|
||||
let projected_active_context_tokens =
|
||||
current_context_tokens.saturating_add(estimated_incoming_tokens);
|
||||
let mut auto_compact_window_ordinal = None;
|
||||
let mut auto_compact_window_prefill_tokens = None;
|
||||
let (auto_compact_scope_tokens, auto_compact_scope_limit, full_context_window_limit) =
|
||||
match turn_context.config.model_auto_compact_token_limit_scope {
|
||||
AutoCompactTokenLimitScope::Total => (
|
||||
active_context_tokens,
|
||||
projected_active_context_tokens,
|
||||
turn_context
|
||||
.model_info
|
||||
.auto_compact_token_limit()
|
||||
@@ -673,9 +725,11 @@ async fn auto_compact_token_status(
|
||||
let window = sess.auto_compact_window_snapshot().await;
|
||||
auto_compact_window_ordinal = Some(window.ordinal);
|
||||
auto_compact_window_prefill_tokens = window.prefill_input_tokens;
|
||||
let baseline = window.prefill_input_tokens.unwrap_or(active_context_tokens);
|
||||
let baseline = window
|
||||
.prefill_input_tokens
|
||||
.unwrap_or(current_context_tokens);
|
||||
(
|
||||
active_context_tokens.saturating_sub(baseline),
|
||||
projected_active_context_tokens.saturating_sub(baseline),
|
||||
turn_context
|
||||
.config
|
||||
.model_auto_compact_token_limit
|
||||
@@ -687,13 +741,14 @@ async fn auto_compact_token_status(
|
||||
};
|
||||
let full_context_window_limit_reached =
|
||||
full_context_window_limit.is_some_and(|full_context_window_limit| {
|
||||
active_context_tokens >= full_context_window_limit
|
||||
projected_active_context_tokens >= full_context_window_limit
|
||||
});
|
||||
let token_limit_reached =
|
||||
auto_compact_scope_tokens >= auto_compact_scope_limit || full_context_window_limit_reached;
|
||||
|
||||
AutoCompactTokenStatus {
|
||||
active_context_tokens,
|
||||
current_context_tokens,
|
||||
active_context_tokens: projected_active_context_tokens,
|
||||
auto_compact_scope_tokens,
|
||||
auto_compact_scope_limit,
|
||||
full_context_window_limit,
|
||||
@@ -704,17 +759,99 @@ async fn auto_compact_token_status(
|
||||
}
|
||||
}
|
||||
|
||||
fn estimate_incoming_user_input_tokens(
|
||||
input: &[UserInput],
|
||||
input_modalities: &[InputModality],
|
||||
) -> i64 {
|
||||
if input.is_empty() {
|
||||
return 0;
|
||||
}
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.to_vec());
|
||||
let mut response_items = vec![ResponseItem::from(initial_input_for_turn)];
|
||||
strip_images_when_unsupported(input_modalities, &mut response_items);
|
||||
response_items
|
||||
.iter()
|
||||
.map(estimate_item_token_count)
|
||||
.fold(0i64, i64::saturating_add)
|
||||
}
|
||||
|
||||
fn incoming_input_exceeds_context_window(
|
||||
turn_context: &TurnContext,
|
||||
estimated_incoming_tokens: i64,
|
||||
) -> bool {
|
||||
if estimated_incoming_tokens <= 0 {
|
||||
return false;
|
||||
}
|
||||
let Some(model_context_window) = turn_context.model_context_window() else {
|
||||
return false;
|
||||
};
|
||||
estimated_incoming_tokens > model_context_window
|
||||
}
|
||||
|
||||
async fn incoming_turn_exceeds_context_window(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
estimated_incoming_tokens: i64,
|
||||
) -> bool {
|
||||
if estimated_incoming_tokens <= 0 {
|
||||
return false;
|
||||
}
|
||||
let Some(model_context_window) = turn_context.model_context_window() else {
|
||||
return false;
|
||||
};
|
||||
let estimated_history_tokens =
|
||||
estimate_history_item_tokens(sess, &turn_context.model_info.input_modalities).await;
|
||||
let base_instructions = sess.get_base_instructions().await;
|
||||
let estimated_base_instruction_tokens =
|
||||
i64::try_from(approx_token_count(&base_instructions.text)).unwrap_or(i64::MAX);
|
||||
let projected_tokens = estimated_history_tokens
|
||||
.saturating_add(estimated_base_instruction_tokens)
|
||||
.saturating_add(estimated_incoming_tokens);
|
||||
projected_tokens > model_context_window
|
||||
}
|
||||
|
||||
async fn estimate_history_item_tokens(sess: &Session, input_modalities: &[InputModality]) -> i64 {
|
||||
sess.clone_history()
|
||||
.await
|
||||
.for_prompt(input_modalities)
|
||||
.iter()
|
||||
.map(estimate_item_token_count)
|
||||
.fold(0i64, i64::saturating_add)
|
||||
}
|
||||
|
||||
async fn send_oversized_input_error(sess: &Session, turn_context: &TurnContext) {
|
||||
sess.send_event(
|
||||
turn_context,
|
||||
EventMsg::Error(ErrorEvent {
|
||||
message:
|
||||
"This message is too large to send. Split it into smaller chunks before retrying."
|
||||
.to_string(),
|
||||
codex_error_info: Some(CodexErrorInfo::ContextWindowExceeded),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn run_pre_sampling_compact(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
client_session: &mut ModelClientSession,
|
||||
estimated_incoming_tokens: i64,
|
||||
) -> CodexResult<PreSamplingCompactResult> {
|
||||
let mut pre_sampling_compacted =
|
||||
maybe_run_previous_model_inline_compact(sess, turn_context, client_session).await?;
|
||||
let mut reset_client_session = pre_sampling_compacted;
|
||||
let token_status = auto_compact_token_status(sess.as_ref(), turn_context.as_ref()).await;
|
||||
// Compact if the configured auto-compaction budget or usable context window is exhausted.
|
||||
if token_status.token_limit_reached {
|
||||
let token_status = auto_compact_token_status(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
estimated_incoming_tokens,
|
||||
)
|
||||
.await;
|
||||
// Compact if current usage has exhausted the usable context window, or if
|
||||
// projected incoming input would exhaust the configured auto-compaction
|
||||
// budget. The incoming input is not recorded yet, so this gives compaction
|
||||
// a chance to make room before persistence.
|
||||
if token_status.should_compact_before_recording_incoming() {
|
||||
reset_client_session |= run_auto_compact(
|
||||
sess,
|
||||
turn_context,
|
||||
|
||||
@@ -13,6 +13,7 @@ use codex_protocol::models::PermissionProfile;
|
||||
use codex_protocol::openai_models::ModelInfo;
|
||||
use codex_protocol::openai_models::ModelsResponse;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::HookEventName;
|
||||
use codex_protocol::protocol::HookRunStatus;
|
||||
@@ -75,6 +76,7 @@ const DUMMY_CALL_ID: &str = "call-multi-auto";
|
||||
const FUNCTION_CALL_LIMIT_MSG: &str = "function call limit push";
|
||||
const POST_AUTO_USER_MSG: &str = "post auto follow-up";
|
||||
const PRETURN_CONTEXT_DIFF_CWD: &str = "/tmp/PRETURN_CONTEXT_DIFF_CWD";
|
||||
const PRETURN_INCOMING_COMPACT_MSG: &str = "incoming pushes compaction";
|
||||
|
||||
pub(super) const COMPACT_WARNING_MESSAGE: &str = "Heads up: Long threads and multiple compactions can cause the model to be less accurate. Start a new thread when possible to keep threads small and targeted.";
|
||||
|
||||
@@ -1692,6 +1694,217 @@ async fn auto_compact_runs_after_token_limit_hit() {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn pre_turn_auto_compact_accounts_for_incoming_user_input() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let first_turn = sse(vec![
|
||||
ev_assistant_message("m1", FIRST_REPLY),
|
||||
ev_completed_with_tokens("r1", /*total_tokens*/ 190),
|
||||
]);
|
||||
let compact_turn = sse(vec![
|
||||
ev_assistant_message("m2", "PRETURN_INCOMING_SUMMARY"),
|
||||
ev_completed_with_tokens("r2", /*total_tokens*/ 80),
|
||||
]);
|
||||
let follow_up_turn = sse(vec![
|
||||
ev_assistant_message("m3", FINAL_REPLY),
|
||||
ev_completed_with_tokens("r3", /*total_tokens*/ 90),
|
||||
]);
|
||||
let request_log =
|
||||
mount_sse_sequence(&server, vec![first_turn, compact_turn, follow_up_turn]).await;
|
||||
|
||||
let model_provider = non_openai_model_provider(&server);
|
||||
let codex = test_codex()
|
||||
.with_config(move |config| {
|
||||
config.model_provider = model_provider;
|
||||
set_test_compact_prompt(config);
|
||||
config.model_auto_compact_token_limit = Some(200);
|
||||
})
|
||||
.build(&server)
|
||||
.await
|
||||
.expect("build codex")
|
||||
.codex;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
environments: None,
|
||||
items: vec![UserInput::Text {
|
||||
text: FIRST_AUTO_MSG.into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
responsesapi_client_metadata: None,
|
||||
thread_settings: Default::default(),
|
||||
})
|
||||
.await
|
||||
.expect("submit first user input");
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
environments: None,
|
||||
items: vec![UserInput::Text {
|
||||
text: format!("{} {}", PRETURN_INCOMING_COMPACT_MSG, "x ".repeat(100)),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
responsesapi_client_metadata: None,
|
||||
thread_settings: Default::default(),
|
||||
})
|
||||
.await
|
||||
.expect("submit incoming user input");
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let requests = request_log.requests();
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
3,
|
||||
"expected first turn, pre-turn compact, and post-compact follow-up"
|
||||
);
|
||||
let compact_body = requests[1].body_json().to_string();
|
||||
assert!(
|
||||
body_contains_text(&compact_body, SUMMARIZATION_PROMPT),
|
||||
"incoming input should trigger pre-turn compaction"
|
||||
);
|
||||
assert!(
|
||||
!compact_body.contains(PRETURN_INCOMING_COMPACT_MSG),
|
||||
"incoming user message should not be included in the compaction request"
|
||||
);
|
||||
|
||||
let follow_up_body = requests[2].body_json().to_string();
|
||||
assert!(
|
||||
follow_up_body.contains(PRETURN_INCOMING_COMPACT_MSG),
|
||||
"incoming user message should be sent after pre-turn compaction"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn pre_turn_auto_compact_skips_empty_history_for_large_first_prompt() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let first_turn = sse(vec![
|
||||
ev_assistant_message("m1", FIRST_REPLY),
|
||||
ev_completed_with_tokens("r1", /*total_tokens*/ 90),
|
||||
]);
|
||||
let request_log = mount_sse_once(&server, first_turn).await;
|
||||
|
||||
let model_provider = non_openai_model_provider(&server);
|
||||
let codex = test_codex()
|
||||
.with_config(move |config| {
|
||||
config.model_provider = model_provider;
|
||||
set_test_compact_prompt(config);
|
||||
config.model_auto_compact_token_limit = Some(200);
|
||||
config.model_context_window = Some(10_000);
|
||||
})
|
||||
.build(&server)
|
||||
.await
|
||||
.expect("build codex")
|
||||
.codex;
|
||||
|
||||
let large_first_prompt = format!("large-first-prompt-sentinel {}", "x ".repeat(2_000));
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
environments: None,
|
||||
items: vec![UserInput::Text {
|
||||
text: large_first_prompt.clone(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
responsesapi_client_metadata: None,
|
||||
thread_settings: Default::default(),
|
||||
})
|
||||
.await
|
||||
.expect("submit large first user input");
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let requests = request_log.requests();
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
1,
|
||||
"large first prompt should sample directly without empty-history compaction"
|
||||
);
|
||||
let request_body = requests[0].body_json().to_string();
|
||||
assert!(
|
||||
body_contains_text(&request_body, &large_first_prompt),
|
||||
"large first prompt should be included in the sampling request"
|
||||
);
|
||||
assert!(
|
||||
!body_contains_text(&request_body, SUMMARIZATION_PROMPT),
|
||||
"empty history should not trigger a pre-turn compaction request"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn oversized_incoming_user_input_is_rejected_before_persistence() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let compact_turn = sse(vec![
|
||||
ev_assistant_message("m1", "EMPTY_COMPACT_SUMMARY"),
|
||||
ev_completed_with_tokens("r1", /*total_tokens*/ 10),
|
||||
]);
|
||||
let request_log = mount_sse_once(&server, compact_turn).await;
|
||||
|
||||
let model_provider = non_openai_model_provider(&server);
|
||||
let test = test_codex()
|
||||
.with_config(move |config| {
|
||||
config.model_provider = model_provider;
|
||||
set_test_compact_prompt(config);
|
||||
config.model_context_window = Some(100);
|
||||
})
|
||||
.build(&server)
|
||||
.await
|
||||
.expect("build codex");
|
||||
let rollout_path = test
|
||||
.session_configured
|
||||
.rollout_path
|
||||
.clone()
|
||||
.expect("rollout path");
|
||||
let oversized_text = format!("oversized-input-sentinel {}", "x ".repeat(1_000));
|
||||
|
||||
test.codex
|
||||
.submit(Op::UserInput {
|
||||
environments: None,
|
||||
items: vec![UserInput::Text {
|
||||
text: oversized_text.clone(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
responsesapi_client_metadata: None,
|
||||
thread_settings: Default::default(),
|
||||
})
|
||||
.await
|
||||
.expect("submit oversized user input");
|
||||
|
||||
let error = wait_for_event_match(&test.codex, |event| match event {
|
||||
EventMsg::Error(error) => Some(error.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
assert_eq!(
|
||||
error.codex_error_info,
|
||||
Some(CodexErrorInfo::ContextWindowExceeded)
|
||||
);
|
||||
assert_eq!(
|
||||
error.message,
|
||||
"This message is too large to send. Split it into smaller chunks before retrying."
|
||||
);
|
||||
|
||||
let requests = request_log.requests();
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
0,
|
||||
"oversized input should reject before pre-turn compaction or sampling"
|
||||
);
|
||||
let rollout = fs::read_to_string(&rollout_path).unwrap_or_default();
|
||||
assert!(
|
||||
!rollout.contains(&oversized_text),
|
||||
"oversized input should not be persisted to rollout"
|
||||
);
|
||||
}
|
||||
|
||||
// Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts.
|
||||
#[cfg_attr(windows, tokio::test(flavor = "multi_thread", worker_threads = 4))]
|
||||
#[cfg_attr(not(windows), tokio::test(flavor = "multi_thread", worker_threads = 2))]
|
||||
@@ -2961,7 +3174,7 @@ async fn snapshot_request_shape_mid_turn_continuation_compaction() {
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
let context_window = 100;
|
||||
let context_window = 8_000;
|
||||
let limit = context_window * 90 / 100;
|
||||
let over_limit_tokens = context_window * 95 / 100 + 1;
|
||||
|
||||
@@ -3068,8 +3281,8 @@ async fn auto_compact_clamps_config_limit_to_context_window() {
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
let context_window = 100;
|
||||
let config_limit = 200;
|
||||
let context_window = 8_000;
|
||||
let config_limit = 20_000;
|
||||
let over_limit_tokens = context_window * 90 / 100 + 1;
|
||||
|
||||
let first_turn = sse(vec![
|
||||
@@ -3155,7 +3368,7 @@ async fn auto_compact_body_after_prefix_ignores_starting_window_prefix() {
|
||||
.with_config(move |config| {
|
||||
config.model_provider = model_provider;
|
||||
set_test_compact_prompt(config);
|
||||
config.model_context_window = Some(1_000);
|
||||
config.model_context_window = Some(8_000);
|
||||
config.model_auto_compact_token_limit = Some(100);
|
||||
config.model_auto_compact_token_limit_scope =
|
||||
AutoCompactTokenLimitScope::BodyAfterPrefix;
|
||||
@@ -3306,7 +3519,7 @@ async fn auto_compact_body_after_prefix_still_caps_at_context_window() {
|
||||
]);
|
||||
let second_turn = sse(vec![
|
||||
ev_assistant_message("m2", SECOND_LARGE_REPLY),
|
||||
ev_completed_with_usage("r2", /*input_tokens*/ 98, /*output_tokens*/ 1),
|
||||
ev_completed_with_usage("r2", /*input_tokens*/ 7_998, /*output_tokens*/ 1),
|
||||
]);
|
||||
let auto_compact_turn = sse(vec![
|
||||
ev_assistant_message("m3", AUTO_SUMMARY_TEXT),
|
||||
@@ -3327,8 +3540,8 @@ async fn auto_compact_body_after_prefix_still_caps_at_context_window() {
|
||||
.with_config(move |config| {
|
||||
config.model_provider = model_provider;
|
||||
set_test_compact_prompt(config);
|
||||
config.model_context_window = Some(100);
|
||||
config.model_auto_compact_token_limit = Some(200);
|
||||
config.model_context_window = Some(8_000);
|
||||
config.model_auto_compact_token_limit = Some(20_000);
|
||||
config.model_auto_compact_token_limit_scope =
|
||||
AutoCompactTokenLimitScope::BodyAfterPrefix;
|
||||
})
|
||||
|
||||
@@ -1091,7 +1091,7 @@ async fn remote_compact_trims_function_call_history_to_fit_context_window() -> R
|
||||
test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(|config| {
|
||||
config.model_context_window = Some(2_000);
|
||||
config.model_context_window = Some(8_000);
|
||||
config.model_auto_compact_token_limit = Some(200_000);
|
||||
}),
|
||||
)
|
||||
@@ -1213,7 +1213,7 @@ async fn auto_remote_compact_trims_function_call_history_to_fit_context_window()
|
||||
test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(|config| {
|
||||
config.model_context_window = Some(2_000);
|
||||
config.model_context_window = Some(8_000);
|
||||
config.model_auto_compact_token_limit = Some(200_000);
|
||||
}),
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user