enhance(rtc): generate rtc ops from migration-updates&tx-data

This commit is contained in:
rcmerci
2025-08-07 23:12:23 +08:00
parent 1712c0897a
commit e69f652041
6 changed files with 58 additions and 116 deletions

View File

@@ -256,20 +256,6 @@
(for [{:keys [graph-uuid graph-schema-version graph-status]} (:remote-graphs debug-state*)]
(shui/select-item {:value [graph-uuid graph-schema-version] :disabled (some? graph-status)} graph-uuid)))))]
[:div.pb-2.flex.flex-row.items-center.gap-2
(ui/button "Run server-migrations"
{:on-click (fn []
(let [repo (state/get-current-repo)]
(when-let [server-schema-version (:server-schema-version debug-state*)]
(state/<invoke-db-worker :thread-api/rtc-add-migration-client-ops
repo server-schema-version))))})
[:input.form-input.my-2.py-1.w-32
{:on-change (fn [e] (swap! debug-state assoc :server-schema-version (util/evalue e)))
:on-focus (fn [e] (let [v (.-value (.-target e))]
(when (= v "server migration start version here(e.g. \"64.2\")")
(set! (.-value (.-target e)) ""))))
:placeholder "server migration start version here(e.g. \"64.2\")"}]]
[:hr.my-2]
(let [*keys-state (get state ::keys-state)

View File

@@ -4,8 +4,9 @@
[clojure.walk :as walk]
[datascript.core :as d]
[datascript.impl.entity :as de]
[frontend.util :as util]
[frontend.worker-common.util :as worker-util]
[frontend.worker.rtc.client-op :as client-op]
[frontend.worker.db.rename-db-ident :as rename-db-ident]
[logseq.common.config :as common-config]
[logseq.common.util :as common-util]
[logseq.db :as ldb]
@@ -14,10 +15,8 @@
[logseq.db.frontend.property :as db-property]
[logseq.db.frontend.schema :as db-schema]
[logseq.db.sqlite.create-graph :as sqlite-create-graph]
[logseq.db.sqlite.util :as sqlite-util]
[frontend.worker.db.rename-db-ident :as rename-db-ident]))
[logseq.db.sqlite.util :as sqlite-util]))
;; TODO: fixes/rollback
;; Frontend migrations
;; ===================
@@ -384,6 +383,12 @@
(when (neg? compare-result)
(js/console.warn (str "Current db schema-version is " db-schema/version ", max available schema-version is " max-schema-version))))))
;;; some validations of schema-version->updates
(doseq [[version migrate-updates] schema-version->updates]
(when (contains? (set (keys migrate-updates)) :fix)
(assert (= 1 (count migrate-updates))
(util/format "migration(%s): :fix type cannot coexist with other types (:properties, :classes, :rename-db-idents) " version))))
(defn ensure-built-in-data-exists!
"Return tx-data"
[conn]
@@ -445,7 +450,7 @@
(defn- upgrade-version!
"Return tx-data"
[conn db-based? version {:keys [properties classes rename-db-idents fix]}]
[conn db-based? version {:keys [properties classes rename-db-idents fix] :as migrate-updates}]
(let [version (db-schema/parse-schema-version version)
db @conn
new-properties (->> (select-keys db-property/built-in-properties properties)
@@ -478,12 +483,12 @@
tx-data)
r (ldb/transact! conn tx-data' {:db-migrate? true})]
(println "DB schema migrated to" version)
r))
(assoc r :migrate-updates migrate-updates)))
(defn migrate
"Migrate 'frontend' datascript schema and data. To add a new migration,
add an entry to schema-version->updates and bump db-schema/version"
[repo conn]
[conn]
(when (ldb/db-based-graph? @conn)
(let [db @conn
version-in-db (db-schema/parse-schema-version (or (:kv/value (d/entity db :logseq.kv/schema-version)) 0))
@@ -504,17 +509,17 @@
(not (pos? (db-schema/compare-schema-version v* db-schema/version))))
[v updates])))
schema-version->updates)
*transact-result-coll (atom [])]
result-ks [:tx-data :db-before :db-after :migrate-updates]
*upgrade-result-coll (atom [])]
(println "DB schema migrated from" version-in-db)
(doseq [[v m] updates]
(let [r (upgrade-version! conn db-based? v m)]
(swap! *transact-result-coll conj (select-keys r [:tx-data :db-before :db-after]))))
;; (client-op/add-migration-datoms! repo version-in-db db-schema/version @*transact-result-coll)
(swap! *transact-result-coll conj
(select-keys (ensure-built-in-data-exists! conn) [:tx-data :db-before :db-after]))
(swap! *upgrade-result-coll conj (select-keys r result-ks))))
(swap! *upgrade-result-coll conj
(select-keys (ensure-built-in-data-exists! conn) result-ks))
{:from-version version-in-db
:to-version db-schema/version
:transact-result-coll @*transact-result-coll})
:upgrade-result-coll @*upgrade-result-coll})
(catch :default e
(prn :error (str "DB migration failed to migrate to " db-schema/version " from " version-in-db ":"))
(js/console.error e)

View File

@@ -29,7 +29,7 @@
[frontend.worker.rtc.client-op :as client-op]
[frontend.worker.rtc.core :as rtc.core]
[frontend.worker.rtc.db-listener]
[frontend.worker.rtc.gen-client-op :as gen-client-op]
[frontend.worker.rtc.migrate :as rtc-migrate]
[frontend.worker.search :as search]
[frontend.worker.shared-service :as shared-service]
[frontend.worker.state :as worker-state]
@@ -286,26 +286,16 @@
(d/reset-schema! client-ops-conn client-op/schema-in-db))
(when (and db-based? (not initial-data-exists?) (not datoms))
(let [config (or config "")
initial-data (sqlite-create-graph/build-db-initial-data config
(select-keys opts [:import-type :graph-git-sha]))]
initial-data (sqlite-create-graph/build-db-initial-data
config (select-keys opts [:import-type :graph-git-sha]))]
(d/transact! conn initial-data {:initial-db? true})))
(gc-sqlite-dbs! db client-ops-db conn {})
(let [migration-result (db-migrate/migrate repo conn)
transact-result-coll (:transact-result-coll migration-result)]
;; convert migration-result into rtc ops if it's a rtc-db-graph
(let [migration-result (db-migrate/migrate conn)]
(when (client-op/rtc-db-graph? repo)
(let [ops-coll
(for [{:keys [tx-data db-before db-after]} transact-result-coll]
(let [{:keys [same-entity-datoms-coll id->same-entity-datoms]}
(gen-client-op/group-datoms-by-entity tx-data)
e->a->add?->v->t (update-vals
id->same-entity-datoms
gen-client-op/entity-datoms=>a->add?->v->t)]
(gen-client-op/generate-rtc-ops db-before db-after same-entity-datoms-coll e->a->add?->v->t)))]
;; TODO: client-op/add-ops!
ops-coll)))
(let [client-ops (rtc-migrate/migration-results=>client-ops migration-result)]
(client-op/add-ops! repo client-ops))))
(db-listener/listen-db-changes! repo (get @*datascript-conns repo))))))

