Compare commits

...

1 Commits

Author SHA1 Message Date
starr-openai
27d5ba2aae Add subagent parent mailbox messaging
Co-authored-by: Codex <noreply@openai.com>
2026-04-17 14:55:29 -07:00
16 changed files with 1026 additions and 19 deletions

View File

@@ -483,6 +483,7 @@ impl AgentControl {
) -> CodexResult<ThreadId> {
if let SessionSource::SubAgent(SubAgentSource::ThreadSpawn { depth, .. }) = &session_source
&& *depth >= config.agent_max_depth
&& !config.features.enabled(Feature::MultiAgentV2)
{
let _ = config.features.disable(Feature::SpawnCsv);
let _ = config.features.disable(Feature::Collab);
@@ -638,6 +639,37 @@ impl AgentControl {
result
}
pub(crate) async fn enqueue_inter_agent_communication(
&self,
agent_id: ThreadId,
communication: InterAgentCommunication,
) -> CodexResult<()> {
let last_task_message = communication.content.clone();
let state = self.upgrade()?;
let thread = state.get_thread(agent_id).await?;
thread
.codex
.session
.enqueue_mailbox_communication(communication);
self.state
.update_last_task_message(agent_id, last_task_message);
Ok(())
}
pub(crate) async fn maybe_start_turn_for_pending_work(
&self,
agent_id: ThreadId,
) -> CodexResult<()> {
let state = self.upgrade()?;
let thread = state.get_thread(agent_id).await?;
thread
.codex
.session
.maybe_start_turn_for_pending_work()
.await;
Ok(())
}
/// Interrupt the current task for an existing agent thread.
pub(crate) async fn interrupt_agent(&self, agent_id: ThreadId) -> CodexResult<String> {
let state = self.upgrade()?;
@@ -717,6 +749,12 @@ impl AgentControl {
thread.agent_status().await
}
pub(crate) async fn has_mailbox_waiters(&self, agent_id: ThreadId) -> CodexResult<bool> {
let state = self.upgrade()?;
let thread = state.get_thread(agent_id).await?;
Ok(thread.codex.session.has_mailbox_waiters())
}
pub(crate) fn register_session_root(
&self,
current_thread_id: ThreadId,
@@ -731,6 +769,10 @@ impl AgentControl {
self.state.agent_metadata_for_thread(agent_id)
}
pub(crate) fn agent_id_for_path(&self, agent_path: &AgentPath) -> Option<ThreadId> {
self.state.agent_id_for_path(agent_path)
}
pub(crate) async fn list_live_agent_subtree_thread_ids(
&self,
agent_id: ThreadId,

View File

@@ -1,6 +1,7 @@
use codex_protocol::protocol::InterAgentCommunication;
use std::collections::VecDeque;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use tokio::sync::mpsc;
use tokio::sync::watch;
@@ -12,6 +13,7 @@ pub(crate) struct Mailbox {
tx: mpsc::UnboundedSender<InterAgentCommunication>,
next_seq: AtomicU64,
seq_tx: watch::Sender<u64>,
waiter_count: AtomicUsize,
}
pub(crate) struct MailboxReceiver {
@@ -28,6 +30,7 @@ impl Mailbox {
tx,
next_seq: AtomicU64::new(0),
seq_tx,
waiter_count: AtomicUsize::new(0),
},
MailboxReceiver {
rx,
@@ -46,6 +49,25 @@ impl Mailbox {
self.seq_tx.send_replace(seq);
seq
}
pub(crate) fn begin_wait(&self) -> MailboxWaitGuard<'_> {
self.waiter_count.fetch_add(1, Ordering::Relaxed);
MailboxWaitGuard { mailbox: self }
}
pub(crate) fn has_waiters(&self) -> bool {
self.waiter_count.load(Ordering::Relaxed) > 0
}
}
pub(crate) struct MailboxWaitGuard<'a> {
mailbox: &'a Mailbox,
}
impl Drop for MailboxWaitGuard<'_> {
fn drop(&mut self) {
self.mailbox.waiter_count.fetch_sub(1, Ordering::Relaxed);
}
}
impl MailboxReceiver {

View File

@@ -469,6 +469,7 @@ impl Codex {
if let SessionSource::SubAgent(SubAgentSource::ThreadSpawn { depth, .. }) = session_source
&& depth >= config.agent_max_depth
&& !config.features.enabled(Feature::MultiAgentV2)
{
let _ = config.features.disable(Feature::SpawnCsv);
let _ = config.features.disable(Feature::Collab);
@@ -2867,6 +2868,23 @@ impl Session {
self.mailbox.subscribe()
}
pub(crate) fn begin_mailbox_wait(&self) -> crate::agent::mailbox::MailboxWaitGuard<'_> {
self.mailbox.begin_wait()
}
pub(crate) fn has_mailbox_waiters(&self) -> bool {
self.mailbox.has_waiters()
}
pub(crate) async fn session_source(&self) -> SessionSource {
self.state
.lock()
.await
.session_configuration
.session_source
.clone()
}
pub(crate) fn enqueue_mailbox_communication(&self, communication: InterAgentCommunication) {
self.mailbox.send(communication);
}

View File

@@ -1,5 +1,6 @@
use super::*;
use crate::CodexThread;
use crate::NewThread;
use crate::ThreadManager;
use crate::config::AgentRoleConfig;
use crate::config::DEFAULT_AGENT_MAX_DEPTH;
@@ -13,6 +14,7 @@ use crate::tools::context::ToolOutput;
use crate::tools::handlers::multi_agents_v2::CloseAgentHandler as CloseAgentHandlerV2;
use crate::tools::handlers::multi_agents_v2::FollowupTaskHandler as FollowupTaskHandlerV2;
use crate::tools::handlers::multi_agents_v2::ListAgentsHandler as ListAgentsHandlerV2;
use crate::tools::handlers::multi_agents_v2::ParentMessageHandler as ParentMessageHandlerV2;
use crate::tools::handlers::multi_agents_v2::SendMessageHandler as SendMessageHandlerV2;
use crate::tools::handlers::multi_agents_v2::SpawnAgentHandler as SpawnAgentHandlerV2;
use crate::tools::handlers::multi_agents_v2::WaitAgentHandler as WaitAgentHandlerV2;
@@ -25,6 +27,7 @@ use codex_model_provider::create_model_provider;
use codex_model_provider_info::built_in_model_providers;
use codex_protocol::AgentPath;
use codex_protocol::ThreadId;
use codex_protocol::config_types::ModeKind;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputBody;
@@ -46,6 +49,7 @@ use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::user_input::UserInput;
use core_test_support::TempDirExt;
use pretty_assertions::assert_eq;
@@ -92,6 +96,123 @@ fn thread_manager() -> ThreadManager {
)
}
async fn pending_inter_agent_mail(
session: &crate::session::session::Session,
) -> Vec<InterAgentCommunication> {
session
.get_pending_input()
.await
.into_iter()
.filter_map(|item| match item {
ResponseInputItem::Message { content, .. } => {
InterAgentCommunication::from_message_content(&content)
}
_ => None,
})
.collect()
}
async fn make_parent_message_child(
agent_path: Option<AgentPath>,
) -> (
ThreadManager,
NewThread,
crate::session::session::Session,
TurnContext,
ThreadId,
) {
let (mut session, mut turn) = make_session_and_context().await;
let manager = thread_manager();
let root = manager
.start_thread((*turn.config).clone())
.await
.expect("root thread should start");
session.services.agent_control = manager.agent_control();
session.conversation_id = root.thread_id;
let mut config = (*turn.config).clone();
config
.features
.enable(Feature::MultiAgentV2)
.expect("test config should allow feature update");
turn.config = Arc::new(config);
let child_thread_id = session
.services
.agent_control
.spawn_agent_with_metadata(
(*turn.config).clone(),
vec![UserInput::Text {
text: "inspect this repo".to_string(),
text_elements: Vec::new(),
}]
.into(),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: root.thread_id,
depth: 1,
agent_path: agent_path.clone(),
agent_nickname: None,
agent_role: None,
})),
crate::agent::control::SpawnAgentOptions::default(),
)
.await
.expect("worker spawn should succeed")
.thread_id;
session.conversation_id = child_thread_id;
turn.session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: root.thread_id,
depth: 1,
agent_path,
agent_nickname: None,
agent_role: None,
});
(manager, root, session, turn, child_thread_id)
}
async fn mark_parent_running(root: &NewThread) -> Arc<TurnContext> {
let root_turn = root.thread.codex.session.new_default_turn().await;
root.thread
.codex
.session
.send_event(
root_turn.as_ref(),
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: root_turn.sub_id.clone(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: ModeKind::Default,
}),
)
.await;
root_turn
}
async fn wait_until_parent_mail_is_pending(session: &crate::session::session::Session) {
for _ in 0..50 {
if session.has_pending_input().await {
return;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
async fn send_parent_message(
session: crate::session::session::Session,
turn: TurnContext,
payload: serde_json::Value,
) {
ParentMessageHandlerV2
.handle(invocation(
Arc::new(session),
Arc::new(turn),
"send_parent_message",
function_payload(payload),
))
.await
.expect("parent message should succeed");
}
async fn install_role_with_model_override(turn: &mut TurnContext) -> String {
let role_name = "fork-context-role".to_string();
tokio::fs::create_dir_all(&turn.config.codex_home)
@@ -1104,6 +1225,314 @@ async fn multi_agent_v2_followup_task_rejects_root_target_from_child() {
);
}
#[tokio::test]
async fn multi_agent_v2_parent_message_interrupts_busy_parent_before_delivering_mail() {
let child_path = AgentPath::try_from("/root/worker").expect("agent path");
let (manager, root, session, turn, _) =
make_parent_message_child(Some(child_path.clone())).await;
let root_turn = mark_parent_running(&root).await;
send_parent_message(
session,
turn,
json!({
"message": "parent please take over",
"mode": "interrupt"
}),
)
.await;
let root_ops = manager
.captured_ops()
.into_iter()
.filter_map(|(id, op)| (id == root.thread_id).then_some(op))
.collect::<Vec<_>>();
assert_eq!(
root_ops
.iter()
.filter(|op| matches!(op, Op::Interrupt))
.count(),
1
);
root.thread
.codex
.session
.send_event(
root_turn.as_ref(),
EventMsg::TurnAborted(TurnAbortedEvent {
turn_id: Some(root_turn.sub_id.clone()),
reason: TurnAbortReason::Interrupted,
completed_at: None,
duration_ms: None,
}),
)
.await;
wait_until_parent_mail_is_pending(root.thread.codex.session.as_ref()).await;
assert_eq!(
pending_inter_agent_mail(root.thread.codex.session.as_ref()).await,
vec![InterAgentCommunication::new(
child_path,
AgentPath::root(),
Vec::new(),
"parent please take over".to_string(),
/*trigger_turn*/ true,
)]
);
}
#[tokio::test]
async fn multi_agent_v2_parent_message_queue_trigger_waits_for_busy_parent_to_finish() {
let child_path = AgentPath::try_from("/root/worker").expect("agent path");
let (manager, root, session, turn, _) =
make_parent_message_child(Some(child_path.clone())).await;
let root_turn = mark_parent_running(&root).await;
send_parent_message(
session,
turn,
json!({
"message": "parent should process when free",
"mode": "queue",
"trigger_turn": true
}),
)
.await;
let root_ops = manager
.captured_ops()
.into_iter()
.filter_map(|(id, op)| (id == root.thread_id).then_some(op))
.collect::<Vec<_>>();
assert_eq!(
root_ops
.iter()
.filter(|op| matches!(op, Op::Interrupt))
.count(),
0
);
assert!(
!root.thread.codex.session.has_pending_input().await,
"queue plus trigger should not be drainable by the currently running parent turn"
);
root.thread
.codex
.session
.send_event(
root_turn.as_ref(),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: root_turn.sub_id.clone(),
last_agent_message: Some("parent turn done".to_string()),
completed_at: None,
duration_ms: None,
}),
)
.await;
wait_until_parent_mail_is_pending(root.thread.codex.session.as_ref()).await;
assert_eq!(
pending_inter_agent_mail(root.thread.codex.session.as_ref()).await,
vec![InterAgentCommunication::new(
child_path,
AgentPath::root(),
Vec::new(),
"parent should process when free".to_string(),
/*trigger_turn*/ true,
)]
);
}
#[tokio::test]
async fn multi_agent_v2_parent_message_uses_registry_when_turn_source_is_generic() {
let child_path = AgentPath::try_from("/root/worker").expect("agent path");
let (_manager, root, session, mut turn, _) =
make_parent_message_child(Some(child_path.clone())).await;
turn.session_source = SessionSource::Cli;
send_parent_message(
session,
turn,
json!({
"message": "registry-resolved parent mail",
"mode": "enqueue"
}),
)
.await;
assert_eq!(
pending_inter_agent_mail(root.thread.codex.session.as_ref()).await,
vec![InterAgentCommunication::new(
child_path,
AgentPath::root(),
Vec::new(),
"registry-resolved parent mail".to_string(),
/*trigger_turn*/ false,
)]
);
}
#[tokio::test]
async fn multi_agent_v2_parent_message_synthesizes_path_for_anonymous_thread_spawn() {
let (_manager, root, session, turn, child_thread_id) =
make_parent_message_child(/*agent_path*/ None).await;
send_parent_message(
session,
turn,
json!({
"message": "anonymous parent mail",
"mode": "enqueue"
}),
)
.await;
let synthetic_child_path = AgentPath::try_from(format!(
"/root/agent_{}",
child_thread_id.to_string().replace('-', "_")
))
.expect("synthetic child path should be valid");
assert_eq!(
pending_inter_agent_mail(root.thread.codex.session.as_ref()).await,
vec![InterAgentCommunication::new(
synthetic_child_path,
AgentPath::root(),
Vec::new(),
"anonymous parent mail".to_string(),
/*trigger_turn*/ false,
)]
);
}
#[tokio::test]
async fn multi_agent_v2_parent_message_wakes_wait_agent_without_interrupting_parent() {
let (_session, mut turn) = make_session_and_context().await;
let manager = thread_manager();
let mut config = (*turn.config).clone();
config
.features
.enable(Feature::MultiAgentV2)
.expect("test config should allow feature update");
let root = manager
.start_thread(config.clone())
.await
.expect("root thread should start");
turn.config = Arc::new(config);
let parent_session = root.thread.codex.session.clone();
let parent_turn = Arc::new(turn);
let wait_task = tokio::spawn({
let session = parent_session.clone();
let turn = parent_turn.clone();
async move {
WaitAgentHandlerV2
.handle(invocation(
session,
turn,
"wait_agent",
function_payload(json!({"timeout_ms": 10_000})),
))
.await
}
});
for _ in 0..50 {
if parent_session.has_mailbox_waiters() {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert!(
parent_session.has_mailbox_waiters(),
"wait_agent should register mailbox waiter"
);
let child_path = AgentPath::try_from("/root/worker").expect("agent path");
let child_thread_id = parent_session
.services
.agent_control
.spawn_agent_with_metadata(
(*parent_turn.config).clone(),
vec![UserInput::Text {
text: "inspect this repo".to_string(),
text_elements: Vec::new(),
}]
.into(),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: root.thread_id,
depth: 1,
agent_path: Some(child_path.clone()),
agent_nickname: None,
agent_role: None,
})),
crate::agent::control::SpawnAgentOptions::default(),
)
.await
.expect("worker spawn should succeed")
.thread_id;
let (mut child_session, mut child_turn) = make_session_and_context().await;
child_session.services.agent_control = manager.agent_control();
child_session.conversation_id = child_thread_id;
child_turn.config = parent_turn.config.clone();
child_turn.session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: root.thread_id,
depth: 1,
agent_path: Some(child_path.clone()),
agent_nickname: None,
agent_role: None,
});
ParentMessageHandlerV2
.handle(invocation(
Arc::new(child_session),
Arc::new(child_turn),
"send_parent_message",
function_payload(json!({
"message": "waiter-visible parent mail",
"mode": "queue",
"trigger_turn": true
})),
))
.await
.expect("parent message should succeed");
let wait_output = wait_task
.await
.expect("wait task should join")
.expect("wait_agent should succeed");
let (content, _) = expect_text_output(wait_output);
let result: crate::tools::handlers::multi_agents_v2::wait::WaitAgentResult =
serde_json::from_str(&content).expect("wait_agent result should parse");
assert_eq!(
result,
crate::tools::handlers::multi_agents_v2::wait::WaitAgentResult {
message: "Wait completed.".to_string(),
timed_out: false,
}
);
let root_ops = manager
.captured_ops()
.into_iter()
.filter_map(|(id, op)| (id == root.thread_id).then_some(op))
.collect::<Vec<_>>();
assert_eq!(
root_ops
.iter()
.filter(|op| matches!(op, Op::Interrupt))
.count(),
0
);
assert_eq!(
pending_inter_agent_mail(parent_session.as_ref()).await,
vec![InterAgentCommunication::new(
child_path,
AgentPath::root(),
Vec::new(),
"waiter-visible parent mail".to_string(),
/*trigger_turn*/ true,
)]
);
}
#[tokio::test]
async fn multi_agent_v2_list_agents_returns_completed_status_and_last_task_message() {
let (mut session, mut turn) = make_session_and_context().await;
@@ -2847,7 +3276,7 @@ async fn multi_agent_v2_wait_agent_returns_summary_for_mailbox_activity() {
}
#[tokio::test]
async fn multi_agent_v2_wait_agent_waits_for_new_mail_after_start() {
async fn multi_agent_v2_wait_agent_returns_pending_mail_after_start() {
let (mut session, mut turn) = make_session_and_context().await;
let manager = thread_manager();
let root = manager
@@ -2892,7 +3321,7 @@ async fn multi_agent_v2_wait_agent_waits_for_new_mail_after_start() {
.expect("worker path");
session.enqueue_mailbox_communication(InterAgentCommunication::new(
worker_path.clone(),
worker_path,
AgentPath::root(),
Vec::new(),
"already queued".to_string(),
@@ -2913,21 +3342,6 @@ async fn multi_agent_v2_wait_agent_waits_for_new_mail_after_start() {
.await
}
});
tokio::task::yield_now().await;
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(
!wait_task.is_finished(),
"mail already queued before wait should not wake wait_agent"
);
session.enqueue_mailbox_communication(InterAgentCommunication::new(
worker_path,
AgentPath::root(),
Vec::new(),
"new mail".to_string(),
/*trigger_turn*/ false,
));
let output = wait_task
.await
.expect("wait task should join")

View File

@@ -30,6 +30,7 @@ use serde_json::Value as JsonValue;
pub(crate) use close_agent::Handler as CloseAgentHandler;
pub(crate) use followup_task::Handler as FollowupTaskHandler;
pub(crate) use list_agents::Handler as ListAgentsHandler;
pub(crate) use parent_message::Handler as ParentMessageHandler;
pub(crate) use send_message::Handler as SendMessageHandler;
pub(crate) use spawn::Handler as SpawnAgentHandler;
pub(crate) use wait::Handler as WaitAgentHandler;
@@ -38,6 +39,7 @@ mod close_agent;
mod followup_task;
mod list_agents;
mod message_tool;
mod parent_message;
mod send_message;
mod spawn;
pub(crate) mod wait;

View File

@@ -47,7 +47,7 @@ pub(crate) struct FollowupTaskArgs {
pub(crate) interrupt: bool,
}
fn message_content(message: String) -> Result<String, FunctionCallError> {
pub(crate) fn message_content(message: String) -> Result<String, FunctionCallError> {
if message.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"Empty message can't be sent to an agent".to_string(),

View File

@@ -0,0 +1,359 @@
use super::message_tool::message_content;
use super::*;
use crate::tools::context::FunctionToolOutput;
use codex_protocol::ThreadId;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
struct ParentMessageSource {
parent_thread_id: ThreadId,
parent_agent_path: AgentPath,
child_agent_path: AgentPath,
}
pub(crate) struct Handler;
impl ToolHandler for Handler {
type Output = FunctionToolOutput;
fn kind(&self) -> ToolKind {
ToolKind::Function
}
fn matches_kind(&self, payload: &ToolPayload) -> bool {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let arguments = function_arguments(invocation.payload.clone())?;
let args: ParentMessageArgs = parse_arguments(&arguments)?;
let delivery = args.delivery_options();
handle_parent_message(invocation, message_content(args.message)?, delivery).await
}
}
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct ParentMessageArgs {
message: String,
#[serde(default)]
mode: ParentMessageMode,
trigger_turn: Option<bool>,
}
impl ParentMessageArgs {
fn delivery_options(&self) -> ParentMessageDelivery {
ParentMessageDelivery {
mode: self.mode,
trigger_turn: self
.trigger_turn
.unwrap_or(self.mode == ParentMessageMode::Interrupt),
}
}
}
#[derive(Debug, Default, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
enum ParentMessageMode {
#[default]
#[serde(alias = "enqueue")]
Queue,
Interrupt,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ParentMessageDelivery {
mode: ParentMessageMode,
trigger_turn: bool,
}
async fn handle_parent_message(
invocation: ToolInvocation,
prompt: String,
delivery: ParentMessageDelivery,
) -> Result<FunctionToolOutput, FunctionCallError> {
let ToolInvocation {
session,
turn,
call_id,
..
} = invocation;
let session_source = session.session_source().await;
let source_result = parent_message_source(&turn.session_source)
.or_else(|| parent_message_source(&session_source))
.or_else(|| parent_message_source_from_agent_registry(session.as_ref()))
.or_else(|| {
parent_message_source_from_thread_spawn_context(
session.as_ref(),
&turn.session_source,
&session_source,
)
});
let source = match source_result {
Some(Ok(source)) => Some(source),
Some(Err(err)) => return Err(err),
None => {
parent_message_source_from_parent_thread(
session.as_ref(),
&turn.session_source,
&session_source,
)
.await
}
};
let source = source.ok_or_else(|| {
FunctionCallError::RespondToModel(
"send_parent_message is only available from a spawned sub-agent".to_string(),
)
})?;
session
.send_event(
&turn,
CollabAgentInteractionBeginEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
receiver_thread_id: source.parent_thread_id,
prompt: prompt.clone(),
}
.into(),
)
.await;
let parent_waiting = if delivery.trigger_turn && delivery.mode == ParentMessageMode::Queue {
session
.services
.agent_control
.has_mailbox_waiters(source.parent_thread_id)
.await
.map_err(|err| collab_agent_error(source.parent_thread_id, err))?
} else {
false
};
let communication = InterAgentCommunication::new(
source.child_agent_path,
source.parent_agent_path,
Vec::new(),
prompt.clone(),
delivery.trigger_turn,
);
let parent_status_before_wake = session
.services
.agent_control
.get_status(source.parent_thread_id)
.await;
let parent_running = matches!(parent_status_before_wake, AgentStatus::Running);
let defer_delivery_until_parent_is_free = parent_running
&& (delivery.mode == ParentMessageMode::Interrupt
|| (delivery.trigger_turn && !parent_waiting));
let send_result = if defer_delivery_until_parent_is_free {
if delivery.mode == ParentMessageMode::Interrupt {
session
.services
.agent_control
.interrupt_agent(source.parent_thread_id)
.await
.map_err(|err| collab_agent_error(source.parent_thread_id, err))?;
}
deliver_parent_message_when_parent_is_free(
session.services.agent_control.clone(),
source.parent_thread_id,
communication,
delivery.trigger_turn,
);
Ok(())
} else {
session
.services
.agent_control
.enqueue_inter_agent_communication(source.parent_thread_id, communication)
.await
.map_err(|err| collab_agent_error(source.parent_thread_id, err))
};
if send_result.is_ok()
&& delivery.trigger_turn
&& !parent_waiting
&& !defer_delivery_until_parent_is_free
{
session
.services
.agent_control
.maybe_start_turn_for_pending_work(source.parent_thread_id)
.await
.map_err(|err| collab_agent_error(source.parent_thread_id, err))?;
}
let status = session
.services
.agent_control
.get_status(source.parent_thread_id)
.await;
session
.send_event(
&turn,
CollabAgentInteractionEndEvent {
call_id,
sender_thread_id: session.conversation_id,
receiver_thread_id: source.parent_thread_id,
receiver_agent_nickname: None,
receiver_agent_role: None,
prompt,
status,
}
.into(),
)
.await;
send_result?;
Ok(FunctionToolOutput::from_text(String::new(), Some(true)))
}
fn deliver_parent_message_when_parent_is_free(
agent_control: crate::agent::control::AgentControl,
parent_thread_id: ThreadId,
communication: InterAgentCommunication,
trigger_turn: bool,
) {
tokio::spawn(async move {
let Ok(mut status_rx) = agent_control.subscribe_status(parent_thread_id).await else {
return;
};
while matches!(*status_rx.borrow(), AgentStatus::Running) {
if status_rx.changed().await.is_err() {
return;
}
}
if agent_control
.enqueue_inter_agent_communication(parent_thread_id, communication)
.await
.is_err()
{
return;
}
if trigger_turn {
let _ = agent_control
.maybe_start_turn_for_pending_work(parent_thread_id)
.await;
}
});
}
fn parent_message_source(
session_source: &SessionSource,
) -> Option<Result<ParentMessageSource, FunctionCallError>> {
match session_source {
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
agent_path: Some(agent_path),
..
}) => Some(
parent_agent_path(agent_path).map(|parent_agent_path| ParentMessageSource {
parent_thread_id: *parent_thread_id,
parent_agent_path,
child_agent_path: agent_path.clone(),
}),
),
_ => None,
}
}
fn parent_message_source_from_agent_registry(
session: &crate::session::session::Session,
) -> Option<Result<ParentMessageSource, FunctionCallError>> {
let child_agent_path = session
.services
.agent_control
.get_agent_metadata(session.conversation_id)?
.agent_path?;
let parent_agent_path = match parent_agent_path(&child_agent_path) {
Ok(parent_agent_path) => parent_agent_path,
Err(err) => return Some(Err(err)),
};
let Some(parent_thread_id) = session
.services
.agent_control
.agent_id_for_path(&parent_agent_path)
else {
return Some(Err(FunctionCallError::RespondToModel(
"Could not resolve parent thread for this sub-agent".to_string(),
)));
};
Some(Ok(ParentMessageSource {
parent_thread_id,
parent_agent_path,
child_agent_path,
}))
}
fn parent_message_source_from_thread_spawn_context(
session: &crate::session::session::Session,
turn_session_source: &SessionSource,
session_source: &SessionSource,
) -> Option<Result<ParentMessageSource, FunctionCallError>> {
let parent_thread_id = thread_spawn_parent_thread_id(turn_session_source)
.or_else(|| thread_spawn_parent_thread_id(session_source))?;
let child_agent_path = session
.services
.agent_control
.get_agent_metadata(session.conversation_id)
.and_then(|metadata| metadata.agent_path);
child_agent_path.map(|child_agent_path| {
let parent_agent_path = parent_agent_path(&child_agent_path)?;
Ok(ParentMessageSource {
parent_thread_id,
parent_agent_path,
child_agent_path,
})
})
}
async fn parent_message_source_from_parent_thread(
session: &crate::session::session::Session,
turn_session_source: &SessionSource,
session_source: &SessionSource,
) -> Option<ParentMessageSource> {
let parent_thread_id = thread_spawn_parent_thread_id(turn_session_source)
.or_else(|| thread_spawn_parent_thread_id(session_source))?;
let parent_agent_path = session
.services
.agent_control
.get_agent_config_snapshot(parent_thread_id)
.await
.and_then(|snapshot| snapshot.session_source.get_agent_path())
.unwrap_or_else(AgentPath::root);
let synthetic_name = format!(
"agent_{}",
session.conversation_id.to_string().replace('-', "_")
);
let child_agent_path = parent_agent_path.join(&synthetic_name).ok()?;
Some(ParentMessageSource {
parent_thread_id,
parent_agent_path,
child_agent_path,
})
}
fn thread_spawn_parent_thread_id(session_source: &SessionSource) -> Option<ThreadId> {
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id, ..
}) = session_source
else {
return None;
};
Some(*parent_thread_id)
}
fn parent_agent_path(child_agent_path: &AgentPath) -> Result<AgentPath, FunctionCallError> {
child_agent_path
.as_str()
.rsplit_once('/')
.and_then(|(parent, _)| AgentPath::try_from(parent).ok())
.ok_or_else(|| {
FunctionCallError::RespondToModel(
"Could not resolve parent agent path for this sub-agent".to_string(),
)
})
}

View File

@@ -38,6 +38,7 @@ impl ToolHandler for Handler {
};
let mut mailbox_seq_rx = session.subscribe_mailbox_seq();
let _mailbox_wait_guard = session.begin_mailbox_wait();
session
.send_event(
@@ -53,7 +54,8 @@ impl ToolHandler for Handler {
.await;
let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64);
let timed_out = !wait_for_mailbox_change(&mut mailbox_seq_rx, deadline).await;
let timed_out = !session.has_pending_input().await
&& !wait_for_mailbox_change(&mut mailbox_seq_rx, deadline).await;
let result = WaitAgentResult::from_timed_out(timed_out);
session

View File

@@ -104,6 +104,7 @@ pub(crate) fn build_specs_with_discoverable_tools(
use crate::tools::handlers::multi_agents_v2::CloseAgentHandler as CloseAgentHandlerV2;
use crate::tools::handlers::multi_agents_v2::FollowupTaskHandler as FollowupTaskHandlerV2;
use crate::tools::handlers::multi_agents_v2::ListAgentsHandler as ListAgentsHandlerV2;
use crate::tools::handlers::multi_agents_v2::ParentMessageHandler as ParentMessageHandlerV2;
use crate::tools::handlers::multi_agents_v2::SendMessageHandler as SendMessageHandlerV2;
use crate::tools::handlers::multi_agents_v2::SpawnAgentHandler as SpawnAgentHandlerV2;
use crate::tools::handlers::multi_agents_v2::WaitAgentHandler as WaitAgentHandlerV2;
@@ -239,6 +240,9 @@ pub(crate) fn build_specs_with_discoverable_tools(
ToolHandlerKind::RequestUserInput => {
builder.register_handler(handler.name, request_user_input_handler.clone());
}
ToolHandlerKind::ParentMessageV2 => {
builder.register_handler(handler.name, Arc::new(ParentMessageHandlerV2));
}
ToolHandlerKind::ResumeAgentV1 => {
builder.register_handler(handler.name, Arc::new(ResumeAgentHandler));
}

View File

@@ -181,6 +181,44 @@ pub fn create_followup_task_tool() -> ToolSpec {
})
}
pub fn create_parent_message_tool() -> ToolSpec {
let properties = BTreeMap::from([
(
"message".to_string(),
JsonSchema::string(Some(
"Message text to send to this sub-agent's direct parent.".to_string(),
)),
),
(
"mode".to_string(),
JsonSchema::string_enum(
vec![json!("queue"), json!("enqueue"), json!("interrupt")],
Some(
"`queue` queues the message in the parent mailbox. `enqueue` is accepted as a legacy alias for `queue`. `interrupt` cancels the parent's current turn before delivering the message."
.to_string(),
),
),
),
(
"trigger_turn".to_string(),
JsonSchema::boolean(Some(
"When true, ensure the parent processes this message: start an idle parent turn, wake wait_agent, or queue it for a follow-up turn when the parent is busy. Defaults to true for mode=\"interrupt\" and false for mode=\"queue\"/\"enqueue\"."
.to_string(),
)),
),
]);
ToolSpec::Function(ResponsesApiTool {
name: "send_parent_message".to_string(),
description: "Send a string message to this sub-agent's direct parent. `mode` controls queue vs interrupt; `trigger_turn` controls whether the parent must process the message when free."
.to_string(),
strict: false,
defer_loading: None,
parameters: JsonSchema::object(properties, Some(vec!["message".to_string()]), Some(false.into())),
output_schema: None,
})
}
pub fn create_resume_agent_tool() -> ToolSpec {
let properties = BTreeMap::from([(
"id".to_string(),

View File

@@ -192,6 +192,52 @@ fn followup_task_tool_requires_message_and_has_no_output_schema() {
assert_eq!(output_schema, None);
}
#[test]
fn parent_message_tool_separates_queue_interrupt_and_trigger_turn() {
let ToolSpec::Function(ResponsesApiTool {
description,
parameters,
output_schema,
..
}) = create_parent_message_tool()
else {
panic!("send_parent_message should be a function tool");
};
assert_eq!(
parameters.schema_type,
Some(JsonSchemaType::Single(JsonSchemaPrimitiveType::Object))
);
let properties = parameters
.properties
.as_ref()
.expect("send_parent_message should use object params");
assert!(properties.contains_key("message"));
assert!(properties.contains_key("mode"));
assert!(properties.contains_key("trigger_turn"));
assert!(!properties.contains_key("target"));
assert!(!properties.contains_key("items"));
assert!(description.contains("direct parent"));
assert_eq!(
properties
.get("mode")
.and_then(|schema| schema.enum_values.as_ref()),
Some(&vec![json!("queue"), json!("enqueue"), json!("interrupt")])
);
assert_eq!(
properties
.get("trigger_turn")
.and_then(|schema| schema.description.as_deref()),
Some(
"When true, ensure the parent processes this message: start an idle parent turn, wake wait_agent, or queue it for a follow-up turn when the parent is busy. Defaults to true for mode=\"interrupt\" and false for mode=\"queue\"/\"enqueue\"."
)
);
assert_eq!(
parameters.required.as_ref(),
Some(&vec!["message".to_string()])
);
assert_eq!(output_schema, None);
}
#[test]
fn wait_agent_tool_v2_uses_timeout_only_summary_output() {
let ToolSpec::Function(ResponsesApiTool {

View File

@@ -33,6 +33,7 @@ pub use agent_tool::create_close_agent_tool_v1;
pub use agent_tool::create_close_agent_tool_v2;
pub use agent_tool::create_followup_task_tool;
pub use agent_tool::create_list_agents_tool;
pub use agent_tool::create_parent_message_tool;
pub use agent_tool::create_resume_agent_tool;
pub use agent_tool::create_send_input_tool_v1;
pub use agent_tool::create_send_message_tool;

View File

@@ -104,6 +104,7 @@ pub struct ToolsConfig {
pub can_request_original_image_detail: bool,
pub collab_tools: bool,
pub multi_agent_v2: bool,
pub parent_message_tool: bool,
pub hide_spawn_agent_metadata: bool,
pub spawn_agent_usage_hint: bool,
pub spawn_agent_usage_hint_text: Option<String>,
@@ -201,6 +202,11 @@ impl ToolsConfig {
SessionSource::SubAgent(SubAgentSource::Other(label))
if label.starts_with("agent_job:")
);
let include_parent_message_tool = include_multi_agent_v2
&& matches!(
session_source,
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. })
);
Self {
available_models: available_models.to_vec(),
@@ -225,6 +231,7 @@ impl ToolsConfig {
can_request_original_image_detail: include_original_image_detail,
collab_tools: include_collab_tools,
multi_agent_v2: include_multi_agent_v2,
parent_message_tool: include_parent_message_tool,
hide_spawn_agent_metadata: false,
spawn_agent_usage_hint: true,
spawn_agent_usage_hint_text: None,

View File

@@ -34,6 +34,7 @@ use crate::create_list_dir_tool;
use crate::create_list_mcp_resource_templates_tool;
use crate::create_list_mcp_resources_tool;
use crate::create_local_shell_tool;
use crate::create_parent_message_tool;
use crate::create_read_mcp_resource_tool;
use crate::create_report_agent_job_result_tool;
use crate::create_request_permissions_tool;
@@ -412,6 +413,13 @@ pub fn build_tool_registry_plan(
/*supports_parallel_tool_calls*/ false,
config.code_mode_enabled,
);
if config.parent_message_tool {
plan.push_spec(
create_parent_message_tool(),
/*supports_parallel_tool_calls*/ false,
config.code_mode_enabled,
);
}
plan.push_spec(
create_wait_agent_tool_v2(params.wait_agent_timeouts),
/*supports_parallel_tool_calls*/ false,
@@ -430,6 +438,7 @@ pub fn build_tool_registry_plan(
plan.register_handler("spawn_agent", ToolHandlerKind::SpawnAgentV2);
plan.register_handler("send_message", ToolHandlerKind::SendMessageV2);
plan.register_handler("followup_task", ToolHandlerKind::FollowupTaskV2);
plan.register_handler("send_parent_message", ToolHandlerKind::ParentMessageV2);
plan.register_handler("wait_agent", ToolHandlerKind::WaitAgentV2);
plan.register_handler("close_agent", ToolHandlerKind::CloseAgentV2);
plan.register_handler("list_agents", ToolHandlerKind::ListAgentsV2);

View File

@@ -22,6 +22,8 @@ use crate::mcp_call_tool_result_output_schema;
use codex_app_server_protocol::AppInfo;
use codex_features::Feature;
use codex_features::Features;
use codex_protocol::AgentPath;
use codex_protocol::ThreadId;
use codex_protocol::config_types::WebSearchConfig;
use codex_protocol::config_types::WebSearchMode;
use codex_protocol::config_types::WindowsSandboxLevel;
@@ -221,6 +223,7 @@ fn test_build_specs_multi_agent_v2_uses_task_names_and_hides_resume() {
"list_agents",
],
);
assert_lacks_tool_name(&tools, "send_parent_message");
let spawn_agent = find_tool(&tools, "spawn_agent");
let ToolSpec::Function(ResponsesApiTool {
@@ -285,6 +288,45 @@ fn test_build_specs_multi_agent_v2_uses_task_names_and_hides_resume() {
Some(&vec!["target".to_string(), "message".to_string()])
);
let child_tools_config = ToolsConfig::new(&ToolsConfigParams {
model_info: &model_info,
available_models: &available_models,
features: &features,
image_generation_tool_auth_allowed: true,
web_search_mode: Some(WebSearchMode::Cached),
session_source: SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: ThreadId::new(),
depth: 1,
agent_path: Some(AgentPath::try_from("/root/worker").expect("agent path")),
agent_nickname: None,
agent_role: None,
}),
sandbox_policy: &SandboxPolicy::DangerFullAccess,
windows_sandbox_level: WindowsSandboxLevel::Disabled,
});
let (child_tools, _) = build_specs(
&child_tools_config,
/*mcp_tools*/ None,
/*deferred_mcp_tools*/ None,
&[],
);
let parent_message = find_tool(&child_tools, "send_parent_message");
let ToolSpec::Function(ResponsesApiTool {
parameters,
output_schema,
..
}) = &parent_message.spec
else {
panic!("send_parent_message should be a function tool");
};
assert_eq!(output_schema, &None);
let (properties, required) = expect_object_schema(parameters);
assert!(properties.contains_key("message"));
assert!(properties.contains_key("mode"));
assert!(properties.contains_key("trigger_turn"));
assert!(!properties.contains_key("target"));
assert_eq!(required, Some(&vec!["message".to_string()]));
let wait_agent = find_tool(&tools, "wait_agent");
let ToolSpec::Function(ResponsesApiTool {
parameters,

View File

@@ -28,6 +28,7 @@ pub enum ToolHandlerKind {
RequestPermissions,
RequestUserInput,
ResumeAgentV1,
ParentMessageV2,
SendInputV1,
SendMessageV2,
Shell,