diff --git a/deps/db-sync/src/logseq/db_sync/common.cljs b/deps/db-sync/src/logseq/db_sync/common.cljs index f189afccdb..9e3c612e58 100644 --- a/deps/db-sync/src/logseq/db_sync/common.cljs +++ b/deps/db-sync/src/logseq/db_sync/common.cljs @@ -8,7 +8,7 @@ #js {"Access-Control-Allow-Origin" "*" "Access-Control-Allow-Headers" "content-type,content-encoding,authorization,x-amz-meta-checksum,x-amz-meta-type" "Access-Control-Allow-Methods" "GET,POST,PUT,DELETE,OPTIONS,HEAD" - "Access-Control-Expose-Headers" "content-type,content-encoding,content-length,cache-control,x-asset-type,x-asset-size,x-snapshot-datom-count"}) + "Access-Control-Expose-Headers" "content-type,content-encoding,content-length,cache-control,x-asset-type,x-asset-size,x-snapshot-row-count"}) (defn json-response ([data] (json-response data 200)) diff --git a/deps/db-sync/src/logseq/db_sync/snapshot.cljs b/deps/db-sync/src/logseq/db_sync/snapshot.cljs index c1f9cb9e02..34c60473cf 100644 --- a/deps/db-sync/src/logseq/db_sync/snapshot.cljs +++ b/deps/db-sync/src/logseq/db_sync/snapshot.cljs @@ -4,7 +4,6 @@ (def ^:private transit-w (transit/writer :json)) (def ^:private transit-r (transit/reader :json)) (def ^:private text-decoder (js/TextDecoder.)) -(def ^:private newline-byte 10) (defn- ->uint8 [data] @@ -61,63 +60,3 @@ (if (and (seq rows) (or (nil? buffer) (zero? (.-byteLength buffer)))) rows (throw (ex-info "incomplete framed buffer" {:buffer buffer :rows rows})))))) - -(defn framed-length - [rows-batches] - (reduce (fn [total rows] - (let [payload (encode-rows rows)] - (+ total 4 (.-byteLength payload)))) - 0 - rows-batches)) - -(defn- decode-transit - [payload] - (transit/read transit-r (.decode text-decoder (->uint8 payload)))) - -(defn encode-datoms-jsonl - [datoms] - (->uint8 - (apply str - (map (fn [datom] - (str (transit/write transit-w datom) "\n")) - datoms)))) - -(defn- find-newline-offset - [^js data start total] - (loop [offset start] - (cond - (>= offset total) - nil - - (= newline-byte (aget data offset)) - offset - - :else - (recur (inc offset))))) - -(defn parse-datoms-jsonl-chunk - [buffer chunk] - (let [data (concat-bytes buffer chunk) - total (.-byteLength data)] - (loop [offset 0 - datoms []] - (let [newline-offset (find-newline-offset data offset total)] - (if (number? newline-offset) - (let [line (.slice data offset newline-offset) - next-offset (inc newline-offset) - datoms (if (zero? (.-byteLength line)) - datoms - (conj datoms (decode-transit line)))] - (recur next-offset datoms)) - {:datoms datoms - :buffer (when (< offset total) - (.slice data offset total))}))))) - -(defn finalize-datoms-jsonl-buffer - [buffer] - (if (or (nil? buffer) (zero? (.-byteLength buffer))) - [] - (let [{:keys [datoms buffer]} (parse-datoms-jsonl-chunk nil buffer)] - (cond-> datoms - (and buffer (pos? (.-byteLength buffer))) - (conj (decode-transit buffer)))))) diff --git a/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs b/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs index e94ea7baab..a7b6056bd8 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs @@ -13,12 +13,11 @@ [logseq.db-sync.worker.routes.sync :as sync-routes] [logseq.db-sync.worker.ws :as ws] [logseq.db.frontend.schema :as db-schema] - [promesa.core :as p] - [logseq.db-sync.checksum :as checksum])) + [promesa.core :as p])) (def ^:private snapshot-download-batch-size 10000) (def ^:private snapshot-cache-control "private, max-age=300") -(def ^:private snapshot-content-type "application/x-ndjson") +(def ^:private snapshot-content-type "application/transit+json") (def ^:private snapshot-content-encoding "gzip") (def ^:private snapshot-uploading-meta-key :snapshot-uploading?) ;; 10m @@ -56,18 +55,7 @@ (defn current-checksum [^js self] (ensure-conn! self) - (let [db @(.-conn self) - full-checksum (checksum/recompute-checksum db) - cur-checksum (storage/get-checksum (.-sql self))] - (if (or (nil? cur-checksum) - (= full-checksum cur-checksum)) - cur-checksum - (do - (log/error :db-sync/server-checksum-mismatch - {:full-checksum full-checksum - :current-checksum cur-checksum}) - (storage/set-checksum! (.-sql self) full-checksum) - full-checksum)))) + (storage/get-checksum (.-sql self))) (defn snapshot-upload-finished? [^js self] (ensure-schema! self) @@ -174,47 +162,48 @@ (.set out b (.-byteLength a)) out))) -(defn- snapshot-datom->jsonl-datom - [datom] - {:e (:e datom) - :a (:a datom) - :v (:v datom) - :tx (:tx datom) - :added (:added datom)}) +(defn- frame-bytes + [^js data] + (let [len (.-byteLength data) + out (js/Uint8Array. (+ 4 len)) + view (js/DataView. (.-buffer out))] + (.setUint32 view 0 len false) + (.set out data 4) + out)) -(defn- snapshot-datom-count - [conn] - (count (d/datoms @conn :eavt))) +(defn- fetch-snapshot-kvs-rows + [sql last-addr limit] + (let [rows (common/get-sql-rows + (common/sql-exec sql + "select addr, content, addresses from kvs where addr > ? order by addr asc limit ?" + last-addr + limit))] + (mapv (fn [row] + [(aget row "addr") + (aget row "content") + (aget row "addresses")]) + rows))) -(defn- snapshot-export-datoms - [conn] - (let [db @conn - schema-version-eid (some-> (d/entity db :logseq.kv/schema-version) :db/id) - ident-eids (into #{} - (map :e) - (d/datoms db :avet :db/ident)) - jsonl-datoms (fn [pred] - (sequence - (comp (filter pred) - (map snapshot-datom->jsonl-datom)) - (d/datoms db :eavt)))] - (concat (jsonl-datoms #(= schema-version-eid (:e %))) - (jsonl-datoms #(and (contains? ident-eids (:e %)) - (not= schema-version-eid (:e %)))) - (jsonl-datoms #(not (contains? ident-eids (:e %))))))) +(defn- snapshot-row-count + [sql] + (if-let [row (first (common/get-sql-rows + (common/sql-exec sql "select count(*) as row_count from kvs")))] + (or (aget row "row_count") 0) + 0)) (defn- snapshot-export-stream [^js self] - (ensure-conn! self) - (let [remaining (volatile! (seq (snapshot-export-datoms (.-conn self))))] + (ensure-schema! self) + (let [sql (.-sql self) + last-addr (volatile! -1)] (js/ReadableStream. - #js {:pull (fn [controller] - (let [batch (vec (take snapshot-download-batch-size @remaining))] - (if (empty? batch) - (.close controller) - (let [remaining' (drop snapshot-download-batch-size @remaining) - payload (snapshot/encode-datoms-jsonl batch)] - (vreset! remaining (seq remaining')) - (.enqueue controller payload)))))}))) + (clj->js + {:pull (fn [controller] + (let [batch (fetch-snapshot-kvs-rows sql @last-addr snapshot-download-batch-size)] + (if (empty? batch) + (.close controller) + (let [payload (snapshot/encode-rows batch)] + (vreset! last-addr (first (peek batch))) + (.enqueue controller (frame-bytes payload))))))})))) (defn- upload-multipart! [^js bucket key stream opts] @@ -419,15 +408,13 @@ (http/bad-request "missing graph id") (let [stream (-> (snapshot-export-stream self) (maybe-compress-stream)) - conn (or (.-conn self) - (do (ensure-conn! self) (.-conn self))) - datom-count (snapshot-datom-count conn)] + row-count (snapshot-row-count (.-sql self))] (js/Response. stream #js {:status 200 :headers (js/Object.assign #js {"content-type" snapshot-content-type "content-encoding" snapshot-content-encoding} - #js {"x-snapshot-datom-count" (str datom-count)} + #js {"x-snapshot-row-count" (str row-count)} (common/cors-headers))}))))) (defn- handle-sync-snapshot-download diff --git a/deps/db-sync/test/logseq/db_sync/snapshot_test.cljs b/deps/db-sync/test/logseq/db_sync/snapshot_test.cljs deleted file mode 100644 index a447b76d2b..0000000000 --- a/deps/db-sync/test/logseq/db_sync/snapshot_test.cljs +++ /dev/null @@ -1,32 +0,0 @@ -(ns logseq.db-sync.snapshot-test - (:require [cljs.test :refer [deftest is testing]] - [logseq.db-sync.snapshot :as snapshot])) - -(def sample-datoms - [{:e 1 :a :db/ident :v :logseq.class/Page :tx 1 :added true} - {:e 2 :a :block/title :v "hello" :tx 1 :added true}]) - -(deftest datoms-jsonl-roundtrip-test - (testing "jsonl transit roundtrips datoms" - (let [payload (snapshot/encode-datoms-jsonl sample-datoms) - {:keys [datoms buffer]} (snapshot/parse-datoms-jsonl-chunk nil payload)] - (is (= sample-datoms datoms)) - (is (or (nil? buffer) (zero? (.-byteLength buffer))))))) - -(deftest datoms-jsonl-split-test - (testing "parse-datoms-jsonl-chunk handles partial trailing line" - (let [payload (snapshot/encode-datoms-jsonl sample-datoms) - split-pos (- (.-byteLength payload) 3) - part1 (.slice payload 0 split-pos) - part2 (.slice payload split-pos (.-byteLength payload)) - {datoms1 :datoms buffer :buffer} (snapshot/parse-datoms-jsonl-chunk nil part1) - {datoms2 :datoms next-buffer :buffer} (snapshot/parse-datoms-jsonl-chunk buffer part2)] - (is (= (subvec sample-datoms 0 1) datoms1)) - (is (= (subvec sample-datoms 1) datoms2)) - (is (or (nil? next-buffer) (zero? (.-byteLength next-buffer))))))) - -(deftest datoms-jsonl-finalize-buffer-test - (testing "finalize-datoms-jsonl-buffer parses the remaining line" - (let [payload (snapshot/encode-datoms-jsonl sample-datoms)] - (is (= sample-datoms (snapshot/finalize-datoms-jsonl-buffer payload))) - (is (= [] (snapshot/finalize-datoms-jsonl-buffer (js/Uint8Array.))))))) diff --git a/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs b/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs index 2863077c33..cc236abfad 100644 --- a/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs +++ b/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs @@ -30,47 +30,49 @@ (deftest snapshot-download-uses-gzip-encoding-when-compression-supported-test (async done (let [put-call (atom nil) + rows [[1 "row-1" nil] + [2 "row-2" "{\"a\":1}"]] bucket #js {:put (fn [key body opts] (reset! put-call {:key key :body body :opts opts}) (js/Promise.resolve #js {:ok true}))} + sql (empty-sql) conn (d/create-conn db-schema/schema) self #js {:env #js {:LOGSEQ_SYNC_ASSETS bucket} :conn conn :schema-ready true - :sql (empty-sql)} + :sql sql} {:keys [request url]} (request-url) original-compression-stream (.-CompressionStream js/globalThis) restore! #(aset js/globalThis "CompressionStream" original-compression-stream)] - (d/transact! conn [{:db/ident :logseq.class/Page - :block/title "Page"} - {:db/ident :logseq.kv/schema-version - :kv/value {:major 65 :minor 23}} - {:db/id 2 :block/title "hello"}]) (aset js/globalThis "CompressionStream" (passthrough-compression-stream-constructor)) - (-> (p/let [resp (sync-handler/handle {:self self - :request request - :url url - :route {:handler :sync/snapshot-download}}) + (-> (p/with-redefs [sync-handler/fetch-snapshot-kvs-rows (fn [_sql last-addr _limit] + (if (neg? last-addr) rows [])) + sync-handler/snapshot-row-count (fn [_sql] (count rows))] + (p/let [resp (sync-handler/handle {:self self + :request request + :url url + :route {:handler :sync/snapshot-download}}) text (.text resp) body (js->clj (js/JSON.parse text) :keywordize-keys true) http-metadata (aget (:opts @put-call) "httpMetadata") payload (js/Uint8Array. (:body @put-call)) - {:keys [datoms]} (snapshot/parse-datoms-jsonl-chunk nil payload)] + rows (snapshot/finalize-framed-buffer payload) + addrs (mapv first rows)] (is (= 200 (.-status resp))) (is (= "gzip" (:content-encoding body))) (is (= "gzip" (aget http-metadata "contentEncoding"))) - (is (= "application/x-ndjson" (aget http-metadata "contentType"))) - (is (= 5 (count datoms))) - (is (= [:logseq.kv/schema-version - :logseq.kv/schema-version - :logseq.kv/schema-version - :logseq.class/Page - :logseq.class/Page] - (mapv (fn [{:keys [e]}] - (:db/ident (d/entity @conn e))) - datoms)))) + (is (= "application/transit+json" (aget http-metadata "contentType"))) + (is (= 2 (count rows))) + (is (= (sort addrs) addrs)) + (is (every? (fn [[addr content _addresses]] + (and (int? addr) + (string? content))) + rows)) + (is (= [[1 "row-1" nil] + [2 "row-2" "{\"a\":1}"]] + rows)))) (p/then (fn [] (restore!) (done))) @@ -79,36 +81,44 @@ (is false (str error)) (done))))))) -(deftest snapshot-download-stream-route-returns-jsonl-datoms-test +(deftest snapshot-download-stream-route-returns-framed-kvs-rows-test (async done - (let [conn (d/create-conn db-schema/schema) + (let [rows [[1 "row-1" nil] + [2 "row-2" nil]] + sql (empty-sql) + conn (d/create-conn db-schema/schema) self #js {:env #js {} :conn conn :schema-ready true - :sql (empty-sql)} + :sql sql} {:keys [request]} (request-url "/sync/graph-1/snapshot/stream?graph-id=graph-1") original-compression-stream (.-CompressionStream js/globalThis) restore! #(aset js/globalThis "CompressionStream" original-compression-stream)] - (d/transact! conn [{:db/ident :logseq.class/Page - :block/title "Page"} - {:db/ident :logseq.kv/schema-version - :kv/value {:major 65 :minor 23}} - {:db/id 2 :block/title "hello"}]) (aset js/globalThis "CompressionStream" (passthrough-compression-stream-constructor)) - (-> (p/let [resp (sync-handler/handle-http self request) + (-> (p/with-redefs [sync-handler/fetch-snapshot-kvs-rows (fn [_sql last-addr _limit] + (if (neg? last-addr) rows [])) + sync-handler/snapshot-row-count (fn [_sql] (count rows))] + (p/let [resp (sync-handler/handle-http self request) encoding (.get (.-headers resp) "content-encoding") content-type (.get (.-headers resp) "content-type") buf (.arrayBuffer resp) payload (js/Uint8Array. buf) - datoms (snapshot/finalize-datoms-jsonl-buffer payload)] + rows (snapshot/finalize-framed-buffer payload) + addrs (mapv first rows)] (is (= 200 (.-status resp))) (is (= "gzip" encoding)) - (is (= "application/x-ndjson" content-type)) - (is (= 5 (count datoms))) - (is (= :logseq.kv/schema-version - (:db/ident (d/entity @conn (:e (first datoms))))))) + (is (= "application/transit+json" content-type)) + (is (= 2 (count rows))) + (is (= (sort addrs) addrs)) + (is (every? (fn [[addr content _addresses]] + (and (int? addr) + (string? content))) + rows)) + (is (= [[1 "row-1" nil] + [2 "row-2" nil]] + rows)))) (p/then (fn [] (restore!) (done))) @@ -226,25 +236,6 @@ (is false (str error)) (done))))))) -(deftest current-checksum-heals-stale-stored-checksum-test - (testing "server recomputes and persists checksum when stored checksum is stale" - (let [sql (test-sql/make-sql) - conn (storage/open-conn sql) - self #js {:sql sql - :conn conn - :schema-ready true} - stale-checksum "0000000000000000" - block-uuid (random-uuid)] - (d/transact! conn [{:block/uuid block-uuid - :block/title "hello"}]) - (is (string? (storage/get-checksum sql))) - (storage/set-checksum! sql stale-checksum) - (let [healed (sync-handler/current-checksum self)] - (is (string? healed)) - (is (not= stale-checksum healed)) - (is (= healed (storage/get-checksum sql))) - (is (= healed (sync-handler/current-checksum self))))))) - (deftest tx-batch-rejects-with-the-exact-failed-tx-entry-test (testing "db transact failure replies with the specific rejected tx entry" (let [sql (test-sql/make-sql) diff --git a/deps/db-sync/worker/wrangler.toml b/deps/db-sync/worker/wrangler.toml index 904d622e8f..efd4f184a9 100644 --- a/deps/db-sync/worker/wrangler.toml +++ b/deps/db-sync/worker/wrangler.toml @@ -3,6 +3,9 @@ main = "dist/worker/main.js" compatibility_date = "2025-05-05" compatibility_flags = [ "nodejs_compat" ] +[limits] +cpu_ms = 300000 + [version_metadata] binding = "CF_VERSION_METADATA" diff --git a/docs/adr/0014-kv-row-r2-snapshot-download.md b/docs/adr/0014-kv-row-r2-snapshot-download.md new file mode 100644 index 0000000000..8a14857bb7 --- /dev/null +++ b/docs/adr/0014-kv-row-r2-snapshot-download.md @@ -0,0 +1,58 @@ +# ADR 0014: KV-Row R2 Snapshot Download With Worker-Owned Low-Memory Import + +Date: 2026-04-01 +Status: Proposed + +## Context +Snapshot download previously exported Datascript datoms as gzip NDJSON from the +server and parsed/transacted datoms on the client main-thread handler path. + +That design had two issues: + +1. Server snapshot export walked full datoms and spent avoidable CPU/memory. +2. Client download logic lived in handler code and was not aligned with worker + ownership for large-graph import. + +We already use framed Transit `kvs` rows for snapshot upload. Download should +converge on the same wire format. + +## Decision +1. `GET /sync/:graph-id/snapshot/download` and `/snapshot/stream` export framed + Transit `kvs` rows (`[addr content addresses]`) instead of datom NDJSON. +2. Snapshot download payload content-type is `application/transit+json` + (gzip-compressed when available). +3. Server snapshot export reads directly from sqlite `kvs` rows in ascending + `addr` batches and streams framed payloads to response/R2. +4. Graph snapshot download orchestration is moved to + `frontend.worker.sync.download` and invoked from db-worker thread API. +5. Handler code delegates graph download to worker API instead of parsing + snapshot payloads directly. +6. Client import adds row-chunk API (`:thread-api/db-sync-import-rows-chunk`). + Row batches are staged in temp sqlite, then replayed into target conn in + schema-first order. +7. Replay order must transact schema-critical datoms before regular data: + - `:logseq.kv/schema-version` entity datoms + - attribute-definition datoms (`:db/ident` and `:db/*` metadata such as + `:db/valueType`, `:db/cardinality`, `:db/unique`, `:db/isComponent`) + - all remaining datoms + +## Consequences + +### Positive +- Lower server CPU/memory for snapshot export (no datom NDJSON generation). +- Download/upload snapshot format is unified around framed `kvs` rows. +- Download pipeline ownership moves to worker sync module. +- Schema-first replay protects index/schema correctness for large imports. + +### Tradeoffs +- Client still performs datom replay during finalize to rebuild a consistent + target store, so import cost shifts to worker finalize phase. +- Adds temp sqlite staging and one additional import path (`rows` alongside + legacy datom chunk path). + +## Verification +- Server tests assert snapshot download/stream return framed kv rows with + transit content-type and sorted addresses. +- Handler tests assert graph download delegates to worker API and maintains + download-state lifecycle. +- Worker tests assert rows-chunk API wiring and schema-first import ordering. diff --git a/docs/agent-guide/db-sync/protocol.md b/docs/agent-guide/db-sync/protocol.md index a7109cf970..35d766459f 100644 --- a/docs/agent-guide/db-sync/protocol.md +++ b/docs/agent-guide/db-sync/protocol.md @@ -99,7 +99,7 @@ - Build a snapshot file in R2 and return a download URL. - Response: `{"ok":true,"key":"/.snapshot","url":"/assets/:graph-id/.snapshot","content-encoding":"gzip"}`. - Error response (409): `{"error":"graph not ready"}` when bootstrap upload/import has not finished. - - The snapshot file stored in R2 is a gzip-compressed NDJSON stream of full Datascript datoms. Each line is a Transit JSON datom map: `{e,a,v,tx,added}`. + - The snapshot file stored in R2 is a framed Transit stream of sqlite `kvs` rows (`[addr, content, addresses]`), optionally gzip-compressed. - `POST /sync/:graph-id/snapshot/upload?reset=true|false` - Upload a snapshot stream for bootstrap import. Current upload format remains framed Transit JSON kvs rows, optionally gzip-compressed. - Request body: binary stream; headers should include `content-type: application/transit+json` and `content-encoding: gzip` when compressed. diff --git a/src/main/frontend/handler/db_based/sync.cljs b/src/main/frontend/handler/db_based/sync.cljs index 8b95add81a..b089dcd5e2 100644 --- a/src/main/frontend/handler/db_based/sync.cljs +++ b/src/main/frontend/handler/db_based/sync.cljs @@ -11,7 +11,6 @@ [lambdaisland.glogi :as log] [logseq.db :as ldb] [logseq.db-sync.malli-schema :as db-sync-schema] - [logseq.db-sync.snapshot :as snapshot] [logseq.db.sqlite.util :as sqlite-util] [promesa.core :as p])) @@ -32,96 +31,6 @@ (or config/db-sync-http-base (ws->http-base config/db-sync-ws-url))) -(defn- ->uint8 [data] - (cond - (instance? js/Uint8Array data) data - (instance? js/ArrayBuffer data) (js/Uint8Array. data) - (string? data) (.encode (js/TextEncoder.) data) - :else (js/Uint8Array. data))) - -(defn- gzip-bytes? - [^js payload] - (and (some? payload) - (>= (.-byteLength payload) 2) - (= 31 (aget payload 0)) - (= 139 (aget payload 1)))) - -(defn- bytes->stream - [^js payload] - (js/ReadableStream. - #js {:start (fn [controller] - (.enqueue controller payload) - (.close controller))})) - -(defn- stream payload) - decompressed (.pipeThrough stream (js/DecompressionStream. "gzip")) - resp (js/Response. decompressed) - buf (.arrayBuffer resp)] - (->uint8 buf)) - (p/rejected (ex-info "gzip decompression not supported" - {:type :db-sync/decompression-not-supported})))) - -(defn- uint8 buf)] - (if (gzip-bytes? chunk) - ( resp .-headers (.get "content-encoding"))] - (cond - (nil? (.-body resp)) - nil - - (= "gzip" encoding) - (when (exists? js/DecompressionStream) - (.pipeThrough (.-body resp) (js/DecompressionStream. "gzip"))) - - :else - (.-body resp)))) - -(defn- = (count remaining) batch-size) - (let [batch (subvec remaining 0 batch-size) - rest-datoms (subvec remaining batch-size)] - (p/let [_ (on-batch batch)] - (p/recur rest-datoms))) - remaining))) - -(defn- uint8 (.-value result))) - pending (into pending datoms)] - (p/let [pending ( (if (and graph-uuid base) - (-> (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token) - graph (str config/db-version-prefix graph-name) - pull-resp (fetch-json (str base "/sync/" graph-uuid "/pull") - {:method "GET"} - {:response-schema :sync/pull}) - remote-tx (:t pull-resp) - _ (when-not (integer? remote-tx) - (throw (ex-info "non-integer remote-tx when downloading graph" - {:graph graph-name - :remote-tx remote-tx}))) - snapshot-resp (fetch-json (str base "/sync/" graph-uuid "/snapshot/download") - {:method "GET"} - {:response-schema :sync/snapshot-download}) - resp (js/fetch (:url snapshot-resp) - (clj->js (with-auth-headers {:method "GET"}))) - _ (state/pub-event! - [:rtc/log {:type :rtc.log/download - :sub-type :download-progress - :graph-uuid graph-uuid - :message "Start downloading graph snapshot"}])] - (when-not (.-ok resp) - (throw (ex-info "snapshot download failed" - {:graph graph-name - :status (.-status resp)}))) - (let [import-id* (atom nil) - ensure-import! (fn [] - (if-let [import-id @import-id*] - (p/resolved import-id) - (p/let [{:keys [import-id]} (state/ (p/do! - (when-let [search-db (worker-state/get-sqlite-conn repo :search)] - (search/truncate-table! search-db)) - (rtc-log-and-state/rtc-log :rtc.log/download - {:sub-type :download-progress - :graph-uuid graph-id - :message "Saving data to DB"}) - (db-sync/rehydrate-large-titles-from-db! repo graph-id) - (rtc-log-and-state/rtc-log :rtc.log/download - {:sub-type :download-completed - :graph-uuid graph-id - :message "Graph is ready!"}) - (when-let [^js db (worker-state/get-sqlite-conn repo :db)] - (.exec db "PRAGMA wal_checkpoint(2)")) - (client-op/update-local-tx repo remote-tx) - (shared-service/broadcast-to-clients! :add-repo {:repo repo})) - (p/catch (fn [error] - (js/console.error error))))) +(def-thread-api :thread-api/db-sync-close-db + [repo] + (close-db! repo)) -(defn- stale-import-ex-info - [repo graph-id import-id] - (ex-info "stale db sync import" - {:type :db-sync/stale-import - :repo repo - :graph-id graph-id - :import-id import-id})) +(def-thread-api :thread-api/db-sync-invalidate-search-db + [repo] + (tx - [{:keys [e a v]}] - [:db/add e a v]) - -(defn- import-datoms-batch! - [conn aes-key graph-e2ee? datoms] - (p/let [datoms-batch (if graph-e2ee? - (sync-crypt/tx)) - datoms-batch) - regular-tx-data (into [] (comp (remove #(= :db/ident (:a %))) - (map datom->tx)) - datoms-batch) - tx-data (into ident-tx-data regular-tx-data)] - (when (seq tx-data) - (d/transact! conn tx-data {:sync-download-graph? true})))) - -(defn- log-import-progress! - [graph-id import-id datoms-count] - (when (pos? datoms-count) - (let [{:keys [imported-datoms total-datoms]} - (swap! *import-state - (fn [state] - (if (= import-id (:import-id state)) - (update state :imported-datoms (fnil + 0) datoms-count) - state)))] - (rtc-log-and-state/rtc-log :rtc.log/download - {:sub-type :download-progress - :graph-uuid graph-id - :message (if (some? total-datoms) - (str "Importing data " imported-datoms "/" total-datoms) - (str "Importing data " imported-datoms))})))) +(def-thread-api :thread-api/db-sync-rehydrate-large-titles + [repo graph-id] + (db-sync/rehydrate-large-titles-from-db! repo graph-id)) (def-thread-api :thread-api/db-sync-import-prepare [repo reset? graph-id graph-e2ee? & [total-datoms]] - (let [graph-e2ee? (if (nil? graph-e2ee?) true (true? graph-e2ee?))] - (-> (p/let [_ (when-let [state @*import-state] - (close-import-state! state) - (close-db! (:repo state))) - _ (reset! *import-state nil) - _ (when reset? (close-db! repo)) - _ (when reset? ( (p/let [{:keys [conn aes-key graph-e2ee?]} (require-import-state! nil graph-id import-id) - _ (import-datoms-batch! conn aes-key graph-e2ee? datoms)] - (log-import-progress! graph-id import-id (count datoms)) - true) - (p/catch (fn [error] - (when-not (= :db-sync/stale-import (:type (ex-data error))) - (clear-import-state! import-id)) - (throw error))))) +(def-thread-api :thread-api/db-sync-import-rows-chunk + [rows graph-id import-id] + (sync-download/import-rows-chunk! rows graph-id import-id)) (def-thread-api :thread-api/db-sync-import-finalize [repo graph-id remote-tx import-id] - (-> (p/let [_ (require-import-state! repo graph-id import-id) - result (complete-datoms-import! repo graph-id remote-tx) - _ (reset! *import-state nil)] - result) - (p/catch (fn [error] - (when-not (= :db-sync/stale-import (:type (ex-data error))) - (clear-import-state! import-id)) - (throw error))))) + (sync-download/finalize-import! repo graph-id remote-tx import-id)) (def-thread-api :thread-api/release-access-handles [repo] diff --git a/src/main/frontend/worker/sync.cljs b/src/main/frontend/worker/sync.cljs index ed63d77380..9a302d5376 100644 --- a/src/main/frontend/worker/sync.cljs +++ b/src/main/frontend/worker/sync.cljs @@ -1,6 +1,7 @@ (ns frontend.worker.sync "Sync client" (:require + [frontend.worker.platform :as platform] [frontend.worker.shared-service :as shared-service] [frontend.worker.state :as worker-state] [frontend.worker.sync.apply-txs :as sync-apply] @@ -11,13 +12,11 @@ [frontend.worker.sync.presence :as sync-presence] [frontend.worker.sync.transport :as sync-transport] [frontend.worker.sync.upload :as sync-upload] + [frontend.worker.sync.util :as sync-util] [lambdaisland.glogi :as log] [logseq.common.util :as common-util] [logseq.db-sync.checksum :as sync-checksum] - [promesa.core :as p] - [frontend.worker.platform :as platform] - [frontend.worker.sync.util :as sync-util] - [frontend.worker.sync.download :as sync-download])) + [promesa.core :as p])) (def ^:private reconnect-base-delay-ms 1000) (def ^:private reconnect-max-delay-ms 30000) @@ -336,14 +335,6 @@ [repo] (sync-upload/upload-graph! repo)) -(defn download-graph! - [repo] - (sync-download/download-graph! repo)) - -(defn download-graph-by-id! - [repo graph-id graph-e2ee?] - (sync-download/download-graph-by-id! repo graph-id graph-e2ee?)) - (defn list-remote-graphs! [] (let [base (sync-auth/http-base-url @worker-state/*db-sync-config)] diff --git a/src/main/frontend/worker/sync/apply_txs.cljs b/src/main/frontend/worker/sync/apply_txs.cljs index 14048b7f0e..2f6fd8b15c 100644 --- a/src/main/frontend/worker/sync/apply_txs.cljs +++ b/src/main/frontend/worker/sync/apply_txs.cljs @@ -10,7 +10,6 @@ [frontend.worker.sync.client-op :as client-op] [frontend.worker.sync.const :as rtc-const] [frontend.worker.sync.crypt :as sync-crypt] - [frontend.worker.sync.download :as sync-download] [frontend.worker.sync.large-title :as sync-large-title] [frontend.worker.sync.presence :as sync-presence] [frontend.worker.sync.transport :as sync-transport] @@ -145,7 +144,7 @@ :rehydrate-large-titles!-f rehydrate-large-titles!})) (defn request-asset-download! [repo asset-uuid] - (sync-download/request-asset-download! + (sync-assets/request-asset-download! repo asset-uuid {:current-client-f current-client :enqueue-asset-task-f enqueue-asset-task! diff --git a/src/main/frontend/worker/sync/assets.cljs b/src/main/frontend/worker/sync/assets.cljs index 4024774abb..f539da688f 100644 --- a/src/main/frontend/worker/sync/assets.cljs +++ b/src/main/frontend/worker/sync/assets.cljs @@ -1,22 +1,33 @@ (ns frontend.worker.sync.assets "Asset sync helpers for db sync." - (:require [datascript.core :as d] - [frontend.worker.state :as worker-state] - [frontend.worker.sync.auth :as sync-auth] - [frontend.worker.sync.client-op :as client-op] - [frontend.worker.sync.download :as sync-download] - [frontend.worker.sync.large-title :as sync-large-title] - [lambdaisland.glogi :as log] - [logseq.db :as ldb] - [promesa.core :as p])) + (:require + [datascript.core :as d] + [frontend.common.crypt :as crypt] + [frontend.worker.state :as worker-state] + [frontend.worker.sync.auth :as sync-auth] + [frontend.worker.sync.client-op :as client-op] + [frontend.worker.sync.crypt :as sync-crypt] + [frontend.worker.sync.large-title :as sync-large-title] + [lambdaisland.glogi :as log] + [logseq.db :as ldb] + [promesa.core :as p])) (def max-asset-size (* 100 1024 1024)) +(defn exported-graph-aes-key + [repo graph-id fail-fast-f] + (if (sync-crypt/graph-e2ee? repo) + (p/let [aes-key (sync-crypt/ (p/let [meta (when (seq asset-type) + (worker-state/uint8 [data] + (cond + (instance? js/Uint8Array data) data + (instance? js/ArrayBuffer data) (js/Uint8Array. data) + (string? data) (.encode (js/TextEncoder.) data) + :else (js/Uint8Array. data))) -(defn download-remote-asset! - [repo graph-id asset-uuid asset-type] - (let [base (sync-auth/http-base-url @worker-state/*db-sync-config)] - (if (and (seq base) (seq graph-id) (seq asset-type)) - (p/let [exported-aes-key (exported-graph-aes-key - repo graph-id - (fn [tag data] - (throw (ex-info (name tag) data))))] - (worker-state/= (.-byteLength payload) 2) + (= 31 (aget payload 0)) + (= 139 (aget payload 1)))) -(defn request-asset-download! - [repo asset-uuid {:keys [current-client-f enqueue-asset-task-f broadcast-rtc-state!-f]}] - (when-let [client (current-client-f repo)] - (when-let [graph-id (:graph-id client)] - (enqueue-asset-task-f - client - #(when-let [conn (worker-state/get-datascript-conn repo)] - (when-let [ent (d/entity @conn [:block/uuid asset-uuid])] - (let [asset-type (:logseq.property.asset/type ent)] - (-> (p/let [meta (when (seq asset-type) - (worker-state/stream + [^js payload] + (js/ReadableStream. + #js {:start (fn [controller] + (.enqueue controller payload) + (.close controller))})) -(defn download-graph! - [_repo] - (throw (ex-info "download-graph! not implemented yet" {}))) +(defn- stream payload) + decompressed (.pipeThrough stream (js/DecompressionStream. "gzip")) + resp (js/Response. decompressed) + buf (.arrayBuffer resp)] + (->uint8 buf)) + (p/rejected (ex-info "gzip decompression not supported" + {:type :db-sync/decompression-not-supported})))) + +(defn- uint8 buf)] + (if (gzip-bytes? chunk) + ( resp .-headers (.get "content-encoding"))] + (cond + (nil? (.-body resp)) + nil + + (= "gzip" encoding) + (when (exists? js/DecompressionStream) + (.pipeThrough (.-body resp) (js/DecompressionStream. "gzip"))) + + :else + (.-body resp)))) + +(defn- = (count remaining) batch-size) + (let [batch (subvec remaining 0 batch-size) + rest-rows (subvec remaining batch-size)] + (p/let [_ (on-batch batch)] + (p/recur rest-rows))) + remaining))) + +(defn- uint8 (.-value result))) + pending (into pending rows)] + (p/let [pending ( (p/do! + (when-let [search-db (worker-state/get-sqlite-conn repo :search)] + (search/truncate-table! search-db)) + (rtc-log-and-state/rtc-log :rtc.log/download + {:sub-type :download-progress + :graph-uuid graph-id + :message "Saving data to DB"}) + (if-let [rehydrate-f (@thread-api/*thread-apis :thread-api/db-sync-rehydrate-large-titles)] + (rehydrate-f repo graph-id) + (fail-fast :db-sync/missing-field {:field :thread-api/db-sync-rehydrate-large-titles})) + (rtc-log-and-state/rtc-log :rtc.log/download + {:sub-type :download-completed + :graph-uuid graph-id + :message "Graph is ready!"}) + (when-let [^js db (worker-state/get-sqlite-conn repo :db)] + (.exec db "PRAGMA wal_checkpoint(2)")) + (client-op/update-local-tx repo remote-tx) + (shared-service/broadcast-to-clients! :add-repo {:repo repo})) + (p/catch (fn [error] + (js/console.error error))))) + +(defn- require-thread-api-f! + [k] + (if-let [f (@thread-api/*thread-apis k)] + f + (fail-fast :db-sync/missing-field {:field k}))) + +(defn- stale-import-ex-info + [repo graph-id import-id] + (ex-info "stale db sync import" + {:type :db-sync/stale-import + :repo repo + :graph-id graph-id + :import-id import-id})) + +(defn- import-temp-pool-name + [repo] + (worker-util/get-pool-name (str "download-import-" repo))) + +(defn- close-import-state! + [{:keys [rows-db rows-pool]}] + (when rows-db + (try + (.close rows-db) + (catch :default _))) + (when rows-pool + (-> (platform/remove-storage-pool! (platform/current) rows-pool) + (p/catch (fn [_] nil))))) + +(defn- clear-import-state! + [import-id] + (when-let [state @*import-state] + (when (= import-id (:import-id state)) + (close-import-state! state) + (reset! *import-state nil)))) + +(defn- require-import-state! + [repo graph-id import-id] + (let [state @*import-state] + (when-not (and state + (= import-id (:import-id state)) + (or (nil? repo) (= repo (:repo state))) + (= graph-id (:graph-id state))) + (throw (stale-import-ex-info repo graph-id import-id))) + state)) + +(defn- upsert-addr-content! + [^js db data] + (.transaction + db + (fn [tx] + (doseq [item data] + (.exec tx #js {:sql "INSERT INTO kvs (addr, content, addresses) values ($addr, $content, $addresses) on conflict(addr) do update set content = $content, addresses = $addresses" + :bind item}))))) + +(defn import-rows-batch! + [{:keys [rows-db]} rows] + (when-not rows-db + (throw (ex-info "missing import rows db" + {:type :db-sync/missing-field + :field :rows-db}))) + (let [data (map (fn [[addr content addresses]] + #js {:$addr addr + :$content content + :$addresses addresses}) + rows)] + (upsert-addr-content! rows-db data)) + (count rows)) + +(defn- tx + [{:keys [e a v]}] + [:db/add e a v]) + +(defn- import-datoms-batch! + [conn aes-key graph-e2ee? datoms] + (p/let [datoms-batch (if graph-e2ee? + (sync-crypt/tx)) + datoms-batch) + regular-tx-data (into [] (comp (remove #(= :db/ident (:a %))) + (map datom->tx)) + datoms-batch) + tx-data (into ident-tx-data regular-tx-data)] + (when (seq tx-data) + (d/transact! conn tx-data {:sync-download-graph? true})))) + +(defn- schema-datom? + [ident-eids schema-version-eid datom] + (or (= schema-version-eid (:e datom)) + (and (contains? ident-eids (:e datom)) + (or (= :db/ident (:a datom)) + (= "db" (namespace (:a datom))))))) + +(defn snapshot-datoms-in-import-order + [conn] + (let [db @conn + schema-version-eid (some-> (d/entity db :logseq.kv/schema-version) :db/id) + ident-eids (into #{} + (map :e) + (d/datoms db :avet :db/ident)) + schema-datom?* #(schema-datom? ident-eids schema-version-eid %) + ordered-datoms (fn [pred] + (sequence + (comp (filter pred) + (map #(select-keys % [:e :a :v]))) + (d/datoms db :eavt)))] + (concat (ordered-datoms schema-datom?*) + (ordered-datoms #(not (schema-datom?* %)))))) + +(defn- take-import-datoms-batch + [datoms batch-size] + (loop [batch (transient []) + remaining (seq datoms) + n 0] + (if (or (nil? remaining) + (>= n batch-size)) + [(persistent! batch) remaining] + (recur (conj! batch (first remaining)) + (next remaining) + (inc n))))) + +(defn- (p/let [close-db-f (require-thread-api-f! :thread-api/db-sync-close-db) + invalidate-search-db-f (require-thread-api-f! :thread-api/db-sync-invalidate-search-db) + create-or-open-db-f (require-thread-api-f! :thread-api/create-or-open-db) + _ (when-let [state @*import-state] + (close-import-state! state) + (close-db-f (:repo state))) + _ (reset! *import-state nil) + _ (when reset? (close-db-f repo)) + _ (when reset? (invalidate-search-db-f repo)) + import-id (str (random-uuid)) + aes-key (when graph-e2ee? + (sync-crypt/ (p/let [state (require-import-state! nil graph-id import-id) + state ( (p/let [state (require-import-state! repo graph-id import-id) + _ (when (:rows-imported? state) + (js (with-auth-headers {:method "GET"}))) + _ (log-f {:sub-type :download-progress + :graph-uuid graph-id + :message "Start downloading graph snapshot"})] + (when-not (.-ok resp) + (throw (ex-info "snapshot download failed" + {:repo repo + :status (.-status resp)}))) + (let [import-id* (atom nil) + ensure-import! (fn [] + (if-let [import-id @import-id*] + (p/resolved import-id) + (p/let [{:keys [import-id]} (prepare-import! repo true graph-id graph-e2ee?)] + (reset! import-id* import-id) + import-id)))] + (p/let [_ (stream - [^js payload chunk-size] - (js/ReadableStream. - #js {:start (fn [controller] - (loop [offset 0] - (when (< offset (.-byteLength payload)) - (.enqueue controller (.slice payload offset (min (+ offset chunk-size) - (.-byteLength payload)))) - (recur (+ offset chunk-size)))) - (.close controller))})) - (deftest remove-member-request-test (async done (let [called (atom nil)] @@ -95,7 +68,9 @@ db/get-db (fn [] :db) ldb/get-graph-schema-version (fn [_] {:major 65}) state/ (p/let [gzip-bytes ( (p/with-redefs [db-sync/http-base (fn [] "http://base") - db-sync/fetch-json (fn [url _opts _schema] - (cond - (string/ends-with? url "/pull") - (p/resolved {:t 42}) - - (= url download-url) - (p/resolved {:ok true - :url asset-url - :key "graph-1/snapshot-1.snapshot" - :content-encoding "gzip"}) - - :else - (p/rejected (ex-info "unexpected fetch-json URL" - {:url url})))) - user-handler/task--ensure-id&access-token (fn [resolve _reject] - (resolve true)) - state/ (p/with-redefs [db-sync/http-base (fn [] "http://base") + user-handler/task--ensure-id&access-token (fn [resolve _reject] + (resolve true)) + state/ (p/let [gzip-bytes (stream gzip-bytes 3)] - (set! js/fetch - (fn [url opts] - (let [method (or (aget opts "method") "GET")] - (cond - (and (= url asset-url) (= method "GET")) - (js/Promise.resolve - #js {:ok true - :status 200 - :headers #js {:get (fn [header] - (case header - "content-length" (str (.-byteLength gzip-bytes)) - "content-encoding" "gzip" - nil))} - :body stream - :arrayBuffer (fn [] (throw (js/Error. "arrayBuffer should not be used")))}) - :else - (js/Promise.resolve #js {:ok false :status 404}))))) - (-> (p/with-redefs [db-sync/http-base (fn [] "http://base") - db-sync/fetch-json (fn [url _opts _schema] - (cond - (string/ends-with? url "/pull") - (p/resolved {:t 42}) - - (= url download-url) - (p/resolved {:ok true - :url asset-url - :key "graph-1/snapshot-1.snapshot" - :content-encoding "gzip"}) - - :else - (p/rejected (ex-info "unexpected fetch-json URL" - {:url url})))) - user-handler/task--ensure-id&access-token (fn [resolve _reject] - (resolve true)) - state/ (p/with-redefs [db-sync/http-base (fn [] "http://base") + user-handler/task--ensure-id&access-token (fn [resolve _reject] + (resolve true)) + state/ (p/with-redefs [db-sync/rehydrate-large-titles-from-db! (fn [_repo _graph-id] (p/resolved nil)) - rtc-log-and-state/rtc-log (fn [& _] nil) + :thread-api/export-db (fn [_repo] (p/resolved nil)) + :thread-api/db-sync-rehydrate-large-titles (fn [_repo _graph-id] (p/resolved nil)))) + (-> (p/with-redefs [rtc-log-and-state/rtc-log (fn [& _] nil) client-op/update-graph-uuid (fn [& _] nil) client-op/update-local-tx (fn [& _] nil) shared-service/broadcast-to-clients! (fn [& _] nil)] - (#'db-worker/complete-datoms-import! test-repo "graph-1" 42)) + (sync-download/complete-datoms-import! test-repo "graph-1" 42)) (p/then (fn [_] (is true) (vreset! thread-api/*thread-apis thread-apis-prev) @@ -212,15 +210,17 @@ :thread-api/create-or-open-db (fn [_repo _opts] (swap! worker-state/*datascript-conns assoc repo conn) - (p/resolved nil)))) + (p/resolved nil)) + :thread-api/db-sync-close-db + (fn [_repo] nil) + :thread-api/db-sync-invalidate-search-db + (fn [_repo] (p/resolved nil)) + :thread-api/db-sync-rehydrate-large-titles + (fn [_repo _graph-id] (p/resolved nil)))) (-> (f) (p/finally (fn [] (vreset! thread-api/*thread-apis thread-apis-prev)))))) -(def sample-datoms - [{:e 1 :a :db/ident :v :logseq.class/Page :tx 1 :added true} - {:e 2 :a :block/title :v "hello" :tx 1 :added true}]) - (deftest db-sync-import-prepare-replaces-active-import-state-test (async done (restoring-worker-state @@ -231,8 +231,7 @@ (with-fake-create-or-open-db test-repo conn-a (fn [] - (-> (p/with-redefs [db-worker/close-db! (fn [_] nil) - db-worker/ (p/with-redefs [db-worker/close-db! (fn [_] nil)] (p/let [first-import (prepare test-repo true "graph-1" false) _ (swap! worker-state/*datascript-conns assoc test-repo conn-b) second-import (prepare test-repo true "graph-1" false)] @@ -244,84 +243,7 @@ (is false (str error)) (done))))))))))) -(deftest db-sync-import-datoms-chunk-rejects-stale-import-id-test - (async done - (restoring-worker-state - (fn [] - (let [prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare) - datoms-chunk (@thread-api/*thread-apis :thread-api/db-sync-import-datoms-chunk) - conn (d/create-conn db-schema/schema)] - (with-fake-create-or-open-db - test-repo conn - (fn [] - (-> (p/with-redefs [db-worker/close-db! (fn [_] nil) - db-worker/ stale-outcome :error ex-data :type))) - (is (nil? (d/entity @conn 2))) - (-> (datoms-chunk sample-datoms "graph-1" (:import-id second-import)) - (p/then (fn [_] - (is (= "hello" (:block/title (d/entity @conn 2)))) - (done)))))) - (p/catch (fn [error] - (is false (str error)) - (done))))))))))) - -(deftest db-sync-import-datoms-chunk-imports-plain-datoms-to-active-db-test - (async done - (restoring-worker-state - (fn [] - (let [prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare) - datoms-chunk (@thread-api/*thread-apis :thread-api/db-sync-import-datoms-chunk) - conn (d/create-conn db-schema/schema)] - (with-fake-create-or-open-db - test-repo conn - (fn [] - (-> (p/with-redefs [db-worker/close-db! (fn [_] nil) - db-worker/ (p/with-redefs [db-worker/close-db! (fn [_] nil) - db-worker/ (p/with-redefs [db-worker/close-db! (fn [_] nil) - db-worker/ (p/with-redefs [rtc-log-and-state/rtc-log (fn [& _] nil) client-op/update-local-tx (fn [& _] nil) shared-service/broadcast-to-clients! (fn [& _] nil)] (p/let [first-import (prepare test-repo true "graph-1" false) @@ -349,37 +268,94 @@ (is false (str error)) (done))))))))))) -(deftest db-sync-import-finalize-completes-active-db-import-test +(deftest db-sync-import-rows-chunk-calls-import-rows-batch-test (async done (restoring-worker-state (fn [] (let [prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare) - datoms-chunk (@thread-api/*thread-apis :thread-api/db-sync-import-datoms-chunk) - finalize (@thread-api/*thread-apis :thread-api/db-sync-import-finalize) + rows-chunk (@thread-api/*thread-apis :thread-api/db-sync-import-rows-chunk) conn (d/create-conn db-schema/schema) - search-db #js {:close (fn [] nil) - :exec (fn [_sql] nil)} - main-db #js {:exec (fn [_sql] nil)}] - (reset! worker-state/*sqlite-conns {test-repo {:db main-db :search search-db :client-ops nil}}) + rows [[1 "row-1" nil] + [2 "row-2" nil]] + captured-rows (atom nil)] (with-fake-create-or-open-db test-repo conn (fn [] (-> (p/with-redefs [db-worker/close-db! (fn [_] nil) - db-worker/latest-remote-tx] + (reset! worker-state/*datascript-conns {test-repo conn}) + (reset! db-sync/*repo->latest-remote-tx {test-repo 11}) + (try + (with-redefs [client-op/get-local-tx (fn [_repo] 7) + client-op/get-local-checksum (fn [_repo] "local-checksum") + worker-db-validate/validate-db (fn [& args] + (reset! captured args) + {:ok true})] + (validate test-repo) + (is (= [conn nil] + @captured))) + (finally + (reset! db-sync/*repo->latest-remote-tx latest-prev))))))) (deftest thread-api-recompute-checksum-diagnostics-passes-sync-diagnostics-test (restoring-worker-state (fn []