another try of client fix

This commit is contained in:
Tienson Qin
2026-01-13 18:08:36 +08:00
parent 8c8ca9ce45
commit 2afc7fe48d
6 changed files with 228 additions and 31 deletions

View File

@@ -2,15 +2,21 @@
"Simple db-sync client based on promesa + WebSocket."
(:require [clojure.string :as string]
[datascript.core :as d]
[frontend.worker.handler.page :as worker-page]
[frontend.worker.rtc.client-op :as client-op]
[frontend.worker.state :as worker-state]
[lambdaisland.glogi :as log]
[logseq.common.path :as path]
[logseq.common.util :as common-util]
[logseq.db :as ldb]
[logseq.db-sync.cycle :as db-sync-cycle]
[logseq.db-sync.malli-schema :as db-sync-schema]
[logseq.db-sync.order :as sync-order]
[logseq.db.common.normalize :as db-normalize]
[logseq.db.sqlite.util :as sqlite-util]
[logseq.outliner.core :as outliner-core]
[logseq.outliner.pipeline :as outliner-pipeline]
[logseq.outliner.transaction :as outliner-tx]
[promesa.core :as p]))
(defonce *repo->latest-remote-tx (atom {}))
@@ -278,6 +284,116 @@
[(second item) (nth item 2)]
item)))))
(defn- normalize-entity-ref
[ref]
(cond
(uuid? ref) [:block/uuid ref]
(keyword? ref) [:db/ident ref]
:else ref))
(defn- entity-ref->entid
[db ref]
(cond
(nil? ref) nil
(number? ref) (when (pos? ref) ref)
(uuid? ref) (d/entid db [:block/uuid ref])
(vector? ref) (d/entid db ref)
(keyword? ref) (d/entid db [:db/ident ref])
:else nil))
(defn- entity-ref-matches?
[db target ref]
(let [target-entid (entity-ref->entid db target)
ref-entid (entity-ref->entid db ref)]
(cond
(and target-entid ref-entid) (= target-entid ref-entid)
:else (= (normalize-entity-ref target) (normalize-entity-ref ref)))))
(defn- valid-entity-ref?
[db ref]
(cond
(nil? ref) false
(number? ref) (or (neg? ref) (some? (d/entity db ref)))
:else (some? (entity-ref->entid db ref))))
(defn- ref-attr?
[db attr]
(when attr
(= :db.type/ref (:db/valueType (d/entity db attr)))))
(defn- drop-invalid-refs
[db tx-data]
(->> tx-data
(keep (fn [tx]
(cond
(and (vector? tx) (= :db.fn/retractEntity (first tx)))
(when (valid-entity-ref? db (second tx))
tx)
(and (vector? tx) (= 4 (count tx)))
(let [[_op e a v] tx]
(when (valid-entity-ref? db e)
(if (and (ref-attr? db a) (some? v))
(when (valid-entity-ref? db v)
tx)
tx)))
:else
tx)))))
(defn- remove-attr-updates
[db tx-data attr entity-ref]
(remove (fn [tx]
(and (vector? tx)
(= attr (nth tx 2 nil))
(entity-ref-matches? db entity-ref (nth tx 1 nil))))
tx-data))
(defn- tx-entity-ref
[db entity-ref]
(or (entity-ref->entid db entity-ref)
(normalize-entity-ref entity-ref)))
(defn- replace-parent-with-page-root
[db tx-data entity-ref]
(let [entity-entid (entity-ref->entid db entity-ref)
page-uuid (when entity-entid
(some-> (d/entity db entity-entid) :block/page :block/uuid))
page-ref (when page-uuid [:block/uuid page-uuid])]
(cond-> (remove-attr-updates db tx-data :block/parent entity-ref)
page-ref
(conj [:db/add (tx-entity-ref db entity-ref) :block/parent page-ref]))))
(defn- fix-cycle-updates
[db tx-data]
(loop [tx-data tx-data
attempt 0]
(if (>= attempt 4)
tx-data
(if-let [{:keys [attr entity]} (db-sync-cycle/detect-cycle db tx-data)]
(let [tx-data' (case attr
:block/parent (replace-parent-with-page-root db tx-data entity)
:logseq.property.class/extends (remove-attr-updates db tx-data attr entity)
(remove-attr-updates db tx-data attr entity))]
(recur tx-data' (inc attempt)))
tx-data))))
(defn- sanitize-remote-tx-data
[db tx-data]
(let [tx-data (vec tx-data)
original-count (count tx-data)
tx-data (->> tx-data
db-normalize/replace-attr-retract-with-retract-entity-v2
keep-last-parent-update
(drop-invalid-refs db)
(fix-cycle-updates db))
dropped (- original-count (count tx-data))]
(when (pos? dropped)
(log/info :db-sync/remote-tx-sanitized
{:dropped dropped
:original original-count}))
tx-data))
(defn- flush-pending!
[repo client]
(let [inflight @(:inflight client)
@@ -534,29 +650,49 @@
(mapcat :reversed-tx)
reverse
db-normalize/replace-attr-retract-with-retract-entity-v2)
*tx-report (atom nil)
_ (ldb/transact-with-temp-conn!
conn
{:rtc-tx? true}
(fn [conn]
(prn :debug :reversed-tx-data reversed-tx-data)
(when (seq reversed-tx-data)
(ldb/transact! conn reversed-tx-data))
(let [;; 2. apply remote tx
tx-data' (db-normalize/replace-attr-retract-with-retract-entity tx-data)
_ (prn :debug :apply-remote-tx-data tx-data')
_ (prn :debug :original-tx-data tx-data')
tx-report (ldb/transact! conn tx-data')]
(reset! *tx-report tx-report)
;; TODO: 3. compute and compare checksum
;; Notice: no need to restore local changes, we'll send pending-txs to server
;; and let the server to fix the tx-data if there's invalidation
)))
tx-report @*tx-report
db-after (:db-after tx-report)
asset-uuids (asset-uuids-from-tx db-after (:tx-data tx-report))]
(when (seq asset-uuids)
(enqueue-asset-downloads! repo client asset-uuids)))
tx-report (ldb/transact-with-temp-conn!
conn
{:rtc-tx? true}
(fn [temp-conn _*batch-tx-data]
(let [db @temp-conn
;; 1. rebase
rebase (:db-after (d/with db reversed-tx-data))
tx-report (d/with rebase tx-data)]
;; TODO: 2. ensure checksum matches between client & server
;; 3. fix data
(let [deleted-blocks (outliner-pipeline/filter-deleted-blocks (:tx-data tx-report))
tx-meta {:persist-op? false
:gen-undo-ops? false}]
(when (seq deleted-blocks)
(let [nodes (map #(d/entity @temp-conn (:db/id %)) deleted-blocks)
pages (filter ldb/page? nodes)
blocks (remove ldb/page? nodes)]
;; deleting pages first
(doseq [page pages]
(worker-page/delete! temp-conn (:block/uuid page) tx-meta))
(when (seq blocks)
(outliner-tx/transact!
(assoc tx-meta
:outliner-op :delete-blocks
:transact-opts {:conn temp-conn})
(outliner-core/delete-blocks! temp-conn blocks {})))))
;; 4. apply remote tx-data
(when (seq tx-data)
(let [rtc-tx-data (sanitize-remote-tx-data @temp-conn tx-data)
tx-report (ldb/transact! temp-conn rtc-tx-data)]
(prn :debug :tx-data rtc-tx-data)
(sync-order/fix-duplicate-orders! temp-conn (:tx-data tx-report))))))))]
(when tx-report
(let [db-after (:db-after tx-report)
asset-uuids (asset-uuids-from-tx db-after (:tx-data tx-report))]
(when (seq asset-uuids)
(enqueue-asset-downloads! repo client asset-uuids))))
;; TODO: Remove all pending txs, insert the above one
)
(catch :default e
(log/error :db-sync/apply-remote-tx-failed {:error e})
(throw e)))

View File

@@ -61,3 +61,21 @@
(is (some? page'))
(is (= (:db/id page') (:db/id (:block/parent parent'))))
(is (= (:db/id parent') (:db/id (:block/parent child'))))))))))
(deftest drop-missing-parent-update-test
(testing "drop invalid parent updates during remote rebase"
(let [{:keys [conn child]} (setup-parent-child)
child-uuid (:block/uuid child)
original-parent-uuid (:block/uuid (:block/parent (d/entity @conn (:db/id child))))
missing-parent-uuid (random-uuid)]
(prn :debug :missing-parent-uuid missing-parent-uuid)
(with-datascript-conn conn
(fn []
(#'db-sync/rebase-apply-remote-tx!
test-repo
nil
[[:db/add [:block/uuid child-uuid]
:block/parent [:block/uuid missing-parent-uuid]]])
(let [child' (d/entity @conn (:db/id child))]
(is (= original-parent-uuid
(:block/uuid (:block/parent child'))))))))))