Compare commits

...

6 Commits

Author SHA1 Message Date
Matthew Zeng
ce5c64c584 update 2026-04-14 11:22:48 -07:00
Matthew Zeng
a7b9a6b0d0 update 2026-04-14 11:04:00 -07:00
Matthew Zeng
6c5899c897 Merge branch 'main' of github.com:openai/codex into dev/mzeng/keep-mcp-tools-consistent 2026-04-14 10:41:52 -07:00
Matthew Zeng
5acd567e6a update 2026-04-14 10:25:15 -07:00
Matthew Zeng
5150ed5c47 update 2026-04-13 23:26:29 -07:00
Matthew Zeng
43c8d9858f Keep advertised MCP tools sticky across requests 2026-04-13 22:16:36 -07:00
17 changed files with 652 additions and 54 deletions

View File

@@ -218,7 +218,9 @@ impl ThreadHistoryBuilder {
RolloutItem::EventMsg(event) => self.handle_event(event),
RolloutItem::Compacted(payload) => self.handle_compacted(payload),
RolloutItem::ResponseItem(item) => self.handle_response_item(item),
RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {}
RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::TurnContext(_)
| RolloutItem::SessionMeta(_) => {}
}
}

View File

@@ -1,6 +1,7 @@
pub(crate) mod mcp;
pub(crate) mod mcp_connection_manager;
pub(crate) mod mcp_tool_names;
mod tool_catalog;
pub use mcp::CODEX_APPS_MCP_SERVER_NAME;
pub use mcp::McpAuthStatusEntry;
@@ -44,3 +45,7 @@ pub use mcp_connection_manager::ToolInfo;
pub use mcp_connection_manager::codex_apps_tools_cache_key;
pub use mcp_connection_manager::declared_openai_file_input_param_names;
pub use mcp_connection_manager::filter_non_codex_apps_mcp_tools_only;
pub use tool_catalog::McpToolCatalog;
pub use tool_catalog::McpToolCatalogMerge;
pub use tool_catalog::McpToolCatalogSnapshot;
pub use tool_catalog::McpToolCatalogUpdate;

View File

