mirror of
https://github.com/openai/codex.git
synced 2026-05-22 20:14:17 +00:00
Remove manual websocket pong handling
This commit is contained in:
@@ -401,19 +401,6 @@ impl JsonRpcConnection {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(JsonRpcWebSocketFrame::Ping(payload)) => {
|
||||
if let Err(err) = websocket.send(M::pong(payload)).await {
|
||||
send_disconnected(
|
||||
&incoming_tx,
|
||||
&disconnected_tx,
|
||||
Some(format!(
|
||||
"failed to write websocket pong to {connection_label}: {err}"
|
||||
)),
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(JsonRpcWebSocketFrame::Close) => {
|
||||
send_disconnected(
|
||||
&incoming_tx,
|
||||
@@ -477,7 +464,6 @@ impl JsonRpcConnection {
|
||||
|
||||
enum JsonRpcWebSocketFrame {
|
||||
Message(JSONRPCMessage),
|
||||
Ping(bytes::Bytes),
|
||||
Close,
|
||||
Ignore,
|
||||
}
|
||||
@@ -486,7 +472,6 @@ trait JsonRpcWebSocketMessage: Send + 'static {
|
||||
fn parse_jsonrpc_frame(self) -> Result<JsonRpcWebSocketFrame, serde_json::Error>;
|
||||
fn from_text(text: String) -> Self;
|
||||
fn ping() -> Self;
|
||||
fn pong(payload: bytes::Bytes) -> Self;
|
||||
}
|
||||
|
||||
impl JsonRpcWebSocketMessage for Message {
|
||||
@@ -499,8 +484,9 @@ impl JsonRpcWebSocketMessage for Message {
|
||||
serde_json::from_slice(bytes.as_ref()).map(JsonRpcWebSocketFrame::Message)
|
||||
}
|
||||
Message::Close(_) => Ok(JsonRpcWebSocketFrame::Close),
|
||||
Message::Ping(payload) => Ok(JsonRpcWebSocketFrame::Ping(payload)),
|
||||
Message::Pong(_) | Message::Frame(_) => Ok(JsonRpcWebSocketFrame::Ignore),
|
||||
Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => {
|
||||
Ok(JsonRpcWebSocketFrame::Ignore)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -511,10 +497,6 @@ impl JsonRpcWebSocketMessage for Message {
|
||||
fn ping() -> Self {
|
||||
Self::Ping(Vec::new().into())
|
||||
}
|
||||
|
||||
fn pong(payload: bytes::Bytes) -> Self {
|
||||
Self::Pong(payload)
|
||||
}
|
||||
}
|
||||
|
||||
impl JsonRpcWebSocketMessage for AxumWebSocketMessage {
|
||||
@@ -527,8 +509,9 @@ impl JsonRpcWebSocketMessage for AxumWebSocketMessage {
|
||||
serde_json::from_slice(bytes.as_ref()).map(JsonRpcWebSocketFrame::Message)
|
||||
}
|
||||
AxumWebSocketMessage::Close(_) => Ok(JsonRpcWebSocketFrame::Close),
|
||||
AxumWebSocketMessage::Ping(payload) => Ok(JsonRpcWebSocketFrame::Ping(payload)),
|
||||
AxumWebSocketMessage::Pong(_) => Ok(JsonRpcWebSocketFrame::Ignore),
|
||||
AxumWebSocketMessage::Ping(_) | AxumWebSocketMessage::Pong(_) => {
|
||||
Ok(JsonRpcWebSocketFrame::Ignore)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -539,10 +522,6 @@ impl JsonRpcWebSocketMessage for AxumWebSocketMessage {
|
||||
fn ping() -> Self {
|
||||
Self::Ping(Vec::new().into())
|
||||
}
|
||||
|
||||
fn pong(payload: bytes::Bytes) -> Self {
|
||||
Self::Pong(payload)
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_disconnected(
|
||||
@@ -628,23 +607,6 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn websocket_connection_pongs_server_ping() -> anyhow::Result<()> {
|
||||
let (client_websocket, mut server_websocket) = websocket_pair().await?;
|
||||
let connection = JsonRpcConnection::from_websocket(client_websocket, "test".into());
|
||||
|
||||
server_websocket
|
||||
.send(Message::Ping(b"check".to_vec().into()))
|
||||
.await?;
|
||||
let message = timeout(Duration::from_secs(1), server_websocket.next())
|
||||
.await?
|
||||
.expect("websocket should stay open")?;
|
||||
assert_eq!(message, Message::Pong(b"check".to_vec().into()));
|
||||
|
||||
drop(connection);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn websocket_connection_sends_configured_ping() -> anyhow::Result<()> {
|
||||
let (client_websocket, mut server_websocket) = websocket_pair().await?;
|
||||
@@ -733,7 +695,7 @@ mod tests {
|
||||
|
||||
connection.outgoing_tx.send(message.clone()).await?;
|
||||
control.wait_for_blocked_write().await?;
|
||||
control.send_inbound(Message::Ping(b"check".to_vec().into()))?;
|
||||
control.send_inbound(Message::Pong(b"check".to_vec().into()))?;
|
||||
assert!(
|
||||
timeout(Duration::from_millis(50), connection.incoming_rx.recv())
|
||||
.await
|
||||
@@ -745,11 +707,6 @@ mod tests {
|
||||
timeout(Duration::from_secs(1), outbound_rx.next()).await?,
|
||||
Some(Message::Text(text)) if serde_json::from_str::<JSONRPCMessage>(&text)? == message
|
||||
));
|
||||
assert!(matches!(
|
||||
timeout(Duration::from_secs(1), outbound_rx.next()).await?,
|
||||
Some(Message::Pong(payload)) if payload.as_ref() == b"check"
|
||||
));
|
||||
|
||||
drop(connection);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -268,12 +268,6 @@ where
|
||||
| RelayFrameBodyKind::Heartbeat => {}
|
||||
}
|
||||
}
|
||||
Some(Ok(Message::Ping(payload))) => {
|
||||
if websocket.send(Message::Pong(payload)).await.is_err() {
|
||||
let _ = disconnected_tx.send(true);
|
||||
break;
|
||||
}
|
||||
}
|
||||
Some(Ok(Message::Close(_))) | None => {
|
||||
let _ = disconnected_tx.send(true);
|
||||
let _ = incoming_tx
|
||||
@@ -281,7 +275,7 @@ where
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
Some(Ok(Message::Pong(_) | Message::Frame(_))) => {}
|
||||
Some(Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => {}
|
||||
Some(Ok(Message::Text(_))) => {
|
||||
let _ = incoming_tx
|
||||
.send(JsonRpcConnectionEvent::MalformedMessage {
|
||||
@@ -359,14 +353,8 @@ pub(crate) async fn run_multiplexed_executor<S>(
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Ok(Message::Ping(payload))) => {
|
||||
if websocket.send(Message::Pong(payload)).await.is_err() {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Some(Ok(Message::Close(_))) | None => break,
|
||||
Some(Ok(Message::Pong(_) | Message::Frame(_))) => continue,
|
||||
Some(Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => continue,
|
||||
Some(Ok(Message::Text(_))) => {
|
||||
warn!("dropping non-binary relay message frame from harness");
|
||||
continue;
|
||||
@@ -517,46 +505,6 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
|
||||
fn test_runtime_paths() -> anyhow::Result<crate::ExecServerRuntimePaths> {
|
||||
crate::ExecServerRuntimePaths::new(
|
||||
std::env::current_exe()?,
|
||||
/*codex_linux_sandbox_exe*/ None,
|
||||
)
|
||||
.map_err(anyhow::Error::from)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multiplexed_executor_pongs_server_ping() -> anyhow::Result<()> {
|
||||
let (client_websocket, mut server_websocket) = websocket_pair().await?;
|
||||
let executor_task = tokio::spawn(run_multiplexed_executor(
|
||||
client_websocket,
|
||||
ConnectionProcessor::new(test_runtime_paths()?),
|
||||
));
|
||||
|
||||
server_websocket
|
||||
.send(Message::Ping(b"check".to_vec().into()))
|
||||
.await?;
|
||||
read_pong(&mut server_websocket).await?;
|
||||
|
||||
executor_task.abort();
|
||||
let _ = executor_task.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn harness_connection_pongs_server_ping() -> anyhow::Result<()> {
|
||||
let (client_websocket, mut server_websocket) = websocket_pair().await?;
|
||||
let connection = harness_connection_from_websocket(client_websocket, "test".to_string());
|
||||
|
||||
server_websocket
|
||||
.send(Message::Ping(b"check".to_vec().into()))
|
||||
.await?;
|
||||
read_pong(&mut server_websocket).await?;
|
||||
|
||||
drop(connection);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn harness_connection_receives_relay_data() -> anyhow::Result<()> {
|
||||
let (client_websocket, mut server_websocket) = websocket_pair().await?;
|
||||
@@ -637,7 +585,7 @@ mod tests {
|
||||
control.set_write_blocked();
|
||||
connection.outgoing_tx.send(message.clone()).await?;
|
||||
control.wait_for_blocked_write().await?;
|
||||
control.send_inbound(Message::Ping(b"check".to_vec().into()))?;
|
||||
control.send_inbound(Message::Pong(b"check".to_vec().into()))?;
|
||||
assert!(
|
||||
timeout(Duration::from_millis(50), connection.incoming_rx.recv())
|
||||
.await
|
||||
@@ -654,11 +602,6 @@ mod tests {
|
||||
let frame = decode_relay_message_frame(data_payload.as_ref())?;
|
||||
assert_eq!(frame.stream_id, stream_id);
|
||||
assert_eq!(frame.into_jsonrpc_message()?, message);
|
||||
assert!(matches!(
|
||||
timeout(Duration::from_secs(1), outbound_rx.next()).await?,
|
||||
Some(Message::Pong(payload)) if payload.as_ref() == b"check"
|
||||
));
|
||||
|
||||
drop(connection);
|
||||
Ok(())
|
||||
}
|
||||
@@ -701,25 +644,6 @@ mod tests {
|
||||
})
|
||||
}
|
||||
|
||||
async fn read_pong(
|
||||
websocket: &mut WebSocketStream<tokio::net::TcpStream>,
|
||||
) -> anyhow::Result<()> {
|
||||
loop {
|
||||
let Some(message) = timeout(Duration::from_secs(1), websocket.next()).await? else {
|
||||
anyhow::bail!("websocket closed before pong");
|
||||
};
|
||||
match message? {
|
||||
Message::Pong(payload) if payload.as_ref() == b"check" => return Ok(()),
|
||||
Message::Binary(_)
|
||||
| Message::Text(_)
|
||||
| Message::Ping(_)
|
||||
| Message::Pong(_)
|
||||
| Message::Frame(_) => {}
|
||||
Message::Close(_) => anyhow::bail!("websocket closed before pong"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ControlledWebSocket {
|
||||
inbound_rx: futures_mpsc::UnboundedReceiver<Result<Message, std::convert::Infallible>>,
|
||||
outbound_tx: futures_mpsc::UnboundedSender<Message>,
|
||||
|
||||
Reference in New Issue
Block a user