From 93269f83fcedb7f6361b391377e58543e5975663 Mon Sep 17 00:00:00 2001 From: Tienson Qin Date: Wed, 7 Jan 2026 17:54:26 +0800 Subject: [PATCH] initial sync graph --- .../src/logseq/worker_sync/worker.cljs | 116 ++++-------------- src/main/frontend/worker/db_listener.cljs | 4 +- src/main/frontend/worker/db_worker.cljs | 7 +- src/main/frontend/worker/worker_sync.cljs | 75 +++++------ 4 files changed, 64 insertions(+), 138 deletions(-) diff --git a/deps/worker-sync/src/logseq/worker_sync/worker.cljs b/deps/worker-sync/src/logseq/worker_sync/worker.cljs index 9ec4a35257..9d3f5d450d 100644 --- a/deps/worker-sync/src/logseq/worker_sync/worker.cljs +++ b/deps/worker-sync/src/logseq/worker_sync/worker.cljs @@ -76,6 +76,7 @@ (let [conn (.-conn self) db @conn datoms (protocol/datoms->wire (d/datoms db :eavt))] + (prn :debug :count (count db)) {:type "snapshot/ok" :t (t-now self) :datoms (common/write-transit datoms)})) @@ -86,73 +87,9 @@ :attr (name attr) :server_values (common/write-transit (cycle/server-values-for db tx-data attr))}) -(defn- entity->lookup [db entity] - (cond - (vector? entity) entity - (uuid? entity) [:block/uuid entity] - (keyword? entity) [:db/ident entity] - (map? entity) - (or (when-let [uuid (:block/uuid entity)] - [:block/uuid uuid]) - (when-let [ident (:db/ident entity)] - [:db/ident ident]) - (when-let [eid (:db/id entity)] - (when (vector? eid) - eid))) - (instance? Entity entity) - (or (when-let [uuid (:block/uuid entity)] - [:block/uuid uuid]) - (when-let [ident (:db/ident entity)] - [:db/ident ident])) - (number? entity) nil - :else - (when-let [ent (d/entity db entity)] - (or (when-let [uuid (:block/uuid ent)] - [:block/uuid uuid]) - (when-let [ident (:db/ident ent)] - [:db/ident ident]))))) - -(defn- normalize-ref [db ref] - (or (entity->lookup db ref) ref)) - -(defn- normalize-ref-value [db value] - (cond - (set? value) (into #{} (map (partial normalize-ref db)) value) - (sequential? value) (mapv (partial normalize-ref db) value) - :else (normalize-ref db value))) - (defn- ref-attr? [attr] (contains? db-schema/ref-type-attributes attr)) -(defn- normalize-tx-data [db tx-data] - (mapv - (fn [item] - (cond - (and (vector? item) (#{:db/add :db/retract} (first item))) - (let [[op e a v] item] - (cond-> [op (normalize-ref db e) a v] - (ref-attr? a) - (update 3 (partial normalize-ref-value db)))) - - (and (vector? item) (#{:db.fn/retractEntity :db/retractEntity} (first item))) - (let [[op e] item] - [op (normalize-ref db e)]) - - (map? item) - (let [item' (cond-> item - (contains? item :db/id) - (update :db/id (partial normalize-ref db)))] - (reduce - (fn [acc k] - (if (contains? acc k) - (update acc k (partial normalize-ref-value db)) - acc)) - item' - db-schema/ref-type-attributes)) - - :else item)) - tx-data)) - (defn- contains-unstable-entity-id? [tx-data] (boolean (some @@ -309,8 +246,8 @@ max-atom (atom max-order) fixes (reduce (fn [acc {:keys [entity value]}] - (let [entity-ref (normalize-ref db'' entity) - parent-ref (normalize-ref db'' value) + (let [entity-ref entity + parent-ref value eid (d/entid db'' entity-ref) parent-eid' (when parent-ref (d/entid db'' parent-ref))] (if (and eid (some? value) (nil? parent-eid')) @@ -332,10 +269,9 @@ max-order-atoms (atom {}) fixes (reduce (fn [acc {:keys [entity value]}] - (let [entity-ref (normalize-ref db' entity) + (let [entity-ref entity eid (d/entid db' entity-ref) - parent (when eid (:block/parent (d/entity db' eid))) - parent-eid (when parent (d/entid db' (normalize-ref db' parent)))] + parent-eid eid] (if (and eid parent-eid value) (let [siblings (d/datoms db' :avet :block/parent parent-eid) same-order? (some @@ -360,9 +296,6 @@ (into tx-data fixes) tx-data))) -(defn ^:test normalize-tx-data* [db tx-data] - (normalize-tx-data db tx-data)) - (defn ^:test contains-unstable-entity-id?* [tx-data] (contains-unstable-entity-id? tx-data)) @@ -375,24 +308,27 @@ (defn- apply-tx! [^js self tx-data] (let [sql (.-sql self) conn (.-conn self) - db @conn - normalized (normalize-tx-data db tx-data)] - (if (contains-unstable-entity-id? normalized) - {:type "tx/reject" - :reason "unstable-id"} - (let [ ;; {:keys [db tx-data]} (ensure-idents db normalized) - parent-fixed (fix-missing-parent db tx-data) - order-fixed (fix-duplicate-orders db parent-fixed) - cycle-info (cycle/detect-cycle db order-fixed)] - (if cycle-info - (cycle-reject-response db order-fixed cycle-info) - (let [_ (d/transact! conn order-fixed) - new-t (storage/next-t! sql) - created-at (common/now-ms) - tx-str (common/write-transit order-fixed)] - (storage/append-tx! sql new-t tx-str created-at) - {:type "tx/ok" - :t new-t})))))) + db @conn] + (let [_ (d/transact! conn tx-data) + new-t (storage/next-t! sql) + created-at (common/now-ms) + tx-str (common/write-transit tx-data)] + (storage/append-tx! sql new-t tx-str created-at) + {:type "tx/ok" + :t new-t}) + ;; (let [parent-fixed (fix-missing-parent db tx-data) + ;; order-fixed (fix-duplicate-orders db parent-fixed) + ;; cycle-info (cycle/detect-cycle db order-fixed)] + ;; (if cycle-info + ;; (cycle-reject-response db order-fixed cycle-info) + ;; (let [_ (d/transact! conn order-fixed) + ;; new-t (storage/next-t! sql) + ;; created-at (common/now-ms) + ;; tx-str (common/write-transit order-fixed)] + ;; (storage/append-tx! sql new-t tx-str created-at) + ;; {:type "tx/ok" + ;; :t new-t}))) + )) (defn- handle-tx! [^js self tx-data t-before] (let [current-t (t-now self)] diff --git a/src/main/frontend/worker/db_listener.cljs b/src/main/frontend/worker/db_listener.cljs index 51f2e650bb..d4f270932f 100644 --- a/src/main/frontend/worker/db_listener.cljs +++ b/src/main/frontend/worker/db_listener.cljs @@ -52,8 +52,8 @@ (prn :tx-meta tx-meta))) (defmethod listen-db-changes :worker-sync - [_ {:keys [repo]} {:keys [tx-data tx-meta]}] - (worker-sync/handle-local-tx! repo tx-data tx-meta)) + [_ {:keys [repo]} tx-report] + (worker-sync/handle-local-tx! repo tx-report)) (defn- remove-old-embeddings-and-reset-new-updates! [conn tx-data tx-meta] diff --git a/src/main/frontend/worker/db_worker.cljs b/src/main/frontend/worker/db_worker.cljs index 288a30e93b..47c6ab8082 100644 --- a/src/main/frontend/worker/db_worker.cljs +++ b/src/main/frontend/worker/db_worker.cljs @@ -282,8 +282,7 @@ (let [config (or config "") initial-data (sqlite-create-graph/build-db-initial-data config (select-keys opts [:import-type :graph-git-sha]))] - (ldb/transact! conn initial-data {:initial-db? true}))) - initial-tx-data (:tx-data initial-tx-report)] + (ldb/transact! conn initial-data {:initial-db? true})))] (gc-sqlite-dbs! db client-ops-db debug-log-db conn {}) (let [migration-result (db-migrate/migrate conn)] @@ -291,8 +290,8 @@ (let [client-ops (rtc-migrate/migration-results=>client-ops migration-result)] (client-op/add-ops! repo client-ops)))) - (when initial-tx-data - (worker-sync/handle-local-tx! repo initial-tx-data {:initial-db? true})) + (when initial-tx-report + (worker-sync/handle-local-tx! repo initial-tx-report)) (db-listener/listen-db-changes! repo (get @*datascript-conns repo))))))) diff --git a/src/main/frontend/worker/worker_sync.cljs b/src/main/frontend/worker/worker_sync.cljs index 62cacbdeeb..8e8873a6e9 100644 --- a/src/main/frontend/worker/worker_sync.cljs +++ b/src/main/frontend/worker/worker_sync.cljs @@ -55,45 +55,35 @@ (when (ws-open? ws) (.send ws (js/JSON.stringify (clj->js message))))) -(defn- entity->lookup [db eid] - (when-let [entity (and (number? eid) (d/entity db eid))] - (or (when-let [uuid (:block/uuid entity)] - [:block/uuid uuid]) - (when-let [ident (:db/ident entity)] - [:db/ident ident])))) +(defn- normalize-ref [db a value] + (if (and (integer? value) + (= :db.type/ref (:db/valueType (d/entity db a)))) + (if-let [id (:block/uuid (d/entity db value))] + [:block/uuid id] + (throw (ex-info (str "There's no :block/uuid for given refed value: " value) + {:value value}))) + value)) -(defn- normalize-ref [db ref] - (cond - (number? ref) (or (entity->lookup db ref) ref) - (instance? Entity ref) (or (when-let [uuid (:block/uuid ref)] - [:block/uuid uuid]) - (when-let [ident (:db/ident ref)] - [:db/ident ident])) - :else ref)) - -(defn- normalize-tx-data [db tx-data] - (mapv - (fn [item] - (cond - (and (vector? item) (#{:db/add :db/retract} (first item))) - (let [[op e a v] item] - [op (normalize-ref db e) a (normalize-ref db v)]) - - (map? item) - (let [item' (if (contains? item :db/id) - (let [lookup (normalize-ref db (:db/id item))] - (cond-> (dissoc item :db/id) - lookup (assoc :block/uuid (second lookup)))) - item)] - (cond-> item' - (contains? item' :block/parent) - (update :block/parent (partial normalize-ref db)) - - (contains? item' :block/page) - (update :block/page (partial normalize-ref db)))) - - :else item)) - tx-data)) +(defn- normalize-tx-data [db-after db-before tx-data] + (->> tx-data + (remove (fn [[e a v t added]] + (contains? #{:block/tx-id :logseq.property/created-by-ref + :logseq.property.embedding/hnsw-label-updated-at} a))) + (map + (fn [[e a v t added]] + (let [v' (or (normalize-ref db-after a v) (normalize-ref db-before a v))] + (if added + [:db/add (- e) a v'] + (let [e' (if-let [id (or (:block/uuid (d/entity db-after e)) + (:block/uuid (d/entity db-before e)))] + [:block/uuid id] + (let [ident (or (:db/ident (d/entity db-after e)) + (:db/ident (d/entity db-before e)))] + (when-not ident + (throw (ex-info "Entity has no :block/uuid or :db/ident" + {:data [e a v t added]}))) + ident))] + [:db/retract e' a v']))))))) (defn- parse-message [raw] (try @@ -312,11 +302,12 @@ (p/resolved nil))) (defn enqueue-local-tx! - [repo tx-data tx-meta] + [repo {:keys [tx-data db-after db-before]}] (let [conn (worker-state/get-datascript-conn repo) db (some-> conn deref)] (when db - (let [normalized (if (:initial-db? tx-meta) tx-data (normalize-tx-data db tx-data)) + (let [normalized (normalize-tx-data db-after db-before tx-data) + _ (prn :debug :normalized normalized) tx-str (sqlite-util/write-transit-str normalized)] (persist-local-tx! repo tx-str) (when-let [client (get @worker-state/*worker-sync-clients repo)] @@ -330,10 +321,10 @@ (flush-pending! repo client))))))))))))) (defn handle-local-tx! - [repo tx-data tx-meta] + [repo {:keys [tx-data tx-meta] :as tx-report}] (when (and (enabled?) (seq tx-data) (not (:worker-sync/remote? tx-meta)) (not (:rtc-download-graph? tx-meta)) (not (:from-disk? tx-meta))) - (enqueue-local-tx! repo tx-data tx-meta))) + (enqueue-local-tx! repo tx-report)))