mirror of
https://github.com/openai/codex.git
synced 2026-05-23 12:34:25 +00:00
Compare commits
2 Commits
windows-sa
...
btraut/fix
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9f1467ee36 | ||
|
|
a6485f1313 |
@@ -14,6 +14,8 @@ use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::TurnCompletedNotification;
|
||||
use codex_app_server_protocol::TurnPlanStepStatus;
|
||||
use codex_app_server_protocol::TurnPlanUpdatedNotification;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
@@ -24,6 +26,7 @@ use codex_protocol::config_types::CollaborationMode;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
use codex_protocol::config_types::Settings;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::responses::ev_function_call;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::BTreeMap;
|
||||
@@ -127,6 +130,90 @@ async fn plan_mode_without_proposed_plan_does_not_emit_plan_item() -> Result<()>
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn update_plan_cleanup_reaches_app_server_before_turn_completion() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let plan_args = serde_json::json!({
|
||||
"explanation": "track remaining work",
|
||||
"plan": [
|
||||
{"step": "inspect", "status": "in_progress"},
|
||||
{"step": "report", "status": "pending"},
|
||||
{"step": "finished", "status": "completed"},
|
||||
],
|
||||
})
|
||||
.to_string();
|
||||
let responses = vec![
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
ev_function_call("call-update-plan", "update_plan", &plan_args),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("msg-1", "Done"),
|
||||
responses::ev_completed("resp-2"),
|
||||
]),
|
||||
];
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let turn = start_default_turn(&mut mcp).await?;
|
||||
let mut methods = Vec::new();
|
||||
let mut updates = Vec::new();
|
||||
let completed = loop {
|
||||
let message = timeout(DEFAULT_READ_TIMEOUT, mcp.read_next_message()).await??;
|
||||
let JSONRPCMessage::Notification(notification) = message else {
|
||||
continue;
|
||||
};
|
||||
methods.push(notification.method.clone());
|
||||
match notification.method.as_str() {
|
||||
"turn/plan/updated" => {
|
||||
let params = notification.params.ok_or_else(|| {
|
||||
anyhow!("turn/plan/updated notifications must include params")
|
||||
})?;
|
||||
updates.push(serde_json::from_value::<TurnPlanUpdatedNotification>(
|
||||
params,
|
||||
)?);
|
||||
}
|
||||
"turn/completed" => {
|
||||
let params = notification
|
||||
.params
|
||||
.ok_or_else(|| anyhow!("turn/completed notifications must include params"))?;
|
||||
break serde_json::from_value::<TurnCompletedNotification>(params)?;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
};
|
||||
wait_for_responses_request_count(&server, /*expected_count*/ 2).await?;
|
||||
|
||||
assert_eq!(completed.turn.id, turn.id);
|
||||
assert_eq!(updates.len(), 2, "expected original and cleanup updates");
|
||||
assert_eq!(updates[0].plan[0].status, TurnPlanStepStatus::InProgress);
|
||||
assert_eq!(updates[1].plan[0].status, TurnPlanStepStatus::Pending);
|
||||
assert_eq!(updates[1].plan[1].status, TurnPlanStepStatus::Pending);
|
||||
assert_eq!(updates[1].plan[2].status, TurnPlanStepStatus::Completed);
|
||||
|
||||
let completed_index = methods
|
||||
.iter()
|
||||
.position(|method| method == "turn/completed")
|
||||
.expect("turn completion should be observed");
|
||||
let cleanup_index = methods
|
||||
.iter()
|
||||
.rposition(|method| method == "turn/plan/updated")
|
||||
.expect("cleanup plan update should be observed");
|
||||
assert!(
|
||||
cleanup_index < completed_index,
|
||||
"cleanup plan notification should precede turn completion"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn start_plan_mode_turn(mcp: &mut McpProcess) -> Result<codex_app_server_protocol::Turn> {
|
||||
let thread_req = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
@@ -168,6 +255,38 @@ async fn start_plan_mode_turn(mcp: &mut McpProcess) -> Result<codex_app_server_p
|
||||
Ok(to_response::<TurnStartResponse>(turn_resp)?.turn)
|
||||
}
|
||||
|
||||
async fn start_default_turn(mcp: &mut McpProcess) -> Result<codex_app_server_protocol::Turn> {
|
||||
let thread_req = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("mock-model".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let thread_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
|
||||
)
|
||||
.await??;
|
||||
let thread = to_response::<ThreadStartResponse>(thread_resp)?.thread;
|
||||
|
||||
let turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id,
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "Update the todo list".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
|
||||
)
|
||||
.await??;
|
||||
Ok(to_response::<TurnStartResponse>(turn_resp)?.turn)
|
||||
}
|
||||
|
||||
async fn collect_turn_notifications(
|
||||
mcp: &mut McpProcess,
|
||||
) -> Result<(
|
||||
|
||||
@@ -97,6 +97,8 @@ use codex_protocol::models::format_allow_prefixes;
|
||||
use codex_protocol::openai_models::ModelInfo;
|
||||
use codex_protocol::permissions::FileSystemSandboxPolicy;
|
||||
use codex_protocol::permissions::NetworkSandboxPolicy;
|
||||
use codex_protocol::plan_tool::StepStatus;
|
||||
use codex_protocol::plan_tool::UpdatePlanArgs;
|
||||
use codex_protocol::protocol::FileChange;
|
||||
use codex_protocol::protocol::HasLegacyEvent;
|
||||
use codex_protocol::protocol::InterAgentCommunication;
|
||||
@@ -107,6 +109,7 @@ use codex_protocol::protocol::ReviewRequest;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::protocol::ThreadGoalStatus;
|
||||
use codex_protocol::protocol::ThreadSource;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
@@ -1271,6 +1274,10 @@ impl Session {
|
||||
reconstructed_rollout.reference_context_item,
|
||||
)
|
||||
.await;
|
||||
{
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_latest_plan_update(reconstructed_rollout.latest_plan_update);
|
||||
}
|
||||
self.set_previous_turn_settings(previous_turn_settings.clone())
|
||||
.await;
|
||||
previous_turn_settings
|
||||
@@ -1586,6 +1593,10 @@ impl Session {
|
||||
/// Persist the event to rollout and send it to clients.
|
||||
pub(crate) async fn send_event(&self, turn_context: &TurnContext, msg: EventMsg) {
|
||||
let legacy_source = msg.clone();
|
||||
if let EventMsg::PlanUpdate(update) = &legacy_source {
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_latest_plan_update(Some(update.clone()));
|
||||
}
|
||||
self.services
|
||||
.rollout_thread_trace
|
||||
.record_codex_turn_event(&turn_context.sub_id, &legacy_source);
|
||||
@@ -1614,6 +1625,53 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn reset_in_progress_plan_steps_for_terminal_turn(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
preserve_active_goals: bool,
|
||||
) {
|
||||
if preserve_active_goals && self.has_active_thread_goal_for_plan_cleanup().await {
|
||||
return;
|
||||
}
|
||||
|
||||
let Some(mut latest_plan_update) = ({
|
||||
let state = self.state.lock().await;
|
||||
state.latest_plan_update()
|
||||
}) else {
|
||||
return;
|
||||
};
|
||||
|
||||
let mut changed = false;
|
||||
for item in &mut latest_plan_update.plan {
|
||||
if matches!(item.status, StepStatus::InProgress) {
|
||||
item.status = StepStatus::Pending;
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
|
||||
if changed {
|
||||
self.send_event(turn_context, EventMsg::PlanUpdate(latest_plan_update))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn has_active_thread_goal_for_plan_cleanup(&self) -> bool {
|
||||
if !self.enabled(Feature::Goals) {
|
||||
return false;
|
||||
}
|
||||
|
||||
match self.get_thread_goal().await {
|
||||
Ok(Some(goal)) => goal.status == ThreadGoalStatus::Active,
|
||||
Ok(None) => false,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to inspect thread goal before terminal plan cleanup; preserving in-progress plan steps: {err}"
|
||||
);
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Forwards terminal turn events from spawned MultiAgentV2 children to their direct parent.
|
||||
async fn maybe_notify_parent_of_terminal_turn(
|
||||
&self,
|
||||
|
||||
@@ -8,6 +8,7 @@ pub(super) struct RolloutReconstruction {
|
||||
pub(super) history: Vec<ResponseItem>,
|
||||
pub(super) previous_turn_settings: Option<PreviousTurnSettings>,
|
||||
pub(super) reference_context_item: Option<TurnContextItem>,
|
||||
pub(super) latest_plan_update: Option<UpdatePlanArgs>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -33,6 +34,7 @@ struct ActiveReplaySegment<'a> {
|
||||
previous_turn_settings: Option<PreviousTurnSettings>,
|
||||
reference_context_item: TurnReferenceContextItem,
|
||||
base_replacement_history: Option<&'a [ResponseItem]>,
|
||||
latest_plan_update: Option<UpdatePlanArgs>,
|
||||
}
|
||||
|
||||
fn turn_ids_are_compatible(active_turn_id: Option<&str>, item_turn_id: Option<&str>) -> bool {
|
||||
@@ -45,6 +47,7 @@ fn finalize_active_segment<'a>(
|
||||
base_replacement_history: &mut Option<&'a [ResponseItem]>,
|
||||
previous_turn_settings: &mut Option<PreviousTurnSettings>,
|
||||
reference_context_item: &mut TurnReferenceContextItem,
|
||||
latest_plan_update: &mut Option<UpdatePlanArgs>,
|
||||
pending_rollback_turns: &mut usize,
|
||||
) {
|
||||
// Thread rollback drops the newest surviving real user-message boundaries. In replay, that
|
||||
@@ -81,6 +84,14 @@ fn finalize_active_segment<'a>(
|
||||
{
|
||||
*reference_context_item = active_segment.reference_context_item;
|
||||
}
|
||||
|
||||
// Plan updates follow turn rollback semantics too. The newest surviving reverse-replay
|
||||
// segment that contains one becomes the plan snapshot to hydrate into live session state.
|
||||
if latest_plan_update.is_none()
|
||||
&& let Some(segment_latest_plan_update) = active_segment.latest_plan_update
|
||||
{
|
||||
*latest_plan_update = Some(segment_latest_plan_update);
|
||||
}
|
||||
}
|
||||
|
||||
impl Session {
|
||||
@@ -97,6 +108,7 @@ impl Session {
|
||||
let mut base_replacement_history: Option<&[ResponseItem]> = None;
|
||||
let mut previous_turn_settings = None;
|
||||
let mut reference_context_item = TurnReferenceContextItem::NeverSet;
|
||||
let mut latest_plan_update = None;
|
||||
// Rollback is "drop the newest N user turns". While scanning in reverse, that becomes
|
||||
// "skip the next N user-turn segments we finalize".
|
||||
let mut pending_rollback_turns = 0usize;
|
||||
@@ -198,10 +210,18 @@ impl Session {
|
||||
&mut base_replacement_history,
|
||||
&mut previous_turn_settings,
|
||||
&mut reference_context_item,
|
||||
&mut latest_plan_update,
|
||||
&mut pending_rollback_turns,
|
||||
);
|
||||
}
|
||||
}
|
||||
RolloutItem::EventMsg(EventMsg::PlanUpdate(update)) => {
|
||||
let active_segment =
|
||||
active_segment.get_or_insert_with(ActiveReplaySegment::default);
|
||||
if active_segment.latest_plan_update.is_none() {
|
||||
active_segment.latest_plan_update = Some(update.clone());
|
||||
}
|
||||
}
|
||||
RolloutItem::ResponseItem(response_item) => {
|
||||
let active_segment =
|
||||
active_segment.get_or_insert_with(ActiveReplaySegment::default);
|
||||
@@ -213,6 +233,7 @@ impl Session {
|
||||
if base_replacement_history.is_some()
|
||||
&& previous_turn_settings.is_some()
|
||||
&& !matches!(reference_context_item, TurnReferenceContextItem::NeverSet)
|
||||
&& latest_plan_update.is_some()
|
||||
{
|
||||
// At this point we have both eager resume metadata values and the replacement-
|
||||
// history base for the surviving tail, so older rollout items cannot affect this
|
||||
@@ -227,6 +248,7 @@ impl Session {
|
||||
&mut base_replacement_history,
|
||||
&mut previous_turn_settings,
|
||||
&mut reference_context_item,
|
||||
&mut latest_plan_update,
|
||||
&mut pending_rollback_turns,
|
||||
);
|
||||
}
|
||||
@@ -296,6 +318,7 @@ impl Session {
|
||||
history: history.raw_items().to_vec(),
|
||||
previous_turn_settings,
|
||||
reference_context_item,
|
||||
latest_plan_update,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,6 +47,9 @@ use codex_protocol::permissions::FileSystemPath;
|
||||
use codex_protocol::permissions::FileSystemSandboxEntry;
|
||||
use codex_protocol::permissions::FileSystemSandboxPolicy;
|
||||
use codex_protocol::permissions::FileSystemSpecialPath;
|
||||
use codex_protocol::plan_tool::PlanItemArg;
|
||||
use codex_protocol::plan_tool::StepStatus;
|
||||
use codex_protocol::plan_tool::UpdatePlanArgs;
|
||||
use codex_protocol::protocol::NonSteerableTurnKind;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::TurnEnvironmentSelection;
|
||||
@@ -2619,6 +2622,93 @@ async fn thread_rollback_recomputes_previous_turn_settings_and_reference_context
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_rollback_refreshes_latest_plan_update_from_surviving_history() {
|
||||
let (mut sess, _tc, rx) = make_session_and_context_with_rx().await;
|
||||
attach_thread_persistence(
|
||||
Arc::get_mut(&mut sess).expect("session should not have additional references"),
|
||||
)
|
||||
.await;
|
||||
|
||||
let surviving_plan = UpdatePlanArgs {
|
||||
explanation: Some("surviving plan".to_string()),
|
||||
plan: vec![PlanItemArg {
|
||||
step: "keep this".to_string(),
|
||||
status: StepStatus::Pending,
|
||||
}],
|
||||
};
|
||||
let rolled_back_plan = UpdatePlanArgs {
|
||||
explanation: Some("rolled back plan".to_string()),
|
||||
plan: vec![PlanItemArg {
|
||||
step: "drop this".to_string(),
|
||||
status: StepStatus::InProgress,
|
||||
}],
|
||||
};
|
||||
|
||||
sess.persist_rollout_items(&[
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
started_at: None,
|
||||
model_context_window: Some(128_000),
|
||||
collaboration_mode_kind: ModeKind::Default,
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "turn 1 user".to_string(),
|
||||
images: None,
|
||||
local_images: Vec::new(),
|
||||
text_elements: Vec::new(),
|
||||
})),
|
||||
RolloutItem::ResponseItem(user_message("turn 1 user")),
|
||||
RolloutItem::EventMsg(EventMsg::PlanUpdate(surviving_plan.clone())),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: None,
|
||||
completed_at: None,
|
||||
duration_ms: None,
|
||||
time_to_first_token_ms: None,
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: "turn-2".to_string(),
|
||||
started_at: None,
|
||||
model_context_window: Some(128_000),
|
||||
collaboration_mode_kind: ModeKind::Default,
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "turn 2 user".to_string(),
|
||||
images: None,
|
||||
local_images: Vec::new(),
|
||||
text_elements: Vec::new(),
|
||||
})),
|
||||
RolloutItem::ResponseItem(user_message("turn 2 user")),
|
||||
RolloutItem::EventMsg(EventMsg::PlanUpdate(rolled_back_plan.clone())),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-2".to_string(),
|
||||
last_agent_message: None,
|
||||
completed_at: None,
|
||||
duration_ms: None,
|
||||
time_to_first_token_ms: None,
|
||||
})),
|
||||
])
|
||||
.await;
|
||||
{
|
||||
let mut state = sess.state.lock().await;
|
||||
state.set_latest_plan_update(Some(rolled_back_plan));
|
||||
}
|
||||
|
||||
handlers::thread_rollback(&sess, "sub-1".to_string(), /*num_turns*/ 1).await;
|
||||
let rollback_event = wait_for_thread_rolled_back(&rx).await;
|
||||
assert_eq!(rollback_event.num_turns, 1);
|
||||
|
||||
let latest_plan_update = {
|
||||
let state = sess.state.lock().await;
|
||||
state.latest_plan_update()
|
||||
};
|
||||
assert_eq!(
|
||||
serde_json::to_value(latest_plan_update).expect("serialize refreshed plan update"),
|
||||
serde_json::to_value(Some(surviving_plan)).expect("serialize surviving plan update")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_rollback_restores_cleared_reference_context_item_after_compaction() {
|
||||
let (mut sess, tc, rx) = make_session_and_context_with_rx().await;
|
||||
@@ -8156,6 +8246,167 @@ async fn interrupt_accounts_active_goal_before_pausing() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn interrupted_active_goal_resets_plan_progress_before_turn_abort() -> anyhow::Result<()> {
|
||||
let (sess, tc, rx, _codex_home) = make_goal_session_and_context_with_rx().await;
|
||||
sess.set_thread_goal(
|
||||
tc.as_ref(),
|
||||
SetGoalRequest {
|
||||
objective: Some("Pause this goal on interrupt".to_string()),
|
||||
status: Some(ThreadGoalStatus::Active),
|
||||
token_budget: None,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
while rx.try_recv().is_ok() {}
|
||||
|
||||
sess.send_event(
|
||||
tc.as_ref(),
|
||||
EventMsg::PlanUpdate(UpdatePlanArgs {
|
||||
explanation: Some("interrupt should reset this plan".to_string()),
|
||||
plan: vec![PlanItemArg {
|
||||
step: "continue work".to_string(),
|
||||
status: StepStatus::InProgress,
|
||||
}],
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
while let Ok(event) = rx.recv().await {
|
||||
if matches!(event.msg, EventMsg::PlanUpdate(_)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
sess.spawn_task(
|
||||
Arc::clone(&tc),
|
||||
Vec::new(),
|
||||
NeverEndingTask {
|
||||
kind: TaskKind::Regular,
|
||||
listen_to_cancellation_token: false,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
|
||||
|
||||
let mut saw_cleanup_before_abort = false;
|
||||
while let Ok(event) = rx.recv().await {
|
||||
match event.msg {
|
||||
EventMsg::PlanUpdate(UpdatePlanArgs { plan, .. }) => {
|
||||
saw_cleanup_before_abort |= plan
|
||||
.iter()
|
||||
.any(|item| matches!(item.status, StepStatus::Pending));
|
||||
}
|
||||
EventMsg::TurnAborted(_) => break,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
assert!(
|
||||
saw_cleanup_before_abort,
|
||||
"interrupt cleanup should reset in-progress plan items before TurnAborted"
|
||||
);
|
||||
|
||||
let goal = sess
|
||||
.get_thread_goal()
|
||||
.await?
|
||||
.expect("goal should remain persisted after interrupt");
|
||||
assert_eq!(ThreadGoalStatus::Paused, goal.status);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn terminal_plan_cleanup_skips_active_goal() -> anyhow::Result<()> {
|
||||
let (sess, tc, rx, _codex_home) = make_goal_session_and_context_with_rx().await;
|
||||
sess.set_thread_goal(
|
||||
tc.as_ref(),
|
||||
SetGoalRequest {
|
||||
objective: Some("Keep the current work moving".to_string()),
|
||||
status: Some(ThreadGoalStatus::Active),
|
||||
token_budget: None,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
while rx.try_recv().is_ok() {}
|
||||
|
||||
sess.send_event(
|
||||
tc.as_ref(),
|
||||
EventMsg::PlanUpdate(UpdatePlanArgs {
|
||||
explanation: Some("active goal should preserve progress".to_string()),
|
||||
plan: vec![PlanItemArg {
|
||||
step: "continue work".to_string(),
|
||||
status: StepStatus::InProgress,
|
||||
}],
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
while let Ok(event) = rx.recv().await {
|
||||
if matches!(event.msg, EventMsg::PlanUpdate(_)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
sess.reset_in_progress_plan_steps_for_terminal_turn(
|
||||
tc.as_ref(),
|
||||
/*preserve_active_goals*/ true,
|
||||
)
|
||||
.await;
|
||||
|
||||
let emitted_cleanup = std::iter::from_fn(|| rx.try_recv().ok())
|
||||
.any(|event| matches!(event.msg, EventMsg::PlanUpdate(UpdatePlanArgs { .. })));
|
||||
assert!(
|
||||
!emitted_cleanup,
|
||||
"active goals should suppress terminal plan cleanup updates"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn task_finish_without_terminal_turn_does_not_reset_plan_progress() {
|
||||
let (sess, tc, rx) = make_session_and_context_with_rx().await;
|
||||
|
||||
sess.send_event(
|
||||
tc.as_ref(),
|
||||
EventMsg::PlanUpdate(UpdatePlanArgs {
|
||||
explanation: Some("keep in progress until the turn really ends".to_string()),
|
||||
plan: vec![PlanItemArg {
|
||||
step: "continue work".to_string(),
|
||||
status: StepStatus::InProgress,
|
||||
}],
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
while let Ok(event) = rx.recv().await {
|
||||
if matches!(event.msg, EventMsg::PlanUpdate(_)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
sess.on_task_finished(Arc::clone(&tc), /*last_agent_message*/ None)
|
||||
.await;
|
||||
|
||||
let mut saw_cleanup_update = false;
|
||||
while let Ok(event) = rx.recv().await {
|
||||
match event.msg {
|
||||
EventMsg::PlanUpdate(UpdatePlanArgs { plan, .. }) => {
|
||||
saw_cleanup_update |= plan
|
||||
.iter()
|
||||
.any(|item| matches!(item.status, StepStatus::Pending));
|
||||
}
|
||||
EventMsg::TurnComplete(_) => break,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
assert!(
|
||||
!saw_cleanup_update,
|
||||
"non-terminal task completion should not reset in-progress plan items"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn active_goal_continuation_runs_again_after_no_tool_turn() -> anyhow::Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
use codex_protocol::models::AdditionalPermissionProfile;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::plan_tool::UpdatePlanArgs;
|
||||
use codex_sandboxing::policy_transforms::merge_permission_profiles;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
@@ -28,6 +29,10 @@ pub(crate) struct SessionState {
|
||||
/// model/realtime handling on subsequent regular turns (including full-context
|
||||
/// reinjection after resume or `/compact`).
|
||||
previous_turn_settings: Option<PreviousTurnSettings>,
|
||||
/// Latest todo/checklist plan update observed for the thread. The durable
|
||||
/// source of truth remains the persisted event stream; this cache lets core
|
||||
/// emit terminal-state corrections without rereading rollout storage.
|
||||
latest_plan_update: Option<UpdatePlanArgs>,
|
||||
/// Startup prewarmed session prepared during session initialization.
|
||||
pub(crate) startup_prewarm: Option<SessionStartupPrewarmHandle>,
|
||||
pub(crate) active_connector_selection: HashSet<String>,
|
||||
@@ -48,6 +53,7 @@ impl SessionState {
|
||||
dependency_env: HashMap::new(),
|
||||
mcp_dependency_prompted: HashSet::new(),
|
||||
previous_turn_settings: None,
|
||||
latest_plan_update: None,
|
||||
startup_prewarm: None,
|
||||
active_connector_selection: HashSet::new(),
|
||||
pending_session_start_source: None,
|
||||
@@ -75,6 +81,14 @@ impl SessionState {
|
||||
self.previous_turn_settings = previous_turn_settings;
|
||||
}
|
||||
|
||||
pub(crate) fn latest_plan_update(&self) -> Option<UpdatePlanArgs> {
|
||||
self.latest_plan_update.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn set_latest_plan_update(&mut self, update: Option<UpdatePlanArgs>) {
|
||||
self.latest_plan_update = update;
|
||||
}
|
||||
|
||||
pub(crate) fn set_next_turn_is_first(&mut self, value: bool) {
|
||||
self.next_turn_is_first = value;
|
||||
}
|
||||
|
||||
@@ -767,6 +767,13 @@ impl Session {
|
||||
{
|
||||
warn!("failed to apply goal runtime turn-finished event: {err}");
|
||||
}
|
||||
if should_clear_active_turn {
|
||||
self.reset_in_progress_plan_steps_for_terminal_turn(
|
||||
turn_context.as_ref(),
|
||||
/*preserve_active_goals*/ true,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
let event = EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: turn_context.sub_id.clone(),
|
||||
last_agent_message,
|
||||
@@ -867,6 +874,11 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
self.reset_in_progress_plan_steps_for_terminal_turn(
|
||||
task.turn_context.as_ref(),
|
||||
reason != TurnAbortReason::Interrupted,
|
||||
)
|
||||
.await;
|
||||
let (completed_at, duration_ms) = task
|
||||
.turn_context
|
||||
.turn_timing_state
|
||||
|
||||
@@ -155,6 +155,7 @@ async fn update_plan_tool_emits_plan_update_event() -> anyhow::Result<()> {
|
||||
"plan": [
|
||||
{"step": "Inspect workspace", "status": "in_progress"},
|
||||
{"step": "Report results", "status": "pending"},
|
||||
{"step": "Ship fix", "status": "completed"},
|
||||
],
|
||||
})
|
||||
.to_string();
|
||||
@@ -199,16 +200,10 @@ async fn update_plan_tool_emits_plan_update_event() -> anyhow::Result<()> {
|
||||
})
|
||||
.await?;
|
||||
|
||||
let mut saw_plan_update = false;
|
||||
let mut plan_updates = Vec::new();
|
||||
wait_for_event(&codex, |event| match event {
|
||||
EventMsg::PlanUpdate(update) => {
|
||||
saw_plan_update = true;
|
||||
assert_eq!(update.explanation.as_deref(), Some("Tool harness check"));
|
||||
assert_eq!(update.plan.len(), 2);
|
||||
assert_eq!(update.plan[0].step, "Inspect workspace");
|
||||
assert_matches!(update.plan[0].status, StepStatus::InProgress);
|
||||
assert_eq!(update.plan[1].step, "Report results");
|
||||
assert_matches!(update.plan[1].status, StepStatus::Pending);
|
||||
plan_updates.push(update.clone());
|
||||
false
|
||||
}
|
||||
EventMsg::TurnComplete(_) => true,
|
||||
@@ -216,7 +211,37 @@ async fn update_plan_tool_emits_plan_update_event() -> anyhow::Result<()> {
|
||||
})
|
||||
.await;
|
||||
|
||||
assert!(saw_plan_update, "expected PlanUpdate event");
|
||||
assert_eq!(
|
||||
plan_updates.len(),
|
||||
2,
|
||||
"expected original and cleanup plan updates"
|
||||
);
|
||||
|
||||
let initial_update = &plan_updates[0];
|
||||
assert_eq!(
|
||||
initial_update.explanation.as_deref(),
|
||||
Some("Tool harness check")
|
||||
);
|
||||
assert_eq!(initial_update.plan.len(), 3);
|
||||
assert_eq!(initial_update.plan[0].step, "Inspect workspace");
|
||||
assert_matches!(initial_update.plan[0].status, StepStatus::InProgress);
|
||||
assert_eq!(initial_update.plan[1].step, "Report results");
|
||||
assert_matches!(initial_update.plan[1].status, StepStatus::Pending);
|
||||
assert_eq!(initial_update.plan[2].step, "Ship fix");
|
||||
assert_matches!(initial_update.plan[2].status, StepStatus::Completed);
|
||||
|
||||
let cleanup_update = &plan_updates[1];
|
||||
assert_eq!(
|
||||
cleanup_update.explanation.as_deref(),
|
||||
Some("Tool harness check")
|
||||
);
|
||||
assert_eq!(cleanup_update.plan.len(), 3);
|
||||
assert_eq!(cleanup_update.plan[0].step, "Inspect workspace");
|
||||
assert_matches!(cleanup_update.plan[0].status, StepStatus::Pending);
|
||||
assert_eq!(cleanup_update.plan[1].step, "Report results");
|
||||
assert_matches!(cleanup_update.plan[1].status, StepStatus::Pending);
|
||||
assert_eq!(cleanup_update.plan[2].step, "Ship fix");
|
||||
assert_matches!(cleanup_update.plan[2].status, StepStatus::Completed);
|
||||
|
||||
let req = second_mock.single_request();
|
||||
let (output_text, _success_flag) = call_output(&req, call_id);
|
||||
|
||||
@@ -197,9 +197,9 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option<EventPersistenceMode> {
|
||||
| EventMsg::RealtimeConversationListVoicesResponse(_)
|
||||
| EventMsg::McpStartupUpdate(_)
|
||||
| EventMsg::McpStartupComplete(_)
|
||||
| EventMsg::WebSearchBegin(_)
|
||||
| EventMsg::PlanUpdate(_)
|
||||
| EventMsg::ShutdownComplete
|
||||
| EventMsg::WebSearchBegin(_) => None,
|
||||
EventMsg::PlanUpdate(_) => Some(EventPersistenceMode::Limited),
|
||||
EventMsg::ShutdownComplete
|
||||
| EventMsg::DeprecationNotice(_)
|
||||
| EventMsg::ItemStarted(_)
|
||||
| EventMsg::HookStarted(_)
|
||||
|
||||
@@ -18,8 +18,10 @@ use time::format_description::FormatItem;
|
||||
use time::macros::format_description;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::EventPersistenceMode;
|
||||
use crate::INTERACTIVE_SESSION_SOURCES;
|
||||
use crate::find_thread_path_by_id_str;
|
||||
use crate::is_persisted_rollout_item;
|
||||
use crate::list::Cursor;
|
||||
use crate::list::ThreadItem;
|
||||
use crate::list::ThreadSortKey;
|
||||
@@ -31,6 +33,9 @@ use anyhow::Result;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::plan_tool::PlanItemArg;
|
||||
use codex_protocol::plan_tool::StepStatus;
|
||||
use codex_protocol::plan_tool::UpdatePlanArgs;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
@@ -45,6 +50,22 @@ use codex_protocol::protocol::UserMessageEvent;
|
||||
const NO_SOURCE_FILTER: &[SessionSource] = &[];
|
||||
const TEST_PROVIDER: &str = "test-provider";
|
||||
|
||||
#[test]
|
||||
fn plan_updates_are_persisted_in_limited_rollouts() {
|
||||
let item = RolloutItem::EventMsg(EventMsg::PlanUpdate(UpdatePlanArgs {
|
||||
explanation: Some("keep plan state durable".to_string()),
|
||||
plan: vec![PlanItemArg {
|
||||
step: "finish persistence".to_string(),
|
||||
status: StepStatus::InProgress,
|
||||
}],
|
||||
}));
|
||||
|
||||
assert!(is_persisted_rollout_item(
|
||||
&item,
|
||||
EventPersistenceMode::Limited
|
||||
));
|
||||
}
|
||||
|
||||
fn provider_vec(providers: &[&str]) -> Vec<String> {
|
||||
providers
|
||||
.iter()
|
||||
|
||||
Reference in New Issue
Block a user