Merge branch 'main' into jif/compressed-rollouts

This commit is contained in:
jif-oai
2026-03-31 16:58:54 +02:00
committed by GitHub
8 changed files with 145 additions and 148 deletions

View File

@@ -347,7 +347,7 @@ async fn multi_agent_v2_spawn_requires_task_name() {
Arc::new(turn),
"spawn_agent",
function_payload(json!({
"message": "inspect this repo"
"items": [{"type": "text", "text": "inspect this repo"}]
})),
);
let Err(err) = SpawnAgentHandlerV2.handle(invocation).await else {
@@ -359,6 +359,42 @@ async fn multi_agent_v2_spawn_requires_task_name() {
assert!(message.contains("missing field `task_name`"));
}
#[tokio::test]
async fn multi_agent_v2_spawn_rejects_legacy_message_field() {
let (mut session, mut turn) = make_session_and_context().await;
let manager = thread_manager();
let root = manager
.start_thread((*turn.config).clone())
.await
.expect("root thread should start");
session.services.agent_control = manager.agent_control();
session.conversation_id = root.thread_id;
let mut config = (*turn.config).clone();
config
.features
.enable(Feature::MultiAgentV2)
.expect("test config should allow feature update");
turn.config = Arc::new(config);
let invocation = invocation(
Arc::new(session),
Arc::new(turn),
"spawn_agent",
function_payload(json!({
"message": "inspect this repo",
"items": [{"type": "text", "text": "inspect this repo"}],
"task_name": "worker"
})),
);
let Err(err) = SpawnAgentHandlerV2.handle(invocation).await else {
panic!("legacy message field should be rejected");
};
let FunctionCallError::RespondToModel(message) = err else {
panic!("legacy message field should surface as a model-facing error");
};
assert!(message.contains("unknown field `message`"));
}
#[tokio::test]
async fn spawn_agent_errors_when_manager_dropped() {
let (session, turn) = make_session_and_context().await;
@@ -408,7 +444,7 @@ async fn multi_agent_v2_spawn_returns_path_and_send_message_accepts_relative_pat
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "inspect this repo",
"items": [{"type": "text", "text": "inspect this repo"}],
"task_name": "test_process"
})),
))
@@ -503,7 +539,7 @@ async fn multi_agent_v2_spawn_rejects_legacy_fork_context() {
Arc::new(turn),
"spawn_agent",
function_payload(json!({
"message": "inspect this repo",
"items": [{"type": "text", "text": "inspect this repo"}],
"task_name": "worker",
"fork_context": true
})),
@@ -542,7 +578,7 @@ async fn multi_agent_v2_spawn_rejects_invalid_fork_turns_string() {
Arc::new(turn),
"spawn_agent",
function_payload(json!({
"message": "inspect this repo",
"items": [{"type": "text", "text": "inspect this repo"}],
"task_name": "worker",
"fork_turns": "banana"
})),
@@ -581,7 +617,7 @@ async fn multi_agent_v2_spawn_rejects_zero_fork_turns() {
Arc::new(turn),
"spawn_agent",
function_payload(json!({
"message": "inspect this repo",
"items": [{"type": "text", "text": "inspect this repo"}],
"task_name": "worker",
"fork_turns": "0"
})),
@@ -695,7 +731,7 @@ async fn multi_agent_v2_list_agents_returns_completed_status_and_last_task_messa
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "inspect this repo",
"items": [{"type": "text", "text": "inspect this repo"}],
"task_name": "worker"
})),
))
@@ -873,7 +909,7 @@ async fn multi_agent_v2_list_agents_omits_closed_agents() {
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "inspect this repo",
"items": [{"type": "text", "text": "inspect this repo"}],
"task_name": "worker"
})),
))
@@ -937,7 +973,7 @@ async fn multi_agent_v2_send_message_rejects_structured_items() {
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "boot worker",
"items": [{"type": "text", "text": "boot worker"}],
"task_name": "worker"
})),
))
@@ -974,7 +1010,7 @@ async fn multi_agent_v2_send_message_rejects_structured_items() {
}
#[tokio::test]
async fn multi_agent_v2_send_message_interrupts_busy_child_without_triggering_turn() {
async fn multi_agent_v2_send_message_rejects_interrupt_parameter() {
let (mut session, mut turn) = make_session_and_context().await;
let manager = thread_manager();
let root = manager
@@ -995,7 +1031,7 @@ async fn multi_agent_v2_send_message_interrupts_busy_child_without_triggering_tu
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "boot worker",
"items": [{"type": "text", "text": "boot worker"}],
"task_name": "worker"
})),
))
@@ -1007,107 +1043,43 @@ async fn multi_agent_v2_send_message_interrupts_busy_child_without_triggering_tu
.resolve_agent_reference(session.conversation_id, &turn.session_source, "worker")
.await
.expect("worker should resolve");
let thread = manager
.get_thread(agent_id)
.await
.expect("worker thread should exist");
let active_turn = thread.codex.session.new_default_turn().await;
thread
.codex
.session
.spawn_task(
Arc::clone(&active_turn),
vec![UserInput::Text {
text: "working".to_string(),
text_elements: Vec::new(),
}],
NeverEndingTask,
)
.await;
let invocation = invocation(
session,
turn,
"send_message",
function_payload(json!({
"target": agent_id.to_string(),
"items": [{"type": "text", "text": "continue"}],
"interrupt": true
})),
);
SendMessageHandlerV2
.handle(invocation(
session.clone(),
turn.clone(),
"send_message",
function_payload(json!({
"target": agent_id.to_string(),
"items": [{"type": "text", "text": "continue"}],
"interrupt": true
})),
))
.await
.expect("interrupting v2 send_message should succeed");
let Err(err) = SendMessageHandlerV2.handle(invocation).await else {
panic!("send_message interrupt parameter should be rejected");
};
let FunctionCallError::RespondToModel(message) = err else {
panic!("expected model-facing parse error");
};
assert!(message.starts_with(
"failed to parse function arguments: unknown field `interrupt`, expected `target` or `items`"
));
let ops = manager.captured_ops();
let ops_for_agent: Vec<&Op> = ops
.iter()
.filter_map(|(id, op)| (*id == agent_id).then_some(op))
.collect();
assert!(ops_for_agent.iter().any(|op| matches!(op, Op::Interrupt)));
assert!(ops_for_agent.iter().any(|op| {
matches!(
op,
Op::InterAgentCommunication { communication }
if communication.author == AgentPath::root()
&& communication.recipient.as_str() == "/root/worker"
&& communication.other_recipients.is_empty()
&& communication.content == "continue"
&& !communication.trigger_turn
)
}));
timeout(Duration::from_secs(5), async {
loop {
if !thread.codex.session.has_pending_input().await {
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
}
let history_items = thread
.codex
.session
.clone_history()
.await
.raw_items()
.to_vec();
let saw_envelope = history_contains_inter_agent_communication(
&history_items,
&InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
Vec::new(),
"continue".to_string(),
/*trigger_turn*/ false,
),
);
let saw_user_message = history_items.iter().any(|item| {
matches!(
item,
ResponseItem::Message { role, content, .. }
if role == "user"
&& content.iter().any(|content_item| matches!(
content_item,
ContentItem::InputText { text } if text == "continue"
))
)
});
if saw_envelope && !saw_user_message {
panic!("send_message should not materialize the envelope into history");
}
if !saw_user_message {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("interrupting v2 send_message should queue the redirected message without a turn");
let _ = thread
.submit(Op::Shutdown {})
.await
.expect("shutdown should submit");
assert!(!ops_for_agent.iter().any(|op| matches!(op, Op::Interrupt)));
assert!(!ops_for_agent.iter().any(|op| matches!(
op,
Op::InterAgentCommunication { communication }
if communication.author == AgentPath::root()
&& communication.recipient.as_str() == "/root/worker"
&& communication.other_recipients.is_empty()
&& communication.content == "continue"
&& !communication.trigger_turn
)));
}
#[tokio::test]
@@ -1132,7 +1104,7 @@ async fn multi_agent_v2_assign_task_interrupts_busy_child_without_losing_message
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "boot worker",
"items": [{"type": "text", "text": "boot worker"}],
"task_name": "worker"
})),
))
@@ -1261,7 +1233,7 @@ async fn multi_agent_v2_assign_task_completion_notifies_parent_on_every_turn() {
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "boot worker",
"items": [{"type": "text", "text": "boot worker"}],
"task_name": "worker"
})),
))
@@ -1390,7 +1362,7 @@ async fn multi_agent_v2_interrupted_turn_does_not_notify_parent() {
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "boot worker",
"items": [{"type": "text", "text": "boot worker"}],
"task_name": "worker"
})),
))
@@ -1466,7 +1438,7 @@ async fn multi_agent_v2_spawn_includes_agent_id_key_when_named() {
Arc::new(turn),
"spawn_agent",
function_payload(json!({
"message": "inspect this repo",
"items": [{"type": "text", "text": "inspect this repo"}],
"task_name": "test_process"
})),
))
@@ -1504,7 +1476,7 @@ async fn multi_agent_v2_spawn_surfaces_task_name_validation_errors() {
Arc::new(turn),
"spawn_agent",
function_payload(json!({
"message": "inspect this repo",
"items": [{"type": "text", "text": "inspect this repo"}],
"task_name": "BadName"
})),
);
@@ -2131,7 +2103,7 @@ async fn multi_agent_v2_wait_agent_accepts_timeout_only_argument() {
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "boot worker",
"items": [{"type": "text", "text": "boot worker"}],
"task_name": "worker"
})),
))
@@ -2377,7 +2349,7 @@ async fn multi_agent_v2_wait_agent_returns_summary_for_mailbox_activity() {
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "inspect this repo",
"items": [{"type": "text", "text": "inspect this repo"}],
"task_name": "test_process"
})),
))
@@ -2468,7 +2440,7 @@ async fn multi_agent_v2_wait_agent_waits_for_new_mail_after_start() {
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "boot worker",
"items": [{"type": "text", "text": "boot worker"}],
"task_name": "worker"
})),
))
@@ -2568,7 +2540,7 @@ async fn multi_agent_v2_wait_agent_wakes_on_any_mailbox_notification() {
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": format!("boot {task_name}"),
"items": [{"type": "text", "text": format!("boot {task_name}")}],
"task_name": task_name
})),
))
@@ -2655,7 +2627,7 @@ async fn multi_agent_v2_wait_agent_does_not_return_completed_content() {
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "boot worker",
"items": [{"type": "text", "text": "boot worker"}],
"task_name": "worker"
})),
))
@@ -2741,7 +2713,7 @@ async fn multi_agent_v2_close_agent_accepts_task_name_target() {
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "inspect this repo",
"items": [{"type": "text", "text": "inspect this repo"}],
"task_name": "worker"
})),
))

