fix: sync migration txs (#12687)

* fix: sync migration txs

* fix: repair missing migration built-ins
This commit is contained in:
Tienson Qin
2026-05-20 11:31:06 +08:00
committed by GitHub
parent d68cbce311
commit 910a76fee0
2 changed files with 195 additions and 1 deletions

View File

@@ -44,6 +44,7 @@
[logseq.db.common.view :as db-view]
[logseq.db.frontend.class :as db-class]
[logseq.db.frontend.entity-util :as entity-util]
[logseq.db.frontend.property :as db-property]
[logseq.db.frontend.schema :as db-schema]
[logseq.db.sqlite.create-graph :as sqlite-create-graph]
[logseq.db.sqlite.export :as sqlite-export]
@@ -407,6 +408,87 @@
{:persist-op? false
:skip-validate-db? true}))))
(defn- handle-migrate-result-local-txs!
[repo migrate-result]
(doseq [tx-report (:upgrade-result-coll migrate-result)]
(db-sync/handle-local-tx! repo tx-report)))
(def ^:private built-in-sync-repair-tx-id
#uuid "00000000-0000-4000-8000-652665286528")
(def ^:private built-in-sync-repair-properties
[:logseq.property.repeat/repeat-type
:logseq.property.comments/blocks])
(def ^:private built-in-sync-repair-classes
[:logseq.class/Comments
:logseq.class/Comment])
;; Fixed so duplicate repair txs from multiple clients converge on the same datoms.
(def ^:private built-in-sync-repair-timestamp 0)
(defn- stable-built-in-sync-repair-item
[order item]
(if (and (map? item) (:block/uuid item))
(assoc item
:block/created-at built-in-sync-repair-timestamp
:block/updated-at built-in-sync-repair-timestamp
:block/order order)
item))
(defn- built-in-sync-repair-tx-data
[]
(let [properties built-in-sync-repair-properties
new-properties (->> (select-keys db-property/built-in-properties properties)
sqlite-create-graph/build-properties
(map (fn [b] (assoc b :logseq.property/built-in? true))))
new-classes (->> (select-keys db-class/built-in-classes built-in-sync-repair-classes)
(#(sqlite-create-graph/build-initial-classes* % (zipmap properties properties)))
(map (fn [b] (assoc b :logseq.property/built-in? true))))
new-class-idents (keep (fn [class]
(when-let [db-ident (:db/ident class)]
{:db/ident db-ident}))
new-classes)
tx-data (vec (concat new-class-idents new-properties new-classes))
block-item-count (count (filter #(and (map? %) (:block/uuid %)) tx-data))
orders (db-order/gen-n-keys block-item-count nil nil :max-key-atom (atom nil))
*orders (atom orders)]
(mapv (fn [item]
(stable-built-in-sync-repair-item
(when (and (map? item) (:block/uuid item))
(let [order (first @*orders)]
(swap! *orders rest)
order))
item))
tx-data)))
(defn- enqueue-built-in-sync-repair!
[repo]
(when-not (client-op/get-local-tx-entry repo built-in-sync-repair-tx-id)
(let [{:keys [should-inc-pending?]}
(client-op/upsert-local-tx-entry!
repo
{:tx-id built-in-sync-repair-tx-id
:created-at 0
:pending? true
:failed? false
:outliner-op :fix
:undo-redo :none
:forward-outliner-ops []
:inverse-outliner-ops []
:inferred-outliner-ops? false
:normalized-tx-data (built-in-sync-repair-tx-data)
:reversed-tx-data []})]
(when should-inc-pending?
(client-op/adjust-pending-local-tx-count! repo 1)))))
(defn- maybe-enqueue-built-in-sync-repair!
[repo conn migrate-result initial-data-exists?]
(when (and (nil? migrate-result)
initial-data-exists?
(true? (:kv/value (d/entity @conn :logseq.kv/graph-remote?))))
(enqueue-built-in-sync-repair! repo)))
(defn- <create-or-open-db!
[repo {:keys [config datoms sync-download-graph? creating-remote-graph?] :as opts}]
(when creating-remote-graph?
@@ -469,7 +551,10 @@
(ldb/transact! conn initial-data
{:initial-db? true})))]
(when-not sync-download-graph?
(db-migrate/migrate conn)
(let [migrate-result (db-migrate/migrate conn)]
(if migrate-result
(handle-migrate-result-local-txs! repo migrate-result)
(maybe-enqueue-built-in-sync-repair! repo conn migrate-result initial-data-exists?)))
(gc-sqlite-dbs! db conn {})
(maybe-run-recycle-gc! conn))

View File

@@ -285,6 +285,115 @@
(reset! *service old-service)
(done))))))))
(deftest handle-migrate-result-local-txs-enqueues-upgrade-reports-test
(let [conn (d/create-conn db-schema/schema)
migration-tx-report {:tx-data [[:db/add 1 :block/title "Migrated"]]
:db-before @conn
:db-after @conn
:tx-meta {:db-migrate? true}}
handled-reports (atom [])]
(with-redefs [db-sync/handle-local-tx! (fn [repo tx-report]
(swap! handled-reports conj
{:repo repo
:tx-report tx-report}))]
(#'db-core/handle-migrate-result-local-txs!
test-repo
{:upgrade-result-coll [migration-tx-report]})
(is (= [{:repo test-repo
:tx-report migration-tx-report}]
@handled-reports)))))
(deftest built-in-sync-repair-tx-data-covers-65-26-through-65-28-test
(let [tx-data (#'db-core/built-in-sync-repair-tx-data)
tx-data-again (#'db-core/built-in-sync-repair-tx-data)
idents (set (keep :db/ident tx-data))]
(is (= tx-data tx-data-again)
"The repair is stable when multiple clients send it")
(is (every? idents
[:logseq.property.repeat/repeat-type
:logseq.property.comments/blocks
:logseq.class/Comments
:logseq.class/Comment]))
(is (every?
(fn [item]
(or (not (and (map? item) (:block/uuid item)))
(and (number? (:block/created-at item))
(number? (:block/updated-at item))
(string? (:block/order item))
(db-order/validate-order-key? (:block/order item)))))
tx-data)
"Block-shaped repair items keep valid timestamps and order")
(is (not-any?
(fn [item]
(or (and (map? item)
(contains? item :logseq.property.comments/blocks))
(and (vector? item)
(= :logseq.property.comments/blocks (nth item 2 nil)))))
tx-data)
"65.29 comment target data is intentionally not included")))
(deftest enqueue-built-in-sync-repair-queues-pending-fix-once-test
(let [upserts (atom [])
count-adjustments (atom [])]
(with-redefs [client-op/get-local-tx-entry (fn [_repo _tx-id] nil)
client-op/upsert-local-tx-entry! (fn [repo entry]
(swap! upserts conj {:repo repo
:entry entry})
{:should-inc-pending? true})
client-op/adjust-pending-local-tx-count! (fn [repo delta]
(swap! count-adjustments conj [repo delta]))]
(#'db-core/enqueue-built-in-sync-repair! test-repo)
(let [{:keys [repo entry]} (first @upserts)]
(is (= test-repo repo))
(is (uuid? (:tx-id entry)))
(is (= 0 (:created-at entry)))
(is (true? (:pending? entry)))
(is (false? (:failed? entry)))
(is (= :fix (:outliner-op entry)))
(is (seq (:normalized-tx-data entry)))))
(is (= [[test-repo 1]] @count-adjustments))
(reset! upserts [])
(reset! count-adjustments [])
(with-redefs [client-op/get-local-tx-entry (fn [_repo _tx-id] {:tx-id (random-uuid)})
client-op/upsert-local-tx-entry! (fn [_repo _entry]
(swap! upserts conj :unexpected))
client-op/adjust-pending-local-tx-count! (fn [_repo _delta]
(swap! count-adjustments conj :unexpected))]
(#'db-core/enqueue-built-in-sync-repair! test-repo)
(is (empty? @upserts))
(is (empty? @count-adjustments)))))
(deftest maybe-enqueue-built-in-sync-repair-only-for-migrated-remote-graphs-test
(let [remote-conn (d/create-conn db-schema/schema)
local-conn (d/create-conn db-schema/schema)
upserts (atom [])]
(d/transact! remote-conn [{:db/ident :logseq.kv/graph-remote?
:kv/value true}])
(d/transact! local-conn [{:db/ident :logseq.kv/graph-remote?
:kv/value false}])
(with-redefs [client-op/get-local-tx-entry (fn [_repo _tx-id] nil)
client-op/upsert-local-tx-entry! (fn [repo entry]
(swap! upserts conj {:repo repo
:entry entry})
{:should-inc-pending? false})
client-op/adjust-pending-local-tx-count! (fn [_repo _delta] nil)]
(#'db-core/maybe-enqueue-built-in-sync-repair! test-repo remote-conn nil true)
(is (= 1 (count @upserts)))
(reset! upserts [])
(#'db-core/maybe-enqueue-built-in-sync-repair! test-repo remote-conn {:upgrade-result-coll []} true)
(is (empty? @upserts)
"Real migration tx reports are uploaded instead of repair txs")
(#'db-core/maybe-enqueue-built-in-sync-repair! test-repo local-conn nil true)
(is (empty? @upserts)
"Local-only graphs do not need server repair")
(#'db-core/maybe-enqueue-built-in-sync-repair! test-repo remote-conn nil false)
(is (empty? @upserts)
"New graphs rely on initial upload data"))))
(deftest search-build-blocks-indice-in-worker-reports-progress-to-main-thread-test
(async done
(->