core: box multi-agent handler futures

This commit is contained in:
Michael Bolin
2026-05-11 23:21:46 -07:00
parent 46f30d0282
commit 44831f4b8a
7 changed files with 656 additions and 606 deletions

View File

@@ -64,6 +64,12 @@ pub(crate) struct LiveAgent {
pub(crate) status: AgentStatus,
}
#[derive(Clone, Copy)]
enum LiveAgentShutdownMode {
SubmitOnly,
WaitForTermination,
}
#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
pub(crate) struct ListedAgent {
pub(crate) agent_name: String,
@@ -233,32 +239,31 @@ impl AgentControl {
// The same `AgentControl` is sent to spawn the thread.
let new_thread = match (session_source, options.fork_mode.as_ref()) {
(Some(session_source), Some(_)) => {
self.spawn_forked_thread(
Box::pin(self.spawn_forked_thread(
&state,
config,
session_source,
&options,
inherited_shell_snapshot,
inherited_exec_policy,
)
))
.await?
}
(Some(session_source), None) => {
state
.spawn_new_thread_with_source(
config.clone(),
self.clone(),
session_source,
/*thread_source*/ Some(ThreadSource::Subagent),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
options.environments.clone(),
)
.await?
Box::pin(state.spawn_new_thread_with_source(
config,
self.clone(),
session_source,
/*thread_source*/ Some(ThreadSource::Subagent),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
options.environments.clone(),
))
.await?
}
(None, _) => state.spawn_new_thread(config.clone(), self.clone()).await?,
(None, _) => Box::pin(state.spawn_new_thread(config, self.clone())).await?,
};
agent_metadata.agent_id = Some(new_thread.thread_id);
reservation.commit(agent_metadata.clone());
@@ -713,22 +718,42 @@ impl AgentControl {
/// Submit a shutdown request for a live agent without marking it explicitly closed in
/// persisted spawn-edge state.
pub(crate) async fn shutdown_live_agent(&self, agent_id: ThreadId) -> CodexResult<String> {
self.shutdown_live_agent_with_mode(agent_id, LiveAgentShutdownMode::SubmitOnly)
.await
}
async fn shutdown_live_agent_and_wait(&self, agent_id: ThreadId) -> CodexResult<String> {
self.shutdown_live_agent_with_mode(agent_id, LiveAgentShutdownMode::WaitForTermination)
.await
}
async fn shutdown_live_agent_with_mode(
&self,
agent_id: ThreadId,
mode: LiveAgentShutdownMode,
) -> CodexResult<String> {
let state = self.upgrade()?;
let mut thread_to_wait = None;
let result = if let Ok(thread) = state.get_thread(agent_id).await {
thread.codex.session.ensure_rollout_materialized().await;
thread.codex.session.flush_rollout().await?;
let result = if matches!(thread.agent_status().await, AgentStatus::Shutdown) {
if matches!(thread.agent_status().await, AgentStatus::Shutdown) {
Ok(String::new())
} else {
state.send_op(agent_id, Op::Shutdown {}).await
};
thread.wait_until_terminated().await;
result
let result = state.send_op(agent_id, Op::Shutdown {}).await;
if result.is_ok() && matches!(mode, LiveAgentShutdownMode::WaitForTermination) {
thread_to_wait = Some(thread);
}
result
}
} else {
state.send_op(agent_id, Op::Shutdown {}).await
};
let _ = state.remove_thread(&agent_id).await;
self.state.release_spawned_thread(agent_id);
if let Some(thread) = thread_to_wait {
thread.wait_until_terminated().await;
}
result
}
@@ -750,9 +775,9 @@ impl AgentControl {
/// Shut down `agent_id` and any live descendants reachable from the in-memory spawn tree.
async fn shutdown_agent_tree(&self, agent_id: ThreadId) -> CodexResult<String> {
let descendant_ids = self.live_thread_spawn_descendants(agent_id).await?;
let result = self.shutdown_live_agent(agent_id).await;
let result = self.shutdown_live_agent_and_wait(agent_id).await;
for descendant_id in descendant_ids {
match self.shutdown_live_agent(descendant_id).await {
match self.shutdown_live_agent_and_wait(descendant_id).await {
Ok(_) | Err(CodexErr::ThreadNotFound(_)) | Err(CodexErr::InternalAgentDied) => {}
Err(err) => return Err(err),
}

View File

@@ -20,84 +20,89 @@ impl ToolHandler for Handler {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: CloseAgentArgs = parse_arguments(&arguments)?;
let agent_id = parse_agent_id_target(&args.target)?;
let receiver_agent = session
.services
.agent_control
.get_agent_metadata(agent_id)
.unwrap_or_default();
session
.send_event(
&turn,
CollabCloseBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
fn handle(
&self,
invocation: ToolInvocation,
) -> impl std::future::Future<Output = Result<Self::Output, FunctionCallError>> + Send {
Box::pin(async move {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: CloseAgentArgs = parse_arguments(&arguments)?;
let agent_id = parse_agent_id_target(&args.target)?;
let receiver_agent = session
.services
.agent_control
.get_agent_metadata(agent_id)
.unwrap_or_default();
session
.send_event(
&turn,
CollabCloseBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
}
.into(),
)
.await;
let status = match session
.services
.agent_control
.subscribe_status(agent_id)
.await
{
Ok(mut status_rx) => status_rx.borrow_and_update().clone(),
Err(err) => {
let status = session.services.agent_control.get_status(agent_id).await;
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id: call_id.clone(),
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname.clone(),
receiver_agent_role: receiver_agent.agent_role.clone(),
status,
}
.into(),
)
.await;
return Err(collab_agent_error(agent_id, err));
}
.into(),
)
.await;
let status = match session
.services
.agent_control
.subscribe_status(agent_id)
.await
{
Ok(mut status_rx) => status_rx.borrow_and_update().clone(),
Err(err) => {
let status = session.services.agent_control.get_status(agent_id).await;
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id: call_id.clone(),
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname.clone(),
receiver_agent_role: receiver_agent.agent_role.clone(),
status,
}
.into(),
)
.await;
return Err(collab_agent_error(agent_id, err));
}
};
let result = Box::pin(session.services.agent_control.close_agent(agent_id))
.await
.map_err(|err| collab_agent_error(agent_id, err))
.map(|_| ());
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname,
receiver_agent_role: receiver_agent.agent_role,
status: status.clone(),
}
.into(),
)
.await;
result?;
};
let result = Box::pin(session.services.agent_control.close_agent(agent_id))
.await
.map_err(|err| collab_agent_error(agent_id, err))
.map(|_| ());
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname,
receiver_agent_role: receiver_agent.agent_role,
status: status.clone(),
}
.into(),
)
.await;
result?;
Ok(CloseAgentResult {
previous_status: status,
Ok(CloseAgentResult {
previous_status: status,
})
})
}
}

