diff --git a/src/main/frontend/components/rtc/flows.cljs b/src/main/frontend/components/rtc/flows.cljs index a57e5c35f1..1cf02fd8d4 100644 --- a/src/main/frontend/components/rtc/flows.cljs +++ b/src/main/frontend/components/rtc/flows.cljs @@ -1,6 +1,7 @@ (ns frontend.components.rtc.flows (:require [frontend.state :as state] - [missionary.core :as m])) + [missionary.core :as m] + [cljs-time.core :as t])) (def rtc-log-flow (m/watch (:rtc/log @state/state))) @@ -22,3 +23,27 @@ (def rtc-state-flow (m/watch (:rtc/state @state/state))) + +(defn create-rtc-recent-updates-flow + "Keep recent-updates for N minutes. + graph-uuid->user-uuid->[instant {:keys [update-block-uuids :delete-block-uuids]}]" + [minutes] + (let [*buffer (atom {})] + (m/ap + (let [latest-updates (m/?> (m/watch (:rtc/recent-updates @state/state)))] + (when-let [graph-uuid (first (keys latest-updates))] + (let [mins-ago (t/minus (t/now) (t/minutes minutes)) + latest-keys (map (fn [[user-uuid _]] user-uuid) (get latest-updates graph-uuid)) + new-map + {graph-uuid + (into {} + (map (fn [k] + [k + (take-while + (fn [[inst _]] (> inst mins-ago)) + (concat (get-in latest-updates [graph-uuid k]) + (get-in @*buffer [graph-uuid k])))])) + latest-keys)}] + (prn :debug new-map latest-updates) + (swap! *buffer merge new-map) + @*buffer)))))) diff --git a/src/main/frontend/components/rtc/indicator.cljs b/src/main/frontend/components/rtc/indicator.cljs index b9c2c79016..7ce89adb9e 100644 --- a/src/main/frontend/components/rtc/indicator.cljs +++ b/src/main/frontend/components/rtc/indicator.cljs @@ -1,7 +1,6 @@ (ns frontend.components.rtc.indicator "RTC state indicator" (:require [cljs-time.core :as t] - [fipp.edn :as fipp] [frontend.common.missionary-util :as c.m] [frontend.components.rtc.flows :as rtc-flows] [frontend.state :as state] @@ -9,7 +8,8 @@ [frontend.util :as util] [logseq.shui.ui :as shui] [missionary.core :as m] - [rum.core :as rum])) + [rum.core :as rum] + [clojure.pprint :as pprint])) (comment (def rtc-state-schema @@ -21,6 +21,7 @@ :local-tx nil :remote-tx nil :rtc-state :open + :graph-uuid->user-uuid->recent-updates nil :download-logs nil :upload-logs nil :misc-logs nil})) @@ -36,13 +37,21 @@ (fn [_ log] (when log (swap! *detail-info update k (fn [logs] (take 5 (conj logs log)))))) - flow))] + flow)) + (update-recent-updates-task [] + (m/reduce + (fn [_ recent-updates] + (when recent-updates + (swap! *detail-info assoc :graph-uuid->user-uuid->recent-updates recent-updates))) + ;; recent 5mins updates + (rtc-flows/create-rtc-recent-updates-flow 5)))] (let [canceler (c.m/run-task (m/join (constantly nil) (update-log-task rtc-flows/rtc-download-log-flow :download-logs) (update-log-task rtc-flows/rtc-upload-log-flow :upload-logs) (update-log-task rtc-flows/rtc-misc-log-flow :misc-logs) + (update-recent-updates-task) (m/reduce (fn [_ state] (swap! *detail-info assoc :pending-local-ops (:unpushed-block-update-count state) @@ -59,8 +68,9 @@ (rum/local false ::expand-debug-info?) [state online?] (let [*expand-debug? (::expand-debug-info? state) - {:keys [graph-uuid local-tx remote-tx rtc-state - download-logs upload-logs misc-logs pending-local-ops pending-server-ops]} (rum/react *detail-info)] + {:keys [graph-uuid local-tx remote-tx rtc-state graph-uuid->user-uuid->recent-updates + download-logs upload-logs misc-logs pending-local-ops pending-server-ops]} + (rum/react *detail-info)] [:div.rtc-info.flex.flex-col.gap-1.p-2.text-gray-11 [:div.font-medium.mb-2 (if online? "Online" "Offline")] [:div [:span.font-medium.mr-1 (or pending-local-ops 0)] "pending local changes"] @@ -83,8 +93,10 @@ graph-uuid (assoc :graph-uuid graph-uuid) local-tx (assoc :local-tx local-tx) remote-tx (assoc :remote-tx remote-tx) - rtc-state (assoc :rtc-state rtc-state)) - (fipp/pprint {:width 20}) + rtc-state (assoc :rtc-state rtc-state) + graph-uuid->user-uuid->recent-updates + (assoc :graph-uuid->user-uuid->recent-updates graph-uuid->user-uuid->recent-updates)) + pprint/pprint with-out-str)]])])) (defn- downloading? @@ -101,35 +113,36 @@ (> 600 (/ (- (t/now) created-at) 1000))))) + (rum/defc indicator < rum/reactive - [] - (let [detail-info (rum/react *detail-info) - _ (state/sub :auth/id-token) - online? (state/sub :network/online?) - uploading? (uploading? detail-info) - downloading? (downloading? detail-info) - rtc-state (:rtc-state detail-info) - unpushed-block-update-count (:pending-local-ops detail-info)] - [:div.cp__rtc-sync - [:div.cp__rtc-sync-indicator.flex.flex-row.items-center.gap-1 - (when downloading? - (shui/button - {:class "opacity-50" - :variant :ghost - :size :sm} - "Downloading...")) - (when uploading? - (shui/button - {:class "opacity-50" - :variant :ghost - :size :sm} - "Uploading...")) - [:a.button.cloud - {:on-click #(shui/popup-show! (.-target %) - (details online?) - {:align "end"}) - :class (util/classnames [{:on (and online? (= :open rtc-state)) - :idle (and online? (= :open rtc-state) (zero? unpushed-block-update-count)) - :queuing (pos? unpushed-block-update-count)}])} - [:span.flex.items-center - (ui/icon "cloud" {:size ui/icon-size})]]]])) + [] + (let [detail-info (rum/react *detail-info) + _ (state/sub :auth/id-token) + online? (state/sub :network/online?) + uploading? (uploading? detail-info) + downloading? (downloading? detail-info) + rtc-state (:rtc-state detail-info) + unpushed-block-update-count (:pending-local-ops detail-info)] + [:div.cp__rtc-sync + [:div.cp__rtc-sync-indicator.flex.flex-row.items-center.gap-1 + (when downloading? + (shui/button + {:class "opacity-50" + :variant :ghost + :size :sm} + "Downloading...")) + (when uploading? + (shui/button + {:class "opacity-50" + :variant :ghost + :size :sm} + "Uploading...")) + [:a.button.cloud + {:on-click #(shui/popup-show! (.-target %) + (details online?) + {:align "end"}) + :class (util/classnames [{:on (and online? (= :open rtc-state)) + :idle (and online? (= :open rtc-state) (zero? unpushed-block-update-count)) + :queuing (pos? unpushed-block-update-count)}])} + [:span.flex.items-center + (ui/icon "cloud" {:size ui/icon-size})]]]])) diff --git a/src/main/frontend/handler/events.cljs b/src/main/frontend/handler/events.cljs index d788637b8f..8e177fb733 100644 --- a/src/main/frontend/handler/events.cljs +++ b/src/main/frontend/handler/events.cljs @@ -1043,6 +1043,9 @@ (defmethod handle :rtc/log [[_ data]] (state/set-state! :rtc/log data)) +(defmethod handle :rtc/recent-updates [[_ data]] + (state/set-state! :rtc/recent-updates data)) + (defmethod handle :rtc/download-remote-graph [[_ graph-name graph-uuid]] (-> (p/do! diff --git a/src/main/frontend/handler/worker.cljs b/src/main/frontend/handler/worker.cljs index 485a425186..c013eb3430 100644 --- a/src/main/frontend/handler/worker.cljs +++ b/src/main/frontend/handler/worker.cljs @@ -39,6 +39,9 @@ (defmethod handle :rtc-log [_ _worker log] (state/pub-event! [:rtc/log log])) +(defmethod handle :rtc-recent-updates [_ _worker log] + (state/pub-event! [:rtc/recent-updates log])) + (defmethod handle :default [_ _worker data] (prn :debug "Worker data not handled: " data)) diff --git a/src/main/frontend/state.cljs b/src/main/frontend/state.cljs index 3ab7e72382..e05069d710 100644 --- a/src/main/frontend/state.cljs +++ b/src/main/frontend/state.cljs @@ -288,6 +288,7 @@ ;; only latest rtc-log stored here, when a log stream is needed, ;; use missionary to create a rtc-log-flow, use (missionary.core/watch ) :rtc/log (atom nil) + :rtc/recent-updates (atom {}) :rtc/uploading? false :rtc/downloading-graph-uuid nil :rtc/graphs [] diff --git a/src/main/frontend/worker/rtc/log_and_state.cljs b/src/main/frontend/worker/rtc/log_and_state.cljs index 9ad733dd80..48c2947d07 100644 --- a/src/main/frontend/worker/rtc/log_and_state.cljs +++ b/src/main/frontend/worker/rtc/log_and_state.cljs @@ -1,7 +1,6 @@ (ns frontend.worker.rtc.log-and-state "Fns to generate rtc related logs" - (:require [frontend.common.missionary-util :as c.m] - [frontend.schema-register :as sr] + (:require [frontend.schema-register :as sr] [frontend.worker.util :as worker-util] [malli.core :as ma] [missionary.core :as m])) @@ -30,21 +29,20 @@ (reset! *rtc-log (assoc m :type type :created-at (js/Date.))) nil) -(def rtc-log-flow (m/watch *rtc-log)) - - ;;; some other states (def ^:private graph-uuid->t-schema [:map-of :uuid :int]) -(def ^:private graph-uuid->t-validator (ma/validator graph-uuid->t-schema)) -(def ^:private graph-uuid->t-validator* (fn [v] (if (graph-uuid->t-validator v) - true - (do (prn :debug-graph-uuid->t-validator v) - false)))) -(def *graph-uuid->local-t (atom {} :validator graph-uuid->t-validator*)) -(def *graph-uuid->remote-t (atom {} :validator graph-uuid->t-validator*)) +(def ^:private graph-uuid->t-validator (let [validator (ma/validator graph-uuid->t-schema)] + (fn [v] + (if (validator v) + true + (do (prn :debug-graph-uuid->t-validator v) + false))))) + +(def *graph-uuid->local-t (atom {} :validator graph-uuid->t-validator)) +(def *graph-uuid->remote-t (atom {} :validator graph-uuid->t-validator)) (defn- ensure-uuid [v] @@ -55,16 +53,18 @@ (defn create-local-t-flow [graph-uuid] - (m/eduction - (map (fn [m] (get m (ensure-uuid graph-uuid)))) - (m/watch *graph-uuid->local-t))) + (->> (m/watch *graph-uuid->local-t) + (m/eduction (keep (fn [m] (get m (ensure-uuid graph-uuid))))) + (m/reductions {} nil) + (m/latest identity))) (defn create-remote-t-flow [graph-uuid] {:pre [(some? graph-uuid)]} - (m/eduction - (map (fn [m] (get m (ensure-uuid graph-uuid)))) - (m/watch *graph-uuid->remote-t))) + (->> (m/watch *graph-uuid->remote-t) + (m/eduction (keep (fn [m] (get m (ensure-uuid graph-uuid))))) + (m/reductions {} nil) + (m/latest identity))) (defn update-local-t [graph-uuid local-t] @@ -74,18 +74,43 @@ [graph-uuid remote-t] (swap! *graph-uuid->remote-t assoc (ensure-uuid graph-uuid) remote-t)) +;;; recent-updates flow +(def graph-uuid->recent-updates-schema + [:map-of :uuid ;; graph-uuid + [:map-of :uuid ;;user-uuid + [:sequential + [:cat + inst? + [:map + [:update-block-uuids {:optional true} [:set :uuid]] + [:delete-block-uuids {:optional true} [:set :uuid]]]]]]]) + +(def graph-uuid->recent-updates-validator + (let [validator (ma/validator graph-uuid->recent-updates-schema)] + (fn [v] + (if (validator v) + true + (do (prn :debug-graph-uuid->recent-updates-validator v) + false))))) + +(def *graph-uuid->recent-updates (atom {} :validator graph-uuid->recent-updates-validator)) + +(defn update-recent-updates + [graph-uuid recent-updates] + (reset! *graph-uuid->recent-updates {(ensure-uuid graph-uuid) recent-updates})) + ;;; subscribe-logs, push to frontend -(defonce ^:private *last-subscribe-logs-canceler (atom nil)) (defn- subscribe-logs [] - (when-let [canceler @*last-subscribe-logs-canceler] - (canceler) - (reset! *last-subscribe-logs-canceler nil)) - (let [cancel (c.m/run-task - (m/reduce - (fn [_ v] (when v (worker-util/post-message :rtc-log v))) - rtc-log-flow) - :subscribe-logs)] - (reset! *last-subscribe-logs-canceler cancel) - nil)) + (remove-watch *rtc-log :subscribe-logs) + (add-watch *rtc-log :subscribe-logs + (fn [_ _ _ n] (when n (worker-util/post-message :rtc-log n))))) (subscribe-logs) + +;;; subscribe-recent-updates, push to frontend +(defn- subscribe-recent-updates + [] + (remove-watch *graph-uuid->recent-updates :subscribe-recent-updates) + (add-watch *graph-uuid->recent-updates :subscribe-recent-updates + (fn [_ _ _ n] (when n (worker-util/post-message :rtc-recent-updates n))))) +(subscribe-recent-updates) diff --git a/src/main/frontend/worker/rtc/remote_update.cljs b/src/main/frontend/worker/rtc/remote_update.cljs index 805d0192bb..6312ac82ce 100644 --- a/src/main/frontend/worker/rtc/remote_update.cljs +++ b/src/main/frontend/worker/rtc/remote_update.cljs @@ -524,10 +524,12 @@ [graph-uuid repo conn date-formatter remote-update-event add-log-fn] (let [remote-update-data (:value remote-update-event)] (assert (rtc-const/data-from-ws-validator remote-update-data) remote-update-data) - (let [remote-t (:t remote-update-data) + (let [recent-updates (:recent-updates remote-update-data) + remote-t (:t remote-update-data) remote-t-before (:t-before remote-update-data) local-tx (op-mem-layer/get-local-tx repo)] (rtc-log-and-state/update-remote-t graph-uuid remote-t) + (rtc-log-and-state/update-recent-updates graph-uuid recent-updates) (cond (not (and (pos? remote-t) (pos? remote-t-before)))