diff --git a/src/main/frontend/worker/rtc/client.cljs b/src/main/frontend/worker/rtc/client.cljs index f920845f31..2d3acaff9a 100644 --- a/src/main/frontend/worker/rtc/client.cljs +++ b/src/main/frontend/worker/rtc/client.cljs @@ -11,6 +11,7 @@ [frontend.worker.rtc.malli-schema :as rtc-schema] [frontend.worker.rtc.remote-update :as r.remote-update] [frontend.worker.rtc.skeleton :as r.skeleton] + [frontend.worker.rtc.throttle :as r.throttle] [frontend.worker.rtc.ws :as ws] [frontend.worker.rtc.ws-util :as ws-util] [logseq.db :as ldb] @@ -418,11 +419,13 @@ (when-let [ops-for-remote (rtc-schema/to-ws-ops-decoder remote-ops)] (let [local-tx (client-op/get-local-tx repo) r (try - (m/? (ws-util/send&recv get-ws-create-task - (cond-> {:action "apply-ops" - :graph-uuid graph-uuid :schema-version (str major-schema-version) - :ops ops-for-remote :t-before (or local-tx 1)} - (true? @*remote-profile?) (assoc :profile true)))) + (let [message (cond-> {:action "apply-ops" + :graph-uuid graph-uuid :schema-version (str major-schema-version) + :ops ops-for-remote :t-before (or local-tx 1)} + (true? @*remote-profile?) (assoc :profile true)) + r (m/? (ws-util/send&recv get-ws-create-task message))] + (r.throttle/add-rtc-api-call-record! message) + r) (catch :default e (rollback repo block-ops-map-coll update-kv-value-ops-map-coll rename-db-ident-ops-map-coll) (throw e)))] @@ -456,9 +459,11 @@ [repo conn graph-uuid major-schema-version date-formatter get-ws-create-task add-log-fn] (m/sp (let [local-tx (client-op/get-local-tx repo) - r (m/? (ws-util/send&recv get-ws-create-task {:action "apply-ops" - :graph-uuid graph-uuid :schema-version (str major-schema-version) - :ops [] :t-before (or local-tx 1)}))] + message {:action "apply-ops" + :graph-uuid graph-uuid :schema-version (str major-schema-version) + :ops [] :t-before (or local-tx 1)} + r (m/? (ws-util/send&recv get-ws-create-task message))] + (r.throttle/add-rtc-api-call-record! message) (if-let [remote-ex (:ex-data r)] (do (add-log-fn :rtc.log/pull-remote-data (assoc remote-ex :sub-type :pull-remote-data-exception)) (case (:type remote-ex) diff --git a/src/main/frontend/worker/rtc/core.cljs b/src/main/frontend/worker/rtc/core.cljs index 9dd0c47e6c..d5ea971104 100644 --- a/src/main/frontend/worker/rtc/core.cljs +++ b/src/main/frontend/worker/rtc/core.cljs @@ -16,6 +16,7 @@ [frontend.worker.rtc.log-and-state :as rtc-log-and-state] [frontend.worker.rtc.remote-update :as r.remote-update] [frontend.worker.rtc.skeleton] + [frontend.worker.rtc.throttle :as r.throttle] [frontend.worker.rtc.ws :as ws] [frontend.worker.rtc.ws-util :as ws-util :refer [gen-get-ws-create-map--memoized]] [frontend.worker.shared-service :as shared-service] @@ -137,7 +138,7 @@ (get-remote-updates get-ws-create-task)) local-updates-check-flow (m/eduction (map (fn [data] {:type :local-update-check :value data})) - (create-local-updates-check-flow repo *auto-push? 2000)) + (r.throttle/create-local-updates-check-flow repo *auto-push? 2000)) inject-user-info-flow (create-inject-users-info-flow repo (m/watch *online-users)) mix-flow (c.m/mix remote-updates-flow local-updates-check-flow inject-user-info-flow)] (c.m/mix mix-flow (create-pull-remote-updates-flow 60000 mix-flow)))) diff --git a/src/main/frontend/worker/rtc/throttle.cljs b/src/main/frontend/worker/rtc/throttle.cljs new file mode 100644 index 0000000000..f711de9526 --- /dev/null +++ b/src/main/frontend/worker/rtc/throttle.cljs @@ -0,0 +1,53 @@ +(ns frontend.worker.rtc.throttle + "Adjust the synchronization frequency dynamically based on the client's RTC-related API call volume." + (:require [cljs.cache :as cache] + [frontend.common.missionary :as c.m] + [frontend.worker.rtc.client-op :as client-op] + [lambdaisland.glogi :as log] + [missionary.core :as m]) + (:import [missionary Cancelled])) + +(def ^:private api-calls-count-threshold 5) +(def ^:private *rtc-api-calls (atom (cache/ttl-cache-factory {} :ttl 30000))) + +(defn- through + [cache item] + (let [k (random-uuid)] + (cache/through (constantly item) cache k))) + +(def ^:private sentinel (js-obj)) +(defn- get-items + [cache] + (let [cache* + ;; clean expired items + (-> cache + (cache/miss sentinel sentinel) + (cache/evict sentinel))] + (vals cache*))) + +(defn- compute-stats + "TODO: add more stat-data. e.g. total ws-message-size" + [api-calls] + {:count (count api-calls)}) + +(defn create-local-updates-check-flow + [repo *auto-push? min-interval-ms] + (let [auto-push-flow (m/watch *auto-push?) + clock-flow (c.m/clock min-interval-ms :clock) + check-flow (m/latest vector auto-push-flow clock-flow)] + (m/ap + (m/?< check-flow) + (try + (let [recent-rtc-api-calls-count (:count (compute-stats (get-items @*rtc-api-calls)))] + (when (and goog.DEBUG + (> recent-rtc-api-calls-count api-calls-count-threshold)) + (log/info :rtc-throttle {:recent-rtc-api-calls-count recent-rtc-api-calls-count})) + (if (and (<= recent-rtc-api-calls-count api-calls-count-threshold) + (pos? (client-op/get-unpushed-ops-count repo))) + true + (m/amb))) + (catch Cancelled _ (m/amb)))))) + +(defn add-rtc-api-call-record! + [api-call-record] + (swap! *rtc-api-calls through api-call-record))