mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
## Summary - add `ForkSnapshotMode` to `ThreadManager::fork_thread` so callers can request either a committed snapshot or an interrupted snapshot - share the model-visible `<turn_aborted>` history marker between the live interrupt path and interrupted forks - update the small set of direct fork callsites to pass `ForkSnapshotMode::Committed` Note: this enables /btw to work similarly as Esc to interrupt (hopefully somewhat in distribution) --------- Co-authored-by: Codex <noreply@openai.com>
2834 lines
103 KiB
Rust
2834 lines
103 KiB
Rust
use crate::protocol::v2::CollabAgentState;
|
|
use crate::protocol::v2::CollabAgentTool;
|
|
use crate::protocol::v2::CollabAgentToolCallStatus;
|
|
use crate::protocol::v2::CommandAction;
|
|
use crate::protocol::v2::CommandExecutionStatus;
|
|
use crate::protocol::v2::DynamicToolCallOutputContentItem;
|
|
use crate::protocol::v2::DynamicToolCallStatus;
|
|
use crate::protocol::v2::FileUpdateChange;
|
|
use crate::protocol::v2::McpToolCallError;
|
|
use crate::protocol::v2::McpToolCallResult;
|
|
use crate::protocol::v2::McpToolCallStatus;
|
|
use crate::protocol::v2::PatchApplyStatus;
|
|
use crate::protocol::v2::PatchChangeKind;
|
|
use crate::protocol::v2::ThreadItem;
|
|
use crate::protocol::v2::Turn;
|
|
use crate::protocol::v2::TurnError as V2TurnError;
|
|
use crate::protocol::v2::TurnError;
|
|
use crate::protocol::v2::TurnStatus;
|
|
use crate::protocol::v2::UserInput;
|
|
use crate::protocol::v2::WebSearchAction;
|
|
use codex_protocol::items::parse_hook_prompt_message;
|
|
use codex_protocol::models::MessagePhase;
|
|
use codex_protocol::protocol::AgentReasoningEvent;
|
|
use codex_protocol::protocol::AgentReasoningRawContentEvent;
|
|
use codex_protocol::protocol::AgentStatus;
|
|
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
|
|
use codex_protocol::protocol::CompactedItem;
|
|
use codex_protocol::protocol::ContextCompactedEvent;
|
|
use codex_protocol::protocol::DynamicToolCallResponseEvent;
|
|
use codex_protocol::protocol::ErrorEvent;
|
|
use codex_protocol::protocol::EventMsg;
|
|
use codex_protocol::protocol::ExecCommandBeginEvent;
|
|
use codex_protocol::protocol::ExecCommandEndEvent;
|
|
use codex_protocol::protocol::ImageGenerationBeginEvent;
|
|
use codex_protocol::protocol::ImageGenerationEndEvent;
|
|
use codex_protocol::protocol::ItemCompletedEvent;
|
|
use codex_protocol::protocol::ItemStartedEvent;
|
|
use codex_protocol::protocol::McpToolCallBeginEvent;
|
|
use codex_protocol::protocol::McpToolCallEndEvent;
|
|
use codex_protocol::protocol::PatchApplyBeginEvent;
|
|
use codex_protocol::protocol::PatchApplyEndEvent;
|
|
use codex_protocol::protocol::ReviewOutputEvent;
|
|
use codex_protocol::protocol::RolloutItem;
|
|
use codex_protocol::protocol::ThreadRolledBackEvent;
|
|
use codex_protocol::protocol::TurnAbortedEvent;
|
|
use codex_protocol::protocol::TurnCompleteEvent;
|
|
use codex_protocol::protocol::TurnStartedEvent;
|
|
use codex_protocol::protocol::UserMessageEvent;
|
|
use codex_protocol::protocol::ViewImageToolCallEvent;
|
|
use codex_protocol::protocol::WebSearchBeginEvent;
|
|
use codex_protocol::protocol::WebSearchEndEvent;
|
|
use std::collections::HashMap;
|
|
use tracing::warn;
|
|
use uuid::Uuid;
|
|
|
|
#[cfg(test)]
|
|
use codex_protocol::protocol::ExecCommandStatus as CoreExecCommandStatus;
|
|
#[cfg(test)]
|
|
use codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus;
|
|
|
|
/// Convert persisted [`RolloutItem`] entries into a sequence of [`Turn`] values.
|
|
///
|
|
/// When available, this uses `TurnContext.turn_id` as the canonical turn id so
|
|
/// resumed/rebuilt thread history preserves the original turn identifiers.
|
|
pub fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
|
|
let mut builder = ThreadHistoryBuilder::new();
|
|
for item in items {
|
|
builder.handle_rollout_item(item);
|
|
}
|
|
builder.finish()
|
|
}
|
|
|
|
pub struct ThreadHistoryBuilder {
|
|
turns: Vec<Turn>,
|
|
current_turn: Option<PendingTurn>,
|
|
next_item_index: i64,
|
|
current_rollout_index: usize,
|
|
next_rollout_index: usize,
|
|
}
|
|
|
|
impl Default for ThreadHistoryBuilder {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
impl ThreadHistoryBuilder {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
turns: Vec::new(),
|
|
current_turn: None,
|
|
next_item_index: 1,
|
|
current_rollout_index: 0,
|
|
next_rollout_index: 0,
|
|
}
|
|
}
|
|
|
|
pub fn reset(&mut self) {
|
|
*self = Self::new();
|
|
}
|
|
|
|
pub fn finish(mut self) -> Vec<Turn> {
|
|
self.finish_current_turn();
|
|
self.turns
|
|
}
|
|
|
|
pub fn active_turn_snapshot(&self) -> Option<Turn> {
|
|
self.current_turn
|
|
.as_ref()
|
|
.map(Turn::from)
|
|
.or_else(|| self.turns.last().cloned())
|
|
}
|
|
|
|
pub fn has_active_turn(&self) -> bool {
|
|
self.current_turn.is_some()
|
|
}
|
|
|
|
pub fn active_turn_id_if_explicit(&self) -> Option<String> {
|
|
self.current_turn
|
|
.as_ref()
|
|
.filter(|turn| turn.opened_explicitly)
|
|
.map(|turn| turn.id.clone())
|
|
}
|
|
|
|
pub fn active_turn_start_index(&self) -> Option<usize> {
|
|
self.current_turn
|
|
.as_ref()
|
|
.map(|turn| turn.rollout_start_index)
|
|
}
|
|
|
|
/// Shared reducer for persisted rollout replay and in-memory current-turn
|
|
/// tracking used by running thread resume/rejoin.
|
|
///
|
|
/// This function should handle all EventMsg variants that can be persisted in a rollout file.
|
|
/// See `should_persist_event_msg` in `codex-rs/core/rollout/policy.rs`.
|
|
pub fn handle_event(&mut self, event: &EventMsg) {
|
|
match event {
|
|
EventMsg::UserMessage(payload) => self.handle_user_message(payload),
|
|
EventMsg::AgentMessage(payload) => self.handle_agent_message(
|
|
payload.message.clone(),
|
|
payload.phase.clone(),
|
|
payload.memory_citation.clone().map(Into::into),
|
|
),
|
|
EventMsg::AgentReasoning(payload) => self.handle_agent_reasoning(payload),
|
|
EventMsg::AgentReasoningRawContent(payload) => {
|
|
self.handle_agent_reasoning_raw_content(payload)
|
|
}
|
|
EventMsg::WebSearchBegin(payload) => self.handle_web_search_begin(payload),
|
|
EventMsg::WebSearchEnd(payload) => self.handle_web_search_end(payload),
|
|
EventMsg::ExecCommandBegin(payload) => self.handle_exec_command_begin(payload),
|
|
EventMsg::ExecCommandEnd(payload) => self.handle_exec_command_end(payload),
|
|
EventMsg::ApplyPatchApprovalRequest(payload) => {
|
|
self.handle_apply_patch_approval_request(payload)
|
|
}
|
|
EventMsg::PatchApplyBegin(payload) => self.handle_patch_apply_begin(payload),
|
|
EventMsg::PatchApplyEnd(payload) => self.handle_patch_apply_end(payload),
|
|
EventMsg::DynamicToolCallRequest(payload) => {
|
|
self.handle_dynamic_tool_call_request(payload)
|
|
}
|
|
EventMsg::DynamicToolCallResponse(payload) => {
|
|
self.handle_dynamic_tool_call_response(payload)
|
|
}
|
|
EventMsg::McpToolCallBegin(payload) => self.handle_mcp_tool_call_begin(payload),
|
|
EventMsg::McpToolCallEnd(payload) => self.handle_mcp_tool_call_end(payload),
|
|
EventMsg::ViewImageToolCall(payload) => self.handle_view_image_tool_call(payload),
|
|
EventMsg::ImageGenerationBegin(payload) => self.handle_image_generation_begin(payload),
|
|
EventMsg::ImageGenerationEnd(payload) => self.handle_image_generation_end(payload),
|
|
EventMsg::CollabAgentSpawnBegin(payload) => {
|
|
self.handle_collab_agent_spawn_begin(payload)
|
|
}
|
|
EventMsg::CollabAgentSpawnEnd(payload) => self.handle_collab_agent_spawn_end(payload),
|
|
EventMsg::CollabAgentInteractionBegin(payload) => {
|
|
self.handle_collab_agent_interaction_begin(payload)
|
|
}
|
|
EventMsg::CollabAgentInteractionEnd(payload) => {
|
|
self.handle_collab_agent_interaction_end(payload)
|
|
}
|
|
EventMsg::CollabWaitingBegin(payload) => self.handle_collab_waiting_begin(payload),
|
|
EventMsg::CollabWaitingEnd(payload) => self.handle_collab_waiting_end(payload),
|
|
EventMsg::CollabCloseBegin(payload) => self.handle_collab_close_begin(payload),
|
|
EventMsg::CollabCloseEnd(payload) => self.handle_collab_close_end(payload),
|
|
EventMsg::CollabResumeBegin(payload) => self.handle_collab_resume_begin(payload),
|
|
EventMsg::CollabResumeEnd(payload) => self.handle_collab_resume_end(payload),
|
|
EventMsg::ContextCompacted(payload) => self.handle_context_compacted(payload),
|
|
EventMsg::EnteredReviewMode(payload) => self.handle_entered_review_mode(payload),
|
|
EventMsg::ExitedReviewMode(payload) => self.handle_exited_review_mode(payload),
|
|
EventMsg::ItemStarted(payload) => self.handle_item_started(payload),
|
|
EventMsg::ItemCompleted(payload) => self.handle_item_completed(payload),
|
|
EventMsg::HookStarted(_) | EventMsg::HookCompleted(_) => {}
|
|
EventMsg::Error(payload) => self.handle_error(payload),
|
|
EventMsg::TokenCount(_) => {}
|
|
EventMsg::ThreadRolledBack(payload) => self.handle_thread_rollback(payload),
|
|
EventMsg::UndoCompleted(_) => {}
|
|
EventMsg::TurnAborted(payload) => self.handle_turn_aborted(payload),
|
|
EventMsg::TurnStarted(payload) => self.handle_turn_started(payload),
|
|
EventMsg::TurnComplete(payload) => self.handle_turn_complete(payload),
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
pub fn handle_rollout_item(&mut self, item: &RolloutItem) {
|
|
self.current_rollout_index = self.next_rollout_index;
|
|
self.next_rollout_index += 1;
|
|
match item {
|
|
RolloutItem::EventMsg(event) => self.handle_event(event),
|
|
RolloutItem::Compacted(payload) => self.handle_compacted(payload),
|
|
RolloutItem::ResponseItem(item) => self.handle_response_item(item),
|
|
RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {}
|
|
}
|
|
}
|
|
|
|
fn handle_response_item(&mut self, item: &codex_protocol::models::ResponseItem) {
|
|
let codex_protocol::models::ResponseItem::Message {
|
|
role, content, id, ..
|
|
} = item
|
|
else {
|
|
return;
|
|
};
|
|
|
|
if role != "user" {
|
|
return;
|
|
}
|
|
|
|
let Some(hook_prompt) = parse_hook_prompt_message(id.as_ref(), content) else {
|
|
return;
|
|
};
|
|
|
|
self.ensure_turn().items.push(ThreadItem::HookPrompt {
|
|
id: hook_prompt.id,
|
|
fragments: hook_prompt
|
|
.fragments
|
|
.into_iter()
|
|
.map(crate::protocol::v2::HookPromptFragment::from)
|
|
.collect(),
|
|
});
|
|
}
|
|
|
|
fn handle_user_message(&mut self, payload: &UserMessageEvent) {
|
|
// User messages should stay in explicitly opened turns. For backward
|
|
// compatibility with older streams that did not open turns explicitly,
|
|
// close any implicit/inactive turn and start a fresh one for this input.
|
|
if let Some(turn) = self.current_turn.as_ref()
|
|
&& !turn.opened_explicitly
|
|
&& !(turn.saw_compaction && turn.items.is_empty())
|
|
{
|
|
self.finish_current_turn();
|
|
}
|
|
let mut turn = self
|
|
.current_turn
|
|
.take()
|
|
.unwrap_or_else(|| self.new_turn(/*id*/ None));
|
|
let id = self.next_item_id();
|
|
let content = self.build_user_inputs(payload);
|
|
turn.items.push(ThreadItem::UserMessage { id, content });
|
|
self.current_turn = Some(turn);
|
|
}
|
|
|
|
fn handle_agent_message(
|
|
&mut self,
|
|
text: String,
|
|
phase: Option<MessagePhase>,
|
|
memory_citation: Option<crate::protocol::v2::MemoryCitation>,
|
|
) {
|
|
if text.is_empty() {
|
|
return;
|
|
}
|
|
|
|
let id = self.next_item_id();
|
|
self.ensure_turn().items.push(ThreadItem::AgentMessage {
|
|
id,
|
|
text,
|
|
phase,
|
|
memory_citation,
|
|
});
|
|
}
|
|
|
|
fn handle_agent_reasoning(&mut self, payload: &AgentReasoningEvent) {
|
|
if payload.text.is_empty() {
|
|
return;
|
|
}
|
|
|
|
// If the last item is a reasoning item, add the new text to the summary.
|
|
if let Some(ThreadItem::Reasoning { summary, .. }) = self.ensure_turn().items.last_mut() {
|
|
summary.push(payload.text.clone());
|
|
return;
|
|
}
|
|
|
|
// Otherwise, create a new reasoning item.
|
|
let id = self.next_item_id();
|
|
self.ensure_turn().items.push(ThreadItem::Reasoning {
|
|
id,
|
|
summary: vec![payload.text.clone()],
|
|
content: Vec::new(),
|
|
});
|
|
}
|
|
|
|
fn handle_agent_reasoning_raw_content(&mut self, payload: &AgentReasoningRawContentEvent) {
|
|
if payload.text.is_empty() {
|
|
return;
|
|
}
|
|
|
|
// If the last item is a reasoning item, add the new text to the content.
|
|
if let Some(ThreadItem::Reasoning { content, .. }) = self.ensure_turn().items.last_mut() {
|
|
content.push(payload.text.clone());
|
|
return;
|
|
}
|
|
|
|
// Otherwise, create a new reasoning item.
|
|
let id = self.next_item_id();
|
|
self.ensure_turn().items.push(ThreadItem::Reasoning {
|
|
id,
|
|
summary: Vec::new(),
|
|
content: vec![payload.text.clone()],
|
|
});
|
|
}
|
|
|
|
fn handle_item_started(&mut self, payload: &ItemStartedEvent) {
|
|
match &payload.item {
|
|
codex_protocol::items::TurnItem::Plan(plan) => {
|
|
if plan.text.is_empty() {
|
|
return;
|
|
}
|
|
self.upsert_item_in_turn_id(
|
|
&payload.turn_id,
|
|
ThreadItem::from(payload.item.clone()),
|
|
);
|
|
}
|
|
codex_protocol::items::TurnItem::UserMessage(_)
|
|
| codex_protocol::items::TurnItem::HookPrompt(_)
|
|
| codex_protocol::items::TurnItem::AgentMessage(_)
|
|
| codex_protocol::items::TurnItem::Reasoning(_)
|
|
| codex_protocol::items::TurnItem::WebSearch(_)
|
|
| codex_protocol::items::TurnItem::ImageGeneration(_)
|
|
| codex_protocol::items::TurnItem::ContextCompaction(_) => {}
|
|
}
|
|
}
|
|
|
|
fn handle_item_completed(&mut self, payload: &ItemCompletedEvent) {
|
|
match &payload.item {
|
|
codex_protocol::items::TurnItem::Plan(plan) => {
|
|
if plan.text.is_empty() {
|
|
return;
|
|
}
|
|
self.upsert_item_in_turn_id(
|
|
&payload.turn_id,
|
|
ThreadItem::from(payload.item.clone()),
|
|
);
|
|
}
|
|
codex_protocol::items::TurnItem::UserMessage(_)
|
|
| codex_protocol::items::TurnItem::HookPrompt(_)
|
|
| codex_protocol::items::TurnItem::AgentMessage(_)
|
|
| codex_protocol::items::TurnItem::Reasoning(_)
|
|
| codex_protocol::items::TurnItem::WebSearch(_)
|
|
| codex_protocol::items::TurnItem::ImageGeneration(_)
|
|
| codex_protocol::items::TurnItem::ContextCompaction(_) => {}
|
|
}
|
|
}
|
|
|
|
fn handle_web_search_begin(&mut self, payload: &WebSearchBeginEvent) {
|
|
let item = ThreadItem::WebSearch {
|
|
id: payload.call_id.clone(),
|
|
query: String::new(),
|
|
action: None,
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_web_search_end(&mut self, payload: &WebSearchEndEvent) {
|
|
let item = ThreadItem::WebSearch {
|
|
id: payload.call_id.clone(),
|
|
query: payload.query.clone(),
|
|
action: Some(WebSearchAction::from(payload.action.clone())),
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_exec_command_begin(&mut self, payload: &ExecCommandBeginEvent) {
|
|
let command = shlex::try_join(payload.command.iter().map(String::as_str))
|
|
.unwrap_or_else(|_| payload.command.join(" "));
|
|
let command_actions = payload
|
|
.parsed_cmd
|
|
.iter()
|
|
.cloned()
|
|
.map(CommandAction::from)
|
|
.collect();
|
|
let item = ThreadItem::CommandExecution {
|
|
id: payload.call_id.clone(),
|
|
command,
|
|
cwd: payload.cwd.clone(),
|
|
process_id: payload.process_id.clone(),
|
|
source: payload.source.into(),
|
|
status: CommandExecutionStatus::InProgress,
|
|
command_actions,
|
|
aggregated_output: None,
|
|
exit_code: None,
|
|
duration_ms: None,
|
|
};
|
|
self.upsert_item_in_turn_id(&payload.turn_id, item);
|
|
}
|
|
|
|
fn handle_exec_command_end(&mut self, payload: &ExecCommandEndEvent) {
|
|
let status: CommandExecutionStatus = (&payload.status).into();
|
|
let duration_ms = i64::try_from(payload.duration.as_millis()).unwrap_or(i64::MAX);
|
|
let aggregated_output = if payload.aggregated_output.is_empty() {
|
|
None
|
|
} else {
|
|
Some(payload.aggregated_output.clone())
|
|
};
|
|
let command = shlex::try_join(payload.command.iter().map(String::as_str))
|
|
.unwrap_or_else(|_| payload.command.join(" "));
|
|
let command_actions = payload
|
|
.parsed_cmd
|
|
.iter()
|
|
.cloned()
|
|
.map(CommandAction::from)
|
|
.collect();
|
|
let item = ThreadItem::CommandExecution {
|
|
id: payload.call_id.clone(),
|
|
command,
|
|
cwd: payload.cwd.clone(),
|
|
process_id: payload.process_id.clone(),
|
|
source: payload.source.into(),
|
|
status,
|
|
command_actions,
|
|
aggregated_output,
|
|
exit_code: Some(payload.exit_code),
|
|
duration_ms: Some(duration_ms),
|
|
};
|
|
// Command completions can arrive out of order. Unified exec may return
|
|
// while a PTY is still running, then emit ExecCommandEnd later from a
|
|
// background exit watcher when that process finally exits. By then, a
|
|
// newer user turn may already have started. Route by event turn_id so
|
|
// replay preserves the original turn association.
|
|
self.upsert_item_in_turn_id(&payload.turn_id, item);
|
|
}
|
|
|
|
fn handle_apply_patch_approval_request(&mut self, payload: &ApplyPatchApprovalRequestEvent) {
|
|
let item = ThreadItem::FileChange {
|
|
id: payload.call_id.clone(),
|
|
changes: convert_patch_changes(&payload.changes),
|
|
status: PatchApplyStatus::InProgress,
|
|
};
|
|
if payload.turn_id.is_empty() {
|
|
self.upsert_item_in_current_turn(item);
|
|
} else {
|
|
self.upsert_item_in_turn_id(&payload.turn_id, item);
|
|
}
|
|
}
|
|
|
|
fn handle_patch_apply_begin(&mut self, payload: &PatchApplyBeginEvent) {
|
|
let item = ThreadItem::FileChange {
|
|
id: payload.call_id.clone(),
|
|
changes: convert_patch_changes(&payload.changes),
|
|
status: PatchApplyStatus::InProgress,
|
|
};
|
|
if payload.turn_id.is_empty() {
|
|
self.upsert_item_in_current_turn(item);
|
|
} else {
|
|
self.upsert_item_in_turn_id(&payload.turn_id, item);
|
|
}
|
|
}
|
|
|
|
fn handle_patch_apply_end(&mut self, payload: &PatchApplyEndEvent) {
|
|
let status: PatchApplyStatus = (&payload.status).into();
|
|
let item = ThreadItem::FileChange {
|
|
id: payload.call_id.clone(),
|
|
changes: convert_patch_changes(&payload.changes),
|
|
status,
|
|
};
|
|
if payload.turn_id.is_empty() {
|
|
self.upsert_item_in_current_turn(item);
|
|
} else {
|
|
self.upsert_item_in_turn_id(&payload.turn_id, item);
|
|
}
|
|
}
|
|
|
|
fn handle_dynamic_tool_call_request(
|
|
&mut self,
|
|
payload: &codex_protocol::dynamic_tools::DynamicToolCallRequest,
|
|
) {
|
|
let item = ThreadItem::DynamicToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: payload.tool.clone(),
|
|
arguments: payload.arguments.clone(),
|
|
status: DynamicToolCallStatus::InProgress,
|
|
content_items: None,
|
|
success: None,
|
|
duration_ms: None,
|
|
};
|
|
if payload.turn_id.is_empty() {
|
|
self.upsert_item_in_current_turn(item);
|
|
} else {
|
|
self.upsert_item_in_turn_id(&payload.turn_id, item);
|
|
}
|
|
}
|
|
|
|
fn handle_dynamic_tool_call_response(&mut self, payload: &DynamicToolCallResponseEvent) {
|
|
let status = if payload.success {
|
|
DynamicToolCallStatus::Completed
|
|
} else {
|
|
DynamicToolCallStatus::Failed
|
|
};
|
|
let duration_ms = i64::try_from(payload.duration.as_millis()).ok();
|
|
let item = ThreadItem::DynamicToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: payload.tool.clone(),
|
|
arguments: payload.arguments.clone(),
|
|
status,
|
|
content_items: Some(convert_dynamic_tool_content_items(&payload.content_items)),
|
|
success: Some(payload.success),
|
|
duration_ms,
|
|
};
|
|
if payload.turn_id.is_empty() {
|
|
self.upsert_item_in_current_turn(item);
|
|
} else {
|
|
self.upsert_item_in_turn_id(&payload.turn_id, item);
|
|
}
|
|
}
|
|
|
|
fn handle_mcp_tool_call_begin(&mut self, payload: &McpToolCallBeginEvent) {
|
|
let item = ThreadItem::McpToolCall {
|
|
id: payload.call_id.clone(),
|
|
server: payload.invocation.server.clone(),
|
|
tool: payload.invocation.tool.clone(),
|
|
status: McpToolCallStatus::InProgress,
|
|
arguments: payload
|
|
.invocation
|
|
.arguments
|
|
.clone()
|
|
.unwrap_or(serde_json::Value::Null),
|
|
result: None,
|
|
error: None,
|
|
duration_ms: None,
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_mcp_tool_call_end(&mut self, payload: &McpToolCallEndEvent) {
|
|
let status = if payload.is_success() {
|
|
McpToolCallStatus::Completed
|
|
} else {
|
|
McpToolCallStatus::Failed
|
|
};
|
|
let duration_ms = i64::try_from(payload.duration.as_millis()).ok();
|
|
let (result, error) = match &payload.result {
|
|
Ok(value) => (
|
|
Some(McpToolCallResult {
|
|
content: value.content.clone(),
|
|
structured_content: value.structured_content.clone(),
|
|
}),
|
|
None,
|
|
),
|
|
Err(message) => (
|
|
None,
|
|
Some(McpToolCallError {
|
|
message: message.clone(),
|
|
}),
|
|
),
|
|
};
|
|
let item = ThreadItem::McpToolCall {
|
|
id: payload.call_id.clone(),
|
|
server: payload.invocation.server.clone(),
|
|
tool: payload.invocation.tool.clone(),
|
|
status,
|
|
arguments: payload
|
|
.invocation
|
|
.arguments
|
|
.clone()
|
|
.unwrap_or(serde_json::Value::Null),
|
|
result,
|
|
error,
|
|
duration_ms,
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_view_image_tool_call(&mut self, payload: &ViewImageToolCallEvent) {
|
|
let item = ThreadItem::ImageView {
|
|
id: payload.call_id.clone(),
|
|
path: payload.path.to_string_lossy().into_owned(),
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_image_generation_begin(&mut self, payload: &ImageGenerationBeginEvent) {
|
|
let item = ThreadItem::ImageGeneration {
|
|
id: payload.call_id.clone(),
|
|
status: String::new(),
|
|
revised_prompt: None,
|
|
result: String::new(),
|
|
saved_path: None,
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_image_generation_end(&mut self, payload: &ImageGenerationEndEvent) {
|
|
let item = ThreadItem::ImageGeneration {
|
|
id: payload.call_id.clone(),
|
|
status: payload.status.clone(),
|
|
revised_prompt: payload.revised_prompt.clone(),
|
|
result: payload.result.clone(),
|
|
saved_path: payload.saved_path.clone(),
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_collab_agent_spawn_begin(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabAgentSpawnBeginEvent,
|
|
) {
|
|
let item = ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::SpawnAgent,
|
|
status: CollabAgentToolCallStatus::InProgress,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids: Vec::new(),
|
|
prompt: Some(payload.prompt.clone()),
|
|
model: Some(payload.model.clone()),
|
|
reasoning_effort: Some(payload.reasoning_effort),
|
|
agents_states: HashMap::new(),
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_collab_agent_spawn_end(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabAgentSpawnEndEvent,
|
|
) {
|
|
let has_receiver = payload.new_thread_id.is_some();
|
|
let status = match &payload.status {
|
|
AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed,
|
|
_ if has_receiver => CollabAgentToolCallStatus::Completed,
|
|
_ => CollabAgentToolCallStatus::Failed,
|
|
};
|
|
let (receiver_thread_ids, agents_states) = match &payload.new_thread_id {
|
|
Some(id) => {
|
|
let receiver_id = id.to_string();
|
|
let received_status = CollabAgentState::from(payload.status.clone());
|
|
(
|
|
vec![receiver_id.clone()],
|
|
[(receiver_id, received_status)].into_iter().collect(),
|
|
)
|
|
}
|
|
None => (Vec::new(), HashMap::new()),
|
|
};
|
|
self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::SpawnAgent,
|
|
status,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids,
|
|
prompt: Some(payload.prompt.clone()),
|
|
model: Some(payload.model.clone()),
|
|
reasoning_effort: Some(payload.reasoning_effort),
|
|
agents_states,
|
|
});
|
|
}
|
|
|
|
fn handle_collab_agent_interaction_begin(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabAgentInteractionBeginEvent,
|
|
) {
|
|
let item = ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::SendInput,
|
|
status: CollabAgentToolCallStatus::InProgress,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids: vec![payload.receiver_thread_id.to_string()],
|
|
prompt: Some(payload.prompt.clone()),
|
|
model: None,
|
|
reasoning_effort: None,
|
|
agents_states: HashMap::new(),
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_collab_agent_interaction_end(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabAgentInteractionEndEvent,
|
|
) {
|
|
let status = match &payload.status {
|
|
AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed,
|
|
_ => CollabAgentToolCallStatus::Completed,
|
|
};
|
|
let receiver_id = payload.receiver_thread_id.to_string();
|
|
let received_status = CollabAgentState::from(payload.status.clone());
|
|
self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::SendInput,
|
|
status,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids: vec![receiver_id.clone()],
|
|
prompt: Some(payload.prompt.clone()),
|
|
model: None,
|
|
reasoning_effort: None,
|
|
agents_states: [(receiver_id, received_status)].into_iter().collect(),
|
|
});
|
|
}
|
|
|
|
fn handle_collab_waiting_begin(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabWaitingBeginEvent,
|
|
) {
|
|
let item = ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::Wait,
|
|
status: CollabAgentToolCallStatus::InProgress,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids: payload
|
|
.receiver_thread_ids
|
|
.iter()
|
|
.map(ToString::to_string)
|
|
.collect(),
|
|
prompt: None,
|
|
model: None,
|
|
reasoning_effort: None,
|
|
agents_states: HashMap::new(),
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_collab_waiting_end(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabWaitingEndEvent,
|
|
) {
|
|
let status = if payload
|
|
.statuses
|
|
.values()
|
|
.any(|status| matches!(status, AgentStatus::Errored(_) | AgentStatus::NotFound))
|
|
{
|
|
CollabAgentToolCallStatus::Failed
|
|
} else {
|
|
CollabAgentToolCallStatus::Completed
|
|
};
|
|
let mut receiver_thread_ids: Vec<String> =
|
|
payload.statuses.keys().map(ToString::to_string).collect();
|
|
receiver_thread_ids.sort();
|
|
let agents_states = payload
|
|
.statuses
|
|
.iter()
|
|
.map(|(id, status)| (id.to_string(), CollabAgentState::from(status.clone())))
|
|
.collect();
|
|
self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::Wait,
|
|
status,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids,
|
|
prompt: None,
|
|
model: None,
|
|
reasoning_effort: None,
|
|
agents_states,
|
|
});
|
|
}
|
|
|
|
fn handle_collab_close_begin(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabCloseBeginEvent,
|
|
) {
|
|
let item = ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::CloseAgent,
|
|
status: CollabAgentToolCallStatus::InProgress,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids: vec![payload.receiver_thread_id.to_string()],
|
|
prompt: None,
|
|
model: None,
|
|
reasoning_effort: None,
|
|
agents_states: HashMap::new(),
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_collab_close_end(&mut self, payload: &codex_protocol::protocol::CollabCloseEndEvent) {
|
|
let status = match &payload.status {
|
|
AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed,
|
|
_ => CollabAgentToolCallStatus::Completed,
|
|
};
|
|
let receiver_id = payload.receiver_thread_id.to_string();
|
|
let agents_states = [(
|
|
receiver_id.clone(),
|
|
CollabAgentState::from(payload.status.clone()),
|
|
)]
|
|
.into_iter()
|
|
.collect();
|
|
self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::CloseAgent,
|
|
status,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids: vec![receiver_id],
|
|
prompt: None,
|
|
model: None,
|
|
reasoning_effort: None,
|
|
agents_states,
|
|
});
|
|
}
|
|
|
|
fn handle_collab_resume_begin(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabResumeBeginEvent,
|
|
) {
|
|
let item = ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::ResumeAgent,
|
|
status: CollabAgentToolCallStatus::InProgress,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids: vec![payload.receiver_thread_id.to_string()],
|
|
prompt: None,
|
|
model: None,
|
|
reasoning_effort: None,
|
|
agents_states: HashMap::new(),
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_collab_resume_end(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabResumeEndEvent,
|
|
) {
|
|
let status = match &payload.status {
|
|
AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed,
|
|
_ => CollabAgentToolCallStatus::Completed,
|
|
};
|
|
let receiver_id = payload.receiver_thread_id.to_string();
|
|
let agents_states = [(
|
|
receiver_id.clone(),
|
|
CollabAgentState::from(payload.status.clone()),
|
|
)]
|
|
.into_iter()
|
|
.collect();
|
|
self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::ResumeAgent,
|
|
status,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids: vec![receiver_id],
|
|
prompt: None,
|
|
model: None,
|
|
reasoning_effort: None,
|
|
agents_states,
|
|
});
|
|
}
|
|
|
|
fn handle_context_compacted(&mut self, _payload: &ContextCompactedEvent) {
|
|
let id = self.next_item_id();
|
|
self.ensure_turn()
|
|
.items
|
|
.push(ThreadItem::ContextCompaction { id });
|
|
}
|
|
|
|
fn handle_entered_review_mode(&mut self, payload: &codex_protocol::protocol::ReviewRequest) {
|
|
let review = payload
|
|
.user_facing_hint
|
|
.clone()
|
|
.unwrap_or_else(|| "Review requested.".to_string());
|
|
let id = self.next_item_id();
|
|
self.ensure_turn()
|
|
.items
|
|
.push(ThreadItem::EnteredReviewMode { id, review });
|
|
}
|
|
|
|
fn handle_exited_review_mode(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::ExitedReviewModeEvent,
|
|
) {
|
|
let review = payload
|
|
.review_output
|
|
.as_ref()
|
|
.map(render_review_output_text)
|
|
.unwrap_or_else(|| REVIEW_FALLBACK_MESSAGE.to_string());
|
|
let id = self.next_item_id();
|
|
self.ensure_turn()
|
|
.items
|
|
.push(ThreadItem::ExitedReviewMode { id, review });
|
|
}
|
|
|
|
fn handle_error(&mut self, payload: &ErrorEvent) {
|
|
if !payload.affects_turn_status() {
|
|
return;
|
|
}
|
|
let Some(turn) = self.current_turn.as_mut() else {
|
|
return;
|
|
};
|
|
turn.status = TurnStatus::Failed;
|
|
turn.error = Some(V2TurnError {
|
|
message: payload.message.clone(),
|
|
codex_error_info: payload.codex_error_info.clone().map(Into::into),
|
|
additional_details: None,
|
|
});
|
|
}
|
|
|
|
fn handle_turn_aborted(&mut self, payload: &TurnAbortedEvent) {
|
|
if let Some(turn_id) = payload.turn_id.as_deref() {
|
|
// Prefer an exact ID match so we interrupt the turn explicitly targeted by the event.
|
|
if let Some(turn) = self.current_turn.as_mut().filter(|turn| turn.id == turn_id) {
|
|
turn.status = TurnStatus::Interrupted;
|
|
return;
|
|
}
|
|
|
|
if let Some(turn) = self.turns.iter_mut().find(|turn| turn.id == turn_id) {
|
|
turn.status = TurnStatus::Interrupted;
|
|
return;
|
|
}
|
|
}
|
|
|
|
// If the event has no ID (or refers to an unknown turn), fall back to the active turn.
|
|
if let Some(turn) = self.current_turn.as_mut() {
|
|
turn.status = TurnStatus::Interrupted;
|
|
}
|
|
}
|
|
|
|
fn handle_turn_started(&mut self, payload: &TurnStartedEvent) {
|
|
self.finish_current_turn();
|
|
self.current_turn = Some(
|
|
self.new_turn(Some(payload.turn_id.clone()))
|
|
.with_status(TurnStatus::InProgress)
|
|
.opened_explicitly(),
|
|
);
|
|
}
|
|
|
|
fn handle_turn_complete(&mut self, payload: &TurnCompleteEvent) {
|
|
let mark_completed = |status: &mut TurnStatus| {
|
|
if matches!(*status, TurnStatus::Completed | TurnStatus::InProgress) {
|
|
*status = TurnStatus::Completed;
|
|
}
|
|
};
|
|
|
|
// Prefer an exact ID match from the active turn and then close it.
|
|
if let Some(current_turn) = self
|
|
.current_turn
|
|
.as_mut()
|
|
.filter(|turn| turn.id == payload.turn_id)
|
|
{
|
|
mark_completed(&mut current_turn.status);
|
|
self.finish_current_turn();
|
|
return;
|
|
}
|
|
|
|
if let Some(turn) = self
|
|
.turns
|
|
.iter_mut()
|
|
.find(|turn| turn.id == payload.turn_id)
|
|
{
|
|
mark_completed(&mut turn.status);
|
|
return;
|
|
}
|
|
|
|
// If the completion event cannot be matched, apply it to the active turn.
|
|
if let Some(current_turn) = self.current_turn.as_mut() {
|
|
mark_completed(&mut current_turn.status);
|
|
self.finish_current_turn();
|
|
}
|
|
}
|
|
|
|
/// Marks the current turn as containing a persisted compaction marker.
|
|
///
|
|
/// This keeps compaction-only legacy turns from being dropped by
|
|
/// `finish_current_turn` when they have no renderable items and were not
|
|
/// explicitly opened.
|
|
fn handle_compacted(&mut self, _payload: &CompactedItem) {
|
|
self.ensure_turn().saw_compaction = true;
|
|
}
|
|
|
|
fn handle_thread_rollback(&mut self, payload: &ThreadRolledBackEvent) {
|
|
self.finish_current_turn();
|
|
|
|
let n = usize::try_from(payload.num_turns).unwrap_or(usize::MAX);
|
|
if n >= self.turns.len() {
|
|
self.turns.clear();
|
|
} else {
|
|
self.turns.truncate(self.turns.len().saturating_sub(n));
|
|
}
|
|
|
|
let item_count: usize = self.turns.iter().map(|t| t.items.len()).sum();
|
|
self.next_item_index = i64::try_from(item_count.saturating_add(1)).unwrap_or(i64::MAX);
|
|
}
|
|
|
|
fn finish_current_turn(&mut self) {
|
|
if let Some(turn) = self.current_turn.take() {
|
|
if turn.items.is_empty() && !turn.opened_explicitly && !turn.saw_compaction {
|
|
return;
|
|
}
|
|
self.turns.push(turn.into());
|
|
}
|
|
}
|
|
|
|
fn new_turn(&mut self, id: Option<String>) -> PendingTurn {
|
|
PendingTurn {
|
|
id: id.unwrap_or_else(|| Uuid::now_v7().to_string()),
|
|
items: Vec::new(),
|
|
error: None,
|
|
status: TurnStatus::Completed,
|
|
opened_explicitly: false,
|
|
saw_compaction: false,
|
|
rollout_start_index: self.current_rollout_index,
|
|
}
|
|
}
|
|
|
|
fn ensure_turn(&mut self) -> &mut PendingTurn {
|
|
if self.current_turn.is_none() {
|
|
let turn = self.new_turn(/*id*/ None);
|
|
return self.current_turn.insert(turn);
|
|
}
|
|
|
|
if let Some(turn) = self.current_turn.as_mut() {
|
|
return turn;
|
|
}
|
|
|
|
unreachable!("current turn must exist after initialization");
|
|
}
|
|
|
|
fn upsert_item_in_turn_id(&mut self, turn_id: &str, item: ThreadItem) {
|
|
if let Some(turn) = self.current_turn.as_mut()
|
|
&& turn.id == turn_id
|
|
{
|
|
upsert_turn_item(&mut turn.items, item);
|
|
return;
|
|
}
|
|
|
|
if let Some(turn) = self.turns.iter_mut().find(|turn| turn.id == turn_id) {
|
|
upsert_turn_item(&mut turn.items, item);
|
|
return;
|
|
}
|
|
|
|
warn!(
|
|
item_id = item.id(),
|
|
"dropping turn-scoped item for unknown turn id `{turn_id}`"
|
|
);
|
|
}
|
|
|
|
fn upsert_item_in_current_turn(&mut self, item: ThreadItem) {
|
|
let turn = self.ensure_turn();
|
|
upsert_turn_item(&mut turn.items, item);
|
|
}
|
|
|
|
fn next_item_id(&mut self) -> String {
|
|
let id = format!("item-{}", self.next_item_index);
|
|
self.next_item_index += 1;
|
|
id
|
|
}
|
|
|
|
fn build_user_inputs(&self, payload: &UserMessageEvent) -> Vec<UserInput> {
|
|
let mut content = Vec::new();
|
|
if !payload.message.trim().is_empty() {
|
|
content.push(UserInput::Text {
|
|
text: payload.message.clone(),
|
|
text_elements: payload
|
|
.text_elements
|
|
.iter()
|
|
.cloned()
|
|
.map(Into::into)
|
|
.collect(),
|
|
});
|
|
}
|
|
if let Some(images) = &payload.images {
|
|
for image in images {
|
|
content.push(UserInput::Image { url: image.clone() });
|
|
}
|
|
}
|
|
for path in &payload.local_images {
|
|
content.push(UserInput::LocalImage { path: path.clone() });
|
|
}
|
|
content
|
|
}
|
|
}
|
|
|
|
const REVIEW_FALLBACK_MESSAGE: &str = "Reviewer failed to output a response.";
|
|
|
|
fn render_review_output_text(output: &ReviewOutputEvent) -> String {
|
|
let explanation = output.overall_explanation.trim();
|
|
if explanation.is_empty() {
|
|
REVIEW_FALLBACK_MESSAGE.to_string()
|
|
} else {
|
|
explanation.to_string()
|
|
}
|
|
}
|
|
|
|
pub fn convert_patch_changes(
|
|
changes: &HashMap<std::path::PathBuf, codex_protocol::protocol::FileChange>,
|
|
) -> Vec<FileUpdateChange> {
|
|
let mut converted: Vec<FileUpdateChange> = changes
|
|
.iter()
|
|
.map(|(path, change)| FileUpdateChange {
|
|
path: path.to_string_lossy().into_owned(),
|
|
kind: map_patch_change_kind(change),
|
|
diff: format_file_change_diff(change),
|
|
})
|
|
.collect();
|
|
converted.sort_by(|a, b| a.path.cmp(&b.path));
|
|
converted
|
|
}
|
|
|
|
fn convert_dynamic_tool_content_items(
|
|
items: &[codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem],
|
|
) -> Vec<DynamicToolCallOutputContentItem> {
|
|
items
|
|
.iter()
|
|
.cloned()
|
|
.map(|item| match item {
|
|
codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem::InputText { text } => {
|
|
DynamicToolCallOutputContentItem::InputText { text }
|
|
}
|
|
codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem::InputImage {
|
|
image_url,
|
|
} => DynamicToolCallOutputContentItem::InputImage { image_url },
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
fn map_patch_change_kind(change: &codex_protocol::protocol::FileChange) -> PatchChangeKind {
|
|
match change {
|
|
codex_protocol::protocol::FileChange::Add { .. } => PatchChangeKind::Add,
|
|
codex_protocol::protocol::FileChange::Delete { .. } => PatchChangeKind::Delete,
|
|
codex_protocol::protocol::FileChange::Update { move_path, .. } => PatchChangeKind::Update {
|
|
move_path: move_path.clone(),
|
|
},
|
|
}
|
|
}
|
|
|
|
fn format_file_change_diff(change: &codex_protocol::protocol::FileChange) -> String {
|
|
match change {
|
|
codex_protocol::protocol::FileChange::Add { content } => content.clone(),
|
|
codex_protocol::protocol::FileChange::Delete { content } => content.clone(),
|
|
codex_protocol::protocol::FileChange::Update {
|
|
unified_diff,
|
|
move_path,
|
|
} => {
|
|
if let Some(path) = move_path {
|
|
format!("{unified_diff}\n\nMoved to: {}", path.display())
|
|
} else {
|
|
unified_diff.clone()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
fn upsert_turn_item(items: &mut Vec<ThreadItem>, item: ThreadItem) {
|
|
if let Some(existing_item) = items
|
|
.iter_mut()
|
|
.find(|existing_item| existing_item.id() == item.id())
|
|
{
|
|
*existing_item = item;
|
|
return;
|
|
}
|
|
items.push(item);
|
|
}
|
|
|
|
struct PendingTurn {
|
|
id: String,
|
|
items: Vec<ThreadItem>,
|
|
error: Option<TurnError>,
|
|
status: TurnStatus,
|
|
/// True when this turn originated from an explicit `turn_started`/`turn_complete`
|
|
/// boundary, so we preserve it even if it has no renderable items.
|
|
opened_explicitly: bool,
|
|
/// True when this turn includes a persisted `RolloutItem::Compacted`, which
|
|
/// should keep the turn from being dropped even without normal items.
|
|
saw_compaction: bool,
|
|
/// Index of the rollout item that opened this turn during replay.
|
|
rollout_start_index: usize,
|
|
}
|
|
|
|
impl PendingTurn {
|
|
fn opened_explicitly(mut self) -> Self {
|
|
self.opened_explicitly = true;
|
|
self
|
|
}
|
|
|
|
fn with_status(mut self, status: TurnStatus) -> Self {
|
|
self.status = status;
|
|
self
|
|
}
|
|
}
|
|
|
|
impl From<PendingTurn> for Turn {
|
|
fn from(value: PendingTurn) -> Self {
|
|
Self {
|
|
id: value.id,
|
|
items: value.items,
|
|
error: value.error,
|
|
status: value.status,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl From<&PendingTurn> for Turn {
|
|
fn from(value: &PendingTurn) -> Self {
|
|
Self {
|
|
id: value.id.clone(),
|
|
items: value.items.clone(),
|
|
error: value.error.clone(),
|
|
status: value.status.clone(),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use crate::protocol::v2::CommandExecutionSource;
|
|
use codex_protocol::ThreadId;
|
|
use codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem as CoreDynamicToolCallOutputContentItem;
|
|
use codex_protocol::items::HookPromptFragment as CoreHookPromptFragment;
|
|
use codex_protocol::items::TurnItem as CoreTurnItem;
|
|
use codex_protocol::items::UserMessageItem as CoreUserMessageItem;
|
|
use codex_protocol::items::build_hook_prompt_message;
|
|
use codex_protocol::models::MessagePhase as CoreMessagePhase;
|
|
use codex_protocol::models::WebSearchAction as CoreWebSearchAction;
|
|
use codex_protocol::parse_command::ParsedCommand;
|
|
use codex_protocol::protocol::AgentMessageEvent;
|
|
use codex_protocol::protocol::AgentReasoningEvent;
|
|
use codex_protocol::protocol::AgentReasoningRawContentEvent;
|
|
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
|
|
use codex_protocol::protocol::CodexErrorInfo;
|
|
use codex_protocol::protocol::CompactedItem;
|
|
use codex_protocol::protocol::DynamicToolCallResponseEvent;
|
|
use codex_protocol::protocol::ExecCommandEndEvent;
|
|
use codex_protocol::protocol::ExecCommandSource;
|
|
use codex_protocol::protocol::ItemStartedEvent;
|
|
use codex_protocol::protocol::McpInvocation;
|
|
use codex_protocol::protocol::McpToolCallEndEvent;
|
|
use codex_protocol::protocol::PatchApplyBeginEvent;
|
|
use codex_protocol::protocol::ThreadRolledBackEvent;
|
|
use codex_protocol::protocol::TurnAbortReason;
|
|
use codex_protocol::protocol::TurnAbortedEvent;
|
|
use codex_protocol::protocol::TurnCompleteEvent;
|
|
use codex_protocol::protocol::TurnStartedEvent;
|
|
use codex_protocol::protocol::UserMessageEvent;
|
|
use codex_protocol::protocol::WebSearchEndEvent;
|
|
use pretty_assertions::assert_eq;
|
|
use std::path::PathBuf;
|
|
use std::time::Duration;
|
|
use uuid::Uuid;
|
|
|
|
#[test]
|
|
fn builds_multiple_turns_with_reasoning_items() {
|
|
let events = vec![
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "First turn".into(),
|
|
images: Some(vec!["https://example.com/one.png".into()]),
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "Hi there".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}),
|
|
EventMsg::AgentReasoning(AgentReasoningEvent {
|
|
text: "thinking".into(),
|
|
}),
|
|
EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent {
|
|
text: "full reasoning".into(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Second turn".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "Reply two".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}),
|
|
];
|
|
|
|
let mut builder = ThreadHistoryBuilder::new();
|
|
for event in &events {
|
|
builder.handle_event(event);
|
|
}
|
|
let turns = builder.finish();
|
|
assert_eq!(turns.len(), 2);
|
|
|
|
let first = &turns[0];
|
|
assert!(Uuid::parse_str(&first.id).is_ok());
|
|
assert_eq!(first.status, TurnStatus::Completed);
|
|
assert_eq!(first.items.len(), 3);
|
|
assert_eq!(
|
|
first.items[0],
|
|
ThreadItem::UserMessage {
|
|
id: "item-1".into(),
|
|
content: vec![
|
|
UserInput::Text {
|
|
text: "First turn".into(),
|
|
text_elements: Vec::new(),
|
|
},
|
|
UserInput::Image {
|
|
url: "https://example.com/one.png".into(),
|
|
}
|
|
],
|
|
}
|
|
);
|
|
assert_eq!(
|
|
first.items[1],
|
|
ThreadItem::AgentMessage {
|
|
id: "item-2".into(),
|
|
text: "Hi there".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}
|
|
);
|
|
assert_eq!(
|
|
first.items[2],
|
|
ThreadItem::Reasoning {
|
|
id: "item-3".into(),
|
|
summary: vec!["thinking".into()],
|
|
content: vec!["full reasoning".into()],
|
|
}
|
|
);
|
|
|
|
let second = &turns[1];
|
|
assert!(Uuid::parse_str(&second.id).is_ok());
|
|
assert_ne!(first.id, second.id);
|
|
assert_eq!(second.items.len(), 2);
|
|
assert_eq!(
|
|
second.items[0],
|
|
ThreadItem::UserMessage {
|
|
id: "item-4".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "Second turn".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
}
|
|
);
|
|
assert_eq!(
|
|
second.items[1],
|
|
ThreadItem::AgentMessage {
|
|
id: "item-5".into(),
|
|
text: "Reply two".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn ignores_non_plan_item_lifecycle_events() {
|
|
let turn_id = "turn-1";
|
|
let thread_id = ThreadId::new();
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: turn_id.to_string(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "hello".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::ItemStarted(ItemStartedEvent {
|
|
thread_id,
|
|
turn_id: turn_id.to_string(),
|
|
item: CoreTurnItem::UserMessage(CoreUserMessageItem {
|
|
id: "user-item-id".to_string(),
|
|
content: Vec::new(),
|
|
}),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: turn_id.to_string(),
|
|
last_agent_message: None,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].items.len(), 1);
|
|
assert_eq!(
|
|
turns[0].items[0],
|
|
ThreadItem::UserMessage {
|
|
id: "item-1".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "hello".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn preserves_agent_message_phase_in_history() {
|
|
let events = vec![EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "Final reply".into(),
|
|
phase: Some(CoreMessagePhase::FinalAnswer),
|
|
memory_citation: None,
|
|
})];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(
|
|
turns[0].items[0],
|
|
ThreadItem::AgentMessage {
|
|
id: "item-1".into(),
|
|
text: "Final reply".into(),
|
|
phase: Some(MessagePhase::FinalAnswer),
|
|
memory_citation: None,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn replays_image_generation_end_events_into_turn_history() {
|
|
let items = vec![
|
|
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-image".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
})),
|
|
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
|
message: "generate an image".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
})),
|
|
RolloutItem::EventMsg(EventMsg::ImageGenerationEnd(ImageGenerationEndEvent {
|
|
call_id: "ig_123".into(),
|
|
status: "completed".into(),
|
|
revised_prompt: Some("final prompt".into()),
|
|
result: "Zm9v".into(),
|
|
saved_path: Some("/tmp/ig_123.png".into()),
|
|
})),
|
|
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-image".into(),
|
|
last_agent_message: None,
|
|
})),
|
|
];
|
|
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(
|
|
turns[0],
|
|
Turn {
|
|
id: "turn-image".into(),
|
|
status: TurnStatus::Completed,
|
|
error: None,
|
|
items: vec![
|
|
ThreadItem::UserMessage {
|
|
id: "item-1".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "generate an image".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
},
|
|
ThreadItem::ImageGeneration {
|
|
id: "ig_123".into(),
|
|
status: "completed".into(),
|
|
revised_prompt: Some("final prompt".into()),
|
|
result: "Zm9v".into(),
|
|
saved_path: Some("/tmp/ig_123.png".into()),
|
|
},
|
|
],
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn splits_reasoning_when_interleaved() {
|
|
let events = vec![
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Turn start".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentReasoning(AgentReasoningEvent {
|
|
text: "first summary".into(),
|
|
}),
|
|
EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent {
|
|
text: "first content".into(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "interlude".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}),
|
|
EventMsg::AgentReasoning(AgentReasoningEvent {
|
|
text: "second summary".into(),
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
let turn = &turns[0];
|
|
assert_eq!(turn.items.len(), 4);
|
|
|
|
assert_eq!(
|
|
turn.items[1],
|
|
ThreadItem::Reasoning {
|
|
id: "item-2".into(),
|
|
summary: vec!["first summary".into()],
|
|
content: vec!["first content".into()],
|
|
}
|
|
);
|
|
assert_eq!(
|
|
turn.items[3],
|
|
ThreadItem::Reasoning {
|
|
id: "item-4".into(),
|
|
summary: vec!["second summary".into()],
|
|
content: Vec::new(),
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn marks_turn_as_interrupted_when_aborted() {
|
|
let events = vec![
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Please do the thing".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "Working...".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}),
|
|
EventMsg::TurnAborted(TurnAbortedEvent {
|
|
turn_id: Some("turn-1".into()),
|
|
reason: TurnAbortReason::Replaced,
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Let's try again".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "Second attempt complete.".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 2);
|
|
|
|
let first_turn = &turns[0];
|
|
assert_eq!(first_turn.status, TurnStatus::Interrupted);
|
|
assert_eq!(first_turn.items.len(), 2);
|
|
assert_eq!(
|
|
first_turn.items[0],
|
|
ThreadItem::UserMessage {
|
|
id: "item-1".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "Please do the thing".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
}
|
|
);
|
|
assert_eq!(
|
|
first_turn.items[1],
|
|
ThreadItem::AgentMessage {
|
|
id: "item-2".into(),
|
|
text: "Working...".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}
|
|
);
|
|
|
|
let second_turn = &turns[1];
|
|
assert_eq!(second_turn.status, TurnStatus::Completed);
|
|
assert_eq!(second_turn.items.len(), 2);
|
|
assert_eq!(
|
|
second_turn.items[0],
|
|
ThreadItem::UserMessage {
|
|
id: "item-3".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "Let's try again".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
}
|
|
);
|
|
assert_eq!(
|
|
second_turn.items[1],
|
|
ThreadItem::AgentMessage {
|
|
id: "item-4".into(),
|
|
text: "Second attempt complete.".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn drops_last_turns_on_thread_rollback() {
|
|
let events = vec![
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "First".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "A1".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Second".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "A2".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}),
|
|
EventMsg::ThreadRolledBack(ThreadRolledBackEvent { num_turns: 1 }),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Third".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "A3".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 2);
|
|
assert!(Uuid::parse_str(&turns[0].id).is_ok());
|
|
assert!(Uuid::parse_str(&turns[1].id).is_ok());
|
|
assert_ne!(turns[0].id, turns[1].id);
|
|
assert_eq!(turns[0].status, TurnStatus::Completed);
|
|
assert_eq!(turns[1].status, TurnStatus::Completed);
|
|
assert_eq!(
|
|
turns[0].items,
|
|
vec![
|
|
ThreadItem::UserMessage {
|
|
id: "item-1".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "First".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
},
|
|
ThreadItem::AgentMessage {
|
|
id: "item-2".into(),
|
|
text: "A1".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
},
|
|
]
|
|
);
|
|
assert_eq!(
|
|
turns[1].items,
|
|
vec![
|
|
ThreadItem::UserMessage {
|
|
id: "item-3".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "Third".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
},
|
|
ThreadItem::AgentMessage {
|
|
id: "item-4".into(),
|
|
text: "A3".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
},
|
|
]
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn thread_rollback_clears_all_turns_when_num_turns_exceeds_history() {
|
|
let events = vec![
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "One".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "A1".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Two".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "A2".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}),
|
|
EventMsg::ThreadRolledBack(ThreadRolledBackEvent { num_turns: 99 }),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns, Vec::<Turn>::new());
|
|
}
|
|
|
|
#[test]
|
|
fn uses_explicit_turn_boundaries_for_mid_turn_steering() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-a".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Start".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Steer".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].id, "turn-a");
|
|
assert_eq!(
|
|
turns[0].items,
|
|
vec![
|
|
ThreadItem::UserMessage {
|
|
id: "item-1".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "Start".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
},
|
|
ThreadItem::UserMessage {
|
|
id: "item-2".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "Steer".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
},
|
|
]
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn reconstructs_tool_items_from_persisted_completion_events() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-1".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "run tools".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::WebSearchEnd(WebSearchEndEvent {
|
|
call_id: "search-1".into(),
|
|
query: "codex".into(),
|
|
action: CoreWebSearchAction::Search {
|
|
query: Some("codex".into()),
|
|
queries: None,
|
|
},
|
|
}),
|
|
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
|
|
call_id: "exec-1".into(),
|
|
process_id: Some("pid-1".into()),
|
|
turn_id: "turn-1".into(),
|
|
command: vec!["echo".into(), "hello world".into()],
|
|
cwd: PathBuf::from("/tmp"),
|
|
parsed_cmd: vec![ParsedCommand::Unknown {
|
|
cmd: "echo hello world".into(),
|
|
}],
|
|
source: ExecCommandSource::Agent,
|
|
interaction_input: None,
|
|
stdout: String::new(),
|
|
stderr: String::new(),
|
|
aggregated_output: "hello world\n".into(),
|
|
exit_code: 0,
|
|
duration: Duration::from_millis(12),
|
|
formatted_output: String::new(),
|
|
status: CoreExecCommandStatus::Completed,
|
|
}),
|
|
EventMsg::McpToolCallEnd(McpToolCallEndEvent {
|
|
call_id: "mcp-1".into(),
|
|
invocation: McpInvocation {
|
|
server: "docs".into(),
|
|
tool: "lookup".into(),
|
|
arguments: Some(serde_json::json!({"id":"123"})),
|
|
},
|
|
duration: Duration::from_millis(8),
|
|
result: Err("boom".into()),
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].items.len(), 4);
|
|
assert_eq!(
|
|
turns[0].items[1],
|
|
ThreadItem::WebSearch {
|
|
id: "search-1".into(),
|
|
query: "codex".into(),
|
|
action: Some(WebSearchAction::Search {
|
|
query: Some("codex".into()),
|
|
queries: None,
|
|
}),
|
|
}
|
|
);
|
|
assert_eq!(
|
|
turns[0].items[2],
|
|
ThreadItem::CommandExecution {
|
|
id: "exec-1".into(),
|
|
command: "echo 'hello world'".into(),
|
|
cwd: PathBuf::from("/tmp"),
|
|
process_id: Some("pid-1".into()),
|
|
source: CommandExecutionSource::Agent,
|
|
status: CommandExecutionStatus::Completed,
|
|
command_actions: vec![CommandAction::Unknown {
|
|
command: "echo hello world".into(),
|
|
}],
|
|
aggregated_output: Some("hello world\n".into()),
|
|
exit_code: Some(0),
|
|
duration_ms: Some(12),
|
|
}
|
|
);
|
|
assert_eq!(
|
|
turns[0].items[3],
|
|
ThreadItem::McpToolCall {
|
|
id: "mcp-1".into(),
|
|
server: "docs".into(),
|
|
tool: "lookup".into(),
|
|
status: McpToolCallStatus::Failed,
|
|
arguments: serde_json::json!({"id":"123"}),
|
|
result: None,
|
|
error: Some(McpToolCallError {
|
|
message: "boom".into(),
|
|
}),
|
|
duration_ms: Some(8),
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn reconstructs_dynamic_tool_items_from_request_and_response_events() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-1".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "run dynamic tool".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::DynamicToolCallRequest(
|
|
codex_protocol::dynamic_tools::DynamicToolCallRequest {
|
|
call_id: "dyn-1".into(),
|
|
turn_id: "turn-1".into(),
|
|
tool: "lookup_ticket".into(),
|
|
arguments: serde_json::json!({"id":"ABC-123"}),
|
|
},
|
|
),
|
|
EventMsg::DynamicToolCallResponse(DynamicToolCallResponseEvent {
|
|
call_id: "dyn-1".into(),
|
|
turn_id: "turn-1".into(),
|
|
tool: "lookup_ticket".into(),
|
|
arguments: serde_json::json!({"id":"ABC-123"}),
|
|
content_items: vec![CoreDynamicToolCallOutputContentItem::InputText {
|
|
text: "Ticket is open".into(),
|
|
}],
|
|
success: true,
|
|
error: None,
|
|
duration: Duration::from_millis(42),
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].items.len(), 2);
|
|
assert_eq!(
|
|
turns[0].items[1],
|
|
ThreadItem::DynamicToolCall {
|
|
id: "dyn-1".into(),
|
|
tool: "lookup_ticket".into(),
|
|
arguments: serde_json::json!({"id":"ABC-123"}),
|
|
status: DynamicToolCallStatus::Completed,
|
|
content_items: Some(vec![DynamicToolCallOutputContentItem::InputText {
|
|
text: "Ticket is open".into(),
|
|
}]),
|
|
success: Some(true),
|
|
duration_ms: Some(42),
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn reconstructs_declined_exec_and_patch_items() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-1".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "run tools".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
|
|
call_id: "exec-declined".into(),
|
|
process_id: Some("pid-2".into()),
|
|
turn_id: "turn-1".into(),
|
|
command: vec!["ls".into()],
|
|
cwd: PathBuf::from("/tmp"),
|
|
parsed_cmd: vec![ParsedCommand::Unknown { cmd: "ls".into() }],
|
|
source: ExecCommandSource::Agent,
|
|
interaction_input: None,
|
|
stdout: String::new(),
|
|
stderr: "exec command rejected by user".into(),
|
|
aggregated_output: "exec command rejected by user".into(),
|
|
exit_code: -1,
|
|
duration: Duration::ZERO,
|
|
formatted_output: String::new(),
|
|
status: CoreExecCommandStatus::Declined,
|
|
}),
|
|
EventMsg::PatchApplyEnd(PatchApplyEndEvent {
|
|
call_id: "patch-declined".into(),
|
|
turn_id: "turn-1".into(),
|
|
stdout: String::new(),
|
|
stderr: "patch rejected by user".into(),
|
|
success: false,
|
|
changes: [(
|
|
PathBuf::from("README.md"),
|
|
codex_protocol::protocol::FileChange::Add {
|
|
content: "hello\n".into(),
|
|
},
|
|
)]
|
|
.into_iter()
|
|
.collect(),
|
|
status: CorePatchApplyStatus::Declined,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].items.len(), 3);
|
|
assert_eq!(
|
|
turns[0].items[1],
|
|
ThreadItem::CommandExecution {
|
|
id: "exec-declined".into(),
|
|
command: "ls".into(),
|
|
cwd: PathBuf::from("/tmp"),
|
|
process_id: Some("pid-2".into()),
|
|
source: CommandExecutionSource::Agent,
|
|
status: CommandExecutionStatus::Declined,
|
|
command_actions: vec![CommandAction::Unknown {
|
|
command: "ls".into(),
|
|
}],
|
|
aggregated_output: Some("exec command rejected by user".into()),
|
|
exit_code: Some(-1),
|
|
duration_ms: Some(0),
|
|
}
|
|
);
|
|
assert_eq!(
|
|
turns[0].items[2],
|
|
ThreadItem::FileChange {
|
|
id: "patch-declined".into(),
|
|
changes: vec![FileUpdateChange {
|
|
path: "README.md".into(),
|
|
kind: PatchChangeKind::Add,
|
|
diff: "hello\n".into(),
|
|
}],
|
|
status: PatchApplyStatus::Declined,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn assigns_late_exec_completion_to_original_turn() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-a".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "first".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-b".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "second".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
|
|
call_id: "exec-late".into(),
|
|
process_id: Some("pid-42".into()),
|
|
turn_id: "turn-a".into(),
|
|
command: vec!["echo".into(), "done".into()],
|
|
cwd: PathBuf::from("/tmp"),
|
|
parsed_cmd: vec![ParsedCommand::Unknown {
|
|
cmd: "echo done".into(),
|
|
}],
|
|
source: ExecCommandSource::Agent,
|
|
interaction_input: None,
|
|
stdout: "done\n".into(),
|
|
stderr: String::new(),
|
|
aggregated_output: "done\n".into(),
|
|
exit_code: 0,
|
|
duration: Duration::from_millis(5),
|
|
formatted_output: "done\n".into(),
|
|
status: CoreExecCommandStatus::Completed,
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-b".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 2);
|
|
assert_eq!(turns[0].id, "turn-a");
|
|
assert_eq!(turns[1].id, "turn-b");
|
|
assert_eq!(turns[0].items.len(), 2);
|
|
assert_eq!(turns[1].items.len(), 1);
|
|
assert_eq!(
|
|
turns[0].items[1],
|
|
ThreadItem::CommandExecution {
|
|
id: "exec-late".into(),
|
|
command: "echo done".into(),
|
|
cwd: PathBuf::from("/tmp"),
|
|
process_id: Some("pid-42".into()),
|
|
source: CommandExecutionSource::Agent,
|
|
status: CommandExecutionStatus::Completed,
|
|
command_actions: vec![CommandAction::Unknown {
|
|
command: "echo done".into(),
|
|
}],
|
|
aggregated_output: Some("done\n".into()),
|
|
exit_code: Some(0),
|
|
duration_ms: Some(5),
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn drops_late_turn_scoped_item_for_unknown_turn_id() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-a".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "first".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-b".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "second".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
|
|
call_id: "exec-unknown-turn".into(),
|
|
process_id: Some("pid-42".into()),
|
|
turn_id: "turn-missing".into(),
|
|
command: vec!["echo".into(), "done".into()],
|
|
cwd: PathBuf::from("/tmp"),
|
|
parsed_cmd: vec![ParsedCommand::Unknown {
|
|
cmd: "echo done".into(),
|
|
}],
|
|
source: ExecCommandSource::Agent,
|
|
interaction_input: None,
|
|
stdout: "done\n".into(),
|
|
stderr: String::new(),
|
|
aggregated_output: "done\n".into(),
|
|
exit_code: 0,
|
|
duration: Duration::from_millis(5),
|
|
formatted_output: "done\n".into(),
|
|
status: CoreExecCommandStatus::Completed,
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-b".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
];
|
|
|
|
let mut builder = ThreadHistoryBuilder::new();
|
|
for event in &events {
|
|
builder.handle_event(event);
|
|
}
|
|
let turns = builder.finish();
|
|
assert_eq!(turns.len(), 2);
|
|
assert_eq!(turns[0].id, "turn-a");
|
|
assert_eq!(turns[1].id, "turn-b");
|
|
assert_eq!(turns[0].items.len(), 1);
|
|
assert_eq!(turns[1].items.len(), 1);
|
|
assert_eq!(
|
|
turns[1].items[0],
|
|
ThreadItem::UserMessage {
|
|
id: "item-2".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "second".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn patch_apply_begin_updates_active_turn_snapshot_with_file_change() {
|
|
let turn_id = "turn-1";
|
|
let mut builder = ThreadHistoryBuilder::new();
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: turn_id.to_string(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "apply patch".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
|
|
call_id: "patch-call".into(),
|
|
turn_id: turn_id.to_string(),
|
|
auto_approved: false,
|
|
changes: [(
|
|
PathBuf::from("README.md"),
|
|
codex_protocol::protocol::FileChange::Add {
|
|
content: "hello\n".into(),
|
|
},
|
|
)]
|
|
.into_iter()
|
|
.collect(),
|
|
}),
|
|
];
|
|
|
|
for event in &events {
|
|
builder.handle_event(event);
|
|
}
|
|
|
|
let snapshot = builder
|
|
.active_turn_snapshot()
|
|
.expect("active turn snapshot");
|
|
assert_eq!(snapshot.id, turn_id);
|
|
assert_eq!(snapshot.status, TurnStatus::InProgress);
|
|
assert_eq!(
|
|
snapshot.items,
|
|
vec![
|
|
ThreadItem::UserMessage {
|
|
id: "item-1".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "apply patch".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
},
|
|
ThreadItem::FileChange {
|
|
id: "patch-call".into(),
|
|
changes: vec![FileUpdateChange {
|
|
path: "README.md".into(),
|
|
kind: PatchChangeKind::Add,
|
|
diff: "hello\n".into(),
|
|
}],
|
|
status: PatchApplyStatus::InProgress,
|
|
},
|
|
]
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn apply_patch_approval_request_updates_active_turn_snapshot_with_file_change() {
|
|
let turn_id = "turn-1";
|
|
let mut builder = ThreadHistoryBuilder::new();
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: turn_id.to_string(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "apply patch".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
|
|
call_id: "patch-call".into(),
|
|
turn_id: turn_id.to_string(),
|
|
changes: [(
|
|
PathBuf::from("README.md"),
|
|
codex_protocol::protocol::FileChange::Add {
|
|
content: "hello\n".into(),
|
|
},
|
|
)]
|
|
.into_iter()
|
|
.collect(),
|
|
reason: None,
|
|
grant_root: None,
|
|
}),
|
|
];
|
|
|
|
for event in &events {
|
|
builder.handle_event(event);
|
|
}
|
|
|
|
let snapshot = builder
|
|
.active_turn_snapshot()
|
|
.expect("active turn snapshot");
|
|
assert_eq!(snapshot.id, turn_id);
|
|
assert_eq!(snapshot.status, TurnStatus::InProgress);
|
|
assert_eq!(
|
|
snapshot.items,
|
|
vec![
|
|
ThreadItem::UserMessage {
|
|
id: "item-1".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "apply patch".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
},
|
|
ThreadItem::FileChange {
|
|
id: "patch-call".into(),
|
|
changes: vec![FileUpdateChange {
|
|
path: "README.md".into(),
|
|
kind: PatchChangeKind::Add,
|
|
diff: "hello\n".into(),
|
|
}],
|
|
status: PatchApplyStatus::InProgress,
|
|
},
|
|
]
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn late_turn_complete_does_not_close_active_turn() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-a".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "first".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-b".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "second".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "still in b".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-b".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 2);
|
|
assert_eq!(turns[0].id, "turn-a");
|
|
assert_eq!(turns[1].id, "turn-b");
|
|
assert_eq!(turns[1].items.len(), 2);
|
|
}
|
|
|
|
#[test]
|
|
fn late_turn_aborted_does_not_interrupt_active_turn() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-a".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "first".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-b".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "second".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::TurnAborted(TurnAbortedEvent {
|
|
turn_id: Some("turn-a".into()),
|
|
reason: TurnAbortReason::Replaced,
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "still in b".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 2);
|
|
assert_eq!(turns[0].id, "turn-a");
|
|
assert_eq!(turns[1].id, "turn-b");
|
|
assert_eq!(turns[1].status, TurnStatus::InProgress);
|
|
assert_eq!(turns[1].items.len(), 2);
|
|
}
|
|
|
|
#[test]
|
|
fn preserves_compaction_only_turn() {
|
|
let items = vec![
|
|
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-compact".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
})),
|
|
RolloutItem::Compacted(CompactedItem {
|
|
message: String::new(),
|
|
replacement_history: None,
|
|
}),
|
|
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-compact".into(),
|
|
last_agent_message: None,
|
|
})),
|
|
];
|
|
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(
|
|
turns,
|
|
vec![Turn {
|
|
id: "turn-compact".into(),
|
|
status: TurnStatus::Completed,
|
|
error: None,
|
|
items: Vec::new(),
|
|
}]
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn reconstructs_collab_resume_end_item() {
|
|
let events = vec![
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "resume agent".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::CollabResumeEnd(codex_protocol::protocol::CollabResumeEndEvent {
|
|
call_id: "resume-1".into(),
|
|
sender_thread_id: ThreadId::try_from("00000000-0000-0000-0000-000000000001")
|
|
.expect("valid sender thread id"),
|
|
receiver_thread_id: ThreadId::try_from("00000000-0000-0000-0000-000000000002")
|
|
.expect("valid receiver thread id"),
|
|
receiver_agent_nickname: None,
|
|
receiver_agent_role: None,
|
|
status: AgentStatus::Completed(None),
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].items.len(), 2);
|
|
assert_eq!(
|
|
turns[0].items[1],
|
|
ThreadItem::CollabAgentToolCall {
|
|
id: "resume-1".into(),
|
|
tool: CollabAgentTool::ResumeAgent,
|
|
status: CollabAgentToolCallStatus::Completed,
|
|
sender_thread_id: "00000000-0000-0000-0000-000000000001".into(),
|
|
receiver_thread_ids: vec!["00000000-0000-0000-0000-000000000002".into()],
|
|
prompt: None,
|
|
model: None,
|
|
reasoning_effort: None,
|
|
agents_states: [(
|
|
"00000000-0000-0000-0000-000000000002".into(),
|
|
CollabAgentState {
|
|
status: crate::protocol::v2::CollabAgentStatus::Completed,
|
|
message: None,
|
|
},
|
|
)]
|
|
.into_iter()
|
|
.collect(),
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn reconstructs_collab_spawn_end_item_with_model_metadata() {
|
|
let sender_thread_id = ThreadId::try_from("00000000-0000-0000-0000-000000000001")
|
|
.expect("valid sender thread id");
|
|
let spawned_thread_id = ThreadId::try_from("00000000-0000-0000-0000-000000000002")
|
|
.expect("valid receiver thread id");
|
|
let events = vec![
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "spawn agent".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::CollabAgentSpawnEnd(codex_protocol::protocol::CollabAgentSpawnEndEvent {
|
|
call_id: "spawn-1".into(),
|
|
sender_thread_id,
|
|
new_thread_id: Some(spawned_thread_id),
|
|
new_agent_nickname: Some("Scout".into()),
|
|
new_agent_role: Some("explorer".into()),
|
|
prompt: "inspect the repo".into(),
|
|
model: "gpt-5.4-mini".into(),
|
|
reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Medium,
|
|
status: AgentStatus::Running,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].items.len(), 2);
|
|
assert_eq!(
|
|
turns[0].items[1],
|
|
ThreadItem::CollabAgentToolCall {
|
|
id: "spawn-1".into(),
|
|
tool: CollabAgentTool::SpawnAgent,
|
|
status: CollabAgentToolCallStatus::Completed,
|
|
sender_thread_id: "00000000-0000-0000-0000-000000000001".into(),
|
|
receiver_thread_ids: vec!["00000000-0000-0000-0000-000000000002".into()],
|
|
prompt: Some("inspect the repo".into()),
|
|
model: Some("gpt-5.4-mini".into()),
|
|
reasoning_effort: Some(codex_protocol::openai_models::ReasoningEffort::Medium),
|
|
agents_states: [(
|
|
"00000000-0000-0000-0000-000000000002".into(),
|
|
CollabAgentState {
|
|
status: crate::protocol::v2::CollabAgentStatus::Running,
|
|
message: None,
|
|
},
|
|
)]
|
|
.into_iter()
|
|
.collect(),
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn reconstructs_interrupted_send_input_as_completed_collab_call() {
|
|
// `send_input(interrupt=true)` first stops the child's active turn, then redirects it with
|
|
// new input. The transient interrupted status should remain visible in agent state, but the
|
|
// collab tool call itself is still a successful redirect rather than a failed operation.
|
|
let sender = ThreadId::try_from("00000000-0000-0000-0000-000000000001")
|
|
.expect("valid sender thread id");
|
|
let receiver = ThreadId::try_from("00000000-0000-0000-0000-000000000002")
|
|
.expect("valid receiver thread id");
|
|
let events = vec![
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "redirect".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::CollabAgentInteractionBegin(
|
|
codex_protocol::protocol::CollabAgentInteractionBeginEvent {
|
|
call_id: "send-1".into(),
|
|
sender_thread_id: sender,
|
|
receiver_thread_id: receiver,
|
|
prompt: "new task".into(),
|
|
},
|
|
),
|
|
EventMsg::CollabAgentInteractionEnd(
|
|
codex_protocol::protocol::CollabAgentInteractionEndEvent {
|
|
call_id: "send-1".into(),
|
|
sender_thread_id: sender,
|
|
receiver_thread_id: receiver,
|
|
receiver_agent_nickname: None,
|
|
receiver_agent_role: None,
|
|
prompt: "new task".into(),
|
|
status: AgentStatus::Interrupted,
|
|
},
|
|
),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].items.len(), 2);
|
|
assert_eq!(
|
|
turns[0].items[1],
|
|
ThreadItem::CollabAgentToolCall {
|
|
id: "send-1".into(),
|
|
tool: CollabAgentTool::SendInput,
|
|
status: CollabAgentToolCallStatus::Completed,
|
|
sender_thread_id: sender.to_string(),
|
|
receiver_thread_ids: vec![receiver.to_string()],
|
|
prompt: Some("new task".into()),
|
|
model: None,
|
|
reasoning_effort: None,
|
|
agents_states: [(
|
|
receiver.to_string(),
|
|
CollabAgentState {
|
|
status: crate::protocol::v2::CollabAgentStatus::Interrupted,
|
|
message: None,
|
|
},
|
|
)]
|
|
.into_iter()
|
|
.collect(),
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn rollback_failed_error_does_not_mark_turn_failed() {
|
|
let events = vec![
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "hello".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "done".into(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}),
|
|
EventMsg::Error(ErrorEvent {
|
|
message: "rollback failed".into(),
|
|
codex_error_info: Some(CodexErrorInfo::ThreadRollbackFailed),
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].status, TurnStatus::Completed);
|
|
assert_eq!(turns[0].error, None);
|
|
}
|
|
|
|
#[test]
|
|
fn out_of_turn_error_does_not_create_or_fail_a_turn() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-a".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "hello".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
EventMsg::Error(ErrorEvent {
|
|
message: "request-level failure".into(),
|
|
codex_error_info: Some(CodexErrorInfo::BadRequest),
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(
|
|
turns[0],
|
|
Turn {
|
|
id: "turn-a".into(),
|
|
status: TurnStatus::Completed,
|
|
error: None,
|
|
items: vec![ThreadItem::UserMessage {
|
|
id: "item-1".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "hello".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
}],
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn error_then_turn_complete_preserves_failed_status() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-a".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "hello".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::Error(ErrorEvent {
|
|
message: "stream failure".into(),
|
|
codex_error_info: Some(CodexErrorInfo::ResponseStreamDisconnected {
|
|
http_status_code: Some(502),
|
|
}),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].id, "turn-a");
|
|
assert_eq!(turns[0].status, TurnStatus::Failed);
|
|
assert_eq!(
|
|
turns[0].error,
|
|
Some(TurnError {
|
|
message: "stream failure".into(),
|
|
codex_error_info: Some(
|
|
crate::protocol::v2::CodexErrorInfo::ResponseStreamDisconnected {
|
|
http_status_code: Some(502),
|
|
}
|
|
),
|
|
additional_details: None,
|
|
})
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn rebuilds_hook_prompt_items_from_rollout_response_items() {
|
|
let hook_prompt = build_hook_prompt_message(&[
|
|
CoreHookPromptFragment::from_single_hook("Retry with tests.", "hook-run-1"),
|
|
CoreHookPromptFragment::from_single_hook("Then summarize cleanly.", "hook-run-2"),
|
|
])
|
|
.expect("hook prompt message");
|
|
let items = vec![
|
|
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-a".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
})),
|
|
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
|
message: "hello".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
})),
|
|
RolloutItem::ResponseItem(hook_prompt),
|
|
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
})),
|
|
];
|
|
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].items.len(), 2);
|
|
assert_eq!(
|
|
turns[0].items[1],
|
|
ThreadItem::HookPrompt {
|
|
id: turns[0].items[1].id().to_string(),
|
|
fragments: vec![
|
|
crate::protocol::v2::HookPromptFragment {
|
|
text: "Retry with tests.".into(),
|
|
hook_run_id: "hook-run-1".into(),
|
|
},
|
|
crate::protocol::v2::HookPromptFragment {
|
|
text: "Then summarize cleanly.".into(),
|
|
hook_run_id: "hook-run-2".into(),
|
|
},
|
|
],
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn ignores_plain_user_response_items_in_rollout_replay() {
|
|
let items = vec![
|
|
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-a".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
})),
|
|
RolloutItem::ResponseItem(codex_protocol::models::ResponseItem::Message {
|
|
id: Some("msg-1".into()),
|
|
role: "user".into(),
|
|
content: vec![codex_protocol::models::ContentItem::InputText {
|
|
text: "plain text".into(),
|
|
}],
|
|
end_turn: None,
|
|
phase: None,
|
|
}),
|
|
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
})),
|
|
];
|
|
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert!(turns[0].items.is_empty());
|
|
}
|
|
}
|