diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index 1efd514f62..9157228df3 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -5,10 +5,8 @@ use crate::agent::role::DEFAULT_ROLE_NAME; use crate::agent::role::resolve_role_config; use crate::agent::status::is_final; use crate::codex_thread::ThreadConfigSnapshot; -use crate::context_manager::is_user_turn_boundary; use crate::error::CodexErr; use crate::error::Result as CodexResult; -use crate::event_mapping::parse_turn_item; use crate::find_archived_thread_path_by_id_str; use crate::find_thread_path_by_id_str; use crate::rollout::RolloutRecorder; @@ -20,10 +18,7 @@ use crate::thread_manager::ThreadManagerState; use codex_features::Feature; use codex_protocol::AgentPath; use codex_protocol::ThreadId; -use codex_protocol::items::TurnItem; -use codex_protocol::models::ContentItem; use codex_protocol::models::FunctionCallOutputPayload; -use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::InterAgentCommunication; @@ -118,30 +113,36 @@ impl AgentControl { pub(crate) async fn spawn_agent( &self, config: crate::config::Config, - items: Vec, + initial_operation: Op, session_source: Option, ) -> CodexResult { Ok(self - .spawn_agent_internal(config, items, session_source, SpawnAgentOptions::default()) + .spawn_agent_internal( + config, + initial_operation, + session_source, + SpawnAgentOptions::default(), + ) .await? .thread_id) } + /// Spawn an agent thread with some metadata. pub(crate) async fn spawn_agent_with_metadata( &self, config: crate::config::Config, - items: Vec, + initial_operation: Op, session_source: Option, - options: SpawnAgentOptions, + options: SpawnAgentOptions, // TODO(jif) drop with new fork. ) -> CodexResult { - self.spawn_agent_internal(config, items, session_source, options) + self.spawn_agent_internal(config, initial_operation, session_source, options) .await } async fn spawn_agent_internal( &self, config: crate::config::Config, - items: Vec, + initial_operation: Op, session_source: Option, options: SpawnAgentOptions, ) -> CodexResult { @@ -270,7 +271,8 @@ impl AgentControl { ) .await; - self.send_input(new_thread.thread_id, items).await?; + self.send_input(new_thread.thread_id, initial_operation) + .await?; let child_reference = agent_metadata .agent_path .as_ref() @@ -468,23 +470,15 @@ impl AgentControl { pub(crate) async fn send_input( &self, agent_id: ThreadId, - items: Vec, + initial_operation: Op, ) -> CodexResult { - let last_task_message = render_input_preview(&items); + let last_task_message = render_input_preview(&initial_operation); let state = self.upgrade()?; let result = self .handle_thread_request_result( agent_id, &state, - state - .send_op( - agent_id, - Op::UserInput { - items, - final_output_json_schema: None, - }, - ) - .await, + state.send_op(agent_id, initial_operation).await, ) .await; if result.is_ok() { @@ -766,10 +760,7 @@ impl AgentControl { .as_ref() .map(ToString::to_string) .unwrap_or_else(|| thread_id.to_string()); - let last_task_message = match metadata.last_task_message.clone() { - Some(last_task_message) => Some(last_task_message), - None => last_task_message_for_thread(thread.as_ref()).await, - }; + let last_task_message = metadata.last_task_message.clone(); agents.push(ListedAgent { agent_name, agent_status: thread.agent_status().await, @@ -1072,79 +1063,23 @@ fn agent_matches_prefix(agent_path: Option<&AgentPath>, prefix: &AgentPath) -> b }) } -async fn last_task_message_for_thread(thread: &crate::CodexThread) -> Option { - let pending_input = thread.codex.session.pending_input_snapshot().await; - if let Some(message) = pending_input - .iter() - .rev() - .find_map(last_task_message_from_input_item) - { - return Some(message); +pub(crate) fn render_input_preview(initial_operation: &Op) -> String { + match initial_operation { + Op::UserInput { items, .. } => items + .iter() + .map(|item| match item { + UserInput::Text { text, .. } => text.clone(), + UserInput::Image { .. } => "[image]".to_string(), + UserInput::LocalImage { path } => format!("[local_image:{}]", path.display()), + UserInput::Skill { name, path } => format!("[skill:${name}]({})", path.display()), + UserInput::Mention { name, path } => format!("[mention:${name}]({path})"), + _ => "[input]".to_string(), + }) + .collect::>() + .join("\n"), + Op::InterAgentCommunication { communication } => communication.content.clone(), + _ => String::new(), } - - let queued_input = thread - .codex - .session - .queued_response_items_for_next_turn_snapshot() - .await; - if let Some(message) = queued_input - .iter() - .rev() - .find_map(last_task_message_from_input_item) - { - return Some(message); - } - - let history = thread.codex.session.clone_history().await; - history - .raw_items() - .iter() - .rev() - .find_map(last_task_message_from_item) -} - -fn last_task_message_from_input_item(item: &ResponseInputItem) -> Option { - let response_item: ResponseItem = item.clone().into(); - last_task_message_from_item(&response_item) -} - -fn last_task_message_from_item(item: &ResponseItem) -> Option { - if !is_user_turn_boundary(item) { - return None; - } - - match item { - ResponseItem::Message { role, .. } if role == "user" => { - let Some(TurnItem::UserMessage(message)) = parse_turn_item(item) else { - return None; - }; - Some(render_input_preview(&message.content)) - } - ResponseItem::Message { content, .. } => match content.as_slice() { - [ContentItem::InputText { text }] | [ContentItem::OutputText { text }] => { - serde_json::from_str::(text) - .ok() - .map(|communication| communication.content) - } - _ => None, - }, - _ => None, - } -} - -fn render_input_preview(items: &[UserInput]) -> String { - items - .iter() - .map(|item| match item { - UserInput::Text { text, .. } => text.clone(), - UserInput::Image { .. } => "[image]".to_string(), - UserInput::LocalImage { path } => format!("[local_image:{}]", path.display()), - UserInput::Skill { name, path } => format!("[skill:${name}]({})", path.display()), - UserInput::Mention { name, path } => format!("[mention:${name}]({path})"), - _ => "[input]".to_string(), - }) - .collect::>() - .join("\n") } fn thread_spawn_depth(session_source: &SessionSource) -> Option { diff --git a/codex-rs/core/src/agent/control_tests.rs b/codex-rs/core/src/agent/control_tests.rs index 878d7d0991..41a01c5ff7 100644 --- a/codex-rs/core/src/agent/control_tests.rs +++ b/codex-rs/core/src/agent/control_tests.rs @@ -54,11 +54,12 @@ async fn test_config() -> (TempDir, Config) { test_config_with_cli_overrides(Vec::new()).await } -fn text_input(text: &str) -> Vec { +fn text_input(text: &str) -> Op { vec![UserInput::Text { text: text.to_string(), text_elements: Vec::new(), }] + .into() } struct AgentControlHarness { @@ -217,7 +218,8 @@ async fn send_input_errors_when_manager_dropped() { vec![UserInput::Text { text: "hello".to_string(), text_elements: Vec::new(), - }], + }] + .into(), ) .await .expect_err("send_input should fail without a manager"); @@ -321,7 +323,8 @@ async fn send_input_errors_when_thread_missing() { vec![UserInput::Text { text: "hello".to_string(), text_elements: Vec::new(), - }], + }] + .into(), ) .await .expect_err("send_input should fail for missing thread"); @@ -387,7 +390,8 @@ async fn send_input_submits_user_message() { vec![UserInput::Text { text: "hello from tests".to_string(), text_elements: Vec::new(), - }], + }] + .into(), ) .await .expect("send_input should succeed"); diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 54f5c93f2d..9400af7219 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -3980,17 +3980,6 @@ impl Session { } } - pub(crate) async fn pending_input_snapshot(&self) -> Vec { - let active = self.active_turn.lock().await; - match active.as_ref() { - Some(at) => { - let ts = at.turn_state.lock().await; - ts.pending_input_snapshot() - } - None => Vec::with_capacity(0), - } - } - /// Queue response items to be injected into the next active turn created for this session. pub(crate) async fn queue_response_items_for_next_turn(&self, items: Vec) { if items.is_empty() { @@ -4005,12 +3994,6 @@ impl Session { std::mem::take(&mut *self.idle_pending_input.lock().await) } - pub(crate) async fn queued_response_items_for_next_turn_snapshot( - &self, - ) -> Vec { - self.idle_pending_input.lock().await.clone() - } - pub(crate) async fn has_queued_response_items_for_next_turn(&self) -> bool { !self.idle_pending_input.lock().await.is_empty() } diff --git a/codex-rs/core/src/memories/phase2.rs b/codex-rs/core/src/memories/phase2.rs index 321b314abb..78f762ed0e 100644 --- a/codex-rs/core/src/memories/phase2.rs +++ b/codex-rs/core/src/memories/phase2.rs @@ -132,7 +132,7 @@ pub(super) async fn run(session: &Arc, config: Arc) { let thread_id = match session .services .agent_control - .spawn_agent(agent_config, prompt, Some(source)) + .spawn_agent(agent_config, prompt.into(), Some(source)) .await { Ok(thread_id) => thread_id, diff --git a/codex-rs/core/src/state/turn.rs b/codex-rs/core/src/state/turn.rs index ebfa5a8bb8..07d040ca21 100644 --- a/codex-rs/core/src/state/turn.rs +++ b/codex-rs/core/src/state/turn.rs @@ -198,10 +198,6 @@ impl TurnState { } } - pub(crate) fn pending_input_snapshot(&self) -> Vec { - self.pending_input.clone() - } - pub(crate) fn has_pending_input(&self) -> bool { !self.pending_input.is_empty() } diff --git a/codex-rs/core/src/tools/handlers/agent_jobs.rs b/codex-rs/core/src/tools/handlers/agent_jobs.rs index 639b21d067..c607730b8f 100644 --- a/codex-rs/core/src/tools/handlers/agent_jobs.rs +++ b/codex-rs/core/src/tools/handlers/agent_jobs.rs @@ -633,7 +633,7 @@ async fn run_agent_job_loop( .agent_control .spawn_agent( options.spawn_config.clone(), - items, + items.into(), Some(SessionSource::SubAgent(SubAgentSource::Other(format!( "agent_job:{job_id}" )))), diff --git a/codex-rs/core/src/tools/handlers/multi_agents/send_input.rs b/codex-rs/core/src/tools/handlers/multi_agents/send_input.rs index 9a80a43a22..3c8527712b 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents/send_input.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents/send_input.rs @@ -1,4 +1,5 @@ use super::*; +use crate::agent::control::render_input_preview; pub(crate) struct Handler; @@ -26,7 +27,7 @@ impl ToolHandler for Handler { let args: SendInputArgs = parse_arguments(&arguments)?; let receiver_thread_id = parse_agent_id_target(&args.target)?; let input_items = parse_collab_input(args.message, args.items)?; - let prompt = input_preview(&input_items); + let prompt = render_input_preview(&input_items); let receiver_agent = session .services .agent_control diff --git a/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs b/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs index 37f166c259..9bb7b6055c 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs @@ -1,5 +1,6 @@ use super::*; use crate::agent::control::SpawnAgentOptions; +use crate::agent::control::render_input_preview; use crate::agent::role::DEFAULT_ROLE_NAME; use crate::agent::role::apply_role_to_config; @@ -36,7 +37,7 @@ impl ToolHandler for Handler { .map(str::trim) .filter(|role| !role.is_empty()); let input_items = parse_collab_input(args.message, args.items)?; - let prompt = input_preview(&input_items); + let prompt = render_input_preview(&input_items); let session_source = turn.session_source.clone(); let child_depth = next_thread_spawn_depth(&session_source); let max_depth = turn.config.agent_max_depth; diff --git a/codex-rs/core/src/tools/handlers/multi_agents_common.rs b/codex-rs/core/src/tools/handlers/multi_agents_common.rs index 452e05a2f4..dd68465a7f 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_common.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_common.rs @@ -17,6 +17,7 @@ use codex_protocol::openai_models::ReasoningEffort; use codex_protocol::openai_models::ReasoningEffortPreset; use codex_protocol::protocol::CollabAgentRef; use codex_protocol::protocol::CollabAgentStatusEntry; +use codex_protocol::protocol::Op; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; use codex_protocol::user_input::UserInput; @@ -161,7 +162,7 @@ pub(crate) fn thread_spawn_source( pub(crate) fn parse_collab_input( message: Option, items: Option>, -) -> Result, FunctionCallError> { +) -> Result { match (message, items) { (Some(_), Some(_)) => Err(FunctionCallError::RespondToModel( "Provide either message or items, but not both".to_string(), @@ -178,7 +179,8 @@ pub(crate) fn parse_collab_input( Ok(vec![UserInput::Text { text: message, text_elements: Vec::new(), - }]) + }] + .into()) } (None, Some(items)) => { if items.is_empty() { @@ -186,29 +188,11 @@ pub(crate) fn parse_collab_input( "Items can't be empty".to_string(), )); } - Ok(items) + Ok(items.into()) } } } -pub(crate) fn input_preview(items: &[UserInput]) -> String { - let parts: Vec = items - .iter() - .map(|item| match item { - UserInput::Text { text, .. } => text.clone(), - UserInput::Image { .. } => "[image]".to_string(), - UserInput::LocalImage { path } => format!("[local_image:{}]", path.display()), - UserInput::Skill { name, path } => { - format!("[skill:${name}]({})", path.display()) - } - UserInput::Mention { name, path } => format!("[mention:${name}]({path})"), - _ => "[input]".to_string(), - }) - .collect(); - - parts.join("\n") -} - /// Builds the base config snapshot for a newly spawned sub-agent. /// /// The returned config starts from the parent's effective config and then refreshes the diff --git a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs index 7f1f966569..f6f4677bee 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs @@ -436,6 +436,18 @@ async fn multi_agent_v2_spawn_returns_path_and_send_message_accepts_relative_pat child_snapshot.session_source.get_agent_path().as_deref(), Some("/root/test_process") ); + assert!(manager.captured_ops().iter().any(|(id, op)| { + *id == child_thread_id + && matches!( + op, + Op::InterAgentCommunication { communication } + if communication.author == AgentPath::root() + && communication.recipient.as_str() == "/root/test_process" + && communication.other_recipients.is_empty() + && communication.content == "inspect this repo" + && communication.trigger_turn + ) + })); SendMessageHandlerV2 .handle(invocation( @@ -490,7 +502,8 @@ async fn multi_agent_v2_send_message_accepts_root_target_from_child() { vec![UserInput::Text { text: "inspect this repo".to_string(), text_elements: Vec::new(), - }], + }] + .into(), Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id: root.thread_id, depth: 1, @@ -654,7 +667,8 @@ async fn multi_agent_v2_list_agents_filters_by_relative_path_prefix() { vec![UserInput::Text { text: "research".to_string(), text_elements: Vec::new(), - }], + }] + .into(), Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id: root.thread_id, depth: 1, @@ -674,7 +688,8 @@ async fn multi_agent_v2_list_agents_filters_by_relative_path_prefix() { vec![UserInput::Text { text: "build".to_string(), text_elements: Vec::new(), - }], + }] + .into(), Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id: root.thread_id, depth: 2, diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/message_tool.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/message_tool.rs index 48c7615e70..a61c01f6a1 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/message_tool.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/message_tool.rs @@ -4,6 +4,7 @@ //! whether the resulting `InterAgentCommunication` should wake the target immediately. use super::*; +use crate::agent::control::render_input_preview; use codex_protocol::protocol::InterAgentCommunication; #[derive(Clone, Copy)] @@ -83,7 +84,7 @@ fn text_content( .iter() .all(|item| matches!(item, UserInput::Text { .. })) { - return Ok(input_preview(items)); + return Ok(render_input_preview(&(items.to_vec().into()))); } Err(FunctionCallError::RespondToModel( mode.unsupported_items_error().to_string(), diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs index 6cdad3c1c2..ffe128b43e 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs @@ -1,8 +1,12 @@ use super::*; use crate::agent::control::SpawnAgentOptions; +use crate::agent::control::render_input_preview; use crate::agent::next_thread_spawn_depth; use crate::agent::role::DEFAULT_ROLE_NAME; use crate::agent::role::apply_role_to_config; +use codex_protocol::AgentPath; +use codex_protocol::protocol::InterAgentCommunication; +use codex_protocol::protocol::Op; pub(crate) struct Handler; @@ -33,8 +37,10 @@ impl ToolHandler for Handler { .as_deref() .map(str::trim) .filter(|role| !role.is_empty()); - let input_items = parse_collab_input(args.message, args.items)?; - let prompt = input_preview(&input_items); + + let initial_operation = parse_collab_input(args.message, args.items)?; + let prompt = render_input_preview(&initial_operation); + let session_source = turn.session_source.clone(); let child_depth = next_thread_spawn_depth(&session_source); let max_depth = turn.config.agent_max_depth; @@ -72,19 +78,39 @@ impl ToolHandler for Handler { apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?; apply_spawn_agent_overrides(&mut config, child_depth); + let spawn_source = thread_spawn_source( + session.conversation_id, + &turn.session_source, + child_depth, + role_name, + Some(args.task_name.clone()), + )?; let result = session .services .agent_control .spawn_agent_with_metadata( config, - input_items, - Some(thread_spawn_source( - session.conversation_id, - &turn.session_source, - child_depth, - role_name, - Some(args.task_name.clone()), - )?), + match (spawn_source.get_agent_path(), initial_operation) { + (Some(recipient), Op::UserInput { items, .. }) + if items + .iter() + .all(|item| matches!(item, UserInput::Text { .. })) => + { + Op::InterAgentCommunication { + communication: InterAgentCommunication::new( + turn.session_source + .get_agent_path() + .unwrap_or_else(AgentPath::root), + recipient, + Vec::new(), + prompt.clone(), + /*trigger_turn*/ true, + ), + } + } + (_, initial_operation) => initial_operation, + }, + Some(spawn_source), SpawnAgentOptions { fork_parent_spawn_call_id: args.fork_context.then(|| call_id.clone()), }, diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 9a4f2b90b2..8b38a5050f 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -512,6 +512,15 @@ pub enum Op { ListModels, } +impl From> for Op { + fn from(value: Vec) -> Self { + Op::UserInput { + items: value, + final_output_json_schema: None, + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, JsonSchema, TS)] pub struct InterAgentCommunication { pub author: AgentPath,