From 576bcc5132b671cbed01a24723fc3b6d52abca0e Mon Sep 17 00:00:00 2001 From: Tienson Qin Date: Tue, 13 Jan 2026 21:51:19 +0800 Subject: [PATCH] sync rebase --- src/main/frontend/worker/db_sync.cljs | 116 +++++++++++++++----------- 1 file changed, 68 insertions(+), 48 deletions(-) diff --git a/src/main/frontend/worker/db_sync.cljs b/src/main/frontend/worker/db_sync.cljs index a2f67c1cc0..e96485521f 100644 --- a/src/main/frontend/worker/db_sync.cljs +++ b/src/main/frontend/worker/db_sync.cljs @@ -641,57 +641,77 @@ (defn- rebase-apply-remote-tx! [repo client tx-data] (if-let [conn (worker-state/get-datascript-conn repo)] - (try - (let [local-txs (pending-txs repo) - reversed-tx-data (->> local-txs - (mapcat :reversed-tx) - reverse - db-normalize/replace-attr-retract-with-retract-entity-v2) - tx-report (ldb/transact-with-temp-conn! - conn - {:rtc-tx? true} - (fn [temp-conn _*batch-tx-data] - (let [db @temp-conn - ;; 1. rebase - rebase (:db-after (d/with db reversed-tx-data)) - tx-report (d/with rebase tx-data)] - ;; TODO: 2. ensure checksum matches between client & server + (let [local-txs (pending-txs repo) + pending-tx-ids (mapv :tx-id local-txs) + rebased-pending (atom nil) + reversed-tx-data (->> local-txs + (mapcat :reversed-tx) + reverse + db-normalize/replace-attr-retract-with-retract-entity-v2) + tx-report (ldb/transact-with-temp-conn! + conn + {:rtc-tx? true} + (fn [temp-conn _*batch-tx-data] + (let [db @temp-conn + ;; 1. reverse local pending txs + reversed-db (:db-after (d/with db reversed-tx-data)) + tx-report (d/with reversed-db tx-data)] + ;; TODO: 2. ensure checksum matches between client & server - ;; 3. fix data - (let [deleted-blocks (outliner-pipeline/filter-deleted-blocks (:tx-data tx-report)) - tx-meta {:persist-op? false - :gen-undo-ops? false}] - (when (seq deleted-blocks) - (let [nodes (map #(d/entity @temp-conn (:db/id %)) deleted-blocks) - pages (filter ldb/page? nodes) - blocks (remove ldb/page? nodes)] - ;; deleting pages first - (doseq [page pages] - (worker-page/delete! temp-conn (:block/uuid page) tx-meta)) - (when (seq blocks) - (outliner-tx/transact! - (assoc tx-meta - :outliner-op :delete-blocks - :transact-opts {:conn temp-conn}) - (outliner-core/delete-blocks! temp-conn blocks {}))))) + ;; 3. fix data + (let [deleted-blocks (outliner-pipeline/filter-deleted-blocks (:tx-data tx-report)) + tx-meta {:persist-op? false + :gen-undo-ops? false}] + (when (seq deleted-blocks) + (let [nodes (map #(d/entity @temp-conn (:db/id %)) deleted-blocks) + pages (filter ldb/page? nodes) + blocks (remove ldb/page? nodes)] + ;; deleting pages first + (doseq [page pages] + (worker-page/delete! temp-conn (:block/uuid page) tx-meta)) + (when (seq blocks) + (outliner-tx/transact! + (assoc tx-meta + :outliner-op :delete-blocks + :transact-opts {:conn temp-conn}) + (outliner-core/delete-blocks! temp-conn blocks {}))))) - ;; 4. apply remote tx-data - (when (seq tx-data) - (let [rtc-tx-data (sanitize-remote-tx-data @temp-conn tx-data) - tx-report (ldb/transact! temp-conn rtc-tx-data)] - (sync-order/fix-duplicate-orders! temp-conn (:tx-data tx-report))))))))] + ;; 4. apply remote tx-data + (let [rtc-tx-data (sanitize-remote-tx-data @temp-conn tx-data) + tx-report (ldb/transact! temp-conn rtc-tx-data)] + (sync-order/fix-duplicate-orders! temp-conn (:tx-data tx-report))) - (when tx-report - (let [db-after (:db-after tx-report) - asset-uuids (asset-uuids-from-tx db-after (:tx-data tx-report))] - (when (seq asset-uuids) - (enqueue-asset-downloads! repo client asset-uuids)))) + ;; 5. rebase pending local txs for the new remote base + (when (seq local-txs) + (let [pending-tx-data (->> local-txs + (mapcat :tx) + keep-last-parent-update) + db (:db-after @temp-conn) + pending-tx-data (sanitize-remote-tx-data db pending-tx-data)] + (when (seq pending-tx-data) + (let [rebased-report (d/with (:db-after tx-report) pending-tx-data) + normalized (normalize-tx-data + (:db-after rebased-report) + (:db-after tx-report) + (:tx-data rebased-report)) + reversed-datoms (map (fn [[e a v _t added]] + [(if added :db/retract :db/add) e a v]) + (:tx-data rebased-report))] + (reset! rebased-pending {:normalized normalized + :reversed reversed-datoms})))))))))] - ;; TODO: Remove all pending txs, insert the above one - ) - (catch :default e - (log/error :db-sync/apply-remote-tx-failed {:error e}) - (throw e))) + (when (seq local-txs) + (if-let [{:keys [normalized reversed]} @rebased-pending] + (when (seq normalized) + (when (persist-local-tx! repo normalized reversed {:persist-op? false}) + (remove-pending-txs! repo pending-tx-ids))) + (log/info :db-sync/pending-txs-retained {:repo repo :reason :rebased-empty}))) + + (when tx-report + (let [db-after (:db-after tx-report) + asset-uuids (asset-uuids-from-tx db-after (:tx-data tx-report))] + (when (seq asset-uuids) + (enqueue-asset-downloads! repo client asset-uuids))))) (fail-fast :db-sync/missing-db {:repo repo :op :apply-remote-tx}))) (defn- handle-message! [repo client raw] @@ -725,7 +745,7 @@ tx (mapcat (fn [data] (parse-transit (:tx data) {:repo repo :type "pull/ok"})) txs)] - (when tx + (when (seq tx) (rebase-apply-remote-tx! repo client tx) (client-op/update-local-tx repo remote-tx) (flush-pending! repo client)))