app-server: simplify terminal cleanup decisions

This commit is contained in:
Brent Traut
2026-05-18 20:26:31 -07:00
parent 39d320d3fd
commit 74926f7e86

View File

@@ -183,14 +183,12 @@ pub(crate) async fn apply_bespoke_event_handling(
outgoing.abort_pending_server_requests().await;
respond_to_pending_interrupts(&thread_state, &outgoing).await;
let turn_failed = thread_state.lock().await.turn_summary.last_error.is_some();
let has_pending_terminal_plan_cleanup = !thread_state
.lock()
.await
.pending_terminal_plan_cleanups
.is_empty();
let preserve_terminal_plan_progress = has_pending_terminal_plan_cleanup
&& should_preserve_terminal_plan_progress(conversation.as_ref(), conversation_id)
.await;
let preserve_terminal_plan_progress = should_preserve_terminal_plan_progress(
conversation.as_ref(),
conversation_id,
&thread_state,
)
.await;
thread_watch_manager
.note_turn_completed(&conversation_id.to_string(), turn_failed)
.await;
@@ -1123,14 +1121,12 @@ pub(crate) async fn apply_bespoke_event_handling(
// All per-thread requests are bound to a turn, so abort them.
outgoing.abort_pending_server_requests().await;
respond_to_pending_interrupts(&thread_state, &outgoing).await;
let has_pending_terminal_plan_cleanup = !thread_state
.lock()
.await
.pending_terminal_plan_cleanups
.is_empty();
let preserve_terminal_plan_progress = has_pending_terminal_plan_cleanup
&& should_preserve_terminal_plan_progress(conversation.as_ref(), conversation_id)
.await;
let preserve_terminal_plan_progress = should_preserve_terminal_plan_progress(
conversation.as_ref(),
conversation_id,
&thread_state,
)
.await;
thread_watch_manager
.note_turn_interrupted(&conversation_id.to_string())
@@ -1366,39 +1362,25 @@ async fn emit_terminal_plan_cleanup_updates(
pending_terminal_plan_cleanups: Vec<PendingTerminalPlanCleanup>,
outgoing: &ThreadScopedOutgoingMessageSender,
) {
for (turn_id, latest_plan_update) in
terminal_plan_cleanup_updates(pending_terminal_plan_cleanups)
for PendingTerminalPlanCleanup {
turn_id,
plan_update: mut latest_plan_update,
} in pending_terminal_plan_cleanups
{
emit_turn_plan_updated(conversation_id, &turn_id, latest_plan_update, outgoing).await;
let mut downgraded = false;
for item in &mut latest_plan_update.plan {
if matches!(item.status, StepStatus::InProgress) {
item.status = StepStatus::Pending;
downgraded = true;
}
}
if downgraded {
latest_plan_update.explanation = None;
emit_turn_plan_updated(conversation_id, &turn_id, latest_plan_update, outgoing).await;
}
}
}
fn terminal_plan_cleanup_updates(
pending_terminal_plan_cleanups: Vec<PendingTerminalPlanCleanup>,
) -> Vec<(String, UpdatePlanArgs)> {
pending_terminal_plan_cleanups
.into_iter()
.filter_map(
|PendingTerminalPlanCleanup {
turn_id,
plan_update: mut latest_plan_update,
}| {
let mut downgraded = false;
for item in &mut latest_plan_update.plan {
if matches!(item.status, StepStatus::InProgress) {
item.status = StepStatus::Pending;
downgraded = true;
}
}
downgraded.then(|| {
latest_plan_update.explanation = None;
(turn_id, latest_plan_update)
})
},
)
.collect()
}
fn retain_pending_terminal_plan_cleanups_for_turns(
pending_terminal_plan_cleanups: &mut Vec<PendingTerminalPlanCleanup>,
retained_turns: &[Turn],
@@ -1448,7 +1430,17 @@ async fn take_flushable_pending_terminal_plan_cleanup(
async fn should_preserve_terminal_plan_progress(
conversation: &CodexThread,
conversation_id: ThreadId,
thread_state: &Arc<Mutex<ThreadState>>,
) -> bool {
if thread_state
.lock()
.await
.pending_terminal_plan_cleanups
.is_empty()
{
return false;
}
let Some(state_db) = conversation.state_db() else {
return false;
};