enhance(worker-sync): add ws reconnect backoff

This commit is contained in:
rcmerci
2026-01-10 00:17:35 +08:00
parent 7d2d098cf2
commit 52bac8aef5

View File

@@ -47,6 +47,9 @@
(def ^:private max-asset-size (* 100 1024 1024))
(def ^:private upload-kvs-batch-size 2000)
(def ^:private reconnect-base-delay-ms 1000)
(def ^:private reconnect-max-delay-ms 30000)
(def ^:private reconnect-jitter-ms 250)
(defn- format-ws-url [base graph-id]
(cond
@@ -82,6 +85,22 @@
(defn- ws-open? [ws]
(= 1 (ready-state ws)))
(defn- reconnect-delay-ms [attempt]
(let [exp (js/Math.pow 2 attempt)
delay (min reconnect-max-delay-ms (* reconnect-base-delay-ms exp))
jitter (rand-int reconnect-jitter-ms)]
(+ delay jitter)))
(defn- clear-reconnect-timer! [reconnect]
(when-let [timer (:timer @reconnect)]
(js/clearTimeout timer)
(swap! reconnect assoc :timer nil)))
(defn- reset-reconnect! [client]
(when-let [reconnect (:reconnect client)]
(clear-reconnect-timer! reconnect)
(swap! reconnect assoc :attempt 0)))
(defn- send! [ws message]
(when (ws-open? ws)
(.send ws (js/JSON.stringify (clj->js message)))))
@@ -206,7 +225,8 @@
(let [client {:repo repo
:send-queue (atom (p/resolved nil))
:asset-queue (atom (p/resolved nil))
:inflight (atom [])}]
:inflight (atom [])
:reconnect (atom {:attempt 0 :timer nil})}]
(swap! worker-state/*worker-sync-clients assoc repo client)
client)))
@@ -473,18 +493,45 @@
:t_before (or (client-op/get-local-tx repo) 0)
:txs txs}))))))))))
(defn- attach-ws-handlers! [repo client ws]
(declare connect!)
(defn- schedule-reconnect! [repo client url reason]
(when (enabled?)
(when-let [reconnect (:reconnect client)]
(let [{:keys [attempt timer]} @reconnect]
(when (nil? timer)
(let [delay (reconnect-delay-ms attempt)
timeout-id (js/setTimeout
(fn []
(swap! reconnect assoc :timer nil)
(when (enabled?)
(when-let [current (get @worker-state/*worker-sync-clients repo)]
(when (= (:graph-id current) (:graph-id client))
(let [updated (connect! repo current url)]
(swap! worker-state/*worker-sync-clients assoc repo updated))))))
delay)]
(swap! reconnect assoc :timer timeout-id :attempt (inc attempt))
(log/info :worker-sync/ws-reconnect-scheduled
{:repo repo :delay delay :attempt attempt :reason reason})))))))
(defn- attach-ws-handlers! [repo client ws url]
(set! (.-onmessage ws)
(fn [event]
(handle-message! repo client (.-data event))))
(set! (.-onerror ws)
(fn [event]
(log/error :worker-sync/ws-error {:repo repo :error event})))
(set! (.-onclose ws)
(fn [_]
(log/info :worker-sync/ws-closed {:repo repo}))))
(log/info :worker-sync/ws-closed {:repo repo})
(schedule-reconnect! repo client url :close))))
(defn- start-pull-loop! [client ws]
(defn- start-pull-loop! [client _ws]
client)
(defn- stop-client! [client]
(when-let [reconnect (:reconnect client)]
(clear-reconnect-timer! reconnect))
(when-let [ws (:ws client)]
(try
(.close ws)
@@ -494,9 +541,10 @@
(defn- connect! [repo client url]
(let [ws (js/WebSocket. (append-token url (auth-token)))
updated (assoc client :ws ws)]
(attach-ws-handlers! repo updated ws)
(attach-ws-handlers! repo updated ws url)
(set! (.-onopen ws)
(fn [_]
(reset-reconnect! updated)
(send! ws {:type "hello" :client repo})
(enqueue-asset-sync! repo updated)
(enqueue-asset-initial-download! repo updated)))