feat: list agents for sub-agent v2 (#15621)

Add a `list_agents` for multi-agent v2, optionally path based

This return the task and status of each agent in the matched path
This commit is contained in:
jif-oai
2026-03-24 11:24:08 +00:00
committed by GitHub
parent 567832c6fe
commit 38c088ba8d
9 changed files with 660 additions and 22 deletions

View File

@@ -5,8 +5,10 @@ use crate::agent::role::DEFAULT_ROLE_NAME;
use crate::agent::role::resolve_role_config;
use crate::agent::status::is_final;
use crate::codex_thread::ThreadConfigSnapshot;
use crate::context_manager::is_user_turn_boundary;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::event_mapping::parse_turn_item;
use crate::find_archived_thread_path_by_id_str;
use crate::find_thread_path_by_id_str;
use crate::rollout::RolloutRecorder;
@@ -18,7 +20,10 @@ use crate::thread_manager::ThreadManagerState;
use codex_features::Feature;
use codex_protocol::AgentPath;
use codex_protocol::ThreadId;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::InterAgentCommunication;
@@ -29,6 +34,7 @@ use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::user_input::UserInput;
use codex_state::DirectionalThreadSpawnEdgeStatus;
use serde::Serialize;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::Arc;
@@ -51,6 +57,13 @@ pub(crate) struct LiveAgent {
pub(crate) status: AgentStatus,
}
#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
pub(crate) struct ListedAgent {
pub(crate) agent_name: String,
pub(crate) agent_status: AgentStatus,
pub(crate) last_task_message: Option<String>,
}
fn default_agent_nickname_list() -> Vec<&'static str> {
AGENT_NAMES
.lines()
@@ -456,21 +469,28 @@ impl AgentControl {
agent_id: ThreadId,
items: Vec<UserInput>,
) -> CodexResult<String> {
let last_task_message = render_input_preview(&items);
let state = self.upgrade()?;
self.handle_thread_request_result(
agent_id,
&state,
state
.send_op(
agent_id,
Op::UserInput {
items,
final_output_json_schema: None,
},
)
.await,
)
.await
let result = self
.handle_thread_request_result(
agent_id,
&state,
state
.send_op(
agent_id,
Op::UserInput {
items,
final_output_json_schema: None,
},
)
.await,
)
.await;
if result.is_ok() {
self.state
.update_last_task_message(agent_id, last_task_message);
}
result
}
/// Append a prebuilt message to an existing agent thread outside the normal user-input path.
@@ -494,15 +514,22 @@ impl AgentControl {
agent_id: ThreadId,
communication: InterAgentCommunication,
) -> CodexResult<String> {
let last_task_message = communication.content.clone();
let state = self.upgrade()?;
self.handle_thread_request_result(
agent_id,
&state,
state
.send_op(agent_id, Op::InterAgentCommunication { communication })
.await,
)
.await
let result = self
.handle_thread_request_result(
agent_id,
&state,
state
.send_op(agent_id, Op::InterAgentCommunication { communication })
.await,
)
.await;
if result.is_ok() {
self.state
.update_last_task_message(agent_id, last_task_message);
}
result
}
/// Interrupt the current task for an existing agent thread.
@@ -680,6 +707,69 @@ impl AgentControl {
.join("\n")
}
pub(crate) async fn list_agents(
&self,
current_session_source: &SessionSource,
path_prefix: Option<&str>,
) -> CodexResult<Vec<ListedAgent>> {
let state = self.upgrade()?;
let resolved_prefix = path_prefix
.map(|prefix| {
current_session_source
.get_agent_path()
.unwrap_or_else(AgentPath::root)
.resolve(prefix)
.map_err(CodexErr::UnsupportedOperation)
})
.transpose()?;
let mut live_agents = self.state.live_agents();
live_agents.sort_by(|left, right| {
left.agent_path
.as_deref()
.unwrap_or_default()
.cmp(right.agent_path.as_deref().unwrap_or_default())
.then_with(|| {
left.agent_id
.map(|id| id.to_string())
.unwrap_or_default()
.cmp(&right.agent_id.map(|id| id.to_string()).unwrap_or_default())
})
});
let mut agents = Vec::with_capacity(live_agents.len());
for metadata in live_agents {
let Some(thread_id) = metadata.agent_id else {
continue;
};
if resolved_prefix
.as_ref()
.is_some_and(|prefix| !agent_matches_prefix(metadata.agent_path.as_ref(), prefix))
{
continue;
}
let Ok(thread) = state.get_thread(thread_id).await else {
continue;
};
let agent_name = metadata
.agent_path
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| thread_id.to_string());
agents.push(ListedAgent {
agent_name,
agent_status: thread.agent_status().await,
last_task_message: metadata
.last_task_message
.clone()
.or(last_task_message_for_thread(thread.as_ref()).await),
});
}
Ok(agents)
}
/// Starts a detached watcher for sub-agents spawned from another thread.
///
/// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and
@@ -800,6 +890,7 @@ impl AgentControl {
agent_path,
agent_nickname,
agent_role,
last_task_message: None,
};
Ok((session_source, agent_metadata))
}
@@ -963,6 +1054,95 @@ fn thread_spawn_parent_thread_id(session_source: &SessionSource) -> Option<Threa
}
}
fn agent_matches_prefix(agent_path: Option<&AgentPath>, prefix: &AgentPath) -> bool {
if prefix.is_root() {
return true;
}
agent_path.is_some_and(|agent_path| {
agent_path == prefix
|| agent_path
.as_str()
.strip_prefix(prefix.as_str())
.is_some_and(|suffix| suffix.starts_with('/'))
})
}
async fn last_task_message_for_thread(thread: &crate::CodexThread) -> Option<String> {
let pending_input = thread.codex.session.pending_input_snapshot().await;
if let Some(message) = pending_input
.iter()
.rev()
.find_map(last_task_message_from_input_item)
{
return Some(message);
}
let queued_input = thread
.codex
.session
.queued_response_items_for_next_turn_snapshot()
.await;
if let Some(message) = queued_input
.iter()
.rev()
.find_map(last_task_message_from_input_item)
{
return Some(message);
}
let history = thread.codex.session.clone_history().await;
history
.raw_items()
.iter()
.rev()
.find_map(last_task_message_from_item)
}
fn last_task_message_from_input_item(item: &ResponseInputItem) -> Option<String> {
let response_item: ResponseItem = item.clone().into();
last_task_message_from_item(&response_item)
}
fn last_task_message_from_item(item: &ResponseItem) -> Option<String> {
if !is_user_turn_boundary(item) {
return None;
}
match item {
ResponseItem::Message { role, content, .. } if role == "user" => {
let Some(TurnItem::UserMessage(message)) = parse_turn_item(item) else {
return None;
};
Some(render_input_preview(&message.content))
}
ResponseItem::Message { content, .. } => match content.as_slice() {
[ContentItem::InputText { text }] | [ContentItem::OutputText { text }] => {
serde_json::from_str::<InterAgentCommunication>(text)
.ok()
.map(|communication| communication.content)
}
_ => None,
},
_ => None,
}
}
fn render_input_preview(items: &[UserInput]) -> String {
items
.iter()
.map(|item| match item {
UserInput::Text { text, .. } => text.clone(),
UserInput::Image { .. } => "[image]".to_string(),
UserInput::LocalImage { path } => format!("[local_image:{}]", path.display()),
UserInput::Skill { name, path } => format!("[skill:${name}]({})", path.display()),
UserInput::Mention { name, path } => format!("[mention:${name}]({path})"),
_ => "[input]".to_string(),
})
.collect::<Vec<_>>()
.join("\n")
}
fn thread_spawn_depth(session_source: &SessionSource) -> Option<i32> {
match session_source {
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { depth, .. }) => Some(*depth),