View File

@@ -22,111 +22,116 @@ impl ToolHandler for Handler {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: ResumeAgentArgs = parse_arguments(&arguments)?;
let receiver_thread_id = ThreadId::from_string(&args.id).map_err(|err| {
FunctionCallError::RespondToModel(format!("invalid agent id {}: {err:?}", args.id))
})?;
let receiver_agent = session
.services
.agent_control
.get_agent_metadata(receiver_thread_id)
.unwrap_or_default();
let child_depth = next_thread_spawn_depth(&turn.session_source);
let max_depth = turn.config.agent_max_depth;
if exceeds_thread_spawn_depth_limit(child_depth, max_depth) {
return Err(FunctionCallError::RespondToModel(
"Agent depth limit reached. Solve the task yourself.".to_string(),
));
}
fn handle(
&self,
invocation: ToolInvocation,
) -> impl std::future::Future<Output = Result<Self::Output, FunctionCallError>> + Send {
Box::pin(async move {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: ResumeAgentArgs = parse_arguments(&arguments)?;
let receiver_thread_id = ThreadId::from_string(&args.id).map_err(|err| {
FunctionCallError::RespondToModel(format!("invalid agent id {}: {err:?}", args.id))
})?;
let receiver_agent = session
.services
.agent_control
.get_agent_metadata(receiver_thread_id)
.unwrap_or_default();
let child_depth = next_thread_spawn_depth(&turn.session_source);
let max_depth = turn.config.agent_max_depth;
if exceeds_thread_spawn_depth_limit(child_depth, max_depth) {
return Err(FunctionCallError::RespondToModel(
"Agent depth limit reached. Solve the task yourself.".to_string(),
));
}
session
.send_event(
&turn,
CollabResumeBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
session
.send_event(
&turn,
CollabResumeBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id,
receiver_agent_nickname: receiver_agent.agent_nickname.clone(),
receiver_agent_role: receiver_agent.agent_role.clone(),
}
.into(),
)
.await;
let mut status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
let (receiver_agent, error) = if matches!(status, AgentStatus::NotFound) {
match Box::pin(try_resume_closed_agent(
&session,
&turn,
receiver_thread_id,
receiver_agent_nickname: receiver_agent.agent_nickname.clone(),
receiver_agent_role: receiver_agent.agent_role.clone(),
}
.into(),
)
.await;
let mut status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
let (receiver_agent, error) = if matches!(status, AgentStatus::NotFound) {
match Box::pin(try_resume_closed_agent(
&session,
&turn,
receiver_thread_id,
child_depth,
))
.await
{
Ok(()) => {
status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
(
session
child_depth,
))
.await
{
Ok(()) => {
status = session
.services
.agent_control
.get_agent_metadata(receiver_thread_id)
.unwrap_or(receiver_agent),
None,
)
}
Err(err) => {
status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
(receiver_agent, Some(err))
.get_status(receiver_thread_id)
.await;
(
session
.services
.agent_control
.get_agent_metadata(receiver_thread_id)
.unwrap_or(receiver_agent),
None,
)
}
Err(err) => {
status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
(receiver_agent, Some(err))
}
}
} else {
(receiver_agent, None)
};
session
.send_event(
&turn,
CollabResumeEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id,
receiver_agent_nickname: receiver_agent.agent_nickname,
receiver_agent_role: receiver_agent.agent_role,
status: status.clone(),
}
.into(),
)
.await;
if let Some(err) = error {
return Err(err);
}
} else {
(receiver_agent, None)
};
session
.send_event(
&turn,
CollabResumeEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id,
receiver_agent_nickname: receiver_agent.agent_nickname,
receiver_agent_role: receiver_agent.agent_role,
status: status.clone(),
}
.into(),
)
.await;
turn.session_telemetry
.counter("codex.multi_agent.resume", /*inc*/ 1, &[]);
if let Some(err) = error {
return Err(err);
}
turn.session_telemetry
.counter("codex.multi_agent.resume", /*inc*/ 1, &[]);
Ok(ResumeAgentResult { status })
Ok(ResumeAgentResult { status })
})
}
}

