test commit

This commit is contained in:
rcmerci
2022-06-10 16:23:01 +08:00
parent 9e85937116
commit 5ce9afbac9
2 changed files with 176 additions and 52 deletions

View File

@@ -35,7 +35,7 @@
:aliases {:cljs {:extra-paths ["src/dev-cljs/" "src/test/" "src/electron/"]
:extra-deps {org.clojure/clojurescript {:mvn/version "1.11.54"}
org.clojure/tools.namespace {:mvn/version "0.2.11"}
cider/cider-nrepl {:mvn/version "0.26.0"}
cider/cider-nrepl {:mvn/version "0.28.4"}
org.clojars.knubie/cljs-run-test {:mvn/version "1.0.1"}}
:main-opts ["-m" "shadow.cljs.devtools.cli"]}

View File

@@ -74,20 +74,35 @@
::stop})
(s/def ::path string?)
(s/def ::time t/date?)
(s/def ::remote->local-type #{:delete :update
;; :rename=:delete+:update
})
(s/def ::recent-remote->local-file-item (s/keys :req-un [::remote->local-type ::checksum ::path]))
(s/def ::current-local->remote-files (s/coll-of ::path :kind set?))
(s/def ::current-remote->local-files (s/coll-of ::path :kind set?))
(s/def ::recent-remote->local-files (s/coll-of ::recent-remote->local-file-item :kind set?))
(s/def ::history-item (s/keys :req-un [::path ::time]))
(s/def ::history (s/coll-of ::history-item :kind seq?))
(s/def ::sync-state (s/keys :req-un [::state
::current-local->remote-files
::current-remote->local-files
::queued-local->remote-files
;; Downloading files from remote will trigger filewatcher events,
;; causes unreasonable information in the content of ::queued-local->remote-files,
;; use ::recent-remote->local-files to filter such events
::recent-remote->local-files
::history]))
;; diff
(s/def ::TXId pos-int?)
(s/def ::TXType #{"update_files" "delete_files" "rename_file"})
(s/def ::TXContent string?)
(s/def ::TXContent-to-path string?)
(s/def ::TXContent-from-path (s/or :some string? :none nil?))
(s/def ::TXContent-checksum (s/or :some string? :none nil?))
(s/def ::TXContent-item (s/tuple :spec/TXContent-to-path
:spec/TXContent-from-path
:spec/TXContent-checksum))
(s/def ::TXContent (s/coll-of :spec/TXContent-item))
(s/def ::diff (s/keys :req-un [::TXId ::TXType ::TXContent]))
(s/def ::succ-map #(= {:succ true} %))
@@ -253,15 +268,26 @@
(map js/encodeURIComponent)
(string/join "/")))
(declare FileMetadata FileTxn)
(defprotocol IRelativePath
(-relative-path [this]))
(defprotocol IChecksum
(-checksum [this]))
(extend-protocol IChecksum
FileMetadata
(-checksum [this] (.-etag this))
FileTxn
(-checksum [this] (.-checksum this)))
(defprotocol IStoppable
(-stop! [this]))
(defprotocol IStopped?
(-stopped? [this]))
;from-path, to-path is relative path
(deftype FileTxn [from-path to-path updated? deleted? txid]
(deftype FileTxn [from-path to-path updated? deleted? txid checksum]
Object
(renamed? [_]
(not= from-path to-path))
@@ -286,24 +312,36 @@
(-pr-writer [coll w _opts]
(write-all w "#FileTxn[\"" from-path "\" -> \"" to-path
"\" (updated? " updated? ", renamed? " (.renamed? coll) ", deleted? " deleted?
", txid " txid ")]")))
", txid " txid ", checksum " checksum ")]")))
(defn- diff->filetxns
"convert diff(`get-diff`) to `FileTxn`"
[{:keys [TXId TXType TXContent]}]
(let [update? (= "update_files" TXType)
delete? (= "delete_files" TXType)
update-or-del-type-xf
decoded-tx-content (map (fn [[to-path from-path checksum]]
[(js/decodeURIComponent to-path)
(js/decodeURIComponent from-path)
checksum]) TXContent)
update-xf
(comp
(remove empty?)
(map #(->FileTxn % % update? delete? TXId)))
filepaths (map js/decodeURIComponent (string/split-lines TXContent))]
(case TXType
("update_files" "delete_files")
(sequence update-or-del-type-xf filepaths)
"rename_file"
(list (->FileTxn (first filepaths) (second filepaths) false false TXId)))))
(remove #(or (empty? (first %))
(empty? (last %))))
(map #(->FileTxn (first %) (first %) update? delete? TXId (last %))))
delete-xf
(comp
(remove #(empty? (first %)))
(map #(->FileTxn (first %) (first %) update? delete? TXId nil)))
rename-xf
(comp
(remove #(or (empty? (first %))
(empty? (second %))))
(map #(->FileTxn (second %) (first %) false false TXId nil)))
xf (case TXType
"delete_files" delete-xf
"update_files" update-xf
"rename_file" rename-xf)]
(sequence xf decoded-tx-content)))
(defn- distinct-update-filetxns-xf
"transducer.
@@ -370,22 +408,22 @@
remove-deleted-filetxns-xf
(partition-filetxns n)))
(defn- filepath->diff
[index {:keys [relative-path user-uuid graph-uuid]}]
(defn- filepath+checksum->diff
[index {:keys [relative-path checksum user-uuid graph-uuid]}]
{:post [(s/valid? ::diff %)]}
{:TXId (inc index)
:TXType "update_files"
:TXContent (string/join "/" [user-uuid graph-uuid relative-path])})
:TXContent [[(string/join "/" [user-uuid graph-uuid relative-path]) nil checksum]]})
(defn filepaths->partitioned-filetxns
(defn filepath+checksum-coll->partitioned-filetxns
"transducer.
1. filepaths -> diff
1. filepath+checksum-coll -> diff
2. diffs->partitioned-filetxns"
[n graph-uuid user-uuid]
(comp
(map (fn [p]
{:relative-path p :user-uuid user-uuid :graph-uuid graph-uuid}))
(map-indexed filepath->diff)
{:relative-path (first p) :user-uuid user-uuid :graph-uuid graph-uuid :checksum (second p)}))
(map-indexed filepath+checksum->diff)
(diffs->partitioned-filetxns n)))
@@ -882,6 +920,7 @@
(string/index-of (str (ex-cause r)) "No such file or directory"))
true
r))))))
(defn- assert-local-txid<=remote-txid
[]
(when-let [local-txid (last @graphs-txid)]
@@ -889,33 +928,91 @@
(assert (<= local-txid remote-txid)
[@graphs-txid local-txid remote-txid])))))
(defn- get-local-files-checksum
[base-path relative-paths]
(go
(into {}
(map (juxt #(.-path ^FileMetadata %) #(.-etag ^FileMetadata %)))
(<! (get-local-files-meta rsapi "" base-path relative-paths)))))
(declare sync-state--add-current-local->remote-files
sync-state--add-current-remote->local-files
sync-state--remove-current-local->remote-files
sync-state--remove-current-remote->local-files
sync-state--add-recent-remote->local-files
sync-state--remove-recent-remote->local-files
sync-state--stopped?)
(defn- filetxns=>recent-remote->local-files
[base-path filetxns]
(go
(let [{:keys [update-filetxns delete-filetxns rename-filetxns]}
(group-by (fn [^FileTxn e]
(cond
(.-updated? e) :update-filetxns
(.-deleted? e) :delete-filetxns
(.renamed? e) :rename-filetxns)) filetxns)
files-checksum (<! (get-local-files-checksum
base-path
(map relative-path (concat update-filetxns rename-filetxns))))
update-file-items (map
(fn [filetxn]
(let [path (relative-path filetxn)]
{:remote->local-type :update
:checksum (get files-checksum path)
:path path}))
update-filetxns)
rename-file-items (mapcat
(fn [^FileTxn filetxn]
(let [to-path (relative-path filetxn)
from-path (.-from-path filetxn)]
[{:remote->local-type :update
:checksum (get files-checksum to-path)
:path to-path}
{:remote->local-type :delete
:checksum nil
:path from-path}]))
rename-filetxns)
delete-file-items (map
(fn [filetxn]
(let [path (relative-path filetxn)]
{:remote->local-type :delete
:checksum nil
:path path}))
delete-filetxns)]
(set (concat update-file-items rename-file-items delete-file-items)))))
(defn apply-filetxns-partitions
"won't call update-graph-txid! when *txid is nil"
[*sync-state user-uuid graph-uuid base-path filetxns-partitions repo *txid *stopped before-f after-f]
[*sync-state user-uuid graph-uuid base-path filetxns-partitions repo *txid *stopped]
(assert (some? *sync-state))
(go-loop [filetxns-partitions* filetxns-partitions]
(if @*stopped
{:stop true}
(when (seq filetxns-partitions*)
(let [filetxns (first filetxns-partitions*)
paths (map relative-path filetxns)
_ (when (and before-f (fn? before-f)) (before-f filetxns))
_ (when *sync-state (swap! *sync-state sync-state--add-current-remote->local-files paths))
_ (swap! *sync-state sync-state--add-current-remote->local-files paths)
r (<! (apply-filetxns graph-uuid base-path filetxns))
_ (when *sync-state (swap! *sync-state sync-state--remove-current-remote->local-files paths))]
_ (swap! *sync-state sync-state--remove-current-remote->local-files paths)]
(if (instance? ExceptionInfo r)
r
(let [latest-txid (apply max (map #(.-txid ^FileTxn %) filetxns))]
(when (and after-f (fn? after-f)) (after-f filetxns))
(let [latest-txid (apply max (map #(.-txid ^FileTxn %) filetxns))
recent-remote->local-file-items
(<! (filetxns=>recent-remote->local-files base-path filetxns))]
;; update local-txid
(when *txid
(reset! *txid latest-txid)
(update-graphs-txid! latest-txid graph-uuid user-uuid repo))
;; update recent-remote->local-files
(swap! *sync-state sync-state--add-recent-remote->local-files
recent-remote->local-file-items)
;; remove these recent-remote->local-file-items 5s later
(go (<! (timeout 5000))
(swap! *sync-state sync-state--remove-recent-remote->local-files
recent-remote->local-file-items))
(recur (next filetxns-partitions*)))))))))
(defmulti need-sync-remote? (fn [v] (cond
@@ -979,6 +1076,18 @@
(-pr-writer [_ w _opts]
(write-all w (str {:type type :base-path dir :path path :size (:size stat)}))))
(defn- file-change-event=>recent-remote->local-file-item
[^FileChangeEvent e]
(go
(let [tp (case (.-type e)
("add" "change") :update
"unlink" :delete)
path (relative-path e)]
{:remote->local-type tp
:checksum (if (= tp :delete) nil
(<! (get-local-files-checksum (.-dir e) [path])))
:path path})))
(defn- partition-file-change-events
"return transducer.
partition `FileChangeEvent`s, at most N file-change-events in each partition.
@@ -992,18 +1101,17 @@
(map #(partition-all n %))
cat))
(def local-changes-chan (chan 100))
(def local-changes-chan (chan 1000))
(defn file-watch-handler
"file-watcher callback"
[type {:keys [dir path _content stat] :as _payload}]
(when-let [current-graph (state/get-current-repo)]
(when (string/ends-with? current-graph dir)
(when (some-> (state/get-file-sync-state current-graph)
sync-state--stopped?
not)
(when-not (some-> (state/get-file-sync-state current-graph)
sync-state--stopped?)
(go (>! local-changes-chan (->FileChangeEvent type dir path stat)))))))
;;; ### encryption
(def pwd-map
"graph-uuid->{:pwd xxx :public-key xxx :private-key xxx}"
@@ -1152,6 +1260,7 @@
:current-local->remote-files #{}
:current-remote->local-files #{}
:queued-local->remote-files #{}
:recent-remote->local-files #{}
:history '()})
(defn- sync-state--update-state
@@ -1185,6 +1294,19 @@
{:post [(s/valid? ::sync-state %)]}
(assoc sync-state :queued-local->remote-files nil))
(defn sync-state--add-recent-remote->local-files
[sync-state items]
{:pre [(s/valid? (s/coll-of ::recent-remote->local-file-item) items)]
:post [(s/valid? ::sync-state %)]}
(println :add-recent-remote->local-files items)
(update sync-state :recent-remote->local-files (partial apply conj) items))
(defn sync-state--remove-recent-remote->local-files
[sync-state items]
{:post [(s/valid? ::sync-state %)]}
(println :remove-recent-remote->local-files items)
(update sync-state :recent-remote->local-files set/difference items))
(defn- add-history-items
[history paths now]
(sequence
@@ -1236,17 +1358,18 @@
Object
(set-local->remote-syncer! [_ s] (set! local->remote-syncer s))
(sync-files-remote->local!
[_ relative-filepaths latest-txid]
[_ relative-filepath+checksum-coll latest-txid]
(go
(let [partitioned-filetxns
(sequence (filepaths->partitioned-filetxns 10 graph-uuid user-uuid)
relative-filepaths)
(sequence (filepath+checksum-coll->partitioned-filetxns
10 graph-uuid user-uuid)
relative-filepath+checksum-coll)
r
(if (empty? (flatten partitioned-filetxns))
{:succ true}
(<! (apply-filetxns-partitions
*sync-state user-uuid graph-uuid base-path partitioned-filetxns repo
nil *stopped nil nil)))]
nil *stopped)))]
(cond
(instance? ExceptionInfo r)
{:unknown r}
@@ -1286,7 +1409,7 @@
{:succ true})
(<! (apply-filetxns-partitions
*sync-state user-uuid graph-uuid base-path
partitioned-filetxns repo *txid *stopped nil nil)))))))))]
partitioned-filetxns repo *txid *stopped)))))))))]
(cond
(instance? ExceptionInfo r)
{:unknown r}
@@ -1313,7 +1436,7 @@
(println "[full-sync(remote->local)]"
(count sorted-diff-remote-files) "files need to sync")
(<! (.sync-files-remote->local!
this (map (comp js/encodeURIComponent relative-path) sorted-diff-remote-files)
this (map (juxt (comp js/encodeURIComponent relative-path) -checksum) sorted-diff-remote-files)
latest-txid))))))
(defn- file-changed?
@@ -1357,15 +1480,16 @@
^:mutable rate *txid ^:mutable remote->local-syncer stop-chan *stopped]
Object
(filter-file-change-events-fn [_]
(fn [^FileChangeEvent e] (and (instance? FileChangeEvent e)
(string/starts-with? (.-dir e) base-path)
(not (contains-path? ignore-files (relative-path e)))
(contains-path? monitored-dirs (relative-path e)))))
(filtered-chan
;; "check base-path"
[this n]
(chan n (filter (.filter-file-change-events-fn this))))
(fn [^FileChangeEvent e]
(go (and (instance? FileChangeEvent e)
(string/starts-with? (.-dir e) base-path)
(not (contains-path? ignore-files (relative-path e)))
(contains-path? monitored-dirs (relative-path e))
(not
(let [r (contains? (:recent-remote->local-files @*sync-state)
(<! (file-change-event=>recent-remote->local-file-item e)))]
(println :filter-file-change-events-fn e)
r))))))
(set-remote->local-syncer! [_ s] (set! remote->local-syncer s))
@@ -1375,15 +1499,15 @@
(vreset! *stopped true))
(ratelimit [this from-chan]
(let [filter-e-fn (.filter-file-change-events-fn this)]
(let [fast-filter-e-fn (.filter-file-change-events-fn this)]
(util/ratelimit
from-chan rate
:filter-fn
(fn [e]
(and (filter-e-fn e)
(let [e-path [(relative-path e)]]
(swap! *sync-state sync-state--add-queued-local->remote-files e-path)
(go
(go
(and (<! (fast-filter-e-fn e))
(let [e-path [(relative-path e)]]
(swap! *sync-state sync-state--add-queued-local->remote-files e-path)
(let [v (<! (filter-local-changes-pred e base-path graph-uuid))]
(when-not v
(swap! *sync-state sync-state--remove-queued-local->remote-files e-path))