Compare commits

...

3 Commits

Author SHA1 Message Date
Edward Frazer
68dc2fda2f fix: prewarm MCP startup notifications in background 2026-05-15 14:32:10 -07:00
Edward Frazer
95d02e8129 fix: align lazy MCP startup with current main 2026-05-15 14:26:42 -07:00
Edward Frazer
8bd391e501 fix: make session MCP startup lazy 2026-05-15 14:09:18 -07:00
16 changed files with 331 additions and 249 deletions

View File

@@ -2755,7 +2755,7 @@ async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Re
}
#[tokio::test]
async fn thread_resume_fails_when_required_mcp_server_fails_to_initialize() -> Result<()> {
async fn thread_resume_returns_before_required_mcp_server_initializes() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
let rollout = setup_rollout_fixture(codex_home.path(), &server.uri())?;
@@ -2770,24 +2770,13 @@ async fn thread_resume_fails_when_required_mcp_server_fails_to_initialize() -> R
..Default::default()
})
.await?;
let err: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(resume_id)),
)
.await??;
assert!(
err.error
.message
.contains("required MCP servers failed to initialize"),
"unexpected error message: {}",
err.error.message
);
assert!(
err.error.message.contains("required_broken"),
"unexpected error message: {}",
err.error.message
);
let _: ThreadResumeResponse = to_response(
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??,
)?;
Ok(())
}

View File

@@ -586,7 +586,7 @@ async fn thread_start_ephemeral_remains_pathless() -> Result<()> {
}
#[tokio::test]
async fn thread_start_fails_when_required_mcp_server_fails_to_initialize() -> Result<()> {
async fn thread_start_returns_before_required_mcp_server_initializes() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
@@ -599,24 +599,13 @@ async fn thread_start_fails_when_required_mcp_server_fails_to_initialize() -> Re
.send_thread_start_request(ThreadStartParams::default())
.await?;
let err: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(req_id)),
)
.await??;
assert!(
err.error
.message
.contains("required MCP servers failed to initialize"),
"unexpected error message: {}",
err.error.message
);
assert!(
err.error.message.contains("required_broken"),
"unexpected error message: {}",
err.error.message
);
let _: ThreadStartResponse = to_response(
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
)
.await??,
)?;
Ok(())
}

View File

