Split multi-agent handler into dedicated files (#14603)

## Summary
- move the multi-agent handlers suite into its own files for spawn,
wait, resume, send input, and close logic
- keep the aggregated module in place while delegating each handler to
its new file to keep things organized per handler

## Testing
- Not run (not requested)
This commit is contained in:
pakrym-oai
2026-03-13 09:11:03 -07:00
committed by GitHub
parent c7e847aaeb
commit 8e89e9eded
6 changed files with 810 additions and 836 deletions

View File

@@ -105,842 +105,11 @@ where
})
}
mod spawn {
use super::*;
use crate::agent::control::SpawnAgentOptions;
use crate::agent::role::DEFAULT_ROLE_NAME;
use crate::agent::role::apply_role_to_config;
use crate::agent::exceeds_thread_spawn_depth_limit;
use crate::agent::next_thread_spawn_depth;
pub(crate) struct Handler;
#[async_trait]
impl ToolHandler for Handler {
type Output = SpawnAgentResult;
fn kind(&self) -> ToolKind {
ToolKind::Function
}
fn matches_kind(&self, payload: &ToolPayload) -> bool {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(
&self,
invocation: ToolInvocation,
) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: SpawnAgentArgs = parse_arguments(&arguments)?;
let role_name = args
.agent_type
.as_deref()
.map(str::trim)
.filter(|role| !role.is_empty());
let input_items = parse_collab_input(args.message, args.items)?;
let prompt = input_preview(&input_items);
let session_source = turn.session_source.clone();
let child_depth = next_thread_spawn_depth(&session_source);
let max_depth = turn.config.agent_max_depth;
if exceeds_thread_spawn_depth_limit(child_depth, max_depth) {
return Err(FunctionCallError::RespondToModel(
"Agent depth limit reached. Solve the task yourself.".to_string(),
));
}
session
.send_event(
&turn,
CollabAgentSpawnBeginEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
prompt: prompt.clone(),
model: args.model.clone().unwrap_or_default(),
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
}
.into(),
)
.await;
let mut config =
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut config,
args.model.as_deref(),
args.reasoning_effort,
)
.await?;
apply_role_to_config(&mut config, role_name)
.await
.map_err(FunctionCallError::RespondToModel)?;
apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?;
apply_spawn_agent_overrides(&mut config, child_depth);
let result = session
.services
.agent_control
.spawn_agent_with_options(
config,
input_items,
Some(thread_spawn_source(
session.conversation_id,
child_depth,
role_name,
)),
SpawnAgentOptions {
fork_parent_spawn_call_id: args.fork_context.then(|| call_id.clone()),
},
)
.await
.map_err(collab_spawn_error);
let (new_thread_id, status) = match &result {
Ok(thread_id) => (
Some(*thread_id),
session.services.agent_control.get_status(*thread_id).await,
),
Err(_) => (None, AgentStatus::NotFound),
};
let (new_agent_nickname, new_agent_role) = match new_thread_id {
Some(thread_id) => session
.services
.agent_control
.get_agent_nickname_and_role(thread_id)
.await
.unwrap_or((None, None)),
None => (None, None),
};
let nickname = new_agent_nickname.clone();
session
.send_event(
&turn,
CollabAgentSpawnEndEvent {
call_id,
sender_thread_id: session.conversation_id,
new_thread_id,
new_agent_nickname,
new_agent_role,
prompt,
model: args.model.clone().unwrap_or_default(),
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
status,
}
.into(),
)
.await;
let new_thread_id = result?;
let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME);
turn.session_telemetry
.counter("codex.multi_agent.spawn", 1, &[("role", role_tag)]);
Ok(SpawnAgentResult {
agent_id: new_thread_id.to_string(),
nickname,
})
}
}
#[derive(Debug, Deserialize)]
struct SpawnAgentArgs {
message: Option<String>,
items: Option<Vec<UserInput>>,
agent_type: Option<String>,
model: Option<String>,
reasoning_effort: Option<ReasoningEffort>,
#[serde(default)]
fork_context: bool,
}
#[derive(Debug, Serialize)]
pub(crate) struct SpawnAgentResult {
agent_id: String,
nickname: Option<String>,
}
impl ToolOutput for SpawnAgentResult {
fn log_preview(&self) -> String {
tool_output_json_text(self, "spawn_agent")
}
fn success_for_logging(&self) -> bool {
true
}
fn to_response_item(&self, call_id: &str, payload: &ToolPayload) -> ResponseInputItem {
tool_output_response_item(call_id, payload, self, Some(true), "spawn_agent")
}
fn code_mode_result(&self, _payload: &ToolPayload) -> JsonValue {
tool_output_code_mode_result(self, "spawn_agent")
}
}
}
mod send_input {
use super::*;
pub(crate) struct Handler;
#[async_trait]
impl ToolHandler for Handler {
type Output = SendInputResult;
fn kind(&self) -> ToolKind {
ToolKind::Function
}
fn matches_kind(&self, payload: &ToolPayload) -> bool {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(
&self,
invocation: ToolInvocation,
) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: SendInputArgs = parse_arguments(&arguments)?;
let receiver_thread_id = agent_id(&args.id)?;
let input_items = parse_collab_input(args.message, args.items)?;
let prompt = input_preview(&input_items);
let (receiver_agent_nickname, receiver_agent_role) = session
.services
.agent_control
.get_agent_nickname_and_role(receiver_thread_id)
.await
.unwrap_or((None, None));
if args.interrupt {
session
.services
.agent_control
.interrupt_agent(receiver_thread_id)
.await
.map_err(|err| collab_agent_error(receiver_thread_id, err))?;
}
session
.send_event(
&turn,
CollabAgentInteractionBeginEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
receiver_thread_id,
prompt: prompt.clone(),
}
.into(),
)
.await;
let result = session
.services
.agent_control
.send_input(receiver_thread_id, input_items)
.await
.map_err(|err| collab_agent_error(receiver_thread_id, err));
let status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
session
.send_event(
&turn,
CollabAgentInteractionEndEvent {
call_id,
sender_thread_id: session.conversation_id,
receiver_thread_id,
receiver_agent_nickname,
receiver_agent_role,
prompt,
status,
}
.into(),
)
.await;
let submission_id = result?;
Ok(SendInputResult { submission_id })
}
}
#[derive(Debug, Deserialize)]
struct SendInputArgs {
id: String,
message: Option<String>,
items: Option<Vec<UserInput>>,
#[serde(default)]
interrupt: bool,
}
#[derive(Debug, Serialize)]
pub(crate) struct SendInputResult {
submission_id: String,
}
impl ToolOutput for SendInputResult {
fn log_preview(&self) -> String {
tool_output_json_text(self, "send_input")
}
fn success_for_logging(&self) -> bool {
true
}
fn to_response_item(&self, call_id: &str, payload: &ToolPayload) -> ResponseInputItem {
tool_output_response_item(call_id, payload, self, Some(true), "send_input")
}
fn code_mode_result(&self, _payload: &ToolPayload) -> JsonValue {
tool_output_code_mode_result(self, "send_input")
}
}
}
mod resume_agent {
use super::*;
use crate::agent::next_thread_spawn_depth;
pub(crate) struct Handler;
#[async_trait]
impl ToolHandler for Handler {
type Output = ResumeAgentResult;
fn kind(&self) -> ToolKind {
ToolKind::Function
}
fn matches_kind(&self, payload: &ToolPayload) -> bool {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(
&self,
invocation: ToolInvocation,
) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: ResumeAgentArgs = parse_arguments(&arguments)?;
let receiver_thread_id = agent_id(&args.id)?;
let (receiver_agent_nickname, receiver_agent_role) = session
.services
.agent_control
.get_agent_nickname_and_role(receiver_thread_id)
.await
.unwrap_or((None, None));
let child_depth = next_thread_spawn_depth(&turn.session_source);
let max_depth = turn.config.agent_max_depth;
if exceeds_thread_spawn_depth_limit(child_depth, max_depth) {
return Err(FunctionCallError::RespondToModel(
"Agent depth limit reached. Solve the task yourself.".to_string(),
));
}
session
.send_event(
&turn,
CollabResumeBeginEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
receiver_thread_id,
receiver_agent_nickname: receiver_agent_nickname.clone(),
receiver_agent_role: receiver_agent_role.clone(),
}
.into(),
)
.await;
let mut status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
let error = if matches!(status, AgentStatus::NotFound) {
match try_resume_closed_agent(&session, &turn, receiver_thread_id, child_depth)
.await
{
Ok(resumed_status) => {
status = resumed_status;
None
}
Err(err) => {
status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
Some(err)
}
}
} else {
None
};
let (receiver_agent_nickname, receiver_agent_role) = session
.services
.agent_control
.get_agent_nickname_and_role(receiver_thread_id)
.await
.unwrap_or((receiver_agent_nickname, receiver_agent_role));
session
.send_event(
&turn,
CollabResumeEndEvent {
call_id,
sender_thread_id: session.conversation_id,
receiver_thread_id,
receiver_agent_nickname,
receiver_agent_role,
status: status.clone(),
}
.into(),
)
.await;
if let Some(err) = error {
return Err(err);
}
turn.session_telemetry
.counter("codex.multi_agent.resume", 1, &[]);
Ok(ResumeAgentResult { status })
}
}
#[derive(Debug, Deserialize)]
struct ResumeAgentArgs {
id: String,
}
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub(crate) struct ResumeAgentResult {
pub(crate) status: AgentStatus,
}
impl ToolOutput for ResumeAgentResult {
fn log_preview(&self) -> String {
tool_output_json_text(self, "resume_agent")
}
fn success_for_logging(&self) -> bool {
true
}
fn to_response_item(&self, call_id: &str, payload: &ToolPayload) -> ResponseInputItem {
tool_output_response_item(call_id, payload, self, Some(true), "resume_agent")
}
fn code_mode_result(&self, _payload: &ToolPayload) -> JsonValue {
tool_output_code_mode_result(self, "resume_agent")
}
}
async fn try_resume_closed_agent(
session: &Arc<Session>,
turn: &Arc<TurnContext>,
receiver_thread_id: ThreadId,
child_depth: i32,
) -> Result<AgentStatus, FunctionCallError> {
let config = build_agent_resume_config(turn.as_ref(), child_depth)?;
let resumed_thread_id = session
.services
.agent_control
.resume_agent_from_rollout(
config,
receiver_thread_id,
thread_spawn_source(session.conversation_id, child_depth, None),
)
.await
.map_err(|err| collab_agent_error(receiver_thread_id, err))?;
Ok(session
.services
.agent_control
.get_status(resumed_thread_id)
.await)
}
}
pub(crate) mod wait {
use super::*;
use crate::agent::status::is_final;
use futures::FutureExt;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::watch::Receiver;
use tokio::time::Instant;
use tokio::time::timeout_at;
pub(crate) struct Handler;
#[async_trait]
impl ToolHandler for Handler {
type Output = WaitResult;
fn kind(&self) -> ToolKind {
ToolKind::Function
}
fn matches_kind(&self, payload: &ToolPayload) -> bool {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(
&self,
invocation: ToolInvocation,
) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: WaitArgs = parse_arguments(&arguments)?;
if args.ids.is_empty() {
return Err(FunctionCallError::RespondToModel(
"ids must be non-empty".to_owned(),
));
}
let receiver_thread_ids = args
.ids
.iter()
.map(|id| agent_id(id))
.collect::<Result<Vec<_>, _>>()?;
let mut receiver_agents = Vec::with_capacity(receiver_thread_ids.len());
for receiver_thread_id in &receiver_thread_ids {
let (agent_nickname, agent_role) = session
.services
.agent_control
.get_agent_nickname_and_role(*receiver_thread_id)
.await
.unwrap_or((None, None));
receiver_agents.push(CollabAgentRef {
thread_id: *receiver_thread_id,
agent_nickname,
agent_role,
});
}
let timeout_ms = args.timeout_ms.unwrap_or(DEFAULT_WAIT_TIMEOUT_MS);
let timeout_ms = match timeout_ms {
ms if ms <= 0 => {
return Err(FunctionCallError::RespondToModel(
"timeout_ms must be greater than zero".to_owned(),
));
}
ms => ms.clamp(MIN_WAIT_TIMEOUT_MS, MAX_WAIT_TIMEOUT_MS),
};
session
.send_event(
&turn,
CollabWaitingBeginEvent {
sender_thread_id: session.conversation_id,
receiver_thread_ids: receiver_thread_ids.clone(),
receiver_agents: receiver_agents.clone(),
call_id: call_id.clone(),
}
.into(),
)
.await;
let mut status_rxs = Vec::with_capacity(receiver_thread_ids.len());
let mut initial_final_statuses = Vec::new();
for id in &receiver_thread_ids {
match session.services.agent_control.subscribe_status(*id).await {
Ok(rx) => {
let status = rx.borrow().clone();
if is_final(&status) {
initial_final_statuses.push((*id, status));
}
status_rxs.push((*id, rx));
}
Err(CodexErr::ThreadNotFound(_)) => {
initial_final_statuses.push((*id, AgentStatus::NotFound));
}
Err(err) => {
let mut statuses = HashMap::with_capacity(1);
statuses.insert(*id, session.services.agent_control.get_status(*id).await);
session
.send_event(
&turn,
CollabWaitingEndEvent {
sender_thread_id: session.conversation_id,
call_id: call_id.clone(),
agent_statuses: build_wait_agent_statuses(
&statuses,
&receiver_agents,
),
statuses,
}
.into(),
)
.await;
return Err(collab_agent_error(*id, err));
}
}
}
let statuses = if !initial_final_statuses.is_empty() {
initial_final_statuses
} else {
let mut futures = FuturesUnordered::new();
for (id, rx) in status_rxs.into_iter() {
let session = session.clone();
futures.push(wait_for_final_status(session, id, rx));
}
let mut results = Vec::new();
let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64);
loop {
match timeout_at(deadline, futures.next()).await {
Ok(Some(Some(result))) => {
results.push(result);
break;
}
Ok(Some(None)) => continue,
Ok(None) | Err(_) => break,
}
}
if !results.is_empty() {
loop {
match futures.next().now_or_never() {
Some(Some(Some(result))) => results.push(result),
Some(Some(None)) => continue,
Some(None) | None => break,
}
}
}
results
};
let statuses_map = statuses.clone().into_iter().collect::<HashMap<_, _>>();
let agent_statuses = build_wait_agent_statuses(&statuses_map, &receiver_agents);
let result = WaitResult {
status: statuses_map.clone(),
timed_out: statuses.is_empty(),
};
session
.send_event(
&turn,
CollabWaitingEndEvent {
sender_thread_id: session.conversation_id,
call_id,
agent_statuses,
statuses: statuses_map,
}
.into(),
)
.await;
Ok(result)
}
}
#[derive(Debug, Deserialize)]
struct WaitArgs {
ids: Vec<String>,
timeout_ms: Option<i64>,
}
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub(crate) struct WaitResult {
pub(crate) status: HashMap<ThreadId, AgentStatus>,
pub(crate) timed_out: bool,
}
impl ToolOutput for WaitResult {
fn log_preview(&self) -> String {
tool_output_json_text(self, "wait")
}
fn success_for_logging(&self) -> bool {
true
}
fn to_response_item(&self, call_id: &str, payload: &ToolPayload) -> ResponseInputItem {
tool_output_response_item(call_id, payload, self, None, "wait")
}
fn code_mode_result(&self, _payload: &ToolPayload) -> JsonValue {
tool_output_code_mode_result(self, "wait")
}
}
async fn wait_for_final_status(
session: Arc<Session>,
thread_id: ThreadId,
mut status_rx: Receiver<AgentStatus>,
) -> Option<(ThreadId, AgentStatus)> {
let mut status = status_rx.borrow().clone();
if is_final(&status) {
return Some((thread_id, status));
}
loop {
if status_rx.changed().await.is_err() {
let latest = session.services.agent_control.get_status(thread_id).await;
return is_final(&latest).then_some((thread_id, latest));
}
status = status_rx.borrow().clone();
if is_final(&status) {
return Some((thread_id, status));
}
}
}
}
pub mod close_agent {
use super::*;
pub(crate) struct Handler;
#[async_trait]
impl ToolHandler for Handler {
type Output = CloseAgentResult;
fn kind(&self) -> ToolKind {
ToolKind::Function
}
fn matches_kind(&self, payload: &ToolPayload) -> bool {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(
&self,
invocation: ToolInvocation,
) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: CloseAgentArgs = parse_arguments(&arguments)?;
let agent_id = agent_id(&args.id)?;
let (receiver_agent_nickname, receiver_agent_role) = session
.services
.agent_control
.get_agent_nickname_and_role(agent_id)
.await
.unwrap_or((None, None));
session
.send_event(
&turn,
CollabCloseBeginEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
}
.into(),
)
.await;
let status = match session
.services
.agent_control
.subscribe_status(agent_id)
.await
{
Ok(mut status_rx) => status_rx.borrow_and_update().clone(),
Err(err) => {
let status = session.services.agent_control.get_status(agent_id).await;
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent_nickname.clone(),
receiver_agent_role: receiver_agent_role.clone(),
status,
}
.into(),
)
.await;
return Err(collab_agent_error(agent_id, err));
}
};
let result = if !matches!(status, AgentStatus::Shutdown) {
session
.services
.agent_control
.shutdown_agent(agent_id)
.await
.map_err(|err| collab_agent_error(agent_id, err))
.map(|_| ())
} else {
Ok(())
};
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id,
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname,
receiver_agent_role,
status: status.clone(),
}
.into(),
)
.await;
result?;
Ok(CloseAgentResult { status })
}
}
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct CloseAgentResult {
pub(crate) status: AgentStatus,
}
impl ToolOutput for CloseAgentResult {
fn log_preview(&self) -> String {
tool_output_json_text(self, "close_agent")
}
fn success_for_logging(&self) -> bool {
true
}
fn to_response_item(&self, call_id: &str, payload: &ToolPayload) -> ResponseInputItem {
tool_output_response_item(call_id, payload, self, Some(true), "close_agent")
}
fn code_mode_result(&self, _payload: &ToolPayload) -> JsonValue {
tool_output_code_mode_result(self, "close_agent")
}
}
}
pub mod close_agent;
mod resume_agent;
mod send_input;
mod spawn;
pub(crate) mod wait;
fn agent_id(id: &str) -> Result<ThreadId, FunctionCallError> {
ThreadId::from_string(id)

View File

@@ -0,0 +1,123 @@
use super::*;
pub(crate) struct Handler;
#[async_trait]
impl ToolHandler for Handler {
type Output = CloseAgentResult;
fn kind(&self) -> ToolKind {
ToolKind::Function
}
fn matches_kind(&self, payload: &ToolPayload) -> bool {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: CloseAgentArgs = parse_arguments(&arguments)?;
let agent_id = agent_id(&args.id)?;
let (receiver_agent_nickname, receiver_agent_role) = session
.services
.agent_control
.get_agent_nickname_and_role(agent_id)
.await
.unwrap_or((None, None));
session
.send_event(
&turn,
CollabCloseBeginEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
}
.into(),
)
.await;
let status = match session
.services
.agent_control
.subscribe_status(agent_id)
.await
{
Ok(mut status_rx) => status_rx.borrow_and_update().clone(),
Err(err) => {
let status = session.services.agent_control.get_status(agent_id).await;
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent_nickname.clone(),
receiver_agent_role: receiver_agent_role.clone(),
status,
}
.into(),
)
.await;
return Err(collab_agent_error(agent_id, err));
}
};
let result = if !matches!(status, AgentStatus::Shutdown) {
session
.services
.agent_control
.shutdown_agent(agent_id)
.await
.map_err(|err| collab_agent_error(agent_id, err))
.map(|_| ())
} else {
Ok(())
};
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id,
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname,
receiver_agent_role,
status: status.clone(),
}
.into(),
)
.await;
result?;
Ok(CloseAgentResult { status })
}
}
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct CloseAgentResult {
pub(crate) status: AgentStatus,
}
impl ToolOutput for CloseAgentResult {
fn log_preview(&self) -> String {
tool_output_json_text(self, "close_agent")
}
fn success_for_logging(&self) -> bool {
true
}
fn to_response_item(&self, call_id: &str, payload: &ToolPayload) -> ResponseInputItem {
tool_output_response_item(call_id, payload, self, Some(true), "close_agent")
}
fn code_mode_result(&self, _payload: &ToolPayload) -> JsonValue {
tool_output_code_mode_result(self, "close_agent")
}
}

