From 5ce9afbac9a84dd1247c82ecd632cc98d3f2a672 Mon Sep 17 00:00:00 2001 From: rcmerci Date: Fri, 10 Jun 2022 16:23:01 +0800 Subject: [PATCH] test commit --- deps.edn | 2 +- src/main/frontend/fs/sync.cljs | 226 +++++++++++++++++++++++++-------- 2 files changed, 176 insertions(+), 52 deletions(-) diff --git a/deps.edn b/deps.edn index d882f4b9b1..715da614f0 100755 --- a/deps.edn +++ b/deps.edn @@ -35,7 +35,7 @@ :aliases {:cljs {:extra-paths ["src/dev-cljs/" "src/test/" "src/electron/"] :extra-deps {org.clojure/clojurescript {:mvn/version "1.11.54"} org.clojure/tools.namespace {:mvn/version "0.2.11"} - cider/cider-nrepl {:mvn/version "0.26.0"} + cider/cider-nrepl {:mvn/version "0.28.4"} org.clojars.knubie/cljs-run-test {:mvn/version "1.0.1"}} :main-opts ["-m" "shadow.cljs.devtools.cli"]} diff --git a/src/main/frontend/fs/sync.cljs b/src/main/frontend/fs/sync.cljs index 55e7f855d4..5b7a2c182f 100644 --- a/src/main/frontend/fs/sync.cljs +++ b/src/main/frontend/fs/sync.cljs @@ -74,20 +74,35 @@ ::stop}) (s/def ::path string?) (s/def ::time t/date?) +(s/def ::remote->local-type #{:delete :update + ;; :rename=:delete+:update + }) +(s/def ::recent-remote->local-file-item (s/keys :req-un [::remote->local-type ::checksum ::path])) (s/def ::current-local->remote-files (s/coll-of ::path :kind set?)) (s/def ::current-remote->local-files (s/coll-of ::path :kind set?)) +(s/def ::recent-remote->local-files (s/coll-of ::recent-remote->local-file-item :kind set?)) (s/def ::history-item (s/keys :req-un [::path ::time])) (s/def ::history (s/coll-of ::history-item :kind seq?)) (s/def ::sync-state (s/keys :req-un [::state ::current-local->remote-files ::current-remote->local-files ::queued-local->remote-files + ;; Downloading files from remote will trigger filewatcher events, + ;; causes unreasonable information in the content of ::queued-local->remote-files, + ;; use ::recent-remote->local-files to filter such events + ::recent-remote->local-files ::history])) ;; diff (s/def ::TXId pos-int?) (s/def ::TXType #{"update_files" "delete_files" "rename_file"}) -(s/def ::TXContent string?) +(s/def ::TXContent-to-path string?) +(s/def ::TXContent-from-path (s/or :some string? :none nil?)) +(s/def ::TXContent-checksum (s/or :some string? :none nil?)) +(s/def ::TXContent-item (s/tuple :spec/TXContent-to-path + :spec/TXContent-from-path + :spec/TXContent-checksum)) +(s/def ::TXContent (s/coll-of :spec/TXContent-item)) (s/def ::diff (s/keys :req-un [::TXId ::TXType ::TXContent])) (s/def ::succ-map #(= {:succ true} %)) @@ -253,15 +268,26 @@ (map js/encodeURIComponent) (string/join "/"))) +(declare FileMetadata FileTxn) + (defprotocol IRelativePath (-relative-path [this])) +(defprotocol IChecksum + (-checksum [this])) + +(extend-protocol IChecksum + FileMetadata + (-checksum [this] (.-etag this)) + FileTxn + (-checksum [this] (.-checksum this))) + (defprotocol IStoppable (-stop! [this])) (defprotocol IStopped? (-stopped? [this])) ;from-path, to-path is relative path -(deftype FileTxn [from-path to-path updated? deleted? txid] +(deftype FileTxn [from-path to-path updated? deleted? txid checksum] Object (renamed? [_] (not= from-path to-path)) @@ -286,24 +312,36 @@ (-pr-writer [coll w _opts] (write-all w "#FileTxn[\"" from-path "\" -> \"" to-path "\" (updated? " updated? ", renamed? " (.renamed? coll) ", deleted? " deleted? - ", txid " txid ")]"))) + ", txid " txid ", checksum " checksum ")]"))) (defn- diff->filetxns "convert diff(`get-diff`) to `FileTxn`" [{:keys [TXId TXType TXContent]}] (let [update? (= "update_files" TXType) delete? (= "delete_files" TXType) - update-or-del-type-xf + decoded-tx-content (map (fn [[to-path from-path checksum]] + [(js/decodeURIComponent to-path) + (js/decodeURIComponent from-path) + checksum]) TXContent) + update-xf (comp - (remove empty?) - (map #(->FileTxn % % update? delete? TXId))) - filepaths (map js/decodeURIComponent (string/split-lines TXContent))] - (case TXType - ("update_files" "delete_files") - (sequence update-or-del-type-xf filepaths) - - "rename_file" - (list (->FileTxn (first filepaths) (second filepaths) false false TXId))))) + (remove #(or (empty? (first %)) + (empty? (last %)))) + (map #(->FileTxn (first %) (first %) update? delete? TXId (last %)))) + delete-xf + (comp + (remove #(empty? (first %))) + (map #(->FileTxn (first %) (first %) update? delete? TXId nil))) + rename-xf + (comp + (remove #(or (empty? (first %)) + (empty? (second %)))) + (map #(->FileTxn (second %) (first %) false false TXId nil))) + xf (case TXType + "delete_files" delete-xf + "update_files" update-xf + "rename_file" rename-xf)] + (sequence xf decoded-tx-content))) (defn- distinct-update-filetxns-xf "transducer. @@ -370,22 +408,22 @@ remove-deleted-filetxns-xf (partition-filetxns n))) -(defn- filepath->diff - [index {:keys [relative-path user-uuid graph-uuid]}] +(defn- filepath+checksum->diff + [index {:keys [relative-path checksum user-uuid graph-uuid]}] {:post [(s/valid? ::diff %)]} {:TXId (inc index) :TXType "update_files" - :TXContent (string/join "/" [user-uuid graph-uuid relative-path])}) + :TXContent [[(string/join "/" [user-uuid graph-uuid relative-path]) nil checksum]]}) -(defn filepaths->partitioned-filetxns +(defn filepath+checksum-coll->partitioned-filetxns "transducer. - 1. filepaths -> diff + 1. filepath+checksum-coll -> diff 2. diffs->partitioned-filetxns" [n graph-uuid user-uuid] (comp (map (fn [p] - {:relative-path p :user-uuid user-uuid :graph-uuid graph-uuid})) - (map-indexed filepath->diff) + {:relative-path (first p) :user-uuid user-uuid :graph-uuid graph-uuid :checksum (second p)})) + (map-indexed filepath+checksum->diff) (diffs->partitioned-filetxns n))) @@ -882,6 +920,7 @@ (string/index-of (str (ex-cause r)) "No such file or directory")) true r)))))) + (defn- assert-local-txid<=remote-txid [] (when-let [local-txid (last @graphs-txid)] @@ -889,33 +928,91 @@ (assert (<= local-txid remote-txid) [@graphs-txid local-txid remote-txid]))))) +(defn- get-local-files-checksum + [base-path relative-paths] + (go + (into {} + (map (juxt #(.-path ^FileMetadata %) #(.-etag ^FileMetadata %))) + (remote-files sync-state--add-current-remote->local-files sync-state--remove-current-local->remote-files sync-state--remove-current-remote->local-files + sync-state--add-recent-remote->local-files + sync-state--remove-recent-remote->local-files sync-state--stopped?) +(defn- filetxns=>recent-remote->local-files + [base-path filetxns] + (go + (let [{:keys [update-filetxns delete-filetxns rename-filetxns]} + (group-by (fn [^FileTxn e] + (cond + (.-updated? e) :update-filetxns + (.-deleted? e) :delete-filetxns + (.renamed? e) :rename-filetxns)) filetxns) + files-checksum (local-type :update + :checksum (get files-checksum path) + :path path})) + update-filetxns) + rename-file-items (mapcat + (fn [^FileTxn filetxn] + (let [to-path (relative-path filetxn) + from-path (.-from-path filetxn)] + [{:remote->local-type :update + :checksum (get files-checksum to-path) + :path to-path} + {:remote->local-type :delete + :checksum nil + :path from-path}])) + rename-filetxns) + delete-file-items (map + (fn [filetxn] + (let [path (relative-path filetxn)] + {:remote->local-type :delete + :checksum nil + :path path})) + delete-filetxns)] + (set (concat update-file-items rename-file-items delete-file-items))))) (defn apply-filetxns-partitions "won't call update-graph-txid! when *txid is nil" - [*sync-state user-uuid graph-uuid base-path filetxns-partitions repo *txid *stopped before-f after-f] + [*sync-state user-uuid graph-uuid base-path filetxns-partitions repo *txid *stopped] + (assert (some? *sync-state)) (go-loop [filetxns-partitions* filetxns-partitions] (if @*stopped {:stop true} (when (seq filetxns-partitions*) (let [filetxns (first filetxns-partitions*) paths (map relative-path filetxns) - _ (when (and before-f (fn? before-f)) (before-f filetxns)) - _ (when *sync-state (swap! *sync-state sync-state--add-current-remote->local-files paths)) + _ (swap! *sync-state sync-state--add-current-remote->local-files paths) r (local-files paths))] + _ (swap! *sync-state sync-state--remove-current-remote->local-files paths)] (if (instance? ExceptionInfo r) r - (let [latest-txid (apply max (map #(.-txid ^FileTxn %) filetxns))] - (when (and after-f (fn? after-f)) (after-f filetxns)) + (let [latest-txid (apply max (map #(.-txid ^FileTxn %) filetxns)) + recent-remote->local-file-items + (recent-remote->local-files base-path filetxns))] + ;; update local-txid (when *txid (reset! *txid latest-txid) (update-graphs-txid! latest-txid graph-uuid user-uuid repo)) + ;; update recent-remote->local-files + (swap! *sync-state sync-state--add-recent-remote->local-files + recent-remote->local-file-items) + ;; remove these recent-remote->local-file-items 5s later + (go (local-files + recent-remote->local-file-items)) + (recur (next filetxns-partitions*))))))))) (defmulti need-sync-remote? (fn [v] (cond @@ -979,6 +1076,18 @@ (-pr-writer [_ w _opts] (write-all w (str {:type type :base-path dir :path path :size (:size stat)})))) +(defn- file-change-event=>recent-remote->local-file-item + [^FileChangeEvent e] + (go + (let [tp (case (.-type e) + ("add" "change") :update + "unlink" :delete) + path (relative-path e)] + {:remote->local-type tp + :checksum (if (= tp :delete) nil + ( (state/get-file-sync-state current-graph) - sync-state--stopped? - not) + (when-not (some-> (state/get-file-sync-state current-graph) + sync-state--stopped?) (go (>! local-changes-chan (->FileChangeEvent type dir path stat))))))) - ;;; ### encryption (def pwd-map "graph-uuid->{:pwd xxx :public-key xxx :private-key xxx}" @@ -1152,6 +1260,7 @@ :current-local->remote-files #{} :current-remote->local-files #{} :queued-local->remote-files #{} + :recent-remote->local-files #{} :history '()}) (defn- sync-state--update-state @@ -1185,6 +1294,19 @@ {:post [(s/valid? ::sync-state %)]} (assoc sync-state :queued-local->remote-files nil)) +(defn sync-state--add-recent-remote->local-files + [sync-state items] + {:pre [(s/valid? (s/coll-of ::recent-remote->local-file-item) items)] + :post [(s/valid? ::sync-state %)]} + (println :add-recent-remote->local-files items) + (update sync-state :recent-remote->local-files (partial apply conj) items)) + +(defn sync-state--remove-recent-remote->local-files + [sync-state items] + {:post [(s/valid? ::sync-state %)]} + (println :remove-recent-remote->local-files items) + (update sync-state :recent-remote->local-files set/difference items)) + (defn- add-history-items [history paths now] (sequence @@ -1236,17 +1358,18 @@ Object (set-local->remote-syncer! [_ s] (set! local->remote-syncer s)) (sync-files-remote->local! - [_ relative-filepaths latest-txid] + [_ relative-filepath+checksum-coll latest-txid] (go (let [partitioned-filetxns - (sequence (filepaths->partitioned-filetxns 10 graph-uuid user-uuid) - relative-filepaths) + (sequence (filepath+checksum-coll->partitioned-filetxns + 10 graph-uuid user-uuid) + relative-filepath+checksum-coll) r (if (empty? (flatten partitioned-filetxns)) {:succ true} (local)]" (count sorted-diff-remote-files) "files need to sync") (local! - this (map (comp js/encodeURIComponent relative-path) sorted-diff-remote-files) + this (map (juxt (comp js/encodeURIComponent relative-path) -checksum) sorted-diff-remote-files) latest-txid)))))) (defn- file-changed? @@ -1357,15 +1480,16 @@ ^:mutable rate *txid ^:mutable remote->local-syncer stop-chan *stopped] Object (filter-file-change-events-fn [_] - (fn [^FileChangeEvent e] (and (instance? FileChangeEvent e) - (string/starts-with? (.-dir e) base-path) - (not (contains-path? ignore-files (relative-path e))) - (contains-path? monitored-dirs (relative-path e))))) - - (filtered-chan - ;; "check base-path" - [this n] - (chan n (filter (.filter-file-change-events-fn this)))) + (fn [^FileChangeEvent e] + (go (and (instance? FileChangeEvent e) + (string/starts-with? (.-dir e) base-path) + (not (contains-path? ignore-files (relative-path e))) + (contains-path? monitored-dirs (relative-path e)) + (not + (let [r (contains? (:recent-remote->local-files @*sync-state) + (recent-remote->local-file-item e)))] + (println :filter-file-change-events-fn e) + r)))))) (set-remote->local-syncer! [_ s] (set! remote->local-syncer s)) @@ -1375,15 +1499,15 @@ (vreset! *stopped true)) (ratelimit [this from-chan] - (let [filter-e-fn (.filter-file-change-events-fn this)] + (let [fast-filter-e-fn (.filter-file-change-events-fn this)] (util/ratelimit from-chan rate :filter-fn (fn [e] - (and (filter-e-fn e) - (let [e-path [(relative-path e)]] - (swap! *sync-state sync-state--add-queued-local->remote-files e-path) - (go + (go + (and (remote-files e-path) (let [v (remote-files e-path))