View File

@@ -1,3 +1,4 @@
use super::message_tool::AssignTaskArgs;
use super::message_tool::MessageDeliveryMode;
use super::message_tool::MessageToolResult;
use super::message_tool::handle_message_tool;
@@ -18,6 +19,15 @@ impl ToolHandler for Handler {
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
handle_message_tool(invocation, MessageDeliveryMode::TriggerTurn).await
let arguments = function_arguments(invocation.payload.clone())?;
let args: AssignTaskArgs = parse_arguments(&arguments)?;
handle_message_tool(
invocation,
MessageDeliveryMode::TriggerTurn,
args.target,
args.items,
args.interrupt,
)
.await
}
}

View File

@@ -38,8 +38,17 @@ impl MessageDeliveryMode {
}
#[derive(Debug, Deserialize)]
/// Input shared by the MultiAgentV2 `send_message` and `assign_task` tools.
pub(crate) struct MessageToolArgs {
#[serde(deny_unknown_fields)]
/// Input for the MultiAgentV2 `send_message` tool.
pub(crate) struct SendMessageArgs {
pub(crate) target: String,
pub(crate) items: Vec<UserInput>,
}
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
/// Input for the MultiAgentV2 `assign_task` tool.
pub(crate) struct AssignTaskArgs {
pub(crate) target: String,
pub(crate) items: Vec<UserInput>,
#[serde(default)]
@@ -95,6 +104,9 @@ fn text_content(
pub(crate) async fn handle_message_tool(
invocation: ToolInvocation,
mode: MessageDeliveryMode,
target: String,
items: Vec<UserInput>,
interrupt: bool,
) -> Result<MessageToolResult, FunctionCallError> {
let ToolInvocation {
session,
@@ -103,16 +115,15 @@ pub(crate) async fn handle_message_tool(
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: MessageToolArgs = parse_arguments(&arguments)?;
let receiver_thread_id = resolve_agent_target(&session, &turn, &args.target).await?;
let prompt = text_content(&args.items, mode)?;
let _ = payload;
let receiver_thread_id = resolve_agent_target(&session, &turn, &target).await?;
let prompt = text_content(&items, mode)?;
let receiver_agent = session
.services
.agent_control
.get_agent_metadata(receiver_thread_id)
.unwrap_or_default();
if args.interrupt {
if interrupt {
session
.services
.agent_control

View File

@@ -1,5 +1,6 @@
use super::message_tool::MessageDeliveryMode;
use super::message_tool::MessageToolResult;
use super::message_tool::SendMessageArgs;
use super::message_tool::handle_message_tool;
use super::*;
@@ -18,6 +19,15 @@ impl ToolHandler for Handler {
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
handle_message_tool(invocation, MessageDeliveryMode::QueueOnly).await
let arguments = function_arguments(invocation.payload.clone())?;
let args: SendMessageArgs = parse_arguments(&arguments)?;
handle_message_tool(
invocation,
MessageDeliveryMode::QueueOnly,
args.target,
args.items,
/*interrupt*/ false,
)
.await
}
}

View File

@@ -40,7 +40,7 @@ impl ToolHandler for Handler {
.map(str::trim)
.filter(|role| !role.is_empty());
let initial_operation = parse_collab_input(args.message, args.items)?;
let initial_operation = parse_collab_input(/*message*/ None, Some(args.items))?;
let prompt = render_input_preview(&initial_operation);
let session_source = turn.session_source.clone();
@@ -200,9 +200,9 @@ impl ToolHandler for Handler {
}
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct SpawnAgentArgs {
message: Option<String>,
items: Option<Vec<UserInput>>,
items: Vec<UserInput>,
task_name: String,
agent_type: Option<String>,
model: Option<String>,

View File

@@ -497,9 +497,14 @@ fn test_build_specs_multi_agent_v2_uses_task_names_and_hides_resume() {
panic!("spawn_agent should use object params");
};
assert!(properties.contains_key("task_name"));
assert!(properties.contains_key("items"));
assert!(properties.contains_key("fork_turns"));
assert!(!properties.contains_key("message"));
assert!(!properties.contains_key("fork_context"));
assert_eq!(required.as_ref(), Some(&vec!["task_name".to_string()]));
assert_eq!(
required.as_ref(),
Some(&vec!["task_name".to_string(), "items".to_string()])
);
let output_schema = output_schema
.as_ref()
.expect("spawn_agent should define output schema");
@@ -521,6 +526,7 @@ fn test_build_specs_multi_agent_v2_uses_task_names_and_hides_resume() {
panic!("send_message should use object params");
};
assert!(properties.contains_key("target"));
assert!(!properties.contains_key("interrupt"));
assert!(!properties.contains_key("message"));
assert_eq!(
required.as_ref(),

View File

@@ -66,7 +66,7 @@ pub fn create_spawn_agent_tool_v2(options: SpawnAgentToolOptions<'_>) -> ToolSpe
defer_loading: None,
parameters: JsonSchema::Object {
properties,
required: Some(vec!["task_name".to_string()]),
required: Some(vec!["task_name".to_string(), "items".to_string()]),
additional_properties: Some(false.into()),
},
output_schema: Some(spawn_agent_output_schema_v2()),
@@ -128,20 +128,11 @@ pub fn create_send_message_tool() -> ToolSpec {
},
),
("items".to_string(), create_collab_input_items_schema()),
(
"interrupt".to_string(),
JsonSchema::Boolean {
description: Some(
"When true, stop the agent's current task and handle this immediately. When false (default), queue this message."
.to_string(),
),
},
),
]);
ToolSpec::Function(ResponsesApiTool {
name: "send_message".to_string(),
description: "Add a message to an existing agent without triggering a new turn. Use interrupt=true to stop the current task first. In MultiAgentV2, this tool currently supports text content only."
description: "Add a message to an existing agent without triggering a new turn. In MultiAgentV2, this tool currently supports text content only."
.to_string(),
strict: false,
defer_loading: None,
@@ -594,15 +585,6 @@ fn spawn_agent_common_properties_v1(agent_type_description: &str) -> BTreeMap<St
fn spawn_agent_common_properties_v2(agent_type_description: &str) -> BTreeMap<String, JsonSchema> {
BTreeMap::from([
(
"message".to_string(),
JsonSchema::String {
description: Some(
"Initial plain-text task for the new agent. Use either message or items."
.to_string(),
),
},
),
("items".to_string(), create_collab_input_items_schema()),
(
"agent_type".to_string(),

View File

@@ -56,7 +56,9 @@ fn spawn_agent_tool_v2_requires_task_name_and_lists_visible_models() {
assert!(description.contains("visible display (`visible-model`)"));
assert!(!description.contains("hidden display (`hidden-model`)"));
assert!(properties.contains_key("task_name"));
assert!(properties.contains_key("items"));
assert!(properties.contains_key("fork_turns"));
assert!(!properties.contains_key("message"));
assert!(!properties.contains_key("fork_context"));
assert_eq!(
properties.get("agent_type"),
@@ -64,7 +66,10 @@ fn spawn_agent_tool_v2_requires_task_name_and_lists_visible_models() {
description: Some("role help".to_string()),
})
);
assert_eq!(required, Some(vec!["task_name".to_string()]));
assert_eq!(
required,
Some(vec!["task_name".to_string(), "items".to_string()])
);
assert_eq!(
output_schema.expect("spawn_agent output schema")["required"],
json!(["agent_id", "task_name", "nickname"])
@@ -109,6 +114,7 @@ fn send_message_tool_requires_items_and_uses_submission_output() {
};
assert!(properties.contains_key("target"));
assert!(properties.contains_key("items"));
assert!(!properties.contains_key("interrupt"));
assert!(!properties.contains_key("message"));
assert_eq!(
required,