Files
codex/codex-rs/exec-server/tests/relay.rs
Anton Panasenko ac466c0dbd feat(exec-server): use protobuf relay frames (#22343)
## Why

Remote exec-server now needs one executor websocket to serve multiple
harness JSON-RPC sessions. Rendezvous routes by `stream_id`, and the
exec-server side needs to use the same stable relay frame contract
instead of a hand-rolled JSON shape.

The relay protocol also needs to make ownership boundaries clear:
harness and executor endpoints own sequencing, acks, retries, duplicate
suppression, segmentation, and reassembly; rendezvous only routes
frames.

## What Changed

- Add the checked-in `codex.exec_server.relay.v1.RelayMessageFrame`
proto plus generated prost bindings for `codex-exec-server`.
- Encode remote harness/executor relay traffic as binary protobuf
websocket frames while keeping local websocket JSON-RPC unchanged.
- Demux executor-side relay streams into independent
`ConnectionProcessor` sessions keyed by `stream_id`.
- Add a programmatic `RemoteExecutorConfig::with_bearer_token(...)`
constructor for non-CLI callers and integration tests.
- Add an integration test that starts the remote executor against a fake
registry/rendezvous websocket and verifies two virtual streams share one
executor websocket without cross-talk, including per-stream reset
behavior.
- Document the remote relay envelope, sequence ranges, `ack`/`ack_bits`,
and endpoint responsibilities in `exec-server/README.md`.

## Verification

- `cargo test -p codex-exec-server --test relay
multiplexed_remote_executor_routes_independent_virtual_streams --
--exact`
- `cargo test -p codex-exec-server --test relay`
- `cargo test -p codex-exec-server` passed outside the sandbox. The
sandboxed run hit macOS `sandbox-exec: sandbox_apply: Operation not
permitted` in filesystem sandbox tests.
2026-05-12 16:50:45 -07:00

354 lines
10 KiB
Rust

mod common;
#[path = "../src/proto/codex.exec_server.relay.v1.rs"]
mod relay_proto;
use std::collections::HashMap;
use std::time::Duration;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_exec_server::ExecServerRuntimePaths;
use codex_exec_server::InitializeParams;
use codex_exec_server::InitializeResponse;
use codex_exec_server::RemoteExecutorConfig;
use futures::SinkExt;
use futures::StreamExt;
use pretty_assertions::assert_eq;
use prost::Message as ProstMessage;
use relay_proto::RelayData;
use relay_proto::RelayMessageFrame;
use relay_proto::RelayReset;
use relay_proto::relay_message_frame;
use tokio::net::TcpListener;
use tokio::time::timeout;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::accept_async;
use tokio_tungstenite::tungstenite::Message;
use uuid::Uuid;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::header;
use wiremock::matchers::method;
use wiremock::matchers::path;
const EXECUTOR_ID: &str = "exec-mux-test";
const REGISTRY_TOKEN: &str = "registry-token";
const RELAY_MESSAGE_FRAME_VERSION: u32 = 1;
const TEST_TIMEOUT: Duration = Duration::from_secs(5);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn multiplexed_remote_executor_routes_independent_virtual_streams() -> Result<()> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let rendezvous_url = format!("ws://{}", listener.local_addr()?);
let registry = MockServer::start().await;
Mock::given(method("POST"))
.and(path(format!("/cloud/executor/{EXECUTOR_ID}/register")))
.and(header("authorization", format!("Bearer {REGISTRY_TOKEN}")))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"executor_id": EXECUTOR_ID,
"url": rendezvous_url,
})))
.mount(&registry)
.await;
let (codex_exe, codex_linux_sandbox_exe) = common::current_test_binary_helper_paths()?;
let runtime_paths = ExecServerRuntimePaths::new(codex_exe, codex_linux_sandbox_exe)?;
let config = RemoteExecutorConfig::with_bearer_token(
registry.uri(),
EXECUTOR_ID.to_string(),
REGISTRY_TOKEN.to_string(),
)?;
let remote_executor = tokio::spawn(codex_exec_server::run_remote_executor(
config,
runtime_paths,
));
let (socket, _peer_addr) = timeout(TEST_TIMEOUT, listener.accept())
.await
.context("remote executor should connect to fake rendezvous")??;
let mut websocket = timeout(TEST_TIMEOUT, accept_async(socket))
.await
.context("fake rendezvous should accept executor websocket")??;
let stream_a = "stream-a";
let stream_b = "stream-b";
send_relay_message(
&mut websocket,
stream_a,
/*seq*/ 0,
initialize_request(/*id*/ 1, "relay-test-a")?,
)
.await?;
send_relay_message(
&mut websocket,
stream_b,
/*seq*/ 0,
initialize_request(/*id*/ 1, "relay-test-b")?,
)
.await?;
let initialize_responses = read_relay_messages_by_stream(&mut websocket, /*count*/ 2).await?;
let session_a =
assert_initialize_response(initialize_responses.get(stream_a), stream_a, /*id*/ 1)?;
let session_b =
assert_initialize_response(initialize_responses.get(stream_b), stream_b, /*id*/ 1)?;
assert_ne!(session_a, session_b);
send_relay_message(
&mut websocket,
stream_a,
/*seq*/ 1,
notification("initialized", serde_json::json!({})),
)
.await?;
send_relay_message(
&mut websocket,
stream_b,
/*seq*/ 1,
notification("initialized", serde_json::json!({})),
)
.await?;
send_relay_message(
&mut websocket,
stream_a,
/*seq*/ 2,
request(/*id*/ 2, "test/unknown-a", serde_json::json!({})),
)
.await?;
send_relay_message(
&mut websocket,
stream_b,
/*seq*/ 2,
request(/*id*/ 2, "test/unknown-b", serde_json::json!({})),
)
.await?;
let unknown_method_responses =
read_relay_messages_by_stream(&mut websocket, /*count*/ 2).await?;
assert_error_response(
unknown_method_responses.get(stream_a),
stream_a,
/*id*/ 2,
"test/unknown-a",
)?;
assert_error_response(
unknown_method_responses.get(stream_b),
stream_b,
/*id*/ 2,
"test/unknown-b",
)?;
send_relay_reset(&mut websocket, stream_a, "test_reset").await?;
send_relay_message(
&mut websocket,
stream_b,
/*seq*/ 3,
request(
/*id*/ 3,
"test/unknown-b-after-reset",
serde_json::json!({}),
),
)
.await?;
let (stream_id, message) = read_relay_message(&mut websocket).await?;
assert_eq!(stream_id, stream_b);
assert_error_response(
Some(&message),
stream_b,
/*id*/ 3,
"test/unknown-b-after-reset",
)?;
websocket.close(None).await?;
remote_executor.abort();
let _ = remote_executor.await;
Ok(())
}
async fn send_relay_message<S>(
websocket: &mut WebSocketStream<S>,
stream_id: &str,
seq: u32,
message: JSONRPCMessage,
) -> Result<()>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
let payload = serde_json::to_vec(&message)?;
let frame = RelayMessageFrame {
version: RELAY_MESSAGE_FRAME_VERSION,
stream_id: stream_id.to_string(),
ack: 0,
ack_bits: 0,
body: Some(relay_message_frame::Body::Data(RelayData {
seq,
segment_index: 0,
segment_count: 1,
payload,
})),
};
send_relay_frame(websocket, frame).await
}
async fn send_relay_reset<S>(
websocket: &mut WebSocketStream<S>,
stream_id: &str,
reason: &str,
) -> Result<()>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
send_relay_frame(
websocket,
RelayMessageFrame {
version: RELAY_MESSAGE_FRAME_VERSION,
stream_id: stream_id.to_string(),
ack: 0,
ack_bits: 0,
body: Some(relay_message_frame::Body::Reset(RelayReset {
reason: reason.to_string(),
})),
},
)
.await
}
async fn send_relay_frame<S>(
websocket: &mut WebSocketStream<S>,
frame: RelayMessageFrame,
) -> Result<()>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
websocket
.send(Message::Binary(frame.encode_to_vec().into()))
.await?;
Ok(())
}
async fn read_relay_messages_by_stream<S>(
websocket: &mut WebSocketStream<S>,
count: usize,
) -> Result<HashMap<String, JSONRPCMessage>>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
let mut messages = HashMap::new();
for _ in 0..count {
let (stream_id, message) = read_relay_message(websocket).await?;
if messages.insert(stream_id.clone(), message).is_some() {
bail!("received duplicate response for stream {stream_id}");
}
}
Ok(messages)
}
async fn read_relay_message<S>(
websocket: &mut WebSocketStream<S>,
) -> Result<(String, JSONRPCMessage)>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
loop {
let frame = timeout(TEST_TIMEOUT, websocket.next())
.await
.context("timed out waiting for relay frame")?
.ok_or_else(|| anyhow!("executor websocket closed"))??;
match frame {
Message::Binary(bytes) => {
let frame = RelayMessageFrame::decode(bytes.as_ref())?;
let stream_id = frame.stream_id;
let Some(relay_message_frame::Body::Data(data)) = frame.body else {
continue;
};
let message = serde_json::from_slice(&data.payload)?;
return Ok((stream_id, message));
}
Message::Ping(_) | Message::Pong(_) => {}
Message::Close(_) => bail!("executor websocket closed"),
Message::Text(_) => bail!("executor sent text frame on relay websocket"),
Message::Frame(_) => {}
}
}
}
fn initialize_request(id: i64, client_name: &str) -> Result<JSONRPCMessage> {
Ok(request(
id,
"initialize",
serde_json::to_value(InitializeParams {
client_name: client_name.to_string(),
resume_session_id: None,
})?,
))
}
fn request(id: i64, method: &str, params: serde_json::Value) -> JSONRPCMessage {
JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(id),
method: method.to_string(),
params: Some(params),
trace: None,
})
}
fn notification(method: &str, params: serde_json::Value) -> JSONRPCMessage {
JSONRPCMessage::Notification(JSONRPCNotification {
method: method.to_string(),
params: Some(params),
})
}
fn assert_initialize_response(
message: Option<&JSONRPCMessage>,
stream_id: &str,
id: i64,
) -> Result<Uuid> {
let message = message.ok_or_else(|| anyhow!("missing initialize response for {stream_id}"))?;
let JSONRPCMessage::Response(JSONRPCResponse {
id: response_id,
result,
}) = message
else {
bail!("expected initialize response for {stream_id}, got {message:?}");
};
assert_eq!(response_id, &RequestId::Integer(id));
let response: InitializeResponse = serde_json::from_value(result.clone())?;
Ok(Uuid::parse_str(&response.session_id)?)
}
fn assert_error_response(
message: Option<&JSONRPCMessage>,
stream_id: &str,
id: i64,
expected_method: &str,
) -> Result<()> {
let message = message.ok_or_else(|| anyhow!("missing error response for {stream_id}"))?;
let JSONRPCMessage::Error(JSONRPCError {
id: response_id,
error,
}) = message
else {
bail!("expected error response for {stream_id}, got {message:?}");
};
assert_eq!(response_id, &RequestId::Integer(id));
assert!(
error.message.contains(expected_method),
"expected error for {stream_id} to mention {expected_method}, got {}",
error.message
);
Ok(())
}