mirror of
https://github.com/openai/codex.git
synced 2026-04-19 20:24:50 +00:00
Compare commits
2 Commits
fcoury/mar
...
spawn-agen
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3c924b3e3e | ||
|
|
478afb40dd |
@@ -575,6 +575,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -5756,6 +5756,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -2404,6 +2404,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -52,6 +52,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -52,6 +52,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -188,6 +188,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -253,6 +253,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -191,6 +191,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -191,6 +191,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -191,6 +191,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -253,6 +253,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -191,6 +191,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -253,6 +253,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -191,6 +191,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -191,6 +191,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -188,6 +188,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -188,6 +188,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -188,6 +188,7 @@
|
||||
"CollabAgentTool": {
|
||||
"enum": [
|
||||
"spawnAgent",
|
||||
"spawnAgentsOnCsv",
|
||||
"sendInput",
|
||||
"resumeAgent",
|
||||
"wait",
|
||||
|
||||
@@ -2,4 +2,4 @@
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
|
||||
export type CollabAgentTool = "spawnAgent" | "sendInput" | "resumeAgent" | "wait" | "closeAgent";
|
||||
export type CollabAgentTool = "spawnAgent" | "spawnAgentsOnCsv" | "sendInput" | "resumeAgent" | "wait" | "closeAgent";
|
||||
|
||||
@@ -584,7 +584,7 @@ impl ThreadHistoryBuilder {
|
||||
) {
|
||||
let item = ThreadItem::CollabAgentToolCall {
|
||||
id: payload.call_id.clone(),
|
||||
tool: CollabAgentTool::SpawnAgent,
|
||||
tool: collab_spawn_tool(payload.tool.clone()),
|
||||
status: CollabAgentToolCallStatus::InProgress,
|
||||
sender_thread_id: payload.sender_thread_id.to_string(),
|
||||
receiver_thread_ids: Vec::new(),
|
||||
@@ -600,26 +600,28 @@ impl ThreadHistoryBuilder {
|
||||
&mut self,
|
||||
payload: &codex_protocol::protocol::CollabAgentSpawnEndEvent,
|
||||
) {
|
||||
let has_receiver = payload.new_thread_id.is_some();
|
||||
let status = match &payload.status {
|
||||
AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed,
|
||||
_ if has_receiver => CollabAgentToolCallStatus::Completed,
|
||||
_ => CollabAgentToolCallStatus::Failed,
|
||||
};
|
||||
let (receiver_thread_ids, agents_states) = match &payload.new_thread_id {
|
||||
Some(id) => {
|
||||
let receiver_id = id.to_string();
|
||||
let received_status = CollabAgentState::from(payload.status.clone());
|
||||
(
|
||||
vec![receiver_id.clone()],
|
||||
[(receiver_id, received_status)].into_iter().collect(),
|
||||
)
|
||||
let (receiver_thread_ids, agents_states) = collab_spawn_receivers_and_states(payload);
|
||||
let has_receiver = !receiver_thread_ids.is_empty();
|
||||
let status = match payload.tool {
|
||||
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgent => match &payload.status {
|
||||
AgentStatus::Errored(_) | AgentStatus::NotFound => {
|
||||
CollabAgentToolCallStatus::Failed
|
||||
}
|
||||
_ if has_receiver => CollabAgentToolCallStatus::Completed,
|
||||
_ => CollabAgentToolCallStatus::Failed,
|
||||
},
|
||||
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgentsOnCsv => {
|
||||
match &payload.status {
|
||||
AgentStatus::Errored(_) | AgentStatus::NotFound => {
|
||||
CollabAgentToolCallStatus::Failed
|
||||
}
|
||||
_ => CollabAgentToolCallStatus::Completed,
|
||||
}
|
||||
}
|
||||
None => (Vec::new(), HashMap::new()),
|
||||
};
|
||||
self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
|
||||
id: payload.call_id.clone(),
|
||||
tool: CollabAgentTool::SpawnAgent,
|
||||
tool: collab_spawn_tool(payload.tool.clone()),
|
||||
status,
|
||||
sender_thread_id: payload.sender_thread_id.to_string(),
|
||||
receiver_thread_ids,
|
||||
@@ -1056,6 +1058,50 @@ impl ThreadHistoryBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
fn collab_spawn_tool(tool: codex_protocol::protocol::CollabAgentSpawnTool) -> CollabAgentTool {
|
||||
match tool {
|
||||
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgent => CollabAgentTool::SpawnAgent,
|
||||
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgentsOnCsv => {
|
||||
CollabAgentTool::SpawnAgentsOnCsv
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn collab_spawn_receivers_and_states(
|
||||
payload: &codex_protocol::protocol::CollabAgentSpawnEndEvent,
|
||||
) -> (Vec<String>, HashMap<String, CollabAgentState>) {
|
||||
if !payload.agent_statuses.is_empty() {
|
||||
let receiver_thread_ids = payload
|
||||
.agent_statuses
|
||||
.iter()
|
||||
.map(|status| status.thread_id.to_string())
|
||||
.collect::<Vec<_>>();
|
||||
let agents_states = payload
|
||||
.agent_statuses
|
||||
.iter()
|
||||
.map(|status| {
|
||||
(
|
||||
status.thread_id.to_string(),
|
||||
CollabAgentState::from(status.status.clone()),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
return (receiver_thread_ids, agents_states);
|
||||
}
|
||||
|
||||
match &payload.new_thread_id {
|
||||
Some(id) => {
|
||||
let receiver_id = id.to_string();
|
||||
let received_status = CollabAgentState::from(payload.status.clone());
|
||||
(
|
||||
vec![receiver_id.clone()],
|
||||
[(receiver_id, received_status)].into_iter().collect(),
|
||||
)
|
||||
}
|
||||
None => (Vec::new(), HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
const REVIEW_FALLBACK_MESSAGE: &str = "Reviewer failed to output a response.";
|
||||
|
||||
fn render_review_output_text(output: &ReviewOutputEvent) -> String {
|
||||
@@ -2728,6 +2774,7 @@ mod tests {
|
||||
}),
|
||||
EventMsg::CollabAgentSpawnEnd(codex_protocol::protocol::CollabAgentSpawnEndEvent {
|
||||
call_id: "spawn-1".into(),
|
||||
tool: codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgent,
|
||||
sender_thread_id,
|
||||
new_thread_id: Some(spawned_thread_id),
|
||||
new_agent_nickname: Some("Scout".into()),
|
||||
@@ -2735,6 +2782,7 @@ mod tests {
|
||||
prompt: "inspect the repo".into(),
|
||||
model: "gpt-5.4-mini".into(),
|
||||
reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Medium,
|
||||
agent_statuses: Vec::new(),
|
||||
status: AgentStatus::Running,
|
||||
}),
|
||||
];
|
||||
@@ -2770,6 +2818,70 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reconstructs_collab_batch_spawn_end_item_with_worker_statuses() {
|
||||
let sender_thread_id = ThreadId::try_from("00000000-0000-0000-0000-000000000001")
|
||||
.expect("valid sender thread id");
|
||||
let worker_thread_id = ThreadId::try_from("00000000-0000-0000-0000-000000000002")
|
||||
.expect("valid worker thread id");
|
||||
let events = vec![
|
||||
EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "run csv batch".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
}),
|
||||
EventMsg::CollabAgentSpawnEnd(codex_protocol::protocol::CollabAgentSpawnEndEvent {
|
||||
call_id: "spawn-batch-1".into(),
|
||||
tool: codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgentsOnCsv,
|
||||
sender_thread_id,
|
||||
new_thread_id: None,
|
||||
new_agent_nickname: None,
|
||||
new_agent_role: None,
|
||||
prompt: "Return {path}".into(),
|
||||
model: "gpt-5.1".into(),
|
||||
reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Low,
|
||||
agent_statuses: vec![codex_protocol::protocol::CollabAgentStatusEntry {
|
||||
thread_id: worker_thread_id,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
status: AgentStatus::Completed(None),
|
||||
}],
|
||||
status: AgentStatus::Completed(None),
|
||||
}),
|
||||
];
|
||||
|
||||
let items = events
|
||||
.into_iter()
|
||||
.map(RolloutItem::EventMsg)
|
||||
.collect::<Vec<_>>();
|
||||
let turns = build_turns_from_rollout_items(&items);
|
||||
assert_eq!(turns.len(), 1);
|
||||
assert_eq!(turns[0].items.len(), 2);
|
||||
assert_eq!(
|
||||
turns[0].items[1],
|
||||
ThreadItem::CollabAgentToolCall {
|
||||
id: "spawn-batch-1".into(),
|
||||
tool: CollabAgentTool::SpawnAgentsOnCsv,
|
||||
status: CollabAgentToolCallStatus::Completed,
|
||||
sender_thread_id: sender_thread_id.to_string(),
|
||||
receiver_thread_ids: vec![worker_thread_id.to_string()],
|
||||
prompt: Some("Return {path}".into()),
|
||||
model: Some("gpt-5.1".into()),
|
||||
reasoning_effort: Some(codex_protocol::openai_models::ReasoningEffort::Low),
|
||||
agents_states: [(
|
||||
worker_thread_id.to_string(),
|
||||
CollabAgentState {
|
||||
status: crate::protocol::v2::CollabAgentStatus::Completed,
|
||||
message: None,
|
||||
},
|
||||
)]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reconstructs_interrupted_send_input_as_completed_collab_call() {
|
||||
// `send_input(interrupt=true)` first stops the child's active turn, then redirects it with
|
||||
|
||||
@@ -4893,6 +4893,7 @@ v2_enum_from_core! {
|
||||
#[ts(export_to = "v2/")]
|
||||
pub enum CollabAgentTool {
|
||||
SpawnAgent,
|
||||
SpawnAgentsOnCsv,
|
||||
SendInput,
|
||||
ResumeAgent,
|
||||
Wait,
|
||||
|
||||
@@ -948,7 +948,7 @@ Today both notifications carry an empty `items` array even when item events were
|
||||
- `commandExecution` — `{id, command, cwd, status, commandActions, aggregatedOutput?, exitCode?, durationMs?}` for sandboxed commands; `status` is `inProgress`, `completed`, `failed`, or `declined`.
|
||||
- `fileChange` — `{id, changes, status}` describing proposed edits; `changes` list `{path, kind, diff}` and `status` is `inProgress`, `completed`, `failed`, or `declined`.
|
||||
- `mcpToolCall` — `{id, server, tool, status, arguments, result?, error?}` describing MCP calls; `status` is `inProgress`, `completed`, or `failed`.
|
||||
- `collabToolCall` — `{id, tool, status, senderThreadId, receiverThreadId?, newThreadId?, prompt?, agentStatus?}` describing collab tool calls (`spawn_agent`, `send_input`, `resume_agent`, `wait`, `close_agent`); `status` is `inProgress`, `completed`, or `failed`.
|
||||
- `collabToolCall` — `{id, tool, status, senderThreadId, receiverThreadIds, prompt?, model?, reasoningEffort?, agentsStates}` describing collab tool calls (`spawn_agent`, `spawn_agents_on_csv`, `send_input`, `resume_agent`, `wait`, `close_agent`); `status` is `inProgress`, `completed`, or `failed`.
|
||||
- `webSearch` — `{id, query, action?}` for a web search request issued by the agent; `action` mirrors the Responses API web_search action payload (`search`, `open_page`, `find_in_page`) and may be omitted until completion.
|
||||
- `imageView` — `{id, path}` emitted when the agent invokes the image viewer tool.
|
||||
- `enteredReviewMode` — `{id, review}` sent when the reviewer starts; `review` is a short user-facing label such as `"current changes"` or the requested target description.
|
||||
|
||||
@@ -1007,7 +1007,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
EventMsg::CollabAgentSpawnBegin(begin_event) => {
|
||||
let item = ThreadItem::CollabAgentToolCall {
|
||||
id: begin_event.call_id,
|
||||
tool: CollabAgentTool::SpawnAgent,
|
||||
tool: collab_spawn_tool(begin_event.tool),
|
||||
status: V2CollabToolCallStatus::InProgress,
|
||||
sender_thread_id: begin_event.sender_thread_id.to_string(),
|
||||
receiver_thread_ids: Vec::new(),
|
||||
@@ -1026,27 +1026,33 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
.await;
|
||||
}
|
||||
EventMsg::CollabAgentSpawnEnd(end_event) => {
|
||||
let has_receiver = end_event.new_thread_id.is_some();
|
||||
let status = match &end_event.status {
|
||||
codex_protocol::protocol::AgentStatus::Errored(_)
|
||||
| codex_protocol::protocol::AgentStatus::NotFound => V2CollabToolCallStatus::Failed,
|
||||
_ if has_receiver => V2CollabToolCallStatus::Completed,
|
||||
_ => V2CollabToolCallStatus::Failed,
|
||||
};
|
||||
let (receiver_thread_ids, agents_states) = match end_event.new_thread_id {
|
||||
Some(id) => {
|
||||
let receiver_id = id.to_string();
|
||||
let received_status = V2CollabAgentStatus::from(end_event.status.clone());
|
||||
(
|
||||
vec![receiver_id.clone()],
|
||||
[(receiver_id, received_status)].into_iter().collect(),
|
||||
)
|
||||
let (receiver_thread_ids, agents_states) =
|
||||
collab_spawn_receivers_and_states(&end_event);
|
||||
let has_receiver = !receiver_thread_ids.is_empty();
|
||||
let status = match end_event.tool {
|
||||
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgent => {
|
||||
match &end_event.status {
|
||||
codex_protocol::protocol::AgentStatus::Errored(_)
|
||||
| codex_protocol::protocol::AgentStatus::NotFound => {
|
||||
V2CollabToolCallStatus::Failed
|
||||
}
|
||||
_ if has_receiver => V2CollabToolCallStatus::Completed,
|
||||
_ => V2CollabToolCallStatus::Failed,
|
||||
}
|
||||
}
|
||||
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgentsOnCsv => {
|
||||
match &end_event.status {
|
||||
codex_protocol::protocol::AgentStatus::Errored(_)
|
||||
| codex_protocol::protocol::AgentStatus::NotFound => {
|
||||
V2CollabToolCallStatus::Failed
|
||||
}
|
||||
_ => V2CollabToolCallStatus::Completed,
|
||||
}
|
||||
}
|
||||
None => (Vec::new(), HashMap::new()),
|
||||
};
|
||||
let item = ThreadItem::CollabAgentToolCall {
|
||||
id: end_event.call_id,
|
||||
tool: CollabAgentTool::SpawnAgent,
|
||||
tool: collab_spawn_tool(end_event.tool),
|
||||
status,
|
||||
sender_thread_id: end_event.sender_thread_id.to_string(),
|
||||
receiver_thread_ids,
|
||||
@@ -2813,6 +2819,50 @@ fn collab_resume_end_item(end_event: codex_protocol::protocol::CollabResumeEndEv
|
||||
}
|
||||
}
|
||||
|
||||
fn collab_spawn_tool(tool: codex_protocol::protocol::CollabAgentSpawnTool) -> CollabAgentTool {
|
||||
match tool {
|
||||
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgent => CollabAgentTool::SpawnAgent,
|
||||
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgentsOnCsv => {
|
||||
CollabAgentTool::SpawnAgentsOnCsv
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn collab_spawn_receivers_and_states(
|
||||
end_event: &codex_protocol::protocol::CollabAgentSpawnEndEvent,
|
||||
) -> (Vec<String>, HashMap<String, V2CollabAgentStatus>) {
|
||||
if !end_event.agent_statuses.is_empty() {
|
||||
let receiver_thread_ids = end_event
|
||||
.agent_statuses
|
||||
.iter()
|
||||
.map(|agent_status| agent_status.thread_id.to_string())
|
||||
.collect::<Vec<_>>();
|
||||
let agents_states = end_event
|
||||
.agent_statuses
|
||||
.iter()
|
||||
.map(|agent_status| {
|
||||
(
|
||||
agent_status.thread_id.to_string(),
|
||||
V2CollabAgentStatus::from(agent_status.status.clone()),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
return (receiver_thread_ids, agents_states);
|
||||
}
|
||||
|
||||
match end_event.new_thread_id {
|
||||
Some(id) => {
|
||||
let receiver_id = id.to_string();
|
||||
let received_status = V2CollabAgentStatus::from(end_event.status.clone());
|
||||
(
|
||||
vec![receiver_id.clone()],
|
||||
[(receiver_id, received_status)].into_iter().collect(),
|
||||
)
|
||||
}
|
||||
None => (Vec::new(), HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// similar to handle_mcp_tool_call_begin in exec
|
||||
async fn construct_mcp_tool_call_notification(
|
||||
begin_event: McpToolCallBeginEvent,
|
||||
|
||||
@@ -920,6 +920,8 @@ mod tests {
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::accept_async;
|
||||
|
||||
const TEST_HTTP_REQUEST_ACCEPT_TIMEOUT: Duration = Duration::from_secs(15);
|
||||
|
||||
async fn remote_control_state_runtime(codex_home: &TempDir) -> Arc<StateRuntime> {
|
||||
StateRuntime::init(codex_home.path().to_path_buf(), "test-provider".to_string())
|
||||
.await
|
||||
@@ -1395,7 +1397,7 @@ mod tests {
|
||||
}
|
||||
|
||||
async fn accept_http_request(listener: &TcpListener) -> (TcpStream, String) {
|
||||
let (stream, _) = timeout(Duration::from_secs(5), listener.accept())
|
||||
let (stream, _) = timeout(TEST_HTTP_REQUEST_ACCEPT_TIMEOUT, listener.accept())
|
||||
.await
|
||||
.expect("HTTP request should arrive in time")
|
||||
.expect("listener accept should succeed");
|
||||
|
||||
@@ -57,12 +57,22 @@ use codex_protocol::user_input::MAX_USER_INPUT_TEXT_CHARS;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
use wiremock::Mock;
|
||||
use wiremock::Respond;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path_regex;
|
||||
|
||||
#[cfg(windows)]
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
|
||||
@@ -77,6 +87,135 @@ fn body_contains(req: &wiremock::Request, text: &str) -> bool {
|
||||
.is_some_and(|body| body.contains(text))
|
||||
}
|
||||
|
||||
struct BatchAgentsResponder {
|
||||
spawn_args_json: String,
|
||||
seen_main: AtomicBool,
|
||||
call_counter: AtomicUsize,
|
||||
}
|
||||
|
||||
impl BatchAgentsResponder {
|
||||
fn new(spawn_args_json: String) -> Self {
|
||||
Self {
|
||||
spawn_args_json,
|
||||
seen_main: AtomicBool::new(false),
|
||||
call_counter: AtomicUsize::new(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Respond for BatchAgentsResponder {
|
||||
fn respond(&self, request: &wiremock::Request) -> ResponseTemplate {
|
||||
let body_bytes = decode_body_bytes(request);
|
||||
let body: Value = serde_json::from_slice(&body_bytes).unwrap_or(Value::Null);
|
||||
|
||||
if has_function_call_output(&body) {
|
||||
return core_test_support::responses::sse_response(core_test_support::responses::sse(
|
||||
vec![
|
||||
core_test_support::responses::ev_response_created("resp-tool"),
|
||||
core_test_support::responses::ev_completed("resp-tool"),
|
||||
],
|
||||
));
|
||||
}
|
||||
|
||||
if let Some((job_id, item_id)) = extract_batch_job_and_item(&body) {
|
||||
let call_id = format!(
|
||||
"call-worker-{}",
|
||||
self.call_counter.fetch_add(1, Ordering::SeqCst)
|
||||
);
|
||||
let args = json!({
|
||||
"job_id": job_id,
|
||||
"item_id": item_id,
|
||||
"result": { "item_id": item_id },
|
||||
});
|
||||
let args_json = serde_json::to_string(&args).unwrap_or_else(|err| {
|
||||
panic!("worker args serialize: {err}");
|
||||
});
|
||||
return core_test_support::responses::sse_response(core_test_support::responses::sse(
|
||||
vec![
|
||||
core_test_support::responses::ev_response_created("resp-worker"),
|
||||
core_test_support::responses::ev_function_call(
|
||||
&call_id,
|
||||
"report_agent_job_result",
|
||||
&args_json,
|
||||
),
|
||||
core_test_support::responses::ev_completed("resp-worker"),
|
||||
],
|
||||
));
|
||||
}
|
||||
|
||||
if !self.seen_main.swap(true, Ordering::SeqCst) {
|
||||
return core_test_support::responses::sse_response(core_test_support::responses::sse(
|
||||
vec![
|
||||
core_test_support::responses::ev_response_created("resp-main"),
|
||||
core_test_support::responses::ev_function_call(
|
||||
"call-spawn",
|
||||
"spawn_agents_on_csv",
|
||||
&self.spawn_args_json,
|
||||
),
|
||||
core_test_support::responses::ev_completed("resp-main"),
|
||||
],
|
||||
));
|
||||
}
|
||||
|
||||
core_test_support::responses::sse_response(core_test_support::responses::sse(vec![
|
||||
core_test_support::responses::ev_response_created("resp-parent-done"),
|
||||
core_test_support::responses::ev_assistant_message("msg-parent-done", "parent done"),
|
||||
core_test_support::responses::ev_completed("resp-parent-done"),
|
||||
]))
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_body_bytes(request: &wiremock::Request) -> Vec<u8> {
|
||||
request.body.clone()
|
||||
}
|
||||
|
||||
fn has_function_call_output(body: &Value) -> bool {
|
||||
body.get("input")
|
||||
.and_then(Value::as_array)
|
||||
.is_some_and(|items| {
|
||||
items.iter().any(|item| {
|
||||
item.get("type").and_then(Value::as_str) == Some("function_call_output")
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
fn extract_batch_job_and_item(body: &Value) -> Option<(String, String)> {
|
||||
let texts = message_input_texts(body);
|
||||
let mut combined = texts.join("\n");
|
||||
if let Some(instructions) = body.get("instructions").and_then(Value::as_str) {
|
||||
combined.push('\n');
|
||||
combined.push_str(instructions);
|
||||
}
|
||||
if !combined.contains("You are processing one item for a generic agent job.") {
|
||||
return None;
|
||||
}
|
||||
let job_id = extract_prefixed_line(&combined, "Job ID:")?;
|
||||
let item_id = extract_prefixed_line(&combined, "Item ID:")?;
|
||||
Some((job_id, item_id))
|
||||
}
|
||||
|
||||
fn extract_prefixed_line(text: &str, prefix: &str) -> Option<String> {
|
||||
text.lines()
|
||||
.find_map(|line| line.strip_prefix(prefix).map(str::trim))
|
||||
.filter(|value| !value.is_empty())
|
||||
.map(str::to_string)
|
||||
}
|
||||
|
||||
fn message_input_texts(body: &Value) -> Vec<String> {
|
||||
let Some(items) = body.get("input").and_then(Value::as_array) else {
|
||||
return Vec::new();
|
||||
};
|
||||
items
|
||||
.iter()
|
||||
.filter(|item| item.get("type").and_then(Value::as_str) == Some("message"))
|
||||
.filter_map(|item| item.get("content").and_then(Value::as_array))
|
||||
.flatten()
|
||||
.filter(|span| span.get("type").and_then(Value::as_str) == Some("input_text"))
|
||||
.filter_map(|span| span.get("text").and_then(Value::as_str))
|
||||
.map(str::to_string)
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_sends_originator_header() -> Result<()> {
|
||||
let responses = vec![create_final_assistant_message_sse_response("Done")?];
|
||||
@@ -2041,6 +2180,178 @@ config_file = "./custom-role.toml"
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_emits_spawn_agents_on_csv_item_with_model_metadata_v2() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
const PARENT_PROMPT: &str = "run the csv batch";
|
||||
const INSTRUCTION: &str = "Return {path}";
|
||||
const REQUESTED_MODEL: &str = "gpt-5.1";
|
||||
const REQUESTED_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::Low;
|
||||
const SPAWN_CALL_ID: &str = "call-spawn";
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
let input_path = codex_home.path().join("batch_input.csv");
|
||||
let output_path = codex_home.path().join("batch_output.csv");
|
||||
fs::write(&input_path, "path\nfile-1\n")?;
|
||||
|
||||
let spawn_args = serde_json::to_string(&json!({
|
||||
"csv_path": input_path.display().to_string(),
|
||||
"instruction": INSTRUCTION,
|
||||
"output_csv_path": output_path.display().to_string(),
|
||||
"model": REQUESTED_MODEL,
|
||||
"reasoning_effort": REQUESTED_REASONING_EFFORT,
|
||||
}))?;
|
||||
let responder = BatchAgentsResponder::new(spawn_args);
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(responder)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
create_config_toml(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
"never",
|
||||
&BTreeMap::from([
|
||||
(Feature::SpawnCsv, true),
|
||||
(Feature::Sqlite, true),
|
||||
(Feature::EnableRequestCompression, false),
|
||||
]),
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let thread_req = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("gpt-5.2-codex".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let thread_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
||||
|
||||
let turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: PARENT_PROMPT.to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
|
||||
)
|
||||
.await??;
|
||||
let turn: TurnStartResponse = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
|
||||
let spawn_started = timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
loop {
|
||||
let started_notif = mcp
|
||||
.read_stream_until_notification_message("item/started")
|
||||
.await?;
|
||||
let started: ItemStartedNotification =
|
||||
serde_json::from_value(started_notif.params.expect("item/started params"))?;
|
||||
if let ThreadItem::CollabAgentToolCall { id, .. } = &started.item
|
||||
&& id == SPAWN_CALL_ID
|
||||
{
|
||||
return Ok::<ThreadItem, anyhow::Error>(started.item);
|
||||
}
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
assert_eq!(
|
||||
spawn_started,
|
||||
ThreadItem::CollabAgentToolCall {
|
||||
id: SPAWN_CALL_ID.to_string(),
|
||||
tool: CollabAgentTool::SpawnAgentsOnCsv,
|
||||
status: CollabAgentToolCallStatus::InProgress,
|
||||
sender_thread_id: thread.id.clone(),
|
||||
receiver_thread_ids: Vec::new(),
|
||||
prompt: Some(INSTRUCTION.to_string()),
|
||||
model: Some(REQUESTED_MODEL.to_string()),
|
||||
reasoning_effort: Some(REQUESTED_REASONING_EFFORT),
|
||||
agents_states: HashMap::new(),
|
||||
}
|
||||
);
|
||||
|
||||
let spawn_completed = timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
loop {
|
||||
let completed_notif = mcp
|
||||
.read_stream_until_notification_message("item/completed")
|
||||
.await?;
|
||||
let completed: ItemCompletedNotification =
|
||||
serde_json::from_value(completed_notif.params.expect("item/completed params"))?;
|
||||
if let ThreadItem::CollabAgentToolCall { id, .. } = &completed.item
|
||||
&& id == SPAWN_CALL_ID
|
||||
{
|
||||
return Ok::<ThreadItem, anyhow::Error>(completed.item);
|
||||
}
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
let ThreadItem::CollabAgentToolCall {
|
||||
id,
|
||||
tool,
|
||||
status,
|
||||
sender_thread_id,
|
||||
receiver_thread_ids,
|
||||
prompt,
|
||||
model,
|
||||
reasoning_effort,
|
||||
agents_states,
|
||||
} = spawn_completed
|
||||
else {
|
||||
unreachable!("loop ensures we break on collab agent tool call items");
|
||||
};
|
||||
let receiver_thread_id = receiver_thread_ids
|
||||
.first()
|
||||
.cloned()
|
||||
.expect("batch completion should include worker thread id");
|
||||
assert_eq!(id, SPAWN_CALL_ID);
|
||||
assert_eq!(tool, CollabAgentTool::SpawnAgentsOnCsv);
|
||||
assert_eq!(status, CollabAgentToolCallStatus::Completed);
|
||||
assert_eq!(sender_thread_id, thread.id);
|
||||
assert_eq!(receiver_thread_ids, vec![receiver_thread_id.clone()]);
|
||||
assert_eq!(prompt, Some(INSTRUCTION.to_string()));
|
||||
assert_eq!(model, Some(REQUESTED_MODEL.to_string()));
|
||||
assert_eq!(reasoning_effort, Some(REQUESTED_REASONING_EFFORT));
|
||||
let agent_state = agents_states
|
||||
.get(&receiver_thread_id)
|
||||
.expect("batch completion should include worker state");
|
||||
assert_eq!(agent_state.status, CollabAgentStatus::Completed);
|
||||
assert_eq!(agent_state.message, None);
|
||||
|
||||
let turn_completed = timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
loop {
|
||||
let turn_completed_notif = mcp
|
||||
.read_stream_until_notification_message("turn/completed")
|
||||
.await?;
|
||||
let turn_completed: TurnCompletedNotification = serde_json::from_value(
|
||||
turn_completed_notif.params.expect("turn/completed params"),
|
||||
)?;
|
||||
if turn_completed.thread_id == thread.id && turn_completed.turn.id == turn.turn.id {
|
||||
return Ok::<TurnCompletedNotification, anyhow::Error>(turn_completed);
|
||||
}
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
assert_eq!(turn_completed.thread_id, thread.id);
|
||||
assert_eq!(turn_completed.turn.id, turn.turn.id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_file_change_approval_accept_for_session_persists_v2() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -8,13 +8,19 @@ use crate::function_tool::FunctionCallError;
|
||||
use crate::tools::context::FunctionToolOutput;
|
||||
use crate::tools::context::ToolInvocation;
|
||||
use crate::tools::context::ToolPayload;
|
||||
use crate::tools::handlers::multi_agents::apply_requested_spawn_agent_model_overrides;
|
||||
use crate::tools::handlers::multi_agents::build_agent_spawn_config;
|
||||
use crate::tools::handlers::parse_arguments;
|
||||
use crate::tools::registry::ToolHandler;
|
||||
use crate::tools::registry::ToolKind;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::error::CodexErr;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use codex_protocol::protocol::AgentStatus;
|
||||
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
|
||||
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
|
||||
use codex_protocol::protocol::CollabAgentSpawnTool;
|
||||
use codex_protocol::protocol::CollabAgentStatusEntry;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
@@ -49,6 +55,8 @@ struct SpawnAgentsOnCsvArgs {
|
||||
id_column: Option<String>,
|
||||
output_csv_path: Option<String>,
|
||||
output_schema: Option<Value>,
|
||||
model: Option<String>,
|
||||
reasoning_effort: Option<ReasoningEffort>,
|
||||
max_concurrency: Option<usize>,
|
||||
max_workers: Option<usize>,
|
||||
max_runtime_seconds: Option<u64>,
|
||||
@@ -194,6 +202,7 @@ impl ToolHandler for BatchJobHandler {
|
||||
turn,
|
||||
tool_name,
|
||||
payload,
|
||||
call_id,
|
||||
..
|
||||
} = invocation;
|
||||
|
||||
@@ -207,7 +216,9 @@ impl ToolHandler for BatchJobHandler {
|
||||
};
|
||||
|
||||
match tool_name.as_str() {
|
||||
"spawn_agents_on_csv" => spawn_agents_on_csv::handle(session, turn, arguments).await,
|
||||
"spawn_agents_on_csv" => {
|
||||
spawn_agents_on_csv::handle(session, turn, call_id, arguments).await
|
||||
}
|
||||
"report_agent_job_result" => report_agent_job_result::handle(session, arguments).await,
|
||||
other => Err(FunctionCallError::RespondToModel(format!(
|
||||
"unsupported agent job tool {other}"
|
||||
@@ -227,6 +238,7 @@ mod spawn_agents_on_csv {
|
||||
pub async fn handle(
|
||||
session: Arc<Session>,
|
||||
turn: Arc<TurnContext>,
|
||||
call_id: String,
|
||||
arguments: String,
|
||||
) -> Result<FunctionToolOutput, FunctionCallError> {
|
||||
let args: SpawnAgentsOnCsvArgs = parse_arguments(arguments.as_str())?;
|
||||
@@ -235,6 +247,20 @@ mod spawn_agents_on_csv {
|
||||
"instruction must be non-empty".to_string(),
|
||||
));
|
||||
}
|
||||
session
|
||||
.send_event(
|
||||
&turn,
|
||||
CollabAgentSpawnBeginEvent {
|
||||
call_id: call_id.clone(),
|
||||
tool: CollabAgentSpawnTool::SpawnAgentsOnCsv,
|
||||
sender_thread_id: session.conversation_id,
|
||||
prompt: args.instruction.clone(),
|
||||
model: args.model.clone().unwrap_or_default(),
|
||||
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let db = required_state_db(&session)?;
|
||||
let input_path = turn.resolve_path(Some(args.csv_path));
|
||||
@@ -339,7 +365,15 @@ mod spawn_agents_on_csv {
|
||||
})?;
|
||||
|
||||
let requested_concurrency = args.max_concurrency.or(args.max_workers);
|
||||
let options = match build_runner_options(&session, &turn, requested_concurrency).await {
|
||||
let options = match build_runner_options(
|
||||
&session,
|
||||
&turn,
|
||||
requested_concurrency,
|
||||
args.model.as_deref(),
|
||||
args.reasoning_effort,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(options) => options,
|
||||
Err(err) => {
|
||||
let error_message = err.to_string();
|
||||
@@ -362,12 +396,17 @@ mod spawn_agents_on_csv {
|
||||
"agent job concurrency: job_id={job_id} requested={requested_concurrency:?} max_threads={max_threads:?} effective={effective_concurrency}"
|
||||
);
|
||||
let _ = session.notify_background_event(&turn, message).await;
|
||||
let effective_batch_model = options.spawn_config.model.clone().unwrap_or_default();
|
||||
let effective_batch_reasoning_effort = options
|
||||
.spawn_config
|
||||
.model_reasoning_effort
|
||||
.unwrap_or_default();
|
||||
if let Err(err) = run_agent_job_loop(
|
||||
session.clone(),
|
||||
turn.clone(),
|
||||
db.clone(),
|
||||
job_id.clone(),
|
||||
options,
|
||||
options.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -410,6 +449,55 @@ mod spawn_agents_on_csv {
|
||||
))
|
||||
})?;
|
||||
let mut job_error = job.last_error.clone().filter(|err| !err.trim().is_empty());
|
||||
let agent_statuses = collect_batch_agent_statuses(db.clone(), job_id.as_str())
|
||||
.await
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to load batch worker statuses for {job_id}: {err}"
|
||||
))
|
||||
})?;
|
||||
let collab_status = match job.status {
|
||||
codex_state::AgentJobStatus::Completed => {
|
||||
if progress.failed_items > 0 {
|
||||
AgentStatus::Completed(Some(format!(
|
||||
"agent job completed with {} failed items",
|
||||
progress.failed_items
|
||||
)))
|
||||
} else {
|
||||
AgentStatus::Completed(None)
|
||||
}
|
||||
}
|
||||
codex_state::AgentJobStatus::Cancelled => {
|
||||
AgentStatus::Completed(job.last_error.clone())
|
||||
}
|
||||
codex_state::AgentJobStatus::Failed => AgentStatus::Errored(
|
||||
job_error
|
||||
.clone()
|
||||
.unwrap_or_else(|| "agent job failed".to_string()),
|
||||
),
|
||||
codex_state::AgentJobStatus::Pending | codex_state::AgentJobStatus::Running => {
|
||||
AgentStatus::Running
|
||||
}
|
||||
};
|
||||
session
|
||||
.send_event(
|
||||
&turn,
|
||||
CollabAgentSpawnEndEvent {
|
||||
call_id: call_id.clone(),
|
||||
tool: CollabAgentSpawnTool::SpawnAgentsOnCsv,
|
||||
sender_thread_id: session.conversation_id,
|
||||
new_thread_id: None,
|
||||
new_agent_nickname: None,
|
||||
new_agent_role: None,
|
||||
prompt: job.instruction.clone(),
|
||||
model: effective_batch_model,
|
||||
reasoning_effort: effective_batch_reasoning_effort,
|
||||
agent_statuses,
|
||||
status: collab_status,
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
let failed_item_errors = if progress.failed_items > 0 {
|
||||
let items = db
|
||||
.list_agent_job_items(
|
||||
@@ -523,6 +611,8 @@ async fn build_runner_options(
|
||||
session: &Arc<Session>,
|
||||
turn: &Arc<TurnContext>,
|
||||
requested_concurrency: Option<usize>,
|
||||
requested_model: Option<&str>,
|
||||
requested_reasoning_effort: Option<ReasoningEffort>,
|
||||
) -> Result<JobRunnerOptions, FunctionCallError> {
|
||||
let session_source = turn.session_source.clone();
|
||||
let child_depth = next_thread_spawn_depth(&session_source);
|
||||
@@ -535,7 +625,15 @@ async fn build_runner_options(
|
||||
let max_concurrency =
|
||||
normalize_concurrency(requested_concurrency, turn.config.agent_max_threads);
|
||||
let base_instructions = session.get_base_instructions().await;
|
||||
let spawn_config = build_agent_spawn_config(&base_instructions, turn.as_ref())?;
|
||||
let mut spawn_config = build_agent_spawn_config(&base_instructions, turn.as_ref())?;
|
||||
apply_requested_spawn_agent_model_overrides(
|
||||
session,
|
||||
turn.as_ref(),
|
||||
&mut spawn_config,
|
||||
requested_model,
|
||||
requested_reasoning_effort,
|
||||
)
|
||||
.await?;
|
||||
Ok(JobRunnerOptions {
|
||||
max_concurrency,
|
||||
spawn_config,
|
||||
@@ -994,6 +1092,41 @@ async fn finalize_finished_item(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn collect_batch_agent_statuses(
|
||||
db: Arc<codex_state::StateRuntime>,
|
||||
job_id: &str,
|
||||
) -> anyhow::Result<Vec<CollabAgentStatusEntry>> {
|
||||
let items = db
|
||||
.list_agent_job_items(job_id, /*status*/ None, /*limit*/ None)
|
||||
.await?;
|
||||
let mut agent_statuses = Vec::new();
|
||||
for item in items {
|
||||
let Some(assigned_thread_id) = item.assigned_thread_id.as_deref() else {
|
||||
continue;
|
||||
};
|
||||
let Ok(thread_id) = ThreadId::from_string(assigned_thread_id) else {
|
||||
continue;
|
||||
};
|
||||
let status = match item.status {
|
||||
codex_state::AgentJobItemStatus::Completed => AgentStatus::Completed(None),
|
||||
codex_state::AgentJobItemStatus::Failed => AgentStatus::Errored(
|
||||
item.last_error
|
||||
.clone()
|
||||
.unwrap_or_else(|| "agent job item failed".to_string()),
|
||||
),
|
||||
codex_state::AgentJobItemStatus::Pending => continue,
|
||||
codex_state::AgentJobItemStatus::Running => AgentStatus::Running,
|
||||
};
|
||||
agent_statuses.push(CollabAgentStatusEntry {
|
||||
thread_id,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
status,
|
||||
});
|
||||
}
|
||||
Ok(agent_statuses)
|
||||
}
|
||||
|
||||
fn build_worker_prompt(
|
||||
job: &codex_state::AgentJob,
|
||||
item: &codex_state::AgentJobItem,
|
||||
|
||||
@@ -25,6 +25,7 @@ use codex_protocol::protocol::CollabAgentInteractionEndEvent;
|
||||
use codex_protocol::protocol::CollabAgentRef;
|
||||
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
|
||||
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
|
||||
use codex_protocol::protocol::CollabAgentSpawnTool;
|
||||
use codex_protocol::protocol::CollabCloseBeginEvent;
|
||||
use codex_protocol::protocol::CollabCloseEndEvent;
|
||||
use codex_protocol::protocol::CollabResumeBeginEvent;
|
||||
|
||||
@@ -51,6 +51,7 @@ impl ToolHandler for Handler {
|
||||
&turn,
|
||||
CollabAgentSpawnBeginEvent {
|
||||
call_id: call_id.clone(),
|
||||
tool: CollabAgentSpawnTool::SpawnAgent,
|
||||
sender_thread_id: session.conversation_id,
|
||||
prompt: prompt.clone(),
|
||||
model: args.model.clone().unwrap_or_default(),
|
||||
@@ -141,6 +142,7 @@ impl ToolHandler for Handler {
|
||||
&turn,
|
||||
CollabAgentSpawnEndEvent {
|
||||
call_id,
|
||||
tool: CollabAgentSpawnTool::SpawnAgent,
|
||||
sender_thread_id: session.conversation_id,
|
||||
new_thread_id,
|
||||
new_agent_nickname,
|
||||
@@ -148,6 +150,7 @@ impl ToolHandler for Handler {
|
||||
prompt,
|
||||
model: effective_model,
|
||||
reasoning_effort: effective_reasoning_effort,
|
||||
agent_statuses: Vec::new(),
|
||||
status,
|
||||
}
|
||||
.into(),
|
||||
|
||||
@@ -18,6 +18,7 @@ use codex_protocol::protocol::CollabAgentInteractionBeginEvent;
|
||||
use codex_protocol::protocol::CollabAgentInteractionEndEvent;
|
||||
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
|
||||
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
|
||||
use codex_protocol::protocol::CollabAgentSpawnTool;
|
||||
use codex_protocol::protocol::CollabCloseBeginEvent;
|
||||
use codex_protocol::protocol::CollabCloseEndEvent;
|
||||
use codex_protocol::protocol::CollabWaitingBeginEvent;
|
||||
|
||||
@@ -61,6 +61,7 @@ impl ToolHandler for Handler {
|
||||
&turn,
|
||||
CollabAgentSpawnBeginEvent {
|
||||
call_id: call_id.clone(),
|
||||
tool: CollabAgentSpawnTool::SpawnAgent,
|
||||
sender_thread_id: session.conversation_id,
|
||||
prompt: prompt.clone(),
|
||||
model: args.model.clone().unwrap_or_default(),
|
||||
@@ -182,6 +183,7 @@ impl ToolHandler for Handler {
|
||||
&turn,
|
||||
CollabAgentSpawnEndEvent {
|
||||
call_id,
|
||||
tool: CollabAgentSpawnTool::SpawnAgent,
|
||||
sender_thread_id: session.conversation_id,
|
||||
new_thread_id,
|
||||
new_agent_nickname,
|
||||
@@ -189,6 +191,7 @@ impl ToolHandler for Handler {
|
||||
prompt,
|
||||
model: effective_model,
|
||||
reasoning_effort: effective_reasoning_effort,
|
||||
agent_statuses: Vec::new(),
|
||||
status,
|
||||
}
|
||||
.into(),
|
||||
|
||||
@@ -242,6 +242,7 @@ impl EventProcessorWithJsonOutput {
|
||||
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
|
||||
tool: match tool {
|
||||
CollabAgentTool::SpawnAgent => CollabTool::SpawnAgent,
|
||||
CollabAgentTool::SpawnAgentsOnCsv => CollabTool::SpawnAgentsOnCsv,
|
||||
CollabAgentTool::SendInput => CollabTool::SendInput,
|
||||
CollabAgentTool::ResumeAgent => CollabTool::Wait,
|
||||
CollabAgentTool::Wait => CollabTool::Wait,
|
||||
|
||||
@@ -217,6 +217,7 @@ pub enum CollabToolCallStatus {
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CollabTool {
|
||||
SpawnAgent,
|
||||
SpawnAgentsOnCsv,
|
||||
SendInput,
|
||||
Wait,
|
||||
CloseAgent,
|
||||
|
||||
@@ -760,6 +760,96 @@ fn collab_spawn_begin_and_end_emit_item_events() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn collab_batch_spawn_begin_and_end_emit_item_events() {
|
||||
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
||||
|
||||
let started =
|
||||
processor.collect_thread_events(ServerNotification::ItemStarted(ItemStartedNotification {
|
||||
item: ThreadItem::CollabAgentToolCall {
|
||||
id: "collab-batch-1".to_string(),
|
||||
tool: CollabAgentTool::SpawnAgentsOnCsv,
|
||||
status: ApiCollabAgentToolCallStatus::InProgress,
|
||||
sender_thread_id: "thread-parent".to_string(),
|
||||
receiver_thread_ids: Vec::new(),
|
||||
prompt: Some("Return {path}".to_string()),
|
||||
model: Some("gpt-5.1".to_string()),
|
||||
reasoning_effort: Some(codex_protocol::openai_models::ReasoningEffort::Low),
|
||||
agents_states: std::collections::HashMap::new(),
|
||||
},
|
||||
thread_id: "thread-parent".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
}));
|
||||
let completed = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
||||
ItemCompletedNotification {
|
||||
item: ThreadItem::CollabAgentToolCall {
|
||||
id: "collab-batch-1".to_string(),
|
||||
tool: CollabAgentTool::SpawnAgentsOnCsv,
|
||||
status: ApiCollabAgentToolCallStatus::Completed,
|
||||
sender_thread_id: "thread-parent".to_string(),
|
||||
receiver_thread_ids: vec!["thread-child".to_string()],
|
||||
prompt: Some("Return {path}".to_string()),
|
||||
model: Some("gpt-5.1".to_string()),
|
||||
reasoning_effort: Some(codex_protocol::openai_models::ReasoningEffort::Low),
|
||||
agents_states: std::collections::HashMap::from([(
|
||||
"thread-child".to_string(),
|
||||
ApiCollabAgentState {
|
||||
status: ApiCollabAgentStatus::Completed,
|
||||
message: None,
|
||||
},
|
||||
)]),
|
||||
},
|
||||
thread_id: "thread-parent".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
},
|
||||
));
|
||||
|
||||
assert_eq!(
|
||||
started,
|
||||
CollectedThreadEvents {
|
||||
events: vec![ThreadEvent::ItemStarted(ItemStartedEvent {
|
||||
item: ExecThreadItem {
|
||||
id: "item_0".to_string(),
|
||||
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
|
||||
tool: CollabTool::SpawnAgentsOnCsv,
|
||||
sender_thread_id: "thread-parent".to_string(),
|
||||
receiver_thread_ids: Vec::new(),
|
||||
prompt: Some("Return {path}".to_string()),
|
||||
agents_states: std::collections::HashMap::new(),
|
||||
status: CollabToolCallStatus::InProgress,
|
||||
},),
|
||||
},
|
||||
})],
|
||||
status: CodexStatus::Running,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
completed,
|
||||
CollectedThreadEvents {
|
||||
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
||||
item: ExecThreadItem {
|
||||
id: "item_0".to_string(),
|
||||
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
|
||||
tool: CollabTool::SpawnAgentsOnCsv,
|
||||
sender_thread_id: "thread-parent".to_string(),
|
||||
receiver_thread_ids: vec!["thread-child".to_string()],
|
||||
prompt: Some("Return {path}".to_string()),
|
||||
agents_states: std::collections::HashMap::from([(
|
||||
"thread-child".to_string(),
|
||||
CollabAgentState {
|
||||
status: CollabAgentStatus::Completed,
|
||||
message: None,
|
||||
},
|
||||
)]),
|
||||
status: CollabToolCallStatus::Completed,
|
||||
},),
|
||||
},
|
||||
})],
|
||||
status: CodexStatus::Running,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn file_change_completion_maps_change_kinds() {
|
||||
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
||||
|
||||
@@ -3409,10 +3409,22 @@ pub enum TurnAbortReason {
|
||||
ReviewEnded,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
#[ts(rename_all = "snake_case")]
|
||||
pub enum CollabAgentSpawnTool {
|
||||
#[default]
|
||||
SpawnAgent,
|
||||
SpawnAgentsOnCsv,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)]
|
||||
pub struct CollabAgentSpawnBeginEvent {
|
||||
/// Identifier for the collab tool call.
|
||||
pub call_id: String,
|
||||
/// Tool that initiated the spawn flow.
|
||||
#[serde(default)]
|
||||
pub tool: CollabAgentSpawnTool,
|
||||
/// Thread ID of the sender.
|
||||
pub sender_thread_id: ThreadId,
|
||||
/// Initial prompt sent to the agent. Can be empty to prevent CoT leaking at the
|
||||
@@ -3452,6 +3464,9 @@ pub struct CollabAgentStatusEntry {
|
||||
pub struct CollabAgentSpawnEndEvent {
|
||||
/// Identifier for the collab tool call.
|
||||
pub call_id: String,
|
||||
/// Tool that initiated the spawn flow.
|
||||
#[serde(default)]
|
||||
pub tool: CollabAgentSpawnTool,
|
||||
/// Thread ID of the sender.
|
||||
pub sender_thread_id: ThreadId,
|
||||
/// Thread ID of the newly spawned agent, if it was created.
|
||||
@@ -3469,6 +3484,9 @@ pub struct CollabAgentSpawnEndEvent {
|
||||
pub model: String,
|
||||
/// Effective reasoning effort used by the spawned agent after inheritance and role overrides.
|
||||
pub reasoning_effort: ReasoningEffortConfig,
|
||||
/// Final per-agent statuses for multi-agent batch spawns.
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
pub agent_statuses: Vec<CollabAgentStatusEntry>,
|
||||
/// Last known status of the new agent reported to the sender agent.
|
||||
pub status: AgentStatus,
|
||||
}
|
||||
|
||||
@@ -26,6 +26,20 @@ pub fn create_spawn_agents_on_csv_tool() -> ToolSpec {
|
||||
"output_csv_path".to_string(),
|
||||
JsonSchema::string(Some("Optional output CSV path for exported results.".to_string())),
|
||||
),
|
||||
(
|
||||
"model".to_string(),
|
||||
JsonSchema::string(Some(
|
||||
"Optional model override for each worker sub-agent. Replaces the inherited model."
|
||||
.to_string(),
|
||||
)),
|
||||
),
|
||||
(
|
||||
"reasoning_effort".to_string(),
|
||||
JsonSchema::string(Some(
|
||||
"Optional reasoning effort override for each worker sub-agent. Replaces the inherited reasoning effort."
|
||||
.to_string(),
|
||||
)),
|
||||
),
|
||||
(
|
||||
"max_concurrency".to_string(),
|
||||
JsonSchema::number(Some(
|
||||
|
||||
@@ -39,6 +39,20 @@ fn spawn_agents_on_csv_tool_requires_csv_and_instruction() {
|
||||
"Optional output CSV path for exported results.".to_string(),
|
||||
)),
|
||||
),
|
||||
(
|
||||
"model".to_string(),
|
||||
JsonSchema::string(Some(
|
||||
"Optional model override for each worker sub-agent. Replaces the inherited model."
|
||||
.to_string(),
|
||||
)),
|
||||
),
|
||||
(
|
||||
"reasoning_effort".to_string(),
|
||||
JsonSchema::string(Some(
|
||||
"Optional reasoning effort override for each worker sub-agent. Replaces the inherited reasoning effort."
|
||||
.to_string(),
|
||||
)),
|
||||
),
|
||||
(
|
||||
"max_concurrency".to_string(),
|
||||
JsonSchema::number(Some(
|
||||
|
||||
@@ -3724,8 +3724,8 @@ impl ChatWidget {
|
||||
let first_receiver_metadata =
|
||||
first_receiver.map(|thread_id| self.collab_agent_metadata(thread_id));
|
||||
|
||||
match tool {
|
||||
CollabAgentTool::SpawnAgent => {
|
||||
match tool.clone() {
|
||||
CollabAgentTool::SpawnAgent | CollabAgentTool::SpawnAgentsOnCsv => {
|
||||
if let (Some(model), Some(reasoning_effort)) = (model.clone(), reasoning_effort) {
|
||||
self.pending_collab_spawn_requests.insert(
|
||||
id.clone(),
|
||||
@@ -3748,9 +3748,23 @@ impl ChatWidget {
|
||||
}
|
||||
})
|
||||
});
|
||||
let (agent_statuses, statuses) = app_server_collab_agent_statuses_to_core(
|
||||
&receiver_thread_ids,
|
||||
&agents_states,
|
||||
&self.collab_agent_metadata,
|
||||
);
|
||||
self.on_collab_event(multi_agents::spawn_end(
|
||||
codex_protocol::protocol::CollabAgentSpawnEndEvent {
|
||||
call_id: id,
|
||||
tool: match tool {
|
||||
CollabAgentTool::SpawnAgent => {
|
||||
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgent
|
||||
}
|
||||
CollabAgentTool::SpawnAgentsOnCsv => {
|
||||
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgentsOnCsv
|
||||
}
|
||||
_ => unreachable!("non-spawn collab tool"),
|
||||
},
|
||||
sender_thread_id,
|
||||
new_thread_id: first_receiver,
|
||||
new_agent_nickname: first_receiver_metadata
|
||||
@@ -3762,10 +3776,20 @@ impl ChatWidget {
|
||||
prompt: prompt.unwrap_or_default(),
|
||||
model: String::new(),
|
||||
reasoning_effort: ReasoningEffortConfig::Medium,
|
||||
agent_statuses,
|
||||
status: first_receiver
|
||||
.as_ref()
|
||||
.and_then(|thread_id| agents_states.get(&thread_id.to_string()))
|
||||
.map(app_server_collab_state_to_core)
|
||||
.and_then(|thread_id| statuses.get(thread_id))
|
||||
.cloned()
|
||||
.or_else(|| match status {
|
||||
CollabAgentToolCallStatus::Completed => {
|
||||
Some(AgentStatus::Completed(None))
|
||||
}
|
||||
CollabAgentToolCallStatus::Failed => {
|
||||
Some(AgentStatus::Errored("Agent spawn failed".into()))
|
||||
}
|
||||
CollabAgentToolCallStatus::InProgress => None,
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
AgentStatus::Errored("Agent spawn failed".into())
|
||||
}),
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
---
|
||||
source: tui/src/chatwidget/tests/app_server.rs
|
||||
expression: combined
|
||||
---
|
||||
• Spawned 1 CSV workers (gpt-5.1 low)
|
||||
└ Return {path}
|
||||
019cff70-2599-75e2-af72-b91781b41a8e: Completed
|
||||
@@ -11,6 +11,7 @@ async fn collab_spawn_end_shows_requested_model_and_effort() {
|
||||
id: "spawn-begin".into(),
|
||||
msg: EventMsg::CollabAgentSpawnBegin(CollabAgentSpawnBeginEvent {
|
||||
call_id: "call-spawn".to_string(),
|
||||
tool: codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgent,
|
||||
sender_thread_id,
|
||||
prompt: "Explore the repo".to_string(),
|
||||
model: "gpt-5".to_string(),
|
||||
@@ -21,6 +22,7 @@ async fn collab_spawn_end_shows_requested_model_and_effort() {
|
||||
id: "spawn-end".into(),
|
||||
msg: EventMsg::CollabAgentSpawnEnd(CollabAgentSpawnEndEvent {
|
||||
call_id: "call-spawn".to_string(),
|
||||
tool: codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgent,
|
||||
sender_thread_id,
|
||||
new_thread_id: Some(spawned_thread_id),
|
||||
new_agent_nickname: Some("Robie".to_string()),
|
||||
@@ -28,6 +30,7 @@ async fn collab_spawn_end_shows_requested_model_and_effort() {
|
||||
prompt: "Explore the repo".to_string(),
|
||||
model: "gpt-5".to_string(),
|
||||
reasoning_effort: ReasoningEffortConfig::High,
|
||||
agent_statuses: Vec::new(),
|
||||
status: AgentStatus::PendingInit,
|
||||
}),
|
||||
});
|
||||
@@ -409,6 +412,69 @@ async fn live_app_server_collab_spawn_completed_renders_requested_model_and_effo
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn live_app_server_collab_batch_spawn_completed_renders_worker_statuses() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
|
||||
let sender_thread_id =
|
||||
ThreadId::from_string("019cff70-2599-75e2-af72-b90000000002").expect("valid thread id");
|
||||
let worker_thread_id =
|
||||
ThreadId::from_string("019cff70-2599-75e2-af72-b91781b41a8e").expect("valid thread id");
|
||||
|
||||
chat.handle_server_notification(
|
||||
ServerNotification::ItemStarted(ItemStartedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item: AppServerThreadItem::CollabAgentToolCall {
|
||||
id: "spawn-batch-1".to_string(),
|
||||
tool: AppServerCollabAgentTool::SpawnAgentsOnCsv,
|
||||
status: AppServerCollabAgentToolCallStatus::InProgress,
|
||||
sender_thread_id: sender_thread_id.to_string(),
|
||||
receiver_thread_ids: Vec::new(),
|
||||
prompt: Some("Return {path}".to_string()),
|
||||
model: Some("gpt-5.1".to_string()),
|
||||
reasoning_effort: Some(ReasoningEffortConfig::Low),
|
||||
agents_states: HashMap::new(),
|
||||
},
|
||||
}),
|
||||
/*replay_kind*/ None,
|
||||
);
|
||||
|
||||
chat.handle_server_notification(
|
||||
ServerNotification::ItemCompleted(ItemCompletedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item: AppServerThreadItem::CollabAgentToolCall {
|
||||
id: "spawn-batch-1".to_string(),
|
||||
tool: AppServerCollabAgentTool::SpawnAgentsOnCsv,
|
||||
status: AppServerCollabAgentToolCallStatus::Completed,
|
||||
sender_thread_id: sender_thread_id.to_string(),
|
||||
receiver_thread_ids: vec![worker_thread_id.to_string()],
|
||||
prompt: Some("Return {path}".to_string()),
|
||||
model: Some("gpt-5.1".to_string()),
|
||||
reasoning_effort: Some(ReasoningEffortConfig::Low),
|
||||
agents_states: HashMap::from([(
|
||||
worker_thread_id.to_string(),
|
||||
AppServerCollabAgentState {
|
||||
status: AppServerCollabAgentStatus::Completed,
|
||||
message: None,
|
||||
},
|
||||
)]),
|
||||
},
|
||||
}),
|
||||
/*replay_kind*/ None,
|
||||
);
|
||||
|
||||
let combined = drain_insert_history(&mut rx)
|
||||
.into_iter()
|
||||
.map(|lines| lines_to_single_string(&lines))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
assert_chatwidget_snapshot!(
|
||||
"app_server_collab_batch_spawn_completed_renders_worker_statuses",
|
||||
combined
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn live_app_server_failed_turn_does_not_duplicate_error_history() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
|
||||
|
||||
@@ -13,6 +13,7 @@ use codex_protocol::protocol::AgentStatus;
|
||||
use codex_protocol::protocol::CollabAgentInteractionEndEvent;
|
||||
use codex_protocol::protocol::CollabAgentRef;
|
||||
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
|
||||
use codex_protocol::protocol::CollabAgentSpawnTool;
|
||||
use codex_protocol::protocol::CollabAgentStatusEntry;
|
||||
use codex_protocol::protocol::CollabCloseEndEvent;
|
||||
use codex_protocol::protocol::CollabResumeBeginEvent;
|
||||
@@ -177,32 +178,49 @@ pub(crate) fn spawn_end(
|
||||
) -> PlainHistoryCell {
|
||||
let CollabAgentSpawnEndEvent {
|
||||
call_id: _,
|
||||
tool,
|
||||
sender_thread_id: _,
|
||||
new_thread_id,
|
||||
new_agent_nickname,
|
||||
new_agent_role,
|
||||
prompt,
|
||||
agent_statuses,
|
||||
status: _,
|
||||
..
|
||||
} = ev;
|
||||
|
||||
let title = match new_thread_id {
|
||||
Some(thread_id) => title_with_agent(
|
||||
"Spawned",
|
||||
AgentLabel {
|
||||
thread_id: Some(thread_id),
|
||||
nickname: new_agent_nickname.as_deref(),
|
||||
role: new_agent_role.as_deref(),
|
||||
},
|
||||
spawn_request,
|
||||
),
|
||||
None => title_text("Agent spawn failed"),
|
||||
let title = match tool {
|
||||
CollabAgentSpawnTool::SpawnAgent => match new_thread_id {
|
||||
Some(thread_id) => title_with_agent(
|
||||
"Spawned",
|
||||
AgentLabel {
|
||||
thread_id: Some(thread_id),
|
||||
nickname: new_agent_nickname.as_deref(),
|
||||
role: new_agent_role.as_deref(),
|
||||
},
|
||||
spawn_request,
|
||||
),
|
||||
None => title_text("Agent spawn failed"),
|
||||
},
|
||||
CollabAgentSpawnTool::SpawnAgentsOnCsv => {
|
||||
let mut spans =
|
||||
vec![Span::from(format!("Spawned {} CSV workers", agent_statuses.len())).bold()];
|
||||
spans.extend(spawn_request_spans(spawn_request));
|
||||
title_spans_line(spans)
|
||||
}
|
||||
};
|
||||
|
||||
let mut details = Vec::new();
|
||||
if let Some(line) = prompt_line(&prompt) {
|
||||
details.push(line);
|
||||
}
|
||||
if matches!(tool, CollabAgentSpawnTool::SpawnAgentsOnCsv) {
|
||||
let statuses = agent_statuses
|
||||
.iter()
|
||||
.map(|agent_status| (agent_status.thread_id, agent_status.status.clone()))
|
||||
.collect::<HashMap<_, _>>();
|
||||
details.extend(wait_complete_lines(&statuses, &agent_statuses));
|
||||
}
|
||||
collab_event(title, details)
|
||||
}
|
||||
|
||||
@@ -604,6 +622,7 @@ mod tests {
|
||||
let spawn = spawn_end(
|
||||
CollabAgentSpawnEndEvent {
|
||||
call_id: "call-spawn".to_string(),
|
||||
tool: CollabAgentSpawnTool::SpawnAgent,
|
||||
sender_thread_id,
|
||||
new_thread_id: Some(robie_id),
|
||||
new_agent_nickname: Some("Robie".to_string()),
|
||||
@@ -611,6 +630,7 @@ mod tests {
|
||||
prompt: "Compute 11! and reply with just the integer result.".to_string(),
|
||||
model: "gpt-5".to_string(),
|
||||
reasoning_effort: ReasoningEffortConfig::High,
|
||||
agent_statuses: Vec::new(),
|
||||
status: AgentStatus::PendingInit,
|
||||
},
|
||||
Some(&SpawnRequestSummary {
|
||||
@@ -742,6 +762,7 @@ mod tests {
|
||||
let cell = spawn_end(
|
||||
CollabAgentSpawnEndEvent {
|
||||
call_id: "call-spawn".to_string(),
|
||||
tool: CollabAgentSpawnTool::SpawnAgent,
|
||||
sender_thread_id,
|
||||
new_thread_id: Some(robie_id),
|
||||
new_agent_nickname: Some("Robie".to_string()),
|
||||
@@ -749,6 +770,7 @@ mod tests {
|
||||
prompt: String::new(),
|
||||
model: "gpt-5".to_string(),
|
||||
reasoning_effort: ReasoningEffortConfig::High,
|
||||
agent_statuses: Vec::new(),
|
||||
status: AgentStatus::PendingInit,
|
||||
},
|
||||
Some(&SpawnRequestSummary {
|
||||
|
||||
Reference in New Issue
Block a user