Route delegated MCP elicitations back to child session

This commit is contained in:
canvrno-oai
2026-05-12 16:56:32 -07:00
parent 3c3e18c222
commit 862869cbc4
2 changed files with 201 additions and 0 deletions

View File

@@ -4,7 +4,9 @@ use std::sync::Arc;
use async_channel::Receiver;
use async_channel::Sender;
use codex_analytics::GuardianApprovalRequestSource;
use codex_app_server_protocol::McpServerElicitationRequestParams;
use codex_async_utils::OrCancelExt;
use codex_protocol::approvals::ElicitationRequestEvent;
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
@@ -326,6 +328,19 @@ async fn forward_events(
)
.await;
}
Event {
msg: EventMsg::ElicitationRequest(event),
..
} => {
handle_mcp_elicitation(
&codex,
&parent_session,
&parent_ctx,
event,
&cancel_token,
)
.await;
}
Event {
id,
msg: EventMsg::McpToolCallBegin(event),
@@ -652,6 +667,99 @@ async fn handle_request_user_input(
let _ = codex.submit(Op::UserInputAnswer { id, response }).await;
}
async fn handle_mcp_elicitation(
codex: &Codex,
parent_session: &Arc<Session>,
parent_ctx: &Arc<TurnContext>,
event: ElicitationRequestEvent,
cancel_token: &CancellationToken,
) {
let ElicitationRequestEvent {
server_name,
id,
request,
..
} = event;
let request = match request.try_into() {
Ok(request) => request,
Err(err) => {
tracing::warn!(
error = %err,
server_name,
request_id = ?id,
"failed to parse delegated MCP elicitation request"
);
let _ = codex
.submit(Op::ResolveElicitation {
server_name,
request_id: id,
decision: codex_protocol::approvals::ElicitationAction::Cancel,
content: None,
meta: None,
})
.await;
return;
}
};
let request_id = match &id {
codex_protocol::mcp::RequestId::String(value) => {
rmcp::model::NumberOrString::String(Arc::from(value.as_str()))
}
codex_protocol::mcp::RequestId::Integer(value) => {
rmcp::model::NumberOrString::Number(*value)
}
};
let response = tokio::select! {
biased;
_ = cancel_token.cancelled() => {
let response = codex_rmcp_client::ElicitationResponse {
action: codex_rmcp_client::ElicitationAction::Cancel,
content: None,
meta: None,
};
let _ = parent_session
.resolve_elicitation(server_name.clone(), request_id.clone(), response.clone())
.await;
Some(response)
}
response = parent_session.request_mcp_server_elicitation(
parent_ctx,
request_id,
McpServerElicitationRequestParams {
thread_id: parent_session.conversation_id.to_string(),
turn_id: Some(parent_ctx.sub_id.clone()),
server_name: server_name.clone(),
request,
},
) => response,
}
.unwrap_or(codex_rmcp_client::ElicitationResponse {
action: codex_rmcp_client::ElicitationAction::Cancel,
content: None,
meta: None,
});
let decision = match response.action {
codex_rmcp_client::ElicitationAction::Accept => {
codex_protocol::approvals::ElicitationAction::Accept
}
codex_rmcp_client::ElicitationAction::Decline => {
codex_protocol::approvals::ElicitationAction::Decline
}
codex_rmcp_client::ElicitationAction::Cancel => {
codex_protocol::approvals::ElicitationAction::Cancel
}
};
let _ = codex
.submit(Op::ResolveElicitation {
server_name,
request_id: id,
decision,
content: response.content,
meta: response.meta,
})
.await;
}
/// Intercepts delegated legacy MCP approval prompts on the RequestUserInput
/// compatibility path and, when guardian is active, answers them
/// programmatically after running the guardian review.

View File

@@ -2,6 +2,8 @@ use super::*;
use crate::mcp_tool_call::MCP_TOOL_APPROVAL_DECLINE_SYNTHETIC;
use crate::mcp_tool_call::MCP_TOOL_APPROVAL_QUESTION_ID_PREFIX;
use async_channel::bounded;
use codex_protocol::approvals::ElicitationRequest;
use codex_protocol::approvals::ElicitationRequestEvent;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::models::NetworkPermissions;
use codex_protocol::models::ResponseItem;
@@ -274,6 +276,97 @@ async fn handle_request_permissions_uses_tool_call_id_for_round_trip() {
);
}
#[tokio::test]
async fn handle_mcp_elicitation_routes_response_back_to_delegate() {
let (parent_session, parent_ctx, rx_events) =
crate::session::tests::make_session_and_context_with_rx().await;
*parent_session.active_turn.lock().await = Some(crate::state::ActiveTurn::default());
let (tx_sub, rx_sub) = bounded(SUBMISSION_CHANNEL_CAPACITY);
let (_tx_events, rx_events_child) = bounded(SUBMISSION_CHANNEL_CAPACITY);
let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit);
let codex = Arc::new(Codex {
tx_sub,
rx_event: rx_events_child,
agent_status,
session: Arc::clone(&parent_session),
session_loop_termination: completed_session_loop_termination(),
});
let cancel_token = CancellationToken::new();
let handle = tokio::spawn({
let codex = Arc::clone(&codex);
let parent_session = Arc::clone(&parent_session);
let parent_ctx = Arc::clone(&parent_ctx);
let cancel_token = cancel_token.clone();
async move {
handle_mcp_elicitation(
codex.as_ref(),
&parent_session,
&parent_ctx,
ElicitationRequestEvent {
turn_id: Some("child-turn-1".to_string()),
server_name: "maas_confluence".to_string(),
id: codex_protocol::mcp::RequestId::String("request-1".to_string()),
request: ElicitationRequest::Form {
meta: None,
message: "Allow this request?".to_string(),
requested_schema: serde_json::json!({
"type": "object",
"properties": {},
}),
},
},
&cancel_token,
)
.await;
}
});
let request_event = timeout(Duration::from_secs(1), rx_events.recv())
.await
.expect("elicitation event timed out")
.expect("elicitation event missing");
let EventMsg::ElicitationRequest(request) = request_event.msg else {
panic!("expected ElicitationRequest event");
};
assert_eq!(request.turn_id, Some(parent_ctx.sub_id.clone()));
assert_eq!(request.server_name, "maas_confluence");
parent_session
.resolve_elicitation(
"maas_confluence".to_string(),
rmcp::model::RequestId::String("request-1".into()),
codex_rmcp_client::ElicitationResponse {
action: codex_rmcp_client::ElicitationAction::Accept,
content: Some(serde_json::json!({})),
meta: Some(serde_json::json!({ "persist": "session" })),
},
)
.await
.expect("resolve elicitation");
timeout(Duration::from_secs(1), handle)
.await
.expect("handle_mcp_elicitation hung")
.expect("handle_mcp_elicitation join error");
let submission = timeout(Duration::from_secs(1), rx_sub.recv())
.await
.expect("elicitation response timed out")
.expect("elicitation response missing");
assert_eq!(
submission.op,
Op::ResolveElicitation {
server_name: "maas_confluence".to_string(),
request_id: codex_protocol::mcp::RequestId::String("request-1".to_string()),
decision: codex_protocol::approvals::ElicitationAction::Accept,
content: Some(serde_json::json!({})),
meta: Some(serde_json::json!({ "persist": "session" })),
}
);
}
#[tokio::test]
async fn handle_exec_approval_uses_call_id_for_guardian_review_and_approval_id_for_reply() {
let (parent_session, parent_ctx, rx_events) =