use crate::protocol::item_builders::build_command_execution_begin_item; use crate::protocol::item_builders::build_command_execution_end_item; use crate::protocol::item_builders::build_file_change_approval_request_item; use crate::protocol::item_builders::build_file_change_begin_item; use crate::protocol::item_builders::build_file_change_end_item; use crate::protocol::item_builders::build_item_from_guardian_event; use crate::protocol::v2::CollabAgentState; use crate::protocol::v2::CollabAgentTool; use crate::protocol::v2::CollabAgentToolCallStatus; use crate::protocol::v2::CommandExecutionStatus; use crate::protocol::v2::DynamicToolCallOutputContentItem; use crate::protocol::v2::DynamicToolCallStatus; use crate::protocol::v2::McpToolCallError; use crate::protocol::v2::McpToolCallResult; use crate::protocol::v2::McpToolCallStatus; use crate::protocol::v2::ThreadItem; use crate::protocol::v2::Turn; use crate::protocol::v2::TurnError as V2TurnError; use crate::protocol::v2::TurnError; use crate::protocol::v2::TurnStatus; use crate::protocol::v2::UserInput; use crate::protocol::v2::WebSearchAction; use codex_protocol::items::parse_hook_prompt_message; use codex_protocol::models::MessagePhase; use codex_protocol::protocol::AgentReasoningEvent; use codex_protocol::protocol::AgentReasoningRawContentEvent; use codex_protocol::protocol::AgentStatus; use codex_protocol::protocol::ApplyPatchApprovalRequestEvent; use codex_protocol::protocol::CompactedItem; use codex_protocol::protocol::ContextCompactedEvent; use codex_protocol::protocol::DynamicToolCallResponseEvent; use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::ExecCommandBeginEvent; use codex_protocol::protocol::ExecCommandEndEvent; use codex_protocol::protocol::GuardianAssessmentEvent; use codex_protocol::protocol::GuardianAssessmentStatus; use codex_protocol::protocol::ImageGenerationBeginEvent; use codex_protocol::protocol::ImageGenerationEndEvent; use 
codex_protocol::protocol::ItemCompletedEvent; use codex_protocol::protocol::ItemStartedEvent; use codex_protocol::protocol::McpToolCallBeginEvent; use codex_protocol::protocol::McpToolCallEndEvent; use codex_protocol::protocol::PatchApplyBeginEvent; use codex_protocol::protocol::PatchApplyEndEvent; use codex_protocol::protocol::ReviewOutputEvent; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::ThreadRolledBackEvent; use codex_protocol::protocol::TurnAbortedEvent; use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::protocol::UserMessageEvent; use codex_protocol::protocol::ViewImageToolCallEvent; use codex_protocol::protocol::WebSearchBeginEvent; use codex_protocol::protocol::WebSearchEndEvent; use std::collections::HashMap; use tracing::warn; use uuid::Uuid; #[cfg(test)] use crate::protocol::v2::CommandAction; #[cfg(test)] use crate::protocol::v2::FileUpdateChange; #[cfg(test)] use crate::protocol::v2::PatchApplyStatus; #[cfg(test)] use crate::protocol::v2::PatchChangeKind; #[cfg(test)] use codex_protocol::protocol::ExecCommandStatus as CoreExecCommandStatus; #[cfg(test)] use codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus; /// Convert persisted [`RolloutItem`] entries into a sequence of [`Turn`] values. /// /// When available, this uses `TurnContext.turn_id` as the canonical turn id so /// resumed/rebuilt thread history preserves the original turn identifiers. 
///
/// Items are replayed in order through a fresh [`ThreadHistoryBuilder`]; a turn
/// that is still open after the last item is flushed by
/// [`ThreadHistoryBuilder::finish`].
pub fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
    let mut builder = ThreadHistoryBuilder::new();
    for item in items {
        builder.handle_rollout_item(item);
    }
    builder.finish()
}

/// Incrementally folds rollout items and events into a list of [`Turn`]s.
///
/// At most one turn is pending at a time (`current_turn`); turns that have been
/// closed are stored in `turns`.
pub struct ThreadHistoryBuilder {
    /// Turns that have already been closed, in the order they were closed.
    turns: Vec<Turn>,
    /// Turn currently being accumulated, if any.
    current_turn: Option<PendingTurn>,
    /// Numeric suffix used for the next synthesized `item-N` id.
    next_item_index: i64,
    /// Zero-based position of the rollout item currently being handled.
    current_rollout_index: usize,
    /// Position that will be assigned to the next rollout item.
    next_rollout_index: usize,
}

impl Default for ThreadHistoryBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl ThreadHistoryBuilder {
    /// Creates an empty builder: no turns, no pending turn, item ids starting
    /// at `item-1`, and rollout indices starting at zero.
    pub fn new() -> Self {
        Self {
            turns: Vec::new(),
            current_turn: None,
            next_item_index: 1,
            current_rollout_index: 0,
            next_rollout_index: 0,
        }
    }

    /// Discards all accumulated state, returning the builder to the state
    /// produced by [`ThreadHistoryBuilder::new`].
    pub fn reset(&mut self) {
        *self = Self::new();
    }

    /// Closes the pending turn (if any) and returns every accumulated turn.
    pub fn finish(mut self) -> Vec<Turn> {
        self.finish_current_turn();
        self.turns
    }

    /// Returns a snapshot of the pending turn or, when no turn is pending, a
    /// clone of the most recently closed turn. `None` when there are no turns
    /// at all.
    pub fn active_turn_snapshot(&self) -> Option<Turn> {
        self.current_turn
            .as_ref()
            .map(Turn::from)
            .or_else(|| self.turns.last().cloned())
    }

    /// Reports whether a turn is currently pending.
    pub fn has_active_turn(&self) -> bool {
        self.current_turn.is_some()
    }

    /// Returns the id of the pending turn, but only when that turn was opened
    /// explicitly (its `opened_explicitly` flag is set).
    pub fn active_turn_id_if_explicit(&self) -> Option<String> {
        self.current_turn
            .as_ref()
            .filter(|turn| turn.opened_explicitly)
            .map(|turn| turn.id.clone())
    }

    /// Returns the rollout index that was current when the pending turn was
    /// created, or `None` when no turn is pending.
    pub fn active_turn_start_index(&self) -> Option<usize> {
        self.current_turn
            .as_ref()
            .map(|turn| turn.rollout_start_index)
    }

