milestone 3 (part 2)

This commit is contained in:
rcmerci
2026-01-13 23:26:25 +08:00
parent e09b077494
commit dcff603fb6
11 changed files with 239 additions and 48 deletions

View File

@@ -103,21 +103,16 @@ Node runtime must not use OPFS or sqlite-wasm. Instead, use `better-sqlite3` as
- DONE 9. Update shared-service to no-op/single-client behavior in Node.
- DONE 10. Add Node build target in `shadow-cljs.edn` for db-worker.
- DONE 11. Implement Node daemon entrypoint and HTTP server.
- TODO 12. Add a Node client in frontend to call the daemon (HTTP + SSE/WS events).
- TODO 12. Add a Node client in frontend to call the daemon (HTTP + SSE events).
- DONE 12a. Switch Node sqlite implementation to `better-sqlite3` (no OPFS, no sqlite-wasm).
#### Acceptance Criteria
- Node platform adapter provides storage/kv/broadcast/websocket/crypto/timers and validates via `frontend.worker.platform`.
- Node sqlite adapter uses `better-sqlite3` and opens file-backed dbs in data-dir.
- Node build target compiles db-worker core without browser-only APIs.
- Node daemon starts via CLI and reports readiness; `GET /healthz` and `GET /readyz` return `200 OK`.
- `POST /v1/invoke` handles `list-db`, `create-or-open-db`, `q`, `transact` in a smoke test.
- steps:
1. list-db
2. create-or-open-db
  3. list-db, ensure the newly created db exists
4. transact
5. q
- Node client can invoke at least one RPC and receive one event (SSE or WS).
- `POST /v1/invoke` handles `list-db`, `create-or-open-db`, `q`, `transact` in a smoke test:
- test client script: `tmp_scripts/db-worker-smoke-test.clj`
- Node client can invoke at least one RPC and receive one event (SSE).
- `bb dev:lint-and-test` passes.
### Milestone 4: Validation
@@ -182,6 +177,7 @@ Event delivery options:
- `BroadcastChannel` and `navigator.locks` are browser-only; Node should use a simpler single-client mode.
- `Comlink` is browser-optimized; the Node daemon should use HTTP, not Comlink.
- sqlite-wasm must remain browser-only; Node uses `better-sqlite3` directly.
- Only db-graph is supported in the Node db-worker.
## Success Criteria
- Browser build continues to work with WebWorker + Comlink.

View File

