From 3673b69a2ab5f10edaf10ba8e36db29463198840 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Mon, 18 May 2026 10:31:48 -0700 Subject: [PATCH] Add exec-server websocket pump tests --- codex-rs/exec-server/src/connection.rs | 58 +++++++++++++++++ codex-rs/exec-server/src/relay.rs | 87 ++++++++++++++++++++++++++ 2 files changed, 145 insertions(+) diff --git a/codex-rs/exec-server/src/connection.rs b/codex-rs/exec-server/src/connection.rs index 0974a21c3a..ca1b70b85d 100644 --- a/codex-rs/exec-server/src/connection.rs +++ b/codex-rs/exec-server/src/connection.rs @@ -610,6 +610,8 @@ fn serialize_jsonrpc_message(message: &JSONRPCMessage) -> Result anyhow::Result<()> { + let (client_websocket, mut server_websocket) = websocket_pair().await?; + let mut connection = JsonRpcConnection::from_websocket(client_websocket, "test".into()); + + server_websocket + .send(Message::Pong(b"check".to_vec().into())) + .await?; + assert!( + timeout(Duration::from_millis(50), connection.incoming_rx.recv()) + .await + .is_err() + ); + + drop(connection); + Ok(()) + } + + #[tokio::test] + async fn websocket_connection_reports_server_close() -> anyhow::Result<()> { + let (client_websocket, mut server_websocket) = websocket_pair().await?; + let mut connection = JsonRpcConnection::from_websocket(client_websocket, "test".into()); + + server_websocket.close(None).await?; + assert!(matches!( + timeout(Duration::from_secs(1), connection.incoming_rx.recv()).await?, + Some(JsonRpcConnectionEvent::Disconnected { reason: None }) + )); + + drop(connection); + Ok(()) + } + + #[tokio::test] + async fn websocket_connection_accepts_binary_jsonrpc_message() -> anyhow::Result<()> { + let (client_websocket, mut server_websocket) = websocket_pair().await?; + let mut connection = JsonRpcConnection::from_websocket(client_websocket, "test".into()); + let message = JSONRPCMessage::Request(JSONRPCRequest { + id: RequestId::Integer(1), + method: "test".to_string(), + params: None, + trace: None, + }); + + server_websocket + .send(Message::Binary(serde_json::to_vec(&message)?.into())) + .await?; + assert!(matches!( + timeout(Duration::from_secs(1), connection.incoming_rx.recv()).await?, + Some(JsonRpcConnectionEvent::Message(actual)) if actual == message + )); + + drop(connection); + Ok(()) + } + async fn websocket_pair() -> anyhow::Result<( WebSocketStream>, WebSocketStream, diff --git a/codex-rs/exec-server/src/relay.rs b/codex-rs/exec-server/src/relay.rs index 471a2e658e..6a6fef9d62 100644 --- a/codex-rs/exec-server/src/relay.rs +++ b/codex-rs/exec-server/src/relay.rs @@ -471,6 +471,8 @@ fn spawn_virtual_stream( mod tests { use std::time::Duration; + use codex_app_server_protocol::JSONRPCRequest; + use codex_app_server_protocol::RequestId; use tokio::net::TcpListener; use tokio::time::timeout; use tokio_tungstenite::accept_async; @@ -519,6 +521,68 @@ mod tests { Ok(()) } + #[tokio::test] + async fn harness_connection_receives_relay_data() -> anyhow::Result<()> { + let (client_websocket, mut server_websocket) = websocket_pair().await?; + let mut connection = + harness_connection_from_websocket(client_websocket, "test".to_string()); + let stream_id = read_resume_stream_id(&mut server_websocket).await?; + let message = test_jsonrpc_message(); + + server_websocket + .send(Message::Binary( + encode_relay_message_frame(&RelayMessageFrame::data( + stream_id, + /*seq*/ 0, + jsonrpc_payload(&message)?, + )) + .into(), + )) + .await?; + assert!(matches!( + timeout(Duration::from_secs(1), connection.incoming_rx.recv()).await?, + Some(JsonRpcConnectionEvent::Message(actual)) if actual == message + )); + + drop(connection); + Ok(()) + } + + #[tokio::test] + async fn harness_connection_reports_text_frames_as_malformed() -> anyhow::Result<()> { + let (client_websocket, mut server_websocket) = websocket_pair().await?; + let mut connection = + harness_connection_from_websocket(client_websocket, "test".to_string()); + + read_resume_stream_id(&mut server_websocket).await?; + server_websocket.send(Message::Text("nope".into())).await?; + assert!(matches!( + timeout(Duration::from_secs(1), connection.incoming_rx.recv()).await?, + Some(JsonRpcConnectionEvent::MalformedMessage { reason }) + if reason == "relay exec-server transport expects binary protobuf frames" + )); + + drop(connection); + Ok(()) + } + + #[tokio::test] + async fn harness_connection_reports_server_close() -> anyhow::Result<()> { + let (client_websocket, mut server_websocket) = websocket_pair().await?; + let mut connection = + harness_connection_from_websocket(client_websocket, "test".to_string()); + + read_resume_stream_id(&mut server_websocket).await?; + server_websocket.close(None).await?; + assert!(matches!( + timeout(Duration::from_secs(1), connection.incoming_rx.recv()).await?, + Some(JsonRpcConnectionEvent::Disconnected { reason: None }) + )); + + drop(connection); + Ok(()) + } + async fn websocket_pair() -> anyhow::Result<( WebSocketStream>, WebSocketStream, @@ -534,6 +598,29 @@ mod tests { Ok((client_websocket, server_websocket)) } + async fn read_resume_stream_id( + websocket: &mut WebSocketStream, + ) -> anyhow::Result { + let message = timeout(Duration::from_secs(1), websocket.next()) + .await? + .expect("websocket should stay open")?; + let Message::Binary(payload) = message else { + anyhow::bail!("expected relay resume frame, got {message:?}"); + }; + let frame = decode_relay_message_frame(payload.as_ref())?; + assert_eq!(frame.validate()?, RelayFrameBodyKind::Resume); + Ok(frame.stream_id) + } + + fn test_jsonrpc_message() -> JSONRPCMessage { + JSONRPCMessage::Request(JSONRPCRequest { + id: RequestId::Integer(1), + method: "test".to_string(), + params: None, + trace: None, + }) + } + async fn read_pong( websocket: &mut WebSocketStream, ) -> anyhow::Result<()> {