mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
update
This commit is contained in:
@@ -149,6 +149,7 @@ use crate::mcp::effective_mcp_servers;
|
||||
use crate::mcp::maybe_prompt_and_install_mcp_dependencies;
|
||||
use crate::mcp::with_codex_apps_mcp;
|
||||
use crate::mcp_connection_manager::McpConnectionManager;
|
||||
use crate::mcp_connection_manager::codex_apps_tools_cache_key;
|
||||
use crate::mcp_connection_manager::filter_codex_apps_mcp_tools_only;
|
||||
use crate::mcp_connection_manager::filter_mcp_tools_by_name;
|
||||
use crate::memories;
|
||||
@@ -1336,6 +1337,8 @@ impl Session {
|
||||
tx_event.clone(),
|
||||
cancel_token,
|
||||
sandbox_state,
|
||||
config.codex_home.clone(),
|
||||
codex_apps_tools_cache_key(auth),
|
||||
)
|
||||
.await;
|
||||
if !required_mcp_servers.is_empty() {
|
||||
@@ -2939,6 +2942,8 @@ impl Session {
|
||||
self.get_tx_event(),
|
||||
cancel_token,
|
||||
sandbox_state,
|
||||
config.codex_home.clone(),
|
||||
codex_apps_tools_cache_key(auth.as_ref()),
|
||||
)
|
||||
.await;
|
||||
|
||||
|
||||
@@ -22,8 +22,8 @@ use crate::features::Feature;
|
||||
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
use crate::mcp::auth::compute_auth_statuses;
|
||||
use crate::mcp::with_codex_apps_mcp;
|
||||
use crate::mcp_connection_manager::DEFAULT_STARTUP_TIMEOUT;
|
||||
use crate::mcp_connection_manager::McpConnectionManager;
|
||||
use crate::mcp_connection_manager::codex_apps_tools_cache_key;
|
||||
use crate::token_data::TokenData;
|
||||
|
||||
pub const CONNECTORS_CACHE_TTL: Duration = Duration::from_secs(3600);
|
||||
@@ -109,6 +109,8 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options(
|
||||
tx_event,
|
||||
cancel_token.clone(),
|
||||
sandbox_state,
|
||||
config.codex_home.clone(),
|
||||
codex_apps_tools_cache_key(auth.as_ref()),
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -122,16 +124,14 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options(
|
||||
);
|
||||
}
|
||||
|
||||
let codex_apps_ready = if let Some(cfg) = mcp_servers.get(CODEX_APPS_MCP_SERVER_NAME) {
|
||||
let timeout = cfg.startup_timeout_sec.unwrap_or(DEFAULT_STARTUP_TIMEOUT);
|
||||
let tools = mcp_connection_manager.list_all_tools().await;
|
||||
let codex_apps_ready = if mcp_servers.contains_key(CODEX_APPS_MCP_SERVER_NAME) {
|
||||
mcp_connection_manager
|
||||
.wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, timeout)
|
||||
.wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, Duration::ZERO)
|
||||
.await
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
let tools = mcp_connection_manager.list_all_tools().await;
|
||||
cancel_token.cancel();
|
||||
|
||||
let accessible_connectors = accessible_connectors_from_mcp_tools(&tools);
|
||||
|
||||
@@ -25,6 +25,7 @@ use crate::features::Feature;
|
||||
use crate::mcp::auth::compute_auth_statuses;
|
||||
use crate::mcp_connection_manager::McpConnectionManager;
|
||||
use crate::mcp_connection_manager::SandboxState;
|
||||
use crate::mcp_connection_manager::codex_apps_tools_cache_key;
|
||||
|
||||
const MCP_TOOL_NAME_PREFIX: &str = "mcp";
|
||||
const MCP_TOOL_NAME_DELIMITER: &str = "__";
|
||||
@@ -212,6 +213,8 @@ pub async fn collect_mcp_snapshot(config: &Config) -> McpListToolsResponseEvent
|
||||
tx_event,
|
||||
cancel_token.clone(),
|
||||
sandbox_state,
|
||||
config.codex_home.clone(),
|
||||
codex_apps_tools_cache_key(auth.as_ref()),
|
||||
)
|
||||
.await;
|
||||
|
||||
|
||||
@@ -12,10 +12,7 @@ use std::env;
|
||||
use std::ffi::OsString;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::LazyLock;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
use crate::mcp::auth::McpAuthStatusEntry;
|
||||
@@ -88,7 +85,8 @@ pub const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
/// Default timeout for individual tool calls.
|
||||
const DEFAULT_TOOL_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
const CODEX_APPS_TOOLS_CACHE_TTL: Duration = Duration::from_secs(3600);
|
||||
const CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION: u8 = 1;
|
||||
const CODEX_APPS_TOOLS_CACHE_DIR: &str = "cache/codex_apps_tools";
|
||||
|
||||
/// The Responses API requires tool names to match `^[a-zA-Z0-9_-]+$`.
|
||||
/// MCP server/tool names are user-controlled, so sanitize the fully-qualified
|
||||
@@ -117,6 +115,27 @@ fn sha1_hex(s: &str) -> String {
|
||||
format!("{sha1:x}")
|
||||
}
|
||||
|
||||
pub(crate) fn codex_apps_tools_cache_key(
|
||||
auth: Option<&crate::CodexAuth>,
|
||||
) -> CodexAppsToolsCacheKey {
|
||||
let token_data = auth.and_then(|auth| auth.get_token_data().ok());
|
||||
let account_id = token_data
|
||||
.as_ref()
|
||||
.and_then(|token_data| token_data.account_id.clone());
|
||||
let chatgpt_user_id = token_data
|
||||
.as_ref()
|
||||
.and_then(|token_data| token_data.id_token.chatgpt_user_id.clone());
|
||||
let is_workspace_account = token_data
|
||||
.as_ref()
|
||||
.is_some_and(|token_data| token_data.id_token.is_workspace_account());
|
||||
|
||||
CodexAppsToolsCacheKey {
|
||||
account_id,
|
||||
chatgpt_user_id,
|
||||
is_workspace_account,
|
||||
}
|
||||
}
|
||||
|
||||
fn qualify_tools<I>(tools: I) -> HashMap<String, ToolInfo>
|
||||
where
|
||||
I: IntoIterator<Item = ToolInfo>,
|
||||
@@ -159,7 +178,7 @@ where
|
||||
qualified_tools
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub(crate) struct ToolInfo {
|
||||
pub(crate) server_name: String,
|
||||
pub(crate) tool_name: String,
|
||||
@@ -168,14 +187,40 @@ pub(crate) struct ToolInfo {
|
||||
pub(crate) connector_name: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub(crate) struct CodexAppsToolsCacheKey {
|
||||
account_id: Option<String>,
|
||||
chatgpt_user_id: Option<String>,
|
||||
is_workspace_account: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct CachedCodexAppsTools {
|
||||
expires_at: Instant,
|
||||
struct CodexAppsToolsCacheContext {
|
||||
codex_home: PathBuf,
|
||||
user_key: CodexAppsToolsCacheKey,
|
||||
}
|
||||
|
||||
impl CodexAppsToolsCacheContext {
|
||||
fn cache_path(&self) -> PathBuf {
|
||||
let user_key_json = serde_json::to_string(&self.user_key).unwrap_or_default();
|
||||
let user_key_hash = sha1_hex(&user_key_json);
|
||||
self.codex_home
|
||||
.join(CODEX_APPS_TOOLS_CACHE_DIR)
|
||||
.join(format!("{user_key_hash}.json"))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct CodexAppsToolsDiskCache {
|
||||
schema_version: u8,
|
||||
tools: Vec<ToolInfo>,
|
||||
}
|
||||
|
||||
static CODEX_APPS_TOOLS_CACHE: LazyLock<StdMutex<Option<CachedCodexAppsTools>>> =
|
||||
LazyLock::new(|| StdMutex::new(None));
|
||||
enum CachedCodexAppsToolsLoad {
|
||||
Hit(Vec<ToolInfo>),
|
||||
Missing,
|
||||
Invalid,
|
||||
}
|
||||
|
||||
type ResponderMap = HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>;
|
||||
|
||||
@@ -253,6 +298,7 @@ struct ManagedClient {
|
||||
tool_filter: ToolFilter,
|
||||
tool_timeout: Option<Duration>,
|
||||
server_supports_sandbox_state_capability: bool,
|
||||
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
|
||||
}
|
||||
|
||||
impl ManagedClient {
|
||||
@@ -276,6 +322,13 @@ impl ManagedClient {
|
||||
#[derive(Clone)]
|
||||
struct AsyncManagedClient {
|
||||
client: Shared<BoxFuture<'static, Result<ManagedClient, StartupOutcomeError>>>,
|
||||
startup_snapshot: StartupToolSnapshot,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
enum StartupToolSnapshot {
|
||||
CacheHit(Vec<ToolInfo>),
|
||||
Unavailable,
|
||||
}
|
||||
|
||||
impl AsyncManagedClient {
|
||||
@@ -286,8 +339,19 @@ impl AsyncManagedClient {
|
||||
cancel_token: CancellationToken,
|
||||
tx_event: Sender<Event>,
|
||||
elicitation_requests: ElicitationRequestManager,
|
||||
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
|
||||
) -> Self {
|
||||
let tool_filter = ToolFilter::from_config(&config);
|
||||
let startup_snapshot = match load_startup_cached_codex_apps_tools_snapshot(
|
||||
&server_name,
|
||||
codex_apps_tools_cache_context.as_ref(),
|
||||
) {
|
||||
StartupToolSnapshot::CacheHit(tools) => {
|
||||
StartupToolSnapshot::CacheHit(filter_tools(tools, tool_filter.clone()))
|
||||
}
|
||||
StartupToolSnapshot::Unavailable => StartupToolSnapshot::Unavailable,
|
||||
};
|
||||
let startup_tool_filter = tool_filter.clone();
|
||||
let fut = async move {
|
||||
if let Err(error) = validate_mcp_server_name(&server_name) {
|
||||
return Err(error.into());
|
||||
@@ -300,9 +364,10 @@ impl AsyncManagedClient {
|
||||
client,
|
||||
config.startup_timeout_sec.or(Some(DEFAULT_STARTUP_TIMEOUT)),
|
||||
config.tool_timeout_sec.unwrap_or(DEFAULT_TOOL_TIMEOUT),
|
||||
tool_filter,
|
||||
startup_tool_filter,
|
||||
tx_event,
|
||||
elicitation_requests,
|
||||
codex_apps_tools_cache_context,
|
||||
)
|
||||
.or_cancel(&cancel_token)
|
||||
.await
|
||||
@@ -313,6 +378,7 @@ impl AsyncManagedClient {
|
||||
};
|
||||
Self {
|
||||
client: fut.boxed().shared(),
|
||||
startup_snapshot,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -320,6 +386,10 @@ impl AsyncManagedClient {
|
||||
self.client.clone().await
|
||||
}
|
||||
|
||||
fn try_client(&self) -> Option<Result<ManagedClient, StartupOutcomeError>> {
|
||||
self.client.clone().now_or_never()
|
||||
}
|
||||
|
||||
async fn notify_sandbox_state_change(&self, sandbox_state: &SandboxState) -> Result<()> {
|
||||
let managed = self.client().await?;
|
||||
managed.notify_sandbox_state_change(sandbox_state).await
|
||||
@@ -358,6 +428,8 @@ impl McpConnectionManager {
|
||||
tx_event: Sender<Event>,
|
||||
cancel_token: CancellationToken,
|
||||
initial_sandbox_state: SandboxState,
|
||||
codex_home: PathBuf,
|
||||
codex_apps_tools_cache_key: CodexAppsToolsCacheKey,
|
||||
) {
|
||||
if cancel_token.is_cancelled() {
|
||||
return;
|
||||
@@ -376,6 +448,14 @@ impl McpConnectionManager {
|
||||
},
|
||||
)
|
||||
.await;
|
||||
let codex_apps_tools_cache_context = if server_name == CODEX_APPS_MCP_SERVER_NAME {
|
||||
Some(CodexAppsToolsCacheContext {
|
||||
codex_home: codex_home.clone(),
|
||||
user_key: codex_apps_tools_cache_key.clone(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let async_managed_client = AsyncManagedClient::new(
|
||||
server_name.clone(),
|
||||
cfg,
|
||||
@@ -383,6 +463,7 @@ impl McpConnectionManager {
|
||||
cancel_token.clone(),
|
||||
tx_event.clone(),
|
||||
elicitation_requests.clone(),
|
||||
codex_apps_tools_cache_context,
|
||||
);
|
||||
clients.insert(server_name.clone(), async_managed_client.clone());
|
||||
let tx_event = tx_event.clone();
|
||||
@@ -510,34 +591,62 @@ impl McpConnectionManager {
|
||||
failures
|
||||
}
|
||||
|
||||
async fn list_tools_for_ready_client(
|
||||
server_name: &str,
|
||||
client: ManagedClient,
|
||||
) -> Vec<ToolInfo> {
|
||||
let rmcp_client = client.client;
|
||||
let tool_timeout = client.tool_timeout;
|
||||
let tool_filter = client.tool_filter;
|
||||
let mut server_tools = client.tools;
|
||||
|
||||
if server_name == CODEX_APPS_MCP_SERVER_NAME {
|
||||
match list_tools_for_client(
|
||||
server_name,
|
||||
&rmcp_client,
|
||||
tool_timeout,
|
||||
client.codex_apps_tools_cache_context.as_ref(),
|
||||
false,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(fresh_or_cached_tools) => {
|
||||
server_tools = fresh_or_cached_tools;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"Failed to refresh tools for MCP server '{server_name}', using startup snapshot: {err:#}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
filter_tools(server_tools, tool_filter)
|
||||
}
|
||||
|
||||
/// Returns a single map that contains all tools. Each key is the
|
||||
/// fully-qualified name for the tool.
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
pub async fn list_all_tools(&self) -> HashMap<String, ToolInfo> {
|
||||
let mut tools = HashMap::new();
|
||||
for (server_name, managed_client) in &self.clients {
|
||||
let client = managed_client.client().await.ok();
|
||||
if let Some(client) = client {
|
||||
let rmcp_client = client.client;
|
||||
let tool_timeout = client.tool_timeout;
|
||||
let tool_filter = client.tool_filter;
|
||||
let mut server_tools = client.tools;
|
||||
|
||||
if server_name == CODEX_APPS_MCP_SERVER_NAME {
|
||||
match list_tools_for_client(server_name, &rmcp_client, tool_timeout).await {
|
||||
Ok(fresh_or_cached_tools) => {
|
||||
server_tools = fresh_or_cached_tools;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"Failed to refresh tools for MCP server '{server_name}', using startup snapshot: {err:#}"
|
||||
);
|
||||
}
|
||||
let ready_client = match managed_client.try_client() {
|
||||
Some(Ok(client)) => Some(client),
|
||||
Some(Err(_)) => None,
|
||||
None => match &managed_client.startup_snapshot {
|
||||
StartupToolSnapshot::CacheHit(tools_from_cache) => {
|
||||
tools.extend(qualify_tools(tools_from_cache.clone()));
|
||||
None
|
||||
}
|
||||
}
|
||||
StartupToolSnapshot::Unavailable => managed_client.client().await.ok(),
|
||||
},
|
||||
};
|
||||
|
||||
tools.extend(qualify_tools(filter_tools(server_tools, tool_filter)));
|
||||
}
|
||||
let Some(ready_client) = ready_client else {
|
||||
continue;
|
||||
};
|
||||
let server_tools = Self::list_tools_for_ready_client(server_name, ready_client).await;
|
||||
tools.extend(qualify_tools(server_tools));
|
||||
}
|
||||
tools
|
||||
}
|
||||
@@ -565,7 +674,9 @@ impl McpConnectionManager {
|
||||
format!("failed to refresh tools for MCP server '{CODEX_APPS_MCP_SERVER_NAME}'")
|
||||
})?;
|
||||
|
||||
write_cached_codex_apps_tools(&tools);
|
||||
if let Some(cache_context) = managed_client.codex_apps_tools_cache_context.as_ref() {
|
||||
write_cached_codex_apps_tools(cache_context, &tools);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -997,6 +1108,7 @@ async fn start_server_task(
|
||||
tool_filter: ToolFilter,
|
||||
tx_event: Sender<Event>,
|
||||
elicitation_requests: ElicitationRequestManager,
|
||||
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
|
||||
) -> Result<ManagedClient, StartupOutcomeError> {
|
||||
let params = InitializeRequestParams {
|
||||
meta: None,
|
||||
@@ -1033,9 +1145,15 @@ async fn start_server_task(
|
||||
.await
|
||||
.map_err(StartupOutcomeError::from)?;
|
||||
|
||||
let tools = list_tools_for_client(&server_name, &client, startup_timeout)
|
||||
.await
|
||||
.map_err(StartupOutcomeError::from)?;
|
||||
let tools = list_tools_for_client(
|
||||
&server_name,
|
||||
&client,
|
||||
startup_timeout,
|
||||
codex_apps_tools_cache_context.as_ref(),
|
||||
true,
|
||||
)
|
||||
.await
|
||||
.map_err(StartupOutcomeError::from)?;
|
||||
|
||||
let server_supports_sandbox_state_capability = initialize_result
|
||||
.capabilities
|
||||
@@ -1050,6 +1168,7 @@ async fn start_server_task(
|
||||
tool_timeout: Some(tool_timeout),
|
||||
tool_filter,
|
||||
server_supports_sandbox_state_capability,
|
||||
codex_apps_tools_cache_context,
|
||||
};
|
||||
|
||||
Ok(managed)
|
||||
@@ -1103,51 +1222,122 @@ async fn list_tools_for_client(
|
||||
server_name: &str,
|
||||
client: &Arc<RmcpClient>,
|
||||
timeout: Option<Duration>,
|
||||
codex_apps_tools_cache_context: Option<&CodexAppsToolsCacheContext>,
|
||||
refresh_in_background: bool,
|
||||
) -> Result<Vec<ToolInfo>> {
|
||||
if server_name == CODEX_APPS_MCP_SERVER_NAME
|
||||
&& let Some(cached_tools) = read_cached_codex_apps_tools()
|
||||
{
|
||||
return Ok(cached_tools);
|
||||
if server_name != CODEX_APPS_MCP_SERVER_NAME {
|
||||
return list_tools_for_client_uncached(server_name, client, timeout).await;
|
||||
}
|
||||
|
||||
let tools = list_tools_for_client_uncached(server_name, client, timeout).await?;
|
||||
if server_name == CODEX_APPS_MCP_SERVER_NAME {
|
||||
write_cached_codex_apps_tools(&tools);
|
||||
let Some(cache_context) = codex_apps_tools_cache_context.cloned() else {
|
||||
return list_tools_for_client_uncached(server_name, client, timeout).await;
|
||||
};
|
||||
|
||||
let cached = load_cached_codex_apps_tools(&cache_context);
|
||||
if refresh_in_background
|
||||
&& matches!(
|
||||
cached,
|
||||
CachedCodexAppsToolsLoad::Hit(_) | CachedCodexAppsToolsLoad::Invalid
|
||||
)
|
||||
{
|
||||
refresh_codex_apps_tools_cache_async(Arc::clone(client), timeout, cache_context.clone());
|
||||
}
|
||||
|
||||
match cached {
|
||||
CachedCodexAppsToolsLoad::Hit(tools) => Ok(tools),
|
||||
CachedCodexAppsToolsLoad::Invalid if refresh_in_background => Ok(Vec::new()),
|
||||
CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => {
|
||||
let tools = list_tools_for_client_uncached(server_name, client, timeout).await?;
|
||||
write_cached_codex_apps_tools(&cache_context, &tools);
|
||||
Ok(tools)
|
||||
}
|
||||
}
|
||||
Ok(tools)
|
||||
}
|
||||
|
||||
fn read_cached_codex_apps_tools() -> Option<Vec<ToolInfo>> {
|
||||
let mut cache_guard = CODEX_APPS_TOOLS_CACHE
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
let now = Instant::now();
|
||||
|
||||
if let Some(cached) = cache_guard.as_ref()
|
||||
&& now < cached.expires_at
|
||||
{
|
||||
return Some(cached.tools.clone());
|
||||
}
|
||||
|
||||
if cache_guard
|
||||
.as_ref()
|
||||
.is_some_and(|cached| now >= cached.expires_at)
|
||||
{
|
||||
*cache_guard = None;
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn write_cached_codex_apps_tools(tools: &[ToolInfo]) {
|
||||
let mut cache_guard = CODEX_APPS_TOOLS_CACHE
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
*cache_guard = Some(CachedCodexAppsTools {
|
||||
expires_at: Instant::now() + CODEX_APPS_TOOLS_CACHE_TTL,
|
||||
tools: tools.to_vec(),
|
||||
fn refresh_codex_apps_tools_cache_async(
|
||||
client: Arc<RmcpClient>,
|
||||
timeout: Option<Duration>,
|
||||
cache_context: CodexAppsToolsCacheContext,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
match list_tools_for_client_uncached(CODEX_APPS_MCP_SERVER_NAME, &client, timeout).await {
|
||||
Ok(tools) => write_cached_codex_apps_tools(&cache_context, &tools),
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"Failed to refresh tools for MCP server '{CODEX_APPS_MCP_SERVER_NAME}': {err:#}"
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn load_startup_cached_codex_apps_tools_snapshot(
|
||||
server_name: &str,
|
||||
cache_context: Option<&CodexAppsToolsCacheContext>,
|
||||
) -> StartupToolSnapshot {
|
||||
if server_name != CODEX_APPS_MCP_SERVER_NAME {
|
||||
return StartupToolSnapshot::Unavailable;
|
||||
}
|
||||
|
||||
let Some(cache_context) = cache_context else {
|
||||
return StartupToolSnapshot::Unavailable;
|
||||
};
|
||||
|
||||
match load_cached_codex_apps_tools(cache_context) {
|
||||
CachedCodexAppsToolsLoad::Hit(tools) => StartupToolSnapshot::CacheHit(tools),
|
||||
CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => {
|
||||
StartupToolSnapshot::Unavailable
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn read_cached_codex_apps_tools(
|
||||
cache_context: &CodexAppsToolsCacheContext,
|
||||
) -> Option<Vec<ToolInfo>> {
|
||||
match load_cached_codex_apps_tools(cache_context) {
|
||||
CachedCodexAppsToolsLoad::Hit(tools) => Some(tools),
|
||||
CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn load_cached_codex_apps_tools(
|
||||
cache_context: &CodexAppsToolsCacheContext,
|
||||
) -> CachedCodexAppsToolsLoad {
|
||||
let cache_path = cache_context.cache_path();
|
||||
let bytes = match std::fs::read(cache_path) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
|
||||
return CachedCodexAppsToolsLoad::Missing;
|
||||
}
|
||||
Err(_) => return CachedCodexAppsToolsLoad::Invalid,
|
||||
};
|
||||
let cache: CodexAppsToolsDiskCache = match serde_json::from_slice(&bytes) {
|
||||
Ok(cache) => cache,
|
||||
Err(_) => return CachedCodexAppsToolsLoad::Invalid,
|
||||
};
|
||||
if cache.schema_version != CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION {
|
||||
return CachedCodexAppsToolsLoad::Invalid;
|
||||
}
|
||||
CachedCodexAppsToolsLoad::Hit(cache.tools)
|
||||
}
|
||||
|
||||
fn write_cached_codex_apps_tools(cache_context: &CodexAppsToolsCacheContext, tools: &[ToolInfo]) {
|
||||
let cache_path = cache_context.cache_path();
|
||||
if let Some(parent) = cache_path.parent()
|
||||
&& std::fs::create_dir_all(parent).is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
let Ok(bytes) = serde_json::to_vec_pretty(&CodexAppsToolsDiskCache {
|
||||
schema_version: CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION,
|
||||
tools: tools.to_vec(),
|
||||
}) else {
|
||||
return;
|
||||
};
|
||||
let _ = std::fs::write(cache_path, bytes);
|
||||
}
|
||||
|
||||
async fn list_tools_for_client_uncached(
|
||||
server_name: &str,
|
||||
client: &Arc<RmcpClient>,
|
||||
@@ -1262,6 +1452,7 @@ mod tests {
|
||||
use rmcp::model::JsonObject;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use tempfile::tempdir;
|
||||
|
||||
fn create_test_tool(server_name: &str, tool_name: &str) -> ToolInfo {
|
||||
ToolInfo {
|
||||
@@ -1283,19 +1474,19 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn with_clean_codex_apps_tools_cache<T>(f: impl FnOnce() -> T) -> T {
|
||||
let previous_cache = {
|
||||
let mut cache_guard = CODEX_APPS_TOOLS_CACHE
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
cache_guard.take()
|
||||
};
|
||||
let result = f();
|
||||
let mut cache_guard = CODEX_APPS_TOOLS_CACHE
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
*cache_guard = previous_cache;
|
||||
result
|
||||
fn create_codex_apps_tools_cache_context(
|
||||
codex_home: PathBuf,
|
||||
account_id: Option<&str>,
|
||||
chatgpt_user_id: Option<&str>,
|
||||
) -> CodexAppsToolsCacheContext {
|
||||
CodexAppsToolsCacheContext {
|
||||
codex_home,
|
||||
user_key: CodexAppsToolsCacheKey {
|
||||
account_id: account_id.map(ToOwned::to_owned),
|
||||
chatgpt_user_id: chatgpt_user_id.map(ToOwned::to_owned),
|
||||
is_workspace_account: false,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1452,43 +1643,191 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn codex_apps_tools_cache_is_overwritten_by_last_write() {
|
||||
with_clean_codex_apps_tools_cache(|| {
|
||||
let tools_gateway_1 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")];
|
||||
let tools_gateway_2 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "two")];
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
let cache_context = create_codex_apps_tools_cache_context(
|
||||
codex_home.path().to_path_buf(),
|
||||
Some("account-one"),
|
||||
Some("user-one"),
|
||||
);
|
||||
let tools_gateway_1 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")];
|
||||
let tools_gateway_2 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "two")];
|
||||
|
||||
write_cached_codex_apps_tools(&tools_gateway_1);
|
||||
let cached_gateway_1 =
|
||||
read_cached_codex_apps_tools().expect("cache entry exists for first write");
|
||||
assert_eq!(cached_gateway_1[0].tool_name, "one");
|
||||
write_cached_codex_apps_tools(&cache_context, &tools_gateway_1);
|
||||
let cached_gateway_1 = read_cached_codex_apps_tools(&cache_context)
|
||||
.expect("cache entry exists for first write");
|
||||
assert_eq!(cached_gateway_1[0].tool_name, "one");
|
||||
|
||||
write_cached_codex_apps_tools(&tools_gateway_2);
|
||||
let cached_gateway_2 =
|
||||
read_cached_codex_apps_tools().expect("cache entry exists for second write");
|
||||
assert_eq!(cached_gateway_2[0].tool_name, "two");
|
||||
});
|
||||
write_cached_codex_apps_tools(&cache_context, &tools_gateway_2);
|
||||
let cached_gateway_2 = read_cached_codex_apps_tools(&cache_context)
|
||||
.expect("cache entry exists for second write");
|
||||
assert_eq!(cached_gateway_2[0].tool_name, "two");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn codex_apps_tools_cache_is_cleared_when_expired() {
|
||||
with_clean_codex_apps_tools_cache(|| {
|
||||
let tools = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "stale_tool")];
|
||||
write_cached_codex_apps_tools(&tools);
|
||||
fn codex_apps_tools_cache_is_scoped_per_user() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
let cache_context_user_1 = create_codex_apps_tools_cache_context(
|
||||
codex_home.path().to_path_buf(),
|
||||
Some("account-one"),
|
||||
Some("user-one"),
|
||||
);
|
||||
let cache_context_user_2 = create_codex_apps_tools_cache_context(
|
||||
codex_home.path().to_path_buf(),
|
||||
Some("account-two"),
|
||||
Some("user-two"),
|
||||
);
|
||||
let tools_user_1 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")];
|
||||
let tools_user_2 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "two")];
|
||||
|
||||
{
|
||||
let mut cache_guard = CODEX_APPS_TOOLS_CACHE
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
cache_guard.as_mut().expect("cache exists").expires_at =
|
||||
Instant::now() - Duration::from_secs(1);
|
||||
}
|
||||
write_cached_codex_apps_tools(&cache_context_user_1, &tools_user_1);
|
||||
write_cached_codex_apps_tools(&cache_context_user_2, &tools_user_2);
|
||||
|
||||
assert!(read_cached_codex_apps_tools().is_none());
|
||||
let read_user_1 =
|
||||
read_cached_codex_apps_tools(&cache_context_user_1).expect("cache entry for user one");
|
||||
let read_user_2 =
|
||||
read_cached_codex_apps_tools(&cache_context_user_2).expect("cache entry for user two");
|
||||
|
||||
let cache_guard = CODEX_APPS_TOOLS_CACHE
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
assert!(cache_guard.is_none());
|
||||
});
|
||||
assert_eq!(read_user_1[0].tool_name, "one");
|
||||
assert_eq!(read_user_2[0].tool_name, "two");
|
||||
assert_ne!(
|
||||
cache_context_user_1.cache_path(),
|
||||
cache_context_user_2.cache_path(),
|
||||
"each user should get an isolated cache file"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn codex_apps_tools_cache_is_ignored_when_schema_version_mismatches() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
let cache_context = create_codex_apps_tools_cache_context(
|
||||
codex_home.path().to_path_buf(),
|
||||
Some("account-one"),
|
||||
Some("user-one"),
|
||||
);
|
||||
let cache_path = cache_context.cache_path();
|
||||
if let Some(parent) = cache_path.parent() {
|
||||
std::fs::create_dir_all(parent).expect("create parent");
|
||||
}
|
||||
let bytes = serde_json::to_vec_pretty(&serde_json::json!({
|
||||
"schema_version": CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION + 1,
|
||||
"tools": [create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")],
|
||||
}))
|
||||
.expect("serialize");
|
||||
std::fs::write(cache_path, bytes).expect("write");
|
||||
|
||||
assert!(read_cached_codex_apps_tools(&cache_context).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn codex_apps_tools_cache_is_ignored_when_json_is_invalid() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
let cache_context = create_codex_apps_tools_cache_context(
|
||||
codex_home.path().to_path_buf(),
|
||||
Some("account-one"),
|
||||
Some("user-one"),
|
||||
);
|
||||
let cache_path = cache_context.cache_path();
|
||||
if let Some(parent) = cache_path.parent() {
|
||||
std::fs::create_dir_all(parent).expect("create parent");
|
||||
}
|
||||
std::fs::write(cache_path, b"{not json").expect("write");
|
||||
|
||||
assert!(read_cached_codex_apps_tools(&cache_context).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn startup_cached_codex_apps_tools_loads_from_disk_cache() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
let cache_context = create_codex_apps_tools_cache_context(
|
||||
codex_home.path().to_path_buf(),
|
||||
Some("account-one"),
|
||||
Some("user-one"),
|
||||
);
|
||||
let cached_tools = vec![create_test_tool(
|
||||
CODEX_APPS_MCP_SERVER_NAME,
|
||||
"calendar_search",
|
||||
)];
|
||||
write_cached_codex_apps_tools(&cache_context, &cached_tools);
|
||||
|
||||
let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot(
|
||||
CODEX_APPS_MCP_SERVER_NAME,
|
||||
Some(&cache_context),
|
||||
);
|
||||
let StartupToolSnapshot::CacheHit(startup_tools) = startup_snapshot else {
|
||||
panic!("expected startup snapshot to load from cache");
|
||||
};
|
||||
|
||||
assert_eq!(startup_tools.len(), 1);
|
||||
assert_eq!(startup_tools[0].server_name, CODEX_APPS_MCP_SERVER_NAME);
|
||||
assert_eq!(startup_tools[0].tool_name, "calendar_search");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_all_tools_uses_startup_snapshot_while_client_is_pending() {
|
||||
let startup_tools = vec![create_test_tool(
|
||||
CODEX_APPS_MCP_SERVER_NAME,
|
||||
"calendar_create_event",
|
||||
)];
|
||||
let pending_client =
|
||||
futures::future::pending::<Result<ManagedClient, StartupOutcomeError>>()
|
||||
.boxed()
|
||||
.shared();
|
||||
let mut manager = McpConnectionManager::default();
|
||||
manager.clients.insert(
|
||||
CODEX_APPS_MCP_SERVER_NAME.to_string(),
|
||||
AsyncManagedClient {
|
||||
client: pending_client,
|
||||
startup_snapshot: StartupToolSnapshot::CacheHit(startup_tools),
|
||||
},
|
||||
);
|
||||
|
||||
let tools = manager.list_all_tools().await;
|
||||
let tool = tools
|
||||
.get("mcp__codex_apps__calendar_create_event")
|
||||
.expect("tool from startup cache");
|
||||
assert_eq!(tool.server_name, CODEX_APPS_MCP_SERVER_NAME);
|
||||
assert_eq!(tool.tool_name, "calendar_create_event");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_all_tools_blocks_while_client_is_pending_without_startup_snapshot() {
|
||||
let pending_client =
|
||||
futures::future::pending::<Result<ManagedClient, StartupOutcomeError>>()
|
||||
.boxed()
|
||||
.shared();
|
||||
let mut manager = McpConnectionManager::default();
|
||||
manager.clients.insert(
|
||||
CODEX_APPS_MCP_SERVER_NAME.to_string(),
|
||||
AsyncManagedClient {
|
||||
client: pending_client,
|
||||
startup_snapshot: StartupToolSnapshot::Unavailable,
|
||||
},
|
||||
);
|
||||
|
||||
let timeout_result =
|
||||
tokio::time::timeout(Duration::from_millis(10), manager.list_all_tools()).await;
|
||||
assert!(timeout_result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_all_tools_does_not_block_when_startup_snapshot_cache_hit_is_empty() {
|
||||
let pending_client =
|
||||
futures::future::pending::<Result<ManagedClient, StartupOutcomeError>>()
|
||||
.boxed()
|
||||
.shared();
|
||||
let mut manager = McpConnectionManager::default();
|
||||
manager.clients.insert(
|
||||
CODEX_APPS_MCP_SERVER_NAME.to_string(),
|
||||
AsyncManagedClient {
|
||||
client: pending_client,
|
||||
startup_snapshot: StartupToolSnapshot::CacheHit(Vec::new()),
|
||||
},
|
||||
);
|
||||
|
||||
let timeout_result =
|
||||
tokio::time::timeout(Duration::from_millis(10), manager.list_all_tools()).await;
|
||||
let tools = timeout_result.expect("cache-hit startup snapshot should not block");
|
||||
assert!(tools.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user