mirror of
https://github.com/logseq/logseq.git
synced 2026-05-28 14:39:48 +00:00
fix(sync): stabilize incremental checksum updates
This commit is contained in:
112
deps/db-sync/src/logseq/db_sync/checksum.cljs
vendored
112
deps/db-sync/src/logseq/db_sync/checksum.cljs
vendored
@@ -151,12 +151,26 @@
|
||||
|
||||
(defn- touched-base-eids
|
||||
[db-before db-after tx-data]
|
||||
(->> tx-data
|
||||
(keep :e)
|
||||
(filter (fn [eid]
|
||||
(or (checksum-eligible-entity? db-before eid)
|
||||
(checksum-eligible-entity? db-after eid))))
|
||||
set))
|
||||
(let [before-eligible-cache (volatile! {})
|
||||
after-eligible-cache (volatile! {})
|
||||
cached-eligible? (fn [db cache eid]
|
||||
(if-some [cached (find @cache eid)]
|
||||
(val cached)
|
||||
(let [eligible? (boolean (checksum-eligible-entity? db eid))]
|
||||
(vswap! cache assoc eid eligible?)
|
||||
eligible?)))]
|
||||
(->> tx-data
|
||||
(reduce (fn [eids datom]
|
||||
(if-let [eid (:e datom)]
|
||||
(let [attr (:a datom)]
|
||||
(cond-> eids
|
||||
(= attr :block/uuid) (conj eid)
|
||||
(or (cached-eligible? db-before before-eligible-cache eid)
|
||||
(cached-eligible? db-after after-eligible-cache eid))
|
||||
(conj eid)))
|
||||
eids))
|
||||
#{})
|
||||
set)))
|
||||
|
||||
(defn- touched-checksum-uuids
|
||||
[db-before db-after eids]
|
||||
@@ -167,6 +181,29 @@
|
||||
(remove nil?)
|
||||
set))
|
||||
|
||||
(defn- eids-with-changed-block-uuid
|
||||
[db-before db-after eids]
|
||||
(->> eids
|
||||
(filter (fn [eid]
|
||||
(not= (get-block-uuid db-before eid)
|
||||
(get-block-uuid db-after eid))))
|
||||
set))
|
||||
|
||||
(defn- referrer-eids-by-target
|
||||
[db target-eid]
|
||||
(->> (concat (d/datoms db :avet :block/parent target-eid)
|
||||
(d/datoms db :avet :block/page target-eid))
|
||||
(map :e)
|
||||
set))
|
||||
|
||||
(defn- impacted-referrer-eids
|
||||
[db-before db-after target-eids]
|
||||
(->> target-eids
|
||||
(mapcat (fn [target-eid]
|
||||
(concat (referrer-eids-by-target db-before target-eid)
|
||||
(referrer-eids-by-target db-after target-eid))))
|
||||
set))
|
||||
|
||||
(defn- eids-by-block-uuid
|
||||
[db block-uuid]
|
||||
(->> (d/datoms db :avet :block/uuid block-uuid)
|
||||
@@ -214,7 +251,17 @@
|
||||
(if (empty? base-eids)
|
||||
{:removed {}
|
||||
:added {}}
|
||||
(let [touched-uuids (touched-checksum-uuids db-before db-after base-eids)]
|
||||
(let [uuid-changed-eids (eids-with-changed-block-uuid db-before db-after base-eids)
|
||||
dependent-eids (when (seq uuid-changed-eids)
|
||||
(->> (impacted-referrer-eids db-before db-after uuid-changed-eids)
|
||||
(filter (fn [eid]
|
||||
(or (checksum-eligible-entity? db-before eid)
|
||||
(checksum-eligible-entity? db-after eid))))
|
||||
set))
|
||||
effective-eids (if (seq dependent-eids)
|
||||
(set/union base-eids dependent-eids)
|
||||
base-eids)
|
||||
touched-uuids (touched-checksum-uuids db-before db-after effective-eids)]
|
||||
(if (duplicate-block-uuid? db-before db-after touched-uuids)
|
||||
(let [peer-eids (->> touched-uuids
|
||||
(mapcat (fn [uuid]
|
||||
@@ -224,7 +271,7 @@
|
||||
(or (checksum-eligible-entity? db-before eid)
|
||||
(checksum-eligible-entity? db-after eid))))
|
||||
set)
|
||||
touched-eids (set/union base-eids peer-eids)
|
||||
touched-eids (set/union effective-eids peer-eids)
|
||||
before-counts (tuple-counts-for-eids db-before touched-eids e2ee?)
|
||||
after-counts (tuple-counts-for-eids db-after touched-eids e2ee?)
|
||||
all-tuples (set/union (set (keys before-counts))
|
||||
@@ -248,8 +295,8 @@
|
||||
{:removed {}
|
||||
:added {}}
|
||||
all-tuples))
|
||||
(let [before-tuples (tuple-set-for-eids db-before base-eids e2ee?)
|
||||
after-tuples (tuple-set-for-eids db-after base-eids e2ee?)
|
||||
(let [before-tuples (tuple-set-for-eids db-before effective-eids e2ee?)
|
||||
after-tuples (tuple-set-for-eids db-after effective-eids e2ee?)
|
||||
removed (set/difference before-tuples after-tuples)
|
||||
added (set/difference after-tuples before-tuples)]
|
||||
{:removed (into {} (map (fn [tuple] [tuple 1]) removed))
|
||||
@@ -303,29 +350,28 @@
|
||||
|
||||
(defn update-checksum
|
||||
[checksum {:keys [db-before db-after tx-data]}]
|
||||
(time
|
||||
(let [before-e2ee? (ldb/get-graph-rtc-e2ee? db-before)
|
||||
after-e2ee? (ldb/get-graph-rtc-e2ee? db-after)
|
||||
tx-data (or tx-data [])]
|
||||
(cond
|
||||
(not= before-e2ee? after-e2ee?)
|
||||
(let [before-e2ee? (ldb/get-graph-rtc-e2ee? db-before)
|
||||
after-e2ee? (ldb/get-graph-rtc-e2ee? db-after)
|
||||
tx-data (or tx-data [])]
|
||||
(cond
|
||||
(not= before-e2ee? after-e2ee?)
|
||||
;; E2EE mode changes the global digest semantics, so incremental deltas are invalid.
|
||||
(recompute-checksum db-after)
|
||||
(recompute-checksum db-after)
|
||||
|
||||
(empty? tx-data)
|
||||
checksum
|
||||
(empty? tx-data)
|
||||
checksum
|
||||
|
||||
:else
|
||||
(let [initial-state (if (valid-checksum? checksum)
|
||||
(checksum->state checksum)
|
||||
(checksum->state (recompute-checksum db-before)))
|
||||
{:keys [removed added]} (net-tuple-delta db-before db-after after-e2ee? tx-data)
|
||||
state-after-removals (reduce-kv (fn [checksum-state tuple count]
|
||||
(apply-digest-n checksum-state tuple count subtract-digest))
|
||||
initial-state
|
||||
removed)
|
||||
state-after-additions (reduce-kv (fn [checksum-state tuple count]
|
||||
(apply-digest-n checksum-state tuple count add-digest))
|
||||
state-after-removals
|
||||
added)]
|
||||
(state->checksum state-after-additions))))))
|
||||
:else
|
||||
(let [initial-state (if (valid-checksum? checksum)
|
||||
(checksum->state checksum)
|
||||
(checksum->state (recompute-checksum db-before)))
|
||||
{:keys [removed added]} (net-tuple-delta db-before db-after after-e2ee? tx-data)
|
||||
state-after-removals (reduce-kv (fn [checksum-state tuple count]
|
||||
(apply-digest-n checksum-state tuple count subtract-digest))
|
||||
initial-state
|
||||
removed)
|
||||
state-after-additions (reduce-kv (fn [checksum-state tuple count]
|
||||
(apply-digest-n checksum-state tuple count add-digest))
|
||||
state-after-removals
|
||||
added)]
|
||||
(state->checksum state-after-additions)))))
|
||||
|
||||
@@ -81,26 +81,39 @@
|
||||
db-before (ldb/read-transit-str (:db-before payload))
|
||||
tx-data (vec (ensure-tx-data (:tx-data payload)))
|
||||
tx-report (d/with db-before tx-data)
|
||||
input-checksum (or (:prev-checksum payload)
|
||||
(:current-checksum payload))
|
||||
logged-prev-full (:prev-full-checksum payload)
|
||||
logged-full-after (or (:recomputed-after-checksum payload)
|
||||
(:full-checksum payload))
|
||||
replayed-prev-full (sync-checksum/recompute-checksum db-before)
|
||||
replayed-incremental-from-full-before
|
||||
(sync-checksum/update-checksum replayed-prev-full tx-report)
|
||||
replayed-incremental (sync-checksum/update-checksum (:prev-checksum payload) tx-report)
|
||||
replayed-incremental-from-input
|
||||
(sync-checksum/update-checksum input-checksum tx-report)
|
||||
replayed-recomputed (sync-checksum/recompute-checksum (:db-after tx-report))
|
||||
result {:log-file log-path
|
||||
:prev-tx (:prev-tx payload)
|
||||
:tx-meta (:tx-meta payload)
|
||||
:tx-count (count tx-data)
|
||||
:prev-checksum (:prev-checksum payload)
|
||||
:logged-prev-full-checksum (:prev-full-checksum payload)
|
||||
:input-checksum input-checksum
|
||||
:logged-prev-full-checksum logged-prev-full
|
||||
:replayed-prev-full-checksum replayed-prev-full
|
||||
:prev-checksum-eq-replayed-prev-full?
|
||||
(= (:prev-checksum payload) replayed-prev-full)
|
||||
:input-checksum-eq-replayed-prev-full?
|
||||
(= input-checksum replayed-prev-full)
|
||||
:match-logged-prev-full?
|
||||
(= logged-prev-full replayed-prev-full)
|
||||
:logged-new-checksum (:new-checksum payload)
|
||||
:replayed-incremental replayed-incremental
|
||||
:replayed-incremental replayed-incremental-from-input
|
||||
:replayed-incremental-from-full-before replayed-incremental-from-full-before
|
||||
:logged-recomputed-after (:recomputed-after-checksum payload)
|
||||
:logged-full-after-checksum logged-full-after
|
||||
:replayed-recomputed-after replayed-recomputed
|
||||
:match-logged-new? (= replayed-incremental (:new-checksum payload))
|
||||
:match-logged-recomputed? (= replayed-recomputed (:recomputed-after-checksum payload))
|
||||
:incremental-eq-full? (= replayed-incremental replayed-recomputed)}]
|
||||
:match-logged-new?
|
||||
(= replayed-incremental-from-input (:new-checksum payload))
|
||||
:match-logged-full-after?
|
||||
(= replayed-recomputed logged-full-after)
|
||||
:incremental-eq-full?
|
||||
(= replayed-incremental-from-input replayed-recomputed)
|
||||
:incremental-from-full-before-eq-full?
|
||||
(= replayed-incremental-from-full-before replayed-recomputed)}]
|
||||
(println (pr-str result))))))
|
||||
|
||||
@@ -51,6 +51,14 @@
|
||||
:db-after (ldb/read-transit-str (:db-after payload))
|
||||
:tx-data (ldb/read-transit-str (:tx-data payload))}))
|
||||
|
||||
(defn- load-parent-order-rebase-checksum-fixture
|
||||
[]
|
||||
(let [payload (-> (.readFileSync fs "test/logseq/db_sync/fixtures/parent_order_rebase_checksum_payload.edn" "utf8")
|
||||
reader/read-string)]
|
||||
{:db-before (ldb/read-transit-str (:db-before payload))
|
||||
:db-after (ldb/read-transit-str (:db-after payload))
|
||||
:tx-data (ldb/read-transit-str (:tx-data payload))}))
|
||||
|
||||
(deftest checksum-ignores-unrelated-datoms-test
|
||||
(testing "recompute and incremental checksums ignore unrelated datoms"
|
||||
(let [db-before (sample-db)
|
||||
@@ -75,6 +83,17 @@
|
||||
(is (not= checksum-before full))
|
||||
(is (= full incremental)))))
|
||||
|
||||
(deftest incremental-checksum-matches-recompute-on-parent-order-rebase-log-repro-test
|
||||
(testing "incremental checksum should equal full recompute on parent/order rebase payload"
|
||||
(let [{:keys [db-before db-after tx-data]} (load-parent-order-rebase-checksum-fixture)
|
||||
checksum-before (checksum/recompute-checksum db-before)
|
||||
tx-report {:db-before db-before
|
||||
:db-after db-after
|
||||
:tx-data tx-data}
|
||||
full (checksum/recompute-checksum db-after)
|
||||
incremental (checksum/update-checksum checksum-before tx-report)]
|
||||
(is (= full incremental)))))
|
||||
|
||||
(deftest incremental-checksum-matches-recompute-on-replace-datom-test
|
||||
(testing "incremental checksum matches full recompute when replacing existing values"
|
||||
(let [db-before (sample-db)
|
||||
@@ -146,6 +165,51 @@
|
||||
(is (not= before-checksum full))
|
||||
(is (= full incremental)))))
|
||||
|
||||
(deftest incremental-checksum-matches-recompute-when-parent-uuid-changes-test
|
||||
(testing "incremental checksum tracks children whose normalized parent UUID changes via parent :block/uuid update"
|
||||
(let [db-before (sample-db)
|
||||
before-checksum (checksum/recompute-checksum db-before)
|
||||
tx-report (d/with db-before [[:db/add 3 :block/uuid (random-uuid)]])
|
||||
db-after (:db-after tx-report)
|
||||
full (checksum/recompute-checksum db-after)
|
||||
incremental (checksum/update-checksum before-checksum tx-report)]
|
||||
(is (not= before-checksum full))
|
||||
(is (= full incremental)))))
|
||||
|
||||
(deftest incremental-checksum-matches-recompute-when-page-uuid-changes-test
|
||||
(testing "incremental checksum tracks blocks whose normalized page UUID changes via page :block/uuid update"
|
||||
(let [db-before (sample-db)
|
||||
before-checksum (checksum/recompute-checksum db-before)
|
||||
tx-report (d/with db-before [[:db/add 1 :block/uuid (random-uuid)]])
|
||||
db-after (:db-after tx-report)
|
||||
full (checksum/recompute-checksum db-after)
|
||||
incremental (checksum/update-checksum before-checksum tx-report)]
|
||||
(is (not= before-checksum full))
|
||||
(is (= full incremental)))))
|
||||
|
||||
(deftest incremental-checksum-matches-recompute-when-parent-uuid-changes-on-ineligible-target-test
|
||||
(testing "incremental checksum tracks children when parent UUID changes on a non-checksum-eligible target entity"
|
||||
(let [db-before (-> (d/empty-db db-schema/schema)
|
||||
(d/db-with [{:db/id 1
|
||||
:block/uuid (random-uuid)
|
||||
:block/name "page-a"
|
||||
:block/title "Page A"}
|
||||
{:db/id 2
|
||||
:block/uuid (random-uuid)
|
||||
:block/title "Orphan parent"}
|
||||
{:db/id 3
|
||||
:block/uuid (random-uuid)
|
||||
:block/title "Child"
|
||||
:block/parent 2
|
||||
:block/page 1}]))
|
||||
before-checksum (checksum/recompute-checksum db-before)
|
||||
tx-report (d/with db-before [[:db/add 2 :block/uuid (random-uuid)]])
|
||||
db-after (:db-after tx-report)
|
||||
full (checksum/recompute-checksum db-after)
|
||||
incremental (checksum/update-checksum before-checksum tx-report)]
|
||||
(is (not= before-checksum full))
|
||||
(is (= full incremental)))))
|
||||
|
||||
(deftest incremental-checksum-matches-recompute-when-block-is-readded-test
|
||||
(testing "incremental checksum remains equal to recompute when a block is deleted and re-added with the same UUID"
|
||||
(let [db0 (sample-db)
|
||||
@@ -370,6 +434,27 @@
|
||||
incremental (checksum/update-checksum checksum0 batch-report)]
|
||||
(is (= full-final incremental)))))
|
||||
|
||||
(deftest incremental-checksum-matches-recompute-with-duplicate-block-uuid-tx-churn-test
|
||||
(testing "incremental checksum stays equal to recompute when tx-data retracts+adds the same :block/uuid in one tx"
|
||||
(let [db0 (sample-db)
|
||||
shared-uuid (random-uuid)
|
||||
db1 (:db-after (d/with db0 [{:db/id 5
|
||||
:block/uuid shared-uuid
|
||||
:block/title "Peer"
|
||||
:block/parent 1
|
||||
:block/page 1}]))
|
||||
original-parent-uuid (:block/uuid (d/entity db1 3))
|
||||
tx-data [(d/datom 3 :block/uuid original-parent-uuid 200 false)
|
||||
(d/datom 5 :block/uuid shared-uuid 200 false)
|
||||
(d/datom 3 :block/uuid shared-uuid 200 true)
|
||||
(d/datom 3 :block/order "a9" 200 true)]
|
||||
tx-report (d/with db1 tx-data)
|
||||
db-before (:db-before tx-report)
|
||||
checksum-before (checksum/recompute-checksum db-before)
|
||||
full (checksum/recompute-checksum (:db-after tx-report))
|
||||
incremental (checksum/update-checksum checksum-before tx-report)]
|
||||
(is (= full incremental)))))
|
||||
|
||||
(deftest checksum-ignores-non-page-non-block-entities-test
|
||||
(testing "entities with uuid but without page semantics do not affect checksum"
|
||||
(let [db0 (sample-db)
|
||||
|
||||
1
deps/db-sync/test/logseq/db_sync/fixtures/parent_order_rebase_checksum_payload.edn
vendored
Normal file
1
deps/db-sync/test/logseq/db_sync/fixtures/parent_order_rebase_checksum_payload.edn
vendored
Normal file
File diff suppressed because one or more lines are too long
2
deps/db/src/logseq/db.cljs
vendored
2
deps/db/src/logseq/db.cljs
vendored
@@ -269,7 +269,7 @@
|
||||
_ (reset! *tx-data nil)
|
||||
tx-report {:db-before db-before
|
||||
:db-after @conn
|
||||
:tx-meta tx-meta
|
||||
:tx-meta (assoc tx-meta :batch-final-tx-report? true)
|
||||
:tx-data batch-tx-data}]
|
||||
(dc/run-callbacks conn tx-report)
|
||||
tx-report)
|
||||
|
||||
@@ -70,10 +70,11 @@
|
||||
(d/unlisten! conn ::listen-db-changes!)
|
||||
(d/listen! conn ::listen-db-changes!
|
||||
(fn listen-db-changes!-inner
|
||||
[{:keys [tx-data] :as tx-report}]
|
||||
(when-not (:batch-tx? @conn)
|
||||
(when (seq tx-data)
|
||||
(db-sync/update-local-sync-checksum! repo tx-report)
|
||||
[{:keys [tx-data tx-meta] :as tx-report}]
|
||||
(when (seq tx-data)
|
||||
(when-not (:batch-final-tx-report? tx-meta)
|
||||
(db-sync/update-local-sync-checksum! repo tx-report))
|
||||
(when-not (:batch-tx? @conn)
|
||||
(let [tx-report' (if sync-db-to-main-thread?
|
||||
(sync-db-to-main-thread repo conn tx-report)
|
||||
tx-report)
|
||||
|
||||
@@ -15,7 +15,9 @@
|
||||
[lambdaisland.glogi :as log]
|
||||
[logseq.common.util :as common-util]
|
||||
[logseq.db-sync.checksum :as sync-checksum]
|
||||
[promesa.core :as p]))
|
||||
[promesa.core :as p]
|
||||
;; [logseq.db :as ldb]
|
||||
))
|
||||
|
||||
(def ^:private reconnect-base-delay-ms 1000)
|
||||
(def ^:private reconnect-max-delay-ms 30000)
|
||||
@@ -64,7 +66,8 @@
|
||||
;; :full-checksum full-checksum
|
||||
;; :db-before (ldb/write-transit-str (:db-before tx-report))
|
||||
;; :db-after (ldb/write-transit-str (:db-after tx-report))
|
||||
;; :tx-data (ldb/write-transit-str (:tx-data tx-report))})))
|
||||
;; :tx-data (ldb/write-transit-str (:tx-data tx-report))
|
||||
;; :tx-meta (ldb/write-transit-str (:tx-meta tx-report))})))
|
||||
(client-op/update-local-checksum repo new-checksum))))
|
||||
|
||||
(defn- broadcast-rtc-state!
|
||||
|
||||
@@ -22,5 +22,6 @@ extend-exclude = ["resources/*",
|
||||
"src/resources/*",
|
||||
"scripts/resources/*",
|
||||
"src/test/fixtures/*",
|
||||
"deps/db-sync/test/logseq/db_sync/fixtures/*.edn",
|
||||
"clj-e2e/resources/*",
|
||||
"deps/common/src/logseq/common/plural.cljs"]
|
||||
|
||||
Reference in New Issue
Block a user