refactor: graph download switch to use r2 snapshot

This commit is contained in:
Tienson Qin
2026-03-13 13:59:03 +08:00
parent 546499ef04
commit eeabd54d87
15 changed files with 579 additions and 694 deletions

View File

@@ -8,7 +8,7 @@
#js {"Access-Control-Allow-Origin" "*"
"Access-Control-Allow-Headers" "content-type,content-encoding,authorization,x-amz-meta-checksum,x-amz-meta-type"
"Access-Control-Allow-Methods" "GET,POST,PUT,DELETE,OPTIONS,HEAD"
"Access-Control-Expose-Headers" "content-type,content-encoding,content-length,cache-control,x-asset-type,x-snapshot-row-count"})
"Access-Control-Expose-Headers" "content-type,content-encoding,content-length,cache-control,x-asset-type,x-asset-size,x-snapshot-datom-count"})
(defn json-response
([data] (json-response data 200))

View File

@@ -4,6 +4,7 @@
(def ^:private transit-w (transit/writer :json))
(def ^:private transit-r (transit/reader :json))
(def ^:private text-decoder (js/TextDecoder.))
(def ^:private newline-byte 10)
(defn- ->uint8
[data]
@@ -77,3 +78,55 @@
(+ total 4 (.-byteLength payload))))
0
rows-batches))
(defn- decode-transit
  "Reads one transit-JSON value from `payload`.
  The payload is coerced with `->uint8`, decoded to text with the shared
  `text-decoder`, and then parsed with the shared transit reader."
  [payload]
  (->> payload
       ->uint8
       (.decode text-decoder)
       (transit/read transit-r)))
(defn encode-datoms-jsonl
  "Serializes `datoms` as newline-delimited transit JSON.
  Each datom is written with the shared transit writer and terminated by
  \"\\n\"; the concatenated text is passed through `->uint8`."
  [datoms]
  (let [datom->line (fn [datom]
                      (str (transit/write transit-w datom) "\n"))]
    (->> datoms
         (map datom->line)
         (apply str)
         ->uint8)))
(defn- find-newline-offset
  "Returns the index of the first `newline-byte` in `data` at or after `start`
  and strictly before `total`, or nil when no such byte exists.

  `data` must be an indexable byte container exposing `indexOf` (a typed
  array or a plain JS array); `total` is the exclusive upper bound of the
  search, normally the byte length of `data`."
  [^js data start total]
  (when (< start total)
    ;; Delegate the scan to the native indexOf instead of stepping byte by
    ;; byte; clamp `start` so a negative value cannot be interpreted as an
    ;; offset from the end.
    (let [idx (.indexOf data newline-byte (max start 0))]
      (when (and (>= idx 0) (< idx total))
        idx))))
(defn parse-datoms-jsonl-chunk
  "Appends `chunk` to the carried-over `buffer` and decodes every complete
  (newline-terminated) line as a transit value.

  Returns a map with:
    :datoms - vector of decoded lines, in order; empty lines are skipped
    :buffer - the bytes after the last newline, or nil when nothing is left"
  [buffer chunk]
  (let [joined (concat-bytes buffer chunk)
        end    (.-byteLength joined)]
    (loop [start 0
           acc   []]
      (let [nl (find-newline-offset joined start end)]
        (if-not (number? nl)
          ;; No further newline: whatever remains is an incomplete line.
          {:datoms acc
           :buffer (when (< start end)
                     (.slice joined start end))}
          (let [line-bytes (.slice joined start nl)
                blank?     (zero? (.-byteLength line-bytes))]
            (recur (inc nl)
                   (if blank?
                     acc
                     (conj acc (decode-transit line-bytes))))))))))
(defn finalize-datoms-jsonl-buffer
  "Decodes everything left in `buffer` at end of stream.
  Complete lines are parsed with `parse-datoms-jsonl-chunk`; a trailing line
  without a terminating newline is decoded as one final value. Returns a
  vector, which is empty for a nil or zero-length buffer."
  [buffer]
  (let [has-bytes? (fn [b] (and b (pos? (.-byteLength b))))]
    (if-not (and (some? buffer) (pos? (.-byteLength buffer)))
      []
      (let [{parsed :datoms tail :buffer} (parse-datoms-jsonl-chunk nil buffer)]
        (if (has-bytes? tail)
          (conj parsed (decode-transit tail))
          parsed)))))

View File

@@ -2,7 +2,8 @@
(:require [cljs-bean.core :as bean]
[clojure.string :as string]
[logseq.db-sync.common :as common :refer [cors-headers]]
[logseq.db-sync.worker.http :as http]))
[logseq.db-sync.worker.http :as http]
[promesa.core :as p]))
(def ^:private max-asset-size (* 100 1024 1024))
@@ -26,6 +27,69 @@
(.-readable fixed))
body))
(defn- <body-with-known-length
  "Returns a promise of a response body for which the runtime can emit a
  content-length header.

  - nil body                          -> resolves to nil
  - numeric size + FixedLengthStream  -> body wrapped by `maybe-fixed-length-body`
  - numeric size + readable stream    -> body fully buffered into an ArrayBuffer
  - anything else                     -> body unchanged"
  [body size]
  (if (nil? body)
    (p/resolved nil)
    (let [sized?    (number? size)
          pipeable? (fn? (.-pipeTo body))
          readable? (fn? (.-getReader body))]
      (cond
        (and sized? (exists? js/FixedLengthStream) pipeable?)
        (p/resolved (maybe-fixed-length-body body size))

        ;; Some runtimes drop content-length for streamed bodies without a fixed-length wrapper.
        ;; Buffer as a fallback so clients still receive the header.
        (and sized? readable?)
        (p/let [buf (.arrayBuffer (js/Response. body))]
          buf)

        :else
        (p/resolved body)))))
(defn- handle-get-asset
  "Fetches `key` from `bucket` and returns a promise of the HTTP response.

  Resolves to a 404 error response when the object does not exist. Otherwise
  responds 200 with the object body and these headers (plus CORS headers):
    content-type     - from the object's HTTP metadata, defaulting to
                       application/octet-stream
    x-asset-type     - `asset-type` as given
    content-length / x-asset-size - the object size, when known
    content-encoding - when set, non-empty and not the string \"null\"
    cache-control    - when set and non-empty

  Missing HTTP metadata on the object is tolerated and treated as if no
  metadata field were set."
  [^js bucket key asset-type]
  (.then (.get bucket key)
         (fn [^js obj]
           (if (nil? obj)
             (http/error-response "not found" 404)
             (let [metadata (.-httpMetadata obj)
                   ;; some-> guards against objects that carry no httpMetadata.
                   content-type (or (some-> metadata .-contentType)
                                    "application/octet-stream")
                   content-encoding (some-> metadata .-contentEncoding)
                   cache-control (some-> metadata .-cacheControl)
                   size (parse-size (or (.-size obj)
                                        (some-> (.-body obj) .-byteLength)))
                   content-length (cond
                                    (number? size) (str size)
                                    (string? size) size
                                    :else nil)
                   non-empty-string? (fn [s]
                                       (and (string? s) (pos? (.-length s))))]
               (p/let [body (<body-with-known-length (.-body obj) size)
                       headers (cond-> {"content-type" content-type
                                        "x-asset-type" asset-type}
                                 ;; x-asset-size mirrors content-length under a
                                 ;; separate header name.
                                 (non-empty-string? content-length)
                                 (assoc "content-length" content-length
                                        "x-asset-size" content-length)

                                 (and (non-empty-string? content-encoding)
                                      (not= content-encoding "null"))
                                 (assoc "content-encoding" content-encoding)

                                 (non-empty-string? cache-control)
                                 (assoc "cache-control" cache-control)

                                 true
                                 (bean/->js))]
                 (js/Response. body
                               #js {:status 200
                                    :headers (js/Object.assign headers (cors-headers))})))))))
