Add exec-server websocket pump tests

This commit is contained in:
starr-openai
2026-05-18 10:31:48 -07:00
parent 8a9300e92a
commit 3673b69a2a
2 changed files with 145 additions and 0 deletions

View File

@@ -610,6 +610,8 @@ fn serialize_jsonrpc_message(message: &JSONRPCMessage) -> Result<String, serde_j
#[cfg(test)]
mod tests {
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::RequestId;
use tokio::net::TcpListener;
use tokio::time::timeout;
use tokio_tungstenite::accept_async;
@@ -652,6 +654,62 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn websocket_connection_ignores_server_pong() -> anyhow::Result<()> {
let (client_websocket, mut server_websocket) = websocket_pair().await?;
let mut connection = JsonRpcConnection::from_websocket(client_websocket, "test".into());
server_websocket
.send(Message::Pong(b"check".to_vec().into()))
.await?;
assert!(
timeout(Duration::from_millis(50), connection.incoming_rx.recv())
.await
.is_err()
);
drop(connection);
Ok(())
}
#[tokio::test]
async fn websocket_connection_reports_server_close() -> anyhow::Result<()> {
let (client_websocket, mut server_websocket) = websocket_pair().await?;
let mut connection = JsonRpcConnection::from_websocket(client_websocket, "test".into());
server_websocket.close(None).await?;
assert!(matches!(
timeout(Duration::from_secs(1), connection.incoming_rx.recv()).await?,
Some(JsonRpcConnectionEvent::Disconnected { reason: None })
));
drop(connection);
Ok(())
}
#[tokio::test]
async fn websocket_connection_accepts_binary_jsonrpc_message() -> anyhow::Result<()> {
let (client_websocket, mut server_websocket) = websocket_pair().await?;
let mut connection = JsonRpcConnection::from_websocket(client_websocket, "test".into());
let message = JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(1),
method: "test".to_string(),
params: None,
trace: None,
});
server_websocket
.send(Message::Binary(serde_json::to_vec(&message)?.into()))
.await?;
assert!(matches!(
timeout(Duration::from_secs(1), connection.incoming_rx.recv()).await?,
Some(JsonRpcConnectionEvent::Message(actual)) if actual == message
));
drop(connection);
Ok(())
}
async fn websocket_pair() -> anyhow::Result<(
WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
WebSocketStream<tokio::net::TcpStream>,

View File

@@ -471,6 +471,8 @@ fn spawn_virtual_stream(
mod tests {
use std::time::Duration;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::RequestId;
use tokio::net::TcpListener;
use tokio::time::timeout;
use tokio_tungstenite::accept_async;
@@ -519,6 +521,68 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn harness_connection_receives_relay_data() -> anyhow::Result<()> {
let (client_websocket, mut server_websocket) = websocket_pair().await?;
let mut connection =
harness_connection_from_websocket(client_websocket, "test".to_string());
let stream_id = read_resume_stream_id(&mut server_websocket).await?;
let message = test_jsonrpc_message();
server_websocket
.send(Message::Binary(
encode_relay_message_frame(&RelayMessageFrame::data(
stream_id,
/*seq*/ 0,
jsonrpc_payload(&message)?,
))
.into(),
))
.await?;
assert!(matches!(
timeout(Duration::from_secs(1), connection.incoming_rx.recv()).await?,
Some(JsonRpcConnectionEvent::Message(actual)) if actual == message
));
drop(connection);
Ok(())
}
#[tokio::test]
async fn harness_connection_reports_text_frames_as_malformed() -> anyhow::Result<()> {
let (client_websocket, mut server_websocket) = websocket_pair().await?;
let mut connection =
harness_connection_from_websocket(client_websocket, "test".to_string());
read_resume_stream_id(&mut server_websocket).await?;
server_websocket.send(Message::Text("nope".into())).await?;
assert!(matches!(
timeout(Duration::from_secs(1), connection.incoming_rx.recv()).await?,
Some(JsonRpcConnectionEvent::MalformedMessage { reason })
if reason == "relay exec-server transport expects binary protobuf frames"
));
drop(connection);
Ok(())
}
#[tokio::test]
async fn harness_connection_reports_server_close() -> anyhow::Result<()> {
let (client_websocket, mut server_websocket) = websocket_pair().await?;
let mut connection =
harness_connection_from_websocket(client_websocket, "test".to_string());
read_resume_stream_id(&mut server_websocket).await?;
server_websocket.close(None).await?;
assert!(matches!(
timeout(Duration::from_secs(1), connection.incoming_rx.recv()).await?,
Some(JsonRpcConnectionEvent::Disconnected { reason: None })
));
drop(connection);
Ok(())
}
async fn websocket_pair() -> anyhow::Result<(
WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
WebSocketStream<tokio::net::TcpStream>,
@@ -534,6 +598,29 @@ mod tests {
Ok((client_websocket, server_websocket))
}
async fn read_resume_stream_id(
websocket: &mut WebSocketStream<tokio::net::TcpStream>,
) -> anyhow::Result<String> {
let message = timeout(Duration::from_secs(1), websocket.next())
.await?
.expect("websocket should stay open")?;
let Message::Binary(payload) = message else {
anyhow::bail!("expected relay resume frame, got {message:?}");
};
let frame = decode_relay_message_frame(payload.as_ref())?;
assert_eq!(frame.validate()?, RelayFrameBodyKind::Resume);
Ok(frame.stream_id)
}
fn test_jsonrpc_message() -> JSONRPCMessage {
JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(1),
method: "test".to_string(),
params: None,
trace: None,
})
}
async fn read_pong(
websocket: &mut WebSocketStream<tokio::net::TcpStream>,
) -> anyhow::Result<()> {