From 4a72708e8d64aa4a68563e3aff1fb9f1b273a52a Mon Sep 17 00:00:00 2001 From: rcmerci Date: Mon, 14 Aug 2023 19:23:27 +0800 Subject: [PATCH] update rtc local-ops->remote-ops --- src/main/frontend/db/rtc/core.cljs | 178 +++++++++++++----- .../db/rtc/full_upload_download_graph.cljs | 46 +++-- src/main/frontend/db/rtc/op.cljs | 9 +- src/main/frontend/db/rtc/ws.cljs | 39 +++- src/main/frontend/modules/outliner/core.cljs | 2 +- 5 files changed, 198 insertions(+), 76 deletions(-) diff --git a/src/main/frontend/db/rtc/core.cljs b/src/main/frontend/db/rtc/core.cljs index 8dbe45a1e0..486eaeabde 100644 --- a/src/main/frontend/db/rtc/core.cljs +++ b/src/main/frontend/db/rtc/core.cljs @@ -1,4 +1,6 @@ (ns frontend.db.rtc.core + (:require-macros + [frontend.db.rtc.macro :refer [with-sub-data-from-ws get-req-id get-result-ch]]) (:require [datascript.core :as d] [frontend.db.conn :as conn] [frontend.util :as util] @@ -10,20 +12,10 @@ [frontend.modules.outliner.transaction :as outliner-tx] [frontend.modules.outliner.core :as outliner-core] [frontend.db :as db] - [clojure.set :as set])) - -(def ws-addr config/RTC-WS-URL) - -(defn ws-listen! - [user-uuid data-from-ws-chan ws-opened-ch] - (let [ws (js/WebSocket. (util/format ws-addr user-uuid))] - (set! (.-onopen ws) (fn [_e] (async/close! ws-opened-ch))) - (set! (.-onmessage ws) (fn [e] - (let [data (js->clj (js/JSON.parse (.-data e)) :keywordize-keys true)] - (offer! data-from-ws-chan data)))) - - (set! (.-onclose ws) (fn [_e] (println :ws-stopped))) - ws)) + [frontend.db.rtc.ws :as ws] + [clojure.set :as set] + [frontend.state :as state] + [frontend.db.rtc.op :as op])) (defn init-rtc-op-db @@ -34,22 +26,22 @@ (def state-schema " - | :user-uuid | string | - | :data-from-ws-chan | channel for receive messages from server websocket | - | :data-from-ws-pub | pub of :data-from-ws-chan, dispatch by :req-id | - | :client-op-update-chan | channel to notify that there're some new operations | - | :upload-graph-chan | channel to receive presigned-upload-s3-url | - | :download-graph-chan | channel to receive presigned-s3-url to download remote-graph | - | :ws | websocket | + | :user-uuid | string | + | :*graph-uuid | atom of graph-uuid syncing now | + | :*repo | atom of repo name syncing now | + | :data-from-ws-chan | channel for receive messages from server websocket | + | :data-from-ws-pub | pub of :data-from-ws-chan, dispatch by :req-id | + | :client-op-update-chan | channel to notify that there're some new operations | + | :*ws | atom of websocket | " [:map [:user-uuid :string] + [:*graph-uuid :any] + [:*repo :any] [:data-from-ws-chan :any] [:data-from-ws-pub :any] [:client-op-update-chan :any] - [:upload-graph-chan :any] - [:download-graph-chan :any] - [:ws :any]]) + [:*ws :any]]) (def state-validator (m/validator state-schema)) (def data-from-ws-schema @@ -162,37 +154,127 @@ (apply-remote-remove-ops state remove-ops) (apply-remote-move-ops state sorted-move-ops))) -(defn remote-ops + [state ops] + {:pre [(some? @(:*repo state))]} + (let [repo @(:*repo state) + [remove-block-uuids-set update-block-uuids-set move-block-uuids-set] + (loop [[op & other-ops] ops + remove-block-uuids #{} + update-block-uuids #{} + move-block-uuids #{}] + (if-not op + [remove-block-uuids update-block-uuids move-block-uuids] + (case (first op) + "move" + (let [block-uuids (set (:block-uuids (second op))) + move-block-uuids (set/union move-block-uuids block-uuids) + remove-block-uuids (set/difference remove-block-uuids block-uuids)] + (recur other-ops remove-block-uuids update-block-uuids move-block-uuids)) + "remove" + (let [block-uuids (set (:block-uuids (second op))) + move-block-uuids (set/difference move-block-uuids block-uuids) + remove-block-uuids (set/union remove-block-uuids block-uuids)] + (recur other-ops remove-block-uuids update-block-uuids move-block-uuids)) + "update" + (let [block-uuid (:block-uuid (second op)) + update-block-uuids (conj update-block-uuids block-uuid)] + (recur other-ops remove-block-uuids update-block-uuids move-block-uuids)) + (throw (ex-info "unknown op type" op))))) + {move-ops "move" remove-ops "remove" _update-ops "update"} (group-by first ops) + move-block-uuids (->> move-ops + (keep (fn [op] + (let [block-uuids (set (:block-uuids (second op)))] + (seq (set/intersection move-block-uuids-set block-uuids))))) + (apply concat)) + remove-block-uuids-groups (->> remove-ops + (keep (fn [op] + (let [block-uuids (set (:block-uuids (second op)))] + (seq (set/intersection remove-block-uuids-set block-uuids)))))) + update-block-uuids (seq update-block-uuids-set) + move-ops* (keep + (fn [block-uuid] + (when-let [block (db/entity repo [:block/uuid (uuid block-uuid)])] + (let [left-uuid (some-> block :block/left :block/uuid str) + parent-uuid (some-> block :block/parent :block/uuid str)] + (when (and left-uuid parent-uuid) + ["move" + {:block-uuid block-uuid :target-uuid left-uuid :sibling? (not= left-uuid parent-uuid)}])))) + move-block-uuids) + remove-ops* (->> remove-block-uuids-groups + (keep + (fn [block-uuids] + (when-let [block-uuids* + (seq (filter + (fn [block-uuid] (not (db/entity repo [:block/uuid (uuid block-uuid)]))) + block-uuids))] + ["remove" {:block-uuids block-uuids*}])))) + update-ops* (->> update-block-uuids + (keep (fn [block-uuid] + (when-let [b (db/entity repo [:block/uuid (uuid block-uuid)])] + ["update" {:block-uuid block-uuid :content (:block/content b)}]))))] + [move-ops* remove-ops* update-ops*])) + + +(defn- remote-ops state ops) + r (with-sub-data-from-ws state + (c]] @@ -26,34 +28,23 @@ {:db/id (:e (first datoms))} datoms))))))) -(defn- }" [state] (go (let [{:keys [url key all-blocks-str]} (with-sub-data-from-ws state - (send (:ws state) {:req-id (get-req-id) :action "presign-put-temp-s3-obj" :graph-uuid "not-yet"}) + (c (ipc/ipc :db-new repo))) (c (ipc/ipc :db-transact-data repo (pr-str {:blocks blocks**})))))))) + + +(defn c (ipc/ipc :rtc/add-ops repo (pr-str [op]))))) (defn c (ipc/ipc :rtc/add-ops repo (pr-str [op]))))) diff --git a/src/main/frontend/db/rtc/ws.cljs b/src/main/frontend/db/rtc/ws.cljs index 2cee4d235b..ef823062b0 100644 --- a/src/main/frontend/db/rtc/ws.cljs +++ b/src/main/frontend/db/rtc/ws.cljs @@ -1,11 +1,46 @@ -(ns frontend.db.rtc.ws) +(ns frontend.db.rtc.ws + (:require [frontend.config :as config] + [frontend.util :as util] + [cljs.core.async :as async :refer [! chan go go-loop offer! + poll! timeout]])) -(defn send +(def ws-addr config/RTC-WS-URL) + +(defn ws-listen + [user-uuid data-from-ws-chan ws-opened-ch] + (let [ws (js/WebSocket. (util/format ws-addr user-uuid))] + (set! (.-onopen ws) (fn [_e] (async/close! ws-opened-ch))) + (set! (.-onmessage ws) (fn [e] + (let [data (js->clj (js/JSON.parse (.-data e)) :keywordize-keys true)] + (offer! data-from-ws-chan data)))) + + (set! (.-onclose ws) (fn [_e] (println :ws-stopped))) + ws)) + +(defn send! [ws message] (assert (= js/WebSocket.OPEN (.-readyState ws))) (.send ws (js/JSON.stringify (clj->js message)))) +(defn (.-readyState ws) js/WebSocket.OPEN) + (let [ws-opened-ch (chan) + ws* (ws-listen (:user-uuid state) (:data-from-ws-chan state) ws-opened-ch)] + (