feat(sync): support stop from processing remote->local sync

This commit is contained in:
rcmerci
2022-01-16 01:11:31 +08:00
parent 85bb55a784
commit e4d7a1582a

View File

@@ -36,7 +36,7 @@
;;; TODO: add some spec validate
;;; TODO: use access-token instead of id-token
;;; TODO: support stop from applying filetxns from remote
;;; TODO: update-remote-file failed when pagename contains whitespace
(def ws-addr "wss://og96xf1si7.execute-api.us-east-2.amazonaws.com/production?graphuuid=%s")
@@ -526,25 +526,6 @@
(def remoteapi (->RemoteAPI))
(defn- update-txn [^FileTxnSet filetxnset txn]
(let [{:keys [TXType TXContent]} txn
files (->> (string/split-lines TXContent)
(remove empty?)
(mapv #(remove-user-graph-uuid-prefix %)))]
(case TXType
"update_files"
(reduce #(.update-file ^FileTxnSet %1 %2) filetxnset files)
"rename_file"
(let [[from to] files]
(.rename-file filetxnset from to))
"delete_files"
(reduce #(.delete-file ^FileTxnSet %1 %2) filetxnset files))))
(defn update-txns [filetxnset txns]
(reduce update-txn filetxnset txns))
(defn- apply-filetxns
[graph-uuid base-path filetxns]
(cond
@@ -568,20 +549,22 @@
true
r))))))
(defn- apply-filetxns-partitions [^SyncState sync-state graph-uuid base-path filetxns-partitions repo *txid]
(defn- apply-filetxns-partitions [^SyncState sync-state graph-uuid base-path filetxns-partitions repo *txid *stopped]
(go-loop [filetxns-partitions* filetxns-partitions]
(when (seq filetxns-partitions*)
(let [filetxns (first filetxns-partitions*)
paths (map relative-path filetxns)
_ (. sync-state (add-current-remote->local-files! paths))
r (<! (apply-filetxns graph-uuid base-path filetxns))
_ (. sync-state (remove-current-remote->local-files! paths))]
(if (instance? ExceptionInfo r)
r
(let [latest-txid (apply max (map #(.-txid ^FileTxn %) filetxns))]
(reset! *txid latest-txid)
(update-graphs-txid! latest-txid graph-uuid repo)
(recur (next filetxns-partitions*))))))))
(if @*stopped
{:stop true}
(when (seq filetxns-partitions*)
(let [filetxns (first filetxns-partitions*)
paths (map relative-path filetxns)
_ (. sync-state (add-current-remote->local-files! paths))
r (<! (apply-filetxns graph-uuid base-path filetxns))
_ (. sync-state (remove-current-remote->local-files! paths))]
(if (instance? ExceptionInfo r)
r
(let [latest-txid (apply max (map #(.-txid ^FileTxn %) filetxns))]
(reset! *txid latest-txid)
(update-graphs-txid! latest-txid graph-uuid repo)
(recur (next filetxns-partitions*)))))))))
(defmulti need-sync-remote? (fn [v] (cond
(= :max v)
@@ -658,6 +641,7 @@
(defprotocol IRemote->LocalSync
(stop-remote->local! [this])
(sync-remote->local! [this] "return ExceptionInfo when error occurs")
(sync-remote->local-all-files! [this] "sync all files, return ExceptionInfo when error occurs"))
@@ -668,37 +652,43 @@
(ratelimit [this from-chan] "get watched local file-change events from FROM-CHAN,
return chan returning events with rate limited")
(sync-local->remote! [this es] "es is a sequence of `FileChangeEvent`, all items have same type.")
(sync-local->remote-all-files! [this stop-chan] "compare all local files to remote ones, sync if not equal.
ensure local-txid = remote-txid before calling this func"))
(sync-local->remote-all-files! [this] "compare all local files to remote ones, sync when not equal.
if local-txid != remote-txid, return {:need-sync-remote true}"))
(deftype Remote->LocalSyncer [graph-uuid base-path repo *txid ^SyncState sync-state ^:mutable local->remote-syncer]
(deftype Remote->LocalSyncer [graph-uuid base-path repo *txid ^SyncState sync-state
^:mutable local->remote-syncer *stopped]
Object
(set-local->remote-syncer! [_ s] (set! local->remote-syncer s))
IRemote->LocalSync
(stop-remote->local! [_] (vreset! *stopped true))
(sync-remote->local! [_]
(go
(let [r
(let [r (<! (get-diff remoteapi graph-uuid @*txid))]
(if (instance? ExceptionInfo r)
r
(let [[diff-txns latest-txid] r]
(let [diff-r (<! (get-diff remoteapi graph-uuid @*txid))]
(if (instance? ExceptionInfo diff-r)
diff-r
(let [[diff-txns latest-txid] diff-r]
(when (number? latest-txid)
(let [partitioned-filetxns (transduce (diffs->partitioned-filetxns 5)
(let [partitioned-filetxns (transduce (diffs->partitioned-filetxns 10)
(completing (fn [r i] (conj r (reverse i)))) ;reverse
'()
(reverse diff-txns))]
(prn "partition-filetxns" partitioned-filetxns)
;; TODO: precheck etag
(let [apply-result
(<! (apply-filetxns-partitions
sync-state graph-uuid base-path partitioned-filetxns repo *txid))]
(when-not (instance? ExceptionInfo apply-result)
(reset! *txid latest-txid)
(update-graphs-txid! latest-txid graph-uuid repo))
apply-result))))))]
(if (instance? ExceptionInfo r)
(if (empty? (flatten partitioned-filetxns))
(do (update-graphs-txid! latest-txid graph-uuid repo)
(reset! *txid latest-txid)
{:succ true})
(<! (apply-filetxns-partitions
sync-state graph-uuid base-path partitioned-filetxns repo *txid *stopped))))))))]
(cond
(instance? ExceptionInfo r)
{:unknown r}
@*stopped
{:stop true}
:else
{:succ true}))))
(sync-remote->local-all-files! [_] nil))
@@ -716,7 +706,7 @@
(reduce #(when (re-find %2 path) (reduced true)) false regexps))
(deftype Local->RemoteSyncer [graph-uuid base-path repo ^SyncState sync-state
^:mutable rate *txid ^:mutable remote->local-syncer stop-chan]
^:mutable rate *txid ^:mutable remote->local-syncer stop-chan ^:mutable stopped]
Object
(filter-file-change-events-fn [this]
(fn [^FileChangeEvent e] (and (instance? FileChangeEvent e)
@@ -735,7 +725,9 @@
(get-ignore-files [_] #{#"logseq/graphs-txid.edn$" #"logseq/bak/.*" #"version-files/.*" #"logseq/\.recycle/.*"
#"\.DS_Store$"})
(get-monitored-dirs [_] #{#"^assets/" #"^journals/" #"^logseq/" #"^pages/"})
(stop-local->remote! [_] (async/close! stop-chan))
(stop-local->remote! [_]
(async/close! stop-chan)
(set! stopped true))
(ratelimit [this from-chan]
(let [c (.filtered-chan this 10000)
@@ -808,7 +800,7 @@
(println "sync-local->remote unknown:" r*)
{:unknown r*}))))))))
(sync-local->remote-all-files! [this stop-chan]
(sync-local->remote-all-files! [this]
(go
(let [remote-all-files-meta-c (get-remote-all-files-meta remoteapi graph-uuid)
local-all-files-meta-c (get-local-all-files-meta rsapi graph-uuid base-path)
@@ -827,14 +819,14 @@
(and (not (contains-path? ignore-files path))
(contains-path? monitored-dirs path))))
;; partition FileChangeEvents
(partition-file-change-events 5))
(partition-file-change-events 10))
diff-local-files)]
(println "[full-sync]" (count (flatten change-events-partitions)) "files need to sync to remote")
(loop [es-partitions change-events-partitions]
(if (empty? es-partitions)
{:succ true}
(if (async/poll! stop-chan)
{:stop true}
(if stopped
{:stop true}
(if (empty? es-partitions)
{:succ true}
(let [{:keys [succ need-sync-remote unknown] :as r}
(<! (sync-local->remote! this (first es-partitions)))]
(cond
@@ -892,7 +884,7 @@
^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer
full-sync-chan stop-sync-chan remote->local-sync-chan local->remote-sync-chan
local-changes-chan ^:mutable ratelimit-local-changes-chan
*txid ^:mutable state ^:mutable _remote-change-chan ^:mutable _*ws]
*txid ^:mutable state ^:mutable _remote-change-chan ^:mutable _*ws ^:mutable stopped]
Object
(schedule [this next-state & args]
(println "[SyncManager" graph-uuid "]" (and state (name state)) "->" (and next-state (name next-state)))
@@ -947,7 +939,7 @@
(full-sync [this]
(go
(let [{:keys [succ need-sync-remote unknown stop]}
(<! (sync-local->remote-all-files! local->remote-syncer stop-sync-chan))]
(<! (sync-local->remote-all-files! local->remote-syncer))]
(cond
succ
(.schedule this ::idle)
@@ -964,16 +956,16 @@
(go
(if (some-> remote-val :txid (<= @*txid))
(.schedule this ::idle)
(let [{:keys [succ unknown]}
(let [{:keys [succ unknown stop]}
(<! (sync-remote->local! remote->local-syncer))]
(cond
succ
(.schedule this (or next-state ::idle))
stop
(.schedule this ::stop)
unknown
(do
(prn "remote->local err" unknown)
(.schedule this ::idle)))))))
(do (prn "remote->local err" unknown)
(.schedule this ::idle)))))))
(local->remote [this [^FileChangeEvent local-change]]
(assert (some? local-change))
@@ -993,11 +985,14 @@
(.schedule this ::idle))))))
IStoppable
(-stop! [_]
(ws-stop! _*ws)
(offer! stop-sync-chan true)
(stop-local->remote! local->remote-syncer)
(debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path])
(.update-state! sync-state ::stop)))
(when-not stopped
(set! stopped true)
(ws-stop! _*ws)
(offer! stop-sync-chan true)
(stop-local->remote! local->remote-syncer)
(stop-remote->local! remote->local-syncer)
(debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path])
(.update-state! sync-state ::stop))))
(defn sync-manager [graph-uuid base-path repo txid sync-state full-sync-chan stop-sync-chan
@@ -1007,15 +1002,15 @@
base-path
repo sync-state
20000
*txid nil (chan))
*txid nil (chan) false)
remote->local-syncer (->Remote->LocalSyncer graph-uuid
base-path
repo *txid sync-state nil)]
repo *txid sync-state nil (volatile! false))]
(.set-remote->local-syncer! local->remote-syncer remote->local-syncer)
(.set-local->remote-syncer! remote->local-syncer local->remote-syncer)
(->SyncManager graph-uuid base-path sync-state local->remote-syncer remote->local-syncer
full-sync-chan stop-sync-chan
remote->local-sync-chan local->remote-sync-chan local-changes-chan nil *txid nil nil nil)))
remote->local-sync-chan local->remote-sync-chan local-changes-chan nil *txid nil nil nil false)))
(def full-sync-chan (chan 1))