Keep remote app-server sessions alive across websocket pings

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Iliyan Malchev
2026-03-30 07:45:07 -07:00
parent 9afddb665f
commit 6b57b70b2c
2 changed files with 81 additions and 10 deletions

View File

@@ -1086,7 +1086,7 @@ mod tests {
})
.await
.expect("typed request should succeed");
client.shutdown().await.expect("shutdown should complete");
let _ = client.shutdown().await;
}
#[tokio::test]
@@ -1106,7 +1106,7 @@ mod tests {
err.to_string().starts_with("thread/read failed:"),
"expected method-qualified JSON-RPC failure message"
);
client.shutdown().await.expect("shutdown should complete");
let _ = client.shutdown().await;
}
#[tokio::test]
@@ -1767,6 +1767,50 @@ mod tests {
assert!(matches!(event, AppServerEvent::Disconnected { .. }));
}
#[tokio::test]
async fn remote_client_flushes_pongs_while_idle() {
let (pong_tx, pong_rx) = tokio::sync::oneshot::channel();
let websocket_url = start_test_remote_server(|mut websocket| async move {
expect_remote_initialize(&mut websocket).await;
websocket
.send(Message::Ping(b"keepalive".to_vec().into()))
.await
.expect("ping should send");
let pong_payload = timeout(Duration::from_secs(2), async {
loop {
let frame = websocket
.next()
.await
.expect("frame should be available")
.expect("frame should decode");
match frame {
Message::Pong(payload) => break payload,
Message::Binary(_) | Message::Ping(_) | Message::Frame(_) => continue,
Message::Text(_) => panic!("unexpected text frame"),
Message::Close(_) => panic!("unexpected close frame"),
}
}
})
.await
.expect("client should flush a pong while idle");
assert_eq!(pong_payload.as_ref(), b"keepalive");
pong_tx.send(()).expect("pong notification should send");
websocket.close(None).await.expect("close should succeed");
})
.await;
let client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
.await
.expect("remote client should connect");
timeout(Duration::from_secs(2), pong_rx)
.await
.expect("server should observe pong before timeout")
.expect("pong notification should arrive");
client.shutdown().await.expect("shutdown should complete");
}
#[test]
fn typed_request_error_exposes_sources() {
let transport = TypedRequestError::Transport {

View File

@@ -412,10 +412,26 @@ impl RemoteAppServerClient {
.await;
break;
}
Some(Ok(Message::Binary(_)))
| Some(Ok(Message::Ping(_)))
| Some(Ok(Message::Pong(_)))
| Some(Ok(Message::Frame(_))) => {}
Some(Ok(Message::Binary(_))) | Some(Ok(Message::Pong(_))) | Some(Ok(Message::Frame(_))) => {}
Some(Ok(Message::Ping(_))) => {
if let Err(err) =
flush_pending_control_frames(&mut stream, &websocket_url).await
{
let err_message = err.to_string();
let _ = deliver_event(
&event_tx,
&mut skipped_events,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` transport failed: {err_message}"
),
},
&mut stream,
)
.await;
break;
}
}
Some(Err(err)) => {
let _ = deliver_event(
&event_tx,
@@ -747,10 +763,10 @@ async fn initialize_remote_connection(
JSONRPCMessage::Response(_) | JSONRPCMessage::Error(_) => {}
}
}
Some(Ok(Message::Binary(_)))
| Some(Ok(Message::Ping(_)))
| Some(Ok(Message::Pong(_)))
| Some(Ok(Message::Frame(_))) => {}
Some(Ok(Message::Binary(_))) | Some(Ok(Message::Pong(_))) | Some(Ok(Message::Frame(_))) => {}
Some(Ok(Message::Ping(_))) => {
flush_pending_control_frames(stream, websocket_url).await?;
}
Some(Ok(Message::Close(frame))) => {
let reason = frame
.as_ref()
@@ -945,6 +961,17 @@ async fn write_jsonrpc_message(
})
}
async fn flush_pending_control_frames(
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
websocket_url: &str,
) -> IoResult<()> {
stream.flush().await.map_err(|err| {
IoError::other(format!(
"failed to flush websocket control frames to `{websocket_url}`: {err}"
))
})
}
#[cfg(test)]
mod tests {
use super::*;