[3/4] Add executor-backed RMCP HTTP client (#18583)

### Why
The RMCP layer needs a Streamable HTTP client that can talk either
directly over `reqwest` or through the executor HTTP runner without
duplicating MCP session logic higher in the stack. This PR adds that
client-side transport boundary so remote Streamable HTTP MCP can reuse
the same RMCP flow as the local path.

### What
- Add a shared `rmcp-client/src/streamable_http/` module with:
  - `transport_client.rs` for the local-or-remote transport enum
  - `local_client.rs` for the direct `reqwest` implementation
  - `remote_client.rs` for the executor-backed implementation
  - `common.rs` for the small shared Streamable HTTP helpers
- Teach `RmcpClient` to build Streamable HTTP transports in either local
or remote mode while keeping the existing OAuth ownership in RMCP.
- Translate remote POST, GET, and DELETE session operations into
executor `http/request` calls.
- Preserve RMCP session expiry handling and reconnect behavior for the
remote transport.
- Add remote transport coverage in
`rmcp-client/tests/streamable_http_remote.rs` and keep the shared test
support in `rmcp-client/tests/streamable_http_test_support.rs`.

### Verification
- `cargo check -p codex-rmcp-client`
- online CI

### Stack
1. #18581 protocol
2. #18582 runner
3. #18583 RMCP client
4. #18584 manager wiring and local/remote coverage

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-04-22 17:38:04 -07:00
committed by GitHub
parent 83ec1eb5d6
commit 0e78ce80ee
20 changed files with 1595 additions and 972 deletions

View File

@@ -8,6 +8,8 @@ use std::time::Duration;
use arc_swap::ArcSwap;
use codex_app_server_protocol::JSONRPCNotification;
use futures::FutureExt;
use futures::future::BoxFuture;
use serde_json::Value;
use tokio::sync::Mutex;
use tokio::sync::OnceCell;
@@ -20,6 +22,7 @@ use tracing::debug;
use crate::ProcessId;
use crate::client_api::ExecServerClientConnectOptions;
use crate::client_api::HttpClient;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
use crate::process::ExecProcessEvent;
@@ -206,6 +209,25 @@ impl LazyRemoteExecServerClient {
}
}
impl HttpClient for LazyRemoteExecServerClient {
fn http_request(
&self,
params: crate::HttpRequestParams,
) -> BoxFuture<'_, Result<crate::HttpRequestResponse, ExecServerError>> {
async move { self.get().await?.http_request(params).await }.boxed()
}
fn http_request_stream(
&self,
params: crate::HttpRequestParams,
) -> BoxFuture<
'_,
Result<(crate::HttpRequestResponse, crate::HttpResponseBodyStream), ExecServerError>,
> {
async move { self.get().await?.http_request_stream(params).await }.boxed()
}
}
#[derive(Debug, thiserror::Error)]
pub enum ExecServerError {
#[error("failed to spawn exec-server: {0}")]
@@ -226,6 +248,8 @@ pub enum ExecServerError {
Disconnected(String),
#[error("failed to serialize or deserialize exec-server JSON: {0}")]
Json(#[from] serde_json::Error),
#[error("HTTP request failed: {0}")]
HttpRequest(String),
#[error("exec-server protocol error: {0}")]
Protocol(String),
#[error("exec-server rejected request ({code}): {message}")]

View File

@@ -1,522 +1,26 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;
//! HTTP client capability implementations shared by local and remote environments.
//!
//! This module is the facade for the environment-owned [`crate::HttpClient`]
//! capability:
//! - [`ReqwestHttpClient`] executes requests directly with `reqwest`
//! - [`ExecServerClient`] forwards requests over the JSON-RPC transport
//! - [`HttpResponseBodyStream`] presents buffered local bodies and streamed
//! remote `http/request/bodyDelta` notifications through one byte-stream API
//!
//! Runtime split:
//! - orchestrator process: holds an `Arc<dyn HttpClient>` and chooses local or
//! remote execution
//! - remote runtime: serves the `http/request` RPC and runs the concrete local
//! HTTP request there when the orchestrator uses [`ExecServerClient`]
use codex_app_server_protocol::JSONRPCErrorError;
use futures::StreamExt;
use reqwest::Method;
use reqwest::Url;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
use serde_json::Value;
use serde_json::from_value;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tracing::debug;
#[path = "reqwest_http_client.rs"]
mod reqwest_http_client;
#[path = "http_response_body_stream.rs"]
pub(crate) mod response_body_stream;
#[path = "rpc_http_client.rs"]
mod rpc_http_client;
use super::ExecServerClient;
use super::ExecServerError;
use super::Inner;
use crate::protocol::HTTP_REQUEST_BODY_DELTA_METHOD;
use crate::protocol::HTTP_REQUEST_METHOD;
use crate::protocol::HttpHeader;
use crate::protocol::HttpRequestBodyDeltaNotification;
use crate::protocol::HttpRequestParams;
use crate::protocol::HttpRequestResponse;
use crate::rpc::RpcNotificationSender;
use crate::rpc::internal_error;
use crate::rpc::invalid_params;
/// Maximum queued body frames per streamed executor HTTP response.
const HTTP_BODY_DELTA_CHANNEL_CAPACITY: usize = 256;
pub(crate) struct ExecutorPendingHttpBodyStream {
pub(crate) request_id: String,
response: reqwest::Response,
}
pub(crate) struct ExecutorHttpRequestRunner {
client: reqwest::Client,
}
/// Request-scoped stream of body chunks for an executor HTTP response.
///
/// The initial `http/request` call returns status and headers. This stream then
/// receives the ordered `http/request/bodyDelta` notifications for that request
/// id until EOF or a terminal error.
pub struct HttpResponseBodyStream {
inner: Arc<Inner>,
request_id: String,
next_seq: u64,
rx: mpsc::Receiver<HttpRequestBodyDeltaNotification>,
// Terminal frames can carry a final chunk; return that once, then EOF.
pending_eof: bool,
closed: bool,
}
impl ExecServerClient {
/// Performs an executor-side HTTP request and buffers the response body.
pub async fn http_request(
&self,
mut params: HttpRequestParams,
) -> Result<HttpRequestResponse, ExecServerError> {
params.stream_response = false;
self.call(HTTP_REQUEST_METHOD, &params).await
}
/// Performs an executor-side HTTP request and returns a body stream.
///
/// The method sets `stream_response` and replaces any caller-supplied
/// `request_id` with a connection-local id, so late deltas from abandoned
/// streams cannot be confused with later requests.
pub async fn http_request_stream(
&self,
mut params: HttpRequestParams,
) -> Result<(HttpRequestResponse, HttpResponseBodyStream), ExecServerError> {
params.stream_response = true;
let request_id = self.inner.next_http_body_stream_request_id();
params.request_id = request_id.clone();
let (tx, rx) = mpsc::channel(HTTP_BODY_DELTA_CHANNEL_CAPACITY);
self.inner
.insert_http_body_stream(request_id.clone(), tx)
.await?;
let mut registration = HttpBodyStreamRegistration {
inner: Arc::clone(&self.inner),
request_id: request_id.clone(),
active: true,
};
let response = match self.call(HTTP_REQUEST_METHOD, &params).await {
Ok(response) => response,
Err(error) => {
self.inner.remove_http_body_stream(&request_id).await;
registration.active = false;
return Err(error);
}
};
registration.active = false;
Ok((
response,
HttpResponseBodyStream {
inner: Arc::clone(&self.inner),
request_id,
next_seq: 1,
rx,
pending_eof: false,
closed: false,
},
))
}
}
impl HttpResponseBodyStream {
/// Receives the next response-body chunk.
///
/// Returns `Ok(None)` at EOF and converts sequence gaps or executor-side
/// stream errors into protocol errors.
pub async fn recv(&mut self) -> Result<Option<Vec<u8>>, ExecServerError> {
if self.pending_eof {
self.pending_eof = false;
self.finish().await;
return Ok(None);
}
let Some(delta) = self.rx.recv().await else {
self.finish().await;
if let Some(error) = self
.inner
.take_http_body_stream_failure(&self.request_id)
.await
{
return Err(ExecServerError::Protocol(format!(
"http response stream `{}` failed: {error}",
self.request_id
)));
}
return Ok(None);
};
if delta.seq != self.next_seq {
self.finish().await;
return Err(ExecServerError::Protocol(format!(
"http response stream `{}` received seq {}, expected {}",
self.request_id, delta.seq, self.next_seq
)));
}
self.next_seq += 1;
let chunk = delta.delta.into_inner();
if let Some(error) = delta.error {
self.finish().await;
return Err(ExecServerError::Protocol(format!(
"http response stream `{}` failed: {error}",
self.request_id
)));
}
if delta.done {
self.finish().await;
if chunk.is_empty() {
return Ok(None);
}
self.pending_eof = true;
}
Ok(Some(chunk))
}
/// Removes this stream from the connection routing table once it reaches EOF.
async fn finish(&mut self) {
if self.closed {
return;
}
self.closed = true;
self.inner.remove_http_body_stream(&self.request_id).await;
}
}
impl Drop for HttpResponseBodyStream {
/// Schedules stream-route removal if the consumer drops before EOF.
fn drop(&mut self) {
if self.closed {
return;
}
self.closed = true;
spawn_remove_http_body_stream(Arc::clone(&self.inner), self.request_id.clone());
}
}
impl ExecutorHttpRequestRunner {
pub(crate) fn new(timeout_ms: Option<u64>) -> Result<Self, JSONRPCErrorError> {
let client = match timeout_ms {
None => reqwest::Client::builder(),
Some(timeout_ms) => {
reqwest::Client::builder().timeout(Duration::from_millis(timeout_ms))
}
}
.build()
.map_err(|err| internal_error(format!("failed to build http/request client: {err}")))?;
Ok(Self { client })
}
pub(crate) async fn run(
&self,
params: HttpRequestParams,
) -> Result<(HttpRequestResponse, Option<ExecutorPendingHttpBodyStream>), JSONRPCErrorError>
{
let method = Method::from_bytes(params.method.as_bytes())
.map_err(|err| invalid_params(format!("http/request method is invalid: {err}")))?;
let url = Url::parse(&params.url)
.map_err(|err| invalid_params(format!("http/request url is invalid: {err}")))?;
match url.scheme() {
"http" | "https" => {}
scheme => {
return Err(invalid_params(format!(
"http/request only supports http and https URLs, got {scheme}"
)));
}
}
let headers = Self::build_headers(params.headers)?;
let mut request = self.client.request(method, url).headers(headers);
if let Some(body) = params.body {
request = request.body(body.into_inner());
}
let response = request
.send()
.await
.map_err(|err| internal_error(format!("http/request failed: {err}")))?;
let status = response.status().as_u16();
let headers = Self::response_headers(response.headers());
if params.stream_response {
return Ok((
HttpRequestResponse {
status,
headers,
body: Vec::new().into(),
},
Some(ExecutorPendingHttpBodyStream {
request_id: params.request_id,
response,
}),
));
}
let body = response.bytes().await.map_err(|err| {
internal_error(format!("failed to read http/request response body: {err}"))
})?;
Ok((
HttpRequestResponse {
status,
headers,
body: body.to_vec().into(),
},
None,
))
}
fn build_headers(headers: Vec<HttpHeader>) -> Result<HeaderMap, JSONRPCErrorError> {
let mut header_map = HeaderMap::new();
for header in headers {
let name = HeaderName::from_bytes(header.name.as_bytes()).map_err(|err| {
invalid_params(format!("http/request header name is invalid: {err}"))
})?;
let value = HeaderValue::from_str(&header.value).map_err(|err| {
invalid_params(format!(
"http/request header value is invalid for {}: {err}",
header.name
))
})?;
header_map.append(name, value);
}
Ok(header_map)
}
fn response_headers(headers: &HeaderMap) -> Vec<HttpHeader> {
headers
.iter()
.filter_map(|(name, value)| {
Some(HttpHeader {
name: name.as_str().to_string(),
value: value.to_str().ok()?.to_string(),
})
})
.collect()
}
pub(crate) async fn stream_body(
pending_stream: ExecutorPendingHttpBodyStream,
notifications: RpcNotificationSender,
) {
let ExecutorPendingHttpBodyStream {
request_id,
response,
} = pending_stream;
let mut seq = 1;
let mut body = response.bytes_stream();
while let Some(chunk) = body.next().await {
match chunk {
Ok(bytes) => {
if !send_executor_body_delta(
&notifications,
HttpRequestBodyDeltaNotification {
request_id: request_id.clone(),
seq,
delta: bytes.to_vec().into(),
done: false,
error: None,
},
)
.await
{
return;
}
seq += 1;
}
Err(err) => {
let _ = send_executor_body_delta(
&notifications,
HttpRequestBodyDeltaNotification {
request_id,
seq,
delta: Vec::new().into(),
done: true,
error: Some(err.to_string()),
},
)
.await;
return;
}
}
}
let _ = send_executor_body_delta(
&notifications,
HttpRequestBodyDeltaNotification {
request_id,
seq,
delta: Vec::new().into(),
done: true,
error: None,
},
)
.await;
}
}
impl Inner {
/// Routes one streamed HTTP body notification into its request-local receiver.
pub(super) async fn handle_http_body_delta_notification(
&self,
params: Option<Value>,
) -> Result<(), ExecServerError> {
let params: HttpRequestBodyDeltaNotification = from_value(params.unwrap_or(Value::Null))?;
// Unknown request ids are ignored intentionally: a stream may have already
// reached EOF and released its route.
if let Some(tx) = self
.http_body_streams
.load()
.get(&params.request_id)
.cloned()
{
let request_id = params.request_id.clone();
let terminal_delta = params.done || params.error.is_some();
match tx.try_send(params) {
Ok(()) => {
if terminal_delta {
self.remove_http_body_stream(&request_id).await;
}
}
Err(TrySendError::Closed(_)) => {
self.remove_http_body_stream(&request_id).await;
debug!("http response stream receiver dropped before body delta delivery");
}
Err(TrySendError::Full(_)) => {
self.record_http_body_stream_failure(
&request_id,
"body delta channel filled before delivery".to_string(),
)
.await;
self.remove_http_body_stream(&request_id).await;
debug!(
"closing http response stream `{request_id}` after body delta backpressure"
);
}
}
}
Ok(())
}
/// Fails active streamed HTTP bodies so callers do not wait forever after a
/// transport disconnect or notification handling failure.
pub(super) async fn fail_all_http_body_streams(&self, message: String) {
let _streams_write_guard = self.http_body_streams_write_lock.lock().await;
let streams = self.http_body_streams.load();
let streams = streams.as_ref().clone();
self.http_body_streams.store(Arc::new(HashMap::new()));
for (request_id, tx) in streams {
if tx
.try_send(HttpRequestBodyDeltaNotification {
request_id: request_id.clone(),
seq: 1,
delta: Vec::new().into(),
done: true,
error: Some(message.clone()),
})
.is_err()
{
let mut next_failures = self.http_body_stream_failures.load().as_ref().clone();
next_failures.insert(request_id, message.clone());
self.http_body_stream_failures
.store(Arc::new(next_failures));
}
}
}
/// Allocates a connection-local streamed HTTP response id.
fn next_http_body_stream_request_id(&self) -> String {
let id = self
.http_body_stream_next_id
.fetch_add(1, Ordering::Relaxed);
format!("http-{id}")
}
/// Registers a request id before issuing an executor streaming HTTP call.
async fn insert_http_body_stream(
&self,
request_id: String,
tx: mpsc::Sender<HttpRequestBodyDeltaNotification>,
) -> Result<(), ExecServerError> {
let _streams_write_guard = self.http_body_streams_write_lock.lock().await;
let streams = self.http_body_streams.load();
if streams.contains_key(&request_id) {
return Err(ExecServerError::Protocol(format!(
"http response stream already registered for request {request_id}"
)));
}
let mut next_streams = streams.as_ref().clone();
next_streams.insert(request_id.clone(), tx);
self.http_body_streams.store(Arc::new(next_streams));
let failures = self.http_body_stream_failures.load();
if failures.contains_key(&request_id) {
let mut next_failures = failures.as_ref().clone();
next_failures.remove(&request_id);
self.http_body_stream_failures
.store(Arc::new(next_failures));
}
Ok(())
}
/// Removes a request id after EOF, terminal error, or request failure.
async fn remove_http_body_stream(
&self,
request_id: &str,
) -> Option<mpsc::Sender<HttpRequestBodyDeltaNotification>> {
let _streams_write_guard = self.http_body_streams_write_lock.lock().await;
let streams = self.http_body_streams.load();
let stream = streams.get(request_id).cloned();
stream.as_ref()?;
let mut next_streams = streams.as_ref().clone();
next_streams.remove(request_id);
self.http_body_streams.store(Arc::new(next_streams));
stream
}
async fn record_http_body_stream_failure(&self, request_id: &str, message: String) {
let _streams_write_guard = self.http_body_streams_write_lock.lock().await;
let failures = self.http_body_stream_failures.load();
let mut next_failures = failures.as_ref().clone();
next_failures.insert(request_id.to_string(), message);
self.http_body_stream_failures
.store(Arc::new(next_failures));
}
async fn take_http_body_stream_failure(&self, request_id: &str) -> Option<String> {
let _streams_write_guard = self.http_body_streams_write_lock.lock().await;
let failures = self.http_body_stream_failures.load();
let error = failures.get(request_id).cloned();
error.as_ref()?;
let mut next_failures = failures.as_ref().clone();
next_failures.remove(request_id);
self.http_body_stream_failures
.store(Arc::new(next_failures));
error
}
}
/// Active route registration owned while `http_request_stream` awaits headers.
struct HttpBodyStreamRegistration {
inner: Arc<Inner>,
request_id: String,
active: bool,
}
impl Drop for HttpBodyStreamRegistration {
/// Removes the route if the stream request future is cancelled before headers return.
fn drop(&mut self) {
if self.active {
spawn_remove_http_body_stream(Arc::clone(&self.inner), self.request_id.clone());
}
}
}
/// Schedules HTTP body route removal from synchronous drop paths.
fn spawn_remove_http_body_stream(inner: Arc<Inner>, request_id: String) {
if let Ok(handle) = Handle::try_current() {
handle.spawn(async move {
inner.remove_http_body_stream(&request_id).await;
});
}
}
async fn send_executor_body_delta(
notifications: &RpcNotificationSender,
delta: HttpRequestBodyDeltaNotification,
) -> bool {
notifications
.notify(HTTP_REQUEST_BODY_DELTA_METHOD, &delta)
.await
.is_ok()
}
pub(crate) use reqwest_http_client::PendingReqwestHttpBodyStream;
pub use reqwest_http_client::ReqwestHttpClient;
pub(crate) use reqwest_http_client::ReqwestHttpRequestRunner;
pub use response_body_stream::HttpResponseBodyStream;

View File

@@ -0,0 +1,355 @@
//! Shared HTTP response-body stream plumbing for local and remote execution.
//!
//! This module owns the byte-stream type exposed by the `HttpClient`
//! capability plus the remote-side routing table used to turn
//! `http/request/bodyDelta` notifications back into per-request streams.
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use bytes::Bytes;
use futures::StreamExt;
use reqwest::Response;
use serde_json::Value;
use serde_json::from_value;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tracing::debug;
use crate::client::ExecServerError;
use crate::client::Inner;
use crate::protocol::HTTP_REQUEST_BODY_DELTA_METHOD;
use crate::protocol::HttpRequestBodyDeltaNotification;
use crate::rpc::RpcNotificationSender;
pub(super) struct HttpBodyStreamRegistration {
inner: Arc<Inner>,
request_id: String,
active: bool,
}
enum HttpResponseBodyStreamInner {
Local {
body: Pin<Box<dyn futures::Stream<Item = Result<Bytes, reqwest::Error>> + Send>>,
},
Remote {
inner: Arc<Inner>,
request_id: String,
next_seq: u64,
rx: mpsc::Receiver<HttpRequestBodyDeltaNotification>,
pending_eof: bool,
closed: bool,
},
}
/// Request-scoped stream of body chunks for an HTTP response.
///
/// The initial `http/request` call returns status and headers. This stream then
/// receives the ordered `http/request/bodyDelta` notifications for that request
/// id until EOF or a terminal error.
pub struct HttpResponseBodyStream {
inner: HttpResponseBodyStreamInner,
}
impl HttpResponseBodyStream {
pub(super) fn local(response: Response) -> Self {
Self {
inner: HttpResponseBodyStreamInner::Local {
body: Box::pin(response.bytes_stream()),
},
}
}
pub(super) fn remote(
inner: Arc<Inner>,
request_id: String,
rx: mpsc::Receiver<HttpRequestBodyDeltaNotification>,
) -> Self {
Self {
inner: HttpResponseBodyStreamInner::Remote {
inner,
request_id,
next_seq: 1,
rx,
pending_eof: false,
closed: false,
},
}
}
/// Receives the next response-body chunk.
///
/// Returns `Ok(None)` at EOF and converts sequence gaps or stream-side
/// stream errors into protocol errors.
pub async fn recv(&mut self) -> Result<Option<Vec<u8>>, ExecServerError> {
match &mut self.inner {
HttpResponseBodyStreamInner::Local { body } => match body.next().await {
Some(chunk) => match chunk {
Ok(bytes) => Ok(Some(bytes.to_vec())),
Err(error) => Err(ExecServerError::HttpRequest(error.to_string())),
},
None => Ok(None),
},
HttpResponseBodyStreamInner::Remote {
inner,
request_id,
next_seq,
rx,
pending_eof,
closed,
} => {
if *pending_eof {
*pending_eof = false;
finish_remote_stream(inner, request_id, closed).await;
return Ok(None);
}
let Some(delta) = rx.recv().await else {
finish_remote_stream(inner, request_id, closed).await;
if let Some(error) = inner.take_http_body_stream_failure(request_id).await {
return Err(ExecServerError::Protocol(format!(
"http response stream `{request_id}` failed: {error}",
)));
}
return Ok(None);
};
if delta.seq != *next_seq {
finish_remote_stream(inner, request_id, closed).await;
return Err(ExecServerError::Protocol(format!(
"http response stream `{request_id}` received seq {}, expected {}",
delta.seq, *next_seq
)));
}
*next_seq += 1;
let chunk = delta.delta.into_inner();
if let Some(error) = delta.error {
finish_remote_stream(inner, request_id, closed).await;
return Err(ExecServerError::Protocol(format!(
"http response stream `{request_id}` failed: {error}",
)));
}
if delta.done {
finish_remote_stream(inner, request_id, closed).await;
if chunk.is_empty() {
return Ok(None);
}
*pending_eof = true;
}
Ok(Some(chunk))
}
}
}
}
impl Drop for HttpResponseBodyStream {
/// Schedules stream-route removal if the consumer drops before EOF.
fn drop(&mut self) {
if let HttpResponseBodyStreamInner::Remote {
inner,
request_id,
closed,
..
} = &mut self.inner
{
if *closed {
return;
}
*closed = true;
spawn_remove_http_body_stream(Arc::clone(inner), request_id.clone());
}
}
}
impl HttpBodyStreamRegistration {
pub(super) fn new(inner: Arc<Inner>, request_id: String) -> Self {
Self {
inner,
request_id,
active: true,
}
}
pub(super) fn disarm(&mut self) {
self.active = false;
}
}
impl Drop for HttpBodyStreamRegistration {
/// Removes the route if the stream request future is cancelled before headers return.
fn drop(&mut self) {
if self.active {
spawn_remove_http_body_stream(Arc::clone(&self.inner), self.request_id.clone());
}
}
}
async fn finish_remote_stream(inner: &Arc<Inner>, request_id: &str, closed: &mut bool) {
if *closed {
return;
}
*closed = true;
inner.remove_http_body_stream(request_id).await;
}
/// Schedules HTTP body route removal from synchronous drop paths.
fn spawn_remove_http_body_stream(inner: Arc<Inner>, request_id: String) {
if let Ok(handle) = Handle::try_current() {
handle.spawn(async move {
inner.remove_http_body_stream(&request_id).await;
});
}
}
pub(super) async fn send_body_delta(
notifications: &RpcNotificationSender,
delta: HttpRequestBodyDeltaNotification,
) -> bool {
notifications
.notify(HTTP_REQUEST_BODY_DELTA_METHOD, &delta)
.await
.is_ok()
}
impl Inner {
/// Routes one streamed HTTP body notification into its request-local receiver.
pub(crate) async fn handle_http_body_delta_notification(
&self,
params: Option<Value>,
) -> Result<(), ExecServerError> {
let params: HttpRequestBodyDeltaNotification = from_value(params.unwrap_or(Value::Null))?;
// Unknown request ids are ignored intentionally: a stream may have already
// reached EOF and released its route.
if let Some(tx) = self
.http_body_streams
.load()
.get(&params.request_id)
.cloned()
{
let request_id = params.request_id.clone();
let terminal_delta = params.done || params.error.is_some();
match tx.try_send(params) {
Ok(()) => {
if terminal_delta {
self.remove_http_body_stream(&request_id).await;
}
}
Err(TrySendError::Closed(_)) => {
self.remove_http_body_stream(&request_id).await;
debug!("http response stream receiver dropped before body delta delivery");
}
Err(TrySendError::Full(_)) => {
self.record_http_body_stream_failure(
&request_id,
"body delta channel filled before delivery".to_string(),
)
.await;
self.remove_http_body_stream(&request_id).await;
debug!(
"closing http response stream `{request_id}` after body delta backpressure"
);
}
}
}
Ok(())
}
/// Fails active streamed HTTP bodies so callers do not wait forever after a
/// transport disconnect or notification handling failure.
pub(crate) async fn fail_all_http_body_streams(&self, message: String) {
let _streams_write_guard = self.http_body_streams_write_lock.lock().await;
let streams = self.http_body_streams.load();
let streams = streams.as_ref().clone();
self.http_body_streams.store(Arc::new(HashMap::new()));
for (request_id, tx) in streams {
if tx
.try_send(HttpRequestBodyDeltaNotification {
request_id: request_id.clone(),
seq: 1,
delta: Vec::new().into(),
done: true,
error: Some(message.clone()),
})
.is_err()
{
let mut next_failures = self.http_body_stream_failures.load().as_ref().clone();
next_failures.insert(request_id, message.clone());
self.http_body_stream_failures
.store(Arc::new(next_failures));
}
}
}
/// Allocates a connection-local streamed HTTP response id.
pub(super) fn next_http_body_stream_request_id(&self) -> String {
let id = self
.http_body_stream_next_id
.fetch_add(1, Ordering::Relaxed);
format!("http-{id}")
}
/// Registers a request id before issuing a streaming HTTP call.
pub(super) async fn insert_http_body_stream(
&self,
request_id: String,
tx: mpsc::Sender<HttpRequestBodyDeltaNotification>,
) -> Result<(), ExecServerError> {
let _streams_write_guard = self.http_body_streams_write_lock.lock().await;
let streams = self.http_body_streams.load();
if streams.contains_key(&request_id) {
return Err(ExecServerError::Protocol(format!(
"http response stream already registered for request {request_id}"
)));
}
let mut next_streams = streams.as_ref().clone();
next_streams.insert(request_id.clone(), tx);
self.http_body_streams.store(Arc::new(next_streams));
let failures = self.http_body_stream_failures.load();
if failures.contains_key(&request_id) {
let mut next_failures = failures.as_ref().clone();
next_failures.remove(&request_id);
self.http_body_stream_failures
.store(Arc::new(next_failures));
}
Ok(())
}
/// Removes a request id after EOF, terminal error, or request failure.
pub(super) async fn remove_http_body_stream(
&self,
request_id: &str,
) -> Option<mpsc::Sender<HttpRequestBodyDeltaNotification>> {
let _streams_write_guard = self.http_body_streams_write_lock.lock().await;
let streams = self.http_body_streams.load();
let stream = streams.get(request_id).cloned();
stream.as_ref()?;
let mut next_streams = streams.as_ref().clone();
next_streams.remove(request_id);
self.http_body_streams.store(Arc::new(next_streams));
stream
}
async fn record_http_body_stream_failure(&self, request_id: &str, message: String) {
let _streams_write_guard = self.http_body_streams_write_lock.lock().await;
let failures = self.http_body_stream_failures.load();
let mut next_failures = failures.as_ref().clone();
next_failures.insert(request_id.to_string(), message);
self.http_body_stream_failures
.store(Arc::new(next_failures));
}
async fn take_http_body_stream_failure(&self, request_id: &str) -> Option<String> {
let _streams_write_guard = self.http_body_streams_write_lock.lock().await;
let failures = self.http_body_stream_failures.load();
let error = failures.get(request_id).cloned();
error.as_ref()?;
let mut next_failures = failures.as_ref().clone();
next_failures.remove(request_id);
self.http_body_stream_failures
.store(Arc::new(next_failures));
error
}
}

View File

@@ -0,0 +1,267 @@
//! `reqwest`-backed `HttpClient` implementation.
//!
//! This code runs wherever the real network request should originate:
//! - in a local environment, that means the orchestrator process
//! - in a remote environment, that means the remote runtime after the
//! orchestrator has forwarded `http/request` over JSON-RPC
use std::time::Duration;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_client::build_reqwest_client_with_custom_ca;
use futures::FutureExt;
use futures::StreamExt;
use futures::future::BoxFuture;
use reqwest::Method;
use reqwest::Url;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
use super::HttpResponseBodyStream;
use super::response_body_stream::send_body_delta;
use crate::HttpClient;
use crate::client::ExecServerError;
use crate::protocol::HttpHeader;
use crate::protocol::HttpRequestBodyDeltaNotification;
use crate::protocol::HttpRequestParams;
use crate::protocol::HttpRequestResponse;
use crate::rpc::RpcNotificationSender;
use crate::rpc::internal_error;
use crate::rpc::invalid_params;
/// `HttpClient` implementation that performs the actual HTTP request with
/// `reqwest`.
#[derive(Clone, Default)]
pub struct ReqwestHttpClient;
/// Streaming response state held between the initial HTTP response and
/// downstream body-delta forwarding.
pub(crate) struct PendingReqwestHttpBodyStream {
pub(crate) request_id: String,
pub(crate) response: reqwest::Response,
}
/// Validates `http/request` parameters and runs the actual `reqwest` call used
/// by the exec-server route and the local [`HttpClient`] backend.
pub(crate) struct ReqwestHttpRequestRunner {
client: reqwest::Client,
}
impl ReqwestHttpClient {
fn build_client(timeout_ms: Option<u64>) -> Result<reqwest::Client, ExecServerError> {
let builder = match timeout_ms {
None => reqwest::Client::builder(),
Some(timeout_ms) => {
reqwest::Client::builder().timeout(Duration::from_millis(timeout_ms))
}
};
build_reqwest_client_with_custom_ca(builder)
.map_err(|error| ExecServerError::HttpRequest(error.to_string()))
}
}
impl HttpClient for ReqwestHttpClient {
fn http_request(
&self,
params: HttpRequestParams,
) -> BoxFuture<'_, Result<HttpRequestResponse, ExecServerError>> {
async move {
let runner = ReqwestHttpRequestRunner::new(params.timeout_ms)
.map_err(|error| ExecServerError::HttpRequest(error.message))?;
let (response, _) = runner
.run(HttpRequestParams {
stream_response: false,
..params
})
.await
.map_err(|error| ExecServerError::HttpRequest(error.message))?;
Ok(response)
}
.boxed()
}
fn http_request_stream(
&self,
params: HttpRequestParams,
) -> BoxFuture<'_, Result<(HttpRequestResponse, HttpResponseBodyStream), ExecServerError>> {
async move {
let runner = ReqwestHttpRequestRunner::new(params.timeout_ms)
.map_err(|error| ExecServerError::HttpRequest(error.message))?;
let (response, pending_stream) = runner
.run(HttpRequestParams {
stream_response: true,
..params
})
.await
.map_err(|error| ExecServerError::HttpRequest(error.message))?;
let pending_stream = pending_stream.ok_or_else(|| {
ExecServerError::Protocol(
"http request stream did not return a response body stream".to_string(),
)
})?;
Ok((
response,
HttpResponseBodyStream::local(pending_stream.response),
))
}
.boxed()
}
}
impl ReqwestHttpRequestRunner {
pub(crate) fn new(timeout_ms: Option<u64>) -> Result<Self, JSONRPCErrorError> {
let client = ReqwestHttpClient::build_client(timeout_ms)
.map_err(|error| internal_error(error.to_string()))?;
Ok(Self { client })
}
pub(crate) async fn run(
&self,
params: HttpRequestParams,
) -> Result<(HttpRequestResponse, Option<PendingReqwestHttpBodyStream>), JSONRPCErrorError>
{
let method = Method::from_bytes(params.method.as_bytes())
.map_err(|error| invalid_params(format!("http/request method is invalid: {error}")))?;
let url = Url::parse(&params.url)
.map_err(|error| invalid_params(format!("http/request url is invalid: {error}")))?;
match url.scheme() {
"http" | "https" => {}
scheme => {
return Err(invalid_params(format!(
"http/request only supports http and https URLs, got {scheme}"
)));
}
}
let headers = Self::build_headers(params.headers)?;
let mut request = self.client.request(method, url).headers(headers);
if let Some(body) = params.body {
request = request.body(body.into_inner());
}
let response = request
.send()
.await
.map_err(|error| internal_error(format!("http/request failed: {error}")))?;
let status = response.status().as_u16();
let headers = Self::response_headers(response.headers());
if params.stream_response {
return Ok((
HttpRequestResponse {
status,
headers,
body: Vec::new().into(),
},
Some(PendingReqwestHttpBodyStream {
request_id: params.request_id,
response,
}),
));
}
let body = response.bytes().await.map_err(|error| {
internal_error(format!(
"failed to read http/request response body: {error}"
))
})?;
Ok((
HttpRequestResponse {
status,
headers,
body: body.to_vec().into(),
},
None,
))
}
pub(crate) async fn stream_body(
pending_stream: PendingReqwestHttpBodyStream,
notifications: RpcNotificationSender,
) {
let PendingReqwestHttpBodyStream {
request_id,
response,
} = pending_stream;
let mut seq = 1;
let mut body = response.bytes_stream();
while let Some(chunk) = body.next().await {
match chunk {
Ok(bytes) => {
if !send_body_delta(
&notifications,
HttpRequestBodyDeltaNotification {
request_id: request_id.clone(),
seq,
delta: bytes.to_vec().into(),
done: false,
error: None,
},
)
.await
{
return;
}
seq += 1;
}
Err(error) => {
let _ = send_body_delta(
&notifications,
HttpRequestBodyDeltaNotification {
request_id,
seq,
delta: Vec::new().into(),
done: true,
error: Some(error.to_string()),
},
)
.await;
return;
}
}
}
let _ = send_body_delta(
&notifications,
HttpRequestBodyDeltaNotification {
request_id,
seq,
delta: Vec::new().into(),
done: true,
error: None,
},
)
.await;
}
fn build_headers(headers: Vec<HttpHeader>) -> Result<HeaderMap, JSONRPCErrorError> {
let mut header_map = HeaderMap::new();
for header in headers {
let name = HeaderName::from_bytes(header.name.as_bytes()).map_err(|error| {
invalid_params(format!("http/request header name is invalid: {error}"))
})?;
let value = HeaderValue::from_str(&header.value).map_err(|error| {
invalid_params(format!(
"http/request header value is invalid for {}: {error}",
header.name
))
})?;
header_map.append(name, value);
}
Ok(header_map)
}
fn response_headers(headers: &HeaderMap) -> Vec<HttpHeader> {
headers
.iter()
.filter_map(|(name, value)| {
Some(HttpHeader {
name: name.as_str().to_string(),
value: value.to_str().ok()?.to_string(),
})
})
.collect()
}
}

View File

@@ -0,0 +1,88 @@
//! JSON-RPC-backed `HttpClient` implementation.
//!
//! This code runs in the orchestrator process. It does not issue network
//! requests directly; instead it forwards `http/request` to the remote runtime
//! and then reconstructs streamed bodies from `http/request/bodyDelta`
//! notifications on the shared connection.
use std::sync::Arc;
use futures::FutureExt;
use futures::future::BoxFuture;
use tokio::sync::mpsc;
use super::HttpResponseBodyStream;
use super::response_body_stream::HttpBodyStreamRegistration;
use crate::HttpClient;
use crate::client::ExecServerClient;
use crate::client::ExecServerError;
use crate::protocol::HTTP_REQUEST_METHOD;
use crate::protocol::HttpRequestParams;
use crate::protocol::HttpRequestResponse;
/// Maximum queued body frames per streamed HTTP response.
const HTTP_BODY_DELTA_CHANNEL_CAPACITY: usize = 256;
impl ExecServerClient {
/// Performs an HTTP request and buffers the response body.
pub async fn http_request(
&self,
mut params: HttpRequestParams,
) -> Result<HttpRequestResponse, ExecServerError> {
params.stream_response = false;
self.call(HTTP_REQUEST_METHOD, &params).await
}
/// Performs an HTTP request and returns a body stream.
///
/// The method sets `stream_response` and replaces any caller-supplied
/// `request_id` with a connection-local id, so late deltas from abandoned
/// streams cannot be confused with later requests.
pub async fn http_request_stream(
&self,
mut params: HttpRequestParams,
) -> Result<(HttpRequestResponse, HttpResponseBodyStream), ExecServerError> {
params.stream_response = true;
let request_id = self.inner.next_http_body_stream_request_id();
params.request_id = request_id.clone();
let (tx, rx) = mpsc::channel(HTTP_BODY_DELTA_CHANNEL_CAPACITY);
self.inner
.insert_http_body_stream(request_id.clone(), tx)
.await?;
let mut registration =
HttpBodyStreamRegistration::new(Arc::clone(&self.inner), request_id.clone());
let response = match self.call(HTTP_REQUEST_METHOD, &params).await {
Ok(response) => response,
Err(error) => {
self.inner.remove_http_body_stream(&request_id).await;
registration.disarm();
return Err(error);
}
};
registration.disarm();
Ok((
response,
HttpResponseBodyStream::remote(Arc::clone(&self.inner), request_id, rx),
))
}
}
impl HttpClient for ExecServerClient {
/// Orchestrator-side adapter that forwards buffered HTTP requests to the
/// remote runtime over the shared JSON-RPC connection.
fn http_request(
&self,
params: HttpRequestParams,
) -> BoxFuture<'_, Result<HttpRequestResponse, ExecServerError>> {
async move { ExecServerClient::http_request(self, params).await }.boxed()
}
/// Orchestrator-side adapter that forwards streamed HTTP requests to the
/// remote runtime and exposes body deltas as a byte stream.
fn http_request_stream(
&self,
params: HttpRequestParams,
) -> BoxFuture<'_, Result<(HttpRequestResponse, HttpResponseBodyStream), ExecServerError>> {
async move { ExecServerClient::http_request_stream(self, params).await }.boxed()
}
}

