Compare commits

...

18 Commits

Author SHA1 Message Date
Brent Traut
4cca083ddc app-server: drop stale cleanup on rollback failures 2026-05-19 10:48:57 -07:00
Brent Traut
74926f7e86 app-server: simplify terminal cleanup decisions 2026-05-19 10:48:57 -07:00
Brent Traut
39d320d3fd app-server: tighten deferred plan cleanup edge cases 2026-05-19 10:48:56 -07:00
Brent Traut
be5acf7c57 app-server: simplify deferred cleanup plumbing 2026-05-19 10:48:56 -07:00
Brent Traut
7eeeb53ae5 app-server: preserve deferred cleanup across rollback 2026-05-19 10:48:56 -07:00
Brent Traut
5e007ccff4 app-server: inline pending cleanup check 2026-05-19 10:48:56 -07:00
Brent Traut
678543f365 app-server: bound deferred cleanup state 2026-05-19 10:48:56 -07:00
Brent Traut
370c054f5b app-server: simplify interrupted cleanup flushing 2026-05-19 10:48:56 -07:00
Brent Traut
c8f3195a54 app-server: tighten deferred plan cleanup boundaries 2026-05-19 10:48:56 -07:00
Brent Traut
26931a3927 app-server: centralize deferred plan cleanup flushing 2026-05-19 10:48:56 -07:00
Brent Traut
e26a615fc3 app-server: scope deferred plan cleanup notifications 2026-05-19 10:48:56 -07:00
Brent Traut
97bcd94066 app-server: simplify deferred cleanup emission 2026-05-19 10:48:56 -07:00
Brent Traut
8ed9601d2d app-server: flush deferred cleanup on goal lifecycle changes 2026-05-19 10:48:56 -07:00
Brent Traut
7cd4941b75 app-server: trim deferred cleanup state 2026-05-19 10:48:55 -07:00
Brent Traut
9ed73b3b8a app-server: retain deferred plan cleanup across goal turns 2026-05-19 10:48:55 -07:00
Brent Traut
7f1ece0c79 app-server: skip unnecessary terminal goal lookups 2026-05-19 10:48:55 -07:00
Brent Traut
5151acb6e5 app-server: clean plan progress after interrupted goal turns 2026-05-19 10:48:55 -07:00
Brent Traut
2cd3522272 app-server: reset terminal plan progress without goals 2026-05-19 10:48:55 -07:00
4 changed files with 1401 additions and 29 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -403,6 +403,7 @@ impl ThreadGoalRequestProcessor {
goal: ThreadGoal,
listener_command_tx: Option<tokio::sync::mpsc::UnboundedSender<ThreadListenerCommand>>,
) {
let goal_is_active = goal.status == ThreadGoalStatus::Active;
if let Some(listener_command_tx) = listener_command_tx {
let command = crate::thread_state::ThreadListenerCommand::EmitThreadGoalUpdated {
goal: goal.clone(),
@@ -423,6 +424,9 @@ impl ThreadGoalRequestProcessor {
},
))
.await;
if !goal_is_active {
self.emit_pending_terminal_plan_cleanup(thread_id).await;
}
}
async fn emit_thread_goal_cleared_ordered(
@@ -446,6 +450,26 @@ impl ThreadGoalRequestProcessor {
},
))
.await;
self.emit_pending_terminal_plan_cleanup(thread_id).await;
}
async fn emit_pending_terminal_plan_cleanup(&self, thread_id: ThreadId) {
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
let subscribed_connection_ids = self
.thread_state_manager
.subscribed_connection_ids(thread_id)
.await;
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
self.outgoing.clone(),
subscribed_connection_ids,
thread_id,
);
crate::bespoke_event_handling::flush_pending_terminal_plan_cleanup(
thread_id,
&thread_state,
&thread_outgoing,
)
.await;
}
}
@@ -497,3 +521,92 @@ fn parse_thread_id_for_request(thread_id: &str) -> Result<ThreadId, JSONRPCError
ThreadId::from_string(thread_id)
.map_err(|err| invalid_request(format!("invalid thread id: {err}")))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessage;
use crate::thread_state::ConnectionCapabilities;
use crate::thread_state::PendingTerminalPlanCleanup;
use codex_protocol::plan_tool::PlanItemArg;
use codex_protocol::plan_tool::StepStatus;
use codex_protocol::plan_tool::UpdatePlanArgs;
use core_test_support::load_default_config_for_test;
use tempfile::TempDir;
use tokio::sync::mpsc;
async fn recv_message(rx: &mut mpsc::Receiver<OutgoingEnvelope>) -> OutgoingMessage {
match rx.recv().await.expect("expected outgoing message") {
OutgoingEnvelope::Broadcast { message }
| OutgoingEnvelope::ToConnection { message, .. } => message,
}
}
#[tokio::test]
async fn goal_clear_fallback_flushes_pending_terminal_plan_cleanup() -> anyhow::Result<()> {
let codex_home = TempDir::new()?;
let config = load_default_config_for_test(&codex_home).await;
let thread_manager = Arc::new(
codex_core::test_support::thread_manager_with_models_provider_and_home(
CodexAuth::create_dummy_chatgpt_auth_for_testing(),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
),
);
let thread_id = ThreadId::new();
let thread_state_manager = ThreadStateManager::new();
let connection_id = ConnectionId(1);
thread_state_manager
.connection_initialized(connection_id, ConnectionCapabilities::default())
.await;
thread_state_manager
.try_ensure_connection_subscribed(
thread_id,
connection_id,
/*experimental_raw_events*/ false,
)
.await
.expect("connection should subscribe");
let thread_state = thread_state_manager.thread_state(thread_id).await;
thread_state.lock().await.pending_terminal_plan_cleanups =
vec![PendingTerminalPlanCleanup {
turn_id: "terminal-turn".to_string(),
plan_update: UpdatePlanArgs {
explanation: Some("still working".to_string()),
plan: vec![PlanItemArg {
step: "first".to_string(),
status: StepStatus::InProgress,
}],
},
}];
let (tx, mut rx) = mpsc::channel(4);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let processor = ThreadGoalRequestProcessor::new(
thread_manager,
outgoing,
Arc::new(config),
thread_state_manager,
/*state_db*/ None,
);
processor
.emit_thread_goal_cleared_ordered(thread_id, /*listener_command_tx*/ None)
.await;
assert!(matches!(
recv_message(&mut rx).await,
OutgoingMessage::AppServerNotification(ServerNotification::ThreadGoalCleared(_))
));
assert!(matches!(
recv_message(&mut rx).await,
OutgoingMessage::AppServerNotification(ServerNotification::TurnPlanUpdated(_))
));
Ok(())
}
}