    /// Shared reducer for persisted rollout replay and in-memory current-turn
    /// tracking used by running thread resume/rejoin.
    ///
    /// This function should handle all EventMsg variants that can be persisted in a rollout file.
    /// See `should_persist_event_msg` in `codex-rs/core/rollout/policy.rs`.
pub fn handle_event(&mut self, event: &EventMsg) {
    // Pure dispatch: each handled variant is forwarded to its dedicated
    // handler; variants with an empty arm leave the builder state untouched.
    match event {
        EventMsg::UserMessage(payload) => self.handle_user_message(payload),
        EventMsg::AgentMessage(payload) => self.handle_agent_message(
            payload.message.clone(),
            payload.phase.clone(),
            payload.memory_citation.clone().map(Into::into),
        ),
        EventMsg::AgentReasoning(payload) => self.handle_agent_reasoning(payload),
        EventMsg::AgentReasoningRawContent(payload) => {
            self.handle_agent_reasoning_raw_content(payload)
        }
        EventMsg::WebSearchBegin(payload) => self.handle_web_search_begin(payload),
        EventMsg::WebSearchEnd(payload) => self.handle_web_search_end(payload),
        EventMsg::ExecCommandBegin(payload) => self.handle_exec_command_begin(payload),
        EventMsg::ExecCommandEnd(payload) => self.handle_exec_command_end(payload),
        EventMsg::GuardianAssessment(payload) => self.handle_guardian_assessment(payload),
        EventMsg::ApplyPatchApprovalRequest(payload) => {
            self.handle_apply_patch_approval_request(payload)
        }
        EventMsg::PatchApplyBegin(payload) => self.handle_patch_apply_begin(payload),
        EventMsg::PatchApplyEnd(payload) => self.handle_patch_apply_end(payload),
        EventMsg::DynamicToolCallRequest(payload) => {
            self.handle_dynamic_tool_call_request(payload)
        }
        EventMsg::DynamicToolCallResponse(payload) => {
            self.handle_dynamic_tool_call_response(payload)
        }
        EventMsg::McpToolCallBegin(payload) => self.handle_mcp_tool_call_begin(payload),
        EventMsg::McpToolCallEnd(payload) => self.handle_mcp_tool_call_end(payload),
        EventMsg::ViewImageToolCall(payload) => self.handle_view_image_tool_call(payload),
        EventMsg::ImageGenerationBegin(payload) => self.handle_image_generation_begin(payload),
        EventMsg::ImageGenerationEnd(payload) => self.handle_image_generation_end(payload),
        EventMsg::CollabAgentSpawnBegin(payload) => {
            self.handle_collab_agent_spawn_begin(payload)
        }
        EventMsg::CollabAgentSpawnEnd(payload) => self.handle_collab_agent_spawn_end(payload),
        EventMsg::CollabAgentInteractionBegin(payload) => {
            self.handle_collab_agent_interaction_begin(payload)
        }
        EventMsg::CollabAgentInteractionEnd(payload) => {
            self.handle_collab_agent_interaction_end(payload)
        }
        EventMsg::CollabWaitingBegin(payload) => self.handle_collab_waiting_begin(payload),
        EventMsg::CollabWaitingEnd(payload) => self.handle_collab_waiting_end(payload),
        EventMsg::CollabCloseBegin(payload) => self.handle_collab_close_begin(payload),
        EventMsg::CollabCloseEnd(payload) => self.handle_collab_close_end(payload),
        EventMsg::CollabResumeBegin(payload) => self.handle_collab_resume_begin(payload),
        EventMsg::CollabResumeEnd(payload) => self.handle_collab_resume_end(payload),
        EventMsg::ContextCompacted(payload) => self.handle_context_compacted(payload),
        EventMsg::EnteredReviewMode(payload) => self.handle_entered_review_mode(payload),
        EventMsg::ExitedReviewMode(payload) => self.handle_exited_review_mode(payload),
        EventMsg::ItemStarted(payload) => self.handle_item_started(payload),
        EventMsg::ItemCompleted(payload) => self.handle_item_completed(payload),
        EventMsg::HookStarted(_) | EventMsg::HookCompleted(_) => {}
        EventMsg::Error(payload) => self.handle_error(payload),
        EventMsg::TokenCount(_) => {}
        EventMsg::ThreadRolledBack(payload) => self.handle_thread_rollback(payload),
        EventMsg::UndoCompleted(_) => {}
        EventMsg::TurnAborted(payload) => self.handle_turn_aborted(payload),
        EventMsg::TurnStarted(payload) => self.handle_turn_started(payload),
        EventMsg::TurnComplete(payload) => self.handle_turn_complete(payload),
        // Every other variant is ignored.
        _ => {}
    }
}

/// Feeds one persisted rollout item into the builder.
///
/// The rollout index counters are advanced first, so a turn created while
/// handling this item records this item's position as its start index.
/// `TurnContext` and `SessionMeta` items only advance the counters.
pub fn handle_rollout_item(&mut self, item: &RolloutItem) {
    self.current_rollout_index = self.next_rollout_index;
    self.next_rollout_index += 1;
    match item {
        RolloutItem::EventMsg(event) => self.handle_event(event),
        RolloutItem::Compacted(payload) => self.handle_compacted(payload),
        RolloutItem::ResponseItem(item) => self.handle_response_item(item),
        RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {}
    }
}

/// Appends a `HookPrompt` item for a raw response item, but only when it is a
/// `Message` with role `"user"` that `parse_hook_prompt_message` accepts.
/// Anything else is ignored.
fn handle_response_item(&mut self, item: &codex_protocol::models::ResponseItem) {
    let codex_protocol::models::ResponseItem::Message {
        role, content, id, ..
    } = item
    else {
        return;
    };
    if role != "user" {
        return;
    }
    let Some(hook_prompt) = parse_hook_prompt_message(id.as_ref(), content) else {
        return;
    };
    self.ensure_turn().items.push(ThreadItem::HookPrompt {
        id: hook_prompt.id,
        fragments: hook_prompt
            .fragments
            .into_iter()
            .map(crate::protocol::v2::HookPromptFragment::from)
            .collect(),
    });
}

/// Appends a `UserMessage` item, opening a new implicit turn when needed.
fn handle_user_message(&mut self, payload: &UserMessageEvent) {
    // User messages should stay in explicitly opened turns. For backward
    // compatibility with older streams that did not open turns explicitly,
    // close any implicit/inactive turn and start a fresh one for this input.
    // An implicit turn that so far only saw a compaction marker (and has no
    // items) is kept, so the message joins it instead of starting a new turn.
    if let Some(turn) = self.current_turn.as_ref()
        && !turn.opened_explicitly
        && !(turn.saw_compaction && turn.items.is_empty())
    {
        self.finish_current_turn();
    }
    // The turn is taken out of `self` so that `next_item_id` and
    // `build_user_inputs` can borrow `self` while the turn is being edited.
    let mut turn = self
        .current_turn
        .take()
        .unwrap_or_else(|| self.new_turn(/*id*/ None));
    let id = self.next_item_id();
    let content = self.build_user_inputs(payload);
    turn.items.push(ThreadItem::UserMessage { id, content });
    self.current_turn = Some(turn);
}

/// Appends an `AgentMessage` item with a freshly generated item id.
/// Messages with empty text are ignored and do not consume an item id.
fn handle_agent_message(
    &mut self,
    text: String,
    phase: Option,
    memory_citation: Option,
) {
    if text.is_empty() {
        return;
    }
    let id = self.next_item_id();
    self.ensure_turn().items.push(ThreadItem::AgentMessage {
        id,
        text,
        phase,
        memory_citation,
    });
}

/// Records a reasoning summary fragment; empty fragments are ignored.
fn handle_agent_reasoning(&mut self, payload: &AgentReasoningEvent) {
    if payload.text.is_empty() {
        return;
    }
    // If the last item is a reasoning item, add the new text to the summary.
    if let Some(ThreadItem::Reasoning { summary, .. }) = self.ensure_turn().items.last_mut() {
        summary.push(payload.text.clone());
        return;
    }
    // Otherwise, create a new reasoning item.
    let id = self.next_item_id();
    self.ensure_turn().items.push(ThreadItem::Reasoning {
        id,
        summary: vec![payload.text.clone()],
        content: Vec::new(),
    });
}

/// Records a raw reasoning content fragment; empty fragments are ignored.
fn handle_agent_reasoning_raw_content(&mut self, payload: &AgentReasoningRawContentEvent) {
    if payload.text.is_empty() {
        return;
    }
    // If the last item is a reasoning item, add the new text to the content.
    if let Some(ThreadItem::Reasoning { content, .. }) = self.ensure_turn().items.last_mut() {
        content.push(payload.text.clone());
        return;
    }
    // Otherwise, create a new reasoning item.
    let id = self.next_item_id();
    self.ensure_turn().items.push(ThreadItem::Reasoning {
        id,
        summary: Vec::new(),
        content: vec![payload.text.clone()],
    });
}

/// Handles an item-started event. Only `Plan` items with non-empty text are
/// recorded (upserted into the turn named by `payload.turn_id`); every other
/// item kind is ignored here.
fn handle_item_started(&mut self, payload: &ItemStartedEvent) {
    match &payload.item {
        codex_protocol::items::TurnItem::Plan(plan) => {
            if plan.text.is_empty() {
                return;
            }
            self.upsert_item_in_turn_id(
                &payload.turn_id,
                ThreadItem::from(payload.item.clone()),
            );
        }
        codex_protocol::items::TurnItem::UserMessage(_)
        | codex_protocol::items::TurnItem::HookPrompt(_)
        | codex_protocol::items::TurnItem::AgentMessage(_)
        | codex_protocol::items::TurnItem::Reasoning(_)
        | codex_protocol::items::TurnItem::WebSearch(_)
        | codex_protocol::items::TurnItem::ImageGeneration(_)
        | codex_protocol::items::TurnItem::ContextCompaction(_) => {}
    }
}

/// Handles an item-completed event with the same rules as
/// `handle_item_started`: only non-empty `Plan` items are upserted into the
/// turn named by `payload.turn_id`.
fn handle_item_completed(&mut self, payload: &ItemCompletedEvent) {
    match &payload.item {
        codex_protocol::items::TurnItem::Plan(plan) => {
            if plan.text.is_empty() {
                return;
            }
            self.upsert_item_in_turn_id(
                &payload.turn_id,
                ThreadItem::from(payload.item.clone()),
            );
        }
        codex_protocol::items::TurnItem::UserMessage(_)
        | codex_protocol::items::TurnItem::HookPrompt(_)
        | codex_protocol::items::TurnItem::AgentMessage(_)
        | codex_protocol::items::TurnItem::Reasoning(_)
        | codex_protocol::items::TurnItem::WebSearch(_)
        | codex_protocol::items::TurnItem::ImageGeneration(_)
        | codex_protocol::items::TurnItem::ContextCompaction(_) => {}
    }
}

/// Upserts a placeholder `WebSearch` item (empty query, no action) keyed by
/// the call id into the current turn.
fn handle_web_search_begin(&mut self, payload: &WebSearchBeginEvent) {
    let item = ThreadItem::WebSearch {
        id: payload.call_id.clone(),
        query: String::new(),
        action: None,
    };
    self.upsert_item_in_current_turn(item);
}

/// Upserts the finished `WebSearch` item (query and action filled in) keyed by
/// the call id into the current turn.
fn handle_web_search_end(&mut self, payload: &WebSearchEndEvent) {
    let item = ThreadItem::WebSearch {
        id: payload.call_id.clone(),
        query: payload.query.clone(),
        action: Some(WebSearchAction::from(payload.action.clone())),
    };
    self.upsert_item_in_current_turn(item);
}

/// Upserts the command-execution "begin" item into the turn named by the
/// event's `turn_id`.
fn handle_exec_command_begin(&mut self, payload: &ExecCommandBeginEvent) {
    let item = build_command_execution_begin_item(payload);
    self.upsert_item_in_turn_id(&payload.turn_id, item);
}

/// Upserts the command-execution "end" item into the turn named by the
/// event's `turn_id`.
fn handle_exec_command_end(&mut self, payload: &ExecCommandEndEvent) {
    let item = build_command_execution_end_item(payload);
    // Command completions can arrive out of order. Unified exec may return
    // while a PTY is still running, then emit ExecCommandEnd later from a
    // background exit watcher when that process finally exits. By then, a
    // newer user turn may already have started. Route by event turn_id so
    // replay preserves the original turn association.
    self.upsert_item_in_turn_id(&payload.turn_id, item);
}

/// Records a guardian assessment as a command-execution style item.
///
/// `Approved` assessments produce no item. The item is routed by `turn_id`
/// when the event carries one, otherwise it goes to the current turn.
fn handle_guardian_assessment(&mut self, payload: &GuardianAssessmentEvent) {
    let status = match payload.status {
        GuardianAssessmentStatus::InProgress => CommandExecutionStatus::InProgress,
        GuardianAssessmentStatus::Denied | GuardianAssessmentStatus::Aborted => {
            CommandExecutionStatus::Declined
        }
        GuardianAssessmentStatus::TimedOut => CommandExecutionStatus::Failed,
        GuardianAssessmentStatus::Approved => return,
    };
    // The builder may decline to produce an item for this event.
    let Some(item) = build_item_from_guardian_event(payload, status) else {
        return;
    };
    if payload.turn_id.is_empty() {
        self.upsert_item_in_current_turn(item);
    } else {
        self.upsert_item_in_turn_id(&payload.turn_id, item);
    }
}

/// Upserts the file-change approval-request item, routed by `turn_id` when
/// present and to the current turn otherwise.
fn handle_apply_patch_approval_request(&mut self, payload: &ApplyPatchApprovalRequestEvent) {
    let item = build_file_change_approval_request_item(payload);
    if payload.turn_id.is_empty() {
        self.upsert_item_in_current_turn(item);
    } else {
        self.upsert_item_in_turn_id(&payload.turn_id, item);
    }
}

/// Upserts the file-change "begin" item, routed by `turn_id` when present and
/// to the current turn otherwise.
fn handle_patch_apply_begin(&mut self, payload: &PatchApplyBeginEvent) {
    let item = build_file_change_begin_item(payload);
    if payload.turn_id.is_empty() {
        self.upsert_item_in_current_turn(item);
    } else {
        self.upsert_item_in_turn_id(&payload.turn_id, item);
    }
}

/// Upserts the file-change "end" item, routed by `turn_id` when present and
/// to the current turn otherwise.
fn handle_patch_apply_end(&mut self, payload: &PatchApplyEndEvent) {
    let item = build_file_change_end_item(payload);
    if payload.turn_id.is_empty() {
        self.upsert_item_in_current_turn(item);
    } else {
        self.upsert_item_in_turn_id(&payload.turn_id, item);
    }
}

/// Upserts an in-progress `DynamicToolCall` item (no output yet) keyed by the
/// call id, routed by `turn_id` when present and to the current turn otherwise.
fn handle_dynamic_tool_call_request(
    &mut self,
    payload: &codex_protocol::dynamic_tools::DynamicToolCallRequest,
) {
    let item = ThreadItem::DynamicToolCall {
        id: payload.call_id.clone(),
        tool: payload.tool.clone(),
        arguments: payload.arguments.clone(),
        status: DynamicToolCallStatus::InProgress,
        content_items: None,
        success: None,
        duration_ms: None,
    };
    if payload.turn_id.is_empty() {
        self.upsert_item_in_current_turn(item);
    } else {
        self.upsert_item_in_turn_id(&payload.turn_id, item);
    }
}

/// Upserts the finished `DynamicToolCall` item keyed by the call id, with
/// status `Completed` or `Failed` according to `payload.success`.
fn handle_dynamic_tool_call_response(&mut self, payload: &DynamicToolCallResponseEvent) {
    let status = if payload.success {
        DynamicToolCallStatus::Completed
    } else {
        DynamicToolCallStatus::Failed
    };
    // `None` when the millisecond count does not fit in an `i64`.
    let duration_ms = i64::try_from(payload.duration.as_millis()).ok();
    let item = ThreadItem::DynamicToolCall {
        id: payload.call_id.clone(),
        tool: payload.tool.clone(),
        arguments: payload.arguments.clone(),
        status,
        content_items: Some(convert_dynamic_tool_content_items(&payload.content_items)),
        success: Some(payload.success),
        duration_ms,
    };
    if payload.turn_id.is_empty() {
        self.upsert_item_in_current_turn(item);
    } else {
        self.upsert_item_in_turn_id(&payload.turn_id, item);
    }
}

/// Upserts an in-progress `McpToolCall` item keyed by the call id into the
/// current turn. Missing invocation arguments are stored as JSON `null`.
fn handle_mcp_tool_call_begin(&mut self, payload: &McpToolCallBeginEvent) {
    let item = ThreadItem::McpToolCall {
        id: payload.call_id.clone(),
        server: payload.invocation.server.clone(),
        tool: payload.invocation.tool.clone(),
        status: McpToolCallStatus::InProgress,
        arguments: payload
            .invocation
            .arguments
            .clone()
            .unwrap_or(serde_json::Value::Null),
        result: None,
        error: None,
        duration_ms: None,
    };
    self.upsert_item_in_current_turn(item);
}

/// Upserts the finished `McpToolCall` item keyed by the call id into the
/// current turn, carrying either the result or the error message.
fn handle_mcp_tool_call_end(&mut self, payload: &McpToolCallEndEvent) {
    let status = if payload.is_success() {
        McpToolCallStatus::Completed
    } else {
        McpToolCallStatus::Failed
    };
    // `None` when the millisecond count does not fit in an `i64`.
    let duration_ms = i64::try_from(payload.duration.as_millis()).ok();
    // Exactly one of `result` / `error` is populated.
    let (result, error) = match &payload.result {
        Ok(value) => (
            Some(McpToolCallResult {
                content: value.content.clone(),
                structured_content: value.structured_content.clone(),
                meta: value.meta.clone(),
            }),
            None,
        ),
        Err(message) => (
            None,
            Some(McpToolCallError {
                message: message.clone(),
            }),
        ),
    };
    let item = ThreadItem::McpToolCall {
        id: payload.call_id.clone(),
        server: payload.invocation.server.clone(),
        tool: payload.invocation.tool.clone(),
        status,
        arguments: payload
            .invocation
            .arguments
            .clone()
            .unwrap_or(serde_json::Value::Null),
        result,
        error,
        duration_ms,
    };
    self.upsert_item_in_current_turn(item);
}

/// Upserts an `ImageView` item keyed by the call id into the current turn.
fn handle_view_image_tool_call(&mut self, payload: &ViewImageToolCallEvent) {
    let item = ThreadItem::ImageView {
        id: payload.call_id.clone(),
        path: payload.path.clone(),
    };
    self.upsert_item_in_current_turn(item);
}

/// Upserts a placeholder `ImageGeneration` item (empty status and result)
/// keyed by the call id into the current turn.
fn handle_image_generation_begin(&mut self, payload: &ImageGenerationBeginEvent) {
    let item = ThreadItem::ImageGeneration {
        id: payload.call_id.clone(),
        status: String::new(),
        revised_prompt: None,
        result: String::new(),
        saved_path: None,
    };
    self.upsert_item_in_current_turn(item);
}

/// Upserts the finished `ImageGeneration` item keyed by the call id into the
/// current turn, copying status, revised prompt, result and saved path.
fn handle_image_generation_end(&mut self, payload: &ImageGenerationEndEvent) {
    let item = ThreadItem::ImageGeneration {
        id: payload.call_id.clone(),
        status: payload.status.clone(),
        revised_prompt: payload.revised_prompt.clone(),
        result: payload.result.clone(),
        saved_path: payload.saved_path.clone(),
    };
    self.upsert_item_in_current_turn(item);
}

/// Upserts an in-progress `SpawnAgent` collab tool call into the current turn.
/// No receiver threads are known yet at this point.
fn handle_collab_agent_spawn_begin(
    &mut self,
    payload: &codex_protocol::protocol::CollabAgentSpawnBeginEvent,
) {
    let item = ThreadItem::CollabAgentToolCall {
        id: payload.call_id.clone(),
        tool: CollabAgentTool::SpawnAgent,
        status: CollabAgentToolCallStatus::InProgress,
        sender_thread_id: payload.sender_thread_id.to_string(),
        receiver_thread_ids: Vec::new(),
        prompt: Some(payload.prompt.clone()),
        model: Some(payload.model.clone()),
        reasoning_effort: Some(payload.reasoning_effort),
        agents_states: HashMap::new(),
    };
    self.upsert_item_in_current_turn(item);
}

/// Upserts the finished `SpawnAgent` collab tool call into the current turn.
///
/// The call is `Failed` when the agent status is `Errored`/`NotFound` or when
/// no new thread id was produced; otherwise it is `Completed`.
fn handle_collab_agent_spawn_end(
    &mut self,
    payload: &codex_protocol::protocol::CollabAgentSpawnEndEvent,
) {
    let has_receiver = payload.new_thread_id.is_some();
    let status = match &payload.status {
        AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed,
        _ if has_receiver => CollabAgentToolCallStatus::Completed,
        _ => CollabAgentToolCallStatus::Failed,
    };
    // Receiver list and per-agent state are only populated when a thread was
    // actually created.
    let (receiver_thread_ids, agents_states) = match &payload.new_thread_id {
        Some(id) => {
            let receiver_id = id.to_string();
            let received_status = CollabAgentState::from(payload.status.clone());
            (
                vec![receiver_id.clone()],
                [(receiver_id, received_status)].into_iter().collect(),
            )
        }
        None => (Vec::new(), HashMap::new()),
    };
    self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
        id: payload.call_id.clone(),
        tool: CollabAgentTool::SpawnAgent,
        status,
        sender_thread_id: payload.sender_thread_id.to_string(),
        receiver_thread_ids,
        prompt: Some(payload.prompt.clone()),
        model: Some(payload.model.clone()),
        reasoning_effort: Some(payload.reasoning_effort),
        agents_states,
    });
}

