mirror of
https://github.com/openai/codex.git
synced 2026-05-09 13:52:41 +00:00
Compare commits
3 Commits
xli-codex/
...
pakrym/cor
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a797d50b66 | ||
|
|
7c1a3455fa | ||
|
|
39c707410b |
@@ -90,6 +90,7 @@ mod mcp_refresh;
|
||||
mod message_processor;
|
||||
mod models;
|
||||
mod outgoing_message;
|
||||
mod plugin_install_suggest;
|
||||
mod request_processors;
|
||||
mod request_serialization;
|
||||
mod server_request_error;
|
||||
|
||||
@@ -180,17 +180,17 @@ mod tests {
|
||||
.expect("refresh tests require state db");
|
||||
let thread_store = thread_store_from_config(&good_config, state_db.clone());
|
||||
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
let thread_manager = Arc::new(ThreadManager::builder(
|
||||
&good_config,
|
||||
auth_manager,
|
||||
SessionSource::Exec,
|
||||
Arc::new(EnvironmentManager::default_for_tests()),
|
||||
/*analytics_events_client*/ None,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
"11111111-1111-4111-8111-111111111111".to_string(),
|
||||
));
|
||||
)
|
||||
.session_source(SessionSource::Exec)
|
||||
.build());
|
||||
thread_manager.start_thread(good_config).await?;
|
||||
thread_manager.start_thread(bad_config).await?;
|
||||
|
||||
|
||||
@@ -295,17 +295,27 @@ impl MessageProcessor {
|
||||
// resumed, or forked threads to a different persistence backend/root.
|
||||
let thread_store = thread_store_from_config(config.as_ref(), state_db.clone());
|
||||
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
let plugin_install_suggest_client_names:
|
||||
crate::plugin_install_suggest::AppServerClientNames =
|
||||
Arc::new(std::sync::RwLock::new(Default::default()));
|
||||
let thread_manager = Arc::new(ThreadManager::builder(
|
||||
config.as_ref(),
|
||||
auth_manager.clone(),
|
||||
session_source,
|
||||
environment_manager,
|
||||
Some(analytics_events_client.clone()),
|
||||
state_db.clone(),
|
||||
Arc::clone(&thread_store),
|
||||
agent_graph_store.clone(),
|
||||
installation_id,
|
||||
));
|
||||
)
|
||||
.register_extension(Arc::new(
|
||||
crate::plugin_install_suggest::PluginInstallSuggestToolProvider::new(
|
||||
auth_manager.clone(),
|
||||
Arc::clone(&plugin_install_suggest_client_names),
|
||||
),
|
||||
) as Arc<dyn codex_core::ToolProvider>)
|
||||
.session_source(session_source)
|
||||
.analytics_events_client(analytics_events_client.clone())
|
||||
.build());
|
||||
thread_manager
|
||||
.plugins_manager()
|
||||
.set_analytics_events_client(analytics_events_client.clone());
|
||||
@@ -402,6 +412,7 @@ impl MessageProcessor {
|
||||
Arc::clone(&thread_list_state_permit),
|
||||
thread_goal_processor.clone(),
|
||||
Some(state_db.clone()),
|
||||
Arc::clone(&plugin_install_suggest_client_names),
|
||||
);
|
||||
let turn_processor = TurnRequestProcessor::new(
|
||||
auth_manager.clone(),
|
||||
@@ -415,6 +426,7 @@ impl MessageProcessor {
|
||||
thread_state_manager,
|
||||
thread_watch_manager,
|
||||
thread_list_state_permit,
|
||||
plugin_install_suggest_client_names,
|
||||
);
|
||||
if matches!(plugin_startup_tasks, crate::PluginStartupTasks::Start) {
|
||||
// Keep plugin startup warmups aligned at app-server startup.
|
||||
|
||||
397
codex-rs/app-server/src/plugin_install_suggest.rs
Normal file
397
codex-rs/app-server/src/plugin_install_suggest.rs
Normal file
@@ -0,0 +1,397 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
|
||||
use codex_app_server_protocol::AppInfo;
|
||||
use codex_config::types::ToolSuggestDisabledTool;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config::edit::ConfigEdit;
|
||||
use codex_core::config::edit::ConfigEditsBuilder;
|
||||
use codex_core::connectors;
|
||||
use codex_core::extensibility::AnyToolHandler;
|
||||
use codex_core::extensibility::FunctionCallError;
|
||||
use codex_core::extensibility::FunctionToolOutput;
|
||||
use codex_core::extensibility::ToolHandler;
|
||||
use codex_core::extensibility::ToolInvocation;
|
||||
use codex_core::extensibility::ToolKind;
|
||||
use codex_core::extensibility::ToolProvider;
|
||||
use codex_core::extensibility::ToolProviderContext;
|
||||
use codex_core_plugins::PluginsManager;
|
||||
use codex_features::Feature;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_rmcp_client::ElicitationAction;
|
||||
use codex_rmcp_client::ElicitationResponse;
|
||||
use codex_tools::DiscoverableTool;
|
||||
use codex_tools::DiscoverableToolAction;
|
||||
use codex_tools::REQUEST_PLUGIN_INSTALL_PERSIST_ALWAYS_VALUE;
|
||||
use codex_tools::REQUEST_PLUGIN_INSTALL_PERSIST_KEY;
|
||||
use codex_tools::REQUEST_PLUGIN_INSTALL_TOOL_NAME;
|
||||
use codex_tools::RequestPluginInstallArgs;
|
||||
use codex_tools::RequestPluginInstallResult;
|
||||
use codex_tools::ToolName;
|
||||
use codex_tools::all_requested_connectors_picked_up;
|
||||
use codex_tools::build_request_plugin_install_elicitation_request;
|
||||
use codex_tools::verified_connector_install_completed;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use serde_json::Value;
|
||||
use tracing::warn;
|
||||
|
||||
pub(crate) type AppServerClientNames = Arc<RwLock<HashMap<String, String>>>;
|
||||
|
||||
pub(crate) fn set_app_server_client_name(
|
||||
app_server_client_names: &AppServerClientNames,
|
||||
thread_id: ThreadId,
|
||||
app_server_client_name: Option<String>,
|
||||
) {
|
||||
let Ok(mut app_server_client_names) = app_server_client_names.write() else {
|
||||
return;
|
||||
};
|
||||
let thread_id = thread_id.to_string();
|
||||
match app_server_client_name {
|
||||
Some(app_server_client_name) => {
|
||||
app_server_client_names.insert(thread_id, app_server_client_name);
|
||||
}
|
||||
None => {
|
||||
app_server_client_names.remove(&thread_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct PluginInstallSuggestToolProvider {
|
||||
auth_manager: Arc<AuthManager>,
|
||||
app_server_client_names: AppServerClientNames,
|
||||
}
|
||||
|
||||
impl PluginInstallSuggestToolProvider {
|
||||
pub(crate) fn new(
|
||||
auth_manager: Arc<AuthManager>,
|
||||
app_server_client_names: AppServerClientNames,
|
||||
) -> Self {
|
||||
Self {
|
||||
auth_manager,
|
||||
app_server_client_names,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ToolProvider for PluginInstallSuggestToolProvider {
|
||||
fn handlers(&self, context: ToolProviderContext) -> Vec<Arc<dyn AnyToolHandler>> {
|
||||
let config = context.config();
|
||||
let conversation_id = context.conversation_id();
|
||||
let features = config.features.get();
|
||||
if !features.enabled(Feature::ToolSuggest)
|
||||
|| !features.enabled(Feature::Apps)
|
||||
|| !features.enabled(Feature::Plugins)
|
||||
{
|
||||
return Vec::new();
|
||||
}
|
||||
let Ok(app_server_client_names) = self.app_server_client_names.read() else {
|
||||
return Vec::new();
|
||||
};
|
||||
if app_server_client_names
|
||||
.get(&conversation_id)
|
||||
.is_some_and(|client_name| client_name == "codex-tui")
|
||||
{
|
||||
return Vec::new();
|
||||
}
|
||||
drop(app_server_client_names);
|
||||
|
||||
vec![Arc::new(RequestPluginInstallHandler {
|
||||
config,
|
||||
auth_manager: Arc::clone(&self.auth_manager),
|
||||
plugins_manager: context.plugins_manager(),
|
||||
conversation_id,
|
||||
turn_id: context.turn_id(),
|
||||
})]
|
||||
}
|
||||
}
|
||||
|
||||
struct RequestPluginInstallHandler {
|
||||
config: Arc<Config>,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
plugins_manager: Arc<PluginsManager>,
|
||||
conversation_id: String,
|
||||
turn_id: String,
|
||||
}
|
||||
|
||||
impl ToolHandler for RequestPluginInstallHandler {
|
||||
type Output = FunctionToolOutput;
|
||||
|
||||
fn tool_name(&self) -> ToolName {
|
||||
ToolName::plain(REQUEST_PLUGIN_INSTALL_TOOL_NAME)
|
||||
}
|
||||
|
||||
fn kind(&self) -> ToolKind {
|
||||
ToolKind::Function
|
||||
}
|
||||
|
||||
fn handle(
|
||||
&self,
|
||||
invocation: ToolInvocation,
|
||||
) -> impl std::future::Future<Output = Result<Self::Output, FunctionCallError>> + Send {
|
||||
self.handle_request_plugin_install(invocation)
|
||||
}
|
||||
}
|
||||
|
||||
impl RequestPluginInstallHandler {
|
||||
async fn handle_request_plugin_install(
|
||||
&self,
|
||||
invocation: ToolInvocation,
|
||||
) -> Result<FunctionToolOutput, FunctionCallError> {
|
||||
let arguments = invocation.function_arguments(REQUEST_PLUGIN_INSTALL_TOOL_NAME)?;
|
||||
let args: RequestPluginInstallArgs = serde_json::from_str(arguments).map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!("failed to parse function arguments: {err}"))
|
||||
})?;
|
||||
let suggest_reason = args.suggest_reason.trim();
|
||||
if suggest_reason.is_empty() {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"suggest_reason must not be empty".to_string(),
|
||||
));
|
||||
}
|
||||
if args.action_type != DiscoverableToolAction::Install {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"plugin install requests currently support only action_type=\"install\""
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let auth = self.auth_manager.auth().await;
|
||||
let mcp_tools = invocation.list_mcp_tools().await;
|
||||
let accessible_connectors = connectors::with_app_enabled_state(
|
||||
connectors::accessible_connectors_from_mcp_tools(&mcp_tools),
|
||||
self.config.as_ref(),
|
||||
);
|
||||
let discoverable_tools = connectors::list_tool_suggest_discoverable_tools_with_auth(
|
||||
self.config.as_ref(),
|
||||
auth.as_ref(),
|
||||
&accessible_connectors,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"plugin install requests are unavailable right now: {err}"
|
||||
))
|
||||
})?;
|
||||
|
||||
let tool = discoverable_tools
|
||||
.into_iter()
|
||||
.find(|tool| tool.tool_type() == args.tool_type && tool.id() == args.tool_id)
|
||||
.ok_or_else(|| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"tool_id must match one of the discoverable tools exposed by {REQUEST_PLUGIN_INSTALL_TOOL_NAME}"
|
||||
))
|
||||
})?;
|
||||
|
||||
let params = build_request_plugin_install_elicitation_request(
|
||||
CODEX_APPS_MCP_SERVER_NAME,
|
||||
self.conversation_id.clone(),
|
||||
self.turn_id.clone(),
|
||||
&args,
|
||||
suggest_reason,
|
||||
&tool,
|
||||
);
|
||||
let request_id = format!("request_plugin_install_{}", invocation.call_id());
|
||||
let response = invocation
|
||||
.request_mcp_server_elicitation(request_id, params)
|
||||
.await;
|
||||
if let Some(response) = response.as_ref()
|
||||
&& maybe_persist_disabled_install_request(&self.config.codex_home, &tool, response)
|
||||
.await
|
||||
{
|
||||
invocation.reload_user_config_layer().await;
|
||||
}
|
||||
let user_confirmed = response
|
||||
.as_ref()
|
||||
.is_some_and(|response| response.action == ElicitationAction::Accept);
|
||||
|
||||
let completed = if user_confirmed {
|
||||
self.verify_request_plugin_install_completed(&invocation, &tool, auth.as_ref())
|
||||
.await
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
if completed && let DiscoverableTool::Connector(connector) = &tool {
|
||||
invocation
|
||||
.merge_connector_selection(HashSet::from([connector.id.clone()]))
|
||||
.await;
|
||||
}
|
||||
|
||||
let content = serde_json::to_string(&RequestPluginInstallResult {
|
||||
completed,
|
||||
user_confirmed,
|
||||
tool_type: args.tool_type,
|
||||
action_type: args.action_type,
|
||||
tool_id: tool.id().to_string(),
|
||||
tool_name: tool.name().to_string(),
|
||||
suggest_reason: suggest_reason.to_string(),
|
||||
})
|
||||
.map_err(|err| {
|
||||
FunctionCallError::Fatal(format!(
|
||||
"failed to serialize {REQUEST_PLUGIN_INSTALL_TOOL_NAME} response: {err}"
|
||||
))
|
||||
})?;
|
||||
|
||||
Ok(FunctionToolOutput::from_text(content, Some(true)))
|
||||
}
|
||||
|
||||
async fn verify_request_plugin_install_completed(
|
||||
&self,
|
||||
invocation: &ToolInvocation,
|
||||
tool: &DiscoverableTool,
|
||||
auth: Option<&CodexAuth>,
|
||||
) -> bool {
|
||||
match tool {
|
||||
DiscoverableTool::Connector(connector) => self
|
||||
.refresh_missing_requested_connectors(
|
||||
invocation,
|
||||
auth,
|
||||
std::slice::from_ref(&connector.id),
|
||||
connector.id.as_str(),
|
||||
)
|
||||
.await
|
||||
.is_some_and(|accessible_connectors| {
|
||||
verified_connector_install_completed(
|
||||
connector.id.as_str(),
|
||||
&accessible_connectors,
|
||||
)
|
||||
}),
|
||||
DiscoverableTool::Plugin(plugin) => {
|
||||
invocation.reload_user_config_layer().await;
|
||||
let completed = verified_plugin_install_completed(
|
||||
plugin.id.as_str(),
|
||||
self.config.as_ref(),
|
||||
self.plugins_manager.as_ref(),
|
||||
);
|
||||
let _ = self
|
||||
.refresh_missing_requested_connectors(
|
||||
invocation,
|
||||
auth,
|
||||
&plugin.app_connector_ids,
|
||||
plugin.id.as_str(),
|
||||
)
|
||||
.await;
|
||||
completed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn refresh_missing_requested_connectors(
|
||||
&self,
|
||||
invocation: &ToolInvocation,
|
||||
auth: Option<&CodexAuth>,
|
||||
expected_connector_ids: &[String],
|
||||
tool_id: &str,
|
||||
) -> Option<Vec<AppInfo>> {
|
||||
if expected_connector_ids.is_empty() {
|
||||
return Some(Vec::new());
|
||||
}
|
||||
|
||||
let mcp_tools = invocation.list_mcp_tools().await;
|
||||
let accessible_connectors = connectors::with_app_enabled_state(
|
||||
connectors::accessible_connectors_from_mcp_tools(&mcp_tools),
|
||||
self.config.as_ref(),
|
||||
);
|
||||
if all_requested_connectors_picked_up(expected_connector_ids, &accessible_connectors) {
|
||||
return Some(accessible_connectors);
|
||||
}
|
||||
|
||||
match invocation.hard_refresh_codex_apps_tools_cache().await {
|
||||
Ok(mcp_tools) => {
|
||||
let accessible_connectors = connectors::with_app_enabled_state(
|
||||
connectors::accessible_connectors_from_mcp_tools(&mcp_tools),
|
||||
self.config.as_ref(),
|
||||
);
|
||||
connectors::refresh_accessible_connectors_cache_from_mcp_tools(
|
||||
self.config.as_ref(),
|
||||
auth,
|
||||
&mcp_tools,
|
||||
);
|
||||
Some(accessible_connectors)
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to refresh codex apps tools cache after plugin install request for {tool_id}: {err:#}"
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn maybe_persist_disabled_install_request(
|
||||
codex_home: &AbsolutePathBuf,
|
||||
tool: &DiscoverableTool,
|
||||
response: &ElicitationResponse,
|
||||
) -> bool {
|
||||
if !request_plugin_install_response_requests_persistent_disable(response) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if let Err(err) = persist_disabled_install_request(codex_home, tool).await {
|
||||
warn!(
|
||||
error = %err,
|
||||
tool_id = tool.id(),
|
||||
"failed to persist disabled tool suggestion"
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
fn request_plugin_install_response_requests_persistent_disable(
|
||||
response: &ElicitationResponse,
|
||||
) -> bool {
|
||||
if response.action != ElicitationAction::Decline {
|
||||
return false;
|
||||
}
|
||||
|
||||
response
|
||||
.meta
|
||||
.as_ref()
|
||||
.and_then(Value::as_object)
|
||||
.and_then(|meta| meta.get(REQUEST_PLUGIN_INSTALL_PERSIST_KEY))
|
||||
.and_then(Value::as_str)
|
||||
== Some(REQUEST_PLUGIN_INSTALL_PERSIST_ALWAYS_VALUE)
|
||||
}
|
||||
|
||||
async fn persist_disabled_install_request(
|
||||
codex_home: &AbsolutePathBuf,
|
||||
tool: &DiscoverableTool,
|
||||
) -> anyhow::Result<()> {
|
||||
ConfigEditsBuilder::new(codex_home)
|
||||
.with_edits([ConfigEdit::AddToolSuggestDisabledTool(
|
||||
disabled_install_request(tool),
|
||||
)])
|
||||
.apply()
|
||||
.await
|
||||
}
|
||||
|
||||
fn disabled_install_request(tool: &DiscoverableTool) -> ToolSuggestDisabledTool {
|
||||
match tool {
|
||||
DiscoverableTool::Connector(connector) => {
|
||||
ToolSuggestDisabledTool::connector(connector.id.as_str())
|
||||
}
|
||||
DiscoverableTool::Plugin(plugin) => ToolSuggestDisabledTool::plugin(plugin.id.as_str()),
|
||||
}
|
||||
}
|
||||
|
||||
fn verified_plugin_install_completed(
|
||||
tool_id: &str,
|
||||
config: &Config,
|
||||
plugins_manager: &PluginsManager,
|
||||
) -> bool {
|
||||
let plugins_input = config.plugins_config_input();
|
||||
plugins_manager
|
||||
.list_marketplaces_for_config(&plugins_input, &[])
|
||||
.ok()
|
||||
.into_iter()
|
||||
.flat_map(|outcome| outcome.marketplaces)
|
||||
.flat_map(|marketplace| marketplace.plugins.into_iter())
|
||||
.any(|plugin| plugin.id == tool_id && plugin.installed)
|
||||
}
|
||||
@@ -316,6 +316,8 @@ pub(crate) struct ThreadRequestProcessor {
|
||||
pub(super) thread_list_state_permit: Arc<Semaphore>,
|
||||
pub(super) thread_goal_processor: ThreadGoalRequestProcessor,
|
||||
pub(super) state_db: Option<StateDbHandle>,
|
||||
pub(super) plugin_install_suggest_client_names:
|
||||
crate::plugin_install_suggest::AppServerClientNames,
|
||||
pub(super) background_tasks: TaskTracker,
|
||||
}
|
||||
|
||||
@@ -336,6 +338,7 @@ impl ThreadRequestProcessor {
|
||||
thread_list_state_permit: Arc<Semaphore>,
|
||||
thread_goal_processor: ThreadGoalRequestProcessor,
|
||||
state_db: Option<StateDbHandle>,
|
||||
plugin_install_suggest_client_names: crate::plugin_install_suggest::AppServerClientNames,
|
||||
) -> Self {
|
||||
Self {
|
||||
auth_manager,
|
||||
@@ -352,6 +355,7 @@ impl ThreadRequestProcessor {
|
||||
thread_list_state_permit,
|
||||
thread_goal_processor,
|
||||
state_db,
|
||||
plugin_install_suggest_client_names,
|
||||
background_tasks: TaskTracker::new(),
|
||||
}
|
||||
}
|
||||
@@ -657,10 +661,12 @@ impl ThreadRequestProcessor {
|
||||
}
|
||||
|
||||
async fn set_app_server_client_info(
|
||||
plugin_install_suggest_client_names: &crate::plugin_install_suggest::AppServerClientNames,
|
||||
thread: &CodexThread,
|
||||
app_server_client_name: Option<String>,
|
||||
app_server_client_version: Option<String>,
|
||||
) -> Result<(), JSONRPCErrorError> {
|
||||
let client_name_for_tool_provider = app_server_client_name.clone();
|
||||
let mcp_elicitations_auto_deny = xcode_26_4_mcp_elicitations_auto_deny(
|
||||
app_server_client_name.as_deref(),
|
||||
app_server_client_version.as_deref(),
|
||||
@@ -672,7 +678,13 @@ impl ThreadRequestProcessor {
|
||||
mcp_elicitations_auto_deny,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| internal_error(format!("failed to set app server client info: {err}")))
|
||||
.map_err(|err| internal_error(format!("failed to set app server client info: {err}")))?;
|
||||
crate::plugin_install_suggest::set_app_server_client_name(
|
||||
plugin_install_suggest_client_names,
|
||||
thread.session_configured().thread_id,
|
||||
client_name_for_tool_provider,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn finalize_thread_teardown(&self, thread_id: ThreadId) {
|
||||
@@ -849,6 +861,8 @@ impl ThreadRequestProcessor {
|
||||
let config_manager = self.config_manager.clone();
|
||||
let outgoing = Arc::clone(&listener_task_context.outgoing);
|
||||
let error_request_id = request_id.clone();
|
||||
let plugin_install_suggest_client_names =
|
||||
Arc::clone(&self.plugin_install_suggest_client_names);
|
||||
let thread_start_task = async move {
|
||||
if let Err(error) = Self::thread_start_task(
|
||||
listener_task_context,
|
||||
@@ -865,6 +879,7 @@ impl ThreadRequestProcessor {
|
||||
service_name,
|
||||
experimental_raw_events,
|
||||
request_trace,
|
||||
plugin_install_suggest_client_names,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -949,6 +964,7 @@ impl ThreadRequestProcessor {
|
||||
service_name: Option<String>,
|
||||
experimental_raw_events: bool,
|
||||
request_trace: Option<W3cTraceContext>,
|
||||
plugin_install_suggest_client_names: crate::plugin_install_suggest::AppServerClientNames,
|
||||
) -> Result<(), JSONRPCErrorError> {
|
||||
let requested_cwd = typesafe_overrides.cwd.clone();
|
||||
let mut config = config_manager
|
||||
@@ -1081,6 +1097,7 @@ impl ThreadRequestProcessor {
|
||||
})?;
|
||||
|
||||
Self::set_app_server_client_info(
|
||||
&plugin_install_suggest_client_names,
|
||||
thread.as_ref(),
|
||||
app_server_client_name,
|
||||
app_server_client_version,
|
||||
@@ -2404,6 +2421,7 @@ impl ThreadRequestProcessor {
|
||||
..
|
||||
}) => {
|
||||
if let Err(err) = Self::set_app_server_client_info(
|
||||
&self.plugin_install_suggest_client_names,
|
||||
codex_thread.as_ref(),
|
||||
app_server_client_name,
|
||||
app_server_client_version,
|
||||
@@ -2638,6 +2656,7 @@ impl ThreadRequestProcessor {
|
||||
)
|
||||
.await?;
|
||||
Self::set_app_server_client_info(
|
||||
&self.plugin_install_suggest_client_names,
|
||||
existing_thread.as_ref(),
|
||||
app_server_client_name,
|
||||
app_server_client_version,
|
||||
@@ -3053,6 +3072,7 @@ impl ThreadRequestProcessor {
|
||||
})?;
|
||||
|
||||
Self::set_app_server_client_info(
|
||||
&self.plugin_install_suggest_client_names,
|
||||
forked_thread.as_ref(),
|
||||
app_server_client_name,
|
||||
app_server_client_version,
|
||||
|
||||
@@ -13,6 +13,7 @@ pub(crate) struct TurnRequestProcessor {
|
||||
thread_state_manager: ThreadStateManager,
|
||||
thread_watch_manager: ThreadWatchManager,
|
||||
thread_list_state_permit: Arc<Semaphore>,
|
||||
plugin_install_suggest_client_names: crate::plugin_install_suggest::AppServerClientNames,
|
||||
}
|
||||
|
||||
impl TurnRequestProcessor {
|
||||
@@ -29,6 +30,7 @@ impl TurnRequestProcessor {
|
||||
thread_state_manager: ThreadStateManager,
|
||||
thread_watch_manager: ThreadWatchManager,
|
||||
thread_list_state_permit: Arc<Semaphore>,
|
||||
plugin_install_suggest_client_names: crate::plugin_install_suggest::AppServerClientNames,
|
||||
) -> Self {
|
||||
Self {
|
||||
auth_manager,
|
||||
@@ -42,6 +44,7 @@ impl TurnRequestProcessor {
|
||||
thread_state_manager,
|
||||
thread_watch_manager,
|
||||
thread_list_state_permit,
|
||||
plugin_install_suggest_client_names,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -330,7 +333,7 @@ impl TurnRequestProcessor {
|
||||
.inspect_err(|error| {
|
||||
self.track_error_response(&request_id, error, /*error_type*/ None);
|
||||
})?;
|
||||
Self::set_app_server_client_info(
|
||||
self.set_app_server_client_info(
|
||||
thread.as_ref(),
|
||||
app_server_client_name,
|
||||
app_server_client_version,
|
||||
@@ -541,10 +544,12 @@ impl TurnRequestProcessor {
|
||||
}
|
||||
|
||||
async fn set_app_server_client_info(
|
||||
&self,
|
||||
thread: &CodexThread,
|
||||
app_server_client_name: Option<String>,
|
||||
app_server_client_version: Option<String>,
|
||||
) -> Result<(), JSONRPCErrorError> {
|
||||
let client_name_for_tool_provider = app_server_client_name.clone();
|
||||
let mcp_elicitations_auto_deny = xcode_26_4_mcp_elicitations_auto_deny(
|
||||
app_server_client_name.as_deref(),
|
||||
app_server_client_version.as_deref(),
|
||||
@@ -556,7 +561,13 @@ impl TurnRequestProcessor {
|
||||
mcp_elicitations_auto_deny,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| internal_error(format!("failed to set app server client info: {err}")))
|
||||
.map_err(|err| internal_error(format!("failed to set app server client info: {err}")))?;
|
||||
crate::plugin_install_suggest::set_app_server_client_name(
|
||||
&self.plugin_install_suggest_client_names,
|
||||
thread.session_configured().thread_id,
|
||||
client_name_for_tool_provider,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn turn_steer_inner(
|
||||
|
||||
@@ -24,13 +24,21 @@ pub use codex_config::types::TuiKeymap;
|
||||
pub use codex_config::types::TuiNotificationSettings;
|
||||
pub use codex_config::types::UriBasedFileOpener;
|
||||
pub use codex_core::CodexThread;
|
||||
pub use codex_core::ExtensionRegistry;
|
||||
pub use codex_core::ForkSnapshot;
|
||||
pub use codex_core::AnyToolHandler;
|
||||
pub use codex_core::McpManager;
|
||||
pub use codex_core::NewThread;
|
||||
pub use codex_core::StartThreadOptions;
|
||||
pub use codex_core::StateDbHandle;
|
||||
pub use codex_core::ThreadManager;
|
||||
pub use codex_core::ThreadManagerBuilder;
|
||||
pub use codex_core::ThreadShutdownReport;
|
||||
pub use codex_core::FunctionCallError;
|
||||
pub use codex_core::FunctionToolOutput;
|
||||
pub use codex_core::ToolHandler;
|
||||
pub use codex_core::ToolInvocation;
|
||||
pub use codex_core::ToolProvider;
|
||||
pub use codex_core::agent_graph_store_from_state_db;
|
||||
pub use codex_core::config::Config;
|
||||
pub use codex_core::config::Constrained;
|
||||
|
||||
@@ -100,6 +100,7 @@ pub(crate) async fn run_codex_thread_interactive(
|
||||
analytics_events_client: Some(parent_session.services.analytics_events_client.clone()),
|
||||
state_db: parent_session.services.state_db.clone(),
|
||||
thread_store: Arc::clone(&parent_session.services.thread_store),
|
||||
extensions: parent_session.services.extensions.clone(),
|
||||
}))
|
||||
.or_cancel(&cancel_token)
|
||||
.await??;
|
||||
|
||||
@@ -115,7 +115,7 @@ pub(crate) async fn list_accessible_and_enabled_connectors_from_manager(
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub(crate) async fn list_tool_suggest_discoverable_tools_with_auth(
|
||||
pub async fn list_tool_suggest_discoverable_tools_with_auth(
|
||||
config: &Config,
|
||||
auth: Option<&CodexAuth>,
|
||||
accessible_connectors: &[AppInfo],
|
||||
@@ -162,7 +162,7 @@ pub async fn list_cached_accessible_connectors_from_mcp_tools(
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn refresh_accessible_connectors_cache_from_mcp_tools(
|
||||
pub fn refresh_accessible_connectors_cache_from_mcp_tools(
|
||||
config: &Config,
|
||||
auth: Option<&CodexAuth>,
|
||||
mcp_tools: &HashMap<String, ToolInfo>,
|
||||
@@ -515,7 +515,7 @@ async fn chatgpt_get_request_with_auth_provider<T: DeserializeOwned>(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn accessible_connectors_from_mcp_tools(
|
||||
pub fn accessible_connectors_from_mcp_tools(
|
||||
mcp_tools: &HashMap<String, ToolInfo>,
|
||||
) -> Vec<AppInfo> {
|
||||
// ToolInfo already carries plugin provenance, so app-level plugin sources
|
||||
|
||||
110
codex-rs/core/src/extensibility/mod.rs
Normal file
110
codex-rs/core/src/extensibility/mod.rs
Normal file
@@ -0,0 +1,110 @@
|
||||
//! Extension points for codex-core.
|
||||
|
||||
use std::any::Any;
|
||||
use std::any::TypeId;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::config::Config;
|
||||
use codex_core_plugins::PluginsManager;
|
||||
|
||||
pub use crate::function_tool::FunctionCallError;
|
||||
pub use crate::tools::context::FunctionToolOutput;
|
||||
pub use crate::tools::context::ToolInvocation;
|
||||
pub use crate::tools::registry::AnyToolHandler;
|
||||
pub use crate::tools::registry::ToolHandler;
|
||||
pub use crate::tools::registry::ToolKind;
|
||||
|
||||
/// Stores registered implementations of codex-core extension traits.
|
||||
///
|
||||
/// Registrations are keyed by the trait object type used at insertion time. To
|
||||
/// register an implementation for an extension trait, coerce it to that trait
|
||||
/// object first, such as `Arc<dyn ToolProvider>`.
|
||||
#[derive(Clone, Default)]
|
||||
pub struct ExtensionRegistry {
|
||||
extensions: HashMap<TypeId, Vec<Arc<dyn Any + Send + Sync>>>,
|
||||
}
|
||||
|
||||
impl ExtensionRegistry {
|
||||
/// Register one implementation for the extension trait `T`.
|
||||
pub fn register<T>(&mut self, extension: Arc<T>)
|
||||
where
|
||||
T: ?Sized + Send + Sync + 'static,
|
||||
{
|
||||
let extension = Arc::new(extension);
|
||||
self.extensions
|
||||
.entry(TypeId::of::<T>())
|
||||
.or_default()
|
||||
.push(extension);
|
||||
}
|
||||
|
||||
/// Return all registered implementations for the extension trait `T`.
|
||||
pub fn get<T>(&self) -> Vec<Arc<T>>
|
||||
where
|
||||
T: ?Sized + Send + Sync + 'static,
|
||||
{
|
||||
self.extensions
|
||||
.get(&TypeId::of::<T>())
|
||||
.into_iter()
|
||||
.flat_map(|extensions| extensions.iter())
|
||||
.filter_map(|extension| extension.downcast_ref::<Arc<T>>().cloned())
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Returns true when no extension traits have been registered.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.extensions.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
/// Runtime context available when extension providers create tool handlers.
|
||||
#[derive(Clone)]
|
||||
pub struct ToolProviderContext {
|
||||
config: Arc<Config>,
|
||||
plugins_manager: Arc<PluginsManager>,
|
||||
conversation_id: String,
|
||||
turn_id: String,
|
||||
}
|
||||
|
||||
impl ToolProviderContext {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn new(
|
||||
config: Arc<Config>,
|
||||
plugins_manager: Arc<PluginsManager>,
|
||||
conversation_id: String,
|
||||
turn_id: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
config,
|
||||
plugins_manager,
|
||||
conversation_id,
|
||||
turn_id,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn config(&self) -> Arc<Config> {
|
||||
Arc::clone(&self.config)
|
||||
}
|
||||
|
||||
pub fn plugins_manager(&self) -> Arc<PluginsManager> {
|
||||
Arc::clone(&self.plugins_manager)
|
||||
}
|
||||
|
||||
pub fn conversation_id(&self) -> String {
|
||||
self.conversation_id.clone()
|
||||
}
|
||||
|
||||
pub fn turn_id(&self) -> String {
|
||||
self.turn_id.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Provides tools through codex-core extensibility.
|
||||
///
|
||||
/// Implementations are expected to return handlers owned by the provider. Tool
|
||||
/// specs may still be provided by the existing built-in plan while handlers
|
||||
/// migrate behind this extension point.
|
||||
pub trait ToolProvider: Send + Sync + 'static {
|
||||
/// Return tool handlers owned by this provider for the current config.
|
||||
fn handlers(&self, context: ToolProviderContext) -> Vec<Arc<dyn AnyToolHandler>>;
|
||||
}
|
||||
@@ -33,6 +33,14 @@ mod context_manager;
|
||||
mod environment_selection;
|
||||
pub mod exec;
|
||||
pub mod exec_env;
|
||||
pub mod extensibility;
|
||||
pub use extensibility::AnyToolHandler;
|
||||
pub use extensibility::ExtensionRegistry;
|
||||
pub use extensibility::FunctionCallError;
|
||||
pub use extensibility::FunctionToolOutput;
|
||||
pub use extensibility::ToolHandler;
|
||||
pub use extensibility::ToolInvocation;
|
||||
pub use extensibility::ToolProvider;
|
||||
mod exec_policy;
|
||||
pub mod file_watcher;
|
||||
mod flags;
|
||||
@@ -116,6 +124,7 @@ pub use thread_manager::ForkSnapshot;
|
||||
pub use thread_manager::NewThread;
|
||||
pub use thread_manager::StartThreadOptions;
|
||||
pub use thread_manager::ThreadManager;
|
||||
pub use thread_manager::ThreadManagerBuilder;
|
||||
pub use thread_manager::ThreadShutdownReport;
|
||||
pub use thread_manager::agent_graph_store_from_state_db;
|
||||
pub use thread_manager::build_models_manager;
|
||||
|
||||
@@ -44,17 +44,17 @@ pub async fn build_prompt_input(
|
||||
let thread_store = thread_store_from_config(&config, state_db.clone());
|
||||
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
|
||||
let installation_id = resolve_installation_id(&config.codex_home).await?;
|
||||
let thread_manager = ThreadManager::new(
|
||||
let thread_manager = ThreadManager::builder(
|
||||
&config,
|
||||
Arc::clone(&auth_manager),
|
||||
SessionSource::Exec,
|
||||
Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await),
|
||||
/*analytics_events_client*/ None,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
installation_id,
|
||||
);
|
||||
)
|
||||
.session_source(SessionSource::Exec)
|
||||
.build();
|
||||
let thread = thread_manager.start_thread(config).await?;
|
||||
|
||||
let output = build_prompt_input_from_session(thread.thread.codex.session.as_ref(), input).await;
|
||||
|
||||
@@ -32,6 +32,7 @@ use crate::context::PersonalitySpecInstructions;
|
||||
use crate::default_skill_metadata_budget;
|
||||
use crate::environment_selection::ResolvedTurnEnvironments;
|
||||
use crate::exec_policy::ExecPolicyManager;
|
||||
use crate::extensibility::ExtensionRegistry;
|
||||
use crate::parse_turn_item;
|
||||
use crate::path_utils::normalize_for_native_workdir;
|
||||
use crate::realtime_conversation::RealtimeConversationManager;
|
||||
@@ -411,6 +412,7 @@ pub(crate) struct CodexSpawnArgs {
|
||||
pub(crate) analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
pub(crate) state_db: Option<state_db::StateDbHandle>,
|
||||
pub(crate) thread_store: Arc<dyn ThreadStore>,
|
||||
pub(crate) extensions: ExtensionRegistry,
|
||||
}
|
||||
|
||||
pub(crate) const INITIAL_SUBMIT_ID: &str = "";
|
||||
@@ -471,6 +473,7 @@ impl Codex {
|
||||
analytics_events_client,
|
||||
state_db,
|
||||
thread_store,
|
||||
extensions,
|
||||
} = args;
|
||||
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let (tx_event, rx_event) = async_channel::unbounded();
|
||||
@@ -648,6 +651,7 @@ impl Codex {
|
||||
analytics_events_client,
|
||||
state_db,
|
||||
thread_store,
|
||||
extensions,
|
||||
parent_rollout_thread_trace,
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -370,6 +370,7 @@ impl Session {
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
state_db: Option<state_db::StateDbHandle>,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
extensions: ExtensionRegistry,
|
||||
parent_rollout_thread_trace: ThreadTraceContext,
|
||||
) -> anyhow::Result<Arc<Self>> {
|
||||
debug!(
|
||||
@@ -838,6 +839,7 @@ impl Session {
|
||||
state_db: state_db_ctx.clone(),
|
||||
live_thread: live_thread_init.as_ref().cloned(),
|
||||
thread_store: Arc::clone(&thread_store),
|
||||
extensions,
|
||||
model_client: ModelClient::new(
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
session_id,
|
||||
|
||||
@@ -545,6 +545,7 @@ fn test_tool_runtime(session: Arc<Session>, turn_context: Arc<TurnContext>) -> T
|
||||
parallel_mcp_server_names: HashSet::new(),
|
||||
discoverable_tools: None,
|
||||
dynamic_tools: turn_context.dynamic_tools.as_slice(),
|
||||
extension_tool_handlers: Vec::new(),
|
||||
},
|
||||
));
|
||||
let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
|
||||
@@ -3762,6 +3763,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
.await
|
||||
.expect("state db should initialize"),
|
||||
)),
|
||||
extensions: ExtensionRegistry::default(),
|
||||
model_client: ModelClient::new(
|
||||
Some(auth_manager.clone()),
|
||||
thread_id.into(),
|
||||
@@ -5448,6 +5450,7 @@ where
|
||||
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
|
||||
state_db,
|
||||
)),
|
||||
extensions: ExtensionRegistry::default(),
|
||||
model_client: ModelClient::new(
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
thread_id.into(),
|
||||
@@ -8287,6 +8290,7 @@ async fn fatal_tool_error_stops_turn_and_reports_error() {
|
||||
parallel_mcp_server_names: HashSet::new(),
|
||||
discoverable_tools: None,
|
||||
dynamic_tools: turn_context.dynamic_tools.as_slice(),
|
||||
extension_tool_handlers: Vec::new(),
|
||||
},
|
||||
);
|
||||
let item = ResponseItem::CustomToolCall {
|
||||
|
||||
@@ -769,6 +769,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
|
||||
analytics_events_client: None,
|
||||
state_db: None,
|
||||
thread_store,
|
||||
extensions: ExtensionRegistry::default(),
|
||||
})
|
||||
.await
|
||||
.expect("spawn guardian subagent");
|
||||
|
||||
@@ -19,6 +19,8 @@ use crate::compact_remote::run_inline_remote_auto_compact_task;
|
||||
use crate::compact_remote_v2::run_inline_remote_auto_compact_task as run_inline_remote_auto_compact_task_v2;
|
||||
use crate::connectors;
|
||||
use crate::context::ContextualUserFragment;
|
||||
use crate::extensibility::ToolProvider;
|
||||
use crate::extensibility::ToolProviderContext;
|
||||
use crate::feedback_tags;
|
||||
use crate::hook_runtime::PendingInputHookDisposition;
|
||||
use crate::hook_runtime::emit_hook_completed_events;
|
||||
@@ -53,6 +55,7 @@ use crate::stream_events_utils::record_completed_response_item;
|
||||
use crate::tools::ToolRouter;
|
||||
use crate::tools::context::SharedTurnDiffTracker;
|
||||
use crate::tools::parallel::ToolCallRuntime;
|
||||
use crate::tools::registry::ToolArgumentDiffContext;
|
||||
use crate::tools::registry::ToolArgumentDiffConsumer;
|
||||
use crate::tools::router::ToolRouterParams;
|
||||
use crate::turn_diff_tracker::TurnDiffTracker;
|
||||
@@ -1187,7 +1190,10 @@ pub(crate) async fn built_tools(
|
||||
None
|
||||
};
|
||||
let auth = sess.services.auth_manager.auth().await;
|
||||
let discoverable_tools = if apps_enabled && turn_context.tools_config.tool_suggest {
|
||||
let plugin_install_suggest_enabled = apps_enabled
|
||||
&& turn_context.tools_config.tool_suggest
|
||||
&& turn_context.app_server_client_name.as_deref() != Some("codex-tui");
|
||||
let discoverable_tools = if plugin_install_suggest_enabled {
|
||||
if let Some(accessible_connectors) = accessible_connectors_with_enabled_state.as_ref() {
|
||||
match connectors::list_tool_suggest_discoverable_tools_with_auth(
|
||||
&turn_context.config,
|
||||
@@ -1265,6 +1271,20 @@ pub(crate) async fn built_tools(
|
||||
})
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
let provider_context = ToolProviderContext::new(
|
||||
Arc::clone(&turn_context.config),
|
||||
Arc::clone(&sess.services.plugins_manager),
|
||||
sess.conversation_id.to_string(),
|
||||
turn_context.sub_id.clone(),
|
||||
);
|
||||
let extension_tool_handlers = sess
|
||||
.services
|
||||
.extensions
|
||||
.get::<dyn ToolProvider>()
|
||||
.into_iter()
|
||||
.flat_map(|provider| provider.handlers(provider_context.clone()))
|
||||
.collect();
|
||||
|
||||
Ok(Arc::new(ToolRouter::from_config(
|
||||
&turn_context.tools_config,
|
||||
ToolRouterParams {
|
||||
@@ -1274,6 +1294,7 @@ pub(crate) async fn built_tools(
|
||||
parallel_mcp_server_names,
|
||||
discoverable_tools,
|
||||
dynamic_tools: turn_context.dynamic_tools.as_slice(),
|
||||
extension_tool_handlers,
|
||||
},
|
||||
)))
|
||||
}
|
||||
@@ -2176,7 +2197,11 @@ async fn try_run_sampling_request(
|
||||
Some(call_id) => call_id,
|
||||
None => active_call_id.clone(),
|
||||
};
|
||||
if let Some(event) = consumer.consume_diff(turn_context.as_ref(), call_id, &delta) {
|
||||
if let Some(event) = consumer.consume_diff(
|
||||
ToolArgumentDiffContext::new(turn_context.as_ref()),
|
||||
call_id,
|
||||
&delta,
|
||||
) {
|
||||
sess.send_event(&turn_context, event).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ use crate::agent::AgentControl;
|
||||
use crate::client::ModelClient;
|
||||
use crate::config::StartedNetworkProxy;
|
||||
use crate::exec_policy::ExecPolicyManager;
|
||||
use crate::extensibility::ExtensionRegistry;
|
||||
use crate::guardian::GuardianRejection;
|
||||
use crate::guardian::GuardianRejectionCircuitBreaker;
|
||||
use crate::mcp::McpManager;
|
||||
@@ -66,6 +67,7 @@ pub(crate) struct SessionServices {
|
||||
pub(crate) state_db: Option<StateDbHandle>,
|
||||
pub(crate) live_thread: Option<LiveThread>,
|
||||
pub(crate) thread_store: Arc<dyn ThreadStore>,
|
||||
pub(crate) extensions: ExtensionRegistry,
|
||||
/// Session-scoped model client shared across turns.
|
||||
pub(crate) model_client: ModelClient,
|
||||
pub(crate) code_mode_service: CodeModeService,
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::config::Config;
|
||||
use crate::config::ThreadStoreConfig;
|
||||
use crate::environment_selection::default_thread_environment_selections;
|
||||
use crate::environment_selection::resolve_environment_selections;
|
||||
use crate::extensibility::ExtensionRegistry;
|
||||
use crate::file_watcher::FileWatcher;
|
||||
use crate::mcp::McpManager;
|
||||
use crate::resolve_installation_id;
|
||||
@@ -216,6 +217,107 @@ pub struct ThreadManager {
|
||||
_test_codex_home_guard: Option<TempCodexHomeGuard>,
|
||||
}
|
||||
|
||||
/// Builder for constructing a [`ThreadManager`].
|
||||
pub struct ThreadManagerBuilder {
|
||||
codex_home: AbsolutePathBuf,
|
||||
bundled_skills_enabled: bool,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
models_manager: SharedModelsManager,
|
||||
session_source: SessionSource,
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
state_db: StateDbHandle,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
agent_graph_store: Arc<dyn AgentGraphStore>,
|
||||
extensions: ExtensionRegistry,
|
||||
installation_id: String,
|
||||
test_codex_home_guard: Option<TempCodexHomeGuard>,
|
||||
}
|
||||
|
||||
impl ThreadManagerBuilder {
|
||||
/// Create a builder from the effective runtime config and process-scoped stores.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn from_config(
|
||||
config: &Config,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
state_db: StateDbHandle,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
agent_graph_store: Arc<dyn AgentGraphStore>,
|
||||
installation_id: String,
|
||||
) -> Self {
|
||||
Self::new(
|
||||
config.codex_home.clone(),
|
||||
config.bundled_skills_enabled(),
|
||||
auth_manager.clone(),
|
||||
build_models_manager(config, auth_manager),
|
||||
environment_manager,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
installation_id,
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn new(
|
||||
codex_home: AbsolutePathBuf,
|
||||
bundled_skills_enabled: bool,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
models_manager: SharedModelsManager,
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
state_db: StateDbHandle,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
agent_graph_store: Arc<dyn AgentGraphStore>,
|
||||
installation_id: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
codex_home,
|
||||
bundled_skills_enabled,
|
||||
auth_manager,
|
||||
models_manager,
|
||||
session_source: SessionSource::Exec,
|
||||
environment_manager,
|
||||
analytics_events_client: None,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
extensions: ExtensionRegistry::default(),
|
||||
installation_id,
|
||||
test_codex_home_guard: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Override the session source recorded on threads started by this manager.
|
||||
pub fn session_source(mut self, session_source: SessionSource) -> Self {
|
||||
self.session_source = session_source;
|
||||
self
|
||||
}
|
||||
|
||||
/// Attach the analytics client used by services owned by this manager.
|
||||
pub fn analytics_events_client(
|
||||
mut self,
|
||||
analytics_events_client: AnalyticsEventsClient,
|
||||
) -> Self {
|
||||
self.analytics_events_client = Some(analytics_events_client);
|
||||
self
|
||||
}
|
||||
|
||||
/// Register one implementation for extension trait `T`.
|
||||
pub fn register_extension<T>(mut self, extension: Arc<T>) -> Self
|
||||
where
|
||||
T: ?Sized + Send + Sync + 'static,
|
||||
{
|
||||
self.extensions.register(extension);
|
||||
self
|
||||
}
|
||||
|
||||
/// Build a thread manager from the configured inputs.
|
||||
pub fn build(self) -> ThreadManager {
|
||||
ThreadManager::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct StartThreadOptions {
|
||||
pub config: Config,
|
||||
pub initial_history: InitialHistory,
|
||||
@@ -253,6 +355,7 @@ pub(crate) struct ThreadManagerState {
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
state_db: StateDbHandle,
|
||||
agent_graph_store: Arc<dyn AgentGraphStore>,
|
||||
extensions: ExtensionRegistry,
|
||||
session_source: SessionSource,
|
||||
installation_id: String,
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
@@ -308,19 +411,44 @@ async fn state_db_from_roots_for_tests(
|
||||
}
|
||||
|
||||
impl ThreadManager {
|
||||
/// Create a builder for the standard runtime thread manager.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
pub fn builder(
|
||||
config: &Config,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
session_source: SessionSource,
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
state_db: StateDbHandle,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
agent_graph_store: Arc<dyn AgentGraphStore>,
|
||||
installation_id: String,
|
||||
) -> Self {
|
||||
let codex_home = config.codex_home.clone();
|
||||
) -> ThreadManagerBuilder {
|
||||
ThreadManagerBuilder::from_config(
|
||||
config,
|
||||
auth_manager,
|
||||
environment_manager,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
installation_id,
|
||||
)
|
||||
}
|
||||
|
||||
fn new(builder: ThreadManagerBuilder) -> Self {
|
||||
let ThreadManagerBuilder {
|
||||
codex_home,
|
||||
bundled_skills_enabled,
|
||||
auth_manager,
|
||||
models_manager,
|
||||
session_source,
|
||||
environment_manager,
|
||||
analytics_events_client,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
extensions,
|
||||
installation_id,
|
||||
test_codex_home_guard,
|
||||
} = builder;
|
||||
let restriction_product = session_source.restriction_product();
|
||||
let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY);
|
||||
let plugins_manager = Arc::new(PluginsManager::new_with_restriction_product(
|
||||
@@ -330,7 +458,7 @@ impl ThreadManager {
|
||||
let mcp_manager = Arc::new(McpManager::new(Arc::clone(&plugins_manager)));
|
||||
let skills_manager = Arc::new(SkillsManager::new_with_restriction_product(
|
||||
codex_home,
|
||||
config.bundled_skills_enabled(),
|
||||
bundled_skills_enabled,
|
||||
restriction_product,
|
||||
));
|
||||
let skills_watcher = build_skills_watcher(Arc::clone(&skills_manager));
|
||||
@@ -338,7 +466,7 @@ impl ThreadManager {
|
||||
state: Arc::new(ThreadManagerState {
|
||||
threads: Arc::new(RwLock::new(HashMap::new())),
|
||||
thread_created_tx,
|
||||
models_manager: build_models_manager(config, auth_manager.clone()),
|
||||
models_manager,
|
||||
environment_manager,
|
||||
skills_manager,
|
||||
plugins_manager,
|
||||
@@ -347,6 +475,7 @@ impl ThreadManager {
|
||||
thread_store,
|
||||
state_db,
|
||||
agent_graph_store,
|
||||
extensions,
|
||||
auth_manager,
|
||||
session_source,
|
||||
installation_id,
|
||||
@@ -354,7 +483,7 @@ impl ThreadManager {
|
||||
ops_log: should_use_test_thread_manager_behavior()
|
||||
.then(|| Arc::new(std::sync::Mutex::new(Vec::new()))),
|
||||
}),
|
||||
_test_codex_home_guard: None,
|
||||
_test_codex_home_guard: test_codex_home_guard,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -440,21 +569,8 @@ impl ThreadManager {
|
||||
) -> Self {
|
||||
set_thread_manager_test_mode_for_tests(/*enabled*/ true);
|
||||
let auth_manager = AuthManager::from_auth_for_testing(auth);
|
||||
let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY);
|
||||
let restriction_product = SessionSource::Exec.restriction_product();
|
||||
let plugins_manager = Arc::new(PluginsManager::new_with_restriction_product(
|
||||
codex_home.clone(),
|
||||
restriction_product,
|
||||
));
|
||||
let mcp_manager = Arc::new(McpManager::new(Arc::clone(&plugins_manager)));
|
||||
let skills_manager = Arc::new(SkillsManager::new_with_restriction_product(
|
||||
skills_codex_home,
|
||||
/*bundled_skills_enabled*/ true,
|
||||
restriction_product,
|
||||
));
|
||||
let skills_watcher = build_skills_watcher(Arc::clone(&skills_manager));
|
||||
// This test constructor has no Config input. Tests that need a non-local
|
||||
// process store should construct ThreadManager::new with an explicit store.
|
||||
// This test helper has no Config input. Tests that need a non-local
|
||||
// process store should construct ThreadManager through the builder with an explicit store.
|
||||
let thread_store: Arc<dyn ThreadStore> = Arc::new(LocalThreadStore::new(
|
||||
LocalThreadStoreConfig {
|
||||
codex_home: codex_home.clone(),
|
||||
@@ -463,29 +579,21 @@ impl ThreadManager {
|
||||
state_db.clone(),
|
||||
));
|
||||
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
|
||||
Self {
|
||||
state: Arc::new(ThreadManagerState {
|
||||
threads: Arc::new(RwLock::new(HashMap::new())),
|
||||
thread_created_tx,
|
||||
models_manager: create_model_provider(provider, Some(auth_manager.clone()))
|
||||
.models_manager(codex_home, /*config_model_catalog*/ None),
|
||||
environment_manager,
|
||||
skills_manager,
|
||||
plugins_manager,
|
||||
mcp_manager,
|
||||
skills_watcher,
|
||||
thread_store,
|
||||
state_db,
|
||||
agent_graph_store,
|
||||
auth_manager,
|
||||
session_source: SessionSource::Exec,
|
||||
installation_id,
|
||||
analytics_events_client: None,
|
||||
ops_log: should_use_test_thread_manager_behavior()
|
||||
.then(|| Arc::new(std::sync::Mutex::new(Vec::new()))),
|
||||
}),
|
||||
_test_codex_home_guard: None,
|
||||
}
|
||||
let models_manager = create_model_provider(provider, Some(auth_manager.clone()))
|
||||
.models_manager(codex_home, /*config_model_catalog*/ None);
|
||||
ThreadManagerBuilder::new(
|
||||
skills_codex_home,
|
||||
/*bundled_skills_enabled*/ true,
|
||||
auth_manager,
|
||||
models_manager,
|
||||
environment_manager,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
installation_id,
|
||||
)
|
||||
.session_source(SessionSource::Exec)
|
||||
.build()
|
||||
}
|
||||
|
||||
pub fn session_source(&self) -> SessionSource {
|
||||
@@ -531,6 +639,10 @@ impl ThreadManager {
|
||||
self.state.models_manager.clone()
|
||||
}
|
||||
|
||||
pub fn extensions(&self) -> &ExtensionRegistry {
|
||||
&self.state.extensions
|
||||
}
|
||||
|
||||
pub async fn list_models(&self, refresh_strategy: RefreshStrategy) -> Vec<ModelPreset> {
|
||||
self.state
|
||||
.models_manager
|
||||
@@ -1244,6 +1356,7 @@ impl ThreadManagerState {
|
||||
analytics_events_client: self.analytics_events_client.clone(),
|
||||
state_db: Some(self.state_db.clone()),
|
||||
thread_store: Arc::clone(&self.thread_store),
|
||||
extensions: self.extensions.clone(),
|
||||
})
|
||||
.await?;
|
||||
let new_thread = self
|
||||
|
||||
@@ -409,17 +409,17 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() {
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
|
||||
let manager = ThreadManager::new(
|
||||
let manager = ThreadManager::builder(
|
||||
&config,
|
||||
auth_manager.clone(),
|
||||
SessionSource::Exec,
|
||||
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
/*analytics_events_client*/ None,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
)
|
||||
.session_source(SessionSource::Exec)
|
||||
.build();
|
||||
let selected_cwd =
|
||||
AbsolutePathBuf::try_from(config.cwd.as_path().join("selected")).expect("absolute path");
|
||||
let environments = vec![TurnEnvironmentSelection {
|
||||
@@ -525,17 +525,17 @@ async fn explicit_installation_id_skips_codex_home_file() {
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let installation_id = uuid::Uuid::new_v4().to_string();
|
||||
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
|
||||
let manager = ThreadManager::new(
|
||||
let manager = ThreadManager::builder(
|
||||
&config,
|
||||
auth_manager,
|
||||
SessionSource::Exec,
|
||||
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
/*analytics_events_client*/ None,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
installation_id.clone(),
|
||||
);
|
||||
)
|
||||
.session_source(SessionSource::Exec)
|
||||
.build();
|
||||
|
||||
let thread = manager
|
||||
.start_thread(config.clone())
|
||||
@@ -564,17 +564,17 @@ async fn resume_active_thread_from_rollout_returns_running_thread() {
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
|
||||
let manager = ThreadManager::new(
|
||||
let manager = ThreadManager::builder(
|
||||
&config,
|
||||
auth_manager.clone(),
|
||||
SessionSource::Exec,
|
||||
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
/*analytics_events_client*/ None,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
)
|
||||
.session_source(SessionSource::Exec)
|
||||
.build();
|
||||
|
||||
let source = manager
|
||||
.start_thread(config.clone())
|
||||
@@ -621,17 +621,17 @@ async fn resume_stopped_thread_from_rollout_spawns_new_thread() {
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
|
||||
let manager = ThreadManager::new(
|
||||
let manager = ThreadManager::builder(
|
||||
&config,
|
||||
auth_manager.clone(),
|
||||
SessionSource::Exec,
|
||||
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
/*analytics_events_client*/ None,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
)
|
||||
.session_source(SessionSource::Exec)
|
||||
.build();
|
||||
|
||||
let source = manager
|
||||
.start_thread(config.clone())
|
||||
@@ -683,17 +683,17 @@ async fn resume_stopped_thread_from_rollout_preserves_thread_source() {
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
|
||||
let manager = ThreadManager::new(
|
||||
let manager = ThreadManager::builder(
|
||||
&config,
|
||||
auth_manager.clone(),
|
||||
SessionSource::Exec,
|
||||
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
/*analytics_events_client*/ None,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
)
|
||||
.session_source(SessionSource::Exec)
|
||||
.build();
|
||||
|
||||
let source = manager
|
||||
.start_thread_with_options(StartThreadOptions {
|
||||
@@ -769,17 +769,17 @@ async fn new_uses_active_provider_for_model_refresh() {
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
|
||||
let manager = ThreadManager::new(
|
||||
let manager = ThreadManager::builder(
|
||||
&config,
|
||||
auth_manager,
|
||||
SessionSource::Exec,
|
||||
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
/*analytics_events_client*/ None,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
)
|
||||
.session_source(SessionSource::Exec)
|
||||
.build();
|
||||
|
||||
let _ = manager.list_models(RefreshStrategy::Online).await;
|
||||
assert_eq!(models_mock.requests().len(), 1);
|
||||
@@ -984,17 +984,17 @@ async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_histor
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
|
||||
let manager = ThreadManager::new(
|
||||
let manager = ThreadManager::builder(
|
||||
&config,
|
||||
auth_manager.clone(),
|
||||
SessionSource::Exec,
|
||||
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
/*analytics_events_client*/ None,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
)
|
||||
.session_source(SessionSource::Exec)
|
||||
.build();
|
||||
|
||||
let source = manager
|
||||
.resume_thread_with_history(
|
||||
@@ -1091,17 +1091,17 @@ async fn interrupted_fork_snapshot_preserves_explicit_turn_id() {
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
|
||||
let manager = ThreadManager::new(
|
||||
let manager = ThreadManager::builder(
|
||||
&config,
|
||||
auth_manager.clone(),
|
||||
SessionSource::Exec,
|
||||
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
/*analytics_events_client*/ None,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
)
|
||||
.session_source(SessionSource::Exec)
|
||||
.build();
|
||||
|
||||
let source = manager
|
||||
.resume_thread_with_history(
|
||||
@@ -1187,17 +1187,17 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
|
||||
let manager = ThreadManager::new(
|
||||
let manager = ThreadManager::builder(
|
||||
&config,
|
||||
auth_manager.clone(),
|
||||
SessionSource::Exec,
|
||||
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
/*analytics_events_client*/ None,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
)
|
||||
.session_source(SessionSource::Exec)
|
||||
.build();
|
||||
|
||||
let source = manager
|
||||
.resume_thread_with_history(
|
||||
@@ -1329,17 +1329,17 @@ async fn resumed_thread_keeps_paused_goal_paused() -> anyhow::Result<()> {
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
|
||||
let manager = ThreadManager::new(
|
||||
let manager = ThreadManager::builder(
|
||||
&config,
|
||||
auth_manager.clone(),
|
||||
SessionSource::Exec,
|
||||
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
/*analytics_events_client*/ None,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
)
|
||||
.session_source(SessionSource::Exec)
|
||||
.build();
|
||||
|
||||
let source = manager
|
||||
.resume_thread_with_history(
|
||||
|
||||
@@ -302,6 +302,7 @@ async fn build_nested_router(exec: &ExecContext) -> ToolRouter {
|
||||
parallel_mcp_server_names,
|
||||
discoverable_tools: None,
|
||||
dynamic_tools: exec.turn.dynamic_tools.as_slice(),
|
||||
extension_tool_handlers: Vec::new(),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::context_manager::truncate_function_output_payload;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::original_image_detail::sanitize_original_image_detail;
|
||||
use crate::session::session::Session;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
@@ -7,7 +8,10 @@ use crate::tools::TELEMETRY_PREVIEW_MAX_LINES;
|
||||
use crate::tools::TELEMETRY_PREVIEW_TRUNCATION_NOTICE;
|
||||
use crate::turn_diff_tracker::TurnDiffTracker;
|
||||
use crate::unified_exec::resolve_max_tokens;
|
||||
use codex_app_server_protocol::McpServerElicitationRequestParams;
|
||||
use codex_mcp::ToolInfo;
|
||||
use codex_protocol::mcp::CallToolResult;
|
||||
use codex_rmcp_client::ElicitationResponse;
|
||||
use codex_protocol::models::DEFAULT_IMAGE_DETAIL;
|
||||
use codex_protocol::models::FunctionCallOutputBody;
|
||||
use codex_protocol::models::FunctionCallOutputContentItem;
|
||||
@@ -24,6 +28,8 @@ use codex_utils_string::take_bytes_at_char_boundary;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value as JsonValue;
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::Mutex;
|
||||
@@ -46,14 +52,81 @@ pub enum ToolCallSource {
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ToolInvocation {
|
||||
pub session: Arc<Session>,
|
||||
pub turn: Arc<TurnContext>,
|
||||
pub cancellation_token: CancellationToken,
|
||||
pub tracker: SharedTurnDiffTracker,
|
||||
pub call_id: String,
|
||||
pub tool_name: ToolName,
|
||||
pub source: ToolCallSource,
|
||||
pub payload: ToolPayload,
|
||||
pub(crate) session: Arc<Session>,
|
||||
pub(crate) turn: Arc<TurnContext>,
|
||||
pub(crate) cancellation_token: CancellationToken,
|
||||
pub(crate) tracker: SharedTurnDiffTracker,
|
||||
pub(crate) call_id: String,
|
||||
pub(crate) tool_name: ToolName,
|
||||
pub(crate) source: ToolCallSource,
|
||||
pub(crate) payload: ToolPayload,
|
||||
}
|
||||
|
||||
impl ToolInvocation {
|
||||
pub fn function_arguments(&self, tool_name: &str) -> Result<&str, FunctionCallError> {
|
||||
match &self.payload {
|
||||
ToolPayload::Function { arguments } => Ok(arguments),
|
||||
_ => Err(FunctionCallError::Fatal(format!(
|
||||
"{tool_name} handler received unsupported payload"
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn call_id(&self) -> &str {
|
||||
self.call_id.as_str()
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "tool extensions read through the session-owned MCP manager guard"
|
||||
)]
|
||||
pub async fn list_mcp_tools(&self) -> HashMap<String, ToolInfo> {
|
||||
self.session
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.list_all_tools()
|
||||
.await
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "tool extensions refresh through the session-owned MCP manager guard"
|
||||
)]
|
||||
pub async fn hard_refresh_codex_apps_tools_cache(
|
||||
&self,
|
||||
) -> anyhow::Result<HashMap<String, ToolInfo>> {
|
||||
self.session
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.hard_refresh_codex_apps_tools_cache()
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn request_mcp_server_elicitation(
|
||||
&self,
|
||||
request_id: String,
|
||||
params: McpServerElicitationRequestParams,
|
||||
) -> Option<ElicitationResponse> {
|
||||
self.session
|
||||
.request_mcp_server_elicitation(
|
||||
self.turn.as_ref(),
|
||||
rmcp::model::RequestId::String(request_id.into()),
|
||||
params,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn reload_user_config_layer(&self) {
|
||||
self.session.reload_user_config_layer().await;
|
||||
}
|
||||
|
||||
pub async fn merge_connector_selection(&self, connector_ids: HashSet<String>) {
|
||||
self.session.merge_connector_selection(connector_ids).await;
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
|
||||
@@ -26,6 +26,7 @@ use crate::tools::hook_names::HookToolName;
|
||||
use crate::tools::orchestrator::ToolOrchestrator;
|
||||
use crate::tools::registry::PostToolUsePayload;
|
||||
use crate::tools::registry::PreToolUsePayload;
|
||||
use crate::tools::registry::ToolArgumentDiffContext;
|
||||
use crate::tools::registry::ToolArgumentDiffConsumer;
|
||||
use crate::tools::registry::ToolHandler;
|
||||
use crate::tools::registry::ToolKind;
|
||||
@@ -64,11 +65,11 @@ struct ApplyPatchArgumentDiffConsumer {
|
||||
impl ToolArgumentDiffConsumer for ApplyPatchArgumentDiffConsumer {
|
||||
fn consume_diff(
|
||||
&mut self,
|
||||
turn: &TurnContext,
|
||||
context: ToolArgumentDiffContext<'_>,
|
||||
call_id: String,
|
||||
diff: &str,
|
||||
) -> Option<EventMsg> {
|
||||
if !turn.features.enabled(Feature::ApplyPatchStreamingEvents) {
|
||||
if !context.feature_enabled(Feature::ApplyPatchStreamingEvents) {
|
||||
return None;
|
||||
}
|
||||
|
||||
|
||||
@@ -9,7 +9,6 @@ pub(crate) mod multi_agents_common;
|
||||
pub(crate) mod multi_agents_v2;
|
||||
mod plan;
|
||||
mod request_permissions;
|
||||
mod request_plugin_install;
|
||||
mod request_user_input;
|
||||
mod shell;
|
||||
mod test_sync;
|
||||
@@ -47,7 +46,6 @@ pub use mcp_resource::ListMcpResourcesHandler;
|
||||
pub use mcp_resource::ReadMcpResourceHandler;
|
||||
pub use plan::PlanHandler;
|
||||
pub use request_permissions::RequestPermissionsHandler;
|
||||
pub use request_plugin_install::RequestPluginInstallHandler;
|
||||
pub use request_user_input::RequestUserInputHandler;
|
||||
pub use shell::ContainerExecHandler;
|
||||
pub use shell::LocalShellHandler;
|
||||
|
||||
@@ -3162,17 +3162,17 @@ async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtr
|
||||
let state_db = init_state_db(&config)
|
||||
.await
|
||||
.expect("test config should initialize state db");
|
||||
let manager = ThreadManager::new(
|
||||
let manager = ThreadManager::builder(
|
||||
&config,
|
||||
AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy")),
|
||||
SessionSource::Exec,
|
||||
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
/*analytics_events_client*/ None,
|
||||
state_db.clone(),
|
||||
thread_store_from_config(&config, state_db.clone()),
|
||||
agent_graph_store_from_state_db(state_db.clone()),
|
||||
"11111111-1111-4111-8111-111111111111".to_string(),
|
||||
);
|
||||
)
|
||||
.session_source(SessionSource::Exec)
|
||||
.build();
|
||||
|
||||
let parent = manager
|
||||
.start_thread(config.clone())
|
||||
|
||||
@@ -1,339 +0,0 @@
|
||||
use std::collections::HashSet;
|
||||
|
||||
use codex_app_server_protocol::AppInfo;
|
||||
use codex_config::types::ToolSuggestDisabledTool;
|
||||
use codex_mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
use codex_rmcp_client::ElicitationAction;
|
||||
use codex_rmcp_client::ElicitationResponse;
|
||||
use codex_tools::DiscoverableTool;
|
||||
use codex_tools::DiscoverableToolAction;
|
||||
use codex_tools::DiscoverableToolType;
|
||||
use codex_tools::REQUEST_PLUGIN_INSTALL_PERSIST_ALWAYS_VALUE;
|
||||
use codex_tools::REQUEST_PLUGIN_INSTALL_PERSIST_KEY;
|
||||
use codex_tools::REQUEST_PLUGIN_INSTALL_TOOL_NAME;
|
||||
use codex_tools::RequestPluginInstallArgs;
|
||||
use codex_tools::RequestPluginInstallResult;
|
||||
use codex_tools::ToolName;
|
||||
use codex_tools::all_requested_connectors_picked_up;
|
||||
use codex_tools::build_request_plugin_install_elicitation_request;
|
||||
use codex_tools::filter_request_plugin_install_discoverable_tools_for_client;
|
||||
use codex_tools::verified_connector_install_completed;
|
||||
use rmcp::model::RequestId;
|
||||
use serde_json::Value;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::config::edit::ConfigEdit;
|
||||
use crate::config::edit::ConfigEditsBuilder;
|
||||
use crate::connectors;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::tools::context::FunctionToolOutput;
|
||||
use crate::tools::context::ToolInvocation;
|
||||
use crate::tools::context::ToolPayload;
|
||||
use crate::tools::handlers::parse_arguments;
|
||||
use crate::tools::registry::ToolHandler;
|
||||
use crate::tools::registry::ToolKind;
|
||||
|
||||
pub struct RequestPluginInstallHandler;
|
||||
|
||||
impl ToolHandler for RequestPluginInstallHandler {
|
||||
type Output = FunctionToolOutput;
|
||||
|
||||
fn tool_name(&self) -> ToolName {
|
||||
ToolName::plain(REQUEST_PLUGIN_INSTALL_TOOL_NAME)
|
||||
}
|
||||
|
||||
fn kind(&self) -> ToolKind {
|
||||
ToolKind::Function
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "plugin install discovery reads through the session-owned manager guard"
|
||||
)]
|
||||
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
|
||||
let ToolInvocation {
|
||||
payload,
|
||||
session,
|
||||
turn,
|
||||
call_id,
|
||||
..
|
||||
} = invocation;
|
||||
|
||||
let arguments = match payload {
|
||||
ToolPayload::Function { arguments } => arguments,
|
||||
_ => {
|
||||
return Err(FunctionCallError::Fatal(format!(
|
||||
"{REQUEST_PLUGIN_INSTALL_TOOL_NAME} handler received unsupported payload"
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let args: RequestPluginInstallArgs = parse_arguments(&arguments)?;
|
||||
let suggest_reason = args.suggest_reason.trim();
|
||||
if suggest_reason.is_empty() {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"suggest_reason must not be empty".to_string(),
|
||||
));
|
||||
}
|
||||
if args.action_type != DiscoverableToolAction::Install {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"plugin install requests currently support only action_type=\"install\""
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
if args.tool_type == DiscoverableToolType::Plugin
|
||||
&& turn.app_server_client_name.as_deref() == Some("codex-tui")
|
||||
{
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"plugin install requests are not available in codex-tui yet".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let auth = session.services.auth_manager.auth().await;
|
||||
let manager = session.services.mcp_connection_manager.read().await;
|
||||
let mcp_tools = manager.list_all_tools().await;
|
||||
drop(manager);
|
||||
let accessible_connectors = connectors::with_app_enabled_state(
|
||||
connectors::accessible_connectors_from_mcp_tools(&mcp_tools),
|
||||
&turn.config,
|
||||
);
|
||||
let discoverable_tools = connectors::list_tool_suggest_discoverable_tools_with_auth(
|
||||
&turn.config,
|
||||
auth.as_ref(),
|
||||
&accessible_connectors,
|
||||
)
|
||||
.await
|
||||
.map(|discoverable_tools| {
|
||||
filter_request_plugin_install_discoverable_tools_for_client(
|
||||
discoverable_tools,
|
||||
turn.app_server_client_name.as_deref(),
|
||||
)
|
||||
})
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"plugin install requests are unavailable right now: {err}"
|
||||
))
|
||||
})?;
|
||||
|
||||
let tool = discoverable_tools
|
||||
.into_iter()
|
||||
.find(|tool| tool.tool_type() == args.tool_type && tool.id() == args.tool_id)
|
||||
.ok_or_else(|| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"tool_id must match one of the discoverable tools exposed by {REQUEST_PLUGIN_INSTALL_TOOL_NAME}"
|
||||
))
|
||||
})?;
|
||||
|
||||
let request_id = RequestId::String(format!("request_plugin_install_{call_id}").into());
|
||||
let params = build_request_plugin_install_elicitation_request(
|
||||
CODEX_APPS_MCP_SERVER_NAME,
|
||||
session.conversation_id.to_string(),
|
||||
turn.sub_id.clone(),
|
||||
&args,
|
||||
suggest_reason,
|
||||
&tool,
|
||||
);
|
||||
let response = session
|
||||
.request_mcp_server_elicitation(turn.as_ref(), request_id, params)
|
||||
.await;
|
||||
if let Some(response) = response.as_ref() {
|
||||
maybe_persist_disabled_install_request(&session, &turn, &tool, response).await;
|
||||
}
|
||||
let user_confirmed = response
|
||||
.as_ref()
|
||||
.is_some_and(|response| response.action == ElicitationAction::Accept);
|
||||
|
||||
let completed = if user_confirmed {
|
||||
verify_request_plugin_install_completed(&session, &turn, &tool, auth.as_ref()).await
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
if completed && let DiscoverableTool::Connector(connector) = &tool {
|
||||
session
|
||||
.merge_connector_selection(HashSet::from([connector.id.clone()]))
|
||||
.await;
|
||||
}
|
||||
|
||||
let content = serde_json::to_string(&RequestPluginInstallResult {
|
||||
completed,
|
||||
user_confirmed,
|
||||
tool_type: args.tool_type,
|
||||
action_type: args.action_type,
|
||||
tool_id: tool.id().to_string(),
|
||||
tool_name: tool.name().to_string(),
|
||||
suggest_reason: suggest_reason.to_string(),
|
||||
})
|
||||
.map_err(|err| {
|
||||
FunctionCallError::Fatal(format!(
|
||||
"failed to serialize {REQUEST_PLUGIN_INSTALL_TOOL_NAME} response: {err}"
|
||||
))
|
||||
})?;
|
||||
|
||||
Ok(FunctionToolOutput::from_text(content, Some(true)))
|
||||
}
|
||||
}
|
||||
|
||||
async fn maybe_persist_disabled_install_request(
|
||||
session: &crate::session::session::Session,
|
||||
turn: &crate::session::turn_context::TurnContext,
|
||||
tool: &DiscoverableTool,
|
||||
response: &ElicitationResponse,
|
||||
) {
|
||||
if !request_plugin_install_response_requests_persistent_disable(response) {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Err(err) = persist_disabled_install_request(&turn.config.codex_home, tool).await {
|
||||
warn!(
|
||||
error = %err,
|
||||
tool_id = tool.id(),
|
||||
"failed to persist disabled tool suggestion"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
session.reload_user_config_layer().await;
|
||||
}
|
||||
|
||||
fn request_plugin_install_response_requests_persistent_disable(
|
||||
response: &ElicitationResponse,
|
||||
) -> bool {
|
||||
if response.action != ElicitationAction::Decline {
|
||||
return false;
|
||||
}
|
||||
|
||||
response
|
||||
.meta
|
||||
.as_ref()
|
||||
.and_then(Value::as_object)
|
||||
.and_then(|meta| meta.get(REQUEST_PLUGIN_INSTALL_PERSIST_KEY))
|
||||
.and_then(Value::as_str)
|
||||
== Some(REQUEST_PLUGIN_INSTALL_PERSIST_ALWAYS_VALUE)
|
||||
}
|
||||
|
||||
async fn persist_disabled_install_request(
|
||||
codex_home: &codex_utils_absolute_path::AbsolutePathBuf,
|
||||
tool: &DiscoverableTool,
|
||||
) -> anyhow::Result<()> {
|
||||
ConfigEditsBuilder::new(codex_home)
|
||||
.with_edits([ConfigEdit::AddToolSuggestDisabledTool(
|
||||
disabled_install_request(tool),
|
||||
)])
|
||||
.apply()
|
||||
.await
|
||||
}
|
||||
|
||||
fn disabled_install_request(tool: &DiscoverableTool) -> ToolSuggestDisabledTool {
|
||||
match tool {
|
||||
DiscoverableTool::Connector(connector) => {
|
||||
ToolSuggestDisabledTool::connector(connector.id.as_str())
|
||||
}
|
||||
DiscoverableTool::Plugin(plugin) => ToolSuggestDisabledTool::plugin(plugin.id.as_str()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn verify_request_plugin_install_completed(
|
||||
session: &crate::session::session::Session,
|
||||
turn: &crate::session::turn_context::TurnContext,
|
||||
tool: &DiscoverableTool,
|
||||
auth: Option<&codex_login::CodexAuth>,
|
||||
) -> bool {
|
||||
match tool {
|
||||
DiscoverableTool::Connector(connector) => refresh_missing_requested_connectors(
|
||||
session,
|
||||
turn,
|
||||
auth,
|
||||
std::slice::from_ref(&connector.id),
|
||||
connector.id.as_str(),
|
||||
)
|
||||
.await
|
||||
.is_some_and(|accessible_connectors| {
|
||||
verified_connector_install_completed(connector.id.as_str(), &accessible_connectors)
|
||||
}),
|
||||
DiscoverableTool::Plugin(plugin) => {
|
||||
session.reload_user_config_layer().await;
|
||||
let config = session.get_config().await;
|
||||
let completed = verified_plugin_install_completed(
|
||||
plugin.id.as_str(),
|
||||
config.as_ref(),
|
||||
session.services.plugins_manager.as_ref(),
|
||||
);
|
||||
let _ = refresh_missing_requested_connectors(
|
||||
session,
|
||||
turn,
|
||||
auth,
|
||||
&plugin.app_connector_ids,
|
||||
plugin.id.as_str(),
|
||||
)
|
||||
.await;
|
||||
completed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "connector cache refresh reads through the session-owned manager guard"
|
||||
)]
|
||||
async fn refresh_missing_requested_connectors(
|
||||
session: &crate::session::session::Session,
|
||||
turn: &crate::session::turn_context::TurnContext,
|
||||
auth: Option<&codex_login::CodexAuth>,
|
||||
expected_connector_ids: &[String],
|
||||
tool_id: &str,
|
||||
) -> Option<Vec<AppInfo>> {
|
||||
if expected_connector_ids.is_empty() {
|
||||
return Some(Vec::new());
|
||||
}
|
||||
|
||||
let manager = session.services.mcp_connection_manager.read().await;
|
||||
let mcp_tools = manager.list_all_tools().await;
|
||||
let accessible_connectors = connectors::with_app_enabled_state(
|
||||
connectors::accessible_connectors_from_mcp_tools(&mcp_tools),
|
||||
&turn.config,
|
||||
);
|
||||
if all_requested_connectors_picked_up(expected_connector_ids, &accessible_connectors) {
|
||||
return Some(accessible_connectors);
|
||||
}
|
||||
|
||||
match manager.hard_refresh_codex_apps_tools_cache().await {
|
||||
Ok(mcp_tools) => {
|
||||
let accessible_connectors = connectors::with_app_enabled_state(
|
||||
connectors::accessible_connectors_from_mcp_tools(&mcp_tools),
|
||||
&turn.config,
|
||||
);
|
||||
connectors::refresh_accessible_connectors_cache_from_mcp_tools(
|
||||
&turn.config,
|
||||
auth,
|
||||
&mcp_tools,
|
||||
);
|
||||
Some(accessible_connectors)
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to refresh codex apps tools cache after plugin install request for {tool_id}: {err:#}"
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn verified_plugin_install_completed(
|
||||
tool_id: &str,
|
||||
config: &crate::config::Config,
|
||||
plugins_manager: &codex_core_plugins::PluginsManager,
|
||||
) -> bool {
|
||||
let plugins_input = config.plugins_config_input();
|
||||
plugins_manager
|
||||
.list_marketplaces_for_config(&plugins_input, &[])
|
||||
.ok()
|
||||
.into_iter()
|
||||
.flat_map(|outcome| outcome.marketplaces)
|
||||
.flat_map(|marketplace| marketplace.plugins.into_iter())
|
||||
.any(|plugin| plugin.id == tool_id && plugin.installed)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "request_plugin_install_tests.rs"]
|
||||
mod tests;
|
||||
@@ -1,214 +0,0 @@
|
||||
use super::*;
|
||||
use crate::plugins::test_support::load_plugins_config;
|
||||
use crate::plugins::test_support::write_curated_plugin_sha;
|
||||
use crate::plugins::test_support::write_openai_curated_marketplace;
|
||||
use crate::plugins::test_support::write_plugins_feature_config;
|
||||
use codex_config::CONFIG_TOML_FILE;
|
||||
use codex_config::config_toml::ConfigToml;
|
||||
use codex_config::types::ToolSuggestConfig;
|
||||
use codex_config::types::ToolSuggestDisabledTool;
|
||||
use codex_config::types::ToolSuggestDiscoverable;
|
||||
use codex_config::types::ToolSuggestDiscoverableType;
|
||||
use codex_core_plugins::PluginInstallRequest;
|
||||
use codex_core_plugins::PluginsManager;
|
||||
use codex_core_plugins::startup_sync::curated_plugins_repo_path;
|
||||
use codex_rmcp_client::ElicitationResponse;
|
||||
use codex_tools::DiscoverablePluginInfo;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use core_test_support::PathExt;
|
||||
use pretty_assertions::assert_eq;
|
||||
use rmcp::model::ElicitationAction;
|
||||
use serde_json::json;
|
||||
use tempfile::tempdir;
|
||||
|
||||
#[tokio::test]
|
||||
async fn verified_plugin_install_completed_requires_installed_plugin() {
|
||||
let codex_home = tempdir().expect("tempdir should succeed");
|
||||
let curated_root = curated_plugins_repo_path(codex_home.path());
|
||||
write_openai_curated_marketplace(&curated_root, &["sample"]);
|
||||
write_curated_plugin_sha(codex_home.path());
|
||||
write_plugins_feature_config(codex_home.path());
|
||||
|
||||
let config = load_plugins_config(codex_home.path()).await;
|
||||
let plugins_manager = PluginsManager::new(codex_home.path().to_path_buf());
|
||||
|
||||
assert!(!verified_plugin_install_completed(
|
||||
"sample@openai-curated",
|
||||
&config,
|
||||
&plugins_manager,
|
||||
));
|
||||
|
||||
plugins_manager
|
||||
.install_plugin(PluginInstallRequest {
|
||||
plugin_name: "sample".to_string(),
|
||||
marketplace_path: AbsolutePathBuf::try_from(
|
||||
curated_root.join(".agents/plugins/marketplace.json"),
|
||||
)
|
||||
.expect("marketplace path"),
|
||||
})
|
||||
.await
|
||||
.expect("plugin should install");
|
||||
|
||||
let refreshed_config = load_plugins_config(codex_home.path()).await;
|
||||
assert!(verified_plugin_install_completed(
|
||||
"sample@openai-curated",
|
||||
&refreshed_config,
|
||||
&plugins_manager,
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn request_plugin_install_response_persists_only_decline_always_mode() {
|
||||
assert!(request_plugin_install_response_requests_persistent_disable(
|
||||
&ElicitationResponse {
|
||||
action: ElicitationAction::Decline,
|
||||
content: None,
|
||||
meta: Some(json!({
|
||||
REQUEST_PLUGIN_INSTALL_PERSIST_KEY: REQUEST_PLUGIN_INSTALL_PERSIST_ALWAYS_VALUE
|
||||
})),
|
||||
}
|
||||
));
|
||||
assert!(
|
||||
!request_plugin_install_response_requests_persistent_disable(&ElicitationResponse {
|
||||
action: ElicitationAction::Accept,
|
||||
content: None,
|
||||
meta: Some(json!({
|
||||
REQUEST_PLUGIN_INSTALL_PERSIST_KEY: REQUEST_PLUGIN_INSTALL_PERSIST_ALWAYS_VALUE
|
||||
})),
|
||||
})
|
||||
);
|
||||
assert!(
|
||||
!request_plugin_install_response_requests_persistent_disable(&ElicitationResponse {
|
||||
action: ElicitationAction::Decline,
|
||||
content: None,
|
||||
meta: Some(json!({ REQUEST_PLUGIN_INSTALL_PERSIST_KEY: "session" })),
|
||||
})
|
||||
);
|
||||
assert!(
|
||||
!request_plugin_install_response_requests_persistent_disable(&ElicitationResponse {
|
||||
action: ElicitationAction::Decline,
|
||||
content: None,
|
||||
meta: None,
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn persist_disabled_install_request_writes_connector_config() {
|
||||
let codex_home = tempdir().expect("tempdir should succeed");
|
||||
let tool = connector_tool("connector_calendar", "Google Calendar");
|
||||
|
||||
persist_disabled_install_request(&codex_home.path().abs(), &tool)
|
||||
.await
|
||||
.expect("persist connector disable");
|
||||
|
||||
let contents =
|
||||
std::fs::read_to_string(codex_home.path().join(CONFIG_TOML_FILE)).expect("read config");
|
||||
let parsed: ConfigToml = toml::from_str(&contents).expect("parse config");
|
||||
assert_eq!(
|
||||
parsed.tool_suggest,
|
||||
Some(ToolSuggestConfig {
|
||||
discoverables: Vec::new(),
|
||||
disabled_tools: vec![ToolSuggestDisabledTool::connector("connector_calendar")],
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn persist_disabled_install_request_writes_plugin_config() {
|
||||
let codex_home = tempdir().expect("tempdir should succeed");
|
||||
let tool = DiscoverableTool::Plugin(Box::new(DiscoverablePluginInfo {
|
||||
id: "slack@openai-curated".to_string(),
|
||||
name: "Slack".to_string(),
|
||||
description: None,
|
||||
has_skills: true,
|
||||
mcp_server_names: Vec::new(),
|
||||
app_connector_ids: Vec::new(),
|
||||
}));
|
||||
|
||||
persist_disabled_install_request(&codex_home.path().abs(), &tool)
|
||||
.await
|
||||
.expect("persist plugin disable");
|
||||
|
||||
let contents =
|
||||
std::fs::read_to_string(codex_home.path().join(CONFIG_TOML_FILE)).expect("read config");
|
||||
let parsed: ConfigToml = toml::from_str(&contents).expect("parse config");
|
||||
assert_eq!(
|
||||
parsed.tool_suggest,
|
||||
Some(ToolSuggestConfig {
|
||||
discoverables: Vec::new(),
|
||||
disabled_tools: vec![ToolSuggestDisabledTool::plugin("slack@openai-curated")],
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn persist_disabled_install_request_dedupes_existing_disabled_tools() {
|
||||
let codex_home = tempdir().expect("tempdir should succeed");
|
||||
let tool = connector_tool("connector_calendar", "Google Calendar");
|
||||
std::fs::write(
|
||||
codex_home.path().join(CONFIG_TOML_FILE),
|
||||
r#"
|
||||
[tool_suggest]
|
||||
discoverables = [
|
||||
{ type = "plugin", id = "sample@openai-curated" }
|
||||
]
|
||||
|
||||
[[tool_suggest.disabled_tools]]
|
||||
type = "connector"
|
||||
id = " connector_calendar "
|
||||
|
||||
[[tool_suggest.disabled_tools]]
|
||||
type = "connector"
|
||||
id = "connector_calendar"
|
||||
|
||||
[[tool_suggest.disabled_tools]]
|
||||
type = "connector"
|
||||
id = " "
|
||||
|
||||
[[tool_suggest.disabled_tools]]
|
||||
type = "plugin"
|
||||
id = "slack@openai-curated"
|
||||
"#,
|
||||
)
|
||||
.expect("write config");
|
||||
|
||||
persist_disabled_install_request(&codex_home.path().abs(), &tool)
|
||||
.await
|
||||
.expect("persist connector disable");
|
||||
|
||||
let contents =
|
||||
std::fs::read_to_string(codex_home.path().join(CONFIG_TOML_FILE)).expect("read config");
|
||||
let parsed: ConfigToml = toml::from_str(&contents).expect("parse config");
|
||||
assert_eq!(
|
||||
parsed.tool_suggest,
|
||||
Some(ToolSuggestConfig {
|
||||
discoverables: vec![ToolSuggestDiscoverable {
|
||||
kind: ToolSuggestDiscoverableType::Plugin,
|
||||
id: "sample@openai-curated".to_string(),
|
||||
}],
|
||||
disabled_tools: vec![
|
||||
ToolSuggestDisabledTool::connector("connector_calendar"),
|
||||
ToolSuggestDisabledTool::plugin("slack@openai-curated"),
|
||||
],
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
fn connector_tool(id: &str, name: &str) -> DiscoverableTool {
|
||||
DiscoverableTool::Connector(Box::new(AppInfo {
|
||||
id: id.to_string(),
|
||||
name: name.to_string(),
|
||||
description: None,
|
||||
logo_url: None,
|
||||
logo_url_dark: None,
|
||||
distribution_channel: None,
|
||||
branding: None,
|
||||
app_metadata: None,
|
||||
labels: None,
|
||||
install_url: None,
|
||||
is_accessible: false,
|
||||
is_enabled: true,
|
||||
plugin_display_names: Vec::new(),
|
||||
}))
|
||||
}
|
||||
@@ -25,9 +25,11 @@ use codex_hooks::HookResult;
|
||||
use codex_hooks::HookToolInput;
|
||||
use codex_hooks::HookToolInputLocalShell;
|
||||
use codex_hooks::HookToolKind;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_tools::ConfiguredToolSpec;
|
||||
use codex_tools::ResponsesApiNamespaceTool;
|
||||
use codex_tools::ToolName;
|
||||
use codex_tools::ToolSpec;
|
||||
use codex_utils_readiness::Readiness;
|
||||
@@ -94,11 +96,25 @@ pub trait ToolHandler: Send + Sync {
|
||||
) -> impl std::future::Future<Output = Result<Self::Output, FunctionCallError>> + Send;
|
||||
}
|
||||
|
||||
pub struct ToolArgumentDiffContext<'a> {
|
||||
turn: &'a TurnContext,
|
||||
}
|
||||
|
||||
impl<'a> ToolArgumentDiffContext<'a> {
|
||||
pub(crate) fn new(turn: &'a TurnContext) -> Self {
|
||||
Self { turn }
|
||||
}
|
||||
|
||||
pub fn feature_enabled(&self, feature: Feature) -> bool {
|
||||
self.turn.features.enabled(feature)
|
||||
}
|
||||
}
|
||||
|
||||
/// Consumes streamed argument diffs for a tool call and emits protocol events
|
||||
/// derived from partial tool input.
|
||||
pub(crate) trait ToolArgumentDiffConsumer: Send {
|
||||
pub trait ToolArgumentDiffConsumer: Send {
|
||||
/// Consume the next argument diff for a tool call.
|
||||
fn consume_diff(&mut self, turn: &TurnContext, call_id: String, diff: &str)
|
||||
fn consume_diff(&mut self, context: ToolArgumentDiffContext<'_>, call_id: String, diff: &str)
|
||||
-> Option<EventMsg>;
|
||||
|
||||
/// Finish consuming argument diffs before the tool call completes.
|
||||
@@ -107,7 +123,7 @@ pub(crate) trait ToolArgumentDiffConsumer: Send {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct AnyToolResult {
|
||||
pub struct AnyToolResult {
|
||||
pub(crate) call_id: String,
|
||||
pub(crate) payload: ToolPayload,
|
||||
pub(crate) result: Box<dyn ToolOutput>,
|
||||
@@ -134,7 +150,7 @@ impl AnyToolResult {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) struct PreToolUsePayload {
|
||||
pub struct PreToolUsePayload {
|
||||
/// Hook-facing tool name model.
|
||||
///
|
||||
/// The canonical name is serialized to hook stdin, while aliases are used
|
||||
@@ -148,7 +164,7 @@ pub(crate) struct PreToolUsePayload {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub(crate) struct PostToolUsePayload {
|
||||
pub struct PostToolUsePayload {
|
||||
/// Hook-facing tool name model.
|
||||
///
|
||||
/// The canonical name is serialized to hook stdin, while aliases are used
|
||||
@@ -162,7 +178,9 @@ pub(crate) struct PostToolUsePayload {
|
||||
pub(crate) tool_response: Value,
|
||||
}
|
||||
|
||||
trait AnyToolHandler: Send + Sync {
|
||||
pub trait AnyToolHandler: Send + Sync {
|
||||
fn tool_name(&self) -> ToolName;
|
||||
|
||||
fn matches_kind(&self, payload: &ToolPayload) -> bool;
|
||||
|
||||
fn is_mutating<'a>(&'a self, invocation: &'a ToolInvocation) -> BoxFuture<'a, bool>;
|
||||
@@ -180,6 +198,10 @@ impl<T> AnyToolHandler for T
|
||||
where
|
||||
T: ToolHandler,
|
||||
{
|
||||
fn tool_name(&self) -> ToolName {
|
||||
ToolHandler::tool_name(self)
|
||||
}
|
||||
|
||||
fn matches_kind(&self, payload: &ToolPayload) -> bool {
|
||||
ToolHandler::matches_kind(self, payload)
|
||||
}
|
||||
@@ -539,14 +561,46 @@ impl ToolRegistryBuilder {
|
||||
where
|
||||
H: ToolHandler + 'static,
|
||||
{
|
||||
self.register_any_handler(handler);
|
||||
}
|
||||
|
||||
pub fn register_any_handler(&mut self, handler: Arc<dyn AnyToolHandler>) {
|
||||
let name = handler.tool_name();
|
||||
let display_name = name.display();
|
||||
let handler: Arc<dyn AnyToolHandler> = handler;
|
||||
if self.handlers.insert(name, handler).is_some() {
|
||||
warn!("overwriting handler for tool {display_name}");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register_any_handler_if_configured(&mut self, handler: Arc<dyn AnyToolHandler>) {
|
||||
let name = handler.tool_name();
|
||||
let has_spec = self.specs.iter().any(|configured_tool| match &configured_tool.spec {
|
||||
ToolSpec::Function(tool) if name.namespace.is_none() => tool.name == name.name,
|
||||
ToolSpec::Freeform(tool) if name.namespace.is_none() => tool.name == name.name,
|
||||
ToolSpec::Namespace(namespace) => namespace.tools.iter().any(|tool| match tool {
|
||||
ResponsesApiNamespaceTool::Function(tool) => {
|
||||
name.namespace.as_deref() == Some(namespace.name.as_str())
|
||||
&& tool.name == name.name
|
||||
}
|
||||
}),
|
||||
ToolSpec::ToolSearch { .. } if name.namespace.is_none() => name.name == "tool_search",
|
||||
ToolSpec::LocalShell {} if name.namespace.is_none() => name.name == "local_shell",
|
||||
ToolSpec::ImageGeneration { .. } if name.namespace.is_none() => {
|
||||
name.name == "image_generation"
|
||||
}
|
||||
ToolSpec::WebSearch { .. } if name.namespace.is_none() => name.name == "web_search",
|
||||
ToolSpec::Function(_)
|
||||
| ToolSpec::Freeform(_)
|
||||
| ToolSpec::ToolSearch { .. }
|
||||
| ToolSpec::LocalShell {}
|
||||
| ToolSpec::ImageGeneration { .. }
|
||||
| ToolSpec::WebSearch { .. } => false,
|
||||
});
|
||||
if has_spec {
|
||||
self.register_any_handler(handler);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build(self) -> (Vec<ConfiguredToolSpec>, ToolRegistry) {
|
||||
let registry = ToolRegistry::new(self.handlers);
|
||||
(self.specs, registry)
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::session::turn_context::TurnContext;
|
||||
use crate::tools::context::SharedTurnDiffTracker;
|
||||
use crate::tools::context::ToolInvocation;
|
||||
use crate::tools::context::ToolPayload;
|
||||
use crate::tools::registry::AnyToolHandler;
|
||||
use crate::tools::registry::AnyToolResult;
|
||||
use crate::tools::registry::ToolArgumentDiffConsumer;
|
||||
use crate::tools::registry::ToolRegistry;
|
||||
@@ -50,6 +51,7 @@ pub(crate) struct ToolRouterParams<'a> {
|
||||
pub(crate) parallel_mcp_server_names: HashSet<String>,
|
||||
pub(crate) discoverable_tools: Option<Vec<DiscoverableTool>>,
|
||||
pub(crate) dynamic_tools: &'a [DynamicToolSpec],
|
||||
pub(crate) extension_tool_handlers: Vec<Arc<dyn AnyToolHandler>>,
|
||||
}
|
||||
|
||||
impl ToolRouter {
|
||||
@@ -61,8 +63,9 @@ impl ToolRouter {
|
||||
parallel_mcp_server_names,
|
||||
discoverable_tools,
|
||||
dynamic_tools,
|
||||
extension_tool_handlers,
|
||||
} = params;
|
||||
let builder = build_specs_with_discoverable_tools(
|
||||
let mut builder = build_specs_with_discoverable_tools(
|
||||
config,
|
||||
mcp_tools,
|
||||
deferred_mcp_tools,
|
||||
@@ -70,6 +73,9 @@ impl ToolRouter {
|
||||
discoverable_tools,
|
||||
dynamic_tools,
|
||||
);
|
||||
for handler in extension_tool_handlers {
|
||||
builder.register_any_handler_if_configured(handler);
|
||||
}
|
||||
let (specs, registry) = builder.build();
|
||||
let deferred_dynamic_tools = dynamic_tools
|
||||
.iter()
|
||||
|
||||
@@ -38,6 +38,7 @@ async fn parallel_support_does_not_match_namespaced_local_tool_names() -> anyhow
|
||||
parallel_mcp_server_names: HashSet::new(),
|
||||
discoverable_tools: None,
|
||||
dynamic_tools: turn.dynamic_tools.as_slice(),
|
||||
extension_tool_handlers: Vec::new(),
|
||||
},
|
||||
);
|
||||
|
||||
@@ -111,6 +112,7 @@ async fn mcp_parallel_support_uses_exact_payload_server() -> anyhow::Result<()>
|
||||
parallel_mcp_server_names: HashSet::from(["echo".to_string()]),
|
||||
discoverable_tools: None,
|
||||
dynamic_tools: turn.dynamic_tools.as_slice(),
|
||||
extension_tool_handlers: Vec::new(),
|
||||
},
|
||||
);
|
||||
|
||||
@@ -178,6 +180,7 @@ async fn model_visible_specs_filter_deferred_dynamic_tools() -> anyhow::Result<(
|
||||
parallel_mcp_server_names: HashSet::new(),
|
||||
discoverable_tools: None,
|
||||
dynamic_tools: &dynamic_tools,
|
||||
extension_tool_handlers: Vec::new(),
|
||||
},
|
||||
);
|
||||
|
||||
|
||||
@@ -89,7 +89,6 @@ pub(crate) fn build_specs_with_discoverable_tools(
|
||||
use crate::tools::handlers::PlanHandler;
|
||||
use crate::tools::handlers::ReadMcpResourceHandler;
|
||||
use crate::tools::handlers::RequestPermissionsHandler;
|
||||
use crate::tools::handlers::RequestPluginInstallHandler;
|
||||
use crate::tools::handlers::RequestUserInputHandler;
|
||||
use crate::tools::handlers::ShellCommandHandler;
|
||||
use crate::tools::handlers::ShellHandler;
|
||||
@@ -284,9 +283,6 @@ pub(crate) fn build_specs_with_discoverable_tools(
|
||||
);
|
||||
builder.register_handler(Arc::new(ToolSearchHandler::new(entries)));
|
||||
}
|
||||
ToolHandlerKind::RequestPluginInstall => {
|
||||
builder.register_handler(Arc::new(RequestPluginInstallHandler));
|
||||
}
|
||||
ToolHandlerKind::UpdateGoal => {
|
||||
builder.register_handler(Arc::new(UpdateGoalHandler));
|
||||
}
|
||||
|
||||
@@ -354,6 +354,7 @@ async fn assert_model_tools(
|
||||
parallel_mcp_server_names: std::collections::HashSet::new(),
|
||||
discoverable_tools: None,
|
||||
dynamic_tools: &[],
|
||||
extension_tool_handlers: Vec::new(),
|
||||
},
|
||||
);
|
||||
let model_visible_specs = router.model_visible_specs();
|
||||
|
||||
@@ -433,17 +433,17 @@ impl TestCodexBuilder {
|
||||
let thread_store = thread_store_from_config(&config, state_db.clone());
|
||||
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
|
||||
let installation_id = resolve_installation_id(&config.codex_home).await?;
|
||||
ThreadManager::new(
|
||||
ThreadManager::builder(
|
||||
&config,
|
||||
codex_core::test_support::auth_manager_from_auth(auth.clone()),
|
||||
SessionSource::Exec,
|
||||
Arc::clone(&environment_manager),
|
||||
/*analytics_events_client*/ None,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
installation_id,
|
||||
)
|
||||
.session_source(SessionSource::Exec)
|
||||
.build()
|
||||
} else {
|
||||
codex_core::test_support::thread_manager_with_models_provider_and_home(
|
||||
auth.clone(),
|
||||
|
||||
@@ -1124,17 +1124,17 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() {
|
||||
let installation_id = resolve_installation_id(&config.codex_home)
|
||||
.await
|
||||
.expect("resolve installation id");
|
||||
let thread_manager = ThreadManager::new(
|
||||
let thread_manager = ThreadManager::builder(
|
||||
&config,
|
||||
auth_manager,
|
||||
SessionSource::Exec,
|
||||
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
/*analytics_events_client*/ None,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
installation_id,
|
||||
);
|
||||
)
|
||||
.session_source(SessionSource::Exec)
|
||||
.build();
|
||||
let NewThread { thread: codex, .. } = thread_manager
|
||||
.start_thread(config.clone())
|
||||
.await
|
||||
|
||||
@@ -66,17 +66,17 @@ impl MessageProcessor {
|
||||
let state_db = init_state_db_from_config(config.as_ref()).await?;
|
||||
let thread_store = thread_store_from_config(config.as_ref(), state_db.clone());
|
||||
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
let thread_manager = Arc::new(ThreadManager::builder(
|
||||
config.as_ref(),
|
||||
auth_manager,
|
||||
SessionSource::Mcp,
|
||||
environment_manager,
|
||||
/*analytics_events_client*/ None,
|
||||
state_db,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
installation_id,
|
||||
));
|
||||
)
|
||||
.session_source(SessionSource::Mcp)
|
||||
.build());
|
||||
Some(Self {
|
||||
outgoing,
|
||||
initialized: false,
|
||||
|
||||
@@ -121,17 +121,17 @@ async fn run_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
|
||||
let environment_manager =
|
||||
Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await);
|
||||
let installation_id = resolve_installation_id(&config.codex_home).await?;
|
||||
let thread_manager = ThreadManager::new(
|
||||
let thread_manager = ThreadManager::builder(
|
||||
&config,
|
||||
auth_manager,
|
||||
SessionSource::Exec,
|
||||
environment_manager,
|
||||
/*analytics_events_client*/ None,
|
||||
state_db,
|
||||
Arc::clone(&thread_store),
|
||||
agent_graph_store,
|
||||
installation_id,
|
||||
);
|
||||
)
|
||||
.session_source(SessionSource::Exec)
|
||||
.build();
|
||||
|
||||
let NewThread {
|
||||
thread_id, thread, ..
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use crate::CommandToolOptions;
|
||||
use crate::REQUEST_PLUGIN_INSTALL_TOOL_NAME;
|
||||
use crate::REQUEST_USER_INPUT_TOOL_NAME;
|
||||
use crate::ResponsesApiNamespace;
|
||||
use crate::ResponsesApiNamespaceTool;
|
||||
@@ -328,10 +327,6 @@ pub fn build_tool_registry_plan(
|
||||
/*supports_parallel_tool_calls*/ true,
|
||||
/*code_mode_enabled*/ false,
|
||||
);
|
||||
plan.register_handler(
|
||||
REQUEST_PLUGIN_INSTALL_TOOL_NAME,
|
||||
ToolHandlerKind::RequestPluginInstall,
|
||||
);
|
||||
}
|
||||
|
||||
if config.environment_mode.has_environment()
|
||||
|
||||
@@ -1868,10 +1868,7 @@ fn request_plugin_install_description_lists_discoverable_tools() {
|
||||
Some(discoverable_tools),
|
||||
&[],
|
||||
);
|
||||
assert!(handlers.contains(&ToolHandlerSpec {
|
||||
name: ToolName::plain(REQUEST_PLUGIN_INSTALL_TOOL_NAME),
|
||||
kind: ToolHandlerKind::RequestPluginInstall,
|
||||
}));
|
||||
assert!(handlers.is_empty());
|
||||
|
||||
let request_plugin_install = find_tool(&tools, REQUEST_PLUGIN_INSTALL_TOOL_NAME);
|
||||
let ToolSpec::Function(ResponsesApiTool {
|
||||
|
||||
@@ -29,7 +29,6 @@ pub enum ToolHandlerKind {
|
||||
Plan,
|
||||
ReadMcpResource,
|
||||
ReportAgentJobResult,
|
||||
RequestPluginInstall,
|
||||
RequestPermissions,
|
||||
RequestUserInput,
|
||||
ResumeAgentV1,
|
||||
|
||||
Reference in New Issue
Block a user