mirror of
https://github.com/openai/codex.git
synced 2026-05-30 07:50:17 +00:00
Merge b6dc57e035 into sapling-pr-archive-bolinfest
This commit is contained in:
4
codex-rs/Cargo.lock
generated
4
codex-rs/Cargo.lock
generated
@@ -2441,7 +2441,11 @@ dependencies = [
|
||||
"codex-app-server-protocol",
|
||||
"pretty_assertions",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha1",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"urlencoding",
|
||||
]
|
||||
|
||||
|
||||
@@ -4,7 +4,8 @@ use std::time::Duration;
|
||||
use crate::chatgpt_client::chatgpt_get_request_with_timeout;
|
||||
|
||||
use codex_app_server_protocol::AppInfo;
|
||||
use codex_connectors::AllConnectorsCacheKey;
|
||||
use codex_connectors::ConnectorDirectoryCacheContext;
|
||||
use codex_connectors::ConnectorDirectoryCacheKey;
|
||||
use codex_connectors::DirectoryListResponse;
|
||||
use codex_connectors::filter::filter_disallowed_connectors;
|
||||
use codex_connectors::merge::merge_connectors;
|
||||
@@ -75,8 +76,8 @@ pub async fn list_cached_all_connectors(config: &Config) -> Option<Vec<AppInfo>>
|
||||
}
|
||||
|
||||
let auth = connector_auth(config).await.ok()?;
|
||||
let cache_key = all_connectors_cache_key(config, &auth);
|
||||
let connectors = codex_connectors::cached_all_connectors(&cache_key)?;
|
||||
let cache_context = connector_directory_cache_context(config, &auth);
|
||||
let connectors = codex_connectors::cached_directory_connectors(&cache_context)?;
|
||||
let connectors = merge_plugin_connectors(
|
||||
connectors,
|
||||
plugin_apps_for_config(config)
|
||||
@@ -98,9 +99,9 @@ pub async fn list_all_connectors_with_options(
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
let auth = connector_auth(config).await?;
|
||||
let cache_key = all_connectors_cache_key(config, &auth);
|
||||
let cache_context = connector_directory_cache_context(config, &auth);
|
||||
let connectors = codex_connectors::list_all_connectors_with_options(
|
||||
cache_key,
|
||||
cache_context,
|
||||
auth.is_workspace_account(),
|
||||
force_refetch,
|
||||
|path| async move {
|
||||
@@ -126,12 +127,18 @@ pub async fn list_all_connectors_with_options(
|
||||
))
|
||||
}
|
||||
|
||||
fn all_connectors_cache_key(config: &Config, auth: &CodexAuth) -> AllConnectorsCacheKey {
|
||||
AllConnectorsCacheKey::new(
|
||||
config.chatgpt_base_url.clone(),
|
||||
auth.get_account_id(),
|
||||
auth.get_chatgpt_user_id(),
|
||||
auth.is_workspace_account(),
|
||||
fn connector_directory_cache_context(
|
||||
config: &Config,
|
||||
auth: &CodexAuth,
|
||||
) -> ConnectorDirectoryCacheContext {
|
||||
ConnectorDirectoryCacheContext::new(
|
||||
config.codex_home.to_path_buf(),
|
||||
ConnectorDirectoryCacheKey::new(
|
||||
config.chatgpt_base_url.clone(),
|
||||
auth.get_account_id(),
|
||||
auth.get_chatgpt_user_id(),
|
||||
auth.is_workspace_account(),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -11,10 +11,14 @@ workspace = true
|
||||
anyhow = { workspace = true }
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
sha1 = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
urlencoding = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
|
||||
|
||||
[lib]
|
||||
|
||||
112
codex-rs/connectors/src/directory_cache.rs
Normal file
112
codex-rs/connectors/src/directory_cache.rs
Normal file
@@ -0,0 +1,112 @@
|
||||
use std::path::PathBuf;
|
||||
|
||||
use codex_app_server_protocol::AppInfo;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use sha1::Digest;
|
||||
use sha1::Sha1;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::ConnectorDirectoryCacheKey;
|
||||
|
||||
pub(crate) const CONNECTOR_DIRECTORY_DISK_CACHE_SCHEMA_VERSION: u8 = 1;
|
||||
const CONNECTOR_DIRECTORY_DISK_CACHE_DIR: &str = "cache/codex_app_directory";
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ConnectorDirectoryCacheContext {
|
||||
pub(crate) codex_home: PathBuf,
|
||||
pub(crate) cache_key: ConnectorDirectoryCacheKey,
|
||||
}
|
||||
|
||||
impl ConnectorDirectoryCacheContext {
|
||||
pub fn new(codex_home: PathBuf, cache_key: ConnectorDirectoryCacheKey) -> Self {
|
||||
Self {
|
||||
codex_home,
|
||||
cache_key,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn cache_path(&self) -> PathBuf {
|
||||
let cache_key_json = serde_json::to_string(&self.cache_key).unwrap_or_default();
|
||||
let cache_key_hash = sha1_hex(&cache_key_json);
|
||||
self.codex_home
|
||||
.join(CONNECTOR_DIRECTORY_DISK_CACHE_DIR)
|
||||
.join(format!("{cache_key_hash}.json"))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum CachedConnectorDirectoryDiskLoad {
|
||||
Hit { connectors: Vec<AppInfo> },
|
||||
Missing,
|
||||
Invalid,
|
||||
}
|
||||
|
||||
pub(crate) fn load_cached_directory_connectors_from_disk(
|
||||
cache_context: &ConnectorDirectoryCacheContext,
|
||||
) -> CachedConnectorDirectoryDiskLoad {
|
||||
let cache_path = cache_context.cache_path();
|
||||
let bytes = match std::fs::read(&cache_path) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
|
||||
return CachedConnectorDirectoryDiskLoad::Missing;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
cache_path = %cache_path.display(),
|
||||
"failed to read connector directory disk cache: {err}"
|
||||
);
|
||||
return CachedConnectorDirectoryDiskLoad::Invalid;
|
||||
}
|
||||
};
|
||||
let cache: ConnectorDirectoryDiskCache = match serde_json::from_slice(&bytes) {
|
||||
Ok(cache) => cache,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
cache_path = %cache_path.display(),
|
||||
"failed to parse connector directory disk cache: {err}"
|
||||
);
|
||||
let _ = std::fs::remove_file(cache_path);
|
||||
return CachedConnectorDirectoryDiskLoad::Invalid;
|
||||
}
|
||||
};
|
||||
if cache.schema_version != CONNECTOR_DIRECTORY_DISK_CACHE_SCHEMA_VERSION {
|
||||
let _ = std::fs::remove_file(cache_path);
|
||||
return CachedConnectorDirectoryDiskLoad::Invalid;
|
||||
}
|
||||
|
||||
CachedConnectorDirectoryDiskLoad::Hit {
|
||||
connectors: cache.connectors,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn write_cached_directory_connectors_to_disk(
|
||||
cache_context: &ConnectorDirectoryCacheContext,
|
||||
connectors: &[AppInfo],
|
||||
) {
|
||||
let cache_path = cache_context.cache_path();
|
||||
if let Some(parent) = cache_path.parent()
|
||||
&& std::fs::create_dir_all(parent).is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
let Ok(bytes) = serde_json::to_vec_pretty(&ConnectorDirectoryDiskCache {
|
||||
schema_version: CONNECTOR_DIRECTORY_DISK_CACHE_SCHEMA_VERSION,
|
||||
connectors: connectors.to_vec(),
|
||||
}) else {
|
||||
return;
|
||||
};
|
||||
let _ = std::fs::write(cache_path, bytes);
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct ConnectorDirectoryDiskCache {
|
||||
schema_version: u8,
|
||||
connectors: Vec<AppInfo>,
|
||||
}
|
||||
|
||||
fn sha1_hex(value: &str) -> String {
|
||||
let mut hasher = Sha1::new();
|
||||
hasher.update(value.as_bytes());
|
||||
let sha1 = hasher.finalize();
|
||||
format!("{sha1:x}")
|
||||
}
|
||||
@@ -9,23 +9,27 @@ use codex_app_server_protocol::AppBranding;
|
||||
use codex_app_server_protocol::AppInfo;
|
||||
use codex_app_server_protocol::AppMetadata;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
||||
pub mod accessible;
|
||||
mod directory_cache;
|
||||
pub mod filter;
|
||||
pub mod merge;
|
||||
pub mod metadata;
|
||||
|
||||
pub use directory_cache::ConnectorDirectoryCacheContext;
|
||||
|
||||
pub const CONNECTORS_CACHE_TTL: Duration = Duration::from_secs(3600);
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct AllConnectorsCacheKey {
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct ConnectorDirectoryCacheKey {
|
||||
chatgpt_base_url: String,
|
||||
account_id: Option<String>,
|
||||
chatgpt_user_id: Option<String>,
|
||||
is_workspace_account: bool,
|
||||
}
|
||||
|
||||
impl AllConnectorsCacheKey {
|
||||
impl ConnectorDirectoryCacheKey {
|
||||
pub fn new(
|
||||
chatgpt_base_url: String,
|
||||
account_id: Option<String>,
|
||||
@@ -42,13 +46,13 @@ impl AllConnectorsCacheKey {
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct CachedAllConnectors {
|
||||
key: AllConnectorsCacheKey,
|
||||
struct CachedConnectorDirectory {
|
||||
key: ConnectorDirectoryCacheKey,
|
||||
expires_at: Instant,
|
||||
connectors: Vec<AppInfo>,
|
||||
}
|
||||
|
||||
static ALL_CONNECTORS_CACHE: LazyLock<StdMutex<Option<CachedAllConnectors>>> =
|
||||
static CONNECTOR_DIRECTORY_CACHE: LazyLock<StdMutex<Option<CachedConnectorDirectory>>> =
|
||||
LazyLock::new(|| StdMutex::new(None));
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
@@ -76,26 +80,54 @@ pub struct DirectoryApp {
|
||||
visibility: Option<String>,
|
||||
}
|
||||
|
||||
pub fn cached_all_connectors(cache_key: &AllConnectorsCacheKey) -> Option<Vec<AppInfo>> {
|
||||
let mut cache_guard = ALL_CONNECTORS_CACHE
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
let now = Instant::now();
|
||||
|
||||
if let Some(cached) = cache_guard.as_ref() {
|
||||
if now < cached.expires_at && cached.key == *cache_key {
|
||||
return Some(cached.connectors.clone());
|
||||
}
|
||||
if now >= cached.expires_at {
|
||||
*cache_guard = None;
|
||||
}
|
||||
pub fn cached_directory_connectors(
|
||||
cache_context: &ConnectorDirectoryCacheContext,
|
||||
) -> Option<Vec<AppInfo>> {
|
||||
if let Some(cached_connectors) = cached_directory_connectors_in_memory(&cache_context.cache_key)
|
||||
{
|
||||
return Some(cached_connectors);
|
||||
}
|
||||
|
||||
let directory_cache::CachedConnectorDirectoryDiskLoad::Hit { connectors } =
|
||||
directory_cache::load_cached_directory_connectors_from_disk(cache_context)
|
||||
else {
|
||||
return None;
|
||||
};
|
||||
write_cached_directory_connectors_in_memory(
|
||||
cache_context.cache_key.clone(),
|
||||
&connectors,
|
||||
Duration::ZERO,
|
||||
);
|
||||
Some(connectors)
|
||||
}
|
||||
|
||||
fn cached_directory_connectors_in_memory(
|
||||
cache_key: &ConnectorDirectoryCacheKey,
|
||||
) -> Option<Vec<AppInfo>> {
|
||||
let cache_guard = CONNECTOR_DIRECTORY_CACHE
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
cache_guard
|
||||
.as_ref()
|
||||
.filter(|cached| cached.key == *cache_key)
|
||||
.map(|cached| cached.connectors.clone())
|
||||
}
|
||||
|
||||
fn unexpired_directory_connectors_in_memory(
|
||||
cache_key: &ConnectorDirectoryCacheKey,
|
||||
) -> Option<Vec<AppInfo>> {
|
||||
let cache_guard = CONNECTOR_DIRECTORY_CACHE
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
let cached = cache_guard.as_ref()?;
|
||||
if cached.key == *cache_key && Instant::now() < cached.expires_at {
|
||||
return Some(cached.connectors.clone());
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub async fn list_all_connectors_with_options<F, Fut>(
|
||||
cache_key: AllConnectorsCacheKey,
|
||||
cache_context: ConnectorDirectoryCacheContext,
|
||||
is_workspace_account: bool,
|
||||
force_refetch: bool,
|
||||
mut fetch_page: F,
|
||||
@@ -104,7 +136,10 @@ where
|
||||
F: FnMut(String) -> Fut,
|
||||
Fut: Future<Output = anyhow::Result<DirectoryListResponse>>,
|
||||
{
|
||||
if !force_refetch && let Some(cached_connectors) = cached_all_connectors(&cache_key) {
|
||||
if !force_refetch
|
||||
&& let Some(cached_connectors) =
|
||||
unexpired_directory_connectors_in_memory(&cache_context.cache_key)
|
||||
{
|
||||
return Ok(cached_connectors);
|
||||
}
|
||||
|
||||
@@ -132,17 +167,33 @@ where
|
||||
.cmp(&right.name)
|
||||
.then_with(|| left.id.cmp(&right.id))
|
||||
});
|
||||
write_cached_all_connectors(cache_key, &connectors);
|
||||
write_cached_directory_connectors(&cache_context, &connectors);
|
||||
Ok(connectors)
|
||||
}
|
||||
|
||||
fn write_cached_all_connectors(cache_key: AllConnectorsCacheKey, connectors: &[AppInfo]) {
|
||||
let mut cache_guard = ALL_CONNECTORS_CACHE
|
||||
fn write_cached_directory_connectors(
|
||||
cache_context: &ConnectorDirectoryCacheContext,
|
||||
connectors: &[AppInfo],
|
||||
) {
|
||||
write_cached_directory_connectors_in_memory(
|
||||
cache_context.cache_key.clone(),
|
||||
connectors,
|
||||
CONNECTORS_CACHE_TTL,
|
||||
);
|
||||
directory_cache::write_cached_directory_connectors_to_disk(cache_context, connectors);
|
||||
}
|
||||
|
||||
fn write_cached_directory_connectors_in_memory(
|
||||
cache_key: ConnectorDirectoryCacheKey,
|
||||
connectors: &[AppInfo],
|
||||
ttl: Duration,
|
||||
) {
|
||||
let mut cache_guard = CONNECTOR_DIRECTORY_CACHE
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
*cache_guard = Some(CachedAllConnectors {
|
||||
*cache_guard = Some(CachedConnectorDirectory {
|
||||
key: cache_key,
|
||||
expires_at: Instant::now() + CONNECTORS_CACHE_TTL,
|
||||
expires_at: Instant::now() + ttl,
|
||||
connectors: connectors.to_vec(),
|
||||
});
|
||||
}
|
||||
@@ -417,12 +468,13 @@ mod tests {
|
||||
use std::sync::Mutex;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering;
|
||||
use tempfile::TempDir;
|
||||
|
||||
static ALL_CONNECTORS_CACHE_TEST_LOCK: LazyLock<tokio::sync::Mutex<()>> =
|
||||
static CONNECTOR_DIRECTORY_CACHE_TEST_LOCK: LazyLock<tokio::sync::Mutex<()>> =
|
||||
LazyLock::new(|| tokio::sync::Mutex::new(()));
|
||||
|
||||
fn cache_key(id: &str) -> AllConnectorsCacheKey {
|
||||
AllConnectorsCacheKey::new(
|
||||
fn cache_key(id: &str) -> ConnectorDirectoryCacheKey {
|
||||
ConnectorDirectoryCacheKey::new(
|
||||
"https://chatgpt.example".to_string(),
|
||||
Some(format!("account-{id}")),
|
||||
Some(format!("user-{id}")),
|
||||
@@ -430,6 +482,17 @@ mod tests {
|
||||
)
|
||||
}
|
||||
|
||||
fn cache_context(codex_home: &TempDir, id: &str) -> ConnectorDirectoryCacheContext {
|
||||
ConnectorDirectoryCacheContext::new(codex_home.path().to_path_buf(), cache_key(id))
|
||||
}
|
||||
|
||||
fn clear_directory_memory_cache() {
|
||||
let mut cache_guard = CONNECTOR_DIRECTORY_CACHE
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
*cache_guard = None;
|
||||
}
|
||||
|
||||
fn app(id: &str, name: &str) -> DirectoryApp {
|
||||
DirectoryApp {
|
||||
id: id.to_string(),
|
||||
@@ -450,15 +513,16 @@ mod tests {
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "test serializes access to the shared connector cache for its full duration"
|
||||
)]
|
||||
async fn list_all_connectors_uses_shared_cache() -> anyhow::Result<()> {
|
||||
let _cache_guard = ALL_CONNECTORS_CACHE_TEST_LOCK.lock().await;
|
||||
async fn list_all_connectors_uses_shared_directory_cache() -> anyhow::Result<()> {
|
||||
let _cache_guard = CONNECTOR_DIRECTORY_CACHE_TEST_LOCK.lock().await;
|
||||
|
||||
let calls = Arc::new(AtomicUsize::new(0));
|
||||
let call_counter = Arc::clone(&calls);
|
||||
let key = cache_key("shared");
|
||||
let codex_home = TempDir::new()?;
|
||||
let cache_context = cache_context(&codex_home, "shared");
|
||||
|
||||
let first = list_all_connectors_with_options(
|
||||
key.clone(),
|
||||
cache_context.clone(),
|
||||
/*is_workspace_account*/ false,
|
||||
/*force_refetch*/ false,
|
||||
move |_path| {
|
||||
@@ -475,7 +539,7 @@ mod tests {
|
||||
.await?;
|
||||
|
||||
let second = list_all_connectors_with_options(
|
||||
key,
|
||||
cache_context,
|
||||
/*is_workspace_account*/ false,
|
||||
/*force_refetch*/ false,
|
||||
move |_path| async move {
|
||||
@@ -495,14 +559,15 @@ mod tests {
|
||||
reason = "test serializes access to the shared connector cache for its full duration"
|
||||
)]
|
||||
async fn list_all_connectors_merges_and_normalizes_directory_apps() -> anyhow::Result<()> {
|
||||
let _cache_guard = ALL_CONNECTORS_CACHE_TEST_LOCK.lock().await;
|
||||
let _cache_guard = CONNECTOR_DIRECTORY_CACHE_TEST_LOCK.lock().await;
|
||||
|
||||
let key = cache_key("merged");
|
||||
let codex_home = TempDir::new()?;
|
||||
let cache_context = cache_context(&codex_home, "merged");
|
||||
let calls = Arc::new(AtomicUsize::new(0));
|
||||
let call_counter = Arc::clone(&calls);
|
||||
|
||||
let connectors = list_all_connectors_with_options(
|
||||
key,
|
||||
cache_context,
|
||||
/*is_workspace_account*/ true,
|
||||
/*force_refetch*/ true,
|
||||
move |path| {
|
||||
@@ -566,6 +631,134 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "test serializes access to the shared connector cache for its full duration"
|
||||
)]
|
||||
async fn cached_directory_connectors_reads_directory_disk_cache() -> anyhow::Result<()> {
|
||||
let _cache_guard = CONNECTOR_DIRECTORY_CACHE_TEST_LOCK.lock().await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
let cache_context = cache_context(&codex_home, "disk");
|
||||
let calls = Arc::new(AtomicUsize::new(0));
|
||||
let call_counter = Arc::clone(&calls);
|
||||
|
||||
let first = list_all_connectors_with_options(
|
||||
cache_context.clone(),
|
||||
/*is_workspace_account*/ false,
|
||||
/*force_refetch*/ false,
|
||||
move |_path| {
|
||||
let call_counter = Arc::clone(&call_counter);
|
||||
async move {
|
||||
call_counter.fetch_add(1, Ordering::SeqCst);
|
||||
Ok(DirectoryListResponse {
|
||||
apps: vec![app("alpha", "Alpha")],
|
||||
next_token: None,
|
||||
})
|
||||
}
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
clear_directory_memory_cache();
|
||||
|
||||
let second = cached_directory_connectors(&cache_context).expect("disk cache should load");
|
||||
|
||||
assert_eq!(calls.load(Ordering::SeqCst), 1);
|
||||
assert_eq!(first, second);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "test serializes access to the shared connector cache for its full duration"
|
||||
)]
|
||||
async fn list_all_connectors_refreshes_when_only_directory_disk_cache_exists()
|
||||
-> anyhow::Result<()> {
|
||||
let _cache_guard = CONNECTOR_DIRECTORY_CACHE_TEST_LOCK.lock().await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
let cache_context = cache_context(&codex_home, "disk-refresh");
|
||||
let calls = Arc::new(AtomicUsize::new(0));
|
||||
let call_counter = Arc::clone(&calls);
|
||||
|
||||
list_all_connectors_with_options(
|
||||
cache_context.clone(),
|
||||
/*is_workspace_account*/ false,
|
||||
/*force_refetch*/ false,
|
||||
move |_path| {
|
||||
let call_counter = Arc::clone(&call_counter);
|
||||
async move {
|
||||
call_counter.fetch_add(1, Ordering::SeqCst);
|
||||
Ok(DirectoryListResponse {
|
||||
apps: vec![app("alpha", "Alpha")],
|
||||
next_token: None,
|
||||
})
|
||||
}
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
clear_directory_memory_cache();
|
||||
let mut cached_expected = directory_app_to_app_info(app("alpha", "Alpha"));
|
||||
cached_expected.install_url = Some(connector_install_url(
|
||||
&cached_expected.name,
|
||||
&cached_expected.id,
|
||||
));
|
||||
assert_eq!(
|
||||
cached_directory_connectors(&cache_context),
|
||||
Some(vec![cached_expected])
|
||||
);
|
||||
let refreshed_calls = Arc::clone(&calls);
|
||||
|
||||
let refreshed = list_all_connectors_with_options(
|
||||
cache_context,
|
||||
/*is_workspace_account*/ false,
|
||||
/*force_refetch*/ false,
|
||||
move |_path| {
|
||||
let call_counter = Arc::clone(&refreshed_calls);
|
||||
async move {
|
||||
call_counter.fetch_add(1, Ordering::SeqCst);
|
||||
Ok(DirectoryListResponse {
|
||||
apps: vec![app("beta", "Beta")],
|
||||
next_token: None,
|
||||
})
|
||||
}
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut expected = directory_app_to_app_info(app("beta", "Beta"));
|
||||
expected.install_url = Some(connector_install_url(&expected.name, &expected.id));
|
||||
assert_eq!(calls.load(Ordering::SeqCst), 2);
|
||||
assert_eq!(refreshed, vec![expected]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cached_directory_connectors_drops_stale_disk_schema() -> anyhow::Result<()> {
|
||||
let _cache_guard = CONNECTOR_DIRECTORY_CACHE_TEST_LOCK.lock().await;
|
||||
|
||||
clear_directory_memory_cache();
|
||||
let codex_home = TempDir::new()?;
|
||||
let cache_context = cache_context(&codex_home, "stale-schema");
|
||||
let cache_path = cache_context.cache_path();
|
||||
std::fs::create_dir_all(cache_path.parent().expect("cache parent"))?;
|
||||
std::fs::write(
|
||||
&cache_path,
|
||||
serde_json::to_vec_pretty(&serde_json::json!({
|
||||
"schema_version": 0,
|
||||
"connectors": [],
|
||||
}))?,
|
||||
)?;
|
||||
|
||||
assert_eq!(cached_directory_connectors(&cache_context), None);
|
||||
assert!(!cache_path.exists());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_directory_connectors_omits_tier_for_all_pages() -> anyhow::Result<()> {
|
||||
let requested_paths: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
@@ -6,14 +6,12 @@ use std::sync::Mutex as StdMutex;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::Context;
|
||||
use async_channel::unbounded;
|
||||
use codex_api::SharedAuthProvider;
|
||||
pub use codex_app_server_protocol::AppBranding;
|
||||
pub use codex_app_server_protocol::AppInfo;
|
||||
pub use codex_app_server_protocol::AppMetadata;
|
||||
use codex_connectors::AllConnectorsCacheKey;
|
||||
use codex_connectors::DirectoryListResponse;
|
||||
use codex_connectors::ConnectorDirectoryCacheContext;
|
||||
use codex_connectors::ConnectorDirectoryCacheKey;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_exec_server::EnvironmentManagerArgs;
|
||||
use codex_exec_server::ExecServerRuntimePaths;
|
||||
@@ -21,7 +19,6 @@ use codex_protocol::models::PermissionProfile;
|
||||
use codex_tools::DiscoverableTool;
|
||||
use rmcp::model::ToolAnnotations;
|
||||
use serde::Deserialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::config::Config;
|
||||
@@ -36,7 +33,6 @@ use codex_core_plugins::PluginsManager;
|
||||
use codex_features::Feature;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_login::default_client::create_client;
|
||||
use codex_login::default_client::originator;
|
||||
use codex_mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
use codex_mcp::McpConnectionManager;
|
||||
@@ -49,7 +45,6 @@ use codex_mcp::host_owned_codex_apps_enabled;
|
||||
use codex_mcp::with_codex_apps_mcp;
|
||||
|
||||
const CONNECTORS_READY_TIMEOUT_ON_EMPTY_TOOLS: Duration = Duration::from_secs(30);
|
||||
const DIRECTORY_CONNECTORS_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub(crate) struct AppToolPolicy {
|
||||
@@ -120,9 +115,11 @@ pub(crate) async fn list_tool_suggest_discoverable_tools_with_auth(
|
||||
auth: Option<&CodexAuth>,
|
||||
accessible_connectors: &[AppInfo],
|
||||
) -> anyhow::Result<Vec<DiscoverableTool>> {
|
||||
let directory_connectors =
|
||||
list_directory_connectors_for_tool_suggest_with_auth(config, auth).await?;
|
||||
let connector_ids = tool_suggest_connector_ids(config).await;
|
||||
let directory_connectors = codex_connectors::merge::merge_plugin_connectors(
|
||||
cached_directory_connectors_for_tool_suggest_with_auth(config, auth).await,
|
||||
connector_ids.iter().cloned(),
|
||||
);
|
||||
let discoverable_connectors =
|
||||
codex_connectors::filter::filter_tool_suggest_discoverable_connectors(
|
||||
directory_connectors,
|
||||
@@ -436,12 +433,12 @@ async fn tool_suggest_connector_ids(config: &Config) -> HashSet<String> {
|
||||
connector_ids
|
||||
}
|
||||
|
||||
async fn list_directory_connectors_for_tool_suggest_with_auth(
|
||||
async fn cached_directory_connectors_for_tool_suggest_with_auth(
|
||||
config: &Config,
|
||||
auth: Option<&CodexAuth>,
|
||||
) -> anyhow::Result<Vec<AppInfo>> {
|
||||
) -> Vec<AppInfo> {
|
||||
if !config.features.enabled(Feature::Apps) {
|
||||
return Ok(Vec::new());
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let loaded_auth;
|
||||
@@ -454,67 +451,25 @@ async fn list_directory_connectors_for_tool_suggest_with_auth(
|
||||
loaded_auth.as_ref()
|
||||
};
|
||||
let Some(auth) = auth.filter(|auth| auth.uses_codex_backend()) else {
|
||||
return Ok(Vec::new());
|
||||
return Vec::new();
|
||||
};
|
||||
|
||||
let account_id = match auth.get_account_id() {
|
||||
Some(account_id) if !account_id.is_empty() => account_id,
|
||||
_ => return Ok(Vec::new()),
|
||||
_ => return Vec::new(),
|
||||
};
|
||||
let auth_provider = codex_model_provider::auth_provider_from_auth(auth);
|
||||
let is_workspace_account = auth.is_workspace_account();
|
||||
let cache_key = AllConnectorsCacheKey::new(
|
||||
config.chatgpt_base_url.clone(),
|
||||
Some(account_id.clone()),
|
||||
auth.get_chatgpt_user_id(),
|
||||
is_workspace_account,
|
||||
let cache_context = ConnectorDirectoryCacheContext::new(
|
||||
config.codex_home.to_path_buf(),
|
||||
ConnectorDirectoryCacheKey::new(
|
||||
config.chatgpt_base_url.clone(),
|
||||
Some(account_id),
|
||||
auth.get_chatgpt_user_id(),
|
||||
is_workspace_account,
|
||||
),
|
||||
);
|
||||
|
||||
codex_connectors::list_all_connectors_with_options(
|
||||
cache_key,
|
||||
is_workspace_account,
|
||||
/*force_refetch*/ false,
|
||||
|path| {
|
||||
let auth_provider = auth_provider.clone();
|
||||
async move {
|
||||
chatgpt_get_request_with_auth_provider::<DirectoryListResponse>(
|
||||
config,
|
||||
path,
|
||||
auth_provider,
|
||||
)
|
||||
.await
|
||||
}
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn chatgpt_get_request_with_auth_provider<T: DeserializeOwned>(
|
||||
config: &Config,
|
||||
path: String,
|
||||
auth_provider: SharedAuthProvider,
|
||||
) -> anyhow::Result<T> {
|
||||
let client = create_client();
|
||||
let url = format!("{}{}", config.chatgpt_base_url, path);
|
||||
let response = client
|
||||
.get(&url)
|
||||
.headers(auth_provider.to_auth_headers())
|
||||
.header("Content-Type", "application/json")
|
||||
.timeout(DIRECTORY_CONNECTORS_TIMEOUT)
|
||||
.send()
|
||||
.await
|
||||
.context("failed to send request")?;
|
||||
|
||||
if response.status().is_success() {
|
||||
response
|
||||
.json()
|
||||
.await
|
||||
.context("failed to parse JSON response")
|
||||
} else {
|
||||
let status = response.status();
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
anyhow::bail!("request failed with status {status}: {body}");
|
||||
}
|
||||
codex_connectors::cached_directory_connectors(&cache_context).unwrap_or_default()
|
||||
}
|
||||
|
||||
pub(crate) fn accessible_connectors_from_mcp_tools(mcp_tools: &[ToolInfo]) -> Vec<AppInfo> {
|
||||
|
||||
@@ -19,6 +19,7 @@ use codex_connectors::metadata::connector_install_url;
|
||||
use codex_connectors::metadata::connector_mention_slug;
|
||||
use codex_connectors::metadata::sanitize_name;
|
||||
use codex_features::Feature;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
use codex_mcp::ToolInfo;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
@@ -1120,6 +1121,42 @@ disabled_tools = [
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn tool_suggest_uses_connector_id_fallback_when_directory_cache_is_empty() {
|
||||
let codex_home = tempdir().expect("tempdir should succeed");
|
||||
std::fs::write(
|
||||
codex_home.path().join(CONFIG_TOML_FILE),
|
||||
r#"
|
||||
[features]
|
||||
apps = true
|
||||
|
||||
[tool_suggest]
|
||||
discoverables = [
|
||||
{ type = "connector", id = "connector_gmail" }
|
||||
]
|
||||
"#,
|
||||
)
|
||||
.expect("write config");
|
||||
let config = ConfigBuilder::default()
|
||||
.codex_home(codex_home.path().to_path_buf())
|
||||
.build()
|
||||
.await
|
||||
.expect("config should load");
|
||||
let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing();
|
||||
|
||||
let discoverable_tools =
|
||||
list_tool_suggest_discoverable_tools_with_auth(&config, Some(&auth), &[])
|
||||
.await
|
||||
.expect("discoverable tools should load");
|
||||
|
||||
assert_eq!(
|
||||
discoverable_tools,
|
||||
vec![DiscoverableTool::from(plugin_connector_to_app_info(
|
||||
"connector_gmail".to_string(),
|
||||
))]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn filter_tool_suggest_discoverable_connectors_keeps_only_plugin_backed_uninstalled_apps() {
|
||||
let filtered = filter_tool_suggest_discoverable_connectors(
|
||||
|
||||
Reference in New Issue
Block a user