Core: pause pending steers after usage limits

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ee Durbin
2026-05-11 16:05:28 -07:00
parent 80fdd4688f
commit 6e3f31f889
10 changed files with 452 additions and 28 deletions

View File

@@ -1,5 +1,6 @@
use crate::state::ActiveTurn;
use crate::state::MailboxDeliveryPhase;
use crate::state::PendingInputItem;
use crate::state::TurnState;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::protocol::InterAgentCommunication;
@@ -11,7 +12,7 @@ use tokio::sync::watch;
/// Turn-local pending input storage owned by the input queue flow.
#[derive(Default)]
pub(crate) struct TurnInputQueue {
items: Vec<ResponseInputItem>,
items: Vec<PendingInputItem>,
}
/// Session-scoped pending input storage and active-turn mailbox delivery coordination.
@@ -107,6 +108,7 @@ impl InputQueue {
let mut turn_state = active_turn.turn_state.lock().await;
turn_state.clear_pending_waiters();
turn_state.pending_input.items.clear();
turn_state.clear_usage_limit_reached();
}
pub(crate) async fn defer_mailbox_delivery_to_next_turn(
@@ -148,13 +150,16 @@ impl InputQueue {
.accept_mailbox_delivery_for_current_turn();
}
pub(super) async fn push_pending_input_and_accept_mailbox_delivery_for_turn_state(
pub(super) async fn push_pending_steer_input_and_accept_mailbox_delivery_for_turn_state(
&self,
turn_state: &Mutex<TurnState>,
input: ResponseInputItem,
) {
let mut turn_state = turn_state.lock().await;
turn_state.pending_input.items.push(input);
turn_state
.pending_input
.items
.push(PendingInputItem::turn_steer(input));
turn_state.accept_mailbox_delivery_for_current_turn();
}
@@ -163,13 +168,18 @@ impl InputQueue {
turn_state: &Mutex<TurnState>,
input: Vec<ResponseInputItem>,
) {
turn_state.lock().await.pending_input.items.extend(input);
turn_state
.lock()
.await
.pending_input
.items
.extend(input.into_iter().map(PendingInputItem::injected));
}
pub(crate) async fn take_pending_input_for_turn_state(
&self,
turn_state: &Mutex<TurnState>,
) -> Vec<ResponseInputItem> {
) -> Vec<PendingInputItem> {
turn_state.lock().await.pending_input.items.split_off(0)
}
@@ -191,7 +201,7 @@ impl InputQueue {
.await
.pending_input
.items
.extend(input);
.extend(input.into_iter().map(PendingInputItem::injected));
Ok(())
}
None => Err(input),
@@ -202,10 +212,10 @@ impl InputQueue {
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub(crate) async fn prepend_pending_input(
pub(crate) async fn prepend_pending_input_items(
&self,
active_turn: &Mutex<Option<ActiveTurn>>,
mut input: Vec<ResponseInputItem>,
mut input: Vec<PendingInputItem>,
) -> Result<(), ()> {
let mut active = active_turn.lock().await;
match active.as_mut() {
@@ -222,14 +232,25 @@ impl InputQueue {
}
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub(crate) async fn get_pending_input(
&self,
active_turn: &Mutex<Option<ActiveTurn>>,
) -> Vec<ResponseInputItem> {
self.get_pending_input_items(active_turn)
.await
.into_iter()
.map(PendingInputItem::into_response_input_item)
.collect()
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub(crate) async fn get_pending_input_items(
&self,
active_turn: &Mutex<Option<ActiveTurn>>,
) -> Vec<PendingInputItem> {
let (pending_input, accepts_mailbox_delivery) = {
let mut active = active_turn.lock().await;
match active.as_mut() {
@@ -246,7 +267,12 @@ impl InputQueue {
if !accepts_mailbox_delivery {
return pending_input;
}
let mailbox_items = self.drain_mailbox_input_items().await;
let mailbox_items = self
.drain_mailbox_input_items()
.await
.into_iter()
.map(PendingInputItem::injected)
.collect::<Vec<_>>();
if pending_input.is_empty() {
mailbox_items
} else if mailbox_items.is_empty() {

View File

@@ -3190,7 +3190,7 @@ impl Session {
}
self.input_queue
.push_pending_input_and_accept_mailbox_delivery_for_turn_state(
.push_pending_steer_input_and_accept_mailbox_delivery_for_turn_state(
active_turn.turn_state.as_ref(),
input.into(),
)
@@ -3208,6 +3208,30 @@ impl Session {
.await
}
pub(crate) async fn mark_usage_limit_reached(&self, sub_id: &str) {
let turn_state = self
.input_queue
.turn_state_for_sub_id(&self.active_turn, sub_id)
.await;
let Some(turn_state) = turn_state else {
return;
};
turn_state.lock().await.mark_usage_limit_reached();
}
pub(crate) async fn usage_limit_reached_for_active_turn(&self) -> bool {
let turn_state = {
let active = self.active_turn.lock().await;
active
.as_ref()
.map(|active_turn| Arc::clone(&active_turn.turn_state))
};
let Some(turn_state) = turn_state else {
return false;
};
turn_state.lock().await.usage_limit_reached()
}
pub(crate) async fn record_memory_citation_for_turn(&self, sub_id: &str) {
let turn_state = self
.input_queue

View File

@@ -4,6 +4,7 @@ use crate::config::ConfigBuilder;
use crate::config::test_config;
use crate::context::ContextualUserFragment;
use crate::context::TurnAborted;
use crate::context::UserShellCommand;
use crate::function_tool::FunctionCallError;
use crate::shell::default_user_shell;
use crate::skills::SkillRenderSideEffects;
@@ -61,6 +62,7 @@ use crate::goals::GoalRuntimeEvent;
use crate::goals::SetGoalRequest;
use crate::rollout::recorder::RolloutRecorder;
use crate::state::ActiveTurn;
use crate::state::PendingInputItem;
use crate::state::TaskKind;
use crate::tasks::SessionTask;
use crate::tasks::SessionTaskContext;
@@ -7820,6 +7822,132 @@ async fn task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input()
));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn task_finish_discards_leftover_pending_steer_after_usage_limit() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let input = vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}];
sess.spawn_task(
Arc::clone(&tc),
input,
NeverEndingTask {
kind: TaskKind::Regular,
listen_to_cancellation_token: false,
},
)
.await;
while rx.try_recv().is_ok() {}
sess.steer_input(
vec![UserInput::Text {
text: "late pending input".to_string(),
text_elements: Vec::new(),
}],
Some(&tc.sub_id),
/*responsesapi_client_metadata*/ None,
)
.await
.expect("steer active turn");
sess.mark_usage_limit_reached(&tc.sub_id).await;
sess.on_task_finished(Arc::clone(&tc), /*last_agent_message*/ None)
.await;
let history = sess.clone_history().await;
let unexpected = ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "late pending input".to_string(),
}],
phase: None,
};
assert!(
!history.raw_items().iter().any(|item| item == &unexpected),
"expected pending input to be discarded after usage-limit completion"
);
let first = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("expected turn complete event")
.expect("channel open");
assert!(matches!(
first.msg,
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id,
last_agent_message: None,
time_to_first_token_ms: None,
..
}) if turn_id == tc.sub_id
));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn task_finish_preserves_injected_pending_context_after_usage_limit() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let input = vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}];
sess.spawn_task(
Arc::clone(&tc),
input,
NeverEndingTask {
kind: TaskKind::Regular,
listen_to_cancellation_token: false,
},
)
.await;
while rx.try_recv().is_ok() {}
let shell_output = match ContextualUserFragment::into(UserShellCommand::new(
"echo hi",
/*exit_code*/ 0,
std::time::Duration::from_secs(1),
"hi",
)) {
ResponseItem::Message {
role,
content,
phase,
..
} => ResponseInputItem::Message {
role,
content,
phase,
},
other => panic!("expected user shell output message, got {other:?}"),
};
let code_mode_notification = ResponseInputItem::CustomToolCallOutput {
call_id: "call-1".to_string(),
name: Some("code_mode".to_string()),
output: FunctionCallOutputPayload::from_text("cell output".to_string()),
};
sess.inject_response_items(vec![shell_output.clone(), code_mode_notification.clone()])
.await
.expect("inject pending context into active turn");
sess.mark_usage_limit_reached(&tc.sub_id).await;
sess.on_task_finished(Arc::clone(&tc), /*last_agent_message*/ None)
.await;
let history = sess.clone_history().await;
assert!(
history
.raw_items()
.contains(&ResponseItem::from(shell_output))
);
assert!(
history
.raw_items()
.contains(&ResponseItem::from(code_mode_notification))
);
}
#[tokio::test]
async fn steer_input_requires_active_turn() {
let (sess, _tc, _rx) = make_session_and_context_with_rx().await;
@@ -7997,8 +8125,18 @@ async fn prepend_pending_input_keeps_older_tail_ahead_of_newer_input() {
.await
.expect("inject initial pending input into active turn");
let drained = sess.input_queue.get_pending_input(&sess.active_turn).await;
assert_eq!(drained, vec![blocked, later.clone()]);
let drained = sess
.input_queue
.get_pending_input_items(&sess.active_turn)
.await;
assert_eq!(
drained
.iter()
.cloned()
.map(PendingInputItem::into_response_input_item)
.collect::<Vec<_>>(),
vec![blocked, later.clone()]
);
sess.inject_response_items(vec![newer.clone()])
.await
@@ -8007,7 +8145,7 @@ async fn prepend_pending_input_keeps_older_tail_ahead_of_newer_input() {
let mut drained_iter = drained.into_iter();
let _blocked = drained_iter.next().expect("blocked prompt should exist");
sess.input_queue
.prepend_pending_input(&sess.active_turn, drained_iter.collect())
.prepend_pending_input_items(&sess.active_turn, drained_iter.collect())
.await
.expect("requeue later pending input at the front of the queue");
@@ -8098,7 +8236,10 @@ async fn abort_empty_active_turn_preserves_pending_input() {
assert_eq!(
sess.input_queue
.take_pending_input_for_turn_state(turn_state.as_ref())
.await,
.await
.into_iter()
.map(PendingInputItem::into_response_input_item)
.collect::<Vec<_>>(),
vec![pending_item]
);
}

View File

@@ -264,7 +264,9 @@ pub(crate) async fn run_turn(
// submitted through the UI while the model was running. Though the UI
// may support this, the model might not.
let pending_input = if can_drain_pending_input {
sess.input_queue.get_pending_input(&sess.active_turn).await
sess.input_queue
.get_pending_input_items(&sess.active_turn)
.await
} else {
Vec::new()
};
@@ -276,7 +278,13 @@ pub(crate) async fn run_turn(
if !pending_input.is_empty() {
let mut pending_input_iter = pending_input.into_iter();
while let Some(pending_input_item) = pending_input_iter.next() {
match inspect_pending_input(&sess, &turn_context, pending_input_item).await {
match inspect_pending_input(
&sess,
&turn_context,
pending_input_item.into_response_input_item(),
)
.await
{
PendingInputHookDisposition::Accepted(pending_input) => {
accepted_pending_input.push(*pending_input);
}
@@ -287,7 +295,10 @@ pub(crate) async fn run_turn(
if !remaining_pending_input.is_empty() {
let _ = sess
.input_queue
.prepend_pending_input(&sess.active_turn, remaining_pending_input)
.prepend_pending_input_items(
&sess.active_turn,
remaining_pending_input,
)
.await;
requeued_pending_input = true;
}
@@ -1103,12 +1114,18 @@ async fn run_sampling_request(
sess.set_total_tokens_full(&turn_context).await;
return Err(CodexErr::ContextWindowExceeded);
}
Err(CodexErr::UsageLimitReached(e)) => {
let rate_limits = e.rate_limits.clone();
if let Some(rate_limits) = rate_limits {
Err(
err @ (CodexErr::UsageLimitReached(_)
| CodexErr::QuotaExceeded
| CodexErr::UsageNotIncluded),
) => {
if let CodexErr::UsageLimitReached(e) = &err
&& let Some(rate_limits) = e.rate_limits.clone()
{
sess.update_rate_limits(&turn_context, *rate_limits).await;
}
return Err(CodexErr::UsageLimitReached(e));
sess.mark_usage_limit_reached(&turn_context.sub_id).await;
return Err(err);
}
Err(err) => err,
};

View File

@@ -8,6 +8,7 @@ pub(crate) use service::SessionServices;
pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;
pub(crate) use turn::MailboxDeliveryPhase;
pub(crate) use turn::PendingInputItem;
pub(crate) use turn::PendingRequestPermissions;
pub(crate) use turn::RunningTask;
pub(crate) use turn::TaskKind;

View File

@@ -23,9 +23,36 @@ use crate::session::TurnInputQueue;
use crate::session::turn_context::TurnContext;
use crate::tasks::AnySessionTask;
use codex_protocol::models::AdditionalPermissionProfile;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::TokenUsage;
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum PendingInputItem {
TurnSteer(ResponseInputItem),
Injected(ResponseInputItem),
}
impl PendingInputItem {
pub(crate) fn turn_steer(input: ResponseInputItem) -> Self {
Self::TurnSteer(input)
}
pub(crate) fn injected(input: ResponseInputItem) -> Self {
Self::Injected(input)
}
pub(crate) fn is_turn_steer(&self) -> bool {
matches!(self, Self::TurnSteer(_))
}
pub(crate) fn into_response_input_item(self) -> ResponseInputItem {
match self {
Self::TurnSteer(input) | Self::Injected(input) => input,
}
}
}
/// Metadata about the currently running turn.
pub(crate) struct ActiveTurn {
pub(crate) tasks: IndexMap<String, RunningTask>,
@@ -116,6 +143,7 @@ pub(crate) struct TurnState {
pending_elicitations: HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>,
pending_dynamic_tools: HashMap<String, oneshot::Sender<DynamicToolResponse>>,
pub(crate) pending_input: TurnInputQueue,
usage_limit_reached: bool,
mailbox_delivery_phase: MailboxDeliveryPhase,
granted_permissions: Option<AdditionalPermissionProfile>,
strict_auto_review_enabled: bool,
@@ -219,6 +247,17 @@ impl TurnState {
self.pending_dynamic_tools.remove(key)
}
pub(crate) fn mark_usage_limit_reached(&mut self) {
self.usage_limit_reached = true;
}
pub(crate) fn clear_usage_limit_reached(&mut self) {
self.usage_limit_reached = false;
}
pub(crate) fn usage_limit_reached(&self) -> bool {
self.usage_limit_reached
}
pub(crate) fn accept_mailbox_delivery_for_current_turn(&mut self) {
self.set_mailbox_delivery_phase(MailboxDeliveryPhase::CurrentTurn);
}

View File

@@ -31,6 +31,7 @@ use crate::hook_runtime::record_pending_input;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::state::ActiveTurn;
use crate::state::PendingInputItem;
use crate::state::RunningTask;
use crate::state::TaskKind;
use codex_analytics::TurnTokenUsageFact;
@@ -42,7 +43,6 @@ use codex_otel::TURN_MEMORY_METRIC;
use codex_otel::TURN_NETWORK_PROXY_METRIC;
use codex_otel::TURN_TOKEN_USAGE_METRIC;
use codex_otel::TURN_TOOL_CALL_METRIC;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
@@ -587,12 +587,13 @@ impl Session {
.turn_metadata_state
.cancel_git_enrichment_task();
let mut pending_input = Vec::<ResponseInputItem>::new();
let mut pending_input = Vec::<PendingInputItem>::new();
let mut should_clear_active_turn = false;
let mut token_usage_at_turn_start = None;
let mut turn_had_memory_citation = false;
let mut turn_tool_calls = 0_u64;
let mut records_turn_token_usage_on_span = false;
let mut usage_limit_reached = false;
let turn_state = {
let mut active = self.active_turn.lock().await;
if let Some(at) = active.as_mut()
@@ -619,10 +620,20 @@ impl Session {
turn_had_memory_citation = ts.has_memory_citation;
turn_tool_calls = ts.tool_calls;
token_usage_at_turn_start = Some(ts.token_usage_at_turn_start.clone());
usage_limit_reached = ts.usage_limit_reached();
}
if !pending_input.is_empty() {
for pending_input_item in pending_input {
match inspect_pending_input(self, &turn_context, pending_input_item).await {
if usage_limit_reached && pending_input_item.is_turn_steer() {
continue;
}
match inspect_pending_input(
self,
&turn_context,
pending_input_item.into_response_input_item(),
)
.await
{
PendingInputHookDisposition::Accepted(pending_input) => {
record_pending_input(self, &turn_context, *pending_input).await;
}

View File

@@ -80,6 +80,9 @@ impl SessionTask for RegularTask {
)
.instrument(run_turn_span.clone())
.await;
if sess.usage_limit_reached_for_active_turn().await {
return last_agent_message;
}
if !sess.input_queue.has_pending_input(&sess.active_turn).await {
return last_agent_message;
}

View File

@@ -2671,6 +2671,81 @@ async fn usage_limit_error_emits_rate_limit_event() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn usage_limit_error_does_not_auto_send_pending_steer() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = MockServer::start().await;
let response = ResponseTemplate::new(429)
.insert_header("x-codex-primary-used-percent", "100.0")
.insert_header("x-codex-secondary-used-percent", "87.5")
.insert_header("x-codex-primary-over-secondary-limit-percent", "95.0")
.insert_header("x-codex-primary-window-minutes", "15")
.insert_header("x-codex-secondary-window-minutes", "60")
.set_delay(std::time::Duration::from_millis(100))
.set_body_json(json!({
"error": {
"type": "usage_limit_reached",
"message": "limit reached",
"resets_at": 1704067242,
"plan_type": "pro"
}
}));
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(response)
.expect(1)
.mount(&server)
.await;
let mut builder = test_codex();
let codex_fixture = builder.build(&server).await?;
let codex = codex_fixture.codex.clone();
codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |msg| matches!(msg, EventMsg::TurnStarted(_))).await;
codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "steer while blocked".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |msg| matches!(msg, EventMsg::Error(_))).await;
wait_for_event(&codex, |msg| matches!(msg, EventMsg::TurnComplete(_))).await;
tokio::time::sleep(std::time::Duration::from_millis(150)).await;
assert_eq!(
server
.received_requests()
.await
.expect("mock server should not fail")
.len(),
1
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn context_window_error_sets_total_tokens_to_model_window() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -3,8 +3,10 @@ use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_response_once;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::sse;
use core_test_support::responses::sse_response;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
@@ -75,3 +77,88 @@ async fn quota_exceeded_emits_single_error_event() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn quota_exceeded_does_not_auto_send_pending_steer() -> Result<()> {
assert_usage_limit_like_failure_does_not_auto_send_pending_steer(
"insufficient_quota",
"You exceeded your current quota.",
)
.await
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn usage_not_included_does_not_auto_send_pending_steer() -> Result<()> {
assert_usage_limit_like_failure_does_not_auto_send_pending_steer(
"usage_not_included",
"Usage is not included with this plan.",
)
.await
}
async fn assert_usage_limit_like_failure_does_not_auto_send_pending_steer(
code: &str,
message: &str,
) -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex();
mount_response_once(
&server,
sse_response(sse(vec![
ev_response_created("resp-1"),
json!({
"type": "response.failed",
"response": {
"id": "resp-1",
"error": {
"code": code,
"message": message,
}
}
}),
]))
.set_delay(std::time::Duration::from_millis(100)),
)
.await;
let test = builder.build(&server).await?;
test.codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
thread_settings: Default::default(),
})
.await?;
wait_for_event(&test.codex, |msg| matches!(msg, EventMsg::TurnStarted(_))).await;
test.codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "steer while blocked".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
thread_settings: Default::default(),
})
.await?;
wait_for_event(&test.codex, |msg| matches!(msg, EventMsg::Error(_))).await;
wait_for_event(&test.codex, |msg| matches!(msg, EventMsg::TurnComplete(_))).await;
tokio::time::sleep(std::time::Duration::from_millis(150)).await;
assert_eq!(
server.received_requests().await.unwrap_or_default().len(),
1
);
Ok(())
}