mirror of
https://github.com/openai/codex.git
synced 2026-04-30 01:16:54 +00:00
core/tui: non-blocking MCP startup (#6334)
This makes MCP startup not block TUI startup. Messages sent while MCPs are booting will be queued. https://github.com/user-attachments/assets/96e1d234-5d8f-4932-a935-a675d35c05e0 Fixes #6317 --------- Co-authored-by: pakrym-oai <pakrym@openai.com>
This commit is contained in:
@@ -9,8 +9,6 @@ use crate::client_common::REVIEW_PROMPT;
|
||||
use crate::compact;
|
||||
use crate::features::Feature;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::mcp::auth::McpAuthStatusEntry;
|
||||
use crate::mcp_connection_manager::DEFAULT_STARTUP_TIMEOUT;
|
||||
use crate::parse_command::parse_command;
|
||||
use crate::parse_turn_item;
|
||||
use crate::response_processing::process_items;
|
||||
@@ -45,6 +43,7 @@ use mcp_types::ReadResourceResult;
|
||||
use serde_json;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::debug;
|
||||
@@ -57,7 +56,6 @@ use crate::client::ModelClient;
|
||||
use crate::client_common::Prompt;
|
||||
use crate::client_common::ResponseEvent;
|
||||
use crate::config::Config;
|
||||
use crate::config::types::McpServerTransportConfig;
|
||||
use crate::config::types::ShellEnvironmentPolicy;
|
||||
use crate::context_manager::ContextManager;
|
||||
use crate::environment_context::EnvironmentContext;
|
||||
@@ -476,21 +474,13 @@ impl Session {
|
||||
),
|
||||
};
|
||||
|
||||
// Error messages to dispatch after SessionConfigured is sent.
|
||||
let mut post_session_configured_events = Vec::<Event>::new();
|
||||
|
||||
// Kick off independent async setup tasks in parallel to reduce startup latency.
|
||||
//
|
||||
// - initialize RolloutRecorder with new or resumed session info
|
||||
// - spin up MCP connection manager
|
||||
// - perform default shell discovery
|
||||
// - load history metadata
|
||||
let rollout_fut = RolloutRecorder::new(&config, rollout_params);
|
||||
|
||||
let mcp_fut = McpConnectionManager::new(
|
||||
config.mcp_servers.clone(),
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
);
|
||||
let default_shell_fut = shell::default_user_shell();
|
||||
let history_meta_fut = crate::message_history::history_metadata(&config);
|
||||
let auth_statuses_fut = compute_auth_statuses(
|
||||
@@ -499,15 +489,8 @@ impl Session {
|
||||
);
|
||||
|
||||
// Join all independent futures.
|
||||
let (
|
||||
rollout_recorder,
|
||||
mcp_res,
|
||||
default_shell,
|
||||
(history_log_id, history_entry_count),
|
||||
auth_statuses,
|
||||
) = tokio::join!(
|
||||
let (rollout_recorder, default_shell, (history_log_id, history_entry_count), auth_statuses) = tokio::join!(
|
||||
rollout_fut,
|
||||
mcp_fut,
|
||||
default_shell_fut,
|
||||
history_meta_fut,
|
||||
auth_statuses_fut
|
||||
@@ -519,34 +502,7 @@ impl Session {
|
||||
})?;
|
||||
let rollout_path = rollout_recorder.rollout_path.clone();
|
||||
|
||||
// Handle MCP manager result and record any startup failures.
|
||||
let (mcp_connection_manager, failed_clients) = match mcp_res {
|
||||
Ok((mgr, failures)) => (mgr, failures),
|
||||
Err(e) => {
|
||||
let message = format!("Failed to create MCP connection manager: {e:#}");
|
||||
error!("{message}");
|
||||
post_session_configured_events.push(Event {
|
||||
id: INITIAL_SUBMIT_ID.to_owned(),
|
||||
msg: EventMsg::Error(ErrorEvent { message }),
|
||||
});
|
||||
(McpConnectionManager::default(), Default::default())
|
||||
}
|
||||
};
|
||||
|
||||
// Surface individual client start-up failures to the user.
|
||||
if !failed_clients.is_empty() {
|
||||
for (server_name, err) in failed_clients {
|
||||
let auth_entry = auth_statuses.get(&server_name);
|
||||
let display_message = mcp_init_error_display(&server_name, auth_entry, &err);
|
||||
warn!("MCP client for `{server_name}` failed to start: {err:#}");
|
||||
post_session_configured_events.push(Event {
|
||||
id: INITIAL_SUBMIT_ID.to_owned(),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: display_message,
|
||||
}),
|
||||
});
|
||||
}
|
||||
}
|
||||
let mut post_session_configured_events = Vec::<Event>::new();
|
||||
|
||||
for (alias, feature) in session_configuration.features.legacy_feature_usages() {
|
||||
let canonical = feature.key();
|
||||
@@ -595,7 +551,8 @@ impl Session {
|
||||
warm_model_cache(&session_configuration.model);
|
||||
|
||||
let services = SessionServices {
|
||||
mcp_connection_manager,
|
||||
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
|
||||
mcp_startup_cancellation_token: CancellationToken::new(),
|
||||
unified_exec_manager: UnifiedExecSessionManager::default(),
|
||||
notifier: UserNotifier::new(config.notify.clone()),
|
||||
rollout: Mutex::new(Some(rollout_recorder)),
|
||||
@@ -635,6 +592,18 @@ impl Session {
|
||||
for event in events {
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
sess.services
|
||||
.mcp_connection_manager
|
||||
.write()
|
||||
.await
|
||||
.initialize(
|
||||
config.mcp_servers.clone(),
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth_statuses.clone(),
|
||||
tx_event.clone(),
|
||||
sess.services.mcp_startup_cancellation_token.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// record_initial_history can emit events. We record only after the SessionConfiguredEvent is emitted.
|
||||
sess.record_initial_history(initial_history).await;
|
||||
@@ -1258,6 +1227,8 @@ impl Session {
|
||||
) -> anyhow::Result<ListResourcesResult> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.list_resources(server, params)
|
||||
.await
|
||||
}
|
||||
@@ -1269,6 +1240,8 @@ impl Session {
|
||||
) -> anyhow::Result<ListResourceTemplatesResult> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.list_resource_templates(server, params)
|
||||
.await
|
||||
}
|
||||
@@ -1280,6 +1253,8 @@ impl Session {
|
||||
) -> anyhow::Result<ReadResourceResult> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.read_resource(server, params)
|
||||
.await
|
||||
}
|
||||
@@ -1292,19 +1267,29 @@ impl Session {
|
||||
) -> anyhow::Result<CallToolResult> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.call_tool(server, tool, arguments)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) fn parse_mcp_tool_name(&self, tool_name: &str) -> Option<(String, String)> {
|
||||
pub(crate) async fn parse_mcp_tool_name(&self, tool_name: &str) -> Option<(String, String)> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.parse_tool_name(tool_name)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn interrupt_task(self: &Arc<Self>) {
|
||||
info!("interrupt received: abort current task, if any");
|
||||
self.abort_all_tasks(TurnAbortReason::Interrupted).await;
|
||||
let has_active_turn = { self.active_turn.lock().await.is_some() };
|
||||
if has_active_turn {
|
||||
self.abort_all_tasks(TurnAbortReason::Interrupted).await;
|
||||
} else {
|
||||
self.cancel_mcp_startup().await;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn notifier(&self) -> &UserNotifier {
|
||||
@@ -1318,6 +1303,10 @@ impl Session {
|
||||
fn show_raw_agent_reasoning(&self) -> bool {
|
||||
self.services.show_raw_agent_reasoning
|
||||
}
|
||||
|
||||
async fn cancel_mcp_startup(&self) {
|
||||
self.services.mcp_startup_cancellation_token.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiver<Submission>) {
|
||||
@@ -1575,17 +1564,15 @@ mod handlers {
|
||||
}
|
||||
|
||||
pub async fn list_mcp_tools(sess: &Session, config: &Arc<Config>, sub_id: String) {
|
||||
// This is a cheap lookup from the connection manager's cache.
|
||||
let tools = sess.services.mcp_connection_manager.list_all_tools();
|
||||
let (auth_status_entries, resources, resource_templates) = tokio::join!(
|
||||
let mcp_connection_manager = sess.services.mcp_connection_manager.read().await;
|
||||
let (tools, auth_status_entries, resources, resource_templates) = tokio::join!(
|
||||
mcp_connection_manager.list_all_tools(),
|
||||
compute_auth_statuses(
|
||||
config.mcp_servers.iter(),
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
),
|
||||
sess.services.mcp_connection_manager.list_all_resources(),
|
||||
sess.services
|
||||
.mcp_connection_manager
|
||||
.list_all_resource_templates()
|
||||
mcp_connection_manager.list_all_resources(),
|
||||
mcp_connection_manager.list_all_resource_templates(),
|
||||
);
|
||||
let auth_statuses = auth_status_entries
|
||||
.iter()
|
||||
@@ -1594,7 +1581,10 @@ mod handlers {
|
||||
let event = Event {
|
||||
id: sub_id,
|
||||
msg: EventMsg::McpListToolsResponse(crate::protocol::McpListToolsResponseEvent {
|
||||
tools,
|
||||
tools: tools
|
||||
.into_iter()
|
||||
.map(|(name, tool)| (name, tool.tool))
|
||||
.collect(),
|
||||
resources,
|
||||
resource_templates,
|
||||
auth_statuses,
|
||||
@@ -1924,10 +1914,22 @@ async fn run_turn(
|
||||
input: Vec<ResponseItem>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> CodexResult<TurnRunResult> {
|
||||
let mcp_tools = sess.services.mcp_connection_manager.list_all_tools();
|
||||
let mcp_tools = sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.list_all_tools()
|
||||
.or_cancel(&cancellation_token)
|
||||
.await?;
|
||||
let router = Arc::new(ToolRouter::from_config(
|
||||
&turn_context.tools_config,
|
||||
Some(mcp_tools),
|
||||
Some(
|
||||
mcp_tools
|
||||
.into_iter()
|
||||
.map(|(name, tool)| (name, tool.tool))
|
||||
.collect(),
|
||||
),
|
||||
));
|
||||
|
||||
let model_supports_parallel = turn_context
|
||||
@@ -2096,7 +2098,7 @@ async fn try_run_turn(
|
||||
ResponseEvent::Created => {}
|
||||
ResponseEvent::OutputItemDone(item) => {
|
||||
let previously_active_item = active_item.take();
|
||||
match ToolRouter::build_tool_call(sess.as_ref(), item.clone()) {
|
||||
match ToolRouter::build_tool_call(sess.as_ref(), item.clone()).await {
|
||||
Ok(Some(call)) => {
|
||||
let payload_preview = call.payload.log_payload().into_owned();
|
||||
tracing::info!("ToolCall: {} {}", call.tool_name, payload_preview);
|
||||
@@ -2307,59 +2309,6 @@ pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -
|
||||
})
|
||||
}
|
||||
|
||||
fn mcp_init_error_display(
|
||||
server_name: &str,
|
||||
entry: Option<&McpAuthStatusEntry>,
|
||||
err: &anyhow::Error,
|
||||
) -> String {
|
||||
if let Some(McpServerTransportConfig::StreamableHttp {
|
||||
url,
|
||||
bearer_token_env_var,
|
||||
http_headers,
|
||||
..
|
||||
}) = &entry.map(|entry| &entry.config.transport)
|
||||
&& url == "https://api.githubcopilot.com/mcp/"
|
||||
&& bearer_token_env_var.is_none()
|
||||
&& http_headers.as_ref().map(HashMap::is_empty).unwrap_or(true)
|
||||
{
|
||||
// GitHub only supports OAUth for first party MCP clients.
|
||||
// That means that the user has to specify a personal access token either via bearer_token_env_var or http_headers.
|
||||
// https://github.com/github/github-mcp-server/issues/921#issuecomment-3221026448
|
||||
format!(
|
||||
"GitHub MCP does not support OAuth. Log in by adding a personal access token (https://github.com/settings/personal-access-tokens) to your environment and config.toml:\n[mcp_servers.{server_name}]\nbearer_token_env_var = CODEX_GITHUB_PERSONAL_ACCESS_TOKEN"
|
||||
)
|
||||
} else if is_mcp_client_auth_required_error(err) {
|
||||
format!(
|
||||
"The {server_name} MCP server is not logged in. Run `codex mcp login {server_name}`."
|
||||
)
|
||||
} else if is_mcp_client_startup_timeout_error(err) {
|
||||
let startup_timeout_secs = match entry {
|
||||
Some(entry) => match entry.config.startup_timeout_sec {
|
||||
Some(timeout) => timeout,
|
||||
None => DEFAULT_STARTUP_TIMEOUT,
|
||||
},
|
||||
None => DEFAULT_STARTUP_TIMEOUT,
|
||||
}
|
||||
.as_secs();
|
||||
format!(
|
||||
"MCP client for `{server_name}` timed out after {startup_timeout_secs} seconds. Add or adjust `startup_timeout_sec` in your config.toml:\n[mcp_servers.{server_name}]\nstartup_timeout_sec = XX"
|
||||
)
|
||||
} else {
|
||||
format!("MCP client for `{server_name}` failed to start: {err:#}")
|
||||
}
|
||||
}
|
||||
|
||||
fn is_mcp_client_auth_required_error(error: &anyhow::Error) -> bool {
|
||||
// StreamableHttpError::AuthRequired from the MCP SDK.
|
||||
error.to_string().contains("Auth required")
|
||||
}
|
||||
|
||||
fn is_mcp_client_startup_timeout_error(error: &anyhow::Error) -> bool {
|
||||
let error_message = error.to_string();
|
||||
error_message.contains("request timed out")
|
||||
|| error_message.contains("timed out handshaking with MCP server")
|
||||
}
|
||||
|
||||
use crate::features::Features;
|
||||
#[cfg(test)]
|
||||
pub(crate) use tests::make_session_and_context;
|
||||
@@ -2369,10 +2318,7 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::config::ConfigOverrides;
|
||||
use crate::config::ConfigToml;
|
||||
use crate::config::types::McpServerConfig;
|
||||
use crate::config::types::McpServerTransportConfig;
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
use crate::mcp::auth::McpAuthStatusEntry;
|
||||
use crate::tools::format_exec_output_str;
|
||||
|
||||
use crate::protocol::CompactedItem;
|
||||
@@ -2392,7 +2338,6 @@ mod tests {
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::McpAuthStatus;
|
||||
use std::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
|
||||
@@ -2606,7 +2551,8 @@ mod tests {
|
||||
let state = SessionState::new(session_configuration.clone());
|
||||
|
||||
let services = SessionServices {
|
||||
mcp_connection_manager: McpConnectionManager::default(),
|
||||
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
|
||||
mcp_startup_cancellation_token: CancellationToken::new(),
|
||||
unified_exec_manager: UnifiedExecSessionManager::default(),
|
||||
notifier: UserNotifier::new(None),
|
||||
rollout: Mutex::new(None),
|
||||
@@ -2682,7 +2628,8 @@ mod tests {
|
||||
let state = SessionState::new(session_configuration.clone());
|
||||
|
||||
let services = SessionServices {
|
||||
mcp_connection_manager: McpConnectionManager::default(),
|
||||
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
|
||||
mcp_startup_cancellation_token: CancellationToken::new(),
|
||||
unified_exec_manager: UnifiedExecSessionManager::default(),
|
||||
notifier: UserNotifier::new(None),
|
||||
rollout: Mutex::new(None),
|
||||
@@ -2863,9 +2810,23 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn fatal_tool_error_stops_turn_and_reports_error() {
|
||||
let (session, turn_context, _rx) = make_session_and_context_with_rx();
|
||||
let tools = {
|
||||
session
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.list_all_tools()
|
||||
.await
|
||||
};
|
||||
let router = ToolRouter::from_config(
|
||||
&turn_context.tools_config,
|
||||
Some(session.services.mcp_connection_manager.list_all_tools()),
|
||||
Some(
|
||||
tools
|
||||
.into_iter()
|
||||
.map(|(name, tool)| (name, tool.tool))
|
||||
.collect(),
|
||||
),
|
||||
);
|
||||
let item = ResponseItem::CustomToolCall {
|
||||
id: None,
|
||||
@@ -2876,6 +2837,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let call = ToolRouter::build_tool_call(session.as_ref(), item.clone())
|
||||
.await
|
||||
.expect("build tool call")
|
||||
.expect("tool call present");
|
||||
let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
|
||||
@@ -3125,7 +3087,6 @@ mod tests {
|
||||
pretty_assertions::assert_eq!(exec_output.metadata, ResponseExecMetadata { exit_code: 0 });
|
||||
assert!(exec_output.output.contains("hi"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unified_exec_rejects_escalated_permissions_when_policy_not_on_request() {
|
||||
use crate::protocol::AskForApproval;
|
||||
@@ -3167,89 +3128,4 @@ mod tests {
|
||||
|
||||
pretty_assertions::assert_eq!(output, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mcp_init_error_display_prompts_for_github_pat() {
|
||||
let server_name = "github";
|
||||
let entry = McpAuthStatusEntry {
|
||||
config: McpServerConfig {
|
||||
transport: McpServerTransportConfig::StreamableHttp {
|
||||
url: "https://api.githubcopilot.com/mcp/".to_string(),
|
||||
bearer_token_env_var: None,
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
},
|
||||
enabled: true,
|
||||
startup_timeout_sec: None,
|
||||
tool_timeout_sec: None,
|
||||
enabled_tools: None,
|
||||
disabled_tools: None,
|
||||
},
|
||||
auth_status: McpAuthStatus::Unsupported,
|
||||
};
|
||||
let err = anyhow::anyhow!("OAuth is unsupported");
|
||||
|
||||
let display = mcp_init_error_display(server_name, Some(&entry), &err);
|
||||
|
||||
let expected = format!(
|
||||
"GitHub MCP does not support OAuth. Log in by adding a personal access token (https://github.com/settings/personal-access-tokens) to your environment and config.toml:\n[mcp_servers.{server_name}]\nbearer_token_env_var = CODEX_GITHUB_PERSONAL_ACCESS_TOKEN"
|
||||
);
|
||||
|
||||
assert_eq!(expected, display);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mcp_init_error_display_prompts_for_login_when_auth_required() {
|
||||
let server_name = "example";
|
||||
let err = anyhow::anyhow!("Auth required for server");
|
||||
|
||||
let display = mcp_init_error_display(server_name, None, &err);
|
||||
|
||||
let expected = format!(
|
||||
"The {server_name} MCP server is not logged in. Run `codex mcp login {server_name}`."
|
||||
);
|
||||
|
||||
assert_eq!(expected, display);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mcp_init_error_display_reports_generic_errors() {
|
||||
let server_name = "custom";
|
||||
let entry = McpAuthStatusEntry {
|
||||
config: McpServerConfig {
|
||||
transport: McpServerTransportConfig::StreamableHttp {
|
||||
url: "https://example.com".to_string(),
|
||||
bearer_token_env_var: Some("TOKEN".to_string()),
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
},
|
||||
enabled: true,
|
||||
startup_timeout_sec: None,
|
||||
tool_timeout_sec: None,
|
||||
enabled_tools: None,
|
||||
disabled_tools: None,
|
||||
},
|
||||
auth_status: McpAuthStatus::Unsupported,
|
||||
};
|
||||
let err = anyhow::anyhow!("boom");
|
||||
|
||||
let display = mcp_init_error_display(server_name, Some(&entry), &err);
|
||||
|
||||
let expected = format!("MCP client for `{server_name}` failed to start: {err:#}");
|
||||
|
||||
assert_eq!(expected, display);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mcp_init_error_display_includes_startup_timeout_hint() {
|
||||
let server_name = "slow";
|
||||
let err = anyhow::anyhow!("request timed out");
|
||||
|
||||
let display = mcp_init_error_display(server_name, None, &err);
|
||||
|
||||
assert_eq!(
|
||||
"MCP client for `slow` timed out after 10 seconds. Add or adjust `startup_timeout_sec` in your config.toml:\n[mcp_servers.slow]\nstartup_timeout_sec = XX",
|
||||
display
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user