Compare commits

...

4 Commits

Author SHA1 Message Date
Albin Cassirer
28f465ed5d codex: satisfy argument comment lint 2026-05-19 17:36:28 -07:00
Albin Cassirer
119cea0683 codex: format rebased startup module 2026-05-19 17:14:37 -07:00
Albin Cassirer
8fd35ba574 Apply clippy cleanup in startup module 2026-05-19 17:14:37 -07:00
Albin Cassirer
dfc173778a Add async subagent startup for thread spawns 2026-05-19 17:14:37 -07:00
13 changed files with 874 additions and 135 deletions

View File

@@ -1514,6 +1514,10 @@
"MultiAgentV2ConfigToml": {
"additionalProperties": false,
"properties": {
"async_subagent_startup": {
"description": "When `true`, spawned MultiAgentV2 children register their thread before waiting for slow startup work such as required MCP initialization.",
"type": "boolean"
},
"default_wait_timeout_ms": {
"format": "int64",
"maximum": 3600000.0,

View File

@@ -10,6 +10,9 @@ use crate::context::ContextualUserFragment;
use crate::context::SubagentNotification;
use crate::init_state_db;
use assert_matches::assert_matches;
use codex_config::Constrained;
use codex_config::types::McpServerConfig;
use codex_config::types::McpServerTransportConfig;
use codex_features::Feature;
use codex_login::CodexAuth;
use codex_protocol::AgentPath;
@@ -32,6 +35,7 @@ use codex_thread_store::LocalThreadStore;
use codex_thread_store::LocalThreadStoreConfig;
use codex_thread_store::ThreadStore;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use tempfile::TempDir;
use tokio::time::Duration;
use tokio::time::sleep;
@@ -74,6 +78,46 @@ fn assistant_message(text: &str, phase: Option<MessagePhase>) -> ResponseItem {
}
}
/// Builds the configuration for a required stdio MCP server that never
/// completes initialization before its startup timeout.
///
/// The launched program only idles (`sleep 5`, or a five-count `ping` on
/// Windows) while the startup timeout is 150ms, so tests can use it to force
/// the async startup path to surface a delayed failure from the child thread.
fn blocking_required_stdio_mcp_server() -> McpServerConfig {
    // Pick a shell one-liner per platform that just keeps the process alive.
    let (program, argv) = if cfg!(windows) {
        ("cmd", ["/C", "ping -n 5 127.0.0.1 > NUL"])
    } else {
        ("sh", ["-c", "sleep 5"])
    };
    let transport = McpServerTransportConfig::Stdio {
        command: program.to_string(),
        args: argv.iter().map(|arg| (*arg).to_string()).collect(),
        env: None,
        env_vars: Vec::new(),
        cwd: None,
    };

    McpServerConfig {
        transport,
        experimental_environment: None,
        enabled: true,
        // `required` is what turns the startup timeout into a startup failure.
        required: true,
        supports_parallel_tool_calls: false,
        disabled_reason: None,
        startup_timeout_sec: Some(Duration::from_millis(150)),
        tool_timeout_sec: None,
        default_tools_approval_mode: None,
        enabled_tools: None,
        disabled_tools: None,
        scopes: None,
        oauth: None,
        oauth_resource: None,
        tools: HashMap::new(),
    }
}
fn spawn_agent_call(call_id: &str) -> ResponseItem {
ResponseItem::FunctionCall {
id: None,
@@ -123,6 +167,45 @@ impl AgentControlHarness {
}
}
/// Spawns a thread-spawn child at `worker_path` under `parent_thread_id` with
/// MultiAgentV2 and async subagent startup enabled, configured with a single
/// required MCP server (`slow_required`) that cannot initialize in time.
///
/// The child's first op is a turn-triggering inter-agent message from the
/// root. Panics if the spawn itself fails.
async fn spawn_async_child_with_blocking_required_mcp(
    harness: &AgentControlHarness,
    parent_thread_id: ThreadId,
    worker_path: AgentPath,
) -> LiveAgent {
    let mut child_config = harness.config.clone();
    let _ = child_config.features.enable(Feature::MultiAgentV2);
    child_config.multi_agent_v2.async_subagent_startup = true;

    let mut servers = HashMap::new();
    servers.insert(
        "slow_required".to_string(),
        blocking_required_stdio_mcp_server(),
    );
    child_config.mcp_servers = Constrained::allow_any(servers);

    let initial_op = Op::InterAgentCommunication {
        communication: InterAgentCommunication::new(
            AgentPath::root(),
            worker_path.clone(),
            Vec::new(),
            "hello worker".to_string(),
            /*trigger_turn*/ true,
        ),
    };
    let child_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
        parent_thread_id,
        depth: 1,
        agent_path: Some(worker_path),
        agent_nickname: None,
        agent_role: Some("explorer".to_string()),
    });

    harness
        .control
        .spawn_agent_with_metadata(
            child_config,
            initial_op,
            Some(child_source),
            SpawnAgentOptions::default(),
        )
        .await
        .expect("child spawn should succeed")
}
fn has_subagent_notification(history_items: &[ResponseItem]) -> bool {
history_items.iter().any(|item| {
let ResponseItem::Message { role, content, .. } = item else {
@@ -199,6 +282,81 @@ async fn wait_for_subagent_notification(parent_thread: &Arc<CodexThread>) -> boo
timeout(Duration::from_secs(10), wait).await.is_ok()
}
/// Polls the parent's history every 25ms until it contains a subagent
/// notification.
///
/// Returns `true` if one shows up within `timeout_duration`, `false` if the
/// timeout elapses first.
async fn wait_for_subagent_notification_with_timeout(
    parent_thread: &Arc<CodexThread>,
    timeout_duration: Duration,
) -> bool {
    let poll_until_notified = async {
        loop {
            // Snapshot the items in their own scope so nothing from the
            // history clone outlives the check.
            let items = {
                let history = parent_thread.codex.session.clone_history().await;
                history.raw_items().to_vec()
            };
            if has_subagent_notification(&items) {
                break;
            }
            sleep(Duration::from_millis(25)).await;
        }
    };
    timeout(timeout_duration, poll_until_notified).await.is_ok()
}
/// Polls every 25ms until the parent's input queue reports pending input for
/// its active turn.
///
/// Returns `true` once input is pending, or `false` after ten seconds.
async fn wait_for_pending_parent_inbox(parent_thread: &Arc<CodexThread>) -> bool {
    let session = &parent_thread.codex.session;
    let poll_until_pending = async {
        while !session
            .input_queue
            .has_pending_input(&session.active_turn)
            .await
        {
            sleep(Duration::from_millis(25)).await;
        }
    };
    timeout(Duration::from_secs(10), poll_until_pending)
        .await
        .is_ok()
}
/// Polls the harness's captured ops every 25ms until one of them is the
/// child's startup-failure report to the parent.
///
/// A matching op is an inter-agent communication submitted to
/// `parent_thread_id`, authored by `worker_path`, addressed to the root,
/// without `trigger_turn`, whose content is a subagent notification mentioning
/// the required-MCP failure. Returns `false` if none appears within ten
/// seconds.
async fn wait_for_parent_startup_failure_notification(
    harness: &AgentControlHarness,
    parent_thread_id: ThreadId,
    worker_path: &AgentPath,
) -> bool {
    let poll_until_reported = async {
        loop {
            let reported = harness
                .manager
                .captured_ops()
                .into_iter()
                .any(|(thread_id, op)| match op {
                    Op::InterAgentCommunication { communication }
                        if thread_id == parent_thread_id =>
                    {
                        let from_worker_to_root = communication.author == *worker_path
                            && communication.recipient == AgentPath::root();
                        from_worker_to_root
                            && !communication.trigger_turn
                            && SubagentNotification::matches_text(&communication.content)
                            && communication
                                .content
                                .contains("required MCP servers failed to initialize")
                    }
                    _ => false,
                });
            if reported {
                break;
            }
            sleep(Duration::from_millis(25)).await;
        }
    };
    timeout(Duration::from_secs(10), poll_until_reported)
        .await
        .is_ok()
}
async fn persist_thread_for_tree_resume(thread: &Arc<CodexThread>, message: &str) {
thread
.inject_user_message_without_turn(message.to_string())
@@ -1575,6 +1733,81 @@ async fn multi_agent_v2_completion_ignores_dead_direct_parent() {
assert!(!has_subagent_notification(&root_history_items));
}
/// With async subagent startup, the child must be registered and observable
/// in `PendingInit` before its required MCP server fails, and the eventual
/// failure must reach the parent's mailbox and flip the child to `Errored`.
#[tokio::test]
async fn async_subagent_startup_registers_child_before_required_mcp_failure() {
    let harness = AgentControlHarness::new().await;
    let (parent_thread_id, parent_thread) = harness.start_thread().await;
    let worker_path = AgentPath::root().join("worker").expect("worker path");

    let spawned = spawn_async_child_with_blocking_required_mcp(
        &harness,
        parent_thread_id,
        worker_path.clone(),
    )
    .await;
    assert_eq!(spawned.status, AgentStatus::PendingInit);

    // The child is already registered with the manager, but it has not
    // started a turn while startup is still pending.
    let child_thread = harness
        .manager
        .get_thread(spawned.thread_id)
        .await
        .expect("child thread should exist");
    let child_has_no_active_turn = child_thread
        .codex
        .session
        .active_turn
        .lock()
        .await
        .is_none();
    assert!(child_has_no_active_turn);

    let status_during_startup = harness.control.get_status(spawned.thread_id).await;
    assert_eq!(
        status_during_startup,
        AgentStatus::PendingInit,
        "the child should stay in PendingInit until deferred startup either succeeds or fails"
    );

    let failure_was_submitted =
        wait_for_parent_startup_failure_notification(&harness, parent_thread_id, &worker_path)
            .await;
    assert!(
        failure_was_submitted,
        "startup failure should enqueue a mailbox notification for the parent"
    );

    // MultiAgentV2 children report back through the parent's mailbox, so the
    // notification should land as pending input rather than as an immediate
    // history item.
    let parent_inbox_has_input = wait_for_pending_parent_inbox(&parent_thread).await;
    assert!(
        parent_inbox_has_input,
        "parent should receive a startup failure notification from the child"
    );

    let final_status = harness.control.get_status(spawned.thread_id).await;
    assert_matches!(
        final_status,
        AgentStatus::Errored(message)
            if message.contains("required MCP servers failed to initialize")
    );
}
#[tokio::test]
async fn async_subagent_startup_does_not_report_failure_after_child_shutdown() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let worker_path = AgentPath::root().join("worker").expect("worker path");
let spawned =
spawn_async_child_with_blocking_required_mcp(&harness, parent_thread_id, worker_path).await;
harness
.control
.shutdown_live_agent(spawned.thread_id)
.await
.expect("shutdown should succeed");
assert!(
!wait_for_subagent_notification_with_timeout(&parent_thread, Duration::from_millis(750))
.await,
"shutdown should suppress late async-startup failure notifications"
);
}
#[tokio::test]
async fn multi_agent_v2_completion_queues_message_for_direct_parent() {
let harness = AgentControlHarness::new().await;

View File

@@ -10186,6 +10186,7 @@ async fn multi_agent_v2_config_from_feature_table() -> std::io::Result<()> {
codex_home.path().join(CONFIG_TOML_FILE),
r#"[features.multi_agent_v2]
enabled = true
async_subagent_startup = true
max_concurrent_threads_per_session = 5
min_wait_timeout_ms = 2500
max_wait_timeout_ms = 120000
@@ -10207,6 +10208,7 @@ non_code_mode_only = true
.await?;
assert!(config.features.enabled(Feature::MultiAgentV2));
assert!(config.multi_agent_v2.async_subagent_startup);
assert_eq!(config.multi_agent_v2.max_concurrent_threads_per_session, 5);
assert_eq!(config.multi_agent_v2.min_wait_timeout_ms, 2500);
assert_eq!(config.multi_agent_v2.max_wait_timeout_ms, 120000);
@@ -10243,6 +10245,7 @@ async fn profile_multi_agent_v2_config_overrides_base() -> std::io::Result<()> {
r#"profile = "no_hint"
[features.multi_agent_v2]
async_subagent_startup = false
max_concurrent_threads_per_session = 4
min_wait_timeout_ms = 3000
max_wait_timeout_ms = 120000
@@ -10256,6 +10259,7 @@ hide_spawn_agent_metadata = true
non_code_mode_only = false
[profiles.no_hint.features.multi_agent_v2]
async_subagent_startup = true
max_concurrent_threads_per_session = 6
min_wait_timeout_ms = 1500
max_wait_timeout_ms = 90000
@@ -10276,6 +10280,7 @@ non_code_mode_only = true
.build()
.await?;
assert!(config.multi_agent_v2.async_subagent_startup);
assert_eq!(config.multi_agent_v2.max_concurrent_threads_per_session, 6);
assert_eq!(config.multi_agent_v2.min_wait_timeout_ms, 1500);
assert_eq!(config.multi_agent_v2.max_wait_timeout_ms, 90000);
@@ -10320,6 +10325,7 @@ enabled = true
.await?;
assert_eq!(config.multi_agent_v2.max_concurrent_threads_per_session, 4);
assert!(!config.multi_agent_v2.async_subagent_startup);
assert_eq!(config.multi_agent_v2.min_wait_timeout_ms, 10_000);
assert_eq!(config.multi_agent_v2.max_wait_timeout_ms, 3_600_000);
assert_eq!(config.multi_agent_v2.default_wait_timeout_ms, 30_000);

View File

@@ -958,6 +958,9 @@ pub struct Config {
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct MultiAgentV2Config {
/// Register spawned MultiAgentV2 children before slow MCP startup
/// finishes so the parent is not blocked on tool initialization.
pub async_subagent_startup: bool,
pub max_concurrent_threads_per_session: usize,
pub min_wait_timeout_ms: i64,
pub max_wait_timeout_ms: i64,
@@ -974,6 +977,7 @@ pub struct MultiAgentV2Config {
impl Default for MultiAgentV2Config {
fn default() -> Self {
Self {
async_subagent_startup: false,
max_concurrent_threads_per_session:
DEFAULT_MULTI_AGENT_V2_MAX_CONCURRENT_THREADS_PER_SESSION,
min_wait_timeout_ms: DEFAULT_MULTI_AGENT_V2_MIN_WAIT_TIMEOUT_MS,
@@ -2165,6 +2169,10 @@ fn resolve_multi_agent_v2_config(
let profile = multi_agent_v2_toml_config(config_profile.features.as_ref());
let default = MultiAgentV2Config::default();
let async_subagent_startup = profile
.and_then(|config| config.async_subagent_startup)
.or_else(|| base.and_then(|config| config.async_subagent_startup))
.unwrap_or(default.async_subagent_startup);
let max_concurrent_threads_per_session = profile
.and_then(|config| config.max_concurrent_threads_per_session)
.or_else(|| base.and_then(|config| config.max_concurrent_threads_per_session))
@@ -2215,6 +2223,7 @@ fn resolve_multi_agent_v2_config(
.unwrap_or(default.non_code_mode_only);
MultiAgentV2Config {
async_subagent_startup,
max_concurrent_threads_per_session,
min_wait_timeout_ms,
max_wait_timeout_ms,

View File

@@ -9,6 +9,7 @@ use tracing::Instrument;
use tracing::debug_span;
use tracing::info_span;
use crate::session::SessionStartupState;
use crate::session::SteerInputError;
use crate::session::session::Session;
use crate::session::session::SessionSettingsUpdate;
@@ -183,6 +184,38 @@ async fn thread_settings_applied_event(sess: &Session) -> EventMsg {
})
}
/// Gate for handlers that would start a turn: returns `true` (after emitting
/// an error event tagged with `sub_id`) when the session is still starting up
/// or has failed to start, and `false` when the session is ready.
async fn reject_turn_start_until_session_ready(sess: &Arc<Session>, sub_id: &str) -> bool {
    // Async subagent startup only defers mailbox-triggered work today. Any
    // direct turn-starting request that arrives before startup completes is
    // rejected explicitly so it cannot race the child into running with an
    // incomplete tool set or partially reconstructed history.
    let rejection_message = match sess.startup_state() {
        SessionStartupState::Ready => return false,
        SessionStartupState::Initializing => "thread startup is still in progress".to_string(),
        SessionStartupState::Failed(reason) => {
            format!("thread failed to initialize: {reason}")
        }
    };

    // Both rejecting states report through the same error event shape.
    sess.send_event_raw(Event {
        id: sub_id.to_string(),
        msg: EventMsg::Error(ErrorEvent {
            message: rejection_message,
            codex_error_info: Some(CodexErrorInfo::Other),
        }),
    })
    .await;
    true
}
pub(super) async fn user_input_or_turn_inner(
sess: &Arc<Session>,
sub_id: String,
@@ -208,6 +241,10 @@ pub(super) async fn user_input_or_turn_inner(
updates.final_output_json_schema = Some(final_output_json_schema);
updates.environments = environments;
if reject_turn_start_until_session_ready(sess, &sub_id).await {
return;
}
let Ok(current_context) = sess.new_turn_with_sub_id(sub_id.clone(), updates).await else {
// new_turn_with_sub_id already emits the error event.
return;
@@ -325,6 +362,10 @@ pub async fn run_user_shell_command(sess: &Arc<Session>, sub_id: String, command
return;
}
if reject_turn_start_until_session_ready(sess, &sub_id).await {
return;
}
let turn_context = sess.new_default_turn_with_sub_id(sub_id).await;
sess.spawn_task(
Arc::clone(&turn_context),
@@ -458,6 +499,10 @@ pub async fn reload_user_config(sess: &Arc<Session>) {
}
pub async fn compact(sess: &Arc<Session>, sub_id: String) {
if reject_turn_start_until_session_ready(sess, &sub_id).await {
return;
}
let turn_context = sess.new_default_turn_with_sub_id(sub_id).await;
sess.spawn_task(
@@ -606,6 +651,7 @@ pub async fn set_thread_memory_mode(sess: &Arc<Session>, sub_id: String, mode: T
async fn shutdown_session_runtime(sess: &Arc<Session>) {
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
sess.cancel_mcp_startup().await;
let _ = sess.conversation.shutdown().await;
sess.services
.unified_exec_manager
@@ -683,6 +729,10 @@ pub async fn review(
sub_id: String,
review_request: ReviewRequest,
) {
if reject_turn_start_until_session_ready(sess, &sub_id).await {
return;
}
let turn_context = sess.new_default_turn_with_sub_id(sub_id.clone()).await;
sess.maybe_emit_unknown_model_warning_for_turn(turn_context.as_ref())
.await;

View File

@@ -200,6 +200,7 @@ mod review;
mod rollout_reconstruction;
#[allow(clippy::module_inception)]
pub(crate) mod session;
mod startup;
pub(crate) mod turn;
pub(crate) mod turn_context;
use self::config_lock::export_config_lock_if_configured;
@@ -214,6 +215,7 @@ use self::session::AppServerClientMetadata;
use self::session::Session;
use self::session::SessionConfiguration;
pub(crate) use self::session::SessionSettingsUpdate;
pub(crate) use self::startup::SessionStartupState;
#[cfg(test)]
use self::turn::AssistantMessageStreamParsers;
#[cfg(test)]
@@ -652,7 +654,7 @@ impl Codex {
tx_event.clone(),
agent_status_tx.clone(),
conversation_history,
session_source_clone,
session_source_clone.clone(),
skills_manager,
plugins_manager,
mcp_manager.clone(),
@@ -670,6 +672,15 @@ impl Codex {
map_session_init_error(&e, &config.codex_home)
})?;
let thread_id = session.conversation_id;
if !matches!(
session_source_clone,
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. })
) {
// Thread-spawn children must wait until ThreadManager registration
// finishes. Other session sources have no external registration
// boundary, so they can kick off any deferred startup immediately.
session.start_deferred_startup_if_needed().await;
}
// This task will run until Op::Shutdown is received.
let session_for_loop = Arc::clone(&session);
@@ -1661,8 +1672,8 @@ impl Session {
return;
}
self.forward_child_completion_to_parent(
turn_context,
self.forward_child_status_to_parent(
Some(turn_context),
*parent_thread_id,
child_agent_path,
status,
@@ -1670,10 +1681,14 @@ impl Session {
.await;
}
/// Sends the standard completion envelope from a spawned MultiAgentV2 child to its parent.
async fn forward_child_completion_to_parent(
/// Sends a status envelope from a spawned MultiAgentV2 child to its parent.
///
/// Turn completions carry a turn context so rollout trace interactions can
/// be recorded. Startup failures happen before any child turn exists, so
/// they reuse the same parent notification path without trace recording.
async fn forward_child_status_to_parent(
&self,
turn_context: &TurnContext,
turn_context: Option<&TurnContext>,
parent_thread_id: ThreadId,
child_agent_path: &codex_protocol::AgentPath,
status: AgentStatus,
@@ -1710,7 +1725,7 @@ impl Session {
debug!("failed to notify parent thread {parent_thread_id}: {err}");
return;
}
if let Some(message) = trace_message {
if let (Some(message), Some(turn_context)) = (trace_message, turn_context) {
self.services
.rollout_thread_trace
.record_agent_result_interaction(
@@ -1725,6 +1740,37 @@ impl Session {
}
}
/// Forwards a final startup `status` to the parent of a spawned
/// MultiAgentV2 child.
///
/// Does nothing unless MultiAgentV2 is enabled, this session is a
/// thread-spawn subagent with an agent path, and `status` is final. No turn
/// context is passed along because no child turn exists at startup.
async fn maybe_notify_parent_of_startup_failure(&self, status: AgentStatus) {
    if !self.enabled(Feature::MultiAgentV2) {
        return;
    }

    // The state guard is a temporary of this statement, so the lock is
    // released before the parent is contacted.
    let source = self
        .state
        .lock()
        .await
        .session_configuration
        .session_source
        .clone();
    let (parent_thread_id, child_agent_path) = match source {
        SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
            parent_thread_id,
            agent_path: Some(agent_path),
            ..
        }) => (parent_thread_id, agent_path),
        _ => return,
    };

    if !is_final(&status) {
        return;
    }

    self.forward_child_status_to_parent(
        /*turn_context*/ None,
        parent_thread_id,
        &child_agent_path,
        status,
    )
    .await;
}
async fn maybe_mirror_event_text_to_realtime(&self, msg: &EventMsg) {
let Some(text) = realtime_text_for_event(msg) else {
return;

View File

@@ -1,4 +1,6 @@
use super::input_queue::InputQueue;
use super::startup::PendingSessionStartup;
use super::startup::async_subagent_startup_enabled;
use super::*;
use crate::goals::GoalRuntimeState;
use crate::state::ActiveTurn;
@@ -18,6 +20,13 @@ pub(crate) struct Session {
pub(crate) installation_id: String,
pub(super) tx_event: Sender<Event>,
pub(super) agent_status: watch::Sender<AgentStatus>,
/// Gates model-driven turn creation until startup has either finished or
/// failed decisively. Spawned subagents can therefore exist early without
/// racing the model against incomplete MCP setup.
pub(super) startup_state: watch::Sender<SessionStartupState>,
/// Holds deferred startup work until ThreadManager has registered the
/// thread and made the child visible to parent-facing APIs.
pub(super) deferred_startup: Mutex<Option<PendingSessionStartup>>,
pub(super) out_of_band_elicitation_paused: watch::Sender<bool>,
pub(super) state: Mutex<SessionState>,
/// Serializes rebuild/apply cycles for the running proxy; each cycle
@@ -418,10 +427,6 @@ impl Session {
#[instrument(name = "session_init", level = "info", skip_all)]
#[allow(clippy::too_many_arguments)]
#[expect(
clippy::await_holding_invalid_type,
reason = "session initialization must serialize access through session-owned manager guards"
)]
pub(crate) async fn new(
mut session_configuration: SessionConfiguration,
config: Arc<Config>,
@@ -450,6 +455,8 @@ impl Session {
session_configuration.provider
);
let forked_from_id = initial_history.forked_from_id();
let async_subagent_startup =
async_subagent_startup_enabled(config.as_ref(), &session_configuration.session_source);
let event_persistence_mode = if session_configuration.persist_extended_history {
ThreadEventPersistenceMode::Extended
@@ -951,12 +958,19 @@ impl Session {
.set_window_generation(window_generation);
let (out_of_band_elicitation_paused, _out_of_band_elicitation_paused_rx) =
watch::channel(false);
let (startup_state, _startup_state_rx) = watch::channel(if async_subagent_startup {
SessionStartupState::Initializing
} else {
SessionStartupState::Ready
});
let sess = Arc::new(Session {
conversation_id: thread_id,
installation_id,
tx_event: tx_event.clone(),
agent_status,
startup_state,
deferred_startup: Mutex::new(None),
out_of_band_elicitation_paused,
state: Mutex::new(state),
managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1),
@@ -1009,116 +1023,6 @@ impl Session {
for event in events {
sess.send_event_raw(event).await;
}
let mut required_mcp_servers: Vec<String> = mcp_servers
.iter()
.filter(|(_, server)| server.enabled() && server.required())
.map(|(name, _)| name.clone())
.collect();
required_mcp_servers.sort();
let enabled_mcp_server_count =
mcp_servers.values().filter(|server| server.enabled()).count();
let required_mcp_server_count = required_mcp_servers.len();
let tool_plugin_provenance = mcp_manager.tool_plugin_provenance(config.as_ref()).await;
let host_owned_codex_apps_enabled = config
.features
.apps_enabled_for_auth(auth.as_ref().is_some_and(|auth| auth.uses_codex_backend()));
let client_elicitation_capability = if config.features.enabled(Feature::AuthElicitation) {
ElicitationCapability {
form: Some(FormElicitationCapability::default()),
url: Some(UrlElicitationCapability::default()),
}
} else {
ElicitationCapability::default()
};
{
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
cancel_guard.cancel();
*cancel_guard = CancellationToken::new();
}
let turn_environment = crate::environment_selection::resolve_environment_selections(
sess.services.environment_manager.as_ref(),
&session_configuration.environments,
)
.map_err(|err| {
CodexErr::InvalidRequest(err.to_string().replace(
"unknown turn environment id",
"unknown stored MCP environment id",
))
})?
.primary()
.cloned();
let mcp_runtime_environment = match turn_environment {
Some(turn_environment) => McpRuntimeEnvironment::new(
Some(Arc::clone(&turn_environment.environment)),
sess.services.environment_manager.try_local_environment(),
turn_environment.cwd.to_path_buf(),
),
None => McpRuntimeEnvironment::new(
sess.services.environment_manager.default_or_local_environment(),
sess.services.environment_manager.try_local_environment(),
session_configuration.cwd.to_path_buf(),
),
};
let (mcp_connection_manager, cancel_token) = McpConnectionManager::new(
&mcp_servers,
config.mcp_oauth_credentials_store_mode,
auth_statuses.clone(),
&session_configuration.approval_policy,
INITIAL_SUBMIT_ID.to_owned(),
tx_event.clone(),
session_configuration.permission_profile(),
mcp_runtime_environment,
config.codex_home.to_path_buf(),
codex_apps_tools_cache_key(auth),
host_owned_codex_apps_enabled,
client_elicitation_capability,
tool_plugin_provenance,
auth,
Some(sess.mcp_elicitation_reviewer()),
)
.instrument(info_span!(
"session_init.mcp_manager_init",
otel.name = "session_init.mcp_manager_init",
session_init.enabled_mcp_server_count = enabled_mcp_server_count,
session_init.required_mcp_server_count = required_mcp_server_count,
))
.await;
{
let mut manager_guard = sess.services.mcp_connection_manager.write().await;
*manager_guard = mcp_connection_manager;
}
{
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
if cancel_guard.is_cancelled() {
cancel_token.cancel();
}
*cancel_guard = cancel_token;
}
if !required_mcp_servers.is_empty() {
let failures = sess
.services
.mcp_connection_manager
.read()
.await
.required_startup_failures(&required_mcp_servers)
.instrument(info_span!(
"session_init.required_mcp_wait",
otel.name = "session_init.required_mcp_wait",
session_init.required_mcp_server_count = required_mcp_server_count,
))
.await;
if !failures.is_empty() {
let details = failures
.iter()
.map(|failure| format!("{}: {}", failure.server, failure.error))
.collect::<Vec<_>>()
.join("; ");
anyhow::bail!("required MCP servers failed to initialize: {details}");
}
}
sess.schedule_startup_prewarm(session_configuration.base_instructions.clone())
.await;
let session_start_source = match &initial_history {
InitialHistory::Resumed(_) => codex_hooks::SessionStartSource::Resume,
InitialHistory::New | InitialHistory::Forked(_) => {
@@ -1126,15 +1030,27 @@ impl Session {
}
InitialHistory::Cleared => codex_hooks::SessionStartSource::Clear,
};
// record_initial_history can emit events. We record only after the SessionConfiguredEvent is emitted.
Box::pin(sess.record_initial_history(initial_history)).await;
{
let mut state = sess.state.lock().await;
state.set_pending_session_start_source(Some(session_start_source));
let pending_startup = PendingSessionStartup {
initial_history,
session_start_source,
auth: auth.cloned(),
mcp_servers,
auth_statuses,
base_instructions: session_configuration.base_instructions.clone(),
};
if async_subagent_startup {
{
let mut deferred_startup = sess.deferred_startup.lock().await;
// Stash the slow startup work so ThreadManager can release
// `spawn_agent` as soon as the child is registered, then
// resume initialization in the background.
*deferred_startup = Some(pending_startup);
}
Ok(sess)
} else {
Box::pin(sess.finish_initial_startup(pending_startup)).await?;
Ok(sess)
}
Ok(sess)
}
.await;
match session_result {

View File

@@ -0,0 +1,307 @@
use super::*;
use codex_mcp::EffectiveMcpServer;
use codex_mcp::McpAuthStatusEntry;
/// Tracks whether a session may start model-driven turn work yet.
///
/// Async subagent startup intentionally creates the child thread before slow
/// MCP initialization finishes. The child therefore needs a small readiness
/// gate so external work can stay queued until the session has the correct
/// tool universe and reconstructed history.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum SessionStartupState {
/// Startup has completed, or was never deferred. Turn-starting requests
/// are accepted in this state.
Ready,
/// Deferred startup has not finished yet. Turn-starting requests are
/// rejected with a "still in progress" error while in this state.
Initializing,
/// Deferred startup failed. Carries the error text, which is echoed back
/// when later turn-starting requests are rejected.
Failed(String),
}
/// Captures the work deferred until after a session has been registered.
///
/// This keeps the async startup seam explicit: thread creation and
/// `SessionConfigured` happen first, while slow MCP initialization and initial
/// history reconstruction can either complete synchronously or continue in the
/// background for spawned subagents.
pub(super) struct PendingSessionStartup {
/// History handed to `record_initial_history` once MCP startup succeeds.
pub(super) initial_history: InitialHistory,
/// Stored on session state as the pending session-start source.
pub(super) session_start_source: codex_hooks::SessionStartSource,
/// Owned copy of the session's auth, used when building the MCP
/// connection manager.
pub(super) auth: Option<CodexAuth>,
/// MCP servers to connect; the enabled + required ones gate startup.
pub(super) mcp_servers: HashMap<String, EffectiveMcpServer>,
/// Per-server auth status entries passed to the MCP connection manager.
pub(super) auth_statuses: HashMap<String, McpAuthStatusEntry>,
/// Base instructions passed to the startup prewarm.
pub(super) base_instructions: String,
}
/// Decides whether a session should defer its slow startup work.
///
/// Returns `true` only when the MultiAgentV2 feature is enabled, the
/// `async_subagent_startup` setting is on, and `session_source` is a
/// thread-spawn subagent that carries an agent path.
pub(super) fn async_subagent_startup_enabled(
    config: &Config,
    session_source: &SessionSource,
) -> bool {
    // Only spawned MultiAgentV2 children have the parent-notification and
    // queued-mailbox semantics needed to safely surface delayed startup
    // failures after the thread has been registered.
    if !config.features.enabled(Feature::MultiAgentV2) {
        return false;
    }
    if !config.multi_agent_v2.async_subagent_startup {
        return false;
    }
    matches!(
        session_source,
        SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
            agent_path: Some(_),
            ..
        })
    )
}
impl Session {
pub(crate) fn startup_state(&self) -> SessionStartupState {
self.startup_state.borrow().clone()
}
pub(crate) fn startup_ready(&self) -> bool {
matches!(self.startup_state(), SessionStartupState::Ready)
}
/// Starts any deferred startup work after the thread has been registered.
pub(crate) async fn start_deferred_startup_if_needed(self: &Arc<Self>) {
let startup = {
let mut deferred_startup = self.deferred_startup.lock().await;
deferred_startup.take()
};
if let Some(startup) = startup {
// Only one caller should ever observe pending startup work, but the
// explicit take keeps repeated calls harmless and makes the handoff
// point easy to reason about.
self.spawn_initial_startup(startup);
}
}
fn spawn_initial_startup(self: &Arc<Self>, startup: PendingSessionStartup) {
let session = Arc::clone(self);
let thread_id = session.thread_id();
tokio::spawn(
async move {
if let Err(err) = session.finish_initial_startup(startup).await {
session.handle_initial_startup_failure(err).await;
}
}
.instrument(tracing::info_span!(
"session_init.deferred_startup",
otel.name = "session_init.deferred_startup",
thread_id = %thread_id,
)),
);
}
pub(super) async fn finish_initial_startup(
self: &Arc<Self>,
startup: PendingSessionStartup,
) -> anyhow::Result<()> {
let PendingSessionStartup {
initial_history,
session_start_source,
auth,
mcp_servers,
auth_statuses,
base_instructions,
} = startup;
let config = self.get_config().await;
let session_configuration = {
let state = self.state.lock().await;
state.session_configuration.clone()
};
let mut required_mcp_servers: Vec<String> = mcp_servers
.iter()
.filter(|(_, server)| server.enabled() && server.required())
.map(|(name, _)| name.clone())
.collect();
required_mcp_servers.sort();
let enabled_mcp_server_count = mcp_servers
.values()
.filter(|server| server.enabled())
.count();
let required_mcp_server_count = required_mcp_servers.len();
let tool_plugin_provenance = self
.services
.mcp_manager
.tool_plugin_provenance(config.as_ref())
.await;
let host_owned_codex_apps_enabled = config.features.apps_enabled_for_auth(
auth.as_ref()
.is_some_and(codex_login::CodexAuth::uses_codex_backend),
);
let client_elicitation_capability = if config.features.enabled(Feature::AuthElicitation) {
ElicitationCapability {
form: Some(FormElicitationCapability::default()),
url: Some(UrlElicitationCapability::default()),
}
} else {
ElicitationCapability::default()
};
{
let mut cancel_guard = self.services.mcp_startup_cancellation_token.lock().await;
cancel_guard.cancel();
*cancel_guard = CancellationToken::new();
}
let turn_environment = crate::environment_selection::resolve_environment_selections(
self.services.environment_manager.as_ref(),
&session_configuration.environments,
)
.map_err(|err| {
CodexErr::InvalidRequest(err.to_string().replace(
"unknown turn environment id",
"unknown stored MCP environment id",
))
})?
.primary()
.cloned();
let mcp_runtime_environment = match turn_environment {
Some(turn_environment) => McpRuntimeEnvironment::new(
Some(Arc::clone(&turn_environment.environment)),
self.services.environment_manager.try_local_environment(),
turn_environment.cwd.to_path_buf(),
),
None => McpRuntimeEnvironment::new(
self.services
.environment_manager
.default_or_local_environment(),
self.services.environment_manager.try_local_environment(),
session_configuration.cwd.to_path_buf(),
),
};
let (mcp_connection_manager, cancel_token) = McpConnectionManager::new(
&mcp_servers,
config.mcp_oauth_credentials_store_mode,
auth_statuses,
&session_configuration.approval_policy,
INITIAL_SUBMIT_ID.to_owned(),
self.get_tx_event(),
session_configuration.permission_profile(),
mcp_runtime_environment,
config.codex_home.to_path_buf(),
codex_apps_tools_cache_key(auth.as_ref()),
host_owned_codex_apps_enabled,
client_elicitation_capability,
tool_plugin_provenance,
auth.as_ref(),
Some(self.mcp_elicitation_reviewer()),
)
.instrument(tracing::info_span!(
"session_init.mcp_manager_init",
otel.name = "session_init.mcp_manager_init",
session_init.enabled_mcp_server_count = enabled_mcp_server_count,
session_init.required_mcp_server_count = required_mcp_server_count,
))
.await;
{
let mut cancel_guard = self.services.mcp_startup_cancellation_token.lock().await;
if cancel_guard.is_cancelled() {
cancel_token.cancel();
}
*cancel_guard = cancel_token;
}
if !required_mcp_servers.is_empty() {
let failures = mcp_connection_manager
.required_startup_failures(&required_mcp_servers)
.instrument(tracing::info_span!(
"session_init.required_mcp_wait",
otel.name = "session_init.required_mcp_wait",
session_init.required_mcp_server_count = required_mcp_server_count,
))
.await;
if !failures.is_empty() {
let details = failures
.iter()
.map(|failure| format!("{}: {}", failure.server, failure.error))
.collect::<Vec<_>>()
.join("; ");
anyhow::bail!("required MCP servers failed to initialize: {details}");
}
}
{
// Publish the live manager only after the required-server gate has
// settled so the rest of the session never observes a half-ready
// manager through the shared RwLock.
let mut manager_guard = self.services.mcp_connection_manager.write().await;
*manager_guard = mcp_connection_manager;
}
self.schedule_startup_prewarm(base_instructions).await;
// `record_initial_history` can emit events, so keep it after
// `SessionConfigured` regardless of whether startup is synchronous or
// deferred.
Box::pin(self.record_initial_history(initial_history)).await;
{
let mut state = self.state.lock().await;
state.set_pending_session_start_source(Some(session_start_source));
}
if matches!(self.agent_status.borrow().clone(), AgentStatus::Shutdown) {
return Ok(());
}
self.startup_state.send_replace(SessionStartupState::Ready);
self.maybe_start_turn_for_pending_work().await;
Ok(())
}
/// Records and reports a failure of the initial startup sequence.
///
/// Does nothing when the agent status is already `Shutdown`. Otherwise the
/// error is rendered to text once and fanned out three ways: the startup
/// state becomes `Failed`, an `Error` event tagged with `INITIAL_SUBMIT_ID`
/// is emitted, and the parent-notification hook is invoked with an
/// `Errored` status carrying the same text.
async fn handle_initial_startup_failure(self: &Arc<Self>, error: anyhow::Error) {
    let already_shut_down = matches!(self.agent_status.borrow().clone(), AgentStatus::Shutdown);
    if already_shut_down {
        return;
    }

    let reason = error.to_string();

    // Flip the startup state first so it already reads `Failed` by the time
    // the error event below is sent.
    self.startup_state
        .send_replace(SessionStartupState::Failed(reason.clone()));

    let failure_event = Event {
        id: INITIAL_SUBMIT_ID.to_owned(),
        msg: EventMsg::Error(ErrorEvent {
            message: reason.clone(),
            codex_error_info: Some(CodexErrorInfo::Other),
        }),
    };
    self.send_event_raw(failure_event).await;

    // The last consumer takes ownership of the rendered message.
    let errored_status = AgentStatus::Errored(reason);
    self.maybe_notify_parent_of_startup_failure(errored_status)
        .await;
}
/// Test-only hook that overwrites the session's startup state with `state`.
#[cfg(test)]
pub(crate) fn set_startup_state_for_tests(&self, state: SessionStartupState) {
    // `send_replace` hands back the state it displaced; tests have no use for it.
    let _previous_state = self.startup_state.send_replace(state);
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ConfigBuilder;
    use codex_protocol::AgentPath;

    /// Builds a depth-1 thread-spawn subagent source with the "explorer" role
    /// whose only varying field is `agent_path`.
    fn thread_spawn_source(agent_path: Option<AgentPath>) -> SessionSource {
        SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
            parent_thread_id: ThreadId::new(),
            depth: 1,
            agent_path,
            agent_nickname: None,
            agent_role: Some("explorer".to_string()),
        })
    }

    /// With MultiAgentV2 and `async_subagent_startup` both switched on, the
    /// gate accepts a spawned child that carries an agent path and rejects
    /// one that does not.
    #[tokio::test]
    async fn async_subagent_startup_requires_a_multi_agent_v2_pathful_child() {
        let mut config = ConfigBuilder::default()
            .build()
            .await
            .expect("default test config should load");
        let _ = config.features.enable(Feature::MultiAgentV2);
        config.multi_agent_v2.async_subagent_startup = true;

        let worker_path = AgentPath::root().join("worker").expect("worker path");
        let pathful_child = thread_spawn_source(Some(worker_path));
        assert!(async_subagent_startup_enabled(&config, &pathful_child));

        let pathless_child = thread_spawn_source(/*agent_path*/ None);
        assert!(!async_subagent_startup_enabled(&config, &pathless_child));
    }
}

View File

@@ -10,6 +10,7 @@ use crate::skills::SkillRenderSideEffects;
use crate::skills::render::SkillMetadataBudget;
use crate::test_support::models_manager_with_provider;
use crate::tools::format_exec_output_str;
use assert_matches::assert_matches;
use codex_config::ConfigLayerStack;
use codex_config::ConfigLayerStackOrdering;
use codex_config::LoaderOverrides;
@@ -4327,6 +4328,8 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
installation_id: "11111111-1111-4111-8111-111111111111".to_string(),
tx_event,
agent_status: agent_status_tx,
startup_state: watch::channel(SessionStartupState::Ready).0,
deferred_startup: Mutex::new(None),
out_of_band_elicitation_paused: watch::channel(false).0,
state: Mutex::new(state),
managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1),
@@ -6154,6 +6157,8 @@ where
installation_id: "11111111-1111-4111-8111-111111111111".to_string(),
tx_event,
agent_status: agent_status_tx,
startup_state: watch::channel(SessionStartupState::Ready).0,
deferred_startup: Mutex::new(None),
out_of_band_elicitation_paused: watch::channel(false).0,
state: Mutex::new(state),
managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1),
@@ -6284,6 +6289,130 @@ async fn refresh_mcp_servers_is_deferred_until_next_turn() {
assert!(!new_token.is_cancelled());
}
/// While startup is still `Initializing`, a trigger-turn inter-agent message
/// must neither start a turn nor be dropped: it stays in the mailbox queue.
#[tokio::test]
async fn inter_agent_mail_stays_queued_while_startup_is_initializing() {
    let (session, _rx_event) = make_session_with_config_and_rx(|config| {
        let _ = config.features.enable(Feature::MultiAgentV2);
    })
    .await
    .expect("session should initialize");
    session.set_startup_state_for_tests(SessionStartupState::Initializing);

    // Root-authored mail addressed to the worker, flagged to trigger a turn.
    let recipient = AgentPath::try_from("/root/worker").expect("worker path");
    let mail = InterAgentCommunication::new(
        AgentPath::root(),
        recipient,
        Vec::new(),
        "hello".to_string(),
        /*trigger_turn*/ true,
    );
    handlers::inter_agent_communication(&session, "startup-mail".to_string(), mail).await;

    let no_active_turn = session.active_turn.lock().await.is_none();
    assert!(
        no_active_turn,
        "startup should not claim an active turn before deferred initialization finishes"
    );

    let mail_still_queued = session.input_queue.has_pending_mailbox_items().await;
    assert!(
        mail_still_queued,
        "trigger-turn mailbox input should stay queued while startup is still in progress"
    );
}
#[tokio::test]
async fn user_input_returns_error_while_startup_is_initializing() {
let (session, rx_event) = make_session_with_config_and_rx(|config| {
let _ = config.features.enable(Feature::MultiAgentV2);
})
.await
.expect("session should initialize");
session.set_startup_state_for_tests(SessionStartupState::Initializing);
handlers::user_input_or_turn(
&session,
"startup-user-input".to_string(),
Op::UserInput {
items: vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
environments: None,
final_output_json_schema: None,
responsesapi_client_metadata: None,
thread_settings: Default::default(),
},
)
.await;
let event = loop {
let event = rx_event.recv().await.expect("startup error event");
if matches!(event.msg, EventMsg::Error(_)) {
break event;
}
};
assert_eq!(event.id, "startup-user-input");
assert_matches!(
event.msg,
EventMsg::Error(ErrorEvent {
message,
codex_error_info: Some(CodexErrorInfo::Other),
}) if message == "thread startup is still in progress"
);
assert!(
session.active_turn.lock().await.is_none(),
"rejected input should not create a turn while startup is blocked"
);
}
#[tokio::test]
async fn user_input_returns_error_after_startup_failure() {
let (session, rx_event) = make_session_with_config_and_rx(|config| {
let _ = config.features.enable(Feature::MultiAgentV2);
})
.await
.expect("session should initialize");
session.set_startup_state_for_tests(SessionStartupState::Failed(
"required MCP servers failed to initialize".to_string(),
));
handlers::user_input_or_turn(
&session,
"startup-user-input".to_string(),
Op::UserInput {
items: vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
environments: None,
final_output_json_schema: None,
responsesapi_client_metadata: None,
thread_settings: Default::default(),
},
)
.await;
let event = loop {
let event = rx_event.recv().await.expect("startup error event");
if matches!(event.msg, EventMsg::Error(_)) {
break event;
}
};
assert_eq!(event.id, "startup-user-input");
assert_matches!(
event.msg,
EventMsg::Error(ErrorEvent {
message,
codex_error_info: Some(CodexErrorInfo::Other),
}) if message == "thread failed to initialize: required MCP servers failed to initialize"
);
assert!(
session.active_turn.lock().await.is_none(),
"failed startup should keep rejecting direct turn creation"
);
}
#[tokio::test]
async fn spawn_task_does_not_update_previous_turn_settings_for_non_run_turn_tasks() {
let (sess, tc, _rx) = make_session_and_context_with_rx().await;

View File

@@ -469,6 +469,13 @@ impl Session {
self: &Arc<Self>,
sub_id: String,
) {
// Async subagent startup registers the child thread before slow MCP
// initialization finishes. Keep mailbox-triggered work queued until
// startup reaches a stable terminal state.
if !self.startup_ready() {
return;
}
if !self
.input_queue
.has_queued_response_items_for_next_turn()

View File

@@ -1253,24 +1253,49 @@ impl ThreadManagerState {
}
};
{
let mut codex = Some(codex);
let thread = {
let mut threads = self.threads.write().await;
if let std::collections::hash_map::Entry::Vacant(e) = threads.entry(thread_id) {
let Some(codex) = codex.take() else {
unreachable!();
};
let thread = Arc::new(CodexThread::new(
codex,
session_configured.clone(),
session_configured.rollout_path.clone(),
session_source,
));
// Make the thread visible to parent-facing queries before any
// deferred startup work can emit status changes or mailbox
// notifications.
e.insert(thread.clone());
return Ok(NewThread {
thread_id,
thread,
session_configured,
});
Some(thread)
} else {
None
}
};
if let Some(thread) = thread {
// The registration above is the semantic handoff point: after this
// call the child exists from the parent's perspective, so delayed
// MCP failures can be reported asynchronously without losing the
// child identity.
thread
.codex
.session
.start_deferred_startup_if_needed()
.await;
return Ok(NewThread {
thread_id,
thread,
session_configured,
});
}
let Some(codex) = codex else {
unreachable!();
};
if let Err(err) = codex.shutdown_and_wait().await {
warn!("failed to shut down duplicate thread {thread_id}: {err}");
}

View File

@@ -9,6 +9,10 @@ use std::collections::BTreeMap;
pub struct MultiAgentV2ConfigToml {
#[serde(skip_serializing_if = "Option::is_none")]
pub enabled: Option<bool>,
/// When `true`, spawned MultiAgentV2 children register their thread before
/// waiting for slow startup work such as required MCP initialization.
#[serde(skip_serializing_if = "Option::is_none")]
pub async_subagent_startup: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
#[schemars(range(min = 1))]
pub max_concurrent_threads_per_session: Option<usize>,

View File

@@ -521,6 +521,7 @@ fn multi_agent_v2_feature_config_deserializes_table() {
r#"
[multi_agent_v2]
enabled = true
async_subagent_startup = false
max_concurrent_threads_per_session = 4
min_wait_timeout_ms = 2500
max_wait_timeout_ms = 120000
@@ -544,6 +545,7 @@ non_code_mode_only = true
features.multi_agent_v2,
Some(crate::FeatureToml::Config(crate::MultiAgentV2ConfigToml {
enabled: Some(true),
async_subagent_startup: Some(false),
max_concurrent_threads_per_session: Some(4),
min_wait_timeout_ms: Some(2500),
max_wait_timeout_ms: Some(120000),
@@ -583,6 +585,7 @@ usage_hint_enabled = false
features_toml.multi_agent_v2,
Some(crate::FeatureToml::Config(crate::MultiAgentV2ConfigToml {
enabled: None,
async_subagent_startup: None,
max_concurrent_threads_per_session: None,
min_wait_timeout_ms: None,
max_wait_timeout_ms: None,