View File

@@ -0,0 +1,163 @@
use super::*;
use crate::agent::next_thread_spawn_depth;
pub(crate) struct Handler;
#[async_trait]
impl ToolHandler for Handler {
type Output = ResumeAgentResult;
fn kind(&self) -> ToolKind {
ToolKind::Function
}
fn matches_kind(&self, payload: &ToolPayload) -> bool {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: ResumeAgentArgs = parse_arguments(&arguments)?;
let receiver_thread_id = agent_id(&args.id)?;
let (receiver_agent_nickname, receiver_agent_role) = session
.services
.agent_control
.get_agent_nickname_and_role(receiver_thread_id)
.await
.unwrap_or((None, None));
let child_depth = next_thread_spawn_depth(&turn.session_source);
let max_depth = turn.config.agent_max_depth;
if exceeds_thread_spawn_depth_limit(child_depth, max_depth) {
return Err(FunctionCallError::RespondToModel(
"Agent depth limit reached. Solve the task yourself.".to_string(),
));
}
session
.send_event(
&turn,
CollabResumeBeginEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
receiver_thread_id,
receiver_agent_nickname: receiver_agent_nickname.clone(),
receiver_agent_role: receiver_agent_role.clone(),
}
.into(),
)
.await;
let mut status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
let error = if matches!(status, AgentStatus::NotFound) {
match try_resume_closed_agent(&session, &turn, receiver_thread_id, child_depth).await {
Ok(resumed_status) => {
status = resumed_status;
None
}
Err(err) => {
status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
Some(err)
}
}
} else {
None
};
let (receiver_agent_nickname, receiver_agent_role) = session
.services
.agent_control
.get_agent_nickname_and_role(receiver_thread_id)
.await
.unwrap_or((receiver_agent_nickname, receiver_agent_role));
session
.send_event(
&turn,
CollabResumeEndEvent {
call_id,
sender_thread_id: session.conversation_id,
receiver_thread_id,
receiver_agent_nickname,
receiver_agent_role,
status: status.clone(),
}
.into(),
)
.await;
if let Some(err) = error {
return Err(err);
}
turn.session_telemetry
.counter("codex.multi_agent.resume", 1, &[]);
Ok(ResumeAgentResult { status })
}
}
#[derive(Debug, Deserialize)]
struct ResumeAgentArgs {
id: String,
}
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub(crate) struct ResumeAgentResult {
pub(crate) status: AgentStatus,
}
impl ToolOutput for ResumeAgentResult {
fn log_preview(&self) -> String {
tool_output_json_text(self, "resume_agent")
}
fn success_for_logging(&self) -> bool {
true
}
fn to_response_item(&self, call_id: &str, payload: &ToolPayload) -> ResponseInputItem {
tool_output_response_item(call_id, payload, self, Some(true), "resume_agent")
}
fn code_mode_result(&self, _payload: &ToolPayload) -> JsonValue {
tool_output_code_mode_result(self, "resume_agent")
}
}
async fn try_resume_closed_agent(
session: &Arc<Session>,
turn: &Arc<TurnContext>,
receiver_thread_id: ThreadId,
child_depth: i32,
) -> Result<AgentStatus, FunctionCallError> {
let config = build_agent_resume_config(turn.as_ref(), child_depth)?;
let resumed_thread_id = session
.services
.agent_control
.resume_agent_from_rollout(
config,
receiver_thread_id,
thread_spawn_source(session.conversation_id, child_depth, None),
)
.await
.map_err(|err| collab_agent_error(receiver_thread_id, err))?;
Ok(session
.services
.agent_control
.get_status(resumed_thread_id)
.await)
}

