diff --git a/deps/worker-sync/src/logseq/worker_sync/worker.cljs b/deps/worker-sync/src/logseq/worker_sync/worker.cljs index ee448623cf..09347311b6 100644 --- a/deps/worker-sync/src/logseq/worker_sync/worker.cljs +++ b/deps/worker-sync/src/logseq/worker_sync/worker.cljs @@ -212,7 +212,10 @@ " on conflict(addr) do update set content = excluded.content, addresses = excluded.addresses") addr content - addresses)))))) + addresses)))) + (set! (.-conn self) (storage/open-conn sql)) + (let [conn (.-conn self)] + (prn :debug :import-datoms-count (count (d/datoms @conn :eavt)))))) (defn- cycle-reject-response [db tx-data {:keys [attr]}] {:type "tx/reject" @@ -244,7 +247,8 @@ (do (prn :debug "cycle detected: " cycle-info) (cycle-reject-response db order-fixed cycle-info)) - (let [{:keys [tx-data db-before db-after]} (ldb/transact! conn order-fixed) + ;; TODO: replace d/transact! with ldb/transact! to enable db validation + (let [{:keys [tx-data db-before db-after]} (d/transact! conn order-fixed) normalized-data (db-normalize/normalize-tx-data db-after db-before tx-data) new-t (storage/next-t! sql) created-at (common/now-ms) diff --git a/src/main/frontend/worker/db_worker.cljs b/src/main/frontend/worker/db_worker.cljs index 67c9d14c4c..7c78776d13 100644 --- a/src/main/frontend/worker/db_worker.cljs +++ b/src/main/frontend/worker/db_worker.cljs @@ -131,6 +131,11 @@ :$addresses addresses}) rows)) +(defn- enable-sqlite-wal-mode! + [^Object db] + (.exec db "PRAGMA locking_mode=exclusive") + (.exec db "PRAGMA journal_mode=WAL")) + (defn- ensure-worker-sync-import-db! [repo reset?] (p/let [^js pool ( ? order by addr asc limit ?" - :bind #js [last-addr limit] - :rowMode "object"})] - (if (and rows (pos? (.-length rows))) - (mapv (fn [row] (js->clj row :keywordize-keys true)) rows) - []))) + (.exec db #js {:sql "select addr, content, addresses from kvs where addr > ? order by addr asc limit ?" + :bind #js [last-addr limit] + :rowMode "object"})) (defn upload-graph! [repo] @@ -577,7 +574,7 @@ (if-let [db (worker-state/get-sqlite-conn repo :db)] (do (ensure-client-graph-uuid! repo graph-id) - (p/loop [last-addr 0 + (p/loop [last-addr -1 first-batch? true] (let [rows (fetch-kvs-rows db last-addr upload-kvs-batch-size)] (if (empty? rows) @@ -588,13 +585,15 @@ #js {:done true})})] (client-op/add-all-exists-asset-as-ops repo) {:graph-id graph-id}) - (let [max-addr (apply max (map :addr rows))] + (let [max-addr (apply max (map (fn [row] (aget row "addr")) rows))] + (prn :debug :max-addr max-addr + :rows (map (fn [row] (aget row "addr")) rows)) (p/let [_ (fetch-json (str base "/sync/" graph-id "/snapshot/import") {:method "POST" :headers {"content-type" "application/json"} :body (js/JSON.stringify #js {:reset first-batch? - :rows (clj->js rows)})})] + :rows rows})})] (p/recur max-addr false))))))) (p/rejected (ex-info "worker-sync missing sqlite db" {:repo repo :graph-id graph-id}))))))