From bf43ca478c846bee0bca71fb83fb47a3b39c09d5 Mon Sep 17 00:00:00 2001 From: rcmerci Date: Tue, 13 Jan 2026 19:57:51 +0800 Subject: [PATCH] milestone 3 (part 1) --- .carve/ignore | 2 + bb.edn | 6 + .../db-worker-browser-api-inventory.md | 27 --- .../task--db-worker-nodejs-compatible.md | 51 +++- package.json | 1 + shadow-cljs.edn | 10 + src/main/frontend/persist_db.cljs | 14 +- src/main/frontend/persist_db/node.cljs | 162 +++++++++++++ src/main/frontend/worker/db_core.cljs | 146 +++++++++--- src/main/frontend/worker/db_worker_node.cljs | 207 ++++++++++++++++ src/main/frontend/worker/platform.cljs | 13 +- .../frontend/worker/platform/browser.cljs | 19 ++ src/main/frontend/worker/platform/node.cljs | 225 ++++++++++++++++++ src/main/frontend/worker/shared_service.cljs | 130 ++++++---- 14 files changed, 898 insertions(+), 115 deletions(-) delete mode 100644 docs/agent-guide/db-worker-browser-api-inventory.md create mode 100644 src/main/frontend/persist_db/node.cljs create mode 100644 src/main/frontend/worker/db_worker_node.cljs create mode 100644 src/main/frontend/worker/platform/node.cljs diff --git a/.carve/ignore b/.carve/ignore index 7145230ed3..b239c5fcd6 100644 --- a/.carve/ignore +++ b/.carve/ignore @@ -60,6 +60,8 @@ frontend.ui/_emoji-init-data frontend.worker.rtc.op-mem-layer/_sync-loop-canceler ;; Used by shadow.cljs frontend.worker.db-worker/init +;; Used by shadow.cljs (node entrypoint) +frontend.worker.db-worker-node/main ;; Future use? frontend.worker.rtc.hash/hash-blocks ;; Repl fn diff --git a/bb.edn b/bb.edn index 23aa46d309..3bea2155f3 100644 --- a/bb.edn +++ b/bb.edn @@ -182,6 +182,12 @@ dev:gen-malli-kondo-config logseq.tasks.dev/gen-malli-kondo-config + dev:db-worker-node + {:doc "Compile and start db-worker-node (pass-through args forwarded to node)" + :task (do + (shell "clojure" "-M:cljs" "compile" "db-worker-node") + (apply shell "node" "./static/db-worker-node.js" *command-line-args*))} + lint:dev logseq.tasks.dev.lint/dev diff --git a/docs/agent-guide/db-worker-browser-api-inventory.md b/docs/agent-guide/db-worker-browser-api-inventory.md deleted file mode 100644 index 2a92e844a8..0000000000 --- a/docs/agent-guide/db-worker-browser-api-inventory.md +++ /dev/null @@ -1,27 +0,0 @@ -# db-worker browser-only API inventory - -## Worker entry -- frontend.worker.db-worker - - importScripts - - js/self postMessage - - js/location.href - - navigator.storage (OPFS) - - Comlink expose/wrap/transfer - - setInterval - -## Shared service -- frontend.worker.shared-service - - navigator.locks.request/query - - BroadcastChannel - - js/self postMessage - -## RTC and crypto -- frontend.worker.rtc.ws - - WebSocket -- frontend.worker.rtc.crypt - - OPFS file access via frontend.common.file.opfs - - js/self location - -## Worker util -- frontend.worker-common.util - - wfu/post-message (worker postMessage bridge) diff --git a/docs/agent-guide/task--db-worker-nodejs-compatible.md b/docs/agent-guide/task--db-worker-nodejs-compatible.md index ebfb427e44..d0a1b8828f 100644 --- a/docs/agent-guide/task--db-worker-nodejs-compatible.md +++ b/docs/agent-guide/task--db-worker-nodejs-compatible.md @@ -18,7 +18,7 @@ Make `frontend.worker.db-worker` and its dependencies run in both browser and No - Implement `frontend.worker.platform.node` using `fs/promises`, `path`, `crypto`, and `ws`. 3. Abstract sqlite storage and VFS specifics. - Browser: keep OPFS SAH pool implementation. - - Node: use file-backed sqlite storage (via sqlite-wasm Node VFS or a Node sqlite binding). + - Node: use file-backed sqlite storage via `better-sqlite3` (no OPFS, no sqlite-wasm). - Route db path resolution through the platform adapter (data dir, per-repo paths). 4. Replace `importScripts` bootstrap with an explicit init entrypoint. - Browser build still uses `:web-worker`, but entrypoint should call `init!` with a browser platform adapter. @@ -41,10 +41,40 @@ Make `frontend.worker.db-worker` and its dependencies run in both browser and No 10. Build config changes. - Add a Node build target in `shadow-cljs.edn` for db-worker (e.g. `:db-worker-node`). - Ensure shared code compiles for `:node-script` or `:node-library` with the correct externs. + - Add `better-sqlite3` dependency and ensure Node target treats it as a native external. 11. Tests and fixtures. - Add unit tests for platform adapters and storage abstraction. - Add a minimal integration test that starts the Node daemon and exercises a small RPC call. +## Node.js sqlite Implementation (better-sqlite3) +Node runtime must not use OPFS or sqlite-wasm. Instead, use `better-sqlite3` as the direct file-backed sqlite engine. + +### Concrete Refactor Items (File + Function + Summary) +- `src/main/frontend/worker/db_core.cljs` (`init-sqlite-module!`, `InBrowser)) +(defonce node-db (atom nil)) + +(defn- node-runtime? + [] + (and (exists? js/process) + (not (exists? js/window)))) (defn- get-impl "Get the actual implementation of PersistentDB" [] - opfs-db) + (if (node-runtime?) + (or @node-db + (reset! node-db (node/start! (assoc (node/default-config) + :event-handler worker-handler/handle)))) + opfs-db)) (defn {"Content-Type" "application/json" + "Accept" "application/json"} + (seq auth-token) + (assoc "Authorization" (str "Bearer " auth-token)))) + +(defn- js headers)} + (fn [^js res] + (let [chunks (array)] + (.on res "data" (fn [chunk] (.push chunks chunk))) + (.on res "end" (fn [] + (let [buf (js/Buffer.concat chunks)] + (resolve {:status (.-statusCode res) + :body (.toString buf "utf8")})))) + (.on res "error" reject))))] + (.on req "error" reject) + (when body + (.write req body)) + (.end req))))) + +(defn- js (if direct-pass? + {:method method + :directPass true + :args args} + {:method method + :directPass false + :argsTransit (ldb/write-transit-str args)})))] + (p/let [{:keys [status body]} (clj (js/JSON.parse body) :keywordize-keys true)] + (if direct-pass? + result + (ldb/read-transit-str resultTransit))) + (do + (log/error :db-worker-node-invoke-failed {:status status :body body}) + (throw (ex-info "db-worker-node invoke failed" {:status status :body body}))))))) + +(defn- connect-events! + [{:keys [base-url auth-token event-handler]} wrapped-worker] + (let [url (js/URL. (str (string/replace base-url #"/$" "") "/v1/events")) + headers (base-headers auth-token) + buffer (atom "") + handler (or event-handler (fn [_type _payload _wrapped-worker] nil))] + (let [req (.request + (request-module url) + #js {:method "GET" + :hostname (.-hostname url) + :port (or (.-port url) (if (= "https:" (.-protocol url)) 443 80)) + :path (str (.-pathname url) (.-search url)) + :headers (clj->js headers)} + (fn [^js res] + (.on res "data" + (fn [chunk] + (swap! buffer str (.toString chunk "utf8")) + (loop [] + (when-let [idx (string/index-of @buffer "\n\n")] + (let [event-text (subs @buffer 0 idx) + rest-text (subs @buffer (+ idx 2))] + (reset! buffer rest-text) + (when-let [line (some-> event-text + (string/split-lines) + (->> (some #(when (string/starts-with? % "data: ") + (subs % 6)))))] + (let [{:keys [type payload]} (js->clj (js/JSON.parse line) :keywordize-keys true)] + (when (and type payload) + (handler (keyword type) (ldb/read-transit-str payload) wrapped-worker)))) + (recur)))))) + (.on res "error" (fn [e] + (log/error :db-worker-node-events-error e)))))] + (.on req "error" (fn [e] + (log/error :db-worker-node-events-error e))) + (.end req)) + nil)) + +(defrecord InNode [client wrapped-worker] + protocol/PersistentDB + (InNode client wrapped-worker))) diff --git a/src/main/frontend/worker/db_core.cljs b/src/main/frontend/worker/db_core.cljs index 16b169eaab..5eac7640e9 100644 --- a/src/main/frontend/worker/db_core.cljs +++ b/src/main/frontend/worker/db_core.cljs @@ -1,7 +1,6 @@ (ns frontend.worker.db-core "Core db-worker logic without host-specific bootstrap." - (:require ["@sqlite.org/sqlite-wasm" :default sqlite3InitModule] - [cljs-bean.core :as bean] + (:require [cljs-bean.core :as bean] [cljs.cache :as cache] [clojure.edn :as edn] [clojure.set] @@ -34,6 +33,7 @@ [frontend.worker.shared-service :as shared-service] [frontend.worker.state :as worker-state] [frontend.worker.thread-atom] + [goog.object :as gobj] [lambdaisland.glogi :as log] [logseq.cli.common.mcp.tools :as cli-common-mcp-tools] [logseq.common.util :as common-util] @@ -63,29 +63,60 @@ (defonce *client-ops-conns worker-state/*client-ops-conns) (defonce *opfs-pools worker-state/*opfs-pools) (defonce *publishing? (atom false)) +(defonce ^:private *node-pools (atom {})) + +(defn- node-runtime? + [] + (= :node (platform/env-flag (platform/current) :runtime))) + +(defn- get-storage-pool + [graph] + (if (node-runtime?) + (get @*node-pools graph) + (worker-state/get-opfs-pool graph))) + +(defn- remember-storage-pool! + [graph pool] + (if (node-runtime?) + (swap! *node-pools assoc graph pool) + (swap! *opfs-pools assoc graph pool))) + +(defn- forget-storage-pool! + [graph] + (if (node-runtime?) + (swap! *node-pools dissoc graph) + (swap! *opfs-pools dissoc graph))) (defn- js {:print #(log/info :init-sqlite-module! %) - :printErr #(log/error :init-sqlite-module! %)}))] + sqlite (platform/sqlite-init! (platform/current))] (reset! *publishing? publishing?) - (reset! *sqlite sqlite) + (reset! *sqlite (or sqlite ::sqlite-initialized)) nil))) (def repo-path "/db.sqlite") (def debug-log-path "/debug-log/db.sqlite") +(defn- resolve-db-path + [repo pool path] + (let [storage (platform/storage (platform/current))] + (if-let [f (:resolve-db-path storage)] + (f repo pool path) + path))) + (defn- client-ops migration-result)] (client-op/add-ops! repo client-ops)))) - (db-listener/listen-db-changes! repo (get @*datascript-conns repo)))))) + (db-listener/listen-db-changes! repo (get @*datascript-conns repo)) + (log/info :db-worker/create-or-open-done {:repo repo}))))) (defn- db-id {})) (start-db! repo opts)) @@ -505,8 +576,9 @@ (def-thread-api :thread-api/release-access-handles [repo] - (when-let [^js pool (worker-state/get-opfs-pool repo)] - (.pauseVfs pool) + (when-let [^js pool (get-storage-pool repo)] + (when (exists? (.-pauseVfs pool)) + (.pauseVfs pool)) nil)) (def-thread-api :thread-api/db-exists @@ -789,6 +861,8 @@ [repo start-opts] (js/Promise. (m/sp + (log/info :db-worker/on-become-master-start {:repo repo + :import-type (:import-type start-opts)}) (c.m/js fns) - #(on-become-master graph start-opts) - broadcast-data-types - {:import? (:import-type? start-opts)})] + (do + (log/info :db-worker/init-service {:graph graph + :prev-graph prev-graph + :import-type (:import-type start-opts)}) + (p/let [service (shared-service/js fns) + #(on-become-master graph start-opts) + broadcast-data-types + {:import? (:import-type? start-opts)})] (assert (p/promise? (get-in service [:status :ready]))) (reset! *service [graph service]) - service))))) + service)))))) (defn- notify-invalid-data [{:keys [tx-meta]} errors] @@ -852,7 +931,12 @@ (= :thread-api/create-or-open-db method-k) ;; because shared-service operates at the graph level, ;; creating a new database or switching to another one requires re-initializing the service. - (let [[graph opts] (ldb/read-transit-str (last args))] + (let [payload (last args) + payload' (cond + (string? payload) (ldb/read-transit-str payload) + (array? payload) (js->clj payload :keywordize-keys true) + :else payload) + [graph opts] payload'] (p/let [service (js)) diff --git a/src/main/frontend/worker/db_worker_node.cljs b/src/main/frontend/worker/db_worker_node.cljs new file mode 100644 index 0000000000..5b4c6a7fe5 --- /dev/null +++ b/src/main/frontend/worker/db_worker_node.cljs @@ -0,0 +1,207 @@ +(ns frontend.worker.db-worker-node + "Node.js daemon entrypoint for db-worker." + (:require ["http" :as http] + [clojure.string :as string] + [frontend.worker.db-core :as db-core] + [frontend.worker.platform.node :as platform-node] + [frontend.worker.state :as worker-state] + [goog.object :as gobj] + [lambdaisland.glogi :as log] + [lambdaisland.glogi.console :as glogi-console] + [logseq.db :as ldb] + [promesa.core :as p])) + +(defonce ^:private *ready? (atom false)) +(defonce ^:private *sse-clients (atom #{})) + +(defn- send-json! + [^js res status payload] + (.writeHead res status #js {"Content-Type" "application/json"}) + (.end res (js/JSON.stringify (clj->js payload)))) + +(defn- send-text! + [^js res status text] + (.writeHead res status #js {"Content-Type" "text/plain"}) + (.end res text)) + +(defn- js {:type type :payload payload})) + message (str "data: " event "\n\n")] + (doseq [^js res @*sse-clients] + (try + (.write res message) + (catch :default e + (log/error :sse-write-failed e)))))) + +(defn- sse-handler + [^js req ^js res] + (.writeHead res 200 #js {"Content-Type" "text/event-stream" + "Cache-Control" "no-cache" + "Connection" "keep-alive"}) + (.write res "\n") + (swap! *sse-clients conj res) + (.on req "close" (fn [] + (swap! *sse-clients disj res)))) + +(defn- (.remoteInvoke proxy method (boolean direct-pass?) args') + (p/finally (fn [] + (js/clearTimeout timeout-id)))))) + +(defn- (p/let [body (clj payload :keywordize-keys true) + direct-pass? (boolean directPass) + args' (if direct-pass? + args + (or argsTransit args)) + _ (log/info :db-worker-node-http-invoke + {:method method :direct-pass? direct-pass?}) + result ( (default 127.0.0.1)") + (println " --port (default 9101)") + (println " --data-dir (default ~/.logseq/db-worker)") + (println " --repo (optional)") + (println " --rtc-ws-url (optional)") + (println " --log-level (default info)") + (println " --auth-token (optional)")) + +(defn main + [] + (let [{:keys [host port data-dir repo rtc-ws-url log-level auth-token help?]} + (parse-args (.-argv js/process)) + host (or host "127.0.0.1") + port (or port 9101) + log-level (keyword (or log-level "info"))] + (when help? + (show-help!) + (.exit js/process 0)) + (glogi-console/install!) + (log/set-levels {:glogi/root log-level}) + (set-main-thread-stub!) + (p/let [platform (platform-node/node-platform {:data-dir data-dir + :event-fn handle-event!}) + proxy (db-core/init-core! platform) + _ (js {:print #(log/info :init-sqlite-module! %) + :printErr #(log/error :init-sqlite-module! %)}))) + +(defn- open-sqlite-db + [{:keys [sqlite pool path mode]}] + (if pool + (new (.-OpfsSAHPoolDb pool) path) + (let [^js DB (.-DB ^js (.-oo1 ^js sqlite))] + (new DB path (or mode "c"))))) + (defn browser-platform [] {:env {:publishing? (string/includes? (.. js/location -href) "publishing=true") @@ -96,6 +109,7 @@ :storage {:install-opfs-pool install-opfs-pool :list-graphs list-graphs :db-exists? db-exists? + :resolve-db-path (fn [_repo _pool path] path) :export-file export-file :import-db import-db :remove-vfs! remove-vfs! @@ -107,5 +121,10 @@ :set! kv-set!} :broadcast {:post-message! worker-util/post-message} :websocket {:connect websocket-connect} + :sqlite {:init! init-sqlite! + :open-db open-sqlite-db + :close-db (fn [db] (.close db)) + :exec (fn [db sql-or-opts] (.exec db sql-or-opts)) + :transaction (fn [db f] (.transaction db f))} :crypto {} :timers {:set-interval! (fn [f ms] (js/setInterval f ms))}}) diff --git a/src/main/frontend/worker/platform/node.cljs b/src/main/frontend/worker/platform/node.cljs new file mode 100644 index 0000000000..cb3755c1b3 --- /dev/null +++ b/src/main/frontend/worker/platform/node.cljs @@ -0,0 +1,225 @@ +(ns frontend.worker.platform.node + "Node.js platform adapter for db-worker." + (:require ["better-sqlite3" :as sqlite3] + ["fs/promises" :as fs] + ["os" :as os] + ["path" :as node-path] + ["ws" :as ws] + [clojure.string :as string] + [frontend.worker-common.util :as worker-util] + [goog.object :as gobj] + [lambdaisland.glogi :as log] + [promesa.core :as p])) + +(def ^:private sqlite + (or (aget sqlite3 "default") sqlite3)) + +(defn- expand-home + [path] + (if (string/starts-with? path "~") + (node-path/join (.homedir os) (subs path 1)) + path)) + +(defn- ensure-dir! + [dir] + (fs/mkdir dir #js {:recursive true})) + +(defn- strip-leading-slash + [path] + (string/replace-first path #"^/" "")) + +(defn- repo-dir + [data-dir pool-name] + (node-path/join data-dir (str "." pool-name))) + +(defn- pool-path + [^js pool path] + (node-path/join (.-repoDir pool) (strip-leading-slash path))) + +(defn- path-under-data-dir + [data-dir path] + (if (node-path/isAbsolute path) + path + (node-path/join data-dir path))) + +(defn- ->buffer + [data] + (cond + (instance? js/Buffer data) data + (instance? js/ArrayBuffer data) (js/Buffer.from data) + (and (some? data) (some? (.-buffer data))) (js/Buffer.from (.-buffer data)) + :else (js/Buffer.from (str data)))) + +(defn- list-graphs + [data-dir] + (let [dir? #(and % (.isDirectory %)) + db-dir-prefix ".logseq-pool-"] + (p/let [entries (fs/readdir data-dir #js {:withFileTypes true}) + db-dirs (->> entries + (filter dir?) + (filter (fn [dirent] + (string/starts-with? (.-name dirent) db-dir-prefix)))) + graph-names (map (fn [dirent] + (-> (.-name dirent) + (string/replace-first db-dir-prefix "") + ;; TODO: DRY + (string/replace "+3A+" ":") + (string/replace "++" "/"))) + db-dirs)] + (vec graph-names)))) + +(defn- db-exists? + [data-dir graph] + (p/let [pool-name (worker-util/get-pool-name graph) + db-path (node-path/join (repo-dir data-dir pool-name) "db.sqlite")] + (-> (fs/stat db-path) + (p/then (fn [_] true)) + (p/catch (fn [_] false))))) + +(defn- exec-sql + [db opts-or-sql] + (if (string? opts-or-sql) + (.exec db opts-or-sql) + (let [sql (gobj/get opts-or-sql "sql") + bind (gobj/get opts-or-sql "bind") + row-mode (gobj/get opts-or-sql "rowMode") + bind' (if (and bind (object? bind)) + (let [out (js-obj)] + (doseq [key (js/Object.keys bind)] + (let [value (gobj/get bind key) + normalized (cond + (string/starts-with? key "$") (subs key 1) + (string/starts-with? key ":") (subs key 1) + :else key)] + (gobj/set out normalized value))) + out) + bind) + stmt (.prepare db sql)] + (if (= row-mode "array") + (do + (.raw stmt) + (if (some? bind') + (.all stmt bind') + (.all stmt))) + (do + (if (some? bind') + (.run stmt bind') + (.run stmt)) + nil))))) + +(defn- wrap-better-db + [db] + (let [wrapper (js-obj)] + (set! (.-exec wrapper) (fn [opts-or-sql] (exec-sql db opts-or-sql))) + (set! (.-transaction wrapper) + (fn [f] + (let [run-tx (.transaction db (fn [] (f wrapper)))] + (run-tx)))) + (set! (.-close wrapper) (fn [] (.close db))) + wrapper)) + +(defn- open-sqlite-db + [{:keys [path]}] + (p/let [_ (ensure-dir! (node-path/dirname path))] + (wrap-better-db (new sqlite path)))) + +(defn- install-opfs-pool + [data-dir _sqlite pool-name] + (p/let [repo-dir-path (repo-dir data-dir pool-name) + _ (ensure-dir! repo-dir-path) + pool (js-obj)] + (set! (.-repoDir pool) repo-dir-path) + (set! (.-getCapacity pool) (fn [] 1)) + (set! (.-pauseVfs pool) (fn [] nil)) + (set! (.-unpauseVfs pool) (fn [] nil)) + pool)) + +(defn- export-file + [pool path] + (fs/readFile (pool-path pool path))) + +(defn- import-db + [pool path data] + (let [full-path (pool-path pool path) + dir (node-path/dirname full-path)] + (p/let [_ (ensure-dir! dir)] + (fs/writeFile full-path (->buffer data))))) + +(defn- remove-vfs! + [pool] + (when pool + (fs/rm (.-repoDir pool) #js {:recursive true :force true}))) + +(defn- read-text! + [data-dir path] + (fs/readFile (path-under-data-dir data-dir path) "utf8")) + +(defn- write-text! + [data-dir path text] + (let [full-path (path-under-data-dir data-dir path) + dir (node-path/dirname full-path)] + (p/let [_ (ensure-dir! dir)] + (fs/writeFile full-path text "utf8")))) + +(defn- websocket-connect + [url] + (ws. url)) + +(defn- kv-store + [data-dir] + (let [kv-path (node-path/join data-dir "kv-store.json") + state (atom nil) + (fs/readFile kv-path "utf8") + (p/then (fn [contents] + (let [data (js/JSON.parse contents)] + (reset! state (js->clj data :keywordize-keys false)) + @state))) + (p/catch (fn [_] + (reset! state {}) + @state)))))] + {:get (fn [k] + (p/let [_ (js @state))] + (fs/writeFile kv-path payload "utf8")))})) + +(defn node-platform + [{:keys [data-dir event-fn]}] + (let [data-dir (expand-home (or data-dir "~/.logseq/db-worker")) + kv (kv-store data-dir)] + (p/do! + (ensure-dir! data-dir) + (log/info :db-worker-node-platform {:data-dir data-dir}) + {:env {:publishing? false + :runtime :node + :data-dir data-dir} + :storage {:install-opfs-pool (fn [sqlite-module pool-name] + (install-opfs-pool data-dir sqlite-module pool-name)) + :list-graphs (fn [] (list-graphs data-dir)) + :db-exists? (fn [graph] (db-exists? data-dir graph)) + :resolve-db-path (fn [_repo pool path] + (pool-path pool path)) + :export-file export-file + :import-db import-db + :remove-vfs! remove-vfs! + :read-text! (fn [path] (read-text! data-dir path)) + :write-text! (fn [path text] (write-text! data-dir path text))} + :kv {:get (:get kv) + :set! (:set! kv)} + :broadcast {:post-message! (fn [type payload] + (when event-fn + (event-fn type payload)))} + :websocket {:connect websocket-connect} + :sqlite {:init! (fn [] nil) + :open-db open-sqlite-db + :close-db (fn [db] (.close db)) + :exec (fn [db sql-or-opts] (.exec db sql-or-opts)) + :transaction (fn [db f] (.transaction db f))} + :crypto {} + :timers {:set-interval! (fn [f ms] (js/setInterval f ms))}}))) diff --git a/src/main/frontend/worker/shared_service.cljs b/src/main/frontend/worker/shared_service.cljs index 85ed56e210..7749af9c70 100644 --- a/src/main/frontend/worker/shared_service.cljs +++ b/src/main/frontend/worker/shared_service.cljs @@ -1,6 +1,7 @@ (ns frontend.worker.shared-service "This allows multiple workers to share some resources (e.g. db access)" (:require [cljs-bean.core :as bean] + [frontend.worker.platform :as platform] [goog.object :as gobj] [lambdaisland.glogi :as log] [logseq.common.util :as common-util] @@ -35,6 +36,12 @@ (defonce *client-id (atom nil)) (defonce *master-client-lock (atom nil)) +(defn- node-runtime? + [] + (try + (= :node (platform/env-flag (platform/current) :runtime)) + (catch :default _ false))) + (defn- next-request-id [] (vswap! *current-request-id inc)) @@ -307,60 +314,83 @@ forward the data broadcast from the master client directly to the UI thread." [service-name target on-become-master-handler broadcast-data-types {:keys [import?]}] (clear-old-service!) - (when import? (reset! *master-client? true)) - (p/let [broadcast-data-types (set broadcast-data-types) - status {:ready (p/deferred)} - common-channel (ensure-common-channel service-name) - client-id (js - {:id request-id - :type "request" - :method method - :args args}))))))) - (log/error :invalid-invoke-method method)))}) - :status status - :client-id client-id})) + :else + (let [request-id (next-request-id) + client-channel (ensure-client-channel client-id service-name)] + (p/create + (fn [resolve-fn reject-fn] + (vswap! *requests-in-flight assoc request-id {:method method + :args args + :resolve-fn resolve-fn + :reject-fn reject-fn}) + (.postMessage client-channel (bean/->js + {:id request-id + :type "request" + :method method + :args args}))))))) + (log/error :invalid-invoke-method method)))}) + :status status + :client-id client-id})))) (defn broadcast-to-clients! [type' data] (let [transit-payload (ldb/write-transit-str [type' data])] - (when (exists? js/self) (.postMessage js/self transit-payload)) - (when-let [common-channel @*common-channel] - (let [str-type' (common-util/keyword->string type')] - (.postMessage common-channel #js {:type str-type' - :data transit-payload}))))) + (if (node-runtime?) + (platform/post-message! (platform/current) type' transit-payload) + (do + (when (exists? js/self) (.postMessage js/self transit-payload)) + (when-let [common-channel @*common-channel] + (let [str-type' (common-util/keyword->string type')] + (.postMessage common-channel #js {:type str-type' + :data transit-payload})))))))