Sync db worker changes back to the main ui thread

This commit is contained in:
Tienson Qin
2024-01-08 19:09:54 +08:00
parent 8ba3280c5a
commit dbba75157d
10 changed files with 112 additions and 55 deletions

View File

@@ -13,7 +13,6 @@
[cljs-bean.core :as bean]
[frontend.worker.search :as search]
[logseq.db.sqlite.util :as sqlite-util]
[frontend.worker.pipeline :as pipeline]
[frontend.worker.state :as state]
[frontend.worker.file :as file]
[logseq.db :as ldb]
@@ -129,12 +128,6 @@
(let [{:keys [db search]} (@*sqlite-conns repo)]
(close-db-aux! repo db search)))
(defn- listen-to-db!
[repo conn]
(d/unlisten! conn :gen-ops)
(when (op-mem-layer/rtc-db-graph? repo)
(rtc-db-listener/listen-db-to-generate-ops repo conn)))
(defn- create-or-open-db!
[repo]
(when-not (state/get-sqlite-conn repo)
@@ -154,7 +147,7 @@
conn (sqlite-common-db/get-storage-conn storage schema)]
(swap! *datascript-conns assoc repo conn)
(p/let [_ (op-mem-layer/<init-load-from-indexeddb! repo)]
(listen-to-db! repo conn))
(rtc-db-listener/listen-to-db-changes! repo conn))
nil))))
(defn- iter->vec [iter]
@@ -281,17 +274,8 @@
tx-meta' (if (or (:from-disk? tx-meta) (:new-graph? tx-meta))
tx-meta
(assoc tx-meta :skip-store? true))
tx-report (ldb/transact! conn tx-data tx-meta')
result (pipeline/invoke-hooks repo conn tx-report context)
;; TODO: delay search indice so that UI can be refreshed earlier
search-indice (search/sync-search-indice repo (:tx-report result))
data (merge
{:repo repo
:search-indice search-indice
:tx-data tx-data
:tx-meta tx-meta}
(dissoc result :tx-report))]
(pr-str data))
_tx-report (ldb/transact! conn tx-data tx-meta')]
nil)
(catch :default e
(prn :debug :error)
(js/console.error e)))))

View File

