From 6e3f31f8892097a3cf04d2f277bdaa27ff0196f4 Mon Sep 17 00:00:00 2001 From: Ee Durbin Date: Mon, 11 May 2026 16:05:28 -0700 Subject: [PATCH] Core: pause pending steers after usage limits Co-authored-by: Codex --- codex-rs/core/src/session/input_queue.rs | 52 +++++-- codex-rs/core/src/session/mod.rs | 26 +++- codex-rs/core/src/session/tests.rs | 149 +++++++++++++++++++- codex-rs/core/src/session/turn.rs | 31 +++- codex-rs/core/src/state/mod.rs | 1 + codex-rs/core/src/state/turn.rs | 39 +++++ codex-rs/core/src/tasks/mod.rs | 17 ++- codex-rs/core/src/tasks/regular.rs | 3 + codex-rs/core/tests/suite/client.rs | 75 ++++++++++ codex-rs/core/tests/suite/quota_exceeded.rs | 87 ++++++++++++ 10 files changed, 452 insertions(+), 28 deletions(-) diff --git a/codex-rs/core/src/session/input_queue.rs b/codex-rs/core/src/session/input_queue.rs index 2e8a6ded9d..406c73511a 100644 --- a/codex-rs/core/src/session/input_queue.rs +++ b/codex-rs/core/src/session/input_queue.rs @@ -1,5 +1,6 @@ use crate::state::ActiveTurn; use crate::state::MailboxDeliveryPhase; +use crate::state::PendingInputItem; use crate::state::TurnState; use codex_protocol::models::ResponseInputItem; use codex_protocol::protocol::InterAgentCommunication; @@ -11,7 +12,7 @@ use tokio::sync::watch; /// Turn-local pending input storage owned by the input queue flow. #[derive(Default)] pub(crate) struct TurnInputQueue { - items: Vec, + items: Vec, } /// Session-scoped pending input storage and active-turn mailbox delivery coordination. @@ -107,6 +108,7 @@ impl InputQueue { let mut turn_state = active_turn.turn_state.lock().await; turn_state.clear_pending_waiters(); turn_state.pending_input.items.clear(); + turn_state.clear_usage_limit_reached(); } pub(crate) async fn defer_mailbox_delivery_to_next_turn( @@ -148,13 +150,16 @@ impl InputQueue { .accept_mailbox_delivery_for_current_turn(); } - pub(super) async fn push_pending_input_and_accept_mailbox_delivery_for_turn_state( + pub(super) async fn push_pending_steer_input_and_accept_mailbox_delivery_for_turn_state( &self, turn_state: &Mutex, input: ResponseInputItem, ) { let mut turn_state = turn_state.lock().await; - turn_state.pending_input.items.push(input); + turn_state + .pending_input + .items + .push(PendingInputItem::turn_steer(input)); turn_state.accept_mailbox_delivery_for_current_turn(); } @@ -163,13 +168,18 @@ impl InputQueue { turn_state: &Mutex, input: Vec, ) { - turn_state.lock().await.pending_input.items.extend(input); + turn_state + .lock() + .await + .pending_input + .items + .extend(input.into_iter().map(PendingInputItem::injected)); } pub(crate) async fn take_pending_input_for_turn_state( &self, turn_state: &Mutex, - ) -> Vec { + ) -> Vec { turn_state.lock().await.pending_input.items.split_off(0) } @@ -191,7 +201,7 @@ impl InputQueue { .await .pending_input .items - .extend(input); + .extend(input.into_iter().map(PendingInputItem::injected)); Ok(()) } None => Err(input), @@ -202,10 +212,10 @@ impl InputQueue { clippy::await_holding_invalid_type, reason = "active turn checks and turn state updates must remain atomic" )] - pub(crate) async fn prepend_pending_input( + pub(crate) async fn prepend_pending_input_items( &self, active_turn: &Mutex>, - mut input: Vec, + mut input: Vec, ) -> Result<(), ()> { let mut active = active_turn.lock().await; match active.as_mut() { @@ -222,14 +232,25 @@ impl InputQueue { } } - #[expect( - clippy::await_holding_invalid_type, - reason = "active turn checks and turn state updates must remain atomic" - )] pub(crate) async fn get_pending_input( &self, active_turn: &Mutex>, ) -> Vec { + self.get_pending_input_items(active_turn) + .await + .into_iter() + .map(PendingInputItem::into_response_input_item) + .collect() + } + + #[expect( + clippy::await_holding_invalid_type, + reason = "active turn checks and turn state updates must remain atomic" + )] + pub(crate) async fn get_pending_input_items( + &self, + active_turn: &Mutex>, + ) -> Vec { let (pending_input, accepts_mailbox_delivery) = { let mut active = active_turn.lock().await; match active.as_mut() { @@ -246,7 +267,12 @@ impl InputQueue { if !accepts_mailbox_delivery { return pending_input; } - let mailbox_items = self.drain_mailbox_input_items().await; + let mailbox_items = self + .drain_mailbox_input_items() + .await + .into_iter() + .map(PendingInputItem::injected) + .collect::>(); if pending_input.is_empty() { mailbox_items } else if mailbox_items.is_empty() { diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index ddab7752f7..35c26f0459 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -3190,7 +3190,7 @@ impl Session { } self.input_queue - .push_pending_input_and_accept_mailbox_delivery_for_turn_state( + .push_pending_steer_input_and_accept_mailbox_delivery_for_turn_state( active_turn.turn_state.as_ref(), input.into(), ) @@ -3208,6 +3208,30 @@ impl Session { .await } + pub(crate) async fn mark_usage_limit_reached(&self, sub_id: &str) { + let turn_state = self + .input_queue + .turn_state_for_sub_id(&self.active_turn, sub_id) + .await; + let Some(turn_state) = turn_state else { + return; + }; + turn_state.lock().await.mark_usage_limit_reached(); + } + + pub(crate) async fn usage_limit_reached_for_active_turn(&self) -> bool { + let turn_state = { + let active = self.active_turn.lock().await; + active + .as_ref() + .map(|active_turn| Arc::clone(&active_turn.turn_state)) + }; + let Some(turn_state) = turn_state else { + return false; + }; + turn_state.lock().await.usage_limit_reached() + } + pub(crate) async fn record_memory_citation_for_turn(&self, sub_id: &str) { let turn_state = self .input_queue diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index f0cdc70fdf..6f3b2939a1 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -4,6 +4,7 @@ use crate::config::ConfigBuilder; use crate::config::test_config; use crate::context::ContextualUserFragment; use crate::context::TurnAborted; +use crate::context::UserShellCommand; use crate::function_tool::FunctionCallError; use crate::shell::default_user_shell; use crate::skills::SkillRenderSideEffects; @@ -61,6 +62,7 @@ use crate::goals::GoalRuntimeEvent; use crate::goals::SetGoalRequest; use crate::rollout::recorder::RolloutRecorder; use crate::state::ActiveTurn; +use crate::state::PendingInputItem; use crate::state::TaskKind; use crate::tasks::SessionTask; use crate::tasks::SessionTaskContext; @@ -7820,6 +7822,132 @@ async fn task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input() )); } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn task_finish_discards_leftover_pending_steer_after_usage_limit() { + let (sess, tc, rx) = make_session_and_context_with_rx().await; + let input = vec![UserInput::Text { + text: "hello".to_string(), + text_elements: Vec::new(), + }]; + sess.spawn_task( + Arc::clone(&tc), + input, + NeverEndingTask { + kind: TaskKind::Regular, + listen_to_cancellation_token: false, + }, + ) + .await; + + while rx.try_recv().is_ok() {} + + sess.steer_input( + vec![UserInput::Text { + text: "late pending input".to_string(), + text_elements: Vec::new(), + }], + Some(&tc.sub_id), + /*responsesapi_client_metadata*/ None, + ) + .await + .expect("steer active turn"); + sess.mark_usage_limit_reached(&tc.sub_id).await; + + sess.on_task_finished(Arc::clone(&tc), /*last_agent_message*/ None) + .await; + + let history = sess.clone_history().await; + let unexpected = ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "late pending input".to_string(), + }], + phase: None, + }; + assert!( + !history.raw_items().iter().any(|item| item == &unexpected), + "expected pending input to be discarded after usage-limit completion" + ); + + let first = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) + .await + .expect("expected turn complete event") + .expect("channel open"); + assert!(matches!( + first.msg, + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id, + last_agent_message: None, + time_to_first_token_ms: None, + .. + }) if turn_id == tc.sub_id + )); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn task_finish_preserves_injected_pending_context_after_usage_limit() { + let (sess, tc, rx) = make_session_and_context_with_rx().await; + let input = vec![UserInput::Text { + text: "hello".to_string(), + text_elements: Vec::new(), + }]; + sess.spawn_task( + Arc::clone(&tc), + input, + NeverEndingTask { + kind: TaskKind::Regular, + listen_to_cancellation_token: false, + }, + ) + .await; + + while rx.try_recv().is_ok() {} + + let shell_output = match ContextualUserFragment::into(UserShellCommand::new( + "echo hi", + /*exit_code*/ 0, + std::time::Duration::from_secs(1), + "hi", + )) { + ResponseItem::Message { + role, + content, + phase, + .. + } => ResponseInputItem::Message { + role, + content, + phase, + }, + other => panic!("expected user shell output message, got {other:?}"), + }; + let code_mode_notification = ResponseInputItem::CustomToolCallOutput { + call_id: "call-1".to_string(), + name: Some("code_mode".to_string()), + output: FunctionCallOutputPayload::from_text("cell output".to_string()), + }; + sess.inject_response_items(vec![shell_output.clone(), code_mode_notification.clone()]) + .await + .expect("inject pending context into active turn"); + sess.mark_usage_limit_reached(&tc.sub_id).await; + + sess.on_task_finished(Arc::clone(&tc), /*last_agent_message*/ None) + .await; + + let history = sess.clone_history().await; + assert!( + history + .raw_items() + .contains(&ResponseItem::from(shell_output)) + ); + assert!( + history + .raw_items() + .contains(&ResponseItem::from(code_mode_notification)) + ); +} + #[tokio::test] async fn steer_input_requires_active_turn() { let (sess, _tc, _rx) = make_session_and_context_with_rx().await; @@ -7997,8 +8125,18 @@ async fn prepend_pending_input_keeps_older_tail_ahead_of_newer_input() { .await .expect("inject initial pending input into active turn"); - let drained = sess.input_queue.get_pending_input(&sess.active_turn).await; - assert_eq!(drained, vec![blocked, later.clone()]); + let drained = sess + .input_queue + .get_pending_input_items(&sess.active_turn) + .await; + assert_eq!( + drained + .iter() + .cloned() + .map(PendingInputItem::into_response_input_item) + .collect::>(), + vec![blocked, later.clone()] + ); sess.inject_response_items(vec![newer.clone()]) .await @@ -8007,7 +8145,7 @@ async fn prepend_pending_input_keeps_older_tail_ahead_of_newer_input() { let mut drained_iter = drained.into_iter(); let _blocked = drained_iter.next().expect("blocked prompt should exist"); sess.input_queue - .prepend_pending_input(&sess.active_turn, drained_iter.collect()) + .prepend_pending_input_items(&sess.active_turn, drained_iter.collect()) .await .expect("requeue later pending input at the front of the queue"); @@ -8098,7 +8236,10 @@ async fn abort_empty_active_turn_preserves_pending_input() { assert_eq!( sess.input_queue .take_pending_input_for_turn_state(turn_state.as_ref()) - .await, + .await + .into_iter() + .map(PendingInputItem::into_response_input_item) + .collect::>(), vec![pending_item] ); } diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 7b1a59f610..9ca7c12e68 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -264,7 +264,9 @@ pub(crate) async fn run_turn( // submitted through the UI while the model was running. Though the UI // may support this, the model might not. let pending_input = if can_drain_pending_input { - sess.input_queue.get_pending_input(&sess.active_turn).await + sess.input_queue + .get_pending_input_items(&sess.active_turn) + .await } else { Vec::new() }; @@ -276,7 +278,13 @@ pub(crate) async fn run_turn( if !pending_input.is_empty() { let mut pending_input_iter = pending_input.into_iter(); while let Some(pending_input_item) = pending_input_iter.next() { - match inspect_pending_input(&sess, &turn_context, pending_input_item).await { + match inspect_pending_input( + &sess, + &turn_context, + pending_input_item.into_response_input_item(), + ) + .await + { PendingInputHookDisposition::Accepted(pending_input) => { accepted_pending_input.push(*pending_input); } @@ -287,7 +295,10 @@ pub(crate) async fn run_turn( if !remaining_pending_input.is_empty() { let _ = sess .input_queue - .prepend_pending_input(&sess.active_turn, remaining_pending_input) + .prepend_pending_input_items( + &sess.active_turn, + remaining_pending_input, + ) .await; requeued_pending_input = true; } @@ -1103,12 +1114,18 @@ async fn run_sampling_request( sess.set_total_tokens_full(&turn_context).await; return Err(CodexErr::ContextWindowExceeded); } - Err(CodexErr::UsageLimitReached(e)) => { - let rate_limits = e.rate_limits.clone(); - if let Some(rate_limits) = rate_limits { + Err( + err @ (CodexErr::UsageLimitReached(_) + | CodexErr::QuotaExceeded + | CodexErr::UsageNotIncluded), + ) => { + if let CodexErr::UsageLimitReached(e) = &err + && let Some(rate_limits) = e.rate_limits.clone() + { sess.update_rate_limits(&turn_context, *rate_limits).await; } - return Err(CodexErr::UsageLimitReached(e)); + sess.mark_usage_limit_reached(&turn_context.sub_id).await; + return Err(err); } Err(err) => err, }; diff --git a/codex-rs/core/src/state/mod.rs b/codex-rs/core/src/state/mod.rs index 3122ec5f25..a1cb67aa99 100644 --- a/codex-rs/core/src/state/mod.rs +++ b/codex-rs/core/src/state/mod.rs @@ -8,6 +8,7 @@ pub(crate) use service::SessionServices; pub(crate) use session::SessionState; pub(crate) use turn::ActiveTurn; pub(crate) use turn::MailboxDeliveryPhase; +pub(crate) use turn::PendingInputItem; pub(crate) use turn::PendingRequestPermissions; pub(crate) use turn::RunningTask; pub(crate) use turn::TaskKind; diff --git a/codex-rs/core/src/state/turn.rs b/codex-rs/core/src/state/turn.rs index 86438ad56a..a4f941886a 100644 --- a/codex-rs/core/src/state/turn.rs +++ b/codex-rs/core/src/state/turn.rs @@ -23,9 +23,36 @@ use crate::session::TurnInputQueue; use crate::session::turn_context::TurnContext; use crate::tasks::AnySessionTask; use codex_protocol::models::AdditionalPermissionProfile; +use codex_protocol::models::ResponseInputItem; use codex_protocol::protocol::ReviewDecision; use codex_protocol::protocol::TokenUsage; +#[derive(Clone, Debug, PartialEq)] +pub(crate) enum PendingInputItem { + TurnSteer(ResponseInputItem), + Injected(ResponseInputItem), +} + +impl PendingInputItem { + pub(crate) fn turn_steer(input: ResponseInputItem) -> Self { + Self::TurnSteer(input) + } + + pub(crate) fn injected(input: ResponseInputItem) -> Self { + Self::Injected(input) + } + + pub(crate) fn is_turn_steer(&self) -> bool { + matches!(self, Self::TurnSteer(_)) + } + + pub(crate) fn into_response_input_item(self) -> ResponseInputItem { + match self { + Self::TurnSteer(input) | Self::Injected(input) => input, + } + } +} + /// Metadata about the currently running turn. pub(crate) struct ActiveTurn { pub(crate) tasks: IndexMap, @@ -116,6 +143,7 @@ pub(crate) struct TurnState { pending_elicitations: HashMap<(String, RequestId), oneshot::Sender>, pending_dynamic_tools: HashMap>, pub(crate) pending_input: TurnInputQueue, + usage_limit_reached: bool, mailbox_delivery_phase: MailboxDeliveryPhase, granted_permissions: Option, strict_auto_review_enabled: bool, @@ -219,6 +247,17 @@ impl TurnState { self.pending_dynamic_tools.remove(key) } + pub(crate) fn mark_usage_limit_reached(&mut self) { + self.usage_limit_reached = true; + } + + pub(crate) fn clear_usage_limit_reached(&mut self) { + self.usage_limit_reached = false; + } + + pub(crate) fn usage_limit_reached(&self) -> bool { + self.usage_limit_reached + } pub(crate) fn accept_mailbox_delivery_for_current_turn(&mut self) { self.set_mailbox_delivery_phase(MailboxDeliveryPhase::CurrentTurn); } diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index ba7f707927..ab32596c84 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -31,6 +31,7 @@ use crate::hook_runtime::record_pending_input; use crate::session::session::Session; use crate::session::turn_context::TurnContext; use crate::state::ActiveTurn; +use crate::state::PendingInputItem; use crate::state::RunningTask; use crate::state::TaskKind; use codex_analytics::TurnTokenUsageFact; @@ -42,7 +43,6 @@ use codex_otel::TURN_MEMORY_METRIC; use codex_otel::TURN_NETWORK_PROXY_METRIC; use codex_otel::TURN_TOKEN_USAGE_METRIC; use codex_otel::TURN_TOOL_CALL_METRIC; -use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; @@ -587,12 +587,13 @@ impl Session { .turn_metadata_state .cancel_git_enrichment_task(); - let mut pending_input = Vec::::new(); + let mut pending_input = Vec::::new(); let mut should_clear_active_turn = false; let mut token_usage_at_turn_start = None; let mut turn_had_memory_citation = false; let mut turn_tool_calls = 0_u64; let mut records_turn_token_usage_on_span = false; + let mut usage_limit_reached = false; let turn_state = { let mut active = self.active_turn.lock().await; if let Some(at) = active.as_mut() @@ -619,10 +620,20 @@ impl Session { turn_had_memory_citation = ts.has_memory_citation; turn_tool_calls = ts.tool_calls; token_usage_at_turn_start = Some(ts.token_usage_at_turn_start.clone()); + usage_limit_reached = ts.usage_limit_reached(); } if !pending_input.is_empty() { for pending_input_item in pending_input { - match inspect_pending_input(self, &turn_context, pending_input_item).await { + if usage_limit_reached && pending_input_item.is_turn_steer() { + continue; + } + match inspect_pending_input( + self, + &turn_context, + pending_input_item.into_response_input_item(), + ) + .await + { PendingInputHookDisposition::Accepted(pending_input) => { record_pending_input(self, &turn_context, *pending_input).await; } diff --git a/codex-rs/core/src/tasks/regular.rs b/codex-rs/core/src/tasks/regular.rs index 50414df278..ae74ed0185 100644 --- a/codex-rs/core/src/tasks/regular.rs +++ b/codex-rs/core/src/tasks/regular.rs @@ -80,6 +80,9 @@ impl SessionTask for RegularTask { ) .instrument(run_turn_span.clone()) .await; + if sess.usage_limit_reached_for_active_turn().await { + return last_agent_message; + } if !sess.input_queue.has_pending_input(&sess.active_turn).await { return last_agent_message; } diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index f6731a0ea4..abbb4d6fbb 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -2671,6 +2671,81 @@ async fn usage_limit_error_emits_rate_limit_event() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn usage_limit_error_does_not_auto_send_pending_steer() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + let server = MockServer::start().await; + + let response = ResponseTemplate::new(429) + .insert_header("x-codex-primary-used-percent", "100.0") + .insert_header("x-codex-secondary-used-percent", "87.5") + .insert_header("x-codex-primary-over-secondary-limit-percent", "95.0") + .insert_header("x-codex-primary-window-minutes", "15") + .insert_header("x-codex-secondary-window-minutes", "60") + .set_delay(std::time::Duration::from_millis(100)) + .set_body_json(json!({ + "error": { + "type": "usage_limit_reached", + "message": "limit reached", + "resets_at": 1704067242, + "plan_type": "pro" + } + })); + + Mock::given(method("POST")) + .and(path("/v1/responses")) + .respond_with(response) + .expect(1) + .mount(&server) + .await; + + let mut builder = test_codex(); + let codex_fixture = builder.build(&server).await?; + let codex = codex_fixture.codex.clone(); + + codex + .submit(Op::UserInput { + environments: None, + items: vec![UserInput::Text { + text: "hello".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + thread_settings: Default::default(), + }) + .await?; + wait_for_event(&codex, |msg| matches!(msg, EventMsg::TurnStarted(_))).await; + + codex + .submit(Op::UserInput { + environments: None, + items: vec![UserInput::Text { + text: "steer while blocked".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + thread_settings: Default::default(), + }) + .await?; + + wait_for_event(&codex, |msg| matches!(msg, EventMsg::Error(_))).await; + wait_for_event(&codex, |msg| matches!(msg, EventMsg::TurnComplete(_))).await; + tokio::time::sleep(std::time::Duration::from_millis(150)).await; + + assert_eq!( + server + .received_requests() + .await + .expect("mock server should not fail") + .len(), + 1 + ); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn context_window_error_sets_total_tokens_to_model_window() -> anyhow::Result<()> { skip_if_no_network!(Ok(())); diff --git a/codex-rs/core/tests/suite/quota_exceeded.rs b/codex-rs/core/tests/suite/quota_exceeded.rs index 413855f641..1ab74dce72 100644 --- a/codex-rs/core/tests/suite/quota_exceeded.rs +++ b/codex-rs/core/tests/suite/quota_exceeded.rs @@ -3,8 +3,10 @@ use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::Op; use codex_protocol::user_input::UserInput; use core_test_support::responses::ev_response_created; +use core_test_support::responses::mount_response_once; use core_test_support::responses::mount_sse_once; use core_test_support::responses::sse; +use core_test_support::responses::sse_response; use core_test_support::responses::start_mock_server; use core_test_support::skip_if_no_network; use core_test_support::test_codex::test_codex; @@ -75,3 +77,88 @@ async fn quota_exceeded_emits_single_error_event() -> Result<()> { Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn quota_exceeded_does_not_auto_send_pending_steer() -> Result<()> { + assert_usage_limit_like_failure_does_not_auto_send_pending_steer( + "insufficient_quota", + "You exceeded your current quota.", + ) + .await +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn usage_not_included_does_not_auto_send_pending_steer() -> Result<()> { + assert_usage_limit_like_failure_does_not_auto_send_pending_steer( + "usage_not_included", + "Usage is not included with this plan.", + ) + .await +} + +async fn assert_usage_limit_like_failure_does_not_auto_send_pending_steer( + code: &str, + message: &str, +) -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let mut builder = test_codex(); + mount_response_once( + &server, + sse_response(sse(vec![ + ev_response_created("resp-1"), + json!({ + "type": "response.failed", + "response": { + "id": "resp-1", + "error": { + "code": code, + "message": message, + } + } + }), + ])) + .set_delay(std::time::Duration::from_millis(100)), + ) + .await; + let test = builder.build(&server).await?; + + test.codex + .submit(Op::UserInput { + environments: None, + items: vec![UserInput::Text { + text: "hello".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + thread_settings: Default::default(), + }) + .await?; + wait_for_event(&test.codex, |msg| matches!(msg, EventMsg::TurnStarted(_))).await; + + test.codex + .submit(Op::UserInput { + environments: None, + items: vec![UserInput::Text { + text: "steer while blocked".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + thread_settings: Default::default(), + }) + .await?; + + wait_for_event(&test.codex, |msg| matches!(msg, EventMsg::Error(_))).await; + wait_for_event(&test.codex, |msg| matches!(msg, EventMsg::TurnComplete(_))).await; + tokio::time::sleep(std::time::Duration::from_millis(150)).await; + + assert_eq!( + server.received_requests().await.unwrap_or_default().len(), + 1 + ); + + Ok(()) +}