Preserve session MCP config on refresh (#21055)

# Overview
MCP refreshes were rebuilding active threads from fresh disk-backed
config only, which dropped thread-start session overlays such as
app-injected MCP servers. This keeps refreshes current with disk config
while preserving the thread-local config that only the active thread
knows about.

# Changes
- Rebuild refreshed config per active thread using that thread's current
`cwd`, rather than fanning out one app-server config to every thread.
- Preserve each thread's `SessionFlags` layer while replacing reloadable
config layers with freshly loaded config, then derive the MCP refresh
payload from the rebuilt result.
- Move MCP refresh orchestration into app-server so manual refreshes
fail loudly while background refreshes remain best-effort, and route
plugin-triggered refreshes through the same per-thread reload path.
- Add regression coverage for session overlays, fresh project config,
plugin-derived MCP config, current requirements, and strict vs
best-effort refresh behavior.

# Verification
- Passed focused Rust coverage for the thread-config rebuild behavior
and deferred MCP refresh flow, plus `cargo test -p codex-app-server
--lib`.
- Verified end to end in the Codex dev app against the locally built
CLI: registered an MCP via thread config, verified that it could be used
successfully before refresh, manually triggered MCP refresh, and
verified that it continued to be available afterward.
This commit is contained in:
aaronl-openai
2026-05-05 21:09:28 -07:00
committed by GitHub
parent 8ef31894dc
commit 9f06d171e2
11 changed files with 652 additions and 93 deletions

View File

@@ -140,6 +140,21 @@ impl ConfigManager {
.await
}
/// Reloads the latest config for `thread_config`'s `cwd` and rebuilds the
/// thread's config on top of it.
///
/// The freshly loaded config is handed to
/// `Config::rebuild_preserving_session_layers` on the thread's current config,
/// and runtime feature enablement plus arg0 paths are then re-applied to the
/// rebuilt result.
///
/// # Errors
/// Propagates any error from loading the latest config or from the rebuild.
pub(crate) async fn load_latest_config_for_thread(
    &self,
    thread_config: &Config,
) -> std::io::Result<Config> {
    // Load relative to the thread's own cwd rather than a process-wide default.
    let thread_cwd = thread_config.cwd.to_path_buf();
    let latest = self.load_latest_config(Some(thread_cwd)).await?;

    let mut rebuilt = thread_config
        .rebuild_preserving_session_layers(&latest)
        .await?;

    // Re-apply the manager-level adjustments to the rebuilt config.
    self.apply_runtime_feature_enablement(&mut rebuilt);
    self.apply_arg0_paths(&mut rebuilt);
    Ok(rebuilt)
}
pub(crate) async fn load_default_config(&self) -> std::io::Result<Config> {
let mut config = Config::load_default_with_cli_overrides_for_codex_home(
self.codex_home.clone(),

View File

@@ -85,6 +85,7 @@ mod filters;
mod fs_watch;
mod fuzzy_file_search;
pub mod in_process;
mod mcp_refresh;
mod message_processor;
mod models;
mod outgoing_message;

View File

@@ -0,0 +1,241 @@
use crate::config_manager::ConfigManager;
use codex_core::CodexThread;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_protocol::ThreadId;
use codex_protocol::protocol::McpServerRefreshConfig;
use codex_protocol::protocol::Op;
use std::io;
use std::sync::Arc;
use tracing::warn;
/// Queues an MCP refresh on every listed thread, failing on the first error.
///
/// The latest config is loaded once up front (with no fallback cwd) and the
/// result discarded, so a load failure aborts before any thread is touched.
/// Refresh payloads are then built for every thread *before* any of them is
/// submitted, so a planning failure for one thread means nothing is queued.
///
/// # Errors
/// Returns the first error from the up-front load, from fetching a thread,
/// from building a thread's refresh payload, or from submitting the refresh.
pub(crate) async fn queue_strict_refresh(
    thread_manager: &Arc<ThreadManager>,
    config_manager: &ConfigManager,
) -> io::Result<()> {
    // Only the error matters here; the loaded value itself is not used.
    config_manager
        .load_latest_config(/*fallback_cwd*/ None)
        .await?;

    // Phase 1: plan every refresh.
    let mut planned = Vec::new();
    for thread_id in thread_manager.list_thread_ids().await {
        let thread = match thread_manager.get_thread(thread_id).await {
            Ok(thread) => thread,
            Err(err) => {
                return Err(io::Error::other(format!(
                    "failed to load thread {thread_id}: {err}"
                )));
            }
        };
        let thread_config = thread.config().await;
        let payload = build_refresh_config(thread_manager, config_manager, thread_config).await?;
        planned.push((thread_id, thread, payload));
    }

    // Phase 2: submit the planned refreshes in order.
    for (thread_id, thread, payload) in planned {
        queue_refresh(thread_id, thread, payload).await?;
    }
    Ok(())
}
/// Queues an MCP refresh on every listed thread, logging and skipping failures.
///
/// Unlike the strict variant, each thread is handled independently: a failure
/// to fetch the thread, build its refresh payload, or submit the refresh is
/// reported via `warn!` and the loop moves on to the next thread.
pub(crate) async fn queue_best_effort_refresh(
    thread_manager: &Arc<ThreadManager>,
    config_manager: &ConfigManager,
) {
    for thread_id in thread_manager.list_thread_ids().await {
        let thread = match thread_manager.get_thread(thread_id).await {
            Err(err) => {
                warn!("failed to load thread {thread_id} for MCP refresh: {err}");
                continue;
            }
            Ok(thread) => thread,
        };

        let thread_config = thread.config().await;
        let planned = build_refresh_config(thread_manager, config_manager, thread_config).await;
        let payload = match planned {
            Err(err) => {
                warn!("failed to build MCP refresh config for thread {thread_id}: {err}");
                continue;
            }
            Ok(payload) => payload,
        };

        // The submit error already names the thread, so it is logged verbatim.
        let submitted = queue_refresh(thread_id, thread, payload).await;
        if let Err(err) = submitted {
            warn!("{err}");
        }
    }
}
async fn build_refresh_config(
thread_manager: &ThreadManager,
config_manager: &ConfigManager,
thread_config: Arc<Config>,
) -> io::Result<McpServerRefreshConfig> {
let config = config_manager
.load_latest_config_for_thread(thread_config.as_ref())
.await?;
let mcp_servers = thread_manager
.mcp_manager()
.configured_servers(&config)
.await;
Ok(McpServerRefreshConfig {
mcp_servers: serde_json::to_value(mcp_servers).map_err(io::Error::other)?,
mcp_oauth_credentials_store_mode: serde_json::to_value(
config.mcp_oauth_credentials_store_mode,
)
.map_err(io::Error::other)?,
})
}
/// Submits an `Op::RefreshMcpServers` carrying `config` to `thread`.
///
/// The successful submit result is discarded.
///
/// # Errors
/// A submit failure is wrapped in an `io::Error` whose message names
/// `thread_id`.
async fn queue_refresh(
    thread_id: ThreadId,
    thread: Arc<CodexThread>,
    config: McpServerRefreshConfig,
) -> io::Result<()> {
    let refresh_op = Op::RefreshMcpServers { config };
    match thread.submit(refresh_op).await {
        Ok(_) => Ok(()),
        Err(err) => Err(io::Error::other(format!(
            "failed to queue MCP refresh for thread {thread_id}: {err}"
        ))),
    }
}
#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;
use codex_arg0::Arg0DispatchPaths;
use codex_config::CloudRequirementsLoader;
use codex_config::LoaderOverrides;
use codex_config::ThreadConfigContext;
use codex_config::ThreadConfigLoadError;
use codex_config::ThreadConfigLoadErrorCode;
use codex_config::ThreadConfigLoader;
use codex_config::ThreadConfigSource;
use codex_core::agent_graph_store_from_state_db;
use codex_core::config::ConfigOverrides;
use codex_core::init_state_db_from_config;
use codex_core::thread_store_from_config;
use codex_exec_server::EnvironmentManager;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_protocol::protocol::SessionSource;
use codex_utils_absolute_path::AbsolutePathBuf;
use pretty_assertions::assert_eq;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use tempfile::TempDir;
/// A strict refresh must surface the config-load failure of the "bad" thread
/// as its own error, with the loader's message intact.
#[tokio::test]
async fn strict_refresh_reports_thread_planning_failures() -> anyhow::Result<()> {
    // `_temp_dir` is bound (not `_`) so the directory outlives the refresh.
    let (_temp_dir, thread_manager, config_manager, _loader) = refresh_test_state().await?;

    let outcome = queue_strict_refresh(&thread_manager, &config_manager).await;
    let failure = outcome.expect_err("strict refresh should fail");

    assert_eq!(failure.to_string(), "failed to load refresh config");
    Ok(())
}
/// A best-effort refresh must attempt a config load for both threads, even
/// though the load for the "bad" cwd fails.
#[tokio::test]
async fn best_effort_refresh_attempts_every_loaded_thread() -> anyhow::Result<()> {
    let (_temp_dir, thread_manager, config_manager, loader) = refresh_test_state().await?;

    queue_best_effort_refresh(&thread_manager, &config_manager).await;

    let good_load_count = loader.good_loads.load(Ordering::Relaxed);
    assert_eq!(good_load_count, 1);
    let bad_load_count = loader.bad_loads.load(Ordering::Relaxed);
    assert_eq!(bad_load_count, 1);
    Ok(())
}
/// Builds the shared fixture for the refresh tests.
///
/// Creates a temp directory with `good` and `bad` working directories, starts
/// one thread per directory on a fresh `ThreadManager`, and returns a
/// `ConfigManager` wired to a `CountingThreadConfigLoader` that fails loads
/// for the `bad` directory. The `TempDir` is returned so callers keep the
/// directory alive for the duration of the test.
async fn refresh_test_state() -> anyhow::Result<(
    TempDir,
    Arc<ThreadManager>,
    ConfigManager,
    Arc<CountingThreadConfigLoader>,
)> {
    let temp_dir = TempDir::new()?;
    let good_cwd = temp_dir.path().join("good");
    let bad_cwd = temp_dir.path().join("bad");
    for dir in [&good_cwd, &bad_cwd] {
        std::fs::create_dir_all(dir)?;
    }

    // The initial configs come from a manager without the failing loader, so
    // both threads can be started successfully.
    let bootstrap_manager =
        ConfigManager::without_managed_config_for_tests(temp_dir.path().to_path_buf());
    let good_config = bootstrap_manager
        .load_for_cwd(
            /*request_overrides*/ None,
            ConfigOverrides::default(),
            Some(good_cwd.clone()),
        )
        .await?;
    let bad_config = bootstrap_manager
        .load_for_cwd(
            /*request_overrides*/ None,
            ConfigOverrides::default(),
            Some(bad_cwd.clone()),
        )
        .await?;

    let auth = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
    let state_db = init_state_db_from_config(&good_config)
        .await
        .expect("refresh tests require state db");
    let thread_store = thread_store_from_config(&good_config, state_db.clone());
    let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
    let manager = ThreadManager::new(
        &good_config,
        auth,
        SessionSource::Exec,
        Arc::new(EnvironmentManager::default_for_tests()),
        /*analytics_events_client*/ None,
        state_db,
        thread_store,
        agent_graph_store,
    );
    let thread_manager = Arc::new(manager);
    thread_manager.start_thread(good_config).await?;
    thread_manager.start_thread(bad_config).await?;

    // The manager handed back to the tests uses the counting loader.
    let counting_loader = Arc::new(CountingThreadConfigLoader {
        good_cwd: AbsolutePathBuf::try_from(good_cwd)?,
        bad_cwd: AbsolutePathBuf::try_from(bad_cwd)?,
        good_loads: AtomicUsize::new(0),
        bad_loads: AtomicUsize::new(0),
    });
    let config_manager = ConfigManager::new(
        temp_dir.path().to_path_buf(),
        Vec::new(),
        LoaderOverrides::without_managed_config_for_tests(),
        CloudRequirementsLoader::default(),
        Arg0DispatchPaths::default(),
        counting_loader.clone(),
    );

    Ok((temp_dir, thread_manager, config_manager, counting_loader))
}
/// Test loader that tallies load requests per working directory and fails
/// every request whose cwd is `bad_cwd`.
struct CountingThreadConfigLoader {
    /// Working directory whose loads succeed and are counted in `good_loads`.
    good_cwd: AbsolutePathBuf,
    /// Working directory whose loads are counted in `bad_loads` and rejected.
    bad_cwd: AbsolutePathBuf,
    /// Number of load requests seen for `good_cwd`.
    good_loads: AtomicUsize,
    /// Number of load requests seen for `bad_cwd`.
    bad_loads: AtomicUsize,
}

#[async_trait]
impl ThreadConfigLoader for CountingThreadConfigLoader {
    /// Counts the request against the matching cwd, then returns an empty
    /// source list unless the cwd is `bad_cwd`, in which case it returns an
    /// `Internal` load error.
    async fn load(
        &self,
        context: ThreadConfigContext,
    ) -> Result<Vec<ThreadConfigSource>, ThreadConfigLoadError> {
        let requested_cwd = context.cwd.as_ref();

        if requested_cwd == Some(&self.good_cwd) {
            self.good_loads.fetch_add(1, Ordering::Relaxed);
        }

        // Anything other than the bad cwd loads successfully with no sources.
        if requested_cwd != Some(&self.bad_cwd) {
            return Ok(Vec::new());
        }

        self.bad_loads.fetch_add(1, Ordering::Relaxed);
        Err(ThreadConfigLoadError::new(
            ThreadConfigLoadErrorCode::Internal,
            /*status_code*/ None,
            "failed to load refresh config",
        ))
    }
}
}

View File

@@ -416,7 +416,7 @@ impl MessageProcessor {
if matches!(plugin_startup_tasks, crate::PluginStartupTasks::Start) {
// Keep plugin startup warmups aligned at app-server startup.
let on_effective_plugins_changed =
plugin_processor.effective_plugins_changed_callback((*config).clone());
plugin_processor.effective_plugins_changed_callback();
thread_manager
.plugins_manager()
.maybe_start_plugin_startup_tasks_for_config(

View File

@@ -361,7 +361,6 @@ use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::GitInfo as CoreGitInfo;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::McpAuthStatus as CoreMcpAuthStatus;
use codex_protocol::protocol::McpServerRefreshConfig;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot;
use codex_protocol::protocol::RealtimeVoicesList;

View File

@@ -168,7 +168,7 @@ impl AccountRequestProcessor {
{
Ok(config) => {
let refresh_thread_manager = Arc::clone(thread_manager);
let refresh_config = config.clone();
let refresh_config_manager = config_manager.clone();
thread_manager
.plugins_manager()
.maybe_start_remote_installed_plugins_cache_refresh(
@@ -177,7 +177,7 @@ impl AccountRequestProcessor {
Some(Arc::new(move || {
Self::spawn_effective_plugins_changed_task(
Arc::clone(&refresh_thread_manager),
refresh_config.clone(),
refresh_config_manager.clone(),
);
})),
);
@@ -190,19 +190,17 @@ impl AccountRequestProcessor {
}
}
fn spawn_effective_plugins_changed_task(thread_manager: Arc<ThreadManager>, config: Config) {
fn spawn_effective_plugins_changed_task(
thread_manager: Arc<ThreadManager>,
config_manager: ConfigManager,
) {
tokio::spawn(async move {
thread_manager.plugins_manager().clear_cache();
thread_manager.skills_manager().clear_cache();
if thread_manager.list_thread_ids().await.is_empty() {
return;
}
if let Err(err) =
McpRequestProcessor::queue_mcp_server_refresh_for_config(&thread_manager, &config)
.await
{
warn!("failed to queue MCP refresh after effective plugins changed: {err:?}");
}
crate::mcp_refresh::queue_best_effort_refresh(&thread_manager, &config_manager).await;
});
}

View File

@@ -77,8 +77,9 @@ impl McpRequestProcessor {
&self,
_params: Option<()>,
) -> Result<McpServerRefreshResponse, JSONRPCErrorError> {
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
Self::queue_mcp_server_refresh_for_config(&self.thread_manager, &config).await?;
crate::mcp_refresh::queue_strict_refresh(&self.thread_manager, &self.config_manager)
.await
.map_err(|err| internal_error(format!("failed to refresh MCP servers: {err}")))?;
Ok(McpServerRefreshResponse {})
}
@@ -108,44 +109,6 @@ impl McpRequestProcessor {
Ok((thread_id, thread))
}
pub(super) async fn queue_mcp_server_refresh_for_config(
thread_manager: &Arc<ThreadManager>,
config: &Config,
) -> Result<(), JSONRPCErrorError> {
let configured_servers = thread_manager
.mcp_manager()
.configured_servers(config)
.await;
let mcp_servers = match serde_json::to_value(configured_servers) {
Ok(value) => value,
Err(err) => {
return Err(internal_error(format!(
"failed to serialize MCP servers: {err}"
)));
}
};
let mcp_oauth_credentials_store_mode =
match serde_json::to_value(config.mcp_oauth_credentials_store_mode) {
Ok(value) => value,
Err(err) => {
return Err(internal_error(format!(
"failed to serialize MCP OAuth credentials store mode: {err}"
)));
}
};
let refresh_config = McpServerRefreshConfig {
mcp_servers,
mcp_oauth_credentials_store_mode,
};
// Refresh requests are queued per thread; each thread rebuilds MCP connections on its next
// active turn to avoid work for threads that never resume.
thread_manager.refresh_mcp_servers(refresh_config).await;
Ok(())
}
async fn mcp_server_oauth_login_response(
&self,
params: McpServerOauthLoginParams,

View File

@@ -246,33 +246,35 @@ impl PluginRequestProcessor {
.map(|response| Some(response.into()))
}
pub(crate) fn effective_plugins_changed_callback(
&self,
config: Config,
) -> Arc<dyn Fn() + Send + Sync> {
pub(crate) fn effective_plugins_changed_callback(&self) -> Arc<dyn Fn() + Send + Sync> {
let thread_manager = Arc::clone(&self.thread_manager);
let config_manager = self.config_manager.clone();
Arc::new(move || {
Self::spawn_effective_plugins_changed_task(Arc::clone(&thread_manager), config.clone());
Self::spawn_effective_plugins_changed_task(
Arc::clone(&thread_manager),
config_manager.clone(),
);
})
}
fn on_effective_plugins_changed(&self, config: Config) {
Self::spawn_effective_plugins_changed_task(Arc::clone(&self.thread_manager), config);
fn on_effective_plugins_changed(&self) {
Self::spawn_effective_plugins_changed_task(
Arc::clone(&self.thread_manager),
self.config_manager.clone(),
);
}
fn spawn_effective_plugins_changed_task(thread_manager: Arc<ThreadManager>, config: Config) {
fn spawn_effective_plugins_changed_task(
thread_manager: Arc<ThreadManager>,
config_manager: ConfigManager,
) {
tokio::spawn(async move {
thread_manager.plugins_manager().clear_cache();
thread_manager.skills_manager().clear_cache();
if thread_manager.list_thread_ids().await.is_empty() {
return;
}
if let Err(err) =
McpRequestProcessor::queue_mcp_server_refresh_for_config(&thread_manager, &config)
.await
{
warn!("failed to queue MCP refresh after effective plugins changed: {err:?}");
}
crate::mcp_refresh::queue_best_effort_refresh(&thread_manager, &config_manager).await;
});
}
@@ -342,7 +344,7 @@ impl PluginRequestProcessor {
&plugins_input,
auth.clone(),
&roots,
Some(self.effective_plugins_changed_callback(config.clone())),
Some(self.effective_plugins_changed_callback()),
);
let config_for_marketplace_listing = plugins_input.clone();
@@ -840,7 +842,7 @@ impl PluginRequestProcessor {
}
};
self.on_effective_plugins_changed(config.clone());
self.on_effective_plugins_changed();
let plugin_mcp_servers = load_plugin_mcp_servers(result.installed_path.as_path()).await;
if !plugin_mcp_servers.is_empty() {
@@ -951,7 +953,7 @@ impl PluginRequestProcessor {
.maybe_start_remote_installed_plugins_cache_refresh_after_mutation(
&config.plugins_config_input(),
auth.clone(),
Some(self.effective_plugins_changed_callback(config.clone())),
Some(self.effective_plugins_changed_callback()),
);
let mut plugin_metadata =
@@ -1144,7 +1146,7 @@ impl PluginRequestProcessor {
.await
.map_err(Self::plugin_uninstall_error)?;
match self.load_latest_config(/*fallback_cwd*/ None).await {
Ok(config) => self.on_effective_plugins_changed(config),
Ok(_) => self.on_effective_plugins_changed(),
Err(err) => {
warn!(
"failed to reload config after plugin uninstall, clearing plugin-related caches only: {err:?}"
@@ -1245,12 +1247,12 @@ impl PluginRequestProcessor {
) {
let plugins_manager = self.thread_manager.plugins_manager();
if plugins_manager.clear_remote_installed_plugins_cache() {
self.on_effective_plugins_changed(config.clone());
self.on_effective_plugins_changed();
}
plugins_manager.maybe_start_remote_installed_plugins_cache_refresh_after_mutation(
&config.plugins_config_input(),
auth.clone(),
Some(self.effective_plugins_changed_callback(config.clone())),
Some(self.effective_plugins_changed_callback()),
);
}