mirror of
https://github.com/openai/codex.git
synced 2026-05-10 14:22:30 +00:00
Compare commits
6 Commits
pr21095
...
dev/mzeng/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ce5c64c584 | ||
|
|
a7b9a6b0d0 | ||
|
|
6c5899c897 | ||
|
|
5acd567e6a | ||
|
|
5150ed5c47 | ||
|
|
43c8d9858f |
@@ -218,7 +218,9 @@ impl ThreadHistoryBuilder {
|
||||
RolloutItem::EventMsg(event) => self.handle_event(event),
|
||||
RolloutItem::Compacted(payload) => self.handle_compacted(payload),
|
||||
RolloutItem::ResponseItem(item) => self.handle_response_item(item),
|
||||
RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {}
|
||||
RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::SessionMeta(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
pub(crate) mod mcp;
|
||||
pub(crate) mod mcp_connection_manager;
|
||||
pub(crate) mod mcp_tool_names;
|
||||
mod tool_catalog;
|
||||
|
||||
pub use mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
pub use mcp::McpAuthStatusEntry;
|
||||
@@ -44,3 +45,7 @@ pub use mcp_connection_manager::ToolInfo;
|
||||
pub use mcp_connection_manager::codex_apps_tools_cache_key;
|
||||
pub use mcp_connection_manager::declared_openai_file_input_param_names;
|
||||
pub use mcp_connection_manager::filter_non_codex_apps_mcp_tools_only;
|
||||
pub use tool_catalog::McpToolCatalog;
|
||||
pub use tool_catalog::McpToolCatalogMerge;
|
||||
pub use tool_catalog::McpToolCatalogSnapshot;
|
||||
pub use tool_catalog::McpToolCatalogUpdate;
|
||||
|
||||
248
codex-rs/codex-mcp/src/tool_catalog.rs
Normal file
248
codex-rs/codex-mcp/src/tool_catalog.rs
Normal file
@@ -0,0 +1,248 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use codex_protocol::mcp::AdvertisedMcpToolCatalog;
|
||||
use codex_protocol::mcp::AdvertisedMcpToolInfo;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::mcp_connection_manager::ToolInfo;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct McpToolCatalog {
|
||||
has_servers: bool,
|
||||
tools: HashMap<String, ToolInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct McpToolCatalogUpdate {
|
||||
pub has_servers: bool,
|
||||
pub tools: HashMap<String, ToolInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct McpToolCatalogSnapshot {
|
||||
pub has_servers: bool,
|
||||
pub tools: HashMap<String, ToolInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct McpToolCatalogMerge {
|
||||
pub snapshot: McpToolCatalogSnapshot,
|
||||
pub changed: bool,
|
||||
}
|
||||
|
||||
impl McpToolCatalog {
|
||||
pub fn merge(&mut self, update: McpToolCatalogUpdate) -> McpToolCatalogMerge {
|
||||
let McpToolCatalogUpdate { has_servers, tools } = update;
|
||||
let mut changed = false;
|
||||
if has_servers && !self.has_servers {
|
||||
self.has_servers = true;
|
||||
changed = true;
|
||||
}
|
||||
for (name, tool) in tools {
|
||||
if let std::collections::hash_map::Entry::Vacant(entry) = self.tools.entry(name) {
|
||||
entry.insert(tool);
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
McpToolCatalogMerge {
|
||||
snapshot: self.snapshot(),
|
||||
changed,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn snapshot(&self) -> McpToolCatalogSnapshot {
|
||||
McpToolCatalogSnapshot {
|
||||
has_servers: self.has_servers,
|
||||
tools: self.tools.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn resolve(&self, name: &str, namespace: Option<&str>) -> Option<ToolInfo> {
|
||||
let qualified_name = qualified_mcp_tool_name(name, namespace);
|
||||
self.tools.get(&qualified_name).cloned()
|
||||
}
|
||||
}
|
||||
|
||||
impl McpToolCatalogUpdate {
|
||||
pub fn from_advertised(catalog: AdvertisedMcpToolCatalog) -> Self {
|
||||
let tools = catalog
|
||||
.tools
|
||||
.into_iter()
|
||||
.filter_map(
|
||||
|(name, tool)| match advertised_tool_info_into_tool_info(tool) {
|
||||
Ok(tool) => Some((name, tool)),
|
||||
Err(err) => {
|
||||
warn!("failed to hydrate advertised MCP tool {name} from rollout: {err}");
|
||||
None
|
||||
}
|
||||
},
|
||||
)
|
||||
.collect();
|
||||
Self {
|
||||
has_servers: catalog.has_servers,
|
||||
tools,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl McpToolCatalogSnapshot {
|
||||
pub fn to_advertised(&self) -> Option<AdvertisedMcpToolCatalog> {
|
||||
if !self.has_servers && self.tools.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let tools = self
|
||||
.tools
|
||||
.iter()
|
||||
.filter_map(
|
||||
|(name, tool)| match advertised_tool_info_from_tool_info(tool) {
|
||||
Ok(tool) => Some((name.clone(), tool)),
|
||||
Err(err) => {
|
||||
warn!("failed to persist advertised MCP tool {name}: {err}");
|
||||
None
|
||||
}
|
||||
},
|
||||
)
|
||||
.collect();
|
||||
|
||||
Some(AdvertisedMcpToolCatalog {
|
||||
has_servers: self.has_servers,
|
||||
tools,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn advertised_tool_info_from_tool_info(
|
||||
tool: &ToolInfo,
|
||||
) -> serde_json::Result<AdvertisedMcpToolInfo> {
|
||||
Ok(AdvertisedMcpToolInfo {
|
||||
server_name: tool.server_name.clone(),
|
||||
callable_name: tool.callable_name.clone(),
|
||||
callable_namespace: tool.callable_namespace.clone(),
|
||||
server_instructions: tool.server_instructions.clone(),
|
||||
tool: serde_json::to_value(&tool.tool)?,
|
||||
connector_id: tool.connector_id.clone(),
|
||||
connector_name: tool.connector_name.clone(),
|
||||
plugin_display_names: tool.plugin_display_names.clone(),
|
||||
connector_description: tool.connector_description.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn advertised_tool_info_into_tool_info(
|
||||
tool: AdvertisedMcpToolInfo,
|
||||
) -> serde_json::Result<ToolInfo> {
|
||||
Ok(ToolInfo {
|
||||
server_name: tool.server_name,
|
||||
callable_name: tool.callable_name,
|
||||
callable_namespace: tool.callable_namespace,
|
||||
server_instructions: tool.server_instructions,
|
||||
tool: serde_json::from_value(tool.tool)?,
|
||||
connector_id: tool.connector_id,
|
||||
connector_name: tool.connector_name,
|
||||
plugin_display_names: tool.plugin_display_names,
|
||||
connector_description: tool.connector_description,
|
||||
})
|
||||
}
|
||||
|
||||
fn qualified_mcp_tool_name(name: &str, namespace: Option<&str>) -> String {
|
||||
match namespace {
|
||||
Some(namespace) if name.starts_with(namespace) => name.to_string(),
|
||||
Some(namespace) => format!("{namespace}{name}"),
|
||||
None => name.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use rmcp::model::Tool;
|
||||
use serde_json::Map as JsonObject;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn tool_info(server_name: &str, callable_name: &str) -> ToolInfo {
|
||||
ToolInfo {
|
||||
server_name: server_name.to_string(),
|
||||
callable_name: callable_name.to_string(),
|
||||
callable_namespace: format!("mcp__{server_name}__"),
|
||||
server_instructions: None,
|
||||
tool: Tool {
|
||||
name: callable_name.to_string().into(),
|
||||
title: None,
|
||||
description: Some(format!("Test tool: {callable_name}").into()),
|
||||
input_schema: Arc::new(JsonObject::default()),
|
||||
output_schema: None,
|
||||
annotations: None,
|
||||
execution: None,
|
||||
icons: None,
|
||||
meta: None,
|
||||
},
|
||||
connector_id: None,
|
||||
connector_name: None,
|
||||
plugin_display_names: Vec::new(),
|
||||
connector_description: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn merge_preserves_first_seen_tool_definition() {
|
||||
let mut catalog = McpToolCatalog::default();
|
||||
catalog.merge(McpToolCatalogUpdate {
|
||||
has_servers: true,
|
||||
tools: HashMap::from([(
|
||||
"mcp__observability__get_dashboard".to_string(),
|
||||
tool_info("observability", "get_dashboard"),
|
||||
)]),
|
||||
});
|
||||
catalog.merge(McpToolCatalogUpdate {
|
||||
has_servers: true,
|
||||
tools: HashMap::from([(
|
||||
"mcp__observability__get_dashboard".to_string(),
|
||||
tool_info("other", "replacement"),
|
||||
)]),
|
||||
});
|
||||
|
||||
let resolved = catalog
|
||||
.resolve("mcp__observability__get_dashboard", /*namespace*/ None)
|
||||
.expect("tool should resolve");
|
||||
assert_eq!(resolved.server_name, "observability");
|
||||
assert_eq!(resolved.tool.name.as_ref(), "get_dashboard");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resolve_matches_namespaced_tool_calls() {
|
||||
let mut catalog = McpToolCatalog::default();
|
||||
catalog.merge(McpToolCatalogUpdate {
|
||||
has_servers: true,
|
||||
tools: HashMap::from([(
|
||||
"mcp__observability__get_dashboard".to_string(),
|
||||
tool_info("observability", "get_dashboard"),
|
||||
)]),
|
||||
});
|
||||
|
||||
let resolved = catalog
|
||||
.resolve("get_dashboard", Some("mcp__observability__"))
|
||||
.expect("tool should resolve");
|
||||
assert_eq!(resolved.server_name, "observability");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn merge_preserves_advertised_server_presence() {
|
||||
let mut catalog = McpToolCatalog::default();
|
||||
let merged = catalog.merge(McpToolCatalogUpdate {
|
||||
has_servers: true,
|
||||
tools: HashMap::new(),
|
||||
});
|
||||
assert!(merged.changed);
|
||||
assert!(merged.snapshot.has_servers);
|
||||
|
||||
let merged = catalog.merge(McpToolCatalogUpdate {
|
||||
has_servers: false,
|
||||
tools: HashMap::new(),
|
||||
});
|
||||
assert!(!merged.changed);
|
||||
assert!(merged.snapshot.has_servers);
|
||||
assert!(merged.snapshot.tools.is_empty());
|
||||
}
|
||||
}
|
||||
@@ -117,6 +117,7 @@ fn keep_forked_rollout_item(item: &RolloutItem) -> bool {
|
||||
| ResponseItem::Other,
|
||||
) => false,
|
||||
RolloutItem::Compacted(_)
|
||||
| RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::EventMsg(_)
|
||||
| RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::TurnContext(_) => true,
|
||||
|
||||
@@ -78,6 +78,8 @@ use codex_login::CodexAuth;
|
||||
use codex_login::auth_env_telemetry::collect_auth_env_telemetry;
|
||||
use codex_login::default_client::originator;
|
||||
use codex_mcp::McpConnectionManager;
|
||||
use codex_mcp::McpToolCatalogSnapshot;
|
||||
use codex_mcp::McpToolCatalogUpdate;
|
||||
use codex_mcp::SandboxState;
|
||||
use codex_mcp::ToolInfo;
|
||||
use codex_mcp::codex_apps_tools_cache_key;
|
||||
@@ -2358,6 +2360,64 @@ impl Session {
|
||||
state.get_connector_selection()
|
||||
}
|
||||
|
||||
async fn merge_advertised_mcp_catalog_for_model(
|
||||
&self,
|
||||
update: McpToolCatalogUpdate,
|
||||
) -> McpToolCatalogSnapshot {
|
||||
let merged = {
|
||||
let mut state = self.state.lock().await;
|
||||
state.merge_advertised_mcp_tools(update)
|
||||
};
|
||||
if merged.changed {
|
||||
self.persist_advertised_mcp_catalog(&merged.snapshot).await;
|
||||
}
|
||||
merged.snapshot
|
||||
}
|
||||
|
||||
async fn hydrate_advertised_mcp_catalog_from_rollout(&self, rollout_items: &[RolloutItem]) {
|
||||
let mut state = self.state.lock().await;
|
||||
for item in rollout_items {
|
||||
if let RolloutItem::AdvertisedMcpTools(catalog) = item {
|
||||
state.merge_advertised_mcp_tools(McpToolCatalogUpdate::from_advertised(
|
||||
catalog.clone(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn persist_advertised_mcp_catalog(&self, snapshot: &McpToolCatalogSnapshot) {
|
||||
let Some(catalog) = snapshot.to_advertised() else {
|
||||
return;
|
||||
};
|
||||
self.persist_rollout_items(&[RolloutItem::AdvertisedMcpTools(catalog)])
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn list_model_visible_mcp_catalog_for_turn(
|
||||
&self,
|
||||
cancellation_token: &CancellationToken,
|
||||
) -> CodexResult<McpToolCatalogSnapshot> {
|
||||
let manager = self.services.mcp_connection_manager.read().await;
|
||||
let has_servers = manager.has_servers();
|
||||
let tools = manager
|
||||
.list_all_tools()
|
||||
.or_cancel(cancellation_token)
|
||||
.await?;
|
||||
drop(manager);
|
||||
Ok(self
|
||||
.merge_advertised_mcp_catalog_for_model(McpToolCatalogUpdate { has_servers, tools })
|
||||
.await)
|
||||
}
|
||||
|
||||
async fn list_model_visible_mcp_catalog(&self) -> McpToolCatalogSnapshot {
|
||||
let manager = self.services.mcp_connection_manager.read().await;
|
||||
let has_servers = manager.has_servers();
|
||||
let tools = manager.list_all_tools().await;
|
||||
drop(manager);
|
||||
self.merge_advertised_mcp_catalog_for_model(McpToolCatalogUpdate { has_servers, tools })
|
||||
.await
|
||||
}
|
||||
|
||||
// Clears connector IDs that were accumulated for explicit selection.
|
||||
pub(crate) async fn clear_connector_selection(&self) {
|
||||
let mut state = self.state.lock().await;
|
||||
@@ -2387,6 +2447,8 @@ impl Session {
|
||||
}
|
||||
InitialHistory::Resumed(resumed_history) => {
|
||||
let rollout_items = resumed_history.history;
|
||||
self.hydrate_advertised_mcp_catalog_from_rollout(&rollout_items)
|
||||
.await;
|
||||
let previous_turn_settings = self
|
||||
.apply_rollout_reconstruction(&turn_context, &rollout_items)
|
||||
.await;
|
||||
@@ -2425,6 +2487,8 @@ impl Session {
|
||||
}
|
||||
}
|
||||
InitialHistory::Forked(rollout_items) => {
|
||||
self.hydrate_advertised_mcp_catalog_from_rollout(&rollout_items)
|
||||
.await;
|
||||
self.apply_rollout_reconstruction(&turn_context, &rollout_items)
|
||||
.await;
|
||||
|
||||
@@ -3841,13 +3905,14 @@ impl Session {
|
||||
}
|
||||
}
|
||||
if turn_context.config.include_apps_instructions && turn_context.apps_enabled() {
|
||||
let mcp_connection_manager = self.services.mcp_connection_manager.read().await;
|
||||
let accessible_and_enabled_connectors =
|
||||
connectors::list_accessible_and_enabled_connectors_from_manager(
|
||||
&mcp_connection_manager,
|
||||
&turn_context.config,
|
||||
)
|
||||
.await;
|
||||
let mcp_catalog = self.list_model_visible_mcp_catalog().await;
|
||||
let accessible_and_enabled_connectors = connectors::with_app_enabled_state(
|
||||
connectors::accessible_connectors_from_mcp_tools(&mcp_catalog.tools),
|
||||
&turn_context.config,
|
||||
)
|
||||
.into_iter()
|
||||
.filter(|connector| connector.is_accessible && connector.is_enabled)
|
||||
.collect::<Vec<_>>();
|
||||
if let Some(apps_section) = render_apps_section(&accessible_and_enabled_connectors) {
|
||||
developer_sections.push(apps_section);
|
||||
}
|
||||
@@ -4458,12 +4523,19 @@ impl Session {
|
||||
name: &str,
|
||||
namespace: Option<&str>,
|
||||
) -> Option<ToolInfo> {
|
||||
self.services
|
||||
if let Some(tool) = self
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.resolve_tool_info(name, namespace)
|
||||
.await
|
||||
{
|
||||
return Some(tool);
|
||||
}
|
||||
|
||||
let state = self.state.lock().await;
|
||||
state.resolve_advertised_mcp_tool(name, namespace)
|
||||
}
|
||||
|
||||
pub async fn interrupt_task(self: &Arc<Self>) {
|
||||
@@ -4528,6 +4600,15 @@ impl Session {
|
||||
sandbox_cwd: turn_context.cwd.to_path_buf(),
|
||||
use_legacy_landlock: turn_context.features.use_legacy_landlock(),
|
||||
};
|
||||
let manager = self.services.mcp_connection_manager.read().await;
|
||||
let existing_has_servers = manager.has_servers();
|
||||
let existing_tools = manager.list_all_tools().await;
|
||||
drop(manager);
|
||||
self.merge_advertised_mcp_catalog_for_model(McpToolCatalogUpdate {
|
||||
has_servers: existing_has_servers,
|
||||
tools: existing_tools,
|
||||
})
|
||||
.await;
|
||||
{
|
||||
let mut guard = self.services.mcp_startup_cancellation_token.lock().await;
|
||||
guard.cancel();
|
||||
@@ -6173,15 +6254,10 @@ pub(crate) async fn run_turn(
|
||||
// are normally hidden so we can describe the plugin's currently
|
||||
// usable capabilities for this turn.
|
||||
match sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.list_all_tools()
|
||||
.or_cancel(&cancellation_token)
|
||||
.list_model_visible_mcp_catalog_for_turn(&cancellation_token)
|
||||
.await
|
||||
{
|
||||
Ok(mcp_tools) => mcp_tools,
|
||||
Ok(mcp_catalog) => mcp_catalog.tools,
|
||||
Err(_) if turn_context.apps_enabled() => return None,
|
||||
Err(_) => HashMap::new(),
|
||||
}
|
||||
@@ -7114,13 +7190,11 @@ pub(crate) async fn built_tools(
|
||||
skills_outcome: Option<&SkillLoadOutcome>,
|
||||
cancellation_token: &CancellationToken,
|
||||
) -> CodexResult<Arc<ToolRouter>> {
|
||||
let mcp_connection_manager = sess.services.mcp_connection_manager.read().await;
|
||||
let has_mcp_servers = mcp_connection_manager.has_servers();
|
||||
let all_mcp_tools = mcp_connection_manager
|
||||
.list_all_tools()
|
||||
.or_cancel(cancellation_token)
|
||||
let mcp_catalog = sess
|
||||
.list_model_visible_mcp_catalog_for_turn(cancellation_token)
|
||||
.await?;
|
||||
drop(mcp_connection_manager);
|
||||
let has_mcp_servers = mcp_catalog.has_servers;
|
||||
let all_mcp_tools = mcp_catalog.tools;
|
||||
let loaded_plugins = sess
|
||||
.services
|
||||
.plugins_manager
|
||||
|
||||
@@ -207,7 +207,9 @@ impl Session {
|
||||
active_segment.get_or_insert_with(ActiveReplaySegment::default);
|
||||
active_segment.counts_as_user_turn |= is_user_turn_boundary(response_item);
|
||||
}
|
||||
RolloutItem::EventMsg(_) | RolloutItem::SessionMeta(_) => {}
|
||||
RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::EventMsg(_)
|
||||
| RolloutItem::SessionMeta(_) => {}
|
||||
}
|
||||
|
||||
if base_replacement_history.is_some()
|
||||
@@ -275,6 +277,7 @@ impl Session {
|
||||
history.drop_last_n_user_turns(rollback.num_turns);
|
||||
}
|
||||
RolloutItem::EventMsg(_)
|
||||
| RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::SessionMeta(_) => {}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,8 @@ use crate::tools::format_exec_output_str;
|
||||
use codex_features::Features;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
use codex_mcp::McpToolCatalog;
|
||||
use codex_mcp::McpToolCatalogUpdate;
|
||||
use codex_mcp::ToolInfo;
|
||||
use codex_model_provider_info::ModelProviderInfo;
|
||||
use codex_models_manager::bundled_models_response;
|
||||
@@ -945,6 +947,177 @@ fn mcp_tool_exposure_directly_exposes_explicit_apps_in_large_search_sets() {
|
||||
assert!(deferred_tools.contains_key("mcp__rmcp__tool_0"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn built_tools_keeps_advertised_mcp_tools_when_live_manager_is_empty() {
|
||||
let (session, turn_context, _rx) = make_session_and_context_with_rx().await;
|
||||
session
|
||||
.merge_advertised_mcp_catalog_for_model(McpToolCatalogUpdate {
|
||||
has_servers: true,
|
||||
tools: numbered_mcp_tools(/*count*/ 1),
|
||||
})
|
||||
.await;
|
||||
|
||||
let router = built_tools(
|
||||
session.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
&[],
|
||||
&HashSet::new(),
|
||||
/*skills_outcome*/ None,
|
||||
&CancellationToken::new(),
|
||||
)
|
||||
.await
|
||||
.expect("build tools");
|
||||
|
||||
assert!(
|
||||
router
|
||||
.model_visible_specs()
|
||||
.iter()
|
||||
.any(|spec| spec.name() == "mcp__rmcp__tool_0"),
|
||||
"previously advertised MCP tool should remain model-visible"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn built_tools_keeps_mcp_resource_tools_when_server_catalog_was_advertised() {
|
||||
let (session, turn_context, _rx) = make_session_and_context_with_rx().await;
|
||||
session
|
||||
.merge_advertised_mcp_catalog_for_model(McpToolCatalogUpdate {
|
||||
has_servers: true,
|
||||
tools: HashMap::new(),
|
||||
})
|
||||
.await;
|
||||
|
||||
let router = built_tools(
|
||||
session.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
&[],
|
||||
&HashSet::new(),
|
||||
/*skills_outcome*/ None,
|
||||
&CancellationToken::new(),
|
||||
)
|
||||
.await
|
||||
.expect("build tools");
|
||||
|
||||
let specs = router.model_visible_specs();
|
||||
|
||||
assert!(specs.iter().any(|spec| spec.name() == "list_mcp_resources"));
|
||||
assert!(
|
||||
specs
|
||||
.iter()
|
||||
.any(|spec| spec.name() == "list_mcp_resource_templates")
|
||||
);
|
||||
assert!(specs.iter().any(|spec| spec.name() == "read_mcp_resource"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn build_tool_call_routes_advertised_mcp_tool_when_live_manager_is_empty() {
|
||||
let (session, _turn_context, _rx) = make_session_and_context_with_rx().await;
|
||||
session
|
||||
.merge_advertised_mcp_catalog_for_model(McpToolCatalogUpdate {
|
||||
has_servers: true,
|
||||
tools: numbered_mcp_tools(/*count*/ 1),
|
||||
})
|
||||
.await;
|
||||
|
||||
let call = ToolRouter::build_tool_call(
|
||||
session.as_ref(),
|
||||
ResponseItem::FunctionCall {
|
||||
id: None,
|
||||
name: "mcp__rmcp__tool_0".to_string(),
|
||||
namespace: None,
|
||||
arguments: "{}".to_string(),
|
||||
call_id: "call-1".to_string(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("build tool call")
|
||||
.expect("function call should produce a tool call");
|
||||
|
||||
match call.payload {
|
||||
ToolPayload::Mcp {
|
||||
server,
|
||||
tool,
|
||||
raw_arguments,
|
||||
} => {
|
||||
assert_eq!(server, "rmcp");
|
||||
assert_eq!(tool, "tool_0");
|
||||
assert_eq!(raw_arguments, "{}");
|
||||
}
|
||||
other => panic!("expected MCP payload, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn resumed_history_hydrates_advertised_mcp_tools() {
|
||||
let mut catalog = McpToolCatalog::default();
|
||||
let advertised = catalog
|
||||
.merge(McpToolCatalogUpdate {
|
||||
has_servers: true,
|
||||
tools: numbered_mcp_tools(/*count*/ 1),
|
||||
})
|
||||
.snapshot
|
||||
.to_advertised()
|
||||
.expect("advertised catalog");
|
||||
|
||||
let (session, turn_context, _rx) = make_session_and_context_with_rx().await;
|
||||
session
|
||||
.record_initial_history(InitialHistory::Resumed(ResumedHistory {
|
||||
conversation_id: ThreadId::new(),
|
||||
history: vec![RolloutItem::AdvertisedMcpTools(advertised)],
|
||||
rollout_path: PathBuf::from("/tmp/resume.jsonl"),
|
||||
}))
|
||||
.await;
|
||||
|
||||
let router = built_tools(
|
||||
session.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
&[],
|
||||
&HashSet::new(),
|
||||
/*skills_outcome*/ None,
|
||||
&CancellationToken::new(),
|
||||
)
|
||||
.await
|
||||
.expect("build tools");
|
||||
|
||||
assert!(
|
||||
router
|
||||
.model_visible_specs()
|
||||
.iter()
|
||||
.any(|spec| spec.name() == "mcp__rmcp__tool_0"),
|
||||
"MCP tools advertised before process exit should be restored on resume"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn advertised_mcp_catalog_updates_are_persisted_to_rollout() {
|
||||
let (session, _turn_context, _rx) = make_session_and_context_with_rx().await;
|
||||
let rollout_path = attach_rollout_recorder(&session).await;
|
||||
|
||||
session
|
||||
.merge_advertised_mcp_catalog_for_model(McpToolCatalogUpdate {
|
||||
has_servers: true,
|
||||
tools: numbered_mcp_tools(/*count*/ 1),
|
||||
})
|
||||
.await;
|
||||
session.flush_rollout().await.expect("flush rollout");
|
||||
|
||||
let InitialHistory::Resumed(resumed) = RolloutRecorder::get_rollout_history(&rollout_path)
|
||||
.await
|
||||
.expect("read rollout history")
|
||||
else {
|
||||
panic!("expected resumed rollout history");
|
||||
};
|
||||
let advertised = resumed.history.iter().find_map(|item| match item {
|
||||
RolloutItem::AdvertisedMcpTools(catalog) => Some(catalog),
|
||||
_ => None,
|
||||
});
|
||||
|
||||
assert!(
|
||||
advertised.is_some_and(|catalog| catalog.tools.contains_key("mcp__rmcp__tool_0")),
|
||||
"advertised MCP catalog should be persisted for process resume"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reconstruct_history_matches_live_compactions() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
|
||||
@@ -104,19 +104,6 @@ pub async fn list_accessible_connectors_from_mcp_tools(
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn list_accessible_and_enabled_connectors_from_manager(
|
||||
mcp_connection_manager: &McpConnectionManager,
|
||||
config: &Config,
|
||||
) -> Vec<AppInfo> {
|
||||
with_app_enabled_state(
|
||||
accessible_connectors_from_mcp_tools(&mcp_connection_manager.list_all_tools().await),
|
||||
config,
|
||||
)
|
||||
.into_iter()
|
||||
.filter(|connector| connector.is_accessible && connector.is_enabled)
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub(crate) async fn list_tool_suggest_discoverable_tools_with_auth(
|
||||
config: &Config,
|
||||
auth: Option<&CodexAuth>,
|
||||
|
||||
@@ -10,6 +10,10 @@ use crate::codex::PreviousTurnSettings;
|
||||
use crate::codex::SessionConfiguration;
|
||||
use crate::context_manager::ContextManager;
|
||||
use crate::session_startup_prewarm::SessionStartupPrewarmHandle;
|
||||
use codex_mcp::McpToolCatalog;
|
||||
use codex_mcp::McpToolCatalogMerge;
|
||||
use codex_mcp::McpToolCatalogUpdate;
|
||||
use codex_mcp::ToolInfo;
|
||||
use codex_protocol::protocol::RateLimitSnapshot;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::protocol::TokenUsageInfo;
|
||||
@@ -28,6 +32,7 @@ pub(crate) struct SessionState {
|
||||
/// model/realtime handling on subsequent regular turns (including full-context
|
||||
/// reinjection after resume or `/compact`).
|
||||
previous_turn_settings: Option<PreviousTurnSettings>,
|
||||
advertised_mcp_tools: McpToolCatalog,
|
||||
/// Startup prewarmed session prepared during session initialization.
|
||||
pub(crate) startup_prewarm: Option<SessionStartupPrewarmHandle>,
|
||||
pub(crate) active_connector_selection: HashSet<String>,
|
||||
@@ -48,6 +53,7 @@ impl SessionState {
|
||||
dependency_env: HashMap::new(),
|
||||
mcp_dependency_prompted: HashSet::new(),
|
||||
previous_turn_settings: None,
|
||||
advertised_mcp_tools: McpToolCatalog::default(),
|
||||
startup_prewarm: None,
|
||||
active_connector_selection: HashSet::new(),
|
||||
pending_session_start_source: None,
|
||||
@@ -75,6 +81,21 @@ impl SessionState {
|
||||
self.previous_turn_settings = previous_turn_settings;
|
||||
}
|
||||
|
||||
pub(crate) fn merge_advertised_mcp_tools(
|
||||
&mut self,
|
||||
update: McpToolCatalogUpdate,
|
||||
) -> McpToolCatalogMerge {
|
||||
self.advertised_mcp_tools.merge(update)
|
||||
}
|
||||
|
||||
pub(crate) fn resolve_advertised_mcp_tool(
|
||||
&self,
|
||||
name: &str,
|
||||
namespace: Option<&str>,
|
||||
) -> Option<ToolInfo> {
|
||||
self.advertised_mcp_tools.resolve(name, namespace)
|
||||
}
|
||||
|
||||
pub(crate) fn set_next_turn_is_first(&mut self, value: bool) {
|
||||
self.next_turn_is_first = value;
|
||||
}
|
||||
|
||||
@@ -3,9 +3,12 @@
|
||||
//!
|
||||
//! We intentionally keep these types TS/JSON-schema friendly (via `ts-rs` and
|
||||
//! `schemars`) so they can be embedded in Codex's own protocol structures.
|
||||
use std::collections::HashMap;
|
||||
|
||||
use schemars::JsonSchema;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
use ts_rs::TS;
|
||||
|
||||
/// ID of a request, which can be either a string or an integer.
|
||||
@@ -52,6 +55,43 @@ pub struct Tool {
|
||||
pub meta: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
pub struct AdvertisedMcpToolCatalog {
|
||||
pub has_servers: bool,
|
||||
pub tools: HashMap<String, AdvertisedMcpToolInfo>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
pub struct AdvertisedMcpToolInfo {
|
||||
/// Raw MCP server name used for routing the tool call.
|
||||
pub server_name: String,
|
||||
/// Model-visible tool name used in Responses API tool declarations.
|
||||
pub callable_name: String,
|
||||
/// Model-visible namespace used for deferred tool loading.
|
||||
pub callable_namespace: String,
|
||||
/// Instructions from the MCP server initialize result.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
#[ts(optional)]
|
||||
pub server_instructions: Option<String>,
|
||||
/// Serialized raw MCP tool definition used to rebuild model-visible schemas on resume.
|
||||
pub tool: Value,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
#[ts(optional)]
|
||||
pub connector_id: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
#[ts(optional)]
|
||||
pub connector_name: Option<String>,
|
||||
#[serde(default)]
|
||||
pub plugin_display_names: Vec<String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
#[ts(optional)]
|
||||
pub connector_description: Option<String>,
|
||||
}
|
||||
|
||||
/// A known resource that the server is capable of reading.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
|
||||
@@ -28,6 +28,7 @@ use crate::dynamic_tools::DynamicToolCallRequest;
|
||||
use crate::dynamic_tools::DynamicToolResponse;
|
||||
use crate::dynamic_tools::DynamicToolSpec;
|
||||
use crate::items::TurnItem;
|
||||
use crate::mcp::AdvertisedMcpToolCatalog;
|
||||
use crate::mcp::CallToolResult;
|
||||
use crate::mcp::RequestId;
|
||||
use crate::mcp::Resource as McpResource;
|
||||
@@ -2474,12 +2475,20 @@ impl InitialHistory {
|
||||
InitialHistory::Resumed(resumed) => {
|
||||
resumed.history.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.forked_from_id,
|
||||
_ => None,
|
||||
RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
}
|
||||
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.id),
|
||||
_ => None,
|
||||
RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -2532,12 +2541,20 @@ impl InitialHistory {
|
||||
InitialHistory::Resumed(resumed) => {
|
||||
resumed.history.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.base_instructions.clone(),
|
||||
_ => None,
|
||||
RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
}
|
||||
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.base_instructions.clone(),
|
||||
_ => None,
|
||||
RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -2548,12 +2565,20 @@ impl InitialHistory {
|
||||
InitialHistory::Resumed(resumed) => {
|
||||
resumed.history.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.dynamic_tools.clone(),
|
||||
_ => None,
|
||||
RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
}
|
||||
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.dynamic_tools.clone(),
|
||||
_ => None,
|
||||
RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -2562,7 +2587,11 @@ impl InitialHistory {
|
||||
fn session_cwd_from_items(items: &[RolloutItem]) -> Option<PathBuf> {
|
||||
items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.cwd.clone()),
|
||||
_ => None,
|
||||
RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2782,6 +2811,7 @@ pub struct SessionMetaLine {
|
||||
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
|
||||
pub enum RolloutItem {
|
||||
SessionMeta(SessionMetaLine),
|
||||
AdvertisedMcpTools(AdvertisedMcpToolCatalog),
|
||||
ResponseItem(ResponseItem),
|
||||
Compacted(CompactedItem),
|
||||
TurnContext(TurnContextItem),
|
||||
|
||||
@@ -1065,6 +1065,9 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
|
||||
RolloutItem::Compacted(_) => {
|
||||
// Not included in `head`; skip.
|
||||
}
|
||||
RolloutItem::AdvertisedMcpTools(_) => {
|
||||
// Not included in `head`; skip.
|
||||
}
|
||||
RolloutItem::EventMsg(ev) => {
|
||||
if let EventMsg::UserMessage(user) = ev {
|
||||
summary.saw_user_event = true;
|
||||
@@ -1117,6 +1120,7 @@ pub async fn read_head_for_summary(path: &Path) -> io::Result<Vec<serde_json::Va
|
||||
}
|
||||
}
|
||||
RolloutItem::Compacted(_)
|
||||
| RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => {}
|
||||
}
|
||||
|
||||
@@ -70,7 +70,8 @@ pub fn builder_from_items(
|
||||
) -> Option<ThreadMetadataBuilder> {
|
||||
if let Some(session_meta) = items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => Some(meta_line),
|
||||
RolloutItem::ResponseItem(_)
|
||||
RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
@@ -124,7 +125,8 @@ pub async fn extract_metadata_from_rollout(
|
||||
metadata,
|
||||
memory_mode: items.iter().rev().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
|
||||
RolloutItem::ResponseItem(_)
|
||||
RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
|
||||
@@ -16,9 +16,10 @@ pub fn is_persisted_response_item(item: &RolloutItem, mode: EventPersistenceMode
|
||||
RolloutItem::ResponseItem(item) => should_persist_response_item(item),
|
||||
RolloutItem::EventMsg(ev) => should_persist_event_msg(ev, mode),
|
||||
// Persist Codex executive markers so we can analyze flows (e.g., compaction, API turns).
|
||||
RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {
|
||||
true
|
||||
}
|
||||
RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::SessionMeta(_) => true,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -686,6 +686,9 @@ impl RolloutRecorder {
|
||||
RolloutItem::TurnContext(item) => {
|
||||
items.push(RolloutItem::TurnContext(item));
|
||||
}
|
||||
RolloutItem::AdvertisedMcpTools(item) => {
|
||||
items.push(RolloutItem::AdvertisedMcpTools(item));
|
||||
}
|
||||
RolloutItem::EventMsg(_ev) => {
|
||||
items.push(RolloutItem::EventMsg(_ev));
|
||||
}
|
||||
@@ -1303,6 +1306,7 @@ async fn resume_candidate_matches_cwd(
|
||||
&& let Some(latest_turn_context_cwd) = items.iter().rev().find_map(|item| match item {
|
||||
RolloutItem::TurnContext(turn_context) => Some(turn_context.cwd.as_path()),
|
||||
RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
|
||||
@@ -22,7 +22,7 @@ pub fn apply_rollout_item(
|
||||
RolloutItem::TurnContext(turn_ctx) => apply_turn_context(metadata, turn_ctx),
|
||||
RolloutItem::EventMsg(event) => apply_event_msg(metadata, event),
|
||||
RolloutItem::ResponseItem(item) => apply_response_item(metadata, item),
|
||||
RolloutItem::Compacted(_) => {}
|
||||
RolloutItem::AdvertisedMcpTools(_) | RolloutItem::Compacted(_) => {}
|
||||
}
|
||||
if metadata.model_provider.is_empty() {
|
||||
metadata.model_provider = default_provider.to_string();
|
||||
@@ -36,9 +36,10 @@ pub fn rollout_item_affects_thread_metadata(item: &RolloutItem) -> bool {
|
||||
RolloutItem::EventMsg(
|
||||
EventMsg::TokenCount(_) | EventMsg::UserMessage(_) | EventMsg::ThreadNameUpdated(_),
|
||||
) => true,
|
||||
RolloutItem::EventMsg(_) | RolloutItem::ResponseItem(_) | RolloutItem::Compacted(_) => {
|
||||
false
|
||||
}
|
||||
RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::EventMsg(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -954,7 +954,8 @@ SELECT
|
||||
pub(super) fn extract_dynamic_tools(items: &[RolloutItem]) -> Option<Option<Vec<DynamicToolSpec>>> {
|
||||
items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.dynamic_tools.clone()),
|
||||
RolloutItem::ResponseItem(_)
|
||||
RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
@@ -964,7 +965,8 @@ pub(super) fn extract_dynamic_tools(items: &[RolloutItem]) -> Option<Option<Vec<
|
||||
pub(super) fn extract_memory_mode(items: &[RolloutItem]) -> Option<String> {
|
||||
items.iter().rev().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
|
||||
RolloutItem::ResponseItem(_)
|
||||
RolloutItem::AdvertisedMcpTools(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
|
||||
Reference in New Issue
Block a user