chore: rework state machine further (#16567)

This commit is contained in:
jif-oai
2026-04-02 16:15:28 +02:00
committed by GitHub
parent e47ed5e57f
commit ab6cce62b8
4 changed files with 77 additions and 24 deletions

View File

@@ -4047,29 +4047,39 @@ impl Session {
}
pub(crate) async fn defer_mailbox_delivery_to_next_turn(&self, sub_id: &str) {
self.set_mailbox_delivery_phase(sub_id, MailboxDeliveryPhase::NextTurn)
.await;
}
pub(crate) async fn accept_mailbox_delivery_for_current_turn(&self, sub_id: &str) {
self.set_mailbox_delivery_phase(sub_id, MailboxDeliveryPhase::CurrentTurn)
.await;
}
async fn set_mailbox_delivery_phase(&self, sub_id: &str, phase: MailboxDeliveryPhase) {
let turn_state = {
let active = self.active_turn.lock().await;
active.as_ref().and_then(|active_turn| {
active_turn
.tasks
.contains_key(sub_id)
.then(|| Arc::clone(&active_turn.turn_state))
})
};
let turn_state = self.turn_state_for_sub_id(sub_id).await;
let Some(turn_state) = turn_state else {
return;
};
turn_state.lock().await.set_mailbox_delivery_phase(phase);
let mut turn_state = turn_state.lock().await;
if turn_state.has_pending_input() {
return;
}
turn_state.set_mailbox_delivery_phase(MailboxDeliveryPhase::NextTurn);
}
pub(crate) async fn accept_mailbox_delivery_for_current_turn(&self, sub_id: &str) {
let turn_state = self.turn_state_for_sub_id(sub_id).await;
let Some(turn_state) = turn_state else {
return;
};
turn_state
.lock()
.await
.set_mailbox_delivery_phase(MailboxDeliveryPhase::CurrentTurn);
}
async fn turn_state_for_sub_id(
&self,
sub_id: &str,
) -> Option<Arc<tokio::sync::Mutex<crate::state::TurnState>>> {
let active = self.active_turn.lock().await;
active.as_ref().and_then(|active_turn| {
active_turn
.tasks
.contains_key(sub_id)
.then(|| Arc::clone(&active_turn.turn_state))
})
}
pub(crate) fn subscribe_mailbox_seq(&self) -> watch::Receiver<u64> {