Compare commits

...

5 Commits

Author SHA1 Message Date
jif-oai
651d6852a2 feat: basic collab rendering 2026-01-12 15:27:59 +00:00
jif-oai
e3cf74885a feat: emit events around collab tools 2026-01-12 14:53:09 +00:00
jif-oai
9659583559 feat: add close tool implementation for collab (#9090)
Pretty straight forward. A known follow-up will be to drop it from the
AgentControl
2026-01-12 13:21:46 +00:00
jif-oai
623707ab58 feat: add wait tool implementation for collab (#9088)
Add implementation for the `wait` tool.

For this we consider all status different from `PendingInit` and
`Running` as terminal. The `wait` tool call will return either after a
given timeout or when the tool reaches a non-terminal status.

A few points to note:
* The usage of a channel is preferred to prevent some races (just
looping on `get_status()` could "miss" a terminal status)
* The order of operations is very important, we need to first subscribe
and then check the last known status to prevent race conditions
* If the channel gets dropped, we return an error on purpose
2026-01-12 12:16:24 +00:00
jif-oai
86f81ca010 feat: testing harness for collab 1 (#8983) 2026-01-12 11:17:05 +00:00
15 changed files with 1151 additions and 117 deletions

View File

@@ -9,6 +9,7 @@ use codex_protocol::protocol::Op;
use codex_protocol::user_input::UserInput;
use std::sync::Arc;
use std::sync::Weak;
use tokio::sync::watch;
/// Control-plane handle for multi-agent operations.
/// `AgentControl` is held by each session (via `SessionServices`). It provides capability to
@@ -27,7 +28,6 @@ impl AgentControl {
Self { manager }
}
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
/// Spawn a new agent thread and submit the initial prompt.
///
/// If `headless` is true, a background drain task is spawned to prevent unbounded event growth
@@ -50,7 +50,6 @@ impl AgentControl {
Ok(new_thread.thread_id)
}
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
/// Send a `user` prompt to an existing agent thread.
pub(crate) async fn send_prompt(
&self,
@@ -69,7 +68,13 @@ impl AgentControl {
.await
}
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
/// Submit a shutdown request to an existing agent thread.
pub(crate) async fn shutdown_agent(&self, agent_id: ThreadId) -> CodexResult<String> {
let state = self.upgrade()?;
state.send_op(agent_id, Op::Shutdown {}).await
}
#[allow(dead_code)] // Will be used for collab tools.
/// Fetch the last known status for `agent_id`, returning `NotFound` when unavailable.
pub(crate) async fn get_status(&self, agent_id: ThreadId) -> AgentStatus {
let Ok(state) = self.upgrade() else {
@@ -82,6 +87,16 @@ impl AgentControl {
thread.agent_status().await
}
/// Subscribe to status updates for `agent_id`, yielding the latest value and changes.
pub(crate) async fn subscribe_status(
&self,
agent_id: ThreadId,
) -> CodexResult<watch::Receiver<AgentStatus>> {
let state = self.upgrade()?;
let thread = state.get_thread(agent_id).await?;
Ok(thread.subscribe_status())
}
fn upgrade(&self) -> CodexResult<Arc<ThreadManagerState>> {
self.manager
.upgrade()
@@ -114,13 +129,63 @@ fn spawn_headless_drain(thread: Arc<CodexThread>) {
#[cfg(test)]
mod tests {
use super::*;
use crate::CodexAuth;
use crate::ThreadManager;
use crate::agent::agent_status_from_event;
use crate::config::Config;
use crate::config::ConfigBuilder;
use assert_matches::assert_matches;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnStartedEvent;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
async fn test_config() -> (TempDir, Config) {
let home = TempDir::new().expect("create temp dir");
let config = ConfigBuilder::default()
.codex_home(home.path().to_path_buf())
.build()
.await
.expect("load default test config");
(home, config)
}
struct AgentControlHarness {
_home: TempDir,
config: Config,
manager: ThreadManager,
control: AgentControl,
}
impl AgentControlHarness {
async fn new() -> Self {
let (home, config) = test_config().await;
let manager = ThreadManager::with_models_provider_and_home(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.clone(),
);
let control = manager.agent_control();
Self {
_home: home,
config,
manager,
control,
}
}
async fn start_thread(&self) -> (ThreadId, Arc<CodexThread>) {
let new_thread = self
.manager
.start_thread(self.config.clone())
.await
.expect("start thread");
(new_thread.thread_id, new_thread.thread)
}
}
#[tokio::test]
async fn send_prompt_errors_when_manager_dropped() {
@@ -185,4 +250,135 @@ mod tests {
let status = agent_status_from_event(&EventMsg::ShutdownComplete);
assert_eq!(status, Some(AgentStatus::Shutdown));
}
#[tokio::test]
async fn spawn_agent_errors_when_manager_dropped() {
let control = AgentControl::default();
let (_home, config) = test_config().await;
let err = control
.spawn_agent(config, "hello".to_string(), false)
.await
.expect_err("spawn_agent should fail without a manager");
assert_eq!(
err.to_string(),
"unsupported operation: thread manager dropped"
);
}
#[tokio::test]
async fn send_prompt_errors_when_thread_missing() {
let harness = AgentControlHarness::new().await;
let thread_id = ThreadId::new();
let err = harness
.control
.send_prompt(thread_id, "hello".to_string())
.await
.expect_err("send_prompt should fail for missing thread");
assert_matches!(err, CodexErr::ThreadNotFound(id) if id == thread_id);
}
#[tokio::test]
async fn get_status_returns_not_found_for_missing_thread() {
let harness = AgentControlHarness::new().await;
let status = harness.control.get_status(ThreadId::new()).await;
assert_eq!(status, AgentStatus::NotFound);
}
#[tokio::test]
async fn get_status_returns_pending_init_for_new_thread() {
let harness = AgentControlHarness::new().await;
let (thread_id, _) = harness.start_thread().await;
let status = harness.control.get_status(thread_id).await;
assert_eq!(status, AgentStatus::PendingInit);
}
#[tokio::test]
async fn subscribe_status_errors_for_missing_thread() {
let harness = AgentControlHarness::new().await;
let thread_id = ThreadId::new();
let err = harness
.control
.subscribe_status(thread_id)
.await
.expect_err("subscribe_status should fail for missing thread");
assert_matches!(err, CodexErr::ThreadNotFound(id) if id == thread_id);
}
#[tokio::test]
async fn subscribe_status_updates_on_shutdown() {
let harness = AgentControlHarness::new().await;
let (thread_id, thread) = harness.start_thread().await;
let mut status_rx = harness
.control
.subscribe_status(thread_id)
.await
.expect("subscribe_status should succeed");
assert_eq!(status_rx.borrow().clone(), AgentStatus::PendingInit);
let _ = thread
.submit(Op::Shutdown {})
.await
.expect("shutdown should submit");
let _ = status_rx.changed().await;
assert_eq!(status_rx.borrow().clone(), AgentStatus::Shutdown);
}
#[tokio::test]
async fn send_prompt_submits_user_message() {
let harness = AgentControlHarness::new().await;
let (thread_id, _thread) = harness.start_thread().await;
let submission_id = harness
.control
.send_prompt(thread_id, "hello from tests".to_string())
.await
.expect("send_prompt should succeed");
assert!(!submission_id.is_empty());
let expected = (
thread_id,
Op::UserInput {
items: vec![UserInput::Text {
text: "hello from tests".to_string(),
}],
final_output_json_schema: None,
},
);
let captured = harness
.manager
.captured_ops()
.into_iter()
.find(|entry| *entry == expected);
assert_eq!(captured, Some(expected));
}
#[tokio::test]
async fn spawn_agent_creates_thread_and_sends_prompt() {
let harness = AgentControlHarness::new().await;
let thread_id = harness
.control
.spawn_agent(harness.config.clone(), "spawned".to_string(), false)
.await
.expect("spawn_agent should succeed");
let _thread = harness
.manager
.get_thread(thread_id)
.await
.expect("thread should be registered");
let expected = (
thread_id,
Op::UserInput {
items: vec![UserInput::Text {
text: "spawned".to_string(),
}],
final_output_json_schema: None,
},
);
let captured = harness
.manager
.captured_ops()
.into_iter()
.find(|entry| *entry == expected);
assert_eq!(captured, Some(expected));
}
}

View File

@@ -13,3 +13,7 @@ pub(crate) fn agent_status_from_event(msg: &EventMsg) -> Option<AgentStatus> {
_ => None,
}
}
pub(crate) fn is_final(status: &AgentStatus) -> bool {
!matches!(status, AgentStatus::PendingInit | AgentStatus::Running)
}

View File

@@ -164,6 +164,7 @@ use codex_protocol::protocol::InitialHistory;
use codex_protocol::user_input::UserInput;
use codex_utils_readiness::Readiness;
use codex_utils_readiness::ReadinessFlag;
use tokio::sync::watch;
/// The high-level interface to the Codex system.
/// It operates as a queue pair where you send submissions and receive events.
@@ -172,7 +173,7 @@ pub struct Codex {
pub(crate) tx_sub: Sender<Submission>,
pub(crate) rx_event: Receiver<Event>,
// Last known status of the agent.
pub(crate) agent_status: Arc<RwLock<AgentStatus>>,
pub(crate) agent_status: watch::Receiver<AgentStatus>,
}
/// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`],
@@ -275,7 +276,7 @@ impl Codex {
// Generate a unique ID for the lifetime of this Codex session.
let session_source_clone = session_configuration.session_source.clone();
let agent_status = Arc::new(RwLock::new(AgentStatus::PendingInit));
let (agent_status_tx, agent_status_rx) = watch::channel(AgentStatus::PendingInit);
let session = Session::new(
session_configuration,
@@ -284,7 +285,7 @@ impl Codex {
models_manager.clone(),
exec_policy,
tx_event.clone(),
Arc::clone(&agent_status),
agent_status_tx.clone(),
conversation_history,
session_source_clone,
skills_manager,
@@ -303,7 +304,7 @@ impl Codex {
next_id: AtomicU64::new(0),
tx_sub,
rx_event,
agent_status,
agent_status: agent_status_rx,
};
#[allow(deprecated)]
@@ -345,8 +346,7 @@ impl Codex {
}
pub(crate) async fn agent_status(&self) -> AgentStatus {
let status = self.agent_status.read().await;
status.clone()
self.agent_status.borrow().clone()
}
}
@@ -354,9 +354,9 @@ impl Codex {
///
/// A session has at most 1 running task at a time, and can be interrupted by user input.
pub(crate) struct Session {
conversation_id: ThreadId,
pub(crate) conversation_id: ThreadId,
tx_event: Sender<Event>,
agent_status: Arc<RwLock<AgentStatus>>,
agent_status: watch::Sender<AgentStatus>,
state: Mutex<SessionState>,
/// The set of enabled features should be invariant for the lifetime of the
/// session.
@@ -557,7 +557,7 @@ impl Session {
models_manager: Arc<ModelsManager>,
exec_policy: ExecPolicyManager,
tx_event: Sender<Event>,
agent_status: Arc<RwLock<AgentStatus>>,
agent_status: watch::Sender<AgentStatus>,
initial_history: InitialHistory,
session_source: SessionSource,
skills_manager: Arc<SkillsManager>,
@@ -703,7 +703,7 @@ impl Session {
let sess = Arc::new(Session {
conversation_id,
tx_event: tx_event.clone(),
agent_status: Arc::clone(&agent_status),
agent_status,
state: Mutex::new(state),
features: config.features.clone(),
active_turn: Mutex::new(None),
@@ -1026,8 +1026,7 @@ impl Session {
pub(crate) async fn send_event_raw(&self, event: Event) {
// Record the last known agent status.
if let Some(status) = agent_status_from_event(&event.msg) {
let mut guard = self.agent_status.write().await;
*guard = status;
self.agent_status.send_replace(status);
}
// Persist the event into rollout (recorder filters as needed)
let rollout_items = vec![RolloutItem::EventMsg(event.msg.clone())];
@@ -1045,8 +1044,7 @@ impl Session {
pub(crate) async fn send_event_raw_flushed(&self, event: Event) {
// Record the last known agent status.
if let Some(status) = agent_status_from_event(&event.msg) {
let mut guard = self.agent_status.write().await;
*guard = status;
self.agent_status.send_replace(status);
}
self.persist_rollout_items(&[RolloutItem::EventMsg(event.msg.clone())])
.await;
@@ -3494,7 +3492,7 @@ mod tests {
));
let agent_control = AgentControl::default();
let exec_policy = ExecPolicyManager::default();
let agent_status = Arc::new(RwLock::new(AgentStatus::PendingInit));
let (agent_status_tx, _agent_status_rx) = watch::channel(AgentStatus::PendingInit);
let model = ModelsManager::get_model_offline(config.model.as_deref());
let session_configuration = SessionConfiguration {
provider: config.model_provider.clone(),
@@ -3557,7 +3555,7 @@ mod tests {
let session = Session {
conversation_id,
tx_event,
agent_status: Arc::clone(&agent_status),
agent_status: agent_status_tx,
state: Mutex::new(state),
features: config.features.clone(),
active_turn: Mutex::new(None),
@@ -3588,7 +3586,7 @@ mod tests {
));
let agent_control = AgentControl::default();
let exec_policy = ExecPolicyManager::default();
let agent_status = Arc::new(RwLock::new(AgentStatus::PendingInit));
let (agent_status_tx, _agent_status_rx) = watch::channel(AgentStatus::PendingInit);
let model = ModelsManager::get_model_offline(config.model.as_deref());
let session_configuration = SessionConfiguration {
provider: config.model_provider.clone(),
@@ -3651,7 +3649,7 @@ mod tests {
let session = Arc::new(Session {
conversation_id,
tx_event,
agent_status: Arc::clone(&agent_status),
agent_status: agent_status_tx,
state: Mutex::new(state),
features: config.features.clone(),
active_turn: Mutex::new(None),

View File

@@ -87,7 +87,7 @@ pub(crate) async fn run_codex_thread_interactive(
next_id: AtomicU64::new(0),
tx_sub: tx_ops,
rx_event: rx_sub,
agent_status: Arc::clone(&codex.agent_status),
agent_status: codex.agent_status.clone(),
})
}
@@ -129,7 +129,7 @@ pub(crate) async fn run_codex_thread_one_shot(
// Bridge events so we can observe completion and shut down automatically.
let (tx_bridge, rx_bridge) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let ops_tx = io.tx_sub.clone();
let agent_status = Arc::clone(&io.agent_status);
let agent_status = io.agent_status.clone();
let io_for_bridge = io;
tokio::spawn(async move {
while let Ok(event) = io_for_bridge.next_event().await {
@@ -363,20 +363,23 @@ mod tests {
use super::*;
use async_channel::bounded;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AgentStatus;
use codex_protocol::protocol::RawResponseItemEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use pretty_assertions::assert_eq;
use tokio::sync::watch;
#[tokio::test]
async fn forward_events_cancelled_while_send_blocked_shuts_down_delegate() {
let (tx_events, rx_events) = bounded(1);
let (tx_sub, rx_sub) = bounded(SUBMISSION_CHANNEL_CAPACITY);
let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit);
let codex = Arc::new(Codex {
next_id: AtomicU64::new(0),
tx_sub,
rx_event: rx_events,
agent_status: Default::default(),
agent_status,
});
let (session, ctx, _rx_evt) = crate::codex::make_session_and_context_with_rx().await;

View File

@@ -5,6 +5,7 @@ use crate::protocol::Event;
use crate::protocol::Op;
use crate::protocol::Submission;
use std::path::PathBuf;
use tokio::sync::watch;
pub struct CodexThread {
codex: Codex,
@@ -38,6 +39,10 @@ impl CodexThread {
self.codex.agent_status().await
}
pub(crate) fn subscribe_status(&self) -> watch::Receiver<AgentStatus> {
self.codex.agent_status.clone()
}
pub fn rollout_path(&self) -> PathBuf {
self.rollout_path.clone()
}

View File

@@ -90,6 +90,7 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::AgentMessageContentDelta(_)
| EventMsg::ReasoningContentDelta(_)
| EventMsg::ReasoningRawContentDelta(_)
| EventMsg::SkillsUpdateAvailable => false,
| EventMsg::SkillsUpdateAvailable
| EventMsg::CollabInteraction(_) => false,
}
}

View File

@@ -56,6 +56,10 @@ pub(crate) struct ThreadManagerState {
models_manager: Arc<ModelsManager>,
skills_manager: Arc<SkillsManager>,
session_source: SessionSource,
#[cfg(any(test, feature = "test-support"))]
#[allow(dead_code)]
// Captures submitted ops for testing purpose.
ops_log: Arc<std::sync::Mutex<Vec<(ThreadId, Op)>>>,
}
impl ThreadManager {
@@ -74,6 +78,8 @@ impl ThreadManager {
skills_manager: Arc::new(SkillsManager::new(codex_home)),
auth_manager,
session_source,
#[cfg(any(test, feature = "test-support"))]
ops_log: Arc::new(std::sync::Mutex::new(Vec::new())),
}),
#[cfg(any(test, feature = "test-support"))]
_test_codex_home_guard: None,
@@ -111,6 +117,8 @@ impl ThreadManager {
skills_manager: Arc::new(SkillsManager::new(codex_home)),
auth_manager,
session_source: SessionSource::Exec,
#[cfg(any(test, feature = "test-support"))]
ops_log: Arc::new(std::sync::Mutex::new(Vec::new())),
}),
_test_codex_home_guard: None,
}
@@ -202,9 +210,19 @@ impl ThreadManager {
.await
}
fn agent_control(&self) -> AgentControl {
pub(crate) fn agent_control(&self) -> AgentControl {
AgentControl::new(Arc::downgrade(&self.state))
}
#[cfg(any(test, feature = "test-support"))]
#[allow(dead_code)]
pub(crate) fn captured_ops(&self) -> Vec<(ThreadId, Op)> {
self.state
.ops_log
.lock()
.map(|log| log.clone())
.unwrap_or_default()
}
}
impl ThreadManagerState {
@@ -217,7 +235,14 @@ impl ThreadManagerState {
}
pub(crate) async fn send_op(&self, thread_id: ThreadId, op: Op) -> CodexResult<String> {
self.get_thread(thread_id).await?.submit(op).await
let thread = self.get_thread(thread_id).await?;
#[cfg(any(test, feature = "test-support"))]
{
if let Ok(mut log) = self.ops_log.lock() {
log.push((thread_id, op.clone()));
}
}
thread.submit(op).await
}
#[allow(dead_code)] // Used by upcoming multi-agent tooling.

View File

@@ -1,3 +1,4 @@
use crate::agent::AgentStatus;
use crate::codex::TurnContext;
use crate::config::Config;
use crate::error::CodexErr;
@@ -10,30 +11,16 @@ use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
use async_trait::async_trait;
use codex_protocol::ThreadId;
use codex_protocol::protocol::CollabInteractionEvent;
use codex_protocol::protocol::EventMsg;
use serde::Deserialize;
use serde::Serialize;
pub struct CollabHandler;
pub(crate) const DEFAULT_WAIT_TIMEOUT_MS: i64 = 30_000;
pub(crate) const MAX_WAIT_TIMEOUT_MS: i64 = 300_000;
#[derive(Debug, Deserialize)]
struct SpawnAgentArgs {
message: String,
}
#[derive(Debug, Deserialize)]
struct SendInputArgs {
id: String,
message: String,
}
#[derive(Debug, Deserialize)]
struct WaitArgs {
id: String,
timeout_ms: Option<i64>,
}
#[derive(Debug, Deserialize)]
struct CloseAgentArgs {
id: String,
@@ -68,10 +55,10 @@ impl ToolHandler for CollabHandler {
};
match tool_name.as_str() {
"spawn_agent" => handle_spawn_agent(session, turn, arguments).await,
"send_input" => handle_send_input(session, arguments).await,
"wait" => handle_wait(arguments).await,
"close_agent" => handle_close_agent(arguments).await,
"spawn_agent" => spawn::handle(session, turn, arguments).await,
"send_input" => send_input::handle(session, turn, arguments).await,
"wait" => wait::handle(session, turn, arguments).await,
"close_agent" => close_agent::handle(session, turn, arguments).await,
other => Err(FunctionCallError::RespondToModel(format!(
"unsupported collab tool {other}"
))),
@@ -79,84 +66,309 @@ impl ToolHandler for CollabHandler {
}
}
async fn handle_spawn_agent(
session: std::sync::Arc<crate::codex::Session>,
turn: std::sync::Arc<TurnContext>,
arguments: String,
) -> Result<ToolOutput, FunctionCallError> {
let args: SpawnAgentArgs = parse_arguments(&arguments)?;
if args.message.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"Empty message can't be send to an agent".to_string(),
));
}
let config = build_agent_spawn_config(turn.as_ref())?;
let result = session
.services
.agent_control
.spawn_agent(config, args.message, true)
.await
.map_err(|err| FunctionCallError::Fatal(err.to_string()))?;
mod spawn {
use super::*;
use crate::codex::Session;
use std::sync::Arc;
Ok(ToolOutput::Function {
content: format!("agent_id: {result}"),
success: Some(true),
content_items: None,
})
#[derive(Debug, Deserialize)]
struct SpawnAgentArgs {
message: String,
}
pub async fn handle(
session: Arc<Session>,
turn: Arc<TurnContext>,
arguments: String,
) -> Result<ToolOutput, FunctionCallError> {
let args: SpawnAgentArgs = parse_arguments(&arguments)?;
if args.message.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"Empty message can't be send to an agent".to_string(),
));
}
let config = build_agent_spawn_config(turn.as_ref())?;
let result = session
.services
.agent_control
.spawn_agent(config, args.message.clone(), true)
.await
.map_err(|err| FunctionCallError::Fatal(err.to_string()))?;
emit_event(session, turn, args.message, result).await;
Ok(ToolOutput::Function {
content: format!("agent_id: {result}"),
success: Some(true),
content_items: None,
})
}
async fn emit_event(
session: Arc<Session>,
turn: Arc<TurnContext>,
prompt: String,
new_id: ThreadId,
) {
session
.send_event(
&turn,
EventMsg::CollabInteraction(CollabInteractionEvent::AgentSpawned {
sender_id: session.conversation_id,
new_id,
prompt,
}),
)
.await
}
}
async fn handle_send_input(
session: std::sync::Arc<crate::codex::Session>,
arguments: String,
) -> Result<ToolOutput, FunctionCallError> {
let args: SendInputArgs = parse_arguments(&arguments)?;
let agent_id = agent_id(&args.id)?;
if args.message.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"Empty message can't be send to an agent".to_string(),
));
mod send_input {
use super::*;
use crate::codex::Session;
use std::sync::Arc;
#[derive(Debug, Deserialize)]
struct SendInputArgs {
id: String,
message: String,
}
let content = session
.services
.agent_control
.send_prompt(agent_id, args.message)
.await
.map_err(|err| match err {
CodexErr::ThreadNotFound(id) => {
FunctionCallError::RespondToModel(format!("agent with id {id} not found"))
pub async fn handle(
session: Arc<Session>,
turn: Arc<TurnContext>,
arguments: String,
) -> Result<ToolOutput, FunctionCallError> {
let args: SendInputArgs = parse_arguments(&arguments)?;
let agent_id = agent_id(&args.id)?;
if args.message.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"Empty message can't be send to an agent".to_string(),
));
}
let content = session
.services
.agent_control
.send_prompt(agent_id, args.message.clone())
.await
.map_err(|err| match err {
CodexErr::ThreadNotFound(id) => {
FunctionCallError::RespondToModel(format!("agent with id {id} not found"))
}
err => FunctionCallError::Fatal(err.to_string()),
})?;
emit_event(session, turn, agent_id, args.message).await;
Ok(ToolOutput::Function {
content,
success: Some(true),
content_items: None,
})
}
async fn emit_event(
session: Arc<Session>,
turn: Arc<TurnContext>,
receiver_id: ThreadId,
prompt: String,
) {
session
.send_event(
&turn,
EventMsg::CollabInteraction(CollabInteractionEvent::AgentInteraction {
sender_id: session.conversation_id,
receiver_id,
prompt,
}),
)
.await
}
}
mod wait {
use super::*;
use crate::agent::status::is_final;
use crate::codex::Session;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use tokio::time::timeout_at;
#[derive(Debug, Deserialize)]
struct WaitArgs {
id: String,
timeout_ms: Option<i64>,
}
#[derive(Debug, Serialize)]
struct WaitResult {
status: AgentStatus,
timed_out: bool,
}
pub async fn handle(
session: Arc<Session>,
turn: Arc<TurnContext>,
arguments: String,
) -> Result<ToolOutput, FunctionCallError> {
let args: WaitArgs = parse_arguments(&arguments)?;
let agent_id = agent_id(&args.id)?;
// Validate timeout.
let timeout_ms = args.timeout_ms.unwrap_or(DEFAULT_WAIT_TIMEOUT_MS);
let timeout_ms = match timeout_ms {
ms if ms <= 0 => {
return Err(FunctionCallError::RespondToModel(
"timeout_ms must be greater than zero".to_owned(),
));
}
err => FunctionCallError::Fatal(err.to_string()),
ms => ms.min(MAX_WAIT_TIMEOUT_MS),
};
let mut status_rx = session
.services
.agent_control
.subscribe_status(agent_id)
.await
.map_err(|err| match err {
CodexErr::ThreadNotFound(id) => {
FunctionCallError::RespondToModel(format!("agent with id {id} not found"))
}
err => FunctionCallError::Fatal(err.to_string()),
})?;
let waiting_id = format!("collab-waiting-{}", uuid::Uuid::new_v4());
session
.send_event(
&turn,
EventMsg::CollabInteraction(CollabInteractionEvent::WaitingBegin {
sender_id: session.conversation_id,
receiver_id: agent_id,
waiting_id: waiting_id.clone(),
}),
)
.await;
// Get last known status.
let mut status = status_rx.borrow_and_update().clone();
let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64);
let timed_out = loop {
if is_final(&status) {
break false;
}
match timeout_at(deadline, status_rx.changed()).await {
Ok(Ok(())) => status = status_rx.borrow().clone(),
Ok(Err(_)) => {
let last_status = session.services.agent_control.get_status(agent_id).await;
if last_status != AgentStatus::NotFound {
// On-purpose we keep the last known status if the agent gets dropped. This
// event is not supposed to happen.
status = last_status;
}
break false;
}
Err(_) => break true,
}
};
session
.send_event(
&turn,
EventMsg::CollabInteraction(CollabInteractionEvent::WaitingEnd {
sender_id: session.conversation_id,
receiver_id: agent_id,
waiting_id,
status: status.clone(),
}),
)
.await;
if matches!(status, AgentStatus::NotFound) {
return Err(FunctionCallError::RespondToModel(format!(
"agent with id {agent_id} not found"
)));
}
let result = WaitResult { status, timed_out };
let content = serde_json::to_string(&result).map_err(|err| {
FunctionCallError::Fatal(format!("failed to serialize wait result: {err}"))
})?;
Ok(ToolOutput::Function {
content,
success: Some(true),
content_items: None,
})
}
async fn handle_wait(arguments: String) -> Result<ToolOutput, FunctionCallError> {
let args: WaitArgs = parse_arguments(&arguments)?;
let _agent_id = agent_id(&args.id)?;
let timeout_ms = args.timeout_ms.unwrap_or(DEFAULT_WAIT_TIMEOUT_MS);
if timeout_ms <= 0 {
return Err(FunctionCallError::RespondToModel(
"timeout_ms must be greater than zero".to_string(),
));
Ok(ToolOutput::Function {
content,
success: Some(!result.timed_out),
content_items: None,
})
}
let _timeout_ms = timeout_ms.min(MAX_WAIT_TIMEOUT_MS);
// TODO(jif): implement agent wait once lifecycle tracking is wired up.
Err(FunctionCallError::Fatal("wait not implemented".to_string()))
}
async fn handle_close_agent(arguments: String) -> Result<ToolOutput, FunctionCallError> {
let args: CloseAgentArgs = parse_arguments(&arguments)?;
let _agent_id = agent_id(&args.id)?;
// TODO(jif): implement agent shutdown and return the final status.
Err(FunctionCallError::Fatal(
"close_agent not implemented".to_string(),
))
pub mod close_agent {
use super::*;
use crate::codex::Session;
use std::sync::Arc;
#[derive(Debug, Deserialize, Serialize)]
pub(super) struct CloseAgentResult {
pub(super) status: AgentStatus,
}
pub async fn handle(
session: Arc<Session>,
turn: Arc<TurnContext>,
arguments: String,
) -> Result<ToolOutput, FunctionCallError> {
let args: CloseAgentArgs = parse_arguments(&arguments)?;
let agent_id = agent_id(&args.id)?;
let status = session.services.agent_control.get_status(agent_id).await;
if !matches!(status, AgentStatus::Shutdown) {
let _ = session
.services
.agent_control
.shutdown_agent(agent_id)
.await
.map_err(|err| match err {
CodexErr::ThreadNotFound(id) => {
FunctionCallError::RespondToModel(format!("agent with id {id} not found"))
}
err => FunctionCallError::Fatal(err.to_string()),
})?;
}
emit_event(session, turn, agent_id, status.clone()).await;
let content = serde_json::to_string(&CloseAgentResult { status }).map_err(|err| {
FunctionCallError::Fatal(format!("failed to serialize close_agent result: {err}"))
})?;
Ok(ToolOutput::Function {
content,
success: Some(true),
content_items: None,
})
}
async fn emit_event(
session: Arc<Session>,
turn: Arc<TurnContext>,
receiver_id: ThreadId,
status: AgentStatus,
) {
session
.send_event(
&turn,
EventMsg::CollabInteraction(CollabInteractionEvent::Close {
sender_id: session.conversation_id,
receiver_id,
status,
}),
)
.await
}
}
fn agent_id(id: &str) -> Result<ThreadId, FunctionCallError> {
@@ -192,3 +404,384 @@ fn build_agent_spawn_config(turn: &TurnContext) -> Result<Config, FunctionCallEr
})?;
Ok(config)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::CodexAuth;
use crate::ThreadManager;
use crate::built_in_model_providers;
use crate::codex::make_session_and_context;
use crate::config::types::ShellEnvironmentPolicy;
use crate::function_tool::FunctionCallError;
use crate::protocol::AskForApproval;
use crate::protocol::Op;
use crate::protocol::SandboxPolicy;
use crate::turn_diff_tracker::TurnDiffTracker;
use codex_protocol::ThreadId;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::timeout;
fn invocation(
session: Arc<crate::codex::Session>,
turn: Arc<TurnContext>,
tool_name: &str,
payload: ToolPayload,
) -> ToolInvocation {
ToolInvocation {
session,
turn,
tracker: Arc::new(Mutex::new(TurnDiffTracker::default())),
call_id: "call-1".to_string(),
tool_name: tool_name.to_string(),
payload,
}
}
fn function_payload(args: serde_json::Value) -> ToolPayload {
ToolPayload::Function {
arguments: args.to_string(),
}
}
fn thread_manager() -> ThreadManager {
ThreadManager::with_models_provider(
CodexAuth::from_api_key("dummy"),
built_in_model_providers()["openai"].clone(),
)
}
#[tokio::test]
async fn handler_rejects_non_function_payloads() {
let (session, turn) = make_session_and_context().await;
let invocation = invocation(
Arc::new(session),
Arc::new(turn),
"spawn_agent",
ToolPayload::Custom {
input: "hello".to_string(),
},
);
let Err(err) = CollabHandler.handle(invocation).await else {
panic!("payload should be rejected");
};
assert_eq!(
err,
FunctionCallError::RespondToModel(
"collab handler received unsupported payload".to_string()
)
);
}
#[tokio::test]
async fn handler_rejects_unknown_tool() {
let (session, turn) = make_session_and_context().await;
let invocation = invocation(
Arc::new(session),
Arc::new(turn),
"unknown_tool",
function_payload(json!({})),
);
let Err(err) = CollabHandler.handle(invocation).await else {
panic!("tool should be rejected");
};
assert_eq!(
err,
FunctionCallError::RespondToModel("unsupported collab tool unknown_tool".to_string())
);
}
#[tokio::test]
async fn spawn_agent_rejects_empty_message() {
let (session, turn) = make_session_and_context().await;
let invocation = invocation(
Arc::new(session),
Arc::new(turn),
"spawn_agent",
function_payload(json!({"message": " "})),
);
let Err(err) = CollabHandler.handle(invocation).await else {
panic!("empty message should be rejected");
};
assert_eq!(
err,
FunctionCallError::RespondToModel(
"Empty message can't be send to an agent".to_string()
)
);
}
#[tokio::test]
async fn spawn_agent_errors_when_manager_dropped() {
let (session, turn) = make_session_and_context().await;
let invocation = invocation(
Arc::new(session),
Arc::new(turn),
"spawn_agent",
function_payload(json!({"message": "hello"})),
);
let Err(err) = CollabHandler.handle(invocation).await else {
panic!("spawn should fail without a manager");
};
assert_eq!(
err,
FunctionCallError::Fatal("unsupported operation: thread manager dropped".to_string())
);
}
#[tokio::test]
async fn send_input_rejects_empty_message() {
let (session, turn) = make_session_and_context().await;
let invocation = invocation(
Arc::new(session),
Arc::new(turn),
"send_input",
function_payload(json!({"id": ThreadId::new().to_string(), "message": ""})),
);
let Err(err) = CollabHandler.handle(invocation).await else {
panic!("empty message should be rejected");
};
assert_eq!(
err,
FunctionCallError::RespondToModel(
"Empty message can't be send to an agent".to_string()
)
);
}
#[tokio::test]
async fn send_input_rejects_invalid_id() {
let (session, turn) = make_session_and_context().await;
let invocation = invocation(
Arc::new(session),
Arc::new(turn),
"send_input",
function_payload(json!({"id": "not-a-uuid", "message": "hi"})),
);
let Err(err) = CollabHandler.handle(invocation).await else {
panic!("invalid id should be rejected");
};
let FunctionCallError::RespondToModel(msg) = err else {
panic!("expected respond-to-model error");
};
assert!(msg.starts_with("invalid agent id not-a-uuid:"));
}
#[tokio::test]
async fn send_input_reports_missing_agent() {
let (mut session, turn) = make_session_and_context().await;
let manager = thread_manager();
session.services.agent_control = manager.agent_control();
let agent_id = ThreadId::new();
let invocation = invocation(
Arc::new(session),
Arc::new(turn),
"send_input",
function_payload(json!({"id": agent_id.to_string(), "message": "hi"})),
);
let Err(err) = CollabHandler.handle(invocation).await else {
panic!("missing agent should be reported");
};
assert_eq!(
err,
FunctionCallError::RespondToModel(format!("agent with id {agent_id} not found"))
);
}
#[tokio::test]
async fn wait_rejects_non_positive_timeout() {
let (session, turn) = make_session_and_context().await;
let invocation = invocation(
Arc::new(session),
Arc::new(turn),
"wait",
function_payload(json!({"id": ThreadId::new().to_string(), "timeout_ms": 0})),
);
let Err(err) = CollabHandler.handle(invocation).await else {
panic!("non-positive timeout should be rejected");
};
assert_eq!(
err,
FunctionCallError::RespondToModel("timeout_ms must be greater than zero".to_string())
);
}
#[tokio::test]
async fn wait_rejects_invalid_id() {
let (session, turn) = make_session_and_context().await;
let invocation = invocation(
Arc::new(session),
Arc::new(turn),
"wait",
function_payload(json!({"id": "invalid"})),
);
let Err(err) = CollabHandler.handle(invocation).await else {
panic!("invalid id should be rejected");
};
let FunctionCallError::RespondToModel(msg) = err else {
panic!("expected respond-to-model error");
};
assert!(msg.starts_with("invalid agent id invalid:"));
}
#[tokio::test]
async fn wait_times_out_when_status_is_not_final() {
let (mut session, turn) = make_session_and_context().await;
let manager = thread_manager();
session.services.agent_control = manager.agent_control();
let config = turn.client.config().as_ref().clone();
let thread = manager.start_thread(config).await.expect("start thread");
let agent_id = thread.thread_id;
let invocation = invocation(
Arc::new(session),
Arc::new(turn),
"wait",
function_payload(json!({"id": agent_id.to_string(), "timeout_ms": 10})),
);
let output = CollabHandler
.handle(invocation)
.await
.expect("wait should succeed");
let ToolOutput::Function {
content, success, ..
} = output
else {
panic!("expected function output");
};
assert_eq!(content, r#"{"status":"pending_init","timed_out":true}"#);
assert_eq!(success, Some(false));
let _ = thread
.thread
.submit(Op::Shutdown {})
.await
.expect("shutdown should submit");
}
#[tokio::test]
async fn wait_returns_final_status_without_timeout() {
let (mut session, turn) = make_session_and_context().await;
let manager = thread_manager();
session.services.agent_control = manager.agent_control();
let config = turn.client.config().as_ref().clone();
let thread = manager.start_thread(config).await.expect("start thread");
let agent_id = thread.thread_id;
let mut status_rx = manager
.agent_control()
.subscribe_status(agent_id)
.await
.expect("subscribe should succeed");
let _ = thread
.thread
.submit(Op::Shutdown {})
.await
.expect("shutdown should submit");
let _ = timeout(Duration::from_secs(1), status_rx.changed())
.await
.expect("shutdown status should arrive");
let invocation = invocation(
Arc::new(session),
Arc::new(turn),
"wait",
function_payload(json!({"id": agent_id.to_string(), "timeout_ms": 1000})),
);
let output = CollabHandler
.handle(invocation)
.await
.expect("wait should succeed");
let ToolOutput::Function {
content, success, ..
} = output
else {
panic!("expected function output");
};
assert_eq!(content, r#"{"status":"shutdown","timed_out":false}"#);
assert_eq!(success, Some(true));
}
#[tokio::test]
async fn close_agent_submits_shutdown_and_returns_status() {
let (mut session, turn) = make_session_and_context().await;
let manager = thread_manager();
session.services.agent_control = manager.agent_control();
let config = turn.client.config().as_ref().clone();
let thread = manager.start_thread(config).await.expect("start thread");
let agent_id = thread.thread_id;
let status_before = manager.agent_control().get_status(agent_id).await;
let invocation = invocation(
Arc::new(session),
Arc::new(turn),
"close_agent",
function_payload(json!({"id": agent_id.to_string()})),
);
let output = CollabHandler
.handle(invocation)
.await
.expect("close_agent should succeed");
let ToolOutput::Function {
content, success, ..
} = output
else {
panic!("expected function output");
};
let result: close_agent::CloseAgentResult =
serde_json::from_str(&content).expect("close_agent result should be json");
assert_eq!(result.status, status_before);
assert_eq!(success, Some(true));
let ops = manager.captured_ops();
let submitted_shutdown = ops
.iter()
.any(|(id, op)| *id == agent_id && matches!(op, Op::Shutdown));
assert_eq!(submitted_shutdown, true);
}
#[tokio::test]
async fn build_agent_spawn_config_uses_turn_context_values() {
let (_session, mut turn) = make_session_and_context().await;
turn.developer_instructions = Some("dev".to_string());
turn.base_instructions = Some("base".to_string());
turn.compact_prompt = Some("compact".to_string());
turn.user_instructions = Some("user".to_string());
turn.shell_environment_policy = ShellEnvironmentPolicy {
use_profile: true,
..ShellEnvironmentPolicy::default()
};
let temp_dir = tempfile::tempdir().expect("temp dir");
turn.cwd = temp_dir.path().to_path_buf();
turn.codex_linux_sandbox_exe = Some(PathBuf::from("/bin/echo"));
turn.approval_policy = AskForApproval::Never;
turn.sandbox_policy = SandboxPolicy::DangerFullAccess;
let config = build_agent_spawn_config(&turn).expect("spawn config");
let mut expected = (*turn.client.config()).clone();
expected.model = Some(turn.client.get_model());
expected.model_provider = turn.client.get_provider();
expected.model_reasoning_effort = turn.client.get_reasoning_effort();
expected.model_reasoning_summary = turn.client.get_reasoning_summary();
expected.developer_instructions = turn.developer_instructions.clone();
expected.base_instructions = turn.base_instructions.clone();
expected.compact_prompt = turn.compact_prompt.clone();
expected.user_instructions = turn.user_instructions.clone();
expected.shell_environment_policy = turn.shell_environment_policy.clone();
expected.codex_linux_sandbox_exe = turn.codex_linux_sandbox_exe.clone();
expected.cwd = turn.cwd.clone();
expected
.approval_policy
.set(turn.approval_policy)
.expect("approval policy set");
expected
.sandbox_policy
.set(turn.sandbox_policy)
.expect("sandbox policy set");
assert_eq!(config, expected);
}
}

View File

@@ -571,6 +571,9 @@ impl EventProcessor for EventProcessorWithHumanOutput {
EventMsg::ContextCompacted(_) => {
ts_msg!(self, "context compacted");
}
EventMsg::CollabInteraction(_) => {
// TODO(jif) handle collab tools.
}
EventMsg::ShutdownComplete => return CodexStatus::Shutdown,
EventMsg::WebSearchBegin(_)
| EventMsg::ExecApprovalRequest(_)

View File

@@ -306,6 +306,7 @@ async fn run_codex_tool_session_inner(
| EventMsg::ExitedReviewMode(_)
| EventMsg::ContextCompacted(_)
| EventMsg::ThreadRolledBack(_)
| EventMsg::CollabInteraction(_)
| EventMsg::DeprecationNotice(_) => {
// For now, we do not do anything extra for these
// events. Note that

View File

@@ -683,6 +683,9 @@ pub enum EventMsg {
AgentMessageContentDelta(AgentMessageContentDeltaEvent),
ReasoningContentDelta(ReasoningContentDeltaEvent),
ReasoningRawContentDelta(ReasoningRawContentDeltaEvent),
/// Collab interaction.
CollabInteraction(CollabInteractionEvent),
}
/// Agent lifecycle status, derived from emitted events.
@@ -699,7 +702,7 @@ pub enum AgentStatus {
Completed(Option<String>),
/// Agent encountered an error.
Errored(String),
/// Agent has been shutdowned.
/// Agent has been shutdown.
Shutdown,
/// Agent is not found.
NotFound,
@@ -1933,6 +1936,56 @@ pub enum TurnAbortReason {
ReviewEnded,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "snake_case")]
pub enum CollabInteractionEvent {
AgentSpawned {
/// Thread ID of the sender.
sender_id: ThreadId,
/// Thread ID of the newly spawned agent.
new_id: ThreadId,
/// Initial prompt sent to the agent. Can be empty to prevent CoT leaking at the
/// beginning.
prompt: String,
},
AgentInteraction {
/// Thread ID of the sender.
sender_id: ThreadId,
/// Thread ID of the receiver.
receiver_id: ThreadId,
/// Prompt sent from the sender to the receiver. Can be empty to prevent CoT
/// leaking at the beginning.
prompt: String,
},
WaitingBegin {
/// Thread ID of the sender.
sender_id: ThreadId,
/// Thread ID of the receiver.
receiver_id: ThreadId,
/// ID of the waiting call.
waiting_id: String,
},
WaitingEnd {
/// Thread ID of the sender.
sender_id: ThreadId,
/// Thread ID of the receiver.
receiver_id: ThreadId,
/// ID of the waiting call.
waiting_id: String,
/// Final status of the receiver agent reported to the sender agent.
status: AgentStatus,
},
Close {
/// Thread ID of the sender.
sender_id: ThreadId,
/// Thread ID of the receiver.
receiver_id: ThreadId,
/// Last known status of the receiver agent reported to the sender agent before
/// the close.
status: AgentStatus,
},
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -24,6 +24,7 @@ use codex_core::protocol::AgentReasoningRawContentDeltaEvent;
use codex_core::protocol::AgentReasoningRawContentEvent;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::CollabInteractionEvent;
use codex_core::protocol::CreditsSnapshot;
use codex_core::protocol::DeprecationNoticeEvent;
use codex_core::protocol::ErrorEvent;
@@ -102,6 +103,7 @@ use crate::bottom_pane::SelectionViewParams;
use crate::bottom_pane::custom_prompt_view::CustomPromptView;
use crate::bottom_pane::popup_consts::standard_popup_hint_line;
use crate::clipboard_paste::paste_image_to_temp_png;
use crate::collab_event_cell;
use crate::diff_render::display_path_for;
use crate::exec_cell::CommandOutput;
use crate::exec_cell::ExecCell;
@@ -1073,6 +1075,11 @@ impl ChatWidget {
self.set_status_header(message);
}
fn on_collab_interaction(&mut self, event: CollabInteractionEvent) {
self.add_to_history(collab_event_cell::new_collab_interaction(event));
self.request_redraw();
}
fn on_undo_started(&mut self, event: UndoStartedEvent) {
self.bottom_pane.ensure_status_indicator();
self.bottom_pane.set_interrupt_hint_visible(false);
@@ -2182,6 +2189,7 @@ impl ChatWidget {
}
EventMsg::ExitedReviewMode(review) => self.on_exited_review_mode(review),
EventMsg::ContextCompacted(_) => self.on_agent_message("Context compacted".to_owned()),
EventMsg::CollabInteraction(event) => self.on_collab_interaction(event),
EventMsg::ThreadRolledBack(_) => {}
EventMsg::RawResponseItem(_)
| EventMsg::ItemStarted(_)

View File

@@ -0,0 +1,140 @@
use codex_core::protocol::AgentStatus;
use codex_core::protocol::CollabInteractionEvent;
use ratatui::style::Stylize;
use ratatui::text::Line;
use crate::history_cell::HistoryCell;
use crate::text_formatting::truncate_text;
use crate::wrapping::RtOptions;
use crate::wrapping::word_wrap_lines;
const COLLAB_PROMPT_MAX_GRAPHEMES: usize = 120;
#[derive(Debug)]
pub(crate) struct CollabInteractionCell {
summary: Line<'static>,
detail: Option<Line<'static>>,
}
impl CollabInteractionCell {
fn new(summary: Line<'static>, detail: Option<Line<'static>>) -> Self {
Self { summary, detail }
}
}
impl HistoryCell for CollabInteractionCell {
fn display_lines(&self, width: u16) -> Vec<Line<'static>> {
let wrap_width = width.max(1) as usize;
let mut lines = word_wrap_lines(
std::iter::once(self.summary.clone()),
RtOptions::new(wrap_width)
.initial_indent("".dim().into())
.subsequent_indent(" ".into()),
);
if let Some(detail) = &self.detail {
let detail_lines = word_wrap_lines(
std::iter::once(detail.clone()),
RtOptions::new(wrap_width)
.initial_indent("".dim().into())
.subsequent_indent(" ".into()),
);
lines.extend(detail_lines);
}
lines
}
}
fn collab_status_label(status: &AgentStatus) -> String {
match status {
AgentStatus::PendingInit => "pending init".to_string(),
AgentStatus::Running => "running".to_string(),
AgentStatus::Completed(message) => format!("completed: {message:?}"),
AgentStatus::Errored(_) => "errored".to_string(),
AgentStatus::Shutdown => "shutdown".to_string(),
AgentStatus::NotFound => "not found".to_string(),
}
}
fn collab_detail_line(label: &str, message: &str) -> Option<Line<'static>> {
let trimmed = message.trim();
if trimmed.is_empty() {
return None;
}
let collapsed = trimmed
.lines()
.map(str::trim)
.filter(|line| !line.is_empty())
.collect::<Vec<_>>()
.join(" ");
let truncated = truncate_text(&collapsed, COLLAB_PROMPT_MAX_GRAPHEMES);
let label = format!("{label}: ");
Some(Line::from(vec![label.dim(), truncated.into()]))
}
pub(crate) fn new_collab_interaction(event: CollabInteractionEvent) -> CollabInteractionCell {
let (summary, detail) = match event {
CollabInteractionEvent::AgentSpawned { new_id, prompt, .. } => {
let summary = Line::from(vec![
"Spawned agent".bold(),
" ".into(),
new_id.to_string().dim(),
]);
let detail = collab_detail_line("Prompt", &prompt);
(summary, detail)
}
CollabInteractionEvent::AgentInteraction {
receiver_id,
prompt,
..
} => {
let summary = Line::from(vec![
"Sent to agent".bold(),
" ".into(),
receiver_id.to_string().dim(),
]);
let detail = collab_detail_line("Message", &prompt);
(summary, detail)
}
CollabInteractionEvent::WaitingBegin { receiver_id, .. } => {
let summary = Line::from(vec![
"Waiting on agent".bold(),
" ".into(),
receiver_id.to_string().dim(),
]);
(summary, None)
}
CollabInteractionEvent::WaitingEnd {
receiver_id,
status,
..
} => {
let summary = Line::from(vec![
"Wait ended for agent".bold(),
" ".into(),
receiver_id.to_string().dim(),
" · ".dim(),
collab_status_label(&status).dim(),
]);
(summary, None)
}
CollabInteractionEvent::Close {
receiver_id,
status,
..
} => {
let summary = Line::from(vec![
"Closed agent".bold(),
" ".into(),
receiver_id.to_string().dim(),
" · ".dim(),
collab_status_label(&status).dim(),
]);
(summary, None)
}
};
CollabInteractionCell::new(summary, detail)
}

View File

@@ -43,6 +43,7 @@ mod bottom_pane;
mod chatwidget;
mod cli;
mod clipboard_paste;
mod collab_event_cell;
mod color;
pub mod custom_terminal;
mod diff_render;

View File

@@ -1988,6 +1988,9 @@ impl ChatWidget {
}
EventMsg::ExitedReviewMode(review) => self.on_exited_review_mode(review),
EventMsg::ContextCompacted(_) => self.on_agent_message("Context compacted".to_owned()),
EventMsg::CollabInteraction(_) => {
// TODO(jif) handle collab tools.
}
EventMsg::RawResponseItem(_)
| EventMsg::ThreadRolledBack(_)
| EventMsg::ItemStarted(_)