Enhance(sync): serveral enhancements in sync (#10134)

* enhance(sync): 1. dont clear-tokens when refresh-token failed

2. log invalid json when fail to parse api json body,
   usually, it's returned by firewall (e.g. a XML)
3. handle exceptions carefully in frontend.fs.sync

* enhance(sync): update avoid reentrancy related code

* rename exp to guard-ex
This commit is contained in:
rcmerci
2023-09-04 17:27:51 +08:00
committed by GitHub
parent 98bff93439
commit 2d34af9f6a
3 changed files with 287 additions and 253 deletions

View File

@@ -193,6 +193,11 @@
;;; ### configs ends
(defn- guard-ex
[x]
(when (instance? ExceptionInfo x) x))
(def ws-addr config/WS-URL)
;; Warning: make sure to `persist-var/-load` graphs-txid before using it.
@@ -264,7 +269,10 @@
(defn- get-json-body [body]
(or (and (not (string? body)) body)
(or (string/blank? body) nil)
(js->clj (js/JSON.parse body) :keywordize-keys true)))
(try (js->clj (js/JSON.parse body) :keywordize-keys true)
(catch :default e
(prn :invalid-json body)
e))))
(defn- get-resp-json-body [resp]
(-> resp (:body) (get-json-body)))
@@ -841,9 +849,8 @@
(<get-local-all-files-meta [this graph-uuid base-path]
(go
(let [r (<! (<retry-rsapi #(p->c (ipc/ipc "get-local-all-files-meta" graph-uuid base-path))))]
(if (instance? ExceptionInfo r)
r
(<! (<build-local-file-metadatas this graph-uuid r))))))
(or (guard-ex r)
(<! (<build-local-file-metadatas this graph-uuid r))))))
(<get-local-files-meta [this graph-uuid base-path filepaths]
(go
(let [r (<! (<retry-rsapi #(p->c (ipc/ipc "get-local-files-meta" graph-uuid base-path filepaths))))]
@@ -857,20 +864,21 @@
(println "update-local-files" graph-uuid base-path filepaths)
(go
(<! (<rsapi-cancel-all-requests))
(let [token (<! (<get-token this))]
(<! (p->c (ipc/ipc "update-local-files" graph-uuid base-path filepaths token))))))
(let [token-or-exp (<! (<get-token this))]
(or (guard-ex token-or-exp)
(<! (p->c (ipc/ipc "update-local-files" graph-uuid base-path filepaths token-or-exp)))))))
(<fetch-remote-files [this graph-uuid base-path filepaths]
(go
(<! (<rsapi-cancel-all-requests))
(let [token (<! (<get-token this))]
(<! (p->c (ipc/ipc "fetch-remote-files" graph-uuid base-path filepaths token))))))
(let [token-or-exp (<! (<get-token this))]
(or (guard-ex token-or-exp)
(<! (p->c (ipc/ipc "fetch-remote-files" graph-uuid base-path filepaths token-or-exp)))))))
(<download-version-files [this graph-uuid base-path filepaths]
(go
(let [token (<! (<get-token this))
r (<! (<retry-rsapi
#(p->c (ipc/ipc "download-version-files" graph-uuid base-path filepaths token))))]
r)))
(let [token-or-exp (<! (<get-token this))]
(or (guard-ex token-or-exp)
(<! (<retry-rsapi #(p->c (ipc/ipc "download-version-files" graph-uuid base-path filepaths token-or-exp))))))))
(<delete-local-files [_ graph-uuid base-path filepaths]
(let [normalized-filepaths (mapv path-normalize filepaths)]
@@ -883,17 +891,21 @@
(let [normalized-filepaths (mapv path-normalize filepaths)]
(go
(<! (<rsapi-cancel-all-requests))
(let [token (<! (<get-token this))]
(<! (<retry-rsapi
#(p->c (ipc/ipc "update-remote-files" graph-uuid base-path normalized-filepaths local-txid token))))))))
(let [token-or-exp (<! (<get-token this))]
(or (guard-ex token-or-exp)
(<! (<retry-rsapi
#(p->c (ipc/ipc "update-remote-files" graph-uuid base-path normalized-filepaths local-txid token-or-exp)))))))))
(<delete-remote-files [this graph-uuid base-path filepaths local-txid]
(let [normalized-filepaths (mapv path-normalize filepaths)]
(go
(let [token (<! (<get-token this))]
(<!
(<retry-rsapi
#(p->c (ipc/ipc "delete-remote-files" graph-uuid base-path normalized-filepaths local-txid token))))))))
(let [token-or-exp (<! (<get-token this))]
(or (guard-ex token-or-exp)
(<!
(<retry-rsapi
#(p->c
(ipc/ipc "delete-remote-files" graph-uuid base-path normalized-filepaths local-txid token-or-exp)))))))))
(<encrypt-fnames [_ graph-uuid fnames] (go (js->clj (<! (p->c (ipc/ipc "encrypt-fnames" graph-uuid fnames))))))
(<decrypt-fnames [_ graph-uuid fnames] (go
(let [r (<! (p->c (ipc/ipc "decrypt-fnames" graph-uuid fnames)))]
@@ -931,9 +943,8 @@
(go
(let [r (<! (p->c (.getLocalAllFilesMeta mobile-util/file-sync (clj->js {:graphUUID graph-uuid
:basePath base-path}))))]
(if (instance? ExceptionInfo r)
r
(<! (<build-local-file-metadatas this graph-uuid (.-result r)))))))
(or (guard-ex r)
(<! (<build-local-file-metadatas this graph-uuid (.-result r)))))))
(<get-local-files-meta [this graph-uuid base-path filepaths]
(go
@@ -953,32 +964,35 @@
(<update-local-files [this graph-uuid base-path filepaths]
(go
(let [token (<! (<get-token this))
(let [token-or-exp (<! (<get-token this))
filepaths' (map path-normalize filepaths)]
(<! (p->c (.updateLocalFiles mobile-util/file-sync (clj->js {:graphUUID graph-uuid
:basePath base-path
:filePaths filepaths'
:token token})))))))
(or (guard-ex token-or-exp)
(<! (p->c (.updateLocalFiles mobile-util/file-sync (clj->js {:graphUUID graph-uuid
:basePath base-path
:filePaths filepaths'
:token token-or-exp}))))))))
(<fetch-remote-files [this graph-uuid base-path filepaths]
(go
(let [token (<! (<get-token this))
r (<! (<retry-rsapi
(let [token-or-exp (<! (<get-token this))]
(or (guard-ex token-or-exp)
(js->clj
(.-value
(<! (<retry-rsapi
#(p->c (.fetchRemoteFiles mobile-util/file-sync
(clj->js {:graphUUID graph-uuid
:basePath base-path
:filePaths filepaths
:token token})))))]
(js->clj (.-value r)))))
:token token-or-exp})))))))))))
(<download-version-files [this graph-uuid base-path filepaths]
(go
(let [token (<! (<get-token this))
r (<! (<retry-rsapi
#(p->c (.updateLocalVersionFiles mobile-util/file-sync
(clj->js {:graphUUID graph-uuid
:basePath base-path
:filePaths filepaths
:token token})))))]
r)))
(let [token-or-exp (<! (<get-token this))]
(or (guard-ex token-or-exp)
(<! (<retry-rsapi
#(p->c (.updateLocalVersionFiles mobile-util/file-sync
(clj->js {:graphUUID graph-uuid
:basePath base-path
:filePaths filepaths
:token token-or-exp})))))))))
(<delete-local-files [_ graph-uuid base-path filepaths]
(let [normalized-filepaths (mapv path-normalize filepaths)]
@@ -992,40 +1006,39 @@
(<update-remote-files [this graph-uuid base-path filepaths local-txid]
(let [normalized-filepaths (mapv path-normalize filepaths)]
(go
(let [token (<! (<get-token this))
r (<! (p->c (.updateRemoteFiles mobile-util/file-sync
(clj->js {:graphUUID graph-uuid
:basePath base-path
:filePaths normalized-filepaths
:txid local-txid
:token token
:fnameEncryption true}))))]
(if (instance? ExceptionInfo r)
r
(get (js->clj r) "txid"))))))
(let [token-or-exp (<! (<get-token this))
r (or (guard-ex token-or-exp)
(<! (p->c (.updateRemoteFiles mobile-util/file-sync
(clj->js {:graphUUID graph-uuid
:basePath base-path
:filePaths normalized-filepaths
:txid local-txid
:token token-or-exp
:fnameEncryption true})))))]
(or (guard-ex r)
(get (js->clj r) "txid"))))))
(<delete-remote-files [this graph-uuid base-path filepaths local-txid]
(let [normalized-filepaths (mapv path-normalize filepaths)]
(go
(let [token (<! (<get-token this))
r (<! (p->c (.deleteRemoteFiles mobile-util/file-sync
(clj->js {:graphUUID graph-uuid
:basePath base-path
:filePaths normalized-filepaths
:txid local-txid
:token token}))))]
(if (instance? ExceptionInfo r)
r
(get (js->clj r) "txid"))))))
(let [token-or-exp (<! (<get-token this))
r (or (guard-ex token-or-exp)
(<! (p->c (.deleteRemoteFiles mobile-util/file-sync
(clj->js {:graphUUID graph-uuid
:basePath base-path
:filePaths normalized-filepaths
:txid local-txid
:token token-or-exp})))))]
(or (guard-ex r)
(get (js->clj r) "txid"))))))
(<encrypt-fnames [_ graph-uuid fnames]
(go
(let [r (<! (p->c (.encryptFnames mobile-util/file-sync
(clj->js {:graphUUID graph-uuid
:filePaths fnames}))))]
(if (instance? ExceptionInfo r)
(.-cause r)
(get (js->clj r) "value")))))
(or (guard-ex r)
(get (js->clj r) "value")))))
(<decrypt-fnames [_ graph-uuid fnames]
(go (let [r (<! (p->c (.decryptFnames mobile-util/file-sync
(clj->js {:graphUUID graph-uuid
@@ -1147,19 +1160,21 @@
(<request [this api-name body]
(go
(let [resp (<! (<request api-name body (<! (<get-token this)) *stopped?))]
(if (http/unexceptional-status? (:status resp))
(get-resp-json-body resp)
(let [exp (ex-info "request failed"
{:err resp
:body (:body resp)
:api-name api-name
:request-body body})]
(fire-file-sync-storage-exceed-limit-event! exp)
(fire-file-sync-graph-count-exceed-limit-event! exp)
exp)))))
(let [token-or-exp (<! (<get-token this))]
(or (guard-ex token-or-exp)
(let [resp (<! (<request api-name body token-or-exp *stopped?))]
(if (http/unexceptional-status? (:status resp))
(get-resp-json-body resp)
(let [exp (ex-info "request failed"
{:err resp
:body (:body resp)
:api-name api-name
:request-body body})]
(fire-file-sync-storage-exceed-limit-event! exp)
(fire-file-sync-graph-count-exceed-limit-event! exp)
exp)))))))
;; for test
;; for test
(update-files [this graph-uuid txid files]
{:pre [(map? files)
(number? txid)]}
@@ -1232,68 +1247,63 @@
{}
(remove (comp nil? second)
{:GraphUUID graph-uuid :ContinuationToken continuation-token}))))]
(if (instance? ExceptionInfo r)
r
(let [next-continuation-token (:NextContinuationToken r)
objs (:Objects r)]
(apply conj! encrypted-path-list (map (comp remove-user-graph-uuid-prefix :Key) objs))
(apply conj! file-meta-list
(map
#(hash-map :checksum (:checksum %)
:encrypted-path (remove-user-graph-uuid-prefix (:Key %))
:size (:Size %)
:last-modified (:LastModified %)
:txid (:Txid %))
objs))
(when-not (empty? next-continuation-token)
(recur next-continuation-token)))))))]
(if (instance? ExceptionInfo exp-r)
exp-r
(let [file-meta-list* (persistent! file-meta-list)
encrypted-path-list* (persistent! encrypted-path-list)
path-list-or-exp (<! (<decrypt-fnames rsapi graph-uuid encrypted-path-list*))]
(if (instance? ExceptionInfo path-list-or-exp)
path-list-or-exp
(let [encrypted-path->path-map (zipmap encrypted-path-list* path-list-or-exp)]
(set
(mapv
#(->FileMetadata (:size %)
(:checksum %)
(get encrypted-path->path-map (:encrypted-path %))
(:encrypted-path %)
(:last-modified %)
true
(:txid %)
nil)
(-> file-meta-list*
(filter-files-with-unnormalized-path encrypted-path->path-map)
(filter-case-different-same-files encrypted-path->path-map)))))))))))
(or (guard-ex r)
(let [next-continuation-token (:NextContinuationToken r)
objs (:Objects r)]
(apply conj! encrypted-path-list (map (comp remove-user-graph-uuid-prefix :Key) objs))
(apply conj! file-meta-list
(map
#(hash-map :checksum (:checksum %)
:encrypted-path (remove-user-graph-uuid-prefix (:Key %))
:size (:Size %)
:last-modified (:LastModified %)
:txid (:Txid %))
objs))
(when-not (empty? next-continuation-token)
(recur next-continuation-token)))))))]
(or (guard-ex exp-r)
(let [file-meta-list* (persistent! file-meta-list)
encrypted-path-list* (persistent! encrypted-path-list)
path-list-or-exp (<! (<decrypt-fnames rsapi graph-uuid encrypted-path-list*))]
(or (guard-ex path-list-or-exp)
(let [encrypted-path->path-map (zipmap encrypted-path-list* path-list-or-exp)]
(set
(mapv
#(->FileMetadata (:size %)
(:checksum %)
(get encrypted-path->path-map (:encrypted-path %))
(:encrypted-path %)
(:last-modified %)
true
(:txid %)
nil)
(-> file-meta-list*
(filter-files-with-unnormalized-path encrypted-path->path-map)
(filter-case-different-same-files encrypted-path->path-map)))))))))))
(<get-remote-files-meta [this graph-uuid filepaths]
{:pre [(coll? filepaths)]}
(user/<wrap-ensure-id&access-token
(let [encrypted-paths* (<! (<encrypt-fnames rsapi graph-uuid filepaths))
r (<! (.<request this "get_files_meta" {:GraphUUID graph-uuid :Files encrypted-paths*}))]
(if (instance? ExceptionInfo r)
r
(let [encrypted-paths (mapv :FilePath r)
paths-or-exp (<! (<decrypt-fnames rsapi graph-uuid encrypted-paths))]
(if (instance? ExceptionInfo paths-or-exp)
paths-or-exp
(let [encrypted-path->path-map (zipmap encrypted-paths paths-or-exp)]
(into #{}
(comp
(filter #(not= "filepath too long" (:Error %)))
(map #(->FileMetadata (:Size %)
(:Checksum %)
(some->> (get encrypted-path->path-map (:FilePath %))
path-normalize)
(:FilePath %)
(:LastModified %)
true
(:Txid %)
nil)))
r))))))))
(or (guard-ex r)
(let [encrypted-paths (mapv :FilePath r)
paths-or-exp (<! (<decrypt-fnames rsapi graph-uuid encrypted-paths))]
(or (guard-ex paths-or-exp)
(let [encrypted-path->path-map (zipmap encrypted-paths paths-or-exp)]
(into #{}
(comp
(filter #(not= "filepath too long" (:Error %)))
(map #(->FileMetadata (:Size %)
(:Checksum %)
(some->> (get encrypted-path->path-map (:FilePath %))
path-normalize)
(:FilePath %)
(:LastModified %)
true
(:Txid %)
nil)))
r))))))))
(<get-remote-graph [this graph-name-opt graph-uuid-opt]
{:pre [(or graph-name-opt graph-uuid-opt)]}
@@ -1320,23 +1330,22 @@
(<get-deletion-logs [this graph-uuid from-txid]
(user/<wrap-ensure-id&access-token
(let [r (<! (.<request this "get_deletion_log_v20221212" {:GraphUUID graph-uuid :FromTXId from-txid}))]
(if (instance? ExceptionInfo r)
r
(let [txns-with-encrypted-paths (mapv (fn [txn]
(assoc txn :paths
(mapv remove-user-graph-uuid-prefix (:paths txn))))
(:Transactions r))
encrypted-paths (mapcat :paths txns-with-encrypted-paths)
encrypted-path->path-map
(zipmap
encrypted-paths
(<! (<decrypt-fnames rsapi graph-uuid encrypted-paths)))
txns
(mapv
(fn [txn]
(assoc txn :paths (mapv #(get encrypted-path->path-map %) (:paths txn))))
txns-with-encrypted-paths)]
txns)))))
(or (guard-ex r)
(let [txns-with-encrypted-paths (mapv (fn [txn]
(assoc txn :paths
(mapv remove-user-graph-uuid-prefix (:paths txn))))
(:Transactions r))
encrypted-paths (mapcat :paths txns-with-encrypted-paths)
encrypted-path->path-map
(zipmap
encrypted-paths
(<! (<decrypt-fnames rsapi graph-uuid encrypted-paths)))
txns
(mapv
(fn [txn]
(assoc txn :paths (mapv #(get encrypted-path->path-map %) (:paths txn))))
txns-with-encrypted-paths)]
txns)))))
(<get-diff [this graph-uuid from-txid]
;; TODO: path in transactions should be relative path(now s3 key, which includes graph-uuid and user-uuid)
@@ -1418,9 +1427,11 @@
(let [partitioned-files (partition-all 20 (<! (<encrypt-fnames rsapi graph-uuid filepaths)))]
(loop [[files & others] partitioned-files]
(when files
(let [current-txid (:TXId (<! (<get-remote-txid this graph-uuid)))]
(<! (.<request this "delete_files" {:GraphUUID graph-uuid :TXId current-txid :Files files}))
(recur others))))))))
(let [r (<! (<get-remote-txid this graph-uuid))]
(or (guard-ex r)
(let [current-txid (:TXId r)]
(<! (.<request this "delete_files" {:GraphUUID graph-uuid :TXId current-txid :Files files}))
(recur others))))))))))
(comment
(declare remoteapi)
@@ -1445,8 +1456,9 @@
(if (< now expired-at)
r
(let [r (<! (<get-graph-salt remoteapi graph-uuid))]
(swap! *get-graph-salt-memoize-cache conj [graph-uuid r])
r)))))
(or (guard-ex r)
(do (swap! *get-graph-salt-memoize-cache conj [graph-uuid r])
r)))))))
(def ^:private *get-graph-encrypt-keys-memoize-cache (atom {}))
(defn update-graph-encrypt-keys-cache [graph-uuid v]
@@ -1512,9 +1524,11 @@
(defn- assert-local-txid<=remote-txid
[]
(when-let [local-txid (last @graphs-txid)]
(go (let [remote-txid (:TXId (<! (<get-remote-txid remoteapi (second @graphs-txid))))]
(assert (<= local-txid remote-txid)
[@graphs-txid local-txid remote-txid])))))
(go (let [r (<! (<get-remote-txid remoteapi (second @graphs-txid)))]
(when-not (guard-ex r)
(let [remote-txid (:TXId r)]
(assert (<= local-txid remote-txid)
[@graphs-txid local-txid remote-txid])))))))
(defn- get-local-files-checksum
[graph-uuid base-path relative-paths]
@@ -2034,17 +2048,17 @@
"- persist encrypted pwd at local-storage"
[pwd graph-uuid]
(go
(let [[value expired-at gone?]
((juxt :value :expired-at #(-> % ex-data :err :status (= 410)))
(<! (<get-graph-salt-memoize remoteapi graph-uuid)))
[salt-value _expired-at]
(if gone?
(let [r (<! (<create-graph-salt remoteapi graph-uuid))]
(update-graph-salt-cache graph-uuid r)
((juxt :value :expired-at) r))
[value expired-at])
encrypted-pwd (<! (<encrypt-content pwd salt-value))]
(persist-pwd! encrypted-pwd graph-uuid))))
(let [[value _expired-at gone?] ((juxt :value :expired-at #(-> % ex-data :err :status (= 410)))
(<! (<get-graph-salt-memoize remoteapi graph-uuid)))]
(if gone?
(let [r (<! (<create-graph-salt remoteapi graph-uuid))]
(or (guard-ex r)
(do (update-graph-salt-cache graph-uuid r)
(let [[salt-value _expired-at] ((juxt :value :expired-at) r)
encrypted-pwd (<! (<encrypt-content pwd salt-value))]
(persist-pwd! encrypted-pwd graph-uuid)))))
(let [encrypted-pwd (<! (<encrypt-content pwd value))]
(persist-pwd! encrypted-pwd graph-uuid))))))
(defn restore-pwd!
"restore pwd from persisted encrypted-pwd, update `pwd-map`"
@@ -2386,8 +2400,8 @@
if local-txid != remote-txid, return {:need-sync-remote true}"))
(defrecord ^:large-vars/cleanup-todo
Remote->LocalSyncer [user-uuid graph-uuid base-path repo *txid *txid-for-get-deletion-log *sync-state remoteapi
^:mutable local->remote-syncer *stopped *paused]
Remote->LocalSyncer [user-uuid graph-uuid base-path repo *txid *txid-for-get-deletion-log *sync-state remoteapi
^:mutable local->remote-syncer *stopped *paused]
Object
(set-local->remote-syncer! [_ s] (set! local->remote-syncer s))
(sync-files-remote->local!
@@ -2426,34 +2440,33 @@
(go
(let [r
(let [diff-r (<! (<get-diff remoteapi graph-uuid @*txid))]
(if (instance? ExceptionInfo diff-r)
diff-r
(let [[diff-txns latest-txid min-txid] diff-r]
(if (> (dec min-txid) @*txid) ;; min-txid-1 > @*txid, need to remote->local-full-sync
(do (println "min-txid" min-txid "request-txid" @*txid)
{:need-remote->local-full-sync true})
(or (guard-ex diff-r)
(let [[diff-txns latest-txid min-txid] diff-r]
(if (> (dec min-txid) @*txid) ;; min-txid-1 > @*txid, need to remote->local-full-sync
(do (println "min-txid" min-txid "request-txid" @*txid)
{:need-remote->local-full-sync true})
(when (pos-int? latest-txid)
(let [filtered-diff-txns (-> (transduce (diffs->filetxns) conj '() (reverse diff-txns))
filter-download-files-with-reserved-chars)
partitioned-filetxns (transduce (partition-filetxns download-batch-size)
(completing (fn [r i] (conj r (reverse i)))) ;reverse
'()
filtered-diff-txns)]
(put-sync-event! {:event :start
:data {:type :remote->local
:graph-uuid graph-uuid
:full-sync? false
:epoch (tc/to-epoch (t/now))}})
(if (empty? (flatten partitioned-filetxns))
(do
(swap! *sync-state #(sync-state-reset-full-remote->local-files % []))
(<! (<update-graphs-txid! latest-txid graph-uuid user-uuid repo))
(reset! *txid latest-txid)
{:succ true})
(<! (apply-filetxns-partitions
*sync-state user-uuid graph-uuid base-path
partitioned-filetxns repo *txid *stopped *paused false)))))))))]
(when (pos-int? latest-txid)
(let [filtered-diff-txns (-> (transduce (diffs->filetxns) conj '() (reverse diff-txns))
filter-download-files-with-reserved-chars)
partitioned-filetxns (transduce (partition-filetxns download-batch-size)
(completing (fn [r i] (conj r (reverse i)))) ;reverse
'()
filtered-diff-txns)]
(put-sync-event! {:event :start
:data {:type :remote->local
:graph-uuid graph-uuid
:full-sync? false
:epoch (tc/to-epoch (t/now))}})
(if (empty? (flatten partitioned-filetxns))
(do
(swap! *sync-state #(sync-state-reset-full-remote->local-files % []))
(<! (<update-graphs-txid! latest-txid graph-uuid user-uuid repo))
(reset! *txid latest-txid)
{:succ true})
(<! (apply-filetxns-partitions
*sync-state user-uuid graph-uuid base-path
partitioned-filetxns repo *txid *stopped *paused false)))))))))]
(cond
(instance? ExceptionInfo r) {:unknown r}
@*stopped {:stop true}
@@ -2468,7 +2481,8 @@
remote-all-files-meta-or-exp (<! remote-all-files-meta-c)]
(if (or (storage-exceed-limit? remote-all-files-meta-or-exp)
(sync-stop-when-api-flying? remote-all-files-meta-or-exp)
(decrypt-exp? remote-all-files-meta-or-exp))
(decrypt-exp? remote-all-files-meta-or-exp)
(instance? ExceptionInfo remote-all-files-meta-or-exp))
(do (put-sync-event! {:event :exception-decrypt-failed
:data {:graph-uuid graph-uuid
:exp remote-all-files-meta-or-exp
@@ -2478,11 +2492,11 @@
local-all-files-meta (<! local-all-files-meta-c)
{diff-remote-files :result elapsed-time :time}
(util/with-time (diff-file-metadata-sets remote-all-files-meta local-all-files-meta))
_ (println ::diff-file-metadata-sets-elapsed-time elapsed-time "ms")
_ (println ::diff-file-metadata-sets-elapsed-time elapsed-time "ms")
recent-10-days-range ((juxt #(tc/to-long (t/minus % (t/days 10))) #(tc/to-long %)) (t/today))
sorted-diff-remote-files
(sort-by
(sort-file-metadata-fn :recent-days-range recent-10-days-range) > diff-remote-files)
(sort-by
(sort-file-metadata-fn :recent-days-range recent-10-days-range) > diff-remote-files)
remote-txid-or-ex (<! (<get-remote-txid remoteapi graph-uuid))
latest-txid (:TXId remote-txid-or-ex)]
(if (or (instance? ExceptionInfo remote-txid-or-ex) (nil? latest-txid))
@@ -2769,7 +2783,8 @@
(cond
(or (storage-exceed-limit? remote-all-files-meta-or-exp)
(sync-stop-when-api-flying? remote-all-files-meta-or-exp)
(decrypt-exp? remote-all-files-meta-or-exp))
(decrypt-exp? remote-all-files-meta-or-exp)
(instance? ExceptionInfo remote-all-files-meta-or-exp))
(do (put-sync-event! {:event :get-remote-all-files-failed
:data {:graph-uuid graph-uuid
:exp remote-all-files-meta-or-exp
@@ -2931,7 +2946,8 @@
remote->local
(let [txid
(if (true? remote->local)
{:txid (:TXId (<! (<get-remote-txid remoteapi graph-uuid)))}
(let [r (<! (<get-remote-txid remoteapi graph-uuid))]
(when-not (guard-ex r) {:txid (:TXId r)}))
remote->local)]
(when (some? txid)
(>! ops-chan {:remote->local txid}))
@@ -3257,8 +3273,6 @@
(reset! current-sm-graph-uuid graph-uuid)
(sync-manager user-uuid graph-uuid base-path repo txid *sync-state)))
;; Avoid sync reentrancy
(defonce *sync-entered? (atom false))
(defn <sync-stop []
(go
@@ -3269,8 +3283,6 @@
(<! (-stop! sm))
(reset! *sync-entered? false)
(println "[SyncManager]" "stopped"))
(reset! current-sm-graph-uuid nil)))
@@ -3352,54 +3364,62 @@
(declare network-online-cursor)
(def ^:private *sync-starting
"Avoid running multiple sync instances simultaneously."
(atom false))
(defn <sync-start
[]
(go
(when (and (state/enable-sync?)
(false? @*sync-entered?)
(<! (<connectivity-testing)))
(reset! *sync-entered? true)
(let [*sync-state (atom (sync-state))
current-user-uuid (<! (user/<user-uuid))
(when-not @*sync-starting
(reset! *sync-starting true)
(if-not (and (state/enable-sync?)
(or (nil? (state/get-file-sync-state))
(= ::stop (:state (state/get-file-sync-state))))
(<! (<connectivity-testing)))
(reset! *sync-starting false)
(try
(let [*sync-state (atom (sync-state))
current-user-uuid (<! (user/<user-uuid))
;; put @graph-uuid & get-current-repo together,
;; prevent to get older repo dir and current graph-uuid.
_ (<! (p->c (persist-var/-load graphs-txid)))
[user-uuid graph-uuid txid] @graphs-txid
txid (or txid 0)
repo (state/get-current-repo)]
(when-not (instance? ExceptionInfo current-user-uuid)
(when (and repo
@network-online-cursor
user-uuid graph-uuid txid
(graph-sync-off? graph-uuid)
(user/logged-in?)
(not (config/demo-graph? repo)))
(try
(when-let [sm (sync-manager-singleton current-user-uuid graph-uuid
(config/get-repo-dir repo) repo
txid *sync-state)]
(when (check-graph-belong-to-current-user current-user-uuid user-uuid)
(if-not (<! (<check-remote-graph-exists graph-uuid)) ; remote graph has been deleted
(clear-graphs-txid! repo)
(do
(state/set-file-sync-state graph-uuid @*sync-state)
(state/set-file-sync-manager graph-uuid sm)
_ (<! (p->c (persist-var/-load graphs-txid)))
[user-uuid graph-uuid txid] @graphs-txid
txid (or txid 0)
repo (state/get-current-repo)]
(when-not (instance? ExceptionInfo current-user-uuid)
(when (and repo
@network-online-cursor
user-uuid graph-uuid txid
(graph-sync-off? graph-uuid)
(user/logged-in?)
(not (config/demo-graph? repo)))
(when-let [sm (sync-manager-singleton current-user-uuid graph-uuid
(config/get-repo-dir repo) repo
txid *sync-state)]
(when (check-graph-belong-to-current-user current-user-uuid user-uuid)
(if-not (<! (<check-remote-graph-exists graph-uuid)) ; remote graph has been deleted
(clear-graphs-txid! repo)
(do
(state/set-file-sync-state graph-uuid @*sync-state)
(state/set-file-sync-manager graph-uuid sm)
;; update global state when *sync-state changes
(add-watch *sync-state ::update-global-state
(fn [_ _ _ n]
(state/set-file-sync-state graph-uuid n)))
;; update global state when *sync-state changes
(add-watch *sync-state ::update-global-state
(fn [_ _ _ n]
(state/set-file-sync-state graph-uuid n)))
(state/set-state! [:file-sync/graph-state :current-graph-uuid] graph-uuid)
(state/set-state! [:file-sync/graph-state :current-graph-uuid] graph-uuid)
(.start sm)
(.start sm)
(offer! remote->local-full-sync-chan true)
(offer! full-sync-chan true)))))
(catch :default e
(prn "Sync start error: ")
(log/error :exception e)))))
(reset! *sync-entered? false)))))
(offer! remote->local-full-sync-chan true)
(offer! full-sync-chan true))))))))
(catch :default e
(prn "Sync start error: ")
(log/error :exception e))
(finally
(reset! *sync-starting false)))))))
(defn- restart-if-stopped!
[is-active?]
@@ -3469,6 +3489,14 @@
(when (nil? n)
(<sync-stop))))
;; try to re-start sync when state=stopped every 1min
(go-loop []
(<! (timeout 60000))
(when (and @network-online-cursor ; is online
(= ::stop (:state (state/get-file-sync-state))))
(println "trying to restart sync...")
(<sync-start))
(recur))
;;; ### some sync events handler