fix(db-sync): handle duplicate :block/uuid in incremental checksum

Account for duplicate :block/uuid datom multiplicity during incremental checksum delta calculation.
Expand touched eids to uuid peers and apply digest add/remove by tuple counts.
Add regression fixture+test from rebased retract mismatch log.
This commit is contained in:
Tienson Qin
2026-04-12 17:32:02 +08:00
parent 30bb4acc27
commit 3dd7302dc9
4 changed files with 163 additions and 41 deletions

View File

@@ -149,7 +149,7 @@
(when (checksum-eligible-entity? db e)
(entity-checksum-tuples db e e2ee?))))))
(defn- touched-checksum-eids
(defn- touched-base-eids
[db-before db-after tx-data]
(->> tx-data
(keep :e)
@@ -158,22 +158,111 @@
(checksum-eligible-entity? db-after eid))))
set))
(defn- touched-checksum-uuids
  "Returns the set of non-nil values that `get-block-uuid` yields for each of
  `eids`, looked up in both `db-before` and `db-after`."
  [db-before db-after eids]
  (into #{}
        (comp
         ;; Each eid contributes its uuid on both sides of the transaction.
         (mapcat (juxt #(get-block-uuid db-before %)
                       #(get-block-uuid db-after %)))
         (remove nil?))
        eids))
(defn- eids-by-block-uuid
  "Returns the set of entity ids of every :block/uuid datom in `db` whose value
  is `block-uuid`, read from the :avet index."
  [db block-uuid]
  (let [uuid-datoms (d/datoms db :avet :block/uuid block-uuid)]
    (into #{} (map :e) uuid-datoms)))
(defn- block-uuid-datom-count
  "Counts the :block/uuid datoms that `db` holds for `eid` (via the :eavt index)."
  [db eid]
  (->> (d/datoms db :eavt eid :block/uuid)
       count))
(defn- duplicate-block-uuid?
  "Returns true when at least one uuid in `block-uuids` has more than one
  :block/uuid datom in `db-before` or in `db-after`; otherwise nil."
  [db-before db-after block-uuids]
  (letfn [(duplicated-in? [db uuid]
            (< 1 (count (d/datoms db :avet :block/uuid uuid))))]
    (some (fn [uuid]
            (or (duplicated-in? db-before uuid)
                (duplicated-in? db-after uuid)))
          block-uuids)))
(defn- tuple-set-for-eids
  "Returns the union, as a set, of `entity-checksum-tuples` for every eid in
  `eids` that is checksum-eligible in `db`. Ineligible eids contribute nothing."
  [db eids e2ee?]
  (into #{}
        (comp
         (filter #(checksum-eligible-entity? db %))
         ;; A nil tuple collection concatenates as empty.
         (mapcat #(entity-checksum-tuples db % e2ee?)))
        eids))
(defn- tuple-counts-for-eids
  "Builds a map of tuple -> count over `eids` in `db`.

  Every checksum tuple of an eligible eid is incremented by the number of
  :block/uuid datoms that eid has. Eids with no :block/uuid datom, or that are
  not checksum-eligible, are skipped."
  [db eids e2ee?]
  (reduce
   (fn [counts eid]
     (let [weight (block-uuid-datom-count db eid)]
       (if-not (and (pos? weight)
                    (checksum-eligible-entity? db eid))
         counts
         ;; Reducing over a nil tuple collection leaves `counts` untouched.
         (reduce #(update %1 %2 (fnil + 0) weight)
                 counts
                 (entity-checksum-tuples db eid e2ee?)))))
   {}
   eids))
;; Computes which checksum tuples were removed and which were added between
;; db-before and db-after for the entities referenced by tx-data.
;; Two bodies appear below: a set-based one and a count-based one.
(defn- net-tuple-delta
[db-before db-after e2ee? tx-data]
;; Set-based form: for each touched eid, diff its before/after tuple sets and
;; accumulate the differences into :removed / :added sets.
(let [touched-eids (touched-checksum-eids db-before db-after tx-data)]
(reduce
(fn [{:keys [removed added]} eid]
(let [before-tuples (if (checksum-eligible-entity? db-before eid)
(or (entity-checksum-tuples db-before eid e2ee?) #{})
#{})
after-tuples (if (checksum-eligible-entity? db-after eid)
(or (entity-checksum-tuples db-after eid e2ee?) #{})
#{})]
{:removed (into removed (set/difference before-tuples after-tuples))
:added (into added (set/difference after-tuples before-tuples))}))
{:removed #{}
:added #{}}
touched-eids)))
;; Count-based form: :removed and :added are maps of tuple -> multiplicity.
(let [base-eids (touched-base-eids db-before db-after tx-data)]
;; Nothing touched: both deltas are empty maps.
(if (empty? base-eids)
{:removed {}
:added {}}
(let [touched-uuids (touched-checksum-uuids db-before db-after base-eids)]
;; If any touched uuid has more than one :block/uuid datom on either side,
;; widen the eid set to every eid sharing those uuids and diff tuple counts.
(if (duplicate-block-uuid? db-before db-after touched-uuids)
(let [peer-eids (->> touched-uuids
(mapcat (fn [uuid]
(concat (eids-by-block-uuid db-before uuid)
(eids-by-block-uuid db-after uuid))))
(filter (fn [eid]
(or (checksum-eligible-entity? db-before eid)
(checksum-eligible-entity? db-after eid))))
set)
touched-eids (set/union base-eids peer-eids)
before-counts (tuple-counts-for-eids db-before touched-eids e2ee?)
after-counts (tuple-counts-for-eids db-after touched-eids e2ee?)
all-tuples (set/union (set (keys before-counts))
(set (keys after-counts)))]
;; For each tuple, record only the net change in its count; equal counts
;; contribute to neither map.
(reduce
(fn [{:keys [removed added]} tuple]
(let [before-count (get before-counts tuple 0)
after-count (get after-counts tuple 0)]
(cond
(> before-count after-count)
{:removed (assoc removed tuple (- before-count after-count))
:added added}
(> after-count before-count)
{:removed removed
:added (assoc added tuple (- after-count before-count))}
:else
{:removed removed
:added added})))
{:removed {}
:added {}}
all-tuples))
;; No duplicated uuid: plain set difference over the base eids, with every
;; changed tuple given multiplicity 1.
(let [before-tuples (tuple-set-for-eids db-before base-eids e2ee?)
after-tuples (tuple-set-for-eids db-after base-eids e2ee?)
removed (set/difference before-tuples after-tuples)
added (set/difference after-tuples before-tuples)]
{:removed (into {} (map (fn [tuple] [tuple 1]) removed))
:added (into {} (map (fn [tuple] [tuple 1]) added))}))))))
(defn- apply-digest-n
  "Applies `op` to the checksum state `count` times with the digest of `tuple`.

  The digest is computed once via `tuple-digest`. `op` is called as
  `(op state digest)` and its result is threaded into the next application.
  A non-positive `count` returns `checksum-state` unchanged."
  [checksum-state tuple count op]
  (let [digest (tuple-digest tuple)]
    (reduce (fn [state _]
              (op state digest))
            checksum-state
            (range count))))
(defn recompute-checksum
[db]
@@ -214,28 +303,29 @@
;; Incrementally updates `checksum` from a tx report map carrying
;; :db-before, :db-after and :tx-data, returning the new checksum.
;; Two variants of the body appear below; the later one wraps the work in
;; `time` and applies tuple deltas with multiplicities.
(defn update-checksum
[checksum {:keys [db-before db-after tx-data]}]
(let [before-e2ee? (ldb/get-graph-rtc-e2ee? db-before)
after-e2ee? (ldb/get-graph-rtc-e2ee? db-after)
tx-data (or tx-data [])]
(cond
(not= before-e2ee? after-e2ee?)
;; NOTE(review): `time` prints the elapsed time of the wrapped form on every
;; call; this looks like leftover profiling. Confirm whether it should stay.
(time
(let [before-e2ee? (ldb/get-graph-rtc-e2ee? db-before)
after-e2ee? (ldb/get-graph-rtc-e2ee? db-after)
;; A nil tx-data is treated as an empty transaction.
tx-data (or tx-data [])]
(cond
(not= before-e2ee? after-e2ee?)
;; E2EE mode changes the global digest semantics, so incremental deltas are invalid.
(recompute-checksum db-after)
(recompute-checksum db-after)
;; No datoms in the transaction: the checksum is returned unchanged.
(empty? tx-data)
checksum
(empty? tx-data)
checksum
;; Otherwise start from the given checksum (or a full recompute of db-before
;; when it is not valid), subtract removed tuple digests, then add the new ones.
:else
(let [initial-state (if (valid-checksum? checksum)
(checksum->state checksum)
(checksum->state (recompute-checksum db-before)))
{:keys [removed added]} (net-tuple-delta db-before db-after after-e2ee? tx-data)
state-after-removals (reduce (fn [checksum-state tuple]
(subtract-digest checksum-state (tuple-digest tuple)))
initial-state
removed)
state-after-additions (reduce (fn [checksum-state tuple]
(add-digest checksum-state (tuple-digest tuple)))
state-after-removals
added)]
(state->checksum state-after-additions)))))
:else
(let [initial-state (if (valid-checksum? checksum)
(checksum->state checksum)
(checksum->state (recompute-checksum db-before)))
{:keys [removed added]} (net-tuple-delta db-before db-after after-e2ee? tx-data)
;; `removed` and `added` are maps of tuple -> count here; each digest is
;; applied `count` times through apply-digest-n.
state-after-removals (reduce-kv (fn [checksum-state tuple count]
(apply-digest-n checksum-state tuple count subtract-digest))
initial-state
removed)
state-after-additions (reduce-kv (fn [checksum-state tuple count]
(apply-digest-n checksum-state tuple count add-digest))
state-after-removals
added)]
(state->checksum state-after-additions))))))

View File

@@ -1,6 +1,9 @@
(ns logseq.db-sync.checksum-test
(:require [cljs.test :refer [deftest is testing]]
(:require ["fs" :as fs]
[cljs.reader :as reader]
[cljs.test :refer [deftest is testing]]
[datascript.core :as d]
[logseq.db :as ldb]
[logseq.db-sync.checksum :as checksum]
[logseq.db.frontend.schema :as db-schema]))
@@ -40,6 +43,14 @@
{:db (:db-after tx-report)
:checksum incremental}))
(defn- load-rebased-retract-checksum-fixture
  "Reads the rebased-retract EDN fixture from disk and returns a map with
  :db-before, :db-after and :tx-data, each decoded from its transit string."
  []
  (let [fixture-path "test/logseq/db_sync/fixtures/rebased_retract_checksum_payload.edn"
        payload (reader/read-string (.readFileSync fs fixture-path "utf8"))
        decode (fn [k] (ldb/read-transit-str (get payload k)))]
    {:db-before (decode :db-before)
     :db-after (decode :db-after)
     :tx-data (decode :tx-data)}))
(deftest checksum-ignores-unrelated-datoms-test
(testing "recompute and incremental checksums ignore unrelated datoms"
(let [db-before (sample-db)
@@ -52,6 +63,18 @@
(is (= checksum-before
(checksum/update-checksum checksum-before tx-report))))))
;; Regression: replaying the captured rebased retract-entity payload must give
;; the same checksum incrementally as a full recompute of the resulting db.
(deftest incremental-checksum-matches-recompute-on-rebased-retract-entity-log-repro-test
  (testing "incremental checksum should equal full recompute on rebased retract-entity replay payload"
    (let [fixture (load-rebased-retract-checksum-fixture)
          tx-report (select-keys fixture [:db-before :db-after :tx-data])
          checksum-before (checksum/recompute-checksum (:db-before fixture))
          incremental (checksum/update-checksum checksum-before tx-report)
          full (checksum/recompute-checksum (:db-after fixture))]
      ;; The payload must actually change the checksum, otherwise the
      ;; equality below would pass trivially.
      (is (not= checksum-before full))
      (is (= full incremental)))))
(deftest incremental-checksum-matches-recompute-on-replace-datom-test
(testing "incremental checksum matches full recompute when replacing existing values"
(let [db-before (sample-db)

File diff suppressed because one or more lines are too long

View File

@@ -54,9 +54,17 @@
[repo tx-report]
(when (worker-state/get-client-ops-conn repo)
(let [current-checksum (client-op/get-local-checksum repo)
new-checksum (sync-checksum/update-checksum current-checksum tx-report)
;; new-checksum (sync-checksum/recompute-checksum (:db-after tx-report))
]
new-checksum (sync-checksum/update-checksum current-checksum tx-report)]
;; (let [full-checksum (sync-checksum/recompute-checksum (:db-after tx-report))]
;; (when (not= new-checksum full-checksum)
;; (prn :debug
;; "checksum-doesn't match"
;; {:current-checksum current-checksum
;; :new-checksum new-checksum
;; :full-checksum full-checksum
;; :db-before (ldb/write-transit-str (:db-before tx-report))
;; :db-after (ldb/write-transit-str (:db-after tx-report))
;; :tx-data (ldb/write-transit-str (:tx-data tx-report))})))
(client-op/update-local-checksum repo new-checksum))))
(defn- broadcast-rtc-state!