diff --git a/src/main/frontend/worker/sync.cljs b/src/main/frontend/worker/sync.cljs index ceb392ceb1..0ed3b3ffc8 100644 --- a/src/main/frontend/worker/sync.cljs +++ b/src/main/frontend/worker/sync.cljs @@ -500,6 +500,12 @@ (= :block/uuid (first x))) (second x))) +(defn- batched-remote-tx-data? + [tx-data*] + (and (seq tx-data*) + (sequential? (first tx-data*)) + (sequential? (first (first tx-data*))))) + (defn- drop-anonymous-temp-entity-datoms "Drop malformed temp entities from remote txs. A temp entity must declare one identity attr (:block/uuid or :db/ident) @@ -1062,79 +1068,82 @@ (defn- apply-remote-tx! [repo client tx-data*] - (if-let [conn (worker-state/get-datascript-conn repo)] - (let [tx-data (->> tx-data* - (db-normalize/remove-retract-entity-ref @conn) - (#(drop-anonymous-temp-entity-datoms @conn %))) - local-txs (pending-txs repo) - reversed-tx-data (get-reverse-tx-data local-txs) - has-local-changes? (seq reversed-tx-data) - *remote-tx-report (atom nil) - *reversed-tx-report (atom nil) - *remote-deleted-ids (atom #{}) - *rebase-tx-data (atom []) - db @conn - remote-deleted-blocks (->> tx-data - (keep (fn [item] - (when (= :db/retractEntity (first item)) - (d/entity db (second item)))))) - remote-deleted-block-ids (set (map :block/uuid remote-deleted-blocks)) - safe-remote-tx-data (->> tx-data - (remove (fn [item] - (or (= :db/retractEntity (first item)) - (contains? remote-deleted-block-ids (get-lookup-id (last item)))))) - seq) - temp-tx-meta {:rtc-tx? true - :temp-conn? true - :gen-undo-ops? false - :persist-op? false} - apply-context {:conn conn - :local-txs local-txs - :reversed-tx-data reversed-tx-data - :safe-remote-tx-data safe-remote-tx-data - :remote-deleted-blocks remote-deleted-blocks - :remote-deleted-block-ids remote-deleted-block-ids - :temp-tx-meta temp-tx-meta - :*remote-tx-report *remote-tx-report - :*reversed-tx-report *reversed-tx-report - :*remote-deleted-ids *remote-deleted-ids - :*rebase-tx-data *rebase-tx-data} - tx-report (if has-local-changes? - (apply-remote-tx-with-local-changes! apply-context) - (apply-remote-tx-without-local-changes! apply-context)) - remote-tx-report @*remote-tx-report] - ;; persist rebase tx to client ops - (when has-local-changes? - (when-let [tx-data (seq @*rebase-tx-data)] - (let [remote-tx-data-set (set tx-data*) - normalized (->> tx-data - (normalize-tx-data (:db-after tx-report) - (or (:db-after remote-tx-report) - (:db-after @*reversed-tx-report))) - (remove (fn [[op _e a]] - (and (= op :db/retract) - (contains? #{:block/updated-at :block/created-at :block/title} a))))) - normalized-tx-data (remove remote-tx-data-set normalized) - reversed-datoms (reverse-tx-data tx-data)] - ;; (prn :debug :normalized-tx-data normalized-tx-data) - ;; (prn :debug :remote-tx-data tx-data*) - ;; (prn :debug :diff (data/diff remote-tx-data-set - ;; (set normalized))) - (when (seq normalized-tx-data) - (persist-local-tx! repo normalized-tx-data reversed-datoms {:op :rtc-rebase})))) - (remove-pending-txs! repo (map :tx-id local-txs))) + (if (batched-remote-tx-data? tx-data*) + (doseq [tx-data tx-data*] + (apply-remote-tx! repo client tx-data)) + (if-let [conn (worker-state/get-datascript-conn repo)] + (let [tx-data (->> tx-data* + (db-normalize/remove-retract-entity-ref @conn) + (#(drop-anonymous-temp-entity-datoms @conn %))) + local-txs (pending-txs repo) + reversed-tx-data (get-reverse-tx-data local-txs) + has-local-changes? (seq reversed-tx-data) + *remote-tx-report (atom nil) + *reversed-tx-report (atom nil) + *remote-deleted-ids (atom #{}) + *rebase-tx-data (atom []) + db @conn + remote-deleted-blocks (->> tx-data + (keep (fn [item] + (when (= :db/retractEntity (first item)) + (d/entity db (second item)))))) + remote-deleted-block-ids (set (map :block/uuid remote-deleted-blocks)) + safe-remote-tx-data (->> tx-data + (remove (fn [item] + (or (= :db/retractEntity (first item)) + (contains? remote-deleted-block-ids (get-lookup-id (last item)))))) + seq) + temp-tx-meta {:rtc-tx? true + :temp-conn? true + :gen-undo-ops? false + :persist-op? false} + apply-context {:conn conn + :local-txs local-txs + :reversed-tx-data reversed-tx-data + :safe-remote-tx-data safe-remote-tx-data + :remote-deleted-blocks remote-deleted-blocks + :remote-deleted-block-ids remote-deleted-block-ids + :temp-tx-meta temp-tx-meta + :*remote-tx-report *remote-tx-report + :*reversed-tx-report *reversed-tx-report + :*remote-deleted-ids *remote-deleted-ids + :*rebase-tx-data *rebase-tx-data} + tx-report (if has-local-changes? + (apply-remote-tx-with-local-changes! apply-context) + (apply-remote-tx-without-local-changes! apply-context)) + remote-tx-report @*remote-tx-report] + ;; persist rebase tx to client ops + (when has-local-changes? + (when-let [tx-data (seq @*rebase-tx-data)] + (let [remote-tx-data-set (set tx-data*) + normalized (->> tx-data + (normalize-tx-data (:db-after tx-report) + (or (:db-after remote-tx-report) + (:db-after @*reversed-tx-report))) + (remove (fn [[op _e a]] + (and (= op :db/retract) + (contains? #{:block/updated-at :block/created-at :block/title} a))))) + normalized-tx-data (remove remote-tx-data-set normalized) + reversed-datoms (reverse-tx-data tx-data)] + ;; (prn :debug :normalized-tx-data normalized-tx-data) + ;; (prn :debug :remote-tx-data tx-data*) + ;; (prn :debug :diff (data/diff remote-tx-data-set + ;; (set normalized))) + (when (seq normalized-tx-data) + (persist-local-tx! repo normalized-tx-data reversed-datoms {:op :rtc-rebase})))) + (remove-pending-txs! repo (map :tx-id local-txs))) - (when-let [*inflight (:inflight client)] - (reset! *inflight [])) + (when-let [*inflight (:inflight client)] + (reset! *inflight [])) - (-> (rehydrate-large-titles! repo {:tx-data tx-data - :graph-id (:graph-id client)}) - (p/catch (fn [error] - (log/error :db-sync/large-title-rehydrate-failed - {:repo repo :error error})))) + (-> (rehydrate-large-titles! repo {:tx-data tx-data + :graph-id (:graph-id client)}) + (p/catch (fn [error] + (log/error :db-sync/large-title-rehydrate-failed + {:repo repo :error error})))) - (reset! *remote-tx-report nil)) - (fail-fast :db-sync/missing-db {:repo repo :op :apply-remote-tx}))) + (reset! *remote-tx-report nil)) + (fail-fast :db-sync/missing-db {:repo repo :op :apply-remote-tx})))) (defn- handle-message! [repo client raw] (let [message (-> raw parse-message coerce-ws-server-message)] @@ -1170,23 +1179,23 @@ (reset! (:inflight client) []) (flush-pending! repo client)) ;; Download response - ;; Merge batch txs to one tx, does it really work? We'll see "pull/ok" (when (> remote-tx local-tx) (let [txs (:txs message) _ (require-non-negative remote-tx {:repo repo :type "pull/ok"}) _ (require-seq txs {:repo repo :type "pull/ok" :field :txs}) txs-data (mapv (fn [data] (parse-transit (:tx data) {:repo repo :type "pull/ok"})) - txs) - tx (distinct (mapcat identity txs-data))] - (when (seq tx) + txs)] + (when (seq txs-data) (p/let [aes-key (sync-crypt/> (filter (fn [{:keys [t]}] (> t since)) txs) - (mapcat :tx)))) + (mapv :tx)))) (defn- server-upload! [server t-before tx-data] (swap! server @@ -233,10 +233,10 @@ server-t (:t @server)] ;; (prn :debug :repo repo :local-tx local-tx :server-t server-t) (when (< local-tx server-t) - (let [tx (server-pull server local-tx)] + (let [txs (server-pull server local-tx)] ;; (prn :debug :apply-remote-tx :repo repo ;; :tx tx) - (#'db-sync/apply-remote-tx! repo client tx) + (#'db-sync/apply-remote-tx! repo client txs) (client-op/update-local-tx repo server-t) (reset! progress? true))) (let [pending (#'db-sync/pending-txs repo) diff --git a/src/test/frontend/worker/db_sync_test.cljs b/src/test/frontend/worker/db_sync_test.cljs index 4d8c98254b..268fa8685b 100644 --- a/src/test/frontend/worker/db_sync_test.cljs +++ b/src/test/frontend/worker/db_sync_test.cljs @@ -176,6 +176,52 @@ (reset! db-sync/*repo->latest-remote-tx latest-prev) (done)))))))))) +(deftest pull-ok-batched-txs-preserve-tempid-boundaries-test + (testing "pull/ok applies tx batches without cross-tx tempid collisions" + (async done + (let [{:keys [conn client-ops-conn parent]} (setup-parent-child) + page-uuid (:block/uuid (:block/page parent)) + block-uuid-a (random-uuid) + block-uuid-b (random-uuid) + now 1760000000000 + tx-a (sqlite-util/write-transit-str + [[:db/add -1 :block/uuid block-uuid-a] + [:db/add -1 :block/title "remote-a"] + [:db/add -1 :block/parent [:block/uuid page-uuid]] + [:db/add -1 :block/page [:block/uuid page-uuid]] + [:db/add -1 :block/order 1] + [:db/add -1 :block/updated-at now] + [:db/add -1 :block/created-at now]]) + tx-b (sqlite-util/write-transit-str + [[:db/add -1 :block/uuid block-uuid-b] + [:db/add -1 :block/title "remote-b"] + [:db/add -1 :block/parent [:block/uuid page-uuid]] + [:db/add -1 :block/page [:block/uuid page-uuid]] + [:db/add -1 :block/order 2] + [:db/add -1 :block/updated-at now] + [:db/add -1 :block/created-at now]]) + raw-message (js/JSON.stringify + (clj->js {:type "pull/ok" + :t 2 + :txs [{:t 1 :tx tx-a} + {:t 2 :tx tx-b}]})) + latest-prev @db-sync/*repo->latest-remote-tx + client {:repo test-repo + :graph-id "graph-1" + :inflight (atom []) + :online-users (atom []) + :ws-state (atom :open)}] + (with-datascript-conns conn client-ops-conn + (fn [] + (reset! db-sync/*repo->latest-remote-tx {}) + (-> (p/let [_ (client-op/update-local-tx test-repo 0) + _ (#'db-sync/handle-message! test-repo client raw-message)] + (is (= "remote-a" (:block/title (d/entity @conn [:block/uuid block-uuid-a])))) + (is (= "remote-b" (:block/title (d/entity @conn [:block/uuid block-uuid-b]))))) + (p/finally (fn [] + (reset! db-sync/*repo->latest-remote-tx latest-prev) + (done)))))))))) + (deftest reaction-add-enqueues-pending-sync-tx-test (testing "adding a reaction should enqueue tx for db-sync" (let [{:keys [conn client-ops-conn parent]} (setup-parent-child)]