mirror of
https://github.com/openai/codex.git
synced 2026-05-01 18:06:47 +00:00
Compare commits
1 Commits
ice-window
...
subagent-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
27d5ba2aae |
@@ -483,6 +483,7 @@ impl AgentControl {
|
||||
) -> CodexResult<ThreadId> {
|
||||
if let SessionSource::SubAgent(SubAgentSource::ThreadSpawn { depth, .. }) = &session_source
|
||||
&& *depth >= config.agent_max_depth
|
||||
&& !config.features.enabled(Feature::MultiAgentV2)
|
||||
{
|
||||
let _ = config.features.disable(Feature::SpawnCsv);
|
||||
let _ = config.features.disable(Feature::Collab);
|
||||
@@ -638,6 +639,37 @@ impl AgentControl {
|
||||
result
|
||||
}
|
||||
|
||||
pub(crate) async fn enqueue_inter_agent_communication(
|
||||
&self,
|
||||
agent_id: ThreadId,
|
||||
communication: InterAgentCommunication,
|
||||
) -> CodexResult<()> {
|
||||
let last_task_message = communication.content.clone();
|
||||
let state = self.upgrade()?;
|
||||
let thread = state.get_thread(agent_id).await?;
|
||||
thread
|
||||
.codex
|
||||
.session
|
||||
.enqueue_mailbox_communication(communication);
|
||||
self.state
|
||||
.update_last_task_message(agent_id, last_task_message);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn maybe_start_turn_for_pending_work(
|
||||
&self,
|
||||
agent_id: ThreadId,
|
||||
) -> CodexResult<()> {
|
||||
let state = self.upgrade()?;
|
||||
let thread = state.get_thread(agent_id).await?;
|
||||
thread
|
||||
.codex
|
||||
.session
|
||||
.maybe_start_turn_for_pending_work()
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Interrupt the current task for an existing agent thread.
|
||||
pub(crate) async fn interrupt_agent(&self, agent_id: ThreadId) -> CodexResult<String> {
|
||||
let state = self.upgrade()?;
|
||||
@@ -717,6 +749,12 @@ impl AgentControl {
|
||||
thread.agent_status().await
|
||||
}
|
||||
|
||||
pub(crate) async fn has_mailbox_waiters(&self, agent_id: ThreadId) -> CodexResult<bool> {
|
||||
let state = self.upgrade()?;
|
||||
let thread = state.get_thread(agent_id).await?;
|
||||
Ok(thread.codex.session.has_mailbox_waiters())
|
||||
}
|
||||
|
||||
pub(crate) fn register_session_root(
|
||||
&self,
|
||||
current_thread_id: ThreadId,
|
||||
@@ -731,6 +769,10 @@ impl AgentControl {
|
||||
self.state.agent_metadata_for_thread(agent_id)
|
||||
}
|
||||
|
||||
pub(crate) fn agent_id_for_path(&self, agent_path: &AgentPath) -> Option<ThreadId> {
|
||||
self.state.agent_id_for_path(agent_path)
|
||||
}
|
||||
|
||||
pub(crate) async fn list_live_agent_subtree_thread_ids(
|
||||
&self,
|
||||
agent_id: ThreadId,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use codex_protocol::protocol::InterAgentCommunication;
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::watch;
|
||||
@@ -12,6 +13,7 @@ pub(crate) struct Mailbox {
|
||||
tx: mpsc::UnboundedSender<InterAgentCommunication>,
|
||||
next_seq: AtomicU64,
|
||||
seq_tx: watch::Sender<u64>,
|
||||
waiter_count: AtomicUsize,
|
||||
}
|
||||
|
||||
pub(crate) struct MailboxReceiver {
|
||||
@@ -28,6 +30,7 @@ impl Mailbox {
|
||||
tx,
|
||||
next_seq: AtomicU64::new(0),
|
||||
seq_tx,
|
||||
waiter_count: AtomicUsize::new(0),
|
||||
},
|
||||
MailboxReceiver {
|
||||
rx,
|
||||
@@ -46,6 +49,25 @@ impl Mailbox {
|
||||
self.seq_tx.send_replace(seq);
|
||||
seq
|
||||
}
|
||||
|
||||
pub(crate) fn begin_wait(&self) -> MailboxWaitGuard<'_> {
|
||||
self.waiter_count.fetch_add(1, Ordering::Relaxed);
|
||||
MailboxWaitGuard { mailbox: self }
|
||||
}
|
||||
|
||||
pub(crate) fn has_waiters(&self) -> bool {
|
||||
self.waiter_count.load(Ordering::Relaxed) > 0
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct MailboxWaitGuard<'a> {
|
||||
mailbox: &'a Mailbox,
|
||||
}
|
||||
|
||||
impl Drop for MailboxWaitGuard<'_> {
|
||||
fn drop(&mut self) {
|
||||
self.mailbox.waiter_count.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
impl MailboxReceiver {
|
||||
|
||||
@@ -469,6 +469,7 @@ impl Codex {
|
||||
|
||||
if let SessionSource::SubAgent(SubAgentSource::ThreadSpawn { depth, .. }) = session_source
|
||||
&& depth >= config.agent_max_depth
|
||||
&& !config.features.enabled(Feature::MultiAgentV2)
|
||||
{
|
||||
let _ = config.features.disable(Feature::SpawnCsv);
|
||||
let _ = config.features.disable(Feature::Collab);
|
||||
@@ -2867,6 +2868,23 @@ impl Session {
|
||||
self.mailbox.subscribe()
|
||||
}
|
||||
|
||||
pub(crate) fn begin_mailbox_wait(&self) -> crate::agent::mailbox::MailboxWaitGuard<'_> {
|
||||
self.mailbox.begin_wait()
|
||||
}
|
||||
|
||||
pub(crate) fn has_mailbox_waiters(&self) -> bool {
|
||||
self.mailbox.has_waiters()
|
||||
}
|
||||
|
||||
pub(crate) async fn session_source(&self) -> SessionSource {
|
||||
self.state
|
||||
.lock()
|
||||
.await
|
||||
.session_configuration
|
||||
.session_source
|
||||
.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn enqueue_mailbox_communication(&self, communication: InterAgentCommunication) {
|
||||
self.mailbox.send(communication);
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use super::*;
|
||||
use crate::CodexThread;
|
||||
use crate::NewThread;
|
||||
use crate::ThreadManager;
|
||||
use crate::config::AgentRoleConfig;
|
||||
use crate::config::DEFAULT_AGENT_MAX_DEPTH;
|
||||
@@ -13,6 +14,7 @@ use crate::tools::context::ToolOutput;
|
||||
use crate::tools::handlers::multi_agents_v2::CloseAgentHandler as CloseAgentHandlerV2;
|
||||
use crate::tools::handlers::multi_agents_v2::FollowupTaskHandler as FollowupTaskHandlerV2;
|
||||
use crate::tools::handlers::multi_agents_v2::ListAgentsHandler as ListAgentsHandlerV2;
|
||||
use crate::tools::handlers::multi_agents_v2::ParentMessageHandler as ParentMessageHandlerV2;
|
||||
use crate::tools::handlers::multi_agents_v2::SendMessageHandler as SendMessageHandlerV2;
|
||||
use crate::tools::handlers::multi_agents_v2::SpawnAgentHandler as SpawnAgentHandlerV2;
|
||||
use crate::tools::handlers::multi_agents_v2::WaitAgentHandler as WaitAgentHandlerV2;
|
||||
@@ -25,6 +27,7 @@ use codex_model_provider::create_model_provider;
|
||||
use codex_model_provider_info::built_in_model_providers;
|
||||
use codex_protocol::AgentPath;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
use codex_protocol::models::BaseInstructions;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::FunctionCallOutputBody;
|
||||
@@ -46,6 +49,7 @@ use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::TempDirExt;
|
||||
use pretty_assertions::assert_eq;
|
||||
@@ -92,6 +96,123 @@ fn thread_manager() -> ThreadManager {
|
||||
)
|
||||
}
|
||||
|
||||
async fn pending_inter_agent_mail(
|
||||
session: &crate::session::session::Session,
|
||||
) -> Vec<InterAgentCommunication> {
|
||||
session
|
||||
.get_pending_input()
|
||||
.await
|
||||
.into_iter()
|
||||
.filter_map(|item| match item {
|
||||
ResponseInputItem::Message { content, .. } => {
|
||||
InterAgentCommunication::from_message_content(&content)
|
||||
}
|
||||
_ => None,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
async fn make_parent_message_child(
|
||||
agent_path: Option<AgentPath>,
|
||||
) -> (
|
||||
ThreadManager,
|
||||
NewThread,
|
||||
crate::session::session::Session,
|
||||
TurnContext,
|
||||
ThreadId,
|
||||
) {
|
||||
let (mut session, mut turn) = make_session_and_context().await;
|
||||
let manager = thread_manager();
|
||||
let root = manager
|
||||
.start_thread((*turn.config).clone())
|
||||
.await
|
||||
.expect("root thread should start");
|
||||
session.services.agent_control = manager.agent_control();
|
||||
session.conversation_id = root.thread_id;
|
||||
let mut config = (*turn.config).clone();
|
||||
config
|
||||
.features
|
||||
.enable(Feature::MultiAgentV2)
|
||||
.expect("test config should allow feature update");
|
||||
turn.config = Arc::new(config);
|
||||
|
||||
let child_thread_id = session
|
||||
.services
|
||||
.agent_control
|
||||
.spawn_agent_with_metadata(
|
||||
(*turn.config).clone(),
|
||||
vec![UserInput::Text {
|
||||
text: "inspect this repo".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}]
|
||||
.into(),
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id: root.thread_id,
|
||||
depth: 1,
|
||||
agent_path: agent_path.clone(),
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
})),
|
||||
crate::agent::control::SpawnAgentOptions::default(),
|
||||
)
|
||||
.await
|
||||
.expect("worker spawn should succeed")
|
||||
.thread_id;
|
||||
session.conversation_id = child_thread_id;
|
||||
turn.session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id: root.thread_id,
|
||||
depth: 1,
|
||||
agent_path,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
});
|
||||
|
||||
(manager, root, session, turn, child_thread_id)
|
||||
}
|
||||
|
||||
async fn mark_parent_running(root: &NewThread) -> Arc<TurnContext> {
|
||||
let root_turn = root.thread.codex.session.new_default_turn().await;
|
||||
root.thread
|
||||
.codex
|
||||
.session
|
||||
.send_event(
|
||||
root_turn.as_ref(),
|
||||
EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: root_turn.sub_id.clone(),
|
||||
started_at: None,
|
||||
model_context_window: None,
|
||||
collaboration_mode_kind: ModeKind::Default,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
root_turn
|
||||
}
|
||||
|
||||
async fn wait_until_parent_mail_is_pending(session: &crate::session::session::Session) {
|
||||
for _ in 0..50 {
|
||||
if session.has_pending_input().await {
|
||||
return;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_parent_message(
|
||||
session: crate::session::session::Session,
|
||||
turn: TurnContext,
|
||||
payload: serde_json::Value,
|
||||
) {
|
||||
ParentMessageHandlerV2
|
||||
.handle(invocation(
|
||||
Arc::new(session),
|
||||
Arc::new(turn),
|
||||
"send_parent_message",
|
||||
function_payload(payload),
|
||||
))
|
||||
.await
|
||||
.expect("parent message should succeed");
|
||||
}
|
||||
|
||||
async fn install_role_with_model_override(turn: &mut TurnContext) -> String {
|
||||
let role_name = "fork-context-role".to_string();
|
||||
tokio::fs::create_dir_all(&turn.config.codex_home)
|
||||
@@ -1104,6 +1225,314 @@ async fn multi_agent_v2_followup_task_rejects_root_target_from_child() {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multi_agent_v2_parent_message_interrupts_busy_parent_before_delivering_mail() {
|
||||
let child_path = AgentPath::try_from("/root/worker").expect("agent path");
|
||||
let (manager, root, session, turn, _) =
|
||||
make_parent_message_child(Some(child_path.clone())).await;
|
||||
let root_turn = mark_parent_running(&root).await;
|
||||
|
||||
send_parent_message(
|
||||
session,
|
||||
turn,
|
||||
json!({
|
||||
"message": "parent please take over",
|
||||
"mode": "interrupt"
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let root_ops = manager
|
||||
.captured_ops()
|
||||
.into_iter()
|
||||
.filter_map(|(id, op)| (id == root.thread_id).then_some(op))
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(
|
||||
root_ops
|
||||
.iter()
|
||||
.filter(|op| matches!(op, Op::Interrupt))
|
||||
.count(),
|
||||
1
|
||||
);
|
||||
root.thread
|
||||
.codex
|
||||
.session
|
||||
.send_event(
|
||||
root_turn.as_ref(),
|
||||
EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
turn_id: Some(root_turn.sub_id.clone()),
|
||||
reason: TurnAbortReason::Interrupted,
|
||||
completed_at: None,
|
||||
duration_ms: None,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
wait_until_parent_mail_is_pending(root.thread.codex.session.as_ref()).await;
|
||||
assert_eq!(
|
||||
pending_inter_agent_mail(root.thread.codex.session.as_ref()).await,
|
||||
vec![InterAgentCommunication::new(
|
||||
child_path,
|
||||
AgentPath::root(),
|
||||
Vec::new(),
|
||||
"parent please take over".to_string(),
|
||||
/*trigger_turn*/ true,
|
||||
)]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multi_agent_v2_parent_message_queue_trigger_waits_for_busy_parent_to_finish() {
|
||||
let child_path = AgentPath::try_from("/root/worker").expect("agent path");
|
||||
let (manager, root, session, turn, _) =
|
||||
make_parent_message_child(Some(child_path.clone())).await;
|
||||
let root_turn = mark_parent_running(&root).await;
|
||||
|
||||
send_parent_message(
|
||||
session,
|
||||
turn,
|
||||
json!({
|
||||
"message": "parent should process when free",
|
||||
"mode": "queue",
|
||||
"trigger_turn": true
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let root_ops = manager
|
||||
.captured_ops()
|
||||
.into_iter()
|
||||
.filter_map(|(id, op)| (id == root.thread_id).then_some(op))
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(
|
||||
root_ops
|
||||
.iter()
|
||||
.filter(|op| matches!(op, Op::Interrupt))
|
||||
.count(),
|
||||
0
|
||||
);
|
||||
assert!(
|
||||
!root.thread.codex.session.has_pending_input().await,
|
||||
"queue plus trigger should not be drainable by the currently running parent turn"
|
||||
);
|
||||
|
||||
root.thread
|
||||
.codex
|
||||
.session
|
||||
.send_event(
|
||||
root_turn.as_ref(),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: root_turn.sub_id.clone(),
|
||||
last_agent_message: Some("parent turn done".to_string()),
|
||||
completed_at: None,
|
||||
duration_ms: None,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
wait_until_parent_mail_is_pending(root.thread.codex.session.as_ref()).await;
|
||||
|
||||
assert_eq!(
|
||||
pending_inter_agent_mail(root.thread.codex.session.as_ref()).await,
|
||||
vec![InterAgentCommunication::new(
|
||||
child_path,
|
||||
AgentPath::root(),
|
||||
Vec::new(),
|
||||
"parent should process when free".to_string(),
|
||||
/*trigger_turn*/ true,
|
||||
)]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multi_agent_v2_parent_message_uses_registry_when_turn_source_is_generic() {
|
||||
let child_path = AgentPath::try_from("/root/worker").expect("agent path");
|
||||
let (_manager, root, session, mut turn, _) =
|
||||
make_parent_message_child(Some(child_path.clone())).await;
|
||||
turn.session_source = SessionSource::Cli;
|
||||
|
||||
send_parent_message(
|
||||
session,
|
||||
turn,
|
||||
json!({
|
||||
"message": "registry-resolved parent mail",
|
||||
"mode": "enqueue"
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(
|
||||
pending_inter_agent_mail(root.thread.codex.session.as_ref()).await,
|
||||
vec![InterAgentCommunication::new(
|
||||
child_path,
|
||||
AgentPath::root(),
|
||||
Vec::new(),
|
||||
"registry-resolved parent mail".to_string(),
|
||||
/*trigger_turn*/ false,
|
||||
)]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multi_agent_v2_parent_message_synthesizes_path_for_anonymous_thread_spawn() {
|
||||
let (_manager, root, session, turn, child_thread_id) =
|
||||
make_parent_message_child(/*agent_path*/ None).await;
|
||||
|
||||
send_parent_message(
|
||||
session,
|
||||
turn,
|
||||
json!({
|
||||
"message": "anonymous parent mail",
|
||||
"mode": "enqueue"
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let synthetic_child_path = AgentPath::try_from(format!(
|
||||
"/root/agent_{}",
|
||||
child_thread_id.to_string().replace('-', "_")
|
||||
))
|
||||
.expect("synthetic child path should be valid");
|
||||
assert_eq!(
|
||||
pending_inter_agent_mail(root.thread.codex.session.as_ref()).await,
|
||||
vec![InterAgentCommunication::new(
|
||||
synthetic_child_path,
|
||||
AgentPath::root(),
|
||||
Vec::new(),
|
||||
"anonymous parent mail".to_string(),
|
||||
/*trigger_turn*/ false,
|
||||
)]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multi_agent_v2_parent_message_wakes_wait_agent_without_interrupting_parent() {
|
||||
let (_session, mut turn) = make_session_and_context().await;
|
||||
let manager = thread_manager();
|
||||
let mut config = (*turn.config).clone();
|
||||
config
|
||||
.features
|
||||
.enable(Feature::MultiAgentV2)
|
||||
.expect("test config should allow feature update");
|
||||
let root = manager
|
||||
.start_thread(config.clone())
|
||||
.await
|
||||
.expect("root thread should start");
|
||||
turn.config = Arc::new(config);
|
||||
|
||||
let parent_session = root.thread.codex.session.clone();
|
||||
let parent_turn = Arc::new(turn);
|
||||
let wait_task = tokio::spawn({
|
||||
let session = parent_session.clone();
|
||||
let turn = parent_turn.clone();
|
||||
async move {
|
||||
WaitAgentHandlerV2
|
||||
.handle(invocation(
|
||||
session,
|
||||
turn,
|
||||
"wait_agent",
|
||||
function_payload(json!({"timeout_ms": 10_000})),
|
||||
))
|
||||
.await
|
||||
}
|
||||
});
|
||||
for _ in 0..50 {
|
||||
if parent_session.has_mailbox_waiters() {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
assert!(
|
||||
parent_session.has_mailbox_waiters(),
|
||||
"wait_agent should register mailbox waiter"
|
||||
);
|
||||
|
||||
let child_path = AgentPath::try_from("/root/worker").expect("agent path");
|
||||
let child_thread_id = parent_session
|
||||
.services
|
||||
.agent_control
|
||||
.spawn_agent_with_metadata(
|
||||
(*parent_turn.config).clone(),
|
||||
vec![UserInput::Text {
|
||||
text: "inspect this repo".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}]
|
||||
.into(),
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id: root.thread_id,
|
||||
depth: 1,
|
||||
agent_path: Some(child_path.clone()),
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
})),
|
||||
crate::agent::control::SpawnAgentOptions::default(),
|
||||
)
|
||||
.await
|
||||
.expect("worker spawn should succeed")
|
||||
.thread_id;
|
||||
let (mut child_session, mut child_turn) = make_session_and_context().await;
|
||||
child_session.services.agent_control = manager.agent_control();
|
||||
child_session.conversation_id = child_thread_id;
|
||||
child_turn.config = parent_turn.config.clone();
|
||||
child_turn.session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id: root.thread_id,
|
||||
depth: 1,
|
||||
agent_path: Some(child_path.clone()),
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
});
|
||||
|
||||
ParentMessageHandlerV2
|
||||
.handle(invocation(
|
||||
Arc::new(child_session),
|
||||
Arc::new(child_turn),
|
||||
"send_parent_message",
|
||||
function_payload(json!({
|
||||
"message": "waiter-visible parent mail",
|
||||
"mode": "queue",
|
||||
"trigger_turn": true
|
||||
})),
|
||||
))
|
||||
.await
|
||||
.expect("parent message should succeed");
|
||||
|
||||
let wait_output = wait_task
|
||||
.await
|
||||
.expect("wait task should join")
|
||||
.expect("wait_agent should succeed");
|
||||
let (content, _) = expect_text_output(wait_output);
|
||||
let result: crate::tools::handlers::multi_agents_v2::wait::WaitAgentResult =
|
||||
serde_json::from_str(&content).expect("wait_agent result should parse");
|
||||
assert_eq!(
|
||||
result,
|
||||
crate::tools::handlers::multi_agents_v2::wait::WaitAgentResult {
|
||||
message: "Wait completed.".to_string(),
|
||||
timed_out: false,
|
||||
}
|
||||
);
|
||||
|
||||
let root_ops = manager
|
||||
.captured_ops()
|
||||
.into_iter()
|
||||
.filter_map(|(id, op)| (id == root.thread_id).then_some(op))
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(
|
||||
root_ops
|
||||
.iter()
|
||||
.filter(|op| matches!(op, Op::Interrupt))
|
||||
.count(),
|
||||
0
|
||||
);
|
||||
assert_eq!(
|
||||
pending_inter_agent_mail(parent_session.as_ref()).await,
|
||||
vec![InterAgentCommunication::new(
|
||||
child_path,
|
||||
AgentPath::root(),
|
||||
Vec::new(),
|
||||
"waiter-visible parent mail".to_string(),
|
||||
/*trigger_turn*/ true,
|
||||
)]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multi_agent_v2_list_agents_returns_completed_status_and_last_task_message() {
|
||||
let (mut session, mut turn) = make_session_and_context().await;
|
||||
@@ -2847,7 +3276,7 @@ async fn multi_agent_v2_wait_agent_returns_summary_for_mailbox_activity() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multi_agent_v2_wait_agent_waits_for_new_mail_after_start() {
|
||||
async fn multi_agent_v2_wait_agent_returns_pending_mail_after_start() {
|
||||
let (mut session, mut turn) = make_session_and_context().await;
|
||||
let manager = thread_manager();
|
||||
let root = manager
|
||||
@@ -2892,7 +3321,7 @@ async fn multi_agent_v2_wait_agent_waits_for_new_mail_after_start() {
|
||||
.expect("worker path");
|
||||
|
||||
session.enqueue_mailbox_communication(InterAgentCommunication::new(
|
||||
worker_path.clone(),
|
||||
worker_path,
|
||||
AgentPath::root(),
|
||||
Vec::new(),
|
||||
"already queued".to_string(),
|
||||
@@ -2913,21 +3342,6 @@ async fn multi_agent_v2_wait_agent_waits_for_new_mail_after_start() {
|
||||
.await
|
||||
}
|
||||
});
|
||||
tokio::task::yield_now().await;
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
assert!(
|
||||
!wait_task.is_finished(),
|
||||
"mail already queued before wait should not wake wait_agent"
|
||||
);
|
||||
|
||||
session.enqueue_mailbox_communication(InterAgentCommunication::new(
|
||||
worker_path,
|
||||
AgentPath::root(),
|
||||
Vec::new(),
|
||||
"new mail".to_string(),
|
||||
/*trigger_turn*/ false,
|
||||
));
|
||||
|
||||
let output = wait_task
|
||||
.await
|
||||
.expect("wait task should join")
|
||||
|
||||
@@ -30,6 +30,7 @@ use serde_json::Value as JsonValue;
|
||||
pub(crate) use close_agent::Handler as CloseAgentHandler;
|
||||
pub(crate) use followup_task::Handler as FollowupTaskHandler;
|
||||
pub(crate) use list_agents::Handler as ListAgentsHandler;
|
||||
pub(crate) use parent_message::Handler as ParentMessageHandler;
|
||||
pub(crate) use send_message::Handler as SendMessageHandler;
|
||||
pub(crate) use spawn::Handler as SpawnAgentHandler;
|
||||
pub(crate) use wait::Handler as WaitAgentHandler;
|
||||
@@ -38,6 +39,7 @@ mod close_agent;
|
||||
mod followup_task;
|
||||
mod list_agents;
|
||||
mod message_tool;
|
||||
mod parent_message;
|
||||
mod send_message;
|
||||
mod spawn;
|
||||
pub(crate) mod wait;
|
||||
|
||||
@@ -47,7 +47,7 @@ pub(crate) struct FollowupTaskArgs {
|
||||
pub(crate) interrupt: bool,
|
||||
}
|
||||
|
||||
fn message_content(message: String) -> Result<String, FunctionCallError> {
|
||||
pub(crate) fn message_content(message: String) -> Result<String, FunctionCallError> {
|
||||
if message.trim().is_empty() {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"Empty message can't be sent to an agent".to_string(),
|
||||
|
||||
@@ -0,0 +1,359 @@
|
||||
use super::message_tool::message_content;
|
||||
use super::*;
|
||||
use crate::tools::context::FunctionToolOutput;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::InterAgentCommunication;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
|
||||
struct ParentMessageSource {
|
||||
parent_thread_id: ThreadId,
|
||||
parent_agent_path: AgentPath,
|
||||
child_agent_path: AgentPath,
|
||||
}
|
||||
|
||||
pub(crate) struct Handler;
|
||||
|
||||
impl ToolHandler for Handler {
|
||||
type Output = FunctionToolOutput;
|
||||
|
||||
fn kind(&self) -> ToolKind {
|
||||
ToolKind::Function
|
||||
}
|
||||
|
||||
fn matches_kind(&self, payload: &ToolPayload) -> bool {
|
||||
matches!(payload, ToolPayload::Function { .. })
|
||||
}
|
||||
|
||||
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
|
||||
let arguments = function_arguments(invocation.payload.clone())?;
|
||||
let args: ParentMessageArgs = parse_arguments(&arguments)?;
|
||||
let delivery = args.delivery_options();
|
||||
handle_parent_message(invocation, message_content(args.message)?, delivery).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
struct ParentMessageArgs {
|
||||
message: String,
|
||||
#[serde(default)]
|
||||
mode: ParentMessageMode,
|
||||
trigger_turn: Option<bool>,
|
||||
}
|
||||
|
||||
impl ParentMessageArgs {
|
||||
fn delivery_options(&self) -> ParentMessageDelivery {
|
||||
ParentMessageDelivery {
|
||||
mode: self.mode,
|
||||
trigger_turn: self
|
||||
.trigger_turn
|
||||
.unwrap_or(self.mode == ParentMessageMode::Interrupt),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Deserialize, Clone, Copy, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
enum ParentMessageMode {
|
||||
#[default]
|
||||
#[serde(alias = "enqueue")]
|
||||
Queue,
|
||||
Interrupt,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
struct ParentMessageDelivery {
|
||||
mode: ParentMessageMode,
|
||||
trigger_turn: bool,
|
||||
}
|
||||
|
||||
async fn handle_parent_message(
|
||||
invocation: ToolInvocation,
|
||||
prompt: String,
|
||||
delivery: ParentMessageDelivery,
|
||||
) -> Result<FunctionToolOutput, FunctionCallError> {
|
||||
let ToolInvocation {
|
||||
session,
|
||||
turn,
|
||||
call_id,
|
||||
..
|
||||
} = invocation;
|
||||
let session_source = session.session_source().await;
|
||||
let source_result = parent_message_source(&turn.session_source)
|
||||
.or_else(|| parent_message_source(&session_source))
|
||||
.or_else(|| parent_message_source_from_agent_registry(session.as_ref()))
|
||||
.or_else(|| {
|
||||
parent_message_source_from_thread_spawn_context(
|
||||
session.as_ref(),
|
||||
&turn.session_source,
|
||||
&session_source,
|
||||
)
|
||||
});
|
||||
let source = match source_result {
|
||||
Some(Ok(source)) => Some(source),
|
||||
Some(Err(err)) => return Err(err),
|
||||
None => {
|
||||
parent_message_source_from_parent_thread(
|
||||
session.as_ref(),
|
||||
&turn.session_source,
|
||||
&session_source,
|
||||
)
|
||||
.await
|
||||
}
|
||||
};
|
||||
let source = source.ok_or_else(|| {
|
||||
FunctionCallError::RespondToModel(
|
||||
"send_parent_message is only available from a spawned sub-agent".to_string(),
|
||||
)
|
||||
})?;
|
||||
|
||||
session
|
||||
.send_event(
|
||||
&turn,
|
||||
CollabAgentInteractionBeginEvent {
|
||||
call_id: call_id.clone(),
|
||||
sender_thread_id: session.conversation_id,
|
||||
receiver_thread_id: source.parent_thread_id,
|
||||
prompt: prompt.clone(),
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let parent_waiting = if delivery.trigger_turn && delivery.mode == ParentMessageMode::Queue {
|
||||
session
|
||||
.services
|
||||
.agent_control
|
||||
.has_mailbox_waiters(source.parent_thread_id)
|
||||
.await
|
||||
.map_err(|err| collab_agent_error(source.parent_thread_id, err))?
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
let communication = InterAgentCommunication::new(
|
||||
source.child_agent_path,
|
||||
source.parent_agent_path,
|
||||
Vec::new(),
|
||||
prompt.clone(),
|
||||
delivery.trigger_turn,
|
||||
);
|
||||
let parent_status_before_wake = session
|
||||
.services
|
||||
.agent_control
|
||||
.get_status(source.parent_thread_id)
|
||||
.await;
|
||||
let parent_running = matches!(parent_status_before_wake, AgentStatus::Running);
|
||||
let defer_delivery_until_parent_is_free = parent_running
|
||||
&& (delivery.mode == ParentMessageMode::Interrupt
|
||||
|| (delivery.trigger_turn && !parent_waiting));
|
||||
let send_result = if defer_delivery_until_parent_is_free {
|
||||
if delivery.mode == ParentMessageMode::Interrupt {
|
||||
session
|
||||
.services
|
||||
.agent_control
|
||||
.interrupt_agent(source.parent_thread_id)
|
||||
.await
|
||||
.map_err(|err| collab_agent_error(source.parent_thread_id, err))?;
|
||||
}
|
||||
deliver_parent_message_when_parent_is_free(
|
||||
session.services.agent_control.clone(),
|
||||
source.parent_thread_id,
|
||||
communication,
|
||||
delivery.trigger_turn,
|
||||
);
|
||||
Ok(())
|
||||
} else {
|
||||
session
|
||||
.services
|
||||
.agent_control
|
||||
.enqueue_inter_agent_communication(source.parent_thread_id, communication)
|
||||
.await
|
||||
.map_err(|err| collab_agent_error(source.parent_thread_id, err))
|
||||
};
|
||||
|
||||
if send_result.is_ok()
|
||||
&& delivery.trigger_turn
|
||||
&& !parent_waiting
|
||||
&& !defer_delivery_until_parent_is_free
|
||||
{
|
||||
session
|
||||
.services
|
||||
.agent_control
|
||||
.maybe_start_turn_for_pending_work(source.parent_thread_id)
|
||||
.await
|
||||
.map_err(|err| collab_agent_error(source.parent_thread_id, err))?;
|
||||
}
|
||||
|
||||
let status = session
|
||||
.services
|
||||
.agent_control
|
||||
.get_status(source.parent_thread_id)
|
||||
.await;
|
||||
session
|
||||
.send_event(
|
||||
&turn,
|
||||
CollabAgentInteractionEndEvent {
|
||||
call_id,
|
||||
sender_thread_id: session.conversation_id,
|
||||
receiver_thread_id: source.parent_thread_id,
|
||||
receiver_agent_nickname: None,
|
||||
receiver_agent_role: None,
|
||||
prompt,
|
||||
status,
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
|
||||
send_result?;
|
||||
Ok(FunctionToolOutput::from_text(String::new(), Some(true)))
|
||||
}
|
||||
|
||||
fn deliver_parent_message_when_parent_is_free(
|
||||
agent_control: crate::agent::control::AgentControl,
|
||||
parent_thread_id: ThreadId,
|
||||
communication: InterAgentCommunication,
|
||||
trigger_turn: bool,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
let Ok(mut status_rx) = agent_control.subscribe_status(parent_thread_id).await else {
|
||||
return;
|
||||
};
|
||||
while matches!(*status_rx.borrow(), AgentStatus::Running) {
|
||||
if status_rx.changed().await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
if agent_control
|
||||
.enqueue_inter_agent_communication(parent_thread_id, communication)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
if trigger_turn {
|
||||
let _ = agent_control
|
||||
.maybe_start_turn_for_pending_work(parent_thread_id)
|
||||
.await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn parent_message_source(
|
||||
session_source: &SessionSource,
|
||||
) -> Option<Result<ParentMessageSource, FunctionCallError>> {
|
||||
match session_source {
|
||||
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
agent_path: Some(agent_path),
|
||||
..
|
||||
}) => Some(
|
||||
parent_agent_path(agent_path).map(|parent_agent_path| ParentMessageSource {
|
||||
parent_thread_id: *parent_thread_id,
|
||||
parent_agent_path,
|
||||
child_agent_path: agent_path.clone(),
|
||||
}),
|
||||
),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn parent_message_source_from_agent_registry(
|
||||
session: &crate::session::session::Session,
|
||||
) -> Option<Result<ParentMessageSource, FunctionCallError>> {
|
||||
let child_agent_path = session
|
||||
.services
|
||||
.agent_control
|
||||
.get_agent_metadata(session.conversation_id)?
|
||||
.agent_path?;
|
||||
let parent_agent_path = match parent_agent_path(&child_agent_path) {
|
||||
Ok(parent_agent_path) => parent_agent_path,
|
||||
Err(err) => return Some(Err(err)),
|
||||
};
|
||||
let Some(parent_thread_id) = session
|
||||
.services
|
||||
.agent_control
|
||||
.agent_id_for_path(&parent_agent_path)
|
||||
else {
|
||||
return Some(Err(FunctionCallError::RespondToModel(
|
||||
"Could not resolve parent thread for this sub-agent".to_string(),
|
||||
)));
|
||||
};
|
||||
Some(Ok(ParentMessageSource {
|
||||
parent_thread_id,
|
||||
parent_agent_path,
|
||||
child_agent_path,
|
||||
}))
|
||||
}
|
||||
|
||||
fn parent_message_source_from_thread_spawn_context(
|
||||
session: &crate::session::session::Session,
|
||||
turn_session_source: &SessionSource,
|
||||
session_source: &SessionSource,
|
||||
) -> Option<Result<ParentMessageSource, FunctionCallError>> {
|
||||
let parent_thread_id = thread_spawn_parent_thread_id(turn_session_source)
|
||||
.or_else(|| thread_spawn_parent_thread_id(session_source))?;
|
||||
let child_agent_path = session
|
||||
.services
|
||||
.agent_control
|
||||
.get_agent_metadata(session.conversation_id)
|
||||
.and_then(|metadata| metadata.agent_path);
|
||||
child_agent_path.map(|child_agent_path| {
|
||||
let parent_agent_path = parent_agent_path(&child_agent_path)?;
|
||||
Ok(ParentMessageSource {
|
||||
parent_thread_id,
|
||||
parent_agent_path,
|
||||
child_agent_path,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
async fn parent_message_source_from_parent_thread(
|
||||
session: &crate::session::session::Session,
|
||||
turn_session_source: &SessionSource,
|
||||
session_source: &SessionSource,
|
||||
) -> Option<ParentMessageSource> {
|
||||
let parent_thread_id = thread_spawn_parent_thread_id(turn_session_source)
|
||||
.or_else(|| thread_spawn_parent_thread_id(session_source))?;
|
||||
let parent_agent_path = session
|
||||
.services
|
||||
.agent_control
|
||||
.get_agent_config_snapshot(parent_thread_id)
|
||||
.await
|
||||
.and_then(|snapshot| snapshot.session_source.get_agent_path())
|
||||
.unwrap_or_else(AgentPath::root);
|
||||
let synthetic_name = format!(
|
||||
"agent_{}",
|
||||
session.conversation_id.to_string().replace('-', "_")
|
||||
);
|
||||
let child_agent_path = parent_agent_path.join(&synthetic_name).ok()?;
|
||||
Some(ParentMessageSource {
|
||||
parent_thread_id,
|
||||
parent_agent_path,
|
||||
child_agent_path,
|
||||
})
|
||||
}
|
||||
|
||||
fn thread_spawn_parent_thread_id(session_source: &SessionSource) -> Option<ThreadId> {
|
||||
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id, ..
|
||||
}) = session_source
|
||||
else {
|
||||
return None;
|
||||
};
|
||||
Some(*parent_thread_id)
|
||||
}
|
||||
|
||||
fn parent_agent_path(child_agent_path: &AgentPath) -> Result<AgentPath, FunctionCallError> {
|
||||
child_agent_path
|
||||
.as_str()
|
||||
.rsplit_once('/')
|
||||
.and_then(|(parent, _)| AgentPath::try_from(parent).ok())
|
||||
.ok_or_else(|| {
|
||||
FunctionCallError::RespondToModel(
|
||||
"Could not resolve parent agent path for this sub-agent".to_string(),
|
||||
)
|
||||
})
|
||||
}
|
||||
@@ -38,6 +38,7 @@ impl ToolHandler for Handler {
|
||||
};
|
||||
|
||||
let mut mailbox_seq_rx = session.subscribe_mailbox_seq();
|
||||
let _mailbox_wait_guard = session.begin_mailbox_wait();
|
||||
|
||||
session
|
||||
.send_event(
|
||||
@@ -53,7 +54,8 @@ impl ToolHandler for Handler {
|
||||
.await;
|
||||
|
||||
let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64);
|
||||
let timed_out = !wait_for_mailbox_change(&mut mailbox_seq_rx, deadline).await;
|
||||
let timed_out = !session.has_pending_input().await
|
||||
&& !wait_for_mailbox_change(&mut mailbox_seq_rx, deadline).await;
|
||||
let result = WaitAgentResult::from_timed_out(timed_out);
|
||||
|
||||
session
|
||||
|
||||
@@ -104,6 +104,7 @@ pub(crate) fn build_specs_with_discoverable_tools(
|
||||
use crate::tools::handlers::multi_agents_v2::CloseAgentHandler as CloseAgentHandlerV2;
|
||||
use crate::tools::handlers::multi_agents_v2::FollowupTaskHandler as FollowupTaskHandlerV2;
|
||||
use crate::tools::handlers::multi_agents_v2::ListAgentsHandler as ListAgentsHandlerV2;
|
||||
use crate::tools::handlers::multi_agents_v2::ParentMessageHandler as ParentMessageHandlerV2;
|
||||
use crate::tools::handlers::multi_agents_v2::SendMessageHandler as SendMessageHandlerV2;
|
||||
use crate::tools::handlers::multi_agents_v2::SpawnAgentHandler as SpawnAgentHandlerV2;
|
||||
use crate::tools::handlers::multi_agents_v2::WaitAgentHandler as WaitAgentHandlerV2;
|
||||
@@ -239,6 +240,9 @@ pub(crate) fn build_specs_with_discoverable_tools(
|
||||
ToolHandlerKind::RequestUserInput => {
|
||||
builder.register_handler(handler.name, request_user_input_handler.clone());
|
||||
}
|
||||
ToolHandlerKind::ParentMessageV2 => {
|
||||
builder.register_handler(handler.name, Arc::new(ParentMessageHandlerV2));
|
||||
}
|
||||
ToolHandlerKind::ResumeAgentV1 => {
|
||||
builder.register_handler(handler.name, Arc::new(ResumeAgentHandler));
|
||||
}
|
||||
|
||||
@@ -181,6 +181,44 @@ pub fn create_followup_task_tool() -> ToolSpec {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn create_parent_message_tool() -> ToolSpec {
|
||||
let properties = BTreeMap::from([
|
||||
(
|
||||
"message".to_string(),
|
||||
JsonSchema::string(Some(
|
||||
"Message text to send to this sub-agent's direct parent.".to_string(),
|
||||
)),
|
||||
),
|
||||
(
|
||||
"mode".to_string(),
|
||||
JsonSchema::string_enum(
|
||||
vec![json!("queue"), json!("enqueue"), json!("interrupt")],
|
||||
Some(
|
||||
"`queue` queues the message in the parent mailbox. `enqueue` is accepted as a legacy alias for `queue`. `interrupt` cancels the parent's current turn before delivering the message."
|
||||
.to_string(),
|
||||
),
|
||||
),
|
||||
),
|
||||
(
|
||||
"trigger_turn".to_string(),
|
||||
JsonSchema::boolean(Some(
|
||||
"When true, ensure the parent processes this message: start an idle parent turn, wake wait_agent, or queue it for a follow-up turn when the parent is busy. Defaults to true for mode=\"interrupt\" and false for mode=\"queue\"/\"enqueue\"."
|
||||
.to_string(),
|
||||
)),
|
||||
),
|
||||
]);
|
||||
|
||||
ToolSpec::Function(ResponsesApiTool {
|
||||
name: "send_parent_message".to_string(),
|
||||
description: "Send a string message to this sub-agent's direct parent. `mode` controls queue vs interrupt; `trigger_turn` controls whether the parent must process the message when free."
|
||||
.to_string(),
|
||||
strict: false,
|
||||
defer_loading: None,
|
||||
parameters: JsonSchema::object(properties, Some(vec!["message".to_string()]), Some(false.into())),
|
||||
output_schema: None,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn create_resume_agent_tool() -> ToolSpec {
|
||||
let properties = BTreeMap::from([(
|
||||
"id".to_string(),
|
||||
|
||||
@@ -192,6 +192,52 @@ fn followup_task_tool_requires_message_and_has_no_output_schema() {
|
||||
assert_eq!(output_schema, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parent_message_tool_separates_queue_interrupt_and_trigger_turn() {
|
||||
let ToolSpec::Function(ResponsesApiTool {
|
||||
description,
|
||||
parameters,
|
||||
output_schema,
|
||||
..
|
||||
}) = create_parent_message_tool()
|
||||
else {
|
||||
panic!("send_parent_message should be a function tool");
|
||||
};
|
||||
assert_eq!(
|
||||
parameters.schema_type,
|
||||
Some(JsonSchemaType::Single(JsonSchemaPrimitiveType::Object))
|
||||
);
|
||||
let properties = parameters
|
||||
.properties
|
||||
.as_ref()
|
||||
.expect("send_parent_message should use object params");
|
||||
assert!(properties.contains_key("message"));
|
||||
assert!(properties.contains_key("mode"));
|
||||
assert!(properties.contains_key("trigger_turn"));
|
||||
assert!(!properties.contains_key("target"));
|
||||
assert!(!properties.contains_key("items"));
|
||||
assert!(description.contains("direct parent"));
|
||||
assert_eq!(
|
||||
properties
|
||||
.get("mode")
|
||||
.and_then(|schema| schema.enum_values.as_ref()),
|
||||
Some(&vec![json!("queue"), json!("enqueue"), json!("interrupt")])
|
||||
);
|
||||
assert_eq!(
|
||||
properties
|
||||
.get("trigger_turn")
|
||||
.and_then(|schema| schema.description.as_deref()),
|
||||
Some(
|
||||
"When true, ensure the parent processes this message: start an idle parent turn, wake wait_agent, or queue it for a follow-up turn when the parent is busy. Defaults to true for mode=\"interrupt\" and false for mode=\"queue\"/\"enqueue\"."
|
||||
)
|
||||
);
|
||||
assert_eq!(
|
||||
parameters.required.as_ref(),
|
||||
Some(&vec!["message".to_string()])
|
||||
);
|
||||
assert_eq!(output_schema, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wait_agent_tool_v2_uses_timeout_only_summary_output() {
|
||||
let ToolSpec::Function(ResponsesApiTool {
|
||||
|
||||
@@ -33,6 +33,7 @@ pub use agent_tool::create_close_agent_tool_v1;
|
||||
pub use agent_tool::create_close_agent_tool_v2;
|
||||
pub use agent_tool::create_followup_task_tool;
|
||||
pub use agent_tool::create_list_agents_tool;
|
||||
pub use agent_tool::create_parent_message_tool;
|
||||
pub use agent_tool::create_resume_agent_tool;
|
||||
pub use agent_tool::create_send_input_tool_v1;
|
||||
pub use agent_tool::create_send_message_tool;
|
||||
|
||||
@@ -104,6 +104,7 @@ pub struct ToolsConfig {
|
||||
pub can_request_original_image_detail: bool,
|
||||
pub collab_tools: bool,
|
||||
pub multi_agent_v2: bool,
|
||||
pub parent_message_tool: bool,
|
||||
pub hide_spawn_agent_metadata: bool,
|
||||
pub spawn_agent_usage_hint: bool,
|
||||
pub spawn_agent_usage_hint_text: Option<String>,
|
||||
@@ -201,6 +202,11 @@ impl ToolsConfig {
|
||||
SessionSource::SubAgent(SubAgentSource::Other(label))
|
||||
if label.starts_with("agent_job:")
|
||||
);
|
||||
let include_parent_message_tool = include_multi_agent_v2
|
||||
&& matches!(
|
||||
session_source,
|
||||
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. })
|
||||
);
|
||||
|
||||
Self {
|
||||
available_models: available_models.to_vec(),
|
||||
@@ -225,6 +231,7 @@ impl ToolsConfig {
|
||||
can_request_original_image_detail: include_original_image_detail,
|
||||
collab_tools: include_collab_tools,
|
||||
multi_agent_v2: include_multi_agent_v2,
|
||||
parent_message_tool: include_parent_message_tool,
|
||||
hide_spawn_agent_metadata: false,
|
||||
spawn_agent_usage_hint: true,
|
||||
spawn_agent_usage_hint_text: None,
|
||||
|
||||
@@ -34,6 +34,7 @@ use crate::create_list_dir_tool;
|
||||
use crate::create_list_mcp_resource_templates_tool;
|
||||
use crate::create_list_mcp_resources_tool;
|
||||
use crate::create_local_shell_tool;
|
||||
use crate::create_parent_message_tool;
|
||||
use crate::create_read_mcp_resource_tool;
|
||||
use crate::create_report_agent_job_result_tool;
|
||||
use crate::create_request_permissions_tool;
|
||||
@@ -412,6 +413,13 @@ pub fn build_tool_registry_plan(
|
||||
/*supports_parallel_tool_calls*/ false,
|
||||
config.code_mode_enabled,
|
||||
);
|
||||
if config.parent_message_tool {
|
||||
plan.push_spec(
|
||||
create_parent_message_tool(),
|
||||
/*supports_parallel_tool_calls*/ false,
|
||||
config.code_mode_enabled,
|
||||
);
|
||||
}
|
||||
plan.push_spec(
|
||||
create_wait_agent_tool_v2(params.wait_agent_timeouts),
|
||||
/*supports_parallel_tool_calls*/ false,
|
||||
@@ -430,6 +438,7 @@ pub fn build_tool_registry_plan(
|
||||
plan.register_handler("spawn_agent", ToolHandlerKind::SpawnAgentV2);
|
||||
plan.register_handler("send_message", ToolHandlerKind::SendMessageV2);
|
||||
plan.register_handler("followup_task", ToolHandlerKind::FollowupTaskV2);
|
||||
plan.register_handler("send_parent_message", ToolHandlerKind::ParentMessageV2);
|
||||
plan.register_handler("wait_agent", ToolHandlerKind::WaitAgentV2);
|
||||
plan.register_handler("close_agent", ToolHandlerKind::CloseAgentV2);
|
||||
plan.register_handler("list_agents", ToolHandlerKind::ListAgentsV2);
|
||||
|
||||
@@ -22,6 +22,8 @@ use crate::mcp_call_tool_result_output_schema;
|
||||
use codex_app_server_protocol::AppInfo;
|
||||
use codex_features::Feature;
|
||||
use codex_features::Features;
|
||||
use codex_protocol::AgentPath;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::WebSearchConfig;
|
||||
use codex_protocol::config_types::WebSearchMode;
|
||||
use codex_protocol::config_types::WindowsSandboxLevel;
|
||||
@@ -221,6 +223,7 @@ fn test_build_specs_multi_agent_v2_uses_task_names_and_hides_resume() {
|
||||
"list_agents",
|
||||
],
|
||||
);
|
||||
assert_lacks_tool_name(&tools, "send_parent_message");
|
||||
|
||||
let spawn_agent = find_tool(&tools, "spawn_agent");
|
||||
let ToolSpec::Function(ResponsesApiTool {
|
||||
@@ -285,6 +288,45 @@ fn test_build_specs_multi_agent_v2_uses_task_names_and_hides_resume() {
|
||||
Some(&vec!["target".to_string(), "message".to_string()])
|
||||
);
|
||||
|
||||
let child_tools_config = ToolsConfig::new(&ToolsConfigParams {
|
||||
model_info: &model_info,
|
||||
available_models: &available_models,
|
||||
features: &features,
|
||||
image_generation_tool_auth_allowed: true,
|
||||
web_search_mode: Some(WebSearchMode::Cached),
|
||||
session_source: SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id: ThreadId::new(),
|
||||
depth: 1,
|
||||
agent_path: Some(AgentPath::try_from("/root/worker").expect("agent path")),
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
}),
|
||||
sandbox_policy: &SandboxPolicy::DangerFullAccess,
|
||||
windows_sandbox_level: WindowsSandboxLevel::Disabled,
|
||||
});
|
||||
let (child_tools, _) = build_specs(
|
||||
&child_tools_config,
|
||||
/*mcp_tools*/ None,
|
||||
/*deferred_mcp_tools*/ None,
|
||||
&[],
|
||||
);
|
||||
let parent_message = find_tool(&child_tools, "send_parent_message");
|
||||
let ToolSpec::Function(ResponsesApiTool {
|
||||
parameters,
|
||||
output_schema,
|
||||
..
|
||||
}) = &parent_message.spec
|
||||
else {
|
||||
panic!("send_parent_message should be a function tool");
|
||||
};
|
||||
assert_eq!(output_schema, &None);
|
||||
let (properties, required) = expect_object_schema(parameters);
|
||||
assert!(properties.contains_key("message"));
|
||||
assert!(properties.contains_key("mode"));
|
||||
assert!(properties.contains_key("trigger_turn"));
|
||||
assert!(!properties.contains_key("target"));
|
||||
assert_eq!(required, Some(&vec!["message".to_string()]));
|
||||
|
||||
let wait_agent = find_tool(&tools, "wait_agent");
|
||||
let ToolSpec::Function(ResponsesApiTool {
|
||||
parameters,
|
||||
|
||||
@@ -28,6 +28,7 @@ pub enum ToolHandlerKind {
|
||||
RequestPermissions,
|
||||
RequestUserInput,
|
||||
ResumeAgentV1,
|
||||
ParentMessageV2,
|
||||
SendInputV1,
|
||||
SendMessageV2,
|
||||
Shell,
|
||||
|
||||
Reference in New Issue
Block a user