milestone 3 (part 1)

This commit is contained in:
rcmerci
2026-01-13 19:57:51 +08:00
parent 7b2ef7fce2
commit bf43ca478c
14 changed files with 898 additions and 115 deletions

View File

@@ -60,6 +60,8 @@ frontend.ui/_emoji-init-data
frontend.worker.rtc.op-mem-layer/_sync-loop-canceler
;; Used by shadow.cljs
frontend.worker.db-worker/init
;; Used by shadow.cljs (node entrypoint)
frontend.worker.db-worker-node/main
;; Future use?
frontend.worker.rtc.hash/hash-blocks
;; Repl fn

6
bb.edn
View File

@@ -182,6 +182,12 @@
dev:gen-malli-kondo-config
logseq.tasks.dev/gen-malli-kondo-config
dev:db-worker-node
{:doc "Compile and start db-worker-node (pass-through args forwarded to node)"
:task (do
(shell "clojure" "-M:cljs" "compile" "db-worker-node")
(apply shell "node" "./static/db-worker-node.js" *command-line-args*))}
lint:dev
logseq.tasks.dev.lint/dev

View File

@@ -1,27 +0,0 @@
# db-worker browser-only API inventory
## Worker entry
- frontend.worker.db-worker
- importScripts
- js/self postMessage
- js/location.href
- navigator.storage (OPFS)
- Comlink expose/wrap/transfer
- setInterval
## Shared service
- frontend.worker.shared-service
- navigator.locks.request/query
- BroadcastChannel
- js/self postMessage
## RTC and crypto
- frontend.worker.rtc.ws
- WebSocket
- frontend.worker.rtc.crypt
- OPFS file access via frontend.common.file.opfs
- js/self location
## Worker util
- frontend.worker-common.util
- wfu/post-message (worker postMessage bridge)

View File

@@ -18,7 +18,7 @@ Make `frontend.worker.db-worker` and its dependencies run in both browser and No
- Implement `frontend.worker.platform.node` using `fs/promises`, `path`, `crypto`, and `ws`.
3. Abstract sqlite storage and VFS specifics.
- Browser: keep OPFS SAH pool implementation.
- Node: use file-backed sqlite storage (via sqlite-wasm Node VFS or a Node sqlite binding).
- Node: use file-backed sqlite storage via `better-sqlite3` (no OPFS, no sqlite-wasm).
- Route db path resolution through the platform adapter (data dir, per-repo paths).
4. Replace `importScripts` bootstrap with an explicit init entrypoint.
- Browser build still uses `:web-worker`, but entrypoint should call `init!` with a browser platform adapter.
@@ -41,10 +41,40 @@ Make `frontend.worker.db-worker` and its dependencies run in both browser and No
10. Build config changes.
- Add a Node build target in `shadow-cljs.edn` for db-worker (e.g. `:db-worker-node`).
- Ensure shared code compiles for `:node-script` or `:node-library` with the correct externs.
- Add `better-sqlite3` dependency and ensure Node target treats it as a native external.
11. Tests and fixtures.
- Add unit tests for platform adapters and storage abstraction.
- Add a minimal integration test that starts the Node daemon and exercises a small RPC call.
## Node.js sqlite Implementation (better-sqlite3)
Node runtime must not use OPFS or sqlite-wasm. Instead, use `better-sqlite3` as the direct file-backed sqlite engine.
### Concrete Refactor Items (File + Function + Summary)
- `src/main/frontend/worker/db_core.cljs` (`init-sqlite-module!`, `<get-opfs-pool`, `get-dbs`, `<export-db-file`, `<import-db`, `close-db-aux!`, `:thread-api/init`)
- Remove sqlite-wasm initialization and OPFS pool usage on Node.
- Route all sqlite open/close/exec/transaction operations through a platform-provided sqlite interface.
- Replace OPFS export/import with file-based export/import in Node.
- `src/main/frontend/worker/platform.cljs` (`validate-platform!`)
- Extend platform contract to include a `:sqlite` section (or expand `:storage`) defining: `open-db`, `close-db`, `exec`, `transaction`, `export-db`, `import-db`.
- `src/main/frontend/worker/platform/node.cljs` (`node-platform`, `install-opfs-pool`, `export-file`, `import-db`, `remove-vfs!`)
- Remove OPFS pool and sqlite-wasm Node VFS behavior.
- Implement `better-sqlite3` adapter: open db files, exec, transactions, close.
- Resolve db paths under data-dir and ensure directories exist.
- `src/main/frontend/worker/platform/browser.cljs` (`browser-platform`, `install-opfs-pool`)
- Keep sqlite-wasm + OPFS behavior but conform to the same `:sqlite` interface used by db-core.
- `src/main/frontend/worker/state.cljs` (`*opfs-pools`, `get-opfs-pool`)
- Ensure Node path does not write/read OPFS pools; keep OPFS state browser-only.
- `src/main/frontend/worker/db_worker_node.cljs` (`main`, `<init-worker!`)
- Initialize platform adapter before core init; mark readiness after `better-sqlite3` ready.
- `src/main/frontend/persist_db/node.cljs` (`start!`, `<invoke`)
- Keep API compatibility; update if any thread-api names or args change due to sqlite refactor.
- `shadow-cljs.edn` (`:db-worker-node`)
- Ensure `better-sqlite3` stays external; no bundling of sqlite-wasm artifacts for Node target.
- `package.json`
- Add `better-sqlite3` dependency; keep sqlite-wasm for browser path only.
- Tests
- Add Node integration smoke test with `better-sqlite3` backing: `list-db`, `create-or-open-db`, `q`, `transact`.
## Refactor Steps (Milestones + Status)
### Milestone 1: Architecture & Abstractions
@@ -69,16 +99,24 @@ Make `frontend.worker.db-worker` and its dependencies run in both browser and No
- `bb dev:lint-and-test` passes.
### Milestone 3: Node Path & Daemon
- TODO 8. Implement `frontend.worker.platform.node` in single-client mode (no locks or BroadcastChannel).
- TODO 9. Update shared-service to no-op/single-client behavior in Node.
- TODO 10. Add Node build target in `shadow-cljs.edn` for db-worker.
- TODO 11. Implement Node daemon entrypoint and HTTP server.
- DONE 8. Implement `frontend.worker.platform.node` in single-client mode (no locks or BroadcastChannel).
- DONE 9. Update shared-service to no-op/single-client behavior in Node.
- DONE 10. Add Node build target in `shadow-cljs.edn` for db-worker.
- DONE 11. Implement Node daemon entrypoint and HTTP server.
- TODO 12. Add a Node client in frontend to call the daemon (HTTP + SSE/WS events).
- DONE 12a. Switch Node sqlite implementation to `better-sqlite3` (no OPFS, no sqlite-wasm).
#### Acceptance Criteria
- Node platform adapter provides storage/kv/broadcast/websocket/crypto/timers and validates via `frontend.worker.platform`.
- Node sqlite adapter uses `better-sqlite3` and opens file-backed dbs in data-dir.
- Node build target compiles db-worker core without browser-only APIs.
- Node daemon starts via CLI and reports readiness; `GET /healthz` and `GET /readyz` return `200 OK`.
- `POST /v1/invoke` handles `list-db`, `create-or-open-db`, `q`, `transact` in a smoke test.
- steps:
1. list-db
2. create-or-open-db
3. list-db, ensure new created db existing
4. transact
5. q
- Node client can invoke at least one RPC and receive one event (SSE or WS).
- `bb dev:lint-and-test` passes.
@@ -98,7 +136,7 @@ The db-worker should be runnable as a standalone process for Node.js environment
- Provide a CLI entry (example: `bin/logseq-db-worker` or `node dist/db-worker-node.js`).
- CLI flags (suggested):
- `--host` (default `127.0.0.1`)
- `--port` (default `8080`)
- `--port` (default `9101`)
- `--data-dir` (path for sqlite files, required or default to `~/.logseq/db-worker`)
- `--repo` (optional: auto-open a repo on boot)
- `--rtc-ws-url` (optional)
@@ -143,6 +181,7 @@ Event delivery options:
- OPFS and IndexedDB do not exist in Node; file-backed storage and a Node KV store are required.
- `BroadcastChannel` and `navigator.locks` are browser-only; Node should use a simpler single-client mode.
- `Comlink` is browser-optimized; the Node daemon should use HTTP, not Comlink.
- sqlite-wasm must remain browser-only; Node uses `better-sqlite3` directly.
## Success Criteria
- Browser build continues to work with WebWorker + Comlink.

