Compare commits

..

2 Commits

Author SHA1 Message Date
Richard Lee
2e54d30ad9 document exec-server observability proposal 2026-05-28 21:54:38 -07:00
xl-openai
f0a839ea0c Add runtime extra skill roots API (#24977)
## Summary
- Add v2 `skills/extraRoots/set` to replace app-server process-local
standalone skill roots. The setting is not persisted, accepts missing
roots, and `extraRoots: []` clears the runtime set.
- Wire runtime roots into core skill discovery for `skills/list` and
turn loads, clear skill caches on set, and register the roots with the
skills watcher so later filesystem changes emit `skills/changed`.
- Update app-server docs, generated JSON/TypeScript schemas, and
coverage for serialization, missing roots, empty clears, and restart
behavior.

## Testing
- `cargo test -p codex-app-server-protocol`
- `cargo test -p codex-core-skills`
- `cargo test -p codex-app-server
skills_extra_roots_set_updates_process_runtime_roots`
- `just fix -p codex-app-server-protocol`
- `just fix -p codex-core-skills`
- `just fix -p codex-app-server`
2026-05-28 21:14:34 -07:00
47 changed files with 1007 additions and 1805 deletions

View File

@@ -2988,6 +2988,20 @@
],
"type": "object"
},
"SkillsExtraRootsSetParams": {
"properties": {
"extraRoots": {
"items": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"type": "array"
}
},
"required": [
"extraRoots"
],
"type": "object"
},
"SkillsListParams": {
"properties": {
"cwds": {
@@ -4783,6 +4797,30 @@
"title": "Skills/listRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"skills/extraRoots/set"
],
"title": "Skills/extraRoots/setRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/SkillsExtraRootsSetParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Skills/extraRoots/setRequest",
"type": "object"
},
{
"properties": {
"id": {

View File

@@ -709,6 +709,30 @@
"title": "Skills/listRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
"skills/extraRoots/set"
],
"title": "Skills/extraRoots/setRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/SkillsExtraRootsSetParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Skills/extraRoots/setRequest",
"type": "object"
},
{
"properties": {
"id": {
@@ -15121,6 +15145,27 @@
"title": "SkillsConfigWriteResponse",
"type": "object"
},
"SkillsExtraRootsSetParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"extraRoots": {
"items": {
"$ref": "#/definitions/v2/AbsolutePathBuf"
},
"type": "array"
}
},
"required": [
"extraRoots"
],
"title": "SkillsExtraRootsSetParams",
"type": "object"
},
"SkillsExtraRootsSetResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "SkillsExtraRootsSetResponse",
"type": "object"
},
"SkillsListEntry": {
"properties": {
"cwd": {

View File

@@ -1457,6 +1457,30 @@
"title": "Skills/listRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"skills/extraRoots/set"
],
"title": "Skills/extraRoots/setRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/SkillsExtraRootsSetParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Skills/extraRoots/setRequest",
"type": "object"
},
{
"properties": {
"id": {
@@ -12945,6 +12969,27 @@
"title": "SkillsConfigWriteResponse",
"type": "object"
},
"SkillsExtraRootsSetParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"extraRoots": {
"items": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"type": "array"
}
},
"required": [
"extraRoots"
],
"title": "SkillsExtraRootsSetParams",
"type": "object"
},
"SkillsExtraRootsSetResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "SkillsExtraRootsSetResponse",
"type": "object"
},
"SkillsListEntry": {
"properties": {
"cwd": {

View File

@@ -0,0 +1,22 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"AbsolutePathBuf": {
"description": "A path that is guaranteed to be absolute and normalized (though it is not guaranteed to be canonicalized or exist on the filesystem).\n\nIMPORTANT: When deserializing an `AbsolutePathBuf`, a base path must be set using [AbsolutePathBufGuard::new]. If no base path is set, the deserialization will fail unless the path being deserialized is already absolute.",
"type": "string"
}
},
"properties": {
"extraRoots": {
"items": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"type": "array"
}
},
"required": [
"extraRoots"
],
"title": "SkillsExtraRootsSetParams",
"type": "object"
}

View File

@@ -0,0 +1,5 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "SkillsExtraRootsSetResponse",
"type": "object"
}

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { AbsolutePathBuf } from "../AbsolutePathBuf";
export type SkillsExtraRootsSetParams = { extraRoots: Array<AbsolutePathBuf>, };

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type SkillsExtraRootsSetResponse = Record<string, never>;

View File

@@ -344,6 +344,8 @@ export type { SkillToolDependency } from "./SkillToolDependency";
export type { SkillsChangedNotification } from "./SkillsChangedNotification";
export type { SkillsConfigWriteParams } from "./SkillsConfigWriteParams";
export type { SkillsConfigWriteResponse } from "./SkillsConfigWriteResponse";
export type { SkillsExtraRootsSetParams } from "./SkillsExtraRootsSetParams";
export type { SkillsExtraRootsSetResponse } from "./SkillsExtraRootsSetResponse";
export type { SkillsListEntry } from "./SkillsListEntry";
export type { SkillsListParams } from "./SkillsListParams";
export type { SkillsListResponse } from "./SkillsListResponse";

View File

