Attach subagent completion watcher before input

This commit is contained in:
Ahmed Ibrahim
2026-03-04 20:03:01 -08:00
parent dc19e78962
commit 0bd20db915

View File

@@ -25,6 +25,7 @@ use codex_protocol::user_input::UserInput;
use std::sync::Arc;
use std::sync::Weak;
use tokio::sync::watch;
use tracing::debug;
const AGENT_NAMES: &str = include_str!("agent_names.txt");
const FORKED_SPAWN_AGENT_OUTPUT_MESSAGE: &str = "You are the newly spawned agent. The prior conversation history was forked from your parent agent. Treat the next user message as your new task, and use the forked history only as background context.";
@@ -212,8 +213,9 @@ impl AgentControl {
// TODO(jif) add helper for drain
state.notify_thread_created(new_thread.thread_id);
self.maybe_start_completion_watcher(new_thread.thread_id, notification_source)
.await;
self.send_input(new_thread.thread_id, items).await?;
self.maybe_start_completion_watcher(new_thread.thread_id, notification_source);
Ok(new_thread.thread_id)
}
@@ -288,7 +290,8 @@ impl AgentControl {
// Resumed threads are re-registered in-memory and need the same listener
// attachment path as freshly spawned threads.
state.notify_thread_created(resumed_thread.thread_id);
self.maybe_start_completion_watcher(resumed_thread.thread_id, Some(notification_source));
self.maybe_start_completion_watcher(resumed_thread.thread_id, Some(notification_source))
.await;
Ok(resumed_thread.thread_id)
}
@@ -418,7 +421,7 @@ impl AgentControl {
///
/// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and
/// can receive completion notifications.
fn maybe_start_completion_watcher(
async fn maybe_start_completion_watcher(
&self,
child_thread_id: ThreadId,
session_source: Option<SessionSource>,
@@ -429,13 +432,20 @@ impl AgentControl {
else {
return;
};
let status_rx = self.subscribe_status(child_thread_id).await.ok();
let control = self.clone();
tokio::spawn(async move {
let status = match control.subscribe_status(child_thread_id).await {
Ok(mut status_rx) => {
let status = match status_rx {
Some(mut status_rx) => {
debug!(%child_thread_id, "subagent completion watcher attached");
let mut status = status_rx.borrow().clone();
while !is_final(&status) {
if status_rx.changed().await.is_err() {
debug!(
%child_thread_id,
"subagent completion watcher lost status stream; reading latest status"
);
status = control.get_status(child_thread_id).await;
break;
}
@@ -443,9 +453,20 @@ impl AgentControl {
}
status
}
Err(_) => control.get_status(child_thread_id).await,
None => {
debug!(
%child_thread_id,
"subagent completion watcher could not subscribe; reading latest status"
);
control.get_status(child_thread_id).await
}
};
if !is_final(&status) {
debug!(
%child_thread_id,
?status,
"subagent completion watcher exiting before final status"
);
return;
}
@@ -1331,15 +1352,18 @@ mod tests {
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = ThreadId::new();
harness.control.maybe_start_completion_watcher(
child_thread_id,
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
);
harness
.control
.maybe_start_completion_watcher(
child_thread_id,
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await;
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);