refactor(wip): add def-thread-api, and move fns from class DBWorker

This commit is contained in:
rcmerci
2025-03-21 21:22:59 +08:00
parent d0422948cb
commit b7d65b82e3
9 changed files with 492 additions and 13 deletions

View File

@@ -217,7 +217,8 @@
rum.core/defcs hooks.rum/defcs
clojure.string/join hooks.path-invalid-construct/string-join
clojure.string/replace hooks.regex-checks/double-escaped-regex
logseq.common.defkeywords/defkeywords hooks.defkeywords/defkeywords}}
logseq.common.defkeywords/defkeywords hooks.defkeywords/defkeywords
frontend.common.thread-api/def-thread-api hooks.def-thread-api/def-thread-api}}
:lint-as {promesa.core/let clojure.core/let
promesa.core/loop clojure.core/loop
promesa.core/recur clojure.core/recur
@@ -233,7 +234,8 @@
frontend.test.helper/deftest-async clojure.test/deftest
frontend.worker.rtc.idb-keyval-mock/with-reset-idb-keyval-mock cljs.test/async
frontend.react/defc clojure.core/defn
logseq.common.defkeywords/defkeyword cljs.spec.alpha/def}
logseq.common.defkeywords/defkeyword cljs.spec.alpha/def
frontend.common.thread-api/defkeyword cljs.spec.alpha/def}
:skip-comments true
:output {:progress true
:exclude-files ["src/test/docs-0.10.9/"]}}

View File

