mirror of
https://github.com/openai/codex.git
synced 2026-06-02 11:22:01 +00:00
Compare commits
3 Commits
rust-v0.13
...
fix/lazy-s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
68dc2fda2f | ||
|
|
95d02e8129 | ||
|
|
8bd391e501 |
@@ -2755,7 +2755,7 @@ async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Re
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_fails_when_required_mcp_server_fails_to_initialize() -> Result<()> {
|
||||
async fn thread_resume_returns_before_required_mcp_server_initializes() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
let rollout = setup_rollout_fixture(codex_home.path(), &server.uri())?;
|
||||
@@ -2770,24 +2770,13 @@ async fn thread_resume_fails_when_required_mcp_server_fails_to_initialize() -> R
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let err: JSONRPCError = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(resume_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
assert!(
|
||||
err.error
|
||||
.message
|
||||
.contains("required MCP servers failed to initialize"),
|
||||
"unexpected error message: {}",
|
||||
err.error.message
|
||||
);
|
||||
assert!(
|
||||
err.error.message.contains("required_broken"),
|
||||
"unexpected error message: {}",
|
||||
err.error.message
|
||||
);
|
||||
let _: ThreadResumeResponse = to_response(
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
||||
)
|
||||
.await??,
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -586,7 +586,7 @@ async fn thread_start_ephemeral_remains_pathless() -> Result<()> {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_start_fails_when_required_mcp_server_fails_to_initialize() -> Result<()> {
|
||||
async fn thread_start_returns_before_required_mcp_server_initializes() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
@@ -599,24 +599,13 @@ async fn thread_start_fails_when_required_mcp_server_fails_to_initialize() -> Re
|
||||
.send_thread_start_request(ThreadStartParams::default())
|
||||
.await?;
|
||||
|
||||
let err: JSONRPCError = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(req_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
assert!(
|
||||
err.error
|
||||
.message
|
||||
.contains("required MCP servers failed to initialize"),
|
||||
"unexpected error message: {}",
|
||||
err.error.message
|
||||
);
|
||||
assert!(
|
||||
err.error.message.contains("required_broken"),
|
||||
"unexpected error message: {}",
|
||||
err.error.message
|
||||
);
|
||||
let _: ThreadStartResponse = to_response(
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
|
||||
)
|
||||
.await??,
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -320,6 +320,7 @@ async fn handle_approved_mcp_tool_call(
|
||||
let arguments_value = invocation.arguments.clone();
|
||||
let connector_id = metadata.and_then(|metadata| metadata.connector_id.as_deref());
|
||||
let connector_name = metadata.and_then(|metadata| metadata.connector_name.as_deref());
|
||||
sess.ensure_mcp_connection_manager_initialized().await;
|
||||
let server_origin = sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
@@ -1480,6 +1481,7 @@ pub(crate) async fn lookup_mcp_tool_metadata(
|
||||
server: &str,
|
||||
tool_name: &str,
|
||||
) -> Option<McpToolApprovalMetadata> {
|
||||
sess.ensure_mcp_connection_manager_initialized().await;
|
||||
let tools = sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
@@ -1575,6 +1577,7 @@ async fn lookup_mcp_app_usage_metadata(
|
||||
server: &str,
|
||||
tool_name: &str,
|
||||
) -> Option<McpAppUsageMetadata> {
|
||||
sess.ensure_mcp_connection_manager_initialized().await;
|
||||
let tools = sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
|
||||
@@ -468,7 +468,7 @@ pub async fn dynamic_tool_response(sess: &Arc<Session>, id: String, response: Dy
|
||||
}
|
||||
|
||||
pub async fn refresh_mcp_servers(sess: &Arc<Session>, refresh_config: McpServerRefreshConfig) {
|
||||
let mut guard = sess.pending_mcp_server_refresh_config.lock().await;
|
||||
let mut guard = sess.pending_mcp_server_refresh.lock().await;
|
||||
*guard = Some(refresh_config);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::*;
|
||||
use codex_mcp::EffectiveMcpServer;
|
||||
use codex_mcp::ElicitationReviewRequest;
|
||||
use codex_mcp::ElicitationReviewer;
|
||||
use codex_mcp::ElicitationReviewerHandle;
|
||||
@@ -19,9 +20,24 @@ use rmcp::model::CreateElicitationRequestParams;
|
||||
use rmcp::model::ElicitationAction;
|
||||
use rmcp::model::Meta;
|
||||
use serde_json::Map;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
const MCP_ELICITATION_DECLINE_MESSAGE_KEY: &str = "message";
|
||||
|
||||
struct ReplaceMcpConnectionManagerArgs<'a> {
|
||||
submit_id: String,
|
||||
approval_policy: &'a Constrained<AskForApproval>,
|
||||
permission_profile: PermissionProfile,
|
||||
runtime_environment: McpRuntimeEnvironment,
|
||||
config: &'a Arc<Config>,
|
||||
mcp_servers: HashMap<String, EffectiveMcpServer>,
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
auth: Option<&'a CodexAuth>,
|
||||
host_owned_codex_apps_enabled: bool,
|
||||
client_elicitation_capability: ElicitationCapability,
|
||||
elicitation_reviewer: Option<ElicitationReviewerHandle>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
enum GuardianElicitationReview {
|
||||
NotRequested,
|
||||
@@ -61,6 +77,159 @@ impl Session {
|
||||
Arc::new(GuardianMcpElicitationReviewer::new(self))
|
||||
}
|
||||
|
||||
pub(crate) fn start_mcp_connection_manager_initialization(self: &Arc<Self>) {
|
||||
let session = Arc::clone(self);
|
||||
drop(self.services.runtime_handle.spawn(async move {
|
||||
session.ensure_mcp_connection_manager_initialized().await;
|
||||
}));
|
||||
}
|
||||
|
||||
fn session_mcp_runtime_environment(
|
||||
&self,
|
||||
session_configuration: &SessionConfiguration,
|
||||
) -> McpRuntimeEnvironment {
|
||||
let turn_environment = crate::environment_selection::resolve_environment_selections(
|
||||
self.services.environment_manager.as_ref(),
|
||||
&session_configuration.environments,
|
||||
)
|
||||
.unwrap_or_else(|err| {
|
||||
panic!("session MCP environment selections should remain valid: {err}")
|
||||
})
|
||||
.primary()
|
||||
.cloned();
|
||||
match turn_environment {
|
||||
Some(turn_environment) => McpRuntimeEnvironment::new(
|
||||
Arc::clone(&turn_environment.environment),
|
||||
turn_environment.cwd.to_path_buf(),
|
||||
),
|
||||
None => McpRuntimeEnvironment::new(
|
||||
self.services
|
||||
.environment_manager
|
||||
.default_environment()
|
||||
.unwrap_or_else(|| self.services.environment_manager.local_environment()),
|
||||
session_configuration.cwd.to_path_buf(),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
async fn replace_mcp_connection_manager(&self, args: ReplaceMcpConnectionManagerArgs<'_>) {
|
||||
let ReplaceMcpConnectionManagerArgs {
|
||||
submit_id,
|
||||
approval_policy,
|
||||
permission_profile,
|
||||
runtime_environment,
|
||||
config,
|
||||
mcp_servers,
|
||||
store_mode,
|
||||
auth,
|
||||
host_owned_codex_apps_enabled,
|
||||
client_elicitation_capability,
|
||||
elicitation_reviewer,
|
||||
} = args;
|
||||
let tool_plugin_provenance = self
|
||||
.services
|
||||
.mcp_manager
|
||||
.tool_plugin_provenance(config.as_ref())
|
||||
.await;
|
||||
let auth_statuses = compute_auth_statuses(mcp_servers.iter(), store_mode, auth).await;
|
||||
{
|
||||
let mut guard = self.services.mcp_startup_cancellation_token.lock().await;
|
||||
guard.cancel();
|
||||
*guard = CancellationToken::new();
|
||||
}
|
||||
let (refreshed_manager, cancel_token) = McpConnectionManager::new(
|
||||
&mcp_servers,
|
||||
store_mode,
|
||||
auth_statuses,
|
||||
approval_policy,
|
||||
submit_id,
|
||||
self.get_tx_event(),
|
||||
permission_profile,
|
||||
runtime_environment,
|
||||
config.codex_home.to_path_buf(),
|
||||
codex_apps_tools_cache_key(auth),
|
||||
host_owned_codex_apps_enabled,
|
||||
client_elicitation_capability,
|
||||
tool_plugin_provenance,
|
||||
auth,
|
||||
elicitation_reviewer,
|
||||
)
|
||||
.await;
|
||||
{
|
||||
let current_manager = self.services.mcp_connection_manager.read().await;
|
||||
refreshed_manager.set_elicitations_auto_deny(current_manager.elicitations_auto_deny());
|
||||
}
|
||||
{
|
||||
let mut guard = self.services.mcp_startup_cancellation_token.lock().await;
|
||||
if guard.is_cancelled() {
|
||||
cancel_token.cancel();
|
||||
}
|
||||
*guard = cancel_token;
|
||||
}
|
||||
|
||||
let mut old_manager = {
|
||||
let mut manager = self.services.mcp_connection_manager.write().await;
|
||||
std::mem::replace(&mut *manager, refreshed_manager)
|
||||
};
|
||||
self.mcp_connection_manager_initialized
|
||||
.store(true, Ordering::Release);
|
||||
old_manager.shutdown().await;
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "session MCP initialization must stay single-flight while the shared pool is built"
|
||||
)]
|
||||
pub(crate) async fn ensure_mcp_connection_manager_initialized(&self) {
|
||||
if self
|
||||
.mcp_connection_manager_initialized
|
||||
.load(Ordering::Acquire)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
let _guard = self.services.mcp_connection_manager_init_lock.lock().await;
|
||||
if self
|
||||
.mcp_connection_manager_initialized
|
||||
.load(Ordering::Acquire)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
let auth = self.services.auth_manager.auth().await;
|
||||
let session_configuration = {
|
||||
let state = self.state.lock().await;
|
||||
state.session_configuration.clone()
|
||||
};
|
||||
let config = Arc::clone(&session_configuration.original_config_do_not_use);
|
||||
let mcp_servers = self
|
||||
.services
|
||||
.mcp_manager
|
||||
.effective_servers(&config, auth.as_ref())
|
||||
.await;
|
||||
let mcp_config = config
|
||||
.to_mcp_config(self.services.plugins_manager.as_ref())
|
||||
.await;
|
||||
|
||||
self.replace_mcp_connection_manager(ReplaceMcpConnectionManagerArgs {
|
||||
submit_id: INITIAL_SUBMIT_ID.to_owned(),
|
||||
approval_policy: &session_configuration.approval_policy,
|
||||
permission_profile: session_configuration.permission_profile(),
|
||||
runtime_environment: self.session_mcp_runtime_environment(&session_configuration),
|
||||
config: &config,
|
||||
mcp_servers,
|
||||
store_mode: config.mcp_oauth_credentials_store_mode,
|
||||
auth: auth.as_ref(),
|
||||
host_owned_codex_apps_enabled: host_owned_codex_apps_enabled(
|
||||
&mcp_config,
|
||||
auth.as_ref(),
|
||||
),
|
||||
client_elicitation_capability: mcp_config.client_elicitation_capability,
|
||||
elicitation_reviewer: None,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "active turn checks and turn state updates must remain atomic"
|
||||
@@ -188,6 +357,7 @@ impl Session {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.ensure_mcp_connection_manager_initialized().await;
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
@@ -205,6 +375,7 @@ impl Session {
|
||||
server: &str,
|
||||
params: Option<PaginatedRequestParams>,
|
||||
) -> anyhow::Result<ListResourcesResult> {
|
||||
self.ensure_mcp_connection_manager_initialized().await;
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
@@ -222,6 +393,7 @@ impl Session {
|
||||
server: &str,
|
||||
params: Option<PaginatedRequestParams>,
|
||||
) -> anyhow::Result<ListResourceTemplatesResult> {
|
||||
self.ensure_mcp_connection_manager_initialized().await;
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
@@ -239,6 +411,7 @@ impl Session {
|
||||
server: &str,
|
||||
params: ReadResourceRequestParams,
|
||||
) -> anyhow::Result<ReadResourceResult> {
|
||||
self.ensure_mcp_connection_manager_initialized().await;
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
@@ -258,6 +431,7 @@ impl Session {
|
||||
arguments: Option<serde_json::Value>,
|
||||
meta: Option<serde_json::Value>,
|
||||
) -> anyhow::Result<CallToolResult> {
|
||||
self.ensure_mcp_connection_manager_initialized().await;
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
@@ -278,71 +452,39 @@ impl Session {
|
||||
let mcp_config = config
|
||||
.to_mcp_config(self.services.plugins_manager.as_ref())
|
||||
.await;
|
||||
let tool_plugin_provenance = self
|
||||
.services
|
||||
.mcp_manager
|
||||
.tool_plugin_provenance(config.as_ref())
|
||||
.await;
|
||||
let mcp_servers =
|
||||
effective_mcp_servers_from_configured(mcp_servers, &mcp_config, auth.as_ref());
|
||||
let host_owned_codex_apps_enabled =
|
||||
host_owned_codex_apps_enabled(&mcp_config, auth.as_ref());
|
||||
let auth_statuses =
|
||||
compute_auth_statuses(mcp_servers.iter(), store_mode, auth.as_ref()).await;
|
||||
let mcp_runtime_environment = match turn_context.environments.primary() {
|
||||
Some(turn_environment) => McpRuntimeEnvironment::new(
|
||||
Arc::clone(&turn_environment.environment),
|
||||
turn_environment.cwd.to_path_buf(),
|
||||
),
|
||||
None => McpRuntimeEnvironment::new(
|
||||
self.services
|
||||
.environment_manager
|
||||
.default_environment()
|
||||
.unwrap_or_else(|| self.services.environment_manager.local_environment()),
|
||||
#[allow(deprecated)]
|
||||
turn_context.cwd.to_path_buf(),
|
||||
),
|
||||
};
|
||||
{
|
||||
let mut guard = self.services.mcp_startup_cancellation_token.lock().await;
|
||||
guard.cancel();
|
||||
*guard = CancellationToken::new();
|
||||
}
|
||||
let (refreshed_manager, cancel_token) = McpConnectionManager::new(
|
||||
&mcp_servers,
|
||||
store_mode,
|
||||
auth_statuses,
|
||||
&turn_context.approval_policy,
|
||||
turn_context.sub_id.clone(),
|
||||
self.get_tx_event(),
|
||||
turn_context.permission_profile(),
|
||||
mcp_runtime_environment,
|
||||
config.codex_home.to_path_buf(),
|
||||
codex_apps_tools_cache_key(auth.as_ref()),
|
||||
host_owned_codex_apps_enabled,
|
||||
mcp_config.client_elicitation_capability,
|
||||
tool_plugin_provenance,
|
||||
auth.as_ref(),
|
||||
elicitation_reviewer,
|
||||
)
|
||||
.await;
|
||||
{
|
||||
let current_manager = self.services.mcp_connection_manager.read().await;
|
||||
refreshed_manager.set_elicitations_auto_deny(current_manager.elicitations_auto_deny());
|
||||
}
|
||||
{
|
||||
let mut guard = self.services.mcp_startup_cancellation_token.lock().await;
|
||||
if guard.is_cancelled() {
|
||||
cancel_token.cancel();
|
||||
}
|
||||
*guard = cancel_token;
|
||||
}
|
||||
|
||||
let mut old_manager = {
|
||||
let mut manager = self.services.mcp_connection_manager.write().await;
|
||||
std::mem::replace(&mut *manager, refreshed_manager)
|
||||
};
|
||||
old_manager.shutdown().await;
|
||||
self.replace_mcp_connection_manager(ReplaceMcpConnectionManagerArgs {
|
||||
submit_id: turn_context.sub_id.clone(),
|
||||
approval_policy: &turn_context.approval_policy,
|
||||
permission_profile: turn_context.permission_profile(),
|
||||
runtime_environment: match turn_context.environments.primary() {
|
||||
Some(turn_environment) => McpRuntimeEnvironment::new(
|
||||
Arc::clone(&turn_environment.environment),
|
||||
turn_environment.cwd.to_path_buf(),
|
||||
),
|
||||
None => McpRuntimeEnvironment::new(
|
||||
self.services
|
||||
.environment_manager
|
||||
.default_environment()
|
||||
.unwrap_or_else(|| self.services.environment_manager.local_environment()),
|
||||
#[allow(deprecated)]
|
||||
turn_context.cwd.to_path_buf(),
|
||||
),
|
||||
},
|
||||
config: &config,
|
||||
mcp_servers,
|
||||
store_mode,
|
||||
auth: auth.as_ref(),
|
||||
host_owned_codex_apps_enabled: host_owned_codex_apps_enabled(
|
||||
&mcp_config,
|
||||
auth.as_ref(),
|
||||
),
|
||||
client_elicitation_capability: mcp_config.client_elicitation_capability,
|
||||
elicitation_reviewer,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
pub(crate) async fn refresh_mcp_servers_if_requested(
|
||||
@@ -350,15 +492,15 @@ impl Session {
|
||||
turn_context: &TurnContext,
|
||||
elicitation_reviewer: Option<ElicitationReviewerHandle>,
|
||||
) {
|
||||
let refresh_config = { self.pending_mcp_server_refresh_config.lock().await.take() };
|
||||
let Some(refresh_config) = refresh_config else {
|
||||
let refresh = { self.pending_mcp_server_refresh.lock().await.take() };
|
||||
let Some(refresh) = refresh else {
|
||||
return;
|
||||
};
|
||||
|
||||
let McpServerRefreshConfig {
|
||||
mcp_servers,
|
||||
mcp_oauth_credentials_store_mode,
|
||||
} = refresh_config;
|
||||
} = refresh;
|
||||
|
||||
let mcp_servers =
|
||||
match serde_json::from_value::<HashMap<String, McpServerConfig>>(mcp_servers) {
|
||||
|
||||
@@ -142,14 +142,12 @@ use futures::future::BoxFuture;
|
||||
use futures::future::Shared;
|
||||
use futures::prelude::*;
|
||||
use rmcp::model::ElicitationCapability;
|
||||
use rmcp::model::FormElicitationCapability;
|
||||
use rmcp::model::ListResourceTemplatesResult;
|
||||
use rmcp::model::ListResourcesResult;
|
||||
use rmcp::model::PaginatedRequestParams;
|
||||
use rmcp::model::ReadResourceRequestParams;
|
||||
use rmcp::model::ReadResourceResult;
|
||||
use rmcp::model::RequestId;
|
||||
use rmcp::model::UrlElicitationCapability;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::RwLock;
|
||||
@@ -2722,6 +2720,7 @@ impl Session {
|
||||
}
|
||||
}
|
||||
if turn_context.config.include_apps_instructions && turn_context.apps_enabled() {
|
||||
self.ensure_mcp_connection_manager_initialized().await;
|
||||
let mcp_connection_manager = self.services.mcp_connection_manager.read().await;
|
||||
let accessible_and_enabled_connectors =
|
||||
connectors::list_accessible_and_enabled_connectors_from_manager(
|
||||
|
||||
@@ -24,7 +24,8 @@ pub(crate) struct Session {
|
||||
/// The set of enabled features should be invariant for the lifetime of the
|
||||
/// session.
|
||||
pub(super) features: ManagedFeatures,
|
||||
pub(super) pending_mcp_server_refresh_config: Mutex<Option<McpServerRefreshConfig>>,
|
||||
pub(super) pending_mcp_server_refresh: Mutex<Option<McpServerRefreshConfig>>,
|
||||
pub(super) mcp_connection_manager_initialized: std::sync::atomic::AtomicBool,
|
||||
pub(crate) conversation: Arc<RealtimeConversationManager>,
|
||||
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
|
||||
pub(super) mailbox: Mailbox,
|
||||
@@ -407,10 +408,6 @@ impl Session {
|
||||
|
||||
#[instrument(name = "session_init", level = "info", skip_all)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "session initialization must serialize access through session-owned manager guards"
|
||||
)]
|
||||
pub(crate) async fn new(
|
||||
mut session_configuration: SessionConfiguration,
|
||||
config: Arc<Config>,
|
||||
@@ -554,13 +551,7 @@ impl Session {
|
||||
let mcp_servers = mcp_manager_for_mcp
|
||||
.effective_servers(&config_for_mcp, auth.as_ref())
|
||||
.await;
|
||||
let auth_statuses = compute_auth_statuses(
|
||||
mcp_servers.iter(),
|
||||
config_for_mcp.mcp_oauth_credentials_store_mode,
|
||||
auth.as_ref(),
|
||||
)
|
||||
.await;
|
||||
(auth, mcp_servers, auth_statuses)
|
||||
(auth, mcp_servers)
|
||||
}
|
||||
.instrument(info_span!(
|
||||
"session_init.auth_mcp",
|
||||
@@ -568,7 +559,7 @@ impl Session {
|
||||
));
|
||||
|
||||
// Join all independent futures.
|
||||
let (thread_persistence_result, state_db_ctx, (auth, mcp_servers, auth_statuses)) =
|
||||
let (thread_persistence_result, state_db_ctx, (auth, mcp_servers)) =
|
||||
tokio::join!(thread_persistence_fut, state_db_fut, auth_and_mcp_fut);
|
||||
|
||||
let mut live_thread_init =
|
||||
@@ -885,6 +876,7 @@ impl Session {
|
||||
config.permissions.permission_profile(),
|
||||
),
|
||||
)),
|
||||
mcp_connection_manager_init_lock: Mutex::new(()),
|
||||
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
|
||||
unified_exec_manager: UnifiedExecProcessManager::new(
|
||||
config.background_terminal_max_timeout,
|
||||
@@ -951,7 +943,8 @@ impl Session {
|
||||
state: Mutex::new(state),
|
||||
managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1),
|
||||
features: config.features.clone(),
|
||||
pending_mcp_server_refresh_config: Mutex::new(None),
|
||||
pending_mcp_server_refresh: Mutex::new(None),
|
||||
mcp_connection_manager_initialized: std::sync::atomic::AtomicBool::new(false),
|
||||
conversation: Arc::new(RealtimeConversationManager::new()),
|
||||
active_turn: Mutex::new(None),
|
||||
mailbox,
|
||||
@@ -1001,115 +994,7 @@ impl Session {
|
||||
for event in events {
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
|
||||
let mut required_mcp_servers: Vec<String> = mcp_servers
|
||||
.iter()
|
||||
.filter(|(_, server)| server.enabled() && server.required())
|
||||
.map(|(name, _)| name.clone())
|
||||
.collect();
|
||||
required_mcp_servers.sort();
|
||||
let enabled_mcp_server_count =
|
||||
mcp_servers.values().filter(|server| server.enabled()).count();
|
||||
let required_mcp_server_count = required_mcp_servers.len();
|
||||
let tool_plugin_provenance = mcp_manager.tool_plugin_provenance(config.as_ref()).await;
|
||||
let host_owned_codex_apps_enabled = config
|
||||
.features
|
||||
.apps_enabled_for_auth(auth.as_ref().is_some_and(|auth| auth.uses_codex_backend()));
|
||||
let client_elicitation_capability = if config.features.enabled(Feature::AuthElicitation) {
|
||||
ElicitationCapability {
|
||||
form: Some(FormElicitationCapability::default()),
|
||||
url: Some(UrlElicitationCapability::default()),
|
||||
}
|
||||
} else {
|
||||
ElicitationCapability::default()
|
||||
};
|
||||
{
|
||||
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
|
||||
cancel_guard.cancel();
|
||||
*cancel_guard = CancellationToken::new();
|
||||
}
|
||||
let turn_environment = crate::environment_selection::resolve_environment_selections(
|
||||
sess.services.environment_manager.as_ref(),
|
||||
&session_configuration.environments,
|
||||
)
|
||||
.map_err(|err| {
|
||||
CodexErr::InvalidRequest(err.to_string().replace(
|
||||
"unknown turn environment id",
|
||||
"unknown stored MCP environment id",
|
||||
))
|
||||
})?
|
||||
.primary()
|
||||
.cloned();
|
||||
let mcp_runtime_environment = match turn_environment {
|
||||
Some(turn_environment) => McpRuntimeEnvironment::new(
|
||||
Arc::clone(&turn_environment.environment),
|
||||
turn_environment.cwd.to_path_buf(),
|
||||
),
|
||||
None => McpRuntimeEnvironment::new(
|
||||
sess.services
|
||||
.environment_manager
|
||||
.default_environment()
|
||||
.unwrap_or_else(|| sess.services.environment_manager.local_environment()),
|
||||
session_configuration.cwd.to_path_buf(),
|
||||
),
|
||||
};
|
||||
let (mcp_connection_manager, cancel_token) = McpConnectionManager::new(
|
||||
&mcp_servers,
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth_statuses.clone(),
|
||||
&session_configuration.approval_policy,
|
||||
INITIAL_SUBMIT_ID.to_owned(),
|
||||
tx_event.clone(),
|
||||
session_configuration.permission_profile(),
|
||||
mcp_runtime_environment,
|
||||
config.codex_home.to_path_buf(),
|
||||
codex_apps_tools_cache_key(auth),
|
||||
host_owned_codex_apps_enabled,
|
||||
client_elicitation_capability,
|
||||
tool_plugin_provenance,
|
||||
auth,
|
||||
Some(sess.mcp_elicitation_reviewer()),
|
||||
)
|
||||
.instrument(info_span!(
|
||||
"session_init.mcp_manager_init",
|
||||
otel.name = "session_init.mcp_manager_init",
|
||||
session_init.enabled_mcp_server_count = enabled_mcp_server_count,
|
||||
session_init.required_mcp_server_count = required_mcp_server_count,
|
||||
))
|
||||
.await;
|
||||
{
|
||||
let mut manager_guard = sess.services.mcp_connection_manager.write().await;
|
||||
*manager_guard = mcp_connection_manager;
|
||||
}
|
||||
{
|
||||
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
|
||||
if cancel_guard.is_cancelled() {
|
||||
cancel_token.cancel();
|
||||
}
|
||||
*cancel_guard = cancel_token;
|
||||
}
|
||||
if !required_mcp_servers.is_empty() {
|
||||
let failures = sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.required_startup_failures(&required_mcp_servers)
|
||||
.instrument(info_span!(
|
||||
"session_init.required_mcp_wait",
|
||||
otel.name = "session_init.required_mcp_wait",
|
||||
session_init.required_mcp_server_count = required_mcp_server_count,
|
||||
))
|
||||
.await;
|
||||
if !failures.is_empty() {
|
||||
let details = failures
|
||||
.iter()
|
||||
.map(|failure| format!("{}: {}", failure.server, failure.error))
|
||||
.collect::<Vec<_>>()
|
||||
.join("; ");
|
||||
anyhow::bail!("required MCP servers failed to initialize: {details}");
|
||||
}
|
||||
}
|
||||
sess.start_mcp_connection_manager_initialization();
|
||||
sess.schedule_startup_prewarm(session_configuration.base_instructions.clone())
|
||||
.await;
|
||||
let session_start_source = match &initial_history {
|
||||
|
||||
@@ -4300,6 +4300,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
config.permissions.permission_profile(),
|
||||
),
|
||||
)),
|
||||
mcp_connection_manager_init_lock: Mutex::new(()),
|
||||
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
|
||||
unified_exec_manager: UnifiedExecProcessManager::new(
|
||||
config.background_terminal_max_timeout,
|
||||
@@ -4408,7 +4409,8 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
state: Mutex::new(state),
|
||||
managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1),
|
||||
features: config.features.clone(),
|
||||
pending_mcp_server_refresh_config: Mutex::new(None),
|
||||
pending_mcp_server_refresh: Mutex::new(None),
|
||||
mcp_connection_manager_initialized: std::sync::atomic::AtomicBool::new(false),
|
||||
conversation: Arc::new(RealtimeConversationManager::new()),
|
||||
active_turn: Mutex::new(None),
|
||||
mailbox,
|
||||
@@ -6156,6 +6158,7 @@ where
|
||||
config.permissions.permission_profile(),
|
||||
),
|
||||
)),
|
||||
mcp_connection_manager_init_lock: Mutex::new(()),
|
||||
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
|
||||
unified_exec_manager: UnifiedExecProcessManager::new(
|
||||
config.background_terminal_max_timeout,
|
||||
@@ -6264,7 +6267,8 @@ where
|
||||
state: Mutex::new(state),
|
||||
managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1),
|
||||
features: config.features.clone(),
|
||||
pending_mcp_server_refresh_config: Mutex::new(None),
|
||||
pending_mcp_server_refresh: Mutex::new(None),
|
||||
mcp_connection_manager_initialized: std::sync::atomic::AtomicBool::new(false),
|
||||
conversation: Arc::new(RealtimeConversationManager::new()),
|
||||
active_turn: Mutex::new(None),
|
||||
mailbox,
|
||||
@@ -6363,35 +6367,41 @@ async fn refresh_mcp_servers_is_deferred_until_next_turn() {
|
||||
mcp_oauth_credentials_store_mode,
|
||||
};
|
||||
{
|
||||
let mut guard = session.pending_mcp_server_refresh_config.lock().await;
|
||||
let mut guard = session.pending_mcp_server_refresh.lock().await;
|
||||
*guard = Some(refresh_config);
|
||||
}
|
||||
|
||||
assert!(!old_token.is_cancelled());
|
||||
assert!(
|
||||
session
|
||||
.pending_mcp_server_refresh_config
|
||||
.lock()
|
||||
.await
|
||||
.is_some()
|
||||
);
|
||||
assert!(session.pending_mcp_server_refresh.lock().await.is_some());
|
||||
|
||||
session
|
||||
.refresh_mcp_servers_if_requested(&turn_context, /*elicitation_reviewer*/ None)
|
||||
.await;
|
||||
|
||||
assert!(old_token.is_cancelled());
|
||||
assert!(
|
||||
session
|
||||
.pending_mcp_server_refresh_config
|
||||
.lock()
|
||||
.await
|
||||
.is_none()
|
||||
);
|
||||
assert!(session.pending_mcp_server_refresh.lock().await.is_none());
|
||||
let new_token = session.mcp_startup_cancellation_token().await;
|
||||
assert!(!new_token.is_cancelled());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lazy_mcp_initialization_is_idempotent() {
|
||||
let (session, _turn_context) = make_session_and_context().await;
|
||||
|
||||
session.ensure_mcp_connection_manager_initialized().await;
|
||||
let first_token = session.mcp_startup_cancellation_token().await;
|
||||
assert!(!first_token.is_cancelled());
|
||||
|
||||
session.ensure_mcp_connection_manager_initialized().await;
|
||||
|
||||
assert!(
|
||||
!first_token.is_cancelled(),
|
||||
"reinitializing the session-owned MCP pool should not cancel the first token",
|
||||
);
|
||||
let second_token = session.mcp_startup_cancellation_token().await;
|
||||
assert!(!second_token.is_cancelled());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_task_does_not_update_previous_turn_settings_for_non_run_turn_tasks() {
|
||||
let (sess, tc, _rx) = make_session_and_context_with_rx().await;
|
||||
|
||||
@@ -188,6 +188,7 @@ pub(crate) async fn run_turn(
|
||||
// Plugin mentions need raw MCP/app inventory even when app tools
|
||||
// are normally hidden so we can describe the plugin's currently
|
||||
// usable capabilities for this turn.
|
||||
sess.ensure_mcp_connection_manager_initialized().await;
|
||||
match sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
@@ -1165,6 +1166,7 @@ pub(crate) async fn built_tools(
|
||||
skills_outcome: Option<&SkillLoadOutcome>,
|
||||
cancellation_token: &CancellationToken,
|
||||
) -> CodexResult<Arc<ToolRouter>> {
|
||||
sess.ensure_mcp_connection_manager_initialized().await;
|
||||
let mcp_connection_manager = sess.services.mcp_connection_manager.read().await;
|
||||
let has_mcp_servers = mcp_connection_manager.has_servers();
|
||||
let all_mcp_tools = mcp_connection_manager
|
||||
|
||||
@@ -38,6 +38,7 @@ use tokio_util::sync::CancellationToken;
|
||||
|
||||
pub(crate) struct SessionServices {
|
||||
pub(crate) mcp_connection_manager: Arc<RwLock<McpConnectionManager>>,
|
||||
pub(crate) mcp_connection_manager_init_lock: Mutex<()>,
|
||||
pub(crate) mcp_startup_cancellation_token: Mutex<CancellationToken>,
|
||||
pub(crate) unified_exec_manager: UnifiedExecProcessManager,
|
||||
#[cfg_attr(not(unix), allow(dead_code))]
|
||||
|
||||
@@ -4,6 +4,7 @@ mod response_adapter;
|
||||
mod wait_handler;
|
||||
pub(crate) mod wait_spec;
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -29,9 +30,11 @@ use crate::tools::context::ToolPayload;
|
||||
use crate::tools::parallel::ToolCallRuntime;
|
||||
use crate::tools::router::ToolCall;
|
||||
use crate::tools::router::ToolCallSource;
|
||||
use crate::tools::router::ToolRouterParams;
|
||||
use crate::unified_exec::resolve_max_tokens;
|
||||
use codex_features::Feature;
|
||||
use codex_tools::ToolName;
|
||||
use codex_tools::collect_code_mode_tool_definitions;
|
||||
use codex_utils_output_truncation::TruncationPolicy;
|
||||
use codex_utils_output_truncation::formatted_truncate_text_content_items_with_policy;
|
||||
use codex_utils_output_truncation::truncate_function_output_items_with_policy;
|
||||
@@ -257,6 +260,64 @@ fn truncate_code_mode_result(
|
||||
truncate_function_output_items_with_policy(&items, policy)
|
||||
}
|
||||
|
||||
pub(super) async fn build_enabled_tools(
|
||||
exec: &ExecContext,
|
||||
) -> Vec<codex_code_mode::ToolDefinition> {
|
||||
let router = build_nested_router(exec).await;
|
||||
let specs = router.model_visible_specs();
|
||||
collect_code_mode_tool_definitions(&specs)
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "nested tool router construction reads through the session-owned manager guard"
|
||||
)]
|
||||
async fn build_nested_router(exec: &ExecContext) -> ToolRouter {
|
||||
let nested_tools_config = exec.turn.tools_config.for_code_mode_nested_tools();
|
||||
exec.session
|
||||
.ensure_mcp_connection_manager_initialized()
|
||||
.await;
|
||||
let listed_mcp_tools = exec
|
||||
.session
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.list_all_tools()
|
||||
.await;
|
||||
let parallel_mcp_server_names = exec
|
||||
.turn
|
||||
.config
|
||||
.mcp_servers
|
||||
.get()
|
||||
.iter()
|
||||
.filter_map(|(server_name, server_config)| {
|
||||
server_config
|
||||
.supports_parallel_tool_calls
|
||||
.then_some(server_name.clone())
|
||||
})
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
let listed_mcp_tools = listed_mcp_tools
|
||||
.into_iter()
|
||||
.map(|mut tool| {
|
||||
tool.supports_parallel_tool_calls =
|
||||
parallel_mcp_server_names.contains(&tool.server_name);
|
||||
tool
|
||||
})
|
||||
.collect();
|
||||
|
||||
ToolRouter::from_config(
|
||||
&nested_tools_config,
|
||||
ToolRouterParams {
|
||||
deferred_mcp_tools: None,
|
||||
mcp_tools: Some(listed_mcp_tools),
|
||||
discoverable_tools: None,
|
||||
extension_tool_executors: Vec::new(),
|
||||
dynamic_tools: exec.turn.dynamic_tools.as_slice(),
|
||||
},
|
||||
)
|
||||
}
|
||||
async fn call_nested_tool(
|
||||
_exec: ExecContext,
|
||||
tool_runtime: ToolCallRuntime,
|
||||
|
||||
@@ -33,7 +33,6 @@ pub use read_mcp_resource::ReadMcpResourceHandler;
|
||||
|
||||
#[derive(Debug, Deserialize, Default)]
|
||||
struct ListResourcesArgs {
|
||||
/// Lists all resources from all servers if not specified.
|
||||
#[serde(default)]
|
||||
server: Option<String>,
|
||||
#[serde(default)]
|
||||
@@ -42,7 +41,6 @@ struct ListResourcesArgs {
|
||||
|
||||
#[derive(Debug, Deserialize, Default)]
|
||||
struct ListResourceTemplatesArgs {
|
||||
/// Lists all resource templates from all servers if not specified.
|
||||
#[serde(default)]
|
||||
server: Option<String>,
|
||||
#[serde(default)]
|
||||
|
||||
@@ -105,6 +105,7 @@ impl ToolExecutor<ToolInvocation> for ListMcpResourceTemplatesHandler {
|
||||
));
|
||||
}
|
||||
|
||||
session.ensure_mcp_connection_manager_initialized().await;
|
||||
let templates = session
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
|
||||
@@ -103,6 +103,7 @@ impl ToolExecutor<ToolInvocation> for ListMcpResourcesHandler {
|
||||
));
|
||||
}
|
||||
|
||||
session.ensure_mcp_connection_manager_initialized().await;
|
||||
let resources = session
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
|
||||
@@ -112,6 +112,7 @@ impl ToolExecutor<ToolInvocation> for RequestPluginInstallHandler {
|
||||
}
|
||||
|
||||
let auth = session.services.auth_manager.auth().await;
|
||||
session.ensure_mcp_connection_manager_initialized().await;
|
||||
let manager = session.services.mcp_connection_manager.read().await;
|
||||
let mcp_tools = manager.list_all_tools().await;
|
||||
drop(manager);
|
||||
@@ -314,6 +315,7 @@ async fn refresh_missing_requested_connectors(
|
||||
return Some(Vec::new());
|
||||
}
|
||||
|
||||
session.ensure_mcp_connection_manager_initialized().await;
|
||||
let manager = session.services.mcp_connection_manager.read().await;
|
||||
let mcp_tools = manager.list_all_tools().await;
|
||||
let accessible_connectors = connectors::with_app_enabled_state(
|
||||
|
||||
@@ -332,7 +332,6 @@ impl ToolRegistry {
|
||||
),
|
||||
),
|
||||
];
|
||||
|
||||
{
|
||||
let mut active = invocation.session.active_turn.lock().await;
|
||||
if let Some(active_turn) = active.as_mut() {
|
||||
|
||||
Reference in New Issue
Block a user