Queue nudges while plan generating (#10457)

## Summary

This PR fixes a UI/streaming race when nudged or steer-enabled messages
are queued during an active Plan stream.

Previously, `submit_user_message_with_mode` switched collaboration mode
immediately (via `set_collaboration_mask`) even when the message was
queued. If that happened mid-Plan stream, `active_mode_kind` could flip
away from Plan before the turn finished, causing subsequent
`on_plan_delta` updates to be ignored in the UI.

Now, mode switching is deferred until the queued message is actually
submitted.

## What changed

- Added a per-message deferred mode override on `UserMessage`:
  - `collaboration_mode_override: Option<CollaborationModeMask>`
- Updated `submit_user_message_with_mode` to:
  - create a `UserMessage` carrying the mode override
  - queue or submit that message without mutating the global mode immediately
- Updated `submit_user_message` to:
  - apply `collaboration_mode_override` just before constructing/sending
    `Op::UserTurn`
- Kept the queueing condition scoped to active Plan stream rendering:
  - queue only while plan output is actively streaming in the TUI
    (`plan_stream_controller.is_some()`)

## Why

This preserves Plan mode for the remainder of the in-flight Plan turn,
so streamed plan deltas continue rendering correctly, while still
ensuring the follow-up queued message is sent with the intended
collaboration mode.

## Behavior after this change

- If a nudged/steer submission happens while Plan output is actively
  streaming:
  - the message is queued
  - the UI stays in Plan mode for the running turn
  - once dequeued/submitted, the mode override is applied and the message is
    sent in the intended mode
- If no Plan stream is active:
  - submission proceeds immediately and the mode override is applied as before

## Tests

Added/updated coverage in `tui/src/chatwidget/tests.rs`:

- `submit_user_message_with_mode_queues_while_plan_stream_is_active`
  - asserts mode remains Plan while queued
  - asserts mode switches to Code when the queued message is actually
    submitted
- `submit_user_message_with_mode_submits_when_plan_stream_is_not_active`
- `steer_enter_queues_while_plan_stream_is_active`
- `steer_enter_submits_when_plan_stream_is_not_active`

Also updated existing `UserMessage { ... }` test fixtures to include the
new field.

## Codex author
`codex fork 019c1047-d5d5-7c92-a357-6009604dc7e8`
This commit is contained in:
Charley Cunningham
2026-02-06 09:43:00 -08:00
committed by GitHub
parent 4521a6e852
commit b7ecd166a6
2 changed files with 228 additions and 23 deletions

View File

@@ -527,6 +527,54 @@ async fn interrupted_turn_restores_queued_messages_with_images_and_elements() {
);
}
#[tokio::test]
async fn interrupted_turn_restore_keeps_active_mode_for_resubmission() {
    // Interrupting a turn must restore the queued message to the composer
    // without losing the active collaboration mode, so resubmitting sends
    // the turn in that same mode.
    let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(Some("gpt-5")).await;
    chat.thread_id = Some(ThreadId::new());
    chat.set_feature_enabled(Feature::CollaborationModes, true);
    let plan_mask = collaboration_modes::plan_mask(chat.models_manager.as_ref())
        .expect("expected plan collaboration mode");
    let expected_mode = plan_mask
        .mode
        .expect("expected mode kind on plan collaboration mode");
    chat.set_collaboration_mask(plan_mask);
    chat.on_task_started();
    chat.queued_user_messages.push_back(UserMessage {
        text: "Implement the plan.".to_string(),
        local_images: Vec::new(),
        text_elements: Vec::new(),
        mention_paths: HashMap::new(),
        // Per-message deferred mode override (new in this change). This
        // fixture does not use one, but the struct literal must name the
        // field.
        collaboration_mode_override: None,
    });
    chat.refresh_queued_user_messages();
    // Interrupt the running turn; the queued message should move back into
    // the composer instead of being dropped.
    chat.handle_codex_event(Event {
        id: "interrupt".into(),
        msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
            reason: TurnAbortReason::Interrupted,
        }),
    });
    assert_eq!(chat.bottom_pane.composer_text(), "Implement the plan.");
    assert!(chat.queued_user_messages.is_empty());
    assert_eq!(chat.active_collaboration_mode_kind(), expected_mode);
    // Resubmit via Enter and confirm the turn carries the preserved mode.
    chat.handle_key_event(KeyEvent::from(KeyCode::Enter));
    match next_submit_op(&mut op_rx) {
        Op::UserTurn {
            collaboration_mode: Some(CollaborationMode { mode, .. }),
            personality: None,
            ..
        } => assert_eq!(mode, expected_mode),
        other => {
            panic!("expected Op::UserTurn with active mode, got {other:?}")
        }
    }
    assert_eq!(chat.active_collaboration_mode_kind(), expected_mode);
}
#[tokio::test]
async fn remap_placeholders_uses_attachment_labels() {
let placeholder_one = "[Image #1]";
@@ -1358,6 +1406,97 @@ async fn submit_user_message_with_mode_sets_coding_collaboration_mode() {
}
}
#[tokio::test]
async fn submit_user_message_with_mode_errors_when_mode_changes_during_running_turn() {
    // Arrange: Plan mode is active and a turn is in flight.
    let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(Some("gpt-5")).await;
    chat.thread_id = Some(ThreadId::new());
    chat.set_feature_enabled(Feature::CollaborationModes, true);
    let plan = collaboration_modes::mask_for_kind(chat.models_manager.as_ref(), ModeKind::Plan)
        .expect("expected plan collaboration mask");
    chat.set_collaboration_mask(plan);
    chat.on_task_started();

    // Act: try to switch to the default mode while the turn is running.
    let other_mask = collaboration_modes::default_mask(chat.models_manager.as_ref())
        .expect("expected default collaboration mode");
    chat.submit_user_message_with_mode("Implement the plan.".to_string(), other_mask);

    // Assert: mode is unchanged, nothing was queued or submitted, and the
    // transcript contains the running-turn error.
    assert_eq!(chat.active_collaboration_mode_kind(), ModeKind::Plan);
    assert!(chat.queued_user_messages.is_empty());
    assert_matches!(op_rx.try_recv(), Err(TryRecvError::Empty));
    let mut rendered = String::new();
    for (idx, lines) in drain_insert_history(&mut rx).iter().enumerate() {
        if idx > 0 {
            rendered.push('\n');
        }
        rendered.push_str(&lines_to_single_string(lines));
    }
    assert!(
        rendered.contains("Cannot switch collaboration mode while a turn is running."),
        "expected running-turn error message, got: {rendered:?}"
    );
}
#[tokio::test]
async fn submit_user_message_with_mode_allows_same_mode_during_running_turn() {
    // Arrange: Plan mode is active and a turn is in flight.
    let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(Some("gpt-5")).await;
    chat.thread_id = Some(ThreadId::new());
    chat.set_feature_enabled(Feature::CollaborationModes, true);
    let plan = collaboration_modes::mask_for_kind(chat.models_manager.as_ref(), ModeKind::Plan)
        .expect("expected plan collaboration mask");
    chat.set_collaboration_mask(plan.clone());
    chat.on_task_started();

    // Act: resubmit mid-turn with the same (Plan) mode.
    chat.submit_user_message_with_mode("Continue planning.".to_string(), plan);

    // Assert: nothing queues and the turn goes out with Plan mode intact.
    assert_eq!(chat.active_collaboration_mode_kind(), ModeKind::Plan);
    assert!(chat.queued_user_messages.is_empty());
    let op = next_submit_op(&mut op_rx);
    match op {
        Op::UserTurn {
            collaboration_mode:
                Some(CollaborationMode {
                    mode: ModeKind::Plan,
                    ..
                }),
            personality: None,
            ..
        } => {}
        other => {
            panic!("expected Op::UserTurn with plan collab mode, got {other:?}")
        }
    }
}
#[tokio::test]
async fn submit_user_message_with_mode_submits_when_plan_stream_is_not_active() {
    // With no plan output streaming, a mode-switching submission must be
    // sent immediately and the override applied right away.
    let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(Some("gpt-5")).await;
    chat.thread_id = Some(ThreadId::new());
    chat.set_feature_enabled(Feature::CollaborationModes, true);
    let plan = collaboration_modes::mask_for_kind(chat.models_manager.as_ref(), ModeKind::Plan)
        .expect("expected plan collaboration mask");
    chat.set_collaboration_mask(plan);
    let target = collaboration_modes::default_mask(chat.models_manager.as_ref())
        .expect("expected default collaboration mode");
    let expected_mode = target
        .mode
        .expect("expected default collaboration mode kind");

    chat.submit_user_message_with_mode("Implement the plan.".to_string(), target);

    // The mode switch applied immediately and nothing was queued.
    assert_eq!(chat.active_collaboration_mode_kind(), expected_mode);
    assert!(chat.queued_user_messages.is_empty());
    let op = next_submit_op(&mut op_rx);
    match op {
        Op::UserTurn {
            collaboration_mode: Some(CollaborationMode { mode, .. }),
            personality: None,
            ..
        } => assert_eq!(mode, expected_mode),
        other => {
            panic!("expected Op::UserTurn with default collab mode, got {other:?}")
        }
    }
}
#[tokio::test]
async fn plan_implementation_popup_skips_replayed_turn_complete() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(Some("gpt-5")).await;
@@ -1986,6 +2125,55 @@ async fn unified_exec_begin_restores_working_status_snapshot() {
);
}
#[tokio::test]
async fn steer_enter_queues_while_plan_stream_is_active() {
    // Arrange: Plan mode active with a plan delta already streaming.
    let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await;
    chat.thread_id = Some(ThreadId::new());
    chat.set_feature_enabled(Feature::CollaborationModes, true);
    let mask = collaboration_modes::mask_for_kind(chat.models_manager.as_ref(), ModeKind::Plan)
        .expect("expected plan collaboration mask");
    chat.set_collaboration_mask(mask);
    chat.on_task_started();
    chat.on_plan_delta("- Step 1".to_string());

    // Act: type a message and press Enter while the plan stream is live.
    chat.bottom_pane
        .set_composer_text("queued submission".to_string(), Vec::new(), Vec::new());
    chat.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));

    // Assert: the message is queued (not submitted) and Plan mode is kept.
    assert_eq!(chat.active_collaboration_mode_kind(), ModeKind::Plan);
    assert_eq!(chat.queued_user_messages.len(), 1);
    let queued = chat.queued_user_messages.front().unwrap();
    assert_eq!(queued.text, "queued submission");
    assert_matches!(op_rx.try_recv(), Err(TryRecvError::Empty));
}
#[tokio::test]
async fn steer_enter_submits_when_plan_stream_is_not_active() {
    // Arrange: Plan mode active and a turn running, but no plan stream.
    let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await;
    chat.thread_id = Some(ThreadId::new());
    chat.set_feature_enabled(Feature::CollaborationModes, true);
    let mask = collaboration_modes::mask_for_kind(chat.models_manager.as_ref(), ModeKind::Plan)
        .expect("expected plan collaboration mask");
    chat.set_collaboration_mask(mask);
    chat.on_task_started();

    // Act: press Enter with composer text while no plan output streams.
    chat.bottom_pane
        .set_composer_text("submitted immediately".to_string(), Vec::new(), Vec::new());
    chat.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));

    // Assert: nothing is queued and a user turn is sent right away.
    assert!(chat.queued_user_messages.is_empty());
    let submitted = next_submit_op(&mut op_rx);
    match submitted {
        Op::UserTurn {
            personality: None, ..
        } => {}
        other => panic!("expected Op::UserTurn, got {other:?}"),
    }
}
#[tokio::test]
async fn ctrl_c_shutdown_works_with_caps_lock() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;