View File

@@ -37,157 +37,162 @@ impl ToolHandler for Handler {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: SpawnAgentArgs = parse_arguments(&arguments)?;
let role_name = args
.agent_type
.as_deref()
.map(str::trim)
.filter(|role| !role.is_empty());
let input_items = parse_collab_input(args.message, args.items)?;
let prompt = render_input_preview(&input_items);
let session_source = turn.session_source.clone();
let child_depth = next_thread_spawn_depth(&session_source);
let max_depth = turn.config.agent_max_depth;
if exceeds_thread_spawn_depth_limit(child_depth, max_depth) {
return Err(FunctionCallError::RespondToModel(
"Agent depth limit reached. Solve the task yourself.".to_string(),
));
}
session
.send_event(
&turn,
CollabAgentSpawnBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
prompt: prompt.clone(),
model: args.model.clone().unwrap_or_default(),
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
}
.into(),
)
.await;
let mut config =
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
if args.fork_context {
reject_full_fork_spawn_overrides(
role_name,
args.model.as_deref(),
args.reasoning_effort,
)?;
} else {
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut config,
args.model.as_deref(),
args.reasoning_effort,
)
.await?;
apply_role_to_config(&mut config, role_name)
.await
.map_err(FunctionCallError::RespondToModel)?;
}
apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?;
apply_spawn_agent_overrides(&mut config, child_depth);
let result = Box::pin(session.services.agent_control.spawn_agent_with_metadata(
config,
input_items,
Some(thread_spawn_source(
session.conversation_id,
&turn.session_source,
child_depth,
role_name,
/*task_name*/ None,
)?),
SpawnAgentOptions {
fork_parent_spawn_call_id: args.fork_context.then(|| call_id.clone()),
fork_mode: args.fork_context.then_some(SpawnAgentForkMode::FullHistory),
environments: Some(turn.environments.to_selections()),
},
))
.await
.map_err(collab_spawn_error);
let (new_thread_id, new_agent_metadata, status) = match &result {
Ok(spawned_agent) => (
Some(spawned_agent.thread_id),
Some(spawned_agent.metadata.clone()),
spawned_agent.status.clone(),
),
Err(_) => (None, None, AgentStatus::NotFound),
};
let agent_snapshot = match new_thread_id {
Some(thread_id) => {
session
.services
.agent_control
.get_agent_config_snapshot(thread_id)
.await
fn handle(
&self,
invocation: ToolInvocation,
) -> impl std::future::Future<Output = Result<Self::Output, FunctionCallError>> + Send {
Box::pin(async move {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: SpawnAgentArgs = parse_arguments(&arguments)?;
let role_name = args
.agent_type
.as_deref()
.map(str::trim)
.filter(|role| !role.is_empty());
let input_items = parse_collab_input(args.message, args.items)?;
let prompt = render_input_preview(&input_items);
let session_source = turn.session_source.clone();
let child_depth = next_thread_spawn_depth(&session_source);
let max_depth = turn.config.agent_max_depth;
if exceeds_thread_spawn_depth_limit(child_depth, max_depth) {
return Err(FunctionCallError::RespondToModel(
"Agent depth limit reached. Solve the task yourself.".to_string(),
));
}
None => None,
};
let (_new_agent_path, new_agent_nickname, new_agent_role) =
match (&agent_snapshot, new_agent_metadata) {
(Some(snapshot), _) => (
snapshot.session_source.get_agent_path().map(String::from),
snapshot.session_source.get_nickname(),
snapshot.session_source.get_agent_role(),
),
(None, Some(metadata)) => (
metadata.agent_path.map(String::from),
metadata.agent_nickname,
metadata.agent_role,
),
(None, None) => (None, None, None),
};
let effective_model = agent_snapshot
.as_ref()
.map(|snapshot| snapshot.model.clone())
.unwrap_or_else(|| args.model.clone().unwrap_or_default());
let effective_reasoning_effort = agent_snapshot
.as_ref()
.and_then(|snapshot| snapshot.reasoning_effort)
.unwrap_or(args.reasoning_effort.unwrap_or_default());
let nickname = new_agent_nickname.clone();
session
.send_event(
&turn,
CollabAgentSpawnEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
new_thread_id,
new_agent_nickname,
new_agent_role,
prompt,
model: effective_model,
reasoning_effort: effective_reasoning_effort,
status,
}
.into(),
)
.await;
let new_thread_id = result?.thread_id;
let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME);
turn.session_telemetry.counter(
"codex.multi_agent.spawn",
/*inc*/ 1,
&[("role", role_tag)],
);
session
.send_event(
&turn,
CollabAgentSpawnBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
prompt: prompt.clone(),
model: args.model.clone().unwrap_or_default(),
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
}
.into(),
)
.await;
let mut config =
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
if args.fork_context {
reject_full_fork_spawn_overrides(
role_name,
args.model.as_deref(),
args.reasoning_effort,
)?;
} else {
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut config,
args.model.as_deref(),
args.reasoning_effort,
)
.await?;
apply_role_to_config(&mut config, role_name)
.await
.map_err(FunctionCallError::RespondToModel)?;
}
apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?;
apply_spawn_agent_overrides(&mut config, child_depth);
Ok(SpawnAgentResult {
agent_id: new_thread_id.to_string(),
nickname,
let result = Box::pin(session.services.agent_control.spawn_agent_with_metadata(
config,
input_items,
Some(thread_spawn_source(
session.conversation_id,
&turn.session_source,
child_depth,
role_name,
/*task_name*/ None,
)?),
SpawnAgentOptions {
fork_parent_spawn_call_id: args.fork_context.then(|| call_id.clone()),
fork_mode: args.fork_context.then_some(SpawnAgentForkMode::FullHistory),
environments: Some(turn.environments.to_selections()),
},
))
.await
.map_err(collab_spawn_error);
let (new_thread_id, new_agent_metadata, status) = match &result {
Ok(spawned_agent) => (
Some(spawned_agent.thread_id),
Some(spawned_agent.metadata.clone()),
spawned_agent.status.clone(),
),
Err(_) => (None, None, AgentStatus::NotFound),
};
let agent_snapshot = match new_thread_id {
Some(thread_id) => {
session
.services
.agent_control
.get_agent_config_snapshot(thread_id)
.await
}
None => None,
};
let (_new_agent_path, new_agent_nickname, new_agent_role) =
match (&agent_snapshot, new_agent_metadata) {
(Some(snapshot), _) => (
snapshot.session_source.get_agent_path().map(String::from),
snapshot.session_source.get_nickname(),
snapshot.session_source.get_agent_role(),
),
(None, Some(metadata)) => (
metadata.agent_path.map(String::from),
metadata.agent_nickname,
metadata.agent_role,
),
(None, None) => (None, None, None),
};
let effective_model = agent_snapshot
.as_ref()
.map(|snapshot| snapshot.model.clone())
.unwrap_or_else(|| args.model.clone().unwrap_or_default());
let effective_reasoning_effort = agent_snapshot
.as_ref()
.and_then(|snapshot| snapshot.reasoning_effort)
.unwrap_or(args.reasoning_effort.unwrap_or_default());
let nickname = new_agent_nickname.clone();
session
.send_event(
&turn,
CollabAgentSpawnEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
new_thread_id,
new_agent_nickname,
new_agent_role,
prompt,
model: effective_model,
reasoning_effort: effective_reasoning_effort,
status,
}
.into(),
)
.await;
let new_thread_id = result?.thread_id;
let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME);
turn.session_telemetry.counter(
"codex.multi_agent.spawn",
/*inc*/ 1,
&[("role", role_tag)],
);
Ok(SpawnAgentResult {
agent_id: new_thread_id.to_string(),
nickname,
})
})
}
}

