Compare commits

...

2 Commits

Author SHA1 Message Date
jif-oai
651d6852a2 feat: basic collab rendering 2026-01-12 15:27:59 +00:00
jif-oai
e3cf74885a feat: emit events around collab tools 2026-01-12 14:53:09 +00:00
10 changed files with 307 additions and 19 deletions

View File

@@ -354,7 +354,7 @@ impl Codex {
///
/// A session has at most 1 running task at a time, and can be interrupted by user input.
pub(crate) struct Session {
conversation_id: ThreadId,
pub(crate) conversation_id: ThreadId,
tx_event: Sender<Event>,
agent_status: watch::Sender<AgentStatus>,
state: Mutex<SessionState>,

View File

@@ -90,6 +90,7 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::AgentMessageContentDelta(_)
| EventMsg::ReasoningContentDelta(_)
| EventMsg::ReasoningRawContentDelta(_)
| EventMsg::SkillsUpdateAvailable => false,
| EventMsg::SkillsUpdateAvailable
| EventMsg::CollabInteraction(_) => false,
}
}

View File

@@ -11,6 +11,8 @@ use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
use async_trait::async_trait;
use codex_protocol::ThreadId;
use codex_protocol::protocol::CollabInteractionEvent;
use codex_protocol::protocol::EventMsg;
use serde::Deserialize;
use serde::Serialize;
@@ -54,9 +56,9 @@ impl ToolHandler for CollabHandler {
match tool_name.as_str() {
"spawn_agent" => spawn::handle(session, turn, arguments).await,
"send_input" => send_input::handle(session, arguments).await,
"wait" => wait::handle(session, arguments).await,
"close_agent" => close_agent::handle(session, arguments).await,
"send_input" => send_input::handle(session, turn, arguments).await,
"wait" => wait::handle(session, turn, arguments).await,
"close_agent" => close_agent::handle(session, turn, arguments).await,
other => Err(FunctionCallError::RespondToModel(format!(
"unsupported collab tool {other}"
))),
@@ -89,16 +91,36 @@ mod spawn {
let result = session
.services
.agent_control
.spawn_agent(config, args.message, true)
.spawn_agent(config, args.message.clone(), true)
.await
.map_err(|err| FunctionCallError::Fatal(err.to_string()))?;
emit_event(session, turn, args.message, result).await;
Ok(ToolOutput::Function {
content: format!("agent_id: {result}"),
success: Some(true),
content_items: None,
})
}
async fn emit_event(
session: Arc<Session>,
turn: Arc<TurnContext>,
prompt: String,
new_id: ThreadId,
) {
session
.send_event(
&turn,
EventMsg::CollabInteraction(CollabInteractionEvent::AgentSpawned {
sender_id: session.conversation_id,
new_id,
prompt,
}),
)
.await
}
}
mod send_input {
@@ -114,6 +136,7 @@ mod send_input {
pub async fn handle(
session: Arc<Session>,
turn: Arc<TurnContext>,
arguments: String,
) -> Result<ToolOutput, FunctionCallError> {
let args: SendInputArgs = parse_arguments(&arguments)?;
@@ -126,7 +149,7 @@ mod send_input {
let content = session
.services
.agent_control
.send_prompt(agent_id, args.message)
.send_prompt(agent_id, args.message.clone())
.await
.map_err(|err| match err {
CodexErr::ThreadNotFound(id) => {
@@ -135,12 +158,32 @@ mod send_input {
err => FunctionCallError::Fatal(err.to_string()),
})?;
emit_event(session, turn, agent_id, args.message).await;
Ok(ToolOutput::Function {
content,
success: Some(true),
content_items: None,
})
}
async fn emit_event(
session: Arc<Session>,
turn: Arc<TurnContext>,
receiver_id: ThreadId,
prompt: String,
) {
session
.send_event(
&turn,
EventMsg::CollabInteraction(CollabInteractionEvent::AgentInteraction {
sender_id: session.conversation_id,
receiver_id,
prompt,
}),
)
.await
}
}
mod wait {
@@ -166,6 +209,7 @@ mod wait {
pub async fn handle(
session: Arc<Session>,
turn: Arc<TurnContext>,
arguments: String,
) -> Result<ToolOutput, FunctionCallError> {
let args: WaitArgs = parse_arguments(&arguments)?;
@@ -194,6 +238,18 @@ mod wait {
err => FunctionCallError::Fatal(err.to_string()),
})?;
let waiting_id = format!("collab-waiting-{}", uuid::Uuid::new_v4());
session
.send_event(
&turn,
EventMsg::CollabInteraction(CollabInteractionEvent::WaitingBegin {
sender_id: session.conversation_id,
receiver_id: agent_id,
waiting_id: waiting_id.clone(),
}),
)
.await;
// Get last known status.
let mut status = status_rx.borrow_and_update().clone();
let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64);
@@ -218,6 +274,18 @@ mod wait {
}
};
session
.send_event(
&turn,
EventMsg::CollabInteraction(CollabInteractionEvent::WaitingEnd {
sender_id: session.conversation_id,
receiver_id: agent_id,
waiting_id,
status: status.clone(),
}),
)
.await;
if matches!(status, AgentStatus::NotFound) {
return Err(FunctionCallError::RespondToModel(format!(
"agent with id {agent_id} not found"
@@ -250,22 +318,12 @@ pub mod close_agent {
pub async fn handle(
session: Arc<Session>,
turn: Arc<TurnContext>,
arguments: String,
) -> Result<ToolOutput, FunctionCallError> {
let args: CloseAgentArgs = parse_arguments(&arguments)?;
let agent_id = agent_id(&args.id)?;
let mut status_rx = session
.services
.agent_control
.subscribe_status(agent_id)
.await
.map_err(|err| match err {
CodexErr::ThreadNotFound(id) => {
FunctionCallError::RespondToModel(format!("agent with id {id} not found"))
}
err => FunctionCallError::Fatal(err.to_string()),
})?;
let status = status_rx.borrow_and_update().clone();
let status = session.services.agent_control.get_status(agent_id).await;
if !matches!(status, AgentStatus::Shutdown) {
let _ = session
@@ -281,6 +339,8 @@ pub mod close_agent {
})?;
}
emit_event(session, turn, agent_id, status.clone()).await;
let content = serde_json::to_string(&CloseAgentResult { status }).map_err(|err| {
FunctionCallError::Fatal(format!("failed to serialize close_agent result: {err}"))
})?;
@@ -291,6 +351,24 @@ pub mod close_agent {
content_items: None,
})
}
async fn emit_event(
session: Arc<Session>,
turn: Arc<TurnContext>,
receiver_id: ThreadId,
status: AgentStatus,
) {
session
.send_event(
&turn,
EventMsg::CollabInteraction(CollabInteractionEvent::Close {
sender_id: session.conversation_id,
receiver_id,
status,
}),
)
.await
}
}
fn agent_id(id: &str) -> Result<ThreadId, FunctionCallError> {

View File

@@ -571,6 +571,9 @@ impl EventProcessor for EventProcessorWithHumanOutput {
EventMsg::ContextCompacted(_) => {
ts_msg!(self, "context compacted");
}
EventMsg::CollabInteraction(_) => {
// TODO(jif) handle collab tools.
}
EventMsg::ShutdownComplete => return CodexStatus::Shutdown,
EventMsg::WebSearchBegin(_)
| EventMsg::ExecApprovalRequest(_)

View File

@@ -306,6 +306,7 @@ async fn run_codex_tool_session_inner(
| EventMsg::ExitedReviewMode(_)
| EventMsg::ContextCompacted(_)
| EventMsg::ThreadRolledBack(_)
| EventMsg::CollabInteraction(_)
| EventMsg::DeprecationNotice(_) => {
// For now, we do not do anything extra for these
// events. Note that

View File

@@ -683,6 +683,9 @@ pub enum EventMsg {
AgentMessageContentDelta(AgentMessageContentDeltaEvent),
ReasoningContentDelta(ReasoningContentDeltaEvent),
ReasoningRawContentDelta(ReasoningRawContentDeltaEvent),
/// Collab interaction.
CollabInteraction(CollabInteractionEvent),
}
/// Agent lifecycle status, derived from emitted events.
@@ -1933,6 +1936,56 @@ pub enum TurnAbortReason {
ReviewEnded,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "snake_case")]
pub enum CollabInteractionEvent {
AgentSpawned {
/// Thread ID of the sender.
sender_id: ThreadId,
/// Thread ID of the newly spawned agent.
new_id: ThreadId,
/// Initial prompt sent to the agent. Can be empty to prevent CoT leaking at the
/// beginning.
prompt: String,
},
AgentInteraction {
/// Thread ID of the sender.
sender_id: ThreadId,
/// Thread ID of the receiver.
receiver_id: ThreadId,
/// Prompt sent from the sender to the receiver. Can be empty to prevent CoT
/// leaking at the beginning.
prompt: String,
},
WaitingBegin {
/// Thread ID of the sender.
sender_id: ThreadId,
/// Thread ID of the receiver.
receiver_id: ThreadId,
/// ID of the waiting call.
waiting_id: String,
},
WaitingEnd {
/// Thread ID of the sender.
sender_id: ThreadId,
/// Thread ID of the receiver.
receiver_id: ThreadId,
/// ID of the waiting call.
waiting_id: String,
/// Final status of the receiver agent reported to the sender agent.
status: AgentStatus,
},
Close {
/// Thread ID of the sender.
sender_id: ThreadId,
/// Thread ID of the receiver.
receiver_id: ThreadId,
/// Last known status of the receiver agent reported to the sender agent before
/// the close.
status: AgentStatus,
},
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -24,6 +24,7 @@ use codex_core::protocol::AgentReasoningRawContentDeltaEvent;
use codex_core::protocol::AgentReasoningRawContentEvent;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::CollabInteractionEvent;
use codex_core::protocol::CreditsSnapshot;
use codex_core::protocol::DeprecationNoticeEvent;
use codex_core::protocol::ErrorEvent;
@@ -102,6 +103,7 @@ use crate::bottom_pane::SelectionViewParams;
use crate::bottom_pane::custom_prompt_view::CustomPromptView;
use crate::bottom_pane::popup_consts::standard_popup_hint_line;
use crate::clipboard_paste::paste_image_to_temp_png;
use crate::collab_event_cell;
use crate::diff_render::display_path_for;
use crate::exec_cell::CommandOutput;
use crate::exec_cell::ExecCell;
@@ -1073,6 +1075,11 @@ impl ChatWidget {
self.set_status_header(message);
}
fn on_collab_interaction(&mut self, event: CollabInteractionEvent) {
self.add_to_history(collab_event_cell::new_collab_interaction(event));
self.request_redraw();
}
fn on_undo_started(&mut self, event: UndoStartedEvent) {
self.bottom_pane.ensure_status_indicator();
self.bottom_pane.set_interrupt_hint_visible(false);
@@ -2182,6 +2189,7 @@ impl ChatWidget {
}
EventMsg::ExitedReviewMode(review) => self.on_exited_review_mode(review),
EventMsg::ContextCompacted(_) => self.on_agent_message("Context compacted".to_owned()),
EventMsg::CollabInteraction(event) => self.on_collab_interaction(event),
EventMsg::ThreadRolledBack(_) => {}
EventMsg::RawResponseItem(_)
| EventMsg::ItemStarted(_)

View File

@@ -0,0 +1,140 @@
use codex_core::protocol::AgentStatus;
use codex_core::protocol::CollabInteractionEvent;
use ratatui::style::Stylize;
use ratatui::text::Line;
use crate::history_cell::HistoryCell;
use crate::text_formatting::truncate_text;
use crate::wrapping::RtOptions;
use crate::wrapping::word_wrap_lines;
const COLLAB_PROMPT_MAX_GRAPHEMES: usize = 120;
#[derive(Debug)]
pub(crate) struct CollabInteractionCell {
summary: Line<'static>,
detail: Option<Line<'static>>,
}
impl CollabInteractionCell {
fn new(summary: Line<'static>, detail: Option<Line<'static>>) -> Self {
Self { summary, detail }
}
}
impl HistoryCell for CollabInteractionCell {
fn display_lines(&self, width: u16) -> Vec<Line<'static>> {
let wrap_width = width.max(1) as usize;
let mut lines = word_wrap_lines(
std::iter::once(self.summary.clone()),
RtOptions::new(wrap_width)
.initial_indent("".dim().into())
.subsequent_indent(" ".into()),
);
if let Some(detail) = &self.detail {
let detail_lines = word_wrap_lines(
std::iter::once(detail.clone()),
RtOptions::new(wrap_width)
.initial_indent("".dim().into())
.subsequent_indent(" ".into()),
);
lines.extend(detail_lines);
}
lines
}
}
fn collab_status_label(status: &AgentStatus) -> String {
match status {
AgentStatus::PendingInit => "pending init".to_string(),
AgentStatus::Running => "running".to_string(),
AgentStatus::Completed(message) => format!("completed: {message:?}"),
AgentStatus::Errored(_) => "errored".to_string(),
AgentStatus::Shutdown => "shutdown".to_string(),
AgentStatus::NotFound => "not found".to_string(),
}
}
fn collab_detail_line(label: &str, message: &str) -> Option<Line<'static>> {
let trimmed = message.trim();
if trimmed.is_empty() {
return None;
}
let collapsed = trimmed
.lines()
.map(str::trim)
.filter(|line| !line.is_empty())
.collect::<Vec<_>>()
.join(" ");
let truncated = truncate_text(&collapsed, COLLAB_PROMPT_MAX_GRAPHEMES);
let label = format!("{label}: ");
Some(Line::from(vec![label.dim(), truncated.into()]))
}
pub(crate) fn new_collab_interaction(event: CollabInteractionEvent) -> CollabInteractionCell {
let (summary, detail) = match event {
CollabInteractionEvent::AgentSpawned { new_id, prompt, .. } => {
let summary = Line::from(vec![
"Spawned agent".bold(),
" ".into(),
new_id.to_string().dim(),
]);
let detail = collab_detail_line("Prompt", &prompt);
(summary, detail)
}
CollabInteractionEvent::AgentInteraction {
receiver_id,
prompt,
..
} => {
let summary = Line::from(vec![
"Sent to agent".bold(),
" ".into(),
receiver_id.to_string().dim(),
]);
let detail = collab_detail_line("Message", &prompt);
(summary, detail)
}
CollabInteractionEvent::WaitingBegin { receiver_id, .. } => {
let summary = Line::from(vec![
"Waiting on agent".bold(),
" ".into(),
receiver_id.to_string().dim(),
]);
(summary, None)
}
CollabInteractionEvent::WaitingEnd {
receiver_id,
status,
..
} => {
let summary = Line::from(vec![
"Wait ended for agent".bold(),
" ".into(),
receiver_id.to_string().dim(),
" · ".dim(),
collab_status_label(&status).dim(),
]);
(summary, None)
}
CollabInteractionEvent::Close {
receiver_id,
status,
..
} => {
let summary = Line::from(vec![
"Closed agent".bold(),
" ".into(),
receiver_id.to_string().dim(),
" · ".dim(),
collab_status_label(&status).dim(),
]);
(summary, None)
}
};
CollabInteractionCell::new(summary, detail)
}

View File

@@ -43,6 +43,7 @@ mod bottom_pane;
mod chatwidget;
mod cli;
mod clipboard_paste;
mod collab_event_cell;
mod color;
pub mod custom_terminal;
mod diff_render;

View File

@@ -1988,6 +1988,9 @@ impl ChatWidget {
}
EventMsg::ExitedReviewMode(review) => self.on_exited_review_mode(review),
EventMsg::ContextCompacted(_) => self.on_agent_message("Context compacted".to_owned()),
EventMsg::CollabInteraction(_) => {
// TODO(jif) handle collab tools.
}
EventMsg::RawResponseItem(_)
| EventMsg::ThreadRolledBack(_)
| EventMsg::ItemStarted(_)