[2/4] Implement executor HTTP request runner (#18582)

### Why
Remote streamable HTTP MCP needs the executor to perform ordinary HTTP
requests on the executor side. This keeps network placement aligned with
`experimental_environment = "remote"` without adding MCP-specific
executor APIs.

### What
- Add an executor-side `http/request` runner backed by `reqwest`.
- Validate request method and URL scheme, preserving the transport
boundary at plain HTTP.
- Return buffered responses for ordinary calls and emit ordered
`http/request/bodyDelta` notifications for streaming responses.
- Register the request handler in the exec-server router.
- Document the runner entrypoint, conversion helpers, body-stream
bridge, notification sender, timeout behavior, and new integration-test
helpers.
- Add exec-server integration tests with the existing websocket harness
and a local TCP HTTP peer for buffered and streamed responses, with
comments spelling out what each test proves and its
setup/exercise/assert phases.

### Stack
1. #18581 protocol
2. #18582 runner
3. #18583 RMCP client
4. #18584 manager wiring and local/remote coverage

### Verification
- `just fmt`
- `cargo check -p codex-exec-server -p codex-rmcp-client --tests`
- `cargo check -p codex-core --test all` compile-only
- `git diff --check`
- Online full CI is running from the `full-ci` branch, including the
remote Rust test job.

Co-authored-by: Codex <noreply@openai.com>

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-04-22 13:36:34 -07:00
committed by GitHub
parent 18a26d7bbc
commit 9360f267f3
10 changed files with 1169 additions and 105 deletions

View File

@@ -42,6 +42,7 @@ const HTTP_REQUEST_BODY_DELTA_METHOD: &str = "http/request/bodyDelta";
const INITIALIZE_METHOD: &str = "initialize";
const INITIALIZED_METHOD: &str = "initialized";
const TEST_TIMEOUT: Duration = Duration::from_secs(5);
const HTTP_BODY_DELTA_CHANNEL_CAPACITY: u64 = 256;
const OVERFLOWING_BODY_DELTA_FRAMES: u64 = 1_024;
/// What this tests: the buffered HTTP helper always sends a buffered
@@ -51,8 +52,8 @@ async fn http_request_forces_buffered_request_params() -> Result<()> {
// Phase 1: start a fake WebSocket exec-server so the test covers the
// public client connection path without depending on the HTTP runner.
let server = spawn_scripted_exec_server(|mut peer| async move {
// Phase 2: verify the buffered helper strips streaming-only fields
// before it sends the JSON-RPC call.
// Phase 2: verify the buffered helper forces buffered mode before it
// sends the JSON-RPC call.
let (request_id, params) = peer.read_http_request().await?;
assert_eq!(
params,
@@ -62,7 +63,7 @@ async fn http_request_forces_buffered_request_params() -> Result<()> {
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: None,
request_id: "ignored-stream-id".to_string(),
stream_response: false,
}
);
@@ -90,7 +91,7 @@ async fn http_request_forces_buffered_request_params() -> Result<()> {
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("ignored-stream-id".to_string()),
request_id: "ignored-stream-id".to_string(),
stream_response: true,
}),
)
@@ -130,7 +131,7 @@ async fn http_response_body_stream_uses_generated_ids_and_receives_ordered_delta
}],
body: None,
timeout_ms: None,
request_id: Some("http-1".to_string()),
request_id: "http-1".to_string(),
stream_response: true,
}
);
@@ -185,7 +186,7 @@ async fn http_response_body_stream_uses_generated_ids_and_receives_ordered_delta
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("http-2".to_string()),
request_id: "http-2".to_string(),
stream_response: true,
}
);
@@ -214,7 +215,7 @@ async fn http_response_body_stream_uses_generated_ids_and_receives_ordered_delta
}],
body: None,
timeout_ms: None,
request_id: Some("caller-stream-id".to_string()),
request_id: "caller-stream-id".to_string(),
stream_response: false,
}),
)
@@ -252,7 +253,7 @@ async fn http_response_body_stream_uses_generated_ids_and_receives_ordered_delta
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("caller-stream-id".to_string()),
request_id: "caller-stream-id".to_string(),
stream_response: false,
}),
)
@@ -288,7 +289,7 @@ async fn http_response_body_stream_drops_queued_terminal_before_next_generated_i
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("http-1".to_string()),
request_id: "http-1".to_string(),
stream_response: true,
}
);
@@ -321,7 +322,7 @@ async fn http_response_body_stream_drops_queued_terminal_before_next_generated_i
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("http-2".to_string()),
request_id: "http-2".to_string(),
stream_response: true,
}
);
@@ -347,7 +348,7 @@ async fn http_response_body_stream_drops_queued_terminal_before_next_generated_i
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("caller-stream-id".to_string()),
request_id: "caller-stream-id".to_string(),
stream_response: false,
}),
)
@@ -371,7 +372,7 @@ async fn http_response_body_stream_drops_queued_terminal_before_next_generated_i
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("caller-stream-id".to_string()),
request_id: "caller-stream-id".to_string(),
stream_response: false,
};
let (reuse_response, _reuse_body_stream) =
@@ -410,7 +411,7 @@ async fn http_response_body_stream_ignores_late_deltas_after_cancelled_request()
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("http-1".to_string()),
request_id: "http-1".to_string(),
stream_response: true,
}
);
@@ -429,7 +430,7 @@ async fn http_response_body_stream_ignores_late_deltas_after_cancelled_request()
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("http-2".to_string()),
request_id: "http-2".to_string(),
stream_response: true,
}
);
@@ -473,7 +474,7 @@ async fn http_response_body_stream_ignores_late_deltas_after_cancelled_request()
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("caller-stream-id".to_string()),
request_id: "caller-stream-id".to_string(),
stream_response: false,
})
.await;
@@ -494,7 +495,7 @@ async fn http_response_body_stream_ignores_late_deltas_after_cancelled_request()
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("caller-stream-id".to_string()),
request_id: "caller-stream-id".to_string(),
stream_response: false,
}),
)
@@ -540,7 +541,7 @@ async fn http_response_body_stream_ignores_late_deltas_after_drop() -> Result<()
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("http-1".to_string()),
request_id: "http-1".to_string(),
stream_response: true,
}
);
@@ -579,7 +580,7 @@ async fn http_response_body_stream_ignores_late_deltas_after_drop() -> Result<()
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("http-2".to_string()),
request_id: "http-2".to_string(),
stream_response: true,
}
);
@@ -614,7 +615,7 @@ async fn http_response_body_stream_ignores_late_deltas_after_drop() -> Result<()
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("caller-stream-id".to_string()),
request_id: "caller-stream-id".to_string(),
stream_response: false,
}),
)
@@ -646,7 +647,7 @@ async fn http_response_body_stream_ignores_late_deltas_after_drop() -> Result<()
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("caller-stream-id".to_string()),
request_id: "caller-stream-id".to_string(),
stream_response: false,
}),
)
@@ -690,7 +691,7 @@ async fn http_response_body_stream_fails_when_transport_disconnects() -> Result<
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("http-1".to_string()),
request_id: "http-1".to_string(),
stream_response: true,
}
);
@@ -716,7 +717,7 @@ async fn http_response_body_stream_fails_when_transport_disconnects() -> Result<
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("caller-stream-id".to_string()),
request_id: "caller-stream-id".to_string(),
stream_response: false,
}),
)
@@ -742,6 +743,98 @@ async fn http_response_body_stream_fails_when_transport_disconnects() -> Result<
Ok(())
}
/// What this tests: transport disconnect still records a terminal stream
/// failure even when the client-side body-delta queue is already full.
#[tokio::test]
async fn http_response_body_stream_reports_disconnect_when_queue_is_full() -> Result<()> {
// Phase 1: fill the queued body-delta route exactly to capacity before the
// response headers arrive, then drop the transport without sending EOF.
let server = spawn_scripted_exec_server(|mut peer| async move {
let (request_id, params) = peer.read_http_request().await?;
assert_eq!(
params,
HttpRequestParams {
method: "GET".to_string(),
url: "https://example.test/mcp/disconnect-full-queue".to_string(),
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: "http-1".to_string(),
stream_response: true,
}
);
for seq in 1..=HTTP_BODY_DELTA_CHANNEL_CAPACITY {
peer.write_body_delta(HttpRequestBodyDeltaNotification {
request_id: "http-1".to_string(),
seq,
delta: b"x".to_vec().into(),
done: false,
error: None,
})
.await?;
}
peer.write_response(
request_id,
HttpRequestResponse {
status: 200,
headers: Vec::new(),
body: Vec::new().into(),
},
)
.await
})
.await?;
let client = server.connect_client().await?;
// Phase 2: start the streaming request and receive headers while the
// queue is already full.
let (_response, mut body_stream) = timeout(
TEST_TIMEOUT,
client.http_request_stream(HttpRequestParams {
method: "GET".to_string(),
url: "https://example.test/mcp/disconnect-full-queue".to_string(),
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: "caller-stream-id".to_string(),
stream_response: false,
}),
)
.await
.context("streamed http/request should return headers")??;
// Phase 3: drain the queued chunks and assert the transport disconnect is
// still reported as an error rather than a clean EOF.
let mut chunks = 0;
let error = loop {
match timeout(TEST_TIMEOUT, body_stream.recv())
.await
.context("disconnect should wake the full queued body stream")?
{
Ok(Some(_chunk)) => {
chunks += 1;
}
Ok(None) => bail!("disconnect with a full queue should not look like clean EOF"),
Err(error) => break error,
}
};
assert_eq!(
(
chunks,
error
.to_string()
.starts_with(
"exec-server protocol error: http response stream `http-1` failed: exec-server transport disconnected",
),
),
(HTTP_BODY_DELTA_CHANNEL_CAPACITY as usize, true)
);
drop(client);
server.finish().await?;
Ok(())
}
/// What this tests: body-delta backpressure closes the public body stream as
/// an error rather than letting callers accept a truncated body as clean EOF.
#[tokio::test]
@@ -759,7 +852,7 @@ async fn http_response_body_stream_reports_backpressure_truncation() -> Result<(
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("http-1".to_string()),
request_id: "http-1".to_string(),
stream_response: true,
}
);
@@ -801,7 +894,7 @@ async fn http_response_body_stream_reports_backpressure_truncation() -> Result<(
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: Some("caller-stream-id".to_string()),
request_id: "caller-stream-id".to_string(),
stream_response: false,
}),
)

