Handle interrupted request_user_input responses

This commit is contained in:
Charles Cunningham
2026-01-31 17:54:44 -08:00
parent c5dd0b0a1e
commit 48b7012d5a
10 changed files with 165 additions and 434 deletions

View File

@@ -1660,9 +1660,13 @@ impl Session {
pub async fn notify_user_input_response(
&self,
sub_id: &str,
call_id: Option<String>,
_call_id: Option<String>,
response: RequestUserInputResponse,
) {
if response.interrupted {
// An interrupted request_user_input should end the turn without a follow-up request.
self.mark_request_user_input_interrupted().await;
}
let entry = {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
@@ -1679,32 +1683,7 @@ impl Session {
tx_response.send(response).ok();
}
None => {
// No pending user-input sender means the tool response arrived after the
// original request_user_input turn ended (e.g., the TUI queued partial
// answers on interrupt and is replaying them on the next user turn).
// Record a function_call_output so history/rollout includes the tool
// response in the correct order before the next user message, without
// starting a new model request immediately.
if response.answers.is_empty() && !response.interrupted {
warn!(
"dropping empty request_user_input response for sub_id: {sub_id}; likely cancelled"
);
return;
}
let call_id = call_id.unwrap_or_else(|| sub_id.to_string());
let call_id_for_log = call_id.clone();
let response_item = match response.to_function_call_output(call_id) {
Ok(item) => item,
Err(err) => {
warn!(
"failed to serialize request_user_input response for call_id: {call_id_for_log}: {err}"
);
return;
}
};
let turn_context = self.new_default_turn_with_sub_id(sub_id.to_string()).await;
self.record_conversation_items(&turn_context, &[response_item])
.await;
warn!("No pending user input found for sub_id: {sub_id}");
}
}
}
@@ -2233,6 +2212,25 @@ impl Session {
}
}
pub async fn mark_request_user_input_interrupted(&self) {
let mut active = self.active_turn.lock().await;
if let Some(at) = active.as_mut() {
let mut ts = at.turn_state.lock().await;
ts.mark_request_user_input_interrupted();
}
}
pub async fn take_request_user_input_interrupted(&self) -> bool {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.take_request_user_input_interrupted()
}
None => false,
}
}
pub async fn list_resources(
&self,
server: &str,
@@ -4274,7 +4272,7 @@ async fn try_run_sampling_request(
let plan_mode = turn_context.collaboration_mode_kind == ModeKind::Plan;
let mut plan_mode_state = plan_mode.then(|| PlanModeStreamState::new(&turn_context.sub_id));
let receiving_span = trace_span!("receiving_stream");
let outcome: CodexResult<SamplingRequestResult> = loop {
let mut outcome: CodexResult<SamplingRequestResult> = loop {
let handle_responses = trace_span!(
parent: &receiving_span,
"handle_responses",
@@ -4485,6 +4483,13 @@ async fn try_run_sampling_request(
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;
if let Ok(result) = outcome.as_mut() {
// Suppress follow-up requests when request_user_input was interrupted.
if sess.take_request_user_input_interrupted().await {
result.needs_follow_up = false;
}
}
if should_emit_turn_diff {
let unified_diff = {
let mut tracker = turn_diff_tracker.lock().await;

View File

@@ -73,6 +73,8 @@ pub(crate) struct TurnState {
pending_user_input: HashMap<String, oneshot::Sender<RequestUserInputResponse>>,
pending_dynamic_tools: HashMap<String, oneshot::Sender<DynamicToolResponse>>,
pending_input: Vec<ResponseInputItem>,
/// True when a request_user_input flow was interrupted and should suppress follow-up.
request_user_input_interrupted: bool,
}
impl TurnState {
@@ -96,6 +98,7 @@ impl TurnState {
self.pending_user_input.clear();
self.pending_dynamic_tools.clear();
self.pending_input.clear();
self.request_user_input_interrupted = false;
}
pub(crate) fn insert_pending_user_input(
@@ -145,6 +148,14 @@ impl TurnState {
pub(crate) fn has_pending_input(&self) -> bool {
!self.pending_input.is_empty()
}
pub(crate) fn mark_request_user_input_interrupted(&mut self) {
self.request_user_input_interrupted = true;
}
pub(crate) fn take_request_user_input_interrupted(&mut self) -> bool {
std::mem::take(&mut self.request_user_input_interrupted)
}
}
impl ActiveTurn {

View File

@@ -1,8 +1,5 @@
#![allow(clippy::unwrap_used)]
use std::collections::HashMap;
use std::time::Duration;
use codex_core::features::Feature;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::EventMsg;
@@ -12,7 +9,6 @@ use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::config_types::Settings;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::request_user_input::RequestUserInputAnswer;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_protocol::user_input::UserInput;
@@ -28,10 +24,11 @@ use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_with_timeout;
use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::collections::HashMap;
fn call_output(req: &ResponsesRequest, call_id: &str) -> String {
let raw = req.function_call_output(call_id);
@@ -147,15 +144,11 @@ async fn request_user_input_round_trip_resolves_pending() -> anyhow::Result<()>
})
.await?;
let request_event = wait_for_event_with_timeout(
&codex,
|event| matches!(event, EventMsg::RequestUserInput(_)),
Duration::from_secs(20),
)
let request = wait_for_event_match(&codex, |event| match event {
EventMsg::RequestUserInput(request) => Some(request.clone()),
_ => None,
})
.await;
let EventMsg::RequestUserInput(request) = request_event else {
panic!("expected RequestUserInput event");
};
assert_eq!(request.call_id, call_id);
assert_eq!(request.questions.len(), 1);
assert_eq!(request.questions[0].is_other, true);
@@ -197,194 +190,6 @@ async fn request_user_input_round_trip_resolves_pending() -> anyhow::Result<()>
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn request_user_input_partial_answers_replayed_after_interrupt() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let builder = test_codex();
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder
.with_config(|config| {
config.features.enable(Feature::CollaborationModes);
})
.build(&server)
.await?;
let call_id = "user-input-partial-call";
let request_args = json!({
"questions": [{
"id": "confirm_path",
"header": "Confirm",
"question": "Proceed with the plan?",
"isOther": false,
"options": [{
"label": "Yes (Recommended)",
"description": "Continue the current plan."
}, {
"label": "No",
"description": "Stop and revisit the approach."
}]
}, {
"id": "details",
"header": "Details",
"question": "Any extra notes?",
"isOther": false,
"options": [{
"label": "No extra notes (Recommended)",
"description": "Continue without additional detail."
}, {
"label": "Add notes",
"description": "Provide more context."
}]
}]
})
.to_string();
let first_response = sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "request_user_input", &request_args),
ev_completed("resp-1"),
]);
responses::mount_sse_once(&server, first_response).await;
let second_response = sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-2", "acknowledged"),
ev_completed("resp-2"),
]);
let second_mock = responses::mount_sse_once(&server, second_response).await;
let session_model = session_configured.model.clone();
codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "please confirm".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: session_model.clone(),
effort: None,
summary: ReasoningSummary::Auto,
collaboration_mode: Some(CollaborationMode {
mode: ModeKind::Plan,
settings: Settings {
model: session_configured.model.clone(),
reasoning_effort: None,
developer_instructions: None,
},
}),
personality: None,
})
.await?;
let request_event = wait_for_event_with_timeout(
&codex,
|event| matches!(event, EventMsg::RequestUserInput(_)),
Duration::from_secs(20),
)
.await;
let EventMsg::RequestUserInput(request) = request_event else {
panic!("expected RequestUserInput event");
};
assert_eq!(request.call_id, call_id);
codex.submit(Op::Interrupt).await?;
wait_for_event(&codex, |event| match event {
EventMsg::TurnAborted(ev) => ev.reason == TurnAbortReason::Interrupted,
_ => false,
})
.await;
let mut answers = HashMap::new();
answers.insert(
"confirm_path".to_string(),
RequestUserInputAnswer {
answers: vec!["yes".to_string()],
},
);
codex
.submit(Op::UserInputAnswer {
id: request.turn_id.clone(),
call_id: Some(request.call_id.clone()),
response: RequestUserInputResponse {
answers,
interrupted: true,
},
})
.await?;
codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "continue".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: session_model,
effort: None,
summary: ReasoningSummary::Auto,
collaboration_mode: Some(CollaborationMode {
mode: ModeKind::Plan,
settings: Settings {
model: session_configured.model.clone(),
reasoning_effort: None,
developer_instructions: None,
},
}),
personality: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let req = second_mock.single_request();
let input = req.input();
let output_item = input
.iter()
.rev()
.find(|item| {
item.get("type").and_then(Value::as_str) == Some("function_call_output")
&& item.get("call_id").and_then(Value::as_str) == Some(call_id)
})
.expect("pending function_call_output present");
let output = output_item.get("output").cloned().unwrap_or(Value::Null);
let output_text = match output {
Value::String(text) => text,
Value::Object(obj) => obj
.get("content")
.and_then(Value::as_str)
.expect("function_call_output missing content")
.to_string(),
_ => panic!("unexpected function_call_output payload"),
};
let output_json: Value = serde_json::from_str(&output_text)?;
assert_eq!(
output_json,
json!({
"answers": {
"confirm_path": { "answers": ["yes"] }
},
"interrupted": true
})
);
Ok(())
}
async fn assert_request_user_input_rejected<F>(mode_name: &str, build_mode: F) -> anyhow::Result<()>
where
F: FnOnce(String) -> CollaborationMode,

View File

@@ -1,7 +1,5 @@
use std::collections::HashMap;
use crate::models::FunctionCallOutputPayload;
use crate::models::ResponseItem;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
@@ -54,21 +52,6 @@ impl RequestUserInputResponse {
pub fn to_output_content(&self) -> Result<String, serde_json::Error> {
serde_json::to_string(self)
}
pub fn to_function_call_output(
&self,
call_id: String,
) -> Result<ResponseItem, serde_json::Error> {
let content = self.to_output_content()?;
Ok(ResponseItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content,
success: Some(true),
..Default::default()
},
})
}
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]