@@ -75,7 +75,9 @@
[rum.core :as rum]
[frontend.db.listener :as db-listener]
[frontend.persist-db.browser :as db-browser]
[frontend.db.rtc.debug-ui :as rtc-debug-ui]))
[frontend.db.rtc.debug-ui :as rtc-debug-ui]
[frontend.modules.outliner.pipeline :as pipeline]
[electron.ipc :as ipc]))
;; TODO: should we move all events here?
@@ -945,6 +947,15 @@
(defmethod handle :rtc/sync-state [[_ state]]
(swap! rtc-debug-ui/debug-state (fn [old] (merge old state))))
;; db-worker -> UI
(defmethod handle :db/sync-changes [[_ data]]
(let [repo (state/get-current-repo)]
(pipeline/invoke-hooks data)
(ipc/ipc :db-transact repo (:tx-data data) (:tx-meta data))
(state/pub-event! [:search/transact-data repo (:search-indice data)])
nil))
(defn run!
[]
(let [chan (state/get-events-chan)]

View File

@@ -22,6 +22,10 @@
(let [state (edn/read-string data)]
(state/pub-event! [:rtc/sync-state state])))
(defmethod handle :sync-db-changes [_ data]
(let [data (edn/read-string data)]
(state/pub-event! [:db/sync-changes data])))
(defmethod handle :default [_ data]
(prn :debug "Worker data not handled: " data))

View File

@@ -11,8 +11,6 @@
[cljs-bean.core :as bean]
[frontend.state :as state]
[electron.ipc :as ipc]
[frontend.modules.outliner.pipeline :as pipeline]
[clojure.edn :as edn]
[frontend.handler.worker :as worker-handler]))
(defonce *worker (atom nil))
@@ -115,13 +113,8 @@
:whiteboards-directory (config/get-whiteboards-directory)
:pages-directory (config/get-pages-directory)}]
(if sqlite
(p/let [result (.transact sqlite repo tx-data' tx-meta'
(pr-str context))
data (edn/read-string result)]
(state/pub-event! [:search/transact-data repo (:search-indice data)])
(pipeline/invoke-hooks data)
(ipc/ipc :db-transact repo (:tx-data data) (:tx-meta data))
nil)
(.transact sqlite repo tx-data' tx-meta'
(pr-str context))
(notification/show! "Latest change was not saved! Please restart the application." :error))))))
(<fetch-initial-data [_this repo _opts]

View File

@@ -76,23 +76,24 @@
{:tx-report tx-report}
(let [{:keys [pages blocks]} (ds-report/get-blocks-and-pages tx-report)
deleted-block-uuids (set (outliner-pipeline/filter-deleted-blocks (:tx-data tx-report)))
replace-tx (concat
replace-tx (when-not (:pipeline-replace? tx-meta)
(concat
;; block path refs
(set (compute-block-path-refs-tx tx-report blocks))
(set (compute-block-path-refs-tx tx-report blocks))
;; delete empty property parent block
(when (seq deleted-block-uuids)
(delete-property-parent-block-if-empty tx-report deleted-block-uuids))
(when (seq deleted-block-uuids)
(delete-property-parent-block-if-empty tx-report deleted-block-uuids))
;; update block/tx-id
(let [updated-blocks (remove (fn [b] (contains? (set deleted-block-uuids) (:block/uuid b))) blocks)
tx-id (get-in tx-report [:tempids :db/current-tx])]
(->>
(map (fn [b]
(when-let [db-id (:db/id b)]
{:db/id db-id
:block/tx-id tx-id})) updated-blocks)
(remove nil?))))
(let [updated-blocks (remove (fn [b] (contains? (set deleted-block-uuids) (:block/uuid b))) blocks)
tx-id (get-in tx-report [:tempids :db/current-tx])]
(->>
(map (fn [b]
(when-let [db-id (:db/id b)]
{:db/id db-id
:block/tx-id tx-id})) updated-blocks)
(remove nil?)))))
tx-report' (or
(when (seq replace-tx)
(ldb/transact! conn replace-tx {:replace? true
@@ -101,7 +102,7 @@
full-tx-data (concat (:tx-data tx-report) fix-tx-data (:tx-data tx-report'))
final-tx-report (assoc tx-report' :tx-data full-tx-data)
affected-query-keys (when-not (:importing? context)
(worker-react/get-affected-queries-keys final-tx-report context))]
(worker-react/get-affected-queries-keys final-tx-report))]
(doseq [page pages]
(file/sync-to-file repo (:db/id page) tx-meta))
{:tx-report final-tx-report

View File

@@ -27,7 +27,7 @@
(defn get-affected-queries-keys
"Get affected queries through transaction datoms."
[{:keys [tx-data db-after]} _opts]
[{:keys [tx-data db-after]}]
{:post [(s/valid? ::affected-keys %)]}
(let [blocks (->> (filter (fn [datom] (contains? #{:block/left :block/parent :block/page} (:a datom))) tx-data)
(map :v)

View File

@@ -26,7 +26,8 @@
[frontend.worker.rtc.op-mem-layer :as op-mem-layer]
[frontend.worker.rtc.ws :as ws]
[promesa.core :as p]
[cljs-bean.core :as bean]))
[cljs-bean.core :as bean]
[frontend.worker.react :as worker-react]))
;; +-------------+
@@ -439,18 +440,19 @@
update-ops (vals update-ops-map)
update-page-ops (vals update-page-ops-map)
remove-page-ops (vals remove-page-ops-map)]
;; (state/set-state! [:rtc/remote-batch-tx-state repo]
;; {:in-transaction? true
;; :txs []})
;; (worker-state/start-batch-tx-mode!)
(worker-util/profile :apply-remote-update-page-ops (apply-remote-update-page-ops repo conn date-formatter update-page-ops))
(worker-util/profile :apply-remote-remove-ops (apply-remote-remove-ops repo conn date-formatter remove-ops))
(worker-util/profile :apply-remote-move-ops (apply-remote-move-ops repo conn date-formatter sorted-move-ops))
(worker-util/profile :apply-remote-update-ops (apply-remote-update-ops repo conn date-formatter update-ops))
(worker-util/profile :apply-remote-remove-page-ops (apply-remote-remove-page-ops repo conn remove-page-ops))
;; (let [txs (get-in @state/state [:rtc/remote-batch-tx-state repo :txs])]
;; (worker-util/profile
;; :batch-refresh
;; (react/batch-refresh! repo txs)))
;; (let [txs (worker-state/get-batch-txs)
;; affected-keys (worker-react/get-affected-queries-keys {:tx-data txs :db-after @conn})]
;; (worker-state/exit-batch-tx-mode!))
(op-mem-layer/update-local-tx! repo remote-t))
:else (throw (ex-info "unreachable" {:remote-t remote-t
:remote-t-before remote-t-before

View File

@@ -5,7 +5,11 @@
[clojure.data :as data]
[clojure.set :as set]
[datascript.core :as d]
[frontend.worker.rtc.op-mem-layer :as op-mem-layer]))
[frontend.worker.rtc.op-mem-layer :as op-mem-layer]
[frontend.worker.state :as worker-state]
[frontend.worker.pipeline :as pipeline]
[frontend.worker.search :as search]
[frontend.worker.util :as worker-util]))
(defn- entity-datoms=>attr->datom
@@ -158,3 +162,38 @@
(fn [{:keys [tx-data tx-meta db-before db-after]}]
(when (:persist-op? tx-meta true)
(generate-rtc-ops repo db-before db-after tx-data)))))
(comment
(defn listen-db-to-batch-txs
[conn]
(d/listen! conn :batch-txs
(fn [{:keys [tx-data]}]
(when (worker-state/batch-tx-mode?)
(worker-state/conj-batch-txs! tx-data))))))
(defn sync-db-to-main-thread
[repo conn]
(d/listen! conn :sync-db
(fn [{:keys [tx-data tx-meta] :as tx-report}]
(let [result (pipeline/invoke-hooks repo conn tx-report (worker-state/get-context))
;; TODO: delay search indice so that UI can be refreshed earlier
search-indice (search/sync-search-indice repo (:tx-report result))
data (pr-str
(merge
{:repo repo
:search-indice search-indice
:tx-data tx-data
:tx-meta tx-meta}
(dissoc result :tx-report)))]
(worker-util/post-message :sync-db-changes data)))))
(defn listen-to-db-changes!
[repo conn]
(d/unlisten! conn :gen-ops)
(d/unlisten! conn :sync-db)
(when (op-mem-layer/rtc-db-graph? repo)
(listen-db-to-generate-ops repo conn)
;; (rtc-db-listener/listen-db-to-batch-txs conn)
)
(sync-db-to-main-thread repo conn))

View File

@@ -8,7 +8,8 @@
:worker/context {}
:config {}
:git/current-repo nil}))
:git/current-repo nil
:rtc/remote-batch-txs nil}))
(defonce *rtc-ws-url (atom nil))
@@ -74,3 +75,25 @@
(defn set-worker-object!
[worker]
(swap! *state assoc :worker/object worker))
(defn conj-batch-txs!
[tx-data]
(swap! *state update :rtc/remote-batch-txs
(fn [old-data]
(concat old-data tx-data))))
(defn batch-tx-mode?
[]
(some? (:rtc/remote-batch-txs @*state)))
(defn start-batch-tx-mode!
[]
(swap! *state assoc :rtc/remote-batch-txs []))
(defn exit-batch-tx-mode!
[]
(swap! *state assoc :rtc/remote-batch-txs nil))
(defn get-batch-txs
[]
(:rtc/remote-batch-txs @*state))