Compare commits

...

7 Commits

Author SHA1 Message Date
Matthew Zeng
b4843a50b1 Merge branch 'dev/mzeng/hot_reload_config' of https://github.com/openai/codex into dev/mzeng/hot_reload_config 2026-02-27 16:00:50 -08:00
Matthew Zeng
cc15dbdb09 Merge branch 'main' of https://github.com/openai/codex into dev/mzeng/hot_reload_config 2026-02-27 15:58:31 -08:00
Matthew Zeng
cea21b18e2 Merge branch 'main' into dev/mzeng/hot_reload_config 2026-02-27 09:38:57 -08:00
Matthew Zeng
13a0d98633 Merge branch 'main' into dev/mzeng/hot_reload_config 2026-02-25 16:57:41 -08:00
Matthew Zeng
2ba659a54d Merge branch 'main' of https://github.com/openai/codex into dev/mzeng/hot_reload_config 2026-02-23 14:40:39 -08:00
Matthew Zeng
ad95377604 update 2026-02-23 12:48:10 -08:00
Matthew Zeng
fb494b73d9 update 2026-02-22 22:27:46 -08:00
4 changed files with 100 additions and 44 deletions

View File

@@ -4068,42 +4068,35 @@ impl CodexMessageProcessor {
}
async fn mcp_server_refresh(&self, request_id: ConnectionRequestId, _params: Option<()>) {
let config = match self.load_latest_config().await {
Ok(config) => config,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
if let Err(error) = self.refresh_mcp_servers_from_latest_config().await {
self.outgoing.send_error(request_id, error).await;
return;
}
let mcp_servers = match serde_json::to_value(config.mcp_servers.get()) {
Ok(value) => value,
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to serialize MCP servers: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let response = McpServerRefreshResponse {};
self.outgoing.send_response(request_id, response).await;
}
pub(crate) async fn refresh_mcp_servers_from_latest_config(
&self,
) -> Result<(), JSONRPCErrorError> {
let config = self.load_latest_config().await?;
let mcp_servers =
serde_json::to_value(config.mcp_servers.get()).map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to serialize MCP servers: {err}"),
data: None,
})?;
let mcp_oauth_credentials_store_mode =
match serde_json::to_value(config.mcp_oauth_credentials_store_mode) {
Ok(value) => value,
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"failed to serialize MCP OAuth credentials store mode: {err}"
),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
serde_json::to_value(config.mcp_oauth_credentials_store_mode).map_err(|err| {
JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to serialize MCP OAuth credentials store mode: {err}"),
data: None,
}
};
})?;
let refresh_config = McpServerRefreshConfig {
mcp_servers,
@@ -4112,10 +4105,14 @@ impl CodexMessageProcessor {
// Refresh requests are queued per thread; each thread rebuilds MCP connections on its next
// active turn to avoid work for threads that never resume.
let thread_manager = Arc::clone(&self.thread_manager);
thread_manager.refresh_mcp_servers(refresh_config).await;
let response = McpServerRefreshResponse {};
self.outgoing.send_response(request_id, response).await;
self.thread_manager
.refresh_mcp_servers(refresh_config)
.await;
Ok(())
}
pub(crate) async fn reload_user_config_for_threads(&self) {
self.thread_manager.reload_user_config().await;
}
async fn mcp_server_oauth_login(

View File

@@ -504,7 +504,12 @@ impl MessageProcessor {
params: ConfigValueWriteParams,
) {
match self.config_api.write_value(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.codex_message_processor
.reload_user_config_for_threads()
.await;
self.outgoing.send_response(request_id, response).await;
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
@@ -515,7 +520,12 @@ impl MessageProcessor {
params: ConfigBatchWriteParams,
) {
match self.config_api.batch_write(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.codex_message_processor
.reload_user_config_for_threads()
.await;
self.outgoing.send_response(request_id, response).await;
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}

View File

@@ -943,12 +943,39 @@ async fn list_apps_force_refetch_patches_updates_from_cached_snapshots() -> Resu
})
.await?;
let warm_first_update = read_app_list_updated_notification(&mut mcp).await?;
assert_eq!(
warm_first_update.data,
vec![AppInfo {
let expected_warm_accessible_first = vec![AppInfo {
id: "beta".to_string(),
name: "Beta App".to_string(),
description: None,
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
branding: None,
app_metadata: None,
labels: None,
install_url: Some("https://chatgpt.com/apps/beta-app/beta".to_string()),
is_accessible: true,
is_enabled: true,
}];
let expected_warm_directory_first = vec![
AppInfo {
id: "alpha".to_string(),
name: "Alpha".to_string(),
description: Some("Alpha v1".to_string()),
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
branding: None,
app_metadata: None,
labels: None,
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
is_accessible: false,
is_enabled: true,
},
AppInfo {
id: "beta".to_string(),
name: "Beta App".to_string(),
description: None,
description: Some("Beta v1".to_string()),
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
@@ -956,9 +983,15 @@ async fn list_apps_force_refetch_patches_updates_from_cached_snapshots() -> Resu
app_metadata: None,
labels: None,
install_url: Some("https://chatgpt.com/apps/beta-app/beta".to_string()),
is_accessible: true,
is_accessible: false,
is_enabled: true,
}]
},
];
assert!(
warm_first_update.data == expected_warm_accessible_first
|| warm_first_update.data == expected_warm_directory_first,
"unexpected first warm app/list update: {:#?}",
warm_first_update.data
);
let warm_second_update = read_app_list_updated_notification(&mut mcp).await?;

View File

@@ -276,6 +276,22 @@ impl ThreadManager {
}
}
pub async fn reload_user_config(&self) {
let threads = self
.state
.threads
.read()
.await
.values()
.cloned()
.collect::<Vec<_>>();
for thread in threads {
if let Err(err) = thread.submit(Op::ReloadUserConfig).await {
warn!("failed to request user config reload: {err}");
}
}
}
pub fn subscribe_thread_created(&self) -> broadcast::Receiver<ThreadId> {
self.state.thread_created_tx.subscribe()
}