View File

@@ -1,5 +1,12 @@
use std::time::Duration;
use futures::future::BoxFuture;
use crate::ExecServerError;
use crate::HttpRequestParams;
use crate::HttpRequestResponse;
use crate::HttpResponseBodyStream;
/// Connection options for any exec-server client transport.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecServerClientConnectOptions {
@@ -17,3 +24,22 @@ pub struct RemoteExecServerConnectArgs {
pub initialize_timeout: Duration,
pub resume_session_id: Option<String>,
}
/// Sends HTTP requests through a runtime-selected transport.
///
/// This is the HTTP capability counterpart to [`crate::ExecBackend`]. Callers
/// use it when they need environment-owned network requests but should not
/// depend on the concrete connection type or how that connection is established.
pub trait HttpClient: Send + Sync {
/// Perform an HTTP request and buffer the response body.
fn http_request(
&self,
params: HttpRequestParams,
) -> BoxFuture<'_, Result<HttpRequestResponse, ExecServerError>>;
/// Perform an HTTP request and return a streamed body handle.
fn http_request_stream(
&self,
params: HttpRequestParams,
) -> BoxFuture<'_, Result<(HttpRequestResponse, HttpResponseBodyStream), ExecServerError>>;
}

View File

@@ -3,7 +3,9 @@ use std::sync::Arc;
use crate::ExecServerError;
use crate::ExecServerRuntimePaths;
use crate::HttpClient;
use crate::client::LazyRemoteExecServerClient;
use crate::client::http_client::ReqwestHttpClient;
use crate::file_system::ExecutorFileSystem;
use crate::local_file_system::LocalFileSystem;
use crate::local_process::LocalProcess;
@@ -136,6 +138,7 @@ pub struct Environment {
exec_server_url: Option<String>,
exec_backend: Arc<dyn ExecBackend>,
filesystem: Arc<dyn ExecutorFileSystem>,
http_client: Arc<dyn HttpClient>,
local_runtime_paths: Option<ExecServerRuntimePaths>,
}
@@ -146,6 +149,7 @@ impl Environment {
exec_server_url: None,
exec_backend: Arc::new(LocalProcess::default()),
filesystem: Arc::new(LocalFileSystem::unsandboxed()),
http_client: Arc::new(ReqwestHttpClient),
local_runtime_paths: None,
}
}
@@ -202,6 +206,7 @@ impl Environment {
filesystem: Arc::new(LocalFileSystem::with_runtime_paths(
local_runtime_paths.clone(),
)),
http_client: Arc::new(ReqwestHttpClient),
local_runtime_paths: Some(local_runtime_paths),
}
}
@@ -216,12 +221,14 @@ impl Environment {
) -> Self {
let client = LazyRemoteExecServerClient::new(exec_server_url.clone());
let exec_backend: Arc<dyn ExecBackend> = Arc::new(RemoteProcess::new(client.clone()));
let filesystem: Arc<dyn ExecutorFileSystem> = Arc::new(RemoteFileSystem::new(client));
let filesystem: Arc<dyn ExecutorFileSystem> =
Arc::new(RemoteFileSystem::new(client.clone()));
Self {
exec_server_url: Some(exec_server_url),
exec_backend,
filesystem,
http_client: Arc::new(client),
local_runtime_paths,
}
}
@@ -243,6 +250,10 @@ impl Environment {
Arc::clone(&self.exec_backend)
}
pub fn get_http_client(&self) -> Arc<dyn HttpClient> {
Arc::clone(&self.http_client)
}
pub fn get_filesystem(&self) -> Arc<dyn ExecutorFileSystem> {
Arc::clone(&self.filesystem)
}

