fix(app-server): preserve failed turn diagnostics

This commit is contained in:
Alex Kotliarskyi
2026-05-21 13:04:50 -07:00
parent 267f6a46c9
commit 660d504a87
6 changed files with 214 additions and 54 deletions

View File

@@ -1548,7 +1548,7 @@ fn thread_rollback_response_from_stored_thread(
"thread {thread_id} did not include persisted history after rollback"
));
};
populate_thread_turns_from_history(&mut thread, &history.items, /*active_turn*/ None);
populate_thread_turns_from_history(&mut thread, &history.items, &[]);
thread.status = loaded_status;
Ok(ThreadRollbackResponse { thread })
}

View File

@@ -526,23 +526,24 @@ pub(super) async fn handle_pending_thread_resume_request(
pending_thread_unloads: &Arc<Mutex<HashSet<ThreadId>>>,
pending: crate::thread_state::PendingThreadResumeRequest,
) {
let active_turn = {
let live_turns = {
let state = thread_state.lock().await;
state.active_turn_snapshot()
state.live_turn_snapshots()
};
let latest_live_turn = live_turns.last();
tracing::debug!(
thread_id = %conversation_id,
request_id = ?pending.request_id,
active_turn_present = active_turn.is_some(),
active_turn_id = ?active_turn.as_ref().map(|turn| turn.id.as_str()),
active_turn_status = ?active_turn.as_ref().map(|turn| &turn.status),
live_turn_present = latest_live_turn.is_some(),
latest_live_turn_id = ?latest_live_turn.as_ref().map(|turn| turn.id.as_str()),
latest_live_turn_status = ?latest_live_turn.as_ref().map(|turn| &turn.status),
"composing running thread resume response"
);
let has_live_in_progress_turn =
matches!(conversation.agent_status().await, AgentStatus::Running)
|| active_turn
.as_ref()
.is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress));
|| live_turns
.iter()
.any(|turn| matches!(turn.status, TurnStatus::InProgress));
let request_id = pending.request_id;
let connection_id = request_id.connection_id;
@@ -551,7 +552,7 @@ pub(super) async fn handle_pending_thread_resume_request(
populate_thread_turns_from_history(
&mut thread,
&pending.history_items,
active_turn.as_ref(),
live_turns.as_slice(),
);
}
@@ -716,11 +717,11 @@ pub(super) async fn send_thread_goal_snapshot_notification(
/// Rebuilds `thread.turns` from the persisted rollout `items`, then overlays the
/// in-memory snapshots in `live_turns`.
///
/// Each live turn is cloned and merged in slice order. Callers with no live
/// snapshots to merge pass an empty slice (`&[]`).
pub(crate) fn populate_thread_turns_from_history(
thread: &mut Thread,
items: &[RolloutItem],
active_turn: Option<&Turn>,
live_turns: &[Turn],
) {
// Start from the turns reconstructed out of persisted history.
let mut turns = build_api_turns_from_rollout_items(items);
if let Some(active_turn) = active_turn {
merge_turn_history_with_active_turn(&mut turns, active_turn.clone());
// Overlay every live snapshot on top of the persisted turns.
for turn in live_turns {
merge_turn_history_with_live_turn(&mut turns, turn.clone());
}
thread.turns = turns;
}
@@ -750,9 +751,9 @@ pub(super) async fn resolve_pending_server_request(
.await;
}
/// Replaces any turn in `turns` that has the same id as `active_turn`, appending
/// `active_turn` as the last element.
pub(super) fn merge_turn_history_with_active_turn(turns: &mut Vec<Turn>, active_turn: Turn) {
turns.retain(|turn| turn.id != active_turn.id);
turns.push(active_turn);
/// Upserts `live_turn` into `turns`: every existing turn with the same id is
/// dropped, then `live_turn` is appended so it ends up last.
pub(super) fn merge_turn_history_with_live_turn(turns: &mut Vec<Turn>, live_turn: Turn) {
// Remove the stale copy first so the id appears at most once after the push.
turns.retain(|turn| turn.id != live_turn.id);
turns.push(live_turn);
}
pub(super) fn set_thread_status_and_interrupt_stale_turns(

View File

@@ -2086,14 +2086,12 @@ impl ThreadRequestProcessor {
.load_history(/*include_archived*/ true)
.await
.map_err(|err| thread_read_history_load_error(thread_id, err))?;
let live_turn = {
let live_turns = {
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
let state = thread_state.lock().await;
state
.active_turn_snapshot()
.or_else(|| state.last_terminal_turn_snapshot())
state.live_turn_snapshots()
};
populate_thread_turns_from_history(thread, &history.items, live_turn.as_ref());
populate_thread_turns_from_history(thread, &history.items, live_turns.as_slice());
}
Ok(())
@@ -2129,19 +2127,17 @@ impl ThreadRequestProcessor {
Some(thread) => matches!(thread.agent_status().await, AgentStatus::Running),
None => false,
};
let active_turn = if loaded_thread.is_some() {
let live_turns = if loaded_thread.is_some() {
// Persisted history may not yet include the currently running turn. The
// app-server listener has already projected live turn events into
// ThreadState, so merge that in-memory snapshot before paginating. A
// failed turn may also complete before its error event is persisted, so
// bridge the latest terminal failed snapshot too.
// ThreadState, so merge those in-memory snapshots before paginating.
// A failed turn may also complete before its error event is persisted,
// so bridge terminal failed snapshots too.
let thread_state = self.thread_state_manager.thread_state(thread_uuid).await;
let state = thread_state.lock().await;
state
.active_turn_snapshot()
.or_else(|| state.last_terminal_turn_snapshot())
state.live_turn_snapshots()
} else {
None
Vec::new()
};
let mut turns = reconstruct_thread_turns_for_turns_list(
&items,
@@ -2149,7 +2145,7 @@ impl ThreadRequestProcessor {
.loaded_status_for_thread(&thread_uuid.to_string())
.await,
has_live_running_thread,
active_turn,
live_turns.as_slice(),
);
for turn in &mut turns {
match items_view {
@@ -2866,11 +2862,7 @@ impl ThreadRequestProcessor {
let (mut thread, history) =
thread_from_stored_thread(stored_thread, fallback_provider, &self.config.cwd);
if include_turns && let Some(history) = history {
populate_thread_turns_from_history(
&mut thread,
&history.items,
/*active_turn*/ None,
);
populate_thread_turns_from_history(&mut thread, &history.items, &[]);
}
thread
}
@@ -2979,11 +2971,7 @@ impl ThreadRequestProcessor {
thread.path = Some(rollout_path.to_path_buf());
if include_turns {
let history_items = thread_history.get_rollout_items();
populate_thread_turns_from_history(
&mut thread,
&history_items,
/*active_turn*/ None,
);
populate_thread_turns_from_history(&mut thread, &history_items, &[]);
}
self.attach_thread_name(thread_id, &mut thread).await;
Ok(thread)
@@ -3177,11 +3165,7 @@ impl ThreadRequestProcessor {
thread.preview = preview_from_rollout_items(&history_items);
thread.forked_from_id = Some(source_thread_id.to_string());
if include_turns {
populate_thread_turns_from_history(
&mut thread,
&history_items,
/*active_turn*/ None,
);
populate_thread_turns_from_history(&mut thread, &history_items, &[]);
}
thread
};
@@ -3545,16 +3529,16 @@ fn reconstruct_thread_turns_for_turns_list(
items: &[RolloutItem],
loaded_status: ThreadStatus,
has_live_running_thread: bool,
active_turn: Option<Turn>,
live_turns: &[Turn],
) -> Vec<Turn> {
// A turn counts as live and in progress when the caller reports a running thread
// or when any in-memory snapshot still has `TurnStatus::InProgress`.
let has_live_in_progress_turn = has_live_running_thread
|| active_turn
.as_ref()
.is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress));
|| live_turns
.iter()
.any(|turn| matches!(turn.status, TurnStatus::InProgress));
let mut turns = build_api_turns_from_rollout_items(items);
// Status normalization runs on the persisted turns only; live snapshots are merged
// afterwards and therefore keep the status they were captured with.
normalize_thread_turns_status(&mut turns, loaded_status, has_live_in_progress_turn);
if let Some(active_turn) = active_turn {
merge_turn_history_with_active_turn(&mut turns, active_turn);
// Merge in slice order, so the last live snapshot ends up as the final turn.
for turn in live_turns {
merge_turn_history_with_live_turn(&mut turns, turn.clone());
}
turns
}

View File

@@ -274,12 +274,87 @@ mod thread_processor_behavior_tests {
&persisted_items,
ThreadStatus::Idle,
/*has_live_running_thread*/ false,
Some(active_turn.clone()),
std::slice::from_ref(&active_turn),
);
assert_eq!(turns.last(), Some(&active_turn));
}
/// A failed terminal snapshot and an in-progress snapshot supplied together must
/// both appear in the reconstructed list, with the in-progress turn last.
#[test]
fn thread_turns_list_merges_terminal_failure_and_active_turn() {
    // Persisted history holds a single user message; the failed and in-progress
    // turns are supplied only as live snapshots.
    let user_message = codex_protocol::protocol::UserMessageEvent {
        message: "persisted".to_string(),
        images: None,
        local_images: Vec::new(),
        text_elements: Vec::new(),
        ..Default::default()
    };
    let persisted_items = vec![RolloutItem::EventMsg(EventMsg::UserMessage(user_message))];

    let schema_failure = TurnError {
        message: "schema failed".to_string(),
        codex_error_info: None,
        additional_details: None,
        data: None,
    };
    let failed_terminal_turn = Turn {
        id: "turn-1".to_string(),
        items: Vec::new(),
        items_view: TurnItemsView::Full,
        error: Some(schema_failure),
        status: TurnStatus::Failed,
        started_at: None,
        completed_at: None,
        duration_ms: None,
    };
    let active_turn = Turn {
        id: "turn-2".to_string(),
        items: Vec::new(),
        items_view: TurnItemsView::Full,
        error: None,
        status: TurnStatus::InProgress,
        started_at: None,
        completed_at: None,
        duration_ms: None,
    };
    let live_snapshots = [failed_terminal_turn.clone(), active_turn.clone()];

    let turns = reconstruct_thread_turns_for_turns_list(
        &persisted_items,
        ThreadStatus::Idle,
        /*has_live_running_thread*/ false,
        &live_snapshots,
    );

    assert!(turns.contains(&failed_terminal_turn));
    assert_eq!(turns.last(), Some(&active_turn));
}
/// Replaying a user message followed by an error event must yield one failed turn
/// that carries the error message.
#[test]
fn app_server_history_replay_preserves_limited_error_events() {
    let expected_message = "schema failed";

    let user_message_item = RolloutItem::EventMsg(EventMsg::UserMessage(
        codex_protocol::protocol::UserMessageEvent {
            message: "persisted".to_string(),
            images: None,
            local_images: Vec::new(),
            text_elements: Vec::new(),
            ..Default::default()
        },
    ));
    let error_item = RolloutItem::EventMsg(EventMsg::Error(codex_protocol::protocol::ErrorEvent {
        message: expected_message.to_string(),
        codex_error_info: None,
    }));

    let turns = build_api_turns_from_rollout_items(&[user_message_item, error_item]);

    assert_eq!(turns.len(), 1);
    let replayed = &turns[0];
    assert_eq!(replayed.status, TurnStatus::Failed);
    assert_eq!(
        replayed.error.as_ref().map(|turn_error| turn_error.message.as_str()),
        Some(expected_message)
    );
}
#[test]
fn validate_dynamic_tools_rejects_empty_namespace() {
let tools = vec![ApiDynamicToolSpec {

View File

@@ -148,12 +148,22 @@ impl ThreadState {
self.last_terminal_turn.clone()
}
pub(crate) fn live_turn_snapshots(&self) -> Vec<Turn> {
let mut turns = Vec::new();
if let Some(turn) = self.last_terminal_turn_snapshot() {
turns.push(turn);
}
if let Some(turn) = self.active_turn_snapshot() {
turns.retain(|existing| existing.id != turn.id);
turns.push(turn);
}
turns
}
pub(crate) fn track_current_turn_event(&mut self, event_turn_id: &str, event: &EventMsg) {
if let EventMsg::TurnStarted(payload) = event {
self.turn_summary.started_at = payload.started_at;
self.turn_summary.last_error = None;
self.last_terminal_turn_id = None;
self.last_terminal_turn = None;
}
self.current_turn_history.handle_event(event);
if matches!(event, EventMsg::TurnAborted(_) | EventMsg::TurnComplete(_))
@@ -229,6 +239,7 @@ mod tests {
use codex_protocol::config_types::Settings;
use codex_protocol::protocol::CodexErrorInfo as CoreCodexErrorInfo;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnStartedEvent;
use pretty_assertions::assert_eq;
@@ -266,6 +277,59 @@ mod tests {
assert_eq!(turn.error, state.turn_summary.last_error);
}
/// Starting a second turn must not discard the failed terminal snapshot of the
/// first: `live_turn_snapshots` reports the failed turn, then the in-progress one.
#[test]
fn live_turn_snapshots_preserve_failed_terminal_turn_after_next_turn_starts() {
    let mut state = ThreadState::default();

    // Turn 1: start, receive a raw error, overwrite the summary error by hand,
    // then complete.
    state.track_current_turn_event("turn-1", &turn_started_event("turn-1"));
    let raw_error_event = EventMsg::Error(ErrorEvent {
        message: "raw error".to_string(),
        codex_error_info: Some(CoreCodexErrorInfo::BadRequest),
    });
    state.track_current_turn_event("turn-1", &raw_error_event);
    state.turn_summary.last_error = Some(TurnError {
        message: "enriched error".to_string(),
        codex_error_info: Some(CodexErrorInfo::BadRequest),
        additional_details: None,
        data: None,
    });
    state.track_current_turn_event("turn-1", &turn_complete_event("turn-1"));

    // Turn 2 starts while turn 1's terminal snapshot is still held.
    state.track_current_turn_event("turn-2", &turn_started_event("turn-2"));

    let turns = state.live_turn_snapshots();
    assert_eq!(turns.len(), 2);

    let failed = &turns[0];
    assert_eq!(failed.id, "turn-1");
    assert_eq!(failed.status, TurnStatus::Failed);
    assert_eq!(
        failed.error,
        state.last_terminal_turn_snapshot().unwrap().error
    );

    let in_progress = &turns[1];
    assert_eq!(in_progress.id, "turn-2");
    assert_eq!(in_progress.status, TurnStatus::InProgress);
    assert_eq!(in_progress.error, None);
}
/// Builds a `TurnStarted` event for `turn_id` with every optional field left unset.
fn turn_started_event(turn_id: &str) -> EventMsg {
    let started = TurnStartedEvent {
        turn_id: turn_id.to_owned(),
        started_at: None,
        model_context_window: None,
        collaboration_mode_kind: Default::default(),
    };
    EventMsg::TurnStarted(started)
}
/// Builds a `TurnComplete` event for `turn_id` with no message and no timing data.
fn turn_complete_event(turn_id: &str) -> EventMsg {
    let completed = TurnCompleteEvent {
        turn_id: turn_id.to_owned(),
        last_agent_message: None,
        completed_at: None,
        duration_ms: None,
        time_to_first_token_ms: None,
    };
    EventMsg::TurnComplete(completed)
}
#[test]
fn note_thread_settings_reports_only_effective_changes() {
let mut state = ThreadState::default();

View File

@@ -151,6 +151,9 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option<EventPersistenceMode> {
| EventMsg::TurnComplete(_)
| EventMsg::WebSearchEnd(_)
| EventMsg::ImageGenerationEnd(_) => Some(EventPersistenceMode::Limited),
EventMsg::Error(event) if event.affects_turn_status() => {
Some(EventPersistenceMode::Limited)
}
EventMsg::ItemCompleted(event) => {
// Plan items are derived from streaming tags and are not part of the
// raw ResponseItem history, so we persist their completion to replay
@@ -219,3 +222,36 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option<EventPersistenceMode> {
| EventMsg::CollabResumeBegin(_) => None,
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use codex_protocol::protocol::CodexErrorInfo;
    use codex_protocol::protocol::ErrorEvent;

    /// Wraps an error event with the given message and error info in a rollout item.
    fn error_rollout_item(message: &str, info: CodexErrorInfo) -> RolloutItem {
        let event = ErrorEvent {
            message: message.to_string(),
            codex_error_info: Some(info),
        };
        RolloutItem::EventMsg(EventMsg::Error(event))
    }

    /// A `BadRequest` error event is persisted in limited mode.
    #[test]
    fn limited_persistence_includes_turn_affecting_errors() {
        let item = error_rollout_item("backend schema failed", CodexErrorInfo::BadRequest);

        assert!(is_persisted_rollout_item(
            &item,
            EventPersistenceMode::Limited
        ));
    }

    /// A `ThreadRollbackFailed` error event is not persisted in limited mode.
    #[test]
    fn limited_persistence_skips_non_turn_affecting_errors() {
        let item = error_rollout_item("rollback failed", CodexErrorInfo::ThreadRollbackFailed);

        assert!(!is_persisted_rollout_item(
            &item,
            EventPersistenceMode::Limited
        ));
    }
}