feat: use serde to differentiate inter-agent communication (#15560)

Use `serde` to encode the inter-agent communication into an assistant
message, and use decoding to detect whether a given message is such a communication.

Note: this assumes that `serde` on small payloads is fast enough
This commit is contained in:
jif-oai
2026-03-23 22:09:55 +00:00
committed by GitHub
parent 73bbb07ba8
commit 191fd9fd16
4 changed files with 62 additions and 55 deletions

View File

@@ -3,9 +3,11 @@ use super::*;
use crate::protocol::CompactedItem;
use crate::protocol::InitialHistory;
use crate::protocol::ResumedHistory;
use codex_protocol::AgentPath;
use codex_protocol::ThreadId;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InterAgentCommunication;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
@@ -38,7 +40,15 @@ fn inter_agent_assistant_message(text: &str) -> ResponseItem {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: text.to_string(),
text: serde_json::to_string(&InterAgentCommunication::new(
AgentPath::root(),
AgentPath::root()
.join("worker")
.expect("worker path should be valid"),
Vec::new(),
text.to_string(),
))
.expect("inter-agent communication should serialize"),
}],
end_turn: None,
phase: None,
@@ -455,9 +465,7 @@ async fn reconstruct_history_rollback_counts_inter_agent_assistant_turns() {
turn_id: Some(assistant_turn_id.clone()),
..first_context_item.clone()
};
let assistant_instruction = inter_agent_assistant_message(
"author: /root\nrecipient: /root/worker\nother_recipients: []\nContent: continue",
);
let assistant_instruction = inter_agent_assistant_message("continue");
let assistant_reply = assistant_message("worker reply");
let rollout_items = vec![

View File

@@ -4,6 +4,7 @@ use crate::truncate::TruncationPolicy;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use codex_git::GhostCommit;
use codex_protocol::AgentPath;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputBody;
@@ -17,6 +18,7 @@ use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ReasoningItemReasoningSummary;
use codex_protocol::openai_models::InputModality;
use codex_protocol::openai_models::default_input_modalities;
use codex_protocol::protocol::InterAgentCommunication;
use image::ImageBuffer;
use image::ImageFormat;
use image::Rgba;
@@ -39,11 +41,17 @@ fn assistant_msg(text: &str) -> ResponseItem {
}
fn inter_agent_assistant_msg(text: &str) -> ResponseItem {
let communication = InterAgentCommunication::new(
AgentPath::root(),
AgentPath::root().join("worker").unwrap(),
Vec::new(),
text.to_string(),
);
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: text.to_string(),
text: serde_json::to_string(&communication).unwrap(),
}],
end_turn: None,
phase: None,
@@ -239,9 +247,7 @@ fn items_after_last_model_generated_tokens_are_zero_without_model_generated_item
#[test]
fn inter_agent_assistant_messages_are_turn_boundaries() {
let item = inter_agent_assistant_msg(
"author: /root\nrecipient: /root/worker\nother_recipients: []\nContent: continue",
);
let item = inter_agent_assistant_msg("continue");
assert!(is_user_turn_boundary(&item));
}
@@ -250,9 +256,7 @@ fn inter_agent_assistant_messages_are_turn_boundaries() {
fn drop_last_n_user_turns_treats_inter_agent_assistant_messages_as_instruction_turns() {
let first_turn = user_input_text_msg("first");
let first_reply = assistant_msg("done");
let inter_agent_turn = inter_agent_assistant_msg(
"author: /root\nrecipient: /root/worker\nother_recipients: []\nContent: continue",
);
let inter_agent_turn = inter_agent_assistant_msg("continue");
let inter_agent_reply = assistant_msg("worker reply");
let mut history = create_history_with_items(vec![
first_turn.clone(),
@@ -266,6 +270,15 @@ fn drop_last_n_user_turns_treats_inter_agent_assistant_messages_as_instruction_t
assert_eq!(history.raw_items(), &vec![first_turn, first_reply]);
}
#[test]
fn legacy_inter_agent_assistant_messages_are_not_turn_boundaries() {
    // The old plain-text encoding of an inter-agent message (replaced by the
    // serde JSON encoding) must no longer be treated as a user turn boundary.
    let legacy_text =
        "author: /root\nrecipient: /root/worker\nother_recipients: []\nContent: continue";
    let item = assistant_msg(legacy_text);
    assert!(!is_user_turn_boundary(&item));
}
#[test]
fn total_token_usage_includes_all_items_after_last_model_generated_item() {
let mut history = create_history_with_items(vec![assistant_msg("already counted by API")]);

View File

@@ -32,6 +32,7 @@ use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::user_input::UserInput;
use pretty_assertions::assert_eq;
@@ -79,6 +80,16 @@ fn thread_manager() -> ThreadManager {
)
}
/// Serializes an inter-agent communication — authored by the root agent,
/// addressed to `recipient`, with no other recipients — to its JSON string
/// form, as stored in assistant message content.
fn inter_agent_message_text(recipient: &str, content: &str) -> String {
    let recipient_path = AgentPath::try_from(recipient).expect("recipient path should be valid");
    let communication = InterAgentCommunication::new(
        AgentPath::root(),
        recipient_path,
        Vec::new(),
        content.to_string(),
    );
    serde_json::to_string(&communication).expect("inter-agent communication should serialize")
}
#[derive(Clone, Copy)]
struct NeverEndingTask;
@@ -390,6 +401,7 @@ async fn multi_agent_v2_spawn_returns_path_and_send_input_accepts_relative_path(
.get_thread(child_thread_id)
.await
.expect("child thread should exist");
let expected_message = inter_agent_message_text("/root/test_process", "continue");
timeout(Duration::from_secs(2), async {
loop {
let history_items = child_thread
@@ -407,8 +419,7 @@ async fn multi_agent_v2_spawn_returns_path_and_send_input_accepts_relative_path(
&& content.iter().any(|content_item| matches!(
content_item,
ContentItem::OutputText { text }
if text
== "author: /root\nrecipient: /root/test_process\nother_recipients: []\nContent: continue"
if text == &expected_message
))
)
});
@@ -508,6 +519,10 @@ async fn multi_agent_v2_send_input_accepts_structured_items() {
.find(|(id, op)| *id == agent_id && *op == expected);
assert_eq!(captured, Some((agent_id, expected)));
let expected_message = inter_agent_message_text(
"/root/worker",
"[mention:$drive](app://google_drive)\nread the folder",
);
timeout(Duration::from_secs(2), async {
loop {
let history_items = thread
@@ -525,8 +540,7 @@ async fn multi_agent_v2_send_input_accepts_structured_items() {
&& content.iter().any(|content_item| matches!(
content_item,
ContentItem::OutputText { text }
if text
== "author: /root\nrecipient: /root/worker\nother_recipients: []\nContent: [mention:$drive](app://google_drive)\nread the folder"
if text == &expected_message
))
)
});
@@ -650,6 +664,7 @@ async fn multi_agent_v2_send_input_interrupts_busy_child_without_losing_message(
))
)));
let expected_message = inter_agent_message_text("/root/worker", "continue");
timeout(Duration::from_secs(5), async {
loop {
let history_items = thread
@@ -667,8 +682,7 @@ async fn multi_agent_v2_send_input_interrupts_busy_child_without_losing_message(
&& content.iter().any(|content_item| matches!(
content_item,
ContentItem::OutputText { text }
if text
== "author: /root\nrecipient: /root/worker\nother_recipients: []\nContent: continue"
if text == &expected_message
))
)
});

View File

@@ -530,54 +530,26 @@ impl InterAgentCommunication {
}
}
pub fn to_response_item(&self) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: self.as_text(),
}],
end_turn: None,
phase: None,
}
}
pub fn to_response_input_item(&self) -> ResponseInputItem {
ResponseInputItem::Message {
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: self.as_text(),
text: serde_json::to_string(self).unwrap_or_default(),
}],
}
}
pub fn is_message_content(content: &[ContentItem]) -> bool {
content.iter().any(|content_item| match content_item {
ContentItem::InputText { text } | ContentItem::OutputText { text } => {
Self::is_instruction_text(text)
Self::from_message_content(content).is_some()
}
fn from_message_content(content: &[ContentItem]) -> Option<Self> {
match content {
[ContentItem::InputText { text }] | [ContentItem::OutputText { text }] => {
serde_json::from_str(text).ok()
}
_ => false,
})
}
fn as_text(&self) -> String {
let other_recipients = self
.other_recipients
.iter()
.map(std::string::ToString::to_string)
.collect::<Vec<_>>()
.join(", ");
format!(
"author: {}\nrecipient: {}\nother_recipients: [{other_recipients}]\nContent: {}",
self.author, self.recipient, self.content
)
}
fn is_instruction_text(text: &str) -> bool {
text.starts_with("author: ")
&& text.contains("\nrecipient: ")
&& text.contains("\nother_recipients: [")
&& text.contains("]\nContent: ")
_ => None,
}
}
}