Compare commits

...

2 Commits

Author SHA1 Message Date
rupert-openai
3c924b3e3e codex: fix CI failure on PR #17108 2026-04-08 13:54:57 +01:00
kh.ai
478afb40dd codex: add app-server support for spawn_agents_on_csv 2026-04-08 04:38:45 -05:00
40 changed files with 949 additions and 57 deletions

View File

@@ -575,6 +575,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -5756,6 +5756,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -2404,6 +2404,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -52,6 +52,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -52,6 +52,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -188,6 +188,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -253,6 +253,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -191,6 +191,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -191,6 +191,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -191,6 +191,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -253,6 +253,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -191,6 +191,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -253,6 +253,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -191,6 +191,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -191,6 +191,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -188,6 +188,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -188,6 +188,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -188,6 +188,7 @@
"CollabAgentTool": {
"enum": [
"spawnAgent",
"spawnAgentsOnCsv",
"sendInput",
"resumeAgent",
"wait",

View File

@@ -2,4 +2,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type CollabAgentTool = "spawnAgent" | "sendInput" | "resumeAgent" | "wait" | "closeAgent";
export type CollabAgentTool = "spawnAgent" | "spawnAgentsOnCsv" | "sendInput" | "resumeAgent" | "wait" | "closeAgent";

View File

@@ -584,7 +584,7 @@ impl ThreadHistoryBuilder {
) {
let item = ThreadItem::CollabAgentToolCall {
id: payload.call_id.clone(),
tool: CollabAgentTool::SpawnAgent,
tool: collab_spawn_tool(payload.tool.clone()),
status: CollabAgentToolCallStatus::InProgress,
sender_thread_id: payload.sender_thread_id.to_string(),
receiver_thread_ids: Vec::new(),
@@ -600,26 +600,28 @@ impl ThreadHistoryBuilder {
&mut self,
payload: &codex_protocol::protocol::CollabAgentSpawnEndEvent,
) {
let has_receiver = payload.new_thread_id.is_some();
let status = match &payload.status {
AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed,
_ if has_receiver => CollabAgentToolCallStatus::Completed,
_ => CollabAgentToolCallStatus::Failed,
};
let (receiver_thread_ids, agents_states) = match &payload.new_thread_id {
Some(id) => {
let receiver_id = id.to_string();
let received_status = CollabAgentState::from(payload.status.clone());
(
vec![receiver_id.clone()],
[(receiver_id, received_status)].into_iter().collect(),
)
let (receiver_thread_ids, agents_states) = collab_spawn_receivers_and_states(payload);
let has_receiver = !receiver_thread_ids.is_empty();
let status = match payload.tool {
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgent => match &payload.status {
AgentStatus::Errored(_) | AgentStatus::NotFound => {
CollabAgentToolCallStatus::Failed
}
_ if has_receiver => CollabAgentToolCallStatus::Completed,
_ => CollabAgentToolCallStatus::Failed,
},
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgentsOnCsv => {
match &payload.status {
AgentStatus::Errored(_) | AgentStatus::NotFound => {
CollabAgentToolCallStatus::Failed
}
_ => CollabAgentToolCallStatus::Completed,
}
}
None => (Vec::new(), HashMap::new()),
};
self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
id: payload.call_id.clone(),
tool: CollabAgentTool::SpawnAgent,
tool: collab_spawn_tool(payload.tool.clone()),
status,
sender_thread_id: payload.sender_thread_id.to_string(),
receiver_thread_ids,
@@ -1056,6 +1058,50 @@ impl ThreadHistoryBuilder {
}
}
fn collab_spawn_tool(tool: codex_protocol::protocol::CollabAgentSpawnTool) -> CollabAgentTool {
match tool {
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgent => CollabAgentTool::SpawnAgent,
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgentsOnCsv => {
CollabAgentTool::SpawnAgentsOnCsv
}
}
}
fn collab_spawn_receivers_and_states(
payload: &codex_protocol::protocol::CollabAgentSpawnEndEvent,
) -> (Vec<String>, HashMap<String, CollabAgentState>) {
if !payload.agent_statuses.is_empty() {
let receiver_thread_ids = payload
.agent_statuses
.iter()
.map(|status| status.thread_id.to_string())
.collect::<Vec<_>>();
let agents_states = payload
.agent_statuses
.iter()
.map(|status| {
(
status.thread_id.to_string(),
CollabAgentState::from(status.status.clone()),
)
})
.collect();
return (receiver_thread_ids, agents_states);
}
match &payload.new_thread_id {
Some(id) => {
let receiver_id = id.to_string();
let received_status = CollabAgentState::from(payload.status.clone());
(
vec![receiver_id.clone()],
[(receiver_id, received_status)].into_iter().collect(),
)
}
None => (Vec::new(), HashMap::new()),
}
}
const REVIEW_FALLBACK_MESSAGE: &str = "Reviewer failed to output a response.";
fn render_review_output_text(output: &ReviewOutputEvent) -> String {
@@ -2728,6 +2774,7 @@ mod tests {
}),
EventMsg::CollabAgentSpawnEnd(codex_protocol::protocol::CollabAgentSpawnEndEvent {
call_id: "spawn-1".into(),
tool: codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgent,
sender_thread_id,
new_thread_id: Some(spawned_thread_id),
new_agent_nickname: Some("Scout".into()),
@@ -2735,6 +2782,7 @@ mod tests {
prompt: "inspect the repo".into(),
model: "gpt-5.4-mini".into(),
reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Medium,
agent_statuses: Vec::new(),
status: AgentStatus::Running,
}),
];
@@ -2770,6 +2818,70 @@ mod tests {
);
}
#[test]
fn reconstructs_collab_batch_spawn_end_item_with_worker_statuses() {
let sender_thread_id = ThreadId::try_from("00000000-0000-0000-0000-000000000001")
.expect("valid sender thread id");
let worker_thread_id = ThreadId::try_from("00000000-0000-0000-0000-000000000002")
.expect("valid worker thread id");
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
message: "run csv batch".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}),
EventMsg::CollabAgentSpawnEnd(codex_protocol::protocol::CollabAgentSpawnEndEvent {
call_id: "spawn-batch-1".into(),
tool: codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgentsOnCsv,
sender_thread_id,
new_thread_id: None,
new_agent_nickname: None,
new_agent_role: None,
prompt: "Return {path}".into(),
model: "gpt-5.1".into(),
reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Low,
agent_statuses: vec![codex_protocol::protocol::CollabAgentStatusEntry {
thread_id: worker_thread_id,
agent_nickname: None,
agent_role: None,
status: AgentStatus::Completed(None),
}],
status: AgentStatus::Completed(None),
}),
];
let items = events
.into_iter()
.map(RolloutItem::EventMsg)
.collect::<Vec<_>>();
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 1);
assert_eq!(turns[0].items.len(), 2);
assert_eq!(
turns[0].items[1],
ThreadItem::CollabAgentToolCall {
id: "spawn-batch-1".into(),
tool: CollabAgentTool::SpawnAgentsOnCsv,
status: CollabAgentToolCallStatus::Completed,
sender_thread_id: sender_thread_id.to_string(),
receiver_thread_ids: vec![worker_thread_id.to_string()],
prompt: Some("Return {path}".into()),
model: Some("gpt-5.1".into()),
reasoning_effort: Some(codex_protocol::openai_models::ReasoningEffort::Low),
agents_states: [(
worker_thread_id.to_string(),
CollabAgentState {
status: crate::protocol::v2::CollabAgentStatus::Completed,
message: None,
},
)]
.into_iter()
.collect(),
}
);
}
#[test]
fn reconstructs_interrupted_send_input_as_completed_collab_call() {
// `send_input(interrupt=true)` first stops the child's active turn, then redirects it with

View File

@@ -4893,6 +4893,7 @@ v2_enum_from_core! {
#[ts(export_to = "v2/")]
pub enum CollabAgentTool {
SpawnAgent,
SpawnAgentsOnCsv,
SendInput,
ResumeAgent,
Wait,

View File

@@ -948,7 +948,7 @@ Today both notifications carry an empty `items` array even when item events were
- `commandExecution``{id, command, cwd, status, commandActions, aggregatedOutput?, exitCode?, durationMs?}` for sandboxed commands; `status` is `inProgress`, `completed`, `failed`, or `declined`.
- `fileChange``{id, changes, status}` describing proposed edits; `changes` list `{path, kind, diff}` and `status` is `inProgress`, `completed`, `failed`, or `declined`.
- `mcpToolCall``{id, server, tool, status, arguments, result?, error?}` describing MCP calls; `status` is `inProgress`, `completed`, or `failed`.
- `collabToolCall``{id, tool, status, senderThreadId, receiverThreadId?, newThreadId?, prompt?, agentStatus?}` describing collab tool calls (`spawn_agent`, `send_input`, `resume_agent`, `wait`, `close_agent`); `status` is `inProgress`, `completed`, or `failed`.
- `collabToolCall``{id, tool, status, senderThreadId, receiverThreadIds, prompt?, model?, reasoningEffort?, agentsStates}` describing collab tool calls (`spawn_agent`, `spawn_agents_on_csv`, `send_input`, `resume_agent`, `wait`, `close_agent`); `status` is `inProgress`, `completed`, or `failed`.
- `webSearch``{id, query, action?}` for a web search request issued by the agent; `action` mirrors the Responses API web_search action payload (`search`, `open_page`, `find_in_page`) and may be omitted until completion.
- `imageView``{id, path}` emitted when the agent invokes the image viewer tool.
- `enteredReviewMode``{id, review}` sent when the reviewer starts; `review` is a short user-facing label such as `"current changes"` or the requested target description.

View File

@@ -1007,7 +1007,7 @@ pub(crate) async fn apply_bespoke_event_handling(
EventMsg::CollabAgentSpawnBegin(begin_event) => {
let item = ThreadItem::CollabAgentToolCall {
id: begin_event.call_id,
tool: CollabAgentTool::SpawnAgent,
tool: collab_spawn_tool(begin_event.tool),
status: V2CollabToolCallStatus::InProgress,
sender_thread_id: begin_event.sender_thread_id.to_string(),
receiver_thread_ids: Vec::new(),
@@ -1026,27 +1026,33 @@ pub(crate) async fn apply_bespoke_event_handling(
.await;
}
EventMsg::CollabAgentSpawnEnd(end_event) => {
let has_receiver = end_event.new_thread_id.is_some();
let status = match &end_event.status {
codex_protocol::protocol::AgentStatus::Errored(_)
| codex_protocol::protocol::AgentStatus::NotFound => V2CollabToolCallStatus::Failed,
_ if has_receiver => V2CollabToolCallStatus::Completed,
_ => V2CollabToolCallStatus::Failed,
};
let (receiver_thread_ids, agents_states) = match end_event.new_thread_id {
Some(id) => {
let receiver_id = id.to_string();
let received_status = V2CollabAgentStatus::from(end_event.status.clone());
(
vec![receiver_id.clone()],
[(receiver_id, received_status)].into_iter().collect(),
)
let (receiver_thread_ids, agents_states) =
collab_spawn_receivers_and_states(&end_event);
let has_receiver = !receiver_thread_ids.is_empty();
let status = match end_event.tool {
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgent => {
match &end_event.status {
codex_protocol::protocol::AgentStatus::Errored(_)
| codex_protocol::protocol::AgentStatus::NotFound => {
V2CollabToolCallStatus::Failed
}
_ if has_receiver => V2CollabToolCallStatus::Completed,
_ => V2CollabToolCallStatus::Failed,
}
}
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgentsOnCsv => {
match &end_event.status {
codex_protocol::protocol::AgentStatus::Errored(_)
| codex_protocol::protocol::AgentStatus::NotFound => {
V2CollabToolCallStatus::Failed
}
_ => V2CollabToolCallStatus::Completed,
}
}
None => (Vec::new(), HashMap::new()),
};
let item = ThreadItem::CollabAgentToolCall {
id: end_event.call_id,
tool: CollabAgentTool::SpawnAgent,
tool: collab_spawn_tool(end_event.tool),
status,
sender_thread_id: end_event.sender_thread_id.to_string(),
receiver_thread_ids,
@@ -2813,6 +2819,50 @@ fn collab_resume_end_item(end_event: codex_protocol::protocol::CollabResumeEndEv
}
}
fn collab_spawn_tool(tool: codex_protocol::protocol::CollabAgentSpawnTool) -> CollabAgentTool {
match tool {
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgent => CollabAgentTool::SpawnAgent,
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgentsOnCsv => {
CollabAgentTool::SpawnAgentsOnCsv
}
}
}
fn collab_spawn_receivers_and_states(
end_event: &codex_protocol::protocol::CollabAgentSpawnEndEvent,
) -> (Vec<String>, HashMap<String, V2CollabAgentStatus>) {
if !end_event.agent_statuses.is_empty() {
let receiver_thread_ids = end_event
.agent_statuses
.iter()
.map(|agent_status| agent_status.thread_id.to_string())
.collect::<Vec<_>>();
let agents_states = end_event
.agent_statuses
.iter()
.map(|agent_status| {
(
agent_status.thread_id.to_string(),
V2CollabAgentStatus::from(agent_status.status.clone()),
)
})
.collect();
return (receiver_thread_ids, agents_states);
}
match end_event.new_thread_id {
Some(id) => {
let receiver_id = id.to_string();
let received_status = V2CollabAgentStatus::from(end_event.status.clone());
(
vec![receiver_id.clone()],
[(receiver_id, received_status)].into_iter().collect(),
)
}
None => (Vec::new(), HashMap::new()),
}
}
/// similar to handle_mcp_tool_call_begin in exec
async fn construct_mcp_tool_call_notification(
begin_event: McpToolCallBeginEvent,

View File

@@ -920,6 +920,8 @@ mod tests {
use tokio::time::timeout;
use tokio_tungstenite::accept_async;
const TEST_HTTP_REQUEST_ACCEPT_TIMEOUT: Duration = Duration::from_secs(15);
async fn remote_control_state_runtime(codex_home: &TempDir) -> Arc<StateRuntime> {
StateRuntime::init(codex_home.path().to_path_buf(), "test-provider".to_string())
.await
@@ -1395,7 +1397,7 @@ mod tests {
}
async fn accept_http_request(listener: &TcpListener) -> (TcpStream, String) {
let (stream, _) = timeout(Duration::from_secs(5), listener.accept())
let (stream, _) = timeout(TEST_HTTP_REQUEST_ACCEPT_TIMEOUT, listener.accept())
.await
.expect("HTTP request should arrive in time")
.expect("listener accept should succeed");

View File

@@ -57,12 +57,22 @@ use codex_protocol::user_input::MAX_USER_INPUT_TEXT_CHARS;
use core_test_support::responses;
use core_test_support::skip_if_no_network;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use tempfile::TempDir;
use tokio::time::timeout;
use wiremock::Mock;
use wiremock::Respond;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path_regex;
#[cfg(windows)]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
@@ -77,6 +87,135 @@ fn body_contains(req: &wiremock::Request, text: &str) -> bool {
.is_some_and(|body| body.contains(text))
}
struct BatchAgentsResponder {
spawn_args_json: String,
seen_main: AtomicBool,
call_counter: AtomicUsize,
}
impl BatchAgentsResponder {
fn new(spawn_args_json: String) -> Self {
Self {
spawn_args_json,
seen_main: AtomicBool::new(false),
call_counter: AtomicUsize::new(0),
}
}
}
impl Respond for BatchAgentsResponder {
fn respond(&self, request: &wiremock::Request) -> ResponseTemplate {
let body_bytes = decode_body_bytes(request);
let body: Value = serde_json::from_slice(&body_bytes).unwrap_or(Value::Null);
if has_function_call_output(&body) {
return core_test_support::responses::sse_response(core_test_support::responses::sse(
vec![
core_test_support::responses::ev_response_created("resp-tool"),
core_test_support::responses::ev_completed("resp-tool"),
],
));
}
if let Some((job_id, item_id)) = extract_batch_job_and_item(&body) {
let call_id = format!(
"call-worker-{}",
self.call_counter.fetch_add(1, Ordering::SeqCst)
);
let args = json!({
"job_id": job_id,
"item_id": item_id,
"result": { "item_id": item_id },
});
let args_json = serde_json::to_string(&args).unwrap_or_else(|err| {
panic!("worker args serialize: {err}");
});
return core_test_support::responses::sse_response(core_test_support::responses::sse(
vec![
core_test_support::responses::ev_response_created("resp-worker"),
core_test_support::responses::ev_function_call(
&call_id,
"report_agent_job_result",
&args_json,
),
core_test_support::responses::ev_completed("resp-worker"),
],
));
}
if !self.seen_main.swap(true, Ordering::SeqCst) {
return core_test_support::responses::sse_response(core_test_support::responses::sse(
vec![
core_test_support::responses::ev_response_created("resp-main"),
core_test_support::responses::ev_function_call(
"call-spawn",
"spawn_agents_on_csv",
&self.spawn_args_json,
),
core_test_support::responses::ev_completed("resp-main"),
],
));
}
core_test_support::responses::sse_response(core_test_support::responses::sse(vec![
core_test_support::responses::ev_response_created("resp-parent-done"),
core_test_support::responses::ev_assistant_message("msg-parent-done", "parent done"),
core_test_support::responses::ev_completed("resp-parent-done"),
]))
}
}
fn decode_body_bytes(request: &wiremock::Request) -> Vec<u8> {
request.body.clone()
}
fn has_function_call_output(body: &Value) -> bool {
body.get("input")
.and_then(Value::as_array)
.is_some_and(|items| {
items.iter().any(|item| {
item.get("type").and_then(Value::as_str) == Some("function_call_output")
})
})
}
fn extract_batch_job_and_item(body: &Value) -> Option<(String, String)> {
let texts = message_input_texts(body);
let mut combined = texts.join("\n");
if let Some(instructions) = body.get("instructions").and_then(Value::as_str) {
combined.push('\n');
combined.push_str(instructions);
}
if !combined.contains("You are processing one item for a generic agent job.") {
return None;
}
let job_id = extract_prefixed_line(&combined, "Job ID:")?;
let item_id = extract_prefixed_line(&combined, "Item ID:")?;
Some((job_id, item_id))
}
fn extract_prefixed_line(text: &str, prefix: &str) -> Option<String> {
text.lines()
.find_map(|line| line.strip_prefix(prefix).map(str::trim))
.filter(|value| !value.is_empty())
.map(str::to_string)
}
fn message_input_texts(body: &Value) -> Vec<String> {
let Some(items) = body.get("input").and_then(Value::as_array) else {
return Vec::new();
};
items
.iter()
.filter(|item| item.get("type").and_then(Value::as_str) == Some("message"))
.filter_map(|item| item.get("content").and_then(Value::as_array))
.flatten()
.filter(|span| span.get("type").and_then(Value::as_str) == Some("input_text"))
.filter_map(|span| span.get("text").and_then(Value::as_str))
.map(str::to_string)
.collect()
}
#[tokio::test]
async fn turn_start_sends_originator_header() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];
@@ -2041,6 +2180,178 @@ config_file = "./custom-role.toml"
Ok(())
}
#[tokio::test]
async fn turn_start_emits_spawn_agents_on_csv_item_with_model_metadata_v2() -> Result<()> {
skip_if_no_network!(Ok(()));
const PARENT_PROMPT: &str = "run the csv batch";
const INSTRUCTION: &str = "Return {path}";
const REQUESTED_MODEL: &str = "gpt-5.1";
const REQUESTED_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::Low;
const SPAWN_CALL_ID: &str = "call-spawn";
let server = responses::start_mock_server().await;
let codex_home = TempDir::new()?;
let input_path = codex_home.path().join("batch_input.csv");
let output_path = codex_home.path().join("batch_output.csv");
fs::write(&input_path, "path\nfile-1\n")?;
let spawn_args = serde_json::to_string(&json!({
"csv_path": input_path.display().to_string(),
"instruction": INSTRUCTION,
"output_csv_path": output_path.display().to_string(),
"model": REQUESTED_MODEL,
"reasoning_effort": REQUESTED_REASONING_EFFORT,
}))?;
let responder = BatchAgentsResponder::new(spawn_args);
Mock::given(method("POST"))
.and(path_regex(".*/responses$"))
.respond_with(responder)
.mount(&server)
.await;
create_config_toml(
codex_home.path(),
&server.uri(),
"never",
&BTreeMap::from([
(Feature::SpawnCsv, true),
(Feature::Sqlite, true),
(Feature::EnableRequestCompression, false),
]),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5.2-codex".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: PARENT_PROMPT.to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let turn: TurnStartResponse = to_response::<TurnStartResponse>(turn_resp)?;
let spawn_started = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let started_notif = mcp
.read_stream_until_notification_message("item/started")
.await?;
let started: ItemStartedNotification =
serde_json::from_value(started_notif.params.expect("item/started params"))?;
if let ThreadItem::CollabAgentToolCall { id, .. } = &started.item
&& id == SPAWN_CALL_ID
{
return Ok::<ThreadItem, anyhow::Error>(started.item);
}
}
})
.await??;
assert_eq!(
spawn_started,
ThreadItem::CollabAgentToolCall {
id: SPAWN_CALL_ID.to_string(),
tool: CollabAgentTool::SpawnAgentsOnCsv,
status: CollabAgentToolCallStatus::InProgress,
sender_thread_id: thread.id.clone(),
receiver_thread_ids: Vec::new(),
prompt: Some(INSTRUCTION.to_string()),
model: Some(REQUESTED_MODEL.to_string()),
reasoning_effort: Some(REQUESTED_REASONING_EFFORT),
agents_states: HashMap::new(),
}
);
let spawn_completed = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let completed_notif = mcp
.read_stream_until_notification_message("item/completed")
.await?;
let completed: ItemCompletedNotification =
serde_json::from_value(completed_notif.params.expect("item/completed params"))?;
if let ThreadItem::CollabAgentToolCall { id, .. } = &completed.item
&& id == SPAWN_CALL_ID
{
return Ok::<ThreadItem, anyhow::Error>(completed.item);
}
}
})
.await??;
let ThreadItem::CollabAgentToolCall {
id,
tool,
status,
sender_thread_id,
receiver_thread_ids,
prompt,
model,
reasoning_effort,
agents_states,
} = spawn_completed
else {
unreachable!("loop ensures we break on collab agent tool call items");
};
let receiver_thread_id = receiver_thread_ids
.first()
.cloned()
.expect("batch completion should include worker thread id");
assert_eq!(id, SPAWN_CALL_ID);
assert_eq!(tool, CollabAgentTool::SpawnAgentsOnCsv);
assert_eq!(status, CollabAgentToolCallStatus::Completed);
assert_eq!(sender_thread_id, thread.id);
assert_eq!(receiver_thread_ids, vec![receiver_thread_id.clone()]);
assert_eq!(prompt, Some(INSTRUCTION.to_string()));
assert_eq!(model, Some(REQUESTED_MODEL.to_string()));
assert_eq!(reasoning_effort, Some(REQUESTED_REASONING_EFFORT));
let agent_state = agents_states
.get(&receiver_thread_id)
.expect("batch completion should include worker state");
assert_eq!(agent_state.status, CollabAgentStatus::Completed);
assert_eq!(agent_state.message, None);
let turn_completed = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let turn_completed_notif = mcp
.read_stream_until_notification_message("turn/completed")
.await?;
let turn_completed: TurnCompletedNotification = serde_json::from_value(
turn_completed_notif.params.expect("turn/completed params"),
)?;
if turn_completed.thread_id == thread.id && turn_completed.turn.id == turn.turn.id {
return Ok::<TurnCompletedNotification, anyhow::Error>(turn_completed);
}
}
})
.await??;
assert_eq!(turn_completed.thread_id, thread.id);
assert_eq!(turn_completed.turn.id, turn.turn.id);
Ok(())
}
#[tokio::test]
async fn turn_start_file_change_approval_accept_for_session_persists_v2() -> Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -8,13 +8,19 @@ use crate::function_tool::FunctionCallError;
use crate::tools::context::FunctionToolOutput;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolPayload;
use crate::tools::handlers::multi_agents::apply_requested_spawn_agent_model_overrides;
use crate::tools::handlers::multi_agents::build_agent_spawn_config;
use crate::tools::handlers::parse_arguments;
use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
use codex_protocol::ThreadId;
use codex_protocol::error::CodexErr;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::AgentStatus;
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
use codex_protocol::protocol::CollabAgentSpawnTool;
use codex_protocol::protocol::CollabAgentStatusEntry;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::user_input::UserInput;
@@ -49,6 +55,8 @@ struct SpawnAgentsOnCsvArgs {
id_column: Option<String>,
output_csv_path: Option<String>,
output_schema: Option<Value>,
model: Option<String>,
reasoning_effort: Option<ReasoningEffort>,
max_concurrency: Option<usize>,
max_workers: Option<usize>,
max_runtime_seconds: Option<u64>,
@@ -194,6 +202,7 @@ impl ToolHandler for BatchJobHandler {
turn,
tool_name,
payload,
call_id,
..
} = invocation;
@@ -207,7 +216,9 @@ impl ToolHandler for BatchJobHandler {
};
match tool_name.as_str() {
"spawn_agents_on_csv" => spawn_agents_on_csv::handle(session, turn, arguments).await,
"spawn_agents_on_csv" => {
spawn_agents_on_csv::handle(session, turn, call_id, arguments).await
}
"report_agent_job_result" => report_agent_job_result::handle(session, arguments).await,
other => Err(FunctionCallError::RespondToModel(format!(
"unsupported agent job tool {other}"
@@ -227,6 +238,7 @@ mod spawn_agents_on_csv {
pub async fn handle(
session: Arc<Session>,
turn: Arc<TurnContext>,
call_id: String,
arguments: String,
) -> Result<FunctionToolOutput, FunctionCallError> {
let args: SpawnAgentsOnCsvArgs = parse_arguments(arguments.as_str())?;
@@ -235,6 +247,20 @@ mod spawn_agents_on_csv {
"instruction must be non-empty".to_string(),
));
}
session
.send_event(
&turn,
CollabAgentSpawnBeginEvent {
call_id: call_id.clone(),
tool: CollabAgentSpawnTool::SpawnAgentsOnCsv,
sender_thread_id: session.conversation_id,
prompt: args.instruction.clone(),
model: args.model.clone().unwrap_or_default(),
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
}
.into(),
)
.await;
let db = required_state_db(&session)?;
let input_path = turn.resolve_path(Some(args.csv_path));
@@ -339,7 +365,15 @@ mod spawn_agents_on_csv {
})?;
let requested_concurrency = args.max_concurrency.or(args.max_workers);
let options = match build_runner_options(&session, &turn, requested_concurrency).await {
let options = match build_runner_options(
&session,
&turn,
requested_concurrency,
args.model.as_deref(),
args.reasoning_effort,
)
.await
{
Ok(options) => options,
Err(err) => {
let error_message = err.to_string();
@@ -362,12 +396,17 @@ mod spawn_agents_on_csv {
"agent job concurrency: job_id={job_id} requested={requested_concurrency:?} max_threads={max_threads:?} effective={effective_concurrency}"
);
let _ = session.notify_background_event(&turn, message).await;
let effective_batch_model = options.spawn_config.model.clone().unwrap_or_default();
let effective_batch_reasoning_effort = options
.spawn_config
.model_reasoning_effort
.unwrap_or_default();
if let Err(err) = run_agent_job_loop(
session.clone(),
turn.clone(),
db.clone(),
job_id.clone(),
options,
options.clone(),
)
.await
{
@@ -410,6 +449,55 @@ mod spawn_agents_on_csv {
))
})?;
let mut job_error = job.last_error.clone().filter(|err| !err.trim().is_empty());
let agent_statuses = collect_batch_agent_statuses(db.clone(), job_id.as_str())
.await
.map_err(|err| {
FunctionCallError::RespondToModel(format!(
"failed to load batch worker statuses for {job_id}: {err}"
))
})?;
let collab_status = match job.status {
codex_state::AgentJobStatus::Completed => {
if progress.failed_items > 0 {
AgentStatus::Completed(Some(format!(
"agent job completed with {} failed items",
progress.failed_items
)))
} else {
AgentStatus::Completed(None)
}
}
codex_state::AgentJobStatus::Cancelled => {
AgentStatus::Completed(job.last_error.clone())
}
codex_state::AgentJobStatus::Failed => AgentStatus::Errored(
job_error
.clone()
.unwrap_or_else(|| "agent job failed".to_string()),
),
codex_state::AgentJobStatus::Pending | codex_state::AgentJobStatus::Running => {
AgentStatus::Running
}
};
session
.send_event(
&turn,
CollabAgentSpawnEndEvent {
call_id: call_id.clone(),
tool: CollabAgentSpawnTool::SpawnAgentsOnCsv,
sender_thread_id: session.conversation_id,
new_thread_id: None,
new_agent_nickname: None,
new_agent_role: None,
prompt: job.instruction.clone(),
model: effective_batch_model,
reasoning_effort: effective_batch_reasoning_effort,
agent_statuses,
status: collab_status,
}
.into(),
)
.await;
let failed_item_errors = if progress.failed_items > 0 {
let items = db
.list_agent_job_items(
@@ -523,6 +611,8 @@ async fn build_runner_options(
session: &Arc<Session>,
turn: &Arc<TurnContext>,
requested_concurrency: Option<usize>,
requested_model: Option<&str>,
requested_reasoning_effort: Option<ReasoningEffort>,
) -> Result<JobRunnerOptions, FunctionCallError> {
let session_source = turn.session_source.clone();
let child_depth = next_thread_spawn_depth(&session_source);
@@ -535,7 +625,15 @@ async fn build_runner_options(
let max_concurrency =
normalize_concurrency(requested_concurrency, turn.config.agent_max_threads);
let base_instructions = session.get_base_instructions().await;
let spawn_config = build_agent_spawn_config(&base_instructions, turn.as_ref())?;
let mut spawn_config = build_agent_spawn_config(&base_instructions, turn.as_ref())?;
apply_requested_spawn_agent_model_overrides(
session,
turn.as_ref(),
&mut spawn_config,
requested_model,
requested_reasoning_effort,
)
.await?;
Ok(JobRunnerOptions {
max_concurrency,
spawn_config,
@@ -994,6 +1092,41 @@ async fn finalize_finished_item(
Ok(())
}
async fn collect_batch_agent_statuses(
db: Arc<codex_state::StateRuntime>,
job_id: &str,
) -> anyhow::Result<Vec<CollabAgentStatusEntry>> {
let items = db
.list_agent_job_items(job_id, /*status*/ None, /*limit*/ None)
.await?;
let mut agent_statuses = Vec::new();
for item in items {
let Some(assigned_thread_id) = item.assigned_thread_id.as_deref() else {
continue;
};
let Ok(thread_id) = ThreadId::from_string(assigned_thread_id) else {
continue;
};
let status = match item.status {
codex_state::AgentJobItemStatus::Completed => AgentStatus::Completed(None),
codex_state::AgentJobItemStatus::Failed => AgentStatus::Errored(
item.last_error
.clone()
.unwrap_or_else(|| "agent job item failed".to_string()),
),
codex_state::AgentJobItemStatus::Pending => continue,
codex_state::AgentJobItemStatus::Running => AgentStatus::Running,
};
agent_statuses.push(CollabAgentStatusEntry {
thread_id,
agent_nickname: None,
agent_role: None,
status,
});
}
Ok(agent_statuses)
}
fn build_worker_prompt(
job: &codex_state::AgentJob,
item: &codex_state::AgentJobItem,

View File

@@ -25,6 +25,7 @@ use codex_protocol::protocol::CollabAgentInteractionEndEvent;
use codex_protocol::protocol::CollabAgentRef;
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
use codex_protocol::protocol::CollabAgentSpawnTool;
use codex_protocol::protocol::CollabCloseBeginEvent;
use codex_protocol::protocol::CollabCloseEndEvent;
use codex_protocol::protocol::CollabResumeBeginEvent;

View File

@@ -51,6 +51,7 @@ impl ToolHandler for Handler {
&turn,
CollabAgentSpawnBeginEvent {
call_id: call_id.clone(),
tool: CollabAgentSpawnTool::SpawnAgent,
sender_thread_id: session.conversation_id,
prompt: prompt.clone(),
model: args.model.clone().unwrap_or_default(),
@@ -141,6 +142,7 @@ impl ToolHandler for Handler {
&turn,
CollabAgentSpawnEndEvent {
call_id,
tool: CollabAgentSpawnTool::SpawnAgent,
sender_thread_id: session.conversation_id,
new_thread_id,
new_agent_nickname,
@@ -148,6 +150,7 @@ impl ToolHandler for Handler {
prompt,
model: effective_model,
reasoning_effort: effective_reasoning_effort,
agent_statuses: Vec::new(),
status,
}
.into(),

View File

@@ -18,6 +18,7 @@ use codex_protocol::protocol::CollabAgentInteractionBeginEvent;
use codex_protocol::protocol::CollabAgentInteractionEndEvent;
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
use codex_protocol::protocol::CollabAgentSpawnTool;
use codex_protocol::protocol::CollabCloseBeginEvent;
use codex_protocol::protocol::CollabCloseEndEvent;
use codex_protocol::protocol::CollabWaitingBeginEvent;

View File

@@ -61,6 +61,7 @@ impl ToolHandler for Handler {
&turn,
CollabAgentSpawnBeginEvent {
call_id: call_id.clone(),
tool: CollabAgentSpawnTool::SpawnAgent,
sender_thread_id: session.conversation_id,
prompt: prompt.clone(),
model: args.model.clone().unwrap_or_default(),
@@ -182,6 +183,7 @@ impl ToolHandler for Handler {
&turn,
CollabAgentSpawnEndEvent {
call_id,
tool: CollabAgentSpawnTool::SpawnAgent,
sender_thread_id: session.conversation_id,
new_thread_id,
new_agent_nickname,
@@ -189,6 +191,7 @@ impl ToolHandler for Handler {
prompt,
model: effective_model,
reasoning_effort: effective_reasoning_effort,
agent_statuses: Vec::new(),
status,
}
.into(),

View File

@@ -242,6 +242,7 @@ impl EventProcessorWithJsonOutput {
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
tool: match tool {
CollabAgentTool::SpawnAgent => CollabTool::SpawnAgent,
CollabAgentTool::SpawnAgentsOnCsv => CollabTool::SpawnAgentsOnCsv,
CollabAgentTool::SendInput => CollabTool::SendInput,
CollabAgentTool::ResumeAgent => CollabTool::Wait,
CollabAgentTool::Wait => CollabTool::Wait,

View File

@@ -217,6 +217,7 @@ pub enum CollabToolCallStatus {
#[serde(rename_all = "snake_case")]
pub enum CollabTool {
SpawnAgent,
SpawnAgentsOnCsv,
SendInput,
Wait,
CloseAgent,

View File

@@ -760,6 +760,96 @@ fn collab_spawn_begin_and_end_emit_item_events() {
);
}
#[test]
fn collab_batch_spawn_begin_and_end_emit_item_events() {
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
let started =
processor.collect_thread_events(ServerNotification::ItemStarted(ItemStartedNotification {
item: ThreadItem::CollabAgentToolCall {
id: "collab-batch-1".to_string(),
tool: CollabAgentTool::SpawnAgentsOnCsv,
status: ApiCollabAgentToolCallStatus::InProgress,
sender_thread_id: "thread-parent".to_string(),
receiver_thread_ids: Vec::new(),
prompt: Some("Return {path}".to_string()),
model: Some("gpt-5.1".to_string()),
reasoning_effort: Some(codex_protocol::openai_models::ReasoningEffort::Low),
agents_states: std::collections::HashMap::new(),
},
thread_id: "thread-parent".to_string(),
turn_id: "turn-1".to_string(),
}));
let completed = processor.collect_thread_events(ServerNotification::ItemCompleted(
ItemCompletedNotification {
item: ThreadItem::CollabAgentToolCall {
id: "collab-batch-1".to_string(),
tool: CollabAgentTool::SpawnAgentsOnCsv,
status: ApiCollabAgentToolCallStatus::Completed,
sender_thread_id: "thread-parent".to_string(),
receiver_thread_ids: vec!["thread-child".to_string()],
prompt: Some("Return {path}".to_string()),
model: Some("gpt-5.1".to_string()),
reasoning_effort: Some(codex_protocol::openai_models::ReasoningEffort::Low),
agents_states: std::collections::HashMap::from([(
"thread-child".to_string(),
ApiCollabAgentState {
status: ApiCollabAgentStatus::Completed,
message: None,
},
)]),
},
thread_id: "thread-parent".to_string(),
turn_id: "turn-1".to_string(),
},
));
assert_eq!(
started,
CollectedThreadEvents {
events: vec![ThreadEvent::ItemStarted(ItemStartedEvent {
item: ExecThreadItem {
id: "item_0".to_string(),
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
tool: CollabTool::SpawnAgentsOnCsv,
sender_thread_id: "thread-parent".to_string(),
receiver_thread_ids: Vec::new(),
prompt: Some("Return {path}".to_string()),
agents_states: std::collections::HashMap::new(),
status: CollabToolCallStatus::InProgress,
},),
},
})],
status: CodexStatus::Running,
}
);
assert_eq!(
completed,
CollectedThreadEvents {
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
item: ExecThreadItem {
id: "item_0".to_string(),
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
tool: CollabTool::SpawnAgentsOnCsv,
sender_thread_id: "thread-parent".to_string(),
receiver_thread_ids: vec!["thread-child".to_string()],
prompt: Some("Return {path}".to_string()),
agents_states: std::collections::HashMap::from([(
"thread-child".to_string(),
CollabAgentState {
status: CollabAgentStatus::Completed,
message: None,
},
)]),
status: CollabToolCallStatus::Completed,
},),
},
})],
status: CodexStatus::Running,
}
);
}
#[test]
fn file_change_completion_maps_change_kinds() {
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);

View File

@@ -3409,10 +3409,22 @@ pub enum TurnAbortReason {
ReviewEnded,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub enum CollabAgentSpawnTool {
#[default]
SpawnAgent,
SpawnAgentsOnCsv,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)]
pub struct CollabAgentSpawnBeginEvent {
/// Identifier for the collab tool call.
pub call_id: String,
/// Tool that initiated the spawn flow.
#[serde(default)]
pub tool: CollabAgentSpawnTool,
/// Thread ID of the sender.
pub sender_thread_id: ThreadId,
/// Initial prompt sent to the agent. Can be empty to prevent CoT leaking at the
@@ -3452,6 +3464,9 @@ pub struct CollabAgentStatusEntry {
pub struct CollabAgentSpawnEndEvent {
/// Identifier for the collab tool call.
pub call_id: String,
/// Tool that initiated the spawn flow.
#[serde(default)]
pub tool: CollabAgentSpawnTool,
/// Thread ID of the sender.
pub sender_thread_id: ThreadId,
/// Thread ID of the newly spawned agent, if it was created.
@@ -3469,6 +3484,9 @@ pub struct CollabAgentSpawnEndEvent {
pub model: String,
/// Effective reasoning effort used by the spawned agent after inheritance and role overrides.
pub reasoning_effort: ReasoningEffortConfig,
/// Final per-agent statuses for multi-agent batch spawns.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub agent_statuses: Vec<CollabAgentStatusEntry>,
/// Last known status of the new agent reported to the sender agent.
pub status: AgentStatus,
}

View File

@@ -26,6 +26,20 @@ pub fn create_spawn_agents_on_csv_tool() -> ToolSpec {
"output_csv_path".to_string(),
JsonSchema::string(Some("Optional output CSV path for exported results.".to_string())),
),
(
"model".to_string(),
JsonSchema::string(Some(
"Optional model override for each worker sub-agent. Replaces the inherited model."
.to_string(),
)),
),
(
"reasoning_effort".to_string(),
JsonSchema::string(Some(
"Optional reasoning effort override for each worker sub-agent. Replaces the inherited reasoning effort."
.to_string(),
)),
),
(
"max_concurrency".to_string(),
JsonSchema::number(Some(

View File

@@ -39,6 +39,20 @@ fn spawn_agents_on_csv_tool_requires_csv_and_instruction() {
"Optional output CSV path for exported results.".to_string(),
)),
),
(
"model".to_string(),
JsonSchema::string(Some(
"Optional model override for each worker sub-agent. Replaces the inherited model."
.to_string(),
)),
),
(
"reasoning_effort".to_string(),
JsonSchema::string(Some(
"Optional reasoning effort override for each worker sub-agent. Replaces the inherited reasoning effort."
.to_string(),
)),
),
(
"max_concurrency".to_string(),
JsonSchema::number(Some(

View File

@@ -3724,8 +3724,8 @@ impl ChatWidget {
let first_receiver_metadata =
first_receiver.map(|thread_id| self.collab_agent_metadata(thread_id));
match tool {
CollabAgentTool::SpawnAgent => {
match tool.clone() {
CollabAgentTool::SpawnAgent | CollabAgentTool::SpawnAgentsOnCsv => {
if let (Some(model), Some(reasoning_effort)) = (model.clone(), reasoning_effort) {
self.pending_collab_spawn_requests.insert(
id.clone(),
@@ -3748,9 +3748,23 @@ impl ChatWidget {
}
})
});
let (agent_statuses, statuses) = app_server_collab_agent_statuses_to_core(
&receiver_thread_ids,
&agents_states,
&self.collab_agent_metadata,
);
self.on_collab_event(multi_agents::spawn_end(
codex_protocol::protocol::CollabAgentSpawnEndEvent {
call_id: id,
tool: match tool {
CollabAgentTool::SpawnAgent => {
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgent
}
CollabAgentTool::SpawnAgentsOnCsv => {
codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgentsOnCsv
}
_ => unreachable!("non-spawn collab tool"),
},
sender_thread_id,
new_thread_id: first_receiver,
new_agent_nickname: first_receiver_metadata
@@ -3762,10 +3776,20 @@ impl ChatWidget {
prompt: prompt.unwrap_or_default(),
model: String::new(),
reasoning_effort: ReasoningEffortConfig::Medium,
agent_statuses,
status: first_receiver
.as_ref()
.and_then(|thread_id| agents_states.get(&thread_id.to_string()))
.map(app_server_collab_state_to_core)
.and_then(|thread_id| statuses.get(thread_id))
.cloned()
.or_else(|| match status {
CollabAgentToolCallStatus::Completed => {
Some(AgentStatus::Completed(None))
}
CollabAgentToolCallStatus::Failed => {
Some(AgentStatus::Errored("Agent spawn failed".into()))
}
CollabAgentToolCallStatus::InProgress => None,
})
.unwrap_or_else(|| {
AgentStatus::Errored("Agent spawn failed".into())
}),

View File

@@ -0,0 +1,7 @@
---
source: tui/src/chatwidget/tests/app_server.rs
expression: combined
---
• Spawned 1 CSV workers (gpt-5.1 low)
└ Return {path}
019cff70-2599-75e2-af72-b91781b41a8e: Completed

View File

@@ -11,6 +11,7 @@ async fn collab_spawn_end_shows_requested_model_and_effort() {
id: "spawn-begin".into(),
msg: EventMsg::CollabAgentSpawnBegin(CollabAgentSpawnBeginEvent {
call_id: "call-spawn".to_string(),
tool: codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgent,
sender_thread_id,
prompt: "Explore the repo".to_string(),
model: "gpt-5".to_string(),
@@ -21,6 +22,7 @@ async fn collab_spawn_end_shows_requested_model_and_effort() {
id: "spawn-end".into(),
msg: EventMsg::CollabAgentSpawnEnd(CollabAgentSpawnEndEvent {
call_id: "call-spawn".to_string(),
tool: codex_protocol::protocol::CollabAgentSpawnTool::SpawnAgent,
sender_thread_id,
new_thread_id: Some(spawned_thread_id),
new_agent_nickname: Some("Robie".to_string()),
@@ -28,6 +30,7 @@ async fn collab_spawn_end_shows_requested_model_and_effort() {
prompt: "Explore the repo".to_string(),
model: "gpt-5".to_string(),
reasoning_effort: ReasoningEffortConfig::High,
agent_statuses: Vec::new(),
status: AgentStatus::PendingInit,
}),
});
@@ -409,6 +412,69 @@ async fn live_app_server_collab_spawn_completed_renders_requested_model_and_effo
);
}
#[tokio::test]
async fn live_app_server_collab_batch_spawn_completed_renders_worker_statuses() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
let sender_thread_id =
ThreadId::from_string("019cff70-2599-75e2-af72-b90000000002").expect("valid thread id");
let worker_thread_id =
ThreadId::from_string("019cff70-2599-75e2-af72-b91781b41a8e").expect("valid thread id");
chat.handle_server_notification(
ServerNotification::ItemStarted(ItemStartedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item: AppServerThreadItem::CollabAgentToolCall {
id: "spawn-batch-1".to_string(),
tool: AppServerCollabAgentTool::SpawnAgentsOnCsv,
status: AppServerCollabAgentToolCallStatus::InProgress,
sender_thread_id: sender_thread_id.to_string(),
receiver_thread_ids: Vec::new(),
prompt: Some("Return {path}".to_string()),
model: Some("gpt-5.1".to_string()),
reasoning_effort: Some(ReasoningEffortConfig::Low),
agents_states: HashMap::new(),
},
}),
/*replay_kind*/ None,
);
chat.handle_server_notification(
ServerNotification::ItemCompleted(ItemCompletedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item: AppServerThreadItem::CollabAgentToolCall {
id: "spawn-batch-1".to_string(),
tool: AppServerCollabAgentTool::SpawnAgentsOnCsv,
status: AppServerCollabAgentToolCallStatus::Completed,
sender_thread_id: sender_thread_id.to_string(),
receiver_thread_ids: vec![worker_thread_id.to_string()],
prompt: Some("Return {path}".to_string()),
model: Some("gpt-5.1".to_string()),
reasoning_effort: Some(ReasoningEffortConfig::Low),
agents_states: HashMap::from([(
worker_thread_id.to_string(),
AppServerCollabAgentState {
status: AppServerCollabAgentStatus::Completed,
message: None,
},
)]),
},
}),
/*replay_kind*/ None,
);
let combined = drain_insert_history(&mut rx)
.into_iter()
.map(|lines| lines_to_single_string(&lines))
.collect::<Vec<_>>()
.join("\n");
assert_chatwidget_snapshot!(
"app_server_collab_batch_spawn_completed_renders_worker_statuses",
combined
);
}
#[tokio::test]
async fn live_app_server_failed_turn_does_not_duplicate_error_history() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;

View File

@@ -13,6 +13,7 @@ use codex_protocol::protocol::AgentStatus;
use codex_protocol::protocol::CollabAgentInteractionEndEvent;
use codex_protocol::protocol::CollabAgentRef;
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
use codex_protocol::protocol::CollabAgentSpawnTool;
use codex_protocol::protocol::CollabAgentStatusEntry;
use codex_protocol::protocol::CollabCloseEndEvent;
use codex_protocol::protocol::CollabResumeBeginEvent;
@@ -177,32 +178,49 @@ pub(crate) fn spawn_end(
) -> PlainHistoryCell {
let CollabAgentSpawnEndEvent {
call_id: _,
tool,
sender_thread_id: _,
new_thread_id,
new_agent_nickname,
new_agent_role,
prompt,
agent_statuses,
status: _,
..
} = ev;
let title = match new_thread_id {
Some(thread_id) => title_with_agent(
"Spawned",
AgentLabel {
thread_id: Some(thread_id),
nickname: new_agent_nickname.as_deref(),
role: new_agent_role.as_deref(),
},
spawn_request,
),
None => title_text("Agent spawn failed"),
let title = match tool {
CollabAgentSpawnTool::SpawnAgent => match new_thread_id {
Some(thread_id) => title_with_agent(
"Spawned",
AgentLabel {
thread_id: Some(thread_id),
nickname: new_agent_nickname.as_deref(),
role: new_agent_role.as_deref(),
},
spawn_request,
),
None => title_text("Agent spawn failed"),
},
CollabAgentSpawnTool::SpawnAgentsOnCsv => {
let mut spans =
vec![Span::from(format!("Spawned {} CSV workers", agent_statuses.len())).bold()];
spans.extend(spawn_request_spans(spawn_request));
title_spans_line(spans)
}
};
let mut details = Vec::new();
if let Some(line) = prompt_line(&prompt) {
details.push(line);
}
if matches!(tool, CollabAgentSpawnTool::SpawnAgentsOnCsv) {
let statuses = agent_statuses
.iter()
.map(|agent_status| (agent_status.thread_id, agent_status.status.clone()))
.collect::<HashMap<_, _>>();
details.extend(wait_complete_lines(&statuses, &agent_statuses));
}
collab_event(title, details)
}
@@ -604,6 +622,7 @@ mod tests {
let spawn = spawn_end(
CollabAgentSpawnEndEvent {
call_id: "call-spawn".to_string(),
tool: CollabAgentSpawnTool::SpawnAgent,
sender_thread_id,
new_thread_id: Some(robie_id),
new_agent_nickname: Some("Robie".to_string()),
@@ -611,6 +630,7 @@ mod tests {
prompt: "Compute 11! and reply with just the integer result.".to_string(),
model: "gpt-5".to_string(),
reasoning_effort: ReasoningEffortConfig::High,
agent_statuses: Vec::new(),
status: AgentStatus::PendingInit,
},
Some(&SpawnRequestSummary {
@@ -742,6 +762,7 @@ mod tests {
let cell = spawn_end(
CollabAgentSpawnEndEvent {
call_id: "call-spawn".to_string(),
tool: CollabAgentSpawnTool::SpawnAgent,
sender_thread_id,
new_thread_id: Some(robie_id),
new_agent_nickname: Some("Robie".to_string()),
@@ -749,6 +770,7 @@ mod tests {
prompt: String::new(),
model: "gpt-5".to_string(),
reasoning_effort: ReasoningEffortConfig::High,
agent_statuses: Vec::new(),
status: AgentStatus::PendingInit,
},
Some(&SpawnRequestSummary {