use crate::agent::AgentStatus; use crate::agent::registry::AgentMetadata; use crate::agent::registry::AgentRegistry; use crate::agent::role::DEFAULT_ROLE_NAME; use crate::agent::role::resolve_role_config; use crate::agent::status::is_final; use crate::codex_thread::ThreadConfigSnapshot; use crate::session::emit_subagent_session_started; use crate::session_prefix::format_subagent_context_line; use crate::session_prefix::format_subagent_notification_message; use crate::shell_snapshot::ShellSnapshot; use crate::thread_manager::ResumeThreadWithHistoryOptions; use crate::thread_manager::ThreadManagerState; use crate::thread_rollout_truncation::truncate_rollout_to_last_n_fork_turns; use codex_features::Feature; use codex_protocol::AgentPath; use codex_protocol::SessionId; use codex_protocol::ThreadId; use codex_protocol::error::CodexErr; use codex_protocol::error::Result as CodexResult; use codex_protocol::models::ContentItem; use codex_protocol::models::MessagePhase; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::InterAgentCommunication; use codex_protocol::protocol::Op; use codex_protocol::protocol::ResumedHistory; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::user_input::UserInput; use codex_state::DirectionalThreadSpawnEdgeStatus; use codex_thread_store::ReadThreadParams; use serde::Serialize; use std::collections::HashMap; use std::collections::VecDeque; use std::sync::Arc; use std::sync::Weak; use tokio::sync::watch; use tracing::warn; const AGENT_NAMES: &str = include_str!("agent_names.txt"); const ROOT_LAST_TASK_MESSAGE: &str = "Main thread"; #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) enum SpawnAgentForkMode { FullHistory, LastNTurns(usize), } #[derive(Clone, Debug, Default)] pub(crate) struct SpawnAgentOptions { pub(crate) fork_parent_spawn_call_id: Option, pub(crate) fork_mode: Option, pub(crate) environments: Option>, } #[derive(Clone, Debug)] pub(crate) struct LiveAgent { pub(crate) thread_id: ThreadId, pub(crate) metadata: AgentMetadata, pub(crate) status: AgentStatus, } #[derive(Clone, Debug, Serialize, PartialEq, Eq)] pub(crate) struct ListedAgent { pub(crate) agent_name: String, pub(crate) agent_status: AgentStatus, pub(crate) last_task_message: Option, } fn default_agent_nickname_list() -> Vec<&'static str> { AGENT_NAMES .lines() .map(str::trim) .filter(|name| !name.is_empty()) .collect() } fn agent_nickname_candidates( config: &crate::config::Config, role_name: Option<&str>, ) -> Vec { let role_name = role_name.unwrap_or(DEFAULT_ROLE_NAME); if let Some(candidates) = resolve_role_config(config, role_name).and_then(|role| role.nickname_candidates.clone()) { return candidates; } default_agent_nickname_list() .into_iter() .map(ToOwned::to_owned) .collect() } fn keep_forked_response_item( item: &ResponseItem, multi_agent_v2_usage_hint_texts_to_filter: &[String], ) -> bool { let is_multi_agent_v2_usage_hint = match item { ResponseItem::Message { role, content, .. } if role == "developer" => { matches!( content.as_slice(), [ContentItem::InputText { text }] if multi_agent_v2_usage_hint_texts_to_filter .iter() .any(|usage_hint_text| usage_hint_text == text) ) } _ => false, }; if is_multi_agent_v2_usage_hint { return false; } match item { ResponseItem::Message { role, phase, .. } => match role.as_str() { "system" | "developer" | "user" => true, "assistant" => *phase == Some(MessagePhase::FinalAnswer), _ => false, }, ResponseItem::Reasoning { .. } | ResponseItem::LocalShellCall { .. } | ResponseItem::FunctionCall { .. } | ResponseItem::ToolSearchCall { .. } | ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCall { .. } | ResponseItem::CustomToolCallOutput { .. } | ResponseItem::ToolSearchOutput { .. } | ResponseItem::WebSearchCall { .. } | ResponseItem::ImageGenerationCall { .. } | ResponseItem::Compaction { .. } | ResponseItem::CompactionTrigger | ResponseItem::ContextCompaction { .. } | ResponseItem::Other => false, } } fn sanitize_forked_replacement_history( items: &mut Vec, multi_agent_v2_usage_hint_texts_to_filter: &[String], ) { let is_contextual_user_response_item = |item: &ResponseItem| { matches!( item, ResponseItem::Message { role, content, .. } if role == "user" && crate::event_mapping::is_contextual_user_message_content(content) ) }; let is_non_contextual_user_response_item = |item: &ResponseItem| { matches!( item, ResponseItem::Message { role, content, .. } if role == "user" && !crate::event_mapping::is_contextual_user_message_content(content) ) }; let source_items = std::mem::take(items); let mut sanitized_items = Vec::with_capacity(source_items.len()); let mut index = 0; while index < source_items.len() { let mut skipped_developer_context = false; while let Some(item) = source_items.get(index) { if matches!(item, ResponseItem::Message { role, .. } if role == "developer") { skipped_developer_context = true; index += 1; } else if is_contextual_user_response_item(item) { index += 1; } else { break; } } if skipped_developer_context && source_items .get(index) .is_some_and(is_non_contextual_user_response_item) && source_items .get(index + 1) .is_some_and(is_non_contextual_user_response_item) { // Extension-contributed PromptSlot::ContextualUser fragments do not carry the // built-in contextual markers. In compacted histories, the remaining structural // cue is that such a startup user item sits between developer setup and the // first real user turn. index += 1; } let Some(item) = source_items.get(index) else { break; }; if keep_forked_response_item(item, multi_agent_v2_usage_hint_texts_to_filter) { sanitized_items.push(item.clone()); } index += 1; } *items = sanitized_items; } fn strip_parent_startup_context_bundle_from_forked_rollout(items: &mut Vec) { let Some(turn_context_idx) = items .iter() .position(|item| matches!(item, RolloutItem::TurnContext(_))) else { return; }; let prefix = &items[..turn_context_idx]; if !prefix.iter().all(|item| { matches!( item, RolloutItem::ResponseItem(ResponseItem::Message { role, .. }) if role == "developer" || role == "user" ) || matches!(item, RolloutItem::SessionMeta(_)) }) { return; } let mut context_start = turn_context_idx; while context_start > 0 { let is_startup_context_item = match &items[context_start - 1] { RolloutItem::ResponseItem(ResponseItem::Message { role, .. }) if role == "developer" || role == "user" => { true } _ => false, }; if !is_startup_context_item { break; } context_start -= 1; } if context_start < turn_context_idx { items.drain(context_start..=turn_context_idx); } } fn sanitize_forked_rollout_item( item: &mut RolloutItem, multi_agent_v2_usage_hint_texts_to_filter: &[String], ) -> bool { match item { RolloutItem::ResponseItem(response_item) => { keep_forked_response_item(response_item, multi_agent_v2_usage_hint_texts_to_filter) } // A forked child gets its own runtime config, including spawned-agent // instructions, so it must establish a fresh context diff baseline. RolloutItem::TurnContext(_) => false, RolloutItem::Compacted(compacted) => { if let Some(replacement_history) = &mut compacted.replacement_history { sanitize_forked_replacement_history( replacement_history, multi_agent_v2_usage_hint_texts_to_filter, ); } true } RolloutItem::EventMsg(_) | RolloutItem::SessionMeta(_) => true, } } /// Control-plane handle for multi-agent operations. /// `AgentControl` is held by each session (via `SessionServices`). It provides capability to /// spawn new agents and the inter-agent communication layer. /// An `AgentControl` instance is intended to be created at most once per root thread/session /// tree. That same `AgentControl` is then shared with every sub-agent spawned from that root, /// which keeps the registry scoped to that root thread rather than the entire `ThreadManager`. #[derive(Clone, Default)] pub(crate) struct AgentControl { /// ID shared by the whole agent control session. This means every sub-agents from a common /// root share the same session ID. session_id: SessionId, /// Weak handle back to the global thread registry/state. /// This is `Weak` to avoid reference cycles and shadow persistence of the form /// `ThreadManagerState -> CodexThread -> Session -> SessionServices -> ThreadManagerState`. manager: Weak, state: Arc, } impl AgentControl { /// Construct a new `AgentControl` that can spawn/message agents via the given manager state. pub(crate) fn new(manager: Weak) -> Self { Self { manager, ..Default::default() } } pub(crate) fn with_session_id(mut self, session_id: SessionId) -> Self { self.session_id = session_id; self } pub(crate) fn session_id(&self) -> SessionId { self.session_id } /// Spawn a new agent thread and submit the initial prompt. #[cfg(test)] pub(crate) async fn spawn_agent( &self, config: crate::config::Config, initial_operation: Op, session_source: Option, ) -> CodexResult { let spawned_agent = Box::pin(self.spawn_agent_internal( config, initial_operation, session_source, SpawnAgentOptions::default(), )) .await?; Ok(spawned_agent.thread_id) } /// Spawn an agent thread with some metadata. pub(crate) async fn spawn_agent_with_metadata( &self, config: crate::config::Config, initial_operation: Op, session_source: Option, options: SpawnAgentOptions, // TODO(jif) drop with new fork. ) -> CodexResult { Box::pin(self.spawn_agent_internal(config, initial_operation, session_source, options)) .await } async fn spawn_agent_internal( &self, config: crate::config::Config, initial_operation: Op, session_source: Option, options: SpawnAgentOptions, ) -> CodexResult { let state = self.upgrade()?; let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?; let inherited_shell_snapshot = self .inherited_shell_snapshot_for_source(&state, session_source.as_ref()) .await; let inherited_exec_policy = self .inherited_exec_policy_for_source(&state, session_source.as_ref(), &config) .await; let (session_source, mut agent_metadata) = match session_source { Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth, agent_path, agent_role, .. })) => { let (session_source, agent_metadata) = self.prepare_thread_spawn( &mut reservation, &config, parent_thread_id, depth, agent_path, agent_role, /*preferred_agent_nickname*/ None, )?; (Some(session_source), agent_metadata) } other => (other, AgentMetadata::default()), }; let notification_source = session_source.clone(); // The same `AgentControl` is sent to spawn the thread. let new_thread = match (session_source, options.fork_mode.as_ref()) { (Some(session_source), Some(_)) => { Box::pin(self.spawn_forked_thread( &state, config, session_source, &options, inherited_shell_snapshot, inherited_exec_policy, )) .await? } (Some(session_source), None) => { Box::pin(state.spawn_new_thread_with_source( config.clone(), self.clone(), session_source, /*thread_source*/ Some(ThreadSource::Subagent), /*persist_extended_history*/ false, /*metrics_service_name*/ None, inherited_shell_snapshot, inherited_exec_policy, options.environments.clone(), )) .await? } (None, _) => Box::pin(state.spawn_new_thread(config.clone(), self.clone())).await?, }; agent_metadata.agent_id = Some(new_thread.thread_id); reservation.commit(agent_metadata.clone()); if let Some(SessionSource::SubAgent( subagent_source @ SubAgentSource::ThreadSpawn { parent_thread_id, .. }, )) = notification_source.as_ref() { let client_metadata = match state.get_thread(*parent_thread_id).await { Ok(parent_thread) => { parent_thread .codex .session .app_server_client_metadata() .await } Err(error) => { tracing::warn!( error = %error, parent_thread_id = %parent_thread_id, "skipping subagent thread analytics: failed to load parent thread metadata" ); crate::session::session::AppServerClientMetadata { client_name: None, client_version: None, } } }; let thread_config = new_thread.thread.codex.thread_config_snapshot().await; emit_subagent_session_started( &new_thread .thread .codex .session .services .analytics_events_client, client_metadata, new_thread.thread_id, /*parent_thread_id*/ None, thread_config, subagent_source.clone(), ); } // Notify a new thread has been created. This notification will be processed by clients // to subscribe or drain this newly created thread. // TODO(jif) add helper for drain state.notify_thread_created(new_thread.thread_id); self.persist_thread_spawn_edge_for_source( new_thread.thread.as_ref(), new_thread.thread_id, notification_source.as_ref(), ) .await; self.send_input(new_thread.thread_id, initial_operation) .await?; if !new_thread.thread.enabled(Feature::MultiAgentV2) { let child_reference = agent_metadata .agent_path .as_ref() .map(ToString::to_string) .unwrap_or_else(|| new_thread.thread_id.to_string()); self.maybe_start_completion_watcher( new_thread.thread_id, notification_source, child_reference, agent_metadata.agent_path.clone(), ); } Ok(LiveAgent { thread_id: new_thread.thread_id, metadata: agent_metadata, status: self.get_status(new_thread.thread_id).await, }) } async fn spawn_forked_thread( &self, state: &Arc, config: crate::config::Config, session_source: SessionSource, options: &SpawnAgentOptions, inherited_shell_snapshot: Option>, inherited_exec_policy: Option>, ) -> CodexResult { if options.fork_parent_spawn_call_id.is_none() { return Err(CodexErr::Fatal( "spawn_agent fork requires a parent spawn call id".to_string(), )); } let Some(fork_mode) = options.fork_mode.as_ref() else { return Err(CodexErr::Fatal( "spawn_agent fork requires a fork mode".to_string(), )); }; let SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, .. }) = &session_source else { return Err(CodexErr::Fatal( "spawn_agent fork requires a thread-spawn session source".to_string(), )); }; let parent_thread_id = *parent_thread_id; let parent_thread = state.get_thread(parent_thread_id).await.ok(); if let Some(parent_thread) = parent_thread.as_ref() { // `record_conversation_items` only queues persistence writes asynchronously. // Flush before snapshotting store history for a fork. parent_thread.ensure_rollout_materialized().await; parent_thread.flush_rollout().await?; } let parent_history = state .read_stored_thread(ReadThreadParams { thread_id: parent_thread_id, include_archived: true, include_history: true, }) .await? .history .ok_or_else(|| { CodexErr::Fatal(format!( "parent thread history unavailable for fork: {parent_thread_id}" )) })?; let mut forked_rollout_items = parent_history.items; if let SpawnAgentForkMode::LastNTurns(last_n_turns) = fork_mode { forked_rollout_items = truncate_rollout_to_last_n_fork_turns(&forked_rollout_items, *last_n_turns); } strip_parent_startup_context_bundle_from_forked_rollout(&mut forked_rollout_items); // MultiAgentV2 root/subagent usage hints are injected as standalone developer // messages at thread start. When forking history, drop hints from the parent // so the child gets a fresh hint that matches its own session source/config. let multi_agent_v2_usage_hint_texts_to_filter: Vec = if let Some(parent_thread) = parent_thread.as_ref() { parent_thread .codex .session .configured_multi_agent_v2_usage_hint_texts() .await } else if config.features.enabled(Feature::MultiAgentV2) { [ config.multi_agent_v2.root_agent_usage_hint_text.clone(), config.multi_agent_v2.subagent_usage_hint_text.clone(), ] .into_iter() .flatten() .collect() } else { Vec::new() }; forked_rollout_items.retain_mut(|item| { sanitize_forked_rollout_item(item, &multi_agent_v2_usage_hint_texts_to_filter) }); state .fork_thread_with_source( config.clone(), InitialHistory::Forked(forked_rollout_items), self.clone(), session_source, /*thread_source*/ Some(ThreadSource::Subagent), /*persist_extended_history*/ false, inherited_shell_snapshot, inherited_exec_policy, options.environments.clone(), ) .await } /// Resume an existing agent thread from a recorded rollout file. pub(crate) async fn resume_agent_from_rollout( &self, config: crate::config::Config, thread_id: ThreadId, session_source: SessionSource, ) -> CodexResult { let root_depth = thread_spawn_depth(&session_source).unwrap_or(0); let resumed_thread_id = Box::pin(self.resume_single_agent_from_rollout( config.clone(), thread_id, session_source, )) .await?; let state = self.upgrade()?; let Ok(resumed_thread) = state.get_thread(resumed_thread_id).await else { return Ok(resumed_thread_id); }; let Some(state_db_ctx) = resumed_thread.state_db() else { return Ok(resumed_thread_id); }; let mut resume_queue = VecDeque::from([(thread_id, root_depth)]); while let Some((parent_thread_id, parent_depth)) = resume_queue.pop_front() { let child_ids = match state_db_ctx .list_thread_spawn_children_with_status( parent_thread_id, DirectionalThreadSpawnEdgeStatus::Open, ) .await { Ok(child_ids) => child_ids, Err(err) => { warn!( "failed to load persisted thread-spawn children for {parent_thread_id}: {err}" ); continue; } }; for child_thread_id in child_ids { let child_depth = parent_depth + 1; let child_resumed = if state.get_thread(child_thread_id).await.is_ok() { true } else { let child_session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: child_depth, agent_path: None, agent_nickname: None, agent_role: None, }); match self .resume_single_agent_from_rollout( config.clone(), child_thread_id, child_session_source, ) .await { Ok(_) => true, Err(err) => { warn!("failed to resume descendant thread {child_thread_id}: {err}"); false } } }; if child_resumed { resume_queue.push_back((child_thread_id, child_depth)); } } } Ok(resumed_thread_id) } async fn resume_single_agent_from_rollout( &self, mut config: crate::config::Config, thread_id: ThreadId, session_source: SessionSource, ) -> CodexResult { if let SessionSource::SubAgent(SubAgentSource::ThreadSpawn { depth, .. }) = &session_source && *depth >= config.agent_max_depth && !config.features.enabled(Feature::MultiAgentV2) { let _ = config.features.disable(Feature::SpawnCsv); let _ = config.features.disable(Feature::Collab); } let state = self.upgrade()?; let state_db_ctx = state.state_db(); let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?; let (session_source, agent_metadata) = match session_source { SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth, agent_path, agent_role: _, agent_nickname: _, }) => { let (resumed_agent_nickname, resumed_agent_role) = if let Some(state_db_ctx) = state_db_ctx.as_ref() { match state_db_ctx.get_thread(thread_id).await { Ok(Some(metadata)) => (metadata.agent_nickname, metadata.agent_role), Ok(None) | Err(_) => (None, None), } } else { (None, None) }; self.prepare_thread_spawn( &mut reservation, &config, parent_thread_id, depth, agent_path, resumed_agent_role, resumed_agent_nickname, )? } other => (other, AgentMetadata::default()), }; let notification_source = session_source.clone(); let inherited_shell_snapshot = self .inherited_shell_snapshot_for_source(&state, Some(&session_source)) .await; let inherited_exec_policy = self .inherited_exec_policy_for_source(&state, Some(&session_source), &config) .await; let stored_thread = state .read_stored_thread(ReadThreadParams { thread_id, include_archived: true, include_history: true, }) .await?; let history = stored_thread .history .ok_or_else(|| CodexErr::ThreadNotFound(thread_id))? .items; let resumed_thread = state .resume_thread_with_history_with_source(ResumeThreadWithHistoryOptions { config: config.clone(), initial_history: InitialHistory::Resumed(ResumedHistory { conversation_id: thread_id, history, rollout_path: stored_thread.rollout_path, }), agent_control: self.clone(), session_source, inherited_shell_snapshot, inherited_exec_policy, }) .await?; let mut agent_metadata = agent_metadata; agent_metadata.agent_id = Some(resumed_thread.thread_id); reservation.commit(agent_metadata.clone()); // Resumed threads are re-registered in-memory and need the same listener // attachment path as freshly spawned threads. state.notify_thread_created(resumed_thread.thread_id); if !resumed_thread.thread.enabled(Feature::MultiAgentV2) { let child_reference = agent_metadata .agent_path .as_ref() .map(ToString::to_string) .unwrap_or_else(|| resumed_thread.thread_id.to_string()); self.maybe_start_completion_watcher( resumed_thread.thread_id, Some(notification_source.clone()), child_reference, agent_metadata.agent_path.clone(), ); } self.persist_thread_spawn_edge_for_source( resumed_thread.thread.as_ref(), resumed_thread.thread_id, Some(¬ification_source), ) .await; Ok(resumed_thread.thread_id) } /// Send rich user input items to an existing agent thread. pub(crate) async fn send_input( &self, agent_id: ThreadId, initial_operation: Op, ) -> CodexResult { let last_task_message = render_input_preview(&initial_operation); let state = self.upgrade()?; let result = self .handle_thread_request_result( agent_id, &state, state.send_op(agent_id, initial_operation).await, ) .await; if result.is_ok() { self.state .update_last_task_message(agent_id, last_task_message); } result } /// Append a prebuilt message to an existing agent thread outside the normal user-input path. #[cfg(test)] pub(crate) async fn append_message( &self, agent_id: ThreadId, message: ResponseItem, ) -> CodexResult { let state = self.upgrade()?; self.handle_thread_request_result( agent_id, &state, state.append_message(agent_id, message).await, ) .await } pub(crate) async fn send_inter_agent_communication( &self, agent_id: ThreadId, communication: InterAgentCommunication, ) -> CodexResult { let last_task_message = communication.content.clone(); let state = self.upgrade()?; let result = self .handle_thread_request_result( agent_id, &state, state .send_op(agent_id, Op::InterAgentCommunication { communication }) .await, ) .await; if result.is_ok() { self.state .update_last_task_message(agent_id, last_task_message); } result } /// Interrupt the current task for an existing agent thread. pub(crate) async fn interrupt_agent(&self, agent_id: ThreadId) -> CodexResult { let state = self.upgrade()?; state.send_op(agent_id, Op::Interrupt).await } async fn handle_thread_request_result( &self, agent_id: ThreadId, state: &Arc, result: CodexResult, ) -> CodexResult { if matches!(result, Err(CodexErr::InternalAgentDied)) { let _ = state.remove_thread(&agent_id).await; self.state.release_spawned_thread(agent_id); } result } /// Submit a shutdown request for a live agent without marking it explicitly closed in /// persisted spawn-edge state. pub(crate) async fn shutdown_live_agent(&self, agent_id: ThreadId) -> CodexResult { let state = self.upgrade()?; let result = if let Ok(thread) = state.get_thread(agent_id).await { thread.codex.session.ensure_rollout_materialized().await; thread.codex.session.flush_rollout().await?; let result = if matches!(thread.agent_status().await, AgentStatus::Shutdown) { Ok(String::new()) } else { state.send_op(agent_id, Op::Shutdown {}).await }; thread.wait_until_terminated().await; result } else { state.send_op(agent_id, Op::Shutdown {}).await }; let _ = state.remove_thread(&agent_id).await; self.state.release_spawned_thread(agent_id); result } /// Mark `agent_id` as explicitly closed in persisted spawn-edge state, then shut down the /// agent and any live descendants reached from the in-memory tree. pub(crate) async fn close_agent(&self, agent_id: ThreadId) -> CodexResult { let state = self.upgrade()?; if let Ok(thread) = state.get_thread(agent_id).await && let Some(state_db_ctx) = thread.state_db() && let Err(err) = state_db_ctx .set_thread_spawn_edge_status(agent_id, DirectionalThreadSpawnEdgeStatus::Closed) .await { warn!("failed to persist thread-spawn edge status for {agent_id}: {err}"); } Box::pin(self.shutdown_agent_tree(agent_id)).await } /// Shut down `agent_id` and any live descendants reachable from the in-memory spawn tree. async fn shutdown_agent_tree(&self, agent_id: ThreadId) -> CodexResult { let descendant_ids = self.live_thread_spawn_descendants(agent_id).await?; let result = self.shutdown_live_agent(agent_id).await; for descendant_id in descendant_ids { match self.shutdown_live_agent(descendant_id).await { Ok(_) | Err(CodexErr::ThreadNotFound(_)) | Err(CodexErr::InternalAgentDied) => {} Err(err) => return Err(err), } } result } /// Fetch the last known status for `agent_id`, returning `NotFound` when unavailable. pub(crate) async fn get_status(&self, agent_id: ThreadId) -> AgentStatus { let Ok(state) = self.upgrade() else { // No agent available if upgrade fails. return AgentStatus::NotFound; }; let Ok(thread) = state.get_thread(agent_id).await else { return AgentStatus::NotFound; }; thread.agent_status().await } pub(crate) fn register_session_root( &self, current_thread_id: ThreadId, current_session_source: &SessionSource, ) { if thread_spawn_parent_thread_id(current_session_source).is_none() { self.state.register_root_thread(current_thread_id); } } pub(crate) fn get_agent_metadata(&self, agent_id: ThreadId) -> Option { self.state.agent_metadata_for_thread(agent_id) } pub(crate) async fn list_live_agent_subtree_thread_ids( &self, agent_id: ThreadId, ) -> CodexResult> { let mut thread_ids = vec![agent_id]; thread_ids.extend(self.live_thread_spawn_descendants(agent_id).await?); Ok(thread_ids) } pub(crate) async fn get_agent_config_snapshot( &self, agent_id: ThreadId, ) -> Option { let Ok(state) = self.upgrade() else { return None; }; let Ok(thread) = state.get_thread(agent_id).await else { return None; }; Some(thread.config_snapshot().await) } pub(crate) async fn resolve_agent_reference( &self, _current_thread_id: ThreadId, current_session_source: &SessionSource, agent_reference: &str, ) -> CodexResult { let current_agent_path = current_session_source .get_agent_path() .unwrap_or_else(AgentPath::root); let agent_path = current_agent_path .resolve(agent_reference) .map_err(CodexErr::UnsupportedOperation)?; if let Some(thread_id) = self.state.agent_id_for_path(&agent_path) { return Ok(thread_id); } Err(CodexErr::UnsupportedOperation(format!( "live agent path `{}` not found", agent_path.as_str() ))) } /// Subscribe to status updates for `agent_id`, yielding the latest value and changes. pub(crate) async fn subscribe_status( &self, agent_id: ThreadId, ) -> CodexResult> { let state = self.upgrade()?; let thread = state.get_thread(agent_id).await?; Ok(thread.subscribe_status()) } pub(crate) async fn format_environment_context_subagents( &self, parent_thread_id: ThreadId, ) -> String { let Ok(agents) = self.open_thread_spawn_children(parent_thread_id).await else { return String::new(); }; agents .into_iter() .map(|(thread_id, metadata)| { let reference = metadata .agent_path .as_ref() .map(|agent_path| agent_path.name().to_string()) .unwrap_or_else(|| thread_id.to_string()); format_subagent_context_line(reference.as_str(), metadata.agent_nickname.as_deref()) }) .collect::>() .join("\n") } pub(crate) async fn list_agents( &self, current_session_source: &SessionSource, path_prefix: Option<&str>, ) -> CodexResult> { let state = self.upgrade()?; let resolved_prefix = path_prefix .map(|prefix| { current_session_source .get_agent_path() .unwrap_or_else(AgentPath::root) .resolve(prefix) .map_err(CodexErr::UnsupportedOperation) }) .transpose()?; let mut live_agents = self.state.live_agents(); live_agents.sort_by(|left, right| { left.agent_path .as_deref() .unwrap_or_default() .cmp(right.agent_path.as_deref().unwrap_or_default()) .then_with(|| { left.agent_id .map(|id| id.to_string()) .unwrap_or_default() .cmp(&right.agent_id.map(|id| id.to_string()).unwrap_or_default()) }) }); let root_path = AgentPath::root(); let mut agents = Vec::with_capacity(live_agents.len().saturating_add(1)); if resolved_prefix .as_ref() .is_none_or(|prefix| agent_matches_prefix(Some(&root_path), prefix)) && let Some(root_thread_id) = self.state.agent_id_for_path(&root_path) && let Ok(root_thread) = state.get_thread(root_thread_id).await { agents.push(ListedAgent { agent_name: root_path.to_string(), agent_status: root_thread.agent_status().await, last_task_message: Some(ROOT_LAST_TASK_MESSAGE.to_string()), }); } for metadata in live_agents { let Some(thread_id) = metadata.agent_id else { continue; }; if resolved_prefix .as_ref() .is_some_and(|prefix| !agent_matches_prefix(metadata.agent_path.as_ref(), prefix)) { continue; } let Ok(thread) = state.get_thread(thread_id).await else { continue; }; let agent_name = metadata .agent_path .as_ref() .map(ToString::to_string) .unwrap_or_else(|| thread_id.to_string()); let last_task_message = metadata.last_task_message.clone(); agents.push(ListedAgent { agent_name, agent_status: thread.agent_status().await, last_task_message, }); } Ok(agents) } /// Starts a detached watcher for sub-agents spawned from another thread. /// /// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and /// can receive completion notifications. fn maybe_start_completion_watcher( &self, child_thread_id: ThreadId, session_source: Option, child_reference: String, child_agent_path: Option, ) { let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, .. })) = session_source else { return; }; let control = self.clone(); tokio::spawn(async move { let status = match control.subscribe_status(child_thread_id).await { Ok(mut status_rx) => { let mut status = status_rx.borrow().clone(); while !is_final(&status) { if status_rx.changed().await.is_err() { status = control.get_status(child_thread_id).await; break; } status = status_rx.borrow().clone(); } status } Err(_) => control.get_status(child_thread_id).await, }; if !is_final(&status) { return; } let Ok(state) = control.upgrade() else { return; }; let child_thread = state.get_thread(child_thread_id).await.ok(); let message = format_subagent_notification_message(child_reference.as_str(), &status); if child_agent_path.is_some() && child_thread .as_ref() .map(|thread| thread.enabled(Feature::MultiAgentV2)) .unwrap_or(true) { let Some(child_agent_path) = child_agent_path.clone() else { return; }; let Some(parent_agent_path) = child_agent_path .as_str() .rsplit_once('/') .and_then(|(parent, _)| AgentPath::try_from(parent).ok()) else { return; }; let communication = InterAgentCommunication::new( child_agent_path, parent_agent_path, Vec::new(), message, /*trigger_turn*/ false, ); let _ = control .send_inter_agent_communication(parent_thread_id, communication) .await; return; } let Ok(parent_thread) = state.get_thread(parent_thread_id).await else { return; }; parent_thread .inject_user_message_without_turn(message) .await; }); } #[allow(clippy::too_many_arguments)] fn prepare_thread_spawn( &self, reservation: &mut crate::agent::registry::SpawnReservation, config: &crate::config::Config, parent_thread_id: ThreadId, depth: i32, agent_path: Option, agent_role: Option, preferred_agent_nickname: Option, ) -> CodexResult<(SessionSource, AgentMetadata)> { if depth == 1 { self.state.register_root_thread(parent_thread_id); } if let Some(agent_path) = agent_path.as_ref() { reservation.reserve_agent_path(agent_path)?; } let candidate_names = agent_nickname_candidates(config, agent_role.as_deref()); let candidate_name_refs: Vec<&str> = candidate_names.iter().map(String::as_str).collect(); let agent_nickname = Some(reservation.reserve_agent_nickname_with_preference( &candidate_name_refs, preferred_agent_nickname.as_deref(), )?); let session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth, agent_path: agent_path.clone(), agent_nickname: agent_nickname.clone(), agent_role: agent_role.clone(), }); let agent_metadata = AgentMetadata { agent_id: None, agent_path, agent_nickname, agent_role, last_task_message: None, }; Ok((session_source, agent_metadata)) } fn upgrade(&self) -> CodexResult> { self.manager .upgrade() .ok_or_else(|| CodexErr::UnsupportedOperation("thread manager dropped".to_string())) } async fn inherited_shell_snapshot_for_source( &self, state: &Arc, session_source: Option<&SessionSource>, ) -> Option> { let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, .. })) = session_source else { return None; }; let parent_thread = state.get_thread(*parent_thread_id).await.ok()?; parent_thread.codex.session.user_shell().shell_snapshot() } async fn inherited_exec_policy_for_source( &self, state: &Arc, session_source: Option<&SessionSource>, child_config: &crate::config::Config, ) -> Option> { let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, .. })) = session_source else { return None; }; let parent_thread = state.get_thread(*parent_thread_id).await.ok()?; let parent_config = parent_thread.codex.session.get_config().await; if !crate::exec_policy::child_uses_parent_exec_policy(&parent_config, child_config) { return None; } Some(Arc::clone( &parent_thread.codex.session.services.exec_policy, )) } async fn open_thread_spawn_children( &self, parent_thread_id: ThreadId, ) -> CodexResult> { let mut children_by_parent = self.live_thread_spawn_children().await?; Ok(children_by_parent .remove(&parent_thread_id) .unwrap_or_default()) } async fn live_thread_spawn_children( &self, ) -> CodexResult>> { let state = self.upgrade()?; let mut children_by_parent = HashMap::>::new(); for thread_id in state.list_thread_ids().await { let Ok(thread) = state.get_thread(thread_id).await else { continue; }; let snapshot = thread.config_snapshot().await; let Some(parent_thread_id) = thread_spawn_parent_thread_id(&snapshot.session_source) else { continue; }; children_by_parent .entry(parent_thread_id) .or_default() .push(( thread_id, self.state .agent_metadata_for_thread(thread_id) .unwrap_or(AgentMetadata { agent_id: Some(thread_id), ..Default::default() }), )); } for children in children_by_parent.values_mut() { children.sort_by(|left, right| { left.1 .agent_path .as_deref() .unwrap_or_default() .cmp(right.1.agent_path.as_deref().unwrap_or_default()) .then_with(|| left.0.to_string().cmp(&right.0.to_string())) }); } Ok(children_by_parent) } async fn persist_thread_spawn_edge_for_source( &self, thread: &crate::CodexThread, child_thread_id: ThreadId, session_source: Option<&SessionSource>, ) { let Some(parent_thread_id) = session_source.and_then(thread_spawn_parent_thread_id) else { return; }; let Some(state_db_ctx) = thread.state_db() else { return; }; if let Err(err) = state_db_ctx .upsert_thread_spawn_edge( parent_thread_id, child_thread_id, DirectionalThreadSpawnEdgeStatus::Open, ) .await { warn!("failed to persist thread-spawn edge: {err}"); } } async fn live_thread_spawn_descendants( &self, root_thread_id: ThreadId, ) -> CodexResult> { let mut children_by_parent = self.live_thread_spawn_children().await?; let mut descendants = Vec::new(); let mut stack = children_by_parent .remove(&root_thread_id) .unwrap_or_default() .into_iter() .map(|(child_thread_id, _)| child_thread_id) .rev() .collect::>(); while let Some(thread_id) = stack.pop() { descendants.push(thread_id); if let Some(children) = children_by_parent.remove(&thread_id) { for (child_thread_id, _) in children.into_iter().rev() { stack.push(child_thread_id); } } } Ok(descendants) } } fn thread_spawn_parent_thread_id(session_source: &SessionSource) -> Option { match session_source { SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, .. }) => Some(*parent_thread_id), _ => None, } } fn agent_matches_prefix(agent_path: Option<&AgentPath>, prefix: &AgentPath) -> bool { if prefix.is_root() { return true; } agent_path.is_some_and(|agent_path| { agent_path == prefix || agent_path .as_str() .strip_prefix(prefix.as_str()) .is_some_and(|suffix| suffix.starts_with('/')) }) } pub(crate) fn render_input_preview(initial_operation: &Op) -> String { match initial_operation { Op::UserInput { items, .. } => items .iter() .map(|item| match item { UserInput::Text { text, .. } => text.clone(), UserInput::Image { .. } => "[image]".to_string(), UserInput::LocalImage { path, .. } => { format!("[local_image:{}]", path.display()) } UserInput::Skill { name, path } => format!("[skill:${name}]({})", path.display()), UserInput::Mention { name, path } => format!("[mention:${name}]({path})"), _ => "[input]".to_string(), }) .collect::>() .join("\n"), Op::InterAgentCommunication { communication } => communication.content.clone(), _ => String::new(), } } fn thread_spawn_depth(session_source: &SessionSource) -> Option { match session_source { SessionSource::SubAgent(SubAgentSource::ThreadSpawn { depth, .. }) => Some(*depth), _ => None, } } #[cfg(test)] #[path = "control_tests.rs"] mod tests;