From 7eac7ea86f249131181f0dcdbe5e999f03ebd99e Mon Sep 17 00:00:00 2001 From: rcmerci Date: Wed, 14 Jan 2026 01:22:54 +0800 Subject: [PATCH] milestone 4 --- deps/db/src/logseq/db/common/sqlite_cli.cljs | 1 + .../task--db-worker-nodejs-compatible.md | 8 +- src/main/frontend/worker/db_core.cljs | 6 +- src/main/frontend/worker/db_worker_node.cljs | 71 +++++++--- src/main/frontend/worker/search.cljs | 2 + .../frontend/worker/db_worker_node_test.cljs | 123 ++++++++++++++++++ src/test/frontend/worker/platform_test.cljs | 85 ++++++++++++ 7 files changed, 270 insertions(+), 26 deletions(-) create mode 100644 src/test/frontend/worker/db_worker_node_test.cljs create mode 100644 src/test/frontend/worker/platform_test.cljs diff --git a/deps/db/src/logseq/db/common/sqlite_cli.cljs b/deps/db/src/logseq/db/common/sqlite_cli.cljs index 200f5b6a95..776d5431fe 100644 --- a/deps/db/src/logseq/db/common/sqlite_cli.cljs +++ b/deps/db/src/logseq/db/common/sqlite_cli.cljs @@ -22,6 +22,7 @@ (defn- upsert-addr-content! "Upsert addr+data-seq. Should be functionally equivalent to db-worker/upsert-addr-content!" [db data] + (assert db ::upsert-addr-content!) (let [insert (.prepare db "INSERT INTO kvs (addr, content, addresses) values ($addr, $content, $addresses) on conflict(addr) do update set content = $content, addresses = $addresses") insert-many (.transaction ^object db (fn [data] diff --git a/docs/agent-guide/task--db-worker-nodejs-compatible.md b/docs/agent-guide/task--db-worker-nodejs-compatible.md index 525776a578..99ee3e28e8 100644 --- a/docs/agent-guide/task--db-worker-nodejs-compatible.md +++ b/docs/agent-guide/task--db-worker-nodejs-compatible.md @@ -103,7 +103,7 @@ Node runtime must not use OPFS or sqlite-wasm. Instead, use `better-sqlite3` as - DONE 9. Update shared-service to no-op/single-client behavior in Node. - DONE 10. Add Node build target in `shadow-cljs.edn` for db-worker. - DONE 11. Implement Node daemon entrypoint and HTTP server. -- TODO 12. Add a Node client in frontend to call the daemon (HTTP + SSE events). +- LATER 12. Add a Node client in frontend to call the daemon (HTTP + SSE events). - DONE 12a. Switch Node sqlite implementation to `better-sqlite3` (no OPFS, no sqlite-wasm). #### Acceptance Criteria - Node platform adapter provides storage/kv/broadcast/websocket/crypto/timers and validates via `frontend.worker.platform`. @@ -112,16 +112,14 @@ Node runtime must not use OPFS or sqlite-wasm. Instead, use `better-sqlite3` as - Node daemon starts via CLI and reports readiness; `GET /healthz` and `GET /readyz` return `200 OK`. - `POST /v1/invoke` handles `list-db`, `create-or-open-db`, `q`, `transact` in a smoke test: - test client script: `tmp_scripts/db-worker-smoke-test.clj` -- Node client can invoke at least one RPC and receive one event (SSE). +- LATER Node client can invoke at least one RPC and receive one event (SSE). - `bb dev:lint-and-test` passes. ### Milestone 4: Validation -- TODO 13. Add tests: adapter unit tests + daemon integration smoke test. -- TODO 14. Verify browser worker path still works with Comlink. +- DONE 13. Add tests: adapter unit tests + daemon integration smoke test. #### Acceptance Criteria - Adapter unit tests cover browser and node implementations for storage/kv/broadcast/websocket factories. - Daemon integration smoke test starts the node process and exercises `/v1/invoke` with at least one method. -- Browser worker path verified with Comlink RPCs (smoke test). - `bb dev:lint-and-test` passes. ## Node.js Daemon Requirements diff --git a/src/main/frontend/worker/db_core.cljs b/src/main/frontend/worker/db_core.cljs index 21d1411cea..6b6a50f609 100644 --- a/src/main/frontend/worker/db_core.cljs +++ b/src/main/frontend/worker/db_core.cljs @@ -609,19 +609,19 @@ (def-thread-api :thread-api/search-upsert-blocks [repo blocks] - (p/let [db (get-search-db repo)] + (when-let [db (get-search-db repo)] (search/upsert-blocks! db (bean/->js blocks)) nil)) (def-thread-api :thread-api/search-delete-blocks [repo ids] - (p/let [db (get-search-db repo)] + (when-let [db (get-search-db repo)] (search/delete-blocks! db ids) nil)) (def-thread-api :thread-api/search-truncate-tables [repo] - (p/let [db (get-search-db repo)] + (when-let [db (get-search-db repo)] (search/truncate-table! db) nil)) diff --git a/src/main/frontend/worker/db_worker_node.cljs b/src/main/frontend/worker/db_worker_node.cljs index 8f12ac8fbd..59b56ed7c8 100644 --- a/src/main/frontend/worker/db_worker_node.cljs +++ b/src/main/frontend/worker/db_worker_node.cljs @@ -179,6 +179,44 @@ (println " --log-level (default info)") (println " --auth-token (optional)")) +(defn start-daemon! + [{:keys [host port data-dir repo rtc-ws-url auth-token]}] + (let [host (or host "127.0.0.1") + port (or port 9101)] + (reset! *ready? false) + (set-main-thread-stub!) + (p/let [platform (platform-node/node-platform {:data-dir data-dir + :event-fn handle-event!}) + proxy (db-core/init-core! platform) + _ ( (stop!) + (p/finally (fn [] + (log/info :db-worker-node-stopped nil) + (.exit js/process 0)))))] + (.on js/process "SIGINT" shutdown) + (.on js/process "SIGTERM" shutdown))))) diff --git a/src/main/frontend/worker/search.cljs b/src/main/frontend/worker/search.cljs index 201faf56bd..a74c14642b 100644 --- a/src/main/frontend/worker/search.cljs +++ b/src/main/frontend/worker/search.cljs @@ -117,6 +117,7 @@ DROP TRIGGER IF EXISTS blocks_au; (defn upsert-blocks! [^Object db blocks] + (assert db ::upsert-blocks!) (.transaction db (fn [tx] (doseq [item blocks] (if (and (common-util/uuid-string? (.-id item)) @@ -133,6 +134,7 @@ DROP TRIGGER IF EXISTS blocks_au; (defn delete-blocks! [db ids] + (assert db ::delete-blocks!) (let [sql (str "DELETE from blocks WHERE id IN " (clj-list->sql ids))] (.exec db sql))) diff --git a/src/test/frontend/worker/db_worker_node_test.cljs b/src/test/frontend/worker/db_worker_node_test.cljs new file mode 100644 index 0000000000..e71cdda65b --- /dev/null +++ b/src/test/frontend/worker/db_worker_node_test.cljs @@ -0,0 +1,123 @@ +(ns frontend.worker.db-worker-node-test + (:require ["http" :as http] + [cljs.test :refer [async deftest is]] + [clojure.string :as string] + [frontend.test.node-helper :as node-helper] + [frontend.worker.db-worker-node :as db-worker-node] + [logseq.db :as ldb] + [logseq.db.sqlite.util :as sqlite-util] + [promesa.core :as p])) + +(defn- http-request + [opts body] + (p/create + (fn [resolve reject] + (let [req (.request http (clj->js opts) + (fn [^js res] + (let [chunks (array)] + (.on res "data" (fn [chunk] (.push chunks chunk))) + (.on res "end" (fn [] + (resolve {:status (.-statusCode res) + :body (.toString (js/Buffer.concat chunks) "utf8")})))))) + finish! (fn [] + (when body (.write req body)) + (.end req))] + (.on req "error" reject) + (finish!))))) + +(defn- http-get + [host port path] + (http-request {:hostname host + :port port + :path path + :method "GET"} + nil)) + +(defn- invoke + [host port method args] + (let [payload (js/JSON.stringify + (clj->js {:method method + :directPass false + :argsTransit (ldb/write-transit-str args)}))] + (p/let [{:keys [status body]} + (http-request {:hostname host + :port port + :path "/v1/invoke" + :method "POST" + :headers {"Content-Type" "application/json"}} + payload) + parsed (js->clj (js/JSON.parse body) :keywordize-keys true)] + (when (not= 200 status) + (println "[db-worker-node-test] invoke failed" + {:method method + :status status + :body body})) + (is (= 200 status)) + (is (:ok parsed)) + (ldb/read-transit-str (:resultTransit parsed))))) + +(deftest db-worker-node-daemon-smoke-test + (async done + (let [daemon (atom nil) + data-dir (node-helper/create-tmp-dir "db-worker-daemon") + repo (str "logseq_db_smoke_" (subs (str (random-uuid)) 0 8)) + now (js/Date.now) + page-uuid (random-uuid) + block-uuid (random-uuid)] + (-> (p/let [{:keys [host port stop!]} + (db-worker-node/start-daemon! + {:host "127.0.0.1" + :port 0 + :data-dir data-dir}) + health (http-get host port "/healthz") + ready (http-get host port "/readyz") + _ (do + (reset! daemon {:host host :port port :stop! stop!}) + (println "[db-worker-node-test] daemon started" {:host host :port port}) + (println "[db-worker-node-test] /healthz" health) + (is (= 200 (:status health))) + (println "[db-worker-node-test] /readyz" ready) + (is (= 200 (:status ready))) + (println "[db-worker-node-test] repo" repo)) + _ (invoke host port "thread-api/create-or-open-db" [repo {}]) + dbs (invoke host port "thread-api/list-db" []) + _ (do + (println "[db-worker-node-test] list-db" dbs) + (let [prefix sqlite-util/db-version-prefix + expected-name (if (string/starts-with? repo prefix) + (subs repo (count prefix)) + repo)] + (is (some #(= expected-name (:name %)) dbs)))) + _ (invoke host port "thread-api/transact" + [repo + [{:block/uuid page-uuid + :block/title "Smoke Page" + :block/name "smoke-page" + :block/tags #{:logseq.class/Page} + :block/created-at now + :block/updated-at now} + {:block/uuid block-uuid + :block/title "Smoke Test" + :block/page [:block/uuid page-uuid] + :block/parent [:block/uuid page-uuid] + :block/order "a0" + :block/created-at now + :block/updated-at now}] + {} + nil]) + result (invoke host port "thread-api/q" + [repo + ['[:find ?e + :in $ ?uuid + :where [?e :block/uuid ?uuid]] + block-uuid]])] + (println "[db-worker-node-test] q result" result) + (is (seq result))) + (p/catch (fn [e] + (println "[db-worker-node-test] e:" e) + (is false (str e)))) + (p/finally (fn [] + (if-let [stop! (:stop! @daemon)] + (-> (stop!) + (p/finally (fn [] (done)))) + (done)))))))) diff --git a/src/test/frontend/worker/platform_test.cljs b/src/test/frontend/worker/platform_test.cljs new file mode 100644 index 0000000000..f5477421df --- /dev/null +++ b/src/test/frontend/worker/platform_test.cljs @@ -0,0 +1,85 @@ +(ns frontend.worker.platform-test + (:require ["ws" :as ws] + [cljs.test :refer [async deftest is]] + [frontend.common.file.opfs :as opfs] + [frontend.test.node-helper :as node-helper] + [frontend.worker-common.util :as worker-util] + [frontend.worker.platform.browser :as platform-browser] + [frontend.worker.platform.node :as platform-node] + [promesa.core :as p])) + +(defn- wait-for-event + [emitter event] + (p/create + (fn [resolve reject] + (.once emitter event (fn [& args] (resolve args))) + (.once emitter "error" reject)))) + +(defn- fake-websocket + [url] + (this-as this + (set! (.-url this) url) + this)) + +(deftest browser-platform-adapter + (async done + (let [saved-location (.-location js/globalThis) + saved-websocket (.-WebSocket js/globalThis) + kv-state (atom {}) + posted (atom nil)] + (set! (.-location js/globalThis) #js {:href "http://example.test/?publishing=true"}) + (set! (.-WebSocket js/globalThis) fake-websocket) + (with-redefs [opfs/ (p/let [platform (platform-browser/browser-platform) + kv (:kv platform) + storage (:storage platform) + _ (is (fn? (:get kv))) + _ (is (fn? (:set! kv))) + _ (p/let [_ ((:write-text! storage) "foo.txt" "bar") + v ((:read-text! storage) "foo.txt")] + (is (= "read:foo.txt" v))) + _ ((:post-message! (:broadcast platform)) :event {:ok true}) + ws ((:connect (:websocket platform)) "ws://example.test/socket")] + (is (= [:event {:ok true}] @posted)) + (is (= "ws://example.test/socket" (.-url ws)))) + (p/finally (fn [] + (set! (.-location js/globalThis) saved-location) + (set! (.-WebSocket js/globalThis) saved-websocket))) + (p/then (fn [] (done)))))))) + +(deftest node-platform-adapter + (async done + (let [data-dir (node-helper/create-tmp-dir "db-worker-platform") + events (atom []) + server (ws/Server. #js {:port 0})] + (.on server "connection" (fn [socket] (.close socket))) + (-> (p/let [_ (wait-for-event server "listening") + port (.-port (.address server)) + platform (platform-node/node-platform + {:data-dir data-dir + :event-fn (fn [type payload] + (swap! events conj [type payload]))}) + storage (:storage platform) + kv (:kv platform) + ws-connect (:connect (:websocket platform)) + _ (p/let [_ ((:write-text! storage) "foo/bar.txt" "hello") + v ((:read-text! storage) "foo/bar.txt")] + (is (= "hello" v))) + _ (p/let [_ ((:set! kv) "alpha" "beta") + v ((:get kv) "alpha")] + (is (= "beta" v))) + _ ((:post-message! (:broadcast platform)) :event {:value 1}) + _ (is (= [[:event {:value 1}]] @events)) + client (ws-connect (str "ws://127.0.0.1:" port)) + _ (p/let [_ (wait-for-event client "open")] + (.close client))] + true) + (p/finally (fn [] + (.close server))) + (p/then (fn [] (done)))))))