diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 1a4aa35e6f..e01da8989d 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -1548,7 +1548,7 @@ fn thread_rollback_response_from_stored_thread( "thread {thread_id} did not include persisted history after rollback" )); }; - populate_thread_turns_from_history(&mut thread, &history.items, /*active_turn*/ None); + populate_thread_turns_from_history(&mut thread, &history.items, &[]); thread.status = loaded_status; Ok(ThreadRollbackResponse { thread }) } diff --git a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs index 985baac91a..54aa0e397f 100644 --- a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs +++ b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs @@ -526,23 +526,24 @@ pub(super) async fn handle_pending_thread_resume_request( pending_thread_unloads: &Arc>>, pending: crate::thread_state::PendingThreadResumeRequest, ) { - let active_turn = { + let live_turns = { let state = thread_state.lock().await; - state.active_turn_snapshot() + state.live_turn_snapshots() }; + let latest_live_turn = live_turns.last(); tracing::debug!( thread_id = %conversation_id, request_id = ?pending.request_id, - active_turn_present = active_turn.is_some(), - active_turn_id = ?active_turn.as_ref().map(|turn| turn.id.as_str()), - active_turn_status = ?active_turn.as_ref().map(|turn| &turn.status), + live_turn_present = latest_live_turn.is_some(), + latest_live_turn_id = ?latest_live_turn.as_ref().map(|turn| turn.id.as_str()), + latest_live_turn_status = ?latest_live_turn.as_ref().map(|turn| &turn.status), "composing running thread resume response" ); let has_live_in_progress_turn = matches!(conversation.agent_status().await, AgentStatus::Running) - || active_turn - .as_ref() - .is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress)); + || live_turns + .iter() + .any(|turn| matches!(turn.status, TurnStatus::InProgress)); let request_id = pending.request_id; let connection_id = request_id.connection_id; @@ -551,7 +552,7 @@ pub(super) async fn handle_pending_thread_resume_request( populate_thread_turns_from_history( &mut thread, &pending.history_items, - active_turn.as_ref(), + live_turns.as_slice(), ); } @@ -716,11 +717,11 @@ pub(super) async fn send_thread_goal_snapshot_notification( pub(crate) fn populate_thread_turns_from_history( thread: &mut Thread, items: &[RolloutItem], - active_turn: Option<&Turn>, + live_turns: &[Turn], ) { let mut turns = build_api_turns_from_rollout_items(items); - if let Some(active_turn) = active_turn { - merge_turn_history_with_active_turn(&mut turns, active_turn.clone()); + for turn in live_turns { + merge_turn_history_with_live_turn(&mut turns, turn.clone()); } thread.turns = turns; } @@ -750,9 +751,9 @@ pub(super) async fn resolve_pending_server_request( .await; } -pub(super) fn merge_turn_history_with_active_turn(turns: &mut Vec, active_turn: Turn) { - turns.retain(|turn| turn.id != active_turn.id); - turns.push(active_turn); +pub(super) fn merge_turn_history_with_live_turn(turns: &mut Vec, live_turn: Turn) { + turns.retain(|turn| turn.id != live_turn.id); + turns.push(live_turn); } pub(super) fn set_thread_status_and_interrupt_stale_turns( diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index e9d80089e9..7dd3702dcd 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -2086,14 +2086,12 @@ impl ThreadRequestProcessor { .load_history(/*include_archived*/ true) .await .map_err(|err| thread_read_history_load_error(thread_id, err))?; - let live_turn = { + let live_turns = { let thread_state = self.thread_state_manager.thread_state(thread_id).await; let state = thread_state.lock().await; - state - .active_turn_snapshot() - .or_else(|| state.last_terminal_turn_snapshot()) + state.live_turn_snapshots() }; - populate_thread_turns_from_history(thread, &history.items, live_turn.as_ref()); + populate_thread_turns_from_history(thread, &history.items, live_turns.as_slice()); } Ok(()) @@ -2129,19 +2127,17 @@ impl ThreadRequestProcessor { Some(thread) => matches!(thread.agent_status().await, AgentStatus::Running), None => false, }; - let active_turn = if loaded_thread.is_some() { + let live_turns = if loaded_thread.is_some() { // Persisted history may not yet include the currently running turn. The // app-server listener has already projected live turn events into - // ThreadState, so merge that in-memory snapshot before paginating. A - // failed turn may also complete before its error event is persisted, so - // bridge the latest terminal failed snapshot too. + // ThreadState, so merge those in-memory snapshots before paginating. + // A failed turn may also complete before its error event is persisted, + // so bridge terminal failed snapshots too. let thread_state = self.thread_state_manager.thread_state(thread_uuid).await; let state = thread_state.lock().await; - state - .active_turn_snapshot() - .or_else(|| state.last_terminal_turn_snapshot()) + state.live_turn_snapshots() } else { - None + Vec::new() }; let mut turns = reconstruct_thread_turns_for_turns_list( &items, @@ -2149,7 +2145,7 @@ impl ThreadRequestProcessor { .loaded_status_for_thread(&thread_uuid.to_string()) .await, has_live_running_thread, - active_turn, + live_turns.as_slice(), ); for turn in &mut turns { match items_view { @@ -2866,11 +2862,7 @@ impl ThreadRequestProcessor { let (mut thread, history) = thread_from_stored_thread(stored_thread, fallback_provider, &self.config.cwd); if include_turns && let Some(history) = history { - populate_thread_turns_from_history( - &mut thread, - &history.items, - /*active_turn*/ None, - ); + populate_thread_turns_from_history(&mut thread, &history.items, &[]); } thread } @@ -2979,11 +2971,7 @@ impl ThreadRequestProcessor { thread.path = Some(rollout_path.to_path_buf()); if include_turns { let history_items = thread_history.get_rollout_items(); - populate_thread_turns_from_history( - &mut thread, - &history_items, - /*active_turn*/ None, - ); + populate_thread_turns_from_history(&mut thread, &history_items, &[]); } self.attach_thread_name(thread_id, &mut thread).await; Ok(thread) @@ -3177,11 +3165,7 @@ impl ThreadRequestProcessor { thread.preview = preview_from_rollout_items(&history_items); thread.forked_from_id = Some(source_thread_id.to_string()); if include_turns { - populate_thread_turns_from_history( - &mut thread, - &history_items, - /*active_turn*/ None, - ); + populate_thread_turns_from_history(&mut thread, &history_items, &[]); } thread }; @@ -3545,16 +3529,16 @@ fn reconstruct_thread_turns_for_turns_list( items: &[RolloutItem], loaded_status: ThreadStatus, has_live_running_thread: bool, - active_turn: Option, + live_turns: &[Turn], ) -> Vec { let has_live_in_progress_turn = has_live_running_thread - || active_turn - .as_ref() - .is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress)); + || live_turns + .iter() + .any(|turn| matches!(turn.status, TurnStatus::InProgress)); let mut turns = build_api_turns_from_rollout_items(items); normalize_thread_turns_status(&mut turns, loaded_status, has_live_in_progress_turn); - if let Some(active_turn) = active_turn { - merge_turn_history_with_active_turn(&mut turns, active_turn); + for turn in live_turns { + merge_turn_history_with_live_turn(&mut turns, turn.clone()); } turns } diff --git a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs index f3528a1df3..0ec8cc5eb0 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs @@ -274,12 +274,87 @@ mod thread_processor_behavior_tests { &persisted_items, ThreadStatus::Idle, /*has_live_running_thread*/ false, - Some(active_turn.clone()), + std::slice::from_ref(&active_turn), ); assert_eq!(turns.last(), Some(&active_turn)); } + #[test] + fn thread_turns_list_merges_terminal_failure_and_active_turn() { + let persisted_items = vec![RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "persisted".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + ..Default::default() + }, + ))]; + let failed_terminal_turn = Turn { + id: "turn-1".to_string(), + items: Vec::new(), + items_view: TurnItemsView::Full, + error: Some(TurnError { + message: "schema failed".to_string(), + codex_error_info: None, + additional_details: None, + data: None, + }), + status: TurnStatus::Failed, + started_at: None, + completed_at: None, + duration_ms: None, + }; + let active_turn = Turn { + id: "turn-2".to_string(), + items: Vec::new(), + items_view: TurnItemsView::Full, + error: None, + status: TurnStatus::InProgress, + started_at: None, + completed_at: None, + duration_ms: None, + }; + + let turns = reconstruct_thread_turns_for_turns_list( + &persisted_items, + ThreadStatus::Idle, + /*has_live_running_thread*/ false, + &[failed_terminal_turn.clone(), active_turn.clone()], + ); + + assert!(turns.contains(&failed_terminal_turn)); + assert_eq!(turns.last(), Some(&active_turn)); + } + + #[test] + fn app_server_history_replay_preserves_limited_error_events() { + let error = "schema failed"; + let turns = build_api_turns_from_rollout_items(&[ + RolloutItem::EventMsg(EventMsg::UserMessage( + codex_protocol::protocol::UserMessageEvent { + message: "persisted".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + ..Default::default() + }, + )), + RolloutItem::EventMsg(EventMsg::Error(codex_protocol::protocol::ErrorEvent { + message: error.to_string(), + codex_error_info: None, + })), + ]); + + assert_eq!(turns.len(), 1); + assert_eq!(turns[0].status, TurnStatus::Failed); + assert_eq!( + turns[0].error.as_ref().map(|error| error.message.as_str()), + Some(error) + ); + } + #[test] fn validate_dynamic_tools_rejects_empty_namespace() { let tools = vec![ApiDynamicToolSpec { diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index c413fe4291..1f294683d0 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -148,12 +148,22 @@ impl ThreadState { self.last_terminal_turn.clone() } + pub(crate) fn live_turn_snapshots(&self) -> Vec { + let mut turns = Vec::new(); + if let Some(turn) = self.last_terminal_turn_snapshot() { + turns.push(turn); + } + if let Some(turn) = self.active_turn_snapshot() { + turns.retain(|existing| existing.id != turn.id); + turns.push(turn); + } + turns + } + pub(crate) fn track_current_turn_event(&mut self, event_turn_id: &str, event: &EventMsg) { if let EventMsg::TurnStarted(payload) = event { self.turn_summary.started_at = payload.started_at; self.turn_summary.last_error = None; - self.last_terminal_turn_id = None; - self.last_terminal_turn = None; } self.current_turn_history.handle_event(event); if matches!(event, EventMsg::TurnAborted(_) | EventMsg::TurnComplete(_)) @@ -229,6 +239,7 @@ mod tests { use codex_protocol::config_types::Settings; use codex_protocol::protocol::CodexErrorInfo as CoreCodexErrorInfo; use codex_protocol::protocol::ErrorEvent; + use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnStartedEvent; use pretty_assertions::assert_eq; @@ -266,6 +277,59 @@ mod tests { assert_eq!(turn.error, state.turn_summary.last_error); } + #[test] + fn live_turn_snapshots_preserve_failed_terminal_turn_after_next_turn_starts() { + let mut state = ThreadState::default(); + state.track_current_turn_event("turn-1", &turn_started_event("turn-1")); + state.track_current_turn_event( + "turn-1", + &EventMsg::Error(ErrorEvent { + message: "raw error".to_string(), + codex_error_info: Some(CoreCodexErrorInfo::BadRequest), + }), + ); + state.turn_summary.last_error = Some(TurnError { + message: "enriched error".to_string(), + codex_error_info: Some(CodexErrorInfo::BadRequest), + additional_details: None, + data: None, + }); + state.track_current_turn_event("turn-1", &turn_complete_event("turn-1")); + state.track_current_turn_event("turn-2", &turn_started_event("turn-2")); + + let turns = state.live_turn_snapshots(); + + assert_eq!(turns.len(), 2); + assert_eq!(turns[0].id, "turn-1"); + assert_eq!(turns[0].status, TurnStatus::Failed); + assert_eq!( + turns[0].error, + state.last_terminal_turn_snapshot().unwrap().error + ); + assert_eq!(turns[1].id, "turn-2"); + assert_eq!(turns[1].status, TurnStatus::InProgress); + assert_eq!(turns[1].error, None); + } + + fn turn_started_event(turn_id: &str) -> EventMsg { + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: turn_id.to_string(), + started_at: None, + model_context_window: None, + collaboration_mode_kind: Default::default(), + }) + } + + fn turn_complete_event(turn_id: &str) -> EventMsg { + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: turn_id.to_string(), + last_agent_message: None, + completed_at: None, + duration_ms: None, + time_to_first_token_ms: None, + }) + } + #[test] fn note_thread_settings_reports_only_effective_changes() { let mut state = ThreadState::default(); diff --git a/codex-rs/rollout/src/policy.rs b/codex-rs/rollout/src/policy.rs index b6dbba98f8..f4bd653c4b 100644 --- a/codex-rs/rollout/src/policy.rs +++ b/codex-rs/rollout/src/policy.rs @@ -151,6 +151,9 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option { | EventMsg::TurnComplete(_) | EventMsg::WebSearchEnd(_) | EventMsg::ImageGenerationEnd(_) => Some(EventPersistenceMode::Limited), + EventMsg::Error(event) if event.affects_turn_status() => { + Some(EventPersistenceMode::Limited) + } EventMsg::ItemCompleted(event) => { // Plan items are derived from streaming tags and are not part of the // raw ResponseItem history, so we persist their completion to replay @@ -219,3 +222,36 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option { | EventMsg::CollabResumeBegin(_) => None, } } + +#[cfg(test)] +mod tests { + use super::*; + use codex_protocol::protocol::CodexErrorInfo; + use codex_protocol::protocol::ErrorEvent; + + #[test] + fn limited_persistence_includes_turn_affecting_errors() { + let item = RolloutItem::EventMsg(EventMsg::Error(ErrorEvent { + message: "backend schema failed".to_string(), + codex_error_info: Some(CodexErrorInfo::BadRequest), + })); + + assert!(is_persisted_rollout_item( + &item, + EventPersistenceMode::Limited + )); + } + + #[test] + fn limited_persistence_skips_non_turn_affecting_errors() { + let item = RolloutItem::EventMsg(EventMsg::Error(ErrorEvent { + message: "rollback failed".to_string(), + codex_error_info: Some(CodexErrorInfo::ThreadRollbackFailed), + })); + + assert!(!is_persisted_rollout_item( + &item, + EventPersistenceMode::Limited + )); + } +}