Compare commits

...

2 Commits

Author SHA1 Message Date
Brent Traut
9f1467ee36 Fix persisted stale plan cleanup 2026-05-15 12:13:20 -07:00
Brent Traut
a6485f1313 Reset stale in-progress plan steps at terminal turns 2026-05-14 19:11:41 -07:00
9 changed files with 535 additions and 12 deletions

View File

@@ -14,6 +14,8 @@ use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnPlanStepStatus;
use codex_app_server_protocol::TurnPlanUpdatedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
@@ -24,6 +26,7 @@ use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Settings;
use core_test_support::responses;
use core_test_support::responses::ev_function_call;
use core_test_support::skip_if_no_network;
use pretty_assertions::assert_eq;
use std::collections::BTreeMap;
@@ -127,6 +130,90 @@ async fn plan_mode_without_proposed_plan_does_not_emit_plan_item() -> Result<()>
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn update_plan_cleanup_reaches_app_server_before_turn_completion() -> Result<()> {
skip_if_no_network!(Ok(()));
let plan_args = serde_json::json!({
"explanation": "track remaining work",
"plan": [
{"step": "inspect", "status": "in_progress"},
{"step": "report", "status": "pending"},
{"step": "finished", "status": "completed"},
],
})
.to_string();
let responses = vec![
responses::sse(vec![
responses::ev_response_created("resp-1"),
ev_function_call("call-update-plan", "update_plan", &plan_args),
responses::ev_completed("resp-1"),
]),
responses::sse(vec![
responses::ev_assistant_message("msg-1", "Done"),
responses::ev_completed("resp-2"),
]),
];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let turn = start_default_turn(&mut mcp).await?;
let mut methods = Vec::new();
let mut updates = Vec::new();
let completed = loop {
let message = timeout(DEFAULT_READ_TIMEOUT, mcp.read_next_message()).await??;
let JSONRPCMessage::Notification(notification) = message else {
continue;
};
methods.push(notification.method.clone());
match notification.method.as_str() {
"turn/plan/updated" => {
let params = notification.params.ok_or_else(|| {
anyhow!("turn/plan/updated notifications must include params")
})?;
updates.push(serde_json::from_value::<TurnPlanUpdatedNotification>(
params,
)?);
}
"turn/completed" => {
let params = notification
.params
.ok_or_else(|| anyhow!("turn/completed notifications must include params"))?;
break serde_json::from_value::<TurnCompletedNotification>(params)?;
}
_ => {}
}
};
wait_for_responses_request_count(&server, /*expected_count*/ 2).await?;
assert_eq!(completed.turn.id, turn.id);
assert_eq!(updates.len(), 2, "expected original and cleanup updates");
assert_eq!(updates[0].plan[0].status, TurnPlanStepStatus::InProgress);
assert_eq!(updates[1].plan[0].status, TurnPlanStepStatus::Pending);
assert_eq!(updates[1].plan[1].status, TurnPlanStepStatus::Pending);
assert_eq!(updates[1].plan[2].status, TurnPlanStepStatus::Completed);
let completed_index = methods
.iter()
.position(|method| method == "turn/completed")
.expect("turn completion should be observed");
let cleanup_index = methods
.iter()
.rposition(|method| method == "turn/plan/updated")
.expect("cleanup plan update should be observed");
assert!(
cleanup_index < completed_index,
"cleanup plan notification should precede turn completion"
);
Ok(())
}
async fn start_plan_mode_turn(mcp: &mut McpProcess) -> Result<codex_app_server_protocol::Turn> {
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
@@ -168,6 +255,38 @@ async fn start_plan_mode_turn(mcp: &mut McpProcess) -> Result<codex_app_server_p
Ok(to_response::<TurnStartResponse>(turn_resp)?.turn)
}
async fn start_default_turn(mcp: &mut McpProcess) -> Result<codex_app_server_protocol::Turn> {
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let thread = to_response::<ThreadStartResponse>(thread_resp)?.thread;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
input: vec![V2UserInput::Text {
text: "Update the todo list".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
Ok(to_response::<TurnStartResponse>(turn_resp)?.turn)
}
async fn collect_turn_notifications(
mcp: &mut McpProcess,
) -> Result<(

View File

@@ -97,6 +97,8 @@ use codex_protocol::models::format_allow_prefixes;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::permissions::FileSystemSandboxPolicy;
use codex_protocol::permissions::NetworkSandboxPolicy;
use codex_protocol::plan_tool::StepStatus;
use codex_protocol::plan_tool::UpdatePlanArgs;
use codex_protocol::protocol::FileChange;
use codex_protocol::protocol::HasLegacyEvent;
use codex_protocol::protocol::InterAgentCommunication;
@@ -107,6 +109,7 @@ use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::ThreadGoalStatus;
use codex_protocol::protocol::ThreadSource;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnContextItem;
@@ -1271,6 +1274,10 @@ impl Session {
reconstructed_rollout.reference_context_item,
)
.await;
{
let mut state = self.state.lock().await;
state.set_latest_plan_update(reconstructed_rollout.latest_plan_update);
}
self.set_previous_turn_settings(previous_turn_settings.clone())
.await;
previous_turn_settings
@@ -1586,6 +1593,10 @@ impl Session {
/// Persist the event to rollout and send it to clients.
pub(crate) async fn send_event(&self, turn_context: &TurnContext, msg: EventMsg) {
let legacy_source = msg.clone();
if let EventMsg::PlanUpdate(update) = &legacy_source {
let mut state = self.state.lock().await;
state.set_latest_plan_update(Some(update.clone()));
}
self.services
.rollout_thread_trace
.record_codex_turn_event(&turn_context.sub_id, &legacy_source);
@@ -1614,6 +1625,53 @@ impl Session {
}
}
pub(crate) async fn reset_in_progress_plan_steps_for_terminal_turn(
&self,
turn_context: &TurnContext,
preserve_active_goals: bool,
) {
if preserve_active_goals && self.has_active_thread_goal_for_plan_cleanup().await {
return;
}
let Some(mut latest_plan_update) = ({
let state = self.state.lock().await;
state.latest_plan_update()
}) else {
return;
};
let mut changed = false;
for item in &mut latest_plan_update.plan {
if matches!(item.status, StepStatus::InProgress) {
item.status = StepStatus::Pending;
changed = true;
}
}
if changed {
self.send_event(turn_context, EventMsg::PlanUpdate(latest_plan_update))
.await;
}
}
async fn has_active_thread_goal_for_plan_cleanup(&self) -> bool {
if !self.enabled(Feature::Goals) {
return false;
}
match self.get_thread_goal().await {
Ok(Some(goal)) => goal.status == ThreadGoalStatus::Active,
Ok(None) => false,
Err(err) => {
warn!(
"failed to inspect thread goal before terminal plan cleanup; preserving in-progress plan steps: {err}"
);
true
}
}
}
/// Forwards terminal turn events from spawned MultiAgentV2 children to their direct parent.
async fn maybe_notify_parent_of_terminal_turn(
&self,

View File

@@ -8,6 +8,7 @@ pub(super) struct RolloutReconstruction {
pub(super) history: Vec<ResponseItem>,
pub(super) previous_turn_settings: Option<PreviousTurnSettings>,
pub(super) reference_context_item: Option<TurnContextItem>,
pub(super) latest_plan_update: Option<UpdatePlanArgs>,
}
#[derive(Debug, Default)]
@@ -33,6 +34,7 @@ struct ActiveReplaySegment<'a> {
previous_turn_settings: Option<PreviousTurnSettings>,
reference_context_item: TurnReferenceContextItem,
base_replacement_history: Option<&'a [ResponseItem]>,
latest_plan_update: Option<UpdatePlanArgs>,
}
fn turn_ids_are_compatible(active_turn_id: Option<&str>, item_turn_id: Option<&str>) -> bool {
@@ -45,6 +47,7 @@ fn finalize_active_segment<'a>(
base_replacement_history: &mut Option<&'a [ResponseItem]>,
previous_turn_settings: &mut Option<PreviousTurnSettings>,
reference_context_item: &mut TurnReferenceContextItem,
latest_plan_update: &mut Option<UpdatePlanArgs>,
pending_rollback_turns: &mut usize,
) {
// Thread rollback drops the newest surviving real user-message boundaries. In replay, that
@@ -81,6 +84,14 @@ fn finalize_active_segment<'a>(
{
*reference_context_item = active_segment.reference_context_item;
}
// Plan updates follow turn rollback semantics too. The newest surviving reverse-replay
// segment that contains one becomes the plan snapshot to hydrate into live session state.
if latest_plan_update.is_none()
&& let Some(segment_latest_plan_update) = active_segment.latest_plan_update
{
*latest_plan_update = Some(segment_latest_plan_update);
}
}
impl Session {
@@ -97,6 +108,7 @@ impl Session {
let mut base_replacement_history: Option<&[ResponseItem]> = None;
let mut previous_turn_settings = None;
let mut reference_context_item = TurnReferenceContextItem::NeverSet;
let mut latest_plan_update = None;
// Rollback is "drop the newest N user turns". While scanning in reverse, that becomes
// "skip the next N user-turn segments we finalize".
let mut pending_rollback_turns = 0usize;
@@ -198,10 +210,18 @@ impl Session {
&mut base_replacement_history,
&mut previous_turn_settings,
&mut reference_context_item,
&mut latest_plan_update,
&mut pending_rollback_turns,
);
}
}
RolloutItem::EventMsg(EventMsg::PlanUpdate(update)) => {
let active_segment =
active_segment.get_or_insert_with(ActiveReplaySegment::default);
if active_segment.latest_plan_update.is_none() {
active_segment.latest_plan_update = Some(update.clone());
}
}
RolloutItem::ResponseItem(response_item) => {
let active_segment =
active_segment.get_or_insert_with(ActiveReplaySegment::default);
@@ -213,6 +233,7 @@ impl Session {
if base_replacement_history.is_some()
&& previous_turn_settings.is_some()
&& !matches!(reference_context_item, TurnReferenceContextItem::NeverSet)
&& latest_plan_update.is_some()
{
// At this point we have both eager resume metadata values and the replacement-
// history base for the surviving tail, so older rollout items cannot affect this
@@ -227,6 +248,7 @@ impl Session {
&mut base_replacement_history,
&mut previous_turn_settings,
&mut reference_context_item,
&mut latest_plan_update,
&mut pending_rollback_turns,
);
}
@@ -296,6 +318,7 @@ impl Session {
history: history.raw_items().to_vec(),
previous_turn_settings,
reference_context_item,
latest_plan_update,
}
}
}

View File

@@ -47,6 +47,9 @@ use codex_protocol::permissions::FileSystemPath;
use codex_protocol::permissions::FileSystemSandboxEntry;
use codex_protocol::permissions::FileSystemSandboxPolicy;
use codex_protocol::permissions::FileSystemSpecialPath;
use codex_protocol::plan_tool::PlanItemArg;
use codex_protocol::plan_tool::StepStatus;
use codex_protocol::plan_tool::UpdatePlanArgs;
use codex_protocol::protocol::NonSteerableTurnKind;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::TurnEnvironmentSelection;
@@ -2619,6 +2622,93 @@ async fn thread_rollback_recomputes_previous_turn_settings_and_reference_context
);
}
#[tokio::test]
async fn thread_rollback_refreshes_latest_plan_update_from_surviving_history() {
let (mut sess, _tc, rx) = make_session_and_context_with_rx().await;
attach_thread_persistence(
Arc::get_mut(&mut sess).expect("session should not have additional references"),
)
.await;
let surviving_plan = UpdatePlanArgs {
explanation: Some("surviving plan".to_string()),
plan: vec![PlanItemArg {
step: "keep this".to_string(),
status: StepStatus::Pending,
}],
};
let rolled_back_plan = UpdatePlanArgs {
explanation: Some("rolled back plan".to_string()),
plan: vec![PlanItemArg {
step: "drop this".to_string(),
status: StepStatus::InProgress,
}],
};
sess.persist_rollout_items(&[
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".to_string(),
started_at: None,
model_context_window: Some(128_000),
collaboration_mode_kind: ModeKind::Default,
})),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "turn 1 user".to_string(),
images: None,
local_images: Vec::new(),
text_elements: Vec::new(),
})),
RolloutItem::ResponseItem(user_message("turn 1 user")),
RolloutItem::EventMsg(EventMsg::PlanUpdate(surviving_plan.clone())),
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
time_to_first_token_ms: None,
})),
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-2".to_string(),
started_at: None,
model_context_window: Some(128_000),
collaboration_mode_kind: ModeKind::Default,
})),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "turn 2 user".to_string(),
images: None,
local_images: Vec::new(),
text_elements: Vec::new(),
})),
RolloutItem::ResponseItem(user_message("turn 2 user")),
RolloutItem::EventMsg(EventMsg::PlanUpdate(rolled_back_plan.clone())),
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-2".to_string(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
time_to_first_token_ms: None,
})),
])
.await;
{
let mut state = sess.state.lock().await;
state.set_latest_plan_update(Some(rolled_back_plan));
}
handlers::thread_rollback(&sess, "sub-1".to_string(), /*num_turns*/ 1).await;
let rollback_event = wait_for_thread_rolled_back(&rx).await;
assert_eq!(rollback_event.num_turns, 1);
let latest_plan_update = {
let state = sess.state.lock().await;
state.latest_plan_update()
};
assert_eq!(
serde_json::to_value(latest_plan_update).expect("serialize refreshed plan update"),
serde_json::to_value(Some(surviving_plan)).expect("serialize surviving plan update")
);
}
#[tokio::test]
async fn thread_rollback_restores_cleared_reference_context_item_after_compaction() {
let (mut sess, tc, rx) = make_session_and_context_with_rx().await;
@@ -8156,6 +8246,167 @@ async fn interrupt_accounts_active_goal_before_pausing() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test]
async fn interrupted_active_goal_resets_plan_progress_before_turn_abort() -> anyhow::Result<()> {
let (sess, tc, rx, _codex_home) = make_goal_session_and_context_with_rx().await;
sess.set_thread_goal(
tc.as_ref(),
SetGoalRequest {
objective: Some("Pause this goal on interrupt".to_string()),
status: Some(ThreadGoalStatus::Active),
token_budget: None,
},
)
.await?;
while rx.try_recv().is_ok() {}
sess.send_event(
tc.as_ref(),
EventMsg::PlanUpdate(UpdatePlanArgs {
explanation: Some("interrupt should reset this plan".to_string()),
plan: vec![PlanItemArg {
step: "continue work".to_string(),
status: StepStatus::InProgress,
}],
}),
)
.await;
while let Ok(event) = rx.recv().await {
if matches!(event.msg, EventMsg::PlanUpdate(_)) {
break;
}
}
sess.spawn_task(
Arc::clone(&tc),
Vec::new(),
NeverEndingTask {
kind: TaskKind::Regular,
listen_to_cancellation_token: false,
},
)
.await;
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
let mut saw_cleanup_before_abort = false;
while let Ok(event) = rx.recv().await {
match event.msg {
EventMsg::PlanUpdate(UpdatePlanArgs { plan, .. }) => {
saw_cleanup_before_abort |= plan
.iter()
.any(|item| matches!(item.status, StepStatus::Pending));
}
EventMsg::TurnAborted(_) => break,
_ => {}
}
}
assert!(
saw_cleanup_before_abort,
"interrupt cleanup should reset in-progress plan items before TurnAborted"
);
let goal = sess
.get_thread_goal()
.await?
.expect("goal should remain persisted after interrupt");
assert_eq!(ThreadGoalStatus::Paused, goal.status);
Ok(())
}
#[tokio::test]
async fn terminal_plan_cleanup_skips_active_goal() -> anyhow::Result<()> {
let (sess, tc, rx, _codex_home) = make_goal_session_and_context_with_rx().await;
sess.set_thread_goal(
tc.as_ref(),
SetGoalRequest {
objective: Some("Keep the current work moving".to_string()),
status: Some(ThreadGoalStatus::Active),
token_budget: None,
},
)
.await?;
while rx.try_recv().is_ok() {}
sess.send_event(
tc.as_ref(),
EventMsg::PlanUpdate(UpdatePlanArgs {
explanation: Some("active goal should preserve progress".to_string()),
plan: vec![PlanItemArg {
step: "continue work".to_string(),
status: StepStatus::InProgress,
}],
}),
)
.await;
while let Ok(event) = rx.recv().await {
if matches!(event.msg, EventMsg::PlanUpdate(_)) {
break;
}
}
sess.reset_in_progress_plan_steps_for_terminal_turn(
tc.as_ref(),
/*preserve_active_goals*/ true,
)
.await;
let emitted_cleanup = std::iter::from_fn(|| rx.try_recv().ok())
.any(|event| matches!(event.msg, EventMsg::PlanUpdate(UpdatePlanArgs { .. })));
assert!(
!emitted_cleanup,
"active goals should suppress terminal plan cleanup updates"
);
Ok(())
}
#[tokio::test]
async fn task_finish_without_terminal_turn_does_not_reset_plan_progress() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
sess.send_event(
tc.as_ref(),
EventMsg::PlanUpdate(UpdatePlanArgs {
explanation: Some("keep in progress until the turn really ends".to_string()),
plan: vec![PlanItemArg {
step: "continue work".to_string(),
status: StepStatus::InProgress,
}],
}),
)
.await;
while let Ok(event) = rx.recv().await {
if matches!(event.msg, EventMsg::PlanUpdate(_)) {
break;
}
}
sess.on_task_finished(Arc::clone(&tc), /*last_agent_message*/ None)
.await;
let mut saw_cleanup_update = false;
while let Ok(event) = rx.recv().await {
match event.msg {
EventMsg::PlanUpdate(UpdatePlanArgs { plan, .. }) => {
saw_cleanup_update |= plan
.iter()
.any(|item| matches!(item.status, StepStatus::Pending));
}
EventMsg::TurnComplete(_) => break,
_ => {}
}
}
assert!(
!saw_cleanup_update,
"non-terminal task completion should not reset in-progress plan items"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn active_goal_continuation_runs_again_after_no_tool_turn() -> anyhow::Result<()> {
let server = start_mock_server().await;

View File

@@ -2,6 +2,7 @@
use codex_protocol::models::AdditionalPermissionProfile;
use codex_protocol::models::ResponseItem;
use codex_protocol::plan_tool::UpdatePlanArgs;
use codex_sandboxing::policy_transforms::merge_permission_profiles;
use std::collections::HashMap;
use std::collections::HashSet;
@@ -28,6 +29,10 @@ pub(crate) struct SessionState {
/// model/realtime handling on subsequent regular turns (including full-context
/// reinjection after resume or `/compact`).
previous_turn_settings: Option<PreviousTurnSettings>,
/// Latest todo/checklist plan update observed for the thread. The durable
/// source of truth remains the persisted event stream; this cache lets core
/// emit terminal-state corrections without rereading rollout storage.
latest_plan_update: Option<UpdatePlanArgs>,
/// Startup prewarmed session prepared during session initialization.
pub(crate) startup_prewarm: Option<SessionStartupPrewarmHandle>,
pub(crate) active_connector_selection: HashSet<String>,
@@ -48,6 +53,7 @@ impl SessionState {
dependency_env: HashMap::new(),
mcp_dependency_prompted: HashSet::new(),
previous_turn_settings: None,
latest_plan_update: None,
startup_prewarm: None,
active_connector_selection: HashSet::new(),
pending_session_start_source: None,
@@ -75,6 +81,14 @@ impl SessionState {
self.previous_turn_settings = previous_turn_settings;
}
pub(crate) fn latest_plan_update(&self) -> Option<UpdatePlanArgs> {
self.latest_plan_update.clone()
}
pub(crate) fn set_latest_plan_update(&mut self, update: Option<UpdatePlanArgs>) {
self.latest_plan_update = update;
}
pub(crate) fn set_next_turn_is_first(&mut self, value: bool) {
self.next_turn_is_first = value;
}

View File

@@ -767,6 +767,13 @@ impl Session {
{
warn!("failed to apply goal runtime turn-finished event: {err}");
}
if should_clear_active_turn {
self.reset_in_progress_plan_steps_for_terminal_turn(
turn_context.as_ref(),
/*preserve_active_goals*/ true,
)
.await;
}
let event = EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn_context.sub_id.clone(),
last_agent_message,
@@ -867,6 +874,11 @@ impl Session {
}
}
self.reset_in_progress_plan_steps_for_terminal_turn(
task.turn_context.as_ref(),
reason != TurnAbortReason::Interrupted,
)
.await;
let (completed_at, duration_ms) = task
.turn_context
.turn_timing_state

View File

@@ -155,6 +155,7 @@ async fn update_plan_tool_emits_plan_update_event() -> anyhow::Result<()> {
"plan": [
{"step": "Inspect workspace", "status": "in_progress"},
{"step": "Report results", "status": "pending"},
{"step": "Ship fix", "status": "completed"},
],
})
.to_string();
@@ -199,16 +200,10 @@ async fn update_plan_tool_emits_plan_update_event() -> anyhow::Result<()> {
})
.await?;
let mut saw_plan_update = false;
let mut plan_updates = Vec::new();
wait_for_event(&codex, |event| match event {
EventMsg::PlanUpdate(update) => {
saw_plan_update = true;
assert_eq!(update.explanation.as_deref(), Some("Tool harness check"));
assert_eq!(update.plan.len(), 2);
assert_eq!(update.plan[0].step, "Inspect workspace");
assert_matches!(update.plan[0].status, StepStatus::InProgress);
assert_eq!(update.plan[1].step, "Report results");
assert_matches!(update.plan[1].status, StepStatus::Pending);
plan_updates.push(update.clone());
false
}
EventMsg::TurnComplete(_) => true,
@@ -216,7 +211,37 @@ async fn update_plan_tool_emits_plan_update_event() -> anyhow::Result<()> {
})
.await;
assert!(saw_plan_update, "expected PlanUpdate event");
assert_eq!(
plan_updates.len(),
2,
"expected original and cleanup plan updates"
);
let initial_update = &plan_updates[0];
assert_eq!(
initial_update.explanation.as_deref(),
Some("Tool harness check")
);
assert_eq!(initial_update.plan.len(), 3);
assert_eq!(initial_update.plan[0].step, "Inspect workspace");
assert_matches!(initial_update.plan[0].status, StepStatus::InProgress);
assert_eq!(initial_update.plan[1].step, "Report results");
assert_matches!(initial_update.plan[1].status, StepStatus::Pending);
assert_eq!(initial_update.plan[2].step, "Ship fix");
assert_matches!(initial_update.plan[2].status, StepStatus::Completed);
let cleanup_update = &plan_updates[1];
assert_eq!(
cleanup_update.explanation.as_deref(),
Some("Tool harness check")
);
assert_eq!(cleanup_update.plan.len(), 3);
assert_eq!(cleanup_update.plan[0].step, "Inspect workspace");
assert_matches!(cleanup_update.plan[0].status, StepStatus::Pending);
assert_eq!(cleanup_update.plan[1].step, "Report results");
assert_matches!(cleanup_update.plan[1].status, StepStatus::Pending);
assert_eq!(cleanup_update.plan[2].step, "Ship fix");
assert_matches!(cleanup_update.plan[2].status, StepStatus::Completed);
let req = second_mock.single_request();
let (output_text, _success_flag) = call_output(&req, call_id);

View File

@@ -197,9 +197,9 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option<EventPersistenceMode> {
| EventMsg::RealtimeConversationListVoicesResponse(_)
| EventMsg::McpStartupUpdate(_)
| EventMsg::McpStartupComplete(_)
| EventMsg::WebSearchBegin(_)
| EventMsg::PlanUpdate(_)
| EventMsg::ShutdownComplete
| EventMsg::WebSearchBegin(_) => None,
EventMsg::PlanUpdate(_) => Some(EventPersistenceMode::Limited),
EventMsg::ShutdownComplete
| EventMsg::DeprecationNotice(_)
| EventMsg::ItemStarted(_)
| EventMsg::HookStarted(_)

View File

@@ -18,8 +18,10 @@ use time::format_description::FormatItem;
use time::macros::format_description;
use uuid::Uuid;
use crate::EventPersistenceMode;
use crate::INTERACTIVE_SESSION_SOURCES;
use crate::find_thread_path_by_id_str;
use crate::is_persisted_rollout_item;
use crate::list::Cursor;
use crate::list::ThreadItem;
use crate::list::ThreadSortKey;
@@ -31,6 +33,9 @@ use anyhow::Result;
use codex_protocol::ThreadId;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::plan_tool::PlanItemArg;
use codex_protocol::plan_tool::StepStatus;
use codex_protocol::plan_tool::UpdatePlanArgs;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
@@ -45,6 +50,22 @@ use codex_protocol::protocol::UserMessageEvent;
const NO_SOURCE_FILTER: &[SessionSource] = &[];
const TEST_PROVIDER: &str = "test-provider";
#[test]
fn plan_updates_are_persisted_in_limited_rollouts() {
let item = RolloutItem::EventMsg(EventMsg::PlanUpdate(UpdatePlanArgs {
explanation: Some("keep plan state durable".to_string()),
plan: vec![PlanItemArg {
step: "finish persistence".to_string(),
status: StepStatus::InProgress,
}],
}));
assert!(is_persisted_rollout_item(
&item,
EventPersistenceMode::Limited
));
}
fn provider_vec(providers: &[&str]) -> Vec<String> {
providers
.iter()