test version of non-batch apply-remote-tx

This commit is contained in:
Tienson Qin
2026-02-26 21:15:08 +08:00
parent dd4b38276a
commit 08213ceeb9
3 changed files with 136 additions and 81 deletions

View File

@@ -500,6 +500,12 @@
(= :block/uuid (first x)))
(second x)))
(defn- batched-remote-tx-data?
[tx-data*]
(and (seq tx-data*)
(sequential? (first tx-data*))
(sequential? (first (first tx-data*)))))
(defn- drop-anonymous-temp-entity-datoms
"Drop malformed temp entities from remote txs.
A temp entity must declare one identity attr (:block/uuid or :db/ident)
@@ -1062,79 +1068,82 @@
(defn- apply-remote-tx!
[repo client tx-data*]
(if-let [conn (worker-state/get-datascript-conn repo)]
(let [tx-data (->> tx-data*
(db-normalize/remove-retract-entity-ref @conn)
(#(drop-anonymous-temp-entity-datoms @conn %)))
local-txs (pending-txs repo)
reversed-tx-data (get-reverse-tx-data local-txs)
has-local-changes? (seq reversed-tx-data)
*remote-tx-report (atom nil)
*reversed-tx-report (atom nil)
*remote-deleted-ids (atom #{})
*rebase-tx-data (atom [])
db @conn
remote-deleted-blocks (->> tx-data
(keep (fn [item]
(when (= :db/retractEntity (first item))
(d/entity db (second item))))))
remote-deleted-block-ids (set (map :block/uuid remote-deleted-blocks))
safe-remote-tx-data (->> tx-data
(remove (fn [item]
(or (= :db/retractEntity (first item))
(contains? remote-deleted-block-ids (get-lookup-id (last item))))))
seq)
temp-tx-meta {:rtc-tx? true
:temp-conn? true
:gen-undo-ops? false
:persist-op? false}
apply-context {:conn conn
:local-txs local-txs
:reversed-tx-data reversed-tx-data
:safe-remote-tx-data safe-remote-tx-data
:remote-deleted-blocks remote-deleted-blocks
:remote-deleted-block-ids remote-deleted-block-ids
:temp-tx-meta temp-tx-meta
:*remote-tx-report *remote-tx-report
:*reversed-tx-report *reversed-tx-report
:*remote-deleted-ids *remote-deleted-ids
:*rebase-tx-data *rebase-tx-data}
tx-report (if has-local-changes?
(apply-remote-tx-with-local-changes! apply-context)
(apply-remote-tx-without-local-changes! apply-context))
remote-tx-report @*remote-tx-report]
;; persist rebase tx to client ops
(when has-local-changes?
(when-let [tx-data (seq @*rebase-tx-data)]
(let [remote-tx-data-set (set tx-data*)
normalized (->> tx-data
(normalize-tx-data (:db-after tx-report)
(or (:db-after remote-tx-report)
(:db-after @*reversed-tx-report)))
(remove (fn [[op _e a]]
(and (= op :db/retract)
(contains? #{:block/updated-at :block/created-at :block/title} a)))))
normalized-tx-data (remove remote-tx-data-set normalized)
reversed-datoms (reverse-tx-data tx-data)]
;; (prn :debug :normalized-tx-data normalized-tx-data)
;; (prn :debug :remote-tx-data tx-data*)
;; (prn :debug :diff (data/diff remote-tx-data-set
;; (set normalized)))
(when (seq normalized-tx-data)
(persist-local-tx! repo normalized-tx-data reversed-datoms {:op :rtc-rebase}))))
(remove-pending-txs! repo (map :tx-id local-txs)))
(if (batched-remote-tx-data? tx-data*)
(doseq [tx-data tx-data*]
(apply-remote-tx! repo client tx-data))
(if-let [conn (worker-state/get-datascript-conn repo)]
(let [tx-data (->> tx-data*
(db-normalize/remove-retract-entity-ref @conn)
(#(drop-anonymous-temp-entity-datoms @conn %)))
local-txs (pending-txs repo)
reversed-tx-data (get-reverse-tx-data local-txs)
has-local-changes? (seq reversed-tx-data)
*remote-tx-report (atom nil)
*reversed-tx-report (atom nil)
*remote-deleted-ids (atom #{})
*rebase-tx-data (atom [])
db @conn
remote-deleted-blocks (->> tx-data
(keep (fn [item]
(when (= :db/retractEntity (first item))
(d/entity db (second item))))))
remote-deleted-block-ids (set (map :block/uuid remote-deleted-blocks))
safe-remote-tx-data (->> tx-data
(remove (fn [item]
(or (= :db/retractEntity (first item))
(contains? remote-deleted-block-ids (get-lookup-id (last item))))))
seq)
temp-tx-meta {:rtc-tx? true
:temp-conn? true
:gen-undo-ops? false
:persist-op? false}
apply-context {:conn conn
:local-txs local-txs
:reversed-tx-data reversed-tx-data
:safe-remote-tx-data safe-remote-tx-data
:remote-deleted-blocks remote-deleted-blocks
:remote-deleted-block-ids remote-deleted-block-ids
:temp-tx-meta temp-tx-meta
:*remote-tx-report *remote-tx-report
:*reversed-tx-report *reversed-tx-report
:*remote-deleted-ids *remote-deleted-ids
:*rebase-tx-data *rebase-tx-data}
tx-report (if has-local-changes?
(apply-remote-tx-with-local-changes! apply-context)
(apply-remote-tx-without-local-changes! apply-context))
remote-tx-report @*remote-tx-report]
;; persist rebase tx to client ops
(when has-local-changes?
(when-let [tx-data (seq @*rebase-tx-data)]
(let [remote-tx-data-set (set tx-data*)
normalized (->> tx-data
(normalize-tx-data (:db-after tx-report)
(or (:db-after remote-tx-report)
(:db-after @*reversed-tx-report)))
(remove (fn [[op _e a]]
(and (= op :db/retract)
(contains? #{:block/updated-at :block/created-at :block/title} a)))))
normalized-tx-data (remove remote-tx-data-set normalized)
reversed-datoms (reverse-tx-data tx-data)]
;; (prn :debug :normalized-tx-data normalized-tx-data)
;; (prn :debug :remote-tx-data tx-data*)
;; (prn :debug :diff (data/diff remote-tx-data-set
;; (set normalized)))
(when (seq normalized-tx-data)
(persist-local-tx! repo normalized-tx-data reversed-datoms {:op :rtc-rebase}))))
(remove-pending-txs! repo (map :tx-id local-txs)))
(when-let [*inflight (:inflight client)]
(reset! *inflight []))
(when-let [*inflight (:inflight client)]
(reset! *inflight []))
(-> (rehydrate-large-titles! repo {:tx-data tx-data
:graph-id (:graph-id client)})
(p/catch (fn [error]
(log/error :db-sync/large-title-rehydrate-failed
{:repo repo :error error}))))
(-> (rehydrate-large-titles! repo {:tx-data tx-data
:graph-id (:graph-id client)})
(p/catch (fn [error]
(log/error :db-sync/large-title-rehydrate-failed
{:repo repo :error error}))))
(reset! *remote-tx-report nil))
(fail-fast :db-sync/missing-db {:repo repo :op :apply-remote-tx})))
(reset! *remote-tx-report nil))
(fail-fast :db-sync/missing-db {:repo repo :op :apply-remote-tx}))))
(defn- handle-message! [repo client raw]
(let [message (-> raw parse-message coerce-ws-server-message)]
@@ -1170,23 +1179,23 @@
(reset! (:inflight client) [])
(flush-pending! repo client))
;; Download response
;; Merge batch txs to one tx, does it really work? We'll see
"pull/ok" (when (> remote-tx local-tx)
(let [txs (:txs message)
_ (require-non-negative remote-tx {:repo repo :type "pull/ok"})
_ (require-seq txs {:repo repo :type "pull/ok" :field :txs})
txs-data (mapv (fn [data]
(parse-transit (:tx data) {:repo repo :type "pull/ok"}))
txs)
tx (distinct (mapcat identity txs-data))]
(when (seq tx)
txs)]
(when (seq txs-data)
(p/let [aes-key (sync-crypt/<ensure-graph-aes-key repo (:graph-id client))
_ (when (and (sync-crypt/graph-e2ee? repo) (nil? aes-key))
(fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))
tx* (if aes-key
(sync-crypt/<decrypt-tx-data aes-key tx)
(p/resolved tx))]
(apply-remote-tx! repo client tx*)
tx-batches (if aes-key
(p/all (mapv (fn [tx-data]
(sync-crypt/<decrypt-tx-data aes-key tx-data))
txs-data))
(p/resolved txs-data))]
(apply-remote-tx! repo client tx-batches)
(client-op/update-local-tx repo remote-tx)
(broadcast-rtc-state! client)
(flush-pending! repo client)))))

View File

@@ -206,7 +206,7 @@
(defn- server-pull [server since]
(let [{:keys [txs]} @server]
(->> (filter (fn [{:keys [t]}] (> t since)) txs)
(mapcat :tx))))
(mapv :tx))))
(defn- server-upload! [server t-before tx-data]
(swap! server
@@ -233,10 +233,10 @@
server-t (:t @server)]
;; (prn :debug :repo repo :local-tx local-tx :server-t server-t)
(when (< local-tx server-t)
(let [tx (server-pull server local-tx)]
(let [txs (server-pull server local-tx)]
;; (prn :debug :apply-remote-tx :repo repo
;; :tx tx)
(#'db-sync/apply-remote-tx! repo client tx)
(#'db-sync/apply-remote-tx! repo client txs)
(client-op/update-local-tx repo server-t)
(reset! progress? true)))
(let [pending (#'db-sync/pending-txs repo)

View File

@@ -176,6 +176,52 @@
(reset! db-sync/*repo->latest-remote-tx latest-prev)
(done))))))))))
(deftest pull-ok-batched-txs-preserve-tempid-boundaries-test
(testing "pull/ok applies tx batches without cross-tx tempid collisions"
(async done
(let [{:keys [conn client-ops-conn parent]} (setup-parent-child)
page-uuid (:block/uuid (:block/page parent))
block-uuid-a (random-uuid)
block-uuid-b (random-uuid)
now 1760000000000
tx-a (sqlite-util/write-transit-str
[[:db/add -1 :block/uuid block-uuid-a]
[:db/add -1 :block/title "remote-a"]
[:db/add -1 :block/parent [:block/uuid page-uuid]]
[:db/add -1 :block/page [:block/uuid page-uuid]]
[:db/add -1 :block/order 1]
[:db/add -1 :block/updated-at now]
[:db/add -1 :block/created-at now]])
tx-b (sqlite-util/write-transit-str
[[:db/add -1 :block/uuid block-uuid-b]
[:db/add -1 :block/title "remote-b"]
[:db/add -1 :block/parent [:block/uuid page-uuid]]
[:db/add -1 :block/page [:block/uuid page-uuid]]
[:db/add -1 :block/order 2]
[:db/add -1 :block/updated-at now]
[:db/add -1 :block/created-at now]])
raw-message (js/JSON.stringify
(clj->js {:type "pull/ok"
:t 2
:txs [{:t 1 :tx tx-a}
{:t 2 :tx tx-b}]}))
latest-prev @db-sync/*repo->latest-remote-tx
client {:repo test-repo
:graph-id "graph-1"
:inflight (atom [])
:online-users (atom [])
:ws-state (atom :open)}]
(with-datascript-conns conn client-ops-conn
(fn []
(reset! db-sync/*repo->latest-remote-tx {})
(-> (p/let [_ (client-op/update-local-tx test-repo 0)
_ (#'db-sync/handle-message! test-repo client raw-message)]
(is (= "remote-a" (:block/title (d/entity @conn [:block/uuid block-uuid-a]))))
(is (= "remote-b" (:block/title (d/entity @conn [:block/uuid block-uuid-b])))))
(p/finally (fn []
(reset! db-sync/*repo->latest-remote-tx latest-prev)
(done))))))))))
(deftest reaction-add-enqueues-pending-sync-tx-test
(testing "adding a reaction should enqueue tx for db-sync"
(let [{:keys [conn client-ops-conn parent]} (setup-parent-child)]