View File

@@ -3179,10 +3179,11 @@ async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtr
let parent_thread_id = parent.thread_id;
let parent_session = parent.thread.codex.session.clone();
let child_turn = parent_session.new_default_turn().await;
let child_spawn_output = SpawnAgentHandler::default()
.handle(invocation(
parent_session.clone(),
parent_session.new_default_turn().await,
child_turn,
"spawn_agent",
function_payload(json!({"message": "hello child"})),
))

View File

@@ -20,96 +20,101 @@ impl ToolHandler for Handler {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: CloseAgentArgs = parse_arguments(&arguments)?;
let agent_id = resolve_agent_target(&session, &turn, &args.target).await?;
let receiver_agent = session
.services
.agent_control
.get_agent_metadata(agent_id)
.unwrap_or_default();
if receiver_agent
.agent_path
.as_ref()
.is_some_and(AgentPath::is_root)
{
return Err(FunctionCallError::RespondToModel(
"root is not a spawned agent".to_string(),
));
}
session
.send_event(
&turn,
CollabCloseBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
}
.into(),
)
.await;
let status = match session
.services
.agent_control
.subscribe_status(agent_id)
.await
{
Ok(mut status_rx) => status_rx.borrow_and_update().clone(),
Err(err) => {
let status = session.services.agent_control.get_status(agent_id).await;
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id: call_id.clone(),
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname.clone(),
receiver_agent_role: receiver_agent.agent_role.clone(),
status,
}
.into(),
)
.await;
return Err(collab_agent_error(agent_id, err));
fn handle(
&self,
invocation: ToolInvocation,
) -> impl std::future::Future<Output = Result<Self::Output, FunctionCallError>> + Send {
Box::pin(async move {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: CloseAgentArgs = parse_arguments(&arguments)?;
let agent_id = resolve_agent_target(&session, &turn, &args.target).await?;
let receiver_agent = session
.services
.agent_control
.get_agent_metadata(agent_id)
.unwrap_or_default();
if receiver_agent
.agent_path
.as_ref()
.is_some_and(AgentPath::is_root)
{
return Err(FunctionCallError::RespondToModel(
"root is not a spawned agent".to_string(),
));
}
};
let result = session
.services
.agent_control
.close_agent(agent_id)
.await
.map_err(|err| collab_agent_error(agent_id, err))
.map(|_| ());
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname,
receiver_agent_role: receiver_agent.agent_role,
status: status.clone(),
session
.send_event(
&turn,
CollabCloseBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
}
.into(),
)
.await;
let status = match session
.services
.agent_control
.subscribe_status(agent_id)
.await
{
Ok(mut status_rx) => status_rx.borrow_and_update().clone(),
Err(err) => {
let status = session.services.agent_control.get_status(agent_id).await;
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id: call_id.clone(),
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname.clone(),
receiver_agent_role: receiver_agent.agent_role.clone(),
status,
}
.into(),
)
.await;
return Err(collab_agent_error(agent_id, err));
}
.into(),
)
.await;
result?;
};
let result = session
.services
.agent_control
.close_agent(agent_id)
.await
.map_err(|err| collab_agent_error(agent_id, err))
.map(|_| ());
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname,
receiver_agent_role: receiver_agent.agent_role,
status: status.clone(),
}
.into(),
)
.await;
result?;
Ok(CloseAgentResult {
previous_status: status,
Ok(CloseAgentResult {
previous_status: status,
})
})
}
}