@@ -320,6 +320,7 @@ async fn handle_approved_mcp_tool_call(
let arguments_value = invocation.arguments.clone();
let connector_id = metadata.and_then(|metadata| metadata.connector_id.as_deref());
let connector_name = metadata.and_then(|metadata| metadata.connector_name.as_deref());
sess.ensure_mcp_connection_manager_initialized().await;
let server_origin = sess
.services
.mcp_connection_manager
@@ -1480,6 +1481,7 @@ pub(crate) async fn lookup_mcp_tool_metadata(
server: &str,
tool_name: &str,
) -> Option<McpToolApprovalMetadata> {
sess.ensure_mcp_connection_manager_initialized().await;
let tools = sess
.services
.mcp_connection_manager
@@ -1575,6 +1577,7 @@ async fn lookup_mcp_app_usage_metadata(
server: &str,
tool_name: &str,
) -> Option<McpAppUsageMetadata> {
sess.ensure_mcp_connection_manager_initialized().await;
let tools = sess
.services
.mcp_connection_manager

View File

@@ -468,7 +468,7 @@ pub async fn dynamic_tool_response(sess: &Arc<Session>, id: String, response: Dy
}
pub async fn refresh_mcp_servers(sess: &Arc<Session>, refresh_config: McpServerRefreshConfig) {
let mut guard = sess.pending_mcp_server_refresh_config.lock().await;
let mut guard = sess.pending_mcp_server_refresh.lock().await;
*guard = Some(refresh_config);
}

View File

@@ -1,4 +1,5 @@
use super::*;
use codex_mcp::EffectiveMcpServer;
use codex_mcp::ElicitationReviewRequest;
use codex_mcp::ElicitationReviewer;
use codex_mcp::ElicitationReviewerHandle;
@@ -19,9 +20,24 @@ use rmcp::model::CreateElicitationRequestParams;
use rmcp::model::ElicitationAction;
use rmcp::model::Meta;
use serde_json::Map;
use std::sync::atomic::Ordering;
const MCP_ELICITATION_DECLINE_MESSAGE_KEY: &str = "message";
struct ReplaceMcpConnectionManagerArgs<'a> {
submit_id: String,
approval_policy: &'a Constrained<AskForApproval>,
permission_profile: PermissionProfile,
runtime_environment: McpRuntimeEnvironment,
config: &'a Arc<Config>,
mcp_servers: HashMap<String, EffectiveMcpServer>,
store_mode: OAuthCredentialsStoreMode,
auth: Option<&'a CodexAuth>,
host_owned_codex_apps_enabled: bool,
client_elicitation_capability: ElicitationCapability,
elicitation_reviewer: Option<ElicitationReviewerHandle>,
}
#[derive(Debug, PartialEq)]
enum GuardianElicitationReview {
NotRequested,
@@ -61,6 +77,159 @@ impl Session {
Arc::new(GuardianMcpElicitationReviewer::new(self))
}
pub(crate) fn start_mcp_connection_manager_initialization(self: &Arc<Self>) {
let session = Arc::clone(self);
drop(self.services.runtime_handle.spawn(async move {
session.ensure_mcp_connection_manager_initialized().await;
}));
}
fn session_mcp_runtime_environment(
&self,
session_configuration: &SessionConfiguration,
) -> McpRuntimeEnvironment {
let turn_environment = crate::environment_selection::resolve_environment_selections(
self.services.environment_manager.as_ref(),
&session_configuration.environments,
)
.unwrap_or_else(|err| {
panic!("session MCP environment selections should remain valid: {err}")
})
.primary()
.cloned();
match turn_environment {
Some(turn_environment) => McpRuntimeEnvironment::new(
Arc::clone(&turn_environment.environment),
turn_environment.cwd.to_path_buf(),
),
None => McpRuntimeEnvironment::new(
self.services
.environment_manager
.default_environment()
.unwrap_or_else(|| self.services.environment_manager.local_environment()),
session_configuration.cwd.to_path_buf(),
),
}
}
async fn replace_mcp_connection_manager(&self, args: ReplaceMcpConnectionManagerArgs<'_>) {
let ReplaceMcpConnectionManagerArgs {
submit_id,
approval_policy,
permission_profile,
runtime_environment,
config,
mcp_servers,
store_mode,
auth,
host_owned_codex_apps_enabled,
client_elicitation_capability,
elicitation_reviewer,
} = args;
let tool_plugin_provenance = self
.services
.mcp_manager
.tool_plugin_provenance(config.as_ref())
.await;
let auth_statuses = compute_auth_statuses(mcp_servers.iter(), store_mode, auth).await;
{
let mut guard = self.services.mcp_startup_cancellation_token.lock().await;
guard.cancel();
*guard = CancellationToken::new();
}
let (refreshed_manager, cancel_token) = McpConnectionManager::new(
&mcp_servers,
store_mode,
auth_statuses,
approval_policy,
submit_id,
self.get_tx_event(),
permission_profile,
runtime_environment,
config.codex_home.to_path_buf(),
codex_apps_tools_cache_key(auth),
host_owned_codex_apps_enabled,
client_elicitation_capability,
tool_plugin_provenance,
auth,
elicitation_reviewer,
)
.await;
{
let current_manager = self.services.mcp_connection_manager.read().await;
refreshed_manager.set_elicitations_auto_deny(current_manager.elicitations_auto_deny());
}
{
let mut guard = self.services.mcp_startup_cancellation_token.lock().await;
if guard.is_cancelled() {
cancel_token.cancel();
}
*guard = cancel_token;
}
let mut old_manager = {
let mut manager = self.services.mcp_connection_manager.write().await;
std::mem::replace(&mut *manager, refreshed_manager)
};
self.mcp_connection_manager_initialized
.store(true, Ordering::Release);
old_manager.shutdown().await;
}
#[expect(
clippy::await_holding_invalid_type,
reason = "session MCP initialization must stay single-flight while the shared pool is built"
)]
pub(crate) async fn ensure_mcp_connection_manager_initialized(&self) {
if self
.mcp_connection_manager_initialized
.load(Ordering::Acquire)
{
return;
}
let _guard = self.services.mcp_connection_manager_init_lock.lock().await;
if self
.mcp_connection_manager_initialized
.load(Ordering::Acquire)
{
return;
}
let auth = self.services.auth_manager.auth().await;
let session_configuration = {
let state = self.state.lock().await;
state.session_configuration.clone()
};
let config = Arc::clone(&session_configuration.original_config_do_not_use);
let mcp_servers = self
.services
.mcp_manager
.effective_servers(&config, auth.as_ref())
.await;
let mcp_config = config
.to_mcp_config(self.services.plugins_manager.as_ref())
.await;
self.replace_mcp_connection_manager(ReplaceMcpConnectionManagerArgs {
submit_id: INITIAL_SUBMIT_ID.to_owned(),
approval_policy: &session_configuration.approval_policy,
permission_profile: session_configuration.permission_profile(),
runtime_environment: self.session_mcp_runtime_environment(&session_configuration),
config: &config,
mcp_servers,
store_mode: config.mcp_oauth_credentials_store_mode,
auth: auth.as_ref(),
host_owned_codex_apps_enabled: host_owned_codex_apps_enabled(
&mcp_config,
auth.as_ref(),
),
client_elicitation_capability: mcp_config.client_elicitation_capability,
elicitation_reviewer: None,
})
.await;
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
@@ -188,6 +357,7 @@ impl Session {
return Ok(());
}
self.ensure_mcp_connection_manager_initialized().await;
self.services
.mcp_connection_manager
.read()
@@ -205,6 +375,7 @@ impl Session {
server: &str,
params: Option<PaginatedRequestParams>,
) -> anyhow::Result<ListResourcesResult> {
self.ensure_mcp_connection_manager_initialized().await;
self.services
.mcp_connection_manager
.read()
@@ -222,6 +393,7 @@ impl Session {
server: &str,
params: Option<PaginatedRequestParams>,
) -> anyhow::Result<ListResourceTemplatesResult> {
self.ensure_mcp_connection_manager_initialized().await;
self.services
.mcp_connection_manager
.read()
@@ -239,6 +411,7 @@ impl Session {
server: &str,
params: ReadResourceRequestParams,
) -> anyhow::Result<ReadResourceResult> {
self.ensure_mcp_connection_manager_initialized().await;
self.services
.mcp_connection_manager
.read()
@@ -258,6 +431,7 @@ impl Session {
arguments: Option<serde_json::Value>,
meta: Option<serde_json::Value>,
) -> anyhow::Result<CallToolResult> {
self.ensure_mcp_connection_manager_initialized().await;
self.services
.mcp_connection_manager
.read()
@@ -278,71 +452,39 @@ impl Session {
let mcp_config = config
.to_mcp_config(self.services.plugins_manager.as_ref())
.await;
let tool_plugin_provenance = self
.services
.mcp_manager
.tool_plugin_provenance(config.as_ref())
.await;
let mcp_servers =
effective_mcp_servers_from_configured(mcp_servers, &mcp_config, auth.as_ref());
let host_owned_codex_apps_enabled =
host_owned_codex_apps_enabled(&mcp_config, auth.as_ref());
let auth_statuses =
compute_auth_statuses(mcp_servers.iter(), store_mode, auth.as_ref()).await;
let mcp_runtime_environment = match turn_context.environments.primary() {
Some(turn_environment) => McpRuntimeEnvironment::new(
Arc::clone(&turn_environment.environment),
turn_environment.cwd.to_path_buf(),
),
None => McpRuntimeEnvironment::new(
self.services
.environment_manager
.default_environment()
.unwrap_or_else(|| self.services.environment_manager.local_environment()),
#[allow(deprecated)]
turn_context.cwd.to_path_buf(),
),
};
{
let mut guard = self.services.mcp_startup_cancellation_token.lock().await;
guard.cancel();
*guard = CancellationToken::new();
}
let (refreshed_manager, cancel_token) = McpConnectionManager::new(
&mcp_servers,
store_mode,
auth_statuses,
&turn_context.approval_policy,
turn_context.sub_id.clone(),
self.get_tx_event(),
turn_context.permission_profile(),
mcp_runtime_environment,
config.codex_home.to_path_buf(),
codex_apps_tools_cache_key(auth.as_ref()),
host_owned_codex_apps_enabled,
mcp_config.client_elicitation_capability,
tool_plugin_provenance,
auth.as_ref(),
elicitation_reviewer,
)
.await;
{
let current_manager = self.services.mcp_connection_manager.read().await;
refreshed_manager.set_elicitations_auto_deny(current_manager.elicitations_auto_deny());
}
{
let mut guard = self.services.mcp_startup_cancellation_token.lock().await;
if guard.is_cancelled() {
cancel_token.cancel();
}
*guard = cancel_token;
}
let mut old_manager = {
let mut manager = self.services.mcp_connection_manager.write().await;
std::mem::replace(&mut *manager, refreshed_manager)
};
old_manager.shutdown().await;
self.replace_mcp_connection_manager(ReplaceMcpConnectionManagerArgs {
submit_id: turn_context.sub_id.clone(),
approval_policy: &turn_context.approval_policy,
permission_profile: turn_context.permission_profile(),
runtime_environment: match turn_context.environments.primary() {
Some(turn_environment) => McpRuntimeEnvironment::new(
Arc::clone(&turn_environment.environment),
turn_environment.cwd.to_path_buf(),
),
None => McpRuntimeEnvironment::new(
self.services
.environment_manager
.default_environment()
.unwrap_or_else(|| self.services.environment_manager.local_environment()),
#[allow(deprecated)]
turn_context.cwd.to_path_buf(),
),
},
config: &config,
mcp_servers,
store_mode,
auth: auth.as_ref(),
host_owned_codex_apps_enabled: host_owned_codex_apps_enabled(
&mcp_config,
auth.as_ref(),
),
client_elicitation_capability: mcp_config.client_elicitation_capability,
elicitation_reviewer,
})
.await;
}
pub(crate) async fn refresh_mcp_servers_if_requested(
@@ -350,15 +492,15 @@ impl Session {
turn_context: &TurnContext,
elicitation_reviewer: Option<ElicitationReviewerHandle>,
) {
let refresh_config = { self.pending_mcp_server_refresh_config.lock().await.take() };
let Some(refresh_config) = refresh_config else {
let refresh = { self.pending_mcp_server_refresh.lock().await.take() };
let Some(refresh) = refresh else {
return;
};
let McpServerRefreshConfig {
mcp_servers,
mcp_oauth_credentials_store_mode,
} = refresh_config;
} = refresh;
let mcp_servers =
match serde_json::from_value::<HashMap<String, McpServerConfig>>(mcp_servers) {

View File

@@ -142,14 +142,12 @@ use futures::future::BoxFuture;
use futures::future::Shared;
use futures::prelude::*;
use rmcp::model::ElicitationCapability;
use rmcp::model::FormElicitationCapability;
use rmcp::model::ListResourceTemplatesResult;
use rmcp::model::ListResourcesResult;
use rmcp::model::PaginatedRequestParams;
use rmcp::model::ReadResourceRequestParams;
use rmcp::model::ReadResourceResult;
use rmcp::model::RequestId;
use rmcp::model::UrlElicitationCapability;
use serde_json::Value;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
@@ -2722,6 +2720,7 @@ impl Session {
}
}
if turn_context.config.include_apps_instructions && turn_context.apps_enabled() {
self.ensure_mcp_connection_manager_initialized().await;
let mcp_connection_manager = self.services.mcp_connection_manager.read().await;
let accessible_and_enabled_connectors =
connectors::list_accessible_and_enabled_connectors_from_manager(

View File

@@ -24,7 +24,8 @@ pub(crate) struct Session {
/// The set of enabled features should be invariant for the lifetime of the
/// session.
pub(super) features: ManagedFeatures,
pub(super) pending_mcp_server_refresh_config: Mutex<Option<McpServerRefreshConfig>>,
pub(super) pending_mcp_server_refresh: Mutex<Option<McpServerRefreshConfig>>,
pub(super) mcp_connection_manager_initialized: std::sync::atomic::AtomicBool,
pub(crate) conversation: Arc<RealtimeConversationManager>,
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
pub(super) mailbox: Mailbox,
@@ -407,10 +408,6 @@ impl Session {
#[instrument(name = "session_init", level = "info", skip_all)]
#[allow(clippy::too_many_arguments)]
#[expect(
clippy::await_holding_invalid_type,
reason = "session initialization must serialize access through session-owned manager guards"
)]
pub(crate) async fn new(
mut session_configuration: SessionConfiguration,
config: Arc<Config>,
@@ -554,13 +551,7 @@ impl Session {
let mcp_servers = mcp_manager_for_mcp
.effective_servers(&config_for_mcp, auth.as_ref())
.await;
let auth_statuses = compute_auth_statuses(
mcp_servers.iter(),
config_for_mcp.mcp_oauth_credentials_store_mode,
auth.as_ref(),
)
.await;
(auth, mcp_servers, auth_statuses)
(auth, mcp_servers)
}
.instrument(info_span!(
"session_init.auth_mcp",
@@ -568,7 +559,7 @@ impl Session {
));
// Join all independent futures.
let (thread_persistence_result, state_db_ctx, (auth, mcp_servers, auth_statuses)) =
let (thread_persistence_result, state_db_ctx, (auth, mcp_servers)) =
tokio::join!(thread_persistence_fut, state_db_fut, auth_and_mcp_fut);
let mut live_thread_init =
@@ -885,6 +876,7 @@ impl Session {
config.permissions.permission_profile(),
),
)),
mcp_connection_manager_init_lock: Mutex::new(()),
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
unified_exec_manager: UnifiedExecProcessManager::new(
config.background_terminal_max_timeout,
@@ -951,7 +943,8 @@ impl Session {
state: Mutex::new(state),
managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1),
features: config.features.clone(),
pending_mcp_server_refresh_config: Mutex::new(None),
pending_mcp_server_refresh: Mutex::new(None),
mcp_connection_manager_initialized: std::sync::atomic::AtomicBool::new(false),
conversation: Arc::new(RealtimeConversationManager::new()),
active_turn: Mutex::new(None),
mailbox,
@@ -1001,115 +994,7 @@ impl Session {
for event in events {
sess.send_event_raw(event).await;
}
let mut required_mcp_servers: Vec<String> = mcp_servers
.iter()
.filter(|(_, server)| server.enabled() && server.required())
.map(|(name, _)| name.clone())
.collect();
required_mcp_servers.sort();
let enabled_mcp_server_count =
mcp_servers.values().filter(|server| server.enabled()).count();
let required_mcp_server_count = required_mcp_servers.len();
let tool_plugin_provenance = mcp_manager.tool_plugin_provenance(config.as_ref()).await;
let host_owned_codex_apps_enabled = config
.features
.apps_enabled_for_auth(auth.as_ref().is_some_and(|auth| auth.uses_codex_backend()));
let client_elicitation_capability = if config.features.enabled(Feature::AuthElicitation) {
ElicitationCapability {
form: Some(FormElicitationCapability::default()),
url: Some(UrlElicitationCapability::default()),
}
} else {
ElicitationCapability::default()
};
{
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
cancel_guard.cancel();
*cancel_guard = CancellationToken::new();
}
let turn_environment = crate::environment_selection::resolve_environment_selections(
sess.services.environment_manager.as_ref(),
&session_configuration.environments,
)
.map_err(|err| {
CodexErr::InvalidRequest(err.to_string().replace(
"unknown turn environment id",
"unknown stored MCP environment id",
))
})?
.primary()
.cloned();
let mcp_runtime_environment = match turn_environment {
Some(turn_environment) => McpRuntimeEnvironment::new(
Arc::clone(&turn_environment.environment),
turn_environment.cwd.to_path_buf(),
),
None => McpRuntimeEnvironment::new(
sess.services
.environment_manager
.default_environment()
.unwrap_or_else(|| sess.services.environment_manager.local_environment()),
session_configuration.cwd.to_path_buf(),
),
};
let (mcp_connection_manager, cancel_token) = McpConnectionManager::new(
&mcp_servers,
config.mcp_oauth_credentials_store_mode,
auth_statuses.clone(),
&session_configuration.approval_policy,
INITIAL_SUBMIT_ID.to_owned(),
tx_event.clone(),
session_configuration.permission_profile(),
mcp_runtime_environment,
config.codex_home.to_path_buf(),
codex_apps_tools_cache_key(auth),
host_owned_codex_apps_enabled,
client_elicitation_capability,
tool_plugin_provenance,
auth,
Some(sess.mcp_elicitation_reviewer()),
)
.instrument(info_span!(
"session_init.mcp_manager_init",
otel.name = "session_init.mcp_manager_init",
session_init.enabled_mcp_server_count = enabled_mcp_server_count,
session_init.required_mcp_server_count = required_mcp_server_count,
))
.await;
{
let mut manager_guard = sess.services.mcp_connection_manager.write().await;
*manager_guard = mcp_connection_manager;
}
{
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
if cancel_guard.is_cancelled() {
cancel_token.cancel();
}
*cancel_guard = cancel_token;
}
if !required_mcp_servers.is_empty() {
let failures = sess
.services
.mcp_connection_manager
.read()
.await
.required_startup_failures(&required_mcp_servers)
.instrument(info_span!(
"session_init.required_mcp_wait",
otel.name = "session_init.required_mcp_wait",
session_init.required_mcp_server_count = required_mcp_server_count,
))
.await;
if !failures.is_empty() {
let details = failures
.iter()
.map(|failure| format!("{}: {}", failure.server, failure.error))
.collect::<Vec<_>>()
.join("; ");
anyhow::bail!("required MCP servers failed to initialize: {details}");
}
}
sess.start_mcp_connection_manager_initialization();
sess.schedule_startup_prewarm(session_configuration.base_instructions.clone())
.await;
let session_start_source = match &initial_history {

View File

@@ -4300,6 +4300,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
config.permissions.permission_profile(),
),
)),
mcp_connection_manager_init_lock: Mutex::new(()),
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
unified_exec_manager: UnifiedExecProcessManager::new(
config.background_terminal_max_timeout,
@@ -4408,7 +4409,8 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
state: Mutex::new(state),
managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1),
features: config.features.clone(),
pending_mcp_server_refresh_config: Mutex::new(None),
pending_mcp_server_refresh: Mutex::new(None),
mcp_connection_manager_initialized: std::sync::atomic::AtomicBool::new(false),
conversation: Arc::new(RealtimeConversationManager::new()),
active_turn: Mutex::new(None),
mailbox,
@@ -6156,6 +6158,7 @@ where
config.permissions.permission_profile(),
),
)),
mcp_connection_manager_init_lock: Mutex::new(()),
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
unified_exec_manager: UnifiedExecProcessManager::new(
config.background_terminal_max_timeout,
@@ -6264,7 +6267,8 @@ where
state: Mutex::new(state),
managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1),
features: config.features.clone(),
pending_mcp_server_refresh_config: Mutex::new(None),
pending_mcp_server_refresh: Mutex::new(None),
mcp_connection_manager_initialized: std::sync::atomic::AtomicBool::new(false),
conversation: Arc::new(RealtimeConversationManager::new()),
active_turn: Mutex::new(None),
mailbox,
@@ -6363,35 +6367,41 @@ async fn refresh_mcp_servers_is_deferred_until_next_turn() {
mcp_oauth_credentials_store_mode,
};
{
let mut guard = session.pending_mcp_server_refresh_config.lock().await;
let mut guard = session.pending_mcp_server_refresh.lock().await;
*guard = Some(refresh_config);
}
assert!(!old_token.is_cancelled());
assert!(
session
.pending_mcp_server_refresh_config
.lock()
.await
.is_some()
);
assert!(session.pending_mcp_server_refresh.lock().await.is_some());
session
.refresh_mcp_servers_if_requested(&turn_context, /*elicitation_reviewer*/ None)
.await;
assert!(old_token.is_cancelled());
assert!(
session
.pending_mcp_server_refresh_config
.lock()
.await
.is_none()
);
assert!(session.pending_mcp_server_refresh.lock().await.is_none());
let new_token = session.mcp_startup_cancellation_token().await;
assert!(!new_token.is_cancelled());
}
#[tokio::test]
async fn lazy_mcp_initialization_is_idempotent() {
let (session, _turn_context) = make_session_and_context().await;
session.ensure_mcp_connection_manager_initialized().await;
let first_token = session.mcp_startup_cancellation_token().await;
assert!(!first_token.is_cancelled());
session.ensure_mcp_connection_manager_initialized().await;
assert!(
!first_token.is_cancelled(),
"reinitializing the session-owned MCP pool should not cancel the first token",
);
let second_token = session.mcp_startup_cancellation_token().await;
assert!(!second_token.is_cancelled());
}
#[tokio::test]
async fn spawn_task_does_not_update_previous_turn_settings_for_non_run_turn_tasks() {
let (sess, tc, _rx) = make_session_and_context_with_rx().await;

View File

@@ -188,6 +188,7 @@ pub(crate) async fn run_turn(
// Plugin mentions need raw MCP/app inventory even when app tools
// are normally hidden so we can describe the plugin's currently
// usable capabilities for this turn.
sess.ensure_mcp_connection_manager_initialized().await;
match sess
.services
.mcp_connection_manager
@@ -1165,6 +1166,7 @@ pub(crate) async fn built_tools(
skills_outcome: Option<&SkillLoadOutcome>,
cancellation_token: &CancellationToken,
) -> CodexResult<Arc<ToolRouter>> {
sess.ensure_mcp_connection_manager_initialized().await;
let mcp_connection_manager = sess.services.mcp_connection_manager.read().await;
let has_mcp_servers = mcp_connection_manager.has_servers();
let all_mcp_tools = mcp_connection_manager

View File

@@ -38,6 +38,7 @@ use tokio_util::sync::CancellationToken;
pub(crate) struct SessionServices {
pub(crate) mcp_connection_manager: Arc<RwLock<McpConnectionManager>>,
pub(crate) mcp_connection_manager_init_lock: Mutex<()>,
pub(crate) mcp_startup_cancellation_token: Mutex<CancellationToken>,
pub(crate) unified_exec_manager: UnifiedExecProcessManager,
#[cfg_attr(not(unix), allow(dead_code))]

View File

@@ -4,6 +4,7 @@ mod response_adapter;
mod wait_handler;
pub(crate) mod wait_spec;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
@@ -29,9 +30,11 @@ use crate::tools::context::ToolPayload;
use crate::tools::parallel::ToolCallRuntime;
use crate::tools::router::ToolCall;
use crate::tools::router::ToolCallSource;
use crate::tools::router::ToolRouterParams;
use crate::unified_exec::resolve_max_tokens;
use codex_features::Feature;
use codex_tools::ToolName;
use codex_tools::collect_code_mode_tool_definitions;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::formatted_truncate_text_content_items_with_policy;
use codex_utils_output_truncation::truncate_function_output_items_with_policy;
@@ -257,6 +260,64 @@ fn truncate_code_mode_result(
truncate_function_output_items_with_policy(&items, policy)
}
pub(super) async fn build_enabled_tools(
exec: &ExecContext,
) -> Vec<codex_code_mode::ToolDefinition> {
let router = build_nested_router(exec).await;
let specs = router.model_visible_specs();
collect_code_mode_tool_definitions(&specs)
}
#[expect(
clippy::await_holding_invalid_type,
reason = "nested tool router construction reads through the session-owned manager guard"
)]
async fn build_nested_router(exec: &ExecContext) -> ToolRouter {
let nested_tools_config = exec.turn.tools_config.for_code_mode_nested_tools();
exec.session
.ensure_mcp_connection_manager_initialized()
.await;
let listed_mcp_tools = exec
.session
.services
.mcp_connection_manager
.read()
.await
.list_all_tools()
.await;
let parallel_mcp_server_names = exec
.turn
.config
.mcp_servers
.get()
.iter()
.filter_map(|(server_name, server_config)| {
server_config
.supports_parallel_tool_calls
.then_some(server_name.clone())
})
.collect::<HashSet<_>>();
let listed_mcp_tools = listed_mcp_tools
.into_iter()
.map(|mut tool| {
tool.supports_parallel_tool_calls =
parallel_mcp_server_names.contains(&tool.server_name);
tool
})
.collect();
ToolRouter::from_config(
&nested_tools_config,
ToolRouterParams {
deferred_mcp_tools: None,
mcp_tools: Some(listed_mcp_tools),
discoverable_tools: None,
extension_tool_executors: Vec::new(),
dynamic_tools: exec.turn.dynamic_tools.as_slice(),
},
)
}
async fn call_nested_tool(
_exec: ExecContext,
tool_runtime: ToolCallRuntime,

View File

@@ -33,7 +33,6 @@ pub use read_mcp_resource::ReadMcpResourceHandler;
#[derive(Debug, Deserialize, Default)]
struct ListResourcesArgs {
/// Lists all resources from all servers if not specified.
#[serde(default)]
server: Option<String>,
#[serde(default)]
@@ -42,7 +41,6 @@ struct ListResourcesArgs {
#[derive(Debug, Deserialize, Default)]
struct ListResourceTemplatesArgs {
/// Lists all resource templates from all servers if not specified.
#[serde(default)]
server: Option<String>,
#[serde(default)]

View File

@@ -105,6 +105,7 @@ impl ToolExecutor<ToolInvocation> for ListMcpResourceTemplatesHandler {
));
}
session.ensure_mcp_connection_manager_initialized().await;
let templates = session
.services
.mcp_connection_manager

View File

@@ -103,6 +103,7 @@ impl ToolExecutor<ToolInvocation> for ListMcpResourcesHandler {
));
}
session.ensure_mcp_connection_manager_initialized().await;
let resources = session
.services
.mcp_connection_manager

View File

@@ -112,6 +112,7 @@ impl ToolExecutor<ToolInvocation> for RequestPluginInstallHandler {
}
let auth = session.services.auth_manager.auth().await;
session.ensure_mcp_connection_manager_initialized().await;
let manager = session.services.mcp_connection_manager.read().await;
let mcp_tools = manager.list_all_tools().await;
drop(manager);
@@ -314,6 +315,7 @@ async fn refresh_missing_requested_connectors(
return Some(Vec::new());
}
session.ensure_mcp_connection_manager_initialized().await;
let manager = session.services.mcp_connection_manager.read().await;
let mcp_tools = manager.list_all_tools().await;
let accessible_connectors = connectors::with_app_enabled_state(

View File

@@ -332,7 +332,6 @@ impl ToolRegistry {
),
),
];
{
let mut active = invocation.session.active_turn.lock().await;
if let Some(active_turn) = active.as_mut() {