From 74926f7e863ce97aa02240e346c9e9d0d2aa63a9 Mon Sep 17 00:00:00 2001 From: Brent Traut Date: Mon, 18 May 2026 20:26:31 -0700 Subject: [PATCH] app-server: simplify terminal cleanup decisions --- .../app-server/src/bespoke_event_handling.rs | 82 +++++++++---------- 1 file changed, 37 insertions(+), 45 deletions(-) diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 587d422781..2b982a388e 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -183,14 +183,12 @@ pub(crate) async fn apply_bespoke_event_handling( outgoing.abort_pending_server_requests().await; respond_to_pending_interrupts(&thread_state, &outgoing).await; let turn_failed = thread_state.lock().await.turn_summary.last_error.is_some(); - let has_pending_terminal_plan_cleanup = !thread_state - .lock() - .await - .pending_terminal_plan_cleanups - .is_empty(); - let preserve_terminal_plan_progress = has_pending_terminal_plan_cleanup - && should_preserve_terminal_plan_progress(conversation.as_ref(), conversation_id) - .await; + let preserve_terminal_plan_progress = should_preserve_terminal_plan_progress( + conversation.as_ref(), + conversation_id, + &thread_state, + ) + .await; thread_watch_manager .note_turn_completed(&conversation_id.to_string(), turn_failed) .await; @@ -1123,14 +1121,12 @@ pub(crate) async fn apply_bespoke_event_handling( // All per-thread requests are bound to a turn, so abort them. outgoing.abort_pending_server_requests().await; respond_to_pending_interrupts(&thread_state, &outgoing).await; - let has_pending_terminal_plan_cleanup = !thread_state - .lock() - .await - .pending_terminal_plan_cleanups - .is_empty(); - let preserve_terminal_plan_progress = has_pending_terminal_plan_cleanup - && should_preserve_terminal_plan_progress(conversation.as_ref(), conversation_id) - .await; + let preserve_terminal_plan_progress = should_preserve_terminal_plan_progress( + conversation.as_ref(), + conversation_id, + &thread_state, + ) + .await; thread_watch_manager .note_turn_interrupted(&conversation_id.to_string()) @@ -1366,39 +1362,25 @@ async fn emit_terminal_plan_cleanup_updates( pending_terminal_plan_cleanups: Vec, outgoing: &ThreadScopedOutgoingMessageSender, ) { - for (turn_id, latest_plan_update) in - terminal_plan_cleanup_updates(pending_terminal_plan_cleanups) + for PendingTerminalPlanCleanup { + turn_id, + plan_update: mut latest_plan_update, + } in pending_terminal_plan_cleanups { - emit_turn_plan_updated(conversation_id, &turn_id, latest_plan_update, outgoing).await; + let mut downgraded = false; + for item in &mut latest_plan_update.plan { + if matches!(item.status, StepStatus::InProgress) { + item.status = StepStatus::Pending; + downgraded = true; + } + } + if downgraded { + latest_plan_update.explanation = None; + emit_turn_plan_updated(conversation_id, &turn_id, latest_plan_update, outgoing).await; + } } } -fn terminal_plan_cleanup_updates( - pending_terminal_plan_cleanups: Vec, -) -> Vec<(String, UpdatePlanArgs)> { - pending_terminal_plan_cleanups - .into_iter() - .filter_map( - |PendingTerminalPlanCleanup { - turn_id, - plan_update: mut latest_plan_update, - }| { - let mut downgraded = false; - for item in &mut latest_plan_update.plan { - if matches!(item.status, StepStatus::InProgress) { - item.status = StepStatus::Pending; - downgraded = true; - } - } - downgraded.then(|| { - latest_plan_update.explanation = None; - (turn_id, latest_plan_update) - }) - }, - ) - .collect() -} - fn retain_pending_terminal_plan_cleanups_for_turns( pending_terminal_plan_cleanups: &mut Vec, retained_turns: &[Turn], @@ -1448,7 +1430,17 @@ async fn take_flushable_pending_terminal_plan_cleanup( async fn should_preserve_terminal_plan_progress( conversation: &CodexThread, conversation_id: ThreadId, + thread_state: &Arc>, ) -> bool { + if thread_state + .lock() + .await + .pending_terminal_plan_cleanups + .is_empty() + { + return false; + } + let Some(state_db) = conversation.state_db() else { return false; };