Compare commits

...

3 Commits

Author SHA1 Message Date
pakrym-oai
a797d50b66 Move plugin install suggest behind extensibility 2026-05-06 12:58:53 -07:00
pakrym-oai
7c1a3455fa Move plugin install suggestion handler to app-server 2026-05-06 11:53:26 -07:00
pakrym-oai
39c707410b Add core extensibility registry 2026-05-06 11:37:38 -07:00
39 changed files with 1003 additions and 712 deletions

View File

@@ -90,6 +90,7 @@ mod mcp_refresh;
mod message_processor;
mod models;
mod outgoing_message;
mod plugin_install_suggest;
mod request_processors;
mod request_serialization;
mod server_request_error;

View File

@@ -180,17 +180,17 @@ mod tests {
.expect("refresh tests require state db");
let thread_store = thread_store_from_config(&good_config, state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
let thread_manager = Arc::new(ThreadManager::new(
let thread_manager = Arc::new(ThreadManager::builder(
&good_config,
auth_manager,
SessionSource::Exec,
Arc::new(EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
"11111111-1111-4111-8111-111111111111".to_string(),
));
)
.session_source(SessionSource::Exec)
.build());
thread_manager.start_thread(good_config).await?;
thread_manager.start_thread(bad_config).await?;

View File

@@ -295,17 +295,27 @@ impl MessageProcessor {
// resumed, or forked threads to a different persistence backend/root.
let thread_store = thread_store_from_config(config.as_ref(), state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
let thread_manager = Arc::new(ThreadManager::new(
let plugin_install_suggest_client_names:
crate::plugin_install_suggest::AppServerClientNames =
Arc::new(std::sync::RwLock::new(Default::default()));
let thread_manager = Arc::new(ThreadManager::builder(
config.as_ref(),
auth_manager.clone(),
session_source,
environment_manager,
Some(analytics_events_client.clone()),
state_db.clone(),
Arc::clone(&thread_store),
agent_graph_store.clone(),
installation_id,
));
)
.register_extension(Arc::new(
crate::plugin_install_suggest::PluginInstallSuggestToolProvider::new(
auth_manager.clone(),
Arc::clone(&plugin_install_suggest_client_names),
),
) as Arc<dyn codex_core::ToolProvider>)
.session_source(session_source)
.analytics_events_client(analytics_events_client.clone())
.build());
thread_manager
.plugins_manager()
.set_analytics_events_client(analytics_events_client.clone());
@@ -402,6 +412,7 @@ impl MessageProcessor {
Arc::clone(&thread_list_state_permit),
thread_goal_processor.clone(),
Some(state_db.clone()),
Arc::clone(&plugin_install_suggest_client_names),
);
let turn_processor = TurnRequestProcessor::new(
auth_manager.clone(),
@@ -415,6 +426,7 @@ impl MessageProcessor {
thread_state_manager,
thread_watch_manager,
thread_list_state_permit,
plugin_install_suggest_client_names,
);
if matches!(plugin_startup_tasks, crate::PluginStartupTasks::Start) {
// Keep plugin startup warmups aligned at app-server startup.

View File

@@ -0,0 +1,397 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::RwLock;
use codex_app_server_protocol::AppInfo;
use codex_config::types::ToolSuggestDisabledTool;
use codex_core::config::Config;
use codex_core::config::edit::ConfigEdit;
use codex_core::config::edit::ConfigEditsBuilder;
use codex_core::connectors;
use codex_core::extensibility::AnyToolHandler;
use codex_core::extensibility::FunctionCallError;
use codex_core::extensibility::FunctionToolOutput;
use codex_core::extensibility::ToolHandler;
use codex_core::extensibility::ToolInvocation;
use codex_core::extensibility::ToolKind;
use codex_core::extensibility::ToolProvider;
use codex_core::extensibility::ToolProviderContext;
use codex_core_plugins::PluginsManager;
use codex_features::Feature;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_mcp::CODEX_APPS_MCP_SERVER_NAME;
use codex_protocol::ThreadId;
use codex_rmcp_client::ElicitationAction;
use codex_rmcp_client::ElicitationResponse;
use codex_tools::DiscoverableTool;
use codex_tools::DiscoverableToolAction;
use codex_tools::REQUEST_PLUGIN_INSTALL_PERSIST_ALWAYS_VALUE;
use codex_tools::REQUEST_PLUGIN_INSTALL_PERSIST_KEY;
use codex_tools::REQUEST_PLUGIN_INSTALL_TOOL_NAME;
use codex_tools::RequestPluginInstallArgs;
use codex_tools::RequestPluginInstallResult;
use codex_tools::ToolName;
use codex_tools::all_requested_connectors_picked_up;
use codex_tools::build_request_plugin_install_elicitation_request;
use codex_tools::verified_connector_install_completed;
use codex_utils_absolute_path::AbsolutePathBuf;
use serde_json::Value;
use tracing::warn;
pub(crate) type AppServerClientNames = Arc<RwLock<HashMap<String, String>>>;
pub(crate) fn set_app_server_client_name(
app_server_client_names: &AppServerClientNames,
thread_id: ThreadId,
app_server_client_name: Option<String>,
) {
let Ok(mut app_server_client_names) = app_server_client_names.write() else {
return;
};
let thread_id = thread_id.to_string();
match app_server_client_name {
Some(app_server_client_name) => {
app_server_client_names.insert(thread_id, app_server_client_name);
}
None => {
app_server_client_names.remove(&thread_id);
}
}
}
pub(crate) struct PluginInstallSuggestToolProvider {
auth_manager: Arc<AuthManager>,
app_server_client_names: AppServerClientNames,
}
impl PluginInstallSuggestToolProvider {
pub(crate) fn new(
auth_manager: Arc<AuthManager>,
app_server_client_names: AppServerClientNames,
) -> Self {
Self {
auth_manager,
app_server_client_names,
}
}
}
impl ToolProvider for PluginInstallSuggestToolProvider {
fn handlers(&self, context: ToolProviderContext) -> Vec<Arc<dyn AnyToolHandler>> {
let config = context.config();
let conversation_id = context.conversation_id();
let features = config.features.get();
if !features.enabled(Feature::ToolSuggest)
|| !features.enabled(Feature::Apps)
|| !features.enabled(Feature::Plugins)
{
return Vec::new();
}
let Ok(app_server_client_names) = self.app_server_client_names.read() else {
return Vec::new();
};
if app_server_client_names
.get(&conversation_id)
.is_some_and(|client_name| client_name == "codex-tui")
{
return Vec::new();
}
drop(app_server_client_names);
vec![Arc::new(RequestPluginInstallHandler {
config,
auth_manager: Arc::clone(&self.auth_manager),
plugins_manager: context.plugins_manager(),
conversation_id,
turn_id: context.turn_id(),
})]
}
}
struct RequestPluginInstallHandler {
config: Arc<Config>,
auth_manager: Arc<AuthManager>,
plugins_manager: Arc<PluginsManager>,
conversation_id: String,
turn_id: String,
}
impl ToolHandler for RequestPluginInstallHandler {
type Output = FunctionToolOutput;
fn tool_name(&self) -> ToolName {
ToolName::plain(REQUEST_PLUGIN_INSTALL_TOOL_NAME)
}
fn kind(&self) -> ToolKind {
ToolKind::Function
}
fn handle(
&self,
invocation: ToolInvocation,
) -> impl std::future::Future<Output = Result<Self::Output, FunctionCallError>> + Send {
self.handle_request_plugin_install(invocation)
}
}
impl RequestPluginInstallHandler {
async fn handle_request_plugin_install(
&self,
invocation: ToolInvocation,
) -> Result<FunctionToolOutput, FunctionCallError> {
let arguments = invocation.function_arguments(REQUEST_PLUGIN_INSTALL_TOOL_NAME)?;
let args: RequestPluginInstallArgs = serde_json::from_str(arguments).map_err(|err| {
FunctionCallError::RespondToModel(format!("failed to parse function arguments: {err}"))
})?;
let suggest_reason = args.suggest_reason.trim();
if suggest_reason.is_empty() {
return Err(FunctionCallError::RespondToModel(
"suggest_reason must not be empty".to_string(),
));
}
if args.action_type != DiscoverableToolAction::Install {
return Err(FunctionCallError::RespondToModel(
"plugin install requests currently support only action_type=\"install\""
.to_string(),
));
}
let auth = self.auth_manager.auth().await;
let mcp_tools = invocation.list_mcp_tools().await;
let accessible_connectors = connectors::with_app_enabled_state(
connectors::accessible_connectors_from_mcp_tools(&mcp_tools),
self.config.as_ref(),
);
let discoverable_tools = connectors::list_tool_suggest_discoverable_tools_with_auth(
self.config.as_ref(),
auth.as_ref(),
&accessible_connectors,
)
.await
.map_err(|err| {
FunctionCallError::RespondToModel(format!(
"plugin install requests are unavailable right now: {err}"
))
})?;
let tool = discoverable_tools
.into_iter()
.find(|tool| tool.tool_type() == args.tool_type && tool.id() == args.tool_id)
.ok_or_else(|| {
FunctionCallError::RespondToModel(format!(
"tool_id must match one of the discoverable tools exposed by {REQUEST_PLUGIN_INSTALL_TOOL_NAME}"
))
})?;
let params = build_request_plugin_install_elicitation_request(
CODEX_APPS_MCP_SERVER_NAME,
self.conversation_id.clone(),
self.turn_id.clone(),
&args,
suggest_reason,
&tool,
);
let request_id = format!("request_plugin_install_{}", invocation.call_id());
let response = invocation
.request_mcp_server_elicitation(request_id, params)
.await;
if let Some(response) = response.as_ref()
&& maybe_persist_disabled_install_request(&self.config.codex_home, &tool, response)
.await
{
invocation.reload_user_config_layer().await;
}
let user_confirmed = response
.as_ref()
.is_some_and(|response| response.action == ElicitationAction::Accept);
let completed = if user_confirmed {
self.verify_request_plugin_install_completed(&invocation, &tool, auth.as_ref())
.await
} else {
false
};
if completed && let DiscoverableTool::Connector(connector) = &tool {
invocation
.merge_connector_selection(HashSet::from([connector.id.clone()]))
.await;
}
let content = serde_json::to_string(&RequestPluginInstallResult {
completed,
user_confirmed,
tool_type: args.tool_type,
action_type: args.action_type,
tool_id: tool.id().to_string(),
tool_name: tool.name().to_string(),
suggest_reason: suggest_reason.to_string(),
})
.map_err(|err| {
FunctionCallError::Fatal(format!(
"failed to serialize {REQUEST_PLUGIN_INSTALL_TOOL_NAME} response: {err}"
))
})?;
Ok(FunctionToolOutput::from_text(content, Some(true)))
}
async fn verify_request_plugin_install_completed(
&self,
invocation: &ToolInvocation,
tool: &DiscoverableTool,
auth: Option<&CodexAuth>,
) -> bool {
match tool {
DiscoverableTool::Connector(connector) => self
.refresh_missing_requested_connectors(
invocation,
auth,
std::slice::from_ref(&connector.id),
connector.id.as_str(),
)
.await
.is_some_and(|accessible_connectors| {
verified_connector_install_completed(
connector.id.as_str(),
&accessible_connectors,
)
}),
DiscoverableTool::Plugin(plugin) => {
invocation.reload_user_config_layer().await;
let completed = verified_plugin_install_completed(
plugin.id.as_str(),
self.config.as_ref(),
self.plugins_manager.as_ref(),
);
let _ = self
.refresh_missing_requested_connectors(
invocation,
auth,
&plugin.app_connector_ids,
plugin.id.as_str(),
)
.await;
completed
}
}
}
async fn refresh_missing_requested_connectors(
&self,
invocation: &ToolInvocation,
auth: Option<&CodexAuth>,
expected_connector_ids: &[String],
tool_id: &str,
) -> Option<Vec<AppInfo>> {
if expected_connector_ids.is_empty() {
return Some(Vec::new());
}
let mcp_tools = invocation.list_mcp_tools().await;
let accessible_connectors = connectors::with_app_enabled_state(
connectors::accessible_connectors_from_mcp_tools(&mcp_tools),
self.config.as_ref(),
);
if all_requested_connectors_picked_up(expected_connector_ids, &accessible_connectors) {
return Some(accessible_connectors);
}
match invocation.hard_refresh_codex_apps_tools_cache().await {
Ok(mcp_tools) => {
let accessible_connectors = connectors::with_app_enabled_state(
connectors::accessible_connectors_from_mcp_tools(&mcp_tools),
self.config.as_ref(),
);
connectors::refresh_accessible_connectors_cache_from_mcp_tools(
self.config.as_ref(),
auth,
&mcp_tools,
);
Some(accessible_connectors)
}
Err(err) => {
warn!(
"failed to refresh codex apps tools cache after plugin install request for {tool_id}: {err:#}"
);
None
}
}
}
}
async fn maybe_persist_disabled_install_request(
codex_home: &AbsolutePathBuf,
tool: &DiscoverableTool,
response: &ElicitationResponse,
) -> bool {
if !request_plugin_install_response_requests_persistent_disable(response) {
return false;
}
if let Err(err) = persist_disabled_install_request(codex_home, tool).await {
warn!(
error = %err,
tool_id = tool.id(),
"failed to persist disabled tool suggestion"
);
return false;
}
true
}
fn request_plugin_install_response_requests_persistent_disable(
response: &ElicitationResponse,
) -> bool {
if response.action != ElicitationAction::Decline {
return false;
}
response
.meta
.as_ref()
.and_then(Value::as_object)
.and_then(|meta| meta.get(REQUEST_PLUGIN_INSTALL_PERSIST_KEY))
.and_then(Value::as_str)
== Some(REQUEST_PLUGIN_INSTALL_PERSIST_ALWAYS_VALUE)
}
async fn persist_disabled_install_request(
codex_home: &AbsolutePathBuf,
tool: &DiscoverableTool,
) -> anyhow::Result<()> {
ConfigEditsBuilder::new(codex_home)
.with_edits([ConfigEdit::AddToolSuggestDisabledTool(
disabled_install_request(tool),
)])
.apply()
.await
}
fn disabled_install_request(tool: &DiscoverableTool) -> ToolSuggestDisabledTool {
match tool {
DiscoverableTool::Connector(connector) => {
ToolSuggestDisabledTool::connector(connector.id.as_str())
}
DiscoverableTool::Plugin(plugin) => ToolSuggestDisabledTool::plugin(plugin.id.as_str()),
}
}
fn verified_plugin_install_completed(
tool_id: &str,
config: &Config,
plugins_manager: &PluginsManager,
) -> bool {
let plugins_input = config.plugins_config_input();
plugins_manager
.list_marketplaces_for_config(&plugins_input, &[])
.ok()
.into_iter()
.flat_map(|outcome| outcome.marketplaces)
.flat_map(|marketplace| marketplace.plugins.into_iter())
.any(|plugin| plugin.id == tool_id && plugin.installed)
}

View File

@@ -316,6 +316,8 @@ pub(crate) struct ThreadRequestProcessor {
pub(super) thread_list_state_permit: Arc<Semaphore>,
pub(super) thread_goal_processor: ThreadGoalRequestProcessor,
pub(super) state_db: Option<StateDbHandle>,
pub(super) plugin_install_suggest_client_names:
crate::plugin_install_suggest::AppServerClientNames,
pub(super) background_tasks: TaskTracker,
}
@@ -336,6 +338,7 @@ impl ThreadRequestProcessor {
thread_list_state_permit: Arc<Semaphore>,
thread_goal_processor: ThreadGoalRequestProcessor,
state_db: Option<StateDbHandle>,
plugin_install_suggest_client_names: crate::plugin_install_suggest::AppServerClientNames,
) -> Self {
Self {
auth_manager,
@@ -352,6 +355,7 @@ impl ThreadRequestProcessor {
thread_list_state_permit,
thread_goal_processor,
state_db,
plugin_install_suggest_client_names,
background_tasks: TaskTracker::new(),
}
}
@@ -657,10 +661,12 @@ impl ThreadRequestProcessor {
}
async fn set_app_server_client_info(
plugin_install_suggest_client_names: &crate::plugin_install_suggest::AppServerClientNames,
thread: &CodexThread,
app_server_client_name: Option<String>,
app_server_client_version: Option<String>,
) -> Result<(), JSONRPCErrorError> {
let client_name_for_tool_provider = app_server_client_name.clone();
let mcp_elicitations_auto_deny = xcode_26_4_mcp_elicitations_auto_deny(
app_server_client_name.as_deref(),
app_server_client_version.as_deref(),
@@ -672,7 +678,13 @@ impl ThreadRequestProcessor {
mcp_elicitations_auto_deny,
)
.await
.map_err(|err| internal_error(format!("failed to set app server client info: {err}")))
.map_err(|err| internal_error(format!("failed to set app server client info: {err}")))?;
crate::plugin_install_suggest::set_app_server_client_name(
plugin_install_suggest_client_names,
thread.session_configured().thread_id,
client_name_for_tool_provider,
);
Ok(())
}
async fn finalize_thread_teardown(&self, thread_id: ThreadId) {
@@ -849,6 +861,8 @@ impl ThreadRequestProcessor {
let config_manager = self.config_manager.clone();
let outgoing = Arc::clone(&listener_task_context.outgoing);
let error_request_id = request_id.clone();
let plugin_install_suggest_client_names =
Arc::clone(&self.plugin_install_suggest_client_names);
let thread_start_task = async move {
if let Err(error) = Self::thread_start_task(
listener_task_context,
@@ -865,6 +879,7 @@ impl ThreadRequestProcessor {
service_name,
experimental_raw_events,
request_trace,
plugin_install_suggest_client_names,
)
.await
{
@@ -949,6 +964,7 @@ impl ThreadRequestProcessor {
service_name: Option<String>,
experimental_raw_events: bool,
request_trace: Option<W3cTraceContext>,
plugin_install_suggest_client_names: crate::plugin_install_suggest::AppServerClientNames,
) -> Result<(), JSONRPCErrorError> {
let requested_cwd = typesafe_overrides.cwd.clone();
let mut config = config_manager
@@ -1081,6 +1097,7 @@ impl ThreadRequestProcessor {
})?;
Self::set_app_server_client_info(
&plugin_install_suggest_client_names,
thread.as_ref(),
app_server_client_name,
app_server_client_version,
@@ -2404,6 +2421,7 @@ impl ThreadRequestProcessor {
..
}) => {
if let Err(err) = Self::set_app_server_client_info(
&self.plugin_install_suggest_client_names,
codex_thread.as_ref(),
app_server_client_name,
app_server_client_version,
@@ -2638,6 +2656,7 @@ impl ThreadRequestProcessor {
)
.await?;
Self::set_app_server_client_info(
&self.plugin_install_suggest_client_names,
existing_thread.as_ref(),
app_server_client_name,
app_server_client_version,
@@ -3053,6 +3072,7 @@ impl ThreadRequestProcessor {
})?;
Self::set_app_server_client_info(
&self.plugin_install_suggest_client_names,
forked_thread.as_ref(),
app_server_client_name,
app_server_client_version,

View File

@@ -13,6 +13,7 @@ pub(crate) struct TurnRequestProcessor {
thread_state_manager: ThreadStateManager,
thread_watch_manager: ThreadWatchManager,
thread_list_state_permit: Arc<Semaphore>,
plugin_install_suggest_client_names: crate::plugin_install_suggest::AppServerClientNames,
}
impl TurnRequestProcessor {
@@ -29,6 +30,7 @@ impl TurnRequestProcessor {
thread_state_manager: ThreadStateManager,
thread_watch_manager: ThreadWatchManager,
thread_list_state_permit: Arc<Semaphore>,
plugin_install_suggest_client_names: crate::plugin_install_suggest::AppServerClientNames,
) -> Self {
Self {
auth_manager,
@@ -42,6 +44,7 @@ impl TurnRequestProcessor {
thread_state_manager,
thread_watch_manager,
thread_list_state_permit,
plugin_install_suggest_client_names,
}
}
@@ -330,7 +333,7 @@ impl TurnRequestProcessor {
.inspect_err(|error| {
self.track_error_response(&request_id, error, /*error_type*/ None);
})?;
Self::set_app_server_client_info(
self.set_app_server_client_info(
thread.as_ref(),
app_server_client_name,
app_server_client_version,
@@ -541,10 +544,12 @@ impl TurnRequestProcessor {
}
async fn set_app_server_client_info(
&self,
thread: &CodexThread,
app_server_client_name: Option<String>,
app_server_client_version: Option<String>,
) -> Result<(), JSONRPCErrorError> {
let client_name_for_tool_provider = app_server_client_name.clone();
let mcp_elicitations_auto_deny = xcode_26_4_mcp_elicitations_auto_deny(
app_server_client_name.as_deref(),
app_server_client_version.as_deref(),
@@ -556,7 +561,13 @@ impl TurnRequestProcessor {
mcp_elicitations_auto_deny,
)
.await
.map_err(|err| internal_error(format!("failed to set app server client info: {err}")))
.map_err(|err| internal_error(format!("failed to set app server client info: {err}")))?;
crate::plugin_install_suggest::set_app_server_client_name(
&self.plugin_install_suggest_client_names,
thread.session_configured().thread_id,
client_name_for_tool_provider,
);
Ok(())
}
async fn turn_steer_inner(

View File

@@ -24,13 +24,21 @@ pub use codex_config::types::TuiKeymap;
pub use codex_config::types::TuiNotificationSettings;
pub use codex_config::types::UriBasedFileOpener;
pub use codex_core::CodexThread;
pub use codex_core::ExtensionRegistry;
pub use codex_core::ForkSnapshot;
pub use codex_core::AnyToolHandler;
pub use codex_core::McpManager;
pub use codex_core::NewThread;
pub use codex_core::StartThreadOptions;
pub use codex_core::StateDbHandle;
pub use codex_core::ThreadManager;
pub use codex_core::ThreadManagerBuilder;
pub use codex_core::ThreadShutdownReport;
pub use codex_core::FunctionCallError;
pub use codex_core::FunctionToolOutput;
pub use codex_core::ToolHandler;
pub use codex_core::ToolInvocation;
pub use codex_core::ToolProvider;
pub use codex_core::agent_graph_store_from_state_db;
pub use codex_core::config::Config;
pub use codex_core::config::Constrained;

View File

@@ -100,6 +100,7 @@ pub(crate) async fn run_codex_thread_interactive(
analytics_events_client: Some(parent_session.services.analytics_events_client.clone()),
state_db: parent_session.services.state_db.clone(),
thread_store: Arc::clone(&parent_session.services.thread_store),
extensions: parent_session.services.extensions.clone(),
}))
.or_cancel(&cancel_token)
.await??;

View File

@@ -115,7 +115,7 @@ pub(crate) async fn list_accessible_and_enabled_connectors_from_manager(
.collect()
}
pub(crate) async fn list_tool_suggest_discoverable_tools_with_auth(
pub async fn list_tool_suggest_discoverable_tools_with_auth(
config: &Config,
auth: Option<&CodexAuth>,
accessible_connectors: &[AppInfo],
@@ -162,7 +162,7 @@ pub async fn list_cached_accessible_connectors_from_mcp_tools(
})
}
pub(crate) fn refresh_accessible_connectors_cache_from_mcp_tools(
pub fn refresh_accessible_connectors_cache_from_mcp_tools(
config: &Config,
auth: Option<&CodexAuth>,
mcp_tools: &HashMap<String, ToolInfo>,
@@ -515,7 +515,7 @@ async fn chatgpt_get_request_with_auth_provider<T: DeserializeOwned>(
}
}
pub(crate) fn accessible_connectors_from_mcp_tools(
pub fn accessible_connectors_from_mcp_tools(
mcp_tools: &HashMap<String, ToolInfo>,
) -> Vec<AppInfo> {
// ToolInfo already carries plugin provenance, so app-level plugin sources

View File

@@ -0,0 +1,110 @@
//! Extension points for codex-core.
use std::any::Any;
use std::any::TypeId;
use std::collections::HashMap;
use std::sync::Arc;
use crate::config::Config;
use codex_core_plugins::PluginsManager;
pub use crate::function_tool::FunctionCallError;
pub use crate::tools::context::FunctionToolOutput;
pub use crate::tools::context::ToolInvocation;
pub use crate::tools::registry::AnyToolHandler;
pub use crate::tools::registry::ToolHandler;
pub use crate::tools::registry::ToolKind;
/// Stores registered implementations of codex-core extension traits.
///
/// Registrations are keyed by the trait object type used at insertion time. To
/// register an implementation for an extension trait, coerce it to that trait
/// object first, such as `Arc<dyn ToolProvider>`.
#[derive(Clone, Default)]
pub struct ExtensionRegistry {
extensions: HashMap<TypeId, Vec<Arc<dyn Any + Send + Sync>>>,
}
impl ExtensionRegistry {
/// Register one implementation for the extension trait `T`.
pub fn register<T>(&mut self, extension: Arc<T>)
where
T: ?Sized + Send + Sync + 'static,
{
let extension = Arc::new(extension);
self.extensions
.entry(TypeId::of::<T>())
.or_default()
.push(extension);
}
/// Return all registered implementations for the extension trait `T`.
pub fn get<T>(&self) -> Vec<Arc<T>>
where
T: ?Sized + Send + Sync + 'static,
{
self.extensions
.get(&TypeId::of::<T>())
.into_iter()
.flat_map(|extensions| extensions.iter())
.filter_map(|extension| extension.downcast_ref::<Arc<T>>().cloned())
.collect()
}
/// Returns true when no extension traits have been registered.
pub fn is_empty(&self) -> bool {
self.extensions.is_empty()
}
}
/// Runtime context available when extension providers create tool handlers.
#[derive(Clone)]
pub struct ToolProviderContext {
config: Arc<Config>,
plugins_manager: Arc<PluginsManager>,
conversation_id: String,
turn_id: String,
}
impl ToolProviderContext {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
config: Arc<Config>,
plugins_manager: Arc<PluginsManager>,
conversation_id: String,
turn_id: String,
) -> Self {
Self {
config,
plugins_manager,
conversation_id,
turn_id,
}
}
pub fn config(&self) -> Arc<Config> {
Arc::clone(&self.config)
}
pub fn plugins_manager(&self) -> Arc<PluginsManager> {
Arc::clone(&self.plugins_manager)
}
pub fn conversation_id(&self) -> String {
self.conversation_id.clone()
}
pub fn turn_id(&self) -> String {
self.turn_id.clone()
}
}
/// Provides tools through codex-core extensibility.
///
/// Implementations are expected to return handlers owned by the provider. Tool
/// specs may still be provided by the existing built-in plan while handlers
/// migrate behind this extension point.
pub trait ToolProvider: Send + Sync + 'static {
/// Return tool handlers owned by this provider for the current config.
fn handlers(&self, context: ToolProviderContext) -> Vec<Arc<dyn AnyToolHandler>>;
}

View File

@@ -33,6 +33,14 @@ mod context_manager;
mod environment_selection;
pub mod exec;
pub mod exec_env;
pub mod extensibility;
pub use extensibility::AnyToolHandler;
pub use extensibility::ExtensionRegistry;
pub use extensibility::FunctionCallError;
pub use extensibility::FunctionToolOutput;
pub use extensibility::ToolHandler;
pub use extensibility::ToolInvocation;
pub use extensibility::ToolProvider;
mod exec_policy;
pub mod file_watcher;
mod flags;
@@ -116,6 +124,7 @@ pub use thread_manager::ForkSnapshot;
pub use thread_manager::NewThread;
pub use thread_manager::StartThreadOptions;
pub use thread_manager::ThreadManager;
pub use thread_manager::ThreadManagerBuilder;
pub use thread_manager::ThreadShutdownReport;
pub use thread_manager::agent_graph_store_from_state_db;
pub use thread_manager::build_models_manager;

View File

@@ -44,17 +44,17 @@ pub async fn build_prompt_input(
let thread_store = thread_store_from_config(&config, state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
let installation_id = resolve_installation_id(&config.codex_home).await?;
let thread_manager = ThreadManager::new(
let thread_manager = ThreadManager::builder(
&config,
Arc::clone(&auth_manager),
SessionSource::Exec,
Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
installation_id,
);
)
.session_source(SessionSource::Exec)
.build();
let thread = thread_manager.start_thread(config).await?;
let output = build_prompt_input_from_session(thread.thread.codex.session.as_ref(), input).await;

View File

@@ -32,6 +32,7 @@ use crate::context::PersonalitySpecInstructions;
use crate::default_skill_metadata_budget;
use crate::environment_selection::ResolvedTurnEnvironments;
use crate::exec_policy::ExecPolicyManager;
use crate::extensibility::ExtensionRegistry;
use crate::parse_turn_item;
use crate::path_utils::normalize_for_native_workdir;
use crate::realtime_conversation::RealtimeConversationManager;
@@ -411,6 +412,7 @@ pub(crate) struct CodexSpawnArgs {
pub(crate) analytics_events_client: Option<AnalyticsEventsClient>,
pub(crate) state_db: Option<state_db::StateDbHandle>,
pub(crate) thread_store: Arc<dyn ThreadStore>,
pub(crate) extensions: ExtensionRegistry,
}
pub(crate) const INITIAL_SUBMIT_ID: &str = "";
@@ -471,6 +473,7 @@ impl Codex {
analytics_events_client,
state_db,
thread_store,
extensions,
} = args;
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_event, rx_event) = async_channel::unbounded();
@@ -648,6 +651,7 @@ impl Codex {
analytics_events_client,
state_db,
thread_store,
extensions,
parent_rollout_thread_trace,
)
.await

View File

@@ -370,6 +370,7 @@ impl Session {
analytics_events_client: Option<AnalyticsEventsClient>,
state_db: Option<state_db::StateDbHandle>,
thread_store: Arc<dyn ThreadStore>,
extensions: ExtensionRegistry,
parent_rollout_thread_trace: ThreadTraceContext,
) -> anyhow::Result<Arc<Self>> {
debug!(
@@ -838,6 +839,7 @@ impl Session {
state_db: state_db_ctx.clone(),
live_thread: live_thread_init.as_ref().cloned(),
thread_store: Arc::clone(&thread_store),
extensions,
model_client: ModelClient::new(
Some(Arc::clone(&auth_manager)),
session_id,

View File

@@ -545,6 +545,7 @@ fn test_tool_runtime(session: Arc<Session>, turn_context: Arc<TurnContext>) -> T
parallel_mcp_server_names: HashSet::new(),
discoverable_tools: None,
dynamic_tools: turn_context.dynamic_tools.as_slice(),
extension_tool_handlers: Vec::new(),
},
));
let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
@@ -3762,6 +3763,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
.await
.expect("state db should initialize"),
)),
extensions: ExtensionRegistry::default(),
model_client: ModelClient::new(
Some(auth_manager.clone()),
thread_id.into(),
@@ -5448,6 +5450,7 @@ where
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
state_db,
)),
extensions: ExtensionRegistry::default(),
model_client: ModelClient::new(
Some(Arc::clone(&auth_manager)),
thread_id.into(),
@@ -8287,6 +8290,7 @@ async fn fatal_tool_error_stops_turn_and_reports_error() {
parallel_mcp_server_names: HashSet::new(),
discoverable_tools: None,
dynamic_tools: turn_context.dynamic_tools.as_slice(),
extension_tool_handlers: Vec::new(),
},
);
let item = ResponseItem::CustomToolCall {

View File

@@ -769,6 +769,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
analytics_events_client: None,
state_db: None,
thread_store,
extensions: ExtensionRegistry::default(),
})
.await
.expect("spawn guardian subagent");

View File

@@ -19,6 +19,8 @@ use crate::compact_remote::run_inline_remote_auto_compact_task;
use crate::compact_remote_v2::run_inline_remote_auto_compact_task as run_inline_remote_auto_compact_task_v2;
use crate::connectors;
use crate::context::ContextualUserFragment;
use crate::extensibility::ToolProvider;
use crate::extensibility::ToolProviderContext;
use crate::feedback_tags;
use crate::hook_runtime::PendingInputHookDisposition;
use crate::hook_runtime::emit_hook_completed_events;
@@ -53,6 +55,7 @@ use crate::stream_events_utils::record_completed_response_item;
use crate::tools::ToolRouter;
use crate::tools::context::SharedTurnDiffTracker;
use crate::tools::parallel::ToolCallRuntime;
use crate::tools::registry::ToolArgumentDiffContext;
use crate::tools::registry::ToolArgumentDiffConsumer;
use crate::tools::router::ToolRouterParams;
use crate::turn_diff_tracker::TurnDiffTracker;
@@ -1187,7 +1190,10 @@ pub(crate) async fn built_tools(
None
};
let auth = sess.services.auth_manager.auth().await;
let discoverable_tools = if apps_enabled && turn_context.tools_config.tool_suggest {
let plugin_install_suggest_enabled = apps_enabled
&& turn_context.tools_config.tool_suggest
&& turn_context.app_server_client_name.as_deref() != Some("codex-tui");
let discoverable_tools = if plugin_install_suggest_enabled {
if let Some(accessible_connectors) = accessible_connectors_with_enabled_state.as_ref() {
match connectors::list_tool_suggest_discoverable_tools_with_auth(
&turn_context.config,
@@ -1265,6 +1271,20 @@ pub(crate) async fn built_tools(
})
.collect::<HashSet<_>>();
let provider_context = ToolProviderContext::new(
Arc::clone(&turn_context.config),
Arc::clone(&sess.services.plugins_manager),
sess.conversation_id.to_string(),
turn_context.sub_id.clone(),
);
let extension_tool_handlers = sess
.services
.extensions
.get::<dyn ToolProvider>()
.into_iter()
.flat_map(|provider| provider.handlers(provider_context.clone()))
.collect();
Ok(Arc::new(ToolRouter::from_config(
&turn_context.tools_config,
ToolRouterParams {
@@ -1274,6 +1294,7 @@ pub(crate) async fn built_tools(
parallel_mcp_server_names,
discoverable_tools,
dynamic_tools: turn_context.dynamic_tools.as_slice(),
extension_tool_handlers,
},
)))
}
@@ -2176,7 +2197,11 @@ async fn try_run_sampling_request(
Some(call_id) => call_id,
None => active_call_id.clone(),
};
if let Some(event) = consumer.consume_diff(turn_context.as_ref(), call_id, &delta) {
if let Some(event) = consumer.consume_diff(
ToolArgumentDiffContext::new(turn_context.as_ref()),
call_id,
&delta,
) {
sess.send_event(&turn_context, event).await;
}
}

View File

@@ -6,6 +6,7 @@ use crate::agent::AgentControl;
use crate::client::ModelClient;
use crate::config::StartedNetworkProxy;
use crate::exec_policy::ExecPolicyManager;
use crate::extensibility::ExtensionRegistry;
use crate::guardian::GuardianRejection;
use crate::guardian::GuardianRejectionCircuitBreaker;
use crate::mcp::McpManager;
@@ -66,6 +67,7 @@ pub(crate) struct SessionServices {
pub(crate) state_db: Option<StateDbHandle>,
pub(crate) live_thread: Option<LiveThread>,
pub(crate) thread_store: Arc<dyn ThreadStore>,
pub(crate) extensions: ExtensionRegistry,
/// Session-scoped model client shared across turns.
pub(crate) model_client: ModelClient,
pub(crate) code_mode_service: CodeModeService,

View File

@@ -5,6 +5,7 @@ use crate::config::Config;
use crate::config::ThreadStoreConfig;
use crate::environment_selection::default_thread_environment_selections;
use crate::environment_selection::resolve_environment_selections;
use crate::extensibility::ExtensionRegistry;
use crate::file_watcher::FileWatcher;
use crate::mcp::McpManager;
use crate::resolve_installation_id;
@@ -216,6 +217,107 @@ pub struct ThreadManager {
_test_codex_home_guard: Option<TempCodexHomeGuard>,
}
/// Builder for constructing a [`ThreadManager`].
pub struct ThreadManagerBuilder {
codex_home: AbsolutePathBuf,
bundled_skills_enabled: bool,
auth_manager: Arc<AuthManager>,
models_manager: SharedModelsManager,
session_source: SessionSource,
environment_manager: Arc<EnvironmentManager>,
analytics_events_client: Option<AnalyticsEventsClient>,
state_db: StateDbHandle,
thread_store: Arc<dyn ThreadStore>,
agent_graph_store: Arc<dyn AgentGraphStore>,
extensions: ExtensionRegistry,
installation_id: String,
test_codex_home_guard: Option<TempCodexHomeGuard>,
}
impl ThreadManagerBuilder {
/// Create a builder from the effective runtime config and process-scoped stores.
#[allow(clippy::too_many_arguments)]
pub fn from_config(
config: &Config,
auth_manager: Arc<AuthManager>,
environment_manager: Arc<EnvironmentManager>,
state_db: StateDbHandle,
thread_store: Arc<dyn ThreadStore>,
agent_graph_store: Arc<dyn AgentGraphStore>,
installation_id: String,
) -> Self {
Self::new(
config.codex_home.clone(),
config.bundled_skills_enabled(),
auth_manager.clone(),
build_models_manager(config, auth_manager),
environment_manager,
state_db,
thread_store,
agent_graph_store,
installation_id,
)
}
#[allow(clippy::too_many_arguments)]
fn new(
codex_home: AbsolutePathBuf,
bundled_skills_enabled: bool,
auth_manager: Arc<AuthManager>,
models_manager: SharedModelsManager,
environment_manager: Arc<EnvironmentManager>,
state_db: StateDbHandle,
thread_store: Arc<dyn ThreadStore>,
agent_graph_store: Arc<dyn AgentGraphStore>,
installation_id: String,
) -> Self {
Self {
codex_home,
bundled_skills_enabled,
auth_manager,
models_manager,
session_source: SessionSource::Exec,
environment_manager,
analytics_events_client: None,
state_db,
thread_store,
agent_graph_store,
extensions: ExtensionRegistry::default(),
installation_id,
test_codex_home_guard: None,
}
}
/// Override the session source recorded on threads started by this manager.
pub fn session_source(mut self, session_source: SessionSource) -> Self {
self.session_source = session_source;
self
}
/// Attach the analytics client used by services owned by this manager.
pub fn analytics_events_client(
mut self,
analytics_events_client: AnalyticsEventsClient,
) -> Self {
self.analytics_events_client = Some(analytics_events_client);
self
}
/// Register one implementation for extension trait `T`.
pub fn register_extension<T>(mut self, extension: Arc<T>) -> Self
where
T: ?Sized + Send + Sync + 'static,
{
self.extensions.register(extension);
self
}
/// Build a thread manager from the configured inputs.
pub fn build(self) -> ThreadManager {
ThreadManager::new(self)
}
}
pub struct StartThreadOptions {
pub config: Config,
pub initial_history: InitialHistory,
@@ -253,6 +355,7 @@ pub(crate) struct ThreadManagerState {
thread_store: Arc<dyn ThreadStore>,
state_db: StateDbHandle,
agent_graph_store: Arc<dyn AgentGraphStore>,
extensions: ExtensionRegistry,
session_source: SessionSource,
installation_id: String,
analytics_events_client: Option<AnalyticsEventsClient>,
@@ -308,19 +411,44 @@ async fn state_db_from_roots_for_tests(
}
impl ThreadManager {
/// Create a builder for the standard runtime thread manager.
#[allow(clippy::too_many_arguments)]
pub fn new(
pub fn builder(
config: &Config,
auth_manager: Arc<AuthManager>,
session_source: SessionSource,
environment_manager: Arc<EnvironmentManager>,
analytics_events_client: Option<AnalyticsEventsClient>,
state_db: StateDbHandle,
thread_store: Arc<dyn ThreadStore>,
agent_graph_store: Arc<dyn AgentGraphStore>,
installation_id: String,
) -> Self {
let codex_home = config.codex_home.clone();
) -> ThreadManagerBuilder {
ThreadManagerBuilder::from_config(
config,
auth_manager,
environment_manager,
state_db,
thread_store,
agent_graph_store,
installation_id,
)
}
fn new(builder: ThreadManagerBuilder) -> Self {
let ThreadManagerBuilder {
codex_home,
bundled_skills_enabled,
auth_manager,
models_manager,
session_source,
environment_manager,
analytics_events_client,
state_db,
thread_store,
agent_graph_store,
extensions,
installation_id,
test_codex_home_guard,
} = builder;
let restriction_product = session_source.restriction_product();
let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY);
let plugins_manager = Arc::new(PluginsManager::new_with_restriction_product(
@@ -330,7 +458,7 @@ impl ThreadManager {
let mcp_manager = Arc::new(McpManager::new(Arc::clone(&plugins_manager)));
let skills_manager = Arc::new(SkillsManager::new_with_restriction_product(
codex_home,
config.bundled_skills_enabled(),
bundled_skills_enabled,
restriction_product,
));
let skills_watcher = build_skills_watcher(Arc::clone(&skills_manager));
@@ -338,7 +466,7 @@ impl ThreadManager {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
thread_created_tx,
models_manager: build_models_manager(config, auth_manager.clone()),
models_manager,
environment_manager,
skills_manager,
plugins_manager,
@@ -347,6 +475,7 @@ impl ThreadManager {
thread_store,
state_db,
agent_graph_store,
extensions,
auth_manager,
session_source,
installation_id,
@@ -354,7 +483,7 @@ impl ThreadManager {
ops_log: should_use_test_thread_manager_behavior()
.then(|| Arc::new(std::sync::Mutex::new(Vec::new()))),
}),
_test_codex_home_guard: None,
_test_codex_home_guard: test_codex_home_guard,
}
}
@@ -440,21 +569,8 @@ impl ThreadManager {
) -> Self {
set_thread_manager_test_mode_for_tests(/*enabled*/ true);
let auth_manager = AuthManager::from_auth_for_testing(auth);
let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY);
let restriction_product = SessionSource::Exec.restriction_product();
let plugins_manager = Arc::new(PluginsManager::new_with_restriction_product(
codex_home.clone(),
restriction_product,
));
let mcp_manager = Arc::new(McpManager::new(Arc::clone(&plugins_manager)));
let skills_manager = Arc::new(SkillsManager::new_with_restriction_product(
skills_codex_home,
/*bundled_skills_enabled*/ true,
restriction_product,
));
let skills_watcher = build_skills_watcher(Arc::clone(&skills_manager));
// This test constructor has no Config input. Tests that need a non-local
// process store should construct ThreadManager::new with an explicit store.
// This test helper has no Config input. Tests that need a non-local
// process store should construct ThreadManager through the builder with an explicit store.
let thread_store: Arc<dyn ThreadStore> = Arc::new(LocalThreadStore::new(
LocalThreadStoreConfig {
codex_home: codex_home.clone(),
@@ -463,29 +579,21 @@ impl ThreadManager {
state_db.clone(),
));
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
thread_created_tx,
models_manager: create_model_provider(provider, Some(auth_manager.clone()))
.models_manager(codex_home, /*config_model_catalog*/ None),
environment_manager,
skills_manager,
plugins_manager,
mcp_manager,
skills_watcher,
thread_store,
state_db,
agent_graph_store,
auth_manager,
session_source: SessionSource::Exec,
installation_id,
analytics_events_client: None,
ops_log: should_use_test_thread_manager_behavior()
.then(|| Arc::new(std::sync::Mutex::new(Vec::new()))),
}),
_test_codex_home_guard: None,
}
let models_manager = create_model_provider(provider, Some(auth_manager.clone()))
.models_manager(codex_home, /*config_model_catalog*/ None);
ThreadManagerBuilder::new(
skills_codex_home,
/*bundled_skills_enabled*/ true,
auth_manager,
models_manager,
environment_manager,
state_db,
thread_store,
agent_graph_store,
installation_id,
)
.session_source(SessionSource::Exec)
.build()
}
pub fn session_source(&self) -> SessionSource {
@@ -531,6 +639,10 @@ impl ThreadManager {
self.state.models_manager.clone()
}
pub fn extensions(&self) -> &ExtensionRegistry {
&self.state.extensions
}
pub async fn list_models(&self, refresh_strategy: RefreshStrategy) -> Vec<ModelPreset> {
self.state
.models_manager
@@ -1244,6 +1356,7 @@ impl ThreadManagerState {
analytics_events_client: self.analytics_events_client.clone(),
state_db: Some(self.state_db.clone()),
thread_store: Arc::clone(&self.thread_store),
extensions: self.extensions.clone(),
})
.await?;
let new_thread = self

View File

@@ -409,17 +409,17 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
let manager = ThreadManager::builder(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
);
)
.session_source(SessionSource::Exec)
.build();
let selected_cwd =
AbsolutePathBuf::try_from(config.cwd.as_path().join("selected")).expect("absolute path");
let environments = vec![TurnEnvironmentSelection {
@@ -525,17 +525,17 @@ async fn explicit_installation_id_skips_codex_home_file() {
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let installation_id = uuid::Uuid::new_v4().to_string();
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
let manager = ThreadManager::builder(
&config,
auth_manager,
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
installation_id.clone(),
);
)
.session_source(SessionSource::Exec)
.build();
let thread = manager
.start_thread(config.clone())
@@ -564,17 +564,17 @@ async fn resume_active_thread_from_rollout_returns_running_thread() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
let manager = ThreadManager::builder(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
);
)
.session_source(SessionSource::Exec)
.build();
let source = manager
.start_thread(config.clone())
@@ -621,17 +621,17 @@ async fn resume_stopped_thread_from_rollout_spawns_new_thread() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
let manager = ThreadManager::builder(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
);
)
.session_source(SessionSource::Exec)
.build();
let source = manager
.start_thread(config.clone())
@@ -683,17 +683,17 @@ async fn resume_stopped_thread_from_rollout_preserves_thread_source() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
let manager = ThreadManager::builder(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
);
)
.session_source(SessionSource::Exec)
.build();
let source = manager
.start_thread_with_options(StartThreadOptions {
@@ -769,17 +769,17 @@ async fn new_uses_active_provider_for_model_refresh() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
let manager = ThreadManager::builder(
&config,
auth_manager,
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
);
)
.session_source(SessionSource::Exec)
.build();
let _ = manager.list_models(RefreshStrategy::Online).await;
assert_eq!(models_mock.requests().len(), 1);
@@ -984,17 +984,17 @@ async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_histor
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
let manager = ThreadManager::builder(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
);
)
.session_source(SessionSource::Exec)
.build();
let source = manager
.resume_thread_with_history(
@@ -1091,17 +1091,17 @@ async fn interrupted_fork_snapshot_preserves_explicit_turn_id() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
let manager = ThreadManager::builder(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
);
)
.session_source(SessionSource::Exec)
.build();
let source = manager
.resume_thread_with_history(
@@ -1187,17 +1187,17 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
let manager = ThreadManager::builder(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
);
)
.session_source(SessionSource::Exec)
.build();
let source = manager
.resume_thread_with_history(
@@ -1329,17 +1329,17 @@ async fn resumed_thread_keeps_paused_goal_paused() -> anyhow::Result<()> {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
let manager = ThreadManager::builder(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
);
)
.session_source(SessionSource::Exec)
.build();
let source = manager
.resume_thread_with_history(

View File

@@ -302,6 +302,7 @@ async fn build_nested_router(exec: &ExecContext) -> ToolRouter {
parallel_mcp_server_names,
discoverable_tools: None,
dynamic_tools: exec.turn.dynamic_tools.as_slice(),
extension_tool_handlers: Vec::new(),
},
)
}

View File

@@ -1,4 +1,5 @@
use crate::context_manager::truncate_function_output_payload;
use crate::function_tool::FunctionCallError;
use crate::original_image_detail::sanitize_original_image_detail;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
@@ -7,7 +8,10 @@ use crate::tools::TELEMETRY_PREVIEW_MAX_LINES;
use crate::tools::TELEMETRY_PREVIEW_TRUNCATION_NOTICE;
use crate::turn_diff_tracker::TurnDiffTracker;
use crate::unified_exec::resolve_max_tokens;
use codex_app_server_protocol::McpServerElicitationRequestParams;
use codex_mcp::ToolInfo;
use codex_protocol::mcp::CallToolResult;
use codex_rmcp_client::ElicitationResponse;
use codex_protocol::models::DEFAULT_IMAGE_DETAIL;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::FunctionCallOutputContentItem;
@@ -24,6 +28,8 @@ use codex_utils_string::take_bytes_at_char_boundary;
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
@@ -46,14 +52,81 @@ pub enum ToolCallSource {
#[derive(Clone)]
pub struct ToolInvocation {
pub session: Arc<Session>,
pub turn: Arc<TurnContext>,
pub cancellation_token: CancellationToken,
pub tracker: SharedTurnDiffTracker,
pub call_id: String,
pub tool_name: ToolName,
pub source: ToolCallSource,
pub payload: ToolPayload,
pub(crate) session: Arc<Session>,
pub(crate) turn: Arc<TurnContext>,
pub(crate) cancellation_token: CancellationToken,
pub(crate) tracker: SharedTurnDiffTracker,
pub(crate) call_id: String,
pub(crate) tool_name: ToolName,
pub(crate) source: ToolCallSource,
pub(crate) payload: ToolPayload,
}
impl ToolInvocation {
pub fn function_arguments(&self, tool_name: &str) -> Result<&str, FunctionCallError> {
match &self.payload {
ToolPayload::Function { arguments } => Ok(arguments),
_ => Err(FunctionCallError::Fatal(format!(
"{tool_name} handler received unsupported payload"
))),
}
}
pub fn call_id(&self) -> &str {
self.call_id.as_str()
}
#[expect(
clippy::await_holding_invalid_type,
reason = "tool extensions read through the session-owned MCP manager guard"
)]
pub async fn list_mcp_tools(&self) -> HashMap<String, ToolInfo> {
self.session
.services
.mcp_connection_manager
.read()
.await
.list_all_tools()
.await
}
#[expect(
clippy::await_holding_invalid_type,
reason = "tool extensions refresh through the session-owned MCP manager guard"
)]
pub async fn hard_refresh_codex_apps_tools_cache(
&self,
) -> anyhow::Result<HashMap<String, ToolInfo>> {
self.session
.services
.mcp_connection_manager
.read()
.await
.hard_refresh_codex_apps_tools_cache()
.await
}
pub async fn request_mcp_server_elicitation(
&self,
request_id: String,
params: McpServerElicitationRequestParams,
) -> Option<ElicitationResponse> {
self.session
.request_mcp_server_elicitation(
self.turn.as_ref(),
rmcp::model::RequestId::String(request_id.into()),
params,
)
.await
}
pub async fn reload_user_config_layer(&self) {
self.session.reload_user_config_layer().await;
}
pub async fn merge_connector_selection(&self, connector_ids: HashSet<String>) {
self.session.merge_connector_selection(connector_ids).await;
}
}
#[derive(Clone, Debug)]

View File

@@ -26,6 +26,7 @@ use crate::tools::hook_names::HookToolName;
use crate::tools::orchestrator::ToolOrchestrator;
use crate::tools::registry::PostToolUsePayload;
use crate::tools::registry::PreToolUsePayload;
use crate::tools::registry::ToolArgumentDiffContext;
use crate::tools::registry::ToolArgumentDiffConsumer;
use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
@@ -64,11 +65,11 @@ struct ApplyPatchArgumentDiffConsumer {
impl ToolArgumentDiffConsumer for ApplyPatchArgumentDiffConsumer {
fn consume_diff(
&mut self,
turn: &TurnContext,
context: ToolArgumentDiffContext<'_>,
call_id: String,
diff: &str,
) -> Option<EventMsg> {
if !turn.features.enabled(Feature::ApplyPatchStreamingEvents) {
if !context.feature_enabled(Feature::ApplyPatchStreamingEvents) {
return None;
}

View File

@@ -9,7 +9,6 @@ pub(crate) mod multi_agents_common;
pub(crate) mod multi_agents_v2;
mod plan;
mod request_permissions;
mod request_plugin_install;
mod request_user_input;
mod shell;
mod test_sync;
@@ -47,7 +46,6 @@ pub use mcp_resource::ListMcpResourcesHandler;
pub use mcp_resource::ReadMcpResourceHandler;
pub use plan::PlanHandler;
pub use request_permissions::RequestPermissionsHandler;
pub use request_plugin_install::RequestPluginInstallHandler;
pub use request_user_input::RequestUserInputHandler;
pub use shell::ContainerExecHandler;
pub use shell::LocalShellHandler;

View File

@@ -3162,17 +3162,17 @@ async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtr
let state_db = init_state_db(&config)
.await
.expect("test config should initialize state db");
let manager = ThreadManager::new(
let manager = ThreadManager::builder(
&config,
AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy")),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db.clone(),
thread_store_from_config(&config, state_db.clone()),
agent_graph_store_from_state_db(state_db.clone()),
"11111111-1111-4111-8111-111111111111".to_string(),
);
)
.session_source(SessionSource::Exec)
.build();
let parent = manager
.start_thread(config.clone())

View File

@@ -1,339 +0,0 @@
use std::collections::HashSet;
use codex_app_server_protocol::AppInfo;
use codex_config::types::ToolSuggestDisabledTool;
use codex_mcp::CODEX_APPS_MCP_SERVER_NAME;
use codex_rmcp_client::ElicitationAction;
use codex_rmcp_client::ElicitationResponse;
use codex_tools::DiscoverableTool;
use codex_tools::DiscoverableToolAction;
use codex_tools::DiscoverableToolType;
use codex_tools::REQUEST_PLUGIN_INSTALL_PERSIST_ALWAYS_VALUE;
use codex_tools::REQUEST_PLUGIN_INSTALL_PERSIST_KEY;
use codex_tools::REQUEST_PLUGIN_INSTALL_TOOL_NAME;
use codex_tools::RequestPluginInstallArgs;
use codex_tools::RequestPluginInstallResult;
use codex_tools::ToolName;
use codex_tools::all_requested_connectors_picked_up;
use codex_tools::build_request_plugin_install_elicitation_request;
use codex_tools::filter_request_plugin_install_discoverable_tools_for_client;
use codex_tools::verified_connector_install_completed;
use rmcp::model::RequestId;
use serde_json::Value;
use tracing::warn;
use crate::config::edit::ConfigEdit;
use crate::config::edit::ConfigEditsBuilder;
use crate::connectors;
use crate::function_tool::FunctionCallError;
use crate::tools::context::FunctionToolOutput;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolPayload;
use crate::tools::handlers::parse_arguments;
use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
pub struct RequestPluginInstallHandler;
impl ToolHandler for RequestPluginInstallHandler {
type Output = FunctionToolOutput;
fn tool_name(&self) -> ToolName {
ToolName::plain(REQUEST_PLUGIN_INSTALL_TOOL_NAME)
}
fn kind(&self) -> ToolKind {
ToolKind::Function
}
#[expect(
clippy::await_holding_invalid_type,
reason = "plugin install discovery reads through the session-owned manager guard"
)]
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
payload,
session,
turn,
call_id,
..
} = invocation;
let arguments = match payload {
ToolPayload::Function { arguments } => arguments,
_ => {
return Err(FunctionCallError::Fatal(format!(
"{REQUEST_PLUGIN_INSTALL_TOOL_NAME} handler received unsupported payload"
)));
}
};
let args: RequestPluginInstallArgs = parse_arguments(&arguments)?;
let suggest_reason = args.suggest_reason.trim();
if suggest_reason.is_empty() {
return Err(FunctionCallError::RespondToModel(
"suggest_reason must not be empty".to_string(),
));
}
if args.action_type != DiscoverableToolAction::Install {
return Err(FunctionCallError::RespondToModel(
"plugin install requests currently support only action_type=\"install\""
.to_string(),
));
}
if args.tool_type == DiscoverableToolType::Plugin
&& turn.app_server_client_name.as_deref() == Some("codex-tui")
{
return Err(FunctionCallError::RespondToModel(
"plugin install requests are not available in codex-tui yet".to_string(),
));
}
let auth = session.services.auth_manager.auth().await;
let manager = session.services.mcp_connection_manager.read().await;
let mcp_tools = manager.list_all_tools().await;
drop(manager);
let accessible_connectors = connectors::with_app_enabled_state(
connectors::accessible_connectors_from_mcp_tools(&mcp_tools),
&turn.config,
);
let discoverable_tools = connectors::list_tool_suggest_discoverable_tools_with_auth(
&turn.config,
auth.as_ref(),
&accessible_connectors,
)
.await
.map(|discoverable_tools| {
filter_request_plugin_install_discoverable_tools_for_client(
discoverable_tools,
turn.app_server_client_name.as_deref(),
)
})
.map_err(|err| {
FunctionCallError::RespondToModel(format!(
"plugin install requests are unavailable right now: {err}"
))
})?;
let tool = discoverable_tools
.into_iter()
.find(|tool| tool.tool_type() == args.tool_type && tool.id() == args.tool_id)
.ok_or_else(|| {
FunctionCallError::RespondToModel(format!(
"tool_id must match one of the discoverable tools exposed by {REQUEST_PLUGIN_INSTALL_TOOL_NAME}"
))
})?;
let request_id = RequestId::String(format!("request_plugin_install_{call_id}").into());
let params = build_request_plugin_install_elicitation_request(
CODEX_APPS_MCP_SERVER_NAME,
session.conversation_id.to_string(),
turn.sub_id.clone(),
&args,
suggest_reason,
&tool,
);
let response = session
.request_mcp_server_elicitation(turn.as_ref(), request_id, params)
.await;
if let Some(response) = response.as_ref() {
maybe_persist_disabled_install_request(&session, &turn, &tool, response).await;
}
let user_confirmed = response
.as_ref()
.is_some_and(|response| response.action == ElicitationAction::Accept);
let completed = if user_confirmed {
verify_request_plugin_install_completed(&session, &turn, &tool, auth.as_ref()).await
} else {
false
};
if completed && let DiscoverableTool::Connector(connector) = &tool {
session
.merge_connector_selection(HashSet::from([connector.id.clone()]))
.await;
}
let content = serde_json::to_string(&RequestPluginInstallResult {
completed,
user_confirmed,
tool_type: args.tool_type,
action_type: args.action_type,
tool_id: tool.id().to_string(),
tool_name: tool.name().to_string(),
suggest_reason: suggest_reason.to_string(),
})
.map_err(|err| {
FunctionCallError::Fatal(format!(
"failed to serialize {REQUEST_PLUGIN_INSTALL_TOOL_NAME} response: {err}"
))
})?;
Ok(FunctionToolOutput::from_text(content, Some(true)))
}
}
async fn maybe_persist_disabled_install_request(
session: &crate::session::session::Session,
turn: &crate::session::turn_context::TurnContext,
tool: &DiscoverableTool,
response: &ElicitationResponse,
) {
if !request_plugin_install_response_requests_persistent_disable(response) {
return;
}
if let Err(err) = persist_disabled_install_request(&turn.config.codex_home, tool).await {
warn!(
error = %err,
tool_id = tool.id(),
"failed to persist disabled tool suggestion"
);
return;
}
session.reload_user_config_layer().await;
}
fn request_plugin_install_response_requests_persistent_disable(
response: &ElicitationResponse,
) -> bool {
if response.action != ElicitationAction::Decline {
return false;
}
response
.meta
.as_ref()
.and_then(Value::as_object)
.and_then(|meta| meta.get(REQUEST_PLUGIN_INSTALL_PERSIST_KEY))
.and_then(Value::as_str)
== Some(REQUEST_PLUGIN_INSTALL_PERSIST_ALWAYS_VALUE)
}
async fn persist_disabled_install_request(
codex_home: &codex_utils_absolute_path::AbsolutePathBuf,
tool: &DiscoverableTool,
) -> anyhow::Result<()> {
ConfigEditsBuilder::new(codex_home)
.with_edits([ConfigEdit::AddToolSuggestDisabledTool(
disabled_install_request(tool),
)])
.apply()
.await
}
fn disabled_install_request(tool: &DiscoverableTool) -> ToolSuggestDisabledTool {
match tool {
DiscoverableTool::Connector(connector) => {
ToolSuggestDisabledTool::connector(connector.id.as_str())
}
DiscoverableTool::Plugin(plugin) => ToolSuggestDisabledTool::plugin(plugin.id.as_str()),
}
}
async fn verify_request_plugin_install_completed(
session: &crate::session::session::Session,
turn: &crate::session::turn_context::TurnContext,
tool: &DiscoverableTool,
auth: Option<&codex_login::CodexAuth>,
) -> bool {
match tool {
DiscoverableTool::Connector(connector) => refresh_missing_requested_connectors(
session,
turn,
auth,
std::slice::from_ref(&connector.id),
connector.id.as_str(),
)
.await
.is_some_and(|accessible_connectors| {
verified_connector_install_completed(connector.id.as_str(), &accessible_connectors)
}),
DiscoverableTool::Plugin(plugin) => {
session.reload_user_config_layer().await;
let config = session.get_config().await;
let completed = verified_plugin_install_completed(
plugin.id.as_str(),
config.as_ref(),
session.services.plugins_manager.as_ref(),
);
let _ = refresh_missing_requested_connectors(
session,
turn,
auth,
&plugin.app_connector_ids,
plugin.id.as_str(),
)
.await;
completed
}
}
}
#[expect(
clippy::await_holding_invalid_type,
reason = "connector cache refresh reads through the session-owned manager guard"
)]
async fn refresh_missing_requested_connectors(
session: &crate::session::session::Session,
turn: &crate::session::turn_context::TurnContext,
auth: Option<&codex_login::CodexAuth>,
expected_connector_ids: &[String],
tool_id: &str,
) -> Option<Vec<AppInfo>> {
if expected_connector_ids.is_empty() {
return Some(Vec::new());
}
let manager = session.services.mcp_connection_manager.read().await;
let mcp_tools = manager.list_all_tools().await;
let accessible_connectors = connectors::with_app_enabled_state(
connectors::accessible_connectors_from_mcp_tools(&mcp_tools),
&turn.config,
);
if all_requested_connectors_picked_up(expected_connector_ids, &accessible_connectors) {
return Some(accessible_connectors);
}
match manager.hard_refresh_codex_apps_tools_cache().await {
Ok(mcp_tools) => {
let accessible_connectors = connectors::with_app_enabled_state(
connectors::accessible_connectors_from_mcp_tools(&mcp_tools),
&turn.config,
);
connectors::refresh_accessible_connectors_cache_from_mcp_tools(
&turn.config,
auth,
&mcp_tools,
);
Some(accessible_connectors)
}
Err(err) => {
warn!(
"failed to refresh codex apps tools cache after plugin install request for {tool_id}: {err:#}"
);
None
}
}
}
fn verified_plugin_install_completed(
tool_id: &str,
config: &crate::config::Config,
plugins_manager: &codex_core_plugins::PluginsManager,
) -> bool {
let plugins_input = config.plugins_config_input();
plugins_manager
.list_marketplaces_for_config(&plugins_input, &[])
.ok()
.into_iter()
.flat_map(|outcome| outcome.marketplaces)
.flat_map(|marketplace| marketplace.plugins.into_iter())
.any(|plugin| plugin.id == tool_id && plugin.installed)
}
#[cfg(test)]
#[path = "request_plugin_install_tests.rs"]
mod tests;

View File

@@ -1,214 +0,0 @@
use super::*;
use crate::plugins::test_support::load_plugins_config;
use crate::plugins::test_support::write_curated_plugin_sha;
use crate::plugins::test_support::write_openai_curated_marketplace;
use crate::plugins::test_support::write_plugins_feature_config;
use codex_config::CONFIG_TOML_FILE;
use codex_config::config_toml::ConfigToml;
use codex_config::types::ToolSuggestConfig;
use codex_config::types::ToolSuggestDisabledTool;
use codex_config::types::ToolSuggestDiscoverable;
use codex_config::types::ToolSuggestDiscoverableType;
use codex_core_plugins::PluginInstallRequest;
use codex_core_plugins::PluginsManager;
use codex_core_plugins::startup_sync::curated_plugins_repo_path;
use codex_rmcp_client::ElicitationResponse;
use codex_tools::DiscoverablePluginInfo;
use codex_utils_absolute_path::AbsolutePathBuf;
use core_test_support::PathExt;
use pretty_assertions::assert_eq;
use rmcp::model::ElicitationAction;
use serde_json::json;
use tempfile::tempdir;
#[tokio::test]
async fn verified_plugin_install_completed_requires_installed_plugin() {
let codex_home = tempdir().expect("tempdir should succeed");
let curated_root = curated_plugins_repo_path(codex_home.path());
write_openai_curated_marketplace(&curated_root, &["sample"]);
write_curated_plugin_sha(codex_home.path());
write_plugins_feature_config(codex_home.path());
let config = load_plugins_config(codex_home.path()).await;
let plugins_manager = PluginsManager::new(codex_home.path().to_path_buf());
assert!(!verified_plugin_install_completed(
"sample@openai-curated",
&config,
&plugins_manager,
));
plugins_manager
.install_plugin(PluginInstallRequest {
plugin_name: "sample".to_string(),
marketplace_path: AbsolutePathBuf::try_from(
curated_root.join(".agents/plugins/marketplace.json"),
)
.expect("marketplace path"),
})
.await
.expect("plugin should install");
let refreshed_config = load_plugins_config(codex_home.path()).await;
assert!(verified_plugin_install_completed(
"sample@openai-curated",
&refreshed_config,
&plugins_manager,
));
}
#[test]
fn request_plugin_install_response_persists_only_decline_always_mode() {
assert!(request_plugin_install_response_requests_persistent_disable(
&ElicitationResponse {
action: ElicitationAction::Decline,
content: None,
meta: Some(json!({
REQUEST_PLUGIN_INSTALL_PERSIST_KEY: REQUEST_PLUGIN_INSTALL_PERSIST_ALWAYS_VALUE
})),
}
));
assert!(
!request_plugin_install_response_requests_persistent_disable(&ElicitationResponse {
action: ElicitationAction::Accept,
content: None,
meta: Some(json!({
REQUEST_PLUGIN_INSTALL_PERSIST_KEY: REQUEST_PLUGIN_INSTALL_PERSIST_ALWAYS_VALUE
})),
})
);
assert!(
!request_plugin_install_response_requests_persistent_disable(&ElicitationResponse {
action: ElicitationAction::Decline,
content: None,
meta: Some(json!({ REQUEST_PLUGIN_INSTALL_PERSIST_KEY: "session" })),
})
);
assert!(
!request_plugin_install_response_requests_persistent_disable(&ElicitationResponse {
action: ElicitationAction::Decline,
content: None,
meta: None,
})
);
}
#[tokio::test]
async fn persist_disabled_install_request_writes_connector_config() {
let codex_home = tempdir().expect("tempdir should succeed");
let tool = connector_tool("connector_calendar", "Google Calendar");
persist_disabled_install_request(&codex_home.path().abs(), &tool)
.await
.expect("persist connector disable");
let contents =
std::fs::read_to_string(codex_home.path().join(CONFIG_TOML_FILE)).expect("read config");
let parsed: ConfigToml = toml::from_str(&contents).expect("parse config");
assert_eq!(
parsed.tool_suggest,
Some(ToolSuggestConfig {
discoverables: Vec::new(),
disabled_tools: vec![ToolSuggestDisabledTool::connector("connector_calendar")],
})
);
}
#[tokio::test]
async fn persist_disabled_install_request_writes_plugin_config() {
let codex_home = tempdir().expect("tempdir should succeed");
let tool = DiscoverableTool::Plugin(Box::new(DiscoverablePluginInfo {
id: "slack@openai-curated".to_string(),
name: "Slack".to_string(),
description: None,
has_skills: true,
mcp_server_names: Vec::new(),
app_connector_ids: Vec::new(),
}));
persist_disabled_install_request(&codex_home.path().abs(), &tool)
.await
.expect("persist plugin disable");
let contents =
std::fs::read_to_string(codex_home.path().join(CONFIG_TOML_FILE)).expect("read config");
let parsed: ConfigToml = toml::from_str(&contents).expect("parse config");
assert_eq!(
parsed.tool_suggest,
Some(ToolSuggestConfig {
discoverables: Vec::new(),
disabled_tools: vec![ToolSuggestDisabledTool::plugin("slack@openai-curated")],
})
);
}
#[tokio::test]
async fn persist_disabled_install_request_dedupes_existing_disabled_tools() {
let codex_home = tempdir().expect("tempdir should succeed");
let tool = connector_tool("connector_calendar", "Google Calendar");
std::fs::write(
codex_home.path().join(CONFIG_TOML_FILE),
r#"
[tool_suggest]
discoverables = [
{ type = "plugin", id = "sample@openai-curated" }
]
[[tool_suggest.disabled_tools]]
type = "connector"
id = " connector_calendar "
[[tool_suggest.disabled_tools]]
type = "connector"
id = "connector_calendar"
[[tool_suggest.disabled_tools]]
type = "connector"
id = " "
[[tool_suggest.disabled_tools]]
type = "plugin"
id = "slack@openai-curated"
"#,
)
.expect("write config");
persist_disabled_install_request(&codex_home.path().abs(), &tool)
.await
.expect("persist connector disable");
let contents =
std::fs::read_to_string(codex_home.path().join(CONFIG_TOML_FILE)).expect("read config");
let parsed: ConfigToml = toml::from_str(&contents).expect("parse config");
assert_eq!(
parsed.tool_suggest,
Some(ToolSuggestConfig {
discoverables: vec![ToolSuggestDiscoverable {
kind: ToolSuggestDiscoverableType::Plugin,
id: "sample@openai-curated".to_string(),
}],
disabled_tools: vec![
ToolSuggestDisabledTool::connector("connector_calendar"),
ToolSuggestDisabledTool::plugin("slack@openai-curated"),
],
})
);
}
fn connector_tool(id: &str, name: &str) -> DiscoverableTool {
DiscoverableTool::Connector(Box::new(AppInfo {
id: id.to_string(),
name: name.to_string(),
description: None,
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
branding: None,
app_metadata: None,
labels: None,
install_url: None,
is_accessible: false,
is_enabled: true,
plugin_display_names: Vec::new(),
}))
}

View File

@@ -25,9 +25,11 @@ use codex_hooks::HookResult;
use codex_hooks::HookToolInput;
use codex_hooks::HookToolInputLocalShell;
use codex_hooks::HookToolKind;
use codex_features::Feature;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::protocol::EventMsg;
use codex_tools::ConfiguredToolSpec;
use codex_tools::ResponsesApiNamespaceTool;
use codex_tools::ToolName;
use codex_tools::ToolSpec;
use codex_utils_readiness::Readiness;
@@ -94,11 +96,25 @@ pub trait ToolHandler: Send + Sync {
) -> impl std::future::Future<Output = Result<Self::Output, FunctionCallError>> + Send;
}
pub struct ToolArgumentDiffContext<'a> {
turn: &'a TurnContext,
}
impl<'a> ToolArgumentDiffContext<'a> {
pub(crate) fn new(turn: &'a TurnContext) -> Self {
Self { turn }
}
pub fn feature_enabled(&self, feature: Feature) -> bool {
self.turn.features.enabled(feature)
}
}
/// Consumes streamed argument diffs for a tool call and emits protocol events
/// derived from partial tool input.
pub(crate) trait ToolArgumentDiffConsumer: Send {
pub trait ToolArgumentDiffConsumer: Send {
/// Consume the next argument diff for a tool call.
fn consume_diff(&mut self, turn: &TurnContext, call_id: String, diff: &str)
fn consume_diff(&mut self, context: ToolArgumentDiffContext<'_>, call_id: String, diff: &str)
-> Option<EventMsg>;
/// Finish consuming argument diffs before the tool call completes.
@@ -107,7 +123,7 @@ pub(crate) trait ToolArgumentDiffConsumer: Send {
}
}
pub(crate) struct AnyToolResult {
pub struct AnyToolResult {
pub(crate) call_id: String,
pub(crate) payload: ToolPayload,
pub(crate) result: Box<dyn ToolOutput>,
@@ -134,7 +150,7 @@ impl AnyToolResult {
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct PreToolUsePayload {
pub struct PreToolUsePayload {
/// Hook-facing tool name model.
///
/// The canonical name is serialized to hook stdin, while aliases are used
@@ -148,7 +164,7 @@ pub(crate) struct PreToolUsePayload {
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct PostToolUsePayload {
pub struct PostToolUsePayload {
/// Hook-facing tool name model.
///
/// The canonical name is serialized to hook stdin, while aliases are used
@@ -162,7 +178,9 @@ pub(crate) struct PostToolUsePayload {
pub(crate) tool_response: Value,
}
trait AnyToolHandler: Send + Sync {
pub trait AnyToolHandler: Send + Sync {
fn tool_name(&self) -> ToolName;
fn matches_kind(&self, payload: &ToolPayload) -> bool;
fn is_mutating<'a>(&'a self, invocation: &'a ToolInvocation) -> BoxFuture<'a, bool>;
@@ -180,6 +198,10 @@ impl<T> AnyToolHandler for T
where
T: ToolHandler,
{
fn tool_name(&self) -> ToolName {
ToolHandler::tool_name(self)
}
fn matches_kind(&self, payload: &ToolPayload) -> bool {
ToolHandler::matches_kind(self, payload)
}
@@ -539,14 +561,46 @@ impl ToolRegistryBuilder {
where
H: ToolHandler + 'static,
{
self.register_any_handler(handler);
}
pub fn register_any_handler(&mut self, handler: Arc<dyn AnyToolHandler>) {
let name = handler.tool_name();
let display_name = name.display();
let handler: Arc<dyn AnyToolHandler> = handler;
if self.handlers.insert(name, handler).is_some() {
warn!("overwriting handler for tool {display_name}");
}
}
pub fn register_any_handler_if_configured(&mut self, handler: Arc<dyn AnyToolHandler>) {
let name = handler.tool_name();
let has_spec = self.specs.iter().any(|configured_tool| match &configured_tool.spec {
ToolSpec::Function(tool) if name.namespace.is_none() => tool.name == name.name,
ToolSpec::Freeform(tool) if name.namespace.is_none() => tool.name == name.name,
ToolSpec::Namespace(namespace) => namespace.tools.iter().any(|tool| match tool {
ResponsesApiNamespaceTool::Function(tool) => {
name.namespace.as_deref() == Some(namespace.name.as_str())
&& tool.name == name.name
}
}),
ToolSpec::ToolSearch { .. } if name.namespace.is_none() => name.name == "tool_search",
ToolSpec::LocalShell {} if name.namespace.is_none() => name.name == "local_shell",
ToolSpec::ImageGeneration { .. } if name.namespace.is_none() => {
name.name == "image_generation"
}
ToolSpec::WebSearch { .. } if name.namespace.is_none() => name.name == "web_search",
ToolSpec::Function(_)
| ToolSpec::Freeform(_)
| ToolSpec::ToolSearch { .. }
| ToolSpec::LocalShell {}
| ToolSpec::ImageGeneration { .. }
| ToolSpec::WebSearch { .. } => false,
});
if has_spec {
self.register_any_handler(handler);
}
}
pub fn build(self) -> (Vec<ConfiguredToolSpec>, ToolRegistry) {
let registry = ToolRegistry::new(self.handlers);
(self.specs, registry)

View File

@@ -5,6 +5,7 @@ use crate::session::turn_context::TurnContext;
use crate::tools::context::SharedTurnDiffTracker;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolPayload;
use crate::tools::registry::AnyToolHandler;
use crate::tools::registry::AnyToolResult;
use crate::tools::registry::ToolArgumentDiffConsumer;
use crate::tools::registry::ToolRegistry;
@@ -50,6 +51,7 @@ pub(crate) struct ToolRouterParams<'a> {
pub(crate) parallel_mcp_server_names: HashSet<String>,
pub(crate) discoverable_tools: Option<Vec<DiscoverableTool>>,
pub(crate) dynamic_tools: &'a [DynamicToolSpec],
pub(crate) extension_tool_handlers: Vec<Arc<dyn AnyToolHandler>>,
}
impl ToolRouter {
@@ -61,8 +63,9 @@ impl ToolRouter {
parallel_mcp_server_names,
discoverable_tools,
dynamic_tools,
extension_tool_handlers,
} = params;
let builder = build_specs_with_discoverable_tools(
let mut builder = build_specs_with_discoverable_tools(
config,
mcp_tools,
deferred_mcp_tools,
@@ -70,6 +73,9 @@ impl ToolRouter {
discoverable_tools,
dynamic_tools,
);
for handler in extension_tool_handlers {
builder.register_any_handler_if_configured(handler);
}
let (specs, registry) = builder.build();
let deferred_dynamic_tools = dynamic_tools
.iter()

View File

@@ -38,6 +38,7 @@ async fn parallel_support_does_not_match_namespaced_local_tool_names() -> anyhow
parallel_mcp_server_names: HashSet::new(),
discoverable_tools: None,
dynamic_tools: turn.dynamic_tools.as_slice(),
extension_tool_handlers: Vec::new(),
},
);
@@ -111,6 +112,7 @@ async fn mcp_parallel_support_uses_exact_payload_server() -> anyhow::Result<()>
parallel_mcp_server_names: HashSet::from(["echo".to_string()]),
discoverable_tools: None,
dynamic_tools: turn.dynamic_tools.as_slice(),
extension_tool_handlers: Vec::new(),
},
);
@@ -178,6 +180,7 @@ async fn model_visible_specs_filter_deferred_dynamic_tools() -> anyhow::Result<(
parallel_mcp_server_names: HashSet::new(),
discoverable_tools: None,
dynamic_tools: &dynamic_tools,
extension_tool_handlers: Vec::new(),
},
);

View File

@@ -89,7 +89,6 @@ pub(crate) fn build_specs_with_discoverable_tools(
use crate::tools::handlers::PlanHandler;
use crate::tools::handlers::ReadMcpResourceHandler;
use crate::tools::handlers::RequestPermissionsHandler;
use crate::tools::handlers::RequestPluginInstallHandler;
use crate::tools::handlers::RequestUserInputHandler;
use crate::tools::handlers::ShellCommandHandler;
use crate::tools::handlers::ShellHandler;
@@ -284,9 +283,6 @@ pub(crate) fn build_specs_with_discoverable_tools(
);
builder.register_handler(Arc::new(ToolSearchHandler::new(entries)));
}
ToolHandlerKind::RequestPluginInstall => {
builder.register_handler(Arc::new(RequestPluginInstallHandler));
}
ToolHandlerKind::UpdateGoal => {
builder.register_handler(Arc::new(UpdateGoalHandler));
}

View File

@@ -354,6 +354,7 @@ async fn assert_model_tools(
parallel_mcp_server_names: std::collections::HashSet::new(),
discoverable_tools: None,
dynamic_tools: &[],
extension_tool_handlers: Vec::new(),
},
);
let model_visible_specs = router.model_visible_specs();

View File

@@ -433,17 +433,17 @@ impl TestCodexBuilder {
let thread_store = thread_store_from_config(&config, state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
let installation_id = resolve_installation_id(&config.codex_home).await?;
ThreadManager::new(
ThreadManager::builder(
&config,
codex_core::test_support::auth_manager_from_auth(auth.clone()),
SessionSource::Exec,
Arc::clone(&environment_manager),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
installation_id,
)
.session_source(SessionSource::Exec)
.build()
} else {
codex_core::test_support::thread_manager_with_models_provider_and_home(
auth.clone(),

View File

@@ -1124,17 +1124,17 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() {
let installation_id = resolve_installation_id(&config.codex_home)
.await
.expect("resolve installation id");
let thread_manager = ThreadManager::new(
let thread_manager = ThreadManager::builder(
&config,
auth_manager,
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
installation_id,
);
)
.session_source(SessionSource::Exec)
.build();
let NewThread { thread: codex, .. } = thread_manager
.start_thread(config.clone())
.await

View File

@@ -66,17 +66,17 @@ impl MessageProcessor {
let state_db = init_state_db_from_config(config.as_ref()).await?;
let thread_store = thread_store_from_config(config.as_ref(), state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
let thread_manager = Arc::new(ThreadManager::new(
let thread_manager = Arc::new(ThreadManager::builder(
config.as_ref(),
auth_manager,
SessionSource::Mcp,
environment_manager,
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
installation_id,
));
)
.session_source(SessionSource::Mcp)
.build());
Some(Self {
outgoing,
initialized: false,

View File

@@ -121,17 +121,17 @@ async fn run_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
let environment_manager =
Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await);
let installation_id = resolve_installation_id(&config.codex_home).await?;
let thread_manager = ThreadManager::new(
let thread_manager = ThreadManager::builder(
&config,
auth_manager,
SessionSource::Exec,
environment_manager,
/*analytics_events_client*/ None,
state_db,
Arc::clone(&thread_store),
agent_graph_store,
installation_id,
);
)
.session_source(SessionSource::Exec)
.build();
let NewThread {
thread_id, thread, ..

View File

@@ -1,5 +1,4 @@
use crate::CommandToolOptions;
use crate::REQUEST_PLUGIN_INSTALL_TOOL_NAME;
use crate::REQUEST_USER_INPUT_TOOL_NAME;
use crate::ResponsesApiNamespace;
use crate::ResponsesApiNamespaceTool;
@@ -328,10 +327,6 @@ pub fn build_tool_registry_plan(
/*supports_parallel_tool_calls*/ true,
/*code_mode_enabled*/ false,
);
plan.register_handler(
REQUEST_PLUGIN_INSTALL_TOOL_NAME,
ToolHandlerKind::RequestPluginInstall,
);
}
if config.environment_mode.has_environment()

View File

@@ -1868,10 +1868,7 @@ fn request_plugin_install_description_lists_discoverable_tools() {
Some(discoverable_tools),
&[],
);
assert!(handlers.contains(&ToolHandlerSpec {
name: ToolName::plain(REQUEST_PLUGIN_INSTALL_TOOL_NAME),
kind: ToolHandlerKind::RequestPluginInstall,
}));
assert!(handlers.is_empty());
let request_plugin_install = find_tool(&tools, REQUEST_PLUGIN_INSTALL_TOOL_NAME);
let ToolSpec::Function(ResponsesApiTool {

View File

@@ -29,7 +29,6 @@ pub enum ToolHandlerKind {
Plan,
ReadMcpResource,
ReportAgentJobResult,
RequestPluginInstall,
RequestPermissions,
RequestUserInput,
ResumeAgentV1,