(defn parse-asset-path [path]
(let [prefix "/assets/"]
(when (string/starts-with? path prefix)
@@ -57,41 +121,7 @@
(http/error-response "missing assets bucket" 500)
(case method
"GET"
(.then (.get bucket key)
(fn [^js obj]
(if (nil? obj)
(http/error-response "not found" 404)
(let [metadata (.-httpMetadata obj)
content-type (or (.-contentType metadata)
"application/octet-stream")
content-encoding (.-contentEncoding metadata)
cache-control (.-cacheControl metadata)
size (parse-size (or (.-size obj)
(some-> (.-body obj) .-byteLength)))
content-length (cond
(number? size) (str size)
(string? size) size
:else nil)
body (maybe-fixed-length-body (.-body obj) size)
headers (cond-> {"content-type" content-type
"x-asset-type" asset-type}
(and (string? content-length)
(pos? (.-length content-length)))
(assoc "content-length" content-length)
(and (string? content-encoding)
(not= content-encoding "null")
(pos? (.-length content-encoding)))
(assoc "content-encoding" content-encoding)
(and (string? cache-control)
(pos? (.-length cache-control)))
(assoc "cache-control" cache-control)
true
(bean/->js))]
(js/Response. body
#js {:status 200
:headers (js/Object.assign
headers
(cors-headers))})))))
(handle-get-asset bucket key asset-type)
"PUT"
(.then (.arrayBuffer request)

View File

@@ -16,7 +16,7 @@
(def ^:private snapshot-download-batch-size 10000)
(def ^:private snapshot-cache-control "private, max-age=300")
(def ^:private snapshot-content-type "application/transit+json")
(def ^:private snapshot-content-type "application/x-ndjson")
(def ^:private snapshot-content-encoding "gzip")
(def ^:private snapshot-uploading-meta-key :snapshot-uploading?)
;; 10m
@@ -57,35 +57,6 @@
(ensure-schema! self)
(not= "true" (storage/get-meta (.-sql self) snapshot-uploading-meta-key)))
(defn- fetch-kvs-rows
  "Returns up to `limit` kvs rows (addr, content, addresses) whose addr is
  greater than `after`, ordered by addr ascending."
  [sql after limit]
  (-> sql
      (common/sql-exec
       "select addr, content, addresses from kvs where addr > ? order by addr asc limit ?"
       after
       limit)
      common/get-sql-rows))
(defn- snapshot-row-count
  "Returns the number of rows in the kvs table.
  The count row may come back either as a JS array or as an object keyed by
  column name (\"total\" or \"count(*)\"); 0 is returned when no row or no
  usable value is present."
  [sql]
  (let [row (-> sql
                (common/sql-exec "select count(*) as total from kvs")
                common/get-sql-rows
                first)]
    (if (array? row)
      (aget row 0)
      (if (some? row)
        (or (aget row "total")
            (aget row "count(*)")
            0)
        0))))
(defn- snapshot-row->tuple
  "Normalizes a kvs row to an [addr content addresses] vector.
  Array rows are read positionally; any other row is read by column name."
  [row]
  (let [fields (if (array? row)
                 [0 1 2]
                 ["addr" "content" "addresses"])]
    (mapv (fn [field] (aget row field)) fields)))
(defn- import-snapshot-rows!
[sql table rows]
(when (seq rows)
@@ -121,9 +92,10 @@
stream))
(defn- maybe-compress-stream [stream]
(if (exists? js/CompressionStream)
(.pipeThrough stream (js/CompressionStream. snapshot-content-encoding))
stream))
(when-not (exists? js/CompressionStream)
(throw (ex-info "gzip compression not supported"
{:type :db-sync/compression-not-supported})))
(.pipeThrough stream (js/CompressionStream. snapshot-content-encoding)))
(defn- <buffer-stream
[stream]
@@ -147,25 +119,47 @@
(.set out b (.-byteLength a))
out)))
(defn- snapshot-datom->jsonl-datom
  "Projects `datom` onto a plain map with exactly the keys
  :e, :a, :v, :tx and :added (absent fields become nil)."
  [datom]
  (let [fields [:e :a :v :tx :added]]
    (zipmap fields
            (map (fn [field] (field datom)) fields))))
(defn- snapshot-datom-count
  "Counts the datoms of the :eavt index in the current db value of `conn`."
  [conn]
  (-> @conn
      (d/datoms :eavt)
      count))
(defn- snapshot-export-datoms
  "Returns a lazy sequence of all :eavt datoms of `conn`'s current db, each
  converted with `snapshot-datom->jsonl-datom`, in three groups:
    1. datoms of the :logseq.kv/schema-version entity
    2. datoms of every other entity that has a :db/ident
    3. datoms of all remaining entities"
  [conn]
  (let [db                 @conn
        schema-version-eid (some-> (d/entity db :logseq.kv/schema-version) :db/id)
        ident-eids         (into #{} (map :e) (d/datoms db :avet :db/ident))
        schema-version?    (fn [datom] (= schema-version-eid (:e datom)))
        ident-entity?      (fn [datom] (contains? ident-eids (:e datom)))
        ;; Each group is a separate lazy pass over the :eavt index.
        select             (fn [pred]
                             (sequence
                              (comp (filter pred)
                                    (map snapshot-datom->jsonl-datom))
                              (d/datoms db :eavt)))]
    (concat (select schema-version?)
            (select (fn [datom]
                      (and (ident-entity? datom)
                           (not (schema-version? datom)))))
            (select (complement ident-entity?)))))
(defn- snapshot-export-stream [^js self]
(let [sql (.-sql self)
state (volatile! {:after -1 :done? false})]
(ensure-conn! self)
(let [remaining (volatile! (seq (snapshot-export-datoms (.-conn self))))]
(js/ReadableStream.
#js {:pull (fn [controller]
(p/let [{:keys [after done?]} @state]
(if done?
(let [batch (vec (take snapshot-download-batch-size @remaining))]
(if (empty? batch)
(.close controller)
(let [rows (fetch-kvs-rows sql after snapshot-download-batch-size)
rows (mapv snapshot-row->tuple rows)
last-addr (if (seq rows)
(apply max (map first rows))
after)
done? (< (count rows) snapshot-download-batch-size)]
(when (seq rows)
(let [payload (snapshot/encode-rows rows)
framed (snapshot/frame-bytes payload)]
(.enqueue controller framed)))
(vswap! state assoc :after last-addr :done? done?)))))})))
(let [remaining' (drop snapshot-download-batch-size @remaining)
payload (snapshot/encode-datoms-jsonl batch)]
(vreset! remaining (seq remaining'))
(.enqueue controller payload)))))})))
(defn- upload-multipart!
[^js bucket key stream opts]
@@ -203,42 +197,6 @@
(.abort upload)
(throw error)))))))
;; Computes the total byte length of the framed kvs export without keeping the
;; encoded data: pages through kvs in batches of snapshot-download-batch-size,
;; encodes each batch and adds 4 + payload byteLength to the running total
;; (the 4 is presumably the per-frame length prefix — confirm against
;; snapshot/frame-bytes). Returns a promise (p/loop) of the total.
(defn- snapshot-export-length [^js self]
(let [sql (.-sql self)]
(p/loop [after -1
total 0]
(let [rows (fetch-kvs-rows sql after snapshot-download-batch-size)]
(if (empty? rows)
total
(let [rows (mapv snapshot-row->tuple rows)
payload (snapshot/encode-rows rows)
total (+ total 4 (.-byteLength payload))
;; Resume the next page after the largest addr seen in this batch.
last-addr (apply max (map first rows))
;; A short batch means the table has been exhausted.
done? (< (count rows) snapshot-download-batch-size)]
(if done?
total
(p/recur last-addr total))))))))
;; Produces the snapshot export as a body with a known length.
;; When js/FixedLengthStream exists, the export stream is copied chunk by chunk
;; into a FixedLengthStream sized by snapshot-export-length and the readable
;; side is returned; otherwise the whole stream is buffered into an ArrayBuffer.
;; NOTE(review): `readable` is only returned after the copy loop has finished
;; writing every chunk; if the writable side applies backpressure before a
;; consumer is attached this could stall — confirm.
(defn- snapshot-export-fixed-length [^js self]
(p/let [length (snapshot-export-length self)
stream (snapshot-export-stream self)]
(if (exists? js/FixedLengthStream)
(let [^js fixed (js/FixedLengthStream. length)
readable (.-readable fixed)
writable (.-writable fixed)
reader (.getReader stream)
writer (.getWriter writable)]
;; Pump: read until the source reports done, then close the writer.
(p/let [_ (p/loop []
(p/let [chunk (.read reader)]
(if (.-done chunk)
(.close writer)
(p/let [_ (.write writer (.-value chunk))]
(p/recur)))))]
readable))
;; Fallback: let Response drain the stream into a single ArrayBuffer.
(p/let [resp (js/Response. stream)
buf (.arrayBuffer resp)]
buf))))
(declare import-snapshot!)
(defn- import-snapshot-stream! [^js self stream reset?]
(let [reader (.getReader stream)
@@ -370,19 +328,17 @@
(let [graph-id (graph-id-from-request request)]
(if (not (seq graph-id))
(http/bad-request "missing graph id")
(let [use-compression? (exists? js/CompressionStream)
content-encoding (when use-compression? snapshot-content-encoding)
row-count (snapshot-row-count (.-sql self))
stream (snapshot-export-stream self)
stream (if use-compression?
(maybe-compress-stream stream)
stream)]
(let [stream (-> (snapshot-export-stream self)
(maybe-compress-stream))
conn (or (.-conn self)
(do (ensure-conn! self) (.-conn self)))
datom-count (snapshot-datom-count conn)]
(js/Response. stream
#js {:status 200
:headers (js/Object.assign
#js {"content-type" snapshot-content-type
"content-encoding" (or content-encoding "identity")}
#js {"x-snapshot-row-count" (str row-count)}
"content-encoding" snapshot-content-encoding}
#js {"x-snapshot-datom-count" (str datom-count)}
(common/cors-headers))})))))
(defn- handle-sync-snapshot-download
@@ -399,31 +355,24 @@
:else
(p/let [snapshot-id (str (random-uuid))
key (snapshot-key graph-id snapshot-id)
use-compression? (exists? js/CompressionStream)
content-encoding (when use-compression? snapshot-content-encoding)
stream (snapshot-export-stream self)
stream (if use-compression?
(maybe-compress-stream stream)
stream)
stream (-> (snapshot-export-stream self)
(maybe-compress-stream))
multipart? (and (some? (.-createMultipartUpload bucket))
(fn? (.-createMultipartUpload bucket)))
opts #js {:httpMetadata #js {:contentType snapshot-content-type
:contentEncoding content-encoding
:contentEncoding snapshot-content-encoding
:cacheControl snapshot-cache-control}
:customMetadata #js {:purpose "snapshot"
:created-at (str (common/now-ms))}}
_ (if multipart?
(upload-multipart! bucket key stream opts)
(if use-compression?
(p/let [body (<buffer-stream stream)]
(.put bucket key body opts))
(p/let [body (snapshot-export-fixed-length self)]
(.put bucket key body opts))))
(p/let [body (<buffer-stream stream)]
(.put bucket key body opts)))
url (snapshot-url request graph-id snapshot-id)]
(http/json-response :sync/snapshot-download {:ok true
:key key
:url url
:content-encoding content-encoding})))))
:content-encoding snapshot-content-encoding})))))
(defn- handle-sync-admin-reset
[^js self]

View File

@@ -2,43 +2,31 @@
(:require [cljs.test :refer [deftest is testing]]
[logseq.db-sync.snapshot :as snapshot]))
(deftest transit-frame-roundtrip-test
(testing "framed transit json roundtrips rows"
(let [expected [{:addr 1 :content "a" :addresses nil}
{:addr 2 :content "b" :addresses "{\"k\":1}"}]
frame (snapshot/frame-bytes (snapshot/encode-rows expected))
{:keys [rows buffer]} (snapshot/parse-framed-chunk nil frame)]
(is (= rows expected))
(def sample-datoms
[{:e 1 :a :db/ident :v :logseq.class/Page :tx 1 :added true}
{:e 2 :a :block/title :v "hello" :tx 1 :added true}])
(deftest datoms-jsonl-roundtrip-test
(testing "jsonl transit roundtrips datoms"
(let [payload (snapshot/encode-datoms-jsonl sample-datoms)
{:keys [datoms buffer]} (snapshot/parse-datoms-jsonl-chunk nil payload)]
(is (= sample-datoms datoms))
(is (or (nil? buffer) (zero? (.-byteLength buffer)))))))
;; Feeds one complete frame plus the head of a second frame (cut 3 bytes before
;; its end), then the remaining tail, and checks that each call yields exactly
;; the rows of the frame completed by that call and that nothing is left over.
(deftest transit-frame-split-test
(testing "parse-framed-chunk handles partial trailing frame"
(let [rows1 [{:addr 1 :content "a" :addresses nil}]
rows2 [{:addr 2 :content "b" :addresses nil}]
frame1 (snapshot/frame-bytes (snapshot/encode-rows rows1))
frame2 (snapshot/frame-bytes (snapshot/encode-rows rows2))
split-pos (- (.-byteLength frame2) 3)
part1 (.slice frame2 0 split-pos)
part2 (.slice frame2 split-pos (.-byteLength frame2))
{rows1-parsed :rows buffer :buffer} (snapshot/parse-framed-chunk nil (snapshot/concat-bytes frame1 part1))
{rows2-parsed :rows rows-buffer :buffer} (snapshot/parse-framed-chunk buffer part2)]
(is (= rows1-parsed rows1))
(is (= rows2-parsed rows2))
(is (or (nil? rows-buffer) (zero? (.-byteLength rows-buffer)))))))
;; Splits the encoded payload 3 bytes before its end so the last line arrives
;; in two pieces: the first call must return only the first datom and carry the
;; partial line in :buffer, the second call must complete the remaining datom
;; and leave no buffered bytes.
(deftest datoms-jsonl-split-test
(testing "parse-datoms-jsonl-chunk handles partial trailing line"
(let [payload (snapshot/encode-datoms-jsonl sample-datoms)
split-pos (- (.-byteLength payload) 3)
part1 (.slice payload 0 split-pos)
part2 (.slice payload split-pos (.-byteLength payload))
{datoms1 :datoms buffer :buffer} (snapshot/parse-datoms-jsonl-chunk nil part1)
{datoms2 :datoms next-buffer :buffer} (snapshot/parse-datoms-jsonl-chunk buffer part2)]
(is (= (subvec sample-datoms 0 1) datoms1))
(is (= (subvec sample-datoms 1) datoms2))
(is (or (nil? next-buffer) (zero? (.-byteLength next-buffer)))))))
;; finalize-framed-buffer must decode a buffer holding one complete frame and
;; return an empty vector for a zero-length buffer.
(deftest transit-finalize-buffer-test
(testing "finalize-framed-buffer parses remaining frame"
(let [rows [{:addr 3 :content "c" :addresses nil}]
frame (snapshot/frame-bytes (snapshot/encode-rows rows))]
(is (= rows (snapshot/finalize-framed-buffer frame)))
(is (= [] (snapshot/finalize-framed-buffer (js/Uint8Array.)))))))
;; framed-length over several row batches must equal the summed byte lengths of
;; the individually framed batches.
(deftest transit-framed-length-test
(testing "framed-length sums frame sizes"
(let [rows1 [{:addr 1 :content "a" :addresses nil}]
rows2 [{:addr 2 :content "b" :addresses nil}
{:addr 3 :content "c" :addresses nil}]
frame1 (snapshot/frame-bytes (snapshot/encode-rows rows1))
frame2 (snapshot/frame-bytes (snapshot/encode-rows rows2))]
(is (= (+ (.-byteLength frame1) (.-byteLength frame2))
(snapshot/framed-length [rows1 rows2]))))))
;; finalize-datoms-jsonl-buffer must decode every datom of a full payload and
;; return an empty vector for a zero-length buffer.
(deftest datoms-jsonl-finalize-buffer-test
(testing "finalize-datoms-jsonl-buffer parses the remaining line"
(let [payload (snapshot/encode-datoms-jsonl sample-datoms)]
(is (= sample-datoms (snapshot/finalize-datoms-jsonl-buffer payload)))
(is (= [] (snapshot/finalize-datoms-jsonl-buffer (js/Uint8Array.)))))))

View File

@@ -3,6 +3,13 @@
[logseq.db-sync.worker.handler.assets :as assets]
[promesa.core :as p]))
(defn- bytes->stream
  "Wraps `payload` in a ReadableStream that emits it as a single chunk and
  then closes."
  [^js payload]
  (let [emit-once (fn [controller]
                    (.enqueue controller payload)
                    (.close controller))]
    (js/ReadableStream. #js {:start emit-once})))
(deftest assets-get-includes-content-length-header-test
(async done
(let [payload (js/Uint8Array. #js [1 2 3 4])
@@ -16,9 +23,38 @@
:httpMetadata #js {:contentType "application/octet-stream"}}))}}]
(-> (p/let [resp (assets/handle request env)]
(is (= 200 (.-status resp)))
(is (= "4" (.get (.-headers resp) "content-length"))))
(is (= "4" (.get (.-headers resp) "content-length")))
(is (= "4" (.get (.-headers resp) "x-asset-size"))))
(p/then (fn []
(done)))
(p/catch (fn [error]
(is false (str error))
(done)))))))
;; Serves an asset whose body is a ReadableStream while js/FixedLengthStream is
;; temporarily replaced with undefined, and checks that content-length and
;; x-asset-size are still reported and that the body still carries all 4 bytes.
(deftest assets-get-includes-content-length-with-stream-body-without-fixed-length-stream-test
(async done
(let [payload (js/Uint8Array. #js [1 2 3 4])
request (js/Request. "http://localhost/assets/graph-1/snapshot-1.snapshot"
#js {:method "GET"})
env #js {:LOGSEQ_SYNC_ASSETS
#js {:get (fn [_key]
(js/Promise.resolve
#js {:body (bytes->stream payload)
:size 4
:httpMetadata #js {:contentType "application/octet-stream"}}))}}
;; Remember the real global so it can be put back on success and on failure.
original-fixed-length-stream (.-FixedLengthStream js/globalThis)
restore! #(aset js/globalThis "FixedLengthStream" original-fixed-length-stream)]
(aset js/globalThis "FixedLengthStream" js/undefined)
(-> (p/let [resp (assets/handle request env)
buf (.arrayBuffer resp)]
(is (= 200 (.-status resp)))
(is (= "4" (.get (.-headers resp) "content-length")))
(is (= "4" (.get (.-headers resp) "x-asset-size")))
(is (= 4 (.-byteLength buf))))
(p/then (fn []
(restore!)
(done)))
(p/catch (fn [error]
(restore!)
(is false (str error))
(done)))))))

View File

@@ -3,6 +3,7 @@
[datascript.core :as d]
[logseq.db-sync.common :as common]
[logseq.db-sync.protocol :as protocol]
[logseq.db-sync.snapshot :as snapshot]
[logseq.db-sync.storage :as storage]
[logseq.db-sync.test-sql :as test-sql]
[logseq.db-sync.worker.handler.sync :as sync-handler]
@@ -31,11 +32,19 @@
bucket #js {:put (fn [key body opts]
(reset! put-call {:key key :body body :opts opts})
(js/Promise.resolve #js {:ok true}))}
conn (d/create-conn db-schema/schema)
self #js {:env #js {:LOGSEQ_SYNC_ASSETS bucket}
:conn conn
:schema-ready true
:sql (empty-sql)}
{:keys [request url]} (request-url)
original-compression-stream (.-CompressionStream js/globalThis)
restore! #(aset js/globalThis "CompressionStream" original-compression-stream)]
(d/transact! conn [{:db/ident :logseq.class/Page
:block/title "Page"}
{:db/ident :logseq.kv/schema-version
:kv/value {:major 65 :minor 23}}
{:db/id 2 :block/title "hello"}])
(aset js/globalThis
"CompressionStream"
(passthrough-compression-stream-constructor))
@@ -45,10 +54,22 @@
:route {:handler :sync/snapshot-download}})
text (.text resp)
body (js->clj (js/JSON.parse text) :keywordize-keys true)
http-metadata (aget (:opts @put-call) "httpMetadata")]
http-metadata (aget (:opts @put-call) "httpMetadata")
payload (js/Uint8Array. (:body @put-call))
{:keys [datoms]} (snapshot/parse-datoms-jsonl-chunk nil payload)]
(is (= 200 (.-status resp)))
(is (= "gzip" (:content-encoding body)))
(is (= "gzip" (aget http-metadata "contentEncoding"))))
(is (= "gzip" (aget http-metadata "contentEncoding")))
(is (= "application/x-ndjson" (aget http-metadata "contentType")))
(is (= 5 (count datoms)))
(is (= [:logseq.kv/schema-version
:logseq.kv/schema-version
:logseq.kv/schema-version
:logseq.class/Page
:logseq.class/Page]
(mapv (fn [{:keys [e]}]
(:db/ident (d/entity @conn e)))
datoms))))
(p/then (fn []
(restore!)
(done)))
@@ -57,28 +78,36 @@
(is false (str error))
(done)))))))
(deftest snapshot-download-falls-back-to-uncompressed-when-compression-unsupported-test
(deftest snapshot-download-stream-route-returns-jsonl-datoms-test
(async done
(let [put-call (atom nil)
bucket #js {:put (fn [key body opts]
(reset! put-call {:key key :body body :opts opts})
(js/Promise.resolve #js {:ok true}))}
self #js {:env #js {:LOGSEQ_SYNC_ASSETS bucket}
(let [conn (d/create-conn db-schema/schema)
self #js {:env #js {}
:conn conn
:schema-ready true
:sql (empty-sql)}
{:keys [request url]} (request-url)
{:keys [request]} (request-url "/sync/graph-1/snapshot/stream?graph-id=graph-1")
original-compression-stream (.-CompressionStream js/globalThis)
restore! #(aset js/globalThis "CompressionStream" original-compression-stream)]
(aset js/globalThis "CompressionStream" js/undefined)
(-> (p/let [resp (sync-handler/handle {:self self
:request request
:url url
:route {:handler :sync/snapshot-download}})
text (.text resp)
body (js->clj (js/JSON.parse text) :keywordize-keys true)
http-metadata (aget (:opts @put-call) "httpMetadata")]
(d/transact! conn [{:db/ident :logseq.class/Page
:block/title "Page"}
{:db/ident :logseq.kv/schema-version
:kv/value {:major 65 :minor 23}}
{:db/id 2 :block/title "hello"}])
(aset js/globalThis
"CompressionStream"
(passthrough-compression-stream-constructor))
(-> (p/let [resp (sync-handler/handle-http self request)
encoding (.get (.-headers resp) "content-encoding")
content-type (.get (.-headers resp) "content-type")
buf (.arrayBuffer resp)
payload (js/Uint8Array. buf)
datoms (snapshot/finalize-datoms-jsonl-buffer payload)]
(is (= 200 (.-status resp)))
(is (nil? (:content-encoding body)))
(is (nil? (aget http-metadata "contentEncoding"))))
(is (= "gzip" encoding))
(is (= "application/x-ndjson" content-type))
(is (= 5 (count datoms)))
(is (= :logseq.kv/schema-version
(:db/ident (d/entity @conn (:e (first datoms)))))))
(p/then (fn []
(restore!)
(done)))
@@ -87,22 +116,6 @@
(is false (str error))
(done)))))))
(deftest snapshot-stream-route-works-via-handle-http-test
(async done
(let [self #js {:env #js {}
:sql (empty-sql)}
{:keys [request]} (request-url "/sync/graph-1/snapshot/stream?graph-id=graph-1")]
(-> (p/let [resp (sync-handler/handle-http self request)
encoding (.get (.-headers resp) "content-encoding")]
(is (= 200 (.-status resp)))
(is (= "application/transit+json" (.get (.-headers resp) "content-type")))
(is (contains? #{"gzip" "identity"} encoding)))
(p/then (fn []
(done)))
(p/catch (fn [error]
(is false (str error))
(done)))))))
(deftest ensure-schema-fallback-probes-existing-tables-test
(async done
(let [self #js {:sql (empty-sql)}

View File

@@ -93,9 +93,9 @@
- `GET /sync/:graph-id/snapshot/download`
- Build a snapshot file in R2 and return a download URL.
- Response: `{"ok":true,"key":"<graph-id>/<uuid>.snapshot","url":"<origin>/assets/:graph-id/<uuid>.snapshot","content-encoding":"gzip"}`.
- The snapshot file is a framed Transit JSON stream of kvs rows, optionally gzip-compressed.
- The snapshot file stored in R2 is a gzip-compressed NDJSON stream of full Datascript datoms. Each line is a Transit JSON datom map: `{e,a,v,tx,added}`.
- `POST /sync/:graph-id/snapshot/upload?reset=true|false`
- Upload a snapshot stream (framed Transit JSON, optionally gzip-compressed). The server imports rows into kvs.
- Upload a snapshot stream for bootstrap import. Current upload format remains framed Transit JSON kvs rows, optionally gzip-compressed.
- Request body: binary stream; headers should include `content-type: application/transit+json` and `content-encoding: gzip` when compressed.
- Response: `{"ok":true,"count":<n>,"key":"<graph-id>/<uuid>.snapshot"}`.
- Error response (400): `{"error":"missing body"|"missing graph id"}`.

View File

@@ -12,6 +12,7 @@
[lambdaisland.glogi :as log]
[logseq.db :as ldb]
[logseq.db-sync.malli-schema :as db-sync-schema]
[logseq.db-sync.snapshot :as snapshot]
[logseq.db.sqlite.util :as sqlite-util]
[promesa.core :as p]))
@@ -46,15 +47,6 @@
(when-not (js/isNaN parsed)
parsed))))
(def ^:private snapshot-text-decoder (js/TextDecoder.))
(defn- decode-snapshot-rows
  "Decodes `payload` bytes to text with `snapshot-text-decoder` and reads the
  text as a transit string."
  [payload]
  (->> payload
       ->uint8
       (.decode snapshot-text-decoder)
       sqlite-util/read-transit-str))
(defn- frame-len
  "Reads the 4-byte big-endian unsigned frame length stored at `offset`
  within the typed array `data`.

  `offset` is relative to the start of `data`, not to its backing buffer, so
  the array's own byteOffset is added before reading. This keeps the result
  correct when `data` is a view into a larger ArrayBuffer."
  [^js data offset]
  (let [view (js/DataView. (.-buffer data)
                           (+ (.-byteOffset data) offset)
                           4)]
    ;; `false` selects big-endian byte order.
    (.getUint32 view 0 false)))
(defn- concat-bytes
[^js a ^js b]
(cond
@@ -66,34 +58,6 @@
(.set out b (.-byteLength a))
out)))
(defn- parse-framed-chunk
  "Appends `chunk` to the carried-over `buffer` and decodes every complete
  frame (4-byte length header followed by that many payload bytes).

  Returns a map with:
    :rows   - vector of all rows decoded from complete frames, in order
    :buffer - the bytes of the trailing incomplete frame, or nil when the
              data ended exactly on a frame boundary"
  [buffer chunk]
  (let [data  (concat-bytes buffer chunk)
        total (.-byteLength data)]
    (loop [pos 0
           acc []]
      (cond
        ;; Not even a full length header left.
        (< (- total pos) 4)
        {:rows acc
         :buffer (when (< pos total)
                   (.slice data pos total))}

        :else
        (let [frame-end (+ pos 4 (frame-len data pos))]
          (if (> frame-end total)
            ;; Header present but payload incomplete: keep the whole frame.
            {:rows acc
             :buffer (.slice data pos total)}
            (let [payload-bytes (.slice data (+ pos 4) frame-end)]
              (recur frame-end
                     (into acc (decode-snapshot-rows payload-bytes))))))))))
(defn- finalize-framed-buffer
  "Decodes the bytes left in `buffer` at end of stream.
  Returns [] for a nil or zero-length buffer. Otherwise the buffer must parse
  into at least one row with no bytes left over; if it does not, an ex-info
  \"incomplete framed buffer\" is thrown."
  [buffer]
  (let [no-bytes? (fn [b] (or (nil? b) (zero? (.-byteLength b))))]
    (if (no-bytes? buffer)
      []
      (let [{rows :rows leftover :buffer} (parse-framed-chunk nil buffer)]
        (when-not (and (seq rows) (no-bytes? leftover))
          (throw (ex-info "incomplete framed buffer" {:buffer leftover :rows rows})))
        rows))))
(defn- gzip-bytes?
[^js payload]
(and (some? payload)
@@ -158,17 +122,17 @@
{:chunk-count 1})
{:chunk-count 0}))))
(defn- <flush-row-batches!
[rows batch-size on-batch]
(p/loop [remaining rows]
(defn- <flush-datom-batches!
[datoms batch-size on-batch]
(p/loop [remaining datoms]
(if (>= (count remaining) batch-size)
(let [batch (subvec remaining 0 batch-size)
rest-rows (subvec remaining batch-size)]
rest-datoms (subvec remaining batch-size)]
(p/let [_ (on-batch batch)]
(p/recur rest-rows)))
(p/recur rest-datoms)))
remaining)))
(defn- <stream-snapshot-row-batches!
(defn- <stream-snapshot-datom-batches!
[^js resp batch-size on-batch]
(if-let [stream (response-body-stream resp)]
(let [reader (.getReader stream)]
@@ -177,20 +141,20 @@
(p/let [result (.read reader)]
(if (.-done result)
(let [pending (if (and buffer (pos? (.-byteLength buffer)))
(into pending (finalize-framed-buffer buffer))
(into pending (snapshot/finalize-datoms-jsonl-buffer buffer))
pending)]
(if (seq pending)
(p/let [_ (on-batch pending)]
{:chunk-count 1})
{:chunk-count 0}))
(let [{rows :rows next-buffer :buffer} (parse-framed-chunk buffer (->uint8 (.-value result)))
pending (into pending rows)]
(p/let [pending (<flush-row-batches! pending batch-size on-batch)]
(let [{datoms :datoms next-buffer :buffer} (snapshot/parse-datoms-jsonl-chunk buffer (->uint8 (.-value result)))
pending (into pending datoms)]
(p/let [pending (<flush-datom-batches! pending batch-size on-batch)]
(p/recur next-buffer pending)))))))
(p/let [snapshot-bytes (<snapshot-response-bytes resp)
rows (vec (finalize-framed-buffer snapshot-bytes))]
(if (seq rows)
(p/let [_ (on-batch rows)]
datoms (vec (snapshot/finalize-datoms-jsonl-buffer snapshot-bytes))]
(if (seq datoms)
(p/let [_ (on-batch datoms)]
{:chunk-count 1})
{:chunk-count 0}))))
@@ -427,15 +391,16 @@
(throw (ex-info "non-integer remote-tx when downloading graph"
{:graph graph-name
:remote-tx remote-tx})))
resp (js/fetch (str base "/sync/" graph-uuid "/snapshot/stream")
snapshot-resp (fetch-json (str base "/sync/" graph-uuid "/snapshot/download")
{:method "GET"}
{:response-schema :sync/snapshot-download})
resp (js/fetch (:url snapshot-resp)
(clj->js (with-auth-headers {:method "GET"})))
total-rows (parse-header-int resp "x-snapshot-row-count")
_ (state/pub-event!
[:rtc/log {:type :rtc.log/download
:sub-type :download-progress
:graph-uuid graph-uuid
:message (str "Start downloading graph snapshot, total rows: "
(or total-rows "unknown"))}])]
:message "Start downloading graph snapshot"}])]
(when-not (.-ok resp)
(throw (ex-info "snapshot download failed"
{:graph graph-name
@@ -445,25 +410,16 @@
(if-let [import-id @import-id*]
(p/resolved import-id)
(p/let [{:keys [import-id]} (state/<invoke-db-worker :thread-api/db-sync-import-prepare
graph true graph-uuid graph-e2ee? total-rows)]
graph true graph-uuid graph-e2ee?)]
(reset! import-id* import-id)
import-id)))]
(p/let [_ (if @state/*db-worker
(<stream-snapshot-chunks!
resp
(fn [chunk]
(p/let [import-id (ensure-import!)]
(state/<invoke-db-worker-direct-pass :thread-api/db-sync-import-framed-chunk
(Comlink/transfer chunk #js [(.-buffer chunk)])
graph-uuid
import-id))))
(<stream-snapshot-row-batches!
resp
10000
(fn [rows]
(p/let [import-id (ensure-import!)]
(state/<invoke-db-worker :thread-api/db-sync-import-rows-chunk
rows graph-uuid import-id)))))
(p/let [_ (<stream-snapshot-datom-batches!
resp
25000
(fn [datoms]
(p/let [import-id (ensure-import!)]
(state/<invoke-db-worker :thread-api/db-sync-import-datoms-chunk
datoms graph-uuid import-id))))
_ (state/pub-event!
[:rtc/log {:type :rtc.log/download
:sub-type :download-completed

View File

@@ -38,7 +38,6 @@
[logseq.cli.common.mcp.tools :as cli-common-mcp-tools]
[logseq.common.util :as common-util]
[logseq.db :as ldb]
[logseq.db-sync.snapshot :as snapshot]
[logseq.db.common.entity-plus :as entity-plus]
[logseq.db.common.initial-data :as common-initial-data]
[logseq.db.common.order :as db-order]
@@ -71,8 +70,6 @@
(def ^:private search-index-build-time-budget-ms 8)
(def ^:private search-index-build-idle-diff-ms 1000)
(def ^:private search-index-build-pause-ms 300)
(def ^:private db-sync-import-batch-size 10000)
(defonce *sqlite worker-state/*sqlite)
(defonce *sqlite-conns worker-state/*sqlite-conns)
(defonce *datascript-conns worker-state/*datascript-conns)
@@ -271,7 +268,7 @@
{:skip-validate-db? true}))))
(defn- <create-or-open-db!
[repo {:keys [config datoms] :as opts}]
[repo {:keys [config datoms sync-download-graph?] :as opts}]
(when-not (worker-state/get-sqlite-conn repo)
(p/let [[db search-db client-ops-db :as dbs] (get-dbs repo)
storage (new-sqlite-storage db)
@@ -318,15 +315,17 @@
(swap! *client-ops-conns assoc repo client-ops-conn)
(when (and (not @*publishing?) (not= client-op/schema-in-db (d/schema @client-ops-conn)))
(d/reset-schema! client-ops-conn client-op/schema-in-db))
(let [initial-tx-report (when (and (not initial-data-exists?) (not datoms))
(let [initial-tx-report (when-not (or initial-data-exists?
(seq datoms)
sync-download-graph?)
(let [config (or config "")
initial-data (sqlite-create-graph/build-db-initial-data
config (select-keys opts [:import-type :graph-git-sha :remote-graph?]))]
(ldb/transact! conn initial-data
{:initial-db? true})))]
(db-migrate/migrate conn)
(gc-sqlite-dbs! db client-ops-db conn {})
(when-not sync-download-graph?
(db-migrate/migrate conn)
(gc-sqlite-dbs! db client-ops-db conn {}))
(when initial-tx-report
(db-sync/handle-local-tx! repo initial-tx-report))
@@ -659,8 +658,8 @@
[repo]
(<unlink-db! repo))
(defn- import-datoms-to-db!
[repo graph-id remote-tx datoms]
(defn- complete-datoms-import!
[repo graph-id remote-tx]
(-> (p/do!
(when-let [search-db (worker-state/get-sqlite-conn repo :search)]
(search/truncate-table! search-db))
@@ -668,8 +667,6 @@
{:sub-type :download-progress
:graph-uuid graph-id
:message "Saving data to DB"})
((@thread-api/*thread-apis :thread-api/create-or-open-db) repo {:close-other-db? true
:datoms datoms})
(db-sync/rehydrate-large-titles-from-db! repo graph-id)
(rtc-log-and-state/rtc-log :rtc.log/download
{:sub-type :download-completed
@@ -721,42 +718,47 @@
(throw (stale-import-ex-info repo graph-id import-id)))
state))
(defn- import-snapshot-rows-batch!
  "Upserts one batch of snapshot rows into `db`.
  When `graph-e2ee?` is truthy the rows are first decrypted with `aes-key`."
  [db aes-key graph-e2ee? rows]
  (p/let [plain-rows (if-not graph-e2ee?
                       rows
                       (sync-crypt/<decrypt-snapshot-rows-batch aes-key rows))]
    (->> plain-rows
         rows->sqlite-binds
         (upsert-addr-content! db))))
(defn- datom->tx
  "Turns a datom map into a [:db/add e a v] transaction vector; any other
  keys of the datom are ignored."
  [{entity :e attr :a value :v}]
  (vector :db/add entity attr value))
(defn- import-snapshot-rows!
  "Imports `rows` into `db` sequentially, in batches of
  `db-sync-import-batch-size`, via `import-snapshot-rows-batch!`."
  [db aes-key graph-e2ee? rows]
  (p/doseq [row-batch (partition-all db-sync-import-batch-size rows)]
    (import-snapshot-rows-batch! db aes-key graph-e2ee? row-batch)))
(defn- import-datoms-batch!
  "Transacts one batch of snapshot datoms into `conn`.
  When `graph-e2ee?` is truthy the batch is first decrypted with `aes-key`.
  The :db/ident datoms are placed ahead of all other datoms in the
  transaction; nothing is transacted for an empty batch."
  [conn aes-key graph-e2ee? datoms]
  (p/let [plain-datoms (if graph-e2ee?
                         (sync-crypt/<decrypt-snapshot-datoms-batch aes-key datoms)
                         datoms)]
    (let [ident-datom? (fn [datom] (= :db/ident (:a datom)))
          ident-txs    (into [] (comp (filter ident-datom?) (map datom->tx)) plain-datoms)
          all-txs      (into ident-txs
                             (comp (remove ident-datom?) (map datom->tx))
                             plain-datoms)]
      (when (seq all-txs)
        (d/transact! conn all-txs {:sync-download-graph? true})))))
(defn- log-import-progress!
[graph-id import-id rows-count]
(when (pos? rows-count)
(let [{:keys [imported-rows total-rows]}
[graph-id import-id datoms-count]
(when (pos? datoms-count)
(let [{:keys [imported-datoms total-datoms]}
(swap! *import-state
(fn [state]
(if (= import-id (:import-id state))
(update state :imported-rows (fnil + 0) rows-count)
(update state :imported-datoms (fnil + 0) datoms-count)
state)))]
(rtc-log-and-state/rtc-log :rtc.log/download
{:sub-type :download-progress
:graph-uuid graph-id
:message (if (some? total-rows)
(str "Importing data " imported-rows "/" total-rows)
(str "Importing data " imported-rows))}))))
:message (if (some? total-datoms)
(str "Importing data " imported-datoms "/" total-datoms)
(str "Importing data " imported-datoms))}))))
(def-thread-api :thread-api/db-sync-import-prepare
[repo reset? graph-id graph-e2ee? & [total-rows]]
(let [graph-e2ee? (if (nil? graph-e2ee?) true (true? graph-e2ee?))
opened-db (atom nil)
opened-import-pool (atom nil)]
[repo reset? graph-id graph-e2ee? & [total-datoms]]
(let [graph-e2ee? (if (nil? graph-e2ee?) true (true? graph-e2ee?))]
(-> (p/let [_ (when-let [state @*import-state]
(close-import-state! state))
(close-import-state! state)
(close-db! (:repo state)))
_ (reset! *import-state nil)
_ (when reset? (close-db! repo))
_ (when reset? (<invalidate-search-db! repo))
@@ -765,55 +767,29 @@
(sync-crypt/<fetch-graph-aes-key-for-download graph-id))
_ (when (and graph-e2ee? (nil? aes-key))
(db-sync/fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))
^js pool (if graph-e2ee?
(<create-db-sync-import-pool repo import-id)
(<get-opfs-pool repo))
_ (when graph-e2ee?
(reset! opened-import-pool pool))
^js db (new (.-OpfsSAHPoolDb pool) repo-path)
_ (reset! opened-db db)
_ (common-sqlite/create-kvs-table! db)
_ (enable-sqlite-wal-mode! db)
_ (when reset? (.exec db "DELETE FROM kvs"))]
_ ((@thread-api/*thread-apis :thread-api/create-or-open-db) repo {:close-other-db? true
:sync-download-graph? true})
conn (worker-state/get-datascript-conn repo)
_ (when-not conn
(db-sync/fail-fast :db-sync/missing-field {:repo repo :field :datascript-conn}))]
(reset! *import-state {:aes-key aes-key
:db db
:conn conn
:graph-e2ee? graph-e2ee?
:graph-id graph-id
:import-id import-id
:imported-rows 0
:import-pool @opened-import-pool
:imported-datoms 0
:repo repo
:snapshot-buffer nil
:total-rows total-rows})
:total-datoms total-datoms})
{:import-id import-id})
(p/catch (fn [error]
(close-import-state! {:db @opened-db
:import-pool @opened-import-pool})
(reset! *import-state nil)
(throw error))))))
;; Thread API: import one chunk of snapshot `rows` for the import identified
;; by `graph-id` / `import-id`.
;; Resolves to `true` after the rows are imported and progress is logged.
;; On failure the import state for `import-id` is cleared, except when the
;; error's ex-data :type is :db-sync/stale-import; the error is always rethrown.
(def-thread-api :thread-api/db-sync-import-rows-chunk
[rows graph-id import-id]
;; `require-import-state!` is called with a nil repo here; presumably it
;; validates graph-id/import-id against the active state (defined elsewhere).
(-> (p/let [{:keys [db aes-key graph-e2ee?]} (require-import-state! nil graph-id import-id)
_ (import-snapshot-rows! db aes-key graph-e2ee? rows)]
;; Progress is reported by row count of this chunk.
(log-import-progress! graph-id import-id (count rows))
true)
(p/catch (fn [error]
;; A stale-import error leaves the import state untouched.
(when-not (= :db-sync/stale-import (:type (ex-data error)))
(clear-import-state! import-id))
(throw error)))))
(def-thread-api :thread-api/db-sync-import-framed-chunk
[chunk graph-id import-id]
(-> (p/let [{:keys [db aes-key graph-e2ee? snapshot-buffer]} (require-import-state! nil graph-id import-id)
{:keys [rows buffer]} (snapshot/parse-framed-chunk snapshot-buffer chunk)
_ (swap! *import-state
(fn [state]
(if (= import-id (:import-id state))
(assoc state :snapshot-buffer buffer)
state)))
_ (when (seq rows)
(import-snapshot-rows! db aes-key graph-e2ee? rows))
_ (log-import-progress! graph-id import-id (count rows))]
(def-thread-api :thread-api/db-sync-import-datoms-chunk
[datoms graph-id import-id]
(-> (p/let [{:keys [conn aes-key graph-e2ee?]} (require-import-state! nil graph-id import-id)
_ (import-datoms-batch! conn aes-key graph-e2ee? datoms)]
(log-import-progress! graph-id import-id (count datoms))
true)
(p/catch (fn [error]
(when-not (= :db-sync/stale-import (:type (ex-data error)))
@@ -822,27 +798,8 @@
(def-thread-api :thread-api/db-sync-import-finalize
[repo graph-id remote-tx import-id]
(-> (p/let [{:keys [db aes-key graph-e2ee? import-pool snapshot-buffer]} (require-import-state! repo graph-id import-id)
rows (when (and snapshot-buffer (pos? (.-byteLength snapshot-buffer)))
(snapshot/finalize-framed-buffer snapshot-buffer))
_ (when (seq rows)
(import-snapshot-rows! db aes-key graph-e2ee? rows))
_ (log-import-progress! graph-id import-id (count rows))
datoms (when graph-e2ee?
(let [storage (new-sqlite-storage db)
conn (common-sqlite/get-storage-conn storage db-schema/schema)
source-db @conn]
(d/datoms source-db :eavt)))
_ (when-not graph-e2ee?
(.exec db "PRAGMA wal_checkpoint(2)"))
_ (when-not graph-e2ee?
(.close db))
result (import-datoms-to-db! repo graph-id remote-tx datoms)
_ (when graph-e2ee?
(.close db))
_ (when graph-e2ee?
(when import-pool
(remove-vfs! import-pool)))
(-> (p/let [_ (require-import-state! repo graph-id import-id)
result (complete-datoms-import! repo graph-id remote-tx)
_ (reset! *import-state nil)]
result)
(p/catch (fn [error]

View File

@@ -1958,6 +1958,7 @@
[repo {:keys [tx-data tx-meta db-after] :as tx-report}]
(when (and (seq tx-data)
(not (:rtc-tx? tx-meta))
(not (:sync-download-graph? tx-meta))
(:persist-op? tx-meta true)
(:kv/value (d/entity db-after :logseq.kv/graph-remote?)))
(enqueue-local-tx! repo tx-report)

View File

@@ -503,6 +503,16 @@
[aes-key rows-batch]
(p/all (map #(<decrypt-snapshot-row aes-key %) rows-batch)))
(defn <decrypt-snapshot-datoms-batch
  "Returns a promise of the `datoms` batch with encrypted values decrypted.

  A datom whose attribute `:a` is a member of `sync-const/encrypt-attr-set`
  gets its `:v` replaced by the result of `<decrypt-text-value` using
  `aes-key`; every other datom is passed through unchanged. Order is kept."
  [aes-key datoms]
  (let [<decrypt-datom (fn [{:keys [a v] :as datom}]
                         (if-not (contains? sync-const/encrypt-attr-set a)
                           (p/resolved datom)
                           (p/let [plain-value (<decrypt-text-value aes-key v)]
                             (assoc datom :v plain-value))))]
    (p/all (map <decrypt-datom datoms))))
(defn <encrypt-datoms
([aes-key datoms]
(<encrypt-datoms aes-key datoms nil))

View File

@@ -6,22 +6,11 @@
[frontend.handler.user :as user-handler]
[frontend.state :as state]
[logseq.db :as ldb]
[logseq.db.sqlite.util :as sqlite-util]
[logseq.db-sync.snapshot :as snapshot]
[promesa.core :as p]))
(def ^:private test-text-encoder (js/TextEncoder.))
(defn- frame-bytes
  "Returns a new Uint8Array holding `data` prefixed by its byte length,
  written as a 4-byte big-endian unsigned integer."
  [^js data]
  (let [payload-size (.-byteLength data)
        framed (js/Uint8Array. (+ 4 payload-size))]
    ;; `false` as the third argument selects big-endian byte order.
    (-> (js/DataView. (.-buffer framed))
        (.setUint32 0 payload-size false))
    (.set framed data 4)
    framed))
(defn- encode-framed-rows
  "Serializes `rows` to a transit string, encodes it to bytes with
  `test-text-encoder`, and wraps the bytes in a length-prefixed frame."
  [rows]
  (->> rows
       sqlite-util/write-transit-str
       (.encode test-text-encoder)
       frame-bytes))
(defn- encode-datoms-jsonl
  "Test-local shorthand that delegates to `snapshot/encode-datoms-jsonl`."
  [datoms]
  (-> datoms
      snapshot/encode-datoms-jsonl))
(defn- <gzip-bytes [^js payload]
(if (exists? js/CompressionStream)
@@ -219,18 +208,19 @@
(async done
(let [import-calls (atom [])
fetch-calls (atom [])
rows [[1 "content-1" "addresses-1"]
[2 "content-2" "addresses-2"]]
framed-bytes (encode-framed-rows rows)
datoms [{:e 1 :a :db/ident :v :logseq.class/Page :tx 1 :added true}
{:e 2 :a :block/title :v "hello" :tx 1 :added true}]
jsonl-bytes (encode-datoms-jsonl datoms)
original-fetch js/fetch
stream-url "http://base/sync/graph-1/snapshot/stream"]
(-> (p/let [gzip-bytes (<gzip-bytes framed-bytes)]
download-url "http://base/sync/graph-1/snapshot/download"
asset-url "http://base/assets/graph-1/snapshot-1.snapshot"]
(-> (p/let [gzip-bytes (<gzip-bytes jsonl-bytes)]
(set! js/fetch
(fn [url opts]
(let [method (or (aget opts "method") "GET")]
(swap! fetch-calls conj [url method])
(cond
(and (= url stream-url) (= method "GET"))
(and (= url asset-url) (= method "GET"))
(js/Promise.resolve
#js {:ok true
:status 200
@@ -249,6 +239,12 @@
(string/ends-with? url "/pull")
(p/resolved {:t 42})
(= url download-url)
(p/resolved {:ok true
:url asset-url
:key "graph-1/snapshot-1.snapshot"
:content-encoding "gzip"})
:else
(p/rejected (ex-info "unexpected fetch-json URL"
{:url url}))))
@@ -266,15 +262,15 @@
(p/then (fn [_]
(is (= 3 (count @import-calls)))
(let [[prepare-op graph reset? graph-uuid graph-e2ee?] (first @import-calls)
[chunk-op imported-rows chunk-graph-uuid import-id] (second @import-calls)
[chunk-op imported-datoms chunk-graph-uuid import-id] (second @import-calls)
[finalize-op finalize-graph finalize-graph-uuid remote-tx finalize-import-id] (nth @import-calls 2)]
(is (= :thread-api/db-sync-import-prepare prepare-op))
(is (string/ends-with? graph "demo-graph"))
(is (= true reset?))
(is (= "graph-1" graph-uuid))
(is (= false graph-e2ee?))
(is (= :thread-api/db-sync-import-rows-chunk chunk-op))
(is (= rows imported-rows))
(is (= :thread-api/db-sync-import-datoms-chunk chunk-op))
(is (= datoms imported-datoms))
(is (= "graph-1" chunk-graph-uuid))
(is (= "import-1" import-id))
(is (= :thread-api/db-sync-import-finalize finalize-op))
@@ -282,7 +278,7 @@
(is (= "graph-1" finalize-graph-uuid))
(is (= 42 remote-tx))
(is (= "import-1" finalize-import-id)))
(is (= [[stream-url "GET"]]
(is (= [[asset-url "GET"]]
@fetch-calls))
(done)))
(p/catch (fn [error]
@@ -290,86 +286,24 @@
(is false (str error))
(done)))))))
;; Checks that `db-sync/<rtc-download-graph!` consumes an uncompressed
;; ("identity" content-encoding) framed snapshot through the response body
;; stream and forwards the decoded rows to the db worker as a rows chunk.
(deftest rtc-download-graph-streams-identity-snapshot-test
(async done
(let [import-calls (atom [])
rows [[1 "content-1" "addresses-1"]
[2 "content-2" "addresses-2"]]
framed-bytes (encode-framed-rows rows)
original-fetch js/fetch
stream-url "http://base/sync/graph-1/snapshot/stream"
worker-prev @state/*db-worker]
;; The worker ref is cleared for the duration of the test and put back in
;; the p/catch / p/finally handlers at the bottom.
(reset! state/*db-worker nil)
;; NOTE(review): the `3` passed to bytes->stream is presumably a chunk size
;; (helper defined outside this view) — confirm.
(-> (p/let [stream (bytes->stream framed-bytes 3)]
;; Stub the global fetch: only a GET on the stream URL succeeds.
(set! js/fetch
(fn [url opts]
(let [method (or (aget opts "method") "GET")]
(cond
(and (= url stream-url) (= method "GET"))
(js/Promise.resolve
#js {:ok true
:status 200
:headers #js {:get (fn [header]
(case header
"content-length" (str (.-byteLength framed-bytes))
"content-encoding" "identity"
nil))}
:body stream
;; Throws if the code under test buffers the whole response
;; instead of reading the stream.
:arrayBuffer (fn [] (throw (js/Error. "arrayBuffer should not be used")))})
:else
(js/Promise.resolve #js {:ok false :status 404})))))
(-> (p/with-redefs [db-sync/http-base (fn [] "http://base")
db-sync/fetch-json (fn [url _opts _schema]
(cond
(string/ends-with? url "/pull")
(p/resolved {:t 42})
:else
(p/rejected (ex-info "unexpected fetch-json URL"
{:url url}))))
user-handler/task--ensure-id&access-token (fn [resolve _reject]
(resolve true))
;; Record every worker invocation; only `prepare` returns an import id.
state/<invoke-db-worker (fn [& args]
(swap! import-calls conj args)
(if (= :thread-api/db-sync-import-prepare (first args))
(p/resolved {:import-id "import-1"})
(p/resolved :ok)))
state/set-state! (fn [& _] nil)
state/pub-event! (fn [& _] nil)]
(db-sync/<rtc-download-graph! "demo-graph" "graph-1" false))
;; Always put the real fetch back once the download settles.
(p/finally (fn [] (set! js/fetch original-fetch)))))
(p/then (fn [_]
;; Exactly three worker calls are expected; the second one is inspected
;; as the rows chunk.
(is (= 3 (count @import-calls)))
(let [[chunk-op imported-rows _ import-id] (second @import-calls)]
(is (= :thread-api/db-sync-import-rows-chunk chunk-op))
(is (= rows imported-rows))
(is (= "import-1" import-id)))
(done)))
(p/catch (fn [error]
(reset! state/*db-worker worker-prev)
(set! js/fetch original-fetch)
(is false (str error))
(done)))
(p/finally (fn []
(reset! state/*db-worker worker-prev)))))))
(deftest rtc-download-graph-streams-gzip-snapshot-test
(async done
(let [import-calls (atom [])
rows [[1 "content-1" "addresses-1"]
[2 "content-2" "addresses-2"]]
framed-bytes (encode-framed-rows rows)
datoms [{:e 1 :a :db/ident :v :logseq.class/Page :tx 1 :added true}
{:e 2 :a :block/title :v "hello" :tx 1 :added true}]
jsonl-bytes (encode-datoms-jsonl datoms)
original-fetch js/fetch
stream-url "http://base/sync/graph-1/snapshot/stream"
download-url "http://base/sync/graph-1/snapshot/download"
asset-url "http://base/assets/graph-1/snapshot-1.snapshot"
worker-prev @state/*db-worker]
(reset! state/*db-worker nil)
(-> (p/let [gzip-bytes (<gzip-bytes framed-bytes)
(-> (p/let [gzip-bytes (<gzip-bytes jsonl-bytes)
stream (bytes->stream gzip-bytes 3)]
(set! js/fetch
(fn [url opts]
(let [method (or (aget opts "method") "GET")]
(cond
(and (= url stream-url) (= method "GET"))
(and (= url asset-url) (= method "GET"))
(js/Promise.resolve
#js {:ok true
:status 200
@@ -388,6 +322,12 @@
(string/ends-with? url "/pull")
(p/resolved {:t 42})
(= url download-url)
(p/resolved {:ok true
:url asset-url
:key "graph-1/snapshot-1.snapshot"
:content-encoding "gzip"})
:else
(p/rejected (ex-info "unexpected fetch-json URL"
{:url url}))))
@@ -404,9 +344,9 @@
(p/finally (fn [] (set! js/fetch original-fetch)))))
(p/then (fn [_]
(is (= 3 (count @import-calls)))
(let [[chunk-op imported-rows _ import-id] (second @import-calls)]
(is (= :thread-api/db-sync-import-rows-chunk chunk-op))
(is (= rows imported-rows))
(let [[chunk-op imported-datoms _ import-id] (second @import-calls)]
(is (= :thread-api/db-sync-import-datoms-chunk chunk-op))
(is (= datoms imported-datoms))
(is (= "import-1" import-id)))
(done)))
(p/catch (fn [error]

View File

@@ -57,7 +57,7 @@
(is (nil? (get @search/fuzzy-search-indices test-repo)))
(is (nil? (get @worker-state/*sqlite-conns test-repo)))))))
(deftest import-datoms-to-db-invalidates-existing-search-db-test
(deftest complete-datoms-import-invalidates-existing-search-db-test
(async done
(restoring-worker-state
(fn []
@@ -70,7 +70,7 @@
worker-state/get-sqlite-conn (fn [_repo _type] nil)
client-op/update-local-tx (fn [& _] nil)
shared-service/broadcast-to-clients! (fn [& _] nil)]
(#'db-worker/import-datoms-to-db! test-repo "graph-1" 42 nil))
(#'db-worker/complete-datoms-import! test-repo "graph-1" 42))
(p/then (fn [_]
(is true)
(vreset! thread-api/*thread-apis thread-apis-prev)
@@ -80,16 +80,6 @@
(is false (str error))
(done)))))))))
(defn- fake-import-pool
  "Builds a fake pool object whose `OpfsSAHPoolDb` member hands out stub dbs.

  Each call to `OpfsSAHPoolDb` consumes the next entry of `labels`; the
  returned stub has a no-op `exec` and a `close` that conj's its label onto
  the `closed` atom, so tests can observe which dbs were closed and in what
  order."
  [labels closed]
  (let [pending-labels (atom labels)
        make-db (fn [_path]
                  (let [label (first @pending-labels)]
                    (swap! pending-labels rest)
                    #js {:exec (fn [_] nil)
                         :close (fn [] (swap! closed conj label))}))]
    #js {:OpfsSAHPoolDb make-db}))
(defn- capture-outcome
[thunk]
(try
@@ -99,232 +89,179 @@
(catch :default error
(p/resolved {:error error}))))
(defn- make-snapshot-rows
  "Returns a vector of `n` fake snapshot rows of the form
  `[i \"content-<i>\" \"addresses-<i>\"]` for i from 0 below `n`."
  [n]
  (vec
   (for [idx (range n)]
     [idx (str "content-" idx) (str "addresses-" idx)])))
(defn- with-fake-create-or-open-db
  "Runs `f` with `:thread-api/create-or-open-db` replaced by a stub.

  The stub ignores its arguments, registers `conn` under `repo` in
  `worker-state/*datascript-conns`, and resolves to nil. `f` is a
  zero-argument function expected to return a promise.

  The previous thread-api table is restored when that promise settles, and
  also when `f` throws synchronously before returning one, so the stub cannot
  leak into later tests. Returns the promise produced by `f`, chained with
  the restore step."
  [repo conn f]
  (let [thread-apis-prev @thread-api/*thread-apis
        restore! (fn [] (vreset! thread-api/*thread-apis thread-apis-prev))]
    (vreset! thread-api/*thread-apis
             (assoc thread-apis-prev
                    :thread-api/create-or-open-db
                    (fn [_repo _opts]
                      (swap! worker-state/*datascript-conns assoc repo conn)
                      (p/resolved nil))))
    (try
      (p/finally (f) restore!)
      (catch :default error
        ;; `f` failed before producing a promise, so `p/finally` never got a
        ;; chance to run: undo the stub here and let the error propagate.
        (restore!)
        (throw error)))))
(def sample-datoms
  "Two datom maps shared by the import tests: a `:db/ident` datom for
  entity 1 and a `:block/title` datom for entity 2."
  [{:e 1
    :a :db/ident
    :v :logseq.class/Page
    :tx 1
    :added true}
   {:e 2
    :a :block/title
    :v "hello"
    :tx 1
    :added true}])
(deftest db-sync-import-prepare-replaces-active-import-state-test
(async done
(restoring-worker-state
(fn []
(let [closed (atom [])
prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)]
(-> (p/with-redefs [db-worker/<get-opfs-pool (fn [_] (p/resolved (fake-import-pool [:first :second] closed)))
common-sqlite/create-kvs-table! (fn [_] nil)
db-worker/enable-sqlite-wal-mode! (fn [_] nil)]
(p/let [first-import (prepare test-repo false "graph-1" false)
second-import (prepare test-repo false "graph-1" false)]
(is (map? first-import))
(is (map? second-import))
(is (not= (:import-id first-import) (:import-id second-import)))
(is (= [:first] @closed))))
(p/then (fn [_] (done)))
(p/catch (fn [error]
(is false (str error))
(done)))))))))
(let [prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)
conn-a (d/create-conn db-schema/schema)
conn-b (d/create-conn db-schema/schema)]
(with-fake-create-or-open-db
test-repo conn-a
(fn []
(-> (p/with-redefs [db-worker/close-db! (fn [_] nil)
db-worker/<invalidate-search-db! (fn [_] (p/resolved nil))]
(p/let [first-import (prepare test-repo true "graph-1" false)
_ (swap! worker-state/*datascript-conns assoc test-repo conn-b)
second-import (prepare test-repo true "graph-1" false)]
(is (map? first-import))
(is (map? second-import))
(is (not= (:import-id first-import) (:import-id second-import)))))
(p/then (fn [_] (done)))
(p/catch (fn [error]
(is false (str error))
(done)))))))))))
(deftest db-sync-import-prepare-cleans-up-failed-setup-test
(deftest db-sync-import-datoms-chunk-rejects-stale-import-id-test
(async done
(restoring-worker-state
(fn []
(let [closed (atom [])
setup-calls (atom 0)
prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)]
(-> (p/with-redefs [db-worker/<get-opfs-pool (fn [_] (p/resolved (fake-import-pool [:failed :retry] closed)))
common-sqlite/create-kvs-table! (fn [_]
(if (zero? @setup-calls)
(do
(swap! setup-calls inc)
(throw (ex-info "setup failed" {})))
nil))
db-worker/enable-sqlite-wal-mode! (fn [_] nil)]
(p/let [failed-outcome (capture-outcome #(prepare test-repo false "graph-1" false))
retry-import (prepare test-repo false "graph-1" false)]
(is (= "setup failed" (some-> failed-outcome :error ex-message)))
(is (= [:failed] @closed))
(is (map? retry-import))
(is (:import-id retry-import))
(done)))
(p/catch (fn [error]
(is false (str error))
(done)))))))))
(let [prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)
datoms-chunk (@thread-api/*thread-apis :thread-api/db-sync-import-datoms-chunk)
conn (d/create-conn db-schema/schema)]
(with-fake-create-or-open-db
test-repo conn
(fn []
(-> (p/with-redefs [db-worker/close-db! (fn [_] nil)
db-worker/<invalidate-search-db! (fn [_] (p/resolved nil))
rtc-log-and-state/rtc-log (fn [& _] nil)]
(p/let [first-import (prepare test-repo true "graph-1" false)
second-import (prepare test-repo true "graph-1" false)
stale-outcome (capture-outcome #(datoms-chunk sample-datoms "graph-1" (:import-id first-import)))]
(is (= :db-sync/stale-import (some-> stale-outcome :error ex-data :type)))
(is (nil? (d/entity @conn 2)))
(-> (datoms-chunk sample-datoms "graph-1" (:import-id second-import))
(p/then (fn [_]
(is (= "hello" (:block/title (d/entity @conn 2))))
(done))))))
(p/catch (fn [error]
(is false (str error))
(done)))))))))))
(deftest db-sync-import-rows-chunk-rejects-stale-import-id-test
(deftest db-sync-import-datoms-chunk-imports-plain-datoms-to-active-db-test
(async done
(restoring-worker-state
(fn []
(let [closed (atom [])
upserts (atom [])
prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)
rows-chunk (@thread-api/*thread-apis :thread-api/db-sync-import-rows-chunk)
finalize (@thread-api/*thread-apis :thread-api/db-sync-import-finalize)]
(-> (p/with-redefs [db-worker/<get-opfs-pool (fn [_] (p/resolved (fake-import-pool [:first :second] closed)))
common-sqlite/create-kvs-table! (fn [_] nil)
db-worker/enable-sqlite-wal-mode! (fn [_] nil)
db-worker/upsert-addr-content! (fn [db binds]
(swap! upserts conj {:db db :binds binds}))
rtc-log-and-state/rtc-log (fn [& _] nil)
db-worker/import-datoms-to-db! (fn [& _] (p/resolved nil))]
(p/let [first-import (prepare test-repo false "graph-1" false)
second-import (prepare test-repo false "graph-1" false)
stale-outcome (capture-outcome #(rows-chunk [[1 "content-1" "addresses-1"]] "graph-1" (:import-id first-import)))]
(is (= :db-sync/stale-import (some-> stale-outcome :error ex-data :type)))
(is (empty? @upserts))
(-> (rows-chunk [[2 "content-2" "addresses-2"]] "graph-1" (:import-id second-import))
(p/then (fn [_]
(is (= 1 (count @upserts)))
(finalize test-repo "graph-1" 42 (:import-id second-import))))
(p/then (fn [_] (done))))))
(p/catch (fn [error]
(is false (str error))
(done)))))))))
(let [prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)
datoms-chunk (@thread-api/*thread-apis :thread-api/db-sync-import-datoms-chunk)
conn (d/create-conn db-schema/schema)]
(with-fake-create-or-open-db
test-repo conn
(fn []
(-> (p/with-redefs [db-worker/close-db! (fn [_] nil)
db-worker/<invalidate-search-db! (fn [_] (p/resolved nil))
rtc-log-and-state/rtc-log (fn [& _] nil)]
(p/let [{:keys [import-id]} (prepare test-repo true "graph-1" false)
_ (datoms-chunk sample-datoms "graph-1" import-id)]
(is (= :logseq.class/Page (:db/ident (d/entity @conn 1))))
(is (= "hello" (:block/title (d/entity @conn 2))))
(done)))
(p/catch (fn [error]
(is false (str error))
(done)))))))))))
(deftest db-sync-import-rows-chunk-imports-plain-rows-in-a-single-write-batch-test
(deftest db-sync-import-datoms-chunk-imports-encrypted-datoms-to-active-db-test
(async done
(restoring-worker-state
(fn []
(let [closed (atom [])
upserts (atom [])
rows (make-snapshot-rows 250)
prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)
rows-chunk (@thread-api/*thread-apis :thread-api/db-sync-import-rows-chunk)]
(-> (p/with-redefs [db-worker/<get-opfs-pool (fn [_] (p/resolved (fake-import-pool [:plain] closed)))
common-sqlite/create-kvs-table! (fn [_] nil)
db-worker/enable-sqlite-wal-mode! (fn [_] nil)
db-worker/upsert-addr-content! (fn [db binds]
(swap! upserts conj {:db db :binds binds}))
rtc-log-and-state/rtc-log (fn [& _] nil)]
(p/let [{:keys [import-id]} (prepare test-repo false "graph-1" false)
_ (rows-chunk rows "graph-1" import-id)]
(is (= 1 (count @upserts)))
(is (= (count rows) (count (:binds (first @upserts)))))
(done)))
(p/catch (fn [error]
(is false (str error))
(done)))))))))
(deftest db-sync-import-rows-chunk-imports-encrypted-rows-in-a-single-write-batch-test
(async done
(restoring-worker-state
(fn []
(let [closed (atom [])
upserts (atom [])
decrypt-calls (atom [])
rows (make-snapshot-rows 250)
prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)
rows-chunk (@thread-api/*thread-apis :thread-api/db-sync-import-rows-chunk)]
(-> (p/with-redefs [db-worker/<get-opfs-pool (fn [_] (p/resolved (fake-import-pool [:encrypted] closed)))
common-sqlite/create-kvs-table! (fn [_] nil)
db-worker/enable-sqlite-wal-mode! (fn [_] nil)
sync-crypt/<fetch-graph-aes-key-for-download (fn [_] (p/resolved :aes-key))
sync-crypt/<decrypt-snapshot-rows-batch (fn [aes-key rows-batch]
(swap! decrypt-calls conj {:aes-key aes-key
:rows rows-batch})
(p/resolved rows-batch))
db-worker/upsert-addr-content! (fn [db binds]
(swap! upserts conj {:db db :binds binds}))
rtc-log-and-state/rtc-log (fn [& _] nil)]
(p/let [{:keys [import-id]} (prepare test-repo false "graph-1" true)
_ (rows-chunk rows "graph-1" import-id)]
(is (= 1 (count @decrypt-calls)))
(is (= rows (:rows (first @decrypt-calls))))
(is (= 1 (count @upserts)))
(is (= (count rows) (count (:binds (first @upserts)))))
(done)))
(p/catch (fn [error]
(is false (str error))
(done)))))))))
(let [prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)
datoms-chunk (@thread-api/*thread-apis :thread-api/db-sync-import-datoms-chunk)
conn (d/create-conn db-schema/schema)
decrypt-calls (atom [])]
(with-fake-create-or-open-db
test-repo conn
(fn []
(-> (p/with-redefs [db-worker/close-db! (fn [_] nil)
db-worker/<invalidate-search-db! (fn [_] (p/resolved nil))
rtc-log-and-state/rtc-log (fn [& _] nil)
sync-crypt/<fetch-graph-aes-key-for-download (fn [_] (p/resolved :aes-key))
sync-crypt/<decrypt-snapshot-datoms-batch (fn [aes-key datoms]
(swap! decrypt-calls conj {:aes-key aes-key
:datoms datoms})
(p/resolved datoms))]
(p/let [{:keys [import-id]} (prepare test-repo true "graph-1" true)
_ (datoms-chunk sample-datoms "graph-1" import-id)]
(is (= 1 (count @decrypt-calls)))
(is (= sample-datoms (:datoms (first @decrypt-calls))))
(is (= "hello" (:block/title (d/entity @conn 2))))
(done)))
(p/catch (fn [error]
(is false (str error))
(done)))))))))))
(deftest db-sync-import-finalize-rejects-stale-import-id-test
(async done
(restoring-worker-state
(fn []
(let [closed (atom [])
prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)
(let [prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)
finalize (@thread-api/*thread-apis :thread-api/db-sync-import-finalize)
conn (d/create-conn db-schema/schema)
finalized (atom [])]
(-> (p/with-redefs [db-worker/<get-opfs-pool (fn [_] (p/resolved (fake-import-pool [:first :second] closed)))
common-sqlite/create-kvs-table! (fn [_] nil)
db-worker/enable-sqlite-wal-mode! (fn [_] nil)
db-worker/import-datoms-to-db! (fn [& args]
(swap! finalized conj args)
(p/resolved nil))]
(p/let [first-import (prepare test-repo false "graph-1" false)
second-import (prepare test-repo false "graph-1" false)
stale-outcome (capture-outcome #(finalize test-repo "graph-1" 42 (:import-id first-import)))]
(is (= :db-sync/stale-import (some-> stale-outcome :error ex-data :type)))
(is (empty? @finalized))
(-> (finalize test-repo "graph-1" 42 (:import-id second-import))
(p/then (fn [_]
(is (= 1 (count @finalized)))
(done))))))
(p/catch (fn [error]
(is false (str error))
(done)))))))))
(with-fake-create-or-open-db
test-repo conn
(fn []
(-> (p/with-redefs [db-worker/close-db! (fn [_] nil)
db-worker/<invalidate-search-db! (fn [_] (p/resolved nil))
db-sync/rehydrate-large-titles-from-db! (fn [& _] (swap! finalized conj :rehydrate) (p/resolved nil))
rtc-log-and-state/rtc-log (fn [& _] nil)
client-op/update-local-tx (fn [& _] (swap! finalized conj :local-tx))
shared-service/broadcast-to-clients! (fn [& _] (swap! finalized conj :broadcast))]
(p/let [first-import (prepare test-repo true "graph-1" false)
second-import (prepare test-repo true "graph-1" false)
stale-outcome (capture-outcome #(finalize test-repo "graph-1" 42 (:import-id first-import)))]
(is (= :db-sync/stale-import (some-> stale-outcome :error ex-data :type)))
(is (empty? @finalized))
(-> (finalize test-repo "graph-1" 42 (:import-id second-import))
(p/then (fn [_]
(is (= [:rehydrate :local-tx :broadcast] @finalized))
(done))))))
(p/catch (fn [error]
(is false (str error))
(done)))))))))))
(deftest db-sync-import-finalize-rebuilds-into-fresh-db-for-e2ee-import-test
(deftest db-sync-import-finalize-completes-active-db-import-test
(async done
(restoring-worker-state
(fn []
(let [closed (atom [])
removed (atom [])
captured (atom nil)
pool #js {:removeVfs (fn [] (swap! removed conj :removed))}
datoms [{:e 171 :a :block/name :v "$$$views" :tx 1 :added true}]
storage-conn (d/create-conn db-schema/schema)
prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)
finalize (@thread-api/*thread-apis :thread-api/db-sync-import-finalize)]
(reset! worker-state/*opfs-pools {test-repo pool})
(d/transact! storage-conn
(mapv (fn [{:keys [e a v]}]
[:db/add e a v])
datoms))
(-> (p/with-redefs [db-worker/<get-opfs-pool (fn [_] (p/resolved (fake-import-pool [:encrypted] closed)))
common-sqlite/create-kvs-table! (fn [_] nil)
db-worker/enable-sqlite-wal-mode! (fn [_] nil)
sync-crypt/<fetch-graph-aes-key-for-download (fn [_] (p/resolved :aes-key))
common-sqlite/get-storage-conn (fn [_ _] storage-conn)
db-worker/import-datoms-to-db! (fn [& args]
(reset! captured args)
(p/resolved nil))]
(p/let [{:keys [import-id]} (prepare test-repo false "graph-1" true)
_ (finalize test-repo "graph-1" 42 import-id)]
(let [[repo graph-id remote-tx imported-datoms] @captured]
(is (= test-repo repo))
(is (= "graph-1" graph-id))
(is (= 42 remote-tx))
(is (= [[171 :block/name "$$$views"]]
(mapv (fn [d] [(:e d) (:a d) (:v d)]) imported-datoms))))
(is (nil? (get @worker-state/*opfs-pools test-repo)))
(done)))
(p/catch (fn [error]
(is false (str error))
(done)))))))))
(deftest db-sync-import-finalize-keeps-direct-open-for-non-e2ee-import-test
(async done
(restoring-worker-state
(fn []
(let [closed (atom [])
removed (atom [])
captured (atom nil)
pool #js {:removeVfs (fn [] (swap! removed conj :removed))}
prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)
finalize (@thread-api/*thread-apis :thread-api/db-sync-import-finalize)]
(reset! worker-state/*opfs-pools {test-repo pool})
(-> (p/with-redefs [db-worker/<get-opfs-pool (fn [_] (p/resolved (fake-import-pool [:plain] closed)))
common-sqlite/create-kvs-table! (fn [_] nil)
db-worker/enable-sqlite-wal-mode! (fn [_] nil)
db-worker/import-datoms-to-db! (fn [& args]
(reset! captured args)
(p/resolved nil))]
(p/let [{:keys [import-id]} (prepare test-repo false "graph-1" false)
_ (finalize test-repo "graph-1" 42 import-id)]
(is (= [test-repo "graph-1" 42 nil] @captured))
(is (empty? @removed))
(done)))
(p/catch (fn [error]
(is false (str error))
(done)))))))))
(let [prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)
datoms-chunk (@thread-api/*thread-apis :thread-api/db-sync-import-datoms-chunk)
finalize (@thread-api/*thread-apis :thread-api/db-sync-import-finalize)
conn (d/create-conn db-schema/schema)
search-db #js {:close (fn [] nil)}
main-db #js {:exec (fn [_sql] nil)}]
(reset! worker-state/*sqlite-conns {test-repo {:db main-db :search search-db :client-ops nil}})
(with-fake-create-or-open-db
test-repo conn
(fn []
(-> (p/with-redefs [db-worker/close-db! (fn [_] nil)
db-worker/<invalidate-search-db! (fn [_] (p/resolved nil))
db-sync/rehydrate-large-titles-from-db! (fn [& _] (p/resolved nil))
rtc-log-and-state/rtc-log (fn [& _] nil)
client-op/update-local-tx (fn [& _] nil)
shared-service/broadcast-to-clients! (fn [& _] nil)]
(p/let [{:keys [import-id]} (prepare test-repo true "graph-1" false)
_ (datoms-chunk sample-datoms "graph-1" import-id)
_ (finalize test-repo "graph-1" 42 import-id)]
(is (= :logseq.class/Page (:db/ident (d/entity @conn 1))))
(is (= "hello" (:block/title (d/entity @conn 2))))
(done)))
(p/catch (fn [error]
(is false (str error))
(done)))))))))))

View File

@@ -11,6 +11,21 @@
(ldb/write-transit-str encrypted)))
;; bb dev:test -v frontend.worker.sync.crypt-test works, however bb dev:lint-and-test failed
;; Round-trip check: values encrypted with `<encrypt-text-for-snapshot` come
;; back as plain text from `sync-crypt/<decrypt-snapshot-datoms-batch` for
;; both :block/title and :block/name datoms.
(deftest decrypt-snapshot-datoms-test
  (async done
    (let [fail! (fn [e]
                  (is false (str e))
                  (done))]
      (-> (p/let [aes-key (crypt/<generate-aes-key)
                  title-cipher (<encrypt-text-for-snapshot aes-key "Title")
                  name-cipher (<encrypt-text-for-snapshot aes-key "name")
                  encrypted-datoms [{:e 1 :a :block/title :v title-cipher :tx 1000 :added true}
                                    {:e 1 :a :block/name :v name-cipher :tx 1000 :added true}]
                  decrypted (sync-crypt/<decrypt-snapshot-datoms-batch aes-key encrypted-datoms)]
            (let [[title-datom name-datom] decrypted]
              (is (= "Title" (:v title-datom)))
              (is (= "name" (:v name-datom)))
              (done)))
          (p/catch fail!)))))
(deftest ^:fix-me decrypt-snapshot-rows-test
(async done
(-> (p/let [aes-key (crypt/<generate-aes-key)