View File

@@ -151,6 +151,7 @@
"@tabler/icons-webfont": "^2.47.0",
"@tippyjs/react": "4.2.5",
"bignumber.js": "^9.0.2",
"better-sqlite3": "11.10.0",
"chokidar": "3.5.1",
"chrono-node": "2.2.4",
"codemirror": "5.65.18",

View File

@@ -87,6 +87,16 @@
shadow.remote.runtime.cljs.browser]
:loader-mode :eval}}
:db-worker-node {:target :node-script
:output-to "static/db-worker-node.js"
:main frontend.worker.db-worker-node/main
:compiler-options {:infer-externs :auto
:source-map true
:externs ["datascript/externs.js"
"externs.js"]
:warnings {:fn-deprecated false
:redef false}}}
:inference-worker {:target :browser
:module-loader true
:js-options {:js-provider :external

View File

@@ -2,17 +2,29 @@
"Backend of DB based graph"
(:require [frontend.db :as db]
[frontend.persist-db.browser :as browser]
[frontend.persist-db.node :as node]
[frontend.persist-db.protocol :as protocol]
[frontend.handler.worker :as worker-handler]
[frontend.state :as state]
[frontend.util :as util]
[promesa.core :as p]))
(defonce opfs-db (browser/->InBrowser))
(defonce node-db (atom nil))
(defn- node-runtime?
[]
(and (exists? js/process)
(not (exists? js/window))))
(defn- get-impl
"Get the actual implementation of PersistentDB"
[]
opfs-db)
(if (node-runtime?)
(or @node-db
(reset! node-db (node/start! (assoc (node/default-config)
:event-handler worker-handler/handle))))
opfs-db))
(defn <list-db []
(protocol/<list-db (get-impl)))

View File

