use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_request_user_input_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Settings;
use codex_protocol::openai_models::ReasoningEffort;
use tokio::time::timeout;

const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

/// End-to-end round trip for a `request_user_input` tool call: the mock model
/// asks a question, the client receives a `ToolRequestUserInput` server
/// request, answers it, and the turn runs to completion.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn request_user_input_round_trip() -> Result<()> {
    let codex_home = tempfile::TempDir::new()?;

    // Mock responses server: first turn yields a user-input request ("call1"),
    // the follow-up turn yields a final assistant message.
    let responses = vec![
        create_request_user_input_sse_response("call1")?,
        create_final_assistant_message_sse_response("done")?,
    ];
    let server = create_mock_responses_server_sequence(responses).await;
    create_config_toml(codex_home.path(), &server.uri())?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    // Start a thread against the mock model.
    let thread_start_id = mcp
        .send_thread_start_request(ThreadStartParams {
            model: Some("mock-model".to_string()),
            ..Default::default()
        })
        .await?;
    let thread_start_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(thread_start_id)),
    )
    .await??;
    let ThreadStartResponse { thread, .. } = to_response(thread_start_resp)?;

    // Start a turn in Plan mode that will trigger the user-input request.
    let turn_start_id = mcp
        .send_turn_start_request(TurnStartParams {
            thread_id: thread.id.clone(),
            input: vec![V2UserInput::Text {
                text: "ask something".to_string(),
                text_elements: Vec::new(),
            }],
            model: Some("mock-model".to_string()),
            effort: Some(ReasoningEffort::Medium),
            collaboration_mode: Some(CollaborationMode {
                mode: ModeKind::Plan,
                settings: Settings {
                    model: "mock-model".to_string(),
                    reasoning_effort: Some(ReasoningEffort::Medium),
                    developer_instructions: None,
                },
            }),
            ..Default::default()
        })
        .await?;
    let turn_start_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(turn_start_id)),
    )
    .await??;
    let TurnStartResponse { turn, .. } = to_response(turn_start_resp)?;

    // The server should forward the tool call to the client as a
    // ToolRequestUserInput request tied to this thread, turn, and call id.
    let server_req = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_request_message(),
    )
    .await??;
    let ServerRequest::ToolRequestUserInput { request_id, params } = server_req else {
        panic!("expected ToolRequestUserInput request, got: {server_req:?}");
    };
    assert_eq!(params.thread_id, thread.id);
    assert_eq!(params.turn_id, turn.id);
    assert_eq!(params.item_id, "call1");
    assert_eq!(params.questions.len(), 1);

    // Answer the question so the turn can proceed to the final message.
    mcp.send_response(
        request_id,
        serde_json::json!({
            "answers": {
                "confirm_path": {
                    "answers": ["yes"]
                }
            }
        }),
    )
    .await?;

    // Wait for both the legacy task-complete event and the turn/completed
    // notification before declaring the round trip done.
    timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_notification_message("codex/event/task_complete"),
    )
    .await??;
    timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_notification_message("turn/completed"),
    )
    .await??;

    Ok(())
}

/// Writes a minimal config.toml pointing the mock provider at the test server.
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
    let config_toml = codex_home.join("config.toml");
    std::fs::write(
        config_toml,
        format!(
            r#"
model = "mock-model"
approval_policy = "untrusted"
sandbox_mode = "read-only"
model_provider = "mock_provider"

[features]
collaboration_modes = true

[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
        ),
    )
}