fix: one shot end of turn (#16308)

Fix the death of the end of turn watcher
This commit is contained in:
jif-oai
2026-03-31 11:11:33 +02:00
committed by GitHub
parent 20f43c1e05
commit 873e466549
3 changed files with 304 additions and 22 deletions

View File

@@ -273,17 +273,19 @@ impl AgentControl {
self.send_input(new_thread.thread_id, initial_operation)
.await?;
let child_reference = agent_metadata
.agent_path
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| new_thread.thread_id.to_string());
self.maybe_start_completion_watcher(
new_thread.thread_id,
notification_source,
child_reference,
agent_metadata.agent_path.clone(),
);
if !new_thread.thread.enabled(Feature::MultiAgentV2) {
let child_reference = agent_metadata
.agent_path
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| new_thread.thread_id.to_string());
self.maybe_start_completion_watcher(
new_thread.thread_id,
notification_source,
child_reference,
agent_metadata.agent_path.clone(),
);
}
Ok(LiveAgent {
thread_id: new_thread.thread_id,
@@ -445,17 +447,19 @@ impl AgentControl {
// Resumed threads are re-registered in-memory and need the same listener
// attachment path as freshly spawned threads.
state.notify_thread_created(resumed_thread.thread_id);
let child_reference = agent_metadata
.agent_path
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| resumed_thread.thread_id.to_string());
self.maybe_start_completion_watcher(
resumed_thread.thread_id,
Some(notification_source.clone()),
child_reference,
agent_metadata.agent_path.clone(),
);
if !resumed_thread.thread.enabled(Feature::MultiAgentV2) {
let child_reference = agent_metadata
.agent_path
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| resumed_thread.thread_id.to_string());
self.maybe_start_completion_watcher(
resumed_thread.thread_id,
Some(notification_source.clone()),
child_reference,
agent_metadata.agent_path.clone(),
);
}
self.persist_thread_spawn_edge_for_source(
resumed_thread.thread.as_ref(),
resumed_thread.thread_id,

View File

@@ -14,6 +14,7 @@ use crate::agent::AgentStatus;
use crate::agent::Mailbox;
use crate::agent::MailboxReceiver;
use crate::agent::agent_status_from_event;
use crate::agent::status::is_final;
use crate::apps::render_apps_section;
use crate::auth_env_telemetry::collect_auth_env_telemetry;
use crate::commit_attribution::commit_message_trailer_instruction;
@@ -39,6 +40,7 @@ use crate::realtime_conversation::handle_start as handle_realtime_conversation_s
use crate::realtime_conversation::handle_text as handle_realtime_conversation_text;
use crate::render_skills_section;
use crate::rollout::session_index;
use crate::session_prefix::format_subagent_notification_message;
use crate::skills_load_input_from_config;
use crate::stream_events_utils::HandleOutputCtx;
use crate::stream_events_utils::handle_non_tool_response_item;
@@ -2638,6 +2640,8 @@ impl Session {
msg,
};
self.send_event_raw(event).await;
self.maybe_notify_parent_of_terminal_turn(turn_context, &legacy_source)
.await;
self.maybe_mirror_event_text_to_realtime(&legacy_source)
.await;
self.maybe_clear_realtime_handoff_for_event(&legacy_source)
@@ -2653,6 +2657,73 @@ impl Session {
}
}
/// Forwards terminal turn events from spawned MultiAgentV2 children to their direct parent.
async fn maybe_notify_parent_of_terminal_turn(
&self,
turn_context: &TurnContext,
msg: &EventMsg,
) {
if !self.enabled(Feature::MultiAgentV2) {
return;
}
if !matches!(msg, EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_)) {
return;
}
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
agent_path: Some(child_agent_path),
..
}) = &turn_context.session_source
else {
return;
};
let Some(status) = agent_status_from_event(msg) else {
return;
};
if !is_final(&status) {
return;
}
self.forward_child_completion_to_parent(*parent_thread_id, child_agent_path, status)
.await;
}
/// Sends the standard completion envelope from a spawned MultiAgentV2 child to its parent.
async fn forward_child_completion_to_parent(
&self,
parent_thread_id: ThreadId,
child_agent_path: &codex_protocol::AgentPath,
status: AgentStatus,
) {
let Some(parent_agent_path) = child_agent_path
.as_str()
.rsplit_once('/')
.and_then(|(parent, _)| codex_protocol::AgentPath::try_from(parent).ok())
else {
return;
};
let message = format_subagent_notification_message(child_agent_path.as_str(), &status);
let communication = InterAgentCommunication::new(
child_agent_path.clone(),
parent_agent_path,
Vec::new(),
message,
/*trigger_turn*/ false,
);
if let Err(err) = self
.services
.agent_control
.send_inter_agent_communication(parent_thread_id, communication)
.await
{
debug!("failed to notify parent thread {parent_thread_id}: {err}");
}
}
async fn maybe_mirror_event_text_to_realtime(&self, msg: &EventMsg) {
let Some(text) = realtime_text_for_event(msg) else {
return;

View File

@@ -16,7 +16,10 @@ use crate::protocol::Op;
use crate::protocol::SandboxPolicy;
use crate::protocol::SessionSource;
use crate::protocol::SubAgentSource;
use crate::protocol::TurnAbortReason;
use crate::protocol::TurnAbortedEvent;
use crate::protocol::TurnCompleteEvent;
use crate::session_prefix::format_subagent_notification_message;
use crate::state::TaskKind;
use crate::tasks::SessionTask;
use crate::tasks::SessionTaskContext;
@@ -1119,6 +1122,210 @@ async fn multi_agent_v2_assign_task_interrupts_busy_child_without_losing_message
.expect("shutdown should submit");
}
#[tokio::test]
async fn multi_agent_v2_assign_task_completion_notifies_parent_on_every_turn() {
let (mut session, mut turn) = make_session_and_context().await;
let manager = thread_manager();
let root = manager
.start_thread((*turn.config).clone())
.await
.expect("root thread should start");
session.services.agent_control = manager.agent_control();
session.conversation_id = root.thread_id;
let mut config = turn.config.as_ref().clone();
let _ = config.features.enable(Feature::MultiAgentV2);
turn.config = Arc::new(config);
let session = Arc::new(session);
let turn = Arc::new(turn);
SpawnAgentHandlerV2
.handle(invocation(
session.clone(),
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "boot worker",
"task_name": "worker"
})),
))
.await
.expect("spawn worker");
let agent_id = session
.services
.agent_control
.resolve_agent_reference(session.conversation_id, &turn.session_source, "worker")
.await
.expect("worker should resolve");
let thread = manager
.get_thread(agent_id)
.await
.expect("worker thread should exist");
let worker_path = AgentPath::try_from("/root/worker").expect("worker path");
let first_turn = thread.codex.session.new_default_turn().await;
thread
.codex
.session
.send_event(
first_turn.as_ref(),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: first_turn.sub_id.clone(),
last_agent_message: Some("first done".to_string()),
}),
)
.await;
AssignTaskHandlerV2
.handle(invocation(
session,
turn,
"assign_task",
function_payload(json!({
"target": agent_id.to_string(),
"items": [{"type": "text", "text": "continue"}],
})),
))
.await
.expect("assign_task should succeed");
let second_turn = thread.codex.session.new_default_turn().await;
thread
.codex
.session
.send_event(
second_turn.as_ref(),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: second_turn.sub_id.clone(),
last_agent_message: Some("second done".to_string()),
}),
)
.await;
let first_notification = format_subagent_notification_message(
worker_path.as_str(),
&AgentStatus::Completed(Some("first done".to_string())),
);
let second_notification = format_subagent_notification_message(
worker_path.as_str(),
&AgentStatus::Completed(Some("second done".to_string())),
);
let notifications = timeout(Duration::from_secs(5), async {
loop {
let notifications = manager
.captured_ops()
.into_iter()
.filter_map(|(id, op)| {
(id == root.thread_id)
.then_some(op)
.and_then(|op| match op {
Op::InterAgentCommunication { communication }
if communication.author == worker_path
&& communication.recipient == AgentPath::root()
&& communication.other_recipients.is_empty()
&& !communication.trigger_turn =>
{
Some(communication.content)
}
_ => None,
})
})
.collect::<Vec<_>>();
let first_count = notifications
.iter()
.filter(|message| **message == first_notification)
.count();
let second_count = notifications
.iter()
.filter(|message| **message == second_notification)
.count();
if first_count == 1 && second_count == 1 {
break notifications;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("parent should receive one completion notification per child turn");
assert_eq!(notifications.len(), 2);
}
#[tokio::test]
async fn multi_agent_v2_interrupted_turn_does_not_notify_parent() {
let (mut session, mut turn) = make_session_and_context().await;
let manager = thread_manager();
let root = manager
.start_thread((*turn.config).clone())
.await
.expect("root thread should start");
session.services.agent_control = manager.agent_control();
session.conversation_id = root.thread_id;
let mut config = turn.config.as_ref().clone();
let _ = config.features.enable(Feature::MultiAgentV2);
turn.config = Arc::new(config);
let session = Arc::new(session);
let turn = Arc::new(turn);
SpawnAgentHandlerV2
.handle(invocation(
session.clone(),
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "boot worker",
"task_name": "worker"
})),
))
.await
.expect("spawn worker");
let agent_id = session
.services
.agent_control
.resolve_agent_reference(session.conversation_id, &turn.session_source, "worker")
.await
.expect("worker should resolve");
let thread = manager
.get_thread(agent_id)
.await
.expect("worker thread should exist");
let aborted_turn = thread.codex.session.new_default_turn().await;
thread
.codex
.session
.send_event(
aborted_turn.as_ref(),
EventMsg::TurnAborted(TurnAbortedEvent {
turn_id: Some(aborted_turn.sub_id.clone()),
reason: TurnAbortReason::Interrupted,
}),
)
.await;
let notifications = manager
.captured_ops()
.into_iter()
.filter_map(|(id, op)| {
(id == root.thread_id)
.then_some(op)
.and_then(|op| match op {
Op::InterAgentCommunication { communication }
if communication.author.as_str() == "/root/worker"
&& communication.recipient == AgentPath::root()
&& communication.other_recipients.is_empty()
&& !communication.trigger_turn =>
{
Some(communication.content)
}
_ => None,
})
})
.collect::<Vec<_>>();
assert_eq!(notifications, Vec::<String>::new());
}
#[tokio::test]
async fn multi_agent_v2_spawn_includes_agent_id_key_when_named() {
let (mut session, mut turn) = make_session_and_context().await;