mirror of
https://github.com/openai/codex.git
synced 2026-06-02 03:11:59 +00:00
Compare commits
4 Commits
guinness/r
...
codex/asyn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
28f465ed5d | ||
|
|
119cea0683 | ||
|
|
8fd35ba574 | ||
|
|
dfc173778a |
@@ -1514,6 +1514,10 @@
|
||||
"MultiAgentV2ConfigToml": {
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"async_subagent_startup": {
|
||||
"description": "When `true`, spawned MultiAgentV2 children register their thread before waiting for slow startup work such as required MCP initialization.",
|
||||
"type": "boolean"
|
||||
},
|
||||
"default_wait_timeout_ms": {
|
||||
"format": "int64",
|
||||
"maximum": 3600000.0,
|
||||
|
||||
@@ -10,6 +10,9 @@ use crate::context::ContextualUserFragment;
|
||||
use crate::context::SubagentNotification;
|
||||
use crate::init_state_db;
|
||||
use assert_matches::assert_matches;
|
||||
use codex_config::Constrained;
|
||||
use codex_config::types::McpServerConfig;
|
||||
use codex_config::types::McpServerTransportConfig;
|
||||
use codex_features::Feature;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_protocol::AgentPath;
|
||||
@@ -32,6 +35,7 @@ use codex_thread_store::LocalThreadStore;
|
||||
use codex_thread_store::LocalThreadStoreConfig;
|
||||
use codex_thread_store::ThreadStore;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::HashMap;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
@@ -74,6 +78,46 @@ fn assistant_message(text: &str, phase: Option<MessagePhase>) -> ResponseItem {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a required stdio MCP server that never completes initialization
|
||||
/// before its startup timeout, forcing the async startup path to surface a
|
||||
/// delayed failure from the child thread.
|
||||
fn blocking_required_stdio_mcp_server() -> McpServerConfig {
|
||||
let (command, args) = if cfg!(windows) {
|
||||
(
|
||||
"cmd".to_string(),
|
||||
vec!["/C".to_string(), "ping -n 5 127.0.0.1 > NUL".to_string()],
|
||||
)
|
||||
} else {
|
||||
(
|
||||
"sh".to_string(),
|
||||
vec!["-c".to_string(), "sleep 5".to_string()],
|
||||
)
|
||||
};
|
||||
McpServerConfig {
|
||||
transport: McpServerTransportConfig::Stdio {
|
||||
command,
|
||||
args,
|
||||
env: None,
|
||||
env_vars: Vec::new(),
|
||||
cwd: None,
|
||||
},
|
||||
experimental_environment: None,
|
||||
enabled: true,
|
||||
required: true,
|
||||
supports_parallel_tool_calls: false,
|
||||
disabled_reason: None,
|
||||
startup_timeout_sec: Some(Duration::from_millis(150)),
|
||||
tool_timeout_sec: None,
|
||||
default_tools_approval_mode: None,
|
||||
enabled_tools: None,
|
||||
disabled_tools: None,
|
||||
scopes: None,
|
||||
oauth: None,
|
||||
oauth_resource: None,
|
||||
tools: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_agent_call(call_id: &str) -> ResponseItem {
|
||||
ResponseItem::FunctionCall {
|
||||
id: None,
|
||||
@@ -123,6 +167,45 @@ impl AgentControlHarness {
|
||||
}
|
||||
}
|
||||
|
||||
async fn spawn_async_child_with_blocking_required_mcp(
|
||||
harness: &AgentControlHarness,
|
||||
parent_thread_id: ThreadId,
|
||||
worker_path: AgentPath,
|
||||
) -> LiveAgent {
|
||||
let mut config = harness.config.clone();
|
||||
let _ = config.features.enable(Feature::MultiAgentV2);
|
||||
config.multi_agent_v2.async_subagent_startup = true;
|
||||
config.mcp_servers = Constrained::allow_any(HashMap::from([(
|
||||
"slow_required".to_string(),
|
||||
blocking_required_stdio_mcp_server(),
|
||||
)]));
|
||||
|
||||
harness
|
||||
.control
|
||||
.spawn_agent_with_metadata(
|
||||
config,
|
||||
Op::InterAgentCommunication {
|
||||
communication: InterAgentCommunication::new(
|
||||
AgentPath::root(),
|
||||
worker_path.clone(),
|
||||
Vec::new(),
|
||||
"hello worker".to_string(),
|
||||
/*trigger_turn*/ true,
|
||||
),
|
||||
},
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 1,
|
||||
agent_path: Some(worker_path),
|
||||
agent_nickname: None,
|
||||
agent_role: Some("explorer".to_string()),
|
||||
})),
|
||||
SpawnAgentOptions::default(),
|
||||
)
|
||||
.await
|
||||
.expect("child spawn should succeed")
|
||||
}
|
||||
|
||||
fn has_subagent_notification(history_items: &[ResponseItem]) -> bool {
|
||||
history_items.iter().any(|item| {
|
||||
let ResponseItem::Message { role, content, .. } = item else {
|
||||
@@ -199,6 +282,81 @@ async fn wait_for_subagent_notification(parent_thread: &Arc<CodexThread>) -> boo
|
||||
timeout(Duration::from_secs(10), wait).await.is_ok()
|
||||
}
|
||||
|
||||
async fn wait_for_subagent_notification_with_timeout(
|
||||
parent_thread: &Arc<CodexThread>,
|
||||
timeout_duration: Duration,
|
||||
) -> bool {
|
||||
let wait = async {
|
||||
loop {
|
||||
let history_items = parent_thread
|
||||
.codex
|
||||
.session
|
||||
.clone_history()
|
||||
.await
|
||||
.raw_items()
|
||||
.to_vec();
|
||||
if has_subagent_notification(&history_items) {
|
||||
return true;
|
||||
}
|
||||
sleep(Duration::from_millis(25)).await;
|
||||
}
|
||||
};
|
||||
timeout(timeout_duration, wait).await.is_ok()
|
||||
}
|
||||
|
||||
async fn wait_for_pending_parent_inbox(parent_thread: &Arc<CodexThread>) -> bool {
|
||||
let wait = async {
|
||||
loop {
|
||||
if parent_thread
|
||||
.codex
|
||||
.session
|
||||
.input_queue
|
||||
.has_pending_input(&parent_thread.codex.session.active_turn)
|
||||
.await
|
||||
{
|
||||
return true;
|
||||
}
|
||||
sleep(Duration::from_millis(25)).await;
|
||||
}
|
||||
};
|
||||
timeout(Duration::from_secs(10), wait).await.is_ok()
|
||||
}
|
||||
|
||||
async fn wait_for_parent_startup_failure_notification(
|
||||
harness: &AgentControlHarness,
|
||||
parent_thread_id: ThreadId,
|
||||
worker_path: &AgentPath,
|
||||
) -> bool {
|
||||
let wait = async {
|
||||
loop {
|
||||
let found = harness
|
||||
.manager
|
||||
.captured_ops()
|
||||
.into_iter()
|
||||
.any(|(thread_id, op)| {
|
||||
if thread_id != parent_thread_id {
|
||||
return false;
|
||||
}
|
||||
let Op::InterAgentCommunication { communication } = op else {
|
||||
return false;
|
||||
};
|
||||
communication.author == *worker_path
|
||||
&& communication.recipient == AgentPath::root()
|
||||
&& !communication.trigger_turn
|
||||
&& SubagentNotification::matches_text(&communication.content)
|
||||
&& communication
|
||||
.content
|
||||
.contains("required MCP servers failed to initialize")
|
||||
});
|
||||
if found {
|
||||
return true;
|
||||
}
|
||||
sleep(Duration::from_millis(25)).await;
|
||||
}
|
||||
};
|
||||
timeout(Duration::from_secs(10), wait).await.is_ok()
|
||||
}
|
||||
|
||||
async fn persist_thread_for_tree_resume(thread: &Arc<CodexThread>, message: &str) {
|
||||
thread
|
||||
.inject_user_message_without_turn(message.to_string())
|
||||
@@ -1575,6 +1733,81 @@ async fn multi_agent_v2_completion_ignores_dead_direct_parent() {
|
||||
assert!(!has_subagent_notification(&root_history_items));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn async_subagent_startup_registers_child_before_required_mcp_failure() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let (parent_thread_id, parent_thread) = harness.start_thread().await;
|
||||
let worker_path = AgentPath::root().join("worker").expect("worker path");
|
||||
let spawned = spawn_async_child_with_blocking_required_mcp(
|
||||
&harness,
|
||||
parent_thread_id,
|
||||
worker_path.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(spawned.status, AgentStatus::PendingInit);
|
||||
|
||||
let child_thread = harness
|
||||
.manager
|
||||
.get_thread(spawned.thread_id)
|
||||
.await
|
||||
.expect("child thread should exist");
|
||||
assert!(
|
||||
child_thread
|
||||
.codex
|
||||
.session
|
||||
.active_turn
|
||||
.lock()
|
||||
.await
|
||||
.is_none()
|
||||
);
|
||||
assert_eq!(
|
||||
harness.control.get_status(spawned.thread_id).await,
|
||||
AgentStatus::PendingInit,
|
||||
"the child should stay in PendingInit until deferred startup either succeeds or fails"
|
||||
);
|
||||
|
||||
assert!(
|
||||
wait_for_parent_startup_failure_notification(&harness, parent_thread_id, &worker_path)
|
||||
.await,
|
||||
"startup failure should enqueue a mailbox notification for the parent"
|
||||
);
|
||||
|
||||
// MultiAgentV2 children report back through the parent's mailbox, so the
|
||||
// notification should land as pending input rather than as an immediate
|
||||
// history item.
|
||||
assert!(
|
||||
wait_for_pending_parent_inbox(&parent_thread).await,
|
||||
"parent should receive a startup failure notification from the child"
|
||||
);
|
||||
assert_matches!(
|
||||
harness.control.get_status(spawned.thread_id).await,
|
||||
AgentStatus::Errored(message)
|
||||
if message.contains("required MCP servers failed to initialize")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn async_subagent_startup_does_not_report_failure_after_child_shutdown() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let (parent_thread_id, parent_thread) = harness.start_thread().await;
|
||||
let worker_path = AgentPath::root().join("worker").expect("worker path");
|
||||
let spawned =
|
||||
spawn_async_child_with_blocking_required_mcp(&harness, parent_thread_id, worker_path).await;
|
||||
|
||||
harness
|
||||
.control
|
||||
.shutdown_live_agent(spawned.thread_id)
|
||||
.await
|
||||
.expect("shutdown should succeed");
|
||||
|
||||
assert!(
|
||||
!wait_for_subagent_notification_with_timeout(&parent_thread, Duration::from_millis(750))
|
||||
.await,
|
||||
"shutdown should suppress late async-startup failure notifications"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multi_agent_v2_completion_queues_message_for_direct_parent() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
|
||||
@@ -10186,6 +10186,7 @@ async fn multi_agent_v2_config_from_feature_table() -> std::io::Result<()> {
|
||||
codex_home.path().join(CONFIG_TOML_FILE),
|
||||
r#"[features.multi_agent_v2]
|
||||
enabled = true
|
||||
async_subagent_startup = true
|
||||
max_concurrent_threads_per_session = 5
|
||||
min_wait_timeout_ms = 2500
|
||||
max_wait_timeout_ms = 120000
|
||||
@@ -10207,6 +10208,7 @@ non_code_mode_only = true
|
||||
.await?;
|
||||
|
||||
assert!(config.features.enabled(Feature::MultiAgentV2));
|
||||
assert!(config.multi_agent_v2.async_subagent_startup);
|
||||
assert_eq!(config.multi_agent_v2.max_concurrent_threads_per_session, 5);
|
||||
assert_eq!(config.multi_agent_v2.min_wait_timeout_ms, 2500);
|
||||
assert_eq!(config.multi_agent_v2.max_wait_timeout_ms, 120000);
|
||||
@@ -10243,6 +10245,7 @@ async fn profile_multi_agent_v2_config_overrides_base() -> std::io::Result<()> {
|
||||
r#"profile = "no_hint"
|
||||
|
||||
[features.multi_agent_v2]
|
||||
async_subagent_startup = false
|
||||
max_concurrent_threads_per_session = 4
|
||||
min_wait_timeout_ms = 3000
|
||||
max_wait_timeout_ms = 120000
|
||||
@@ -10256,6 +10259,7 @@ hide_spawn_agent_metadata = true
|
||||
non_code_mode_only = false
|
||||
|
||||
[profiles.no_hint.features.multi_agent_v2]
|
||||
async_subagent_startup = true
|
||||
max_concurrent_threads_per_session = 6
|
||||
min_wait_timeout_ms = 1500
|
||||
max_wait_timeout_ms = 90000
|
||||
@@ -10276,6 +10280,7 @@ non_code_mode_only = true
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
assert!(config.multi_agent_v2.async_subagent_startup);
|
||||
assert_eq!(config.multi_agent_v2.max_concurrent_threads_per_session, 6);
|
||||
assert_eq!(config.multi_agent_v2.min_wait_timeout_ms, 1500);
|
||||
assert_eq!(config.multi_agent_v2.max_wait_timeout_ms, 90000);
|
||||
@@ -10320,6 +10325,7 @@ enabled = true
|
||||
.await?;
|
||||
|
||||
assert_eq!(config.multi_agent_v2.max_concurrent_threads_per_session, 4);
|
||||
assert!(!config.multi_agent_v2.async_subagent_startup);
|
||||
assert_eq!(config.multi_agent_v2.min_wait_timeout_ms, 10_000);
|
||||
assert_eq!(config.multi_agent_v2.max_wait_timeout_ms, 3_600_000);
|
||||
assert_eq!(config.multi_agent_v2.default_wait_timeout_ms, 30_000);
|
||||
|
||||
@@ -958,6 +958,9 @@ pub struct Config {
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
|
||||
pub struct MultiAgentV2Config {
|
||||
/// Register spawned MultiAgentV2 children before slow MCP startup
|
||||
/// finishes so the parent is not blocked on tool initialization.
|
||||
pub async_subagent_startup: bool,
|
||||
pub max_concurrent_threads_per_session: usize,
|
||||
pub min_wait_timeout_ms: i64,
|
||||
pub max_wait_timeout_ms: i64,
|
||||
@@ -974,6 +977,7 @@ pub struct MultiAgentV2Config {
|
||||
impl Default for MultiAgentV2Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
async_subagent_startup: false,
|
||||
max_concurrent_threads_per_session:
|
||||
DEFAULT_MULTI_AGENT_V2_MAX_CONCURRENT_THREADS_PER_SESSION,
|
||||
min_wait_timeout_ms: DEFAULT_MULTI_AGENT_V2_MIN_WAIT_TIMEOUT_MS,
|
||||
@@ -2165,6 +2169,10 @@ fn resolve_multi_agent_v2_config(
|
||||
let profile = multi_agent_v2_toml_config(config_profile.features.as_ref());
|
||||
let default = MultiAgentV2Config::default();
|
||||
|
||||
let async_subagent_startup = profile
|
||||
.and_then(|config| config.async_subagent_startup)
|
||||
.or_else(|| base.and_then(|config| config.async_subagent_startup))
|
||||
.unwrap_or(default.async_subagent_startup);
|
||||
let max_concurrent_threads_per_session = profile
|
||||
.and_then(|config| config.max_concurrent_threads_per_session)
|
||||
.or_else(|| base.and_then(|config| config.max_concurrent_threads_per_session))
|
||||
@@ -2215,6 +2223,7 @@ fn resolve_multi_agent_v2_config(
|
||||
.unwrap_or(default.non_code_mode_only);
|
||||
|
||||
MultiAgentV2Config {
|
||||
async_subagent_startup,
|
||||
max_concurrent_threads_per_session,
|
||||
min_wait_timeout_ms,
|
||||
max_wait_timeout_ms,
|
||||
|
||||
@@ -9,6 +9,7 @@ use tracing::Instrument;
|
||||
use tracing::debug_span;
|
||||
use tracing::info_span;
|
||||
|
||||
use crate::session::SessionStartupState;
|
||||
use crate::session::SteerInputError;
|
||||
use crate::session::session::Session;
|
||||
use crate::session::session::SessionSettingsUpdate;
|
||||
@@ -183,6 +184,38 @@ async fn thread_settings_applied_event(sess: &Session) -> EventMsg {
|
||||
})
|
||||
}
|
||||
|
||||
async fn reject_turn_start_until_session_ready(sess: &Arc<Session>, sub_id: &str) -> bool {
|
||||
// Async subagent startup only defers mailbox-triggered work today. Any
|
||||
// direct turn-starting request that arrives before startup completes is
|
||||
// rejected explicitly so it cannot race the child into running with an
|
||||
// incomplete tool set or partially reconstructed history.
|
||||
match sess.startup_state() {
|
||||
SessionStartupState::Ready => false,
|
||||
SessionStartupState::Initializing => {
|
||||
sess.send_event_raw(Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: "thread startup is still in progress".to_string(),
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
true
|
||||
}
|
||||
SessionStartupState::Failed(message) => {
|
||||
sess.send_event_raw(Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: format!("thread failed to initialize: {message}"),
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn user_input_or_turn_inner(
|
||||
sess: &Arc<Session>,
|
||||
sub_id: String,
|
||||
@@ -208,6 +241,10 @@ pub(super) async fn user_input_or_turn_inner(
|
||||
updates.final_output_json_schema = Some(final_output_json_schema);
|
||||
updates.environments = environments;
|
||||
|
||||
if reject_turn_start_until_session_ready(sess, &sub_id).await {
|
||||
return;
|
||||
}
|
||||
|
||||
let Ok(current_context) = sess.new_turn_with_sub_id(sub_id.clone(), updates).await else {
|
||||
// new_turn_with_sub_id already emits the error event.
|
||||
return;
|
||||
@@ -325,6 +362,10 @@ pub async fn run_user_shell_command(sess: &Arc<Session>, sub_id: String, command
|
||||
return;
|
||||
}
|
||||
|
||||
if reject_turn_start_until_session_ready(sess, &sub_id).await {
|
||||
return;
|
||||
}
|
||||
|
||||
let turn_context = sess.new_default_turn_with_sub_id(sub_id).await;
|
||||
sess.spawn_task(
|
||||
Arc::clone(&turn_context),
|
||||
@@ -458,6 +499,10 @@ pub async fn reload_user_config(sess: &Arc<Session>) {
|
||||
}
|
||||
|
||||
pub async fn compact(sess: &Arc<Session>, sub_id: String) {
|
||||
if reject_turn_start_until_session_ready(sess, &sub_id).await {
|
||||
return;
|
||||
}
|
||||
|
||||
let turn_context = sess.new_default_turn_with_sub_id(sub_id).await;
|
||||
|
||||
sess.spawn_task(
|
||||
@@ -606,6 +651,7 @@ pub async fn set_thread_memory_mode(sess: &Arc<Session>, sub_id: String, mode: T
|
||||
|
||||
async fn shutdown_session_runtime(sess: &Arc<Session>) {
|
||||
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
|
||||
sess.cancel_mcp_startup().await;
|
||||
let _ = sess.conversation.shutdown().await;
|
||||
sess.services
|
||||
.unified_exec_manager
|
||||
@@ -683,6 +729,10 @@ pub async fn review(
|
||||
sub_id: String,
|
||||
review_request: ReviewRequest,
|
||||
) {
|
||||
if reject_turn_start_until_session_ready(sess, &sub_id).await {
|
||||
return;
|
||||
}
|
||||
|
||||
let turn_context = sess.new_default_turn_with_sub_id(sub_id.clone()).await;
|
||||
sess.maybe_emit_unknown_model_warning_for_turn(turn_context.as_ref())
|
||||
.await;
|
||||
|
||||
@@ -200,6 +200,7 @@ mod review;
|
||||
mod rollout_reconstruction;
|
||||
#[allow(clippy::module_inception)]
|
||||
pub(crate) mod session;
|
||||
mod startup;
|
||||
pub(crate) mod turn;
|
||||
pub(crate) mod turn_context;
|
||||
use self::config_lock::export_config_lock_if_configured;
|
||||
@@ -214,6 +215,7 @@ use self::session::AppServerClientMetadata;
|
||||
use self::session::Session;
|
||||
use self::session::SessionConfiguration;
|
||||
pub(crate) use self::session::SessionSettingsUpdate;
|
||||
pub(crate) use self::startup::SessionStartupState;
|
||||
#[cfg(test)]
|
||||
use self::turn::AssistantMessageStreamParsers;
|
||||
#[cfg(test)]
|
||||
@@ -652,7 +654,7 @@ impl Codex {
|
||||
tx_event.clone(),
|
||||
agent_status_tx.clone(),
|
||||
conversation_history,
|
||||
session_source_clone,
|
||||
session_source_clone.clone(),
|
||||
skills_manager,
|
||||
plugins_manager,
|
||||
mcp_manager.clone(),
|
||||
@@ -670,6 +672,15 @@ impl Codex {
|
||||
map_session_init_error(&e, &config.codex_home)
|
||||
})?;
|
||||
let thread_id = session.conversation_id;
|
||||
if !matches!(
|
||||
session_source_clone,
|
||||
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. })
|
||||
) {
|
||||
// Thread-spawn children must wait until ThreadManager registration
|
||||
// finishes. Other session sources have no external registration
|
||||
// boundary, so they can kick off any deferred startup immediately.
|
||||
session.start_deferred_startup_if_needed().await;
|
||||
}
|
||||
|
||||
// This task will run until Op::Shutdown is received.
|
||||
let session_for_loop = Arc::clone(&session);
|
||||
@@ -1661,8 +1672,8 @@ impl Session {
|
||||
return;
|
||||
}
|
||||
|
||||
self.forward_child_completion_to_parent(
|
||||
turn_context,
|
||||
self.forward_child_status_to_parent(
|
||||
Some(turn_context),
|
||||
*parent_thread_id,
|
||||
child_agent_path,
|
||||
status,
|
||||
@@ -1670,10 +1681,14 @@ impl Session {
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Sends the standard completion envelope from a spawned MultiAgentV2 child to its parent.
|
||||
async fn forward_child_completion_to_parent(
|
||||
/// Sends a status envelope from a spawned MultiAgentV2 child to its parent.
|
||||
///
|
||||
/// Turn completions carry a turn context so rollout trace interactions can
|
||||
/// be recorded. Startup failures happen before any child turn exists, so
|
||||
/// they reuse the same parent notification path without trace recording.
|
||||
async fn forward_child_status_to_parent(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
turn_context: Option<&TurnContext>,
|
||||
parent_thread_id: ThreadId,
|
||||
child_agent_path: &codex_protocol::AgentPath,
|
||||
status: AgentStatus,
|
||||
@@ -1710,7 +1725,7 @@ impl Session {
|
||||
debug!("failed to notify parent thread {parent_thread_id}: {err}");
|
||||
return;
|
||||
}
|
||||
if let Some(message) = trace_message {
|
||||
if let (Some(message), Some(turn_context)) = (trace_message, turn_context) {
|
||||
self.services
|
||||
.rollout_thread_trace
|
||||
.record_agent_result_interaction(
|
||||
@@ -1725,6 +1740,37 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
async fn maybe_notify_parent_of_startup_failure(&self, status: AgentStatus) {
|
||||
if !self.enabled(Feature::MultiAgentV2) {
|
||||
return;
|
||||
}
|
||||
|
||||
let session_source = {
|
||||
let state = self.state.lock().await;
|
||||
state.session_configuration.session_source.clone()
|
||||
};
|
||||
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
agent_path: Some(child_agent_path),
|
||||
..
|
||||
}) = session_source
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
if !is_final(&status) {
|
||||
return;
|
||||
}
|
||||
|
||||
self.forward_child_status_to_parent(
|
||||
/*turn_context*/ None,
|
||||
parent_thread_id,
|
||||
&child_agent_path,
|
||||
status,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn maybe_mirror_event_text_to_realtime(&self, msg: &EventMsg) {
|
||||
let Some(text) = realtime_text_for_event(msg) else {
|
||||
return;
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use super::input_queue::InputQueue;
|
||||
use super::startup::PendingSessionStartup;
|
||||
use super::startup::async_subagent_startup_enabled;
|
||||
use super::*;
|
||||
use crate::goals::GoalRuntimeState;
|
||||
use crate::state::ActiveTurn;
|
||||
@@ -18,6 +20,13 @@ pub(crate) struct Session {
|
||||
pub(crate) installation_id: String,
|
||||
pub(super) tx_event: Sender<Event>,
|
||||
pub(super) agent_status: watch::Sender<AgentStatus>,
|
||||
/// Gates model-driven turn creation until startup has either finished or
|
||||
/// failed decisively. Spawned subagents can therefore exist early without
|
||||
/// racing the model against incomplete MCP setup.
|
||||
pub(super) startup_state: watch::Sender<SessionStartupState>,
|
||||
/// Holds deferred startup work until ThreadManager has registered the
|
||||
/// thread and made the child visible to parent-facing APIs.
|
||||
pub(super) deferred_startup: Mutex<Option<PendingSessionStartup>>,
|
||||
pub(super) out_of_band_elicitation_paused: watch::Sender<bool>,
|
||||
pub(super) state: Mutex<SessionState>,
|
||||
/// Serializes rebuild/apply cycles for the running proxy; each cycle
|
||||
@@ -418,10 +427,6 @@ impl Session {
|
||||
|
||||
#[instrument(name = "session_init", level = "info", skip_all)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "session initialization must serialize access through session-owned manager guards"
|
||||
)]
|
||||
pub(crate) async fn new(
|
||||
mut session_configuration: SessionConfiguration,
|
||||
config: Arc<Config>,
|
||||
@@ -450,6 +455,8 @@ impl Session {
|
||||
session_configuration.provider
|
||||
);
|
||||
let forked_from_id = initial_history.forked_from_id();
|
||||
let async_subagent_startup =
|
||||
async_subagent_startup_enabled(config.as_ref(), &session_configuration.session_source);
|
||||
|
||||
let event_persistence_mode = if session_configuration.persist_extended_history {
|
||||
ThreadEventPersistenceMode::Extended
|
||||
@@ -951,12 +958,19 @@ impl Session {
|
||||
.set_window_generation(window_generation);
|
||||
let (out_of_band_elicitation_paused, _out_of_band_elicitation_paused_rx) =
|
||||
watch::channel(false);
|
||||
let (startup_state, _startup_state_rx) = watch::channel(if async_subagent_startup {
|
||||
SessionStartupState::Initializing
|
||||
} else {
|
||||
SessionStartupState::Ready
|
||||
});
|
||||
|
||||
let sess = Arc::new(Session {
|
||||
conversation_id: thread_id,
|
||||
installation_id,
|
||||
tx_event: tx_event.clone(),
|
||||
agent_status,
|
||||
startup_state,
|
||||
deferred_startup: Mutex::new(None),
|
||||
out_of_band_elicitation_paused,
|
||||
state: Mutex::new(state),
|
||||
managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1),
|
||||
@@ -1009,116 +1023,6 @@ impl Session {
|
||||
for event in events {
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
|
||||
let mut required_mcp_servers: Vec<String> = mcp_servers
|
||||
.iter()
|
||||
.filter(|(_, server)| server.enabled() && server.required())
|
||||
.map(|(name, _)| name.clone())
|
||||
.collect();
|
||||
required_mcp_servers.sort();
|
||||
let enabled_mcp_server_count =
|
||||
mcp_servers.values().filter(|server| server.enabled()).count();
|
||||
let required_mcp_server_count = required_mcp_servers.len();
|
||||
let tool_plugin_provenance = mcp_manager.tool_plugin_provenance(config.as_ref()).await;
|
||||
let host_owned_codex_apps_enabled = config
|
||||
.features
|
||||
.apps_enabled_for_auth(auth.as_ref().is_some_and(|auth| auth.uses_codex_backend()));
|
||||
let client_elicitation_capability = if config.features.enabled(Feature::AuthElicitation) {
|
||||
ElicitationCapability {
|
||||
form: Some(FormElicitationCapability::default()),
|
||||
url: Some(UrlElicitationCapability::default()),
|
||||
}
|
||||
} else {
|
||||
ElicitationCapability::default()
|
||||
};
|
||||
{
|
||||
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
|
||||
cancel_guard.cancel();
|
||||
*cancel_guard = CancellationToken::new();
|
||||
}
|
||||
let turn_environment = crate::environment_selection::resolve_environment_selections(
|
||||
sess.services.environment_manager.as_ref(),
|
||||
&session_configuration.environments,
|
||||
)
|
||||
.map_err(|err| {
|
||||
CodexErr::InvalidRequest(err.to_string().replace(
|
||||
"unknown turn environment id",
|
||||
"unknown stored MCP environment id",
|
||||
))
|
||||
})?
|
||||
.primary()
|
||||
.cloned();
|
||||
let mcp_runtime_environment = match turn_environment {
|
||||
Some(turn_environment) => McpRuntimeEnvironment::new(
|
||||
Some(Arc::clone(&turn_environment.environment)),
|
||||
sess.services.environment_manager.try_local_environment(),
|
||||
turn_environment.cwd.to_path_buf(),
|
||||
),
|
||||
None => McpRuntimeEnvironment::new(
|
||||
sess.services.environment_manager.default_or_local_environment(),
|
||||
sess.services.environment_manager.try_local_environment(),
|
||||
session_configuration.cwd.to_path_buf(),
|
||||
),
|
||||
};
|
||||
let (mcp_connection_manager, cancel_token) = McpConnectionManager::new(
|
||||
&mcp_servers,
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth_statuses.clone(),
|
||||
&session_configuration.approval_policy,
|
||||
INITIAL_SUBMIT_ID.to_owned(),
|
||||
tx_event.clone(),
|
||||
session_configuration.permission_profile(),
|
||||
mcp_runtime_environment,
|
||||
config.codex_home.to_path_buf(),
|
||||
codex_apps_tools_cache_key(auth),
|
||||
host_owned_codex_apps_enabled,
|
||||
client_elicitation_capability,
|
||||
tool_plugin_provenance,
|
||||
auth,
|
||||
Some(sess.mcp_elicitation_reviewer()),
|
||||
)
|
||||
.instrument(info_span!(
|
||||
"session_init.mcp_manager_init",
|
||||
otel.name = "session_init.mcp_manager_init",
|
||||
session_init.enabled_mcp_server_count = enabled_mcp_server_count,
|
||||
session_init.required_mcp_server_count = required_mcp_server_count,
|
||||
))
|
||||
.await;
|
||||
{
|
||||
let mut manager_guard = sess.services.mcp_connection_manager.write().await;
|
||||
*manager_guard = mcp_connection_manager;
|
||||
}
|
||||
{
|
||||
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
|
||||
if cancel_guard.is_cancelled() {
|
||||
cancel_token.cancel();
|
||||
}
|
||||
*cancel_guard = cancel_token;
|
||||
}
|
||||
if !required_mcp_servers.is_empty() {
|
||||
let failures = sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.required_startup_failures(&required_mcp_servers)
|
||||
.instrument(info_span!(
|
||||
"session_init.required_mcp_wait",
|
||||
otel.name = "session_init.required_mcp_wait",
|
||||
session_init.required_mcp_server_count = required_mcp_server_count,
|
||||
))
|
||||
.await;
|
||||
if !failures.is_empty() {
|
||||
let details = failures
|
||||
.iter()
|
||||
.map(|failure| format!("{}: {}", failure.server, failure.error))
|
||||
.collect::<Vec<_>>()
|
||||
.join("; ");
|
||||
anyhow::bail!("required MCP servers failed to initialize: {details}");
|
||||
}
|
||||
}
|
||||
sess.schedule_startup_prewarm(session_configuration.base_instructions.clone())
|
||||
.await;
|
||||
let session_start_source = match &initial_history {
|
||||
InitialHistory::Resumed(_) => codex_hooks::SessionStartSource::Resume,
|
||||
InitialHistory::New | InitialHistory::Forked(_) => {
|
||||
@@ -1126,15 +1030,27 @@ impl Session {
|
||||
}
|
||||
InitialHistory::Cleared => codex_hooks::SessionStartSource::Clear,
|
||||
};
|
||||
|
||||
// record_initial_history can emit events. We record only after the SessionConfiguredEvent is emitted.
|
||||
Box::pin(sess.record_initial_history(initial_history)).await;
|
||||
{
|
||||
let mut state = sess.state.lock().await;
|
||||
state.set_pending_session_start_source(Some(session_start_source));
|
||||
let pending_startup = PendingSessionStartup {
|
||||
initial_history,
|
||||
session_start_source,
|
||||
auth: auth.cloned(),
|
||||
mcp_servers,
|
||||
auth_statuses,
|
||||
base_instructions: session_configuration.base_instructions.clone(),
|
||||
};
|
||||
if async_subagent_startup {
|
||||
{
|
||||
let mut deferred_startup = sess.deferred_startup.lock().await;
|
||||
// Stash the slow startup work so ThreadManager can release
|
||||
// `spawn_agent` as soon as the child is registered, then
|
||||
// resume initialization in the background.
|
||||
*deferred_startup = Some(pending_startup);
|
||||
}
|
||||
Ok(sess)
|
||||
} else {
|
||||
Box::pin(sess.finish_initial_startup(pending_startup)).await?;
|
||||
Ok(sess)
|
||||
}
|
||||
|
||||
Ok(sess)
|
||||
}
|
||||
.await;
|
||||
match session_result {
|
||||
|
||||
307
codex-rs/core/src/session/startup.rs
Normal file
307
codex-rs/core/src/session/startup.rs
Normal file
@@ -0,0 +1,307 @@
|
||||
use super::*;
|
||||
use codex_mcp::EffectiveMcpServer;
|
||||
use codex_mcp::McpAuthStatusEntry;
|
||||
|
||||
/// Tracks whether a session may start model-driven turn work yet.
|
||||
///
|
||||
/// Async subagent startup intentionally creates the child thread before slow
|
||||
/// MCP initialization finishes. The child therefore needs a small readiness
|
||||
/// gate so external work can stay queued until the session has the correct
|
||||
/// tool universe and reconstructed history.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) enum SessionStartupState {
|
||||
Ready,
|
||||
Initializing,
|
||||
Failed(String),
|
||||
}
|
||||
|
||||
/// Captures the work deferred until after a session has been registered.
|
||||
///
|
||||
/// This keeps the async startup seam explicit: thread creation and
|
||||
/// `SessionConfigured` happen first, while slow MCP initialization and initial
|
||||
/// history reconstruction can either complete synchronously or continue in the
|
||||
/// background for spawned subagents.
|
||||
pub(super) struct PendingSessionStartup {
|
||||
pub(super) initial_history: InitialHistory,
|
||||
pub(super) session_start_source: codex_hooks::SessionStartSource,
|
||||
pub(super) auth: Option<CodexAuth>,
|
||||
pub(super) mcp_servers: HashMap<String, EffectiveMcpServer>,
|
||||
pub(super) auth_statuses: HashMap<String, McpAuthStatusEntry>,
|
||||
pub(super) base_instructions: String,
|
||||
}
|
||||
|
||||
pub(super) fn async_subagent_startup_enabled(
|
||||
config: &Config,
|
||||
session_source: &SessionSource,
|
||||
) -> bool {
|
||||
// Only spawned MultiAgentV2 children have the parent-notification and
|
||||
// queued-mailbox semantics needed to safely surface delayed startup
|
||||
// failures after the thread has been registered.
|
||||
config.features.enabled(Feature::MultiAgentV2)
|
||||
&& config.multi_agent_v2.async_subagent_startup
|
||||
&& matches!(
|
||||
session_source,
|
||||
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
agent_path: Some(_),
|
||||
..
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
impl Session {
|
||||
pub(crate) fn startup_state(&self) -> SessionStartupState {
|
||||
self.startup_state.borrow().clone()
|
||||
}
|
||||
|
||||
pub(crate) fn startup_ready(&self) -> bool {
|
||||
matches!(self.startup_state(), SessionStartupState::Ready)
|
||||
}
|
||||
|
||||
/// Starts any deferred startup work after the thread has been registered.
|
||||
pub(crate) async fn start_deferred_startup_if_needed(self: &Arc<Self>) {
|
||||
let startup = {
|
||||
let mut deferred_startup = self.deferred_startup.lock().await;
|
||||
deferred_startup.take()
|
||||
};
|
||||
if let Some(startup) = startup {
|
||||
// Only one caller should ever observe pending startup work, but the
|
||||
// explicit take keeps repeated calls harmless and makes the handoff
|
||||
// point easy to reason about.
|
||||
self.spawn_initial_startup(startup);
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_initial_startup(self: &Arc<Self>, startup: PendingSessionStartup) {
|
||||
let session = Arc::clone(self);
|
||||
let thread_id = session.thread_id();
|
||||
tokio::spawn(
|
||||
async move {
|
||||
if let Err(err) = session.finish_initial_startup(startup).await {
|
||||
session.handle_initial_startup_failure(err).await;
|
||||
}
|
||||
}
|
||||
.instrument(tracing::info_span!(
|
||||
"session_init.deferred_startup",
|
||||
otel.name = "session_init.deferred_startup",
|
||||
thread_id = %thread_id,
|
||||
)),
|
||||
);
|
||||
}
|
||||
|
||||
pub(super) async fn finish_initial_startup(
|
||||
self: &Arc<Self>,
|
||||
startup: PendingSessionStartup,
|
||||
) -> anyhow::Result<()> {
|
||||
let PendingSessionStartup {
|
||||
initial_history,
|
||||
session_start_source,
|
||||
auth,
|
||||
mcp_servers,
|
||||
auth_statuses,
|
||||
base_instructions,
|
||||
} = startup;
|
||||
let config = self.get_config().await;
|
||||
let session_configuration = {
|
||||
let state = self.state.lock().await;
|
||||
state.session_configuration.clone()
|
||||
};
|
||||
|
||||
let mut required_mcp_servers: Vec<String> = mcp_servers
|
||||
.iter()
|
||||
.filter(|(_, server)| server.enabled() && server.required())
|
||||
.map(|(name, _)| name.clone())
|
||||
.collect();
|
||||
required_mcp_servers.sort();
|
||||
let enabled_mcp_server_count = mcp_servers
|
||||
.values()
|
||||
.filter(|server| server.enabled())
|
||||
.count();
|
||||
let required_mcp_server_count = required_mcp_servers.len();
|
||||
let tool_plugin_provenance = self
|
||||
.services
|
||||
.mcp_manager
|
||||
.tool_plugin_provenance(config.as_ref())
|
||||
.await;
|
||||
let host_owned_codex_apps_enabled = config.features.apps_enabled_for_auth(
|
||||
auth.as_ref()
|
||||
.is_some_and(codex_login::CodexAuth::uses_codex_backend),
|
||||
);
|
||||
let client_elicitation_capability = if config.features.enabled(Feature::AuthElicitation) {
|
||||
ElicitationCapability {
|
||||
form: Some(FormElicitationCapability::default()),
|
||||
url: Some(UrlElicitationCapability::default()),
|
||||
}
|
||||
} else {
|
||||
ElicitationCapability::default()
|
||||
};
|
||||
{
|
||||
let mut cancel_guard = self.services.mcp_startup_cancellation_token.lock().await;
|
||||
cancel_guard.cancel();
|
||||
*cancel_guard = CancellationToken::new();
|
||||
}
|
||||
let turn_environment = crate::environment_selection::resolve_environment_selections(
|
||||
self.services.environment_manager.as_ref(),
|
||||
&session_configuration.environments,
|
||||
)
|
||||
.map_err(|err| {
|
||||
CodexErr::InvalidRequest(err.to_string().replace(
|
||||
"unknown turn environment id",
|
||||
"unknown stored MCP environment id",
|
||||
))
|
||||
})?
|
||||
.primary()
|
||||
.cloned();
|
||||
let mcp_runtime_environment = match turn_environment {
|
||||
Some(turn_environment) => McpRuntimeEnvironment::new(
|
||||
Some(Arc::clone(&turn_environment.environment)),
|
||||
self.services.environment_manager.try_local_environment(),
|
||||
turn_environment.cwd.to_path_buf(),
|
||||
),
|
||||
None => McpRuntimeEnvironment::new(
|
||||
self.services
|
||||
.environment_manager
|
||||
.default_or_local_environment(),
|
||||
self.services.environment_manager.try_local_environment(),
|
||||
session_configuration.cwd.to_path_buf(),
|
||||
),
|
||||
};
|
||||
let (mcp_connection_manager, cancel_token) = McpConnectionManager::new(
|
||||
&mcp_servers,
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth_statuses,
|
||||
&session_configuration.approval_policy,
|
||||
INITIAL_SUBMIT_ID.to_owned(),
|
||||
self.get_tx_event(),
|
||||
session_configuration.permission_profile(),
|
||||
mcp_runtime_environment,
|
||||
config.codex_home.to_path_buf(),
|
||||
codex_apps_tools_cache_key(auth.as_ref()),
|
||||
host_owned_codex_apps_enabled,
|
||||
client_elicitation_capability,
|
||||
tool_plugin_provenance,
|
||||
auth.as_ref(),
|
||||
Some(self.mcp_elicitation_reviewer()),
|
||||
)
|
||||
.instrument(tracing::info_span!(
|
||||
"session_init.mcp_manager_init",
|
||||
otel.name = "session_init.mcp_manager_init",
|
||||
session_init.enabled_mcp_server_count = enabled_mcp_server_count,
|
||||
session_init.required_mcp_server_count = required_mcp_server_count,
|
||||
))
|
||||
.await;
|
||||
{
|
||||
let mut cancel_guard = self.services.mcp_startup_cancellation_token.lock().await;
|
||||
if cancel_guard.is_cancelled() {
|
||||
cancel_token.cancel();
|
||||
}
|
||||
*cancel_guard = cancel_token;
|
||||
}
|
||||
if !required_mcp_servers.is_empty() {
|
||||
let failures = mcp_connection_manager
|
||||
.required_startup_failures(&required_mcp_servers)
|
||||
.instrument(tracing::info_span!(
|
||||
"session_init.required_mcp_wait",
|
||||
otel.name = "session_init.required_mcp_wait",
|
||||
session_init.required_mcp_server_count = required_mcp_server_count,
|
||||
))
|
||||
.await;
|
||||
if !failures.is_empty() {
|
||||
let details = failures
|
||||
.iter()
|
||||
.map(|failure| format!("{}: {}", failure.server, failure.error))
|
||||
.collect::<Vec<_>>()
|
||||
.join("; ");
|
||||
anyhow::bail!("required MCP servers failed to initialize: {details}");
|
||||
}
|
||||
}
|
||||
{
|
||||
// Publish the live manager only after the required-server gate has
|
||||
// settled so the rest of the session never observes a half-ready
|
||||
// manager through the shared RwLock.
|
||||
let mut manager_guard = self.services.mcp_connection_manager.write().await;
|
||||
*manager_guard = mcp_connection_manager;
|
||||
}
|
||||
|
||||
self.schedule_startup_prewarm(base_instructions).await;
|
||||
// `record_initial_history` can emit events, so keep it after
|
||||
// `SessionConfigured` regardless of whether startup is synchronous or
|
||||
// deferred.
|
||||
Box::pin(self.record_initial_history(initial_history)).await;
|
||||
{
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_pending_session_start_source(Some(session_start_source));
|
||||
}
|
||||
|
||||
if matches!(self.agent_status.borrow().clone(), AgentStatus::Shutdown) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.startup_state.send_replace(SessionStartupState::Ready);
|
||||
self.maybe_start_turn_for_pending_work().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_initial_startup_failure(self: &Arc<Self>, error: anyhow::Error) {
|
||||
if matches!(self.agent_status.borrow().clone(), AgentStatus::Shutdown) {
|
||||
return;
|
||||
}
|
||||
|
||||
let message = error.to_string();
|
||||
self.startup_state
|
||||
.send_replace(SessionStartupState::Failed(message.clone()));
|
||||
self.send_event_raw(Event {
|
||||
id: INITIAL_SUBMIT_ID.to_owned(),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: message.clone(),
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
self.maybe_notify_parent_of_startup_failure(AgentStatus::Errored(message))
|
||||
.await;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn set_startup_state_for_tests(&self, state: SessionStartupState) {
|
||||
self.startup_state.send_replace(state);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::config::ConfigBuilder;
|
||||
use codex_protocol::AgentPath;
|
||||
|
||||
#[tokio::test]
|
||||
async fn async_subagent_startup_requires_a_multi_agent_v2_pathful_child() {
|
||||
let mut config = ConfigBuilder::default()
|
||||
.build()
|
||||
.await
|
||||
.expect("default test config should load");
|
||||
let _ = config.features.enable(Feature::MultiAgentV2);
|
||||
config.multi_agent_v2.async_subagent_startup = true;
|
||||
|
||||
assert!(async_subagent_startup_enabled(
|
||||
&config,
|
||||
&SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id: ThreadId::new(),
|
||||
depth: 1,
|
||||
agent_path: Some(AgentPath::root().join("worker").expect("worker path")),
|
||||
agent_nickname: None,
|
||||
agent_role: Some("explorer".to_string()),
|
||||
})
|
||||
));
|
||||
|
||||
assert!(!async_subagent_startup_enabled(
|
||||
&config,
|
||||
&SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id: ThreadId::new(),
|
||||
depth: 1,
|
||||
agent_path: None,
|
||||
agent_nickname: None,
|
||||
agent_role: Some("explorer".to_string()),
|
||||
})
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,7 @@ use crate::skills::SkillRenderSideEffects;
|
||||
use crate::skills::render::SkillMetadataBudget;
|
||||
use crate::test_support::models_manager_with_provider;
|
||||
use crate::tools::format_exec_output_str;
|
||||
use assert_matches::assert_matches;
|
||||
use codex_config::ConfigLayerStack;
|
||||
use codex_config::ConfigLayerStackOrdering;
|
||||
use codex_config::LoaderOverrides;
|
||||
@@ -4327,6 +4328,8 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
installation_id: "11111111-1111-4111-8111-111111111111".to_string(),
|
||||
tx_event,
|
||||
agent_status: agent_status_tx,
|
||||
startup_state: watch::channel(SessionStartupState::Ready).0,
|
||||
deferred_startup: Mutex::new(None),
|
||||
out_of_band_elicitation_paused: watch::channel(false).0,
|
||||
state: Mutex::new(state),
|
||||
managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1),
|
||||
@@ -6154,6 +6157,8 @@ where
|
||||
installation_id: "11111111-1111-4111-8111-111111111111".to_string(),
|
||||
tx_event,
|
||||
agent_status: agent_status_tx,
|
||||
startup_state: watch::channel(SessionStartupState::Ready).0,
|
||||
deferred_startup: Mutex::new(None),
|
||||
out_of_band_elicitation_paused: watch::channel(false).0,
|
||||
state: Mutex::new(state),
|
||||
managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1),
|
||||
@@ -6284,6 +6289,130 @@ async fn refresh_mcp_servers_is_deferred_until_next_turn() {
|
||||
assert!(!new_token.is_cancelled());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn inter_agent_mail_stays_queued_while_startup_is_initializing() {
|
||||
let (session, _rx_event) = make_session_with_config_and_rx(|config| {
|
||||
let _ = config.features.enable(Feature::MultiAgentV2);
|
||||
})
|
||||
.await
|
||||
.expect("session should initialize");
|
||||
session.set_startup_state_for_tests(SessionStartupState::Initializing);
|
||||
|
||||
handlers::inter_agent_communication(
|
||||
&session,
|
||||
"startup-mail".to_string(),
|
||||
InterAgentCommunication::new(
|
||||
AgentPath::root(),
|
||||
AgentPath::try_from("/root/worker").expect("worker path"),
|
||||
Vec::new(),
|
||||
"hello".to_string(),
|
||||
/*trigger_turn*/ true,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(
|
||||
session.active_turn.lock().await.is_none(),
|
||||
"startup should not claim an active turn before deferred initialization finishes"
|
||||
);
|
||||
assert!(
|
||||
session.input_queue.has_pending_mailbox_items().await,
|
||||
"trigger-turn mailbox input should stay queued while startup is still in progress"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn user_input_returns_error_while_startup_is_initializing() {
|
||||
let (session, rx_event) = make_session_with_config_and_rx(|config| {
|
||||
let _ = config.features.enable(Feature::MultiAgentV2);
|
||||
})
|
||||
.await
|
||||
.expect("session should initialize");
|
||||
session.set_startup_state_for_tests(SessionStartupState::Initializing);
|
||||
|
||||
handlers::user_input_or_turn(
|
||||
&session,
|
||||
"startup-user-input".to_string(),
|
||||
Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
environments: None,
|
||||
final_output_json_schema: None,
|
||||
responsesapi_client_metadata: None,
|
||||
thread_settings: Default::default(),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let event = loop {
|
||||
let event = rx_event.recv().await.expect("startup error event");
|
||||
if matches!(event.msg, EventMsg::Error(_)) {
|
||||
break event;
|
||||
}
|
||||
};
|
||||
assert_eq!(event.id, "startup-user-input");
|
||||
assert_matches!(
|
||||
event.msg,
|
||||
EventMsg::Error(ErrorEvent {
|
||||
message,
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
}) if message == "thread startup is still in progress"
|
||||
);
|
||||
assert!(
|
||||
session.active_turn.lock().await.is_none(),
|
||||
"rejected input should not create a turn while startup is blocked"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn user_input_returns_error_after_startup_failure() {
|
||||
let (session, rx_event) = make_session_with_config_and_rx(|config| {
|
||||
let _ = config.features.enable(Feature::MultiAgentV2);
|
||||
})
|
||||
.await
|
||||
.expect("session should initialize");
|
||||
session.set_startup_state_for_tests(SessionStartupState::Failed(
|
||||
"required MCP servers failed to initialize".to_string(),
|
||||
));
|
||||
|
||||
handlers::user_input_or_turn(
|
||||
&session,
|
||||
"startup-user-input".to_string(),
|
||||
Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
environments: None,
|
||||
final_output_json_schema: None,
|
||||
responsesapi_client_metadata: None,
|
||||
thread_settings: Default::default(),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let event = loop {
|
||||
let event = rx_event.recv().await.expect("startup error event");
|
||||
if matches!(event.msg, EventMsg::Error(_)) {
|
||||
break event;
|
||||
}
|
||||
};
|
||||
assert_eq!(event.id, "startup-user-input");
|
||||
assert_matches!(
|
||||
event.msg,
|
||||
EventMsg::Error(ErrorEvent {
|
||||
message,
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
}) if message == "thread failed to initialize: required MCP servers failed to initialize"
|
||||
);
|
||||
assert!(
|
||||
session.active_turn.lock().await.is_none(),
|
||||
"failed startup should keep rejecting direct turn creation"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_task_does_not_update_previous_turn_settings_for_non_run_turn_tasks() {
|
||||
let (sess, tc, _rx) = make_session_and_context_with_rx().await;
|
||||
|
||||
@@ -469,6 +469,13 @@ impl Session {
|
||||
self: &Arc<Self>,
|
||||
sub_id: String,
|
||||
) {
|
||||
// Async subagent startup registers the child thread before slow MCP
|
||||
// initialization finishes. Keep mailbox-triggered work queued until
|
||||
// startup reaches a stable terminal state.
|
||||
if !self.startup_ready() {
|
||||
return;
|
||||
}
|
||||
|
||||
if !self
|
||||
.input_queue
|
||||
.has_queued_response_items_for_next_turn()
|
||||
|
||||
@@ -1253,24 +1253,49 @@ impl ThreadManagerState {
|
||||
}
|
||||
};
|
||||
|
||||
{
|
||||
let mut codex = Some(codex);
|
||||
let thread = {
|
||||
let mut threads = self.threads.write().await;
|
||||
if let std::collections::hash_map::Entry::Vacant(e) = threads.entry(thread_id) {
|
||||
let Some(codex) = codex.take() else {
|
||||
unreachable!();
|
||||
};
|
||||
let thread = Arc::new(CodexThread::new(
|
||||
codex,
|
||||
session_configured.clone(),
|
||||
session_configured.rollout_path.clone(),
|
||||
session_source,
|
||||
));
|
||||
// Make the thread visible to parent-facing queries before any
|
||||
// deferred startup work can emit status changes or mailbox
|
||||
// notifications.
|
||||
e.insert(thread.clone());
|
||||
return Ok(NewThread {
|
||||
thread_id,
|
||||
thread,
|
||||
session_configured,
|
||||
});
|
||||
Some(thread)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(thread) = thread {
|
||||
// The registration above is the semantic handoff point: after this
|
||||
// call the child exists from the parent's perspective, so delayed
|
||||
// MCP failures can be reported asynchronously without losing the
|
||||
// child identity.
|
||||
thread
|
||||
.codex
|
||||
.session
|
||||
.start_deferred_startup_if_needed()
|
||||
.await;
|
||||
return Ok(NewThread {
|
||||
thread_id,
|
||||
thread,
|
||||
session_configured,
|
||||
});
|
||||
}
|
||||
|
||||
let Some(codex) = codex else {
|
||||
unreachable!();
|
||||
};
|
||||
if let Err(err) = codex.shutdown_and_wait().await {
|
||||
warn!("failed to shut down duplicate thread {thread_id}: {err}");
|
||||
}
|
||||
|
||||
@@ -9,6 +9,10 @@ use std::collections::BTreeMap;
|
||||
pub struct MultiAgentV2ConfigToml {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub enabled: Option<bool>,
|
||||
/// When `true`, spawned MultiAgentV2 children register their thread before
|
||||
/// waiting for slow startup work such as required MCP initialization.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub async_subagent_startup: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[schemars(range(min = 1))]
|
||||
pub max_concurrent_threads_per_session: Option<usize>,
|
||||
|
||||
@@ -521,6 +521,7 @@ fn multi_agent_v2_feature_config_deserializes_table() {
|
||||
r#"
|
||||
[multi_agent_v2]
|
||||
enabled = true
|
||||
async_subagent_startup = false
|
||||
max_concurrent_threads_per_session = 4
|
||||
min_wait_timeout_ms = 2500
|
||||
max_wait_timeout_ms = 120000
|
||||
@@ -544,6 +545,7 @@ non_code_mode_only = true
|
||||
features.multi_agent_v2,
|
||||
Some(crate::FeatureToml::Config(crate::MultiAgentV2ConfigToml {
|
||||
enabled: Some(true),
|
||||
async_subagent_startup: Some(false),
|
||||
max_concurrent_threads_per_session: Some(4),
|
||||
min_wait_timeout_ms: Some(2500),
|
||||
max_wait_timeout_ms: Some(120000),
|
||||
@@ -583,6 +585,7 @@ usage_hint_enabled = false
|
||||
features_toml.multi_agent_v2,
|
||||
Some(crate::FeatureToml::Config(crate::MultiAgentV2ConfigToml {
|
||||
enabled: None,
|
||||
async_subagent_startup: None,
|
||||
max_concurrent_threads_per_session: None,
|
||||
min_wait_timeout_ms: None,
|
||||
max_wait_timeout_ms: None,
|
||||
|
||||
Reference in New Issue
Block a user