Refactor subagent manager/handler readability

This commit is contained in:
Friel
2025-11-22 19:42:44 -08:00
parent 91cc56702f
commit b4bccfb10a
2 changed files with 100 additions and 169 deletions

View File

@@ -1537,16 +1537,7 @@ impl SubagentManager {
let completions = self.completions.read().await;
completions.get(session_id).cloned()
} {
let metadata = self
.registry
.get(session_id)
.await
.ok_or(SubagentManagerError::NotFound)?;
return Ok(AwaitInboxResult {
metadata,
completion: Some(completion),
messages: Vec::new(),
});
return self.completion_result(session_id, completion).await;
}
let runtime = {
@@ -1560,16 +1551,7 @@ impl SubagentManager {
let completions = self.completions.read().await;
completions.get(session_id).cloned()
} {
let metadata = self
.registry
.get(session_id)
.await
.ok_or(SubagentManagerError::NotFound)?;
return Ok(AwaitInboxResult {
metadata,
completion: Some(completion),
messages: Vec::new(),
});
return self.completion_result(session_id, completion).await;
}
return Err(SubagentManagerError::NotFound);
}
@@ -1585,13 +1567,8 @@ impl SubagentManager {
let mut completion_opt = current_completion(&receiver);
if completion_opt.is_some() || !messages.is_empty() {
if let Some(ref completion) = completion_opt {
let desired_status = status_from_completion(completion);
self.update_status_and_emit(session_id, desired_status)
.await;
{
let mut completions = self.completions.write().await;
completions.insert(*session_id, completion.clone());
}
self.ensure_completion_recorded(session_id, completion)
.await?;
}
let metadata = self
.registry
@@ -1611,58 +1588,35 @@ impl SubagentManager {
loop {
let remaining = Self::remaining_timeout(start, timeout_total, session_id, agent_id)?;
if let Some(rem) = remaining {
tokio::select! {
_ = tokio::time::sleep(rem) => {
let timeout_ms = timeout_total
.map(|d| d.as_millis().try_into().unwrap_or(u64::MAX))
.unwrap_or(0);
return Err(SubagentManagerError::AwaitTimedOut {
session_id: *session_id,
agent_id,
timeout_ms,
});
}
_ = inbox_notify.notified() => {
let new_messages = runtime.drain_inbox().await;
if !new_messages.is_empty() {
messages.extend(new_messages);
break;
}
// Spurious wakeup or inbox drained by another waiter;
// loop and wait again.
}
changed = receiver.changed() => {
match changed {
Ok(()) => {
if let Some(completion) = current_completion(&receiver) {
completion_opt = Some(completion);
break;
}
}
Err(_) => break,
}
}
tokio::select! {
_ = Self::timeout_future(remaining) => {
let timeout_ms = timeout_total
.map(|d| d.as_millis().try_into().unwrap_or(u64::MAX))
.unwrap_or(0);
return Err(SubagentManagerError::AwaitTimedOut {
session_id: *session_id,
agent_id,
timeout_ms,
});
}
} else {
tokio::select! {
_ = inbox_notify.notified() => {
let new_messages = runtime.drain_inbox().await;
if !new_messages.is_empty() {
messages.extend(new_messages);
break;
}
_ = inbox_notify.notified() => {
let new_messages = runtime.drain_inbox().await;
if !new_messages.is_empty() {
messages.extend(new_messages);
break;
}
changed = receiver.changed() => {
match changed {
Ok(()) => {
if let Some(completion) = current_completion(&receiver) {
completion_opt = Some(completion);
break;
}
// Spurious wakeup or inbox drained by another waiter;
// loop and wait again.
}
changed = receiver.changed() => {
match changed {
Ok(()) => {
if let Some(completion) = current_completion(&receiver) {
completion_opt = Some(completion);
break;
}
Err(_) => break,
}
Err(_) => break,
}
}
}
@@ -1670,13 +1624,8 @@ impl SubagentManager {
// Finalize completion state if we observed a terminal result.
if let Some(ref completion) = completion_opt {
let desired_status = status_from_completion(completion);
self.update_status_and_emit(session_id, desired_status)
.await;
{
let mut completions = self.completions.write().await;
completions.insert(*session_id, completion.clone());
}
self.ensure_completion_recorded(session_id, completion)
.await?;
}
let metadata = self
@@ -1935,17 +1884,10 @@ impl SubagentManager {
// messages into the parent and child
// histories so callers do not need to
// issue an explicit await.
if let Ok(result) = self
.await_inbox_and_completion(
&session_id,
Some(Duration::from_millis(0)),
)
if let Some(result) = self
.drain_terminal_inbox(&session_id)
.await
{
self
.deliver_inbox_to_threads_at_yield(&result)
.await;
if let Some(completion) = result.completion.clone() {
let status = status_from_completion(&completion);
self
@@ -1969,17 +1911,9 @@ impl SubagentManager {
reason: ev.reason.clone(),
})
.await;
if let Ok(result) = self
.await_inbox_and_completion(
&session_id,
Some(Duration::from_millis(0)),
)
.await
{
self
.deliver_inbox_to_threads_at_yield(&result)
.await;
}
let _ = self
.drain_terminal_inbox(&session_id)
.await;
self
.finalize_terminal(
&session_id,
@@ -1998,17 +1932,9 @@ impl SubagentManager {
message: ev.message.clone(),
})
.await;
if let Ok(result) = self
.await_inbox_and_completion(
&session_id,
Some(Duration::from_millis(0)),
)
.await
{
self
.deliver_inbox_to_threads_at_yield(&result)
.await;
}
let _ = self
.drain_terminal_inbox(&session_id)
.await;
self
.finalize_terminal(
&session_id,
@@ -2027,17 +1953,9 @@ impl SubagentManager {
message: ev.message.clone(),
})
.await;
if let Ok(result) = self
.await_inbox_and_completion(
&session_id,
Some(Duration::from_millis(0)),
)
.await
{
self
.deliver_inbox_to_threads_at_yield(&result)
.await;
}
let _ = self
.drain_terminal_inbox(&session_id)
.await;
self
.finalize_terminal(
&session_id,
@@ -2073,6 +1991,18 @@ impl SubagentManager {
// Runtime is kept alive after completion so messages can resume; no auto-removal here.
}
async fn drain_terminal_inbox(&self, session_id: &ConversationId) -> Option<AwaitInboxResult> {
if let Ok(result) = self
.await_inbox_and_completion(session_id, Some(Duration::from_millis(0)))
.await
{
self.deliver_inbox_to_threads_at_yield(&result).await;
Some(result)
} else {
None
}
}
async fn run_pending_ops(&self, session_id: ConversationId, runtime: Arc<ManagedSubagent>) {
let cancel = runtime.cancellation_token();
let notify = runtime.pending_ops_notifier();

View File

@@ -724,21 +724,7 @@ async fn handle_spawn(ctx: &InvocationContext) -> Result<ToolOutput, FunctionCal
.await
.map_err(|err| map_manager_error(err, None))?;
let response = json!({
"session_id": metadata.session_id,
"agent_id": metadata.agent_id,
"parent_agent_id": metadata.parent_agent_id,
"origin": metadata.origin,
"status": metadata.status,
"model": model,
"label": metadata.label,
"summary": metadata.summary,
"parent_session_id": metadata.parent_session_id,
"started_at_ms": metadata.created_at_ms,
"initial_message_count": metadata.initial_message_count,
"pending_messages": metadata.pending_messages,
"pending_interrupts": metadata.pending_interrupts,
});
let response = build_subagent_response(&metadata, Some(json!(model)), None);
Ok(ToolOutput::Function {
content: response.to_string(),
@@ -813,22 +799,8 @@ async fn handle_fork(ctx: &InvocationContext) -> Result<ToolOutput, FunctionCall
"prompt": prompt_clone,
});
let response = json!({
"session_id": metadata.session_id,
"agent_id": metadata.agent_id,
"parent_agent_id": metadata.parent_agent_id,
"origin": metadata.origin,
"status": metadata.status,
"model": model_clone,
"label": metadata.label,
"summary": metadata.summary,
"parent_session_id": metadata.parent_session_id,
"started_at_ms": metadata.created_at_ms,
"initial_message_count": metadata.initial_message_count,
"pending_messages": metadata.pending_messages,
"pending_interrupts": metadata.pending_interrupts,
"payload": parent_payload,
});
let response =
build_subagent_response(&metadata, Some(json!(model_clone)), Some(parent_payload));
Ok(ToolOutput::Function {
content: response.to_string(),
@@ -935,20 +907,7 @@ async fn handle_send_message(ctx: &InvocationContext) -> Result<ToolOutput, Func
.await
.map_err(|err| map_manager_error(err, Some(agent_id)))?;
let response = json!({
"session_id": metadata.session_id,
"agent_id": metadata.agent_id,
"parent_agent_id": metadata.parent_agent_id,
"origin": metadata.origin,
"status": metadata.status,
"label": metadata.label,
"summary": metadata.summary,
"parent_session_id": metadata.parent_session_id,
"started_at_ms": metadata.created_at_ms,
"initial_message_count": metadata.initial_message_count,
"pending_messages": metadata.pending_messages,
"pending_interrupts": metadata.pending_interrupts,
});
let response = build_subagent_response(&metadata, None, None);
Ok(ToolOutput::Function {
content: response.to_string(),
@@ -1376,6 +1335,48 @@ impl ToolHandler for SubagentToolHandler {
}
}
fn build_subagent_response(
metadata: &SubagentMetadata,
model_value: Option<Value>,
payload: Option<Value>,
) -> Value {
let mut map = Map::new();
map.insert("session_id".to_string(), json!(metadata.session_id));
map.insert("agent_id".to_string(), json!(metadata.agent_id));
map.insert(
"parent_agent_id".to_string(),
json!(metadata.parent_agent_id),
);
map.insert("origin".to_string(), json!(metadata.origin));
map.insert("status".to_string(), json!(metadata.status));
if let Some(model) = model_value {
map.insert("model".to_string(), model);
}
if let Some(payload_value) = payload {
map.insert("payload".to_string(), payload_value);
}
map.insert("label".to_string(), json!(metadata.label));
map.insert("summary".to_string(), json!(metadata.summary));
map.insert(
"parent_session_id".to_string(),
json!(metadata.parent_session_id),
);
map.insert("started_at_ms".to_string(), json!(metadata.created_at_ms));
map.insert(
"initial_message_count".to_string(),
json!(metadata.initial_message_count),
);
map.insert(
"pending_messages".to_string(),
json!(metadata.pending_messages),
);
map.insert(
"pending_interrupts".to_string(),
json!(metadata.pending_interrupts),
);
Value::Object(map)
}
/// Build the JSON payload returned by `subagent_logs` for a given log snapshot.
pub fn render_logs_payload(
session_id: ConversationId,