app-server: scope deferred plan cleanup notifications

This commit is contained in:
Brent Traut
2026-05-18 19:31:46 -07:00
parent 97bcd94066
commit e26a615fc3
3 changed files with 45 additions and 22 deletions

View File

@@ -1,7 +1,6 @@
use crate::error_code::internal_error;
use crate::error_code::invalid_request;
use crate::outgoing_message::ClientRequestResult;
use crate::outgoing_message::OutgoingMessageSender;
use crate::outgoing_message::ThreadScopedOutgoingMessageSender;
use crate::request_processors::populate_thread_turns_from_history;
use crate::request_processors::thread_from_stored_thread;
@@ -1325,7 +1324,7 @@ async fn emit_turn_completed_with_status(
.await;
}
async fn emit_terminal_plan_cleanup(
pub(crate) async fn emit_terminal_plan_cleanup(
conversation_id: ThreadId,
pending_terminal_plan_cleanups: Vec<PendingTerminalPlanCleanup>,
outgoing: &ThreadScopedOutgoingMessageSender,
@@ -1337,22 +1336,6 @@ async fn emit_terminal_plan_cleanup(
}
}
pub(crate) async fn emit_terminal_plan_cleanup_globally(
conversation_id: ThreadId,
pending_terminal_plan_cleanups: Vec<PendingTerminalPlanCleanup>,
outgoing: &Arc<OutgoingMessageSender>,
) {
for (turn_id, latest_plan_update) in
terminal_plan_cleanup_updates(pending_terminal_plan_cleanups)
{
let notification =
turn_plan_updated_notification(conversation_id, &turn_id, latest_plan_update);
outgoing
.send_server_notification(ServerNotification::TurnPlanUpdated(notification))
.await;
}
}
fn terminal_plan_cleanup_updates(
pending_terminal_plan_cleanups: Vec<PendingTerminalPlanCleanup>,
) -> Vec<(String, UpdatePlanArgs)> {

View File

@@ -403,6 +403,7 @@ impl ThreadGoalRequestProcessor {
goal: ThreadGoal,
listener_command_tx: Option<tokio::sync::mpsc::UnboundedSender<ThreadListenerCommand>>,
) {
let goal_is_active = goal.status == ThreadGoalStatus::Active;
if let Some(listener_command_tx) = listener_command_tx {
let command = crate::thread_state::ThreadListenerCommand::EmitThreadGoalUpdated {
goal: goal.clone(),
@@ -423,6 +424,9 @@ impl ThreadGoalRequestProcessor {
},
))
.await;
if !goal_is_active {
self.emit_pending_terminal_plan_cleanup(thread_id).await;
}
}
async fn emit_thread_goal_cleared_ordered(
@@ -446,6 +450,26 @@ impl ThreadGoalRequestProcessor {
},
))
.await;
self.emit_pending_terminal_plan_cleanup(thread_id).await;
}
async fn emit_pending_terminal_plan_cleanup(&self, thread_id: ThreadId) {
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
let subscribed_connection_ids = self
.thread_state_manager
.subscribed_connection_ids(thread_id)
.await;
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
self.outgoing.clone(),
subscribed_connection_ids,
thread_id,
);
crate::bespoke_event_handling::emit_terminal_plan_cleanup(
thread_id,
crate::bespoke_event_handling::take_pending_terminal_plan_cleanup(&thread_state).await,
&thread_outgoing,
)
.await;
}
}

View File

@@ -476,11 +476,19 @@ pub(super) async fn handle_thread_listener_command(
))
.await;
if !goal_is_active {
crate::bespoke_event_handling::emit_terminal_plan_cleanup_globally(
let subscribed_connection_ids = thread_state_manager
.subscribed_connection_ids(conversation_id)
.await;
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing.clone(),
subscribed_connection_ids,
conversation_id,
);
crate::bespoke_event_handling::emit_terminal_plan_cleanup(
conversation_id,
crate::bespoke_event_handling::take_pending_terminal_plan_cleanup(thread_state)
.await,
outgoing,
&thread_outgoing,
)
.await;
}
@@ -493,11 +501,19 @@ pub(super) async fn handle_thread_listener_command(
},
))
.await;
crate::bespoke_event_handling::emit_terminal_plan_cleanup_globally(
let subscribed_connection_ids = thread_state_manager
.subscribed_connection_ids(conversation_id)
.await;
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing.clone(),
subscribed_connection_ids,
conversation_id,
);
crate::bespoke_event_handling::emit_terminal_plan_cleanup(
conversation_id,
crate::bespoke_event_handling::take_pending_terminal_plan_cleanup(thread_state)
.await,
outgoing,
&thread_outgoing,
)
.await;
}