From 862869cbc4051e5442afdcdb01a3019740868ac7 Mon Sep 17 00:00:00 2001 From: canvrno-oai Date: Tue, 12 May 2026 16:56:32 -0700 Subject: [PATCH] Route delegated MCP elicitations back to child session --- codex-rs/core/src/codex_delegate.rs | 108 ++++++++++++++++++++++ codex-rs/core/src/codex_delegate_tests.rs | 93 +++++++++++++++++++ 2 files changed, 201 insertions(+) diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 493771e28c..9595a01633 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -4,7 +4,9 @@ use std::sync::Arc; use async_channel::Receiver; use async_channel::Sender; use codex_analytics::GuardianApprovalRequestSource; +use codex_app_server_protocol::McpServerElicitationRequestParams; use codex_async_utils::OrCancelExt; +use codex_protocol::approvals::ElicitationRequestEvent; use codex_protocol::protocol::ApplyPatchApprovalRequestEvent; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; @@ -326,6 +328,19 @@ async fn forward_events( ) .await; } + Event { + msg: EventMsg::ElicitationRequest(event), + .. + } => { + handle_mcp_elicitation( + &codex, + &parent_session, + &parent_ctx, + event, + &cancel_token, + ) + .await; + } Event { id, msg: EventMsg::McpToolCallBegin(event), @@ -652,6 +667,99 @@ async fn handle_request_user_input( let _ = codex.submit(Op::UserInputAnswer { id, response }).await; } +async fn handle_mcp_elicitation( + codex: &Codex, + parent_session: &Arc, + parent_ctx: &Arc, + event: ElicitationRequestEvent, + cancel_token: &CancellationToken, +) { + let ElicitationRequestEvent { + server_name, + id, + request, + .. + } = event; + let request = match request.try_into() { + Ok(request) => request, + Err(err) => { + tracing::warn!( + error = %err, + server_name, + request_id = ?id, + "failed to parse delegated MCP elicitation request" + ); + let _ = codex + .submit(Op::ResolveElicitation { + server_name, + request_id: id, + decision: codex_protocol::approvals::ElicitationAction::Cancel, + content: None, + meta: None, + }) + .await; + return; + } + }; + let request_id = match &id { + codex_protocol::mcp::RequestId::String(value) => { + rmcp::model::NumberOrString::String(Arc::from(value.as_str())) + } + codex_protocol::mcp::RequestId::Integer(value) => { + rmcp::model::NumberOrString::Number(*value) + } + }; + let response = tokio::select! { + biased; + _ = cancel_token.cancelled() => { + let response = codex_rmcp_client::ElicitationResponse { + action: codex_rmcp_client::ElicitationAction::Cancel, + content: None, + meta: None, + }; + let _ = parent_session + .resolve_elicitation(server_name.clone(), request_id.clone(), response.clone()) + .await; + Some(response) + } + response = parent_session.request_mcp_server_elicitation( + parent_ctx, + request_id, + McpServerElicitationRequestParams { + thread_id: parent_session.conversation_id.to_string(), + turn_id: Some(parent_ctx.sub_id.clone()), + server_name: server_name.clone(), + request, + }, + ) => response, + } + .unwrap_or(codex_rmcp_client::ElicitationResponse { + action: codex_rmcp_client::ElicitationAction::Cancel, + content: None, + meta: None, + }); + let decision = match response.action { + codex_rmcp_client::ElicitationAction::Accept => { + codex_protocol::approvals::ElicitationAction::Accept + } + codex_rmcp_client::ElicitationAction::Decline => { + codex_protocol::approvals::ElicitationAction::Decline + } + codex_rmcp_client::ElicitationAction::Cancel => { + codex_protocol::approvals::ElicitationAction::Cancel + } + }; + let _ = codex + .submit(Op::ResolveElicitation { + server_name, + request_id: id, + decision, + content: response.content, + meta: response.meta, + }) + .await; +} + /// Intercepts delegated legacy MCP approval prompts on the RequestUserInput /// compatibility path and, when guardian is active, answers them /// programmatically after running the guardian review. diff --git a/codex-rs/core/src/codex_delegate_tests.rs b/codex-rs/core/src/codex_delegate_tests.rs index 66cde8d1ea..7eb5be8b8f 100644 --- a/codex-rs/core/src/codex_delegate_tests.rs +++ b/codex-rs/core/src/codex_delegate_tests.rs @@ -2,6 +2,8 @@ use super::*; use crate::mcp_tool_call::MCP_TOOL_APPROVAL_DECLINE_SYNTHETIC; use crate::mcp_tool_call::MCP_TOOL_APPROVAL_QUESTION_ID_PREFIX; use async_channel::bounded; +use codex_protocol::approvals::ElicitationRequest; +use codex_protocol::approvals::ElicitationRequestEvent; use codex_protocol::config_types::ApprovalsReviewer; use codex_protocol::models::NetworkPermissions; use codex_protocol::models::ResponseItem; @@ -274,6 +276,97 @@ async fn handle_request_permissions_uses_tool_call_id_for_round_trip() { ); } +#[tokio::test] +async fn handle_mcp_elicitation_routes_response_back_to_delegate() { + let (parent_session, parent_ctx, rx_events) = + crate::session::tests::make_session_and_context_with_rx().await; + *parent_session.active_turn.lock().await = Some(crate::state::ActiveTurn::default()); + + let (tx_sub, rx_sub) = bounded(SUBMISSION_CHANNEL_CAPACITY); + let (_tx_events, rx_events_child) = bounded(SUBMISSION_CHANNEL_CAPACITY); + let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit); + let codex = Arc::new(Codex { + tx_sub, + rx_event: rx_events_child, + agent_status, + session: Arc::clone(&parent_session), + session_loop_termination: completed_session_loop_termination(), + }); + + let cancel_token = CancellationToken::new(); + let handle = tokio::spawn({ + let codex = Arc::clone(&codex); + let parent_session = Arc::clone(&parent_session); + let parent_ctx = Arc::clone(&parent_ctx); + let cancel_token = cancel_token.clone(); + async move { + handle_mcp_elicitation( + codex.as_ref(), + &parent_session, + &parent_ctx, + ElicitationRequestEvent { + turn_id: Some("child-turn-1".to_string()), + server_name: "maas_confluence".to_string(), + id: codex_protocol::mcp::RequestId::String("request-1".to_string()), + request: ElicitationRequest::Form { + meta: None, + message: "Allow this request?".to_string(), + requested_schema: serde_json::json!({ + "type": "object", + "properties": {}, + }), + }, + }, + &cancel_token, + ) + .await; + } + }); + + let request_event = timeout(Duration::from_secs(1), rx_events.recv()) + .await + .expect("elicitation event timed out") + .expect("elicitation event missing"); + let EventMsg::ElicitationRequest(request) = request_event.msg else { + panic!("expected ElicitationRequest event"); + }; + assert_eq!(request.turn_id, Some(parent_ctx.sub_id.clone())); + assert_eq!(request.server_name, "maas_confluence"); + + parent_session + .resolve_elicitation( + "maas_confluence".to_string(), + rmcp::model::RequestId::String("request-1".into()), + codex_rmcp_client::ElicitationResponse { + action: codex_rmcp_client::ElicitationAction::Accept, + content: Some(serde_json::json!({})), + meta: Some(serde_json::json!({ "persist": "session" })), + }, + ) + .await + .expect("resolve elicitation"); + + timeout(Duration::from_secs(1), handle) + .await + .expect("handle_mcp_elicitation hung") + .expect("handle_mcp_elicitation join error"); + + let submission = timeout(Duration::from_secs(1), rx_sub.recv()) + .await + .expect("elicitation response timed out") + .expect("elicitation response missing"); + assert_eq!( + submission.op, + Op::ResolveElicitation { + server_name: "maas_confluence".to_string(), + request_id: codex_protocol::mcp::RequestId::String("request-1".to_string()), + decision: codex_protocol::approvals::ElicitationAction::Accept, + content: Some(serde_json::json!({})), + meta: Some(serde_json::json!({ "persist": "session" })), + } + ); +} + #[tokio::test] async fn handle_exec_approval_uses_call_id_for_guardian_review_and_approval_id_for_reply() { let (parent_session, parent_ctx, rx_events) =