mirror of
https://github.com/openai/codex.git
synced 2026-04-25 23:24:55 +00:00
https://github.com/openai/codex/pull/10455 introduced the `phase` field, and then https://github.com/openai/codex/pull/12072 introduced a `MessagePhase` type in `v2.rs` that paralleled the `MessagePhase` type in `codex-rs/protocol/src/models.rs`. The app server protocol prefers `camelCase` while the Responses API uses `snake_case`, so this meant we had two versions of `MessagePhase` with different serialization rules. When the app server protocol refers to types from the Responses API, we use the wire format of the the Responses API even though it is inconsistent with the app server API. This PR deletes `MessagePhase` from `v2.rs` and consolidates on the Responses API version to eliminate confusion.
2162 lines
77 KiB
Rust
2162 lines
77 KiB
Rust
use crate::protocol::v2::CollabAgentState;
|
|
use crate::protocol::v2::CollabAgentTool;
|
|
use crate::protocol::v2::CollabAgentToolCallStatus;
|
|
use crate::protocol::v2::CommandAction;
|
|
use crate::protocol::v2::CommandExecutionStatus;
|
|
use crate::protocol::v2::FileUpdateChange;
|
|
use crate::protocol::v2::McpToolCallError;
|
|
use crate::protocol::v2::McpToolCallResult;
|
|
use crate::protocol::v2::McpToolCallStatus;
|
|
use crate::protocol::v2::PatchApplyStatus;
|
|
use crate::protocol::v2::PatchChangeKind;
|
|
use crate::protocol::v2::ThreadItem;
|
|
use crate::protocol::v2::Turn;
|
|
use crate::protocol::v2::TurnError as V2TurnError;
|
|
use crate::protocol::v2::TurnError;
|
|
use crate::protocol::v2::TurnStatus;
|
|
use crate::protocol::v2::UserInput;
|
|
use crate::protocol::v2::WebSearchAction;
|
|
use codex_protocol::models::MessagePhase;
|
|
use codex_protocol::protocol::AgentReasoningEvent;
|
|
use codex_protocol::protocol::AgentReasoningRawContentEvent;
|
|
use codex_protocol::protocol::AgentStatus;
|
|
use codex_protocol::protocol::CompactedItem;
|
|
use codex_protocol::protocol::ContextCompactedEvent;
|
|
use codex_protocol::protocol::ErrorEvent;
|
|
use codex_protocol::protocol::EventMsg;
|
|
use codex_protocol::protocol::ExecCommandBeginEvent;
|
|
use codex_protocol::protocol::ExecCommandEndEvent;
|
|
use codex_protocol::protocol::ItemCompletedEvent;
|
|
use codex_protocol::protocol::ItemStartedEvent;
|
|
use codex_protocol::protocol::McpToolCallBeginEvent;
|
|
use codex_protocol::protocol::McpToolCallEndEvent;
|
|
use codex_protocol::protocol::PatchApplyBeginEvent;
|
|
use codex_protocol::protocol::PatchApplyEndEvent;
|
|
use codex_protocol::protocol::ReviewOutputEvent;
|
|
use codex_protocol::protocol::RolloutItem;
|
|
use codex_protocol::protocol::ThreadRolledBackEvent;
|
|
use codex_protocol::protocol::TurnAbortedEvent;
|
|
use codex_protocol::protocol::TurnCompleteEvent;
|
|
use codex_protocol::protocol::TurnStartedEvent;
|
|
use codex_protocol::protocol::UserMessageEvent;
|
|
use codex_protocol::protocol::ViewImageToolCallEvent;
|
|
use codex_protocol::protocol::WebSearchBeginEvent;
|
|
use codex_protocol::protocol::WebSearchEndEvent;
|
|
use std::collections::HashMap;
|
|
use tracing::warn;
|
|
use uuid::Uuid;
|
|
|
|
#[cfg(test)]
|
|
use codex_protocol::protocol::ExecCommandStatus as CoreExecCommandStatus;
|
|
#[cfg(test)]
|
|
use codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus;
|
|
|
|
/// Convert persisted [`RolloutItem`] entries into a sequence of [`Turn`] values.
|
|
///
|
|
/// When available, this uses `TurnContext.turn_id` as the canonical turn id so
|
|
/// resumed/rebuilt thread history preserves the original turn identifiers.
|
|
pub fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
|
|
let mut builder = ThreadHistoryBuilder::new();
|
|
for item in items {
|
|
builder.handle_rollout_item(item);
|
|
}
|
|
builder.finish()
|
|
}
|
|
|
|
pub struct ThreadHistoryBuilder {
|
|
turns: Vec<Turn>,
|
|
current_turn: Option<PendingTurn>,
|
|
next_item_index: i64,
|
|
}
|
|
|
|
impl Default for ThreadHistoryBuilder {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
impl ThreadHistoryBuilder {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
turns: Vec::new(),
|
|
current_turn: None,
|
|
next_item_index: 1,
|
|
}
|
|
}
|
|
|
|
pub fn reset(&mut self) {
|
|
*self = Self::new();
|
|
}
|
|
|
|
pub fn finish(mut self) -> Vec<Turn> {
|
|
self.finish_current_turn();
|
|
self.turns
|
|
}
|
|
|
|
pub fn active_turn_snapshot(&self) -> Option<Turn> {
|
|
self.current_turn
|
|
.as_ref()
|
|
.map(Turn::from)
|
|
.or_else(|| self.turns.last().cloned())
|
|
}
|
|
|
|
pub fn has_active_turn(&self) -> bool {
|
|
self.current_turn.is_some()
|
|
}
|
|
|
|
/// Shared reducer for persisted rollout replay and in-memory current-turn
|
|
/// tracking used by running thread resume/rejoin.
|
|
///
|
|
/// This function should handle all EventMsg variants that can be persisted in a rollout file.
|
|
/// See `should_persist_event_msg` in `codex-rs/core/rollout/policy.rs`.
|
|
pub fn handle_event(&mut self, event: &EventMsg) {
|
|
match event {
|
|
EventMsg::UserMessage(payload) => self.handle_user_message(payload),
|
|
EventMsg::AgentMessage(payload) => {
|
|
self.handle_agent_message(payload.message.clone(), payload.phase.clone())
|
|
}
|
|
EventMsg::AgentReasoning(payload) => self.handle_agent_reasoning(payload),
|
|
EventMsg::AgentReasoningRawContent(payload) => {
|
|
self.handle_agent_reasoning_raw_content(payload)
|
|
}
|
|
EventMsg::WebSearchBegin(payload) => self.handle_web_search_begin(payload),
|
|
EventMsg::WebSearchEnd(payload) => self.handle_web_search_end(payload),
|
|
EventMsg::ExecCommandBegin(payload) => self.handle_exec_command_begin(payload),
|
|
EventMsg::ExecCommandEnd(payload) => self.handle_exec_command_end(payload),
|
|
EventMsg::PatchApplyBegin(payload) => self.handle_patch_apply_begin(payload),
|
|
EventMsg::PatchApplyEnd(payload) => self.handle_patch_apply_end(payload),
|
|
EventMsg::McpToolCallBegin(payload) => self.handle_mcp_tool_call_begin(payload),
|
|
EventMsg::McpToolCallEnd(payload) => self.handle_mcp_tool_call_end(payload),
|
|
EventMsg::ViewImageToolCall(payload) => self.handle_view_image_tool_call(payload),
|
|
EventMsg::CollabAgentSpawnBegin(payload) => {
|
|
self.handle_collab_agent_spawn_begin(payload)
|
|
}
|
|
EventMsg::CollabAgentSpawnEnd(payload) => self.handle_collab_agent_spawn_end(payload),
|
|
EventMsg::CollabAgentInteractionBegin(payload) => {
|
|
self.handle_collab_agent_interaction_begin(payload)
|
|
}
|
|
EventMsg::CollabAgentInteractionEnd(payload) => {
|
|
self.handle_collab_agent_interaction_end(payload)
|
|
}
|
|
EventMsg::CollabWaitingBegin(payload) => self.handle_collab_waiting_begin(payload),
|
|
EventMsg::CollabWaitingEnd(payload) => self.handle_collab_waiting_end(payload),
|
|
EventMsg::CollabCloseBegin(payload) => self.handle_collab_close_begin(payload),
|
|
EventMsg::CollabCloseEnd(payload) => self.handle_collab_close_end(payload),
|
|
EventMsg::CollabResumeBegin(payload) => self.handle_collab_resume_begin(payload),
|
|
EventMsg::CollabResumeEnd(payload) => self.handle_collab_resume_end(payload),
|
|
EventMsg::ContextCompacted(payload) => self.handle_context_compacted(payload),
|
|
EventMsg::EnteredReviewMode(payload) => self.handle_entered_review_mode(payload),
|
|
EventMsg::ExitedReviewMode(payload) => self.handle_exited_review_mode(payload),
|
|
EventMsg::ItemStarted(payload) => self.handle_item_started(payload),
|
|
EventMsg::ItemCompleted(payload) => self.handle_item_completed(payload),
|
|
EventMsg::Error(payload) => self.handle_error(payload),
|
|
EventMsg::TokenCount(_) => {}
|
|
EventMsg::ThreadRolledBack(payload) => self.handle_thread_rollback(payload),
|
|
EventMsg::UndoCompleted(_) => {}
|
|
EventMsg::TurnAborted(payload) => self.handle_turn_aborted(payload),
|
|
EventMsg::TurnStarted(payload) => self.handle_turn_started(payload),
|
|
EventMsg::TurnComplete(payload) => self.handle_turn_complete(payload),
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
pub fn handle_rollout_item(&mut self, item: &RolloutItem) {
|
|
match item {
|
|
RolloutItem::EventMsg(event) => self.handle_event(event),
|
|
RolloutItem::Compacted(payload) => self.handle_compacted(payload),
|
|
RolloutItem::TurnContext(_)
|
|
| RolloutItem::SessionMeta(_)
|
|
| RolloutItem::ResponseItem(_) => {}
|
|
}
|
|
}
|
|
|
|
fn handle_user_message(&mut self, payload: &UserMessageEvent) {
|
|
// User messages should stay in explicitly opened turns. For backward
|
|
// compatibility with older streams that did not open turns explicitly,
|
|
// close any implicit/inactive turn and start a fresh one for this input.
|
|
if let Some(turn) = self.current_turn.as_ref()
|
|
&& !turn.opened_explicitly
|
|
&& !(turn.saw_compaction && turn.items.is_empty())
|
|
{
|
|
self.finish_current_turn();
|
|
}
|
|
let mut turn = self
|
|
.current_turn
|
|
.take()
|
|
.unwrap_or_else(|| self.new_turn(None));
|
|
let id = self.next_item_id();
|
|
let content = self.build_user_inputs(payload);
|
|
turn.items.push(ThreadItem::UserMessage { id, content });
|
|
self.current_turn = Some(turn);
|
|
}
|
|
|
|
fn handle_agent_message(&mut self, text: String, phase: Option<MessagePhase>) {
|
|
if text.is_empty() {
|
|
return;
|
|
}
|
|
|
|
let id = self.next_item_id();
|
|
self.ensure_turn()
|
|
.items
|
|
.push(ThreadItem::AgentMessage { id, text, phase });
|
|
}
|
|
|
|
fn handle_agent_reasoning(&mut self, payload: &AgentReasoningEvent) {
|
|
if payload.text.is_empty() {
|
|
return;
|
|
}
|
|
|
|
// If the last item is a reasoning item, add the new text to the summary.
|
|
if let Some(ThreadItem::Reasoning { summary, .. }) = self.ensure_turn().items.last_mut() {
|
|
summary.push(payload.text.clone());
|
|
return;
|
|
}
|
|
|
|
// Otherwise, create a new reasoning item.
|
|
let id = self.next_item_id();
|
|
self.ensure_turn().items.push(ThreadItem::Reasoning {
|
|
id,
|
|
summary: vec![payload.text.clone()],
|
|
content: Vec::new(),
|
|
});
|
|
}
|
|
|
|
fn handle_agent_reasoning_raw_content(&mut self, payload: &AgentReasoningRawContentEvent) {
|
|
if payload.text.is_empty() {
|
|
return;
|
|
}
|
|
|
|
// If the last item is a reasoning item, add the new text to the content.
|
|
if let Some(ThreadItem::Reasoning { content, .. }) = self.ensure_turn().items.last_mut() {
|
|
content.push(payload.text.clone());
|
|
return;
|
|
}
|
|
|
|
// Otherwise, create a new reasoning item.
|
|
let id = self.next_item_id();
|
|
self.ensure_turn().items.push(ThreadItem::Reasoning {
|
|
id,
|
|
summary: Vec::new(),
|
|
content: vec![payload.text.clone()],
|
|
});
|
|
}
|
|
|
|
fn handle_item_started(&mut self, payload: &ItemStartedEvent) {
|
|
match &payload.item {
|
|
codex_protocol::items::TurnItem::Plan(plan) => {
|
|
if plan.text.is_empty() {
|
|
return;
|
|
}
|
|
self.upsert_item_in_turn_id(
|
|
&payload.turn_id,
|
|
ThreadItem::from(payload.item.clone()),
|
|
);
|
|
}
|
|
codex_protocol::items::TurnItem::UserMessage(_)
|
|
| codex_protocol::items::TurnItem::AgentMessage(_)
|
|
| codex_protocol::items::TurnItem::Reasoning(_)
|
|
| codex_protocol::items::TurnItem::WebSearch(_)
|
|
| codex_protocol::items::TurnItem::ContextCompaction(_) => {}
|
|
}
|
|
}
|
|
|
|
fn handle_item_completed(&mut self, payload: &ItemCompletedEvent) {
|
|
match &payload.item {
|
|
codex_protocol::items::TurnItem::Plan(plan) => {
|
|
if plan.text.is_empty() {
|
|
return;
|
|
}
|
|
self.upsert_item_in_turn_id(
|
|
&payload.turn_id,
|
|
ThreadItem::from(payload.item.clone()),
|
|
);
|
|
}
|
|
codex_protocol::items::TurnItem::UserMessage(_)
|
|
| codex_protocol::items::TurnItem::AgentMessage(_)
|
|
| codex_protocol::items::TurnItem::Reasoning(_)
|
|
| codex_protocol::items::TurnItem::WebSearch(_)
|
|
| codex_protocol::items::TurnItem::ContextCompaction(_) => {}
|
|
}
|
|
}
|
|
|
|
fn handle_web_search_begin(&mut self, payload: &WebSearchBeginEvent) {
|
|
let item = ThreadItem::WebSearch {
|
|
id: payload.call_id.clone(),
|
|
query: String::new(),
|
|
action: None,
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_web_search_end(&mut self, payload: &WebSearchEndEvent) {
|
|
let item = ThreadItem::WebSearch {
|
|
id: payload.call_id.clone(),
|
|
query: payload.query.clone(),
|
|
action: Some(WebSearchAction::from(payload.action.clone())),
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_exec_command_begin(&mut self, payload: &ExecCommandBeginEvent) {
|
|
let command = shlex::try_join(payload.command.iter().map(String::as_str))
|
|
.unwrap_or_else(|_| payload.command.join(" "));
|
|
let command_actions = payload
|
|
.parsed_cmd
|
|
.iter()
|
|
.cloned()
|
|
.map(CommandAction::from)
|
|
.collect();
|
|
let item = ThreadItem::CommandExecution {
|
|
id: payload.call_id.clone(),
|
|
command,
|
|
cwd: payload.cwd.clone(),
|
|
process_id: payload.process_id.clone(),
|
|
status: CommandExecutionStatus::InProgress,
|
|
command_actions,
|
|
aggregated_output: None,
|
|
exit_code: None,
|
|
duration_ms: None,
|
|
};
|
|
self.upsert_item_in_turn_id(&payload.turn_id, item);
|
|
}
|
|
|
|
fn handle_exec_command_end(&mut self, payload: &ExecCommandEndEvent) {
|
|
let status: CommandExecutionStatus = (&payload.status).into();
|
|
let duration_ms = i64::try_from(payload.duration.as_millis()).unwrap_or(i64::MAX);
|
|
let aggregated_output = if payload.aggregated_output.is_empty() {
|
|
None
|
|
} else {
|
|
Some(payload.aggregated_output.clone())
|
|
};
|
|
let command = shlex::try_join(payload.command.iter().map(String::as_str))
|
|
.unwrap_or_else(|_| payload.command.join(" "));
|
|
let command_actions = payload
|
|
.parsed_cmd
|
|
.iter()
|
|
.cloned()
|
|
.map(CommandAction::from)
|
|
.collect();
|
|
let item = ThreadItem::CommandExecution {
|
|
id: payload.call_id.clone(),
|
|
command,
|
|
cwd: payload.cwd.clone(),
|
|
process_id: payload.process_id.clone(),
|
|
status,
|
|
command_actions,
|
|
aggregated_output,
|
|
exit_code: Some(payload.exit_code),
|
|
duration_ms: Some(duration_ms),
|
|
};
|
|
// Command completions can arrive out of order. Unified exec may return
|
|
// while a PTY is still running, then emit ExecCommandEnd later from a
|
|
// background exit watcher when that process finally exits. By then, a
|
|
// newer user turn may already have started. Route by event turn_id so
|
|
// replay preserves the original turn association.
|
|
self.upsert_item_in_turn_id(&payload.turn_id, item);
|
|
}
|
|
|
|
fn handle_patch_apply_begin(&mut self, payload: &PatchApplyBeginEvent) {
|
|
let item = ThreadItem::FileChange {
|
|
id: payload.call_id.clone(),
|
|
changes: convert_patch_changes(&payload.changes),
|
|
status: PatchApplyStatus::InProgress,
|
|
};
|
|
if payload.turn_id.is_empty() {
|
|
self.upsert_item_in_current_turn(item);
|
|
} else {
|
|
self.upsert_item_in_turn_id(&payload.turn_id, item);
|
|
}
|
|
}
|
|
|
|
fn handle_patch_apply_end(&mut self, payload: &PatchApplyEndEvent) {
|
|
let status: PatchApplyStatus = (&payload.status).into();
|
|
let item = ThreadItem::FileChange {
|
|
id: payload.call_id.clone(),
|
|
changes: convert_patch_changes(&payload.changes),
|
|
status,
|
|
};
|
|
if payload.turn_id.is_empty() {
|
|
self.upsert_item_in_current_turn(item);
|
|
} else {
|
|
self.upsert_item_in_turn_id(&payload.turn_id, item);
|
|
}
|
|
}
|
|
|
|
fn handle_mcp_tool_call_begin(&mut self, payload: &McpToolCallBeginEvent) {
|
|
let item = ThreadItem::McpToolCall {
|
|
id: payload.call_id.clone(),
|
|
server: payload.invocation.server.clone(),
|
|
tool: payload.invocation.tool.clone(),
|
|
status: McpToolCallStatus::InProgress,
|
|
arguments: payload
|
|
.invocation
|
|
.arguments
|
|
.clone()
|
|
.unwrap_or(serde_json::Value::Null),
|
|
result: None,
|
|
error: None,
|
|
duration_ms: None,
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_mcp_tool_call_end(&mut self, payload: &McpToolCallEndEvent) {
|
|
let status = if payload.is_success() {
|
|
McpToolCallStatus::Completed
|
|
} else {
|
|
McpToolCallStatus::Failed
|
|
};
|
|
let duration_ms = i64::try_from(payload.duration.as_millis()).ok();
|
|
let (result, error) = match &payload.result {
|
|
Ok(value) => (
|
|
Some(McpToolCallResult {
|
|
content: value.content.clone(),
|
|
structured_content: value.structured_content.clone(),
|
|
}),
|
|
None,
|
|
),
|
|
Err(message) => (
|
|
None,
|
|
Some(McpToolCallError {
|
|
message: message.clone(),
|
|
}),
|
|
),
|
|
};
|
|
let item = ThreadItem::McpToolCall {
|
|
id: payload.call_id.clone(),
|
|
server: payload.invocation.server.clone(),
|
|
tool: payload.invocation.tool.clone(),
|
|
status,
|
|
arguments: payload
|
|
.invocation
|
|
.arguments
|
|
.clone()
|
|
.unwrap_or(serde_json::Value::Null),
|
|
result,
|
|
error,
|
|
duration_ms,
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_view_image_tool_call(&mut self, payload: &ViewImageToolCallEvent) {
|
|
let item = ThreadItem::ImageView {
|
|
id: payload.call_id.clone(),
|
|
path: payload.path.to_string_lossy().into_owned(),
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_collab_agent_spawn_begin(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabAgentSpawnBeginEvent,
|
|
) {
|
|
let item = ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::SpawnAgent,
|
|
status: CollabAgentToolCallStatus::InProgress,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids: Vec::new(),
|
|
prompt: Some(payload.prompt.clone()),
|
|
agents_states: HashMap::new(),
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_collab_agent_spawn_end(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabAgentSpawnEndEvent,
|
|
) {
|
|
let has_receiver = payload.new_thread_id.is_some();
|
|
let status = match &payload.status {
|
|
AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed,
|
|
_ if has_receiver => CollabAgentToolCallStatus::Completed,
|
|
_ => CollabAgentToolCallStatus::Failed,
|
|
};
|
|
let (receiver_thread_ids, agents_states) = match &payload.new_thread_id {
|
|
Some(id) => {
|
|
let receiver_id = id.to_string();
|
|
let received_status = CollabAgentState::from(payload.status.clone());
|
|
(
|
|
vec![receiver_id.clone()],
|
|
[(receiver_id, received_status)].into_iter().collect(),
|
|
)
|
|
}
|
|
None => (Vec::new(), HashMap::new()),
|
|
};
|
|
self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::SpawnAgent,
|
|
status,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids,
|
|
prompt: Some(payload.prompt.clone()),
|
|
agents_states,
|
|
});
|
|
}
|
|
|
|
fn handle_collab_agent_interaction_begin(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabAgentInteractionBeginEvent,
|
|
) {
|
|
let item = ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::SendInput,
|
|
status: CollabAgentToolCallStatus::InProgress,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids: vec![payload.receiver_thread_id.to_string()],
|
|
prompt: Some(payload.prompt.clone()),
|
|
agents_states: HashMap::new(),
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_collab_agent_interaction_end(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabAgentInteractionEndEvent,
|
|
) {
|
|
let status = match &payload.status {
|
|
AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed,
|
|
_ => CollabAgentToolCallStatus::Completed,
|
|
};
|
|
let receiver_id = payload.receiver_thread_id.to_string();
|
|
let received_status = CollabAgentState::from(payload.status.clone());
|
|
self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::SendInput,
|
|
status,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids: vec![receiver_id.clone()],
|
|
prompt: Some(payload.prompt.clone()),
|
|
agents_states: [(receiver_id, received_status)].into_iter().collect(),
|
|
});
|
|
}
|
|
|
|
fn handle_collab_waiting_begin(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabWaitingBeginEvent,
|
|
) {
|
|
let item = ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::Wait,
|
|
status: CollabAgentToolCallStatus::InProgress,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids: payload
|
|
.receiver_thread_ids
|
|
.iter()
|
|
.map(ToString::to_string)
|
|
.collect(),
|
|
prompt: None,
|
|
agents_states: HashMap::new(),
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_collab_waiting_end(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabWaitingEndEvent,
|
|
) {
|
|
let status = if payload
|
|
.statuses
|
|
.values()
|
|
.any(|status| matches!(status, AgentStatus::Errored(_) | AgentStatus::NotFound))
|
|
{
|
|
CollabAgentToolCallStatus::Failed
|
|
} else {
|
|
CollabAgentToolCallStatus::Completed
|
|
};
|
|
let mut receiver_thread_ids: Vec<String> =
|
|
payload.statuses.keys().map(ToString::to_string).collect();
|
|
receiver_thread_ids.sort();
|
|
let agents_states = payload
|
|
.statuses
|
|
.iter()
|
|
.map(|(id, status)| (id.to_string(), CollabAgentState::from(status.clone())))
|
|
.collect();
|
|
self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::Wait,
|
|
status,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids,
|
|
prompt: None,
|
|
agents_states,
|
|
});
|
|
}
|
|
|
|
fn handle_collab_close_begin(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabCloseBeginEvent,
|
|
) {
|
|
let item = ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::CloseAgent,
|
|
status: CollabAgentToolCallStatus::InProgress,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids: vec![payload.receiver_thread_id.to_string()],
|
|
prompt: None,
|
|
agents_states: HashMap::new(),
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_collab_close_end(&mut self, payload: &codex_protocol::protocol::CollabCloseEndEvent) {
|
|
let status = match &payload.status {
|
|
AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed,
|
|
_ => CollabAgentToolCallStatus::Completed,
|
|
};
|
|
let receiver_id = payload.receiver_thread_id.to_string();
|
|
let agents_states = [(
|
|
receiver_id.clone(),
|
|
CollabAgentState::from(payload.status.clone()),
|
|
)]
|
|
.into_iter()
|
|
.collect();
|
|
self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::CloseAgent,
|
|
status,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids: vec![receiver_id],
|
|
prompt: None,
|
|
agents_states,
|
|
});
|
|
}
|
|
|
|
fn handle_collab_resume_begin(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabResumeBeginEvent,
|
|
) {
|
|
let item = ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::ResumeAgent,
|
|
status: CollabAgentToolCallStatus::InProgress,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids: vec![payload.receiver_thread_id.to_string()],
|
|
prompt: None,
|
|
agents_states: HashMap::new(),
|
|
};
|
|
self.upsert_item_in_current_turn(item);
|
|
}
|
|
|
|
fn handle_collab_resume_end(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::CollabResumeEndEvent,
|
|
) {
|
|
let status = match &payload.status {
|
|
AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed,
|
|
_ => CollabAgentToolCallStatus::Completed,
|
|
};
|
|
let receiver_id = payload.receiver_thread_id.to_string();
|
|
let agents_states = [(
|
|
receiver_id.clone(),
|
|
CollabAgentState::from(payload.status.clone()),
|
|
)]
|
|
.into_iter()
|
|
.collect();
|
|
self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
|
|
id: payload.call_id.clone(),
|
|
tool: CollabAgentTool::ResumeAgent,
|
|
status,
|
|
sender_thread_id: payload.sender_thread_id.to_string(),
|
|
receiver_thread_ids: vec![receiver_id],
|
|
prompt: None,
|
|
agents_states,
|
|
});
|
|
}
|
|
|
|
fn handle_context_compacted(&mut self, _payload: &ContextCompactedEvent) {
|
|
let id = self.next_item_id();
|
|
self.ensure_turn()
|
|
.items
|
|
.push(ThreadItem::ContextCompaction { id });
|
|
}
|
|
|
|
fn handle_entered_review_mode(&mut self, payload: &codex_protocol::protocol::ReviewRequest) {
|
|
let review = payload
|
|
.user_facing_hint
|
|
.clone()
|
|
.unwrap_or_else(|| "Review requested.".to_string());
|
|
let id = self.next_item_id();
|
|
self.ensure_turn()
|
|
.items
|
|
.push(ThreadItem::EnteredReviewMode { id, review });
|
|
}
|
|
|
|
fn handle_exited_review_mode(
|
|
&mut self,
|
|
payload: &codex_protocol::protocol::ExitedReviewModeEvent,
|
|
) {
|
|
let review = payload
|
|
.review_output
|
|
.as_ref()
|
|
.map(render_review_output_text)
|
|
.unwrap_or_else(|| REVIEW_FALLBACK_MESSAGE.to_string());
|
|
let id = self.next_item_id();
|
|
self.ensure_turn()
|
|
.items
|
|
.push(ThreadItem::ExitedReviewMode { id, review });
|
|
}
|
|
|
|
fn handle_error(&mut self, payload: &ErrorEvent) {
|
|
if !payload.affects_turn_status() {
|
|
return;
|
|
}
|
|
let Some(turn) = self.current_turn.as_mut() else {
|
|
return;
|
|
};
|
|
turn.status = TurnStatus::Failed;
|
|
turn.error = Some(V2TurnError {
|
|
message: payload.message.clone(),
|
|
codex_error_info: payload.codex_error_info.clone().map(Into::into),
|
|
additional_details: None,
|
|
});
|
|
}
|
|
|
|
fn handle_turn_aborted(&mut self, payload: &TurnAbortedEvent) {
|
|
if let Some(turn_id) = payload.turn_id.as_deref() {
|
|
// Prefer an exact ID match so we interrupt the turn explicitly targeted by the event.
|
|
if let Some(turn) = self.current_turn.as_mut().filter(|turn| turn.id == turn_id) {
|
|
turn.status = TurnStatus::Interrupted;
|
|
return;
|
|
}
|
|
|
|
if let Some(turn) = self.turns.iter_mut().find(|turn| turn.id == turn_id) {
|
|
turn.status = TurnStatus::Interrupted;
|
|
return;
|
|
}
|
|
}
|
|
|
|
// If the event has no ID (or refers to an unknown turn), fall back to the active turn.
|
|
if let Some(turn) = self.current_turn.as_mut() {
|
|
turn.status = TurnStatus::Interrupted;
|
|
}
|
|
}
|
|
|
|
fn handle_turn_started(&mut self, payload: &TurnStartedEvent) {
|
|
self.finish_current_turn();
|
|
self.current_turn = Some(
|
|
self.new_turn(Some(payload.turn_id.clone()))
|
|
.with_status(TurnStatus::InProgress)
|
|
.opened_explicitly(),
|
|
);
|
|
}
|
|
|
|
fn handle_turn_complete(&mut self, payload: &TurnCompleteEvent) {
|
|
let mark_completed = |status: &mut TurnStatus| {
|
|
if matches!(*status, TurnStatus::Completed | TurnStatus::InProgress) {
|
|
*status = TurnStatus::Completed;
|
|
}
|
|
};
|
|
|
|
// Prefer an exact ID match from the active turn and then close it.
|
|
if let Some(current_turn) = self
|
|
.current_turn
|
|
.as_mut()
|
|
.filter(|turn| turn.id == payload.turn_id)
|
|
{
|
|
mark_completed(&mut current_turn.status);
|
|
self.finish_current_turn();
|
|
return;
|
|
}
|
|
|
|
if let Some(turn) = self
|
|
.turns
|
|
.iter_mut()
|
|
.find(|turn| turn.id == payload.turn_id)
|
|
{
|
|
mark_completed(&mut turn.status);
|
|
return;
|
|
}
|
|
|
|
// If the completion event cannot be matched, apply it to the active turn.
|
|
if let Some(current_turn) = self.current_turn.as_mut() {
|
|
mark_completed(&mut current_turn.status);
|
|
self.finish_current_turn();
|
|
}
|
|
}
|
|
|
|
/// Marks the current turn as containing a persisted compaction marker.
|
|
///
|
|
/// This keeps compaction-only legacy turns from being dropped by
|
|
/// `finish_current_turn` when they have no renderable items and were not
|
|
/// explicitly opened.
|
|
fn handle_compacted(&mut self, _payload: &CompactedItem) {
|
|
self.ensure_turn().saw_compaction = true;
|
|
}
|
|
|
|
fn handle_thread_rollback(&mut self, payload: &ThreadRolledBackEvent) {
|
|
self.finish_current_turn();
|
|
|
|
let n = usize::try_from(payload.num_turns).unwrap_or(usize::MAX);
|
|
if n >= self.turns.len() {
|
|
self.turns.clear();
|
|
} else {
|
|
self.turns.truncate(self.turns.len().saturating_sub(n));
|
|
}
|
|
|
|
let item_count: usize = self.turns.iter().map(|t| t.items.len()).sum();
|
|
self.next_item_index = i64::try_from(item_count.saturating_add(1)).unwrap_or(i64::MAX);
|
|
}
|
|
|
|
fn finish_current_turn(&mut self) {
|
|
if let Some(turn) = self.current_turn.take() {
|
|
if turn.items.is_empty() && !turn.opened_explicitly && !turn.saw_compaction {
|
|
return;
|
|
}
|
|
self.turns.push(turn.into());
|
|
}
|
|
}
|
|
|
|
fn new_turn(&mut self, id: Option<String>) -> PendingTurn {
|
|
PendingTurn {
|
|
id: id.unwrap_or_else(|| Uuid::now_v7().to_string()),
|
|
items: Vec::new(),
|
|
error: None,
|
|
status: TurnStatus::Completed,
|
|
opened_explicitly: false,
|
|
saw_compaction: false,
|
|
}
|
|
}
|
|
|
|
fn ensure_turn(&mut self) -> &mut PendingTurn {
|
|
if self.current_turn.is_none() {
|
|
let turn = self.new_turn(None);
|
|
return self.current_turn.insert(turn);
|
|
}
|
|
|
|
if let Some(turn) = self.current_turn.as_mut() {
|
|
return turn;
|
|
}
|
|
|
|
unreachable!("current turn must exist after initialization");
|
|
}
|
|
|
|
fn upsert_item_in_turn_id(&mut self, turn_id: &str, item: ThreadItem) {
|
|
if let Some(turn) = self.current_turn.as_mut()
|
|
&& turn.id == turn_id
|
|
{
|
|
upsert_turn_item(&mut turn.items, item);
|
|
return;
|
|
}
|
|
|
|
if let Some(turn) = self.turns.iter_mut().find(|turn| turn.id == turn_id) {
|
|
upsert_turn_item(&mut turn.items, item);
|
|
return;
|
|
}
|
|
|
|
warn!(
|
|
item_id = item.id(),
|
|
"dropping turn-scoped item for unknown turn id `{turn_id}`"
|
|
);
|
|
}
|
|
|
|
fn upsert_item_in_current_turn(&mut self, item: ThreadItem) {
|
|
let turn = self.ensure_turn();
|
|
upsert_turn_item(&mut turn.items, item);
|
|
}
|
|
|
|
fn next_item_id(&mut self) -> String {
|
|
let id = format!("item-{}", self.next_item_index);
|
|
self.next_item_index += 1;
|
|
id
|
|
}
|
|
|
|
fn build_user_inputs(&self, payload: &UserMessageEvent) -> Vec<UserInput> {
|
|
let mut content = Vec::new();
|
|
if !payload.message.trim().is_empty() {
|
|
content.push(UserInput::Text {
|
|
text: payload.message.clone(),
|
|
text_elements: payload
|
|
.text_elements
|
|
.iter()
|
|
.cloned()
|
|
.map(Into::into)
|
|
.collect(),
|
|
});
|
|
}
|
|
if let Some(images) = &payload.images {
|
|
for image in images {
|
|
content.push(UserInput::Image { url: image.clone() });
|
|
}
|
|
}
|
|
for path in &payload.local_images {
|
|
content.push(UserInput::LocalImage { path: path.clone() });
|
|
}
|
|
content
|
|
}
|
|
}
|
|
|
|
const REVIEW_FALLBACK_MESSAGE: &str = "Reviewer failed to output a response.";
|
|
|
|
fn render_review_output_text(output: &ReviewOutputEvent) -> String {
|
|
let explanation = output.overall_explanation.trim();
|
|
if explanation.is_empty() {
|
|
REVIEW_FALLBACK_MESSAGE.to_string()
|
|
} else {
|
|
explanation.to_string()
|
|
}
|
|
}
|
|
|
|
pub fn convert_patch_changes(
|
|
changes: &HashMap<std::path::PathBuf, codex_protocol::protocol::FileChange>,
|
|
) -> Vec<FileUpdateChange> {
|
|
let mut converted: Vec<FileUpdateChange> = changes
|
|
.iter()
|
|
.map(|(path, change)| FileUpdateChange {
|
|
path: path.to_string_lossy().into_owned(),
|
|
kind: map_patch_change_kind(change),
|
|
diff: format_file_change_diff(change),
|
|
})
|
|
.collect();
|
|
converted.sort_by(|a, b| a.path.cmp(&b.path));
|
|
converted
|
|
}
|
|
|
|
fn map_patch_change_kind(change: &codex_protocol::protocol::FileChange) -> PatchChangeKind {
|
|
match change {
|
|
codex_protocol::protocol::FileChange::Add { .. } => PatchChangeKind::Add,
|
|
codex_protocol::protocol::FileChange::Delete { .. } => PatchChangeKind::Delete,
|
|
codex_protocol::protocol::FileChange::Update { move_path, .. } => PatchChangeKind::Update {
|
|
move_path: move_path.clone(),
|
|
},
|
|
}
|
|
}
|
|
|
|
fn format_file_change_diff(change: &codex_protocol::protocol::FileChange) -> String {
|
|
match change {
|
|
codex_protocol::protocol::FileChange::Add { content } => content.clone(),
|
|
codex_protocol::protocol::FileChange::Delete { content } => content.clone(),
|
|
codex_protocol::protocol::FileChange::Update {
|
|
unified_diff,
|
|
move_path,
|
|
} => {
|
|
if let Some(path) = move_path {
|
|
format!("{unified_diff}\n\nMoved to: {}", path.display())
|
|
} else {
|
|
unified_diff.clone()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
fn upsert_turn_item(items: &mut Vec<ThreadItem>, item: ThreadItem) {
|
|
if let Some(existing_item) = items
|
|
.iter_mut()
|
|
.find(|existing_item| existing_item.id() == item.id())
|
|
{
|
|
*existing_item = item;
|
|
return;
|
|
}
|
|
items.push(item);
|
|
}
|
|
|
|
struct PendingTurn {
|
|
id: String,
|
|
items: Vec<ThreadItem>,
|
|
error: Option<TurnError>,
|
|
status: TurnStatus,
|
|
/// True when this turn originated from an explicit `turn_started`/`turn_complete`
|
|
/// boundary, so we preserve it even if it has no renderable items.
|
|
opened_explicitly: bool,
|
|
/// True when this turn includes a persisted `RolloutItem::Compacted`, which
|
|
/// should keep the turn from being dropped even without normal items.
|
|
saw_compaction: bool,
|
|
}
|
|
|
|
impl PendingTurn {
|
|
fn opened_explicitly(mut self) -> Self {
|
|
self.opened_explicitly = true;
|
|
self
|
|
}
|
|
|
|
fn with_status(mut self, status: TurnStatus) -> Self {
|
|
self.status = status;
|
|
self
|
|
}
|
|
}
|
|
|
|
impl From<PendingTurn> for Turn {
|
|
fn from(value: PendingTurn) -> Self {
|
|
Self {
|
|
id: value.id,
|
|
items: value.items,
|
|
error: value.error,
|
|
status: value.status,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl From<&PendingTurn> for Turn {
|
|
fn from(value: &PendingTurn) -> Self {
|
|
Self {
|
|
id: value.id.clone(),
|
|
items: value.items.clone(),
|
|
error: value.error.clone(),
|
|
status: value.status.clone(),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use codex_protocol::ThreadId;
|
|
use codex_protocol::items::TurnItem as CoreTurnItem;
|
|
use codex_protocol::items::UserMessageItem as CoreUserMessageItem;
|
|
use codex_protocol::models::MessagePhase as CoreMessagePhase;
|
|
use codex_protocol::models::WebSearchAction as CoreWebSearchAction;
|
|
use codex_protocol::parse_command::ParsedCommand;
|
|
use codex_protocol::protocol::AgentMessageEvent;
|
|
use codex_protocol::protocol::AgentReasoningEvent;
|
|
use codex_protocol::protocol::AgentReasoningRawContentEvent;
|
|
use codex_protocol::protocol::CodexErrorInfo;
|
|
use codex_protocol::protocol::CompactedItem;
|
|
use codex_protocol::protocol::ExecCommandEndEvent;
|
|
use codex_protocol::protocol::ExecCommandSource;
|
|
use codex_protocol::protocol::ItemStartedEvent;
|
|
use codex_protocol::protocol::McpInvocation;
|
|
use codex_protocol::protocol::McpToolCallEndEvent;
|
|
use codex_protocol::protocol::ThreadRolledBackEvent;
|
|
use codex_protocol::protocol::TurnAbortReason;
|
|
use codex_protocol::protocol::TurnAbortedEvent;
|
|
use codex_protocol::protocol::TurnCompleteEvent;
|
|
use codex_protocol::protocol::TurnStartedEvent;
|
|
use codex_protocol::protocol::UserMessageEvent;
|
|
use codex_protocol::protocol::WebSearchEndEvent;
|
|
use pretty_assertions::assert_eq;
|
|
use std::path::PathBuf;
|
|
use std::time::Duration;
|
|
use uuid::Uuid;
|
|
|
|
#[test]
|
|
fn builds_multiple_turns_with_reasoning_items() {
|
|
let events = vec![
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "First turn".into(),
|
|
images: Some(vec!["https://example.com/one.png".into()]),
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "Hi there".into(),
|
|
phase: None,
|
|
}),
|
|
EventMsg::AgentReasoning(AgentReasoningEvent {
|
|
text: "thinking".into(),
|
|
}),
|
|
EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent {
|
|
text: "full reasoning".into(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Second turn".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "Reply two".into(),
|
|
phase: None,
|
|
}),
|
|
];
|
|
|
|
let mut builder = ThreadHistoryBuilder::new();
|
|
for event in &events {
|
|
builder.handle_event(event);
|
|
}
|
|
let turns = builder.finish();
|
|
assert_eq!(turns.len(), 2);
|
|
|
|
let first = &turns[0];
|
|
assert!(Uuid::parse_str(&first.id).is_ok());
|
|
assert_eq!(first.status, TurnStatus::Completed);
|
|
assert_eq!(first.items.len(), 3);
|
|
assert_eq!(
|
|
first.items[0],
|
|
ThreadItem::UserMessage {
|
|
id: "item-1".into(),
|
|
content: vec![
|
|
UserInput::Text {
|
|
text: "First turn".into(),
|
|
text_elements: Vec::new(),
|
|
},
|
|
UserInput::Image {
|
|
url: "https://example.com/one.png".into(),
|
|
}
|
|
],
|
|
}
|
|
);
|
|
assert_eq!(
|
|
first.items[1],
|
|
ThreadItem::AgentMessage {
|
|
id: "item-2".into(),
|
|
text: "Hi there".into(),
|
|
phase: None,
|
|
}
|
|
);
|
|
assert_eq!(
|
|
first.items[2],
|
|
ThreadItem::Reasoning {
|
|
id: "item-3".into(),
|
|
summary: vec!["thinking".into()],
|
|
content: vec!["full reasoning".into()],
|
|
}
|
|
);
|
|
|
|
let second = &turns[1];
|
|
assert!(Uuid::parse_str(&second.id).is_ok());
|
|
assert_ne!(first.id, second.id);
|
|
assert_eq!(second.items.len(), 2);
|
|
assert_eq!(
|
|
second.items[0],
|
|
ThreadItem::UserMessage {
|
|
id: "item-4".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "Second turn".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
}
|
|
);
|
|
assert_eq!(
|
|
second.items[1],
|
|
ThreadItem::AgentMessage {
|
|
id: "item-5".into(),
|
|
text: "Reply two".into(),
|
|
phase: None,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn ignores_non_plan_item_lifecycle_events() {
|
|
let turn_id = "turn-1";
|
|
let thread_id = ThreadId::new();
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: turn_id.to_string(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "hello".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::ItemStarted(ItemStartedEvent {
|
|
thread_id,
|
|
turn_id: turn_id.to_string(),
|
|
item: CoreTurnItem::UserMessage(CoreUserMessageItem {
|
|
id: "user-item-id".to_string(),
|
|
content: Vec::new(),
|
|
}),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: turn_id.to_string(),
|
|
last_agent_message: None,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].items.len(), 1);
|
|
assert_eq!(
|
|
turns[0].items[0],
|
|
ThreadItem::UserMessage {
|
|
id: "item-1".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "hello".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn preserves_agent_message_phase_in_history() {
|
|
let events = vec![EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "Final reply".into(),
|
|
phase: Some(CoreMessagePhase::FinalAnswer),
|
|
})];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(
|
|
turns[0].items[0],
|
|
ThreadItem::AgentMessage {
|
|
id: "item-1".into(),
|
|
text: "Final reply".into(),
|
|
phase: Some(MessagePhase::FinalAnswer),
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn splits_reasoning_when_interleaved() {
|
|
let events = vec![
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Turn start".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentReasoning(AgentReasoningEvent {
|
|
text: "first summary".into(),
|
|
}),
|
|
EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent {
|
|
text: "first content".into(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "interlude".into(),
|
|
phase: None,
|
|
}),
|
|
EventMsg::AgentReasoning(AgentReasoningEvent {
|
|
text: "second summary".into(),
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
let turn = &turns[0];
|
|
assert_eq!(turn.items.len(), 4);
|
|
|
|
assert_eq!(
|
|
turn.items[1],
|
|
ThreadItem::Reasoning {
|
|
id: "item-2".into(),
|
|
summary: vec!["first summary".into()],
|
|
content: vec!["first content".into()],
|
|
}
|
|
);
|
|
assert_eq!(
|
|
turn.items[3],
|
|
ThreadItem::Reasoning {
|
|
id: "item-4".into(),
|
|
summary: vec!["second summary".into()],
|
|
content: Vec::new(),
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn marks_turn_as_interrupted_when_aborted() {
|
|
let events = vec![
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Please do the thing".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "Working...".into(),
|
|
phase: None,
|
|
}),
|
|
EventMsg::TurnAborted(TurnAbortedEvent {
|
|
turn_id: Some("turn-1".into()),
|
|
reason: TurnAbortReason::Replaced,
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Let's try again".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "Second attempt complete.".into(),
|
|
phase: None,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 2);
|
|
|
|
let first_turn = &turns[0];
|
|
assert_eq!(first_turn.status, TurnStatus::Interrupted);
|
|
assert_eq!(first_turn.items.len(), 2);
|
|
assert_eq!(
|
|
first_turn.items[0],
|
|
ThreadItem::UserMessage {
|
|
id: "item-1".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "Please do the thing".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
}
|
|
);
|
|
assert_eq!(
|
|
first_turn.items[1],
|
|
ThreadItem::AgentMessage {
|
|
id: "item-2".into(),
|
|
text: "Working...".into(),
|
|
phase: None,
|
|
}
|
|
);
|
|
|
|
let second_turn = &turns[1];
|
|
assert_eq!(second_turn.status, TurnStatus::Completed);
|
|
assert_eq!(second_turn.items.len(), 2);
|
|
assert_eq!(
|
|
second_turn.items[0],
|
|
ThreadItem::UserMessage {
|
|
id: "item-3".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "Let's try again".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
}
|
|
);
|
|
assert_eq!(
|
|
second_turn.items[1],
|
|
ThreadItem::AgentMessage {
|
|
id: "item-4".into(),
|
|
text: "Second attempt complete.".into(),
|
|
phase: None,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn drops_last_turns_on_thread_rollback() {
|
|
let events = vec![
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "First".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "A1".into(),
|
|
phase: None,
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Second".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "A2".into(),
|
|
phase: None,
|
|
}),
|
|
EventMsg::ThreadRolledBack(ThreadRolledBackEvent { num_turns: 1 }),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Third".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "A3".into(),
|
|
phase: None,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 2);
|
|
assert!(Uuid::parse_str(&turns[0].id).is_ok());
|
|
assert!(Uuid::parse_str(&turns[1].id).is_ok());
|
|
assert_ne!(turns[0].id, turns[1].id);
|
|
assert_eq!(turns[0].status, TurnStatus::Completed);
|
|
assert_eq!(turns[1].status, TurnStatus::Completed);
|
|
assert_eq!(
|
|
turns[0].items,
|
|
vec![
|
|
ThreadItem::UserMessage {
|
|
id: "item-1".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "First".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
},
|
|
ThreadItem::AgentMessage {
|
|
id: "item-2".into(),
|
|
text: "A1".into(),
|
|
phase: None,
|
|
},
|
|
]
|
|
);
|
|
assert_eq!(
|
|
turns[1].items,
|
|
vec![
|
|
ThreadItem::UserMessage {
|
|
id: "item-3".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "Third".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
},
|
|
ThreadItem::AgentMessage {
|
|
id: "item-4".into(),
|
|
text: "A3".into(),
|
|
phase: None,
|
|
},
|
|
]
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn thread_rollback_clears_all_turns_when_num_turns_exceeds_history() {
|
|
let events = vec![
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "One".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "A1".into(),
|
|
phase: None,
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Two".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "A2".into(),
|
|
phase: None,
|
|
}),
|
|
EventMsg::ThreadRolledBack(ThreadRolledBackEvent { num_turns: 99 }),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns, Vec::<Turn>::new());
|
|
}
|
|
|
|
#[test]
|
|
fn uses_explicit_turn_boundaries_for_mid_turn_steering() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-a".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Start".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "Steer".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].id, "turn-a");
|
|
assert_eq!(
|
|
turns[0].items,
|
|
vec![
|
|
ThreadItem::UserMessage {
|
|
id: "item-1".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "Start".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
},
|
|
ThreadItem::UserMessage {
|
|
id: "item-2".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "Steer".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
},
|
|
]
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn reconstructs_tool_items_from_persisted_completion_events() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-1".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "run tools".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::WebSearchEnd(WebSearchEndEvent {
|
|
call_id: "search-1".into(),
|
|
query: "codex".into(),
|
|
action: CoreWebSearchAction::Search {
|
|
query: Some("codex".into()),
|
|
queries: None,
|
|
},
|
|
}),
|
|
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
|
|
call_id: "exec-1".into(),
|
|
process_id: Some("pid-1".into()),
|
|
turn_id: "turn-1".into(),
|
|
command: vec!["echo".into(), "hello world".into()],
|
|
cwd: PathBuf::from("/tmp"),
|
|
parsed_cmd: vec![ParsedCommand::Unknown {
|
|
cmd: "echo hello world".into(),
|
|
}],
|
|
source: ExecCommandSource::Agent,
|
|
interaction_input: None,
|
|
stdout: String::new(),
|
|
stderr: String::new(),
|
|
aggregated_output: "hello world\n".into(),
|
|
exit_code: 0,
|
|
duration: Duration::from_millis(12),
|
|
formatted_output: String::new(),
|
|
status: CoreExecCommandStatus::Completed,
|
|
}),
|
|
EventMsg::McpToolCallEnd(McpToolCallEndEvent {
|
|
call_id: "mcp-1".into(),
|
|
invocation: McpInvocation {
|
|
server: "docs".into(),
|
|
tool: "lookup".into(),
|
|
arguments: Some(serde_json::json!({"id":"123"})),
|
|
},
|
|
duration: Duration::from_millis(8),
|
|
result: Err("boom".into()),
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].items.len(), 4);
|
|
assert_eq!(
|
|
turns[0].items[1],
|
|
ThreadItem::WebSearch {
|
|
id: "search-1".into(),
|
|
query: "codex".into(),
|
|
action: Some(WebSearchAction::Search {
|
|
query: Some("codex".into()),
|
|
queries: None,
|
|
}),
|
|
}
|
|
);
|
|
assert_eq!(
|
|
turns[0].items[2],
|
|
ThreadItem::CommandExecution {
|
|
id: "exec-1".into(),
|
|
command: "echo 'hello world'".into(),
|
|
cwd: PathBuf::from("/tmp"),
|
|
process_id: Some("pid-1".into()),
|
|
status: CommandExecutionStatus::Completed,
|
|
command_actions: vec![CommandAction::Unknown {
|
|
command: "echo hello world".into(),
|
|
}],
|
|
aggregated_output: Some("hello world\n".into()),
|
|
exit_code: Some(0),
|
|
duration_ms: Some(12),
|
|
}
|
|
);
|
|
assert_eq!(
|
|
turns[0].items[3],
|
|
ThreadItem::McpToolCall {
|
|
id: "mcp-1".into(),
|
|
server: "docs".into(),
|
|
tool: "lookup".into(),
|
|
status: McpToolCallStatus::Failed,
|
|
arguments: serde_json::json!({"id":"123"}),
|
|
result: None,
|
|
error: Some(McpToolCallError {
|
|
message: "boom".into(),
|
|
}),
|
|
duration_ms: Some(8),
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn reconstructs_declined_exec_and_patch_items() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-1".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "run tools".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
|
|
call_id: "exec-declined".into(),
|
|
process_id: Some("pid-2".into()),
|
|
turn_id: "turn-1".into(),
|
|
command: vec!["ls".into()],
|
|
cwd: PathBuf::from("/tmp"),
|
|
parsed_cmd: vec![ParsedCommand::Unknown { cmd: "ls".into() }],
|
|
source: ExecCommandSource::Agent,
|
|
interaction_input: None,
|
|
stdout: String::new(),
|
|
stderr: "exec command rejected by user".into(),
|
|
aggregated_output: "exec command rejected by user".into(),
|
|
exit_code: -1,
|
|
duration: Duration::ZERO,
|
|
formatted_output: String::new(),
|
|
status: CoreExecCommandStatus::Declined,
|
|
}),
|
|
EventMsg::PatchApplyEnd(PatchApplyEndEvent {
|
|
call_id: "patch-declined".into(),
|
|
turn_id: "turn-1".into(),
|
|
stdout: String::new(),
|
|
stderr: "patch rejected by user".into(),
|
|
success: false,
|
|
changes: [(
|
|
PathBuf::from("README.md"),
|
|
codex_protocol::protocol::FileChange::Add {
|
|
content: "hello\n".into(),
|
|
},
|
|
)]
|
|
.into_iter()
|
|
.collect(),
|
|
status: CorePatchApplyStatus::Declined,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].items.len(), 3);
|
|
assert_eq!(
|
|
turns[0].items[1],
|
|
ThreadItem::CommandExecution {
|
|
id: "exec-declined".into(),
|
|
command: "ls".into(),
|
|
cwd: PathBuf::from("/tmp"),
|
|
process_id: Some("pid-2".into()),
|
|
status: CommandExecutionStatus::Declined,
|
|
command_actions: vec![CommandAction::Unknown {
|
|
command: "ls".into(),
|
|
}],
|
|
aggregated_output: Some("exec command rejected by user".into()),
|
|
exit_code: Some(-1),
|
|
duration_ms: Some(0),
|
|
}
|
|
);
|
|
assert_eq!(
|
|
turns[0].items[2],
|
|
ThreadItem::FileChange {
|
|
id: "patch-declined".into(),
|
|
changes: vec![FileUpdateChange {
|
|
path: "README.md".into(),
|
|
kind: PatchChangeKind::Add,
|
|
diff: "hello\n".into(),
|
|
}],
|
|
status: PatchApplyStatus::Declined,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn assigns_late_exec_completion_to_original_turn() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-a".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "first".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-b".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "second".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
|
|
call_id: "exec-late".into(),
|
|
process_id: Some("pid-42".into()),
|
|
turn_id: "turn-a".into(),
|
|
command: vec!["echo".into(), "done".into()],
|
|
cwd: PathBuf::from("/tmp"),
|
|
parsed_cmd: vec![ParsedCommand::Unknown {
|
|
cmd: "echo done".into(),
|
|
}],
|
|
source: ExecCommandSource::Agent,
|
|
interaction_input: None,
|
|
stdout: "done\n".into(),
|
|
stderr: String::new(),
|
|
aggregated_output: "done\n".into(),
|
|
exit_code: 0,
|
|
duration: Duration::from_millis(5),
|
|
formatted_output: "done\n".into(),
|
|
status: CoreExecCommandStatus::Completed,
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-b".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 2);
|
|
assert_eq!(turns[0].id, "turn-a");
|
|
assert_eq!(turns[1].id, "turn-b");
|
|
assert_eq!(turns[0].items.len(), 2);
|
|
assert_eq!(turns[1].items.len(), 1);
|
|
assert_eq!(
|
|
turns[0].items[1],
|
|
ThreadItem::CommandExecution {
|
|
id: "exec-late".into(),
|
|
command: "echo done".into(),
|
|
cwd: PathBuf::from("/tmp"),
|
|
process_id: Some("pid-42".into()),
|
|
status: CommandExecutionStatus::Completed,
|
|
command_actions: vec![CommandAction::Unknown {
|
|
command: "echo done".into(),
|
|
}],
|
|
aggregated_output: Some("done\n".into()),
|
|
exit_code: Some(0),
|
|
duration_ms: Some(5),
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn drops_late_turn_scoped_item_for_unknown_turn_id() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-a".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "first".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-b".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "second".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
|
|
call_id: "exec-unknown-turn".into(),
|
|
process_id: Some("pid-42".into()),
|
|
turn_id: "turn-missing".into(),
|
|
command: vec!["echo".into(), "done".into()],
|
|
cwd: PathBuf::from("/tmp"),
|
|
parsed_cmd: vec![ParsedCommand::Unknown {
|
|
cmd: "echo done".into(),
|
|
}],
|
|
source: ExecCommandSource::Agent,
|
|
interaction_input: None,
|
|
stdout: "done\n".into(),
|
|
stderr: String::new(),
|
|
aggregated_output: "done\n".into(),
|
|
exit_code: 0,
|
|
duration: Duration::from_millis(5),
|
|
formatted_output: "done\n".into(),
|
|
status: CoreExecCommandStatus::Completed,
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-b".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
];
|
|
|
|
let mut builder = ThreadHistoryBuilder::new();
|
|
for event in &events {
|
|
builder.handle_event(event);
|
|
}
|
|
let turns = builder.finish();
|
|
assert_eq!(turns.len(), 2);
|
|
assert_eq!(turns[0].id, "turn-a");
|
|
assert_eq!(turns[1].id, "turn-b");
|
|
assert_eq!(turns[0].items.len(), 1);
|
|
assert_eq!(turns[1].items.len(), 1);
|
|
assert_eq!(
|
|
turns[1].items[0],
|
|
ThreadItem::UserMessage {
|
|
id: "item-2".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "second".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn late_turn_complete_does_not_close_active_turn() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-a".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "first".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-b".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "second".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "still in b".into(),
|
|
phase: None,
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-b".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 2);
|
|
assert_eq!(turns[0].id, "turn-a");
|
|
assert_eq!(turns[1].id, "turn-b");
|
|
assert_eq!(turns[1].items.len(), 2);
|
|
}
|
|
|
|
#[test]
|
|
fn late_turn_aborted_does_not_interrupt_active_turn() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-a".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "first".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-b".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "second".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::TurnAborted(TurnAbortedEvent {
|
|
turn_id: Some("turn-a".into()),
|
|
reason: TurnAbortReason::Replaced,
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "still in b".into(),
|
|
phase: None,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 2);
|
|
assert_eq!(turns[0].id, "turn-a");
|
|
assert_eq!(turns[1].id, "turn-b");
|
|
assert_eq!(turns[1].status, TurnStatus::InProgress);
|
|
assert_eq!(turns[1].items.len(), 2);
|
|
}
|
|
|
|
#[test]
|
|
fn preserves_compaction_only_turn() {
|
|
let items = vec![
|
|
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-compact".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
})),
|
|
RolloutItem::Compacted(CompactedItem {
|
|
message: String::new(),
|
|
replacement_history: None,
|
|
}),
|
|
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-compact".into(),
|
|
last_agent_message: None,
|
|
})),
|
|
];
|
|
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(
|
|
turns,
|
|
vec![Turn {
|
|
id: "turn-compact".into(),
|
|
status: TurnStatus::Completed,
|
|
error: None,
|
|
items: Vec::new(),
|
|
}]
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn reconstructs_collab_resume_end_item() {
|
|
let events = vec![
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "resume agent".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::CollabResumeEnd(codex_protocol::protocol::CollabResumeEndEvent {
|
|
call_id: "resume-1".into(),
|
|
sender_thread_id: ThreadId::try_from("00000000-0000-0000-0000-000000000001")
|
|
.expect("valid sender thread id"),
|
|
receiver_thread_id: ThreadId::try_from("00000000-0000-0000-0000-000000000002")
|
|
.expect("valid receiver thread id"),
|
|
receiver_agent_nickname: None,
|
|
receiver_agent_role: None,
|
|
status: AgentStatus::Completed(None),
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].items.len(), 2);
|
|
assert_eq!(
|
|
turns[0].items[1],
|
|
ThreadItem::CollabAgentToolCall {
|
|
id: "resume-1".into(),
|
|
tool: CollabAgentTool::ResumeAgent,
|
|
status: CollabAgentToolCallStatus::Completed,
|
|
sender_thread_id: "00000000-0000-0000-0000-000000000001".into(),
|
|
receiver_thread_ids: vec!["00000000-0000-0000-0000-000000000002".into()],
|
|
prompt: None,
|
|
agents_states: [(
|
|
"00000000-0000-0000-0000-000000000002".into(),
|
|
CollabAgentState {
|
|
status: crate::protocol::v2::CollabAgentStatus::Completed,
|
|
message: None,
|
|
},
|
|
)]
|
|
.into_iter()
|
|
.collect(),
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn rollback_failed_error_does_not_mark_turn_failed() {
|
|
let events = vec![
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "hello".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "done".into(),
|
|
phase: None,
|
|
}),
|
|
EventMsg::Error(ErrorEvent {
|
|
message: "rollback failed".into(),
|
|
codex_error_info: Some(CodexErrorInfo::ThreadRollbackFailed),
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].status, TurnStatus::Completed);
|
|
assert_eq!(turns[0].error, None);
|
|
}
|
|
|
|
#[test]
|
|
fn out_of_turn_error_does_not_create_or_fail_a_turn() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-a".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "hello".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
EventMsg::Error(ErrorEvent {
|
|
message: "request-level failure".into(),
|
|
codex_error_info: Some(CodexErrorInfo::BadRequest),
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(
|
|
turns[0],
|
|
Turn {
|
|
id: "turn-a".into(),
|
|
status: TurnStatus::Completed,
|
|
error: None,
|
|
items: vec![ThreadItem::UserMessage {
|
|
id: "item-1".into(),
|
|
content: vec![UserInput::Text {
|
|
text: "hello".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
}],
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn error_then_turn_complete_preserves_failed_status() {
|
|
let events = vec![
|
|
EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: "turn-a".into(),
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}),
|
|
EventMsg::UserMessage(UserMessageEvent {
|
|
message: "hello".into(),
|
|
images: None,
|
|
text_elements: Vec::new(),
|
|
local_images: Vec::new(),
|
|
}),
|
|
EventMsg::Error(ErrorEvent {
|
|
message: "stream failure".into(),
|
|
codex_error_info: Some(CodexErrorInfo::ResponseStreamDisconnected {
|
|
http_status_code: Some(502),
|
|
}),
|
|
}),
|
|
EventMsg::TurnComplete(TurnCompleteEvent {
|
|
turn_id: "turn-a".into(),
|
|
last_agent_message: None,
|
|
}),
|
|
];
|
|
|
|
let items = events
|
|
.into_iter()
|
|
.map(RolloutItem::EventMsg)
|
|
.collect::<Vec<_>>();
|
|
let turns = build_turns_from_rollout_items(&items);
|
|
assert_eq!(turns.len(), 1);
|
|
assert_eq!(turns[0].id, "turn-a");
|
|
assert_eq!(turns[0].status, TurnStatus::Failed);
|
|
assert_eq!(
|
|
turns[0].error,
|
|
Some(TurnError {
|
|
message: "stream failure".into(),
|
|
codex_error_info: Some(
|
|
crate::protocol::v2::CodexErrorInfo::ResponseStreamDisconnected {
|
|
http_status_code: Some(502),
|
|
}
|
|
),
|
|
additional_details: None,
|
|
})
|
|
);
|
|
}
|
|
}
|