feat: disable notifier v2 and start turn on agent interaction (#15624)

Make the inter-agent communication start a turn

As part of this, we disable the v2 notifier to prevent some odd
behaviour where the agent restart working while you're talking to it for
example
This commit is contained in:
jif-oai
2026-03-24 17:01:24 +00:00
committed by GitHub
parent 0f90a34676
commit b51d5f18c7
6 changed files with 47 additions and 150 deletions

View File

@@ -812,36 +812,14 @@ impl AgentControl {
return;
};
let child_thread = state.get_thread(child_thread_id).await.ok();
if let Some(child_agent_path) = child_agent_path
if child_agent_path.is_some()
&& child_thread
.as_ref()
.map(|thread| thread.enabled(Feature::MultiAgentV2))
.unwrap_or(true)
{
let AgentStatus::Completed(Some(content)) = &status else {
return;
};
let Some((parent_path, _)) = child_agent_path.as_str().rsplit_once('/') else {
return;
};
let Ok(parent_agent_path) = AgentPath::try_from(parent_path) else {
return;
};
let Some(parent_thread_id) = control.state.agent_id_for_path(&parent_agent_path)
else {
return;
};
let _ = control
.send_inter_agent_communication(
parent_thread_id,
InterAgentCommunication::new(
child_agent_path,
parent_agent_path,
Vec::new(),
content.clone(),
),
)
.await;
// Disable v2 completion notifications for now so child turns do
// not enqueue synthetic parent messages on completion.
return;
}
let Ok(parent_thread) = state.get_thread(parent_thread_id).await else {

View File

@@ -29,8 +29,6 @@ use tempfile::TempDir;
use tokio::time::Duration;
use tokio::time::sleep;
use tokio::time::timeout;
const MULTI_AGENT_EVENTUAL_TIMEOUT: Duration = Duration::from_secs(5);
use toml::Value as TomlValue;
async fn test_config_with_cli_overrides(
@@ -964,94 +962,6 @@ async fn spawn_child_completion_notifies_parent_history() {
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);
}
#[tokio::test]
async fn multi_agent_v2_completion_sends_inter_agent_message_to_direct_parent() {
let harness = AgentControlHarness::new().await;
let (root_thread_id, _) = harness.start_thread().await;
let mut config = harness.config.clone();
let _ = config.features.enable(Feature::MultiAgentV2);
let worker_path = AgentPath::root().join("worker_a").expect("worker path");
let worker_thread_id = harness
.control
.spawn_agent(
config.clone(),
text_input("hello worker"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: root_thread_id,
depth: 1,
agent_path: Some(worker_path.clone()),
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("worker spawn should succeed");
let tester_path = worker_path.join("tester").expect("tester path");
let tester_thread_id = harness
.control
.spawn_agent(
config,
text_input("hello tester"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: worker_thread_id,
depth: 2,
agent_path: Some(tester_path.clone()),
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("tester spawn should succeed");
let tester_thread = harness
.manager
.get_thread(tester_thread_id)
.await
.expect("tester thread should exist");
let tester_turn = tester_thread.codex.session.new_default_turn().await;
tester_thread
.codex
.session
.send_event(
tester_turn.as_ref(),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: tester_turn.sub_id.clone(),
last_agent_message: Some("done".to_string()),
}),
)
.await;
let expected = InterAgentCommunication::new(
tester_path.clone(),
worker_path.clone(),
Vec::new(),
"done".to_string(),
);
timeout(MULTI_AGENT_EVENTUAL_TIMEOUT, async {
loop {
let delivered = harness
.manager
.captured_ops()
.into_iter()
.any(|(thread_id, op)| {
thread_id == worker_thread_id
&& matches!(
op,
Op::InterAgentCommunication { communication }
if communication == expected
)
});
if delivered {
break;
}
sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("completion watcher should send inter-agent communication");
}
#[tokio::test]
async fn multi_agent_v2_completion_ignores_dead_direct_parent() {
let harness = AgentControlHarness::new().await;

View File

@@ -252,6 +252,15 @@ fn inter_agent_assistant_messages_are_turn_boundaries() {
assert!(is_user_turn_boundary(&item));
}
#[test]
fn for_prompt_preserves_inter_agent_assistant_messages() {
let item = inter_agent_assistant_msg("continue");
let history = create_history_with_items(vec![item.clone()]);
assert_eq!(history.raw_items(), std::slice::from_ref(&item));
assert_eq!(history.for_prompt(&default_input_modalities()), vec![item]);
}
#[test]
fn drop_last_n_user_turns_treats_inter_agent_assistant_messages_as_instruction_turns() {
let first_turn = user_input_text_msg("first");

View File

@@ -30,7 +30,6 @@ use crate::hook_runtime::record_additional_contexts;
use crate::hook_runtime::record_pending_input;
use crate::models_manager::manager::ModelsManager;
use crate::protocol::EventMsg;
use crate::protocol::TokenUsage;
use crate::protocol::TurnAbortReason;
use crate::protocol::TurnAbortedEvent;
use crate::protocol::TurnCompleteEvent;
@@ -192,11 +191,6 @@ impl Session {
let cancellation_token = CancellationToken::new();
let done = Arc::new(Notify::new());
let timer = turn_context
.session_telemetry
.start_timer(TURN_E2E_DURATION_METRIC, &[])
.ok();
let done_clone = Arc::clone(&done);
let handle = {
let session_ctx = Arc::new(SessionTaskContext::new(Arc::clone(self)));
@@ -236,6 +230,21 @@ impl Session {
)
};
let queued_response_items = self.take_queued_response_items_for_next_turn().await;
let mut active = self.active_turn.lock().await;
let mut turn = ActiveTurn::default();
let mut turn_state = turn.turn_state.lock().await;
turn_state.token_usage_at_turn_start = token_usage_at_turn_start;
for item in queued_response_items {
turn_state.push_pending_input(item);
}
drop(turn_state);
let timer = turn_context
.session_telemetry
.start_timer(TURN_E2E_DURATION_METRIC, &[])
.ok();
let running_task = RunningTask {
done,
handle: Arc::new(AbortOnDropHandle::new(handle)),
@@ -245,8 +254,8 @@ impl Session {
turn_context: Arc::clone(&turn_context),
_timer: timer,
};
self.register_new_active_task(running_task, token_usage_at_turn_start)
.await;
turn.add_task(running_task);
*active = Some(turn);
}
pub(crate) async fn ensure_task_for_queued_response_items(self: &Arc<Self>) {
@@ -406,23 +415,6 @@ impl Session {
self.send_event(turn_context.as_ref(), event).await;
}
async fn register_new_active_task(
&self,
task: RunningTask,
token_usage_at_turn_start: TokenUsage,
) {
let mut active = self.active_turn.lock().await;
let mut turn = ActiveTurn::default();
let mut turn_state = turn.turn_state.lock().await;
turn_state.token_usage_at_turn_start = token_usage_at_turn_start;
for item in self.take_queued_response_items_for_next_turn().await {
turn_state.push_pending_input(item);
}
drop(turn_state);
turn.add_task(task);
*active = Some(turn);
}
async fn take_active_turn(&self) -> Option<ActiveTurn> {
let mut active = self.active_turn.lock().await;
active.take()

View File

@@ -63,14 +63,22 @@ impl SessionTask for RegularTask {
Some(*prewarmed_client_session)
}
};
run_turn(
sess,
ctx,
input,
prewarmed_client_session,
cancellation_token,
)
.instrument(run_turn_span)
.await
let mut next_input = input;
let mut prewarmed_client_session = prewarmed_client_session;
loop {
let last_agent_message = run_turn(
Arc::clone(&sess),
Arc::clone(&ctx),
next_input,
prewarmed_client_session.take(),
cancellation_token.child_token(),
)
.instrument(run_turn_span.clone())
.await;
if !sess.has_pending_input().await {
return last_agent_message;
}
next_input = Vec::new();
}
}
}

View File

@@ -548,7 +548,7 @@ impl InterAgentCommunication {
Self::from_message_content(content).is_some()
}
fn from_message_content(content: &[ContentItem]) -> Option<Self> {
pub fn from_message_content(content: &[ContentItem]) -> Option<Self> {
match content {
[ContentItem::InputText { text }] | [ContentItem::OutputText { text }] => {
serde_json::from_str(text).ok()