View File

@@ -465,6 +465,7 @@ pub(super) async fn handle_thread_listener_command(
.await;
}
ThreadListenerCommand::EmitThreadGoalUpdated { goal } => {
let goal_is_active = goal.status == ThreadGoalStatus::Active;
outgoing
.send_server_notification(ServerNotification::ThreadGoalUpdated(
ThreadGoalUpdatedNotification {
@@ -474,6 +475,15 @@ pub(super) async fn handle_thread_listener_command(
},
))
.await;
if !goal_is_active {
flush_pending_terminal_plan_cleanup_for_subscribers(
conversation_id,
thread_state_manager,
thread_state,
outgoing,
)
.await;
}
}
ThreadListenerCommand::EmitThreadGoalCleared => {
outgoing
@@ -483,6 +493,13 @@ pub(super) async fn handle_thread_listener_command(
},
))
.await;
flush_pending_terminal_plan_cleanup_for_subscribers(
conversation_id,
thread_state_manager,
thread_state,
outgoing,
)
.await;
}
ThreadListenerCommand::EmitThreadGoalSnapshot { state_db } => {
send_thread_goal_snapshot_notification(outgoing, conversation_id, &state_db).await;
@@ -503,6 +520,28 @@ pub(super) async fn handle_thread_listener_command(
}
}
async fn flush_pending_terminal_plan_cleanup_for_subscribers(
conversation_id: ThreadId,
thread_state_manager: &ThreadStateManager,
thread_state: &Arc<Mutex<ThreadState>>,
outgoing: &Arc<OutgoingMessageSender>,
) {
let subscribed_connection_ids = thread_state_manager
.subscribed_connection_ids(conversation_id)
.await;
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing.clone(),
subscribed_connection_ids,
conversation_id,
);
crate::bespoke_event_handling::flush_pending_terminal_plan_cleanup(
conversation_id,
thread_state,
&thread_outgoing,
)
.await;
}
#[allow(clippy::too_many_arguments)]
#[expect(
clippy::await_holding_invalid_type,
@@ -763,3 +802,98 @@ pub(super) fn set_thread_status_and_interrupt_stale_turns(
}
thread.status = status;
}
#[cfg(test)]
mod tests {
use super::*;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessage;
use crate::thread_state::ConnectionCapabilities;
use crate::thread_state::PendingTerminalPlanCleanup;
use codex_protocol::plan_tool::PlanItemArg;
use codex_protocol::plan_tool::StepStatus;
use codex_protocol::plan_tool::UpdatePlanArgs;
use core_test_support::load_default_config_for_test;
use tempfile::TempDir;
use tokio::sync::mpsc;
async fn recv_message(rx: &mut mpsc::Receiver<OutgoingEnvelope>) -> OutgoingMessage {
match rx.recv().await.expect("expected outgoing message") {
OutgoingEnvelope::Broadcast { message }
| OutgoingEnvelope::ToConnection { message, .. } => message,
}
}
#[tokio::test]
async fn listener_goal_clear_flushes_pending_terminal_plan_cleanup() -> anyhow::Result<()> {
let codex_home = TempDir::new()?;
let config = load_default_config_for_test(&codex_home).await;
let thread_manager = Arc::new(
codex_core::test_support::thread_manager_with_models_provider_and_home(
CodexAuth::create_dummy_chatgpt_auth_for_testing(),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
),
);
let codex_core::NewThread {
thread_id,
thread: conversation,
..
} = thread_manager.start_thread(config).await?;
let thread_state_manager = ThreadStateManager::new();
let connection_id = ConnectionId(1);
thread_state_manager
.connection_initialized(connection_id, ConnectionCapabilities::default())
.await;
thread_state_manager
.try_ensure_connection_subscribed(
thread_id,
connection_id,
/*experimental_raw_events*/ false,
)
.await
.expect("connection should subscribe");
let thread_state = thread_state_manager.thread_state(thread_id).await;
thread_state.lock().await.pending_terminal_plan_cleanups =
vec![PendingTerminalPlanCleanup {
turn_id: "terminal-turn".to_string(),
plan_update: UpdatePlanArgs {
explanation: Some("still working".to_string()),
plan: vec![PlanItemArg {
step: "first".to_string(),
status: StepStatus::InProgress,
}],
},
}];
let (tx, mut rx) = mpsc::channel(4);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
handle_thread_listener_command(
thread_id,
&conversation,
codex_home.path(),
&thread_state_manager,
&thread_state,
&ThreadWatchManager::new(),
&outgoing,
&Arc::new(Mutex::new(HashSet::new())),
ThreadListenerCommand::EmitThreadGoalCleared,
)
.await;
assert!(matches!(
recv_message(&mut rx).await,
OutgoingMessage::AppServerNotification(ServerNotification::ThreadGoalCleared(_))
));
assert!(matches!(
recv_message(&mut rx).await,
OutgoingMessage::AppServerNotification(ServerNotification::TurnPlanUpdated(_))
));
Ok(())
}
}

View File

@@ -9,6 +9,7 @@ use codex_core::CodexThread;
use codex_core::ThreadConfigSnapshot;
use codex_file_watcher::WatchRegistration;
use codex_protocol::ThreadId;
use codex_protocol::plan_tool::UpdatePlanArgs;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
use codex_rollout::state_db::StateDbHandle;
@@ -67,11 +68,17 @@ pub(crate) struct TurnSummary {
pub(crate) last_error: Option<TurnError>,
}
pub(crate) struct PendingTerminalPlanCleanup {
pub(crate) turn_id: String,
pub(crate) plan_update: UpdatePlanArgs,
}
#[derive(Default)]
pub(crate) struct ThreadState {
pub(crate) pending_interrupts: PendingInterruptQueue,
pub(crate) pending_rollbacks: Option<ConnectionRequestId>,
pub(crate) turn_summary: TurnSummary,
pub(crate) pending_terminal_plan_cleanups: Vec<PendingTerminalPlanCleanup>,
pub(crate) last_terminal_turn_id: Option<String>,
pub(crate) cancel_tx: Option<oneshot::Sender<()>>,
pub(crate) experimental_raw_events: bool,