diff --git a/src/main/frontend/fs/sync.cljs b/src/main/frontend/fs/sync.cljs index 77db13b302..270cc797d1 100644 --- a/src/main/frontend/fs/sync.cljs +++ b/src/main/frontend/fs/sync.cljs @@ -36,7 +36,7 @@ ;;; TODO: add some spec validate ;;; TODO: use access-token instead of id-token -;;; TODO: support stop from applying filetxns from remote +;;; TODO: update-remote-file failed when pagename contains whitespace (def ws-addr "wss://og96xf1si7.execute-api.us-east-2.amazonaws.com/production?graphuuid=%s") @@ -526,25 +526,6 @@ (def remoteapi (->RemoteAPI)) -(defn- update-txn [^FileTxnSet filetxnset txn] - (let [{:keys [TXType TXContent]} txn - files (->> (string/split-lines TXContent) - (remove empty?) - (mapv #(remove-user-graph-uuid-prefix %)))] - (case TXType - "update_files" - (reduce #(.update-file ^FileTxnSet %1 %2) filetxnset files) - - "rename_file" - (let [[from to] files] - (.rename-file filetxnset from to)) - - "delete_files" - (reduce #(.delete-file ^FileTxnSet %1 %2) filetxnset files)))) - -(defn update-txns [filetxnset txns] - (reduce update-txn filetxnset txns)) - (defn- apply-filetxns [graph-uuid base-path filetxns] (cond @@ -568,20 +549,22 @@ true r)))))) -(defn- apply-filetxns-partitions [^SyncState sync-state graph-uuid base-path filetxns-partitions repo *txid] +(defn- apply-filetxns-partitions [^SyncState sync-state graph-uuid base-path filetxns-partitions repo *txid *stopped] (go-loop [filetxns-partitions* filetxns-partitions] - (when (seq filetxns-partitions*) - (let [filetxns (first filetxns-partitions*) - paths (map relative-path filetxns) - _ (. sync-state (add-current-remote->local-files! paths)) - r (local-files! paths))] - (if (instance? ExceptionInfo r) - r - (let [latest-txid (apply max (map #(.-txid ^FileTxn %) filetxns))] - (reset! *txid latest-txid) - (update-graphs-txid! latest-txid graph-uuid repo) - (recur (next filetxns-partitions*)))))))) + (if @*stopped + {:stop true} + (when (seq filetxns-partitions*) + (let [filetxns (first filetxns-partitions*) + paths (map relative-path filetxns) + _ (. sync-state (add-current-remote->local-files! paths)) + r (local-files! paths))] + (if (instance? ExceptionInfo r) + r + (let [latest-txid (apply max (map #(.-txid ^FileTxn %) filetxns))] + (reset! *txid latest-txid) + (update-graphs-txid! latest-txid graph-uuid repo) + (recur (next filetxns-partitions*))))))))) (defmulti need-sync-remote? (fn [v] (cond (= :max v) @@ -658,6 +641,7 @@ (defprotocol IRemote->LocalSync + (stop-remote->local! [this]) (sync-remote->local! [this] "return ExceptionInfo when error occurs") (sync-remote->local-all-files! [this] "sync all files, return ExceptionInfo when error occurs")) @@ -668,37 +652,43 @@ (ratelimit [this from-chan] "get watched local file-change events from FROM-CHAN, return chan returning events with rate limited") (sync-local->remote! [this es] "es is a sequence of `FileChangeEvent`, all items have same type.") - (sync-local->remote-all-files! [this stop-chan] "compare all local files to remote ones, sync if not equal. - ensure local-txid = remote-txid before calling this func")) + (sync-local->remote-all-files! [this] "compare all local files to remote ones, sync when not equal. + if local-txid != remote-txid, return {:need-sync-remote true}")) -(deftype Remote->LocalSyncer [graph-uuid base-path repo *txid ^SyncState sync-state ^:mutable local->remote-syncer] +(deftype Remote->LocalSyncer [graph-uuid base-path repo *txid ^SyncState sync-state + ^:mutable local->remote-syncer *stopped] Object (set-local->remote-syncer! [_ s] (set! local->remote-syncer s)) IRemote->LocalSync + (stop-remote->local! [_] (vreset! *stopped true)) (sync-remote->local! [_] (go (let [r - (let [r (partitioned-filetxns 5) + (let [partitioned-filetxns (transduce (diffs->partitioned-filetxns 10) (completing (fn [r i] (conj r (reverse i)))) ;reverse '() (reverse diff-txns))] (prn "partition-filetxns" partitioned-filetxns) ;; TODO: precheck etag - (let [apply-result - (local-all-files! [_] nil)) @@ -716,7 +706,7 @@ (reduce #(when (re-find %2 path) (reduced true)) false regexps)) (deftype Local->RemoteSyncer [graph-uuid base-path repo ^SyncState sync-state - ^:mutable rate *txid ^:mutable remote->local-syncer stop-chan] + ^:mutable rate *txid ^:mutable remote->local-syncer stop-chan ^:mutable stopped] Object (filter-file-change-events-fn [this] (fn [^FileChangeEvent e] (and (instance? FileChangeEvent e) @@ -735,7 +725,9 @@ (get-ignore-files [_] #{#"logseq/graphs-txid.edn$" #"logseq/bak/.*" #"version-files/.*" #"logseq/\.recycle/.*" #"\.DS_Store$"}) (get-monitored-dirs [_] #{#"^assets/" #"^journals/" #"^logseq/" #"^pages/"}) - (stop-local->remote! [_] (async/close! stop-chan)) + (stop-local->remote! [_] + (async/close! stop-chan) + (set! stopped true)) (ratelimit [this from-chan] (let [c (.filtered-chan this 10000) @@ -808,7 +800,7 @@ (println "sync-local->remote unknown:" r*) {:unknown r*})))))))) - (sync-local->remote-all-files! [this stop-chan] + (sync-local->remote-all-files! [this] (go (let [remote-all-files-meta-c (get-remote-all-files-meta remoteapi graph-uuid) local-all-files-meta-c (get-local-all-files-meta rsapi graph-uuid base-path) @@ -827,14 +819,14 @@ (and (not (contains-path? ignore-files path)) (contains-path? monitored-dirs path)))) ;; partition FileChangeEvents - (partition-file-change-events 5)) + (partition-file-change-events 10)) diff-local-files)] (println "[full-sync]" (count (flatten change-events-partitions)) "files need to sync to remote") (loop [es-partitions change-events-partitions] - (if (empty? es-partitions) - {:succ true} - (if (async/poll! stop-chan) - {:stop true} + (if stopped + {:stop true} + (if (empty? es-partitions) + {:succ true} (let [{:keys [succ need-sync-remote unknown] :as r} (remote! this (first es-partitions)))] (cond @@ -892,7 +884,7 @@ ^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer full-sync-chan stop-sync-chan remote->local-sync-chan local->remote-sync-chan local-changes-chan ^:mutable ratelimit-local-changes-chan - *txid ^:mutable state ^:mutable _remote-change-chan ^:mutable _*ws] + *txid ^:mutable state ^:mutable _remote-change-chan ^:mutable _*ws ^:mutable stopped] Object (schedule [this next-state & args] (println "[SyncManager" graph-uuid "]" (and state (name state)) "->" (and next-state (name next-state))) @@ -947,7 +939,7 @@ (full-sync [this] (go (let [{:keys [succ need-sync-remote unknown stop]} - (remote-all-files! local->remote-syncer stop-sync-chan))] + (remote-all-files! local->remote-syncer))] (cond succ (.schedule this ::idle) @@ -964,16 +956,16 @@ (go (if (some-> remote-val :txid (<= @*txid)) (.schedule this ::idle) - (let [{:keys [succ unknown]} + (let [{:keys [succ unknown stop]} (local! remote->local-syncer))] (cond succ (.schedule this (or next-state ::idle)) - + stop + (.schedule this ::stop) unknown - (do - (prn "remote->local err" unknown) - (.schedule this ::idle))))))) + (do (prn "remote->local err" unknown) + (.schedule this ::idle))))))) (local->remote [this [^FileChangeEvent local-change]] (assert (some? local-change)) @@ -993,11 +985,14 @@ (.schedule this ::idle)))))) IStoppable (-stop! [_] - (ws-stop! _*ws) - (offer! stop-sync-chan true) - (stop-local->remote! local->remote-syncer) - (debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path]) - (.update-state! sync-state ::stop))) + (when-not stopped + (set! stopped true) + (ws-stop! _*ws) + (offer! stop-sync-chan true) + (stop-local->remote! local->remote-syncer) + (stop-remote->local! remote->local-syncer) + (debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path]) + (.update-state! sync-state ::stop)))) (defn sync-manager [graph-uuid base-path repo txid sync-state full-sync-chan stop-sync-chan @@ -1007,15 +1002,15 @@ base-path repo sync-state 20000 - *txid nil (chan)) + *txid nil (chan) false) remote->local-syncer (->Remote->LocalSyncer graph-uuid base-path - repo *txid sync-state nil)] + repo *txid sync-state nil (volatile! false))] (.set-remote->local-syncer! local->remote-syncer remote->local-syncer) (.set-local->remote-syncer! remote->local-syncer local->remote-syncer) (->SyncManager graph-uuid base-path sync-state local->remote-syncer remote->local-syncer full-sync-chan stop-sync-chan - remote->local-sync-chan local->remote-sync-chan local-changes-chan nil *txid nil nil nil))) + remote->local-sync-chan local->remote-sync-chan local-changes-chan nil *txid nil nil nil false))) (def full-sync-chan (chan 1))