diff --git a/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs b/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs index 632004d5bd..29f60b860b 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs @@ -299,123 +299,150 @@ {:type "tx/reject" :reason "empty tx data"})))) +(defn- handle-sync-pull + [^js self ^js url] + (let [raw-since (.get (.-searchParams url) "since") + since (if (some? raw-since) (parse-int raw-since) 0)] + (if (or (and (some? raw-since) (not (number? since))) (neg? since)) + (http/bad-request "invalid since") + (http/json-response :sync/pull (pull-response self since))))) + +(defn- handle-sync-snapshot-stream + [^js self request] + (let [graph-id (graph-id-from-request request)] + (if (not (seq graph-id)) + (http/bad-request "missing graph id") + (let [use-compression? (exists? js/CompressionStream) + content-encoding (when use-compression? snapshot-content-encoding) + stream (snapshot-export-stream self) + stream (if use-compression? + (maybe-compress-stream stream) + stream)] + (js/Response. stream + #js {:status 200 + :headers (js/Object.assign + #js {"content-type" snapshot-content-type + "content-encoding" (or content-encoding "identity")} + (common/cors-headers))}))))) + +(defn- handle-sync-snapshot-download + [^js self request] + (let [graph-id (graph-id-from-request request) + ^js bucket (.-LOGSEQ_SYNC_ASSETS (.-env self))] + (cond + (not (seq graph-id)) + (http/bad-request "missing graph id") + + (nil? bucket) + (http/error-response "missing assets bucket" 500) + + :else + (p/let [snapshot-id (str (random-uuid)) + key (snapshot-key graph-id snapshot-id) + use-compression? (exists? js/CompressionStream) + content-encoding (when use-compression? snapshot-content-encoding) + stream (snapshot-export-stream self) + stream (if use-compression? + (maybe-compress-stream stream) + stream) + multipart? (and (some? (.-createMultipartUpload bucket)) + (fn? (.-createMultipartUpload bucket))) + opts #js {:httpMetadata #js {:contentType snapshot-content-type + :contentEncoding content-encoding + :cacheControl snapshot-cache-control} + :customMetadata #js {:purpose "snapshot" + :created-at (str (common/now-ms))}} + _ (if multipart? + (upload-multipart! bucket key stream opts) + (if use-compression? + (p/let [body (clj result :keywordize-keys true) + body (http/coerce-http-request :sync/tx-batch body)] + (if (nil? body) + (http/bad-request "invalid tx") + (let [{:keys [txs t-before]} body + t-before (parse-int t-before)] + (if (string? txs) + (http/json-response :sync/tx-batch (handle-tx-batch! self nil txs t-before)) + (http/bad-request "invalid tx"))))))))) + +(defn- parse-reset-param + [value] + (if (nil? value) + true + (not (contains? #{"false" "0"} value)))) + +(defn- handle-sync-snapshot-upload + [^js self request url] + (let [graph-id (graph-id-from-request request) + reset-param (.get (.-searchParams url) "reset") + reset? (parse-reset-param reset-param) + req-encoding (.get (.-headers request) "content-encoding")] + (cond + (not (seq graph-id)) + (http/bad-request "missing graph id") + + (nil? (.-body request)) + (http/bad-request "missing body") + + :else + (let [stream (.-body request) + encoding (or req-encoding "")] + (if (and (= encoding snapshot-content-encoding) + (not (exists? js/DecompressionStream))) + (http/error-response "gzip not supported" 500) + (p/let [stream (maybe-decompress-stream stream encoding) + count (import-snapshot-stream! self stream reset?)] + (http/json-response :sync/snapshot-upload {:ok true + :count count}))))))) + (defn handle [{:keys [^js self request url route]}] (case (:handler route) :sync/health (http/json-response :sync/health {:ok true}) :sync/pull - (let [raw-since (.get (.-searchParams url) "since") - since (if (some? raw-since) (parse-int raw-since) 0)] - (if (or (and (some? raw-since) (not (number? since))) (neg? since)) - (http/bad-request "invalid since") - (http/json-response :sync/pull (pull-response self since)))) + (handle-sync-pull self url) :sync/snapshot-stream - (let [graph-id (graph-id-from-request request)] - (if (not (seq graph-id)) - (http/bad-request "missing graph id") - (let [use-compression? (exists? js/CompressionStream) - content-encoding (when use-compression? snapshot-content-encoding) - stream (snapshot-export-stream self) - stream (if use-compression? - (maybe-compress-stream stream) - stream)] - (js/Response. stream - #js {:status 200 - :headers (js/Object.assign - #js {"content-type" snapshot-content-type - "content-encoding" (or content-encoding "identity")} - (common/cors-headers))})))) + (handle-sync-snapshot-stream self request) :sync/snapshot-download - (let [graph-id (graph-id-from-request request) - ^js bucket (.-LOGSEQ_SYNC_ASSETS (.-env self))] - (cond - (not (seq graph-id)) - (http/bad-request "missing graph id") - - (nil? bucket) - (http/error-response "missing assets bucket" 500) - - :else - (p/let [snapshot-id (str (random-uuid)) - key (snapshot-key graph-id snapshot-id) - use-compression? (exists? js/CompressionStream) - content-encoding (when use-compression? snapshot-content-encoding) - stream (snapshot-export-stream self) - stream (if use-compression? - (maybe-compress-stream stream) - stream) - multipart? (and (some? (.-createMultipartUpload bucket)) - (fn? (.-createMultipartUpload bucket))) - opts #js {:httpMetadata #js {:contentType snapshot-content-type - :contentEncoding content-encoding - :cacheControl snapshot-cache-control} - :customMetadata #js {:purpose "snapshot" - :created-at (str (common/now-ms))}} - _ (if multipart? - (upload-multipart! bucket key stream opts) - (if use-compression? - (p/let [body (clj result :keywordize-keys true) - body (http/coerce-http-request :sync/tx-batch body)] - (if (nil? body) - (http/bad-request "invalid tx") - (let [{:keys [txs t-before]} body - t-before (parse-int t-before)] - (if (string? txs) - (http/json-response :sync/tx-batch (handle-tx-batch! self nil txs t-before)) - (http/bad-request "invalid tx")))))))) + (handle-sync-tx-batch self request) :sync/snapshot-upload - (let [graph-id (graph-id-from-request request) - reset-param (.get (.-searchParams url) "reset") - reset? (if (nil? reset-param) - true - (not (contains? #{"false" "0"} reset-param))) - req-encoding (.get (.-headers request) "content-encoding")] - (cond - (not (seq graph-id)) - (http/bad-request "missing graph id") - - (nil? (.-body request)) - (http/bad-request "missing body") - - :else - (let [stream (.-body request) - encoding (or req-encoding "")] - (if (and (= encoding snapshot-content-encoding) - (not (exists? js/DecompressionStream))) - (http/error-response "gzip not supported" 500) - (p/let [stream (maybe-decompress-stream stream encoding) - count (import-snapshot-stream! self stream reset?)] - (http/json-response :sync/snapshot-upload {:ok true - :count count})))))) + (handle-sync-snapshot-upload self request url) (http/not-found))) diff --git a/src/main/frontend/worker/sync.cljs b/src/main/frontend/worker/sync.cljs index 1733e71740..0be3d4aa1b 100644 --- a/src/main/frontend/worker/sync.cljs +++ b/src/main/frontend/worker/sync.cljs @@ -154,6 +154,8 @@ (def ^:private reconnect-base-delay-ms 1000) (def ^:private reconnect-max-delay-ms 30000) (def ^:private reconnect-jitter-ms 250) +(def ^:private ws-pull-loop-interval-ms 30000) +(def ^:private ws-stale-timeout-ms 120000) (def ^:private large-title-byte-limit 4096) (def ^:private large-title-asset-type "txt") (def ^:private large-title-object-attr :logseq.property.sync/large-title-object) @@ -288,6 +290,16 @@ (clear-reconnect-timer! reconnect) (swap! reconnect assoc :attempt 0))) +(defn- clear-pull-loop-timer! [client] + (when-let [*timer (:pull-loop-timer client)] + (when-let [timer @*timer] + (js/clearInterval timer) + (reset! *timer nil)))) + +(defn- touch-last-ws-message! [client] + (when-let [*ts (:last-ws-message-ts client)] + (reset! *ts (common-util/time-ms)))) + (defn- send! [ws message] (when (ws-open? ws) (if-let [coerced (coerce-ws-client-message message)] @@ -552,6 +564,8 @@ :asset-queue (atom (p/resolved nil)) :inflight (atom []) :reconnect (atom {:attempt 0 :timer nil}) + :pull-loop-timer (atom nil) + :last-ws-message-ts (atom (common-util/time-ms)) :online-users (atom []) :ws-state (atom :closed)}] (reset! worker-state/*db-sync-client client) @@ -1161,6 +1175,7 @@ (defn- attach-ws-handlers! [repo client ws url] (set! (.-onmessage ws) (fn [event] + (touch-last-ws-message! client) (handle-message! repo client (.-data event)))) (set! (.-onerror ws) (fn [event] @@ -1168,6 +1183,7 @@ (set! (.-onclose ws) (fn [_] (log/info :db-sync/ws-closed {:repo repo}) + (clear-pull-loop-timer! client) (update-online-users! client []) (set-ws-state! client :closed) (schedule-reconnect! repo client url :close)))) @@ -1178,10 +1194,37 @@ (set! (.-onerror ws) nil) (set! (.-onclose ws) nil)) -(defn- start-pull-loop! [client _ws] +(defn- start-pull-loop! [client ws] + (let [repo (:repo client) + graph-id (:graph-id client)] + (clear-pull-loop-timer! client) + (when-let [*timer (:pull-loop-timer client)] + (let [timer (js/setInterval + (fn [] + (when-let [current @worker-state/*db-sync-client] + (when (and (= repo (:repo current)) + (= graph-id (:graph-id current)) + (identical? ws (:ws current)) + (ws-open? ws) + (worker-state/online?)) + (let [now (common-util/time-ms) + last-ts (or (some-> (:last-ws-message-ts current) deref) now) + stale-ms (- now last-ts)] + (if (>= stale-ms ws-stale-timeout-ms) + (do + (log/warn :db-sync/ws-stale-timeout {:repo repo :stale-ms stale-ms}) + (try + (.close ws) + (catch :default _ + nil))) + (let [local-tx (or (client-op/get-local-tx repo) 0)] + (send! ws {:type "pull" :since local-tx}))))))) + ws-pull-loop-interval-ms)] + (reset! *timer timer)))) client) (defn- stop-client! [client] + (clear-pull-loop-timer! client) (when-let [reconnect (:reconnect client)] (clear-reconnect-timer! reconnect)) (when-let [ws (:ws client)] @@ -1202,6 +1245,7 @@ (set! (.-onopen ws) (fn [_] (reset-reconnect! updated) + (touch-last-ws-message! updated) (set-ws-state! updated :open) (send! ws {:type "hello" :client repo}) (enqueue-asset-sync! repo updated))) diff --git a/src/test/frontend/worker/db_sync_test.cljs b/src/test/frontend/worker/db_sync_test.cljs index 6db823a01c..aa41c4d615 100644 --- a/src/test/frontend/worker/db_sync_test.cljs +++ b/src/test/frontend/worker/db_sync_test.cljs @@ -8,6 +8,7 @@ [frontend.worker.sync :as db-sync] [frontend.worker.sync.client-op :as client-op] [frontend.worker.sync.crypt :as sync-crypt] + [logseq.common.util :as common-util] [logseq.db.test.helper :as db-test] [logseq.outliner.core :as outliner-core] [promesa.core :as p])) @@ -110,6 +111,78 @@ @(:online-users client))) (is (= 1 (count @broadcasts)))))) +(deftest start-pull-loop-sends-periodic-pull-test + (let [prev-client @worker-state/*db-sync-client + prev-set-interval js/setInterval + prev-clear-interval js/clearInterval + *tick (atom nil) + sent-payloads (atom []) + ws #js {:readyState 1 + :send (fn [payload] (swap! sent-payloads conj payload)) + :close (fn [] nil)} + client {:repo test-repo + :graph-id "graph-1" + :ws ws + :pull-loop-timer (atom nil) + :last-ws-message-ts (atom 1000) + :reconnect (atom {:attempt 0 :timer nil})}] + (set! js/setInterval (fn [f _ms] + (reset! *tick f) + 1)) + (set! js/clearInterval (fn [_id] nil)) + (try + (reset! worker-state/*db-sync-client client) + (with-redefs [worker-state/online? (fn [] true) + client-op/get-local-tx (fn [_repo] 42) + common-util/time-ms (fn [] 1010)] + (#'db-sync/start-pull-loop! client ws) + (is (fn? @*tick)) + (when (fn? @*tick) + (@*tick)) + (is (= 1 (count @sent-payloads))) + (let [payload (js->clj (js/JSON.parse (first @sent-payloads)) + :keywordize-keys true)] + (is (= "pull" (:type payload))) + (is (= 42 (:since payload))))) + (finally + (set! js/setInterval prev-set-interval) + (set! js/clearInterval prev-clear-interval) + (reset! worker-state/*db-sync-client prev-client))))) + +(deftest start-pull-loop-closes-stale-open-websocket-test + (let [prev-client @worker-state/*db-sync-client + prev-set-interval js/setInterval + prev-clear-interval js/clearInterval + *tick (atom nil) + close-called? (atom false) + ws #js {:readyState 1 + :send (fn [_payload] nil) + :close (fn [] (reset! close-called? true))} + client {:repo test-repo + :graph-id "graph-1" + :ws ws + :pull-loop-timer (atom nil) + :last-ws-message-ts (atom 0) + :reconnect (atom {:attempt 0 :timer nil})}] + (set! js/setInterval (fn [f _ms] + (reset! *tick f) + 1)) + (set! js/clearInterval (fn [_id] nil)) + (try + (reset! worker-state/*db-sync-client client) + (with-redefs [worker-state/online? (fn [] true) + client-op/get-local-tx (fn [_repo] 0) + common-util/time-ms (fn [] 1000000)] + (#'db-sync/start-pull-loop! client ws) + (is (fn? @*tick)) + (when (fn? @*tick) + (@*tick)) + (is (true? @close-called?))) + (finally + (set! js/setInterval prev-set-interval) + (set! js/clearInterval prev-clear-interval) + (reset! worker-state/*db-sync-client prev-client))))) + (deftest ^:long reparent-block-when-cycle-detected-test (testing "cycle from remote sync reparent block to page root" (let [{:keys [conn parent child1]} (setup-parent-child)]