fix: import

This commit is contained in:
Tienson Qin
2026-01-08 18:10:03 +08:00
parent e89fdb92b6
commit fc4351c716
3 changed files with 20 additions and 17 deletions

View File

@@ -212,7 +212,10 @@
" on conflict(addr) do update set content = excluded.content, addresses = excluded.addresses")
addr
content
addresses))))))
addresses))))
(set! (.-conn self) (storage/open-conn sql))
(let [conn (.-conn self)]
(prn :debug :import-datoms-count (count (d/datoms @conn :eavt))))))
(defn- cycle-reject-response [db tx-data {:keys [attr]}]
{:type "tx/reject"
@@ -244,7 +247,8 @@
(do
(prn :debug "cycle detected: " cycle-info)
(cycle-reject-response db order-fixed cycle-info))
(let [{:keys [tx-data db-before db-after]} (ldb/transact! conn order-fixed)
;; TODO: replace d/transact! with ldb/transact! to enable db validation
(let [{:keys [tx-data db-before db-after]} (d/transact! conn order-fixed)
normalized-data (db-normalize/normalize-tx-data db-after db-before tx-data)
new-t (storage/next-t! sql)
created-at (common/now-ms)

View File

@@ -131,6 +131,11 @@
:$addresses addresses})
rows))
(defn- enable-sqlite-wal-mode!
[^Object db]
(.exec db "PRAGMA locking_mode=exclusive")
(.exec db "PRAGMA journal_mode=WAL"))
(defn- ensure-worker-sync-import-db!
[repo reset?]
(p/let [^js pool (<get-opfs-pool repo)
@@ -241,11 +246,6 @@
debug-log-db (new (.-OpfsSAHPoolDb pool) (str "debug-log" repo-path))]
[db search-db client-ops-db debug-log-db])))
(defn- enable-sqlite-wal-mode!
[^Object db]
(.exec db "PRAGMA locking_mode=exclusive")
(.exec db "PRAGMA journal_mode=WAL"))
(defn- gc-sqlite-dbs!
"Gc main db weekly and rtc ops db each time when opening it"
[sqlite-db client-ops-db debug-log-db datascript-conn {:keys [full-gc?]}]
@@ -263,7 +263,7 @@
:kv/value (common-util/time-ms)}]))))
(defn- <create-or-open-db!
[repo {:keys [config datoms worker-sync-download-graph?] :as opts}]
[repo {:keys [config datoms] :as opts}]
(when-not (worker-state/get-sqlite-conn repo)
(p/let [[db search-db client-ops-db debug-log-db :as dbs] (get-dbs repo)
storage (new-sqlite-storage db)

View File

@@ -560,12 +560,9 @@
(defn- fetch-kvs-rows
[db last-addr limit]
(let [rows (.exec db #js {:sql "select addr, content, addresses from kvs where addr > ? order by addr asc limit ?"
:bind #js [last-addr limit]
:rowMode "object"})]
(if (and rows (pos? (.-length rows)))
(mapv (fn [row] (js->clj row :keywordize-keys true)) rows)
[])))
(.exec db #js {:sql "select addr, content, addresses from kvs where addr > ? order by addr asc limit ?"
:bind #js [last-addr limit]
:rowMode "object"}))
(defn upload-graph!
[repo]
@@ -577,7 +574,7 @@
(if-let [db (worker-state/get-sqlite-conn repo :db)]
(do
(ensure-client-graph-uuid! repo graph-id)
(p/loop [last-addr 0
(p/loop [last-addr -1
first-batch? true]
(let [rows (fetch-kvs-rows db last-addr upload-kvs-batch-size)]
(if (empty? rows)
@@ -588,13 +585,15 @@
#js {:done true})})]
(client-op/add-all-exists-asset-as-ops repo)
{:graph-id graph-id})
(let [max-addr (apply max (map :addr rows))]
(let [max-addr (apply max (map (fn [row] (aget row "addr")) rows))]
(prn :debug :max-addr max-addr
:rows (map (fn [row] (aget row "addr")) rows))
(p/let [_ (fetch-json (str base "/sync/" graph-id "/snapshot/import")
{:method "POST"
:headers {"content-type" "application/json"}
:body (js/JSON.stringify
#js {:reset first-batch?
:rows (clj->js rows)})})]
:rows rows})})]
(p/recur max-addr false)))))))
(p/rejected (ex-info "worker-sync missing sqlite db"
{:repo repo :graph-id graph-id}))))))