app-server: reset terminal plan progress without goals

This commit is contained in:
Brent Traut
2026-05-17 16:12:35 -07:00
parent 05b8ce4354
commit 2cd3522272
2 changed files with 344 additions and 30 deletions

View File

@@ -88,6 +88,7 @@ use codex_core::review_prompts;
use codex_protocol::ThreadId;
use codex_protocol::items::parse_hook_prompt_message;
use codex_protocol::models::AdditionalPermissionProfile as CoreAdditionalPermissionProfile;
use codex_protocol::plan_tool::StepStatus;
use codex_protocol::plan_tool::UpdatePlanArgs;
use codex_protocol::protocol::CodexErrorInfo as CoreCodexErrorInfo;
use codex_protocol::protocol::Event;
@@ -181,6 +182,8 @@ pub(crate) async fn apply_bespoke_event_handling(
outgoing.abort_pending_server_requests().await;
respond_to_pending_interrupts(&thread_state, &outgoing).await;
let turn_failed = thread_state.lock().await.turn_summary.last_error.is_some();
let has_active_goal =
thread_has_active_goal(conversation.as_ref(), conversation_id).await;
thread_watch_manager
.note_turn_completed(&conversation_id.to_string(), turn_failed)
.await;
@@ -188,6 +191,7 @@ pub(crate) async fn apply_bespoke_event_handling(
conversation_id,
event_turn_id,
turn_complete_event,
has_active_goal,
&outgoing,
&thread_state,
)
@@ -1113,6 +1117,8 @@ pub(crate) async fn apply_bespoke_event_handling(
outgoing.abort_pending_server_requests().await;
respond_to_pending_interrupts(&thread_state, &outgoing).await;
let has_active_goal =
thread_has_active_goal(conversation.as_ref(), conversation_id).await;
thread_watch_manager
.note_turn_interrupted(&conversation_id.to_string())
.await;
@@ -1120,6 +1126,7 @@ pub(crate) async fn apply_bespoke_event_handling(
conversation_id,
event_turn_id,
turn_aborted_event,
has_active_goal,
&outgoing,
&thread_state,
)
@@ -1209,6 +1216,7 @@ pub(crate) async fn apply_bespoke_event_handling(
&event_turn_id,
plan_update_event,
&outgoing,
&thread_state,
)
.await;
}
@@ -1243,21 +1251,10 @@ async fn handle_turn_plan_update(
event_turn_id: &str,
plan_update_event: UpdatePlanArgs,
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) {
// `update_plan` is a todo/checklist tool; it is not related to plan-mode updates
let notification = TurnPlanUpdatedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.to_string(),
explanation: plan_update_event.explanation,
plan: plan_update_event
.plan
.into_iter()
.map(TurnPlanStep::from)
.collect(),
};
outgoing
.send_server_notification(ServerNotification::TurnPlanUpdated(notification))
.await;
thread_state.lock().await.turn_summary.latest_plan_update = Some(plan_update_event.clone());
emit_turn_plan_updated(conversation_id, event_turn_id, plan_update_event, outgoing).await;
}
struct TurnCompletionMetadata {
@@ -1292,6 +1289,74 @@ async fn emit_turn_completed_with_status(
.await;
}
async fn emit_terminal_plan_cleanup(
conversation_id: ThreadId,
event_turn_id: &str,
latest_plan_update: Option<UpdatePlanArgs>,
has_active_goal: bool,
outgoing: &ThreadScopedOutgoingMessageSender,
) {
if has_active_goal {
return;
}
let Some(mut latest_plan_update) = latest_plan_update else {
return;
};
let mut downgraded = false;
for item in &mut latest_plan_update.plan {
if matches!(item.status, StepStatus::InProgress) {
item.status = StepStatus::Pending;
downgraded = true;
}
}
if !downgraded {
return;
}
latest_plan_update.explanation = None;
emit_turn_plan_updated(conversation_id, event_turn_id, latest_plan_update, outgoing).await;
}
async fn emit_turn_plan_updated(
conversation_id: ThreadId,
event_turn_id: &str,
plan_update_event: UpdatePlanArgs,
outgoing: &ThreadScopedOutgoingMessageSender,
) {
// `update_plan` is a todo/checklist tool; it is not related to plan-mode updates.
let notification = TurnPlanUpdatedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.to_string(),
explanation: plan_update_event.explanation,
plan: plan_update_event
.plan
.into_iter()
.map(TurnPlanStep::from)
.collect(),
};
outgoing
.send_server_notification(ServerNotification::TurnPlanUpdated(notification))
.await;
}
async fn thread_has_active_goal(conversation: &CodexThread, conversation_id: ThreadId) -> bool {
let Some(state_db) = conversation.state_db() else {
return false;
};
match state_db.get_thread_goal(conversation_id).await {
Ok(goal) => goal.is_some_and(|goal| goal.status == codex_state::ThreadGoalStatus::Active),
Err(err) => {
tracing::warn!(
thread_id = %conversation_id,
"failed to read thread goal before terminal plan cleanup: {err}"
);
true
}
}
}
#[allow(clippy::too_many_arguments)]
async fn start_command_execution_item(
conversation_id: &ThreadId,
@@ -1438,10 +1503,7 @@ pub(crate) async fn maybe_emit_hook_prompt_item_completed(
.await;
}
async fn find_and_remove_turn_summary(
_conversation_id: ThreadId,
thread_state: &Arc<Mutex<ThreadState>>,
) -> TurnSummary {
async fn find_and_remove_turn_summary(thread_state: &Arc<Mutex<ThreadState>>) -> TurnSummary {
let mut state = thread_state.lock().await;
std::mem::take(&mut state.turn_summary)
}
@@ -1450,10 +1512,19 @@ async fn handle_turn_complete(
conversation_id: ThreadId,
event_turn_id: String,
turn_complete_event: TurnCompleteEvent,
has_active_goal: bool,
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) {
let turn_summary = find_and_remove_turn_summary(conversation_id, thread_state).await;
let turn_summary = find_and_remove_turn_summary(thread_state).await;
emit_terminal_plan_cleanup(
conversation_id,
&event_turn_id,
turn_summary.latest_plan_update,
has_active_goal,
outgoing,
)
.await;
let (status, error) = match turn_summary.last_error {
Some(error) => (TurnStatus::Failed, Some(error)),
@@ -1479,10 +1550,19 @@ async fn handle_turn_interrupted(
conversation_id: ThreadId,
event_turn_id: String,
turn_aborted_event: TurnAbortedEvent,
has_active_goal: bool,
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) {
let turn_summary = find_and_remove_turn_summary(conversation_id, thread_state).await;
let turn_summary = find_and_remove_turn_summary(thread_state).await;
emit_terminal_plan_cleanup(
conversation_id,
&event_turn_id,
turn_summary.latest_plan_update,
has_active_goal,
outgoing,
)
.await;
emit_turn_completed_with_status(
conversation_id,
@@ -3163,7 +3243,7 @@ mod tests {
)
.await;
let turn_summary = find_and_remove_turn_summary(conversation_id, &thread_state).await;
let turn_summary = find_and_remove_turn_summary(&thread_state).await;
assert_eq!(
turn_summary.last_error,
Some(TurnError {
@@ -3296,6 +3376,7 @@ mod tests {
conversation_id,
event_turn_id.clone(),
turn_complete_event(&event_turn_id),
/*has_active_goal*/ false,
&outgoing,
&thread_state,
)
@@ -3349,6 +3430,7 @@ mod tests {
conversation_id,
event_turn_id.clone(),
turn_aborted_event(&event_turn_id),
/*has_active_goal*/ false,
&outgoing,
&thread_state,
)
@@ -3399,6 +3481,7 @@ mod tests {
conversation_id,
event_turn_id.clone(),
turn_complete_event(&event_turn_id),
/*has_active_goal*/ false,
&outgoing,
&thread_state,
)
@@ -3453,20 +3536,246 @@ mod tests {
};
let conversation_id = ThreadId::new();
let thread_state = new_thread_state();
handle_turn_plan_update(conversation_id, "turn-123", update, &outgoing).await;
handle_turn_plan_update(
conversation_id,
"turn-123",
update,
&outgoing,
&thread_state,
)
.await;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnPlanUpdated(n)) => {
assert_eq!(n.thread_id, conversation_id.to_string());
assert_eq!(n.turn_id, "turn-123");
assert_eq!(n.explanation.as_deref(), Some("need plan"));
assert_eq!(n.plan.len(), 2);
assert_eq!(n.plan[0].step, "first");
assert_eq!(n.plan[0].status, TurnPlanStepStatus::Pending);
assert_eq!(n.plan[1].step, "second");
assert_eq!(n.plan[1].status, TurnPlanStepStatus::Completed);
assert_eq!(
n,
TurnPlanUpdatedNotification {
thread_id: conversation_id.to_string(),
turn_id: "turn-123".to_string(),
explanation: Some("need plan".to_string()),
plan: vec![
TurnPlanStep {
step: "first".to_string(),
status: TurnPlanStepStatus::Pending,
},
TurnPlanStep {
step: "second".to_string(),
status: TurnPlanStepStatus::Completed,
},
],
}
);
}
other => bail!("unexpected message: {other:?}"),
}
let cached = thread_state
.lock()
.await
.turn_summary
.latest_plan_update
.clone()
.expect("plan update should be cached");
assert_eq!(cached.explanation.as_deref(), Some("need plan"));
assert_eq!(cached.plan.len(), 2);
assert_eq!(cached.plan[0].step, "first");
assert!(matches!(cached.plan[0].status, StepStatus::Pending));
assert_eq!(cached.plan[1].step, "second");
assert!(matches!(cached.plan[1].status, StepStatus::Completed));
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
#[tokio::test]
async fn test_handle_turn_complete_downgrades_in_progress_plan_without_active_goal()
-> Result<()> {
let conversation_id = ThreadId::new();
let event_turn_id = "complete_with_plan".to_string();
let thread_state = new_thread_state();
thread_state.lock().await.turn_summary.latest_plan_update = Some(UpdatePlanArgs {
explanation: Some("still working".to_string()),
plan: vec![
PlanItemArg {
step: "first".to_string(),
status: StepStatus::InProgress,
},
PlanItemArg {
step: "second".to_string(),
status: StepStatus::Completed,
},
PlanItemArg {
step: "third".to_string(),
status: StepStatus::InProgress,
},
],
});
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
handle_turn_complete(
conversation_id,
event_turn_id.clone(),
turn_complete_event(&event_turn_id),
/*has_active_goal*/ false,
&outgoing,
&thread_state,
)
.await;
let first = recv_broadcast_message(&mut rx).await?;
match first {
OutgoingMessage::AppServerNotification(ServerNotification::TurnPlanUpdated(n)) => {
assert_eq!(
n,
TurnPlanUpdatedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id,
explanation: None,
plan: vec![
TurnPlanStep {
step: "first".to_string(),
status: TurnPlanStepStatus::Pending,
},
TurnPlanStep {
step: "second".to_string(),
status: TurnPlanStepStatus::Completed,
},
TurnPlanStep {
step: "third".to_string(),
status: TurnPlanStepStatus::Pending,
},
],
}
);
}
other => bail!("unexpected message: {other:?}"),
}
let second = recv_broadcast_message(&mut rx).await?;
match second {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, "complete_with_plan");
assert_eq!(n.turn.status, TurnStatus::Completed);
}
other => bail!("unexpected message: {other:?}"),
}
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
#[tokio::test]
async fn test_handle_turn_interrupted_downgrades_in_progress_plan_without_active_goal()
-> Result<()> {
let conversation_id = ThreadId::new();
let event_turn_id = "interrupt_with_plan".to_string();
let thread_state = new_thread_state();
thread_state.lock().await.turn_summary.latest_plan_update = Some(UpdatePlanArgs {
explanation: Some("still working".to_string()),
plan: vec![PlanItemArg {
step: "first".to_string(),
status: StepStatus::InProgress,
}],
});
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
handle_turn_interrupted(
conversation_id,
event_turn_id.clone(),
turn_aborted_event(&event_turn_id),
/*has_active_goal*/ false,
&outgoing,
&thread_state,
)
.await;
let first = recv_broadcast_message(&mut rx).await?;
match first {
OutgoingMessage::AppServerNotification(ServerNotification::TurnPlanUpdated(n)) => {
assert_eq!(
n,
TurnPlanUpdatedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id,
explanation: None,
plan: vec![TurnPlanStep {
step: "first".to_string(),
status: TurnPlanStepStatus::Pending,
}],
}
);
}
other => bail!("unexpected message: {other:?}"),
}
let second = recv_broadcast_message(&mut rx).await?;
match second {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, "interrupt_with_plan");
assert_eq!(n.turn.status, TurnStatus::Interrupted);
}
other => bail!("unexpected message: {other:?}"),
}
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
#[tokio::test]
async fn test_handle_turn_complete_preserves_in_progress_plan_with_active_goal() -> Result<()> {
let conversation_id = ThreadId::new();
let event_turn_id = "complete_active_goal".to_string();
let thread_state = new_thread_state();
thread_state.lock().await.turn_summary.latest_plan_update = Some(UpdatePlanArgs {
explanation: Some("still working".to_string()),
plan: vec![PlanItemArg {
step: "first".to_string(),
status: StepStatus::InProgress,
}],
});
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
handle_turn_complete(
conversation_id,
event_turn_id.clone(),
turn_complete_event(&event_turn_id),
/*has_active_goal*/ true,
&outgoing,
&thread_state,
)
.await;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, event_turn_id);
assert_eq!(n.turn.status, TurnStatus::Completed);
}
other => bail!("unexpected message: {other:?}"),
}
@@ -3633,6 +3942,7 @@ mod tests {
conversation_a,
a_turn1.clone(),
turn_complete_event(&a_turn1),
/*has_active_goal*/ false,
&outgoing,
&thread_state,
)
@@ -3654,6 +3964,7 @@ mod tests {
conversation_b,
b_turn1.clone(),
turn_complete_event(&b_turn1),
/*has_active_goal*/ false,
&outgoing,
&thread_state,
)
@@ -3665,6 +3976,7 @@ mod tests {
conversation_a,
a_turn2.clone(),
turn_complete_event(&a_turn2),
/*has_active_goal*/ false,
&outgoing,
&thread_state,
)

View File

@@ -9,6 +9,7 @@ use codex_core::CodexThread;
use codex_core::ThreadConfigSnapshot;
use codex_file_watcher::WatchRegistration;
use codex_protocol::ThreadId;
use codex_protocol::plan_tool::UpdatePlanArgs;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
use codex_rollout::state_db::StateDbHandle;
@@ -65,6 +66,7 @@ pub(crate) struct TurnSummary {
pub(crate) started_at: Option<i64>,
pub(crate) command_execution_started: HashSet<String>,
pub(crate) last_error: Option<TurnError>,
pub(crate) latest_plan_update: Option<UpdatePlanArgs>,
}
#[derive(Default)]