From 211ce0955df212e299ef45c30c4f2204c86b8174 Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Tue, 12 May 2026 11:29:03 -0700 Subject: [PATCH] feat(exec-server): use proto relay frames --- codex-rs/Cargo.lock | 1 + codex-rs/exec-server/Cargo.toml | 1 + codex-rs/exec-server/README.md | 60 +++- codex-rs/exec-server/src/lib.rs | 1 + .../proto/codex.exec_server.relay.v1.proto | 37 +++ .../src/proto/codex.exec_server.relay.v1.rs | 54 ++++ codex-rs/exec-server/src/relay.rs | 261 +++++++++--------- codex-rs/exec-server/src/relay_proto.rs | 7 + 8 files changed, 287 insertions(+), 135 deletions(-) create mode 100644 codex-rs/exec-server/src/proto/codex.exec_server.relay.v1.proto create mode 100644 codex-rs/exec-server/src/proto/codex.exec_server.relay.v1.rs create mode 100644 codex-rs/exec-server/src/relay_proto.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index bbb61f8337..ddc6bc829b 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2740,6 +2740,7 @@ dependencies = [ "ctor 0.6.3", "futures", "pretty_assertions", + "prost 0.14.3", "reqwest", "serde", "serde_json", diff --git a/codex-rs/exec-server/Cargo.toml b/codex-rs/exec-server/Cargo.toml index 936fa412f1..09a9a71ea0 100644 --- a/codex-rs/exec-server/Cargo.toml +++ b/codex-rs/exec-server/Cargo.toml @@ -26,6 +26,7 @@ codex-utils-pty = { workspace = true } codex-utils-rustls-provider = { workspace = true } futures = { workspace = true } reqwest = { workspace = true, features = ["json", "rustls-tls", "stream"] } +prost = "0.14.3" serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } thiserror = { workspace = true } diff --git a/codex-rs/exec-server/README.md b/codex-rs/exec-server/README.md index 81664eaca0..1eaf6e69eb 100644 --- a/codex-rs/exec-server/README.md +++ b/codex-rs/exec-server/README.md @@ -30,7 +30,65 @@ It requires a bearer token in `CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN`. Wire framing: -- websocket: one JSON-RPC message per websocket text frame +- local websocket: one JSON-RPC message per websocket frame +- remote websocket: binary protobuf relay frames carrying JSON-RPC payloads + +## Remote Relay Message Format + +In remote mode, the harness and executor communicate through rendezvous using +`codex.exec_server.relay.v1.RelayMessageFrame`; the checked-in schema is in +`src/proto/codex.exec_server.relay.v1.proto`. The relay frame carries stream +identity plus endpoint-owned reliability metadata: + +```text +version +stream_id +body // data | ack_frame | resume | reset | heartbeat +ack // highest contiguous peer segment seq received +ack_bits // bitset for peer segment seqs after ack +seq // data only: segment sequence number +segment_index // data only: 0-based index within message +segment_count // data only: number of segments in message +payload // data only: JSON-RPC message bytes or segment bytes +next_seq // resume only: next sender seq +reason // reset only: reset reason +``` + +`stream_id` identifies one virtual harness/executor JSON-RPC session on the +executor websocket. The harness generates a UUIDv4 `stream_id`; the executor +demuxes frames by `stream_id` and runs an independent `ConnectionProcessor` per +stream. + +Use segment-level sequence numbers for reliability: + +```text +seq = 0, 1, 2, 3, ... +``` + +Use contiguous segment sequence ranges to identify and stitch a segmented +application message: + +```text +message_start_seq = seq - segment_index +segment_index = 0 +segment_count = 1 +``` + +`message_start_seq` is derived by the receiver, not sent on the wire. For +unsplit messages, `message_start_seq == seq`, `segment_index == 0`, and +`segment_count == 1`. + +Use cumulative `ack` plus fixed-size `ack_bits` instead of variable ack ranges: + +```text +ack = highest contiguous received segment seq +bit i in ack_bits acknowledges seq = ack + 1 + i +``` + +Send `ack` and `ack_bits` redundantly on every outbound frame. Acks are not +themselves acked. Acks, retries, duplicate suppression, segmentation, and +reassembly are endpoint responsibilities; rendezvous only routes relay frames +by `stream_id`. ## Lifecycle diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 9fbca92d7c..872f16ce32 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -14,6 +14,7 @@ mod process; mod process_id; mod protocol; mod relay; +mod relay_proto; mod remote; mod remote_file_system; mod remote_process; diff --git a/codex-rs/exec-server/src/proto/codex.exec_server.relay.v1.proto b/codex-rs/exec-server/src/proto/codex.exec_server.relay.v1.proto new file mode 100644 index 0000000000..46527d80cc --- /dev/null +++ b/codex-rs/exec-server/src/proto/codex.exec_server.relay.v1.proto @@ -0,0 +1,37 @@ +syntax = "proto3"; + +package codex.exec_server.relay.v1; + +message RelayMessageFrame { + uint32 version = 1; + string stream_id = 2; + uint32 ack = 3; + uint32 ack_bits = 4; + + oneof body { + RelayData data = 5; + RelayAck ack_frame = 6; + RelayResume resume = 7; + RelayReset reset = 8; + RelayHeartbeat heartbeat = 9; + } +} + +message RelayData { + uint32 seq = 1; + uint32 segment_index = 2; + uint32 segment_count = 3; + bytes payload = 4; +} + +message RelayAck {} + +message RelayResume { + uint32 next_seq = 1; +} + +message RelayReset { + string reason = 1; +} + +message RelayHeartbeat {} diff --git a/codex-rs/exec-server/src/proto/codex.exec_server.relay.v1.rs b/codex-rs/exec-server/src/proto/codex.exec_server.relay.v1.rs new file mode 100644 index 0000000000..072a003dac --- /dev/null +++ b/codex-rs/exec-server/src/proto/codex.exec_server.relay.v1.rs @@ -0,0 +1,54 @@ +// This file is @generated by prost-build. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RelayMessageFrame { + #[prost(uint32, tag = "1")] + pub version: u32, + #[prost(string, tag = "2")] + pub stream_id: ::prost::alloc::string::String, + #[prost(uint32, tag = "3")] + pub ack: u32, + #[prost(uint32, tag = "4")] + pub ack_bits: u32, + #[prost(oneof = "relay_message_frame::Body", tags = "5, 6, 7, 8, 9")] + pub body: ::core::option::Option, +} +pub mod relay_message_frame { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Body { + #[prost(message, tag = "5")] + Data(super::RelayData), + #[prost(message, tag = "6")] + AckFrame(super::RelayAck), + #[prost(message, tag = "7")] + Resume(super::RelayResume), + #[prost(message, tag = "8")] + Reset(super::RelayReset), + #[prost(message, tag = "9")] + Heartbeat(super::RelayHeartbeat), + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RelayData { + #[prost(uint32, tag = "1")] + pub seq: u32, + #[prost(uint32, tag = "2")] + pub segment_index: u32, + #[prost(uint32, tag = "3")] + pub segment_count: u32, + #[prost(bytes = "vec", tag = "4")] + pub payload: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct RelayAck {} +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct RelayResume { + #[prost(uint32, tag = "1")] + pub next_seq: u32, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct RelayReset { + #[prost(string, tag = "1")] + pub reason: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct RelayHeartbeat {} diff --git a/codex-rs/exec-server/src/relay.rs b/codex-rs/exec-server/src/relay.rs index ac1dc1bc67..bce787cfc2 100644 --- a/codex-rs/exec-server/src/relay.rs +++ b/codex-rs/exec-server/src/relay.rs @@ -1,12 +1,9 @@ use std::collections::HashMap; -use base64::Engine; -use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; use codex_app_server_protocol::JSONRPCMessage; use futures::SinkExt; use futures::StreamExt; -use serde::Deserialize; -use serde::Serialize; +use prost::Message as ProstMessage; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::sync::mpsc; @@ -22,12 +19,16 @@ use crate::connection::CHANNEL_CAPACITY; use crate::connection::JsonRpcConnection; use crate::connection::JsonRpcConnectionEvent; use crate::connection::JsonRpcTransport; +use crate::relay_proto::RelayData; +use crate::relay_proto::RelayMessageFrame; +use crate::relay_proto::RelayResume; +use crate::relay_proto::relay_message_frame; use crate::server::ConnectionProcessor; const RELAY_MESSAGE_FRAME_VERSION: u32 = 1; -#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)] -#[serde(rename_all = "snake_case")] -enum RelayMessageFrameKind { + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +enum RelayFrameBodyKind { Data, Ack, Resume, @@ -35,39 +36,19 @@ enum RelayMessageFrameKind { Heartbeat, } -#[derive(Debug, Clone, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -struct RelayMessageFrame { - version: u32, - stream_id: String, - kind: RelayMessageFrameKind, - ack: u32, - ack_bits: u32, - #[serde(skip_serializing_if = "Option::is_none")] - seq: Option, - #[serde(skip_serializing_if = "Option::is_none")] - segment_index: Option, - #[serde(skip_serializing_if = "Option::is_none")] - segment_count: Option, - #[serde(skip_serializing_if = "Option::is_none")] - payload_base64: Option, - #[serde(skip_serializing_if = "Option::is_none")] - reason: Option, -} - impl RelayMessageFrame { fn data(stream_id: String, seq: u32, payload: Vec) -> Self { Self { version: RELAY_MESSAGE_FRAME_VERSION, stream_id, - kind: RelayMessageFrameKind::Data, ack: 0, ack_bits: 0, - seq: Some(seq), - segment_index: Some(0), - segment_count: Some(1), - payload_base64: Some(BASE64_STANDARD.encode(payload)), - reason: None, + body: Some(relay_message_frame::Body::Data(RelayData { + seq, + segment_index: 0, + segment_count: 1, + payload, + })), } } @@ -75,18 +56,15 @@ impl RelayMessageFrame { Self { version: RELAY_MESSAGE_FRAME_VERSION, stream_id, - kind: RelayMessageFrameKind::Resume, ack: 0, ack_bits: 0, - seq: None, - segment_index: None, - segment_count: None, - payload_base64: None, - reason: None, + body: Some(relay_message_frame::Body::Resume(RelayResume { + next_seq: 0, + })), } } - fn validate(&self) -> Result<(), ExecServerError> { + fn validate(&self) -> Result { if self.version != RELAY_MESSAGE_FRAME_VERSION { return Err(ExecServerError::Protocol(format!( "unsupported relay message frame version {}", @@ -95,43 +73,66 @@ impl RelayMessageFrame { } if self.stream_id.trim().is_empty() { return Err(ExecServerError::Protocol( - "relay message frame is missing streamId".to_string(), + "relay message frame is missing stream_id".to_string(), )); } - if self.kind == RelayMessageFrameKind::Data - && (self.seq.is_none() - || self.segment_index != Some(0) - || self.segment_count != Some(1) - || self.payload_base64.is_none()) - { - return Err(ExecServerError::Protocol( - "relay data message frame is missing required fields".to_string(), - )); + match self.body.as_ref() { + Some(relay_message_frame::Body::Data(data)) => { + if data.segment_index != 0 || data.segment_count != 1 || data.payload.is_empty() { + return Err(ExecServerError::Protocol( + "relay data message frame is missing required fields".to_string(), + )); + } + Ok(RelayFrameBodyKind::Data) + } + Some(relay_message_frame::Body::AckFrame(_)) => Ok(RelayFrameBodyKind::Ack), + Some(relay_message_frame::Body::Resume(_)) => Ok(RelayFrameBodyKind::Resume), + Some(relay_message_frame::Body::Reset(reset)) => { + if reset.reason.is_empty() { + return Err(ExecServerError::Protocol( + "relay reset message frame is missing reason".to_string(), + )); + } + Ok(RelayFrameBodyKind::Reset) + } + Some(relay_message_frame::Body::Heartbeat(_)) => Ok(RelayFrameBodyKind::Heartbeat), + None => Err(ExecServerError::Protocol( + "relay message frame is missing body".to_string(), + )), } - if self.kind == RelayMessageFrameKind::Reset && self.reason.is_none() { - return Err(ExecServerError::Protocol( - "relay reset message frame is missing reason".to_string(), - )); - } - Ok(()) } fn into_jsonrpc_message(self) -> Result { - self.validate()?; - if self.kind != RelayMessageFrameKind::Data { + let kind = self.validate()?; + if kind != RelayFrameBodyKind::Data { return Err(ExecServerError::Protocol( "expected relay data message frame".to_string(), )); } - let payload = BASE64_STANDARD - .decode(self.payload_base64.unwrap_or_default()) - .map_err(|err| ExecServerError::Protocol(format!("invalid payloadBase64: {err}")))?; + let payload = match self.body { + Some(relay_message_frame::Body::Data(data)) => data.payload, + _ => Vec::new(), + }; serde_json::from_slice(&payload).map_err(ExecServerError::Json) } + + fn into_reset_reason(self) -> Option { + match self.body { + Some(relay_message_frame::Body::Reset(reset)) if !reset.reason.is_empty() => { + Some(reset.reason) + } + _ => None, + } + } } -fn serialize_relay_message_frame(frame: &RelayMessageFrame) -> Result { - serde_json::to_string(frame).map_err(ExecServerError::Json) +fn encode_relay_message_frame(frame: &RelayMessageFrame) -> Vec { + frame.encode_to_vec() +} + +fn decode_relay_message_frame(payload: &[u8]) -> Result { + RelayMessageFrame::decode(payload) + .map_err(|err| ExecServerError::Protocol(format!("invalid relay message frame: {err}"))) } fn jsonrpc_payload(message: &JSONRPCMessage) -> Result, ExecServerError> { @@ -158,8 +159,8 @@ where let reader_task = tokio::spawn(async move { loop { match websocket_reader.next().await { - Some(Ok(Message::Text(text))) => { - let frame = match serde_json::from_str::(text.as_ref()) { + Some(Ok(Message::Binary(payload))) => { + let frame = match decode_relay_message_frame(payload.as_ref()) { Ok(frame) => frame, Err(err) => { let _ = incoming_tx_for_reader @@ -175,8 +176,19 @@ where if frame.stream_id != reader_stream_id { continue; } - match frame.kind { - RelayMessageFrameKind::Data => match frame.into_jsonrpc_message() { + let kind = match frame.validate() { + Ok(kind) => kind, + Err(err) => { + let _ = incoming_tx_for_reader + .send(JsonRpcConnectionEvent::MalformedMessage { + reason: err.to_string(), + }) + .await; + continue; + } + }; + match kind { + RelayFrameBodyKind::Data => match frame.into_jsonrpc_message() { Ok(message) => { if incoming_tx_for_reader .send(JsonRpcConnectionEvent::Message(message)) @@ -194,18 +206,18 @@ where .await; } }, - RelayMessageFrameKind::Reset => { + RelayFrameBodyKind::Reset => { let _ = disconnected_tx_for_reader.send(true); let _ = incoming_tx_for_reader .send(JsonRpcConnectionEvent::Disconnected { - reason: frame.reason, + reason: frame.into_reset_reason(), }) .await; break; } - RelayMessageFrameKind::Ack - | RelayMessageFrameKind::Resume - | RelayMessageFrameKind::Heartbeat => {} + RelayFrameBodyKind::Ack + | RelayFrameBodyKind::Resume + | RelayFrameBodyKind::Heartbeat => {} } } Some(Ok(Message::Close(_))) | None => { @@ -216,10 +228,10 @@ where break; } Some(Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => {} - Some(Ok(Message::Binary(_))) => { + Some(Ok(Message::Text(_))) => { let _ = incoming_tx_for_reader .send(JsonRpcConnectionEvent::MalformedMessage { - reason: "relay exec-server transport expects JSON text frames" + reason: "relay exec-server transport expects binary protobuf frames" .to_string(), }) .await; @@ -241,22 +253,13 @@ where let writer_task = tokio::spawn(async move { let resume = RelayMessageFrame::resume(stream_id.clone()); - match serialize_relay_message_frame(&resume) { - Ok(encoded) => { - if websocket_writer - .send(Message::Text(encoded.into())) - .await - .is_err() - { - let _ = disconnected_tx.send(true); - return; - } - } - Err(err) => { - warn!("failed to serialize relay resume frame: {err}"); - let _ = disconnected_tx.send(true); - return; - } + if websocket_writer + .send(Message::Binary(encode_relay_message_frame(&resume).into())) + .await + .is_err() + { + let _ = disconnected_tx.send(true); + return; } let mut next_seq = 0u32; @@ -270,22 +273,13 @@ where }; let frame = RelayMessageFrame::data(stream_id.clone(), next_seq, payload); next_seq = next_seq.wrapping_add(1); - match serialize_relay_message_frame(&frame) { - Ok(encoded) => { - if websocket_writer - .send(Message::Text(encoded.into())) - .await - .is_err() - { - let _ = disconnected_tx.send(true); - break; - } - } - Err(err) => { - warn!("failed to serialize relay data message frame: {err}"); - let _ = disconnected_tx.send(true); - break; - } + if websocket_writer + .send(Message::Binary(encode_relay_message_frame(&frame).into())) + .await + .is_err() + { + let _ = disconnected_tx.send(true); + break; } } }); @@ -307,11 +301,11 @@ pub(crate) async fn run_multiplexed_executor( { let (mut websocket_writer, mut websocket_reader) = stream.split(); let (physical_outgoing_tx, mut physical_outgoing_rx) = - mpsc::channel::(CHANNEL_CAPACITY); + mpsc::channel::>(CHANNEL_CAPACITY); let writer_task = tokio::spawn(async move { while let Some(encoded) = physical_outgoing_rx.recv().await { if websocket_writer - .send(Message::Text(encoded.into())) + .send(Message::Binary(encoded.into())) .await .is_err() { @@ -323,8 +317,8 @@ pub(crate) async fn run_multiplexed_executor( let mut streams: HashMap = HashMap::new(); loop { let frame = match websocket_reader.next().await { - Some(Ok(Message::Text(text))) => { - match serde_json::from_str::(text.as_ref()) { + Some(Ok(Message::Binary(payload))) => { + match decode_relay_message_frame(payload.as_ref()) { Ok(frame) => frame, Err(err) => { warn!("dropping malformed relay message frame from harness: {err}"); @@ -334,8 +328,8 @@ pub(crate) async fn run_multiplexed_executor( } Some(Ok(Message::Close(_))) | None => break, Some(Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => continue, - Some(Ok(Message::Binary(_))) => { - warn!("dropping non-text relay message frame from harness"); + Some(Ok(Message::Text(_))) => { + warn!("dropping non-binary relay message frame from harness"); continue; } Some(Err(err)) => { @@ -344,13 +338,16 @@ pub(crate) async fn run_multiplexed_executor( } }; - if let Err(err) = frame.validate() { - warn!("dropping invalid relay message frame: {err}"); - continue; - } + let kind = match frame.validate() { + Ok(kind) => kind, + Err(err) => { + warn!("dropping invalid relay message frame: {err}"); + continue; + } + }; - match frame.kind { - RelayMessageFrameKind::Data => { + match kind { + RelayFrameBodyKind::Data => { let stream_id = frame.stream_id.clone(); let message = match frame.into_jsonrpc_message() { Ok(message) => message, @@ -375,14 +372,14 @@ pub(crate) async fn run_multiplexed_executor( streams.remove(&stream_id); } } - RelayMessageFrameKind::Reset => { + RelayFrameBodyKind::Reset => { if let Some(stream) = streams.remove(&frame.stream_id) { - stream.disconnect(frame.reason).await; + stream.disconnect(frame.into_reset_reason()).await; } } - RelayMessageFrameKind::Ack - | RelayMessageFrameKind::Resume - | RelayMessageFrameKind::Heartbeat => {} + RelayFrameBodyKind::Ack + | RelayFrameBodyKind::Resume + | RelayFrameBodyKind::Heartbeat => {} } } @@ -411,7 +408,7 @@ impl VirtualStream { fn spawn_virtual_stream( stream_id: String, processor: ConnectionProcessor, - physical_outgoing_tx: mpsc::Sender, + physical_outgoing_tx: mpsc::Sender>, ) -> VirtualStream { let (json_outgoing_tx, mut json_outgoing_rx) = mpsc::channel(CHANNEL_CAPACITY); let (incoming_tx, incoming_rx) = mpsc::channel(CHANNEL_CAPACITY); @@ -430,16 +427,12 @@ fn spawn_virtual_stream( }; let frame = RelayMessageFrame::data(writer_stream_id.clone(), next_seq, payload); next_seq = next_seq.wrapping_add(1); - match serialize_relay_message_frame(&frame) { - Ok(encoded) => { - if physical_outgoing_tx.send(encoded).await.is_err() { - break; - } - } - Err(err) => { - warn!("failed to serialize virtual stream relay message frame: {err}"); - break; - } + if physical_outgoing_tx + .send(encode_relay_message_frame(&frame)) + .await + .is_err() + { + break; } } }); diff --git a/codex-rs/exec-server/src/relay_proto.rs b/codex-rs/exec-server/src/relay_proto.rs new file mode 100644 index 0000000000..b8a938b8c7 --- /dev/null +++ b/codex-rs/exec-server/src/relay_proto.rs @@ -0,0 +1,7 @@ +#[path = "proto/codex.exec_server.relay.v1.rs"] +mod generated; + +pub(crate) use generated::RelayData; +pub(crate) use generated::RelayMessageFrame; +pub(crate) use generated::RelayResume; +pub(crate) use generated::relay_message_frame;