Compare commits

...

12 Commits

Author SHA1 Message Date
Matthew Zeng  ad5d583d50  update  2026-01-22 16:35:03 -08:00
Matthew Zeng  8f621e0f1a  update  2026-01-22 11:46:34 -08:00
Matthew Zeng  341cb5a8f8  update  2026-01-22 09:40:18 -08:00
Matthew Zeng  5d5595ba04  update  2026-01-22 00:16:22 -08:00
Matthew Zeng  7e056fed05  update  2026-01-21 23:25:38 -08:00
Matthew Zeng  361b3711cc  update  2026-01-21 23:21:53 -08:00
Matthew Zeng  5a54beebf9  update  2026-01-21 23:04:42 -08:00
Matthew Zeng  da95ee0e58  update  2026-01-21 22:59:01 -08:00
Matthew Zeng  407cd9f0d3  update  2026-01-21 22:52:28 -08:00
Matthew Zeng  b5a37c2bd3  Merge branch 'main' of https://github.com/openai/codex into dev/mzeng/connectors_codex_cli_1  2026-01-21 22:27:44 -08:00
Matthew Zeng  9a69a97e33  Merge branch 'main' of https://github.com/openai/codex into dev/mzeng/connectors_codex_cli_1  2026-01-21 21:12:54 -08:00
Matthew Zeng  4d25fe57b3  update  2026-01-21 17:37:09 -08:00
25 changed files with 1288 additions and 28 deletions

codex-rs/Cargo.lock generated
View File

@@ -625,6 +625,8 @@ dependencies = [
"pin-project-lite",
"rustversion",
"serde",
"serde_json",
"serde_path_to_error",
"sync_wrapper",
"tokio",
"tower",
@@ -1000,11 +1002,13 @@ version = "0.0.0"
dependencies = [
"anyhow",
"app_test_support",
"axum",
"base64",
"chrono",
"codex-app-server-protocol",
"codex-arg0",
"codex-backend-client",
"codex-chatgpt",
"codex-common",
"codex-core",
"codex-feedback",
@@ -1018,6 +1022,7 @@ dependencies = [
"mcp-types",
"os_info",
"pretty_assertions",
"rmcp",
"serde",
"serde_json",
"serial_test",

View File

@@ -133,6 +133,10 @@ client_request_definitions! {
params: v2::SkillsListParams,
response: v2::SkillsListResponse,
},
AppsList => "app/list" {
params: v2::AppsListParams,
response: v2::AppsListResponse,
},
SkillsConfigWrite => "skills/config/write" {
params: v2::SkillsConfigWriteParams,
response: v2::SkillsConfigWriteResponse,

View File

@@ -965,6 +965,39 @@ pub struct ListMcpServerStatusResponse {
pub next_cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct AppsListParams {
/// Opaque pagination cursor returned by a previous call.
pub cursor: Option<String>,
/// Optional page size; defaults to a reasonable server-side value.
pub limit: Option<u32>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct AppInfo {
pub id: String,
pub name: String,
pub description: Option<String>,
pub logo_url: Option<String>,
pub install_url: Option<String>,
#[serde(default)]
pub is_accessible: bool,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct AppsListResponse {
pub data: Vec<AppInfo>,
/// Opaque cursor to pass to the next call to continue after the last item.
/// If None, there are no more items to return.
pub next_cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -22,6 +22,7 @@ codex-common = { workspace = true, features = ["cli"] }
codex-core = { workspace = true }
codex-backend-client = { workspace = true }
codex-file-search = { workspace = true }
codex-chatgpt = { workspace = true }
codex-login = { workspace = true }
codex-protocol = { workspace = true }
codex-app-server-protocol = { workspace = true }
@@ -48,11 +49,20 @@ uuid = { workspace = true, features = ["serde", "v7"] }
[dev-dependencies]
app_test_support = { workspace = true }
axum = { workspace = true, default-features = false, features = [
"http1",
"json",
"tokio",
] }
base64 = { workspace = true }
core_test_support = { workspace = true }
mcp-types = { workspace = true }
os_info = { workspace = true }
pretty_assertions = { workspace = true }
rmcp = { workspace = true, default-features = false, features = [
"server",
"transport-streamable-http-server",
] }
serial_test = { workspace = true }
wiremock = { workspace = true }
shlex = { workspace = true }

View File

@@ -88,6 +88,7 @@ Example (from OpenAI's official VSCode extension):
- `model/list` — list available models (with reasoning effort options).
- `collaborationMode/list` — list available collaboration mode presets (experimental, no pagination).
- `skills/list` — list skills for one or more `cwd` values (optional `forceReload`).
- `app/list` — list available apps (see the example after this list).
- `skills/config/write` — write user-level skill config by path.
- `mcpServer/oauth/login` — start an OAuth login for a configured MCP server; returns an `authorization_url` and later emits `mcpServer/oauthLogin/completed` once the browser flow finishes.
- `tool/requestUserInput` — prompt the user with 1–3 short questions for a tool call and return their answers (experimental).
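A minimal sketch of the `app/list` wire format, following the camelCase serialization declared on `AppsListParams`, `AppInfo`, and `AppsListResponse` in this change (the concrete values below are illustrative only):

```rust
use serde_json::json;

fn main() {
    // `app/list` request params: both fields are optional.
    let params = json!({ "limit": 50, "cursor": null });

    // Illustrative response: `nextCursor` is an opaque cursor and is
    // null/absent when there are no more apps to return.
    let response = json!({
        "data": [{
            "id": "alpha",
            "name": "Alpha",
            "description": "Alpha connector",
            "logoUrl": "https://example.com/alpha.png",
            "installUrl": "https://chatgpt.com/apps/alpha/alpha",
            "isAccessible": true
        }],
        "nextCursor": null
    });

    println!("{params}\n{response}");
}
```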

View File

@@ -13,6 +13,9 @@ use codex_app_server_protocol::AccountLoginCompletedNotification;
use codex_app_server_protocol::AccountUpdatedNotification;
use codex_app_server_protocol::AddConversationListenerParams;
use codex_app_server_protocol::AddConversationSubscriptionResponse;
use codex_app_server_protocol::AppInfo as ApiAppInfo;
use codex_app_server_protocol::AppsListParams;
use codex_app_server_protocol::AppsListResponse;
use codex_app_server_protocol::ArchiveConversationParams;
use codex_app_server_protocol::ArchiveConversationResponse;
use codex_app_server_protocol::AskForApproval;
@@ -120,6 +123,7 @@ use codex_app_server_protocol::UserInput as V2UserInput;
use codex_app_server_protocol::UserSavedConfig;
use codex_app_server_protocol::build_turns_from_event_msgs;
use codex_backend_client::Client as BackendClient;
use codex_chatgpt::connectors;
use codex_core::AuthManager;
use codex_core::CodexThread;
use codex_core::Cursor as RolloutCursor;
@@ -406,6 +410,9 @@ impl CodexMessageProcessor {
ClientRequest::SkillsList { request_id, params } => {
self.skills_list(request_id, params).await;
}
ClientRequest::AppsList { request_id, params } => {
self.apps_list(request_id, params).await;
}
ClientRequest::SkillsConfigWrite { request_id, params } => {
self.skills_config_write(request_id, params).await;
}
@@ -3296,6 +3303,102 @@ impl CodexMessageProcessor {
.await;
}
async fn apps_list(&self, request_id: RequestId, params: AppsListParams) {
let AppsListParams { cursor, limit } = params;
let config = match self.load_latest_config().await {
Ok(config) => config,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
if !config.features.enabled(Feature::Connectors) {
self.outgoing
.send_response(
request_id,
AppsListResponse {
data: Vec::new(),
next_cursor: None,
},
)
.await;
return;
}
let connectors = match connectors::list_connectors(&config).await {
Ok(connectors) => connectors,
Err(err) => {
self.send_internal_error(request_id, format!("failed to list apps: {err}"))
.await;
return;
}
};
let total = connectors.len();
if total == 0 {
self.outgoing
.send_response(
request_id,
AppsListResponse {
data: Vec::new(),
next_cursor: None,
},
)
.await;
return;
}
let effective_limit = limit.unwrap_or(total as u32).max(1) as usize;
let effective_limit = effective_limit.min(total);
let start = match cursor {
Some(cursor) => match cursor.parse::<usize>() {
Ok(idx) => idx,
Err(_) => {
self.send_invalid_request_error(
request_id,
format!("invalid cursor: {cursor}"),
)
.await;
return;
}
},
None => 0,
};
if start > total {
self.send_invalid_request_error(
request_id,
format!("cursor {start} exceeds total apps {total}"),
)
.await;
return;
}
let end = start.saturating_add(effective_limit).min(total);
let data = connectors[start..end]
.iter()
.cloned()
.map(|connector| ApiAppInfo {
id: connector.connector_id,
name: connector.connector_name,
description: connector.connector_description,
logo_url: connector.logo_url,
install_url: connector.install_url,
is_accessible: connector.is_accessible,
})
.collect();
let next_cursor = if end < total {
Some(end.to_string())
} else {
None
};
self.outgoing
.send_response(request_id, AppsListResponse { data, next_cursor })
.await;
}
async fn skills_list(&self, request_id: RequestId, params: SkillsListParams) {
let SkillsListParams { cwds, force_reload } = params;
let cwds = if cwds.is_empty() {

View File

@@ -49,6 +49,16 @@ impl ChatGptAuthFixture {
self
}
pub fn chatgpt_user_id(mut self, chatgpt_user_id: impl Into<String>) -> Self {
self.claims.chatgpt_user_id = Some(chatgpt_user_id.into());
self
}
pub fn chatgpt_account_id(mut self, chatgpt_account_id: impl Into<String>) -> Self {
self.claims.chatgpt_account_id = Some(chatgpt_account_id.into());
self
}
pub fn email(mut self, email: impl Into<String>) -> Self {
self.claims.email = Some(email.into());
self
@@ -69,6 +79,8 @@ impl ChatGptAuthFixture {
pub struct ChatGptIdTokenClaims {
pub email: Option<String>,
pub plan_type: Option<String>,
pub chatgpt_user_id: Option<String>,
pub chatgpt_account_id: Option<String>,
}
impl ChatGptIdTokenClaims {
@@ -85,6 +97,16 @@ impl ChatGptIdTokenClaims {
self.plan_type = Some(plan_type.into());
self
}
pub fn chatgpt_user_id(mut self, chatgpt_user_id: impl Into<String>) -> Self {
self.chatgpt_user_id = Some(chatgpt_user_id.into());
self
}
pub fn chatgpt_account_id(mut self, chatgpt_account_id: impl Into<String>) -> Self {
self.chatgpt_account_id = Some(chatgpt_account_id.into());
self
}
}
pub fn encode_id_token(claims: &ChatGptIdTokenClaims) -> Result<String> {
@@ -93,10 +115,20 @@ pub fn encode_id_token(claims: &ChatGptIdTokenClaims) -> Result<String> {
if let Some(email) = &claims.email {
payload.insert("email".to_string(), json!(email));
}
let mut auth_payload = serde_json::Map::new();
if let Some(plan_type) = &claims.plan_type {
auth_payload.insert("chatgpt_plan_type".to_string(), json!(plan_type));
}
if let Some(chatgpt_user_id) = &claims.chatgpt_user_id {
auth_payload.insert("chatgpt_user_id".to_string(), json!(chatgpt_user_id));
}
if let Some(chatgpt_account_id) = &claims.chatgpt_account_id {
auth_payload.insert("chatgpt_account_id".to_string(), json!(chatgpt_account_id));
}
if !auth_payload.is_empty() {
payload.insert(
"https://api.openai.com/auth".to_string(),
json!({ "chatgpt_plan_type": plan_type }),
serde_json::Value::Object(auth_payload),
);
}
let payload = serde_json::Value::Object(payload);

View File

@@ -12,6 +12,7 @@ use tokio::process::ChildStdout;
use anyhow::Context;
use codex_app_server_protocol::AddConversationListenerParams;
use codex_app_server_protocol::AppsListParams;
use codex_app_server_protocol::ArchiveConversationParams;
use codex_app_server_protocol::CancelLoginAccountParams;
use codex_app_server_protocol::CancelLoginChatGptParams;
@@ -399,6 +400,12 @@ impl McpProcess {
self.send_request("model/list", params).await
}
/// Send an `app/list` JSON-RPC request.
pub async fn send_apps_list_request(&mut self, params: AppsListParams) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("app/list", params).await
}
/// Send a `collaborationMode/list` JSON-RPC request.
pub async fn send_list_collaboration_modes_request(
&mut self,

View File

@@ -0,0 +1,381 @@
use std::borrow::Cow;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use app_test_support::ChatGptAuthFixture;
use app_test_support::McpProcess;
use app_test_support::to_response;
use app_test_support::write_chatgpt_auth;
use axum::Json;
use axum::Router;
use axum::extract::State;
use axum::http::HeaderMap;
use axum::http::StatusCode;
use axum::http::header::AUTHORIZATION;
use axum::routing::post;
use codex_app_server_protocol::AppInfo;
use codex_app_server_protocol::AppsListParams;
use codex_app_server_protocol::AppsListResponse;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_core::auth::AuthCredentialsStoreMode;
use codex_core::connectors::ConnectorInfo;
use pretty_assertions::assert_eq;
use rmcp::handler::server::ServerHandler;
use rmcp::model::JsonObject;
use rmcp::model::ListToolsResult;
use rmcp::model::Meta;
use rmcp::model::ServerCapabilities;
use rmcp::model::ServerInfo;
use rmcp::model::Tool;
use rmcp::model::ToolAnnotations;
use rmcp::transport::StreamableHttpServerConfig;
use rmcp::transport::StreamableHttpService;
use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
use serde_json::json;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
use tokio::time::timeout;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
#[tokio::test]
async fn list_apps_returns_empty_when_connectors_disabled() -> Result<()> {
let codex_home = TempDir::new()?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_apps_list_request(AppsListParams {
limit: Some(50),
cursor: None,
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let AppsListResponse { data, next_cursor } = to_response(response)?;
assert!(data.is_empty());
assert!(next_cursor.is_none());
Ok(())
}
#[tokio::test]
async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> {
let connectors = vec![
ConnectorInfo {
connector_id: "alpha".to_string(),
connector_name: "Alpha".to_string(),
connector_description: Some("Alpha connector".to_string()),
logo_url: Some("https://example.com/alpha.png".to_string()),
install_url: None,
is_accessible: false,
},
ConnectorInfo {
connector_id: "beta".to_string(),
connector_name: "beta".to_string(),
connector_description: None,
logo_url: None,
install_url: None,
is_accessible: false,
},
];
let tools = vec![connector_tool("beta", "Beta App")?];
let (server_url, server_handle) = start_apps_server(connectors.clone(), tools).await?;
let codex_home = TempDir::new()?;
write_connectors_config(codex_home.path(), &server_url)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_apps_list_request(AppsListParams {
limit: None,
cursor: None,
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let AppsListResponse { data, next_cursor } = to_response(response)?;
let expected = vec![
AppInfo {
id: "beta".to_string(),
name: "Beta App".to_string(),
description: None,
logo_url: None,
install_url: Some("https://chatgpt.com/apps/beta/beta".to_string()),
is_accessible: true,
},
AppInfo {
id: "alpha".to_string(),
name: "Alpha".to_string(),
description: Some("Alpha connector".to_string()),
logo_url: Some("https://example.com/alpha.png".to_string()),
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
is_accessible: false,
},
];
assert_eq!(data, expected);
assert!(next_cursor.is_none());
server_handle.abort();
Ok(())
}
#[tokio::test]
async fn list_apps_paginates_results() -> Result<()> {
let connectors = vec![
ConnectorInfo {
connector_id: "alpha".to_string(),
connector_name: "Alpha".to_string(),
connector_description: Some("Alpha connector".to_string()),
logo_url: None,
install_url: None,
is_accessible: false,
},
ConnectorInfo {
connector_id: "beta".to_string(),
connector_name: "beta".to_string(),
connector_description: None,
logo_url: None,
install_url: None,
is_accessible: false,
},
];
let tools = vec![connector_tool("beta", "Beta App")?];
let (server_url, server_handle) = start_apps_server(connectors.clone(), tools).await?;
let codex_home = TempDir::new()?;
write_connectors_config(codex_home.path(), &server_url)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let first_request = mcp
.send_apps_list_request(AppsListParams {
limit: Some(1),
cursor: None,
})
.await?;
let first_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(first_request)),
)
.await??;
let AppsListResponse {
data: first_page,
next_cursor: first_cursor,
} = to_response(first_response)?;
let expected_first = vec![AppInfo {
id: "beta".to_string(),
name: "Beta App".to_string(),
description: None,
logo_url: None,
install_url: Some("https://chatgpt.com/apps/beta/beta".to_string()),
is_accessible: true,
}];
assert_eq!(first_page, expected_first);
let next_cursor = first_cursor.ok_or_else(|| anyhow::anyhow!("missing cursor"))?;
let second_request = mcp
.send_apps_list_request(AppsListParams {
limit: Some(1),
cursor: Some(next_cursor),
})
.await?;
let second_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(second_request)),
)
.await??;
let AppsListResponse {
data: second_page,
next_cursor: second_cursor,
} = to_response(second_response)?;
let expected_second = vec![AppInfo {
id: "alpha".to_string(),
name: "Alpha".to_string(),
description: Some("Alpha connector".to_string()),
logo_url: None,
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
is_accessible: false,
}];
assert_eq!(second_page, expected_second);
assert!(second_cursor.is_none());
server_handle.abort();
Ok(())
}
#[derive(Clone)]
struct AppsServerState {
expected_bearer: String,
expected_account_id: String,
response: serde_json::Value,
}
#[derive(Clone)]
struct AppListMcpServer {
tools: Arc<Vec<Tool>>,
}
impl AppListMcpServer {
fn new(tools: Arc<Vec<Tool>>) -> Self {
Self { tools }
}
}
impl ServerHandler for AppListMcpServer {
fn get_info(&self) -> ServerInfo {
ServerInfo {
capabilities: ServerCapabilities::builder().enable_tools().build(),
..ServerInfo::default()
}
}
fn list_tools(
&self,
_request: Option<rmcp::model::PaginatedRequestParam>,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> impl std::future::Future<Output = Result<ListToolsResult, rmcp::ErrorData>> + Send + '_
{
let tools = self.tools.clone();
async move {
Ok(ListToolsResult {
tools: (*tools).clone(),
next_cursor: None,
meta: None,
})
}
}
}
async fn start_apps_server(
connectors: Vec<ConnectorInfo>,
tools: Vec<Tool>,
) -> Result<(String, JoinHandle<()>)> {
let state = AppsServerState {
expected_bearer: "Bearer chatgpt-token".to_string(),
expected_account_id: "account-123".to_string(),
response: json!({ "connectors": connectors }),
};
let state = Arc::new(state);
let tools = Arc::new(tools);
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let mcp_service = StreamableHttpService::new(
{
let tools = tools.clone();
move || Ok(AppListMcpServer::new(tools.clone()))
},
Arc::new(LocalSessionManager::default()),
StreamableHttpServerConfig::default(),
);
let router = Router::new()
.route("/aip/connectors/list_accessible", post(list_connectors))
.with_state(state)
.nest_service("/api/codex/apps", mcp_service);
let handle = tokio::spawn(async move {
let _ = axum::serve(listener, router).await;
});
Ok((format!("http://{addr}"), handle))
}
async fn list_connectors(
State(state): State<Arc<AppsServerState>>,
headers: HeaderMap,
) -> Result<impl axum::response::IntoResponse, StatusCode> {
let bearer_ok = headers
.get(AUTHORIZATION)
.and_then(|value| value.to_str().ok())
.is_some_and(|value| value == state.expected_bearer);
let account_ok = headers
.get("chatgpt-account-id")
.and_then(|value| value.to_str().ok())
.is_some_and(|value| value == state.expected_account_id);
if bearer_ok && account_ok {
Ok(Json(state.response.clone()))
} else {
Err(StatusCode::UNAUTHORIZED)
}
}
fn connector_tool(connector_id: &str, connector_name: &str) -> Result<Tool> {
let schema: JsonObject = serde_json::from_value(json!({
"type": "object",
"additionalProperties": false
}))?;
let mut tool = Tool::new(
Cow::Owned(format!("connector_{connector_id}")),
Cow::Borrowed("Connector test tool"),
Arc::new(schema),
);
tool.annotations = Some(ToolAnnotations::new().read_only(true));
let mut meta = Meta::new();
meta.0
.insert("connector_id".to_string(), json!(connector_id));
meta.0
.insert("connector_name".to_string(), json!(connector_name));
tool.meta = Some(meta);
Ok(tool)
}
fn write_connectors_config(codex_home: &std::path::Path, base_url: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
chatgpt_base_url = "{base_url}"
[features]
connectors = true
"#
),
)
}

View File

@@ -1,5 +1,6 @@
mod account;
mod analytics;
mod app_list;
mod collaboration_mode_list;
mod config_rpc;
mod initialize;

View File

@@ -5,6 +5,7 @@ use crate::chatgpt_token::get_chatgpt_token_data;
use crate::chatgpt_token::init_chatgpt_token_from_auth;
use anyhow::Context;
use serde::Serialize;
use serde::de::DeserializeOwned;
/// Make a GET request to the ChatGPT backend API.
@@ -48,3 +49,37 @@ pub(crate) async fn chatgpt_get_request<T: DeserializeOwned>(
anyhow::bail!("Request failed with status {status}: {body}")
}
}
pub(crate) async fn chatgpt_post_request<T: DeserializeOwned, P: Serialize>(
config: &Config,
access_token: &str,
account_id: &str,
path: &str,
payload: &P,
) -> anyhow::Result<T> {
let chatgpt_base_url = &config.chatgpt_base_url;
let client = create_client();
let url = format!("{chatgpt_base_url}{path}");
let response = client
.post(&url)
.bearer_auth(access_token)
.header("chatgpt-account-id", account_id)
.header("Content-Type", "application/json")
.json(payload)
.send()
.await
.context("Failed to send request")?;
if response.status().is_success() {
let result: T = response
.json()
.await
.context("Failed to parse JSON response")?;
Ok(result)
} else {
let status = response.status();
let body = response.text().await.unwrap_or_default();
anyhow::bail!("Request failed with status {status}: {body}")
}
}

View File

@@ -0,0 +1,125 @@
use codex_core::config::Config;
use codex_core::features::Feature;
use serde::Deserialize;
use serde::Serialize;
use crate::chatgpt_client::chatgpt_post_request;
use crate::chatgpt_token::get_chatgpt_token_data;
use crate::chatgpt_token::init_chatgpt_token_from_auth;
pub use codex_core::connectors::ConnectorInfo;
pub use codex_core::connectors::connector_display_label;
use codex_core::connectors::connector_install_url;
pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools;
use codex_core::connectors::merge_connectors;
#[derive(Debug, Serialize)]
struct ListConnectorsRequest {
principals: Vec<Principal>,
}
#[derive(Debug, Serialize)]
struct Principal {
#[serde(rename = "type")]
principal_type: PrincipalType,
id: String,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
enum PrincipalType {
User,
}
#[derive(Debug, Deserialize)]
struct ListConnectorsResponse {
connectors: Vec<ConnectorInfo>,
}
pub async fn list_connectors(config: &Config) -> anyhow::Result<Vec<ConnectorInfo>> {
if !config.features.enabled(Feature::Connectors) {
return Ok(Vec::new());
}
let (connectors_result, accessible_result) = tokio::join!(
list_all_connectors(config),
list_accessible_connectors_from_mcp_tools(config),
);
let connectors = connectors_result?;
let accessible = accessible_result?;
Ok(merge_connectors(connectors, accessible))
}
pub async fn list_all_connectors(config: &Config) -> anyhow::Result<Vec<ConnectorInfo>> {
if !config.features.enabled(Feature::Connectors) {
return Ok(Vec::new());
}
init_chatgpt_token_from_auth(&config.codex_home, config.cli_auth_credentials_store_mode)
.await?;
let token_data =
get_chatgpt_token_data().ok_or_else(|| anyhow::anyhow!("ChatGPT token not available"))?;
let user_id = token_data
.id_token
.chatgpt_user_id
.as_deref()
.ok_or_else(|| {
anyhow::anyhow!("ChatGPT user ID not available, please re-run `codex login`")
})?;
let account_id = token_data
.id_token
.chatgpt_account_id
.as_deref()
.ok_or_else(|| {
anyhow::anyhow!("ChatGPT account ID not available, please re-run `codex login`")
})?;
let principal_id = format!("{user_id}__{account_id}");
let request = ListConnectorsRequest {
principals: vec![Principal {
principal_type: PrincipalType::User,
id: principal_id,
}],
};
let response: ListConnectorsResponse = chatgpt_post_request(
config,
token_data.access_token.as_str(),
account_id,
"/aip/connectors/list_accessible?skip_actions=true&external_logos=true",
&request,
)
.await?;
let mut connectors = response.connectors;
for connector in &mut connectors {
let install_url = match connector.install_url.take() {
Some(install_url) => install_url,
None => connector_install_url(&connector.connector_name, &connector.connector_id),
};
connector.connector_name =
normalize_connector_name(&connector.connector_name, &connector.connector_id);
connector.connector_description =
normalize_connector_value(connector.connector_description.as_deref());
connector.install_url = Some(install_url);
connector.is_accessible = false;
}
connectors.sort_by(|left, right| {
left.connector_name
.cmp(&right.connector_name)
.then_with(|| left.connector_id.cmp(&right.connector_id))
});
Ok(connectors)
}
fn normalize_connector_name(name: &str, connector_id: &str) -> String {
let trimmed = name.trim();
if trimmed.is_empty() {
connector_id.to_string()
} else {
trimmed.to_string()
}
}
fn normalize_connector_value(value: Option<&str>) -> Option<String> {
value
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
}

View File

@@ -1,4 +1,5 @@
pub mod apply_command;
mod chatgpt_client;
mod chatgpt_token;
pub mod connectors;
pub mod get_task;

View File

@@ -153,6 +153,9 @@
"collaboration_modes": {
"type": "boolean"
},
"connectors": {
"type": "boolean"
},
"elevated_windows_sandbox": {
"type": "boolean"
},
@@ -1116,6 +1119,9 @@
"collaboration_modes": {
"type": "boolean"
},
"connectors": {
"type": "boolean"
},
"elevated_windows_sandbox": {
"type": "boolean"
},

View File

@@ -996,6 +996,7 @@ mod tests {
id_token: IdTokenInfo {
email: Some("user@example.com".to_string()),
chatgpt_plan_type: Some(InternalPlanType::Known(InternalKnownPlan::Pro)),
chatgpt_user_id: Some("user-12345".to_string()),
chatgpt_account_id: None,
raw_jwt: fake_jwt,
},

View File

@@ -17,6 +17,7 @@ use crate::compact;
use crate::compact::run_inline_auto_compact_task;
use crate::compact::should_use_remote_compact_task;
use crate::compact_remote::run_inline_remote_auto_compact_task;
use crate::connectors;
use crate::exec_policy::ExecPolicyManager;
use crate::features::Feature;
use crate::features::Features;
@@ -103,7 +104,10 @@ use crate::exec::StreamOutput;
use crate::exec_policy::ExecPolicyUpdateError;
use crate::feedback_tags;
use crate::instructions::UserInstructions;
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
use crate::mcp::auth::compute_auth_statuses;
use crate::mcp::effective_mcp_servers;
use crate::mcp::with_codex_apps_mcp;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::model_provider_info::CHAT_WIRE_API_DEPRECATION_SUMMARY;
use crate::project_doc::get_user_instructions;
@@ -635,14 +639,25 @@ impl Session {
let rollout_fut = RolloutRecorder::new(&config, rollout_params);
let history_meta_fut = crate::message_history::history_metadata(&config);
let auth_statuses_fut = compute_auth_statuses(
config.mcp_servers.iter(),
config.mcp_oauth_credentials_store_mode,
);
let auth_manager_clone = Arc::clone(&auth_manager);
let config_for_mcp = Arc::clone(&config);
let auth_and_mcp_fut = async move {
let auth = auth_manager_clone.auth().await;
let mcp_servers = effective_mcp_servers(&config_for_mcp, auth.as_ref());
let auth_statuses = compute_auth_statuses(
mcp_servers.iter(),
config_for_mcp.mcp_oauth_credentials_store_mode,
)
.await;
(auth, mcp_servers, auth_statuses)
};
// Join all independent futures.
let (rollout_recorder, (history_log_id, history_entry_count), auth_statuses) =
tokio::join!(rollout_fut, history_meta_fut, auth_statuses_fut);
let (
rollout_recorder,
(history_log_id, history_entry_count),
(auth, mcp_servers, auth_statuses),
) = tokio::join!(rollout_fut, history_meta_fut, auth_and_mcp_fut);
let rollout_recorder = rollout_recorder.map_err(|e| {
error!("failed to initialize rollout recorder: {e:#}");
@@ -682,7 +697,6 @@ impl Session {
}
maybe_push_chat_wire_api_deprecation(&config, &mut post_session_configured_events);
let auth = auth_manager.auth().await;
let auth = auth.as_ref();
let otel_manager = OtelManager::new(
conversation_id,
@@ -717,7 +731,7 @@ impl Session {
config.model_auto_compact_token_limit,
config.approval_policy.value(),
config.sandbox_policy.get().clone(),
config.mcp_servers.keys().map(String::as_str).collect(),
mcp_servers.keys().map(String::as_str).collect(),
config.active_profile.clone(),
);
@@ -801,7 +815,7 @@ impl Session {
.write()
.await
.initialize(
&config.mcp_servers,
&mcp_servers,
config.mcp_oauth_credentials_store_mode,
auth_statuses.clone(),
tx_event.clone(),
@@ -1941,6 +1955,14 @@ impl Session {
}
};
let auth = self.services.auth_manager.auth().await;
let config = self.get_config().await;
let mcp_servers = with_codex_apps_mcp(
mcp_servers,
self.features.enabled(Feature::Connectors),
auth.as_ref(),
config.as_ref(),
);
let auth_statuses = compute_auth_statuses(mcp_servers.iter(), store_mode).await;
let sandbox_state = SandboxState {
sandbox_policy: turn_context.sandbox_policy.clone(),
@@ -2120,6 +2142,7 @@ mod handlers {
use crate::mcp::auth::compute_auth_statuses;
use crate::mcp::collect_mcp_snapshot_from_manager;
use crate::mcp::effective_mcp_servers;
use crate::review_prompts::resolve_review_request;
use crate::tasks::CompactTask;
use crate::tasks::RegularTask;
@@ -2431,13 +2454,12 @@ mod handlers {
pub async fn list_mcp_tools(sess: &Session, config: &Arc<Config>, sub_id: String) {
let mcp_connection_manager = sess.services.mcp_connection_manager.read().await;
let auth = sess.services.auth_manager.auth().await;
let mcp_servers = effective_mcp_servers(config, auth.as_ref());
let snapshot = collect_mcp_snapshot_from_manager(
&mcp_connection_manager,
compute_auth_statuses(
config.mcp_servers.iter(),
config.mcp_oauth_credentials_store_mode,
)
.await,
compute_auth_statuses(mcp_servers.iter(), config.mcp_oauth_credentials_store_mode)
.await,
)
.await;
let event = Event {
@@ -2950,6 +2972,60 @@ async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>)
}
}
fn filter_connectors_for_input(
connectors: Vec<connectors::ConnectorInfo>,
input: &[ResponseItem],
) -> Vec<connectors::ConnectorInfo> {
let user_messages = collect_user_messages(input);
if user_messages.is_empty() {
return Vec::new();
}
connectors
.into_iter()
.filter(|connector| connector_inserted_in_messages(connector, &user_messages))
.collect()
}
fn connector_inserted_in_messages(
connector: &connectors::ConnectorInfo,
user_messages: &[String],
) -> bool {
let label = connectors::connector_display_label(connector);
let needle = label.to_lowercase();
let legacy = format!("{label} connector").to_lowercase();
user_messages.iter().any(|message| {
let message = message.to_lowercase();
message.contains(&needle) || message.contains(&legacy)
})
}
fn filter_codex_apps_mcp_tools(
mut mcp_tools: HashMap<String, crate::mcp_connection_manager::ToolInfo>,
connectors: &[connectors::ConnectorInfo],
) -> HashMap<String, crate::mcp_connection_manager::ToolInfo> {
let allowed: HashSet<&str> = connectors
.iter()
.map(|connector| connector.connector_id.as_str())
.collect();
mcp_tools.retain(|_, tool| {
if tool.server_name != CODEX_APPS_MCP_SERVER_NAME {
return true;
}
let Some(connector_id) = codex_apps_connector_id(tool) else {
return false;
};
allowed.contains(connector_id)
});
mcp_tools
}
fn codex_apps_connector_id(tool: &crate::mcp_connection_manager::ToolInfo) -> Option<&str> {
tool.connector_id.as_deref()
}
#[instrument(level = "trace",
skip_all,
fields(
@@ -2966,7 +3042,7 @@ async fn run_sampling_request(
input: Vec<ResponseItem>,
cancellation_token: CancellationToken,
) -> CodexResult<SamplingRequestResult> {
let mcp_tools = sess
let mut mcp_tools = sess
.services
.mcp_connection_manager
.read()
@@ -2974,6 +3050,20 @@ async fn run_sampling_request(
.list_all_tools()
.or_cancel(&cancellation_token)
.await?;
let connectors_for_tools = if turn_context
.client
.config()
.features
.enabled(Feature::Connectors)
{
let connectors = connectors::accessible_connectors_from_mcp_tools(&mcp_tools);
Some(filter_connectors_for_input(connectors, &input))
} else {
None
};
if let Some(connectors) = connectors_for_tools.as_ref() {
mcp_tools = filter_codex_apps_mcp_tools(mcp_tools, connectors);
}
let router = Arc::new(ToolRouter::from_config(
&turn_context.tools_config,
Some(

View File

@@ -0,0 +1,227 @@
use std::collections::HashMap;
use std::env;
use std::path::PathBuf;
use async_channel::unbounded;
use codex_protocol::protocol::SandboxPolicy;
use serde::Deserialize;
use serde::Serialize;
use tokio_util::sync::CancellationToken;
use crate::AuthManager;
use crate::SandboxState;
use crate::config::Config;
use crate::features::Feature;
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
use crate::mcp::auth::compute_auth_statuses;
use crate::mcp::with_codex_apps_mcp;
use crate::mcp_connection_manager::McpConnectionManager;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectorInfo {
#[serde(rename = "id")]
pub connector_id: String,
#[serde(rename = "name")]
pub connector_name: String,
#[serde(default, rename = "description")]
pub connector_description: Option<String>,
#[serde(default, rename = "logo_url")]
pub logo_url: Option<String>,
#[serde(default, rename = "install_url")]
pub install_url: Option<String>,
#[serde(default)]
pub is_accessible: bool,
}
pub async fn list_accessible_connectors_from_mcp_tools(
config: &Config,
) -> anyhow::Result<Vec<ConnectorInfo>> {
if !config.features.enabled(Feature::Connectors) {
return Ok(Vec::new());
}
let auth_manager = auth_manager_from_config(config);
let auth = auth_manager.auth().await;
let mcp_servers = with_codex_apps_mcp(HashMap::new(), true, auth.as_ref(), config);
if mcp_servers.is_empty() {
return Ok(Vec::new());
}
let auth_status_entries =
compute_auth_statuses(mcp_servers.iter(), config.mcp_oauth_credentials_store_mode).await;
let mut mcp_connection_manager = McpConnectionManager::default();
let (tx_event, rx_event) = unbounded();
drop(rx_event);
let cancel_token = CancellationToken::new();
let sandbox_state = SandboxState {
sandbox_policy: SandboxPolicy::ReadOnly,
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
sandbox_cwd: env::current_dir().unwrap_or_else(|_| PathBuf::from("/")),
};
mcp_connection_manager
.initialize(
&mcp_servers,
config.mcp_oauth_credentials_store_mode,
auth_status_entries,
tx_event,
cancel_token.clone(),
sandbox_state,
)
.await;
let tools = mcp_connection_manager.list_all_tools().await;
cancel_token.cancel();
Ok(accessible_connectors_from_mcp_tools(&tools))
}
fn auth_manager_from_config(config: &Config) -> std::sync::Arc<AuthManager> {
AuthManager::shared(
config.codex_home.clone(),
false,
config.cli_auth_credentials_store_mode,
)
}
pub fn connector_display_label(connector: &ConnectorInfo) -> String {
format_connector_label(&connector.connector_name, &connector.connector_id)
}
pub(crate) fn accessible_connectors_from_mcp_tools(
mcp_tools: &HashMap<String, crate::mcp_connection_manager::ToolInfo>,
) -> Vec<ConnectorInfo> {
let tools = mcp_tools.values().filter_map(|tool| {
if tool.server_name != CODEX_APPS_MCP_SERVER_NAME {
return None;
}
let connector_id = tool.connector_id.as_deref()?;
let connector_name = normalize_connector_value(tool.connector_name.as_deref());
Some((connector_id.to_string(), connector_name))
});
collect_accessible_connectors(tools)
}
pub fn merge_connectors(
connectors: Vec<ConnectorInfo>,
accessible_connectors: Vec<ConnectorInfo>,
) -> Vec<ConnectorInfo> {
let mut merged: HashMap<String, ConnectorInfo> = connectors
.into_iter()
.map(|mut connector| {
connector.is_accessible = false;
(connector.connector_id.clone(), connector)
})
.collect();
for mut connector in accessible_connectors {
connector.is_accessible = true;
let connector_id = connector.connector_id.clone();
if let Some(existing) = merged.get_mut(&connector_id) {
existing.is_accessible = true;
if existing.connector_name == existing.connector_id
&& connector.connector_name != connector.connector_id
{
existing.connector_name = connector.connector_name;
}
if existing.connector_description.is_none() && connector.connector_description.is_some()
{
existing.connector_description = connector.connector_description;
}
if existing.logo_url.is_none() && connector.logo_url.is_some() {
existing.logo_url = connector.logo_url;
}
} else {
merged.insert(connector_id, connector);
}
}
let mut merged = merged.into_values().collect::<Vec<_>>();
for connector in &mut merged {
if connector.install_url.is_none() {
connector.install_url = Some(connector_install_url(
&connector.connector_name,
&connector.connector_id,
));
}
}
merged.sort_by(|left, right| {
right
.is_accessible
.cmp(&left.is_accessible)
.then_with(|| left.connector_name.cmp(&right.connector_name))
.then_with(|| left.connector_id.cmp(&right.connector_id))
});
merged
}
fn collect_accessible_connectors<I>(tools: I) -> Vec<ConnectorInfo>
where
I: IntoIterator<Item = (String, Option<String>)>,
{
let mut connectors: HashMap<String, String> = HashMap::new();
for (connector_id, connector_name) in tools {
let connector_name = connector_name.unwrap_or_else(|| connector_id.clone());
if let Some(existing_name) = connectors.get_mut(&connector_id) {
if existing_name == &connector_id && connector_name != connector_id {
*existing_name = connector_name;
}
} else {
connectors.insert(connector_id, connector_name);
}
}
let mut accessible: Vec<ConnectorInfo> = connectors
.into_iter()
.map(|(connector_id, connector_name)| ConnectorInfo {
install_url: Some(connector_install_url(&connector_name, &connector_id)),
connector_id,
connector_name,
connector_description: None,
logo_url: None,
is_accessible: true,
})
.collect();
accessible.sort_by(|left, right| {
right
.is_accessible
.cmp(&left.is_accessible)
.then_with(|| left.connector_name.cmp(&right.connector_name))
.then_with(|| left.connector_id.cmp(&right.connector_id))
});
accessible
}
fn normalize_connector_value(value: Option<&str>) -> Option<String> {
value
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
}
pub fn connector_install_url(name: &str, connector_id: &str) -> String {
let slug = connector_name_slug(name);
format!("https://chatgpt.com/apps/{slug}/{connector_id}")
}
fn connector_name_slug(name: &str) -> String {
let mut normalized = String::with_capacity(name.len());
for character in name.chars() {
if character.is_ascii_alphanumeric() {
normalized.push(character.to_ascii_lowercase());
} else {
normalized.push('-');
}
}
let normalized = normalized.trim_matches('-');
if normalized.is_empty() {
"app".to_string()
} else {
normalized.to_string()
}
}
fn format_connector_label(name: &str, _id: &str) -> String {
name.to_string()
}

View File

@@ -101,6 +101,8 @@ pub enum Feature {
EnableRequestCompression,
/// Enable collab tools.
Collab,
/// Enable connectors (apps).
Connectors,
/// Steer feature flag - when enabled, Enter submits immediately instead of queuing.
Steer,
/// Enable collaboration modes (Plan, Pair Programming, Execute).
@@ -439,6 +441,12 @@ pub const FEATURES: &[FeatureSpec] = &[
},
default_enabled: false,
},
FeatureSpec {
id: Feature::Connectors,
key: "connectors",
stage: Stage::Beta,
default_enabled: false,
},
FeatureSpec {
id: Feature::Steer,
key: "steer",

View File

@@ -20,6 +20,7 @@ mod codex_delegate;
mod command_safety;
pub mod config;
pub mod config_loader;
pub mod connectors;
mod context_manager;
pub mod custom_prompts;
pub mod env;

View File

@@ -9,16 +9,135 @@ use codex_protocol::protocol::SandboxPolicy;
use mcp_types::Tool as McpTool;
use tokio_util::sync::CancellationToken;
use crate::AuthManager;
use crate::CodexAuth;
use crate::config::Config;
use crate::config::types::McpServerConfig;
use crate::config::types::McpServerTransportConfig;
use crate::features::Feature;
use crate::mcp::auth::compute_auth_statuses;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::mcp_connection_manager::SandboxState;
const MCP_TOOL_NAME_PREFIX: &str = "mcp";
const MCP_TOOL_NAME_DELIMITER: &str = "__";
pub(crate) const CODEX_APPS_MCP_SERVER_NAME: &str = "codex_apps_mcp";
const CODEX_CONNECTORS_TOKEN_ENV_VAR: &str = "CODEX_CONNECTORS_TOKEN";
fn codex_apps_mcp_bearer_token_env_var() -> Option<String> {
match env::var(CODEX_CONNECTORS_TOKEN_ENV_VAR) {
Ok(value) if !value.trim().is_empty() => Some(CODEX_CONNECTORS_TOKEN_ENV_VAR.to_string()),
Ok(_) => None,
Err(env::VarError::NotPresent) => None,
Err(env::VarError::NotUnicode(_)) => Some(CODEX_CONNECTORS_TOKEN_ENV_VAR.to_string()),
}
}
fn codex_apps_mcp_bearer_token(auth: Option<&CodexAuth>) -> Option<String> {
let token = auth.and_then(|auth| auth.get_token().ok())?;
let token = token.trim();
if token.is_empty() {
None
} else {
Some(token.to_string())
}
}
fn codex_apps_mcp_http_headers(auth: Option<&CodexAuth>) -> Option<HashMap<String, String>> {
let mut headers = HashMap::new();
if let Some(token) = codex_apps_mcp_bearer_token(auth) {
headers.insert("Authorization".to_string(), format!("Bearer {token}"));
}
if let Some(account_id) = auth.and_then(CodexAuth::get_account_id) {
headers.insert("ChatGPT-Account-ID".to_string(), account_id);
}
if headers.is_empty() {
None
} else {
Some(headers)
}
}
fn codex_apps_mcp_url(base_url: &str) -> String {
let mut base_url = base_url.trim_end_matches('/').to_string();
if (base_url.starts_with("https://chatgpt.com")
|| base_url.starts_with("https://chat.openai.com"))
&& !base_url.contains("/backend-api")
{
base_url = format!("{base_url}/backend-api");
}
if base_url.contains("/backend-api") {
format!("{base_url}/wham/apps")
} else if base_url.contains("/api/codex") {
format!("{base_url}/apps")
} else {
format!("{base_url}/api/codex/apps")
}
}
fn codex_apps_mcp_server_config(config: &Config, auth: Option<&CodexAuth>) -> McpServerConfig {
let bearer_token_env_var = codex_apps_mcp_bearer_token_env_var();
let http_headers = if bearer_token_env_var.is_some() {
None
} else {
codex_apps_mcp_http_headers(auth)
};
let url = codex_apps_mcp_url(&config.chatgpt_base_url);
McpServerConfig {
transport: McpServerTransportConfig::StreamableHttp {
url,
bearer_token_env_var,
http_headers,
env_http_headers: None,
},
enabled: true,
disabled_reason: None,
startup_timeout_sec: None,
tool_timeout_sec: None,
enabled_tools: None,
disabled_tools: None,
}
}
pub(crate) fn with_codex_apps_mcp(
mut servers: HashMap<String, McpServerConfig>,
connectors_enabled: bool,
auth: Option<&CodexAuth>,
config: &Config,
) -> HashMap<String, McpServerConfig> {
if connectors_enabled {
servers.insert(
CODEX_APPS_MCP_SERVER_NAME.to_string(),
codex_apps_mcp_server_config(config, auth),
);
} else {
servers.remove(CODEX_APPS_MCP_SERVER_NAME);
}
servers
}
pub(crate) fn effective_mcp_servers(
config: &Config,
auth: Option<&CodexAuth>,
) -> HashMap<String, McpServerConfig> {
with_codex_apps_mcp(
config.mcp_servers.get().clone(),
config.features.enabled(Feature::Connectors),
auth,
config,
)
}
pub async fn collect_mcp_snapshot(config: &Config) -> McpListToolsResponseEvent {
if config.mcp_servers.is_empty() {
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
false,
config.cli_auth_credentials_store_mode,
);
let auth = auth_manager.auth().await;
let mcp_servers = effective_mcp_servers(config, auth.as_ref());
if mcp_servers.is_empty() {
return McpListToolsResponseEvent {
tools: HashMap::new(),
resources: HashMap::new(),
@@ -27,11 +146,8 @@ pub async fn collect_mcp_snapshot(config: &Config) -> McpListToolsResponseEvent
};
}
let auth_status_entries = compute_auth_statuses(
config.mcp_servers.iter(),
config.mcp_oauth_credentials_store_mode,
)
.await;
let auth_status_entries =
compute_auth_statuses(mcp_servers.iter(), config.mcp_oauth_credentials_store_mode).await;
let mut mcp_connection_manager = McpConnectionManager::default();
let (tx_event, rx_event) = unbounded();
@@ -47,7 +163,7 @@ pub async fn collect_mcp_snapshot(config: &Config) -> McpListToolsResponseEvent
mcp_connection_manager
.initialize(
&config.mcp_servers,
&mcp_servers,
config.mcp_oauth_credentials_store_mode,
auth_status_entries.clone(),
tx_event,

View File

@@ -153,6 +153,8 @@ pub(crate) struct ToolInfo {
pub(crate) server_name: String,
pub(crate) tool_name: String,
pub(crate) tool: Tool,
pub(crate) connector_id: Option<String>,
pub(crate) connector_name: Option<String>,
}
type ResponderMap = HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>;
@@ -899,14 +901,16 @@ async fn list_tools_for_client(
client: &Arc<RmcpClient>,
timeout: Option<Duration>,
) -> Result<Vec<ToolInfo>> {
let resp = client.list_tools(None, timeout).await?;
let resp = client.list_tools_with_connector_ids(None, timeout).await?;
Ok(resp
.tools
.into_iter()
.map(|tool| ToolInfo {
server_name: server_name.to_owned(),
tool_name: tool.name.clone(),
tool,
tool_name: tool.tool.name.clone(),
tool: tool.tool,
connector_id: tool.connector_id,
connector_name: tool.connector_name,
})
.collect())
}
@@ -1004,6 +1008,8 @@ mod tests {
output_schema: None,
title: None,
},
connector_id: None,
connector_name: None,
}
}

View File

@@ -28,6 +28,8 @@ pub struct IdTokenInfo {
/// (e.g., "free", "plus", "pro", "business", "enterprise", "edu").
/// (Note: values may vary by backend.)
pub(crate) chatgpt_plan_type: Option<PlanType>,
/// ChatGPT user identifier associated with the token, if present.
pub chatgpt_user_id: Option<String>,
/// Organization/workspace identifier associated with the token, if present.
pub chatgpt_account_id: Option<String>,
pub raw_jwt: String,
@@ -74,6 +76,10 @@ struct AuthClaims {
#[serde(default)]
chatgpt_plan_type: Option<PlanType>,
#[serde(default)]
chatgpt_user_id: Option<String>,
#[serde(default)]
user_id: Option<String>,
#[serde(default)]
chatgpt_account_id: Option<String>,
}
@@ -103,12 +109,14 @@ pub fn parse_id_token(id_token: &str) -> Result<IdTokenInfo, IdTokenInfoError> {
email: claims.email,
raw_jwt: id_token.to_string(),
chatgpt_plan_type: auth.chatgpt_plan_type,
chatgpt_user_id: auth.chatgpt_user_id.or(auth.user_id),
chatgpt_account_id: auth.chatgpt_account_id,
}),
None => Ok(IdTokenInfo {
email: claims.email,
raw_jwt: id_token.to_string(),
chatgpt_plan_type: None,
chatgpt_user_id: None,
chatgpt_account_id: None,
}),
}

View File

@@ -22,5 +22,7 @@ pub use perform_oauth_login::perform_oauth_login_return_url;
pub use rmcp::model::ElicitationAction;
pub use rmcp_client::Elicitation;
pub use rmcp_client::ElicitationResponse;
pub use rmcp_client::ListToolsWithConnectorIdResult;
pub use rmcp_client::RmcpClient;
pub use rmcp_client::SendElicitation;
pub use rmcp_client::ToolWithConnectorId;

View File

@@ -23,6 +23,7 @@ use mcp_types::ListToolsResult;
use mcp_types::ReadResourceRequestParams;
use mcp_types::ReadResourceResult;
use mcp_types::RequestId;
use mcp_types::Tool;
use reqwest::header::HeaderMap;
use rmcp::model::CallToolRequestParam;
use rmcp::model::ClientNotification;
@@ -44,6 +45,7 @@ use rmcp::transport::auth::AuthClient;
use rmcp::transport::auth::OAuthState;
use rmcp::transport::child_process::TokioChildProcess;
use rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig;
use serde_json::Value;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::process::Command;
@@ -95,6 +97,17 @@ pub type SendElicitation = Box<
dyn Fn(RequestId, Elicitation) -> BoxFuture<'static, Result<ElicitationResponse>> + Send + Sync,
>;
pub struct ToolWithConnectorId {
pub tool: Tool,
pub connector_id: Option<String>,
pub connector_name: Option<String>,
}
pub struct ListToolsWithConnectorIdResult {
pub next_cursor: Option<String>,
pub tools: Vec<ToolWithConnectorId>,
}
/// MCP client implemented on top of the official `rmcp` SDK.
/// https://github.com/modelcontextprotocol/rust-sdk
pub struct RmcpClient {
@@ -286,6 +299,18 @@ impl RmcpClient {
params: Option<ListToolsRequestParams>,
timeout: Option<Duration>,
) -> Result<ListToolsResult> {
let result = self.list_tools_with_connector_ids(params, timeout).await?;
Ok(ListToolsResult {
next_cursor: result.next_cursor,
tools: result.tools.into_iter().map(|tool| tool.tool).collect(),
})
}
pub async fn list_tools_with_connector_ids(
&self,
params: Option<ListToolsRequestParams>,
timeout: Option<Duration>,
) -> Result<ListToolsWithConnectorIdResult> {
self.refresh_oauth_if_needed().await;
let service = self.service().await?;
let rmcp_params = params
@@ -294,9 +319,35 @@ impl RmcpClient {
let fut = service.list_tools(rmcp_params);
let result = run_with_timeout(fut, timeout, "tools/list").await?;
let converted = convert_to_mcp(result)?;
let tools = result
.tools
.into_iter()
.map(|tool| {
let meta = tool.meta.as_ref();
let connector_id = Self::meta_string(meta, "connector_id");
let connector_name = Self::meta_string(meta, "connector_name")
.or_else(|| Self::meta_string(meta, "connector_display_name"));
let tool = convert_to_mcp(tool)?;
Ok(ToolWithConnectorId {
tool,
connector_id,
connector_name,
})
})
.collect::<Result<Vec<_>>>()?;
self.persist_oauth_tokens().await;
Ok(converted)
Ok(ListToolsWithConnectorIdResult {
next_cursor: result.next_cursor,
tools,
})
}
fn meta_string(meta: Option<&rmcp::model::Meta>, key: &str) -> Option<String> {
meta.and_then(|meta| meta.get(key))
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
}
pub async fn list_resources(

View File

@@ -12,6 +12,12 @@ Codex can connect to MCP servers configured in `~/.codex/config.toml`. See the c
- https://developers.openai.com/codex/config-reference
## Apps (Connectors)
Use `$` in the composer to insert a ChatGPT connector; the popover lists accessible
apps. The `/apps` command lists available and installed apps. Connected apps appear first
and are labeled as connected; the rest are marked as available to install.
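The apps listing is gated behind the `connectors` feature flag added in this change (beta, disabled by default), so a config sketch along these lines would be needed in `config.toml` to turn it on:

```toml
[features]
connectors = true
```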
## Notify
Codex can run a notification hook when the agent finishes a turn. See the configuration reference for the latest notification settings: