From e26a615fc3d83c76868372cd58451faff5e594ac Mon Sep 17 00:00:00 2001 From: Brent Traut Date: Mon, 18 May 2026 19:31:46 -0700 Subject: [PATCH] app-server: scope deferred plan cleanup notifications --- .../app-server/src/bespoke_event_handling.rs | 19 +-------------- .../thread_goal_processor.rs | 24 +++++++++++++++++++ .../request_processors/thread_lifecycle.rs | 24 +++++++++++++++---- 3 files changed, 45 insertions(+), 22 deletions(-) diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index a220eab99f..4ac1ee62f5 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -1,7 +1,6 @@ use crate::error_code::internal_error; use crate::error_code::invalid_request; use crate::outgoing_message::ClientRequestResult; -use crate::outgoing_message::OutgoingMessageSender; use crate::outgoing_message::ThreadScopedOutgoingMessageSender; use crate::request_processors::populate_thread_turns_from_history; use crate::request_processors::thread_from_stored_thread; @@ -1325,7 +1324,7 @@ async fn emit_turn_completed_with_status( .await; } -async fn emit_terminal_plan_cleanup( +pub(crate) async fn emit_terminal_plan_cleanup( conversation_id: ThreadId, pending_terminal_plan_cleanups: Vec, outgoing: &ThreadScopedOutgoingMessageSender, @@ -1337,22 +1336,6 @@ async fn emit_terminal_plan_cleanup( } } -pub(crate) async fn emit_terminal_plan_cleanup_globally( - conversation_id: ThreadId, - pending_terminal_plan_cleanups: Vec, - outgoing: &Arc, -) { - for (turn_id, latest_plan_update) in - terminal_plan_cleanup_updates(pending_terminal_plan_cleanups) - { - let notification = - turn_plan_updated_notification(conversation_id, &turn_id, latest_plan_update); - outgoing - .send_server_notification(ServerNotification::TurnPlanUpdated(notification)) - .await; - } -} - fn terminal_plan_cleanup_updates( pending_terminal_plan_cleanups: Vec, ) -> Vec<(String, UpdatePlanArgs)> { diff --git a/codex-rs/app-server/src/request_processors/thread_goal_processor.rs b/codex-rs/app-server/src/request_processors/thread_goal_processor.rs index cf1e58345b..2cef82bc94 100644 --- a/codex-rs/app-server/src/request_processors/thread_goal_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_goal_processor.rs @@ -403,6 +403,7 @@ impl ThreadGoalRequestProcessor { goal: ThreadGoal, listener_command_tx: Option>, ) { + let goal_is_active = goal.status == ThreadGoalStatus::Active; if let Some(listener_command_tx) = listener_command_tx { let command = crate::thread_state::ThreadListenerCommand::EmitThreadGoalUpdated { goal: goal.clone(), @@ -423,6 +424,9 @@ impl ThreadGoalRequestProcessor { }, )) .await; + if !goal_is_active { + self.emit_pending_terminal_plan_cleanup(thread_id).await; + } } async fn emit_thread_goal_cleared_ordered( @@ -446,6 +450,26 @@ impl ThreadGoalRequestProcessor { }, )) .await; + self.emit_pending_terminal_plan_cleanup(thread_id).await; + } + + async fn emit_pending_terminal_plan_cleanup(&self, thread_id: ThreadId) { + let thread_state = self.thread_state_manager.thread_state(thread_id).await; + let subscribed_connection_ids = self + .thread_state_manager + .subscribed_connection_ids(thread_id) + .await; + let thread_outgoing = ThreadScopedOutgoingMessageSender::new( + self.outgoing.clone(), + subscribed_connection_ids, + thread_id, + ); + crate::bespoke_event_handling::emit_terminal_plan_cleanup( + thread_id, + crate::bespoke_event_handling::take_pending_terminal_plan_cleanup(&thread_state).await, + &thread_outgoing, + ) + .await; } } diff --git a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs index 5826f9fbad..0f74d579c1 100644 --- a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs +++ b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs @@ -476,11 +476,19 @@ pub(super) async fn handle_thread_listener_command( )) .await; if !goal_is_active { - crate::bespoke_event_handling::emit_terminal_plan_cleanup_globally( + let subscribed_connection_ids = thread_state_manager + .subscribed_connection_ids(conversation_id) + .await; + let thread_outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing.clone(), + subscribed_connection_ids, + conversation_id, + ); + crate::bespoke_event_handling::emit_terminal_plan_cleanup( conversation_id, crate::bespoke_event_handling::take_pending_terminal_plan_cleanup(thread_state) .await, - outgoing, + &thread_outgoing, ) .await; } @@ -493,11 +501,19 @@ pub(super) async fn handle_thread_listener_command( }, )) .await; - crate::bespoke_event_handling::emit_terminal_plan_cleanup_globally( + let subscribed_connection_ids = thread_state_manager + .subscribed_connection_ids(conversation_id) + .await; + let thread_outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing.clone(), + subscribed_connection_ids, + conversation_id, + ); + crate::bespoke_event_handling::emit_terminal_plan_cleanup( conversation_id, crate::bespoke_event_handling::take_pending_terminal_plan_cleanup(thread_state) .await, - outgoing, + &thread_outgoing, ) .await; }