View File

@@ -0,0 +1,118 @@
use super::*;
pub(crate) struct Handler;
#[async_trait]
impl ToolHandler for Handler {
type Output = SendInputResult;
fn kind(&self) -> ToolKind {
ToolKind::Function
}
fn matches_kind(&self, payload: &ToolPayload) -> bool {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: SendInputArgs = parse_arguments(&arguments)?;
let receiver_thread_id = agent_id(&args.id)?;
let input_items = parse_collab_input(args.message, args.items)?;
let prompt = input_preview(&input_items);
let (receiver_agent_nickname, receiver_agent_role) = session
.services
.agent_control
.get_agent_nickname_and_role(receiver_thread_id)
.await
.unwrap_or((None, None));
if args.interrupt {
session
.services
.agent_control
.interrupt_agent(receiver_thread_id)
.await
.map_err(|err| collab_agent_error(receiver_thread_id, err))?;
}
session
.send_event(
&turn,
CollabAgentInteractionBeginEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
receiver_thread_id,
prompt: prompt.clone(),
}
.into(),
)
.await;
let result = session
.services
.agent_control
.send_input(receiver_thread_id, input_items)
.await
.map_err(|err| collab_agent_error(receiver_thread_id, err));
let status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
session
.send_event(
&turn,
CollabAgentInteractionEndEvent {
call_id,
sender_thread_id: session.conversation_id,
receiver_thread_id,
receiver_agent_nickname,
receiver_agent_role,
prompt,
status,
}
.into(),
)
.await;
let submission_id = result?;
Ok(SendInputResult { submission_id })
}
}
#[derive(Debug, Deserialize)]
struct SendInputArgs {
id: String,
message: Option<String>,
items: Option<Vec<UserInput>>,
#[serde(default)]
interrupt: bool,
}
#[derive(Debug, Serialize)]
pub(crate) struct SendInputResult {
submission_id: String,
}
impl ToolOutput for SendInputResult {
fn log_preview(&self) -> String {
tool_output_json_text(self, "send_input")
}
fn success_for_logging(&self) -> bool {
true
}
fn to_response_item(&self, call_id: &str, payload: &ToolPayload) -> ResponseInputItem {
tool_output_response_item(call_id, payload, self, Some(true), "send_input")
}
fn code_mode_result(&self, _payload: &ToolPayload) -> JsonValue {
tool_output_code_mode_result(self, "send_input")
}
}

