initial sync graph

This commit is contained in:
Tienson Qin
2026-01-07 17:54:26 +08:00
parent 98be903d53
commit 93269f83fc
4 changed files with 64 additions and 138 deletions

View File

@@ -76,6 +76,7 @@
(let [conn (.-conn self)
db @conn
datoms (protocol/datoms->wire (d/datoms db :eavt))]
(prn :debug :count (count db))
{:type "snapshot/ok"
:t (t-now self)
:datoms (common/write-transit datoms)}))
@@ -86,73 +87,9 @@
:attr (name attr)
:server_values (common/write-transit (cycle/server-values-for db tx-data attr))})
(defn- entity->lookup [db entity]
(cond
(vector? entity) entity
(uuid? entity) [:block/uuid entity]
(keyword? entity) [:db/ident entity]
(map? entity)
(or (when-let [uuid (:block/uuid entity)]
[:block/uuid uuid])
(when-let [ident (:db/ident entity)]
[:db/ident ident])
(when-let [eid (:db/id entity)]
(when (vector? eid)
eid)))
(instance? Entity entity)
(or (when-let [uuid (:block/uuid entity)]
[:block/uuid uuid])
(when-let [ident (:db/ident entity)]
[:db/ident ident]))
(number? entity) nil
:else
(when-let [ent (d/entity db entity)]
(or (when-let [uuid (:block/uuid ent)]
[:block/uuid uuid])
(when-let [ident (:db/ident ent)]
[:db/ident ident])))))
(defn- normalize-ref [db ref]
(or (entity->lookup db ref) ref))
(defn- normalize-ref-value [db value]
(cond
(set? value) (into #{} (map (partial normalize-ref db)) value)
(sequential? value) (mapv (partial normalize-ref db) value)
:else (normalize-ref db value)))
(defn- ref-attr? [attr]
(contains? db-schema/ref-type-attributes attr))
(defn- normalize-tx-data [db tx-data]
(mapv
(fn [item]
(cond
(and (vector? item) (#{:db/add :db/retract} (first item)))
(let [[op e a v] item]
(cond-> [op (normalize-ref db e) a v]
(ref-attr? a)
(update 3 (partial normalize-ref-value db))))
(and (vector? item) (#{:db.fn/retractEntity :db/retractEntity} (first item)))
(let [[op e] item]
[op (normalize-ref db e)])
(map? item)
(let [item' (cond-> item
(contains? item :db/id)
(update :db/id (partial normalize-ref db)))]
(reduce
(fn [acc k]
(if (contains? acc k)
(update acc k (partial normalize-ref-value db))
acc))
item'
db-schema/ref-type-attributes))
:else item))
tx-data))
(defn- contains-unstable-entity-id? [tx-data]
(boolean
(some
@@ -309,8 +246,8 @@
max-atom (atom max-order)
fixes (reduce
(fn [acc {:keys [entity value]}]
(let [entity-ref (normalize-ref db'' entity)
parent-ref (normalize-ref db'' value)
(let [entity-ref entity
parent-ref value
eid (d/entid db'' entity-ref)
parent-eid' (when parent-ref (d/entid db'' parent-ref))]
(if (and eid (some? value) (nil? parent-eid'))
@@ -332,10 +269,9 @@
max-order-atoms (atom {})
fixes (reduce
(fn [acc {:keys [entity value]}]
(let [entity-ref (normalize-ref db' entity)
(let [entity-ref entity
eid (d/entid db' entity-ref)
parent (when eid (:block/parent (d/entity db' eid)))
parent-eid (when parent (d/entid db' (normalize-ref db' parent)))]
parent-eid eid]
(if (and eid parent-eid value)
(let [siblings (d/datoms db' :avet :block/parent parent-eid)
same-order? (some
@@ -360,9 +296,6 @@
(into tx-data fixes)
tx-data)))
(defn ^:test normalize-tx-data* [db tx-data]
(normalize-tx-data db tx-data))
(defn ^:test contains-unstable-entity-id?* [tx-data]
(contains-unstable-entity-id? tx-data))
@@ -375,24 +308,27 @@
(defn- apply-tx! [^js self tx-data]
(let [sql (.-sql self)
conn (.-conn self)
db @conn
normalized (normalize-tx-data db tx-data)]
(if (contains-unstable-entity-id? normalized)
{:type "tx/reject"
:reason "unstable-id"}
(let [ ;; {:keys [db tx-data]} (ensure-idents db normalized)
parent-fixed (fix-missing-parent db tx-data)
order-fixed (fix-duplicate-orders db parent-fixed)
cycle-info (cycle/detect-cycle db order-fixed)]
(if cycle-info
(cycle-reject-response db order-fixed cycle-info)
(let [_ (d/transact! conn order-fixed)
new-t (storage/next-t! sql)
created-at (common/now-ms)
tx-str (common/write-transit order-fixed)]
(storage/append-tx! sql new-t tx-str created-at)
{:type "tx/ok"
:t new-t}))))))
db @conn]
(let [_ (d/transact! conn tx-data)
new-t (storage/next-t! sql)
created-at (common/now-ms)
tx-str (common/write-transit tx-data)]
(storage/append-tx! sql new-t tx-str created-at)
{:type "tx/ok"
:t new-t})
;; (let [parent-fixed (fix-missing-parent db tx-data)
;; order-fixed (fix-duplicate-orders db parent-fixed)
;; cycle-info (cycle/detect-cycle db order-fixed)]
;; (if cycle-info
;; (cycle-reject-response db order-fixed cycle-info)
;; (let [_ (d/transact! conn order-fixed)
;; new-t (storage/next-t! sql)
;; created-at (common/now-ms)
;; tx-str (common/write-transit order-fixed)]
;; (storage/append-tx! sql new-t tx-str created-at)
;; {:type "tx/ok"
;; :t new-t})))
))
(defn- handle-tx! [^js self tx-data t-before]
(let [current-t (t-now self)]

View File

@@ -52,8 +52,8 @@
(prn :tx-meta tx-meta)))
(defmethod listen-db-changes :worker-sync
[_ {:keys [repo]} {:keys [tx-data tx-meta]}]
(worker-sync/handle-local-tx! repo tx-data tx-meta))
[_ {:keys [repo]} tx-report]
(worker-sync/handle-local-tx! repo tx-report))
(defn- remove-old-embeddings-and-reset-new-updates!
[conn tx-data tx-meta]

View File

@@ -282,8 +282,7 @@
(let [config (or config "")
initial-data (sqlite-create-graph/build-db-initial-data
config (select-keys opts [:import-type :graph-git-sha]))]
(ldb/transact! conn initial-data {:initial-db? true})))
initial-tx-data (:tx-data initial-tx-report)]
(ldb/transact! conn initial-data {:initial-db? true})))]
(gc-sqlite-dbs! db client-ops-db debug-log-db conn {})
(let [migration-result (db-migrate/migrate conn)]
@@ -291,8 +290,8 @@
(let [client-ops (rtc-migrate/migration-results=>client-ops migration-result)]
(client-op/add-ops! repo client-ops))))
(when initial-tx-data
(worker-sync/handle-local-tx! repo initial-tx-data {:initial-db? true}))
(when initial-tx-report
(worker-sync/handle-local-tx! repo initial-tx-report))
(db-listener/listen-db-changes! repo (get @*datascript-conns repo)))))))

View File

@@ -55,45 +55,35 @@
(when (ws-open? ws)
(.send ws (js/JSON.stringify (clj->js message)))))
(defn- entity->lookup [db eid]
(when-let [entity (and (number? eid) (d/entity db eid))]
(or (when-let [uuid (:block/uuid entity)]
[:block/uuid uuid])
(when-let [ident (:db/ident entity)]
[:db/ident ident]))))
(defn- normalize-ref [db a value]
(if (and (integer? value)
(= :db.type/ref (:db/valueType (d/entity db a))))
(if-let [id (:block/uuid (d/entity db value))]
[:block/uuid id]
(throw (ex-info (str "There's no :block/uuid for given refed value: " value)
{:value value})))
value))
(defn- normalize-ref [db ref]
(cond
(number? ref) (or (entity->lookup db ref) ref)
(instance? Entity ref) (or (when-let [uuid (:block/uuid ref)]
[:block/uuid uuid])
(when-let [ident (:db/ident ref)]
[:db/ident ident]))
:else ref))
(defn- normalize-tx-data [db tx-data]
(mapv
(fn [item]
(cond
(and (vector? item) (#{:db/add :db/retract} (first item)))
(let [[op e a v] item]
[op (normalize-ref db e) a (normalize-ref db v)])
(map? item)
(let [item' (if (contains? item :db/id)
(let [lookup (normalize-ref db (:db/id item))]
(cond-> (dissoc item :db/id)
lookup (assoc :block/uuid (second lookup))))
item)]
(cond-> item'
(contains? item' :block/parent)
(update :block/parent (partial normalize-ref db))
(contains? item' :block/page)
(update :block/page (partial normalize-ref db))))
:else item))
tx-data))
(defn- normalize-tx-data [db-after db-before tx-data]
(->> tx-data
(remove (fn [[e a v t added]]
(contains? #{:block/tx-id :logseq.property/created-by-ref
:logseq.property.embedding/hnsw-label-updated-at} a)))
(map
(fn [[e a v t added]]
(let [v' (or (normalize-ref db-after a v) (normalize-ref db-before a v))]
(if added
[:db/add (- e) a v']
(let [e' (if-let [id (or (:block/uuid (d/entity db-after e))
(:block/uuid (d/entity db-before e)))]
[:block/uuid id]
(let [ident (or (:db/ident (d/entity db-after e))
(:db/ident (d/entity db-before e)))]
(when-not ident
(throw (ex-info "Entity has no :block/uuid or :db/ident"
{:data [e a v t added]})))
ident))]
[:db/retract e' a v'])))))))
(defn- parse-message [raw]
(try
@@ -312,11 +302,12 @@
(p/resolved nil)))
(defn enqueue-local-tx!
[repo tx-data tx-meta]
[repo {:keys [tx-data db-after db-before]}]
(let [conn (worker-state/get-datascript-conn repo)
db (some-> conn deref)]
(when db
(let [normalized (if (:initial-db? tx-meta) tx-data (normalize-tx-data db tx-data))
(let [normalized (normalize-tx-data db-after db-before tx-data)
_ (prn :debug :normalized normalized)
tx-str (sqlite-util/write-transit-str normalized)]
(persist-local-tx! repo tx-str)
(when-let [client (get @worker-state/*worker-sync-clients repo)]
@@ -330,10 +321,10 @@
(flush-pending! repo client)))))))))))))
(defn handle-local-tx!
[repo tx-data tx-meta]
[repo {:keys [tx-data tx-meta] :as tx-report}]
(when (and (enabled?)
(seq tx-data)
(not (:worker-sync/remote? tx-meta))
(not (:rtc-download-graph? tx-meta))
(not (:from-disk? tx-meta)))
(enqueue-local-tx! repo tx-data tx-meta)))
(enqueue-local-tx! repo tx-report)))