diff --git a/deps/db-sync/src/logseq/db_sync/common.cljs b/deps/db-sync/src/logseq/db_sync/common.cljs index e26349dd24..e86bc72e05 100644 --- a/deps/db-sync/src/logseq/db_sync/common.cljs +++ b/deps/db-sync/src/logseq/db_sync/common.cljs @@ -10,7 +10,7 @@ #js {"Access-Control-Allow-Origin" "*" "Access-Control-Allow-Headers" "content-type,content-encoding,authorization,x-amz-meta-checksum,x-amz-meta-type" "Access-Control-Allow-Methods" "GET,POST,PUT,DELETE,OPTIONS,HEAD" - "Access-Control-Expose-Headers" "content-type,content-encoding,cache-control,x-asset-type"}) + "Access-Control-Expose-Headers" "content-type,content-encoding,content-length,cache-control,x-asset-type"}) (defn json-response ([data] (json-response data 200)) diff --git a/deps/db-sync/src/logseq/db_sync/worker/handler/assets.cljs b/deps/db-sync/src/logseq/db_sync/worker/handler/assets.cljs index 01eacc35b8..47d280e16c 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/handler/assets.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/handler/assets.cljs @@ -6,6 +6,26 @@ (def ^:private max-asset-size (* 100 1024 1024)) +(defn- parse-size + [size] + (cond + (number? size) size + (string? size) (let [n (js/parseInt size 10)] + (when-not (js/isNaN n) + n)) + :else nil)) + +(defn- maybe-fixed-length-body + [body size] + (if (and (number? size) + (exists? js/FixedLengthStream) + (some? body) + (fn? (.-pipeTo body))) + (let [^js fixed (js/FixedLengthStream. size)] + (.catch (.pipeTo body (.-writable fixed)) (fn [_] nil)) + (.-readable fixed)) + body)) + (defn parse-asset-path [path] (let [prefix "/assets/"] (when (string/starts-with? path prefix) @@ -46,8 +66,18 @@ "application/octet-stream") content-encoding (.-contentEncoding metadata) cache-control (.-cacheControl metadata) + size (parse-size (or (.-size obj) + (some-> (.-body obj) .-byteLength))) + content-length (cond + (number? size) (str size) + (string? size) size + :else nil) + body (maybe-fixed-length-body (.-body obj) size) headers (cond-> {"content-type" content-type "x-asset-type" asset-type} + (and (string? content-length) + (pos? (.-length content-length))) + (assoc "content-length" content-length) (and (string? content-encoding) (not= content-encoding "null") (pos? (.-length content-encoding))) @@ -57,7 +87,7 @@ (assoc "cache-control" cache-control) true (bean/->js))] - (js/Response. (.-body obj) + (js/Response. body #js {:status 200 :headers (js/Object.assign headers diff --git a/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs b/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs index 5688287072..1f4828e5bd 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs @@ -86,6 +86,17 @@ (.pipeThrough stream (js/DecompressionStream. "gzip")) stream)) +(defn- maybe-compress-stream [stream] + (if (exists? js/CompressionStream) + (.pipeThrough stream (js/CompressionStream. snapshot-content-encoding)) + stream)) + +(defn- uint8 [data] (cond (instance? js/Uint8Array data) data @@ -303,23 +314,31 @@ :else (p/let [snapshot-id (str (random-uuid)) key (snapshot-key graph-id snapshot-id) + use-compression? (exists? js/CompressionStream) + content-encoding (when use-compression? snapshot-content-encoding) stream (snapshot-export-stream self) + stream (if use-compression? + (maybe-compress-stream stream) + stream) multipart? (and (some? (.-createMultipartUpload bucket)) (fn? (.-createMultipartUpload bucket))) opts #js {:httpMetadata #js {:contentType snapshot-content-type - :contentEncoding nil + :contentEncoding content-encoding :cacheControl snapshot-cache-control} :customMetadata #js {:purpose "snapshot" :created-at (str (common/now-ms))}} _ (if multipart? (upload-multipart! bucket key stream opts) - (p/let [body (snapshot-export-fixed-length self)] - (.put bucket key body opts))) + (if use-compression? + (p/let [body ( (p/let [resp (assets/handle request env)] + (is (= 200 (.-status resp))) + (is (= "4" (.get (.-headers resp) "content-length")))) + (p/then (fn [] + (done))) + (p/catch (fn [error] + (is false (str error)) + (done))))))) diff --git a/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs b/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs new file mode 100644 index 0000000000..a6749da200 --- /dev/null +++ b/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs @@ -0,0 +1,78 @@ +(ns logseq.db-sync.worker-handler-sync-test + (:require [cljs.test :refer [async deftest is]] + [logseq.db-sync.worker.handler.sync :as sync-handler] + [promesa.core :as p])) + +(defn- empty-sql [] + #js {:exec (fn [& _] #js [])}) + +(defn- request-url [] + (let [request (js/Request. "http://localhost/sync/graph-1/snapshot/download?graph-id=graph-1" + #js {:method "GET"})] + {:request request + :url (js/URL. (.-url request))})) + +(defn- passthrough-compression-stream-constructor [] + (js* "function(_format){ return new TransformStream(); }")) + +(deftest snapshot-download-uses-gzip-encoding-when-compression-supported-test + (async done + (let [put-call (atom nil) + bucket #js {:put (fn [key body opts] + (reset! put-call {:key key :body body :opts opts}) + (js/Promise.resolve #js {:ok true}))} + self #js {:env #js {:LOGSEQ_SYNC_ASSETS bucket} + :sql (empty-sql)} + {:keys [request url]} (request-url) + original-compression-stream (.-CompressionStream js/globalThis) + restore! #(aset js/globalThis "CompressionStream" original-compression-stream)] + (aset js/globalThis + "CompressionStream" + (passthrough-compression-stream-constructor)) + (-> (p/let [resp (sync-handler/handle {:self self + :request request + :url url + :route {:handler :sync/snapshot-download}}) + text (.text resp) + body (js->clj (js/JSON.parse text) :keywordize-keys true) + http-metadata (aget (:opts @put-call) "httpMetadata")] + (is (= 200 (.-status resp))) + (is (= "gzip" (:content-encoding body))) + (is (= "gzip" (aget http-metadata "contentEncoding")))) + (p/then (fn [] + (restore!) + (done))) + (p/catch (fn [error] + (restore!) + (is false (str error)) + (done))))))) + +(deftest snapshot-download-falls-back-to-uncompressed-when-compression-unsupported-test + (async done + (let [put-call (atom nil) + bucket #js {:put (fn [key body opts] + (reset! put-call {:key key :body body :opts opts}) + (js/Promise.resolve #js {:ok true}))} + self #js {:env #js {:LOGSEQ_SYNC_ASSETS bucket} + :sql (empty-sql)} + {:keys [request url]} (request-url) + original-compression-stream (.-CompressionStream js/globalThis) + restore! #(aset js/globalThis "CompressionStream" original-compression-stream)] + (aset js/globalThis "CompressionStream" js/undefined) + (-> (p/let [resp (sync-handler/handle {:self self + :request request + :url url + :route {:handler :sync/snapshot-download}}) + text (.text resp) + body (js->clj (js/JSON.parse text) :keywordize-keys true) + http-metadata (aget (:opts @put-call) "httpMetadata")] + (is (= 200 (.-status resp))) + (is (nil? (:content-encoding body))) + (is (nil? (aget http-metadata "contentEncoding")))) + (p/then (fn [] + (restore!) + (done))) + (p/catch (fn [error] + (restore!) + (is false (str error)) + (done))))))) diff --git a/src/main/frontend/handler/db_based/sync.cljs b/src/main/frontend/handler/db_based/sync.cljs index a6a390466d..933466e72a 100644 --- a/src/main/frontend/handler/db_based/sync.cljs +++ b/src/main/frontend/handler/db_based/sync.cljs @@ -86,6 +86,39 @@ rows (throw (ex-info "incomplete framed buffer" {:buffer buffer :rows rows})))))) +(defn- gzip-bytes? + [^js bytes] + (and (some? bytes) + (>= (.-byteLength bytes) 2) + (= 31 (aget bytes 0)) + (= 139 (aget bytes 1)))) + +(defn- bytes->stream + [^js bytes] + (js/ReadableStream. + #js {:start (fn [controller] + (.enqueue controller bytes) + (.close controller))})) + +(defn- stream bytes) + decompressed (.pipeThrough stream (js/DecompressionStream. "gzip")) + resp (js/Response. decompressed) + buf (.arrayBuffer resp)] + (->uint8 buf)) + (p/rejected (ex-info "gzip decompression not supported" + {:type :db-sync/decompression-not-supported})))) + +(defn- uint8 buf)] + (if (gzip-bytes? bytes) + ( (p/let [gzip-bytes ( (p/with-redefs [db-sync/http-base (fn [] "http://base") + db-sync/fetch-json (fn [url _opts _schema] + (cond + (string/ends-with? url "/pull") + (p/resolved {:t 42}) + + (string/ends-with? url "/snapshot/download") + (p/resolved {:url "http://snapshot"}) + + :else + (p/rejected (ex-info "unexpected fetch-json URL" + {:url url})))) + user-handler/task--ensure-id&access-token (fn [resolve _reject] + (resolve true)) + state/