fix: reduce memory usage when encrypting large graph

This commit is contained in:
Tienson Qin
2026-03-12 11:53:57 +08:00
parent d8e7032e78
commit 98e262d118
3 changed files with 211 additions and 40 deletions

View File

@@ -28,6 +28,7 @@
(defonce *repo->latest-remote-tx (atom {}))
(defonce *start-inflight-target (atom nil))
(defonce ^:private *upload-temp-opfs-pool (atom nil))
(defn- current-client
[repo]
@@ -171,7 +172,9 @@
opts))
(def ^:private max-asset-size (* 100 1024 1024))
(def ^:private upload-kvs-batch-size 500)
(def ^:private upload-kvs-batch-size 2000)
(def ^:private upload-prepare-datoms-batch-size 100000)
(def ^:private upload-temp-pool-name (worker-util/get-pool-name "upload-temp"))
(def ^:private snapshot-content-type "application/transit+json")
(def ^:private snapshot-content-encoding "gzip")
(def ^:private snapshot-text-encoder (js/TextEncoder.))
@@ -449,29 +452,65 @@
(-restore [_ addr]
(restore-data-from-addr db addr))))
(defn- create-temp-sqlite-db
(defn- <get-upload-temp-sqlite-pool
[]
(if-let [sqlite @worker-state/*sqlite]
(let [^js DB (.-DB ^js (.-oo1 sqlite))
db (new DB ":memory:" "c")]
(common-sqlite/create-kvs-table! db)
db)
(fail-fast :db-sync/missing-field {:field :sqlite})))
(if-let [pool @*upload-temp-opfs-pool]
(p/resolved pool)
(if-let [sqlite @worker-state/*sqlite]
(p/let [^js pool (.installOpfsSAHPoolVfs ^js sqlite #js {:name upload-temp-pool-name
:initialCapacity 20})]
(reset! *upload-temp-opfs-pool pool)
pool)
(fail-fast :db-sync/missing-field {:field :sqlite}))))
(defn- upload-temp-sqlite-path
[]
(str "/upload-" (random-uuid) ".sqlite"))
(defn- <create-temp-sqlite-db!
[]
(p/let [^js pool (<get-upload-temp-sqlite-pool)
capacity (.getCapacity pool)
_ (when (zero? capacity)
(.unpauseVfs pool))
path (upload-temp-sqlite-path)
^js db (new (.-OpfsSAHPoolDb pool) path)]
(common-sqlite/create-kvs-table! db)
{:db db
:path path
:pool pool}))
(defn- <create-temp-sqlite-conn
[schema datoms]
(p/let [db (create-temp-sqlite-db)
storage (new-temp-sqlite-storage db)
conn (d/conn-from-datoms datoms schema {:storage storage})]
{:db db
:conn conn}))
([schema]
(<create-temp-sqlite-conn schema []))
([schema datoms]
(p/let [{:keys [db path pool]} (<create-temp-sqlite-db!)
storage (new-temp-sqlite-storage db)
conn (d/conn-from-datoms datoms schema {:storage storage})]
{:db db
:conn conn
:path path
:pool pool})))
(defn- <remove-upload-temp-sqlite-db-file!
[path]
(-> (p/let [^js root (.getDirectory js/navigator.storage)
^js dir (.getDirectoryHandle root (str "." upload-temp-pool-name))]
(.removeEntry dir (subs path 1)))
(p/catch
(fn [error]
(if (= "NotFoundError" (.-name error))
nil
(p/rejected error))))))
(defn- cleanup-temp-sqlite!
[{:keys [db conn]}]
[{:keys [db conn path]}]
(when conn
(reset! conn nil))
(when db
(.close db)))
(.close db))
(when path
(<remove-upload-temp-sqlite-db-file! path)))
(defn- require-asset-field
[repo field value context]
@@ -1199,6 +1238,78 @@
[]
datoms)))))
(defn- <offload-large-titles-in-datoms-batch
[repo graph-id datoms aes-key]
(p/loop [remaining datoms
acc []]
(if (empty? remaining)
acc
(let [datom (first remaining)]
(if (and (= :block/title (:a datom))
(string? (:v datom))
(large-title? (:v datom)))
(p/let [obj (upload-large-title! repo graph-id (:v datom) aes-key)]
(p/recur (rest remaining)
(conj acc
(assoc datom :v "")
(assoc datom :a large-title-object-attr :v obj))))
(p/recur (rest remaining) (conj acc datom)))))))
(defn- take-upload-datoms-batch
[datoms batch-size]
(loop [batch (transient [])
remaining (seq datoms)
n 0]
(if (or (nil? remaining) (>= n batch-size))
[(persistent! batch) remaining]
(recur (conj! batch (first remaining))
(next remaining)
(inc n)))))
(defn- datom->tx
[datom]
[:db/add (:e datom) (:a datom) (:v datom)])
(defn- <process-upload-datoms-in-batches!
[datoms {:keys [batch-size process-batch-f progress-f]
:or {batch-size upload-prepare-datoms-batch-size}}]
(let [total-count (count datoms)]
(p/loop [remaining (seq datoms)
processed 0]
(if (seq remaining)
(let [[batch remaining'] (take-upload-datoms-batch remaining batch-size)
processed' (+ processed (count batch))]
(p/let [_ (process-batch-f batch)]
(when progress-f
(progress-f processed' total-count))
(p/let [_ (js/Promise. (fn [resolve] (js/setTimeout resolve 0)))]
(p/recur remaining' processed'))))
nil))))
(defn- <prepare-upload-temp-sqlite!
[repo graph-id source-conn aes-key update-progress]
(p/let [temp (<create-temp-sqlite-conn (d/schema @source-conn))
datoms (d/datoms @source-conn :eavt)
_ (prn :debug :datoms-count (count datoms) :time (js/Date.))
_ (<process-upload-datoms-in-batches!
datoms
{:process-batch-f
(fn [batch]
(p/let [datoms* (<offload-large-titles-in-datoms-batch repo graph-id batch aes-key)
encrypted-datoms (if aes-key
(sync-crypt/<encrypt-datoms aes-key datoms*)
datoms*)
tx-data (mapv datom->tx encrypted-datoms)]
(d/transact! (:conn temp) tx-data {:initial-db? true})
nil))
:progress-f
(fn [processed total]
(update-progress {:sub-type :upload-progress
:message (if aes-key
(str "Encrypting " processed "/" total)
(str "Preparing " processed "/" total))}))})]
temp))
(defn rehydrate-large-titles-from-db!
[repo graph-id]
(when-let [conn (worker-state/get-datascript-conn repo)]
@@ -1962,15 +2073,9 @@
(fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))]
(set-graph-sync-metadata! repo graph-e2ee?)
(ensure-client-graph-uuid! repo graph-id)
(p/let [datoms (d/datoms @source-conn :eavt)
_ (prn :debug :datoms-count (count datoms) :time (js/Date.))
datoms* (offload-large-titles-in-datoms repo graph-id datoms aes-key)
_ (update-progress {:sub-type :upload-progress
:message (if graph-e2ee? "Encrypting data" "Preparing data")})
encrypted-datoms (if graph-e2ee?
(sync-crypt/<encrypt-datoms aes-key datoms*)
datoms*)
{:keys [db] :as temp} (<create-temp-sqlite-conn (d/schema @source-conn) encrypted-datoms)
(p/let [_ (update-progress {:sub-type :upload-progress
:message (if graph-e2ee? "Encrypting 0/0" "Preparing 0/0")})
{:keys [db] :as temp} (<prepare-upload-temp-sqlite! repo graph-id source-conn aes-key update-progress)
total-rows (count-kvs-rows db)]
(->
(p/loop [last-addr -1

View File

@@ -504,21 +504,28 @@
(p/all (map #(<decrypt-snapshot-row aes-key %) rows-batch)))
(defn <encrypt-datoms
[aes-key datoms]
(let [batch-size 5000
batches (partition-all batch-size datoms)]
(p/loop [remaining batches
result []]
(if (empty? remaining)
result
(p/let [batch (first remaining)
encrypted (p/all (map (fn [datom]
(if (contains? sync-const/encrypt-attr-set (:a datom))
(p/let [v' (<encrypt-text-value aes-key (:v datom))]
(assoc datom :v v'))
(p/resolved datom)))
batch))]
(p/recur (rest remaining) (into result encrypted)))))))
([aes-key datoms]
(<encrypt-datoms aes-key datoms nil))
([aes-key datoms progress-f]
(let [batch-size 5000
total-count (count datoms)
batches (partition-all batch-size datoms)]
(p/loop [remaining batches
result []
encrypted-count 0]
(if (empty? remaining)
result
(p/let [batch (first remaining)
encrypted (p/all (map (fn [datom]
(if (contains? sync-const/encrypt-attr-set (:a datom))
(p/let [v' (<encrypt-text-value aes-key (:v datom))]
(assoc datom :v v'))
(p/resolved datom)))
batch))]
(let [encrypted-count' (+ encrypted-count (count batch))]
(when progress-f
(progress-f encrypted-count' total-count))
(p/recur (rest remaining) (into result encrypted) encrypted-count'))))))))
(defn- <re-encrypt-private-key
[encrypted-private-key-str old-password new-password]

View File

@@ -1,5 +1,6 @@
(ns frontend.worker.db-sync-test
(:require [cljs.test :refer [deftest is testing async]]
[clojure.string :as string]
[datascript.core :as d]
[frontend.common.crypt :as crypt]
[frontend.worker-common.util :as worker-util]
@@ -10,6 +11,7 @@
[frontend.worker.sync.crypt :as sync-crypt]
[logseq.common.config :as common-config]
[logseq.db :as ldb]
[logseq.db.common.sqlite :as common-sqlite]
[logseq.db.frontend.validate :as db-validate]
[logseq.db.sqlite.util :as sqlite-util]
[logseq.db.test.helper :as db-test]
@@ -987,6 +989,63 @@
(is (= tx-data result)))
(p/finally done))))))
(deftest upload-preparation-processes-datoms-in-batches-test
(testing "upload preparation should stream work batch by batch instead of sending the full graph at once"
(async done
(let [datoms [{:e 1 :a :block/title :v "a"}
{:e 2 :a :block/title :v "b"}
{:e 3 :a :block/title :v "c"}
{:e 4 :a :block/title :v "d"}
{:e 5 :a :block/title :v "e"}]
seen-batches (atom [])
progress-calls (atom [])]
(-> (p/let [_ (#'db-sync/<process-upload-datoms-in-batches!
datoms
{:batch-size 2
:process-batch-f (fn [batch]
(swap! seen-batches conj (mapv :e batch))
(p/resolved nil))
:progress-f (fn [processed total]
(swap! progress-calls conj [processed total]))})]
(is (= [[1 2] [3 4] [5]] @seen-batches))
(is (= [[2 5] [4 5] [5 5]] @progress-calls)))
(p/finally done))))))
(deftest create-temp-sqlite-db-uses-opfs-pool-test
(testing "temp upload db should use an OPFS-backed sqlite db instead of :memory:"
(async done
(let [opened-paths (atom [])]
(with-redefs [db-sync/<get-upload-temp-sqlite-pool
(fn []
(p/resolved
#js {:OpfsSAHPoolDb
(fn [path]
(swap! opened-paths conj path)
#js {:close (fn [] nil)})}))
logseq.db.common.sqlite/create-kvs-table! (fn [_] nil)]
(-> (p/let [{:keys [db path]} (#'db-sync/<create-temp-sqlite-db!)]
(is (some? db))
(is (= [path] @opened-paths))
(is (string/includes? path "upload-"))
(is (string/ends-with? path ".sqlite")))
(p/finally done)))))))
(deftest cleanup-temp-sqlite-removes-opfs-file-test
(testing "temp upload db cleanup should close the db and remove the temp OPFS file"
(async done
(let [closed? (atom false)
removed-paths (atom [])]
(with-redefs [db-sync/<remove-upload-temp-sqlite-db-file!
(fn [path]
(swap! removed-paths conj path)
(p/resolved nil))]
(-> (p/let [_ (#'db-sync/cleanup-temp-sqlite!
{:db #js {:close (fn [] (reset! closed? true))}
:path "/upload-temp.sqlite"})]
(is @closed?)
(is (= ["/upload-temp.sqlite"] @removed-paths)))
(p/finally done)))))))
(deftest ^:long upload-large-title-encrypts-transit-payload-test
(testing "encrypted large title uploads transit-encoded payload"
(async done