@@ -147,7 +147,7 @@
"@tabler/icons-webfont": "^2.47.0",
"@tippyjs/react": "4.2.5",
"bignumber.js": "^9.0.2",
"better-sqlite3": "11.10.0",
"better-sqlite3": "12.6.0",
"chokidar": "3.5.1",
"chrono-node": "2.2.4",
"codemirror": "5.65.18",
@@ -192,6 +192,7 @@
"threads": "1.6.5",
"url": "^0.11.0",
"util": "^0.12.5",
"ws": "8.19.0",
"yargs-parser": "20.2.4"
},
"resolutions": {

View File

@@ -22,8 +22,11 @@
[]
(if (node-runtime?)
(or @node-db
(reset! node-db (node/start! (assoc (node/default-config)
:event-handler worker-handler/handle))))
(let [client (node/start! (assoc (node/default-config)
:event-handler worker-handler/handle))]
(reset! node-db client)
(reset! state/*db-worker (:wrapped-worker client))
client))
opfs-db))
(defn <list-db []

View File

@@ -104,10 +104,19 @@
(string/split-lines)
(->> (some #(when (string/starts-with? % "data: ")
(subs % 6)))))]
(let [{:keys [type payload]} (js->clj (js/JSON.parse line) :keywordize-keys true)]
(when (and type payload)
(handler (keyword type) (ldb/read-transit-str payload) wrapped-worker))))
(recur))))))
(let [{:keys [type payload]} (js->clj (js/JSON.parse line) :keywordize-keys true)
decoded (when (some? payload)
(try
(ldb/read-transit-str payload)
(catch :default _ payload)))
[event-type event-payload] (if (and (vector? decoded)
(= 2 (count decoded))
(keyword? (first decoded)))
[(first decoded) (second decoded)]
[(keyword type) decoded])]
(when (some? type)
(handler event-type event-payload wrapped-worker)))))
(recur)))))
(.on res "error" (fn [e]
(log/error :db-worker-node-events-error e)))))]
(.on req "error" (fn [e]

View File

@@ -33,7 +33,6 @@
[frontend.worker.shared-service :as shared-service]
[frontend.worker.state :as worker-state]
[frontend.worker.thread-atom]
[goog.object :as gobj]
[lambdaisland.glogi :as log]
[logseq.cli.common.mcp.tools :as cli-common-mcp-tools]
[logseq.common.util :as common-util]
@@ -92,10 +91,8 @@
(when-not @*publishing?
(or (get-storage-pool graph)
(p/let [storage (platform/storage (platform/current))
_ (log/info :db-worker/get-opfs-pool {:graph graph})
^js pool ((:install-opfs-pool storage) @*sqlite (worker-util/get-pool-name graph))]
(remember-storage-pool! graph pool)
(log/info :db-worker/get-opfs-pool-done {:graph graph})
pool))))
(defn- init-sqlite-module!
@@ -230,9 +227,6 @@
(.getCapacity pool))
_ (when (and (some? capacity) (zero? capacity))
(.unpauseVfs pool))
_ (log/info :db-worker/get-dbs-paths {:repo repo
:repo-dir (.-repoDir pool)
:capacity capacity})
db-path (resolve-db-path repo pool repo-path)
search-path (resolve-db-path repo pool (str "search" repo-path))
client-ops-path (resolve-db-path repo pool (str "client-ops-" repo-path))
@@ -345,8 +339,7 @@
(let [client-ops (rtc-migrate/migration-results=>client-ops migration-result)]
(client-op/add-ops! repo client-ops))))
(db-listener/listen-db-changes! repo (get @*datascript-conns repo))
(log/info :db-worker/create-or-open-done {:repo repo})))))
(db-listener/listen-db-changes! repo (get @*datascript-conns repo))))))
(defn- <list-all-dbs
[]
@@ -361,7 +354,6 @@
(def-thread-api :thread-api/list-db
[]
(log/info :thread-api/list-db nil)
(<list-all-dbs))
(defn- <db-exists?
@@ -407,17 +399,12 @@
(p/do!
(when close-other-db?
(close-other-dbs! repo))
(log/info :db-worker/start-db {:repo repo
:close-other-db? close-other-db?
:master? @shared-service/*master-client?})
(when @shared-service/*master-client?
(<create-or-open-db! repo (dissoc opts :close-other-db?)))
nil))
(def-thread-api :thread-api/create-or-open-db
[repo opts]
(log/info :thread-api/create-or-open-db {:repo repo
:opts (dissoc opts :config)})
(when-not (= repo (worker-state/get-current-repo)) ; graph switched
(reset! worker-state/*deleted-block-uuid->db-id {}))
(start-db! repo opts))
@@ -870,8 +857,7 @@
;; Don't wait for rtc started because the app will be slow to be ready
;; for users.
(when @worker-state/*rtc-ws-url
(rtc.core/new-task--rtc-start true))
(log/info :db-worker/on-become-master-done {:repo repo}))))
(rtc.core/new-task--rtc-start true)))))
(def broadcast-data-types
(set (map
@@ -944,7 +930,6 @@
{:client-id client-id}))
(get-in service [:status :ready])
;; wait for service ready
(log/info :DEBUG [k args])
(js-invoke (:proxy service) k args)))
(or
@@ -956,7 +941,6 @@
:else
;; ensure service is ready
(p/let [_ready-value (get-in service [:status :ready])]
(log/info :DEBUG [k args])
(js-invoke (:proxy service) k args)))))]))
(into {})
bean/->js))

View File

@@ -60,9 +60,16 @@
"--help" (recur remaining (assoc opts :help? true))
(recur remaining opts))))))
(defn- encode-event-payload
[payload]
(if (string? payload)
payload
(ldb/write-transit-str payload)))
(defn- handle-event!
[type payload]
(let [event (js/JSON.stringify (clj->js {:type type :payload payload}))
(let [event (js/JSON.stringify (clj->js {:type type
:payload (encode-event-payload payload)}))
message (str "data: " event "\n\n")]
(doseq [^js res @*sse-clients]
(try
@@ -94,7 +101,6 @@
{:method method
:elapsed-ms (- (js/Date.now) started-at)}))
10000)]
(log/info ::<invoke! [method direct-pass? args])
(-> (.remoteInvoke proxy method (boolean direct-pass?) args')
(p/finally (fn []
(js/clearTimeout timeout-id))))))
@@ -145,8 +151,6 @@
args' (if direct-pass?
args
(or argsTransit args))
_ (log/info :db-worker-node-http-invoke
{:method method :direct-pass? direct-pass?})
result (<invoke! proxy method direct-pass? args')]
(send-json! res 200 (if direct-pass?
{:ok true :result result}

View File

@@ -314,20 +314,17 @@
forward the data broadcast from the master client directly to the UI thread."
[service-name target on-become-master-handler broadcast-data-types {:keys [import?]}]
(clear-old-service!)
(log/info :shared-service/create-start {:service service-name
:node-runtime? (node-runtime?)
:import? import?})
(if (node-runtime?)
(p/do!
(reset! *master-client? true)
(reset! *client-id "node")
(log/info :shared-service/node-single-client {:service service-name
:client-id @*client-id})
(p/let [status {:ready (p/deferred)}]
(p/do!
(on-become-master-handler service-name)
(p/resolve! (:ready status)))
{:proxy target
{:proxy #js {"remoteInvoke"
(fn [args]
(<apply-target-f! target "remoteInvoke" args))}
:status status
:client-id @*client-id}))
(do

View File

@@ -0,0 +1,46 @@
(ns frontend.worker.shared-service-test
(:require [cljs.test :refer [deftest is async]]
[frontend.worker.platform :as platform]
[frontend.worker.shared-service :as shared-service]
[promesa.core :as p]))
(defn- test-platform
  "Build a minimal platform map for tests: only :env :runtime carries a
  meaningful value; every capability slot is an empty stub map."
  [runtime]
  (into {:env {:runtime runtime}}
        (map (fn [slot] [slot {}]))
        [:storage :kv :broadcast :websocket :crypto :timers :sqlite]))
;; Covers the Node (single-client) branch of `shared-service/<create-service`:
;; the returned :proxy must expose a "remoteInvoke" fn that forwards its
;; single args collection to the target's "remoteInvoke".
(deftest node-proxy-remote-invoke-applies-args
  (async done
    ;; Remember whatever platform was installed so it can be restored later;
    ;; `platform/current` may throw when nothing is set yet, hence the try.
    (let [prev-platform (try
                          (platform/current)
                          (catch :default _ nil))
          received (atom nil)
          ;; Stub target: records the argument list it was invoked with.
          target #js {"remoteInvoke" (fn [& args]
                                       (reset! received args)
                                       :ok)}]
      (platform/set-platform! (test-platform :node))
      (-> (p/let [service (shared-service/<create-service
                           "test-service"
                           target
                           (fn [_] (p/resolved nil))
                           #{}
                           {:import? false})
                  ;; args is the (method direct-pass? payload) triple the proxy
                  ;; is expected to spread onto the target's remoteInvoke.
                  args (list "thread-api/foo" true #js [1 2])
                  remote-invoke (aget (:proxy service) "remoteInvoke")]
            (is (fn? remote-invoke))
            (when (fn? remote-invoke)
              (remote-invoke args))
            (let [[method direct-pass? payload] @received]
              (is (= "thread-api/foo" method))
              (is (= true direct-pass?))
              (is (= [1 2] (js->clj payload)))))
          ;; NOTE(review): when no platform was previously installed, the test
          ;; platform is left in place after this test — confirm whether that
          ;; leak matters for sibling tests.
          (p/finally (fn []
                       (when prev-platform
                         (platform/set-platform! prev-platform))))
          (p/then (fn [] (done)))))))

View File

@@ -0,0 +1,93 @@
(require '[babashka.curl :as curl]
'[cheshire.core :as json]
'[cognitect.transit :as transit]
'[clojure.pprint :as pprint]
'[clojure.string :as string])
(def base-url (or (System/getenv "DB_WORKER_URL") "http://127.0.0.1:9101"))
(defn write-transit
  "Encode `v` as a transit+json string."
  [v]
  (let [baos (java.io.ByteArrayOutputStream.)]
    (transit/write (transit/writer baos :json) v)
    (.toString baos "UTF-8")))
(defn read-transit
  "Decode a transit+json string `s` back into Clojure data."
  [s]
  (-> (.getBytes s "UTF-8")
      (java.io.ByteArrayInputStream.)
      (transit/reader :json)
      (transit/read)))
(defn invoke
  "POST one RPC to the daemon's /v1/invoke endpoint.
  When `direct-pass?` is true, `args` travel as plain JSON and the raw
  :result is returned; otherwise args/result are transit-encoded on the
  wire. Throws ex-info with :status and :body on a non-2xx response."
  [method direct-pass? args]
  (let [payload (if direct-pass?
                  {:method method :directPass true :args args}
                  {:method method :directPass false :argsTransit (write-transit args)})
        ;; :throw false so we can report the failure body ourselves;
        ;; babashka.curl otherwise throws on non-2xx before the explicit
        ;; error branch below is ever reached.
        resp (curl/post (str base-url "/v1/invoke")
                        {:headers {"Content-Type" "application/json"}
                         :body (json/generate-string payload)
                         :throw false})]
    (if (<= 200 (:status resp) 299)
      ;; Parse only on success: error bodies may not be valid JSON.
      (let [body (json/parse-string (:body resp) true)]
        (if direct-pass?
          (:result body)
          (read-transit (:resultTransit body))))
      (throw (ex-info "db-worker invoke failed" {:status (:status resp) :body (:body resp)})))))
;; Unique suffix keeps repeated runs from colliding on the same graph name.
(def suffix (subs (str (random-uuid)) 0 8))
(def repo (str "logseq_db_smoke_" suffix))
;; Pre-generated entity uuids shared by the transact and the queries below.
(def page-uuid (random-uuid))
(def block-uuid (random-uuid))
(def now (long (System/currentTimeMillis)))

(println "== db-worker-node smoke test ==")
(println "Base URL:" base-url)
(println "Repo:" repo)

;; Step 1: baseline list of graphs before creating ours.
(println "Step 1/4: list-db (before)")
(println "Result:" (json/generate-string (invoke "thread-api/list-db" false [])
                                         {:pretty true}))

;; Step 2: create (or reopen) the smoke-test graph.
(println "Step 2/4: create-or-open-db")
(invoke "thread-api/create-or-open-db" false [repo {}])

;; Step 3: list again; the new repo should now appear.
;; NOTE(review): presence of `repo` in this result is printed but not
;; asserted — confirm whether the check should be mechanical.
(println "Step 3/4: list-db (after)")
(println "Result:" (json/generate-string (invoke "thread-api/list-db" false [])
                                         {:pretty true}))

;; Step 4: write one page + one child block, then query them back.
(println "Step 4/4: transact + q")
(invoke "thread-api/transact" false
        [repo
         [{:block/uuid page-uuid
           :block/title "Smoke Page"
           :block/name "smoke-page"
           :block/tags #{:logseq.class/Page}
           :block/created-at now
           :block/updated-at now}
          {:block/uuid block-uuid
           :block/title "Smoke Test"
           :block/page [:block/uuid page-uuid]
           :block/parent [:block/uuid page-uuid]
           :block/order "a0"
           :block/created-at now
           :block/updated-at now}]
         {}
         nil])

;; The transacted block must be findable by uuid, otherwise fail hard.
(let [query '[:find ?e
              :in $ ?uuid
              :where [?e :block/uuid ?uuid]]
      result (invoke "thread-api/q" false [repo [query block-uuid]])]
  (println "Query result:" result)
  (when (empty? result)
    (throw (ex-info "Query returned no results" {:uuid block-uuid}))))

;; Pull the page and its child blocks for a human-readable dump.
(let [page-query '[:find (pull ?e [:db/id :block/uuid :block/title :block/name :block/tags])
                   :in $ ?uuid
                   :where [?e :block/uuid ?uuid]]
      blocks-query '[:find (pull ?e [:db/id :block/uuid :block/title :block/order :block/parent])
                     :in $ ?page-uuid
                     :where [?page :block/uuid ?page-uuid]
                     [?e :block/page ?page]]
      page-result (invoke "thread-api/q" false [repo [page-query page-uuid]])
      blocks-result (invoke "thread-api/q" false [repo [blocks-query page-uuid]])]
  (println "Page + blocks (pretty):")
  (pprint/pprint {:page page-result
                  :blocks blocks-result}))

(println "Smoke test OK")

View File

@@ -0,0 +1,53 @@
#!/usr/bin/env bb
(require '[babashka.process :as process]
'[clojure.java.io :as io]
'[clojure.string :as string])
;; Daemon base URL; override via DB_WORKER_URL.
(def base-url (or (System/getenv "DB_WORKER_URL")
                  "http://127.0.0.1:9101"))
;; Optional bearer token for the daemon; nil when unset.
(def auth-token (System/getenv "DB_WORKER_AUTH_TOKEN"))
;; SSE endpoint, with any trailing slash on the base URL stripped first.
(def events-url (str (string/replace base-url #"/$" "") "/v1/events"))
(defn- open-sse-connection
  "Open a GET connection to `url` requesting a text/event-stream, adding a
  bearer Authorization header when `token` is non-blank. Returns the
  connected HttpURLConnection."
  [url token]
  (let [^java.net.HttpURLConnection conn
        (doto ^java.net.HttpURLConnection (.openConnection (java.net.URL. url))
          (.setRequestMethod "GET")
          (.setRequestProperty "Accept" "text/event-stream"))]
    (when (seq token)
      (.setRequestProperty conn "Authorization" (str "Bearer " token)))
    (doto conn
      (.setDoInput true)
      (.connect))))
(defn- wait-for-sse!
  "Block until one SSE `data:` line arrives on `conn` or `timeout-ms`
  elapses. Returns the raw data line, or throws when nothing arrived.
  Always disconnects `conn` and cancels the background reader."
  [^java.net.HttpURLConnection conn timeout-ms]
  (let [event-seen (promise)
        reader (future
                 (try
                   (with-open [rdr (io/reader (.getInputStream conn))]
                     ;; `some` stops consuming the stream after the first
                     ;; data line. The previous `(reduced nil)` inside a
                     ;; `doseq` was a no-op: doseq discards its body's
                     ;; values, so the loop never short-circuited.
                     (some (fn [line]
                             (when (string/starts-with? line "data:")
                               (deliver event-seen line)
                               true))
                           (line-seq rdr)))
                   (catch Exception _ nil)))]
    (try
      (let [result (deref event-seen timeout-ms ::timeout)]
        (when (= result ::timeout)
          (throw (ex-info "No SSE events captured" {:url events-url})))
        result)
      (finally
        (.disconnect conn)
        (future-cancel reader)))))
(defn- run-smoke-test!
  "Run the HTTP smoke-test script in a child bb process with inherited
  stdio; throws when the child exits non-zero."
  []
  (let [result (process/shell {:inherit true}
                              "bb" "tmp_scripts/db-worker-smoke-test.clj")
        exit-code (:exit result)]
    (when-not (zero? exit-code)
      (throw (ex-info "Smoke test failed" {:exit exit-code})))))
;; Manual/REPL entry point: open the SSE stream, run the HTTP smoke test to
;; generate traffic, then require that at least one event arrived.
;; NOTE(review): this is wrapped in `comment`, so executing the script
;; directly only loads the helpers and exits — confirm the manual-only
;; flow is intentional.
(comment
  (let [conn (open-sse-connection events-url auth-token)]
    (run-smoke-test!)
    (wait-for-sse! conn 2000)
    (println "SSE smoke test OK")))

View File

@@ -2211,10 +2211,10 @@ base@^0.11.1:
mixin-deep "^1.2.0"
pascalcase "^0.1.1"
better-sqlite3@11.10.0:
version "11.10.0"
resolved "https://registry.yarnpkg.com/better-sqlite3/-/better-sqlite3-11.10.0.tgz#2b1b14c5acd75a43fd84d12cc291ea98cef57d98"
integrity sha512-EwhOpyXiOEL/lKzHz9AW1msWFNzGc/z+LzeB3/jnFJpxu+th2yqvzsSWas1v9jgs9+xiXJcD5A8CJxAG2TaghQ==
better-sqlite3@12.6.0:
version "12.6.0"
resolved "https://registry.yarnpkg.com/better-sqlite3/-/better-sqlite3-12.6.0.tgz#77ada940ca35943bd677bcb2937615b7079604b7"
integrity sha512-FXI191x+D6UPWSze5IzZjhz+i9MK9nsuHsmTX9bXVl52k06AfZ2xql0lrgIUuzsMsJ7Vgl5kIptvDgBLIV3ZSQ==
dependencies:
bindings "^1.5.0"
prebuild-install "^7.1.1"
@@ -11049,6 +11049,11 @@ write-file-atomic@^3.0.3:
signal-exit "^3.0.2"
typedarray-to-buffer "^3.1.5"
ws@8.19.0:
version "8.19.0"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.19.0.tgz#ddc2bdfa5b9ad860204f5a72a4863a8895fd8c8b"
integrity sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==
ws@^7.4.6:
version "7.5.10"
resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.10.tgz#58b5c20dc281633f6c19113f39b349bd8bd558d9"