mirror of
https://github.com/logseq/logseq.git
synced 2026-05-25 05:04:24 +00:00
fix(sync): cap snapshot upload payload to 1MB
This commit is contained in:
@@ -21,7 +21,7 @@
|
||||
(not (ldb/journal? entity))
|
||||
(not (:logseq.property/built-in? entity))
|
||||
(not (= :logseq.property/query (:db/ident (:logseq.property/created-from-property entity)))))))
|
||||
(d/datom e a (str "debug " e) t)
|
||||
(d/datom e a (str "debug " e " " (apply str (repeat (count v) "x"))) t)
|
||||
|
||||
:else
|
||||
(d/datom e a v t))))))
|
||||
|
||||
@@ -19,6 +19,8 @@
|
||||
|
||||
(def upload-kvs-batch-size 500)
|
||||
(def upload-prepare-datoms-batch-size 100000)
|
||||
(def snapshot-upload-max-bytes 1000000)
|
||||
(def snapshot-frame-header-bytes 4)
|
||||
(def snapshot-content-type "application/transit+json")
|
||||
(def snapshot-content-encoding "gzip")
|
||||
(def snapshot-text-encoder (js/TextEncoder.))
|
||||
@@ -63,6 +65,44 @@
|
||||
[rows]
|
||||
(.encode snapshot-text-encoder (sqlite-util/write-transit-str rows)))
|
||||
|
||||
(defn- snapshot-rows-byte-length
|
||||
[rows]
|
||||
(+ snapshot-frame-header-bytes
|
||||
(.-byteLength ^js (encode-snapshot-rows rows))))
|
||||
|
||||
(defn- max-prefix-rows-within-bytes
|
||||
[rows max-bytes]
|
||||
(let [rows-count (count rows)]
|
||||
(loop [low 1
|
||||
high rows-count
|
||||
best 0]
|
||||
(if (> low high)
|
||||
best
|
||||
(let [mid (quot (+ low high) 2)
|
||||
rows' (subvec rows 0 mid)
|
||||
size (snapshot-rows-byte-length rows')]
|
||||
(if (<= size max-bytes)
|
||||
(recur (inc mid) high mid)
|
||||
(recur low (dec mid) best)))))))
|
||||
|
||||
(defn- split-snapshot-rows-by-max-bytes
|
||||
[rows max-bytes]
|
||||
(loop [remaining rows
|
||||
batches []]
|
||||
(if (empty? remaining)
|
||||
batches
|
||||
(let [prefix-count (max-prefix-rows-within-bytes remaining max-bytes)]
|
||||
(if (pos? prefix-count)
|
||||
(let [batch (subvec remaining 0 prefix-count)
|
||||
remaining' (subvec remaining prefix-count)]
|
||||
(recur remaining' (conj batches batch)))
|
||||
(let [row (first remaining)
|
||||
row-size (snapshot-rows-byte-length [row])]
|
||||
(fail-fast :db-sync/snapshot-row-too-large
|
||||
{:max-bytes max-bytes
|
||||
:row-size row-size
|
||||
:addr (first row)})))))))
|
||||
|
||||
(defn frame-bytes
|
||||
[^js data]
|
||||
(let [len (.-byteLength data)
|
||||
@@ -98,6 +138,30 @@
|
||||
{:body buf :encoding snapshot-content-encoding})
|
||||
(p/resolved {:body frame :encoding nil}))))
|
||||
|
||||
(defn- snapshot-upload-url
|
||||
[base graph-id reset? finished? checksum]
|
||||
(str base "/sync/" graph-id "/snapshot/upload?reset="
|
||||
(if reset? "true" "false")
|
||||
"&finished="
|
||||
(if finished? "true" "false")
|
||||
(when finished?
|
||||
(str "&checksum=" (js/encodeURIComponent checksum)))))
|
||||
|
||||
(defn- <upload-snapshot-rows-batches!
|
||||
[rows-batches {:keys [base graph-id first-batch? finished? checksum auth-fetch-f]}]
|
||||
(p/loop [remaining rows-batches
|
||||
first-request? first-batch?]
|
||||
(if-let [rows-batch (first remaining)]
|
||||
(let [last-request? (nil? (next remaining))
|
||||
finished-request? (and finished? last-request?)
|
||||
upload-url (snapshot-upload-url base graph-id first-request? finished-request? checksum)]
|
||||
(p/let [{:keys [body encoding]} (<snapshot-upload-body rows-batch)
|
||||
headers (cond-> {"content-type" snapshot-content-type}
|
||||
(string? encoding) (assoc "content-encoding" encoding))
|
||||
_ (auth-fetch-f upload-url headers body)]
|
||||
(p/recur (next remaining) false)))
|
||||
nil)))
|
||||
|
||||
(defn set-graph-sync-metadata!
|
||||
[repo graph-e2ee?]
|
||||
(when-let [conn (worker-state/get-datascript-conn repo)]
|
||||
@@ -186,25 +250,38 @@
|
||||
rows* (normalize-snapshot-rows rows)
|
||||
loaded' (+ loaded (count rows*))
|
||||
finished? (= loaded' total-rows)
|
||||
upload-url (str base "/sync/" graph-id "/snapshot/upload?reset="
|
||||
(if first-batch? "true" "false")
|
||||
"&finished="
|
||||
(if finished? "true" "false")
|
||||
(when finished?
|
||||
(str "&checksum=" (js/encodeURIComponent snapshot-checksum))))]
|
||||
(p/let [{:keys [body encoding]} (<snapshot-upload-body rows*)
|
||||
headers (cond-> {"content-type" snapshot-content-type}
|
||||
(string? encoding) (assoc "content-encoding" encoding))
|
||||
_ (sync-transport/fetch-json
|
||||
(fn [opts]
|
||||
(sync-auth/with-auth-headers
|
||||
#(sync-auth/auth-headers (worker-state/get-id-token))
|
||||
opts))
|
||||
upload-url
|
||||
{:method "POST"
|
||||
:headers headers
|
||||
:body body}
|
||||
{:response-schema :sync/snapshot-upload})]
|
||||
row-batches (split-snapshot-rows-by-max-bytes rows* snapshot-upload-max-bytes)
|
||||
batch-payloads
|
||||
(mapv (fn [rows-batch]
|
||||
{:rows (count rows-batch)
|
||||
:payload-bytes (snapshot-rows-byte-length rows-batch)})
|
||||
row-batches)]
|
||||
(prn :db-sync/upload-kvs-batch
|
||||
{:total-kvs-rows total-rows
|
||||
:fetched-kvs-rows (count rows*)
|
||||
:upload-kvs-batch-size upload-kvs-batch-size
|
||||
:split-batch-count (count row-batches)
|
||||
:split-batches batch-payloads
|
||||
:max-request-bytes snapshot-upload-max-bytes})
|
||||
(p/let [_ (<upload-snapshot-rows-batches!
|
||||
row-batches
|
||||
{:base base
|
||||
:graph-id graph-id
|
||||
:first-batch? first-batch?
|
||||
:finished? finished?
|
||||
:checksum snapshot-checksum
|
||||
:auth-fetch-f
|
||||
(fn [upload-url headers body]
|
||||
(sync-transport/fetch-json
|
||||
(fn [opts]
|
||||
(sync-auth/with-auth-headers
|
||||
#(sync-auth/auth-headers (worker-state/get-id-token))
|
||||
opts))
|
||||
upload-url
|
||||
{:method "POST"
|
||||
:headers headers
|
||||
:body body}
|
||||
{:response-schema :sync/snapshot-upload}))})]
|
||||
(update-progress {:sub-type :upload-progress
|
||||
:message (str "Uploading " loaded' "/" total-rows)})
|
||||
(p/recur max-addr false loaded'))))))
|
||||
|
||||
74
src/test/frontend/worker/sync/upload_test.cljs
Normal file
74
src/test/frontend/worker/sync/upload_test.cljs
Normal file
@@ -0,0 +1,74 @@
|
||||
(ns frontend.worker.sync.upload-test
|
||||
(:require [cljs.test :refer [async deftest is]]
|
||||
[frontend.worker.sync.upload :as sync-upload]
|
||||
[promesa.core :as p]
|
||||
[clojure.string :as string]))
|
||||
|
||||
(deftest split-snapshot-rows-by-max-bytes-splits-rows-into-byte-capped-batches-test
|
||||
(let [sizes {:a 4
|
||||
:b 4
|
||||
:c 4
|
||||
:d 4}
|
||||
rows [[:a] [:b] [:c] [:d]]]
|
||||
(with-redefs [sync-upload/snapshot-rows-byte-length
|
||||
(fn [rows']
|
||||
(reduce + (map (fn [[addr]] (get sizes addr 0)) rows')))]
|
||||
(is (= [[[:a] [:b]]
|
||||
[[:c] [:d]]]
|
||||
(#'sync-upload/split-snapshot-rows-by-max-bytes rows 8))))))
|
||||
|
||||
(deftest split-snapshot-rows-by-max-bytes-fails-fast-for-oversized-single-row-test
|
||||
(let [sizes {:ok 3
|
||||
:too-big 11}
|
||||
rows [[:ok] [:too-big]]]
|
||||
(with-redefs [sync-upload/snapshot-rows-byte-length
|
||||
(fn [rows']
|
||||
(reduce + (map (fn [[addr]] (get sizes addr 0)) rows')))]
|
||||
(try
|
||||
(#'sync-upload/split-snapshot-rows-by-max-bytes rows 10)
|
||||
(is false "expected snapshot row too large error")
|
||||
(catch :default error
|
||||
(let [data (ex-data error)]
|
||||
(is (= "snapshot-row-too-large" (ex-message error)))
|
||||
(is (= 10 (:max-bytes data)))
|
||||
(is (= 11 (:row-size data)))
|
||||
(is (= :too-big (:addr data)))))))))
|
||||
|
||||
(deftest upload-snapshot-rows-batches-sets-reset-and-finished-flags-correctly-test
|
||||
(async done
|
||||
(let [calls* (atom [])
|
||||
rows-batches [[[1 "a" nil]]
|
||||
[[2 "b" nil]]
|
||||
[[3 "c" nil]]]]
|
||||
(-> (p/with-redefs [sync-upload/<snapshot-upload-body
|
||||
(fn [rows]
|
||||
(p/resolved {:body rows
|
||||
:encoding nil}))]
|
||||
(#'sync-upload/<upload-snapshot-rows-batches!
|
||||
rows-batches
|
||||
{:base "https://sync.example.test"
|
||||
:graph-id "graph-1"
|
||||
:first-batch? true
|
||||
:finished? true
|
||||
:checksum "abc+123="
|
||||
:auth-fetch-f
|
||||
(fn [url headers body]
|
||||
(swap! calls* conj {:url url
|
||||
:headers headers
|
||||
:body body})
|
||||
(p/resolved true))}))
|
||||
(p/then
|
||||
(fn [_]
|
||||
(is (= 3 (count @calls*)))
|
||||
(is (string/includes? (:url (nth @calls* 0)) "reset=true"))
|
||||
(is (string/includes? (:url (nth @calls* 0)) "finished=false"))
|
||||
(is (string/includes? (:url (nth @calls* 1)) "reset=false"))
|
||||
(is (string/includes? (:url (nth @calls* 1)) "finished=false"))
|
||||
(is (string/includes? (:url (nth @calls* 2)) "reset=false"))
|
||||
(is (string/includes? (:url (nth @calls* 2)) "finished=true"))
|
||||
(is (string/includes? (:url (nth @calls* 2)) "checksum=abc%2B123%3D"))
|
||||
(done)))
|
||||
(p/catch
|
||||
(fn [error]
|
||||
(is false (str error))
|
||||
(done)))))))
|
||||
Reference in New Issue
Block a user