View File

@@ -20,7 +20,10 @@ mod server;
pub use client::ExecServerClient;
pub use client::ExecServerError;
pub use client::http_client::HttpResponseBodyStream;
pub use client::http_client::ReqwestHttpClient;
pub use client_api::ExecServerClientConnectOptions;
pub use client_api::HttpClient;
pub use client_api::RemoteExecServerConnectArgs;
pub use environment::CODEX_EXEC_SERVER_URL_ENV_VAR;
pub use environment::Environment;

View File

@@ -12,8 +12,8 @@ use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use crate::ExecServerRuntimePaths;
use crate::client::http_client::ExecutorHttpRequestRunner;
use crate::client::http_client::ExecutorPendingHttpBodyStream;
use crate::client::http_client::PendingReqwestHttpBodyStream;
use crate::client::http_client::ReqwestHttpRequestRunner;
use crate::protocol::ExecParams;
use crate::protocol::ExecResponse;
use crate::protocol::FsCopyParams;
@@ -178,7 +178,7 @@ impl ExecServerHandler {
if stream_response {
self.reserve_http_body_stream(&http_request_id).await?;
}
let response = ExecutorHttpRequestRunner::new(params.timeout_ms)?
let response = ReqwestHttpRequestRunner::new(params.timeout_ms)?
.run(params)
.await;
if response.is_err() && stream_response {
@@ -306,7 +306,7 @@ impl ExecServerHandler {
async fn start_http_body_stream(
self: &Arc<Self>,
pending_stream: ExecutorPendingHttpBodyStream,
pending_stream: PendingReqwestHttpBodyStream,
) {
let request_id = pending_stream.request_id.clone();
if self.background_task_shutdown.is_cancelled() {
@@ -320,7 +320,7 @@ impl ExecServerHandler {
self.background_tasks.spawn(async move {
tokio::select! {
_ = shutdown.cancelled() => {}
_ = ExecutorHttpRequestRunner::stream_body(pending_stream, notifications) => {}
_ = ReqwestHttpRequestRunner::stream_body(pending_stream, notifications) => {}
}
handler.release_http_body_stream(&finished_request_id).await;
});