diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index da28431bfd..4acbb27b3c 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -5,6 +5,7 @@ use crate::outgoing_message::ThreadScopedOutgoingMessageSender; use crate::request_processors::populate_thread_turns_from_history; use crate::request_processors::thread_from_stored_thread; use crate::server_request_error::is_turn_transition_server_request_error; +use crate::thread_state::PendingTerminalPlanCleanup; use crate::thread_state::ThreadState; use crate::thread_state::TurnSummary; use crate::thread_state::resolve_server_request_on_thread_listener; @@ -1215,6 +1216,14 @@ pub(crate) async fn apply_bespoke_event_handling( notification, )) .await; + if thread_goal_event.goal.status != codex_protocol::protocol::ThreadGoalStatus::Active { + emit_terminal_plan_cleanup( + thread_goal_event.thread_id, + take_pending_terminal_plan_cleanup(&thread_state).await, + &outgoing, + ) + .await; + } } EventMsg::TurnDiff(turn_diff_event) => { handle_turn_diff(conversation_id, &event_turn_id, turn_diff_event, &outgoing).await; @@ -1262,7 +1271,15 @@ async fn handle_turn_plan_update( outgoing: &ThreadScopedOutgoingMessageSender, thread_state: &Arc>, ) { - thread_state.lock().await.turn_summary.latest_plan_update = Some(plan_update_event.clone()); + let pending_terminal_plan_cleanup = plan_update_event + .plan + .iter() + .any(|item| matches!(item.status, StepStatus::InProgress)) + .then(|| PendingTerminalPlanCleanup { + turn_id: event_turn_id.to_string(), + plan_update: plan_update_event.clone(), + }); + thread_state.lock().await.pending_terminal_plan_cleanup = pending_terminal_plan_cleanup; emit_turn_plan_updated(conversation_id, event_turn_id, plan_update_event, outgoing).await; } @@ -1300,16 +1317,14 @@ async fn emit_turn_completed_with_status( async fn emit_terminal_plan_cleanup( conversation_id: ThreadId, - event_turn_id: &str, - latest_plan_update: Option, - preserve_terminal_plan_progress: bool, + pending_terminal_plan_cleanup: Option, outgoing: &ThreadScopedOutgoingMessageSender, ) { - if preserve_terminal_plan_progress { - return; - } - - let Some(mut latest_plan_update) = latest_plan_update else { + let Some(PendingTerminalPlanCleanup { + turn_id, + plan_update: mut latest_plan_update, + }) = pending_terminal_plan_cleanup + else { return; }; let mut downgraded = false; @@ -1324,7 +1339,7 @@ async fn emit_terminal_plan_cleanup( } latest_plan_update.explanation = None; - emit_turn_plan_updated(conversation_id, event_turn_id, latest_plan_update, outgoing).await; + emit_turn_plan_updated(conversation_id, &turn_id, latest_plan_update, outgoing).await; } async fn emit_turn_plan_updated( @@ -1353,15 +1368,18 @@ async fn terminal_plan_cleanup_may_be_needed(thread_state: &Arc>, +) -> Option { + thread_state + .lock() + .await + .pending_terminal_plan_cleanup + .take() } async fn should_preserve_terminal_plan_progress( @@ -1550,9 +1568,11 @@ async fn handle_turn_complete( let turn_summary = find_and_remove_turn_summary(thread_state).await; emit_terminal_plan_cleanup( conversation_id, - &event_turn_id, - turn_summary.latest_plan_update, - preserve_terminal_plan_progress, + if preserve_terminal_plan_progress { + None + } else { + take_pending_terminal_plan_cleanup(thread_state).await + }, outgoing, ) .await; @@ -1588,9 +1608,11 @@ async fn handle_turn_interrupted( let turn_summary = find_and_remove_turn_summary(thread_state).await; emit_terminal_plan_cleanup( conversation_id, - &event_turn_id, - turn_summary.latest_plan_update, - preserve_terminal_plan_progress, + if preserve_terminal_plan_progress { + None + } else { + take_pending_terminal_plan_cleanup(thread_state).await + }, outgoing, ) .await; @@ -3602,19 +3624,14 @@ mod tests { } other => bail!("unexpected message: {other:?}"), } - let cached = thread_state - .lock() - .await - .turn_summary - .latest_plan_update - .clone() - .expect("plan update should be cached"); - assert_eq!(cached.explanation.as_deref(), Some("need plan")); - assert_eq!(cached.plan.len(), 2); - assert_eq!(cached.plan[0].step, "first"); - assert!(matches!(cached.plan[0].status, StepStatus::Pending)); - assert_eq!(cached.plan[1].step, "second"); - assert!(matches!(cached.plan[1].status, StepStatus::Completed)); + assert!( + thread_state + .lock() + .await + .pending_terminal_plan_cleanup + .is_none(), + "plans without in-progress steps do not need terminal cleanup" + ); assert!(rx.try_recv().is_err(), "no extra messages expected"); Ok(()) } @@ -3625,23 +3642,27 @@ mod tests { let conversation_id = ThreadId::new(); let event_turn_id = "complete_with_plan".to_string(); let thread_state = new_thread_state(); - thread_state.lock().await.turn_summary.latest_plan_update = Some(UpdatePlanArgs { - explanation: Some("still working".to_string()), - plan: vec![ - PlanItemArg { - step: "first".to_string(), - status: StepStatus::InProgress, + thread_state.lock().await.pending_terminal_plan_cleanup = + Some(PendingTerminalPlanCleanup { + turn_id: event_turn_id.clone(), + plan_update: UpdatePlanArgs { + explanation: Some("still working".to_string()), + plan: vec![ + PlanItemArg { + step: "first".to_string(), + status: StepStatus::InProgress, + }, + PlanItemArg { + step: "second".to_string(), + status: StepStatus::Completed, + }, + PlanItemArg { + step: "third".to_string(), + status: StepStatus::InProgress, + }, + ], }, - PlanItemArg { - step: "second".to_string(), - status: StepStatus::Completed, - }, - PlanItemArg { - step: "third".to_string(), - status: StepStatus::InProgress, - }, - ], - }); + }); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); let outgoing = Arc::new(OutgoingMessageSender::new( tx, @@ -3710,13 +3731,17 @@ mod tests { let conversation_id = ThreadId::new(); let event_turn_id = "interrupt_with_plan".to_string(); let thread_state = new_thread_state(); - thread_state.lock().await.turn_summary.latest_plan_update = Some(UpdatePlanArgs { - explanation: Some("still working".to_string()), - plan: vec![PlanItemArg { - step: "first".to_string(), - status: StepStatus::InProgress, - }], - }); + thread_state.lock().await.pending_terminal_plan_cleanup = + Some(PendingTerminalPlanCleanup { + turn_id: event_turn_id.clone(), + plan_update: UpdatePlanArgs { + explanation: Some("still working".to_string()), + plan: vec![PlanItemArg { + step: "first".to_string(), + status: StepStatus::InProgress, + }], + }, + }); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); let outgoing = Arc::new(OutgoingMessageSender::new( tx, @@ -3774,13 +3799,17 @@ mod tests { let conversation_id = ThreadId::new(); let event_turn_id = "complete_active_goal".to_string(); let thread_state = new_thread_state(); - thread_state.lock().await.turn_summary.latest_plan_update = Some(UpdatePlanArgs { - explanation: Some("still working".to_string()), - plan: vec![PlanItemArg { - step: "first".to_string(), - status: StepStatus::InProgress, - }], - }); + thread_state.lock().await.pending_terminal_plan_cleanup = + Some(PendingTerminalPlanCleanup { + turn_id: event_turn_id.clone(), + plan_update: UpdatePlanArgs { + explanation: Some("still working".to_string()), + plan: vec![PlanItemArg { + step: "first".to_string(), + status: StepStatus::InProgress, + }], + }, + }); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); let outgoing = Arc::new(OutgoingMessageSender::new( tx, @@ -3810,6 +3839,122 @@ mod tests { } other => bail!("unexpected message: {other:?}"), } + assert!( + thread_state + .lock() + .await + .pending_terminal_plan_cleanup + .is_some(), + "active goals retain pending cleanup until the goal settles" + ); + assert!(rx.try_recv().is_err(), "no extra messages expected"); + Ok(()) + } + + #[tokio::test] + async fn terminal_thread_goal_update_cleans_preserved_plan_progress() -> Result<()> { + let codex_home = TempDir::new()?; + let config = load_default_config_for_test(&codex_home).await; + let thread_manager = Arc::new( + codex_core::test_support::thread_manager_with_models_provider_and_home( + CodexAuth::create_dummy_chatgpt_auth_for_testing(), + config.model_provider.clone(), + config.codex_home.to_path_buf(), + Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), + ), + ); + let codex_core::NewThread { + thread_id: conversation_id, + thread: conversation, + .. + } = thread_manager.start_thread(config).await?; + let turn_id = "preserved-plan-turn".to_string(); + let thread_state = new_thread_state(); + thread_state.lock().await.pending_terminal_plan_cleanup = + Some(PendingTerminalPlanCleanup { + turn_id: turn_id.clone(), + plan_update: UpdatePlanArgs { + explanation: Some("still working".to_string()), + plan: vec![PlanItemArg { + step: "first".to_string(), + status: StepStatus::InProgress, + }], + }, + }); + let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing, + vec![ConnectionId(1)], + conversation_id, + ); + + apply_bespoke_event_handling( + Event { + id: turn_id.clone(), + msg: EventMsg::ThreadGoalUpdated( + codex_protocol::protocol::ThreadGoalUpdatedEvent { + thread_id: conversation_id, + turn_id: Some(turn_id.clone()), + goal: codex_protocol::protocol::ThreadGoal { + thread_id: conversation_id, + objective: "finish the work".to_string(), + status: codex_protocol::protocol::ThreadGoalStatus::Complete, + token_budget: None, + tokens_used: 1, + time_used_seconds: 1, + created_at: 1, + updated_at: 2, + }, + }, + ), + }, + conversation_id, + conversation, + thread_manager, + outgoing, + thread_state.clone(), + ThreadWatchManager::new(), + Arc::new(tokio::sync::Semaphore::new(/*permits*/ 1)), + "test-provider".to_string(), + ) + .await; + + let first = recv_broadcast_message(&mut rx).await?; + assert!(matches!( + first, + OutgoingMessage::AppServerNotification(ServerNotification::ThreadGoalUpdated(_)) + )); + + let second = recv_broadcast_message(&mut rx).await?; + match second { + OutgoingMessage::AppServerNotification(ServerNotification::TurnPlanUpdated(n)) => { + assert_eq!( + n, + TurnPlanUpdatedNotification { + thread_id: conversation_id.to_string(), + turn_id, + explanation: None, + plan: vec![TurnPlanStep { + step: "first".to_string(), + status: TurnPlanStepStatus::Pending, + }], + } + ); + } + other => bail!("unexpected message: {other:?}"), + } + assert!( + thread_state + .lock() + .await + .pending_terminal_plan_cleanup + .is_none(), + "terminal goal updates consume preserved cleanup state" + ); assert!(rx.try_recv().is_err(), "no extra messages expected"); Ok(()) } diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index 8064c3db7d..95e92bfbe0 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -66,7 +66,12 @@ pub(crate) struct TurnSummary { pub(crate) started_at: Option, pub(crate) command_execution_started: HashSet, pub(crate) last_error: Option, - pub(crate) latest_plan_update: Option, +} + +#[derive(Clone)] +pub(crate) struct PendingTerminalPlanCleanup { + pub(crate) turn_id: String, + pub(crate) plan_update: UpdatePlanArgs, } #[derive(Default)] @@ -74,6 +79,7 @@ pub(crate) struct ThreadState { pub(crate) pending_interrupts: PendingInterruptQueue, pub(crate) pending_rollbacks: Option, pub(crate) turn_summary: TurnSummary, + pub(crate) pending_terminal_plan_cleanup: Option, pub(crate) last_terminal_turn_id: Option, pub(crate) cancel_tx: Option>, pub(crate) experimental_raw_events: bool,