From cba76460f21374e98bd54d522ed4e32b6f607765 Mon Sep 17 00:00:00 2001 From: Tienson Qin Date: Thu, 9 Apr 2026 22:43:47 +0800 Subject: [PATCH] enhance(sync): serialize pull/upload and dedupe pull --- src/main/frontend/worker/sync.cljs | 33 ++++- src/main/frontend/worker/sync/apply_txs.cljs | 117 ++++++++++-------- .../frontend/worker/sync/handle_message.cljs | 47 +++++-- 3 files changed, 137 insertions(+), 60 deletions(-) diff --git a/src/main/frontend/worker/sync.cljs b/src/main/frontend/worker/sync.cljs index e6ed2d52f8..8217d5148a 100644 --- a/src/main/frontend/worker/sync.cljs +++ b/src/main/frontend/worker/sync.cljs @@ -143,6 +143,21 @@ [ws message] (sync-transport/send! sync-transport/coerce-ws-client-message ws message)) +(defn- enqueue-receive-message! + [client task] + (if-let [queue (:receive-queue client)] + (swap! queue + (fn [prev] + (-> (or prev (p/resolved nil)) + ;; Keep queue alive even if one message handler fails. + (p/catch (fn [_] nil)) + (p/then (fn [_] (task))) + (p/catch (fn [error] + (log/error :db-sync/ws-handle-message-failed + {:repo (:repo client) + :error error})))))) + (task))) + (defn update-presence! [editing-block-uuid] (when-let [client @worker-state/*db-sync-client] @@ -161,7 +176,9 @@ [repo] {:repo repo :send-queue (atom (p/resolved nil)) + :receive-queue (atom (p/resolved nil)) :asset-queue (atom (p/resolved nil)) + :pending-pull-since (atom nil) :inflight (atom []) :reconnect (atom {:attempt 0 :timer nil}) :stale-kill-timer (atom nil) @@ -199,7 +216,9 @@ (set! (.-onmessage ws) (fn [event] (touch-last-ws-message! client) - (sync-handle-message/handle-message! repo client (.-data event)))) + (enqueue-receive-message! client + (fn [] + (sync-handle-message/handle-message! repo client (.-data event)))))) (set! (.-onerror ws) (fn [error] (log/error :db-sync/ws-error error))) (set! (.-onclose ws) (fn [_] @@ -347,3 +366,15 @@ (defn upload-graph! [repo] (sync-upload/upload-graph! repo)) + +(defn stop-upload! + [repo] + (sync-apply/set-upload-stopped! repo true)) + +(defn resume-upload! + [repo] + (sync-apply/set-upload-stopped! repo false)) + +(defn upload-stopped? + [repo] + (sync-apply/upload-stopped? repo)) diff --git a/src/main/frontend/worker/sync/apply_txs.cljs b/src/main/frontend/worker/sync/apply_txs.cljs index 51bf787008..266feb084d 100644 --- a/src/main/frontend/worker/sync/apply_txs.cljs +++ b/src/main/frontend/worker/sync/apply_txs.cljs @@ -32,11 +32,23 @@ (defonce *repo->latest-remote-tx (atom {})) (defonce *repo->latest-remote-checksum (atom {})) (defonce *upload-temp-opfs-pool (atom nil)) +;; Debug-only gate to reproduce one-way sync: +;; still pull/rebase remote txs, but skip local tx batch uploads. +(defonce *repo->upload-stopped? (atom {})) (defn fail-fast [tag data] (log/error tag data) (throw (ex-info (name tag) data))) +(defn set-upload-stopped! + [repo stopped?] + (swap! *repo->upload-stopped? assoc repo (boolean stopped?)) + (boolean stopped?)) + +(defn upload-stopped? + [repo] + (true? (get @*repo->upload-stopped? repo))) + (declare enqueue-asset-task!) (defn- current-client [repo] @@ -487,40 +499,56 @@ (when (and (ws-open? ws) (worker-state/online?)) (let [batch (pending-txs repo {:limit 50})] (when (seq batch) - (let [{:keys [tx-entries drop-tx-ids]} (prepare-upload-tx-entries conn batch)] - (when (seq drop-tx-ids) - (mark-pending-txs-false! repo drop-tx-ids)) - (when (seq tx-entries) - (-> (p/let [aes-key (when (sync-crypt/graph-e2ee? repo) - (sync-crypt/ {:tx (sqlite-util/write-transit-str tx-data)} - tx-id - (assoc :tx-id (str tx-id)) - outliner-op - (assoc :outliner-op outliner-op))) - tx-entries*) - tx-ids (mapv :tx-id tx-entries)] - (reset! (:inflight client) tx-ids) - (send! ws {:type "tx/batch" - :t-before local-tx - :txs payload})) - (p/catch (fn [error] - (js/console.error error)))))))))))))) + (when-not (upload-stopped? repo) + (let [{:keys [tx-entries drop-tx-ids]} (prepare-upload-tx-entries conn batch)] + (when (seq drop-tx-ids) + (mark-pending-txs-false! repo drop-tx-ids)) + (when (seq tx-entries) + (-> (p/let [aes-key (when (sync-crypt/graph-e2ee? repo) + (sync-crypt/ {:tx (sqlite-util/write-transit-str tx-data)} + tx-id + (assoc :tx-id (str tx-id)) + outliner-op + (assoc :outliner-op outliner-op))) + tx-entries*) + tx-ids (mapv :tx-id tx-entries)] + (reset! (:inflight client) tx-ids) + (send! ws {:type "tx/batch" + :t-before local-tx + :txs payload})) + (p/catch (fn [error] + (js/console.error error))))))))))))))) + +(defn enqueue-flush-pending! + [repo client] + (if-let [send-queue (:send-queue client)] + (swap! send-queue + (fn [prev] + (-> (or prev (p/resolved nil)) + (p/catch (fn [_] nil)) + (p/then (fn [_] + (flush-pending! repo client))) + (p/catch (fn [error] + (log/error :db-sync/flush-pending-queue-failed + {:repo repo + :error error})))))) + (flush-pending! repo client))) (defn- missing-order-add-op? [db item] @@ -572,14 +600,8 @@ (let [tx-data (->> (:tx-data remote-tx) (map (partial resolve-temp-id db)) seq) - report (try - (ldb/transact! conn tx-data {:transact-remote? true}) - (catch :default e - (js/console.error e) - (log/error ::transact-remote-txs! {:remote-tx remote-tx - :index (inc index) - :total (count remote-txs)}) - (throw e))) + report (ldb/transact! conn tx-data {:transact-remote? true + :t (:t remote-tx)}) results' (cond-> results tx-data (conj {:tx-data tx-data @@ -1075,16 +1097,7 @@ (persist-local-tx! repo tx-report normalized reversed-datoms) (when-let [client @worker-state/*db-sync-client] (when (= repo (:repo client)) - (let [send-queue (:send-queue client)] - (swap! send-queue - (fn [prev] - (p/then prev - (fn [_] - (when-let [current @worker-state/*db-sync-client] - (when (= repo (:repo current)) - (when-let [ws (:ws current)] - (when (ws-open? ws) - (flush-pending! repo current))))))))))))))) + (enqueue-flush-pending! repo client)))))) ;; (defonce *persist-promise (atom nil)) diff --git a/src/main/frontend/worker/sync/handle_message.cljs b/src/main/frontend/worker/sync/handle_message.cljs index 1efdd56ecb..07d092fbbe 100644 --- a/src/main/frontend/worker/sync/handle_message.cljs +++ b/src/main/frontend/worker/sync/handle_message.cljs @@ -73,6 +73,20 @@ (fn [prev] (p/then prev (fn [_] (task))))))) +(defn- enqueue-send-task! + [client task] + (if-let [queue (:send-queue client)] + (swap! queue + (fn [prev] + (-> (or prev (p/resolved nil)) + (p/catch (fn [_] nil)) + (p/then (fn [_] (task))) + (p/catch (fn [error] + (log/error :db-sync/send-queue-task-failed + {:repo (:repo client) + :error error})))))) + (task))) + (defn- current-client [repo] (sync-presence/current-client worker-state/*db-sync-client repo)) @@ -102,6 +116,25 @@ [value context] (sync-transport/parse-transit fail-fast value context)) +(defn- request-pull! + [client since] + (when (and (:ws client) (ws-open? (:ws client))) + (enqueue-send-task! + client + (fn [] + (when (and (:ws client) (ws-open? (:ws client))) + (if-let [*pending (:pending-pull-since client)] + (let [pending @*pending] + (when (or (nil? pending) (< since pending)) + (reset! *pending since) + (send! (:ws client) {:type "pull" :since since}))) + (send! (:ws client) {:type "pull" :since since}))))))) + +(defn- clear-pending-pull! + [client] + (when-let [*pending (:pending-pull-since client)] + (reset! *pending nil))) + (defn- pending-local-tx? [repo] (when-let [conn (client-ops-conn repo)] @@ -164,8 +197,7 @@ (require-uuid failed-tx-id {:repo repo :type "tx/reject" :field :failed-tx-id})) (case reason "stale" - (when (and (:ws client) (ws-open? (:ws client))) - (send! (:ws client) {:type "pull" :since local-tx})) + (request-pull! client local-tx) (let [inflight @(:inflight client) inflight-set (set inflight) @@ -208,14 +240,14 @@ (verify-sync-checksum! repo client local-tx remote-tx remote-checksum {:type "hello"}) (broadcast-rtc-state! client) (when (> remote-tx local-tx) - (send! (:ws client) {:type "pull" :since local-tx})) + (request-pull! client local-tx)) (sync-assets/enqueue-asset-sync! repo client {:enqueue-asset-task-f enqueue-asset-task! :current-client-f current-client :broadcast-rtc-state!-f broadcast-rtc-state! :fail-fast-f fail-fast}) - (sync-apply/flush-pending! repo client)) + (sync-apply/enqueue-flush-pending! repo client)) (defn- handle-online-users! [repo client message] @@ -241,7 +273,7 @@ (sync-apply/mark-pending-txs-false! repo @(:inflight client)) (reset! (:inflight client) []) (verify-sync-checksum! repo client next-local-tx remote-tx remote-checksum {:type "tx/batch/ok"}) - (sync-apply/flush-pending! repo client))) + (sync-apply/enqueue-flush-pending! repo client))) (defn- update-latest-remote-state! [repo message] @@ -290,6 +322,7 @@ (defn- handle-pull-ok! [repo client local-tx remote-tx remote-checksum message] + (clear-pending-pull! client) (when (> remote-tx local-tx) (let [txs (:txs message)] (require-non-negative remote-tx {:repo repo :type "pull/ok"}) @@ -319,11 +352,11 @@ (client-op/update-local-tx repo remote-tx) (broadcast-rtc-state! client) (verify-sync-checksum! repo client remote-tx remote-tx remote-checksum {:type "pull/ok"}) - (sync-apply/flush-pending! repo client))))))) + (sync-apply/enqueue-flush-pending! repo client))))))) (defn- handle-changed! [repo client local-tx remote-tx] (require-non-negative remote-tx {:repo repo :type "changed"}) (broadcast-rtc-state! client) (when (< local-tx remote-tx) - (send! (:ws client) {:type "pull" :since local-tx}))) + (request-pull! client local-tx)))