mirror of
https://github.com/logseq/logseq.git
synced 2026-05-23 12:14:06 +00:00
enhance(db-sync): harden server tx batch handling and protocol
This commit is contained in:
@@ -4,6 +4,7 @@
|
||||
|
||||
(def tx-entry-schema
|
||||
[:map
|
||||
[:tx-id {:optional true} :uuid]
|
||||
[:tx :string]
|
||||
[:outliner-op {:optional true} [:maybe :keyword]]])
|
||||
|
||||
@@ -37,13 +38,20 @@
|
||||
[:type [:= "ping"]]]]])
|
||||
|
||||
(def tx-reject-reason-schema
|
||||
[:enum "stale" "empty tx data" "invalid tx" "invalid t-before" "db transact failed"])
|
||||
[:enum "stale"
|
||||
"empty tx data"
|
||||
"invalid tx"
|
||||
"invalid t-before"
|
||||
"db transact failed"
|
||||
"snapshot upload in progress"])
|
||||
|
||||
(def tx-reject-schema
|
||||
[:map
|
||||
[:type [:= "tx/reject"]]
|
||||
[:reason tx-reject-reason-schema]
|
||||
[:t {:optional true} :int]
|
||||
[:success-tx-ids {:optional true} [:sequential :uuid]]
|
||||
[:failed-tx-id {:optional true} :uuid]
|
||||
[:data {:optional true} :string]])
|
||||
|
||||
(def user-presence-schema
|
||||
|
||||
19
deps/db-sync/src/logseq/db_sync/node/graph.cljs
vendored
19
deps/db-sync/src/logseq/db_sync/node/graph.cljs
vendored
@@ -11,6 +11,8 @@
|
||||
(doto (js-obj)
|
||||
(aset "DB" index-db)
|
||||
(aset "LOGSEQ_SYNC_ASSETS" assets-bucket)
|
||||
;; Keep node-adapter snapshot stream uncompressed.
|
||||
(aset "DB_SYNC_SNAPSHOT_STREAM_GZIP" "false")
|
||||
(aset "COGNITO_ISSUER" (:cognito-issuer cfg))
|
||||
(aset "COGNITO_CLIENT_ID" (:cognito-client-id cfg))
|
||||
(aset "COGNITO_JWKS_URL" (:cognito-jwks-url cfg))))
|
||||
@@ -33,8 +35,19 @@
|
||||
(swap! registry assoc graph-id ctx)
|
||||
ctx)))
|
||||
|
||||
(defn- close-graph-context!
|
||||
[^js ctx]
|
||||
(when-let [^js sql (.-sql ctx)]
|
||||
(when-let [close (.-close sql)]
|
||||
(close))))
|
||||
|
||||
(defn delete-graph!
|
||||
[registry deps graph-id]
|
||||
(when-let [^js ctx (get @registry graph-id)]
|
||||
(close-graph-context! ctx)
|
||||
(swap! registry dissoc graph-id))
|
||||
(storage/delete-graph-db! (get-in deps [:config :data-dir]) graph-id))
|
||||
|
||||
(defn close-graphs! [registry]
|
||||
(doseq [[_ ^js ctx] @registry]
|
||||
(when-let [^js sql (.-sql ctx)]
|
||||
(when-let [close (.-close sql)]
|
||||
(close)))))
|
||||
(close-graph-context! ctx)))
|
||||
|
||||
@@ -26,6 +26,9 @@
|
||||
(doto (js-obj)
|
||||
(aset "DB" index-db)
|
||||
(aset "LOGSEQ_SYNC_ASSETS" assets-bucket)
|
||||
;; Node adapter serves snapshot transit stream without gzip to avoid
|
||||
;; browser/adapter content-encoding mismatches during graph download.
|
||||
(aset "DB_SYNC_SNAPSHOT_STREAM_GZIP" "false")
|
||||
(aset "COGNITO_ISSUER" (:cognito-issuer cfg))
|
||||
(aset "COGNITO_CLIENT_ID" (:cognito-client-id cfg))
|
||||
(aset "COGNITO_JWKS_URL" (:cognito-jwks-url cfg))))
|
||||
@@ -92,11 +95,14 @@
|
||||
(let [cfg (config/normalize-config overrides)
|
||||
index-db (storage/open-index-db (:data-dir cfg))
|
||||
assets-bucket (assets/make-bucket (node-path/join (:data-dir cfg) "assets"))
|
||||
env (make-env cfg index-db assets-bucket)
|
||||
registry (atom {})
|
||||
deps {:config cfg
|
||||
:index-db index-db
|
||||
:assets-bucket assets-bucket}
|
||||
env (doto (make-env cfg index-db assets-bucket)
|
||||
(aset "DB_SYNC_DELETE_GRAPH"
|
||||
(fn [graph-id]
|
||||
(graph/delete-graph! registry deps graph-id))))
|
||||
server (.createServer http
|
||||
(fn [req res]
|
||||
(-> (p/let [request (platform-node/request-from-node req {:scheme "http"})
|
||||
|
||||
@@ -48,3 +48,11 @@
|
||||
(ensure-dir! (node-path/dirname db-path))
|
||||
{:graph-name graph-name
|
||||
:sql (wrap-db (new sqlite db-path nil))}))
|
||||
|
||||
(defn delete-graph-db!
|
||||
[data-dir graph-id]
|
||||
(let [[_graph-name db-path] (common-sqlite/get-db-full-path (node-path/join data-dir "graphs") graph-id)
|
||||
graph-dir (node-path/dirname db-path)]
|
||||
(when (.existsSync fs graph-dir)
|
||||
(.rmSync fs graph-dir #js {:recursive true
|
||||
:force true}))))
|
||||
|
||||
35
deps/db-sync/src/logseq/db_sync/storage.cljs
vendored
35
deps/db-sync/src/logseq/db_sync/storage.cljs
vendored
@@ -148,24 +148,27 @@
|
||||
[sql {:keys [db-after db-before tx-data tx-meta] :as tx-report}]
|
||||
(let [prev-checksum (get-checksum sql)
|
||||
checksum (sync-checksum/update-checksum prev-checksum tx-report)]
|
||||
;; (let [full-checksum (sync-checksum/recompute-checksum db-after)]
|
||||
;; (let [full-checksum (sync-checksum/recompute-checksum db-after)
|
||||
;; prev-full-checksum (sync-checksum/recompute-checksum db-before)]
|
||||
;; (when (and prev-checksum
|
||||
;; (seq tx-data)
|
||||
;; (not= checksum full-checksum))
|
||||
;; (prn :debug :before-checksum-error {:prev-checksum prev-checksum
|
||||
;; :new-checksum checksum
|
||||
;; :recomputed-after-checksum full-checksum
|
||||
;; :tx-meta tx-meta
|
||||
;; :tx-data tx-data
|
||||
;; :db-before (ldb/write-transit-str db-before)
|
||||
;; :db-after (ldb/write-transit-str db-after)})
|
||||
;; (throw (ex-info "server checksum doesn't match"
|
||||
;; {:prev-checksum prev-checksum
|
||||
;; :recomputed-after-checksum full-checksum
|
||||
;; :tx-meta tx-meta
|
||||
;; :tx-data tx-data
|
||||
;; :db-before (ldb/write-transit-str db-before)
|
||||
;; :db-after (ldb/write-transit-str db-after)}))))
|
||||
;; (prn :debug :before-checksum-error {:prev-tx (get-t sql)
|
||||
;; :prev-checksum prev-checksum
|
||||
;; :prev-full-checksum prev-full-checksum
|
||||
;; :new-checksum checksum
|
||||
;; :recomputed-after-checksum full-checksum
|
||||
;; :tx-meta tx-meta
|
||||
;; :db-before (ldb/write-transit-str db-before)
|
||||
;; :tx-data (ldb/write-transit-str tx-data)})
|
||||
;; (when (not= prev-checksum prev-full-checksum)
|
||||
;; (prn :debug :prev-checksum-not-match {:prev-checksum prev-checksum
|
||||
;; :prev-full-checksum prev-full-checksum}))
|
||||
;; (throw (ex-info "server checksum doesn't match"
|
||||
;; {:prev-checksum prev-checksum
|
||||
;; :recomputed-after-checksum full-checksum
|
||||
;; :tx-meta tx-meta
|
||||
;; :tx-data tx-data
|
||||
;; :prev-tx (get-t sql)}))))
|
||||
|
||||
(set-checksum! sql checksum)
|
||||
(when-not (empty? tx-data)
|
||||
|
||||
76
deps/db-sync/src/logseq/db_sync/tx_sanitize.cljs
vendored
Normal file
76
deps/db-sync/src/logseq/db_sync/tx_sanitize.cljs
vendored
Normal file
@@ -0,0 +1,76 @@
|
||||
(ns logseq.db-sync.tx-sanitize
|
||||
(:require [clojure.set :as set]
|
||||
[datascript.core :as d]
|
||||
[logseq.db :as ldb]))
|
||||
|
||||
(def ^:private retract-entity-ops
|
||||
#{:db/retractEntity :db.fn/retractEntity})
|
||||
|
||||
(defn- retract-entity-op?
|
||||
[item]
|
||||
(and (vector? item)
|
||||
(= 2 (count item))
|
||||
(contains? retract-entity-ops (first item))))
|
||||
|
||||
(defn- entity-ref->eid
|
||||
[db entity-ref]
|
||||
(cond
|
||||
(and (number? entity-ref) (neg? entity-ref))
|
||||
nil
|
||||
|
||||
:else
|
||||
(try
|
||||
(some-> (d/entity db entity-ref) :db/id)
|
||||
(catch :default _
|
||||
nil))))
|
||||
|
||||
(def ^:private entity-op-kinds
|
||||
#{:db/add :db/retract :db/cas :db.fn/cas})
|
||||
|
||||
(defn- touched-entity-eid
|
||||
[db item]
|
||||
(cond
|
||||
(and (map? item) (contains? item :db/id))
|
||||
(entity-ref->eid db (:db/id item))
|
||||
|
||||
(and (map? item) (contains? item :block/uuid))
|
||||
(entity-ref->eid db [:block/uuid (:block/uuid item)])
|
||||
|
||||
(and (vector? item)
|
||||
(contains? entity-op-kinds (first item))
|
||||
(<= 4 (count item)))
|
||||
(entity-ref->eid db (second item))
|
||||
|
||||
:else
|
||||
nil))
|
||||
|
||||
(defn sanitize-tx
|
||||
([db tx-data]
|
||||
(sanitize-tx db tx-data nil))
|
||||
([db tx-data {:keys [drop-missing-retract-ops?]
|
||||
:or {drop-missing-retract-ops? false}}]
|
||||
(let [tx-data* (cond->> tx-data
|
||||
drop-missing-retract-ops?
|
||||
(remove (fn [item]
|
||||
(and (retract-entity-op? item)
|
||||
(nil? (entity-ref->eid db (second item)))))))
|
||||
tx-data* (vec tx-data*)
|
||||
retract-eids (->> tx-data*
|
||||
(keep (fn [item]
|
||||
(when (retract-entity-op? item)
|
||||
(entity-ref->eid db (second item)))))
|
||||
set)
|
||||
touched-eids (->> tx-data*
|
||||
(remove retract-entity-op?)
|
||||
(keep (partial touched-entity-eid db))
|
||||
set)
|
||||
descendant-retract-eids (->> retract-eids
|
||||
(mapcat (fn [eid]
|
||||
(let [entity (d/entity db eid)]
|
||||
(when (:block/uuid entity)
|
||||
(ldb/get-block-full-children-ids db eid)))))
|
||||
set)
|
||||
missing-retract-eids (sort (set/difference descendant-retract-eids retract-eids touched-eids))]
|
||||
(cond-> tx-data*
|
||||
(seq missing-retract-eids)
|
||||
(into (map (fn [eid] [:db/retractEntity eid]) missing-retract-eids))))))
|
||||
@@ -23,6 +23,10 @@
|
||||
|
||||
(defn- <delete-graph-do! [^js env ^js url graph-id]
|
||||
(let [^js namespace (.-LOGSEQ_SYNC_DO env)
|
||||
_ (when-not namespace
|
||||
(throw (ex-info "missing LOGSEQ_SYNC_DO binding"
|
||||
{:graph-id graph-id
|
||||
:binding "LOGSEQ_SYNC_DO"})))
|
||||
do-id (.idFromName namespace graph-id)
|
||||
stub (.get namespace do-id)
|
||||
reset-url (str (.-origin url) "/admin/reset")]
|
||||
@@ -33,10 +37,17 @@
|
||||
:status (.-status resp)})))
|
||||
resp)))
|
||||
|
||||
(defn- <delete-graph-storage!
|
||||
[^js env ^js url graph-id]
|
||||
(let [delete-graph-fn (aget env "DB_SYNC_DELETE_GRAPH")]
|
||||
(if (fn? delete-graph-fn)
|
||||
(delete-graph-fn graph-id)
|
||||
(<delete-graph-do! env url graph-id))))
|
||||
|
||||
(defn- <delete-graph! [db ^js env ^js url graph-id]
|
||||
(p/do!
|
||||
(index/<graph-delete-metadata! db graph-id)
|
||||
(<delete-graph-do! env url graph-id)
|
||||
(<delete-graph-storage! env url graph-id)
|
||||
(index/<graph-delete-index-entry! db graph-id)))
|
||||
|
||||
(defn ^:large-vars/cleanup-todo handle [{:keys [db ^js env request url claims route]}]
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
(ns logseq.db-sync.worker.handler.sync
|
||||
(:require [clojure.set :as set]
|
||||
[clojure.string :as string]
|
||||
[datascript.core :as d]
|
||||
(:require [clojure.string :as string]
|
||||
[lambdaisland.glogi :as log]
|
||||
[logseq.db :as ldb]
|
||||
[logseq.db-sync.batch :as batch]
|
||||
@@ -11,6 +9,7 @@
|
||||
[logseq.db-sync.protocol :as protocol]
|
||||
[logseq.db-sync.snapshot :as snapshot]
|
||||
[logseq.db-sync.storage :as storage]
|
||||
[logseq.db-sync.tx-sanitize :as tx-sanitize]
|
||||
[logseq.db-sync.worker.http :as http]
|
||||
[logseq.db-sync.worker.routes.sync :as sync-routes]
|
||||
[logseq.db-sync.worker.ws :as ws]
|
||||
@@ -141,11 +140,18 @@
|
||||
stream))
|
||||
|
||||
(defn- maybe-compress-stream [stream]
|
||||
(when-not (exists? js/CompressionStream)
|
||||
(throw (ex-info "gzip compression not supported"
|
||||
{:type :db-sync/compression-not-supported})))
|
||||
(.pipeThrough stream (js/CompressionStream. snapshot-content-encoding)))
|
||||
|
||||
(defn- snapshot-stream-gzip-enabled?
|
||||
[^js self]
|
||||
(let [v (some-> self .-env (aget "DB_SYNC_SNAPSHOT_STREAM_GZIP"))]
|
||||
(cond
|
||||
(nil? v) true
|
||||
(false? v) false
|
||||
(string? v) (not (contains? #{"false" "0" "off" "no"}
|
||||
(string/lower-case v)))
|
||||
:else (boolean v))))
|
||||
|
||||
;; (defn- <buffer-stream
|
||||
;; [stream]
|
||||
;; (p/let [resp (js/Response. stream)
|
||||
@@ -294,70 +300,58 @@
|
||||
(reset-import! sql))
|
||||
(import-snapshot-rows! sql "kvs" rows)))
|
||||
|
||||
(defn- sanitize-tx
|
||||
[db outliner-op tx-data]
|
||||
(let [retract-op? (fn [item]
|
||||
(and (vector? item)
|
||||
(= 2 (count item))
|
||||
(contains? #{:db/retractEntity :db.fn/retractEntity} (first item))))
|
||||
->eid (fn [entity-ref]
|
||||
(some-> (d/entity db entity-ref) :db/id))
|
||||
retract-eids (->> tx-data
|
||||
(keep (fn [item]
|
||||
(when (retract-op? item)
|
||||
(->eid (second item)))))
|
||||
set)
|
||||
descendant-retract-eids (->> retract-eids
|
||||
(mapcat (fn [eid]
|
||||
(let [entity (d/entity db eid)]
|
||||
(when (:block/uuid entity)
|
||||
(ldb/get-block-full-children-ids db eid)))))
|
||||
(remove nil?)
|
||||
set)
|
||||
missing-retract-eids (sort (set/difference descendant-retract-eids retract-eids))
|
||||
tx-data' (cond-> (vec tx-data)
|
||||
(seq missing-retract-eids)
|
||||
(into (map (fn [eid] [:db/retractEntity eid]) missing-retract-eids)))]
|
||||
(if (= outliner-op :fix)
|
||||
(remove (fn [[_ id]]
|
||||
(nil? (d/entity db id))) tx-data')
|
||||
tx-data')))
|
||||
|
||||
(defn- apply-tx-entry!
|
||||
[conn {:keys [tx outliner-op]}]
|
||||
(let [tx-data (->> (protocol/transit->tx tx)
|
||||
(sanitize-tx @conn outliner-op))]
|
||||
(when (seq tx-data)
|
||||
(ldb/transact! conn tx-data (cond-> {:op :apply-client-tx}
|
||||
outliner-op (assoc :outliner-op outliner-op))))))
|
||||
(let [tx-data (tx-sanitize/sanitize-tx @conn
|
||||
(protocol/transit->tx tx)
|
||||
{:drop-missing-retract-ops? (= outliner-op :fix)})]
|
||||
(if (seq tx-data)
|
||||
(try
|
||||
(ldb/transact! conn tx-data (cond-> {:op :apply-client-tx}
|
||||
outliner-op (assoc :outliner-op outliner-op)))
|
||||
true
|
||||
(catch :default e
|
||||
;; Rebase txs are inferred from local history and can become stale when
|
||||
;; concurrent remote edits remove referenced entities before upload.
|
||||
;; Treat stale :entity-id/missing rebases as no-op so sync can continue.
|
||||
(if (and (= outliner-op :rebase)
|
||||
(= :entity-id/missing (:error (ex-data e))))
|
||||
(do
|
||||
(log/warn :db-sync/drop-stale-rebase-tx
|
||||
{:outliner-op outliner-op
|
||||
:tx-data tx-data
|
||||
:error (str e)})
|
||||
false)
|
||||
(throw e))))
|
||||
false)))
|
||||
|
||||
(defn- db-transact-failed-response
|
||||
[sql tx-entry message]
|
||||
{:type "tx/reject"
|
||||
:reason (str "db transact failed: " message)
|
||||
:t (storage/get-t sql)
|
||||
:data (common/write-transit tx-entry)})
|
||||
|
||||
(defn- apply-tx! [^js self sender tx-entries]
|
||||
(defn- apply-tx! [^js self tx-entries]
|
||||
(let [sql (.-sql self)]
|
||||
(ensure-conn! self)
|
||||
(let [conn (.-conn self)]
|
||||
(loop [remaining tx-entries]
|
||||
(loop [remaining tx-entries
|
||||
applied? false
|
||||
successful-tx-ids []]
|
||||
(if-let [tx-entry (first remaining)]
|
||||
(let [result (try
|
||||
(apply-tx-entry! conn tx-entry)
|
||||
::ok
|
||||
(catch :default e
|
||||
(throw e)
|
||||
(log/error :db-sync/transact-failed e)
|
||||
(db-transact-failed-response sql tx-entry (.-message e))))]
|
||||
(if (= ::ok result)
|
||||
(recur (next remaining))
|
||||
result))
|
||||
(let [tx-id (:tx-id tx-entry)
|
||||
applied-entry? (try
|
||||
(boolean (apply-tx-entry! conn tx-entry))
|
||||
(catch :default e
|
||||
(log/error :db-sync/transact-failed e)
|
||||
(throw (ex-info "tx entry apply failed"
|
||||
(cond-> {:type :db-sync/tx-entry-failed
|
||||
:successful-tx-ids successful-tx-ids}
|
||||
tx-id (assoc :failed-tx-id tx-id))
|
||||
e))))
|
||||
next-successful-tx-ids (cond-> successful-tx-ids
|
||||
tx-id (conj tx-id))]
|
||||
(recur (next remaining)
|
||||
(or applied? applied-entry?)
|
||||
next-successful-tx-ids))
|
||||
(let [new-t (storage/get-t sql)]
|
||||
;; FIXME: no need to broadcast if client tx is less than remote tx
|
||||
(ws/broadcast! self sender {:type "changed" :t new-t})
|
||||
new-t))))))
|
||||
{:t new-t
|
||||
:applied? applied?
|
||||
:successful-tx-ids successful-tx-ids}))))))
|
||||
|
||||
(defn handle-tx-batch! [^js self sender txs t-before]
|
||||
(let [current-t (t-now self)]
|
||||
@@ -379,18 +373,28 @@
|
||||
:else
|
||||
(if (seq txs)
|
||||
(try
|
||||
(let [new-t (apply-tx! self sender txs)]
|
||||
(if (and (map? new-t) (= "tx/reject" (:type new-t)))
|
||||
new-t
|
||||
(let [checksum (current-checksum self)]
|
||||
(cond-> {:type "tx/batch/ok"
|
||||
:t new-t}
|
||||
(string? checksum) (assoc :checksum checksum)))))
|
||||
(let [{:keys [t applied?]} (apply-tx! self txs)
|
||||
checksum (current-checksum self)]
|
||||
(when applied?
|
||||
;; Broadcast once per processed batch after tx-log/checksum settle.
|
||||
(ws/broadcast! self sender {:type "changed" :t t}))
|
||||
(cond-> {:type "tx/batch/ok"
|
||||
:t t}
|
||||
(string? checksum) (assoc :checksum checksum)))
|
||||
(catch :default e
|
||||
(log/error :db-sync/transact-failed e)
|
||||
{:type "tx/reject"
|
||||
:reason "db transact failed"
|
||||
:t (t-now self)}))
|
||||
(let [new-t (t-now self)
|
||||
{:keys [successful-tx-ids failed-tx-id]}
|
||||
(ex-data e)]
|
||||
(log/error :db-sync/transact-failed e)
|
||||
(when (> new-t current-t)
|
||||
;; Broadcast once when partial batch writes advanced the graph.
|
||||
(ws/broadcast! self sender {:type "changed" :t new-t}))
|
||||
(cond-> {:type "tx/reject"
|
||||
:reason "db transact failed"
|
||||
:error-detail (str e)
|
||||
:t new-t}
|
||||
(seq successful-tx-ids) (assoc :success-tx-ids successful-tx-ids)
|
||||
failed-tx-id (assoc :failed-tx-id failed-tx-id)))))
|
||||
{:type "tx/reject"
|
||||
:reason "empty tx data"}))))
|
||||
|
||||
@@ -437,14 +441,19 @@
|
||||
(let [graph-id (graph-id-from-request request)]
|
||||
(if (not (seq graph-id))
|
||||
(http/bad-request "missing graph id")
|
||||
(let [stream (-> (snapshot-export-stream self)
|
||||
(maybe-compress-stream))
|
||||
row-count (snapshot-row-count (.-sql self))]
|
||||
(let [gzip? (and (snapshot-stream-gzip-enabled? self)
|
||||
(exists? js/CompressionStream))
|
||||
stream (cond-> (snapshot-export-stream self)
|
||||
gzip?
|
||||
(maybe-compress-stream))
|
||||
row-count (snapshot-row-count (.-sql self))
|
||||
headers (cond-> {"content-type" snapshot-content-type}
|
||||
gzip?
|
||||
(assoc "content-encoding" snapshot-content-encoding))]
|
||||
(js/Response. stream
|
||||
#js {:status 200
|
||||
:headers (js/Object.assign
|
||||
#js {"content-type" snapshot-content-type
|
||||
"content-encoding" snapshot-content-encoding}
|
||||
(clj->js headers)
|
||||
#js {"x-snapshot-row-count" (str row-count)}
|
||||
(common/cors-headers))})))))
|
||||
|
||||
@@ -460,11 +469,16 @@
|
||||
(if-not ready-for-sync?
|
||||
(http/error-response "graph not ready" 409)
|
||||
(let [key (str "stream/" graph-id ".snapshot")
|
||||
url (snapshot-stream-url request graph-id)]
|
||||
(http/json-response :sync/snapshot-download {:ok true
|
||||
:key key
|
||||
:url url
|
||||
:content-encoding snapshot-content-encoding})))))))
|
||||
url (snapshot-stream-url request graph-id)
|
||||
content-encoding (when (and (snapshot-stream-gzip-enabled? self)
|
||||
(exists? js/CompressionStream))
|
||||
snapshot-content-encoding)]
|
||||
(http/json-response :sync/snapshot-download
|
||||
(cond-> {:ok true
|
||||
:key key
|
||||
:url url}
|
||||
content-encoding
|
||||
(assoc :content-encoding content-encoding)))))))))
|
||||
|
||||
(defn- handle-sync-admin-reset
|
||||
[^js self]
|
||||
|
||||
@@ -230,6 +230,66 @@
|
||||
(is (= full-final checksum-b))
|
||||
(is (= one-shot-checksum checksum-b)))))
|
||||
|
||||
(deftest incremental-checksum-is-invariant-across-commuting-batch-order-test
|
||||
(testing "incremental checksum converges when commuting tx batches are applied in different order"
|
||||
(let [db0 (sample-db)
|
||||
checksum0 (checksum/recompute-checksum db0)
|
||||
tx-a [[:db/add 3 :block/title "Parent v2"]
|
||||
[:db/add 4 :block/order "a9"]]
|
||||
tx-b [[:db/add 1 :block/name "page-a-v2"]
|
||||
[:db/add 2 :block/title "Page B v2"]]
|
||||
report-a (d/with db0 tx-a)
|
||||
checksum-a (checksum/update-checksum checksum0 report-a)
|
||||
db-a (:db-after report-a)
|
||||
report-b-after-a (d/with db-a tx-b)
|
||||
checksum-ab (checksum/update-checksum checksum-a report-b-after-a)
|
||||
db-ab (:db-after report-b-after-a)
|
||||
full-ab (checksum/recompute-checksum db-ab)
|
||||
|
||||
report-b (d/with db0 tx-b)
|
||||
checksum-b (checksum/update-checksum checksum0 report-b)
|
||||
db-b (:db-after report-b)
|
||||
report-a-after-b (d/with db-b tx-a)
|
||||
checksum-ba (checksum/update-checksum checksum-b report-a-after-b)
|
||||
db-ba (:db-after report-a-after-b)
|
||||
full-ba (checksum/recompute-checksum db-ba)]
|
||||
(is (= full-ab full-ba))
|
||||
(is (= full-ab checksum-ab))
|
||||
(is (= full-ba checksum-ba))
|
||||
(is (= checksum-ab checksum-ba)))))
|
||||
|
||||
(deftest incremental-checksum-is-invariant-across-intra-batch-datom-order-test
|
||||
(testing "incremental checksum converges when same tx-data uses different datom order"
|
||||
(let [db0 (sample-db)
|
||||
checksum0 (checksum/recompute-checksum db0)
|
||||
uuid-a (random-uuid)
|
||||
uuid-b (random-uuid)
|
||||
tx-order-a [{:db/id -1
|
||||
:block/uuid uuid-a
|
||||
:block/title "Inserted A"
|
||||
:block/order "a5"
|
||||
:block/parent 3
|
||||
:block/page 1}
|
||||
{:db/id -2
|
||||
:block/uuid uuid-b
|
||||
:block/title "Inserted B"
|
||||
:block/order "a6"
|
||||
:block/parent 3
|
||||
:block/page 1}]
|
||||
tx-order-b (vec (reverse tx-order-a))
|
||||
report-a (d/with db0 tx-order-a)
|
||||
db-a (:db-after report-a)
|
||||
full-a (checksum/recompute-checksum db-a)
|
||||
incremental-a (checksum/update-checksum checksum0 report-a)
|
||||
report-b (d/with db0 tx-order-b)
|
||||
db-b (:db-after report-b)
|
||||
full-b (checksum/recompute-checksum db-b)
|
||||
incremental-b (checksum/update-checksum checksum0 report-b)]
|
||||
(is (= full-a full-b))
|
||||
(is (= full-a incremental-a))
|
||||
(is (= full-b incremental-b))
|
||||
(is (= incremental-a incremental-b)))))
|
||||
|
||||
(deftest incremental-checksum-handles-rebase-like-toggle-churn-test
|
||||
(testing "incremental checksum uses net tuple delta when batch contains add/retract/add churn"
|
||||
(let [db0 (sample-db)
|
||||
|
||||
223
deps/db-sync/test/logseq/db_sync/normalize_test.cljs
vendored
223
deps/db-sync/test/logseq/db_sync/normalize_test.cljs
vendored
@@ -1,6 +1,7 @@
|
||||
(ns logseq.db-sync.normalize-test
|
||||
(:require [cljs.test :refer [deftest is testing]]
|
||||
[datascript.core :as d]
|
||||
[logseq.db :as ldb]
|
||||
[logseq.db.common.normalize :as db-normalize]
|
||||
[logseq.db.test.helper :as db-test]))
|
||||
|
||||
@@ -19,6 +20,87 @@
|
||||
[datom]
|
||||
(subvec (vec datom) 0 4))
|
||||
|
||||
(defn- tx-touches-uuid?
|
||||
[tx-data block-uuid]
|
||||
(let [lookup [:block/uuid block-uuid]
|
||||
block-uuid-str (str block-uuid)]
|
||||
(boolean
|
||||
(some (fn [item]
|
||||
(let [item (vec item)]
|
||||
(case (count item)
|
||||
5 (or (= lookup (nth item 1))
|
||||
(= block-uuid-str (nth item 1))
|
||||
(= block-uuid (nth item 3)))
|
||||
2 (or (= lookup (second item))
|
||||
(= block-uuid-str (second item)))
|
||||
false)))
|
||||
tx-data))))
|
||||
|
||||
(defn- seeded-rng
|
||||
[seed0]
|
||||
(let [state (atom (bit-or (long seed0) 0))]
|
||||
(fn []
|
||||
(let [s (swap! state
|
||||
(fn [x]
|
||||
(let [x (bit-xor x (bit-shift-left x 13))
|
||||
x (bit-xor x (bit-shift-right x 17))
|
||||
x (bit-xor x (bit-shift-left x 5))]
|
||||
(bit-or x 0))))]
|
||||
(/ (double (unsigned-bit-shift-right s 0)) 4294967296.0)))))
|
||||
|
||||
(defn- rand-int*
|
||||
[rng n]
|
||||
(js/Math.floor (* (rng) n)))
|
||||
|
||||
(defn- pick-rand
|
||||
[rng coll]
|
||||
(when (seq coll)
|
||||
(nth coll (rand-int* rng (count coll)))))
|
||||
|
||||
(defn- normal-block-uuids
|
||||
[db]
|
||||
(->> (d/datoms db :avet :block/uuid)
|
||||
(map :e)
|
||||
distinct
|
||||
(keep (fn [eid]
|
||||
(let [ent (d/entity db eid)]
|
||||
(when (and (uuid? (:block/uuid ent))
|
||||
(not (ldb/built-in? ent))
|
||||
(nil? (:block/name ent))
|
||||
(some? (:block/page ent)))
|
||||
(:block/uuid ent)))))
|
||||
vec))
|
||||
|
||||
(defn- page-uuid
|
||||
[db]
|
||||
(some (fn [{:keys [e]}]
|
||||
(let [ent (d/entity db e)]
|
||||
(when (and (uuid? (:block/uuid ent))
|
||||
(not (ldb/built-in? ent))
|
||||
(string? (:block/name ent)))
|
||||
(:block/uuid ent))))
|
||||
(d/datoms db :avet :block/uuid)))
|
||||
|
||||
(defn- block-state
|
||||
[db]
|
||||
(->> (d/datoms db :avet :block/uuid)
|
||||
(map :e)
|
||||
distinct
|
||||
(keep (fn [eid]
|
||||
(let [ent (d/entity db eid)]
|
||||
(when (and (uuid? (:block/uuid ent))
|
||||
(not (ldb/built-in? ent))
|
||||
(or (string? (:block/name ent))
|
||||
(some? (:block/page ent))))
|
||||
[(:block/uuid ent)
|
||||
{:block/uuid (:block/uuid ent)
|
||||
:block/name (:block/name ent)
|
||||
:block/title (:block/title ent)
|
||||
:block/page (some-> ent :block/page :block/uuid)
|
||||
:block/parent (some-> ent :block/parent :block/uuid)
|
||||
:block/order (:block/order ent)}]))))
|
||||
(into (sorted-map))))
|
||||
|
||||
(deftest normalize-tx-data-keeps-title-retract-without-replacement-test
|
||||
(let [conn (new-conn)
|
||||
page-uuid (create-page! conn "Page")
|
||||
@@ -43,3 +125,144 @@
|
||||
(testing "drops old :block/title retract and keeps new add during title update"
|
||||
(is (some #(= [:db/add [:block/uuid page-uuid] :block/title "Page 2"] %) tx-data))
|
||||
(is (not-any? #(= [:db/retract [:block/uuid page-uuid] :block/title "Page"] %) tx-data)))))
|
||||
|
||||
(deftest normalize-tx-data-keeps-recreated-normal-blocks-test
|
||||
(testing "retract + recreate for normal blocks should not drop recreated entity datoms"
|
||||
(let [conn (new-conn)
|
||||
page-uuid (create-page! conn "Page")
|
||||
target-uuid (random-uuid)
|
||||
sibling-uuid (random-uuid)
|
||||
child-uuid (random-uuid)
|
||||
_ (d/transact! conn [{:block/uuid target-uuid
|
||||
:block/title "old-target"
|
||||
:block/page [:block/uuid page-uuid]
|
||||
:block/parent [:block/uuid page-uuid]
|
||||
:block/order "a0"}
|
||||
{:block/uuid sibling-uuid
|
||||
:block/title "sibling"
|
||||
:block/page [:block/uuid page-uuid]
|
||||
:block/parent [:block/uuid page-uuid]
|
||||
:block/order "a1"}
|
||||
{:block/uuid child-uuid
|
||||
:block/title "child"
|
||||
:block/page [:block/uuid page-uuid]
|
||||
:block/parent [:block/uuid target-uuid]
|
||||
:block/order "a0"}])
|
||||
db-before @conn
|
||||
|
||||
;; Shape A: retract then recreate with tempid + explicit :block/uuid add.
|
||||
tx-report-a (d/with db-before
|
||||
[[:db/retractEntity [:block/uuid target-uuid]]
|
||||
[:db/add -1 :block/uuid target-uuid]
|
||||
[:db/add -1 :block/title "new-target-a"]
|
||||
[:db/add -1 :block/page [:block/uuid page-uuid]]
|
||||
[:db/add -1 :block/parent [:block/uuid page-uuid]]
|
||||
[:db/add -1 :block/order "a0"]
|
||||
[:db/add [:block/uuid sibling-uuid] :block/title "sibling-2"]]
|
||||
{})
|
||||
normalized-a (db-normalize/normalize-tx-data (:db-after tx-report-a)
|
||||
(:db-before tx-report-a)
|
||||
(:tx-data tx-report-a))
|
||||
|
||||
;; Shape B: same recreate, plus child reparent to recreated target.
|
||||
tx-report-b (d/with db-before
|
||||
[[:db/retractEntity [:block/uuid target-uuid]]
|
||||
[:db/add -1 :block/uuid target-uuid]
|
||||
[:db/add -1 :block/title "new-target-b"]
|
||||
[:db/add -1 :block/page [:block/uuid page-uuid]]
|
||||
[:db/add -1 :block/parent [:block/uuid page-uuid]]
|
||||
[:db/add -1 :block/order "a2"]
|
||||
[:db/add [:block/uuid child-uuid] :block/parent [:block/uuid target-uuid]]
|
||||
[:db/add [:block/uuid sibling-uuid] :block/title "sibling-3"]]
|
||||
{})
|
||||
normalized-b (db-normalize/normalize-tx-data (:db-after tx-report-b)
|
||||
(:db-before tx-report-b)
|
||||
(:tx-data tx-report-b))]
|
||||
(is (some? (d/entity (:db-after tx-report-a) [:block/uuid target-uuid])))
|
||||
(is (some? (d/entity (:db-after tx-report-b) [:block/uuid target-uuid])))
|
||||
(is (tx-touches-uuid? normalized-a target-uuid)
|
||||
(str "shape A unexpectedly dropped recreated block from normalized tx: " (pr-str normalized-a)))
|
||||
(is (tx-touches-uuid? normalized-b target-uuid)
|
||||
(str "shape B unexpectedly dropped recreated block from normalized tx: " (pr-str normalized-b))))))
|
||||
|
||||
(deftest normalize-tx-data-replay-equivalence-for-retract-recreate-fuzz-test
|
||||
(testing "normalized tx-data should replay to same db-after for normal-block retract/recreate patterns"
|
||||
(let [conn (new-conn)
|
||||
page-uuid (create-page! conn "Page")]
|
||||
(d/transact! conn (mapv (fn [idx]
|
||||
{:block/uuid (random-uuid)
|
||||
:block/title (str "seed-block-" idx)
|
||||
:block/page [:block/uuid page-uuid]
|
||||
:block/parent [:block/uuid page-uuid]
|
||||
:block/order (str "a" idx)})
|
||||
(range 5)))
|
||||
(dotimes [seed 300]
|
||||
(let [rng (seeded-rng (+ 777 seed))
|
||||
db-before @conn
|
||||
blocks (normal-block-uuids db-before)
|
||||
target (pick-rand rng blocks)
|
||||
sibling (pick-rand rng (remove #(= % target) blocks))
|
||||
move-candidate (pick-rand rng (remove #(= % target) blocks))
|
||||
case-id (mod seed 6)
|
||||
base [[:db/retractEntity [:block/uuid target]]
|
||||
[:db/add -1 :block/uuid target]
|
||||
[:db/add -1 :block/title (str "fuzz-target-" seed)]
|
||||
[:db/add -1 :block/page [:block/uuid page-uuid]]
|
||||
[:db/add -1 :block/parent [:block/uuid page-uuid]]
|
||||
[:db/add -1 :block/order (str "z" (mod seed 5))]]
|
||||
tx-data (case case-id
|
||||
0
|
||||
(cond-> base
|
||||
sibling (conj [:db/add [:block/uuid sibling] :block/title (str "fuzz-sibling-title-" seed)]))
|
||||
|
||||
1
|
||||
(cond-> base
|
||||
sibling (conj [:db/retractEntity [:block/uuid sibling]]))
|
||||
|
||||
2
|
||||
(if sibling
|
||||
(into base
|
||||
[[:db/retractEntity [:block/uuid sibling]]
|
||||
[:db/add -2 :block/uuid sibling]
|
||||
[:db/add -2 :block/title (str "fuzz-sibling-recreated-" seed)]
|
||||
[:db/add -2 :block/page [:block/uuid page-uuid]]
|
||||
[:db/add -2 :block/parent [:block/uuid page-uuid]]
|
||||
[:db/add -2 :block/order (str "y" (mod seed 5))]])
|
||||
base)
|
||||
|
||||
3
|
||||
(into base
|
||||
[[:db/add -2 :block/uuid (random-uuid)]
|
||||
[:db/add -2 :block/title (str "fuzz-fresh-" seed)]
|
||||
[:db/add -2 :block/page [:block/uuid page-uuid]]
|
||||
[:db/add -2 :block/parent [:block/uuid page-uuid]]
|
||||
[:db/add -2 :block/order (str "x" (mod seed 7))]])
|
||||
|
||||
4
|
||||
(if sibling
|
||||
(into base
|
||||
[[:db/retractEntity [:block/uuid sibling]]
|
||||
[:db/add -2 :block/uuid sibling]
|
||||
[:db/add -2 :block/title (str "fuzz-sibling-reparented-" seed)]
|
||||
[:db/add -2 :block/page [:block/uuid page-uuid]]
|
||||
[:db/add -2 :block/parent [:block/uuid target]]
|
||||
[:db/add -2 :block/order "a0"]])
|
||||
base)
|
||||
|
||||
;; 5
|
||||
(cond-> base
|
||||
move-candidate
|
||||
(conj [:db/add [:block/uuid move-candidate] :block/parent [:block/uuid target]])))
|
||||
tx-report (d/with db-before tx-data {})
|
||||
normalized (db-normalize/normalize-tx-data (:db-after tx-report)
|
||||
(:db-before tx-report)
|
||||
(:tx-data tx-report))
|
||||
replay-report (d/with db-before normalized {})
|
||||
expected (block-state (:db-after tx-report))
|
||||
actual (block-state (:db-after replay-report))]
|
||||
(is (= expected actual)
|
||||
(str "seed=" seed
|
||||
"\noriginal=" (pr-str tx-data)
|
||||
"\nnormalized=" (pr-str normalized)
|
||||
"\nexpected-count=" (count expected)
|
||||
" actual-count=" (count actual))))))))
|
||||
|
||||
139
deps/db-sync/test/logseq/db_sync/storage_test.cljs
vendored
139
deps/db-sync/test/logseq/db_sync/storage_test.cljs
vendored
@@ -3,7 +3,10 @@
|
||||
[clojure.string :as string]
|
||||
[cljs.test :refer [deftest is testing]]
|
||||
[datascript.core :as d]
|
||||
[logseq.db :as ldb]
|
||||
[logseq.db-sync.common :as common]
|
||||
[logseq.db-sync.storage :as storage]
|
||||
[logseq.db.common.normalize :as db-normalize]
|
||||
[logseq.db-sync.test-sql :as test-sql]))
|
||||
|
||||
(def sqlite (if (find-ns 'nbb.core) (aget sqlite3 "default") sqlite3))
|
||||
@@ -37,6 +40,41 @@
|
||||
(finally
|
||||
(.close sql)))))
|
||||
|
||||
(defn- seeded-rng
|
||||
[seed0]
|
||||
(let [state (atom (bit-or (long seed0) 0))]
|
||||
(fn []
|
||||
(let [s (swap! state
|
||||
(fn [x]
|
||||
(let [x (bit-xor x (bit-shift-left x 13))
|
||||
x (bit-xor x (bit-shift-right x 17))
|
||||
x (bit-xor x (bit-shift-left x 5))]
|
||||
(bit-or x 0))))]
|
||||
(/ (double (unsigned-bit-shift-right s 0)) 4294967296.0)))))
|
||||
|
||||
(defn- rand-int*
|
||||
[rng n]
|
||||
(js/Math.floor (* (rng) n)))
|
||||
|
||||
(defn- pick-rand
|
||||
[rng coll]
|
||||
(when (seq coll)
|
||||
(nth coll (rand-int* rng (count coll)))))
|
||||
|
||||
(defn- normal-block-uuids
|
||||
[db]
|
||||
(->> (d/datoms db :avet :block/uuid)
|
||||
(map :e)
|
||||
distinct
|
||||
(keep (fn [eid]
|
||||
(let [ent (d/entity db eid)]
|
||||
(when (and (uuid? (:block/uuid ent))
|
||||
(not (ldb/built-in? ent))
|
||||
(nil? (:block/name ent))
|
||||
(some? (:block/page ent)))
|
||||
(:block/uuid ent)))))
|
||||
vec))
|
||||
|
||||
(deftest t-counter-test
|
||||
(let [sql (test-sql/make-sql)]
|
||||
(storage/init-schema! sql)
|
||||
@@ -74,3 +112,104 @@
|
||||
(is (= :ok result))
|
||||
(is (= stale-checksum
|
||||
(storage/get-checksum sql)))))))))
|
||||
|
||||
(deftest listener-failure-can-leave-kvs-ahead-of-tx-log-test
|
||||
(testing "if listener append fails after store, kvs can persist while tx_log/t stay unchanged"
|
||||
(with-memory-sql
|
||||
(fn [sql]
|
||||
(storage/init-schema! sql)
|
||||
(let [conn (storage/open-conn sql)
|
||||
stale-checksum "ffffffffffffffff"
|
||||
page-uuid (random-uuid)]
|
||||
;; Force append-tx-for-tx-report to throw on next non-empty tx.
|
||||
(storage/set-checksum! sql stale-checksum)
|
||||
(let [error (try
|
||||
(d/transact! conn [{:block/uuid page-uuid
|
||||
:block/name "repro-kvs-ahead-page"}])
|
||||
nil
|
||||
(catch :default e
|
||||
e))]
|
||||
(is (some? error))
|
||||
(is (string/includes? (or (ex-message error) (.-message error) "")
|
||||
"server checksum doesn't match"))
|
||||
;; Listener failed before append-tx!/next-t!, so tx_log/t/checksum metadata stay stale.
|
||||
(is (= 0 (storage/get-t sql)))
|
||||
(is (= [] (storage/fetch-tx-since sql 0)))
|
||||
(is (= stale-checksum (storage/get-checksum sql)))
|
||||
;; But kvs store can already be persisted; restoring a new conn sees the entity.
|
||||
(let [restored-conn (storage/open-conn sql)]
|
||||
(is (= page-uuid
|
||||
(:block/uuid (d/entity @restored-conn [:block/uuid page-uuid])))))))))))
|
||||
|
||||
(deftest normalize-drop-can-hide-kvs-mutation-from-tx-log-test
|
||||
(testing "if normalize drops tx payload, tx_log can miss persisted kvs state changes"
|
||||
(with-memory-sql
|
||||
(fn [sql]
|
||||
(storage/init-schema! sql)
|
||||
(let [conn (storage/open-conn sql)
|
||||
page-uuid (random-uuid)]
|
||||
(with-redefs [db-normalize/normalize-tx-data (fn [_db-after _db-before _tx-data]
|
||||
[])]
|
||||
(d/transact! conn [{:block/uuid page-uuid
|
||||
:block/name "normalize-drop-repro"}]))
|
||||
(is (= 1 (storage/get-t sql)))
|
||||
(let [entries (storage/fetch-tx-since sql 0)]
|
||||
(is (= 1 (count entries)))
|
||||
(is (= []
|
||||
(common/read-transit (:tx (first entries)))))
|
||||
(let [restored-conn (storage/open-conn sql)]
|
||||
(is (= page-uuid
|
||||
(:block/uuid (d/entity @restored-conn [:block/uuid page-uuid])))))))))))
|
||||
|
||||
(deftest randomized-normal-block-retract-recreate-does-not-throw-checksum-mismatch-test
|
||||
(testing "normal block retract/recreate patterns should not naturally trigger server checksum mismatch"
|
||||
(with-memory-sql
|
||||
(fn [sql]
|
||||
(storage/init-schema! sql)
|
||||
(let [conn (storage/open-conn sql)
|
||||
rng (seeded-rng 424242)
|
||||
page-uuid (random-uuid)
|
||||
_ (d/transact! conn [{:block/uuid page-uuid
|
||||
:block/name "repro-page"
|
||||
:block/title "repro-page"}])
|
||||
_ (d/transact! conn (mapv (fn [idx]
|
||||
{:block/uuid (random-uuid)
|
||||
:block/title (str "seed-" idx)
|
||||
:block/page [:block/uuid page-uuid]
|
||||
:block/parent [:block/uuid page-uuid]
|
||||
:block/order (str "a" idx)})
|
||||
(range 5)))
|
||||
*mismatch (atom nil)]
|
||||
(dotimes [step 500]
|
||||
(when-not @*mismatch
|
||||
(let [db @conn
|
||||
blocks (normal-block-uuids db)
|
||||
target (pick-rand rng blocks)
|
||||
sibling (pick-rand rng (remove #(= % target) blocks))
|
||||
add-fresh? (< (rng) 0.3)
|
||||
tx-data (cond-> [[:db/retractEntity [:block/uuid target]]
|
||||
[:db/add -1 :block/uuid target]
|
||||
[:db/add -1 :block/title (str "rr-" step)]
|
||||
[:db/add -1 :block/page [:block/uuid page-uuid]]
|
||||
[:db/add -1 :block/parent [:block/uuid page-uuid]]
|
||||
[:db/add -1 :block/order (str "z" (mod step 7))]]
|
||||
sibling
|
||||
(conj [:db/add [:block/uuid sibling] :block/title (str "sib-" step)])
|
||||
add-fresh?
|
||||
(into [[:db/add -2 :block/uuid (random-uuid)]
|
||||
[:db/add -2 :block/title (str "fresh-" step)]
|
||||
[:db/add -2 :block/page [:block/uuid page-uuid]]
|
||||
[:db/add -2 :block/parent [:block/uuid page-uuid]]
|
||||
[:db/add -2 :block/order (str "x" (mod step 9))]]))]
|
||||
(try
|
||||
(d/transact! conn tx-data)
|
||||
(catch :default e
|
||||
(let [message (or (ex-message e) (some-> e .-message) (str e))]
|
||||
(if (string/includes? message "server checksum doesn't match")
|
||||
(reset! *mismatch {:step step
|
||||
:tx-data tx-data
|
||||
:error message})
|
||||
;; tx can be invalid due random -2 fresh fields; ignore non-checksum failures
|
||||
nil)))))))
|
||||
(is (nil? @*mismatch)
|
||||
(str "found checksum mismatch repro: " (pr-str @*mismatch))))))))
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
[logseq.db-sync.node-server-test]
|
||||
[logseq.db-sync.normalize-test]
|
||||
[logseq.db-sync.platform-test]
|
||||
[logseq.db-sync.storage-test]
|
||||
[logseq.db-sync.worker-auth-test]
|
||||
[logseq.db-sync.worker-dispatch-test]
|
||||
[logseq.db-sync.worker-handler-assets-test]
|
||||
|
||||
@@ -225,3 +225,35 @@
|
||||
(p/catch (fn [error]
|
||||
(is false (str error))
|
||||
(done)))))))
|
||||
|
||||
(deftest graphs-delete-supports-node-delete-hook-without-do-namespace-test
|
||||
(async done
|
||||
(let [request (js/Request. "http://localhost/graphs/graph-1" #js {:method "DELETE"})
|
||||
url (js/URL. (.-url request))
|
||||
hook-calls* (atom [])]
|
||||
(-> (p/with-redefs [index/<user-has-access-to-graph? (fn [_db _graph-id _user-id]
|
||||
(p/resolved true))
|
||||
index/<graph-delete-metadata! (fn [_db _graph-id]
|
||||
(p/resolved true))
|
||||
index/<graph-delete-index-entry! (fn [_db _graph-id]
|
||||
(p/resolved true))]
|
||||
(p/let [resp (index-handler/handle {:db :db
|
||||
:env #js {"DB_SYNC_DELETE_GRAPH"
|
||||
(fn [graph-id]
|
||||
(swap! hook-calls* conj graph-id)
|
||||
(p/resolved true))}
|
||||
:request request
|
||||
:url url
|
||||
:claims #js {"sub" "user-1"}
|
||||
:route {:handler :graphs/delete
|
||||
:path-params {:graph-id "graph-1"}}})
|
||||
text (.text resp)
|
||||
body (js->clj (js/JSON.parse text) :keywordize-keys true)]
|
||||
(is (= 200 (.-status resp)))
|
||||
(is (= {:graph-id "graph-1" :deleted true} body))
|
||||
(is (= ["graph-1"] @hook-calls*))))
|
||||
(p/then (fn []
|
||||
(done)))
|
||||
(p/catch (fn [error]
|
||||
(is false (str error))
|
||||
(done)))))))
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
(ns logseq.db-sync.worker-handler-sync-test
|
||||
(:require [cljs.test :refer [async deftest is testing]]
|
||||
[datascript.core :as d]
|
||||
[logseq.db-sync.checksum :as sync-checksum]
|
||||
[logseq.db-sync.common :as common]
|
||||
[logseq.db-sync.index :as index]
|
||||
[logseq.db-sync.protocol :as protocol]
|
||||
@@ -12,9 +13,344 @@
|
||||
[logseq.db.frontend.schema :as db-schema]
|
||||
[promesa.core :as p]))
|
||||
|
||||
(defn- seeded-rng
|
||||
[seed0]
|
||||
(let [state (atom (bit-or (long seed0) 0))]
|
||||
(fn []
|
||||
(let [s (swap! state
|
||||
(fn [x]
|
||||
(let [x (bit-xor x (bit-shift-left x 13))
|
||||
x (bit-xor x (bit-shift-right x 17))
|
||||
x (bit-xor x (bit-shift-left x 5))]
|
||||
(bit-or x 0))))]
|
||||
(/ (double (unsigned-bit-shift-right s 0)) 4294967296.0)))))
|
||||
|
||||
(defn- rand-int*
|
||||
[rng n]
|
||||
(js/Math.floor (* (rng) n)))
|
||||
|
||||
(defn- pick-rand
|
||||
[rng coll]
|
||||
(when (seq coll)
|
||||
(nth coll (rand-int* rng (count coll)))))
|
||||
|
||||
(defn- block-uuids-by-predicate
|
||||
[db pred]
|
||||
(->> (d/datoms db :avet :block/uuid)
|
||||
(map :e)
|
||||
distinct
|
||||
(keep (fn [eid]
|
||||
(let [ent (d/entity db eid)
|
||||
uuid (:block/uuid ent)]
|
||||
(when (and uuid (pred ent))
|
||||
(str uuid)))))
|
||||
vec))
|
||||
|
||||
(defn- page-uuids
|
||||
[db]
|
||||
(block-uuids-by-predicate db #(some? (:block/name %))))
|
||||
|
||||
(defn- non-page-block-uuids
|
||||
[db]
|
||||
(block-uuids-by-predicate db #(nil? (:block/name %))))
|
||||
|
||||
(defn- all-block-uuids
|
||||
[db]
|
||||
(block-uuids-by-predicate db (constantly true)))
|
||||
|
||||
(defn- gen-server-tx-entry
|
||||
[rng db step]
|
||||
(let [page-ids (page-uuids db)
|
||||
block-ids (non-page-block-uuids db)
|
||||
all-ids (all-block-uuids db)
|
||||
op (rand-int* rng 6)]
|
||||
(case op
|
||||
;; Explicit empty rebase no-op
|
||||
0 {:tx (protocol/tx->transit [])
|
||||
:outliner-op :rebase}
|
||||
|
||||
;; stale retract in :fix should be sanitized away (often no-op)
|
||||
1 {:tx (protocol/tx->transit [[:db/retractEntity [:block/uuid (random-uuid)]]])
|
||||
:outliner-op :fix}
|
||||
|
||||
;; update title
|
||||
2 (if-let [target-id (pick-rand rng all-ids)]
|
||||
{:tx (protocol/tx->transit [[:db/add [:block/uuid (uuid target-id)]
|
||||
:block/title
|
||||
(str "server-fuzz-title-" step)]])
|
||||
:outliner-op :save-block}
|
||||
{:tx (protocol/tx->transit [])
|
||||
:outliner-op :rebase})
|
||||
|
||||
;; move block parent/page
|
||||
3 (if (and (seq block-ids) (seq page-ids))
|
||||
(let [child (pick-rand rng block-ids)
|
||||
parent (or (pick-rand rng block-ids)
|
||||
child)
|
||||
page (pick-rand rng page-ids)]
|
||||
{:tx (protocol/tx->transit [[:db/add [:block/uuid (uuid child)]
|
||||
:block/parent
|
||||
[:block/uuid (uuid parent)]]
|
||||
[:db/add [:block/uuid (uuid child)]
|
||||
:block/page
|
||||
[:block/uuid (uuid page)]]])
|
||||
:outliner-op :move-blocks})
|
||||
{:tx (protocol/tx->transit [])
|
||||
:outliner-op :rebase})
|
||||
|
||||
;; add block
|
||||
4 (if (seq page-ids)
|
||||
(let [page (pick-rand rng page-ids)
|
||||
parent (or (pick-rand rng block-ids)
|
||||
page)
|
||||
new-uuid (random-uuid)]
|
||||
{:tx (protocol/tx->transit [{:db/id -1
|
||||
:block/uuid new-uuid
|
||||
:block/title (str "server-fuzz-add-" step)
|
||||
:block/order (str "a" (rand-int* rng 9))
|
||||
:block/parent [:block/uuid (uuid parent)]
|
||||
:block/page [:block/uuid (uuid page)]}])
|
||||
:outliner-op :insert-blocks})
|
||||
{:tx (protocol/tx->transit [])
|
||||
:outliner-op :rebase})
|
||||
|
||||
;; delete non-page block
|
||||
(if-let [victim (pick-rand rng block-ids)]
|
||||
{:tx (protocol/tx->transit [[:db/retractEntity [:block/uuid (uuid victim)]]])
|
||||
:outliner-op :delete-blocks}
|
||||
{:tx (protocol/tx->transit [])
|
||||
:outliner-op :rebase}))))
|
||||
|
||||
(defn- empty-sql []
|
||||
#js {:exec (fn [& _] #js [])})
|
||||
|
||||
(defn- make-server-self
|
||||
[]
|
||||
(let [sql (test-sql/make-sql)
|
||||
conn (storage/open-conn sql)
|
||||
self #js {:sql sql
|
||||
:conn conn
|
||||
:schema-ready true}]
|
||||
{:sql sql
|
||||
:conn conn
|
||||
:self self}))
|
||||
|
||||
(defn- apply-entries!
|
||||
[^js self entries]
|
||||
(loop [t-before (storage/get-t (.-sql self))
|
||||
remaining entries]
|
||||
(if-let [entry (first remaining)]
|
||||
(let [response (with-redefs [ws/broadcast! (fn [& _] nil)]
|
||||
(sync-handler/handle-tx-batch! self nil [entry] t-before))]
|
||||
(is (= "tx/batch/ok" (:type response)))
|
||||
(recur (:t response) (next remaining)))
|
||||
t-before)))
|
||||
|
||||
(defn- apply-batch-with-t!
|
||||
[^js self t-before entries]
|
||||
(with-redefs [ws/broadcast! (fn [& _] nil)]
|
||||
(sync-handler/handle-tx-batch! self nil entries t-before)))
|
||||
|
||||
(defn- assert-server-checksum-step!
|
||||
[sql conn prev-t prev-checksum response label]
|
||||
(let [stored-checksum (storage/get-checksum sql)
|
||||
recomputed-checksum (sync-checksum/recompute-checksum @conn)
|
||||
new-t (storage/get-t sql)
|
||||
accepted? (= "tx/batch/ok" (:type response))
|
||||
advanced? (> new-t prev-t)]
|
||||
(is (= new-t (:t response))
|
||||
(str label " response.t should match storage t"))
|
||||
(if accepted?
|
||||
(if advanced?
|
||||
(do
|
||||
(is (string? stored-checksum)
|
||||
(str label " stored checksum missing after mutation"))
|
||||
(is (= recomputed-checksum stored-checksum)
|
||||
(str label " stored checksum should equal full recompute")))
|
||||
(is (= prev-checksum stored-checksum)
|
||||
(str label " checksum changed on no-op accepted batch")))
|
||||
(do
|
||||
(is (= "tx/reject" (:type response))
|
||||
(str label " expected tx rejection"))
|
||||
(is (= prev-t new-t)
|
||||
(str label " rejected tx should not change t"))
|
||||
(is (= prev-checksum stored-checksum)
|
||||
(str label " rejected tx should not change checksum"))))
|
||||
{:accepted? accepted?
|
||||
:advanced? advanced?
|
||||
:t new-t
|
||||
:checksum stored-checksum}))
|
||||
|
||||
(defn- block-placement
|
||||
[db block-uuid]
|
||||
(let [ent (d/pull db [{:block/parent [:block/uuid :block/name]}
|
||||
{:block/page [:block/uuid :block/name]}
|
||||
:block/order]
|
||||
[:block/uuid block-uuid])]
|
||||
{:parent-uuid (get-in ent [:block/parent :block/uuid])
|
||||
:parent-page? (boolean (get-in ent [:block/parent :block/name]))
|
||||
:page-uuid (get-in ent [:block/page :block/uuid])
|
||||
:order (:block/order ent)}))
|
||||
|
||||
(defn- no-op-rebase-entry
|
||||
[]
|
||||
{:tx (protocol/tx->transit [])
|
||||
:outliner-op :rebase})
|
||||
|
||||
(defn- tx-entry-appliable?
|
||||
[db {:keys [tx]}]
|
||||
(try
|
||||
(d/with db (protocol/transit->tx tx))
|
||||
true
|
||||
(catch :default _
|
||||
false)))
|
||||
|
||||
(defn- tx-entries-appliable?
|
||||
[db entries]
|
||||
(every? (partial tx-entry-appliable? db) entries))
|
||||
|
||||
(defn- make-insert-command
|
||||
[rng db step]
|
||||
(let [pages (page-uuids db)
|
||||
blocks (non-page-block-uuids db)]
|
||||
(if-let [page-id (pick-rand rng pages)]
|
||||
(let [parent-id (or (pick-rand rng blocks) page-id)
|
||||
new-uuid (random-uuid)
|
||||
entry {:tx (protocol/tx->transit [{:db/id -1
|
||||
:block/uuid new-uuid
|
||||
:block/title (str "rand-insert-" step)
|
||||
:block/order (str "a" step "-" (rand-int* rng 9))
|
||||
:block/parent [:block/uuid (uuid parent-id)]
|
||||
:block/page [:block/uuid (uuid page-id)]}])
|
||||
:outliner-op :insert-blocks}
|
||||
inverse {:tx (protocol/tx->transit [[:db/retractEntity [:block/uuid new-uuid]]])
|
||||
:outliner-op :delete-blocks}]
|
||||
{:forward [entry]
|
||||
:inverse [inverse]
|
||||
:undoable? true})
|
||||
{:forward [(no-op-rebase-entry)]
|
||||
:undoable? false})))
|
||||
|
||||
(defn- make-title-command
|
||||
[rng db step]
|
||||
(if-let [target-id (pick-rand rng (all-block-uuids db))]
|
||||
(let [target-uuid (uuid target-id)
|
||||
old-title (or (:block/title (d/pull db [:block/title] [:block/uuid target-uuid])) "")
|
||||
new-title (str "rand-title-" step)]
|
||||
{:forward [{:tx (protocol/tx->transit [[:db/add [:block/uuid target-uuid]
|
||||
:block/title
|
||||
new-title]])
|
||||
:outliner-op :save-block}]
|
||||
:inverse [{:tx (protocol/tx->transit [[:db/add [:block/uuid target-uuid]
|
||||
:block/title
|
||||
old-title]])
|
||||
:outliner-op :save-block}]
|
||||
:undoable? true})
|
||||
{:forward [(no-op-rebase-entry)]
|
||||
:undoable? false}))
|
||||
|
||||
(defn- make-move-like-command
|
||||
[db target-id new-parent-id new-page-id new-order outliner-op]
|
||||
(let [target-uuid (uuid target-id)
|
||||
placement (block-placement db target-uuid)]
|
||||
(if (and (:parent-uuid placement) (:page-uuid placement))
|
||||
{:forward [{:tx (protocol/tx->transit [[:db/add [:block/uuid target-uuid]
|
||||
:block/parent
|
||||
[:block/uuid (uuid new-parent-id)]]
|
||||
[:db/add [:block/uuid target-uuid]
|
||||
:block/page
|
||||
[:block/uuid (uuid new-page-id)]]
|
||||
[:db/add [:block/uuid target-uuid]
|
||||
:block/order
|
||||
new-order]])
|
||||
:outliner-op outliner-op}]
|
||||
:inverse [{:tx (protocol/tx->transit [[:db/add [:block/uuid target-uuid]
|
||||
:block/parent
|
||||
[:block/uuid (:parent-uuid placement)]]
|
||||
[:db/add [:block/uuid target-uuid]
|
||||
:block/page
|
||||
[:block/uuid (:page-uuid placement)]]
|
||||
[:db/add [:block/uuid target-uuid]
|
||||
:block/order
|
||||
(:order placement)]])
|
||||
:outliner-op outliner-op}]
|
||||
:undoable? true}
|
||||
{:forward [(no-op-rebase-entry)]
|
||||
:undoable? false})))
|
||||
|
||||
(defn- make-random-move-command
|
||||
[rng db step]
|
||||
(let [blocks (non-page-block-uuids db)
|
||||
pages (page-uuids db)]
|
||||
(if (and (seq blocks) (seq pages))
|
||||
(let [target-id (pick-rand rng blocks)
|
||||
parent-candidates (vec (remove #{target-id} (concat blocks pages)))
|
||||
parent-id (or (pick-rand rng parent-candidates) (pick-rand rng pages))
|
||||
page-id (pick-rand rng pages)]
|
||||
(make-move-like-command db target-id parent-id page-id (str "m" step) :move-blocks))
|
||||
{:forward [(no-op-rebase-entry)]
|
||||
:undoable? false})))
|
||||
|
||||
(defn- make-random-indent-command
|
||||
[rng db step]
|
||||
(let [blocks (non-page-block-uuids db)
|
||||
pages (page-uuids db)]
|
||||
(if (and (seq blocks) (seq pages))
|
||||
(let [child-id (pick-rand rng blocks)
|
||||
parent-candidates (vec (remove #{child-id} blocks))
|
||||
parent-id (or (pick-rand rng parent-candidates)
|
||||
(pick-rand rng pages))
|
||||
page-id (pick-rand rng pages)]
|
||||
(make-move-like-command db child-id parent-id page-id (str "i" step) :indent-blocks))
|
||||
{:forward [(no-op-rebase-entry)]
|
||||
:undoable? false})))
|
||||
|
||||
(defn- make-random-outdent-command
|
||||
[rng db step]
|
||||
(let [candidates (->> (non-page-block-uuids db)
|
||||
(keep (fn [block-id]
|
||||
(let [placement (block-placement db (uuid block-id))]
|
||||
(when (and (:parent-uuid placement)
|
||||
(not (:parent-page? placement))
|
||||
(:page-uuid placement))
|
||||
block-id))))
|
||||
vec)]
|
||||
(if-let [child-id (pick-rand rng candidates)]
|
||||
(let [child-uuid (uuid child-id)
|
||||
placement (block-placement db child-uuid)
|
||||
parent-placement (block-placement db (:parent-uuid placement))]
|
||||
(if-let [grandparent-uuid (:parent-uuid parent-placement)]
|
||||
(make-move-like-command db child-id (str grandparent-uuid) (str (:page-uuid placement)) (str "o" step "-" (rand-int* rng 9)) :outdent-blocks)
|
||||
{:forward [(no-op-rebase-entry)]
|
||||
:undoable? false}))
|
||||
{:forward [(no-op-rebase-entry)]
|
||||
:undoable? false})))
|
||||
|
||||
(defn- make-random-delete-entry
|
||||
[rng db]
|
||||
(if-let [victim-id (pick-rand rng (non-page-block-uuids db))]
|
||||
{:tx (protocol/tx->transit [[:db/retractEntity [:block/uuid (uuid victim-id)]]])
|
||||
:outliner-op :delete-blocks}
|
||||
(no-op-rebase-entry)))
|
||||
|
||||
(defn- make-stale-add-after-delete-conflict
|
||||
[rng db step]
|
||||
(let [blocks (non-page-block-uuids db)
|
||||
pages (page-uuids db)]
|
||||
(when (and (seq blocks) (seq pages))
|
||||
(let [victim-id (pick-rand rng blocks)
|
||||
page-id (pick-rand rng pages)
|
||||
stale-child-uuid (random-uuid)]
|
||||
{:delete-entry {:tx (protocol/tx->transit [[:db/retractEntity [:block/uuid (uuid victim-id)]]])
|
||||
:outliner-op :delete-blocks}
|
||||
:stale-add-entry {:tx (protocol/tx->transit [{:db/id -1
|
||||
:block/uuid stale-child-uuid
|
||||
:block/title (str "stale-child-" step)
|
||||
:block/order (str "c" step)
|
||||
:block/parent [:block/uuid (uuid victim-id)]
|
||||
:block/page [:block/uuid (uuid page-id)]}])
|
||||
:outliner-op :insert-blocks}}))))
|
||||
|
||||
(defn- request-url
|
||||
([]
|
||||
(request-url "/sync/graph-1/snapshot/download?graph-id=graph-1"))
|
||||
@@ -36,7 +372,12 @@
|
||||
:schema-ready true
|
||||
:sql sql}
|
||||
{:keys [request url]} (request-url)
|
||||
expected-url "http://localhost/sync/graph-1/snapshot/stream"]
|
||||
expected-url "http://localhost/sync/graph-1/snapshot/stream"
|
||||
original-compression-stream (.-CompressionStream js/globalThis)
|
||||
restore! #(aset js/globalThis "CompressionStream" original-compression-stream)]
|
||||
(aset js/globalThis
|
||||
"CompressionStream"
|
||||
(passthrough-compression-stream-constructor))
|
||||
(-> (p/let [resp (sync-handler/handle {:self self
|
||||
:request request
|
||||
:url url
|
||||
@@ -47,7 +388,33 @@
|
||||
(is (= true (:ok body)))
|
||||
(is (= "stream/graph-1.snapshot" (:key body)))
|
||||
(is (= expected-url (:url body)))
|
||||
(is (= "gzip" (:content-encoding body)))
|
||||
(is (= "gzip" (:content-encoding body))))
|
||||
(p/then (fn []
|
||||
(restore!)
|
||||
(done)))
|
||||
(p/catch (fn [error]
|
||||
(restore!)
|
||||
(is false (str error))
|
||||
(done)))))))
|
||||
|
||||
(deftest snapshot-download-omits-gzip-encoding-when-disabled-in-env-test
|
||||
(async done
|
||||
(let [sql (empty-sql)
|
||||
conn (d/create-conn db-schema/schema)
|
||||
self #js {:env #js {"DB_SYNC_SNAPSHOT_STREAM_GZIP" "false"}
|
||||
:conn conn
|
||||
:schema-ready true
|
||||
:sql sql}
|
||||
{:keys [request url]} (request-url)]
|
||||
(-> (p/let [resp (sync-handler/handle {:self self
|
||||
:request request
|
||||
:url url
|
||||
:route {:handler :sync/snapshot-download}})
|
||||
text (.text resp)
|
||||
body (js->clj (js/JSON.parse text) :keywordize-keys true)]
|
||||
(is (= 200 (.-status resp)))
|
||||
(is (= true (:ok body)))
|
||||
(is (not (contains? body :content-encoding)))
|
||||
(done))
|
||||
(p/then (fn []
|
||||
nil))
|
||||
@@ -101,6 +468,41 @@
|
||||
(is false (str error))
|
||||
(done)))))))
|
||||
|
||||
(deftest snapshot-download-stream-route-returns-uncompressed-framed-kvs-rows-when-disabled-in-env-test
|
||||
(async done
|
||||
(let [rows [[1 "row-1" nil]
|
||||
[2 "row-2" nil]]
|
||||
sql (empty-sql)
|
||||
conn (d/create-conn db-schema/schema)
|
||||
self #js {:env #js {"DB_SYNC_SNAPSHOT_STREAM_GZIP" "false"}
|
||||
:conn conn
|
||||
:schema-ready true
|
||||
:sql sql}
|
||||
{:keys [request]} (request-url "/sync/graph-1/snapshot/stream?graph-id=graph-1")]
|
||||
(-> (p/with-redefs [sync-handler/fetch-snapshot-kvs-rows (fn [_sql last-addr _limit]
|
||||
(if (neg? last-addr) rows []))
|
||||
sync-handler/snapshot-row-count (fn [_sql] (count rows))]
|
||||
(p/let [resp (sync-handler/handle-http self request)
|
||||
encoding (.get (.-headers resp) "content-encoding")
|
||||
content-type (.get (.-headers resp) "content-type")
|
||||
buf (.arrayBuffer resp)
|
||||
payload (js/Uint8Array. buf)
|
||||
rows (snapshot/finalize-framed-buffer payload)
|
||||
addrs (mapv first rows)]
|
||||
(is (= 200 (.-status resp)))
|
||||
(is (nil? encoding))
|
||||
(is (= "application/transit+json" content-type))
|
||||
(is (= 2 (count rows)))
|
||||
(is (= (sort addrs) addrs))
|
||||
(is (= [[1 "row-1" nil]
|
||||
[2 "row-2" nil]]
|
||||
rows))))
|
||||
(p/then (fn []
|
||||
(done)))
|
||||
(p/catch (fn [error]
|
||||
(is false (str error))
|
||||
(done)))))))
|
||||
|
||||
(deftest ensure-schema-fallback-probes-existing-tables-test
|
||||
(async done
|
||||
(let [self #js {:sql (empty-sql)}
|
||||
@@ -234,6 +636,389 @@
|
||||
(is (nil? (:data response)))
|
||||
(is (= 2 @apply-calls)))))
|
||||
|
||||
(deftest tx-batch-reject-includes-success-and-failed-tx-ids-test
|
||||
(testing "partial failure returns success and failed tx ids and broadcasts changed once"
|
||||
(let [sql (test-sql/make-sql)
|
||||
conn (storage/open-conn sql)
|
||||
self #js {:sql sql
|
||||
:conn conn
|
||||
:schema-ready true}
|
||||
success-tx-id (random-uuid)
|
||||
failed-tx-id (random-uuid)
|
||||
success-block-uuid (random-uuid)
|
||||
missing-uuid (random-uuid)
|
||||
tx-entry-1 {:tx-id success-tx-id
|
||||
:tx (protocol/tx->transit [{:db/id -1
|
||||
:block/uuid success-block-uuid
|
||||
:block/title "ok"}])
|
||||
:outliner-op :save-block}
|
||||
tx-entry-2 {:tx-id failed-tx-id
|
||||
:tx (protocol/tx->transit [[:db/add [:block/uuid missing-uuid] :block/title "stale" 1]])
|
||||
:outliner-op :save-block}
|
||||
changed-messages (atom [])
|
||||
response (with-redefs [ws/broadcast! (fn [_self _sender payload]
|
||||
(swap! changed-messages conj payload))]
|
||||
(sync-handler/handle-tx-batch! self nil [tx-entry-1 tx-entry-2] 0))]
|
||||
(is (= "tx/reject" (:type response)))
|
||||
(is (= "db transact failed" (:reason response)))
|
||||
(is (= 1 (:t response)))
|
||||
(is (= [success-tx-id] (:success-tx-ids response)))
|
||||
(is (= failed-tx-id (:failed-tx-id response)))
|
||||
(is (= [{:type "changed" :t 1}] @changed-messages))
|
||||
(is (some? (d/entity @conn [:block/uuid success-block-uuid])))
|
||||
(is (nil? (d/entity @conn [:block/uuid missing-uuid]))))))
|
||||
|
||||
(deftest tx-batch-ignores-empty-rebase-entry-test
|
||||
(testing "empty rebase entry is a no-op: no t increment, no tx-log append, no changed broadcast"
|
||||
(let [sql (test-sql/make-sql)
|
||||
conn (storage/open-conn sql)
|
||||
self #js {:sql sql
|
||||
:conn conn
|
||||
:schema-ready true}
|
||||
t-before (storage/get-t sql)
|
||||
tx-entry {:tx (protocol/tx->transit [])
|
||||
:outliner-op :rebase}
|
||||
changed-messages (atom [])
|
||||
response (with-redefs [ws/broadcast! (fn [_self _sender payload]
|
||||
(swap! changed-messages conj payload))]
|
||||
(sync-handler/handle-tx-batch! self nil [tx-entry] t-before))]
|
||||
(is (= "tx/batch/ok" (:type response)))
|
||||
(is (= t-before (:t response)))
|
||||
(is (empty? (storage/fetch-tx-since sql t-before)))
|
||||
(is (empty? @changed-messages)))))
|
||||
|
||||
(deftest tx-batch-mixed-empty-rebase-and-real-entry-test
|
||||
(testing "empty rebase entry is ignored while real tx still applies"
|
||||
(let [sql (test-sql/make-sql)
|
||||
conn (storage/open-conn sql)
|
||||
self #js {:sql sql
|
||||
:conn conn
|
||||
:schema-ready true}
|
||||
t-before (storage/get-t sql)
|
||||
noop-rebase-entry {:tx (protocol/tx->transit [])
|
||||
:outliner-op :rebase}
|
||||
block-uuid (random-uuid)
|
||||
real-entry {:tx (protocol/tx->transit [[:db/add -1 :block/uuid block-uuid]
|
||||
[:db/add -1 :block/title "applied"]])
|
||||
:outliner-op :save-block}
|
||||
changed-messages (atom [])
|
||||
response (with-redefs [ws/broadcast! (fn [_self _sender payload]
|
||||
(swap! changed-messages conj payload))]
|
||||
(sync-handler/handle-tx-batch! self nil [noop-rebase-entry real-entry] t-before))
|
||||
txs (storage/fetch-tx-since sql t-before)]
|
||||
(is (= "tx/batch/ok" (:type response)))
|
||||
(is (= (inc t-before) (:t response)))
|
||||
(is (= 1 (count txs)))
|
||||
(is (= :save-block (:outliner-op (first txs))))
|
||||
(is (= [{:type "changed" :t (inc t-before)}] @changed-messages)))))
|
||||
|
||||
(deftest tx-batch-ignores-stale-rebase-with-missing-lookup-entity-test
|
||||
(testing "stale rebase lookup refs to missing entities are treated as no-op"
|
||||
(let [sql (test-sql/make-sql)
|
||||
conn (storage/open-conn sql)
|
||||
self #js {:sql sql
|
||||
:conn conn
|
||||
:schema-ready true}
|
||||
page-uuid (random-uuid)
|
||||
parent-uuid (random-uuid)
|
||||
missing-block-uuid (random-uuid)
|
||||
_ (d/transact! conn [{:block/uuid page-uuid
|
||||
:block/name "rebase-stale-page"
|
||||
:block/title "rebase-stale-page"}
|
||||
{:block/uuid parent-uuid
|
||||
:block/title "existing-parent"
|
||||
:block/order "a0"
|
||||
:block/parent [:block/uuid page-uuid]
|
||||
:block/page [:block/uuid page-uuid]}])
|
||||
t-before (storage/get-t sql)
|
||||
checksum-before (storage/get-checksum sql)
|
||||
tx-entry {:tx (protocol/tx->transit
|
||||
[[:db/retract [:block/uuid missing-block-uuid]
|
||||
:block/parent
|
||||
[:block/uuid page-uuid]
|
||||
536882158]
|
||||
[:db/add [:block/uuid missing-block-uuid]
|
||||
:block/parent
|
||||
[:block/uuid parent-uuid]
|
||||
536882158]
|
||||
[:db/retract [:block/uuid missing-block-uuid]
|
||||
:block/order
|
||||
"a100001V"
|
||||
536882158]
|
||||
[:db/add [:block/uuid missing-block-uuid]
|
||||
:block/order
|
||||
"a0"
|
||||
536882158]])
|
||||
:outliner-op :rebase}
|
||||
changed-messages (atom [])
|
||||
response (with-redefs [ws/broadcast! (fn [_self _sender payload]
|
||||
(swap! changed-messages conj payload))]
|
||||
(sync-handler/handle-tx-batch! self nil [tx-entry] t-before))]
|
||||
(is (= "tx/batch/ok" (:type response)))
|
||||
(is (= t-before (:t response)))
|
||||
(is (= checksum-before (storage/get-checksum sql)))
|
||||
(is (empty? (storage/fetch-tx-since sql t-before)))
|
||||
(is (empty? @changed-messages)))))
|
||||
|
||||
(deftest server-incremental-checksum-matches-full-recompute-fuzz-test
|
||||
(testing "server stored checksum stays equal to full recompute across randomized tx/rebase/no-op sequences"
|
||||
(doseq [seed (range 1 11)]
|
||||
(let [sql (test-sql/make-sql)
|
||||
conn (storage/open-conn sql)
|
||||
self #js {:sql sql
|
||||
:conn conn
|
||||
:schema-ready true}
|
||||
page-uuid (random-uuid)
|
||||
root-block-uuid (random-uuid)
|
||||
_ (d/transact! conn [{:block/uuid page-uuid
|
||||
:block/name (str "server-fuzz-page-" seed)
|
||||
:block/title (str "server-fuzz-page-" seed)}
|
||||
{:block/uuid root-block-uuid
|
||||
:block/title (str "server-fuzz-root-" seed)
|
||||
:block/order "a0"
|
||||
:block/parent [:block/uuid page-uuid]
|
||||
:block/page [:block/uuid page-uuid]}])
|
||||
rng (seeded-rng seed)]
|
||||
(loop [step 0
|
||||
prev-t (storage/get-t sql)
|
||||
prev-checksum (storage/get-checksum sql)]
|
||||
(when (< step 60)
|
||||
(let [entry (gen-server-tx-entry rng @conn step)
|
||||
response (with-redefs [ws/broadcast! (fn [& _] nil)]
|
||||
(sync-handler/handle-tx-batch! self nil [entry] prev-t))
|
||||
new-t (:t response)
|
||||
stored-checksum (storage/get-checksum sql)
|
||||
recomputed-checksum (sync-checksum/recompute-checksum @conn)]
|
||||
(is (= "tx/batch/ok" (:type response))
|
||||
(str "expected tx/batch/ok at seed " seed " step " step))
|
||||
(is (= new-t (storage/get-t sql))
|
||||
(str "t mismatch at seed " seed " step " step))
|
||||
(if (> new-t prev-t)
|
||||
(do
|
||||
(is (string? stored-checksum)
|
||||
(str "stored checksum missing after mutation at seed " seed " step " step))
|
||||
(is (= recomputed-checksum stored-checksum)
|
||||
(str "checksum mismatch at seed " seed " step " step
|
||||
" recomputed=" recomputed-checksum
|
||||
" stored=" stored-checksum)))
|
||||
(is (= prev-checksum stored-checksum)
|
||||
(str "checksum changed on no-op batch at seed " seed " step " step)))
|
||||
(recur (inc step) new-t stored-checksum))))))))
|
||||
|
||||
(deftest server-checksum-is-invariant-across-commuting-batch-order-test
|
||||
(testing "server checksum converges when commuting tx entries are applied in opposite order"
|
||||
(let [page-uuid (random-uuid)
|
||||
block-a-uuid (random-uuid)
|
||||
block-b-uuid (random-uuid)
|
||||
seed-db! (fn [conn]
|
||||
(d/transact! conn [{:block/uuid page-uuid
|
||||
:block/name "server-order-page"
|
||||
:block/title "server-order-page"}
|
||||
{:block/uuid block-a-uuid
|
||||
:block/title "A0"
|
||||
:block/order "a0"
|
||||
:block/page [:block/uuid page-uuid]
|
||||
:block/parent [:block/uuid page-uuid]}
|
||||
{:block/uuid block-b-uuid
|
||||
:block/title "B0"
|
||||
:block/order "a1"
|
||||
:block/page [:block/uuid page-uuid]
|
||||
:block/parent [:block/uuid page-uuid]}]))
|
||||
entry-a {:tx (protocol/tx->transit [[:db/add [:block/uuid block-a-uuid]
|
||||
:block/title
|
||||
"A1"]])
|
||||
:outliner-op :save-block}
|
||||
entry-b {:tx (protocol/tx->transit [[:db/add [:block/uuid block-b-uuid]
|
||||
:block/order
|
||||
"a9"]])
|
||||
:outliner-op :save-block}
|
||||
{:keys [self conn sql]} (make-server-self)
|
||||
_ (seed-db! conn)
|
||||
_ (apply-entries! self [entry-a entry-b])
|
||||
checksum-ab (storage/get-checksum sql)
|
||||
recompute-ab (sync-checksum/recompute-checksum @conn)
|
||||
pull-ab [(d/pull @conn [:block/title :block/order] [:block/uuid block-a-uuid])
|
||||
(d/pull @conn [:block/title :block/order] [:block/uuid block-b-uuid])]
|
||||
{:keys [self conn sql]} (make-server-self)
|
||||
_ (seed-db! conn)
|
||||
_ (apply-entries! self [entry-b entry-a])
|
||||
checksum-ba (storage/get-checksum sql)
|
||||
recompute-ba (sync-checksum/recompute-checksum @conn)
|
||||
pull-ba [(d/pull @conn [:block/title :block/order] [:block/uuid block-a-uuid])
|
||||
(d/pull @conn [:block/title :block/order] [:block/uuid block-b-uuid])]]
|
||||
(is (= recompute-ab checksum-ab))
|
||||
(is (= recompute-ba checksum-ba))
|
||||
(is (= checksum-ab checksum-ba))
|
||||
(is (= pull-ab pull-ba)))))
|
||||
|
||||
(deftest server-checksum-is-invariant-across-tx-partitioning-test
|
||||
(testing "server checksum converges when identical tx-data is sent as one entry or split entries"
|
||||
(let [page-uuid (random-uuid)
|
||||
block-a-uuid (random-uuid)
|
||||
block-b-uuid (random-uuid)
|
||||
seed-db! (fn [conn]
|
||||
(d/transact! conn [{:block/uuid page-uuid
|
||||
:block/name "server-partition-page"
|
||||
:block/title "server-partition-page"}
|
||||
{:block/uuid block-a-uuid
|
||||
:block/title "A0"
|
||||
:block/order "a0"
|
||||
:block/page [:block/uuid page-uuid]
|
||||
:block/parent [:block/uuid page-uuid]}
|
||||
{:block/uuid block-b-uuid
|
||||
:block/title "B0"
|
||||
:block/order "a1"
|
||||
:block/page [:block/uuid page-uuid]
|
||||
:block/parent [:block/uuid page-uuid]}]))
|
||||
datom-a [:db/add [:block/uuid block-a-uuid] :block/title "A2"]
|
||||
datom-b [:db/add [:block/uuid block-b-uuid] :block/order "a8"]
|
||||
one-entry {:tx (protocol/tx->transit [datom-a datom-b])
|
||||
:outliner-op :save-block}
|
||||
split-entry-a {:tx (protocol/tx->transit [datom-a])
|
||||
:outliner-op :save-block}
|
||||
split-entry-b {:tx (protocol/tx->transit [datom-b])
|
||||
:outliner-op :save-block}
|
||||
{:keys [self conn sql]} (make-server-self)
|
||||
_ (seed-db! conn)
|
||||
_ (apply-entries! self [one-entry])
|
||||
checksum-one (storage/get-checksum sql)
|
||||
recompute-one (sync-checksum/recompute-checksum @conn)
|
||||
pull-one [(d/pull @conn [:block/title :block/order] [:block/uuid block-a-uuid])
|
||||
(d/pull @conn [:block/title :block/order] [:block/uuid block-b-uuid])]
|
||||
{:keys [self conn sql]} (make-server-self)
|
||||
_ (seed-db! conn)
|
||||
_ (apply-entries! self [split-entry-a split-entry-b])
|
||||
checksum-split (storage/get-checksum sql)
|
||||
recompute-split (sync-checksum/recompute-checksum @conn)
|
||||
pull-split [(d/pull @conn [:block/title :block/order] [:block/uuid block-a-uuid])
|
||||
(d/pull @conn [:block/title :block/order] [:block/uuid block-b-uuid])]]
|
||||
(is (= recompute-one checksum-one))
|
||||
(is (= recompute-split checksum-split))
|
||||
(is (= checksum-one checksum-split))
|
||||
(is (= pull-one pull-split)))))
|
||||
|
||||
(deftest server-checksum-remains-correct-under-random-outliner-conflicts-test
|
||||
(testing "random insert/move/indent/outdent/delete with stale-client conflicts and undo/redo keeps checksum correct"
|
||||
(doseq [seed (range 31 35)]
|
||||
(let [{:keys [self conn sql]} (make-server-self)
|
||||
page-uuid (random-uuid)
|
||||
root-uuid (random-uuid)
|
||||
child-a-uuid (random-uuid)
|
||||
child-b-uuid (random-uuid)
|
||||
_ (d/transact! conn [{:block/uuid page-uuid
|
||||
:block/name (str "outliner-fuzz-page-" seed)
|
||||
:block/title (str "outliner-fuzz-page-" seed)}
|
||||
{:block/uuid root-uuid
|
||||
:block/title "root"
|
||||
:block/order "a0"
|
||||
:block/page [:block/uuid page-uuid]
|
||||
:block/parent [:block/uuid page-uuid]}
|
||||
{:block/uuid child-a-uuid
|
||||
:block/title "child-a"
|
||||
:block/order "a1"
|
||||
:block/page [:block/uuid page-uuid]
|
||||
:block/parent [:block/uuid root-uuid]}
|
||||
{:block/uuid child-b-uuid
|
||||
:block/title "child-b"
|
||||
:block/order "a2"
|
||||
:block/page [:block/uuid page-uuid]
|
||||
:block/parent [:block/uuid root-uuid]}])
|
||||
rng (seeded-rng (* seed 7919))]
|
||||
(loop [step 0
|
||||
t-before (storage/get-t sql)
|
||||
checksum-before (storage/get-checksum sql)
|
||||
undo-stack []
|
||||
redo-stack []]
|
||||
(when (< step 80)
|
||||
(let [db @conn
|
||||
op (rand-int* rng 11)]
|
||||
(cond
|
||||
;; explicit conflict scenario: delete parent then stale client inserts child under deleted parent
|
||||
(= op 0)
|
||||
(if-let [{:keys [delete-entry stale-add-entry]} (make-stale-add-after-delete-conflict rng db step)]
|
||||
(let [delete-response (apply-batch-with-t! self t-before [delete-entry])
|
||||
delete-state (assert-server-checksum-step! sql conn t-before checksum-before delete-response
|
||||
(str "seed " seed " step " step " delete-before-stale-add"))
|
||||
stale-response (apply-batch-with-t! self (:t delete-state) [stale-add-entry])
|
||||
stale-state (assert-server-checksum-step! sql conn (:t delete-state) (:checksum delete-state) stale-response
|
||||
(str "seed " seed " step " step " stale-add-after-delete"))]
|
||||
(is (= "tx/reject" (:type stale-response))
|
||||
(str "seed " seed " step " step " stale child insert should be rejected"))
|
||||
(recur (inc step) (:t stale-state) (:checksum stale-state) undo-stack redo-stack))
|
||||
(let [noop-response (apply-batch-with-t! self t-before [(no-op-rebase-entry)])
|
||||
noop-state (assert-server-checksum-step! sql conn t-before checksum-before noop-response
|
||||
(str "seed " seed " step " step " fallback-noop"))]
|
||||
(recur (inc step) (:t noop-state) (:checksum noop-state) undo-stack redo-stack)))
|
||||
|
||||
;; undo
|
||||
(= op 1)
|
||||
(if-let [{:keys [forward inverse]} (peek undo-stack)]
|
||||
(let [entries (if (tx-entries-appliable? db inverse)
|
||||
inverse
|
||||
[(no-op-rebase-entry)])
|
||||
response (apply-batch-with-t! self t-before entries)
|
||||
state (assert-server-checksum-step! sql conn t-before checksum-before response
|
||||
(str "seed " seed " step " step " undo"))]
|
||||
(recur (inc step)
|
||||
(:t state)
|
||||
(:checksum state)
|
||||
(pop undo-stack)
|
||||
(if (:advanced? state)
|
||||
(conj redo-stack {:forward forward :inverse inverse})
|
||||
redo-stack)))
|
||||
(let [noop-response (apply-batch-with-t! self t-before [(no-op-rebase-entry)])
|
||||
noop-state (assert-server-checksum-step! sql conn t-before checksum-before noop-response
|
||||
(str "seed " seed " step " step " undo-noop"))]
|
||||
(recur (inc step) (:t noop-state) (:checksum noop-state) undo-stack redo-stack)))
|
||||
|
||||
;; redo
|
||||
(= op 2)
|
||||
(if-let [{:keys [forward inverse]} (peek redo-stack)]
|
||||
(let [entries (if (tx-entries-appliable? db forward)
|
||||
forward
|
||||
[(no-op-rebase-entry)])
|
||||
response (apply-batch-with-t! self t-before entries)
|
||||
state (assert-server-checksum-step! sql conn t-before checksum-before response
|
||||
(str "seed " seed " step " step " redo"))]
|
||||
(recur (inc step)
|
||||
(:t state)
|
||||
(:checksum state)
|
||||
(if (:advanced? state)
|
||||
(conj undo-stack {:forward forward :inverse inverse})
|
||||
undo-stack)
|
||||
(pop redo-stack)))
|
||||
(let [noop-response (apply-batch-with-t! self t-before [(no-op-rebase-entry)])
|
||||
noop-state (assert-server-checksum-step! sql conn t-before checksum-before noop-response
|
||||
(str "seed " seed " step " step " redo-noop"))]
|
||||
(recur (inc step) (:t noop-state) (:checksum noop-state) undo-stack redo-stack)))
|
||||
|
||||
:else
|
||||
(let [command (case op
|
||||
3 (make-insert-command rng db step)
|
||||
4 (make-random-move-command rng db step)
|
||||
5 (make-random-indent-command rng db step)
|
||||
6 (make-random-outdent-command rng db step)
|
||||
7 (make-title-command rng db step)
|
||||
8 {:forward [(make-random-delete-entry rng db)]
|
||||
:undoable? false}
|
||||
9 (make-random-move-command rng db step)
|
||||
10 (make-random-indent-command rng db step)
|
||||
{:forward [(no-op-rebase-entry)]
|
||||
:undoable? false})
|
||||
entries (if (tx-entries-appliable? db (:forward command))
|
||||
(:forward command)
|
||||
[(no-op-rebase-entry)])
|
||||
response (apply-batch-with-t! self t-before entries)
|
||||
state (assert-server-checksum-step! sql conn t-before checksum-before response
|
||||
(str "seed " seed " step " step " op " op))
|
||||
command-applied? (and (:undoable? command) (:advanced? state))
|
||||
next-undo (if command-applied?
|
||||
(conj undo-stack {:forward (:forward command)
|
||||
:inverse (:inverse command)})
|
||||
undo-stack)
|
||||
next-redo (if (:advanced? state) [] redo-stack)]
|
||||
(recur (inc step) (:t state) (:checksum state) next-undo next-redo))))))))))
|
||||
|
||||
(defn- seed-page-with-block-tree!
|
||||
[conn]
|
||||
(let [page-uuid (random-uuid)
|
||||
|
||||
@@ -13,8 +13,9 @@
|
||||
- Update current editing block for presence (omit or null to clear).
|
||||
- `{"type":"pull","since":<t>}`
|
||||
- Request txs after `since` (defaults to 0).
|
||||
- `{"type":"tx/batch","t-before":<t>,"txs":["<tx-transit>", ...]}`
|
||||
- `{"type":"tx/batch","t-before":<t>,"txs":[{"tx":"<tx-transit>","tx-id":"<uuid?>","outliner-op":"<keyword?>"}, ...]}`
|
||||
- Upload a batch of txs based on `t-before` (required).
|
||||
- `tx-id` is optional but recommended for per-entry ack/reject mapping.
|
||||
- `{"type":"ping"}`
|
||||
- Optional keepalive; server replies `pong`.
|
||||
|
||||
@@ -24,19 +25,20 @@
|
||||
- `{"type":"online-users","online-users":[{"user-id":"...","email":"...","username":"...","name":"..."}...]}`
|
||||
- Presence update
|
||||
- Optional `editing-block-uuid` indicates the block the user is editing.
|
||||
- `{"type":"pull/ok","t":<t>,"checksum":"<hex>","txs":[{"t":<t>,"tx":"<tx-transit>"}...]}`
|
||||
- `{"type":"pull/ok","t":<t>,"checksum":"<hex>","txs":[{"t":<t>,"tx":"<tx-transit>","outliner-op":"<keyword?>"}...]}`
|
||||
- Pull response with txs and post-apply entity checksum.
|
||||
- `{"type":"tx/batch/ok","t":<t>,"checksum":"<hex>"}`
|
||||
- Batch accepted; server advanced to t and returns the resulting entity checksum.
|
||||
- `{"type":"changed","t":<t>}`
|
||||
- Broadcast that server state advanced; client should pull.
|
||||
- Broadcast once after a handled `tx/batch` that advanced server state (`t` increased); client should pull.
|
||||
- `{"type":"tx/reject","reason":"stale","t":<t>}`
|
||||
- Client tx is based on stale t.
|
||||
- `{"type":"tx/reject","reason":"cycle","data":"<transit {:attr <kw> :server-values ...}>"}`
|
||||
- Cycle detected with server values.
|
||||
- `{"type":"tx/reject","reason":"db transact failed","t":<t>,"data":"<transit {:tx \"<tx-transit>\" :outliner-op ...}>"}`
|
||||
- Server-side transact/validation failed for one tx entry in the batch; `data` echoes the rejected tx entry for debugging.
|
||||
- `{"type":"tx/reject","reason":"empty tx data"|"invalid tx"|"invalid t-before"}`
|
||||
- `{"type":"tx/reject","reason":"db transact failed","t":<t>,"success-tx-ids":["<uuid>",...],"failed-tx-id":"<uuid>"}`
|
||||
- Server-side transact/validation failed for one tx entry in the batch.
|
||||
- `success-tx-ids` are entries already applied before the failure.
|
||||
- `failed-tx-id` is the entry that failed.
|
||||
- Legacy servers may return `data` with rejected tx payload for debugging.
|
||||
- `{"type":"tx/reject","reason":"empty tx data"|"invalid tx"|"invalid t-before"|"snapshot upload in progress"}`
|
||||
- Invalid batch.
|
||||
- `{"type":"pong"}`
|
||||
- Keepalive response.
|
||||
@@ -87,11 +89,11 @@
|
||||
- `GET /sync/:graph-id/health`
|
||||
- Health check. Response: `{"ok":true}`.
|
||||
- `GET /sync/:graph-id/pull?since=<t>`
|
||||
- Same as WS pull. Response: `{"type":"pull/ok","t":<t>,"checksum":"<hex>","txs":[{"t":<t>,"tx":"<tx-transit>"}...]}`.
|
||||
- Same as WS pull. Response: `{"type":"pull/ok","t":<t>,"checksum":"<hex>","txs":[{"t":<t>,"tx":"<tx-transit>","outliner-op":"<keyword?>"}...]}`.
|
||||
- Error response (400): `{"error":"invalid since"}`.
|
||||
- Error response (409): `{"error":"graph not ready"}` when bootstrap upload/import has not finished.
|
||||
- `POST /sync/:graph-id/tx/batch`
|
||||
- Same as WS tx/batch. Body: `{"t-before":<t>,"txs":["<tx-transit>", ...]}`.
|
||||
- Same as WS tx/batch. Body: `{"t-before":<t>,"txs":[{"tx":"<tx-transit>","tx-id":"<uuid?>","outliner-op":"<keyword?>"}, ...]}`.
|
||||
- Response: `{"type":"tx/batch/ok","t":<t>,"checksum":"<hex>"}` or `{"type":"tx/reject","reason":...}`.
|
||||
- Error response (400): `{"error":"missing body"|"invalid tx"}`.
|
||||
- Error response (409): `{"error":"graph not ready"}` when bootstrap upload/import has not finished.
|
||||
|
||||
Reference in New Issue
Block a user