View File

@@ -0,0 +1,173 @@
use super::*;
use crate::agent::control::SpawnAgentOptions;
use crate::agent::role::DEFAULT_ROLE_NAME;
use crate::agent::role::apply_role_to_config;
use crate::agent::exceeds_thread_spawn_depth_limit;
use crate::agent::next_thread_spawn_depth;
pub(crate) struct Handler;
#[async_trait]
impl ToolHandler for Handler {
type Output = SpawnAgentResult;
fn kind(&self) -> ToolKind {
ToolKind::Function
}
fn matches_kind(&self, payload: &ToolPayload) -> bool {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: SpawnAgentArgs = parse_arguments(&arguments)?;
let role_name = args
.agent_type
.as_deref()
.map(str::trim)
.filter(|role| !role.is_empty());
let input_items = parse_collab_input(args.message, args.items)?;
let prompt = input_preview(&input_items);
let session_source = turn.session_source.clone();
let child_depth = next_thread_spawn_depth(&session_source);
let max_depth = turn.config.agent_max_depth;
if exceeds_thread_spawn_depth_limit(child_depth, max_depth) {
return Err(FunctionCallError::RespondToModel(
"Agent depth limit reached. Solve the task yourself.".to_string(),
));
}
session
.send_event(
&turn,
CollabAgentSpawnBeginEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
prompt: prompt.clone(),
model: args.model.clone().unwrap_or_default(),
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
}
.into(),
)
.await;
let mut config =
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut config,
args.model.as_deref(),
args.reasoning_effort,
)
.await?;
apply_role_to_config(&mut config, role_name)
.await
.map_err(FunctionCallError::RespondToModel)?;
apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?;
apply_spawn_agent_overrides(&mut config, child_depth);
let result = session
.services
.agent_control
.spawn_agent_with_options(
config,
input_items,
Some(thread_spawn_source(
session.conversation_id,
child_depth,
role_name,
)),
SpawnAgentOptions {
fork_parent_spawn_call_id: args.fork_context.then(|| call_id.clone()),
},
)
.await
.map_err(collab_spawn_error);
let (new_thread_id, status) = match &result {
Ok(thread_id) => (
Some(*thread_id),
session.services.agent_control.get_status(*thread_id).await,
),
Err(_) => (None, AgentStatus::NotFound),
};
let (new_agent_nickname, new_agent_role) = match new_thread_id {
Some(thread_id) => session
.services
.agent_control
.get_agent_nickname_and_role(thread_id)
.await
.unwrap_or((None, None)),
None => (None, None),
};
let nickname = new_agent_nickname.clone();
session
.send_event(
&turn,
CollabAgentSpawnEndEvent {
call_id,
sender_thread_id: session.conversation_id,
new_thread_id,
new_agent_nickname,
new_agent_role,
prompt,
model: args.model.clone().unwrap_or_default(),
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
status,
}
.into(),
)
.await;
let new_thread_id = result?;
let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME);
turn.session_telemetry
.counter("codex.multi_agent.spawn", 1, &[("role", role_tag)]);
Ok(SpawnAgentResult {
agent_id: new_thread_id.to_string(),
nickname,
})
}
}
#[derive(Debug, Deserialize)]
struct SpawnAgentArgs {
message: Option<String>,
items: Option<Vec<UserInput>>,
agent_type: Option<String>,
model: Option<String>,
reasoning_effort: Option<ReasoningEffort>,
#[serde(default)]
fork_context: bool,
}
#[derive(Debug, Serialize)]
pub(crate) struct SpawnAgentResult {
agent_id: String,
nickname: Option<String>,
}
impl ToolOutput for SpawnAgentResult {
fn log_preview(&self) -> String {
tool_output_json_text(self, "spawn_agent")
}
fn success_for_logging(&self) -> bool {
true
}
fn to_response_item(&self, call_id: &str, payload: &ToolPayload) -> ResponseInputItem {
tool_output_response_item(call_id, payload, self, Some(true), "spawn_agent")
}
fn code_mode_result(&self, _payload: &ToolPayload) -> JsonValue {
tool_output_code_mode_result(self, "spawn_agent")
}
}