/// Upserts an in-progress `SendInput` collab tool call into the current turn.
fn handle_collab_agent_interaction_begin(
    &mut self,
    payload: &codex_protocol::protocol::CollabAgentInteractionBeginEvent,
) {
    let item = ThreadItem::CollabAgentToolCall {
        id: payload.call_id.clone(),
        tool: CollabAgentTool::SendInput,
        status: CollabAgentToolCallStatus::InProgress,
        sender_thread_id: payload.sender_thread_id.to_string(),
        receiver_thread_ids: vec![payload.receiver_thread_id.to_string()],
        prompt: Some(payload.prompt.clone()),
        model: None,
        reasoning_effort: None,
        agents_states: HashMap::new(),
    };
    self.upsert_item_in_current_turn(item);
}

/// Upserts the finished `SendInput` collab tool call into the current turn.
/// `Errored`/`NotFound` agent statuses map to `Failed`, anything else to
/// `Completed`.
fn handle_collab_agent_interaction_end(
    &mut self,
    payload: &codex_protocol::protocol::CollabAgentInteractionEndEvent,
) {
    let status = match &payload.status {
        AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed,
        _ => CollabAgentToolCallStatus::Completed,
    };
    let receiver_id = payload.receiver_thread_id.to_string();
    let received_status = CollabAgentState::from(payload.status.clone());
    self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
        id: payload.call_id.clone(),
        tool: CollabAgentTool::SendInput,
        status,
        sender_thread_id: payload.sender_thread_id.to_string(),
        receiver_thread_ids: vec![receiver_id.clone()],
        prompt: Some(payload.prompt.clone()),
        model: None,
        reasoning_effort: None,
        agents_states: [(receiver_id, received_status)].into_iter().collect(),
    });
}

/// Upserts an in-progress `Wait` collab tool call into the current turn,
/// listing the receiver thread ids from the event.
fn handle_collab_waiting_begin(
    &mut self,
    payload: &codex_protocol::protocol::CollabWaitingBeginEvent,
) {
    let item = ThreadItem::CollabAgentToolCall {
        id: payload.call_id.clone(),
        tool: CollabAgentTool::Wait,
        status: CollabAgentToolCallStatus::InProgress,
        sender_thread_id: payload.sender_thread_id.to_string(),
        receiver_thread_ids: payload
            .receiver_thread_ids
            .iter()
            .map(ToString::to_string)
            .collect(),
        prompt: None,
        model: None,
        reasoning_effort: None,
        agents_states: HashMap::new(),
    };
    self.upsert_item_in_current_turn(item);
}

/// Upserts the finished `Wait` collab tool call into the current turn.
/// The call is `Failed` if any reported agent status is `Errored`/`NotFound`.
fn handle_collab_waiting_end(
    &mut self,
    payload: &codex_protocol::protocol::CollabWaitingEndEvent,
) {
    let status = if payload
        .statuses
        .values()
        .any(|status| matches!(status, AgentStatus::Errored(_) | AgentStatus::NotFound))
    {
        CollabAgentToolCallStatus::Failed
    } else {
        CollabAgentToolCallStatus::Completed
    };
    // Receiver ids are taken from the status keys and sorted.
    let mut receiver_thread_ids: Vec =
        payload.statuses.keys().map(ToString::to_string).collect();
    receiver_thread_ids.sort();
    let agents_states = payload
        .statuses
        .iter()
        .map(|(id, status)| (id.to_string(), CollabAgentState::from(status.clone())))
        .collect();
    self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
        id: payload.call_id.clone(),
        tool: CollabAgentTool::Wait,
        status,
        sender_thread_id: payload.sender_thread_id.to_string(),
        receiver_thread_ids,
        prompt: None,
        model: None,
        reasoning_effort: None,
        agents_states,
    });
}

/// Upserts an in-progress `CloseAgent` collab tool call into the current turn.
fn handle_collab_close_begin(
    &mut self,
    payload: &codex_protocol::protocol::CollabCloseBeginEvent,
) {
    let item = ThreadItem::CollabAgentToolCall {
        id: payload.call_id.clone(),
        tool: CollabAgentTool::CloseAgent,
        status: CollabAgentToolCallStatus::InProgress,
        sender_thread_id: payload.sender_thread_id.to_string(),
        receiver_thread_ids: vec![payload.receiver_thread_id.to_string()],
        prompt: None,
        model: None,
        reasoning_effort: None,
        agents_states: HashMap::new(),
    };
    self.upsert_item_in_current_turn(item);
}

/// Upserts the finished `CloseAgent` collab tool call into the current turn.
/// `Errored`/`NotFound` agent statuses map to `Failed`, anything else to
/// `Completed`.
fn handle_collab_close_end(&mut self, payload: &codex_protocol::protocol::CollabCloseEndEvent) {
    let status = match &payload.status {
        AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed,
        _ => CollabAgentToolCallStatus::Completed,
    };
    let receiver_id = payload.receiver_thread_id.to_string();
    let agents_states = [(
        receiver_id.clone(),
        CollabAgentState::from(payload.status.clone()),
    )]
    .into_iter()
    .collect();
    self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
        id: payload.call_id.clone(),
        tool: CollabAgentTool::CloseAgent,
        status,
        sender_thread_id: payload.sender_thread_id.to_string(),
        receiver_thread_ids: vec![receiver_id],
        prompt: None,
        model: None,
        reasoning_effort: None,
        agents_states,
    });
}

/// Upserts an in-progress `ResumeAgent` collab tool call into the current turn.
fn handle_collab_resume_begin(
    &mut self,
    payload: &codex_protocol::protocol::CollabResumeBeginEvent,
) {
    let item = ThreadItem::CollabAgentToolCall {
        id: payload.call_id.clone(),
        tool: CollabAgentTool::ResumeAgent,
        status: CollabAgentToolCallStatus::InProgress,
        sender_thread_id: payload.sender_thread_id.to_string(),
        receiver_thread_ids: vec![payload.receiver_thread_id.to_string()],
        prompt: None,
        model: None,
        reasoning_effort: None,
        agents_states: HashMap::new(),
    };
    self.upsert_item_in_current_turn(item);
}

/// Upserts the finished `ResumeAgent` collab tool call into the current turn.
/// `Errored`/`NotFound` agent statuses map to `Failed`, anything else to
/// `Completed`.
fn handle_collab_resume_end(
    &mut self,
    payload: &codex_protocol::protocol::CollabResumeEndEvent,
) {
    let status = match &payload.status {
        AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed,
        _ => CollabAgentToolCallStatus::Completed,
    };
    let receiver_id = payload.receiver_thread_id.to_string();
    let agents_states = [(
        receiver_id.clone(),
        CollabAgentState::from(payload.status.clone()),
    )]
    .into_iter()
    .collect();
    self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
        id: payload.call_id.clone(),
        tool: CollabAgentTool::ResumeAgent,
        status,
        sender_thread_id: payload.sender_thread_id.to_string(),
        receiver_thread_ids: vec![receiver_id],
        prompt: None,
        model: None,
        reasoning_effort: None,
        agents_states,
    });
}

/// Appends a `ContextCompaction` item with a freshly generated item id.
fn handle_context_compacted(&mut self, _payload: &ContextCompactedEvent) {
    let id = self.next_item_id();
    self.ensure_turn()
        .items
        .push(ThreadItem::ContextCompaction { id });
}

/// Appends an `EnteredReviewMode` item whose text is the request's
/// user-facing hint, or `"Review requested."` when no hint is present.
fn handle_entered_review_mode(&mut self, payload: &codex_protocol::protocol::ReviewRequest) {
    let review = payload
        .user_facing_hint
        .clone()
        .unwrap_or_else(|| "Review requested.".to_string());
    let id = self.next_item_id();
    self.ensure_turn()
        .items
        .push(ThreadItem::EnteredReviewMode { id, review });
}

/// Appends an `ExitedReviewMode` item whose text is rendered from the review
/// output, or `REVIEW_FALLBACK_MESSAGE` when the event carries no output.
fn handle_exited_review_mode(
    &mut self,
    payload: &codex_protocol::protocol::ExitedReviewModeEvent,
) {
    let review = payload
        .review_output
        .as_ref()
        .map(render_review_output_text)
        .unwrap_or_else(|| REVIEW_FALLBACK_MESSAGE.to_string());
    let id = self.next_item_id();
    self.ensure_turn()
        .items
        .push(ThreadItem::ExitedReviewMode { id, review });
}

/// Marks the pending turn as `Failed` and stores the error details.
///
/// Errors for which `affects_turn_status()` is false are ignored, as are
/// errors that arrive while no turn is pending.
fn handle_error(&mut self, payload: &ErrorEvent) {
    if !payload.affects_turn_status() {
        return;
    }
    let Some(turn) = self.current_turn.as_mut() else {
        return;
    };
    turn.status = TurnStatus::Failed;
    turn.error = Some(V2TurnError {
        message: payload.message.clone(),
        codex_error_info: payload.codex_error_info.clone().map(Into::into),
        additional_details: None,
    });
}

/// Marks a turn as `Interrupted` and copies the completion timestamp and
/// duration from the event.
///
/// Lookup order: the pending turn if its id matches, then an already closed
/// turn with that id, then (no id or unknown id) the pending turn.
fn handle_turn_aborted(&mut self, payload: &TurnAbortedEvent) {
    let apply_abort = |turn: &mut PendingTurn| {
        turn.status = TurnStatus::Interrupted;
        turn.completed_at = payload.completed_at;
        turn.duration_ms = payload.duration_ms;
    };
    if let Some(turn_id) = payload.turn_id.as_deref() {
        // Prefer an exact ID match so we interrupt the turn explicitly targeted by the event.
        if let Some(turn) = self.current_turn.as_mut().filter(|turn| turn.id == turn_id) {
            apply_abort(turn);
            return;
        }
        // Closed turns are `Turn` values rather than `PendingTurn`, so the
        // same field updates are written out by hand here.
        if let Some(turn) = self.turns.iter_mut().find(|turn| turn.id == turn_id) {
            turn.status = TurnStatus::Interrupted;
            turn.completed_at = payload.completed_at;
            turn.duration_ms = payload.duration_ms;
            return;
        }
    }
    // If the event has no ID (or refers to an unknown turn), fall back to the active turn.
    if let Some(turn) = self.current_turn.as_mut() {
        apply_abort(turn);
    }
}

/// Closes any pending turn and opens a new, explicitly opened turn with the
/// event's turn id, status `InProgress`, and the event's start timestamp.
fn handle_turn_started(&mut self, payload: &TurnStartedEvent) {
    self.finish_current_turn();
    self.current_turn = Some(
        self.new_turn(Some(payload.turn_id.clone()))
            .with_status(TurnStatus::InProgress)
            .with_started_at(payload.started_at)
            .opened_explicitly(),
    );
}

