mirror of
https://github.com/logseq/logseq.git
synced 2026-04-24 22:25:01 +00:00
fix: compare selected remote tx-data with tx-report tx-data instead
of using checksum as checksum doesn't work for batch tx
This commit is contained in:
20
checksum.md
20
checksum.md
@@ -1,20 +0,0 @@
|
||||
# Tx-Chain Checksum
|
||||
|
||||
This project uses a transaction-chain checksum (sha256) to ensure both client and server process the same sequence of normalized transactions.
|
||||
|
||||
## Key Concepts
|
||||
|
||||
- Each tx only includes `:block/uuid`, `:block/parent`, `:block/page`, and `:block/title` (still ignoring `:block/tx-id` and RTC ignore attrs) before being turned into a deterministic string.
|
||||
- The checksum for a tx is `sha256(prev-checksum + tx-string)`, with a fixed `initial-checksum` seed.
|
||||
- The server persists the latest checksum in `sync_meta` and recomputes it from `tx_log` when needed.
|
||||
- The client stores the checksum alongside its local metadata and validates pull/ok and tx/batch/ok responses before applying txs.
|
||||
|
||||
## Failure Handling
|
||||
|
||||
- When a pull or tx batch includes a checksum, the client compares it to the computed chain; mismatches throw `:db-sync/checksum-mismatch`.
|
||||
- The server includes the checksum in both pull/ok and tx/batch/ok responses so clients can stay in sync while applying retries or pending batches.
|
||||
|
||||
## Testing
|
||||
|
||||
- Coverage lives in `src/test/frontend/worker/db_sync_test.cljs`.
|
||||
- Run `bb dev:test -v frontend.worker.db-sync-test`.
|
||||
67
deps/db-sync/src/logseq/db_sync/checksum.cljs
vendored
67
deps/db-sync/src/logseq/db_sync/checksum.cljs
vendored
@@ -1,67 +0,0 @@
|
||||
(ns logseq.db-sync.checksum
|
||||
(:require [clojure.set :as set]
|
||||
[datascript.core :as d]
|
||||
[goog.crypt :as crypt]
|
||||
[goog.crypt.Sha256]
|
||||
[logseq.db :as ldb]
|
||||
[logseq.db.common.normalize :refer [eid->lookup]]
|
||||
[logseq.db.frontend.property :as db-property]))
|
||||
|
||||
(def ^:private local-ignore-attrs
|
||||
#{:db/id
|
||||
:block/tx-id})
|
||||
|
||||
(def ^:private rtc-ignore-attrs
|
||||
(into #{}
|
||||
(keep (fn [[kw config]]
|
||||
(when (get-in config [:rtc :rtc/ignore-attr-when-syncing]) kw)))
|
||||
db-property/built-in-properties))
|
||||
|
||||
(def ^:private default-ignore-attrs
|
||||
(set/union local-ignore-attrs rtc-ignore-attrs))
|
||||
|
||||
(def ^:private default-checksum-attrs
|
||||
#{:block/uuid
|
||||
:block/parent
|
||||
:block/page
|
||||
:block/title})
|
||||
|
||||
(defn- sha256-hex
|
||||
[strings]
|
||||
(let [hasher (new crypt/Sha256)]
|
||||
(doseq [item strings]
|
||||
(.update hasher (crypt/stringToUtf8ByteArray item))
|
||||
(.update hasher (crypt/stringToUtf8ByteArray "\n")))
|
||||
(crypt/byteArrayToHex (.digest hasher))))
|
||||
|
||||
(def ^:private initial-checksum
|
||||
(sha256-hex ["logseq/db-sync/tx-chain-v1"]))
|
||||
|
||||
(defn next-checksum
|
||||
[prev-checksum tx-data]
|
||||
(let [base (or prev-checksum initial-checksum)
|
||||
tx-str (ldb/write-transit-str tx-data)]
|
||||
(sha256-hex [base tx-str])))
|
||||
|
||||
(defn initial-chain-checksum
|
||||
[]
|
||||
initial-checksum)
|
||||
|
||||
(defn filter-tx-data
|
||||
[{:keys [db-after db-before tx-data]}]
|
||||
(->> tx-data
|
||||
(keep
|
||||
(fn [[e a v _t added]]
|
||||
(when (contains? default-checksum-attrs a)
|
||||
(let [op (if added :db/add :db/retract)
|
||||
e' (or (eid->lookup db-before e)
|
||||
(eid->lookup db-after e))
|
||||
v' (if (and (integer? v)
|
||||
(pos? v)
|
||||
(or (= :db.type/ref (:db/valueType (d/entity db-after a)))
|
||||
(= :db.type/ref (:db/valueType (d/entity db-before a)))))
|
||||
(or (eid->lookup db-before v) (eid->lookup db-after v))
|
||||
v)]
|
||||
[op e' a v']))))
|
||||
sort
|
||||
vec))
|
||||
55
deps/db-sync/src/logseq/db_sync/compare.cljs
vendored
Normal file
55
deps/db-sync/src/logseq/db_sync/compare.cljs
vendored
Normal file
@@ -0,0 +1,55 @@
|
||||
(ns logseq.db-sync.compare
|
||||
(:require [datascript.core :as d]
|
||||
[datascript.impl.entity :as de]
|
||||
[goog.crypt.Sha256]
|
||||
[logseq.db.common.normalize :refer [eid->lookup]]))
|
||||
|
||||
(def ^:private compare-attrs
|
||||
#{:block/uuid
|
||||
:block/parent
|
||||
:block/page
|
||||
:block/title})
|
||||
|
||||
(defn filter-applied-tx-data
|
||||
[{:keys [db-after db-before tx-data]}]
|
||||
(->> tx-data
|
||||
(keep
|
||||
(fn [[e a v _t added]]
|
||||
(when (contains? compare-attrs a)
|
||||
(let [op (if added :db/add :db/retract)
|
||||
e' (or (eid->lookup db-before e)
|
||||
(eid->lookup db-after e))
|
||||
v' (if (and (integer? v)
|
||||
(pos? v)
|
||||
(or (= :db.type/ref (:db/valueType (d/entity db-after a)))
|
||||
(= :db.type/ref (:db/valueType (d/entity db-before a)))))
|
||||
(or (eid->lookup db-before v) (eid->lookup db-after v))
|
||||
v)]
|
||||
[op e' a v']))))
|
||||
distinct
|
||||
set))
|
||||
|
||||
(defn filter-received-tx-data
|
||||
[{:keys [tempids db-before db-after]} tx-data]
|
||||
(->> tx-data
|
||||
(mapcat
|
||||
(fn [[op e a v]]
|
||||
(if (= op :db.fn/retractEntity)
|
||||
(let [entity (d/entity db-before e)]
|
||||
(map
|
||||
(fn [a]
|
||||
(let [v (get entity a)
|
||||
v' (if (de/entity? v) [:block/uuid (:block/uuid v)] v)]
|
||||
[:db/retract [:block/uuid (:block/uuid entity)] a v']))
|
||||
compare-attrs))
|
||||
(when (contains? compare-attrs a)
|
||||
(let [e' (if (neg-int? e)
|
||||
(when-let [id (:block/uuid (d/entity db-after (get tempids e)))]
|
||||
[:block/uuid id])
|
||||
e)
|
||||
v' (if (neg-int? v)
|
||||
(when-let [id (:block/uuid (d/entity db-after (get tempids v)))]
|
||||
[:block/uuid id])
|
||||
v)]
|
||||
[[op e' a v']])))))
|
||||
set))
|
||||
@@ -40,14 +40,12 @@
|
||||
[:map
|
||||
[:type [:= "pull/ok"]]
|
||||
[:t :int]
|
||||
[:txs [:sequential tx-log-entry-schema]]
|
||||
[:checksum {:optional true} :string]])
|
||||
[:txs [:sequential tx-log-entry-schema]]])
|
||||
|
||||
(def tx-batch-ok-schema
|
||||
[:map
|
||||
[:type [:= "tx/batch/ok"]]
|
||||
[:t :int]
|
||||
[:checksum {:optional true} :string]])
|
||||
[:t :int]])
|
||||
|
||||
(def ws-server-message-schema
|
||||
[:multi {:dispatch :type}
|
||||
|
||||
43
deps/db-sync/src/logseq/db_sync/storage.cljs
vendored
43
deps/db-sync/src/logseq/db_sync/storage.cljs
vendored
@@ -3,12 +3,13 @@
|
||||
[clojure.string :as string]
|
||||
[datascript.core :as d]
|
||||
[datascript.storage :refer [IStorage]]
|
||||
[logseq.db-sync.checksum :as checksum]
|
||||
[logseq.db-sync.common :as common]
|
||||
[logseq.db.common.normalize :as db-normalize]
|
||||
[logseq.db.common.sqlite :as common-sqlite]
|
||||
[logseq.db.frontend.schema :as db-schema]))
|
||||
|
||||
;; TODO: GC kvs table
|
||||
|
||||
(defn init-schema! [sql]
|
||||
(common/sql-exec sql "create table if not exists kvs (addr INTEGER primary key, content TEXT, addresses JSON)")
|
||||
(common/sql-exec sql
|
||||
@@ -37,35 +38,6 @@
|
||||
(name k)
|
||||
(str v)))
|
||||
|
||||
(defn get-checksum [sql]
|
||||
(get-meta sql :checksum))
|
||||
|
||||
(defn set-checksum! [sql checksum]
|
||||
(set-meta! sql :checksum checksum))
|
||||
|
||||
(defn- fetch-all-txs [sql]
|
||||
(let [rows (common/get-sql-rows
|
||||
(common/sql-exec sql
|
||||
"select t, tx from tx_log order by t asc"))]
|
||||
(mapv (fn [row]
|
||||
{:t (aget row "t")
|
||||
:tx (aget row "tx")})
|
||||
rows)))
|
||||
|
||||
(defn get-or-init-checksum!
|
||||
[sql]
|
||||
(if-let [existing (get-checksum sql)]
|
||||
existing
|
||||
(let [txs (fetch-all-txs sql)
|
||||
tx-data (mapcat (fn [entry]
|
||||
(common/read-transit (:tx entry)))
|
||||
txs)
|
||||
checksum (if (seq tx-data)
|
||||
(checksum/next-checksum nil tx-data)
|
||||
(checksum/initial-chain-checksum))]
|
||||
(set-checksum! sql checksum)
|
||||
checksum)))
|
||||
|
||||
(defn get-t [sql]
|
||||
(let [value (get-meta sql :t)]
|
||||
(if (string? value)
|
||||
@@ -142,19 +114,14 @@
|
||||
(restore-data-from-addr sql addr))))
|
||||
|
||||
(defn- append-tx-for-tx-report
|
||||
[sql {:keys [db-after db-before tx-data] :as tx-report}]
|
||||
[sql {:keys [db-after db-before tx-data]}]
|
||||
(let [new-t (next-t! sql)
|
||||
created-at (common/now-ms)
|
||||
normalized-data (->> tx-data
|
||||
db-normalize/replace-attr-retract-with-retract-entity
|
||||
(db-normalize/normalize-tx-data db-after db-before))
|
||||
tx-str (common/write-transit normalized-data)
|
||||
prev-checksum (get-or-init-checksum! sql)
|
||||
next-checksum (checksum/next-checksum
|
||||
prev-checksum
|
||||
(checksum/filter-tx-data tx-report))]
|
||||
(append-tx! sql new-t tx-str created-at)
|
||||
(set-checksum! sql next-checksum)))
|
||||
tx-str (common/write-transit normalized-data)]
|
||||
(append-tx! sql new-t tx-str created-at)))
|
||||
|
||||
(defn- listen-db-updates!
|
||||
[sql conn]
|
||||
|
||||
10
deps/db-sync/src/logseq/db_sync/worker.cljs
vendored
10
deps/db-sync/src/logseq/db_sync/worker.cljs
vendored
@@ -6,7 +6,6 @@
|
||||
[lambdaisland.glogi.console :as glogi-console]
|
||||
[logseq.common.authorization :as authorization]
|
||||
[logseq.db :as ldb]
|
||||
[logseq.db-sync.checksum :as checksum]
|
||||
[logseq.db-sync.common :as common :refer [cors-headers]]
|
||||
[logseq.db-sync.malli-schema :as db-sync-schema]
|
||||
[logseq.db-sync.protocol :as protocol]
|
||||
@@ -294,11 +293,10 @@
|
||||
(defn- pull-response [^js self since]
|
||||
(let [sql (.-sql self)
|
||||
txs (storage/fetch-tx-since sql since)
|
||||
checksum (storage/get-or-init-checksum! sql)
|
||||
response {:type "pull/ok"
|
||||
:t (t-now self)
|
||||
:txs txs}]
|
||||
(assoc response :checksum checksum)))
|
||||
response))
|
||||
|
||||
;; FIXME: memory limit, should re-download graph using sqlite table rows
|
||||
;; (defn- snapshot-response [^js self]
|
||||
@@ -317,8 +315,7 @@
|
||||
(common/sql-exec sql "delete from tx_log")
|
||||
(common/sql-exec sql "delete from sync_meta")
|
||||
(storage/init-schema! sql)
|
||||
(storage/set-t! sql 0)
|
||||
(storage/set-checksum! sql (checksum/initial-chain-checksum)))
|
||||
(storage/set-t! sql 0))
|
||||
(when (seq rows)
|
||||
(doseq [[addr content addresses] rows]
|
||||
(common/sql-exec sql
|
||||
@@ -361,8 +358,7 @@
|
||||
(if (and (map? new-t) (= "tx/reject" (:type new-t)))
|
||||
new-t
|
||||
{:type "tx/batch/ok"
|
||||
:t new-t
|
||||
:checksum (storage/get-or-init-checksum! (.-sql self))}))
|
||||
:t new-t}))
|
||||
{:type "tx/reject"
|
||||
:reason "empty tx data"}))))
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
[logseq.common.path :as path]
|
||||
[logseq.common.util :as common-util]
|
||||
[logseq.db :as ldb]
|
||||
[logseq.db-sync.checksum :as sync-checksum]
|
||||
[logseq.db-sync.compare :as sync-compare]
|
||||
[logseq.db-sync.cycle :as sync-cycle]
|
||||
[logseq.db-sync.malli-schema :as db-sync-schema]
|
||||
[logseq.db-sync.order :as sync-order]
|
||||
@@ -389,16 +389,6 @@
|
||||
(fn [prev]
|
||||
(p/then prev (fn [_] (task)))))))
|
||||
|
||||
(defn- asset-type-from-files
|
||||
[repo asset-uuid]
|
||||
(p/let [paths (worker-state/<invoke-main-thread :thread-api/get-all-asset-file-paths repo)]
|
||||
(some (fn [path]
|
||||
(let [stem (path/file-stem path)
|
||||
ext (path/file-ext path)]
|
||||
(when (and (seq stem) (seq ext) (= stem (str asset-uuid)))
|
||||
ext)))
|
||||
paths)))
|
||||
|
||||
(defn- upload-remote-asset!
|
||||
[repo graph-id asset-uuid asset-type checksum]
|
||||
(let [base (http-base-url)]
|
||||
@@ -609,11 +599,10 @@
|
||||
cycle-tx-report]))))
|
||||
|
||||
(defn- apply-remote-tx!
|
||||
[repo client tx-data* & {:keys [expected-checksum]}]
|
||||
[repo client tx-data* & {:keys [local-tx remote-tx]}]
|
||||
(if-let [conn (worker-state/get-datascript-conn repo)]
|
||||
(let [tx-data (keep-last-update @conn tx-data*)
|
||||
local-txs (pending-txs repo)
|
||||
*computed-checksum (atom nil)
|
||||
reversed-tx-data (->> local-txs
|
||||
(mapcat :reversed-tx)
|
||||
reverse
|
||||
@@ -634,17 +623,17 @@
|
||||
;; 2. transact remote tx-data
|
||||
remote-tx-report (ldb/transact! temp-conn tx-data tx-meta)
|
||||
_ (reset! *remote-tx-report remote-tx-report)
|
||||
computed-checksum (when expected-checksum
|
||||
(sync-checksum/next-checksum
|
||||
(client-op/get-local-checksum repo)
|
||||
(sync-checksum/filter-tx-data remote-tx-report)))]
|
||||
|
||||
(reset! *computed-checksum computed-checksum)
|
||||
;; (when (and expected-checksum (not= expected-checksum computed-checksum))
|
||||
;; (fail-fast :db-sync/checksum-mismatch
|
||||
;; {:repo repo
|
||||
;; :expected-checksum expected-checksum
|
||||
;; :actual-checksum computed-checksum}))
|
||||
remote-received-tx-data (sync-compare/filter-received-tx-data remote-tx-report tx-data)
|
||||
remote-applied-tx-data (sync-compare/filter-applied-tx-data remote-tx-report)]
|
||||
(when (not= remote-received-tx-data remote-applied-tx-data)
|
||||
(fail-fast :db-sync/compare-tx-data-mismatch
|
||||
{:repo repo
|
||||
:tx-data tx-data
|
||||
:remote-received-tx-data remote-received-tx-data
|
||||
:remote-applied-tx-data remote-applied-tx-data
|
||||
:local-tx local-tx
|
||||
:remote-tx remote-tx
|
||||
:tempids (:tempids remote-tx-report)}))
|
||||
|
||||
(when has-local-changes?
|
||||
;; 3. Remove nodes which parents have been deleted locally
|
||||
@@ -679,8 +668,6 @@
|
||||
(persist-local-tx! repo normalized-tx-data reversed-datoms {:op :rtc-rebase})))
|
||||
|
||||
(when tx-report
|
||||
(when-let [computed-checksum @*computed-checksum]
|
||||
(client-op/update-local-checksum repo computed-checksum))
|
||||
(let [asset-uuids (asset-uuids-from-tx (:db-after remote-tx-report) (:tx-data remote-tx-report))]
|
||||
(when (seq asset-uuids)
|
||||
(enqueue-asset-downloads! repo client asset-uuids))))
|
||||
@@ -707,10 +694,6 @@
|
||||
;; Upload response
|
||||
"tx/batch/ok" (do
|
||||
(require-non-negative remote-tx {:repo repo :type "tx/batch/ok"})
|
||||
;; TODO: should be able to calculate the batch tx's checksum with
|
||||
;; `(d/with current-db reversed-tx)`
|
||||
(when-let [checksum (:checksum message)]
|
||||
(client-op/update-local-checksum repo checksum))
|
||||
(client-op/update-local-tx repo remote-tx)
|
||||
(remove-pending-txs! repo @(:inflight client))
|
||||
(reset! (:inflight client) [])
|
||||
@@ -721,14 +704,14 @@
|
||||
(let [txs (:txs message)
|
||||
_ (require-non-negative remote-tx {:repo repo :type "pull/ok"})
|
||||
_ (require-seq txs {:repo repo :type "pull/ok" :field :txs})
|
||||
expected-checksum (:checksum message)
|
||||
txs-data (mapv (fn [data]
|
||||
(parse-transit (:tx data) {:repo repo :type "pull/ok"}))
|
||||
txs)
|
||||
tx (mapcat identity txs-data)]
|
||||
(when (seq tx)
|
||||
(apply-remote-tx! repo client tx
|
||||
:expected-checksum expected-checksum)
|
||||
:local-tx local-tx
|
||||
:remote-tx remote-tx)
|
||||
(client-op/update-local-tx repo remote-tx)
|
||||
(flush-pending! repo client))))
|
||||
"changed" (do
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
[frontend.worker.db-sync :as db-sync]
|
||||
[frontend.worker.rtc.client-op :as client-op]
|
||||
[frontend.worker.state :as worker-state]
|
||||
[logseq.db-sync.checksum :as db-sync-checksum]
|
||||
[logseq.db-sync.compare :as sync-compare]
|
||||
[logseq.db.test.helper :as db-test]
|
||||
[logseq.outliner.core :as outliner-core]))
|
||||
|
||||
@@ -235,8 +235,8 @@
|
||||
:logseq.property.embedding/hnsw-label-updated-at 1]]
|
||||
tx-2 [[:db/add [:block/uuid block-uuid]
|
||||
:logseq.property.embedding/hnsw-label-updated-at 2]]
|
||||
checksum-1 (db-sync-checksum/next-checksum nil tx-1)
|
||||
checksum-2 (db-sync-checksum/next-checksum nil tx-2)]
|
||||
checksum-1 (sync-compare/next-checksum nil tx-1)
|
||||
checksum-2 (sync-compare/next-checksum nil tx-2)]
|
||||
(is (= checksum-1 checksum-2)))))
|
||||
|
||||
(comment
|
||||
@@ -254,15 +254,15 @@
|
||||
(let [block-uuid (random-uuid)
|
||||
tx-1 [[:db/add [:block/uuid block-uuid] :block/title "block"]]
|
||||
tx-2 [[:db/add [:block/uuid block-uuid] :block/title "block updated"]]
|
||||
checksum-1 (db-sync-checksum/next-checksum nil tx-1)
|
||||
checksum-2 (db-sync-checksum/next-checksum nil tx-2)]
|
||||
checksum-1 (sync-compare/next-checksum nil tx-1)
|
||||
checksum-2 (sync-compare/next-checksum nil tx-2)]
|
||||
(is (not= checksum-1 checksum-2)))))
|
||||
|
||||
(deftest apply-remote-tx-checksum-validation-test
|
||||
(testing "apply-remote-tx honors checksum"
|
||||
(let [{:keys [conn client-ops-conn child1]} (setup-parent-child)
|
||||
tx-1 [[:db/add (:db/id child1) :block/title "child 1 remote"]]
|
||||
expected (db-sync-checksum/next-checksum nil tx-1)]
|
||||
expected (sync-compare/next-checksum nil tx-1)]
|
||||
(with-datascript-conns conn client-ops-conn
|
||||
(fn []
|
||||
(#'db-sync/apply-remote-tx!
|
||||
|
||||
Reference in New Issue
Block a user