@@ -0,0 +1,248 @@
use std::collections::HashMap;
use codex_protocol::mcp::AdvertisedMcpToolCatalog;
use codex_protocol::mcp::AdvertisedMcpToolInfo;
use tracing::warn;
use crate::mcp_connection_manager::ToolInfo;
#[derive(Debug, Default)]
pub struct McpToolCatalog {
has_servers: bool,
tools: HashMap<String, ToolInfo>,
}
#[derive(Debug)]
pub struct McpToolCatalogUpdate {
pub has_servers: bool,
pub tools: HashMap<String, ToolInfo>,
}
#[derive(Debug)]
pub struct McpToolCatalogSnapshot {
pub has_servers: bool,
pub tools: HashMap<String, ToolInfo>,
}
#[derive(Debug)]
pub struct McpToolCatalogMerge {
pub snapshot: McpToolCatalogSnapshot,
pub changed: bool,
}
impl McpToolCatalog {
pub fn merge(&mut self, update: McpToolCatalogUpdate) -> McpToolCatalogMerge {
let McpToolCatalogUpdate { has_servers, tools } = update;
let mut changed = false;
if has_servers && !self.has_servers {
self.has_servers = true;
changed = true;
}
for (name, tool) in tools {
if let std::collections::hash_map::Entry::Vacant(entry) = self.tools.entry(name) {
entry.insert(tool);
changed = true;
}
}
McpToolCatalogMerge {
snapshot: self.snapshot(),
changed,
}
}
pub fn snapshot(&self) -> McpToolCatalogSnapshot {
McpToolCatalogSnapshot {
has_servers: self.has_servers,
tools: self.tools.clone(),
}
}
pub fn resolve(&self, name: &str, namespace: Option<&str>) -> Option<ToolInfo> {
let qualified_name = qualified_mcp_tool_name(name, namespace);
self.tools.get(&qualified_name).cloned()
}
}
impl McpToolCatalogUpdate {
pub fn from_advertised(catalog: AdvertisedMcpToolCatalog) -> Self {
let tools = catalog
.tools
.into_iter()
.filter_map(
|(name, tool)| match advertised_tool_info_into_tool_info(tool) {
Ok(tool) => Some((name, tool)),
Err(err) => {
warn!("failed to hydrate advertised MCP tool {name} from rollout: {err}");
None
}
},
)
.collect();
Self {
has_servers: catalog.has_servers,
tools,
}
}
}
impl McpToolCatalogSnapshot {
pub fn to_advertised(&self) -> Option<AdvertisedMcpToolCatalog> {
if !self.has_servers && self.tools.is_empty() {
return None;
}
let tools = self
.tools
.iter()
.filter_map(
|(name, tool)| match advertised_tool_info_from_tool_info(tool) {
Ok(tool) => Some((name.clone(), tool)),
Err(err) => {
warn!("failed to persist advertised MCP tool {name}: {err}");
None
}
},
)
.collect();
Some(AdvertisedMcpToolCatalog {
has_servers: self.has_servers,
tools,
})
}
}
fn advertised_tool_info_from_tool_info(
tool: &ToolInfo,
) -> serde_json::Result<AdvertisedMcpToolInfo> {
Ok(AdvertisedMcpToolInfo {
server_name: tool.server_name.clone(),
callable_name: tool.callable_name.clone(),
callable_namespace: tool.callable_namespace.clone(),
server_instructions: tool.server_instructions.clone(),
tool: serde_json::to_value(&tool.tool)?,
connector_id: tool.connector_id.clone(),
connector_name: tool.connector_name.clone(),
plugin_display_names: tool.plugin_display_names.clone(),
connector_description: tool.connector_description.clone(),
})
}
fn advertised_tool_info_into_tool_info(
tool: AdvertisedMcpToolInfo,
) -> serde_json::Result<ToolInfo> {
Ok(ToolInfo {
server_name: tool.server_name,
callable_name: tool.callable_name,
callable_namespace: tool.callable_namespace,
server_instructions: tool.server_instructions,
tool: serde_json::from_value(tool.tool)?,
connector_id: tool.connector_id,
connector_name: tool.connector_name,
plugin_display_names: tool.plugin_display_names,
connector_description: tool.connector_description,
})
}
fn qualified_mcp_tool_name(name: &str, namespace: Option<&str>) -> String {
match namespace {
Some(namespace) if name.starts_with(namespace) => name.to_string(),
Some(namespace) => format!("{namespace}{name}"),
None => name.to_string(),
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use rmcp::model::Tool;
use serde_json::Map as JsonObject;
use super::*;
fn tool_info(server_name: &str, callable_name: &str) -> ToolInfo {
ToolInfo {
server_name: server_name.to_string(),
callable_name: callable_name.to_string(),
callable_namespace: format!("mcp__{server_name}__"),
server_instructions: None,
tool: Tool {
name: callable_name.to_string().into(),
title: None,
description: Some(format!("Test tool: {callable_name}").into()),
input_schema: Arc::new(JsonObject::default()),
output_schema: None,
annotations: None,
execution: None,
icons: None,
meta: None,
},
connector_id: None,
connector_name: None,
plugin_display_names: Vec::new(),
connector_description: None,
}
}
#[test]
fn merge_preserves_first_seen_tool_definition() {
let mut catalog = McpToolCatalog::default();
catalog.merge(McpToolCatalogUpdate {
has_servers: true,
tools: HashMap::from([(
"mcp__observability__get_dashboard".to_string(),
tool_info("observability", "get_dashboard"),
)]),
});
catalog.merge(McpToolCatalogUpdate {
has_servers: true,
tools: HashMap::from([(
"mcp__observability__get_dashboard".to_string(),
tool_info("other", "replacement"),
)]),
});
let resolved = catalog
.resolve("mcp__observability__get_dashboard", /*namespace*/ None)
.expect("tool should resolve");
assert_eq!(resolved.server_name, "observability");
assert_eq!(resolved.tool.name.as_ref(), "get_dashboard");
}
#[test]
fn resolve_matches_namespaced_tool_calls() {
let mut catalog = McpToolCatalog::default();
catalog.merge(McpToolCatalogUpdate {
has_servers: true,
tools: HashMap::from([(
"mcp__observability__get_dashboard".to_string(),
tool_info("observability", "get_dashboard"),
)]),
});
let resolved = catalog
.resolve("get_dashboard", Some("mcp__observability__"))
.expect("tool should resolve");
assert_eq!(resolved.server_name, "observability");
}
#[test]
fn merge_preserves_advertised_server_presence() {
let mut catalog = McpToolCatalog::default();
let merged = catalog.merge(McpToolCatalogUpdate {
has_servers: true,
tools: HashMap::new(),
});
assert!(merged.changed);
assert!(merged.snapshot.has_servers);
let merged = catalog.merge(McpToolCatalogUpdate {
has_servers: false,
tools: HashMap::new(),
});
assert!(!merged.changed);
assert!(merged.snapshot.has_servers);
assert!(merged.snapshot.tools.is_empty());
}
}

View File

@@ -117,6 +117,7 @@ fn keep_forked_rollout_item(item: &RolloutItem) -> bool {
| ResponseItem::Other,
) => false,
RolloutItem::Compacted(_)
| RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::EventMsg(_)
| RolloutItem::SessionMeta(_)
| RolloutItem::TurnContext(_) => true,

View File

@@ -78,6 +78,8 @@ use codex_login::CodexAuth;
use codex_login::auth_env_telemetry::collect_auth_env_telemetry;
use codex_login::default_client::originator;
use codex_mcp::McpConnectionManager;
use codex_mcp::McpToolCatalogSnapshot;
use codex_mcp::McpToolCatalogUpdate;
use codex_mcp::SandboxState;
use codex_mcp::ToolInfo;
use codex_mcp::codex_apps_tools_cache_key;
@@ -2358,6 +2360,64 @@ impl Session {
state.get_connector_selection()
}
async fn merge_advertised_mcp_catalog_for_model(
&self,
update: McpToolCatalogUpdate,
) -> McpToolCatalogSnapshot {
let merged = {
let mut state = self.state.lock().await;
state.merge_advertised_mcp_tools(update)
};
if merged.changed {
self.persist_advertised_mcp_catalog(&merged.snapshot).await;
}
merged.snapshot
}
async fn hydrate_advertised_mcp_catalog_from_rollout(&self, rollout_items: &[RolloutItem]) {
let mut state = self.state.lock().await;
for item in rollout_items {
if let RolloutItem::AdvertisedMcpTools(catalog) = item {
state.merge_advertised_mcp_tools(McpToolCatalogUpdate::from_advertised(
catalog.clone(),
));
}
}
}
async fn persist_advertised_mcp_catalog(&self, snapshot: &McpToolCatalogSnapshot) {
let Some(catalog) = snapshot.to_advertised() else {
return;
};
self.persist_rollout_items(&[RolloutItem::AdvertisedMcpTools(catalog)])
.await;
}
async fn list_model_visible_mcp_catalog_for_turn(
&self,
cancellation_token: &CancellationToken,
) -> CodexResult<McpToolCatalogSnapshot> {
let manager = self.services.mcp_connection_manager.read().await;
let has_servers = manager.has_servers();
let tools = manager
.list_all_tools()
.or_cancel(cancellation_token)
.await?;
drop(manager);
Ok(self
.merge_advertised_mcp_catalog_for_model(McpToolCatalogUpdate { has_servers, tools })
.await)
}
async fn list_model_visible_mcp_catalog(&self) -> McpToolCatalogSnapshot {
let manager = self.services.mcp_connection_manager.read().await;
let has_servers = manager.has_servers();
let tools = manager.list_all_tools().await;
drop(manager);
self.merge_advertised_mcp_catalog_for_model(McpToolCatalogUpdate { has_servers, tools })
.await
}
// Clears connector IDs that were accumulated for explicit selection.
pub(crate) async fn clear_connector_selection(&self) {
let mut state = self.state.lock().await;
@@ -2387,6 +2447,8 @@ impl Session {
}
InitialHistory::Resumed(resumed_history) => {
let rollout_items = resumed_history.history;
self.hydrate_advertised_mcp_catalog_from_rollout(&rollout_items)
.await;
let previous_turn_settings = self
.apply_rollout_reconstruction(&turn_context, &rollout_items)
.await;
@@ -2425,6 +2487,8 @@ impl Session {
}
}
InitialHistory::Forked(rollout_items) => {
self.hydrate_advertised_mcp_catalog_from_rollout(&rollout_items)
.await;
self.apply_rollout_reconstruction(&turn_context, &rollout_items)
.await;
@@ -3841,13 +3905,14 @@ impl Session {
}
}
if turn_context.config.include_apps_instructions && turn_context.apps_enabled() {
let mcp_connection_manager = self.services.mcp_connection_manager.read().await;
let accessible_and_enabled_connectors =
connectors::list_accessible_and_enabled_connectors_from_manager(
&mcp_connection_manager,
&turn_context.config,
)
.await;
let mcp_catalog = self.list_model_visible_mcp_catalog().await;
let accessible_and_enabled_connectors = connectors::with_app_enabled_state(
connectors::accessible_connectors_from_mcp_tools(&mcp_catalog.tools),
&turn_context.config,
)
.into_iter()
.filter(|connector| connector.is_accessible && connector.is_enabled)
.collect::<Vec<_>>();
if let Some(apps_section) = render_apps_section(&accessible_and_enabled_connectors) {
developer_sections.push(apps_section);
}
@@ -4458,12 +4523,19 @@ impl Session {
name: &str,
namespace: Option<&str>,
) -> Option<ToolInfo> {
self.services
if let Some(tool) = self
.services
.mcp_connection_manager
.read()
.await
.resolve_tool_info(name, namespace)
.await
{
return Some(tool);
}
let state = self.state.lock().await;
state.resolve_advertised_mcp_tool(name, namespace)
}
pub async fn interrupt_task(self: &Arc<Self>) {
@@ -4528,6 +4600,15 @@ impl Session {
sandbox_cwd: turn_context.cwd.to_path_buf(),
use_legacy_landlock: turn_context.features.use_legacy_landlock(),
};
let manager = self.services.mcp_connection_manager.read().await;
let existing_has_servers = manager.has_servers();
let existing_tools = manager.list_all_tools().await;
drop(manager);
self.merge_advertised_mcp_catalog_for_model(McpToolCatalogUpdate {
has_servers: existing_has_servers,
tools: existing_tools,
})
.await;
{
let mut guard = self.services.mcp_startup_cancellation_token.lock().await;
guard.cancel();
@@ -6173,15 +6254,10 @@ pub(crate) async fn run_turn(
// are normally hidden so we can describe the plugin's currently
// usable capabilities for this turn.
match sess
.services
.mcp_connection_manager
.read()
.await
.list_all_tools()
.or_cancel(&cancellation_token)
.list_model_visible_mcp_catalog_for_turn(&cancellation_token)
.await
{
Ok(mcp_tools) => mcp_tools,
Ok(mcp_catalog) => mcp_catalog.tools,
Err(_) if turn_context.apps_enabled() => return None,
Err(_) => HashMap::new(),
}
@@ -7114,13 +7190,11 @@ pub(crate) async fn built_tools(
skills_outcome: Option<&SkillLoadOutcome>,
cancellation_token: &CancellationToken,
) -> CodexResult<Arc<ToolRouter>> {
let mcp_connection_manager = sess.services.mcp_connection_manager.read().await;
let has_mcp_servers = mcp_connection_manager.has_servers();
let all_mcp_tools = mcp_connection_manager
.list_all_tools()
.or_cancel(cancellation_token)
let mcp_catalog = sess
.list_model_visible_mcp_catalog_for_turn(cancellation_token)
.await?;
drop(mcp_connection_manager);
let has_mcp_servers = mcp_catalog.has_servers;
let all_mcp_tools = mcp_catalog.tools;
let loaded_plugins = sess
.services
.plugins_manager

View File

@@ -207,7 +207,9 @@ impl Session {
active_segment.get_or_insert_with(ActiveReplaySegment::default);
active_segment.counts_as_user_turn |= is_user_turn_boundary(response_item);
}
RolloutItem::EventMsg(_) | RolloutItem::SessionMeta(_) => {}
RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::EventMsg(_)
| RolloutItem::SessionMeta(_) => {}
}
if base_replacement_history.is_some()
@@ -275,6 +277,7 @@ impl Session {
history.drop_last_n_user_turns(rollback.num_turns);
}
RolloutItem::EventMsg(_)
| RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::TurnContext(_)
| RolloutItem::SessionMeta(_) => {}
}

View File

@@ -18,6 +18,8 @@ use crate::tools::format_exec_output_str;
use codex_features::Features;
use codex_login::CodexAuth;
use codex_mcp::CODEX_APPS_MCP_SERVER_NAME;
use codex_mcp::McpToolCatalog;
use codex_mcp::McpToolCatalogUpdate;
use codex_mcp::ToolInfo;
use codex_model_provider_info::ModelProviderInfo;
use codex_models_manager::bundled_models_response;
@@ -945,6 +947,177 @@ fn mcp_tool_exposure_directly_exposes_explicit_apps_in_large_search_sets() {
assert!(deferred_tools.contains_key("mcp__rmcp__tool_0"));
}
#[tokio::test]
async fn built_tools_keeps_advertised_mcp_tools_when_live_manager_is_empty() {
let (session, turn_context, _rx) = make_session_and_context_with_rx().await;
session
.merge_advertised_mcp_catalog_for_model(McpToolCatalogUpdate {
has_servers: true,
tools: numbered_mcp_tools(/*count*/ 1),
})
.await;
let router = built_tools(
session.as_ref(),
turn_context.as_ref(),
&[],
&HashSet::new(),
/*skills_outcome*/ None,
&CancellationToken::new(),
)
.await
.expect("build tools");
assert!(
router
.model_visible_specs()
.iter()
.any(|spec| spec.name() == "mcp__rmcp__tool_0"),
"previously advertised MCP tool should remain model-visible"
);
}
#[tokio::test]
async fn built_tools_keeps_mcp_resource_tools_when_server_catalog_was_advertised() {
let (session, turn_context, _rx) = make_session_and_context_with_rx().await;
session
.merge_advertised_mcp_catalog_for_model(McpToolCatalogUpdate {
has_servers: true,
tools: HashMap::new(),
})
.await;
let router = built_tools(
session.as_ref(),
turn_context.as_ref(),
&[],
&HashSet::new(),
/*skills_outcome*/ None,
&CancellationToken::new(),
)
.await
.expect("build tools");
let specs = router.model_visible_specs();
assert!(specs.iter().any(|spec| spec.name() == "list_mcp_resources"));
assert!(
specs
.iter()
.any(|spec| spec.name() == "list_mcp_resource_templates")
);
assert!(specs.iter().any(|spec| spec.name() == "read_mcp_resource"));
}
#[tokio::test]
async fn build_tool_call_routes_advertised_mcp_tool_when_live_manager_is_empty() {
let (session, _turn_context, _rx) = make_session_and_context_with_rx().await;
session
.merge_advertised_mcp_catalog_for_model(McpToolCatalogUpdate {
has_servers: true,
tools: numbered_mcp_tools(/*count*/ 1),
})
.await;
let call = ToolRouter::build_tool_call(
session.as_ref(),
ResponseItem::FunctionCall {
id: None,
name: "mcp__rmcp__tool_0".to_string(),
namespace: None,
arguments: "{}".to_string(),
call_id: "call-1".to_string(),
},
)
.await
.expect("build tool call")
.expect("function call should produce a tool call");
match call.payload {
ToolPayload::Mcp {
server,
tool,
raw_arguments,
} => {
assert_eq!(server, "rmcp");
assert_eq!(tool, "tool_0");
assert_eq!(raw_arguments, "{}");
}
other => panic!("expected MCP payload, got {other:?}"),
}
}
#[tokio::test]
async fn resumed_history_hydrates_advertised_mcp_tools() {
let mut catalog = McpToolCatalog::default();
let advertised = catalog
.merge(McpToolCatalogUpdate {
has_servers: true,
tools: numbered_mcp_tools(/*count*/ 1),
})
.snapshot
.to_advertised()
.expect("advertised catalog");
let (session, turn_context, _rx) = make_session_and_context_with_rx().await;
session
.record_initial_history(InitialHistory::Resumed(ResumedHistory {
conversation_id: ThreadId::new(),
history: vec![RolloutItem::AdvertisedMcpTools(advertised)],
rollout_path: PathBuf::from("/tmp/resume.jsonl"),
}))
.await;
let router = built_tools(
session.as_ref(),
turn_context.as_ref(),
&[],
&HashSet::new(),
/*skills_outcome*/ None,
&CancellationToken::new(),
)
.await
.expect("build tools");
assert!(
router
.model_visible_specs()
.iter()
.any(|spec| spec.name() == "mcp__rmcp__tool_0"),
"MCP tools advertised before process exit should be restored on resume"
);
}
#[tokio::test]
async fn advertised_mcp_catalog_updates_are_persisted_to_rollout() {
let (session, _turn_context, _rx) = make_session_and_context_with_rx().await;
let rollout_path = attach_rollout_recorder(&session).await;
session
.merge_advertised_mcp_catalog_for_model(McpToolCatalogUpdate {
has_servers: true,
tools: numbered_mcp_tools(/*count*/ 1),
})
.await;
session.flush_rollout().await.expect("flush rollout");
let InitialHistory::Resumed(resumed) = RolloutRecorder::get_rollout_history(&rollout_path)
.await
.expect("read rollout history")
else {
panic!("expected resumed rollout history");
};
let advertised = resumed.history.iter().find_map(|item| match item {
RolloutItem::AdvertisedMcpTools(catalog) => Some(catalog),
_ => None,
});
assert!(
advertised.is_some_and(|catalog| catalog.tools.contains_key("mcp__rmcp__tool_0")),
"advertised MCP catalog should be persisted for process resume"
);
}
#[tokio::test]
async fn reconstruct_history_matches_live_compactions() {
let (session, turn_context) = make_session_and_context().await;

View File

@@ -104,19 +104,6 @@ pub async fn list_accessible_connectors_from_mcp_tools(
)
}
pub(crate) async fn list_accessible_and_enabled_connectors_from_manager(
mcp_connection_manager: &McpConnectionManager,
config: &Config,
) -> Vec<AppInfo> {
with_app_enabled_state(
accessible_connectors_from_mcp_tools(&mcp_connection_manager.list_all_tools().await),
config,
)
.into_iter()
.filter(|connector| connector.is_accessible && connector.is_enabled)
.collect()
}
pub(crate) async fn list_tool_suggest_discoverable_tools_with_auth(
config: &Config,
auth: Option<&CodexAuth>,

View File

@@ -10,6 +10,10 @@ use crate::codex::PreviousTurnSettings;
use crate::codex::SessionConfiguration;
use crate::context_manager::ContextManager;
use crate::session_startup_prewarm::SessionStartupPrewarmHandle;
use codex_mcp::McpToolCatalog;
use codex_mcp::McpToolCatalogMerge;
use codex_mcp::McpToolCatalogUpdate;
use codex_mcp::ToolInfo;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TokenUsageInfo;
@@ -28,6 +32,7 @@ pub(crate) struct SessionState {
/// model/realtime handling on subsequent regular turns (including full-context
/// reinjection after resume or `/compact`).
previous_turn_settings: Option<PreviousTurnSettings>,
advertised_mcp_tools: McpToolCatalog,
/// Startup prewarmed session prepared during session initialization.
pub(crate) startup_prewarm: Option<SessionStartupPrewarmHandle>,
pub(crate) active_connector_selection: HashSet<String>,
@@ -48,6 +53,7 @@ impl SessionState {
dependency_env: HashMap::new(),
mcp_dependency_prompted: HashSet::new(),
previous_turn_settings: None,
advertised_mcp_tools: McpToolCatalog::default(),
startup_prewarm: None,
active_connector_selection: HashSet::new(),
pending_session_start_source: None,
@@ -75,6 +81,21 @@ impl SessionState {
self.previous_turn_settings = previous_turn_settings;
}
pub(crate) fn merge_advertised_mcp_tools(
&mut self,
update: McpToolCatalogUpdate,
) -> McpToolCatalogMerge {
self.advertised_mcp_tools.merge(update)
}
pub(crate) fn resolve_advertised_mcp_tool(
&self,
name: &str,
namespace: Option<&str>,
) -> Option<ToolInfo> {
self.advertised_mcp_tools.resolve(name, namespace)
}
pub(crate) fn set_next_turn_is_first(&mut self, value: bool) {
self.next_turn_is_first = value;
}

View File

@@ -3,9 +3,12 @@
//!
//! We intentionally keep these types TS/JSON-schema friendly (via `ts-rs` and
//! `schemars`) so they can be embedded in Codex's own protocol structures.
use std::collections::HashMap;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use ts_rs::TS;
/// ID of a request, which can be either a string or an integer.
@@ -52,6 +55,43 @@ pub struct Tool {
pub meta: Option<serde_json::Value>,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct AdvertisedMcpToolCatalog {
pub has_servers: bool,
pub tools: HashMap<String, AdvertisedMcpToolInfo>,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct AdvertisedMcpToolInfo {
/// Raw MCP server name used for routing the tool call.
pub server_name: String,
/// Model-visible tool name used in Responses API tool declarations.
pub callable_name: String,
/// Model-visible namespace used for deferred tool loading.
pub callable_namespace: String,
/// Instructions from the MCP server initialize result.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub server_instructions: Option<String>,
/// Serialized raw MCP tool definition used to rebuild model-visible schemas on resume.
pub tool: Value,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub connector_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub connector_name: Option<String>,
#[serde(default)]
pub plugin_display_names: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub connector_description: Option<String>,
}
/// A known resource that the server is capable of reading.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]

View File

@@ -28,6 +28,7 @@ use crate::dynamic_tools::DynamicToolCallRequest;
use crate::dynamic_tools::DynamicToolResponse;
use crate::dynamic_tools::DynamicToolSpec;
use crate::items::TurnItem;
use crate::mcp::AdvertisedMcpToolCatalog;
use crate::mcp::CallToolResult;
use crate::mcp::RequestId;
use crate::mcp::Resource as McpResource;
@@ -2474,12 +2475,20 @@ impl InitialHistory {
InitialHistory::Resumed(resumed) => {
resumed.history.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.forked_from_id,
_ => None,
RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})
}
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.id),
_ => None,
RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
}),
}
}
@@ -2532,12 +2541,20 @@ impl InitialHistory {
InitialHistory::Resumed(resumed) => {
resumed.history.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.base_instructions.clone(),
_ => None,
RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})
}
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.base_instructions.clone(),
_ => None,
RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
}),
}
}
@@ -2548,12 +2565,20 @@ impl InitialHistory {
InitialHistory::Resumed(resumed) => {
resumed.history.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.dynamic_tools.clone(),
_ => None,
RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})
}
InitialHistory::Forked(items) => items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.dynamic_tools.clone(),
_ => None,
RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
}),
}
}
@@ -2562,7 +2587,11 @@ impl InitialHistory {
fn session_cwd_from_items(items: &[RolloutItem]) -> Option<PathBuf> {
items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.cwd.clone()),
_ => None,
RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})
}
@@ -2782,6 +2811,7 @@ pub struct SessionMetaLine {
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
pub enum RolloutItem {
SessionMeta(SessionMetaLine),
AdvertisedMcpTools(AdvertisedMcpToolCatalog),
ResponseItem(ResponseItem),
Compacted(CompactedItem),
TurnContext(TurnContextItem),

View File

@@ -1065,6 +1065,9 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
RolloutItem::Compacted(_) => {
// Not included in `head`; skip.
}
RolloutItem::AdvertisedMcpTools(_) => {
// Not included in `head`; skip.
}
RolloutItem::EventMsg(ev) => {
if let EventMsg::UserMessage(user) = ev {
summary.saw_user_event = true;
@@ -1117,6 +1120,7 @@ pub async fn read_head_for_summary(path: &Path) -> io::Result<Vec<serde_json::Va
}
}
RolloutItem::Compacted(_)
| RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => {}
}

View File

@@ -70,7 +70,8 @@ pub fn builder_from_items(
) -> Option<ThreadMetadataBuilder> {
if let Some(session_meta) = items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line),
RolloutItem::ResponseItem(_)
RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
@@ -124,7 +125,8 @@ pub async fn extract_metadata_from_rollout(
metadata,
memory_mode: items.iter().rev().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
RolloutItem::ResponseItem(_)
RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,

View File

@@ -16,9 +16,10 @@ pub fn is_persisted_response_item(item: &RolloutItem, mode: EventPersistenceMode
RolloutItem::ResponseItem(item) => should_persist_response_item(item),
RolloutItem::EventMsg(ev) => should_persist_event_msg(ev, mode),
// Persist Codex executive markers so we can analyze flows (e.g., compaction, API turns).
RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {
true
}
RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::SessionMeta(_) => true,
}
}

View File

@@ -686,6 +686,9 @@ impl RolloutRecorder {
RolloutItem::TurnContext(item) => {
items.push(RolloutItem::TurnContext(item));
}
RolloutItem::AdvertisedMcpTools(item) => {
items.push(RolloutItem::AdvertisedMcpTools(item));
}
RolloutItem::EventMsg(_ev) => {
items.push(RolloutItem::EventMsg(_ev));
}
@@ -1303,6 +1306,7 @@ async fn resume_candidate_matches_cwd(
&& let Some(latest_turn_context_cwd) = items.iter().rev().find_map(|item| match item {
RolloutItem::TurnContext(turn_context) => Some(turn_context.cwd.as_path()),
RolloutItem::SessionMeta(_)
| RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::EventMsg(_) => None,

View File

@@ -22,7 +22,7 @@ pub fn apply_rollout_item(
RolloutItem::TurnContext(turn_ctx) => apply_turn_context(metadata, turn_ctx),
RolloutItem::EventMsg(event) => apply_event_msg(metadata, event),
RolloutItem::ResponseItem(item) => apply_response_item(metadata, item),
RolloutItem::Compacted(_) => {}
RolloutItem::AdvertisedMcpTools(_) | RolloutItem::Compacted(_) => {}
}
if metadata.model_provider.is_empty() {
metadata.model_provider = default_provider.to_string();
@@ -36,9 +36,10 @@ pub fn rollout_item_affects_thread_metadata(item: &RolloutItem) -> bool {
RolloutItem::EventMsg(
EventMsg::TokenCount(_) | EventMsg::UserMessage(_) | EventMsg::ThreadNameUpdated(_),
) => true,
RolloutItem::EventMsg(_) | RolloutItem::ResponseItem(_) | RolloutItem::Compacted(_) => {
false
}
RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::EventMsg(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_) => false,
}
}

View File

@@ -954,7 +954,8 @@ SELECT
pub(super) fn extract_dynamic_tools(items: &[RolloutItem]) -> Option<Option<Vec<DynamicToolSpec>>> {
items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.dynamic_tools.clone()),
RolloutItem::ResponseItem(_)
RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
@@ -964,7 +965,8 @@ pub(super) fn extract_dynamic_tools(items: &[RolloutItem]) -> Option<Option<Vec<
pub(super) fn extract_memory_mode(items: &[RolloutItem]) -> Option<String> {
items.iter().rev().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
RolloutItem::ResponseItem(_)
RolloutItem::AdvertisedMcpTools(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,