chore: split sub-agent v2 implementation (#15540)

Just to make things cleaner
This commit is contained in:
jif-oai
2026-03-23 19:41:53 +00:00
committed by GitHub
parent b5d0a5518d
commit 450dc289c3
12 changed files with 1058 additions and 431 deletions

View File

@@ -8,6 +8,8 @@ mod list_dir;
mod mcp;
mod mcp_resource;
pub(crate) mod multi_agents;
pub(crate) mod multi_agents_common;
pub(crate) mod multi_agents_v2;
mod plan;
mod read_file;
mod request_permissions;

View File

@@ -11,45 +11,33 @@ use crate::agent::agent_resolver::resolve_agent_targets;
use crate::agent::exceeds_thread_spawn_depth_limit;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::config::Config;
use crate::error::CodexErr;
use crate::function_tool::FunctionCallError;
use crate::models_manager::manager::RefreshStrategy;
use crate::tools::context::FunctionToolOutput;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolOutput;
use crate::tools::context::ToolPayload;
pub(crate) use crate::tools::handlers::multi_agents_common::*;
use crate::tools::handlers::parse_arguments;
use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
use async_trait::async_trait;
use codex_features::Feature;
use codex_protocol::AgentPath;
use codex_protocol::ThreadId;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::openai_models::ReasoningEffortPreset;
use codex_protocol::protocol::CollabAgentInteractionBeginEvent;
use codex_protocol::protocol::CollabAgentInteractionEndEvent;
use codex_protocol::protocol::CollabAgentRef;
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
use codex_protocol::protocol::CollabAgentStatusEntry;
use codex_protocol::protocol::CollabCloseBeginEvent;
use codex_protocol::protocol::CollabCloseEndEvent;
use codex_protocol::protocol::CollabResumeBeginEvent;
use codex_protocol::protocol::CollabResumeEndEvent;
use codex_protocol::protocol::CollabWaitingBeginEvent;
use codex_protocol::protocol::CollabWaitingEndEvent;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::user_input::UserInput;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::sync::Arc;
pub(crate) use close_agent::Handler as CloseAgentHandler;
pub(crate) use resume_agent::Handler as ResumeAgentHandler;
@@ -57,368 +45,12 @@ pub(crate) use send_input::Handler as SendInputHandler;
pub(crate) use spawn::Handler as SpawnAgentHandler;
pub(crate) use wait::Handler as WaitAgentHandler;
/// Minimum wait timeout to prevent tight polling loops from burning CPU.
pub(crate) const MIN_WAIT_TIMEOUT_MS: i64 = 10_000;
pub(crate) const DEFAULT_WAIT_TIMEOUT_MS: i64 = 30_000;
pub(crate) const MAX_WAIT_TIMEOUT_MS: i64 = 3600 * 1000;
fn function_arguments(payload: ToolPayload) -> Result<String, FunctionCallError> {
match payload {
ToolPayload::Function { arguments } => Ok(arguments),
_ => Err(FunctionCallError::RespondToModel(
"collab handler received unsupported payload".to_string(),
)),
}
}
fn tool_output_json_text<T>(value: &T, tool_name: &str) -> String
where
T: Serialize,
{
serde_json::to_string(value).unwrap_or_else(|err| {
JsonValue::String(format!("failed to serialize {tool_name} result: {err}")).to_string()
})
}
fn tool_output_response_item<T>(
call_id: &str,
payload: &ToolPayload,
value: &T,
success: Option<bool>,
tool_name: &str,
) -> ResponseInputItem
where
T: Serialize,
{
FunctionToolOutput::from_text(tool_output_json_text(value, tool_name), success)
.to_response_item(call_id, payload)
}
fn tool_output_code_mode_result<T>(value: &T, tool_name: &str) -> JsonValue
where
T: Serialize,
{
serde_json::to_value(value).unwrap_or_else(|err| {
JsonValue::String(format!("failed to serialize {tool_name} result: {err}"))
})
}
pub mod close_agent;
mod resume_agent;
mod send_input;
mod spawn;
pub(crate) mod wait;
fn build_wait_agent_statuses(
statuses: &HashMap<ThreadId, AgentStatus>,
receiver_agents: &[CollabAgentRef],
) -> Vec<CollabAgentStatusEntry> {
if statuses.is_empty() {
return Vec::new();
}
let mut entries = Vec::with_capacity(statuses.len());
let mut seen = HashMap::with_capacity(receiver_agents.len());
for receiver_agent in receiver_agents {
seen.insert(receiver_agent.thread_id, ());
if let Some(status) = statuses.get(&receiver_agent.thread_id) {
entries.push(CollabAgentStatusEntry {
thread_id: receiver_agent.thread_id,
agent_nickname: receiver_agent.agent_nickname.clone(),
agent_role: receiver_agent.agent_role.clone(),
status: status.clone(),
});
}
}
let mut extras = statuses
.iter()
.filter(|(thread_id, _)| !seen.contains_key(thread_id))
.map(|(thread_id, status)| CollabAgentStatusEntry {
thread_id: *thread_id,
agent_nickname: None,
agent_role: None,
status: status.clone(),
})
.collect::<Vec<_>>();
extras.sort_by(|left, right| left.thread_id.to_string().cmp(&right.thread_id.to_string()));
entries.extend(extras);
entries
}
fn collab_spawn_error(err: CodexErr) -> FunctionCallError {
match err {
CodexErr::UnsupportedOperation(message) if message == "thread manager dropped" => {
FunctionCallError::RespondToModel("collab manager unavailable".to_string())
}
CodexErr::UnsupportedOperation(message) => FunctionCallError::RespondToModel(message),
err => FunctionCallError::RespondToModel(format!("collab spawn failed: {err}")),
}
}
fn collab_agent_error(agent_id: ThreadId, err: CodexErr) -> FunctionCallError {
match err {
CodexErr::ThreadNotFound(id) => {
FunctionCallError::RespondToModel(format!("agent with id {id} not found"))
}
CodexErr::InternalAgentDied => {
FunctionCallError::RespondToModel(format!("agent with id {agent_id} is closed"))
}
CodexErr::UnsupportedOperation(_) => {
FunctionCallError::RespondToModel("collab manager unavailable".to_string())
}
err => FunctionCallError::RespondToModel(format!("collab tool failed: {err}")),
}
}
fn thread_spawn_source(
parent_thread_id: ThreadId,
parent_session_source: &SessionSource,
depth: i32,
agent_role: Option<&str>,
task_name: Option<String>,
) -> Result<SessionSource, FunctionCallError> {
let agent_path = task_name
.as_deref()
.map(|task_name| {
parent_session_source
.get_agent_path()
.unwrap_or_else(AgentPath::root)
.join(task_name)
.map_err(FunctionCallError::RespondToModel)
})
.transpose()?;
Ok(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth,
agent_path,
agent_nickname: None,
agent_role: agent_role.map(str::to_string),
}))
}
fn parse_collab_input(
message: Option<String>,
items: Option<Vec<UserInput>>,
) -> Result<Vec<UserInput>, FunctionCallError> {
match (message, items) {
(Some(_), Some(_)) => Err(FunctionCallError::RespondToModel(
"Provide either message or items, but not both".to_string(),
)),
(None, None) => Err(FunctionCallError::RespondToModel(
"Provide one of: message or items".to_string(),
)),
(Some(message), None) => {
if message.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"Empty message can't be sent to an agent".to_string(),
));
}
Ok(vec![UserInput::Text {
text: message,
text_elements: Vec::new(),
}])
}
(None, Some(items)) => {
if items.is_empty() {
return Err(FunctionCallError::RespondToModel(
"Items can't be empty".to_string(),
));
}
Ok(items)
}
}
}
fn input_preview(items: &[UserInput]) -> String {
let parts: Vec<String> = items
.iter()
.map(|item| match item {
UserInput::Text { text, .. } => text.clone(),
UserInput::Image { .. } => "[image]".to_string(),
UserInput::LocalImage { path } => format!("[local_image:{}]", path.display()),
UserInput::Skill { name, path } => {
format!("[skill:${name}]({})", path.display())
}
UserInput::Mention { name, path } => format!("[mention:${name}]({path})"),
_ => "[input]".to_string(),
})
.collect();
parts.join("\n")
}
/// Builds the base config snapshot for a newly spawned sub-agent.
///
/// The returned config starts from the parent's effective config and then refreshes the
/// runtime-owned fields carried on `turn`, including model selection, reasoning settings,
/// approval policy, sandbox, and cwd. Role-specific overrides are layered after this step;
/// skipping this helper and cloning stale config state directly can send the child agent out with
/// the wrong provider or runtime policy.
pub(crate) fn build_agent_spawn_config(
base_instructions: &BaseInstructions,
turn: &TurnContext,
) -> Result<Config, FunctionCallError> {
let mut config = build_agent_shared_config(turn)?;
config.base_instructions = Some(base_instructions.text.clone());
Ok(config)
}
fn build_agent_resume_config(
turn: &TurnContext,
child_depth: i32,
) -> Result<Config, FunctionCallError> {
let mut config = build_agent_shared_config(turn)?;
apply_spawn_agent_overrides(&mut config, child_depth);
// For resume, keep base instructions sourced from rollout/session metadata.
config.base_instructions = None;
Ok(config)
}
fn build_agent_shared_config(turn: &TurnContext) -> Result<Config, FunctionCallError> {
let base_config = turn.config.clone();
let mut config = (*base_config).clone();
config.model = Some(turn.model_info.slug.clone());
config.model_provider = turn.provider.clone();
config.model_reasoning_effort = turn.reasoning_effort;
config.model_reasoning_summary = Some(turn.reasoning_summary);
config.developer_instructions = turn.developer_instructions.clone();
config.compact_prompt = turn.compact_prompt.clone();
apply_spawn_agent_runtime_overrides(&mut config, turn)?;
Ok(config)
}
/// Copies runtime-only turn state onto a child config before it is handed to `AgentControl`.
///
/// These values are chosen by the live turn rather than persisted config, so leaving them stale
/// can make a child agent disagree with its parent about approval policy, cwd, or sandboxing.
fn apply_spawn_agent_runtime_overrides(
config: &mut Config,
turn: &TurnContext,
) -> Result<(), FunctionCallError> {
config
.permissions
.approval_policy
.set(turn.approval_policy.value())
.map_err(|err| {
FunctionCallError::RespondToModel(format!("approval_policy is invalid: {err}"))
})?;
config.permissions.shell_environment_policy = turn.shell_environment_policy.clone();
config.codex_linux_sandbox_exe = turn.codex_linux_sandbox_exe.clone();
config.cwd = turn.cwd.clone();
config
.permissions
.sandbox_policy
.set(turn.sandbox_policy.get().clone())
.map_err(|err| {
FunctionCallError::RespondToModel(format!("sandbox_policy is invalid: {err}"))
})?;
config.permissions.file_system_sandbox_policy = turn.file_system_sandbox_policy.clone();
config.permissions.network_sandbox_policy = turn.network_sandbox_policy;
Ok(())
}
fn apply_spawn_agent_overrides(config: &mut Config, child_depth: i32) {
if child_depth >= config.agent_max_depth {
let _ = config.features.disable(Feature::SpawnCsv);
let _ = config.features.disable(Feature::Collab);
}
}
async fn apply_requested_spawn_agent_model_overrides(
session: &Session,
turn: &TurnContext,
config: &mut Config,
requested_model: Option<&str>,
requested_reasoning_effort: Option<ReasoningEffort>,
) -> Result<(), FunctionCallError> {
if requested_model.is_none() && requested_reasoning_effort.is_none() {
return Ok(());
}
if let Some(requested_model) = requested_model {
let available_models = session
.services
.models_manager
.list_models(RefreshStrategy::Offline)
.await;
let selected_model_name = find_spawn_agent_model_name(&available_models, requested_model)?;
let selected_model_info = session
.services
.models_manager
.get_model_info(&selected_model_name, config)
.await;
config.model = Some(selected_model_name.clone());
if let Some(reasoning_effort) = requested_reasoning_effort {
validate_spawn_agent_reasoning_effort(
&selected_model_name,
&selected_model_info.supported_reasoning_levels,
reasoning_effort,
)?;
config.model_reasoning_effort = Some(reasoning_effort);
} else {
config.model_reasoning_effort = selected_model_info.default_reasoning_level;
}
return Ok(());
}
if let Some(reasoning_effort) = requested_reasoning_effort {
validate_spawn_agent_reasoning_effort(
&turn.model_info.slug,
&turn.model_info.supported_reasoning_levels,
reasoning_effort,
)?;
config.model_reasoning_effort = Some(reasoning_effort);
}
Ok(())
}
fn find_spawn_agent_model_name(
available_models: &[codex_protocol::openai_models::ModelPreset],
requested_model: &str,
) -> Result<String, FunctionCallError> {
available_models
.iter()
.find(|model| model.model == requested_model)
.map(|model| model.model.clone())
.ok_or_else(|| {
let available = available_models
.iter()
.map(|model| model.model.as_str())
.collect::<Vec<_>>()
.join(", ");
FunctionCallError::RespondToModel(format!(
"Unknown model `{requested_model}` for spawn_agent. Available models: {available}"
))
})
}
fn validate_spawn_agent_reasoning_effort(
model: &str,
supported_reasoning_levels: &[ReasoningEffortPreset],
requested_reasoning_effort: ReasoningEffort,
) -> Result<(), FunctionCallError> {
if supported_reasoning_levels
.iter()
.any(|preset| preset.effort == requested_reasoning_effort)
{
return Ok(());
}
let supported = supported_reasoning_levels
.iter()
.map(|preset| preset.effort.to_string())
.collect::<Vec<_>>()
.join(", ");
Err(FunctionCallError::RespondToModel(format!(
"Reasoning effort `{requested_reasoning_effort}` is not supported for model `{model}`. Supported reasoning efforts: {supported}"
)))
}
#[cfg(test)]
#[path = "multi_agents_tests.rs"]
mod tests;

