fix: auto-reconnect after long sleep by adding ws health check

This commit is contained in:
Tienson Qin
2026-02-11 15:15:40 +08:00
parent 10a8e3c491
commit dbef84fbb4
3 changed files with 246 additions and 102 deletions

View File

@@ -299,123 +299,150 @@
{:type "tx/reject"
:reason "empty tx data"}))))
(defn- handle-sync-pull
[^js self ^js url]
(let [raw-since (.get (.-searchParams url) "since")
since (if (some? raw-since) (parse-int raw-since) 0)]
(if (or (and (some? raw-since) (not (number? since))) (neg? since))
(http/bad-request "invalid since")
(http/json-response :sync/pull (pull-response self since)))))
(defn- handle-sync-snapshot-stream
[^js self request]
(let [graph-id (graph-id-from-request request)]
(if (not (seq graph-id))
(http/bad-request "missing graph id")
(let [use-compression? (exists? js/CompressionStream)
content-encoding (when use-compression? snapshot-content-encoding)
stream (snapshot-export-stream self)
stream (if use-compression?
(maybe-compress-stream stream)
stream)]
(js/Response. stream
#js {:status 200
:headers (js/Object.assign
#js {"content-type" snapshot-content-type
"content-encoding" (or content-encoding "identity")}
(common/cors-headers))})))))
(defn- handle-sync-snapshot-download
[^js self request]
(let [graph-id (graph-id-from-request request)
^js bucket (.-LOGSEQ_SYNC_ASSETS (.-env self))]
(cond
(not (seq graph-id))
(http/bad-request "missing graph id")
(nil? bucket)
(http/error-response "missing assets bucket" 500)
:else
(p/let [snapshot-id (str (random-uuid))
key (snapshot-key graph-id snapshot-id)
use-compression? (exists? js/CompressionStream)
content-encoding (when use-compression? snapshot-content-encoding)
stream (snapshot-export-stream self)
stream (if use-compression?
(maybe-compress-stream stream)
stream)
multipart? (and (some? (.-createMultipartUpload bucket))
(fn? (.-createMultipartUpload bucket)))
opts #js {:httpMetadata #js {:contentType snapshot-content-type
:contentEncoding content-encoding
:cacheControl snapshot-cache-control}
:customMetadata #js {:purpose "snapshot"
:created-at (str (common/now-ms))}}
_ (if multipart?
(upload-multipart! bucket key stream opts)
(if use-compression?
(p/let [body (<buffer-stream stream)]
(.put bucket key body opts))
(p/let [body (snapshot-export-fixed-length self)]
(.put bucket key body opts))))
url (snapshot-url request graph-id snapshot-id)]
(http/json-response :sync/snapshot-download {:ok true
:key key
:url url
:content-encoding content-encoding})))))
(defn- handle-sync-admin-reset
[^js self]
(common/sql-exec (.-sql self) "drop table if exists kvs")
(common/sql-exec (.-sql self) "drop table if exists tx_log")
(common/sql-exec (.-sql self) "drop table if exists sync_meta")
(storage/init-schema! (.-sql self))
(set! (.-schema-ready self) true)
(set! (.-conn self) nil)
(http/json-response :sync/admin-reset {:ok true}))
(defn- handle-sync-tx-batch
[^js self request]
(.then (common/read-json request)
(fn [result]
(if (nil? result)
(http/bad-request "missing body")
(let [body (js->clj result :keywordize-keys true)
body (http/coerce-http-request :sync/tx-batch body)]
(if (nil? body)
(http/bad-request "invalid tx")
(let [{:keys [txs t-before]} body
t-before (parse-int t-before)]
(if (string? txs)
(http/json-response :sync/tx-batch (handle-tx-batch! self nil txs t-before))
(http/bad-request "invalid tx")))))))))
(defn- parse-reset-param
[value]
(if (nil? value)
true
(not (contains? #{"false" "0"} value))))
(defn- handle-sync-snapshot-upload
[^js self request url]
(let [graph-id (graph-id-from-request request)
reset-param (.get (.-searchParams url) "reset")
reset? (parse-reset-param reset-param)
req-encoding (.get (.-headers request) "content-encoding")]
(cond
(not (seq graph-id))
(http/bad-request "missing graph id")
(nil? (.-body request))
(http/bad-request "missing body")
:else
(let [stream (.-body request)
encoding (or req-encoding "")]
(if (and (= encoding snapshot-content-encoding)
(not (exists? js/DecompressionStream)))
(http/error-response "gzip not supported" 500)
(p/let [stream (maybe-decompress-stream stream encoding)
count (import-snapshot-stream! self stream reset?)]
(http/json-response :sync/snapshot-upload {:ok true
:count count})))))))
(defn handle [{:keys [^js self request url route]}]
(case (:handler route)
:sync/health
(http/json-response :sync/health {:ok true})
:sync/pull
(let [raw-since (.get (.-searchParams url) "since")
since (if (some? raw-since) (parse-int raw-since) 0)]
(if (or (and (some? raw-since) (not (number? since))) (neg? since))
(http/bad-request "invalid since")
(http/json-response :sync/pull (pull-response self since))))
(handle-sync-pull self url)
:sync/snapshot-stream
(let [graph-id (graph-id-from-request request)]
(if (not (seq graph-id))
(http/bad-request "missing graph id")
(let [use-compression? (exists? js/CompressionStream)
content-encoding (when use-compression? snapshot-content-encoding)
stream (snapshot-export-stream self)
stream (if use-compression?
(maybe-compress-stream stream)
stream)]
(js/Response. stream
#js {:status 200
:headers (js/Object.assign
#js {"content-type" snapshot-content-type
"content-encoding" (or content-encoding "identity")}
(common/cors-headers))}))))
(handle-sync-snapshot-stream self request)
:sync/snapshot-download
(let [graph-id (graph-id-from-request request)
^js bucket (.-LOGSEQ_SYNC_ASSETS (.-env self))]
(cond
(not (seq graph-id))
(http/bad-request "missing graph id")
(nil? bucket)
(http/error-response "missing assets bucket" 500)
:else
(p/let [snapshot-id (str (random-uuid))
key (snapshot-key graph-id snapshot-id)
use-compression? (exists? js/CompressionStream)
content-encoding (when use-compression? snapshot-content-encoding)
stream (snapshot-export-stream self)
stream (if use-compression?
(maybe-compress-stream stream)
stream)
multipart? (and (some? (.-createMultipartUpload bucket))
(fn? (.-createMultipartUpload bucket)))
opts #js {:httpMetadata #js {:contentType snapshot-content-type
:contentEncoding content-encoding
:cacheControl snapshot-cache-control}
:customMetadata #js {:purpose "snapshot"
:created-at (str (common/now-ms))}}
_ (if multipart?
(upload-multipart! bucket key stream opts)
(if use-compression?
(p/let [body (<buffer-stream stream)]
(.put bucket key body opts))
(p/let [body (snapshot-export-fixed-length self)]
(.put bucket key body opts))))
url (snapshot-url request graph-id snapshot-id)]
(http/json-response :sync/snapshot-download {:ok true
:key key
:url url
:content-encoding content-encoding}))))
(handle-sync-snapshot-download self request)
:sync/admin-reset
(do
(common/sql-exec (.-sql self) "drop table if exists kvs")
(common/sql-exec (.-sql self) "drop table if exists tx_log")
(common/sql-exec (.-sql self) "drop table if exists sync_meta")
(storage/init-schema! (.-sql self))
(set! (.-schema-ready self) true)
(set! (.-conn self) nil)
(http/json-response :sync/admin-reset {:ok true}))
(handle-sync-admin-reset self)
:sync/tx-batch
(.then (common/read-json request)
(fn [result]
(if (nil? result)
(http/bad-request "missing body")
(let [body (js->clj result :keywordize-keys true)
body (http/coerce-http-request :sync/tx-batch body)]
(if (nil? body)
(http/bad-request "invalid tx")
(let [{:keys [txs t-before]} body
t-before (parse-int t-before)]
(if (string? txs)
(http/json-response :sync/tx-batch (handle-tx-batch! self nil txs t-before))
(http/bad-request "invalid tx"))))))))
(handle-sync-tx-batch self request)
:sync/snapshot-upload
(let [graph-id (graph-id-from-request request)
reset-param (.get (.-searchParams url) "reset")
reset? (if (nil? reset-param)
true
(not (contains? #{"false" "0"} reset-param)))
req-encoding (.get (.-headers request) "content-encoding")]
(cond
(not (seq graph-id))
(http/bad-request "missing graph id")
(nil? (.-body request))
(http/bad-request "missing body")
:else
(let [stream (.-body request)
encoding (or req-encoding "")]
(if (and (= encoding snapshot-content-encoding)
(not (exists? js/DecompressionStream)))
(http/error-response "gzip not supported" 500)
(p/let [stream (maybe-decompress-stream stream encoding)
count (import-snapshot-stream! self stream reset?)]
(http/json-response :sync/snapshot-upload {:ok true
:count count}))))))
(handle-sync-snapshot-upload self request url)
(http/not-found)))

View File

@@ -154,6 +154,8 @@
(def ^:private reconnect-base-delay-ms 1000)
(def ^:private reconnect-max-delay-ms 30000)
(def ^:private reconnect-jitter-ms 250)
(def ^:private ws-pull-loop-interval-ms 30000)
(def ^:private ws-stale-timeout-ms 120000)
(def ^:private large-title-byte-limit 4096)
(def ^:private large-title-asset-type "txt")
(def ^:private large-title-object-attr :logseq.property.sync/large-title-object)
@@ -288,6 +290,16 @@
(clear-reconnect-timer! reconnect)
(swap! reconnect assoc :attempt 0)))
(defn- clear-pull-loop-timer! [client]
(when-let [*timer (:pull-loop-timer client)]
(when-let [timer @*timer]
(js/clearInterval timer)
(reset! *timer nil))))
(defn- touch-last-ws-message! [client]
(when-let [*ts (:last-ws-message-ts client)]
(reset! *ts (common-util/time-ms))))
(defn- send! [ws message]
(when (ws-open? ws)
(if-let [coerced (coerce-ws-client-message message)]
@@ -552,6 +564,8 @@
:asset-queue (atom (p/resolved nil))
:inflight (atom [])
:reconnect (atom {:attempt 0 :timer nil})
:pull-loop-timer (atom nil)
:last-ws-message-ts (atom (common-util/time-ms))
:online-users (atom [])
:ws-state (atom :closed)}]
(reset! worker-state/*db-sync-client client)
@@ -1161,6 +1175,7 @@
(defn- attach-ws-handlers! [repo client ws url]
(set! (.-onmessage ws)
(fn [event]
(touch-last-ws-message! client)
(handle-message! repo client (.-data event))))
(set! (.-onerror ws)
(fn [event]
@@ -1168,6 +1183,7 @@
(set! (.-onclose ws)
(fn [_]
(log/info :db-sync/ws-closed {:repo repo})
(clear-pull-loop-timer! client)
(update-online-users! client [])
(set-ws-state! client :closed)
(schedule-reconnect! repo client url :close))))
@@ -1178,10 +1194,37 @@
(set! (.-onerror ws) nil)
(set! (.-onclose ws) nil))
(defn- start-pull-loop! [client _ws]
(defn- start-pull-loop! [client ws]
(let [repo (:repo client)
graph-id (:graph-id client)]
(clear-pull-loop-timer! client)
(when-let [*timer (:pull-loop-timer client)]
(let [timer (js/setInterval
(fn []
(when-let [current @worker-state/*db-sync-client]
(when (and (= repo (:repo current))
(= graph-id (:graph-id current))
(identical? ws (:ws current))
(ws-open? ws)
(worker-state/online?))
(let [now (common-util/time-ms)
last-ts (or (some-> (:last-ws-message-ts current) deref) now)
stale-ms (- now last-ts)]
(if (>= stale-ms ws-stale-timeout-ms)
(do
(log/warn :db-sync/ws-stale-timeout {:repo repo :stale-ms stale-ms})
(try
(.close ws)
(catch :default _
nil)))
(let [local-tx (or (client-op/get-local-tx repo) 0)]
(send! ws {:type "pull" :since local-tx})))))))
ws-pull-loop-interval-ms)]
(reset! *timer timer))))
client)
(defn- stop-client! [client]
(clear-pull-loop-timer! client)
(when-let [reconnect (:reconnect client)]
(clear-reconnect-timer! reconnect))
(when-let [ws (:ws client)]
@@ -1202,6 +1245,7 @@
(set! (.-onopen ws)
(fn [_]
(reset-reconnect! updated)
(touch-last-ws-message! updated)
(set-ws-state! updated :open)
(send! ws {:type "hello" :client repo})
(enqueue-asset-sync! repo updated)))

View File

@@ -8,6 +8,7 @@
[frontend.worker.sync :as db-sync]
[frontend.worker.sync.client-op :as client-op]
[frontend.worker.sync.crypt :as sync-crypt]
[logseq.common.util :as common-util]
[logseq.db.test.helper :as db-test]
[logseq.outliner.core :as outliner-core]
[promesa.core :as p]))
@@ -110,6 +111,78 @@
@(:online-users client)))
(is (= 1 (count @broadcasts))))))
(deftest start-pull-loop-sends-periodic-pull-test
(let [prev-client @worker-state/*db-sync-client
prev-set-interval js/setInterval
prev-clear-interval js/clearInterval
*tick (atom nil)
sent-payloads (atom [])
ws #js {:readyState 1
:send (fn [payload] (swap! sent-payloads conj payload))
:close (fn [] nil)}
client {:repo test-repo
:graph-id "graph-1"
:ws ws
:pull-loop-timer (atom nil)
:last-ws-message-ts (atom 1000)
:reconnect (atom {:attempt 0 :timer nil})}]
(set! js/setInterval (fn [f _ms]
(reset! *tick f)
1))
(set! js/clearInterval (fn [_id] nil))
(try
(reset! worker-state/*db-sync-client client)
(with-redefs [worker-state/online? (fn [] true)
client-op/get-local-tx (fn [_repo] 42)
common-util/time-ms (fn [] 1010)]
(#'db-sync/start-pull-loop! client ws)
(is (fn? @*tick))
(when (fn? @*tick)
(@*tick))
(is (= 1 (count @sent-payloads)))
(let [payload (js->clj (js/JSON.parse (first @sent-payloads))
:keywordize-keys true)]
(is (= "pull" (:type payload)))
(is (= 42 (:since payload)))))
(finally
(set! js/setInterval prev-set-interval)
(set! js/clearInterval prev-clear-interval)
(reset! worker-state/*db-sync-client prev-client)))))
(deftest start-pull-loop-closes-stale-open-websocket-test
(let [prev-client @worker-state/*db-sync-client
prev-set-interval js/setInterval
prev-clear-interval js/clearInterval
*tick (atom nil)
close-called? (atom false)
ws #js {:readyState 1
:send (fn [_payload] nil)
:close (fn [] (reset! close-called? true))}
client {:repo test-repo
:graph-id "graph-1"
:ws ws
:pull-loop-timer (atom nil)
:last-ws-message-ts (atom 0)
:reconnect (atom {:attempt 0 :timer nil})}]
(set! js/setInterval (fn [f _ms]
(reset! *tick f)
1))
(set! js/clearInterval (fn [_id] nil))
(try
(reset! worker-state/*db-sync-client client)
(with-redefs [worker-state/online? (fn [] true)
client-op/get-local-tx (fn [_repo] 0)
common-util/time-ms (fn [] 1000000)]
(#'db-sync/start-pull-loop! client ws)
(is (fn? @*tick))
(when (fn? @*tick)
(@*tick))
(is (true? @close-called?)))
(finally
(set! js/setInterval prev-set-interval)
(set! js/clearInterval prev-clear-interval)
(reset! worker-state/*db-sync-client prev-client)))))
(deftest ^:long reparent-block-when-cycle-detected-test
(testing "cycle from remote sync reparent block to page root"
(let [{:keys [conn parent child1]} (setup-parent-child)]