From 5e60e38d5b22a6bf32f883b6e375d1a7491864db Mon Sep 17 00:00:00 2001 From: canvrno-oai Date: Wed, 13 May 2026 14:47:53 -0700 Subject: [PATCH] Use unique parent_request_id to prevent collisions, update tests and cleanup --- codex-rs/core/src/codex_delegate.rs | 19 ++- codex-rs/core/src/codex_delegate_tests.rs | 150 ++++++++++++++++++++-- 2 files changed, 147 insertions(+), 22 deletions(-) diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index b225cdd2dd..95ba312070 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -701,14 +701,6 @@ async fn handle_mcp_elicitation( return; } }; - let request_id = match &id { - codex_protocol::mcp::RequestId::String(value) => { - rmcp::model::NumberOrString::String(Arc::from(value.as_str())) - } - codex_protocol::mcp::RequestId::Integer(value) => { - rmcp::model::NumberOrString::Number(*value) - } - }; let response = if parent_session .services .mcp_connection_manager @@ -723,6 +715,9 @@ async fn handle_mcp_elicitation( meta: None, }) } else { + // Child MCP request IDs are child-local, so route through a unique parent-side ID. + let parent_request_id = + rmcp::model::RequestId::String(uuid::Uuid::new_v4().to_string().into()); tokio::select! { biased; _ = cancel_token.cancelled() => { @@ -732,13 +727,17 @@ async fn handle_mcp_elicitation( meta: None, }; let _ = parent_session - .resolve_elicitation(server_name.clone(), request_id.clone(), response.clone()) + .resolve_elicitation( + server_name.clone(), + parent_request_id.clone(), + response.clone(), + ) .await; Some(response) } response = parent_session.request_mcp_server_elicitation( parent_ctx, - request_id.clone(), + parent_request_id, McpServerElicitationRequestParams { thread_id: parent_session.conversation_id.to_string(), turn_id: Some(parent_ctx.sub_id.clone()), diff --git a/codex-rs/core/src/codex_delegate_tests.rs b/codex-rs/core/src/codex_delegate_tests.rs index 7eb5be8b8f..05da600bda 100644 --- a/codex-rs/core/src/codex_delegate_tests.rs +++ b/codex-rs/core/src/codex_delegate_tests.rs @@ -333,18 +333,15 @@ async fn handle_mcp_elicitation_routes_response_back_to_delegate() { assert_eq!(request.turn_id, Some(parent_ctx.sub_id.clone())); assert_eq!(request.server_name, "maas_confluence"); - parent_session - .resolve_elicitation( - "maas_confluence".to_string(), - rmcp::model::RequestId::String("request-1".into()), - codex_rmcp_client::ElicitationResponse { - action: codex_rmcp_client::ElicitationAction::Accept, - content: Some(serde_json::json!({})), - meta: Some(serde_json::json!({ "persist": "session" })), - }, - ) - .await - .expect("resolve elicitation"); + crate::session::handlers::resolve_elicitation( + &parent_session, + "maas_confluence".to_string(), + request.id, + codex_protocol::approvals::ElicitationAction::Accept, + Some(serde_json::json!({})), + Some(serde_json::json!({ "persist": "session" })), + ) + .await; timeout(Duration::from_secs(1), handle) .await @@ -367,6 +364,135 @@ async fn handle_mcp_elicitation_routes_response_back_to_delegate() { ); } +#[tokio::test] +async fn handle_mcp_elicitation_namespaces_routed_parent_request_ids() { + let (parent_session, parent_ctx, rx_events) = + crate::session::tests::make_session_and_context_with_rx().await; + *parent_session.active_turn.lock().await = Some(crate::state::ActiveTurn::default()); + + let make_codex = || { + let (tx_sub, rx_sub) = bounded(SUBMISSION_CHANNEL_CAPACITY); + let (_tx_events, rx_events_child) = bounded(SUBMISSION_CHANNEL_CAPACITY); + let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit); + let codex = Arc::new(Codex { + tx_sub, + rx_event: rx_events_child, + agent_status, + session: Arc::clone(&parent_session), + session_loop_termination: completed_session_loop_termination(), + }); + (codex, rx_sub) + }; + let (first_codex, first_rx_sub) = make_codex(); + let (second_codex, second_rx_sub) = make_codex(); + let cancel_token = CancellationToken::new(); + let request = ElicitationRequest::Form { + meta: None, + message: "Allow this request?".to_string(), + requested_schema: serde_json::json!({ + "type": "object", + "properties": {}, + }), + }; + let route_elicitation = |codex: Arc, child_turn_id: &'static str, request| { + let parent_session = Arc::clone(&parent_session); + let parent_ctx = Arc::clone(&parent_ctx); + let cancel_token = cancel_token.clone(); + tokio::spawn(async move { + handle_mcp_elicitation( + codex.as_ref(), + &parent_session, + &parent_ctx, + ElicitationRequestEvent { + turn_id: Some(child_turn_id.to_string()), + server_name: "maas_confluence".to_string(), + id: codex_protocol::mcp::RequestId::Integer(0), + request, + }, + &cancel_token, + ) + .await; + }) + }; + let recv_routed_request = || async { + let request_event = timeout(Duration::from_secs(1), rx_events.recv()) + .await + .expect("elicitation event timed out") + .expect("elicitation event missing"); + let EventMsg::ElicitationRequest(request) = request_event.msg else { + panic!("expected ElicitationRequest event"); + }; + request + }; + let first_handle = route_elicitation(Arc::clone(&first_codex), "child-turn-1", request.clone()); + let first_request = recv_routed_request().await; + let second_handle = route_elicitation(Arc::clone(&second_codex), "child-turn-2", request); + let second_request = recv_routed_request().await; + + assert_eq!(first_request.turn_id, Some(parent_ctx.sub_id.clone())); + assert_eq!(second_request.turn_id, Some(parent_ctx.sub_id.clone())); + assert_eq!(first_request.server_name, "maas_confluence"); + assert_eq!(second_request.server_name, "maas_confluence"); + assert_ne!(first_request.id, second_request.id); + + crate::session::handlers::resolve_elicitation( + &parent_session, + "maas_confluence".to_string(), + first_request.id, + codex_protocol::approvals::ElicitationAction::Decline, + None, + None, + ) + .await; + crate::session::handlers::resolve_elicitation( + &parent_session, + "maas_confluence".to_string(), + second_request.id, + codex_protocol::approvals::ElicitationAction::Accept, + Some(serde_json::json!({})), + None, + ) + .await; + + timeout(Duration::from_secs(1), first_handle) + .await + .expect("first handle_mcp_elicitation hung") + .expect("first handle_mcp_elicitation join error"); + timeout(Duration::from_secs(1), second_handle) + .await + .expect("second handle_mcp_elicitation hung") + .expect("second handle_mcp_elicitation join error"); + + assert_eq!( + timeout(Duration::from_secs(1), first_rx_sub.recv()) + .await + .expect("first elicitation response timed out") + .expect("first elicitation response missing") + .op, + Op::ResolveElicitation { + server_name: "maas_confluence".to_string(), + request_id: codex_protocol::mcp::RequestId::Integer(0), + decision: codex_protocol::approvals::ElicitationAction::Decline, + content: None, + meta: None, + } + ); + assert_eq!( + timeout(Duration::from_secs(1), second_rx_sub.recv()) + .await + .expect("second elicitation response timed out") + .expect("second elicitation response missing") + .op, + Op::ResolveElicitation { + server_name: "maas_confluence".to_string(), + request_id: codex_protocol::mcp::RequestId::Integer(0), + decision: codex_protocol::approvals::ElicitationAction::Accept, + content: Some(serde_json::json!({})), + meta: None, + } + ); +} + #[tokio::test] async fn handle_exec_approval_uses_call_id_for_guardian_review_and_approval_id_for_reply() { let (parent_session, parent_ctx, rx_events) =