fix: clear old db when importing datoms for encrypted graphs

This commit is contained in:
Tienson Qin
2026-03-12 16:46:23 +08:00
parent 80b48b7b86
commit ae11f615a5
7 changed files with 126 additions and 105 deletions

View File

@@ -150,32 +150,29 @@
(if (.-done result)
{:chunk-count chunk-idx}
(p/let [chunk (->uint8 (.-value result))
_ (on-chunk chunk chunk-idx)]
_ (on-chunk chunk)]
(p/recur (inc chunk-idx)))))))
(p/let [chunk (<snapshot-response-bytes resp)]
(if (and chunk (pos? (.-byteLength chunk)))
(p/let [_ (on-chunk chunk 0)]
(p/let [_ (on-chunk chunk)]
{:chunk-count 1})
{:chunk-count 0}))))
(defn- <flush-row-batches!
[rows batch-size chunk-idx on-batch]
(p/loop [remaining rows
chunk-idx chunk-idx]
[rows batch-size on-batch]
(p/loop [remaining rows]
(if (>= (count remaining) batch-size)
(let [batch (subvec remaining 0 batch-size)
rest-rows (subvec remaining batch-size)]
(p/let [_ (on-batch batch chunk-idx)]
(p/recur rest-rows (inc chunk-idx))))
{:chunk-idx chunk-idx
:rows remaining})))
(p/let [_ (on-batch batch)]
(p/recur rest-rows)))
remaining)))
(defn- <stream-snapshot-row-batches!
[^js resp batch-size on-batch]
(if-let [stream (response-body-stream resp)]
(let [reader (.getReader stream)]
(p/loop [buffer nil
chunk-idx 0
pending []]
(p/let [result (.read reader)]
(if (.-done result)
@@ -183,18 +180,17 @@
(into pending (finalize-framed-buffer buffer))
pending)]
(if (seq pending)
(p/let [_ (on-batch pending chunk-idx)]
{:chunk-count (inc chunk-idx)})
{:chunk-count chunk-idx}))
(p/let [_ (on-batch pending)]
{:chunk-count 1})
{:chunk-count 0}))
(let [{rows :rows next-buffer :buffer} (parse-framed-chunk buffer (->uint8 (.-value result)))
pending (into pending rows)]
(p/let [{chunk-idx :chunk-idx
pending :rows} (<flush-row-batches! pending batch-size chunk-idx on-batch)]
(p/recur next-buffer chunk-idx pending)))))))
(p/let [pending (<flush-row-batches! pending batch-size on-batch)]
(p/recur next-buffer pending)))))))
(p/let [snapshot-bytes (<snapshot-response-bytes resp)
rows (vec (finalize-framed-buffer snapshot-bytes))]
(if (seq rows)
(p/let [_ (on-batch rows 0)]
(p/let [_ (on-batch rows)]
{:chunk-count 1})
{:chunk-count 0}))))
@@ -454,20 +450,19 @@
(p/let [_ (if @state/*db-worker
(<stream-snapshot-chunks!
resp
(fn [chunk chunk-idx]
(fn [chunk]
(p/let [import-id (ensure-import!)]
(state/<invoke-db-worker-direct-pass :thread-api/db-sync-import-framed-chunk
(Comlink/transfer chunk #js [(.-buffer chunk)])
chunk-idx
graph-uuid
import-id))))
(<stream-snapshot-row-batches!
resp
10000
(fn [rows chunk-idx]
(fn [rows]
(p/let [import-id (ensure-import!)]
(state/<invoke-db-worker :thread-api/db-sync-import-rows-chunk
rows chunk-idx nil graph-uuid import-id)))))
rows graph-uuid import-id)))))
_ (state/pub-event!
[:rtc/log {:type :rtc.log/download
:sub-type :download-completed

View File

@@ -19,4 +19,5 @@
(assoc v :db/ident k))))
db-schema/schema))]
(when (seq diffs)
(prn :debug ::check-and-fix-schema! :diffs diffs)
(d/transact! conn diffs))))

View File

@@ -47,7 +47,6 @@
[logseq.db.common.view :as db-view]
[logseq.db.frontend.class :as db-class]
[logseq.db.frontend.entity-util :as entity-util]
[logseq.db.frontend.property :as db-property]
[logseq.db.frontend.schema :as db-schema]
[logseq.db.sqlite.create-graph :as sqlite-create-graph]
[logseq.db.sqlite.export :as sqlite-export]
@@ -146,17 +145,6 @@
(.exec db "PRAGMA locking_mode=exclusive")
(.exec db "PRAGMA journal_mode=WAL"))
(defn- ensure-db-sync-import-db!
[repo reset?]
(if-let [sqlite @*sqlite]
(let [^js DB (.-DB ^js (.-oo1 sqlite))
^js db (new DB ":memory:" "c")]
(common-sqlite/create-kvs-table! db)
(when reset?
(.exec db "delete from kvs"))
db)
(db-sync/fail-fast :db-sync/missing-field {:repo repo :field :sqlite})))
(defn restore-data-from-addr
"Update sqlite-cli/restore-data-from-addr when making changes"
[db addr]
@@ -293,26 +281,25 @@
(let [conn (common-sqlite/get-storage-conn storage db-schema/schema)
_ (db-fix/check-and-fix-schema! conn)
_ (when datoms
(let [property-eids (into #{}
(comp (filter (fn [datom]
(and (= (:a datom) :db/ident)
(db-property/property? (:v datom)))))
(map :e))
datoms)
(let [ident-eids (into #{}
(comp (filter (fn [datom]
(= (:a datom) :db/ident)))
(map :e))
datoms)
to-tx (fn [d] [:db/add (:e d) (:a d) (:v d)])
batch-size 20000
prop-batches (->> datoms
(filter #(contains? property-eids (:e %)))
(map to-tx)
(partition-all batch-size))
_ (doseq [batch prop-batches]
(d/transact! conn (vec batch) {:initial-db? true}))
non-prop-batches (->> datoms
(remove #(contains? property-eids (:e %)))
(map to-tx)
(partition-all batch-size))]
(doseq [batch non-prop-batches]
(d/transact! conn (vec batch) {:initial-db? true}))))
ident-batches (->> datoms
(filter #(contains? ident-eids (:e %)))
(map to-tx)
(partition-all batch-size))
_ (doseq [batch ident-batches]
(d/transact! conn batch {:initial-db? true}))
non-ident-batches (->> datoms
(remove #(contains? ident-eids (:e %)))
(map to-tx)
(partition-all batch-size))]
(doseq [batch non-ident-batches]
(d/transact! conn batch {:initial-db? true}))))
client-ops-conn (when-not @*publishing? (common-sqlite/get-storage-conn
client-ops-storage
client-op/schema-in-db))
@@ -786,7 +773,7 @@
(throw error))))))
(def-thread-api :thread-api/db-sync-import-rows-chunk
[rows chunk-idx total-chunks graph-id import-id]
[rows graph-id import-id]
(-> (p/let [{:keys [db aes-key graph-e2ee?]} (require-import-state! nil graph-id import-id)
_ (import-snapshot-rows! db aes-key graph-e2ee? rows)]
(log-import-progress! graph-id import-id (count rows))
@@ -797,7 +784,7 @@
(throw error)))))
(def-thread-api :thread-api/db-sync-import-framed-chunk
[chunk chunk-idx graph-id import-id]
[chunk graph-id import-id]
(-> (p/let [{:keys [db aes-key graph-e2ee? snapshot-buffer]} (require-import-state! nil graph-id import-id)
{:keys [rows buffer]} (snapshot/parse-framed-chunk snapshot-buffer chunk)
_ (swap! *import-state
@@ -822,10 +809,19 @@
_ (when (seq rows)
(import-snapshot-rows! db aes-key graph-e2ee? rows))
_ (log-import-progress! graph-id import-id (count rows))
_ (.exec db "PRAGMA wal_checkpoint(2)")
datoms (when graph-e2ee?
(let [storage (new-sqlite-storage db)
conn (common-sqlite/get-storage-conn storage db-schema/schema)]
(vec (d/datoms @conn :eavt))))
_ (when-not graph-e2ee?
(.exec db "PRAGMA wal_checkpoint(2)"))
_ (.close db)
_ (when graph-e2ee?
(when-let [^js pool (worker-state/get-opfs-pool repo)]
(remove-vfs! pool)
(swap! *opfs-pools dissoc repo)))
_ (reset! *import-state nil)]
(import-datoms-to-db! repo graph-id remote-tx nil))
(import-datoms-to-db! repo graph-id remote-tx datoms))
(p/catch (fn [error]
(when-not (= :db-sync/stale-import (:type (ex-data error)))
(clear-import-state! import-id))

View File

@@ -1208,36 +1208,6 @@
:op :large-title-rehydrate})))
items)))))))
(defn- offload-large-titles-in-datoms
[repo graph-id datoms aes-key]
(let [needs-offload (filterv (fn [datom]
(and (= :block/title (:a datom))
(string? (:v datom))
(large-title? (:v datom))))
datoms)
offload-entities (into #{} (map :e) needs-offload)]
(if (empty? needs-offload)
(p/resolved datoms)
(p/let [offloaded (p/loop [remaining needs-offload
result {}]
(if (empty? remaining)
result
(let [datom (first remaining)]
(p/let [obj (upload-large-title! repo graph-id (:v datom) aes-key)]
(p/recur (rest remaining)
(assoc result (:e datom)
{:placeholder (assoc datom :v "")
:obj-datom (assoc datom :a large-title-object-attr :v obj)}))))))]
(reduce (fn [acc datom]
(if (contains? offload-entities (:e datom))
(if (= :block/title (:a datom))
(let [{:keys [placeholder obj-datom]} (get offloaded (:e datom))]
(-> acc (conj placeholder) (conj obj-datom)))
(conj acc datom))
(conj acc datom)))
[]
datoms)))))
(defn- <offload-large-titles-in-datoms-batch
[repo graph-id datoms aes-key]
(p/loop [remaining datoms

View File

@@ -241,7 +241,7 @@
(p/then (fn [_]
(is (= 3 (count @import-calls)))
(let [[prepare-op graph reset? graph-uuid graph-e2ee?] (first @import-calls)
[chunk-op imported-rows chunk-idx total-chunks chunk-graph-uuid import-id] (second @import-calls)
[chunk-op imported-rows chunk-graph-uuid import-id] (second @import-calls)
[finalize-op finalize-graph finalize-graph-uuid remote-tx finalize-import-id] (nth @import-calls 2)]
(is (= :thread-api/db-sync-import-prepare prepare-op))
(is (string/ends-with? graph "demo-graph"))
@@ -250,8 +250,6 @@
(is (= false graph-e2ee?))
(is (= :thread-api/db-sync-import-rows-chunk chunk-op))
(is (= rows imported-rows))
(is (= 0 chunk-idx))
(is (nil? total-chunks))
(is (= "graph-1" chunk-graph-uuid))
(is (= "import-1" import-id))
(is (= :thread-api/db-sync-import-finalize finalize-op))
@@ -315,7 +313,7 @@
(p/finally (fn [] (set! js/fetch original-fetch)))))
(p/then (fn [_]
(is (= 3 (count @import-calls)))
(let [[chunk-op imported-rows _ _ _ import-id] (second @import-calls)]
(let [[chunk-op imported-rows _ import-id] (second @import-calls)]
(is (= :thread-api/db-sync-import-rows-chunk chunk-op))
(is (= rows imported-rows))
(is (= "import-1" import-id)))
@@ -374,7 +372,7 @@
(p/finally (fn [] (set! js/fetch original-fetch)))))
(p/then (fn [_]
(is (= 3 (count @import-calls)))
(let [[chunk-op imported-rows _ _ _ import-id] (second @import-calls)]
(let [[chunk-op imported-rows _ import-id] (second @import-calls)]
(is (= :thread-api/db-sync-import-rows-chunk chunk-op))
(is (= rows imported-rows))
(is (= "import-1" import-id)))

View File

@@ -1022,7 +1022,7 @@
(fn [path]
(swap! opened-paths conj path)
#js {:close (fn [] nil)})}))
logseq.db.common.sqlite/create-kvs-table! (fn [_] nil)]
common-sqlite/create-kvs-table! (fn [_] nil)]
(-> (p/let [{:keys [db path]} (#'db-sync/<create-temp-sqlite-db!)]
(is (some? db))
(is (= [path] @opened-paths))

View File

@@ -1,5 +1,6 @@
(ns frontend.worker.db-worker-test
(:require [cljs.test :refer [async deftest is]]
[datascript.core :as d]
[frontend.common.thread-api :as thread-api]
[frontend.worker.a-test-env]
[frontend.worker.db-worker :as db-worker]
@@ -11,6 +12,7 @@
[frontend.worker.sync.crypt :as sync-crypt]
[frontend.worker.sync.log-and-state :as rtc-log-and-state]
[logseq.db.common.sqlite :as common-sqlite]
[logseq.db.frontend.schema :as db-schema]
[promesa.core :as p]))
(def ^:private test-repo "test-db-worker-repo")
@@ -59,25 +61,18 @@
(async done
(restoring-worker-state
(fn []
(let [search-resets (atom [])
search-db #js {:exec (fn [_] nil)
:close (fn [] nil)}
thread-apis-prev @thread-api/*thread-apis]
(let [thread-apis-prev @thread-api/*thread-apis]
(vreset! thread-api/*thread-apis
(assoc thread-apis-prev
:thread-api/create-or-open-db (fn [_repo _opts] (p/resolved nil))))
(-> (p/with-redefs [db-sync/rehydrate-large-titles-from-db! (fn [_repo _graph-id] (p/resolved nil))
rtc-log-and-state/rtc-log (fn [& _] nil)
worker-state/get-sqlite-conn (fn [_repo type]
(when (= type :search)
search-db))
search/truncate-table! (fn [db]
(swap! search-resets conj db))
worker-state/get-sqlite-conn (fn [_repo _type] nil)
client-op/update-local-tx (fn [& _] nil)
shared-service/broadcast-to-clients! (fn [& _] nil)]
(#'db-worker/import-datoms-to-db! test-repo "graph-1" 42 nil))
(p/then (fn [_]
(is (= [search-db] @search-resets))
(is true)
(vreset! thread-api/*thread-apis thread-apis-prev)
(done)))
(p/catch (fn [error]
@@ -174,10 +169,10 @@
db-worker/import-datoms-to-db! (fn [& _] (p/resolved nil))]
(p/let [first-import (prepare test-repo false "graph-1" false)
second-import (prepare test-repo false "graph-1" false)
stale-outcome (capture-outcome #(rows-chunk [[1 "content-1" "addresses-1"]] 0 1 "graph-1" (:import-id first-import)))]
stale-outcome (capture-outcome #(rows-chunk [[1 "content-1" "addresses-1"]] "graph-1" (:import-id first-import)))]
(is (= :db-sync/stale-import (some-> stale-outcome :error ex-data :type)))
(is (empty? @upserts))
(-> (rows-chunk [[2 "content-2" "addresses-2"]] 0 1 "graph-1" (:import-id second-import))
(-> (rows-chunk [[2 "content-2" "addresses-2"]] "graph-1" (:import-id second-import))
(p/then (fn [_]
(is (= 1 (count @upserts)))
(finalize test-repo "graph-1" 42 (:import-id second-import))))
@@ -202,7 +197,7 @@
(swap! upserts conj {:db db :binds binds}))
rtc-log-and-state/rtc-log (fn [& _] nil)]
(p/let [{:keys [import-id]} (prepare test-repo false "graph-1" false)
_ (rows-chunk rows 0 1 "graph-1" import-id)]
_ (rows-chunk rows "graph-1" import-id)]
(is (= 1 (count @upserts)))
(is (= (count rows) (count (:binds (first @upserts)))))
(done)))
@@ -232,7 +227,7 @@
(swap! upserts conj {:db db :binds binds}))
rtc-log-and-state/rtc-log (fn [& _] nil)]
(p/let [{:keys [import-id]} (prepare test-repo false "graph-1" true)
_ (rows-chunk rows 0 1 "graph-1" import-id)]
_ (rows-chunk rows "graph-1" import-id)]
(is (= 1 (count @decrypt-calls)))
(is (= rows (:rows (first @decrypt-calls))))
(is (= 1 (count @upserts)))
@@ -268,3 +263,69 @@
(p/catch (fn [error]
(is false (str error))
(done)))))))))
(deftest db-sync-import-finalize-rebuilds-into-fresh-db-for-e2ee-import-test
(async done
(restoring-worker-state
(fn []
(let [closed (atom [])
removed (atom [])
captured (atom nil)
pool #js {:removeVfs (fn [] (swap! removed conj :removed))}
datoms [{:e 171 :a :block/name :v "$$$views" :tx 1 :added true}]
storage-conn (d/create-conn db-schema/schema)
prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)
finalize (@thread-api/*thread-apis :thread-api/db-sync-import-finalize)]
(d/transact! storage-conn
(mapv (fn [{:keys [e a v]}]
[:db/add e a v])
datoms))
(-> (p/with-redefs [db-worker/<get-opfs-pool (fn [_] (p/resolved (fake-import-pool [:encrypted] closed)))
common-sqlite/create-kvs-table! (fn [_] nil)
db-worker/enable-sqlite-wal-mode! (fn [_] nil)
worker-state/get-opfs-pool (fn [_] pool)
sync-crypt/<fetch-graph-aes-key-for-download (fn [_] (p/resolved :aes-key))
common-sqlite/get-storage-conn (fn [_ _] storage-conn)
db-worker/import-datoms-to-db! (fn [& args]
(reset! captured args)
(p/resolved nil))]
(p/let [{:keys [import-id]} (prepare test-repo false "graph-1" true)
_ (finalize test-repo "graph-1" 42 import-id)]
(let [[repo graph-id remote-tx imported-datoms] @captured]
(is (= test-repo repo))
(is (= "graph-1" graph-id))
(is (= 42 remote-tx))
(is (= [[171 :block/name "$$$views"]]
(mapv (fn [d] [(:e d) (:a d) (:v d)]) imported-datoms))))
(is (= [:removed] @removed))
(is (nil? (get @worker-state/*opfs-pools test-repo)))
(done)))
(p/catch (fn [error]
(is false (str error))
(done)))))))))
(deftest db-sync-import-finalize-keeps-direct-open-for-non-e2ee-import-test
(async done
(restoring-worker-state
(fn []
(let [closed (atom [])
removed (atom [])
captured (atom nil)
pool #js {:removeVfs (fn [] (swap! removed conj :removed))}
prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)
finalize (@thread-api/*thread-apis :thread-api/db-sync-import-finalize)]
(reset! worker-state/*opfs-pools {test-repo pool})
(-> (p/with-redefs [db-worker/<get-opfs-pool (fn [_] (p/resolved (fake-import-pool [:plain] closed)))
common-sqlite/create-kvs-table! (fn [_] nil)
db-worker/enable-sqlite-wal-mode! (fn [_] nil)
db-worker/import-datoms-to-db! (fn [& args]
(reset! captured args)
(p/resolved nil))]
(p/let [{:keys [import-id]} (prepare test-repo false "graph-1" false)
_ (finalize test-repo "graph-1" 42 import-id)]
(is (= [test-repo "graph-1" 42 nil] @captured))
(is (empty? @removed))
(done)))
(p/catch (fn [error]
(is false (str error))
(done)))))))))