mirror of
https://github.com/openai/codex.git
synced 2026-06-03 03:41:58 +00:00
feat: Add focused diagnostics for MCP HTTP send failures (#25013)
Adds failure-only logging for MCP streamable HTTP post_message calls and the underlying reqwest send path, capturing the MCP method/request id, endpoint shape, auth-header presence, timeout/connect classification, and sanitized error source chain without logging headers, bodies, tokens, or full URLs.
This commit is contained in:
@@ -5,6 +5,7 @@
|
||||
//! - in a remote environment, that means the remote runtime after the
|
||||
//! orchestrator has forwarded `http/request` over JSON-RPC
|
||||
|
||||
use std::error::Error as StdError;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
@@ -135,15 +136,21 @@ impl ReqwestHttpRequestRunner {
|
||||
}
|
||||
|
||||
let headers = Self::build_headers(params.headers)?;
|
||||
let mut request = self.client.request(method, url).headers(headers);
|
||||
let mut request = self.client.request(method.clone(), url).headers(headers);
|
||||
if let Some(body) = params.body {
|
||||
request = request.body(body.into_inner());
|
||||
}
|
||||
|
||||
let response = request
|
||||
.send()
|
||||
.await
|
||||
.map_err(|error| internal_error(format!("http/request failed: {error}")))?;
|
||||
let response = match request.send().await {
|
||||
Ok(response) => response,
|
||||
Err(error) => {
|
||||
let error_message = error.to_string();
|
||||
log_send_error(&method, error);
|
||||
return Err(internal_error(format!(
|
||||
"http/request failed: {error_message}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
let status = response.status().as_u16();
|
||||
let headers = Self::response_headers(response.headers());
|
||||
|
||||
@@ -265,3 +272,26 @@ impl ReqwestHttpRequestRunner {
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
fn log_send_error(method: &Method, error: reqwest::Error) {
|
||||
let error = error.without_url();
|
||||
let source_chain = error_source_chain(&error);
|
||||
tracing::warn!(
|
||||
http_method = method.as_str(),
|
||||
error_is_timeout = error.is_timeout(),
|
||||
error_is_connect = error.is_connect(),
|
||||
error = %error,
|
||||
error_sources = ?source_chain,
|
||||
"http/request send failed"
|
||||
);
|
||||
}
|
||||
|
||||
fn error_source_chain(error: &reqwest::Error) -> Option<String> {
|
||||
let mut sources = Vec::new();
|
||||
let mut source = error.source();
|
||||
while let Some(error) = source {
|
||||
sources.push(error.to_string());
|
||||
source = error.source();
|
||||
}
|
||||
(!sources.is_empty()).then(|| sources.join(": "))
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ use reqwest::header::CONTENT_TYPE;
|
||||
use reqwest::header::HeaderMap;
|
||||
use reqwest::header::HeaderName;
|
||||
use rmcp::model::ClientJsonRpcMessage;
|
||||
use rmcp::model::JsonRpcMessage;
|
||||
use rmcp::model::ServerJsonRpcMessage;
|
||||
use rmcp::transport::streamable_http_client::AuthRequiredError;
|
||||
use rmcp::transport::streamable_http_client::InsufficientScopeError;
|
||||
@@ -88,6 +89,8 @@ impl StreamableHttpClient for StreamableHttpClientAdapter {
|
||||
auth_token: Option<String>,
|
||||
custom_headers: HashMap<HeaderName, reqwest::header::HeaderValue>,
|
||||
) -> std::result::Result<StreamableHttpPostResponse, StreamableHttpError<Self::Error>> {
|
||||
let (mcp_method, mcp_request_id) = client_jsonrpc_message_fields(&message);
|
||||
let has_session_id = session_id.is_some();
|
||||
let mut headers = self.default_headers.clone();
|
||||
headers.extend(custom_headers);
|
||||
self.add_auth_headers(&mut headers);
|
||||
@@ -121,7 +124,8 @@ impl StreamableHttpClient for StreamableHttpClientAdapter {
|
||||
}
|
||||
|
||||
let body = serde_json::to_vec(&message).map_err(StreamableHttpError::Deserialize)?;
|
||||
let (response, mut body_stream) = self
|
||||
let has_authorization_header = headers.contains_key(AUTHORIZATION);
|
||||
let response = self
|
||||
.http_client
|
||||
.http_request_stream(HttpRequestParams {
|
||||
method: "POST".to_string(),
|
||||
@@ -132,9 +136,22 @@ impl StreamableHttpClient for StreamableHttpClientAdapter {
|
||||
request_id: "buffered-request".to_string(),
|
||||
stream_response: true,
|
||||
})
|
||||
.await
|
||||
.map_err(StreamableHttpClientAdapterError::from)
|
||||
.map_err(StreamableHttpError::Client)?;
|
||||
.await;
|
||||
let (response, mut body_stream) = match response {
|
||||
Ok(response) => response,
|
||||
Err(error) => {
|
||||
log_post_message_http_error(
|
||||
&uri,
|
||||
mcp_method.as_deref(),
|
||||
mcp_request_id.as_deref(),
|
||||
has_session_id,
|
||||
has_authorization_header,
|
||||
);
|
||||
return Err(StreamableHttpError::Client(
|
||||
StreamableHttpClientAdapterError::from(error),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
if response.status == StatusCode::NOT_FOUND.as_u16() && session_id.is_some() {
|
||||
return Err(StreamableHttpError::Client(
|
||||
@@ -354,6 +371,50 @@ fn body_preview(body: impl Into<String>) -> String {
|
||||
body_preview
|
||||
}
|
||||
|
||||
fn client_jsonrpc_message_fields(
|
||||
message: &ClientJsonRpcMessage,
|
||||
) -> (Option<String>, Option<String>) {
|
||||
match message {
|
||||
JsonRpcMessage::Request(request) => (
|
||||
Some(request.request.method().to_string()),
|
||||
Some(request.id.to_string()),
|
||||
),
|
||||
JsonRpcMessage::Response(response) => (None, Some(response.id.to_string())),
|
||||
JsonRpcMessage::Notification(_) => (None, None),
|
||||
JsonRpcMessage::Error(error) => (None, error.id.as_ref().map(ToString::to_string)),
|
||||
}
|
||||
}
|
||||
|
||||
fn log_post_message_http_error(
|
||||
uri: &str,
|
||||
mcp_method: Option<&str>,
|
||||
mcp_request_id: Option<&str>,
|
||||
has_session_id: bool,
|
||||
has_authorization_header: bool,
|
||||
) {
|
||||
let parsed_url = reqwest::Url::parse(uri).ok();
|
||||
tracing::warn!(
|
||||
endpoint_scheme = parsed_url
|
||||
.as_ref()
|
||||
.map(reqwest::Url::scheme)
|
||||
.unwrap_or("<invalid>"),
|
||||
endpoint_host = parsed_url
|
||||
.as_ref()
|
||||
.and_then(reqwest::Url::host_str)
|
||||
.unwrap_or("<invalid>"),
|
||||
endpoint_path = parsed_url
|
||||
.as_ref()
|
||||
.map(reqwest::Url::path)
|
||||
.unwrap_or("<invalid>"),
|
||||
endpoint_has_query = parsed_url.as_ref().is_some_and(|url| url.query().is_some()),
|
||||
mcp_method = mcp_method.unwrap_or("<none>"),
|
||||
mcp_request_id = mcp_request_id.unwrap_or("<none>"),
|
||||
has_session_id = has_session_id,
|
||||
has_authorization_header = has_authorization_header,
|
||||
"streamable HTTP post_message failed"
|
||||
);
|
||||
}
|
||||
|
||||
fn insert_header<Error>(
|
||||
headers: &mut HeaderMap,
|
||||
name: HeaderName,
|
||||
|
||||
Reference in New Issue
Block a user