From 44c202d347cf3f88c9283cd5304012edcf5a2b34 Mon Sep 17 00:00:00 2001 From: Roy Han Date: Fri, 22 May 2026 14:26:09 -0700 Subject: [PATCH] preserve terminal pending input --- codex-rs/core/src/session/input_queue.rs | 6 ++-- codex-rs/core/src/session/tests.rs | 34 ++++++++++++++++++++-- codex-rs/core/src/tasks/regular.rs | 31 ++++++++++++++++++-- codex-rs/core/tests/suite/pending_input.rs | 12 ++++++-- 4 files changed, 74 insertions(+), 9 deletions(-) diff --git a/codex-rs/core/src/session/input_queue.rs b/codex-rs/core/src/session/input_queue.rs index 561604386d..633a887221 100644 --- a/codex-rs/core/src/session/input_queue.rs +++ b/codex-rs/core/src/session/input_queue.rs @@ -116,14 +116,14 @@ impl InputQueue { turn_state.pending_input.items.clear(); } - pub(crate) async fn mark_terminal_and_clear_pending_for_turn_state( + pub(crate) async fn mark_terminal_and_take_pending_for_turn_state( &self, turn_state: &Mutex, - ) { + ) -> Vec { let mut turn_state = turn_state.lock().await; turn_state.mark_terminal(); turn_state.clear_pending_waiters(); - turn_state.pending_input.items.clear(); + turn_state.pending_input.items.split_off(0) } pub(crate) async fn defer_mailbox_delivery_to_next_turn( diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index a373e9d790..d865340fe0 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -8140,9 +8140,39 @@ async fn terminal_active_turn_rejects_new_pending_input() { let active_turn = sess.active_turn.lock().await; Arc::clone(&active_turn.as_ref().expect("active turn").turn_state) }; - sess.input_queue - .mark_terminal_and_clear_pending_for_turn_state(turn_state.as_ref()) + let pending_user_input = vec![UserInput::Text { + text: "pending steer".to_string(), + text_elements: Vec::new(), + }]; + sess.steer_input( + pending_user_input.clone(), + Some(&tc.sub_id), + /*responsesapi_client_metadata*/ None, + ) + .await + .expect("steer input before terminal"); + let pending_response_item = ResponseInputItem::Message { + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "pending injected input".to_string(), + }], + phase: None, + }; + sess.inject_response_items(vec![pending_response_item.clone()]) + .await + .expect("inject input before terminal"); + + let terminal_pending_input = sess + .input_queue + .mark_terminal_and_take_pending_for_turn_state(turn_state.as_ref()) .await; + assert_eq!( + terminal_pending_input, + vec![ + TurnInput::UserInput(pending_user_input), + TurnInput::ResponseInputItem(pending_response_item) + ] + ); let steer_input = vec![UserInput::Text { text: "late steer".to_string(), diff --git a/codex-rs/core/src/tasks/regular.rs b/codex-rs/core/src/tasks/regular.rs index 3f6383e81c..da8321392c 100644 --- a/codex-rs/core/src/tasks/regular.rs +++ b/codex-rs/core/src/tasks/regular.rs @@ -2,12 +2,15 @@ use std::sync::Arc; use tokio_util::sync::CancellationToken; +use crate::session::TurnInput; use crate::session::turn::AutoCompactTurnLimiter; use crate::session::turn::RunTurnResult; use crate::session::turn::run_turn; use crate::session::turn_context::TurnContext; use crate::session_startup_prewarm::SessionStartupPrewarmResolution; use crate::state::TaskKind; +use codex_protocol::protocol::CodexErrorInfo; +use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::user_input::UserInput; @@ -98,9 +101,33 @@ impl SessionTask for RegularTask { .map(|active_turn| Arc::clone(&active_turn.turn_state)) }; if let Some(turn_state) = turn_state { - sess.input_queue - .mark_terminal_and_clear_pending_for_turn_state(turn_state.as_ref()) + let pending_input = sess + .input_queue + .mark_terminal_and_take_pending_for_turn_state(turn_state.as_ref()) .await; + let mut rejected_user_input = false; + let mut response_items_for_next_turn = Vec::new(); + for pending_input_item in pending_input { + match pending_input_item { + TurnInput::UserInput(_) => rejected_user_input = true, + TurnInput::ResponseInputItem(item) => { + response_items_for_next_turn.push(item); + } + } + } + sess.input_queue + .queue_response_items_for_next_turn(response_items_for_next_turn) + .await; + if rejected_user_input { + sess.send_event( + ctx.as_ref(), + EventMsg::Error(ErrorEvent { + message: "Pending user input was not processed because Codex stopped this turn after repeated automatic compactions. Submit it again in a new turn or reduce earlier history before retrying.".to_string(), + codex_error_info: Some(CodexErrorInfo::ContextWindowExceeded), + }), + ) + .await; + } } return None; } diff --git a/codex-rs/core/tests/suite/pending_input.rs b/codex-rs/core/tests/suite/pending_input.rs index be4b425655..122b6d4958 100644 --- a/codex-rs/core/tests/suite/pending_input.rs +++ b/codex-rs/core/tests/suite/pending_input.rs @@ -798,7 +798,7 @@ async fn steered_user_input_resets_auto_compact_limit() { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn steered_user_input_is_discarded_when_auto_compact_guard_fires() { +async fn steered_user_input_is_rejected_when_auto_compact_guard_fires() { let (gate_fourth_completed_tx, gate_fourth_completed_rx) = oneshot::channel(); let token_count_used = 270_000; let token_count_used_after_compaction = 80_000; @@ -873,6 +873,14 @@ async fn steered_user_input_is_discarded_when_auto_compact_guard_fires() { steer_user_input(&codex, "late steer").await; let _ = gate_fourth_completed_tx.send(()); + wait_for_event(&codex, |event| { + matches!( + event, + EventMsg::Error(error) + if error.message.contains("Pending user input was not processed") + ) + }) + .await; wait_for_turn_complete(&codex).await; assert_eq!( server.requests().await.len(), @@ -901,7 +909,7 @@ async fn steered_user_input_is_discarded_when_auto_compact_guard_fires() { .collect(); assert!( !all_user_texts.iter().any(|text| text == "late steer"), - "late steer should be discarded after the terminal compaction guard" + "late steer should be rejected after the terminal compaction guard" ); assert!( all_user_texts.iter().any(|text| text == "next prompt"),