fix(rtc): update pull-remote-data logic

This commit is contained in:
rcmerci
2023-10-19 01:34:43 +08:00
parent 60737c5903
commit ed460b2e56
2 changed files with 60 additions and 28 deletions

View File

@@ -22,6 +22,7 @@
[:map {:closed true}
[:req-id :string]
[:t {:optional true} :int]
[:t-before {:optional true} :int]
[:affected-blocks {:optional true}
[:map-of :string
[:multi {:dispatch :op :decode/string #(update % :op keyword)}

View File

@@ -153,7 +153,10 @@
(defn- check-block-pos
[repo block-uuid-str remote-parents remote-left-uuid-str]
(let [local-b (db/pull repo '[*] [:block/uuid (uuid block-uuid-str)])
(let [local-b (db/pull repo '[{:block/left [:block/uuid]}
{:block/parent [:block/uuid]}
*]
[:block/uuid (uuid block-uuid-str)])
remote-parent-uuid-str (first remote-parents)]
(cond
(nil? local-b)
@@ -238,8 +241,8 @@
(defn apply-remote-update-page-ops
[repo update-page-ops]
(doseq [{:keys [self page-name] :as op-value} update-page-ops]
(let [old-page-name (:block/name (db/pull repo '[*] [:block/uuid (uuid self)]))
exist-page (db/pull repo '[*] [:block/name page-name])]
(let [old-page-name (:block/name (db/pull repo [:block/name] [:block/uuid (uuid self)]))
exist-page (db/pull repo [:block/uuid] [:block/name page-name])]
(cond
;; same name but different uuid
;; remote page has same block/name as local's, but they don't have same block/uuid.
@@ -266,7 +269,8 @@
(defn apply-remote-remove-page-ops
[repo remove-page-ops]
(doseq [op remove-page-ops]
(when-let [page-name (:block/name (db/pull repo '[*] [:block/uuid (uuid (:block-uuid op))]))]
(when-let [page-name (:block/name
(db/pull repo [:block/name] [:block/uuid (uuid (:block-uuid op))]))]
(page-handler/delete! page-name nil {:redirect-to-home? false :persist-op? false}))))
@@ -303,18 +307,30 @@
[repo data-from-ws & {:keys [max-pushed-op-key]}]
(assert (rtc-const/data-from-ws-validator data-from-ws) data-from-ws)
(go
(let [affected-blocks-map (:affected-blocks data-from-ws)
remote-t (:t data-from-ws)
{:keys [local-tx ops]} (<! (p->c (op/<get-ops&local-tx repo)))
unpushed-ops (when max-pushed-op-key
(keep (fn [[key op]] (when (> key max-pushed-op-key) op)) ops))
affected-blocks-map* (if unpushed-ops
(filter-remote-data-by-local-unpushed-ops
affected-blocks-map unpushed-ops)
affected-blocks-map)]
(if (<= remote-t local-tx)
(prn ::skip :remote-t remote-t :local-t local-tx)
(let [{remove-ops-map :remove move-ops-map :move update-ops-map :update-attrs
(let [remote-t (:t data-from-ws)
remote-t-before (:t-before data-from-ws)
{:keys [local-tx ops]} (<! (p->c (op/<get-ops&local-tx repo)))]
(cond
(not (and (pos? remote-t)
(pos? remote-t-before)))
(throw (ex-info "invalid remote-data" {:data data-from-ws}))
(<= remote-t local-tx)
(prn ::skip :remote-t remote-t :remote-t remote-t-before :local-t local-tx)
(< local-tx remote-t-before)
(do (prn ::need-pull-remote-data :remote-t remote-t :remote-t remote-t-before :local-t local-tx)
::need-pull-remote-data)
(<= remote-t-before local-tx remote-t)
(let [affected-blocks-map (:affected-blocks data-from-ws)
unpushed-ops (when max-pushed-op-key
(keep (fn [[key op]] (when (> key max-pushed-op-key) op)) ops))
affected-blocks-map* (if unpushed-ops
(filter-remote-data-by-local-unpushed-ops
affected-blocks-map unpushed-ops)
affected-blocks-map)
{remove-ops-map :remove move-ops-map :move update-ops-map :update-attrs
update-page-ops-map :update-page remove-page-ops-map :remove-page}
(update-vals
(group-by (fn [[_ env]] (get env :op)) affected-blocks-map*)
@@ -329,12 +345,18 @@
(util/profile :apply-remote-move-ops (apply-remote-move-ops repo sorted-move-ops))
(util/profile :apply-remote-update-ops (apply-remote-update-ops repo update-ops))
(util/profile :apply-remote-remove-page-ops (apply-remote-remove-page-ops repo remove-page-ops))
(<! (p->c (op/<update-local-tx! repo remote-t))))))))
(<! (p->c (op/<update-local-tx! repo remote-t))))
:else (throw (ex-info "unreachable" {:remote-t remote-t
:remote-t-before remote-t-before
:local-t local-tx}))))))
(defn- <push-data-from-ws-handler
[repo push-data-from-ws]
(go (<! (<apply-remote-data repo push-data-from-ws))
(prn :push-data-from-ws push-data-from-ws)))
(prn :push-data-from-ws push-data-from-ws)
(go
(let [r (<! (<apply-remote-data repo push-data-from-ws))]
(when (= r ::need-pull-remote-data)
r))))
(defn- ^:large-vars/cleanup-todo local-ops->remote-ops
@@ -418,7 +440,9 @@
[#{} #{} #{} #{} {}] sorted-ops)
move-ops (keep
(fn [block-uuid]
(when-let [block (db/pull repo '[*] [:block/uuid (uuid block-uuid)])]
(when-let [block (db/pull repo '[{:block/left [:block/uuid]}
{:block/parent [:block/uuid]}]
[:block/uuid (uuid block-uuid)])]
(let [left-uuid (some-> block :block/left :block/uuid str)
parent-uuid (some-> block :block/parent :block/uuid str)]
(when (and left-uuid parent-uuid)
@@ -426,18 +450,23 @@
{:block-uuid block-uuid :target-uuid left-uuid :sibling? (not= left-uuid parent-uuid)}]))))
move-block-uuid-set)
remove-block-uuid-set
(filter (fn [block-uuid] (nil? (db/pull '[*] repo [:block/uuid (uuid block-uuid)]))) remove-block-uuid-set)
(filter (fn [block-uuid] (nil? (db/pull [:block/uuid] repo [:block/uuid (uuid block-uuid)]))) remove-block-uuid-set)
remove-ops (when (seq remove-block-uuid-set) [[:remove {:block-uuids remove-block-uuid-set}]])
update-page-ops (keep (fn [block-uuid]
(when-let [page-name (:block/name (db/pull repo '[*] [:block/uuid (uuid block-uuid)]))]
(when-let [page-name
(:block/name
(db/pull repo [:block/name] [:block/uuid (uuid block-uuid)]))]
[:update-page {:block-uuid block-uuid :page-name page-name}]))
update-page-uuid-set)
remove-page-ops (keep (fn [block-uuid]
(when (nil? (db/pull repo '[*] [:block/uuid (uuid block-uuid)]))
(when (nil? (db/pull repo [:block/uuid] [:block/uuid (uuid block-uuid)]))
[:remove-page {:block-uuid block-uuid}]))
remove-page-uuid-set)
update-ops (keep (fn [[block-uuid attr-map]]
(when-let [b (db/pull repo '[*] [:block/uuid (uuid block-uuid)])]
(when-let [b (db/pull repo '[{:block/left [:block/uuid]}
{:block/parent [:block/uuid]}
*]
[:block/uuid (uuid block-uuid)])]
(let [key-set (set (keys attr-map))
left-uuid (some-> b :block/left :block/uuid str)
parent-uuid (some-> b :block/parent :block/uuid str)
@@ -530,7 +559,6 @@
(throw (ex-info "Unavailable" {:remote-ex remote-ex})))
(do (assert (pos? (:t r)) r)
(<! (p->c (op/<clean-ops repo op-keys)))
;; (<! (p->c (op/<update-local-tx! repo (:t r))))
(<! (<apply-remote-data repo (rtc-const/data-from-ws-decoder r)
:max-pushed-op-key max-op-key))
(prn :<client-op-update-handler :t (:t r)))))))
@@ -583,8 +611,11 @@
(recur (make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?)))
push-data-from-ws
(do (<push-data-from-ws-handler repo push-data-from-ws)
(recur (make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?))))
(let [r (<! (<push-data-from-ws-handler repo push-data-from-ws))]
(when (= r ::need-pull-remote-data)
;; trigger a force push, which can pull remote-diff-data from local-t to remote-t
(async/put! force-push-client-ops-ch true))
(recur (make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?))))
client-op-update
(let [maybe-exp (<! (user/<wrap-ensure-id&access-token
@@ -633,7 +664,7 @@
:toggle-auto-push-client-ops-chan (chan (async/sliding-buffer 1))
:*auto-push-client-ops? (atom true :validator boolean?)
:*stop-rtc-loop-chan (atom nil)
:force-push-client-ops-chan (chan 1)
:force-push-client-ops-chan (chan (async/sliding-buffer 1))
:*ws (atom ws)})
(defn <init-state