View File

@@ -0,0 +1,228 @@
use super::*;
use crate::agent::status::is_final;
use futures::FutureExt;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::watch::Receiver;
use tokio::time::Instant;
use tokio::time::timeout_at;
pub(crate) struct Handler;
#[async_trait]
impl ToolHandler for Handler {
type Output = WaitResult;
fn kind(&self) -> ToolKind {
ToolKind::Function
}
fn matches_kind(&self, payload: &ToolPayload) -> bool {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: WaitArgs = parse_arguments(&arguments)?;
if args.ids.is_empty() {
return Err(FunctionCallError::RespondToModel(
"ids must be non-empty".to_owned(),
));
}
let receiver_thread_ids = args
.ids
.iter()
.map(|id| agent_id(id))
.collect::<Result<Vec<_>, _>>()?;
let mut receiver_agents = Vec::with_capacity(receiver_thread_ids.len());
for receiver_thread_id in &receiver_thread_ids {
let (agent_nickname, agent_role) = session
.services
.agent_control
.get_agent_nickname_and_role(*receiver_thread_id)
.await
.unwrap_or((None, None));
receiver_agents.push(CollabAgentRef {
thread_id: *receiver_thread_id,
agent_nickname,
agent_role,
});
}
let timeout_ms = args.timeout_ms.unwrap_or(DEFAULT_WAIT_TIMEOUT_MS);
let timeout_ms = match timeout_ms {
ms if ms <= 0 => {
return Err(FunctionCallError::RespondToModel(
"timeout_ms must be greater than zero".to_owned(),
));
}
ms => ms.clamp(MIN_WAIT_TIMEOUT_MS, MAX_WAIT_TIMEOUT_MS),
};
session
.send_event(
&turn,
CollabWaitingBeginEvent {
sender_thread_id: session.conversation_id,
receiver_thread_ids: receiver_thread_ids.clone(),
receiver_agents: receiver_agents.clone(),
call_id: call_id.clone(),
}
.into(),
)
.await;
let mut status_rxs = Vec::with_capacity(receiver_thread_ids.len());
let mut initial_final_statuses = Vec::new();
for id in &receiver_thread_ids {
match session.services.agent_control.subscribe_status(*id).await {
Ok(rx) => {
let status = rx.borrow().clone();
if is_final(&status) {
initial_final_statuses.push((*id, status));
}
status_rxs.push((*id, rx));
}
Err(CodexErr::ThreadNotFound(_)) => {
initial_final_statuses.push((*id, AgentStatus::NotFound));
}
Err(err) => {
let mut statuses = HashMap::with_capacity(1);
statuses.insert(*id, session.services.agent_control.get_status(*id).await);
session
.send_event(
&turn,
CollabWaitingEndEvent {
sender_thread_id: session.conversation_id,
call_id: call_id.clone(),
agent_statuses: build_wait_agent_statuses(
&statuses,
&receiver_agents,
),
statuses,
}
.into(),
)
.await;
return Err(collab_agent_error(*id, err));
}
}
}
let statuses = if !initial_final_statuses.is_empty() {
initial_final_statuses
} else {
let mut futures = FuturesUnordered::new();
for (id, rx) in status_rxs.into_iter() {
let session = session.clone();
futures.push(wait_for_final_status(session, id, rx));
}
let mut results = Vec::new();
let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64);
loop {
match timeout_at(deadline, futures.next()).await {
Ok(Some(Some(result))) => {
results.push(result);
break;
}
Ok(Some(None)) => continue,
Ok(None) | Err(_) => break,
}
}
if !results.is_empty() {
loop {
match futures.next().now_or_never() {
Some(Some(Some(result))) => results.push(result),
Some(Some(None)) => continue,
Some(None) | None => break,
}
}
}
results
};
let statuses_map = statuses.clone().into_iter().collect::<HashMap<_, _>>();
let agent_statuses = build_wait_agent_statuses(&statuses_map, &receiver_agents);
let result = WaitResult {
status: statuses_map.clone(),
timed_out: statuses.is_empty(),
};
session
.send_event(
&turn,
CollabWaitingEndEvent {
sender_thread_id: session.conversation_id,
call_id,
agent_statuses,
statuses: statuses_map,
}
.into(),
)
.await;
Ok(result)
}
}
#[derive(Debug, Deserialize)]
struct WaitArgs {
ids: Vec<String>,
timeout_ms: Option<i64>,
}
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub(crate) struct WaitResult {
pub(crate) status: HashMap<ThreadId, AgentStatus>,
pub(crate) timed_out: bool,
}
impl ToolOutput for WaitResult {
fn log_preview(&self) -> String {
tool_output_json_text(self, "wait")
}
fn success_for_logging(&self) -> bool {
true
}
fn to_response_item(&self, call_id: &str, payload: &ToolPayload) -> ResponseInputItem {
tool_output_response_item(call_id, payload, self, None, "wait")
}
fn code_mode_result(&self, _payload: &ToolPayload) -> JsonValue {
tool_output_code_mode_result(self, "wait")
}
}
async fn wait_for_final_status(
session: Arc<Session>,
thread_id: ThreadId,
mut status_rx: Receiver<AgentStatus>,
) -> Option<(ThreadId, AgentStatus)> {
let mut status = status_rx.borrow().clone();
if is_final(&status) {
return Some((thread_id, status));
}
loop {
if status_rx.changed().await.is_err() {
let latest = session.services.agent_control.get_status(thread_id).await;
return is_final(&latest).then_some((thread_id, latest));
}
status = status_rx.borrow().clone();
if is_final(&status) {
return Some((thread_id, status));
}
}
}