feat: custom watcher for multi-agent v2 (#15570)

Custom watcher that sends an InterAgentCommunication on end of turn
This commit is contained in:
jif-oai
2026-03-23 22:56:17 +00:00
committed by GitHub
parent 0f34b14b41
commit 4605c65308
4 changed files with 327 additions and 42 deletions

View File

@@ -266,6 +266,7 @@ impl AgentControl {
new_thread.thread_id,
notification_source,
child_reference,
agent_metadata.agent_path.clone(),
);
Ok(LiveAgent {
@@ -437,6 +438,7 @@ impl AgentControl {
resumed_thread.thread_id,
Some(notification_source.clone()),
child_reference,
agent_metadata.agent_path.clone(),
);
self.persist_thread_spawn_edge_for_source(
resumed_thread.thread.as_ref(),
@@ -687,6 +689,7 @@ impl AgentControl {
child_thread_id: ThreadId,
session_source: Option<SessionSource>,
child_reference: String,
child_agent_path: Option<AgentPath>,
) {
let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id, ..
@@ -717,6 +720,39 @@ impl AgentControl {
let Ok(state) = control.upgrade() else {
return;
};
let child_thread = state.get_thread(child_thread_id).await.ok();
if let Some(child_agent_path) = child_agent_path
&& child_thread
.as_ref()
.map(|thread| thread.enabled(Feature::MultiAgentV2))
.unwrap_or(true)
{
let AgentStatus::Completed(Some(content)) = &status else {
return;
};
let Some((parent_path, _)) = child_agent_path.as_str().rsplit_once('/') else {
return;
};
let Ok(parent_agent_path) = AgentPath::try_from(parent_path) else {
return;
};
let Some(parent_thread_id) = control.state.agent_id_for_path(&parent_agent_path)
else {
return;
};
let _ = control
.send_inter_agent_communication(
parent_thread_id,
InterAgentCommunication::new(
child_agent_path,
parent_agent_path,
Vec::new(),
content.clone(),
),
)
.await;
return;
}
let Ok(parent_thread) = state.get_thread(parent_thread_id).await else {
return;
};

View File

@@ -11,11 +11,13 @@ use crate::contextual_user_message::SUBAGENT_NOTIFICATION_OPEN_TAG;
use assert_matches::assert_matches;
use chrono::Utc;
use codex_features::Feature;
use codex_protocol::AgentPath;
use codex_protocol::config_types::ModeKind;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TurnAbortReason;
@@ -125,6 +127,29 @@ fn history_contains_text(history_items: &[ResponseItem], needle: &str) -> bool {
})
}
fn history_contains_assistant_inter_agent_communication(
history_items: &[ResponseItem],
expected: &InterAgentCommunication,
) -> bool {
history_items.iter().any(|item| {
let ResponseItem::Message { role, content, .. } = item else {
return false;
};
if role != "assistant" {
return false;
}
content.iter().any(|content_item| match content_item {
ContentItem::OutputText { text } => {
serde_json::from_str::<InterAgentCommunication>(text)
.ok()
.as_ref()
== Some(expected)
}
ContentItem::InputText { .. } | ContentItem::InputImage { .. } => false,
})
})
}
async fn wait_for_subagent_notification(parent_thread: &Arc<CodexThread>) -> bool {
let wait = async {
loop {
@@ -937,6 +962,223 @@ async fn spawn_child_completion_notifies_parent_history() {
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);
}
#[tokio::test]
async fn multi_agent_v2_completion_sends_inter_agent_message_to_direct_parent() {
let harness = AgentControlHarness::new().await;
let (root_thread_id, _) = harness.start_thread().await;
let mut config = harness.config.clone();
let _ = config.features.enable(Feature::MultiAgentV2);
let worker_path = AgentPath::root().join("worker_a").expect("worker path");
let worker_thread_id = harness
.control
.spawn_agent(
config.clone(),
text_input("hello worker"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: root_thread_id,
depth: 1,
agent_path: Some(worker_path.clone()),
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("worker spawn should succeed");
let tester_path = worker_path.join("tester").expect("tester path");
let tester_thread_id = harness
.control
.spawn_agent(
config,
text_input("hello tester"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: worker_thread_id,
depth: 2,
agent_path: Some(tester_path.clone()),
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("tester spawn should succeed");
let tester_thread = harness
.manager
.get_thread(tester_thread_id)
.await
.expect("tester thread should exist");
let tester_turn = tester_thread.codex.session.new_default_turn().await;
tester_thread
.codex
.session
.send_event(
tester_turn.as_ref(),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: tester_turn.sub_id.clone(),
last_agent_message: Some("done".to_string()),
}),
)
.await;
timeout(Duration::from_secs(2), async {
loop {
let delivered = harness
.manager
.captured_ops()
.into_iter()
.any(|(thread_id, op)| {
thread_id == worker_thread_id
&& matches!(
op,
Op::InterAgentCommunication { communication }
if communication.author == tester_path
&& communication.recipient == worker_path
&& communication.other_recipients.is_empty()
&& communication.content == "done"
)
});
if delivered {
break;
}
sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("completion watcher should send inter-agent communication");
let worker_thread = harness
.manager
.get_thread(worker_thread_id)
.await
.expect("worker thread should exist");
let expected_message = InterAgentCommunication::new(
tester_path.clone(),
worker_path.clone(),
Vec::new(),
"done".to_string(),
);
timeout(Duration::from_secs(2), async {
loop {
let history_items = worker_thread
.codex
.session
.clone_history()
.await
.raw_items()
.to_vec();
if history_contains_assistant_inter_agent_communication(
&history_items,
&expected_message,
) && !has_subagent_notification(&history_items)
{
break;
}
sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("worker should record assistant inter-agent message");
}
#[tokio::test]
async fn multi_agent_v2_completion_ignores_dead_direct_parent() {
let harness = AgentControlHarness::new().await;
let (root_thread_id, root_thread) = harness.start_thread().await;
let mut config = harness.config.clone();
let _ = config.features.enable(Feature::MultiAgentV2);
let worker_path = AgentPath::root().join("worker_a").expect("worker path");
let worker_thread_id = harness
.control
.spawn_agent(
config.clone(),
text_input("hello worker"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: root_thread_id,
depth: 1,
agent_path: Some(worker_path.clone()),
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("worker spawn should succeed");
let tester_path = worker_path.join("tester").expect("tester path");
let tester_thread_id = harness
.control
.spawn_agent(
config,
text_input("hello tester"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: worker_thread_id,
depth: 2,
agent_path: Some(tester_path.clone()),
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("tester spawn should succeed");
harness
.control
.shutdown_live_agent(worker_thread_id)
.await
.expect("worker shutdown should succeed");
let tester_thread = harness
.manager
.get_thread(tester_thread_id)
.await
.expect("tester thread should exist");
let tester_turn = tester_thread.codex.session.new_default_turn().await;
tester_thread
.codex
.session
.send_event(
tester_turn.as_ref(),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: tester_turn.sub_id.clone(),
last_agent_message: Some("done".to_string()),
}),
)
.await;
sleep(Duration::from_millis(100)).await;
assert!(
!harness
.manager
.captured_ops()
.into_iter()
.any(|(thread_id, op)| {
thread_id == worker_thread_id
&& matches!(
op,
Op::InterAgentCommunication { communication }
if communication.author == tester_path
&& communication.recipient == worker_path
&& communication.content == "done"
)
})
);
let root_history_items = root_thread
.codex
.session
.clone_history()
.await
.raw_items()
.to_vec();
assert!(!history_contains_assistant_inter_agent_communication(
&root_history_items,
&InterAgentCommunication::new(
tester_path,
AgentPath::root(),
Vec::new(),
"done".to_string(),
)
));
assert!(!has_subagent_notification(&root_history_items));
}
#[tokio::test]
async fn completion_watcher_notifies_parent_when_child_is_missing() {
let harness = AgentControlHarness::new().await;
@@ -953,6 +1195,7 @@ async fn completion_watcher_notifies_parent_when_child_is_missing() {
agent_role: Some("explorer".to_string()),
})),
child_thread_id.to_string(),
None,
);
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);

View File

@@ -36,19 +36,17 @@ fn assistant_message(text: &str) -> ResponseItem {
}
fn inter_agent_assistant_message(text: &str) -> ResponseItem {
let communication = InterAgentCommunication::new(
AgentPath::root(),
AgentPath::root().join("worker").unwrap(),
Vec::new(),
text.to_string(),
);
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: serde_json::to_string(&InterAgentCommunication::new(
AgentPath::root(),
AgentPath::root()
.join("worker")
.expect("worker path should be valid"),
Vec::new(),
text.to_string(),
))
.expect("inter-agent communication should serialize"),
text: serde_json::to_string(&communication).unwrap(),
}],
end_turn: None,
phase: None,

View File

@@ -80,14 +80,31 @@ fn thread_manager() -> ThreadManager {
)
}
fn history_contains_inter_agent_communication(
history_items: &[ResponseItem],
expected: &InterAgentCommunication,
) -> bool {
history_items.iter().any(|item| {
let ResponseItem::Message { role, content, .. } = item else {
return false;
};
if role != "assistant" {
return false;
}
content.iter().any(|content_item| match content_item {
ContentItem::OutputText { text } => {
serde_json::from_str::<InterAgentCommunication>(text)
.ok()
.as_ref()
== Some(expected)
}
ContentItem::InputText { .. } | ContentItem::InputImage { .. } => false,
})
})
}
fn inter_agent_message_text(recipient: &str, content: &str) -> String {
serde_json::to_string(&InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from(recipient).expect("recipient path should be valid"),
Vec::new(),
content.to_string(),
))
.expect("inter-agent communication should serialize")
format!("author: /root\nrecipient: {recipient}\nother_recipients: []\nContent: {content}")
}
#[derive(Clone, Copy)]
@@ -401,7 +418,12 @@ async fn multi_agent_v2_spawn_returns_path_and_send_input_accepts_relative_path(
.get_thread(child_thread_id)
.await
.expect("child thread should exist");
let expected_message = inter_agent_message_text("/root/test_process", "continue");
let expected_communication = InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from("/root/test_process").expect("agent path"),
Vec::new(),
"continue".to_string(),
);
timeout(Duration::from_secs(2), async {
loop {
let history_items = child_thread
@@ -411,18 +433,8 @@ async fn multi_agent_v2_spawn_returns_path_and_send_input_accepts_relative_path(
.await
.raw_items()
.to_vec();
let recorded = history_items.iter().any(|item| {
matches!(
item,
ResponseItem::Message { role, content, .. }
if role == "assistant"
&& content.iter().any(|content_item| matches!(
content_item,
ContentItem::OutputText { text }
if text == &expected_message
))
)
});
let recorded =
history_contains_inter_agent_communication(&history_items, &expected_communication);
let saw_user_message = history_items.iter().any(|item| {
matches!(
item,
@@ -664,7 +676,6 @@ async fn multi_agent_v2_send_input_interrupts_busy_child_without_losing_message(
))
)));
let expected_message = inter_agent_message_text("/root/worker", "continue");
timeout(Duration::from_secs(5), async {
loop {
let history_items = thread
@@ -674,18 +685,15 @@ async fn multi_agent_v2_send_input_interrupts_busy_child_without_losing_message(
.await
.raw_items()
.to_vec();
let saw_envelope = history_items.iter().any(|item| {
matches!(
item,
ResponseItem::Message { role, content, .. }
if role == "assistant"
&& content.iter().any(|content_item| matches!(
content_item,
ContentItem::OutputText { text }
if text == &expected_message
))
)
});
let saw_envelope = history_contains_inter_agent_communication(
&history_items,
&InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
Vec::new(),
"continue".to_string(),
),
);
let saw_user_message = history_items.iter().any(|item| {
matches!(
item,