@@ -0,0 +1,162 @@
(ns frontend.persist-db.node
"Node client for db-worker daemon."
(:require [clojure.string :as string]
[goog.object :as gobj]
[frontend.persist-db.protocol :as protocol]
[lambdaisland.glogi :as log]
[logseq.db :as ldb]
[promesa.core :as p]))
(defn- node-runtime?
[]
(and (exists? js/process)
(not (exists? js/window))))
(defn- require-node
[module-name]
(when (node-runtime?)
(js/require module-name)))
(defonce ^:private http-module (delay (require-node "http")))
(defonce ^:private https-module (delay (require-node "https")))
(defn- request-module
[^js url]
(let [https? (= "https:" (.-protocol url))
module (if https? @https-module @http-module)]
(or module
(throw (ex-info "Node runtime required for db-worker client" {:protocol (.-protocol url)})))))
(defn- base-headers
[auth-token]
(cond-> {"Content-Type" "application/json"
"Accept" "application/json"}
(seq auth-token)
(assoc "Authorization" (str "Bearer " auth-token))))
(defn- <request
[method url headers body]
(p/create
(fn [resolve reject]
(let [req (.request (request-module url)
#js {:method method
:hostname (.-hostname url)
:port (or (.-port url) (if (= "https:" (.-protocol url)) 443 80))
:path (str (.-pathname url) (.-search url))
:headers (clj->js headers)}
(fn [^js res]
(let [chunks (array)]
(.on res "data" (fn [chunk] (.push chunks chunk)))
(.on res "end" (fn []
(let [buf (js/Buffer.concat chunks)]
(resolve {:status (.-statusCode res)
:body (.toString buf "utf8")}))))
(.on res "error" reject))))]
(.on req "error" reject)
(when body
(.write req body))
(.end req)))))
(defn- <invoke
[{:keys [base-url auth-token]} method direct-pass? args]
(let [url (js/URL. (str (string/replace base-url #"/$" "") "/v1/invoke"))
payload (js/JSON.stringify
(clj->js (if direct-pass?
{:method method
:directPass true
:args args}
{:method method
:directPass false
:argsTransit (ldb/write-transit-str args)})))]
(p/let [{:keys [status body]} (<request "POST" url (base-headers auth-token) payload)]
(if (<= 200 status 299)
(let [{:keys [result resultTransit]} (js->clj (js/JSON.parse body) :keywordize-keys true)]
(if direct-pass?
result
(ldb/read-transit-str resultTransit)))
(do
(log/error :db-worker-node-invoke-failed {:status status :body body})
(throw (ex-info "db-worker-node invoke failed" {:status status :body body})))))))
(defn- connect-events!
[{:keys [base-url auth-token event-handler]} wrapped-worker]
(let [url (js/URL. (str (string/replace base-url #"/$" "") "/v1/events"))
headers (base-headers auth-token)
buffer (atom "")
handler (or event-handler (fn [_type _payload _wrapped-worker] nil))]
(let [req (.request
(request-module url)
#js {:method "GET"
:hostname (.-hostname url)
:port (or (.-port url) (if (= "https:" (.-protocol url)) 443 80))
:path (str (.-pathname url) (.-search url))
:headers (clj->js headers)}
(fn [^js res]
(.on res "data"
(fn [chunk]
(swap! buffer str (.toString chunk "utf8"))
(loop []
(when-let [idx (string/index-of @buffer "\n\n")]
(let [event-text (subs @buffer 0 idx)
rest-text (subs @buffer (+ idx 2))]
(reset! buffer rest-text)
(when-let [line (some-> event-text
(string/split-lines)
(->> (some #(when (string/starts-with? % "data: ")
(subs % 6)))))]
(let [{:keys [type payload]} (js->clj (js/JSON.parse line) :keywordize-keys true)]
(when (and type payload)
(handler (keyword type) (ldb/read-transit-str payload) wrapped-worker))))
(recur))))))
(.on res "error" (fn [e]
(log/error :db-worker-node-events-error e)))))]
(.on req "error" (fn [e]
(log/error :db-worker-node-events-error e)))
(.end req))
nil))
(defrecord InNode [client wrapped-worker]
protocol/PersistentDB
(<new [_this repo opts]
(<invoke client "thread-api/create-or-open-db" false [repo opts]))
(<list-db [_this]
(<invoke client "thread-api/list-db" false []))
(<unsafe-delete [_this repo]
(<invoke client "thread-api/unsafe-unlink-db" false [repo]))
(<release-access-handles [_this repo]
(<invoke client "thread-api/release-access-handles" false [repo]))
(<fetch-initial-data [_this repo opts]
(p/let [_ (<invoke client "thread-api/create-or-open-db" false [repo opts])]
(<invoke client "thread-api/get-initial-data" false [repo opts])))
(<export-db [_this repo opts]
(p/let [data (<invoke client "thread-api/export-db" true [repo])]
(if (:return-data? opts)
data
data)))
(<import-db [_this repo data]
(<invoke client "thread-api/import-db" true [repo data])))
(defn create-client
[{:keys [base-url auth-token]}]
{:base-url base-url
:auth-token auth-token})
(defn default-config
[]
{:base-url (or (gobj/get (.-env js/process) "LOGSEQ_DB_WORKER_URL")
"http://127.0.0.1:9101")
:auth-token (gobj/get (.-env js/process) "LOGSEQ_DB_WORKER_AUTH_TOKEN")})
(defn start!
[{:keys [base-url auth-token event-handler]}]
(let [client (create-client {:base-url base-url :auth-token auth-token})
wrapped-worker (fn [qkw direct-pass? & args]
(<invoke client (str (namespace qkw) "/" (name qkw)) direct-pass? args))]
(connect-events! (assoc client :event-handler event-handler) wrapped-worker)
(->InNode client wrapped-worker)))

View File

@@ -1,7 +1,6 @@
(ns frontend.worker.db-core
"Core db-worker logic without host-specific bootstrap."
(:require ["@sqlite.org/sqlite-wasm" :default sqlite3InitModule]
[cljs-bean.core :as bean]
(:require [cljs-bean.core :as bean]
[cljs.cache :as cache]
[clojure.edn :as edn]
[clojure.set]
@@ -34,6 +33,7 @@
[frontend.worker.shared-service :as shared-service]
[frontend.worker.state :as worker-state]
[frontend.worker.thread-atom]
[goog.object :as gobj]
[lambdaisland.glogi :as log]
[logseq.cli.common.mcp.tools :as cli-common-mcp-tools]
[logseq.common.util :as common-util]
@@ -63,29 +63,60 @@
(defonce *client-ops-conns worker-state/*client-ops-conns)
(defonce *opfs-pools worker-state/*opfs-pools)
(defonce *publishing? (atom false))
(defonce ^:private *node-pools (atom {}))
(defn- node-runtime?
[]
(= :node (platform/env-flag (platform/current) :runtime)))
(defn- get-storage-pool
[graph]
(if (node-runtime?)
(get @*node-pools graph)
(worker-state/get-opfs-pool graph)))
(defn- remember-storage-pool!
[graph pool]
(if (node-runtime?)
(swap! *node-pools assoc graph pool)
(swap! *opfs-pools assoc graph pool)))
(defn- forget-storage-pool!
[graph]
(if (node-runtime?)
(swap! *node-pools dissoc graph)
(swap! *opfs-pools dissoc graph)))
(defn- <get-opfs-pool
[graph]
(when-not @*publishing?
(or (worker-state/get-opfs-pool graph)
(or (get-storage-pool graph)
(p/let [storage (platform/storage (platform/current))
_ (log/info :db-worker/get-opfs-pool {:graph graph})
^js pool ((:install-opfs-pool storage) @*sqlite (worker-util/get-pool-name graph))]
(swap! *opfs-pools assoc graph pool)
(remember-storage-pool! graph pool)
(log/info :db-worker/get-opfs-pool-done {:graph graph})
pool))))
(defn- init-sqlite-module!
[]
(when-not @*sqlite
(p/let [publishing? (platform/env-flag (platform/current) :publishing?)
sqlite (sqlite3InitModule (clj->js {:print #(log/info :init-sqlite-module! %)
:printErr #(log/error :init-sqlite-module! %)}))]
sqlite (platform/sqlite-init! (platform/current))]
(reset! *publishing? publishing?)
(reset! *sqlite sqlite)
(reset! *sqlite (or sqlite ::sqlite-initialized))
nil)))
(def repo-path "/db.sqlite")
(def debug-log-path "/debug-log/db.sqlite")
(defn- resolve-db-path
[repo pool path]
(let [storage (platform/storage (platform/current))]
(if-let [f (:resolve-db-path storage)]
(f repo pool path)
path)))
(defn- <export-db-file
([repo]
(<export-db-file repo repo-path))
@@ -156,9 +187,10 @@
(when search (.close search))
(when client-ops (.close client-ops))
(when debug-log (.close debug-log))
(when-let [^js pool (worker-state/get-opfs-pool repo)]
(.pauseVfs pool))
(swap! *opfs-pools dissoc repo))
(when-let [^js pool (get-storage-pool repo)]
(when (exists? (.-pauseVfs pool))
(.pauseVfs pool)))
(forget-storage-pool! repo))
(defn- close-other-dbs!
[repo]
@@ -184,18 +216,47 @@
(defn- get-dbs
[repo]
(if @*publishing?
(p/let [^object DB (.-DB ^object (.-oo1 ^object @*sqlite))
db (new DB "/db.sqlite" "c")
search-db (new DB "/search-db.sqlite" "c")]
(p/let [db (platform/sqlite-open (platform/current)
{:sqlite @*sqlite
:path "/db.sqlite"
:mode "c"})
search-db (platform/sqlite-open (platform/current)
{:sqlite @*sqlite
:path "/search-db.sqlite"
:mode "c"})]
[db search-db])
(p/let [^js pool (<get-opfs-pool repo)
capacity (.getCapacity pool)
_ (when (zero? capacity) ; file handle already releases since pool will be initialized only once
capacity (when (exists? (.-getCapacity pool))
(.getCapacity pool))
_ (when (and (some? capacity) (zero? capacity))
(.unpauseVfs pool))
db (new (.-OpfsSAHPoolDb pool) repo-path)
search-db (new (.-OpfsSAHPoolDb pool) (str "search" repo-path))
client-ops-db (new (.-OpfsSAHPoolDb pool) (str "client-ops-" repo-path))
debug-log-db (new (.-OpfsSAHPoolDb pool) (str "debug-log" repo-path))]
_ (log/info :db-worker/get-dbs-paths {:repo repo
:repo-dir (.-repoDir pool)
:capacity capacity})
db-path (resolve-db-path repo pool repo-path)
search-path (resolve-db-path repo pool (str "search" repo-path))
client-ops-path (resolve-db-path repo pool (str "client-ops-" repo-path))
debug-log-db-path (resolve-db-path repo pool (str "debug-log" repo-path))
_ (log/info :db-worker/get-dbs-open {:repo repo :db-path db-path})
db (platform/sqlite-open (platform/current)
{:sqlite @*sqlite
:pool pool
:path db-path})
_ (log/info :db-worker/get-dbs-open {:repo repo :search-path search-path})
search-db (platform/sqlite-open (platform/current)
{:sqlite @*sqlite
:pool pool
:path search-path})
_ (log/info :db-worker/get-dbs-open {:repo repo :client-ops-path client-ops-path})
client-ops-db (platform/sqlite-open (platform/current)
{:sqlite @*sqlite
:pool pool
:path client-ops-path})
_ (log/info :db-worker/get-dbs-open {:repo repo :debug-log-db-path debug-log-db-path})
debug-log-db (platform/sqlite-open (platform/current)
{:sqlite @*sqlite
:pool pool
:path debug-log-db-path})]
[db search-db client-ops-db debug-log-db])))
(defn- enable-sqlite-wal-mode!
@@ -222,6 +283,9 @@
(defn- <create-or-open-db!
[repo {:keys [config datoms] :as opts}]
(when-not (worker-state/get-sqlite-conn repo)
(log/info :db-worker/create-or-open-start {:repo repo
:has-datoms? (boolean datoms)
:import-type (:import-type opts)})
(p/let [[db search-db client-ops-db debug-log-db :as dbs] (get-dbs repo)
storage (new-sqlite-storage db)
client-ops-storage (when-not @*publishing?
@@ -281,7 +345,8 @@
(let [client-ops (rtc-migrate/migration-results=>client-ops migration-result)]
(client-op/add-ops! repo client-ops))))
(db-listener/listen-db-changes! repo (get @*datascript-conns repo))))))
(db-listener/listen-db-changes! repo (get @*datascript-conns repo))
(log/info :db-worker/create-or-open-done {:repo repo})))))
(defn- <list-all-dbs
[]
@@ -296,6 +361,7 @@
(def-thread-api :thread-api/list-db
[]
(log/info :thread-api/list-db nil)
(<list-all-dbs))
(defn- <db-exists?
@@ -341,12 +407,17 @@
(p/do!
(when close-other-db?
(close-other-dbs! repo))
(log/info :db-worker/start-db {:repo repo
:close-other-db? close-other-db?
:master? @shared-service/*master-client?})
(when @shared-service/*master-client?
(<create-or-open-db! repo (dissoc opts :close-other-db?)))
nil))
(def-thread-api :thread-api/create-or-open-db
[repo opts]
(log/info :thread-api/create-or-open-db {:repo repo
:opts (dissoc opts :config)})
(when-not (= repo (worker-state/get-current-repo)) ; graph switched
(reset! worker-state/*deleted-block-uuid->db-id {}))
(start-db! repo opts))
@@ -505,8 +576,9 @@
(def-thread-api :thread-api/release-access-handles
[repo]
(when-let [^js pool (worker-state/get-opfs-pool repo)]
(.pauseVfs pool)
(when-let [^js pool (get-storage-pool repo)]
(when (exists? (.-pauseVfs pool))
(.pauseVfs pool))
nil))
(def-thread-api :thread-api/db-exists
@@ -789,6 +861,8 @@
[repo start-opts]
(js/Promise.
(m/sp
(log/info :db-worker/on-become-master-start {:repo repo
:import-type (:import-type start-opts)})
(c.m/<? (init-sqlite-module!))
(when-not (:import-type start-opts)
(c.m/<? (start-db! repo start-opts))
@@ -796,7 +870,8 @@
;; Don't wait for rtc started because the app will be slow to be ready
;; for users.
(when @worker-state/*rtc-ws-url
(rtc.core/new-task--rtc-start true)))))
(rtc.core/new-task--rtc-start true))
(log/info :db-worker/on-become-master-done {:repo repo}))))
(def broadcast-data-types
(set (map
@@ -815,14 +890,18 @@
(when graph
(if (= graph prev-graph)
service
(p/let [service (shared-service/<create-service graph
(bean/->js fns)
#(on-become-master graph start-opts)
broadcast-data-types
{:import? (:import-type? start-opts)})]
(do
(log/info :db-worker/init-service {:graph graph
:prev-graph prev-graph
:import-type (:import-type start-opts)})
(p/let [service (shared-service/<create-service graph
(bean/->js fns)
#(on-become-master graph start-opts)
broadcast-data-types
{:import? (:import-type? start-opts)})]
(assert (p/promise? (get-in service [:status :ready])))
(reset! *service [graph service])
service)))))
service))))))
(defn- notify-invalid-data
[{:keys [tx-meta]} errors]
@@ -852,7 +931,12 @@
(= :thread-api/create-or-open-db method-k)
;; because shared-service operates at the graph level,
;; creating a new database or switching to another one requires re-initializing the service.
(let [[graph opts] (ldb/read-transit-str (last args))]
(let [payload (last args)
payload' (cond
(string? payload) (ldb/read-transit-str payload)
(array? payload) (js->clj payload :keywordize-keys true)
:else payload)
[graph opts] payload']
(p/let [service (<init-service! graph opts)
client-id (:client-id service)]
(when client-id
@@ -861,6 +945,7 @@
{:client-id client-id}))
(get-in service [:status :ready])
;; wait for service ready
(log/info :DEBUG [k args])
(js-invoke (:proxy service) k args)))
(or
@@ -872,6 +957,7 @@
:else
;; ensure service is ready
(p/let [_ready-value (get-in service [:status :ready])]
(log/info :DEBUG [k args])
(js-invoke (:proxy service) k args)))))]))
(into {})
bean/->js))

View File

@@ -0,0 +1,207 @@
(ns frontend.worker.db-worker-node
"Node.js daemon entrypoint for db-worker."
(:require ["http" :as http]
[clojure.string :as string]
[frontend.worker.db-core :as db-core]
[frontend.worker.platform.node :as platform-node]
[frontend.worker.state :as worker-state]
[goog.object :as gobj]
[lambdaisland.glogi :as log]
[lambdaisland.glogi.console :as glogi-console]
[logseq.db :as ldb]
[promesa.core :as p]))
(defonce ^:private *ready? (atom false))
(defonce ^:private *sse-clients (atom #{}))
(defn- send-json!
[^js res status payload]
(.writeHead res status #js {"Content-Type" "application/json"})
(.end res (js/JSON.stringify (clj->js payload))))
(defn- send-text!
[^js res status text]
(.writeHead res status #js {"Content-Type" "text/plain"})
(.end res text))
(defn- <read-body
[^js req]
(p/create
(fn [resolve reject]
(let [chunks (array)]
(.on req "data" (fn [chunk] (.push chunks chunk)))
(.on req "end" (fn []
(let [buf (js/Buffer.concat chunks)]
(resolve (.toString buf "utf8")))))
(.on req "error" reject)))))
(defn- authorized?
[^js req auth-token]
(if (string/blank? auth-token)
true
(let [auth (gobj/get (.-headers req) "authorization")]
(= auth (str "Bearer " auth-token)))))
(defn- parse-args
[argv]
(loop [args (vec (drop 2 argv))
opts {}]
(if (empty? args)
opts
(let [[flag value & remaining] args]
(case flag
"--host" (recur remaining (assoc opts :host value))
"--port" (recur remaining (assoc opts :port (js/parseInt value 10)))
"--data-dir" (recur remaining (assoc opts :data-dir value))
"--repo" (recur remaining (assoc opts :repo value))
"--rtc-ws-url" (recur remaining (assoc opts :rtc-ws-url value))
"--log-level" (recur remaining (assoc opts :log-level value))
"--auth-token" (recur remaining (assoc opts :auth-token value))
"--help" (recur remaining (assoc opts :help? true))
(recur remaining opts))))))
(defn- handle-event!
[type payload]
(let [event (js/JSON.stringify (clj->js {:type type :payload payload}))
message (str "data: " event "\n\n")]
(doseq [^js res @*sse-clients]
(try
(.write res message)
(catch :default e
(log/error :sse-write-failed e))))))
(defn- sse-handler
[^js req ^js res]
(.writeHead res 200 #js {"Content-Type" "text/event-stream"
"Cache-Control" "no-cache"
"Connection" "keep-alive"})
(.write res "\n")
(swap! *sse-clients conj res)
(.on req "close" (fn []
(swap! *sse-clients disj res))))
(defn- <invoke!
[proxy method direct-pass? args]
(let [args' (if direct-pass?
(into-array (or args []))
(if (string? args)
args
(ldb/write-transit-str args)))
started-at (js/Date.now)
timeout-id (js/setTimeout
(fn []
(log/warn :db-worker-node-invoke-timeout
{:method method
:elapsed-ms (- (js/Date.now) started-at)}))
10000)]
(log/info ::<invoke! [method direct-pass? args])
(-> (.remoteInvoke proxy method (boolean direct-pass?) args')
(p/finally (fn []
(js/clearTimeout timeout-id))))))
(defn- <init-worker!
[proxy rtc-ws-url]
(<invoke! proxy "thread-api/init" true #js [rtc-ws-url]))
(defn- <maybe-open-repo!
[proxy repo]
(when (seq repo)
(<invoke! proxy "thread-api/create-or-open-db" false [repo {}])))
(defn- set-main-thread-stub!
[]
(reset! worker-state/*main-thread
(fn [qkw _direct-pass? _args]
(p/rejected (ex-info "main-thread is not available in db-worker-node"
{:method qkw})))))
(defn- make-server
[proxy {:keys [auth-token]}]
(http/createServer
(fn [^js req ^js res]
(let [url (.-url req)
method (.-method req)]
(cond
(= url "/healthz")
(send-text! res 200 "ok")
(= url "/readyz")
(if @*ready?
(send-text! res 200 "ok")
(send-text! res 503 "not-ready"))
(= url "/v1/events")
(if (authorized? req auth-token)
(sse-handler req res)
(send-text! res 401 "unauthorized"))
(= url "/v1/invoke")
(if (authorized? req auth-token)
(if (= method "POST")
(-> (p/let [body (<read-body req)
payload (js/JSON.parse body)
{:keys [method directPass argsTransit args]} (js->clj payload :keywordize-keys true)
direct-pass? (boolean directPass)
args' (if direct-pass?
args
(or argsTransit args))
_ (log/info :db-worker-node-http-invoke
{:method method :direct-pass? direct-pass?})
result (<invoke! proxy method direct-pass? args')]
(send-json! res 200 (if direct-pass?
{:ok true :result result}
{:ok true :resultTransit result})))
(p/catch (fn [e]
(log/error :db-worker-node-http-invoke-failed e)
(send-json! res 500 {:ok false
:error (if (instance? js/Error e)
{:message (.-message e)
:stack (.-stack e)}
e)}))))
(send-text! res 405 "method-not-allowed"))
(send-text! res 401 "unauthorized"))
:else
(send-text! res 404 "not-found"))))))
(defn- show-help!
[]
(println "db-worker-node options:")
(println " --host <host> (default 127.0.0.1)")
(println " --port <port> (default 9101)")
(println " --data-dir <path> (default ~/.logseq/db-worker)")
(println " --repo <name> (optional)")
(println " --rtc-ws-url <url> (optional)")
(println " --log-level <level> (default info)")
(println " --auth-token <token> (optional)"))
(defn main
[]
(let [{:keys [host port data-dir repo rtc-ws-url log-level auth-token help?]}
(parse-args (.-argv js/process))
host (or host "127.0.0.1")
port (or port 9101)
log-level (keyword (or log-level "info"))]
(when help?
(show-help!)
(.exit js/process 0))
(glogi-console/install!)
(log/set-levels {:glogi/root log-level})
(set-main-thread-stub!)
(p/let [platform (platform-node/node-platform {:data-dir data-dir
:event-fn handle-event!})
proxy (db-core/init-core! platform)
_ (<init-worker! proxy (or rtc-ws-url ""))]
(reset! *ready? true)
(p/do!
(<maybe-open-repo! proxy repo)
(let [server (make-server proxy {:auth-token auth-token})]
(.listen server port host (fn []
(log/info :db-worker-node-ready {:host host :port port})))
(let [shutdown (fn []
(reset! *ready? false)
(.close server (fn []
(log/info :db-worker-node-stopped nil)
(.exit js/process 0))))]
(.on js/process "SIGINT" shutdown)
(.on js/process "SIGTERM" shutdown)))))))

View File

@@ -2,7 +2,7 @@
"Platform adapter contract for db-worker runtimes.")
(def ^:private required-sections
[:env :storage :kv :broadcast :websocket :crypto :timers])
[:env :storage :kv :broadcast :websocket :crypto :timers :sqlite])
(defonce ^:private *platform (atom nil))
@@ -62,6 +62,17 @@
(f url)
(throw (ex-info "platform websocket/connect missing" {:url url}))))
(defn sqlite-init!
[platform]
(when-let [f (get-in platform [:sqlite :init!])]
(f)))
(defn sqlite-open
[platform opts]
(if-let [f (get-in platform [:sqlite :open-db])]
(f opts)
(throw (ex-info "platform sqlite/open-db missing" {:opts opts}))))
(defn post-message!
[platform type payload]
(when-let [f (get-in platform [:broadcast :post-message!])]

View File

@@ -1,6 +1,7 @@
(ns frontend.worker.platform.browser
"Browser platform adapter for db-worker."
(:require ["/frontend/idbkv" :as idb-keyval]
["@sqlite.org/sqlite-wasm" :default sqlite3InitModule]
["comlink" :as Comlink]
[clojure.string :as string]
[frontend.common.file.opfs :as opfs]
@@ -89,6 +90,18 @@
[url]
(js/WebSocket. url))
(defn- init-sqlite!
[]
(sqlite3InitModule (clj->js {:print #(log/info :init-sqlite-module! %)
:printErr #(log/error :init-sqlite-module! %)})))
(defn- open-sqlite-db
[{:keys [sqlite pool path mode]}]
(if pool
(new (.-OpfsSAHPoolDb pool) path)
(let [^js DB (.-DB ^js (.-oo1 ^js sqlite))]
(new DB path (or mode "c")))))
(defn browser-platform
[]
{:env {:publishing? (string/includes? (.. js/location -href) "publishing=true")
@@ -96,6 +109,7 @@
:storage {:install-opfs-pool install-opfs-pool
:list-graphs list-graphs
:db-exists? db-exists?
:resolve-db-path (fn [_repo _pool path] path)
:export-file export-file
:import-db import-db
:remove-vfs! remove-vfs!
@@ -107,5 +121,10 @@
:set! kv-set!}
:broadcast {:post-message! worker-util/post-message}
:websocket {:connect websocket-connect}
:sqlite {:init! init-sqlite!
:open-db open-sqlite-db
:close-db (fn [db] (.close db))
:exec (fn [db sql-or-opts] (.exec db sql-or-opts))
:transaction (fn [db f] (.transaction db f))}
:crypto {}
:timers {:set-interval! (fn [f ms] (js/setInterval f ms))}})

View File

@@ -0,0 +1,225 @@
(ns frontend.worker.platform.node
"Node.js platform adapter for db-worker."
(:require ["better-sqlite3" :as sqlite3]
["fs/promises" :as fs]
["os" :as os]
["path" :as node-path]
["ws" :as ws]
[clojure.string :as string]
[frontend.worker-common.util :as worker-util]
[goog.object :as gobj]
[lambdaisland.glogi :as log]
[promesa.core :as p]))
(def ^:private sqlite
(or (aget sqlite3 "default") sqlite3))
(defn- expand-home
[path]
(if (string/starts-with? path "~")
(node-path/join (.homedir os) (subs path 1))
path))
(defn- ensure-dir!
[dir]
(fs/mkdir dir #js {:recursive true}))
(defn- strip-leading-slash
[path]
(string/replace-first path #"^/" ""))
(defn- repo-dir
[data-dir pool-name]
(node-path/join data-dir (str "." pool-name)))
(defn- pool-path
[^js pool path]
(node-path/join (.-repoDir pool) (strip-leading-slash path)))
(defn- path-under-data-dir
[data-dir path]
(if (node-path/isAbsolute path)
path
(node-path/join data-dir path)))
(defn- ->buffer
[data]
(cond
(instance? js/Buffer data) data
(instance? js/ArrayBuffer data) (js/Buffer.from data)
(and (some? data) (some? (.-buffer data))) (js/Buffer.from (.-buffer data))
:else (js/Buffer.from (str data))))
(defn- list-graphs
[data-dir]
(let [dir? #(and % (.isDirectory %))
db-dir-prefix ".logseq-pool-"]
(p/let [entries (fs/readdir data-dir #js {:withFileTypes true})
db-dirs (->> entries
(filter dir?)
(filter (fn [dirent]
(string/starts-with? (.-name dirent) db-dir-prefix))))
graph-names (map (fn [dirent]
(-> (.-name dirent)
(string/replace-first db-dir-prefix "")
;; TODO: DRY
(string/replace "+3A+" ":")
(string/replace "++" "/")))
db-dirs)]
(vec graph-names))))
(defn- db-exists?
[data-dir graph]
(p/let [pool-name (worker-util/get-pool-name graph)
db-path (node-path/join (repo-dir data-dir pool-name) "db.sqlite")]
(-> (fs/stat db-path)
(p/then (fn [_] true))
(p/catch (fn [_] false)))))
(defn- exec-sql
[db opts-or-sql]
(if (string? opts-or-sql)
(.exec db opts-or-sql)
(let [sql (gobj/get opts-or-sql "sql")
bind (gobj/get opts-or-sql "bind")
row-mode (gobj/get opts-or-sql "rowMode")
bind' (if (and bind (object? bind))
(let [out (js-obj)]
(doseq [key (js/Object.keys bind)]
(let [value (gobj/get bind key)
normalized (cond
(string/starts-with? key "$") (subs key 1)
(string/starts-with? key ":") (subs key 1)
:else key)]
(gobj/set out normalized value)))
out)
bind)
stmt (.prepare db sql)]
(if (= row-mode "array")
(do
(.raw stmt)
(if (some? bind')
(.all stmt bind')
(.all stmt)))
(do
(if (some? bind')
(.run stmt bind')
(.run stmt))
nil)))))
(defn- wrap-better-db
[db]
(let [wrapper (js-obj)]
(set! (.-exec wrapper) (fn [opts-or-sql] (exec-sql db opts-or-sql)))
(set! (.-transaction wrapper)
(fn [f]
(let [run-tx (.transaction db (fn [] (f wrapper)))]
(run-tx))))
(set! (.-close wrapper) (fn [] (.close db)))
wrapper))
(defn- open-sqlite-db
[{:keys [path]}]
(p/let [_ (ensure-dir! (node-path/dirname path))]
(wrap-better-db (new sqlite path))))
(defn- install-opfs-pool
[data-dir _sqlite pool-name]
(p/let [repo-dir-path (repo-dir data-dir pool-name)
_ (ensure-dir! repo-dir-path)
pool (js-obj)]
(set! (.-repoDir pool) repo-dir-path)
(set! (.-getCapacity pool) (fn [] 1))
(set! (.-pauseVfs pool) (fn [] nil))
(set! (.-unpauseVfs pool) (fn [] nil))
pool))
(defn- export-file
[pool path]
(fs/readFile (pool-path pool path)))
(defn- import-db
[pool path data]
(let [full-path (pool-path pool path)
dir (node-path/dirname full-path)]
(p/let [_ (ensure-dir! dir)]
(fs/writeFile full-path (->buffer data)))))
(defn- remove-vfs!
[pool]
(when pool
(fs/rm (.-repoDir pool) #js {:recursive true :force true})))
(defn- read-text!
[data-dir path]
(fs/readFile (path-under-data-dir data-dir path) "utf8"))
(defn- write-text!
[data-dir path text]
(let [full-path (path-under-data-dir data-dir path)
dir (node-path/dirname full-path)]
(p/let [_ (ensure-dir! dir)]
(fs/writeFile full-path text "utf8"))))
(defn- websocket-connect
[url]
(ws. url))
(defn- kv-store
[data-dir]
(let [kv-path (node-path/join data-dir "kv-store.json")
state (atom nil)
<load! (fn []
(if (some? @state)
(p/resolved @state)
(-> (fs/readFile kv-path "utf8")
(p/then (fn [contents]
(let [data (js/JSON.parse contents)]
(reset! state (js->clj data :keywordize-keys false))
@state)))
(p/catch (fn [_]
(reset! state {})
@state)))))]
{:get (fn [k]
(p/let [_ (<load!)]
(get @state k)))
:set! (fn [k value]
(p/let [_ (<load!)
_ (swap! state assoc k value)
payload (js/JSON.stringify (clj->js @state))]
(fs/writeFile kv-path payload "utf8")))}))
(defn node-platform
[{:keys [data-dir event-fn]}]
(let [data-dir (expand-home (or data-dir "~/.logseq/db-worker"))
kv (kv-store data-dir)]
(p/do!
(ensure-dir! data-dir)
(log/info :db-worker-node-platform {:data-dir data-dir})
{:env {:publishing? false
:runtime :node
:data-dir data-dir}
:storage {:install-opfs-pool (fn [sqlite-module pool-name]
(install-opfs-pool data-dir sqlite-module pool-name))
:list-graphs (fn [] (list-graphs data-dir))
:db-exists? (fn [graph] (db-exists? data-dir graph))
:resolve-db-path (fn [_repo pool path]
(pool-path pool path))
:export-file export-file
:import-db import-db
:remove-vfs! remove-vfs!
:read-text! (fn [path] (read-text! data-dir path))
:write-text! (fn [path text] (write-text! data-dir path text))}
:kv {:get (:get kv)
:set! (:set! kv)}
:broadcast {:post-message! (fn [type payload]
(when event-fn
(event-fn type payload)))}
:websocket {:connect websocket-connect}
:sqlite {:init! (fn [] nil)
:open-db open-sqlite-db
:close-db (fn [db] (.close db))
:exec (fn [db sql-or-opts] (.exec db sql-or-opts))
:transaction (fn [db f] (.transaction db f))}
:crypto {}
:timers {:set-interval! (fn [f ms] (js/setInterval f ms))}})))

View File

@@ -1,6 +1,7 @@
(ns frontend.worker.shared-service
"This allows multiple workers to share some resources (e.g. db access)"
(:require [cljs-bean.core :as bean]
[frontend.worker.platform :as platform]
[goog.object :as gobj]
[lambdaisland.glogi :as log]
[logseq.common.util :as common-util]
@@ -35,6 +36,12 @@
(defonce *client-id (atom nil))
(defonce *master-client-lock (atom nil))
(defn- node-runtime?
[]
(try
(= :node (platform/env-flag (platform/current) :runtime))
(catch :default _ false)))
(defn- next-request-id
[]
(vswap! *current-request-id inc))
@@ -307,60 +314,83 @@
forward the data broadcast from the master client directly to the UI thread."
[service-name target on-become-master-handler broadcast-data-types {:keys [import?]}]
(clear-old-service!)
(when import? (reset! *master-client? true))
(p/let [broadcast-data-types (set broadcast-data-types)
status {:ready (p/deferred)}
common-channel (ensure-common-channel service-name)
client-id (<ensure-client-id)
<check-master-slave-fn!
(fn []
(<check-master-or-slave-client!
service-name
#(<on-become-master
client-id service-name common-channel target
on-become-master-handler (:ready status))
#(<on-become-slave
client-id service-name common-channel broadcast-data-types (:ready status))))]
(<check-master-slave-fn!)
(log/info :shared-service/create-start {:service service-name
:node-runtime? (node-runtime?)
:import? import?})
(if (node-runtime?)
(p/do!
(reset! *master-client? true)
(reset! *client-id "node")
(log/info :shared-service/node-single-client {:service service-name
:client-id @*client-id})
(p/let [status {:ready (p/deferred)}]
(p/do!
(on-become-master-handler service-name)
(p/resolve! (:ready status)))
{:proxy target
:status status
:client-id @*client-id}))
(do
(when import? (reset! *master-client? true))
(p/let [broadcast-data-types (set broadcast-data-types)
status {:ready (p/deferred)}
common-channel (ensure-common-channel service-name)
client-id (<ensure-client-id)
<check-master-slave-fn!
(fn []
(<check-master-or-slave-client!
service-name
#(<on-become-master
client-id service-name common-channel target
on-become-master-handler (:ready status))
#(<on-become-slave
client-id service-name common-channel broadcast-data-types (:ready status))))]
(<check-master-slave-fn!)
(log/info :shared-service/create-client {:service service-name
:client-id client-id
:import? import?})
(add-watch *master-re-check-trigger :check-master
(fn [_ _ _ new-value]
(when (= new-value :re-check)
(p/do!
(p/delay 100) ; why need delay here?
(<check-master-slave-fn!)))))
(add-watch *master-re-check-trigger :check-master
(fn [_ _ _ new-value]
(when (= new-value :re-check)
(p/do!
(p/delay 100) ; why need delay here?
(<check-master-slave-fn!)))))
{:proxy (js/Proxy. target
#js {:get (fn [target method]
(if (= "remoteInvoke" method)
(fn [args]
(cond
@*master-client?
(<apply-target-f! target method args)
{:proxy (js/Proxy. target
#js {:get (fn [target method]
(if (= "remoteInvoke" method)
(fn [args]
(cond
@*master-client?
(<apply-target-f! target method args)
:else
(let [request-id (next-request-id)
client-channel (ensure-client-channel client-id service-name)]
(p/create
(fn [resolve-fn reject-fn]
(vswap! *requests-in-flight assoc request-id {:method method
:args args
:resolve-fn resolve-fn
:reject-fn reject-fn})
(.postMessage client-channel (bean/->js
{:id request-id
:type "request"
:method method
:args args})))))))
(log/error :invalid-invoke-method method)))})
:status status
:client-id client-id}))
:else
(let [request-id (next-request-id)
client-channel (ensure-client-channel client-id service-name)]
(p/create
(fn [resolve-fn reject-fn]
(vswap! *requests-in-flight assoc request-id {:method method
:args args
:resolve-fn resolve-fn
:reject-fn reject-fn})
(.postMessage client-channel (bean/->js
{:id request-id
:type "request"
:method method
:args args})))))))
(log/error :invalid-invoke-method method)))})
:status status
:client-id client-id}))))
(defn broadcast-to-clients!
[type' data]
(let [transit-payload (ldb/write-transit-str [type' data])]
(when (exists? js/self) (.postMessage js/self transit-payload))
(when-let [common-channel @*common-channel]
(let [str-type' (common-util/keyword->string type')]
(.postMessage common-channel #js {:type str-type'
:data transit-payload})))))
(if (node-runtime?)
(platform/post-message! (platform/current) type' transit-payload)
(do
(when (exists? js/self) (.postMessage js/self transit-payload))
(when-let [common-channel @*common-channel]
(let [str-type' (common-util/keyword->string type')]
(.postMessage common-channel #js {:type str-type'
:data transit-payload})))))))