diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index 66f704f83d..1bf58a3957 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -18,12 +18,14 @@ use codex_app_server::INPUT_TOO_LARGE_ERROR_CODE; use codex_app_server::INVALID_PARAMS_ERROR_CODE; use codex_app_server_protocol::ByteRange; use codex_app_server_protocol::ClientInfo; +use codex_app_server_protocol::CodexErrorInfo; use codex_app_server_protocol::CollabAgentStatus; use codex_app_server_protocol::CollabAgentTool; use codex_app_server_protocol::CollabAgentToolCallStatus; use codex_app_server_protocol::CommandExecutionApprovalDecision; use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; use codex_app_server_protocol::CommandExecutionStatus; +use codex_app_server_protocol::ErrorNotification; use codex_app_server_protocol::FileChangeApprovalDecision; use codex_app_server_protocol::FileChangePatchUpdatedNotification; use codex_app_server_protocol::FileChangeRequestApprovalResponse; @@ -954,6 +956,117 @@ async fn turn_start_rejects_combined_oversized_text_input() -> Result<()> { Ok(()) } +#[tokio::test] +async fn turn_start_oversized_input_emits_input_too_large_error_notification_v2() -> Result<()> { + let server = create_mock_responses_server_sequence_unchecked(vec![ + create_final_assistant_message_sse_response("unexpected model request")?, + ]) + .await; + + let codex_home = TempDir::new()?; + create_config_toml( + codex_home.path(), + &server.uri(), + "never", + &BTreeMap::default(), + )?; + let config_path = codex_home.path().join("config.toml"); + let config_toml = std::fs::read_to_string(&config_path)?; + std::fs::write( + &config_path, + config_toml.replace( + "model_provider = \"mock_provider\"\n", + "model_provider = \"mock_provider\"\nmodel_context_window = 100\n", + ), + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let thread_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_req)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(thread_resp)?; + + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: format!("oversized app-server sentinel {}", "x ".repeat(1_000)), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + let turn_start: TurnStartResponse = to_response(turn_resp)?; + + let expected_message = + "This message is too large to send. Split it into smaller chunks before retrying."; + let mut error = None; + let completed = timeout(DEFAULT_READ_TIMEOUT, async { + loop { + let message = mcp.read_next_message().await?; + let JSONRPCMessage::Notification(notification) = message else { + continue; + }; + match notification.method.as_str() { + "error" => { + let params = notification.params.ok_or_else(|| { + anyhow::anyhow!("error notifications must include params") + })?; + error = Some(serde_json::from_value::(params)?); + } + "turn/completed" => { + let params = notification.params.ok_or_else(|| { + anyhow::anyhow!("turn/completed notifications must include params") + })?; + let completed: TurnCompletedNotification = serde_json::from_value(params)?; + if completed.thread_id == thread.id && completed.turn.id == turn_start.turn.id { + return Ok::(completed); + } + } + _ => {} + } + } + }) + .await??; + + let error = error.expect("expected error notification before turn/completed"); + assert_eq!(error.thread_id, thread.id); + assert_eq!(error.turn_id, turn_start.turn.id); + assert!(!error.will_retry); + assert_eq!(error.error.message, expected_message); + assert_eq!( + error.error.codex_error_info, + Some(CodexErrorInfo::ContextWindowExceeded) + ); + assert_eq!(completed.turn.status, TurnStatus::Failed); + let completed_error = completed + .turn + .error + .expect("failed turn should carry error"); + assert_eq!(completed_error.message, expected_message); + assert_eq!( + completed_error.codex_error_info, + Some(CodexErrorInfo::ContextWindowExceeded) + ); + + Ok(()) +} + #[tokio::test] async fn turn_start_rejects_invalid_permission_selection_before_starting_turn() -> Result<()> { let codex_home = TempDir::new()?; diff --git a/codex-rs/core/src/context_manager/history.rs b/codex-rs/core/src/context_manager/history.rs index 6a05535488..d3c9255038 100644 --- a/codex-rs/core/src/context_manager/history.rs +++ b/codex-rs/core/src/context_manager/history.rs @@ -508,7 +508,7 @@ fn estimate_encrypted_function_output_length(encoded_len: usize) -> usize { encoded_len.saturating_mul(9).div_ceil(16) } -fn estimate_item_token_count(item: &ResponseItem) -> i64 { +pub(crate) fn estimate_item_token_count(item: &ResponseItem) -> i64 { let model_visible_bytes = estimate_response_item_model_visible_bytes(item); approx_tokens_from_byte_count_i64(model_visible_bytes) } diff --git a/codex-rs/core/src/context_manager/mod.rs b/codex-rs/core/src/context_manager/mod.rs index 9dd85d1ed9..248525ece2 100644 --- a/codex-rs/core/src/context_manager/mod.rs +++ b/codex-rs/core/src/context_manager/mod.rs @@ -4,7 +4,9 @@ pub(crate) mod updates; pub(crate) use history::ContextManager; pub(crate) use history::TotalTokenUsageBreakdown; +pub(crate) use history::estimate_item_token_count; pub(crate) use history::estimate_response_item_model_visible_bytes; pub(crate) use history::is_codex_generated_item; pub(crate) use history::is_user_turn_boundary; pub(crate) use history::truncate_function_output_payload; +pub(crate) use normalize::strip_images_when_unsupported; diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 4ac511f4fa..6a87744269 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -16,6 +16,8 @@ use crate::compact_remote::run_inline_remote_auto_compact_task; use crate::compact_remote_v2::run_inline_remote_auto_compact_task as run_inline_remote_auto_compact_task_v2; use crate::connectors; use crate::context::ContextualUserFragment; +use crate::context_manager::estimate_item_token_count; +use crate::context_manager::strip_images_when_unsupported; use crate::feedback_tags; use crate::goals::GoalRuntimeEvent; use crate::hook_runtime::inspect_pending_input; @@ -83,6 +85,7 @@ use codex_protocol::models::ContentItem; use codex_protocol::models::MessagePhase; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; +use codex_protocol::openai_models::InputModality; use codex_protocol::protocol::AgentMessageContentDeltaEvent; use codex_protocol::protocol::AgentReasoningSectionBreakEvent; use codex_protocol::protocol::CodexErrorInfo; @@ -96,6 +99,7 @@ use codex_protocol::protocol::WarningEvent; use codex_protocol::user_input::UserInput; use codex_tools::ToolName; use codex_tools::filter_request_plugin_install_discoverable_tools_for_client; +use codex_utils_output_truncation::approx_token_count; use codex_utils_stream_parser::AssistantTextChunk; use codex_utils_stream_parser::AssistantTextStreamParser; use codex_utils_stream_parser::ProposedPlanSegment; @@ -136,32 +140,50 @@ pub(crate) async fn run_turn( prewarmed_client_session: Option, cancellation_token: CancellationToken, ) -> Option { + let estimated_incoming_tokens = + estimate_incoming_user_input_tokens(&input, &turn_context.model_info.input_modalities); + if incoming_input_exceeds_context_window(turn_context.as_ref(), estimated_incoming_tokens) { + send_oversized_input_error(&sess, &turn_context).await; + return None; + } let mut client_session = prewarmed_client_session.unwrap_or_else(|| sess.services.model_client.new_session()); - // TODO(ccunningham): Pre-turn compaction runs before context updates and the - // new user message are recorded. Estimate pending incoming items (context - // diffs/full reinjection + user input) and trigger compaction preemptively - // when they would push the thread over the compaction threshold. - let pre_sampling_compact = - match run_pre_sampling_compact(&sess, &turn_context, &mut client_session).await { - Ok(pre_sampling_compact) => pre_sampling_compact, - Err(err) => { - if err.to_codex_protocol_error() == CodexErrorInfo::UsageLimitExceeded - && let Err(err) = sess - .goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached { - turn_context: turn_context.as_ref(), - }) - .await - { - warn!("failed to usage-limit active goal after usage-limit error: {err}"); - } - error!("Failed to run pre-sampling compact"); - return None; + let pre_sampling_compact = match run_pre_sampling_compact( + &sess, + &turn_context, + &mut client_session, + estimated_incoming_tokens, + ) + .await + { + Ok(pre_sampling_compact) => pre_sampling_compact, + Err(err) => { + if err.to_codex_protocol_error() == CodexErrorInfo::UsageLimitExceeded + && let Err(err) = sess + .goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached { + turn_context: turn_context.as_ref(), + }) + .await + { + warn!("failed to usage-limit active goal after usage-limit error: {err}"); } - }; + error!("Failed to run pre-sampling compact"); + return None; + } + }; if pre_sampling_compact.reset_client_session { client_session.reset_websocket_session(); } + if incoming_turn_exceeds_context_window( + sess.as_ref(), + turn_context.as_ref(), + estimated_incoming_tokens, + ) + .await + { + send_oversized_input_error(&sess, &turn_context).await; + return None; + } sess.record_context_updates_and_set_reference_context_item(turn_context.as_ref()) .await; @@ -298,8 +320,12 @@ pub(crate) async fn run_turn( can_drain_pending_input = true; let has_pending_input = sess.input_queue.has_pending_input(&sess.active_turn).await; let needs_follow_up = model_needs_follow_up || has_pending_input; - let token_status = - auto_compact_token_status(sess.as_ref(), turn_context.as_ref()).await; + let token_status = auto_compact_token_status( + sess.as_ref(), + turn_context.as_ref(), + /*estimated_incoming_tokens*/ 0, + ) + .await; let token_limit_reached = token_status.token_limit_reached; let estimated_token_count = @@ -640,7 +666,10 @@ struct PreSamplingCompactResult { #[derive(Debug)] struct AutoCompactTokenStatus { + // Current active context usage before adding any unrecorded incoming input. + current_context_tokens: i64, // Full active context usage, independent of the configured auto-compact scope. + // Includes any unrecorded incoming input passed to the projection helper. active_context_tokens: i64, // Usage counted against `model_auto_compact_token_limit` for the current scope. auto_compact_scope_tokens: i64, @@ -652,17 +681,40 @@ struct AutoCompactTokenStatus { token_limit_reached: bool, } +impl AutoCompactTokenStatus { + fn auto_compact_scope_limit_reached(&self) -> bool { + self.auto_compact_scope_tokens >= self.auto_compact_scope_limit + } + + fn current_context_window_limit_reached(&self) -> bool { + self.full_context_window_limit + .is_some_and(|full_context_window_limit| { + self.current_context_tokens >= full_context_window_limit + }) + } + + fn should_compact_before_recording_incoming(&self) -> bool { + self.current_context_window_limit_reached() + || (self.current_context_tokens > 0 + && (self.auto_compact_scope_limit_reached() + || self.full_context_window_limit_reached)) + } +} + async fn auto_compact_token_status( sess: &Session, turn_context: &TurnContext, + estimated_incoming_tokens: i64, ) -> AutoCompactTokenStatus { - let active_context_tokens = sess.get_total_token_usage().await; + let current_context_tokens = sess.get_total_token_usage().await; + let projected_active_context_tokens = + current_context_tokens.saturating_add(estimated_incoming_tokens); let mut auto_compact_window_ordinal = None; let mut auto_compact_window_prefill_tokens = None; let (auto_compact_scope_tokens, auto_compact_scope_limit, full_context_window_limit) = match turn_context.config.model_auto_compact_token_limit_scope { AutoCompactTokenLimitScope::Total => ( - active_context_tokens, + projected_active_context_tokens, turn_context .model_info .auto_compact_token_limit() @@ -673,9 +725,11 @@ async fn auto_compact_token_status( let window = sess.auto_compact_window_snapshot().await; auto_compact_window_ordinal = Some(window.ordinal); auto_compact_window_prefill_tokens = window.prefill_input_tokens; - let baseline = window.prefill_input_tokens.unwrap_or(active_context_tokens); + let baseline = window + .prefill_input_tokens + .unwrap_or(current_context_tokens); ( - active_context_tokens.saturating_sub(baseline), + projected_active_context_tokens.saturating_sub(baseline), turn_context .config .model_auto_compact_token_limit @@ -687,13 +741,14 @@ async fn auto_compact_token_status( }; let full_context_window_limit_reached = full_context_window_limit.is_some_and(|full_context_window_limit| { - active_context_tokens >= full_context_window_limit + projected_active_context_tokens >= full_context_window_limit }); let token_limit_reached = auto_compact_scope_tokens >= auto_compact_scope_limit || full_context_window_limit_reached; AutoCompactTokenStatus { - active_context_tokens, + current_context_tokens, + active_context_tokens: projected_active_context_tokens, auto_compact_scope_tokens, auto_compact_scope_limit, full_context_window_limit, @@ -704,17 +759,99 @@ async fn auto_compact_token_status( } } +fn estimate_incoming_user_input_tokens( + input: &[UserInput], + input_modalities: &[InputModality], +) -> i64 { + if input.is_empty() { + return 0; + } + let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.to_vec()); + let mut response_items = vec![ResponseItem::from(initial_input_for_turn)]; + strip_images_when_unsupported(input_modalities, &mut response_items); + response_items + .iter() + .map(estimate_item_token_count) + .fold(0i64, i64::saturating_add) +} + +fn incoming_input_exceeds_context_window( + turn_context: &TurnContext, + estimated_incoming_tokens: i64, +) -> bool { + if estimated_incoming_tokens <= 0 { + return false; + } + let Some(model_context_window) = turn_context.model_context_window() else { + return false; + }; + estimated_incoming_tokens > model_context_window +} + +async fn incoming_turn_exceeds_context_window( + sess: &Session, + turn_context: &TurnContext, + estimated_incoming_tokens: i64, +) -> bool { + if estimated_incoming_tokens <= 0 { + return false; + } + let Some(model_context_window) = turn_context.model_context_window() else { + return false; + }; + let estimated_history_tokens = + estimate_history_item_tokens(sess, &turn_context.model_info.input_modalities).await; + let base_instructions = sess.get_base_instructions().await; + let estimated_base_instruction_tokens = + i64::try_from(approx_token_count(&base_instructions.text)).unwrap_or(i64::MAX); + let projected_tokens = estimated_history_tokens + .saturating_add(estimated_base_instruction_tokens) + .saturating_add(estimated_incoming_tokens); + projected_tokens > model_context_window +} + +async fn estimate_history_item_tokens(sess: &Session, input_modalities: &[InputModality]) -> i64 { + sess.clone_history() + .await + .for_prompt(input_modalities) + .iter() + .map(estimate_item_token_count) + .fold(0i64, i64::saturating_add) +} + +async fn send_oversized_input_error(sess: &Session, turn_context: &TurnContext) { + sess.send_event( + turn_context, + EventMsg::Error(ErrorEvent { + message: + "This message is too large to send. Split it into smaller chunks before retrying." + .to_string(), + codex_error_info: Some(CodexErrorInfo::ContextWindowExceeded), + }), + ) + .await; +} + async fn run_pre_sampling_compact( sess: &Arc, turn_context: &Arc, client_session: &mut ModelClientSession, + estimated_incoming_tokens: i64, ) -> CodexResult { let mut pre_sampling_compacted = maybe_run_previous_model_inline_compact(sess, turn_context, client_session).await?; let mut reset_client_session = pre_sampling_compacted; - let token_status = auto_compact_token_status(sess.as_ref(), turn_context.as_ref()).await; - // Compact if the configured auto-compaction budget or usable context window is exhausted. - if token_status.token_limit_reached { + let token_status = auto_compact_token_status( + sess.as_ref(), + turn_context.as_ref(), + estimated_incoming_tokens, + ) + .await; + // Compact if current usage has exhausted the usable context window, or if + // projected incoming input would exhaust the configured auto-compaction + // budget. The incoming input is not recorded yet, so this gives compaction + // a chance to make room before persistence. + if token_status.should_compact_before_recording_incoming() { reset_client_session |= run_auto_compact( sess, turn_context, diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index 140b4ec2b1..ad50cdad59 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -13,6 +13,7 @@ use codex_protocol::models::PermissionProfile; use codex_protocol::openai_models::ModelInfo; use codex_protocol::openai_models::ModelsResponse; use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::CodexErrorInfo; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::HookEventName; use codex_protocol::protocol::HookRunStatus; @@ -75,6 +76,7 @@ const DUMMY_CALL_ID: &str = "call-multi-auto"; const FUNCTION_CALL_LIMIT_MSG: &str = "function call limit push"; const POST_AUTO_USER_MSG: &str = "post auto follow-up"; const PRETURN_CONTEXT_DIFF_CWD: &str = "/tmp/PRETURN_CONTEXT_DIFF_CWD"; +const PRETURN_INCOMING_COMPACT_MSG: &str = "incoming pushes compaction"; pub(super) const COMPACT_WARNING_MESSAGE: &str = "Heads up: Long threads and multiple compactions can cause the model to be less accurate. Start a new thread when possible to keep threads small and targeted."; @@ -1692,6 +1694,217 @@ async fn auto_compact_runs_after_token_limit_hit() { ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pre_turn_auto_compact_accounts_for_incoming_user_input() { + skip_if_no_network!(); + + let server = start_mock_server().await; + let first_turn = sse(vec![ + ev_assistant_message("m1", FIRST_REPLY), + ev_completed_with_tokens("r1", /*total_tokens*/ 190), + ]); + let compact_turn = sse(vec![ + ev_assistant_message("m2", "PRETURN_INCOMING_SUMMARY"), + ev_completed_with_tokens("r2", /*total_tokens*/ 80), + ]); + let follow_up_turn = sse(vec![ + ev_assistant_message("m3", FINAL_REPLY), + ev_completed_with_tokens("r3", /*total_tokens*/ 90), + ]); + let request_log = + mount_sse_sequence(&server, vec![first_turn, compact_turn, follow_up_turn]).await; + + let model_provider = non_openai_model_provider(&server); + let codex = test_codex() + .with_config(move |config| { + config.model_provider = model_provider; + set_test_compact_prompt(config); + config.model_auto_compact_token_limit = Some(200); + }) + .build(&server) + .await + .expect("build codex") + .codex; + + codex + .submit(Op::UserInput { + environments: None, + items: vec![UserInput::Text { + text: FIRST_AUTO_MSG.into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + thread_settings: Default::default(), + }) + .await + .expect("submit first user input"); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + codex + .submit(Op::UserInput { + environments: None, + items: vec![UserInput::Text { + text: format!("{} {}", PRETURN_INCOMING_COMPACT_MSG, "x ".repeat(100)), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + thread_settings: Default::default(), + }) + .await + .expect("submit incoming user input"); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + let requests = request_log.requests(); + assert_eq!( + requests.len(), + 3, + "expected first turn, pre-turn compact, and post-compact follow-up" + ); + let compact_body = requests[1].body_json().to_string(); + assert!( + body_contains_text(&compact_body, SUMMARIZATION_PROMPT), + "incoming input should trigger pre-turn compaction" + ); + assert!( + !compact_body.contains(PRETURN_INCOMING_COMPACT_MSG), + "incoming user message should not be included in the compaction request" + ); + + let follow_up_body = requests[2].body_json().to_string(); + assert!( + follow_up_body.contains(PRETURN_INCOMING_COMPACT_MSG), + "incoming user message should be sent after pre-turn compaction" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pre_turn_auto_compact_skips_empty_history_for_large_first_prompt() { + skip_if_no_network!(); + + let server = start_mock_server().await; + let first_turn = sse(vec![ + ev_assistant_message("m1", FIRST_REPLY), + ev_completed_with_tokens("r1", /*total_tokens*/ 90), + ]); + let request_log = mount_sse_once(&server, first_turn).await; + + let model_provider = non_openai_model_provider(&server); + let codex = test_codex() + .with_config(move |config| { + config.model_provider = model_provider; + set_test_compact_prompt(config); + config.model_auto_compact_token_limit = Some(200); + config.model_context_window = Some(10_000); + }) + .build(&server) + .await + .expect("build codex") + .codex; + + let large_first_prompt = format!("large-first-prompt-sentinel {}", "x ".repeat(2_000)); + codex + .submit(Op::UserInput { + environments: None, + items: vec![UserInput::Text { + text: large_first_prompt.clone(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + thread_settings: Default::default(), + }) + .await + .expect("submit large first user input"); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + let requests = request_log.requests(); + assert_eq!( + requests.len(), + 1, + "large first prompt should sample directly without empty-history compaction" + ); + let request_body = requests[0].body_json().to_string(); + assert!( + body_contains_text(&request_body, &large_first_prompt), + "large first prompt should be included in the sampling request" + ); + assert!( + !body_contains_text(&request_body, SUMMARIZATION_PROMPT), + "empty history should not trigger a pre-turn compaction request" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_incoming_user_input_is_rejected_before_persistence() { + skip_if_no_network!(); + + let server = start_mock_server().await; + let compact_turn = sse(vec![ + ev_assistant_message("m1", "EMPTY_COMPACT_SUMMARY"), + ev_completed_with_tokens("r1", /*total_tokens*/ 10), + ]); + let request_log = mount_sse_once(&server, compact_turn).await; + + let model_provider = non_openai_model_provider(&server); + let test = test_codex() + .with_config(move |config| { + config.model_provider = model_provider; + set_test_compact_prompt(config); + config.model_context_window = Some(100); + }) + .build(&server) + .await + .expect("build codex"); + let rollout_path = test + .session_configured + .rollout_path + .clone() + .expect("rollout path"); + let oversized_text = format!("oversized-input-sentinel {}", "x ".repeat(1_000)); + + test.codex + .submit(Op::UserInput { + environments: None, + items: vec![UserInput::Text { + text: oversized_text.clone(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + thread_settings: Default::default(), + }) + .await + .expect("submit oversized user input"); + + let error = wait_for_event_match(&test.codex, |event| match event { + EventMsg::Error(error) => Some(error.clone()), + _ => None, + }) + .await; + assert_eq!( + error.codex_error_info, + Some(CodexErrorInfo::ContextWindowExceeded) + ); + assert_eq!( + error.message, + "This message is too large to send. Split it into smaller chunks before retrying." + ); + + let requests = request_log.requests(); + assert_eq!( + requests.len(), + 0, + "oversized input should reject before pre-turn compaction or sampling" + ); + let rollout = fs::read_to_string(&rollout_path).unwrap_or_default(); + assert!( + !rollout.contains(&oversized_text), + "oversized input should not be persisted to rollout" + ); +} + // Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts. #[cfg_attr(windows, tokio::test(flavor = "multi_thread", worker_threads = 4))] #[cfg_attr(not(windows), tokio::test(flavor = "multi_thread", worker_threads = 2))] @@ -2961,7 +3174,7 @@ async fn snapshot_request_shape_mid_turn_continuation_compaction() { let server = start_mock_server().await; - let context_window = 100; + let context_window = 8_000; let limit = context_window * 90 / 100; let over_limit_tokens = context_window * 95 / 100 + 1; @@ -3068,8 +3281,8 @@ async fn auto_compact_clamps_config_limit_to_context_window() { let server = start_mock_server().await; - let context_window = 100; - let config_limit = 200; + let context_window = 8_000; + let config_limit = 20_000; let over_limit_tokens = context_window * 90 / 100 + 1; let first_turn = sse(vec![ @@ -3155,7 +3368,7 @@ async fn auto_compact_body_after_prefix_ignores_starting_window_prefix() { .with_config(move |config| { config.model_provider = model_provider; set_test_compact_prompt(config); - config.model_context_window = Some(1_000); + config.model_context_window = Some(8_000); config.model_auto_compact_token_limit = Some(100); config.model_auto_compact_token_limit_scope = AutoCompactTokenLimitScope::BodyAfterPrefix; @@ -3306,7 +3519,7 @@ async fn auto_compact_body_after_prefix_still_caps_at_context_window() { ]); let second_turn = sse(vec![ ev_assistant_message("m2", SECOND_LARGE_REPLY), - ev_completed_with_usage("r2", /*input_tokens*/ 98, /*output_tokens*/ 1), + ev_completed_with_usage("r2", /*input_tokens*/ 7_998, /*output_tokens*/ 1), ]); let auto_compact_turn = sse(vec![ ev_assistant_message("m3", AUTO_SUMMARY_TEXT), @@ -3327,8 +3540,8 @@ async fn auto_compact_body_after_prefix_still_caps_at_context_window() { .with_config(move |config| { config.model_provider = model_provider; set_test_compact_prompt(config); - config.model_context_window = Some(100); - config.model_auto_compact_token_limit = Some(200); + config.model_context_window = Some(8_000); + config.model_auto_compact_token_limit = Some(20_000); config.model_auto_compact_token_limit_scope = AutoCompactTokenLimitScope::BodyAfterPrefix; }) diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index d868abbfee..7163df9008 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -1091,7 +1091,7 @@ async fn remote_compact_trims_function_call_history_to_fit_context_window() -> R test_codex() .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) .with_config(|config| { - config.model_context_window = Some(2_000); + config.model_context_window = Some(8_000); config.model_auto_compact_token_limit = Some(200_000); }), ) @@ -1213,7 +1213,7 @@ async fn auto_remote_compact_trims_function_call_history_to_fit_context_window() test_codex() .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) .with_config(|config| { - config.model_context_window = Some(2_000); + config.model_context_window = Some(8_000); config.model_auto_compact_token_limit = Some(200_000); }), )