View File

@@ -1487,19 +1487,6 @@ impl App {
}
}
}
AppEvent::QueueRequestUserInputAnswers {
turn_id,
call_id,
answers,
interrupted,
} => {
self.chat_widget.queue_request_user_input_answers(
turn_id,
call_id,
answers,
interrupted,
);
}
AppEvent::StartCommitAnimation => {
if self
.commit_anim_running

View File

@@ -8,7 +8,6 @@
//! Exit is modelled explicitly via `AppEvent::Exit(ExitMode)` so callers can request shutdown-first
//! quits without reaching into the app loop or coupling to shutdown/exit sequencing.
use std::collections::HashMap;
use std::path::PathBuf;
use codex_chatgpt::connectors::AppInfo;
@@ -18,7 +17,6 @@ use codex_core::protocol::RateLimitSnapshot;
use codex_file_search::FileMatch;
use codex_protocol::ThreadId;
use codex_protocol::openai_models::ModelPreset;
use codex_protocol::request_user_input::RequestUserInputAnswer;
use crate::bottom_pane::ApprovalRequest;
use crate::history_cell::HistoryCell;
@@ -114,15 +112,6 @@ pub(crate) enum AppEvent {
InsertHistoryCell(Box<dyn HistoryCell>),
/// Buffer request_user_input answers (typically from an interrupted UI) so they
/// are sent before the next user turn, preserving tool call/output ordering.
QueueRequestUserInputAnswers {
turn_id: String,
call_id: String,
answers: HashMap<String, RequestUserInputAnswer>,
interrupted: bool,
},
StartCommitAnimation,
StopCommitAnimation,
CommitTick,

View File

@@ -671,6 +671,7 @@ impl RequestUserInputOverlay {
self.sync_composer_placeholder();
}
/// Build the answer payload for a question if it is committed (or include empty if requested).
fn answer_for_question(
&self,
idx: usize,
@@ -692,8 +693,7 @@ impl RequestUserInputOverlay {
} else {
None
};
let draft = answer_state.draft.clone();
let notes = draft.text_with_pending().trim().to_string();
let notes = answer_state.draft.text_with_pending().trim().to_string();
let selected_label = selected_idx
.and_then(|selected_idx| Self::option_label_for_index(question, selected_idx));
let mut answers = selected_label.into_iter().collect::<Vec<_>>();
@@ -706,6 +706,7 @@ impl RequestUserInputOverlay {
Some(RequestUserInputAnswer { answers })
}
/// Submit committed answers when the questions UI is explicitly interrupted.
fn submit_committed_answers_for_interrupt(&mut self) {
let mut answers = HashMap::new();
for (idx, question) in self.request.questions.iter().enumerate() {
@@ -715,17 +716,16 @@ impl RequestUserInputOverlay {
}
// This path only runs when the user explicitly interrupts the questions UI.
let interrupted = true;
if answers.is_empty() && !interrupted {
return;
}
let history_answers = answers.clone();
self.app_event_tx
.send(AppEvent::QueueRequestUserInputAnswers {
turn_id: self.request.turn_id.clone(),
call_id: self.request.call_id.clone(),
answers,
interrupted,
});
.send(AppEvent::CodexOp(Op::UserInputAnswer {
id: self.request.turn_id.clone(),
call_id: Some(self.request.call_id.clone()),
response: RequestUserInputResponse {
answers,
interrupted,
},
}));
self.app_event_tx.send(AppEvent::InsertHistoryCell(Box::new(
history_cell::new_request_user_input_result(
self.request.questions.clone(),
@@ -1044,7 +1044,6 @@ impl BottomPaneView for RequestUserInputOverlay {
if matches!(key_event.code, KeyCode::Esc) {
self.submit_committed_answers_for_interrupt();
self.app_event_tx.send(AppEvent::CodexOp(Op::Interrupt));
self.done = true;
return;
}
@@ -1252,7 +1251,6 @@ impl BottomPaneView for RequestUserInputOverlay {
if self.confirm_unanswered_active() {
self.close_unanswered_confirmation();
self.submit_committed_answers_for_interrupt();
self.app_event_tx.send(AppEvent::CodexOp(Op::Interrupt));
self.done = true;
return CancellationEvent::Handled;
}
@@ -1262,7 +1260,6 @@ impl BottomPaneView for RequestUserInputOverlay {
}
self.submit_committed_answers_for_interrupt();
self.app_event_tx.send(AppEvent::CodexOp(Op::Interrupt));
self.done = true;
CancellationEvent::Handled
}
@@ -1758,25 +1755,18 @@ mod tests {
assert_eq!(overlay.done, true);
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::QueueRequestUserInputAnswers {
answers,
interrupted,
..
} = event
else {
panic!("expected queued answers");
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
panic!("expected UserInputAnswer");
};
assert!(interrupted, "expected interrupted flag");
assert!(answers.is_empty(), "expected no committed answers");
assert!(response.interrupted, "expected interrupted flag");
assert!(response.answers.is_empty(), "expected no committed answers");
let event = rx.try_recv().expect("expected history cell");
assert!(matches!(event, AppEvent::InsertHistoryCell(_)));
let event = rx.try_recv().expect("expected interrupt AppEvent");
let AppEvent::CodexOp(op) = event else {
panic!("expected CodexOp");
};
assert_eq!(op, Op::Interrupt);
assert!(
rx.try_recv().is_err(),
"unexpected AppEvent after history cell"
);
}
#[test]
@@ -1794,25 +1784,18 @@ mod tests {
assert_eq!(overlay.done, true);
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::QueueRequestUserInputAnswers {
answers,
interrupted,
..
} = event
else {
panic!("expected queued answers");
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
panic!("expected UserInputAnswer");
};
assert!(interrupted, "expected interrupted flag");
assert!(answers.is_empty(), "expected no committed answers");
assert!(response.interrupted, "expected interrupted flag");
assert!(response.answers.is_empty(), "expected no committed answers");
let event = rx.try_recv().expect("expected history cell");
assert!(matches!(event, AppEvent::InsertHistoryCell(_)));
let event = rx.try_recv().expect("expected interrupt AppEvent");
let AppEvent::CodexOp(op) = event else {
panic!("expected CodexOp");
};
assert_eq!(op, Op::Interrupt);
assert!(
rx.try_recv().is_err(),
"unexpected AppEvent after history cell"
);
}
#[test]
@@ -1833,24 +1816,19 @@ mod tests {
assert_eq!(overlay.done, true);
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::QueueRequestUserInputAnswers {
answers: _,
interrupted,
..
} = event
else {
panic!("expected queued answers");
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
panic!("expected UserInputAnswer");
};
assert!(interrupted, "expected interrupted flag");
assert!(response.interrupted, "expected interrupted flag");
let answer = response.answers.get("q1").expect("answer missing");
assert_eq!(answer.answers, vec!["Option 1".to_string()]);
let event = rx.try_recv().expect("expected history cell");
assert!(matches!(event, AppEvent::InsertHistoryCell(_)));
let event = rx.try_recv().expect("expected interrupt AppEvent");
let AppEvent::CodexOp(op) = event else {
panic!("expected CodexOp");
};
assert_eq!(op, Op::Interrupt);
assert!(
rx.try_recv().is_err(),
"unexpected AppEvent after history cell"
);
}
#[test]
@@ -1872,24 +1850,19 @@ mod tests {
assert_eq!(overlay.done, true);
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::QueueRequestUserInputAnswers {
answers: _,
interrupted,
..
} = event
else {
panic!("expected queued answers");
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
panic!("expected UserInputAnswer");
};
assert!(interrupted, "expected interrupted flag");
assert!(response.interrupted, "expected interrupted flag");
let answer = response.answers.get("q1").expect("answer missing");
assert_eq!(answer.answers, vec!["Option 1".to_string()]);
let event = rx.try_recv().expect("expected history cell");
assert!(matches!(event, AppEvent::InsertHistoryCell(_)));
let event = rx.try_recv().expect("expected interrupt AppEvent");
let AppEvent::CodexOp(op) = event else {
panic!("expected CodexOp");
};
assert_eq!(op, Op::Interrupt);
assert!(
rx.try_recv().is_err(),
"unexpected AppEvent after history cell"
);
}
#[test]
@@ -1918,26 +1891,19 @@ mod tests {
overlay.handle_key_event(KeyEvent::from(KeyCode::Esc));
let event = rx.try_recv().expect("expected partial answers");
let AppEvent::QueueRequestUserInputAnswers {
answers,
interrupted,
..
} = event
else {
panic!("expected queued answers");
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
panic!("expected UserInputAnswer");
};
assert!(interrupted, "expected interrupted flag");
let answer = answers.get("q1").expect("answer missing");
assert!(response.interrupted, "expected interrupted flag");
let answer = response.answers.get("q1").expect("answer missing");
assert_eq!(answer.answers, vec!["Option 1".to_string()]);
let event = rx.try_recv().expect("expected history cell");
assert!(matches!(event, AppEvent::InsertHistoryCell(_)));
let event = rx.try_recv().expect("expected interrupt AppEvent");
let AppEvent::CodexOp(op) = event else {
panic!("expected CodexOp");
};
assert_eq!(op, Op::Interrupt);
assert!(
rx.try_recv().is_err(),
"unexpected AppEvent after history cell"
);
}
#[test]

View File

@@ -72,6 +72,7 @@ use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::Op;
use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::RateLimitSnapshot;
use codex_core::protocol::RawResponseItemEvent;
use codex_core::protocol::ReviewRequest;
use codex_core::protocol::ReviewTarget;
use codex_core::protocol::SkillMetadata as ProtocolSkillMetadata;
@@ -103,10 +104,11 @@ use codex_protocol::config_types::Personality;
use codex_protocol::config_types::Settings;
#[cfg(target_os = "windows")]
use codex_protocol::config_types::WindowsSandboxLevel;
use codex_protocol::models::ResponseItem;
use codex_protocol::models::local_image_label_text;
use codex_protocol::parse_command::ParsedCommand;
use codex_protocol::request_user_input::RequestUserInputAnswer;
use codex_protocol::request_user_input::RequestUserInputEvent;
use codex_protocol::request_user_input::RequestUserInputQuestion;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_protocol::user_input::TextElement;
use codex_protocol::user_input::UserInput;
@@ -389,13 +391,6 @@ pub(crate) struct ChatWidgetInit {
pub(crate) otel_manager: OtelManager,
}
struct PendingRequestUserInputAnswers {
turn_id: String,
call_id: String,
answers: HashMap<String, RequestUserInputAnswer>,
interrupted: bool,
}
#[derive(Default)]
enum RateLimitSwitchPromptState {
#[default]
@@ -538,8 +533,8 @@ pub(crate) struct ChatWidget {
suppress_session_configured_redraw: bool,
// User messages queued while a turn is in progress
queued_user_messages: VecDeque<UserMessage>,
// Partial request_user_input answers to send before the next user turn.
pending_request_user_input_answers: VecDeque<PendingRequestUserInputAnswers>,
// Replayed request_user_input questions keyed by call_id to render answers on resume.
replay_request_user_input_questions: HashMap<String, Vec<RequestUserInputQuestion>>,
// Pending notification to show when unfocused on next Draw
pending_notification: Option<Notification>,
/// When `Some`, the user has pressed a quit shortcut and the second press
@@ -807,7 +802,7 @@ impl ChatWidget {
.set_history_metadata(event.history_log_id, event.history_entry_count);
self.set_skills(None);
self.bottom_pane.set_connectors_snapshot(None);
self.clear_pending_request_user_input_answers();
self.replay_request_user_input_questions.clear();
self.thread_id = Some(event.session_id);
self.thread_name = event.thread_name.clone();
self.forked_from = event.forked_from_id;
@@ -1502,6 +1497,37 @@ impl ChatWidget {
);
}
/// Cache replayed request_user_input questions so answers can render once the
/// corresponding tool output is replayed.
fn cache_request_user_input_for_replay(&mut self, ev: RequestUserInputEvent) {
self.replay_request_user_input_questions
.insert(ev.call_id, ev.questions);
}
/// Replay handler for request_user_input tool outputs, rendering them into history.
fn on_raw_response_item_replay(&mut self, event: RawResponseItemEvent) {
let ResponseItem::FunctionCallOutput { call_id, output } = event.item else {
return;
};
let Some(questions) = self.replay_request_user_input_questions.remove(&call_id) else {
return;
};
let response: RequestUserInputResponse = match serde_json::from_str(&output.content) {
Ok(response) => response,
Err(err) => {
tracing::warn!(
"failed to parse request_user_input response for call_id {call_id}: {err}"
);
return;
}
};
self.add_to_history(history_cell::new_request_user_input_result(
questions,
response.answers,
response.interrupted,
));
}
fn on_exec_command_begin(&mut self, ev: ExecCommandBeginEvent) {
self.flush_answer_stream_with_separator();
if is_unified_exec_source(ev.source) {
@@ -2271,7 +2297,7 @@ impl ChatWidget {
thread_name: None,
forked_from: None,
queued_user_messages: VecDeque::new(),
pending_request_user_input_answers: VecDeque::new(),
replay_request_user_input_questions: HashMap::new(),
show_welcome_banner: is_first_run,
suppress_session_configured_redraw: false,
pending_notification: None,
@@ -2421,7 +2447,7 @@ impl ChatWidget {
plan_delta_buffer: String::new(),
plan_item_active: false,
queued_user_messages: VecDeque::new(),
pending_request_user_input_answers: VecDeque::new(),
replay_request_user_input_questions: HashMap::new(),
show_welcome_banner: is_first_run,
suppress_session_configured_redraw: false,
pending_notification: None,
@@ -2552,7 +2578,7 @@ impl ChatWidget {
thread_name: None,
forked_from: None,
queued_user_messages: VecDeque::new(),
pending_request_user_input_answers: VecDeque::new(),
replay_request_user_input_questions: HashMap::new(),
show_welcome_banner: false,
suppress_session_configured_redraw: true,
pending_notification: None,
@@ -3151,54 +3177,6 @@ impl ChatWidget {
}
}
pub(crate) fn queue_request_user_input_answers(
&mut self,
turn_id: String,
call_id: String,
answers: HashMap<String, RequestUserInputAnswer>,
interrupted: bool,
) {
if answers.is_empty() && !interrupted {
return;
}
if let Some(existing) = self
.pending_request_user_input_answers
.iter_mut()
.find(|pending| pending.call_id == call_id)
{
existing.turn_id = turn_id;
existing.answers = answers;
existing.interrupted = interrupted;
return;
}
self.pending_request_user_input_answers
.push_back(PendingRequestUserInputAnswers {
turn_id,
call_id,
answers,
interrupted,
});
}
fn flush_pending_request_user_input_answers(&mut self) {
let pending = std::mem::take(&mut self.pending_request_user_input_answers);
for pending in pending {
let call_id = pending.call_id;
self.submit_op(Op::UserInputAnswer {
id: pending.turn_id,
call_id: Some(call_id),
response: RequestUserInputResponse {
answers: pending.answers,
interrupted: pending.interrupted,
},
});
}
}
fn clear_pending_request_user_input_answers(&mut self) {
self.pending_request_user_input_answers.clear();
}
fn submit_user_message(&mut self, user_message: UserMessage) {
if !self.is_session_configured() {
tracing::warn!("cannot submit user message before session is configured; queueing");
@@ -3277,7 +3255,6 @@ impl ChatWidget {
});
}
}
self.flush_pending_request_user_input_answers();
let effective_mode = self.effective_collaboration_mode();
let collaboration_mode = if self.collaboration_modes_enabled() {
@@ -3434,11 +3411,15 @@ impl ChatWidget {
EventMsg::McpStartupUpdate(ev) => self.on_mcp_startup_update(ev),
EventMsg::McpStartupComplete(ev) => self.on_mcp_startup_complete(ev),
EventMsg::TurnAborted(ev) => match ev.reason {
TurnAbortReason::Interrupted => self.on_interrupted_turn(ev.reason),
TurnAbortReason::Interrupted => {
self.on_interrupted_turn(ev.reason);
}
TurnAbortReason::Replaced => {
self.on_error("Turn aborted: replaced by a new task".to_owned())
}
TurnAbortReason::ReviewEnded => self.on_interrupted_turn(ev.reason),
TurnAbortReason::ReviewEnded => {
self.on_interrupted_turn(ev.reason);
}
},
EventMsg::PlanUpdate(update) => self.on_plan_update(update),
EventMsg::ExecApprovalRequest(ev) => {
@@ -3452,12 +3433,12 @@ impl ChatWidget {
self.on_elicitation_request(ev);
}
EventMsg::RequestUserInput(ev) => {
// Replay should not reopen interactive question UIs for now; we only replay the
// persisted tool call/output items. Once we reconstruct answers on replay,
// remove this guard and render the questions from history instead.
// TODO: Support request_user_input replay by reconstructing question/answer
// history and rendering without re-prompting.
if !from_replay {
if from_replay {
// During replay, avoid reopening the questions UI and instead cache
// questions until we see the matching tool output.
// TODO: support replaying unanswered questions as a UI flow.
self.cache_request_user_input_for_replay(ev);
} else {
self.on_request_user_input(ev);
}
}
@@ -3515,9 +3496,13 @@ impl ChatWidget {
EventMsg::CollabWaitingEnd(ev) => self.on_collab_event(collab::waiting_end(ev)),
EventMsg::CollabCloseBegin(_) => {}
EventMsg::CollabCloseEnd(ev) => self.on_collab_event(collab::close_end(ev)),
EventMsg::ThreadRolledBack(_) => self.clear_pending_request_user_input_answers(),
EventMsg::RawResponseItem(_)
| EventMsg::ItemStarted(_)
EventMsg::ThreadRolledBack(_) => {}
EventMsg::RawResponseItem(event) => {
if from_replay {
self.on_raw_response_item_replay(event);
}
}
EventMsg::ItemStarted(_)
| EventMsg::AgentMessageContentDelta(_)
| EventMsg::ReasoningContentDelta(_)
| EventMsg::ReasoningRawContentDelta(_)

View File

@@ -832,7 +832,7 @@ async fn make_chatwidget_manual(
frame_requester: FrameRequester::test_dummy(),
show_welcome_banner: true,
queued_user_messages: VecDeque::new(),
pending_request_user_input_answers: VecDeque::new(),
replay_request_user_input_questions: HashMap::new(),
suppress_session_configured_redraw: false,
pending_notification: None,
quit_shortcut_expires_at: None,

View File

@@ -1761,6 +1761,7 @@ pub(crate) fn new_error_event(message: String) -> PlainHistoryCell {
PlainHistoryCell { lines }
}
/// Build a history cell that renders request_user_input questions and answers.
pub(crate) fn new_request_user_input_result(
questions: Vec<RequestUserInputQuestion>,
answers: HashMap<String, RequestUserInputAnswer>,
@@ -1773,6 +1774,7 @@ pub(crate) fn new_request_user_input_result(
}
}
/// Renders a completed (or interrupted) request_user_input exchange in history.
#[derive(Debug)]
pub(crate) struct RequestUserInputResultCell {
questions: Vec<RequestUserInputQuestion>,
@@ -1817,8 +1819,6 @@ impl HistoryCell for RequestUserInputResultCell {
Style::default(),
);
if answer_missing && let Some(last) = question_lines.last_mut() {
let spans = last.spans.to_vec();
last.spans = spans;
last.spans.push(" (unanswered)".dim());
}
lines.extend(question_lines);