Feat: sync progress for electron (#6662)

* fix: state shouldn't be returned in ipc

* feat: download && upload processing

* enhance(ui):  interaction of show password checkbox

* feat: time left

* feat: download progress

* Set download batch size to 100

* improve(ui): progress pane of file sync indicator

* improve(ui): progress pie of each file state

* improve(ui): progress pie of each file state

* improve(ui): progress pie of each downloading file

* fix: add last changed time

* enhance: time left

* fix: total needs to be larger than finished

* fix: wait for update-graphs-txid!

* enhance: show in-progress files first

* chore: ui polish

* improve(ui): persist stauts of sync files list toggle switch

* fix(ui): visibility of sync now button on mobile

* chore: remove ios static out after sync

* fix: debounce clicking on sync icon

* fix: repos not refreshed after unlink or delete

* enhance: automatically save page-metadata.edn to avoid sync when restart

* improve(ui): sync now shortcut for file sync progress pane

* enhance: data transfer icons

* fix: stop sync if switched to another graph

* fix: can't switch

* enhance: sort files first before uploading or downloading

* fix: clear current graph uuid when sync stops

* fix: separate progress by graphs

* fix: check files only in the current progress

* fix: prevent multiple sync managers for the same graph

* fix: remove redundant files watchers

* enhance(sync): re-exec remote->local-full-sync when exception

re-exec remote->local-full-sync when <update-local-files return exceptions

* enhance(sync): re-exec remote->local-full-sync when exception

re-exec remote->local-full-sync when <update-local-files return exceptions

* fix(sync): set-progress-callback, update rsapi

* fix(sync): uploading progress bar

Co-authored-by: Tienson Qin <tiensonqin@gmail.com>
Co-authored-by: charlie <xyhp915@qq.com>
Co-authored-by: rcmerci <rcmerci@gmail.com>
This commit is contained in:
Andelf
2022-09-23 02:00:24 +08:00
committed by GitHub
parent 4c3503ff1e
commit ae114afbd8
21 changed files with 962 additions and 438 deletions

View File

@@ -27,7 +27,8 @@
[frontend.encrypt :as encrypt]
[medley.core :refer [dedupe-by]]
[rum.core :as rum]
[promesa.core :as p]))
[promesa.core :as p]
[lambdaisland.glogi :as log]))
;;; ### Commentary
;; file-sync related local files/dirs:
@@ -143,6 +144,7 @@
(s/def ::event #{:created-local-version-file
:finished-local->remote
:finished-remote->local
:start
:pause
:resume
:exception-decrypt-failed
@@ -152,6 +154,10 @@
(s/def ::sync-event (s/keys :req-un [::event ::data]))
(defonce download-batch-size 100)
(defonce upload-batch-size 20)
(def ^:private current-sm-graph-uuid (atom nil))
;;; ### configs in config.edn
;; - :file-sync/ignore-files
@@ -178,13 +184,14 @@
(def graphs-txid (persist-var/persist-var nil "graphs-txid"))
(declare assert-local-txid<=remote-txid)
(defn update-graphs-txid!
(defn <update-graphs-txid!
[latest-txid graph-uuid user-uuid repo]
{:pre [(int? latest-txid) (>= latest-txid 0)]}
(persist-var/-reset-value! graphs-txid [user-uuid graph-uuid latest-txid] repo)
(p/let [_ (persist-var/persist-save graphs-txid)]
(state/pub-event! [:graph/refresh]))
(when (state/developer-mode?) (assert-local-txid<=remote-txid)))
(-> (p/let [_ (persist-var/-reset-value! graphs-txid [user-uuid graph-uuid latest-txid] repo)
_ (persist-var/persist-save graphs-txid)]
(state/pub-event! [:graph/refresh])
(when (state/developer-mode?) (assert-local-txid<=remote-txid)))
p->c))
(defn clear-graphs-txid! [repo]
(persist-var/-reset-value! graphs-txid nil repo)
@@ -592,10 +599,7 @@
FileTxn
(-checksum [this] (.-checksum this)))
(defn- sort-file-metatdata-fn
(defn- sort-file-metadata-fn
":recent-days-range > :favorite-pages > small-size pages > ...
:recent-days-range : [<min-inst-ms> <max-inst-ms>]
"
@@ -605,7 +609,8 @@
(let [favorite-pages* (set favorite-pages)]
(fn [^FileMetadata item]
(let [path (relative-path item)
journal? (string/starts-with? path "journals/")
journal? (string/starts-with? path
(str (config/get-journals-directory) "/"))
journal-day
(when journal?
(try
@@ -623,11 +628,18 @@
(second recent-days-range)))
journal-day
(string/includes? path "logseq/")
9999
(string/includes? path "content.")
10000
(contains? favorite-pages* path)
(count path)
:else
(- (.-size item)))))))
;;; ### APIs
;; `RSAPI` call apis through rsapi package, supports operations on files
@@ -752,10 +764,8 @@
(<update-local-files [this graph-uuid base-path filepaths]
(println "update-local-files" graph-uuid base-path filepaths)
(go
(let [token (<! (<get-token this))
r (<! (<retry-rsapi
#(p->c (ipc/ipc "update-local-files" graph-uuid base-path filepaths token))))]
r)))
(let [token (<! (<get-token this))]
(<! (p->c (ipc/ipc "update-local-files" graph-uuid base-path filepaths token))))))
(<download-version-files [this graph-uuid base-path filepaths]
(go
(let [token (<! (<get-token this))
@@ -847,13 +857,11 @@
(<update-local-files [this graph-uuid base-path filepaths]
(go
(let [token (<! (<get-token this))
r (<! (<retry-rsapi
#(p->c (.updateLocalFiles mobile-util/file-sync (clj->js {:graphUUID graph-uuid
:basePath base-path
:filePaths filepaths
:token token})))))]
r)))
(let [token (<! (<get-token this))]
(<! (p->c (.updateLocalFiles mobile-util/file-sync (clj->js {:graphUUID graph-uuid
:basePath base-path
:filePaths filepaths
:token token})))))))
(<download-version-files [this graph-uuid base-path filepaths]
(go
@@ -1099,6 +1107,7 @@
(map
#(hash-map :checksum (:checksum %)
:encrypted-path (remove-user-graph-uuid-prefix (:Key %))
:size (:Size %)
:last-modified (:LastModified %))
objs))
(when-not (empty? next-continuation-token)
@@ -1113,7 +1122,7 @@
(let [encrypted-path->path-map (zipmap encrypted-path-list* path-list-or-exp)]
(set
(mapv
#(->FileMetadata nil
#(->FileMetadata (:size %)
(:checksum %)
(get encrypted-path->path-map (:encrypted-path %))
(:encrypted-path %)
@@ -1444,7 +1453,7 @@
r)))))))
(defn apply-filetxns-partitions
"won't call update-graphs-txid! when *txid is nil"
"won't call <update-graphs-txid! when *txid is nil"
[*sync-state user-uuid graph-uuid base-path filetxns-partitions repo *txid *stopped *paused]
(assert (some? *sync-state))
@@ -1476,7 +1485,7 @@
;; update local-txid
(when *txid
(reset! *txid latest-txid)
(update-graphs-txid! latest-txid graph-uuid user-uuid repo))
(<! (<update-graphs-txid! latest-txid graph-uuid user-uuid repo)))
(recur (next filetxns-partitions*)))))))))
(defmulti need-sync-remote? (fn [v] (cond
@@ -1542,6 +1551,17 @@
:path path
:checksum checksum}))
ILookup
(-lookup [o k] (-lookup o k nil))
(-lookup [_ k not-found]
(case k
:type type
:dir dir
:path path
:stat stat
:checksum checksum
not-found))
IPrintWithWriter
(-pr-writer [_ w _opts]
(write-all w (str {:type type :base-path dir :path path :size (:size stat) :checksum checksum}))))
@@ -1947,7 +1967,9 @@
{:post [(s/valid? ::sync-state %)]}
{:current-syncing-graph-uuid nil
:state ::starting
:full-local->remote-files #{}
:current-local->remote-files #{}
:full-remote->local-files #{}
:current-remote->local-files #{}
:queued-local->remote-files #{}
:recent-remote->local-files #{}
@@ -2006,6 +2028,16 @@
{:post [(s/valid? ::sync-state %)]}
(update sync-state :recent-remote->local-files set/difference items))
(defn sync-state-reset-full-local->remote-files
[sync-state events]
{:post [(s/valid? ::sync-state %)]}
(assoc sync-state :full-local->remote-files events))
(defn sync-state-reset-full-remote->local-files
[sync-state events]
{:post [(s/valid? ::sync-state %)]}
(assoc sync-state :full-remote->local-files events))
(defn- add-history-items
[history paths now]
(sequence
@@ -2072,20 +2104,26 @@
(go
(let [partitioned-filetxns
(sequence (filepath+checksum-coll->partitioned-filetxns
10 graph-uuid user-uuid)
download-batch-size graph-uuid user-uuid)
relative-filepath+checksum-coll)
r
(if (empty? (flatten partitioned-filetxns))
{:succ true}
(<! (apply-filetxns-partitions
*sync-state user-uuid graph-uuid base-path partitioned-filetxns repo
nil *stopped *paused)))]
(do
(put-sync-event! {:event :start
:data {:type :full-remote->local
:graph-uuid graph-uuid
:full-sync? true
:epoch (tc/to-epoch (t/now))}})
(<! (apply-filetxns-partitions
*sync-state user-uuid graph-uuid base-path partitioned-filetxns repo
nil *stopped *paused))))]
(cond
(instance? ExceptionInfo r) {:unknown r}
@*stopped {:stop true}
@*paused {:pause true}
:else
(do (update-graphs-txid! latest-txid graph-uuid user-uuid repo)
(do (<! (<update-graphs-txid! latest-txid graph-uuid user-uuid repo))
(reset! *txid latest-txid)
{:succ true})))))
@@ -2103,12 +2141,17 @@
{:need-remote->local-full-sync true})
(when (pos-int? latest-txid)
(let [partitioned-filetxns (transduce (diffs->partitioned-filetxns 10)
(let [partitioned-filetxns (transduce (diffs->partitioned-filetxns download-batch-size)
(completing (fn [r i] (conj r (reverse i)))) ;reverse
'()
(reverse diff-txns))]
(put-sync-event! {:event :start
:data {:type :remote->local
:graph-uuid graph-uuid
:full-sync? false
:epoch (tc/to-epoch (t/now))}})
(if (empty? (flatten partitioned-filetxns))
(do (update-graphs-txid! latest-txid graph-uuid user-uuid repo)
(do (<! (<update-graphs-txid! latest-txid graph-uuid user-uuid repo))
(reset! *txid latest-txid)
{:succ true})
(<! (apply-filetxns-partitions
@@ -2140,9 +2183,10 @@
recent-10-days-range ((juxt #(tc/to-long (t/minus % (t/days 10))) #(tc/to-long %)) (t/today))
sorted-diff-remote-files
(sort-by
(sort-file-metatdata-fn :recent-days-range recent-10-days-range) > diff-remote-files)
(sort-file-metadata-fn :recent-days-range recent-10-days-range) > diff-remote-files)
latest-txid (:TXId (<! (<get-remote-graph remoteapi nil graph-uuid)))]
(println "[full-sync(remote->local)]" (count sorted-diff-remote-files) "files need to sync")
(swap! *sync-state #(sync-state-reset-full-remote->local-files % sorted-diff-remote-files))
(<! (.sync-files-remote->local!
this (map (juxt relative-path -checksum)
sorted-diff-remote-files)
@@ -2240,188 +2284,193 @@
^:mutable rate *txid ^:mutable remote->local-syncer stop-chan *stopped *paused
;; control chans
private-immediately-local->remote-chan private-recent-edited-chan]
Object
(filter-file-change-events-fn [_]
(fn [^FileChangeEvent e]
(go (and (instance? FileChangeEvent e)
(if-let [mtime (:mtime (.-stat e))]
;; if mtime is not nil, it should be after (- now 1min)
;; ignore events too early
(> (* 1000 mtime) (tc/to-long (t/minus (t/now) (t/minutes 1))))
true)
(or (string/starts-with? (.-dir e) base-path)
(string/starts-with? (str "file://" (.-dir e)) base-path)) ; valid path prefix
(not (ignored? e)) ;not ignored
;; download files will also trigger file-change-events, ignore them
(let [r (not (contains? (:recent-remote->local-files @*sync-state)
(<! (<file-change-event=>recent-remote->local-file-item
graph-uuid e))))]
(when (and (true? r)
(seq (:recent-remote->local-files @*sync-state)))
(println :debug (:recent-remote->local-files @*sync-state) e))
r)))))
Object
(filter-file-change-events-fn [_]
(fn [^FileChangeEvent e]
(go (and (instance? FileChangeEvent e)
(if-let [mtime (:mtime (.-stat e))]
;; if mtime is not nil, it should be after (- now 1min)
;; ignore events too early
(> (* 1000 mtime) (tc/to-long (t/minus (t/now) (t/minutes 1))))
true)
(or (string/starts-with? (.-dir e) base-path)
(string/starts-with? (str "file://" (.-dir e)) base-path)) ; valid path prefix
(not (ignored? e)) ;not ignored
;; download files will also trigger file-change-events, ignore them
(not (contains? (:recent-remote->local-files @*sync-state)
(<! (<file-change-event=>recent-remote->local-file-item
graph-uuid e))))))))
(set-remote->local-syncer! [_ s] (set! remote->local-syncer s))
(set-remote->local-syncer! [_ s] (set! remote->local-syncer s))
ILocal->RemoteSync
(setup-local->remote! [_]
(async/tap immediately-local->remote-mult private-immediately-local->remote-chan)
(async/tap recent-edited-mult private-recent-edited-chan))
ILocal->RemoteSync
(setup-local->remote! [_]
(async/tap immediately-local->remote-mult private-immediately-local->remote-chan)
(async/tap recent-edited-mult private-recent-edited-chan))
(stop-local->remote! [_]
(async/untap immediately-local->remote-mult private-immediately-local->remote-chan)
(async/untap recent-edited-mult private-recent-edited-chan)
(async/close! stop-chan)
(vreset! *stopped true))
(stop-local->remote! [_]
(async/untap immediately-local->remote-mult private-immediately-local->remote-chan)
(async/untap recent-edited-mult private-recent-edited-chan)
(async/close! stop-chan)
(vreset! *stopped true))
(<ratelimit [this from-chan]
(let [<fast-filter-e-fn (.filter-file-change-events-fn this)]
(util/<ratelimit
from-chan rate
:filter-fn
(fn [e]
(go
(and (rsapi-ready? rsapi graph-uuid)
(<! (<fast-filter-e-fn e))
(do
(swap! *sync-state sync-state--add-queued-local->remote-files e)
(let [v (<! (<filter-local-changes-pred e base-path graph-uuid))]
(when-not v
(swap! *sync-state sync-state--remove-queued-local->remote-files e))
v)))))
:flush-fn #(swap! *sync-state sync-state-reset-queued-local->remote-files)
:stop-ch stop-chan
:distinct-coll? true
:flush-now-ch private-immediately-local->remote-chan
:refresh-timeout-ch private-recent-edited-chan)))
(<sync-local->remote! [_ es]
(if (empty? es)
(go {:succ true})
(let [type (.-type ^FileChangeEvent (first es))
es->paths-xf (comp
(map #(relative-path %))
(remove ignored?))]
(go
(let [es* (<! (<filter-checksum-not-consistent graph-uuid es))
_ (when (not= (count es*) (count es))
(println :debug :filter-checksum-changed
(mapv relative-path (set/difference (set es) (set es*)))))
es** (filter-too-huge-files es*)
_ (when (not= (count es**) (count es*))
(println :debug :filter-too-huge-files
(mapv relative-path (set/difference (set es*) (set es**)))))
paths (sequence es->paths-xf es**)
_ (println :sync-local->remote type paths)
r (if (empty? paths)
(go @*txid)
(case type
("add" "change")
(<with-pause (<update-remote-files rsapi graph-uuid base-path paths @*txid) *paused)
"unlink"
(<with-pause (<delete-remote-files rsapi graph-uuid base-path paths @*txid) *paused)))
_ (swap! *sync-state sync-state--add-current-local->remote-files paths)
r* (<! r)
[succ? paused?] ((juxt number? :pause) r*)
_ (swap! *sync-state sync-state--remove-current-local->remote-files paths succ?)]
(cond
(need-sync-remote? r*)
(do (println :need-sync-remote r*)
{:need-sync-remote true})
(need-reset-local-txid? r*) ;; TODO: this cond shouldn't be true,
;; but some potential bugs cause local-txid > remote-txid
(let [remote-txid (:TXId (<! (<get-remote-graph remoteapi nil graph-uuid)))]
(update-graphs-txid! remote-txid graph-uuid user-uuid repo)
(reset! *txid remote-txid)
{:succ true})
(graph-has-been-deleted? r*)
(do (println :graph-has-been-deleted r*)
{:graph-has-been-deleted true})
paused?
{:pause true}
succ? ; succ
(<ratelimit [this from-chan]
(let [<fast-filter-e-fn (.filter-file-change-events-fn this)]
(util/<ratelimit
from-chan rate
:filter-fn
(fn [e]
(go
(and (rsapi-ready? rsapi graph-uuid)
(<! (<fast-filter-e-fn e))
(do
(println "sync-local->remote! update txid" r*)
;; persist txid
(update-graphs-txid! r* graph-uuid user-uuid repo)
(reset! *txid r*)
{:succ true})
(swap! *sync-state sync-state--add-queued-local->remote-files e)
(let [v (<! (<filter-local-changes-pred e base-path graph-uuid))]
(when-not v
(swap! *sync-state sync-state--remove-queued-local->remote-files e))
v)))))
:flush-fn #(swap! *sync-state sync-state-reset-queued-local->remote-files)
:stop-ch stop-chan
:distinct-coll? true
:flush-now-ch private-immediately-local->remote-chan
:refresh-timeout-ch private-recent-edited-chan)))
:else
(do
(println "sync-local->remote unknown:" r*)
{:unknown r*})))))))
(<sync-local->remote! [_ es]
(if (empty? es)
(go {:succ true})
(let [type (.-type ^FileChangeEvent (first es))
es->paths-xf (comp
(map #(relative-path %))
(remove ignored?))]
(go
(let [es* (<! (<filter-checksum-not-consistent graph-uuid es))
_ (when (not= (count es*) (count es))
(println :debug :filter-checksum-changed
(mapv relative-path (set/difference (set es) (set es*)))))
es** (filter-too-huge-files es*)
_ (when (not= (count es**) (count es*))
(println :debug :filter-too-huge-files
(mapv relative-path (set/difference (set es*) (set es**)))))
paths (sequence es->paths-xf es**)
_ (println :sync-local->remote type paths)
r (if (empty? paths)
(go @*txid)
(case type
("add" "change")
(<with-pause (<update-remote-files rsapi graph-uuid base-path paths @*txid) *paused)
(<sync-local->remote-all-files! [this]
(go
(let [remote-all-files-meta-c (<get-remote-all-files-meta remoteapi graph-uuid)
local-all-files-meta-c (<get-local-all-files-meta rsapi graph-uuid base-path)
deletion-logs-c (<get-deletion-logs remoteapi graph-uuid @*txid)
remote-all-files-meta-or-exp (<! remote-all-files-meta-c)
deletion-logs (<! deletion-logs-c)]
(if (or (storage-exceed-limit? remote-all-files-meta-or-exp)
(sync-stop-when-api-flying? remote-all-files-meta-or-exp)
(decrypt-exp? remote-all-files-meta-or-exp))
(do (put-sync-event! {:event :exception-decrypt-failed
:data {:graph-uuid graph-uuid
:exp remote-all-files-meta-or-exp
:epoch (tc/to-epoch (t/now))}})
{:stop true})
(let [remote-all-files-meta remote-all-files-meta-or-exp
local-all-files-meta (<! local-all-files-meta-c)
{local-all-files-meta :keep delete-local-files :delete}
(filter-local-files-in-deletion-logs local-all-files-meta deletion-logs)
diff-local-files (diff-file-metadata-sets local-all-files-meta remote-all-files-meta)
change-events
(sequence
(comp
;; convert to FileChangeEvent
(map #(->FileChangeEvent "change" base-path (.get-normalized-path ^FileMetadata %)
{:size (:size %)} (:etag %)))
(remove ignored?))
diff-local-files)
change-events-partitions
(sequence
;; partition FileChangeEvents
(partition-file-change-events 10)
(distinct-file-change-events change-events))]
(println "[full-sync(local->remote)]"
(count (flatten change-events-partitions)) "files need to sync and"
(count delete-local-files) "local files need to delete")
;; 1. delete local files
(loop [[f & fs] delete-local-files]
(when f
(let [relative-p (relative-path f)]
(when-not (<! (<local-file-not-exist? graph-uuid rsapi base-path relative-p))
(let [fake-recent-remote->local-file-item {:remote->local-type :delete
:checksum nil
:path relative-p}]
(swap! *sync-state sync-state--add-recent-remote->local-files
[fake-recent-remote->local-file-item])
(<! (<delete-local-files rsapi graph-uuid base-path [(relative-path f)]))
(go (<! (timeout 5000))
(swap! *sync-state sync-state--remove-recent-remote->local-files
[fake-recent-remote->local-file-item])))))
(recur fs)))
"unlink"
(<with-pause (<delete-remote-files rsapi graph-uuid base-path paths @*txid) *paused)))
_ (swap! *sync-state sync-state--add-current-local->remote-files paths)
r* (<! r)
[succ? paused?] ((juxt number? :pause) r*)
_ (swap! *sync-state sync-state--remove-current-local->remote-files paths succ?)]
(cond
(need-sync-remote? r*)
(do (println :need-sync-remote r*)
{:need-sync-remote true})
;; 2. upload local files
(loop [es-partitions change-events-partitions]
(if @*stopped
{:stop true}
(if (empty? es-partitions)
{:succ true}
(let [{:keys [succ need-sync-remote graph-has-been-deleted unknown] :as r}
(<! (<sync-local->remote! this (first es-partitions)))]
(s/assert ::sync-local->remote!-result r)
(cond
succ
(recur (next es-partitions))
(or need-sync-remote graph-has-been-deleted unknown) r)))))))))))
(need-reset-local-txid? r*) ;; TODO: this cond shouldn't be true,
;; but some potential bugs cause local-txid > remote-txid
(let [remote-txid (:TXId (<! (<get-remote-graph remoteapi nil graph-uuid)))]
(<! (<update-graphs-txid! remote-txid graph-uuid user-uuid repo))
(reset! *txid remote-txid)
{:succ true})
(graph-has-been-deleted? r*)
(do (println :graph-has-been-deleted r*)
{:graph-has-been-deleted true})
paused?
{:pause true}
succ? ; succ
(do
(println "sync-local->remote! update txid" r*)
;; persist txid
(<! (<update-graphs-txid! r* graph-uuid user-uuid repo))
(reset! *txid r*)
{:succ true})
:else
(do
(println "sync-local->remote unknown:" r*)
{:unknown r*})))))))
(<sync-local->remote-all-files! [this]
(go
(let [remote-all-files-meta-c (<get-remote-all-files-meta remoteapi graph-uuid)
local-all-files-meta-c (<get-local-all-files-meta rsapi graph-uuid base-path)
deletion-logs-c (<get-deletion-logs remoteapi graph-uuid @*txid)
remote-all-files-meta-or-exp (<! remote-all-files-meta-c)
deletion-logs (<! deletion-logs-c)]
(if (or (storage-exceed-limit? remote-all-files-meta-or-exp)
(sync-stop-when-api-flying? remote-all-files-meta-or-exp)
(decrypt-exp? remote-all-files-meta-or-exp))
(do (put-sync-event! {:event :exception-decrypt-failed
:data {:graph-uuid graph-uuid
:exp remote-all-files-meta-or-exp
:epoch (tc/to-epoch (t/now))}})
{:stop true})
(let [remote-all-files-meta remote-all-files-meta-or-exp
local-all-files-meta (<! local-all-files-meta-c)
{local-all-files-meta :keep delete-local-files :delete}
(filter-local-files-in-deletion-logs local-all-files-meta deletion-logs)
recent-10-days-range ((juxt #(tc/to-long (t/minus % (t/days 10))) #(tc/to-long %)) (t/today))
diff-local-files (->> (diff-file-metadata-sets local-all-files-meta remote-all-files-meta)
(sort-by (sort-file-metadata-fn :recent-days-range recent-10-days-range) >))
change-events
(sequence
(comp
;; convert to FileChangeEvent
(map #(->FileChangeEvent "change" base-path (.get-normalized-path ^FileMetadata %)
{:size (:size %)} (:etag %)))
(remove ignored?))
diff-local-files)
distinct-change-events (distinct-file-change-events change-events)
_ (swap! *sync-state #(sync-state-reset-full-local->remote-files % distinct-change-events))
change-events-partitions
(sequence
;; partition FileChangeEvents
(partition-file-change-events upload-batch-size)
distinct-change-events)]
(println "[full-sync(local->remote)]"
(count (flatten change-events-partitions)) "files need to sync and"
(count delete-local-files) "local files need to delete")
(put-sync-event! {:event :start
:data {:type :full-local->remote
:graph-uuid graph-uuid
:full-sync? true
:epoch (tc/to-epoch (t/now))}})
;; 1. delete local files
(loop [[f & fs] delete-local-files]
(when f
(let [relative-p (relative-path f)]
(when-not (<! (<local-file-not-exist? graph-uuid rsapi base-path relative-p))
(let [fake-recent-remote->local-file-item {:remote->local-type :delete
:checksum nil
:path relative-p}]
(swap! *sync-state sync-state--add-recent-remote->local-files
[fake-recent-remote->local-file-item])
(<! (<delete-local-files rsapi graph-uuid base-path [(relative-path f)]))
(go (<! (timeout 5000))
(swap! *sync-state sync-state--remove-recent-remote->local-files
[fake-recent-remote->local-file-item])))))
(recur fs)))
;; 2. upload local files
(loop [es-partitions change-events-partitions]
(if @*stopped
{:stop true}
(if (empty? es-partitions)
{:succ true}
(let [{:keys [succ need-sync-remote graph-has-been-deleted unknown] :as r}
(<! (<sync-local->remote! this (first es-partitions)))]
(s/assert ::sync-local->remote!-result r)
(cond
succ
(recur (next es-partitions))
(or need-sync-remote graph-has-been-deleted unknown) r)))))))))))
;;; ### put all stuff together
@@ -2454,7 +2503,7 @@
::local->remote-full-sync
(<! (.full-sync this))
::remote->local-full-sync
(<! (.remote->local-full-sync this nil))
(<! (.remote->local-full-sync this args))
::pause
(<! (.pause this))
::stop
@@ -2541,8 +2590,8 @@
(pause [this]
(put-sync-event! {:event :pause
:data {:graph-uuid graph-uuid
:epoch (tc/to-epoch (t/now))}})
:data {:graph-uuid graph-uuid
:epoch (tc/to-epoch (t/now))}})
(go-loop []
(let [{:keys [resume]} (<! ops-chan)]
(if resume
@@ -2563,9 +2612,9 @@
;; if resume-state = nil, try a remote->local to sync recent diffs
(offer! private-remote->local-sync-chan true))
(put-sync-event! {:event :resume
:data {:graph-uuid graph-uuid
:resume-state resume-state
:epoch (tc/to-epoch (t/now))}})
:data {:graph-uuid graph-uuid
:resume-state resume-state
:epoch (tc/to-epoch (t/now))}})
(<! (.schedule this ::idle nil :resume)))
(recur)))))
@@ -2615,11 +2664,11 @@
unknown
(do
(put-sync-event! {:event :local->remote-full-sync-failed
:data {:graph-uuid graph-uuid
:epoch (tc/to-epoch (t/now))}})
:data {:graph-uuid graph-uuid
:epoch (tc/to-epoch (t/now))}})
(.schedule this ::idle nil nil))))))
(remote->local-full-sync [this _next-state]
(remote->local-full-sync [this _]
(go
(let [{:keys [succ unknown stop pause]}
(<! (<sync-remote->local-all-files! remote->local-syncer))]
@@ -2638,9 +2687,11 @@
unknown
(do
(put-sync-event! {:event :remote->local-full-sync-failed
:data {:graph-uuid graph-uuid
:epoch (tc/to-epoch (t/now))}})
(.schedule this ::idle nil nil))))))
:data {:graph-uuid graph-uuid
:exp unknown
:epoch (tc/to-epoch (t/now))}})
;; if any exception occurred, re-exec remote->local-full-sync
(.schedule this ::remote->local-full-sync nil nil))))))
(remote->local [this _next-state {remote-val :remote}]
(go
@@ -2678,8 +2729,14 @@
(assert (some? local-changes) local-changes)
(go
(let [distincted-local-changes (distinct-file-change-events local-changes)
_ (swap! *sync-state #(sync-state-reset-full-local->remote-files % distincted-local-changes))
change-events-partitions
(sequence (partition-file-change-events 10) distincted-local-changes)
(sequence (partition-file-change-events upload-batch-size) distincted-local-changes)
_ (put-sync-event! {:event :start
:data {:type :local->remote
:graph-uuid graph-uuid
:full-sync? false
:epoch (tc/to-epoch (t/now))}})
{:keys [succ need-sync-remote graph-has-been-deleted unknown stop pause]}
(loop [es-partitions change-events-partitions]
(cond
@@ -2741,9 +2798,11 @@
(debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path])
(swap! *sync-state sync-state--update-state ::stop)
(loop []
(when (not= ::stop state)
(<! (timeout 100))
(recur))))))
(if (not= ::stop state)
(do
(<! (timeout 100))
(recur))
(reset! current-sm-graph-uuid nil))))))
IStopped?
(-stopped? [_]
@@ -2769,21 +2828,25 @@
(->SyncManager graph-uuid base-path *sync-state local->remote-syncer remote->local-syncer remoteapi-with-stop
nil *txid nil nil nil *stopped? *paused? nil (chan 1) (chan 1) (chan 1) (chan 1) (chan 1))))
(def ^:private current-sm-graph-uuid (atom nil))
(defn sync-manager-singleton
[user-uuid graph-uuid base-path repo txid *sync-state]
(when-not @current-sm-graph-uuid
(reset! current-sm-graph-uuid graph-uuid)
(sync-manager user-uuid graph-uuid base-path repo txid *sync-state)))
(defn clear-graph-progress!
[graph-uuid]
(state/set-state! [:file-sync/progress graph-uuid] {}))
(defn <sync-stop []
(go
(when-let [sm ^SyncManager (state/get-file-sync-manager)]
(println "[SyncManager" (:graph-uuid sm) "]" "stopping")
(<! (-stop! sm))
(println "[SyncManager" (:graph-uuid sm) "]" "stopped")
(state/set-file-sync-manager nil))
(state/set-file-sync-manager nil)
(clear-graph-progress! (:graph-uuid sm)))
(reset! current-sm-graph-uuid nil)))
(defn sync-need-password!
@@ -2840,11 +2903,14 @@
(declare network-online-cursor)
;; Prevent starting of multiple sync managers
(def *sync-starting? (atom {}))
(defn sync-start []
(let [*sync-state (atom (sync-state))
current-user-uuid (user/user-uuid)
repo (state/get-current-repo)]
(go
(<! (<sync-stop))
(when (and (graph-sync-off? repo) @network-online-cursor)
(<! (p->c (persist-var/-load graphs-txid)))
(let [[user-uuid graph-uuid txid] @graphs-txid]
@@ -2852,25 +2918,35 @@
(user/logged-in?)
repo
(not (config/demo-graph? repo)))
(when-some [sm (sync-manager-singleton current-user-uuid graph-uuid
(config/get-repo-dir repo) repo
txid *sync-state)]
(when (check-graph-belong-to-current-user current-user-uuid user-uuid)
(if-not (<! (<check-remote-graph-exists graph-uuid)) ; remote graph has been deleted
(clear-graphs-txid! repo)
(do
(state/set-file-sync-state repo @*sync-state)
(state/set-file-sync-manager sm)
(try
(when-not (get @*sync-starting? graph-uuid)
(swap! *sync-starting? assoc graph-uuid true)
(clear-graph-progress! graph-uuid)
;; update global state when *sync-state changes
(add-watch *sync-state ::update-global-state
(fn [_ _ _ n]
(state/set-file-sync-state repo n)))
(when-some [sm (sync-manager-singleton current-user-uuid graph-uuid
(config/get-repo-dir repo) repo
txid *sync-state)]
(when (check-graph-belong-to-current-user current-user-uuid user-uuid)
(if-not (<! (<check-remote-graph-exists graph-uuid)) ; remote graph has been deleted
(clear-graphs-txid! repo)
(do
(state/set-file-sync-state repo @*sync-state)
(state/set-file-sync-manager sm)
(.start sm)
;; update global state when *sync-state changes
(add-watch *sync-state ::update-global-state
(fn [_ _ _ n]
(state/set-file-sync-state repo n)))
(offer! remote->local-full-sync-chan true)
(offer! full-sync-chan true)))))))))))
(.start sm)
(offer! remote->local-full-sync-chan true)
(offer! full-sync-chan true)
(swap! *sync-starting? assoc graph-uuid false))))))
(catch :default e
(prn "Sync start error: ")
(log/error :exception e)
(swap! *sync-starting? assoc graph-uuid false)))))))))
;;; ### some add-watches
@@ -2927,6 +3003,15 @@
(<get-local-all-files-meta rsapi graph-uuid
(config/get-repo-dir (state/get-current-repo)))
(def base-path (config/get-repo-dir (state/get-current-repo)))
;; upload
(def full-upload-files (:full-local->remote-files (state/sub [:file-sync/sync-state (state/get-current-repo)])))
;; queued
(:queued-local->remote-files (state/sub [:file-sync/sync-state (state/get-current-repo)]))
;; download
(:current-remote->local-files (state/sub [:file-sync/sync-state (state/get-current-repo)]))
)