update rtc local-ops->remote-ops

This commit is contained in:
rcmerci
2023-08-14 19:23:27 +08:00
parent 2181525bd8
commit 4a72708e8d
5 changed files with 198 additions and 76 deletions

View File

@@ -1,4 +1,6 @@
(ns frontend.db.rtc.core
(:require-macros
[frontend.db.rtc.macro :refer [with-sub-data-from-ws get-req-id get-result-ch]])
(:require [datascript.core :as d]
[frontend.db.conn :as conn]
[frontend.util :as util]
@@ -10,20 +12,10 @@
[frontend.modules.outliner.transaction :as outliner-tx]
[frontend.modules.outliner.core :as outliner-core]
[frontend.db :as db]
[clojure.set :as set]))
(def ws-addr config/RTC-WS-URL)
(defn ws-listen!
[user-uuid data-from-ws-chan ws-opened-ch]
(let [ws (js/WebSocket. (util/format ws-addr user-uuid))]
(set! (.-onopen ws) (fn [_e] (async/close! ws-opened-ch)))
(set! (.-onmessage ws) (fn [e]
(let [data (js->clj (js/JSON.parse (.-data e)) :keywordize-keys true)]
(offer! data-from-ws-chan data))))
(set! (.-onclose ws) (fn [_e] (println :ws-stopped)))
ws))
[frontend.db.rtc.ws :as ws]
[clojure.set :as set]
[frontend.state :as state]
[frontend.db.rtc.op :as op]))
(defn init-rtc-op-db
@@ -34,22 +26,22 @@
(def state-schema
"
| :user-uuid | string |
| :data-from-ws-chan | channel for receive messages from server websocket |
| :data-from-ws-pub | pub of :data-from-ws-chan, dispatch by :req-id |
| :client-op-update-chan | channel to notify that there're some new operations |
| :upload-graph-chan | channel to receive presigned-upload-s3-url |
| :download-graph-chan | channel to receive presigned-s3-url to download remote-graph |
| :ws | websocket |
| :user-uuid | string |
| :*graph-uuid | atom of graph-uuid syncing now |
| :*repo | atom of repo name syncing now |
| :data-from-ws-chan | channel for receive messages from server websocket |
| :data-from-ws-pub | pub of :data-from-ws-chan, dispatch by :req-id |
| :client-op-update-chan | channel to notify that there're some new operations |
| :*ws | atom of websocket |
"
[:map
[:user-uuid :string]
[:*graph-uuid :any]
[:*repo :any]
[:data-from-ws-chan :any]
[:data-from-ws-pub :any]
[:client-op-update-chan :any]
[:upload-graph-chan :any]
[:download-graph-chan :any]
[:ws :any]])
[:*ws :any]])
(def state-validator (m/validator state-schema))
(def data-from-ws-schema
@@ -162,37 +154,127 @@
(apply-remote-remove-ops state remove-ops)
(apply-remote-move-ops state sorted-move-ops)))
(defn <loop-for-rtc
[state]
{:pre [(state-validator state)]}
(defn- push-data-from-ws-handler
[state push-data-from-ws]
(prn :push-data-from-ws push-data-from-ws)
;; TODO
)
(defn- client-ops->remote-ops
[state ops]
{:pre [(some? @(:*repo state))]}
(let [repo @(:*repo state)
[remove-block-uuids-set update-block-uuids-set move-block-uuids-set]
(loop [[op & other-ops] ops
remove-block-uuids #{}
update-block-uuids #{}
move-block-uuids #{}]
(if-not op
[remove-block-uuids update-block-uuids move-block-uuids]
(case (first op)
"move"
(let [block-uuids (set (:block-uuids (second op)))
move-block-uuids (set/union move-block-uuids block-uuids)
remove-block-uuids (set/difference remove-block-uuids block-uuids)]
(recur other-ops remove-block-uuids update-block-uuids move-block-uuids))
"remove"
(let [block-uuids (set (:block-uuids (second op)))
move-block-uuids (set/difference move-block-uuids block-uuids)
remove-block-uuids (set/union remove-block-uuids block-uuids)]
(recur other-ops remove-block-uuids update-block-uuids move-block-uuids))
"update"
(let [block-uuid (:block-uuid (second op))
update-block-uuids (conj update-block-uuids block-uuid)]
(recur other-ops remove-block-uuids update-block-uuids move-block-uuids))
(throw (ex-info "unknown op type" op)))))
{move-ops "move" remove-ops "remove" _update-ops "update"} (group-by first ops)
move-block-uuids (->> move-ops
(keep (fn [op]
(let [block-uuids (set (:block-uuids (second op)))]
(seq (set/intersection move-block-uuids-set block-uuids)))))
(apply concat))
remove-block-uuids-groups (->> remove-ops
(keep (fn [op]
(let [block-uuids (set (:block-uuids (second op)))]
(seq (set/intersection remove-block-uuids-set block-uuids))))))
update-block-uuids (seq update-block-uuids-set)
move-ops* (keep
(fn [block-uuid]
(when-let [block (db/entity repo [:block/uuid (uuid block-uuid)])]
(let [left-uuid (some-> block :block/left :block/uuid str)
parent-uuid (some-> block :block/parent :block/uuid str)]
(when (and left-uuid parent-uuid)
["move"
{:block-uuid block-uuid :target-uuid left-uuid :sibling? (not= left-uuid parent-uuid)}]))))
move-block-uuids)
remove-ops* (->> remove-block-uuids-groups
(keep
(fn [block-uuids]
(when-let [block-uuids*
(seq (filter
(fn [block-uuid] (not (db/entity repo [:block/uuid (uuid block-uuid)])))
block-uuids))]
["remove" {:block-uuids block-uuids*}]))))
update-ops* (->> update-block-uuids
(keep (fn [block-uuid]
(when-let [b (db/entity repo [:block/uuid (uuid block-uuid)])]
["update" {:block-uuid block-uuid :content (:block/content b)}]))))]
[move-ops* remove-ops* update-ops*]))
(defn- <client-op-update-handler
[state ops t-before]
{:pre [(some? @(:*graph-uuid state))]}
(go
(let [{:keys [data-from-ws-chan client-op-update-chan]} state
{:keys [data-from-ws client-op-update]}
(async/alt!
data-from-ws-chan ([v] {:data-from-ws v})
client-op-update-chan {:client-op-update true}
:priority true)]
(cond
data-from-ws
nil
client-op-update
nil))))
(let [ops-for-remote (client-ops->remote-ops state ops)
r (with-sub-data-from-ws state
(<! (ws/<send! state {:action "apply-ops" :graph-uuid @(:*graph-uuid state)
:ops ops-for-remote :t-before t-before}))
(<! (get-result-ch)))]
(prn :<client-op-update-handler r))))
(defn <loop-for-rtc
[state graph-uuid repo]
{:pre [(state-validator state)
(some? graph-uuid)
(some? repo)]}
(go
(reset! (:*graph-uuid state) graph-uuid)
(reset! (:*repo state) repo)
(let [{:keys [data-from-ws-pub client-op-update-chan]} state
push-data-from-ws-ch (chan (async/sliding-buffer 100))]
(with-sub-data-from-ws state
(<! (ws/<send! @(:*ws state) {:action "register-graph-updates" :req-id (get-req-id) :graph-uuid graph-uuid}))
(<! (get-result-ch)))
(async/sub data-from-ws-pub "push-updates" push-data-from-ws-ch)
(<! (go-loop []
(let [{:keys [push-data-from-ws client-op-update]}
(async/alt!
client-op-update-chan {:client-op-update true}
push-data-from-ws-ch ([v] {:push-data-from-ws v})
:priority true)]
(cond
push-data-from-ws
(do (push-data-from-ws-handler state push-data-from-ws)
(recur))
client-op-update
(do (prn :client-op-update client-op-update)
(recur))
:else
nil))))
(async/unsub data-from-ws-pub "push-updates" push-data-from-ws-ch))))
(defn init-state
[ws data-from-ws-chan user-uuid]
(m/parse state-schema
{:user-uuid user-uuid
:*graph-uuid (atom nil)
:*repo (atom nil)
:data-from-ws-chan data-from-ws-chan
:data-from-ws-pub (async/pub data-from-ws-chan :req-id)
:client-op-update-chan (chan)
:upload-graph-chan (chan)
:download-graph-chan (chan)
:ws ws}))
:*ws (atom ws)}))
(defn ensure-ws-connected
[state]
)
(defn <init
[]
@@ -200,10 +282,12 @@
(let [data-from-ws-chan (chan (async/sliding-buffer 100))
ws-opened-ch (chan)
user-uuid "f92bb5b3-0f72-4a74-9ad8-1793e655c309"
ws (ws-listen! user-uuid data-from-ws-chan ws-opened-ch)]
ws (ws/ws-listen user-uuid data-from-ws-chan ws-opened-ch)]
(<! ws-opened-ch)
(init-state ws data-from-ws-chan user-uuid))))
(comment
(go
(def global-state (<! (<init)))))
(def global-state (<! (<init))))
(reset! (:*graph-uuid global-state) "00e016b1-cab1-4eea-bf74-a02d9e4910f8")
(reset! (:*repo global-state) (state/get-current-repo)))

