feat: codex exec mapping of collab tools (#9817)

THIS IS NOT THE FINAL UX
This commit is contained in:
jif-oai
2026-01-26 19:01:35 +01:00
committed by GitHub
parent 3ba702c5b6
commit 01d7f8095b
4 changed files with 714 additions and 9 deletions

View File

@@ -3,7 +3,16 @@ use codex_common::elapsed::format_elapsed;
use codex_core::config::Config;
use codex_core::protocol::AgentMessageEvent;
use codex_core::protocol::AgentReasoningRawContentEvent;
use codex_core::protocol::AgentStatus;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::CollabAgentInteractionBeginEvent;
use codex_core::protocol::CollabAgentInteractionEndEvent;
use codex_core::protocol::CollabAgentSpawnBeginEvent;
use codex_core::protocol::CollabAgentSpawnEndEvent;
use codex_core::protocol::CollabCloseBeginEvent;
use codex_core::protocol::CollabCloseEndEvent;
use codex_core::protocol::CollabWaitingBeginEvent;
use codex_core::protocol::CollabWaitingEndEvent;
use codex_core::protocol::DeprecationNoticeEvent;
use codex_core::protocol::ErrorEvent;
use codex_core::protocol::Event;
@@ -571,15 +580,161 @@ impl EventProcessor for EventProcessorWithHumanOutput {
EventMsg::ContextCompacted(_) => {
ts_msg!(self, "context compacted");
}
EventMsg::CollabAgentSpawnBegin(_)
| EventMsg::CollabAgentSpawnEnd(_)
| EventMsg::CollabAgentInteractionBegin(_)
| EventMsg::CollabAgentInteractionEnd(_)
| EventMsg::CollabWaitingBegin(_)
| EventMsg::CollabWaitingEnd(_)
| EventMsg::CollabCloseBegin(_)
| EventMsg::CollabCloseEnd(_) => {
// TODO(jif) handle collab tools.
EventMsg::CollabAgentSpawnBegin(CollabAgentSpawnBeginEvent {
call_id,
sender_thread_id: _,
prompt,
}) => {
ts_msg!(
self,
"{} {}",
"collab".style(self.magenta),
format_collab_invocation("spawn_agent", &call_id, Some(&prompt))
.style(self.bold)
);
}
EventMsg::CollabAgentSpawnEnd(CollabAgentSpawnEndEvent {
call_id,
sender_thread_id: _,
new_thread_id,
prompt,
status,
}) => {
let success = new_thread_id.is_some() && !is_collab_status_failure(&status);
let title_style = if success { self.green } else { self.red };
let title = format!(
"{} {}:",
format_collab_invocation("spawn_agent", &call_id, Some(&prompt)),
format_collab_status(&status)
);
ts_msg!(self, "{}", title.style(title_style));
if let Some(new_thread_id) = new_thread_id {
eprintln!(" agent: {}", new_thread_id.to_string().style(self.dimmed));
}
}
EventMsg::CollabAgentInteractionBegin(CollabAgentInteractionBeginEvent {
call_id,
sender_thread_id: _,
receiver_thread_id,
prompt,
}) => {
ts_msg!(
self,
"{} {}",
"collab".style(self.magenta),
format_collab_invocation("send_input", &call_id, Some(&prompt))
.style(self.bold)
);
eprintln!(
" receiver: {}",
receiver_thread_id.to_string().style(self.dimmed)
);
}
EventMsg::CollabAgentInteractionEnd(CollabAgentInteractionEndEvent {
call_id,
sender_thread_id: _,
receiver_thread_id,
prompt,
status,
}) => {
let success = !is_collab_status_failure(&status);
let title_style = if success { self.green } else { self.red };
let title = format!(
"{} {}:",
format_collab_invocation("send_input", &call_id, Some(&prompt)),
format_collab_status(&status)
);
ts_msg!(self, "{}", title.style(title_style));
eprintln!(
" receiver: {}",
receiver_thread_id.to_string().style(self.dimmed)
);
}
EventMsg::CollabWaitingBegin(CollabWaitingBeginEvent {
sender_thread_id: _,
receiver_thread_ids,
call_id,
}) => {
ts_msg!(
self,
"{} {}",
"collab".style(self.magenta),
format_collab_invocation("wait", &call_id, None).style(self.bold)
);
eprintln!(
" receivers: {}",
format_receiver_list(&receiver_thread_ids).style(self.dimmed)
);
}
EventMsg::CollabWaitingEnd(CollabWaitingEndEvent {
sender_thread_id: _,
call_id,
statuses,
}) => {
if statuses.is_empty() {
ts_msg!(
self,
"{} {}:",
format_collab_invocation("wait", &call_id, None),
"timed out".style(self.yellow)
);
return CodexStatus::Running;
}
let success = !statuses.values().any(is_collab_status_failure);
let title_style = if success { self.green } else { self.red };
let title = format!(
"{} {} agents complete:",
format_collab_invocation("wait", &call_id, None),
statuses.len()
);
ts_msg!(self, "{}", title.style(title_style));
let mut sorted = statuses
.into_iter()
.map(|(thread_id, status)| (thread_id.to_string(), status))
.collect::<Vec<_>>();
sorted.sort_by(|(left, _), (right, _)| left.cmp(right));
for (thread_id, status) in sorted {
eprintln!(
" {} {}",
thread_id.style(self.dimmed),
format_collab_status(&status).style(style_for_agent_status(&status, self))
);
}
}
EventMsg::CollabCloseBegin(CollabCloseBeginEvent {
call_id,
sender_thread_id: _,
receiver_thread_id,
}) => {
ts_msg!(
self,
"{} {}",
"collab".style(self.magenta),
format_collab_invocation("close_agent", &call_id, None).style(self.bold)
);
eprintln!(
" receiver: {}",
receiver_thread_id.to_string().style(self.dimmed)
);
}
EventMsg::CollabCloseEnd(CollabCloseEndEvent {
call_id,
sender_thread_id: _,
receiver_thread_id,
status,
}) => {
let success = !is_collab_status_failure(&status);
let title_style = if success { self.green } else { self.red };
let title = format!(
"{} {}:",
format_collab_invocation("close_agent", &call_id, None),
format_collab_status(&status)
);
ts_msg!(self, "{}", title.style(title_style));
eprintln!(
" receiver: {}",
receiver_thread_id.to_string().style(self.dimmed)
);
}
EventMsg::ShutdownComplete => return CodexStatus::Shutdown,
EventMsg::WebSearchBegin(_)
@@ -654,6 +809,78 @@ fn format_file_change(change: &FileChange) -> &'static str {
}
}
fn format_collab_invocation(tool: &str, call_id: &str, prompt: Option<&str>) -> String {
let prompt = prompt
.map(str::trim)
.filter(|prompt| !prompt.is_empty())
.map(|prompt| truncate_preview(prompt, 120));
match prompt {
Some(prompt) => format!("{tool}({call_id}, prompt=\"{prompt}\")"),
None => format!("{tool}({call_id})"),
}
}
fn format_collab_status(status: &AgentStatus) -> String {
match status {
AgentStatus::PendingInit => "pending init".to_string(),
AgentStatus::Running => "running".to_string(),
AgentStatus::Completed(Some(message)) => {
let preview = truncate_preview(message.trim(), 120);
if preview.is_empty() {
"completed".to_string()
} else {
format!("completed: \"{preview}\"")
}
}
AgentStatus::Completed(None) => "completed".to_string(),
AgentStatus::Errored(message) => {
let preview = truncate_preview(message.trim(), 120);
if preview.is_empty() {
"errored".to_string()
} else {
format!("errored: \"{preview}\"")
}
}
AgentStatus::Shutdown => "shutdown".to_string(),
AgentStatus::NotFound => "not found".to_string(),
}
}
fn style_for_agent_status(
status: &AgentStatus,
processor: &EventProcessorWithHumanOutput,
) -> Style {
match status {
AgentStatus::PendingInit | AgentStatus::Shutdown => processor.dimmed,
AgentStatus::Running => processor.cyan,
AgentStatus::Completed(_) => processor.green,
AgentStatus::Errored(_) | AgentStatus::NotFound => processor.red,
}
}
fn is_collab_status_failure(status: &AgentStatus) -> bool {
matches!(status, AgentStatus::Errored(_) | AgentStatus::NotFound)
}
fn format_receiver_list(ids: &[codex_protocol::ThreadId]) -> String {
if ids.is_empty() {
return "none".to_string();
}
ids.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(", ")
}
fn truncate_preview(text: &str, max_chars: usize) -> String {
if text.chars().count() <= max_chars {
return text.to_string();
}
let preview = text.chars().take(max_chars).collect::<String>();
format!("{preview}")
}
fn format_mcp_invocation(invocation: &McpInvocation) -> String {
// Build fully-qualified tool name: server.tool
let fq_tool_name = format!("{}.{}", invocation.server, invocation.tool);

View File

@@ -6,6 +6,11 @@ use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
use crate::event_processor::handle_last_message;
use crate::exec_events::AgentMessageItem;
use crate::exec_events::CollabAgentState;
use crate::exec_events::CollabAgentStatus;
use crate::exec_events::CollabTool;
use crate::exec_events::CollabToolCallItem;
use crate::exec_events::CollabToolCallStatus;
use crate::exec_events::CommandExecutionItem;
use crate::exec_events::CommandExecutionStatus;
use crate::exec_events::ErrorItem;
@@ -35,6 +40,15 @@ use crate::exec_events::Usage;
use crate::exec_events::WebSearchItem;
use codex_core::config::Config;
use codex_core::protocol;
use codex_core::protocol::AgentStatus as CoreAgentStatus;
use codex_core::protocol::CollabAgentInteractionBeginEvent;
use codex_core::protocol::CollabAgentInteractionEndEvent;
use codex_core::protocol::CollabAgentSpawnBeginEvent;
use codex_core::protocol::CollabAgentSpawnEndEvent;
use codex_core::protocol::CollabCloseBeginEvent;
use codex_core::protocol::CollabCloseEndEvent;
use codex_core::protocol::CollabWaitingBeginEvent;
use codex_core::protocol::CollabWaitingEndEvent;
use codex_protocol::plan_tool::StepStatus;
use codex_protocol::plan_tool::UpdatePlanArgs;
use serde_json::Value as JsonValue;
@@ -51,6 +65,7 @@ pub struct EventProcessorWithJsonOutput {
running_todo_list: Option<RunningTodoList>,
last_total_token_usage: Option<codex_core::protocol::TokenUsage>,
running_mcp_tool_calls: HashMap<String, RunningMcpToolCall>,
running_collab_tool_calls: HashMap<String, RunningCollabToolCall>,
last_critical_error: Option<ThreadErrorEvent>,
}
@@ -75,6 +90,12 @@ struct RunningMcpToolCall {
arguments: JsonValue,
}
#[derive(Debug, Clone)]
struct RunningCollabToolCall {
tool: CollabTool,
item_id: String,
}
impl EventProcessorWithJsonOutput {
pub fn new(last_message_path: Option<PathBuf>) -> Self {
Self {
@@ -85,6 +106,7 @@ impl EventProcessorWithJsonOutput {
running_todo_list: None,
last_total_token_usage: None,
running_mcp_tool_calls: HashMap::new(),
running_collab_tool_calls: HashMap::new(),
last_critical_error: None,
}
}
@@ -102,6 +124,18 @@ impl EventProcessorWithJsonOutput {
}
protocol::EventMsg::McpToolCallBegin(ev) => self.handle_mcp_tool_call_begin(ev),
protocol::EventMsg::McpToolCallEnd(ev) => self.handle_mcp_tool_call_end(ev),
protocol::EventMsg::CollabAgentSpawnBegin(ev) => self.handle_collab_spawn_begin(ev),
protocol::EventMsg::CollabAgentSpawnEnd(ev) => self.handle_collab_spawn_end(ev),
protocol::EventMsg::CollabAgentInteractionBegin(ev) => {
self.handle_collab_interaction_begin(ev)
}
protocol::EventMsg::CollabAgentInteractionEnd(ev) => {
self.handle_collab_interaction_end(ev)
}
protocol::EventMsg::CollabWaitingBegin(ev) => self.handle_collab_wait_begin(ev),
protocol::EventMsg::CollabWaitingEnd(ev) => self.handle_collab_wait_end(ev),
protocol::EventMsg::CollabCloseBegin(ev) => self.handle_collab_close_begin(ev),
protocol::EventMsg::CollabCloseEnd(ev) => self.handle_collab_close_end(ev),
protocol::EventMsg::PatchApplyBegin(ev) => self.handle_patch_apply_begin(ev),
protocol::EventMsg::PatchApplyEnd(ev) => self.handle_patch_apply_end(ev),
protocol::EventMsg::WebSearchBegin(_) => Vec::new(),
@@ -341,6 +375,219 @@ impl EventProcessorWithJsonOutput {
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })]
}
fn handle_collab_spawn_begin(&mut self, ev: &CollabAgentSpawnBeginEvent) -> Vec<ThreadEvent> {
self.start_collab_tool_call(
&ev.call_id,
CollabTool::SpawnAgent,
ev.sender_thread_id.to_string(),
Vec::new(),
Some(ev.prompt.clone()),
)
}
fn handle_collab_spawn_end(&mut self, ev: &CollabAgentSpawnEndEvent) -> Vec<ThreadEvent> {
let (receiver_thread_ids, agents_states) = match ev.new_thread_id {
Some(id) => {
let receiver_id = id.to_string();
let agent_state = CollabAgentState::from(ev.status.clone());
(
vec![receiver_id.clone()],
[(receiver_id, agent_state)].into_iter().collect(),
)
}
None => (Vec::new(), HashMap::new()),
};
let status = if ev.new_thread_id.is_some() && !is_collab_failure(&ev.status) {
CollabToolCallStatus::Completed
} else {
CollabToolCallStatus::Failed
};
self.finish_collab_tool_call(
&ev.call_id,
CollabTool::SpawnAgent,
ev.sender_thread_id.to_string(),
receiver_thread_ids,
Some(ev.prompt.clone()),
agents_states,
status,
)
}
fn handle_collab_interaction_begin(
&mut self,
ev: &CollabAgentInteractionBeginEvent,
) -> Vec<ThreadEvent> {
self.start_collab_tool_call(
&ev.call_id,
CollabTool::SendInput,
ev.sender_thread_id.to_string(),
vec![ev.receiver_thread_id.to_string()],
Some(ev.prompt.clone()),
)
}
fn handle_collab_interaction_end(
&mut self,
ev: &CollabAgentInteractionEndEvent,
) -> Vec<ThreadEvent> {
let receiver_id = ev.receiver_thread_id.to_string();
let agent_state = CollabAgentState::from(ev.status.clone());
let status = if is_collab_failure(&ev.status) {
CollabToolCallStatus::Failed
} else {
CollabToolCallStatus::Completed
};
self.finish_collab_tool_call(
&ev.call_id,
CollabTool::SendInput,
ev.sender_thread_id.to_string(),
vec![receiver_id.clone()],
Some(ev.prompt.clone()),
[(receiver_id, agent_state)].into_iter().collect(),
status,
)
}
fn handle_collab_wait_begin(&mut self, ev: &CollabWaitingBeginEvent) -> Vec<ThreadEvent> {
self.start_collab_tool_call(
&ev.call_id,
CollabTool::Wait,
ev.sender_thread_id.to_string(),
ev.receiver_thread_ids
.iter()
.map(ToString::to_string)
.collect(),
None,
)
}
fn handle_collab_wait_end(&mut self, ev: &CollabWaitingEndEvent) -> Vec<ThreadEvent> {
let status = if ev.statuses.values().any(is_collab_failure) {
CollabToolCallStatus::Failed
} else {
CollabToolCallStatus::Completed
};
let mut receiver_thread_ids = ev
.statuses
.keys()
.map(ToString::to_string)
.collect::<Vec<_>>();
receiver_thread_ids.sort();
let agents_states = ev
.statuses
.iter()
.map(|(thread_id, status)| {
(
thread_id.to_string(),
CollabAgentState::from(status.clone()),
)
})
.collect();
self.finish_collab_tool_call(
&ev.call_id,
CollabTool::Wait,
ev.sender_thread_id.to_string(),
receiver_thread_ids,
None,
agents_states,
status,
)
}
fn handle_collab_close_begin(&mut self, ev: &CollabCloseBeginEvent) -> Vec<ThreadEvent> {
self.start_collab_tool_call(
&ev.call_id,
CollabTool::CloseAgent,
ev.sender_thread_id.to_string(),
vec![ev.receiver_thread_id.to_string()],
None,
)
}
fn handle_collab_close_end(&mut self, ev: &CollabCloseEndEvent) -> Vec<ThreadEvent> {
let receiver_id = ev.receiver_thread_id.to_string();
let agent_state = CollabAgentState::from(ev.status.clone());
let status = if is_collab_failure(&ev.status) {
CollabToolCallStatus::Failed
} else {
CollabToolCallStatus::Completed
};
self.finish_collab_tool_call(
&ev.call_id,
CollabTool::CloseAgent,
ev.sender_thread_id.to_string(),
vec![receiver_id.clone()],
None,
[(receiver_id, agent_state)].into_iter().collect(),
status,
)
}
fn start_collab_tool_call(
&mut self,
call_id: &str,
tool: CollabTool,
sender_thread_id: String,
receiver_thread_ids: Vec<String>,
prompt: Option<String>,
) -> Vec<ThreadEvent> {
let item_id = self.get_next_item_id();
self.running_collab_tool_calls.insert(
call_id.to_string(),
RunningCollabToolCall {
tool: tool.clone(),
item_id: item_id.clone(),
},
);
let item = ThreadItem {
id: item_id,
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
tool,
sender_thread_id,
receiver_thread_ids,
prompt,
agents_states: HashMap::new(),
status: CollabToolCallStatus::InProgress,
}),
};
vec![ThreadEvent::ItemStarted(ItemStartedEvent { item })]
}
#[allow(clippy::too_many_arguments)]
fn finish_collab_tool_call(
&mut self,
call_id: &str,
tool: CollabTool,
sender_thread_id: String,
receiver_thread_ids: Vec<String>,
prompt: Option<String>,
agents_states: HashMap<String, CollabAgentState>,
status: CollabToolCallStatus,
) -> Vec<ThreadEvent> {
let (tool, item_id) = match self.running_collab_tool_calls.remove(call_id) {
Some(running) => (running.tool, running.item_id),
None => {
warn!(
call_id,
"Received collab tool end without begin; synthesizing new item"
);
(tool, self.get_next_item_id())
}
};
let item = ThreadItem {
id: item_id,
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
tool,
sender_thread_id,
receiver_thread_ids,
prompt,
agents_states,
status,
}),
};
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })]
}
fn handle_patch_apply_begin(
&mut self,
ev: &protocol::PatchApplyBeginEvent,
@@ -512,6 +759,44 @@ impl EventProcessorWithJsonOutput {
}
}
fn is_collab_failure(status: &CoreAgentStatus) -> bool {
matches!(
status,
CoreAgentStatus::Errored(_) | CoreAgentStatus::NotFound
)
}
impl From<CoreAgentStatus> for CollabAgentState {
fn from(value: CoreAgentStatus) -> Self {
match value {
CoreAgentStatus::PendingInit => Self {
status: CollabAgentStatus::PendingInit,
message: None,
},
CoreAgentStatus::Running => Self {
status: CollabAgentStatus::Running,
message: None,
},
CoreAgentStatus::Completed(message) => Self {
status: CollabAgentStatus::Completed,
message,
},
CoreAgentStatus::Errored(message) => Self {
status: CollabAgentStatus::Errored,
message: Some(message),
},
CoreAgentStatus::Shutdown => Self {
status: CollabAgentStatus::Shutdown,
message: None,
},
CoreAgentStatus::NotFound => Self {
status: CollabAgentStatus::NotFound,
message: None,
},
}
}
}
impl EventProcessor for EventProcessorWithJsonOutput {
fn print_config_summary(&mut self, _: &Config, _: &str, ev: &protocol::SessionConfiguredEvent) {
self.process_event(protocol::Event {

View File

@@ -2,6 +2,7 @@ use mcp_types::ContentBlock as McpContentBlock;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use ts_rs::TS;
/// Top-level JSONL events emitted by codex exec
@@ -113,6 +114,9 @@ pub enum ThreadItemDetails {
/// Represents a call to an MCP tool. The item starts when the invocation is
/// dispatched and completes when the MCP server reports success or failure.
McpToolCall(McpToolCallItem),
/// Represents a call to a collab tool. The item starts when the collab tool is
/// invoked and completes when the collab tool reports success or failure.
CollabToolCall(CollabToolCallItem),
/// Captures a web search request. It starts when the search is kicked off
/// and completes when results are returned to the agent.
WebSearch(WebSearchItem),
@@ -198,6 +202,56 @@ pub enum McpToolCallStatus {
Failed,
}
/// The status of a collab tool call.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default, TS)]
#[serde(rename_all = "snake_case")]
pub enum CollabToolCallStatus {
#[default]
InProgress,
Completed,
Failed,
}
/// Supported collab tools.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
#[serde(rename_all = "snake_case")]
pub enum CollabTool {
SpawnAgent,
SendInput,
Wait,
CloseAgent,
}
/// The status of a collab agent.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
#[serde(rename_all = "snake_case")]
pub enum CollabAgentStatus {
PendingInit,
Running,
Completed,
Errored,
Shutdown,
NotFound,
}
/// Last known state of a collab agent.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
pub struct CollabAgentState {
pub status: CollabAgentStatus,
pub message: Option<String>,
}
/// A call to a collab tool.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
pub struct CollabToolCallItem {
pub tool: CollabTool,
pub sender_thread_id: String,
pub receiver_thread_ids: Vec<String>,
pub prompt: Option<String>,
pub agents_states: HashMap<String, CollabAgentState>,
pub status: CollabToolCallStatus,
}
/// Result payload produced by an MCP tool invocation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
pub struct McpToolCallItemResult {

View File

@@ -1,6 +1,10 @@
use codex_core::protocol::AgentMessageEvent;
use codex_core::protocol::AgentReasoningEvent;
use codex_core::protocol::AgentStatus;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::CollabAgentSpawnBeginEvent;
use codex_core::protocol::CollabAgentSpawnEndEvent;
use codex_core::protocol::CollabWaitingEndEvent;
use codex_core::protocol::ErrorEvent;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
@@ -19,6 +23,11 @@ use codex_core::protocol::WarningEvent;
use codex_core::protocol::WebSearchEndEvent;
use codex_exec::event_processor_with_jsonl_output::EventProcessorWithJsonOutput;
use codex_exec::exec_events::AgentMessageItem;
use codex_exec::exec_events::CollabAgentState;
use codex_exec::exec_events::CollabAgentStatus;
use codex_exec::exec_events::CollabTool;
use codex_exec::exec_events::CollabToolCallItem;
use codex_exec::exec_events::CollabToolCallStatus;
use codex_exec::exec_events::CommandExecutionItem;
use codex_exec::exec_events::CommandExecutionStatus;
use codex_exec::exec_events::ErrorItem;
@@ -44,6 +53,7 @@ use codex_exec::exec_events::TurnFailedEvent;
use codex_exec::exec_events::TurnStartedEvent;
use codex_exec::exec_events::Usage;
use codex_exec::exec_events::WebSearchItem;
use codex_protocol::ThreadId;
use codex_protocol::plan_tool::PlanItemArg;
use codex_protocol::plan_tool::StepStatus;
use codex_protocol::plan_tool::UpdatePlanArgs;
@@ -444,6 +454,135 @@ fn mcp_tool_call_defaults_arguments_and_preserves_structured_content() {
);
}
#[test]
fn collab_spawn_begin_and_end_emit_item_events() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let sender_thread_id = ThreadId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap();
let new_thread_id = ThreadId::from_string("9e107d9d-372b-4b8c-a2a4-1d9bb3fce0c1").unwrap();
let prompt = "draft a plan".to_string();
let begin = event(
"c1",
EventMsg::CollabAgentSpawnBegin(CollabAgentSpawnBeginEvent {
call_id: "call-10".to_string(),
sender_thread_id,
prompt: prompt.clone(),
}),
);
let begin_events = ep.collect_thread_events(&begin);
assert_eq!(
begin_events,
vec![ThreadEvent::ItemStarted(ItemStartedEvent {
item: ThreadItem {
id: "item_0".to_string(),
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
tool: CollabTool::SpawnAgent,
sender_thread_id: sender_thread_id.to_string(),
receiver_thread_ids: Vec::new(),
prompt: Some(prompt.clone()),
agents_states: std::collections::HashMap::new(),
status: CollabToolCallStatus::InProgress,
}),
},
})]
);
let end = event(
"c2",
EventMsg::CollabAgentSpawnEnd(CollabAgentSpawnEndEvent {
call_id: "call-10".to_string(),
sender_thread_id,
new_thread_id: Some(new_thread_id),
prompt: prompt.clone(),
status: AgentStatus::Running,
}),
);
let end_events = ep.collect_thread_events(&end);
assert_eq!(
end_events,
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
item: ThreadItem {
id: "item_0".to_string(),
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
tool: CollabTool::SpawnAgent,
sender_thread_id: sender_thread_id.to_string(),
receiver_thread_ids: vec![new_thread_id.to_string()],
prompt: Some(prompt),
agents_states: [(
new_thread_id.to_string(),
CollabAgentState {
status: CollabAgentStatus::Running,
message: None,
},
)]
.into_iter()
.collect(),
status: CollabToolCallStatus::Completed,
}),
},
})]
);
}
#[test]
fn collab_wait_end_without_begin_synthesizes_failed_item() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let sender_thread_id = ThreadId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap();
let running_thread_id = ThreadId::from_string("3f76d2a0-943e-4f43-8a38-b289c9c6c3d1").unwrap();
let failed_thread_id = ThreadId::from_string("c1dfd96e-1f0c-4f26-9b4f-1aa02c2d3c4d").unwrap();
let mut receiver_thread_ids = vec![running_thread_id.to_string(), failed_thread_id.to_string()];
receiver_thread_ids.sort();
let mut statuses = std::collections::HashMap::new();
statuses.insert(
running_thread_id,
AgentStatus::Completed(Some("done".to_string())),
);
statuses.insert(failed_thread_id, AgentStatus::Errored("boom".to_string()));
let end = event(
"c3",
EventMsg::CollabWaitingEnd(CollabWaitingEndEvent {
sender_thread_id,
call_id: "call-11".to_string(),
statuses: statuses.clone(),
}),
);
let events = ep.collect_thread_events(&end);
assert_eq!(
events,
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
item: ThreadItem {
id: "item_0".to_string(),
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
tool: CollabTool::Wait,
sender_thread_id: sender_thread_id.to_string(),
receiver_thread_ids,
prompt: None,
agents_states: [
(
running_thread_id.to_string(),
CollabAgentState {
status: CollabAgentStatus::Completed,
message: Some("done".to_string()),
},
),
(
failed_thread_id.to_string(),
CollabAgentState {
status: CollabAgentStatus::Errored,
message: Some("boom".to_string()),
},
),
]
.into_iter()
.collect(),
status: CollabToolCallStatus::Failed,
}),
},
})]
);
}
#[test]
fn plan_update_after_complete_starts_new_todo_list_with_new_id() {
let mut ep = EventProcessorWithJsonOutput::new(None);