[apps] Stablize app list updated event. (#13067)

Stablize app list updated event so that we only send 2 updates: 1 when
installed apps become available, one when all directory apps are
available. Previously it also updates when directory apps become
available before installed apps, which cuts off installed apps.
This commit is contained in:
Matthew Zeng
2026-02-27 15:23:24 -08:00
committed by GitHub
parent 695957a348
commit 392fa7de50
2 changed files with 132 additions and 95 deletions

View File

@@ -5332,8 +5332,14 @@ impl CodexMessageProcessor {
),
&config,
);
Self::send_app_list_updated_notification(&outgoing, merged.clone()).await;
last_notified_apps = Some(merged);
if Self::should_send_app_list_updated_notification(
merged.as_slice(),
accessible_loaded,
all_loaded,
) {
Self::send_app_list_updated_notification(&outgoing, merged.clone()).await;
last_notified_apps = Some(merged);
}
}
loop {
@@ -5411,7 +5417,12 @@ impl CodexMessageProcessor {
),
&config,
);
if last_notified_apps.as_ref() != Some(&merged) {
if Self::should_send_app_list_updated_notification(
merged.as_slice(),
accessible_loaded,
all_loaded,
) && last_notified_apps.as_ref() != Some(&merged)
{
Self::send_app_list_updated_notification(&outgoing, merged.clone()).await;
last_notified_apps = Some(merged.clone());
}
@@ -5441,6 +5452,15 @@ impl CodexMessageProcessor {
connectors::merge_connectors_with_accessible(all, accessible, all_connectors_loaded)
}
fn should_send_app_list_updated_notification(
connectors: &[AppInfo],
accessible_loaded: bool,
all_loaded: bool,
) -> bool {
connectors.iter().any(|connector| connector.is_accessible)
|| (accessible_loaded && all_loaded)
}
fn paginate_apps(
connectors: &[AppInfo],
start: usize,

View File

@@ -428,7 +428,7 @@ async fn list_apps_emits_updates_and_returns_after_both_lists_load() -> Result<(
}
#[tokio::test]
async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> {
async fn list_apps_waits_for_accessible_data_before_emitting_directory_updates() -> Result<()> {
let connectors = vec![
AppInfo {
id: "alpha".to_string(),
@@ -475,7 +475,7 @@ async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> {
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_user_id("user-directory-first")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
@@ -492,60 +492,14 @@ async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> {
})
.await?;
let expected_directory_first = vec![
AppInfo {
id: "alpha".to_string(),
name: "Alpha".to_string(),
description: Some("Alpha connector".to_string()),
logo_url: Some("https://example.com/alpha.png".to_string()),
logo_url_dark: None,
distribution_channel: None,
branding: None,
app_metadata: None,
labels: None,
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
is_accessible: false,
is_enabled: true,
},
AppInfo {
id: "beta".to_string(),
name: "beta".to_string(),
description: None,
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
branding: None,
app_metadata: None,
labels: None,
install_url: Some("https://chatgpt.com/apps/beta/beta".to_string()),
is_accessible: false,
is_enabled: true,
},
];
let expected_accessible_first = vec![AppInfo {
id: "beta".to_string(),
name: "Beta App".to_string(),
description: None,
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
branding: None,
app_metadata: None,
labels: None,
install_url: Some("https://chatgpt.com/apps/beta-app/beta".to_string()),
is_accessible: true,
is_enabled: true,
}];
let first_update = read_app_list_updated_notification(&mut mcp).await?;
// app/list emits an update after whichever async load finishes first. Even with
// a tools delay in this test, the accessible-tools path can return first if the
// process-global Codex Apps tools cache is warm from another test.
let maybe_update = timeout(
Duration::from_millis(150),
read_app_list_updated_notification(&mut mcp),
)
.await;
assert!(
first_update.data == expected_directory_first
|| first_update.data == expected_accessible_first,
"unexpected first app/list update: {:#?}",
first_update.data
maybe_update.is_err(),
"unexpected directory-only app/list update before accessible apps loaded"
);
let expected = vec![
@@ -579,8 +533,96 @@ async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> {
},
];
let second_update = read_app_list_updated_notification(&mut mcp).await?;
assert_eq!(second_update.data, expected);
let update = read_app_list_updated_notification(&mut mcp).await?;
assert_eq!(update.data, expected);
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let AppsListResponse { data, next_cursor } = to_response(response)?;
assert_eq!(data, expected);
assert!(next_cursor.is_none());
server_handle.abort();
Ok(())
}
#[tokio::test]
async fn list_apps_does_not_emit_empty_interim_updates() -> Result<()> {
let connectors = vec![AppInfo {
id: "alpha".to_string(),
name: "Alpha".to_string(),
description: Some("Alpha connector".to_string()),
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
branding: None,
app_metadata: None,
labels: None,
install_url: None,
is_accessible: false,
is_enabled: true,
}];
let (server_url, server_handle) = start_apps_server_with_delays(
connectors.clone(),
Vec::new(),
Duration::from_millis(300),
Duration::ZERO,
)
.await?;
let codex_home = TempDir::new()?;
write_connectors_config(codex_home.path(), &server_url)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-empty-interim")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_apps_list_request(AppsListParams {
limit: None,
cursor: None,
thread_id: None,
force_refetch: false,
})
.await?;
let maybe_update = timeout(
Duration::from_millis(150),
read_app_list_updated_notification(&mut mcp),
)
.await;
assert!(
maybe_update.is_err(),
"unexpected empty interim app/list update"
);
let expected = vec![AppInfo {
id: "alpha".to_string(),
name: "Alpha".to_string(),
description: Some("Alpha connector".to_string()),
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
branding: None,
app_metadata: None,
labels: None,
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
is_accessible: false,
is_enabled: true,
}];
let update = read_app_list_updated_notification(&mut mcp).await?;
assert_eq!(update.data, expected);
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
@@ -1026,39 +1068,14 @@ async fn list_apps_force_refetch_patches_updates_from_cached_snapshots() -> Resu
]
);
let second_update = read_app_list_updated_notification(&mut mcp).await?;
assert_eq!(
second_update.data,
vec![
AppInfo {
id: "alpha".to_string(),
name: "Alpha".to_string(),
description: Some("Alpha v1".to_string()),
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
branding: None,
app_metadata: None,
labels: None,
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
is_accessible: false,
is_enabled: true,
},
AppInfo {
id: "beta".to_string(),
name: "Beta App".to_string(),
description: Some("Beta v1".to_string()),
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
branding: None,
app_metadata: None,
labels: None,
install_url: Some("https://chatgpt.com/apps/beta-app/beta".to_string()),
is_accessible: false,
is_enabled: true,
},
]
let maybe_second_update = timeout(
Duration::from_millis(150),
read_app_list_updated_notification(&mut mcp),
)
.await;
assert!(
maybe_second_update.is_err(),
"unexpected inaccessible-only app/list update during force refetch"
);
let expected_final = vec![AppInfo {
@@ -1075,8 +1092,8 @@ async fn list_apps_force_refetch_patches_updates_from_cached_snapshots() -> Resu
is_accessible: false,
is_enabled: true,
}];
let third_update = read_app_list_updated_notification(&mut mcp).await?;
assert_eq!(third_update.data, expected_final);
let second_update = read_app_list_updated_notification(&mut mcp).await?;
assert_eq!(second_update.data, expected_final);
let refetch_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,