From 345edeacdcfbb40368158f6bc937b58ddbc5912d Mon Sep 17 00:00:00 2001 From: rcmerci Date: Fri, 18 Apr 2025 00:18:52 +0800 Subject: [PATCH] refactor: add on-slave-client, simplify create-service --- src/main/frontend/worker/shared_service.cljs | 292 ++++++++++--------- 1 file changed, 149 insertions(+), 143 deletions(-) diff --git a/src/main/frontend/worker/shared_service.cljs b/src/main/frontend/worker/shared_service.cljs index 804c7c5c31..4662cc83a1 100644 --- a/src/main/frontend/worker/shared_service.cljs +++ b/src/main/frontend/worker/shared_service.cljs @@ -6,6 +6,10 @@ [logseq.db :as ldb] [promesa.core :as p])) +;; TODO: +;; - client-channel close before re-creating new one +;; - change all 'provider' to 'master-client' & 'slave-client' + ;; Idea and code copied from https://github.com/Matt-TOTW/shared-service/blob/master/src/sharedService.ts ;; Related thread: https://github.com/rhashimoto/wa-sqlite/discussions/81 @@ -34,7 +38,7 @@ (let [id (random-id)] (p/let [client-id (js/navigator.locks.request id #js {:mode "exclusive"} (fn [_] - (p/let [^js locks (.query js/navigator.locks)] + (p/let [^js locks (js/navigator.locks.query)] (->> (.-held locks) (some #(when (= (.-name %) id) %)) .-clientId))))] @@ -56,7 +60,7 @@ (p/let [client-id (or @*client-id (get-client-id))] (js/navigator.locks.request service-name #js {:mode "exclusive", :ifAvailable true} (fn [_lock] - (p/let [^js locks (.query js/navigator.locks) + (p/let [^js locks (js/navigator.locks.query) locked? (some #(when (and (= (.-name %) service-name) (= (.-clientId %) client-id)) true) @@ -105,154 +109,156 @@ (reset! *common-channel channel) channel))) +(defn- on-slave-client + [slave-client-id service-name common-channel status-ready-deferred-p] + (reset! *client-channel (js/BroadcastChannel. (get-broadcast-channel-name slave-client-id service-name))) + (let [register (fn register [] + (p/create + (fn [resolve-fn _] + (letfn [(listener [event] + (let [{:keys [_providerId clientId type]} (bean/->clj (.-data event))] + (when (and (= clientId slave-client-id) (= type "registered")) + (js/navigator.locks.request service-name #js {:mode "exclusive"} + (fn [_lock] + ;; The provider has gone, elect the new provider + (prn :debug "Provider has gone") + (reset! *provider? :re-check))) + (.removeEventListener common-channel "message" listener) + (resolve-fn nil))))] + (.addEventListener common-channel "message" listener) + (.postMessage common-channel #js {:type "register" :clientId slave-client-id})))))] + (.addEventListener common-channel "message" + (fn [event] + (let [{:keys [type data]} (bean/->clj (.-data event))] + (case type + "providerChange" + (do + (js/console.log "Provider change detected. Re-registering...") + (register) + (when (seq @*requests-in-flight) + (js/console.log "Requests were in flight when provider changed. Requeuing...") + (p/all (map + (fn [[id {:keys [method args resolve-fn reject-fn]}]] + (let [listener (get-on-request-listener id resolve-fn reject-fn)] + (when-let [channel @*client-channel] + (.addEventListener channel "message" listener) + (.postMessage channel (bean/->js {:id id + :type "request" + :method method + :args args}))))) + @*requests-in-flight)))) + + "sync-db-changes" + (worker-util/post-message :sync-db-changes (ldb/read-transit-str data)) + + nil)))) + (-> + (p/do! + (register) + (p/resolve! status-ready-deferred-p)) + (p/catch (fn [error] + (js/console.error error)))))) + (defn ^:large-vars/cleanup-todo create-service [service-name target {:keys [on-provider-change]}] - (p/do! - (clear-old-service!) - (let [status {:ready (p/deferred)} - common-channel (ensure-common-channel service-name) - on-not-provider (fn [] - (-> - (p/let [client-id (or @*client-id (get-client-id))] - (reset! *client-channel (js/BroadcastChannel. (get-broadcast-channel-name client-id service-name))) - (let [register (fn register [] - (p/create - (fn [resolve-fn _] - (letfn [(listener [event] - (let [{:keys [_providerId clientId type]} (bean/->clj (.-data event))] - (when (and (= clientId client-id) (= type "registered")) - (js/navigator.locks.request service-name #js {:mode "exclusive"} - (fn [_lock] - ;; The provider has gone, elect the new provider - (prn :debug "Provider has gone") - (reset! *provider? :re-check))) - (.removeEventListener common-channel "message" listener) - (resolve-fn nil))))] - (.addEventListener common-channel "message" listener) - (.postMessage common-channel #js {:type "register" :clientId client-id})))))] - (.addEventListener common-channel "message" - (fn [event] - (let [{:keys [type data]} (bean/->clj (.-data event))] - (case type - "providerChange" - (do - (js/console.log "Provider change detected. Re-registering...") - (register) - (when (seq @*requests-in-flight) - (js/console.log "Requests were in flight when provider changed. Requeuing...") - (p/all (map - (fn [[id {:keys [method args resolve-fn reject-fn]}]] - (let [listener (get-on-request-listener id resolve-fn reject-fn)] - (when-let [channel @*client-channel] - (.addEventListener channel "message" listener) - (.postMessage channel (bean/->js {:id id - :type "request" - :method method - :args args}))))) - @*requests-in-flight)))) + (p/let [_ (clear-old-service!) + status {:ready (p/deferred)} + common-channel (ensure-common-channel service-name) + client-id (or @*client-id (get-client-id)) + on-become-provider (fn [_re-elect?] + (p/let [master-client-id client-id] + (prn :debug :become-master master-client-id :service service-name) + (.addEventListener + common-channel "message" + (fn [event] + (let [{:keys [clientId type]} (bean/->clj (.-data event))] + (when (= type "register") + (let [client-channel (js/BroadcastChannel. (get-broadcast-channel-name clientId service-name))] + (js/navigator.locks.request clientId #js {:mode "exclusive"} + (fn [_] + ;; The client has gone. Clean up + (.close client-channel))) - "sync-db-changes" - (worker-util/post-message :sync-db-changes (ldb/read-transit-str data)) + (.addEventListener client-channel "message" + (fn [event] + (let [{:keys [type method args id]} (bean/->clj (.-data event))] + (when (not= type "response") + (p/let [[result error] (p/catch + (p/then (apply-target-f! target method args) + (fn [res] [res nil])) + (fn [e] [nil (if (instance? js/Error e) + (bean/->clj e) + e)]))] + (.postMessage client-channel (bean/->js + {:id id + :type "response" + :result result + :error error + :method-key (first args)}))))))) + (.postMessage common-channel (bean/->js {:type "registered" + :clientId clientId + :providerId master-client-id + :serviceName service-name}))))))) + (.postMessage common-channel #js {:type "providerChange" + :providerId master-client-id + :serviceName service-name}) + (p/let [_ (when on-provider-change (on-provider-change service-name)) + _ (when (seq @*requests-in-flight) + (js/console.log "Requests were in flight when tab became provider. Requeuing...") + (p/all (map + (fn [[id {:keys [method args resolve-fn reject-fn]}]] + (-> + (p/let [result (apply-target-f! target method args)] + (resolve-fn result)) + (p/catch (fn [e] + (js/console.error "Error processing request" e) + (reject-fn e))) + (p/finally (fn [] + (swap! *requests-in-flight dissoc id))))) + @*requests-in-flight)))] + (p/resolve! (:ready status))))) + check-provider-f (fn [re-elect?] + (check-provider? service-name + {:on-become-provider #(on-become-provider re-elect?) + :on-not-provider #(on-slave-client client-id service-name common-channel (:ready status))}))] + (check-provider-f false) - nil)))) - (p/do! - (register) - (p/resolve! (:ready status))))) - (p/catch (fn [error] - (js/console.error error))))) - on-become-provider (fn [_re-elect?] - (p/let [provider-id (or @*client-id (get-client-id))] - (prn :debug :become-provider provider-id :service service-name) - (.addEventListener - common-channel "message" - (fn [event] - (let [{:keys [clientId type]} (bean/->clj (.-data event))] - (when (= type "register") - (let [client-channel (js/BroadcastChannel. (get-broadcast-channel-name clientId service-name))] - (js/navigator.locks.request clientId #js {:mode "exclusive"} - (fn [_] - ;; The client has gone. Clean up - (.close client-channel))) + (add-watch *provider? :check-provider + (fn [_ _ _ new-value] + (when (= new-value :re-check) + (p/do! + (p/delay 100) + (check-provider-f true))))) - (.addEventListener client-channel "message" - (fn [event] - (let [{:keys [type method args id]} (bean/->clj (.-data event))] - (when (not= type "response") - (p/let [[result error] (p/catch - (p/then (apply-target-f! target method args) - (fn [res] [res nil])) - (fn [e] [nil (if (instance? js/Error e) - (bean/->clj e) - e)]))] - (.postMessage client-channel (bean/->js - {:id id - :type "response" - :result result - :error error - :method-key (first args)}))))))) - (.postMessage common-channel (bean/->js {:type "registered" - :clientId clientId - :providerId provider-id - :serviceName service-name}))))))) - (.postMessage common-channel #js {:type "providerChange" - :providerId provider-id - :serviceName service-name}) - (p/let [_ (when on-provider-change (on-provider-change service-name)) - _ (when (seq @*requests-in-flight) - (js/console.log "Requests were in flight when tab became provider. Requeuing...") - (p/all (map - (fn [[id {:keys [method args resolve-fn reject-fn]}]] - (-> - (p/let [result (apply-target-f! target method args)] - (resolve-fn result)) - (p/catch (fn [e] - (js/console.error "Error processing request" e) - (reject-fn e))) - (p/finally (fn [] - (swap! *requests-in-flight dissoc id))))) - @*requests-in-flight)))] - (p/resolve! (:ready status))))) - check-provider-f (fn [re-elect?] - (check-provider? service-name {:on-become-provider #(on-become-provider re-elect?) - :on-not-provider on-not-provider}))] - (check-provider-f false) - - (add-watch *provider? :check-provider - (fn [_ _ _ new-value] - (when (= new-value :re-check) - (p/do! - (p/delay 100) - (check-provider-f true))))) - - {:proxy (js/Proxy. target - #js {:get (fn [target method] - (cond - (#{:then :catch :finally} (keyword method)) + {:proxy (js/Proxy. target + #js {:get (fn [target method] + (cond + (#{:then :catch :finally} (keyword method)) ;; Return nil for these methods to allow promise chaining to work correctly - nil + nil - :else - (fn [args] - (let [provider? @*provider?] - (if provider? - (apply-target-f! target method args) - (p/create - (fn [resolve-fn reject-fn] - (let [id (random-id) - listener (get-on-request-listener id resolve-fn reject-fn) - channel @*client-channel] - (when channel - (.addEventListener channel "message" listener) - (.postMessage channel (bean/->js - {:id id - :type "request" - :method method - :args args}))) - (swap! *requests-in-flight assoc id {:method method - :args args - :resolve-fn resolve-fn - :reject-fn reject-fn})))))))))}) - :status status}))) + :else + (fn [args] + (let [provider? @*provider?] + (if provider? + (apply-target-f! target method args) + (p/create + (fn [resolve-fn reject-fn] + (let [id (random-id) + listener (get-on-request-listener id resolve-fn reject-fn) + channel @*client-channel] + (when channel + (.addEventListener channel "message" listener) + (.postMessage channel (bean/->js + {:id id + :type "request" + :method method + :args args}))) + (swap! *requests-in-flight assoc id {:method method + :args args + :resolve-fn resolve-fn + :reject-fn reject-fn})))))))))}) + :status status})) (defn broadcast-to-clients! [payload]