/// Applies a turn-complete event.
///
/// The status only becomes `Completed` when it was `Completed` or
/// `InProgress` before, so any other status already recorded on the turn is
/// preserved. The completion timestamp and duration are always copied.
fn handle_turn_complete(&mut self, payload: &TurnCompleteEvent) {
    let mark_completed = |turn: &mut PendingTurn| {
        if matches!(turn.status, TurnStatus::Completed | TurnStatus::InProgress) {
            turn.status = TurnStatus::Completed;
        }
        turn.completed_at = payload.completed_at;
        turn.duration_ms = payload.duration_ms;
    };
    // Prefer an exact ID match from the active turn and then close it.
    if let Some(current_turn) = self
        .current_turn
        .as_mut()
        .filter(|turn| turn.id == payload.turn_id)
    {
        mark_completed(current_turn);
        self.finish_current_turn();
        return;
    }
    // Otherwise update an already closed turn with that id in place; the
    // pending turn (if any) stays open.
    if let Some(turn) = self
        .turns
        .iter_mut()
        .find(|turn| turn.id == payload.turn_id)
    {
        if matches!(turn.status, TurnStatus::Completed | TurnStatus::InProgress) {
            turn.status = TurnStatus::Completed;
        }
        turn.completed_at = payload.completed_at;
        turn.duration_ms = payload.duration_ms;
        return;
    }
    // If the completion event cannot be matched, apply it to the active turn.
    if let Some(current_turn) = self.current_turn.as_mut() {
        mark_completed(current_turn);
        self.finish_current_turn();
    }
}

/// Marks the current turn as containing a persisted compaction marker.
///
/// This keeps compaction-only legacy turns from being dropped by
/// `finish_current_turn` when they have no renderable items and were not
/// explicitly opened.
fn handle_compacted(&mut self, _payload: &CompactedItem) {
        // `ensure_turn` creates an implicit turn when none is active, so the flag
        // always lands on a turn.
        self.ensure_turn().saw_compaction = true;
    }

    /// Drops the most recent `payload.num_turns` turns from the history.
    ///
    /// The active turn is closed first, so if it is kept by
    /// `finish_current_turn` it counts as the newest turn. A count that does not
    /// fit in `usize`, or that is at least the number of stored turns, clears
    /// the history entirely. Item numbering then restarts at the number of
    /// remaining items plus one.
    fn handle_thread_rollback(&mut self, payload: &ThreadRolledBackEvent) {
        self.finish_current_turn();

        // A count that cannot be represented as `usize` is treated as "everything".
        let n = usize::try_from(payload.num_turns).unwrap_or(usize::MAX);
        if n >= self.turns.len() {
            self.turns.clear();
        } else {
            self.turns.truncate(self.turns.len().saturating_sub(n));
        }

        // Re-derive the next synthetic item index from the items that survived.
        let item_count: usize = self.turns.iter().map(|t| t.items.len()).sum();
        self.next_item_index = i64::try_from(item_count.saturating_add(1)).unwrap_or(i64::MAX);
    }

    /// Moves the active turn, if any, into the finished-turn list.
    ///
    /// A turn with no items that was neither explicitly opened nor marked with
    /// a compaction is discarded instead of being stored.
    fn finish_current_turn(&mut self) {
        if let Some(turn) = self.current_turn.take() {
            if turn.items.is_empty() && !turn.opened_explicitly && !turn.saw_compaction {
                return;
            }
            self.turns.push(Turn::from(turn));
        }
    }

    /// Builds an empty `PendingTurn`.
    ///
    /// Uses `id` when provided, otherwise the string form of `Uuid::now_v7()`.
    /// The new turn starts with status `Completed`, no timing information, and
    /// `rollout_start_index` set to the builder's `current_rollout_index`.
    // NOTE(review): the generic argument of `Option` appears to be missing in this
    // copy of the file; the body treats `id` as an optional `String`. Confirm
    // against the original source.
    fn new_turn(&mut self, id: Option) -> PendingTurn {
        PendingTurn {
            id: id.unwrap_or_else(|| Uuid::now_v7().to_string()),
            items: Vec::new(),
            error: None,
            status: TurnStatus::Completed,
            started_at: None,
            completed_at: None,
            duration_ms: None,
            opened_explicitly: false,
            saw_compaction: false,
            rollout_start_index: self.current_rollout_index,
        }
    }

    /// Returns the active turn, creating one with a generated id when no turn
    /// is active.
    fn ensure_turn(&mut self) -> &mut PendingTurn {
        if self.current_turn.is_none() {
            let turn = self.new_turn(/*id*/ None);
            return self.current_turn.insert(turn);
        }

        if let Some(turn) = self.current_turn.as_mut() {
            return turn;
        }

        // The `None` case returned above, so `current_turn` is `Some` here.
        unreachable!("current turn must exist after initialization");
    }

    /// Inserts or replaces `item` in the turn whose id equals `turn_id`.
    ///
    /// The active turn is checked first, then the finished turns. When no turn
    /// has that id, the item is dropped and a warning is logged.
    fn upsert_item_in_turn_id(&mut self, turn_id: &str, item: ThreadItem) {
        if let Some(turn) = self.current_turn.as_mut()
            && turn.id == turn_id
        {
            upsert_turn_item(&mut turn.items, item);
            return;
        }

        if let Some(turn) = self.turns.iter_mut().find(|turn| turn.id == turn_id) {
            upsert_turn_item(&mut turn.items, item);
            return;
        }

        warn!(
            item_id = item.id(),
            "dropping turn-scoped item for unknown turn id `{turn_id}`"
        );
    }

    /// Inserts or replaces `item` in the active turn, creating the turn if needed.
    fn upsert_item_in_current_turn(&mut self, item: ThreadItem) {
        let turn = self.ensure_turn();
        upsert_turn_item(&mut turn.items, item);
    }

    /// Returns the next synthetic item id (`item-<index>`) and advances the counter.
    fn next_item_id(&mut self) -> String {
        let id = format!("item-{}", self.next_item_index);
        self.next_item_index += 1;
        id
    }

    /// Converts a user message event into a list of `UserInput` values.
    ///
    /// Order of the result: the text (omitted when the message is empty after
    /// trimming), then each entry of `images`, then each entry of `local_images`.
    // NOTE(review): the element type of the returned `Vec` appears to be missing
    // in this copy of the file; the body only pushes `UserInput` values.
    fn build_user_inputs(&self, payload: &UserMessageEvent) -> Vec {
        let mut content = Vec::new();
        // The emptiness check uses the trimmed message, but the stored text is
        // the untrimmed original.
        if !payload.message.trim().is_empty() {
            content.push(UserInput::Text {
                text: payload.message.clone(),
                text_elements: payload
                    .text_elements
                    .iter()
                    .cloned()
                    .map(Into::into)
                    .collect(),
            });
        }
        if let Some(images) = &payload.images {
            for image in images {
                content.push(UserInput::Image { url: image.clone() });
            }
        }
        for path in &payload.local_images {
            content.push(UserInput::LocalImage { path: path.clone() });
        }
        content
    }
}

/// Text used for an exited-review item when no usable review explanation exists.
const REVIEW_FALLBACK_MESSAGE: &str = "Reviewer failed to output a response.";

/// Returns the trimmed `overall_explanation` of a review output, or
/// `REVIEW_FALLBACK_MESSAGE` when the explanation is empty after trimming.
fn render_review_output_text(output: &ReviewOutputEvent) -> String {
    let explanation = output.overall_explanation.trim();
    if explanation.is_empty() {
        REVIEW_FALLBACK_MESSAGE.to_string()
    } else {
        explanation.to_string()
    }
}

/// Maps `codex_protocol` dynamic-tool output content items onto this crate's
/// `DynamicToolCallOutputContentItem`, variant by variant.
// NOTE(review): the element type of the returned `Vec` appears to be missing in
// this copy of the file; the closure produces `DynamicToolCallOutputContentItem`.
fn convert_dynamic_tool_content_items(
    items: &[codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem],
) -> Vec {
    items
        .iter()
        .cloned()
        .map(|item| match item {
            codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem::InputText { text } => {
                DynamicToolCallOutputContentItem::InputText { text }
            }
            codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem::InputImage {
                image_url,
            } => DynamicToolCallOutputContentItem::InputImage { image_url },
        })
        .collect()
}

/// Replaces the first element of `items` whose `id()` equals that of `item`;
/// appends `item` when there is no such element.
// NOTE(review): the element type of `Vec` appears to be missing in this copy of
// the file; callers pass a turn's `items` and a `ThreadItem`.
fn upsert_turn_item(items: &mut Vec, item: ThreadItem) {
    if let Some(existing_item) = items
        .iter_mut()
        .find(|existing_item| existing_item.id() == item.id())
    {
        *existing_item = item;
        return;
    }
    items.push(item);
}

/// A turn that is still being assembled.
///
/// Holds the fields that are later copied into `Turn`, plus bookkeeping flags
/// that are not part of the converted `Turn`.
// NOTE(review): the generic arguments on the `Vec`/`Option` fields below appear
// to be missing in this copy of the file; confirm the field types against the
// original source before editing.
struct PendingTurn {
    id: String,
    items: Vec,
    error: Option,
    status: TurnStatus,
    started_at: Option,
    completed_at: Option,
    duration_ms: Option,
    /// True when this turn originated from an explicit `turn_started`/`turn_complete`
    /// boundary, so we preserve it even if it has no renderable items.
    opened_explicitly: bool,
    /// True when this turn includes a persisted `RolloutItem::Compacted`, which
    /// should keep the turn from being dropped even without normal items.
    saw_compaction: bool,
    /// Index of the rollout item that opened this turn during replay.
    rollout_start_index: usize,
}

/// Consuming, chainable setters used when constructing a `PendingTurn`.
impl PendingTurn {
    /// Sets the `opened_explicitly` flag and returns the turn.
    fn opened_explicitly(mut self) -> Self {
        self.opened_explicitly = true;
        self
    }

    /// Replaces the status and returns the turn.
    fn with_status(mut self, status: TurnStatus) -> Self {
        self.status = status;
        self
    }

    /// Replaces the start time and returns the turn.
    fn with_started_at(mut self, started_at: Option) -> Self {
        self.started_at = started_at;
        self
    }
}

// Consuming conversion: moves the `Turn` fields out of the pending turn and
// leaves out `opened_explicitly`, `saw_compaction` and `rollout_start_index`.
// NOTE(review): `From` appears to have lost its generic argument in this copy of
// the file; the method signature takes a `PendingTurn` by value.
impl From for Turn {
    fn from(value: PendingTurn) -> Self {
        Self {
            id: value.id,
            items: value.items,
            error: value.error,
            status: value.status,
            started_at: value.started_at,
            completed_at: value.completed_at,
            duration_ms: value.duration_ms,
        }
    }
}

