fix(sync): tolerate stale gzip headers in snapshot download

This commit is contained in:
Tienson Qin
2026-04-09 03:27:51 +08:00
parent 1b54481fd4
commit 8caed635f2
2 changed files with 102 additions and 33 deletions

View File

@@ -58,19 +58,43 @@
(<decompress-gzip-bytes chunk)
chunk)))
(defn- response-body-stream
[^js resp]
(let [encoding (some-> resp .-headers (.get "content-encoding"))]
(cond
(nil? (.-body resp))
nil
(defn- <stream-starts-with-gzip?
[^js stream]
(let [reader (.getReader stream)]
(-> (.read reader)
(p/then (fn [result]
(if (.-done result)
false
(gzip-bytes? (->uint8 (.-value result))))))
(p/catch (fn [_] false))
(p/finally (fn []
(try
(.releaseLock reader)
(catch :default _)))))))
(= "gzip" encoding)
(when (exists? js/DecompressionStream)
(.pipeThrough (.-body resp) (js/DecompressionStream. "gzip")))
(defn- <response-body-stream
[^js resp]
(let [body (.-body resp)
encoding (some-> resp .-headers (.get "content-encoding"))]
(cond
(nil? body)
(p/resolved nil)
(and (= "gzip" encoding) (exists? js/DecompressionStream))
(if (fn? (.-tee body))
(let [branches (.tee body)
probe (aget branches 0)
payload (aget branches 1)]
(-> (<stream-starts-with-gzip? probe)
(p/then (fn [gzip?]
(if gzip?
(.pipeThrough payload (js/DecompressionStream. "gzip"))
payload)))
(p/catch (fn [_] payload))))
(p/resolved (.pipeThrough body (js/DecompressionStream. "gzip"))))
:else
(.-body resp))))
(p/resolved body))))
(defn- <flush-row-batches!
[rows batch-size on-batch]
@@ -84,29 +108,30 @@
(defn- <stream-snapshot-row-batches!
[^js resp batch-size on-batch]
(if-let [stream (response-body-stream resp)]
(let [reader (.getReader stream)]
(p/loop [buffer nil
pending []]
(p/let [result (.read reader)]
(if (.-done result)
(let [pending (if (and buffer (pos? (.-byteLength buffer)))
(into pending (snapshot/finalize-framed-buffer buffer))
pending)]
(if (seq pending)
(p/let [_ (on-batch pending)]
{:chunk-count 1})
{:chunk-count 0}))
(let [{rows :rows next-buffer :buffer} (snapshot/parse-framed-chunk buffer (->uint8 (.-value result)))
pending (into pending rows)]
(p/let [pending (<flush-row-batches! pending batch-size on-batch)]
(p/recur next-buffer pending)))))))
(p/let [snapshot-bytes (<snapshot-response-bytes resp)
rows (vec (snapshot/finalize-framed-buffer snapshot-bytes))]
(if (seq rows)
(p/let [_ (on-batch rows)]
{:chunk-count 1})
{:chunk-count 0}))))
(p/let [stream (<response-body-stream resp)]
(if stream
(let [reader (.getReader stream)]
(p/loop [buffer nil
pending []]
(p/let [result (.read reader)]
(if (.-done result)
(let [pending (if (and buffer (pos? (.-byteLength buffer)))
(into pending (snapshot/finalize-framed-buffer buffer))
pending)]
(if (seq pending)
(p/let [_ (on-batch pending)]
{:chunk-count 1})
{:chunk-count 0}))
(let [{rows :rows next-buffer :buffer} (snapshot/parse-framed-chunk buffer (->uint8 (.-value result)))
pending (into pending rows)]
(p/let [pending (<flush-row-batches! pending batch-size on-batch)]
(p/recur next-buffer pending)))))))
(p/let [snapshot-bytes (<snapshot-response-bytes resp)
rows (vec (snapshot/finalize-framed-buffer snapshot-bytes))]
(if (seq rows)
(p/let [_ (on-batch rows)]
{:chunk-count 1})
{:chunk-count 0})))))
(defn- with-auth-headers
[opts]

View File

@@ -0,0 +1,44 @@
(ns frontend.worker.sync.download-test
(:require [cljs.test :refer [async deftest is]]
[frontend.worker.sync.download :as sync-download]
[logseq.db-sync.snapshot :as snapshot]
[promesa.core :as p]))
(defn- frame-bytes
[^js data]
(let [len (.-byteLength data)
out (js/Uint8Array. (+ 4 len))
view (js/DataView. (.-buffer out))]
(.setUint32 view 0 len false)
(.set out data 4)
out))
(defn- stream-from-payload
[^js payload]
(js/ReadableStream.
#js {:start (fn [controller]
(.enqueue controller payload)
(.close controller))}))
(deftest stream-snapshot-row-batches-ignores-stale-gzip-header-test
(async done
(let [rows [[1 "row-1" nil]
[2 "row-2" nil]]
payload (frame-bytes (snapshot/encode-rows rows))
resp (js/Response.
(stream-from-payload payload)
#js {:status 200
:headers #js {"content-encoding" "gzip"}})
batches* (atom [])]
(-> (#'sync-download/<stream-snapshot-row-batches!
resp
1000
(fn [batch]
(swap! batches* conj batch)
(p/resolved true)))
(p/then (fn [_]
(is (= [rows] @batches*))
(done)))
(p/catch (fn [error]
(is false (str error))
(done)))))))