use std::borrow::Cow; use std::sync::Arc; use std::time::Duration; use anyhow::Result; use app_test_support::ChatGptAuthFixture; use app_test_support::McpProcess; use app_test_support::to_response; use app_test_support::write_chatgpt_auth; use axum::Json; use axum::Router; use axum::extract::State; use axum::http::HeaderMap; use axum::http::StatusCode; use axum::http::header::AUTHORIZATION; use axum::routing::get; use codex_app_server_protocol::AppInfo; use codex_app_server_protocol::AppsListParams; use codex_app_server_protocol::AppsListResponse; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_core::auth::AuthCredentialsStoreMode; use pretty_assertions::assert_eq; use rmcp::handler::server::ServerHandler; use rmcp::model::JsonObject; use rmcp::model::ListToolsResult; use rmcp::model::Meta; use rmcp::model::ServerCapabilities; use rmcp::model::ServerInfo; use rmcp::model::Tool; use rmcp::model::ToolAnnotations; use rmcp::transport::StreamableHttpServerConfig; use rmcp::transport::StreamableHttpService; use rmcp::transport::streamable_http_server::session::local::LocalSessionManager; use serde_json::json; use tempfile::TempDir; use tokio::net::TcpListener; use tokio::task::JoinHandle; use tokio::time::timeout; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); #[tokio::test] async fn list_apps_returns_empty_when_connectors_disabled() -> Result<()> { let codex_home = TempDir::new()?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; let request_id = mcp .send_apps_list_request(AppsListParams { limit: Some(50), cursor: None, }) .await?; let response: JSONRPCResponse = timeout( DEFAULT_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(request_id)), ) .await??; let AppsListResponse { data, next_cursor } = to_response(response)?; assert!(data.is_empty()); assert!(next_cursor.is_none()); Ok(()) } #[tokio::test] async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> { let connectors = vec![ AppInfo { id: "alpha".to_string(), name: "Alpha".to_string(), description: Some("Alpha connector".to_string()), logo_url: Some("https://example.com/alpha.png".to_string()), logo_url_dark: None, distribution_channel: None, install_url: None, is_accessible: false, }, AppInfo { id: "beta".to_string(), name: "beta".to_string(), description: None, logo_url: None, logo_url_dark: None, distribution_channel: None, install_url: None, is_accessible: false, }, ]; let tools = vec![connector_tool("beta", "Beta App")?]; let (server_url, server_handle) = start_apps_server(connectors.clone(), tools).await?; let codex_home = TempDir::new()?; write_connectors_config(codex_home.path(), &server_url)?; write_chatgpt_auth( codex_home.path(), ChatGptAuthFixture::new("chatgpt-token") .account_id("account-123") .chatgpt_user_id("user-123") .chatgpt_account_id("account-123"), AuthCredentialsStoreMode::File, )?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; let request_id = mcp .send_apps_list_request(AppsListParams { limit: None, cursor: None, }) .await?; let response: JSONRPCResponse = timeout( DEFAULT_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(request_id)), ) .await??; let AppsListResponse { data, next_cursor } = to_response(response)?; let expected = vec![ AppInfo { id: "beta".to_string(), name: "Beta App".to_string(), description: None, logo_url: None, logo_url_dark: None, distribution_channel: None, install_url: Some("https://chatgpt.com/apps/beta/beta".to_string()), is_accessible: true, }, AppInfo { id: "alpha".to_string(), name: "Alpha".to_string(), description: Some("Alpha connector".to_string()), logo_url: Some("https://example.com/alpha.png".to_string()), logo_url_dark: None, distribution_channel: None, install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()), is_accessible: false, }, ]; assert_eq!(data, expected); assert!(next_cursor.is_none()); server_handle.abort(); Ok(()) } #[tokio::test] async fn list_apps_paginates_results() -> Result<()> { let connectors = vec![ AppInfo { id: "alpha".to_string(), name: "Alpha".to_string(), description: Some("Alpha connector".to_string()), logo_url: None, logo_url_dark: None, distribution_channel: None, install_url: None, is_accessible: false, }, AppInfo { id: "beta".to_string(), name: "beta".to_string(), description: None, logo_url: None, logo_url_dark: None, distribution_channel: None, install_url: None, is_accessible: false, }, ]; let tools = vec![connector_tool("beta", "Beta App")?]; let (server_url, server_handle) = start_apps_server(connectors.clone(), tools).await?; let codex_home = TempDir::new()?; write_connectors_config(codex_home.path(), &server_url)?; write_chatgpt_auth( codex_home.path(), ChatGptAuthFixture::new("chatgpt-token") .account_id("account-123") .chatgpt_user_id("user-123") .chatgpt_account_id("account-123"), AuthCredentialsStoreMode::File, )?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; let first_request = mcp .send_apps_list_request(AppsListParams { limit: Some(1), cursor: None, }) .await?; let first_response: JSONRPCResponse = timeout( DEFAULT_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(first_request)), ) .await??; let AppsListResponse { data: first_page, next_cursor: first_cursor, } = to_response(first_response)?; let expected_first = vec![AppInfo { id: "beta".to_string(), name: "Beta App".to_string(), description: None, logo_url: None, logo_url_dark: None, distribution_channel: None, install_url: Some("https://chatgpt.com/apps/beta/beta".to_string()), is_accessible: true, }]; assert_eq!(first_page, expected_first); let next_cursor = first_cursor.ok_or_else(|| anyhow::anyhow!("missing cursor"))?; let second_request = mcp .send_apps_list_request(AppsListParams { limit: Some(1), cursor: Some(next_cursor), }) .await?; let second_response: JSONRPCResponse = timeout( DEFAULT_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(second_request)), ) .await??; let AppsListResponse { data: second_page, next_cursor: second_cursor, } = to_response(second_response)?; let expected_second = vec![AppInfo { id: "alpha".to_string(), name: "Alpha".to_string(), description: Some("Alpha connector".to_string()), logo_url: None, logo_url_dark: None, distribution_channel: None, install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()), is_accessible: false, }]; assert_eq!(second_page, expected_second); assert!(second_cursor.is_none()); server_handle.abort(); Ok(()) } #[derive(Clone)] struct AppsServerState { expected_bearer: String, expected_account_id: String, response: serde_json::Value, } #[derive(Clone)] struct AppListMcpServer { tools: Arc>, } impl AppListMcpServer { fn new(tools: Arc>) -> Self { Self { tools } } } impl ServerHandler for AppListMcpServer { fn get_info(&self) -> ServerInfo { ServerInfo { capabilities: ServerCapabilities::builder().enable_tools().build(), ..ServerInfo::default() } } fn list_tools( &self, _request: Option, _context: rmcp::service::RequestContext, ) -> impl std::future::Future> + Send + '_ { let tools = self.tools.clone(); async move { Ok(ListToolsResult { tools: (*tools).clone(), next_cursor: None, meta: None, }) } } } async fn start_apps_server( connectors: Vec, tools: Vec, ) -> Result<(String, JoinHandle<()>)> { let state = AppsServerState { expected_bearer: "Bearer chatgpt-token".to_string(), expected_account_id: "account-123".to_string(), response: json!({ "apps": connectors, "next_token": null }), }; let state = Arc::new(state); let tools = Arc::new(tools); let listener = TcpListener::bind("127.0.0.1:0").await?; let addr = listener.local_addr()?; let mcp_service = StreamableHttpService::new( { let tools = tools.clone(); move || Ok(AppListMcpServer::new(tools.clone())) }, Arc::new(LocalSessionManager::default()), StreamableHttpServerConfig::default(), ); let router = Router::new() .route("/connectors/directory/list", get(list_directory_connectors)) .route( "/connectors/directory/list_workspace", get(list_directory_connectors), ) .with_state(state) .nest_service("/api/codex/apps", mcp_service); let handle = tokio::spawn(async move { let _ = axum::serve(listener, router).await; }); Ok((format!("http://{addr}"), handle)) } async fn list_directory_connectors( State(state): State>, headers: HeaderMap, ) -> Result { let bearer_ok = headers .get(AUTHORIZATION) .and_then(|value| value.to_str().ok()) .is_some_and(|value| value == state.expected_bearer); let account_ok = headers .get("chatgpt-account-id") .and_then(|value| value.to_str().ok()) .is_some_and(|value| value == state.expected_account_id); if bearer_ok && account_ok { Ok(Json(state.response.clone())) } else { Err(StatusCode::UNAUTHORIZED) } } fn connector_tool(connector_id: &str, connector_name: &str) -> Result { let schema: JsonObject = serde_json::from_value(json!({ "type": "object", "additionalProperties": false }))?; let mut tool = Tool::new( Cow::Owned(format!("connector_{connector_id}")), Cow::Borrowed("Connector test tool"), Arc::new(schema), ); tool.annotations = Some(ToolAnnotations::new().read_only(true)); let mut meta = Meta::new(); meta.0 .insert("connector_id".to_string(), json!(connector_id)); meta.0 .insert("connector_name".to_string(), json!(connector_name)); tool.meta = Some(meta); Ok(tool) } fn write_connectors_config(codex_home: &std::path::Path, base_url: &str) -> std::io::Result<()> { let config_toml = codex_home.join("config.toml"); std::fs::write( config_toml, format!( r#" chatgpt_base_url = "{base_url}" [features] connectors = true "# ), ) }