Compare commits

...

2 Commits

Author | SHA1 | Message | Date
Jeremy Rose
1a263394b9 fmt 2025-09-02 10:21:44 -07:00
Jeremy Rose
fdef3fbe82 interrupt instead of queueing 2025-09-01 22:06:07 -07:00
3 changed files with 117 additions and 17 deletions

View File

@@ -264,6 +264,9 @@ struct State {
pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
pending_input: Vec<ResponseInputItem>,
history: ConversationHistory,
/// True while awaiting a streaming response from the model API for the
/// current task/turn.
waiting_on_api_response: bool,
}
/// Context for an initialized model agent
@@ -604,6 +607,24 @@ impl Session {
}
}
/// Record whether a model API response is currently being awaited
/// for the active task/turn.
fn set_waiting_on_api_response(&self, waiting: bool) {
    self.state.lock_unchecked().waiting_on_api_response = waiting;
}
/// Whether a model API response is currently outstanding.
fn is_waiting_on_api_response(&self) -> bool {
    let state = self.state.lock_unchecked();
    state.waiting_on_api_response
}
/// Turn `waiting_on_api_response` on and hand back an RAII guard that
/// turns it off again when it goes out of scope.
fn begin_waiting_on_api_response(&self) -> WaitingApiGuard<'_> {
    self.set_waiting_on_api_response(true);
    WaitingApiGuard { sess: self }
}
pub async fn request_command_approval(
&self,
sub_id: String,
@@ -888,8 +909,18 @@ impl Session {
/// Returns the input if there was no task running to inject into
pub fn inject_input(&self, input: Vec<InputItem>) -> Result<(), Vec<InputItem>> {
let waiting = self.is_waiting_on_api_response();
let mut state = self.state.lock_unchecked();
if state.current_task.is_some() {
// If we're in the middle of streaming a model response, abort the
// current task so we can immediately start a fresh request that
// includes everything recorded so far plus this new user input.
if waiting {
// Do not mutate pending_input here; instruct caller to spawn a new task.
return Err(input);
}
// Otherwise, queue the input for inclusion on the next turn.
state.pending_input.push(input.into());
Ok(())
} else {
@@ -908,6 +939,12 @@ impl Session {
}
}
/// Reports whether input queued during the current turn is waiting to be
/// sent on the next turn. Unlike `get_pending_input()`, nothing is consumed.
pub fn has_pending_input(&self) -> bool {
    let state = self.state.lock_unchecked();
    !state.pending_input.is_empty()
}
pub async fn call_tool(
&self,
server: &str,
@@ -966,6 +1003,17 @@ impl Drop for Session {
}
}
/// RAII guard that flips `waiting_on_api_response` back to false when dropped.
///
/// Marked `#[must_use]`: if the guard is not bound to a local (e.g. a bare
/// call or `let _ = ...`), it is dropped immediately and the flag is cleared
/// right away, silently defeating its purpose.
#[must_use = "dropping the guard clears `waiting_on_api_response`; bind it for the duration of the request"]
struct WaitingApiGuard<'a> {
    sess: &'a Session,
}

impl Drop for WaitingApiGuard<'_> {
    fn drop(&mut self) {
        // Clearing on drop guarantees the flag resets on every exit path,
        // including early returns and `?` error propagation.
        self.sess.set_waiting_on_api_response(false);
    }
}
#[derive(Clone, Debug)]
pub(crate) struct ExecCommandContext {
pub(crate) sub_id: String,
@@ -1036,14 +1084,16 @@ impl AgentTask {
// TOCTOU?
if !self.handle.is_finished() {
self.handle.abort();
let event = Event {
id: self.sub_id,
msg: EventMsg::TurnAborted(TurnAbortedEvent { reason }),
};
let tx_event = self.sess.tx_event.clone();
tokio::spawn(async move {
tx_event.send(event).await.ok();
});
if matches!(reason, TurnAbortReason::Interrupted) {
let event = Event {
id: self.sub_id,
msg: EventMsg::TurnAborted(TurnAbortedEvent { reason }),
};
let tx_event = self.sess.tx_event.clone();
tokio::spawn(async move {
tx_event.send(event).await.ok();
});
}
}
}
}
@@ -1572,10 +1622,20 @@ async fn run_task(
}
if responses.is_empty() {
debug!("Turn completed");
// Turn completed with an assistant message.
last_agent_message = get_last_assistant_message_from_turn(
&items_to_record_in_conversation_history,
);
// If there is pending input that arrived during this
// turn, keep the task alive and start another turn so the
// pending input is sent immediately.
if sess.has_pending_input() {
debug!("Turn completed; pending input present — continuing");
continue;
}
debug!("Turn completed");
sess.maybe_notify(UserNotification::AgentTurnComplete {
turn_id: sub_id.clone(),
input_messages: turn_input_messages,
@@ -1739,6 +1799,8 @@ async fn try_run_turn(
})
};
// We are now awaiting a response from the model API.
let _waiting_guard = sess.begin_waiting_on_api_response();
let mut stream = turn_context.client.clone().stream(&prompt).await?;
let mut output = Vec::new();

View File

@@ -74,9 +74,33 @@ pub enum HistoryPersistence {
None,
}
/// Behavior for handling a user message submitted while a turn is in progress.
///
/// Deserialized from configuration with lowercase variant names
/// (`queue` / `interrupt`) via `serde(rename_all)`.
#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum MessageDuringTurnBehavior {
/// Queue the message to be sent after the current turn completes.
/// This is the default behavior.
#[default]
Queue,
/// Interrupt the current model stream. If mid tool-call, do not interrupt;
/// send the user message so it will be included with the tool outputs on
/// the next turn.
Interrupt,
}
/// Collection of settings that are specific to the TUI.
#[derive(Deserialize, Debug, Clone, PartialEq, Default)]
pub struct Tui {}
#[derive(Deserialize, Debug, Clone, PartialEq)]
pub struct Tui {
#[serde(default)]
pub message_during_turn_behavior: MessageDuringTurnBehavior,
}
impl Default for Tui {
fn default() -> Self {
Self {
message_during_turn_behavior: MessageDuringTurnBehavior::Queue,
}
}
}
#[derive(Deserialize, Debug, Clone, PartialEq, Default)]
pub struct SandboxWorkspaceWrite {

View File

@@ -77,6 +77,7 @@ use codex_common::approval_presets::builtin_approval_presets;
use codex_common::model_presets::ModelPreset;
use codex_common::model_presets::builtin_model_presets;
use codex_core::ConversationManager;
use codex_core::config_types::MessageDuringTurnBehavior;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol_config_types::ReasoningEffort as ReasoningEffortConfig;
@@ -116,6 +117,7 @@ pub(crate) struct ChatWidget {
last_history_was_exec: bool,
// User messages queued while a turn is in progress
queued_user_messages: VecDeque<UserMessage>,
// No TUI-level tracking; core decides mid-tool-call behavior.
}
struct UserMessage {
@@ -272,10 +274,10 @@ impl ChatWidget {
/// When there are queued user messages, restore them into the composer
/// separated by newlines rather than autosubmitting the next one.
fn on_interrupted_turn(&mut self) {
// Finalize, log a gentle prompt, and clear running state.
// User-initiated interrupt: finalize with a gentle prompt
// and restore any queued messages into the composer.
self.finalize_turn_with_error_message("Tell the model what to do differently".to_owned());
// If any messages were queued during the task, restore them into the composer.
if !self.queued_user_messages.is_empty() {
let combined = self
.queued_user_messages
@@ -284,7 +286,6 @@ impl ChatWidget {
.collect::<Vec<_>>()
.join("\n");
self.bottom_pane.set_composer_text(combined);
// Clear the queue and update the status indicator list.
self.queued_user_messages.clear();
self.refresh_queued_user_messages();
}
@@ -342,6 +343,7 @@ impl ChatWidget {
|q| q.push_patch_end(event),
|s| s.handle_patch_apply_end_now(ev2),
);
// No TUI-level tracking
}
fn on_exec_command_end(&mut self, ev: ExecCommandEndEvent) {
@@ -727,14 +729,24 @@ impl ChatWidget {
_ => {
match self.bottom_pane.handle_key_event(key_event) {
InputResult::Submitted(text) => {
// If a task is running, queue the user input to be sent after the turn completes.
// Build the message to submit or defer.
let user_message = UserMessage {
text,
image_paths: self.bottom_pane.take_recent_submission_images(),
};
if self.bottom_pane.is_task_running() {
self.queued_user_messages.push_back(user_message);
self.refresh_queued_user_messages();
match self.config.tui.message_during_turn_behavior {
MessageDuringTurnBehavior::Queue => {
// Original behavior: queue for after the turn completes.
self.queued_user_messages.push_back(user_message);
self.refresh_queued_user_messages();
}
MessageDuringTurnBehavior::Interrupt => {
// Forward to Codex and print immediately; Core will either
// queue alongside tool outputs or replace the running turn.
self.submit_user_message(user_message);
}
}
} else {
self.submit_user_message(user_message);
}
@@ -1032,6 +1044,8 @@ impl ChatWidget {
}
}
// No TUI-level mid-tool-call logic; Core manages it.
// If idle and there are queued inputs, submit exactly one to start the next turn.
fn maybe_send_next_queued_input(&mut self) {
if self.bottom_pane.is_task_running() {