Files
codex/codex-rs/exec-server/src/relay.rs
Anton Panasenko ac466c0dbd feat(exec-server): use protobuf relay frames (#22343)
## Why

Remote exec-server now needs one executor websocket to serve multiple
harness JSON-RPC sessions. Rendezvous routes by `stream_id`, and the
exec-server side needs to use the same stable relay frame contract
instead of a hand-rolled JSON shape.

The relay protocol also needs to make ownership boundaries clear:
harness and executor endpoints own sequencing, acks, retries, duplicate
suppression, segmentation, and reassembly; rendezvous only routes
frames.

## What Changed

- Add the checked-in `codex.exec_server.relay.v1.RelayMessageFrame`
proto plus generated prost bindings for `codex-exec-server`.
- Encode remote harness/executor relay traffic as binary protobuf
websocket frames while keeping local websocket JSON-RPC unchanged.
- Demux executor-side relay streams into independent
`ConnectionProcessor` sessions keyed by `stream_id`.
- Add a programmatic `RemoteExecutorConfig::with_bearer_token(...)`
constructor for non-CLI callers and integration tests.
- Add an integration test that starts the remote executor against a fake
registry/rendezvous websocket and verifies two virtual streams share one
executor websocket without cross-talk, including per-stream reset
behavior.
- Document the remote relay envelope, sequence ranges, `ack`/`ack_bits`,
and endpoint responsibilities in `exec-server/README.md`.

## Verification

- `cargo test -p codex-exec-server --test relay
multiplexed_remote_executor_routes_independent_virtual_streams --
--exact`
- `cargo test -p codex-exec-server --test relay`
- `cargo test -p codex-exec-server` passed outside the sandbox. The
sandboxed run hit macOS `sandbox-exec: sandbox_apply: Operation not
permitted` in filesystem sandbox tests.
2026-05-12 16:50:45 -07:00

456 lines
16 KiB
Rust

use std::collections::HashMap;
use codex_app_server_protocol::JSONRPCMessage;
use futures::SinkExt;
use futures::StreamExt;
use prost::Message as ProstMessage;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::tungstenite::Message;
use tracing::debug;
use tracing::warn;
use uuid::Uuid;
use crate::ExecServerError;
use crate::connection::CHANNEL_CAPACITY;
use crate::connection::JsonRpcConnection;
use crate::connection::JsonRpcConnectionEvent;
use crate::connection::JsonRpcTransport;
use crate::relay_proto::RelayData;
use crate::relay_proto::RelayMessageFrame;
use crate::relay_proto::RelayResume;
use crate::relay_proto::relay_message_frame;
use crate::server::ConnectionProcessor;
/// Relay envelope version stamped on every frame this module builds, and the
/// only version `RelayMessageFrame::validate` accepts.
const RELAY_MESSAGE_FRAME_VERSION: u32 = 1;

/// The `body` variant carried by a relay frame, as reported by
/// `RelayMessageFrame::validate` after the frame passed its checks.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RelayFrameBodyKind {
    /// A `RelayData` body holding a serialized JSON-RPC message.
    Data,
    /// An acknowledgement body (currently ignored by both endpoints here).
    Ack,
    /// A resume body (sent by the harness when it opens a stream; ignored on receipt).
    Resume,
    /// A reset body that tears down the addressed stream.
    Reset,
    /// A heartbeat body (currently ignored on receipt).
    Heartbeat,
}
impl RelayMessageFrame {
    /// Builds a single-segment data frame carrying `payload` with sequence
    /// number `seq` on `stream_id`.
    ///
    /// Acknowledgements are not tracked by this module, so `ack` and
    /// `ack_bits` are always zero.
    fn data(stream_id: String, seq: u32, payload: Vec<u8>) -> Self {
        Self {
            version: RELAY_MESSAGE_FRAME_VERSION,
            stream_id,
            ack: 0,
            ack_bits: 0,
            body: Some(relay_message_frame::Body::Data(RelayData {
                seq,
                segment_index: 0,
                segment_count: 1,
                payload,
            })),
        }
    }

    /// Builds a resume frame for `stream_id` with `next_seq` set to 0.
    fn resume(stream_id: String) -> Self {
        Self {
            version: RELAY_MESSAGE_FRAME_VERSION,
            stream_id,
            ack: 0,
            ack_bits: 0,
            body: Some(relay_message_frame::Body::Resume(RelayResume {
                next_seq: 0,
            })),
        }
    }