View File

@@ -1,5 +1,6 @@
use super::*;
use crate::agent::next_thread_spawn_depth;
use std::sync::Arc;
pub(crate) struct Handler;

View File

@@ -1,15 +1,7 @@
use super::*;
use crate::agent::inter_agent_instruction::InterAgentDelivery;
use crate::agent::inter_agent_instruction::InterAgentInstruction;
pub(crate) struct Handler;
fn can_use_v2_inter_agent_instruction(items: &[UserInput]) -> bool {
items
.iter()
.all(|item| matches!(item, UserInput::Text { .. }))
}
#[async_trait]
impl ToolHandler for Handler {
type Output = SendInputResult;
@@ -61,39 +53,10 @@ impl ToolHandler for Handler {
)
.await;
let agent_control = session.services.agent_control.clone();
let result = if turn.config.features.enabled(Feature::MultiAgentV2)
&& can_use_v2_inter_agent_instruction(&input_items)
{
let receiver_agent_path = receiver_agent.agent_path.clone().ok_or_else(|| {
FunctionCallError::RespondToModel(
"target agent is missing an agent_path".to_string(),
)
})?;
let instruction = InterAgentInstruction::new(
turn.session_source
.get_agent_path()
.unwrap_or_else(AgentPath::root),
receiver_agent_path,
Vec::new(),
prompt.clone(),
);
agent_control
.deliver_inter_agent_instruction(
receiver_thread_id,
instruction,
if args.interrupt {
InterAgentDelivery::NextTurn
} else {
InterAgentDelivery::CurrentTurn
},
)
.await
} else {
agent_control
.send_input(receiver_thread_id, input_items)
.await
}
.map_err(|err| collab_agent_error(receiver_thread_id, err));
let result = agent_control
.send_input(receiver_thread_id, input_items)
.await
.map_err(|err| collab_agent_error(receiver_thread_id, err));
let status = session
.services
.agent_control

View File

@@ -1,9 +1,11 @@
use super::*;
use crate::agent::status::is_final;
use crate::error::CodexErr;
use futures::FutureExt;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch::Receiver;
use tokio::time::Instant;

View File

@@ -0,0 +1,381 @@
use crate::agent::AgentStatus;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::config::Config;
use crate::error::CodexErr;
use crate::function_tool::FunctionCallError;
use crate::models_manager::manager::RefreshStrategy;
use crate::tools::context::FunctionToolOutput;
use crate::tools::context::ToolOutput;
use crate::tools::context::ToolPayload;
use codex_features::Feature;
use codex_protocol::AgentPath;
use codex_protocol::ThreadId;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::openai_models::ReasoningEffortPreset;
use codex_protocol::protocol::CollabAgentRef;
use codex_protocol::protocol::CollabAgentStatusEntry;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::user_input::UserInput;
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::collections::HashMap;
/// Minimum wait timeout to prevent tight polling loops from burning CPU.
pub(crate) const MIN_WAIT_TIMEOUT_MS: i64 = 10_000;
pub(crate) const DEFAULT_WAIT_TIMEOUT_MS: i64 = 30_000;
pub(crate) const MAX_WAIT_TIMEOUT_MS: i64 = 3600 * 1000;
pub(crate) fn function_arguments(payload: ToolPayload) -> Result<String, FunctionCallError> {
match payload {
ToolPayload::Function { arguments } => Ok(arguments),
_ => Err(FunctionCallError::RespondToModel(
"collab handler received unsupported payload".to_string(),
)),
}
}
pub(crate) fn tool_output_json_text<T>(value: &T, tool_name: &str) -> String
where
T: Serialize,
{
serde_json::to_string(value).unwrap_or_else(|err| {
JsonValue::String(format!("failed to serialize {tool_name} result: {err}")).to_string()
})
}
pub(crate) fn tool_output_response_item<T>(
call_id: &str,
payload: &ToolPayload,
value: &T,
success: Option<bool>,
tool_name: &str,
) -> ResponseInputItem
where
T: Serialize,
{
FunctionToolOutput::from_text(tool_output_json_text(value, tool_name), success)
.to_response_item(call_id, payload)
}
pub(crate) fn tool_output_code_mode_result<T>(value: &T, tool_name: &str) -> JsonValue
where
T: Serialize,
{
serde_json::to_value(value).unwrap_or_else(|err| {
JsonValue::String(format!("failed to serialize {tool_name} result: {err}"))
})
}
pub(crate) fn build_wait_agent_statuses(
statuses: &HashMap<ThreadId, AgentStatus>,
receiver_agents: &[CollabAgentRef],
) -> Vec<CollabAgentStatusEntry> {
if statuses.is_empty() {
return Vec::new();
}
let mut entries = Vec::with_capacity(statuses.len());
let mut seen = HashMap::with_capacity(receiver_agents.len());
for receiver_agent in receiver_agents {
seen.insert(receiver_agent.thread_id, ());
if let Some(status) = statuses.get(&receiver_agent.thread_id) {
entries.push(CollabAgentStatusEntry {
thread_id: receiver_agent.thread_id,
agent_nickname: receiver_agent.agent_nickname.clone(),
agent_role: receiver_agent.agent_role.clone(),
status: status.clone(),
});
}
}
let mut extras = statuses
.iter()
.filter(|(thread_id, _)| !seen.contains_key(thread_id))
.map(|(thread_id, status)| CollabAgentStatusEntry {
thread_id: *thread_id,
agent_nickname: None,
agent_role: None,
status: status.clone(),
})
.collect::<Vec<_>>();
extras.sort_by(|left, right| left.thread_id.to_string().cmp(&right.thread_id.to_string()));
entries.extend(extras);
entries
}
pub(crate) fn collab_spawn_error(err: CodexErr) -> FunctionCallError {
match err {
CodexErr::UnsupportedOperation(message) if message == "thread manager dropped" => {
FunctionCallError::RespondToModel("collab manager unavailable".to_string())
}
CodexErr::UnsupportedOperation(message) => FunctionCallError::RespondToModel(message),
err => FunctionCallError::RespondToModel(format!("collab spawn failed: {err}")),
}
}
pub(crate) fn collab_agent_error(agent_id: ThreadId, err: CodexErr) -> FunctionCallError {
match err {
CodexErr::ThreadNotFound(id) => {
FunctionCallError::RespondToModel(format!("agent with id {id} not found"))
}
CodexErr::InternalAgentDied => {
FunctionCallError::RespondToModel(format!("agent with id {agent_id} is closed"))
}
CodexErr::UnsupportedOperation(_) => {
FunctionCallError::RespondToModel("collab manager unavailable".to_string())
}
err => FunctionCallError::RespondToModel(format!("collab tool failed: {err}")),
}
}
pub(crate) fn thread_spawn_source(
parent_thread_id: ThreadId,
parent_session_source: &SessionSource,
depth: i32,
agent_role: Option<&str>,
task_name: Option<String>,
) -> Result<SessionSource, FunctionCallError> {
let agent_path = task_name
.as_deref()
.map(|task_name| {
parent_session_source
.get_agent_path()
.unwrap_or_else(AgentPath::root)
.join(task_name)
.map_err(FunctionCallError::RespondToModel)
})
.transpose()?;
Ok(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth,
agent_path,
agent_nickname: None,
agent_role: agent_role.map(str::to_string),
}))
}
pub(crate) fn parse_collab_input(
message: Option<String>,
items: Option<Vec<UserInput>>,
) -> Result<Vec<UserInput>, FunctionCallError> {
match (message, items) {
(Some(_), Some(_)) => Err(FunctionCallError::RespondToModel(
"Provide either message or items, but not both".to_string(),
)),
(None, None) => Err(FunctionCallError::RespondToModel(
"Provide one of: message or items".to_string(),
)),
(Some(message), None) => {
if message.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"Empty message can't be sent to an agent".to_string(),
));
}
Ok(vec![UserInput::Text {
text: message,
text_elements: Vec::new(),
}])
}
(None, Some(items)) => {
if items.is_empty() {
return Err(FunctionCallError::RespondToModel(
"Items can't be empty".to_string(),
));
}
Ok(items)
}
}
}
pub(crate) fn input_preview(items: &[UserInput]) -> String {
let parts: Vec<String> = items
.iter()
.map(|item| match item {
UserInput::Text { text, .. } => text.clone(),
UserInput::Image { .. } => "[image]".to_string(),
UserInput::LocalImage { path } => format!("[local_image:{}]", path.display()),
UserInput::Skill { name, path } => {
format!("[skill:${name}]({})", path.display())
}
UserInput::Mention { name, path } => format!("[mention:${name}]({path})"),
_ => "[input]".to_string(),
})
.collect();
parts.join("\n")
}
/// Builds the base config snapshot for a newly spawned sub-agent.
///
/// The returned config starts from the parent's effective config and then refreshes the
/// runtime-owned fields carried on `turn`, including model selection, reasoning settings,
/// approval policy, sandbox, and cwd. Role-specific overrides are layered after this step;
/// skipping this helper and cloning stale config state directly can send the child agent out with
/// the wrong provider or runtime policy.
pub(crate) fn build_agent_spawn_config(
base_instructions: &BaseInstructions,
turn: &TurnContext,
) -> Result<Config, FunctionCallError> {
let mut config = build_agent_shared_config(turn)?;
config.base_instructions = Some(base_instructions.text.clone());
Ok(config)
}
pub(crate) fn build_agent_resume_config(
turn: &TurnContext,
child_depth: i32,
) -> Result<Config, FunctionCallError> {
let mut config = build_agent_shared_config(turn)?;
apply_spawn_agent_overrides(&mut config, child_depth);
// For resume, keep base instructions sourced from rollout/session metadata.
config.base_instructions = None;
Ok(config)
}
fn build_agent_shared_config(turn: &TurnContext) -> Result<Config, FunctionCallError> {
let base_config = turn.config.clone();
let mut config = (*base_config).clone();
config.model = Some(turn.model_info.slug.clone());
config.model_provider = turn.provider.clone();
config.model_reasoning_effort = turn.reasoning_effort;
config.model_reasoning_summary = Some(turn.reasoning_summary);
config.developer_instructions = turn.developer_instructions.clone();
config.compact_prompt = turn.compact_prompt.clone();
apply_spawn_agent_runtime_overrides(&mut config, turn)?;
Ok(config)
}
/// Copies runtime-only turn state onto a child config before it is handed to `AgentControl`.
///
/// These values are chosen by the live turn rather than persisted config, so leaving them stale
/// can make a child agent disagree with its parent about approval policy, cwd, or sandboxing.
pub(crate) fn apply_spawn_agent_runtime_overrides(
config: &mut Config,
turn: &TurnContext,
) -> Result<(), FunctionCallError> {
config
.permissions
.approval_policy
.set(turn.approval_policy.value())
.map_err(|err| {
FunctionCallError::RespondToModel(format!("approval_policy is invalid: {err}"))
})?;
config.permissions.shell_environment_policy = turn.shell_environment_policy.clone();
config.codex_linux_sandbox_exe = turn.codex_linux_sandbox_exe.clone();
config.cwd = turn.cwd.clone();
config
.permissions
.sandbox_policy
.set(turn.sandbox_policy.get().clone())
.map_err(|err| {
FunctionCallError::RespondToModel(format!("sandbox_policy is invalid: {err}"))
})?;
config.permissions.file_system_sandbox_policy = turn.file_system_sandbox_policy.clone();
config.permissions.network_sandbox_policy = turn.network_sandbox_policy;
Ok(())
}
pub(crate) fn apply_spawn_agent_overrides(config: &mut Config, child_depth: i32) {
if child_depth >= config.agent_max_depth {
let _ = config.features.disable(Feature::SpawnCsv);
let _ = config.features.disable(Feature::Collab);
}
}
pub(crate) async fn apply_requested_spawn_agent_model_overrides(
session: &Session,
turn: &TurnContext,
config: &mut Config,
requested_model: Option<&str>,
requested_reasoning_effort: Option<ReasoningEffort>,
) -> Result<(), FunctionCallError> {
if requested_model.is_none() && requested_reasoning_effort.is_none() {
return Ok(());
}
if let Some(requested_model) = requested_model {
let available_models = session
.services
.models_manager
.list_models(RefreshStrategy::Offline)
.await;
let selected_model_name = find_spawn_agent_model_name(&available_models, requested_model)?;
let selected_model_info = session
.services
.models_manager
.get_model_info(&selected_model_name, config)
.await;
config.model = Some(selected_model_name.clone());
if let Some(reasoning_effort) = requested_reasoning_effort {
validate_spawn_agent_reasoning_effort(
&selected_model_name,
&selected_model_info.supported_reasoning_levels,
reasoning_effort,
)?;
config.model_reasoning_effort = Some(reasoning_effort);
} else {
config.model_reasoning_effort = selected_model_info.default_reasoning_level;
}
return Ok(());
}
if let Some(reasoning_effort) = requested_reasoning_effort {
validate_spawn_agent_reasoning_effort(
&turn.model_info.slug,
&turn.model_info.supported_reasoning_levels,
reasoning_effort,
)?;
config.model_reasoning_effort = Some(reasoning_effort);
}
Ok(())
}
fn find_spawn_agent_model_name(
available_models: &[codex_protocol::openai_models::ModelPreset],
requested_model: &str,
) -> Result<String, FunctionCallError> {
available_models
.iter()
.find(|model| model.model == requested_model)
.map(|model| model.model.clone())
.ok_or_else(|| {
let available = available_models
.iter()
.map(|model| model.model.as_str())
.collect::<Vec<_>>()
.join(", ");
FunctionCallError::RespondToModel(format!(
"Unknown model `{requested_model}` for spawn_agent. Available models: {available}"
))
})
}
fn validate_spawn_agent_reasoning_effort(
model: &str,
supported_reasoning_levels: &[ReasoningEffortPreset],
requested_reasoning_effort: ReasoningEffort,
) -> Result<(), FunctionCallError> {
if supported_reasoning_levels
.iter()
.any(|preset| preset.effort == requested_reasoning_effort)
{
return Ok(());
}
let supported = supported_reasoning_levels
.iter()
.map(|preset| preset.effort.to_string())
.collect::<Vec<_>>()
.join(", ");
Err(FunctionCallError::RespondToModel(format!(
"Reasoning effort `{requested_reasoning_effort}` is not supported for model `{model}`. Supported reasoning efforts: {supported}"
)))
}

