feat: structured multi-agent output (#15515)

Send input now sends messages as assistant message and with this format:

```
author: /root/worker_a
recipient: /root/worker_a/tester
other_recipients: []
Content: bla bla bla. Actual content. Only text for now
```
This commit is contained in:
jif-oai
2026-03-23 18:53:54 +00:00
committed by GitHub
parent e838645fa2
commit 37ac0c093c
17 changed files with 994 additions and 66 deletions

View File

@@ -798,6 +798,7 @@ pub(crate) struct Session {
pending_mcp_server_refresh_config: Mutex<Option<McpServerRefreshConfig>>,
pub(crate) conversation: Arc<RealtimeConversationManager>,
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
idle_pending_input: Mutex<Vec<ResponseInputItem>>,
pub(crate) guardian_review_session: GuardianReviewSessionManager,
pub(crate) services: SessionServices,
js_repl: Arc<JsReplHandle>,
@@ -809,6 +810,7 @@ pub(crate) struct TurnSkillsContext {
pub(crate) outcome: Arc<SkillLoadOutcome>,
pub(crate) implicit_invocation_seen_skills: Arc<Mutex<HashSet<String>>>,
}
impl TurnSkillsContext {
pub(crate) fn new(outcome: Arc<SkillLoadOutcome>) -> Self {
Self {
@@ -1895,6 +1897,7 @@ impl Session {
pending_mcp_server_refresh_config: Mutex::new(None),
conversation: Arc::new(RealtimeConversationManager::new()),
active_turn: Mutex::new(None),
idle_pending_input: Mutex::new(Vec::new()),
guardian_review_session: GuardianReviewSessionManager::default(),
services,
js_repl,
@@ -3912,7 +3915,7 @@ impl Session {
Ok(active_turn_id.clone())
}
/// Returns the input if there was no task running to inject into
/// Returns the input if there was no task running to inject into.
pub async fn inject_response_items(
&self,
input: Vec<ResponseInputItem>,
@@ -3953,6 +3956,24 @@ impl Session {
}
}
/// Queue response items to be injected into the next active turn created for this session.
pub(crate) async fn queue_response_items_for_next_turn(&self, items: Vec<ResponseInputItem>) {
if items.is_empty() {
return;
}
let mut idle_pending_input = self.idle_pending_input.lock().await;
idle_pending_input.extend(items);
}
pub(crate) async fn take_queued_response_items_for_next_turn(&self) -> Vec<ResponseInputItem> {
std::mem::take(&mut *self.idle_pending_input.lock().await)
}
pub(crate) async fn has_queued_response_items_for_next_turn(&self) -> bool {
!self.idle_pending_input.lock().await.is_empty()
}
pub async fn has_pending_input(&self) -> bool {
let active = self.active_turn.lock().await;
match active.as_ref() {
@@ -5441,7 +5462,7 @@ pub(crate) async fn run_turn(
prewarmed_client_session: Option<ModelClientSession>,
cancellation_token: CancellationToken,
) -> Option<String> {
if input.is_empty() {
if input.is_empty() && !sess.has_pending_input().await {
return None;
}
@@ -5583,25 +5604,33 @@ pub(crate) async fn run_turn(
})
.collect::<Vec<_>>();
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
let response_item: ResponseItem = initial_input_for_turn.clone().into();
let mut last_agent_message: Option<String> = None;
if run_pending_session_start_hooks(&sess, &turn_context).await {
return last_agent_message;
return None;
}
let user_prompt_submit_outcome =
run_user_prompt_submit_hooks(&sess, &turn_context, UserMessageItem::new(&input).message())
.await;
if user_prompt_submit_outcome.should_stop {
record_additional_contexts(
let additional_contexts = if input.is_empty() {
Vec::new()
} else {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
let response_item: ResponseItem = initial_input_for_turn.clone().into();
let user_prompt_submit_outcome = run_user_prompt_submit_hooks(
&sess,
&turn_context,
user_prompt_submit_outcome.additional_contexts,
UserMessageItem::new(&input).message(),
)
.await;
return last_agent_message;
}
let additional_contexts = user_prompt_submit_outcome.additional_contexts;
if user_prompt_submit_outcome.should_stop {
record_additional_contexts(
&sess,
&turn_context,
user_prompt_submit_outcome.additional_contexts,
)
.await;
return None;
}
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
.await;
user_prompt_submit_outcome.additional_contexts
};
sess.services
.analytics_events_client
.track_app_mentioned(tracking.clone(), mentioned_app_invocations);
@@ -5612,17 +5641,17 @@ pub(crate) async fn run_turn(
}
sess.merge_connector_selection(explicitly_enabled_connectors.clone())
.await;
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
.await;
record_additional_contexts(&sess, &turn_context, additional_contexts).await;
// Track the previous-turn baseline from the regular user-turn path only so
// standalone tasks (compact/shell/review/undo) cannot suppress future
// model/realtime injections.
sess.set_previous_turn_settings(Some(PreviousTurnSettings {
model: turn_context.model_info.slug.clone(),
realtime_active: Some(turn_context.realtime_active),
}))
.await;
if !input.is_empty() {
// Track the previous-turn baseline from the regular user-turn path only so
// standalone tasks (compact/shell/review/undo) cannot suppress future
// model/realtime injections.
sess.set_previous_turn_settings(Some(PreviousTurnSettings {
model: turn_context.model_info.slug.clone(),
realtime_active: Some(turn_context.realtime_active),
}))
.await;
}
if !skill_items.is_empty() {
sess.record_conversation_items(&turn_context, &skill_items)
@@ -5633,8 +5662,10 @@ pub(crate) async fn run_turn(
.await;
}
let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref());
sess.maybe_start_ghost_snapshot(Arc::clone(&turn_context), cancellation_token.child_token())
.await;
let mut last_agent_message: Option<String> = None;
let mut stop_hook_active = false;
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
// many turns, from the perspective of the user, it is a single turn.