@@ -610,6 +610,11 @@ client_request_definitions! {
serialization: global_shared_read("config"),
response: v2::SkillsListResponse,
},
SkillsExtraRootsSet => "skills/extraRoots/set" {
params: v2::SkillsExtraRootsSetParams,
serialization: global("config"),
response: v2::SkillsExtraRootsSetResponse,
},
HooksList => "hooks/list" {
params: v2::HooksListParams,
serialization: global("config"),
@@ -1721,6 +1726,17 @@ mod tests {
Some(ClientRequestSerializationScope::GlobalSharedRead("config"))
);
let skills_extra_roots_set = ClientRequest::SkillsExtraRootsSet {
request_id: request_id(),
params: v2::SkillsExtraRootsSetParams {
extra_roots: vec![absolute_path("/tmp/skills")],
},
};
assert_eq!(
skills_extra_roots_set.serialization_scope(),
Some(ClientRequestSerializationScope::Global("config"))
);
let plugin_list = ClientRequest::PluginList {
request_id: request_id(),
params: v2::PluginListParams {

View File

@@ -35,6 +35,18 @@ pub struct SkillsListResponse {
pub data: Vec<SkillsListEntry>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct SkillsExtraRootsSetParams {
pub extra_roots: Vec<AbsolutePathBuf>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct SkillsExtraRootsSetResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -2626,6 +2626,27 @@ fn skills_list_params_serialization_uses_force_reload() {
);
}
#[test]
fn skills_extra_roots_set_params_serialization_uses_extra_roots() {
assert_eq!(
serde_json::to_value(SkillsExtraRootsSetParams {
extra_roots: vec![absolute_path("tmp/skills")],
})
.unwrap(),
json!({
"extraRoots": [absolute_path_string("tmp/skills")],
}),
);
}
#[test]
fn skills_extra_roots_set_params_rejects_relative_roots() {
let result = serde_json::from_value::<SkillsExtraRootsSetParams>(json!({
"extraRoots": ["relative/path"],
}));
assert!(result.is_err());
}
#[test]
fn plugin_source_serializes_local_git_and_remote_variants() {
let local_path = if cfg!(windows) {

View File

@@ -196,6 +196,7 @@ Example with notification opt-out:
- `environment/add` — experimental; add or replace a named remote environment by `environmentId` and `execServerUrl` for later selection by `thread/start` or `turn/start`; returns `{}` and does not change the default environment.
- `collaborationMode/list` — list available collaboration mode presets (experimental, no pagination). Built-in presets do not select a model; the Plan preset selects medium reasoning effort. This response omits built-in developer instructions; clients should either pass `settings.developer_instructions: null` when setting a mode to use Codex's built-in instructions, or provide their own instructions explicitly.
- `skills/list` — list skills for one or more `cwd` values (optional `forceReload`).
- `skills/extraRoots/set` — replace the app-server process runtime extra standalone skill roots. The roots are not persisted; missing directories are accepted and simply load no skills.
- `hooks/list` — list discovered hooks for one or more `cwd` values.
- `marketplace/add` — add a remote plugin marketplace from an HTTP(S) Git URL, SSH Git URL, or GitHub `owner/repo` shorthand, then persist it into the user marketplace config. Returns the installed root path plus whether the marketplace was already present.
- `marketplace/remove` — remove a configured marketplace by name from the user marketplace config, and delete its installed marketplace root when one exists.
@@ -1506,6 +1507,7 @@ $skill-creator Add a new skill for triaging flaky CI and include step-by-step us
Use `skills/list` to fetch the available skills (optionally scoped by `cwds`, with `forceReload`).
`skills/list` might reuse a cached skills result per `cwd`; setting `forceReload` to `true` refreshes the result from disk.
The server also emits `skills/changed` notifications when watched local skill files change. Treat this as an invalidation signal and re-run `skills/list` with your current params when needed.
Use `skills/extraRoots/set` to replace additional standalone skill roots for the current app-server process. These roots use the same layout as other standalone skill roots: each root contains skill directories, and each skill directory contains `SKILL.md`. Missing roots are accepted and load no skills until they exist. This setting is lost when app-server exits.
```json
{ "method": "skills/list", "id": 25, "params": {
@@ -1542,12 +1544,23 @@ The server also emits `skills/changed` notifications when watched local skill fi
}
```
```json
{
"method": "skills/extraRoots/set",
"id": 26,
"params": {
"extraRoots": ["/Users/me/generated-skills"]
}
}
{ "id": 26, "result": {} }
```
To enable or disable a skill by absolute path:
```json
{
"method": "skills/config/write",
"id": 26,
"id": 27,
"params": {
"path": "/Users/alice/.codex/skills/skill-creator/SKILL.md",
"name": null,
@@ -1561,7 +1574,7 @@ To enable or disable a skill by name:
```json
{
"method": "skills/config/write",
"id": 27,
"id": 28,
"params": {
"path": null,
"name": "github:yeet",

View File

@@ -354,6 +354,8 @@ impl MessageProcessor {
app_list_shutdown_token,
);
let catalog_processor = CatalogRequestProcessor::new(
outgoing.clone(),
Arc::clone(&skills_watcher),
auth_manager.clone(),
Arc::clone(&thread_manager),
Arc::clone(&config),
@@ -1108,6 +1110,9 @@ impl MessageProcessor {
ClientRequest::SkillsList { params, .. } => {
self.catalog_processor.skills_list(params).await
}
ClientRequest::SkillsExtraRootsSet { params, .. } => {
self.catalog_processor.skills_extra_roots_set(params).await
}
ClientRequest::HooksList { params, .. } => {
self.catalog_processor.hooks_list(params).await
}

View File

@@ -156,6 +156,8 @@ use codex_app_server_protocol::ServerRequestResolvedNotification;
use codex_app_server_protocol::SkillSummary;
use codex_app_server_protocol::SkillsConfigWriteParams;
use codex_app_server_protocol::SkillsConfigWriteResponse;
use codex_app_server_protocol::SkillsExtraRootsSetParams;
use codex_app_server_protocol::SkillsExtraRootsSetResponse;
use codex_app_server_protocol::SkillsListParams;
use codex_app_server_protocol::SkillsListResponse;
use codex_app_server_protocol::SortDirection;
@@ -350,7 +352,7 @@ use codex_mcp::McpRuntimeContext;
use codex_mcp::McpServerStatusSnapshot;
use codex_mcp::McpSnapshotDetail;
use codex_mcp::collect_mcp_server_status_snapshot_with_detail;
use codex_mcp::discover_supported_scopes_with_runtime_context;
use codex_mcp::discover_supported_scopes;
use codex_mcp::read_mcp_resource as read_mcp_resource_without_thread;
use codex_mcp::resolve_oauth_scopes;
use codex_memories_write::clear_memory_roots_contents;
@@ -402,7 +404,7 @@ use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use codex_protocol::protocol::W3cTraceContext;
use codex_protocol::user_input::MAX_USER_INPUT_TEXT_CHARS;
use codex_protocol::user_input::UserInput as CoreInputItem;
use codex_rmcp_client::perform_oauth_login_return_url_with_http_client;
use codex_rmcp_client::perform_oauth_login_return_url;
use codex_rollout::EventPersistenceMode;
use codex_rollout::is_persisted_rollout_item;
use codex_rollout::state_db::StateDbHandle;

View File

@@ -4,6 +4,8 @@ use futures::StreamExt;
#[derive(Clone)]
pub(crate) struct CatalogRequestProcessor {
pub(super) outgoing: Arc<OutgoingMessageSender>,
pub(super) skills_watcher: Arc<SkillsWatcher>,
pub(super) auth_manager: Arc<AuthManager>,
pub(super) thread_manager: Arc<ThreadManager>,
pub(super) config: Arc<Config>,
@@ -96,6 +98,8 @@ fn errors_to_info(
impl CatalogRequestProcessor {
pub(crate) fn new(
outgoing: Arc<OutgoingMessageSender>,
skills_watcher: Arc<SkillsWatcher>,
auth_manager: Arc<AuthManager>,
thread_manager: Arc<ThreadManager>,
config: Arc<Config>,
@@ -103,6 +107,8 @@ impl CatalogRequestProcessor {
workspace_settings_cache: Arc<workspace_settings::WorkspaceSettingsCache>,
) -> Self {
Self {
outgoing,
skills_watcher,
auth_manager,
thread_manager,
config,
@@ -138,6 +144,15 @@ impl CatalogRequestProcessor {
.map(|response| Some(response.into()))
}
pub(crate) async fn skills_extra_roots_set(
&self,
params: SkillsExtraRootsSetParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
self.skills_extra_roots_set_response(params)
.await
.map(|response| Some(response.into()))
}
pub(crate) async fn model_list(
&self,
params: ModelListParams,
@@ -567,6 +582,24 @@ impl CatalogRequestProcessor {
Ok(SkillsListResponse { data })
}
async fn skills_extra_roots_set_response(
&self,
params: SkillsExtraRootsSetParams,
) -> Result<SkillsExtraRootsSetResponse, JSONRPCErrorError> {
let SkillsExtraRootsSetParams { extra_roots } = params;
self.skills_watcher
.register_runtime_extra_roots(&extra_roots);
self.thread_manager
.skills_manager()
.set_extra_roots(extra_roots);
self.outgoing
.send_server_notification(ServerNotification::SkillsChanged(
codex_app_server_protocol::SkillsChangedNotification {},
))
.await;
Ok(SkillsExtraRootsSetResponse {})
}
/// Handle `hooks/list` by resolving hooks for each requested cwd.
async fn hooks_list_response(
&self,

View File

@@ -130,12 +130,6 @@ impl McpRequestProcessor {
"No MCP server named '{name}' found."
)));
};
let environment_manager = self.thread_manager.environment_manager();
// This threadless login path has no turn cwd or turn-selected
// environment. Use config cwd only as the local stdio fallback; HTTP
// auth routing still follows each MCP server's configured environment.
let runtime_context =
McpRuntimeContext::new(Arc::clone(&environment_manager), config.cwd.to_path_buf());
let (url, http_headers, env_http_headers) = match &server.transport {
McpServerTransportConfig::StreamableHttp {
@@ -152,19 +146,14 @@ impl McpRequestProcessor {
};
let discovered_scopes = if scopes.is_none() && server.scopes.is_none() {
discover_supported_scopes_with_runtime_context(&name, server, &runtime_context).await
discover_supported_scopes(&server.transport).await
} else {
None
};
let resolved_scopes =
resolve_oauth_scopes(scopes, server.scopes.clone(), discovered_scopes);
let http_client = runtime_context
.resolve_streamable_http_client(&name, server)
.map_err(|err| {
internal_error(format!("failed to resolve MCP server '{name}': {err}"))
})?;
let handle = perform_oauth_login_return_url_with_http_client(
let handle = perform_oauth_login_return_url(
&name,
&url,
config.mcp_oauth_credentials_store_mode,
@@ -176,7 +165,6 @@ impl McpRequestProcessor {
timeout_secs,
config.mcp_oauth_callback_port,
config.mcp_oauth_callback_url.as_deref(),
http_client,
)
.await
.map_err(|err| internal_error(format!("failed to login to MCP server '{name}': {err}")))?;

View File

@@ -10,9 +10,9 @@ use codex_core_plugins::remote::RemotePluginScope;
use codex_core_plugins::remote::is_valid_remote_plugin_id;
use codex_core_plugins::remote::validate_remote_plugin_id;
use codex_mcp::McpOAuthLoginSupport;
use codex_mcp::oauth_login_support_with_runtime_context;
use codex_mcp::oauth_login_support;
use codex_mcp::should_retry_without_scopes;
use codex_rmcp_client::perform_oauth_login_silent_with_http_client;
use codex_rmcp_client::perform_oauth_login_silent;
#[derive(Clone)]
pub(crate) struct PluginRequestProcessor {
@@ -1565,18 +1565,8 @@ impl PluginRequestProcessor {
config: &Config,
plugin_mcp_servers: HashMap<String, McpServerConfig>,
) {
let runtime_context = McpRuntimeContext::new(
self.thread_manager.environment_manager(),
config.cwd.to_path_buf(),
);
for (name, server) in plugin_mcp_servers {
let oauth_config = match oauth_login_support_with_runtime_context(
&name,
&server,
&runtime_context,
)
.await
{
let oauth_config = match oauth_login_support(&server.transport).await {
McpOAuthLoginSupport::Supported(config) => config,
McpOAuthLoginSupport::Unsupported => continue,
McpOAuthLoginSupport::Unknown(err) => {
@@ -1596,19 +1586,12 @@ impl PluginRequestProcessor {
let store_mode = config.mcp_oauth_credentials_store_mode;
let callback_port = config.mcp_oauth_callback_port;
let callback_url = config.mcp_oauth_callback_url.clone();
let http_client = match runtime_context.resolve_streamable_http_client(&name, &server) {
Ok(http_client) => http_client,
Err(err) => {
warn!("failed to resolve MCP server {name} for plugin login: {err}");
continue;
}
};
let outgoing = Arc::clone(&self.outgoing);
let notification_name = name.clone();
tokio::spawn(async move {
let oauth_client_id = server.oauth_client_id();
let first_attempt = perform_oauth_login_silent_with_http_client(
let first_attempt = perform_oauth_login_silent(
&name,
&oauth_config.url,
store_mode,
@@ -1619,13 +1602,12 @@ impl PluginRequestProcessor {
server.oauth_resource.as_deref(),
callback_port,
callback_url.as_deref(),
Arc::clone(&http_client),
)
.await;
let final_result = match first_attempt {
Err(err) if should_retry_without_scopes(&resolved_scopes, &err) => {
perform_oauth_login_silent_with_http_client(
perform_oauth_login_silent(
&name,
&oauth_config.url,
store_mode,
@@ -1636,7 +1618,6 @@ impl PluginRequestProcessor {
server.oauth_resource.as_deref(),
callback_port,
callback_url.as_deref(),
http_client,
)
.await
}

View File

@@ -1,4 +1,5 @@
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use crate::outgoing_message::OutgoingMessageSender;
@@ -15,6 +16,7 @@ use codex_file_watcher::ThrottledWatchReceiver;
use codex_file_watcher::WatchPath;
use codex_file_watcher::WatchRegistration;
use codex_protocol::protocol::TurnEnvironmentSelection;
use codex_utils_absolute_path::AbsolutePathBuf;
use tokio_util::sync::CancellationToken;
use tokio_util::sync::DropGuard;
use tracing::warn;
@@ -26,6 +28,7 @@ const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_millis(50);
pub(crate) struct SkillsWatcher {
subscriber: FileWatcherSubscriber,
runtime_extra_roots_registration: Mutex<WatchRegistration>,
shutdown_token: CancellationToken,
_shutdown_drop_guard: DropGuard,
}
@@ -48,6 +51,7 @@ impl SkillsWatcher {
Self::spawn_event_loop(rx, skills_manager, outgoing, shutdown_token.child_token());
Arc::new(Self {
subscriber,
runtime_extra_roots_registration: Mutex::new(WatchRegistration::default()),
shutdown_token,
_shutdown_drop_guard: shutdown_drop_guard,
})
@@ -57,6 +61,22 @@ impl SkillsWatcher {
self.shutdown_token.cancel();
}
pub(crate) fn register_runtime_extra_roots(&self, extra_roots: &[AbsolutePathBuf]) {
let roots = extra_roots
.iter()
.map(|root| WatchPath {
path: root.clone().into_path_buf(),
recursive: true,
})
.collect();
let registration = self.subscriber.register_paths(roots);
let mut guard = self
.runtime_extra_roots_registration
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
*guard = registration;
}
pub(crate) async fn register_thread_config(
&self,
config: &Config,

View File

@@ -71,6 +71,7 @@ use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ReviewStartParams;
use codex_app_server_protocol::SendAddCreditsNudgeEmailParams;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::SkillsExtraRootsSetParams;
use codex_app_server_protocol::SkillsListParams;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadCompactStartParams;
@@ -675,6 +676,15 @@ impl McpProcess {
self.send_request("skills/list", params).await
}
/// Send a `skills/extraRoots/set` JSON-RPC request.
pub async fn send_skills_extra_roots_set_request(
&mut self,
params: SkillsExtraRootsSetParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("skills/extraRoots/set", params).await
}
/// Send a `hooks/list` JSON-RPC request.
pub async fn send_hooks_list_request(
&mut self,

View File

@@ -13,11 +13,14 @@ use codex_app_server_protocol::PluginListParams;
use codex_app_server_protocol::PluginListResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SkillsChangedNotification;
use codex_app_server_protocol::SkillsExtraRootsSetParams;
use codex_app_server_protocol::SkillsExtraRootsSetResponse;
use codex_app_server_protocol::SkillsListParams;
use codex_app_server_protocol::SkillsListResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_config::types::AuthCredentialsStoreMode;
use codex_exec_server::CODEX_EXEC_SERVER_URL_ENV_VAR;
use codex_utils_absolute_path::AbsolutePathBuf;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::timeout;
@@ -40,6 +43,23 @@ fn write_skill(root: &TempDir, name: &str) -> Result<()> {
Ok(())
}
async fn expect_skills_changed_notification(
mcp: &mut McpProcess,
timeout_duration: Duration,
) -> Result<()> {
let notification = timeout(
timeout_duration,
mcp.read_stream_until_notification_message("skills/changed"),
)
.await??;
let params = notification
.params
.context("skills/changed params must be present")?;
let notification: SkillsChangedNotification = serde_json::from_value(params)?;
assert_eq!(notification, SkillsChangedNotification {});
Ok(())
}
fn write_plugins_enabled_config_with_base_url(
codex_home: &std::path::Path,
base_url: &str,
@@ -573,6 +593,150 @@ async fn skills_list_uses_cached_result_until_force_reload() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn skills_extra_roots_set_updates_process_runtime_roots() -> Result<()> {
let codex_home = TempDir::new()?;
let cwd = TempDir::new()?;
let extra_root = TempDir::new()?;
let extra_skills_root = extra_root.path().join("skills");
let skill_dir = extra_skills_root.join("runtime-skill");
std::fs::create_dir_all(&skill_dir)?;
std::fs::write(
skill_dir.join("SKILL.md"),
"---\nname: runtime-skill\ndescription: runtime skill\n---\n\n# Body\n",
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let set_request_id = mcp
.send_skills_extra_roots_set_request(SkillsExtraRootsSetParams {
extra_roots: vec![AbsolutePathBuf::from_absolute_path(&extra_skills_root)?],
})
.await?;
let set_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(set_request_id)),
)
.await??;
let _: SkillsExtraRootsSetResponse = to_response(set_response)?;
expect_skills_changed_notification(&mut mcp, DEFAULT_TIMEOUT).await?;
let skills_request_id = mcp
.send_skills_list_request(SkillsListParams {
cwds: vec![cwd.path().to_path_buf()],
force_reload: false,
})
.await?;
let skills_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(skills_request_id)),
)
.await??;
let SkillsListResponse { data } = to_response(skills_response)?;
assert_eq!(data.len(), 1);
assert_eq!(data[0].errors, Vec::new());
assert!(
data[0]
.skills
.iter()
.any(|skill| skill.name == "runtime-skill")
);
let missing_root = extra_root.path().join("missing-skills");
let reset_request_id = mcp
.send_skills_extra_roots_set_request(SkillsExtraRootsSetParams {
extra_roots: vec![AbsolutePathBuf::from_absolute_path(&missing_root)?],
})
.await?;
let reset_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(reset_request_id)),
)
.await??;
let _: SkillsExtraRootsSetResponse = to_response(reset_response)?;
expect_skills_changed_notification(&mut mcp, DEFAULT_TIMEOUT).await?;
let skills_request_id = mcp
.send_skills_list_request(SkillsListParams {
cwds: vec![cwd.path().to_path_buf()],
force_reload: false,
})
.await?;
let skills_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(skills_request_id)),
)
.await??;
let SkillsListResponse { data } = to_response(skills_response)?;
assert_eq!(data.len(), 1);
assert_eq!(data[0].errors, Vec::new());
assert!(
data[0]
.skills
.iter()
.all(|skill| skill.name != "runtime-skill")
);
let clear_request_id = mcp
.send_skills_extra_roots_set_request(SkillsExtraRootsSetParams {
extra_roots: Vec::new(),
})
.await?;
let clear_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(clear_request_id)),
)
.await??;
let _: SkillsExtraRootsSetResponse = to_response(clear_response)?;
expect_skills_changed_notification(&mut mcp, DEFAULT_TIMEOUT).await?;
let skills_request_id = mcp
.send_skills_list_request(SkillsListParams {
cwds: vec![cwd.path().to_path_buf()],
force_reload: false,
})
.await?;
let skills_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(skills_request_id)),
)
.await??;
let SkillsListResponse { data } = to_response(skills_response)?;
assert_eq!(data.len(), 1);
assert_eq!(data[0].errors, Vec::new());
assert!(
data[0]
.skills
.iter()
.all(|skill| skill.name != "runtime-skill")
);
drop(mcp);
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let skills_request_id = mcp
.send_skills_list_request(SkillsListParams {
cwds: vec![cwd.path().to_path_buf()],
force_reload: false,
})
.await?;
let skills_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(skills_request_id)),
)
.await??;
let SkillsListResponse { data } = to_response(skills_response)?;
assert_eq!(data.len(), 1);
assert_eq!(data[0].errors, Vec::new());
assert!(
data[0]
.skills
.iter()
.all(|skill| skill.name != "runtime-skill")
);
Ok(())
}
#[tokio::test]
async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;

View File

@@ -42,11 +42,8 @@ pub use mcp::McpOAuthLoginSupport;
pub use mcp::McpOAuthScopesSource;
pub use mcp::ResolvedMcpOAuthScopes;
pub use mcp::compute_auth_statuses;
pub use mcp::compute_auth_statuses_with_runtime_context;
pub use mcp::discover_supported_scopes;
pub use mcp::discover_supported_scopes_with_runtime_context;
pub use mcp::oauth_login_support;
pub use mcp::oauth_login_support_with_runtime_context;
pub use mcp::resolve_oauth_scopes;
pub use mcp::should_retry_without_scopes;

View File

@@ -1,21 +1,17 @@
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Result;
use codex_config::McpServerConfig;
use codex_config::McpServerTransportConfig;
use codex_config::types::OAuthCredentialsStoreMode;
use codex_exec_server::HttpClient;
use codex_exec_server::ReqwestHttpClient;
use codex_login::CodexAuth;
use codex_protocol::protocol::McpAuthStatus;
use codex_rmcp_client::OAuthProviderError;
use codex_rmcp_client::determine_streamable_http_auth_status;
use codex_rmcp_client::discover_streamable_http_oauth_with_http_client;
use codex_rmcp_client::discover_streamable_http_oauth;
use futures::future::join_all;
use tracing::warn;
use crate::runtime::McpRuntimeContext;
use crate::server::EffectiveMcpServer;
use super::CODEX_APPS_MCP_SERVER_NAME;
@@ -56,25 +52,6 @@ pub struct McpAuthStatusEntry {
}
pub async fn oauth_login_support(transport: &McpServerTransportConfig) -> McpOAuthLoginSupport {
oauth_login_support_with_http_client(transport, Arc::new(ReqwestHttpClient)).await
}
pub async fn oauth_login_support_with_runtime_context(
server_name: &str,
config: &McpServerConfig,
runtime_context: &McpRuntimeContext,
) -> McpOAuthLoginSupport {
let http_client = match runtime_context.resolve_streamable_http_client(server_name, config) {
Ok(http_client) => http_client,
Err(err) => return McpOAuthLoginSupport::Unknown(anyhow::anyhow!(err)),
};
oauth_login_support_with_http_client(&config.transport, http_client).await
}
async fn oauth_login_support_with_http_client(
transport: &McpServerTransportConfig,
http_client: Arc<dyn HttpClient>,
) -> McpOAuthLoginSupport {
let McpServerTransportConfig::StreamableHttp {
url,
bearer_token_env_var,
@@ -89,13 +66,7 @@ async fn oauth_login_support_with_http_client(
return McpOAuthLoginSupport::Unsupported;
}
match discover_streamable_http_oauth_with_http_client(
url,
http_headers.clone(),
env_http_headers.clone(),
http_client,
)
.await
match discover_streamable_http_oauth(url, http_headers.clone(), env_http_headers.clone()).await
{
Ok(Some(discovery)) => McpOAuthLoginSupport::Supported(McpOAuthLoginConfig {
url: url.clone(),
@@ -117,17 +88,6 @@ pub async fn discover_supported_scopes(
}
}
pub async fn discover_supported_scopes_with_runtime_context(
server_name: &str,
config: &McpServerConfig,
runtime_context: &McpRuntimeContext,
) -> Option<Vec<String>> {
match oauth_login_support_with_runtime_context(server_name, config, runtime_context).await {
McpOAuthLoginSupport::Supported(config) => config.discovered_scopes,
McpOAuthLoginSupport::Unsupported | McpOAuthLoginSupport::Unknown(_) => None,
}
}
pub fn resolve_oauth_scopes(
explicit_scopes: Option<Vec<String>>,
configured_scopes: Option<Vec<String>>,
@@ -174,45 +134,10 @@ pub async fn compute_auth_statuses<'a, I>(
) -> HashMap<String, McpAuthStatusEntry>
where
I: IntoIterator<Item = (&'a String, &'a EffectiveMcpServer)>,
{
compute_auth_statuses_with_http_client_resolver(servers, store_mode, auth, |_, _| {
Ok(Arc::new(ReqwestHttpClient) as Arc<dyn HttpClient>)
})
.await
}
pub async fn compute_auth_statuses_with_runtime_context<'a, I>(
servers: I,
store_mode: OAuthCredentialsStoreMode,
auth: Option<&CodexAuth>,
runtime_context: McpRuntimeContext,
) -> HashMap<String, McpAuthStatusEntry>
where
I: IntoIterator<Item = (&'a String, &'a EffectiveMcpServer)>,
{
compute_auth_statuses_with_http_client_resolver(
servers,
store_mode,
auth,
move |name, config| runtime_context.resolve_streamable_http_client(name, config),
)
.await
}
async fn compute_auth_statuses_with_http_client_resolver<'a, I, F>(
servers: I,
store_mode: OAuthCredentialsStoreMode,
auth: Option<&CodexAuth>,
resolve_http_client: F,
) -> HashMap<String, McpAuthStatusEntry>
where
I: IntoIterator<Item = (&'a String, &'a EffectiveMcpServer)>,
F: Fn(&str, &McpServerConfig) -> Result<Arc<dyn HttpClient>, String> + Clone,
{
let futures = servers.into_iter().map(|(name, server)| {
let name = name.clone();
let config = server.configured_config().cloned();
let resolve_http_client = resolve_http_client.clone();
let has_runtime_auth = name == CODEX_APPS_MCP_SERVER_NAME
&& auth.is_some_and(CodexAuth::uses_codex_backend)
&& config.as_ref().is_some_and(|config| {
@@ -227,15 +152,7 @@ where
async move {
let auth_status = match config.as_ref() {
Some(config) => {
match compute_auth_status(
&name,
config,
store_mode,
has_runtime_auth,
&resolve_http_client,
)
.await
{
match compute_auth_status(&name, config, store_mode, has_runtime_auth).await {
Ok(status) => status,
Err(error) => {
warn!(
@@ -263,7 +180,6 @@ async fn compute_auth_status(
config: &McpServerConfig,
store_mode: OAuthCredentialsStoreMode,
has_runtime_auth: bool,
resolve_http_client: &impl Fn(&str, &McpServerConfig) -> Result<Arc<dyn HttpClient>, String>,
) -> Result<McpAuthStatus> {
if !config.enabled {
return Ok(McpAuthStatus::Unsupported);
@@ -281,8 +197,6 @@ async fn compute_auth_status(
http_headers,
env_http_headers,
} => {
let http_client =
resolve_http_client(server_name, config).map_err(anyhow::Error::msg)?;
determine_streamable_http_auth_status(
server_name,
url,
@@ -290,7 +204,6 @@ async fn compute_auth_status(
http_headers.clone(),
env_http_headers.clone(),
store_mode,
http_client,
)
.await
}

View File

@@ -4,11 +4,8 @@ pub use auth::McpOAuthLoginSupport;
pub use auth::McpOAuthScopesSource;
pub use auth::ResolvedMcpOAuthScopes;
pub use auth::compute_auth_statuses;
pub use auth::compute_auth_statuses_with_runtime_context;
pub use auth::discover_supported_scopes;
pub use auth::discover_supported_scopes_with_runtime_context;
pub use auth::oauth_login_support;
pub use auth::oauth_login_support_with_runtime_context;
pub use auth::resolve_oauth_scopes;
pub use auth::should_retry_without_scopes;
@@ -275,11 +272,10 @@ pub async fn read_mcp_resource(
let mut mcp_servers = effective_mcp_servers(config, auth);
let host_owned_codex_apps_enabled = host_owned_codex_apps_enabled(config, auth);
mcp_servers.retain(|name, _| name == server);
let auth_statuses = compute_auth_statuses_with_runtime_context(
let auth_statuses = compute_auth_statuses(
mcp_servers.iter(),
config.mcp_oauth_credentials_store_mode,
auth,
runtime_context.clone(),
)
.await;
let (tx_event, rx_event) = unbounded();
@@ -342,11 +338,10 @@ pub async fn collect_mcp_server_status_snapshot_with_detail(
};
}
let auth_status_entries = compute_auth_statuses_with_runtime_context(
let auth_status_entries = compute_auth_statuses(
mcp_servers.iter(),
config.mcp_oauth_credentials_store_mode,
auth,
runtime_context.clone(),
)
.await;

View File

@@ -46,6 +46,8 @@ use codex_async_utils::OrCancelExt;
use codex_config::McpServerConfig;
use codex_config::McpServerTransportConfig;
use codex_config::types::OAuthCredentialsStoreMode;
use codex_exec_server::HttpClient;
use codex_exec_server::ReqwestHttpClient;
use codex_protocol::mcp::McpServerInfo;
use codex_protocol::protocol::Event;
use codex_rmcp_client::ExecutorStdioServerLauncher;
@@ -584,7 +586,6 @@ async fn make_rmcp_client(
let resolved_environment = runtime_context
.resolve_server_environment(server_name, &config)
.map_err(|err| StartupOutcomeError::from(anyhow!(err)))?;
let http_config = config.clone();
let is_local_environment = config.is_local_environment();
let McpServerConfig { transport, .. } = config;
@@ -631,9 +632,10 @@ async fn make_rmcp_client(
env_http_headers,
bearer_token_env_var,
} => {
let http_client = runtime_context
.resolve_streamable_http_client(server_name, &http_config)
.map_err(|err| StartupOutcomeError::from(anyhow!(err)))?;
let http_client = resolved_environment.as_ref().map_or_else(
|| Arc::new(ReqwestHttpClient) as Arc<dyn HttpClient>,
|environment| environment.get_http_client(),
);
let resolved_bearer_token =
match resolve_bearer_token(server_name, bearer_token_env_var.as_deref()) {
Ok(token) => token,

View File

@@ -11,8 +11,6 @@ use std::time::Duration;
use codex_exec_server::Environment;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::HttpClient;
use codex_exec_server::ReqwestHttpClient;
use codex_protocol::models::PermissionProfile;
use codex_protocol::protocol::SandboxPolicy;
@@ -89,18 +87,6 @@ impl McpRuntimeContext {
config.environment_id
))
}
pub fn resolve_streamable_http_client(
&self,
server_name: &str,
config: &codex_config::McpServerConfig,
) -> Result<Arc<dyn HttpClient>, String> {
let resolved_environment = self.resolve_server_environment(server_name, config)?;
Ok(resolved_environment.map_or_else(
|| Arc::new(ReqwestHttpClient) as Arc<dyn HttpClient>,
|environment| environment.get_http_client(),
))
}
}
fn ensure_remote_stdio_cwd(

View File

@@ -235,6 +235,7 @@ pub(crate) async fn skill_roots(
config_layer_stack: &ConfigLayerStack,
cwd: &AbsolutePathBuf,
plugin_skill_roots: Vec<PluginSkillRoot>,
extra_skill_roots: Vec<AbsolutePathBuf>,
) -> Vec<SkillRoot> {
let home_dir =
home_dir().and_then(|path| AbsolutePathBuf::from_absolute_path_checked(path).ok());
@@ -244,6 +245,7 @@ pub(crate) async fn skill_roots(
cwd,
home_dir.as_ref(),
plugin_skill_roots,
extra_skill_roots,
)
.await
}
@@ -254,6 +256,7 @@ async fn skill_roots_with_home_dir(
cwd: &AbsolutePathBuf,
home_dir: Option<&AbsolutePathBuf>,
plugin_skill_roots: Vec<PluginSkillRoot>,
extra_skill_roots: Vec<AbsolutePathBuf>,
) -> Vec<SkillRoot> {
let mut roots = skill_roots_from_layer_stack_inner(config_layer_stack, home_dir, fs.clone());
roots.extend(plugin_skill_roots.into_iter().map(|root| SkillRoot {
@@ -263,6 +266,13 @@ async fn skill_roots_with_home_dir(
plugin_id: Some(root.plugin_id),
plugin_root: Some(root.plugin_root),
}));
roots.extend(extra_skill_roots.into_iter().map(|path| SkillRoot {
path,
scope: SkillScope::User,
file_system: Arc::clone(&LOCAL_FS),
plugin_id: None,
plugin_root: None,
}));
roots.extend(repo_agents_skill_roots(fs, config_layer_stack, cwd).await);
dedupe_skill_roots_by_path(&mut roots);
roots
@@ -1051,7 +1061,15 @@ pub(crate) async fn skill_roots_from_layer_stack(
cwd: &AbsolutePathBuf,
home_dir: Option<&AbsolutePathBuf>,
) -> Vec<SkillRoot> {
skill_roots_with_home_dir(Some(fs), config_layer_stack, cwd, home_dir, Vec::new()).await
skill_roots_with_home_dir(
Some(fs),
config_layer_stack,
cwd,
home_dir,
Vec::new(),
Vec::new(),
)
.await
}
#[cfg(test)]

View File

@@ -1900,6 +1900,7 @@ async fn skill_roots_include_admin_with_lowest_priority() {
&cfg.config_layer_stack,
&cfg.cwd,
Vec::new(),
Vec::new(),
)
.await
.into_iter()

View File

@@ -51,6 +51,7 @@ impl SkillsLoadInput {
pub struct SkillsManager {
codex_home: AbsolutePathBuf,
restriction_product: Option<Product>,
extra_roots: RwLock<Vec<AbsolutePathBuf>>,
cache_by_cwd: RwLock<HashMap<AbsolutePathBuf, SkillLoadOutcome>>,
cache_by_config: RwLock<HashMap<ConfigSkillsCacheKey, SkillLoadOutcome>>,
}
@@ -68,6 +69,7 @@ impl SkillsManager {
let manager = Self {
codex_home,
restriction_product,
extra_roots: RwLock::new(Vec::new()),
cache_by_cwd: RwLock::new(HashMap::new()),
cache_by_config: RwLock::new(HashMap::new()),
};
@@ -81,6 +83,17 @@ impl SkillsManager {
manager
}
pub fn set_extra_roots(&self, extra_roots: Vec<AbsolutePathBuf>) {
{
let mut roots = self
.extra_roots
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner);
*roots = extra_roots;
}
self.clear_cache();
}
/// Load skills for an already-constructed [`Config`], avoiding any additional config-layer
/// loading.
///
@@ -118,6 +131,7 @@ impl SkillsManager {
&input.config_layer_stack,
&input.cwd,
input.effective_skill_roots.clone(),
self.extra_roots(),
)
.await;
if !input.bundled_skills_enabled {
@@ -145,6 +159,7 @@ impl SkillsManager {
&input.config_layer_stack,
&input.cwd,
input.effective_skill_roots.clone(),
self.extra_roots(),
)
.await;
if !bundled_skills_enabled_from_stack(&input.config_layer_stack) {
@@ -214,6 +229,13 @@ impl SkillsManager {
Err(err) => err.into_inner().get(cache_key).cloned(),
}
}
fn extra_roots(&self) -> Vec<AbsolutePathBuf> {
match self.extra_roots.read() {
Ok(roots) => roots.clone(),
Err(err) => err.into_inner().clone(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]

View File

@@ -221,6 +221,128 @@ async fn skills_for_config_reuses_cache_for_same_effective_config() {
assert_eq!(outcome2.skills, outcome1.skills);
}
#[tokio::test]
async fn set_extra_roots_replaces_runtime_roots_and_clears_cache() {
let codex_home = tempfile::tempdir().expect("tempdir");
let cwd = tempfile::tempdir().expect("tempdir");
let extra_root = tempfile::tempdir().expect("tempdir");
let config_layer_stack = config_stack(&codex_home, "");
let skills_manager = SkillsManager::new(
codex_home.path().abs(),
/*bundled_skills_enabled*/ true,
);
let skills_input = SkillsLoadInput::new(
cwd.path().abs(),
Vec::new(),
config_layer_stack.clone(),
bundled_skills_enabled_from_stack(&config_layer_stack),
);
let empty_outcome = skills_manager
.skills_for_cwd(
&skills_input,
/*force_reload*/ false,
Some(Arc::clone(&LOCAL_FS)),
)
.await;
assert!(
empty_outcome
.skills
.iter()
.all(|skill| skill.name != "runtime-skill")
);
let extra_skills_root = extra_root.path().join("skills");
let skill_dir = extra_skills_root.join("runtime-skill");
fs::create_dir_all(&skill_dir).expect("create skill dir");
fs::write(
skill_dir.join("SKILL.md"),
"---\nname: runtime-skill\ndescription: runtime skill\n---\n\n# Body\n",
)
.expect("write skill");
skills_manager.set_extra_roots(vec![extra_skills_root.abs()]);
let runtime_outcome = skills_manager
.skills_for_cwd(
&skills_input,
/*force_reload*/ false,
Some(Arc::clone(&LOCAL_FS)),
)
.await;
assert!(
runtime_outcome
.skills
.iter()
.any(|skill| skill.name == "runtime-skill")
);
skills_manager.set_extra_roots(vec![extra_root.path().join("missing-skills").abs()]);
let replaced_outcome = skills_manager
.skills_for_cwd(
&skills_input,
/*force_reload*/ false,
Some(Arc::clone(&LOCAL_FS)),
)
.await;
assert_eq!(replaced_outcome.errors, Vec::new());
assert!(
replaced_outcome
.skills
.iter()
.all(|skill| skill.name != "runtime-skill")
);
}
#[tokio::test]
async fn set_extra_roots_applies_to_config_loads_and_empty_clears() {
let codex_home = tempfile::tempdir().expect("tempdir");
let cwd = tempfile::tempdir().expect("tempdir");
let extra_root = tempfile::tempdir().expect("tempdir");
let config_layer_stack = config_stack(&codex_home, "");
let skills_manager = SkillsManager::new(
codex_home.path().abs(),
/*bundled_skills_enabled*/ true,
);
let empty_outcome =
skills_for_config_with_stack(&skills_manager, &cwd, &config_layer_stack, &[]).await;
assert!(
empty_outcome
.skills
.iter()
.all(|skill| skill.name != "runtime-skill")
);
let extra_skills_root = extra_root.path().join("skills");
let skill_dir = extra_skills_root.join("runtime-skill");
fs::create_dir_all(&skill_dir).expect("create skill dir");
fs::write(
skill_dir.join("SKILL.md"),
"---\nname: runtime-skill\ndescription: runtime skill\n---\n\n# Body\n",
)
.expect("write skill");
skills_manager.set_extra_roots(vec![extra_skills_root.abs()]);
let runtime_outcome =
skills_for_config_with_stack(&skills_manager, &cwd, &config_layer_stack, &[]).await;
assert!(
runtime_outcome
.skills
.iter()
.any(|skill| skill.name == "runtime-skill")
);
skills_manager.set_extra_roots(Vec::new());
let cleared_outcome =
skills_for_config_with_stack(&skills_manager, &cwd, &config_layer_stack, &[]).await;
assert!(
cleared_outcome
.skills
.iter()
.all(|skill| skill.name != "runtime-skill")
);
}
#[tokio::test]
async fn skills_for_config_disables_plugin_skills_by_name() {
let codex_home = tempfile::tempdir().expect("tempdir");

View File

@@ -39,7 +39,7 @@ use codex_mcp::McpRuntimeContext;
use codex_mcp::ToolInfo;
use codex_mcp::ToolPluginProvenance;
use codex_mcp::codex_apps_tools_cache_key;
use codex_mcp::compute_auth_statuses_with_runtime_context;
use codex_mcp::compute_auth_statuses;
use codex_mcp::host_owned_codex_apps_enabled;
use codex_mcp::with_codex_apps_mcp;
@@ -252,11 +252,10 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_environment_manager(
});
}
let auth_status_entries = compute_auth_statuses_with_runtime_context(
let auth_status_entries = compute_auth_statuses(
mcp_servers.iter(),
config.mcp_oauth_credentials_store_mode,
auth.as_ref(),
McpRuntimeContext::new(Arc::clone(&environment_manager), config.cwd.to_path_buf()),
)
.await;

View File

@@ -11,7 +11,7 @@ use codex_protocol::request_user_input::RequestUserInputArgs;
use codex_protocol::request_user_input::RequestUserInputQuestion;
use codex_protocol::request_user_input::RequestUserInputQuestionOption;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_rmcp_client::perform_oauth_login_with_http_client;
use codex_rmcp_client::perform_oauth_login;
use tokio_util::sync::CancellationToken;
use tracing::warn;
@@ -23,7 +23,7 @@ use codex_mcp::ElicitationReviewerHandle;
use codex_mcp::McpOAuthLoginSupport;
use codex_mcp::McpPermissionPromptAutoApproveContext;
use codex_mcp::mcp_permission_prompt_is_auto_approved;
use codex_mcp::oauth_login_support_with_runtime_context;
use codex_mcp::oauth_login_support;
use codex_mcp::resolve_oauth_scopes;
use codex_mcp::should_retry_without_scopes;
@@ -136,46 +136,23 @@ pub(crate) async fn maybe_install_mcp_dependencies(
return;
}
let runtime_context = match turn_context.environments.primary() {
Some(turn_environment) => codex_mcp::McpRuntimeContext::new(
std::sync::Arc::clone(&sess.services.environment_manager),
turn_environment.cwd.to_path_buf(),
),
None => codex_mcp::McpRuntimeContext::new(
std::sync::Arc::clone(&sess.services.environment_manager),
#[allow(deprecated)]
turn_context.cwd.to_path_buf(),
),
};
for (name, server_config) in added {
let oauth_config =
match oauth_login_support_with_runtime_context(&name, &server_config, &runtime_context)
.await
{
McpOAuthLoginSupport::Supported(config) => config,
McpOAuthLoginSupport::Unsupported => continue,
McpOAuthLoginSupport::Unknown(err) => {
warn!("MCP server may or may not require login for dependency {name}: {err}");
continue;
}
};
let oauth_config = match oauth_login_support(&server_config.transport).await {
McpOAuthLoginSupport::Supported(config) => config,
McpOAuthLoginSupport::Unsupported => continue,
McpOAuthLoginSupport::Unknown(err) => {
warn!("MCP server may or may not require login for dependency {name}: {err}");
continue;
}
};
let resolved_scopes = resolve_oauth_scopes(
/*explicit_scopes*/ None,
server_config.scopes.clone(),
oauth_config.discovered_scopes.clone(),
);
let http_client =
match runtime_context.resolve_streamable_http_client(&name, &server_config) {
Ok(http_client) => http_client,
Err(err) => {
warn!("failed to resolve MCP server {name} for dependency login: {err}");
continue;
}
};
let oauth_client_id = server_config.oauth_client_id();
let first_attempt = perform_oauth_login_with_http_client(
let first_attempt = perform_oauth_login(
&name,
&oauth_config.url,
config.mcp_oauth_credentials_store_mode,
@@ -186,13 +163,12 @@ pub(crate) async fn maybe_install_mcp_dependencies(
server_config.oauth_resource.as_deref(),
config.mcp_oauth_callback_port,
config.mcp_oauth_callback_url.as_deref(),
std::sync::Arc::clone(&http_client),
)
.await;
if let Err(err) = first_attempt {
if should_retry_without_scopes(&resolved_scopes, &err) {
if let Err(err) = perform_oauth_login_with_http_client(
if let Err(err) = perform_oauth_login(
&name,
&oauth_config.url,
config.mcp_oauth_credentials_store_mode,
@@ -203,7 +179,6 @@ pub(crate) async fn maybe_install_mcp_dependencies(
server_config.oauth_resource.as_deref(),
config.mcp_oauth_callback_port,
config.mcp_oauth_callback_url.as_deref(),
http_client,
)
.await
{

View File

@@ -323,6 +323,8 @@ impl Session {
effective_mcp_servers_from_configured(mcp_servers, &mcp_config, auth.as_ref());
let host_owned_codex_apps_enabled =
host_owned_codex_apps_enabled(&mcp_config, auth.as_ref());
let auth_statuses =
compute_auth_statuses(mcp_servers.iter(), store_mode, auth.as_ref()).await;
let mcp_runtime_context = match turn_context.environments.primary() {
Some(turn_environment) => McpRuntimeContext::new(
Arc::clone(&self.services.environment_manager),
@@ -334,13 +336,6 @@ impl Session {
turn_context.cwd.to_path_buf(),
),
};
let auth_statuses = compute_auth_statuses_with_runtime_context(
mcp_servers.iter(),
store_mode,
auth.as_ref(),
mcp_runtime_context.clone(),
)
.await;
{
let mut guard = self.services.mcp_startup_cancellation_token.lock().await;
guard.cancel();

View File

@@ -315,7 +315,7 @@ use crate::unified_exec::UnifiedExecProcessManager;
use crate::windows_sandbox::WindowsSandboxLevelExt;
use codex_core_plugins::PluginsManager;
use codex_git_utils::get_git_repo_root;
use codex_mcp::compute_auth_statuses_with_runtime_context;
use codex_mcp::compute_auth_statuses;
use codex_mcp::effective_mcp_servers_from_configured;
use codex_mcp::host_owned_codex_apps_enabled;
use codex_otel::SessionTelemetry;

View File

@@ -621,21 +621,15 @@ impl Session {
let auth_manager_clone = Arc::clone(&auth_manager);
let config_for_mcp = Arc::clone(&config);
let mcp_manager_for_mcp = Arc::clone(&mcp_manager);
let environment_manager_for_mcp = Arc::clone(&environment_manager);
let auth_and_mcp_fut = async move {
let auth = auth_manager_clone.auth().await;
let mcp_servers = mcp_manager_for_mcp
.effective_servers(&config_for_mcp, auth.as_ref())
.await;
let mcp_runtime_context = McpRuntimeContext::new(
environment_manager_for_mcp,
config_for_mcp.cwd.to_path_buf(),
);
let auth_statuses = compute_auth_statuses_with_runtime_context(
let auth_statuses = compute_auth_statuses(
mcp_servers.iter(),
config_for_mcp.mcp_oauth_credentials_store_mode,
auth.as_ref(),
mcp_runtime_context,
)
.await;
(auth, mcp_servers, auth_statuses)

View File

@@ -0,0 +1,200 @@
# Exec-server observability proposal
Status: Proposed
## Motivation
`codex exec-server` runs inside execution environments where the surrounding
services can be observed, but the work performed by the exec-server process is
largely opaque. It accepts connections, attaches resumable sessions, spawns
and manages commands, serves filesystem operations, proxies HTTP requests, and
in remote mode maintains a relay connection. Failures in any of these paths
currently have little context beyond the caller-visible error.
The useful Kubernetes analogy is a node agent such as the kubelet: exec-server
is not the workload itself, but a long-running component that receives control
requests and operates workloads within its environment. Kubernetes components
provide component logs, configurable metrics and traces, and health endpoints
without treating workload output as component telemetry. Exec-server should
follow the same boundary.
## Goals
- Make standalone local and remote exec-server instances observable through the
existing Codex OpenTelemetry and `tracing` infrastructure.
- Provide enough lifecycle context to distinguish connectivity, protocol,
session, subprocess, filesystem, and HTTP proxy failures.
- Keep stdout reserved for transport behavior and emit component logs only to
stderr or configured exporters.
- Avoid recording user content, secrets, or unbounded-cardinality data as
telemetry.
- Require explicit operator opt-in before exporting metrics from a
containerized exec-server instance.
## Non-goals
- Capturing child-process stdout or stderr as application telemetry.
- Logging commands, environment variables, filesystem contents, HTTP bodies,
authorization data, or full URLs.
- Adding a separate telemetry backend or exec-server-specific configuration
format.
- Changing the JSON-RPC or relay protocols for the initial increment.
## Current state
The exec-server crate already emits a small number of `tracing` events for
transport, session, and error conditions. The websocket listener also exposes
`/readyz`. However, the standalone `codex exec-server` CLI path does not
install a tracing subscriber or construct an OTEL provider, so those events
are not useful in the container deployment that most needs them.
Other Codex long-running entrypoints already load `Config`, construct an OTEL
provider through `codex_core::otel_init::build_provider`, attach logger and
tracing layers to `tracing_subscriber`, and record process-start metrics when
metrics are enabled. Exec-server should reuse that path rather than create
telemetry machinery in the protocol and execution crate.
## Signal model
The following Kubernetes component concepts map cleanly to exec-server:
| Kubernetes concept | Exec-server equivalent |
| --- | --- |
| Component logs | Structured events about server, connection, session, relay, and managed-operation lifecycle |
| Component metrics | Bounded counters, gauges, and duration histograms for transport and managed operations |
| Component traces | Spans from an inbound RPC through a process, filesystem, HTTP, or relay action |
| `/readyz` and `/livez` | Whether the service can accept work and whether the process runtime is alive |
| Workload logs | Command stdout/stderr, which remain protocol output and are not component telemetry |
All signals must use low-cardinality operation names and outcomes. Session IDs
and process IDs may be useful for log or trace correlation after a privacy
review, but must not become metric labels. Commands, paths, body content, auth
material, environment values, and full URLs are excluded from emitted data.
## Initial increment: opt-in OTEL bootstrap
The first change should expose the tracing already present in exec-server and
establish the exporter path needed for later instrumentation.
Implementation ownership remains in `codex-rs/cli/src/main.rs`, where
`codex exec-server` is launched and where `codex-core` configuration and OTEL
bootstrap dependencies already exist:
1. Load Codex configuration for every `exec-server` invocation, including
local listener mode. Remote mode reuses the same loaded config for
authentication.
2. Build an OTEL provider with service name `codex-exec-server` and
`default_analytics_enabled = false`.
3. Attach a stderr formatting layer filtered through `RUST_LOG`, plus the
configured OTEL logger and tracing layers, before entering local serving or
remote relay operation.
4. Invoke the existing process-start metric helper. It records only when the
operator has explicitly enabled metrics export.
5. Retain the provider for the server process lifetime so exporters can flush
on shutdown.
This increment intentionally does not install SQLite telemetry: unlike
app-server and MCP server, standalone exec-server does not initialize the
Codex state database.
### Configuration behavior
The existing configuration surface applies without new CLI flags:
| Signal | Enablement |
| --- | --- |
| Local stderr logs | Controlled by `RUST_LOG`; written to stderr only |
| Exported logs | Configure `otel.exporter` |
| Exported traces | Configure `otel.trace_exporter` |
| Exported metrics | Set `analytics.enabled = true`; optionally override `otel.metrics_exporter` |
When `analytics.enabled` is absent, exec-server does not export metrics. This
differs intentionally from interactive entrypoints that can default analytics
on: exec-server may run as a long-lived container component, so telemetry
export is an operator deployment choice. If analytics is enabled without a
metrics exporter override, existing configuration defaults apply.
### Transport invariants
- Local websocket mode continues to write the bound `ws://` startup URL to
stdout for existing launch scripts.
- Stdio mode continues to reserve stdout for JSON-RPC frames.
- Subscriber output is always sent to stderr or OTEL exporters.
- Local startup now loads configuration before opening its listener; malformed
strict configuration continues to fail before serving work.
## Follow-on increments
Once the exporter path is deployed and its operational value is verified,
later changes can add signals at existing component boundaries.
### Lifecycle events and spans
Emit bounded structured events and spans for:
- process startup and shutdown;
- local connections and remote relay registration, connection, disconnection,
and retry;
- session attach, resume, and eviction;
- RPC completion by method family and outcome;
- managed process start, exit, and termination;
- filesystem and HTTP proxy operation completion by operation family and
outcome.
These events should identify operation type and status without including
payload content. Trace-context propagation over the exec-server protocol can
be considered separately after local spans are useful.
### Metrics
Add OTEL instruments only with fixed label domains, such as transport,
operation family, result, and HTTP status class:
| Candidate metric | Instrument | Labels |
| --- | --- | --- |
| `exec_server.connections.active` | gauge | `transport` |
| `exec_server.connections.total` | counter | `transport`, `result` |
| `exec_server.requests.total` | counter | `operation`, `result` |
| `exec_server.request.duration` | histogram | `operation`, `result` |
| `exec_server.processes.active` | gauge | none |
| `exec_server.process.duration` | histogram | `result` |
| `exec_server.relay.reconnects` | counter | `reason` |
Metric labels must never include session IDs, process IDs, commands, paths,
hostnames, or URLs.
### Health
Websocket mode already returns success from `/readyz`. A later increment
should separate:
- `/livez`: the server runtime is responsive and has not begun shutdown.
- `/readyz`: the server can accept useful work.
For remote environment mode, readiness should reflect an active registration
and relay connection. Transient downstream failures should affect readiness
rather than liveness, so container restarts do not amplify backend outages.
## Validation and rollout
The initial implementation should test that:
- local websocket startup still writes its endpoint on stdout while an enabled
`RUST_LOG` filter makes exec-server component logs visible on stderr;
- configured log and trace exporters can be constructed under the
`codex-exec-server` service name;
- metrics are absent by default and present only when analytics is explicitly
enabled;
- strict configuration errors still prevent listener startup.
Deployment can start by enabling stderr collection for container instances,
then opting selected environments into OTEL log or trace export. Subsequent
signal additions should use the redaction and cardinality rules above and be
validated against real incident queries before adding further volume.
## References
- [Kubernetes logging architecture](https://kubernetes.io/docs/concepts/cluster-administration/logging/)
- [Kubernetes system component metrics](https://kubernetes.io/docs/concepts/cluster-administration/system-metrics/)
- [Kubernetes system component traces](https://kubernetes.io/docs/concepts/cluster-administration/system-traces/)
- [Kubernetes API health endpoints](https://kubernetes.io/docs/reference/using-api/health-checks/)

View File

@@ -20,8 +20,6 @@ pub(crate) mod response_body_stream;
#[path = "rpc_http_client.rs"]
mod rpc_http_client;
pub use reqwest_http_client::HTTP_REQUEST_NO_PROXY_HEADER;
pub use reqwest_http_client::HTTP_REQUEST_NO_REDIRECTS_HEADER;
pub(crate) use reqwest_http_client::PendingReqwestHttpBodyStream;
pub use reqwest_http_client::ReqwestHttpClient;
pub(crate) use reqwest_http_client::ReqwestHttpRequestRunner;

View File

@@ -35,9 +35,6 @@ use crate::rpc::invalid_params;
#[derive(Clone, Default)]
pub struct ReqwestHttpClient;
pub const HTTP_REQUEST_NO_PROXY_HEADER: &str = "x-codex-internal-http-no-proxy";
pub const HTTP_REQUEST_NO_REDIRECTS_HEADER: &str = "x-codex-internal-http-no-redirects";
/// Streaming response state held between the initial HTTP response and
/// downstream body-delta forwarding.
pub(crate) struct PendingReqwestHttpBodyStream {
@@ -52,22 +49,13 @@ pub(crate) struct ReqwestHttpRequestRunner {
}
impl ReqwestHttpClient {
fn build_client(
timeout_ms: Option<u64>,
request_policy: HttpRequestPolicy,
) -> Result<reqwest::Client, ExecServerError> {
let mut builder = match timeout_ms {
fn build_client(timeout_ms: Option<u64>) -> Result<reqwest::Client, ExecServerError> {
let builder = match timeout_ms {
None => reqwest::Client::builder(),
Some(timeout_ms) => {
reqwest::Client::builder().timeout(Duration::from_millis(timeout_ms))
}
};
if request_policy.no_proxy {
builder = builder.no_proxy();
}
if request_policy.no_redirects {
builder = builder.redirect(reqwest::redirect::Policy::none());
}
build_reqwest_client_with_custom_ca(builder)
.map_err(|error| ExecServerError::HttpRequest(error.to_string()))
}
@@ -79,7 +67,7 @@ impl HttpClient for ReqwestHttpClient {
params: HttpRequestParams,
) -> BoxFuture<'_, Result<HttpRequestResponse, ExecServerError>> {
async move {
let runner = ReqwestHttpRequestRunner::new(&params)
let runner = ReqwestHttpRequestRunner::new(params.timeout_ms)
.map_err(|error| ExecServerError::HttpRequest(error.message))?;
let (response, _) = runner
.run(HttpRequestParams {
@@ -98,7 +86,7 @@ impl HttpClient for ReqwestHttpClient {
params: HttpRequestParams,
) -> BoxFuture<'_, Result<(HttpRequestResponse, HttpResponseBodyStream), ExecServerError>> {
async move {
let runner = ReqwestHttpRequestRunner::new(&params)
let runner = ReqwestHttpRequestRunner::new(params.timeout_ms)
.map_err(|error| ExecServerError::HttpRequest(error.message))?;
let (response, pending_stream) = runner
.run(HttpRequestParams {
@@ -122,12 +110,9 @@ impl HttpClient for ReqwestHttpClient {
}
impl ReqwestHttpRequestRunner {
pub(crate) fn new(params: &HttpRequestParams) -> Result<Self, JSONRPCErrorError> {
let client = ReqwestHttpClient::build_client(
params.timeout_ms,
HttpRequestPolicy::from_headers(&params.headers),
)
.map_err(|error| internal_error(error.to_string()))?;
pub(crate) fn new(timeout_ms: Option<u64>) -> Result<Self, JSONRPCErrorError> {
let client = ReqwestHttpClient::build_client(timeout_ms)
.map_err(|error| internal_error(error.to_string()))?;
Ok(Self { client })
}
@@ -254,9 +239,6 @@ impl ReqwestHttpRequestRunner {
fn build_headers(headers: Vec<HttpHeader>) -> Result<HeaderMap, JSONRPCErrorError> {
let mut header_map = HeaderMap::new();
for header in headers {
if is_internal_http_request_header(&header.name) {
continue;
}
let name = HeaderName::from_bytes(header.name.as_bytes()).map_err(|error| {
invalid_params(format!("http/request header name is invalid: {error}"))
})?;
@@ -283,72 +265,3 @@ impl ReqwestHttpRequestRunner {
.collect()
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
struct HttpRequestPolicy {
no_proxy: bool,
no_redirects: bool,
}
impl HttpRequestPolicy {
fn from_headers(headers: &[HttpHeader]) -> Self {
Self {
no_proxy: has_internal_http_request_header(headers, HTTP_REQUEST_NO_PROXY_HEADER),
no_redirects: has_internal_http_request_header(
headers,
HTTP_REQUEST_NO_REDIRECTS_HEADER,
),
}
}
}
fn has_internal_http_request_header(headers: &[HttpHeader], expected_name: &str) -> bool {
headers
.iter()
.any(|header| header.name.eq_ignore_ascii_case(expected_name))
}
fn is_internal_http_request_header(header_name: &str) -> bool {
header_name.eq_ignore_ascii_case(HTTP_REQUEST_NO_PROXY_HEADER)
|| header_name.eq_ignore_ascii_case(HTTP_REQUEST_NO_REDIRECTS_HEADER)
}
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use super::*;
#[test]
fn request_policy_reads_internal_headers_without_forwarding_them() {
let headers = vec![
HttpHeader {
name: HTTP_REQUEST_NO_PROXY_HEADER.to_string(),
value: "true".to_string(),
},
HttpHeader {
name: HTTP_REQUEST_NO_REDIRECTS_HEADER.to_string(),
value: "true".to_string(),
},
HttpHeader {
name: "accept".to_string(),
value: "application/json".to_string(),
},
];
assert_eq!(
HttpRequestPolicy::from_headers(&headers),
HttpRequestPolicy {
no_proxy: true,
no_redirects: true,
}
);
assert_eq!(
ReqwestHttpRequestRunner::build_headers(headers)
.expect("headers should build")
.get("accept")
.expect("accept header should be forwarded"),
"application/json"
);
}
}

View File

@@ -94,22 +94,6 @@ impl EnvironmentManager {
Self::from_snapshot(provider.snapshot().await?, local_runtime_paths)
}
/// Builds a manager from `CODEX_HOME` while omitting the synthetic local
/// environment from the loaded snapshot.
///
/// This preserves configured remote environments for callers that need
/// environment-aware HTTP routing but do not have local runtime paths
/// available to construct the local execution environment.
pub async fn from_codex_home_without_local(
codex_home: impl AsRef<std::path::Path>,
) -> Result<Self, ExecServerError> {
let provider = environment_provider_from_codex_home(codex_home.as_ref())?;
Self::from_snapshot(
without_local_environment(provider.snapshot().await?),
/*local_runtime_paths*/ None,
)
}
/// Builds a manager from the legacy environment-variable provider without
/// reading user config files from `CODEX_HOME`.
pub async fn from_env(
@@ -294,16 +278,6 @@ impl EnvironmentManager {
}
}
fn without_local_environment(
mut snapshot: EnvironmentProviderSnapshot,
) -> EnvironmentProviderSnapshot {
snapshot.include_local = false;
if snapshot.default == EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string()) {
snapshot.default = EnvironmentDefault::Disabled;
}
snapshot
}
/// Concrete execution/filesystem environment selected for a session.
///
/// This bundles the selected backend metadata together with the local runtime
@@ -459,7 +433,6 @@ mod tests {
use super::EnvironmentManager;
use super::LOCAL_ENVIRONMENT_ID;
use super::REMOTE_ENVIRONMENT_ID;
use super::without_local_environment;
use crate::ExecServerRuntimePaths;
use crate::ProcessId;
use crate::environment_provider::EnvironmentDefault;
@@ -755,16 +728,16 @@ mod tests {
#[tokio::test]
async fn environment_manager_snapshot_without_local_environment_disables_local_default() {
let snapshot = EnvironmentProviderSnapshot {
let mut snapshot = EnvironmentProviderSnapshot {
environments: Vec::new(),
default: EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string()),
include_local: true,
};
let manager = EnvironmentManager::from_snapshot(
without_local_environment(snapshot),
/*local_runtime_paths*/ None,
)
.expect("environment manager");
snapshot.include_local = false;
snapshot.default = EnvironmentDefault::Disabled;
let manager =
EnvironmentManager::from_snapshot(snapshot, /*local_runtime_paths*/ None)
.expect("environment manager");
assert!(manager.default_environment().is_none());
assert_eq!(manager.default_environment_id(), None);
@@ -772,37 +745,6 @@ mod tests {
assert_local_environment_unavailable(&manager);
}
#[tokio::test]
async fn environment_manager_snapshot_without_local_environment_keeps_remote_default() {
let snapshot = EnvironmentProviderSnapshot {
environments: vec![(
REMOTE_ENVIRONMENT_ID.to_string(),
Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string()))
.expect("remote environment"),
)],
default: EnvironmentDefault::EnvironmentId(REMOTE_ENVIRONMENT_ID.to_string()),
include_local: true,
};
let manager = EnvironmentManager::from_snapshot(
without_local_environment(snapshot),
/*local_runtime_paths*/ None,
)
.expect("environment manager");
assert_eq!(
manager.default_environment_id(),
Some(REMOTE_ENVIRONMENT_ID)
);
assert!(
manager
.default_environment()
.expect("remote default environment")
.is_remote()
);
assert!(manager.get_environment(LOCAL_ENVIRONMENT_ID).is_none());
assert_local_environment_unavailable(&manager);
}
#[tokio::test]
async fn get_environment_returns_none_for_unknown_id() {
let manager = EnvironmentManager::default_for_tests();

View File

@@ -25,8 +25,6 @@ mod server;
pub use client::ExecServerClient;
pub use client::ExecServerError;
pub use client::http_client::HTTP_REQUEST_NO_PROXY_HEADER;
pub use client::http_client::HTTP_REQUEST_NO_REDIRECTS_HEADER;
pub use client::http_client::HttpResponseBodyStream;
pub use client::http_client::ReqwestHttpClient;
pub use client_api::ExecServerClientConnectOptions;

View File

@@ -178,7 +178,9 @@ impl ExecServerHandler {
if stream_response {
self.reserve_http_body_stream(&http_request_id).await?;
}
let response = ReqwestHttpRequestRunner::new(&params)?.run(params).await;
let response = ReqwestHttpRequestRunner::new(params.timeout_ms)?
.run(params)
.await;
if response.is_err() && stream_response {
self.release_http_body_stream(&http_request_id).await;
}

View File

@@ -1,23 +1,20 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Error;
use anyhow::Result;
use codex_exec_server::HttpClient;
use codex_exec_server::HttpRequestParams;
use codex_exec_server::ReqwestHttpClient;
use codex_protocol::protocol::McpAuthStatus;
use reqwest::Client;
use reqwest::StatusCode;
use reqwest::Url;
use reqwest::header::AUTHORIZATION;
use reqwest::header::HeaderMap;
use reqwest::header::WWW_AUTHENTICATE;
use serde::Deserialize;
use tracing::debug;
use crate::oauth::has_oauth_tokens;
use crate::utils::apply_default_headers;
use crate::utils::build_default_headers;
use crate::utils::oauth_discovery_headers;
use codex_config::types::OAuthCredentialsStoreMode;
const DISCOVERY_TIMEOUT: Duration = Duration::from_secs(5);
@@ -29,14 +26,6 @@ pub struct StreamableHttpOAuthDiscovery {
pub scopes_supported: Option<Vec<String>>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct StreamableHttpOAuthMetadata {
pub(crate) authorization_endpoint: String,
pub(crate) token_endpoint: String,
pub(crate) registration_endpoint: Option<String>,
pub(crate) scopes_supported: Option<Vec<String>>,
}
/// Determine the authentication status for a streamable HTTP MCP server.
pub async fn determine_streamable_http_auth_status(
server_name: &str,
@@ -45,7 +34,6 @@ pub async fn determine_streamable_http_auth_status(
http_headers: Option<HashMap<String, String>>,
env_http_headers: Option<HashMap<String, String>>,
store_mode: OAuthCredentialsStoreMode,
http_client: Arc<dyn HttpClient>,
) -> Result<McpAuthStatus> {
if bearer_token_env_var.is_some() {
return Ok(McpAuthStatus::BearerToken);
@@ -60,7 +48,7 @@ pub async fn determine_streamable_http_auth_status(
return Ok(McpAuthStatus::OAuth);
}
match discover_streamable_http_oauth_with_headers(url, &default_headers, http_client).await {
match discover_streamable_http_oauth_with_headers(url, &default_headers).await {
Ok(Some(_)) => Ok(McpAuthStatus::NotLoggedIn),
Ok(None) => Ok(McpAuthStatus::Unsupported),
Err(error) => {
@@ -74,18 +62,8 @@ pub async fn determine_streamable_http_auth_status(
/// Attempt to determine whether a streamable HTTP MCP server advertises OAuth login.
pub async fn supports_oauth_login(url: &str) -> Result<bool> {
supports_oauth_login_with_http_client(url, Arc::new(ReqwestHttpClient)).await
}
pub async fn supports_oauth_login_with_http_client(
url: &str,
http_client: Arc<dyn HttpClient>,
) -> Result<bool> {
Ok(discover_streamable_http_oauth_with_http_client(
url,
/*http_headers*/ None,
/*env_http_headers*/ None,
http_client,
Ok(discover_streamable_http_oauth(
url, /*http_headers*/ None, /*env_http_headers*/ None,
)
.await?
.is_some())
@@ -96,54 +74,63 @@ pub async fn discover_streamable_http_oauth(
http_headers: Option<HashMap<String, String>>,
env_http_headers: Option<HashMap<String, String>>,
) -> Result<Option<StreamableHttpOAuthDiscovery>> {
discover_streamable_http_oauth_with_http_client(
url,
http_headers,
env_http_headers,
Arc::new(ReqwestHttpClient),
)
.await
}
pub async fn discover_streamable_http_oauth_with_http_client(
url: &str,
http_headers: Option<HashMap<String, String>>,
env_http_headers: Option<HashMap<String, String>>,
http_client: Arc<dyn HttpClient>,
) -> Result<Option<StreamableHttpOAuthDiscovery>> {
Ok(
discover_streamable_http_oauth_metadata(url, http_headers, env_http_headers, http_client)
.await?
.map(|metadata| StreamableHttpOAuthDiscovery {
scopes_supported: metadata.scopes_supported,
}),
)
}
pub(crate) async fn discover_streamable_http_oauth_metadata(
url: &str,
http_headers: Option<HashMap<String, String>>,
env_http_headers: Option<HashMap<String, String>>,
http_client: Arc<dyn HttpClient>,
) -> Result<Option<StreamableHttpOAuthMetadata>> {
let default_headers = build_default_headers(http_headers, env_http_headers)?;
discover_streamable_http_oauth_with_headers(url, &default_headers, http_client).await
discover_streamable_http_oauth_with_headers(url, &default_headers).await
}
async fn discover_streamable_http_oauth_with_headers(
url: &str,
default_headers: &HeaderMap,
http_client: Arc<dyn HttpClient>,
) -> Result<Option<StreamableHttpOAuthMetadata>> {
) -> Result<Option<StreamableHttpOAuthDiscovery>> {
let base_url = Url::parse(url)?;
let request_headers = oauth_request_headers(default_headers)?;
if let Some(metadata) =
try_discover_oauth_server(&base_url, &request_headers, Arc::clone(&http_client)).await?
{
return Ok(Some(metadata));
// Use no_proxy to avoid a bug in the system-configuration crate that
// can result in a panic. See #8912.
let builder = Client::builder().timeout(DISCOVERY_TIMEOUT).no_proxy();
let client = apply_default_headers(builder, default_headers).build()?;
let mut last_error: Option<Error> = None;
for candidate_path in discovery_paths(base_url.path()) {
let mut discovery_url = base_url.clone();
discovery_url.set_path(&candidate_path);
let response = match client
.get(discovery_url.clone())
.header(OAUTH_DISCOVERY_HEADER, OAUTH_DISCOVERY_VERSION)
.send()
.await
{
Ok(response) => response,
Err(err) => {
last_error = Some(err.into());
continue;
}
};
if response.status() != StatusCode::OK {
continue;
}
let metadata = match response.json::<OAuthDiscoveryMetadata>().await {
Ok(metadata) => metadata,
Err(err) => {
last_error = Some(err.into());
continue;
}
};
if metadata.authorization_endpoint.is_some() && metadata.token_endpoint.is_some() {
return Ok(Some(StreamableHttpOAuthDiscovery {
scopes_supported: normalize_scopes(metadata.scopes_supported),
}));
}
}
discover_oauth_server_via_resource_metadata(&base_url, &request_headers, http_client).await
if let Some(err) = last_error {
debug!("OAuth discovery requests failed for {url}: {err:?}");
}
Ok(None)
}
#[derive(Debug, Deserialize)]
@@ -153,19 +140,9 @@ struct OAuthDiscoveryMetadata {
#[serde(default)]
token_endpoint: Option<String>,
#[serde(default)]
registration_endpoint: Option<String>,
#[serde(default)]
scopes_supported: Option<Vec<String>>,
}
#[derive(Debug, Deserialize)]
struct ResourceServerMetadata {
#[serde(default)]
authorization_server: Option<String>,
#[serde(default)]
authorization_servers: Option<Vec<String>>,
}
fn normalize_scopes(scopes_supported: Option<Vec<String>>) -> Option<Vec<String>> {
let scopes_supported = scopes_supported?;
@@ -188,326 +165,30 @@ fn normalize_scopes(scopes_supported: Option<Vec<String>>) -> Option<Vec<String>
}
}
fn oauth_request_headers(
default_headers: &HeaderMap,
) -> Result<Vec<codex_exec_server::HttpHeader>> {
let mut request_headers = default_headers.clone();
request_headers.insert(OAUTH_DISCOVERY_HEADER, OAUTH_DISCOVERY_VERSION.parse()?);
Ok(oauth_discovery_headers(&request_headers))
}
async fn try_discover_oauth_server(
base_url: &Url,
request_headers: &[codex_exec_server::HttpHeader],
http_client: Arc<dyn HttpClient>,
) -> Result<Option<StreamableHttpOAuthMetadata>> {
for discovery_url in discovery_urls(base_url) {
if let Some(metadata) =
fetch_authorization_metadata(&discovery_url, request_headers, Arc::clone(&http_client))
.await?
{
return Ok(Some(metadata));
}
}
Ok(None)
}
async fn discover_oauth_server_via_resource_metadata(
base_url: &Url,
request_headers: &[codex_exec_server::HttpHeader],
http_client: Arc<dyn HttpClient>,
) -> Result<Option<StreamableHttpOAuthMetadata>> {
let Some(resource_metadata_url) =
discover_resource_metadata_url(base_url, request_headers, Arc::clone(&http_client)).await?
else {
return Ok(None);
};
let Some(resource_metadata) = fetch_resource_metadata_from_url(
&resource_metadata_url,
request_headers,
Arc::clone(&http_client),
)
.await?
else {
return Ok(None);
};
let candidates = resource_metadata.authorization_server.into_iter().chain(
resource_metadata
.authorization_servers
.into_iter()
.flatten(),
);
for candidate in candidates {
let candidate = candidate.trim();
if candidate.is_empty() {
continue;
}
let candidate_url = match Url::parse(candidate) {
Ok(url) => url,
Err(_) => match resource_metadata_url.join(candidate) {
Ok(url) => url,
Err(err) => {
debug!("failed to resolve authorization server URL `{candidate}`: {err}");
continue;
}
},
};
if candidate_url.path().contains("/.well-known/") {
if let Some(metadata) = fetch_authorization_metadata(
&candidate_url,
request_headers,
Arc::clone(&http_client),
)
.await?
{
return Ok(Some(metadata));
}
continue;
}
if let Some(metadata) =
try_discover_oauth_server(&candidate_url, request_headers, Arc::clone(&http_client))
.await?
{
return Ok(Some(metadata));
}
}
Ok(None)
}
async fn discover_resource_metadata_url(
base_url: &Url,
request_headers: &[codex_exec_server::HttpHeader],
http_client: Arc<dyn HttpClient>,
) -> Result<Option<Url>> {
if let Some(resource_metadata_url) = fetch_resource_metadata_url(
base_url,
base_url,
request_headers,
Arc::clone(&http_client),
)
.await?
{
return Ok(Some(resource_metadata_url));
}
for candidate_path in well_known_paths(base_url.path(), "oauth-protected-resource") {
let mut discovery_url = base_url.clone();
discovery_url.set_query(None);
discovery_url.set_fragment(None);
discovery_url.set_path(&candidate_path);
if let Some(resource_metadata_url) = fetch_resource_metadata_url(
&discovery_url,
base_url,
request_headers,
Arc::clone(&http_client),
)
.await?
{
return Ok(Some(resource_metadata_url));
}
}
Ok(None)
}
async fn fetch_resource_metadata_url(
url: &Url,
base_url: &Url,
request_headers: &[codex_exec_server::HttpHeader],
http_client: Arc<dyn HttpClient>,
) -> Result<Option<Url>> {
let response = match oauth_get(url, request_headers, http_client).await {
Ok(response) => response,
Err(err) => {
debug!("resource metadata probe failed for {url}: {err:?}");
return Ok(None);
}
};
if response.status == StatusCode::OK.as_u16() {
return Ok(Some(url.clone()));
}
if response.status != StatusCode::UNAUTHORIZED.as_u16() {
return Ok(None);
}
Ok(response
.headers
.iter()
.filter(|header| header.name.eq_ignore_ascii_case(WWW_AUTHENTICATE.as_str()))
.find_map(|header| extract_resource_metadata_url_from_header(&header.value, base_url)))
}
async fn fetch_resource_metadata_from_url(
resource_metadata_url: &Url,
request_headers: &[codex_exec_server::HttpHeader],
http_client: Arc<dyn HttpClient>,
) -> Result<Option<ResourceServerMetadata>> {
let response = match oauth_get(resource_metadata_url, request_headers, http_client).await {
Ok(response) => response,
Err(err) => {
debug!("resource metadata request failed for {resource_metadata_url}: {err:?}");
return Ok(None);
}
};
if response.status != StatusCode::OK.as_u16() {
return Ok(None);
}
Ok(Some(serde_json::from_slice(&response.body.into_inner())?))
}
async fn fetch_authorization_metadata(
discovery_url: &Url,
request_headers: &[codex_exec_server::HttpHeader],
http_client: Arc<dyn HttpClient>,
) -> Result<Option<StreamableHttpOAuthMetadata>> {
let response = match oauth_get(discovery_url, request_headers, http_client).await {
Ok(response) => response,
Err(err) => {
debug!("OAuth discovery request failed for {discovery_url}: {err:?}");
return Ok(None);
}
};
if response.status != StatusCode::OK.as_u16() {
return Ok(None);
}
let metadata =
match serde_json::from_slice::<OAuthDiscoveryMetadata>(&response.body.into_inner()) {
Ok(metadata) => metadata,
Err(err) => {
debug!("failed to parse OAuth metadata for {discovery_url}: {err}");
return Ok(None);
}
};
let (Some(authorization_endpoint), Some(token_endpoint)) =
(metadata.authorization_endpoint, metadata.token_endpoint)
else {
return Ok(None);
};
Ok(Some(StreamableHttpOAuthMetadata {
authorization_endpoint,
token_endpoint,
registration_endpoint: metadata.registration_endpoint,
scopes_supported: normalize_scopes(metadata.scopes_supported),
}))
}
async fn oauth_get(
url: &Url,
request_headers: &[codex_exec_server::HttpHeader],
http_client: Arc<dyn HttpClient>,
) -> Result<codex_exec_server::HttpRequestResponse> {
http_client
.http_request(HttpRequestParams {
method: "GET".to_string(),
url: url.to_string(),
headers: request_headers.to_vec(),
body: None,
timeout_ms: Some(DISCOVERY_TIMEOUT.as_millis() as u64),
request_id: "oauth-discovery".to_string(),
stream_response: false,
})
.await
.map_err(Into::into)
}
/// Implements MCP authorization server metadata discovery priority.
fn discovery_urls(base_url: &Url) -> Vec<Url> {
discovery_paths(base_url.path())
.into_iter()
.map(|candidate_path| {
let mut discovery_url = base_url.clone();
discovery_url.set_query(None);
discovery_url.set_fragment(None);
discovery_url.set_path(&candidate_path);
discovery_url
})
.collect()
}
/// Implements RFC 8414 section 3.1 for discovering well-known oauth endpoints.
/// This is a requirement for MCP servers to support OAuth.
/// https://datatracker.ietf.org/doc/html/rfc8414#section-3.1
/// https://github.com/modelcontextprotocol/rust-sdk/blob/main/crates/rmcp/src/transport/auth.rs#L182
fn discovery_paths(base_path: &str) -> Vec<String> {
let trimmed = base_path.trim_start_matches('/').trim_end_matches('/');
if trimmed.is_empty() {
return vec![
"/.well-known/oauth-authorization-server".to_string(),
"/.well-known/openid-configuration".to_string(),
];
}
vec![
format!("/.well-known/oauth-authorization-server/{trimmed}"),
format!("/.well-known/openid-configuration/{trimmed}"),
format!("/{trimmed}/.well-known/openid-configuration"),
"/.well-known/oauth-authorization-server".to_string(),
]
}
let canonical = "/.well-known/oauth-authorization-server".to_string();
fn well_known_paths(base_path: &str, suffix: &str) -> Vec<String> {
let trimmed = base_path.trim_start_matches('/').trim_end_matches('/');
let canonical = format!("/.well-known/{suffix}");
if trimmed.is_empty() {
return vec![canonical];
}
vec![
format!("{canonical}/{trimmed}"),
format!("/{trimmed}/.well-known/{suffix}"),
canonical,
]
}
fn extract_resource_metadata_url_from_header(header: &str, base_url: &Url) -> Option<Url> {
let header_lowercase = header.to_ascii_lowercase();
let fragment_key = "resource_metadata=";
let mut search_offset = 0;
while let Some(position) = header_lowercase[search_offset..].find(fragment_key) {
let value_offset = search_offset + position + fragment_key.len();
let value_slice = &header[value_offset..];
if let Some((value, consumed)) = parse_next_header_value(value_slice) {
if let Ok(url) = Url::parse(&value) {
return Some(url);
}
if let Ok(url) = base_url.join(&value) {
return Some(url);
}
search_offset = value_offset + consumed;
continue;
let mut candidates = Vec::new();
let mut push_unique = |candidate: String| {
if !candidates.contains(&candidate) {
candidates.push(candidate);
}
break;
}
None
}
};
fn parse_next_header_value(header_fragment: &str) -> Option<(String, usize)> {
let trimmed = header_fragment.trim_start();
let leading_ws = header_fragment.len() - trimmed.len();
if let Some(stripped) = trimmed.strip_prefix('"') {
let mut escaped = false;
let mut result = String::new();
for (index, character) in stripped.char_indices() {
if escaped {
result.push(character);
escaped = false;
continue;
}
match character {
'\\' => escaped = true,
'"' => return Some((result, leading_ws + index + 2)),
_ => result.push(character),
}
}
return None;
}
push_unique(format!("{canonical}/{trimmed}"));
push_unique(format!("/{trimmed}/.well-known/oauth-authorization-server"));
push_unique(canonical);
let end = trimmed
.find(|character: char| character == ',' || character == ';' || character.is_whitespace())
.unwrap_or(trimmed.len());
Some((trimmed[..end].to_string(), leading_ws + end))
candidates
}
#[cfg(test)]
@@ -516,118 +197,12 @@ mod tests {
use axum::Json;
use axum::Router;
use axum::routing::get;
use codex_exec_server::ExecServerError;
use codex_exec_server::HTTP_REQUEST_NO_PROXY_HEADER;
use codex_exec_server::HttpClient;
use codex_exec_server::HttpRequestParams;
use codex_exec_server::HttpRequestResponse;
use codex_exec_server::HttpResponseBodyStream;
use codex_exec_server::ReqwestHttpClient;
use futures::FutureExt;
use futures::future::BoxFuture;
use pretty_assertions::assert_eq;
use serial_test::serial;
use std::collections::HashMap;
use std::ffi::OsString;
use tokio::task::JoinHandle;
#[derive(Default)]
struct ScriptedHttpClient {
requests: std::sync::Mutex<Vec<HttpRequestParams>>,
}
impl ScriptedHttpClient {
fn requests(&self) -> Vec<HttpRequestParams> {
self.requests.lock().expect("lock requests").clone()
}
}
impl HttpClient for ScriptedHttpClient {
fn http_request(
&self,
params: HttpRequestParams,
) -> BoxFuture<'_, Result<HttpRequestResponse, ExecServerError>> {
self.requests
.lock()
.expect("lock requests")
.push(params.clone());
async move {
let response = match params.url.as_str() {
"http://remote-only.invalid/.well-known/oauth-authorization-server/mcp" => {
HttpRequestResponse {
status: 404,
headers: Vec::new(),
body: Vec::new().into(),
}
}
"http://remote-only.invalid/.well-known/openid-configuration/mcp" => {
HttpRequestResponse {
status: 200,
headers: Vec::new(),
body: serde_json::to_vec(&serde_json::json!({
"authorization_endpoint": "https://auth.remote.example/oauth/authorize",
"token_endpoint": "https://auth.remote.example/oauth/token",
}))
.expect("serialize oidc metadata")
.into(),
}
}
"http://resource-only.invalid/mcp" => HttpRequestResponse {
status: 401,
headers: vec![codex_exec_server::HttpHeader {
name: "www-authenticate".to_string(),
value: "Bearer resource_metadata=\"/.well-known/oauth-protected-resource/mcp\"".to_string(),
}],
body: Vec::new().into(),
},
"http://resource-only.invalid/.well-known/oauth-protected-resource/mcp" => {
HttpRequestResponse {
status: 200,
headers: Vec::new(),
body: serde_json::to_vec(&serde_json::json!({
"authorization_servers": ["https://auth.remote.example/tenant"],
}))
.expect("serialize resource metadata")
.into(),
}
}
"https://auth.remote.example/.well-known/oauth-authorization-server/tenant" => {
HttpRequestResponse {
status: 200,
headers: Vec::new(),
body: serde_json::to_vec(&serde_json::json!({
"authorization_endpoint": "https://auth.remote.example/oauth/authorize",
"token_endpoint": "https://auth.remote.example/oauth/token",
}))
.expect("serialize oauth metadata")
.into(),
}
}
_ => HttpRequestResponse {
status: 404,
headers: Vec::new(),
body: Vec::new().into(),
},
};
Ok(response)
}
.boxed()
}
fn http_request_stream(
&self,
_params: HttpRequestParams,
) -> BoxFuture<'_, Result<(HttpRequestResponse, HttpResponseBodyStream), ExecServerError>>
{
async move {
Err(ExecServerError::HttpRequest(
"unexpected stream".to_string(),
))
}
.boxed()
}
}
struct TestServer {
url: String,
handle: JoinHandle<()>,
@@ -708,7 +283,6 @@ mod tests {
)])),
/*env_http_headers*/ None,
OAuthCredentialsStoreMode::Keyring,
Arc::new(ReqwestHttpClient),
)
.await
.expect("status should compute");
@@ -730,7 +304,6 @@ mod tests {
"CODEX_RMCP_CLIENT_AUTH_STATUS_TEST_TOKEN".to_string(),
)])),
OAuthCredentialsStoreMode::Keyring,
Arc::new(ReqwestHttpClient),
)
.await
.expect("status should compute");
@@ -797,52 +370,4 @@ mod tests {
assert!(supported);
}
#[tokio::test]
async fn discover_streamable_http_oauth_uses_openid_fallback_through_selected_client() {
let http_client = Arc::new(ScriptedHttpClient::default());
let discovery = discover_streamable_http_oauth_with_http_client(
"http://remote-only.invalid/mcp",
/*http_headers*/ None,
/*env_http_headers*/ None,
http_client.clone(),
)
.await
.expect("discovery should succeed")
.expect("oauth support should be detected");
assert_eq!(discovery.scopes_supported, None);
assert_eq!(
http_client
.requests()
.iter()
.map(|request| request.url.as_str())
.collect::<Vec<_>>(),
vec![
"http://remote-only.invalid/.well-known/oauth-authorization-server/mcp",
"http://remote-only.invalid/.well-known/openid-configuration/mcp",
]
);
assert!(http_client.requests().iter().all(|request| {
request
.headers
.iter()
.any(|header| header.name == HTTP_REQUEST_NO_PROXY_HEADER)
}));
}
#[tokio::test]
async fn discover_streamable_http_oauth_uses_resource_metadata_fallback() {
let discovery = discover_streamable_http_oauth_with_http_client(
"http://resource-only.invalid/mcp",
/*http_headers*/ None,
/*env_http_headers*/ None,
Arc::new(ScriptedHttpClient::default()),
)
.await
.expect("discovery should succeed")
.expect("oauth support should be detected");
assert_eq!(discovery.scopes_supported, None);
}
}

View File

@@ -9,7 +9,6 @@ use std::time::Duration;
use axum::Router;
use axum::body::Body;
use axum::extract::Json;
use axum::extract::RawQuery;
use axum::extract::State;
use axum::http::HeaderMap;
use axum::http::HeaderValue;
@@ -22,7 +21,6 @@ use axum::http::header::HOST;
use axum::http::header::WWW_AUTHENTICATE;
use axum::middleware;
use axum::middleware::Next;
use axum::response::Redirect;
use axum::response::Response;
use axum::routing::get;
use axum::routing::post;
@@ -155,8 +153,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}),
)
.route("/oauth/authorize", get(fake_oauth_authorize))
.route("/oauth/token", post(fake_oauth_token))
.nest_service(
"/mcp",
StreamableHttpService::new(
@@ -183,34 +179,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
async fn fake_oauth_authorize(RawQuery(query): RawQuery) -> Result<Redirect, StatusCode> {
let params = reqwest::Url::parse(&format!(
"http://localhost/?{}",
query.as_deref().ok_or(StatusCode::BAD_REQUEST)?
))
.map_err(|_| StatusCode::BAD_REQUEST)?
.query_pairs()
.into_owned()
.collect::<HashMap<_, _>>();
let redirect_uri = params.get("redirect_uri").ok_or(StatusCode::BAD_REQUEST)?;
let state = params.get("state").ok_or(StatusCode::BAD_REQUEST)?;
let mut callback_url =
reqwest::Url::parse(redirect_uri).map_err(|_| StatusCode::BAD_REQUEST)?;
callback_url
.query_pairs_mut()
.append_pair("code", "test-authorization-code")
.append_pair("state", state);
Ok(Redirect::temporary(callback_url.as_str()))
}
async fn fake_oauth_token() -> Json<serde_json::Value> {
Json(json!({
"access_token": "test-access-token",
"refresh_token": "test-refresh-token",
"token_type": "bearer",
}))
}
impl ServerHandler for TestToolServer {
fn get_info(&self) -> ServerInfo {
ServerInfo::new(

View File

@@ -14,9 +14,7 @@ mod utils;
pub use auth_status::StreamableHttpOAuthDiscovery;
pub use auth_status::determine_streamable_http_auth_status;
pub use auth_status::discover_streamable_http_oauth;
pub use auth_status::discover_streamable_http_oauth_with_http_client;
pub use auth_status::supports_oauth_login;
pub use auth_status::supports_oauth_login_with_http_client;
pub use codex_protocol::protocol::McpAuthStatus;
pub use in_process_transport::InProcessTransportFactory;
pub use oauth::StoredOAuthTokens;
@@ -28,10 +26,7 @@ pub use perform_oauth_login::OAuthProviderError;
pub use perform_oauth_login::OauthLoginHandle;
pub use perform_oauth_login::perform_oauth_login;
pub use perform_oauth_login::perform_oauth_login_return_url;
pub use perform_oauth_login::perform_oauth_login_return_url_with_http_client;
pub use perform_oauth_login::perform_oauth_login_silent;
pub use perform_oauth_login::perform_oauth_login_silent_with_http_client;
pub use perform_oauth_login::perform_oauth_login_with_http_client;
pub use rmcp::model::ElicitationAction;
pub use rmcp_client::Elicitation;
pub use rmcp_client::ElicitationResponse;

View File

@@ -9,34 +9,12 @@ use anyhow::anyhow;
use anyhow::bail;
use base64::Engine;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use codex_exec_server::HttpClient;
use codex_exec_server::HttpRequestParams;
use oauth2::AsyncHttpClient;
use oauth2::AuthUrl;
use oauth2::AuthorizationCode;
use oauth2::ClientId;
use oauth2::ClientSecret;
use oauth2::CsrfToken;
use oauth2::EndpointNotSet;
use oauth2::EndpointSet;
use oauth2::HttpRequest;
use oauth2::HttpResponse;
use oauth2::PkceCodeChallenge;
use oauth2::PkceCodeVerifier;
use oauth2::RedirectUrl;
use oauth2::RequestTokenError;
use oauth2::RevocationErrorResponseType;
use oauth2::Scope;
use oauth2::StandardErrorResponse;
use oauth2::StandardRevocableToken;
use oauth2::StandardTokenIntrospectionResponse;
use oauth2::TokenUrl;
use oauth2::basic::BasicErrorResponseType;
use oauth2::basic::BasicTokenType;
use reqwest::ClientBuilder;
use reqwest::Url;
use reqwest::header::HeaderMap;
use rmcp::transport::auth::OAuthTokenResponse;
use rmcp::transport::auth::VendorExtraTokenFields;
use rmcp::transport::AuthorizationManager;
use rmcp::transport::AuthorizationSession;
use rmcp::transport::auth::OAuthClientConfig;
use rmcp::transport::auth::OAuthState;
use sha2::Digest;
use sha2::Sha256;
use tiny_http::Response;
@@ -47,13 +25,10 @@ use urlencoding::decode;
use crate::StoredOAuthTokens;
use crate::WrappedOAuthTokenResponse;
use crate::auth_status::StreamableHttpOAuthMetadata;
use crate::auth_status::discover_streamable_http_oauth_metadata;
use crate::oauth::compute_expires_at_millis;
use crate::save_oauth_tokens;
use crate::utils::apply_default_headers;
use crate::utils::build_default_headers;
use crate::utils::oauth_token_headers;
use crate::utils::protocol_headers;
use codex_config::types::OAuthCredentialsStoreMode;
struct OauthHeaders {
@@ -61,159 +36,6 @@ struct OauthHeaders {
env_http_headers: Option<HashMap<String, String>>,
}
type OAuthClient = oauth2::Client<
StandardErrorResponse<BasicErrorResponseType>,
OAuthTokenResponse,
StandardTokenIntrospectionResponse<VendorExtraTokenFields, BasicTokenType>,
StandardRevocableToken,
StandardErrorResponse<RevocationErrorResponseType>,
EndpointSet,
EndpointNotSet,
EndpointNotSet,
EndpointNotSet,
EndpointSet,
>;
type OAuthClientBuilder = oauth2::Client<
StandardErrorResponse<BasicErrorResponseType>,
OAuthTokenResponse,
StandardTokenIntrospectionResponse<VendorExtraTokenFields, BasicTokenType>,
StandardRevocableToken,
StandardErrorResponse<RevocationErrorResponseType>,
>;
struct OAuthState {
client: OAuthClient,
client_id: String,
pkce_verifier: PkceCodeVerifier,
csrf_state: CsrfToken,
authorization_url: String,
default_headers: HeaderMap,
http_client: Arc<dyn HttpClient>,
}
impl OAuthState {
fn new(
metadata: StreamableHttpOAuthMetadata,
client: OAuthClientConfig,
redirect_uri: &str,
scopes: &[&str],
default_headers: HeaderMap,
http_client: Arc<dyn HttpClient>,
) -> Result<Self> {
let OAuthClientConfig {
client_id,
client_secret,
} = client;
let mut client = OAuthClientBuilder::new(ClientId::new(client_id.clone()))
.set_auth_uri(AuthUrl::new(metadata.authorization_endpoint)?)
.set_token_uri(TokenUrl::new(metadata.token_endpoint)?)
.set_redirect_uri(RedirectUrl::new(redirect_uri.to_string())?);
if let Some(client_secret) = client_secret {
client = client.set_client_secret(ClientSecret::new(client_secret));
}
let (pkce_challenge, pkce_verifier) = PkceCodeChallenge::new_random_sha256();
let mut request = client
.authorize_url(CsrfToken::new_random)
.set_pkce_challenge(pkce_challenge);
for scope in scopes {
request = request.add_scope(Scope::new((*scope).to_string()));
}
let (authorization_url, csrf_state) = request.url();
Ok(Self {
client,
client_id,
pkce_verifier,
csrf_state,
authorization_url: authorization_url.to_string(),
default_headers,
http_client,
})
}
fn authorization_url(&self) -> &str {
&self.authorization_url
}
async fn handle_callback(
self,
code: &str,
csrf_state: &str,
) -> Result<(String, OAuthTokenResponse)> {
if self.csrf_state.secret() != csrf_state {
bail!("OAuth callback state did not match login request");
}
let http_client = RoutedOAuthHttpClient::new(self.http_client, self.default_headers);
let credentials = match self
.client
.exchange_code(AuthorizationCode::new(code.to_string()))
.set_pkce_verifier(self.pkce_verifier)
.request_async(&http_client)
.await
{
Ok(credentials) => credentials,
Err(RequestTokenError::Parse(_, body)) => {
serde_json::from_slice::<OAuthTokenResponse>(&body)?
}
Err(error) => return Err(anyhow!("OAuth token exchange failed: {error}")),
};
Ok((self.client_id, credentials))
}
}
#[derive(Clone)]
struct RoutedOAuthHttpClient {
http_client: Arc<dyn HttpClient>,
default_headers: HeaderMap,
}
impl RoutedOAuthHttpClient {
fn new(http_client: Arc<dyn HttpClient>, default_headers: HeaderMap) -> Self {
Self {
http_client,
default_headers,
}
}
}
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
struct RoutedOAuthHttpClientError(#[from] anyhow::Error);
impl<'c> AsyncHttpClient<'c> for RoutedOAuthHttpClient {
type Error = RoutedOAuthHttpClientError;
type Future = futures::future::BoxFuture<'c, Result<HttpResponse, Self::Error>>;
fn call(&'c self, request: HttpRequest) -> Self::Future {
Box::pin(async move {
let (parts, body) = request.into_parts();
let mut headers = self.default_headers.clone();
headers.extend(parts.headers);
let response = self
.http_client
.http_request(HttpRequestParams {
method: parts.method.to_string(),
url: parts.uri.to_string(),
headers: oauth_token_headers(&headers),
body: Some(body.into()),
timeout_ms: None,
request_id: "oauth-request".to_string(),
stream_response: false,
})
.await
.map_err(|err| RoutedOAuthHttpClientError(anyhow!(err)))?;
let mut builder = oauth2::http::Response::builder().status(response.status);
for header in response.headers {
builder = builder.header(header.name, header.value);
}
builder
.body(response.body.into_inner())
.map_err(|err: oauth2::http::Error| RoutedOAuthHttpClientError(anyhow!(err)))
})
}
}
struct CallbackServerGuard {
server: Arc<Server>,
}
@@ -266,36 +88,6 @@ pub async fn perform_oauth_login(
oauth_resource: Option<&str>,
callback_port: Option<u16>,
callback_url: Option<&str>,
) -> Result<()> {
perform_oauth_login_with_http_client(
server_name,
server_url,
store_mode,
http_headers,
env_http_headers,
scopes,
oauth_client_id,
oauth_resource,
callback_port,
callback_url,
Arc::new(codex_exec_server::ReqwestHttpClient),
)
.await
}
#[allow(clippy::too_many_arguments)]
pub async fn perform_oauth_login_with_http_client(
server_name: &str,
server_url: &str,
store_mode: OAuthCredentialsStoreMode,
http_headers: Option<HashMap<String, String>>,
env_http_headers: Option<HashMap<String, String>>,
scopes: &[String],
oauth_client_id: Option<&str>,
oauth_resource: Option<&str>,
callback_port: Option<u16>,
callback_url: Option<&str>,
http_client: Arc<dyn HttpClient>,
) -> Result<()> {
perform_oauth_login_with_browser_output(
server_name,
@@ -303,7 +95,6 @@ pub async fn perform_oauth_login_with_http_client(
store_mode,
http_headers,
env_http_headers,
http_client,
scopes,
oauth_client_id,
oauth_resource,
@@ -326,36 +117,6 @@ pub async fn perform_oauth_login_silent(
oauth_resource: Option<&str>,
callback_port: Option<u16>,
callback_url: Option<&str>,
) -> Result<()> {
perform_oauth_login_silent_with_http_client(
server_name,
server_url,
store_mode,
http_headers,
env_http_headers,
scopes,
oauth_client_id,
oauth_resource,
callback_port,
callback_url,
Arc::new(codex_exec_server::ReqwestHttpClient),
)
.await
}
#[allow(clippy::too_many_arguments)]
pub async fn perform_oauth_login_silent_with_http_client(
server_name: &str,
server_url: &str,
store_mode: OAuthCredentialsStoreMode,
http_headers: Option<HashMap<String, String>>,
env_http_headers: Option<HashMap<String, String>>,
scopes: &[String],
oauth_client_id: Option<&str>,
oauth_resource: Option<&str>,
callback_port: Option<u16>,
callback_url: Option<&str>,
http_client: Arc<dyn HttpClient>,
) -> Result<()> {
perform_oauth_login_with_browser_output(
server_name,
@@ -363,7 +124,6 @@ pub async fn perform_oauth_login_silent_with_http_client(
store_mode,
http_headers,
env_http_headers,
http_client,
scopes,
oauth_client_id,
oauth_resource,
@@ -381,7 +141,6 @@ async fn perform_oauth_login_with_browser_output(
store_mode: OAuthCredentialsStoreMode,
http_headers: Option<HashMap<String, String>>,
env_http_headers: Option<HashMap<String, String>>,
http_client: Arc<dyn HttpClient>,
scopes: &[String],
oauth_client_id: Option<&str>,
oauth_resource: Option<&str>,
@@ -398,7 +157,6 @@ async fn perform_oauth_login_with_browser_output(
server_url,
store_mode,
headers,
http_client,
scopes,
oauth_client_id,
oauth_resource,
@@ -425,38 +183,6 @@ pub async fn perform_oauth_login_return_url(
timeout_secs: Option<i64>,
callback_port: Option<u16>,
callback_url: Option<&str>,
) -> Result<OauthLoginHandle> {
perform_oauth_login_return_url_with_http_client(
server_name,
server_url,
store_mode,
http_headers,
env_http_headers,
scopes,
oauth_client_id,
oauth_resource,
timeout_secs,
callback_port,
callback_url,
Arc::new(codex_exec_server::ReqwestHttpClient),
)
.await
}
#[allow(clippy::too_many_arguments)]
pub async fn perform_oauth_login_return_url_with_http_client(
server_name: &str,
server_url: &str,
store_mode: OAuthCredentialsStoreMode,
http_headers: Option<HashMap<String, String>>,
env_http_headers: Option<HashMap<String, String>>,
scopes: &[String],
oauth_client_id: Option<&str>,
oauth_resource: Option<&str>,
timeout_secs: Option<i64>,
callback_port: Option<u16>,
callback_url: Option<&str>,
http_client: Arc<dyn HttpClient>,
) -> Result<OauthLoginHandle> {
let headers = OauthHeaders {
http_headers,
@@ -467,7 +193,6 @@ pub async fn perform_oauth_login_return_url_with_http_client(
server_url,
store_mode,
headers,
http_client,
scopes,
oauth_client_id,
oauth_resource,
@@ -721,7 +446,6 @@ impl OauthLoginFlow {
server_url: &str,
store_mode: OAuthCredentialsStoreMode,
headers: OauthHeaders,
http_client: Arc<dyn HttpClient>,
scopes: &[String],
oauth_client_id: Option<&str>,
oauth_resource: Option<&str>,
@@ -752,19 +476,27 @@ impl OauthLoginFlow {
let (tx, rx) = oneshot::channel();
spawn_callback_server(server, tx, callback_path);
let OauthHeaders {
http_headers,
env_http_headers,
} = headers;
let default_headers = build_default_headers(http_headers, env_http_headers)?;
let http_client = apply_default_headers(ClientBuilder::new(), &default_headers).build()?;
let scope_refs: Vec<&str> = scopes.iter().map(String::as_str).collect();
let oauth_state = start_authorization(
server_name,
server_url,
http_client,
headers,
&scope_refs,
&redirect_uri,
oauth_client_id,
)
.await?;
let auth_url =
append_query_param(oauth_state.authorization_url(), "resource", oauth_resource);
let auth_url = append_query_param(
&oauth_state.get_authorization_url().await?,
"resource",
oauth_resource,
);
let timeout_secs = timeout_secs.unwrap_or(DEFAULT_OAUTH_TIMEOUT_SECS).max(1);
let timeout = Duration::from_secs(timeout_secs as u64);
@@ -818,12 +550,19 @@ impl OauthLoginFlow {
CallbackResult::Error(error) => return Err(anyhow!(error)),
};
let (client_id, credentials) = self
.oauth_state
self.oauth_state
.handle_callback(&code, &csrf_state)
.await
.context("failed to handle OAuth callback")?;
let (client_id, credentials_opt) = self
.oauth_state
.get_credentials()
.await
.context("failed to retrieve OAuth credentials")?;
let credentials = credentials_opt
.ok_or_else(|| anyhow!("OAuth provider did not return credentials"))?;
let expires_at = compute_expires_at_millis(&credentials);
let stored = StoredOAuthTokens {
server_name: self.server_name.clone(),
@@ -863,110 +602,34 @@ impl OauthLoginFlow {
}
async fn start_authorization(
server_name: &str,
server_url: &str,
http_client: Arc<dyn HttpClient>,
headers: OauthHeaders,
http_client: reqwest::Client,
scopes: &[&str],
redirect_uri: &str,
oauth_client_id: Option<&str>,
) -> Result<OAuthState> {
let OauthHeaders {
http_headers,
env_http_headers,
} = headers;
let metadata = discover_streamable_http_oauth_metadata(
server_url,
http_headers.clone(),
env_http_headers.clone(),
Arc::clone(&http_client),
)
.await?
.ok_or_else(|| anyhow!("MCP server `{server_name}` does not advertise OAuth metadata"))?;
let default_headers = build_default_headers(http_headers.clone(), env_http_headers.clone())?;
let client = match oauth_client_id.filter(|client_id| !client_id.trim().is_empty()) {
Some(client_id) => OAuthClientConfig {
client_id: client_id.to_string(),
client_secret: None,
},
None => {
register_oauth_client(
&metadata,
redirect_uri,
http_headers,
env_http_headers,
Arc::clone(&http_client),
)
.await?
}
let Some(oauth_client_id) = oauth_client_id.filter(|client_id| !client_id.trim().is_empty())
else {
let mut oauth_state = OAuthState::new(server_url, Some(http_client)).await?;
oauth_state
.start_authorization(scopes, redirect_uri, Some("Codex"))
.await?;
return Ok(oauth_state);
};
OAuthState::new(
metadata,
client,
redirect_uri,
scopes,
default_headers,
http_client,
)
}
async fn register_oauth_client(
metadata: &StreamableHttpOAuthMetadata,
redirect_uri: &str,
http_headers: Option<HashMap<String, String>>,
env_http_headers: Option<HashMap<String, String>>,
http_client: Arc<dyn HttpClient>,
) -> Result<OAuthClientConfig> {
let registration_url = metadata
.registration_endpoint
.as_ref()
.ok_or_else(|| anyhow!("OAuth server does not support dynamic client registration"))?;
let default_headers = build_default_headers(http_headers, env_http_headers)?;
let registration_request = serde_json::json!({
"client_name": "Codex",
"redirect_uris": [redirect_uri],
"grant_types": ["authorization_code", "refresh_token"],
"token_endpoint_auth_method": "none",
"response_types": ["code"],
});
let mut headers = default_headers;
headers.insert(reqwest::header::CONTENT_TYPE, "application/json".parse()?);
let response = http_client
.http_request(HttpRequestParams {
method: "POST".to_string(),
url: registration_url.clone(),
headers: protocol_headers(&headers),
body: Some(serde_json::to_vec(&registration_request)?.into()),
timeout_ms: None,
request_id: "oauth-register".to_string(),
stream_response: false,
})
.await?;
if !(200..300).contains(&response.status) {
bail!(
"OAuth dynamic client registration returned HTTP {}",
response.status
);
}
#[derive(serde::Deserialize)]
struct ClientRegistrationResponse {
client_id: String,
#[serde(default)]
client_secret: Option<String>,
}
let response =
serde_json::from_slice::<ClientRegistrationResponse>(&response.body.into_inner())?;
Ok(OAuthClientConfig {
client_id: response.client_id,
client_secret: response
.client_secret
.filter(|client_secret| !client_secret.trim().is_empty()),
})
}
let mut auth_manager = AuthorizationManager::new(server_url).await?;
auth_manager.with_client(http_client)?;
let metadata = auth_manager.discover_metadata().await?;
auth_manager.set_metadata(metadata);
auth_manager.configure_client(
OAuthClientConfig::new(oauth_client_id, redirect_uri)
.with_scopes(scopes.iter().map(|scope| (*scope).to_string()).collect()),
)?;
let auth_url = auth_manager.get_authorization_url(scopes).await?;
struct OAuthClientConfig {
client_id: String,
client_secret: Option<String>,
Ok(OAuthState::Session(
AuthorizationSession::for_scope_upgrade(auth_manager, auth_url, redirect_uri),
))
}
fn append_query_param(url: &str, key: &str, value: Option<&str>) -> String {
@@ -988,31 +651,16 @@ fn append_query_param(url: &str, key: &str, value: Option<&str>) -> String {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::Mutex;
use axum::Json;
use axum::Router;
use axum::routing::get;
use codex_exec_server::ExecServerError;
use codex_exec_server::HTTP_REQUEST_NO_REDIRECTS_HEADER;
use codex_exec_server::HttpClient;
use codex_exec_server::HttpRequestParams;
use codex_exec_server::HttpRequestResponse;
use codex_exec_server::HttpResponseBodyStream;
use codex_exec_server::ReqwestHttpClient;
use futures::FutureExt;
use futures::future::BoxFuture;
use oauth2::AsyncHttpClient;
use pretty_assertions::assert_eq;
use reqwest::Url;
use reqwest::header::HeaderMap;
use serde_json::json;
use tokio::net::TcpListener;
use super::CallbackOutcome;
use super::OAuthProviderError;
use super::OauthHeaders;
use super::append_callback_id_to_redirect_uri;
use super::append_query_param;
use super::callback_id_from_server_url;
@@ -1020,62 +668,6 @@ mod tests {
use super::parse_oauth_callback;
use super::start_authorization;
#[derive(Default)]
struct RemoteOnlyHttpClient {
requests: Mutex<Vec<HttpRequestParams>>,
}
impl RemoteOnlyHttpClient {
fn requests(&self) -> Vec<HttpRequestParams> {
self.requests.lock().expect("lock requests").clone()
}
}
impl HttpClient for RemoteOnlyHttpClient {
fn http_request(
&self,
params: HttpRequestParams,
) -> BoxFuture<'_, Result<HttpRequestResponse, ExecServerError>> {
let url = params.url.clone();
self.requests.lock().expect("lock requests").push(params);
async move {
let metadata = if url == "https://auth.remote.example/oauth/token" {
json!({
"access_token": "access-token",
"token_type": "bearer",
})
} else {
json!({
"authorization_endpoint": "https://auth.remote.example/oauth/authorize",
"token_endpoint": "https://auth.remote.example/oauth/token",
"scopes_supported": ["scope:remote"],
})
};
Ok(HttpRequestResponse {
status: 200,
headers: Vec::new(),
body: serde_json::to_vec(&metadata)
.expect("serialize metadata")
.into(),
})
}
.boxed()
}
fn http_request_stream(
&self,
_params: HttpRequestParams,
) -> BoxFuture<'_, Result<(HttpRequestResponse, HttpResponseBodyStream), ExecServerError>>
{
async move {
Err(ExecServerError::HttpRequest(
"unexpected stream".to_string(),
))
}
.boxed()
}
}
async fn spawn_oauth_metadata_server() -> String {
let listener = TcpListener::bind("127.0.0.1:0")
.await
@@ -1117,13 +709,8 @@ mod tests {
async fn start_authorization_uses_configured_client_id() {
let base_url = spawn_oauth_metadata_server().await;
let oauth_state = start_authorization(
"server",
&format!("{base_url}/mcp"),
Arc::new(ReqwestHttpClient),
OauthHeaders {
http_headers: None,
env_http_headers: None,
},
reqwest::Client::new(),
&[],
"http://127.0.0.1/callback",
Some("eci-prd-pub-codex-123"),
@@ -1131,8 +718,11 @@ mod tests {
.await
.expect("start oauth authorization");
let auth_url =
Url::parse(oauth_state.authorization_url()).expect("authorization url should parse");
let authorization_url = oauth_state
.get_authorization_url()
.await
.expect("read authorization url");
let auth_url = Url::parse(&authorization_url).expect("authorization url should parse");
let client_id = auth_url
.query_pairs()
.find(|(key, _)| key == "client_id")
@@ -1141,107 +731,6 @@ mod tests {
assert_eq!(client_id.as_deref(), Some("eci-prd-pub-codex-123"));
}
#[tokio::test]
async fn start_authorization_uses_selected_http_client_for_remote_only_server() {
let http_client = Arc::new(RemoteOnlyHttpClient::default());
let oauth_state = start_authorization(
"remote-only",
"http://remote-only.invalid/mcp",
http_client.clone(),
OauthHeaders {
http_headers: None,
env_http_headers: None,
},
&[],
"http://127.0.0.1/callback",
Some("remote-client-id"),
)
.await
.expect("start oauth authorization through selected http client");
let auth_url =
Url::parse(oauth_state.authorization_url()).expect("authorization url should parse");
let client_id = auth_url
.query_pairs()
.find(|(key, _)| key == "client_id")
.map(|(_, value)| value.into_owned());
assert_eq!(client_id.as_deref(), Some("remote-client-id"));
assert_eq!(
http_client
.requests()
.into_iter()
.map(|request| request.url)
.collect::<Vec<_>>(),
vec![
"http://remote-only.invalid/.well-known/oauth-authorization-server/mcp".to_string()
]
);
}
#[tokio::test]
async fn routed_token_request_disables_redirects() {
let http_client = Arc::new(RemoteOnlyHttpClient::default());
let client = super::RoutedOAuthHttpClient::new(http_client.clone(), HeaderMap::new());
let request = oauth2::http::Request::builder()
.method("POST")
.uri("https://auth.remote.example/oauth/token")
.body(Vec::new())
.expect("build token request");
client
.call(request)
.await
.expect("token request should succeed");
assert!(
http_client.requests()[0]
.headers
.iter()
.any(|header| header.name == HTTP_REQUEST_NO_REDIRECTS_HEADER)
);
}
#[tokio::test]
async fn token_exchange_preserves_dynamic_registration_client_secret() {
let http_client = Arc::new(RemoteOnlyHttpClient::default());
let oauth_state = super::OAuthState::new(
super::StreamableHttpOAuthMetadata {
authorization_endpoint: "https://auth.remote.example/oauth/authorize".to_string(),
token_endpoint: "https://auth.remote.example/oauth/token".to_string(),
registration_endpoint: None,
scopes_supported: None,
},
super::OAuthClientConfig {
client_id: "dynamic-client".to_string(),
client_secret: Some("dynamic-secret".to_string()),
},
"http://127.0.0.1/callback",
&[],
HeaderMap::new(),
http_client.clone(),
)
.expect("build oauth state");
let csrf_state = oauth_state.csrf_state.secret().to_string();
oauth_state
.handle_callback("code", &csrf_state)
.await
.expect("token exchange should succeed");
let authorization = http_client
.requests()
.into_iter()
.find(|request| request.url == "https://auth.remote.example/oauth/token")
.and_then(|request| {
request
.headers
.into_iter()
.find(|header| header.name.eq_ignore_ascii_case("authorization"))
})
.expect("token exchange should send authorization header");
assert!(authorization.value.starts_with("Basic "));
}
#[test]
fn parse_oauth_callback_accepts_default_path() {
let parsed = parse_oauth_callback("/callback?code=abc&state=xyz", "/callback");

View File

@@ -1,9 +1,6 @@
use anyhow::Result;
use anyhow::anyhow;
use codex_config::types::McpServerEnvVar;
use codex_exec_server::HTTP_REQUEST_NO_PROXY_HEADER;
use codex_exec_server::HTTP_REQUEST_NO_REDIRECTS_HEADER;
use codex_exec_server::HttpHeader;
use reqwest::ClientBuilder;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
@@ -129,40 +126,6 @@ pub(crate) fn apply_default_headers(
}
}
pub(crate) fn protocol_headers(headers: &HeaderMap) -> Vec<HttpHeader> {
headers
.iter()
.filter_map(|(name, value)| {
value.to_str().ok().map(|value| HttpHeader {
name: name.as_str().to_string(),
value: value.to_string(),
})
})
.collect()
}
pub(crate) fn oauth_discovery_headers(headers: &HeaderMap) -> Vec<HttpHeader> {
internal_request_headers(protocol_headers(headers), [HTTP_REQUEST_NO_PROXY_HEADER])
}
pub(crate) fn oauth_token_headers(headers: &HeaderMap) -> Vec<HttpHeader> {
internal_request_headers(
protocol_headers(headers),
[HTTP_REQUEST_NO_REDIRECTS_HEADER],
)
}
fn internal_request_headers<const N: usize>(
mut headers: Vec<HttpHeader>,
internal_header_names: [&str; N],
) -> Vec<HttpHeader> {
headers.extend(internal_header_names.into_iter().map(|name| HttpHeader {
name: name.to_string(),
value: "true".to_string(),
}));
headers
}
#[cfg(unix)]
pub(crate) const DEFAULT_ENV_VARS: &[&str] = &[
"HOME",

View File

@@ -1,244 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
export CARGO_TARGET_DIR="${CARGO_TARGET_DIR:-${REPO_ROOT}/codex-rs/target}"
REMOTE_SERVER_BIN="${REMOTE_SERVER_BIN:-${CARGO_TARGET_DIR}/debug/test_streamable_http_server}"
CODEX_BIN="${CODEX_BIN:-${CARGO_TARGET_DIR}/debug/codex}"
TMP_DIR="$(mktemp -d "${TMPDIR:-/tmp}/codex-remote-mcp-oauth-e2e.XXXXXX")"
CODEX_HOME="${TMP_DIR}/codex-home"
EXEC_SERVER_PORT=""
EXEC_SERVER_LOG_FILE="${TMP_DIR}/exec-server.log"
EXEC_SERVER_PID=""
REMOTE_BOUND_ADDR_FILE="${TMP_DIR}/remote-mcp.addr"
REMOTE_LOG_FILE="${TMP_DIR}/remote-mcp.log"
REMOTE_SERVER_PID=""
cleanup() {
if [[ -n "${REMOTE_SERVER_PID}" ]]; then
kill "${REMOTE_SERVER_PID}" >/dev/null 2>&1 || true
fi
if [[ -n "${EXEC_SERVER_PID}" ]]; then
kill "${EXEC_SERVER_PID}" >/dev/null 2>&1 || true
fi
rm -rf "${TMP_DIR}"
}
trap cleanup EXIT
if [[ ! -x "${CODEX_BIN}" || ! -x "${REMOTE_SERVER_BIN}" ]]; then
(
cd "${REPO_ROOT}/codex-rs"
cargo build -p codex-cli --bin codex -p codex-rmcp-client --bin test_streamable_http_server
)
fi
EXEC_SERVER_PORT="$(
python3 - <<'PY'
import socket
with socket.socket() as sock:
sock.bind(("127.0.0.1", 0))
print(sock.getsockname()[1])
PY
)"
"${CODEX_BIN}" exec-server --listen "ws://127.0.0.1:${EXEC_SERVER_PORT}" \
>"${EXEC_SERVER_LOG_FILE}" 2>&1 &
EXEC_SERVER_PID="$!"
deadline=$((SECONDS + 10))
while (( SECONDS < deadline )); do
if python3 - <<PY >/dev/null 2>&1
import socket
with socket.create_connection(("127.0.0.1", ${EXEC_SERVER_PORT}), timeout=0.2):
pass
PY
then
break
fi
sleep 0.05
done
if ! python3 - <<PY >/dev/null 2>&1
import socket
with socket.create_connection(("127.0.0.1", ${EXEC_SERVER_PORT}), timeout=0.2):
pass
PY
then
cat "${EXEC_SERVER_LOG_FILE}" >&2 || true
echo "timed out waiting for exec-server" >&2
exit 1
fi
MCP_STREAMABLE_HTTP_BIND_ADDR='127.0.0.1:0' \
MCP_STREAMABLE_HTTP_BOUND_ADDR_FILE="${REMOTE_BOUND_ADDR_FILE}" \
"${REMOTE_SERVER_BIN}" >"${REMOTE_LOG_FILE}" 2>&1 &
REMOTE_SERVER_PID="$!"
deadline=$((SECONDS + 10))
REMOTE_BOUND_ADDR=""
while (( SECONDS < deadline )); do
if REMOTE_BOUND_ADDR="$(cat "${REMOTE_BOUND_ADDR_FILE}" 2>/dev/null)"; then
break
fi
sleep 0.05
done
if [[ -z "${REMOTE_BOUND_ADDR}" ]]; then
cat "${REMOTE_LOG_FILE}" >&2 || true
echo "timed out waiting for remote MCP OAuth test server" >&2
exit 1
fi
REMOTE_PORT="${REMOTE_BOUND_ADDR##*:}"
REMOTE_MCP_URL="http://127.0.0.1:${REMOTE_PORT}/mcp"
CALLBACK_PORT="$(
python3 - <<'PY'
import socket
with socket.socket() as sock:
sock.bind(("0.0.0.0", 0))
print(sock.getsockname()[1])
PY
)"
mkdir -p "${CODEX_HOME}"
cat > "${CODEX_HOME}/config.toml" <<EOF
model_provider = "mock_provider"
mcp_oauth_credentials_store = "file"
mcp_oauth_callback_port = ${CALLBACK_PORT}
mcp_oauth_callback_url = "http://127.0.0.1:${CALLBACK_PORT}/callback"
[model_providers.mock_provider]
name = "Mock"
base_url = "http://127.0.0.1:1/v1"
wire_api = "responses"
[mcp_servers.remote-oauth]
url = "${REMOTE_MCP_URL}"
environment_id = "remote"
[mcp_servers.remote-oauth.oauth]
client_id = "codex-app-server-test"
EOF
cat > "${CODEX_HOME}/environments.toml" <<EOF
include_local = false
[[environments]]
id = "remote"
url = "ws://127.0.0.1:${EXEC_SERVER_PORT}"
EOF
CODEX_BIN="${CODEX_BIN}" \
CODEX_HOME="${CODEX_HOME}" \
python3 - <<'PY'
import json
import os
import subprocess
import sys
import time
import urllib.request
def send(proc, payload):
proc.stdin.write(json.dumps(payload) + "\n")
proc.stdin.flush()
def read_message(proc, timeout_s=15):
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
line = proc.stdout.readline()
if line:
return json.loads(line)
if proc.poll() is not None:
stderr = proc.stderr.read()
raise SystemExit(f"codex app-server exited: {stderr}")
time.sleep(0.01)
raise SystemExit("timed out waiting for codex app-server message")
def read_until(proc, predicate, timeout_s=15):
deadline = time.monotonic() + timeout_s
seen = []
while time.monotonic() < deadline:
message = read_message(proc, timeout_s=max(0.1, deadline - time.monotonic()))
seen.append(message)
if predicate(message):
return message
raise SystemExit(f"timed out waiting for matching message: {seen!r}")
def request(proc, request_id, method, params):
send(proc, {"id": request_id, "method": method, "params": params})
message = read_until(proc, lambda message: message.get("id") == request_id)
if "error" in message:
raise SystemExit(f"{method} failed: {message['error']}")
return message["result"]
codex_bin = os.environ["CODEX_BIN"]
codex_home = os.environ["CODEX_HOME"]
proc = subprocess.Popen(
[codex_bin, "app-server", "--listen", "stdio://"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
env={**os.environ, "CODEX_HOME": codex_home},
)
assert proc.stdin is not None
assert proc.stdout is not None
assert proc.stderr is not None
initialize = request(
proc,
1,
"initialize",
{
"clientInfo": {"name": "remote-mcp-oauth-e2e", "version": "0.1.0"},
"capabilities": {"experimentalApi": True},
},
)
if "userAgent" not in initialize:
raise SystemExit(f"initialize response missing userAgent: {initialize!r}")
send(proc, {"method": "initialized"})
status = request(
proc,
2,
"mcpServerStatus/list",
{"detail": "toolsAndAuthOnly"},
)
entry = status["data"][0]
if entry["name"] != "remote-oauth" or entry["authStatus"] != "notLoggedIn":
raise SystemExit(f"unexpected pre-login status: {status!r}")
login = request(proc, 3, "mcpServer/oauth/login", {"name": "remote-oauth"})
authorization_url = login["authorizationUrl"]
with urllib.request.urlopen(authorization_url, timeout=15) as response:
if response.status != 200:
raise SystemExit(f"unexpected authorize response: {response.status}")
completed = read_until(
proc,
lambda message: message.get("method") == "mcpServer/oauthLogin/completed",
)
params = completed.get("params") or {}
if params != {"name": "remote-oauth", "success": True}:
raise SystemExit(f"unexpected oauth completion notification: {completed!r}")
status = request(
proc,
4,
"mcpServerStatus/list",
{"detail": "toolsAndAuthOnly"},
)
entry = status["data"][0]
if entry["authStatus"] != "oAuth":
raise SystemExit(f"unexpected post-login status: {status!r}")
proc.terminate()
proc.wait(timeout=5)
print("remote MCP OAuth E2E passed")
PY