From be5acf7c57a8dd4e02e77ebda80115669f04eb47 Mon Sep 17 00:00:00 2001 From: Brent Traut Date: Mon, 18 May 2026 20:15:06 -0700 Subject: [PATCH] app-server: simplify deferred cleanup plumbing --- .../app-server/src/bespoke_event_handling.rs | 16 +++++-- .../request_processors/thread_lifecycle.rs | 48 +++++++++++-------- codex-rs/app-server/src/thread_state.rs | 7 +-- 3 files changed, 41 insertions(+), 30 deletions(-) diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 04c3649723..7d497427f1 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -1327,11 +1327,8 @@ pub(crate) async fn flush_pending_terminal_plan_cleanup( ) { let pending_terminal_plan_cleanups = take_flushable_pending_terminal_plan_cleanup(thread_state).await; - for (turn_id, latest_plan_update) in - terminal_plan_cleanup_updates(pending_terminal_plan_cleanups) - { - emit_turn_plan_updated(conversation_id, &turn_id, latest_plan_update, outgoing).await; - } + emit_terminal_plan_cleanup_updates(conversation_id, pending_terminal_plan_cleanups, outgoing) + .await; } async fn flush_all_pending_terminal_plan_cleanup( @@ -1341,6 +1338,15 @@ async fn flush_all_pending_terminal_plan_cleanup( ) { let pending_terminal_plan_cleanups = std::mem::take(&mut thread_state.lock().await.pending_terminal_plan_cleanups); + emit_terminal_plan_cleanup_updates(conversation_id, pending_terminal_plan_cleanups, outgoing) + .await; +} + +async fn emit_terminal_plan_cleanup_updates( + conversation_id: ThreadId, + pending_terminal_plan_cleanups: Vec, + outgoing: &ThreadScopedOutgoingMessageSender, +) { for (turn_id, latest_plan_update) in terminal_plan_cleanup_updates(pending_terminal_plan_cleanups) { diff --git a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs index f0c3dc49bc..4d49c4154e 100644 --- a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs +++ b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs @@ -476,18 +476,11 @@ pub(super) async fn handle_thread_listener_command( )) .await; if !goal_is_active { - let subscribed_connection_ids = thread_state_manager - .subscribed_connection_ids(conversation_id) - .await; - let thread_outgoing = ThreadScopedOutgoingMessageSender::new( - outgoing.clone(), - subscribed_connection_ids, - conversation_id, - ); - crate::bespoke_event_handling::flush_pending_terminal_plan_cleanup( + flush_pending_terminal_plan_cleanup_for_subscribers( conversation_id, + thread_state_manager, thread_state, - &thread_outgoing, + outgoing, ) .await; } @@ -500,18 +493,11 @@ pub(super) async fn handle_thread_listener_command( }, )) .await; - let subscribed_connection_ids = thread_state_manager - .subscribed_connection_ids(conversation_id) - .await; - let thread_outgoing = ThreadScopedOutgoingMessageSender::new( - outgoing.clone(), - subscribed_connection_ids, - conversation_id, - ); - crate::bespoke_event_handling::flush_pending_terminal_plan_cleanup( + flush_pending_terminal_plan_cleanup_for_subscribers( conversation_id, + thread_state_manager, thread_state, - &thread_outgoing, + outgoing, ) .await; } @@ -534,6 +520,28 @@ pub(super) async fn handle_thread_listener_command( } } +async fn flush_pending_terminal_plan_cleanup_for_subscribers( + conversation_id: ThreadId, + thread_state_manager: &ThreadStateManager, + thread_state: &Arc>, + outgoing: &Arc, +) { + let subscribed_connection_ids = thread_state_manager + .subscribed_connection_ids(conversation_id) + .await; + let thread_outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing.clone(), + subscribed_connection_ids, + conversation_id, + ); + crate::bespoke_event_handling::flush_pending_terminal_plan_cleanup( + conversation_id, + thread_state, + &thread_outgoing, + ) + .await; +} + #[allow(clippy::too_many_arguments)] #[expect( clippy::await_holding_invalid_type, diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index 4163b6a12a..dc67c56a0f 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -140,11 +140,8 @@ impl ThreadState { } pub(crate) fn prune_pending_terminal_plan_cleanups_after_rollback(&mut self) { - self.pending_terminal_plan_cleanups.retain(|cleanup| { - self.terminal_turn_ids - .iter() - .any(|turn_id| turn_id == &cleanup.turn_id) - }); + self.pending_terminal_plan_cleanups + .retain(|cleanup| self.terminal_turn_ids.contains(&cleanup.turn_id)); } pub(crate) fn track_current_turn_event(&mut self, event_turn_id: &str, event: &EventMsg) {