mirror of
https://github.com/openai/codex.git
synced 2026-04-29 08:56:38 +00:00
Merge branch 'main' into rhan/surface-updates
This commit is contained in:
@@ -13,6 +13,7 @@ base64 = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
codex-core = { workspace = true }
|
||||
codex-features = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-utils-cargo-bin = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use codex_core::features::FEATURES;
|
||||
use codex_core::features::Feature;
|
||||
use codex_features::FEATURES;
|
||||
use codex_features::Feature;
|
||||
use std::collections::BTreeMap;
|
||||
use std::path::Path;
|
||||
|
||||
|
||||
@@ -10,8 +10,8 @@ use codex_app_server_protocol::ExperimentalFeatureStage;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use codex_core::features::FEATURES;
|
||||
use codex_core::features::Stage;
|
||||
use codex_features::FEATURES;
|
||||
use codex_features::Stage;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
@@ -18,8 +18,8 @@ use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::UserInput as V2UserInput;
|
||||
use codex_core::features::FEATURES;
|
||||
use codex_core::features::Feature;
|
||||
use codex_features::FEATURES;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::config_types::CollaborationMode;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
use codex_protocol::config_types::Settings;
|
||||
|
||||
@@ -435,6 +435,7 @@ async fn plugin_install_returns_apps_needing_auth() -> Result<()> {
|
||||
name: "Alpha".to_string(),
|
||||
description: Some("Alpha connector".to_string()),
|
||||
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
|
||||
needs_auth: true,
|
||||
}],
|
||||
}
|
||||
);
|
||||
@@ -518,6 +519,7 @@ async fn plugin_install_filters_disallowed_apps_needing_auth() -> Result<()> {
|
||||
name: "Alpha".to_string(),
|
||||
description: Some("Alpha connector".to_string()),
|
||||
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
|
||||
needs_auth: true,
|
||||
}],
|
||||
}
|
||||
);
|
||||
@@ -527,6 +529,79 @@ async fn plugin_install_filters_disallowed_apps_needing_auth() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn plugin_install_makes_bundled_mcp_servers_available_to_followup_requests() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
std::fs::write(
|
||||
codex_home.path().join("config.toml"),
|
||||
"[features]\nplugins = true\n",
|
||||
)?;
|
||||
let repo_root = TempDir::new()?;
|
||||
write_plugin_marketplace(
|
||||
repo_root.path(),
|
||||
"debug",
|
||||
"sample-plugin",
|
||||
"./sample-plugin",
|
||||
None,
|
||||
None,
|
||||
)?;
|
||||
write_plugin_source(repo_root.path(), "sample-plugin", &[])?;
|
||||
std::fs::write(
|
||||
repo_root.path().join("sample-plugin/.mcp.json"),
|
||||
r#"{
|
||||
"mcpServers": {
|
||||
"sample-mcp": {
|
||||
"command": "echo"
|
||||
}
|
||||
}
|
||||
}"#,
|
||||
)?;
|
||||
let marketplace_path =
|
||||
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let request_id = mcp
|
||||
.send_plugin_install_request(PluginInstallParams {
|
||||
marketplace_path,
|
||||
plugin_name: "sample-plugin".to_string(),
|
||||
force_remote_sync: false,
|
||||
})
|
||||
.await?;
|
||||
let response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
let response: PluginInstallResponse = to_response(response)?;
|
||||
assert_eq!(response.apps_needing_auth, Vec::<AppSummary>::new());
|
||||
let config = std::fs::read_to_string(codex_home.path().join("config.toml"))?;
|
||||
assert!(!config.contains("[mcp_servers.sample-mcp]"));
|
||||
assert!(!config.contains("command = \"echo\""));
|
||||
|
||||
let request_id = mcp
|
||||
.send_raw_request(
|
||||
"mcpServer/oauth/login",
|
||||
Some(json!({
|
||||
"name": "sample-mcp",
|
||||
})),
|
||||
)
|
||||
.await?;
|
||||
let err = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
assert_eq!(err.error.code, -32600);
|
||||
assert_eq!(
|
||||
err.error.message,
|
||||
"OAuth login is only supported for streamable HTTP servers."
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct AppsServerState {
|
||||
response: Arc<StdMutex<serde_json::Value>>,
|
||||
|
||||
@@ -28,6 +28,7 @@ use wiremock::matchers::path;
|
||||
|
||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const TEST_CURATED_PLUGIN_SHA: &str = "0123456789abcdef0123456789abcdef01234567";
|
||||
const STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE: &str = ".tmp/app-server-remote-plugin-sync-v1";
|
||||
|
||||
fn write_plugins_enabled_config(codex_home: &std::path::Path) -> std::io::Result<()> {
|
||||
std::fs::write(
|
||||
@@ -755,6 +756,91 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn app_server_startup_remote_plugin_sync_runs_once() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let server = MockServer::start().await;
|
||||
write_plugin_sync_config(codex_home.path(), &format!("{}/backend-api/", server.uri()))?;
|
||||
write_chatgpt_auth(
|
||||
codex_home.path(),
|
||||
ChatGptAuthFixture::new("chatgpt-token")
|
||||
.account_id("account-123")
|
||||
.chatgpt_user_id("user-123")
|
||||
.chatgpt_account_id("account-123"),
|
||||
AuthCredentialsStoreMode::File,
|
||||
)?;
|
||||
write_openai_curated_marketplace(codex_home.path(), &["linear"])?;
|
||||
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/backend-api/plugins/list"))
|
||||
.and(header("authorization", "Bearer chatgpt-token"))
|
||||
.and(header("chatgpt-account-id", "account-123"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_string(
|
||||
r#"[
|
||||
{"id":"1","name":"linear","marketplace_name":"openai-curated","version":"1.0.0","enabled":true}
|
||||
]"#,
|
||||
))
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/backend-api/plugins/featured"))
|
||||
.and(header("authorization", "Bearer chatgpt-token"))
|
||||
.and(header("chatgpt-account-id", "account-123"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_string(r#"["linear@openai-curated"]"#))
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let marker_path = codex_home
|
||||
.path()
|
||||
.join(STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE);
|
||||
|
||||
{
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
wait_for_path_exists(&marker_path).await?;
|
||||
wait_for_remote_plugin_request_count(&server, "/plugins/list", 1).await?;
|
||||
let request_id = mcp
|
||||
.send_plugin_list_request(PluginListParams {
|
||||
cwds: None,
|
||||
force_remote_sync: false,
|
||||
})
|
||||
.await?;
|
||||
let response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
let response: PluginListResponse = to_response(response)?;
|
||||
let curated_marketplace = response
|
||||
.marketplaces
|
||||
.into_iter()
|
||||
.find(|marketplace| marketplace.name == "openai-curated")
|
||||
.expect("expected openai-curated marketplace entry");
|
||||
assert_eq!(
|
||||
curated_marketplace
|
||||
.plugins
|
||||
.into_iter()
|
||||
.map(|plugin| (plugin.id, plugin.installed, plugin.enabled))
|
||||
.collect::<Vec<_>>(),
|
||||
vec![("linear@openai-curated".to_string(), true, true)]
|
||||
);
|
||||
wait_for_remote_plugin_request_count(&server, "/plugins/list", 1).await?;
|
||||
}
|
||||
|
||||
let config = std::fs::read_to_string(codex_home.path().join("config.toml"))?;
|
||||
assert!(config.contains(r#"[plugins."linear@openai-curated"]"#));
|
||||
|
||||
{
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(250)).await;
|
||||
wait_for_remote_plugin_request_count(&server, "/plugins/list", 1).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn plugin_list_fetches_featured_plugin_ids_without_chatgpt_auth() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
@@ -836,24 +922,32 @@ async fn plugin_list_uses_warmed_featured_plugin_ids_cache_on_first_request() ->
|
||||
async fn wait_for_featured_plugin_request_count(
|
||||
server: &MockServer,
|
||||
expected_count: usize,
|
||||
) -> Result<()> {
|
||||
wait_for_remote_plugin_request_count(server, "/plugins/featured", expected_count).await
|
||||
}
|
||||
|
||||
async fn wait_for_remote_plugin_request_count(
|
||||
server: &MockServer,
|
||||
path_suffix: &str,
|
||||
expected_count: usize,
|
||||
) -> Result<()> {
|
||||
timeout(DEFAULT_TIMEOUT, async {
|
||||
loop {
|
||||
let Some(requests) = server.received_requests().await else {
|
||||
bail!("wiremock did not record requests");
|
||||
};
|
||||
let featured_request_count = requests
|
||||
let request_count = requests
|
||||
.iter()
|
||||
.filter(|request| {
|
||||
request.method == "GET" && request.url.path().ends_with("/plugins/featured")
|
||||
request.method == "GET" && request.url.path().ends_with(path_suffix)
|
||||
})
|
||||
.count();
|
||||
if featured_request_count == expected_count {
|
||||
if request_count == expected_count {
|
||||
return Ok::<(), anyhow::Error>(());
|
||||
}
|
||||
if featured_request_count > expected_count {
|
||||
if request_count > expected_count {
|
||||
bail!(
|
||||
"expected exactly {expected_count} /plugins/featured requests, got {featured_request_count}"
|
||||
"expected exactly {expected_count} {path_suffix} requests, got {request_count}"
|
||||
);
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
@@ -863,6 +957,19 @@ async fn wait_for_featured_plugin_request_count(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_for_path_exists(path: &std::path::Path) -> Result<()> {
|
||||
timeout(DEFAULT_TIMEOUT, async {
|
||||
loop {
|
||||
if path.exists() {
|
||||
return Ok::<(), anyhow::Error>(());
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_installed_plugin(
|
||||
codex_home: &TempDir,
|
||||
marketplace_name: &str,
|
||||
|
||||
@@ -1,17 +1,46 @@
|
||||
use std::borrow::Cow;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Result;
|
||||
use app_test_support::ChatGptAuthFixture;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::to_response;
|
||||
use app_test_support::write_chatgpt_auth;
|
||||
use axum::Json;
|
||||
use axum::Router;
|
||||
use axum::extract::State;
|
||||
use axum::http::HeaderMap;
|
||||
use axum::http::StatusCode;
|
||||
use axum::http::Uri;
|
||||
use axum::http::header::AUTHORIZATION;
|
||||
use axum::routing::get;
|
||||
use codex_app_server_protocol::AppInfo;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::PluginAuthPolicy;
|
||||
use codex_app_server_protocol::PluginInstallPolicy;
|
||||
use codex_app_server_protocol::PluginReadParams;
|
||||
use codex_app_server_protocol::PluginReadResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_core::auth::AuthCredentialsStoreMode;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use pretty_assertions::assert_eq;
|
||||
use rmcp::handler::server::ServerHandler;
|
||||
use rmcp::model::JsonObject;
|
||||
use rmcp::model::ListToolsResult;
|
||||
use rmcp::model::Meta;
|
||||
use rmcp::model::ServerCapabilities;
|
||||
use rmcp::model::ServerInfo;
|
||||
use rmcp::model::Tool;
|
||||
use rmcp::model::ToolAnnotations;
|
||||
use rmcp::transport::StreamableHttpServerConfig;
|
||||
use rmcp::transport::StreamableHttpService;
|
||||
use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
|
||||
use serde_json::json;
|
||||
use tempfile::TempDir;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
@@ -222,11 +251,103 @@ enabled = true
|
||||
response.plugin.apps[0].install_url.as_deref(),
|
||||
Some("https://chatgpt.com/apps/gmail/gmail")
|
||||
);
|
||||
assert_eq!(response.plugin.apps[0].needs_auth, true);
|
||||
assert_eq!(response.plugin.mcp_servers.len(), 1);
|
||||
assert_eq!(response.plugin.mcp_servers[0], "demo");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn plugin_read_returns_app_needs_auth() -> Result<()> {
|
||||
let connectors = vec![
|
||||
AppInfo {
|
||||
id: "alpha".to_string(),
|
||||
name: "Alpha".to_string(),
|
||||
description: Some("Alpha connector".to_string()),
|
||||
logo_url: Some("https://example.com/alpha.png".to_string()),
|
||||
logo_url_dark: None,
|
||||
distribution_channel: Some("featured".to_string()),
|
||||
branding: None,
|
||||
app_metadata: None,
|
||||
labels: None,
|
||||
install_url: None,
|
||||
is_accessible: false,
|
||||
is_enabled: true,
|
||||
plugin_display_names: Vec::new(),
|
||||
},
|
||||
AppInfo {
|
||||
id: "beta".to_string(),
|
||||
name: "Beta".to_string(),
|
||||
description: Some("Beta connector".to_string()),
|
||||
logo_url: None,
|
||||
logo_url_dark: None,
|
||||
distribution_channel: None,
|
||||
branding: None,
|
||||
app_metadata: None,
|
||||
labels: None,
|
||||
install_url: None,
|
||||
is_accessible: false,
|
||||
is_enabled: true,
|
||||
plugin_display_names: Vec::new(),
|
||||
},
|
||||
];
|
||||
let tools = vec![connector_tool("beta", "Beta App")?];
|
||||
let (server_url, server_handle) = start_apps_server(connectors, tools).await?;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
write_connectors_config(codex_home.path(), &server_url)?;
|
||||
write_chatgpt_auth(
|
||||
codex_home.path(),
|
||||
ChatGptAuthFixture::new("chatgpt-token")
|
||||
.account_id("account-123")
|
||||
.chatgpt_user_id("user-123")
|
||||
.chatgpt_account_id("account-123"),
|
||||
AuthCredentialsStoreMode::File,
|
||||
)?;
|
||||
|
||||
let repo_root = TempDir::new()?;
|
||||
write_plugin_marketplace(
|
||||
repo_root.path(),
|
||||
"debug",
|
||||
"sample-plugin",
|
||||
"./sample-plugin",
|
||||
)?;
|
||||
write_plugin_source(repo_root.path(), "sample-plugin", &["alpha", "beta"])?;
|
||||
let marketplace_path =
|
||||
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let request_id = mcp
|
||||
.send_plugin_read_request(PluginReadParams {
|
||||
marketplace_path,
|
||||
plugin_name: "sample-plugin".to_string(),
|
||||
})
|
||||
.await?;
|
||||
|
||||
let response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
let response: PluginReadResponse = to_response(response)?;
|
||||
|
||||
assert_eq!(
|
||||
response
|
||||
.plugin
|
||||
.apps
|
||||
.iter()
|
||||
.map(|app| (app.id.as_str(), app.needs_auth))
|
||||
.collect::<Vec<_>>(),
|
||||
vec![("alpha", true), ("beta", false)]
|
||||
);
|
||||
|
||||
server_handle.abort();
|
||||
let _ = server_handle.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn plugin_read_accepts_legacy_string_default_prompt() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
@@ -422,3 +543,201 @@ plugins = true
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct AppsServerState {
|
||||
response: Arc<StdMutex<serde_json::Value>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct PluginReadMcpServer {
|
||||
tools: Arc<StdMutex<Vec<Tool>>>,
|
||||
}
|
||||
|
||||
impl ServerHandler for PluginReadMcpServer {
|
||||
fn get_info(&self) -> ServerInfo {
|
||||
ServerInfo {
|
||||
capabilities: ServerCapabilities::builder().enable_tools().build(),
|
||||
..ServerInfo::default()
|
||||
}
|
||||
}
|
||||
|
||||
fn list_tools(
|
||||
&self,
|
||||
_request: Option<rmcp::model::PaginatedRequestParams>,
|
||||
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
|
||||
) -> impl std::future::Future<Output = Result<ListToolsResult, rmcp::ErrorData>> + Send + '_
|
||||
{
|
||||
let tools = self.tools.clone();
|
||||
async move {
|
||||
let tools = tools
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner)
|
||||
.clone();
|
||||
Ok(ListToolsResult {
|
||||
tools,
|
||||
next_cursor: None,
|
||||
meta: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn start_apps_server(
|
||||
connectors: Vec<AppInfo>,
|
||||
tools: Vec<Tool>,
|
||||
) -> Result<(String, JoinHandle<()>)> {
|
||||
let state = Arc::new(AppsServerState {
|
||||
response: Arc::new(StdMutex::new(
|
||||
json!({ "apps": connectors, "next_token": null }),
|
||||
)),
|
||||
});
|
||||
let tools = Arc::new(StdMutex::new(tools));
|
||||
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await?;
|
||||
let addr = listener.local_addr()?;
|
||||
let mcp_service = StreamableHttpService::new(
|
||||
{
|
||||
let tools = tools.clone();
|
||||
move || {
|
||||
Ok(PluginReadMcpServer {
|
||||
tools: tools.clone(),
|
||||
})
|
||||
}
|
||||
},
|
||||
Arc::new(LocalSessionManager::default()),
|
||||
StreamableHttpServerConfig::default(),
|
||||
);
|
||||
let router = Router::new()
|
||||
.route("/connectors/directory/list", get(list_directory_connectors))
|
||||
.route(
|
||||
"/connectors/directory/list_workspace",
|
||||
get(list_directory_connectors),
|
||||
)
|
||||
.with_state(state)
|
||||
.nest_service("/api/codex/apps", mcp_service);
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let _ = axum::serve(listener, router).await;
|
||||
});
|
||||
|
||||
Ok((format!("http://{addr}"), handle))
|
||||
}
|
||||
|
||||
async fn list_directory_connectors(
|
||||
State(state): State<Arc<AppsServerState>>,
|
||||
headers: HeaderMap,
|
||||
uri: Uri,
|
||||
) -> Result<impl axum::response::IntoResponse, StatusCode> {
|
||||
let bearer_ok = headers
|
||||
.get(AUTHORIZATION)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.is_some_and(|value| value == "Bearer chatgpt-token");
|
||||
let account_ok = headers
|
||||
.get("chatgpt-account-id")
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.is_some_and(|value| value == "account-123");
|
||||
let external_logos_ok = uri
|
||||
.query()
|
||||
.is_some_and(|query| query.split('&').any(|pair| pair == "external_logos=true"));
|
||||
|
||||
if !bearer_ok || !account_ok {
|
||||
Err(StatusCode::UNAUTHORIZED)
|
||||
} else if !external_logos_ok {
|
||||
Err(StatusCode::BAD_REQUEST)
|
||||
} else {
|
||||
let response = state
|
||||
.response
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner)
|
||||
.clone();
|
||||
Ok(Json(response))
|
||||
}
|
||||
}
|
||||
|
||||
fn connector_tool(connector_id: &str, connector_name: &str) -> Result<Tool> {
|
||||
let schema: JsonObject = serde_json::from_value(json!({
|
||||
"type": "object",
|
||||
"additionalProperties": false
|
||||
}))?;
|
||||
let mut tool = Tool::new(
|
||||
Cow::Owned(format!("connector_{connector_id}")),
|
||||
Cow::Borrowed("Connector test tool"),
|
||||
Arc::new(schema),
|
||||
);
|
||||
tool.annotations = Some(ToolAnnotations::new().read_only(true));
|
||||
|
||||
let mut meta = Meta::new();
|
||||
meta.0
|
||||
.insert("connector_id".to_string(), json!(connector_id));
|
||||
meta.0
|
||||
.insert("connector_name".to_string(), json!(connector_name));
|
||||
tool.meta = Some(meta);
|
||||
Ok(tool)
|
||||
}
|
||||
|
||||
fn write_connectors_config(codex_home: &std::path::Path, base_url: &str) -> std::io::Result<()> {
|
||||
std::fs::write(
|
||||
codex_home.join("config.toml"),
|
||||
format!(
|
||||
r#"
|
||||
chatgpt_base_url = "{base_url}"
|
||||
mcp_oauth_credentials_store = "file"
|
||||
|
||||
[features]
|
||||
plugins = true
|
||||
connectors = true
|
||||
"#
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
fn write_plugin_marketplace(
|
||||
repo_root: &std::path::Path,
|
||||
marketplace_name: &str,
|
||||
plugin_name: &str,
|
||||
source_path: &str,
|
||||
) -> std::io::Result<()> {
|
||||
std::fs::create_dir_all(repo_root.join(".git"))?;
|
||||
std::fs::create_dir_all(repo_root.join(".agents/plugins"))?;
|
||||
std::fs::write(
|
||||
repo_root.join(".agents/plugins/marketplace.json"),
|
||||
format!(
|
||||
r#"{{
|
||||
"name": "{marketplace_name}",
|
||||
"plugins": [
|
||||
{{
|
||||
"name": "{plugin_name}",
|
||||
"source": {{
|
||||
"source": "local",
|
||||
"path": "{source_path}"
|
||||
}}
|
||||
}}
|
||||
]
|
||||
}}"#
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
fn write_plugin_source(
|
||||
repo_root: &std::path::Path,
|
||||
plugin_name: &str,
|
||||
app_ids: &[&str],
|
||||
) -> Result<()> {
|
||||
let plugin_root = repo_root.join(plugin_name);
|
||||
std::fs::create_dir_all(plugin_root.join(".codex-plugin"))?;
|
||||
std::fs::write(
|
||||
plugin_root.join(".codex-plugin/plugin.json"),
|
||||
format!(r#"{{"name":"{plugin_name}"}}"#),
|
||||
)?;
|
||||
|
||||
let apps = app_ids
|
||||
.iter()
|
||||
.map(|app_id| ((*app_id).to_string(), json!({ "id": app_id })))
|
||||
.collect::<serde_json::Map<_, _>>();
|
||||
std::fs::write(
|
||||
plugin_root.join(".app.json"),
|
||||
serde_json::to_vec_pretty(&json!({ "apps": apps }))?,
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -23,8 +23,8 @@ use codex_app_server_protocol::ThreadRealtimeStopParams;
|
||||
use codex_app_server_protocol::ThreadRealtimeStopResponse;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_core::features::FEATURES;
|
||||
use codex_core::features::Feature;
|
||||
use codex_features::FEATURES;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::protocol::RealtimeConversationVersion;
|
||||
use core_test_support::responses::start_websocket_server;
|
||||
use core_test_support::skip_if_no_network;
|
||||
|
||||
@@ -26,8 +26,8 @@ use codex_app_server_protocol::TurnCompletedNotification;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::UserInput as V2UserInput;
|
||||
use codex_core::features::FEATURES;
|
||||
use codex_core::features::Feature;
|
||||
use codex_features::FEATURES;
|
||||
use codex_features::Feature;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::BTreeMap;
|
||||
use std::path::Path;
|
||||
|
||||
@@ -7,7 +7,10 @@ use app_test_support::write_chatgpt_auth;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::McpServerStartupState;
|
||||
use codex_app_server_protocol::McpServerStatusUpdatedNotification;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStartedNotification;
|
||||
@@ -328,6 +331,103 @@ async fn thread_start_fails_when_required_mcp_server_fails_to_initialize() -> Re
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_start_emits_mcp_server_status_updated_notifications() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml_with_optional_broken_mcp(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let req_id = mcp
|
||||
.send_thread_start_request(ThreadStartParams::default())
|
||||
.await?;
|
||||
|
||||
let _: ThreadStartResponse = to_response(
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
|
||||
)
|
||||
.await??,
|
||||
)?;
|
||||
|
||||
let starting = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_matching_notification(
|
||||
"mcpServer/startupStatus/updated starting",
|
||||
|notification| {
|
||||
notification.method == "mcpServer/startupStatus/updated"
|
||||
&& notification
|
||||
.params
|
||||
.as_ref()
|
||||
.and_then(|params| params.get("name"))
|
||||
.and_then(Value::as_str)
|
||||
== Some("optional_broken")
|
||||
&& notification
|
||||
.params
|
||||
.as_ref()
|
||||
.and_then(|params| params.get("status"))
|
||||
.and_then(Value::as_str)
|
||||
== Some("starting")
|
||||
},
|
||||
),
|
||||
)
|
||||
.await??;
|
||||
let starting: ServerNotification = starting.try_into()?;
|
||||
let ServerNotification::McpServerStatusUpdated(starting) = starting else {
|
||||
anyhow::bail!("unexpected notification variant");
|
||||
};
|
||||
assert_eq!(
|
||||
starting,
|
||||
McpServerStatusUpdatedNotification {
|
||||
name: "optional_broken".to_string(),
|
||||
status: McpServerStartupState::Starting,
|
||||
error: None,
|
||||
}
|
||||
);
|
||||
|
||||
let failed = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_matching_notification(
|
||||
"mcpServer/startupStatus/updated failed",
|
||||
|notification| {
|
||||
notification.method == "mcpServer/startupStatus/updated"
|
||||
&& notification
|
||||
.params
|
||||
.as_ref()
|
||||
.and_then(|params| params.get("name"))
|
||||
.and_then(Value::as_str)
|
||||
== Some("optional_broken")
|
||||
&& notification
|
||||
.params
|
||||
.as_ref()
|
||||
.and_then(|params| params.get("status"))
|
||||
.and_then(Value::as_str)
|
||||
== Some("failed")
|
||||
},
|
||||
),
|
||||
)
|
||||
.await??;
|
||||
let failed: ServerNotification = failed.try_into()?;
|
||||
let ServerNotification::McpServerStatusUpdated(failed) = failed else {
|
||||
anyhow::bail!("unexpected notification variant");
|
||||
};
|
||||
assert_eq!(failed.name, "optional_broken");
|
||||
assert_eq!(failed.status, McpServerStartupState::Failed);
|
||||
assert!(
|
||||
failed
|
||||
.error
|
||||
.as_deref()
|
||||
.is_some_and(|error| error.contains("MCP client for `optional_broken` failed to start")),
|
||||
"unexpected MCP startup error: {:?}",
|
||||
failed.error
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_start_surfaces_cloud_requirements_load_errors() -> Result<()> {
|
||||
let server = MockServer::start().await;
|
||||
@@ -491,3 +591,32 @@ required = true
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
fn create_config_toml_with_optional_broken_mcp(
|
||||
codex_home: &Path,
|
||||
server_uri: &str,
|
||||
) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(
|
||||
config_toml,
|
||||
format!(
|
||||
r#"
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "read-only"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
|
||||
[mcp_servers.optional_broken]
|
||||
command = "codex-definitely-not-a-real-binary"
|
||||
"#
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -44,9 +44,9 @@ use codex_app_server_protocol::TurnStartedNotification;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::UserInput as V2UserInput;
|
||||
use codex_core::config::ConfigToml;
|
||||
use codex_core::features::FEATURES;
|
||||
use codex_core::features::Feature;
|
||||
use codex_core::personality_migration::PERSONALITY_MIGRATION_FILENAME;
|
||||
use codex_features::FEATURES;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::config_types::CollaborationMode;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
use codex_protocol::config_types::Personality;
|
||||
|
||||
@@ -30,8 +30,8 @@ use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::UserInput as V2UserInput;
|
||||
use codex_core::features::FEATURES;
|
||||
use codex_core::features::Feature;
|
||||
use codex_features::FEATURES;
|
||||
use codex_features::Feature;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
Reference in New Issue
Block a user