mirror of
https://github.com/logseq/logseq.git
synced 2026-06-01 19:01:22 +00:00
fix: sync crashes
1. make sure there's no sync reentrancy 2. there's no need to restore pwd after user input the password
This commit is contained in:
@@ -116,7 +116,7 @@
|
||||
(if (instance? js/Error persist-r)
|
||||
(js/console.error persist-r)
|
||||
(when (fn? after-input-password)
|
||||
(async/<! (after-input-password))
|
||||
(after-input-password @*password)
|
||||
;; TODO: it's better if based on sync state
|
||||
(when init-graph-keys
|
||||
(js/setTimeout #(state/pub-event! [:file-sync/maybe-onboarding-show :sync-learn]) 10000)))))))))))
|
||||
|
||||
@@ -298,9 +298,9 @@
|
||||
(defn <request [api-name & args]
|
||||
(let [name (str api-name (.now js/Date))]
|
||||
(go (swap! *on-flying-request conj name)
|
||||
(let [r (<! (apply <request* api-name args))]
|
||||
(swap! *on-flying-request disj name)
|
||||
r))))
|
||||
(let [r (<! (apply <request* api-name args))]
|
||||
(swap! *on-flying-request disj name)
|
||||
r))))
|
||||
|
||||
(defn- remove-dir-prefix [dir path]
|
||||
(let [r (string/replace path (js/RegExp. (str "^" (gstring/regExpEscape dir))) "")]
|
||||
@@ -339,7 +339,7 @@
|
||||
(-stop! [this]))
|
||||
(defprotocol IStopped?
|
||||
(-stopped? [this]))
|
||||
;from-path, to-path is relative path
|
||||
;from-path, to-path is relative path
|
||||
(deftype FileTxn [from-path to-path updated? deleted? txid checksum]
|
||||
Object
|
||||
(renamed? [_]
|
||||
@@ -384,19 +384,19 @@
|
||||
(let [update? (= "update_files" TXType)
|
||||
delete? (= "delete_files" TXType)
|
||||
update-xf
|
||||
(comp
|
||||
(remove #(or (empty? (first %))
|
||||
(empty? (last %))))
|
||||
(map #(->FileTxn (first %) (first %) update? delete? TXId (last %))))
|
||||
(comp
|
||||
(remove #(or (empty? (first %))
|
||||
(empty? (last %))))
|
||||
(map #(->FileTxn (first %) (first %) update? delete? TXId (last %))))
|
||||
delete-xf
|
||||
(comp
|
||||
(remove #(empty? (first %)))
|
||||
(map #(->FileTxn (first %) (first %) update? delete? TXId nil)))
|
||||
(comp
|
||||
(remove #(empty? (first %)))
|
||||
(map #(->FileTxn (first %) (first %) update? delete? TXId nil)))
|
||||
rename-xf
|
||||
(comp
|
||||
(remove #(or (empty? (first %))
|
||||
(empty? (second %))))
|
||||
(map #(->FileTxn (second %) (first %) false false TXId nil)))
|
||||
(comp
|
||||
(remove #(or (empty? (first %))
|
||||
(empty? (second %))))
|
||||
(map #(->FileTxn (second %) (first %) false false TXId nil)))
|
||||
xf (case TXType
|
||||
"delete_files" delete-xf
|
||||
"update_files" update-xf
|
||||
@@ -582,21 +582,21 @@
|
||||
#{} s1))
|
||||
|
||||
(comment
|
||||
(defn map->FileMetadata [m]
|
||||
(apply ->FileMetadata ((juxt :size :etag :path :encrypted-path :last-modified :remote? (constantly nil)) m)))
|
||||
(defn map->FileMetadata [m]
|
||||
(apply ->FileMetadata ((juxt :size :etag :path :encrypted-path :last-modified :remote? (constantly nil)) m)))
|
||||
|
||||
(assert
|
||||
(=
|
||||
#{(map->FileMetadata {:size 1 :etag 2 :path 2 :encrypted-path 2 :last-modified 2})}
|
||||
(diff-file-metadata-sets
|
||||
(into #{}
|
||||
(map map->FileMetadata)
|
||||
[{:size 1 :etag 1 :path 1 :encrypted-path 1 :last-modified 1}
|
||||
{:size 1 :etag 2 :path 2 :encrypted-path 2 :last-modified 2}])
|
||||
(into #{}
|
||||
(map map->FileMetadata)
|
||||
[{:size 1 :etag 1 :path 1 :encrypted-path 1 :last-modified 1}
|
||||
{:size 1 :etag 1 :path 2 :encrypted-path 2 :last-modified 1}])))))
|
||||
(assert
|
||||
(=
|
||||
#{(map->FileMetadata {:size 1 :etag 2 :path 2 :encrypted-path 2 :last-modified 2})}
|
||||
(diff-file-metadata-sets
|
||||
(into #{}
|
||||
(map map->FileMetadata)
|
||||
[{:size 1 :etag 1 :path 1 :encrypted-path 1 :last-modified 1}
|
||||
{:size 1 :etag 2 :path 2 :encrypted-path 2 :last-modified 2}])
|
||||
(into #{}
|
||||
(map map->FileMetadata)
|
||||
[{:size 1 :etag 1 :path 1 :encrypted-path 1 :last-modified 1}
|
||||
{:size 1 :etag 1 :path 2 :encrypted-path 2 :last-modified 1}])))))
|
||||
|
||||
(extend-protocol IChecksum
|
||||
FileMetadata
|
||||
@@ -1030,8 +1030,8 @@
|
||||
(go-loop []
|
||||
(let [{:keys [val stop]}
|
||||
(async/alt!
|
||||
debug-print-sync-events-loop-stop-chan {:stop true}
|
||||
out-ch ([v] {:val v}))]
|
||||
debug-print-sync-events-loop-stop-chan {:stop true}
|
||||
out-ch ([v] {:val v}))]
|
||||
(cond
|
||||
stop (do (async/unmix-all out-mix)
|
||||
(doseq [[topic ch] topic&chs]
|
||||
@@ -1047,28 +1047,28 @@
|
||||
|
||||
|
||||
(comment
|
||||
;; sub one type event example:
|
||||
(def c1 (chan 10))
|
||||
(async/sub sync-events-publication :created-local-version-file c1)
|
||||
(offer! sync-events-chan {:event :created-local-version-file :data :xxx})
|
||||
(poll! c1)
|
||||
;; sub one type event example:
|
||||
(def c1 (chan 10))
|
||||
(async/sub sync-events-publication :created-local-version-file c1)
|
||||
(offer! sync-events-chan {:event :created-local-version-file :data :xxx})
|
||||
(poll! c1)
|
||||
|
||||
;; sub multiple type events example:
|
||||
;; sub :created-local-version-file and :finished-remote->local events,
|
||||
;; output into channel c4-out
|
||||
(def c2 (chan 10))
|
||||
(def c3 (chan 10))
|
||||
(def c4-out (chan 10))
|
||||
(def mix-out (async/mix c4-out))
|
||||
(async/admix mix-out c2)
|
||||
(async/admix mix-out c3)
|
||||
(async/sub sync-events-publication :created-local-version-file c2)
|
||||
(async/sub sync-events-publication :finished-remote->local c3)
|
||||
(offer! sync-events-chan {:event :created-local-version-file :data :xxx})
|
||||
(offer! sync-events-chan {:event :finished-remote->local :data :xxx})
|
||||
(poll! c4-out)
|
||||
(poll! c4-out)
|
||||
)
|
||||
;; sub multiple type events example:
|
||||
;; sub :created-local-version-file and :finished-remote->local events,
|
||||
;; output into channel c4-out
|
||||
(def c2 (chan 10))
|
||||
(def c3 (chan 10))
|
||||
(def c4-out (chan 10))
|
||||
(def mix-out (async/mix c4-out))
|
||||
(async/admix mix-out c2)
|
||||
(async/admix mix-out c3)
|
||||
(async/sub sync-events-publication :created-local-version-file c2)
|
||||
(async/sub sync-events-publication :finished-remote->local c3)
|
||||
(offer! sync-events-chan {:event :created-local-version-file :data :xxx})
|
||||
(offer! sync-events-chan {:event :finished-remote->local :data :xxx})
|
||||
(poll! c4-out)
|
||||
(poll! c4-out)
|
||||
)
|
||||
|
||||
;;; sync events ends
|
||||
|
||||
@@ -1126,27 +1126,27 @@
|
||||
(let [file-meta-list (transient #{})
|
||||
encrypted-path-list (transient [])
|
||||
exp-r
|
||||
(<!
|
||||
(go-loop [continuation-token nil]
|
||||
(let [r (<! (.<request this "get_all_files"
|
||||
(into
|
||||
{}
|
||||
(remove (comp nil? second)
|
||||
{:GraphUUID graph-uuid :ContinuationToken continuation-token}))))]
|
||||
(if (instance? ExceptionInfo r)
|
||||
r
|
||||
(let [next-continuation-token (:NextContinuationToken r)
|
||||
objs (:Objects r)]
|
||||
(apply conj! encrypted-path-list (map (comp remove-user-graph-uuid-prefix :Key) objs))
|
||||
(apply conj! file-meta-list
|
||||
(map
|
||||
#(hash-map :checksum (:checksum %)
|
||||
:encrypted-path (remove-user-graph-uuid-prefix (:Key %))
|
||||
:size (:Size %)
|
||||
:last-modified (:LastModified %))
|
||||
objs))
|
||||
(when-not (empty? next-continuation-token)
|
||||
(recur next-continuation-token)))))))]
|
||||
(<!
|
||||
(go-loop [continuation-token nil]
|
||||
(let [r (<! (.<request this "get_all_files"
|
||||
(into
|
||||
{}
|
||||
(remove (comp nil? second)
|
||||
{:GraphUUID graph-uuid :ContinuationToken continuation-token}))))]
|
||||
(if (instance? ExceptionInfo r)
|
||||
r
|
||||
(let [next-continuation-token (:NextContinuationToken r)
|
||||
objs (:Objects r)]
|
||||
(apply conj! encrypted-path-list (map (comp remove-user-graph-uuid-prefix :Key) objs))
|
||||
(apply conj! file-meta-list
|
||||
(map
|
||||
#(hash-map :checksum (:checksum %)
|
||||
:encrypted-path (remove-user-graph-uuid-prefix (:Key %))
|
||||
:size (:Size %)
|
||||
:last-modified (:LastModified %))
|
||||
objs))
|
||||
(when-not (empty? next-continuation-token)
|
||||
(recur next-continuation-token)))))))]
|
||||
(if (instance? ExceptionInfo exp-r)
|
||||
exp-r
|
||||
(let [file-meta-list* (persistent! file-meta-list)
|
||||
@@ -1212,58 +1212,58 @@
|
||||
(let [txns-with-encrypted-paths (mapv #(update % :path remove-user-graph-uuid-prefix) (:Transactions r))
|
||||
encrypted-paths (mapv :path txns-with-encrypted-paths)
|
||||
encrypted-path->path-map
|
||||
(zipmap
|
||||
encrypted-paths
|
||||
(<! (<decrypt-fnames rsapi graph-uuid encrypted-paths)))
|
||||
(zipmap
|
||||
encrypted-paths
|
||||
(<! (<decrypt-fnames rsapi graph-uuid encrypted-paths)))
|
||||
txns
|
||||
(mapv
|
||||
(fn [txn] (update txn :path #(get encrypted-path->path-map %)))
|
||||
txns-with-encrypted-paths)]
|
||||
(mapv
|
||||
(fn [txn] (update txn :path #(get encrypted-path->path-map %)))
|
||||
txns-with-encrypted-paths)]
|
||||
txns)))))
|
||||
|
||||
(<get-diff [this graph-uuid from-txid]
|
||||
;; TODO: path in transactions should be relative path(now s3 key, which includes graph-uuid and user-uuid)
|
||||
;; TODO: path in transactions should be relative path(now s3 key, which includes graph-uuid and user-uuid)
|
||||
(go
|
||||
(let [r (<! (.<request this "get_diff" {:GraphUUID graph-uuid :FromTXId from-txid}))]
|
||||
(if (instance? ExceptionInfo r)
|
||||
r
|
||||
(let [txns-with-encrypted-paths (sort-by :TXId (:Transactions r))
|
||||
txns-with-encrypted-paths*
|
||||
(mapv
|
||||
(fn [txn]
|
||||
(assoc txn :TXContent
|
||||
(mapv
|
||||
(fn [[to-path from-path checksum]]
|
||||
[(remove-user-graph-uuid-prefix to-path)
|
||||
(some-> from-path remove-user-graph-uuid-prefix)
|
||||
checksum])
|
||||
(:TXContent txn))))
|
||||
txns-with-encrypted-paths)
|
||||
(mapv
|
||||
(fn [txn]
|
||||
(assoc txn :TXContent
|
||||
(mapv
|
||||
(fn [[to-path from-path checksum]]
|
||||
[(remove-user-graph-uuid-prefix to-path)
|
||||
(some-> from-path remove-user-graph-uuid-prefix)
|
||||
checksum])
|
||||
(:TXContent txn))))
|
||||
txns-with-encrypted-paths)
|
||||
encrypted-paths
|
||||
(mapcat
|
||||
(fn [txn]
|
||||
(remove
|
||||
#(or (nil? %) (not (string/starts-with? % "e.")))
|
||||
(mapcat
|
||||
(fn [[to-path from-path _checksum]] [to-path from-path])
|
||||
(:TXContent txn))))
|
||||
txns-with-encrypted-paths*)
|
||||
(mapcat
|
||||
(fn [txn]
|
||||
(remove
|
||||
#(or (nil? %) (not (string/starts-with? % "e.")))
|
||||
(mapcat
|
||||
(fn [[to-path from-path _checksum]] [to-path from-path])
|
||||
(:TXContent txn))))
|
||||
txns-with-encrypted-paths*)
|
||||
encrypted-path->path-map
|
||||
(zipmap
|
||||
encrypted-paths
|
||||
(<! (<decrypt-fnames rsapi graph-uuid encrypted-paths)))
|
||||
(zipmap
|
||||
encrypted-paths
|
||||
(<! (<decrypt-fnames rsapi graph-uuid encrypted-paths)))
|
||||
txns
|
||||
(mapv
|
||||
(fn [txn]
|
||||
(assoc
|
||||
txn :TXContent
|
||||
(mapv
|
||||
(fn [[to-path from-path checksum]]
|
||||
[(get encrypted-path->path-map to-path to-path)
|
||||
(some->> from-path (get encrypted-path->path-map))
|
||||
checksum])
|
||||
(:TXContent txn))))
|
||||
txns-with-encrypted-paths*)]
|
||||
(mapv
|
||||
(fn [txn]
|
||||
(assoc
|
||||
txn :TXContent
|
||||
(mapv
|
||||
(fn [[to-path from-path checksum]]
|
||||
[(get encrypted-path->path-map to-path to-path)
|
||||
(some->> from-path (get encrypted-path->path-map))
|
||||
checksum])
|
||||
(:TXContent txn))))
|
||||
txns-with-encrypted-paths*)]
|
||||
[txns
|
||||
(:TXId (last txns))
|
||||
(:TXId (first txns))])))))
|
||||
@@ -1399,12 +1399,12 @@
|
||||
(.-deleted? e) :delete-filetxns
|
||||
(.renamed? e) :rename-filetxns)) filetxns)
|
||||
update-file-items (map
|
||||
(fn [filetxn]
|
||||
(let [path (relative-path filetxn)]
|
||||
{:remote->local-type :update
|
||||
:checksum (-checksum filetxn)
|
||||
:path path}))
|
||||
update-filetxns)
|
||||
(fn [filetxn]
|
||||
(let [path (relative-path filetxn)]
|
||||
{:remote->local-type :update
|
||||
:checksum (-checksum filetxn)
|
||||
:path path}))
|
||||
update-filetxns)
|
||||
rename-file-items (mapcat
|
||||
(fn [^FileTxn filetxn]
|
||||
(let [to-path (relative-path filetxn)
|
||||
@@ -1417,12 +1417,12 @@
|
||||
:path from-path}]))
|
||||
rename-filetxns)
|
||||
delete-file-items (map
|
||||
(fn [filetxn]
|
||||
(let [path (relative-path filetxn)]
|
||||
{:remote->local-type :delete
|
||||
:checksum (-checksum filetxn)
|
||||
:path path}))
|
||||
delete-filetxns)]
|
||||
(fn [filetxn]
|
||||
(let [path (relative-path filetxn)]
|
||||
{:remote->local-type :delete
|
||||
:checksum (-checksum filetxn)
|
||||
:path path}))
|
||||
delete-filetxns)]
|
||||
(set (concat update-file-items rename-file-items delete-file-items))))
|
||||
|
||||
(defn- apply-filetxns
|
||||
@@ -1457,8 +1457,8 @@
|
||||
[recent-remote->local-file-item])
|
||||
(<! (<delete-local-files rsapi graph-uuid base-path [relative-p*]))
|
||||
(go (<! (timeout 5000))
|
||||
(swap! *sync-state sync-state--remove-recent-remote->local-files
|
||||
[recent-remote->local-file-item])))))
|
||||
(swap! *sync-state sync-state--remove-recent-remote->local-files
|
||||
[recent-remote->local-file-item])))))
|
||||
|
||||
(let [update-local-files-ch (<update-local-files rsapi graph-uuid base-path (map relative-path filetxns))
|
||||
r (<! (<with-pause update-local-files-ch *paused))]
|
||||
@@ -1510,8 +1510,8 @@
|
||||
(not (instance? ExceptionInfo r)))]
|
||||
;; remove these recent-remote->local-file-items 5s later
|
||||
(go (<! (timeout 5000))
|
||||
(swap! *sync-state sync-state--remove-recent-remote->local-files
|
||||
recent-remote->local-file-items))
|
||||
(swap! *sync-state sync-state--remove-recent-remote->local-files
|
||||
recent-remote->local-file-items))
|
||||
(cond
|
||||
(instance? ExceptionInfo r) r
|
||||
@*paused {:pause true}
|
||||
@@ -1611,7 +1611,7 @@
|
||||
path (relative-path e)]
|
||||
{:remote->local-type tp
|
||||
:checksum (if (= tp :delete) nil
|
||||
(val (first (<! (get-local-files-checksum graph-uuid (.-dir e) [path])))))
|
||||
(val (first (<! (get-local-files-checksum graph-uuid (.-dir e) [path])))))
|
||||
:path path})))
|
||||
|
||||
(defn- distinct-file-change-events-xf
|
||||
@@ -1672,8 +1672,8 @@
|
||||
(go-loop []
|
||||
(let [{:keys [rename-event local-change]}
|
||||
(async/alt!
|
||||
rename-page-event-chan ([v] {:rename-event v}) ;; {:repo X :old-path X :new-path}
|
||||
local-changes-chan ([v] {:local-change v}))]
|
||||
rename-page-event-chan ([v] {:rename-event v}) ;; {:repo X :old-path X :new-path}
|
||||
local-changes-chan ([v] {:local-change v}))]
|
||||
(cond
|
||||
rename-event
|
||||
(let [repo-dir (config/get-repo-dir (:repo rename-event))
|
||||
@@ -1686,7 +1686,7 @@
|
||||
(swap! *rename-events conj k1 k2)
|
||||
;; remove rename-events after 2s
|
||||
(go (<! (timeout 3000))
|
||||
(swap! *rename-events disj k1 k2))
|
||||
(swap! *rename-events disj k1 k2))
|
||||
;; add 2 simulated file-watcher events
|
||||
(>! ch (->FileChangeEvent "unlink" repo-dir (:old-path rename-event*) nil nil))
|
||||
(>! ch (->FileChangeEvent "add" repo-dir (:new-path rename-event*)
|
||||
@@ -1831,8 +1831,10 @@
|
||||
:input-pwd-remote
|
||||
{:GraphUUID graph-uuid
|
||||
:init-graph-keys init-graph-keys
|
||||
:after-input-password #(go (<! (restore-pwd! graph-uuid))
|
||||
(offer! <restored-pwd {:graph-uuid graph-uuid :value true}))}])
|
||||
:after-input-password (fn [pwd]
|
||||
(when pwd
|
||||
(swap! pwd-map assoc-in [graph-uuid :pwd] pwd)
|
||||
(offer! <restored-pwd {:graph-uuid graph-uuid :value true})))}])
|
||||
nil)
|
||||
pwd))))
|
||||
|
||||
@@ -1855,7 +1857,6 @@
|
||||
(<! (<get-graph-encrypt-keys-memoize remoteapi graph-uuid))
|
||||
init-graph-keys (some-> (ex-data r) :err :status (= 404))
|
||||
pwd (<! (<ensure-pwd-exists! repo graph-uuid init-graph-keys))]
|
||||
|
||||
(cond
|
||||
(not pwd)
|
||||
(do (println :debug "waiting password...")
|
||||
@@ -1884,6 +1885,7 @@
|
||||
(if (= :recur next-state)
|
||||
(recur)
|
||||
next-state))
|
||||
|
||||
:else
|
||||
;; pwd, public-key, encrypted-private-key all exist
|
||||
(do (assert (and pwd public-key encrypted-private-key) {:encrypted-private-key encrypted-private-key
|
||||
@@ -2163,8 +2165,8 @@
|
||||
(do
|
||||
(swap! *sync-state #(sync-state-reset-full-remote->local-files % []))
|
||||
(<! (<update-graphs-txid! latest-txid graph-uuid user-uuid repo))
|
||||
(reset! *txid latest-txid)
|
||||
{:succ true})))))
|
||||
(reset! *txid latest-txid)
|
||||
{:succ true})))))
|
||||
|
||||
IRemote->LocalSync
|
||||
(stop-remote->local! [_] (vreset! *stopped true))
|
||||
@@ -2238,7 +2240,7 @@
|
||||
(swap! *sync-state #(sync-state-reset-full-remote->local-files % sorted-diff-remote-files))
|
||||
(<! (.sync-files-remote->local!
|
||||
this (map (juxt relative-path -checksum)
|
||||
sorted-diff-remote-files)
|
||||
sorted-diff-remote-files)
|
||||
latest-txid))))))))))
|
||||
|
||||
(defn- <file-changed?
|
||||
@@ -2346,23 +2348,23 @@
|
||||
local-files-meta-map))))
|
||||
|
||||
(defrecord ^:large-vars/cleanup-todo
|
||||
Local->RemoteSyncer [user-uuid graph-uuid base-path repo *sync-state remoteapi
|
||||
^:mutable rate *txid ^:mutable remote->local-syncer stop-chan *stopped *paused
|
||||
Local->RemoteSyncer [user-uuid graph-uuid base-path repo *sync-state remoteapi
|
||||
^:mutable rate *txid ^:mutable remote->local-syncer stop-chan *stopped *paused
|
||||
;; control chans
|
||||
private-immediately-local->remote-chan private-recent-edited-chan]
|
||||
private-immediately-local->remote-chan private-recent-edited-chan]
|
||||
Object
|
||||
(filter-file-change-events-fn [_]
|
||||
(fn [^FileChangeEvent e]
|
||||
(go (and (instance? FileChangeEvent e)
|
||||
(if-let [mtime (:mtime (.-stat e))]
|
||||
;; if mtime is not nil, it should be after (- now 1min)
|
||||
;; ignore events too early
|
||||
;; if mtime is not nil, it should be after (- now 1min)
|
||||
;; ignore events too early
|
||||
(> (* 1000 mtime) (tc/to-long (t/minus (t/now) (t/minutes 1))))
|
||||
true)
|
||||
(or (string/starts-with? (.-dir e) base-path)
|
||||
(string/starts-with? (str "file://" (.-dir e)) base-path)) ; valid path prefix
|
||||
(not (ignored? e)) ;not ignored
|
||||
;; download files will also trigger file-change-events, ignore them
|
||||
;; download files will also trigger file-change-events, ignore them
|
||||
(not (contains? (:recent-remote->local-files @*sync-state)
|
||||
(<! (<file-change-event=>recent-remote->local-file-item
|
||||
graph-uuid e))))))))
|
||||
@@ -2437,7 +2439,7 @@
|
||||
{:need-sync-remote true})
|
||||
|
||||
(need-reset-local-txid? r*) ;; TODO: this cond shouldn't be true,
|
||||
;; but some potential bugs cause local-txid > remote-txid
|
||||
;; but some potential bugs cause local-txid > remote-txid
|
||||
(let [remote-graph-info-or-ex (<! (<get-remote-graph remoteapi nil graph-uuid))
|
||||
remote-txid (:TXId remote-graph-info-or-ex)]
|
||||
(if (or (instance? ExceptionInfo remote-graph-info-or-ex) (nil? remote-txid))
|
||||
@@ -2460,7 +2462,7 @@
|
||||
succ? ; succ
|
||||
(do
|
||||
(println "sync-local->remote! update txid" r*)
|
||||
;; persist txid
|
||||
;; persist txid
|
||||
(<! (<update-graphs-txid! r* graph-uuid user-uuid repo))
|
||||
(reset! *txid r*)
|
||||
{:succ true})
|
||||
@@ -2507,7 +2509,7 @@
|
||||
change-events
|
||||
(sequence
|
||||
(comp
|
||||
;; convert to FileChangeEvent
|
||||
;; convert to FileChangeEvent
|
||||
(map #(->FileChangeEvent "change" base-path (.get-normalized-path ^FileMetadata %)
|
||||
{:size (:size %)} (:etag %)))
|
||||
(remove ignored?))
|
||||
@@ -2516,7 +2518,7 @@
|
||||
_ (swap! *sync-state #(sync-state-reset-full-local->remote-files % distinct-change-events))
|
||||
change-events-partitions
|
||||
(sequence
|
||||
;; partition FileChangeEvents
|
||||
;; partition FileChangeEvents
|
||||
(partition-file-change-events upload-batch-size)
|
||||
distinct-change-events)]
|
||||
(println "[full-sync(local->remote)]"
|
||||
@@ -2527,7 +2529,7 @@
|
||||
:graph-uuid graph-uuid
|
||||
:full-sync? true
|
||||
:epoch (tc/to-epoch (t/now))}})
|
||||
;; 1. delete local files
|
||||
;; 1. delete local files
|
||||
(loop [[f & fs] delete-local-files]
|
||||
(when f
|
||||
(let [relative-p (relative-path f)]
|
||||
@@ -2543,7 +2545,7 @@
|
||||
[fake-recent-remote->local-file-item])))))
|
||||
(recur fs)))
|
||||
|
||||
;; 2. upload local files
|
||||
;; 2. upload local files
|
||||
(loop [es-partitions change-events-partitions]
|
||||
(if @*stopped
|
||||
{:stop true}
|
||||
@@ -2560,14 +2562,14 @@
|
||||
;;; ### put all stuff together
|
||||
|
||||
(defrecord ^:large-vars/cleanup-todo
|
||||
SyncManager [graph-uuid base-path *sync-state
|
||||
^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer remoteapi
|
||||
^:mutable ratelimit-local-changes-chan
|
||||
*txid ^:mutable state ^:mutable remote-change-chan ^:mutable *ws *stopped? *paused?
|
||||
^:mutable ops-chan
|
||||
;; control chans
|
||||
private-full-sync-chan private-stop-sync-chan private-remote->local-sync-chan
|
||||
private-remote->local-full-sync-chan private-pause-resume-chan]
|
||||
SyncManager [graph-uuid base-path *sync-state
|
||||
^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer remoteapi
|
||||
^:mutable ratelimit-local-changes-chan
|
||||
*txid ^:mutable state ^:mutable remote-change-chan ^:mutable *ws *stopped? *paused?
|
||||
^:mutable ops-chan
|
||||
;; control chans
|
||||
private-full-sync-chan private-stop-sync-chan private-remote->local-sync-chan
|
||||
private-remote->local-full-sync-chan private-pause-resume-chan]
|
||||
Object
|
||||
(schedule [this next-state args reason]
|
||||
{:pre [(s/valid? ::state next-state)]}
|
||||
@@ -2608,19 +2610,19 @@
|
||||
(go-loop []
|
||||
(let [{:keys [stop remote->local remote->local-full-sync local->remote-full-sync local->remote resume pause]}
|
||||
(async/alt!
|
||||
private-stop-sync-chan {:stop true}
|
||||
private-remote->local-full-sync-chan {:remote->local-full-sync true}
|
||||
private-remote->local-sync-chan {:remote->local true}
|
||||
private-full-sync-chan {:local->remote-full-sync true}
|
||||
private-pause-resume-chan ([v] (if v {:resume true} {:pause true}))
|
||||
remote-change-chan ([v] (println "remote change:" v) {:remote->local v})
|
||||
ratelimit-local-changes-chan ([v]
|
||||
(let [rest-v (util/drain-chan ratelimit-local-changes-chan)
|
||||
vs (cons v rest-v)]
|
||||
(println "local changes:" vs)
|
||||
{:local->remote vs}))
|
||||
(timeout (* 20 60 1000)) {:local->remote-full-sync true}
|
||||
:priority true)]
|
||||
private-stop-sync-chan {:stop true}
|
||||
private-remote->local-full-sync-chan {:remote->local-full-sync true}
|
||||
private-remote->local-sync-chan {:remote->local true}
|
||||
private-full-sync-chan {:local->remote-full-sync true}
|
||||
private-pause-resume-chan ([v] (if v {:resume true} {:pause true}))
|
||||
remote-change-chan ([v] (println "remote change:" v) {:remote->local v})
|
||||
ratelimit-local-changes-chan ([v]
|
||||
(let [rest-v (util/drain-chan ratelimit-local-changes-chan)
|
||||
vs (cons v rest-v)]
|
||||
(println "local changes:" vs)
|
||||
{:local->remote vs}))
|
||||
(timeout (* 20 60 1000)) {:local->remote-full-sync true}
|
||||
:priority true)]
|
||||
(cond
|
||||
stop
|
||||
(do (util/drain-chan ops-chan)
|
||||
@@ -2813,13 +2815,13 @@
|
||||
(.schedule this ::idle nil nil)))))))
|
||||
|
||||
(local->remote [this {local-changes :local}]
|
||||
;; local-changes:: list of FileChangeEvent
|
||||
;; local-changes:: list of FileChangeEvent
|
||||
(assert (some? local-changes) local-changes)
|
||||
(go
|
||||
(let [distincted-local-changes (distinct-file-change-events local-changes)
|
||||
_ (swap! *sync-state #(sync-state-reset-full-local->remote-files % distincted-local-changes))
|
||||
change-events-partitions
|
||||
(sequence (partition-file-change-events upload-batch-size) distincted-local-changes)
|
||||
(sequence (partition-file-change-events upload-batch-size) distincted-local-changes)
|
||||
_ (put-sync-event! {:event :start
|
||||
:data {:type :local->remote
|
||||
:graph-uuid graph-uuid
|
||||
@@ -2964,15 +2966,15 @@
|
||||
(go
|
||||
(let [r (<! (<list-remote-graphs remoteapi))
|
||||
result
|
||||
(or
|
||||
;; if api call failed, assume this remote graph still exists
|
||||
(instance? ExceptionInfo r)
|
||||
(and
|
||||
(contains? r :Graphs)
|
||||
(->> (:Graphs r)
|
||||
(mapv :GraphUUID)
|
||||
set
|
||||
(#(contains? % local-graph-uuid)))))]
|
||||
(or
|
||||
;; if api call failed, assume this remote graph still exists
|
||||
(instance? ExceptionInfo r)
|
||||
(and
|
||||
(contains? r :Graphs)
|
||||
(->> (:Graphs r)
|
||||
(mapv :GraphUUID)
|
||||
set
|
||||
(#(contains? % local-graph-uuid)))))]
|
||||
|
||||
(when-not result
|
||||
(notification/show! (t :file-sync/graph-deleted) :warning false))
|
||||
@@ -2994,26 +2996,30 @@
|
||||
|
||||
(declare network-online-cursor)
|
||||
|
||||
;; Avoid reentrancy
|
||||
(defonce *sync-entered? (atom false))
|
||||
(defn sync-start
|
||||
[]
|
||||
(go
|
||||
(let [*sync-state (atom (sync-state))
|
||||
current-user-uuid (user/user-uuid)
|
||||
;; put @graph-uuid & get-current-repo together,
|
||||
;; prevent to get older repo dir and current graph-uuid.
|
||||
_ (<! (p->c (persist-var/-load graphs-txid)))
|
||||
[user-uuid graph-uuid txid] @graphs-txid
|
||||
txid (or txid 0)
|
||||
repo (state/get-current-repo)]
|
||||
(when (and (graph-sync-off? repo) @network-online-cursor)
|
||||
(when (and user-uuid graph-uuid txid
|
||||
(when (false? @*sync-entered?)
|
||||
(reset! *sync-entered? true)
|
||||
(go
|
||||
(let [*sync-state (atom (sync-state))
|
||||
current-user-uuid (user/user-uuid)
|
||||
;; put @graph-uuid & get-current-repo together,
|
||||
;; prevent to get older repo dir and current graph-uuid.
|
||||
_ (<! (p->c (persist-var/-load graphs-txid)))
|
||||
[user-uuid graph-uuid txid] @graphs-txid
|
||||
txid (or txid 0)
|
||||
repo (state/get-current-repo)]
|
||||
(when (and repo
|
||||
(graph-sync-off? repo) @network-online-cursor
|
||||
user-uuid graph-uuid txid
|
||||
(user/logged-in?)
|
||||
repo
|
||||
(not (config/demo-graph? repo)))
|
||||
(try
|
||||
(when-some [sm (sync-manager-singleton current-user-uuid graph-uuid
|
||||
(config/get-repo-dir repo) repo
|
||||
txid *sync-state)]
|
||||
(when-let [sm (sync-manager-singleton current-user-uuid graph-uuid
|
||||
(config/get-repo-dir repo) repo
|
||||
txid *sync-state)]
|
||||
(when (check-graph-belong-to-current-user current-user-uuid user-uuid)
|
||||
(if-not (<! (<check-remote-graph-exists graph-uuid)) ; remote graph has been deleted
|
||||
(clear-graphs-txid! repo)
|
||||
@@ -3034,7 +3040,8 @@
|
||||
(offer! full-sync-chan true)))))
|
||||
(catch :default e
|
||||
(prn "Sync start error: ")
|
||||
(log/error :exception e))))))))
|
||||
(log/error :exception e))))
|
||||
(reset! *sync-entered? false)))))
|
||||
|
||||
;;; ### some add-watches
|
||||
|
||||
@@ -3085,7 +3092,7 @@
|
||||
|
||||
;;; add-tap
|
||||
(comment
|
||||
(def *x (atom nil))
|
||||
(add-tap (fn [v] (reset! *x v)))
|
||||
(def *x (atom nil))
|
||||
(add-tap (fn [v] (reset! *x v)))
|
||||
|
||||
)
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user