Compare commits

...

4 Commits

15 changed files with 1432 additions and 60 deletions

View File

@@ -1757,12 +1757,19 @@
},
{
"properties": {
"elicitation_id": {
"type": [
"string",
"null"
]
},
"id": {
"$ref": "#/definitions/RequestId"
},
"message": {
"type": "string"
},
"requested_schema": true,
"server_name": {
"type": "string"
},
@@ -1772,6 +1779,12 @@
],
"title": "ElicitationRequestEventMsgType",
"type": "string"
},
"url": {
"type": [
"string",
"null"
]
}
},
"required": [
@@ -7002,12 +7015,19 @@
},
{
"properties": {
"elicitation_id": {
"type": [
"string",
"null"
]
},
"id": {
"$ref": "#/definitions/RequestId"
},
"message": {
"type": "string"
},
"requested_schema": true,
"server_name": {
"type": "string"
},
@@ -7017,6 +7037,12 @@
],
"title": "ElicitationRequestEventMsgType",
"type": "string"
},
"url": {
"type": [
"string",
"null"
]
}
},
"required": [

View File

@@ -2818,12 +2818,19 @@
},
{
"properties": {
"elicitation_id": {
"type": [
"string",
"null"
]
},
"id": {
"$ref": "#/definitions/RequestId"
},
"message": {
"type": "string"
},
"requested_schema": true,
"server_name": {
"type": "string"
},
@@ -2833,6 +2840,12 @@
],
"title": "ElicitationRequestEventMsgType",
"type": "string"
},
"url": {
"type": [
"string",
"null"
]
}
},
"required": [

View File

@@ -1,5 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { JsonValue } from "./serde_json/JsonValue";
export type ElicitationRequestEvent = { server_name: string, id: string | number, message: string, };
export type ElicitationRequestEvent = { server_name: string, id: string | number, message: string, requested_schema?: JsonValue, url?: string, elicitation_id?: string, };

View File

@@ -66,6 +66,7 @@ use codex_protocol::items::PlanItem;
use codex_protocol::items::TurnItem;
use codex_protocol::items::UserMessageItem;
use codex_protocol::mcp::CallToolResult;
use codex_protocol::mcp::RequestId as ProtocolRequestId;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::format_allow_prefixes;
use codex_protocol::openai_models::ModelInfo;
@@ -84,6 +85,7 @@ use codex_protocol::protocol::TurnContextNetworkItem;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::request_user_input::RequestUserInputArgs;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_rmcp_client::ElicitationAction;
use codex_rmcp_client::ElicitationResponse;
use codex_rmcp_client::OAuthCredentialsStoreMode;
use futures::future::BoxFuture;
@@ -153,6 +155,10 @@ use crate::instructions::UserInstructions;
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
use crate::mcp::auth::compute_auth_statuses;
use crate::mcp::effective_mcp_servers;
use crate::mcp::elicitation_url::build_url_elicitation_outcome_from_user_input;
use crate::mcp::elicitation_url::build_url_elicitation_request_user_input_args;
use crate::mcp::elicitation_url::is_url_elicitation_request;
use crate::mcp::is_apps_mcp_gateway_elicitation_flow_active;
use crate::mcp::maybe_prompt_and_install_mcp_dependencies;
use crate::mcp::with_codex_apps_mcp;
use crate::mcp_connection_manager::McpConnectionManager;
@@ -237,6 +243,8 @@ use crate::tasks::SessionTaskContext;
use crate::tools::ToolRouter;
use crate::tools::context::SharedTurnDiffTracker;
use crate::tools::handlers::SEARCH_TOOL_BM25_TOOL_NAME;
use crate::tools::handlers::build_mcp_elicitation_request_user_input_args;
use crate::tools::handlers::build_mcp_elicitation_response_from_user_input;
use crate::tools::js_repl::JsReplHandle;
use crate::tools::network_approval::NetworkApprovalService;
use crate::tools::network_approval::build_blocked_request_observer;
@@ -525,6 +533,7 @@ impl Codex {
pub(crate) struct Session {
pub(crate) conversation_id: ThreadId,
tx_event: Sender<Event>,
mcp_event_tx: Sender<Event>,
agent_status: watch::Sender<AgentStatus>,
state: Mutex<SessionState>,
/// The set of enabled features should be invariant for the lifetime of the
@@ -923,6 +932,43 @@ impl Session {
});
}
// MCP startup/runtime events are emitted into a dedicated channel
// (`Session::mcp_event_tx`) instead of directly to `self.tx_event` so the
// session can inspect and selectively transform them first. Most events are
// forwarded unchanged, but codex_apps gateway elicitation requests are
// intercepted and converted into the request_user_input flow, with the
// elicitation resolved asynchronously rather than surfacing the raw MCP
// request event to clients. Both initial MCP startup and later MCP refreshes
// route through this forwarder, so this is the single interception point.
fn start_mcp_event_forwarder(self: &Arc<Self>, rx_event: Receiver<Event>) {
let weak_sess = Arc::downgrade(self);
tokio::spawn(async move {
while let Ok(event) = rx_event.recv().await {
let Some(sess) = weak_sess.upgrade() else {
break;
};
if let EventMsg::ElicitationRequest(elicitation) = &event.msg
&& is_apps_mcp_gateway_elicitation_flow_active(
&sess.features,
&elicitation.server_name,
)
{
let prompt_sess = Arc::clone(&sess);
let prompt_event = elicitation.clone();
tokio::spawn(async move {
prompt_sess
.prompt_codex_apps_elicitation_via_request_user_input(prompt_event)
.await;
});
continue;
}
if let Err(e) = sess.tx_event.send(event).await {
debug!("dropping event because channel is closed: {e}");
}
}
});
}
#[allow(clippy::too_many_arguments)]
fn make_turn_context(
auth_manager: Option<Arc<AuthManager>>,
@@ -1303,6 +1349,7 @@ impl Session {
// setup is straightforward enough and performs well.
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::new_uninitialized(
&config.permissions.approval_policy,
config.features.enabled(Feature::AppsMcpGateway),
))),
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
unified_exec_manager: UnifiedExecProcessManager::new(
@@ -1358,9 +1405,11 @@ impl Session {
);
state.set_startup_regular_task(startup_regular_task);
let (tx_mcp_event, rx_mcp_event) = async_channel::unbounded();
let sess = Arc::new(Session {
conversation_id,
tx_event: tx_event.clone(),
mcp_event_tx: tx_mcp_event.clone(),
agent_status,
state: Mutex::new(state),
features: config.features.clone(),
@@ -1371,6 +1420,7 @@ impl Session {
js_repl,
next_internal_sub_id: AtomicU64::new(0),
});
sess.start_mcp_event_forwarder(rx_mcp_event);
if let Some(network_policy_decider_session) = network_policy_decider_session {
let mut guard = network_policy_decider_session.write().await;
*guard = Arc::downgrade(&sess);
@@ -1430,7 +1480,8 @@ impl Session {
config.mcp_oauth_credentials_store_mode,
auth_statuses.clone(),
&session_configuration.approval_policy,
tx_event.clone(),
tx_mcp_event.clone(),
config.features.enabled(Feature::AppsMcpGateway),
sandbox_state,
config.codex_home.clone(),
codex_apps_tools_cache_key(auth),
@@ -2278,6 +2329,79 @@ impl Session {
}
}
async fn prompt_codex_apps_elicitation_via_request_user_input(
&self,
elicitation: codex_protocol::approvals::ElicitationRequestEvent,
) {
let (response, completion_elicitation_id) = if let Some((
turn_context,
cancellation_token,
)) =
self.active_turn_context_and_cancellation_token().await
{
let is_url_elicitation = is_url_elicitation_request(&elicitation);
let args = if is_url_elicitation {
build_url_elicitation_request_user_input_args(&elicitation)
} else {
build_mcp_elicitation_request_user_input_args(&elicitation)
};
let call_id = format!("mcp_elicitation_request_{}", turn_context.sub_id);
let turn_sub_id = turn_context.sub_id.clone();
let user_input = tokio::select! {
biased;
_ = cancellation_token.cancelled() => {
let empty = RequestUserInputResponse {
answers: HashMap::new(),
};
self.notify_user_input_response(&turn_sub_id, empty).await;
None
}
response = self.request_user_input(turn_context.as_ref(), call_id, args) => response,
};
if is_url_elicitation {
let outcome =
build_url_elicitation_outcome_from_user_input(user_input, &elicitation);
(outcome.response, outcome.completion_elicitation_id)
} else {
(
build_mcp_elicitation_response_from_user_input(user_input, &elicitation),
None,
)
}
} else {
(
ElicitationResponse {
action: ElicitationAction::Cancel,
content: None,
},
None,
)
};
let server_name = elicitation.server_name.clone();
let request_id = protocol_request_id_to_rmcp(&elicitation.id);
if let Err(err) = self
.resolve_elicitation(server_name.clone(), request_id, response)
.await
{
warn!(
error = %err,
"failed to resolve codex_apps MCP elicitation request"
);
return;
}
if let Some(elicitation_id) = completion_elicitation_id
&& let Err(err) = self
.complete_url_elicitation(server_name, elicitation_id)
.await
{
warn!(
error = %err,
"failed to send codex_apps url elicitation completion notification"
);
}
}
pub(crate) async fn emit_turn_item_started(&self, turn_context: &TurnContext, item: &TurnItem) {
self.send_event(
turn_context,
@@ -2578,6 +2702,19 @@ impl Session {
.await
}
pub async fn complete_url_elicitation(
&self,
server_name: String,
elicitation_id: String,
) -> anyhow::Result<()> {
self.services
.mcp_connection_manager
.read()
.await
.complete_url_elicitation(server_name, elicitation_id)
.await
}
/// Records input items: always append to conversation history and
/// persist these response items to rollout.
pub(crate) async fn record_conversation_items(
@@ -3272,7 +3409,8 @@ impl Session {
store_mode,
auth_statuses,
&turn_context.config.permissions.approval_policy,
self.get_tx_event(),
self.mcp_event_tx.clone(),
turn_context.features.enabled(Feature::AppsMcpGateway),
sandbox_state,
config.codex_home.clone(),
codex_apps_tools_cache_key(auth.as_ref()),
@@ -3355,6 +3493,15 @@ impl Session {
}
}
fn protocol_request_id_to_rmcp(request_id: &ProtocolRequestId) -> RequestId {
match request_id {
ProtocolRequestId::String(value) => {
rmcp::model::NumberOrString::String(Arc::from(value.as_str()))
}
ProtocolRequestId::Integer(value) => rmcp::model::NumberOrString::Number(*value),
}
}
async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiver<Submission>) {
// To break out of this loop, send Op::Shutdown.
while let Ok(sub) = rx_sub.recv().await {
@@ -7947,9 +8094,11 @@ mod tests {
Arc::clone(&js_repl),
);
let (mcp_event_tx, _rx_mcp_event) = async_channel::unbounded();
let session = Session {
conversation_id,
tx_event,
mcp_event_tx,
agent_status: agent_status_tx,
state: Mutex::new(state),
features: config.features.clone(),
@@ -8100,9 +8249,11 @@ mod tests {
Arc::clone(&js_repl),
));
let (mcp_event_tx, _rx_mcp_event) = async_channel::unbounded();
let session = Arc::new(Session {
conversation_id,
tx_event,
mcp_event_tx,
agent_status: agent_status_tx,
state: Mutex::new(state),
features: config.features.clone(),

View File

@@ -157,6 +157,7 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options_and_status(
auth_status_entries,
&config.permissions.approval_policy,
tx_event,
config.features.enabled(Feature::AppsMcpGateway),
sandbox_state,
config.codex_home.clone(),
codex_apps_tools_cache_key(auth.as_ref()),

View File

@@ -125,7 +125,7 @@ pub enum Feature {
Collab,
/// Enable apps.
Apps,
/// Route apps MCP calls through the configured gateway.
/// Route apps MCP calls through the new gateway and enable elicitation flows.
AppsMcpGateway,
/// Allow prompting and installing missing MCP dependencies.
SkillMcpDependencyInstall,

View File

@@ -0,0 +1,637 @@
use std::collections::HashMap;
use std::collections::HashSet;
use codex_protocol::request_user_input::RequestUserInputAnswer;
use codex_protocol::request_user_input::RequestUserInputQuestion;
use codex_protocol::request_user_input::RequestUserInputQuestionOption;
use codex_protocol::request_user_input::RequestUserInputResponse;
use regex_lite::Regex;
use serde_json::Map;
use serde_json::Number;
use serde_json::Value;
const REQUEST_USER_INPUT_NOTE_PREFIX: &str = "user_note: ";
const MCP_ELICITATION_FIELD_ID_PREFIX: &str = "mcp_elicitation_field";
#[derive(Clone, Debug)]
struct EnumChoice {
value: Value,
label: String,
}
#[derive(Clone, Debug)]
enum FieldKind {
String {
min_length: Option<usize>,
max_length: Option<usize>,
pattern: Option<String>,
format: Option<String>,
},
Number {
integer: bool,
minimum: Option<f64>,
maximum: Option<f64>,
},
Boolean,
SingleSelectEnum {
choices: Vec<EnumChoice>,
},
MultiSelectEnum {
choices: Vec<EnumChoice>,
min_items: Option<usize>,
max_items: Option<usize>,
},
}
#[derive(Clone, Debug)]
struct FieldSpec {
property_name: String,
question_id: String,
header: String,
description: Option<String>,
required: bool,
default: Option<Value>,
kind: FieldKind,
}
#[derive(Clone, Debug)]
struct ParsedSchema {
fields: Vec<FieldSpec>,
}
impl ParsedSchema {
fn from_requested_schema(requested_schema: &Value) -> Option<Self> {
let properties = requested_schema.get("properties")?.as_object()?;
let required: HashSet<&str> = requested_schema
.get("required")
.and_then(Value::as_array)
.map(|items| items.iter().filter_map(Value::as_str).collect())
.unwrap_or_default();
let mut used_ids = HashSet::new();
let fields = properties
.iter()
.enumerate()
.filter_map(|(idx, (property_name, schema))| {
FieldSpec::from_property(
property_name,
schema,
required.contains(property_name.as_str()),
idx,
&mut used_ids,
)
})
.collect();
Some(Self { fields })
}
}
impl FieldSpec {
fn from_property(
property_name: &str,
schema: &Value,
required: bool,
index: usize,
used_ids: &mut HashSet<String>,
) -> Option<Self> {
let schema_obj = schema.as_object()?;
let title = schema_obj
.get("title")
.and_then(Value::as_str)
.map(str::trim)
.filter(|title| !title.is_empty())
.map(ToOwned::to_owned);
let description = schema_obj
.get("description")
.and_then(Value::as_str)
.map(str::trim)
.filter(|desc| !desc.is_empty())
.map(ToOwned::to_owned);
let default = schema_obj.get("default").cloned();
let kind = parse_field_kind(schema_obj)?;
let question_id = build_question_id(property_name, index, used_ids);
Some(Self {
property_name: property_name.to_string(),
question_id,
header: title.unwrap_or_else(|| property_name.to_string()),
description,
required,
default,
kind,
})
}
fn to_question(&self) -> RequestUserInputQuestion {
let question = self.render_prompt();
let options = match &self.kind {
FieldKind::Boolean => Some(vec![
RequestUserInputQuestionOption {
label: "Yes".to_string(),
description: "Choose Yes.".to_string(),
},
RequestUserInputQuestionOption {
label: "No".to_string(),
description: "Choose No.".to_string(),
},
]),
FieldKind::SingleSelectEnum { choices } => Some(
choices
.iter()
.map(|choice| RequestUserInputQuestionOption {
label: choice.label.clone(),
description: format!(
"Use value `{}`.",
render_json_primitive(&choice.value)
),
})
.collect(),
),
FieldKind::String { .. }
| FieldKind::Number { .. }
| FieldKind::MultiSelectEnum { .. } => None,
};
// This keeps the minimal implementation isolated: multi-select enum values are entered
// as comma-separated text instead of true multi-select controls. A complete solution
// should add first-class multi-select support in RequestUserInput by:
// 1) Extending protocol question/answer types to represent multiple selected options.
// 2) Updating `tui/src/bottom_pane/request_user_input/mod.rs` to render selectable
// checkboxes (or equivalent) and commit multiple selections.
// 3) Updating answer rendering/history (`tui/src/history_cell.rs`) to display
// multi-select responses without relying on comma-splitting.
// 4) Removing comma-splitting/parsing in this adapter once structured answers exist.
RequestUserInputQuestion {
id: self.question_id.clone(),
header: self.header.clone(),
question,
is_other: false,
is_secret: false,
options,
}
}
fn render_prompt(&self) -> String {
let mut lines = Vec::new();
if let Some(description) = &self.description {
lines.push(description.clone());
}
let mut constraints = Vec::new();
match &self.kind {
FieldKind::String {
min_length,
max_length,
pattern,
format,
} => {
constraints.push("Enter text.".to_string());
if format.is_some() {
constraints.push(
"Use text in the requested format (for example: \"example\").".to_string(),
);
}
if let Some(min_length) = min_length {
constraints.push(format!("Use at least {min_length} characters."));
}
if let Some(max_length) = max_length {
constraints.push(format!("Use no more than {max_length} characters."));
}
if let Some(pattern) = pattern {
constraints.push(format!("Must follow this pattern: {pattern}."));
}
}
FieldKind::Number {
integer,
minimum,
maximum,
} => {
constraints.push(if *integer {
"Enter a whole number.".to_string()
} else {
"Enter a number.".to_string()
});
if let Some(minimum) = minimum {
constraints.push(format!("Must be at least {minimum}."));
}
if let Some(maximum) = maximum {
constraints.push(format!("Must be at most {maximum}."));
}
}
FieldKind::Boolean => {
constraints.push("Choose Yes or No below.".to_string());
}
FieldKind::SingleSelectEnum { .. } => {
constraints.push("Choose from one option below.".to_string());
}
FieldKind::MultiSelectEnum {
choices,
min_items,
max_items,
} => {
constraints.push("Choose one or more options.".to_string());
constraints.push(
"Enter multiple values separated by commas (for example: Red, Blue)."
.to_string(),
);
let values = choices
.iter()
.map(|choice| choice.label.clone())
.collect::<Vec<_>>()
.join(", ");
constraints.push(format!("Available options: {values}."));
if let Some(min_items) = min_items {
constraints.push(format!("Choose at least {min_items}."));
}
if let Some(max_items) = max_items {
constraints.push(format!("Choose no more than {max_items}."));
}
}
}
constraints.push(if self.required {
"This answer is required.".to_string()
} else {
"You can leave this blank.".to_string()
});
if let Some(default) = &self.default {
constraints.push(format!(
"If left blank, default is {}.",
render_json_primitive(default)
));
} else {
constraints.push("No default value is set.".to_string());
}
lines.extend(constraints);
lines.push(format!("Please enter a value for \"{}\".", self.header));
lines.join("\n")
}
fn parse_response_value(
&self,
answers: &HashMap<String, RequestUserInputAnswer>,
) -> Result<Option<Value>, ()> {
let answer = answers.get(&self.question_id);
let (selection, note) = extract_selection_and_note(answer);
let raw = note
.or(selection)
.map(str::trim)
.filter(|input| !input.is_empty());
match &self.kind {
FieldKind::String {
min_length,
max_length,
pattern,
..
} => {
let Some(raw) = raw else {
return self.default_or_required_value();
};
if let Some(min_length) = min_length
&& raw.chars().count() < *min_length
{
return Err(());
}
if let Some(max_length) = max_length
&& raw.chars().count() > *max_length
{
return Err(());
}
if let Some(pattern) = pattern
&& let Ok(regex) = Regex::new(pattern)
&& !regex.is_match(raw)
{
return Err(());
}
Ok(Some(Value::String(raw.to_string())))
}
FieldKind::Number {
integer,
minimum,
maximum,
} => {
let Some(raw) = raw else {
return self.default_or_required_value();
};
if *integer {
let parsed = raw.parse::<i64>().map_err(|_| ())?;
let parsed_f64 = parsed as f64;
if let Some(minimum) = minimum
&& parsed_f64 < *minimum
{
return Err(());
}
if let Some(maximum) = maximum
&& parsed_f64 > *maximum
{
return Err(());
}
Ok(Some(Value::Number(Number::from(parsed))))
} else {
let parsed = raw.parse::<f64>().map_err(|_| ())?;
if let Some(minimum) = minimum
&& parsed < *minimum
{
return Err(());
}
if let Some(maximum) = maximum
&& parsed > *maximum
{
return Err(());
}
let number = Number::from_f64(parsed).ok_or(())?;
Ok(Some(Value::Number(number)))
}
}
FieldKind::Boolean => {
let Some(raw) = raw else {
return self.default_or_required_value();
};
let bool_value = match raw.to_ascii_lowercase().as_str() {
"true" | "yes" | "y" | "1" => true,
"false" | "no" | "n" | "0" => false,
_ => return Err(()),
};
Ok(Some(Value::Bool(bool_value)))
}
FieldKind::SingleSelectEnum { choices } => {
let Some(raw) = raw else {
return self.default_or_required_value();
};
let selected = choices.iter().find_map(|choice| {
if choice.label == raw || render_json_primitive(&choice.value) == raw {
Some(choice.value.clone())
} else {
None
}
});
selected.map(Some).ok_or(())
}
FieldKind::MultiSelectEnum {
choices,
min_items,
max_items,
} => {
let Some(raw) = raw else {
return self.default_or_required_value();
};
let parts: Vec<&str> = raw
.split(',')
.map(str::trim)
.filter(|part| !part.is_empty())
.collect();
if let Some(min_items) = min_items
&& parts.len() < *min_items
{
return Err(());
}
if let Some(max_items) = max_items
&& parts.len() > *max_items
{
return Err(());
}
let values = parts
.into_iter()
.map(|part| {
choices.iter().find_map(|choice| {
if choice.label == part || render_json_primitive(&choice.value) == part
{
Some(choice.value.clone())
} else {
None
}
})
})
.collect::<Option<Vec<_>>>()
.ok_or(())?;
Ok(Some(Value::Array(values)))
}
}
}
fn default_or_required_value(&self) -> Result<Option<Value>, ()> {
if let Some(default) = &self.default {
return Ok(Some(default.clone()));
}
if self.required { Err(()) } else { Ok(None) }
}
}
pub(crate) fn build_elicitation_content_questions(
requested_schema: Option<&Value>,
) -> Vec<RequestUserInputQuestion> {
let Some(requested_schema) = requested_schema else {
return Vec::new();
};
let Some(parsed_schema) = ParsedSchema::from_requested_schema(requested_schema) else {
return Vec::new();
};
parsed_schema
.fields
.into_iter()
.map(|field| field.to_question())
.collect()
}
pub(crate) fn build_elicitation_content_from_response(
response: &RequestUserInputResponse,
requested_schema: Option<&Value>,
) -> Result<Option<Value>, ()> {
let Some(requested_schema) = requested_schema else {
return Ok(None);
};
let Some(parsed_schema) = ParsedSchema::from_requested_schema(requested_schema) else {
return Ok(None);
};
let mut content = Map::new();
for field in parsed_schema.fields {
if let Some(value) = field.parse_response_value(&response.answers)? {
content.insert(field.property_name, value);
}
}
Ok(Some(Value::Object(content)))
}
fn parse_field_kind(schema_obj: &Map<String, Value>) -> Option<FieldKind> {
let field_type = schema_obj.get("type").and_then(Value::as_str);
if field_type == Some("string") {
if let Some(choices) = parse_choices_from_enum(schema_obj) {
return Some(FieldKind::SingleSelectEnum { choices });
}
if let Some(choices) = parse_choices_from_const_union(schema_obj.get("oneOf")) {
return Some(FieldKind::SingleSelectEnum { choices });
}
return Some(FieldKind::String {
min_length: schema_obj
.get("minLength")
.and_then(Value::as_u64)
.map(|value| value as usize),
max_length: schema_obj
.get("maxLength")
.and_then(Value::as_u64)
.map(|value| value as usize),
pattern: schema_obj
.get("pattern")
.and_then(Value::as_str)
.map(ToOwned::to_owned),
format: schema_obj
.get("format")
.and_then(Value::as_str)
.map(ToOwned::to_owned),
});
}
if matches!(field_type, Some("number") | Some("integer")) {
return Some(FieldKind::Number {
integer: field_type == Some("integer"),
minimum: schema_obj.get("minimum").and_then(Value::as_f64),
maximum: schema_obj.get("maximum").and_then(Value::as_f64),
});
}
if field_type == Some("boolean") {
return Some(FieldKind::Boolean);
}
if field_type == Some("array")
&& let Some(items) = schema_obj.get("items").and_then(Value::as_object)
{
if let Some(choices) = parse_choices_from_enum(items) {
return Some(FieldKind::MultiSelectEnum {
choices,
min_items: schema_obj
.get("minItems")
.and_then(Value::as_u64)
.map(|value| value as usize),
max_items: schema_obj
.get("maxItems")
.and_then(Value::as_u64)
.map(|value| value as usize),
});
}
if let Some(choices) = parse_choices_from_const_union(items.get("anyOf")) {
return Some(FieldKind::MultiSelectEnum {
choices,
min_items: schema_obj
.get("minItems")
.and_then(Value::as_u64)
.map(|value| value as usize),
max_items: schema_obj
.get("maxItems")
.and_then(Value::as_u64)
.map(|value| value as usize),
});
}
}
None
}
fn parse_choices_from_enum(schema_obj: &Map<String, Value>) -> Option<Vec<EnumChoice>> {
let values = schema_obj.get("enum")?.as_array()?;
let choices: Vec<EnumChoice> = values
.iter()
.filter(|value| is_supported_primitive(value))
.map(|value| EnumChoice {
value: value.clone(),
label: render_json_primitive(value),
})
.collect();
if choices.is_empty() {
None
} else {
Some(choices)
}
}
fn parse_choices_from_const_union(value: Option<&Value>) -> Option<Vec<EnumChoice>> {
let entries = value?.as_array()?;
let choices: Vec<EnumChoice> = entries
.iter()
.filter_map(|entry| {
let obj = entry.as_object()?;
let const_value = obj.get("const")?;
if !is_supported_primitive(const_value) {
return None;
}
let label = obj
.get("title")
.and_then(Value::as_str)
.map(str::trim)
.filter(|title| !title.is_empty())
.map(ToOwned::to_owned)
.unwrap_or_else(|| render_json_primitive(const_value));
Some(EnumChoice {
value: const_value.clone(),
label,
})
})
.collect();
if choices.is_empty() {
None
} else {
Some(choices)
}
}
fn build_question_id(property_name: &str, index: usize, used_ids: &mut HashSet<String>) -> String {
let sanitized: String = property_name
.chars()
.map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '_' })
.collect();
let base = format!("{MCP_ELICITATION_FIELD_ID_PREFIX}_{index}_{sanitized}");
let mut candidate = base.clone();
let mut suffix = 1usize;
while !used_ids.insert(candidate.clone()) {
candidate = format!("{base}_{suffix}");
suffix += 1;
}
candidate
}
fn extract_selection_and_note(
answer: Option<&RequestUserInputAnswer>,
) -> (Option<&str>, Option<&str>) {
let Some(answer) = answer else {
return (None, None);
};
let mut selection = None;
let mut note = None;
for entry in &answer.answers {
if let Some(value) = entry.strip_prefix(REQUEST_USER_INPUT_NOTE_PREFIX) {
let value = value.trim();
if !value.is_empty() {
note = Some(value);
}
continue;
}
if selection.is_none() {
let value = entry.trim();
if !value.is_empty() {
selection = Some(value);
}
}
}
(selection, note)
}
fn is_supported_primitive(value: &Value) -> bool {
value.is_string() || value.is_number() || value.is_boolean()
}
fn render_json_primitive(value: &Value) -> String {
match value {
Value::String(s) => s.clone(),
Value::Number(number) => number.to_string(),
Value::Bool(boolean) => boolean.to_string(),
_ => value.to_string(),
}
}

View File

@@ -0,0 +1,135 @@
use codex_protocol::approvals::ElicitationRequestEvent;
use codex_protocol::request_user_input::RequestUserInputAnswer;
use codex_protocol::request_user_input::RequestUserInputArgs;
use codex_protocol::request_user_input::RequestUserInputQuestion;
use codex_protocol::request_user_input::RequestUserInputQuestionOption;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_rmcp_client::ElicitationAction;
use codex_rmcp_client::ElicitationResponse;
const REQUEST_USER_INPUT_NOTE_PREFIX: &str = "user_note: ";
const MCP_URL_ELICITATION_DECISION_QUESTION_ID: &str = "mcp_url_elicitation_decision";
const MCP_URL_ELICITATION_COMPLETED: &str = "Completed";
const MCP_URL_ELICITATION_DECLINE: &str = "Decline";
const MCP_URL_ELICITATION_CANCEL: &str = "Cancel";
pub(crate) fn is_url_elicitation_request(elicitation: &ElicitationRequestEvent) -> bool {
elicitation.url.is_some()
}
pub(crate) struct UrlElicitationPromptOutcome {
pub(crate) response: ElicitationResponse,
pub(crate) completion_elicitation_id: Option<String>,
}
pub(crate) fn build_url_elicitation_request_user_input_args(
elicitation: &ElicitationRequestEvent,
) -> RequestUserInputArgs {
let mut question_lines = vec![elicitation.message.clone()];
if let Some(url) = &elicitation.url {
question_lines.push(String::new());
question_lines.push("Open this URL in your browser to continue:".to_string());
question_lines.push(url.clone());
question_lines.push(String::new());
question_lines.push(
"After completing the flow in your browser, return here and choose Completed."
.to_string(),
);
}
if elicitation.elicitation_id.is_none() {
question_lines.push(String::new());
question_lines.push(
"This request is missing required URL elicitation metadata, so it cannot be completed safely."
.to_string(),
);
}
RequestUserInputArgs {
questions: vec![RequestUserInputQuestion {
id: MCP_URL_ELICITATION_DECISION_QUESTION_ID.to_string(),
header: "MCP URL elicitation".to_string(),
question: question_lines.join("\n"),
is_other: false,
is_secret: false,
options: Some(vec![
RequestUserInputQuestionOption {
label: MCP_URL_ELICITATION_COMPLETED.to_string(),
description: "You completed the browser flow and want to continue.".to_string(),
},
RequestUserInputQuestionOption {
label: MCP_URL_ELICITATION_DECLINE.to_string(),
description: "Decline this URL elicitation request.".to_string(),
},
RequestUserInputQuestionOption {
label: MCP_URL_ELICITATION_CANCEL.to_string(),
description: "Cancel this URL elicitation request.".to_string(),
},
]),
}],
}
}
pub(crate) fn build_url_elicitation_outcome_from_user_input(
response: Option<RequestUserInputResponse>,
elicitation: &ElicitationRequestEvent,
) -> UrlElicitationPromptOutcome {
let Some(response) = response else {
return UrlElicitationPromptOutcome {
response: ElicitationResponse {
action: ElicitationAction::Cancel,
content: None,
},
completion_elicitation_id: None,
};
};
let action = response
.answers
.get(MCP_URL_ELICITATION_DECISION_QUESTION_ID)
.and_then(request_user_input_answer_to_url_elicitation_action)
.unwrap_or(ElicitationAction::Cancel);
match action {
ElicitationAction::Accept => {
let Some(elicitation_id) = elicitation.elicitation_id.clone() else {
return UrlElicitationPromptOutcome {
response: ElicitationResponse {
action: ElicitationAction::Cancel,
content: None,
},
completion_elicitation_id: None,
};
};
UrlElicitationPromptOutcome {
response: ElicitationResponse {
action: ElicitationAction::Accept,
content: None,
},
completion_elicitation_id: Some(elicitation_id),
}
}
ElicitationAction::Decline | ElicitationAction::Cancel => UrlElicitationPromptOutcome {
response: ElicitationResponse {
action,
content: None,
},
completion_elicitation_id: None,
},
}
}
fn request_user_input_answer_to_url_elicitation_action(
answer: &RequestUserInputAnswer,
) -> Option<ElicitationAction> {
answer.answers.iter().find_map(|entry| {
if entry.starts_with(REQUEST_USER_INPUT_NOTE_PREFIX) {
return None;
}
match entry.as_str() {
MCP_URL_ELICITATION_COMPLETED => Some(ElicitationAction::Accept),
MCP_URL_ELICITATION_DECLINE => Some(ElicitationAction::Decline),
MCP_URL_ELICITATION_CANCEL => Some(ElicitationAction::Cancel),
_ => None,
}
})
}

View File

@@ -1,4 +1,6 @@
pub mod auth;
pub(crate) mod elicitation_form;
pub(crate) mod elicitation_url;
mod skill_dependencies;
pub(crate) use skill_dependencies::maybe_prompt_and_install_mcp_dependencies;
@@ -21,6 +23,7 @@ use crate::config::Config;
use crate::config::types::McpServerConfig;
use crate::config::types::McpServerTransportConfig;
use crate::features::Feature;
use crate::features::Features;
use crate::mcp::auth::compute_auth_statuses;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::mcp_connection_manager::SandboxState;
@@ -30,8 +33,15 @@ const MCP_TOOL_NAME_PREFIX: &str = "mcp";
const MCP_TOOL_NAME_DELIMITER: &str = "__";
pub(crate) const CODEX_APPS_MCP_SERVER_NAME: &str = "codex_apps";
const CODEX_CONNECTORS_TOKEN_ENV_VAR: &str = "CODEX_CONNECTORS_TOKEN";
const OPENAI_CONNECTORS_MCP_BASE_URL: &str = "https://api.openai.com";
const OPENAI_CONNECTORS_MCP_PATH: &str = "/v1/connectors/gateways/flat/mcp";
const OPENAI_CONNECTORS_MCP_URL: &str = "https://api.openai.com/v1/connectors/gateways/flat/";
pub(crate) fn is_apps_mcp_gateway_elicitation_flow_active(
features: &Features,
server_name: &str,
) -> bool {
features.enabled(Feature::AppsMcpGateway) && server_name == CODEX_APPS_MCP_SERVER_NAME
}
// Legacy vs new MCP gateway
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -95,7 +105,7 @@ fn normalize_codex_apps_base_url(base_url: &str) -> String {
fn codex_apps_mcp_url_for_gateway(base_url: &str, gateway: CodexAppsMcpGateway) -> String {
if gateway == CodexAppsMcpGateway::MCPGateway {
return format!("{OPENAI_CONNECTORS_MCP_BASE_URL}{OPENAI_CONNECTORS_MCP_PATH}");
return OPENAI_CONNECTORS_MCP_URL.to_string();
}
let base_url = normalize_codex_apps_base_url(base_url);
@@ -208,6 +218,7 @@ pub async fn collect_mcp_snapshot(config: &Config) -> McpListToolsResponseEvent
auth_status_entries.clone(),
&config.permissions.approval_policy,
tx_event,
config.features.enabled(Feature::AppsMcpGateway),
sandbox_state,
config.codex_home.clone(),
codex_apps_tools_cache_key(auth.as_ref()),
@@ -452,35 +463,33 @@ mod tests {
#[test]
fn codex_apps_mcp_url_for_gateway_uses_openai_connectors_gateway() {
let expected_url = format!("{OPENAI_CONNECTORS_MCP_BASE_URL}{OPENAI_CONNECTORS_MCP_PATH}");
assert_eq!(
codex_apps_mcp_url_for_gateway(
"https://chatgpt.com/backend-api",
CodexAppsMcpGateway::MCPGateway
),
expected_url.as_str()
OPENAI_CONNECTORS_MCP_URL
);
assert_eq!(
codex_apps_mcp_url_for_gateway(
"https://chat.openai.com",
CodexAppsMcpGateway::MCPGateway
),
expected_url.as_str()
OPENAI_CONNECTORS_MCP_URL
);
assert_eq!(
codex_apps_mcp_url_for_gateway(
"http://localhost:8080/api/codex",
CodexAppsMcpGateway::MCPGateway
),
expected_url.as_str()
OPENAI_CONNECTORS_MCP_URL
);
assert_eq!(
codex_apps_mcp_url_for_gateway(
"http://localhost:8080",
CodexAppsMcpGateway::MCPGateway
),
expected_url.as_str()
OPENAI_CONNECTORS_MCP_URL
);
}
@@ -501,10 +510,7 @@ mod tests {
config.chatgpt_base_url = "https://chatgpt.com".to_string();
config.features.enable(Feature::AppsMcpGateway);
assert_eq!(
codex_apps_mcp_url(&config),
format!("{OPENAI_CONNECTORS_MCP_BASE_URL}{OPENAI_CONNECTORS_MCP_PATH}")
);
assert_eq!(codex_apps_mcp_url(&config), OPENAI_CONNECTORS_MCP_URL);
}
#[test]
@@ -538,7 +544,6 @@ mod tests {
_ => panic!("expected streamable http transport for codex apps"),
};
let expected_url = format!("{OPENAI_CONNECTORS_MCP_BASE_URL}{OPENAI_CONNECTORS_MCP_PATH}");
assert_eq!(url, &expected_url);
assert_eq!(url, OPENAI_CONNECTORS_MCP_URL);
}
}

View File

@@ -62,6 +62,7 @@ use rmcp::model::RequestId;
use rmcp::model::Resource;
use rmcp::model::ResourceTemplate;
use rmcp::model::Tool;
use rmcp::model::UrlElicitationCapability;
use serde::Deserialize;
use serde::Serialize;
@@ -249,14 +250,16 @@ fn elicitation_is_rejected_by_policy(approval_policy: AskForApproval) -> bool {
#[derive(Clone)]
struct ElicitationRequestManager {
requests: Arc<Mutex<ResponderMap>>,
mcp_elicitations_enabled: bool,
approval_policy: Arc<StdMutex<AskForApproval>>,
}
impl ElicitationRequestManager {
fn new(approval_policy: AskForApproval) -> Self {
fn new(approval_policy: AskForApproval, mcp_elicitations_enabled: bool) -> Self {
Self {
requests: Arc::new(Mutex::new(HashMap::new())),
approval_policy: Arc::new(StdMutex::new(approval_policy)),
mcp_elicitations_enabled,
}
}
@@ -278,6 +281,7 @@ impl ElicitationRequestManager {
fn make_sender(&self, server_name: String, tx_event: Sender<Event>) -> SendElicitation {
let elicitation_requests = self.requests.clone();
let approval_policy = self.approval_policy.clone();
let mcp_elicitations_enabled = self.mcp_elicitations_enabled;
Box::new(move |id, elicitation| {
let elicitation_requests = elicitation_requests.clone();
let tx_event = tx_event.clone();
@@ -299,29 +303,49 @@ impl ElicitationRequestManager {
let mut lock = elicitation_requests.lock().await;
lock.insert((server_name.clone(), id.clone()), tx);
}
let request_id = match id.clone() {
rmcp::model::NumberOrString::String(value) => {
ProtocolRequestId::String(value.to_string())
}
rmcp::model::NumberOrString::Number(value) => ProtocolRequestId::Integer(value),
};
let (message, requested_schema, url, elicitation_id) = match elicitation {
CreateElicitationRequestParams::FormElicitationParams {
message,
requested_schema,
..
} => (
message,
if mcp_elicitations_enabled {
serde_json::to_value(requested_schema).ok()
} else {
None
},
None,
None,
),
CreateElicitationRequestParams::UrlElicitationParams {
message,
url,
elicitation_id,
..
} => (
message,
None,
mcp_elicitations_enabled.then_some(url),
mcp_elicitations_enabled.then_some(elicitation_id),
),
};
let _ = tx_event
.send(Event {
id: "mcp_elicitation_request".to_string(),
msg: EventMsg::ElicitationRequest(ElicitationRequestEvent {
server_name,
id: match id.clone() {
rmcp::model::NumberOrString::String(value) => {
ProtocolRequestId::String(value.to_string())
}
rmcp::model::NumberOrString::Number(value) => {
ProtocolRequestId::Integer(value)
}
},
message: match elicitation {
CreateElicitationRequestParams::FormElicitationParams {
message,
..
}
| CreateElicitationRequestParams::UrlElicitationParams {
message,
..
} => message,
},
id: request_id,
message,
requested_schema,
url,
elicitation_id,
}),
})
.await;
@@ -513,11 +537,17 @@ pub(crate) struct McpConnectionManager {
}
impl McpConnectionManager {
pub(crate) fn new_uninitialized(approval_policy: &Constrained<AskForApproval>) -> Self {
pub(crate) fn new_uninitialized(
approval_policy: &Constrained<AskForApproval>,
mcp_elicitations_enabled: bool,
) -> Self {
Self {
clients: HashMap::new(),
server_origins: HashMap::new(),
elicitation_requests: ElicitationRequestManager::new(approval_policy.value()),
elicitation_requests: ElicitationRequestManager::new(
approval_policy.value(),
mcp_elicitations_enabled,
),
}
}
@@ -525,7 +555,7 @@ impl McpConnectionManager {
pub(crate) fn new_mcp_connection_manager_for_tests(
approval_policy: &Constrained<AskForApproval>,
) -> Self {
Self::new_uninitialized(approval_policy)
Self::new_uninitialized(approval_policy, false)
}
pub(crate) fn has_servers(&self) -> bool {
@@ -549,6 +579,7 @@ impl McpConnectionManager {
auth_entries: HashMap<String, McpAuthStatusEntry>,
approval_policy: &Constrained<AskForApproval>,
tx_event: Sender<Event>,
mcp_elicitations_enabled: bool,
initial_sandbox_state: SandboxState,
codex_home: PathBuf,
codex_apps_tools_cache_key: CodexAppsToolsCacheKey,
@@ -557,7 +588,8 @@ impl McpConnectionManager {
let mut clients = HashMap::new();
let mut server_origins = HashMap::new();
let mut join_set = JoinSet::new();
let elicitation_requests = ElicitationRequestManager::new(approval_policy.value());
let elicitation_requests =
ElicitationRequestManager::new(approval_policy.value(), mcp_elicitations_enabled);
let mcp_servers = mcp_servers.clone();
for (server_name, cfg) in mcp_servers.into_iter().filter(|(_, cfg)| cfg.enabled) {
if let Some(origin) = transport_origin(&cfg.transport) {
@@ -683,6 +715,24 @@ impl McpConnectionManager {
.await
}
pub async fn complete_url_elicitation(
&self,
server_name: String,
elicitation_id: String,
) -> Result<()> {
let managed_client = self.client_by_name(&server_name).await?;
managed_client
.client
.send_custom_notification(
"notifications/elicitation/complete",
Some(serde_json::json!({ "elicitationId": elicitation_id })),
)
.await
.with_context(|| {
format!("failed sending url elicitation completion for `{server_name}`")
})
}
pub(crate) async fn wait_for_server_ready(&self, server_name: &str, timeout: Duration) -> bool {
let Some(async_managed_client) = self.clients.get(server_name) else {
return false;
@@ -752,6 +802,7 @@ impl McpConnectionManager {
CODEX_APPS_MCP_SERVER_NAME,
&managed_client.client,
managed_client.tool_timeout,
self.elicitation_requests.mcp_elicitations_enabled,
)
.await
.with_context(|| {
@@ -1206,7 +1257,10 @@ impl From<anyhow::Error> for StartupOutcomeError {
}
}
fn elicitation_capability_for_server(server_name: &str) -> Option<ElicitationCapability> {
fn elicitation_capability_for_server(
server_name: &str,
mcp_elicitations_enabled: bool,
) -> Option<ElicitationCapability> {
if server_name == CODEX_APPS_MCP_SERVER_NAME {
// https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation#capabilities
// indicates this should be an empty object.
@@ -1214,7 +1268,7 @@ fn elicitation_capability_for_server(server_name: &str) -> Option<ElicitationCap
form: Some(FormElicitationCapability {
schema_validation: None,
}),
url: None,
url: mcp_elicitations_enabled.then_some(UrlElicitationCapability {}),
})
} else {
None
@@ -1234,7 +1288,10 @@ async fn start_server_task(
elicitation_requests,
codex_apps_tools_cache_context,
} = params;
let elicitation = elicitation_capability_for_server(&server_name);
let elicitation = elicitation_capability_for_server(
&server_name,
elicitation_requests.mcp_elicitations_enabled,
);
let params = InitializeRequestParams {
meta: None,
capabilities: ClientCapabilities {
@@ -1265,9 +1322,14 @@ async fn start_server_task(
let list_start = Instant::now();
let fetch_start = Instant::now();
let tools = list_tools_for_client_uncached(&server_name, &client, startup_timeout)
.await
.map_err(StartupOutcomeError::from)?;
let tools = list_tools_for_client_uncached(
&server_name,
&client,
startup_timeout,
elicitation_requests.mcp_elicitations_enabled,
)
.await
.map_err(StartupOutcomeError::from)?;
emit_duration(
MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC,
fetch_start.elapsed(),
@@ -1474,10 +1536,38 @@ async fn list_tools_for_client_uncached(
server_name: &str,
client: &Arc<RmcpClient>,
timeout: Option<Duration>,
mcp_elicitations_enabled: bool,
) -> Result<Vec<ToolInfo>> {
let resp = client.list_tools_with_connector_ids(None, timeout).await?;
let tools = resp
.tools
// If MCP elicitations + codex_apps, paginate tools to avoid partial list
let paginate_tools = mcp_elicitations_enabled && server_name == CODEX_APPS_MCP_SERVER_NAME;
let raw_tools = if paginate_tools {
let mut collected = Vec::new();
let mut cursor: Option<String> = None;
loop {
let params = cursor.as_ref().map(|next| PaginatedRequestParams {
meta: None,
cursor: Some(next.clone()),
});
let response = client
.list_tools_with_connector_ids(params, timeout)
.await?;
collected.extend(response.tools);
match response.next_cursor {
Some(next) => {
if cursor.as_ref() == Some(&next) {
return Err(anyhow!("tools/list returned duplicate cursor"));
}
cursor = Some(next);
}
None => break collected,
}
}
} else {
let resp = client.list_tools_with_connector_ids(None, timeout).await?;
resp.tools
};
let tools = raw_tools
.into_iter()
.map(|tool| {
let connector_name = tool.connector_name;
@@ -1984,7 +2074,7 @@ mod tests {
.boxed()
.shared();
let approval_policy = Constrained::allow_any(AskForApproval::OnFailure);
let mut manager = McpConnectionManager::new_uninitialized(&approval_policy);
let mut manager = McpConnectionManager::new_uninitialized(&approval_policy, false);
manager.clients.insert(
CODEX_APPS_MCP_SERVER_NAME.to_string(),
AsyncManagedClient {
@@ -2009,7 +2099,7 @@ mod tests {
.boxed()
.shared();
let approval_policy = Constrained::allow_any(AskForApproval::OnFailure);
let mut manager = McpConnectionManager::new_uninitialized(&approval_policy);
let mut manager = McpConnectionManager::new_uninitialized(&approval_policy, false);
manager.clients.insert(
CODEX_APPS_MCP_SERVER_NAME.to_string(),
AsyncManagedClient {
@@ -2031,7 +2121,7 @@ mod tests {
.boxed()
.shared();
let approval_policy = Constrained::allow_any(AskForApproval::OnFailure);
let mut manager = McpConnectionManager::new_uninitialized(&approval_policy);
let mut manager = McpConnectionManager::new_uninitialized(&approval_policy, false);
manager.clients.insert(
CODEX_APPS_MCP_SERVER_NAME.to_string(),
AsyncManagedClient {
@@ -2061,7 +2151,7 @@ mod tests {
.boxed()
.shared();
let approval_policy = Constrained::allow_any(AskForApproval::OnFailure);
let mut manager = McpConnectionManager::new_uninitialized(&approval_policy);
let mut manager = McpConnectionManager::new_uninitialized(&approval_policy, false);
let startup_complete = Arc::new(std::sync::atomic::AtomicBool::new(true));
manager.clients.insert(
CODEX_APPS_MCP_SERVER_NAME.to_string(),
@@ -2082,9 +2172,22 @@ mod tests {
#[test]
fn elicitation_capability_enabled_only_for_codex_apps() {
let codex_apps_capability = elicitation_capability_for_server(CODEX_APPS_MCP_SERVER_NAME);
let codex_apps_capability =
elicitation_capability_for_server(CODEX_APPS_MCP_SERVER_NAME, true);
assert!(matches!(
codex_apps_capability,
Some(ElicitationCapability {
form: Some(FormElicitationCapability {
schema_validation: None
}),
url: Some(UrlElicitationCapability {}),
})
));
let codex_apps_without_feature =
elicitation_capability_for_server(CODEX_APPS_MCP_SERVER_NAME, false);
assert!(matches!(
codex_apps_without_feature,
Some(ElicitationCapability {
form: Some(FormElicitationCapability {
schema_validation: None
@@ -2093,7 +2196,7 @@ mod tests {
})
));
assert!(elicitation_capability_for_server("custom_mcp").is_none());
assert!(elicitation_capability_for_server("custom_mcp", true).is_none());
}
#[test]

View File

@@ -10,6 +10,7 @@ use crate::codex::TurnContext;
use crate::config::types::AppToolApproval;
use crate::connectors;
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
use crate::mcp::is_apps_mcp_gateway_elicitation_flow_active;
use crate::protocol::EventMsg;
use crate::protocol::McpInvocation;
use crate::protocol::McpToolCallBeginEvent;
@@ -352,6 +353,10 @@ async fn maybe_request_mcp_tool_approval(
}
}
if is_apps_mcp_gateway_elicitation_flow_active(&turn_context.config.features, server) {
return None;
}
let approval_key = if approval_mode == AppToolApproval::Auto {
let connector_id = metadata.and_then(|metadata| metadata.connector_id.clone());
if server == CODEX_APPS_MCP_SERVER_NAME && connector_id.is_none() {

View File

@@ -31,6 +31,8 @@ pub use multi_agents::MultiAgentHandler;
pub use plan::PlanHandler;
pub use read_file::ReadFileHandler;
pub use request_user_input::RequestUserInputHandler;
pub(crate) use request_user_input::build_mcp_elicitation_request_user_input_args;
pub(crate) use request_user_input::build_mcp_elicitation_response_from_user_input;
pub(crate) use request_user_input::request_user_input_tool_description;
pub(crate) use search_tool_bm25::DEFAULT_LIMIT as SEARCH_TOOL_BM25_DEFAULT_LIMIT;
pub(crate) use search_tool_bm25::SEARCH_TOOL_BM25_TOOL_NAME;

View File

@@ -1,7 +1,16 @@
use async_trait::async_trait;
use codex_protocol::approvals::ElicitationRequestEvent;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::request_user_input::RequestUserInputAnswer;
use codex_protocol::request_user_input::RequestUserInputQuestion;
use codex_protocol::request_user_input::RequestUserInputQuestionOption;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_rmcp_client::ElicitationAction;
use codex_rmcp_client::ElicitationResponse;
use crate::function_tool::FunctionCallError;
use crate::mcp::elicitation_form::build_elicitation_content_from_response;
use crate::mcp::elicitation_form::build_elicitation_content_questions;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolOutput;
use crate::tools::context::ToolPayload;
@@ -12,6 +21,12 @@ use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::TUI_VISIBLE_COLLABORATION_MODES;
use codex_protocol::request_user_input::RequestUserInputArgs;
const MCP_ELICITATION_DECISION_QUESTION_ID: &str = "mcp_elicitation_decision";
const MCP_ELICITATION_ACCEPT: &str = "Accept";
const MCP_ELICITATION_DECLINE: &str = "Decline";
const MCP_ELICITATION_CANCEL: &str = "Cancel";
const REQUEST_USER_INPUT_NOTE_PREFIX: &str = "user_note: ";
fn format_allowed_modes() -> String {
let mode_names: Vec<&str> = TUI_VISIBLE_COLLABORATION_MODES
.into_iter()
@@ -45,6 +60,103 @@ pub(crate) fn request_user_input_tool_description() -> String {
)
}
pub(crate) fn build_mcp_elicitation_request_user_input_args(
elicitation: &ElicitationRequestEvent,
) -> RequestUserInputArgs {
let mut question = elicitation.message.clone();
if let Some(url) = &elicitation.url {
question = format!("{question}\nURL: {url}");
}
let mut questions = vec![RequestUserInputQuestion {
id: MCP_ELICITATION_DECISION_QUESTION_ID.to_string(),
header: "MCP elicitation".to_string(),
question,
is_other: true,
is_secret: false,
options: Some(vec![
RequestUserInputQuestionOption {
label: MCP_ELICITATION_ACCEPT.to_string(),
description: "Accept this elicitation request.".to_string(),
},
RequestUserInputQuestionOption {
label: MCP_ELICITATION_DECLINE.to_string(),
description: "Decline this elicitation request.".to_string(),
},
RequestUserInputQuestionOption {
label: MCP_ELICITATION_CANCEL.to_string(),
description: "Cancel this elicitation request.".to_string(),
},
]),
}];
questions.extend(build_elicitation_content_questions(
elicitation.requested_schema.as_ref(),
));
RequestUserInputArgs { questions }
}
pub(crate) fn build_mcp_elicitation_response_from_user_input(
response: Option<RequestUserInputResponse>,
elicitation: &ElicitationRequestEvent,
) -> ElicitationResponse {
let Some(response) = response else {
return ElicitationResponse {
action: ElicitationAction::Cancel,
content: None,
};
};
let action = response
.answers
.get(MCP_ELICITATION_DECISION_QUESTION_ID)
.and_then(request_user_input_answer_to_elicitation_action)
.unwrap_or(ElicitationAction::Cancel);
match action {
ElicitationAction::Accept => {
let content = if elicitation.requested_schema.is_some() {
match build_elicitation_content_from_response(
&response,
elicitation.requested_schema.as_ref(),
) {
Ok(Some(value)) => Some(value),
Ok(None) => Some(serde_json::json!({})),
Err(()) => {
return ElicitationResponse {
action: ElicitationAction::Cancel,
content: None,
};
}
}
} else {
Some(serde_json::json!({}))
};
ElicitationResponse { action, content }
}
ElicitationAction::Decline | ElicitationAction::Cancel => ElicitationResponse {
action,
content: None,
},
}
}
fn request_user_input_answer_to_elicitation_action(
answer: &RequestUserInputAnswer,
) -> Option<ElicitationAction> {
answer.answers.iter().find_map(|entry| {
if entry.starts_with(REQUEST_USER_INPUT_NOTE_PREFIX) {
return None;
}
match entry.as_str() {
MCP_ELICITATION_ACCEPT => Some(ElicitationAction::Accept),
MCP_ELICITATION_DECLINE => Some(ElicitationAction::Decline),
MCP_ELICITATION_CANCEL => Some(ElicitationAction::Cancel),
_ => None,
}
})
}
pub struct RequestUserInputHandler;
#[async_trait]

View File

@@ -7,6 +7,7 @@ use crate::protocol::FileChange;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use ts_rs::TS;
/// Proposed execpolicy change to allow commands starting with this prefix.
@@ -102,9 +103,15 @@ pub struct ElicitationRequestEvent {
#[ts(type = "string | number")]
pub id: RequestId,
pub message: String,
// TODO: MCP servers can request we fill out a schema for the elicitation. We don't support
// this yet.
// pub requested_schema: ElicitRequestParamsRequestedSchema,
#[serde(skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub requested_schema: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub url: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub elicitation_id: Option<String>,
}
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]

View File

@@ -31,6 +31,8 @@ use codex_otel::OtelManager;
use codex_otel::RuntimeMetricsSummary;
use codex_protocol::ThreadId;
use codex_protocol::account::PlanType;
use codex_protocol::approvals::ElicitationAction;
use codex_protocol::approvals::ElicitationRequestEvent;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Personality;
@@ -38,6 +40,8 @@ use codex_protocol::config_types::Settings;
use codex_protocol::items::AgentMessageContent;
use codex_protocol::items::AgentMessageItem;
use codex_protocol::items::TurnItem;
use codex_protocol::mcp::CallToolResult;
use codex_protocol::mcp::RequestId;
use codex_protocol::models::MessagePhase;
use codex_protocol::openai_models::ModelPreset;
use codex_protocol::openai_models::ReasoningEffortPreset;
@@ -65,9 +69,12 @@ use codex_protocol::protocol::ExecPolicyAmendment;
use codex_protocol::protocol::ExitedReviewModeEvent;
use codex_protocol::protocol::FileChange;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::McpInvocation;
use codex_protocol::protocol::McpStartupCompleteEvent;
use codex_protocol::protocol::McpStartupStatus;
use codex_protocol::protocol::McpStartupUpdateEvent;
use codex_protocol::protocol::McpToolCallBeginEvent;
use codex_protocol::protocol::McpToolCallEndEvent;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::PatchApplyBeginEvent;
use codex_protocol::protocol::PatchApplyEndEvent;
@@ -3237,6 +3244,173 @@ async fn streaming_final_answer_keeps_task_running_state() {
assert!(!chat.bottom_pane.quit_shortcut_hint_visible());
}
#[tokio::test]
async fn mcp_tool_call_begin_elicitation_end_deferred_during_active_stream() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.on_task_started();
chat.on_agent_message_delta("Generating search response...\n".to_string());
chat.on_commit_tick();
let _ = drain_insert_history(&mut rx);
let call_id = "mcp-call-1".to_string();
let invocation = McpInvocation {
server: "mcp-test".to_string(),
tool: "search".to_string(),
arguments: Some(serde_json::json!({ "query": "foo" })),
};
chat.handle_codex_event(Event {
id: "mcp-begin".into(),
msg: EventMsg::McpToolCallBegin(McpToolCallBeginEvent {
call_id: call_id.clone(),
invocation: invocation.clone(),
}),
});
chat.handle_codex_event(Event {
id: "elicitation".into(),
msg: EventMsg::ElicitationRequest(ElicitationRequestEvent {
server_name: "mcp-test".to_string(),
id: RequestId::String("elicitation-1".to_string()),
message: "Need search details".to_string(),
requested_schema: Some(serde_json::json!({
"type": "object",
"properties": {},
})),
url: Some("https://example.com/elicitation".to_string()),
elicitation_id: Some("elicitation-url-1".to_string()),
}),
});
chat.handle_codex_event(Event {
id: "mcp-end".into(),
msg: EventMsg::McpToolCallEnd(McpToolCallEndEvent {
call_id,
invocation,
duration: std::time::Duration::from_millis(25),
result: Ok(CallToolResult {
content: vec![serde_json::json!({"type":"text","text":"ok"})],
structured_content: None,
is_error: Some(false),
meta: None,
}),
}),
});
assert_eq!(drain_insert_history(&mut rx).len(), 0);
assert!(!chat.interrupts.is_empty());
chat.handle_codex_event(Event {
id: "final".into(),
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: String::new(),
phase: None,
}),
});
let cells = drain_insert_history(&mut rx);
assert!(
!cells.is_empty(),
"expected queued MCP cell after stream flush"
);
let final_blob = lines_to_single_string(cells.last().unwrap());
assert!(final_blob.contains("Called"));
assert!(final_blob.contains("mcp-test.search"));
assert!(chat.interrupts.is_empty());
assert!(chat.active_cell.is_none());
}
#[tokio::test]
async fn mcp_tool_call_elicitation_can_be_resolved_after_stream_queue_flush() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.on_task_started();
chat.on_agent_message_delta("Running through tool request...\n".to_string());
chat.on_commit_tick();
let _ = drain_insert_history(&mut rx);
let call_id = "mcp-call-2".to_string();
let invocation = McpInvocation {
server: "mcp-test".to_string(),
tool: "search".to_string(),
arguments: Some(serde_json::json!({ "query": "bar" })),
};
chat.handle_codex_event(Event {
id: "mcp-begin-2".into(),
msg: EventMsg::McpToolCallBegin(McpToolCallBeginEvent {
call_id: call_id.clone(),
invocation: invocation.clone(),
}),
});
chat.handle_codex_event(Event {
id: "elicitation-2".into(),
msg: EventMsg::ElicitationRequest(ElicitationRequestEvent {
server_name: "mcp-test".to_string(),
id: RequestId::String("elicitation-2".to_string()),
message: "Need search details".to_string(),
requested_schema: Some(serde_json::json!({
"type": "object",
"properties": {},
})),
url: None,
elicitation_id: None,
}),
});
chat.handle_codex_event(Event {
id: "mcp-end-2".into(),
msg: EventMsg::McpToolCallEnd(McpToolCallEndEvent {
call_id,
invocation,
duration: std::time::Duration::from_millis(33),
result: Ok(CallToolResult {
content: vec![serde_json::json!({"type":"text","text":"ok"})],
structured_content: None,
is_error: Some(false),
meta: None,
}),
}),
});
let _ = drain_insert_history(&mut rx);
chat.handle_codex_event(Event {
id: "final-2".into(),
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: String::new(),
phase: None,
}),
});
let cells = drain_insert_history(&mut rx);
assert!(
!cells.is_empty(),
"expected queued MCP cell after stream flush"
);
let final_blob = lines_to_single_string(cells.last().unwrap());
assert!(final_blob.contains("Called"));
assert!(final_blob.contains("mcp-test.search"));
let popup = render_bottom_popup(&chat, 80);
assert!(popup.contains("mcp-test needs your approval."));
chat.handle_key_event(KeyEvent::new(KeyCode::Char('y'), KeyModifiers::NONE));
let mut found_op = None;
while let Ok(app_ev) = rx.try_recv() {
if let AppEvent::CodexOp(op) = app_ev {
found_op = Some(op);
break;
}
}
let Op::ResolveElicitation {
server_name,
request_id,
decision,
..
} = found_op.expect("expected ResolveElicitation op")
else {
panic!("unexpected op after resolving elicitation");
};
assert_eq!(server_name, "mcp-test");
assert_eq!(request_id, RequestId::String("elicitation-2".to_string()));
assert_eq!(decision, ElicitationAction::Accept);
}
#[tokio::test]
async fn idle_commit_ticks_do_not_restore_status_without_commentary_completion() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;