feat: emit events around collab tools (#9095)

Emit the following events around the collab tools. On the `app-server`
this will be under `item/started` and `item/completed`
```
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)]
pub struct CollabAgentSpawnBeginEvent {
    /// Identifier for the collab tool call.
    pub call_id: String,
    /// Thread ID of the sender.
    pub sender_thread_id: ThreadId,
    /// Initial prompt sent to the agent. Can be empty to prevent CoT leaking at the
    /// beginning.
    pub prompt: String,
}

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)]
pub struct CollabAgentSpawnEndEvent {
    /// Identifier for the collab tool call.
    pub call_id: String,
    /// Thread ID of the sender.
    pub sender_thread_id: ThreadId,
    /// Thread ID of the newly spawned agent, if it was created.
    pub new_thread_id: Option<ThreadId>,
    /// Initial prompt sent to the agent. Can be empty to prevent CoT leaking at the
    /// beginning.
    pub prompt: String,
    /// Last known status of the new agent reported to the sender agent.
    pub status: AgentStatus,
}

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)]
pub struct CollabAgentInteractionBeginEvent {
    /// Identifier for the collab tool call.
    pub call_id: String,
    /// Thread ID of the sender.
    pub sender_thread_id: ThreadId,
    /// Thread ID of the receiver.
    pub receiver_thread_id: ThreadId,
    /// Prompt sent from the sender to the receiver. Can be empty to prevent CoT
    /// leaking at the beginning.
    pub prompt: String,
}

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)]
pub struct CollabAgentInteractionEndEvent {
    /// Identifier for the collab tool call.
    pub call_id: String,
    /// Thread ID of the sender.
    pub sender_thread_id: ThreadId,
    /// Thread ID of the receiver.
    pub receiver_thread_id: ThreadId,
    /// Prompt sent from the sender to the receiver. Can be empty to prevent CoT
    /// leaking at the beginning.
    pub prompt: String,
    /// Last known status of the receiver agent reported to the sender agent.
    pub status: AgentStatus,
}

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)]
pub struct CollabWaitingBeginEvent {
    /// Thread ID of the sender.
    pub sender_thread_id: ThreadId,
    /// Thread ID of the receiver.
    pub receiver_thread_id: ThreadId,
    /// ID of the waiting call.
    pub call_id: String,
}

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)]
pub struct CollabWaitingEndEvent {
    /// Thread ID of the sender.
    pub sender_thread_id: ThreadId,
    /// Thread ID of the receiver.
    pub receiver_thread_id: ThreadId,
    /// ID of the waiting call.
    pub call_id: String,
    /// Last known status of the receiver agent reported to the sender agent.
    pub status: AgentStatus,
}

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)]
pub struct CollabCloseBeginEvent {
    /// Identifier for the collab tool call.
    pub call_id: String,
    /// Thread ID of the sender.
    pub sender_thread_id: ThreadId,
    /// Thread ID of the receiver.
    pub receiver_thread_id: ThreadId,
}

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)]
pub struct CollabCloseEndEvent {
    /// Identifier for the collab tool call.
    pub call_id: String,
    /// Thread ID of the sender.
    pub sender_thread_id: ThreadId,
    /// Thread ID of the receiver.
    pub receiver_thread_id: ThreadId,
    /// Last known status of the receiver agent reported to the sender agent before
    /// the close.
    pub status: AgentStatus,
}
```
This commit is contained in:
jif-oai
2026-01-14 17:55:57 +00:00
committed by GitHub
parent 4283a7432b
commit 6a939ed7a4
8 changed files with 435 additions and 54 deletions

View File

@@ -1,4 +1,5 @@
use crate::agent::AgentStatus;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::config::Config;
use crate::error::CodexErr;
@@ -11,6 +12,14 @@ use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
use async_trait::async_trait;
use codex_protocol::ThreadId;
use codex_protocol::protocol::CollabAgentInteractionBeginEvent;
use codex_protocol::protocol::CollabAgentInteractionEndEvent;
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
use codex_protocol::protocol::CollabCloseBeginEvent;
use codex_protocol::protocol::CollabCloseEndEvent;
use codex_protocol::protocol::CollabWaitingBeginEvent;
use codex_protocol::protocol::CollabWaitingEndEvent;
use serde::Deserialize;
use serde::Serialize;
@@ -40,6 +49,7 @@ impl ToolHandler for CollabHandler {
turn,
tool_name,
payload,
call_id,
..
} = invocation;
@@ -53,10 +63,10 @@ impl ToolHandler for CollabHandler {
};
match tool_name.as_str() {
"spawn_agent" => spawn::handle(session, turn, arguments).await,
"send_input" => send_input::handle(session, arguments).await,
"wait" => wait::handle(session, arguments).await,
"close_agent" => close_agent::handle(session, arguments).await,
"spawn_agent" => spawn::handle(session, turn, call_id, arguments).await,
"send_input" => send_input::handle(session, turn, call_id, arguments).await,
"wait" => wait::handle(session, turn, call_id, arguments).await,
"close_agent" => close_agent::handle(session, turn, call_id, arguments).await,
other => Err(FunctionCallError::RespondToModel(format!(
"unsupported collab tool {other}"
))),
@@ -66,7 +76,6 @@ impl ToolHandler for CollabHandler {
mod spawn {
use super::*;
use crate::codex::Session;
use std::sync::Arc;
#[derive(Debug, Deserialize)]
@@ -82,24 +91,58 @@ mod spawn {
pub async fn handle(
session: Arc<Session>,
turn: Arc<TurnContext>,
call_id: String,
arguments: String,
) -> Result<ToolOutput, FunctionCallError> {
let args: SpawnAgentArgs = parse_arguments(&arguments)?;
if args.message.trim().is_empty() {
let prompt = args.message;
if prompt.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"Empty message can't be sent to an agent".to_string(),
));
}
session
.send_event(
&turn,
CollabAgentSpawnBeginEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
prompt: prompt.clone(),
}
.into(),
)
.await;
let config = build_agent_spawn_config(turn.as_ref())?;
let result = session
.services
.agent_control
.spawn_agent(config, args.message)
.spawn_agent(config, prompt.clone())
.await
.map_err(collab_spawn_error)?;
.map_err(collab_spawn_error);
let (new_thread_id, status) = match &result {
Ok(thread_id) => (
Some(*thread_id),
session.services.agent_control.get_status(*thread_id).await,
),
Err(_) => (None, AgentStatus::NotFound),
};
session
.send_event(
&turn,
CollabAgentSpawnEndEvent {
call_id,
sender_thread_id: session.conversation_id,
new_thread_id,
prompt,
status,
}
.into(),
)
.await;
let new_thread_id = result?;
let content = serde_json::to_string(&SpawnAgentResult {
agent_id: result.to_string(),
agent_id: new_thread_id.to_string(),
})
.map_err(|err| {
FunctionCallError::Fatal(format!("failed to serialize spawn_agent result: {err}"))
@@ -115,7 +158,6 @@ mod spawn {
mod send_input {
use super::*;
use crate::codex::Session;
use std::sync::Arc;
#[derive(Debug, Deserialize)]
@@ -131,22 +173,55 @@ mod send_input {
pub async fn handle(
session: Arc<Session>,
turn: Arc<TurnContext>,
call_id: String,
arguments: String,
) -> Result<ToolOutput, FunctionCallError> {
let args: SendInputArgs = parse_arguments(&arguments)?;
let agent_id = agent_id(&args.id)?;
if args.message.trim().is_empty() {
let receiver_thread_id = agent_id(&args.id)?;
let prompt = args.message;
if prompt.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"Empty message can't be sent to an agent".to_string(),
));
}
let agent_id_for_err = agent_id;
let submission_id = session
session
.send_event(
&turn,
CollabAgentInteractionBeginEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
receiver_thread_id,
prompt: prompt.clone(),
}
.into(),
)
.await;
let result = session
.services
.agent_control
.send_prompt(agent_id, args.message)
.send_prompt(receiver_thread_id, prompt.clone())
.await
.map_err(|err| collab_agent_error(agent_id_for_err, err))?;
.map_err(|err| collab_agent_error(receiver_thread_id, err));
let status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
session
.send_event(
&turn,
CollabAgentInteractionEndEvent {
call_id,
sender_thread_id: session.conversation_id,
receiver_thread_id,
prompt,
status,
}
.into(),
)
.await;
let submission_id = result?;
let content = serde_json::to_string(&SendInputResult { submission_id }).map_err(|err| {
FunctionCallError::Fatal(format!("failed to serialize send_input result: {err}"))
@@ -163,7 +238,6 @@ mod send_input {
mod wait {
use super::*;
use crate::agent::status::is_final;
use crate::codex::Session;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
@@ -183,10 +257,12 @@ mod wait {
pub async fn handle(
session: Arc<Session>,
turn: Arc<TurnContext>,
call_id: String,
arguments: String,
) -> Result<ToolOutput, FunctionCallError> {
let args: WaitArgs = parse_arguments(&arguments)?;
let agent_id = agent_id(&args.id)?;
let receiver_thread_id = agent_id(&args.id)?;
// Validate timeout.
let timeout_ms = args.timeout_ms.unwrap_or(DEFAULT_WAIT_TIMEOUT_MS);
@@ -199,14 +275,85 @@ mod wait {
ms => ms.min(MAX_WAIT_TIMEOUT_MS),
};
let agent_id_for_err = agent_id;
let mut status_rx = session
session
.send_event(
&turn,
CollabWaitingBeginEvent {
sender_thread_id: session.conversation_id,
receiver_thread_id,
call_id: call_id.clone(),
}
.into(),
)
.await;
let status_rx = match session
.services
.agent_control
.subscribe_status(agent_id)
.subscribe_status(receiver_thread_id)
.await
.map_err(|err| collab_agent_error(agent_id_for_err, err))?;
{
Ok(status_rx) => status_rx,
Err(err) => {
let status = session
.services
.agent_control
.get_status(receiver_thread_id)
.await;
session
.send_event(
&turn,
CollabWaitingEndEvent {
sender_thread_id: session.conversation_id,
receiver_thread_id,
call_id: call_id.clone(),
status,
}
.into(),
)
.await;
return Err(collab_agent_error(receiver_thread_id, err));
}
};
let result =
wait_for_status(session.as_ref(), receiver_thread_id, timeout_ms, status_rx).await;
session
.send_event(
&turn,
CollabWaitingEndEvent {
sender_thread_id: session.conversation_id,
receiver_thread_id,
call_id,
status: result.status.clone(),
}
.into(),
)
.await;
if matches!(result.status, AgentStatus::NotFound) {
return Err(FunctionCallError::RespondToModel(format!(
"agent with id {receiver_thread_id} not found"
)));
}
let content = serde_json::to_string(&result).map_err(|err| {
FunctionCallError::Fatal(format!("failed to serialize wait result: {err}"))
})?;
let success = !result.timed_out && !matches!(result.status, AgentStatus::Errored(_));
Ok(ToolOutput::Function {
content,
success: Some(success),
content_items: None,
})
}
async fn wait_for_status(
session: &Session,
agent_id: ThreadId,
timeout_ms: i64,
mut status_rx: tokio::sync::watch::Receiver<AgentStatus>,
) -> WaitResult {
// Get last known status.
let mut status = status_rx.borrow_and_update().clone();
let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64);
@@ -231,31 +378,12 @@ mod wait {
}
};
if matches!(status, AgentStatus::NotFound) {
return Err(FunctionCallError::RespondToModel(format!(
"agent with id {agent_id} not found"
)));
}
let result = WaitResult { status, timed_out };
let content = serde_json::to_string(&result).map_err(|err| {
FunctionCallError::Fatal(format!("failed to serialize wait result: {err}"))
})?;
let success = !result.timed_out && !matches!(result.status, AgentStatus::Errored(_));
Ok(ToolOutput::Function {
content,
success: Some(success),
content_items: None,
})
WaitResult { status, timed_out }
}
}
pub mod close_agent {
use super::*;
use crate::codex::Session;
use std::sync::Arc;
#[derive(Debug, Deserialize, Serialize)]
@@ -265,28 +393,71 @@ pub mod close_agent {
pub async fn handle(
session: Arc<Session>,
turn: Arc<TurnContext>,
call_id: String,
arguments: String,
) -> Result<ToolOutput, FunctionCallError> {
let args: CloseAgentArgs = parse_arguments(&arguments)?;
let agent_id = agent_id(&args.id)?;
let agent_id_for_err = agent_id;
let mut status_rx = session
session
.send_event(
&turn,
CollabCloseBeginEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
}
.into(),
)
.await;
let status = match session
.services
.agent_control
.subscribe_status(agent_id)
.await
.map_err(|err| collab_agent_error(agent_id_for_err, err))?;
let status = status_rx.borrow_and_update().clone();
if !matches!(status, AgentStatus::Shutdown) {
let agent_id_for_err = agent_id;
let _ = session
{
Ok(mut status_rx) => status_rx.borrow_and_update().clone(),
Err(err) => {
let status = session.services.agent_control.get_status(agent_id).await;
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
status,
}
.into(),
)
.await;
return Err(collab_agent_error(agent_id, err));
}
};
let result = if !matches!(status, AgentStatus::Shutdown) {
session
.services
.agent_control
.shutdown_agent(agent_id)
.await
.map_err(|err| collab_agent_error(agent_id_for_err, err))?;
}
.map_err(|err| collab_agent_error(agent_id, err))
.map(|_| ())
} else {
Ok(())
};
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id,
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
status: status.clone(),
}
.into(),
)
.await;
result?;
let content = serde_json::to_string(&CloseAgentResult { status }).map_err(|err| {
FunctionCallError::Fatal(format!("failed to serialize close_agent result: {err}"))