core: box multi-agent handler futures (#22266)

## Why

This is the base PR in the split stack for the permissions migration. It
isolates stack-safety work that had been mixed into the larger
permissions PR, so reviewers can evaluate the async-future changes
separately from the permissions model changes in #22267.

The main risk this addresses is large or recursive multi-agent futures
overflowing smaller runner stacks. A follow-up review also called out
that `shutdown_live_agent` must remain quiescent: callers should not
remove a live agent from tracking or release its spawn slot until the
worker loop has actually terminated.

## What Changed

- Boxes the large async futures in the multi-agent spawn, resume, and
close tool handlers.
- Boxes the `AgentControl` spawn and recursive close/shutdown paths that
can otherwise build very deep futures.
- Keeps `shutdown_live_agent` waiting for thread termination before
removing/releasing the live agent, preserving the previous shutdown
ordering while still boxing the recursive close path.

## Verification Strategy

The focused local coverage was `cargo test -p codex-core multi_agents`,
which exercises the multi-agent spawn/resume/close handlers, cascade
close/resume behavior, and the shutdown path touched by this PR.












---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/22266).
* #22330
* #22329
* #22328
* #22327
* __->__ #22266
This commit is contained in:
Michael Bolin
2026-05-12 17:22:25 -07:00
committed by GitHub
parent 589b820d6e
commit 9e7cdbd0d2
7 changed files with 656 additions and 620 deletions

View File