    /// Checks the envelope invariants this module relies on and reports which
    /// body variant the frame carries.
    ///
    /// # Errors
    ///
    /// Returns `ExecServerError::Protocol` when any of the following holds:
    ///
    /// * the version is not `RELAY_MESSAGE_FRAME_VERSION`;
    /// * `stream_id` is blank;
    /// * the body is absent;
    /// * a data body has no payload or no segment count;
    /// * a data body is segmented (this endpoint does not reassemble segments);
    /// * a reset body has an empty reason.
    fn validate(&self) -> Result<RelayFrameBodyKind, ExecServerError> {
        if self.version != RELAY_MESSAGE_FRAME_VERSION {
            return Err(ExecServerError::Protocol(format!(
                "unsupported relay message frame version {}",
                self.version
            )));
        }
        if self.stream_id.trim().is_empty() {
            return Err(ExecServerError::Protocol(
                "relay message frame is missing stream_id".to_string(),
            ));
        }
        match self.body.as_ref() {
            Some(relay_message_frame::Body::Data(data)) => {
                // A zero segment count presumably means the field was left at
                // its protobuf default (never set).
                if data.segment_count == 0 || data.payload.is_empty() {
                    return Err(ExecServerError::Protocol(
                        "relay data message frame is missing required fields".to_string(),
                    ));
                }
                // `data` only ever produces single-segment frames, and nothing
                // here reassembles segments, so reject multi-segment frames
                // with a diagnostic that says so.
                if data.segment_index != 0 || data.segment_count != 1 {
                    return Err(ExecServerError::Protocol(format!(
                        "unsupported segmented relay data message frame \
                         (segment_index {}, segment_count {})",
                        data.segment_index, data.segment_count
                    )));
                }
                Ok(RelayFrameBodyKind::Data)
            }
            Some(relay_message_frame::Body::AckFrame(_)) => Ok(RelayFrameBodyKind::Ack),
            Some(relay_message_frame::Body::Resume(_)) => Ok(RelayFrameBodyKind::Resume),
            Some(relay_message_frame::Body::Reset(reset)) => {
                if reset.reason.is_empty() {
                    return Err(ExecServerError::Protocol(
                        "relay reset message frame is missing reason".to_string(),
                    ));
                }
                Ok(RelayFrameBodyKind::Reset)
            }
            Some(relay_message_frame::Body::Heartbeat(_)) => Ok(RelayFrameBodyKind::Heartbeat),
            None => Err(ExecServerError::Protocol(
                "relay message frame is missing body".to_string(),
            )),
        }
    }

    /// Validates the frame and decodes its data payload as a JSON-RPC message.
    ///
    /// # Errors
    ///
    /// * Any error from `validate`.
    /// * `ExecServerError::Protocol` when the frame is valid but is not a data frame.
    /// * `ExecServerError::Json` when the payload is not a JSON-RPC message.
    fn into_jsonrpc_message(self) -> Result<JSONRPCMessage, ExecServerError> {
        self.validate()?;
        // Destructure directly instead of re-checking the kind and falling
        // back to an empty payload, which could only yield a misleading JSON
        // parse error.
        let Some(relay_message_frame::Body::Data(data)) = self.body else {
            return Err(ExecServerError::Protocol(
                "expected relay data message frame".to_string(),
            ));
        };
        serde_json::from_slice(&data.payload).map_err(ExecServerError::Json)
    }

