Files
codex/codex-rs/exec-server/src/client/rpc_http_client.rs
Ahmed Ibrahim 0e78ce80ee [3/4] Add executor-backed RMCP HTTP client (#18583)
### Why
The RMCP layer needs a Streamable HTTP client that can talk either
directly over `reqwest` or through the executor HTTP runner without
duplicating MCP session logic higher in the stack. This PR adds that
client-side transport boundary so remote Streamable HTTP MCP can reuse
the same RMCP flow as the local path.

### What
- Add a shared `rmcp-client/src/streamable_http/` module with:
  - `transport_client.rs` for the local-or-remote transport enum
  - `local_client.rs` for the direct `reqwest` implementation
  - `remote_client.rs` for the executor-backed implementation
  - `common.rs` for the small shared Streamable HTTP helpers
- Teach `RmcpClient` to build Streamable HTTP transports in either local
or remote mode while keeping the existing OAuth ownership in RMCP.
- Translate remote POST, GET, and DELETE session operations into
executor `http/request` calls.
- Preserve RMCP session expiry handling and reconnect behavior for the
remote transport.
- Add remote transport coverage in
`rmcp-client/tests/streamable_http_remote.rs` and keep the shared test
support in `rmcp-client/tests/streamable_http_test_support.rs`.

### Verification
- `cargo check -p codex-rmcp-client`
- online CI

### Stack
1. #18581 protocol
2. #18582 runner
3. #18583 RMCP client
4. #18584 manager wiring and local/remote coverage

---------

Co-authored-by: Codex <noreply@openai.com>
2026-04-22 17:38:04 -07:00

89 lines
3.3 KiB
Rust

//! JSON-RPC-backed `HttpClient` implementation.
//!
//! This code runs in the orchestrator process. It does not issue network
//! requests directly; instead it forwards `http/request` to the remote runtime
//! and then reconstructs streamed bodies from `http/request/bodyDelta`
//! notifications on the shared connection.
use std::sync::Arc;
use futures::FutureExt;
use futures::future::BoxFuture;
use tokio::sync::mpsc;
use super::HttpResponseBodyStream;
use super::response_body_stream::HttpBodyStreamRegistration;
use crate::HttpClient;
use crate::client::ExecServerClient;
use crate::client::ExecServerError;
use crate::protocol::HTTP_REQUEST_METHOD;
use crate::protocol::HttpRequestParams;
use crate::protocol::HttpRequestResponse;
/// Maximum queued body frames per streamed HTTP response.
///
/// Passed as the bound of the `mpsc::channel` that
/// `ExecServerClient::http_request_stream` creates for each streamed request.
// NOTE(review): what happens to incoming deltas once this many frames are
// queued is decided by whoever holds the sender, not by this file — confirm
// before tuning the value.
const HTTP_BODY_DELTA_CHANNEL_CAPACITY: usize = 256;
impl ExecServerClient {
    /// Issues an HTTP request through the exec server and returns the
    /// response with its body buffered.
    ///
    /// `stream_response` is forced to `false`, overriding whatever the caller
    /// set on `params`.
    ///
    /// # Errors
    ///
    /// Returns the `ExecServerError` reported by the underlying
    /// `HTTP_REQUEST_METHOD` call.
    pub async fn http_request(
        &self,
        mut params: HttpRequestParams,
    ) -> Result<HttpRequestResponse, ExecServerError> {
        params.stream_response = false;
        self.call(HTTP_REQUEST_METHOD, &params).await
    }

    /// Issues an HTTP request through the exec server and returns the response
    /// together with a stream over its body.
    ///
    /// `stream_response` is forced to `true`, and any caller-supplied
    /// `request_id` is overwritten with a freshly allocated connection-local
    /// id, so late deltas from abandoned streams cannot be confused with later
    /// requests.
    ///
    /// # Errors
    ///
    /// Returns an `ExecServerError` if the body stream cannot be registered or
    /// if the `HTTP_REQUEST_METHOD` call fails. When the call fails, the
    /// registration made for this request is removed before returning.
    pub async fn http_request_stream(
        &self,
        mut params: HttpRequestParams,
    ) -> Result<(HttpRequestResponse, HttpResponseBodyStream), ExecServerError> {
        let stream_id = self.inner.next_http_body_stream_request_id();
        params.stream_response = true;
        params.request_id = stream_id.clone();

        // Register the sender before sending the request, so the route for
        // this id exists by the time the call below is issued.
        let (delta_tx, delta_rx) = mpsc::channel(HTTP_BODY_DELTA_CHANNEL_CAPACITY);
        self.inner
            .insert_http_body_stream(stream_id.clone(), delta_tx)
            .await?;

        // NOTE(review): the guard presumably undoes the registration if this
        // future is dropped while the call is in flight — confirm against
        // `HttpBodyStreamRegistration`.
        let mut cleanup_guard =
            HttpBodyStreamRegistration::new(Arc::clone(&self.inner), stream_id.clone());

        let outcome: Result<HttpRequestResponse, ExecServerError> =
            self.call(HTTP_REQUEST_METHOD, &params).await;

        match outcome {
            Ok(response) => {
                // The returned stream receives the id and the receiver from
                // here on, so the guard is no longer needed.
                cleanup_guard.disarm();
                let body = HttpResponseBodyStream::remote(
                    Arc::clone(&self.inner),
                    stream_id,
                    delta_rx,
                );
                Ok((response, body))
            }
            Err(error) => {
                // Remove first and disarm second: the guard stays armed for
                // the duration of the awaited removal.
                self.inner.remove_http_body_stream(&stream_id).await;
                cleanup_guard.disarm();
                Err(error)
            }
        }
    }
}
impl HttpClient for ExecServerClient {
    /// Trait entry point for buffered requests on the orchestrator side.
    ///
    /// Delegates to the inherent `ExecServerClient::http_request` and boxes
    /// the resulting future; the request itself travels to the remote runtime
    /// over the shared JSON-RPC connection.
    fn http_request(
        &self,
        params: HttpRequestParams,
    ) -> BoxFuture<'_, Result<HttpRequestResponse, ExecServerError>> {
        // Calling the async fn only builds the future; nothing runs until the
        // boxed future is polled.
        ExecServerClient::http_request(self, params).boxed()
    }

    /// Trait entry point for streamed requests on the orchestrator side.
    ///
    /// Delegates to the inherent `ExecServerClient::http_request_stream` and
    /// boxes the resulting future, whose output pairs the response with a
    /// stream over the body deltas.
    fn http_request_stream(
        &self,
        params: HttpRequestParams,
    ) -> BoxFuture<'_, Result<(HttpRequestResponse, HttpResponseBodyStream), ExecServerError>> {
        // Calling the async fn only builds the future; nothing runs until the
        // boxed future is polled.
        ExecServerClient::http_request_stream(self, params).boxed()
    }
}