fix(rtc): isolate e2ee import temp pool

This commit is contained in:
Tienson Qin
2026-03-12 17:42:47 +08:00
parent 1b4943350b
commit 880d7d2c3f
2 changed files with 34 additions and 14 deletions

View File

@@ -96,6 +96,14 @@
(swap! *opfs-pools assoc graph pool)
pool))))
(defn- <create-db-sync-import-pool
[repo import-id]
(if-let [sqlite @*sqlite]
(let [pool-name (worker-util/get-pool-name (str repo "-db-sync-import-" import-id))]
(.installOpfsSAHPoolVfs ^js sqlite #js {:name pool-name
:initialCapacity 20}))
(<get-opfs-pool repo)))
(defn- init-sqlite-module!
[]
(when-not @*sqlite
@@ -390,7 +398,7 @@
(defn- remove-vfs!
[^js pool]
(when pool
(when (fn? (some-> pool .-removeVfs))
(.removeVfs ^js pool)))
(defn- get-search-db
@@ -686,10 +694,14 @@
:import-id import-id}))
(defn- close-import-state!
[{:keys [db]}]
[{:keys [db import-pool]}]
(when db
(try
(.close db)
(catch :default _)))
(when import-pool
(try
(remove-vfs! import-pool)
(catch :default _))))
(defn- clear-import-state!
@@ -741,35 +753,42 @@
(def-thread-api :thread-api/db-sync-import-prepare
[repo reset? graph-id graph-e2ee? & [total-rows]]
(let [graph-e2ee? (if (nil? graph-e2ee?) true (true? graph-e2ee?))
opened-db (atom nil)]
opened-db (atom nil)
opened-import-pool (atom nil)]
(-> (p/let [_ (when-let [state @*import-state]
(close-import-state! state))
_ (reset! *import-state nil)
_ (when reset? (close-db! repo))
_ (when reset? (<invalidate-search-db! repo))
import-id (str (random-uuid))
aes-key (when graph-e2ee?
(sync-crypt/<fetch-graph-aes-key-for-download graph-id))
_ (when (and graph-e2ee? (nil? aes-key))
(db-sync/fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))
^js pool (<get-opfs-pool repo)
^js pool (if graph-e2ee?
(<create-db-sync-import-pool repo import-id)
(<get-opfs-pool repo))
_ (when graph-e2ee?
(reset! opened-import-pool pool))
^js db (new (.-OpfsSAHPoolDb pool) repo-path)
_ (reset! opened-db db)
_ (common-sqlite/create-kvs-table! db)
_ (enable-sqlite-wal-mode! db)
_ (when reset? (.exec db "DELETE FROM kvs"))
import-id (str (random-uuid))]
_ (when reset? (.exec db "DELETE FROM kvs"))]
(reset! *import-state {:aes-key aes-key
:db db
:graph-e2ee? graph-e2ee?
:graph-id graph-id
:import-id import-id
:imported-rows 0
:import-pool @opened-import-pool
:repo repo
:snapshot-buffer nil
:total-rows total-rows})
{:import-id import-id})
(p/catch (fn [error]
(close-import-state! {:db @opened-db})
(close-import-state! {:db @opened-db
:import-pool @opened-import-pool})
(throw error))))))
(def-thread-api :thread-api/db-sync-import-rows-chunk
@@ -803,7 +822,7 @@
(def-thread-api :thread-api/db-sync-import-finalize
[repo graph-id remote-tx import-id]
(-> (p/let [{:keys [db aes-key graph-e2ee? snapshot-buffer]} (require-import-state! repo graph-id import-id)
(-> (p/let [{:keys [db aes-key graph-e2ee? import-pool snapshot-buffer]} (require-import-state! repo graph-id import-id)
rows (when (and snapshot-buffer (pos? (.-byteLength snapshot-buffer)))
(snapshot/finalize-framed-buffer snapshot-buffer))
_ (when (seq rows)
@@ -816,12 +835,14 @@
(d/datoms source-db :eavt)))
_ (when-not graph-e2ee?
(.exec db "PRAGMA wal_checkpoint(2)"))
_ (when-not graph-e2ee?
(.close db))
result (import-datoms-to-db! repo graph-id remote-tx datoms)
_ (.close db)
_ (when graph-e2ee?
(when-let [^js pool (worker-state/get-opfs-pool repo)]
(remove-vfs! pool)
(swap! *opfs-pools dissoc repo)))
(.close db))
_ (when graph-e2ee?
(when import-pool
(remove-vfs! import-pool)))
_ (reset! *import-state nil)]
result)
(p/catch (fn [error]

View File

@@ -276,6 +276,7 @@
storage-conn (d/create-conn db-schema/schema)
prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)
finalize (@thread-api/*thread-apis :thread-api/db-sync-import-finalize)]
(reset! worker-state/*opfs-pools {test-repo pool})
(d/transact! storage-conn
(mapv (fn [{:keys [e a v]}]
[:db/add e a v])
@@ -283,7 +284,6 @@
(-> (p/with-redefs [db-worker/<get-opfs-pool (fn [_] (p/resolved (fake-import-pool [:encrypted] closed)))
common-sqlite/create-kvs-table! (fn [_] nil)
db-worker/enable-sqlite-wal-mode! (fn [_] nil)
worker-state/get-opfs-pool (fn [_] pool)
sync-crypt/<fetch-graph-aes-key-for-download (fn [_] (p/resolved :aes-key))
common-sqlite/get-storage-conn (fn [_ _] storage-conn)
db-worker/import-datoms-to-db! (fn [& args]
@@ -297,7 +297,6 @@
(is (= 42 remote-tx))
(is (= [[171 :block/name "$$$views"]]
(mapv (fn [d] [(:e d) (:a d) (:v d)]) imported-datoms))))
(is (= [:removed] @removed))
(is (nil? (get @worker-state/*opfs-pools test-repo)))
(done)))
(p/catch (fn [error]