// Borrowing conversion: clones the `Turn` fields so the pending turn stays usable.
impl From<&PendingTurn> for Turn {
    fn from(value: &PendingTurn) -> Self {
        Self {
            id: value.id.clone(),
            items: value.items.clone(),
            error: value.error.clone(),
            status: value.status.clone(),
            started_at: value.started_at,
            completed_at: value.completed_at,
            duration_ms: value.duration_ms,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::v2::CommandExecutionSource;
    use codex_protocol::ThreadId;
    use codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem as CoreDynamicToolCallOutputContentItem;
    use codex_protocol::items::HookPromptFragment as CoreHookPromptFragment;
    use codex_protocol::items::TurnItem as CoreTurnItem;
    use codex_protocol::items::UserMessageItem as CoreUserMessageItem;
    use codex_protocol::items::build_hook_prompt_message;
    use codex_protocol::mcp::CallToolResult;
    use codex_protocol::models::MessagePhase as CoreMessagePhase;
    use codex_protocol::models::WebSearchAction as CoreWebSearchAction;
    use codex_protocol::parse_command::ParsedCommand;
    use codex_protocol::protocol::AgentMessageEvent;
    use codex_protocol::protocol::AgentReasoningEvent;
    use codex_protocol::protocol::AgentReasoningRawContentEvent;
    use
codex_protocol::protocol::ApplyPatchApprovalRequestEvent; use codex_protocol::protocol::CodexErrorInfo; use codex_protocol::protocol::CompactedItem; use codex_protocol::protocol::DynamicToolCallResponseEvent; use codex_protocol::protocol::ExecCommandEndEvent; use codex_protocol::protocol::ExecCommandSource; use codex_protocol::protocol::ItemStartedEvent; use codex_protocol::protocol::McpInvocation; use codex_protocol::protocol::McpToolCallEndEvent; use codex_protocol::protocol::PatchApplyBeginEvent; use codex_protocol::protocol::ThreadRolledBackEvent; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::protocol::UserMessageEvent; use codex_protocol::protocol::WebSearchEndEvent; use codex_utils_absolute_path::test_support::PathBufExt; use codex_utils_absolute_path::test_support::test_path_buf; use pretty_assertions::assert_eq; use std::path::PathBuf; use std::time::Duration; use uuid::Uuid; #[test] fn builds_multiple_turns_with_reasoning_items() { let events = vec![ EventMsg::UserMessage(UserMessageEvent { message: "First turn".into(), images: Some(vec!["https://example.com/one.png".into()]), text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::AgentMessage(AgentMessageEvent { message: "Hi there".into(), phase: None, memory_citation: None, }), EventMsg::AgentReasoning(AgentReasoningEvent { text: "thinking".into(), }), EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent { text: "full reasoning".into(), }), EventMsg::UserMessage(UserMessageEvent { message: "Second turn".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::AgentMessage(AgentMessageEvent { message: "Reply two".into(), phase: None, memory_citation: None, }), ]; let mut builder = ThreadHistoryBuilder::new(); for event in &events { builder.handle_event(event); } let turns = 
builder.finish(); assert_eq!(turns.len(), 2); let first = &turns[0]; assert!(Uuid::parse_str(&first.id).is_ok()); assert_eq!(first.status, TurnStatus::Completed); assert_eq!(first.items.len(), 3); assert_eq!( first.items[0], ThreadItem::UserMessage { id: "item-1".into(), content: vec![ UserInput::Text { text: "First turn".into(), text_elements: Vec::new(), }, UserInput::Image { url: "https://example.com/one.png".into(), } ], } ); assert_eq!( first.items[1], ThreadItem::AgentMessage { id: "item-2".into(), text: "Hi there".into(), phase: None, memory_citation: None, } ); assert_eq!( first.items[2], ThreadItem::Reasoning { id: "item-3".into(), summary: vec!["thinking".into()], content: vec!["full reasoning".into()], } ); let second = &turns[1]; assert!(Uuid::parse_str(&second.id).is_ok()); assert_ne!(first.id, second.id); assert_eq!(second.items.len(), 2); assert_eq!( second.items[0], ThreadItem::UserMessage { id: "item-4".into(), content: vec![UserInput::Text { text: "Second turn".into(), text_elements: Vec::new(), }], } ); assert_eq!( second.items[1], ThreadItem::AgentMessage { id: "item-5".into(), text: "Reply two".into(), phase: None, memory_citation: None, } ); } #[test] fn ignores_non_plan_item_lifecycle_events() { let turn_id = "turn-1"; let thread_id = ThreadId::new(); let events = vec![ EventMsg::TurnStarted(TurnStartedEvent { turn_id: turn_id.to_string(), started_at: None, model_context_window: None, collaboration_mode_kind: Default::default(), }), EventMsg::UserMessage(UserMessageEvent { message: "hello".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::ItemStarted(ItemStartedEvent { thread_id, turn_id: turn_id.to_string(), item: CoreTurnItem::UserMessage(CoreUserMessageItem { id: "user-item-id".to_string(), content: Vec::new(), }), }), EventMsg::TurnComplete(TurnCompleteEvent { turn_id: turn_id.to_string(), last_agent_message: None, completed_at: None, duration_ms: None, }), ]; let items = events .into_iter() 
.map(RolloutItem::EventMsg) .collect::>(); let turns = build_turns_from_rollout_items(&items); assert_eq!(turns.len(), 1); assert_eq!(turns[0].items.len(), 1); assert_eq!( turns[0].items[0], ThreadItem::UserMessage { id: "item-1".into(), content: vec![UserInput::Text { text: "hello".into(), text_elements: Vec::new(), }], } ); } #[test] fn preserves_agent_message_phase_in_history() { let events = vec![EventMsg::AgentMessage(AgentMessageEvent { message: "Final reply".into(), phase: Some(CoreMessagePhase::FinalAnswer), memory_citation: None, })]; let items = events .into_iter() .map(RolloutItem::EventMsg) .collect::>(); let turns = build_turns_from_rollout_items(&items); assert_eq!(turns.len(), 1); assert_eq!( turns[0].items[0], ThreadItem::AgentMessage { id: "item-1".into(), text: "Final reply".into(), phase: Some(MessagePhase::FinalAnswer), memory_citation: None, } ); } #[test] fn replays_image_generation_end_events_into_turn_history() { let items = vec![ RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent { turn_id: "turn-image".into(), started_at: None, model_context_window: None, collaboration_mode_kind: Default::default(), })), RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { message: "generate an image".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), })), RolloutItem::EventMsg(EventMsg::ImageGenerationEnd(ImageGenerationEndEvent { call_id: "ig_123".into(), status: "completed".into(), revised_prompt: Some("final prompt".into()), result: "Zm9v".into(), saved_path: Some(test_path_buf("/tmp/ig_123.png").abs()), })), RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent { turn_id: "turn-image".into(), last_agent_message: None, completed_at: None, duration_ms: None, })), ]; let turns = build_turns_from_rollout_items(&items); assert_eq!(turns.len(), 1); assert_eq!( turns[0], Turn { id: "turn-image".into(), status: TurnStatus::Completed, error: None, started_at: None, completed_at: None, duration_ms: None, 
items: vec![ ThreadItem::UserMessage { id: "item-1".into(), content: vec![UserInput::Text { text: "generate an image".into(), text_elements: Vec::new(), }], }, ThreadItem::ImageGeneration { id: "ig_123".into(), status: "completed".into(), revised_prompt: Some("final prompt".into()), result: "Zm9v".into(), saved_path: Some(test_path_buf("/tmp/ig_123.png").abs()), }, ], } ); } #[test] fn splits_reasoning_when_interleaved() { let events = vec![ EventMsg::UserMessage(UserMessageEvent { message: "Turn start".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::AgentReasoning(AgentReasoningEvent { text: "first summary".into(), }), EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent { text: "first content".into(), }), EventMsg::AgentMessage(AgentMessageEvent { message: "interlude".into(), phase: None, memory_citation: None, }), EventMsg::AgentReasoning(AgentReasoningEvent { text: "second summary".into(), }), ]; let items = events .into_iter() .map(RolloutItem::EventMsg) .collect::>(); let turns = build_turns_from_rollout_items(&items); assert_eq!(turns.len(), 1); let turn = &turns[0]; assert_eq!(turn.items.len(), 4); assert_eq!( turn.items[1], ThreadItem::Reasoning { id: "item-2".into(), summary: vec!["first summary".into()], content: vec!["first content".into()], } ); assert_eq!( turn.items[3], ThreadItem::Reasoning { id: "item-4".into(), summary: vec!["second summary".into()], content: Vec::new(), } ); } #[test] fn marks_turn_as_interrupted_when_aborted() { let events = vec![ EventMsg::UserMessage(UserMessageEvent { message: "Please do the thing".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::AgentMessage(AgentMessageEvent { message: "Working...".into(), phase: None, memory_citation: None, }), EventMsg::TurnAborted(TurnAbortedEvent { turn_id: Some("turn-1".into()), reason: TurnAbortReason::Replaced, completed_at: None, duration_ms: None, }), EventMsg::UserMessage(UserMessageEvent 
{ message: "Let's try again".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::AgentMessage(AgentMessageEvent { message: "Second attempt complete.".into(), phase: None, memory_citation: None, }), ]; let items = events .into_iter() .map(RolloutItem::EventMsg) .collect::>(); let turns = build_turns_from_rollout_items(&items); assert_eq!(turns.len(), 2); let first_turn = &turns[0]; assert_eq!(first_turn.status, TurnStatus::Interrupted); assert_eq!(first_turn.items.len(), 2); assert_eq!( first_turn.items[0], ThreadItem::UserMessage { id: "item-1".into(), content: vec![UserInput::Text { text: "Please do the thing".into(), text_elements: Vec::new(), }], } ); assert_eq!( first_turn.items[1], ThreadItem::AgentMessage { id: "item-2".into(), text: "Working...".into(), phase: None, memory_citation: None, } ); let second_turn = &turns[1]; assert_eq!(second_turn.status, TurnStatus::Completed); assert_eq!(second_turn.items.len(), 2); assert_eq!( second_turn.items[0], ThreadItem::UserMessage { id: "item-3".into(), content: vec![UserInput::Text { text: "Let's try again".into(), text_elements: Vec::new(), }], } ); assert_eq!( second_turn.items[1], ThreadItem::AgentMessage { id: "item-4".into(), text: "Second attempt complete.".into(), phase: None, memory_citation: None, } ); } #[test] fn drops_last_turns_on_thread_rollback() { let events = vec![ EventMsg::UserMessage(UserMessageEvent { message: "First".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::AgentMessage(AgentMessageEvent { message: "A1".into(), phase: None, memory_citation: None, }), EventMsg::UserMessage(UserMessageEvent { message: "Second".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::AgentMessage(AgentMessageEvent { message: "A2".into(), phase: None, memory_citation: None, }), EventMsg::ThreadRolledBack(ThreadRolledBackEvent { num_turns: 1 }), EventMsg::UserMessage(UserMessageEvent { message: 
"Third".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::AgentMessage(AgentMessageEvent { message: "A3".into(), phase: None, memory_citation: None, }), ]; let items = events .into_iter() .map(RolloutItem::EventMsg) .collect::>(); let turns = build_turns_from_rollout_items(&items); assert_eq!(turns.len(), 2); assert!(Uuid::parse_str(&turns[0].id).is_ok()); assert!(Uuid::parse_str(&turns[1].id).is_ok()); assert_ne!(turns[0].id, turns[1].id); assert_eq!(turns[0].status, TurnStatus::Completed); assert_eq!(turns[1].status, TurnStatus::Completed); assert_eq!( turns[0].items, vec![ ThreadItem::UserMessage { id: "item-1".into(), content: vec![UserInput::Text { text: "First".into(), text_elements: Vec::new(), }], }, ThreadItem::AgentMessage { id: "item-2".into(), text: "A1".into(), phase: None, memory_citation: None, }, ] ); assert_eq!( turns[1].items, vec![ ThreadItem::UserMessage { id: "item-3".into(), content: vec![UserInput::Text { text: "Third".into(), text_elements: Vec::new(), }], }, ThreadItem::AgentMessage { id: "item-4".into(), text: "A3".into(), phase: None, memory_citation: None, }, ] ); } #[test] fn thread_rollback_clears_all_turns_when_num_turns_exceeds_history() { let events = vec![ EventMsg::UserMessage(UserMessageEvent { message: "One".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::AgentMessage(AgentMessageEvent { message: "A1".into(), phase: None, memory_citation: None, }), EventMsg::UserMessage(UserMessageEvent { message: "Two".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::AgentMessage(AgentMessageEvent { message: "A2".into(), phase: None, memory_citation: None, }), EventMsg::ThreadRolledBack(ThreadRolledBackEvent { num_turns: 99 }), ]; let items = events .into_iter() .map(RolloutItem::EventMsg) .collect::>(); let turns = build_turns_from_rollout_items(&items); assert_eq!(turns, Vec::::new()); } #[test] fn 
uses_explicit_turn_boundaries_for_mid_turn_steering() { let events = vec![ EventMsg::TurnStarted(TurnStartedEvent { turn_id: "turn-a".into(), started_at: None, model_context_window: None, collaboration_mode_kind: Default::default(), }), EventMsg::UserMessage(UserMessageEvent { message: "Start".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::UserMessage(UserMessageEvent { message: "Steer".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::TurnComplete(TurnCompleteEvent { turn_id: "turn-a".into(), last_agent_message: None, completed_at: None, duration_ms: None, }), ]; let items = events .into_iter() .map(RolloutItem::EventMsg) .collect::>(); let turns = build_turns_from_rollout_items(&items); assert_eq!(turns.len(), 1); assert_eq!(turns[0].id, "turn-a"); assert_eq!( turns[0].items, vec![ ThreadItem::UserMessage { id: "item-1".into(), content: vec![UserInput::Text { text: "Start".into(), text_elements: Vec::new(), }], }, ThreadItem::UserMessage { id: "item-2".into(), content: vec![UserInput::Text { text: "Steer".into(), text_elements: Vec::new(), }], }, ] ); } #[test] fn reconstructs_tool_items_from_persisted_completion_events() { let events = vec![ EventMsg::TurnStarted(TurnStartedEvent { turn_id: "turn-1".into(), started_at: None, model_context_window: None, collaboration_mode_kind: Default::default(), }), EventMsg::UserMessage(UserMessageEvent { message: "run tools".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::WebSearchEnd(WebSearchEndEvent { call_id: "search-1".into(), query: "codex".into(), action: CoreWebSearchAction::Search { query: Some("codex".into()), queries: None, }, }), EventMsg::ExecCommandEnd(ExecCommandEndEvent { call_id: "exec-1".into(), process_id: Some("pid-1".into()), turn_id: "turn-1".into(), command: vec!["echo".into(), "hello world".into()], cwd: test_path_buf("/tmp").abs(), parsed_cmd: vec![ParsedCommand::Unknown { cmd: 
"echo hello world".into(), }], source: ExecCommandSource::Agent, interaction_input: None, stdout: String::new(), stderr: String::new(), aggregated_output: "hello world\n".into(), exit_code: 0, duration: Duration::from_millis(12), formatted_output: String::new(), status: CoreExecCommandStatus::Completed, }), EventMsg::McpToolCallEnd(McpToolCallEndEvent { call_id: "mcp-1".into(), invocation: McpInvocation { server: "docs".into(), tool: "lookup".into(), arguments: Some(serde_json::json!({"id":"123"})), }, duration: Duration::from_millis(8), result: Err("boom".into()), }), ]; let items = events .into_iter() .map(RolloutItem::EventMsg) .collect::>(); let turns = build_turns_from_rollout_items(&items); assert_eq!(turns.len(), 1); assert_eq!(turns[0].items.len(), 4); assert_eq!( turns[0].items[1], ThreadItem::WebSearch { id: "search-1".into(), query: "codex".into(), action: Some(WebSearchAction::Search { query: Some("codex".into()), queries: None, }), } ); assert_eq!( turns[0].items[2], ThreadItem::CommandExecution { id: "exec-1".into(), command: "echo 'hello world'".into(), cwd: test_path_buf("/tmp").abs(), process_id: Some("pid-1".into()), source: CommandExecutionSource::Agent, status: CommandExecutionStatus::Completed, command_actions: vec![CommandAction::Unknown { command: "echo hello world".into(), }], aggregated_output: Some("hello world\n".into()), exit_code: Some(0), duration_ms: Some(12), } ); assert_eq!( turns[0].items[3], ThreadItem::McpToolCall { id: "mcp-1".into(), server: "docs".into(), tool: "lookup".into(), status: McpToolCallStatus::Failed, arguments: serde_json::json!({"id":"123"}), result: None, error: Some(McpToolCallError { message: "boom".into(), }), duration_ms: Some(8), } ); } #[test] fn reconstructs_mcp_tool_result_meta_from_persisted_completion_events() { let events = vec![ EventMsg::TurnStarted(TurnStartedEvent { turn_id: "turn-1".into(), started_at: None, model_context_window: None, collaboration_mode_kind: Default::default(), }), 
EventMsg::McpToolCallEnd(McpToolCallEndEvent { call_id: "mcp-1".into(), invocation: McpInvocation { server: "docs".into(), tool: "lookup".into(), arguments: Some(serde_json::json!({"id":"123"})), }, duration: Duration::from_millis(8), result: Ok(CallToolResult { content: vec![serde_json::json!({ "type": "text", "text": "result" })], structured_content: Some(serde_json::json!({"id":"123"})), is_error: Some(false), meta: Some(serde_json::json!({ "ui/resourceUri": "ui://widget/lookup.html" })), }), }), ]; let items = events .into_iter() .map(RolloutItem::EventMsg) .collect::>(); let turns = build_turns_from_rollout_items(&items); assert_eq!(turns.len(), 1); assert_eq!( turns[0].items[0], ThreadItem::McpToolCall { id: "mcp-1".into(), server: "docs".into(), tool: "lookup".into(), status: McpToolCallStatus::Completed, arguments: serde_json::json!({"id":"123"}), result: Some(McpToolCallResult { content: vec![serde_json::json!({ "type": "text", "text": "result" })], structured_content: Some(serde_json::json!({"id":"123"})), meta: Some(serde_json::json!({ "ui/resourceUri": "ui://widget/lookup.html" })), }), error: None, duration_ms: Some(8), } ); } #[test] fn reconstructs_dynamic_tool_items_from_request_and_response_events() { let events = vec![ EventMsg::TurnStarted(TurnStartedEvent { turn_id: "turn-1".into(), started_at: None, model_context_window: None, collaboration_mode_kind: Default::default(), }), EventMsg::UserMessage(UserMessageEvent { message: "run dynamic tool".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::DynamicToolCallRequest( codex_protocol::dynamic_tools::DynamicToolCallRequest { call_id: "dyn-1".into(), turn_id: "turn-1".into(), tool: "lookup_ticket".into(), arguments: serde_json::json!({"id":"ABC-123"}), }, ), EventMsg::DynamicToolCallResponse(DynamicToolCallResponseEvent { call_id: "dyn-1".into(), turn_id: "turn-1".into(), tool: "lookup_ticket".into(), arguments: serde_json::json!({"id":"ABC-123"}), 
content_items: vec![CoreDynamicToolCallOutputContentItem::InputText { text: "Ticket is open".into(), }], success: true, error: None, duration: Duration::from_millis(42), }), ]; let items = events .into_iter() .map(RolloutItem::EventMsg) .collect::>(); let turns = build_turns_from_rollout_items(&items); assert_eq!(turns.len(), 1); assert_eq!(turns[0].items.len(), 2); assert_eq!( turns[0].items[1], ThreadItem::DynamicToolCall { id: "dyn-1".into(), tool: "lookup_ticket".into(), arguments: serde_json::json!({"id":"ABC-123"}), status: DynamicToolCallStatus::Completed, content_items: Some(vec![DynamicToolCallOutputContentItem::InputText { text: "Ticket is open".into(), }]), success: Some(true), duration_ms: Some(42), } ); } #[test] fn reconstructs_declined_exec_and_patch_items() { let events = vec![ EventMsg::TurnStarted(TurnStartedEvent { turn_id: "turn-1".into(), started_at: None, model_context_window: None, collaboration_mode_kind: Default::default(), }), EventMsg::UserMessage(UserMessageEvent { message: "run tools".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::ExecCommandEnd(ExecCommandEndEvent { call_id: "exec-declined".into(), process_id: Some("pid-2".into()), turn_id: "turn-1".into(), command: vec!["ls".into()], cwd: test_path_buf("/tmp").abs(), parsed_cmd: vec![ParsedCommand::Unknown { cmd: "ls".into() }], source: ExecCommandSource::Agent, interaction_input: None, stdout: String::new(), stderr: "exec command rejected by user".into(), aggregated_output: "exec command rejected by user".into(), exit_code: -1, duration: Duration::ZERO, formatted_output: String::new(), status: CoreExecCommandStatus::Declined, }), EventMsg::PatchApplyEnd(PatchApplyEndEvent { call_id: "patch-declined".into(), turn_id: "turn-1".into(), stdout: String::new(), stderr: "patch rejected by user".into(), success: false, changes: [( PathBuf::from("README.md"), codex_protocol::protocol::FileChange::Add { content: "hello\n".into(), }, )] .into_iter() 
.collect(), status: CorePatchApplyStatus::Declined, }), ]; let items = events .into_iter() .map(RolloutItem::EventMsg) .collect::>(); let turns = build_turns_from_rollout_items(&items); assert_eq!(turns.len(), 1); assert_eq!(turns[0].items.len(), 3); assert_eq!( turns[0].items[1], ThreadItem::CommandExecution { id: "exec-declined".into(), command: "ls".into(), cwd: test_path_buf("/tmp").abs(), process_id: Some("pid-2".into()), source: CommandExecutionSource::Agent, status: CommandExecutionStatus::Declined, command_actions: vec![CommandAction::Unknown { command: "ls".into(), }], aggregated_output: Some("exec command rejected by user".into()), exit_code: Some(-1), duration_ms: Some(0), } ); assert_eq!( turns[0].items[2], ThreadItem::FileChange { id: "patch-declined".into(), changes: vec![FileUpdateChange { path: "README.md".into(), kind: PatchChangeKind::Add, diff: "hello\n".into(), }], status: PatchApplyStatus::Declined, } ); } #[test] fn reconstructs_declined_guardian_command_item() { let events = vec![ EventMsg::TurnStarted(TurnStartedEvent { turn_id: "turn-1".into(), started_at: None, model_context_window: None, collaboration_mode_kind: Default::default(), }), EventMsg::UserMessage(UserMessageEvent { message: "review this command".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::GuardianAssessment(GuardianAssessmentEvent { id: "review-guardian-exec".into(), target_item_id: Some("guardian-exec".into()), turn_id: "turn-1".into(), status: GuardianAssessmentStatus::InProgress, risk_level: None, user_authorization: None, rationale: None, decision_source: None, action: serde_json::from_value(serde_json::json!({ "type": "command", "source": "shell", "command": "rm -rf /tmp/guardian", "cwd": test_path_buf("/tmp"), })) .expect("guardian action"), }), EventMsg::GuardianAssessment(GuardianAssessmentEvent { id: "review-guardian-exec".into(), target_item_id: Some("guardian-exec".into()), turn_id: "turn-1".into(), status: 
GuardianAssessmentStatus::Denied, risk_level: Some(codex_protocol::protocol::GuardianRiskLevel::High), user_authorization: Some(codex_protocol::protocol::GuardianUserAuthorization::Low), rationale: Some("Would delete user data.".into()), decision_source: Some( codex_protocol::protocol::GuardianAssessmentDecisionSource::Agent, ), action: serde_json::from_value(serde_json::json!({ "type": "command", "source": "shell", "command": "rm -rf /tmp/guardian", "cwd": test_path_buf("/tmp"), })) .expect("guardian action"), }), ]; let items = events .into_iter() .map(RolloutItem::EventMsg) .collect::>(); let turns = build_turns_from_rollout_items(&items); assert_eq!(turns.len(), 1); assert_eq!(turns[0].items.len(), 2); assert_eq!( turns[0].items[1], ThreadItem::CommandExecution { id: "guardian-exec".into(), command: "rm -rf /tmp/guardian".into(), cwd: test_path_buf("/tmp").abs(), process_id: None, source: CommandExecutionSource::Agent, status: CommandExecutionStatus::Declined, command_actions: vec![CommandAction::Unknown { command: "rm -rf /tmp/guardian".into(), }], aggregated_output: None, exit_code: None, duration_ms: None, } ); } #[test] fn reconstructs_in_progress_guardian_execve_item() { let events = vec![ EventMsg::TurnStarted(TurnStartedEvent { turn_id: "turn-1".into(), started_at: None, model_context_window: None, collaboration_mode_kind: Default::default(), }), EventMsg::UserMessage(UserMessageEvent { message: "run a subcommand".into(), images: None, text_elements: Vec::new(), local_images: Vec::new(), }), EventMsg::GuardianAssessment(GuardianAssessmentEvent { id: "review-guardian-execve".into(), target_item_id: Some("guardian-execve".into()), turn_id: "turn-1".into(), status: GuardianAssessmentStatus::InProgress, risk_level: None, user_authorization: None, rationale: None, decision_source: None, action: serde_json::from_value(serde_json::json!({ "type": "execve", "source": "shell", "program": "/bin/rm", "argv": ["/usr/bin/rm", "-f", "/tmp/file.sqlite"], "cwd": 
test_path_buf("/tmp"),
                }))
                .expect("guardian action"),
            }),
        ];

        let items = events
            .into_iter()
            .map(RolloutItem::EventMsg)
            .collect::<Vec<_>>();
        let turns = build_turns_from_rollout_items(&items);
        assert_eq!(turns.len(), 1);
        assert_eq!(turns[0].items.len(), 2);
        assert_eq!(
            turns[0].items[1],
            ThreadItem::CommandExecution {
                id: "guardian-execve".into(),
                command: "/bin/rm -f /tmp/file.sqlite".into(),
                cwd: test_path_buf("/tmp").abs(),
                process_id: None,
                source: CommandExecutionSource::Agent,
                status: CommandExecutionStatus::InProgress,
                command_actions: vec![CommandAction::Unknown {
                    command: "/bin/rm -f /tmp/file.sqlite".into(),
                }],
                aggregated_output: None,
                exit_code: None,
                duration_ms: None,
            }
        );
    }

    /// An `ExecCommandEnd` tagged with `turn-a` that arrives while `turn-b` is
    /// active must be recorded on `turn-a`, leaving `turn-b` with only its
    /// user message.
    #[test]
    fn assigns_late_exec_completion_to_original_turn() {
        let events = vec![
            EventMsg::TurnStarted(TurnStartedEvent {
                turn_id: "turn-a".into(),
                started_at: None,
                model_context_window: None,
                collaboration_mode_kind: Default::default(),
            }),
            EventMsg::UserMessage(UserMessageEvent {
                message: "first".into(),
                images: None,
                text_elements: Vec::new(),
                local_images: Vec::new(),
            }),
            EventMsg::TurnComplete(TurnCompleteEvent {
                turn_id: "turn-a".into(),
                last_agent_message: None,
                completed_at: None,
                duration_ms: None,
            }),
            EventMsg::TurnStarted(TurnStartedEvent {
                turn_id: "turn-b".into(),
                started_at: None,
                model_context_window: None,
                collaboration_mode_kind: Default::default(),
            }),
            EventMsg::UserMessage(UserMessageEvent {
                message: "second".into(),
                images: None,
                text_elements: Vec::new(),
                local_images: Vec::new(),
            }),
            // Late completion: references the already-completed `turn-a`.
            EventMsg::ExecCommandEnd(ExecCommandEndEvent {
                call_id: "exec-late".into(),
                process_id: Some("pid-42".into()),
                turn_id: "turn-a".into(),
                command: vec!["echo".into(), "done".into()],
                cwd: test_path_buf("/tmp").abs(),
                parsed_cmd: vec![ParsedCommand::Unknown {
                    cmd: "echo done".into(),
                }],
                source: ExecCommandSource::Agent,
                interaction_input: None,
                stdout: "done\n".into(),
                stderr: String::new(),
                aggregated_output: "done\n".into(),
                exit_code: 0,
                duration: Duration::from_millis(5),
                formatted_output: "done\n".into(),
                status: CoreExecCommandStatus::Completed,
            }),
            EventMsg::TurnComplete(TurnCompleteEvent {
                turn_id: "turn-b".into(),
                last_agent_message: None,
                completed_at: None,
                duration_ms: None,
            }),
        ];

        let items = events
            .into_iter()
            .map(RolloutItem::EventMsg)
            .collect::<Vec<_>>();
        let turns = build_turns_from_rollout_items(&items);
        assert_eq!(turns.len(), 2);
        assert_eq!(turns[0].id, "turn-a");
        assert_eq!(turns[1].id, "turn-b");
        assert_eq!(turns[0].items.len(), 2);
        assert_eq!(turns[1].items.len(), 1);
        assert_eq!(
            turns[0].items[1],
            ThreadItem::CommandExecution {
                id: "exec-late".into(),
                command: "echo done".into(),
                cwd: test_path_buf("/tmp").abs(),
                process_id: Some("pid-42".into()),
                source: CommandExecutionSource::Agent,
                status: CommandExecutionStatus::Completed,
                command_actions: vec![CommandAction::Unknown {
                    command: "echo done".into(),
                }],
                aggregated_output: Some("done\n".into()),
                exit_code: Some(0),
                duration_ms: Some(5),
            }
        );
    }

    /// An `ExecCommandEnd` whose `turn_id` matches no turn seen so far must not
    /// add an item to any turn: both turns keep exactly one item.
    #[test]
    fn drops_late_turn_scoped_item_for_unknown_turn_id() {
        let events = vec![
            EventMsg::TurnStarted(TurnStartedEvent {
                turn_id: "turn-a".into(),
                started_at: None,
                model_context_window: None,
                collaboration_mode_kind: Default::default(),
            }),
            EventMsg::UserMessage(UserMessageEvent {
                message: "first".into(),
                images: None,
                text_elements: Vec::new(),
                local_images: Vec::new(),
            }),
            EventMsg::TurnComplete(TurnCompleteEvent {
                turn_id: "turn-a".into(),
                last_agent_message: None,
                completed_at: None,
                duration_ms: None,
            }),
            EventMsg::TurnStarted(TurnStartedEvent {
                turn_id: "turn-b".into(),
                started_at: None,
                model_context_window: None,
                collaboration_mode_kind: Default::default(),
            }),
            EventMsg::UserMessage(UserMessageEvent {
                message: "second".into(),
                images: None,
                text_elements: Vec::new(),
                local_images: Vec::new(),
            }),
            // `turn-missing` was never started in this event stream.
            EventMsg::ExecCommandEnd(ExecCommandEndEvent {
                call_id: "exec-unknown-turn".into(),
                process_id: Some("pid-42".into()),
                turn_id: "turn-missing".into(),
                command: vec!["echo".into(), "done".into()],
                cwd: test_path_buf("/tmp").abs(),
                parsed_cmd: vec![ParsedCommand::Unknown {
                    cmd: "echo done".into(),
                }],
                source: ExecCommandSource::Agent,
                interaction_input: None,
                stdout: "done\n".into(),
                stderr: String::new(),
                aggregated_output: "done\n".into(),
                exit_code: 0,
                duration: Duration::from_millis(5),
                formatted_output: "done\n".into(),
                status: CoreExecCommandStatus::Completed,
            }),
            EventMsg::TurnComplete(TurnCompleteEvent {
                turn_id: "turn-b".into(),
                last_agent_message: None,
                completed_at: None,
                duration_ms: None,
            }),
        ];

        let mut builder = ThreadHistoryBuilder::new();
        for event in &events {
            builder.handle_event(event);
        }
        let turns = builder.finish();
        assert_eq!(turns.len(), 2);
        assert_eq!(turns[0].id, "turn-a");
        assert_eq!(turns[1].id, "turn-b");
        assert_eq!(turns[0].items.len(), 1);
        assert_eq!(turns[1].items.len(), 1);
        assert_eq!(
            turns[1].items[0],
            ThreadItem::UserMessage {
                id: "item-2".into(),
                content: vec![UserInput::Text {
                    text: "second".into(),
                    text_elements: Vec::new(),
                }],
            }
        );
    }

    /// A `PatchApplyBegin` event must show up in the active turn snapshot as
    /// an in-progress `FileChange` item following the user message.
    #[test]
    fn patch_apply_begin_updates_active_turn_snapshot_with_file_change() {
        let turn_id = "turn-1";
        let mut builder = ThreadHistoryBuilder::new();
        let events = vec![
            EventMsg::TurnStarted(TurnStartedEvent {
                turn_id: turn_id.to_string(),
                started_at: None,
                model_context_window: None,
                collaboration_mode_kind: Default::default(),
            }),
            EventMsg::UserMessage(UserMessageEvent {
                message: "apply patch".into(),
                images: None,
                text_elements: Vec::new(),
                local_images: Vec::new(),
            }),
            EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
                call_id: "patch-call".into(),
                turn_id: turn_id.to_string(),
                auto_approved: false,
                changes: [(
                    PathBuf::from("README.md"),
                    codex_protocol::protocol::FileChange::Add {
                        content: "hello\n".into(),
                    },
                )]
                .into_iter()
                .collect(),
            }),
        ];

        for event in &events {
            builder.handle_event(event);
        }

        let snapshot = builder
            .active_turn_snapshot()
            .expect("active turn snapshot");
        assert_eq!(snapshot.id, turn_id);
        assert_eq!(snapshot.status, TurnStatus::InProgress);
        assert_eq!(
            snapshot.items,
            vec![
                ThreadItem::UserMessage {
                    id: "item-1".into(),
                    content: vec![UserInput::Text {
                        text: "apply patch".into(),
                        text_elements: Vec::new(),
                    }],
                },
                ThreadItem::FileChange {
                    id: "patch-call".into(),
                    changes: vec![FileUpdateChange {
                        path: "README.md".into(),
                        kind: PatchChangeKind::Add,
                        diff: "hello\n".into(),
                    }],
                    status: PatchApplyStatus::InProgress,
                },
            ]
        );
    }

    /// An `ApplyPatchApprovalRequest` event must produce the same in-progress
    /// `FileChange` item in the active turn snapshot as `PatchApplyBegin` does.
    #[test]
    fn apply_patch_approval_request_updates_active_turn_snapshot_with_file_change() {
        let turn_id = "turn-1";
        let mut builder = ThreadHistoryBuilder::new();
        let events = vec![
            EventMsg::TurnStarted(TurnStartedEvent {
                turn_id: turn_id.to_string(),
                started_at: None,
                model_context_window: None,
                collaboration_mode_kind: Default::default(),
            }),
            EventMsg::UserMessage(UserMessageEvent {
                message: "apply patch".into(),
                images: None,
                text_elements: Vec::new(),
                local_images: Vec::new(),
            }),
            EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
                call_id: "patch-call".into(),
                turn_id: turn_id.to_string(),
                changes: [(
                    PathBuf::from("README.md"),
                    codex_protocol::protocol::FileChange::Add {
                        content: "hello\n".into(),
                    },
                )]
                .into_iter()
                .collect(),
                reason: None,
                grant_root: None,
            }),
        ];

        for event in &events {
            builder.handle_event(event);
        }

        let snapshot = builder
            .active_turn_snapshot()
            .expect("active turn snapshot");
        assert_eq!(snapshot.id, turn_id);
        assert_eq!(snapshot.status, TurnStatus::InProgress);
        assert_eq!(
            snapshot.items,
            vec![
                ThreadItem::UserMessage {
                    id: "item-1".into(),
                    content: vec![UserInput::Text {
                        text: "apply patch".into(),
                        text_elements: Vec::new(),
                    }],
                },
                ThreadItem::FileChange {
                    id: "patch-call".into(),
                    changes: vec![FileUpdateChange {
                        path: "README.md".into(),
                        kind: PatchChangeKind::Add,
                        diff: "hello\n".into(),
                    }],
                    status: PatchApplyStatus::InProgress,
                },
            ]
        );
    }

    /// A duplicate `TurnComplete` for `turn-a` received while `turn-b` is
    /// active must not close `turn-b`: the following agent message still lands
    /// in `turn-b`.
    #[test]
    fn late_turn_complete_does_not_close_active_turn() {
        let events = vec![
            EventMsg::TurnStarted(TurnStartedEvent {
                turn_id: "turn-a".into(),
                started_at: None,
                model_context_window: None,
                collaboration_mode_kind: Default::default(),
            }),
            EventMsg::UserMessage(UserMessageEvent {
                message: "first".into(),
                images: None,
                text_elements: Vec::new(),
                local_images: Vec::new(),
            }),
            EventMsg::TurnComplete(TurnCompleteEvent {
                turn_id: "turn-a".into(),
                last_agent_message: None,
                completed_at: None,
                duration_ms: None,
            }),
            EventMsg::TurnStarted(TurnStartedEvent {
                turn_id: "turn-b".into(),
                started_at: None,
                model_context_window: None,
                collaboration_mode_kind: Default::default(),
            }),
            EventMsg::UserMessage(UserMessageEvent {
                message: "second".into(),
                images: None,
                text_elements: Vec::new(),
                local_images: Vec::new(),
            }),
            // Late completion for the turn that already finished.
            EventMsg::TurnComplete(TurnCompleteEvent {
                turn_id: "turn-a".into(),
                last_agent_message: None,
                completed_at: None,
                duration_ms: None,
            }),
            EventMsg::AgentMessage(AgentMessageEvent {
                message: "still in b".into(),
                phase: None,
                memory_citation: None,
            }),
            EventMsg::TurnComplete(TurnCompleteEvent {
                turn_id: "turn-b".into(),
                last_agent_message: None,
                completed_at: None,
                duration_ms: None,
            }),
        ];

        let items = events
            .into_iter()
            .map(RolloutItem::EventMsg)
            .collect::<Vec<_>>();
        let turns = build_turns_from_rollout_items(&items);
        assert_eq!(turns.len(), 2);
        assert_eq!(turns[0].id, "turn-a");
        assert_eq!(turns[1].id, "turn-b");
        assert_eq!(turns[1].items.len(), 2);
    }

    /// A `TurnAborted` naming `turn-a` received while `turn-b` is active must
    /// leave `turn-b` in progress and still collecting items.
    #[test]
    fn late_turn_aborted_does_not_interrupt_active_turn() {
        let events = vec![
            EventMsg::TurnStarted(TurnStartedEvent {
                turn_id: "turn-a".into(),
                started_at: None,
                model_context_window: None,
                collaboration_mode_kind: Default::default(),
            }),
            EventMsg::UserMessage(UserMessageEvent {
                message: "first".into(),
                images: None,
                text_elements: Vec::new(),
                local_images: Vec::new(),
            }),
            EventMsg::TurnComplete(TurnCompleteEvent {
                turn_id: "turn-a".into(),
                last_agent_message: None,
                completed_at: None,
                duration_ms: None,
            }),
            EventMsg::TurnStarted(TurnStartedEvent {
                turn_id: "turn-b".into(),
                started_at: None,
                model_context_window: None,
                collaboration_mode_kind: Default::default(),
            }),
            EventMsg::UserMessage(UserMessageEvent {
                message: "second".into(),
                images: None,
                text_elements: Vec::new(),
                local_images: Vec::new(),
            }),
            // Late abort for the turn that already finished.
            EventMsg::TurnAborted(TurnAbortedEvent {
                turn_id: Some("turn-a".into()),
                reason: TurnAbortReason::Replaced,
                completed_at: None,
                duration_ms: None,
            }),
            EventMsg::AgentMessage(AgentMessageEvent {
                message: "still in b".into(),
                phase: None,
                memory_citation: None,
            }),
        ];

        let items = events
            .into_iter()
            .map(RolloutItem::EventMsg)
            .collect::<Vec<_>>();
        let turns = build_turns_from_rollout_items(&items);
        assert_eq!(turns.len(), 2);
        assert_eq!(turns[0].id, "turn-a");
        assert_eq!(turns[1].id, "turn-b");
        assert_eq!(turns[1].status, TurnStatus::InProgress);
        assert_eq!(turns[1].items.len(), 2);
    }

    /// A turn whose only content is a `Compacted` rollout item must still be
    /// emitted, as a completed turn with no items.
    #[test]
    fn preserves_compaction_only_turn() {
        let items = vec![
            RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
                turn_id: "turn-compact".into(),
                started_at: None,
                model_context_window: None,
                collaboration_mode_kind: Default::default(),
            })),
            RolloutItem::Compacted(CompactedItem {
                message: String::new(),
                replacement_history: None,
            }),
            RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
                turn_id: "turn-compact".into(),
                last_agent_message: None,
                completed_at: None,
                duration_ms: None,
            })),
        ];

        let turns = build_turns_from_rollout_items(&items);
        assert_eq!(
            turns,
            vec![Turn {
                id: "turn-compact".into(),
                status: TurnStatus::Completed,
                error: None,
                started_at: None,
                completed_at: None,
                duration_ms: None,
                items: Vec::new(),
            }]
        );
    }

    /// A `CollabResumeEnd` event must be rebuilt as a completed `ResumeAgent`
    /// collab tool call carrying the receiver's agent state.
    #[test]
    fn reconstructs_collab_resume_end_item() {
        let events = vec![
            EventMsg::UserMessage(UserMessageEvent {
                message: "resume agent".into(),
                images: None,
                text_elements: Vec::new(),
                local_images: Vec::new(),
            }),
            EventMsg::CollabResumeEnd(codex_protocol::protocol::CollabResumeEndEvent {
                call_id: "resume-1".into(),
                sender_thread_id: ThreadId::try_from("00000000-0000-0000-0000-000000000001")
                    .expect("valid sender thread id"),
                receiver_thread_id: ThreadId::try_from("00000000-0000-0000-0000-000000000002")
                    .expect("valid receiver thread id"),
                receiver_agent_nickname: None,
                receiver_agent_role: None,
                status: AgentStatus::Completed(None),
            }),
        ];

        let items = events
            .into_iter()
            .map(RolloutItem::EventMsg)
            .collect::<Vec<_>>();
        let turns = build_turns_from_rollout_items(&items);
        assert_eq!(turns.len(), 1);
        assert_eq!(turns[0].items.len(), 2);
        assert_eq!(
            turns[0].items[1],
            ThreadItem::CollabAgentToolCall {
                id: "resume-1".into(),
                tool: CollabAgentTool::ResumeAgent,
                status: CollabAgentToolCallStatus::Completed,
                sender_thread_id: "00000000-0000-0000-0000-000000000001".into(),
                receiver_thread_ids: vec!["00000000-0000-0000-0000-000000000002".into()],
                prompt: None,
                model: None,
                reasoning_effort: None,
                agents_states: [(
                    "00000000-0000-0000-0000-000000000002".into(),
                    CollabAgentState {
                        status: crate::protocol::v2::CollabAgentStatus::Completed,
                        message: None,
                    },
                )]
                .into_iter()
                .collect(),
            }
        );
    }

    /// A `CollabAgentSpawnEnd` event must be rebuilt as a `SpawnAgent` collab
    /// tool call that keeps the prompt, model and reasoning effort.
    #[test]
    fn reconstructs_collab_spawn_end_item_with_model_metadata() {
        let sender_thread_id = ThreadId::try_from("00000000-0000-0000-0000-000000000001")
            .expect("valid sender thread id");
        let spawned_thread_id = ThreadId::try_from("00000000-0000-0000-0000-000000000002")
            .expect("valid receiver thread id");
        let events = vec![
            EventMsg::UserMessage(UserMessageEvent {
                message: "spawn agent".into(),
                images: None,
                text_elements: Vec::new(),
                local_images: Vec::new(),
            }),
            EventMsg::CollabAgentSpawnEnd(codex_protocol::protocol::CollabAgentSpawnEndEvent {
                call_id: "spawn-1".into(),
                sender_thread_id,
                new_thread_id: Some(spawned_thread_id),
                new_agent_nickname: Some("Scout".into()),
                new_agent_role: Some("explorer".into()),
                prompt: "inspect the repo".into(),
                model: "gpt-5.4-mini".into(),
                reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Medium,
                status: AgentStatus::Running,
            }),
        ];

        let items = events
            .into_iter()
            .map(RolloutItem::EventMsg)
            .collect::<Vec<_>>();
        let turns = build_turns_from_rollout_items(&items);
        assert_eq!(turns.len(), 1);
        assert_eq!(turns[0].items.len(), 2);
        assert_eq!(
            turns[0].items[1],
            ThreadItem::CollabAgentToolCall {
                id: "spawn-1".into(),
                tool: CollabAgentTool::SpawnAgent,
                status: CollabAgentToolCallStatus::Completed,
                sender_thread_id: "00000000-0000-0000-0000-000000000001".into(),
                receiver_thread_ids: vec!["00000000-0000-0000-0000-000000000002".into()],
                prompt: Some("inspect the repo".into()),
                model: Some("gpt-5.4-mini".into()),
                reasoning_effort: Some(codex_protocol::openai_models::ReasoningEffort::Medium),
                agents_states: [(
                    "00000000-0000-0000-0000-000000000002".into(),
                    CollabAgentState {
                        status: crate::protocol::v2::CollabAgentStatus::Running,
                        message: None,
                    },
                )]
                .into_iter()
                .collect(),
            }
        );
    }

    /// A `CollabAgentInteractionEnd` with `AgentStatus::Interrupted` must be
    /// rebuilt as a completed `SendInput` call whose agent state is
    /// `Interrupted`.
    #[test]
    fn reconstructs_interrupted_send_input_as_completed_collab_call() {
        // `send_input(interrupt=true)` first stops the child's active turn, then redirects it with
        // new input. The transient interrupted status should remain visible in agent state, but the
        // collab tool call itself is still a successful redirect rather than a failed operation.
        let sender = ThreadId::try_from("00000000-0000-0000-0000-000000000001")
            .expect("valid sender thread id");
        let receiver = ThreadId::try_from("00000000-0000-0000-0000-000000000002")
            .expect("valid receiver thread id");
        let events = vec![
            EventMsg::UserMessage(UserMessageEvent {
                message: "redirect".into(),
                images: None,
                text_elements: Vec::new(),
                local_images: Vec::new(),
            }),
            EventMsg::CollabAgentInteractionBegin(
                codex_protocol::protocol::CollabAgentInteractionBeginEvent {
                    call_id: "send-1".into(),
                    sender_thread_id: sender,
                    receiver_thread_id: receiver,
                    prompt: "new task".into(),
                },
            ),
            EventMsg::CollabAgentInteractionEnd(
                codex_protocol::protocol::CollabAgentInteractionEndEvent {
                    call_id: "send-1".into(),
                    sender_thread_id: sender,
                    receiver_thread_id: receiver,
                    receiver_agent_nickname: None,
                    receiver_agent_role: None,
                    prompt: "new task".into(),
                    status: AgentStatus::Interrupted,
                },
            ),
        ];

        let items = events
            .into_iter()
            .map(RolloutItem::EventMsg)
            .collect::<Vec<_>>();
        let turns = build_turns_from_rollout_items(&items);
        assert_eq!(turns.len(), 1);
        assert_eq!(turns[0].items.len(), 2);
        assert_eq!(
            turns[0].items[1],
            ThreadItem::CollabAgentToolCall {
                id: "send-1".into(),
                tool: CollabAgentTool::SendInput,
                status: CollabAgentToolCallStatus::Completed,
                sender_thread_id: sender.to_string(),
                receiver_thread_ids: vec![receiver.to_string()],
                prompt: Some("new task".into()),
                model: None,
                reasoning_effort: None,
                agents_states: [(
                    receiver.to_string(),
                    CollabAgentState {
                        status: crate::protocol::v2::CollabAgentStatus::Interrupted,
                        message: None,
                    },
                )]
                .into_iter()
                .collect(),
            }
        );
    }

    /// An `Error` event carrying `ThreadRollbackFailed` must leave the turn
    /// `Completed` with no error attached.
    #[test]
    fn rollback_failed_error_does_not_mark_turn_failed() {
        let events = vec![
            EventMsg::UserMessage(UserMessageEvent {
                message: "hello".into(),
                images: None,
                text_elements: Vec::new(),
                local_images: Vec::new(),
            }),
            EventMsg::AgentMessage(AgentMessageEvent {
                message: "done".into(),
                phase: None,
                memory_citation: None,
            }),
            EventMsg::Error(ErrorEvent {
                message: "rollback failed".into(),
                codex_error_info: Some(CodexErrorInfo::ThreadRollbackFailed),
            }),
        ];

        let items = events
            .into_iter()
            .map(RolloutItem::EventMsg)
            .collect::<Vec<_>>();
        let turns = build_turns_from_rollout_items(&items);
        assert_eq!(turns.len(), 1);
        assert_eq!(turns[0].status, TurnStatus::Completed);
        assert_eq!(turns[0].error, None);
    }

    /// An `Error` event arriving after the only turn has completed must not
    /// create a new turn and must not mark the completed turn as failed.
    #[test]
    fn out_of_turn_error_does_not_create_or_fail_a_turn() {
        let events = vec![
            EventMsg::TurnStarted(TurnStartedEvent {
                turn_id: "turn-a".into(),
                started_at: None,
                model_context_window: None,
                collaboration_mode_kind: Default::default(),
            }),
            EventMsg::UserMessage(UserMessageEvent {
                message: "hello".into(),
                images: None,
                text_elements: Vec::new(),
                local_images: Vec::new(),
            }),
            EventMsg::TurnComplete(TurnCompleteEvent {
                turn_id: "turn-a".into(),
                last_agent_message: None,
                completed_at: None,
                duration_ms: None,
            }),
            EventMsg::Error(ErrorEvent {
                message: "request-level failure".into(),
                codex_error_info: Some(CodexErrorInfo::BadRequest),
            }),
        ];

        let items = events
            .into_iter()
            .map(RolloutItem::EventMsg)
            .collect::<Vec<_>>();
        let turns = build_turns_from_rollout_items(&items);
        assert_eq!(turns.len(), 1);
        assert_eq!(
            turns[0],
            Turn {
                id: "turn-a".into(),
                status: TurnStatus::Completed,
                error: None,
                started_at: None,
                completed_at: None,
                duration_ms: None,
                items: vec![ThreadItem::UserMessage {
                    id: "item-1".into(),
                    content: vec![UserInput::Text {
                        text: "hello".into(),
                        text_elements: Vec::new(),
                    }],
                }],
            }
        );
    }

    /// An `Error` inside an active turn followed by `TurnComplete` for that
    /// turn must keep the `Failed` status and the recorded error.
    #[test]
    fn error_then_turn_complete_preserves_failed_status() {
        let events = vec![
            EventMsg::TurnStarted(TurnStartedEvent {
                turn_id: "turn-a".into(),
                started_at: None,
                model_context_window: None,
                collaboration_mode_kind: Default::default(),
            }),
            EventMsg::UserMessage(UserMessageEvent {
                message: "hello".into(),
                images: None,
                text_elements: Vec::new(),
                local_images: Vec::new(),
            }),
            EventMsg::Error(ErrorEvent {
                message: "stream failure".into(),
                codex_error_info: Some(CodexErrorInfo::ResponseStreamDisconnected {
                    http_status_code: Some(502),
                }),
            }),
            EventMsg::TurnComplete(TurnCompleteEvent {
                turn_id: "turn-a".into(),
                last_agent_message: None,
                completed_at: None,
                duration_ms: None,
            }),
        ];

        let items = events
            .into_iter()
            .map(RolloutItem::EventMsg)
            .collect::<Vec<_>>();
        let turns = build_turns_from_rollout_items(&items);
        assert_eq!(turns.len(), 1);
        assert_eq!(turns[0].id, "turn-a");
        assert_eq!(turns[0].status, TurnStatus::Failed);
        assert_eq!(
            turns[0].error,
            Some(TurnError {
                message: "stream failure".into(),
                codex_error_info: Some(
                    crate::protocol::v2::CodexErrorInfo::ResponseStreamDisconnected {
                        http_status_code: Some(502),
                    }
                ),
                additional_details: None,
            })
        );
    }

    /// A hook-prompt `ResponseItem` in the rollout must be rebuilt as a
    /// `HookPrompt` thread item with its fragments in their original order.
    #[test]
    fn rebuilds_hook_prompt_items_from_rollout_response_items() {
        let hook_prompt = build_hook_prompt_message(&[
            CoreHookPromptFragment::from_single_hook("Retry with tests.", "hook-run-1"),
            CoreHookPromptFragment::from_single_hook("Then summarize cleanly.", "hook-run-2"),
        ])
        .expect("hook prompt message");
        let items = vec![
            RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
                turn_id: "turn-a".into(),
                started_at: None,
                model_context_window: None,
                collaboration_mode_kind: Default::default(),
            })),
            RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
                message: "hello".into(),
                images: None,
                text_elements: Vec::new(),
                local_images: Vec::new(),
            })),
            RolloutItem::ResponseItem(hook_prompt),
            RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
                turn_id: "turn-a".into(),
                last_agent_message: None,
                completed_at: None,
                duration_ms: None,
            })),
        ];

        let turns = build_turns_from_rollout_items(&items);
        assert_eq!(turns.len(), 1);
        assert_eq!(turns[0].items.len(), 2);
        assert_eq!(
            turns[0].items[1],
            ThreadItem::HookPrompt {
                // The expected id is read back from the rebuilt item, so this
                // comparison only pins the fragments.
                id: turns[0].items[1].id().to_string(),
                fragments: vec![
                    crate::protocol::v2::HookPromptFragment {
                        text: "Retry with tests.".into(),
                        hook_run_id: "hook-run-1".into(),
                    },
                    crate::protocol::v2::HookPromptFragment {
                        text: "Then summarize cleanly.".into(),
                        hook_run_id: "hook-run-2".into(),
                    },
                ],
            }
        );
    }

    /// A plain user-role `ResponseItem::Message` in the rollout must not
    /// produce a thread item; the turn ends up empty.
    #[test]
    fn ignores_plain_user_response_items_in_rollout_replay() {
        let items = vec![
            RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
                turn_id: "turn-a".into(),
                started_at: None,
                model_context_window: None,
                collaboration_mode_kind: Default::default(),
            })),
            RolloutItem::ResponseItem(codex_protocol::models::ResponseItem::Message {
                id: Some("msg-1".into()),
                role: "user".into(),
                content: vec![codex_protocol::models::ContentItem::InputText {
                    text: "plain text".into(),
                }],
                end_turn: None,
                phase: None,
            }),
            RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
                turn_id: "turn-a".into(),
                last_agent_message: None,
                completed_at: None,
                duration_ms: None,
            })),
        ];

        let turns = build_turns_from_rollout_items(&items);
        assert_eq!(turns.len(), 1);
        assert!(turns[0].items.is_empty());
    }
}