View File

@@ -19,9 +19,13 @@ use crate::state::TaskKind;
use crate::tasks::SessionTask;
use crate::tasks::SessionTaskContext;
use crate::tools::context::ToolOutput;
use crate::tools::handlers::multi_agents_v2::SendInputHandler as SendInputHandlerV2;
use crate::tools::handlers::multi_agents_v2::SpawnAgentHandler as SpawnAgentHandlerV2;
use crate::tools::handlers::multi_agents_v2::WaitAgentHandler as WaitAgentHandlerV2;
use crate::turn_diff_tracker::TurnDiffTracker;
use codex_features::Feature;
use codex_protocol::ThreadId;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::ResponseInputItem;
@@ -317,7 +321,7 @@ async fn multi_agent_v2_spawn_returns_path_and_send_input_accepts_relative_path(
let session = Arc::new(session);
let turn = Arc::new(turn);
let spawn_output = SpawnAgentHandler
let spawn_output = SpawnAgentHandlerV2
.handle(invocation(
session.clone(),
turn.clone(),
@@ -356,7 +360,7 @@ async fn multi_agent_v2_spawn_returns_path_and_send_input_accepts_relative_path(
Some("/root/test_process")
);
SendInputHandler
SendInputHandlerV2
.handle(invocation(
session.clone(),
turn.clone(),
@@ -432,7 +436,7 @@ async fn multi_agent_v2_send_input_accepts_structured_items() {
let session = Arc::new(session);
let turn = Arc::new(turn);
SpawnAgentHandler
SpawnAgentHandlerV2
.handle(invocation(
session.clone(),
turn.clone(),
@@ -467,7 +471,7 @@ async fn multi_agent_v2_send_input_accepts_structured_items() {
})),
);
SendInputHandler
SendInputHandlerV2
.handle(invocation)
.await
.expect("structured items should be accepted in v2");
@@ -557,7 +561,7 @@ async fn multi_agent_v2_send_input_interrupts_busy_child_without_losing_message(
let session = Arc::new(session);
let turn = Arc::new(turn);
SpawnAgentHandler
SpawnAgentHandlerV2
.handle(invocation(
session.clone(),
turn.clone(),
@@ -594,7 +598,7 @@ async fn multi_agent_v2_send_input_interrupts_busy_child_without_losing_message(
)
.await;
SendInputHandler
SendInputHandlerV2
.handle(invocation(
session,
turn,
@@ -688,7 +692,7 @@ async fn multi_agent_v2_spawn_includes_agent_id_key_when_named() {
.expect("test config should allow feature update");
turn.config = Arc::new(config);
let output = SpawnAgentHandler
let output = SpawnAgentHandlerV2
.handle(invocation(
Arc::new(session),
Arc::new(turn),
@@ -736,7 +740,7 @@ async fn multi_agent_v2_spawn_surfaces_task_name_validation_errors() {
"task_name": "BadName"
})),
);
let Err(err) = SpawnAgentHandler.handle(invocation).await else {
let Err(err) = SpawnAgentHandlerV2.handle(invocation).await else {
panic!("invalid agent name should be rejected");
};
assert_eq!(
@@ -1355,16 +1359,16 @@ async fn multi_agent_v2_wait_agent_accepts_targets_argument() {
"wait_agent",
function_payload(json!({"targets": [target.clone()]})),
);
let output = WaitAgentHandler
let output = WaitAgentHandlerV2
.handle(invocation)
.await
.expect("targets should be accepted in v2 mode");
let (content, success) = expect_text_output(output);
let result: wait::WaitAgentResult =
let result: crate::tools::handlers::multi_agents_v2::wait::WaitAgentResult =
serde_json::from_str(&content).expect("wait_agent result should be json");
assert_eq!(
result,
wait::WaitAgentResult {
crate::tools::handlers::multi_agents_v2::wait::WaitAgentResult {
status: HashMap::from([(target, AgentStatus::NotFound)]),
timed_out: false,
}
@@ -1556,7 +1560,7 @@ async fn multi_agent_v2_wait_agent_returns_statuses_keyed_by_path() {
let session = Arc::new(session);
let turn = Arc::new(turn);
let spawn_output = SpawnAgentHandler
let spawn_output = SpawnAgentHandlerV2
.handle(invocation(
session.clone(),
turn.clone(),
@@ -1600,7 +1604,7 @@ async fn multi_agent_v2_wait_agent_returns_statuses_keyed_by_path() {
.await
.expect("shutdown status should arrive");
let wait_output = WaitAgentHandler
let wait_output = WaitAgentHandlerV2
.handle(invocation(
session,
turn,
@@ -1613,11 +1617,11 @@ async fn multi_agent_v2_wait_agent_returns_statuses_keyed_by_path() {
.await
.expect("wait_agent should succeed");
let (content, success) = expect_text_output(wait_output);
let result: wait::WaitAgentResult =
let result: crate::tools::handlers::multi_agents_v2::wait::WaitAgentResult =
serde_json::from_str(&content).expect("wait_agent result should be json");
assert_eq!(
result,
wait::WaitAgentResult {
crate::tools::handlers::multi_agents_v2::wait::WaitAgentResult {
status: HashMap::from([(spawn_result.task_name, AgentStatus::Shutdown)]),
timed_out: false,
}

View File

@@ -0,0 +1,39 @@
//! Implements the MultiAgentV2 collaboration tool surface.
use crate::agent::AgentStatus;
use crate::agent::agent_resolver::resolve_agent_target;
use crate::agent::agent_resolver::resolve_agent_targets;
use crate::agent::exceeds_thread_spawn_depth_limit;
use crate::codex::Session;
use crate::function_tool::FunctionCallError;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolOutput;
use crate::tools::context::ToolPayload;
use crate::tools::handlers::multi_agents_common::*;
use crate::tools::handlers::parse_arguments;
use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
use async_trait::async_trait;
use codex_protocol::AgentPath;
use codex_protocol::ThreadId;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::CollabAgentInteractionBeginEvent;
use codex_protocol::protocol::CollabAgentInteractionEndEvent;
use codex_protocol::protocol::CollabAgentRef;
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
use codex_protocol::protocol::CollabWaitingBeginEvent;
use codex_protocol::protocol::CollabWaitingEndEvent;
use codex_protocol::user_input::UserInput;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsonValue;
pub(crate) use send_input::Handler as SendInputHandler;
pub(crate) use spawn::Handler as SpawnAgentHandler;
pub(crate) use wait::Handler as WaitAgentHandler;
mod send_input;
mod spawn;
pub(crate) mod wait;

View File

@@ -0,0 +1,154 @@
use super::*;
use crate::agent::inter_agent_instruction::InterAgentDelivery;
use crate::agent::inter_agent_instruction::InterAgentInstruction;
pub(crate) struct Handler;
fn can_use_v2_inter_agent_instruction(items: &[UserInput]) -> bool {
items
.iter()
.all(|item| matches!(item, UserInput::Text { .. }))
}
#[async_trait]
impl ToolHandler for Handler {
type Output = SendInputResult;
fn kind(&self) -> ToolKind {
ToolKind::Function
}
fn matches_kind(&self, payload: &ToolPayload) -> bool {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: SendInputArgs = parse_arguments(&arguments)?;
let receiver_thread_id = resolve_agent_target(&session, &turn, &args.target).await?;
let input_items = parse_collab_input(args.message, args.items)?;
let prompt = input_preview(&input_items);
let receiver_agent = session
.services
.agent_control
.get_agent_metadata(receiver_thread_id)
.unwrap_or_default();
if args.interrupt {
session
.services
.agent_control
.interrupt_agent(receiver_thread_id)
.await
.map_err(|err| collab_agent_error(receiver_thread_id, err))?;
}
session
.send_event(
&turn,
CollabAgentInteractionBeginEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
receiver_thread_id,
prompt: prompt.clone(),
}
.into(),
)
.await;
let result = if can_use_v2_inter_agent_instruction(&input_items) {
let receiver_agent_path = receiver_agent.agent_path.clone().ok_or_else(|| {
FunctionCallError::RespondToModel(
"target agent is missing an agent_path".to_string(),
)
})?;
let instruction = InterAgentInstruction::new(
turn.session_source
.get_agent_path()
.unwrap_or_else(AgentPath::root),
receiver_agent_path,
Vec::new(),
prompt.clone(),
);
session
.services
.agent_control
.deliver_inter_agent_instruction(
receiver_thread_id,
instruction,
if args.interrupt {
InterAgentDelivery::NextTurn
} else {
InterAgentDelivery::CurrentTurn
},
)
.await
} else {
session
.services
.agent_control
.send_input(receiver_thread_id, input_items)
.await
}
.map_err(|err| collab_agent_error(receiver_thread_id, err));
let status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
session
.send_event(
&turn,
CollabAgentInteractionEndEvent {
call_id,
sender_thread_id: session.conversation_id,
receiver_thread_id,
receiver_agent_nickname: receiver_agent.agent_nickname,
receiver_agent_role: receiver_agent.agent_role,
prompt,
status,
}
.into(),
)
.await;
let submission_id = result?;
Ok(SendInputResult { submission_id })
}
}
#[derive(Debug, Deserialize)]
struct SendInputArgs {
target: String,
message: Option<String>,
items: Option<Vec<UserInput>>,
#[serde(default)]
interrupt: bool,
}
#[derive(Debug, Serialize)]
pub(crate) struct SendInputResult {
submission_id: String,
}
impl ToolOutput for SendInputResult {
fn log_preview(&self) -> String {
tool_output_json_text(self, "send_input")
}
fn success_for_logging(&self) -> bool {
true
}
fn to_response_item(&self, call_id: &str, payload: &ToolPayload) -> ResponseInputItem {
tool_output_response_item(call_id, payload, self, Some(true), "send_input")
}
fn code_mode_result(&self, _payload: &ToolPayload) -> JsonValue {
tool_output_code_mode_result(self, "send_input")
}
}

View File

@@ -0,0 +1,204 @@
use super::*;
use crate::agent::control::SpawnAgentOptions;
use crate::agent::next_thread_spawn_depth;
use crate::agent::role::DEFAULT_ROLE_NAME;
use crate::agent::role::apply_role_to_config;
pub(crate) struct Handler;
#[async_trait]
impl ToolHandler for Handler {
type Output = SpawnAgentResult;
fn kind(&self) -> ToolKind {
ToolKind::Function
}
fn matches_kind(&self, payload: &ToolPayload) -> bool {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: SpawnAgentArgs = parse_arguments(&arguments)?;
let role_name = args
.agent_type
.as_deref()
.map(str::trim)
.filter(|role| !role.is_empty());
let input_items = parse_collab_input(args.message, args.items)?;
let prompt = input_preview(&input_items);
let session_source = turn.session_source.clone();
let child_depth = next_thread_spawn_depth(&session_source);
let max_depth = turn.config.agent_max_depth;
if exceeds_thread_spawn_depth_limit(child_depth, max_depth) {
return Err(FunctionCallError::RespondToModel(
"Agent depth limit reached. Solve the task yourself.".to_string(),
));
}
session
.send_event(
&turn,
CollabAgentSpawnBeginEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
prompt: prompt.clone(),
model: args.model.clone().unwrap_or_default(),
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
}
.into(),
)
.await;
let mut config =
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut config,
args.model.as_deref(),
args.reasoning_effort,
)
.await?;
apply_role_to_config(&mut config, role_name)
.await
.map_err(FunctionCallError::RespondToModel)?;
apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?;
apply_spawn_agent_overrides(&mut config, child_depth);
let result = session
.services
.agent_control
.spawn_agent_with_metadata(
config,
input_items,
Some(thread_spawn_source(
session.conversation_id,
&turn.session_source,
child_depth,
role_name,
args.task_name.clone(),
)?),
SpawnAgentOptions {
fork_parent_spawn_call_id: args.fork_context.then(|| call_id.clone()),
},
)
.await
.map_err(collab_spawn_error);
let (new_thread_id, new_agent_metadata, status) = match &result {
Ok(spawned_agent) => (
Some(spawned_agent.thread_id),
Some(spawned_agent.metadata.clone()),
spawned_agent.status.clone(),
),
Err(_) => (None, None, AgentStatus::NotFound),
};
let agent_snapshot = match new_thread_id {
Some(thread_id) => {
session
.services
.agent_control
.get_agent_config_snapshot(thread_id)
.await
}
None => None,
};
let (new_agent_path, new_agent_nickname, new_agent_role) =
match (&agent_snapshot, new_agent_metadata) {
(Some(snapshot), _) => (
snapshot.session_source.get_agent_path().map(String::from),
snapshot.session_source.get_nickname(),
snapshot.session_source.get_agent_role(),
),
(None, Some(metadata)) => (
metadata.agent_path.map(String::from),
metadata.agent_nickname,
metadata.agent_role,
),
(None, None) => (None, None, None),
};
let effective_model = agent_snapshot
.as_ref()
.map(|snapshot| snapshot.model.clone())
.unwrap_or_else(|| args.model.clone().unwrap_or_default());
let effective_reasoning_effort = agent_snapshot
.as_ref()
.and_then(|snapshot| snapshot.reasoning_effort)
.unwrap_or(args.reasoning_effort.unwrap_or_default());
let nickname = new_agent_nickname.clone();
let task_name = new_agent_path.clone();
session
.send_event(
&turn,
CollabAgentSpawnEndEvent {
call_id,
sender_thread_id: session.conversation_id,
new_thread_id,
new_agent_nickname,
new_agent_role,
prompt,
model: effective_model,
reasoning_effort: effective_reasoning_effort,
status,
}
.into(),
)
.await;
let new_thread_id = result?.thread_id;
let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME);
turn.session_telemetry.counter(
"codex.multi_agent.spawn",
/*inc*/ 1,
&[("role", role_tag)],
);
Ok(SpawnAgentResult {
agent_id: task_name.is_none().then(|| new_thread_id.to_string()),
task_name,
nickname,
})
}
}
#[derive(Debug, Deserialize)]
struct SpawnAgentArgs {
message: Option<String>,
items: Option<Vec<UserInput>>,
task_name: Option<String>,
agent_type: Option<String>,
model: Option<String>,
reasoning_effort: Option<ReasoningEffort>,
#[serde(default)]
fork_context: bool,
}
#[derive(Debug, Serialize)]
pub(crate) struct SpawnAgentResult {
agent_id: Option<String>,
task_name: Option<String>,
nickname: Option<String>,
}
impl ToolOutput for SpawnAgentResult {
fn log_preview(&self) -> String {
tool_output_json_text(self, "spawn_agent")
}
fn success_for_logging(&self) -> bool {
true
}
fn to_response_item(&self, call_id: &str, payload: &ToolPayload) -> ResponseInputItem {
tool_output_response_item(call_id, payload, self, Some(true), "spawn_agent")
}
fn code_mode_result(&self, _payload: &ToolPayload) -> JsonValue {
tool_output_code_mode_result(self, "spawn_agent")
}
}

View File

@@ -0,0 +1,236 @@
use super::*;
use crate::agent::status::is_final;
use futures::FutureExt;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::watch::Receiver;
use tokio::time::Instant;
use tokio::time::timeout_at;
pub(crate) struct Handler;
#[async_trait]
impl ToolHandler for Handler {
type Output = WaitAgentResult;
fn kind(&self) -> ToolKind {
ToolKind::Function
}
fn matches_kind(&self, payload: &ToolPayload) -> bool {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: WaitArgs = parse_arguments(&arguments)?;
let receiver_thread_ids = resolve_agent_targets(&session, &turn, args.targets).await?;
let mut receiver_agents = Vec::with_capacity(receiver_thread_ids.len());
let mut target_by_thread_id = HashMap::with_capacity(receiver_thread_ids.len());
for receiver_thread_id in &receiver_thread_ids {
let agent_metadata = session
.services
.agent_control
.get_agent_metadata(*receiver_thread_id)
.unwrap_or_default();
target_by_thread_id.insert(
*receiver_thread_id,
agent_metadata
.agent_path
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| receiver_thread_id.to_string()),
);
receiver_agents.push(CollabAgentRef {
thread_id: *receiver_thread_id,
agent_nickname: agent_metadata.agent_nickname,
agent_role: agent_metadata.agent_role,
});
}
let timeout_ms = args.timeout_ms.unwrap_or(DEFAULT_WAIT_TIMEOUT_MS);
let timeout_ms = match timeout_ms {
ms if ms <= 0 => {
return Err(FunctionCallError::RespondToModel(
"timeout_ms must be greater than zero".to_owned(),
));
}
ms => ms.clamp(MIN_WAIT_TIMEOUT_MS, MAX_WAIT_TIMEOUT_MS),
};
session
.send_event(
&turn,
CollabWaitingBeginEvent {
sender_thread_id: session.conversation_id,
receiver_thread_ids: receiver_thread_ids.clone(),
receiver_agents: receiver_agents.clone(),
call_id: call_id.clone(),
}
.into(),
)
.await;
let mut status_rxs = Vec::with_capacity(receiver_thread_ids.len());
let mut initial_final_statuses = Vec::new();
for id in &receiver_thread_ids {
match session.services.agent_control.subscribe_status(*id).await {
Ok(rx) => {
let status = rx.borrow().clone();
if is_final(&status) {
initial_final_statuses.push((*id, status));
}
status_rxs.push((*id, rx));
}
Err(crate::error::CodexErr::ThreadNotFound(_)) => {
initial_final_statuses.push((*id, AgentStatus::NotFound));
}
Err(err) => {
let mut statuses = HashMap::with_capacity(1);
statuses.insert(*id, session.services.agent_control.get_status(*id).await);
session
.send_event(
&turn,
CollabWaitingEndEvent {
sender_thread_id: session.conversation_id,
call_id: call_id.clone(),
agent_statuses: build_wait_agent_statuses(
&statuses,
&receiver_agents,
),
statuses,
}
.into(),
)
.await;
return Err(collab_agent_error(*id, err));
}
}
}
let statuses = if !initial_final_statuses.is_empty() {
initial_final_statuses
} else {
let mut futures = FuturesUnordered::new();
for (id, rx) in status_rxs {
let session = session.clone();
futures.push(wait_for_final_status(session, id, rx));
}
let mut results = Vec::new();
let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64);
loop {
match timeout_at(deadline, futures.next()).await {
Ok(Some(Some(result))) => {
results.push(result);
break;
}
Ok(Some(None)) => continue,
Ok(None) | Err(_) => break,
}
}
if !results.is_empty() {
loop {
match futures.next().now_or_never() {
Some(Some(Some(result))) => results.push(result),
Some(Some(None)) => continue,
Some(None) | None => break,
}
}
}
results
};
let timed_out = statuses.is_empty();
let statuses_by_id = statuses.clone().into_iter().collect::<HashMap<_, _>>();
let agent_statuses = build_wait_agent_statuses(&statuses_by_id, &receiver_agents);
let result = WaitAgentResult {
status: statuses
.into_iter()
.filter_map(|(thread_id, status)| {
target_by_thread_id
.get(&thread_id)
.cloned()
.map(|target| (target, status))
})
.collect(),
timed_out,
};
session
.send_event(
&turn,
CollabWaitingEndEvent {
sender_thread_id: session.conversation_id,
call_id,
agent_statuses,
statuses: statuses_by_id,
}
.into(),
)
.await;
Ok(result)
}
}
#[derive(Debug, Deserialize)]
struct WaitArgs {
#[serde(default)]
targets: Vec<String>,
timeout_ms: Option<i64>,
}
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub(crate) struct WaitAgentResult {
pub(crate) status: HashMap<String, AgentStatus>,
pub(crate) timed_out: bool,
}
impl ToolOutput for WaitAgentResult {
fn log_preview(&self) -> String {
tool_output_json_text(self, "wait_agent")
}
fn success_for_logging(&self) -> bool {
true
}
fn to_response_item(&self, call_id: &str, payload: &ToolPayload) -> ResponseInputItem {
tool_output_response_item(call_id, payload, self, /*success*/ None, "wait_agent")
}
fn code_mode_result(&self, _payload: &ToolPayload) -> JsonValue {
tool_output_code_mode_result(self, "wait_agent")
}
}
async fn wait_for_final_status(
session: std::sync::Arc<Session>,
thread_id: ThreadId,
mut status_rx: Receiver<AgentStatus>,
) -> Option<(ThreadId, AgentStatus)> {
let mut status = status_rx.borrow().clone();
if is_final(&status) {
return Some((thread_id, status));
}
loop {
if status_rx.changed().await.is_err() {
let latest = session.services.agent_control.get_status(thread_id).await;
return is_final(&latest).then_some((thread_id, latest));
}
status = status_rx.borrow().clone();
if is_final(&status) {
return Some((thread_id, status));
}
}
}

View File

@@ -23,9 +23,9 @@ use crate::tools::handlers::TOOL_SUGGEST_TOOL_NAME;
use crate::tools::handlers::agent_jobs::BatchJobHandler;
use crate::tools::handlers::apply_patch::create_apply_patch_freeform_tool;
use crate::tools::handlers::apply_patch::create_apply_patch_json_tool;
use crate::tools::handlers::multi_agents::DEFAULT_WAIT_TIMEOUT_MS;
use crate::tools::handlers::multi_agents::MAX_WAIT_TIMEOUT_MS;
use crate::tools::handlers::multi_agents::MIN_WAIT_TIMEOUT_MS;
use crate::tools::handlers::multi_agents_common::DEFAULT_WAIT_TIMEOUT_MS;
use crate::tools::handlers::multi_agents_common::MAX_WAIT_TIMEOUT_MS;
use crate::tools::handlers::multi_agents_common::MIN_WAIT_TIMEOUT_MS;
use crate::tools::handlers::request_permissions_tool_description;
use crate::tools::handlers::request_user_input_tool_description;
use crate::tools::registry::ToolRegistryBuilder;
@@ -2602,6 +2602,9 @@ pub(crate) fn build_specs_with_discoverable_tools(
use crate::tools::handlers::multi_agents::SendInputHandler;
use crate::tools::handlers::multi_agents::SpawnAgentHandler;
use crate::tools::handlers::multi_agents::WaitAgentHandler;
use crate::tools::handlers::multi_agents_v2::SendInputHandler as SendInputHandlerV2;
use crate::tools::handlers::multi_agents_v2::SpawnAgentHandler as SpawnAgentHandlerV2;
use crate::tools::handlers::multi_agents_v2::WaitAgentHandler as WaitAgentHandlerV2;
use std::sync::Arc;
let mut builder = ToolRegistryBuilder::new();
@@ -3013,9 +3016,15 @@ pub(crate) fn build_specs_with_discoverable_tools(
/*supports_parallel_tool_calls*/ false,
config.code_mode_enabled,
);
builder.register_handler("spawn_agent", Arc::new(SpawnAgentHandler));
builder.register_handler("send_input", Arc::new(SendInputHandler));
builder.register_handler("wait_agent", Arc::new(WaitAgentHandler));
if config.multi_agent_v2 {
builder.register_handler("spawn_agent", Arc::new(SpawnAgentHandlerV2));
builder.register_handler("send_input", Arc::new(SendInputHandlerV2));
builder.register_handler("wait_agent", Arc::new(WaitAgentHandlerV2));
} else {
builder.register_handler("spawn_agent", Arc::new(SpawnAgentHandler));
builder.register_handler("send_input", Arc::new(SendInputHandler));
builder.register_handler("wait_agent", Arc::new(WaitAgentHandler));
}
builder.register_handler("close_agent", Arc::new(CloseAgentHandler));
}