enhance(db-sync): ensure only one ws per repo

This commit is contained in:
rcmerci
2026-01-11 23:00:36 +08:00
parent b949f707ed
commit 6e2cae24b7

View File

@@ -90,6 +90,22 @@
(defn- ws-open? [ws]
(= 1 (ready-state ws)))
(defonce ^:private *db-sync-ws-registry (atom {}))
(defn- registered-ws [repo]
(get @*db-sync-ws-registry repo))
(defn- register-ws! [repo ws]
(swap! *db-sync-ws-registry assoc repo ws))
(defn- unregister-ws! [repo ws]
(when (identical? ws (registered-ws repo))
(swap! *db-sync-ws-registry dissoc repo)))
(defn- registered-ws-open? [repo]
(when-let [ws (registered-ws repo)]
(ws-open? ws)))
(def ^:private state-start :start)
(def ^:private state-hello-wait :hello-wait)
(def ^:private state-hello-done :hello-done)
@@ -214,6 +230,9 @@
(clear-reconnect-timer! reconnect)
(swap! reconnect assoc :attempt 0)))
(defn- reconnect-pending? [client]
(boolean (some-> client :reconnect deref :timer)))
(defn- reset-client-machine! [client]
(set-client-state! client state-start)
(set-pull-active! client false)
@@ -747,7 +766,7 @@
(log/error :db-sync/apply-remote-tx-failed {:error e})))
(fail-fast :db-sync/missing-db {:repo repo :op :apply-remote-tx})))
(defn- send-pull! [repo client ws since]
(defn- send-pull! [client ws since]
(send! ws {:type "pull" :since since})
(set-pull-active! client true)
(set-client-state! client state-pull-wait))
@@ -810,7 +829,7 @@
(complete-tx-batch! client)
(when (and (= (client-state client) state-pull-wait)
(pull-active? client))
(send-pull! repo client (:ws client) (or (client-op/get-local-tx repo) 0)))
(send-pull! client (:ws client) (or (client-op/get-local-tx repo) 0)))
(flush-pending! repo client)))
(defn- handle-message! [repo client raw]
@@ -833,7 +852,7 @@
(require-non-negative remote-tx {:repo repo :type "hello"})
(set-client-state! client state-hello-done)
(when (> remote-tx local-tx)
(send-pull! repo client ws local-tx))
(send-pull! client ws local-tx))
(enqueue-asset-sync! repo client)
(enqueue-asset-initial-download! repo client)
(flush-pending! repo client))
@@ -845,7 +864,7 @@
"changed" (do
(require-non-negative remote-tx {:repo repo :type "changed"})
(when (< local-tx remote-tx)
(send-pull! repo client ws local-tx)))
(send-pull! client ws local-tx)))
"error" (fail-fast-state! client :db-sync/invalid-field
{:repo repo :state state :type msg-type :message message})
"pong" nil
@@ -858,7 +877,7 @@
"changed" (do
(require-non-negative remote-tx {:repo repo :type "changed"})
(when (< local-tx remote-tx)
(send-pull! repo client ws local-tx)))
(send-pull! client ws local-tx)))
"error" (fail-fast-state! client :db-sync/invalid-field
{:repo repo :state state :type msg-type :message message})
"pong" nil
@@ -885,7 +904,7 @@
(set-client-state! client state-tx-reject-stale)
(clear-tx-return-state! client)
(set-stale-inflight! client true)
(send-pull! repo client ws local-tx))
(send-pull! client ws local-tx))
"cycle"
(do
(set-client-state! client state-tx-reject-cycle)
@@ -978,6 +997,7 @@
(log/error :db-sync/ws-error {:repo repo :error event})))
(set! (.-onclose ws)
(fn [_]
(unregister-ws! repo ws)
(log/info :db-sync/ws-closed {:repo repo})
(set-client-state! client state-end)
(schedule-reconnect! repo client url :close))))
@@ -999,19 +1019,24 @@
nil))))
(defn- connect! [repo client url]
(let [ws (js/WebSocket. (append-token url (auth-token)))
updated (-> client
reset-client-machine!
(assoc :ws ws))]
(attach-ws-handlers! repo updated ws url)
(set! (.-onopen ws)
(fn [_]
(reset-reconnect! updated)
(set-client-state! updated state-hello-wait)
(send! ws {:type "hello" :client repo})
(enqueue-asset-sync! repo updated)
(enqueue-asset-initial-download! repo updated)))
(start-pull-loop! updated ws)))
(if (registered-ws-open? repo)
(do
(log/info :db-sync/ws-connect-skipped {:repo repo :reason :existing-open})
(assoc client :ws (registered-ws repo)))
(let [ws (js/WebSocket. (append-token url (auth-token)))
updated (-> client
reset-client-machine!
(assoc :ws ws))]
(attach-ws-handlers! repo updated ws url)
(set! (.-onopen ws)
(fn [_]
(register-ws! repo ws)
(reset-reconnect! updated)
(set-client-state! updated state-hello-wait)
(send! ws {:type "hello" :client repo})
(enqueue-asset-sync! repo updated)
(enqueue-asset-initial-download! repo updated)))
(start-pull-loop! updated ws))))
(defn stop!
([]
@@ -1029,21 +1054,30 @@
[repo]
(if-not (enabled?)
(p/resolved nil)
(p/do!
(stop!)
(let [base (ws-base-url)
graph-id (get-graph-id repo)]
(if (and (string? base) (seq base) (seq graph-id))
(let [client (ensure-client-state! repo)
url (format-ws-url base graph-id)
_ (ensure-client-graph-uuid! repo graph-id)
connected (assoc client :graph-id graph-id)
connected (connect! repo connected url)]
(swap! worker-state/*db-sync-clients assoc repo connected)
(p/resolved nil))
(do
(log/info :db-sync/start-skipped {:repo repo :graph-id graph-id :base base})
(p/resolved nil)))))))
(let [client (get @worker-state/*db-sync-clients repo)]
(cond
(registered-ws-open? repo)
(p/resolved nil)
(and client (reconnect-pending? client))
(p/resolved nil)
:else
(p/do!
(stop!)
(let [base (ws-base-url)
graph-id (get-graph-id repo)]
(if (and (string? base) (seq base) (seq graph-id))
(let [client (ensure-client-state! repo)
url (format-ws-url base graph-id)
_ (ensure-client-graph-uuid! repo graph-id)
connected (assoc client :graph-id graph-id)
connected (connect! repo connected url)]
(swap! worker-state/*db-sync-clients assoc repo connected)
(p/resolved nil))
(do
(log/info :db-sync/start-skipped {:repo repo :graph-id graph-id :base base})
(p/resolved nil)))))))))
(defn get-client-state
[repo]