View File

@@ -164,17 +164,6 @@
ws-state (assoc :ws-state ws-state)))
(m/reductions {} nil ws-state-flow)))
(defn- add-migration-client-ops!
[repo db server-schema-version]
(when server-schema-version
(let [client-schema-version (ldb/get-graph-schema-version db)
added-ops (r.migrate/add-migration-client-ops! repo db server-schema-version client-schema-version)]
(when (seq added-ops)
(log/info :add-migration-client-ops
{:repo repo
:server-schema-version server-schema-version
:client-schema-version client-schema-version})))))
(defn- update-remote-schema-version!
[conn server-schema-version]
(when server-schema-version
@@ -252,7 +241,6 @@
(m/? get-ws-create-task)
(started-dfv true)
(update-remote-schema-version! conn @*server-schema-version)
(add-migration-client-ops! repo @conn @*server-schema-version)
(reset! *assets-sync-loop-canceler
(c.m/run-task :assets-sync-loop-task
assets-sync-loop-task))
@@ -653,11 +641,6 @@
[token graph-uuid schema-version]
(new-task--download-info-list token graph-uuid schema-version)))
(def-thread-api :thread-api/rtc-add-migration-client-ops
[repo server-schema-version]
(when-let [db @(worker-state/get-datascript-conn repo)]
(add-migration-client-ops! repo db server-schema-version)))
;;; ================ API (ends) ================
;;; subscribe state ;;;

View File

