This commit is contained in:
Matthew Zeng
2026-03-19 10:39:56 -07:00
parent 859c58f07d
commit 1a910fd874
3 changed files with 212 additions and 13 deletions

View File

@@ -168,7 +168,7 @@ Example with notification opt-out:
- `skills/changed` — notification emitted when watched local skill files change.
- `app/list` — list available apps.
- `skills/config/write` — write user-level skill config by path.
- `plugin/install` — install a plugin from a discovered marketplace entry, rejecting marketplace entries marked unavailable for install, and return the effective plugin auth policy plus any apps that still need auth (**under development; do not call from production clients yet**).
- `plugin/install` — install a plugin from a discovered marketplace entry, rejecting marketplace entries marked unavailable for install, queue a reload for bundled plugin MCP servers, start MCP OAuth in the browser for bundled servers that support it, and return the effective plugin auth policy plus any apps that still need auth (**under development; do not call from production clients yet**).
- `plugin/uninstall` — uninstall a plugin by id by removing its cached files and clearing its user-level config entry (**under development; do not call from production clients yet**).
- `mcpServer/oauth/login` — start an OAuth login for a configured MCP server; returns an `authorization_url` and later emits `mcpServer/oauthLogin/completed` once the browser flow finishes.
- `tool/requestUserInput` — prompt the user with 13 short questions for a tool call and return their answers (experimental).

View File