View File

@@ -1,8 +1,10 @@
(ns frontend.db.rtc.full-upload-download-graph
"- upload local graph to remote
- download remote graph"
(:require-macros [frontend.db.rtc.macro :refer [with-sub-data-from-ws get-req-id get-result-ch]])
(:require [frontend.db.conn :as conn]
[datascript.core :as d]
[frontend.db.rtc.ws :refer [send]]
[frontend.db.rtc.ws :refer [<send!]]
[frontend.state :as state]
[cljs.core.async :as async :refer [chan go <!]]
[cljs.core.async.interop :refer [p->c]]
@@ -26,34 +28,23 @@
{:db/id (:e (first datoms))}
datoms)))))))
(defn- <upload-graph
(defn <upload-graph
"Upload current repo to remote, return remote {:req-id xxx :graph-uuid <new-remote-graph-uuid>}"
[state]
(go
(let [{:keys [url key all-blocks-str]}
(with-sub-data-from-ws state
(send (:ws state) {:req-id (get-req-id) :action "presign-put-temp-s3-obj" :graph-uuid "not-yet"})
(<! (<send! state {:req-id (get-req-id) :action "presign-put-temp-s3-obj" :graph-uuid "not-yet"}))
(let [all-blocks (export-as-blocks (state/get-current-repo))
all-blocks-str (transit/write (transit/writer :json) all-blocks)]
(merge (<! (get-result-ch)) {:all-blocks-str all-blocks-str})))]
(<! (http/put url {:body all-blocks-str}))
(with-sub-data-from-ws state
(send (:ws state) {:req-id (get-req-id) :action "full-upload-graph" :graph-uuid "not-yet" :s3-key key})
(println (<! (get-result-ch)))))))
(defn- <download-graph
[state graph-uuid]
(go
(let [{:keys [url]}
(with-sub-data-from-ws state
(send (:ws state) {:req-id (get-req-id) :action "full-download-graph" :graph-uuid graph-uuid})
(<! (get-result-ch)))
{:keys [status body] :as r} (<! (http/get url))]
(if (not= 200 status)
(ex-info "<download-graph failed" r)
(let [reader (transit/reader :json)
all-blocks (transit/read reader body)]
all-blocks)))))
(<! (<send! state {:req-id (get-req-id) :action "full-upload-graph" :graph-uuid "not-yet" :s3-key key}))
(let [r (<! (get-result-ch))]
(if-not (:graph-uuid r)
(ex-info "upload graph failed" r)
r))))))
(defn- replace-db-id-with-temp-id
@@ -111,3 +102,18 @@
repo (str "rtc-" graph-uuid)]
(<! (p->c (ipc/ipc :db-new repo)))
(<! (p->c (ipc/ipc :db-transact-data repo (pr-str {:blocks blocks**}))))))))
(defn <download-graph
[state graph-uuid]
(go
(let [{:keys [url]}
(with-sub-data-from-ws state
(<send! state {:req-id (get-req-id) :action "full-download-graph" :graph-uuid graph-uuid})
(<! (get-result-ch)))
{:keys [status body] :as r} (<! (http/get url))]
(if (not= 200 status)
(ex-info "<download-graph failed" r)
(let [reader (transit/reader :json)
all-blocks (transit/read reader body)]
(<! (<transact-remote-all-blocks-to-sqlite all-blocks graph-uuid)))))))

