From 9f06d171e23fdb8efcb8ee8c05d12c76fb472529 Mon Sep 17 00:00:00 2001 From: aaronl-openai Date: Tue, 5 May 2026 21:09:28 -0700 Subject: [PATCH] Preserve session MCP config on refresh (#21055) # Overview MCP refreshes were rebuilding active threads from fresh disk-backed config only, which dropped thread-start session overlays such as app-injected MCP servers. This keeps refreshes current with disk config while preserving the thread-local config that only the active thread knows about. # Changes - Rebuild refreshed config per active thread using that thread's current `cwd`, rather than fanning out one app-server config to every thread. - Preserve each thread's `SessionFlags` layer while replacing reloadable config layers with freshly loaded config, then derive the MCP refresh payload from the rebuilt result. - Move MCP refresh orchestration into app-server so manual refreshes fail loudly while background refreshes remain best-effort, and route plugin-triggered refreshes through the same per-thread reload path. - Add regression coverage for session overlays, fresh project config, plugin-derived MCP config, current requirements, and strict vs best-effort refresh behavior. # Verification - Passed focused Rust coverage for the thread-config rebuild behavior and deferred MCP refresh flow, plus `cargo test -p codex-app-server --lib`. - Verified end to end in the Codex dev app against the locally built CLI: registered an MCP via thread config, verified that it could be used successfully before refresh, manually triggered MCP refresh, and verified that it continued to be available afterward. --- codex-rs/app-server/src/config_manager.rs | 15 + codex-rs/app-server/src/lib.rs | 1 + codex-rs/app-server/src/mcp_refresh.rs | 241 ++++++++++++++ codex-rs/app-server/src/message_processor.rs | 2 +- codex-rs/app-server/src/request_processors.rs | 1 - .../request_processors/account_processor.rs | 16 +- .../src/request_processors/mcp_processor.rs | 43 +-- .../src/request_processors/plugins.rs | 42 +-- codex-rs/core/src/config/config_tests.rs | 302 ++++++++++++++++++ codex-rs/core/src/config/mod.rs | 60 ++++ codex-rs/core/src/thread_manager.rs | 22 -- 11 files changed, 652 insertions(+), 93 deletions(-) create mode 100644 codex-rs/app-server/src/mcp_refresh.rs diff --git a/codex-rs/app-server/src/config_manager.rs b/codex-rs/app-server/src/config_manager.rs index ba11205b7a..030829fa4b 100644 --- a/codex-rs/app-server/src/config_manager.rs +++ b/codex-rs/app-server/src/config_manager.rs @@ -140,6 +140,21 @@ impl ConfigManager { .await } + pub(crate) async fn load_latest_config_for_thread( + &self, + thread_config: &Config, + ) -> std::io::Result { + let refreshed_config = self + .load_latest_config(Some(thread_config.cwd.to_path_buf())) + .await?; + let mut config = thread_config + .rebuild_preserving_session_layers(&refreshed_config) + .await?; + self.apply_runtime_feature_enablement(&mut config); + self.apply_arg0_paths(&mut config); + Ok(config) + } + pub(crate) async fn load_default_config(&self) -> std::io::Result { let mut config = Config::load_default_with_cli_overrides_for_codex_home( self.codex_home.clone(), diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 4013bbe76b..cf6a9e890e 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -85,6 +85,7 @@ mod filters; mod fs_watch; mod fuzzy_file_search; pub mod in_process; +mod mcp_refresh; mod message_processor; mod models; mod outgoing_message; diff --git a/codex-rs/app-server/src/mcp_refresh.rs b/codex-rs/app-server/src/mcp_refresh.rs new file mode 100644 index 0000000000..93dc02f7c3 --- /dev/null +++ b/codex-rs/app-server/src/mcp_refresh.rs @@ -0,0 +1,241 @@ +use crate::config_manager::ConfigManager; +use codex_core::CodexThread; +use codex_core::ThreadManager; +use codex_core::config::Config; +use codex_protocol::ThreadId; +use codex_protocol::protocol::McpServerRefreshConfig; +use codex_protocol::protocol::Op; +use std::io; +use std::sync::Arc; +use tracing::warn; + +pub(crate) async fn queue_strict_refresh( + thread_manager: &Arc, + config_manager: &ConfigManager, +) -> io::Result<()> { + config_manager + .load_latest_config(/*fallback_cwd*/ None) + .await?; + let mut refreshes = Vec::new(); + for thread_id in thread_manager.list_thread_ids().await { + let thread = thread_manager + .get_thread(thread_id) + .await + .map_err(|err| io::Error::other(format!("failed to load thread {thread_id}: {err}")))?; + let config = + build_refresh_config(thread_manager, config_manager, thread.config().await).await?; + refreshes.push((thread_id, thread, config)); + } + for (thread_id, thread, config) in refreshes { + queue_refresh(thread_id, thread, config).await?; + } + Ok(()) +} + +pub(crate) async fn queue_best_effort_refresh( + thread_manager: &Arc, + config_manager: &ConfigManager, +) { + for thread_id in thread_manager.list_thread_ids().await { + let thread = match thread_manager.get_thread(thread_id).await { + Ok(thread) => thread, + Err(err) => { + warn!("failed to load thread {thread_id} for MCP refresh: {err}"); + continue; + } + }; + let config = + match build_refresh_config(thread_manager, config_manager, thread.config().await).await + { + Ok(config) => config, + Err(err) => { + warn!("failed to build MCP refresh config for thread {thread_id}: {err}"); + continue; + } + }; + if let Err(err) = queue_refresh(thread_id, thread, config).await { + warn!("{err}"); + } + } +} + +async fn build_refresh_config( + thread_manager: &ThreadManager, + config_manager: &ConfigManager, + thread_config: Arc, +) -> io::Result { + let config = config_manager + .load_latest_config_for_thread(thread_config.as_ref()) + .await?; + let mcp_servers = thread_manager + .mcp_manager() + .configured_servers(&config) + .await; + Ok(McpServerRefreshConfig { + mcp_servers: serde_json::to_value(mcp_servers).map_err(io::Error::other)?, + mcp_oauth_credentials_store_mode: serde_json::to_value( + config.mcp_oauth_credentials_store_mode, + ) + .map_err(io::Error::other)?, + }) +} + +async fn queue_refresh( + thread_id: ThreadId, + thread: Arc, + config: McpServerRefreshConfig, +) -> io::Result<()> { + thread + .submit(Op::RefreshMcpServers { config }) + .await + .map(|_| ()) + .map_err(|err| { + io::Error::other(format!( + "failed to queue MCP refresh for thread {thread_id}: {err}" + )) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use async_trait::async_trait; + use codex_arg0::Arg0DispatchPaths; + use codex_config::CloudRequirementsLoader; + use codex_config::LoaderOverrides; + use codex_config::ThreadConfigContext; + use codex_config::ThreadConfigLoadError; + use codex_config::ThreadConfigLoadErrorCode; + use codex_config::ThreadConfigLoader; + use codex_config::ThreadConfigSource; + use codex_core::agent_graph_store_from_state_db; + use codex_core::config::ConfigOverrides; + use codex_core::init_state_db_from_config; + use codex_core::thread_store_from_config; + use codex_exec_server::EnvironmentManager; + use codex_login::AuthManager; + use codex_login::CodexAuth; + use codex_protocol::protocol::SessionSource; + use codex_utils_absolute_path::AbsolutePathBuf; + use pretty_assertions::assert_eq; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + use tempfile::TempDir; + + #[tokio::test] + async fn strict_refresh_reports_thread_planning_failures() -> anyhow::Result<()> { + let (_temp_dir, thread_manager, config_manager, _loader) = refresh_test_state().await?; + + let err = queue_strict_refresh(&thread_manager, &config_manager) + .await + .expect_err("strict refresh should fail"); + + assert_eq!(err.to_string(), "failed to load refresh config"); + Ok(()) + } + + #[tokio::test] + async fn best_effort_refresh_attempts_every_loaded_thread() -> anyhow::Result<()> { + let (_temp_dir, thread_manager, config_manager, loader) = refresh_test_state().await?; + + queue_best_effort_refresh(&thread_manager, &config_manager).await; + + assert_eq!(loader.good_loads.load(Ordering::Relaxed), 1); + assert_eq!(loader.bad_loads.load(Ordering::Relaxed), 1); + Ok(()) + } + + async fn refresh_test_state() -> anyhow::Result<( + TempDir, + Arc, + ConfigManager, + Arc, + )> { + let temp_dir = TempDir::new()?; + let good_cwd = temp_dir.path().join("good"); + let bad_cwd = temp_dir.path().join("bad"); + std::fs::create_dir_all(&good_cwd)?; + std::fs::create_dir_all(&bad_cwd)?; + + let initial_config_manager = + ConfigManager::without_managed_config_for_tests(temp_dir.path().to_path_buf()); + let good_config = initial_config_manager + .load_for_cwd( + /*request_overrides*/ None, + ConfigOverrides::default(), + Some(good_cwd.clone()), + ) + .await?; + let bad_config = initial_config_manager + .load_for_cwd( + /*request_overrides*/ None, + ConfigOverrides::default(), + Some(bad_cwd.clone()), + ) + .await?; + + let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy")); + let state_db = init_state_db_from_config(&good_config) + .await + .expect("refresh tests require state db"); + let thread_store = thread_store_from_config(&good_config, state_db.clone()); + let agent_graph_store = agent_graph_store_from_state_db(state_db.clone()); + let thread_manager = Arc::new(ThreadManager::new( + &good_config, + auth_manager, + SessionSource::Exec, + Arc::new(EnvironmentManager::default_for_tests()), + /*analytics_events_client*/ None, + state_db, + thread_store, + agent_graph_store, + )); + thread_manager.start_thread(good_config).await?; + thread_manager.start_thread(bad_config).await?; + + let loader = Arc::new(CountingThreadConfigLoader { + good_cwd: AbsolutePathBuf::try_from(good_cwd)?, + bad_cwd: AbsolutePathBuf::try_from(bad_cwd)?, + good_loads: AtomicUsize::new(0), + bad_loads: AtomicUsize::new(0), + }); + let config_manager = ConfigManager::new( + temp_dir.path().to_path_buf(), + Vec::new(), + LoaderOverrides::without_managed_config_for_tests(), + CloudRequirementsLoader::default(), + Arg0DispatchPaths::default(), + loader.clone(), + ); + + Ok((temp_dir, thread_manager, config_manager, loader)) + } + + struct CountingThreadConfigLoader { + good_cwd: AbsolutePathBuf, + bad_cwd: AbsolutePathBuf, + good_loads: AtomicUsize, + bad_loads: AtomicUsize, + } + + #[async_trait] + impl ThreadConfigLoader for CountingThreadConfigLoader { + async fn load( + &self, + context: ThreadConfigContext, + ) -> Result, ThreadConfigLoadError> { + if context.cwd.as_ref() == Some(&self.good_cwd) { + self.good_loads.fetch_add(1, Ordering::Relaxed); + } + if context.cwd.as_ref() == Some(&self.bad_cwd) { + self.bad_loads.fetch_add(1, Ordering::Relaxed); + return Err(ThreadConfigLoadError::new( + ThreadConfigLoadErrorCode::Internal, + /*status_code*/ None, + "failed to load refresh config", + )); + } + Ok(Vec::new()) + } + } +} diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index c449217254..e0cc3bd176 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -416,7 +416,7 @@ impl MessageProcessor { if matches!(plugin_startup_tasks, crate::PluginStartupTasks::Start) { // Keep plugin startup warmups aligned at app-server startup. let on_effective_plugins_changed = - plugin_processor.effective_plugins_changed_callback((*config).clone()); + plugin_processor.effective_plugins_changed_callback(); thread_manager .plugins_manager() .maybe_start_plugin_startup_tasks_for_config( diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index 8667d67d11..9f419d7006 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -361,7 +361,6 @@ use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::GitInfo as CoreGitInfo; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::McpAuthStatus as CoreMcpAuthStatus; -use codex_protocol::protocol::McpServerRefreshConfig; use codex_protocol::protocol::Op; use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot; use codex_protocol::protocol::RealtimeVoicesList; diff --git a/codex-rs/app-server/src/request_processors/account_processor.rs b/codex-rs/app-server/src/request_processors/account_processor.rs index 2609e25ac4..c73d6700e7 100644 --- a/codex-rs/app-server/src/request_processors/account_processor.rs +++ b/codex-rs/app-server/src/request_processors/account_processor.rs @@ -168,7 +168,7 @@ impl AccountRequestProcessor { { Ok(config) => { let refresh_thread_manager = Arc::clone(thread_manager); - let refresh_config = config.clone(); + let refresh_config_manager = config_manager.clone(); thread_manager .plugins_manager() .maybe_start_remote_installed_plugins_cache_refresh( @@ -177,7 +177,7 @@ impl AccountRequestProcessor { Some(Arc::new(move || { Self::spawn_effective_plugins_changed_task( Arc::clone(&refresh_thread_manager), - refresh_config.clone(), + refresh_config_manager.clone(), ); })), ); @@ -190,19 +190,17 @@ impl AccountRequestProcessor { } } - fn spawn_effective_plugins_changed_task(thread_manager: Arc, config: Config) { + fn spawn_effective_plugins_changed_task( + thread_manager: Arc, + config_manager: ConfigManager, + ) { tokio::spawn(async move { thread_manager.plugins_manager().clear_cache(); thread_manager.skills_manager().clear_cache(); if thread_manager.list_thread_ids().await.is_empty() { return; } - if let Err(err) = - McpRequestProcessor::queue_mcp_server_refresh_for_config(&thread_manager, &config) - .await - { - warn!("failed to queue MCP refresh after effective plugins changed: {err:?}"); - } + crate::mcp_refresh::queue_best_effort_refresh(&thread_manager, &config_manager).await; }); } diff --git a/codex-rs/app-server/src/request_processors/mcp_processor.rs b/codex-rs/app-server/src/request_processors/mcp_processor.rs index 22d43d87a2..243506f6af 100644 --- a/codex-rs/app-server/src/request_processors/mcp_processor.rs +++ b/codex-rs/app-server/src/request_processors/mcp_processor.rs @@ -77,8 +77,9 @@ impl McpRequestProcessor { &self, _params: Option<()>, ) -> Result { - let config = self.load_latest_config(/*fallback_cwd*/ None).await?; - Self::queue_mcp_server_refresh_for_config(&self.thread_manager, &config).await?; + crate::mcp_refresh::queue_strict_refresh(&self.thread_manager, &self.config_manager) + .await + .map_err(|err| internal_error(format!("failed to refresh MCP servers: {err}")))?; Ok(McpServerRefreshResponse {}) } @@ -108,44 +109,6 @@ impl McpRequestProcessor { Ok((thread_id, thread)) } - pub(super) async fn queue_mcp_server_refresh_for_config( - thread_manager: &Arc, - config: &Config, - ) -> Result<(), JSONRPCErrorError> { - let configured_servers = thread_manager - .mcp_manager() - .configured_servers(config) - .await; - let mcp_servers = match serde_json::to_value(configured_servers) { - Ok(value) => value, - Err(err) => { - return Err(internal_error(format!( - "failed to serialize MCP servers: {err}" - ))); - } - }; - - let mcp_oauth_credentials_store_mode = - match serde_json::to_value(config.mcp_oauth_credentials_store_mode) { - Ok(value) => value, - Err(err) => { - return Err(internal_error(format!( - "failed to serialize MCP OAuth credentials store mode: {err}" - ))); - } - }; - - let refresh_config = McpServerRefreshConfig { - mcp_servers, - mcp_oauth_credentials_store_mode, - }; - - // Refresh requests are queued per thread; each thread rebuilds MCP connections on its next - // active turn to avoid work for threads that never resume. - thread_manager.refresh_mcp_servers(refresh_config).await; - Ok(()) - } - async fn mcp_server_oauth_login_response( &self, params: McpServerOauthLoginParams, diff --git a/codex-rs/app-server/src/request_processors/plugins.rs b/codex-rs/app-server/src/request_processors/plugins.rs index a758e3a582..7d542edf39 100644 --- a/codex-rs/app-server/src/request_processors/plugins.rs +++ b/codex-rs/app-server/src/request_processors/plugins.rs @@ -246,33 +246,35 @@ impl PluginRequestProcessor { .map(|response| Some(response.into())) } - pub(crate) fn effective_plugins_changed_callback( - &self, - config: Config, - ) -> Arc { + pub(crate) fn effective_plugins_changed_callback(&self) -> Arc { let thread_manager = Arc::clone(&self.thread_manager); + let config_manager = self.config_manager.clone(); Arc::new(move || { - Self::spawn_effective_plugins_changed_task(Arc::clone(&thread_manager), config.clone()); + Self::spawn_effective_plugins_changed_task( + Arc::clone(&thread_manager), + config_manager.clone(), + ); }) } - fn on_effective_plugins_changed(&self, config: Config) { - Self::spawn_effective_plugins_changed_task(Arc::clone(&self.thread_manager), config); + fn on_effective_plugins_changed(&self) { + Self::spawn_effective_plugins_changed_task( + Arc::clone(&self.thread_manager), + self.config_manager.clone(), + ); } - fn spawn_effective_plugins_changed_task(thread_manager: Arc, config: Config) { + fn spawn_effective_plugins_changed_task( + thread_manager: Arc, + config_manager: ConfigManager, + ) { tokio::spawn(async move { thread_manager.plugins_manager().clear_cache(); thread_manager.skills_manager().clear_cache(); if thread_manager.list_thread_ids().await.is_empty() { return; } - if let Err(err) = - McpRequestProcessor::queue_mcp_server_refresh_for_config(&thread_manager, &config) - .await - { - warn!("failed to queue MCP refresh after effective plugins changed: {err:?}"); - } + crate::mcp_refresh::queue_best_effort_refresh(&thread_manager, &config_manager).await; }); } @@ -342,7 +344,7 @@ impl PluginRequestProcessor { &plugins_input, auth.clone(), &roots, - Some(self.effective_plugins_changed_callback(config.clone())), + Some(self.effective_plugins_changed_callback()), ); let config_for_marketplace_listing = plugins_input.clone(); @@ -840,7 +842,7 @@ impl PluginRequestProcessor { } }; - self.on_effective_plugins_changed(config.clone()); + self.on_effective_plugins_changed(); let plugin_mcp_servers = load_plugin_mcp_servers(result.installed_path.as_path()).await; if !plugin_mcp_servers.is_empty() { @@ -951,7 +953,7 @@ impl PluginRequestProcessor { .maybe_start_remote_installed_plugins_cache_refresh_after_mutation( &config.plugins_config_input(), auth.clone(), - Some(self.effective_plugins_changed_callback(config.clone())), + Some(self.effective_plugins_changed_callback()), ); let mut plugin_metadata = @@ -1144,7 +1146,7 @@ impl PluginRequestProcessor { .await .map_err(Self::plugin_uninstall_error)?; match self.load_latest_config(/*fallback_cwd*/ None).await { - Ok(config) => self.on_effective_plugins_changed(config), + Ok(_) => self.on_effective_plugins_changed(), Err(err) => { warn!( "failed to reload config after plugin uninstall, clearing plugin-related caches only: {err:?}" @@ -1245,12 +1247,12 @@ impl PluginRequestProcessor { ) { let plugins_manager = self.thread_manager.plugins_manager(); if plugins_manager.clear_remote_installed_plugins_cache() { - self.on_effective_plugins_changed(config.clone()); + self.on_effective_plugins_changed(); } plugins_manager.maybe_start_remote_installed_plugins_cache_refresh_after_mutation( &config.plugins_config_input(), auth.clone(), - Some(self.effective_plugins_changed_callback(config.clone())), + Some(self.effective_plugins_changed_callback()), ); } diff --git a/codex-rs/core/src/config/config_tests.rs b/codex-rs/core/src/config/config_tests.rs index c35a3767cb..aed305552d 100644 --- a/codex-rs/core/src/config/config_tests.rs +++ b/codex-rs/core/src/config/config_tests.rs @@ -6,6 +6,7 @@ use crate::config::edit::ConfigEditsBuilder; use crate::config::edit::apply_blocking; use assert_matches::assert_matches; use codex_config::CONFIG_TOML_FILE; +use codex_config::ConfigLayerEntry; use codex_config::RequirementSource; use codex_config::config_toml::AgentRoleToml; use codex_config::config_toml::AgentsToml; @@ -2845,6 +2846,307 @@ fn filter_plugin_mcp_servers_by_allowlist_blocks_unlisted_plugin() { ); } +#[tokio::test] +async fn rebuild_preserving_session_layers_refreshes_requirements() -> std::io::Result<()> { + let codex_home = TempDir::new()?; + let user_file = AbsolutePathBuf::resolve_path_against_base(CONFIG_TOML_FILE, codex_home.path()); + let project_dot_codex = + AbsolutePathBuf::resolve_path_against_base("project/.codex", codex_home.path()); + let mcp_requirements = BTreeMap::from([ + ( + "session_overrides_user".to_string(), + McpServerRequirement { + identity: McpServerIdentity::Command { + command: "session-command".to_string(), + }, + }, + ), + ( + "managed_overrides_session".to_string(), + McpServerRequirement { + identity: McpServerIdentity::Command { + command: "managed-command".to_string(), + }, + }, + ), + ( + "fresh_global".to_string(), + McpServerRequirement { + identity: McpServerIdentity::Command { + command: "fresh-global-command".to_string(), + }, + }, + ), + ( + "fresh_project".to_string(), + McpServerRequirement { + identity: McpServerIdentity::Command { + command: "fresh-project-command".to_string(), + }, + }, + ), + ]); + let requirements_toml = codex_config::ConfigRequirementsToml { + mcp_servers: Some(mcp_requirements.clone()), + ..Default::default() + }; + let requirements = codex_config::ConfigRequirements { + mcp_servers: Some(Sourced::new(mcp_requirements, RequirementSource::Unknown)), + ..Default::default() + }; + let refreshed_layer_stack = ConfigLayerStack::new( + vec![ + ConfigLayerEntry::new( + codex_app_server_protocol::ConfigLayerSource::User { + file: user_file.clone(), + }, + toml::toml! { + [mcp_servers.session_overrides_user] + command = "new-user-command" + [mcp_servers.managed_overrides_session] + command = "new-user-command" + [mcp_servers.fresh_global] + command = "fresh-global-command" + } + .into(), + ), + ConfigLayerEntry::new( + codex_app_server_protocol::ConfigLayerSource::Project { + dot_codex_folder: project_dot_codex.clone(), + }, + toml::toml! { + [mcp_servers.fresh_project] + command = "fresh-project-command" + } + .into(), + ), + ConfigLayerEntry::new( + codex_app_server_protocol::ConfigLayerSource::LegacyManagedConfigTomlFromMdm, + toml::toml! { + [mcp_servers.managed_overrides_session] + command = "managed-command" + } + .into(), + ), + ], + requirements, + requirements_toml, + ) + .map_err(std::io::Error::other)?; + let refreshed_toml = refreshed_layer_stack + .effective_config() + .try_into() + .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))?; + let refreshed_config = Config::load_config_with_layer_stack( + LOCAL_FS.as_ref(), + refreshed_toml, + ConfigOverrides { + cwd: Some(codex_home.path().to_path_buf()), + ..Default::default() + }, + codex_home.abs(), + refreshed_layer_stack, + ) + .await?; + let thread_layer_stack = ConfigLayerStack::new( + vec![ + ConfigLayerEntry::new( + codex_app_server_protocol::ConfigLayerSource::User { + file: user_file.clone(), + }, + toml::toml! { + [mcp_servers.session_overrides_user] + command = "old-user-command" + [mcp_servers.managed_overrides_session] + command = "old-user-command" + [mcp_servers.fresh_global] + command = "old-global-command" + } + .into(), + ), + ConfigLayerEntry::new( + codex_app_server_protocol::ConfigLayerSource::Project { + dot_codex_folder: project_dot_codex, + }, + toml::toml! { + [mcp_servers.fresh_project] + command = "old-project-command" + } + .into(), + ), + ConfigLayerEntry::new( + codex_app_server_protocol::ConfigLayerSource::SessionFlags, + toml::toml! { + [mcp_servers.session_overrides_user] + command = "session-command" + [mcp_servers.managed_overrides_session] + command = "session-command" + [mcp_servers.blocked_session] + command = "blocked-session-command" + } + .into(), + ), + ConfigLayerEntry::new( + codex_app_server_protocol::ConfigLayerSource::LegacyManagedConfigTomlFromMdm, + toml::toml! { + [mcp_servers.managed_overrides_session] + command = "old-managed-command" + } + .into(), + ), + ], + Default::default(), + Default::default(), + ) + .map_err(std::io::Error::other)?; + let thread_toml = thread_layer_stack + .effective_config() + .try_into() + .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))?; + let thread_config = Config::load_config_with_layer_stack( + LOCAL_FS.as_ref(), + thread_toml, + ConfigOverrides { + cwd: Some(codex_home.path().to_path_buf()), + ..Default::default() + }, + codex_home.abs(), + thread_layer_stack, + ) + .await?; + let config = thread_config + .rebuild_preserving_session_layers(&refreshed_config) + .await?; + + assert_eq!( + config.mcp_servers.get(), + &HashMap::from([ + ( + "session_overrides_user".to_string(), + stdio_mcp("session-command"), + ), + ( + "managed_overrides_session".to_string(), + stdio_mcp("managed-command"), + ), + ( + "fresh_global".to_string(), + stdio_mcp("fresh-global-command"), + ), + ( + "fresh_project".to_string(), + stdio_mcp("fresh-project-command"), + ), + ( + "blocked_session".to_string(), + McpServerConfig { + enabled: false, + disabled_reason: Some(McpServerDisabledReason::Requirements { + source: RequirementSource::Unknown, + }), + ..stdio_mcp("blocked-session-command") + }, + ), + ]) + ); + + Ok(()) +} + +#[tokio::test] +async fn rebuild_preserving_session_layers_refreshes_plugin_derived_mcp_config() +-> anyhow::Result<()> { + let codex_home = TempDir::new()?; + let plugin_root = codex_home + .path() + .join("plugins/cache") + .join("test/sample/local"); + std::fs::create_dir_all(plugin_root.join(".codex-plugin"))?; + std::fs::write( + plugin_root.join(".codex-plugin/plugin.json"), + r#"{"name":"sample"}"#, + )?; + std::fs::write( + plugin_root.join(".mcp.json"), + r#"{ + "mcpServers": { + "sample": { + "type": "http", + "url": "https://sample.example/mcp" + } + } +}"#, + )?; + + let user_file = AbsolutePathBuf::resolve_path_against_base(CONFIG_TOML_FILE, codex_home.path()); + let refreshed_layer_stack = ConfigLayerStack::new( + vec![ConfigLayerEntry::new( + codex_app_server_protocol::ConfigLayerSource::User { + file: user_file.clone(), + }, + toml::toml! { + [features] + plugins = true + + [plugins."sample@test"] + enabled = true + } + .into(), + )], + Default::default(), + Default::default(), + )?; + let refreshed_config = Config::load_config_with_layer_stack( + LOCAL_FS.as_ref(), + refreshed_layer_stack.effective_config().try_into()?, + ConfigOverrides { + cwd: Some(codex_home.path().to_path_buf()), + ..Default::default() + }, + codex_home.abs(), + refreshed_layer_stack, + ) + .await?; + let thread_layer_stack = ConfigLayerStack::new( + vec![ConfigLayerEntry::new( + codex_app_server_protocol::ConfigLayerSource::User { file: user_file }, + toml::toml! { + [features] + plugins = false + + [plugins."sample@test"] + enabled = true + } + .into(), + )], + Default::default(), + Default::default(), + )?; + let thread_config = Config::load_config_with_layer_stack( + LOCAL_FS.as_ref(), + thread_layer_stack.effective_config().try_into()?, + ConfigOverrides { + cwd: Some(codex_home.path().to_path_buf()), + ..Default::default() + }, + codex_home.abs(), + thread_layer_stack, + ) + .await?; + let config = thread_config + .rebuild_preserving_session_layers(&refreshed_config) + .await?; + let plugins_manager = PluginsManager::new(codex_home.path().to_path_buf()); + let mcp_config = config.to_mcp_config(&plugins_manager).await; + + assert_eq!( + mcp_config.configured_mcp_servers.get("sample"), + Some(&http_mcp("https://sample.example/mcp")) + ); + + Ok(()) +} + #[tokio::test] async fn to_mcp_config_applies_plugin_mcp_cloud_requirements() -> anyhow::Result<()> { let codex_home = TempDir::new()?; diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index cfe2100d49..657a0413c9 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -1130,6 +1130,62 @@ impl Config { } } + pub async fn rebuild_preserving_session_layers( + &self, + refreshed_config: &Config, + ) -> std::io::Result { + let mut layers = refreshed_config + .config_layer_stack + .get_layers( + ConfigLayerStackOrdering::LowestPrecedenceFirst, + /*include_disabled*/ true, + ) + .into_iter() + .filter(|layer| !is_session_layer(&layer.name)) + .cloned() + .collect::>(); + layers.extend( + self.config_layer_stack + .get_layers( + ConfigLayerStackOrdering::LowestPrecedenceFirst, + /*include_disabled*/ true, + ) + .into_iter() + .filter(|layer| is_session_layer(&layer.name)) + .cloned(), + ); + layers.sort_by_key(|layer| layer.name.precedence()); + + let config_layer_stack = ConfigLayerStack::new( + layers, + refreshed_config.config_layer_stack.requirements().clone(), + refreshed_config + .config_layer_stack + .requirements_toml() + .clone(), + )? + .with_user_and_project_exec_policy_rules_ignored( + refreshed_config + .config_layer_stack + .ignore_user_and_project_exec_policy_rules(), + ); + let cfg: ConfigToml = config_layer_stack + .effective_config() + .try_into() + .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))?; + Self::load_config_with_layer_stack( + LOCAL_FS.as_ref(), + cfg, + ConfigOverrides { + cwd: Some(self.cwd.to_path_buf()), + ..Default::default() + }, + refreshed_config.codex_home.clone(), + config_layer_stack, + ) + .await + } + /// This is the preferred way to create an instance of [Config]. pub async fn load_with_cli_overrides( cli_overrides: Vec<(String, TomlValue)>, @@ -1680,6 +1736,10 @@ fn thread_store_config( } } +fn is_session_layer(source: &ConfigLayerSource) -> bool { + matches!(source, ConfigLayerSource::SessionFlags) +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum PermissionConfigSyntax { Legacy, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 03a3ec92fc..124f0729ef 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -42,7 +42,6 @@ use codex_protocol::openai_models::ModelPreset; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::InitialHistory; -use codex_protocol::protocol::McpServerRefreshConfig; use codex_protocol::protocol::Op; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionConfiguredEvent; @@ -526,27 +525,6 @@ impl ThreadManager { self.state.list_thread_ids().await } - pub async fn refresh_mcp_servers(&self, refresh_config: McpServerRefreshConfig) { - let threads = self - .state - .threads - .read() - .await - .values() - .cloned() - .collect::>(); - for thread in threads { - if let Err(err) = thread - .submit(Op::RefreshMcpServers { - config: refresh_config.clone(), - }) - .await - { - warn!("failed to request MCP server refresh: {err}"); - } - } - } - pub fn subscribe_thread_created(&self) -> broadcast::Receiver { self.state.thread_created_tx.subscribe() }