diff --git a/deps/db-sync/src/logseq/db_sync/common.cljs b/deps/db-sync/src/logseq/db_sync/common.cljs index 79f5355007..d6dc9f2747 100644 --- a/deps/db-sync/src/logseq/db_sync/common.cljs +++ b/deps/db-sync/src/logseq/db_sync/common.cljs @@ -7,8 +7,9 @@ (defn cors-headers [] #js {"Access-Control-Allow-Origin" "*" - "Access-Control-Allow-Headers" "content-type,authorization,x-amz-meta-checksum,x-amz-meta-type" - "Access-Control-Allow-Methods" "GET,POST,PUT,DELETE,OPTIONS,HEAD"}) + "Access-Control-Allow-Headers" "content-type,content-encoding,authorization,x-amz-meta-checksum,x-amz-meta-type" + "Access-Control-Allow-Methods" "GET,POST,PUT,DELETE,OPTIONS,HEAD" + "Access-Control-Expose-Headers" "content-type,content-encoding,cache-control,x-asset-type"}) (defn json-response ([data] (json-response data 200)) diff --git a/deps/db-sync/src/logseq/db_sync/malli_schema.cljs b/deps/db-sync/src/logseq/db_sync/malli_schema.cljs index 7a88fd3b32..0eaea8911b 100644 --- a/deps/db-sync/src/logseq/db_sync/malli_schema.cljs +++ b/deps/db-sync/src/logseq/db_sync/malli_schema.cljs @@ -157,29 +157,18 @@ [:t-before :int] [:txs :string]]) -(def snapshot-row-schema - [:or - [:tuple :int :string [:maybe :any]] - [:map - [:addr :int] - [:content :string] - [:addresses {:optional true} :any]]]) - -(def snapshot-rows-response-schema - [:map - [:rows [:sequential snapshot-row-schema]] - [:last-addr :int] - [:done :boolean]]) - -(def snapshot-import-request-schema - [:map - [:reset {:optional true} :boolean] - [:rows [:sequential [:tuple :int :string [:maybe :any]]]]]) - -(def snapshot-import-response-schema +(def snapshot-download-response-schema [:map [:ok :boolean] - [:count :int]]) + [:key :string] + [:url :string] + [:content-encoding {:optional true} [:maybe :string]]]) + +(def snapshot-upload-response-schema + [:map + [:ok :boolean] + [:count :int] + [:key :string]]) (def asset-get-response-schema [:or @@ -190,8 +179,7 @@ {:graphs/create graph-create-request-schema :graph-members/create graph-member-create-request-schema :graph-members/update graph-member-update-request-schema - :sync/tx-batch tx-batch-request-schema - :sync/snapshot-import snapshot-import-request-schema}) + :sync/tx-batch tx-batch-request-schema}) (def http-response-schemas {:graphs/list graphs-list-response-schema @@ -206,8 +194,8 @@ :sync/health http-ok-response-schema :sync/pull pull-ok-schema :sync/tx-batch [:or tx-batch-ok-schema tx-reject-schema http-error-response-schema] - :sync/snapshot-rows snapshot-rows-response-schema - :sync/snapshot-import snapshot-import-response-schema + :sync/snapshot-download snapshot-download-response-schema + :sync/snapshot-upload snapshot-upload-response-schema :sync/admin-reset http-ok-response-schema :assets/get asset-get-response-schema :assets/put http-ok-response-schema diff --git a/deps/db-sync/src/logseq/db_sync/snapshot.cljs b/deps/db-sync/src/logseq/db_sync/snapshot.cljs new file mode 100644 index 0000000000..dda8d4413a --- /dev/null +++ b/deps/db-sync/src/logseq/db_sync/snapshot.cljs @@ -0,0 +1,79 @@ +(ns logseq.db-sync.snapshot + (:require [cognitect.transit :as transit])) + +(def ^:private transit-w (transit/writer :json)) +(def ^:private transit-r (transit/reader :json)) +(def ^:private text-decoder (js/TextDecoder.)) + +(defn- ->uint8 + [data] + (cond + (instance? js/Uint8Array data) data + (instance? js/ArrayBuffer data) (js/Uint8Array. data) + (string? data) (.encode (js/TextEncoder.) data) + :else (js/Uint8Array. data))) + +(defn encode-rows + [rows] + (->uint8 (transit/write transit-w rows))) + +(defn decode-rows + [bytes] + (transit/read transit-r (.decode text-decoder (->uint8 bytes)))) + +(defn frame-bytes + [^js bytes] + (let [len (.-byteLength bytes) + out (js/Uint8Array. (+ 4 len)) + view (js/DataView. (.-buffer out))] + (.setUint32 view 0 len false) + (.set out bytes 4) + out)) + +(defn concat-bytes + [^js a ^js b] + (cond + (nil? a) b + (nil? b) a + :else + (let [out (js/Uint8Array. (+ (.-byteLength a) (.-byteLength b)))] + (.set out a 0) + (.set out b (.-byteLength a)) + out))) + +(defn parse-framed-chunk + [buffer chunk] + (let [data (concat-bytes buffer chunk) + total (.-byteLength data)] + (loop [offset 0 + rows []] + (if (< (- total offset) 4) + {:rows rows + :buffer (when (< offset total) + (.slice data offset total))} + (let [view (js/DataView. (.-buffer data) offset 4) + len (.getUint32 view 0 false) + next-offset (+ offset 4 len)] + (if (<= next-offset total) + (let [payload (.slice data (+ offset 4) next-offset) + decoded (decode-rows payload)] + (recur next-offset (into rows decoded))) + {:rows rows + :buffer (.slice data offset total)})))))) + +(defn finalize-framed-buffer + [buffer] + (if (or (nil? buffer) (zero? (.-byteLength buffer))) + [] + (let [{:keys [rows buffer]} (parse-framed-chunk nil buffer)] + (if (and (seq rows) (or (nil? buffer) (zero? (.-byteLength buffer)))) + rows + (throw (ex-info "incomplete framed buffer" {:buffer buffer :rows rows})))))) + +(defn framed-length + [rows-batches] + (reduce (fn [total rows] + (let [payload (encode-rows rows)] + (+ total 4 (.-byteLength payload)))) + 0 + rows-batches)) diff --git a/deps/db-sync/src/logseq/db_sync/worker.cljs b/deps/db-sync/src/logseq/db_sync/worker.cljs index dfa21aa03c..41f0d01449 100644 --- a/deps/db-sync/src/logseq/db_sync/worker.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker.cljs @@ -1,5 +1,6 @@ (ns logseq.db-sync.worker (:require ["cloudflare:workers" :refer [DurableObject]] + [cljs-bean.core :as bean] [clojure.string :as string] [lambdaisland.glogi :as log] [lambdaisland.glogi.console :as glogi-console] @@ -9,6 +10,7 @@ [logseq.db-sync.index :as index] [logseq.db-sync.malli-schema :as db-sync-schema] [logseq.db-sync.protocol :as protocol] + [logseq.db-sync.snapshot :as snapshot] [logseq.db-sync.storage :as storage] [promesa.core :as p] [shadow.cljs.modern :refer (defclass)])) @@ -223,8 +225,12 @@ n)))) (def ^:private max-asset-size (* 100 1024 1024)) -(def ^:private snapshot-rows-default-limit 500) -(def ^:private snapshot-rows-max-limit 2000) +(def ^:private snapshot-download-batch-size 500) +(def ^:private snapshot-cache-control "private, max-age=300") +(def ^:private snapshot-content-type "application/transit+json") +(def ^:private snapshot-content-encoding "gzip") +;; 10m +(def ^:private snapshot-multipart-part-size (* 10 1024 1024)) (defn- fetch-kvs-rows [sql after limit] @@ -234,14 +240,216 @@ after limit))) -(defn- snapshot-row->map [row] +(defn- snapshot-row->tuple [row] (if (array? row) - {:addr (aget row 0) - :content (aget row 1) - :addresses (aget row 2)} - {:addr (aget row "addr") - :content (aget row "content") - :addresses (aget row "addresses")})) + [(aget row 0) (aget row 1) (aget row 2)] + [(aget row "addr") (aget row "content") (aget row "addresses")])) + +(defn- ensure-import-table! [sql] + (common/sql-exec sql "drop table if exists kvs_import") + (common/sql-exec sql "create table if not exists kvs_import (addr INTEGER primary key, content TEXT, addresses JSON)")) + +(defn- import-snapshot-rows! + [sql table rows] + (when (seq rows) + (doseq [[addr content addresses] rows] + (common/sql-exec sql + (str "insert into " table " (addr, content, addresses) values (?, ?, ?)" + " on conflict(addr) do update set content = excluded.content, addresses = excluded.addresses") + addr + content + addresses)))) + +(defn- finalize-import! + [^js self reset?] + (let [sql (.-sql self) + state (.-state self)] + (ensure-schema! self) + (if (and state (.-storage state) (.-transactionSync (.-storage state))) + (.transactionSync (.-storage state) + (fn [] + (if reset? + (do + (common/sql-exec sql "delete from kvs") + (common/sql-exec sql "insert into kvs (addr, content, addresses) select addr, content, addresses from kvs_import") + (common/sql-exec sql "delete from tx_log") + (common/sql-exec sql "delete from sync_meta") + (storage/set-t! sql 0)) + (do + (common/sql-exec sql "delete from kvs where addr in (select addr from kvs_import)") + (common/sql-exec sql "insert into kvs (addr, content, addresses) select addr, content, addresses from kvs_import"))) + (common/sql-exec sql "drop table if exists kvs_import"))) + (do + (log/error :db-sync/transaction-missing {:reset reset?}) + (throw (ex-info "missing durable object transaction" {:reset reset?})))) + (set! (.-conn self) (storage/open-conn sql)))) + +(defn- graph-id-from-request [request] + (let [header-id (.get (.-headers request) "x-graph-id") + url (js/URL. (.-url request)) + param-id (.get (.-searchParams url) "graph-id")] + (when (seq (or header-id param-id)) + (or header-id param-id)))) + +(defn- snapshot-key [graph-id snapshot-id] + (str graph-id "/" snapshot-id ".snapshot")) + +(defn- snapshot-url [request graph-id snapshot-id] + (let [url (js/URL. (.-url request))] + (str (.-origin url) "/assets/" graph-id "/" snapshot-id ".snapshot"))) + +(defn- maybe-compress-stream [stream] + (if (exists? js/CompressionStream) + (.pipeThrough stream (js/CompressionStream. "gzip")) + stream)) + +(defn- maybe-decompress-stream [stream encoding] + (if (and (= encoding snapshot-content-encoding) (exists? js/DecompressionStream)) + (.pipeThrough stream (js/DecompressionStream. "gzip")) + stream)) + +(defn- ->uint8 [data] + (cond + (instance? js/Uint8Array data) data + (instance? js/ArrayBuffer data) (js/Uint8Array. data) + :else (js/Uint8Array. data))) + +(defn- concat-uint8 [^js a ^js b] + (cond + (nil? a) b + (nil? b) a + :else + (let [out (js/Uint8Array. (+ (.-byteLength a) (.-byteLength b)))] + (.set out a 0) + (.set out b (.-byteLength a)) + out))) + +(defn- snapshot-export-stream [^js self] + (let [sql (.-sql self) + state (volatile! {:after -1 :done? false})] + (js/ReadableStream. + #js {:pull (fn [controller] + (p/let [{:keys [after done?]} @state] + (if done? + (.close controller) + (let [rows (fetch-kvs-rows sql after snapshot-download-batch-size) + rows (mapv snapshot-row->tuple rows) + last-addr (if (seq rows) + (apply max (map first rows)) + after) + done? (< (count rows) snapshot-download-batch-size)] + (when (seq rows) + (let [payload (snapshot/encode-rows rows) + framed (snapshot/frame-bytes payload)] + (.enqueue controller framed))) + (vswap! state assoc :after last-addr :done? done?)))))}))) + +(defn- upload-multipart! + [^js bucket key stream opts] + (p/let [^js upload (.createMultipartUpload bucket key opts)] + (let [reader (.getReader stream)] + (-> (p/loop [buffer nil + part-number 1 + parts []] + (p/let [chunk (.read reader)] + (if (.-done chunk) + (cond + (and buffer (pos? (.-byteLength buffer))) + (p/let [^js resp (.uploadPart upload part-number buffer) + parts (conj parts {:partNumber part-number :etag (.-etag resp)})] + (p/let [_ (.complete upload (clj->js parts))] + {:ok true})) + + (seq parts) + (p/let [_ (.complete upload (clj->js parts))] + {:ok true}) + + :else + (p/let [_ (.abort upload)] + (.put bucket key (js/Uint8Array. 0) opts))) + (let [value (.-value chunk) + buffer (concat-uint8 buffer (->uint8 value))] + (if (>= (.-byteLength buffer) snapshot-multipart-part-size) + (let [part (.slice buffer 0 snapshot-multipart-part-size) + rest-parts (.slice buffer snapshot-multipart-part-size (.-byteLength buffer))] + (p/let [^js resp (.uploadPart upload part-number part) + parts (conj parts {:partNumber part-number :etag (.-etag resp)})] + (p/recur rest-parts (inc part-number) parts))) + (p/recur buffer part-number parts)))))) + (p/catch (fn [error] + (.abort upload) + (throw error))))))) + +(defn- snapshot-export-length [^js self] + (let [sql (.-sql self)] + (p/loop [after -1 + total 0] + (let [rows (fetch-kvs-rows sql after snapshot-download-batch-size)] + (if (empty? rows) + total + (let [rows (mapv snapshot-row->tuple rows) + payload (snapshot/encode-rows rows) + total (+ total 4 (.-byteLength payload)) + last-addr (apply max (map first rows)) + done? (< (count rows) snapshot-download-batch-size)] + (if done? + total + (p/recur last-addr total)))))))) + +(defn- snapshot-export-fixed-length [^js self] + (p/let [length (snapshot-export-length self) + stream (snapshot-export-stream self)] + (if (exists? js/FixedLengthStream) + (let [^js fixed (js/FixedLengthStream. length) + readable (.-readable fixed) + writable (.-writable fixed) + reader (.getReader stream) + writer (.getWriter writable)] + (p/let [_ (p/loop [] + (p/let [chunk (.read reader)] + (if (.-done chunk) + (.close writer) + (p/let [_ (.write writer (.-value chunk))] + (p/recur)))))] + readable)) + (p/let [resp (js/Response. stream) + buf (.arrayBuffer resp)] + buf)))) + +(declare import-snapshot!) +(defn- import-snapshot-stream! [^js self stream reset?] + (let [reader (.getReader stream) + reset-pending? (volatile! reset?) + total-count (volatile! 0)] + (ensure-import-table! (.-sql self)) + (p/let [buffer nil] + (p/catch + (p/loop [buffer buffer] + (p/let [chunk (.read reader)] + (if (.-done chunk) + (let [rows (snapshot/finalize-framed-buffer buffer) + rows-count (count rows) + reset? (and @reset-pending? true)] + (when (or reset? (seq rows)) + (import-snapshot! self rows reset?) + (vreset! reset-pending? false)) + (vswap! total-count + rows-count) + (finalize-import! self reset?) + @total-count) + (let [value (.-value chunk) + {:keys [rows buffer]} (snapshot/parse-framed-chunk buffer value) + rows-count (count rows) + reset? (and @reset-pending? (seq rows))] + (when (seq rows) + (import-snapshot! self rows (true? reset?)) + (vreset! reset-pending? false)) + (vswap! total-count + rows-count) + (p/recur buffer))))) + (fn [error] + (try + (common/sql-exec (.-sql self) "drop table if exists kvs_import") + (catch :default _)) + (throw error)))))) (defn- handle-assets [request ^js env] (let [url (js/URL. (.-url request)) @@ -262,13 +470,26 @@ (fn [^js obj] (if (nil? obj) (error-response "not found" 404) - (let [content-type (or (.-contentType (.-httpMetadata obj)) - "application/octet-stream")] + (let [metadata (.-httpMetadata obj) + content-type (or (.-contentType metadata) + "application/octet-stream") + content-encoding (.-contentEncoding metadata) + cache-control (.-cacheControl metadata) + headers (cond-> {"content-type" content-type + "x-asset-type" asset-type} + (and (string? content-encoding) + (not= content-encoding "null") + (pos? (.-length content-encoding))) + (assoc "content-encoding" content-encoding) + (and (string? cache-control) + (pos? (.-length cache-control))) + (assoc "cache-control" cache-control) + true + (bean/->js))] (js/Response. (.-body obj) #js {:status 200 :headers (js/Object.assign - #js {"content-type" content-type - "x-asset-type" asset-type} + headers (cors-headers))}))))) "PUT" @@ -330,7 +551,7 @@ tail (if (neg? slash-idx) "/" (subs rest-path slash-idx)) - new-url (str (.-origin url) tail (.-search url))] + new-url (js/URL. (str (.-origin url) tail (.-search url)))] (if (seq graph-id) (if (= method "OPTIONS") (common/options-response) @@ -341,8 +562,10 @@ stub (.get namespace do-id)] (if (common/upgrade-request? request) (.fetch stub request) - (let [rewritten (js/Request. new-url request)] - (.fetch stub rewritten)))) + (do + (.set (.-searchParams new-url) "graph-id" graph-id) + (let [rewritten (js/Request. (.toString new-url) request)] + (.fetch stub rewritten))))) access-resp))) (bad-request "missing graph id"))) @@ -370,25 +593,10 @@ ;; :t (t-now self) ;; :datoms (common/write-transit datoms)})) -(defn- import-snapshot! [^js self rows reset?] +(defn- import-snapshot! [^js self rows _reset?] (let [sql (.-sql self)] (ensure-schema! self) - (when reset? - (common/sql-exec sql "delete from kvs") - (common/sql-exec sql "delete from tx_log") - (common/sql-exec sql "delete from sync_meta") - (storage/init-schema! sql) - (set! (.-schema-ready self) true) - (storage/set-t! sql 0)) - (when (seq rows) - (doseq [[addr content addresses] rows] - (common/sql-exec sql - (str "insert into kvs (addr, content, addresses) values (?, ?, ?)" - " on conflict(addr) do update set content = excluded.content, addresses = excluded.addresses") - addr - content - addresses))) - (set! (.-conn self) (storage/open-conn sql)))) + (import-snapshot-rows! sql "kvs_import" rows))) (defn- apply-tx! [^js self sender txs] (let [sql (.-sql self)] @@ -512,22 +720,36 @@ ;; (and (= method "GET") (= path "/snapshot")) ;; (common/json-response (snapshot-response self)) - (and (= method "GET") (= path "/snapshot/rows")) - (let [after (or (parse-int (.get (.-searchParams url) "after")) -1) - limit (or (parse-int (.get (.-searchParams url) "limit")) - snapshot-rows-default-limit) - limit (-> limit - (max 1) - (min snapshot-rows-max-limit)) - rows (fetch-kvs-rows (.-sql self) after limit) - rows (mapv snapshot-row->map rows) - last-addr (if (seq rows) - (apply max (map :addr rows)) - after) - done? (< (count rows) limit)] - (json-response :sync/snapshot-rows {:rows rows - :last-addr last-addr - :done done?})) + (and (= method "GET") (= path "/snapshot/download")) + (let [graph-id (graph-id-from-request request) + ^js bucket (.-LOGSEQ_SYNC_ASSETS (.-env self))] + (cond + (not (seq graph-id)) + (bad-request "missing graph id") + + (nil? bucket) + (error-response "missing assets bucket" 500) + + :else + (p/let [snapshot-id (str (random-uuid)) + key (snapshot-key graph-id snapshot-id) + stream (snapshot-export-stream self) + multipart? (and (some? (.-createMultipartUpload bucket)) + (fn? (.-createMultipartUpload bucket))) + opts #js {:httpMetadata #js {:contentType snapshot-content-type + :contentEncoding nil + :cacheControl snapshot-cache-control} + :customMetadata #js {:purpose "snapshot" + :created-at (str (common/now-ms))}} + _ (if multipart? + (upload-multipart! bucket key stream opts) + (p/let [body (snapshot-export-fixed-length self)] + (.put bucket key body opts))) + url (snapshot-url request graph-id snapshot-id)] + (json-response :sync/snapshot-download {:ok true + :key key + :url url + :content-encoding nil})))) (and (= method "DELETE") (= path "/admin/reset")) (do @@ -554,19 +776,54 @@ (json-response :sync/tx-batch (handle-tx-batch! self nil txs t-before)) (bad-request "invalid tx")))))))) - (and (= method "POST") (= path "/snapshot/import")) - (.then (common/read-json request) - (fn [result] - (if (nil? result) - (bad-request "missing body") - (let [body (js->clj result :keywordize-keys true) - body (coerce-http-request :sync/snapshot-import body)] - (if (nil? body) - (bad-request "invalid body") - (let [{:keys [rows reset]} body] - (import-snapshot! self rows reset) - (json-response :sync/snapshot-import {:ok true - :count (count rows)}))))))) + (and (= method "POST") (= path "/snapshot/upload")) + (let [graph-id (graph-id-from-request request) + ^js bucket (.-LOGSEQ_SYNC_ASSETS (.-env self)) + reset-param (.get (.-searchParams url) "reset") + reset? (if (nil? reset-param) + true + (not (contains? #{"false" "0"} reset-param)))] + (cond + (not (seq graph-id)) + (bad-request "missing graph id") + + (nil? bucket) + (error-response "missing assets bucket" 500) + + (nil? (.-body request)) + (bad-request "missing body") + + :else + (p/let [snapshot-id (str (random-uuid)) + key (snapshot-key graph-id snapshot-id) + req-encoding (.get (.-headers request) "content-encoding") + _ (.put bucket key + (.-body request) + #js {:httpMetadata #js {:contentType snapshot-content-type + :contentEncoding req-encoding + :cacheControl snapshot-cache-control} + :customMetadata #js {:purpose "snapshot" + :created-at (str (common/now-ms))}}) + ^js obj (.get bucket key)] + (if (nil? obj) + (error-response "snapshot missing" 500) + (p/catch + (let [metadata (.-httpMetadata obj) + encoding (or (.-contentEncoding metadata) req-encoding) + stream (.-body obj)] + (if (and (= encoding snapshot-content-encoding) + (not (exists? js/DecompressionStream))) + (p/let [_ (.delete bucket key)] + (error-response "gzip not supported" 500)) + (p/let [stream (maybe-decompress-stream stream encoding) + count (import-snapshot-stream! self stream reset?) + _ (.delete bucket key)] + (json-response :sync/snapshot-upload {:ok true + :count count + :key key})))) + (fn [error] + (p/let [_ (.delete bucket key)] + (throw error)))))))) :else (not-found)))) diff --git a/deps/db-sync/test/logseq/db_sync/snapshot_import_test.cljs b/deps/db-sync/test/logseq/db_sync/snapshot_import_test.cljs new file mode 100644 index 0000000000..1cd460d3c4 --- /dev/null +++ b/deps/db-sync/test/logseq/db_sync/snapshot_import_test.cljs @@ -0,0 +1,40 @@ +(ns logseq.db-sync.snapshot-import-test + (:require [cljs.test :refer [deftest is async]] + [clojure.string :as string] + [logseq.db-sync.snapshot :as snapshot] + [logseq.db-sync.worker :as worker] + [promesa.core :as p])) + +(defn- make-sql [state] + #js {:exec (fn [sql & _args] + (swap! state update :executed conj sql) + nil)}) + +(defn- make-stream [chunk] + (js/ReadableStream. + #js {:start (fn [controller] + (.enqueue controller chunk) + (.close controller))})) + +(deftest snapshot-import-failure-does-not-touch-kvs-test + (async done + (let [state (atom {:executed []}) + sql (make-sql state) + self (doto (js-obj) + (aset "sql" sql))] + (-> (with-redefs [snapshot/parse-framed-chunk (fn [_ _] + (throw (ex-info "boom" {})))] + (-> (p/then (#'worker/import-snapshot-stream! + self + (make-stream (js/Uint8Array. #js [1 2 3])) + true) + (fn [_] + (is false "expected import to fail") + nil)) + (p/catch (fn [_] + (let [sqls (:executed @state)] + (is (some #(string/includes? % "drop table if exists kvs_import") sqls)) + (is (not-any? #(string/includes? % "insert into kvs ") sqls)) + (is (not-any? #(string/includes? % "delete from kvs") sqls))) + nil)))) + (p/finally (fn [] (done))))))) diff --git a/deps/db-sync/test/logseq/db_sync/snapshot_test.cljs b/deps/db-sync/test/logseq/db_sync/snapshot_test.cljs new file mode 100644 index 0000000000..2721c67327 --- /dev/null +++ b/deps/db-sync/test/logseq/db_sync/snapshot_test.cljs @@ -0,0 +1,44 @@ +(ns logseq.db-sync.snapshot-test + (:require [cljs.test :refer [deftest is testing]] + [logseq.db-sync.snapshot :as snapshot])) + +(deftest transit-frame-roundtrip-test + (testing "framed transit json roundtrips rows" + (let [expected [{:addr 1 :content "a" :addresses nil} + {:addr 2 :content "b" :addresses "{\"k\":1}"}] + frame (snapshot/frame-bytes (snapshot/encode-rows expected)) + {:keys [rows buffer]} (snapshot/parse-framed-chunk nil frame)] + (is (= rows expected)) + (is (or (nil? buffer) (zero? (.-byteLength buffer))))))) + +(deftest transit-frame-split-test + (testing "parse-framed-chunk handles partial trailing frame" + (let [rows1 [{:addr 1 :content "a" :addresses nil}] + rows2 [{:addr 2 :content "b" :addresses nil}] + frame1 (snapshot/frame-bytes (snapshot/encode-rows rows1)) + frame2 (snapshot/frame-bytes (snapshot/encode-rows rows2)) + split-pos (- (.-byteLength frame2) 3) + part1 (.slice frame2 0 split-pos) + part2 (.slice frame2 split-pos (.-byteLength frame2)) + {rows1-parsed :rows buffer :buffer} (snapshot/parse-framed-chunk nil (snapshot/concat-bytes frame1 part1)) + {rows2-parsed :rows rows-buffer :buffer} (snapshot/parse-framed-chunk buffer part2)] + (is (= rows1-parsed rows1)) + (is (= rows2-parsed rows2)) + (is (or (nil? rows-buffer) (zero? (.-byteLength rows-buffer))))))) + +(deftest transit-finalize-buffer-test + (testing "finalize-framed-buffer parses remaining frame" + (let [rows [{:addr 3 :content "c" :addresses nil}] + frame (snapshot/frame-bytes (snapshot/encode-rows rows))] + (is (= rows (snapshot/finalize-framed-buffer frame))) + (is (= [] (snapshot/finalize-framed-buffer (js/Uint8Array.))))))) + +(deftest transit-framed-length-test + (testing "framed-length sums frame sizes" + (let [rows1 [{:addr 1 :content "a" :addresses nil}] + rows2 [{:addr 2 :content "b" :addresses nil} + {:addr 3 :content "c" :addresses nil}] + frame1 (snapshot/frame-bytes (snapshot/encode-rows rows1)) + frame2 (snapshot/frame-bytes (snapshot/encode-rows rows2))] + (is (= (+ (.-byteLength frame1) (.-byteLength frame2)) + (snapshot/framed-length [rows1 rows2])))))) diff --git a/docs/agent-guide/db-sync/protocol.md b/docs/agent-guide/db-sync/protocol.md index bda7c1d83c..4e92d707a7 100644 --- a/docs/agent-guide/db-sync/protocol.md +++ b/docs/agent-guide/db-sync/protocol.md @@ -72,12 +72,15 @@ - Same as WS tx/batch. Body: `{"t-before":,"txs":["", ...]}`. - Response: `{"type":"tx/batch/ok","t":}` or `{"type":"tx/reject","reason":...}`. - Error response (400): `{"error":"missing body"|"invalid tx"}`. -- `GET /sync/:graph-id/snapshot/rows?after=&limit=` - - Pull sqlite kvs rows. Response: `{"rows":[{"addr":,"content":"","addresses":}...],"last-addr":,"done":true|false}`. -- `POST /sync/:graph-id/snapshot/import` - - Import sqlite kvs rows. Body: `{"reset":true|false,"rows":[[addr,content,addresses]...]}`. - - Response: `{"ok":true,"count":}`. - - Error response (400): `{"error":"missing body"|"invalid body"}`. +- `GET /sync/:graph-id/snapshot/download` + - Build a snapshot file in R2 and return a download URL. + - Response: `{"ok":true,"key":"/.snapshot","url":"/assets/:graph-id/.snapshot","content-encoding":"gzip"}`. + - The snapshot file is a framed Transit JSON stream of kvs rows, optionally gzip-compressed. +- `POST /sync/:graph-id/snapshot/upload?reset=true|false` + - Upload a snapshot stream (framed Transit JSON, optionally gzip-compressed). The server imports rows into kvs. + - Request body: binary stream; headers should include `content-type: application/transit+json` and `content-encoding: gzip` when compressed. + - Response: `{"ok":true,"count":,"key":"/.snapshot"}`. + - Error response (400): `{"error":"missing body"|"missing graph id"}`. - `DELETE /sync/:graph-id/admin/reset` - Drop/recreate per-graph tables. Response: `{"ok":true}`. diff --git a/src/main/frontend/handler/db_based/db_sync.cljs b/src/main/frontend/handler/db_based/db_sync.cljs index d893e5e188..e109308b44 100644 --- a/src/main/frontend/handler/db_based/db_sync.cljs +++ b/src/main/frontend/handler/db_based/db_sync.cljs @@ -30,7 +30,60 @@ (or config/db-sync-http-base (ws->http-base config/db-sync-ws-url))) -(def ^:private snapshot-rows-limit 2000) +(def ^:private snapshot-text-decoder (js/TextDecoder.)) + +(defn- ->uint8 [data] + (cond + (instance? js/Uint8Array data) data + (instance? js/ArrayBuffer data) (js/Uint8Array. data) + (string? data) (.encode (js/TextEncoder.) data) + :else (js/Uint8Array. data))) + +(defn- decode-snapshot-rows [bytes] + (sqlite-util/read-transit-str (.decode snapshot-text-decoder (->uint8 bytes)))) + +(defn- frame-len [^js data offset] + (let [view (js/DataView. (.-buffer data) offset 4)] + (.getUint32 view 0 false))) + +(defn- concat-bytes + [^js a ^js b] + (cond + (nil? a) b + (nil? b) a + :else + (let [out (js/Uint8Array. (+ (.-byteLength a) (.-byteLength b)))] + (.set out a 0) + (.set out b (.-byteLength a)) + out))) + +(defn- parse-framed-chunk + [buffer chunk] + (let [data (concat-bytes buffer chunk) + total (.-byteLength data)] + (loop [offset 0 + rows []] + (if (< (- total offset) 4) + {:rows rows + :buffer (when (< offset total) + (.slice data offset total))} + (let [len (frame-len data offset) + next-offset (+ offset 4 len)] + (if (<= next-offset total) + (let [payload (.slice data (+ offset 4) next-offset) + decoded (decode-snapshot-rows payload)] + (recur next-offset (into rows decoded))) + {:rows rows + :buffer (.slice data offset total)})))))) + +(defn- finalize-framed-buffer + [buffer] + (if (or (nil? buffer) (zero? (.-byteLength buffer))) + [] + (let [{:keys [rows buffer]} (parse-framed-chunk nil buffer)] + (if (and (seq rows) (or (nil? buffer) (zero? (.-byteLength buffer)))) + rows + (throw (ex-info "incomplete framed buffer" {:buffer buffer :rows rows})))))) (defn- auth-headers [] (when-let [token (state/get-auth-id-token)] @@ -176,31 +229,57 @@ (state/set-state! :rtc/downloading-graph-uuid graph-uuid) (let [base (http-base)] (-> (if (and graph-uuid base) - (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token) - graph (str config/db-version-prefix graph-name)] - (p/loop [after -1 ; root addr is 0 - first-batch? true] - (p/let [pull-resp (fetch-json (str base "/sync/" graph-uuid "/pull") - {:method "GET"} - {:response-schema :sync/pull}) - remote-tx (:t pull-resp) - _ (when-not (integer? remote-tx) - (throw (ex-info "non-integer remote-tx when downloading graph" - {:graph graph-name - :remote-tx remote-tx}))) - resp (fetch-json (str base "/sync/" graph-uuid "/snapshot/rows" - "?after=" after "&limit=" snapshot-rows-limit) - {:method "GET"} - {:response-schema :sync/snapshot-rows}) - rows (:rows resp) - done? (true? (:done resp)) - last-addr (or (:last-addr resp) after)] - (p/do! - (state/ (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token) + graph (str config/db-version-prefix graph-name) + pull-resp (fetch-json (str base "/sync/" graph-uuid "/pull") + {:method "GET"} + {:response-schema :sync/pull}) + remote-tx (:t pull-resp) + _ (when-not (integer? remote-tx) + (throw (ex-info "non-integer remote-tx when downloading graph" + {:graph graph-name + :remote-tx remote-tx}))) + download-resp (fetch-json (str base "/sync/" graph-uuid "/snapshot/download") + {:method "GET"} + {:response-schema :sync/snapshot-download}) + download-url (:url download-resp) + _ (reset! download-url* download-url) + _ (when-not (string? download-url) + (throw (ex-info "missing snapshot download url" + {:graph graph-name + :response download-resp}))) + resp (js/fetch download-url (clj->js (with-auth-headers {:method "GET"})))] + (when-not (.-ok resp) + (throw (ex-info "snapshot download failed" + {:graph graph-name + :status (.-status resp)}))) + (when-not (.-body resp) + (throw (ex-info "snapshot download missing body" + {:graph graph-name}))) + (p/let [reader (.getReader (.-body resp))] + (p/loop [buffer nil + total 0 + total-rows []] + (p/let [chunk (.read reader)] + (if (.-done chunk) + (let [rows (finalize-framed-buffer buffer) + total' (+ total (count rows)) + total-rows' (into total-rows rows)] + (when (seq total-rows') + (p/do! + (state/js (with-auth-headers {:method "DELETE"})))))))) (p/rejected (ex-info "db-sync missing graph info" {:type :db-sync/invalid-graph :graph-uuid graph-uuid diff --git a/src/main/frontend/worker/db_sync.cljs b/src/main/frontend/worker/db_sync.cljs index c5e3818a8c..80fb5525dc 100644 --- a/src/main/frontend/worker/db_sync.cljs +++ b/src/main/frontend/worker/db_sync.cljs @@ -129,6 +129,9 @@ (def ^:private max-asset-size (* 100 1024 1024)) (def ^:private upload-kvs-batch-size 2000) +(def ^:private snapshot-content-type "application/transit+json") +(def ^:private snapshot-content-encoding "gzip") +(def ^:private snapshot-text-encoder (js/TextEncoder.)) (def ^:private reconnect-base-delay-ms 1000) (def ^:private reconnect-max-delay-ms 30000) (def ^:private reconnect-jitter-ms 250) @@ -996,43 +999,85 @@ (defn- normalize-snapshot-rows [rows] (mapv (fn [row] (vec row)) (array-seq rows))) +(defn- encode-snapshot-rows [rows] + (.encode snapshot-text-encoder (sqlite-util/write-transit-str rows))) + +(defn- frame-bytes [^js bytes] + (let [len (.-byteLength bytes) + out (js/Uint8Array. (+ 4 len)) + view (js/DataView. (.-buffer out))] + (.setUint32 view 0 len false) + (.set out bytes 4) + out)) + +(defn- snapshot-upload-stream [db] + (let [state (volatile! {:after -1 :done? false})] + (js/ReadableStream. + #js {:pull (fn [controller] + (p/let [{:keys [after done?]} @state] + (if done? + (.close controller) + (let [rows (fetch-kvs-rows db after upload-kvs-batch-size)] + (if (empty? rows) + (.close controller) + (let [rows (normalize-snapshot-rows rows) + last-addr (apply max (map first rows)) + done? (< (count rows) upload-kvs-batch-size) + payload (encode-snapshot-rows rows) + framed (frame-bytes payload)] + (.enqueue controller framed) + (vswap! state assoc :after last-addr :done? done?)))))))}))) + +(defn- maybe-compress-stream [stream] + (if (exists? js/CompressionStream) + (.pipeThrough stream (js/CompressionStream. "gzip")) + stream)) + +(defn- should-buffer-snapshot-upload? + [base] + (when (string? base) + (try + (let [url (js/URL. base) + host (.-hostname url)] + (and (= "http:" (.-protocol url)) + (contains? #{"localhost" "127.0.0.1"} host))) + (catch :default _ + false)))) + +(defn- js body))} - {:response-schema :sync/snapshot-import})] - (client-op/add-all-exists-asset-as-ops repo) - {:graph-id graph-id}))) - (let [max-addr (apply max (map first rows)) - rows (normalize-snapshot-rows rows) - body (coerce-http-request :sync/snapshot-import {:reset first-batch? - :rows rows})] - (if (nil? body) - (p/rejected (ex-info "db-sync invalid snapshot body" - {:repo repo :graph-id graph-id})) - (p/let [_ (fetch-json (str base "/sync/" graph-id "/snapshot/import") - {:method "POST" - :headers {"content-type" "application/json"} - :body (js/JSON.stringify (clj->js body))} - {:response-schema :sync/snapshot-import})] - (p/recur max-addr false)))))))) + (let [stream (snapshot-upload-stream db) + use-compression? (exists? js/CompressionStream) + body (if use-compression? (maybe-compress-stream stream) stream) + headers (cond-> {"content-type" snapshot-content-type} + use-compression? (assoc "content-encoding" snapshot-content-encoding)) + upload-url (str base "/sync/" graph-id "/snapshot/upload?reset=true") + upload-opts {:method "POST" + :headers headers + :body body + :duplex "half"} + fallback? (should-buffer-snapshot-upload? base) + do-upload (fn [opts] + (fetch-json upload-url opts {:response-schema :sync/snapshot-upload}))] + (p/let [_ (if fallback? + (p/let [buf (sqlite-binds [rows] - (mapv (fn [{:keys [addr content addresses]}] + (mapv (fn [[addr content addresses]] #js {:$addr addr :$content content :$addresses addresses})