Compare commits

...

3 Commits

Author SHA1 Message Date
Matthew Zeng
b1f6bdb052 Make app list force-refetch tests deterministic 2026-04-16 13:58:07 -07:00
Matthew Zeng
bed02d6ab1 Make app list force-refetch cache tests deterministic 2026-04-16 11:07:15 -07:00
Matthew Zeng
967b1d89be Fix flaky app list tests 2026-04-15 11:46:21 -07:00
2 changed files with 96 additions and 16 deletions

View File

@@ -6092,26 +6092,23 @@ impl CodexMessageProcessor {
);
let cached_all_connectors = all_connectors.clone();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let accessible_config = config.clone();
let accessible_tx = tx.clone();
tokio::spawn(async move {
let mut accessible_task = tokio::spawn(async move {
let result = connectors::list_accessible_connectors_from_mcp_tools_with_options(
&accessible_config,
force_refetch,
)
.await
.map_err(|err| format!("failed to load accessible apps: {err}"));
let _ = accessible_tx.send(AppListLoadResult::Accessible(result));
AppListLoadResult::Accessible(result)
});
let all_config = config.clone();
tokio::spawn(async move {
let mut directory_task = tokio::spawn(async move {
let result = connectors::list_all_connectors_with_options(&all_config, force_refetch)
.await
.map_err(|err| format!("failed to list apps: {err}"));
let _ = tx.send(AppListLoadResult::Directory(result));
AppListLoadResult::Directory(result)
});
let app_list_deadline = tokio::time::Instant::now() + APP_LIST_LOAD_TIMEOUT;
@@ -6139,18 +6136,37 @@ impl CodexMessageProcessor {
}
loop {
let result = match tokio::time::timeout_at(app_list_deadline, rx.recv()).await {
Ok(Some(result)) => result,
Ok(None) => {
let result = match tokio::time::timeout_at(app_list_deadline, async {
tokio::select! {
result = &mut accessible_task, if !accessible_loaded => result,
result = &mut directory_task, if !all_loaded => result,
}
})
.await
{
Ok(Ok(result)) => result,
Ok(Err(err)) => {
if !accessible_loaded {
accessible_task.abort();
}
if !all_loaded {
directory_task.abort();
}
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "failed to load app lists".to_string(),
message: format!("failed to load app lists: {err}"),
data: None,
};
outgoing.send_error(request_id, error).await;
return;
}
Err(_) => {
if !accessible_loaded {
accessible_task.abort();
}
if !all_loaded {
directory_task.abort();
}
let timeout_seconds = APP_LIST_LOAD_TIMEOUT.as_secs();
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
@@ -6170,6 +6186,9 @@ impl CodexMessageProcessor {
accessible_loaded = true;
}
AppListLoadResult::Accessible(Err(err)) => {
if !all_loaded {
directory_task.abort();
}
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: err,
@@ -6183,6 +6202,9 @@ impl CodexMessageProcessor {
all_loaded = true;
}
AppListLoadResult::Directory(Err(err)) => {
if !accessible_loaded {
accessible_task.abort();
}
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: err,

View File

@@ -53,6 +53,7 @@ use rmcp::transport::streamable_http_server::session::local::LocalSessionManager
use serde_json::json;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio::time::timeout;
@@ -61,6 +62,13 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
#[tokio::test]
async fn list_apps_returns_empty_when_connectors_disabled() -> Result<()> {
let codex_home = TempDir::new()?;
std::fs::write(
codex_home.path().join("config.toml"),
r#"
[features]
connectors = false
"#,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
@@ -876,8 +884,13 @@ async fn list_apps_force_refetch_preserves_previous_cache_on_failure() -> Result
plugin_display_names: Vec::new(),
}];
let tools = vec![connector_tool("beta", "Beta App")?];
let (server_url, server_handle) =
start_apps_server_with_delays(connectors, tools, Duration::ZERO, Duration::ZERO).await?;
let (server_url, server_handle, server_control) = start_apps_server_with_delays_and_control(
connectors,
tools,
Duration::ZERO,
Duration::ZERO,
)
.await?;
let codex_home = TempDir::new()?;
write_connectors_config(codex_home.path(), &server_url)?;
@@ -913,6 +926,11 @@ async fn list_apps_force_refetch_preserves_previous_cache_on_failure() -> Result
assert!(initial_next_cursor.is_none());
assert_eq!(initial_data.len(), 1);
assert!(initial_data.iter().all(|app| app.is_accessible));
timeout(
DEFAULT_TIMEOUT,
server_control.wait_for_tools_list_finished(),
)
.await?;
write_chatgpt_auth(
codex_home.path(),
@@ -922,6 +940,7 @@ async fn list_apps_force_refetch_preserves_previous_cache_on_failure() -> Result
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
server_control.set_tools(Vec::new());
let refetch_request = mcp
.send_apps_list_request(AppsListParams {
@@ -938,6 +957,12 @@ async fn list_apps_force_refetch_preserves_previous_cache_on_failure() -> Result
.await??;
assert!(refetch_error.error.message.contains("failed to"));
timeout(
DEFAULT_TIMEOUT,
server_control.wait_for_tools_list_finished(),
)
.await?;
let cached_request = mcp
.send_apps_list_request(AppsListParams {
limit: None,
@@ -1332,11 +1357,20 @@ struct AppsServerState {
struct AppListMcpServer {
tools: Arc<StdMutex<Vec<Tool>>>,
tools_delay: Duration,
tools_list_finished: Arc<Notify>,
}
impl AppListMcpServer {
fn new(tools: Arc<StdMutex<Vec<Tool>>>, tools_delay: Duration) -> Self {
Self { tools, tools_delay }
fn new(
tools: Arc<StdMutex<Vec<Tool>>>,
tools_delay: Duration,
tools_list_finished: Arc<Notify>,
) -> Self {
Self {
tools,
tools_delay,
tools_list_finished,
}
}
}
@@ -1344,6 +1378,7 @@ impl AppListMcpServer {
struct AppsServerControl {
response: Arc<StdMutex<serde_json::Value>>,
tools: Arc<StdMutex<Vec<Tool>>>,
tools_list_finished: Arc<Notify>,
}
impl AppsServerControl {
@@ -1362,6 +1397,18 @@ impl AppsServerControl {
.unwrap_or_else(std::sync::PoisonError::into_inner);
*tools_guard = tools;
}
async fn wait_for_tools_list_finished(&self) {
self.tools_list_finished.notified().await;
}
}
struct NotifyOnDrop(Arc<Notify>);
impl Drop for NotifyOnDrop {
fn drop(&mut self) {
self.0.notify_one();
}
}
impl ServerHandler for AppListMcpServer {
@@ -1380,7 +1427,9 @@ impl ServerHandler for AppListMcpServer {
{
let tools = self.tools.clone();
let tools_delay = self.tools_delay;
let tools_list_finished = self.tools_list_finished.clone();
async move {
let _notify_finished = NotifyOnDrop(tools_list_finished);
if tools_delay > Duration::ZERO {
tokio::time::sleep(tools_delay).await;
}
@@ -1419,6 +1468,7 @@ async fn start_apps_server_with_delays_and_control(
json!({ "apps": connectors, "next_token": null }),
));
let tools = Arc::new(StdMutex::new(tools));
let tools_list_finished = Arc::new(Notify::new());
let state = AppsServerState {
expected_bearer: "Bearer chatgpt-token".to_string(),
expected_account_id: "account-123".to_string(),
@@ -1429,6 +1479,7 @@ async fn start_apps_server_with_delays_and_control(
let server_control = AppsServerControl {
response,
tools: tools.clone(),
tools_list_finished: tools_list_finished.clone(),
};
let listener = TcpListener::bind("127.0.0.1:0").await?;
@@ -1437,7 +1488,14 @@ async fn start_apps_server_with_delays_and_control(
let mcp_service = StreamableHttpService::new(
{
let tools = tools.clone();
move || Ok(AppListMcpServer::new(tools.clone(), tools_delay))
let tools_list_finished = tools_list_finished.clone();
move || {
Ok(AppListMcpServer::new(
tools.clone(),
tools_delay,
tools_list_finished.clone(),
))
}
},
Arc::new(LocalSessionManager::default()),
StreamableHttpServerConfig::default(),