diff --git a/deps/db-sync/src/logseq/db_sync/common.cljs b/deps/db-sync/src/logseq/db_sync/common.cljs index 521b3520dd..f189afccdb 100644 --- a/deps/db-sync/src/logseq/db_sync/common.cljs +++ b/deps/db-sync/src/logseq/db_sync/common.cljs @@ -8,7 +8,7 @@ #js {"Access-Control-Allow-Origin" "*" "Access-Control-Allow-Headers" "content-type,content-encoding,authorization,x-amz-meta-checksum,x-amz-meta-type" "Access-Control-Allow-Methods" "GET,POST,PUT,DELETE,OPTIONS,HEAD" - "Access-Control-Expose-Headers" "content-type,content-encoding,content-length,cache-control,x-asset-type,x-snapshot-row-count"}) + "Access-Control-Expose-Headers" "content-type,content-encoding,content-length,cache-control,x-asset-type,x-asset-size,x-snapshot-datom-count"}) (defn json-response ([data] (json-response data 200)) diff --git a/deps/db-sync/src/logseq/db_sync/snapshot.cljs b/deps/db-sync/src/logseq/db_sync/snapshot.cljs index 74ab9b06a9..4174f166f1 100644 --- a/deps/db-sync/src/logseq/db_sync/snapshot.cljs +++ b/deps/db-sync/src/logseq/db_sync/snapshot.cljs @@ -4,6 +4,7 @@ (def ^:private transit-w (transit/writer :json)) (def ^:private transit-r (transit/reader :json)) (def ^:private text-decoder (js/TextDecoder.)) +(def ^:private newline-byte 10) (defn- ->uint8 [data] @@ -77,3 +78,55 @@ (+ total 4 (.-byteLength payload)))) 0 rows-batches)) + +(defn- decode-transit + [payload] + (transit/read transit-r (.decode text-decoder (->uint8 payload)))) + +(defn encode-datoms-jsonl + [datoms] + (->uint8 + (apply str + (map (fn [datom] + (str (transit/write transit-w datom) "\n")) + datoms)))) + +(defn- find-newline-offset + [^js data start total] + (loop [offset start] + (cond + (>= offset total) + nil + + (= newline-byte (aget data offset)) + offset + + :else + (recur (inc offset))))) + +(defn parse-datoms-jsonl-chunk + [buffer chunk] + (let [data (concat-bytes buffer chunk) + total (.-byteLength data)] + (loop [offset 0 + datoms []] + (let [newline-offset (find-newline-offset data offset total)] + (if (number? newline-offset) + (let [line (.slice data offset newline-offset) + next-offset (inc newline-offset) + datoms (if (zero? (.-byteLength line)) + datoms + (conj datoms (decode-transit line)))] + (recur next-offset datoms)) + {:datoms datoms + :buffer (when (< offset total) + (.slice data offset total))}))))) + +(defn finalize-datoms-jsonl-buffer + [buffer] + (if (or (nil? buffer) (zero? (.-byteLength buffer))) + [] + (let [{:keys [datoms buffer]} (parse-datoms-jsonl-chunk nil buffer)] + (cond-> datoms + (and buffer (pos? (.-byteLength buffer))) + (conj (decode-transit buffer)))))) diff --git a/deps/db-sync/src/logseq/db_sync/worker/handler/assets.cljs b/deps/db-sync/src/logseq/db_sync/worker/handler/assets.cljs index 47d280e16c..fcbc453a8f 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/handler/assets.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/handler/assets.cljs @@ -2,7 +2,8 @@ (:require [cljs-bean.core :as bean] [clojure.string :as string] [logseq.db-sync.common :as common :refer [cors-headers]] - [logseq.db-sync.worker.http :as http])) + [logseq.db-sync.worker.http :as http] + [promesa.core :as p])) (def ^:private max-asset-size (* 100 1024 1024)) @@ -26,6 +27,69 @@ (.-readable fixed)) body)) +(defn- body .-pipeTo))) + (p/resolved (maybe-fixed-length-body body size)) + + ;; Some runtimes drop content-length for streamed bodies without a fixed-length wrapper. + ;; Buffer as a fallback so clients still receive the header. + (and (number? size) + (fn? (some-> body .-getReader))) + (p/let [resp (js/Response. body) + buf (.arrayBuffer resp)] + buf) + + :else + (p/resolved body))) + +(defn- handle-get-asset + [^js bucket key asset-type] + (.then (.get bucket key) + (fn [^js obj] + (if (nil? obj) + (http/error-response "not found" 404) + (let [metadata (.-httpMetadata obj) + content-type (or (.-contentType metadata) + "application/octet-stream") + content-encoding (.-contentEncoding metadata) + cache-control (.-cacheControl metadata) + size (parse-size (or (.-size obj) + (some-> (.-body obj) .-byteLength))) + content-length (cond + (number? size) (str size) + (string? size) size + :else nil)] + (p/let [body ( {"content-type" content-type + "x-asset-type" asset-type} + (and (string? content-length) + (pos? (.-length content-length))) + (assoc "content-length" content-length) + (and (string? content-length) + (pos? (.-length content-length))) + (assoc "x-asset-size" content-length) + (and (string? content-encoding) + (not= content-encoding "null") + (pos? (.-length content-encoding))) + (assoc "content-encoding" content-encoding) + (and (string? cache-control) + (pos? (.-length cache-control))) + (assoc "cache-control" cache-control) + true + (bean/->js))] + (js/Response. body + #js {:status 200 + :headers (js/Object.assign + headers + (cors-headers))}))))))) + (defn parse-asset-path [path] (let [prefix "/assets/"] (when (string/starts-with? path prefix) @@ -57,41 +121,7 @@ (http/error-response "missing assets bucket" 500) (case method "GET" - (.then (.get bucket key) - (fn [^js obj] - (if (nil? obj) - (http/error-response "not found" 404) - (let [metadata (.-httpMetadata obj) - content-type (or (.-contentType metadata) - "application/octet-stream") - content-encoding (.-contentEncoding metadata) - cache-control (.-cacheControl metadata) - size (parse-size (or (.-size obj) - (some-> (.-body obj) .-byteLength))) - content-length (cond - (number? size) (str size) - (string? size) size - :else nil) - body (maybe-fixed-length-body (.-body obj) size) - headers (cond-> {"content-type" content-type - "x-asset-type" asset-type} - (and (string? content-length) - (pos? (.-length content-length))) - (assoc "content-length" content-length) - (and (string? content-encoding) - (not= content-encoding "null") - (pos? (.-length content-encoding))) - (assoc "content-encoding" content-encoding) - (and (string? cache-control) - (pos? (.-length cache-control))) - (assoc "cache-control" cache-control) - true - (bean/->js))] - (js/Response. body - #js {:status 200 - :headers (js/Object.assign - headers - (cors-headers))}))))) + (handle-get-asset bucket key asset-type) "PUT" (.then (.arrayBuffer request) diff --git a/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs b/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs index b6d491602d..8adae015c2 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs @@ -16,7 +16,7 @@ (def ^:private snapshot-download-batch-size 10000) (def ^:private snapshot-cache-control "private, max-age=300") -(def ^:private snapshot-content-type "application/transit+json") +(def ^:private snapshot-content-type "application/x-ndjson") (def ^:private snapshot-content-encoding "gzip") (def ^:private snapshot-uploading-meta-key :snapshot-uploading?) ;; 10m @@ -57,35 +57,6 @@ (ensure-schema! self) (not= "true" (storage/get-meta (.-sql self) snapshot-uploading-meta-key))) -(defn- fetch-kvs-rows - [sql after limit] - (common/get-sql-rows - (common/sql-exec sql - "select addr, content, addresses from kvs where addr > ? order by addr asc limit ?" - after - limit))) - -(defn- snapshot-row-count - [sql] - (let [row (first (common/get-sql-rows - (common/sql-exec sql "select count(*) as total from kvs")))] - (cond - (array? row) - (aget row 0) - - (some? row) - (or (aget row "total") - (aget row "count(*)") - 0) - - :else - 0))) - -(defn- snapshot-row->tuple [row] - (if (array? row) - [(aget row 0) (aget row 1) (aget row 2)] - [(aget row "addr") (aget row "content") (aget row "addresses")])) - (defn- import-snapshot-rows! [sql table rows] (when (seq rows) @@ -121,9 +92,10 @@ stream)) (defn- maybe-compress-stream [stream] - (if (exists? js/CompressionStream) - (.pipeThrough stream (js/CompressionStream. snapshot-content-encoding)) - stream)) + (when-not (exists? js/CompressionStream) + (throw (ex-info "gzip compression not supported" + {:type :db-sync/compression-not-supported}))) + (.pipeThrough stream (js/CompressionStream. snapshot-content-encoding))) (defn- jsonl-datom + [datom] + {:e (:e datom) + :a (:a datom) + :v (:v datom) + :tx (:tx datom) + :added (:added datom)}) + +(defn- snapshot-datom-count + [conn] + (count (d/datoms @conn :eavt))) + +(defn- snapshot-export-datoms + [conn] + (let [db @conn + schema-version-eid (some-> (d/entity db :logseq.kv/schema-version) :db/id) + ident-eids (into #{} + (map :e) + (d/datoms db :avet :db/ident)) + jsonl-datoms (fn [pred] + (sequence + (comp (filter pred) + (map snapshot-datom->jsonl-datom)) + (d/datoms db :eavt)))] + (concat (jsonl-datoms #(= schema-version-eid (:e %))) + (jsonl-datoms #(and (contains? ident-eids (:e %)) + (not= schema-version-eid (:e %)))) + (jsonl-datoms #(not (contains? ident-eids (:e %))))))) + (defn- snapshot-export-stream [^js self] - (let [sql (.-sql self) - state (volatile! {:after -1 :done? false})] + (ensure-conn! self) + (let [remaining (volatile! (seq (snapshot-export-datoms (.-conn self))))] (js/ReadableStream. #js {:pull (fn [controller] - (p/let [{:keys [after done?]} @state] - (if done? + (let [batch (vec (take snapshot-download-batch-size @remaining))] + (if (empty? batch) (.close controller) - (let [rows (fetch-kvs-rows sql after snapshot-download-batch-size) - rows (mapv snapshot-row->tuple rows) - last-addr (if (seq rows) - (apply max (map first rows)) - after) - done? (< (count rows) snapshot-download-batch-size)] - (when (seq rows) - (let [payload (snapshot/encode-rows rows) - framed (snapshot/frame-bytes payload)] - (.enqueue controller framed))) - (vswap! state assoc :after last-addr :done? done?)))))}))) + (let [remaining' (drop snapshot-download-batch-size @remaining) + payload (snapshot/encode-datoms-jsonl batch)] + (vreset! remaining (seq remaining')) + (.enqueue controller payload)))))}))) (defn- upload-multipart! [^js bucket key stream opts] @@ -203,42 +197,6 @@ (.abort upload) (throw error))))))) -(defn- snapshot-export-length [^js self] - (let [sql (.-sql self)] - (p/loop [after -1 - total 0] - (let [rows (fetch-kvs-rows sql after snapshot-download-batch-size)] - (if (empty? rows) - total - (let [rows (mapv snapshot-row->tuple rows) - payload (snapshot/encode-rows rows) - total (+ total 4 (.-byteLength payload)) - last-addr (apply max (map first rows)) - done? (< (count rows) snapshot-download-batch-size)] - (if done? - total - (p/recur last-addr total)))))))) - -(defn- snapshot-export-fixed-length [^js self] - (p/let [length (snapshot-export-length self) - stream (snapshot-export-stream self)] - (if (exists? js/FixedLengthStream) - (let [^js fixed (js/FixedLengthStream. length) - readable (.-readable fixed) - writable (.-writable fixed) - reader (.getReader stream) - writer (.getWriter writable)] - (p/let [_ (p/loop [] - (p/let [chunk (.read reader)] - (if (.-done chunk) - (.close writer) - (p/let [_ (.write writer (.-value chunk))] - (p/recur)))))] - readable)) - (p/let [resp (js/Response. stream) - buf (.arrayBuffer resp)] - buf)))) - (declare import-snapshot!) (defn- import-snapshot-stream! [^js self stream reset?] (let [reader (.getReader stream) @@ -370,19 +328,17 @@ (let [graph-id (graph-id-from-request request)] (if (not (seq graph-id)) (http/bad-request "missing graph id") - (let [use-compression? (exists? js/CompressionStream) - content-encoding (when use-compression? snapshot-content-encoding) - row-count (snapshot-row-count (.-sql self)) - stream (snapshot-export-stream self) - stream (if use-compression? - (maybe-compress-stream stream) - stream)] + (let [stream (-> (snapshot-export-stream self) + (maybe-compress-stream)) + conn (or (.-conn self) + (do (ensure-conn! self) (.-conn self))) + datom-count (snapshot-datom-count conn)] (js/Response. stream #js {:status 200 :headers (js/Object.assign #js {"content-type" snapshot-content-type - "content-encoding" (or content-encoding "identity")} - #js {"x-snapshot-row-count" (str row-count)} + "content-encoding" snapshot-content-encoding} + #js {"x-snapshot-datom-count" (str datom-count)} (common/cors-headers))}))))) (defn- handle-sync-snapshot-download @@ -399,31 +355,24 @@ :else (p/let [snapshot-id (str (random-uuid)) key (snapshot-key graph-id snapshot-id) - use-compression? (exists? js/CompressionStream) - content-encoding (when use-compression? snapshot-content-encoding) - stream (snapshot-export-stream self) - stream (if use-compression? - (maybe-compress-stream stream) - stream) + stream (-> (snapshot-export-stream self) + (maybe-compress-stream)) multipart? (and (some? (.-createMultipartUpload bucket)) (fn? (.-createMultipartUpload bucket))) opts #js {:httpMetadata #js {:contentType snapshot-content-type - :contentEncoding content-encoding + :contentEncoding snapshot-content-encoding :cacheControl snapshot-cache-control} :customMetadata #js {:purpose "snapshot" :created-at (str (common/now-ms))}} _ (if multipart? (upload-multipart! bucket key stream opts) - (if use-compression? - (p/let [body (stream + [^js payload] + (js/ReadableStream. + #js {:start (fn [controller] + (.enqueue controller payload) + (.close controller))})) + (deftest assets-get-includes-content-length-header-test (async done (let [payload (js/Uint8Array. #js [1 2 3 4]) @@ -16,9 +23,38 @@ :httpMetadata #js {:contentType "application/octet-stream"}}))}}] (-> (p/let [resp (assets/handle request env)] (is (= 200 (.-status resp))) - (is (= "4" (.get (.-headers resp) "content-length")))) + (is (= "4" (.get (.-headers resp) "content-length"))) + (is (= "4" (.get (.-headers resp) "x-asset-size")))) (p/then (fn [] (done))) (p/catch (fn [error] (is false (str error)) (done))))))) + +(deftest assets-get-includes-content-length-with-stream-body-without-fixed-length-stream-test + (async done + (let [payload (js/Uint8Array. #js [1 2 3 4]) + request (js/Request. "http://localhost/assets/graph-1/snapshot-1.snapshot" + #js {:method "GET"}) + env #js {:LOGSEQ_SYNC_ASSETS + #js {:get (fn [_key] + (js/Promise.resolve + #js {:body (bytes->stream payload) + :size 4 + :httpMetadata #js {:contentType "application/octet-stream"}}))}} + original-fixed-length-stream (.-FixedLengthStream js/globalThis) + restore! #(aset js/globalThis "FixedLengthStream" original-fixed-length-stream)] + (aset js/globalThis "FixedLengthStream" js/undefined) + (-> (p/let [resp (assets/handle request env) + buf (.arrayBuffer resp)] + (is (= 200 (.-status resp))) + (is (= "4" (.get (.-headers resp) "content-length"))) + (is (= "4" (.get (.-headers resp) "x-asset-size"))) + (is (= 4 (.-byteLength buf)))) + (p/then (fn [] + (restore!) + (done))) + (p/catch (fn [error] + (restore!) + (is false (str error)) + (done))))))) diff --git a/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs b/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs index f81da522fc..a84e9e0ab3 100644 --- a/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs +++ b/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs @@ -3,6 +3,7 @@ [datascript.core :as d] [logseq.db-sync.common :as common] [logseq.db-sync.protocol :as protocol] + [logseq.db-sync.snapshot :as snapshot] [logseq.db-sync.storage :as storage] [logseq.db-sync.test-sql :as test-sql] [logseq.db-sync.worker.handler.sync :as sync-handler] @@ -31,11 +32,19 @@ bucket #js {:put (fn [key body opts] (reset! put-call {:key key :body body :opts opts}) (js/Promise.resolve #js {:ok true}))} + conn (d/create-conn db-schema/schema) self #js {:env #js {:LOGSEQ_SYNC_ASSETS bucket} + :conn conn + :schema-ready true :sql (empty-sql)} {:keys [request url]} (request-url) original-compression-stream (.-CompressionStream js/globalThis) restore! #(aset js/globalThis "CompressionStream" original-compression-stream)] + (d/transact! conn [{:db/ident :logseq.class/Page + :block/title "Page"} + {:db/ident :logseq.kv/schema-version + :kv/value {:major 65 :minor 23}} + {:db/id 2 :block/title "hello"}]) (aset js/globalThis "CompressionStream" (passthrough-compression-stream-constructor)) @@ -45,10 +54,22 @@ :route {:handler :sync/snapshot-download}}) text (.text resp) body (js->clj (js/JSON.parse text) :keywordize-keys true) - http-metadata (aget (:opts @put-call) "httpMetadata")] + http-metadata (aget (:opts @put-call) "httpMetadata") + payload (js/Uint8Array. (:body @put-call)) + {:keys [datoms]} (snapshot/parse-datoms-jsonl-chunk nil payload)] (is (= 200 (.-status resp))) (is (= "gzip" (:content-encoding body))) - (is (= "gzip" (aget http-metadata "contentEncoding")))) + (is (= "gzip" (aget http-metadata "contentEncoding"))) + (is (= "application/x-ndjson" (aget http-metadata "contentType"))) + (is (= 5 (count datoms))) + (is (= [:logseq.kv/schema-version + :logseq.kv/schema-version + :logseq.kv/schema-version + :logseq.class/Page + :logseq.class/Page] + (mapv (fn [{:keys [e]}] + (:db/ident (d/entity @conn e))) + datoms)))) (p/then (fn [] (restore!) (done))) @@ -57,28 +78,36 @@ (is false (str error)) (done))))))) -(deftest snapshot-download-falls-back-to-uncompressed-when-compression-unsupported-test +(deftest snapshot-download-stream-route-returns-jsonl-datoms-test (async done - (let [put-call (atom nil) - bucket #js {:put (fn [key body opts] - (reset! put-call {:key key :body body :opts opts}) - (js/Promise.resolve #js {:ok true}))} - self #js {:env #js {:LOGSEQ_SYNC_ASSETS bucket} + (let [conn (d/create-conn db-schema/schema) + self #js {:env #js {} + :conn conn + :schema-ready true :sql (empty-sql)} - {:keys [request url]} (request-url) + {:keys [request]} (request-url "/sync/graph-1/snapshot/stream?graph-id=graph-1") original-compression-stream (.-CompressionStream js/globalThis) restore! #(aset js/globalThis "CompressionStream" original-compression-stream)] - (aset js/globalThis "CompressionStream" js/undefined) - (-> (p/let [resp (sync-handler/handle {:self self - :request request - :url url - :route {:handler :sync/snapshot-download}}) - text (.text resp) - body (js->clj (js/JSON.parse text) :keywordize-keys true) - http-metadata (aget (:opts @put-call) "httpMetadata")] + (d/transact! conn [{:db/ident :logseq.class/Page + :block/title "Page"} + {:db/ident :logseq.kv/schema-version + :kv/value {:major 65 :minor 23}} + {:db/id 2 :block/title "hello"}]) + (aset js/globalThis + "CompressionStream" + (passthrough-compression-stream-constructor)) + (-> (p/let [resp (sync-handler/handle-http self request) + encoding (.get (.-headers resp) "content-encoding") + content-type (.get (.-headers resp) "content-type") + buf (.arrayBuffer resp) + payload (js/Uint8Array. buf) + datoms (snapshot/finalize-datoms-jsonl-buffer payload)] (is (= 200 (.-status resp))) - (is (nil? (:content-encoding body))) - (is (nil? (aget http-metadata "contentEncoding")))) + (is (= "gzip" encoding)) + (is (= "application/x-ndjson" content-type)) + (is (= 5 (count datoms))) + (is (= :logseq.kv/schema-version + (:db/ident (d/entity @conn (:e (first datoms))))))) (p/then (fn [] (restore!) (done))) @@ -87,22 +116,6 @@ (is false (str error)) (done))))))) -(deftest snapshot-stream-route-works-via-handle-http-test - (async done - (let [self #js {:env #js {} - :sql (empty-sql)} - {:keys [request]} (request-url "/sync/graph-1/snapshot/stream?graph-id=graph-1")] - (-> (p/let [resp (sync-handler/handle-http self request) - encoding (.get (.-headers resp) "content-encoding")] - (is (= 200 (.-status resp))) - (is (= "application/transit+json" (.get (.-headers resp) "content-type"))) - (is (contains? #{"gzip" "identity"} encoding))) - (p/then (fn [] - (done))) - (p/catch (fn [error] - (is false (str error)) - (done))))))) - (deftest ensure-schema-fallback-probes-existing-tables-test (async done (let [self #js {:sql (empty-sql)} diff --git a/docs/agent-guide/db-sync/protocol.md b/docs/agent-guide/db-sync/protocol.md index 3800a5f24f..0207c4f4e9 100644 --- a/docs/agent-guide/db-sync/protocol.md +++ b/docs/agent-guide/db-sync/protocol.md @@ -93,9 +93,9 @@ - `GET /sync/:graph-id/snapshot/download` - Build a snapshot file in R2 and return a download URL. - Response: `{"ok":true,"key":"/.snapshot","url":"/assets/:graph-id/.snapshot","content-encoding":"gzip"}`. - - The snapshot file is a framed Transit JSON stream of kvs rows, optionally gzip-compressed. + - The snapshot file stored in R2 is a gzip-compressed NDJSON stream of full Datascript datoms. Each line is a Transit JSON datom map: `{e,a,v,tx,added}`. - `POST /sync/:graph-id/snapshot/upload?reset=true|false` - - Upload a snapshot stream (framed Transit JSON, optionally gzip-compressed). The server imports rows into kvs. + - Upload a snapshot stream for bootstrap import. Current upload format remains framed Transit JSON kvs rows, optionally gzip-compressed. - Request body: binary stream; headers should include `content-type: application/transit+json` and `content-encoding: gzip` when compressed. - Response: `{"ok":true,"count":,"key":"/.snapshot"}`. - Error response (400): `{"error":"missing body"|"missing graph id"}`. diff --git a/src/main/frontend/handler/db_based/sync.cljs b/src/main/frontend/handler/db_based/sync.cljs index e4f69ff549..903fa89f5b 100644 --- a/src/main/frontend/handler/db_based/sync.cljs +++ b/src/main/frontend/handler/db_based/sync.cljs @@ -12,6 +12,7 @@ [lambdaisland.glogi :as log] [logseq.db :as ldb] [logseq.db-sync.malli-schema :as db-sync-schema] + [logseq.db-sync.snapshot :as snapshot] [logseq.db.sqlite.util :as sqlite-util] [promesa.core :as p])) @@ -46,15 +47,6 @@ (when-not (js/isNaN parsed) parsed)))) -(def ^:private snapshot-text-decoder (js/TextDecoder.)) - -(defn- decode-snapshot-rows [payload] - (sqlite-util/read-transit-str (.decode snapshot-text-decoder (->uint8 payload)))) - -(defn- frame-len [^js data offset] - (let [view (js/DataView. (.-buffer data) offset 4)] - (.getUint32 view 0 false))) - (defn- concat-bytes [^js a ^js b] (cond @@ -66,34 +58,6 @@ (.set out b (.-byteLength a)) out))) -(defn- parse-framed-chunk - [buffer chunk] - (let [data (concat-bytes buffer chunk) - total (.-byteLength data)] - (loop [offset 0 - rows []] - (if (< (- total offset) 4) - {:rows rows - :buffer (when (< offset total) - (.slice data offset total))} - (let [len (frame-len data offset) - next-offset (+ offset 4 len)] - (if (<= next-offset total) - (let [payload (.slice data (+ offset 4) next-offset) - decoded (decode-snapshot-rows payload)] - (recur next-offset (into rows decoded))) - {:rows rows - :buffer (.slice data offset total)})))))) - -(defn- finalize-framed-buffer - [buffer] - (if (or (nil? buffer) (zero? (.-byteLength buffer))) - [] - (let [{:keys [rows buffer]} (parse-framed-chunk nil buffer)] - (if (and (seq rows) (or (nil? buffer) (zero? (.-byteLength buffer)))) - rows - (throw (ex-info "incomplete framed buffer" {:buffer buffer :rows rows})))))) - (defn- gzip-bytes? [^js payload] (and (some? payload) @@ -158,17 +122,17 @@ {:chunk-count 1}) {:chunk-count 0})))) -(defn- = (count remaining) batch-size) (let [batch (subvec remaining 0 batch-size) - rest-rows (subvec remaining batch-size)] + rest-datoms (subvec remaining batch-size)] (p/let [_ (on-batch batch)] - (p/recur rest-rows))) + (p/recur rest-datoms))) remaining))) -(defn- uint8 (.-value result))) - pending (into pending rows)] - (p/let [pending (uint8 (.-value result))) + pending (into pending datoms)] + (p/let [pending (js (with-auth-headers {:method "GET"}))) - total-rows (parse-header-int resp "x-snapshot-row-count") _ (state/pub-event! [:rtc/log {:type :rtc.log/download :sub-type :download-progress :graph-uuid graph-uuid - :message (str "Start downloading graph snapshot, total rows: " - (or total-rows "unknown"))}])] + :message "Start downloading graph snapshot"}])] (when-not (.-ok resp) (throw (ex-info "snapshot download failed" {:graph graph-name @@ -445,25 +410,16 @@ (if-let [import-id @import-id*] (p/resolved import-id) (p/let [{:keys [import-id]} (state/ (p/do! (when-let [search-db (worker-state/get-sqlite-conn repo :search)] (search/truncate-table! search-db)) @@ -668,8 +667,6 @@ {:sub-type :download-progress :graph-uuid graph-id :message "Saving data to DB"}) - ((@thread-api/*thread-apis :thread-api/create-or-open-db) repo {:close-other-db? true - :datoms datoms}) (db-sync/rehydrate-large-titles-from-db! repo graph-id) (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :download-completed @@ -721,42 +718,47 @@ (throw (stale-import-ex-info repo graph-id import-id))) state)) -(defn- import-snapshot-rows-batch! - [db aes-key graph-e2ee? rows] - (p/let [rows-batch (if graph-e2ee? - (sync-crypt/sqlite-binds rows-batch)))) +(defn- datom->tx + [{:keys [e a v]}] + [:db/add e a v]) -(defn- import-snapshot-rows! - [db aes-key graph-e2ee? rows] - (let [batches (partition-all db-sync-import-batch-size rows)] - (p/doseq [batch batches] - (import-snapshot-rows-batch! db aes-key graph-e2ee? batch)))) +(defn- import-datoms-batch! + [conn aes-key graph-e2ee? datoms] + (p/let [datoms-batch (if graph-e2ee? + (sync-crypt/tx)) + datoms-batch) + regular-tx-data (into [] (comp (remove #(= :db/ident (:a %))) + (map datom->tx)) + datoms-batch) + tx-data (into ident-tx-data regular-tx-data)] + (when (seq tx-data) + (d/transact! conn tx-data {:sync-download-graph? true})))) (defn- log-import-progress! - [graph-id import-id rows-count] - (when (pos? rows-count) - (let [{:keys [imported-rows total-rows]} + [graph-id import-id datoms-count] + (when (pos? datoms-count) + (let [{:keys [imported-datoms total-datoms]} (swap! *import-state (fn [state] (if (= import-id (:import-id state)) - (update state :imported-rows (fnil + 0) rows-count) + (update state :imported-datoms (fnil + 0) datoms-count) state)))] (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :download-progress :graph-uuid graph-id - :message (if (some? total-rows) - (str "Importing data " imported-rows "/" total-rows) - (str "Importing data " imported-rows))})))) + :message (if (some? total-datoms) + (str "Importing data " imported-datoms "/" total-datoms) + (str "Importing data " imported-datoms))})))) (def-thread-api :thread-api/db-sync-import-prepare - [repo reset? graph-id graph-e2ee? & [total-rows]] - (let [graph-e2ee? (if (nil? graph-e2ee?) true (true? graph-e2ee?)) - opened-db (atom nil) - opened-import-pool (atom nil)] + [repo reset? graph-id graph-e2ee? & [total-datoms]] + (let [graph-e2ee? (if (nil? graph-e2ee?) true (true? graph-e2ee?))] (-> (p/let [_ (when-let [state @*import-state] - (close-import-state! state)) + (close-import-state! state) + (close-db! (:repo state))) _ (reset! *import-state nil) _ (when reset? (close-db! repo)) _ (when reset? ( (p/let [{:keys [db aes-key graph-e2ee?]} (require-import-state! nil graph-id import-id) - _ (import-snapshot-rows! db aes-key graph-e2ee? rows)] - (log-import-progress! graph-id import-id (count rows)) - true) - (p/catch (fn [error] - (when-not (= :db-sync/stale-import (:type (ex-data error))) - (clear-import-state! import-id)) - (throw error))))) - -(def-thread-api :thread-api/db-sync-import-framed-chunk - [chunk graph-id import-id] - (-> (p/let [{:keys [db aes-key graph-e2ee? snapshot-buffer]} (require-import-state! nil graph-id import-id) - {:keys [rows buffer]} (snapshot/parse-framed-chunk snapshot-buffer chunk) - _ (swap! *import-state - (fn [state] - (if (= import-id (:import-id state)) - (assoc state :snapshot-buffer buffer) - state))) - _ (when (seq rows) - (import-snapshot-rows! db aes-key graph-e2ee? rows)) - _ (log-import-progress! graph-id import-id (count rows))] +(def-thread-api :thread-api/db-sync-import-datoms-chunk + [datoms graph-id import-id] + (-> (p/let [{:keys [conn aes-key graph-e2ee?]} (require-import-state! nil graph-id import-id) + _ (import-datoms-batch! conn aes-key graph-e2ee? datoms)] + (log-import-progress! graph-id import-id (count datoms)) true) (p/catch (fn [error] (when-not (= :db-sync/stale-import (:type (ex-data error))) @@ -822,27 +798,8 @@ (def-thread-api :thread-api/db-sync-import-finalize [repo graph-id remote-tx import-id] - (-> (p/let [{:keys [db aes-key graph-e2ee? import-pool snapshot-buffer]} (require-import-state! repo graph-id import-id) - rows (when (and snapshot-buffer (pos? (.-byteLength snapshot-buffer))) - (snapshot/finalize-framed-buffer snapshot-buffer)) - _ (when (seq rows) - (import-snapshot-rows! db aes-key graph-e2ee? rows)) - _ (log-import-progress! graph-id import-id (count rows)) - datoms (when graph-e2ee? - (let [storage (new-sqlite-storage db) - conn (common-sqlite/get-storage-conn storage db-schema/schema) - source-db @conn] - (d/datoms source-db :eavt))) - _ (when-not graph-e2ee? - (.exec db "PRAGMA wal_checkpoint(2)")) - _ (when-not graph-e2ee? - (.close db)) - result (import-datoms-to-db! repo graph-id remote-tx datoms) - _ (when graph-e2ee? - (.close db)) - _ (when graph-e2ee? - (when import-pool - (remove-vfs! import-pool))) + (-> (p/let [_ (require-import-state! repo graph-id import-id) + result (complete-datoms-import! repo graph-id remote-tx) _ (reset! *import-state nil)] result) (p/catch (fn [error] diff --git a/src/main/frontend/worker/sync.cljs b/src/main/frontend/worker/sync.cljs index cffde79e7e..ea06eb007a 100644 --- a/src/main/frontend/worker/sync.cljs +++ b/src/main/frontend/worker/sync.cljs @@ -1958,6 +1958,7 @@ [repo {:keys [tx-data tx-meta db-after] :as tx-report}] (when (and (seq tx-data) (not (:rtc-tx? tx-meta)) + (not (:sync-download-graph? tx-meta)) (:persist-op? tx-meta true) (:kv/value (d/entity db-after :logseq.kv/graph-remote?))) (enqueue-local-tx! repo tx-report) diff --git a/src/main/frontend/worker/sync/crypt.cljs b/src/main/frontend/worker/sync/crypt.cljs index 18ce0d5c03..f07191aeb4 100644 --- a/src/main/frontend/worker/sync/crypt.cljs +++ b/src/main/frontend/worker/sync/crypt.cljs @@ -503,6 +503,16 @@ [aes-key rows-batch] (p/all (map #( (p/let [gzip-bytes ( (p/let [gzip-bytes ( (p/let [stream (bytes->stream framed-bytes 3)] - (set! js/fetch - (fn [url opts] - (let [method (or (aget opts "method") "GET")] - (cond - (and (= url stream-url) (= method "GET")) - (js/Promise.resolve - #js {:ok true - :status 200 - :headers #js {:get (fn [header] - (case header - "content-length" (str (.-byteLength framed-bytes)) - "content-encoding" "identity" - nil))} - :body stream - :arrayBuffer (fn [] (throw (js/Error. "arrayBuffer should not be used")))}) - :else - (js/Promise.resolve #js {:ok false :status 404}))))) - (-> (p/with-redefs [db-sync/http-base (fn [] "http://base") - db-sync/fetch-json (fn [url _opts _schema] - (cond - (string/ends-with? url "/pull") - (p/resolved {:t 42}) - - :else - (p/rejected (ex-info "unexpected fetch-json URL" - {:url url})))) - user-handler/task--ensure-id&access-token (fn [resolve _reject] - (resolve true)) - state/ (p/let [gzip-bytes ( (p/let [gzip-bytes (stream gzip-bytes 3)] (set! js/fetch (fn [url opts] (let [method (or (aget opts "method") "GET")] (cond - (and (= url stream-url) (= method "GET")) + (and (= url asset-url) (= method "GET")) (js/Promise.resolve #js {:ok true :status 200 @@ -388,6 +322,12 @@ (string/ends-with? url "/pull") (p/resolved {:t 42}) + (= url download-url) + (p/resolved {:ok true + :url asset-url + :key "graph-1/snapshot-1.snapshot" + :content-encoding "gzip"}) + :else (p/rejected (ex-info "unexpected fetch-json URL" {:url url})))) @@ -404,9 +344,9 @@ (p/finally (fn [] (set! js/fetch original-fetch))))) (p/then (fn [_] (is (= 3 (count @import-calls))) - (let [[chunk-op imported-rows _ import-id] (second @import-calls)] - (is (= :thread-api/db-sync-import-rows-chunk chunk-op)) - (is (= rows imported-rows)) + (let [[chunk-op imported-datoms _ import-id] (second @import-calls)] + (is (= :thread-api/db-sync-import-datoms-chunk chunk-op)) + (is (= datoms imported-datoms)) (is (= "import-1" import-id))) (done))) (p/catch (fn [error] diff --git a/src/test/frontend/worker/db_worker_test.cljs b/src/test/frontend/worker/db_worker_test.cljs index 32394f8f50..6275d6073d 100644 --- a/src/test/frontend/worker/db_worker_test.cljs +++ b/src/test/frontend/worker/db_worker_test.cljs @@ -57,7 +57,7 @@ (is (nil? (get @search/fuzzy-search-indices test-repo))) (is (nil? (get @worker-state/*sqlite-conns test-repo))))))) -(deftest import-datoms-to-db-invalidates-existing-search-db-test +(deftest complete-datoms-import-invalidates-existing-search-db-test (async done (restoring-worker-state (fn [] @@ -70,7 +70,7 @@ worker-state/get-sqlite-conn (fn [_repo _type] nil) client-op/update-local-tx (fn [& _] nil) shared-service/broadcast-to-clients! (fn [& _] nil)] - (#'db-worker/import-datoms-to-db! test-repo "graph-1" 42 nil)) + (#'db-worker/complete-datoms-import! test-repo "graph-1" 42)) (p/then (fn [_] (is true) (vreset! thread-api/*thread-apis thread-apis-prev) @@ -80,16 +80,6 @@ (is false (str error)) (done))))))))) -(defn- fake-import-pool - [labels closed] - #js {:OpfsSAHPoolDb - (let [remaining (atom labels)] - (fn [_path] - (let [label (first @remaining) - _ (swap! remaining rest)] - #js {:exec (fn [_] nil) - :close (fn [] (swap! closed conj label))})))}) - (defn- capture-outcome [thunk] (try @@ -99,232 +89,179 @@ (catch :default error (p/resolved {:error error})))) -(defn- make-snapshot-rows - [n] - (mapv (fn [i] - [i (str "content-" i) (str "addresses-" i)]) - (range n))) +(defn- with-fake-create-or-open-db + [repo conn f] + (let [thread-apis-prev @thread-api/*thread-apis] + (vreset! thread-api/*thread-apis + (assoc thread-apis-prev + :thread-api/create-or-open-db + (fn [_repo _opts] + (swap! worker-state/*datascript-conns assoc repo conn) + (p/resolved nil)))) + (-> (f) + (p/finally (fn [] + (vreset! thread-api/*thread-apis thread-apis-prev)))))) + +(def sample-datoms + [{:e 1 :a :db/ident :v :logseq.class/Page :tx 1 :added true} + {:e 2 :a :block/title :v "hello" :tx 1 :added true}]) (deftest db-sync-import-prepare-replaces-active-import-state-test (async done (restoring-worker-state (fn [] - (let [closed (atom []) - prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)] - (-> (p/with-redefs [db-worker/ (p/with-redefs [db-worker/close-db! (fn [_] nil) + db-worker/ (p/with-redefs [db-worker/ failed-outcome :error ex-message))) - (is (= [:failed] @closed)) - (is (map? retry-import)) - (is (:import-id retry-import)) - (done))) - (p/catch (fn [error] - (is false (str error)) - (done))))))))) + (let [prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare) + datoms-chunk (@thread-api/*thread-apis :thread-api/db-sync-import-datoms-chunk) + conn (d/create-conn db-schema/schema)] + (with-fake-create-or-open-db + test-repo conn + (fn [] + (-> (p/with-redefs [db-worker/close-db! (fn [_] nil) + db-worker/ stale-outcome :error ex-data :type))) + (is (nil? (d/entity @conn 2))) + (-> (datoms-chunk sample-datoms "graph-1" (:import-id second-import)) + (p/then (fn [_] + (is (= "hello" (:block/title (d/entity @conn 2)))) + (done)))))) + (p/catch (fn [error] + (is false (str error)) + (done))))))))))) -(deftest db-sync-import-rows-chunk-rejects-stale-import-id-test +(deftest db-sync-import-datoms-chunk-imports-plain-datoms-to-active-db-test (async done (restoring-worker-state (fn [] - (let [closed (atom []) - upserts (atom []) - prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare) - rows-chunk (@thread-api/*thread-apis :thread-api/db-sync-import-rows-chunk) - finalize (@thread-api/*thread-apis :thread-api/db-sync-import-finalize)] - (-> (p/with-redefs [db-worker/ stale-outcome :error ex-data :type))) - (is (empty? @upserts)) - (-> (rows-chunk [[2 "content-2" "addresses-2"]] "graph-1" (:import-id second-import)) - (p/then (fn [_] - (is (= 1 (count @upserts))) - (finalize test-repo "graph-1" 42 (:import-id second-import)))) - (p/then (fn [_] (done)))))) - (p/catch (fn [error] - (is false (str error)) - (done))))))))) + (let [prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare) + datoms-chunk (@thread-api/*thread-apis :thread-api/db-sync-import-datoms-chunk) + conn (d/create-conn db-schema/schema)] + (with-fake-create-or-open-db + test-repo conn + (fn [] + (-> (p/with-redefs [db-worker/close-db! (fn [_] nil) + db-worker/ (p/with-redefs [db-worker/ (p/with-redefs [db-worker/ (p/with-redefs [db-worker/close-db! (fn [_] nil) + db-worker/ (p/with-redefs [db-worker/ stale-outcome :error ex-data :type))) - (is (empty? @finalized)) - (-> (finalize test-repo "graph-1" 42 (:import-id second-import)) - (p/then (fn [_] - (is (= 1 (count @finalized))) - (done)))))) - (p/catch (fn [error] - (is false (str error)) - (done))))))))) + (with-fake-create-or-open-db + test-repo conn + (fn [] + (-> (p/with-redefs [db-worker/close-db! (fn [_] nil) + db-worker/ stale-outcome :error ex-data :type))) + (is (empty? @finalized)) + (-> (finalize test-repo "graph-1" 42 (:import-id second-import)) + (p/then (fn [_] + (is (= [:rehydrate :local-tx :broadcast] @finalized)) + (done)))))) + (p/catch (fn [error] + (is false (str error)) + (done))))))))))) -(deftest db-sync-import-finalize-rebuilds-into-fresh-db-for-e2ee-import-test +(deftest db-sync-import-finalize-completes-active-db-import-test (async done (restoring-worker-state (fn [] - (let [closed (atom []) - removed (atom []) - captured (atom nil) - pool #js {:removeVfs (fn [] (swap! removed conj :removed))} - datoms [{:e 171 :a :block/name :v "$$$views" :tx 1 :added true}] - storage-conn (d/create-conn db-schema/schema) - prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare) - finalize (@thread-api/*thread-apis :thread-api/db-sync-import-finalize)] - (reset! worker-state/*opfs-pools {test-repo pool}) - (d/transact! storage-conn - (mapv (fn [{:keys [e a v]}] - [:db/add e a v]) - datoms)) - (-> (p/with-redefs [db-worker/ (p/with-redefs [db-worker/ (p/with-redefs [db-worker/close-db! (fn [_] nil) + db-worker/ (p/let [aes-key (crypt/ (p/let [aes-key (crypt/