enhance(sync): serialize pull/upload and dedupe pull

This commit is contained in:
Tienson Qin
2026-04-09 22:43:47 +08:00
parent 75bf1e683b
commit cba76460f2
3 changed files with 137 additions and 60 deletions

View File

@@ -143,6 +143,21 @@
[ws message]
(sync-transport/send! sync-transport/coerce-ws-client-message ws message))
(defn- enqueue-receive-message!
[client task]
(if-let [queue (:receive-queue client)]
(swap! queue
(fn [prev]
(-> (or prev (p/resolved nil))
;; Keep queue alive even if one message handler fails.
(p/catch (fn [_] nil))
(p/then (fn [_] (task)))
(p/catch (fn [error]
(log/error :db-sync/ws-handle-message-failed
{:repo (:repo client)
:error error}))))))
(task)))
(defn update-presence!
[editing-block-uuid]
(when-let [client @worker-state/*db-sync-client]
@@ -161,7 +176,9 @@
[repo]
{:repo repo
:send-queue (atom (p/resolved nil))
:receive-queue (atom (p/resolved nil))
:asset-queue (atom (p/resolved nil))
:pending-pull-since (atom nil)
:inflight (atom [])
:reconnect (atom {:attempt 0 :timer nil})
:stale-kill-timer (atom nil)
@@ -199,7 +216,9 @@
(set! (.-onmessage ws)
(fn [event]
(touch-last-ws-message! client)
(sync-handle-message/handle-message! repo client (.-data event))))
(enqueue-receive-message! client
(fn []
(sync-handle-message/handle-message! repo client (.-data event))))))
(set! (.-onerror ws) (fn [error] (log/error :db-sync/ws-error error)))
(set! (.-onclose ws)
(fn [_]
@@ -347,3 +366,15 @@
(defn upload-graph!
[repo]
(sync-upload/upload-graph! repo))
(defn stop-upload!
[repo]
(sync-apply/set-upload-stopped! repo true))
(defn resume-upload!
[repo]
(sync-apply/set-upload-stopped! repo false))
(defn upload-stopped?
[repo]
(sync-apply/upload-stopped? repo))

View File

@@ -32,11 +32,23 @@
(defonce *repo->latest-remote-tx (atom {}))
(defonce *repo->latest-remote-checksum (atom {}))
(defonce *upload-temp-opfs-pool (atom nil))
;; Debug-only gate to reproduce one-way sync:
;; still pull/rebase remote txs, but skip local tx batch uploads.
(defonce *repo->upload-stopped? (atom {}))
(defn fail-fast [tag data]
(log/error tag data)
(throw (ex-info (name tag) data)))
(defn set-upload-stopped!
[repo stopped?]
(swap! *repo->upload-stopped? assoc repo (boolean stopped?))
(boolean stopped?))
(defn upload-stopped?
[repo]
(true? (get @*repo->upload-stopped? repo)))
(declare enqueue-asset-task!)
(defn- current-client [repo]
@@ -487,40 +499,56 @@
(when (and (ws-open? ws) (worker-state/online?))
(let [batch (pending-txs repo {:limit 50})]
(when (seq batch)
(let [{:keys [tx-entries drop-tx-ids]} (prepare-upload-tx-entries conn batch)]
(when (seq drop-tx-ids)
(mark-pending-txs-false! repo drop-tx-ids))
(when (seq tx-entries)
(-> (p/let [aes-key (when (sync-crypt/graph-e2ee? repo)
(sync-crypt/<ensure-graph-aes-key repo (:graph-id client)))
_ (when (and (sync-crypt/graph-e2ee? repo) (nil? aes-key))
(fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))
tx-entries* (p/all
(mapv (fn [{:keys [tx-data] :as tx-entry}]
(p/let [tx-data* (offload-large-titles
tx-data
{:repo repo
:graph-id (:graph-id client)
:aes-key aes-key})
tx-data** (if aes-key
(sync-crypt/<encrypt-tx-data aes-key tx-data*)
tx-data*)]
(assoc tx-entry :tx-data tx-data**)))
tx-entries))
payload (mapv (fn [{:keys [tx-id tx-data outliner-op]}]
(cond-> {:tx (sqlite-util/write-transit-str tx-data)}
tx-id
(assoc :tx-id (str tx-id))
outliner-op
(assoc :outliner-op outliner-op)))
tx-entries*)
tx-ids (mapv :tx-id tx-entries)]
(reset! (:inflight client) tx-ids)
(send! ws {:type "tx/batch"
:t-before local-tx
:txs payload}))
(p/catch (fn [error]
(js/console.error error))))))))))))))
(when-not (upload-stopped? repo)
(let [{:keys [tx-entries drop-tx-ids]} (prepare-upload-tx-entries conn batch)]
(when (seq drop-tx-ids)
(mark-pending-txs-false! repo drop-tx-ids))
(when (seq tx-entries)
(-> (p/let [aes-key (when (sync-crypt/graph-e2ee? repo)
(sync-crypt/<ensure-graph-aes-key repo (:graph-id client)))
_ (when (and (sync-crypt/graph-e2ee? repo) (nil? aes-key))
(fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))
tx-entries* (p/all
(mapv (fn [{:keys [tx-data] :as tx-entry}]
(p/let [tx-data* (offload-large-titles
tx-data
{:repo repo
:graph-id (:graph-id client)
:aes-key aes-key})
tx-data** (if aes-key
(sync-crypt/<encrypt-tx-data aes-key tx-data*)
tx-data*)]
(assoc tx-entry :tx-data tx-data**)))
tx-entries))
payload (mapv (fn [{:keys [tx-id tx-data outliner-op]}]
(cond-> {:tx (sqlite-util/write-transit-str tx-data)}
tx-id
(assoc :tx-id (str tx-id))
outliner-op
(assoc :outliner-op outliner-op)))
tx-entries*)
tx-ids (mapv :tx-id tx-entries)]
(reset! (:inflight client) tx-ids)
(send! ws {:type "tx/batch"
:t-before local-tx
:txs payload}))
(p/catch (fn [error]
(js/console.error error)))))))))))))))
(defn enqueue-flush-pending!
[repo client]
(if-let [send-queue (:send-queue client)]
(swap! send-queue
(fn [prev]
(-> (or prev (p/resolved nil))
(p/catch (fn [_] nil))
(p/then (fn [_]
(flush-pending! repo client)))
(p/catch (fn [error]
(log/error :db-sync/flush-pending-queue-failed
{:repo repo
:error error}))))))
(flush-pending! repo client)))
(defn- missing-order-add-op?
[db item]
@@ -572,14 +600,8 @@
(let [tx-data (->> (:tx-data remote-tx)
(map (partial resolve-temp-id db))
seq)
report (try
(ldb/transact! conn tx-data {:transact-remote? true})
(catch :default e
(js/console.error e)
(log/error ::transact-remote-txs! {:remote-tx remote-tx
:index (inc index)
:total (count remote-txs)})
(throw e)))
report (ldb/transact! conn tx-data {:transact-remote? true
:t (:t remote-tx)})
results' (cond-> results
tx-data
(conj {:tx-data tx-data
@@ -1075,16 +1097,7 @@
(persist-local-tx! repo tx-report normalized reversed-datoms)
(when-let [client @worker-state/*db-sync-client]
(when (= repo (:repo client))
(let [send-queue (:send-queue client)]
(swap! send-queue
(fn [prev]
(p/then prev
(fn [_]
(when-let [current @worker-state/*db-sync-client]
(when (= repo (:repo current))
(when-let [ws (:ws current)]
(when (ws-open? ws)
(flush-pending! repo current)))))))))))))))
(enqueue-flush-pending! repo client))))))
;; (defonce *persist-promise (atom nil))

View File

@@ -73,6 +73,20 @@
(fn [prev]
(p/then prev (fn [_] (task)))))))
(defn- enqueue-send-task!
[client task]
(if-let [queue (:send-queue client)]
(swap! queue
(fn [prev]
(-> (or prev (p/resolved nil))
(p/catch (fn [_] nil))
(p/then (fn [_] (task)))
(p/catch (fn [error]
(log/error :db-sync/send-queue-task-failed
{:repo (:repo client)
:error error}))))))
(task)))
(defn- current-client
[repo]
(sync-presence/current-client worker-state/*db-sync-client repo))
@@ -102,6 +116,25 @@
[value context]
(sync-transport/parse-transit fail-fast value context))
(defn- request-pull!
[client since]
(when (and (:ws client) (ws-open? (:ws client)))
(enqueue-send-task!
client
(fn []
(when (and (:ws client) (ws-open? (:ws client)))
(if-let [*pending (:pending-pull-since client)]
(let [pending @*pending]
(when (or (nil? pending) (< since pending))
(reset! *pending since)
(send! (:ws client) {:type "pull" :since since})))
(send! (:ws client) {:type "pull" :since since})))))))
(defn- clear-pending-pull!
[client]
(when-let [*pending (:pending-pull-since client)]
(reset! *pending nil)))
(defn- pending-local-tx?
[repo]
(when-let [conn (client-ops-conn repo)]
@@ -164,8 +197,7 @@
(require-uuid failed-tx-id {:repo repo :type "tx/reject" :field :failed-tx-id}))
(case reason
"stale"
(when (and (:ws client) (ws-open? (:ws client)))
(send! (:ws client) {:type "pull" :since local-tx}))
(request-pull! client local-tx)
(let [inflight @(:inflight client)
inflight-set (set inflight)
@@ -208,14 +240,14 @@
(verify-sync-checksum! repo client local-tx remote-tx remote-checksum {:type "hello"})
(broadcast-rtc-state! client)
(when (> remote-tx local-tx)
(send! (:ws client) {:type "pull" :since local-tx}))
(request-pull! client local-tx))
(sync-assets/enqueue-asset-sync!
repo client
{:enqueue-asset-task-f enqueue-asset-task!
:current-client-f current-client
:broadcast-rtc-state!-f broadcast-rtc-state!
:fail-fast-f fail-fast})
(sync-apply/flush-pending! repo client))
(sync-apply/enqueue-flush-pending! repo client))
(defn- handle-online-users!
[repo client message]
@@ -241,7 +273,7 @@
(sync-apply/mark-pending-txs-false! repo @(:inflight client))
(reset! (:inflight client) [])
(verify-sync-checksum! repo client next-local-tx remote-tx remote-checksum {:type "tx/batch/ok"})
(sync-apply/flush-pending! repo client)))
(sync-apply/enqueue-flush-pending! repo client)))
(defn- update-latest-remote-state!
[repo message]
@@ -290,6 +322,7 @@
(defn- handle-pull-ok!
[repo client local-tx remote-tx remote-checksum message]
(clear-pending-pull! client)
(when (> remote-tx local-tx)
(let [txs (:txs message)]
(require-non-negative remote-tx {:repo repo :type "pull/ok"})
@@ -319,11 +352,11 @@
(client-op/update-local-tx repo remote-tx)
(broadcast-rtc-state! client)
(verify-sync-checksum! repo client remote-tx remote-tx remote-checksum {:type "pull/ok"})
(sync-apply/flush-pending! repo client)))))))
(sync-apply/enqueue-flush-pending! repo client)))))))
(defn- handle-changed!
[repo client local-tx remote-tx]
(require-non-negative remote-tx {:repo repo :type "changed"})
(broadcast-rtc-state! client)
(when (< local-tx remote-tx)
(send! (:ws client) {:type "pull" :since local-tx})))
(request-pull! client local-tx)))