mirror of
https://github.com/logseq/logseq.git
synced 2026-05-28 14:39:48 +00:00
enhance(frontend): batch db-worker get-block fetch safely
This commit is contained in:
@@ -51,6 +51,87 @@
|
||||
(state/<invoke-db-worker :thread-api/get-bidirectional-properties (state/get-current-repo)
|
||||
{:target-id target-id})))
|
||||
|
||||
(defn- worker-get-blocks-requests
|
||||
[requests]
|
||||
(mapv (fn [{:keys [id opts]}]
|
||||
{:id id
|
||||
:opts (select-keys opts [:children? :properties :include-collapsed-children?])})
|
||||
requests))
|
||||
|
||||
(defn- <invoke-worker-get-blocks
|
||||
[graph requests]
|
||||
(p/let [result-transit-str
|
||||
(state/<invoke-db-worker-direct-pass :thread-api/get-blocks
|
||||
graph
|
||||
(ldb/write-transit-str requests))]
|
||||
(some-> result-transit-str ldb/read-transit-str)))
|
||||
|
||||
(defonce ^:private *get-blocks-batch-enabled? (atom true))
|
||||
|
||||
(defonce ^:private *get-blocks-batch-state
|
||||
(atom {:scheduled? false
|
||||
:queue []}))
|
||||
|
||||
(declare flush-get-blocks-batch!)
|
||||
|
||||
(defn- schedule-get-blocks-batch-flush!
|
||||
[]
|
||||
(let [should-schedule? (not (:scheduled? @*get-blocks-batch-state))]
|
||||
(when should-schedule?
|
||||
(swap! *get-blocks-batch-state assoc :scheduled? true)
|
||||
(util/schedule flush-get-blocks-batch!))))
|
||||
|
||||
(defn- enqueue-get-blocks-request!
|
||||
[graph request]
|
||||
(let [result (p/deferred)]
|
||||
(swap! *get-blocks-batch-state
|
||||
(fn [state]
|
||||
(update state :queue conj {:graph graph
|
||||
:request request
|
||||
:result result})))
|
||||
(schedule-get-blocks-batch-flush!)
|
||||
result))
|
||||
|
||||
(defn- resolve-batched-get-blocks!
|
||||
[entries responses]
|
||||
(doseq [[idx {:keys [result]}] (map-indexed vector entries)]
|
||||
(p/resolve! result (nth responses idx nil))))
|
||||
|
||||
(defn- reject-batched-get-blocks!
|
||||
[entries error]
|
||||
(doseq [{:keys [result]} entries]
|
||||
(p/reject! result error)))
|
||||
|
||||
(defn- flush-get-blocks-batch!
|
||||
[]
|
||||
(let [queue (:queue @*get-blocks-batch-state)]
|
||||
(swap! *get-blocks-batch-state
|
||||
(fn [state] (assoc state :scheduled? false :queue [])))
|
||||
(doseq [[graph entries] (group-by :graph queue)]
|
||||
(let [requests (->> entries (map :request) worker-get-blocks-requests)]
|
||||
(->
|
||||
(p/let [result (<invoke-worker-get-blocks graph requests)
|
||||
result (if (= (count result) (count requests))
|
||||
result
|
||||
nil)
|
||||
result (or result
|
||||
;; Safety fallback: retry once if response length is unexpected.
|
||||
(<invoke-worker-get-blocks graph requests))]
|
||||
(resolve-batched-get-blocks! entries result))
|
||||
(p/catch (fn [error]
|
||||
(reject-batched-get-blocks! entries error))))))))
|
||||
|
||||
(defn- <fetch-blocks-from-worker-batched
|
||||
[graph requests]
|
||||
(when (seq requests)
|
||||
(if @*get-blocks-batch-enabled?
|
||||
(-> (p/all (mapv #(enqueue-get-blocks-request! graph %) requests))
|
||||
(p/catch (fn [_]
|
||||
;; Fail-open: disable batching for this runtime and fall back to direct fetch.
|
||||
(reset! *get-blocks-batch-enabled? false)
|
||||
(<invoke-worker-get-blocks graph (worker-get-blocks-requests requests)))))
|
||||
(<invoke-worker-get-blocks graph (worker-get-blocks-requests requests)))))
|
||||
|
||||
(defn <get-block
|
||||
[graph id-uuid-or-name & {:keys [children? include-collapsed-children? skip-transact? skip-refresh? properties]
|
||||
:or {children? true}
|
||||
@@ -80,9 +161,7 @@
|
||||
|
||||
:else
|
||||
(->
|
||||
(p/let [result-transit-str (state/<invoke-db-worker-direct-pass :thread-api/get-blocks graph
|
||||
(ldb/write-transit-str [{:id id :opts opts}]))
|
||||
result (ldb/read-transit-str result-transit-str)
|
||||
(p/let [result (<fetch-blocks-from-worker-batched graph [{:id id :opts opts}])
|
||||
{:keys [block children]} (first result)]
|
||||
(when-not skip-transact?
|
||||
(let [conn (db/get-db graph false)
|
||||
@@ -110,19 +189,15 @@
|
||||
[graph ids* & {:as opts}]
|
||||
(let [ids (remove (fn [id] (:block.temp/load-status (db/entity id))) ids*)]
|
||||
(when (seq ids)
|
||||
(p/let [result-transit-str
|
||||
(state/<invoke-db-worker-direct-pass :thread-api/get-blocks graph
|
||||
(ldb/write-transit-str
|
||||
(map
|
||||
(fn [id]
|
||||
{:id id :opts (assoc opts :children? false)})
|
||||
ids)))
|
||||
result (ldb/read-transit-str result-transit-str)]
|
||||
(p/let [result (<fetch-blocks-from-worker-batched graph
|
||||
(mapv (fn [id]
|
||||
{:id id :opts (assoc opts :children? false)})
|
||||
ids))]
|
||||
(let [conn (db/get-db graph false)
|
||||
result' (map :block result)]
|
||||
result' (keep :block result)]
|
||||
(when (seq result')
|
||||
(d/transact! conn result'))
|
||||
result')))))
|
||||
result)))))
|
||||
|
||||
(defn <get-block-parents
|
||||
[graph id depth]
|
||||
|
||||
Reference in New Issue
Block a user