    /// Extracts the reason from a reset frame.
    ///
    /// Returns `None` for non-reset frames and for resets with an empty reason.
    fn into_reset_reason(self) -> Option<String> {
        match self.body {
            Some(relay_message_frame::Body::Reset(reset)) if !reset.reason.is_empty() => {
                Some(reset.reason)
            }
            _ => None,
        }
    }
}
fn encode_relay_message_frame(frame: &RelayMessageFrame) -> Vec<u8> {
frame.encode_to_vec()
}
fn decode_relay_message_frame(payload: &[u8]) -> Result<RelayMessageFrame, ExecServerError> {
RelayMessageFrame::decode(payload)
.map_err(|err| ExecServerError::Protocol(format!("invalid relay message frame: {err}")))
}
fn jsonrpc_payload(message: &JSONRPCMessage) -> Result<Vec<u8>, ExecServerError> {
serde_json::to_vec(message).map_err(ExecServerError::Json)
}
/// Wraps a dedicated harness-side relay websocket as a `JsonRpcConnection`.
///
/// A fresh random UUID is used as this connection's relay `stream_id`. Two
/// tasks are spawned.
///
/// The reader:
///
/// * decodes binary protobuf relay frames and ignores frames addressed to
///   other stream ids;
/// * forwards data payloads as JSON-RPC messages;
/// * reports undecodable, invalid, or non-binary frames as `MalformedMessage`;
/// * ends the connection (flag plus `Disconnected` event) on a reset frame,
///   a websocket close, end of stream, or a read error.
///
/// The writer:
///
/// * first announces the stream with a resume frame;
/// * then wraps each outgoing JSON-RPC message in a data frame, numbering
///   them from 0 with a wrapping counter;
/// * marks the connection disconnected if it has to stop early.
///
/// `connection_label` is only used in diagnostic messages.
pub(crate) fn harness_connection_from_websocket<S>(
    stream: WebSocketStream<S>,
    connection_label: String,
) -> JsonRpcConnection
where
    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    let stream_id = Uuid::new_v4().to_string();
    let (mut websocket_writer, mut websocket_reader) = stream.split();
    let (outgoing_tx, mut outgoing_rx) = mpsc::channel(CHANNEL_CAPACITY);
    let (incoming_tx, incoming_rx) = mpsc::channel(CHANNEL_CAPACITY);
    let (disconnected_tx, disconnected_rx) = watch::channel(false);

    let reader_stream_id = stream_id.clone();
    let reader_disconnected_tx = disconnected_tx.clone();
    let reader_task = tokio::spawn(async move {
        loop {
            match websocket_reader.next().await {
                Some(Ok(Message::Binary(payload))) => {
                    let frame = match decode_relay_message_frame(payload.as_ref()) {
                        Ok(frame) => frame,
                        Err(err) => {
                            let _ = incoming_tx
                                .send(JsonRpcConnectionEvent::MalformedMessage {
                                    reason: format!(
                                        "failed to parse relay message frame from {connection_label}: {err}"
                                    ),
                                })
                                .await;
                            continue;
                        }
                    };
                    // Frames for other streams are not ours to interpret.
                    if frame.stream_id != reader_stream_id {
                        continue;
                    }
                    let kind = match frame.validate() {
                        Ok(kind) => kind,
                        Err(err) => {
                            let _ = incoming_tx
                                .send(JsonRpcConnectionEvent::MalformedMessage {
                                    reason: err.to_string(),
                                })
                                .await;
                            continue;
                        }
                    };
                    match kind {
                        RelayFrameBodyKind::Data => match frame.into_jsonrpc_message() {
                            Ok(message) => {
                                // A closed receiver means the connection owner is gone.
                                if incoming_tx
                                    .send(JsonRpcConnectionEvent::Message(message))
                                    .await
                                    .is_err()
                                {
                                    break;
                                }
                            }
                            Err(err) => {
                                let _ = incoming_tx
                                    .send(JsonRpcConnectionEvent::MalformedMessage {
                                        reason: err.to_string(),
                                    })
                                    .await;
                            }
                        },
                        RelayFrameBodyKind::Reset => {
                            let _ = reader_disconnected_tx.send(true);
                            let _ = incoming_tx
                                .send(JsonRpcConnectionEvent::Disconnected {
                                    reason: frame.into_reset_reason(),
                                })
                                .await;
                            break;
                        }
                        RelayFrameBodyKind::Ack
                        | RelayFrameBodyKind::Resume
                        | RelayFrameBodyKind::Heartbeat => {}
                    }
                }
                Some(Ok(Message::Close(_))) | None => {
                    let _ = reader_disconnected_tx.send(true);
                    let _ = incoming_tx
                        .send(JsonRpcConnectionEvent::Disconnected { reason: None })
                        .await;
                    break;
                }
                Some(Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => {}
                Some(Ok(Message::Text(_))) => {
                    let _ = incoming_tx
                        .send(JsonRpcConnectionEvent::MalformedMessage {
                            reason: "relay exec-server transport expects binary protobuf frames"
                                .to_string(),
                        })
                        .await;
                }
                Some(Err(err)) => {
                    let _ = reader_disconnected_tx.send(true);
                    let _ = incoming_tx
                        .send(JsonRpcConnectionEvent::Disconnected {
                            reason: Some(format!(
                                "failed to read relay websocket frame from {connection_label}: {err}"
                            )),
                        })
                        .await;
                    break;
                }
            }
        }
    });

    let writer_task = tokio::spawn(async move {
        let resume = RelayMessageFrame::resume(stream_id.clone());
        if websocket_writer
            .send(Message::Binary(encode_relay_message_frame(&resume).into()))
            .await
            .is_err()
        {
            let _ = disconnected_tx.send(true);
            return;
        }
        let mut next_seq = 0u32;
        while let Some(message) = outgoing_rx.recv().await {
            let payload = match jsonrpc_payload(&message) {
                Ok(payload) => payload,
                Err(err) => {
                    warn!("failed to serialize JSON-RPC payload for relay transport: {err}");
                    // The writer stops here, so nothing more will ever be sent
                    // on this connection; report that the same way as a failed
                    // websocket send.
                    let _ = disconnected_tx.send(true);
                    break;
                }
            };
            let frame = RelayMessageFrame::data(stream_id.clone(), next_seq, payload);
            next_seq = next_seq.wrapping_add(1);
            if websocket_writer
                .send(Message::Binary(encode_relay_message_frame(&frame).into()))
                .await
                .is_err()
            {
                let _ = disconnected_tx.send(true);
                break;
            }
        }
    });

    JsonRpcConnection {
        outgoing_tx,
        incoming_rx,
        disconnected_rx,
        task_handles: vec![reader_task, writer_task],
        transport: JsonRpcTransport::Plain,
    }
}
/// Serves every relay stream multiplexed over one executor-side websocket.
///
/// Incoming binary frames are decoded and validated. Each distinct
/// `stream_id` gets its own `ConnectionProcessor` session (via
/// `spawn_virtual_stream`), created lazily on that stream's first data frame.
/// All sessions share one writer task that owns the websocket sink. A reset
/// frame tears down the matching session.
///
/// Undecodable, non-binary, or invalid frames are logged and skipped; they do
/// not end the websocket. When the websocket closes, ends, or fails to read,
/// every remaining session is disconnected, and the function then waits for
/// the shared writer task.
///
/// NOTE(review): the writer task only finishes once every session's frame
/// writer has released its clone of the outgoing sender (presumably when its
/// processor exits), so returning depends on sessions shutting down after
/// being disconnected.
pub(crate) async fn run_multiplexed_executor<S>(
    stream: WebSocketStream<S>,
    processor: ConnectionProcessor,
) where
    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    let (mut websocket_writer, mut websocket_reader) = stream.split();
    let (physical_outgoing_tx, mut physical_outgoing_rx) =
        mpsc::channel::<Vec<u8>>(CHANNEL_CAPACITY);
    let writer_task = tokio::spawn(async move {
        while let Some(encoded) = physical_outgoing_rx.recv().await {
            if websocket_writer
                .send(Message::Binary(encoded.into()))
                .await
                .is_err()
            {
                break;
            }
        }
    });

    let mut streams: HashMap<String, VirtualStream> = HashMap::new();
    loop {
        let frame = match websocket_reader.next().await {
            Some(Ok(Message::Binary(payload))) => {
                match decode_relay_message_frame(payload.as_ref()) {
                    Ok(frame) => frame,
                    Err(err) => {
                        warn!("dropping malformed relay message frame from harness: {err}");
                        continue;
                    }
                }
            }
            Some(Ok(Message::Close(_))) | None => break,
            Some(Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => continue,
            Some(Ok(Message::Text(_))) => {
                warn!("dropping non-binary relay message frame from harness");
                continue;
            }
            Some(Err(err)) => {
                debug!("multiplexed executor websocket read failed: {err}");
                break;
            }
        };
        let kind = match frame.validate() {
            Ok(kind) => kind,
            Err(err) => {
                warn!("dropping invalid relay message frame: {err}");
                continue;
            }
        };
        match kind {
            RelayFrameBodyKind::Data => {
                let stream_id = frame.stream_id.clone();
                let message = match frame.into_jsonrpc_message() {
                    Ok(message) => message,
                    Err(err) => {
                        warn!("dropping malformed relay data message frame: {err}");
                        continue;
                    }
                };
                let stream = streams.entry(stream_id.clone()).or_insert_with(|| {
                    spawn_virtual_stream(
                        stream_id.clone(),
                        processor.clone(),
                        physical_outgoing_tx.clone(),
                    )
                });
                // This await happens on the shared read loop: while one
                // session's bounded incoming queue is full, frames for every
                // other stream wait as well.
                if stream
                    .incoming_tx
                    .send(JsonRpcConnectionEvent::Message(message))
                    .await
                    .is_err()
                {
                    // The session dropped its receiver, so this frame cannot be
                    // delivered. Say so instead of discarding it silently, and
                    // forget the stale entry so a later data frame on this
                    // stream id starts a new session.
                    warn!("dropping relay data message frame for closed virtual stream {stream_id}");
                    streams.remove(&stream_id);
                }
            }
            RelayFrameBodyKind::Reset => match streams.remove(&frame.stream_id) {
                Some(stream) => stream.disconnect(frame.into_reset_reason()).await,
                None => debug!(
                    "ignoring relay reset for unknown virtual stream {}",
                    frame.stream_id
                ),
            },
            RelayFrameBodyKind::Ack
            | RelayFrameBodyKind::Resume
            | RelayFrameBodyKind::Heartbeat => {}
        }
    }

    for (_stream_id, stream) in streams {
        stream.disconnect(/*reason*/ None).await;
    }
    // Release this function's sender so the writer can drain and exit.
    drop(physical_outgoing_tx);
    let _ = writer_task.await;
}
/// Executor-side handle for one multiplexed relay stream: the channels used
/// to feed, and to tear down, the `ConnectionProcessor` session serving it.
struct VirtualStream {
    /// Delivers JSON-RPC connection events to the session.
    incoming_tx: mpsc::Sender<JsonRpcConnectionEvent>,
    /// Set to `true` when the session is torn down.
    disconnected_tx: watch::Sender<bool>,
}

