From 9e7cdbd0d28a4400a70339042e4643b4f462da6c Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Tue, 12 May 2026 17:22:25 -0700 Subject: [PATCH] core: box multi-agent handler futures (#22266) ## Why This is the base PR in the split stack for the permissions migration. It isolates stack-safety work that had been mixed into the larger permissions PR, so reviewers can evaluate the async-future changes separately from the permissions model changes in #22267. The main risk this addresses is large or recursive multi-agent futures overflowing smaller runner stacks. A follow-up review also called out that `shutdown_live_agent` must remain quiescent: callers should not remove a live agent from tracking or release its spawn slot until the worker loop has actually terminated. ## What Changed - Boxes the large async futures in the multi-agent spawn, resume, and close tool handlers. - Boxes the `AgentControl` spawn and recursive close/shutdown paths that can otherwise build very deep futures. - Keeps `shutdown_live_agent` waiting for thread termination before removing/releasing the live agent, preserving the previous shutdown ordering while still boxing the recursive close path. ## Verification Strategy The focused local coverage was `cargo test -p codex-core multi_agents`, which exercises the multi-agent spawn/resume/close handlers, cascade close/resume behavior, and the shutdown path touched by this PR. --- [//]: # (BEGIN SAPLING FOOTER) Stack created with [Sapling](https://sapling-scm.com). Best reviewed with [ReviewStack](https://reviewstack.dev/openai/codex/pull/22266). * #22330 * #22329 * #22328 * #22327 * __->__ #22266 --- codex-rs/core/src/agent/control.rs | 31 +- .../handlers/multi_agents/close_agent.rs | 167 ++++---- .../handlers/multi_agents/resume_agent.rs | 219 +++++------ .../src/tools/handlers/multi_agents/spawn.rs | 309 +++++++-------- .../src/tools/handlers/multi_agents_tests.rs | 3 +- .../handlers/multi_agents_v2/close_agent.rs | 191 +++++----- .../tools/handlers/multi_agents_v2/spawn.rs | 356 +++++++++--------- 7 files changed, 656 insertions(+), 620 deletions(-) diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index 079ee61f01..d973f40e45 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -233,32 +233,31 @@ impl AgentControl { // The same `AgentControl` is sent to spawn the thread. let new_thread = match (session_source, options.fork_mode.as_ref()) { (Some(session_source), Some(_)) => { - self.spawn_forked_thread( + Box::pin(self.spawn_forked_thread( &state, config, session_source, &options, inherited_shell_snapshot, inherited_exec_policy, - ) + )) .await? } (Some(session_source), None) => { - state - .spawn_new_thread_with_source( - config.clone(), - self.clone(), - session_source, - /*thread_source*/ Some(ThreadSource::Subagent), - /*persist_extended_history*/ false, - /*metrics_service_name*/ None, - inherited_shell_snapshot, - inherited_exec_policy, - options.environments.clone(), - ) - .await? + Box::pin(state.spawn_new_thread_with_source( + config.clone(), + self.clone(), + session_source, + /*thread_source*/ Some(ThreadSource::Subagent), + /*persist_extended_history*/ false, + /*metrics_service_name*/ None, + inherited_shell_snapshot, + inherited_exec_policy, + options.environments.clone(), + )) + .await? } - (None, _) => state.spawn_new_thread(config.clone(), self.clone()).await?, + (None, _) => Box::pin(state.spawn_new_thread(config.clone(), self.clone())).await?, }; agent_metadata.agent_id = Some(new_thread.thread_id); reservation.commit(agent_metadata.clone()); diff --git a/codex-rs/core/src/tools/handlers/multi_agents/close_agent.rs b/codex-rs/core/src/tools/handlers/multi_agents/close_agent.rs index e4788dd9b8..d87bb6c66c 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents/close_agent.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents/close_agent.rs @@ -20,88 +20,97 @@ impl ToolHandler for Handler { matches!(payload, ToolPayload::Function { .. }) } - async fn handle(&self, invocation: ToolInvocation) -> Result { - let ToolInvocation { - session, - turn, - payload, - call_id, - .. - } = invocation; - let arguments = function_arguments(payload)?; - let args: CloseAgentArgs = parse_arguments(&arguments)?; - let agent_id = parse_agent_id_target(&args.target)?; - let receiver_agent = session - .services - .agent_control - .get_agent_metadata(agent_id) - .unwrap_or_default(); - session - .send_event( - &turn, - CollabCloseBeginEvent { - call_id: call_id.clone(), - started_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.conversation_id, - receiver_thread_id: agent_id, - } - .into(), - ) - .await; - let status = match session - .services - .agent_control - .subscribe_status(agent_id) - .await - { - Ok(mut status_rx) => status_rx.borrow_and_update().clone(), - Err(err) => { - let status = session.services.agent_control.get_status(agent_id).await; - session - .send_event( - &turn, - CollabCloseEndEvent { - call_id: call_id.clone(), - completed_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.conversation_id, - receiver_thread_id: agent_id, - receiver_agent_nickname: receiver_agent.agent_nickname.clone(), - receiver_agent_role: receiver_agent.agent_role.clone(), - status, - } - .into(), - ) - .await; - return Err(collab_agent_error(agent_id, err)); - } - }; - let result = Box::pin(session.services.agent_control.close_agent(agent_id)) - .await - .map_err(|err| collab_agent_error(agent_id, err)) - .map(|_| ()); - session - .send_event( - &turn, - CollabCloseEndEvent { - call_id, - completed_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.conversation_id, - receiver_thread_id: agent_id, - receiver_agent_nickname: receiver_agent.agent_nickname, - receiver_agent_role: receiver_agent.agent_role, - status: status.clone(), - } - .into(), - ) - .await; - result?; - - Ok(CloseAgentResult { - previous_status: status, - }) + fn handle( + &self, + invocation: ToolInvocation, + ) -> impl std::future::Future> + Send { + Box::pin(handle_close_agent(invocation)) } } +async fn handle_close_agent( + invocation: ToolInvocation, +) -> Result { + let ToolInvocation { + session, + turn, + payload, + call_id, + .. + } = invocation; + let arguments = function_arguments(payload)?; + let args: CloseAgentArgs = parse_arguments(&arguments)?; + let agent_id = parse_agent_id_target(&args.target)?; + let receiver_agent = session + .services + .agent_control + .get_agent_metadata(agent_id) + .unwrap_or_default(); + session + .send_event( + &turn, + CollabCloseBeginEvent { + call_id: call_id.clone(), + started_at_ms: now_unix_timestamp_ms(), + sender_thread_id: session.conversation_id, + receiver_thread_id: agent_id, + } + .into(), + ) + .await; + let status = match session + .services + .agent_control + .subscribe_status(agent_id) + .await + { + Ok(mut status_rx) => status_rx.borrow_and_update().clone(), + Err(err) => { + let status = session.services.agent_control.get_status(agent_id).await; + session + .send_event( + &turn, + CollabCloseEndEvent { + call_id: call_id.clone(), + completed_at_ms: now_unix_timestamp_ms(), + sender_thread_id: session.conversation_id, + receiver_thread_id: agent_id, + receiver_agent_nickname: receiver_agent.agent_nickname.clone(), + receiver_agent_role: receiver_agent.agent_role.clone(), + status, + } + .into(), + ) + .await; + return Err(collab_agent_error(agent_id, err)); + } + }; + let result = Box::pin(session.services.agent_control.close_agent(agent_id)) + .await + .map_err(|err| collab_agent_error(agent_id, err)) + .map(|_| ()); + session + .send_event( + &turn, + CollabCloseEndEvent { + call_id, + completed_at_ms: now_unix_timestamp_ms(), + sender_thread_id: session.conversation_id, + receiver_thread_id: agent_id, + receiver_agent_nickname: receiver_agent.agent_nickname, + receiver_agent_role: receiver_agent.agent_role, + status: status.clone(), + } + .into(), + ) + .await; + result?; + + Ok(CloseAgentResult { + previous_status: status, + }) +} + #[derive(Debug, Deserialize, Serialize)] pub(crate) struct CloseAgentResult { pub(crate) previous_status: AgentStatus, diff --git a/codex-rs/core/src/tools/handlers/multi_agents/resume_agent.rs b/codex-rs/core/src/tools/handlers/multi_agents/resume_agent.rs index fa77bc37bb..c8cc0f70b7 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents/resume_agent.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents/resume_agent.rs @@ -22,114 +22,123 @@ impl ToolHandler for Handler { matches!(payload, ToolPayload::Function { .. }) } - async fn handle(&self, invocation: ToolInvocation) -> Result { - let ToolInvocation { - session, - turn, - payload, - call_id, - .. - } = invocation; - let arguments = function_arguments(payload)?; - let args: ResumeAgentArgs = parse_arguments(&arguments)?; - let receiver_thread_id = ThreadId::from_string(&args.id).map_err(|err| { - FunctionCallError::RespondToModel(format!("invalid agent id {}: {err:?}", args.id)) - })?; - let receiver_agent = session - .services - .agent_control - .get_agent_metadata(receiver_thread_id) - .unwrap_or_default(); - let child_depth = next_thread_spawn_depth(&turn.session_source); - let max_depth = turn.config.agent_max_depth; - if exceeds_thread_spawn_depth_limit(child_depth, max_depth) { - return Err(FunctionCallError::RespondToModel( - "Agent depth limit reached. Solve the task yourself.".to_string(), - )); - } - - session - .send_event( - &turn, - CollabResumeBeginEvent { - call_id: call_id.clone(), - started_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.conversation_id, - receiver_thread_id, - receiver_agent_nickname: receiver_agent.agent_nickname.clone(), - receiver_agent_role: receiver_agent.agent_role.clone(), - } - .into(), - ) - .await; - - let mut status = session - .services - .agent_control - .get_status(receiver_thread_id) - .await; - let (receiver_agent, error) = if matches!(status, AgentStatus::NotFound) { - match Box::pin(try_resume_closed_agent( - &session, - &turn, - receiver_thread_id, - child_depth, - )) - .await - { - Ok(()) => { - status = session - .services - .agent_control - .get_status(receiver_thread_id) - .await; - ( - session - .services - .agent_control - .get_agent_metadata(receiver_thread_id) - .unwrap_or(receiver_agent), - None, - ) - } - Err(err) => { - status = session - .services - .agent_control - .get_status(receiver_thread_id) - .await; - (receiver_agent, Some(err)) - } - } - } else { - (receiver_agent, None) - }; - session - .send_event( - &turn, - CollabResumeEndEvent { - call_id, - completed_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.conversation_id, - receiver_thread_id, - receiver_agent_nickname: receiver_agent.agent_nickname, - receiver_agent_role: receiver_agent.agent_role, - status: status.clone(), - } - .into(), - ) - .await; - - if let Some(err) = error { - return Err(err); - } - turn.session_telemetry - .counter("codex.multi_agent.resume", /*inc*/ 1, &[]); - - Ok(ResumeAgentResult { status }) + fn handle( + &self, + invocation: ToolInvocation, + ) -> impl std::future::Future> + Send { + Box::pin(handle_resume_agent(invocation)) } } +async fn handle_resume_agent( + invocation: ToolInvocation, +) -> Result { + let ToolInvocation { + session, + turn, + payload, + call_id, + .. + } = invocation; + let arguments = function_arguments(payload)?; + let args: ResumeAgentArgs = parse_arguments(&arguments)?; + let receiver_thread_id = ThreadId::from_string(&args.id).map_err(|err| { + FunctionCallError::RespondToModel(format!("invalid agent id {}: {err:?}", args.id)) + })?; + let receiver_agent = session + .services + .agent_control + .get_agent_metadata(receiver_thread_id) + .unwrap_or_default(); + let child_depth = next_thread_spawn_depth(&turn.session_source); + let max_depth = turn.config.agent_max_depth; + if exceeds_thread_spawn_depth_limit(child_depth, max_depth) { + return Err(FunctionCallError::RespondToModel( + "Agent depth limit reached. Solve the task yourself.".to_string(), + )); + } + + session + .send_event( + &turn, + CollabResumeBeginEvent { + call_id: call_id.clone(), + started_at_ms: now_unix_timestamp_ms(), + sender_thread_id: session.conversation_id, + receiver_thread_id, + receiver_agent_nickname: receiver_agent.agent_nickname.clone(), + receiver_agent_role: receiver_agent.agent_role.clone(), + } + .into(), + ) + .await; + + let mut status = session + .services + .agent_control + .get_status(receiver_thread_id) + .await; + let (receiver_agent, error) = if matches!(status, AgentStatus::NotFound) { + match Box::pin(try_resume_closed_agent( + &session, + &turn, + receiver_thread_id, + child_depth, + )) + .await + { + Ok(()) => { + status = session + .services + .agent_control + .get_status(receiver_thread_id) + .await; + ( + session + .services + .agent_control + .get_agent_metadata(receiver_thread_id) + .unwrap_or(receiver_agent), + None, + ) + } + Err(err) => { + status = session + .services + .agent_control + .get_status(receiver_thread_id) + .await; + (receiver_agent, Some(err)) + } + } + } else { + (receiver_agent, None) + }; + session + .send_event( + &turn, + CollabResumeEndEvent { + call_id, + completed_at_ms: now_unix_timestamp_ms(), + sender_thread_id: session.conversation_id, + receiver_thread_id, + receiver_agent_nickname: receiver_agent.agent_nickname, + receiver_agent_role: receiver_agent.agent_role, + status: status.clone(), + } + .into(), + ) + .await; + + if let Some(err) = error { + return Err(err); + } + turn.session_telemetry + .counter("codex.multi_agent.resume", /*inc*/ 1, &[]); + + Ok(ResumeAgentResult { status }) +} + #[derive(Debug, Deserialize)] struct ResumeAgentArgs { id: String, diff --git a/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs b/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs index 0317658955..8ac5ca0854 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs @@ -37,161 +37,166 @@ impl ToolHandler for Handler { matches!(payload, ToolPayload::Function { .. }) } - async fn handle(&self, invocation: ToolInvocation) -> Result { - let ToolInvocation { - session, - turn, - payload, - call_id, - .. - } = invocation; - let arguments = function_arguments(payload)?; - let args: SpawnAgentArgs = parse_arguments(&arguments)?; - let role_name = args - .agent_type - .as_deref() - .map(str::trim) - .filter(|role| !role.is_empty()); - let input_items = parse_collab_input(args.message, args.items)?; - let prompt = render_input_preview(&input_items); - let session_source = turn.session_source.clone(); - let child_depth = next_thread_spawn_depth(&session_source); - let max_depth = turn.config.agent_max_depth; - if exceeds_thread_spawn_depth_limit(child_depth, max_depth) { - return Err(FunctionCallError::RespondToModel( - "Agent depth limit reached. Solve the task yourself.".to_string(), - )); - } - session - .send_event( - &turn, - CollabAgentSpawnBeginEvent { - call_id: call_id.clone(), - started_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.conversation_id, - prompt: prompt.clone(), - model: args.model.clone().unwrap_or_default(), - reasoning_effort: args.reasoning_effort.unwrap_or_default(), - } - .into(), - ) - .await; - let mut config = - build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?; - if args.fork_context { - reject_full_fork_spawn_overrides( - role_name, - args.model.as_deref(), - args.reasoning_effort, - )?; - } else { - apply_requested_spawn_agent_model_overrides( - &session, - turn.as_ref(), - &mut config, - args.model.as_deref(), - args.reasoning_effort, - ) - .await?; - apply_role_to_config(&mut config, role_name) - .await - .map_err(FunctionCallError::RespondToModel)?; - } - apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?; - apply_spawn_agent_overrides(&mut config, child_depth); - - let result = Box::pin(session.services.agent_control.spawn_agent_with_metadata( - config, - input_items, - Some(thread_spawn_source( - session.conversation_id, - &turn.session_source, - child_depth, - role_name, - /*task_name*/ None, - )?), - SpawnAgentOptions { - fork_parent_spawn_call_id: args.fork_context.then(|| call_id.clone()), - fork_mode: args.fork_context.then_some(SpawnAgentForkMode::FullHistory), - environments: Some(turn.environments.to_selections()), - }, - )) - .await - .map_err(collab_spawn_error); - let (new_thread_id, new_agent_metadata, status) = match &result { - Ok(spawned_agent) => ( - Some(spawned_agent.thread_id), - Some(spawned_agent.metadata.clone()), - spawned_agent.status.clone(), - ), - Err(_) => (None, None, AgentStatus::NotFound), - }; - let agent_snapshot = match new_thread_id { - Some(thread_id) => { - session - .services - .agent_control - .get_agent_config_snapshot(thread_id) - .await - } - None => None, - }; - let (_new_agent_path, new_agent_nickname, new_agent_role) = - match (&agent_snapshot, new_agent_metadata) { - (Some(snapshot), _) => ( - snapshot.session_source.get_agent_path().map(String::from), - snapshot.session_source.get_nickname(), - snapshot.session_source.get_agent_role(), - ), - (None, Some(metadata)) => ( - metadata.agent_path.map(String::from), - metadata.agent_nickname, - metadata.agent_role, - ), - (None, None) => (None, None, None), - }; - let effective_model = agent_snapshot - .as_ref() - .map(|snapshot| snapshot.model.clone()) - .unwrap_or_else(|| args.model.clone().unwrap_or_default()); - let effective_reasoning_effort = agent_snapshot - .as_ref() - .and_then(|snapshot| snapshot.reasoning_effort) - .unwrap_or(args.reasoning_effort.unwrap_or_default()); - let nickname = new_agent_nickname.clone(); - session - .send_event( - &turn, - CollabAgentSpawnEndEvent { - call_id, - completed_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.conversation_id, - new_thread_id, - new_agent_nickname, - new_agent_role, - prompt, - model: effective_model, - reasoning_effort: effective_reasoning_effort, - status, - } - .into(), - ) - .await; - let new_thread_id = result?.thread_id; - let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME); - turn.session_telemetry.counter( - "codex.multi_agent.spawn", - /*inc*/ 1, - &[("role", role_tag)], - ); - - Ok(SpawnAgentResult { - agent_id: new_thread_id.to_string(), - nickname, - }) + fn handle( + &self, + invocation: ToolInvocation, + ) -> impl std::future::Future> + Send { + Box::pin(handle_spawn_agent(invocation)) } } +async fn handle_spawn_agent( + invocation: ToolInvocation, +) -> Result { + let ToolInvocation { + session, + turn, + payload, + call_id, + .. + } = invocation; + let arguments = function_arguments(payload)?; + let args: SpawnAgentArgs = parse_arguments(&arguments)?; + let role_name = args + .agent_type + .as_deref() + .map(str::trim) + .filter(|role| !role.is_empty()); + let input_items = parse_collab_input(args.message, args.items)?; + let prompt = render_input_preview(&input_items); + let session_source = turn.session_source.clone(); + let child_depth = next_thread_spawn_depth(&session_source); + let max_depth = turn.config.agent_max_depth; + if exceeds_thread_spawn_depth_limit(child_depth, max_depth) { + return Err(FunctionCallError::RespondToModel( + "Agent depth limit reached. Solve the task yourself.".to_string(), + )); + } + session + .send_event( + &turn, + CollabAgentSpawnBeginEvent { + call_id: call_id.clone(), + started_at_ms: now_unix_timestamp_ms(), + sender_thread_id: session.conversation_id, + prompt: prompt.clone(), + model: args.model.clone().unwrap_or_default(), + reasoning_effort: args.reasoning_effort.unwrap_or_default(), + } + .into(), + ) + .await; + let mut config = + build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?; + if args.fork_context { + reject_full_fork_spawn_overrides(role_name, args.model.as_deref(), args.reasoning_effort)?; + } else { + apply_requested_spawn_agent_model_overrides( + &session, + turn.as_ref(), + &mut config, + args.model.as_deref(), + args.reasoning_effort, + ) + .await?; + apply_role_to_config(&mut config, role_name) + .await + .map_err(FunctionCallError::RespondToModel)?; + } + apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?; + apply_spawn_agent_overrides(&mut config, child_depth); + + let result = Box::pin(session.services.agent_control.spawn_agent_with_metadata( + config, + input_items, + Some(thread_spawn_source( + session.conversation_id, + &turn.session_source, + child_depth, + role_name, + /*task_name*/ None, + )?), + SpawnAgentOptions { + fork_parent_spawn_call_id: args.fork_context.then(|| call_id.clone()), + fork_mode: args.fork_context.then_some(SpawnAgentForkMode::FullHistory), + environments: Some(turn.environments.to_selections()), + }, + )) + .await + .map_err(collab_spawn_error); + let (new_thread_id, new_agent_metadata, status) = match &result { + Ok(spawned_agent) => ( + Some(spawned_agent.thread_id), + Some(spawned_agent.metadata.clone()), + spawned_agent.status.clone(), + ), + Err(_) => (None, None, AgentStatus::NotFound), + }; + let agent_snapshot = match new_thread_id { + Some(thread_id) => { + session + .services + .agent_control + .get_agent_config_snapshot(thread_id) + .await + } + None => None, + }; + let (_new_agent_path, new_agent_nickname, new_agent_role) = + match (&agent_snapshot, new_agent_metadata) { + (Some(snapshot), _) => ( + snapshot.session_source.get_agent_path().map(String::from), + snapshot.session_source.get_nickname(), + snapshot.session_source.get_agent_role(), + ), + (None, Some(metadata)) => ( + metadata.agent_path.map(String::from), + metadata.agent_nickname, + metadata.agent_role, + ), + (None, None) => (None, None, None), + }; + let effective_model = agent_snapshot + .as_ref() + .map(|snapshot| snapshot.model.clone()) + .unwrap_or_else(|| args.model.clone().unwrap_or_default()); + let effective_reasoning_effort = agent_snapshot + .as_ref() + .and_then(|snapshot| snapshot.reasoning_effort) + .unwrap_or(args.reasoning_effort.unwrap_or_default()); + let nickname = new_agent_nickname.clone(); + session + .send_event( + &turn, + CollabAgentSpawnEndEvent { + call_id, + completed_at_ms: now_unix_timestamp_ms(), + sender_thread_id: session.conversation_id, + new_thread_id, + new_agent_nickname, + new_agent_role, + prompt, + model: effective_model, + reasoning_effort: effective_reasoning_effort, + status, + } + .into(), + ) + .await; + let new_thread_id = result?.thread_id; + let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME); + turn.session_telemetry.counter( + "codex.multi_agent.spawn", + /*inc*/ 1, + &[("role", role_tag)], + ); + + Ok(SpawnAgentResult { + agent_id: new_thread_id.to_string(), + nickname, + }) +} + #[derive(Debug, Deserialize)] struct SpawnAgentArgs { message: Option, diff --git a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs index b030cb010c..d1360186a0 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs @@ -3179,10 +3179,11 @@ async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtr let parent_thread_id = parent.thread_id; let parent_session = parent.thread.codex.session.clone(); + let child_turn = parent_session.new_default_turn().await; let child_spawn_output = SpawnAgentHandler::default() .handle(invocation( parent_session.clone(), - parent_session.new_default_turn().await, + child_turn, "spawn_agent", function_payload(json!({"message": "hello child"})), )) diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/close_agent.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/close_agent.rs index 6f2984c338..246b971739 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/close_agent.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/close_agent.rs @@ -20,100 +20,109 @@ impl ToolHandler for Handler { matches!(payload, ToolPayload::Function { .. }) } - async fn handle(&self, invocation: ToolInvocation) -> Result { - let ToolInvocation { - session, - turn, - payload, - call_id, - .. - } = invocation; - let arguments = function_arguments(payload)?; - let args: CloseAgentArgs = parse_arguments(&arguments)?; - let agent_id = resolve_agent_target(&session, &turn, &args.target).await?; - let receiver_agent = session - .services - .agent_control - .get_agent_metadata(agent_id) - .unwrap_or_default(); - if receiver_agent - .agent_path - .as_ref() - .is_some_and(AgentPath::is_root) - { - return Err(FunctionCallError::RespondToModel( - "root is not a spawned agent".to_string(), - )); - } - session - .send_event( - &turn, - CollabCloseBeginEvent { - call_id: call_id.clone(), - started_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.conversation_id, - receiver_thread_id: agent_id, - } - .into(), - ) - .await; - let status = match session - .services - .agent_control - .subscribe_status(agent_id) - .await - { - Ok(mut status_rx) => status_rx.borrow_and_update().clone(), - Err(err) => { - let status = session.services.agent_control.get_status(agent_id).await; - session - .send_event( - &turn, - CollabCloseEndEvent { - call_id: call_id.clone(), - completed_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.conversation_id, - receiver_thread_id: agent_id, - receiver_agent_nickname: receiver_agent.agent_nickname.clone(), - receiver_agent_role: receiver_agent.agent_role.clone(), - status, - } - .into(), - ) - .await; - return Err(collab_agent_error(agent_id, err)); - } - }; - let result = session - .services - .agent_control - .close_agent(agent_id) - .await - .map_err(|err| collab_agent_error(agent_id, err)) - .map(|_| ()); - session - .send_event( - &turn, - CollabCloseEndEvent { - call_id, - completed_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.conversation_id, - receiver_thread_id: agent_id, - receiver_agent_nickname: receiver_agent.agent_nickname, - receiver_agent_role: receiver_agent.agent_role, - status: status.clone(), - } - .into(), - ) - .await; - result?; - - Ok(CloseAgentResult { - previous_status: status, - }) + fn handle( + &self, + invocation: ToolInvocation, + ) -> impl std::future::Future> + Send { + Box::pin(handle_close_agent(invocation)) } } +async fn handle_close_agent( + invocation: ToolInvocation, +) -> Result { + let ToolInvocation { + session, + turn, + payload, + call_id, + .. + } = invocation; + let arguments = function_arguments(payload)?; + let args: CloseAgentArgs = parse_arguments(&arguments)?; + let agent_id = resolve_agent_target(&session, &turn, &args.target).await?; + let receiver_agent = session + .services + .agent_control + .get_agent_metadata(agent_id) + .unwrap_or_default(); + if receiver_agent + .agent_path + .as_ref() + .is_some_and(AgentPath::is_root) + { + return Err(FunctionCallError::RespondToModel( + "root is not a spawned agent".to_string(), + )); + } + session + .send_event( + &turn, + CollabCloseBeginEvent { + call_id: call_id.clone(), + started_at_ms: now_unix_timestamp_ms(), + sender_thread_id: session.conversation_id, + receiver_thread_id: agent_id, + } + .into(), + ) + .await; + let status = match session + .services + .agent_control + .subscribe_status(agent_id) + .await + { + Ok(mut status_rx) => status_rx.borrow_and_update().clone(), + Err(err) => { + let status = session.services.agent_control.get_status(agent_id).await; + session + .send_event( + &turn, + CollabCloseEndEvent { + call_id: call_id.clone(), + completed_at_ms: now_unix_timestamp_ms(), + sender_thread_id: session.conversation_id, + receiver_thread_id: agent_id, + receiver_agent_nickname: receiver_agent.agent_nickname.clone(), + receiver_agent_role: receiver_agent.agent_role.clone(), + status, + } + .into(), + ) + .await; + return Err(collab_agent_error(agent_id, err)); + } + }; + let result = session + .services + .agent_control + .close_agent(agent_id) + .await + .map_err(|err| collab_agent_error(agent_id, err)) + .map(|_| ()); + session + .send_event( + &turn, + CollabCloseEndEvent { + call_id, + completed_at_ms: now_unix_timestamp_ms(), + sender_thread_id: session.conversation_id, + receiver_thread_id: agent_id, + receiver_agent_nickname: receiver_agent.agent_nickname, + receiver_agent_role: receiver_agent.agent_role, + status: status.clone(), + } + .into(), + ) + .await; + result?; + + Ok(CloseAgentResult { + previous_status: status, + }) +} + #[derive(Debug, Deserialize)] #[serde(deny_unknown_fields)] struct CloseAgentArgs { diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs index 6642c5e4b0..9544c0b9f8 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs @@ -39,188 +39,192 @@ impl ToolHandler for Handler { matches!(payload, ToolPayload::Function { .. }) } - async fn handle(&self, invocation: ToolInvocation) -> Result { - let ToolInvocation { - session, - turn, - payload, - call_id, - .. - } = invocation; - let arguments = function_arguments(payload)?; - let args: SpawnAgentArgs = parse_arguments(&arguments)?; - let fork_mode = args.fork_mode()?; - let role_name = args - .agent_type - .as_deref() - .map(str::trim) - .filter(|role| !role.is_empty()); + fn handle( + &self, + invocation: ToolInvocation, + ) -> impl std::future::Future> + Send { + Box::pin(handle_spawn_agent(invocation)) + } +} - let initial_operation = parse_collab_input(Some(args.message), /*items*/ None)?; - let prompt = render_input_preview(&initial_operation); +async fn handle_spawn_agent( + invocation: ToolInvocation, +) -> Result { + let ToolInvocation { + session, + turn, + payload, + call_id, + .. + } = invocation; + let arguments = function_arguments(payload)?; + let args: SpawnAgentArgs = parse_arguments(&arguments)?; + let fork_mode = args.fork_mode()?; + let role_name = args + .agent_type + .as_deref() + .map(str::trim) + .filter(|role| !role.is_empty()); - let session_source = turn.session_source.clone(); - let child_depth = next_thread_spawn_depth(&session_source); - session - .send_event( - &turn, - CollabAgentSpawnBeginEvent { - call_id: call_id.clone(), - started_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.conversation_id, - prompt: prompt.clone(), - model: args.model.clone().unwrap_or_default(), - reasoning_effort: args.reasoning_effort.unwrap_or_default(), - } - .into(), - ) - .await; - let mut config = - build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?; - if matches!(fork_mode, Some(SpawnAgentForkMode::FullHistory)) { - reject_full_fork_spawn_overrides( - role_name, - args.model.as_deref(), - args.reasoning_effort, - )?; - } else { - apply_requested_spawn_agent_model_overrides( - &session, - turn.as_ref(), - &mut config, - args.model.as_deref(), - args.reasoning_effort, - ) - .await?; - apply_role_to_config(&mut config, role_name) - .await - .map_err(FunctionCallError::RespondToModel)?; - } - apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?; - apply_spawn_agent_overrides(&mut config, child_depth); + let initial_operation = parse_collab_input(Some(args.message), /*items*/ None)?; + let prompt = render_input_preview(&initial_operation); - let spawn_source = thread_spawn_source( - session.conversation_id, - &turn.session_source, - child_depth, - role_name, - Some(args.task_name.clone()), - )?; - let result = session - .services - .agent_control - .spawn_agent_with_metadata( - config, - match (spawn_source.get_agent_path(), initial_operation) { - (Some(recipient), Op::UserInput { items, .. }) - if items - .iter() - .all(|item| matches!(item, UserInput::Text { .. })) => - { - Op::InterAgentCommunication { - communication: InterAgentCommunication::new( - turn.session_source - .get_agent_path() - .unwrap_or_else(AgentPath::root), - recipient, - Vec::new(), - prompt.clone(), - /*trigger_turn*/ true, - ), - } - } - (_, initial_operation) => initial_operation, - }, - Some(spawn_source), - SpawnAgentOptions { - fork_parent_spawn_call_id: fork_mode.as_ref().map(|_| call_id.clone()), - fork_mode, - environments: Some(turn.environments.to_selections()), - }, - ) - .await - .map_err(collab_spawn_error); - let (new_thread_id, new_agent_metadata, status) = match &result { - Ok(spawned_agent) => ( - Some(spawned_agent.thread_id), - Some(spawned_agent.metadata.clone()), - spawned_agent.status.clone(), - ), - Err(_) => (None, None, AgentStatus::NotFound), - }; - let agent_snapshot = match new_thread_id { - Some(thread_id) => { - session - .services - .agent_control - .get_agent_config_snapshot(thread_id) - .await + let session_source = turn.session_source.clone(); + let child_depth = next_thread_spawn_depth(&session_source); + session + .send_event( + &turn, + CollabAgentSpawnBeginEvent { + call_id: call_id.clone(), + started_at_ms: now_unix_timestamp_ms(), + sender_thread_id: session.conversation_id, + prompt: prompt.clone(), + model: args.model.clone().unwrap_or_default(), + reasoning_effort: args.reasoning_effort.unwrap_or_default(), } - None => None, - }; - let (new_agent_path, new_agent_nickname, new_agent_role) = - match (&agent_snapshot, new_agent_metadata) { - (Some(snapshot), _) => ( - snapshot.session_source.get_agent_path().map(String::from), - snapshot.session_source.get_nickname(), - snapshot.session_source.get_agent_role(), - ), - (None, Some(metadata)) => ( - metadata.agent_path.map(String::from), - metadata.agent_nickname, - metadata.agent_role, - ), - (None, None) => (None, None, None), - }; - let effective_model = agent_snapshot - .as_ref() - .map(|snapshot| snapshot.model.clone()) - .unwrap_or_else(|| args.model.clone().unwrap_or_default()); - let effective_reasoning_effort = agent_snapshot - .as_ref() - .and_then(|snapshot| snapshot.reasoning_effort) - .unwrap_or(args.reasoning_effort.unwrap_or_default()); - let nickname = new_agent_nickname.clone(); - session - .send_event( - &turn, - CollabAgentSpawnEndEvent { - call_id, - completed_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.conversation_id, - new_thread_id, - new_agent_nickname, - new_agent_role, - prompt, - model: effective_model, - reasoning_effort: effective_reasoning_effort, - status, - } - .into(), - ) - .await; - let _ = result?; - let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME); - turn.session_telemetry.counter( - "codex.multi_agent.spawn", - /*inc*/ 1, - &[("role", role_tag)], - ); - let task_name = new_agent_path.ok_or_else(|| { - FunctionCallError::RespondToModel( - "spawned agent is missing a canonical task name".to_string(), - ) - })?; + .into(), + ) + .await; + let mut config = + build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?; + if matches!(fork_mode, Some(SpawnAgentForkMode::FullHistory)) { + reject_full_fork_spawn_overrides(role_name, args.model.as_deref(), args.reasoning_effort)?; + } else { + apply_requested_spawn_agent_model_overrides( + &session, + turn.as_ref(), + &mut config, + args.model.as_deref(), + args.reasoning_effort, + ) + .await?; + apply_role_to_config(&mut config, role_name) + .await + .map_err(FunctionCallError::RespondToModel)?; + } + apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?; + apply_spawn_agent_overrides(&mut config, child_depth); - let hide_agent_metadata = turn.config.multi_agent_v2.hide_spawn_agent_metadata; - if hide_agent_metadata { - Ok(SpawnAgentResult::HiddenMetadata { task_name }) - } else { - Ok(SpawnAgentResult::WithNickname { - task_name, - nickname, - }) + let spawn_source = thread_spawn_source( + session.conversation_id, + &turn.session_source, + child_depth, + role_name, + Some(args.task_name.clone()), + )?; + let result = Box::pin( + session.services.agent_control.spawn_agent_with_metadata( + config, + match (spawn_source.get_agent_path(), initial_operation) { + (Some(recipient), Op::UserInput { items, .. }) + if items + .iter() + .all(|item| matches!(item, UserInput::Text { .. })) => + { + Op::InterAgentCommunication { + communication: InterAgentCommunication::new( + turn.session_source + .get_agent_path() + .unwrap_or_else(AgentPath::root), + recipient, + Vec::new(), + prompt.clone(), + /*trigger_turn*/ true, + ), + } + } + (_, initial_operation) => initial_operation, + }, + Some(spawn_source), + SpawnAgentOptions { + fork_parent_spawn_call_id: fork_mode.as_ref().map(|_| call_id.clone()), + fork_mode, + environments: Some(turn.environments.to_selections()), + }, + ), + ) + .await + .map_err(collab_spawn_error); + let (new_thread_id, new_agent_metadata, status) = match &result { + Ok(spawned_agent) => ( + Some(spawned_agent.thread_id), + Some(spawned_agent.metadata.clone()), + spawned_agent.status.clone(), + ), + Err(_) => (None, None, AgentStatus::NotFound), + }; + let agent_snapshot = match new_thread_id { + Some(thread_id) => { + session + .services + .agent_control + .get_agent_config_snapshot(thread_id) + .await } + None => None, + }; + let (new_agent_path, new_agent_nickname, new_agent_role) = + match (&agent_snapshot, new_agent_metadata) { + (Some(snapshot), _) => ( + snapshot.session_source.get_agent_path().map(String::from), + snapshot.session_source.get_nickname(), + snapshot.session_source.get_agent_role(), + ), + (None, Some(metadata)) => ( + metadata.agent_path.map(String::from), + metadata.agent_nickname, + metadata.agent_role, + ), + (None, None) => (None, None, None), + }; + let effective_model = agent_snapshot + .as_ref() + .map(|snapshot| snapshot.model.clone()) + .unwrap_or_else(|| args.model.clone().unwrap_or_default()); + let effective_reasoning_effort = agent_snapshot + .as_ref() + .and_then(|snapshot| snapshot.reasoning_effort) + .unwrap_or(args.reasoning_effort.unwrap_or_default()); + let nickname = new_agent_nickname.clone(); + session + .send_event( + &turn, + CollabAgentSpawnEndEvent { + call_id, + completed_at_ms: now_unix_timestamp_ms(), + sender_thread_id: session.conversation_id, + new_thread_id, + new_agent_nickname, + new_agent_role, + prompt, + model: effective_model, + reasoning_effort: effective_reasoning_effort, + status, + } + .into(), + ) + .await; + let _ = result?; + let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME); + turn.session_telemetry.counter( + "codex.multi_agent.spawn", + /*inc*/ 1, + &[("role", role_tag)], + ); + let task_name = new_agent_path.ok_or_else(|| { + FunctionCallError::RespondToModel( + "spawned agent is missing a canonical task name".to_string(), + ) + })?; + + let hide_agent_metadata = turn.config.multi_agent_v2.hide_spawn_agent_metadata; + if hide_agent_metadata { + Ok(SpawnAgentResult::HiddenMetadata { task_name }) + } else { + Ok(SpawnAgentResult::WithNickname { + task_name, + nickname, + }) } }