Preserve exec-server websocket keepalive ownership

This commit is contained in:
starr-openai
2026-05-18 10:42:43 -07:00
parent 3673b69a2a
commit d94300d782
2 changed files with 25 additions and 2 deletions

View File

@@ -323,11 +323,11 @@ impl JsonRpcConnection {
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
Self::from_websocket_stream(stream, connection_label, /*ping_interval*/ None)
Self::from_websocket_stream(stream, connection_label, Some(WEBSOCKET_KEEPALIVE_INTERVAL))
}
pub(crate) fn from_axum_websocket(stream: AxumWebSocket, connection_label: String) -> Self {
Self::from_websocket_stream(stream, connection_label, Some(WEBSOCKET_KEEPALIVE_INTERVAL))
Self::from_websocket_stream(stream, connection_label, /*ping_interval*/ None)
}
fn from_websocket_stream<T, M, E>(

View File

@@ -19,6 +19,7 @@ use crate::connection::CHANNEL_CAPACITY;
use crate::connection::JsonRpcConnection;
use crate::connection::JsonRpcConnectionEvent;
use crate::connection::JsonRpcTransport;
use crate::connection::WEBSOCKET_KEEPALIVE_INTERVAL;
use crate::relay_proto::RelayData;
use crate::relay_proto::RelayMessageFrame;
use crate::relay_proto::RelayResume;
@@ -166,6 +167,11 @@ where
}
let mut next_seq = 0u32;
let mut keepalive = tokio::time::interval_at(
tokio::time::Instant::now() + WEBSOCKET_KEEPALIVE_INTERVAL,
WEBSOCKET_KEEPALIVE_INTERVAL,
);
keepalive.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
maybe_message = outgoing_rx.recv() => {
@@ -190,6 +196,12 @@ where
break;
}
}
_ = keepalive.tick() => {
if websocket.send(Message::Ping(Vec::new().into())).await.is_err() {
let _ = disconnected_tx.send(true);
break;
}
}
incoming_message = websocket.next() => {
match incoming_message {
Some(Ok(Message::Binary(payload))) => {
@@ -312,6 +324,11 @@ pub(crate) async fn run_multiplexed_executor<S>(
mpsc::channel::<Vec<u8>>(CHANNEL_CAPACITY);
let mut streams: HashMap<String, VirtualStream> = HashMap::new();
let mut keepalive = tokio::time::interval_at(
tokio::time::Instant::now() + WEBSOCKET_KEEPALIVE_INTERVAL,
WEBSOCKET_KEEPALIVE_INTERVAL,
);
keepalive.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
let frame = tokio::select! {
maybe_encoded = physical_outgoing_rx.recv() => {
@@ -323,6 +340,12 @@ pub(crate) async fn run_multiplexed_executor<S>(
}
continue;
}
_ = keepalive.tick() => {
if websocket.send(Message::Ping(Vec::new().into())).await.is_err() {
break;
}
continue;
}
incoming_message = websocket.next() => match incoming_message {
Some(Ok(Message::Binary(payload))) => {
match decode_relay_message_frame(payload.as_ref()) {