diff --git a/docs/agent-guide/task--db-worker-nodejs-compatible.md b/docs/agent-guide/task--db-worker-nodejs-compatible.md index d0a1b8828f..525776a578 100644 --- a/docs/agent-guide/task--db-worker-nodejs-compatible.md +++ b/docs/agent-guide/task--db-worker-nodejs-compatible.md @@ -103,21 +103,16 @@ Node runtime must not use OPFS or sqlite-wasm. Instead, use `better-sqlite3` as - DONE 9. Update shared-service to no-op/single-client behavior in Node. - DONE 10. Add Node build target in `shadow-cljs.edn` for db-worker. - DONE 11. Implement Node daemon entrypoint and HTTP server. -- TODO 12. Add a Node client in frontend to call the daemon (HTTP + SSE/WS events). +- TODO 12. Add a Node client in frontend to call the daemon (HTTP + SSE events). - DONE 12a. Switch Node sqlite implementation to `better-sqlite3` (no OPFS, no sqlite-wasm). #### Acceptance Criteria - Node platform adapter provides storage/kv/broadcast/websocket/crypto/timers and validates via `frontend.worker.platform`. - Node sqlite adapter uses `better-sqlite3` and opens file-backed dbs in data-dir. - Node build target compiles db-worker core without browser-only APIs. - Node daemon starts via CLI and reports readiness; `GET /healthz` and `GET /readyz` return `200 OK`. -- `POST /v1/invoke` handles `list-db`, `create-or-open-db`, `q`, `transact` in a smoke test. - - steps: - 1. list-db - 2. create-or-open-db - 3. list-db, ensure new created db existing - 4. transact - 5. q -- Node client can invoke at least one RPC and receive one event (SSE or WS). +- `POST /v1/invoke` handles `list-db`, `create-or-open-db`, `q`, `transact` in a smoke test: + - test client script: `tmp_scripts/db-worker-smoke-test.clj` +- Node client can invoke at least one RPC and receive one event (SSE). - `bb dev:lint-and-test` passes. ### Milestone 4: Validation @@ -182,6 +177,7 @@ Event delivery options: - `BroadcastChannel` and `navigator.locks` are browser-only; Node should use a simpler single-client mode. - `Comlink` is browser-optimized; the Node daemon should use HTTP, not Comlink. - sqlite-wasm must remain browser-only; Node uses `better-sqlite3` directly. +- only db-graph supported in Node db-worker ## Success Criteria - Browser build continues to work with WebWorker + Comlink. diff --git a/package.json b/package.json index 9d0ff6a7b4..2db28e0f45 100644 --- a/package.json +++ b/package.json @@ -147,7 +147,7 @@ "@tabler/icons-webfont": "^2.47.0", "@tippyjs/react": "4.2.5", "bignumber.js": "^9.0.2", - "better-sqlite3": "11.10.0", + "better-sqlite3": "12.6.0", "chokidar": "3.5.1", "chrono-node": "2.2.4", "codemirror": "5.65.18", @@ -192,6 +192,7 @@ "threads": "1.6.5", "url": "^0.11.0", "util": "^0.12.5", + "ws": "8.19.0", "yargs-parser": "20.2.4" }, "resolutions": { diff --git a/src/main/frontend/persist_db.cljs b/src/main/frontend/persist_db.cljs index 160fa986a7..91cecaf0ec 100644 --- a/src/main/frontend/persist_db.cljs +++ b/src/main/frontend/persist_db.cljs @@ -22,8 +22,11 @@ [] (if (node-runtime?) (or @node-db - (reset! node-db (node/start! (assoc (node/default-config) - :event-handler worker-handler/handle)))) + (let [client (node/start! (assoc (node/default-config) + :event-handler worker-handler/handle))] + (reset! node-db client) + (reset! state/*db-worker (:wrapped-worker client)) + client)) opfs-db)) (defn > (some #(when (string/starts-with? % "data: ") (subs % 6)))))] - (let [{:keys [type payload]} (js->clj (js/JSON.parse line) :keywordize-keys true)] - (when (and type payload) - (handler (keyword type) (ldb/read-transit-str payload) wrapped-worker)))) - (recur)))))) + (let [{:keys [type payload]} (js->clj (js/JSON.parse line) :keywordize-keys true) + decoded (when (some? payload) + (try + (ldb/read-transit-str payload) + (catch :default _ payload))) + [event-type event-payload] (if (and (vector? decoded) + (= 2 (count decoded)) + (keyword? (first decoded))) + [(first decoded) (second decoded)] + [(keyword type) decoded])] + (when (some? type) + (handler event-type event-payload wrapped-worker))))) + (recur))))) (.on res "error" (fn [e] (log/error :db-worker-node-events-error e)))))] (.on req "error" (fn [e] diff --git a/src/main/frontend/worker/db_core.cljs b/src/main/frontend/worker/db_core.cljs index d5070db126..561b028afe 100644 --- a/src/main/frontend/worker/db_core.cljs +++ b/src/main/frontend/worker/db_core.cljs @@ -33,7 +33,6 @@ [frontend.worker.shared-service :as shared-service] [frontend.worker.state :as worker-state] [frontend.worker.thread-atom] - [goog.object :as gobj] [lambdaisland.glogi :as log] [logseq.cli.common.mcp.tools :as cli-common-mcp-tools] [logseq.common.util :as common-util] @@ -92,10 +91,8 @@ (when-not @*publishing? (or (get-storage-pool graph) (p/let [storage (platform/storage (platform/current)) - _ (log/info :db-worker/get-opfs-pool {:graph graph}) ^js pool ((:install-opfs-pool storage) @*sqlite (worker-util/get-pool-name graph))] (remember-storage-pool! graph pool) - (log/info :db-worker/get-opfs-pool-done {:graph graph}) pool)))) (defn- init-sqlite-module! @@ -230,9 +227,6 @@ (.getCapacity pool)) _ (when (and (some? capacity) (zero? capacity)) (.unpauseVfs pool)) - _ (log/info :db-worker/get-dbs-paths {:repo repo - :repo-dir (.-repoDir pool) - :capacity capacity}) db-path (resolve-db-path repo pool repo-path) search-path (resolve-db-path repo pool (str "search" repo-path)) client-ops-path (resolve-db-path repo pool (str "client-ops-" repo-path)) @@ -345,8 +339,7 @@ (let [client-ops (rtc-migrate/migration-results=>client-ops migration-result)] (client-op/add-ops! repo client-ops)))) - (db-listener/listen-db-changes! repo (get @*datascript-conns repo)) - (log/info :db-worker/create-or-open-done {:repo repo}))))) + (db-listener/listen-db-changes! repo (get @*datascript-conns repo)))))) (defn- db-id {})) (start-db! repo opts)) @@ -870,8 +857,7 @@ ;; Don't wait for rtc started because the app will be slow to be ready ;; for users. (when @worker-state/*rtc-ws-url - (rtc.core/new-task--rtc-start true)) - (log/info :db-worker/on-become-master-done {:repo repo})))) + (rtc.core/new-task--rtc-start true))))) (def broadcast-data-types (set (map @@ -944,7 +930,6 @@ {:client-id client-id})) (get-in service [:status :ready]) ;; wait for service ready - (log/info :DEBUG [k args]) (js-invoke (:proxy service) k args))) (or @@ -956,7 +941,6 @@ :else ;; ensure service is ready (p/let [_ready-value (get-in service [:status :ready])] - (log/info :DEBUG [k args]) (js-invoke (:proxy service) k args)))))])) (into {}) bean/->js)) diff --git a/src/main/frontend/worker/db_worker_node.cljs b/src/main/frontend/worker/db_worker_node.cljs index 5b4c6a7fe5..8f12ac8fbd 100644 --- a/src/main/frontend/worker/db_worker_node.cljs +++ b/src/main/frontend/worker/db_worker_node.cljs @@ -60,9 +60,16 @@ "--help" (recur remaining (assoc opts :help? true)) (recur remaining opts)))))) +(defn- encode-event-payload + [payload] + (if (string? payload) + payload + (ldb/write-transit-str payload))) + (defn- handle-event! [type payload] - (let [event (js/JSON.stringify (clj->js {:type type :payload payload})) + (let [event (js/JSON.stringify (clj->js {:type type + :payload (encode-event-payload payload)})) message (str "data: " event "\n\n")] (doseq [^js res @*sse-clients] (try @@ -94,7 +101,6 @@ {:method method :elapsed-ms (- (js/Date.now) started-at)})) 10000)] - (log/info :: (.remoteInvoke proxy method (boolean direct-pass?) args') (p/finally (fn [] (js/clearTimeout timeout-id)))))) @@ -145,8 +151,6 @@ args' (if direct-pass? args (or argsTransit args)) - _ (log/info :db-worker-node-http-invoke - {:method method :direct-pass? direct-pass?}) result ( (p/let [service (shared-service/clj payload))))) + (p/finally (fn [] + (when prev-platform + (platform/set-platform! prev-platform)))) + (p/then (fn [] (done))))))) diff --git a/tmp_scripts/db-worker-smoke-test.clj b/tmp_scripts/db-worker-smoke-test.clj new file mode 100644 index 0000000000..0debde9741 --- /dev/null +++ b/tmp_scripts/db-worker-smoke-test.clj @@ -0,0 +1,93 @@ +(require '[babashka.curl :as curl] + '[cheshire.core :as json] + '[cognitect.transit :as transit] + '[clojure.pprint :as pprint] + '[clojure.string :as string]) + +(def base-url (or (System/getenv "DB_WORKER_URL") "http://127.0.0.1:9101")) + +(defn write-transit [v] + (let [out (java.io.ByteArrayOutputStream.) + w (transit/writer out :json)] + (transit/write w v) + (.toString out "UTF-8"))) + +(defn read-transit [s] + (let [in (java.io.ByteArrayInputStream. (.getBytes s "UTF-8")) + r (transit/reader in :json)] + (transit/read r))) + +(defn invoke [method direct-pass? args] + (let [payload (if direct-pass? + {:method method :directPass true :args args} + {:method method :directPass false :argsTransit (write-transit args)}) + resp (curl/post (str base-url "/v1/invoke") + {:headers {"Content-Type" "application/json"} + :body (json/generate-string payload)}) + body (json/parse-string (:body resp) true)] + (if (<= 200 (:status resp) 299) + (if direct-pass? + (:result body) + (read-transit (:resultTransit body))) + (throw (ex-info "db-worker invoke failed" {:status (:status resp) :body (:body resp)}))))) + +(def suffix (subs (str (random-uuid)) 0 8)) +(def repo (str "logseq_db_smoke_" suffix)) +(def page-uuid (random-uuid)) +(def block-uuid (random-uuid)) +(def now (long (System/currentTimeMillis))) + +(println "== db-worker-node smoke test ==") +(println "Base URL:" base-url) +(println "Repo:" repo) +(println "Step 1/4: list-db (before)") +(println "Result:" (json/generate-string (invoke "thread-api/list-db" false []) + {:pretty true})) + +(println "Step 2/4: create-or-open-db") +(invoke "thread-api/create-or-open-db" false [repo {}]) +(println "Step 3/4: list-db (after)") +(println "Result:" (json/generate-string (invoke "thread-api/list-db" false []) + {:pretty true})) + +(println "Step 4/4: transact + q") +(invoke "thread-api/transact" false + [repo + [{:block/uuid page-uuid + :block/title "Smoke Page" + :block/name "smoke-page" + :block/tags #{:logseq.class/Page} + :block/created-at now + :block/updated-at now} + {:block/uuid block-uuid + :block/title "Smoke Test" + :block/page [:block/uuid page-uuid] + :block/parent [:block/uuid page-uuid] + :block/order "a0" + :block/created-at now + :block/updated-at now}] + {} + nil]) + +(let [query '[:find ?e + :in $ ?uuid + :where [?e :block/uuid ?uuid]] + result (invoke "thread-api/q" false [repo [query block-uuid]])] + (println "Query result:" result) + (when (empty? result) + (throw (ex-info "Query returned no results" {:uuid block-uuid})))) + +(let [page-query '[:find (pull ?e [:db/id :block/uuid :block/title :block/name :block/tags]) + :in $ ?uuid + :where [?e :block/uuid ?uuid]] + blocks-query '[:find (pull ?e [:db/id :block/uuid :block/title :block/order :block/parent]) + :in $ ?page-uuid + :where [?page :block/uuid ?page-uuid] + [?e :block/page ?page]] + page-result (invoke "thread-api/q" false [repo [page-query page-uuid]]) + blocks-result (invoke "thread-api/q" false [repo [blocks-query page-uuid]])] + (println "Page + blocks (pretty):") + (pprint/pprint {:page page-result + :blocks blocks-result})) + +(println "Smoke test OK") diff --git a/tmp_scripts/db-worker-sse-smoke-test.clj b/tmp_scripts/db-worker-sse-smoke-test.clj new file mode 100644 index 0000000000..c343acc996 --- /dev/null +++ b/tmp_scripts/db-worker-sse-smoke-test.clj @@ -0,0 +1,53 @@ +#!/usr/bin/env bb +(require '[babashka.process :as process] + '[clojure.java.io :as io] + '[clojure.string :as string]) + +(def base-url (or (System/getenv "DB_WORKER_URL") + "http://127.0.0.1:9101")) +(def auth-token (System/getenv "DB_WORKER_AUTH_TOKEN")) +(def events-url (str (string/replace base-url #"/$" "") "/v1/events")) + +(defn- open-sse-connection + [url token] + (let [^java.net.HttpURLConnection conn (.openConnection (java.net.URL. url))] + (.setRequestMethod conn "GET") + (.setRequestProperty conn "Accept" "text/event-stream") + (when (seq token) + (.setRequestProperty conn "Authorization" (str "Bearer " token))) + (.setDoInput conn true) + (.connect conn) + conn)) + +(defn- wait-for-sse! + [^java.net.HttpURLConnection conn timeout-ms] + (let [event-seen (promise) + reader (future + (try + (with-open [rdr (io/reader (.getInputStream conn))] + (doseq [line (line-seq rdr)] + (when (string/starts-with? line "data:") + (deliver event-seen line) + (reduced nil)))) + (catch Exception _ nil)))] + (try + (let [result (deref event-seen timeout-ms ::timeout)] + (when (= result ::timeout) + (throw (ex-info "No SSE events captured" {:url events-url}))) + result) + (finally + (.disconnect conn) + (future-cancel reader))))) + +(defn- run-smoke-test! + [] + (let [{:keys [exit]} (process/shell {:inherit true} + "bb" "tmp_scripts/db-worker-smoke-test.clj")] + (when-not (zero? exit) + (throw (ex-info "Smoke test failed" {:exit exit}))))) + +(comment + (let [conn (open-sse-connection events-url auth-token)] + (run-smoke-test!) + (wait-for-sse! conn 2000) + (println "SSE smoke test OK"))) diff --git a/yarn.lock b/yarn.lock index 71e69f74d6..00598ab0ec 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2211,10 +2211,10 @@ base@^0.11.1: mixin-deep "^1.2.0" pascalcase "^0.1.1" -better-sqlite3@11.10.0: - version "11.10.0" - resolved "https://registry.yarnpkg.com/better-sqlite3/-/better-sqlite3-11.10.0.tgz#2b1b14c5acd75a43fd84d12cc291ea98cef57d98" - integrity sha512-EwhOpyXiOEL/lKzHz9AW1msWFNzGc/z+LzeB3/jnFJpxu+th2yqvzsSWas1v9jgs9+xiXJcD5A8CJxAG2TaghQ== +better-sqlite3@12.6.0: + version "12.6.0" + resolved "https://registry.yarnpkg.com/better-sqlite3/-/better-sqlite3-12.6.0.tgz#77ada940ca35943bd677bcb2937615b7079604b7" + integrity sha512-FXI191x+D6UPWSze5IzZjhz+i9MK9nsuHsmTX9bXVl52k06AfZ2xql0lrgIUuzsMsJ7Vgl5kIptvDgBLIV3ZSQ== dependencies: bindings "^1.5.0" prebuild-install "^7.1.1" @@ -11049,6 +11049,11 @@ write-file-atomic@^3.0.3: signal-exit "^3.0.2" typedarray-to-buffer "^3.1.5" +ws@8.19.0: + version "8.19.0" + resolved "https://registry.yarnpkg.com/ws/-/ws-8.19.0.tgz#ddc2bdfa5b9ad860204f5a72a4863a8895fd8c8b" + integrity sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg== + ws@^7.4.6: version "7.5.10" resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.10.tgz#58b5c20dc281633f6c19113f39b349bd8bd558d9"