Compare commits

...

3 Commits

Author SHA1 Message Date
jif-oai
144c488ba2 fix 2026-01-29 13:07:24 +01:00
jif-oai
63f7351489 Merge remote-tracking branch 'origin/main' into jif/parallel-tasks
# Conflicts:
#	codex-rs/core/src/tasks/mod.rs
2026-01-28 15:49:54 +01:00
jif-oai
b2d779d96e feat: run user shell in parallel 2026-01-26 13:04:04 +00:00
7 changed files with 279 additions and 53 deletions

View File

@@ -164,6 +164,7 @@ use crate::skills::SkillsManager;
use crate::skills::build_skill_injections;
use crate::skills::collect_explicit_skill_mentions;
use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::SessionServices;
use crate::state::SessionState;
use crate::state_db;
@@ -2056,12 +2057,12 @@ impl Session {
pub async fn inject_input(&self, input: Vec<UserInput>) -> Result<(), Vec<UserInput>> {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
Some(at) if at.tasks.values().any(RunningTask::accepts_pending_input) => {
let mut ts = at.turn_state.lock().await;
ts.push_pending_input(input.into());
Ok(())
}
None => Err(input),
Some(_) | None => Err(input),
}
}
@@ -2072,14 +2073,14 @@ impl Session {
) -> Result<(), Vec<ResponseInputItem>> {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
Some(at) if at.tasks.values().any(RunningTask::accepts_pending_input) => {
let mut ts = at.turn_state.lock().await;
for item in input {
ts.push_pending_input(item);
}
Ok(())
}
None => Err(input),
Some(_) | None => Err(input),
}
}
@@ -2432,6 +2433,7 @@ mod handlers {
use crate::review_prompts::resolve_review_request;
use crate::tasks::CompactTask;
use crate::tasks::RegularTask;
use crate::tasks::SpawnAbortPolicy;
use crate::tasks::UndoTask;
use crate::tasks::UserShellCommandTask;
use codex_protocol::custom_prompts::CustomPrompt;
@@ -2618,10 +2620,11 @@ mod handlers {
previous_context: &mut Option<Arc<TurnContext>>,
) {
let turn_context = sess.new_default_turn_with_sub_id(sub_id).await;
sess.spawn_task(
sess.spawn_task_with_policy(
Arc::clone(&turn_context),
Vec::new(),
UserShellCommandTask::new(command),
SpawnAbortPolicy::KeepActiveTurn,
)
.await;
*previous_context = Some(turn_context);

View File

@@ -50,6 +50,12 @@ pub(crate) struct RunningTask {
pub(crate) _timer: Option<codex_otel::Timer>,
}
impl RunningTask {
pub(crate) fn accepts_pending_input(&self) -> bool {
self.task.accepts_pending_input()
}
}
impl ActiveTurn {
pub(crate) fn add_task(&mut self, task: RunningTask) {
let sub_id = task.turn_context.sub_id.clone();

View File

@@ -43,6 +43,12 @@ pub(crate) use user_shell::UserShellCommandTask;
const GRACEFULL_INTERRUPTION_TIMEOUT_MS: u64 = 100;
const TURN_ABORTED_INTERRUPTED_GUIDANCE: &str = "The user interrupted the previous turn on purpose. If any tools/commands were aborted, they may have partially executed; verify current state before retrying.";
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum SpawnAbortPolicy {
AbortActiveTurn,
KeepActiveTurn,
}
/// Thin wrapper that exposes the parts of [`Session`] task runners need.
#[derive(Clone)]
pub(crate) struct SessionTaskContext {
@@ -81,6 +87,11 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
/// surface it in telemetry and UI.
fn kind(&self) -> TaskKind;
/// Whether this task can consume pending user input injected mid-turn.
fn accepts_pending_input(&self) -> bool {
true
}
/// Executes the task until completion or cancellation.
///
/// Implementations typically stream protocol events using `session` and
@@ -114,7 +125,20 @@ impl Session {
input: Vec<UserInput>,
task: T,
) {
self.abort_all_tasks(TurnAbortReason::Replaced).await;
self.spawn_task_with_policy(turn_context, input, task, SpawnAbortPolicy::AbortActiveTurn)
.await;
}
pub async fn spawn_task_with_policy<T: SessionTask>(
self: &Arc<Self>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
task: T,
policy: SpawnAbortPolicy,
) {
if policy == SpawnAbortPolicy::AbortActiveTurn {
self.abort_all_tasks(TurnAbortReason::Replaced).await;
}
self.seed_initial_context_if_needed(turn_context.as_ref())
.await;
@@ -200,9 +224,14 @@ impl Session {
async fn register_new_active_task(&self, task: RunningTask) {
let mut active = self.active_turn.lock().await;
let mut turn = ActiveTurn::default();
turn.add_task(task);
*active = Some(turn);
match active.as_mut() {
Some(turn) => turn.add_task(task),
None => {
let mut turn = ActiveTurn::default();
turn.add_task(task);
*active = Some(turn);
}
}
}
async fn take_all_running_tasks(&self) -> Vec<RunningTask> {

View File

@@ -52,6 +52,10 @@ impl SessionTask for UserShellCommandTask {
TaskKind::Regular
}
fn accepts_pending_input(&self) -> bool {
false
}
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,

View File

@@ -1,5 +1,7 @@
use anyhow::Context;
use codex_core::features::Feature;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecCommandEndEvent;
use codex_core::protocol::ExecCommandSource;
@@ -7,6 +9,8 @@ use codex_core::protocol::ExecOutputStream;
use codex_core::protocol::Op;
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol::TurnAbortReason;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::user_input::UserInput;
use core_test_support::assert_regex_match;
use core_test_support::responses;
use core_test_support::responses::ev_assistant_message;
@@ -20,9 +24,13 @@ use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
use regex_lite::escape;
use std::path::PathBuf;
use std::time::Duration;
use std::time::Instant;
use tempfile::TempDir;
use tokio::time::timeout;
#[tokio::test]
async fn user_shell_cmd_ls_and_cat_in_temp_dir() {
@@ -119,6 +127,81 @@ async fn user_shell_cmd_can_be_interrupted() {
assert_eq!(ev.reason, TurnAbortReason::Interrupted);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn user_shell_cmd_does_not_abort_active_turn() -> anyhow::Result<()> {
let server = start_mock_server().await;
let sse_body = responses::sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]);
let response = responses::sse_response(sse_body).set_delay(Duration::from_millis(200));
let _mock = responses::mount_response_once(&server, response).await;
let test = test_codex().build(&server).await?;
let codex = test.codex.clone();
let session_model = test.session_configured.model.clone();
let cwd = test.cwd_path().to_path_buf();
codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "keep running".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: cwd.clone(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: session_model,
effort: None,
summary: ReasoningSummary::Auto,
collaboration_mode: None,
personality: None,
})
.await?;
let regular_turn_id = loop {
let event = timeout(Duration::from_secs(1), codex.next_event()).await??;
if matches!(event.msg, EventMsg::TurnStarted(_)) {
break event.id;
}
};
codex
.submit(Op::RunUserShellCommand {
command: "true".to_string(),
})
.await?;
let deadline = Instant::now() + Duration::from_secs(2);
let mut replaced_abort_for_regular = false;
let mut regular_completed = false;
while Instant::now() < deadline && !regular_completed {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
break;
}
let event = timeout(remaining, codex.next_event()).await??;
match event {
Event {
id,
msg: EventMsg::TurnAborted(ev),
} if id == regular_turn_id && ev.reason == TurnAbortReason::Replaced => {
replaced_abort_for_regular = true;
}
Event {
id,
msg: EventMsg::TurnComplete(_),
} if id == regular_turn_id => {
regular_completed = true;
}
_ => {}
}
}
assert_eq!(regular_completed, true);
assert_eq!(replaced_abort_for_regular, false);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn user_shell_command_history_is_persisted_and_shared_with_model() -> anyhow::Result<()> {
let server = responses::start_mock_server().await;

View File

@@ -18,7 +18,7 @@
//! The bottom pane exposes a single "task running" indicator that drives the spinner and interrupt
//! hints. This module treats that indicator as derived UI-busy state: it is set while an agent turn
//! is in progress and while MCP server startup is in progress. Those lifecycles are tracked
//! independently (`agent_turn_running` and `mcp_startup_status`) and synchronized via
//! independently (`running_turn_ids` and `mcp_startup_status`) and synchronized via
//! `update_task_running_state`.
use std::collections::HashMap;
use std::collections::HashSet;
@@ -449,11 +449,11 @@ pub(crate) struct ChatWidget {
unified_exec_wait_streak: Option<UnifiedExecWaitStreak>,
task_complete_pending: bool,
unified_exec_processes: Vec<UnifiedExecProcessSummary>,
/// Tracks whether codex-core currently considers an agent turn to be in progress.
/// Tracks the set of active turn ids currently running.
///
/// This is kept separate from `mcp_startup_status` so that MCP startup progress (or completion)
/// can update the status header without accidentally clearing the spinner for an active turn.
agent_turn_running: bool,
running_turn_ids: HashSet<String>,
/// Tracks per-server MCP startup state while startup is in progress.
///
/// The map is `Some(_)` from the first `McpStartupUpdate` until `McpStartupComplete`, and the
@@ -676,8 +676,9 @@ impl ChatWidget {
/// The bottom pane only has one running flag, but this module treats it as a derived state of
/// both the agent turn lifecycle and MCP startup lifecycle.
fn update_task_running_state(&mut self) {
let turn_running = !self.running_turn_ids.is_empty();
self.bottom_pane
.set_task_running(self.agent_turn_running || self.mcp_startup_status.is_some());
.set_task_running(turn_running || self.mcp_startup_status.is_some());
}
fn restore_reasoning_status_header(&mut self) {
@@ -865,8 +866,13 @@ impl ChatWidget {
// Raw reasoning uses the same flow as summarized reasoning
fn on_task_started(&mut self) {
self.agent_turn_running = true;
fn on_task_started(&mut self, turn_id: Option<&str>, from_replay: bool) {
if turn_id.is_none() && from_replay {
return;
}
if let Some(turn_id) = turn_id {
self.running_turn_ids.insert(turn_id.to_string());
}
self.saw_plan_update_this_turn = false;
self.bottom_pane.clear_quit_shortcut_hint();
self.quit_shortcut_expires_at = None;
@@ -880,19 +886,13 @@ impl ChatWidget {
self.request_redraw();
}
fn on_task_complete(&mut self, last_agent_message: Option<String>, from_replay: bool) {
// If a stream is currently active, finalize it.
self.flush_answer_stream_with_separator();
self.flush_unified_exec_wait_streak();
// Mark task stopped and request redraw now that all content is in history.
self.agent_turn_running = false;
self.update_task_running_state();
self.running_commands.clear();
self.suppressed_exec_calls.clear();
self.last_unified_wait = None;
self.unified_exec_wait_streak = None;
self.clear_unified_exec_processes();
self.request_redraw();
fn on_task_complete(
&mut self,
turn_id: Option<&str>,
last_agent_message: Option<String>,
from_replay: bool,
) {
self.on_turn_finished(turn_id);
if !from_replay && self.queued_user_messages.is_empty() {
self.maybe_prompt_plan_implementation(last_agent_message.as_deref());
@@ -907,6 +907,27 @@ impl ChatWidget {
self.maybe_show_pending_rate_limit_prompt();
}
fn on_turn_finished(&mut self, turn_id: Option<&str>) {
// If a stream is currently active, finalize it.
self.flush_answer_stream_with_separator();
self.flush_unified_exec_wait_streak();
if let Some(turn_id) = turn_id {
self.running_turn_ids.remove(turn_id);
} else {
self.running_turn_ids.clear();
}
self.update_task_running_state();
if self.running_turn_ids.is_empty() {
// Only clear shared command state when all turns are finished.
self.running_commands.clear();
self.suppressed_exec_calls.clear();
self.last_unified_wait = None;
self.unified_exec_wait_streak = None;
self.clear_unified_exec_processes();
}
self.request_redraw();
}
fn maybe_prompt_plan_implementation(&mut self, last_agent_message: Option<&str>) {
if !self.collaboration_modes_enabled() {
return;
@@ -1101,7 +1122,7 @@ impl ChatWidget {
// Ensure any spinner is replaced by a red ✗ and flushed into history.
self.finalize_active_cell_as_failed();
// Reset running state and clear streaming buffers.
self.agent_turn_running = false;
self.running_turn_ids.clear();
self.update_task_running_state();
self.running_commands.clear();
self.suppressed_exec_calls.clear();
@@ -2026,7 +2047,7 @@ impl ChatWidget {
unified_exec_wait_streak: None,
task_complete_pending: false,
unified_exec_processes: Vec::new(),
agent_turn_running: false,
running_turn_ids: HashSet::new(),
mcp_startup_status: None,
interrupts: InterruptManager::new(),
reasoning_buffer: String::new(),
@@ -2159,7 +2180,7 @@ impl ChatWidget {
unified_exec_wait_streak: None,
task_complete_pending: false,
unified_exec_processes: Vec::new(),
agent_turn_running: false,
running_turn_ids: HashSet::new(),
mcp_startup_status: None,
interrupts: InterruptManager::new(),
reasoning_buffer: String::new(),
@@ -2285,7 +2306,7 @@ impl ChatWidget {
unified_exec_wait_streak: None,
task_complete_pending: false,
unified_exec_processes: Vec::new(),
agent_turn_running: false,
running_turn_ids: HashSet::new(),
mcp_startup_status: None,
interrupts: InterruptManager::new(),
reasoning_buffer: String::new(),
@@ -2997,9 +3018,9 @@ impl ChatWidget {
self.on_agent_reasoning_final();
}
EventMsg::AgentReasoningSectionBreak(_) => self.on_reasoning_section_break(),
EventMsg::TurnStarted(_) => self.on_task_started(),
EventMsg::TurnStarted(_) => self.on_task_started(id.as_deref(), from_replay),
EventMsg::TurnComplete(TurnCompleteEvent { last_agent_message }) => {
self.on_task_complete(last_agent_message, from_replay)
self.on_task_complete(id.as_deref(), last_agent_message, from_replay)
}
EventMsg::TokenCount(ev) => {
self.set_token_info(ev.info);
@@ -3014,7 +3035,9 @@ impl ChatWidget {
self.on_interrupted_turn(ev.reason);
}
TurnAbortReason::Replaced => {
self.on_error("Turn aborted: replaced by a new task".to_owned())
// Replaced turns do not emit TurnComplete, so remove this turn from running
// state here (important when turns can overlap).
self.on_turn_finished(id.as_deref());
}
TurnAbortReason::ReviewEnded => {
self.on_interrupted_turn(ev.reason);

View File

@@ -808,7 +808,7 @@ async fn make_chatwidget_manual(
unified_exec_wait_streak: None,
task_complete_pending: false,
unified_exec_processes: Vec::new(),
agent_turn_running: false,
running_turn_ids: HashSet::new(),
mcp_startup_status: None,
interrupts: InterruptManager::new(),
reasoning_buffer: String::new(),
@@ -1255,7 +1255,7 @@ async fn plan_implementation_popup_skips_when_messages_queued() {
chat.bottom_pane.set_task_running(true);
chat.queue_user_message("Queued message".into());
chat.on_task_complete(Some("Plan details".to_string()), false);
chat.on_task_complete(None, Some("Plan details".to_string()), false);
let popup = render_bottom_popup(&chat, 80);
assert!(
@@ -1273,7 +1273,7 @@ async fn plan_implementation_popup_shows_on_plan_update_without_message() {
.expect("expected plan collaboration mask");
chat.set_collaboration_mask(plan_mask);
chat.on_task_started();
chat.on_task_started(Some("turn-1"), false);
chat.on_plan_update(UpdatePlanArgs {
explanation: None,
plan: vec![PlanItemArg {
@@ -1281,7 +1281,7 @@ async fn plan_implementation_popup_shows_on_plan_update_without_message() {
status: StepStatus::Pending,
}],
});
chat.on_task_complete(None, false);
chat.on_task_complete(Some("turn-1"), None, false);
let popup = render_bottom_popup(&chat, 80);
assert!(
@@ -1301,7 +1301,7 @@ async fn plan_implementation_popup_skips_when_rate_limit_prompt_pending() {
.expect("expected plan collaboration mask");
chat.set_collaboration_mask(plan_mask);
chat.on_task_started();
chat.on_task_started(Some("turn-1"), false);
chat.on_plan_update(UpdatePlanArgs {
explanation: None,
plan: vec![PlanItemArg {
@@ -1310,7 +1310,7 @@ async fn plan_implementation_popup_skips_when_rate_limit_prompt_pending() {
}],
});
chat.on_rate_limit_snapshot(Some(snapshot(92.0)));
chat.on_task_complete(None, false);
chat.on_task_complete(Some("turn-1"), None, false);
let popup = render_bottom_popup(&chat, 80);
assert!(
@@ -1668,7 +1668,7 @@ async fn streaming_final_answer_keeps_task_running_state() {
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await;
chat.thread_id = Some(ThreadId::new());
chat.on_task_started();
chat.on_task_started(Some("turn-1"), false);
chat.on_agent_message_delta("Final answer line\n".to_string());
chat.on_commit_tick();
@@ -1694,6 +1694,41 @@ async fn streaming_final_answer_keeps_task_running_state() {
assert!(!chat.bottom_pane.quit_shortcut_hint_visible());
}
#[tokio::test]
async fn task_running_persists_when_user_shell_turn_completes_first() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "turn-regular".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
}),
});
chat.handle_codex_event(Event {
id: "turn-shell".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
}),
});
assert_eq!(chat.bottom_pane.is_task_running(), true);
chat.handle_codex_event(Event {
id: "turn-shell".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message: None,
}),
});
assert_eq!(chat.bottom_pane.is_task_running(), true);
chat.handle_codex_event(Event {
id: "turn-regular".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message: None,
}),
});
assert_eq!(chat.bottom_pane.is_task_running(), false);
}
#[tokio::test]
async fn ctrl_c_shutdown_works_with_caps_lock() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
@@ -1852,7 +1887,7 @@ async fn exec_end_without_begin_uses_event_command() {
#[tokio::test]
async fn exec_history_shows_unified_exec_startup_commands() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.on_task_started();
chat.on_task_started(Some("turn-1"), false);
let begin = begin_exec_with_source(
&mut chat,
@@ -1879,7 +1914,7 @@ async fn exec_history_shows_unified_exec_startup_commands() {
#[tokio::test]
async fn exec_history_shows_unified_exec_tool_calls() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;
chat.on_task_started();
chat.on_task_started(Some("turn-1"), false);
let begin = begin_exec_with_source(
&mut chat,
@@ -1896,7 +1931,7 @@ async fn exec_history_shows_unified_exec_tool_calls() {
#[tokio::test]
async fn unified_exec_end_after_task_complete_is_suppressed() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.on_task_started();
chat.on_task_started(Some("turn-1"), false);
let begin = begin_exec_with_source(
&mut chat,
@@ -1906,7 +1941,7 @@ async fn unified_exec_end_after_task_complete_is_suppressed() {
);
drain_insert_history(&mut rx);
chat.on_task_complete(None, false);
chat.on_task_complete(Some("turn-1"), None, false);
end_exec(&mut chat, begin, "", "", 0);
let cells = drain_insert_history(&mut rx);
@@ -1919,8 +1954,8 @@ async fn unified_exec_end_after_task_complete_is_suppressed() {
#[tokio::test]
async fn unified_exec_interaction_after_task_complete_is_suppressed() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.on_task_started();
chat.on_task_complete(None, false);
chat.on_task_started(Some("turn-1"), false);
chat.on_task_complete(Some("turn-1"), None, false);
chat.handle_codex_event(Event {
id: "call-1".to_string(),
@@ -2014,7 +2049,7 @@ async fn unified_exec_wait_before_streamed_agent_message_snapshot() {
#[tokio::test]
async fn unified_exec_wait_status_header_updates_on_late_command_display() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;
chat.on_task_started();
chat.on_task_started(Some("turn-1"), false);
chat.unified_exec_processes.push(UnifiedExecProcessSummary {
key: "proc-1".to_string(),
command_display: "sleep 5".to_string(),
@@ -2036,7 +2071,7 @@ async fn unified_exec_wait_status_header_updates_on_late_command_display() {
#[tokio::test]
async fn unified_exec_waiting_multiple_empty_snapshots() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.on_task_started();
chat.on_task_started(Some("turn-1"), false);
begin_unified_exec_startup(&mut chat, "call-wait-1", "proc-1", "just fix");
terminal_interaction(&mut chat, "call-wait-1a", "proc-1", "");
@@ -2064,7 +2099,7 @@ async fn unified_exec_waiting_multiple_empty_snapshots() {
#[tokio::test]
async fn unified_exec_empty_then_non_empty_snapshot() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.on_task_started();
chat.on_task_started(Some("turn-1"), false);
begin_unified_exec_startup(&mut chat, "call-wait-2", "proc-2", "just fix");
terminal_interaction(&mut chat, "call-wait-2a", "proc-2", "");
@@ -2081,7 +2116,7 @@ async fn unified_exec_empty_then_non_empty_snapshot() {
#[tokio::test]
async fn unified_exec_non_empty_then_empty_snapshots() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.on_task_started();
chat.on_task_started(Some("turn-1"), false);
begin_unified_exec_startup(&mut chat, "call-wait-3", "proc-3", "just fix");
terminal_interaction(&mut chat, "call-wait-3a", "proc-3", "pwd\n");
@@ -2199,7 +2234,7 @@ async fn collab_mode_shift_tab_cycles_only_when_enabled_and_idle() {
assert_eq!(chat.active_collaboration_mode_kind(), ModeKind::Code);
assert_eq!(chat.current_collaboration_mode(), &initial);
chat.on_task_started();
chat.on_task_started(Some("turn-1"), false);
let before = chat.active_collaboration_mode_kind();
chat.handle_key_event(KeyEvent::from(KeyCode::BackTab));
assert_eq!(chat.active_collaboration_mode_kind(), before);
@@ -3664,6 +3699,49 @@ async fn interrupt_restores_queued_messages_into_composer() {
let _ = drain_insert_history(&mut rx);
}
#[tokio::test]
async fn replaced_turn_aborted_clears_only_that_turn_id() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "turn-1".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
}),
});
chat.handle_codex_event(Event {
id: "turn-2".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
}),
});
assert!(chat.running_turn_ids.contains("turn-1"));
assert!(chat.running_turn_ids.contains("turn-2"));
assert!(chat.bottom_pane.is_task_running());
chat.handle_codex_event(Event {
id: "turn-1".into(),
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Replaced,
}),
});
assert!(!chat.running_turn_ids.contains("turn-1"));
assert!(chat.running_turn_ids.contains("turn-2"));
assert!(chat.bottom_pane.is_task_running());
chat.handle_codex_event(Event {
id: "turn-2".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message: None,
}),
});
assert!(chat.running_turn_ids.is_empty());
assert!(!chat.bottom_pane.is_task_running());
}
#[tokio::test]
async fn interrupt_prepends_queued_messages_before_existing_composer_text() {
let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(None).await;