@@ -0,0 +1,13 @@
(ns hooks.def-thread-api
(:require [clj-kondo.hooks-api :as api]))
(defn def-thread-api
[{:keys [node]}]
(let [[_ kw & others] (:children node)
new-node (api/list-node
[(api/token-node 'do)
(api/list-node [(api/token-node 'frontend.common.thread-api/defkeyword) kw])
(api/list-node
(cons (api/token-node 'fn) others))])
new-node* (with-meta new-node (meta node))]
{:node new-node*}))

View File

@@ -0,0 +1,19 @@
(ns frontend.common.thread-api
"Macro for defining thread apis, which is invokeable by other threads"
#?(:cljs (:require-macros [frontend.common.thread-api])))
#?(:cljs
(def *thread-apis (volatile! {})))
#_:clj-kondo/ignore
(defmacro defkeyword [& _args])
(defmacro def-thread-api
"Define a api invokeable by other threads.
e.g. (def-thread-api :rtc/a-api [arg1 arg2] body)"
[qualified-keyword-name params & body]
(assert (qualified-keyword? qualified-keyword-name) qualified-keyword-name)
(assert (vector? params) params)
`(vswap! *thread-apis assoc
~qualified-keyword-name
(fn ~params ~@body)))

View File

@@ -1,7 +1,9 @@
(ns frontend.worker.crypt
"Fns to en/decrypt some block attrs"
(:require [datascript.core :as d]
[frontend.common.thread-api :refer [def-thread-api]]
[frontend.worker.state :as worker-state]
[logseq.db :as ldb]
[promesa.core :as p]))
(defonce ^:private encoder (new js/TextEncoder "utf-8"))
@@ -122,3 +124,14 @@
(assert (some? conn) repo)
(let [aes-key-datom (first (d/datoms @conn :avet :aes-key-jwk))]
{:aes-key-jwk (:v aes-key-datom)})))
(defn- with-write-transit-str
[task]
(p/chain
(js/Promise. task)
ldb/write-transit-str))
(def-thread-api :rtc/get-graph-keys
[repo]
(with-write-transit-str
(get-graph-keys-jwk repo)))

View File

@@ -1,6 +1,7 @@
(ns frontend.worker.db-metadata
"Fns to read/write metadata.edn file for db-based."
(:require [frontend.worker.util :as worker-util]
(:require [frontend.common.thread-api :refer [def-thread-api]]
[frontend.worker.util :as worker-util]
[promesa.core :as p]))
(defn <store
@@ -11,3 +12,7 @@
writable (.createWritable file-handle)
_ (.write writable metadata-str)]
(.close writable)))
(def-thread-api :db-metadata/store
[repo metadata-str]
(<store repo metadata-str))

View File

@@ -9,6 +9,7 @@
[datascript.core :as d]
[datascript.storage :refer [IStorage] :as storage]
[frontend.common.file.core :as common-file]
[frontend.common.thread-api :refer [def-thread-api]]
[frontend.worker.crypt :as worker-crypt]
[frontend.worker.db-listener :as db-listener]
[frontend.worker.db-metadata :as worker-db-metadata]
@@ -414,7 +415,7 @@
(p/all (map (fn [dir]
(p/let [graph-name (-> (.-name dir)
(string/replace-first ".logseq-pool-" "")
;; TODO: DRY
;; TODO: DRY
(string/replace "+3A+" ":")
(string/replace "++" "/"))
metadata-file-handle (.getFileHandle dir "metadata.edn" #js {:create true})
@@ -423,6 +424,10 @@
{:name graph-name
:metadata (edn/read-string metadata)})) db-dirs)))))
(def-thread-api :general/list-all-dbs
[]
(<list-all-dbs))
(defn- <db-exists?
[graph]
(->
@@ -430,7 +435,7 @@
_dir-handle (.getDirectoryHandle root (str "." (worker-util/get-pool-name graph)))]
true)
(p/catch
(fn [_e] ; not found
(fn [_e] ; not found
false))))
(defn- remove-vfs!
@@ -450,6 +455,316 @@
(let [result (when-not (= result @worker-state/*state) result)]
(ldb/write-transit-str result)))))
(def-thread-api :general/get-version
[]
(when-let [sqlite @*sqlite]
(.-version sqlite)))
(def-thread-api :general/init
[rtc-ws-url]
(reset! worker-state/*rtc-ws-url rtc-ws-url)
(init-sqlite-module!))
(def-thread-api :general/create-or-open-db
[repo opts-str]
(let [{:keys [close-other-db?] :or {close-other-db? true} :as opts} (ldb/read-transit-str opts-str)]
(p/do!
(when close-other-db?
(close-other-dbs! repo))
(create-or-open-db! repo (dissoc opts :close-other-db?)))))
(def-thread-api :general/q
[repo inputs-str]
(when-let [conn (worker-state/get-datascript-conn repo)]
(let [inputs (ldb/read-transit-str inputs-str)
result (apply d/q (first inputs) @conn (rest inputs))]
(ldb/write-transit-str result))))
(def-thread-api :general/pull
[repo selector-str id-str]
(when-let [conn (worker-state/get-datascript-conn repo)]
(let [selector (ldb/read-transit-str selector-str)
id (ldb/read-transit-str id-str)
eid (if (and (vector? id) (= :block/name (first id)))
(:db/id (ldb/get-page @conn (second id)))
id)
result (some->> eid
(d/pull @conn selector)
(sqlite-common-db/with-parent @conn))]
(ldb/write-transit-str result))))
(def-thread-api :general/get-block-and-children
[repo id opts]
(when-let [conn (worker-state/get-datascript-conn repo)]
(let [id (if (and (string? id) (common-util/uuid-string? id)) (uuid id) id)]
(ldb/write-transit-str (sqlite-common-db/get-block-and-children @conn id (ldb/read-transit-str opts))))))
(def-thread-api :general/get-block-refs
[repo id]
(when-let [conn (worker-state/get-datascript-conn repo)]
(ldb/write-transit-str (ldb/get-block-refs @conn id))))
(def-thread-api :general/get-block-refs-count
[repo id]
(when-let [conn (worker-state/get-datascript-conn repo)]
(ldb/get-block-refs-count @conn id)))
(def-thread-api :general/get-block-parents
[repo id depth]
(when-let [conn (worker-state/get-datascript-conn repo)]
(let [block-id (:block/uuid (d/entity @conn id))
parents (->> (ldb/get-block-parents @conn block-id {:depth (or depth 3)})
(map (fn [b] (d/pull @conn '[*] (:db/id b)))))]
(ldb/write-transit-str parents))))
(def-thread-api :general/get-page-unlinked-refs
[repo page-id search-result-eids-str]
(when-let [conn (worker-state/get-datascript-conn repo)]
(let [search-result-eids (ldb/read-transit-str search-result-eids-str)]
(ldb/write-transit-str (ldb/get-page-unlinked-refs @conn page-id search-result-eids)))))
(def-thread-api :general/set-context
[context]
(let [context (if (string? context)
(ldb/read-transit-str context)
context)]
(when context (worker-state/update-context! context))
nil))
(def-thread-api :general/transact
[this repo tx-data tx-meta context]
(when repo (worker-state/set-db-latest-tx-time! repo))
(when-let [conn (worker-state/get-datascript-conn repo)]
(try
(let [tx-data' (if (string? tx-data)
(ldb/read-transit-str tx-data)
tx-data)
tx-meta (if (string? tx-meta)
(ldb/read-transit-str tx-meta)
tx-meta)
tx-data' (if (contains? #{:insert-blocks} (:outliner-op tx-meta))
(map (fn [m]
(if (and (map? m) (nil? (:block/order m)))
(assoc m :block/order (db-order/gen-key nil))
m)) tx-data')
tx-data')
context (if (string? context)
(ldb/read-transit-str context)
context)
_ (when context (worker-state/set-context! context))
tx-meta' (cond-> tx-meta
(and (not (:whiteboard/transact? tx-meta))
(not (:rtc-download-graph? tx-meta))) ; delay writes to the disk
(assoc :skip-store? true)
true
(dissoc :insert-blocks?))]
(when-not (and (:create-today-journal? tx-meta)
(:today-journal-name tx-meta)
(seq tx-data')
(ldb/get-page @conn (:today-journal-name tx-meta))) ; today journal created already
;; (prn :debug :transact :tx-data tx-data' :tx-meta tx-meta')
(worker-util/profile "Worker db transact"
(ldb/transact! conn tx-data' tx-meta')))
nil)
(catch :default e
(prn :debug :error)
(let [tx-data (if (string? tx-data)
(ldb/read-transit-str tx-data)
tx-data)]
(js/console.error e)
(prn :debug :tx-data @conn tx-data))))))
(def-thread-api :general/get-initial-data
[repo]
(when-let [conn (worker-state/get-datascript-conn repo)]
(ldb/write-transit-str (sqlite-common-db/get-initial-data @conn))))
(def-thread-api :general/get-page-refs-count
[repo]
(when-let [conn (worker-state/get-datascript-conn repo)]
(ldb/write-transit-str (sqlite-common-db/get-page->refs-count @conn))))
(def-thread-api :general/close-db
[repo]
(close-db! repo))
(def-thread-api :general/reset-db
[repo db-transit]
(reset-db! repo db-transit))
(def-thread-api :general/unsafe-unlink-db
[repo]
(p/let [pool (<get-opfs-pool repo)
_ (close-db! repo)
_result (remove-vfs! pool)]
nil))
(def-thread-api :general/release-access-handles
[repo]
(when-let [^js pool (worker-state/get-opfs-pool repo)]
(.releaseAccessHandles pool)))
(def-thread-api :general/db-exists
[repo]
(<db-exists? repo))
(def-thread-api :general/export-db
[repo]
(when-let [^js db (worker-state/get-sqlite-conn repo :db)]
(.exec db "PRAGMA wal_checkpoint(2)"))
(<export-db-file repo))
(def-thread-api :general/import-db
[repo data]
(when-not (string/blank? repo)
(p/let [pool (<get-opfs-pool repo)]
(<import-db pool data))))
(def-thread-api :search/search-blocks
[repo q option]
(p/let [search-db (get-search-db repo)
conn (worker-state/get-datascript-conn repo)
result (search/search-blocks repo conn search-db q (bean/->clj option))]
(ldb/write-transit-str result)))
(def-thread-api :search/upsert-blocks
[repo blocks]
(p/let [db (get-search-db repo)]
(search/upsert-blocks! db blocks)
nil))
(def-thread-api :search/delete-blocks
[repo ids]
(p/let [db (get-search-db repo)]
(search/delete-blocks! db ids)
nil))
(def-thread-api :search/truncate-tables
[repo]
(p/let [db (get-search-db repo)]
(search/truncate-table! db)
nil))
(def-thread-api :search/build-blocks-indice
[repo]
(when-let [conn (worker-state/get-datascript-conn repo)]
(search/build-blocks-indice repo @conn)))
(def-thread-api :search/build-pages-indice
[_repo]
nil)
(def-thread-api :general/apply-outliner-ops
[repo ops-str opts-str]
(when-let [conn (worker-state/get-datascript-conn repo)]
(try
(worker-util/profile
"apply outliner ops"
(let [ops (ldb/read-transit-str ops-str)
opts (ldb/read-transit-str opts-str)
result (outliner-op/apply-ops! repo conn ops (worker-state/get-date-formatter repo) opts)]
(ldb/write-transit-str result)))
(catch :default e
(let [data (ex-data e)
{:keys [type payload]} (when (map? data) data)]
(case type
:notification
(worker-util/post-message type [(:message payload) (:type payload)])
(throw e)))))))
(def-thread-api :general/file-writes-finished?
[repo]
(let [conn (worker-state/get-datascript-conn repo)
writes @file/*writes]
;; Clean pages that have been deleted
(when conn
(swap! file/*writes (fn [writes]
(->> writes
(remove (fn [[_ pid]] (d/entity @conn pid)))
(into {})))))
(if (empty? writes)
true
(do
(prn "Unfinished file writes:" @file/*writes)
false))))
(def-thread-api :general/page-file-saved
[request-id _page-id]
(file/dissoc-request! request-id)
nil)
(def-thread-api :general/sync-app-state
[new-state-str]
(let [new-state (ldb/read-transit-str new-state-str)]
(worker-state/set-new-state! new-state)
nil))
(def-thread-api :general/sync-ui-state
[repo state-str]
(undo-redo/record-ui-state! repo state-str)
nil)
(def-thread-api :export/get-debug-datoms
[repo]
(when-let [db (worker-state/get-sqlite-conn repo)]
(let [conn (worker-state/get-datascript-conn repo)]
(ldb/write-transit-str (worker-export/get-debug-datoms conn db)))))
(def-thread-api :export/get-all-pages
[repo]
(when-let [conn (worker-state/get-datascript-conn repo)]
(ldb/write-transit-str (worker-export/get-all-pages repo @conn))))
(def-thread-api :export/get-all-page->content
[repo]
(when-let [conn (worker-state/get-datascript-conn repo)]
(ldb/write-transit-str (worker-export/get-all-page->content repo @conn))))
(def-thread-api :undo-redo/undo
[repo _page-block-uuid-str]
(when-let [conn (worker-state/get-datascript-conn repo)]
(ldb/write-transit-str (undo-redo/undo repo conn))))
(def-thread-api :undo-redo/redo
[repo _page-block-uuid-str]
(when-let [conn (worker-state/get-datascript-conn repo)]
(ldb/write-transit-str (undo-redo/redo repo conn))))
(def-thread-api :undo-redo/record-editor-info
[repo _page-block-uuid-str editor-info-str]
(undo-redo/record-editor-info! repo (ldb/read-transit-str editor-info-str))
nil)
(def-thread-api :general/validate-db
[repo]
(when-let [conn (worker-state/get-datascript-conn repo)]
(let [result (worker-db-validate/validate-db @conn)]
(db-migrate/fix-db! conn {:invalid-entity-ids (:invalid-entity-ids result)})
result)))
(def-thread-api :general/export-edn
[repo options]
(let [conn (worker-state/get-datascript-conn repo)]
(try
(->> (ldb/read-transit-str options)
(sqlite-export/build-export @conn)
ldb/write-transit-str)
(catch :default e
(js/console.error "export-edn error: " e)
(worker-util/post-message :notification
["An unexpected error occurred during export. See the javascript console for details."
:error])))))
(comment
(def-thread-api :general/dangerousRemoveAllDbs
[this repo]
(p/let [r (.listDB this)
dbs (ldb/read-transit-str r)]
(p/all (map #(.unsafeUnlinkDB this (:name %)) dbs)))))
#_:clj-kondo/ignore
(defclass DBWorker
(extends js/Object)
@@ -487,11 +802,6 @@
(close-other-dbs! repo))
(create-or-open-db! repo (dissoc opts :close-other-db?)))))
(getMaxTx
[_this repo]
(when-let [conn (worker-state/get-datascript-conn repo)]
(:max-tx @conn)))
(q [_this repo inputs-str]
"Datascript q"
(when-let [conn (worker-state/get-datascript-conn repo)]

View File

@@ -4,12 +4,13 @@
[cljs-time.coerce :as tc]
[cljs-time.core :as t]
[clojure.string :as string]
[frontend.common.missionary :as c.m]
[frontend.common.thread-api :refer [def-thread-api]]
[frontend.worker.crypt :as crypt]
[frontend.worker.rtc.client-op :as client-op]
[frontend.worker.rtc.ws-util :as ws-util]
[frontend.worker.state :as worker-state]
[goog.crypt.base64 :as base64]
[frontend.common.missionary :as c.m]
[logseq.db :as ldb]
[missionary.core :as m]
[promesa.core :as p]))
@@ -206,3 +207,30 @@
devices*)))]
(m/? (new-task--sync-encrypted-aes-key*
get-ws-create-task device-uuid->encrypted-aes-key graph-uuid))))))))))
(defn- with-write-transit-str
[task]
(p/chain
(js/Promise. task)
ldb/write-transit-str))
(def-thread-api :rtc/sync-current-graph-encrypted-aes-key
[token device-uuids-transit-str]
(with-write-transit-str
(new-task--sync-current-graph-encrypted-aes-key
token device-uuids-transit-str)))
(def-thread-api :device/list-devices
[token]
(with-write-transit-str
(new-task--list-devices token)))
(def-thread-api :device/remove-device-public-key
[token device-uuid key-name]
(with-write-transit-str
(new-task--remove-device-public-key token device-uuid key-name)))
(def-thread-api :device/remove-device
[token device-uuid]
(with-write-transit-str
(new-task--remove-device token device-uuid)))

View File

@@ -30,7 +30,7 @@
(swap! *writes assoc request-id page-id)
request-id))
(defn- dissoc-request!
(defn dissoc-request!
[request-id]
(when-let [page-id (get @*writes request-id)]
(let [old-page-request-ids (keep (fn [[r p]]

View File

@@ -3,6 +3,7 @@
(:require [clojure.data :as data]
[datascript.core :as d]
[frontend.common.missionary :as c.m]
[frontend.common.thread-api :refer [def-thread-api]]
[frontend.worker.device :as worker-device]
[frontend.worker.rtc.asset :as r.asset]
[frontend.worker.rtc.branch-graph :as r.branch-graph]
@@ -23,7 +24,8 @@
[logseq.db :as ldb]
[logseq.db.frontend.schema :as db-schema]
[malli.core :as ma]
[missionary.core :as m])
[missionary.core :as m]
[promesa.core :as p])
(:import [missionary Cancelled]))
(def ^:private rtc-state-schema
@@ -530,6 +532,93 @@
(def new-task--download-graph-from-s3 r.upload-download/new-task--download-graph-from-s3)
(defn- with-write-transit-str
[task]
(p/chain
(js/Promise. task)
ldb/write-transit-str))
(def-thread-api :rtc/start
[repo token]
(with-write-transit-str
(new-task--rtc-start repo token)))
(def-thread-api :rtc/stop
[]
(rtc-stop))
(def-thread-api :rtc/toggle-auto-push
[]
(rtc-toggle-auto-push))
(def-thread-api :rtc/toggle-remote-profile
[]
(rtc-toggle-remote-profile))
(def-thread-api :rtc/grant-graph-access
[token graph-uuid target-user-uuids-str target-user-emails-str]
(let [target-user-uuids (ldb/read-transit-str target-user-uuids-str)
target-user-emails (ldb/read-transit-str target-user-emails-str)]
(with-write-transit-str
(new-task--grant-access-to-others token graph-uuid
:target-user-uuids target-user-uuids
:target-user-emails target-user-emails))))
(def-thread-api :rtc/get-graphs
[token]
(with-write-transit-str
(new-task--get-graphs token)))
(def-thread-api :rtc/delete-graph
[token graph-uuid schema-version]
(with-write-transit-str
(new-task--delete-graph token graph-uuid schema-version)))
(def-thread-api :rtc/get-users-info
[token graph-uuid]
(with-write-transit-str
(new-task--get-users-info token graph-uuid)))
(def-thread-api :rtc/get-block-content-versions
[token graph-uuid block-uuid]
(with-write-transit-str
(new-task--get-block-content-versions token graph-uuid block-uuid)))
(def-thread-api :rtc/get-debug-state
[]
(with-write-transit-str
(new-task--get-debug-state)))
(def-thread-api :rtc/async-upload-graph
[repo token remote-graph-name]
(with-write-transit-str
(new-task--upload-graph token repo remote-graph-name)))
(def-thread-api :rtc/async-branch-graph
[repo token]
(with-write-transit-str
(new-task--branch-graph token repo)))
(def-thread-api :rtc/request-download-graph
[token graph-uuid schema-version]
(with-write-transit-str
(new-task--request-download-graph token graph-uuid schema-version)))
(def-thread-api :rtc/wait-download-graph-info-ready
[token download-info-uuid graph-uuid schema-version timeout-ms]
(with-write-transit-str
(new-task--wait-download-info-ready token download-info-uuid graph-uuid schema-version timeout-ms)))
(def-thread-api :rtc/download-graph-from-s3
[graph-uuid graph-name s3-url]
(with-write-transit-str
(new-task--download-graph-from-s3 graph-uuid graph-name s3-url)))
(def-thread-api :rtc/download-info-list
[ token graph-uuid schema-version]
(with-write-transit-str
(new-task--download-info-list token graph-uuid schema-version)))
;;; ================ API (ends) ================
;;; subscribe state ;;;