@@ -1,57 +1,38 @@
(ns frontend.worker.rtc.migrate
"migrate server data according to schema-version and client's migration-updates"
(:require [datascript.core :as d]
[frontend.worker.db.migrate :as db-migrate]
[frontend.worker.rtc.client-op :as client-op]
[frontend.worker.rtc.gen-client-op :as gen-client-op]
[logseq.db.frontend.schema :as db-schema]))
(:require [clojure.set :as set]
[datascript.core :as d]
[frontend.worker.rtc.gen-client-op :as gen-client-op]))
(defn- server-client-schema-version->migrations
[server-schema-version client-schema-version]
(when (neg? (db-schema/compare-schema-version server-schema-version client-schema-version))
(let [sorted-schema-version->updates
(->> (map (fn [[schema-version updates]]
[((juxt :major :minor) (db-schema/parse-schema-version schema-version))
updates])
db-migrate/schema-version->updates)
(sort-by first))]
(->> sorted-schema-version->updates
(drop-while (fn [[schema-version _updates]]
(not (neg? (db-schema/compare-schema-version server-schema-version schema-version)))))
(take-while (fn [[schema-version _updates]]
(not (neg? (db-schema/compare-schema-version client-schema-version schema-version)))))
(map second)))))
(defn migration-results=>client-ops
[{:keys [_from-version to-version upgrade-result-coll] :as _migration-result}]
(let [client-ops
(mapcat
(fn [{:keys [tx-data db-before db-after migrate-updates]}]
(cond
(:fix migrate-updates)
(let [{:keys [same-entity-datoms-coll id->same-entity-datoms]}
(gen-client-op/group-datoms-by-entity tx-data)
e->a->add?->v->t
(update-vals
id->same-entity-datoms
gen-client-op/entity-datoms=>a->add?->v->t)]
(gen-client-op/generate-rtc-ops db-before db-after same-entity-datoms-coll e->a->add?->v->t))
(defn- migration-updates->client-ops
"convert :classes, :properties from frontend.worker.db.migrate/schema-version->updates into client-ops"
[db client-schema-version migrate-updates]
(let [property-ks (mapcat :properties migrate-updates)
class-ks (mapcat :classes migrate-updates)
rename-db-idents (mapcat :rename-db-idents migrate-updates)
d-entity-fn (partial d/entity db)
new-property-entities (keep d-entity-fn property-ks)
new-class-entities (keep d-entity-fn class-ks)
client-ops (vec (concat (gen-client-op/generate-rtc-ops-from-property-entities new-property-entities)
(gen-client-op/generate-rtc-ops-from-class-entities new-class-entities)
(gen-client-op/generate-rtc-rename-db-ident-ops rename-db-idents)))
(empty? (set/difference (set (keys migrate-updates)) #{:properties :classes :rename-db-idents}))
(let [property-ks (:properties migrate-updates)
class-ks (:classes migrate-updates)
rename-db-idents (:rename-db-idents migrate-updates)
d-entity-fn (partial d/entity db-after)
new-property-entities (keep d-entity-fn property-ks)
new-class-entities (keep d-entity-fn class-ks)]
(concat (gen-client-op/generate-rtc-ops-from-property-entities new-property-entities)
(gen-client-op/generate-rtc-ops-from-class-entities new-class-entities)
(gen-client-op/generate-rtc-rename-db-ident-ops rename-db-idents)))))
upgrade-result-coll)
max-t (apply max 0 (map second client-ops))]
(conj client-ops
(conj (vec client-ops)
[:update-kv-value
max-t
{:db-ident :logseq.kv/schema-version
:value client-schema-version}])))
(defn add-migration-client-ops!
[repo db server-schema-version client-schema-version]
(assert (and server-schema-version client-schema-version))
(when-let [ops (not-empty
(some->> (server-client-schema-version->migrations server-schema-version client-schema-version)
(migration-updates->client-ops db client-schema-version)))]
(client-op/add-ops! repo ops)
ops))
(defn local-migrate-result-data=>remote-tx-data
"datoms-tx-data: [tx-data1, tx-data2, ...]
TODO:"
[transact-result-data]
transact-result-data)
:value to-version}])))

View File

@@ -7,13 +7,10 @@
[frontend.worker.rtc.migrate :as rtc-migrate]
[logseq.db :as ldb]))
(deftest ^:focus local-datoms-tx-data=>remote-tx-data-test
(deftest ^:focus migration-results=>client-ops
(let [db-transit (str (fs-node/readFileSync "src/test/migration/64.8.transit"))
db (ldb/read-transit-str db-transit)
conn (d/conn-from-db db)
transact-result-coll (db-migrate/migrate "rtc-migrate-test" conn)]
(pp/pprint (update transact-result-coll :transact-result-coll #(map :tx-data %)))
(let [remote-tx-data (rtc-migrate/local-migrate-result-data=>remote-tx-data
(:transact-result-coll transact-result-coll))]
;; (pp/pprint (map :tx-data remote-tx-data))
)))
migration-result (db-migrate/migrate conn)
client-ops (rtc-migrate/migration-results=>client-ops migration-result)]
(pp/pprint client-ops)))