mirror of
https://github.com/logseq/logseq.git
synced 2026-05-28 14:39:48 +00:00
refactor: replace 'provider' by 'master, slave'
This commit is contained in:
@@ -471,7 +471,7 @@
|
||||
(p/do!
|
||||
(when close-other-db?
|
||||
(close-other-dbs! repo))
|
||||
(when @shared-service/*provider?
|
||||
(when @shared-service/*master-client?
|
||||
(create-or-open-db! repo (dissoc opts :close-other-db?)))
|
||||
nil))
|
||||
|
||||
@@ -839,7 +839,7 @@
|
||||
(file/write-files! conn col (worker-state/get-context)))
|
||||
(js/console.error (str "DB is not found for " repo))))))))
|
||||
|
||||
(defn on-become-provider
|
||||
(defn on-become-master
|
||||
[repo]
|
||||
(p/do!
|
||||
(init-sqlite-module!)
|
||||
@@ -852,7 +852,7 @@
|
||||
(when (and graph (not= graph (first @*service)))
|
||||
(p/let [service (shared-service/create-service graph
|
||||
(bean/->js fns)
|
||||
{:on-provider-change #(on-become-provider graph)})]
|
||||
#(on-become-master graph))]
|
||||
(assert (p/promise? (get-in service [:status :ready])))
|
||||
(reset! *service [graph service])
|
||||
service)))
|
||||
@@ -875,7 +875,7 @@
|
||||
(cond
|
||||
(or (contains? #{:thread-api/init-shared-service :thread-api/sync-app-state} method-k)
|
||||
(nil? service)
|
||||
@shared-service/*provider?)
|
||||
@shared-service/*master-client?)
|
||||
(apply f args)
|
||||
|
||||
:else
|
||||
|
||||
@@ -8,23 +8,25 @@
|
||||
|
||||
;; TODO:
|
||||
;; - client-channel close before re-creating new one
|
||||
;; - change all 'provider' to 'master-client' & 'slave-client'
|
||||
|
||||
;; Idea and code copied from https://github.com/Matt-TOTW/shared-service/blob/master/src/sharedService.ts
|
||||
;; Related thread: https://github.com/rhashimoto/wa-sqlite/discussions/81
|
||||
|
||||
(defonce *provider? (atom false))
|
||||
(defonce *master-client? (atom false))
|
||||
;;; common-channel - Communication related to master-client election.
|
||||
;;; client-channel - For API request-response data communication.
|
||||
(defonce *common-channel (atom nil))
|
||||
(defonce *client-channel (atom nil))
|
||||
|
||||
(defonce *requests-in-flight (atom {}))
|
||||
;;; The unique identity of the context where `js/navigator.locks.request` is called
|
||||
(defonce *client-id (atom nil))
|
||||
(defonce *provider-lock (atom nil))
|
||||
(defonce *master-client-lock (atom nil))
|
||||
|
||||
(defn- release-provider-lock!
|
||||
(defn- release-master-client-lock!
|
||||
[]
|
||||
(when-let [d @*provider-lock]
|
||||
(p/resolve! d nil)
|
||||
(when-let [d @*master-client-lock]
|
||||
(p/resolve! d)
|
||||
nil))
|
||||
|
||||
(defn- get-broadcast-channel-name [client-id service-name]
|
||||
@@ -55,8 +57,9 @@
|
||||
(when-let [f (gobj/get target method)]
|
||||
(apply f args)))
|
||||
|
||||
(defn- check-provider?
|
||||
[service-name {:keys [on-become-provider on-not-provider]}]
|
||||
(defn- check-master-or-slave-client!
|
||||
"Check if the current client is the master (otherwise, it is a slave)"
|
||||
[service-name on-become-master on-become-slave]
|
||||
(p/let [client-id (or @*client-id (get-client-id))]
|
||||
(js/navigator.locks.request service-name #js {:mode "exclusive", :ifAvailable true}
|
||||
(fn [_lock]
|
||||
@@ -67,20 +70,20 @@
|
||||
(.-held locks))]
|
||||
(if locked?
|
||||
(p/do!
|
||||
(reset! *provider? true)
|
||||
(on-become-provider)
|
||||
(reset! *provider-lock (p/deferred))
|
||||
(reset! *master-client? true)
|
||||
(on-become-master)
|
||||
(reset! *master-client-lock (p/deferred))
|
||||
;; Keep lock until context destroyed
|
||||
@*provider-lock)
|
||||
(do
|
||||
(reset! *provider? false)
|
||||
(on-not-provider))))))))
|
||||
@*master-client-lock)
|
||||
(p/do!
|
||||
(reset! *master-client? false)
|
||||
(on-become-slave))))))))
|
||||
|
||||
(defn- clear-old-service!
|
||||
[]
|
||||
(p/do!
|
||||
(release-provider-lock!)
|
||||
(reset! *provider? false)
|
||||
(release-master-client-lock!)
|
||||
(reset! *master-client? false)
|
||||
(when-let [^js channel @*common-channel]
|
||||
(.close channel))
|
||||
(when-let [^js channel @*client-channel]
|
||||
@@ -109,44 +112,45 @@
|
||||
(reset! *common-channel channel)
|
||||
channel)))
|
||||
|
||||
(defn- on-slave-client
|
||||
(defn- on-become-slave
|
||||
[slave-client-id service-name common-channel status-ready-deferred-p]
|
||||
(reset! *client-channel (js/BroadcastChannel. (get-broadcast-channel-name slave-client-id service-name)))
|
||||
(let [register (fn register []
|
||||
(p/create
|
||||
(fn [resolve-fn _]
|
||||
(letfn [(listener [event]
|
||||
(let [{:keys [_providerId clientId type]} (bean/->clj (.-data event))]
|
||||
(when (and (= clientId slave-client-id) (= type "registered"))
|
||||
(let [{:keys [_master-client-id type]
|
||||
slave-client-id* :slave-client-id} (bean/->clj (.-data event))]
|
||||
(when (and (= slave-client-id* slave-client-id) (= type "slave-registered"))
|
||||
(js/navigator.locks.request service-name #js {:mode "exclusive"}
|
||||
(fn [_lock]
|
||||
;; The provider has gone, elect the new provider
|
||||
(prn :debug "Provider has gone")
|
||||
(reset! *provider? :re-check)))
|
||||
;; The master has gone, elect the new master
|
||||
(prn :debug "master has gone")
|
||||
(reset! *master-client? :re-check)))
|
||||
(.removeEventListener common-channel "message" listener)
|
||||
(resolve-fn nil))))]
|
||||
(.addEventListener common-channel "message" listener)
|
||||
(.postMessage common-channel #js {:type "register" :clientId slave-client-id})))))]
|
||||
(.postMessage common-channel #js {:type "slave-register" :slave-client-id slave-client-id})))))]
|
||||
(.addEventListener common-channel "message"
|
||||
(fn [event]
|
||||
(let [{:keys [type data]} (bean/->clj (.-data event))]
|
||||
(case type
|
||||
"providerChange"
|
||||
(do
|
||||
(js/console.log "Provider change detected. Re-registering...")
|
||||
(register)
|
||||
(when (seq @*requests-in-flight)
|
||||
(js/console.log "Requests were in flight when provider changed. Requeuing...")
|
||||
(p/all (map
|
||||
(fn [[id {:keys [method args resolve-fn reject-fn]}]]
|
||||
(let [listener (get-on-request-listener id resolve-fn reject-fn)]
|
||||
(when-let [channel @*client-channel]
|
||||
(.addEventListener channel "message" listener)
|
||||
(.postMessage channel (bean/->js {:id id
|
||||
:type "request"
|
||||
:method method
|
||||
:args args})))))
|
||||
@*requests-in-flight))))
|
||||
"master-changed"
|
||||
(p/do!
|
||||
(js/console.log "master-client change detected. Re-registering...")
|
||||
(register)
|
||||
(when (seq @*requests-in-flight)
|
||||
(js/console.log "Requests were in flight when master changed. Requeuing...")
|
||||
(p/all (map
|
||||
(fn [[id {:keys [method args resolve-fn reject-fn]}]]
|
||||
(let [listener (get-on-request-listener id resolve-fn reject-fn)]
|
||||
(when-let [channel @*client-channel]
|
||||
(.addEventListener channel "message" listener)
|
||||
(.postMessage channel (bean/->js {:id id
|
||||
:type "request"
|
||||
:method method
|
||||
:args args})))))
|
||||
@*requests-in-flight))))
|
||||
|
||||
"sync-db-changes"
|
||||
(worker-util/post-message :sync-db-changes (ldb/read-transit-str data))
|
||||
@@ -160,75 +164,76 @@
|
||||
(js/console.error error))))))
|
||||
|
||||
(defn ^:large-vars/cleanup-todo create-service
|
||||
[service-name target {:keys [on-provider-change]}]
|
||||
[service-name target on-become-master-handler]
|
||||
(p/let [_ (clear-old-service!)
|
||||
status {:ready (p/deferred)}
|
||||
common-channel (ensure-common-channel service-name)
|
||||
client-id (or @*client-id (get-client-id))
|
||||
on-become-provider (fn [_re-elect?]
|
||||
(p/let [master-client-id client-id]
|
||||
(prn :debug :become-master master-client-id :service service-name)
|
||||
(.addEventListener
|
||||
common-channel "message"
|
||||
(fn [event]
|
||||
(let [{:keys [clientId type]} (bean/->clj (.-data event))]
|
||||
(when (= type "register")
|
||||
(let [client-channel (js/BroadcastChannel. (get-broadcast-channel-name clientId service-name))]
|
||||
(js/navigator.locks.request clientId #js {:mode "exclusive"}
|
||||
(fn [_]
|
||||
on-become-master (fn [_re-elect?]
|
||||
(p/let [master-client-id client-id]
|
||||
(prn :debug :become-master master-client-id :service service-name)
|
||||
(.addEventListener
|
||||
common-channel "message"
|
||||
(fn [event]
|
||||
(let [{:keys [slave-client-id type]} (bean/->clj (.-data event))]
|
||||
(when (= type "slave-register")
|
||||
(let [client-channel (js/BroadcastChannel. (get-broadcast-channel-name slave-client-id service-name))]
|
||||
(js/navigator.locks.request slave-client-id #js {:mode "exclusive"}
|
||||
(fn [_]
|
||||
;; The client has gone. Clean up
|
||||
(.close client-channel)))
|
||||
(.close client-channel)))
|
||||
|
||||
(.addEventListener client-channel "message"
|
||||
(fn [event]
|
||||
(let [{:keys [type method args id]} (bean/->clj (.-data event))]
|
||||
(when (not= type "response")
|
||||
(p/let [[result error] (p/catch
|
||||
(p/then (apply-target-f! target method args)
|
||||
(fn [res] [res nil]))
|
||||
(fn [e] [nil (if (instance? js/Error e)
|
||||
(bean/->clj e)
|
||||
e)]))]
|
||||
(.postMessage client-channel (bean/->js
|
||||
{:id id
|
||||
:type "response"
|
||||
:result result
|
||||
:error error
|
||||
:method-key (first args)})))))))
|
||||
(.postMessage common-channel (bean/->js {:type "registered"
|
||||
:clientId clientId
|
||||
:providerId master-client-id
|
||||
:serviceName service-name})))))))
|
||||
(.postMessage common-channel #js {:type "providerChange"
|
||||
:providerId master-client-id
|
||||
:serviceName service-name})
|
||||
(p/let [_ (when on-provider-change (on-provider-change service-name))
|
||||
_ (when (seq @*requests-in-flight)
|
||||
(js/console.log "Requests were in flight when tab became provider. Requeuing...")
|
||||
(p/all (map
|
||||
(fn [[id {:keys [method args resolve-fn reject-fn]}]]
|
||||
(->
|
||||
(p/let [result (apply-target-f! target method args)]
|
||||
(resolve-fn result))
|
||||
(p/catch (fn [e]
|
||||
(js/console.error "Error processing request" e)
|
||||
(reject-fn e)))
|
||||
(p/finally (fn []
|
||||
(swap! *requests-in-flight dissoc id)))))
|
||||
@*requests-in-flight)))]
|
||||
(p/resolve! (:ready status)))))
|
||||
check-provider-f (fn [re-elect?]
|
||||
(check-provider? service-name
|
||||
{:on-become-provider #(on-become-provider re-elect?)
|
||||
:on-not-provider #(on-slave-client client-id service-name common-channel (:ready status))}))]
|
||||
(check-provider-f false)
|
||||
(.addEventListener client-channel "message"
|
||||
(fn [event]
|
||||
(let [{:keys [type method args id]} (bean/->clj (.-data event))]
|
||||
(when (not= type "response")
|
||||
(p/let [[result error] (p/catch
|
||||
(p/then (apply-target-f! target method args)
|
||||
(fn [res] [res nil]))
|
||||
(fn [e] [nil (if (instance? js/Error e)
|
||||
(bean/->clj e)
|
||||
e)]))]
|
||||
(.postMessage client-channel (bean/->js
|
||||
{:id id
|
||||
:type "response"
|
||||
:result result
|
||||
:error error
|
||||
:method-key (first args)})))))))
|
||||
(.postMessage common-channel (bean/->js {:type "slave-registered"
|
||||
:slave-client-id slave-client-id
|
||||
:master-client-id master-client-id
|
||||
:serviceName service-name})))))))
|
||||
(.postMessage common-channel #js {:type "master-changed"
|
||||
:master-client-id master-client-id
|
||||
:serviceName service-name})
|
||||
(p/let [_ (on-become-master-handler service-name)
|
||||
_ (when (seq @*requests-in-flight)
|
||||
(js/console.log "Requests were in flight when tab became master. Requeuing...")
|
||||
(p/all (map
|
||||
(fn [[id {:keys [method args resolve-fn reject-fn]}]]
|
||||
(->
|
||||
(p/let [result (apply-target-f! target method args)]
|
||||
(resolve-fn result))
|
||||
(p/catch (fn [e]
|
||||
(js/console.error "Error processing request" e)
|
||||
(reject-fn e)))
|
||||
(p/finally (fn []
|
||||
(swap! *requests-in-flight dissoc id)))))
|
||||
@*requests-in-flight)))]
|
||||
(p/resolve! (:ready status)))))
|
||||
check-master-slave-fn! (fn [re-elect?]
|
||||
(check-master-or-slave-client!
|
||||
service-name
|
||||
#(on-become-master re-elect?)
|
||||
#(on-become-slave client-id service-name common-channel (:ready status))))]
|
||||
(check-master-slave-fn! false)
|
||||
|
||||
(add-watch *provider? :check-provider
|
||||
(add-watch *master-client? :check-master
|
||||
(fn [_ _ _ new-value]
|
||||
(when (= new-value :re-check)
|
||||
(p/do!
|
||||
(p/delay 100)
|
||||
(check-provider-f true)))))
|
||||
(check-master-slave-fn! true)))))
|
||||
|
||||
{:proxy (js/Proxy. target
|
||||
#js {:get (fn [target method]
|
||||
@@ -239,8 +244,8 @@
|
||||
|
||||
:else
|
||||
(fn [args]
|
||||
(let [provider? @*provider?]
|
||||
(if provider?
|
||||
(let [master-client? @*master-client?]
|
||||
(if master-client?
|
||||
(apply-target-f! target method args)
|
||||
(p/create
|
||||
(fn [resolve-fn reject-fn]
|
||||
|
||||
Reference in New Issue
Block a user