From 2cd3522272023100dbbcb82d6d9e8c87bb083e3c Mon Sep 17 00:00:00 2001 From: Brent Traut Date: Sun, 17 May 2026 16:12:35 -0700 Subject: [PATCH] app-server: reset terminal plan progress without goals --- .../app-server/src/bespoke_event_handling.rs | 372 ++++++++++++++++-- codex-rs/app-server/src/thread_state.rs | 2 + 2 files changed, 344 insertions(+), 30 deletions(-) diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 5b137c1091..bb113afc00 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -88,6 +88,7 @@ use codex_core::review_prompts; use codex_protocol::ThreadId; use codex_protocol::items::parse_hook_prompt_message; use codex_protocol::models::AdditionalPermissionProfile as CoreAdditionalPermissionProfile; +use codex_protocol::plan_tool::StepStatus; use codex_protocol::plan_tool::UpdatePlanArgs; use codex_protocol::protocol::CodexErrorInfo as CoreCodexErrorInfo; use codex_protocol::protocol::Event; @@ -181,6 +182,8 @@ pub(crate) async fn apply_bespoke_event_handling( outgoing.abort_pending_server_requests().await; respond_to_pending_interrupts(&thread_state, &outgoing).await; let turn_failed = thread_state.lock().await.turn_summary.last_error.is_some(); + let has_active_goal = + thread_has_active_goal(conversation.as_ref(), conversation_id).await; thread_watch_manager .note_turn_completed(&conversation_id.to_string(), turn_failed) .await; @@ -188,6 +191,7 @@ pub(crate) async fn apply_bespoke_event_handling( conversation_id, event_turn_id, turn_complete_event, + has_active_goal, &outgoing, &thread_state, ) @@ -1113,6 +1117,8 @@ pub(crate) async fn apply_bespoke_event_handling( outgoing.abort_pending_server_requests().await; respond_to_pending_interrupts(&thread_state, &outgoing).await; + let has_active_goal = + thread_has_active_goal(conversation.as_ref(), conversation_id).await; thread_watch_manager .note_turn_interrupted(&conversation_id.to_string()) .await; @@ -1120,6 +1126,7 @@ pub(crate) async fn apply_bespoke_event_handling( conversation_id, event_turn_id, turn_aborted_event, + has_active_goal, &outgoing, &thread_state, ) @@ -1209,6 +1216,7 @@ pub(crate) async fn apply_bespoke_event_handling( &event_turn_id, plan_update_event, &outgoing, + &thread_state, ) .await; } @@ -1243,21 +1251,10 @@ async fn handle_turn_plan_update( event_turn_id: &str, plan_update_event: UpdatePlanArgs, outgoing: &ThreadScopedOutgoingMessageSender, + thread_state: &Arc>, ) { - // `update_plan` is a todo/checklist tool; it is not related to plan-mode updates - let notification = TurnPlanUpdatedNotification { - thread_id: conversation_id.to_string(), - turn_id: event_turn_id.to_string(), - explanation: plan_update_event.explanation, - plan: plan_update_event - .plan - .into_iter() - .map(TurnPlanStep::from) - .collect(), - }; - outgoing - .send_server_notification(ServerNotification::TurnPlanUpdated(notification)) - .await; + thread_state.lock().await.turn_summary.latest_plan_update = Some(plan_update_event.clone()); + emit_turn_plan_updated(conversation_id, event_turn_id, plan_update_event, outgoing).await; } struct TurnCompletionMetadata { @@ -1292,6 +1289,74 @@ async fn emit_turn_completed_with_status( .await; } +async fn emit_terminal_plan_cleanup( + conversation_id: ThreadId, + event_turn_id: &str, + latest_plan_update: Option, + has_active_goal: bool, + outgoing: &ThreadScopedOutgoingMessageSender, +) { + if has_active_goal { + return; + } + + let Some(mut latest_plan_update) = latest_plan_update else { + return; + }; + let mut downgraded = false; + for item in &mut latest_plan_update.plan { + if matches!(item.status, StepStatus::InProgress) { + item.status = StepStatus::Pending; + downgraded = true; + } + } + if !downgraded { + return; + } + + latest_plan_update.explanation = None; + emit_turn_plan_updated(conversation_id, event_turn_id, latest_plan_update, outgoing).await; +} + +async fn emit_turn_plan_updated( + conversation_id: ThreadId, + event_turn_id: &str, + plan_update_event: UpdatePlanArgs, + outgoing: &ThreadScopedOutgoingMessageSender, +) { + // `update_plan` is a todo/checklist tool; it is not related to plan-mode updates. + let notification = TurnPlanUpdatedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_turn_id.to_string(), + explanation: plan_update_event.explanation, + plan: plan_update_event + .plan + .into_iter() + .map(TurnPlanStep::from) + .collect(), + }; + outgoing + .send_server_notification(ServerNotification::TurnPlanUpdated(notification)) + .await; +} + +async fn thread_has_active_goal(conversation: &CodexThread, conversation_id: ThreadId) -> bool { + let Some(state_db) = conversation.state_db() else { + return false; + }; + + match state_db.get_thread_goal(conversation_id).await { + Ok(goal) => goal.is_some_and(|goal| goal.status == codex_state::ThreadGoalStatus::Active), + Err(err) => { + tracing::warn!( + thread_id = %conversation_id, + "failed to read thread goal before terminal plan cleanup: {err}" + ); + true + } + } +} + #[allow(clippy::too_many_arguments)] async fn start_command_execution_item( conversation_id: &ThreadId, @@ -1438,10 +1503,7 @@ pub(crate) async fn maybe_emit_hook_prompt_item_completed( .await; } -async fn find_and_remove_turn_summary( - _conversation_id: ThreadId, - thread_state: &Arc>, -) -> TurnSummary { +async fn find_and_remove_turn_summary(thread_state: &Arc>) -> TurnSummary { let mut state = thread_state.lock().await; std::mem::take(&mut state.turn_summary) } @@ -1450,10 +1512,19 @@ async fn handle_turn_complete( conversation_id: ThreadId, event_turn_id: String, turn_complete_event: TurnCompleteEvent, + has_active_goal: bool, outgoing: &ThreadScopedOutgoingMessageSender, thread_state: &Arc>, ) { - let turn_summary = find_and_remove_turn_summary(conversation_id, thread_state).await; + let turn_summary = find_and_remove_turn_summary(thread_state).await; + emit_terminal_plan_cleanup( + conversation_id, + &event_turn_id, + turn_summary.latest_plan_update, + has_active_goal, + outgoing, + ) + .await; let (status, error) = match turn_summary.last_error { Some(error) => (TurnStatus::Failed, Some(error)), @@ -1479,10 +1550,19 @@ async fn handle_turn_interrupted( conversation_id: ThreadId, event_turn_id: String, turn_aborted_event: TurnAbortedEvent, + has_active_goal: bool, outgoing: &ThreadScopedOutgoingMessageSender, thread_state: &Arc>, ) { - let turn_summary = find_and_remove_turn_summary(conversation_id, thread_state).await; + let turn_summary = find_and_remove_turn_summary(thread_state).await; + emit_terminal_plan_cleanup( + conversation_id, + &event_turn_id, + turn_summary.latest_plan_update, + has_active_goal, + outgoing, + ) + .await; emit_turn_completed_with_status( conversation_id, @@ -3163,7 +3243,7 @@ mod tests { ) .await; - let turn_summary = find_and_remove_turn_summary(conversation_id, &thread_state).await; + let turn_summary = find_and_remove_turn_summary(&thread_state).await; assert_eq!( turn_summary.last_error, Some(TurnError { @@ -3296,6 +3376,7 @@ mod tests { conversation_id, event_turn_id.clone(), turn_complete_event(&event_turn_id), + /*has_active_goal*/ false, &outgoing, &thread_state, ) @@ -3349,6 +3430,7 @@ mod tests { conversation_id, event_turn_id.clone(), turn_aborted_event(&event_turn_id), + /*has_active_goal*/ false, &outgoing, &thread_state, ) @@ -3399,6 +3481,7 @@ mod tests { conversation_id, event_turn_id.clone(), turn_complete_event(&event_turn_id), + /*has_active_goal*/ false, &outgoing, &thread_state, ) @@ -3453,20 +3536,246 @@ mod tests { }; let conversation_id = ThreadId::new(); + let thread_state = new_thread_state(); - handle_turn_plan_update(conversation_id, "turn-123", update, &outgoing).await; + handle_turn_plan_update( + conversation_id, + "turn-123", + update, + &outgoing, + &thread_state, + ) + .await; let msg = recv_broadcast_message(&mut rx).await?; match msg { OutgoingMessage::AppServerNotification(ServerNotification::TurnPlanUpdated(n)) => { - assert_eq!(n.thread_id, conversation_id.to_string()); - assert_eq!(n.turn_id, "turn-123"); - assert_eq!(n.explanation.as_deref(), Some("need plan")); - assert_eq!(n.plan.len(), 2); - assert_eq!(n.plan[0].step, "first"); - assert_eq!(n.plan[0].status, TurnPlanStepStatus::Pending); - assert_eq!(n.plan[1].step, "second"); - assert_eq!(n.plan[1].status, TurnPlanStepStatus::Completed); + assert_eq!( + n, + TurnPlanUpdatedNotification { + thread_id: conversation_id.to_string(), + turn_id: "turn-123".to_string(), + explanation: Some("need plan".to_string()), + plan: vec![ + TurnPlanStep { + step: "first".to_string(), + status: TurnPlanStepStatus::Pending, + }, + TurnPlanStep { + step: "second".to_string(), + status: TurnPlanStepStatus::Completed, + }, + ], + } + ); + } + other => bail!("unexpected message: {other:?}"), + } + let cached = thread_state + .lock() + .await + .turn_summary + .latest_plan_update + .clone() + .expect("plan update should be cached"); + assert_eq!(cached.explanation.as_deref(), Some("need plan")); + assert_eq!(cached.plan.len(), 2); + assert_eq!(cached.plan[0].step, "first"); + assert!(matches!(cached.plan[0].status, StepStatus::Pending)); + assert_eq!(cached.plan[1].step, "second"); + assert!(matches!(cached.plan[1].status, StepStatus::Completed)); + assert!(rx.try_recv().is_err(), "no extra messages expected"); + Ok(()) + } + + #[tokio::test] + async fn test_handle_turn_complete_downgrades_in_progress_plan_without_active_goal() + -> Result<()> { + let conversation_id = ThreadId::new(); + let event_turn_id = "complete_with_plan".to_string(); + let thread_state = new_thread_state(); + thread_state.lock().await.turn_summary.latest_plan_update = Some(UpdatePlanArgs { + explanation: Some("still working".to_string()), + plan: vec![ + PlanItemArg { + step: "first".to_string(), + status: StepStatus::InProgress, + }, + PlanItemArg { + step: "second".to_string(), + status: StepStatus::Completed, + }, + PlanItemArg { + step: "third".to_string(), + status: StepStatus::InProgress, + }, + ], + }); + let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing, + vec![ConnectionId(1)], + ThreadId::new(), + ); + + handle_turn_complete( + conversation_id, + event_turn_id.clone(), + turn_complete_event(&event_turn_id), + /*has_active_goal*/ false, + &outgoing, + &thread_state, + ) + .await; + + let first = recv_broadcast_message(&mut rx).await?; + match first { + OutgoingMessage::AppServerNotification(ServerNotification::TurnPlanUpdated(n)) => { + assert_eq!( + n, + TurnPlanUpdatedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_turn_id, + explanation: None, + plan: vec![ + TurnPlanStep { + step: "first".to_string(), + status: TurnPlanStepStatus::Pending, + }, + TurnPlanStep { + step: "second".to_string(), + status: TurnPlanStepStatus::Completed, + }, + TurnPlanStep { + step: "third".to_string(), + status: TurnPlanStepStatus::Pending, + }, + ], + } + ); + } + other => bail!("unexpected message: {other:?}"), + } + + let second = recv_broadcast_message(&mut rx).await?; + match second { + OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { + assert_eq!(n.turn.id, "complete_with_plan"); + assert_eq!(n.turn.status, TurnStatus::Completed); + } + other => bail!("unexpected message: {other:?}"), + } + assert!(rx.try_recv().is_err(), "no extra messages expected"); + Ok(()) + } + + #[tokio::test] + async fn test_handle_turn_interrupted_downgrades_in_progress_plan_without_active_goal() + -> Result<()> { + let conversation_id = ThreadId::new(); + let event_turn_id = "interrupt_with_plan".to_string(); + let thread_state = new_thread_state(); + thread_state.lock().await.turn_summary.latest_plan_update = Some(UpdatePlanArgs { + explanation: Some("still working".to_string()), + plan: vec![PlanItemArg { + step: "first".to_string(), + status: StepStatus::InProgress, + }], + }); + let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing, + vec![ConnectionId(1)], + ThreadId::new(), + ); + + handle_turn_interrupted( + conversation_id, + event_turn_id.clone(), + turn_aborted_event(&event_turn_id), + /*has_active_goal*/ false, + &outgoing, + &thread_state, + ) + .await; + + let first = recv_broadcast_message(&mut rx).await?; + match first { + OutgoingMessage::AppServerNotification(ServerNotification::TurnPlanUpdated(n)) => { + assert_eq!( + n, + TurnPlanUpdatedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_turn_id, + explanation: None, + plan: vec![TurnPlanStep { + step: "first".to_string(), + status: TurnPlanStepStatus::Pending, + }], + } + ); + } + other => bail!("unexpected message: {other:?}"), + } + + let second = recv_broadcast_message(&mut rx).await?; + match second { + OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { + assert_eq!(n.turn.id, "interrupt_with_plan"); + assert_eq!(n.turn.status, TurnStatus::Interrupted); + } + other => bail!("unexpected message: {other:?}"), + } + assert!(rx.try_recv().is_err(), "no extra messages expected"); + Ok(()) + } + + #[tokio::test] + async fn test_handle_turn_complete_preserves_in_progress_plan_with_active_goal() -> Result<()> { + let conversation_id = ThreadId::new(); + let event_turn_id = "complete_active_goal".to_string(); + let thread_state = new_thread_state(); + thread_state.lock().await.turn_summary.latest_plan_update = Some(UpdatePlanArgs { + explanation: Some("still working".to_string()), + plan: vec![PlanItemArg { + step: "first".to_string(), + status: StepStatus::InProgress, + }], + }); + let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing, + vec![ConnectionId(1)], + ThreadId::new(), + ); + + handle_turn_complete( + conversation_id, + event_turn_id.clone(), + turn_complete_event(&event_turn_id), + /*has_active_goal*/ true, + &outgoing, + &thread_state, + ) + .await; + + let msg = recv_broadcast_message(&mut rx).await?; + match msg { + OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { + assert_eq!(n.turn.id, event_turn_id); + assert_eq!(n.turn.status, TurnStatus::Completed); } other => bail!("unexpected message: {other:?}"), } @@ -3633,6 +3942,7 @@ mod tests { conversation_a, a_turn1.clone(), turn_complete_event(&a_turn1), + /*has_active_goal*/ false, &outgoing, &thread_state, ) @@ -3654,6 +3964,7 @@ mod tests { conversation_b, b_turn1.clone(), turn_complete_event(&b_turn1), + /*has_active_goal*/ false, &outgoing, &thread_state, ) @@ -3665,6 +3976,7 @@ mod tests { conversation_a, a_turn2.clone(), turn_complete_event(&a_turn2), + /*has_active_goal*/ false, &outgoing, &thread_state, ) diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index 32dfcc325d..8064c3db7d 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -9,6 +9,7 @@ use codex_core::CodexThread; use codex_core::ThreadConfigSnapshot; use codex_file_watcher::WatchRegistration; use codex_protocol::ThreadId; +use codex_protocol::plan_tool::UpdatePlanArgs; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; use codex_rollout::state_db::StateDbHandle; @@ -65,6 +66,7 @@ pub(crate) struct TurnSummary { pub(crate) started_at: Option, pub(crate) command_execution_started: HashSet, pub(crate) last_error: Option, + pub(crate) latest_plan_update: Option, } #[derive(Default)]