View File

@@ -13,9 +13,7 @@
[:value [:map [:block-uuids [:sequential :string]]]]]
[:catn
[:op [:= "update"]]
[:value [:map
[:block-uuid :string]
[:content {:optional true} :string]]]]])
[:value [:map [:block-uuid :string]]]]])
(def op-validator (m/validator op-schema))
@@ -33,9 +31,8 @@
(p->c (ipc/ipc :rtc/add-ops repo (pr-str [op])))))
(defn <update-block-op!
[repo block-uuid attrs-map]
(let [op ["update" (merge {:block-uuid (str block-uuid)}
(select-keys attrs-map [:content]))]]
[repo block-uuid]
(let [op ["update" {:block-uuid (str block-uuid)}]]
(assert (op-validator op) "illegal op")
(p->c (ipc/ipc :rtc/add-ops repo (pr-str [op])))))

View File

@@ -1,11 +1,46 @@
(ns frontend.db.rtc.ws)
(ns frontend.db.rtc.ws
(:require [frontend.config :as config]
[frontend.util :as util]
[cljs.core.async :as async :refer [<! >! chan go go-loop offer!
poll! timeout]]))
(defn send
(def ws-addr config/RTC-WS-URL)
(defn ws-listen
[user-uuid data-from-ws-chan ws-opened-ch]
(let [ws (js/WebSocket. (util/format ws-addr user-uuid))]
(set! (.-onopen ws) (fn [_e] (async/close! ws-opened-ch)))
(set! (.-onmessage ws) (fn [e]
(let [data (js->clj (js/JSON.parse (.-data e)) :keywordize-keys true)]
(offer! data-from-ws-chan data))))
(set! (.-onclose ws) (fn [_e] (println :ws-stopped)))
ws))
(defn send!
[ws message]
(assert (= js/WebSocket.OPEN (.-readyState ws)))
(.send ws (js/JSON.stringify (clj->js message))))
(defn <ensure-ws-open!
[state]
(go
(let [ws @(:*ws state)]
(when (> (.-readyState ws) js/WebSocket.OPEN)
(let [ws-opened-ch (chan)
ws* (ws-listen (:user-uuid state) (:data-from-ws-chan state) ws-opened-ch)]
(<! ws-opened-ch)
(reset! (:*ws state) ws*))))))
(defn <send!
"ensure ws state=open, then send messages"
[state message]
(go
(<! (<ensure-ws-open! state))
(send! @(:*ws state) message)))
(defn stop
[ws]
(set! (.-onopen ws) nil)

View File

@@ -983,7 +983,7 @@
(let [repo (:repo *transaction-args*)
persist-op? (:persist-op? *transaction-args*)]
(when (and persist-op? repo)
(rtc-op/<update-block-op! repo (:block/uuid block) (select-keys block [:block/content]))))
(rtc-op/<update-block-op! repo (:block/uuid block))))
(op-transact! #'save-block block))
(defn insert-blocks!