@@ -233,32 +233,31 @@ impl AgentControl {
// The same `AgentControl` is sent to spawn the thread.
let new_thread = match (session_source, options.fork_mode.as_ref()) {
(Some(session_source), Some(_)) => {
self.spawn_forked_thread(
Box::pin(self.spawn_forked_thread(
&state,
config,
session_source,
&options,
inherited_shell_snapshot,
inherited_exec_policy,
)
))
.await?
}
(Some(session_source), None) => {
state
.spawn_new_thread_with_source(
config.clone(),
self.clone(),
session_source,
/*thread_source*/ Some(ThreadSource::Subagent),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
options.environments.clone(),
)
.await?
Box::pin(state.spawn_new_thread_with_source(
config.clone(),
self.clone(),
session_source,
/*thread_source*/ Some(ThreadSource::Subagent),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
options.environments.clone(),
))
.await?
}
(None, _) => state.spawn_new_thread(config.clone(), self.clone()).await?,
(None, _) => Box::pin(state.spawn_new_thread(config.clone(), self.clone())).await?,
};
agent_metadata.agent_id = Some(new_thread.thread_id);
reservation.commit(agent_metadata.clone());

View File

@@ -20,88 +20,97 @@ impl ToolHandler for Handler {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: CloseAgentArgs = parse_arguments(&arguments)?;
let agent_id = parse_agent_id_target(&args.target)?;
let receiver_agent = session
.services
.agent_control
.get_agent_metadata(agent_id)
.unwrap_or_default();
session
.send_event(
&turn,
CollabCloseBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
}
.into(),
)
.await;
let status = match session
.services
.agent_control
.subscribe_status(agent_id)
.await
{
Ok(mut status_rx) => status_rx.borrow_and_update().clone(),
Err(err) => {
let status = session.services.agent_control.get_status(agent_id).await;
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id: call_id.clone(),
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname.clone(),
receiver_agent_role: receiver_agent.agent_role.clone(),
status,
}
.into(),
)
.await;
return Err(collab_agent_error(agent_id, err));
}
};
let result = Box::pin(session.services.agent_control.close_agent(agent_id))
.await
.map_err(|err| collab_agent_error(agent_id, err))
.map(|_| ());
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname,
receiver_agent_role: receiver_agent.agent_role,
status: status.clone(),
}
.into(),
)
.await;
result?;
Ok(CloseAgentResult {
previous_status: status,
})
fn handle(
&self,
invocation: ToolInvocation,
) -> impl std::future::Future<Output = Result<Self::Output, FunctionCallError>> + Send {
Box::pin(handle_close_agent(invocation))
}
}
async fn handle_close_agent(
invocation: ToolInvocation,
) -> Result<CloseAgentResult, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: CloseAgentArgs = parse_arguments(&arguments)?;
let agent_id = parse_agent_id_target(&args.target)?;
let receiver_agent = session
.services
.agent_control
.get_agent_metadata(agent_id)
.unwrap_or_default();
session
.send_event(
&turn,
CollabCloseBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
}
.into(),
)
.await;
let status = match session
.services
.agent_control
.subscribe_status(agent_id)
.await
{
Ok(mut status_rx) => status_rx.borrow_and_update().clone(),
Err(err) => {
let status = session.services.agent_control.get_status(agent_id).await;
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id: call_id.clone(),
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname.clone(),
receiver_agent_role: receiver_agent.agent_role.clone(),
status,
}
.into(),
)
.await;
return Err(collab_agent_error(agent_id, err));
}
};
let result = Box::pin(session.services.agent_control.close_agent(agent_id))
.await
.map_err(|err| collab_agent_error(agent_id, err))
.map(|_| ());
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname,
receiver_agent_role: receiver_agent.agent_role,
status: status.clone(),
}
.into(),
)
.await;
result?;
Ok(CloseAgentResult {
previous_status: status,
})
}
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct CloseAgentResult {
pub(crate) previous_status: AgentStatus,

View File

@@ -22,114 +22,123 @@ impl ToolHandler for Handler {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: ResumeAgentArgs = parse_arguments(&arguments)?;
let receiver_thread_id = ThreadId::from_string(&args.id).map_err(|err| {
FunctionCallError::RespondToModel(format!("invalid agent id {}: {err:?}", args.id))
})?;
let receiver_agent = session
.services
.agent_control
.get_agent_metadata(receiver_thread_id)
.unwrap_or_default();
let child_depth = next_thread_spawn_depth(&turn.session_source);
let max_depth = turn.config.agent_max_depth;
if exceeds_thread_spawn_depth_limit(child_depth, max_depth) {
return Err(FunctionCallError::RespondToModel(
"Agent depth limit reached. Solve the task yourself.".to_string(),
));
}
session
.send_event(
&turn,
CollabResumeBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id,
receiver_agent_nickname: receiver_agent.agent_nickname.clone(),
receiver_agent_role: receiver_agent.agent_role.clone(),
}
.into(),
)
.await;
let mut status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
let (receiver_agent, error) = if matches!(status, AgentStatus::NotFound) {
match Box::pin(try_resume_closed_agent(
&session,
&turn,
receiver_thread_id,
child_depth,
))
.await
{
Ok(()) => {
status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
(
session
.services
.agent_control
.get_agent_metadata(receiver_thread_id)
.unwrap_or(receiver_agent),
None,
)
}
Err(err) => {
status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
(receiver_agent, Some(err))
}
}
} else {
(receiver_agent, None)
};
session
.send_event(
&turn,
CollabResumeEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id,
receiver_agent_nickname: receiver_agent.agent_nickname,
receiver_agent_role: receiver_agent.agent_role,
status: status.clone(),
}
.into(),
)
.await;
if let Some(err) = error {
return Err(err);
}
turn.session_telemetry
.counter("codex.multi_agent.resume", /*inc*/ 1, &[]);
Ok(ResumeAgentResult { status })
fn handle(
&self,
invocation: ToolInvocation,
) -> impl std::future::Future<Output = Result<Self::Output, FunctionCallError>> + Send {
Box::pin(handle_resume_agent(invocation))
}
}
async fn handle_resume_agent(
invocation: ToolInvocation,
) -> Result<ResumeAgentResult, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: ResumeAgentArgs = parse_arguments(&arguments)?;
let receiver_thread_id = ThreadId::from_string(&args.id).map_err(|err| {
FunctionCallError::RespondToModel(format!("invalid agent id {}: {err:?}", args.id))
})?;
let receiver_agent = session
.services
.agent_control
.get_agent_metadata(receiver_thread_id)
.unwrap_or_default();
let child_depth = next_thread_spawn_depth(&turn.session_source);
let max_depth = turn.config.agent_max_depth;
if exceeds_thread_spawn_depth_limit(child_depth, max_depth) {
return Err(FunctionCallError::RespondToModel(
"Agent depth limit reached. Solve the task yourself.".to_string(),
));
}
session
.send_event(
&turn,
CollabResumeBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id,
receiver_agent_nickname: receiver_agent.agent_nickname.clone(),
receiver_agent_role: receiver_agent.agent_role.clone(),
}
.into(),
)
.await;
let mut status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
let (receiver_agent, error) = if matches!(status, AgentStatus::NotFound) {
match Box::pin(try_resume_closed_agent(
&session,
&turn,
receiver_thread_id,
child_depth,
))
.await
{
Ok(()) => {
status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
(
session
.services
.agent_control
.get_agent_metadata(receiver_thread_id)
.unwrap_or(receiver_agent),
None,
)
}
Err(err) => {
status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
(receiver_agent, Some(err))
}
}
} else {
(receiver_agent, None)
};
session
.send_event(
&turn,
CollabResumeEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id,
receiver_agent_nickname: receiver_agent.agent_nickname,
receiver_agent_role: receiver_agent.agent_role,
status: status.clone(),
}
.into(),
)
.await;
if let Some(err) = error {
return Err(err);
}
turn.session_telemetry
.counter("codex.multi_agent.resume", /*inc*/ 1, &[]);
Ok(ResumeAgentResult { status })
}
#[derive(Debug, Deserialize)]
struct ResumeAgentArgs {
id: String,

View File

@@ -37,161 +37,166 @@ impl ToolHandler for Handler {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: SpawnAgentArgs = parse_arguments(&arguments)?;
let role_name = args
.agent_type
.as_deref()
.map(str::trim)
.filter(|role| !role.is_empty());
let input_items = parse_collab_input(args.message, args.items)?;
let prompt = render_input_preview(&input_items);
let session_source = turn.session_source.clone();
let child_depth = next_thread_spawn_depth(&session_source);
let max_depth = turn.config.agent_max_depth;
if exceeds_thread_spawn_depth_limit(child_depth, max_depth) {
return Err(FunctionCallError::RespondToModel(
"Agent depth limit reached. Solve the task yourself.".to_string(),
));
}
session
.send_event(
&turn,
CollabAgentSpawnBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
prompt: prompt.clone(),
model: args.model.clone().unwrap_or_default(),
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
}
.into(),
)
.await;
let mut config =
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
if args.fork_context {
reject_full_fork_spawn_overrides(
role_name,
args.model.as_deref(),
args.reasoning_effort,
)?;
} else {
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut config,
args.model.as_deref(),
args.reasoning_effort,
)
.await?;
apply_role_to_config(&mut config, role_name)
.await
.map_err(FunctionCallError::RespondToModel)?;
}
apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?;
apply_spawn_agent_overrides(&mut config, child_depth);
let result = Box::pin(session.services.agent_control.spawn_agent_with_metadata(
config,
input_items,
Some(thread_spawn_source(
session.conversation_id,
&turn.session_source,
child_depth,
role_name,
/*task_name*/ None,
)?),
SpawnAgentOptions {
fork_parent_spawn_call_id: args.fork_context.then(|| call_id.clone()),
fork_mode: args.fork_context.then_some(SpawnAgentForkMode::FullHistory),
environments: Some(turn.environments.to_selections()),
},
))
.await
.map_err(collab_spawn_error);
let (new_thread_id, new_agent_metadata, status) = match &result {
Ok(spawned_agent) => (
Some(spawned_agent.thread_id),
Some(spawned_agent.metadata.clone()),
spawned_agent.status.clone(),
),
Err(_) => (None, None, AgentStatus::NotFound),
};
let agent_snapshot = match new_thread_id {
Some(thread_id) => {
session
.services
.agent_control
.get_agent_config_snapshot(thread_id)
.await
}
None => None,
};
let (_new_agent_path, new_agent_nickname, new_agent_role) =
match (&agent_snapshot, new_agent_metadata) {
(Some(snapshot), _) => (
snapshot.session_source.get_agent_path().map(String::from),
snapshot.session_source.get_nickname(),
snapshot.session_source.get_agent_role(),
),
(None, Some(metadata)) => (
metadata.agent_path.map(String::from),
metadata.agent_nickname,
metadata.agent_role,
),
(None, None) => (None, None, None),
};
let effective_model = agent_snapshot
.as_ref()
.map(|snapshot| snapshot.model.clone())
.unwrap_or_else(|| args.model.clone().unwrap_or_default());
let effective_reasoning_effort = agent_snapshot
.as_ref()
.and_then(|snapshot| snapshot.reasoning_effort)
.unwrap_or(args.reasoning_effort.unwrap_or_default());
let nickname = new_agent_nickname.clone();
session
.send_event(
&turn,
CollabAgentSpawnEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
new_thread_id,
new_agent_nickname,
new_agent_role,
prompt,
model: effective_model,
reasoning_effort: effective_reasoning_effort,
status,
}
.into(),
)
.await;
let new_thread_id = result?.thread_id;
let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME);
turn.session_telemetry.counter(
"codex.multi_agent.spawn",
/*inc*/ 1,
&[("role", role_tag)],
);
Ok(SpawnAgentResult {
agent_id: new_thread_id.to_string(),
nickname,
})
fn handle(
&self,
invocation: ToolInvocation,
) -> impl std::future::Future<Output = Result<Self::Output, FunctionCallError>> + Send {
Box::pin(handle_spawn_agent(invocation))
}
}
async fn handle_spawn_agent(
invocation: ToolInvocation,
) -> Result<SpawnAgentResult, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: SpawnAgentArgs = parse_arguments(&arguments)?;
let role_name = args
.agent_type
.as_deref()
.map(str::trim)
.filter(|role| !role.is_empty());
let input_items = parse_collab_input(args.message, args.items)?;
let prompt = render_input_preview(&input_items);
let session_source = turn.session_source.clone();
let child_depth = next_thread_spawn_depth(&session_source);
let max_depth = turn.config.agent_max_depth;
if exceeds_thread_spawn_depth_limit(child_depth, max_depth) {
return Err(FunctionCallError::RespondToModel(
"Agent depth limit reached. Solve the task yourself.".to_string(),
));
}
session
.send_event(
&turn,
CollabAgentSpawnBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
prompt: prompt.clone(),
model: args.model.clone().unwrap_or_default(),
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
}
.into(),
)
.await;
let mut config =
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
if args.fork_context {
reject_full_fork_spawn_overrides(role_name, args.model.as_deref(), args.reasoning_effort)?;
} else {
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut config,
args.model.as_deref(),
args.reasoning_effort,
)
.await?;
apply_role_to_config(&mut config, role_name)
.await
.map_err(FunctionCallError::RespondToModel)?;
}
apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?;
apply_spawn_agent_overrides(&mut config, child_depth);
let result = Box::pin(session.services.agent_control.spawn_agent_with_metadata(
config,
input_items,
Some(thread_spawn_source(
session.conversation_id,
&turn.session_source,
child_depth,
role_name,
/*task_name*/ None,
)?),
SpawnAgentOptions {
fork_parent_spawn_call_id: args.fork_context.then(|| call_id.clone()),
fork_mode: args.fork_context.then_some(SpawnAgentForkMode::FullHistory),
environments: Some(turn.environments.to_selections()),
},
))
.await
.map_err(collab_spawn_error);
let (new_thread_id, new_agent_metadata, status) = match &result {
Ok(spawned_agent) => (
Some(spawned_agent.thread_id),
Some(spawned_agent.metadata.clone()),
spawned_agent.status.clone(),
),
Err(_) => (None, None, AgentStatus::NotFound),
};
let agent_snapshot = match new_thread_id {
Some(thread_id) => {
session
.services
.agent_control
.get_agent_config_snapshot(thread_id)
.await
}
None => None,
};
let (_new_agent_path, new_agent_nickname, new_agent_role) =
match (&agent_snapshot, new_agent_metadata) {
(Some(snapshot), _) => (
snapshot.session_source.get_agent_path().map(String::from),
snapshot.session_source.get_nickname(),
snapshot.session_source.get_agent_role(),
),
(None, Some(metadata)) => (
metadata.agent_path.map(String::from),
metadata.agent_nickname,
metadata.agent_role,
),
(None, None) => (None, None, None),
};
let effective_model = agent_snapshot
.as_ref()
.map(|snapshot| snapshot.model.clone())
.unwrap_or_else(|| args.model.clone().unwrap_or_default());
let effective_reasoning_effort = agent_snapshot
.as_ref()
.and_then(|snapshot| snapshot.reasoning_effort)
.unwrap_or(args.reasoning_effort.unwrap_or_default());
let nickname = new_agent_nickname.clone();
session
.send_event(
&turn,
CollabAgentSpawnEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
new_thread_id,
new_agent_nickname,
new_agent_role,
prompt,
model: effective_model,
reasoning_effort: effective_reasoning_effort,
status,
}
.into(),
)
.await;
let new_thread_id = result?.thread_id;
let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME);
turn.session_telemetry.counter(
"codex.multi_agent.spawn",
/*inc*/ 1,
&[("role", role_tag)],
);
Ok(SpawnAgentResult {
agent_id: new_thread_id.to_string(),
nickname,
})
}
#[derive(Debug, Deserialize)]
struct SpawnAgentArgs {
message: Option<String>,

View File

@@ -3179,10 +3179,11 @@ async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtr
let parent_thread_id = parent.thread_id;
let parent_session = parent.thread.codex.session.clone();
let child_turn = parent_session.new_default_turn().await;
let child_spawn_output = SpawnAgentHandler::default()
.handle(invocation(
parent_session.clone(),
parent_session.new_default_turn().await,
child_turn,
"spawn_agent",
function_payload(json!({"message": "hello child"})),
))

View File

@@ -20,100 +20,109 @@ impl ToolHandler for Handler {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: CloseAgentArgs = parse_arguments(&arguments)?;
let agent_id = resolve_agent_target(&session, &turn, &args.target).await?;
let receiver_agent = session
.services
.agent_control
.get_agent_metadata(agent_id)
.unwrap_or_default();
if receiver_agent
.agent_path
.as_ref()
.is_some_and(AgentPath::is_root)
{
return Err(FunctionCallError::RespondToModel(
"root is not a spawned agent".to_string(),
));
}
session
.send_event(
&turn,
CollabCloseBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
}
.into(),
)
.await;
let status = match session
.services
.agent_control
.subscribe_status(agent_id)
.await
{
Ok(mut status_rx) => status_rx.borrow_and_update().clone(),
Err(err) => {
let status = session.services.agent_control.get_status(agent_id).await;
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id: call_id.clone(),
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname.clone(),
receiver_agent_role: receiver_agent.agent_role.clone(),
status,
}
.into(),
)
.await;
return Err(collab_agent_error(agent_id, err));
}
};
let result = session
.services
.agent_control
.close_agent(agent_id)
.await
.map_err(|err| collab_agent_error(agent_id, err))
.map(|_| ());
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname,
receiver_agent_role: receiver_agent.agent_role,
status: status.clone(),
}
.into(),
)
.await;
result?;
Ok(CloseAgentResult {
previous_status: status,
})
fn handle(
&self,
invocation: ToolInvocation,
) -> impl std::future::Future<Output = Result<Self::Output, FunctionCallError>> + Send {
Box::pin(handle_close_agent(invocation))
}
}
async fn handle_close_agent(
invocation: ToolInvocation,
) -> Result<CloseAgentResult, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: CloseAgentArgs = parse_arguments(&arguments)?;
let agent_id = resolve_agent_target(&session, &turn, &args.target).await?;
let receiver_agent = session
.services
.agent_control
.get_agent_metadata(agent_id)
.unwrap_or_default();
if receiver_agent
.agent_path
.as_ref()
.is_some_and(AgentPath::is_root)
{
return Err(FunctionCallError::RespondToModel(
"root is not a spawned agent".to_string(),
));
}
session
.send_event(
&turn,
CollabCloseBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
}
.into(),
)
.await;
let status = match session
.services
.agent_control
.subscribe_status(agent_id)
.await
{
Ok(mut status_rx) => status_rx.borrow_and_update().clone(),
Err(err) => {
let status = session.services.agent_control.get_status(agent_id).await;
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id: call_id.clone(),
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname.clone(),
receiver_agent_role: receiver_agent.agent_role.clone(),
status,
}
.into(),
)
.await;
return Err(collab_agent_error(agent_id, err));
}
};
let result = session
.services
.agent_control
.close_agent(agent_id)
.await
.map_err(|err| collab_agent_error(agent_id, err))
.map(|_| ());
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname,
receiver_agent_role: receiver_agent.agent_role,
status: status.clone(),
}
.into(),
)
.await;
result?;
Ok(CloseAgentResult {
previous_status: status,
})
}
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct CloseAgentArgs {

View File

@@ -39,188 +39,192 @@ impl ToolHandler for Handler {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: SpawnAgentArgs = parse_arguments(&arguments)?;
let fork_mode = args.fork_mode()?;
let role_name = args
.agent_type
.as_deref()
.map(str::trim)
.filter(|role| !role.is_empty());
fn handle(
&self,
invocation: ToolInvocation,
) -> impl std::future::Future<Output = Result<Self::Output, FunctionCallError>> + Send {
Box::pin(handle_spawn_agent(invocation))
}
}
let initial_operation = parse_collab_input(Some(args.message), /*items*/ None)?;
let prompt = render_input_preview(&initial_operation);
async fn handle_spawn_agent(
invocation: ToolInvocation,
) -> Result<SpawnAgentResult, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: SpawnAgentArgs = parse_arguments(&arguments)?;
let fork_mode = args.fork_mode()?;
let role_name = args
.agent_type
.as_deref()
.map(str::trim)
.filter(|role| !role.is_empty());
let session_source = turn.session_source.clone();
let child_depth = next_thread_spawn_depth(&session_source);
session
.send_event(
&turn,
CollabAgentSpawnBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
prompt: prompt.clone(),
model: args.model.clone().unwrap_or_default(),
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
}
.into(),
)
.await;
let mut config =
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
if matches!(fork_mode, Some(SpawnAgentForkMode::FullHistory)) {
reject_full_fork_spawn_overrides(
role_name,
args.model.as_deref(),
args.reasoning_effort,
)?;
} else {
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut config,
args.model.as_deref(),
args.reasoning_effort,
)
.await?;
apply_role_to_config(&mut config, role_name)
.await
.map_err(FunctionCallError::RespondToModel)?;
}
apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?;
apply_spawn_agent_overrides(&mut config, child_depth);
let initial_operation = parse_collab_input(Some(args.message), /*items*/ None)?;
let prompt = render_input_preview(&initial_operation);
let spawn_source = thread_spawn_source(
session.conversation_id,
&turn.session_source,
child_depth,
role_name,
Some(args.task_name.clone()),
)?;
let result = session
.services
.agent_control
.spawn_agent_with_metadata(
config,
match (spawn_source.get_agent_path(), initial_operation) {
(Some(recipient), Op::UserInput { items, .. })
if items
.iter()
.all(|item| matches!(item, UserInput::Text { .. })) =>
{
Op::InterAgentCommunication {
communication: InterAgentCommunication::new(
turn.session_source
.get_agent_path()
.unwrap_or_else(AgentPath::root),
recipient,
Vec::new(),
prompt.clone(),
/*trigger_turn*/ true,
),
}
}
(_, initial_operation) => initial_operation,
},
Some(spawn_source),
SpawnAgentOptions {
fork_parent_spawn_call_id: fork_mode.as_ref().map(|_| call_id.clone()),
fork_mode,
environments: Some(turn.environments.to_selections()),
},
)
.await
.map_err(collab_spawn_error);
let (new_thread_id, new_agent_metadata, status) = match &result {
Ok(spawned_agent) => (
Some(spawned_agent.thread_id),
Some(spawned_agent.metadata.clone()),
spawned_agent.status.clone(),
),
Err(_) => (None, None, AgentStatus::NotFound),
};
let agent_snapshot = match new_thread_id {
Some(thread_id) => {
session
.services
.agent_control
.get_agent_config_snapshot(thread_id)
.await
let session_source = turn.session_source.clone();
let child_depth = next_thread_spawn_depth(&session_source);
session
.send_event(
&turn,
CollabAgentSpawnBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
prompt: prompt.clone(),
model: args.model.clone().unwrap_or_default(),
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
}
None => None,
};
let (new_agent_path, new_agent_nickname, new_agent_role) =
match (&agent_snapshot, new_agent_metadata) {
(Some(snapshot), _) => (
snapshot.session_source.get_agent_path().map(String::from),
snapshot.session_source.get_nickname(),
snapshot.session_source.get_agent_role(),
),
(None, Some(metadata)) => (
metadata.agent_path.map(String::from),
metadata.agent_nickname,
metadata.agent_role,
),
(None, None) => (None, None, None),
};
let effective_model = agent_snapshot
.as_ref()
.map(|snapshot| snapshot.model.clone())
.unwrap_or_else(|| args.model.clone().unwrap_or_default());
let effective_reasoning_effort = agent_snapshot
.as_ref()
.and_then(|snapshot| snapshot.reasoning_effort)
.unwrap_or(args.reasoning_effort.unwrap_or_default());
let nickname = new_agent_nickname.clone();
session
.send_event(
&turn,
CollabAgentSpawnEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
new_thread_id,
new_agent_nickname,
new_agent_role,
prompt,
model: effective_model,
reasoning_effort: effective_reasoning_effort,
status,
}
.into(),
)
.await;
let _ = result?;
let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME);
turn.session_telemetry.counter(
"codex.multi_agent.spawn",
/*inc*/ 1,
&[("role", role_tag)],
);
let task_name = new_agent_path.ok_or_else(|| {
FunctionCallError::RespondToModel(
"spawned agent is missing a canonical task name".to_string(),
)
})?;
.into(),
)
.await;
let mut config =
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
if matches!(fork_mode, Some(SpawnAgentForkMode::FullHistory)) {
reject_full_fork_spawn_overrides(role_name, args.model.as_deref(), args.reasoning_effort)?;
} else {
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut config,
args.model.as_deref(),
args.reasoning_effort,
)
.await?;
apply_role_to_config(&mut config, role_name)
.await
.map_err(FunctionCallError::RespondToModel)?;
}
apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?;
apply_spawn_agent_overrides(&mut config, child_depth);
let hide_agent_metadata = turn.config.multi_agent_v2.hide_spawn_agent_metadata;
if hide_agent_metadata {
Ok(SpawnAgentResult::HiddenMetadata { task_name })
} else {
Ok(SpawnAgentResult::WithNickname {
task_name,
nickname,
})
let spawn_source = thread_spawn_source(
session.conversation_id,
&turn.session_source,
child_depth,
role_name,
Some(args.task_name.clone()),
)?;
let result = Box::pin(
session.services.agent_control.spawn_agent_with_metadata(
config,
match (spawn_source.get_agent_path(), initial_operation) {
(Some(recipient), Op::UserInput { items, .. })
if items
.iter()
.all(|item| matches!(item, UserInput::Text { .. })) =>
{
Op::InterAgentCommunication {
communication: InterAgentCommunication::new(
turn.session_source
.get_agent_path()
.unwrap_or_else(AgentPath::root),
recipient,
Vec::new(),
prompt.clone(),
/*trigger_turn*/ true,
),
}
}
(_, initial_operation) => initial_operation,
},
Some(spawn_source),
SpawnAgentOptions {
fork_parent_spawn_call_id: fork_mode.as_ref().map(|_| call_id.clone()),
fork_mode,
environments: Some(turn.environments.to_selections()),
},
),
)
.await
.map_err(collab_spawn_error);
let (new_thread_id, new_agent_metadata, status) = match &result {
Ok(spawned_agent) => (
Some(spawned_agent.thread_id),
Some(spawned_agent.metadata.clone()),
spawned_agent.status.clone(),
),
Err(_) => (None, None, AgentStatus::NotFound),
};
let agent_snapshot = match new_thread_id {
Some(thread_id) => {
session
.services
.agent_control
.get_agent_config_snapshot(thread_id)
.await
}
None => None,
};
let (new_agent_path, new_agent_nickname, new_agent_role) =
match (&agent_snapshot, new_agent_metadata) {
(Some(snapshot), _) => (
snapshot.session_source.get_agent_path().map(String::from),
snapshot.session_source.get_nickname(),
snapshot.session_source.get_agent_role(),
),
(None, Some(metadata)) => (
metadata.agent_path.map(String::from),
metadata.agent_nickname,
metadata.agent_role,
),
(None, None) => (None, None, None),
};
let effective_model = agent_snapshot
.as_ref()
.map(|snapshot| snapshot.model.clone())
.unwrap_or_else(|| args.model.clone().unwrap_or_default());
let effective_reasoning_effort = agent_snapshot
.as_ref()
.and_then(|snapshot| snapshot.reasoning_effort)
.unwrap_or(args.reasoning_effort.unwrap_or_default());
let nickname = new_agent_nickname.clone();
session
.send_event(
&turn,
CollabAgentSpawnEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
new_thread_id,
new_agent_nickname,
new_agent_role,
prompt,
model: effective_model,
reasoning_effort: effective_reasoning_effort,
status,
}
.into(),
)
.await;
let _ = result?;
let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME);
turn.session_telemetry.counter(
"codex.multi_agent.spawn",
/*inc*/ 1,
&[("role", role_tag)],
);
let task_name = new_agent_path.ok_or_else(|| {
FunctionCallError::RespondToModel(
"spawned agent is missing a canonical task name".to_string(),
)
})?;
let hide_agent_metadata = turn.config.multi_agent_v2.hide_spawn_agent_metadata;
if hide_agent_metadata {
Ok(SpawnAgentResult::HiddenMetadata { task_name })
} else {
Ok(SpawnAgentResult::WithNickname {
task_name,
nickname,
})
}
}