refactor: add on-slave-client, simplify create-service

This commit is contained in:
rcmerci
2025-04-18 00:18:52 +08:00
parent 08fc2560a4
commit 345edeacdc

View File

@@ -6,6 +6,10 @@
[logseq.db :as ldb]
[promesa.core :as p]))
;; TODO:
;; - client-channel close before re-creating new one
;; - change all 'provider' to 'master-client' & 'slave-client'
;; Idea and code copied from https://github.com/Matt-TOTW/shared-service/blob/master/src/sharedService.ts
;; Related thread: https://github.com/rhashimoto/wa-sqlite/discussions/81
@@ -34,7 +38,7 @@
(let [id (random-id)]
(p/let [client-id (js/navigator.locks.request id #js {:mode "exclusive"}
(fn [_]
(p/let [^js locks (.query js/navigator.locks)]
(p/let [^js locks (js/navigator.locks.query)]
(->> (.-held locks)
(some #(when (= (.-name %) id) %))
.-clientId))))]
@@ -56,7 +60,7 @@
(p/let [client-id (or @*client-id (get-client-id))]
(js/navigator.locks.request service-name #js {:mode "exclusive", :ifAvailable true}
(fn [_lock]
(p/let [^js locks (.query js/navigator.locks)
(p/let [^js locks (js/navigator.locks.query)
locked? (some #(when (and (= (.-name %) service-name)
(= (.-clientId %) client-id))
true)
@@ -105,154 +109,156 @@
(reset! *common-channel channel)
channel)))
(defn- on-slave-client
[slave-client-id service-name common-channel status-ready-deferred-p]
(reset! *client-channel (js/BroadcastChannel. (get-broadcast-channel-name slave-client-id service-name)))
(let [register (fn register []
(p/create
(fn [resolve-fn _]
(letfn [(listener [event]
(let [{:keys [_providerId clientId type]} (bean/->clj (.-data event))]
(when (and (= clientId slave-client-id) (= type "registered"))
(js/navigator.locks.request service-name #js {:mode "exclusive"}
(fn [_lock]
;; The provider has gone, elect the new provider
(prn :debug "Provider has gone")
(reset! *provider? :re-check)))
(.removeEventListener common-channel "message" listener)
(resolve-fn nil))))]
(.addEventListener common-channel "message" listener)
(.postMessage common-channel #js {:type "register" :clientId slave-client-id})))))]
(.addEventListener common-channel "message"
(fn [event]
(let [{:keys [type data]} (bean/->clj (.-data event))]
(case type
"providerChange"
(do
(js/console.log "Provider change detected. Re-registering...")
(register)
(when (seq @*requests-in-flight)
(js/console.log "Requests were in flight when provider changed. Requeuing...")
(p/all (map
(fn [[id {:keys [method args resolve-fn reject-fn]}]]
(let [listener (get-on-request-listener id resolve-fn reject-fn)]
(when-let [channel @*client-channel]
(.addEventListener channel "message" listener)
(.postMessage channel (bean/->js {:id id
:type "request"
:method method
:args args})))))
@*requests-in-flight))))
"sync-db-changes"
(worker-util/post-message :sync-db-changes (ldb/read-transit-str data))
nil))))
(->
(p/do!
(register)
(p/resolve! status-ready-deferred-p))
(p/catch (fn [error]
(js/console.error error))))))
(defn ^:large-vars/cleanup-todo create-service
[service-name target {:keys [on-provider-change]}]
(p/do!
(clear-old-service!)
(let [status {:ready (p/deferred)}
common-channel (ensure-common-channel service-name)
on-not-provider (fn []
(->
(p/let [client-id (or @*client-id (get-client-id))]
(reset! *client-channel (js/BroadcastChannel. (get-broadcast-channel-name client-id service-name)))
(let [register (fn register []
(p/create
(fn [resolve-fn _]
(letfn [(listener [event]
(let [{:keys [_providerId clientId type]} (bean/->clj (.-data event))]
(when (and (= clientId client-id) (= type "registered"))
(js/navigator.locks.request service-name #js {:mode "exclusive"}
(fn [_lock]
;; The provider has gone, elect the new provider
(prn :debug "Provider has gone")
(reset! *provider? :re-check)))
(.removeEventListener common-channel "message" listener)
(resolve-fn nil))))]
(.addEventListener common-channel "message" listener)
(.postMessage common-channel #js {:type "register" :clientId client-id})))))]
(.addEventListener common-channel "message"
(fn [event]
(let [{:keys [type data]} (bean/->clj (.-data event))]
(case type
"providerChange"
(do
(js/console.log "Provider change detected. Re-registering...")
(register)
(when (seq @*requests-in-flight)
(js/console.log "Requests were in flight when provider changed. Requeuing...")
(p/all (map
(fn [[id {:keys [method args resolve-fn reject-fn]}]]
(let [listener (get-on-request-listener id resolve-fn reject-fn)]
(when-let [channel @*client-channel]
(.addEventListener channel "message" listener)
(.postMessage channel (bean/->js {:id id
:type "request"
:method method
:args args})))))
@*requests-in-flight))))
(p/let [_ (clear-old-service!)
status {:ready (p/deferred)}
common-channel (ensure-common-channel service-name)
client-id (or @*client-id (get-client-id))
on-become-provider (fn [_re-elect?]
(p/let [master-client-id client-id]
(prn :debug :become-master master-client-id :service service-name)
(.addEventListener
common-channel "message"
(fn [event]
(let [{:keys [clientId type]} (bean/->clj (.-data event))]
(when (= type "register")
(let [client-channel (js/BroadcastChannel. (get-broadcast-channel-name clientId service-name))]
(js/navigator.locks.request clientId #js {:mode "exclusive"}
(fn [_]
;; The client has gone. Clean up
(.close client-channel)))
"sync-db-changes"
(worker-util/post-message :sync-db-changes (ldb/read-transit-str data))
(.addEventListener client-channel "message"
(fn [event]
(let [{:keys [type method args id]} (bean/->clj (.-data event))]
(when (not= type "response")
(p/let [[result error] (p/catch
(p/then (apply-target-f! target method args)
(fn [res] [res nil]))
(fn [e] [nil (if (instance? js/Error e)
(bean/->clj e)
e)]))]
(.postMessage client-channel (bean/->js
{:id id
:type "response"
:result result
:error error
:method-key (first args)})))))))
(.postMessage common-channel (bean/->js {:type "registered"
:clientId clientId
:providerId master-client-id
:serviceName service-name})))))))
(.postMessage common-channel #js {:type "providerChange"
:providerId master-client-id
:serviceName service-name})
(p/let [_ (when on-provider-change (on-provider-change service-name))
_ (when (seq @*requests-in-flight)
(js/console.log "Requests were in flight when tab became provider. Requeuing...")
(p/all (map
(fn [[id {:keys [method args resolve-fn reject-fn]}]]
(->
(p/let [result (apply-target-f! target method args)]
(resolve-fn result))
(p/catch (fn [e]
(js/console.error "Error processing request" e)
(reject-fn e)))
(p/finally (fn []
(swap! *requests-in-flight dissoc id)))))
@*requests-in-flight)))]
(p/resolve! (:ready status)))))
check-provider-f (fn [re-elect?]
(check-provider? service-name
{:on-become-provider #(on-become-provider re-elect?)
:on-not-provider #(on-slave-client client-id service-name common-channel (:ready status))}))]
(check-provider-f false)
nil))))
(p/do!
(register)
(p/resolve! (:ready status)))))
(p/catch (fn [error]
(js/console.error error)))))
on-become-provider (fn [_re-elect?]
(p/let [provider-id (or @*client-id (get-client-id))]
(prn :debug :become-provider provider-id :service service-name)
(.addEventListener
common-channel "message"
(fn [event]
(let [{:keys [clientId type]} (bean/->clj (.-data event))]
(when (= type "register")
(let [client-channel (js/BroadcastChannel. (get-broadcast-channel-name clientId service-name))]
(js/navigator.locks.request clientId #js {:mode "exclusive"}
(fn [_]
;; The client has gone. Clean up
(.close client-channel)))
(add-watch *provider? :check-provider
(fn [_ _ _ new-value]
(when (= new-value :re-check)
(p/do!
(p/delay 100)
(check-provider-f true)))))
(.addEventListener client-channel "message"
(fn [event]
(let [{:keys [type method args id]} (bean/->clj (.-data event))]
(when (not= type "response")
(p/let [[result error] (p/catch
(p/then (apply-target-f! target method args)
(fn [res] [res nil]))
(fn [e] [nil (if (instance? js/Error e)
(bean/->clj e)
e)]))]
(.postMessage client-channel (bean/->js
{:id id
:type "response"
:result result
:error error
:method-key (first args)})))))))
(.postMessage common-channel (bean/->js {:type "registered"
:clientId clientId
:providerId provider-id
:serviceName service-name})))))))
(.postMessage common-channel #js {:type "providerChange"
:providerId provider-id
:serviceName service-name})
(p/let [_ (when on-provider-change (on-provider-change service-name))
_ (when (seq @*requests-in-flight)
(js/console.log "Requests were in flight when tab became provider. Requeuing...")
(p/all (map
(fn [[id {:keys [method args resolve-fn reject-fn]}]]
(->
(p/let [result (apply-target-f! target method args)]
(resolve-fn result))
(p/catch (fn [e]
(js/console.error "Error processing request" e)
(reject-fn e)))
(p/finally (fn []
(swap! *requests-in-flight dissoc id)))))
@*requests-in-flight)))]
(p/resolve! (:ready status)))))
check-provider-f (fn [re-elect?]
(check-provider? service-name {:on-become-provider #(on-become-provider re-elect?)
:on-not-provider on-not-provider}))]
(check-provider-f false)
(add-watch *provider? :check-provider
(fn [_ _ _ new-value]
(when (= new-value :re-check)
(p/do!
(p/delay 100)
(check-provider-f true)))))
{:proxy (js/Proxy. target
#js {:get (fn [target method]
(cond
(#{:then :catch :finally} (keyword method))
{:proxy (js/Proxy. target
#js {:get (fn [target method]
(cond
(#{:then :catch :finally} (keyword method))
;; Return nil for these methods to allow promise chaining to work correctly
nil
nil
:else
(fn [args]
(let [provider? @*provider?]
(if provider?
(apply-target-f! target method args)
(p/create
(fn [resolve-fn reject-fn]
(let [id (random-id)
listener (get-on-request-listener id resolve-fn reject-fn)
channel @*client-channel]
(when channel
(.addEventListener channel "message" listener)
(.postMessage channel (bean/->js
{:id id
:type "request"
:method method
:args args})))
(swap! *requests-in-flight assoc id {:method method
:args args
:resolve-fn resolve-fn
:reject-fn reject-fn})))))))))})
:status status})))
:else
(fn [args]
(let [provider? @*provider?]
(if provider?
(apply-target-f! target method args)
(p/create
(fn [resolve-fn reject-fn]
(let [id (random-id)
listener (get-on-request-listener id resolve-fn reject-fn)
channel @*client-channel]
(when channel
(.addEventListener channel "message" listener)
(.postMessage channel (bean/->js
{:id id
:type "request"
:method method
:args args})))
(swap! *requests-in-flight assoc id {:method method
:args args
:resolve-fn resolve-fn
:reject-fn reject-fn})))))))))})
:status status}))
(defn broadcast-to-clients!
[payload]