fix: able to sync

This commit is contained in:
Tienson Qin
2026-01-07 23:42:27 +08:00
parent 4670a67bdc
commit c00307dc05
2 changed files with 86 additions and 18 deletions

View File

@@ -86,7 +86,6 @@
(let [conn (.-conn self)
db @conn
datoms (protocol/datoms->wire (d/datoms db :eavt))]
(prn :debug :count (count db))
{:type "snapshot/ok"
:t (t-now self)
:datoms (common/write-transit datoms)}))
@@ -315,14 +314,55 @@
(defn ^:test fix-duplicate-orders* [db tx-data]
(fix-duplicate-orders db tx-data))
(defn- lookup-id?
[v]
(and (vector? v)
(= 2 (count v))
(= (first v) :block/uuid)
(uuid? (second v))))
(defn- normalize-tx-data [db-after db-before tx-data]
(->> tx-data
(map
(fn [[e a v _t added]]
(let [e' (if-let [entity (d/entity db-before e)]
(if-let [id (:block/uuid entity)]
[:block/uuid id]
(:db/ident entity))
(- e))
v' (if (and (integer? v)
(or (= :db.type/ref (:db/valueType (d/entity db-after a)))
(= :db.type/ref (:db/valueType (d/entity db-before a)))))
(if-let [entity (d/entity db-before v)]
(if-let [id (:block/uuid entity)]
[:block/uuid id]
(:db/ident entity))
(- v))
v)]
(if added
[:db/add e' a v']
[:db/retract e' a v']))))))
(defn- apply-tx! [^js self tx-data]
(let [sql (.-sql self)
conn (.-conn self)
db @conn]
(let [_ (d/transact! conn tx-data)
db @conn
tx-data' (keep
(fn [[op e a v]]
(let [e' (if (lookup-id? e)
(:db/id (d/entity db e))
e)
v' (if (lookup-id? v)
(:db/id (d/entity db v))
v)]
(when (and e' v')
[op e' a v'])))
tx-data)]
(let [{:keys [tx-data db-before db-after]} (d/transact! conn tx-data')
normalized-data (normalize-tx-data db-after db-before tx-data)
new-t (storage/next-t! sql)
created-at (common/now-ms)
tx-str (common/write-transit tx-data)]
tx-str (common/write-transit normalized-data)]
(storage/append-tx! sql new-t tx-str created-at)
{:type "tx/ok"
:t new-t})

View File

@@ -6,6 +6,7 @@
[datascript.impl.entity :as de :refer [Entity]]
[frontend.worker.state :as worker-state]
[lambdaisland.glogi :as log]
[logseq.common.util :as common-util]
[logseq.db :as ldb]
[logseq.db.sqlite.util :as sqlite-util]
[promesa.core :as p]))
@@ -69,21 +70,26 @@
(remove (fn [[e a v t added]]
(contains? #{:block/tx-id :logseq.property/created-by-ref
:logseq.property.embedding/hnsw-label-updated-at} a)))
(common-util/distinct-by-last-wins (fn [[e a v tx _added]] [e a v tx]))
(map
(fn [[e a v t added]]
(let [v' (or (normalize-ref db-after a v) (normalize-ref db-before a v))]
(if added
[:db/add (- e) a v']
(let [e' (if-let [id (or (:block/uuid (d/entity db-after e))
(:block/uuid (d/entity db-before e)))]
(fn [[e a v _t added]]
(let [e' (if-let [entity (d/entity db-before e)]
(if-let [id (:block/uuid entity)]
[:block/uuid id]
(:db/ident entity))
(- e))
v' (if (and (integer? v)
(or (= :db.type/ref (:db/valueType (d/entity db-after a)))
(= :db.type/ref (:db/valueType (d/entity db-before a)))))
(if-let [entity (d/entity db-before v)]
(if-let [id (:block/uuid entity)]
[:block/uuid id]
(let [ident (or (:db/ident (d/entity db-after e))
(:db/ident (d/entity db-before e)))]
(when-not ident
(throw (ex-info "Entity has no :block/uuid or :db/ident"
{:data [e a v t added]})))
ident))]
[:db/retract e' a v'])))))))
(:db/ident entity))
(- v))
v)]
(if added
[:db/add e' a v']
[:db/retract e' a v']))))))
(defn- parse-message [raw]
(try
@@ -95,10 +101,32 @@
(when (number? t)
(reset! (:server-t client) t)))
(defn- lookup-id?
[v]
(and (vector? v)
(= 2 (count v))
(= (first v) :block/uuid)
(uuid? (second v))))
(defn- de-normalized-data
[db tx-data]
(keep
(fn [[op e a v]]
(let [e' (if (lookup-id? e)
(:db/id (d/entity db e))
e)
v' (if (lookup-id? v)
(:db/id (d/entity db v))
v)]
(when (and e' v')
[op e' a v'])))
tx-data))
(defn- apply-remote-tx! [repo tx-data]
(when-let [conn (worker-state/get-datascript-conn repo)]
(try
(d/transact! conn tx-data {:worker-sync/remote? true})
(let [tx-data' (de-normalized-data @conn tx-data)]
(d/transact! conn tx-data' {:worker-sync/remote? true}))
(catch :default e
(log/error :worker-sync/apply-remote-tx-failed {:error e})))))