Refactor subagent handler/manager (readability)

This commit is contained in:
Friel
2025-11-22 09:26:01 -08:00
parent beb71f4a00
commit 91cc56702f
2 changed files with 972 additions and 847 deletions

View File

@@ -48,13 +48,14 @@ use crate::codex::Session;
use crate::codex::TurnContext;
use crate::codex_delegate::run_codex_conversation_interactive;
use crate::error::CodexErr;
use crate::model_family::derive_default_model_family;
use crate::model_family::find_family_for_model;
use crate::protocol::SandboxPolicy;
use crate::model_family::{derive_default_model_family, find_family_for_model};
use codex_protocol::config_types::ReasoningEffort;
use crate::subagents::SubagentMetadata;
use crate::subagents::SubagentOrigin;
use crate::subagents::SubagentRegistry;
use crate::subagents::SubagentStatus;
use codex_protocol::config_types::ReasoningEffort;
const LOG_CAPACITY: usize = 200;
const ROOT_AGENT_ID: AgentId = 0;
@@ -120,6 +121,14 @@ fn clamp_reasoning_effort_for_model(config: &mut CodexConfig) {
}
}
fn normalize_prompt(prompt: Option<String>) -> Option<String> {
prompt
.as_ref()
.map(|text| text.trim())
.filter(|text| !text.is_empty())
.map(str::to_string)
}
impl SubagentManager {
fn allocate_agent_id(&self) -> AgentId {
self.next_agent_id.fetch_add(1, Ordering::Relaxed)
@@ -136,35 +145,18 @@ impl SubagentManager {
.duration_since(UNIX_EPOCH)
.map(|dur| dur.as_millis() as i64)
.unwrap_or(0);
let pending_messages = {
let mut guard = self.root_inbox.write().await;
let entry = guard.entry(*root_session_id).or_insert_with(Vec::new);
entry.push(RootInboxItem {
sender_agent_id: metadata.agent_id,
timestamp_ms: now_ms,
payload: RootInboxPayload::Completion(completion),
metadata: Some(metadata),
});
entry.len()
};
let event = EventMsg::AgentInbox(AgentInboxEvent {
agent_id: ROOT_AGENT_ID,
session_id: *root_session_id,
pending_messages,
pending_interrupts: 0,
});
self.send_event_to_parent(source_session_id, event).await;
if self.root_inbox_autosubmit
&& let Some(root_session) = crate::session_index::get(root_session_id)
&& !root_session.has_active_turn().await
{
let items = self.drain_root_inbox_to_items(root_session_id).await;
if !items.is_empty() {
root_session.autosubmit_inbox_task(items).await;
}
}
let _ = self
.push_root_inbox_entry(
root_session_id,
source_session_id,
RootInboxItem {
sender_agent_id: metadata.agent_id,
timestamp_ms: now_ms,
payload: RootInboxPayload::Completion(completion),
metadata: Some(metadata),
},
)
.await;
}
async fn parent_agent_id(&self, session_id: &ConversationId) -> AgentId {
@@ -228,11 +220,33 @@ impl SubagentManager {
rollout_items: &[RolloutItem],
parent_session_id: ConversationId,
) {
let by_session = Self::collect_lifecycle_from_rollout(rollout_items);
let watchdogs = Self::collect_watchdogs_from_rollout(rollout_items);
for meta in by_session.values() {
self.registry.register_imported(meta.clone()).await;
}
for (agent_id, (interval, message)) in watchdogs {
let _ = self
.watchdog_action(
parent_session_id,
ROOT_AGENT_ID,
agent_id,
interval,
message,
false,
)
.await;
}
}
fn collect_lifecycle_from_rollout(
rollout_items: &[RolloutItem],
) -> HashMap<ConversationId, SubagentMetadata> {
use codex_protocol::protocol::SubagentLifecycleEvent;
let mut by_session: HashMap<ConversationId, SubagentMetadata> = HashMap::new();
let mut watchdogs: HashMap<AgentId, (u64, String)> = HashMap::new();
let mut call_args: HashMap<String, serde_json::Value> = HashMap::new();
for item in rollout_items {
match item {
@@ -261,6 +275,21 @@ impl SubagentManager {
meta.pending_interrupts = ev.pending_interrupts;
}
}
_ => {}
}
}
by_session
}
fn collect_watchdogs_from_rollout(
rollout_items: &[RolloutItem],
) -> HashMap<AgentId, (u64, String)> {
let mut watchdogs: HashMap<AgentId, (u64, String)> = HashMap::new();
let mut call_args: HashMap<String, serde_json::Value> = HashMap::new();
for item in rollout_items {
match item {
RolloutItem::ResponseItem(ResponseItem::FunctionCall {
name,
call_id,
@@ -298,22 +327,7 @@ impl SubagentManager {
}
}
for meta in by_session.values() {
self.registry.register_imported(meta.clone()).await;
}
for (agent_id, (interval, message)) in watchdogs {
let _ = self
.watchdog_action(
parent_session_id,
ROOT_AGENT_ID,
agent_id,
interval,
message,
false,
)
.await;
}
watchdogs
}
async fn register_emitter(
@@ -330,6 +344,24 @@ impl SubagentManager {
self.emitters.write().await.remove(session_id);
}
async fn prepare_child_session(
&self,
metadata: &SubagentMetadata,
session: Arc<Session>,
turn: Arc<TurnContext>,
) {
self.register_emitter(metadata.session_id, Arc::clone(&session), Arc::clone(&turn))
.await;
self.emit_created(metadata).await;
self.emit_inbox(metadata).await;
}
async fn rollback_child(&self, session_id: &ConversationId) {
self.emit_deleted(session_id).await;
self.remove_emitter(session_id).await;
self.registry.remove(session_id).await;
}
async fn send_event_to_parent(&self, session_id: &ConversationId, msg: EventMsg) {
if let Some(emitter) = self.emitters.read().await.get(session_id).cloned() {
emitter.session.send_event(&emitter.turn, msg).await;
@@ -342,15 +374,30 @@ impl SubagentManager {
source_session_id: &ConversationId,
message: InboxMessage,
) {
let _ = self
.push_root_inbox_entry(
root_session_id,
source_session_id,
RootInboxItem {
sender_agent_id: message.sender_agent_id,
timestamp_ms: message.timestamp_ms,
payload: RootInboxPayload::Message(message),
metadata: None,
},
)
.await;
}
async fn push_root_inbox_entry(
&self,
root_session_id: &ConversationId,
source_session_id: &ConversationId,
item: RootInboxItem,
) -> usize {
let pending_messages = {
let mut guard = self.root_inbox.write().await;
let entry = guard.entry(*root_session_id).or_insert_with(Vec::new);
entry.push(RootInboxItem {
sender_agent_id: message.sender_agent_id,
timestamp_ms: message.timestamp_ms,
payload: RootInboxPayload::Message(message),
metadata: None,
});
entry.push(item);
entry.len()
};
@@ -371,6 +418,8 @@ impl SubagentManager {
root_session.autosubmit_inbox_task(items).await;
}
}
pending_messages
}
/// Drain and clear the root inbox for `root_session_id`, returning a
@@ -547,6 +596,14 @@ impl SubagentManager {
}
}
async fn agent_id_for(&self, session_id: &ConversationId) -> AgentId {
self.registry
.get(session_id)
.await
.map(|m| m.agent_id)
.unwrap_or(ROOT_AGENT_ID)
}
async fn emit_inbox(&self, metadata: &SubagentMetadata) {
let msg = EventMsg::AgentInbox(AgentInboxEvent {
agent_id: metadata.agent_id,
@@ -718,12 +775,7 @@ impl SubagentManager {
}
async fn emit_status(&self, session_id: &ConversationId, status: SubagentStatus) {
let agent_id = self
.registry
.get(session_id)
.await
.map(|m| m.agent_id)
.unwrap_or(ROOT_AGENT_ID);
let agent_id = self.agent_id_for(session_id).await;
let event =
EventMsg::SubagentLifecycle(SubagentLifecycleEvent::Status(SubagentStatusEvent {
agent_id,
@@ -734,12 +786,7 @@ impl SubagentManager {
}
async fn emit_reasoning_header(&self, session_id: &ConversationId, header: String) {
let agent_id = self
.registry
.get(session_id)
.await
.map(|m| m.agent_id)
.unwrap_or(ROOT_AGENT_ID);
let agent_id = self.agent_id_for(session_id).await;
let event = EventMsg::SubagentLifecycle(SubagentLifecycleEvent::ReasoningHeader(
SubagentReasoningHeaderEvent {
agent_id,
@@ -751,12 +798,7 @@ impl SubagentManager {
}
async fn emit_deleted(&self, session_id: &ConversationId) {
let agent_id = self
.registry
.get(session_id)
.await
.map(|m| m.agent_id)
.unwrap_or(ROOT_AGENT_ID);
let agent_id = self.agent_id_for(session_id).await;
let event =
EventMsg::SubagentLifecycle(SubagentLifecycleEvent::Deleted(SubagentRemovedEvent {
agent_id,
@@ -804,10 +846,8 @@ impl SubagentManager {
)
.await;
self.register_emitter(session_id, Arc::clone(&session), Arc::clone(&turn))
self.prepare_child_session(&metadata, Arc::clone(&session), Arc::clone(&turn))
.await;
self.emit_created(&metadata).await;
self.emit_inbox(&metadata).await;
let launch = self
.launch_child(
@@ -822,9 +862,7 @@ impl SubagentManager {
let runtime = match launch {
Ok(runtime) => runtime,
Err(err) => {
self.emit_deleted(&session_id).await;
self.remove_emitter(&session_id).await;
self.registry.remove(&session_id).await;
self.rollback_child(&session_id).await;
return Err(err);
}
};
@@ -871,10 +909,8 @@ impl SubagentManager {
)
.await;
self.register_emitter(session_id, Arc::clone(&session), Arc::clone(&turn))
self.prepare_child_session(&metadata, Arc::clone(&session), Arc::clone(&turn))
.await;
self.emit_created(&metadata).await;
self.emit_inbox(&metadata).await;
let mut fork_items = build_fork_history(Arc::clone(&session))
.await
@@ -920,19 +956,12 @@ impl SubagentManager {
let runtime = match launch {
Ok(runtime) => runtime,
Err(err) => {
self.emit_deleted(&session_id).await;
self.remove_emitter(&session_id).await;
self.registry.remove(&session_id).await;
self.rollback_child(&session_id).await;
return Err(err);
}
};
let trimmed_prompt = request
.prompt
.as_ref()
.map(|p| p.trim())
.filter(|p| !p.is_empty())
.map(str::to_string);
let trimmed_prompt = normalize_prompt(request.prompt.clone());
match trimmed_prompt {
Some(prompt) => match runtime.submit_prompt(&prompt).await {
@@ -1006,11 +1035,7 @@ impl SubagentManager {
// Keep prior completions/logs intact so history reflects previous turns.
// Only clear the runtime's in-flight completion latch so new work can proceed.
runtime.clear_completion();
let trimmed_prompt = prompt
.as_ref()
.map(|text| text.trim())
.filter(|text| !text.is_empty())
.map(std::string::ToString::to_string);
let trimmed_prompt = normalize_prompt(prompt);
if interrupt || trimmed_prompt.is_some() {
let counts = runtime
@@ -1219,11 +1244,7 @@ impl SubagentManager {
prompt: Option<String>,
sender_metadata: SubagentMetadata,
) -> Result<(), SubagentManagerError> {
let trimmed_prompt = prompt
.as_ref()
.map(|text| text.trim())
.filter(|text| !text.is_empty())
.map(std::string::ToString::to_string);
let trimmed_prompt = normalize_prompt(prompt);
if trimmed_prompt.is_none() {
return Ok(());
@@ -1336,6 +1357,28 @@ impl SubagentManager {
}
}
fn remaining_timeout(
start: tokio::time::Instant,
timeout_total: Option<Duration>,
session_id: &ConversationId,
agent_id: AgentId,
) -> Result<Option<Duration>, SubagentManagerError> {
if let Some(total) = timeout_total {
let elapsed = start.elapsed();
if elapsed >= total {
let timeout_ms = total.as_millis().try_into().unwrap_or(u64::MAX);
return Err(SubagentManagerError::AwaitTimedOut {
session_id: *session_id,
agent_id,
timeout_ms,
});
}
Ok(Some(total - elapsed))
} else {
Ok(None)
}
}
pub async fn await_completion(
&self,
session_id: &ConversationId,
@@ -1395,6 +1438,7 @@ impl SubagentManager {
}
};
let mut receiver = runtime.completion_receiver();
let agent_id = self.agent_id_for(session_id).await;
if let Some(completion) = current_completion(&receiver) {
let metadata = self
@@ -1412,33 +1456,15 @@ impl SubagentManager {
let start = tokio::time::Instant::now();
let mut completion_opt = None;
loop {
let changed = if let Some(duration) = timeout_total {
let elapsed = start.elapsed();
if elapsed >= duration {
let timeout_ms = duration.as_millis().try_into().unwrap_or(u64::MAX);
let agent_id = self
.registry
.get(session_id)
.await
.map(|m| m.agent_id)
.unwrap_or(ROOT_AGENT_ID);
return Err(SubagentManagerError::AwaitTimedOut {
session_id: *session_id,
agent_id,
timeout_ms,
});
}
let remaining = duration - elapsed;
let changed = if let Some(remaining) =
Self::remaining_timeout(start, timeout_total, session_id, agent_id)?
{
match tokio::time::timeout(remaining, receiver.changed()).await {
Ok(result) => result,
Err(_) => {
let timeout_ms = duration.as_millis().try_into().unwrap_or(u64::MAX);
let agent_id = self
.registry
.get(session_id)
.await
.map(|m| m.agent_id)
.unwrap_or(ROOT_AGENT_ID);
let timeout_ms = timeout_total
.map(|d| d.as_millis().try_into().unwrap_or(u64::MAX))
.unwrap_or(0);
return Err(SubagentManagerError::AwaitTimedOut {
session_id: *session_id,
agent_id,
@@ -1551,6 +1577,7 @@ impl SubagentManager {
let mut receiver = runtime.completion_receiver();
let inbox_notify = runtime.inbox_notifier();
let agent_id = self.agent_id_for(session_id).await;
// Fast path: if there are already inbox messages or a completion,
// return immediately without waiting.
@@ -1582,28 +1609,7 @@ impl SubagentManager {
let start = tokio::time::Instant::now();
loop {
// Helper for computing remaining timeout and converting expirations
// into the existing AwaitTimedOut error.
let remaining = if let Some(duration) = timeout_total {
let elapsed = start.elapsed();
if elapsed >= duration {
let timeout_ms = duration.as_millis().try_into().unwrap_or(u64::MAX);
let agent_id = self
.registry
.get(session_id)
.await
.map(|m| m.agent_id)
.unwrap_or(ROOT_AGENT_ID);
return Err(SubagentManagerError::AwaitTimedOut {
session_id: *session_id,
agent_id,
timeout_ms,
});
}
Some(duration - elapsed)
} else {
None
};
let remaining = Self::remaining_timeout(start, timeout_total, session_id, agent_id)?;
if let Some(rem) = remaining {
tokio::select! {
@@ -1611,12 +1617,6 @@ impl SubagentManager {
let timeout_ms = timeout_total
.map(|d| d.as_millis().try_into().unwrap_or(u64::MAX))
.unwrap_or(0);
let agent_id = self
.registry
.get(session_id)
.await
.map(|m| m.agent_id)
.unwrap_or(ROOT_AGENT_ID);
return Err(SubagentManagerError::AwaitTimedOut {
session_id: *session_id,
agent_id,

File diff suppressed because it is too large Load Diff