From 52bac8aef5e976e7b8d982f6ddde4ee0e436fe6c Mon Sep 17 00:00:00 2001 From: rcmerci Date: Sat, 10 Jan 2026 00:17:35 +0800 Subject: [PATCH] enhance(worker-sync): add ws reconnect backoff --- src/main/frontend/worker/worker_sync.cljs | 58 +++++++++++++++++++++-- 1 file changed, 53 insertions(+), 5 deletions(-) diff --git a/src/main/frontend/worker/worker_sync.cljs b/src/main/frontend/worker/worker_sync.cljs index 9a2f734f02..e8d9e28c77 100644 --- a/src/main/frontend/worker/worker_sync.cljs +++ b/src/main/frontend/worker/worker_sync.cljs @@ -47,6 +47,9 @@ (def ^:private max-asset-size (* 100 1024 1024)) (def ^:private upload-kvs-batch-size 2000) +(def ^:private reconnect-base-delay-ms 1000) +(def ^:private reconnect-max-delay-ms 30000) +(def ^:private reconnect-jitter-ms 250) (defn- format-ws-url [base graph-id] (cond @@ -82,6 +85,22 @@ (defn- ws-open? [ws] (= 1 (ready-state ws))) +(defn- reconnect-delay-ms [attempt] + (let [exp (js/Math.pow 2 attempt) + delay (min reconnect-max-delay-ms (* reconnect-base-delay-ms exp)) + jitter (rand-int reconnect-jitter-ms)] + (+ delay jitter))) + +(defn- clear-reconnect-timer! [reconnect] + (when-let [timer (:timer @reconnect)] + (js/clearTimeout timer) + (swap! reconnect assoc :timer nil))) + +(defn- reset-reconnect! [client] + (when-let [reconnect (:reconnect client)] + (clear-reconnect-timer! reconnect) + (swap! reconnect assoc :attempt 0))) + (defn- send! [ws message] (when (ws-open? ws) (.send ws (js/JSON.stringify (clj->js message))))) @@ -206,7 +225,8 @@ (let [client {:repo repo :send-queue (atom (p/resolved nil)) :asset-queue (atom (p/resolved nil)) - :inflight (atom [])}] + :inflight (atom []) + :reconnect (atom {:attempt 0 :timer nil})}] (swap! worker-state/*worker-sync-clients assoc repo client) client))) @@ -473,18 +493,45 @@ :t_before (or (client-op/get-local-tx repo) 0) :txs txs})))))))))) -(defn- attach-ws-handlers! [repo client ws] +(declare connect!) + +(defn- schedule-reconnect! [repo client url reason] + (when (enabled?) + (when-let [reconnect (:reconnect client)] + (let [{:keys [attempt timer]} @reconnect] + (when (nil? timer) + (let [delay (reconnect-delay-ms attempt) + timeout-id (js/setTimeout + (fn [] + (swap! reconnect assoc :timer nil) + (when (enabled?) + (when-let [current (get @worker-state/*worker-sync-clients repo)] + (when (= (:graph-id current) (:graph-id client)) + (let [updated (connect! repo current url)] + (swap! worker-state/*worker-sync-clients assoc repo updated)))))) + delay)] + (swap! reconnect assoc :timer timeout-id :attempt (inc attempt)) + (log/info :worker-sync/ws-reconnect-scheduled + {:repo repo :delay delay :attempt attempt :reason reason}))))))) + +(defn- attach-ws-handlers! [repo client ws url] (set! (.-onmessage ws) (fn [event] (handle-message! repo client (.-data event)))) + (set! (.-onerror ws) + (fn [event] + (log/error :worker-sync/ws-error {:repo repo :error event}))) (set! (.-onclose ws) (fn [_] - (log/info :worker-sync/ws-closed {:repo repo})))) + (log/info :worker-sync/ws-closed {:repo repo}) + (schedule-reconnect! repo client url :close)))) -(defn- start-pull-loop! [client ws] +(defn- start-pull-loop! [client _ws] client) (defn- stop-client! [client] + (when-let [reconnect (:reconnect client)] + (clear-reconnect-timer! reconnect)) (when-let [ws (:ws client)] (try (.close ws) @@ -494,9 +541,10 @@ (defn- connect! [repo client url] (let [ws (js/WebSocket. (append-token url (auth-token))) updated (assoc client :ws ws)] - (attach-ws-handlers! repo updated ws) + (attach-ws-handlers! repo updated ws url) (set! (.-onopen ws) (fn [_] + (reset-reconnect! updated) (send! ws {:type "hello" :client repo}) (enqueue-asset-sync! repo updated) (enqueue-asset-initial-download! repo updated)))