View File

@@ -0,0 +1,568 @@
#![cfg(unix)]
mod common;
use std::collections::BTreeMap;
use std::io::ErrorKind;
use std::time::Duration;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_exec_server::HttpHeader;
use codex_exec_server::HttpRequestBodyDeltaNotification;
use codex_exec_server::HttpRequestParams;
use codex_exec_server::HttpRequestResponse;
use codex_exec_server::InitializeParams;
use common::exec_server::ExecServerHarness;
use common::exec_server::exec_server;
use pretty_assertions::assert_eq;
use serde::de::DeserializeOwned;
use serde_json::Value;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::sync::oneshot;
use tokio::time::timeout;
/// HTTP request captured by the ad-hoc TCP server in these integration tests.
#[derive(Debug)]
struct CapturedHttpRequest {
stream: TcpStream,
request_line: String,
headers: BTreeMap<String, String>,
body: Vec<u8>,
}
/// What this tests: a real exec-server websocket `http/request` performs one
/// HTTP request through the runner and returns the complete response body in
/// the JSON-RPC response.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_http_request_buffers_response_body() -> anyhow::Result<()> {
// Phase 1: start exec-server and complete the JSON-RPC handshake.
let mut server = exec_server().await?;
initialize_exec_server(&mut server).await?;
// Phase 2: start a local HTTP peer and ask exec-server to POST to it.
let listener = TcpListener::bind("127.0.0.1:0").await?;
let url = format!("http://{}/mcp?case=buffered", listener.local_addr()?);
let http_request_id = server
.send_request(
"http/request",
serde_json::to_value(HttpRequestParams {
method: "POST".to_string(),
url,
headers: vec![HttpHeader {
name: "x-codex-test".to_string(),
value: "buffered".to_string(),
}],
body: Some(b"request-body".to_vec().into()),
timeout_ms: Some(5_000),
request_id: "buffered-request".to_string(),
stream_response: false,
})?,
)
.await?;
// Phase 3: assert the HTTP peer observes the expected method, path,
// headers, and body before returning a fixed-length response.
let captured = accept_http_request(&listener).await?;
assert_eq!(
(
captured.request_line.as_str(),
captured.headers.get("x-codex-test").map(String::as_str),
captured.body.as_slice(),
),
(
"POST /mcp?case=buffered HTTP/1.1",
Some("buffered"),
b"request-body".as_slice(),
)
);
respond_with_status_and_headers(
captured.stream,
"201 Created",
&[("x-mcp-test", "buffered")],
b"response-body",
)
.await?;
// Phase 4: assert exec-server returns status, response headers, and the
// full response body in the JSON-RPC result.
let response: HttpRequestResponse = wait_for_response(&mut server, http_request_id).await?;
assert_eq!(
(
response.status,
response_header(&response.headers, "x-mcp-test"),
response.body.into_inner(),
),
(201, Some("buffered".to_string()), b"response-body".to_vec(),)
);
server.shutdown().await?;
Ok(())
}
/// What this tests: a real exec-server websocket `http/request` can return
/// response headers immediately and stream the response body as ordered
/// `http/request/bodyDelta` notifications.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_http_request_streams_response_body_notifications() -> anyhow::Result<()> {
// Phase 1: start exec-server and complete the JSON-RPC handshake.
let mut server = exec_server().await?;
initialize_exec_server(&mut server).await?;
// Phase 2: start a local HTTP peer and ask exec-server for a streamed GET.
let listener = TcpListener::bind("127.0.0.1:0").await?;
let url = format!("http://{}/mcp?case=streaming", listener.local_addr()?);
let http_request_id = server
.send_request(
"http/request",
serde_json::to_value(HttpRequestParams {
method: "GET".to_string(),
url,
headers: vec![HttpHeader {
name: "accept".to_string(),
value: "text/event-stream".to_string(),
}],
body: None,
timeout_ms: Some(5_000),
request_id: "stream-1".to_string(),
stream_response: true,
})?,
)
.await?;
// Phase 3: assert the HTTP peer observes the expected request and then
// respond with chunked transfer encoding to exercise streaming.
let captured = accept_http_request(&listener).await?;
assert_eq!(
(
captured.request_line.as_str(),
captured.headers.get("accept").map(String::as_str),
captured.body,
),
(
"GET /mcp?case=streaming HTTP/1.1",
Some("text/event-stream"),
Vec::new(),
)
);
respond_with_chunked_body(
captured.stream,
&[("x-mcp-test", "streaming")],
&[b"hello ".as_slice(), b"world".as_slice()],
)
.await?;
// Phase 4: assert the JSON-RPC response reaches the wire before any body
// delta notifications, and that it contains status and headers but no
// buffered body when streaming is requested.
let first_event = server.next_event().await?;
let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = first_event else {
anyhow::bail!("expected http/request response before body deltas, got {first_event:?}");
};
assert_eq!(id, http_request_id);
let response: HttpRequestResponse = serde_json::from_value(result)?;
assert_eq!(
(
response.status,
response_header(&response.headers, "x-mcp-test"),
response.body.into_inner(),
),
(200, Some("streaming".to_string()), Vec::new())
);
// Phase 5: assert the body notifications are contiguous, ordered, and end
// with a clean terminal frame.
let deltas = collect_response_body_deltas(&mut server, "stream-1").await?;
let seqs = deltas.iter().map(|delta| delta.seq).collect::<Vec<_>>();
let body = deltas
.iter()
.flat_map(|delta| delta.delta.clone().into_inner())
.collect::<Vec<_>>();
let terminal = deltas.last().map(|delta| (delta.done, delta.error.clone()));
let expected_seqs = (1..=deltas.len() as u64).collect::<Vec<_>>();
assert_eq!(
(seqs, body, terminal),
(expected_seqs, b"hello world".to_vec(), Some((true, None)))
);
server.shutdown().await?;
Ok(())
}
/// What this tests: streamed `requestId`s stay reserved until the body stream
/// finishes, so a second in-flight request cannot reuse the same id.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_http_request_rejects_duplicate_stream_request_ids() -> anyhow::Result<()> {
let mut server = exec_server().await?;
initialize_exec_server(&mut server).await?;
let listener = TcpListener::bind("127.0.0.1:0").await?;
let url = format!(
"http://{}/mcp?case=duplicate-stream-id",
listener.local_addr()?
);
let first_request_id = server
.send_request(
"http/request",
serde_json::to_value(HttpRequestParams {
method: "GET".to_string(),
url: url.clone(),
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: "stream-dup".to_string(),
stream_response: true,
})?,
)
.await?;
let captured = accept_http_request(&listener).await?;
let (finish_tx, finish_rx) = oneshot::channel();
let response_task = tokio::spawn(async move {
respond_with_chunked_body_until_finish(captured.stream, &[], &[b"hello"], finish_rx).await
});
let _: HttpRequestResponse = wait_for_response(&mut server, first_request_id).await?;
let duplicate_request_id = server
.send_request(
"http/request",
serde_json::to_value(HttpRequestParams {
method: "GET".to_string(),
url,
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: "stream-dup".to_string(),
stream_response: true,
})?,
)
.await?;
let duplicate_response = server
.wait_for_event(|event| {
matches!(
event,
JSONRPCMessage::Error(JSONRPCError { id, .. }) if id == &duplicate_request_id
)
})
.await?;
let JSONRPCMessage::Error(JSONRPCError { error, .. }) = duplicate_response else {
anyhow::bail!("expected duplicate requestId error response");
};
assert_eq!(error.code, -32602);
assert_eq!(
error.message,
"http/request streamResponse requestId `stream-dup` is already active"
);
finish_tx
.send(())
.expect("response task should still be waiting");
response_task.await??;
let _ = collect_response_body_deltas(&mut server, "stream-dup").await?;
server.shutdown().await?;
Ok(())
}
/// What this tests: omitting `timeoutMs` leaves the request unbounded, while
/// an explicit short timeout still fails the same delayed response.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_http_request_honors_optional_timeout() -> anyhow::Result<()> {
let mut server = exec_server().await?;
initialize_exec_server(&mut server).await?;
let listener = TcpListener::bind("127.0.0.1:0").await?;
let delayed_url = format!(
"http://{}/mcp?case=optional-timeout",
listener.local_addr()?
);
let no_timeout_request_id = server
.send_request(
"http/request",
serde_json::to_value(HttpRequestParams {
method: "GET".to_string(),
url: delayed_url.clone(),
headers: Vec::new(),
body: None,
timeout_ms: None,
request_id: "buffered-request".to_string(),
stream_response: false,
})?,
)
.await?;
let captured = accept_http_request(&listener).await?;
let delayed_response = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(100)).await;
respond_with_status_and_headers(captured.stream, "200 OK", &[], b"slow-success").await
});
let response: HttpRequestResponse =
wait_for_response(&mut server, no_timeout_request_id).await?;
assert_eq!(response.body.into_inner(), b"slow-success".to_vec());
delayed_response.await??;
let timeout_request_id = server
.send_request(
"http/request",
serde_json::to_value(HttpRequestParams {
method: "GET".to_string(),
url: delayed_url,
headers: Vec::new(),
body: None,
timeout_ms: Some(10),
request_id: "buffered-request".to_string(),
stream_response: false,
})?,
)
.await?;
let captured = accept_http_request(&listener).await?;
let delayed_timeout_response = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(100)).await;
respond_with_status_and_headers(captured.stream, "200 OK", &[], b"too-late").await
});
let error = wait_for_error_response(&mut server, timeout_request_id).await?;
assert_eq!(error.code, -32603);
assert!(
error.message.starts_with("http/request failed: "),
"unexpected timeout error: {}",
error.message
);
match delayed_timeout_response.await? {
Ok(()) => {}
Err(err) if is_expected_peer_disconnect(&err) => {}
Err(err) => return Err(err),
}
server.shutdown().await?;
Ok(())
}
/// Performs the JSON-RPC initialize handshake required before executor methods.
async fn initialize_exec_server(server: &mut ExecServerHarness) -> anyhow::Result<()> {
let initialize_id = server
.send_request(
"initialize",
serde_json::to_value(InitializeParams {
client_name: "exec-server-http-test".to_string(),
resume_session_id: None,
})?,
)
.await?;
let _: Value = wait_for_response(server, initialize_id).await?;
server
.send_notification("initialized", serde_json::json!({}))
.await?;
Ok(())
}
/// Waits for a typed JSON-RPC response with the requested id.
async fn wait_for_response<T>(
server: &mut ExecServerHarness,
request_id: RequestId,
) -> anyhow::Result<T>
where
T: DeserializeOwned,
{
let response = server
.wait_for_event(|event| {
matches!(
event,
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &request_id
)
})
.await?;
let JSONRPCMessage::Response(JSONRPCResponse { result, .. }) = response else {
anyhow::bail!("expected JSON-RPC response for {request_id:?}");
};
Ok(serde_json::from_value(result)?)
}
/// Waits for a JSON-RPC error with the requested id.
async fn wait_for_error_response(
server: &mut ExecServerHarness,
request_id: RequestId,
) -> anyhow::Result<codex_app_server_protocol::JSONRPCErrorError> {
let response = server
.wait_for_event(|event| {
matches!(
event,
JSONRPCMessage::Error(JSONRPCError { id, .. }) if id == &request_id
)
})
.await?;
let JSONRPCMessage::Error(JSONRPCError { error, .. }) = response else {
anyhow::bail!("expected JSON-RPC error for {request_id:?}");
};
Ok(error)
}
/// Accepts one HTTP/1.1 request and captures its wire-visible fields.
async fn accept_http_request(listener: &TcpListener) -> anyhow::Result<CapturedHttpRequest> {
let (stream, _) = timeout(Duration::from_secs(5), listener.accept()).await??;
let mut reader = BufReader::new(stream);
let mut request_line = String::new();
reader.read_line(&mut request_line).await?;
let request_line = request_line.trim_end_matches("\r\n").to_string();
let mut headers = BTreeMap::new();
loop {
let mut line = String::new();
reader.read_line(&mut line).await?;
if line == "\r\n" {
break;
}
let line = line.trim_end_matches("\r\n");
let (name, value) = line
.split_once(':')
.ok_or_else(|| anyhow::anyhow!("HTTP header should contain colon: {line}"))?;
headers.insert(name.to_ascii_lowercase(), value.trim().to_string());
}
let content_length = headers
.get("content-length")
.and_then(|value| value.parse::<usize>().ok())
.unwrap_or(0);
let mut body = vec![0; content_length];
reader.read_exact(&mut body).await?;
Ok(CapturedHttpRequest {
stream: reader.into_inner(),
request_line,
headers,
body,
})
}
/// Writes a fixed-length HTTP response to the captured request stream.
async fn respond_with_status_and_headers(
mut stream: TcpStream,
status: &str,
headers: &[(&str, &str)],
body: &[u8],
) -> anyhow::Result<()> {
let extra_headers = headers
.iter()
.map(|(name, value)| format!("{name}: {value}\r\n"))
.collect::<String>();
let response = format!(
"HTTP/1.1 {status}\r\ncontent-type: text/plain\r\ncontent-length: {}\r\nconnection: close\r\n{extra_headers}\r\n",
body.len(),
);
stream.write_all(response.as_bytes()).await?;
stream.write_all(body).await?;
stream.flush().await?;
Ok(())
}
fn is_expected_peer_disconnect(err: &anyhow::Error) -> bool {
err.chain().any(|cause| {
cause
.downcast_ref::<std::io::Error>()
.is_some_and(|io_err| {
matches!(
io_err.kind(),
ErrorKind::BrokenPipe | ErrorKind::ConnectionReset | ErrorKind::UnexpectedEof
)
})
})
}
/// Writes a chunked HTTP response so reqwest must drive the streaming path.
async fn respond_with_chunked_body(
mut stream: TcpStream,
headers: &[(&str, &str)],
chunks: &[&[u8]],
) -> anyhow::Result<()> {
let extra_headers = headers
.iter()
.map(|(name, value)| format!("{name}: {value}\r\n"))
.collect::<String>();
let response = format!(
"HTTP/1.1 200 OK\r\ncontent-type: text/plain\r\ntransfer-encoding: chunked\r\nconnection: close\r\n{extra_headers}\r\n",
);
stream.write_all(response.as_bytes()).await?;
for chunk in chunks {
stream
.write_all(format!("{:x}\r\n", chunk.len()).as_bytes())
.await?;
stream.write_all(chunk).await?;
stream.write_all(b"\r\n").await?;
stream.flush().await?;
}
stream.write_all(b"0\r\n\r\n").await?;
stream.flush().await?;
Ok(())
}
/// Writes a chunked response and keeps the stream open until the test allows EOF.
async fn respond_with_chunked_body_until_finish(
mut stream: TcpStream,
headers: &[(&str, &str)],
chunks: &[&[u8]],
finish_rx: oneshot::Receiver<()>,
) -> anyhow::Result<()> {
let extra_headers = headers
.iter()
.map(|(name, value)| format!("{name}: {value}\r\n"))
.collect::<String>();
let response = format!(
"HTTP/1.1 200 OK\r\ncontent-type: text/plain\r\ntransfer-encoding: chunked\r\nconnection: close\r\n{extra_headers}\r\n",
);
stream.write_all(response.as_bytes()).await?;
for chunk in chunks {
stream
.write_all(format!("{:x}\r\n", chunk.len()).as_bytes())
.await?;
stream.write_all(chunk).await?;
stream.write_all(b"\r\n").await?;
stream.flush().await?;
}
finish_rx.await?;
stream.write_all(b"0\r\n\r\n").await?;
stream.flush().await?;
Ok(())
}
/// Collects streamed response-body notifications until the terminal frame.
async fn collect_response_body_deltas(
server: &mut ExecServerHarness,
request_id: &str,
) -> anyhow::Result<Vec<HttpRequestBodyDeltaNotification>> {
let mut deltas = Vec::new();
loop {
let event = server.next_event().await?;
let JSONRPCMessage::Notification(JSONRPCNotification { method, params }) = event else {
anyhow::bail!("expected http/request body delta notification, got {event:?}");
};
assert_eq!(method, "http/request/bodyDelta");
let delta: HttpRequestBodyDeltaNotification =
serde_json::from_value(params.unwrap_or(Value::Null))?;
assert_eq!(delta.request_id, request_id);
let done = delta.done;
deltas.push(delta);
if done {
return Ok(deltas);
}
}
}
/// Returns a response header value without depending on header-name casing.
fn response_header(headers: &[HttpHeader], name: &str) -> Option<String> {
headers
.iter()
.find(|header| header.name.eq_ignore_ascii_case(name))
.map(|header| header.value.clone())
}