Compare commits

...

9 Commits

Author SHA1 Message Date
Sama Setty
c4d81e84a2 Merge remote-tracking branch 'origin/main' into dev/ssetty/explicit-plugin-app-startup 2026-04-23 10:52:36 -07:00
Sama Setty
6d80f40ded codex: fix tui thread session state CI failures (#17679) 2026-04-22 23:48:41 -07:00
Sama Setty
f8b5041508 Merge remote-tracking branch 'origin/main' into dev/ssetty/explicit-plugin-app-startup 2026-04-22 23:06:37 -07:00
Sama Setty
09f299924a Merge remote-tracking branch 'origin/main' into dev/ssetty/explicit-plugin-app-startup
# Conflicts:
#	codex-rs/core/src/session/tests.rs
2026-04-17 14:29:09 -07:00
Sama Setty
e4cf72bf96 codex: fix CI failure on PR #17679 2026-04-17 10:40:25 -07:00
Sama Setty
8623dfb4e4 Merge origin/main into dev/ssetty/explicit-plugin-app-startup 2026-04-16 20:15:34 -07:00
Sama Setty
3581373031 codex: expose app tools for explicit plugin mentions 2026-04-13 13:01:22 -07:00
Sama Setty
efba567e6b changes 2026-04-13 12:43:39 -07:00
Sama Setty
db19109589 Wait for explicit plugin app tools 2026-04-13 12:38:34 -07:00
5 changed files with 349 additions and 12 deletions

View File

@@ -202,6 +202,8 @@ use self::turn::AssistantMessageStreamParsers;
#[cfg(test)]
use self::turn::collect_explicit_app_ids_from_skill_items;
#[cfg(test)]
use self::turn::explicitly_enabled_connectors_missing_from_tools;
#[cfg(test)]
use self::turn::filter_connectors_for_input;
use self::turn::realtime_text_for_event;
use self::turn_context::TurnContext;

View File

@@ -539,6 +539,86 @@ fn assistant_message_stream_parsers_seed_plan_parser_across_added_and_delta_boun
assert!(tail.plan_segments.is_empty());
}
fn make_mcp_tool(
server_name: &str,
tool_name: &str,
connector_id: Option<&str>,
connector_name: Option<&str>,
) -> ToolInfo {
let tool_namespace = if server_name == codex_mcp::CODEX_APPS_MCP_SERVER_NAME {
connector_name
.map(codex_connectors::metadata::sanitize_name)
.map(|connector_name| format!("mcp__{server_name}__{connector_name}"))
.unwrap_or_else(|| server_name.to_string())
} else {
format!("mcp__{server_name}__")
};
ToolInfo {
server_name: server_name.to_string(),
callable_name: tool_name.to_string(),
callable_namespace: tool_namespace,
server_instructions: None,
tool: rmcp::model::Tool {
name: tool_name.to_string().into(),
title: None,
description: Some(format!("Test tool: {tool_name}").into()),
input_schema: Arc::new(rmcp::model::JsonObject::default()),
output_schema: None,
annotations: None,
execution: None,
icons: None,
meta: None,
},
connector_id: connector_id.map(str::to_string),
connector_name: connector_name.map(str::to_string),
plugin_display_names: Vec::new(),
connector_description: None,
}
}
#[test]
fn explicit_connectors_missing_from_tools_requests_startup_wait() {
let connector_ids = HashSet::from(["slack".to_string()]);
assert!(explicitly_enabled_connectors_missing_from_tools(
&connector_ids,
&HashMap::new(),
));
}
#[test]
fn explicit_connectors_present_in_tools_skip_startup_wait() {
let connector_ids = HashSet::from(["slack".to_string()]);
let mcp_tools = HashMap::from([(
"mcp__codex_apps__slack_search".to_string(),
make_mcp_tool(
codex_mcp::CODEX_APPS_MCP_SERVER_NAME,
"slack_search",
Some("slack"),
Some("Slack"),
),
)]);
assert!(!explicitly_enabled_connectors_missing_from_tools(
&connector_ids,
&mcp_tools,
));
}
#[test]
fn explicit_connectors_ignore_non_app_tool_matches() {
let connector_ids = HashSet::from(["slack".to_string()]);
let mcp_tools = HashMap::from([(
"mcp__rmcp__slack_search".to_string(),
make_mcp_tool("rmcp", "slack_search", Some("slack"), Some("Slack")),
)]);
assert!(explicitly_enabled_connectors_missing_from_tools(
&connector_ids,
&mcp_tools,
));
}
#[test]
fn validated_network_policy_amendment_host_allows_normalized_match() {
let amendment = NetworkPolicyAmendment {

View File

@@ -1,8 +1,3 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use crate::SkillInjections;
use crate::SkillLoadOutcome;
use crate::build_skill_injections;
@@ -71,6 +66,8 @@ use codex_hooks::HookEvent;
use codex_hooks::HookEventAfterAgent;
use codex_hooks::HookPayload;
use codex_hooks::HookResult;
use codex_mcp::CODEX_APPS_MCP_SERVER_NAME;
use codex_mcp::ToolInfo;
use codex_protocol::config_types::ModeKind;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
@@ -107,6 +104,11 @@ use codex_utils_stream_parser::strip_citations;
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::stream::FuturesOrdered;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use tracing::error;
@@ -117,6 +119,26 @@ use tracing::trace;
use tracing::trace_span;
use tracing::warn;
const EXPLICIT_APPS_READY_TIMEOUT: Duration = Duration::from_secs(3);
// Explicit plugin mentions imply app usage even when the user did not
// mention the app directly. If those connectors are still missing from the
// current `codex_apps` snapshot, give startup a bounded chance to finish.
#[cfg_attr(test, allow(dead_code))]
pub(super) fn explicitly_enabled_connectors_missing_from_tools(
connector_ids: &HashSet<String>,
mcp_tools: &HashMap<String, ToolInfo>,
) -> bool {
let accessible_connector_ids = connectors::accessible_connectors_from_mcp_tools(mcp_tools)
.into_iter()
.map(|connector| connector.id)
.collect::<HashSet<_>>();
connector_ids
.iter()
.any(|connector_id| !accessible_connector_ids.contains(connector_id))
}
/// Takes a user message as input and runs a loop where, at each sampling request, the model
/// replies with either:
///
@@ -178,15 +200,24 @@ pub(crate) async fn run_turn(
// enabled plugins, then converted into turn-scoped guidance below.
let mentioned_plugins =
collect_explicit_plugin_mentions(&input, loaded_plugins.capability_summaries());
let mut explicitly_enabled_connectors = collect_explicit_app_ids(&input);
if turn_context.apps_enabled() {
// Treat app connectors declared by explicit plugin mentions as
// explicit for this turn too. That lets them participate in both
// startup waiting and first-turn tool exposure.
explicitly_enabled_connectors.extend(mentioned_plugins.iter().flat_map(|plugin| {
plugin
.app_connector_ids
.iter()
.map(|connector_id| connector_id.0.clone())
}));
}
let mcp_tools = if turn_context.apps_enabled() || !mentioned_plugins.is_empty() {
// Plugin mentions need raw MCP/app inventory even when app tools
// are normally hidden so we can describe the plugin's currently
// usable capabilities for this turn.
match sess
.services
.mcp_connection_manager
.read()
.await
let mcp_connection_manager = sess.services.mcp_connection_manager.read().await;
let mut mcp_tools = match mcp_connection_manager
.list_all_tools()
.or_cancel(&cancellation_token)
.await
@@ -194,7 +225,37 @@ pub(crate) async fn run_turn(
Ok(mcp_tools) => mcp_tools,
Err(_) if turn_context.apps_enabled() => return None,
Err(_) => HashMap::new(),
};
if turn_context.apps_enabled()
&& !explicitly_enabled_connectors.is_empty()
&& explicitly_enabled_connectors_missing_from_tools(
&explicitly_enabled_connectors,
&mcp_tools,
)
{
// The caller explicitly asked for one of these app-backed surfaces,
// but the first snapshot still does not expose it, so wait
// briefly and then rebuild the tool view for this turn.
let codex_apps_ready = match mcp_connection_manager
.wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, EXPLICIT_APPS_READY_TIMEOUT)
.or_cancel(&cancellation_token)
.await
{
Ok(codex_apps_ready) => codex_apps_ready,
Err(_) => return None,
};
if codex_apps_ready {
mcp_tools = match mcp_connection_manager
.list_all_tools()
.or_cancel(&cancellation_token)
.await
{
Ok(mcp_tools) => mcp_tools,
Err(_) => return None,
};
}
}
mcp_tools
} else {
HashMap::new()
};
@@ -277,7 +338,6 @@ pub(crate) async fn run_turn(
.filter_map(crate::plugins::PluginCapabilitySummary::telemetry_metadata)
.collect::<Vec<_>>();
let mut explicitly_enabled_connectors = collect_explicit_app_ids(&input);
explicitly_enabled_connectors.extend(collect_explicit_app_ids_from_skill_items(
&skill_items,
&available_connectors,

View File

@@ -1,6 +1,7 @@
use anyhow::Result;
use serde_json::Value;
use serde_json::json;
use std::time::Duration;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::Request;
@@ -38,6 +39,13 @@ impl AppsTestServer {
}
pub async fn mount_searchable(server: &MockServer) -> Result<Self> {
Self::mount_searchable_with_tools_list_delay(server, Duration::ZERO).await
}
pub async fn mount_searchable_with_tools_list_delay(
server: &MockServer,
tools_list_delay: Duration,
) -> Result<Self> {
mount_oauth_metadata(server).await;
mount_connectors_directory(server).await;
mount_streamable_http_json_rpc(
@@ -45,6 +53,7 @@ impl AppsTestServer {
CONNECTOR_NAME.to_string(),
CONNECTOR_DESCRIPTION.to_string(),
/*searchable*/ true,
tools_list_delay,
)
.await;
Ok(Self {
@@ -55,6 +64,15 @@ impl AppsTestServer {
pub async fn mount_with_connector_name(
server: &MockServer,
connector_name: &str,
) -> Result<Self> {
Self::mount_with_connector_name_and_tools_list_delay(server, connector_name, Duration::ZERO)
.await
}
pub async fn mount_with_connector_name_and_tools_list_delay(
server: &MockServer,
connector_name: &str,
tools_list_delay: Duration,
) -> Result<Self> {
mount_oauth_metadata(server).await;
mount_connectors_directory(server).await;
@@ -63,6 +81,7 @@ impl AppsTestServer {
connector_name.to_string(),
CONNECTOR_DESCRIPTION.to_string(),
/*searchable*/ false,
tools_list_delay,
)
.await;
Ok(Self {
@@ -119,6 +138,7 @@ async fn mount_streamable_http_json_rpc(
connector_name: String,
connector_description: String,
searchable: bool,
tools_list_delay: Duration,
) {
Mock::given(method("POST"))
.and(path_regex("^/api/codex/apps/?$"))
@@ -126,6 +146,7 @@ async fn mount_streamable_http_json_rpc(
connector_name,
connector_description,
searchable,
tools_list_delay,
})
.mount(server)
.await;
@@ -135,6 +156,7 @@ struct CodexAppsJsonRpcResponder {
connector_name: String,
connector_description: String,
searchable: bool,
tools_list_delay: Duration,
}
impl Respond for CodexAppsJsonRpcResponder {
@@ -305,7 +327,9 @@ impl Respond for CodexAppsJsonRpcResponder {
}));
}
}
ResponseTemplate::new(200).set_body_json(response)
ResponseTemplate::new(200)
.set_delay(self.tools_list_delay)
.set_body_json(response)
}
"tools/call" => {
let id = body.get("id").cloned().unwrap_or(Value::Null);

View File

@@ -8,6 +8,8 @@ use std::time::Instant;
use anyhow::Result;
use codex_features::Feature;
use codex_login::CodexAuth;
use codex_mcp::codex_apps_tools_cache_key;
use codex_models_manager::bundled_models_response;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use core_test_support::apps_test_server::AppsTestServer;
@@ -21,6 +23,8 @@ use core_test_support::stdio_server_bin;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_with_timeout;
use sha1::Digest;
use sha1::Sha1;
use tempfile::TempDir;
use wiremock::MockServer;
@@ -334,6 +338,173 @@ async fn explicit_plugin_mentions_inject_plugin_guidance() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn explicit_plugin_mentions_wait_for_plugin_apps_to_finish_starting() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let apps_server = AppsTestServer::mount_with_connector_name_and_tools_list_delay(
&server,
"Google Calendar",
Duration::from_secs(1),
)
.await?;
let mock = mount_sse_once(
&server,
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
)
.await;
let codex_home = Arc::new(TempDir::new()?);
let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing();
let user_key = codex_apps_tools_cache_key(Some(&auth));
let user_key_json = serde_json::to_string(&user_key)?;
let mut hasher = Sha1::new();
hasher.update(user_key_json.as_bytes());
let cache_path = codex_home
.path()
.join("cache/codex_apps_tools")
.join(format!("{:x}.json", hasher.finalize()));
std::fs::create_dir_all(
cache_path
.parent()
.expect("codex apps tools cache path should have a parent"),
)?;
std::fs::write(
&cache_path,
serde_json::to_vec_pretty(&serde_json::json!({
"schema_version": 2,
"tools": [],
}))?,
)?;
write_plugin_app_plugin(codex_home.as_ref());
let mut builder = test_codex()
.with_home(codex_home)
.with_auth(auth)
.with_config(move |config| {
config
.features
.enable(Feature::Apps)
.expect("test config should allow feature update");
config.chatgpt_base_url = apps_server.chatgpt_base_url;
});
let codex = builder.build(&server).await?.codex;
codex
.submit(Op::UserInput {
environments: None,
items: vec![codex_protocol::user_input::UserInput::Mention {
name: "sample".into(),
path: format!("plugin://{SAMPLE_PLUGIN_CONFIG_NAME}"),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let request = mock.single_request();
let developer_messages = request.message_input_texts("developer");
assert!(
developer_messages
.iter()
.any(|text| text.contains("Apps from this plugin")),
"expected plugin app guidance after waiting for codex apps startup: {developer_messages:?}"
);
let request_tools = tool_names(&request.body_json());
assert!(
request_tools
.iter()
.any(|name| name == "mcp__codex_apps__google_calendar"),
"expected plugin app namespace after waiting for codex apps startup: {request_tools:?}"
);
assert!(
request
.tool_by_name("mcp__codex_apps__google_calendar", "_create_event")
.is_some(),
"expected plugin app create tool after waiting for codex apps startup: {request_tools:?}"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn explicit_plugin_mentions_directly_expose_plugin_apps_with_tool_search() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let apps_server = AppsTestServer::mount_searchable(&server).await?;
let mock = mount_sse_once(
&server,
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
)
.await;
let codex_home = Arc::new(TempDir::new()?);
write_plugin_app_plugin(codex_home.as_ref());
let mut builder = test_codex()
.with_home(codex_home)
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_model("gpt-5.4")
.with_config(move |config| {
config
.features
.enable(Feature::Apps)
.expect("test config should allow feature update");
config
.features
.enable(Feature::ToolSearch)
.expect("test config should allow feature update");
config.chatgpt_base_url = apps_server.chatgpt_base_url;
config.model = Some("gpt-5.4".to_string());
let mut model_catalog = bundled_models_response()
.unwrap_or_else(|err| panic!("bundled models.json should parse: {err}"));
let model = model_catalog
.models
.iter_mut()
.find(|model| model.slug == "gpt-5.4")
.expect("gpt-5.4 exists in bundled models.json");
model.supports_search_tool = true;
config.model_catalog = Some(model_catalog);
});
let codex = builder.build(&server).await?.codex;
codex
.submit(Op::UserInput {
environments: None,
items: vec![codex_protocol::user_input::UserInput::Mention {
name: "sample".into(),
path: format!("plugin://{SAMPLE_PLUGIN_CONFIG_NAME}"),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let request_tools = tool_names(&mock.single_request().body_json());
assert!(
!request_tools.iter().any(|name| name == "tool_search"),
"expected explicit plugin mention to bypass tool_search and expose its app tools directly: {request_tools:?}"
);
assert!(
request_tools
.iter()
.any(|name| name == "mcp__codex_apps__calendar"),
"expected explicit plugin mention to directly expose its app namespace: {request_tools:?}"
);
assert!(
mock.single_request()
.tool_by_name("mcp__codex_apps__calendar", "_create_event")
.is_some(),
"expected explicit plugin mention to directly expose its app create tool: {request_tools:?}"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn explicit_plugin_mentions_track_plugin_used_analytics() -> Result<()> {
skip_if_no_network!(Ok(()));