View File

@@ -39,188 +39,192 @@ impl ToolHandler for Handler {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: SpawnAgentArgs = parse_arguments(&arguments)?;
let fork_mode = args.fork_mode()?;
let role_name = args
.agent_type
.as_deref()
.map(str::trim)
.filter(|role| !role.is_empty());
fn handle(
&self,
invocation: ToolInvocation,
) -> impl std::future::Future<Output = Result<Self::Output, FunctionCallError>> + Send {
Box::pin(async move {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: SpawnAgentArgs = parse_arguments(&arguments)?;
let fork_mode = args.fork_mode()?;
let role_name = args
.agent_type
.as_deref()
.map(str::trim)
.filter(|role| !role.is_empty());
let initial_operation = parse_collab_input(Some(args.message), /*items*/ None)?;
let prompt = render_input_preview(&initial_operation);
let initial_operation = parse_collab_input(Some(args.message), /*items*/ None)?;
let prompt = render_input_preview(&initial_operation);
let session_source = turn.session_source.clone();
let child_depth = next_thread_spawn_depth(&session_source);
session
.send_event(
&turn,
CollabAgentSpawnBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
prompt: prompt.clone(),
model: args.model.clone().unwrap_or_default(),
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
}
.into(),
)
.await;
let mut config =
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
if matches!(fork_mode, Some(SpawnAgentForkMode::FullHistory)) {
reject_full_fork_spawn_overrides(
role_name,
args.model.as_deref(),
args.reasoning_effort,
)?;
} else {
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut config,
args.model.as_deref(),
args.reasoning_effort,
)
.await?;
apply_role_to_config(&mut config, role_name)
.await
.map_err(FunctionCallError::RespondToModel)?;
}
apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?;
apply_spawn_agent_overrides(&mut config, child_depth);
let spawn_source = thread_spawn_source(
session.conversation_id,
&turn.session_source,
child_depth,
role_name,
Some(args.task_name.clone()),
)?;
let result = session
.services
.agent_control
.spawn_agent_with_metadata(
config,
match (spawn_source.get_agent_path(), initial_operation) {
(Some(recipient), Op::UserInput { items, .. })
if items
.iter()
.all(|item| matches!(item, UserInput::Text { .. })) =>
{
Op::InterAgentCommunication {
communication: InterAgentCommunication::new(
turn.session_source
.get_agent_path()
.unwrap_or_else(AgentPath::root),
recipient,
Vec::new(),
prompt.clone(),
/*trigger_turn*/ true,
),
}
let session_source = turn.session_source.clone();
let child_depth = next_thread_spawn_depth(&session_source);
session
.send_event(
&turn,
CollabAgentSpawnBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
prompt: prompt.clone(),
model: args.model.clone().unwrap_or_default(),
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
}
(_, initial_operation) => initial_operation,
},
Some(spawn_source),
SpawnAgentOptions {
fork_parent_spawn_call_id: fork_mode.as_ref().map(|_| call_id.clone()),
fork_mode,
environments: Some(turn.environments.to_selections()),
},
.into(),
)
.await;
let mut config =
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
if matches!(fork_mode, Some(SpawnAgentForkMode::FullHistory)) {
reject_full_fork_spawn_overrides(
role_name,
args.model.as_deref(),
args.reasoning_effort,
)?;
} else {
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut config,
args.model.as_deref(),
args.reasoning_effort,
)
.await?;
apply_role_to_config(&mut config, role_name)
.await
.map_err(FunctionCallError::RespondToModel)?;
}
apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?;
apply_spawn_agent_overrides(&mut config, child_depth);
let spawn_source = thread_spawn_source(
session.conversation_id,
&turn.session_source,
child_depth,
role_name,
Some(args.task_name.clone()),
)?;
let result = Box::pin(
session.services.agent_control.spawn_agent_with_metadata(
config,
match (spawn_source.get_agent_path(), initial_operation) {
(Some(recipient), Op::UserInput { items, .. })
if items
.iter()
.all(|item| matches!(item, UserInput::Text { .. })) =>
{
Op::InterAgentCommunication {
communication: InterAgentCommunication::new(
turn.session_source
.get_agent_path()
.unwrap_or_else(AgentPath::root),
recipient,
Vec::new(),
prompt.clone(),
/*trigger_turn*/ true,
),
}
}
(_, initial_operation) => initial_operation,
},
Some(spawn_source),
SpawnAgentOptions {
fork_parent_spawn_call_id: fork_mode.as_ref().map(|_| call_id.clone()),
fork_mode,
environments: Some(turn.environments.to_selections()),
},
),
)
.await
.map_err(collab_spawn_error);
let (new_thread_id, new_agent_metadata, status) = match &result {
Ok(spawned_agent) => (
Some(spawned_agent.thread_id),
Some(spawned_agent.metadata.clone()),
spawned_agent.status.clone(),
),
Err(_) => (None, None, AgentStatus::NotFound),
};
let agent_snapshot = match new_thread_id {
Some(thread_id) => {
session
.services
.agent_control
.get_agent_config_snapshot(thread_id)
.await
}
None => None,
};
let (new_agent_path, new_agent_nickname, new_agent_role) =
match (&agent_snapshot, new_agent_metadata) {
(Some(snapshot), _) => (
snapshot.session_source.get_agent_path().map(String::from),
snapshot.session_source.get_nickname(),
snapshot.session_source.get_agent_role(),
let (new_thread_id, new_agent_metadata, status) = match &result {
Ok(spawned_agent) => (
Some(spawned_agent.thread_id),
Some(spawned_agent.metadata.clone()),
spawned_agent.status.clone(),
),
(None, Some(metadata)) => (
metadata.agent_path.map(String::from),
metadata.agent_nickname,
metadata.agent_role,
),
(None, None) => (None, None, None),
Err(_) => (None, None, AgentStatus::NotFound),
};
let effective_model = agent_snapshot
.as_ref()
.map(|snapshot| snapshot.model.clone())
.unwrap_or_else(|| args.model.clone().unwrap_or_default());
let effective_reasoning_effort = agent_snapshot
.as_ref()
.and_then(|snapshot| snapshot.reasoning_effort)
.unwrap_or(args.reasoning_effort.unwrap_or_default());
let nickname = new_agent_nickname.clone();
session
.send_event(
&turn,
CollabAgentSpawnEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
new_thread_id,
new_agent_nickname,
new_agent_role,
prompt,
model: effective_model,
reasoning_effort: effective_reasoning_effort,
status,
let agent_snapshot = match new_thread_id {
Some(thread_id) => {
session
.services
.agent_control
.get_agent_config_snapshot(thread_id)
.await
}
.into(),
)
.await;
let _ = result?;
let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME);
turn.session_telemetry.counter(
"codex.multi_agent.spawn",
/*inc*/ 1,
&[("role", role_tag)],
);
let task_name = new_agent_path.ok_or_else(|| {
FunctionCallError::RespondToModel(
"spawned agent is missing a canonical task name".to_string(),
)
})?;
None => None,
};
let (new_agent_path, new_agent_nickname, new_agent_role) =
match (&agent_snapshot, new_agent_metadata) {
(Some(snapshot), _) => (
snapshot.session_source.get_agent_path().map(String::from),
snapshot.session_source.get_nickname(),
snapshot.session_source.get_agent_role(),
),
(None, Some(metadata)) => (
metadata.agent_path.map(String::from),
metadata.agent_nickname,
metadata.agent_role,
),
(None, None) => (None, None, None),
};
let effective_model = agent_snapshot
.as_ref()
.map(|snapshot| snapshot.model.clone())
.unwrap_or_else(|| args.model.clone().unwrap_or_default());
let effective_reasoning_effort = agent_snapshot
.as_ref()
.and_then(|snapshot| snapshot.reasoning_effort)
.unwrap_or(args.reasoning_effort.unwrap_or_default());
let nickname = new_agent_nickname.clone();
session
.send_event(
&turn,
CollabAgentSpawnEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
new_thread_id,
new_agent_nickname,
new_agent_role,
prompt,
model: effective_model,
reasoning_effort: effective_reasoning_effort,
status,
}
.into(),
)
.await;
let _ = result?;
let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME);
turn.session_telemetry.counter(
"codex.multi_agent.spawn",
/*inc*/ 1,
&[("role", role_tag)],
);
let task_name = new_agent_path.ok_or_else(|| {
FunctionCallError::RespondToModel(
"spawned agent is missing a canonical task name".to_string(),
)
})?;
let hide_agent_metadata = turn.config.multi_agent_v2.hide_spawn_agent_metadata;
if hide_agent_metadata {
Ok(SpawnAgentResult::HiddenMetadata { task_name })
} else {
Ok(SpawnAgentResult::WithNickname {
task_name,
nickname,
})
}
let hide_agent_metadata = turn.config.multi_agent_v2.hide_spawn_agent_metadata;
if hide_agent_metadata {
Ok(SpawnAgentResult::HiddenMetadata { task_name })
} else {
Ok(SpawnAgentResult::WithNickname {
task_name,
nickname,
})
}
})
}
}