use std::collections::VecDeque; use std::path::Path; use std::process::Stdio; use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering; use tokio::io::AsyncBufReadExt; use tokio::io::AsyncWriteExt; use tokio::io::BufReader; use tokio::process::Child; use tokio::process::ChildStdin; use tokio::process::ChildStdout; use anyhow::Context; use codex_app_server_protocol::AppsListParams; use codex_app_server_protocol::CancelLoginAccountParams; use codex_app_server_protocol::ClientInfo; use codex_app_server_protocol::ClientNotification; use codex_app_server_protocol::CollaborationModeListParams; use codex_app_server_protocol::CommandExecParams; use codex_app_server_protocol::CommandExecResizeParams; use codex_app_server_protocol::CommandExecTerminateParams; use codex_app_server_protocol::CommandExecWriteParams; use codex_app_server_protocol::ConfigBatchWriteParams; use codex_app_server_protocol::ConfigReadParams; use codex_app_server_protocol::ConfigValueWriteParams; use codex_app_server_protocol::ExperimentalFeatureListParams; use codex_app_server_protocol::FeedbackUploadParams; use codex_app_server_protocol::FsCopyParams; use codex_app_server_protocol::FsCreateDirectoryParams; use codex_app_server_protocol::FsGetMetadataParams; use codex_app_server_protocol::FsReadDirectoryParams; use codex_app_server_protocol::FsReadFileParams; use codex_app_server_protocol::FsRemoveParams; use codex_app_server_protocol::FsWriteFileParams; use codex_app_server_protocol::GetAccountParams; use codex_app_server_protocol::GetAuthStatusParams; use codex_app_server_protocol::GetConversationSummaryParams; use codex_app_server_protocol::InitializeCapabilities; use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCRequest; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::LoginAccountParams; use codex_app_server_protocol::MockExperimentalMethodParams; use codex_app_server_protocol::ModelListParams; use codex_app_server_protocol::PluginInstallParams; use codex_app_server_protocol::PluginListParams; use codex_app_server_protocol::PluginReadParams; use codex_app_server_protocol::PluginUninstallParams; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ReviewStartParams; use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::SkillsListParams; use codex_app_server_protocol::ThreadArchiveParams; use codex_app_server_protocol::ThreadCompactStartParams; use codex_app_server_protocol::ThreadForkParams; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadLoadedListParams; use codex_app_server_protocol::ThreadMetadataUpdateParams; use codex_app_server_protocol::ThreadReadParams; use codex_app_server_protocol::ThreadRealtimeAppendAudioParams; use codex_app_server_protocol::ThreadRealtimeAppendTextParams; use codex_app_server_protocol::ThreadRealtimeStartParams; use codex_app_server_protocol::ThreadRealtimeStopParams; use codex_app_server_protocol::ThreadResumeParams; use codex_app_server_protocol::ThreadRollbackParams; use codex_app_server_protocol::ThreadSetNameParams; use codex_app_server_protocol::ThreadShellCommandParams; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadUnarchiveParams; use codex_app_server_protocol::ThreadUnsubscribeParams; use codex_app_server_protocol::TurnCompletedNotification; use codex_app_server_protocol::TurnInterruptParams; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnSteerParams; use codex_app_server_protocol::WindowsSandboxSetupStartParams; use codex_core::default_client::CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR; use tokio::process::Command; pub struct McpProcess { next_request_id: AtomicI64, /// Retain this child process until the client is dropped. The Tokio runtime /// will make a "best effort" to reap the process after it exits, but it is /// not a guarantee. See the `kill_on_drop` documentation for details. #[allow(dead_code)] process: Child, stdin: Option, stdout: BufReader, pending_messages: VecDeque, } pub const DEFAULT_CLIENT_NAME: &str = "codex-app-server-tests"; impl McpProcess { pub async fn new(codex_home: &Path) -> anyhow::Result { Self::new_with_env_and_args(codex_home, &[], &[]).await } pub async fn new_with_args(codex_home: &Path, args: &[&str]) -> anyhow::Result { Self::new_with_env_and_args(codex_home, &[], args).await } /// Creates a new MCP process, allowing tests to override or remove /// specific environment variables for the child process only. /// /// Pass a tuple of (key, Some(value)) to set/override, or (key, None) to /// remove a variable from the child's environment. pub async fn new_with_env( codex_home: &Path, env_overrides: &[(&str, Option<&str>)], ) -> anyhow::Result { Self::new_with_env_and_args(codex_home, env_overrides, &[]).await } async fn new_with_env_and_args( codex_home: &Path, env_overrides: &[(&str, Option<&str>)], args: &[&str], ) -> anyhow::Result { let program = codex_utils_cargo_bin::cargo_bin("codex-app-server") .context("should find binary for codex-app-server")?; let mut cmd = Command::new(program); cmd.stdin(Stdio::piped()); cmd.stdout(Stdio::piped()); cmd.stderr(Stdio::piped()); cmd.current_dir(codex_home); cmd.env("CODEX_HOME", codex_home); cmd.env("RUST_LOG", "info"); cmd.env_remove(CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR); cmd.args(args); for (k, v) in env_overrides { match v { Some(val) => { cmd.env(k, val); } None => { cmd.env_remove(k); } } } let mut process = cmd .kill_on_drop(true) .spawn() .context("codex-mcp-server proc should start")?; let stdin = process .stdin .take() .ok_or_else(|| anyhow::format_err!("mcp should have stdin fd"))?; let stdout = process .stdout .take() .ok_or_else(|| anyhow::format_err!("mcp should have stdout fd"))?; let stdout = BufReader::new(stdout); // Forward child's stderr to our stderr so failures are visible even // when stdout/stderr are captured by the test harness. if let Some(stderr) = process.stderr.take() { let mut stderr_reader = BufReader::new(stderr).lines(); tokio::spawn(async move { while let Ok(Some(line)) = stderr_reader.next_line().await { eprintln!("[mcp stderr] {line}"); } }); } Ok(Self { next_request_id: AtomicI64::new(0), process, stdin: Some(stdin), stdout, pending_messages: VecDeque::new(), }) } /// Performs the initialization handshake with the MCP server. pub async fn initialize(&mut self) -> anyhow::Result<()> { let initialized = self .initialize_with_client_info(ClientInfo { name: DEFAULT_CLIENT_NAME.to_string(), title: None, version: "0.1.0".to_string(), }) .await?; let JSONRPCMessage::Response(_) = initialized else { unreachable!("expected JSONRPCMessage::Response for initialize, got {initialized:?}"); }; Ok(()) } /// Sends initialize with the provided client info and returns the response/error message. pub async fn initialize_with_client_info( &mut self, client_info: ClientInfo, ) -> anyhow::Result { self.initialize_with_capabilities( client_info, Some(InitializeCapabilities { experimental_api: true, ..Default::default() }), ) .await } pub async fn initialize_with_capabilities( &mut self, client_info: ClientInfo, capabilities: Option, ) -> anyhow::Result { self.initialize_with_params(InitializeParams { client_info, capabilities, }) .await } async fn initialize_with_params( &mut self, params: InitializeParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); let request_id = self.send_request("initialize", params).await?; let message = self.read_jsonrpc_message().await?; match message { JSONRPCMessage::Response(response) => { if response.id != RequestId::Integer(request_id) { anyhow::bail!( "initialize response id mismatch: expected {}, got {:?}", request_id, response.id ); } // Send notifications/initialized to ack the response. self.send_notification(ClientNotification::Initialized) .await?; Ok(JSONRPCMessage::Response(response)) } JSONRPCMessage::Error(error) => { if error.id != RequestId::Integer(request_id) { anyhow::bail!( "initialize error id mismatch: expected {}, got {:?}", request_id, error.id ); } Ok(JSONRPCMessage::Error(error)) } JSONRPCMessage::Notification(notification) => { anyhow::bail!("unexpected JSONRPCMessage::Notification: {notification:?}"); } JSONRPCMessage::Request(request) => { anyhow::bail!("unexpected JSONRPCMessage::Request: {request:?}"); } } } /// Send a `getAuthStatus` JSON-RPC request. pub async fn send_get_auth_status_request( &mut self, params: GetAuthStatusParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("getAuthStatus", params).await } /// Send a `getConversationSummary` JSON-RPC request. pub async fn send_get_conversation_summary_request( &mut self, params: GetConversationSummaryParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("getConversationSummary", params).await } /// Send an `account/rateLimits/read` JSON-RPC request. pub async fn send_get_account_rate_limits_request(&mut self) -> anyhow::Result { self.send_request("account/rateLimits/read", /*params*/ None) .await } /// Send an `account/read` JSON-RPC request. pub async fn send_get_account_request( &mut self, params: GetAccountParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("account/read", params).await } /// Send an `account/login/start` JSON-RPC request with ChatGPT auth tokens. pub async fn send_chatgpt_auth_tokens_login_request( &mut self, access_token: String, chatgpt_account_id: String, chatgpt_plan_type: Option, ) -> anyhow::Result { let params = LoginAccountParams::ChatgptAuthTokens { access_token, chatgpt_account_id, chatgpt_plan_type, }; let params = Some(serde_json::to_value(params)?); self.send_request("account/login/start", params).await } /// Send a `feedback/upload` JSON-RPC request. pub async fn send_feedback_upload_request( &mut self, params: FeedbackUploadParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("feedback/upload", params).await } /// Send a `thread/start` JSON-RPC request. pub async fn send_thread_start_request( &mut self, params: ThreadStartParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/start", params).await } /// Send a `thread/resume` JSON-RPC request. pub async fn send_thread_resume_request( &mut self, params: ThreadResumeParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/resume", params).await } /// Send a `thread/fork` JSON-RPC request. pub async fn send_thread_fork_request( &mut self, params: ThreadForkParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/fork", params).await } /// Send a `thread/archive` JSON-RPC request. pub async fn send_thread_archive_request( &mut self, params: ThreadArchiveParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/archive", params).await } /// Send a `thread/name/set` JSON-RPC request. pub async fn send_thread_set_name_request( &mut self, params: ThreadSetNameParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/name/set", params).await } /// Send a `thread/metadata/update` JSON-RPC request. pub async fn send_thread_metadata_update_request( &mut self, params: ThreadMetadataUpdateParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/metadata/update", params).await } /// Send a `thread/unsubscribe` JSON-RPC request. pub async fn send_thread_unsubscribe_request( &mut self, params: ThreadUnsubscribeParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/unsubscribe", params).await } /// Send a `thread/unarchive` JSON-RPC request. pub async fn send_thread_unarchive_request( &mut self, params: ThreadUnarchiveParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/unarchive", params).await } /// Send a `thread/compact/start` JSON-RPC request. pub async fn send_thread_compact_start_request( &mut self, params: ThreadCompactStartParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/compact/start", params).await } /// Send a `thread/shellCommand` JSON-RPC request. pub async fn send_thread_shell_command_request( &mut self, params: ThreadShellCommandParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/shellCommand", params).await } /// Send a `thread/rollback` JSON-RPC request. pub async fn send_thread_rollback_request( &mut self, params: ThreadRollbackParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/rollback", params).await } /// Send a `thread/list` JSON-RPC request. pub async fn send_thread_list_request( &mut self, params: ThreadListParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/list", params).await } /// Send a `thread/loaded/list` JSON-RPC request. pub async fn send_thread_loaded_list_request( &mut self, params: ThreadLoadedListParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/loaded/list", params).await } /// Send a `thread/read` JSON-RPC request. pub async fn send_thread_read_request( &mut self, params: ThreadReadParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/read", params).await } /// Send a `model/list` JSON-RPC request. pub async fn send_list_models_request( &mut self, params: ModelListParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("model/list", params).await } /// Send an `experimentalFeature/list` JSON-RPC request. pub async fn send_experimental_feature_list_request( &mut self, params: ExperimentalFeatureListParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("experimentalFeature/list", params).await } /// Send an `app/list` JSON-RPC request. pub async fn send_apps_list_request(&mut self, params: AppsListParams) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("app/list", params).await } /// Send a `skills/list` JSON-RPC request. pub async fn send_skills_list_request( &mut self, params: SkillsListParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("skills/list", params).await } /// Send a `plugin/install` JSON-RPC request. pub async fn send_plugin_install_request( &mut self, params: PluginInstallParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("plugin/install", params).await } /// Send a `plugin/uninstall` JSON-RPC request. pub async fn send_plugin_uninstall_request( &mut self, params: PluginUninstallParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("plugin/uninstall", params).await } /// Send a `plugin/list` JSON-RPC request. pub async fn send_plugin_list_request( &mut self, params: PluginListParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("plugin/list", params).await } /// Send a `plugin/read` JSON-RPC request. pub async fn send_plugin_read_request( &mut self, params: PluginReadParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("plugin/read", params).await } /// Send a JSON-RPC request with raw params for protocol-level validation tests. pub async fn send_raw_request( &mut self, method: &str, params: Option, ) -> anyhow::Result { self.send_request(method, params).await } /// Send a `collaborationMode/list` JSON-RPC request. pub async fn send_list_collaboration_modes_request( &mut self, params: CollaborationModeListParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("collaborationMode/list", params).await } /// Send a `mock/experimentalMethod` JSON-RPC request. pub async fn send_mock_experimental_method_request( &mut self, params: MockExperimentalMethodParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("mock/experimentalMethod", params).await } /// Send a `turn/start` JSON-RPC request (v2). pub async fn send_turn_start_request( &mut self, params: TurnStartParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("turn/start", params).await } /// Send a `command/exec` JSON-RPC request (v2). pub async fn send_command_exec_request( &mut self, params: CommandExecParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("command/exec", params).await } /// Send a `command/exec/write` JSON-RPC request (v2). pub async fn send_command_exec_write_request( &mut self, params: CommandExecWriteParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("command/exec/write", params).await } /// Send a `command/exec/resize` JSON-RPC request (v2). pub async fn send_command_exec_resize_request( &mut self, params: CommandExecResizeParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("command/exec/resize", params).await } /// Send a `command/exec/terminate` JSON-RPC request (v2). pub async fn send_command_exec_terminate_request( &mut self, params: CommandExecTerminateParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("command/exec/terminate", params).await } /// Send a `turn/interrupt` JSON-RPC request (v2). pub async fn send_turn_interrupt_request( &mut self, params: TurnInterruptParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("turn/interrupt", params).await } /// Send a `thread/realtime/start` JSON-RPC request (v2). pub async fn send_thread_realtime_start_request( &mut self, params: ThreadRealtimeStartParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/realtime/start", params).await } /// Send a `thread/realtime/appendAudio` JSON-RPC request (v2). pub async fn send_thread_realtime_append_audio_request( &mut self, params: ThreadRealtimeAppendAudioParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/realtime/appendAudio", params) .await } /// Send a `thread/realtime/appendText` JSON-RPC request (v2). pub async fn send_thread_realtime_append_text_request( &mut self, params: ThreadRealtimeAppendTextParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/realtime/appendText", params) .await } /// Send a `thread/realtime/stop` JSON-RPC request (v2). pub async fn send_thread_realtime_stop_request( &mut self, params: ThreadRealtimeStopParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("thread/realtime/stop", params).await } /// Deterministically clean up an intentionally in-flight turn. /// /// Some tests assert behavior while a turn is still running. Returning from those tests /// without an explicit interrupt + terminal turn notification wait can leave in-flight work /// racing teardown and intermittently show up as `LEAK` in nextest. /// /// In rare races, the turn can also fail or complete on its own after we send /// `turn/interrupt` but before the server emits the interrupt response. The helper treats a /// buffered matching `turn/completed` notification as sufficient terminal cleanup in that /// case so teardown does not flap on timing. pub async fn interrupt_turn_and_wait_for_aborted( &mut self, thread_id: String, turn_id: String, read_timeout: std::time::Duration, ) -> anyhow::Result<()> { let interrupt_request_id = self .send_turn_interrupt_request(TurnInterruptParams { thread_id: thread_id.clone(), turn_id: turn_id.clone(), }) .await?; match tokio::time::timeout( read_timeout, self.read_stream_until_response_message(RequestId::Integer(interrupt_request_id)), ) .await { Ok(result) => { result.with_context(|| "failed while waiting for turn interrupt response")?; } Err(err) => { if self.pending_turn_completed_notification(&thread_id, &turn_id) { return Ok(()); } return Err(err).with_context(|| "timed out waiting for turn interrupt response"); } } match tokio::time::timeout( read_timeout, self.read_stream_until_notification_message("turn/completed"), ) .await { Ok(result) => { result.with_context(|| "failed while waiting for terminal turn notification")?; } Err(err) => { if self.pending_turn_completed_notification(&thread_id, &turn_id) { return Ok(()); } return Err(err) .with_context(|| "timed out waiting for terminal turn notification"); } } Ok(()) } /// Send a `turn/steer` JSON-RPC request (v2). pub async fn send_turn_steer_request( &mut self, params: TurnSteerParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("turn/steer", params).await } /// Send a `review/start` JSON-RPC request (v2). pub async fn send_review_start_request( &mut self, params: ReviewStartParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("review/start", params).await } pub async fn send_windows_sandbox_setup_start_request( &mut self, params: WindowsSandboxSetupStartParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("windowsSandbox/setupStart", params).await } pub async fn send_config_read_request( &mut self, params: ConfigReadParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("config/read", params).await } pub async fn send_config_value_write_request( &mut self, params: ConfigValueWriteParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("config/value/write", params).await } pub async fn send_config_batch_write_request( &mut self, params: ConfigBatchWriteParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("config/batchWrite", params).await } pub async fn send_fs_read_file_request( &mut self, params: FsReadFileParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("fs/readFile", params).await } pub async fn send_fs_write_file_request( &mut self, params: FsWriteFileParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("fs/writeFile", params).await } pub async fn send_fs_create_directory_request( &mut self, params: FsCreateDirectoryParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("fs/createDirectory", params).await } pub async fn send_fs_get_metadata_request( &mut self, params: FsGetMetadataParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("fs/getMetadata", params).await } pub async fn send_fs_read_directory_request( &mut self, params: FsReadDirectoryParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("fs/readDirectory", params).await } pub async fn send_fs_remove_request(&mut self, params: FsRemoveParams) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("fs/remove", params).await } pub async fn send_fs_copy_request(&mut self, params: FsCopyParams) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("fs/copy", params).await } /// Send an `account/logout` JSON-RPC request. pub async fn send_logout_account_request(&mut self) -> anyhow::Result { self.send_request("account/logout", /*params*/ None).await } /// Send an `account/login/start` JSON-RPC request for API key login. pub async fn send_login_account_api_key_request( &mut self, api_key: &str, ) -> anyhow::Result { let params = serde_json::json!({ "type": "apiKey", "apiKey": api_key, }); self.send_request("account/login/start", Some(params)).await } /// Send an `account/login/start` JSON-RPC request for ChatGPT login. pub async fn send_login_account_chatgpt_request(&mut self) -> anyhow::Result { let params = serde_json::json!({ "type": "chatgpt" }); self.send_request("account/login/start", Some(params)).await } /// Send an `account/login/cancel` JSON-RPC request. pub async fn send_cancel_login_account_request( &mut self, params: CancelLoginAccountParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("account/login/cancel", params).await } /// Send a `fuzzyFileSearch` JSON-RPC request. pub async fn send_fuzzy_file_search_request( &mut self, query: &str, roots: Vec, cancellation_token: Option, ) -> anyhow::Result { let mut params = serde_json::json!({ "query": query, "roots": roots, }); if let Some(token) = cancellation_token { params["cancellationToken"] = serde_json::json!(token); } self.send_request("fuzzyFileSearch", Some(params)).await } pub async fn send_fuzzy_file_search_session_start_request( &mut self, session_id: &str, roots: Vec, ) -> anyhow::Result { let params = serde_json::json!({ "sessionId": session_id, "roots": roots, }); self.send_request("fuzzyFileSearch/sessionStart", Some(params)) .await } pub async fn start_fuzzy_file_search_session( &mut self, session_id: &str, roots: Vec, ) -> anyhow::Result { let request_id = self .send_fuzzy_file_search_session_start_request(session_id, roots) .await?; self.read_stream_until_response_message(RequestId::Integer(request_id)) .await } pub async fn send_fuzzy_file_search_session_update_request( &mut self, session_id: &str, query: &str, ) -> anyhow::Result { let params = serde_json::json!({ "sessionId": session_id, "query": query, }); self.send_request("fuzzyFileSearch/sessionUpdate", Some(params)) .await } pub async fn update_fuzzy_file_search_session( &mut self, session_id: &str, query: &str, ) -> anyhow::Result { let request_id = self .send_fuzzy_file_search_session_update_request(session_id, query) .await?; self.read_stream_until_response_message(RequestId::Integer(request_id)) .await } pub async fn send_fuzzy_file_search_session_stop_request( &mut self, session_id: &str, ) -> anyhow::Result { let params = serde_json::json!({ "sessionId": session_id, }); self.send_request("fuzzyFileSearch/sessionStop", Some(params)) .await } pub async fn stop_fuzzy_file_search_session( &mut self, session_id: &str, ) -> anyhow::Result { let request_id = self .send_fuzzy_file_search_session_stop_request(session_id) .await?; self.read_stream_until_response_message(RequestId::Integer(request_id)) .await } async fn send_request( &mut self, method: &str, params: Option, ) -> anyhow::Result { let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed); let message = JSONRPCMessage::Request(JSONRPCRequest { id: RequestId::Integer(request_id), method: method.to_string(), params, trace: None, }); self.send_jsonrpc_message(message).await?; Ok(request_id) } pub async fn send_response( &mut self, id: RequestId, result: serde_json::Value, ) -> anyhow::Result<()> { self.send_jsonrpc_message(JSONRPCMessage::Response(JSONRPCResponse { id, result })) .await } pub async fn send_error( &mut self, id: RequestId, error: JSONRPCErrorError, ) -> anyhow::Result<()> { self.send_jsonrpc_message(JSONRPCMessage::Error(JSONRPCError { id, error })) .await } pub async fn send_notification( &mut self, notification: ClientNotification, ) -> anyhow::Result<()> { let value = serde_json::to_value(notification)?; self.send_jsonrpc_message(JSONRPCMessage::Notification(JSONRPCNotification { method: value .get("method") .and_then(|m| m.as_str()) .ok_or_else(|| anyhow::format_err!("notification missing method field"))? .to_string(), params: value.get("params").cloned(), })) .await } async fn send_jsonrpc_message(&mut self, message: JSONRPCMessage) -> anyhow::Result<()> { eprintln!("writing message to stdin: {message:?}"); let Some(stdin) = self.stdin.as_mut() else { anyhow::bail!("mcp stdin closed"); }; let payload = serde_json::to_string(&message)?; stdin.write_all(payload.as_bytes()).await?; stdin.write_all(b"\n").await?; stdin.flush().await?; Ok(()) } async fn read_jsonrpc_message(&mut self) -> anyhow::Result { let mut line = String::new(); self.stdout.read_line(&mut line).await?; let message = serde_json::from_str::(&line)?; eprintln!("read message from stdout: {message:?}"); Ok(message) } pub async fn read_stream_until_request_message(&mut self) -> anyhow::Result { eprintln!("in read_stream_until_request_message()"); let message = self .read_stream_until_message(|message| matches!(message, JSONRPCMessage::Request(_))) .await?; let JSONRPCMessage::Request(jsonrpc_request) = message else { unreachable!("expected JSONRPCMessage::Request, got {message:?}"); }; jsonrpc_request .try_into() .with_context(|| "failed to deserialize ServerRequest from JSONRPCRequest") } pub async fn read_stream_until_response_message( &mut self, request_id: RequestId, ) -> anyhow::Result { eprintln!("in read_stream_until_response_message({request_id:?})"); let message = self .read_stream_until_message(|message| { Self::message_request_id(message) == Some(&request_id) }) .await?; let JSONRPCMessage::Response(response) = message else { unreachable!("expected JSONRPCMessage::Response, got {message:?}"); }; Ok(response) } pub async fn read_stream_until_error_message( &mut self, request_id: RequestId, ) -> anyhow::Result { let message = self .read_stream_until_message(|message| { Self::message_request_id(message) == Some(&request_id) }) .await?; let JSONRPCMessage::Error(err) = message else { unreachable!("expected JSONRPCMessage::Error, got {message:?}"); }; Ok(err) } pub async fn read_stream_until_notification_message( &mut self, method: &str, ) -> anyhow::Result { eprintln!("in read_stream_until_notification_message({method})"); let message = self .read_stream_until_message(|message| { matches!( message, JSONRPCMessage::Notification(notification) if notification.method == method ) }) .await?; let JSONRPCMessage::Notification(notification) = message else { unreachable!("expected JSONRPCMessage::Notification, got {message:?}"); }; Ok(notification) } pub async fn read_stream_until_matching_notification( &mut self, description: &str, predicate: F, ) -> anyhow::Result where F: Fn(&JSONRPCNotification) -> bool, { eprintln!("in read_stream_until_matching_notification({description})"); let message = self .read_stream_until_message(|message| { matches!( message, JSONRPCMessage::Notification(notification) if predicate(notification) ) }) .await?; let JSONRPCMessage::Notification(notification) = message else { unreachable!("expected JSONRPCMessage::Notification, got {message:?}"); }; Ok(notification) } pub async fn read_next_message(&mut self) -> anyhow::Result { self.read_stream_until_message(|_| true).await } /// Clears any buffered messages so future reads only consider new stream items. /// /// We call this when e.g. we want to validate against the next turn and no longer care about /// messages buffered from the prior turn. pub fn clear_message_buffer(&mut self) { self.pending_messages.clear(); } pub fn pending_notification_methods(&self) -> Vec { self.pending_messages .iter() .filter_map(|message| match message { JSONRPCMessage::Notification(notification) => Some(notification.method.clone()), _ => None, }) .collect() } /// Reads the stream until a message matches `predicate`, buffering any non-matching messages /// for later reads. async fn read_stream_until_message(&mut self, predicate: F) -> anyhow::Result where F: Fn(&JSONRPCMessage) -> bool, { if let Some(message) = self.take_pending_message(&predicate) { return Ok(message); } loop { let message = self.read_jsonrpc_message().await?; if predicate(&message) { return Ok(message); } self.pending_messages.push_back(message); } } fn take_pending_message(&mut self, predicate: &F) -> Option where F: Fn(&JSONRPCMessage) -> bool, { if let Some(pos) = self.pending_messages.iter().position(predicate) { return self.pending_messages.remove(pos); } None } fn pending_turn_completed_notification(&self, thread_id: &str, turn_id: &str) -> bool { self.pending_messages.iter().any(|message| { let JSONRPCMessage::Notification(notification) = message else { return false; }; if notification.method != "turn/completed" { return false; } let Some(params) = notification.params.as_ref() else { return false; }; let Ok(payload) = serde_json::from_value::(params.clone()) else { return false; }; payload.thread_id == thread_id && payload.turn.id == turn_id }) } fn message_request_id(message: &JSONRPCMessage) -> Option<&RequestId> { match message { JSONRPCMessage::Request(request) => Some(&request.id), JSONRPCMessage::Response(response) => Some(&response.id), JSONRPCMessage::Error(err) => Some(&err.id), JSONRPCMessage::Notification(_) => None, } } } impl Drop for McpProcess { fn drop(&mut self) { // These tests spawn a `codex-app-server` child process. // // We keep that child alive for the test and rely on Tokio's `kill_on_drop(true)` when this // helper is dropped. Tokio documents kill-on-drop as best-effort: dropping requests // termination, but it does not guarantee the child has fully exited and been reaped before // teardown continues. // // That makes cleanup timing nondeterministic. Leak detection can occasionally observe the // child still alive at teardown and report `LEAK`, which makes the test flaky. // // Drop can't be async, so we do a bounded synchronous cleanup: // // 1. Close stdin to request a graceful shutdown via EOF. // 2. Poll briefly for graceful exit. // 3. If still alive, request termination with `start_kill()`. // 4. Poll `try_wait()` until the OS reports the child exited, with a short timeout. drop(self.stdin.take()); let graceful_start = std::time::Instant::now(); let graceful_timeout = std::time::Duration::from_millis(200); while graceful_start.elapsed() < graceful_timeout { match self.process.try_wait() { Ok(Some(_)) => return, Ok(None) => std::thread::sleep(std::time::Duration::from_millis(5)), Err(_) => return, } } let _ = self.process.start_kill(); let start = std::time::Instant::now(); let timeout = std::time::Duration::from_secs(5); while start.elapsed() < timeout { match self.process.try_wait() { Ok(Some(_)) => return, Ok(None) => std::thread::sleep(std::time::Duration::from_millis(10)), Err(_) => return, } } } }