mirror of
https://github.com/logseq/logseq.git
synced 2026-05-23 20:24:15 +00:00
feat(rtc): throttle api calls
This commit is contained in:
@@ -11,6 +11,7 @@
|
||||
[frontend.worker.rtc.malli-schema :as rtc-schema]
|
||||
[frontend.worker.rtc.remote-update :as r.remote-update]
|
||||
[frontend.worker.rtc.skeleton :as r.skeleton]
|
||||
[frontend.worker.rtc.throttle :as r.throttle]
|
||||
[frontend.worker.rtc.ws :as ws]
|
||||
[frontend.worker.rtc.ws-util :as ws-util]
|
||||
[logseq.db :as ldb]
|
||||
@@ -418,11 +419,13 @@
|
||||
(when-let [ops-for-remote (rtc-schema/to-ws-ops-decoder remote-ops)]
|
||||
(let [local-tx (client-op/get-local-tx repo)
|
||||
r (try
|
||||
(m/? (ws-util/send&recv get-ws-create-task
|
||||
(cond-> {:action "apply-ops"
|
||||
:graph-uuid graph-uuid :schema-version (str major-schema-version)
|
||||
:ops ops-for-remote :t-before (or local-tx 1)}
|
||||
(true? @*remote-profile?) (assoc :profile true))))
|
||||
(let [message (cond-> {:action "apply-ops"
|
||||
:graph-uuid graph-uuid :schema-version (str major-schema-version)
|
||||
:ops ops-for-remote :t-before (or local-tx 1)}
|
||||
(true? @*remote-profile?) (assoc :profile true))
|
||||
r (m/? (ws-util/send&recv get-ws-create-task message))]
|
||||
(r.throttle/add-rtc-api-call-record! message)
|
||||
r)
|
||||
(catch :default e
|
||||
(rollback repo block-ops-map-coll update-kv-value-ops-map-coll rename-db-ident-ops-map-coll)
|
||||
(throw e)))]
|
||||
@@ -456,9 +459,11 @@
|
||||
[repo conn graph-uuid major-schema-version date-formatter get-ws-create-task add-log-fn]
|
||||
(m/sp
|
||||
(let [local-tx (client-op/get-local-tx repo)
|
||||
r (m/? (ws-util/send&recv get-ws-create-task {:action "apply-ops"
|
||||
:graph-uuid graph-uuid :schema-version (str major-schema-version)
|
||||
:ops [] :t-before (or local-tx 1)}))]
|
||||
message {:action "apply-ops"
|
||||
:graph-uuid graph-uuid :schema-version (str major-schema-version)
|
||||
:ops [] :t-before (or local-tx 1)}
|
||||
r (m/? (ws-util/send&recv get-ws-create-task message))]
|
||||
(r.throttle/add-rtc-api-call-record! message)
|
||||
(if-let [remote-ex (:ex-data r)]
|
||||
(do (add-log-fn :rtc.log/pull-remote-data (assoc remote-ex :sub-type :pull-remote-data-exception))
|
||||
(case (:type remote-ex)
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
[frontend.worker.rtc.log-and-state :as rtc-log-and-state]
|
||||
[frontend.worker.rtc.remote-update :as r.remote-update]
|
||||
[frontend.worker.rtc.skeleton]
|
||||
[frontend.worker.rtc.throttle :as r.throttle]
|
||||
[frontend.worker.rtc.ws :as ws]
|
||||
[frontend.worker.rtc.ws-util :as ws-util :refer [gen-get-ws-create-map--memoized]]
|
||||
[frontend.worker.shared-service :as shared-service]
|
||||
@@ -137,7 +138,7 @@
|
||||
(get-remote-updates get-ws-create-task))
|
||||
local-updates-check-flow (m/eduction
|
||||
(map (fn [data] {:type :local-update-check :value data}))
|
||||
(create-local-updates-check-flow repo *auto-push? 2000))
|
||||
(r.throttle/create-local-updates-check-flow repo *auto-push? 2000))
|
||||
inject-user-info-flow (create-inject-users-info-flow repo (m/watch *online-users))
|
||||
mix-flow (c.m/mix remote-updates-flow local-updates-check-flow inject-user-info-flow)]
|
||||
(c.m/mix mix-flow (create-pull-remote-updates-flow 60000 mix-flow))))
|
||||
|
||||
53
src/main/frontend/worker/rtc/throttle.cljs
Normal file
53
src/main/frontend/worker/rtc/throttle.cljs
Normal file
@@ -0,0 +1,53 @@
|
||||
(ns frontend.worker.rtc.throttle
|
||||
"Adjust the synchronization frequency dynamically based on the client's RTC-related API call volume."
|
||||
(:require [cljs.cache :as cache]
|
||||
[frontend.common.missionary :as c.m]
|
||||
[frontend.worker.rtc.client-op :as client-op]
|
||||
[lambdaisland.glogi :as log]
|
||||
[missionary.core :as m])
|
||||
(:import [missionary Cancelled]))
|
||||
|
||||
(def ^:private api-calls-count-threshold 5)
|
||||
(def ^:private *rtc-api-calls (atom (cache/ttl-cache-factory {} :ttl 30000)))
|
||||
|
||||
(defn- through
|
||||
[cache item]
|
||||
(let [k (random-uuid)]
|
||||
(cache/through (constantly item) cache k)))
|
||||
|
||||
(def ^:private sentinel (js-obj))
|
||||
(defn- get-items
|
||||
[cache]
|
||||
(let [cache*
|
||||
;; clean expired items
|
||||
(-> cache
|
||||
(cache/miss sentinel sentinel)
|
||||
(cache/evict sentinel))]
|
||||
(vals cache*)))
|
||||
|
||||
(defn- compute-stats
|
||||
"TODO: add more stat-data. e.g. total ws-message-size"
|
||||
[api-calls]
|
||||
{:count (count api-calls)})
|
||||
|
||||
(defn create-local-updates-check-flow
|
||||
[repo *auto-push? min-interval-ms]
|
||||
(let [auto-push-flow (m/watch *auto-push?)
|
||||
clock-flow (c.m/clock min-interval-ms :clock)
|
||||
check-flow (m/latest vector auto-push-flow clock-flow)]
|
||||
(m/ap
|
||||
(m/?< check-flow)
|
||||
(try
|
||||
(let [recent-rtc-api-calls-count (:count (compute-stats (get-items @*rtc-api-calls)))]
|
||||
(when (and goog.DEBUG
|
||||
(> recent-rtc-api-calls-count api-calls-count-threshold))
|
||||
(log/info :rtc-throttle {:recent-rtc-api-calls-count recent-rtc-api-calls-count}))
|
||||
(if (and (<= recent-rtc-api-calls-count api-calls-count-threshold)
|
||||
(pos? (client-op/get-unpushed-ops-count repo)))
|
||||
true
|
||||
(m/amb)))
|
||||
(catch Cancelled _ (m/amb))))))
|
||||
|
||||
(defn add-rtc-api-call-record!
|
||||
[api-call-record]
|
||||
(swap! *rtc-api-calls through api-call-record))
|
||||
Reference in New Issue
Block a user