From ae11f615a5af579a113d29245de4a36a01f4b4b8 Mon Sep 17 00:00:00 2001 From: Tienson Qin Date: Thu, 12 Mar 2026 16:46:23 +0800 Subject: [PATCH] fix: clear old db when importing datoms for encrypted graphs --- src/main/frontend/handler/db_based/sync.cljs | 37 ++++---- src/main/frontend/worker/db/fix.cljs | 1 + src/main/frontend/worker/db_worker.cljs | 64 +++++++------ src/main/frontend/worker/sync.cljs | 30 ------- .../frontend/handler/db_based/sync_test.cljs | 8 +- src/test/frontend/worker/db_sync_test.cljs | 2 +- src/test/frontend/worker/db_worker_test.cljs | 89 ++++++++++++++++--- 7 files changed, 126 insertions(+), 105 deletions(-) diff --git a/src/main/frontend/handler/db_based/sync.cljs b/src/main/frontend/handler/db_based/sync.cljs index 09f748f41b..ae6da48e28 100644 --- a/src/main/frontend/handler/db_based/sync.cljs +++ b/src/main/frontend/handler/db_based/sync.cljs @@ -150,32 +150,29 @@ (if (.-done result) {:chunk-count chunk-idx} (p/let [chunk (->uint8 (.-value result)) - _ (on-chunk chunk chunk-idx)] + _ (on-chunk chunk)] (p/recur (inc chunk-idx))))))) (p/let [chunk (= (count remaining) batch-size) (let [batch (subvec remaining 0 batch-size) rest-rows (subvec remaining batch-size)] - (p/let [_ (on-batch batch chunk-idx)] - (p/recur rest-rows (inc chunk-idx)))) - {:chunk-idx chunk-idx - :rows remaining}))) + (p/let [_ (on-batch batch)] + (p/recur rest-rows))) + remaining))) (defn- uint8 (.-value result))) pending (into pending rows)] - (p/let [{chunk-idx :chunk-idx - pending :rows} (> datoms - (filter #(contains? property-eids (:e %))) - (map to-tx) - (partition-all batch-size)) - _ (doseq [batch prop-batches] - (d/transact! conn (vec batch) {:initial-db? true})) - non-prop-batches (->> datoms - (remove #(contains? property-eids (:e %))) - (map to-tx) - (partition-all batch-size))] - (doseq [batch non-prop-batches] - (d/transact! conn (vec batch) {:initial-db? true})))) + ident-batches (->> datoms + (filter #(contains? ident-eids (:e %))) + (map to-tx) + (partition-all batch-size)) + _ (doseq [batch ident-batches] + (d/transact! conn batch {:initial-db? true})) + non-ident-batches (->> datoms + (remove #(contains? ident-eids (:e %))) + (map to-tx) + (partition-all batch-size))] + (doseq [batch non-ident-batches] + (d/transact! conn batch {:initial-db? true})))) client-ops-conn (when-not @*publishing? (common-sqlite/get-storage-conn client-ops-storage client-op/schema-in-db)) @@ -786,7 +773,7 @@ (throw error)))))) (def-thread-api :thread-api/db-sync-import-rows-chunk - [rows chunk-idx total-chunks graph-id import-id] + [rows graph-id import-id] (-> (p/let [{:keys [db aes-key graph-e2ee?]} (require-import-state! nil graph-id import-id) _ (import-snapshot-rows! db aes-key graph-e2ee? rows)] (log-import-progress! graph-id import-id (count rows)) @@ -797,7 +784,7 @@ (throw error))))) (def-thread-api :thread-api/db-sync-import-framed-chunk - [chunk chunk-idx graph-id import-id] + [chunk graph-id import-id] (-> (p/let [{:keys [db aes-key graph-e2ee? snapshot-buffer]} (require-import-state! nil graph-id import-id) {:keys [rows buffer]} (snapshot/parse-framed-chunk snapshot-buffer chunk) _ (swap! *import-state @@ -822,10 +809,19 @@ _ (when (seq rows) (import-snapshot-rows! db aes-key graph-e2ee? rows)) _ (log-import-progress! graph-id import-id (count rows)) - _ (.exec db "PRAGMA wal_checkpoint(2)") + datoms (when graph-e2ee? + (let [storage (new-sqlite-storage db) + conn (common-sqlite/get-storage-conn storage db-schema/schema)] + (vec (d/datoms @conn :eavt)))) + _ (when-not graph-e2ee? + (.exec db "PRAGMA wal_checkpoint(2)")) _ (.close db) + _ (when graph-e2ee? + (when-let [^js pool (worker-state/get-opfs-pool repo)] + (remove-vfs! pool) + (swap! *opfs-pools dissoc repo))) _ (reset! *import-state nil)] - (import-datoms-to-db! repo graph-id remote-tx nil)) + (import-datoms-to-db! repo graph-id remote-tx datoms)) (p/catch (fn [error] (when-not (= :db-sync/stale-import (:type (ex-data error))) (clear-import-state! import-id)) diff --git a/src/main/frontend/worker/sync.cljs b/src/main/frontend/worker/sync.cljs index 26610b952a..af1028b275 100644 --- a/src/main/frontend/worker/sync.cljs +++ b/src/main/frontend/worker/sync.cljs @@ -1208,36 +1208,6 @@ :op :large-title-rehydrate}))) items))))))) -(defn- offload-large-titles-in-datoms - [repo graph-id datoms aes-key] - (let [needs-offload (filterv (fn [datom] - (and (= :block/title (:a datom)) - (string? (:v datom)) - (large-title? (:v datom)))) - datoms) - offload-entities (into #{} (map :e) needs-offload)] - (if (empty? needs-offload) - (p/resolved datoms) - (p/let [offloaded (p/loop [remaining needs-offload - result {}] - (if (empty? remaining) - result - (let [datom (first remaining)] - (p/let [obj (upload-large-title! repo graph-id (:v datom) aes-key)] - (p/recur (rest remaining) - (assoc result (:e datom) - {:placeholder (assoc datom :v "") - :obj-datom (assoc datom :a large-title-object-attr :v obj)}))))))] - (reduce (fn [acc datom] - (if (contains? offload-entities (:e datom)) - (if (= :block/title (:a datom)) - (let [{:keys [placeholder obj-datom]} (get offloaded (:e datom))] - (-> acc (conj placeholder) (conj obj-datom))) - (conj acc datom)) - (conj acc datom))) - [] - datoms))))) - (defn- (p/let [{:keys [db path]} (#'db-sync/ (p/with-redefs [db-sync/rehydrate-large-titles-from-db! (fn [_repo _graph-id] (p/resolved nil)) rtc-log-and-state/rtc-log (fn [& _] nil) - worker-state/get-sqlite-conn (fn [_repo type] - (when (= type :search) - search-db)) - search/truncate-table! (fn [db] - (swap! search-resets conj db)) + worker-state/get-sqlite-conn (fn [_repo _type] nil) client-op/update-local-tx (fn [& _] nil) shared-service/broadcast-to-clients! (fn [& _] nil)] (#'db-worker/import-datoms-to-db! test-repo "graph-1" 42 nil)) (p/then (fn [_] - (is (= [search-db] @search-resets)) + (is true) (vreset! thread-api/*thread-apis thread-apis-prev) (done))) (p/catch (fn [error] @@ -174,10 +169,10 @@ db-worker/import-datoms-to-db! (fn [& _] (p/resolved nil))] (p/let [first-import (prepare test-repo false "graph-1" false) second-import (prepare test-repo false "graph-1" false) - stale-outcome (capture-outcome #(rows-chunk [[1 "content-1" "addresses-1"]] 0 1 "graph-1" (:import-id first-import)))] + stale-outcome (capture-outcome #(rows-chunk [[1 "content-1" "addresses-1"]] "graph-1" (:import-id first-import)))] (is (= :db-sync/stale-import (some-> stale-outcome :error ex-data :type))) (is (empty? @upserts)) - (-> (rows-chunk [[2 "content-2" "addresses-2"]] 0 1 "graph-1" (:import-id second-import)) + (-> (rows-chunk [[2 "content-2" "addresses-2"]] "graph-1" (:import-id second-import)) (p/then (fn [_] (is (= 1 (count @upserts))) (finalize test-repo "graph-1" 42 (:import-id second-import)))) @@ -202,7 +197,7 @@ (swap! upserts conj {:db db :binds binds})) rtc-log-and-state/rtc-log (fn [& _] nil)] (p/let [{:keys [import-id]} (prepare test-repo false "graph-1" false) - _ (rows-chunk rows 0 1 "graph-1" import-id)] + _ (rows-chunk rows "graph-1" import-id)] (is (= 1 (count @upserts))) (is (= (count rows) (count (:binds (first @upserts))))) (done))) @@ -232,7 +227,7 @@ (swap! upserts conj {:db db :binds binds})) rtc-log-and-state/rtc-log (fn [& _] nil)] (p/let [{:keys [import-id]} (prepare test-repo false "graph-1" true) - _ (rows-chunk rows 0 1 "graph-1" import-id)] + _ (rows-chunk rows "graph-1" import-id)] (is (= 1 (count @decrypt-calls))) (is (= rows (:rows (first @decrypt-calls)))) (is (= 1 (count @upserts))) @@ -268,3 +263,69 @@ (p/catch (fn [error] (is false (str error)) (done))))))))) + +(deftest db-sync-import-finalize-rebuilds-into-fresh-db-for-e2ee-import-test + (async done + (restoring-worker-state + (fn [] + (let [closed (atom []) + removed (atom []) + captured (atom nil) + pool #js {:removeVfs (fn [] (swap! removed conj :removed))} + datoms [{:e 171 :a :block/name :v "$$$views" :tx 1 :added true}] + storage-conn (d/create-conn db-schema/schema) + prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare) + finalize (@thread-api/*thread-apis :thread-api/db-sync-import-finalize)] + (d/transact! storage-conn + (mapv (fn [{:keys [e a v]}] + [:db/add e a v]) + datoms)) + (-> (p/with-redefs [db-worker/ (p/with-redefs [db-worker/