mirror of
https://github.com/openai/codex.git
synced 2026-02-19 15:23:46 +00:00
Compare commits
4 Commits
dev/mzeng/
...
latest-alp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8bf5e734ca | ||
|
|
2daa3fd44f | ||
|
|
f298c48cc6 | ||
|
|
227352257c |
@@ -66,7 +66,7 @@ members = [
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
version = "0.0.0"
|
||||
version = "0.105.0-alpha.4"
|
||||
# Track the edition for all workspace crates in one place. Individual
|
||||
# crates can still override this value, but keeping it here means new
|
||||
# crates created with `cargo new -w ...` automatically inherit the 2024
|
||||
|
||||
@@ -4,6 +4,8 @@ use std::sync::LazyLock;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
|
||||
use codex_core::config::Config;
|
||||
use codex_core::default_client::is_first_party_chat_originator;
|
||||
use codex_core::default_client::originator;
|
||||
use codex_core::features::Feature;
|
||||
use codex_core::token_data::TokenData;
|
||||
use serde::Deserialize;
|
||||
@@ -20,10 +22,8 @@ use codex_core::connectors::AppMetadata;
|
||||
use codex_core::connectors::CONNECTORS_CACHE_TTL;
|
||||
pub use codex_core::connectors::connector_display_label;
|
||||
use codex_core::connectors::connector_install_url;
|
||||
use codex_core::connectors::filter_disallowed_connectors;
|
||||
pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools;
|
||||
pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools_with_options;
|
||||
pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools_with_options_and_status;
|
||||
pub use codex_core::connectors::list_cached_accessible_connectors_from_mcp_tools;
|
||||
use codex_core::connectors::merge_connectors;
|
||||
pub use codex_core::connectors::with_app_enabled_state;
|
||||
@@ -106,7 +106,7 @@ pub async fn list_cached_all_connectors(config: &Config) -> Option<Vec<AppInfo>>
|
||||
}
|
||||
let token_data = get_chatgpt_token_data()?;
|
||||
let cache_key = all_connectors_cache_key(config, &token_data);
|
||||
read_cached_all_connectors(&cache_key).map(filter_disallowed_connectors)
|
||||
read_cached_all_connectors(&cache_key)
|
||||
}
|
||||
|
||||
pub async fn list_all_connectors_with_options(
|
||||
@@ -123,7 +123,7 @@ pub async fn list_all_connectors_with_options(
|
||||
get_chatgpt_token_data().ok_or_else(|| anyhow::anyhow!("ChatGPT token not available"))?;
|
||||
let cache_key = all_connectors_cache_key(config, &token_data);
|
||||
if !force_refetch && let Some(cached_connectors) = read_cached_all_connectors(&cache_key) {
|
||||
return Ok(filter_disallowed_connectors(cached_connectors));
|
||||
return Ok(cached_connectors);
|
||||
}
|
||||
|
||||
let mut apps = list_directory_connectors(config).await?;
|
||||
@@ -149,7 +149,6 @@ pub async fn list_all_connectors_with_options(
|
||||
.cmp(&right.name)
|
||||
.then_with(|| left.id.cmp(&right.id))
|
||||
});
|
||||
let connectors = filter_disallowed_connectors(connectors);
|
||||
write_cached_all_connectors(cache_key, &connectors);
|
||||
Ok(connectors)
|
||||
}
|
||||
@@ -454,6 +453,45 @@ fn normalize_connector_value(value: Option<&str>) -> Option<String> {
|
||||
.filter(|value| !value.is_empty())
|
||||
.map(str::to_string)
|
||||
}
|
||||
|
||||
const DISALLOWED_CONNECTOR_IDS: &[&str] = &[
|
||||
"asdk_app_6938a94a61d881918ef32cb999ff937c",
|
||||
"connector_2b0a9009c9c64bf9933a3dae3f2b1254",
|
||||
"connector_68de829bf7648191acd70a907364c67c",
|
||||
"connector_68e004f14af881919eb50893d3d9f523",
|
||||
"connector_69272cb413a081919685ec3c88d1744e",
|
||||
"connector_0f9c9d4592e54d0a9a12b3f44a1e2010",
|
||||
];
|
||||
const FIRST_PARTY_CHAT_DISALLOWED_CONNECTOR_IDS: &[&str] =
|
||||
&["connector_0f9c9d4592e54d0a9a12b3f44a1e2010"];
|
||||
const DISALLOWED_CONNECTOR_PREFIX: &str = "connector_openai_";
|
||||
|
||||
fn filter_disallowed_connectors(connectors: Vec<AppInfo>) -> Vec<AppInfo> {
|
||||
filter_disallowed_connectors_for_originator(connectors, originator().value.as_str())
|
||||
}
|
||||
|
||||
fn filter_disallowed_connectors_for_originator(
|
||||
connectors: Vec<AppInfo>,
|
||||
originator_value: &str,
|
||||
) -> Vec<AppInfo> {
|
||||
let disallowed_connector_ids = if is_first_party_chat_originator(originator_value) {
|
||||
FIRST_PARTY_CHAT_DISALLOWED_CONNECTOR_IDS
|
||||
} else {
|
||||
DISALLOWED_CONNECTOR_IDS
|
||||
};
|
||||
|
||||
connectors
|
||||
.into_iter()
|
||||
.filter(|connector| is_connector_allowed(connector, disallowed_connector_ids))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn is_connector_allowed(connector: &AppInfo, disallowed_connector_ids: &[&str]) -> bool {
|
||||
let connector_id = connector.id.as_str();
|
||||
!connector_id.starts_with(DISALLOWED_CONNECTOR_PREFIX)
|
||||
&& !disallowed_connector_ids.contains(&connector_id)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -516,6 +554,22 @@ mod tests {
|
||||
assert_eq!(filtered, vec![app("delta")]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn first_party_chat_originator_filters_target_and_openai_prefixed_connectors() {
|
||||
let filtered = filter_disallowed_connectors_for_originator(
|
||||
vec![
|
||||
app("connector_openai_foo"),
|
||||
app("asdk_app_6938a94a61d881918ef32cb999ff937c"),
|
||||
app("connector_0f9c9d4592e54d0a9a12b3f44a1e2010"),
|
||||
],
|
||||
"codex_atlas",
|
||||
);
|
||||
assert_eq!(
|
||||
filtered,
|
||||
vec![app("asdk_app_6938a94a61d881918ef32cb999ff937c"),]
|
||||
);
|
||||
}
|
||||
|
||||
fn merged_app(id: &str, is_accessible: bool) -> AppInfo {
|
||||
AppInfo {
|
||||
id: id.to_string(),
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
use crate::agent::AgentStatus;
|
||||
use crate::agent::guards::Guards;
|
||||
use crate::agent::status::is_final;
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result as CodexResult;
|
||||
use crate::session_prefix::format_subagent_notification_message;
|
||||
use crate::thread_manager::ThreadManagerState;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use std::path::PathBuf;
|
||||
@@ -46,6 +49,7 @@ impl AgentControl {
|
||||
) -> CodexResult<ThreadId> {
|
||||
let state = self.upgrade()?;
|
||||
let reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
|
||||
let notification_source = session_source.clone();
|
||||
|
||||
// The same `AgentControl` is sent to spawn the thread.
|
||||
let new_thread = match session_source {
|
||||
@@ -64,6 +68,7 @@ impl AgentControl {
|
||||
state.notify_thread_created(new_thread.thread_id);
|
||||
|
||||
self.send_input(new_thread.thread_id, items).await?;
|
||||
self.maybe_start_completion_watcher(new_thread.thread_id, notification_source);
|
||||
|
||||
Ok(new_thread.thread_id)
|
||||
}
|
||||
@@ -77,6 +82,7 @@ impl AgentControl {
|
||||
) -> CodexResult<ThreadId> {
|
||||
let state = self.upgrade()?;
|
||||
let reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
|
||||
let notification_source = session_source.clone();
|
||||
|
||||
let resumed_thread = state
|
||||
.resume_thread_from_rollout_with_source(
|
||||
@@ -90,6 +96,7 @@ impl AgentControl {
|
||||
// Resumed threads are re-registered in-memory and need the same listener
|
||||
// attachment path as freshly spawned threads.
|
||||
state.notify_thread_created(resumed_thread.thread_id);
|
||||
self.maybe_start_completion_watcher(resumed_thread.thread_id, Some(notification_source));
|
||||
|
||||
Ok(resumed_thread.thread_id)
|
||||
}
|
||||
@@ -164,13 +171,60 @@ impl AgentControl {
|
||||
thread.total_token_usage().await
|
||||
}
|
||||
|
||||
/// Starts a detached watcher for sub-agents spawned from another thread.
|
||||
///
|
||||
/// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and
|
||||
/// can receive completion notifications.
|
||||
fn maybe_start_completion_watcher(
|
||||
&self,
|
||||
child_thread_id: ThreadId,
|
||||
session_source: Option<SessionSource>,
|
||||
) {
|
||||
let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id, ..
|
||||
})) = session_source
|
||||
else {
|
||||
return;
|
||||
};
|
||||
let control = self.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut status_rx = match control.subscribe_status(child_thread_id).await {
|
||||
Ok(rx) => rx,
|
||||
Err(_) => return,
|
||||
};
|
||||
let mut status = status_rx.borrow().clone();
|
||||
while !is_final(&status) {
|
||||
if status_rx.changed().await.is_err() {
|
||||
status = control.get_status(child_thread_id).await;
|
||||
break;
|
||||
}
|
||||
status = status_rx.borrow().clone();
|
||||
}
|
||||
if !is_final(&status) {
|
||||
return;
|
||||
}
|
||||
|
||||
let Ok(state) = control.upgrade() else {
|
||||
return;
|
||||
};
|
||||
let Ok(parent_thread) = state.get_thread(parent_thread_id).await else {
|
||||
return;
|
||||
};
|
||||
parent_thread
|
||||
.inject_user_message_without_turn(format_subagent_notification_message(
|
||||
&child_thread_id.to_string(),
|
||||
&status,
|
||||
))
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
fn upgrade(&self) -> CodexResult<Arc<ThreadManagerState>> {
|
||||
self.manager
|
||||
.upgrade()
|
||||
.ok_or_else(|| CodexErr::UnsupportedOperation("thread manager dropped".to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -180,16 +234,24 @@ mod tests {
|
||||
use crate::agent::agent_status_from_event;
|
||||
use crate::config::Config;
|
||||
use crate::config::ConfigBuilder;
|
||||
use crate::session_prefix::SUBAGENT_NOTIFICATION_OPEN_TAG;
|
||||
use assert_matches::assert_matches;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::ErrorEvent;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
use tokio::time::timeout;
|
||||
use toml::Value as TomlValue;
|
||||
|
||||
async fn test_config_with_cli_overrides(
|
||||
@@ -250,6 +312,42 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn has_subagent_notification(history_items: &[ResponseItem]) -> bool {
|
||||
history_items.iter().any(|item| {
|
||||
let ResponseItem::Message { role, content, .. } = item else {
|
||||
return false;
|
||||
};
|
||||
if role != "user" {
|
||||
return false;
|
||||
}
|
||||
content.iter().any(|content_item| match content_item {
|
||||
ContentItem::InputText { text } | ContentItem::OutputText { text } => {
|
||||
text.contains(SUBAGENT_NOTIFICATION_OPEN_TAG)
|
||||
}
|
||||
ContentItem::InputImage { .. } => false,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
async fn wait_for_subagent_notification(parent_thread: &Arc<CodexThread>) -> bool {
|
||||
let wait = async {
|
||||
loop {
|
||||
let history_items = parent_thread
|
||||
.codex
|
||||
.session
|
||||
.clone_history()
|
||||
.await
|
||||
.raw_items()
|
||||
.to_vec();
|
||||
if has_subagent_notification(&history_items) {
|
||||
return true;
|
||||
}
|
||||
sleep(Duration::from_millis(25)).await;
|
||||
}
|
||||
};
|
||||
timeout(Duration::from_secs(2), wait).await.is_ok()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_input_errors_when_manager_dropped() {
|
||||
let control = AgentControl::default();
|
||||
@@ -683,4 +781,35 @@ mod tests {
|
||||
.await
|
||||
.expect("shutdown resumed thread");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_child_completion_notifies_parent_history() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let (parent_thread_id, parent_thread) = harness.start_thread().await;
|
||||
|
||||
let child_thread_id = harness
|
||||
.control
|
||||
.spawn_agent(
|
||||
harness.config.clone(),
|
||||
text_input("hello child"),
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 1,
|
||||
})),
|
||||
)
|
||||
.await
|
||||
.expect("child spawn should succeed");
|
||||
|
||||
let child_thread = harness
|
||||
.manager
|
||||
.get_thread(child_thread_id)
|
||||
.await
|
||||
.expect("child thread should exist");
|
||||
let _ = child_thread
|
||||
.submit(Op::Shutdown {})
|
||||
.await
|
||||
.expect("child shutdown should submit");
|
||||
|
||||
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -150,8 +150,6 @@ use crate::mcp::effective_mcp_servers;
|
||||
use crate::mcp::maybe_prompt_and_install_mcp_dependencies;
|
||||
use crate::mcp::with_codex_apps_mcp;
|
||||
use crate::mcp_connection_manager::McpConnectionManager;
|
||||
use crate::mcp_connection_manager::McpConnectionManagerInitializeParams;
|
||||
use crate::mcp_connection_manager::codex_apps_tools_cache_key;
|
||||
use crate::mcp_connection_manager::filter_codex_apps_mcp_tools_only;
|
||||
use crate::mcp_connection_manager::filter_mcp_tools_by_name;
|
||||
use crate::mcp_connection_manager::filter_non_codex_apps_mcp_tools_only;
|
||||
@@ -1411,15 +1409,11 @@ impl Session {
|
||||
.await
|
||||
.initialize(
|
||||
&mcp_servers,
|
||||
McpConnectionManagerInitializeParams {
|
||||
store_mode: config.mcp_oauth_credentials_store_mode,
|
||||
auth_entries: auth_statuses.clone(),
|
||||
tx_event: tx_event.clone(),
|
||||
cancel_token,
|
||||
initial_sandbox_state: sandbox_state,
|
||||
codex_home: config.codex_home.clone(),
|
||||
codex_apps_tools_cache_key: codex_apps_tools_cache_key(auth),
|
||||
},
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth_statuses.clone(),
|
||||
tx_event.clone(),
|
||||
cancel_token,
|
||||
sandbox_state,
|
||||
)
|
||||
.await;
|
||||
if !required_mcp_servers.is_empty() {
|
||||
@@ -3041,15 +3035,11 @@ impl Session {
|
||||
refreshed_manager
|
||||
.initialize(
|
||||
&mcp_servers,
|
||||
McpConnectionManagerInitializeParams {
|
||||
store_mode,
|
||||
auth_entries: auth_statuses,
|
||||
tx_event: self.get_tx_event(),
|
||||
cancel_token,
|
||||
initial_sandbox_state: sandbox_state,
|
||||
codex_home: config.codex_home.clone(),
|
||||
codex_apps_tools_cache_key: codex_apps_tools_cache_key(auth.as_ref()),
|
||||
},
|
||||
store_mode,
|
||||
auth_statuses,
|
||||
self.get_tx_event(),
|
||||
cancel_token,
|
||||
sandbox_state,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
||||
@@ -8,6 +8,9 @@ use crate::protocol::Event;
|
||||
use crate::protocol::Op;
|
||||
use crate::protocol::Submission;
|
||||
use codex_protocol::config_types::Personality;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
@@ -32,7 +35,7 @@ pub struct ThreadConfigSnapshot {
|
||||
}
|
||||
|
||||
pub struct CodexThread {
|
||||
codex: Codex,
|
||||
pub(crate) codex: Codex,
|
||||
rollout_path: Option<PathBuf>,
|
||||
_watch_registration: WatchRegistration,
|
||||
}
|
||||
@@ -85,6 +88,33 @@ impl CodexThread {
|
||||
self.codex.session.total_token_usage().await
|
||||
}
|
||||
|
||||
/// Records a user-role session-prefix message without creating a new user turn boundary.
|
||||
pub(crate) async fn inject_user_message_without_turn(&self, message: String) {
|
||||
let pending_item = ResponseInputItem::Message {
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText { text: message }],
|
||||
};
|
||||
let pending_items = vec![pending_item];
|
||||
let Err(items_without_active_turn) = self
|
||||
.codex
|
||||
.session
|
||||
.inject_response_items(pending_items)
|
||||
.await
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
let turn_context = self.codex.session.new_default_turn().await;
|
||||
let items: Vec<ResponseItem> = items_without_active_turn
|
||||
.into_iter()
|
||||
.map(ResponseItem::from)
|
||||
.collect();
|
||||
self.codex
|
||||
.session
|
||||
.record_conversation_items(turn_context.as_ref(), &items)
|
||||
.await;
|
||||
}
|
||||
|
||||
pub fn rollout_path(&self) -> Option<PathBuf> {
|
||||
self.rollout_path.clone()
|
||||
}
|
||||
|
||||
@@ -23,9 +23,9 @@ use serde::Serialize;
|
||||
use serde::de::Error as SerdeError;
|
||||
|
||||
pub const DEFAULT_OTEL_ENVIRONMENT: &str = "dev";
|
||||
pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 8;
|
||||
pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 16;
|
||||
pub const DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
|
||||
pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 12;
|
||||
pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 6;
|
||||
pub const DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 1_024;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema)]
|
||||
|
||||
@@ -20,19 +20,15 @@ use crate::CodexAuth;
|
||||
use crate::SandboxState;
|
||||
use crate::config::Config;
|
||||
use crate::config::types::AppsConfigToml;
|
||||
use crate::default_client::is_first_party_chat_originator;
|
||||
use crate::default_client::originator;
|
||||
use crate::features::Feature;
|
||||
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
use crate::mcp::auth::compute_auth_statuses;
|
||||
use crate::mcp::with_codex_apps_mcp;
|
||||
use crate::mcp_connection_manager::DEFAULT_STARTUP_TIMEOUT;
|
||||
use crate::mcp_connection_manager::McpConnectionManager;
|
||||
use crate::mcp_connection_manager::McpConnectionManagerInitializeParams;
|
||||
use crate::mcp_connection_manager::codex_apps_tools_cache_key;
|
||||
use crate::token_data::TokenData;
|
||||
|
||||
pub const CONNECTORS_CACHE_TTL: Duration = Duration::from_secs(3600);
|
||||
const CONNECTORS_READY_TIMEOUT_ON_EMPTY_TOOLS: Duration = Duration::from_secs(30);
|
||||
|
||||
#[derive(Clone, PartialEq, Eq)]
|
||||
struct AccessibleConnectorsCacheKey {
|
||||
@@ -52,20 +48,10 @@ struct CachedAccessibleConnectors {
|
||||
static ACCESSIBLE_CONNECTORS_CACHE: LazyLock<StdMutex<Option<CachedAccessibleConnectors>>> =
|
||||
LazyLock::new(|| StdMutex::new(None));
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AccessibleConnectorsStatus {
|
||||
pub connectors: Vec<AppInfo>,
|
||||
pub codex_apps_ready: bool,
|
||||
}
|
||||
|
||||
pub async fn list_accessible_connectors_from_mcp_tools(
|
||||
config: &Config,
|
||||
) -> anyhow::Result<Vec<AppInfo>> {
|
||||
Ok(
|
||||
list_accessible_connectors_from_mcp_tools_with_options_and_status(config, false)
|
||||
.await?
|
||||
.connectors,
|
||||
)
|
||||
list_accessible_connectors_from_mcp_tools_with_options(config, false).await
|
||||
}
|
||||
|
||||
pub async fn list_cached_accessible_connectors_from_mcp_tools(
|
||||
@@ -78,29 +64,15 @@ pub async fn list_cached_accessible_connectors_from_mcp_tools(
|
||||
let auth_manager = auth_manager_from_config(config);
|
||||
let auth = auth_manager.auth().await;
|
||||
let cache_key = accessible_connectors_cache_key(config, auth.as_ref());
|
||||
read_cached_accessible_connectors(&cache_key).map(filter_disallowed_connectors)
|
||||
read_cached_accessible_connectors(&cache_key)
|
||||
}
|
||||
|
||||
pub async fn list_accessible_connectors_from_mcp_tools_with_options(
|
||||
config: &Config,
|
||||
force_refetch: bool,
|
||||
) -> anyhow::Result<Vec<AppInfo>> {
|
||||
Ok(
|
||||
list_accessible_connectors_from_mcp_tools_with_options_and_status(config, force_refetch)
|
||||
.await?
|
||||
.connectors,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn list_accessible_connectors_from_mcp_tools_with_options_and_status(
|
||||
config: &Config,
|
||||
force_refetch: bool,
|
||||
) -> anyhow::Result<AccessibleConnectorsStatus> {
|
||||
if !config.features.enabled(Feature::Apps) {
|
||||
return Ok(AccessibleConnectorsStatus {
|
||||
connectors: Vec::new(),
|
||||
codex_apps_ready: true,
|
||||
});
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let auth_manager = auth_manager_from_config(config);
|
||||
@@ -108,19 +80,12 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options_and_status(
|
||||
let cache_key = accessible_connectors_cache_key(config, auth.as_ref());
|
||||
if !force_refetch && let Some(cached_connectors) = read_cached_accessible_connectors(&cache_key)
|
||||
{
|
||||
let cached_connectors = filter_disallowed_connectors(cached_connectors);
|
||||
return Ok(AccessibleConnectorsStatus {
|
||||
connectors: cached_connectors,
|
||||
codex_apps_ready: true,
|
||||
});
|
||||
return Ok(cached_connectors);
|
||||
}
|
||||
|
||||
let mcp_servers = with_codex_apps_mcp(HashMap::new(), true, auth.as_ref(), config);
|
||||
if mcp_servers.is_empty() {
|
||||
return Ok(AccessibleConnectorsStatus {
|
||||
connectors: Vec::new(),
|
||||
codex_apps_ready: true,
|
||||
});
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let auth_status_entries =
|
||||
@@ -141,15 +106,11 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options_and_status(
|
||||
mcp_connection_manager
|
||||
.initialize(
|
||||
&mcp_servers,
|
||||
McpConnectionManagerInitializeParams {
|
||||
store_mode: config.mcp_oauth_credentials_store_mode,
|
||||
auth_entries: auth_status_entries,
|
||||
tx_event,
|
||||
cancel_token: cancel_token.clone(),
|
||||
initial_sandbox_state: sandbox_state,
|
||||
codex_home: config.codex_home.clone(),
|
||||
codex_apps_tools_cache_key: codex_apps_tools_cache_key(auth.as_ref()),
|
||||
},
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth_status_entries,
|
||||
tx_event,
|
||||
cancel_token.clone(),
|
||||
sandbox_state,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -163,43 +124,23 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options_and_status(
|
||||
);
|
||||
}
|
||||
|
||||
let mut tools = mcp_connection_manager.list_all_tools().await;
|
||||
let mut should_reload_tools = false;
|
||||
let codex_apps_ready = if let Some(cfg) = mcp_servers.get(CODEX_APPS_MCP_SERVER_NAME) {
|
||||
let immediate_ready = mcp_connection_manager
|
||||
.wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, Duration::ZERO)
|
||||
.await;
|
||||
if immediate_ready {
|
||||
true
|
||||
} else if tools.is_empty() {
|
||||
let timeout = cfg
|
||||
.startup_timeout_sec
|
||||
.unwrap_or(CONNECTORS_READY_TIMEOUT_ON_EMPTY_TOOLS);
|
||||
let ready = mcp_connection_manager
|
||||
.wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, timeout)
|
||||
.await;
|
||||
should_reload_tools = ready;
|
||||
ready
|
||||
} else {
|
||||
false
|
||||
}
|
||||
let timeout = cfg.startup_timeout_sec.unwrap_or(DEFAULT_STARTUP_TIMEOUT);
|
||||
mcp_connection_manager
|
||||
.wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, timeout)
|
||||
.await
|
||||
} else {
|
||||
false
|
||||
};
|
||||
if should_reload_tools {
|
||||
tools = mcp_connection_manager.list_all_tools().await;
|
||||
}
|
||||
|
||||
let tools = mcp_connection_manager.list_all_tools().await;
|
||||
cancel_token.cancel();
|
||||
|
||||
let accessible_connectors =
|
||||
filter_disallowed_connectors(accessible_connectors_from_mcp_tools(&tools));
|
||||
let accessible_connectors = accessible_connectors_from_mcp_tools(&tools);
|
||||
if codex_apps_ready || !accessible_connectors.is_empty() {
|
||||
write_cached_accessible_connectors(cache_key, &accessible_connectors);
|
||||
}
|
||||
Ok(AccessibleConnectorsStatus {
|
||||
connectors: accessible_connectors,
|
||||
codex_apps_ready,
|
||||
})
|
||||
Ok(accessible_connectors)
|
||||
}
|
||||
|
||||
fn accessible_connectors_cache_key(
|
||||
@@ -351,48 +292,6 @@ pub fn with_app_enabled_state(mut connectors: Vec<AppInfo>, config: &Config) ->
|
||||
connectors
|
||||
}
|
||||
|
||||
const DISALLOWED_CONNECTOR_IDS: &[&str] = &[
|
||||
"asdk_app_6938a94a61d881918ef32cb999ff937c",
|
||||
"connector_2b0a9009c9c64bf9933a3dae3f2b1254",
|
||||
"connector_68de829bf7648191acd70a907364c67c",
|
||||
"connector_68e004f14af881919eb50893d3d9f523",
|
||||
"connector_69272cb413a081919685ec3c88d1744e",
|
||||
];
|
||||
const FIRST_PARTY_CHAT_DISALLOWED_CONNECTOR_IDS: &[&str] =
|
||||
&["connector_0f9c9d4592e54d0a9a12b3f44a1e2010"];
|
||||
const DISALLOWED_CONNECTOR_PREFIX: &str = "connector_openai_";
|
||||
|
||||
pub fn filter_disallowed_connectors(connectors: Vec<AppInfo>) -> Vec<AppInfo> {
|
||||
filter_disallowed_connectors_for_originator(connectors, originator().value.as_str())
|
||||
}
|
||||
|
||||
pub(crate) fn is_connector_id_allowed(connector_id: &str) -> bool {
|
||||
is_connector_id_allowed_for_originator(connector_id, originator().value.as_str())
|
||||
}
|
||||
|
||||
fn filter_disallowed_connectors_for_originator(
|
||||
connectors: Vec<AppInfo>,
|
||||
originator_value: &str,
|
||||
) -> Vec<AppInfo> {
|
||||
connectors
|
||||
.into_iter()
|
||||
.filter(|connector| {
|
||||
is_connector_id_allowed_for_originator(connector.id.as_str(), originator_value)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn is_connector_id_allowed_for_originator(connector_id: &str, originator_value: &str) -> bool {
|
||||
let disallowed_connector_ids = if is_first_party_chat_originator(originator_value) {
|
||||
FIRST_PARTY_CHAT_DISALLOWED_CONNECTOR_IDS
|
||||
} else {
|
||||
DISALLOWED_CONNECTOR_IDS
|
||||
};
|
||||
|
||||
!connector_id.starts_with(DISALLOWED_CONNECTOR_PREFIX)
|
||||
&& !disallowed_connector_ids.contains(&connector_id)
|
||||
}
|
||||
|
||||
fn read_apps_config(config: &Config) -> Option<AppsConfigToml> {
|
||||
let effective_config = config.config_layer_stack.effective_config();
|
||||
let apps_config = effective_config.as_table()?.get("apps")?.clone();
|
||||
@@ -473,67 +372,3 @@ pub fn connector_name_slug(name: &str) -> String {
|
||||
fn format_connector_label(name: &str, _id: &str) -> String {
|
||||
name.to_string()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
fn app(id: &str) -> AppInfo {
|
||||
AppInfo {
|
||||
id: id.to_string(),
|
||||
name: id.to_string(),
|
||||
description: None,
|
||||
logo_url: None,
|
||||
logo_url_dark: None,
|
||||
distribution_channel: None,
|
||||
install_url: None,
|
||||
branding: None,
|
||||
app_metadata: None,
|
||||
labels: None,
|
||||
is_accessible: false,
|
||||
is_enabled: true,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn filter_disallowed_connectors_allows_non_disallowed_connectors() {
|
||||
let filtered = filter_disallowed_connectors(vec![app("asdk_app_hidden"), app("alpha")]);
|
||||
assert_eq!(filtered, vec![app("asdk_app_hidden"), app("alpha")]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn filter_disallowed_connectors_filters_openai_prefix() {
|
||||
let filtered = filter_disallowed_connectors(vec![
|
||||
app("connector_openai_foo"),
|
||||
app("connector_openai_bar"),
|
||||
app("gamma"),
|
||||
]);
|
||||
assert_eq!(filtered, vec![app("gamma")]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn filter_disallowed_connectors_filters_disallowed_connector_ids() {
|
||||
let filtered = filter_disallowed_connectors(vec![
|
||||
app("asdk_app_6938a94a61d881918ef32cb999ff937c"),
|
||||
app("delta"),
|
||||
]);
|
||||
assert_eq!(filtered, vec![app("delta")]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn first_party_chat_originator_filters_target_and_openai_prefixed_connectors() {
|
||||
let filtered = filter_disallowed_connectors_for_originator(
|
||||
vec![
|
||||
app("connector_openai_foo"),
|
||||
app("asdk_app_6938a94a61d881918ef32cb999ff937c"),
|
||||
app("connector_0f9c9d4592e54d0a9a12b3f44a1e2010"),
|
||||
],
|
||||
"codex_atlas",
|
||||
);
|
||||
assert_eq!(
|
||||
filtered,
|
||||
vec![app("asdk_app_6938a94a61d881918ef32cb999ff937c"),]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -571,6 +571,9 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
|
||||
"<skill>\n<name>demo</name>\n<path>skills/demo/SKILL.md</path>\nbody\n</skill>",
|
||||
),
|
||||
user_input_text_msg("<user_shell_command>echo 42</user_shell_command>"),
|
||||
user_input_text_msg(
|
||||
"<subagent_notification>{\"agent_id\":\"a\",\"status\":\"completed\"}</subagent_notification>",
|
||||
),
|
||||
user_input_text_msg("turn 1 user"),
|
||||
assistant_msg("turn 1 assistant"),
|
||||
user_input_text_msg("turn 2 user"),
|
||||
@@ -591,6 +594,9 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
|
||||
"<skill>\n<name>demo</name>\n<path>skills/demo/SKILL.md</path>\nbody\n</skill>",
|
||||
),
|
||||
user_input_text_msg("<user_shell_command>echo 42</user_shell_command>"),
|
||||
user_input_text_msg(
|
||||
"<subagent_notification>{\"agent_id\":\"a\",\"status\":\"completed\"}</subagent_notification>",
|
||||
),
|
||||
user_input_text_msg("turn 1 user"),
|
||||
assistant_msg("turn 1 assistant"),
|
||||
];
|
||||
@@ -610,6 +616,9 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
|
||||
"<skill>\n<name>demo</name>\n<path>skills/demo/SKILL.md</path>\nbody\n</skill>",
|
||||
),
|
||||
user_input_text_msg("<user_shell_command>echo 42</user_shell_command>"),
|
||||
user_input_text_msg(
|
||||
"<subagent_notification>{\"agent_id\":\"a\",\"status\":\"completed\"}</subagent_notification>",
|
||||
),
|
||||
];
|
||||
|
||||
let mut history = create_history_with_items(vec![
|
||||
@@ -622,6 +631,9 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
|
||||
"<skill>\n<name>demo</name>\n<path>skills/demo/SKILL.md</path>\nbody\n</skill>",
|
||||
),
|
||||
user_input_text_msg("<user_shell_command>echo 42</user_shell_command>"),
|
||||
user_input_text_msg(
|
||||
"<subagent_notification>{\"agent_id\":\"a\",\"status\":\"completed\"}</subagent_notification>",
|
||||
),
|
||||
user_input_text_msg("turn 1 user"),
|
||||
assistant_msg("turn 1 assistant"),
|
||||
user_input_text_msg("turn 2 user"),
|
||||
@@ -640,6 +652,9 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
|
||||
"<skill>\n<name>demo</name>\n<path>skills/demo/SKILL.md</path>\nbody\n</skill>",
|
||||
),
|
||||
user_input_text_msg("<user_shell_command>echo 42</user_shell_command>"),
|
||||
user_input_text_msg(
|
||||
"<subagent_notification>{\"agent_id\":\"a\",\"status\":\"completed\"}</subagent_notification>",
|
||||
),
|
||||
user_input_text_msg("turn 1 user"),
|
||||
assistant_msg("turn 1 assistant"),
|
||||
user_input_text_msg("turn 2 user"),
|
||||
|
||||
@@ -370,7 +370,7 @@ fn legacy_usage_notice(alias: &str, feature: Feature) -> (String, Option<String>
|
||||
None
|
||||
} else {
|
||||
Some(format!(
|
||||
"Enable it with `--enable {canonical}` or `[features].{canonical}` in config.toml. See https://github.com/openai/codex/blob/main/docs/config.md#feature-flags for details."
|
||||
"Enable it with `--enable {canonical}` or `[features].{canonical}` in config.toml. See https://developers.openai.com/codex/config-basic#feature-flags for details."
|
||||
))
|
||||
};
|
||||
(summary, details)
|
||||
|
||||
@@ -24,9 +24,7 @@ use crate::config::types::McpServerTransportConfig;
|
||||
use crate::features::Feature;
|
||||
use crate::mcp::auth::compute_auth_statuses;
|
||||
use crate::mcp_connection_manager::McpConnectionManager;
|
||||
use crate::mcp_connection_manager::McpConnectionManagerInitializeParams;
|
||||
use crate::mcp_connection_manager::SandboxState;
|
||||
use crate::mcp_connection_manager::codex_apps_tools_cache_key;
|
||||
|
||||
const MCP_TOOL_NAME_PREFIX: &str = "mcp";
|
||||
const MCP_TOOL_NAME_DELIMITER: &str = "__";
|
||||
@@ -209,15 +207,11 @@ pub async fn collect_mcp_snapshot(config: &Config) -> McpListToolsResponseEvent
|
||||
mcp_connection_manager
|
||||
.initialize(
|
||||
&mcp_servers,
|
||||
McpConnectionManagerInitializeParams {
|
||||
store_mode: config.mcp_oauth_credentials_store_mode,
|
||||
auth_entries: auth_status_entries.clone(),
|
||||
tx_event,
|
||||
cancel_token: cancel_token.clone(),
|
||||
initial_sandbox_state: sandbox_state,
|
||||
codex_home: config.codex_home.clone(),
|
||||
codex_apps_tools_cache_key: codex_apps_tools_cache_key(auth.as_ref()),
|
||||
},
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth_status_entries.clone(),
|
||||
tx_event,
|
||||
cancel_token.clone(),
|
||||
sandbox_state,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
||||
@@ -12,8 +12,8 @@ use std::env;
|
||||
use std::ffi::OsString;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::LazyLock;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
@@ -73,7 +73,6 @@ use tracing::warn;
|
||||
use crate::codex::INITIAL_SUBMIT_ID;
|
||||
use crate::config::types::McpServerConfig;
|
||||
use crate::config::types::McpServerTransportConfig;
|
||||
use crate::connectors::is_connector_id_allowed;
|
||||
|
||||
/// Delimiter used to separate the server name from the tool name in a fully
|
||||
/// qualified tool name.
|
||||
@@ -89,8 +88,7 @@ pub const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
/// Default timeout for individual tool calls.
|
||||
const DEFAULT_TOOL_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
const CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION: u8 = 1;
|
||||
const CODEX_APPS_TOOLS_CACHE_DIR: &str = "cache/codex_apps_tools";
|
||||
const CODEX_APPS_TOOLS_CACHE_TTL: Duration = Duration::from_secs(3600);
|
||||
const MCP_TOOLS_LIST_DURATION_METRIC: &str = "codex.mcp.tools.list.duration_ms";
|
||||
const MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC: &str = "codex.mcp.tools.fetch_uncached.duration_ms";
|
||||
const MCP_TOOLS_CACHE_WRITE_DURATION_METRIC: &str = "codex.mcp.tools.cache_write.duration_ms";
|
||||
@@ -122,27 +120,6 @@ fn sha1_hex(s: &str) -> String {
|
||||
format!("{sha1:x}")
|
||||
}
|
||||
|
||||
pub(crate) fn codex_apps_tools_cache_key(
|
||||
auth: Option<&crate::CodexAuth>,
|
||||
) -> CodexAppsToolsCacheKey {
|
||||
let token_data = auth.and_then(|auth| auth.get_token_data().ok());
|
||||
let account_id = token_data
|
||||
.as_ref()
|
||||
.and_then(|token_data| token_data.account_id.clone());
|
||||
let chatgpt_user_id = token_data
|
||||
.as_ref()
|
||||
.and_then(|token_data| token_data.id_token.chatgpt_user_id.clone());
|
||||
let is_workspace_account = token_data
|
||||
.as_ref()
|
||||
.is_some_and(|token_data| token_data.id_token.is_workspace_account());
|
||||
|
||||
CodexAppsToolsCacheKey {
|
||||
account_id,
|
||||
chatgpt_user_id,
|
||||
is_workspace_account,
|
||||
}
|
||||
}
|
||||
|
||||
fn qualify_tools<I>(tools: I) -> HashMap<String, ToolInfo>
|
||||
where
|
||||
I: IntoIterator<Item = ToolInfo>,
|
||||
@@ -185,7 +162,7 @@ where
|
||||
qualified_tools
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ToolInfo {
|
||||
pub(crate) server_name: String,
|
||||
pub(crate) tool_name: String,
|
||||
@@ -194,40 +171,14 @@ pub(crate) struct ToolInfo {
|
||||
pub(crate) connector_name: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub(crate) struct CodexAppsToolsCacheKey {
|
||||
account_id: Option<String>,
|
||||
chatgpt_user_id: Option<String>,
|
||||
is_workspace_account: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct CodexAppsToolsCacheContext {
|
||||
codex_home: PathBuf,
|
||||
user_key: CodexAppsToolsCacheKey,
|
||||
}
|
||||
|
||||
impl CodexAppsToolsCacheContext {
|
||||
fn cache_path(&self) -> PathBuf {
|
||||
let user_key_json = serde_json::to_string(&self.user_key).unwrap_or_default();
|
||||
let user_key_hash = sha1_hex(&user_key_json);
|
||||
self.codex_home
|
||||
.join(CODEX_APPS_TOOLS_CACHE_DIR)
|
||||
.join(format!("{user_key_hash}.json"))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct CodexAppsToolsDiskCache {
|
||||
schema_version: u8,
|
||||
struct CachedCodexAppsTools {
|
||||
expires_at: Instant,
|
||||
tools: Vec<ToolInfo>,
|
||||
}
|
||||
|
||||
enum CachedCodexAppsToolsLoad {
|
||||
Hit(Vec<ToolInfo>),
|
||||
Missing,
|
||||
Invalid,
|
||||
}
|
||||
static CODEX_APPS_TOOLS_CACHE: LazyLock<StdMutex<Option<CachedCodexAppsTools>>> =
|
||||
LazyLock::new(|| StdMutex::new(None));
|
||||
|
||||
type ResponderMap = HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>;
|
||||
|
||||
@@ -305,35 +256,9 @@ struct ManagedClient {
|
||||
tool_filter: ToolFilter,
|
||||
tool_timeout: Option<Duration>,
|
||||
server_supports_sandbox_state_capability: bool,
|
||||
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
|
||||
}
|
||||
|
||||
impl ManagedClient {
|
||||
fn listed_tools(&self) -> Vec<ToolInfo> {
|
||||
let total_start = Instant::now();
|
||||
if let Some(cache_context) = self.codex_apps_tools_cache_context.as_ref()
|
||||
&& let CachedCodexAppsToolsLoad::Hit(tools) =
|
||||
load_cached_codex_apps_tools(cache_context)
|
||||
{
|
||||
emit_duration(
|
||||
MCP_TOOLS_LIST_DURATION_METRIC,
|
||||
total_start.elapsed(),
|
||||
&[("cache", "hit")],
|
||||
);
|
||||
return filter_tools(tools, &self.tool_filter);
|
||||
}
|
||||
|
||||
if self.codex_apps_tools_cache_context.is_some() {
|
||||
emit_duration(
|
||||
MCP_TOOLS_LIST_DURATION_METRIC,
|
||||
total_start.elapsed(),
|
||||
&[("cache", "miss")],
|
||||
);
|
||||
}
|
||||
|
||||
self.tools.clone()
|
||||
}
|
||||
|
||||
/// Returns once the server has ack'd the sandbox state update.
|
||||
async fn notify_sandbox_state_change(&self, sandbox_state: &SandboxState) -> Result<()> {
|
||||
if !self.server_supports_sandbox_state_capability {
|
||||
@@ -354,8 +279,6 @@ impl ManagedClient {
|
||||
#[derive(Clone)]
|
||||
struct AsyncManagedClient {
|
||||
client: Shared<BoxFuture<'static, Result<ManagedClient, StartupOutcomeError>>>,
|
||||
startup_snapshot: Option<Vec<ToolInfo>>,
|
||||
startup_complete: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl AsyncManagedClient {
|
||||
@@ -366,63 +289,33 @@ impl AsyncManagedClient {
|
||||
cancel_token: CancellationToken,
|
||||
tx_event: Sender<Event>,
|
||||
elicitation_requests: ElicitationRequestManager,
|
||||
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
|
||||
) -> Self {
|
||||
let tool_filter = ToolFilter::from_config(&config);
|
||||
let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot(
|
||||
&server_name,
|
||||
codex_apps_tools_cache_context.as_ref(),
|
||||
)
|
||||
.map(|tools| filter_tools(tools, &tool_filter));
|
||||
let startup_tool_filter = tool_filter;
|
||||
let startup_complete = Arc::new(AtomicBool::new(false));
|
||||
let startup_complete_for_fut = Arc::clone(&startup_complete);
|
||||
let fut = async move {
|
||||
let outcome = async {
|
||||
if let Err(error) = validate_mcp_server_name(&server_name) {
|
||||
return Err(error.into());
|
||||
}
|
||||
|
||||
let client =
|
||||
Arc::new(make_rmcp_client(&server_name, config.transport, store_mode).await?);
|
||||
match start_server_task(
|
||||
server_name,
|
||||
client,
|
||||
StartServerTaskParams {
|
||||
startup_timeout: config
|
||||
.startup_timeout_sec
|
||||
.or(Some(DEFAULT_STARTUP_TIMEOUT)),
|
||||
tool_timeout: config.tool_timeout_sec.unwrap_or(DEFAULT_TOOL_TIMEOUT),
|
||||
tool_filter: startup_tool_filter,
|
||||
tx_event,
|
||||
elicitation_requests,
|
||||
codex_apps_tools_cache_context,
|
||||
},
|
||||
)
|
||||
.or_cancel(&cancel_token)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(CancelErr::Cancelled) => Err(StartupOutcomeError::Cancelled),
|
||||
}
|
||||
if let Err(error) = validate_mcp_server_name(&server_name) {
|
||||
return Err(error.into());
|
||||
}
|
||||
.await;
|
||||
|
||||
startup_complete_for_fut.store(true, Ordering::Release);
|
||||
outcome
|
||||
let client =
|
||||
Arc::new(make_rmcp_client(&server_name, config.transport, store_mode).await?);
|
||||
match start_server_task(
|
||||
server_name,
|
||||
client,
|
||||
config.startup_timeout_sec.or(Some(DEFAULT_STARTUP_TIMEOUT)),
|
||||
config.tool_timeout_sec.unwrap_or(DEFAULT_TOOL_TIMEOUT),
|
||||
tool_filter,
|
||||
tx_event,
|
||||
elicitation_requests,
|
||||
)
|
||||
.or_cancel(&cancel_token)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(CancelErr::Cancelled) => Err(StartupOutcomeError::Cancelled),
|
||||
}
|
||||
};
|
||||
let client = fut.boxed().shared();
|
||||
if startup_snapshot.is_some() {
|
||||
let startup_task = client.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = startup_task.await;
|
||||
});
|
||||
}
|
||||
|
||||
Self {
|
||||
client,
|
||||
startup_snapshot,
|
||||
startup_complete,
|
||||
client: fut.boxed().shared(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -430,24 +323,6 @@ impl AsyncManagedClient {
|
||||
self.client.clone().await
|
||||
}
|
||||
|
||||
fn startup_snapshot_while_initializing(&self) -> Option<Vec<ToolInfo>> {
|
||||
if !self.startup_complete.load(Ordering::Acquire) {
|
||||
return self.startup_snapshot.clone();
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
async fn listed_tools(&self) -> Option<Vec<ToolInfo>> {
|
||||
if let Some(startup_tools) = self.startup_snapshot_while_initializing() {
|
||||
return Some(startup_tools);
|
||||
}
|
||||
|
||||
match self.client().await {
|
||||
Ok(client) => Some(client.listed_tools()),
|
||||
Err(_) => self.startup_snapshot.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn notify_sandbox_state_change(&self, sandbox_state: &SandboxState) -> Result<()> {
|
||||
let managed = self.client().await?;
|
||||
managed.notify_sandbox_state_change(sandbox_state).await
|
||||
@@ -477,16 +352,6 @@ pub(crate) struct McpConnectionManager {
|
||||
elicitation_requests: ElicitationRequestManager,
|
||||
}
|
||||
|
||||
pub(crate) struct McpConnectionManagerInitializeParams {
|
||||
pub(crate) store_mode: OAuthCredentialsStoreMode,
|
||||
pub(crate) auth_entries: HashMap<String, McpAuthStatusEntry>,
|
||||
pub(crate) tx_event: Sender<Event>,
|
||||
pub(crate) cancel_token: CancellationToken,
|
||||
pub(crate) initial_sandbox_state: SandboxState,
|
||||
pub(crate) codex_home: PathBuf,
|
||||
pub(crate) codex_apps_tools_cache_key: CodexAppsToolsCacheKey,
|
||||
}
|
||||
|
||||
impl McpConnectionManager {
|
||||
pub(crate) fn has_servers(&self) -> bool {
|
||||
!self.clients.is_empty()
|
||||
@@ -495,17 +360,12 @@ impl McpConnectionManager {
|
||||
pub async fn initialize(
|
||||
&mut self,
|
||||
mcp_servers: &HashMap<String, McpServerConfig>,
|
||||
params: McpConnectionManagerInitializeParams,
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
auth_entries: HashMap<String, McpAuthStatusEntry>,
|
||||
tx_event: Sender<Event>,
|
||||
cancel_token: CancellationToken,
|
||||
initial_sandbox_state: SandboxState,
|
||||
) {
|
||||
let McpConnectionManagerInitializeParams {
|
||||
store_mode,
|
||||
auth_entries,
|
||||
tx_event,
|
||||
cancel_token,
|
||||
initial_sandbox_state,
|
||||
codex_home,
|
||||
codex_apps_tools_cache_key,
|
||||
} = params;
|
||||
if cancel_token.is_cancelled() {
|
||||
return;
|
||||
}
|
||||
@@ -523,14 +383,6 @@ impl McpConnectionManager {
|
||||
},
|
||||
)
|
||||
.await;
|
||||
let codex_apps_tools_cache_context = if server_name == CODEX_APPS_MCP_SERVER_NAME {
|
||||
Some(CodexAppsToolsCacheContext {
|
||||
codex_home: codex_home.clone(),
|
||||
user_key: codex_apps_tools_cache_key.clone(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let async_managed_client = AsyncManagedClient::new(
|
||||
server_name.clone(),
|
||||
cfg,
|
||||
@@ -538,7 +390,6 @@ impl McpConnectionManager {
|
||||
cancel_token.clone(),
|
||||
tx_event.clone(),
|
||||
elicitation_requests.clone(),
|
||||
codex_apps_tools_cache_context,
|
||||
);
|
||||
clients.insert(server_name.clone(), async_managed_client.clone());
|
||||
let tx_event = tx_event.clone();
|
||||
@@ -671,11 +522,29 @@ impl McpConnectionManager {
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
pub async fn list_all_tools(&self) -> HashMap<String, ToolInfo> {
|
||||
let mut tools = HashMap::new();
|
||||
for managed_client in self.clients.values() {
|
||||
let Some(server_tools) = managed_client.listed_tools().await else {
|
||||
continue;
|
||||
};
|
||||
tools.extend(qualify_tools(server_tools));
|
||||
for (server_name, managed_client) in &self.clients {
|
||||
let client = managed_client.client().await.ok();
|
||||
if let Some(client) = client {
|
||||
let rmcp_client = client.client;
|
||||
let tool_timeout = client.tool_timeout;
|
||||
let tool_filter = client.tool_filter;
|
||||
let mut server_tools = client.tools;
|
||||
|
||||
if server_name == CODEX_APPS_MCP_SERVER_NAME {
|
||||
match list_tools_for_client(server_name, &rmcp_client, tool_timeout).await {
|
||||
Ok(fresh_or_cached_tools) => {
|
||||
server_tools = fresh_or_cached_tools;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"Failed to refresh tools for MCP server '{server_name}', using startup snapshot: {err:#}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tools.extend(qualify_tools(filter_tools(server_tools, tool_filter)));
|
||||
}
|
||||
}
|
||||
tools
|
||||
}
|
||||
@@ -693,8 +562,6 @@ impl McpConnectionManager {
|
||||
.await
|
||||
.context("failed to get client")?;
|
||||
|
||||
let list_start = Instant::now();
|
||||
let fetch_start = Instant::now();
|
||||
let tools = list_tools_for_client_uncached(
|
||||
CODEX_APPS_MCP_SERVER_NAME,
|
||||
&managed_client.client,
|
||||
@@ -704,22 +571,8 @@ impl McpConnectionManager {
|
||||
.with_context(|| {
|
||||
format!("failed to refresh tools for MCP server '{CODEX_APPS_MCP_SERVER_NAME}'")
|
||||
})?;
|
||||
emit_duration(
|
||||
MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC,
|
||||
fetch_start.elapsed(),
|
||||
&[],
|
||||
);
|
||||
|
||||
write_cached_codex_apps_tools_if_needed(
|
||||
CODEX_APPS_MCP_SERVER_NAME,
|
||||
managed_client.codex_apps_tools_cache_context.as_ref(),
|
||||
&tools,
|
||||
);
|
||||
emit_duration(
|
||||
MCP_TOOLS_LIST_DURATION_METRIC,
|
||||
list_start.elapsed(),
|
||||
&[("cache", "miss")],
|
||||
);
|
||||
write_cached_codex_apps_tools(&tools);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1028,7 +881,7 @@ impl ToolFilter {
|
||||
}
|
||||
}
|
||||
|
||||
fn filter_tools(tools: Vec<ToolInfo>, filter: &ToolFilter) -> Vec<ToolInfo> {
|
||||
fn filter_tools(tools: Vec<ToolInfo>, filter: ToolFilter) -> Vec<ToolInfo> {
|
||||
tools
|
||||
.into_iter()
|
||||
.filter(|tool| filter.allows(&tool.tool_name))
|
||||
@@ -1156,16 +1009,12 @@ impl From<anyhow::Error> for StartupOutcomeError {
|
||||
async fn start_server_task(
|
||||
server_name: String,
|
||||
client: Arc<RmcpClient>,
|
||||
params: StartServerTaskParams,
|
||||
startup_timeout: Option<Duration>, // TODO: cancel_token should handle this.
|
||||
tool_timeout: Duration,
|
||||
tool_filter: ToolFilter,
|
||||
tx_event: Sender<Event>,
|
||||
elicitation_requests: ElicitationRequestManager,
|
||||
) -> Result<ManagedClient, StartupOutcomeError> {
|
||||
let StartServerTaskParams {
|
||||
startup_timeout,
|
||||
tool_timeout,
|
||||
tool_filter,
|
||||
tx_event,
|
||||
elicitation_requests,
|
||||
codex_apps_tools_cache_context,
|
||||
} = params;
|
||||
let params = InitializeRequestParams {
|
||||
meta: None,
|
||||
capabilities: ClientCapabilities {
|
||||
@@ -1201,29 +1050,9 @@ async fn start_server_task(
|
||||
.await
|
||||
.map_err(StartupOutcomeError::from)?;
|
||||
|
||||
let list_start = Instant::now();
|
||||
let fetch_start = Instant::now();
|
||||
let tools = list_tools_for_client_uncached(&server_name, &client, startup_timeout)
|
||||
let tools = list_tools_for_client(&server_name, &client, startup_timeout)
|
||||
.await
|
||||
.map_err(StartupOutcomeError::from)?;
|
||||
emit_duration(
|
||||
MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC,
|
||||
fetch_start.elapsed(),
|
||||
&[],
|
||||
);
|
||||
write_cached_codex_apps_tools_if_needed(
|
||||
&server_name,
|
||||
codex_apps_tools_cache_context.as_ref(),
|
||||
&tools,
|
||||
);
|
||||
if server_name == CODEX_APPS_MCP_SERVER_NAME {
|
||||
emit_duration(
|
||||
MCP_TOOLS_LIST_DURATION_METRIC,
|
||||
list_start.elapsed(),
|
||||
&[("cache", "miss")],
|
||||
);
|
||||
}
|
||||
let tools = filter_tools(tools, &tool_filter);
|
||||
|
||||
let server_supports_sandbox_state_capability = initialize_result
|
||||
.capabilities
|
||||
@@ -1238,21 +1067,11 @@ async fn start_server_task(
|
||||
tool_timeout: Some(tool_timeout),
|
||||
tool_filter,
|
||||
server_supports_sandbox_state_capability,
|
||||
codex_apps_tools_cache_context,
|
||||
};
|
||||
|
||||
Ok(managed)
|
||||
}
|
||||
|
||||
struct StartServerTaskParams {
|
||||
startup_timeout: Option<Duration>, // TODO: cancel_token should handle this.
|
||||
tool_timeout: Duration,
|
||||
tool_filter: ToolFilter,
|
||||
tx_event: Sender<Event>,
|
||||
elicitation_requests: ElicitationRequestManager,
|
||||
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
|
||||
}
|
||||
|
||||
async fn make_rmcp_client(
|
||||
server_name: &str,
|
||||
transport: McpServerTransportConfig,
|
||||
@@ -1297,99 +1116,80 @@ async fn make_rmcp_client(
|
||||
}
|
||||
}
|
||||
|
||||
fn write_cached_codex_apps_tools_if_needed(
|
||||
async fn list_tools_for_client(
|
||||
server_name: &str,
|
||||
cache_context: Option<&CodexAppsToolsCacheContext>,
|
||||
tools: &[ToolInfo],
|
||||
) {
|
||||
if server_name != CODEX_APPS_MCP_SERVER_NAME {
|
||||
return;
|
||||
client: &Arc<RmcpClient>,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<Vec<ToolInfo>> {
|
||||
let total_start = Instant::now();
|
||||
if server_name == CODEX_APPS_MCP_SERVER_NAME
|
||||
&& let Some(cached_tools) = read_cached_codex_apps_tools()
|
||||
{
|
||||
emit_duration(
|
||||
MCP_TOOLS_LIST_DURATION_METRIC,
|
||||
total_start.elapsed(),
|
||||
&[("cache", "hit")],
|
||||
);
|
||||
return Ok(cached_tools);
|
||||
}
|
||||
|
||||
if let Some(cache_context) = cache_context {
|
||||
let fetch_start = Instant::now();
|
||||
let tools = list_tools_for_client_uncached(server_name, client, timeout).await?;
|
||||
emit_duration(
|
||||
MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC,
|
||||
fetch_start.elapsed(),
|
||||
&[],
|
||||
);
|
||||
|
||||
if server_name == CODEX_APPS_MCP_SERVER_NAME {
|
||||
let cache_write_start = Instant::now();
|
||||
write_cached_codex_apps_tools(cache_context, tools);
|
||||
write_cached_codex_apps_tools(&tools);
|
||||
emit_duration(
|
||||
MCP_TOOLS_CACHE_WRITE_DURATION_METRIC,
|
||||
cache_write_start.elapsed(),
|
||||
&[],
|
||||
);
|
||||
}
|
||||
|
||||
if server_name == CODEX_APPS_MCP_SERVER_NAME {
|
||||
emit_duration(
|
||||
MCP_TOOLS_LIST_DURATION_METRIC,
|
||||
total_start.elapsed(),
|
||||
&[("cache", "miss")],
|
||||
);
|
||||
}
|
||||
Ok(tools)
|
||||
}
|
||||
|
||||
fn load_startup_cached_codex_apps_tools_snapshot(
|
||||
server_name: &str,
|
||||
cache_context: Option<&CodexAppsToolsCacheContext>,
|
||||
) -> Option<Vec<ToolInfo>> {
|
||||
if server_name != CODEX_APPS_MCP_SERVER_NAME {
|
||||
return None;
|
||||
}
|
||||
fn read_cached_codex_apps_tools() -> Option<Vec<ToolInfo>> {
|
||||
let mut cache_guard = CODEX_APPS_TOOLS_CACHE
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
let now = Instant::now();
|
||||
|
||||
let cache_context = cache_context?;
|
||||
|
||||
match load_cached_codex_apps_tools(cache_context) {
|
||||
CachedCodexAppsToolsLoad::Hit(tools) => Some(tools),
|
||||
CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => None,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn read_cached_codex_apps_tools(
|
||||
cache_context: &CodexAppsToolsCacheContext,
|
||||
) -> Option<Vec<ToolInfo>> {
|
||||
match load_cached_codex_apps_tools(cache_context) {
|
||||
CachedCodexAppsToolsLoad::Hit(tools) => Some(tools),
|
||||
CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn load_cached_codex_apps_tools(
|
||||
cache_context: &CodexAppsToolsCacheContext,
|
||||
) -> CachedCodexAppsToolsLoad {
|
||||
let cache_path = cache_context.cache_path();
|
||||
let bytes = match std::fs::read(cache_path) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
|
||||
return CachedCodexAppsToolsLoad::Missing;
|
||||
}
|
||||
Err(_) => return CachedCodexAppsToolsLoad::Invalid,
|
||||
};
|
||||
let cache: CodexAppsToolsDiskCache = match serde_json::from_slice(&bytes) {
|
||||
Ok(cache) => cache,
|
||||
Err(_) => return CachedCodexAppsToolsLoad::Invalid,
|
||||
};
|
||||
if cache.schema_version != CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION {
|
||||
return CachedCodexAppsToolsLoad::Invalid;
|
||||
}
|
||||
CachedCodexAppsToolsLoad::Hit(filter_disallowed_codex_apps_tools(cache.tools))
|
||||
}
|
||||
|
||||
fn write_cached_codex_apps_tools(cache_context: &CodexAppsToolsCacheContext, tools: &[ToolInfo]) {
|
||||
let cache_path = cache_context.cache_path();
|
||||
if let Some(parent) = cache_path.parent()
|
||||
&& std::fs::create_dir_all(parent).is_err()
|
||||
if let Some(cached) = cache_guard.as_ref()
|
||||
&& now < cached.expires_at
|
||||
{
|
||||
return;
|
||||
return Some(cached.tools.clone());
|
||||
}
|
||||
let tools = filter_disallowed_codex_apps_tools(tools.to_vec());
|
||||
let Ok(bytes) = serde_json::to_vec_pretty(&CodexAppsToolsDiskCache {
|
||||
schema_version: CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION,
|
||||
tools,
|
||||
}) else {
|
||||
return;
|
||||
};
|
||||
let _ = std::fs::write(cache_path, bytes);
|
||||
|
||||
if cache_guard
|
||||
.as_ref()
|
||||
.is_some_and(|cached| now >= cached.expires_at)
|
||||
{
|
||||
*cache_guard = None;
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn filter_disallowed_codex_apps_tools(tools: Vec<ToolInfo>) -> Vec<ToolInfo> {
|
||||
tools
|
||||
.into_iter()
|
||||
.filter(|tool| {
|
||||
tool.connector_id
|
||||
.as_deref()
|
||||
.is_none_or(is_connector_id_allowed)
|
||||
})
|
||||
.collect()
|
||||
fn write_cached_codex_apps_tools(tools: &[ToolInfo]) {
|
||||
let mut cache_guard = CODEX_APPS_TOOLS_CACHE
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
*cache_guard = Some(CachedCodexAppsTools {
|
||||
expires_at: Instant::now() + CODEX_APPS_TOOLS_CACHE_TTL,
|
||||
tools: tools.to_vec(),
|
||||
});
|
||||
}
|
||||
|
||||
fn emit_duration(metric: &str, duration: Duration, tags: &[(&str, &str)]) {
|
||||
@@ -1404,7 +1204,7 @@ async fn list_tools_for_client_uncached(
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<Vec<ToolInfo>> {
|
||||
let resp = client.list_tools_with_connector_ids(None, timeout).await?;
|
||||
let tools = resp
|
||||
Ok(resp
|
||||
.tools
|
||||
.into_iter()
|
||||
.map(|tool| {
|
||||
@@ -1425,11 +1225,7 @@ async fn list_tools_for_client_uncached(
|
||||
connector_name,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
if server_name == CODEX_APPS_MCP_SERVER_NAME {
|
||||
return Ok(filter_disallowed_codex_apps_tools(tools));
|
||||
}
|
||||
Ok(tools)
|
||||
.collect())
|
||||
}
|
||||
|
||||
fn validate_mcp_server_name(server_name: &str) -> Result<()> {
|
||||
@@ -1516,7 +1312,6 @@ mod tests {
|
||||
use rmcp::model::JsonObject;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use tempfile::tempdir;
|
||||
|
||||
fn create_test_tool(server_name: &str, tool_name: &str) -> ToolInfo {
|
||||
ToolInfo {
|
||||
@@ -1538,31 +1333,19 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn create_test_tool_with_connector(
|
||||
server_name: &str,
|
||||
tool_name: &str,
|
||||
connector_id: &str,
|
||||
connector_name: Option<&str>,
|
||||
) -> ToolInfo {
|
||||
let mut tool = create_test_tool(server_name, tool_name);
|
||||
tool.connector_id = Some(connector_id.to_string());
|
||||
tool.connector_name = connector_name.map(ToOwned::to_owned);
|
||||
tool
|
||||
}
|
||||
|
||||
fn create_codex_apps_tools_cache_context(
|
||||
codex_home: PathBuf,
|
||||
account_id: Option<&str>,
|
||||
chatgpt_user_id: Option<&str>,
|
||||
) -> CodexAppsToolsCacheContext {
|
||||
CodexAppsToolsCacheContext {
|
||||
codex_home,
|
||||
user_key: CodexAppsToolsCacheKey {
|
||||
account_id: account_id.map(ToOwned::to_owned),
|
||||
chatgpt_user_id: chatgpt_user_id.map(ToOwned::to_owned),
|
||||
is_workspace_account: false,
|
||||
},
|
||||
}
|
||||
fn with_clean_codex_apps_tools_cache<T>(f: impl FnOnce() -> T) -> T {
|
||||
let previous_cache = {
|
||||
let mut cache_guard = CODEX_APPS_TOOLS_CACHE
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
cache_guard.take()
|
||||
};
|
||||
let result = f();
|
||||
let mut cache_guard = CODEX_APPS_TOOLS_CACHE
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
*cache_guard = previous_cache;
|
||||
result
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1707,9 +1490,9 @@ mod tests {
|
||||
disabled: HashSet::from(["tool_a".to_string()]),
|
||||
};
|
||||
|
||||
let filtered: Vec<_> = filter_tools(server1_tools, &server1_filter)
|
||||
let filtered: Vec<_> = filter_tools(server1_tools, server1_filter)
|
||||
.into_iter()
|
||||
.chain(filter_tools(server2_tools, &server2_filter))
|
||||
.chain(filter_tools(server2_tools, server2_filter))
|
||||
.collect();
|
||||
|
||||
assert_eq!(filtered.len(), 1);
|
||||
@@ -1719,256 +1502,43 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn codex_apps_tools_cache_is_overwritten_by_last_write() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
let cache_context = create_codex_apps_tools_cache_context(
|
||||
codex_home.path().to_path_buf(),
|
||||
Some("account-one"),
|
||||
Some("user-one"),
|
||||
);
|
||||
let tools_gateway_1 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")];
|
||||
let tools_gateway_2 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "two")];
|
||||
with_clean_codex_apps_tools_cache(|| {
|
||||
let tools_gateway_1 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")];
|
||||
let tools_gateway_2 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "two")];
|
||||
|
||||
write_cached_codex_apps_tools(&cache_context, &tools_gateway_1);
|
||||
let cached_gateway_1 = read_cached_codex_apps_tools(&cache_context)
|
||||
.expect("cache entry exists for first write");
|
||||
assert_eq!(cached_gateway_1[0].tool_name, "one");
|
||||
write_cached_codex_apps_tools(&tools_gateway_1);
|
||||
let cached_gateway_1 =
|
||||
read_cached_codex_apps_tools().expect("cache entry exists for first write");
|
||||
assert_eq!(cached_gateway_1[0].tool_name, "one");
|
||||
|
||||
write_cached_codex_apps_tools(&cache_context, &tools_gateway_2);
|
||||
let cached_gateway_2 = read_cached_codex_apps_tools(&cache_context)
|
||||
.expect("cache entry exists for second write");
|
||||
assert_eq!(cached_gateway_2[0].tool_name, "two");
|
||||
write_cached_codex_apps_tools(&tools_gateway_2);
|
||||
let cached_gateway_2 =
|
||||
read_cached_codex_apps_tools().expect("cache entry exists for second write");
|
||||
assert_eq!(cached_gateway_2[0].tool_name, "two");
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn codex_apps_tools_cache_is_scoped_per_user() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
let cache_context_user_1 = create_codex_apps_tools_cache_context(
|
||||
codex_home.path().to_path_buf(),
|
||||
Some("account-one"),
|
||||
Some("user-one"),
|
||||
);
|
||||
let cache_context_user_2 = create_codex_apps_tools_cache_context(
|
||||
codex_home.path().to_path_buf(),
|
||||
Some("account-two"),
|
||||
Some("user-two"),
|
||||
);
|
||||
let tools_user_1 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")];
|
||||
let tools_user_2 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "two")];
|
||||
fn codex_apps_tools_cache_is_cleared_when_expired() {
|
||||
with_clean_codex_apps_tools_cache(|| {
|
||||
let tools = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "stale_tool")];
|
||||
write_cached_codex_apps_tools(&tools);
|
||||
|
||||
write_cached_codex_apps_tools(&cache_context_user_1, &tools_user_1);
|
||||
write_cached_codex_apps_tools(&cache_context_user_2, &tools_user_2);
|
||||
{
|
||||
let mut cache_guard = CODEX_APPS_TOOLS_CACHE
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
cache_guard.as_mut().expect("cache exists").expires_at =
|
||||
Instant::now() - Duration::from_secs(1);
|
||||
}
|
||||
|
||||
let read_user_1 =
|
||||
read_cached_codex_apps_tools(&cache_context_user_1).expect("cache entry for user one");
|
||||
let read_user_2 =
|
||||
read_cached_codex_apps_tools(&cache_context_user_2).expect("cache entry for user two");
|
||||
assert!(read_cached_codex_apps_tools().is_none());
|
||||
|
||||
assert_eq!(read_user_1[0].tool_name, "one");
|
||||
assert_eq!(read_user_2[0].tool_name, "two");
|
||||
assert_ne!(
|
||||
cache_context_user_1.cache_path(),
|
||||
cache_context_user_2.cache_path(),
|
||||
"each user should get an isolated cache file"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn codex_apps_tools_cache_filters_disallowed_connectors() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
let cache_context = create_codex_apps_tools_cache_context(
|
||||
codex_home.path().to_path_buf(),
|
||||
Some("account-one"),
|
||||
Some("user-one"),
|
||||
);
|
||||
let tools = vec![
|
||||
create_test_tool_with_connector(
|
||||
CODEX_APPS_MCP_SERVER_NAME,
|
||||
"blocked_tool",
|
||||
"connector_openai_hidden",
|
||||
Some("Hidden"),
|
||||
),
|
||||
create_test_tool_with_connector(
|
||||
CODEX_APPS_MCP_SERVER_NAME,
|
||||
"allowed_tool",
|
||||
"calendar",
|
||||
Some("Calendar"),
|
||||
),
|
||||
];
|
||||
|
||||
write_cached_codex_apps_tools(&cache_context, &tools);
|
||||
let cached =
|
||||
read_cached_codex_apps_tools(&cache_context).expect("cache entry exists for user");
|
||||
|
||||
assert_eq!(cached.len(), 1);
|
||||
assert_eq!(cached[0].tool_name, "allowed_tool");
|
||||
assert_eq!(cached[0].connector_id.as_deref(), Some("calendar"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn codex_apps_tools_cache_is_ignored_when_schema_version_mismatches() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
let cache_context = create_codex_apps_tools_cache_context(
|
||||
codex_home.path().to_path_buf(),
|
||||
Some("account-one"),
|
||||
Some("user-one"),
|
||||
);
|
||||
let cache_path = cache_context.cache_path();
|
||||
if let Some(parent) = cache_path.parent() {
|
||||
std::fs::create_dir_all(parent).expect("create parent");
|
||||
}
|
||||
let bytes = serde_json::to_vec_pretty(&serde_json::json!({
|
||||
"schema_version": CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION + 1,
|
||||
"tools": [create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")],
|
||||
}))
|
||||
.expect("serialize");
|
||||
std::fs::write(cache_path, bytes).expect("write");
|
||||
|
||||
assert!(read_cached_codex_apps_tools(&cache_context).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn codex_apps_tools_cache_is_ignored_when_json_is_invalid() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
let cache_context = create_codex_apps_tools_cache_context(
|
||||
codex_home.path().to_path_buf(),
|
||||
Some("account-one"),
|
||||
Some("user-one"),
|
||||
);
|
||||
let cache_path = cache_context.cache_path();
|
||||
if let Some(parent) = cache_path.parent() {
|
||||
std::fs::create_dir_all(parent).expect("create parent");
|
||||
}
|
||||
std::fs::write(cache_path, b"{not json").expect("write");
|
||||
|
||||
assert!(read_cached_codex_apps_tools(&cache_context).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn startup_cached_codex_apps_tools_loads_from_disk_cache() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
let cache_context = create_codex_apps_tools_cache_context(
|
||||
codex_home.path().to_path_buf(),
|
||||
Some("account-one"),
|
||||
Some("user-one"),
|
||||
);
|
||||
let cached_tools = vec![create_test_tool(
|
||||
CODEX_APPS_MCP_SERVER_NAME,
|
||||
"calendar_search",
|
||||
)];
|
||||
write_cached_codex_apps_tools(&cache_context, &cached_tools);
|
||||
|
||||
let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot(
|
||||
CODEX_APPS_MCP_SERVER_NAME,
|
||||
Some(&cache_context),
|
||||
);
|
||||
let startup_tools = startup_snapshot.expect("expected startup snapshot to load from cache");
|
||||
|
||||
assert_eq!(startup_tools.len(), 1);
|
||||
assert_eq!(startup_tools[0].server_name, CODEX_APPS_MCP_SERVER_NAME);
|
||||
assert_eq!(startup_tools[0].tool_name, "calendar_search");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_all_tools_uses_startup_snapshot_while_client_is_pending() {
|
||||
let startup_tools = vec![create_test_tool(
|
||||
CODEX_APPS_MCP_SERVER_NAME,
|
||||
"calendar_create_event",
|
||||
)];
|
||||
let pending_client =
|
||||
futures::future::pending::<Result<ManagedClient, StartupOutcomeError>>()
|
||||
.boxed()
|
||||
.shared();
|
||||
let mut manager = McpConnectionManager::default();
|
||||
manager.clients.insert(
|
||||
CODEX_APPS_MCP_SERVER_NAME.to_string(),
|
||||
AsyncManagedClient {
|
||||
client: pending_client,
|
||||
startup_snapshot: Some(startup_tools),
|
||||
startup_complete: Arc::new(std::sync::atomic::AtomicBool::new(false)),
|
||||
},
|
||||
);
|
||||
|
||||
let tools = manager.list_all_tools().await;
|
||||
let tool = tools
|
||||
.get("mcp__codex_apps__calendar_create_event")
|
||||
.expect("tool from startup cache");
|
||||
assert_eq!(tool.server_name, CODEX_APPS_MCP_SERVER_NAME);
|
||||
assert_eq!(tool.tool_name, "calendar_create_event");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_all_tools_blocks_while_client_is_pending_without_startup_snapshot() {
|
||||
let pending_client =
|
||||
futures::future::pending::<Result<ManagedClient, StartupOutcomeError>>()
|
||||
.boxed()
|
||||
.shared();
|
||||
let mut manager = McpConnectionManager::default();
|
||||
manager.clients.insert(
|
||||
CODEX_APPS_MCP_SERVER_NAME.to_string(),
|
||||
AsyncManagedClient {
|
||||
client: pending_client,
|
||||
startup_snapshot: None,
|
||||
startup_complete: Arc::new(std::sync::atomic::AtomicBool::new(false)),
|
||||
},
|
||||
);
|
||||
|
||||
let timeout_result =
|
||||
tokio::time::timeout(Duration::from_millis(10), manager.list_all_tools()).await;
|
||||
assert!(timeout_result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_all_tools_does_not_block_when_startup_snapshot_cache_hit_is_empty() {
|
||||
let pending_client =
|
||||
futures::future::pending::<Result<ManagedClient, StartupOutcomeError>>()
|
||||
.boxed()
|
||||
.shared();
|
||||
let mut manager = McpConnectionManager::default();
|
||||
manager.clients.insert(
|
||||
CODEX_APPS_MCP_SERVER_NAME.to_string(),
|
||||
AsyncManagedClient {
|
||||
client: pending_client,
|
||||
startup_snapshot: Some(Vec::new()),
|
||||
startup_complete: Arc::new(std::sync::atomic::AtomicBool::new(false)),
|
||||
},
|
||||
);
|
||||
|
||||
let timeout_result =
|
||||
tokio::time::timeout(Duration::from_millis(10), manager.list_all_tools()).await;
|
||||
let tools = timeout_result.expect("cache-hit startup snapshot should not block");
|
||||
assert!(tools.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_all_tools_uses_startup_snapshot_when_client_startup_fails() {
|
||||
let startup_tools = vec![create_test_tool(
|
||||
CODEX_APPS_MCP_SERVER_NAME,
|
||||
"calendar_create_event",
|
||||
)];
|
||||
let failed_client = futures::future::ready::<Result<ManagedClient, StartupOutcomeError>>(
|
||||
Err(StartupOutcomeError::Failed {
|
||||
error: "startup failed".to_string(),
|
||||
}),
|
||||
)
|
||||
.boxed()
|
||||
.shared();
|
||||
let mut manager = McpConnectionManager::default();
|
||||
let startup_complete = Arc::new(std::sync::atomic::AtomicBool::new(true));
|
||||
manager.clients.insert(
|
||||
CODEX_APPS_MCP_SERVER_NAME.to_string(),
|
||||
AsyncManagedClient {
|
||||
client: failed_client,
|
||||
startup_snapshot: Some(startup_tools),
|
||||
startup_complete,
|
||||
},
|
||||
);
|
||||
|
||||
let tools = manager.list_all_tools().await;
|
||||
let tool = tools
|
||||
.get("mcp__codex_apps__calendar_create_event")
|
||||
.expect("tool from startup cache");
|
||||
assert_eq!(tool.server_name, CODEX_APPS_MCP_SERVER_NAME);
|
||||
assert_eq!(tool.tool_name, "calendar_create_event");
|
||||
let cache_guard = CODEX_APPS_TOOLS_CACHE
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
assert!(cache_guard.is_none());
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use codex_protocol::protocol::AgentStatus;
|
||||
|
||||
/// Helpers for identifying model-visible "session prefix" messages.
|
||||
///
|
||||
/// A session prefix is a user-role message that carries configuration or state needed by
|
||||
@@ -6,10 +8,41 @@
|
||||
/// boundaries.
|
||||
pub(crate) const ENVIRONMENT_CONTEXT_OPEN_TAG: &str = "<environment_context>";
|
||||
pub(crate) const TURN_ABORTED_OPEN_TAG: &str = "<turn_aborted>";
|
||||
pub(crate) const SUBAGENT_NOTIFICATION_OPEN_TAG: &str = "<subagent_notification>";
|
||||
pub(crate) const SUBAGENT_NOTIFICATION_CLOSE_TAG: &str = "</subagent_notification>";
|
||||
|
||||
fn starts_with_ascii_case_insensitive(text: &str, prefix: &str) -> bool {
|
||||
text.get(..prefix.len())
|
||||
.is_some_and(|candidate| candidate.eq_ignore_ascii_case(prefix))
|
||||
}
|
||||
|
||||
/// Returns true if `text` starts with a session prefix marker (case-insensitive).
|
||||
pub(crate) fn is_session_prefix(text: &str) -> bool {
|
||||
let trimmed = text.trim_start();
|
||||
let lowered = trimmed.to_ascii_lowercase();
|
||||
lowered.starts_with(ENVIRONMENT_CONTEXT_OPEN_TAG) || lowered.starts_with(TURN_ABORTED_OPEN_TAG)
|
||||
starts_with_ascii_case_insensitive(trimmed, ENVIRONMENT_CONTEXT_OPEN_TAG)
|
||||
|| starts_with_ascii_case_insensitive(trimmed, TURN_ABORTED_OPEN_TAG)
|
||||
|| starts_with_ascii_case_insensitive(trimmed, SUBAGENT_NOTIFICATION_OPEN_TAG)
|
||||
}
|
||||
|
||||
pub(crate) fn format_subagent_notification_message(agent_id: &str, status: &AgentStatus) -> String {
|
||||
let payload_json = serde_json::json!({
|
||||
"agent_id": agent_id,
|
||||
"status": status,
|
||||
})
|
||||
.to_string();
|
||||
format!("{SUBAGENT_NOTIFICATION_OPEN_TAG}\n{payload_json}\n{SUBAGENT_NOTIFICATION_CLOSE_TAG}")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn is_session_prefix_is_case_insensitive() {
|
||||
assert_eq!(
|
||||
is_session_prefix("<SUBAGENT_NOTIFICATION>{}</subagent_notification>"),
|
||||
true
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -427,7 +427,7 @@ mod resume_agent {
|
||||
}
|
||||
}
|
||||
|
||||
mod wait {
|
||||
pub(crate) mod wait {
|
||||
use super::*;
|
||||
use crate::agent::status::is_final;
|
||||
use futures::FutureExt;
|
||||
@@ -447,10 +447,10 @@ mod wait {
|
||||
timeout_ms: Option<i64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct WaitResult {
|
||||
status: HashMap<ThreadId, AgentStatus>,
|
||||
timed_out: bool,
|
||||
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
|
||||
pub(crate) struct WaitResult {
|
||||
pub(crate) status: HashMap<ThreadId, AgentStatus>,
|
||||
pub(crate) timed_out: bool,
|
||||
}
|
||||
|
||||
pub async fn handle(
|
||||
@@ -1462,12 +1462,6 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, PartialEq, Eq)]
|
||||
struct WaitResult {
|
||||
status: HashMap<ThreadId, AgentStatus>,
|
||||
timed_out: bool,
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_rejects_non_positive_timeout() {
|
||||
let (session, turn) = make_session_and_context().await;
|
||||
@@ -1553,11 +1547,11 @@ mod tests {
|
||||
else {
|
||||
panic!("expected function output");
|
||||
};
|
||||
let result: WaitResult =
|
||||
let result: wait::WaitResult =
|
||||
serde_json::from_str(&content).expect("wait result should be json");
|
||||
assert_eq!(
|
||||
result,
|
||||
WaitResult {
|
||||
wait::WaitResult {
|
||||
status: HashMap::from([
|
||||
(id_a, AgentStatus::NotFound),
|
||||
(id_b, AgentStatus::NotFound),
|
||||
@@ -1597,11 +1591,11 @@ mod tests {
|
||||
else {
|
||||
panic!("expected function output");
|
||||
};
|
||||
let result: WaitResult =
|
||||
let result: wait::WaitResult =
|
||||
serde_json::from_str(&content).expect("wait result should be json");
|
||||
assert_eq!(
|
||||
result,
|
||||
WaitResult {
|
||||
wait::WaitResult {
|
||||
status: HashMap::new(),
|
||||
timed_out: true
|
||||
}
|
||||
@@ -1694,11 +1688,11 @@ mod tests {
|
||||
else {
|
||||
panic!("expected function output");
|
||||
};
|
||||
let result: WaitResult =
|
||||
let result: wait::WaitResult =
|
||||
serde_json::from_str(&content).expect("wait result should be json");
|
||||
assert_eq!(
|
||||
result,
|
||||
WaitResult {
|
||||
wait::WaitResult {
|
||||
status: HashMap::from([(agent_id, AgentStatus::Shutdown)]),
|
||||
timed_out: false
|
||||
}
|
||||
|
||||
@@ -646,7 +646,7 @@ fn create_wait_tool() -> ToolSpec {
|
||||
|
||||
ToolSpec::Function(ResponsesApiTool {
|
||||
name: "wait".to_string(),
|
||||
description: "Wait for agents to reach a final status. Completed statuses may include the agent's final message. Returns empty status when timed out."
|
||||
description: "Wait for agents to reach a final status. Completed statuses may include the agent's final message. Returns empty status when timed out. Once the agent reaches his final status, a notification message will be received containing the same completed status."
|
||||
.to_string(),
|
||||
strict: false,
|
||||
parameters: JsonSchema::Object {
|
||||
|
||||
@@ -50,7 +50,7 @@ async fn emits_deprecation_notice_for_legacy_feature_flag() -> anyhow::Result<()
|
||||
assert_eq!(
|
||||
details.as_deref(),
|
||||
Some(
|
||||
"Enable it with `--enable unified_exec` or `[features].unified_exec` in config.toml. See https://github.com/openai/codex/blob/main/docs/config.md#feature-flags for details."
|
||||
"Enable it with `--enable unified_exec` or `[features].unified_exec` in config.toml. See https://developers.openai.com/codex/config-basic#feature-flags for details."
|
||||
),
|
||||
);
|
||||
|
||||
|
||||
@@ -114,6 +114,7 @@ mod skills;
|
||||
mod sqlite_state;
|
||||
mod stream_error_allows_next_turn;
|
||||
mod stream_no_completed;
|
||||
mod subagent_notifications;
|
||||
mod text_encoding_fix;
|
||||
mod tool_harness;
|
||||
mod tool_parallelism;
|
||||
|
||||
196
codex-rs/core/tests/suite/subagent_notifications.rs
Normal file
196
codex-rs/core/tests/suite/subagent_notifications.rs
Normal file
@@ -0,0 +1,196 @@
|
||||
use anyhow::Result;
|
||||
use codex_core::features::Feature;
|
||||
use core_test_support::responses::ResponsesRequest;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_function_call;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::mount_response_once_match;
|
||||
use core_test_support::responses::mount_sse_once_match;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::sse_response;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::test_codex::TestCodex;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use serde_json::json;
|
||||
use std::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
use tokio::time::sleep;
|
||||
use wiremock::MockServer;
|
||||
|
||||
const SPAWN_CALL_ID: &str = "spawn-call-1";
|
||||
const TURN_1_PROMPT: &str = "spawn a child and continue";
|
||||
const TURN_2_NO_WAIT_PROMPT: &str = "follow up without wait";
|
||||
const CHILD_PROMPT: &str = "child: do work";
|
||||
|
||||
fn body_contains(req: &wiremock::Request, text: &str) -> bool {
|
||||
let is_zstd = req
|
||||
.headers
|
||||
.get("content-encoding")
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.is_some_and(|value| {
|
||||
value
|
||||
.split(',')
|
||||
.any(|entry| entry.trim().eq_ignore_ascii_case("zstd"))
|
||||
});
|
||||
let bytes = if is_zstd {
|
||||
zstd::stream::decode_all(std::io::Cursor::new(&req.body)).ok()
|
||||
} else {
|
||||
Some(req.body.clone())
|
||||
};
|
||||
bytes
|
||||
.and_then(|body| String::from_utf8(body).ok())
|
||||
.is_some_and(|body| body.contains(text))
|
||||
}
|
||||
|
||||
fn has_subagent_notification(req: &ResponsesRequest) -> bool {
|
||||
req.message_input_texts("user")
|
||||
.iter()
|
||||
.any(|text| text.contains("<subagent_notification>"))
|
||||
}
|
||||
|
||||
async fn wait_for_spawned_thread_id(test: &TestCodex) -> Result<String> {
|
||||
let deadline = Instant::now() + Duration::from_secs(2);
|
||||
loop {
|
||||
let ids = test.thread_manager.list_thread_ids().await;
|
||||
if let Some(spawned_id) = ids
|
||||
.iter()
|
||||
.find(|id| **id != test.session_configured.session_id)
|
||||
{
|
||||
return Ok(spawned_id.to_string());
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
anyhow::bail!("timed out waiting for spawned thread id");
|
||||
}
|
||||
sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_requests(
|
||||
mock: &core_test_support::responses::ResponseMock,
|
||||
) -> Result<Vec<ResponsesRequest>> {
|
||||
let deadline = Instant::now() + Duration::from_secs(2);
|
||||
loop {
|
||||
let requests = mock.requests();
|
||||
if !requests.is_empty() {
|
||||
return Ok(requests);
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
anyhow::bail!("expected at least 1 request, got {}", requests.len());
|
||||
}
|
||||
sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn setup_turn_one_with_spawned_child(
|
||||
server: &MockServer,
|
||||
child_response_delay: Option<Duration>,
|
||||
) -> Result<(TestCodex, String)> {
|
||||
let spawn_args = serde_json::to_string(&json!({
|
||||
"message": CHILD_PROMPT,
|
||||
}))?;
|
||||
|
||||
mount_sse_once_match(
|
||||
server,
|
||||
|req: &wiremock::Request| body_contains(req, TURN_1_PROMPT),
|
||||
sse(vec![
|
||||
ev_response_created("resp-turn1-1"),
|
||||
ev_function_call(SPAWN_CALL_ID, "spawn_agent", &spawn_args),
|
||||
ev_completed("resp-turn1-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let child_sse = sse(vec![
|
||||
ev_response_created("resp-child-1"),
|
||||
ev_assistant_message("msg-child-1", "child done"),
|
||||
ev_completed("resp-child-1"),
|
||||
]);
|
||||
let child_request_log = if let Some(delay) = child_response_delay {
|
||||
mount_response_once_match(
|
||||
server,
|
||||
|req: &wiremock::Request| {
|
||||
body_contains(req, CHILD_PROMPT) && !body_contains(req, SPAWN_CALL_ID)
|
||||
},
|
||||
sse_response(child_sse).set_delay(delay),
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
mount_sse_once_match(
|
||||
server,
|
||||
|req: &wiremock::Request| {
|
||||
body_contains(req, CHILD_PROMPT) && !body_contains(req, SPAWN_CALL_ID)
|
||||
},
|
||||
child_sse,
|
||||
)
|
||||
.await
|
||||
};
|
||||
|
||||
let _turn1_followup = mount_sse_once_match(
|
||||
server,
|
||||
|req: &wiremock::Request| body_contains(req, SPAWN_CALL_ID),
|
||||
sse(vec![
|
||||
ev_response_created("resp-turn1-2"),
|
||||
ev_assistant_message("msg-turn1-2", "parent done"),
|
||||
ev_completed("resp-turn1-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.features.enable(Feature::Collab);
|
||||
});
|
||||
let test = builder.build(server).await?;
|
||||
test.submit_turn(TURN_1_PROMPT).await?;
|
||||
if child_response_delay.is_none() {
|
||||
let _ = wait_for_requests(&child_request_log).await?;
|
||||
let rollout_path = test
|
||||
.codex
|
||||
.rollout_path()
|
||||
.ok_or_else(|| anyhow::anyhow!("expected parent rollout path"))?;
|
||||
let deadline = Instant::now() + Duration::from_secs(6);
|
||||
loop {
|
||||
let has_notification = tokio::fs::read_to_string(&rollout_path)
|
||||
.await
|
||||
.is_ok_and(|rollout| rollout.contains("<subagent_notification>"));
|
||||
if has_notification {
|
||||
break;
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
anyhow::bail!(
|
||||
"timed out waiting for parent rollout to include subagent notification"
|
||||
);
|
||||
}
|
||||
sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
let spawned_id = wait_for_spawned_thread_id(&test).await?;
|
||||
|
||||
Ok((test, spawned_id))
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn subagent_notification_is_included_without_wait() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let (test, _spawned_id) = setup_turn_one_with_spawned_child(&server, None).await?;
|
||||
|
||||
let turn2 = mount_sse_once_match(
|
||||
&server,
|
||||
|req: &wiremock::Request| body_contains(req, TURN_2_NO_WAIT_PROMPT),
|
||||
sse(vec![
|
||||
ev_response_created("resp-turn2-1"),
|
||||
ev_assistant_message("msg-turn2-1", "no wait path"),
|
||||
ev_completed("resp-turn2-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
test.submit_turn(TURN_2_NO_WAIT_PROMPT).await?;
|
||||
|
||||
let turn2_requests = wait_for_requests(&turn2).await?;
|
||||
assert!(turn2_requests.iter().any(has_subagent_notification));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -539,7 +539,6 @@ pub(crate) struct ChatWidget {
|
||||
mcp_startup_status: Option<HashMap<String, McpStartupStatus>>,
|
||||
connectors_cache: ConnectorsCacheState,
|
||||
connectors_prefetch_in_flight: bool,
|
||||
connectors_force_refetch_pending: bool,
|
||||
// Queue of interruptive UI events deferred during an active write cycle
|
||||
interrupts: InterruptManager,
|
||||
// Accumulates the current reasoning block text to extract a header
|
||||
@@ -2651,7 +2650,6 @@ impl ChatWidget {
|
||||
mcp_startup_status: None,
|
||||
connectors_cache: ConnectorsCacheState::default(),
|
||||
connectors_prefetch_in_flight: false,
|
||||
connectors_force_refetch_pending: false,
|
||||
interrupts: InterruptManager::new(),
|
||||
reasoning_buffer: String::new(),
|
||||
full_reasoning_buffer: String::new(),
|
||||
@@ -2815,7 +2813,6 @@ impl ChatWidget {
|
||||
mcp_startup_status: None,
|
||||
connectors_cache: ConnectorsCacheState::default(),
|
||||
connectors_prefetch_in_flight: false,
|
||||
connectors_force_refetch_pending: false,
|
||||
interrupts: InterruptManager::new(),
|
||||
reasoning_buffer: String::new(),
|
||||
full_reasoning_buffer: String::new(),
|
||||
@@ -2968,7 +2965,6 @@ impl ChatWidget {
|
||||
mcp_startup_status: None,
|
||||
connectors_cache: ConnectorsCacheState::default(),
|
||||
connectors_prefetch_in_flight: false,
|
||||
connectors_force_refetch_pending: false,
|
||||
interrupts: InterruptManager::new(),
|
||||
reasoning_buffer: String::new(),
|
||||
full_reasoning_buffer: String::new(),
|
||||
@@ -4603,13 +4599,7 @@ impl ChatWidget {
|
||||
}
|
||||
|
||||
fn prefetch_connectors_with_options(&mut self, force_refetch: bool) {
|
||||
if !self.connectors_enabled() {
|
||||
return;
|
||||
}
|
||||
if self.connectors_prefetch_in_flight {
|
||||
if force_refetch {
|
||||
self.connectors_force_refetch_pending = true;
|
||||
}
|
||||
if !self.connectors_enabled() || self.connectors_prefetch_in_flight {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -4621,8 +4611,8 @@ impl ChatWidget {
|
||||
let config = self.config.clone();
|
||||
let app_event_tx = self.app_event_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let accessible_result =
|
||||
match connectors::list_accessible_connectors_from_mcp_tools_with_options_and_status(
|
||||
let accessible_connectors =
|
||||
match connectors::list_accessible_connectors_from_mcp_tools_with_options(
|
||||
&config,
|
||||
force_refetch,
|
||||
)
|
||||
@@ -4637,9 +4627,6 @@ impl ChatWidget {
|
||||
return;
|
||||
}
|
||||
};
|
||||
let should_schedule_force_refetch =
|
||||
!force_refetch && !accessible_result.codex_apps_ready;
|
||||
let accessible_connectors = accessible_result.connectors;
|
||||
|
||||
app_event_tx.send(AppEvent::ConnectorsLoaded {
|
||||
result: Ok(ConnectorsSnapshot {
|
||||
@@ -4649,8 +4636,7 @@ impl ChatWidget {
|
||||
});
|
||||
|
||||
let result: Result<ConnectorsSnapshot, String> = async {
|
||||
let all_connectors =
|
||||
connectors::list_all_connectors_with_options(&config, force_refetch).await?;
|
||||
let all_connectors = connectors::list_all_connectors(&config).await?;
|
||||
let connectors = connectors::merge_connectors_with_accessible(
|
||||
all_connectors,
|
||||
accessible_connectors,
|
||||
@@ -4665,12 +4651,6 @@ impl ChatWidget {
|
||||
result,
|
||||
is_final: true,
|
||||
});
|
||||
|
||||
if should_schedule_force_refetch {
|
||||
app_event_tx.send(AppEvent::RefreshConnectors {
|
||||
force_refetch: true,
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -6839,13 +6819,8 @@ impl ChatWidget {
|
||||
result: Result<ConnectorsSnapshot, String>,
|
||||
is_final: bool,
|
||||
) {
|
||||
let mut trigger_pending_force_refetch = false;
|
||||
if is_final {
|
||||
self.connectors_prefetch_in_flight = false;
|
||||
if self.connectors_force_refetch_pending {
|
||||
self.connectors_force_refetch_pending = false;
|
||||
trigger_pending_force_refetch = true;
|
||||
}
|
||||
}
|
||||
|
||||
match result {
|
||||
@@ -6880,15 +6855,12 @@ impl ChatWidget {
|
||||
Err(err) => {
|
||||
if matches!(self.connectors_cache, ConnectorsCacheState::Ready(_)) {
|
||||
warn!("failed to refresh apps list; retaining current apps snapshot: {err}");
|
||||
} else {
|
||||
self.connectors_cache = ConnectorsCacheState::Failed(err);
|
||||
self.bottom_pane.set_connectors_snapshot(None);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if trigger_pending_force_refetch {
|
||||
self.prefetch_connectors_with_options(true);
|
||||
self.connectors_cache = ConnectorsCacheState::Failed(err);
|
||||
self.bottom_pane.set_connectors_snapshot(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1626,7 +1626,6 @@ async fn make_chatwidget_manual(
|
||||
mcp_startup_status: None,
|
||||
connectors_cache: ConnectorsCacheState::default(),
|
||||
connectors_prefetch_in_flight: false,
|
||||
connectors_force_refetch_pending: false,
|
||||
interrupts: InterruptManager::new(),
|
||||
reasoning_buffer: String::new(),
|
||||
full_reasoning_buffer: String::new(),
|
||||
@@ -4580,42 +4579,6 @@ async fn apps_refresh_failure_keeps_existing_full_snapshot() {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apps_refresh_failure_with_cached_snapshot_triggers_pending_force_refetch() {
|
||||
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;
|
||||
chat.config.features.enable(Feature::Apps);
|
||||
chat.bottom_pane.set_connectors_enabled(true);
|
||||
chat.connectors_prefetch_in_flight = true;
|
||||
chat.connectors_force_refetch_pending = true;
|
||||
|
||||
let full_connectors = vec![codex_chatgpt::connectors::AppInfo {
|
||||
id: "unit_test_apps_refresh_failure_pending_connector".to_string(),
|
||||
name: "Notion".to_string(),
|
||||
description: Some("Workspace docs".to_string()),
|
||||
logo_url: None,
|
||||
logo_url_dark: None,
|
||||
distribution_channel: None,
|
||||
branding: None,
|
||||
app_metadata: None,
|
||||
labels: None,
|
||||
install_url: Some("https://example.test/notion".to_string()),
|
||||
is_accessible: true,
|
||||
is_enabled: true,
|
||||
}];
|
||||
chat.connectors_cache = ConnectorsCacheState::Ready(ConnectorsSnapshot {
|
||||
connectors: full_connectors.clone(),
|
||||
});
|
||||
|
||||
chat.on_connectors_loaded(Err("failed to load apps".to_string()), true);
|
||||
|
||||
assert!(chat.connectors_prefetch_in_flight);
|
||||
assert!(!chat.connectors_force_refetch_pending);
|
||||
assert_matches!(
|
||||
&chat.connectors_cache,
|
||||
ConnectorsCacheState::Ready(snapshot) if snapshot.connectors == full_connectors
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apps_partial_refresh_uses_same_filtering_as_full_refresh() {
|
||||
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;
|
||||
|
||||
Reference in New Issue
Block a user