impl VirtualStream {
    /// Tears the session down.
    ///
    /// Raises the disconnected flag first, then queues a `Disconnected` event
    /// carrying `reason`. Both send results are ignored; for example, the
    /// session may already have exited.
    async fn disconnect(self, reason: Option<String>) {
        let Self {
            incoming_tx,
            disconnected_tx,
        } = self;
        let _ = disconnected_tx.send(true);
        let event = JsonRpcConnectionEvent::Disconnected { reason };
        let _ = incoming_tx.send(event).await;
    }
}
/// Starts a `ConnectionProcessor` session for `stream_id` and returns the
/// handle used to feed it and tear it down.
///
/// The session's outgoing messages are handled by a frame-writer task that:
///
/// * serializes each JSON-RPC message and wraps it in a data frame;
/// * numbers the frames from 0 with a wrapping counter;
/// * encodes each frame and queues it on `physical_outgoing_tx` for the
///   shared websocket writer.
///
/// The frame writer stops on the first serialization failure or once the
/// shared queue is closed. The processor runs on its own detached task.
fn spawn_virtual_stream(
    stream_id: String,
    processor: ConnectionProcessor,
    physical_outgoing_tx: mpsc::Sender<Vec<u8>>,
) -> VirtualStream {
    let (session_outgoing_tx, mut session_outgoing_rx) = mpsc::channel(CHANNEL_CAPACITY);
    let (incoming_tx, incoming_rx) = mpsc::channel(CHANNEL_CAPACITY);
    let (disconnected_tx, disconnected_rx) = watch::channel(false);

    let frame_writer = tokio::spawn(async move {
        let mut seq: u32 = 0;
        while let Some(message) = session_outgoing_rx.recv().await {
            let payload = match jsonrpc_payload(&message) {
                Ok(bytes) => bytes,
                Err(err) => {
                    warn!("failed to serialize virtual stream JSON-RPC payload: {err}");
                    break;
                }
            };
            let encoded =
                encode_relay_message_frame(&RelayMessageFrame::data(stream_id.clone(), seq, payload));
            seq = seq.wrapping_add(1);
            // A closed queue means the shared websocket writer has exited.
            let queued = physical_outgoing_tx.send(encoded).await;
            if queued.is_err() {
                break;
            }
        }
    });

    let session = JsonRpcConnection {
        outgoing_tx: session_outgoing_tx,
        incoming_rx,
        disconnected_rx,
        task_handles: vec![frame_writer],
        transport: JsonRpcTransport::Plain,
    };
    tokio::spawn(async move {
        processor.run_connection(session).await;
    });

    VirtualStream {
        incoming_tx,
        disconnected_tx,
    }
}