app-server: retain deferred plan cleanup across goal turns

This commit is contained in:
Brent Traut
2026-05-18 19:11:42 -07:00
parent 7f1ece0c79
commit 9ed73b3b8a
2 changed files with 220 additions and 69 deletions

View File

@@ -5,6 +5,7 @@ use crate::outgoing_message::ThreadScopedOutgoingMessageSender;
use crate::request_processors::populate_thread_turns_from_history;
use crate::request_processors::thread_from_stored_thread;
use crate::server_request_error::is_turn_transition_server_request_error;
use crate::thread_state::PendingTerminalPlanCleanup;
use crate::thread_state::ThreadState;
use crate::thread_state::TurnSummary;
use crate::thread_state::resolve_server_request_on_thread_listener;
@@ -1215,6 +1216,14 @@ pub(crate) async fn apply_bespoke_event_handling(
notification,
))
.await;
if thread_goal_event.goal.status != codex_protocol::protocol::ThreadGoalStatus::Active {
emit_terminal_plan_cleanup(
thread_goal_event.thread_id,
take_pending_terminal_plan_cleanup(&thread_state).await,
&outgoing,
)
.await;
}
}
EventMsg::TurnDiff(turn_diff_event) => {
handle_turn_diff(conversation_id, &event_turn_id, turn_diff_event, &outgoing).await;
@@ -1262,7 +1271,15 @@ async fn handle_turn_plan_update(
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) {
thread_state.lock().await.turn_summary.latest_plan_update = Some(plan_update_event.clone());
let pending_terminal_plan_cleanup = plan_update_event
.plan
.iter()
.any(|item| matches!(item.status, StepStatus::InProgress))
.then(|| PendingTerminalPlanCleanup {
turn_id: event_turn_id.to_string(),
plan_update: plan_update_event.clone(),
});
thread_state.lock().await.pending_terminal_plan_cleanup = pending_terminal_plan_cleanup;
emit_turn_plan_updated(conversation_id, event_turn_id, plan_update_event, outgoing).await;
}
@@ -1300,16 +1317,14 @@ async fn emit_turn_completed_with_status(
async fn emit_terminal_plan_cleanup(
conversation_id: ThreadId,
event_turn_id: &str,
latest_plan_update: Option<UpdatePlanArgs>,
preserve_terminal_plan_progress: bool,
pending_terminal_plan_cleanup: Option<PendingTerminalPlanCleanup>,
outgoing: &ThreadScopedOutgoingMessageSender,
) {
if preserve_terminal_plan_progress {
return;
}
let Some(mut latest_plan_update) = latest_plan_update else {
let Some(PendingTerminalPlanCleanup {
turn_id,
plan_update: mut latest_plan_update,
}) = pending_terminal_plan_cleanup
else {
return;
};
let mut downgraded = false;
@@ -1324,7 +1339,7 @@ async fn emit_terminal_plan_cleanup(
}
latest_plan_update.explanation = None;
emit_turn_plan_updated(conversation_id, event_turn_id, latest_plan_update, outgoing).await;
emit_turn_plan_updated(conversation_id, &turn_id, latest_plan_update, outgoing).await;
}
async fn emit_turn_plan_updated(
@@ -1353,15 +1368,18 @@ async fn terminal_plan_cleanup_may_be_needed(thread_state: &Arc<Mutex<ThreadStat
thread_state
.lock()
.await
.turn_summary
.latest_plan_update
.as_ref()
.is_some_and(|update| {
update
.plan
.iter()
.any(|item| matches!(item.status, StepStatus::InProgress))
})
.pending_terminal_plan_cleanup
.is_some()
}
async fn take_pending_terminal_plan_cleanup(
thread_state: &Arc<Mutex<ThreadState>>,
) -> Option<PendingTerminalPlanCleanup> {
thread_state
.lock()
.await
.pending_terminal_plan_cleanup
.take()
}
async fn should_preserve_terminal_plan_progress(
@@ -1550,9 +1568,11 @@ async fn handle_turn_complete(
let turn_summary = find_and_remove_turn_summary(thread_state).await;
emit_terminal_plan_cleanup(
conversation_id,
&event_turn_id,
turn_summary.latest_plan_update,
preserve_terminal_plan_progress,
if preserve_terminal_plan_progress {
None
} else {
take_pending_terminal_plan_cleanup(thread_state).await
},
outgoing,
)
.await;
@@ -1588,9 +1608,11 @@ async fn handle_turn_interrupted(
let turn_summary = find_and_remove_turn_summary(thread_state).await;
emit_terminal_plan_cleanup(
conversation_id,
&event_turn_id,
turn_summary.latest_plan_update,
preserve_terminal_plan_progress,
if preserve_terminal_plan_progress {
None
} else {
take_pending_terminal_plan_cleanup(thread_state).await
},
outgoing,
)
.await;
@@ -3602,19 +3624,14 @@ mod tests {
}
other => bail!("unexpected message: {other:?}"),
}
let cached = thread_state
.lock()
.await
.turn_summary
.latest_plan_update
.clone()
.expect("plan update should be cached");
assert_eq!(cached.explanation.as_deref(), Some("need plan"));
assert_eq!(cached.plan.len(), 2);
assert_eq!(cached.plan[0].step, "first");
assert!(matches!(cached.plan[0].status, StepStatus::Pending));
assert_eq!(cached.plan[1].step, "second");
assert!(matches!(cached.plan[1].status, StepStatus::Completed));
assert!(
thread_state
.lock()
.await
.pending_terminal_plan_cleanup
.is_none(),
"plans without in-progress steps do not need terminal cleanup"
);
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
@@ -3625,23 +3642,27 @@ mod tests {
let conversation_id = ThreadId::new();
let event_turn_id = "complete_with_plan".to_string();
let thread_state = new_thread_state();
thread_state.lock().await.turn_summary.latest_plan_update = Some(UpdatePlanArgs {
explanation: Some("still working".to_string()),
plan: vec![
PlanItemArg {
step: "first".to_string(),
status: StepStatus::InProgress,
thread_state.lock().await.pending_terminal_plan_cleanup =
Some(PendingTerminalPlanCleanup {
turn_id: event_turn_id.clone(),
plan_update: UpdatePlanArgs {
explanation: Some("still working".to_string()),
plan: vec![
PlanItemArg {
step: "first".to_string(),
status: StepStatus::InProgress,
},
PlanItemArg {
step: "second".to_string(),
status: StepStatus::Completed,
},
PlanItemArg {
step: "third".to_string(),
status: StepStatus::InProgress,
},
],
},
PlanItemArg {
step: "second".to_string(),
status: StepStatus::Completed,
},
PlanItemArg {
step: "third".to_string(),
status: StepStatus::InProgress,
},
],
});
});
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
@@ -3710,13 +3731,17 @@ mod tests {
let conversation_id = ThreadId::new();
let event_turn_id = "interrupt_with_plan".to_string();
let thread_state = new_thread_state();
thread_state.lock().await.turn_summary.latest_plan_update = Some(UpdatePlanArgs {
explanation: Some("still working".to_string()),
plan: vec![PlanItemArg {
step: "first".to_string(),
status: StepStatus::InProgress,
}],
});
thread_state.lock().await.pending_terminal_plan_cleanup =
Some(PendingTerminalPlanCleanup {
turn_id: event_turn_id.clone(),
plan_update: UpdatePlanArgs {
explanation: Some("still working".to_string()),
plan: vec![PlanItemArg {
step: "first".to_string(),
status: StepStatus::InProgress,
}],
},
});
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
@@ -3774,13 +3799,17 @@ mod tests {
let conversation_id = ThreadId::new();
let event_turn_id = "complete_active_goal".to_string();
let thread_state = new_thread_state();
thread_state.lock().await.turn_summary.latest_plan_update = Some(UpdatePlanArgs {
explanation: Some("still working".to_string()),
plan: vec![PlanItemArg {
step: "first".to_string(),
status: StepStatus::InProgress,
}],
});
thread_state.lock().await.pending_terminal_plan_cleanup =
Some(PendingTerminalPlanCleanup {
turn_id: event_turn_id.clone(),
plan_update: UpdatePlanArgs {
explanation: Some("still working".to_string()),
plan: vec![PlanItemArg {
step: "first".to_string(),
status: StepStatus::InProgress,
}],
},
});
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
@@ -3810,6 +3839,122 @@ mod tests {
}
other => bail!("unexpected message: {other:?}"),
}
assert!(
thread_state
.lock()
.await
.pending_terminal_plan_cleanup
.is_some(),
"active goals retain pending cleanup until the goal settles"
);
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
#[tokio::test]
async fn terminal_thread_goal_update_cleans_preserved_plan_progress() -> Result<()> {
let codex_home = TempDir::new()?;
let config = load_default_config_for_test(&codex_home).await;
let thread_manager = Arc::new(
codex_core::test_support::thread_manager_with_models_provider_and_home(
CodexAuth::create_dummy_chatgpt_auth_for_testing(),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
),
);
let codex_core::NewThread {
thread_id: conversation_id,
thread: conversation,
..
} = thread_manager.start_thread(config).await?;
let turn_id = "preserved-plan-turn".to_string();
let thread_state = new_thread_state();
thread_state.lock().await.pending_terminal_plan_cleanup =
Some(PendingTerminalPlanCleanup {
turn_id: turn_id.clone(),
plan_update: UpdatePlanArgs {
explanation: Some("still working".to_string()),
plan: vec![PlanItemArg {
step: "first".to_string(),
status: StepStatus::InProgress,
}],
},
});
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
conversation_id,
);
apply_bespoke_event_handling(
Event {
id: turn_id.clone(),
msg: EventMsg::ThreadGoalUpdated(
codex_protocol::protocol::ThreadGoalUpdatedEvent {
thread_id: conversation_id,
turn_id: Some(turn_id.clone()),
goal: codex_protocol::protocol::ThreadGoal {
thread_id: conversation_id,
objective: "finish the work".to_string(),
status: codex_protocol::protocol::ThreadGoalStatus::Complete,
token_budget: None,
tokens_used: 1,
time_used_seconds: 1,
created_at: 1,
updated_at: 2,
},
},
),
},
conversation_id,
conversation,
thread_manager,
outgoing,
thread_state.clone(),
ThreadWatchManager::new(),
Arc::new(tokio::sync::Semaphore::new(/*permits*/ 1)),
"test-provider".to_string(),
)
.await;
let first = recv_broadcast_message(&mut rx).await?;
assert!(matches!(
first,
OutgoingMessage::AppServerNotification(ServerNotification::ThreadGoalUpdated(_))
));
let second = recv_broadcast_message(&mut rx).await?;
match second {
OutgoingMessage::AppServerNotification(ServerNotification::TurnPlanUpdated(n)) => {
assert_eq!(
n,
TurnPlanUpdatedNotification {
thread_id: conversation_id.to_string(),
turn_id,
explanation: None,
plan: vec![TurnPlanStep {
step: "first".to_string(),
status: TurnPlanStepStatus::Pending,
}],
}
);
}
other => bail!("unexpected message: {other:?}"),
}
assert!(
thread_state
.lock()
.await
.pending_terminal_plan_cleanup
.is_none(),
"terminal goal updates consume preserved cleanup state"
);
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}

View File

@@ -66,7 +66,12 @@ pub(crate) struct TurnSummary {
pub(crate) started_at: Option<i64>,
pub(crate) command_execution_started: HashSet<String>,
pub(crate) last_error: Option<TurnError>,
pub(crate) latest_plan_update: Option<UpdatePlanArgs>,
}
#[derive(Clone)]
pub(crate) struct PendingTerminalPlanCleanup {
pub(crate) turn_id: String,
pub(crate) plan_update: UpdatePlanArgs,
}
#[derive(Default)]
@@ -74,6 +79,7 @@ pub(crate) struct ThreadState {
pub(crate) pending_interrupts: PendingInterruptQueue,
pub(crate) pending_rollbacks: Option<ConnectionRequestId>,
pub(crate) turn_summary: TurnSummary,
pub(crate) pending_terminal_plan_cleanup: Option<PendingTerminalPlanCleanup>,
pub(crate) last_terminal_turn_id: Option<String>,
pub(crate) cancel_tx: Option<oneshot::Sender<()>>,
pub(crate) experimental_raw_events: bool,