fix: large graph download OOM

This commit is contained in:
Tienson Qin
2026-01-28 11:58:54 +08:00
parent 303c8b19c3
commit 1b4ad6a416
4 changed files with 62 additions and 86 deletions

View File

@@ -270,10 +270,8 @@
total' (+ total (count rows))
total-rows' (into total-rows rows)]
(when (seq total-rows')
(p/do!
(state/<invoke-db-worker :thread-api/db-sync-import-kvs-rows
graph total-rows' true graph-uuid)
(state/<invoke-db-worker :thread-api/db-sync-finalize-kvs-import graph remote-tx)))
(state/<invoke-db-worker :thread-api/db-sync-import-kvs-rows
graph total-rows' true graph-uuid remote-tx))
total')
(let [value (.-value chunk)
{:keys [rows buffer]} (parse-framed-chunk buffer value)

View File

@@ -201,7 +201,7 @@
(when-not (= coerced invalid-coerce)
coerced))))
(defn- fail-fast [tag data]
(defn fail-fast [tag data]
;; Log the failure then abort: `tag` (a keyword) becomes the ex-info message
;; and `data` its ex-data map. Never returns normally.
(log/error tag data)
(throw (ex-info (name tag) data)))
@@ -469,7 +469,7 @@
(swap! *repo->aes-key assoc repo aes-key)
aes-key)))))
(defn- <fetch-graph-aes-key-for-download
(defn <fetch-graph-aes-key-for-download
[repo graph-id]
(let [base (e2ee-base)]
(when-not (and (string? base) (string? graph-id))
@@ -590,30 +590,21 @@
[e a v' t])
(p/resolved [e a v t]))) keys)))
(defn- <decrypt-snapshot-rows
;; Decrypt every snapshot kvs row in one pass with `aes-key`.
;; Each row is [addr content addresses] where `content` is a transit string:
;; a map for a node (decrypt its :keys) or a collection for a leaf (decrypt
;; each entry). Resolves to a vector of rows with re-serialized content.
;; NOTE: decrypts all rows concurrently and holds them in memory at once.
[aes-key rows]
(if-not (seq rows)
(p/resolved [])
(p/let [items (p/all
(mapv (fn [[addr content addresses]]
(let [data (ldb/read-transit-str content)]
(p/let [keys' (if (map? data) ; node
(<decrypt-keys-attrs aes-key (:keys data))
;; leaf
(p/let [result (p/all (map #(<decrypt-keys-attrs aes-key %) data))]
(vec result)))
data' (if (map? data) (assoc data :keys keys') keys')
content' (ldb/write-transit-str data')]
[addr content' addresses])))
rows))]
(vec items))))
(defn- <decrypt-snapshot-row
;; Decrypt a single snapshot kvs row [addr content addresses] with `aes-key`.
;; `content` is a transit string: a map for a node (decrypt its :keys) or a
;; collection for a leaf (decrypt each entry). Resolves to the row with its
;; content re-serialized after decryption.
[aes-key [addr content addresses]]
(p/let [data (ldb/read-transit-str content)
keys' (if (map? data)
(<decrypt-keys-attrs aes-key (:keys data))
(p/let [result (p/all (map #(<decrypt-keys-attrs aes-key %) data))]
;; realize the lazy seq into a vector before re-serializing
(vec result)))
data' (if (map? data) (assoc data :keys keys') keys')
content' (ldb/write-transit-str data')]
[addr content' addresses]))
(defn <decrypt-kvs-rows
;; Fetch the graph's AES download key for `repo`/`graph-id`, fail fast when
;; it is missing, then decrypt all `rows` in a single batch.
[repo graph-id rows]
(p/let [aes-key (<fetch-graph-aes-key-for-download repo graph-id)
_ (when (nil? aes-key)
(fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))]
(<decrypt-snapshot-rows aes-key rows)))
(defn <decrypt-snapshot-rows-batch
"Decrypt one batch of snapshot kvs rows with `aes-key`; resolves to the
decrypted rows in the same order as `rows-batch`."
[aes-key rows-batch]
(->> rows-batch
     (map (fn [row] (<decrypt-snapshot-row aes-key row)))
     (p/all)))
(defn- <encrypt-datoms
[aes-key datoms]

View File

@@ -58,6 +58,7 @@
[logseq.outliner.core :as outliner-core]
[logseq.outliner.op :as outliner-op]
[me.tonsky.persistent-sorted-set :as set :refer [BTSet]]
[medley.core :as medley]
[missionary.core :as m]
[promesa.core :as p]))
@@ -69,7 +70,6 @@
(defonce *client-ops-conns worker-state/*client-ops-conns)
(defonce *opfs-pools worker-state/*opfs-pools)
(defonce *publishing? (atom false))
(defonce *db-sync-import-dbs (atom {}))
(defn- check-worker-scope!
[]
@@ -138,16 +138,14 @@
(defn- ensure-db-sync-import-db!
[repo reset?]
(p/let [^js pool (<get-opfs-pool repo)
^js db (or (get @*db-sync-import-dbs repo)
(let [^js db (new (.-OpfsSAHPoolDb pool) repo-path)]
(swap! *db-sync-import-dbs assoc repo db)
db))]
(enable-sqlite-wal-mode! db)
(common-sqlite/create-kvs-table! db)
(when reset?
(.exec db "delete from kvs"))
db))
(if-let [sqlite @*sqlite]
(let [^js DB (.-DB ^js (.-oo1 sqlite))
^js db (new DB ":memory:" "c")]
(common-sqlite/create-kvs-table! db)
(when reset?
(.exec db "delete from kvs"))
db)
(db-sync/fail-fast :db-sync/missing-field {:repo repo :field :sqlite})))
(defn restore-data-from-addr
"Update sqlite-cli/restore-data-from-addr when making changes"
@@ -219,16 +217,6 @@
(d/reset-conn! conn new-db' {:reset-conn! true})
(d/reset-schema! conn (:schema new-db)))))
(defn reset-db-from-datoms!
;; Recreate `repo`'s db seeded with `datoms` (graph-download path), persist
;; it via :thread-api/export-db, then broadcast :add-repo to clients.
[repo datoms]
(p/do!
((@thread-api/*thread-apis :thread-api/create-or-open-db) repo
{:close-other-db? false
:datoms datoms
:db-sync-download-graph? true})
((@thread-api/*thread-apis :thread-api/export-db) repo)
(shared-service/broadcast-to-clients! :add-repo {:repo repo})))
(defn- get-dbs
[repo]
(if @*publishing?
@@ -614,47 +602,46 @@
(reset-db! repo db-transit)
nil)
(def-thread-api :thread-api/db-sync-reset-from-datoms
[repo datoms]
;; Thread-api wrapper around reset-db-from-datoms!; returns nil to callers.
(reset-db-from-datoms! repo datoms)
nil)
(def-thread-api :thread-api/db-sync-import-kvs-rows
[repo rows reset? graph-id]
;; Decrypt the full `rows` payload then upsert it into the per-repo import
;; db. When `reset?`, the live db is closed first and the import db is
;; (re)initialized by ensure-db-sync-import-db!. Returns nil.
;; NOTE: decrypts all rows at once — memory cost grows with graph size.
(p/let [_ (when reset?
(close-db! repo))
rows* (db-sync/<decrypt-kvs-rows repo graph-id rows)
db (ensure-db-sync-import-db! repo reset?)]
(when (seq rows*)
(upsert-addr-content! db (rows->sqlite-binds rows*)))
nil))
(def-thread-api :thread-api/db-sync-finalize-kvs-import
[repo remote-tx]
;; Finish a kvs import: close and drop the per-repo import db, reopen the
;; real db, rebuild the datascript conn from its :eavt datoms, persist via
;; :thread-api/export-db, record `remote-tx` as the local tx, and broadcast
;; :add-repo. Errors are logged to the console and swallowed.
(-> (p/let [^js db (get @*db-sync-import-dbs repo)]
(.close db)
(swap! *db-sync-import-dbs dissoc repo)
((@thread-api/*thread-apis :thread-api/create-or-open-db) repo {:close-other-db? true})
(let [conn (worker-state/get-datascript-conn repo)
datoms (d/datoms @conn :eavt)
new-conn (d/conn-from-datoms datoms (:schema @conn))
new-db (update @new-conn :eavt (fn [^BTSet s]
;; reuse the existing eavt set's storage backend on the rebuilt set —
;; presumably so persistence keeps working; TODO confirm
(set! (.-storage s) (.-storage (:eavt @conn)))
s))]
(d/reset-conn! conn new-db {:rtc-tx? true}))
((@thread-api/*thread-apis :thread-api/export-db) repo)
(client-op/update-local-tx repo remote-tx)
(shared-service/broadcast-to-clients! :add-repo {:repo repo}))
(p/catch (fn [error]
(js/console.error error)))))
(def-thread-api :thread-api/unsafe-unlink-db
(defn- <unlink-db!
;; Close `repo`'s db, then remove its OPFS VFS pool contents. Resolves to nil.
[repo]
(p/let [pool (<get-opfs-pool repo)
_ (close-db! repo)
_result (remove-vfs! pool)]
nil))
(def-thread-api :thread-api/unsafe-unlink-db
[repo]
;; Thread-api wrapper over <unlink-db!: closes the db and removes its VFS.
(<unlink-db! repo))
(defn- import-datoms-to-db!
;; Create/open `repo`'s db seeded with `datoms` (closing any other open db),
;; persist it via :thread-api/export-db, record `remote-tx` as the local tx,
;; and broadcast :add-repo. Errors are logged to the console and swallowed.
[repo remote-tx datoms]
(-> (p/do!
((@thread-api/*thread-apis :thread-api/create-or-open-db) repo {:close-other-db? true
:datoms datoms})
((@thread-api/*thread-apis :thread-api/export-db) repo)
(client-op/update-local-tx repo remote-tx)
(shared-service/broadcast-to-clients! :add-repo {:repo repo}))
(p/catch (fn [error]
(js/console.error error)))))
(def-thread-api :thread-api/db-sync-import-kvs-rows
  [repo rows reset? graph-id remote-tx]
  ;; Stream the downloaded snapshot `rows` into a temporary kvs db in
  ;; fixed-size batches so a large graph is never fully decrypted in memory
  ;; at once, then rebuild and persist the real db from the imported datoms.
  (p/let [_ (when reset? (close-db! repo))
          db (ensure-db-sync-import-db! repo reset?)
          aes-key (db-sync/<fetch-graph-aes-key-for-download repo graph-id)
          _ (when (nil? aes-key)
              (db-sync/fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))
          ;; medley/indexed yields [index batch] pairs — destructure so only
          ;; the batch of rows reaches decryption (the index is unused).
          batches (medley/indexed (partition-all 100 rows))
          ;; Bind the p/doseq so the sequential batch upserts are awaited;
          ;; otherwise the datom read-out and `.close` below would race the
          ;; in-flight writes.
          _ (p/doseq [[_i batch] batches]
              (p/let [dec-rows (db-sync/<decrypt-snapshot-rows-batch aes-key batch)]
                (upsert-addr-content! db (rows->sqlite-binds dec-rows))))]
    (let [storage (new-sqlite-storage db)
          conn (common-sqlite/get-storage-conn storage db-schema/schema)
          datoms (vec (d/datoms @conn :eavt))]
      (.close db)
      (import-datoms-to-db! repo remote-tx datoms))))
(def-thread-api :thread-api/release-access-handles
[repo]
(when-let [^js pool (worker-state/get-opfs-pool repo)]

View File

@@ -456,7 +456,7 @@
"Compute extra tx-data and block/refs, should ensure it's a pure function and
doesn't call `d/transact!` or `ldb/transact!`."
[{:keys [db-after tx-meta _tx-data] :as tx-report}]
(when-not (:temp-conn? tx-meta)
(when-not (or (:temp-conn? tx-meta) (:sync-download-graph? tx-meta))
(let [extra-tx-data (compute-extra-tx-data tx-report)
tx-report* (if (seq extra-tx-data)
(let [result (d/with db-after extra-tx-data)]