mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
[apps] Fix app loading logic. (#11518)
When `app/list` is called with `force_refetch=True`, we should seed the results with what is already cached instead of starting from an empty list. Otherwise when we send app/list/updated events, the client will first see an empty list of accessible apps and then get the updated one.
This commit is contained in:
@@ -4610,6 +4610,11 @@ impl CodexMessageProcessor {
|
||||
None => 0,
|
||||
};
|
||||
|
||||
let (mut accessible_connectors, mut all_connectors) = tokio::join!(
|
||||
connectors::list_cached_accessible_connectors_from_mcp_tools(&config),
|
||||
connectors::list_cached_all_connectors(&config)
|
||||
);
|
||||
|
||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
let accessible_config = config.clone();
|
||||
@@ -4632,9 +4637,9 @@ impl CodexMessageProcessor {
|
||||
let _ = tx.send(AppListLoadResult::Directory(result));
|
||||
});
|
||||
|
||||
let mut accessible_connectors: Option<Vec<AppInfo>> = None;
|
||||
let mut all_connectors: Option<Vec<AppInfo>> = None;
|
||||
let app_list_deadline = tokio::time::Instant::now() + APP_LIST_LOAD_TIMEOUT;
|
||||
let mut accessible_loaded = false;
|
||||
let mut all_loaded = false;
|
||||
|
||||
loop {
|
||||
let result = match tokio::time::timeout_at(app_list_deadline, rx.recv()).await {
|
||||
@@ -4665,6 +4670,7 @@ impl CodexMessageProcessor {
|
||||
match result {
|
||||
AppListLoadResult::Accessible(Ok(connectors)) => {
|
||||
accessible_connectors = Some(connectors);
|
||||
accessible_loaded = true;
|
||||
}
|
||||
AppListLoadResult::Accessible(Err(err)) => {
|
||||
let error = JSONRPCErrorError {
|
||||
@@ -4677,6 +4683,7 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
AppListLoadResult::Directory(Ok(connectors)) => {
|
||||
all_connectors = Some(connectors);
|
||||
all_loaded = true;
|
||||
}
|
||||
AppListLoadResult::Directory(Err(err)) => {
|
||||
let error = JSONRPCErrorError {
|
||||
@@ -4698,7 +4705,7 @@ impl CodexMessageProcessor {
|
||||
);
|
||||
Self::send_app_list_updated_notification(&outgoing, merged.clone()).await;
|
||||
|
||||
if accessible_connectors.is_some() && all_connectors.is_some() {
|
||||
if accessible_loaded && all_loaded {
|
||||
match Self::paginate_apps(merged.as_slice(), start, limit) {
|
||||
Ok(response) => {
|
||||
outgoing.send_response(request_id, response).await;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::borrow::Cow;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Result;
|
||||
@@ -722,6 +723,201 @@ async fn list_apps_force_refetch_preserves_previous_cache_on_failure() -> Result
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_apps_force_refetch_patches_updates_from_cached_snapshots() -> Result<()> {
|
||||
let initial_connectors = vec![
|
||||
AppInfo {
|
||||
id: "alpha".to_string(),
|
||||
name: "Alpha".to_string(),
|
||||
description: Some("Alpha v1".to_string()),
|
||||
logo_url: None,
|
||||
logo_url_dark: None,
|
||||
distribution_channel: None,
|
||||
install_url: None,
|
||||
is_accessible: false,
|
||||
is_enabled: true,
|
||||
},
|
||||
AppInfo {
|
||||
id: "beta".to_string(),
|
||||
name: "Beta App".to_string(),
|
||||
description: Some("Beta v1".to_string()),
|
||||
logo_url: None,
|
||||
logo_url_dark: None,
|
||||
distribution_channel: None,
|
||||
install_url: None,
|
||||
is_accessible: false,
|
||||
is_enabled: true,
|
||||
},
|
||||
];
|
||||
let initial_tools = vec![connector_tool("beta", "Beta App")?];
|
||||
let (server_url, server_handle, server_control) = start_apps_server_with_delays_and_control(
|
||||
initial_connectors,
|
||||
initial_tools,
|
||||
Duration::from_millis(300),
|
||||
Duration::ZERO,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
write_connectors_config(codex_home.path(), &server_url)?;
|
||||
write_chatgpt_auth(
|
||||
codex_home.path(),
|
||||
ChatGptAuthFixture::new("chatgpt-token")
|
||||
.account_id("account-123")
|
||||
.chatgpt_user_id("user-123")
|
||||
.chatgpt_account_id("account-123"),
|
||||
AuthCredentialsStoreMode::File,
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let warm_request = mcp
|
||||
.send_apps_list_request(AppsListParams {
|
||||
limit: None,
|
||||
cursor: None,
|
||||
thread_id: None,
|
||||
force_refetch: false,
|
||||
})
|
||||
.await?;
|
||||
let warm_first_update = read_app_list_updated_notification(&mut mcp).await?;
|
||||
assert_eq!(
|
||||
warm_first_update.data,
|
||||
vec![AppInfo {
|
||||
id: "beta".to_string(),
|
||||
name: "Beta App".to_string(),
|
||||
description: None,
|
||||
logo_url: None,
|
||||
logo_url_dark: None,
|
||||
distribution_channel: None,
|
||||
install_url: Some("https://chatgpt.com/apps/beta-app/beta".to_string()),
|
||||
is_accessible: true,
|
||||
is_enabled: true,
|
||||
}]
|
||||
);
|
||||
|
||||
let warm_second_update = read_app_list_updated_notification(&mut mcp).await?;
|
||||
assert_eq!(
|
||||
warm_second_update.data,
|
||||
vec![
|
||||
AppInfo {
|
||||
id: "beta".to_string(),
|
||||
name: "Beta App".to_string(),
|
||||
description: Some("Beta v1".to_string()),
|
||||
logo_url: None,
|
||||
logo_url_dark: None,
|
||||
distribution_channel: None,
|
||||
install_url: Some("https://chatgpt.com/apps/beta-app/beta".to_string()),
|
||||
is_accessible: true,
|
||||
is_enabled: true,
|
||||
},
|
||||
AppInfo {
|
||||
id: "alpha".to_string(),
|
||||
name: "Alpha".to_string(),
|
||||
description: Some("Alpha v1".to_string()),
|
||||
logo_url: None,
|
||||
logo_url_dark: None,
|
||||
distribution_channel: None,
|
||||
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
|
||||
is_accessible: false,
|
||||
is_enabled: true,
|
||||
},
|
||||
]
|
||||
);
|
||||
|
||||
let warm_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(warm_request)),
|
||||
)
|
||||
.await??;
|
||||
let AppsListResponse {
|
||||
data: warm_data,
|
||||
next_cursor: warm_next_cursor,
|
||||
} = to_response(warm_response)?;
|
||||
assert_eq!(warm_data, warm_second_update.data);
|
||||
assert!(warm_next_cursor.is_none());
|
||||
|
||||
server_control.set_connectors(vec![AppInfo {
|
||||
id: "alpha".to_string(),
|
||||
name: "Alpha".to_string(),
|
||||
description: Some("Alpha v2".to_string()),
|
||||
logo_url: None,
|
||||
logo_url_dark: None,
|
||||
distribution_channel: None,
|
||||
install_url: None,
|
||||
is_accessible: false,
|
||||
is_enabled: true,
|
||||
}]);
|
||||
server_control.set_tools(Vec::new());
|
||||
|
||||
let refetch_request = mcp
|
||||
.send_apps_list_request(AppsListParams {
|
||||
limit: None,
|
||||
cursor: None,
|
||||
thread_id: None,
|
||||
force_refetch: true,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let first_update = read_app_list_updated_notification(&mut mcp).await?;
|
||||
assert_eq!(
|
||||
first_update.data,
|
||||
vec![
|
||||
AppInfo {
|
||||
id: "alpha".to_string(),
|
||||
name: "Alpha".to_string(),
|
||||
description: Some("Alpha v1".to_string()),
|
||||
logo_url: None,
|
||||
logo_url_dark: None,
|
||||
distribution_channel: None,
|
||||
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
|
||||
is_accessible: false,
|
||||
is_enabled: true,
|
||||
},
|
||||
AppInfo {
|
||||
id: "beta".to_string(),
|
||||
name: "Beta App".to_string(),
|
||||
description: Some("Beta v1".to_string()),
|
||||
logo_url: None,
|
||||
logo_url_dark: None,
|
||||
distribution_channel: None,
|
||||
install_url: Some("https://chatgpt.com/apps/beta-app/beta".to_string()),
|
||||
is_accessible: false,
|
||||
is_enabled: true,
|
||||
},
|
||||
]
|
||||
);
|
||||
|
||||
let expected_final = vec![AppInfo {
|
||||
id: "alpha".to_string(),
|
||||
name: "Alpha".to_string(),
|
||||
description: Some("Alpha v2".to_string()),
|
||||
logo_url: None,
|
||||
logo_url_dark: None,
|
||||
distribution_channel: None,
|
||||
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
|
||||
is_accessible: false,
|
||||
is_enabled: true,
|
||||
}];
|
||||
let second_update = read_app_list_updated_notification(&mut mcp).await?;
|
||||
assert_eq!(second_update.data, expected_final);
|
||||
|
||||
let refetch_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(refetch_request)),
|
||||
)
|
||||
.await??;
|
||||
let AppsListResponse {
|
||||
data: refetch_data,
|
||||
next_cursor: refetch_next_cursor,
|
||||
} = to_response(refetch_response)?;
|
||||
assert_eq!(refetch_data, expected_final);
|
||||
assert!(refetch_next_cursor.is_none());
|
||||
|
||||
server_handle.abort();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn read_app_list_updated_notification(
|
||||
mcp: &mut McpProcess,
|
||||
) -> Result<AppListUpdatedNotification> {
|
||||
@@ -741,22 +937,46 @@ async fn read_app_list_updated_notification(
|
||||
struct AppsServerState {
|
||||
expected_bearer: String,
|
||||
expected_account_id: String,
|
||||
response: serde_json::Value,
|
||||
response: Arc<StdMutex<serde_json::Value>>,
|
||||
directory_delay: Duration,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct AppListMcpServer {
|
||||
tools: Arc<Vec<Tool>>,
|
||||
tools: Arc<StdMutex<Vec<Tool>>>,
|
||||
tools_delay: Duration,
|
||||
}
|
||||
|
||||
impl AppListMcpServer {
|
||||
fn new(tools: Arc<Vec<Tool>>, tools_delay: Duration) -> Self {
|
||||
fn new(tools: Arc<StdMutex<Vec<Tool>>>, tools_delay: Duration) -> Self {
|
||||
Self { tools, tools_delay }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct AppsServerControl {
|
||||
response: Arc<StdMutex<serde_json::Value>>,
|
||||
tools: Arc<StdMutex<Vec<Tool>>>,
|
||||
}
|
||||
|
||||
impl AppsServerControl {
|
||||
fn set_connectors(&self, connectors: Vec<AppInfo>) {
|
||||
let mut response_guard = self
|
||||
.response
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
*response_guard = json!({ "apps": connectors, "next_token": null });
|
||||
}
|
||||
|
||||
fn set_tools(&self, tools: Vec<Tool>) {
|
||||
let mut tools_guard = self
|
||||
.tools
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
*tools_guard = tools;
|
||||
}
|
||||
}
|
||||
|
||||
impl ServerHandler for AppListMcpServer {
|
||||
fn get_info(&self) -> ServerInfo {
|
||||
ServerInfo {
|
||||
@@ -777,8 +997,12 @@ impl ServerHandler for AppListMcpServer {
|
||||
if tools_delay > Duration::ZERO {
|
||||
tokio::time::sleep(tools_delay).await;
|
||||
}
|
||||
let tools = tools
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner)
|
||||
.clone();
|
||||
Ok(ListToolsResult {
|
||||
tools: (*tools).clone(),
|
||||
tools,
|
||||
next_cursor: None,
|
||||
meta: None,
|
||||
})
|
||||
@@ -792,14 +1016,33 @@ async fn start_apps_server_with_delays(
|
||||
directory_delay: Duration,
|
||||
tools_delay: Duration,
|
||||
) -> Result<(String, JoinHandle<()>)> {
|
||||
let (server_url, server_handle, _server_control) =
|
||||
start_apps_server_with_delays_and_control(connectors, tools, directory_delay, tools_delay)
|
||||
.await?;
|
||||
Ok((server_url, server_handle))
|
||||
}
|
||||
|
||||
async fn start_apps_server_with_delays_and_control(
|
||||
connectors: Vec<AppInfo>,
|
||||
tools: Vec<Tool>,
|
||||
directory_delay: Duration,
|
||||
tools_delay: Duration,
|
||||
) -> Result<(String, JoinHandle<()>, AppsServerControl)> {
|
||||
let response = Arc::new(StdMutex::new(
|
||||
json!({ "apps": connectors, "next_token": null }),
|
||||
));
|
||||
let tools = Arc::new(StdMutex::new(tools));
|
||||
let state = AppsServerState {
|
||||
expected_bearer: "Bearer chatgpt-token".to_string(),
|
||||
expected_account_id: "account-123".to_string(),
|
||||
response: json!({ "apps": connectors, "next_token": null }),
|
||||
response: response.clone(),
|
||||
directory_delay,
|
||||
};
|
||||
let state = Arc::new(state);
|
||||
let tools = Arc::new(tools);
|
||||
let server_control = AppsServerControl {
|
||||
response,
|
||||
tools: tools.clone(),
|
||||
};
|
||||
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await?;
|
||||
let addr = listener.local_addr()?;
|
||||
@@ -826,7 +1069,7 @@ async fn start_apps_server_with_delays(
|
||||
let _ = axum::serve(listener, router).await;
|
||||
});
|
||||
|
||||
Ok((format!("http://{addr}"), handle))
|
||||
Ok((format!("http://{addr}"), handle, server_control))
|
||||
}
|
||||
|
||||
async fn list_directory_connectors(
|
||||
@@ -847,7 +1090,12 @@ async fn list_directory_connectors(
|
||||
.is_some_and(|value| value == state.expected_account_id);
|
||||
|
||||
if bearer_ok && account_ok {
|
||||
Ok(Json(state.response.clone()))
|
||||
let response = state
|
||||
.response
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner)
|
||||
.clone();
|
||||
Ok(Json(response))
|
||||
} else {
|
||||
Err(StatusCode::UNAUTHORIZED)
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ pub use codex_core::connectors::connector_display_label;
|
||||
use codex_core::connectors::connector_install_url;
|
||||
pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools;
|
||||
pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools_with_options;
|
||||
pub use codex_core::connectors::list_cached_accessible_connectors_from_mcp_tools;
|
||||
use codex_core::connectors::merge_connectors;
|
||||
pub use codex_core::connectors::with_app_enabled_state;
|
||||
|
||||
@@ -83,6 +84,22 @@ pub async fn list_all_connectors(config: &Config) -> anyhow::Result<Vec<AppInfo>
|
||||
list_all_connectors_with_options(config, false).await
|
||||
}
|
||||
|
||||
pub async fn list_cached_all_connectors(config: &Config) -> Option<Vec<AppInfo>> {
|
||||
if !config.features.enabled(Feature::Apps) {
|
||||
return Some(Vec::new());
|
||||
}
|
||||
|
||||
if init_chatgpt_token_from_auth(&config.codex_home, config.cli_auth_credentials_store_mode)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return None;
|
||||
}
|
||||
let token_data = get_chatgpt_token_data()?;
|
||||
let cache_key = all_connectors_cache_key(config, &token_data);
|
||||
read_cached_all_connectors(&cache_key)
|
||||
}
|
||||
|
||||
pub async fn list_all_connectors_with_options(
|
||||
config: &Config,
|
||||
force_refetch: bool,
|
||||
|
||||
@@ -52,6 +52,19 @@ pub async fn list_accessible_connectors_from_mcp_tools(
|
||||
list_accessible_connectors_from_mcp_tools_with_options(config, false).await
|
||||
}
|
||||
|
||||
pub async fn list_cached_accessible_connectors_from_mcp_tools(
|
||||
config: &Config,
|
||||
) -> Option<Vec<AppInfo>> {
|
||||
if !config.features.enabled(Feature::Apps) {
|
||||
return Some(Vec::new());
|
||||
}
|
||||
|
||||
let auth_manager = auth_manager_from_config(config);
|
||||
let auth = auth_manager.auth().await;
|
||||
let cache_key = accessible_connectors_cache_key(config, auth.as_ref());
|
||||
read_cached_accessible_connectors(&cache_key)
|
||||
}
|
||||
|
||||
pub async fn list_accessible_connectors_from_mcp_tools_with_options(
|
||||
config: &Config,
|
||||
force_refetch: bool,
|
||||
|
||||
Reference in New Issue
Block a user