mirror of
https://github.com/openai/codex.git
synced 2026-06-04 04:12:03 +00:00
Compare commits
2 Commits
starr/mcp-
...
codex/exec
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2e54d30ad9 | ||
|
|
f0a839ea0c |
@@ -2988,6 +2988,20 @@
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"SkillsExtraRootsSetParams": {
|
||||
"properties": {
|
||||
"extraRoots": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/AbsolutePathBuf"
|
||||
},
|
||||
"type": "array"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"extraRoots"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"SkillsListParams": {
|
||||
"properties": {
|
||||
"cwds": {
|
||||
@@ -4783,6 +4797,30 @@
|
||||
"title": "Skills/listRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
"$ref": "#/definitions/RequestId"
|
||||
},
|
||||
"method": {
|
||||
"enum": [
|
||||
"skills/extraRoots/set"
|
||||
],
|
||||
"title": "Skills/extraRoots/setRequestMethod",
|
||||
"type": "string"
|
||||
},
|
||||
"params": {
|
||||
"$ref": "#/definitions/SkillsExtraRootsSetParams"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"id",
|
||||
"method",
|
||||
"params"
|
||||
],
|
||||
"title": "Skills/extraRoots/setRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
|
||||
@@ -709,6 +709,30 @@
|
||||
"title": "Skills/listRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
"$ref": "#/definitions/v2/RequestId"
|
||||
},
|
||||
"method": {
|
||||
"enum": [
|
||||
"skills/extraRoots/set"
|
||||
],
|
||||
"title": "Skills/extraRoots/setRequestMethod",
|
||||
"type": "string"
|
||||
},
|
||||
"params": {
|
||||
"$ref": "#/definitions/v2/SkillsExtraRootsSetParams"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"id",
|
||||
"method",
|
||||
"params"
|
||||
],
|
||||
"title": "Skills/extraRoots/setRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
@@ -15121,6 +15145,27 @@
|
||||
"title": "SkillsConfigWriteResponse",
|
||||
"type": "object"
|
||||
},
|
||||
"SkillsExtraRootsSetParams": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
"extraRoots": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/v2/AbsolutePathBuf"
|
||||
},
|
||||
"type": "array"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"extraRoots"
|
||||
],
|
||||
"title": "SkillsExtraRootsSetParams",
|
||||
"type": "object"
|
||||
},
|
||||
"SkillsExtraRootsSetResponse": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "SkillsExtraRootsSetResponse",
|
||||
"type": "object"
|
||||
},
|
||||
"SkillsListEntry": {
|
||||
"properties": {
|
||||
"cwd": {
|
||||
|
||||
@@ -1457,6 +1457,30 @@
|
||||
"title": "Skills/listRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
"$ref": "#/definitions/RequestId"
|
||||
},
|
||||
"method": {
|
||||
"enum": [
|
||||
"skills/extraRoots/set"
|
||||
],
|
||||
"title": "Skills/extraRoots/setRequestMethod",
|
||||
"type": "string"
|
||||
},
|
||||
"params": {
|
||||
"$ref": "#/definitions/SkillsExtraRootsSetParams"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"id",
|
||||
"method",
|
||||
"params"
|
||||
],
|
||||
"title": "Skills/extraRoots/setRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
@@ -12945,6 +12969,27 @@
|
||||
"title": "SkillsConfigWriteResponse",
|
||||
"type": "object"
|
||||
},
|
||||
"SkillsExtraRootsSetParams": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
"extraRoots": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/AbsolutePathBuf"
|
||||
},
|
||||
"type": "array"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"extraRoots"
|
||||
],
|
||||
"title": "SkillsExtraRootsSetParams",
|
||||
"type": "object"
|
||||
},
|
||||
"SkillsExtraRootsSetResponse": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "SkillsExtraRootsSetResponse",
|
||||
"type": "object"
|
||||
},
|
||||
"SkillsListEntry": {
|
||||
"properties": {
|
||||
"cwd": {
|
||||
|
||||
22
codex-rs/app-server-protocol/schema/json/v2/SkillsExtraRootsSetParams.json
generated
Normal file
22
codex-rs/app-server-protocol/schema/json/v2/SkillsExtraRootsSetParams.json
generated
Normal file
@@ -0,0 +1,22 @@
|
||||
{
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"definitions": {
|
||||
"AbsolutePathBuf": {
|
||||
"description": "A path that is guaranteed to be absolute and normalized (though it is not guaranteed to be canonicalized or exist on the filesystem).\n\nIMPORTANT: When deserializing an `AbsolutePathBuf`, a base path must be set using [AbsolutePathBufGuard::new]. If no base path is set, the deserialization will fail unless the path being deserialized is already absolute.",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"extraRoots": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/AbsolutePathBuf"
|
||||
},
|
||||
"type": "array"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"extraRoots"
|
||||
],
|
||||
"title": "SkillsExtraRootsSetParams",
|
||||
"type": "object"
|
||||
}
|
||||
5
codex-rs/app-server-protocol/schema/json/v2/SkillsExtraRootsSetResponse.json
generated
Normal file
5
codex-rs/app-server-protocol/schema/json/v2/SkillsExtraRootsSetResponse.json
generated
Normal file
@@ -0,0 +1,5 @@
|
||||
{
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "SkillsExtraRootsSetResponse",
|
||||
"type": "object"
|
||||
}
|
||||
File diff suppressed because one or more lines are too long
6
codex-rs/app-server-protocol/schema/typescript/v2/SkillsExtraRootsSetParams.ts
generated
Normal file
6
codex-rs/app-server-protocol/schema/typescript/v2/SkillsExtraRootsSetParams.ts
generated
Normal file
@@ -0,0 +1,6 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
import type { AbsolutePathBuf } from "../AbsolutePathBuf";
|
||||
|
||||
export type SkillsExtraRootsSetParams = { extraRoots: Array<AbsolutePathBuf>, };
|
||||
5
codex-rs/app-server-protocol/schema/typescript/v2/SkillsExtraRootsSetResponse.ts
generated
Normal file
5
codex-rs/app-server-protocol/schema/typescript/v2/SkillsExtraRootsSetResponse.ts
generated
Normal file
@@ -0,0 +1,5 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
|
||||
export type SkillsExtraRootsSetResponse = Record<string, never>;
|
||||
@@ -344,6 +344,8 @@ export type { SkillToolDependency } from "./SkillToolDependency";
|
||||
export type { SkillsChangedNotification } from "./SkillsChangedNotification";
|
||||
export type { SkillsConfigWriteParams } from "./SkillsConfigWriteParams";
|
||||
export type { SkillsConfigWriteResponse } from "./SkillsConfigWriteResponse";
|
||||
export type { SkillsExtraRootsSetParams } from "./SkillsExtraRootsSetParams";
|
||||
export type { SkillsExtraRootsSetResponse } from "./SkillsExtraRootsSetResponse";
|
||||
export type { SkillsListEntry } from "./SkillsListEntry";
|
||||
export type { SkillsListParams } from "./SkillsListParams";
|
||||
export type { SkillsListResponse } from "./SkillsListResponse";
|
||||
|
||||
@@ -610,6 +610,11 @@ client_request_definitions! {
|
||||
serialization: global_shared_read("config"),
|
||||
response: v2::SkillsListResponse,
|
||||
},
|
||||
SkillsExtraRootsSet => "skills/extraRoots/set" {
|
||||
params: v2::SkillsExtraRootsSetParams,
|
||||
serialization: global("config"),
|
||||
response: v2::SkillsExtraRootsSetResponse,
|
||||
},
|
||||
HooksList => "hooks/list" {
|
||||
params: v2::HooksListParams,
|
||||
serialization: global("config"),
|
||||
@@ -1721,6 +1726,17 @@ mod tests {
|
||||
Some(ClientRequestSerializationScope::GlobalSharedRead("config"))
|
||||
);
|
||||
|
||||
let skills_extra_roots_set = ClientRequest::SkillsExtraRootsSet {
|
||||
request_id: request_id(),
|
||||
params: v2::SkillsExtraRootsSetParams {
|
||||
extra_roots: vec![absolute_path("/tmp/skills")],
|
||||
},
|
||||
};
|
||||
assert_eq!(
|
||||
skills_extra_roots_set.serialization_scope(),
|
||||
Some(ClientRequestSerializationScope::Global("config"))
|
||||
);
|
||||
|
||||
let plugin_list = ClientRequest::PluginList {
|
||||
request_id: request_id(),
|
||||
params: v2::PluginListParams {
|
||||
|
||||
@@ -35,6 +35,18 @@ pub struct SkillsListResponse {
|
||||
pub data: Vec<SkillsListEntry>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct SkillsExtraRootsSetParams {
|
||||
pub extra_roots: Vec<AbsolutePathBuf>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct SkillsExtraRootsSetResponse {}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
|
||||
@@ -2626,6 +2626,27 @@ fn skills_list_params_serialization_uses_force_reload() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn skills_extra_roots_set_params_serialization_uses_extra_roots() {
|
||||
assert_eq!(
|
||||
serde_json::to_value(SkillsExtraRootsSetParams {
|
||||
extra_roots: vec![absolute_path("tmp/skills")],
|
||||
})
|
||||
.unwrap(),
|
||||
json!({
|
||||
"extraRoots": [absolute_path_string("tmp/skills")],
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn skills_extra_roots_set_params_rejects_relative_roots() {
|
||||
let result = serde_json::from_value::<SkillsExtraRootsSetParams>(json!({
|
||||
"extraRoots": ["relative/path"],
|
||||
}));
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn plugin_source_serializes_local_git_and_remote_variants() {
|
||||
let local_path = if cfg!(windows) {
|
||||
|
||||
@@ -196,6 +196,7 @@ Example with notification opt-out:
|
||||
- `environment/add` — experimental; add or replace a named remote environment by `environmentId` and `execServerUrl` for later selection by `thread/start` or `turn/start`; returns `{}` and does not change the default environment.
|
||||
- `collaborationMode/list` — list available collaboration mode presets (experimental, no pagination). Built-in presets do not select a model; the Plan preset selects medium reasoning effort. This response omits built-in developer instructions; clients should either pass `settings.developer_instructions: null` when setting a mode to use Codex's built-in instructions, or provide their own instructions explicitly.
|
||||
- `skills/list` — list skills for one or more `cwd` values (optional `forceReload`).
|
||||
- `skills/extraRoots/set` — replace the app-server process runtime extra standalone skill roots. The roots are not persisted; missing directories are accepted and simply load no skills.
|
||||
- `hooks/list` — list discovered hooks for one or more `cwd` values.
|
||||
- `marketplace/add` — add a remote plugin marketplace from an HTTP(S) Git URL, SSH Git URL, or GitHub `owner/repo` shorthand, then persist it into the user marketplace config. Returns the installed root path plus whether the marketplace was already present.
|
||||
- `marketplace/remove` — remove a configured marketplace by name from the user marketplace config, and delete its installed marketplace root when one exists.
|
||||
@@ -1506,6 +1507,7 @@ $skill-creator Add a new skill for triaging flaky CI and include step-by-step us
|
||||
Use `skills/list` to fetch the available skills (optionally scoped by `cwds`, with `forceReload`).
|
||||
`skills/list` might reuse a cached skills result per `cwd`; setting `forceReload` to `true` refreshes the result from disk.
|
||||
The server also emits `skills/changed` notifications when watched local skill files change. Treat this as an invalidation signal and re-run `skills/list` with your current params when needed.
|
||||
Use `skills/extraRoots/set` to replace additional standalone skill roots for the current app-server process. These roots use the same layout as other standalone skill roots: each root contains skill directories, and each skill directory contains `SKILL.md`. Missing roots are accepted and load no skills until they exist. This setting is lost when app-server exits.
|
||||
|
||||
```json
|
||||
{ "method": "skills/list", "id": 25, "params": {
|
||||
@@ -1542,12 +1544,23 @@ The server also emits `skills/changed` notifications when watched local skill fi
|
||||
}
|
||||
```
|
||||
|
||||
```json
|
||||
{
|
||||
"method": "skills/extraRoots/set",
|
||||
"id": 26,
|
||||
"params": {
|
||||
"extraRoots": ["/Users/me/generated-skills"]
|
||||
}
|
||||
}
|
||||
{ "id": 26, "result": {} }
|
||||
```
|
||||
|
||||
To enable or disable a skill by absolute path:
|
||||
|
||||
```json
|
||||
{
|
||||
"method": "skills/config/write",
|
||||
"id": 26,
|
||||
"id": 27,
|
||||
"params": {
|
||||
"path": "/Users/alice/.codex/skills/skill-creator/SKILL.md",
|
||||
"name": null,
|
||||
@@ -1561,7 +1574,7 @@ To enable or disable a skill by name:
|
||||
```json
|
||||
{
|
||||
"method": "skills/config/write",
|
||||
"id": 27,
|
||||
"id": 28,
|
||||
"params": {
|
||||
"path": null,
|
||||
"name": "github:yeet",
|
||||
|
||||
@@ -354,6 +354,8 @@ impl MessageProcessor {
|
||||
app_list_shutdown_token,
|
||||
);
|
||||
let catalog_processor = CatalogRequestProcessor::new(
|
||||
outgoing.clone(),
|
||||
Arc::clone(&skills_watcher),
|
||||
auth_manager.clone(),
|
||||
Arc::clone(&thread_manager),
|
||||
Arc::clone(&config),
|
||||
@@ -1108,6 +1110,9 @@ impl MessageProcessor {
|
||||
ClientRequest::SkillsList { params, .. } => {
|
||||
self.catalog_processor.skills_list(params).await
|
||||
}
|
||||
ClientRequest::SkillsExtraRootsSet { params, .. } => {
|
||||
self.catalog_processor.skills_extra_roots_set(params).await
|
||||
}
|
||||
ClientRequest::HooksList { params, .. } => {
|
||||
self.catalog_processor.hooks_list(params).await
|
||||
}
|
||||
|
||||
@@ -156,6 +156,8 @@ use codex_app_server_protocol::ServerRequestResolvedNotification;
|
||||
use codex_app_server_protocol::SkillSummary;
|
||||
use codex_app_server_protocol::SkillsConfigWriteParams;
|
||||
use codex_app_server_protocol::SkillsConfigWriteResponse;
|
||||
use codex_app_server_protocol::SkillsExtraRootsSetParams;
|
||||
use codex_app_server_protocol::SkillsExtraRootsSetResponse;
|
||||
use codex_app_server_protocol::SkillsListParams;
|
||||
use codex_app_server_protocol::SkillsListResponse;
|
||||
use codex_app_server_protocol::SortDirection;
|
||||
@@ -350,7 +352,7 @@ use codex_mcp::McpRuntimeContext;
|
||||
use codex_mcp::McpServerStatusSnapshot;
|
||||
use codex_mcp::McpSnapshotDetail;
|
||||
use codex_mcp::collect_mcp_server_status_snapshot_with_detail;
|
||||
use codex_mcp::discover_supported_scopes_with_runtime_context;
|
||||
use codex_mcp::discover_supported_scopes;
|
||||
use codex_mcp::read_mcp_resource as read_mcp_resource_without_thread;
|
||||
use codex_mcp::resolve_oauth_scopes;
|
||||
use codex_memories_write::clear_memory_roots_contents;
|
||||
@@ -402,7 +404,7 @@ use codex_protocol::protocol::USER_MESSAGE_BEGIN;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use codex_protocol::user_input::MAX_USER_INPUT_TEXT_CHARS;
|
||||
use codex_protocol::user_input::UserInput as CoreInputItem;
|
||||
use codex_rmcp_client::perform_oauth_login_return_url_with_http_client;
|
||||
use codex_rmcp_client::perform_oauth_login_return_url;
|
||||
use codex_rollout::EventPersistenceMode;
|
||||
use codex_rollout::is_persisted_rollout_item;
|
||||
use codex_rollout::state_db::StateDbHandle;
|
||||
|
||||
@@ -4,6 +4,8 @@ use futures::StreamExt;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct CatalogRequestProcessor {
|
||||
pub(super) outgoing: Arc<OutgoingMessageSender>,
|
||||
pub(super) skills_watcher: Arc<SkillsWatcher>,
|
||||
pub(super) auth_manager: Arc<AuthManager>,
|
||||
pub(super) thread_manager: Arc<ThreadManager>,
|
||||
pub(super) config: Arc<Config>,
|
||||
@@ -96,6 +98,8 @@ fn errors_to_info(
|
||||
|
||||
impl CatalogRequestProcessor {
|
||||
pub(crate) fn new(
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
skills_watcher: Arc<SkillsWatcher>,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
thread_manager: Arc<ThreadManager>,
|
||||
config: Arc<Config>,
|
||||
@@ -103,6 +107,8 @@ impl CatalogRequestProcessor {
|
||||
workspace_settings_cache: Arc<workspace_settings::WorkspaceSettingsCache>,
|
||||
) -> Self {
|
||||
Self {
|
||||
outgoing,
|
||||
skills_watcher,
|
||||
auth_manager,
|
||||
thread_manager,
|
||||
config,
|
||||
@@ -138,6 +144,15 @@ impl CatalogRequestProcessor {
|
||||
.map(|response| Some(response.into()))
|
||||
}
|
||||
|
||||
pub(crate) async fn skills_extra_roots_set(
|
||||
&self,
|
||||
params: SkillsExtraRootsSetParams,
|
||||
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
|
||||
self.skills_extra_roots_set_response(params)
|
||||
.await
|
||||
.map(|response| Some(response.into()))
|
||||
}
|
||||
|
||||
pub(crate) async fn model_list(
|
||||
&self,
|
||||
params: ModelListParams,
|
||||
@@ -567,6 +582,24 @@ impl CatalogRequestProcessor {
|
||||
Ok(SkillsListResponse { data })
|
||||
}
|
||||
|
||||
async fn skills_extra_roots_set_response(
|
||||
&self,
|
||||
params: SkillsExtraRootsSetParams,
|
||||
) -> Result<SkillsExtraRootsSetResponse, JSONRPCErrorError> {
|
||||
let SkillsExtraRootsSetParams { extra_roots } = params;
|
||||
self.skills_watcher
|
||||
.register_runtime_extra_roots(&extra_roots);
|
||||
self.thread_manager
|
||||
.skills_manager()
|
||||
.set_extra_roots(extra_roots);
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::SkillsChanged(
|
||||
codex_app_server_protocol::SkillsChangedNotification {},
|
||||
))
|
||||
.await;
|
||||
Ok(SkillsExtraRootsSetResponse {})
|
||||
}
|
||||
|
||||
/// Handle `hooks/list` by resolving hooks for each requested cwd.
|
||||
async fn hooks_list_response(
|
||||
&self,
|
||||
|
||||
@@ -130,12 +130,6 @@ impl McpRequestProcessor {
|
||||
"No MCP server named '{name}' found."
|
||||
)));
|
||||
};
|
||||
let environment_manager = self.thread_manager.environment_manager();
|
||||
// This threadless login path has no turn cwd or turn-selected
|
||||
// environment. Use config cwd only as the local stdio fallback; HTTP
|
||||
// auth routing still follows each MCP server's configured environment.
|
||||
let runtime_context =
|
||||
McpRuntimeContext::new(Arc::clone(&environment_manager), config.cwd.to_path_buf());
|
||||
|
||||
let (url, http_headers, env_http_headers) = match &server.transport {
|
||||
McpServerTransportConfig::StreamableHttp {
|
||||
@@ -152,19 +146,14 @@ impl McpRequestProcessor {
|
||||
};
|
||||
|
||||
let discovered_scopes = if scopes.is_none() && server.scopes.is_none() {
|
||||
discover_supported_scopes_with_runtime_context(&name, server, &runtime_context).await
|
||||
discover_supported_scopes(&server.transport).await
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let resolved_scopes =
|
||||
resolve_oauth_scopes(scopes, server.scopes.clone(), discovered_scopes);
|
||||
|
||||
let http_client = runtime_context
|
||||
.resolve_streamable_http_client(&name, server)
|
||||
.map_err(|err| {
|
||||
internal_error(format!("failed to resolve MCP server '{name}': {err}"))
|
||||
})?;
|
||||
let handle = perform_oauth_login_return_url_with_http_client(
|
||||
let handle = perform_oauth_login_return_url(
|
||||
&name,
|
||||
&url,
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
@@ -176,7 +165,6 @@ impl McpRequestProcessor {
|
||||
timeout_secs,
|
||||
config.mcp_oauth_callback_port,
|
||||
config.mcp_oauth_callback_url.as_deref(),
|
||||
http_client,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| internal_error(format!("failed to login to MCP server '{name}': {err}")))?;
|
||||
|
||||
@@ -10,9 +10,9 @@ use codex_core_plugins::remote::RemotePluginScope;
|
||||
use codex_core_plugins::remote::is_valid_remote_plugin_id;
|
||||
use codex_core_plugins::remote::validate_remote_plugin_id;
|
||||
use codex_mcp::McpOAuthLoginSupport;
|
||||
use codex_mcp::oauth_login_support_with_runtime_context;
|
||||
use codex_mcp::oauth_login_support;
|
||||
use codex_mcp::should_retry_without_scopes;
|
||||
use codex_rmcp_client::perform_oauth_login_silent_with_http_client;
|
||||
use codex_rmcp_client::perform_oauth_login_silent;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct PluginRequestProcessor {
|
||||
@@ -1565,18 +1565,8 @@ impl PluginRequestProcessor {
|
||||
config: &Config,
|
||||
plugin_mcp_servers: HashMap<String, McpServerConfig>,
|
||||
) {
|
||||
let runtime_context = McpRuntimeContext::new(
|
||||
self.thread_manager.environment_manager(),
|
||||
config.cwd.to_path_buf(),
|
||||
);
|
||||
for (name, server) in plugin_mcp_servers {
|
||||
let oauth_config = match oauth_login_support_with_runtime_context(
|
||||
&name,
|
||||
&server,
|
||||
&runtime_context,
|
||||
)
|
||||
.await
|
||||
{
|
||||
let oauth_config = match oauth_login_support(&server.transport).await {
|
||||
McpOAuthLoginSupport::Supported(config) => config,
|
||||
McpOAuthLoginSupport::Unsupported => continue,
|
||||
McpOAuthLoginSupport::Unknown(err) => {
|
||||
@@ -1596,19 +1586,12 @@ impl PluginRequestProcessor {
|
||||
let store_mode = config.mcp_oauth_credentials_store_mode;
|
||||
let callback_port = config.mcp_oauth_callback_port;
|
||||
let callback_url = config.mcp_oauth_callback_url.clone();
|
||||
let http_client = match runtime_context.resolve_streamable_http_client(&name, &server) {
|
||||
Ok(http_client) => http_client,
|
||||
Err(err) => {
|
||||
warn!("failed to resolve MCP server {name} for plugin login: {err}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let outgoing = Arc::clone(&self.outgoing);
|
||||
let notification_name = name.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let oauth_client_id = server.oauth_client_id();
|
||||
let first_attempt = perform_oauth_login_silent_with_http_client(
|
||||
let first_attempt = perform_oauth_login_silent(
|
||||
&name,
|
||||
&oauth_config.url,
|
||||
store_mode,
|
||||
@@ -1619,13 +1602,12 @@ impl PluginRequestProcessor {
|
||||
server.oauth_resource.as_deref(),
|
||||
callback_port,
|
||||
callback_url.as_deref(),
|
||||
Arc::clone(&http_client),
|
||||
)
|
||||
.await;
|
||||
|
||||
let final_result = match first_attempt {
|
||||
Err(err) if should_retry_without_scopes(&resolved_scopes, &err) => {
|
||||
perform_oauth_login_silent_with_http_client(
|
||||
perform_oauth_login_silent(
|
||||
&name,
|
||||
&oauth_config.url,
|
||||
store_mode,
|
||||
@@ -1636,7 +1618,6 @@ impl PluginRequestProcessor {
|
||||
server.oauth_resource.as_deref(),
|
||||
callback_port,
|
||||
callback_url.as_deref(),
|
||||
http_client,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
@@ -15,6 +16,7 @@ use codex_file_watcher::ThrottledWatchReceiver;
|
||||
use codex_file_watcher::WatchPath;
|
||||
use codex_file_watcher::WatchRegistration;
|
||||
use codex_protocol::protocol::TurnEnvironmentSelection;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tokio_util::sync::DropGuard;
|
||||
use tracing::warn;
|
||||
@@ -26,6 +28,7 @@ const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_millis(50);
|
||||
|
||||
pub(crate) struct SkillsWatcher {
|
||||
subscriber: FileWatcherSubscriber,
|
||||
runtime_extra_roots_registration: Mutex<WatchRegistration>,
|
||||
shutdown_token: CancellationToken,
|
||||
_shutdown_drop_guard: DropGuard,
|
||||
}
|
||||
@@ -48,6 +51,7 @@ impl SkillsWatcher {
|
||||
Self::spawn_event_loop(rx, skills_manager, outgoing, shutdown_token.child_token());
|
||||
Arc::new(Self {
|
||||
subscriber,
|
||||
runtime_extra_roots_registration: Mutex::new(WatchRegistration::default()),
|
||||
shutdown_token,
|
||||
_shutdown_drop_guard: shutdown_drop_guard,
|
||||
})
|
||||
@@ -57,6 +61,22 @@ impl SkillsWatcher {
|
||||
self.shutdown_token.cancel();
|
||||
}
|
||||
|
||||
pub(crate) fn register_runtime_extra_roots(&self, extra_roots: &[AbsolutePathBuf]) {
|
||||
let roots = extra_roots
|
||||
.iter()
|
||||
.map(|root| WatchPath {
|
||||
path: root.clone().into_path_buf(),
|
||||
recursive: true,
|
||||
})
|
||||
.collect();
|
||||
let registration = self.subscriber.register_paths(roots);
|
||||
let mut guard = self
|
||||
.runtime_extra_roots_registration
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
*guard = registration;
|
||||
}
|
||||
|
||||
pub(crate) async fn register_thread_config(
|
||||
&self,
|
||||
config: &Config,
|
||||
|
||||
@@ -71,6 +71,7 @@ use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ReviewStartParams;
|
||||
use codex_app_server_protocol::SendAddCreditsNudgeEmailParams;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::SkillsExtraRootsSetParams;
|
||||
use codex_app_server_protocol::SkillsListParams;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadCompactStartParams;
|
||||
@@ -675,6 +676,15 @@ impl McpProcess {
|
||||
self.send_request("skills/list", params).await
|
||||
}
|
||||
|
||||
/// Send a `skills/extraRoots/set` JSON-RPC request.
|
||||
pub async fn send_skills_extra_roots_set_request(
|
||||
&mut self,
|
||||
params: SkillsExtraRootsSetParams,
|
||||
) -> anyhow::Result<i64> {
|
||||
let params = Some(serde_json::to_value(params)?);
|
||||
self.send_request("skills/extraRoots/set", params).await
|
||||
}
|
||||
|
||||
/// Send a `hooks/list` JSON-RPC request.
|
||||
pub async fn send_hooks_list_request(
|
||||
&mut self,
|
||||
|
||||
@@ -13,11 +13,14 @@ use codex_app_server_protocol::PluginListParams;
|
||||
use codex_app_server_protocol::PluginListResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::SkillsChangedNotification;
|
||||
use codex_app_server_protocol::SkillsExtraRootsSetParams;
|
||||
use codex_app_server_protocol::SkillsExtraRootsSetResponse;
|
||||
use codex_app_server_protocol::SkillsListParams;
|
||||
use codex_app_server_protocol::SkillsListResponse;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_config::types::AuthCredentialsStoreMode;
|
||||
use codex_exec_server::CODEX_EXEC_SERVER_URL_ENV_VAR;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
@@ -40,6 +43,23 @@ fn write_skill(root: &TempDir, name: &str) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn expect_skills_changed_notification(
|
||||
mcp: &mut McpProcess,
|
||||
timeout_duration: Duration,
|
||||
) -> Result<()> {
|
||||
let notification = timeout(
|
||||
timeout_duration,
|
||||
mcp.read_stream_until_notification_message("skills/changed"),
|
||||
)
|
||||
.await??;
|
||||
let params = notification
|
||||
.params
|
||||
.context("skills/changed params must be present")?;
|
||||
let notification: SkillsChangedNotification = serde_json::from_value(params)?;
|
||||
assert_eq!(notification, SkillsChangedNotification {});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_plugins_enabled_config_with_base_url(
|
||||
codex_home: &std::path::Path,
|
||||
base_url: &str,
|
||||
@@ -573,6 +593,150 @@ async fn skills_list_uses_cached_result_until_force_reload() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn skills_extra_roots_set_updates_process_runtime_roots() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let cwd = TempDir::new()?;
|
||||
let extra_root = TempDir::new()?;
|
||||
let extra_skills_root = extra_root.path().join("skills");
|
||||
let skill_dir = extra_skills_root.join("runtime-skill");
|
||||
std::fs::create_dir_all(&skill_dir)?;
|
||||
std::fs::write(
|
||||
skill_dir.join("SKILL.md"),
|
||||
"---\nname: runtime-skill\ndescription: runtime skill\n---\n\n# Body\n",
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let set_request_id = mcp
|
||||
.send_skills_extra_roots_set_request(SkillsExtraRootsSetParams {
|
||||
extra_roots: vec![AbsolutePathBuf::from_absolute_path(&extra_skills_root)?],
|
||||
})
|
||||
.await?;
|
||||
let set_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(set_request_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: SkillsExtraRootsSetResponse = to_response(set_response)?;
|
||||
expect_skills_changed_notification(&mut mcp, DEFAULT_TIMEOUT).await?;
|
||||
|
||||
let skills_request_id = mcp
|
||||
.send_skills_list_request(SkillsListParams {
|
||||
cwds: vec![cwd.path().to_path_buf()],
|
||||
force_reload: false,
|
||||
})
|
||||
.await?;
|
||||
let skills_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(skills_request_id)),
|
||||
)
|
||||
.await??;
|
||||
let SkillsListResponse { data } = to_response(skills_response)?;
|
||||
assert_eq!(data.len(), 1);
|
||||
assert_eq!(data[0].errors, Vec::new());
|
||||
assert!(
|
||||
data[0]
|
||||
.skills
|
||||
.iter()
|
||||
.any(|skill| skill.name == "runtime-skill")
|
||||
);
|
||||
|
||||
let missing_root = extra_root.path().join("missing-skills");
|
||||
let reset_request_id = mcp
|
||||
.send_skills_extra_roots_set_request(SkillsExtraRootsSetParams {
|
||||
extra_roots: vec![AbsolutePathBuf::from_absolute_path(&missing_root)?],
|
||||
})
|
||||
.await?;
|
||||
let reset_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(reset_request_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: SkillsExtraRootsSetResponse = to_response(reset_response)?;
|
||||
expect_skills_changed_notification(&mut mcp, DEFAULT_TIMEOUT).await?;
|
||||
|
||||
let skills_request_id = mcp
|
||||
.send_skills_list_request(SkillsListParams {
|
||||
cwds: vec![cwd.path().to_path_buf()],
|
||||
force_reload: false,
|
||||
})
|
||||
.await?;
|
||||
let skills_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(skills_request_id)),
|
||||
)
|
||||
.await??;
|
||||
let SkillsListResponse { data } = to_response(skills_response)?;
|
||||
assert_eq!(data.len(), 1);
|
||||
assert_eq!(data[0].errors, Vec::new());
|
||||
assert!(
|
||||
data[0]
|
||||
.skills
|
||||
.iter()
|
||||
.all(|skill| skill.name != "runtime-skill")
|
||||
);
|
||||
|
||||
let clear_request_id = mcp
|
||||
.send_skills_extra_roots_set_request(SkillsExtraRootsSetParams {
|
||||
extra_roots: Vec::new(),
|
||||
})
|
||||
.await?;
|
||||
let clear_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(clear_request_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: SkillsExtraRootsSetResponse = to_response(clear_response)?;
|
||||
expect_skills_changed_notification(&mut mcp, DEFAULT_TIMEOUT).await?;
|
||||
let skills_request_id = mcp
|
||||
.send_skills_list_request(SkillsListParams {
|
||||
cwds: vec![cwd.path().to_path_buf()],
|
||||
force_reload: false,
|
||||
})
|
||||
.await?;
|
||||
let skills_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(skills_request_id)),
|
||||
)
|
||||
.await??;
|
||||
let SkillsListResponse { data } = to_response(skills_response)?;
|
||||
assert_eq!(data.len(), 1);
|
||||
assert_eq!(data[0].errors, Vec::new());
|
||||
assert!(
|
||||
data[0]
|
||||
.skills
|
||||
.iter()
|
||||
.all(|skill| skill.name != "runtime-skill")
|
||||
);
|
||||
|
||||
drop(mcp);
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
|
||||
let skills_request_id = mcp
|
||||
.send_skills_list_request(SkillsListParams {
|
||||
cwds: vec![cwd.path().to_path_buf()],
|
||||
force_reload: false,
|
||||
})
|
||||
.await?;
|
||||
let skills_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(skills_request_id)),
|
||||
)
|
||||
.await??;
|
||||
let SkillsListResponse { data } = to_response(skills_response)?;
|
||||
assert_eq!(data.len(), 1);
|
||||
assert_eq!(data[0].errors, Vec::new());
|
||||
assert!(
|
||||
data[0]
|
||||
.skills
|
||||
.iter()
|
||||
.all(|skill| skill.name != "runtime-skill")
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
|
||||
@@ -42,11 +42,8 @@ pub use mcp::McpOAuthLoginSupport;
|
||||
pub use mcp::McpOAuthScopesSource;
|
||||
pub use mcp::ResolvedMcpOAuthScopes;
|
||||
pub use mcp::compute_auth_statuses;
|
||||
pub use mcp::compute_auth_statuses_with_runtime_context;
|
||||
pub use mcp::discover_supported_scopes;
|
||||
pub use mcp::discover_supported_scopes_with_runtime_context;
|
||||
pub use mcp::oauth_login_support;
|
||||
pub use mcp::oauth_login_support_with_runtime_context;
|
||||
pub use mcp::resolve_oauth_scopes;
|
||||
pub use mcp::should_retry_without_scopes;
|
||||
|
||||
|
||||
@@ -1,21 +1,17 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use codex_config::McpServerConfig;
|
||||
use codex_config::McpServerTransportConfig;
|
||||
use codex_config::types::OAuthCredentialsStoreMode;
|
||||
use codex_exec_server::HttpClient;
|
||||
use codex_exec_server::ReqwestHttpClient;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_protocol::protocol::McpAuthStatus;
|
||||
use codex_rmcp_client::OAuthProviderError;
|
||||
use codex_rmcp_client::determine_streamable_http_auth_status;
|
||||
use codex_rmcp_client::discover_streamable_http_oauth_with_http_client;
|
||||
use codex_rmcp_client::discover_streamable_http_oauth;
|
||||
use futures::future::join_all;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::runtime::McpRuntimeContext;
|
||||
use crate::server::EffectiveMcpServer;
|
||||
|
||||
use super::CODEX_APPS_MCP_SERVER_NAME;
|
||||
@@ -56,25 +52,6 @@ pub struct McpAuthStatusEntry {
|
||||
}
|
||||
|
||||
pub async fn oauth_login_support(transport: &McpServerTransportConfig) -> McpOAuthLoginSupport {
|
||||
oauth_login_support_with_http_client(transport, Arc::new(ReqwestHttpClient)).await
|
||||
}
|
||||
|
||||
pub async fn oauth_login_support_with_runtime_context(
|
||||
server_name: &str,
|
||||
config: &McpServerConfig,
|
||||
runtime_context: &McpRuntimeContext,
|
||||
) -> McpOAuthLoginSupport {
|
||||
let http_client = match runtime_context.resolve_streamable_http_client(server_name, config) {
|
||||
Ok(http_client) => http_client,
|
||||
Err(err) => return McpOAuthLoginSupport::Unknown(anyhow::anyhow!(err)),
|
||||
};
|
||||
oauth_login_support_with_http_client(&config.transport, http_client).await
|
||||
}
|
||||
|
||||
async fn oauth_login_support_with_http_client(
|
||||
transport: &McpServerTransportConfig,
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> McpOAuthLoginSupport {
|
||||
let McpServerTransportConfig::StreamableHttp {
|
||||
url,
|
||||
bearer_token_env_var,
|
||||
@@ -89,13 +66,7 @@ async fn oauth_login_support_with_http_client(
|
||||
return McpOAuthLoginSupport::Unsupported;
|
||||
}
|
||||
|
||||
match discover_streamable_http_oauth_with_http_client(
|
||||
url,
|
||||
http_headers.clone(),
|
||||
env_http_headers.clone(),
|
||||
http_client,
|
||||
)
|
||||
.await
|
||||
match discover_streamable_http_oauth(url, http_headers.clone(), env_http_headers.clone()).await
|
||||
{
|
||||
Ok(Some(discovery)) => McpOAuthLoginSupport::Supported(McpOAuthLoginConfig {
|
||||
url: url.clone(),
|
||||
@@ -117,17 +88,6 @@ pub async fn discover_supported_scopes(
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn discover_supported_scopes_with_runtime_context(
|
||||
server_name: &str,
|
||||
config: &McpServerConfig,
|
||||
runtime_context: &McpRuntimeContext,
|
||||
) -> Option<Vec<String>> {
|
||||
match oauth_login_support_with_runtime_context(server_name, config, runtime_context).await {
|
||||
McpOAuthLoginSupport::Supported(config) => config.discovered_scopes,
|
||||
McpOAuthLoginSupport::Unsupported | McpOAuthLoginSupport::Unknown(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn resolve_oauth_scopes(
|
||||
explicit_scopes: Option<Vec<String>>,
|
||||
configured_scopes: Option<Vec<String>>,
|
||||
@@ -174,45 +134,10 @@ pub async fn compute_auth_statuses<'a, I>(
|
||||
) -> HashMap<String, McpAuthStatusEntry>
|
||||
where
|
||||
I: IntoIterator<Item = (&'a String, &'a EffectiveMcpServer)>,
|
||||
{
|
||||
compute_auth_statuses_with_http_client_resolver(servers, store_mode, auth, |_, _| {
|
||||
Ok(Arc::new(ReqwestHttpClient) as Arc<dyn HttpClient>)
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn compute_auth_statuses_with_runtime_context<'a, I>(
|
||||
servers: I,
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
auth: Option<&CodexAuth>,
|
||||
runtime_context: McpRuntimeContext,
|
||||
) -> HashMap<String, McpAuthStatusEntry>
|
||||
where
|
||||
I: IntoIterator<Item = (&'a String, &'a EffectiveMcpServer)>,
|
||||
{
|
||||
compute_auth_statuses_with_http_client_resolver(
|
||||
servers,
|
||||
store_mode,
|
||||
auth,
|
||||
move |name, config| runtime_context.resolve_streamable_http_client(name, config),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn compute_auth_statuses_with_http_client_resolver<'a, I, F>(
|
||||
servers: I,
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
auth: Option<&CodexAuth>,
|
||||
resolve_http_client: F,
|
||||
) -> HashMap<String, McpAuthStatusEntry>
|
||||
where
|
||||
I: IntoIterator<Item = (&'a String, &'a EffectiveMcpServer)>,
|
||||
F: Fn(&str, &McpServerConfig) -> Result<Arc<dyn HttpClient>, String> + Clone,
|
||||
{
|
||||
let futures = servers.into_iter().map(|(name, server)| {
|
||||
let name = name.clone();
|
||||
let config = server.configured_config().cloned();
|
||||
let resolve_http_client = resolve_http_client.clone();
|
||||
let has_runtime_auth = name == CODEX_APPS_MCP_SERVER_NAME
|
||||
&& auth.is_some_and(CodexAuth::uses_codex_backend)
|
||||
&& config.as_ref().is_some_and(|config| {
|
||||
@@ -227,15 +152,7 @@ where
|
||||
async move {
|
||||
let auth_status = match config.as_ref() {
|
||||
Some(config) => {
|
||||
match compute_auth_status(
|
||||
&name,
|
||||
config,
|
||||
store_mode,
|
||||
has_runtime_auth,
|
||||
&resolve_http_client,
|
||||
)
|
||||
.await
|
||||
{
|
||||
match compute_auth_status(&name, config, store_mode, has_runtime_auth).await {
|
||||
Ok(status) => status,
|
||||
Err(error) => {
|
||||
warn!(
|
||||
@@ -263,7 +180,6 @@ async fn compute_auth_status(
|
||||
config: &McpServerConfig,
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
has_runtime_auth: bool,
|
||||
resolve_http_client: &impl Fn(&str, &McpServerConfig) -> Result<Arc<dyn HttpClient>, String>,
|
||||
) -> Result<McpAuthStatus> {
|
||||
if !config.enabled {
|
||||
return Ok(McpAuthStatus::Unsupported);
|
||||
@@ -281,8 +197,6 @@ async fn compute_auth_status(
|
||||
http_headers,
|
||||
env_http_headers,
|
||||
} => {
|
||||
let http_client =
|
||||
resolve_http_client(server_name, config).map_err(anyhow::Error::msg)?;
|
||||
determine_streamable_http_auth_status(
|
||||
server_name,
|
||||
url,
|
||||
@@ -290,7 +204,6 @@ async fn compute_auth_status(
|
||||
http_headers.clone(),
|
||||
env_http_headers.clone(),
|
||||
store_mode,
|
||||
http_client,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -4,11 +4,8 @@ pub use auth::McpOAuthLoginSupport;
|
||||
pub use auth::McpOAuthScopesSource;
|
||||
pub use auth::ResolvedMcpOAuthScopes;
|
||||
pub use auth::compute_auth_statuses;
|
||||
pub use auth::compute_auth_statuses_with_runtime_context;
|
||||
pub use auth::discover_supported_scopes;
|
||||
pub use auth::discover_supported_scopes_with_runtime_context;
|
||||
pub use auth::oauth_login_support;
|
||||
pub use auth::oauth_login_support_with_runtime_context;
|
||||
pub use auth::resolve_oauth_scopes;
|
||||
pub use auth::should_retry_without_scopes;
|
||||
|
||||
@@ -275,11 +272,10 @@ pub async fn read_mcp_resource(
|
||||
let mut mcp_servers = effective_mcp_servers(config, auth);
|
||||
let host_owned_codex_apps_enabled = host_owned_codex_apps_enabled(config, auth);
|
||||
mcp_servers.retain(|name, _| name == server);
|
||||
let auth_statuses = compute_auth_statuses_with_runtime_context(
|
||||
let auth_statuses = compute_auth_statuses(
|
||||
mcp_servers.iter(),
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth,
|
||||
runtime_context.clone(),
|
||||
)
|
||||
.await;
|
||||
let (tx_event, rx_event) = unbounded();
|
||||
@@ -342,11 +338,10 @@ pub async fn collect_mcp_server_status_snapshot_with_detail(
|
||||
};
|
||||
}
|
||||
|
||||
let auth_status_entries = compute_auth_statuses_with_runtime_context(
|
||||
let auth_status_entries = compute_auth_statuses(
|
||||
mcp_servers.iter(),
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth,
|
||||
runtime_context.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
|
||||
@@ -46,6 +46,8 @@ use codex_async_utils::OrCancelExt;
|
||||
use codex_config::McpServerConfig;
|
||||
use codex_config::McpServerTransportConfig;
|
||||
use codex_config::types::OAuthCredentialsStoreMode;
|
||||
use codex_exec_server::HttpClient;
|
||||
use codex_exec_server::ReqwestHttpClient;
|
||||
use codex_protocol::mcp::McpServerInfo;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_rmcp_client::ExecutorStdioServerLauncher;
|
||||
@@ -584,7 +586,6 @@ async fn make_rmcp_client(
|
||||
let resolved_environment = runtime_context
|
||||
.resolve_server_environment(server_name, &config)
|
||||
.map_err(|err| StartupOutcomeError::from(anyhow!(err)))?;
|
||||
let http_config = config.clone();
|
||||
let is_local_environment = config.is_local_environment();
|
||||
let McpServerConfig { transport, .. } = config;
|
||||
|
||||
@@ -631,9 +632,10 @@ async fn make_rmcp_client(
|
||||
env_http_headers,
|
||||
bearer_token_env_var,
|
||||
} => {
|
||||
let http_client = runtime_context
|
||||
.resolve_streamable_http_client(server_name, &http_config)
|
||||
.map_err(|err| StartupOutcomeError::from(anyhow!(err)))?;
|
||||
let http_client = resolved_environment.as_ref().map_or_else(
|
||||
|| Arc::new(ReqwestHttpClient) as Arc<dyn HttpClient>,
|
||||
|environment| environment.get_http_client(),
|
||||
);
|
||||
let resolved_bearer_token =
|
||||
match resolve_bearer_token(server_name, bearer_token_env_var.as_deref()) {
|
||||
Ok(token) => token,
|
||||
|
||||
@@ -11,8 +11,6 @@ use std::time::Duration;
|
||||
|
||||
use codex_exec_server::Environment;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_exec_server::HttpClient;
|
||||
use codex_exec_server::ReqwestHttpClient;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
|
||||
@@ -89,18 +87,6 @@ impl McpRuntimeContext {
|
||||
config.environment_id
|
||||
))
|
||||
}
|
||||
|
||||
pub fn resolve_streamable_http_client(
|
||||
&self,
|
||||
server_name: &str,
|
||||
config: &codex_config::McpServerConfig,
|
||||
) -> Result<Arc<dyn HttpClient>, String> {
|
||||
let resolved_environment = self.resolve_server_environment(server_name, config)?;
|
||||
Ok(resolved_environment.map_or_else(
|
||||
|| Arc::new(ReqwestHttpClient) as Arc<dyn HttpClient>,
|
||||
|environment| environment.get_http_client(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
fn ensure_remote_stdio_cwd(
|
||||
|
||||
@@ -235,6 +235,7 @@ pub(crate) async fn skill_roots(
|
||||
config_layer_stack: &ConfigLayerStack,
|
||||
cwd: &AbsolutePathBuf,
|
||||
plugin_skill_roots: Vec<PluginSkillRoot>,
|
||||
extra_skill_roots: Vec<AbsolutePathBuf>,
|
||||
) -> Vec<SkillRoot> {
|
||||
let home_dir =
|
||||
home_dir().and_then(|path| AbsolutePathBuf::from_absolute_path_checked(path).ok());
|
||||
@@ -244,6 +245,7 @@ pub(crate) async fn skill_roots(
|
||||
cwd,
|
||||
home_dir.as_ref(),
|
||||
plugin_skill_roots,
|
||||
extra_skill_roots,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -254,6 +256,7 @@ async fn skill_roots_with_home_dir(
|
||||
cwd: &AbsolutePathBuf,
|
||||
home_dir: Option<&AbsolutePathBuf>,
|
||||
plugin_skill_roots: Vec<PluginSkillRoot>,
|
||||
extra_skill_roots: Vec<AbsolutePathBuf>,
|
||||
) -> Vec<SkillRoot> {
|
||||
let mut roots = skill_roots_from_layer_stack_inner(config_layer_stack, home_dir, fs.clone());
|
||||
roots.extend(plugin_skill_roots.into_iter().map(|root| SkillRoot {
|
||||
@@ -263,6 +266,13 @@ async fn skill_roots_with_home_dir(
|
||||
plugin_id: Some(root.plugin_id),
|
||||
plugin_root: Some(root.plugin_root),
|
||||
}));
|
||||
roots.extend(extra_skill_roots.into_iter().map(|path| SkillRoot {
|
||||
path,
|
||||
scope: SkillScope::User,
|
||||
file_system: Arc::clone(&LOCAL_FS),
|
||||
plugin_id: None,
|
||||
plugin_root: None,
|
||||
}));
|
||||
roots.extend(repo_agents_skill_roots(fs, config_layer_stack, cwd).await);
|
||||
dedupe_skill_roots_by_path(&mut roots);
|
||||
roots
|
||||
@@ -1051,7 +1061,15 @@ pub(crate) async fn skill_roots_from_layer_stack(
|
||||
cwd: &AbsolutePathBuf,
|
||||
home_dir: Option<&AbsolutePathBuf>,
|
||||
) -> Vec<SkillRoot> {
|
||||
skill_roots_with_home_dir(Some(fs), config_layer_stack, cwd, home_dir, Vec::new()).await
|
||||
skill_roots_with_home_dir(
|
||||
Some(fs),
|
||||
config_layer_stack,
|
||||
cwd,
|
||||
home_dir,
|
||||
Vec::new(),
|
||||
Vec::new(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1900,6 +1900,7 @@ async fn skill_roots_include_admin_with_lowest_priority() {
|
||||
&cfg.config_layer_stack,
|
||||
&cfg.cwd,
|
||||
Vec::new(),
|
||||
Vec::new(),
|
||||
)
|
||||
.await
|
||||
.into_iter()
|
||||
|
||||
@@ -51,6 +51,7 @@ impl SkillsLoadInput {
|
||||
pub struct SkillsManager {
|
||||
codex_home: AbsolutePathBuf,
|
||||
restriction_product: Option<Product>,
|
||||
extra_roots: RwLock<Vec<AbsolutePathBuf>>,
|
||||
cache_by_cwd: RwLock<HashMap<AbsolutePathBuf, SkillLoadOutcome>>,
|
||||
cache_by_config: RwLock<HashMap<ConfigSkillsCacheKey, SkillLoadOutcome>>,
|
||||
}
|
||||
@@ -68,6 +69,7 @@ impl SkillsManager {
|
||||
let manager = Self {
|
||||
codex_home,
|
||||
restriction_product,
|
||||
extra_roots: RwLock::new(Vec::new()),
|
||||
cache_by_cwd: RwLock::new(HashMap::new()),
|
||||
cache_by_config: RwLock::new(HashMap::new()),
|
||||
};
|
||||
@@ -81,6 +83,17 @@ impl SkillsManager {
|
||||
manager
|
||||
}
|
||||
|
||||
pub fn set_extra_roots(&self, extra_roots: Vec<AbsolutePathBuf>) {
|
||||
{
|
||||
let mut roots = self
|
||||
.extra_roots
|
||||
.write()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
*roots = extra_roots;
|
||||
}
|
||||
self.clear_cache();
|
||||
}
|
||||
|
||||
/// Load skills for an already-constructed [`Config`], avoiding any additional config-layer
|
||||
/// loading.
|
||||
///
|
||||
@@ -118,6 +131,7 @@ impl SkillsManager {
|
||||
&input.config_layer_stack,
|
||||
&input.cwd,
|
||||
input.effective_skill_roots.clone(),
|
||||
self.extra_roots(),
|
||||
)
|
||||
.await;
|
||||
if !input.bundled_skills_enabled {
|
||||
@@ -145,6 +159,7 @@ impl SkillsManager {
|
||||
&input.config_layer_stack,
|
||||
&input.cwd,
|
||||
input.effective_skill_roots.clone(),
|
||||
self.extra_roots(),
|
||||
)
|
||||
.await;
|
||||
if !bundled_skills_enabled_from_stack(&input.config_layer_stack) {
|
||||
@@ -214,6 +229,13 @@ impl SkillsManager {
|
||||
Err(err) => err.into_inner().get(cache_key).cloned(),
|
||||
}
|
||||
}
|
||||
|
||||
fn extra_roots(&self) -> Vec<AbsolutePathBuf> {
|
||||
match self.extra_roots.read() {
|
||||
Ok(roots) => roots.clone(),
|
||||
Err(err) => err.into_inner().clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
|
||||
@@ -221,6 +221,128 @@ async fn skills_for_config_reuses_cache_for_same_effective_config() {
|
||||
assert_eq!(outcome2.skills, outcome1.skills);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn set_extra_roots_replaces_runtime_roots_and_clears_cache() {
|
||||
let codex_home = tempfile::tempdir().expect("tempdir");
|
||||
let cwd = tempfile::tempdir().expect("tempdir");
|
||||
let extra_root = tempfile::tempdir().expect("tempdir");
|
||||
let config_layer_stack = config_stack(&codex_home, "");
|
||||
let skills_manager = SkillsManager::new(
|
||||
codex_home.path().abs(),
|
||||
/*bundled_skills_enabled*/ true,
|
||||
);
|
||||
|
||||
let skills_input = SkillsLoadInput::new(
|
||||
cwd.path().abs(),
|
||||
Vec::new(),
|
||||
config_layer_stack.clone(),
|
||||
bundled_skills_enabled_from_stack(&config_layer_stack),
|
||||
);
|
||||
let empty_outcome = skills_manager
|
||||
.skills_for_cwd(
|
||||
&skills_input,
|
||||
/*force_reload*/ false,
|
||||
Some(Arc::clone(&LOCAL_FS)),
|
||||
)
|
||||
.await;
|
||||
assert!(
|
||||
empty_outcome
|
||||
.skills
|
||||
.iter()
|
||||
.all(|skill| skill.name != "runtime-skill")
|
||||
);
|
||||
|
||||
let extra_skills_root = extra_root.path().join("skills");
|
||||
let skill_dir = extra_skills_root.join("runtime-skill");
|
||||
fs::create_dir_all(&skill_dir).expect("create skill dir");
|
||||
fs::write(
|
||||
skill_dir.join("SKILL.md"),
|
||||
"---\nname: runtime-skill\ndescription: runtime skill\n---\n\n# Body\n",
|
||||
)
|
||||
.expect("write skill");
|
||||
skills_manager.set_extra_roots(vec![extra_skills_root.abs()]);
|
||||
|
||||
let runtime_outcome = skills_manager
|
||||
.skills_for_cwd(
|
||||
&skills_input,
|
||||
/*force_reload*/ false,
|
||||
Some(Arc::clone(&LOCAL_FS)),
|
||||
)
|
||||
.await;
|
||||
assert!(
|
||||
runtime_outcome
|
||||
.skills
|
||||
.iter()
|
||||
.any(|skill| skill.name == "runtime-skill")
|
||||
);
|
||||
|
||||
skills_manager.set_extra_roots(vec![extra_root.path().join("missing-skills").abs()]);
|
||||
let replaced_outcome = skills_manager
|
||||
.skills_for_cwd(
|
||||
&skills_input,
|
||||
/*force_reload*/ false,
|
||||
Some(Arc::clone(&LOCAL_FS)),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(replaced_outcome.errors, Vec::new());
|
||||
assert!(
|
||||
replaced_outcome
|
||||
.skills
|
||||
.iter()
|
||||
.all(|skill| skill.name != "runtime-skill")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn set_extra_roots_applies_to_config_loads_and_empty_clears() {
|
||||
let codex_home = tempfile::tempdir().expect("tempdir");
|
||||
let cwd = tempfile::tempdir().expect("tempdir");
|
||||
let extra_root = tempfile::tempdir().expect("tempdir");
|
||||
let config_layer_stack = config_stack(&codex_home, "");
|
||||
let skills_manager = SkillsManager::new(
|
||||
codex_home.path().abs(),
|
||||
/*bundled_skills_enabled*/ true,
|
||||
);
|
||||
|
||||
let empty_outcome =
|
||||
skills_for_config_with_stack(&skills_manager, &cwd, &config_layer_stack, &[]).await;
|
||||
assert!(
|
||||
empty_outcome
|
||||
.skills
|
||||
.iter()
|
||||
.all(|skill| skill.name != "runtime-skill")
|
||||
);
|
||||
|
||||
let extra_skills_root = extra_root.path().join("skills");
|
||||
let skill_dir = extra_skills_root.join("runtime-skill");
|
||||
fs::create_dir_all(&skill_dir).expect("create skill dir");
|
||||
fs::write(
|
||||
skill_dir.join("SKILL.md"),
|
||||
"---\nname: runtime-skill\ndescription: runtime skill\n---\n\n# Body\n",
|
||||
)
|
||||
.expect("write skill");
|
||||
skills_manager.set_extra_roots(vec![extra_skills_root.abs()]);
|
||||
|
||||
let runtime_outcome =
|
||||
skills_for_config_with_stack(&skills_manager, &cwd, &config_layer_stack, &[]).await;
|
||||
assert!(
|
||||
runtime_outcome
|
||||
.skills
|
||||
.iter()
|
||||
.any(|skill| skill.name == "runtime-skill")
|
||||
);
|
||||
|
||||
skills_manager.set_extra_roots(Vec::new());
|
||||
let cleared_outcome =
|
||||
skills_for_config_with_stack(&skills_manager, &cwd, &config_layer_stack, &[]).await;
|
||||
assert!(
|
||||
cleared_outcome
|
||||
.skills
|
||||
.iter()
|
||||
.all(|skill| skill.name != "runtime-skill")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn skills_for_config_disables_plugin_skills_by_name() {
|
||||
let codex_home = tempfile::tempdir().expect("tempdir");
|
||||
|
||||
@@ -39,7 +39,7 @@ use codex_mcp::McpRuntimeContext;
|
||||
use codex_mcp::ToolInfo;
|
||||
use codex_mcp::ToolPluginProvenance;
|
||||
use codex_mcp::codex_apps_tools_cache_key;
|
||||
use codex_mcp::compute_auth_statuses_with_runtime_context;
|
||||
use codex_mcp::compute_auth_statuses;
|
||||
use codex_mcp::host_owned_codex_apps_enabled;
|
||||
use codex_mcp::with_codex_apps_mcp;
|
||||
|
||||
@@ -252,11 +252,10 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_environment_manager(
|
||||
});
|
||||
}
|
||||
|
||||
let auth_status_entries = compute_auth_statuses_with_runtime_context(
|
||||
let auth_status_entries = compute_auth_statuses(
|
||||
mcp_servers.iter(),
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth.as_ref(),
|
||||
McpRuntimeContext::new(Arc::clone(&environment_manager), config.cwd.to_path_buf()),
|
||||
)
|
||||
.await;
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ use codex_protocol::request_user_input::RequestUserInputArgs;
|
||||
use codex_protocol::request_user_input::RequestUserInputQuestion;
|
||||
use codex_protocol::request_user_input::RequestUserInputQuestionOption;
|
||||
use codex_protocol::request_user_input::RequestUserInputResponse;
|
||||
use codex_rmcp_client::perform_oauth_login_with_http_client;
|
||||
use codex_rmcp_client::perform_oauth_login;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
|
||||
@@ -23,7 +23,7 @@ use codex_mcp::ElicitationReviewerHandle;
|
||||
use codex_mcp::McpOAuthLoginSupport;
|
||||
use codex_mcp::McpPermissionPromptAutoApproveContext;
|
||||
use codex_mcp::mcp_permission_prompt_is_auto_approved;
|
||||
use codex_mcp::oauth_login_support_with_runtime_context;
|
||||
use codex_mcp::oauth_login_support;
|
||||
use codex_mcp::resolve_oauth_scopes;
|
||||
use codex_mcp::should_retry_without_scopes;
|
||||
|
||||
@@ -136,46 +136,23 @@ pub(crate) async fn maybe_install_mcp_dependencies(
|
||||
return;
|
||||
}
|
||||
|
||||
let runtime_context = match turn_context.environments.primary() {
|
||||
Some(turn_environment) => codex_mcp::McpRuntimeContext::new(
|
||||
std::sync::Arc::clone(&sess.services.environment_manager),
|
||||
turn_environment.cwd.to_path_buf(),
|
||||
),
|
||||
None => codex_mcp::McpRuntimeContext::new(
|
||||
std::sync::Arc::clone(&sess.services.environment_manager),
|
||||
#[allow(deprecated)]
|
||||
turn_context.cwd.to_path_buf(),
|
||||
),
|
||||
};
|
||||
|
||||
for (name, server_config) in added {
|
||||
let oauth_config =
|
||||
match oauth_login_support_with_runtime_context(&name, &server_config, &runtime_context)
|
||||
.await
|
||||
{
|
||||
McpOAuthLoginSupport::Supported(config) => config,
|
||||
McpOAuthLoginSupport::Unsupported => continue,
|
||||
McpOAuthLoginSupport::Unknown(err) => {
|
||||
warn!("MCP server may or may not require login for dependency {name}: {err}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let oauth_config = match oauth_login_support(&server_config.transport).await {
|
||||
McpOAuthLoginSupport::Supported(config) => config,
|
||||
McpOAuthLoginSupport::Unsupported => continue,
|
||||
McpOAuthLoginSupport::Unknown(err) => {
|
||||
warn!("MCP server may or may not require login for dependency {name}: {err}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let resolved_scopes = resolve_oauth_scopes(
|
||||
/*explicit_scopes*/ None,
|
||||
server_config.scopes.clone(),
|
||||
oauth_config.discovered_scopes.clone(),
|
||||
);
|
||||
let http_client =
|
||||
match runtime_context.resolve_streamable_http_client(&name, &server_config) {
|
||||
Ok(http_client) => http_client,
|
||||
Err(err) => {
|
||||
warn!("failed to resolve MCP server {name} for dependency login: {err}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let oauth_client_id = server_config.oauth_client_id();
|
||||
let first_attempt = perform_oauth_login_with_http_client(
|
||||
let first_attempt = perform_oauth_login(
|
||||
&name,
|
||||
&oauth_config.url,
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
@@ -186,13 +163,12 @@ pub(crate) async fn maybe_install_mcp_dependencies(
|
||||
server_config.oauth_resource.as_deref(),
|
||||
config.mcp_oauth_callback_port,
|
||||
config.mcp_oauth_callback_url.as_deref(),
|
||||
std::sync::Arc::clone(&http_client),
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Err(err) = first_attempt {
|
||||
if should_retry_without_scopes(&resolved_scopes, &err) {
|
||||
if let Err(err) = perform_oauth_login_with_http_client(
|
||||
if let Err(err) = perform_oauth_login(
|
||||
&name,
|
||||
&oauth_config.url,
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
@@ -203,7 +179,6 @@ pub(crate) async fn maybe_install_mcp_dependencies(
|
||||
server_config.oauth_resource.as_deref(),
|
||||
config.mcp_oauth_callback_port,
|
||||
config.mcp_oauth_callback_url.as_deref(),
|
||||
http_client,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -323,6 +323,8 @@ impl Session {
|
||||
effective_mcp_servers_from_configured(mcp_servers, &mcp_config, auth.as_ref());
|
||||
let host_owned_codex_apps_enabled =
|
||||
host_owned_codex_apps_enabled(&mcp_config, auth.as_ref());
|
||||
let auth_statuses =
|
||||
compute_auth_statuses(mcp_servers.iter(), store_mode, auth.as_ref()).await;
|
||||
let mcp_runtime_context = match turn_context.environments.primary() {
|
||||
Some(turn_environment) => McpRuntimeContext::new(
|
||||
Arc::clone(&self.services.environment_manager),
|
||||
@@ -334,13 +336,6 @@ impl Session {
|
||||
turn_context.cwd.to_path_buf(),
|
||||
),
|
||||
};
|
||||
let auth_statuses = compute_auth_statuses_with_runtime_context(
|
||||
mcp_servers.iter(),
|
||||
store_mode,
|
||||
auth.as_ref(),
|
||||
mcp_runtime_context.clone(),
|
||||
)
|
||||
.await;
|
||||
{
|
||||
let mut guard = self.services.mcp_startup_cancellation_token.lock().await;
|
||||
guard.cancel();
|
||||
|
||||
@@ -315,7 +315,7 @@ use crate::unified_exec::UnifiedExecProcessManager;
|
||||
use crate::windows_sandbox::WindowsSandboxLevelExt;
|
||||
use codex_core_plugins::PluginsManager;
|
||||
use codex_git_utils::get_git_repo_root;
|
||||
use codex_mcp::compute_auth_statuses_with_runtime_context;
|
||||
use codex_mcp::compute_auth_statuses;
|
||||
use codex_mcp::effective_mcp_servers_from_configured;
|
||||
use codex_mcp::host_owned_codex_apps_enabled;
|
||||
use codex_otel::SessionTelemetry;
|
||||
|
||||
@@ -621,21 +621,15 @@ impl Session {
|
||||
let auth_manager_clone = Arc::clone(&auth_manager);
|
||||
let config_for_mcp = Arc::clone(&config);
|
||||
let mcp_manager_for_mcp = Arc::clone(&mcp_manager);
|
||||
let environment_manager_for_mcp = Arc::clone(&environment_manager);
|
||||
let auth_and_mcp_fut = async move {
|
||||
let auth = auth_manager_clone.auth().await;
|
||||
let mcp_servers = mcp_manager_for_mcp
|
||||
.effective_servers(&config_for_mcp, auth.as_ref())
|
||||
.await;
|
||||
let mcp_runtime_context = McpRuntimeContext::new(
|
||||
environment_manager_for_mcp,
|
||||
config_for_mcp.cwd.to_path_buf(),
|
||||
);
|
||||
let auth_statuses = compute_auth_statuses_with_runtime_context(
|
||||
let auth_statuses = compute_auth_statuses(
|
||||
mcp_servers.iter(),
|
||||
config_for_mcp.mcp_oauth_credentials_store_mode,
|
||||
auth.as_ref(),
|
||||
mcp_runtime_context,
|
||||
)
|
||||
.await;
|
||||
(auth, mcp_servers, auth_statuses)
|
||||
|
||||
200
codex-rs/exec-server/observability.md
Normal file
200
codex-rs/exec-server/observability.md
Normal file
@@ -0,0 +1,200 @@
|
||||
# Exec-server observability proposal
|
||||
|
||||
Status: Proposed
|
||||
|
||||
## Motivation
|
||||
|
||||
`codex exec-server` runs inside execution environments where the surrounding
|
||||
services can be observed, but the work performed by the exec-server process is
|
||||
largely opaque. It accepts connections, attaches resumable sessions, spawns
|
||||
and manages commands, serves filesystem operations, proxies HTTP requests, and
|
||||
in remote mode maintains a relay connection. Failures in any of these paths
|
||||
currently have little context beyond the caller-visible error.
|
||||
|
||||
The useful Kubernetes analogy is a node agent such as the kubelet: exec-server
|
||||
is not the workload itself, but a long-running component that receives control
|
||||
requests and operates workloads within its environment. Kubernetes components
|
||||
provide component logs, configurable metrics and traces, and health endpoints
|
||||
without treating workload output as component telemetry. Exec-server should
|
||||
follow the same boundary.
|
||||
|
||||
## Goals
|
||||
|
||||
- Make standalone local and remote exec-server instances observable through the
|
||||
existing Codex OpenTelemetry and `tracing` infrastructure.
|
||||
- Provide enough lifecycle context to distinguish connectivity, protocol,
|
||||
session, subprocess, filesystem, and HTTP proxy failures.
|
||||
- Keep stdout reserved for transport behavior and emit component logs only to
|
||||
stderr or configured exporters.
|
||||
- Avoid recording user content, secrets, or unbounded-cardinality data as
|
||||
telemetry.
|
||||
- Require explicit operator opt-in before exporting metrics from a
|
||||
containerized exec-server instance.
|
||||
|
||||
## Non-goals
|
||||
|
||||
- Capturing child-process stdout or stderr as application telemetry.
|
||||
- Logging commands, environment variables, filesystem contents, HTTP bodies,
|
||||
authorization data, or full URLs.
|
||||
- Adding a separate telemetry backend or exec-server-specific configuration
|
||||
format.
|
||||
- Changing the JSON-RPC or relay protocols for the initial increment.
|
||||
|
||||
## Current state
|
||||
|
||||
The exec-server crate already emits a small number of `tracing` events for
|
||||
transport, session, and error conditions. The websocket listener also exposes
|
||||
`/readyz`. However, the standalone `codex exec-server` CLI path does not
|
||||
install a tracing subscriber or construct an OTEL provider, so those events
|
||||
are not useful in the container deployment that most needs them.
|
||||
|
||||
Other Codex long-running entrypoints already load `Config`, construct an OTEL
|
||||
provider through `codex_core::otel_init::build_provider`, attach logger and
|
||||
tracing layers to `tracing_subscriber`, and record process-start metrics when
|
||||
metrics are enabled. Exec-server should reuse that path rather than create
|
||||
telemetry machinery in the protocol and execution crate.
|
||||
|
||||
## Signal model
|
||||
|
||||
The following Kubernetes component concepts map cleanly to exec-server:
|
||||
|
||||
| Kubernetes concept | Exec-server equivalent |
|
||||
| --- | --- |
|
||||
| Component logs | Structured events about server, connection, session, relay, and managed-operation lifecycle |
|
||||
| Component metrics | Bounded counters, gauges, and duration histograms for transport and managed operations |
|
||||
| Component traces | Spans from an inbound RPC through a process, filesystem, HTTP, or relay action |
|
||||
| `/readyz` and `/livez` | Whether the service can accept work and whether the process runtime is alive |
|
||||
| Workload logs | Command stdout/stderr, which remain protocol output and are not component telemetry |
|
||||
|
||||
All signals must use low-cardinality operation names and outcomes. Session IDs
|
||||
and process IDs may be useful for log or trace correlation after a privacy
|
||||
review, but must not become metric labels. Commands, paths, body content, auth
|
||||
material, environment values, and full URLs are excluded from emitted data.
|
||||
|
||||
## Initial increment: opt-in OTEL bootstrap
|
||||
|
||||
The first change should expose the tracing already present in exec-server and
|
||||
establish the exporter path needed for later instrumentation.
|
||||
|
||||
Implementation ownership remains in `codex-rs/cli/src/main.rs`, where
|
||||
`codex exec-server` is launched and where `codex-core` configuration and OTEL
|
||||
bootstrap dependencies already exist:
|
||||
|
||||
1. Load Codex configuration for every `exec-server` invocation, including
|
||||
local listener mode. Remote mode reuses the same loaded config for
|
||||
authentication.
|
||||
2. Build an OTEL provider with service name `codex-exec-server` and
|
||||
`default_analytics_enabled = false`.
|
||||
3. Attach a stderr formatting layer filtered through `RUST_LOG`, plus the
|
||||
configured OTEL logger and tracing layers, before entering local serving or
|
||||
remote relay operation.
|
||||
4. Invoke the existing process-start metric helper. It records only when the
|
||||
operator has explicitly enabled metrics export.
|
||||
5. Retain the provider for the server process lifetime so exporters can flush
|
||||
on shutdown.
|
||||
|
||||
This increment intentionally does not install SQLite telemetry: unlike
|
||||
app-server and MCP server, standalone exec-server does not initialize the
|
||||
Codex state database.
|
||||
|
||||
### Configuration behavior
|
||||
|
||||
The existing configuration surface applies without new CLI flags:
|
||||
|
||||
| Signal | Enablement |
|
||||
| --- | --- |
|
||||
| Local stderr logs | Controlled by `RUST_LOG`; written to stderr only |
|
||||
| Exported logs | Configure `otel.exporter` |
|
||||
| Exported traces | Configure `otel.trace_exporter` |
|
||||
| Exported metrics | Set `analytics.enabled = true`; optionally override `otel.metrics_exporter` |
|
||||
|
||||
When `analytics.enabled` is absent, exec-server does not export metrics. This
|
||||
differs intentionally from interactive entrypoints that can default analytics
|
||||
on: exec-server may run as a long-lived container component, so telemetry
|
||||
export is an operator deployment choice. If analytics is enabled without a
|
||||
metrics exporter override, existing configuration defaults apply.
|
||||
|
||||
### Transport invariants
|
||||
|
||||
- Local websocket mode continues to write the bound `ws://` startup URL to
|
||||
stdout for existing launch scripts.
|
||||
- Stdio mode continues to reserve stdout for JSON-RPC frames.
|
||||
- Subscriber output is always sent to stderr or OTEL exporters.
|
||||
- Local startup now loads configuration before opening its listener; malformed
|
||||
strict configuration continues to fail before serving work.
|
||||
|
||||
## Follow-on increments
|
||||
|
||||
Once the exporter path is deployed and its operational value is verified,
|
||||
later changes can add signals at existing component boundaries.
|
||||
|
||||
### Lifecycle events and spans
|
||||
|
||||
Emit bounded structured events and spans for:
|
||||
|
||||
- process startup and shutdown;
|
||||
- local connections and remote relay registration, connection, disconnection,
|
||||
and retry;
|
||||
- session attach, resume, and eviction;
|
||||
- RPC completion by method family and outcome;
|
||||
- managed process start, exit, and termination;
|
||||
- filesystem and HTTP proxy operation completion by operation family and
|
||||
outcome.
|
||||
|
||||
These events should identify operation type and status without including
|
||||
payload content. Trace-context propagation over the exec-server protocol can
|
||||
be considered separately after local spans are useful.
|
||||
|
||||
### Metrics
|
||||
|
||||
Add OTEL instruments only with fixed label domains, such as transport,
|
||||
operation family, result, and HTTP status class:
|
||||
|
||||
| Candidate metric | Instrument | Labels |
|
||||
| --- | --- | --- |
|
||||
| `exec_server.connections.active` | gauge | `transport` |
|
||||
| `exec_server.connections.total` | counter | `transport`, `result` |
|
||||
| `exec_server.requests.total` | counter | `operation`, `result` |
|
||||
| `exec_server.request.duration` | histogram | `operation`, `result` |
|
||||
| `exec_server.processes.active` | gauge | none |
|
||||
| `exec_server.process.duration` | histogram | `result` |
|
||||
| `exec_server.relay.reconnects` | counter | `reason` |
|
||||
|
||||
Metric labels must never include session IDs, process IDs, commands, paths,
|
||||
hostnames, or URLs.
|
||||
|
||||
### Health
|
||||
|
||||
Websocket mode already returns success from `/readyz`. A later increment
|
||||
should separate:
|
||||
|
||||
- `/livez`: the server runtime is responsive and has not begun shutdown.
|
||||
- `/readyz`: the server can accept useful work.
|
||||
|
||||
For remote environment mode, readiness should reflect an active registration
|
||||
and relay connection. Transient downstream failures should affect readiness
|
||||
rather than liveness, so container restarts do not amplify backend outages.
|
||||
|
||||
## Validation and rollout
|
||||
|
||||
The initial implementation should test that:
|
||||
|
||||
- local websocket startup still writes its endpoint on stdout while an enabled
|
||||
`RUST_LOG` filter makes exec-server component logs visible on stderr;
|
||||
- configured log and trace exporters can be constructed under the
|
||||
`codex-exec-server` service name;
|
||||
- metrics are absent by default and present only when analytics is explicitly
|
||||
enabled;
|
||||
- strict configuration errors still prevent listener startup.
|
||||
|
||||
Deployment can start by enabling stderr collection for container instances,
|
||||
then opting selected environments into OTEL log or trace export. Subsequent
|
||||
signal additions should use the redaction and cardinality rules above and be
|
||||
validated against real incident queries before adding further volume.
|
||||
|
||||
## References
|
||||
|
||||
- [Kubernetes logging architecture](https://kubernetes.io/docs/concepts/cluster-administration/logging/)
|
||||
- [Kubernetes system component metrics](https://kubernetes.io/docs/concepts/cluster-administration/system-metrics/)
|
||||
- [Kubernetes system component traces](https://kubernetes.io/docs/concepts/cluster-administration/system-traces/)
|
||||
- [Kubernetes API health endpoints](https://kubernetes.io/docs/reference/using-api/health-checks/)
|
||||
@@ -20,8 +20,6 @@ pub(crate) mod response_body_stream;
|
||||
#[path = "rpc_http_client.rs"]
|
||||
mod rpc_http_client;
|
||||
|
||||
pub use reqwest_http_client::HTTP_REQUEST_NO_PROXY_HEADER;
|
||||
pub use reqwest_http_client::HTTP_REQUEST_NO_REDIRECTS_HEADER;
|
||||
pub(crate) use reqwest_http_client::PendingReqwestHttpBodyStream;
|
||||
pub use reqwest_http_client::ReqwestHttpClient;
|
||||
pub(crate) use reqwest_http_client::ReqwestHttpRequestRunner;
|
||||
|
||||
@@ -35,9 +35,6 @@ use crate::rpc::invalid_params;
|
||||
#[derive(Clone, Default)]
|
||||
pub struct ReqwestHttpClient;
|
||||
|
||||
pub const HTTP_REQUEST_NO_PROXY_HEADER: &str = "x-codex-internal-http-no-proxy";
|
||||
pub const HTTP_REQUEST_NO_REDIRECTS_HEADER: &str = "x-codex-internal-http-no-redirects";
|
||||
|
||||
/// Streaming response state held between the initial HTTP response and
|
||||
/// downstream body-delta forwarding.
|
||||
pub(crate) struct PendingReqwestHttpBodyStream {
|
||||
@@ -52,22 +49,13 @@ pub(crate) struct ReqwestHttpRequestRunner {
|
||||
}
|
||||
|
||||
impl ReqwestHttpClient {
|
||||
fn build_client(
|
||||
timeout_ms: Option<u64>,
|
||||
request_policy: HttpRequestPolicy,
|
||||
) -> Result<reqwest::Client, ExecServerError> {
|
||||
let mut builder = match timeout_ms {
|
||||
fn build_client(timeout_ms: Option<u64>) -> Result<reqwest::Client, ExecServerError> {
|
||||
let builder = match timeout_ms {
|
||||
None => reqwest::Client::builder(),
|
||||
Some(timeout_ms) => {
|
||||
reqwest::Client::builder().timeout(Duration::from_millis(timeout_ms))
|
||||
}
|
||||
};
|
||||
if request_policy.no_proxy {
|
||||
builder = builder.no_proxy();
|
||||
}
|
||||
if request_policy.no_redirects {
|
||||
builder = builder.redirect(reqwest::redirect::Policy::none());
|
||||
}
|
||||
build_reqwest_client_with_custom_ca(builder)
|
||||
.map_err(|error| ExecServerError::HttpRequest(error.to_string()))
|
||||
}
|
||||
@@ -79,7 +67,7 @@ impl HttpClient for ReqwestHttpClient {
|
||||
params: HttpRequestParams,
|
||||
) -> BoxFuture<'_, Result<HttpRequestResponse, ExecServerError>> {
|
||||
async move {
|
||||
let runner = ReqwestHttpRequestRunner::new(¶ms)
|
||||
let runner = ReqwestHttpRequestRunner::new(params.timeout_ms)
|
||||
.map_err(|error| ExecServerError::HttpRequest(error.message))?;
|
||||
let (response, _) = runner
|
||||
.run(HttpRequestParams {
|
||||
@@ -98,7 +86,7 @@ impl HttpClient for ReqwestHttpClient {
|
||||
params: HttpRequestParams,
|
||||
) -> BoxFuture<'_, Result<(HttpRequestResponse, HttpResponseBodyStream), ExecServerError>> {
|
||||
async move {
|
||||
let runner = ReqwestHttpRequestRunner::new(¶ms)
|
||||
let runner = ReqwestHttpRequestRunner::new(params.timeout_ms)
|
||||
.map_err(|error| ExecServerError::HttpRequest(error.message))?;
|
||||
let (response, pending_stream) = runner
|
||||
.run(HttpRequestParams {
|
||||
@@ -122,12 +110,9 @@ impl HttpClient for ReqwestHttpClient {
|
||||
}
|
||||
|
||||
impl ReqwestHttpRequestRunner {
|
||||
pub(crate) fn new(params: &HttpRequestParams) -> Result<Self, JSONRPCErrorError> {
|
||||
let client = ReqwestHttpClient::build_client(
|
||||
params.timeout_ms,
|
||||
HttpRequestPolicy::from_headers(¶ms.headers),
|
||||
)
|
||||
.map_err(|error| internal_error(error.to_string()))?;
|
||||
pub(crate) fn new(timeout_ms: Option<u64>) -> Result<Self, JSONRPCErrorError> {
|
||||
let client = ReqwestHttpClient::build_client(timeout_ms)
|
||||
.map_err(|error| internal_error(error.to_string()))?;
|
||||
Ok(Self { client })
|
||||
}
|
||||
|
||||
@@ -254,9 +239,6 @@ impl ReqwestHttpRequestRunner {
|
||||
fn build_headers(headers: Vec<HttpHeader>) -> Result<HeaderMap, JSONRPCErrorError> {
|
||||
let mut header_map = HeaderMap::new();
|
||||
for header in headers {
|
||||
if is_internal_http_request_header(&header.name) {
|
||||
continue;
|
||||
}
|
||||
let name = HeaderName::from_bytes(header.name.as_bytes()).map_err(|error| {
|
||||
invalid_params(format!("http/request header name is invalid: {error}"))
|
||||
})?;
|
||||
@@ -283,72 +265,3 @@ impl ReqwestHttpRequestRunner {
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
|
||||
struct HttpRequestPolicy {
|
||||
no_proxy: bool,
|
||||
no_redirects: bool,
|
||||
}
|
||||
|
||||
impl HttpRequestPolicy {
|
||||
fn from_headers(headers: &[HttpHeader]) -> Self {
|
||||
Self {
|
||||
no_proxy: has_internal_http_request_header(headers, HTTP_REQUEST_NO_PROXY_HEADER),
|
||||
no_redirects: has_internal_http_request_header(
|
||||
headers,
|
||||
HTTP_REQUEST_NO_REDIRECTS_HEADER,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn has_internal_http_request_header(headers: &[HttpHeader], expected_name: &str) -> bool {
|
||||
headers
|
||||
.iter()
|
||||
.any(|header| header.name.eq_ignore_ascii_case(expected_name))
|
||||
}
|
||||
|
||||
fn is_internal_http_request_header(header_name: &str) -> bool {
|
||||
header_name.eq_ignore_ascii_case(HTTP_REQUEST_NO_PROXY_HEADER)
|
||||
|| header_name.eq_ignore_ascii_case(HTTP_REQUEST_NO_REDIRECTS_HEADER)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn request_policy_reads_internal_headers_without_forwarding_them() {
|
||||
let headers = vec![
|
||||
HttpHeader {
|
||||
name: HTTP_REQUEST_NO_PROXY_HEADER.to_string(),
|
||||
value: "true".to_string(),
|
||||
},
|
||||
HttpHeader {
|
||||
name: HTTP_REQUEST_NO_REDIRECTS_HEADER.to_string(),
|
||||
value: "true".to_string(),
|
||||
},
|
||||
HttpHeader {
|
||||
name: "accept".to_string(),
|
||||
value: "application/json".to_string(),
|
||||
},
|
||||
];
|
||||
|
||||
assert_eq!(
|
||||
HttpRequestPolicy::from_headers(&headers),
|
||||
HttpRequestPolicy {
|
||||
no_proxy: true,
|
||||
no_redirects: true,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
ReqwestHttpRequestRunner::build_headers(headers)
|
||||
.expect("headers should build")
|
||||
.get("accept")
|
||||
.expect("accept header should be forwarded"),
|
||||
"application/json"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,22 +94,6 @@ impl EnvironmentManager {
|
||||
Self::from_snapshot(provider.snapshot().await?, local_runtime_paths)
|
||||
}
|
||||
|
||||
/// Builds a manager from `CODEX_HOME` while omitting the synthetic local
|
||||
/// environment from the loaded snapshot.
|
||||
///
|
||||
/// This preserves configured remote environments for callers that need
|
||||
/// environment-aware HTTP routing but do not have local runtime paths
|
||||
/// available to construct the local execution environment.
|
||||
pub async fn from_codex_home_without_local(
|
||||
codex_home: impl AsRef<std::path::Path>,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
let provider = environment_provider_from_codex_home(codex_home.as_ref())?;
|
||||
Self::from_snapshot(
|
||||
without_local_environment(provider.snapshot().await?),
|
||||
/*local_runtime_paths*/ None,
|
||||
)
|
||||
}
|
||||
|
||||
/// Builds a manager from the legacy environment-variable provider without
|
||||
/// reading user config files from `CODEX_HOME`.
|
||||
pub async fn from_env(
|
||||
@@ -294,16 +278,6 @@ impl EnvironmentManager {
|
||||
}
|
||||
}
|
||||
|
||||
fn without_local_environment(
|
||||
mut snapshot: EnvironmentProviderSnapshot,
|
||||
) -> EnvironmentProviderSnapshot {
|
||||
snapshot.include_local = false;
|
||||
if snapshot.default == EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string()) {
|
||||
snapshot.default = EnvironmentDefault::Disabled;
|
||||
}
|
||||
snapshot
|
||||
}
|
||||
|
||||
/// Concrete execution/filesystem environment selected for a session.
|
||||
///
|
||||
/// This bundles the selected backend metadata together with the local runtime
|
||||
@@ -459,7 +433,6 @@ mod tests {
|
||||
use super::EnvironmentManager;
|
||||
use super::LOCAL_ENVIRONMENT_ID;
|
||||
use super::REMOTE_ENVIRONMENT_ID;
|
||||
use super::without_local_environment;
|
||||
use crate::ExecServerRuntimePaths;
|
||||
use crate::ProcessId;
|
||||
use crate::environment_provider::EnvironmentDefault;
|
||||
@@ -755,16 +728,16 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_snapshot_without_local_environment_disables_local_default() {
|
||||
let snapshot = EnvironmentProviderSnapshot {
|
||||
let mut snapshot = EnvironmentProviderSnapshot {
|
||||
environments: Vec::new(),
|
||||
default: EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string()),
|
||||
include_local: true,
|
||||
};
|
||||
let manager = EnvironmentManager::from_snapshot(
|
||||
without_local_environment(snapshot),
|
||||
/*local_runtime_paths*/ None,
|
||||
)
|
||||
.expect("environment manager");
|
||||
snapshot.include_local = false;
|
||||
snapshot.default = EnvironmentDefault::Disabled;
|
||||
let manager =
|
||||
EnvironmentManager::from_snapshot(snapshot, /*local_runtime_paths*/ None)
|
||||
.expect("environment manager");
|
||||
|
||||
assert!(manager.default_environment().is_none());
|
||||
assert_eq!(manager.default_environment_id(), None);
|
||||
@@ -772,37 +745,6 @@ mod tests {
|
||||
assert_local_environment_unavailable(&manager);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_snapshot_without_local_environment_keeps_remote_default() {
|
||||
let snapshot = EnvironmentProviderSnapshot {
|
||||
environments: vec![(
|
||||
REMOTE_ENVIRONMENT_ID.to_string(),
|
||||
Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string()))
|
||||
.expect("remote environment"),
|
||||
)],
|
||||
default: EnvironmentDefault::EnvironmentId(REMOTE_ENVIRONMENT_ID.to_string()),
|
||||
include_local: true,
|
||||
};
|
||||
let manager = EnvironmentManager::from_snapshot(
|
||||
without_local_environment(snapshot),
|
||||
/*local_runtime_paths*/ None,
|
||||
)
|
||||
.expect("environment manager");
|
||||
|
||||
assert_eq!(
|
||||
manager.default_environment_id(),
|
||||
Some(REMOTE_ENVIRONMENT_ID)
|
||||
);
|
||||
assert!(
|
||||
manager
|
||||
.default_environment()
|
||||
.expect("remote default environment")
|
||||
.is_remote()
|
||||
);
|
||||
assert!(manager.get_environment(LOCAL_ENVIRONMENT_ID).is_none());
|
||||
assert_local_environment_unavailable(&manager);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_environment_returns_none_for_unknown_id() {
|
||||
let manager = EnvironmentManager::default_for_tests();
|
||||
|
||||
@@ -25,8 +25,6 @@ mod server;
|
||||
|
||||
pub use client::ExecServerClient;
|
||||
pub use client::ExecServerError;
|
||||
pub use client::http_client::HTTP_REQUEST_NO_PROXY_HEADER;
|
||||
pub use client::http_client::HTTP_REQUEST_NO_REDIRECTS_HEADER;
|
||||
pub use client::http_client::HttpResponseBodyStream;
|
||||
pub use client::http_client::ReqwestHttpClient;
|
||||
pub use client_api::ExecServerClientConnectOptions;
|
||||
|
||||
@@ -178,7 +178,9 @@ impl ExecServerHandler {
|
||||
if stream_response {
|
||||
self.reserve_http_body_stream(&http_request_id).await?;
|
||||
}
|
||||
let response = ReqwestHttpRequestRunner::new(¶ms)?.run(params).await;
|
||||
let response = ReqwestHttpRequestRunner::new(params.timeout_ms)?
|
||||
.run(params)
|
||||
.await;
|
||||
if response.is_err() && stream_response {
|
||||
self.release_http_body_stream(&http_request_id).await;
|
||||
}
|
||||
|
||||
@@ -1,23 +1,20 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Error;
|
||||
use anyhow::Result;
|
||||
use codex_exec_server::HttpClient;
|
||||
use codex_exec_server::HttpRequestParams;
|
||||
use codex_exec_server::ReqwestHttpClient;
|
||||
use codex_protocol::protocol::McpAuthStatus;
|
||||
use reqwest::Client;
|
||||
use reqwest::StatusCode;
|
||||
use reqwest::Url;
|
||||
use reqwest::header::AUTHORIZATION;
|
||||
use reqwest::header::HeaderMap;
|
||||
use reqwest::header::WWW_AUTHENTICATE;
|
||||
use serde::Deserialize;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::oauth::has_oauth_tokens;
|
||||
use crate::utils::apply_default_headers;
|
||||
use crate::utils::build_default_headers;
|
||||
use crate::utils::oauth_discovery_headers;
|
||||
use codex_config::types::OAuthCredentialsStoreMode;
|
||||
|
||||
const DISCOVERY_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
@@ -29,14 +26,6 @@ pub struct StreamableHttpOAuthDiscovery {
|
||||
pub scopes_supported: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) struct StreamableHttpOAuthMetadata {
|
||||
pub(crate) authorization_endpoint: String,
|
||||
pub(crate) token_endpoint: String,
|
||||
pub(crate) registration_endpoint: Option<String>,
|
||||
pub(crate) scopes_supported: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
/// Determine the authentication status for a streamable HTTP MCP server.
|
||||
pub async fn determine_streamable_http_auth_status(
|
||||
server_name: &str,
|
||||
@@ -45,7 +34,6 @@ pub async fn determine_streamable_http_auth_status(
|
||||
http_headers: Option<HashMap<String, String>>,
|
||||
env_http_headers: Option<HashMap<String, String>>,
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> Result<McpAuthStatus> {
|
||||
if bearer_token_env_var.is_some() {
|
||||
return Ok(McpAuthStatus::BearerToken);
|
||||
@@ -60,7 +48,7 @@ pub async fn determine_streamable_http_auth_status(
|
||||
return Ok(McpAuthStatus::OAuth);
|
||||
}
|
||||
|
||||
match discover_streamable_http_oauth_with_headers(url, &default_headers, http_client).await {
|
||||
match discover_streamable_http_oauth_with_headers(url, &default_headers).await {
|
||||
Ok(Some(_)) => Ok(McpAuthStatus::NotLoggedIn),
|
||||
Ok(None) => Ok(McpAuthStatus::Unsupported),
|
||||
Err(error) => {
|
||||
@@ -74,18 +62,8 @@ pub async fn determine_streamable_http_auth_status(
|
||||
|
||||
/// Attempt to determine whether a streamable HTTP MCP server advertises OAuth login.
|
||||
pub async fn supports_oauth_login(url: &str) -> Result<bool> {
|
||||
supports_oauth_login_with_http_client(url, Arc::new(ReqwestHttpClient)).await
|
||||
}
|
||||
|
||||
pub async fn supports_oauth_login_with_http_client(
|
||||
url: &str,
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> Result<bool> {
|
||||
Ok(discover_streamable_http_oauth_with_http_client(
|
||||
url,
|
||||
/*http_headers*/ None,
|
||||
/*env_http_headers*/ None,
|
||||
http_client,
|
||||
Ok(discover_streamable_http_oauth(
|
||||
url, /*http_headers*/ None, /*env_http_headers*/ None,
|
||||
)
|
||||
.await?
|
||||
.is_some())
|
||||
@@ -96,54 +74,63 @@ pub async fn discover_streamable_http_oauth(
|
||||
http_headers: Option<HashMap<String, String>>,
|
||||
env_http_headers: Option<HashMap<String, String>>,
|
||||
) -> Result<Option<StreamableHttpOAuthDiscovery>> {
|
||||
discover_streamable_http_oauth_with_http_client(
|
||||
url,
|
||||
http_headers,
|
||||
env_http_headers,
|
||||
Arc::new(ReqwestHttpClient),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn discover_streamable_http_oauth_with_http_client(
|
||||
url: &str,
|
||||
http_headers: Option<HashMap<String, String>>,
|
||||
env_http_headers: Option<HashMap<String, String>>,
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> Result<Option<StreamableHttpOAuthDiscovery>> {
|
||||
Ok(
|
||||
discover_streamable_http_oauth_metadata(url, http_headers, env_http_headers, http_client)
|
||||
.await?
|
||||
.map(|metadata| StreamableHttpOAuthDiscovery {
|
||||
scopes_supported: metadata.scopes_supported,
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn discover_streamable_http_oauth_metadata(
|
||||
url: &str,
|
||||
http_headers: Option<HashMap<String, String>>,
|
||||
env_http_headers: Option<HashMap<String, String>>,
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> Result<Option<StreamableHttpOAuthMetadata>> {
|
||||
let default_headers = build_default_headers(http_headers, env_http_headers)?;
|
||||
discover_streamable_http_oauth_with_headers(url, &default_headers, http_client).await
|
||||
discover_streamable_http_oauth_with_headers(url, &default_headers).await
|
||||
}
|
||||
|
||||
async fn discover_streamable_http_oauth_with_headers(
|
||||
url: &str,
|
||||
default_headers: &HeaderMap,
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> Result<Option<StreamableHttpOAuthMetadata>> {
|
||||
) -> Result<Option<StreamableHttpOAuthDiscovery>> {
|
||||
let base_url = Url::parse(url)?;
|
||||
let request_headers = oauth_request_headers(default_headers)?;
|
||||
if let Some(metadata) =
|
||||
try_discover_oauth_server(&base_url, &request_headers, Arc::clone(&http_client)).await?
|
||||
{
|
||||
return Ok(Some(metadata));
|
||||
|
||||
// Use no_proxy to avoid a bug in the system-configuration crate that
|
||||
// can result in a panic. See #8912.
|
||||
let builder = Client::builder().timeout(DISCOVERY_TIMEOUT).no_proxy();
|
||||
let client = apply_default_headers(builder, default_headers).build()?;
|
||||
|
||||
let mut last_error: Option<Error> = None;
|
||||
for candidate_path in discovery_paths(base_url.path()) {
|
||||
let mut discovery_url = base_url.clone();
|
||||
discovery_url.set_path(&candidate_path);
|
||||
|
||||
let response = match client
|
||||
.get(discovery_url.clone())
|
||||
.header(OAUTH_DISCOVERY_HEADER, OAUTH_DISCOVERY_VERSION)
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(response) => response,
|
||||
Err(err) => {
|
||||
last_error = Some(err.into());
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if response.status() != StatusCode::OK {
|
||||
continue;
|
||||
}
|
||||
|
||||
let metadata = match response.json::<OAuthDiscoveryMetadata>().await {
|
||||
Ok(metadata) => metadata,
|
||||
Err(err) => {
|
||||
last_error = Some(err.into());
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if metadata.authorization_endpoint.is_some() && metadata.token_endpoint.is_some() {
|
||||
return Ok(Some(StreamableHttpOAuthDiscovery {
|
||||
scopes_supported: normalize_scopes(metadata.scopes_supported),
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
discover_oauth_server_via_resource_metadata(&base_url, &request_headers, http_client).await
|
||||
if let Some(err) = last_error {
|
||||
debug!("OAuth discovery requests failed for {url}: {err:?}");
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
@@ -153,19 +140,9 @@ struct OAuthDiscoveryMetadata {
|
||||
#[serde(default)]
|
||||
token_endpoint: Option<String>,
|
||||
#[serde(default)]
|
||||
registration_endpoint: Option<String>,
|
||||
#[serde(default)]
|
||||
scopes_supported: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct ResourceServerMetadata {
|
||||
#[serde(default)]
|
||||
authorization_server: Option<String>,
|
||||
#[serde(default)]
|
||||
authorization_servers: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
fn normalize_scopes(scopes_supported: Option<Vec<String>>) -> Option<Vec<String>> {
|
||||
let scopes_supported = scopes_supported?;
|
||||
|
||||
@@ -188,326 +165,30 @@ fn normalize_scopes(scopes_supported: Option<Vec<String>>) -> Option<Vec<String>
|
||||
}
|
||||
}
|
||||
|
||||
fn oauth_request_headers(
|
||||
default_headers: &HeaderMap,
|
||||
) -> Result<Vec<codex_exec_server::HttpHeader>> {
|
||||
let mut request_headers = default_headers.clone();
|
||||
request_headers.insert(OAUTH_DISCOVERY_HEADER, OAUTH_DISCOVERY_VERSION.parse()?);
|
||||
Ok(oauth_discovery_headers(&request_headers))
|
||||
}
|
||||
|
||||
async fn try_discover_oauth_server(
|
||||
base_url: &Url,
|
||||
request_headers: &[codex_exec_server::HttpHeader],
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> Result<Option<StreamableHttpOAuthMetadata>> {
|
||||
for discovery_url in discovery_urls(base_url) {
|
||||
if let Some(metadata) =
|
||||
fetch_authorization_metadata(&discovery_url, request_headers, Arc::clone(&http_client))
|
||||
.await?
|
||||
{
|
||||
return Ok(Some(metadata));
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn discover_oauth_server_via_resource_metadata(
|
||||
base_url: &Url,
|
||||
request_headers: &[codex_exec_server::HttpHeader],
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> Result<Option<StreamableHttpOAuthMetadata>> {
|
||||
let Some(resource_metadata_url) =
|
||||
discover_resource_metadata_url(base_url, request_headers, Arc::clone(&http_client)).await?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
let Some(resource_metadata) = fetch_resource_metadata_from_url(
|
||||
&resource_metadata_url,
|
||||
request_headers,
|
||||
Arc::clone(&http_client),
|
||||
)
|
||||
.await?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let candidates = resource_metadata.authorization_server.into_iter().chain(
|
||||
resource_metadata
|
||||
.authorization_servers
|
||||
.into_iter()
|
||||
.flatten(),
|
||||
);
|
||||
for candidate in candidates {
|
||||
let candidate = candidate.trim();
|
||||
if candidate.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let candidate_url = match Url::parse(candidate) {
|
||||
Ok(url) => url,
|
||||
Err(_) => match resource_metadata_url.join(candidate) {
|
||||
Ok(url) => url,
|
||||
Err(err) => {
|
||||
debug!("failed to resolve authorization server URL `{candidate}`: {err}");
|
||||
continue;
|
||||
}
|
||||
},
|
||||
};
|
||||
if candidate_url.path().contains("/.well-known/") {
|
||||
if let Some(metadata) = fetch_authorization_metadata(
|
||||
&candidate_url,
|
||||
request_headers,
|
||||
Arc::clone(&http_client),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
return Ok(Some(metadata));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(metadata) =
|
||||
try_discover_oauth_server(&candidate_url, request_headers, Arc::clone(&http_client))
|
||||
.await?
|
||||
{
|
||||
return Ok(Some(metadata));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn discover_resource_metadata_url(
|
||||
base_url: &Url,
|
||||
request_headers: &[codex_exec_server::HttpHeader],
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> Result<Option<Url>> {
|
||||
if let Some(resource_metadata_url) = fetch_resource_metadata_url(
|
||||
base_url,
|
||||
base_url,
|
||||
request_headers,
|
||||
Arc::clone(&http_client),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
return Ok(Some(resource_metadata_url));
|
||||
}
|
||||
|
||||
for candidate_path in well_known_paths(base_url.path(), "oauth-protected-resource") {
|
||||
let mut discovery_url = base_url.clone();
|
||||
discovery_url.set_query(None);
|
||||
discovery_url.set_fragment(None);
|
||||
discovery_url.set_path(&candidate_path);
|
||||
if let Some(resource_metadata_url) = fetch_resource_metadata_url(
|
||||
&discovery_url,
|
||||
base_url,
|
||||
request_headers,
|
||||
Arc::clone(&http_client),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
return Ok(Some(resource_metadata_url));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn fetch_resource_metadata_url(
|
||||
url: &Url,
|
||||
base_url: &Url,
|
||||
request_headers: &[codex_exec_server::HttpHeader],
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> Result<Option<Url>> {
|
||||
let response = match oauth_get(url, request_headers, http_client).await {
|
||||
Ok(response) => response,
|
||||
Err(err) => {
|
||||
debug!("resource metadata probe failed for {url}: {err:?}");
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
if response.status == StatusCode::OK.as_u16() {
|
||||
return Ok(Some(url.clone()));
|
||||
}
|
||||
if response.status != StatusCode::UNAUTHORIZED.as_u16() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Ok(response
|
||||
.headers
|
||||
.iter()
|
||||
.filter(|header| header.name.eq_ignore_ascii_case(WWW_AUTHENTICATE.as_str()))
|
||||
.find_map(|header| extract_resource_metadata_url_from_header(&header.value, base_url)))
|
||||
}
|
||||
|
||||
async fn fetch_resource_metadata_from_url(
|
||||
resource_metadata_url: &Url,
|
||||
request_headers: &[codex_exec_server::HttpHeader],
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> Result<Option<ResourceServerMetadata>> {
|
||||
let response = match oauth_get(resource_metadata_url, request_headers, http_client).await {
|
||||
Ok(response) => response,
|
||||
Err(err) => {
|
||||
debug!("resource metadata request failed for {resource_metadata_url}: {err:?}");
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
if response.status != StatusCode::OK.as_u16() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Ok(Some(serde_json::from_slice(&response.body.into_inner())?))
|
||||
}
|
||||
|
||||
async fn fetch_authorization_metadata(
|
||||
discovery_url: &Url,
|
||||
request_headers: &[codex_exec_server::HttpHeader],
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> Result<Option<StreamableHttpOAuthMetadata>> {
|
||||
let response = match oauth_get(discovery_url, request_headers, http_client).await {
|
||||
Ok(response) => response,
|
||||
Err(err) => {
|
||||
debug!("OAuth discovery request failed for {discovery_url}: {err:?}");
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
if response.status != StatusCode::OK.as_u16() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let metadata =
|
||||
match serde_json::from_slice::<OAuthDiscoveryMetadata>(&response.body.into_inner()) {
|
||||
Ok(metadata) => metadata,
|
||||
Err(err) => {
|
||||
debug!("failed to parse OAuth metadata for {discovery_url}: {err}");
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
let (Some(authorization_endpoint), Some(token_endpoint)) =
|
||||
(metadata.authorization_endpoint, metadata.token_endpoint)
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
Ok(Some(StreamableHttpOAuthMetadata {
|
||||
authorization_endpoint,
|
||||
token_endpoint,
|
||||
registration_endpoint: metadata.registration_endpoint,
|
||||
scopes_supported: normalize_scopes(metadata.scopes_supported),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn oauth_get(
|
||||
url: &Url,
|
||||
request_headers: &[codex_exec_server::HttpHeader],
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> Result<codex_exec_server::HttpRequestResponse> {
|
||||
http_client
|
||||
.http_request(HttpRequestParams {
|
||||
method: "GET".to_string(),
|
||||
url: url.to_string(),
|
||||
headers: request_headers.to_vec(),
|
||||
body: None,
|
||||
timeout_ms: Some(DISCOVERY_TIMEOUT.as_millis() as u64),
|
||||
request_id: "oauth-discovery".to_string(),
|
||||
stream_response: false,
|
||||
})
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Implements MCP authorization server metadata discovery priority.
|
||||
fn discovery_urls(base_url: &Url) -> Vec<Url> {
|
||||
discovery_paths(base_url.path())
|
||||
.into_iter()
|
||||
.map(|candidate_path| {
|
||||
let mut discovery_url = base_url.clone();
|
||||
discovery_url.set_query(None);
|
||||
discovery_url.set_fragment(None);
|
||||
discovery_url.set_path(&candidate_path);
|
||||
discovery_url
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Implements RFC 8414 section 3.1 for discovering well-known oauth endpoints.
|
||||
/// This is a requirement for MCP servers to support OAuth.
|
||||
/// https://datatracker.ietf.org/doc/html/rfc8414#section-3.1
|
||||
/// https://github.com/modelcontextprotocol/rust-sdk/blob/main/crates/rmcp/src/transport/auth.rs#L182
|
||||
fn discovery_paths(base_path: &str) -> Vec<String> {
|
||||
let trimmed = base_path.trim_start_matches('/').trim_end_matches('/');
|
||||
if trimmed.is_empty() {
|
||||
return vec![
|
||||
"/.well-known/oauth-authorization-server".to_string(),
|
||||
"/.well-known/openid-configuration".to_string(),
|
||||
];
|
||||
}
|
||||
vec![
|
||||
format!("/.well-known/oauth-authorization-server/{trimmed}"),
|
||||
format!("/.well-known/openid-configuration/{trimmed}"),
|
||||
format!("/{trimmed}/.well-known/openid-configuration"),
|
||||
"/.well-known/oauth-authorization-server".to_string(),
|
||||
]
|
||||
}
|
||||
let canonical = "/.well-known/oauth-authorization-server".to_string();
|
||||
|
||||
fn well_known_paths(base_path: &str, suffix: &str) -> Vec<String> {
|
||||
let trimmed = base_path.trim_start_matches('/').trim_end_matches('/');
|
||||
let canonical = format!("/.well-known/{suffix}");
|
||||
if trimmed.is_empty() {
|
||||
return vec![canonical];
|
||||
}
|
||||
vec![
|
||||
format!("{canonical}/{trimmed}"),
|
||||
format!("/{trimmed}/.well-known/{suffix}"),
|
||||
canonical,
|
||||
]
|
||||
}
|
||||
|
||||
fn extract_resource_metadata_url_from_header(header: &str, base_url: &Url) -> Option<Url> {
|
||||
let header_lowercase = header.to_ascii_lowercase();
|
||||
let fragment_key = "resource_metadata=";
|
||||
let mut search_offset = 0;
|
||||
while let Some(position) = header_lowercase[search_offset..].find(fragment_key) {
|
||||
let value_offset = search_offset + position + fragment_key.len();
|
||||
let value_slice = &header[value_offset..];
|
||||
if let Some((value, consumed)) = parse_next_header_value(value_slice) {
|
||||
if let Ok(url) = Url::parse(&value) {
|
||||
return Some(url);
|
||||
}
|
||||
if let Ok(url) = base_url.join(&value) {
|
||||
return Some(url);
|
||||
}
|
||||
search_offset = value_offset + consumed;
|
||||
continue;
|
||||
let mut candidates = Vec::new();
|
||||
let mut push_unique = |candidate: String| {
|
||||
if !candidates.contains(&candidate) {
|
||||
candidates.push(candidate);
|
||||
}
|
||||
break;
|
||||
}
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
fn parse_next_header_value(header_fragment: &str) -> Option<(String, usize)> {
|
||||
let trimmed = header_fragment.trim_start();
|
||||
let leading_ws = header_fragment.len() - trimmed.len();
|
||||
if let Some(stripped) = trimmed.strip_prefix('"') {
|
||||
let mut escaped = false;
|
||||
let mut result = String::new();
|
||||
for (index, character) in stripped.char_indices() {
|
||||
if escaped {
|
||||
result.push(character);
|
||||
escaped = false;
|
||||
continue;
|
||||
}
|
||||
match character {
|
||||
'\\' => escaped = true,
|
||||
'"' => return Some((result, leading_ws + index + 2)),
|
||||
_ => result.push(character),
|
||||
}
|
||||
}
|
||||
return None;
|
||||
}
|
||||
push_unique(format!("{canonical}/{trimmed}"));
|
||||
push_unique(format!("/{trimmed}/.well-known/oauth-authorization-server"));
|
||||
push_unique(canonical);
|
||||
|
||||
let end = trimmed
|
||||
.find(|character: char| character == ',' || character == ';' || character.is_whitespace())
|
||||
.unwrap_or(trimmed.len());
|
||||
Some((trimmed[..end].to_string(), leading_ws + end))
|
||||
candidates
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -516,118 +197,12 @@ mod tests {
|
||||
use axum::Json;
|
||||
use axum::Router;
|
||||
use axum::routing::get;
|
||||
use codex_exec_server::ExecServerError;
|
||||
use codex_exec_server::HTTP_REQUEST_NO_PROXY_HEADER;
|
||||
use codex_exec_server::HttpClient;
|
||||
use codex_exec_server::HttpRequestParams;
|
||||
use codex_exec_server::HttpRequestResponse;
|
||||
use codex_exec_server::HttpResponseBodyStream;
|
||||
use codex_exec_server::ReqwestHttpClient;
|
||||
use futures::FutureExt;
|
||||
use futures::future::BoxFuture;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serial_test::serial;
|
||||
use std::collections::HashMap;
|
||||
use std::ffi::OsString;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
#[derive(Default)]
|
||||
struct ScriptedHttpClient {
|
||||
requests: std::sync::Mutex<Vec<HttpRequestParams>>,
|
||||
}
|
||||
|
||||
impl ScriptedHttpClient {
|
||||
fn requests(&self) -> Vec<HttpRequestParams> {
|
||||
self.requests.lock().expect("lock requests").clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl HttpClient for ScriptedHttpClient {
|
||||
fn http_request(
|
||||
&self,
|
||||
params: HttpRequestParams,
|
||||
) -> BoxFuture<'_, Result<HttpRequestResponse, ExecServerError>> {
|
||||
self.requests
|
||||
.lock()
|
||||
.expect("lock requests")
|
||||
.push(params.clone());
|
||||
async move {
|
||||
let response = match params.url.as_str() {
|
||||
"http://remote-only.invalid/.well-known/oauth-authorization-server/mcp" => {
|
||||
HttpRequestResponse {
|
||||
status: 404,
|
||||
headers: Vec::new(),
|
||||
body: Vec::new().into(),
|
||||
}
|
||||
}
|
||||
"http://remote-only.invalid/.well-known/openid-configuration/mcp" => {
|
||||
HttpRequestResponse {
|
||||
status: 200,
|
||||
headers: Vec::new(),
|
||||
body: serde_json::to_vec(&serde_json::json!({
|
||||
"authorization_endpoint": "https://auth.remote.example/oauth/authorize",
|
||||
"token_endpoint": "https://auth.remote.example/oauth/token",
|
||||
}))
|
||||
.expect("serialize oidc metadata")
|
||||
.into(),
|
||||
}
|
||||
}
|
||||
"http://resource-only.invalid/mcp" => HttpRequestResponse {
|
||||
status: 401,
|
||||
headers: vec![codex_exec_server::HttpHeader {
|
||||
name: "www-authenticate".to_string(),
|
||||
value: "Bearer resource_metadata=\"/.well-known/oauth-protected-resource/mcp\"".to_string(),
|
||||
}],
|
||||
body: Vec::new().into(),
|
||||
},
|
||||
"http://resource-only.invalid/.well-known/oauth-protected-resource/mcp" => {
|
||||
HttpRequestResponse {
|
||||
status: 200,
|
||||
headers: Vec::new(),
|
||||
body: serde_json::to_vec(&serde_json::json!({
|
||||
"authorization_servers": ["https://auth.remote.example/tenant"],
|
||||
}))
|
||||
.expect("serialize resource metadata")
|
||||
.into(),
|
||||
}
|
||||
}
|
||||
"https://auth.remote.example/.well-known/oauth-authorization-server/tenant" => {
|
||||
HttpRequestResponse {
|
||||
status: 200,
|
||||
headers: Vec::new(),
|
||||
body: serde_json::to_vec(&serde_json::json!({
|
||||
"authorization_endpoint": "https://auth.remote.example/oauth/authorize",
|
||||
"token_endpoint": "https://auth.remote.example/oauth/token",
|
||||
}))
|
||||
.expect("serialize oauth metadata")
|
||||
.into(),
|
||||
}
|
||||
}
|
||||
_ => HttpRequestResponse {
|
||||
status: 404,
|
||||
headers: Vec::new(),
|
||||
body: Vec::new().into(),
|
||||
},
|
||||
};
|
||||
Ok(response)
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
|
||||
fn http_request_stream(
|
||||
&self,
|
||||
_params: HttpRequestParams,
|
||||
) -> BoxFuture<'_, Result<(HttpRequestResponse, HttpResponseBodyStream), ExecServerError>>
|
||||
{
|
||||
async move {
|
||||
Err(ExecServerError::HttpRequest(
|
||||
"unexpected stream".to_string(),
|
||||
))
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
struct TestServer {
|
||||
url: String,
|
||||
handle: JoinHandle<()>,
|
||||
@@ -708,7 +283,6 @@ mod tests {
|
||||
)])),
|
||||
/*env_http_headers*/ None,
|
||||
OAuthCredentialsStoreMode::Keyring,
|
||||
Arc::new(ReqwestHttpClient),
|
||||
)
|
||||
.await
|
||||
.expect("status should compute");
|
||||
@@ -730,7 +304,6 @@ mod tests {
|
||||
"CODEX_RMCP_CLIENT_AUTH_STATUS_TEST_TOKEN".to_string(),
|
||||
)])),
|
||||
OAuthCredentialsStoreMode::Keyring,
|
||||
Arc::new(ReqwestHttpClient),
|
||||
)
|
||||
.await
|
||||
.expect("status should compute");
|
||||
@@ -797,52 +370,4 @@ mod tests {
|
||||
|
||||
assert!(supported);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn discover_streamable_http_oauth_uses_openid_fallback_through_selected_client() {
|
||||
let http_client = Arc::new(ScriptedHttpClient::default());
|
||||
let discovery = discover_streamable_http_oauth_with_http_client(
|
||||
"http://remote-only.invalid/mcp",
|
||||
/*http_headers*/ None,
|
||||
/*env_http_headers*/ None,
|
||||
http_client.clone(),
|
||||
)
|
||||
.await
|
||||
.expect("discovery should succeed")
|
||||
.expect("oauth support should be detected");
|
||||
|
||||
assert_eq!(discovery.scopes_supported, None);
|
||||
assert_eq!(
|
||||
http_client
|
||||
.requests()
|
||||
.iter()
|
||||
.map(|request| request.url.as_str())
|
||||
.collect::<Vec<_>>(),
|
||||
vec![
|
||||
"http://remote-only.invalid/.well-known/oauth-authorization-server/mcp",
|
||||
"http://remote-only.invalid/.well-known/openid-configuration/mcp",
|
||||
]
|
||||
);
|
||||
assert!(http_client.requests().iter().all(|request| {
|
||||
request
|
||||
.headers
|
||||
.iter()
|
||||
.any(|header| header.name == HTTP_REQUEST_NO_PROXY_HEADER)
|
||||
}));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn discover_streamable_http_oauth_uses_resource_metadata_fallback() {
|
||||
let discovery = discover_streamable_http_oauth_with_http_client(
|
||||
"http://resource-only.invalid/mcp",
|
||||
/*http_headers*/ None,
|
||||
/*env_http_headers*/ None,
|
||||
Arc::new(ScriptedHttpClient::default()),
|
||||
)
|
||||
.await
|
||||
.expect("discovery should succeed")
|
||||
.expect("oauth support should be detected");
|
||||
|
||||
assert_eq!(discovery.scopes_supported, None);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,6 @@ use std::time::Duration;
|
||||
use axum::Router;
|
||||
use axum::body::Body;
|
||||
use axum::extract::Json;
|
||||
use axum::extract::RawQuery;
|
||||
use axum::extract::State;
|
||||
use axum::http::HeaderMap;
|
||||
use axum::http::HeaderValue;
|
||||
@@ -22,7 +21,6 @@ use axum::http::header::HOST;
|
||||
use axum::http::header::WWW_AUTHENTICATE;
|
||||
use axum::middleware;
|
||||
use axum::middleware::Next;
|
||||
use axum::response::Redirect;
|
||||
use axum::response::Response;
|
||||
use axum::routing::get;
|
||||
use axum::routing::post;
|
||||
@@ -155,8 +153,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
}
|
||||
}),
|
||||
)
|
||||
.route("/oauth/authorize", get(fake_oauth_authorize))
|
||||
.route("/oauth/token", post(fake_oauth_token))
|
||||
.nest_service(
|
||||
"/mcp",
|
||||
StreamableHttpService::new(
|
||||
@@ -183,34 +179,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn fake_oauth_authorize(RawQuery(query): RawQuery) -> Result<Redirect, StatusCode> {
|
||||
let params = reqwest::Url::parse(&format!(
|
||||
"http://localhost/?{}",
|
||||
query.as_deref().ok_or(StatusCode::BAD_REQUEST)?
|
||||
))
|
||||
.map_err(|_| StatusCode::BAD_REQUEST)?
|
||||
.query_pairs()
|
||||
.into_owned()
|
||||
.collect::<HashMap<_, _>>();
|
||||
let redirect_uri = params.get("redirect_uri").ok_or(StatusCode::BAD_REQUEST)?;
|
||||
let state = params.get("state").ok_or(StatusCode::BAD_REQUEST)?;
|
||||
let mut callback_url =
|
||||
reqwest::Url::parse(redirect_uri).map_err(|_| StatusCode::BAD_REQUEST)?;
|
||||
callback_url
|
||||
.query_pairs_mut()
|
||||
.append_pair("code", "test-authorization-code")
|
||||
.append_pair("state", state);
|
||||
Ok(Redirect::temporary(callback_url.as_str()))
|
||||
}
|
||||
|
||||
async fn fake_oauth_token() -> Json<serde_json::Value> {
|
||||
Json(json!({
|
||||
"access_token": "test-access-token",
|
||||
"refresh_token": "test-refresh-token",
|
||||
"token_type": "bearer",
|
||||
}))
|
||||
}
|
||||
|
||||
impl ServerHandler for TestToolServer {
|
||||
fn get_info(&self) -> ServerInfo {
|
||||
ServerInfo::new(
|
||||
|
||||
@@ -14,9 +14,7 @@ mod utils;
|
||||
pub use auth_status::StreamableHttpOAuthDiscovery;
|
||||
pub use auth_status::determine_streamable_http_auth_status;
|
||||
pub use auth_status::discover_streamable_http_oauth;
|
||||
pub use auth_status::discover_streamable_http_oauth_with_http_client;
|
||||
pub use auth_status::supports_oauth_login;
|
||||
pub use auth_status::supports_oauth_login_with_http_client;
|
||||
pub use codex_protocol::protocol::McpAuthStatus;
|
||||
pub use in_process_transport::InProcessTransportFactory;
|
||||
pub use oauth::StoredOAuthTokens;
|
||||
@@ -28,10 +26,7 @@ pub use perform_oauth_login::OAuthProviderError;
|
||||
pub use perform_oauth_login::OauthLoginHandle;
|
||||
pub use perform_oauth_login::perform_oauth_login;
|
||||
pub use perform_oauth_login::perform_oauth_login_return_url;
|
||||
pub use perform_oauth_login::perform_oauth_login_return_url_with_http_client;
|
||||
pub use perform_oauth_login::perform_oauth_login_silent;
|
||||
pub use perform_oauth_login::perform_oauth_login_silent_with_http_client;
|
||||
pub use perform_oauth_login::perform_oauth_login_with_http_client;
|
||||
pub use rmcp::model::ElicitationAction;
|
||||
pub use rmcp_client::Elicitation;
|
||||
pub use rmcp_client::ElicitationResponse;
|
||||
|
||||
@@ -9,34 +9,12 @@ use anyhow::anyhow;
|
||||
use anyhow::bail;
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
|
||||
use codex_exec_server::HttpClient;
|
||||
use codex_exec_server::HttpRequestParams;
|
||||
use oauth2::AsyncHttpClient;
|
||||
use oauth2::AuthUrl;
|
||||
use oauth2::AuthorizationCode;
|
||||
use oauth2::ClientId;
|
||||
use oauth2::ClientSecret;
|
||||
use oauth2::CsrfToken;
|
||||
use oauth2::EndpointNotSet;
|
||||
use oauth2::EndpointSet;
|
||||
use oauth2::HttpRequest;
|
||||
use oauth2::HttpResponse;
|
||||
use oauth2::PkceCodeChallenge;
|
||||
use oauth2::PkceCodeVerifier;
|
||||
use oauth2::RedirectUrl;
|
||||
use oauth2::RequestTokenError;
|
||||
use oauth2::RevocationErrorResponseType;
|
||||
use oauth2::Scope;
|
||||
use oauth2::StandardErrorResponse;
|
||||
use oauth2::StandardRevocableToken;
|
||||
use oauth2::StandardTokenIntrospectionResponse;
|
||||
use oauth2::TokenUrl;
|
||||
use oauth2::basic::BasicErrorResponseType;
|
||||
use oauth2::basic::BasicTokenType;
|
||||
use reqwest::ClientBuilder;
|
||||
use reqwest::Url;
|
||||
use reqwest::header::HeaderMap;
|
||||
use rmcp::transport::auth::OAuthTokenResponse;
|
||||
use rmcp::transport::auth::VendorExtraTokenFields;
|
||||
use rmcp::transport::AuthorizationManager;
|
||||
use rmcp::transport::AuthorizationSession;
|
||||
use rmcp::transport::auth::OAuthClientConfig;
|
||||
use rmcp::transport::auth::OAuthState;
|
||||
use sha2::Digest;
|
||||
use sha2::Sha256;
|
||||
use tiny_http::Response;
|
||||
@@ -47,13 +25,10 @@ use urlencoding::decode;
|
||||
|
||||
use crate::StoredOAuthTokens;
|
||||
use crate::WrappedOAuthTokenResponse;
|
||||
use crate::auth_status::StreamableHttpOAuthMetadata;
|
||||
use crate::auth_status::discover_streamable_http_oauth_metadata;
|
||||
use crate::oauth::compute_expires_at_millis;
|
||||
use crate::save_oauth_tokens;
|
||||
use crate::utils::apply_default_headers;
|
||||
use crate::utils::build_default_headers;
|
||||
use crate::utils::oauth_token_headers;
|
||||
use crate::utils::protocol_headers;
|
||||
use codex_config::types::OAuthCredentialsStoreMode;
|
||||
|
||||
struct OauthHeaders {
|
||||
@@ -61,159 +36,6 @@ struct OauthHeaders {
|
||||
env_http_headers: Option<HashMap<String, String>>,
|
||||
}
|
||||
|
||||
type OAuthClient = oauth2::Client<
|
||||
StandardErrorResponse<BasicErrorResponseType>,
|
||||
OAuthTokenResponse,
|
||||
StandardTokenIntrospectionResponse<VendorExtraTokenFields, BasicTokenType>,
|
||||
StandardRevocableToken,
|
||||
StandardErrorResponse<RevocationErrorResponseType>,
|
||||
EndpointSet,
|
||||
EndpointNotSet,
|
||||
EndpointNotSet,
|
||||
EndpointNotSet,
|
||||
EndpointSet,
|
||||
>;
|
||||
|
||||
type OAuthClientBuilder = oauth2::Client<
|
||||
StandardErrorResponse<BasicErrorResponseType>,
|
||||
OAuthTokenResponse,
|
||||
StandardTokenIntrospectionResponse<VendorExtraTokenFields, BasicTokenType>,
|
||||
StandardRevocableToken,
|
||||
StandardErrorResponse<RevocationErrorResponseType>,
|
||||
>;
|
||||
|
||||
struct OAuthState {
|
||||
client: OAuthClient,
|
||||
client_id: String,
|
||||
pkce_verifier: PkceCodeVerifier,
|
||||
csrf_state: CsrfToken,
|
||||
authorization_url: String,
|
||||
default_headers: HeaderMap,
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
}
|
||||
|
||||
impl OAuthState {
|
||||
fn new(
|
||||
metadata: StreamableHttpOAuthMetadata,
|
||||
client: OAuthClientConfig,
|
||||
redirect_uri: &str,
|
||||
scopes: &[&str],
|
||||
default_headers: HeaderMap,
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> Result<Self> {
|
||||
let OAuthClientConfig {
|
||||
client_id,
|
||||
client_secret,
|
||||
} = client;
|
||||
let mut client = OAuthClientBuilder::new(ClientId::new(client_id.clone()))
|
||||
.set_auth_uri(AuthUrl::new(metadata.authorization_endpoint)?)
|
||||
.set_token_uri(TokenUrl::new(metadata.token_endpoint)?)
|
||||
.set_redirect_uri(RedirectUrl::new(redirect_uri.to_string())?);
|
||||
if let Some(client_secret) = client_secret {
|
||||
client = client.set_client_secret(ClientSecret::new(client_secret));
|
||||
}
|
||||
let (pkce_challenge, pkce_verifier) = PkceCodeChallenge::new_random_sha256();
|
||||
let mut request = client
|
||||
.authorize_url(CsrfToken::new_random)
|
||||
.set_pkce_challenge(pkce_challenge);
|
||||
for scope in scopes {
|
||||
request = request.add_scope(Scope::new((*scope).to_string()));
|
||||
}
|
||||
let (authorization_url, csrf_state) = request.url();
|
||||
|
||||
Ok(Self {
|
||||
client,
|
||||
client_id,
|
||||
pkce_verifier,
|
||||
csrf_state,
|
||||
authorization_url: authorization_url.to_string(),
|
||||
default_headers,
|
||||
http_client,
|
||||
})
|
||||
}
|
||||
|
||||
fn authorization_url(&self) -> &str {
|
||||
&self.authorization_url
|
||||
}
|
||||
|
||||
async fn handle_callback(
|
||||
self,
|
||||
code: &str,
|
||||
csrf_state: &str,
|
||||
) -> Result<(String, OAuthTokenResponse)> {
|
||||
if self.csrf_state.secret() != csrf_state {
|
||||
bail!("OAuth callback state did not match login request");
|
||||
}
|
||||
let http_client = RoutedOAuthHttpClient::new(self.http_client, self.default_headers);
|
||||
let credentials = match self
|
||||
.client
|
||||
.exchange_code(AuthorizationCode::new(code.to_string()))
|
||||
.set_pkce_verifier(self.pkce_verifier)
|
||||
.request_async(&http_client)
|
||||
.await
|
||||
{
|
||||
Ok(credentials) => credentials,
|
||||
Err(RequestTokenError::Parse(_, body)) => {
|
||||
serde_json::from_slice::<OAuthTokenResponse>(&body)?
|
||||
}
|
||||
Err(error) => return Err(anyhow!("OAuth token exchange failed: {error}")),
|
||||
};
|
||||
Ok((self.client_id, credentials))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct RoutedOAuthHttpClient {
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
default_headers: HeaderMap,
|
||||
}
|
||||
|
||||
impl RoutedOAuthHttpClient {
|
||||
fn new(http_client: Arc<dyn HttpClient>, default_headers: HeaderMap) -> Self {
|
||||
Self {
|
||||
http_client,
|
||||
default_headers,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[error(transparent)]
|
||||
struct RoutedOAuthHttpClientError(#[from] anyhow::Error);
|
||||
|
||||
impl<'c> AsyncHttpClient<'c> for RoutedOAuthHttpClient {
|
||||
type Error = RoutedOAuthHttpClientError;
|
||||
type Future = futures::future::BoxFuture<'c, Result<HttpResponse, Self::Error>>;
|
||||
|
||||
fn call(&'c self, request: HttpRequest) -> Self::Future {
|
||||
Box::pin(async move {
|
||||
let (parts, body) = request.into_parts();
|
||||
let mut headers = self.default_headers.clone();
|
||||
headers.extend(parts.headers);
|
||||
let response = self
|
||||
.http_client
|
||||
.http_request(HttpRequestParams {
|
||||
method: parts.method.to_string(),
|
||||
url: parts.uri.to_string(),
|
||||
headers: oauth_token_headers(&headers),
|
||||
body: Some(body.into()),
|
||||
timeout_ms: None,
|
||||
request_id: "oauth-request".to_string(),
|
||||
stream_response: false,
|
||||
})
|
||||
.await
|
||||
.map_err(|err| RoutedOAuthHttpClientError(anyhow!(err)))?;
|
||||
let mut builder = oauth2::http::Response::builder().status(response.status);
|
||||
for header in response.headers {
|
||||
builder = builder.header(header.name, header.value);
|
||||
}
|
||||
builder
|
||||
.body(response.body.into_inner())
|
||||
.map_err(|err: oauth2::http::Error| RoutedOAuthHttpClientError(anyhow!(err)))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
struct CallbackServerGuard {
|
||||
server: Arc<Server>,
|
||||
}
|
||||
@@ -266,36 +88,6 @@ pub async fn perform_oauth_login(
|
||||
oauth_resource: Option<&str>,
|
||||
callback_port: Option<u16>,
|
||||
callback_url: Option<&str>,
|
||||
) -> Result<()> {
|
||||
perform_oauth_login_with_http_client(
|
||||
server_name,
|
||||
server_url,
|
||||
store_mode,
|
||||
http_headers,
|
||||
env_http_headers,
|
||||
scopes,
|
||||
oauth_client_id,
|
||||
oauth_resource,
|
||||
callback_port,
|
||||
callback_url,
|
||||
Arc::new(codex_exec_server::ReqwestHttpClient),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn perform_oauth_login_with_http_client(
|
||||
server_name: &str,
|
||||
server_url: &str,
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
http_headers: Option<HashMap<String, String>>,
|
||||
env_http_headers: Option<HashMap<String, String>>,
|
||||
scopes: &[String],
|
||||
oauth_client_id: Option<&str>,
|
||||
oauth_resource: Option<&str>,
|
||||
callback_port: Option<u16>,
|
||||
callback_url: Option<&str>,
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> Result<()> {
|
||||
perform_oauth_login_with_browser_output(
|
||||
server_name,
|
||||
@@ -303,7 +95,6 @@ pub async fn perform_oauth_login_with_http_client(
|
||||
store_mode,
|
||||
http_headers,
|
||||
env_http_headers,
|
||||
http_client,
|
||||
scopes,
|
||||
oauth_client_id,
|
||||
oauth_resource,
|
||||
@@ -326,36 +117,6 @@ pub async fn perform_oauth_login_silent(
|
||||
oauth_resource: Option<&str>,
|
||||
callback_port: Option<u16>,
|
||||
callback_url: Option<&str>,
|
||||
) -> Result<()> {
|
||||
perform_oauth_login_silent_with_http_client(
|
||||
server_name,
|
||||
server_url,
|
||||
store_mode,
|
||||
http_headers,
|
||||
env_http_headers,
|
||||
scopes,
|
||||
oauth_client_id,
|
||||
oauth_resource,
|
||||
callback_port,
|
||||
callback_url,
|
||||
Arc::new(codex_exec_server::ReqwestHttpClient),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn perform_oauth_login_silent_with_http_client(
|
||||
server_name: &str,
|
||||
server_url: &str,
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
http_headers: Option<HashMap<String, String>>,
|
||||
env_http_headers: Option<HashMap<String, String>>,
|
||||
scopes: &[String],
|
||||
oauth_client_id: Option<&str>,
|
||||
oauth_resource: Option<&str>,
|
||||
callback_port: Option<u16>,
|
||||
callback_url: Option<&str>,
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> Result<()> {
|
||||
perform_oauth_login_with_browser_output(
|
||||
server_name,
|
||||
@@ -363,7 +124,6 @@ pub async fn perform_oauth_login_silent_with_http_client(
|
||||
store_mode,
|
||||
http_headers,
|
||||
env_http_headers,
|
||||
http_client,
|
||||
scopes,
|
||||
oauth_client_id,
|
||||
oauth_resource,
|
||||
@@ -381,7 +141,6 @@ async fn perform_oauth_login_with_browser_output(
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
http_headers: Option<HashMap<String, String>>,
|
||||
env_http_headers: Option<HashMap<String, String>>,
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
scopes: &[String],
|
||||
oauth_client_id: Option<&str>,
|
||||
oauth_resource: Option<&str>,
|
||||
@@ -398,7 +157,6 @@ async fn perform_oauth_login_with_browser_output(
|
||||
server_url,
|
||||
store_mode,
|
||||
headers,
|
||||
http_client,
|
||||
scopes,
|
||||
oauth_client_id,
|
||||
oauth_resource,
|
||||
@@ -425,38 +183,6 @@ pub async fn perform_oauth_login_return_url(
|
||||
timeout_secs: Option<i64>,
|
||||
callback_port: Option<u16>,
|
||||
callback_url: Option<&str>,
|
||||
) -> Result<OauthLoginHandle> {
|
||||
perform_oauth_login_return_url_with_http_client(
|
||||
server_name,
|
||||
server_url,
|
||||
store_mode,
|
||||
http_headers,
|
||||
env_http_headers,
|
||||
scopes,
|
||||
oauth_client_id,
|
||||
oauth_resource,
|
||||
timeout_secs,
|
||||
callback_port,
|
||||
callback_url,
|
||||
Arc::new(codex_exec_server::ReqwestHttpClient),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn perform_oauth_login_return_url_with_http_client(
|
||||
server_name: &str,
|
||||
server_url: &str,
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
http_headers: Option<HashMap<String, String>>,
|
||||
env_http_headers: Option<HashMap<String, String>>,
|
||||
scopes: &[String],
|
||||
oauth_client_id: Option<&str>,
|
||||
oauth_resource: Option<&str>,
|
||||
timeout_secs: Option<i64>,
|
||||
callback_port: Option<u16>,
|
||||
callback_url: Option<&str>,
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> Result<OauthLoginHandle> {
|
||||
let headers = OauthHeaders {
|
||||
http_headers,
|
||||
@@ -467,7 +193,6 @@ pub async fn perform_oauth_login_return_url_with_http_client(
|
||||
server_url,
|
||||
store_mode,
|
||||
headers,
|
||||
http_client,
|
||||
scopes,
|
||||
oauth_client_id,
|
||||
oauth_resource,
|
||||
@@ -721,7 +446,6 @@ impl OauthLoginFlow {
|
||||
server_url: &str,
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
headers: OauthHeaders,
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
scopes: &[String],
|
||||
oauth_client_id: Option<&str>,
|
||||
oauth_resource: Option<&str>,
|
||||
@@ -752,19 +476,27 @@ impl OauthLoginFlow {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
spawn_callback_server(server, tx, callback_path);
|
||||
|
||||
let OauthHeaders {
|
||||
http_headers,
|
||||
env_http_headers,
|
||||
} = headers;
|
||||
let default_headers = build_default_headers(http_headers, env_http_headers)?;
|
||||
let http_client = apply_default_headers(ClientBuilder::new(), &default_headers).build()?;
|
||||
|
||||
let scope_refs: Vec<&str> = scopes.iter().map(String::as_str).collect();
|
||||
let oauth_state = start_authorization(
|
||||
server_name,
|
||||
server_url,
|
||||
http_client,
|
||||
headers,
|
||||
&scope_refs,
|
||||
&redirect_uri,
|
||||
oauth_client_id,
|
||||
)
|
||||
.await?;
|
||||
let auth_url =
|
||||
append_query_param(oauth_state.authorization_url(), "resource", oauth_resource);
|
||||
let auth_url = append_query_param(
|
||||
&oauth_state.get_authorization_url().await?,
|
||||
"resource",
|
||||
oauth_resource,
|
||||
);
|
||||
let timeout_secs = timeout_secs.unwrap_or(DEFAULT_OAUTH_TIMEOUT_SECS).max(1);
|
||||
let timeout = Duration::from_secs(timeout_secs as u64);
|
||||
|
||||
@@ -818,12 +550,19 @@ impl OauthLoginFlow {
|
||||
CallbackResult::Error(error) => return Err(anyhow!(error)),
|
||||
};
|
||||
|
||||
let (client_id, credentials) = self
|
||||
.oauth_state
|
||||
self.oauth_state
|
||||
.handle_callback(&code, &csrf_state)
|
||||
.await
|
||||
.context("failed to handle OAuth callback")?;
|
||||
|
||||
let (client_id, credentials_opt) = self
|
||||
.oauth_state
|
||||
.get_credentials()
|
||||
.await
|
||||
.context("failed to retrieve OAuth credentials")?;
|
||||
let credentials = credentials_opt
|
||||
.ok_or_else(|| anyhow!("OAuth provider did not return credentials"))?;
|
||||
|
||||
let expires_at = compute_expires_at_millis(&credentials);
|
||||
let stored = StoredOAuthTokens {
|
||||
server_name: self.server_name.clone(),
|
||||
@@ -863,110 +602,34 @@ impl OauthLoginFlow {
|
||||
}
|
||||
|
||||
async fn start_authorization(
|
||||
server_name: &str,
|
||||
server_url: &str,
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
headers: OauthHeaders,
|
||||
http_client: reqwest::Client,
|
||||
scopes: &[&str],
|
||||
redirect_uri: &str,
|
||||
oauth_client_id: Option<&str>,
|
||||
) -> Result<OAuthState> {
|
||||
let OauthHeaders {
|
||||
http_headers,
|
||||
env_http_headers,
|
||||
} = headers;
|
||||
let metadata = discover_streamable_http_oauth_metadata(
|
||||
server_url,
|
||||
http_headers.clone(),
|
||||
env_http_headers.clone(),
|
||||
Arc::clone(&http_client),
|
||||
)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow!("MCP server `{server_name}` does not advertise OAuth metadata"))?;
|
||||
let default_headers = build_default_headers(http_headers.clone(), env_http_headers.clone())?;
|
||||
let client = match oauth_client_id.filter(|client_id| !client_id.trim().is_empty()) {
|
||||
Some(client_id) => OAuthClientConfig {
|
||||
client_id: client_id.to_string(),
|
||||
client_secret: None,
|
||||
},
|
||||
None => {
|
||||
register_oauth_client(
|
||||
&metadata,
|
||||
redirect_uri,
|
||||
http_headers,
|
||||
env_http_headers,
|
||||
Arc::clone(&http_client),
|
||||
)
|
||||
.await?
|
||||
}
|
||||
let Some(oauth_client_id) = oauth_client_id.filter(|client_id| !client_id.trim().is_empty())
|
||||
else {
|
||||
let mut oauth_state = OAuthState::new(server_url, Some(http_client)).await?;
|
||||
oauth_state
|
||||
.start_authorization(scopes, redirect_uri, Some("Codex"))
|
||||
.await?;
|
||||
return Ok(oauth_state);
|
||||
};
|
||||
OAuthState::new(
|
||||
metadata,
|
||||
client,
|
||||
redirect_uri,
|
||||
scopes,
|
||||
default_headers,
|
||||
http_client,
|
||||
)
|
||||
}
|
||||
|
||||
async fn register_oauth_client(
|
||||
metadata: &StreamableHttpOAuthMetadata,
|
||||
redirect_uri: &str,
|
||||
http_headers: Option<HashMap<String, String>>,
|
||||
env_http_headers: Option<HashMap<String, String>>,
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
) -> Result<OAuthClientConfig> {
|
||||
let registration_url = metadata
|
||||
.registration_endpoint
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow!("OAuth server does not support dynamic client registration"))?;
|
||||
let default_headers = build_default_headers(http_headers, env_http_headers)?;
|
||||
let registration_request = serde_json::json!({
|
||||
"client_name": "Codex",
|
||||
"redirect_uris": [redirect_uri],
|
||||
"grant_types": ["authorization_code", "refresh_token"],
|
||||
"token_endpoint_auth_method": "none",
|
||||
"response_types": ["code"],
|
||||
});
|
||||
let mut headers = default_headers;
|
||||
headers.insert(reqwest::header::CONTENT_TYPE, "application/json".parse()?);
|
||||
let response = http_client
|
||||
.http_request(HttpRequestParams {
|
||||
method: "POST".to_string(),
|
||||
url: registration_url.clone(),
|
||||
headers: protocol_headers(&headers),
|
||||
body: Some(serde_json::to_vec(®istration_request)?.into()),
|
||||
timeout_ms: None,
|
||||
request_id: "oauth-register".to_string(),
|
||||
stream_response: false,
|
||||
})
|
||||
.await?;
|
||||
if !(200..300).contains(&response.status) {
|
||||
bail!(
|
||||
"OAuth dynamic client registration returned HTTP {}",
|
||||
response.status
|
||||
);
|
||||
}
|
||||
#[derive(serde::Deserialize)]
|
||||
struct ClientRegistrationResponse {
|
||||
client_id: String,
|
||||
#[serde(default)]
|
||||
client_secret: Option<String>,
|
||||
}
|
||||
let response =
|
||||
serde_json::from_slice::<ClientRegistrationResponse>(&response.body.into_inner())?;
|
||||
Ok(OAuthClientConfig {
|
||||
client_id: response.client_id,
|
||||
client_secret: response
|
||||
.client_secret
|
||||
.filter(|client_secret| !client_secret.trim().is_empty()),
|
||||
})
|
||||
}
|
||||
let mut auth_manager = AuthorizationManager::new(server_url).await?;
|
||||
auth_manager.with_client(http_client)?;
|
||||
let metadata = auth_manager.discover_metadata().await?;
|
||||
auth_manager.set_metadata(metadata);
|
||||
auth_manager.configure_client(
|
||||
OAuthClientConfig::new(oauth_client_id, redirect_uri)
|
||||
.with_scopes(scopes.iter().map(|scope| (*scope).to_string()).collect()),
|
||||
)?;
|
||||
let auth_url = auth_manager.get_authorization_url(scopes).await?;
|
||||
|
||||
struct OAuthClientConfig {
|
||||
client_id: String,
|
||||
client_secret: Option<String>,
|
||||
Ok(OAuthState::Session(
|
||||
AuthorizationSession::for_scope_upgrade(auth_manager, auth_url, redirect_uri),
|
||||
))
|
||||
}
|
||||
|
||||
fn append_query_param(url: &str, key: &str, value: Option<&str>) -> String {
|
||||
@@ -988,31 +651,16 @@ fn append_query_param(url: &str, key: &str, value: Option<&str>) -> String {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use axum::Json;
|
||||
use axum::Router;
|
||||
use axum::routing::get;
|
||||
use codex_exec_server::ExecServerError;
|
||||
use codex_exec_server::HTTP_REQUEST_NO_REDIRECTS_HEADER;
|
||||
use codex_exec_server::HttpClient;
|
||||
use codex_exec_server::HttpRequestParams;
|
||||
use codex_exec_server::HttpRequestResponse;
|
||||
use codex_exec_server::HttpResponseBodyStream;
|
||||
use codex_exec_server::ReqwestHttpClient;
|
||||
use futures::FutureExt;
|
||||
use futures::future::BoxFuture;
|
||||
use oauth2::AsyncHttpClient;
|
||||
use pretty_assertions::assert_eq;
|
||||
use reqwest::Url;
|
||||
use reqwest::header::HeaderMap;
|
||||
use serde_json::json;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
use super::CallbackOutcome;
|
||||
use super::OAuthProviderError;
|
||||
use super::OauthHeaders;
|
||||
use super::append_callback_id_to_redirect_uri;
|
||||
use super::append_query_param;
|
||||
use super::callback_id_from_server_url;
|
||||
@@ -1020,62 +668,6 @@ mod tests {
|
||||
use super::parse_oauth_callback;
|
||||
use super::start_authorization;
|
||||
|
||||
#[derive(Default)]
|
||||
struct RemoteOnlyHttpClient {
|
||||
requests: Mutex<Vec<HttpRequestParams>>,
|
||||
}
|
||||
|
||||
impl RemoteOnlyHttpClient {
|
||||
fn requests(&self) -> Vec<HttpRequestParams> {
|
||||
self.requests.lock().expect("lock requests").clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl HttpClient for RemoteOnlyHttpClient {
|
||||
fn http_request(
|
||||
&self,
|
||||
params: HttpRequestParams,
|
||||
) -> BoxFuture<'_, Result<HttpRequestResponse, ExecServerError>> {
|
||||
let url = params.url.clone();
|
||||
self.requests.lock().expect("lock requests").push(params);
|
||||
async move {
|
||||
let metadata = if url == "https://auth.remote.example/oauth/token" {
|
||||
json!({
|
||||
"access_token": "access-token",
|
||||
"token_type": "bearer",
|
||||
})
|
||||
} else {
|
||||
json!({
|
||||
"authorization_endpoint": "https://auth.remote.example/oauth/authorize",
|
||||
"token_endpoint": "https://auth.remote.example/oauth/token",
|
||||
"scopes_supported": ["scope:remote"],
|
||||
})
|
||||
};
|
||||
Ok(HttpRequestResponse {
|
||||
status: 200,
|
||||
headers: Vec::new(),
|
||||
body: serde_json::to_vec(&metadata)
|
||||
.expect("serialize metadata")
|
||||
.into(),
|
||||
})
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
|
||||
fn http_request_stream(
|
||||
&self,
|
||||
_params: HttpRequestParams,
|
||||
) -> BoxFuture<'_, Result<(HttpRequestResponse, HttpResponseBodyStream), ExecServerError>>
|
||||
{
|
||||
async move {
|
||||
Err(ExecServerError::HttpRequest(
|
||||
"unexpected stream".to_string(),
|
||||
))
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
async fn spawn_oauth_metadata_server() -> String {
|
||||
let listener = TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
@@ -1117,13 +709,8 @@ mod tests {
|
||||
async fn start_authorization_uses_configured_client_id() {
|
||||
let base_url = spawn_oauth_metadata_server().await;
|
||||
let oauth_state = start_authorization(
|
||||
"server",
|
||||
&format!("{base_url}/mcp"),
|
||||
Arc::new(ReqwestHttpClient),
|
||||
OauthHeaders {
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
},
|
||||
reqwest::Client::new(),
|
||||
&[],
|
||||
"http://127.0.0.1/callback",
|
||||
Some("eci-prd-pub-codex-123"),
|
||||
@@ -1131,8 +718,11 @@ mod tests {
|
||||
.await
|
||||
.expect("start oauth authorization");
|
||||
|
||||
let auth_url =
|
||||
Url::parse(oauth_state.authorization_url()).expect("authorization url should parse");
|
||||
let authorization_url = oauth_state
|
||||
.get_authorization_url()
|
||||
.await
|
||||
.expect("read authorization url");
|
||||
let auth_url = Url::parse(&authorization_url).expect("authorization url should parse");
|
||||
let client_id = auth_url
|
||||
.query_pairs()
|
||||
.find(|(key, _)| key == "client_id")
|
||||
@@ -1141,107 +731,6 @@ mod tests {
|
||||
assert_eq!(client_id.as_deref(), Some("eci-prd-pub-codex-123"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn start_authorization_uses_selected_http_client_for_remote_only_server() {
|
||||
let http_client = Arc::new(RemoteOnlyHttpClient::default());
|
||||
let oauth_state = start_authorization(
|
||||
"remote-only",
|
||||
"http://remote-only.invalid/mcp",
|
||||
http_client.clone(),
|
||||
OauthHeaders {
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
},
|
||||
&[],
|
||||
"http://127.0.0.1/callback",
|
||||
Some("remote-client-id"),
|
||||
)
|
||||
.await
|
||||
.expect("start oauth authorization through selected http client");
|
||||
|
||||
let auth_url =
|
||||
Url::parse(oauth_state.authorization_url()).expect("authorization url should parse");
|
||||
let client_id = auth_url
|
||||
.query_pairs()
|
||||
.find(|(key, _)| key == "client_id")
|
||||
.map(|(_, value)| value.into_owned());
|
||||
assert_eq!(client_id.as_deref(), Some("remote-client-id"));
|
||||
assert_eq!(
|
||||
http_client
|
||||
.requests()
|
||||
.into_iter()
|
||||
.map(|request| request.url)
|
||||
.collect::<Vec<_>>(),
|
||||
vec![
|
||||
"http://remote-only.invalid/.well-known/oauth-authorization-server/mcp".to_string()
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn routed_token_request_disables_redirects() {
|
||||
let http_client = Arc::new(RemoteOnlyHttpClient::default());
|
||||
let client = super::RoutedOAuthHttpClient::new(http_client.clone(), HeaderMap::new());
|
||||
let request = oauth2::http::Request::builder()
|
||||
.method("POST")
|
||||
.uri("https://auth.remote.example/oauth/token")
|
||||
.body(Vec::new())
|
||||
.expect("build token request");
|
||||
|
||||
client
|
||||
.call(request)
|
||||
.await
|
||||
.expect("token request should succeed");
|
||||
|
||||
assert!(
|
||||
http_client.requests()[0]
|
||||
.headers
|
||||
.iter()
|
||||
.any(|header| header.name == HTTP_REQUEST_NO_REDIRECTS_HEADER)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn token_exchange_preserves_dynamic_registration_client_secret() {
|
||||
let http_client = Arc::new(RemoteOnlyHttpClient::default());
|
||||
let oauth_state = super::OAuthState::new(
|
||||
super::StreamableHttpOAuthMetadata {
|
||||
authorization_endpoint: "https://auth.remote.example/oauth/authorize".to_string(),
|
||||
token_endpoint: "https://auth.remote.example/oauth/token".to_string(),
|
||||
registration_endpoint: None,
|
||||
scopes_supported: None,
|
||||
},
|
||||
super::OAuthClientConfig {
|
||||
client_id: "dynamic-client".to_string(),
|
||||
client_secret: Some("dynamic-secret".to_string()),
|
||||
},
|
||||
"http://127.0.0.1/callback",
|
||||
&[],
|
||||
HeaderMap::new(),
|
||||
http_client.clone(),
|
||||
)
|
||||
.expect("build oauth state");
|
||||
let csrf_state = oauth_state.csrf_state.secret().to_string();
|
||||
|
||||
oauth_state
|
||||
.handle_callback("code", &csrf_state)
|
||||
.await
|
||||
.expect("token exchange should succeed");
|
||||
|
||||
let authorization = http_client
|
||||
.requests()
|
||||
.into_iter()
|
||||
.find(|request| request.url == "https://auth.remote.example/oauth/token")
|
||||
.and_then(|request| {
|
||||
request
|
||||
.headers
|
||||
.into_iter()
|
||||
.find(|header| header.name.eq_ignore_ascii_case("authorization"))
|
||||
})
|
||||
.expect("token exchange should send authorization header");
|
||||
assert!(authorization.value.starts_with("Basic "));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_oauth_callback_accepts_default_path() {
|
||||
let parsed = parse_oauth_callback("/callback?code=abc&state=xyz", "/callback");
|
||||
|
||||
@@ -1,9 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use codex_config::types::McpServerEnvVar;
|
||||
use codex_exec_server::HTTP_REQUEST_NO_PROXY_HEADER;
|
||||
use codex_exec_server::HTTP_REQUEST_NO_REDIRECTS_HEADER;
|
||||
use codex_exec_server::HttpHeader;
|
||||
use reqwest::ClientBuilder;
|
||||
use reqwest::header::HeaderMap;
|
||||
use reqwest::header::HeaderName;
|
||||
@@ -129,40 +126,6 @@ pub(crate) fn apply_default_headers(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn protocol_headers(headers: &HeaderMap) -> Vec<HttpHeader> {
|
||||
headers
|
||||
.iter()
|
||||
.filter_map(|(name, value)| {
|
||||
value.to_str().ok().map(|value| HttpHeader {
|
||||
name: name.as_str().to_string(),
|
||||
value: value.to_string(),
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub(crate) fn oauth_discovery_headers(headers: &HeaderMap) -> Vec<HttpHeader> {
|
||||
internal_request_headers(protocol_headers(headers), [HTTP_REQUEST_NO_PROXY_HEADER])
|
||||
}
|
||||
|
||||
pub(crate) fn oauth_token_headers(headers: &HeaderMap) -> Vec<HttpHeader> {
|
||||
internal_request_headers(
|
||||
protocol_headers(headers),
|
||||
[HTTP_REQUEST_NO_REDIRECTS_HEADER],
|
||||
)
|
||||
}
|
||||
|
||||
fn internal_request_headers<const N: usize>(
|
||||
mut headers: Vec<HttpHeader>,
|
||||
internal_header_names: [&str; N],
|
||||
) -> Vec<HttpHeader> {
|
||||
headers.extend(internal_header_names.into_iter().map(|name| HttpHeader {
|
||||
name: name.to_string(),
|
||||
value: "true".to_string(),
|
||||
}));
|
||||
headers
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
pub(crate) const DEFAULT_ENV_VARS: &[&str] = &[
|
||||
"HOME",
|
||||
|
||||
@@ -1,244 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
|
||||
export CARGO_TARGET_DIR="${CARGO_TARGET_DIR:-${REPO_ROOT}/codex-rs/target}"
|
||||
REMOTE_SERVER_BIN="${REMOTE_SERVER_BIN:-${CARGO_TARGET_DIR}/debug/test_streamable_http_server}"
|
||||
CODEX_BIN="${CODEX_BIN:-${CARGO_TARGET_DIR}/debug/codex}"
|
||||
TMP_DIR="$(mktemp -d "${TMPDIR:-/tmp}/codex-remote-mcp-oauth-e2e.XXXXXX")"
|
||||
CODEX_HOME="${TMP_DIR}/codex-home"
|
||||
EXEC_SERVER_PORT=""
|
||||
EXEC_SERVER_LOG_FILE="${TMP_DIR}/exec-server.log"
|
||||
EXEC_SERVER_PID=""
|
||||
REMOTE_BOUND_ADDR_FILE="${TMP_DIR}/remote-mcp.addr"
|
||||
REMOTE_LOG_FILE="${TMP_DIR}/remote-mcp.log"
|
||||
REMOTE_SERVER_PID=""
|
||||
|
||||
cleanup() {
|
||||
if [[ -n "${REMOTE_SERVER_PID}" ]]; then
|
||||
kill "${REMOTE_SERVER_PID}" >/dev/null 2>&1 || true
|
||||
fi
|
||||
if [[ -n "${EXEC_SERVER_PID}" ]]; then
|
||||
kill "${EXEC_SERVER_PID}" >/dev/null 2>&1 || true
|
||||
fi
|
||||
rm -rf "${TMP_DIR}"
|
||||
}
|
||||
trap cleanup EXIT
|
||||
|
||||
if [[ ! -x "${CODEX_BIN}" || ! -x "${REMOTE_SERVER_BIN}" ]]; then
|
||||
(
|
||||
cd "${REPO_ROOT}/codex-rs"
|
||||
cargo build -p codex-cli --bin codex -p codex-rmcp-client --bin test_streamable_http_server
|
||||
)
|
||||
fi
|
||||
|
||||
EXEC_SERVER_PORT="$(
|
||||
python3 - <<'PY'
|
||||
import socket
|
||||
|
||||
with socket.socket() as sock:
|
||||
sock.bind(("127.0.0.1", 0))
|
||||
print(sock.getsockname()[1])
|
||||
PY
|
||||
)"
|
||||
"${CODEX_BIN}" exec-server --listen "ws://127.0.0.1:${EXEC_SERVER_PORT}" \
|
||||
>"${EXEC_SERVER_LOG_FILE}" 2>&1 &
|
||||
EXEC_SERVER_PID="$!"
|
||||
|
||||
deadline=$((SECONDS + 10))
|
||||
while (( SECONDS < deadline )); do
|
||||
if python3 - <<PY >/dev/null 2>&1
|
||||
import socket
|
||||
|
||||
with socket.create_connection(("127.0.0.1", ${EXEC_SERVER_PORT}), timeout=0.2):
|
||||
pass
|
||||
PY
|
||||
then
|
||||
break
|
||||
fi
|
||||
sleep 0.05
|
||||
done
|
||||
if ! python3 - <<PY >/dev/null 2>&1
|
||||
import socket
|
||||
|
||||
with socket.create_connection(("127.0.0.1", ${EXEC_SERVER_PORT}), timeout=0.2):
|
||||
pass
|
||||
PY
|
||||
then
|
||||
cat "${EXEC_SERVER_LOG_FILE}" >&2 || true
|
||||
echo "timed out waiting for exec-server" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
MCP_STREAMABLE_HTTP_BIND_ADDR='127.0.0.1:0' \
|
||||
MCP_STREAMABLE_HTTP_BOUND_ADDR_FILE="${REMOTE_BOUND_ADDR_FILE}" \
|
||||
"${REMOTE_SERVER_BIN}" >"${REMOTE_LOG_FILE}" 2>&1 &
|
||||
REMOTE_SERVER_PID="$!"
|
||||
|
||||
deadline=$((SECONDS + 10))
|
||||
REMOTE_BOUND_ADDR=""
|
||||
while (( SECONDS < deadline )); do
|
||||
if REMOTE_BOUND_ADDR="$(cat "${REMOTE_BOUND_ADDR_FILE}" 2>/dev/null)"; then
|
||||
break
|
||||
fi
|
||||
sleep 0.05
|
||||
done
|
||||
if [[ -z "${REMOTE_BOUND_ADDR}" ]]; then
|
||||
cat "${REMOTE_LOG_FILE}" >&2 || true
|
||||
echo "timed out waiting for remote MCP OAuth test server" >&2
|
||||
exit 1
|
||||
fi
|
||||
REMOTE_PORT="${REMOTE_BOUND_ADDR##*:}"
|
||||
REMOTE_MCP_URL="http://127.0.0.1:${REMOTE_PORT}/mcp"
|
||||
CALLBACK_PORT="$(
|
||||
python3 - <<'PY'
|
||||
import socket
|
||||
|
||||
with socket.socket() as sock:
|
||||
sock.bind(("0.0.0.0", 0))
|
||||
print(sock.getsockname()[1])
|
||||
PY
|
||||
)"
|
||||
|
||||
mkdir -p "${CODEX_HOME}"
|
||||
cat > "${CODEX_HOME}/config.toml" <<EOF
|
||||
model_provider = "mock_provider"
|
||||
mcp_oauth_credentials_store = "file"
|
||||
mcp_oauth_callback_port = ${CALLBACK_PORT}
|
||||
mcp_oauth_callback_url = "http://127.0.0.1:${CALLBACK_PORT}/callback"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock"
|
||||
base_url = "http://127.0.0.1:1/v1"
|
||||
wire_api = "responses"
|
||||
|
||||
[mcp_servers.remote-oauth]
|
||||
url = "${REMOTE_MCP_URL}"
|
||||
environment_id = "remote"
|
||||
|
||||
[mcp_servers.remote-oauth.oauth]
|
||||
client_id = "codex-app-server-test"
|
||||
EOF
|
||||
cat > "${CODEX_HOME}/environments.toml" <<EOF
|
||||
include_local = false
|
||||
|
||||
[[environments]]
|
||||
id = "remote"
|
||||
url = "ws://127.0.0.1:${EXEC_SERVER_PORT}"
|
||||
EOF
|
||||
|
||||
CODEX_BIN="${CODEX_BIN}" \
|
||||
CODEX_HOME="${CODEX_HOME}" \
|
||||
python3 - <<'PY'
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
|
||||
|
||||
def send(proc, payload):
|
||||
proc.stdin.write(json.dumps(payload) + "\n")
|
||||
proc.stdin.flush()
|
||||
|
||||
|
||||
def read_message(proc, timeout_s=15):
|
||||
deadline = time.monotonic() + timeout_s
|
||||
while time.monotonic() < deadline:
|
||||
line = proc.stdout.readline()
|
||||
if line:
|
||||
return json.loads(line)
|
||||
if proc.poll() is not None:
|
||||
stderr = proc.stderr.read()
|
||||
raise SystemExit(f"codex app-server exited: {stderr}")
|
||||
time.sleep(0.01)
|
||||
raise SystemExit("timed out waiting for codex app-server message")
|
||||
|
||||
|
||||
def read_until(proc, predicate, timeout_s=15):
|
||||
deadline = time.monotonic() + timeout_s
|
||||
seen = []
|
||||
while time.monotonic() < deadline:
|
||||
message = read_message(proc, timeout_s=max(0.1, deadline - time.monotonic()))
|
||||
seen.append(message)
|
||||
if predicate(message):
|
||||
return message
|
||||
raise SystemExit(f"timed out waiting for matching message: {seen!r}")
|
||||
|
||||
|
||||
def request(proc, request_id, method, params):
|
||||
send(proc, {"id": request_id, "method": method, "params": params})
|
||||
message = read_until(proc, lambda message: message.get("id") == request_id)
|
||||
if "error" in message:
|
||||
raise SystemExit(f"{method} failed: {message['error']}")
|
||||
return message["result"]
|
||||
|
||||
|
||||
codex_bin = os.environ["CODEX_BIN"]
|
||||
codex_home = os.environ["CODEX_HOME"]
|
||||
proc = subprocess.Popen(
|
||||
[codex_bin, "app-server", "--listen", "stdio://"],
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
env={**os.environ, "CODEX_HOME": codex_home},
|
||||
)
|
||||
assert proc.stdin is not None
|
||||
assert proc.stdout is not None
|
||||
assert proc.stderr is not None
|
||||
|
||||
initialize = request(
|
||||
proc,
|
||||
1,
|
||||
"initialize",
|
||||
{
|
||||
"clientInfo": {"name": "remote-mcp-oauth-e2e", "version": "0.1.0"},
|
||||
"capabilities": {"experimentalApi": True},
|
||||
},
|
||||
)
|
||||
if "userAgent" not in initialize:
|
||||
raise SystemExit(f"initialize response missing userAgent: {initialize!r}")
|
||||
send(proc, {"method": "initialized"})
|
||||
|
||||
status = request(
|
||||
proc,
|
||||
2,
|
||||
"mcpServerStatus/list",
|
||||
{"detail": "toolsAndAuthOnly"},
|
||||
)
|
||||
entry = status["data"][0]
|
||||
if entry["name"] != "remote-oauth" or entry["authStatus"] != "notLoggedIn":
|
||||
raise SystemExit(f"unexpected pre-login status: {status!r}")
|
||||
|
||||
login = request(proc, 3, "mcpServer/oauth/login", {"name": "remote-oauth"})
|
||||
authorization_url = login["authorizationUrl"]
|
||||
with urllib.request.urlopen(authorization_url, timeout=15) as response:
|
||||
if response.status != 200:
|
||||
raise SystemExit(f"unexpected authorize response: {response.status}")
|
||||
|
||||
completed = read_until(
|
||||
proc,
|
||||
lambda message: message.get("method") == "mcpServer/oauthLogin/completed",
|
||||
)
|
||||
params = completed.get("params") or {}
|
||||
if params != {"name": "remote-oauth", "success": True}:
|
||||
raise SystemExit(f"unexpected oauth completion notification: {completed!r}")
|
||||
|
||||
status = request(
|
||||
proc,
|
||||
4,
|
||||
"mcpServerStatus/list",
|
||||
{"detail": "toolsAndAuthOnly"},
|
||||
)
|
||||
entry = status["data"][0]
|
||||
if entry["authStatus"] != "oAuth":
|
||||
raise SystemExit(f"unexpected post-login status: {status!r}")
|
||||
|
||||
proc.terminate()
|
||||
proc.wait(timeout=5)
|
||||
print("remote MCP OAuth E2E passed")
|
||||
PY
|
||||
Reference in New Issue
Block a user