This commit is contained in:
Ahmed Ibrahim
2026-02-17 15:15:05 -08:00
parent 2c1d6af908
commit f26c001568
2 changed files with 116 additions and 22 deletions

View File

@@ -320,30 +320,38 @@ impl RealtimeWebsocketEvents {
return Ok(None);
}
let msg = match self.rx_message.lock().await.recv().await {
Some(Ok(msg)) => msg,
Some(Err(err)) => {
self.is_closed.store(true, Ordering::SeqCst);
return Err(ApiError::Stream(format!(
"failed to read websocket message: {err}"
)));
}
None => {
self.is_closed.store(true, Ordering::SeqCst);
return Ok(None);
}
};
loop {
let msg = match self.rx_message.lock().await.recv().await {
Some(Ok(msg)) => msg,
Some(Err(err)) => {
self.is_closed.store(true, Ordering::SeqCst);
return Err(ApiError::Stream(format!(
"failed to read websocket message: {err}"
)));
}
None => {
self.is_closed.store(true, Ordering::SeqCst);
return Ok(None);
}
};
match msg {
Message::Text(text) => Ok(parse_realtime_event(&text)),
Message::Close(_) => {
self.is_closed.store(true, Ordering::SeqCst);
Ok(None)
match msg {
Message::Text(text) => {
if let Some(event) = parse_realtime_event(&text) {
return Ok(Some(event));
}
}
Message::Close(_) => {
self.is_closed.store(true, Ordering::SeqCst);
return Ok(None);
}
Message::Binary(_) => {
return Ok(Some(RealtimeEvent::Error(
"unexpected binary realtime websocket event".to_string(),
)));
}
Message::Frame(_) | Message::Ping(_) | Message::Pong(_) => {}
}
Message::Binary(_) => Ok(Some(RealtimeEvent::Error(
"unexpected binary realtime websocket event".to_string(),
))),
Message::Frame(_) | Message::Ping(_) | Message::Pong(_) => Ok(None),
}
}
}