@@ -215,8 +215,11 @@ use codex_core::find_thread_name_by_id;
use codex_core::find_thread_names_by_ids;
use codex_core::find_thread_path_by_id_str;
use codex_core::git_info::git_diff_to_remote;
use codex_core::mcp::auth::McpOAuthLoginSupport;
use codex_core::mcp::auth::discover_supported_scopes;
use codex_core::mcp::auth::oauth_login_support;
use codex_core::mcp::auth::resolve_oauth_scopes;
use codex_core::mcp::auth::should_retry_without_scopes;
use codex_core::mcp::collect_mcp_snapshot;
use codex_core::mcp::group_tools_by_server;
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
@@ -272,6 +275,7 @@ use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use codex_protocol::protocol::W3cTraceContext;
use codex_protocol::user_input::MAX_USER_INPUT_TEXT_CHARS;
use codex_protocol::user_input::UserInput as CoreInputItem;
use codex_rmcp_client::perform_oauth_login;
use codex_rmcp_client::perform_oauth_login_return_url;
use codex_state::StateRuntime;
use codex_state::ThreadMetadata;
@@ -4587,20 +4591,31 @@ impl CodexMessageProcessor {
}
};
if let Err(error) = self.queue_mcp_server_refresh_for_config(&config).await {
self.outgoing.send_error(request_id, error).await;
return;
}
let response = McpServerRefreshResponse {};
self.outgoing.send_response(request_id, response).await;
}
async fn queue_mcp_server_refresh_for_config(
&self,
config: &Config,
) -> Result<(), JSONRPCErrorError> {
let configured_servers = self
.thread_manager
.mcp_manager()
.configured_servers(&config);
.configured_servers(config);
let mcp_servers = match serde_json::to_value(configured_servers) {
Ok(value) => value,
Err(err) => {
let error = JSONRPCErrorError {
return Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to serialize MCP servers: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
});
}
};
@@ -4608,15 +4623,13 @@ impl CodexMessageProcessor {
match serde_json::to_value(config.mcp_oauth_credentials_store_mode) {
Ok(value) => value,
Err(err) => {
let error = JSONRPCErrorError {
return Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"failed to serialize MCP OAuth credentials store mode: {err}"
),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
});
}
};
@@ -4629,8 +4642,7 @@ impl CodexMessageProcessor {
// active turn to avoid work for threads that never resume.
let thread_manager = Arc::clone(&self.thread_manager);
thread_manager.refresh_mcp_servers(refresh_config).await;
let response = McpServerRefreshResponse {};
self.outgoing.send_response(request_id, response).await;
Ok(())
}
async fn mcp_server_oauth_login(
@@ -5710,6 +5722,10 @@ impl CodexMessageProcessor {
let config_cwd = marketplace_path.as_path().parent().map(Path::to_path_buf);
let plugins_manager = self.thread_manager.plugins_manager();
let plugin_read_request = PluginReadRequest {
plugin_name: plugin_name.clone(),
marketplace_path: marketplace_path.clone(),
};
let request = PluginInstallRequest {
plugin_name,
marketplace_path,
@@ -5742,6 +5758,32 @@ impl CodexMessageProcessor {
self.config.as_ref().clone()
}
};
self.clear_plugin_related_caches();
let plugin_mcp_server_names =
match plugins_manager.read_plugin_for_config(&config, &plugin_read_request) {
Ok(outcome) => outcome.plugin.mcp_server_names,
Err(err) => {
warn!(
plugin = result.plugin_id.as_key(),
"failed to read plugin MCP servers after install: {err:#}"
);
Vec::new()
}
};
if !plugin_mcp_server_names.is_empty() {
if let Err(err) = self.queue_mcp_server_refresh_for_config(&config).await {
warn!(
plugin = result.plugin_id.as_key(),
"failed to queue MCP refresh after plugin install: {err:?}"
);
}
self.start_plugin_mcp_oauth_logins(&config, plugin_mcp_server_names)
.await;
}
let plugin_apps = load_plugin_apps(result.installed_path.as_path());
let apps_needing_auth = if plugin_apps.is_empty()
|| !config.features.apps_enabled(Some(&self.auth_manager)).await
@@ -5802,7 +5844,6 @@ impl CodexMessageProcessor {
)
};
self.clear_plugin_related_caches();
self.outgoing
.send_response(
request_id,
@@ -5858,6 +5899,94 @@ impl CodexMessageProcessor {
}
}
async fn start_plugin_mcp_oauth_logins(
&self,
config: &Config,
plugin_mcp_server_names: Vec<String>,
) {
let configured_servers = self.thread_manager.mcp_manager().configured_servers(config);
for name in plugin_mcp_server_names {
let Some(server) = configured_servers.get(&name).cloned() else {
warn!(
mcp_server = name,
"plugin MCP server was not found after install"
);
continue;
};
let oauth_config = match oauth_login_support(&server.transport).await {
McpOAuthLoginSupport::Supported(config) => config,
McpOAuthLoginSupport::Unsupported => continue,
McpOAuthLoginSupport::Unknown(err) => {
warn!(
"MCP server may or may not require login for plugin install {name}: {err}"
);
continue;
}
};
let resolved_scopes = resolve_oauth_scopes(
/*explicit_scopes*/ None,
server.scopes.clone(),
oauth_config.discovered_scopes.clone(),
);
let store_mode = config.mcp_oauth_credentials_store_mode;
let callback_port = config.mcp_oauth_callback_port;
let callback_url = config.mcp_oauth_callback_url.clone();
let outgoing = Arc::clone(&self.outgoing);
let notification_name = name.clone();
tokio::spawn(async move {
let first_attempt = perform_oauth_login(
&name,
&oauth_config.url,
store_mode,
oauth_config.http_headers.clone(),
oauth_config.env_http_headers.clone(),
&resolved_scopes.scopes,
server.oauth_resource.as_deref(),
callback_port,
callback_url.as_deref(),
)
.await;
let final_result = match first_attempt {
Err(err) if should_retry_without_scopes(&resolved_scopes, &err) => {
perform_oauth_login(
&name,
&oauth_config.url,
store_mode,
oauth_config.http_headers,
oauth_config.env_http_headers,
&[],
server.oauth_resource.as_deref(),
callback_port,
callback_url.as_deref(),
)
.await
}
result => result,
};
let (success, error) = match final_result {
Ok(()) => (true, None),
Err(err) => (false, Some(err.to_string())),
};
let notification = ServerNotification::McpServerOauthLoginCompleted(
McpServerOauthLoginCompletedNotification {
name: notification_name,
success,
error,
},
);
outgoing.send_server_notification(notification).await;
});
}
}
async fn plugin_uninstall(
&self,
request_id: ConnectionRequestId,

View File

@@ -527,6 +527,76 @@ async fn plugin_install_filters_disallowed_apps_needing_auth() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn plugin_install_makes_bundled_mcp_servers_available_to_followup_requests() -> Result<()> {
let codex_home = TempDir::new()?;
std::fs::write(
codex_home.path().join("config.toml"),
"[features]\nplugins = true\n",
)?;
let repo_root = TempDir::new()?;
write_plugin_marketplace(
repo_root.path(),
"debug",
"sample-plugin",
"./sample-plugin",
None,
None,
)?;
write_plugin_source(repo_root.path(), "sample-plugin", &[])?;
std::fs::write(
repo_root.path().join("sample-plugin/.mcp.json"),
r#"{
"mcpServers": {
"sample-mcp": {
"command": "echo"
}
}
}"#,
)?;
let marketplace_path =
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path,
plugin_name: "sample-plugin".to_string(),
force_remote_sync: false,
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(response.apps_needing_auth, Vec::<AppSummary>::new());
let request_id = mcp
.send_raw_request(
"mcpServer/oauth/login",
Some(json!({
"name": "sample-mcp",
})),
)
.await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert_eq!(
err.error.message,
"OAuth login is only supported for streamable HTTP servers."
);
Ok(())
}
#[derive(Clone)]
struct AppsServerState {
response: Arc<StdMutex<serde_json::Value>>,