mirror of
https://github.com/logseq/logseq.git
synced 2026-04-24 22:25:01 +00:00
fix(db-sync): fix blocks-cycle, add doc docs/agent-guide/db-sync/fix-blocks-cycle.md
This commit is contained in:
37
deps/db-sync/src/logseq/db_sync/worker.cljs
vendored
37
deps/db-sync/src/logseq/db_sync/worker.cljs
vendored
@@ -205,6 +205,18 @@
|
||||
(when-not (js/isNaN n)
|
||||
n))))
|
||||
|
||||
(defn- entity-title
|
||||
[db entity-ref]
|
||||
(let [ent (cond
|
||||
(vector? entity-ref) (d/entity db entity-ref)
|
||||
(number? entity-ref) (d/entity db entity-ref)
|
||||
(keyword? entity-ref) (d/entity db [:db/ident entity-ref])
|
||||
:else nil)]
|
||||
(when ent
|
||||
{:uuid (some-> (:block/uuid ent) str)
|
||||
:title (or (:block/title ent)
|
||||
(:block/name ent))})))
|
||||
|
||||
(def ^:private max-asset-size (* 100 1024 1024))
|
||||
(def ^:private snapshot-rows-default-limit 500)
|
||||
(def ^:private snapshot-rows-max-limit 2000)
|
||||
@@ -312,12 +324,19 @@
|
||||
addresses)))
|
||||
(set! (.-conn self) (storage/open-conn sql))))
|
||||
|
||||
(defn- cycle-reject-response [db tx-data {:keys [attr]}]
|
||||
{:type "tx/reject"
|
||||
:reason "cycle"
|
||||
:data (common/write-transit
|
||||
{:attr attr
|
||||
:server_values (cycle/server-values-for db tx-data attr)})})
|
||||
(defn- cycle-reject-response [db tx-data {:keys [attr entity]}]
|
||||
(let [server-values (cycle/server-values-for db tx-data attr)]
|
||||
(log/info :db-sync/cycle-reject
|
||||
{:attr attr
|
||||
:entity entity
|
||||
:entity-title (entity-title db entity)
|
||||
:server-values (count server-values)
|
||||
:tx-count (count tx-data)})
|
||||
{:type "tx/reject"
|
||||
:reason "cycle"
|
||||
:data (common/write-transit
|
||||
{:attr attr
|
||||
:server_values server-values})}))
|
||||
|
||||
(defn- fix-tx-data
|
||||
[db tx-data]
|
||||
@@ -334,7 +353,11 @@
|
||||
cycle-info (cycle/detect-cycle db' order-fixed)]
|
||||
(if cycle-info
|
||||
(do
|
||||
(prn :debug "cycle detected: " cycle-info)
|
||||
(log/info :db-sync/cycle-detected
|
||||
{:attr (:attr cycle-info)
|
||||
:entity (:entity cycle-info)
|
||||
:entity-title (entity-title db (:entity cycle-info))
|
||||
:tx-count (count order-fixed)})
|
||||
(cycle-reject-response db order-fixed cycle-info))
|
||||
(let [{:keys [tx-data db-before db-after]} (ldb/transact! conn order-fixed)
|
||||
normalized-data (db-normalize/normalize-tx-data db-after db-before tx-data)
|
||||
|
||||
21
docs/agent-guide/db-sync/fix-blocks-cycle.md
Normal file
21
docs/agent-guide/db-sync/fix-blocks-cycle.md
Normal file
@@ -0,0 +1,21 @@
|
||||
This document describes the handling of cycles formed between multiple blocks in the implementation of db-sync.
|
||||
|
||||
## When cycles are detected
|
||||
- Cycles are detected on the server when applying client tx batches in `deps/db-sync/src/logseq/db_sync/worker.cljs`.
|
||||
- The server calls `logseq.db-sync.cycle/detect-cycle` which inspects updates to `:block/parent` (and other special attrs like class extends).
|
||||
- If applying the tx would introduce a cycle, the server rejects the batch with `{:type "tx/reject" :reason "cycle" ...}`.
|
||||
|
||||
## What the server returns
|
||||
- The reject payload includes:
|
||||
- `attr`: the attribute that introduced the cycle (for blocks this is `:block/parent`).
|
||||
- `server_values`: a map of the affected entities to the server’s current value for `attr` (from `logseq.db-sync.cycle/server-values-for`).
|
||||
- This allows the client to realign its local state to the server’s authoritative values.
|
||||
|
||||
## Client-side reconciliation
|
||||
- The client handles `tx/reject` with reason `"cycle"` in `src/main/frontend/worker/db_sync.cljs`.
|
||||
- It calls `reconcile-cycle!` which builds `:db/add` / `:db/retract` ops to restore `attr` to the server values, then transacts them locally with `:rtc-tx? true`.
|
||||
- The intent is to correct local cycles and prevent re-uploading conflicting changes.
|
||||
- The client also strips cycle-related attrs (`:block/parent`, `:logseq.property.class/extends`) from the rejected inflight txs, requeues the remaining changes, and flushes pending txs so other attribute updates still sync.
|
||||
|
||||
## Known pitfalls and fixes
|
||||
- :logseq.property.class/extends not well handled yet, let it be for now, FIX it later
|
||||
@@ -261,14 +261,119 @@
|
||||
(conj acc [:db/add eid attr value]))))
|
||||
[]
|
||||
server_values)]
|
||||
(log/info :db-sync/reconcile-cycle
|
||||
{:repo repo
|
||||
:attr attr
|
||||
:server-values (count server_values)
|
||||
:tx-count (count tx-data)
|
||||
:entity-titles (->> (keys server_values)
|
||||
(keep (fn [ref]
|
||||
(when-let [ent (d/entity db ref)]
|
||||
{:uuid (some-> (:block/uuid ent) str)
|
||||
:title (or (:block/title ent)
|
||||
(:block/name ent))})))
|
||||
(take 10))})
|
||||
(when (seq tx-data)
|
||||
(ldb/transact! conn tx-data {:rtc-tx? true})))))
|
||||
|
||||
(defn- normalize-entity-ref
|
||||
[db entity]
|
||||
(cond
|
||||
(vector? entity) entity
|
||||
(number? entity) (when-let [ent (d/entity db entity)]
|
||||
(cond
|
||||
(:block/uuid ent) [:block/uuid (:block/uuid ent)]
|
||||
(:db/ident ent) [:db/ident (:db/ident ent)]
|
||||
:else nil))
|
||||
(uuid? entity) [:block/uuid entity]
|
||||
(keyword? entity) [:db/ident entity]
|
||||
:else nil))
|
||||
|
||||
(defn- strip-cycle-attrs
|
||||
[db tx-data {:keys [attr entity-refs]}]
|
||||
(let [entity-refs (set entity-refs)]
|
||||
(->> tx-data
|
||||
(mapcat
|
||||
(fn [tx]
|
||||
(cond
|
||||
(and (vector? tx) (= attr (nth tx 2 nil)))
|
||||
(let [entity (nth tx 1 nil)
|
||||
entity-ref (normalize-entity-ref db entity)]
|
||||
(if (and entity-ref (contains? entity-refs entity-ref))
|
||||
[]
|
||||
[tx]))
|
||||
|
||||
(and (map? tx) (contains? tx attr))
|
||||
(let [entity (or (:db/id tx) (:block/uuid tx) (:db/ident tx))
|
||||
entity-ref (normalize-entity-ref db entity)]
|
||||
(if (and entity-ref (contains? entity-refs entity-ref))
|
||||
(let [tx' (dissoc tx attr)
|
||||
meaningful (seq (dissoc tx' :db/id :block/uuid :db/ident))]
|
||||
(if meaningful [tx'] []))
|
||||
[tx]))
|
||||
|
||||
:else
|
||||
[tx]))))))
|
||||
|
||||
(declare flush-pending!)
|
||||
(declare remove-pending-txs!)
|
||||
(declare persist-local-tx!)
|
||||
(declare client-ops-conn)
|
||||
|
||||
(declare enqueue-asset-sync!)
|
||||
(declare enqueue-asset-initial-download!)
|
||||
(defn- pending-txs-by-ids
|
||||
[repo tx-ids]
|
||||
(when-let [conn (client-ops-conn repo)]
|
||||
(let [db @conn]
|
||||
(keep (fn [tx-id]
|
||||
(when-let [ent (d/entity db [:db-sync/tx-id tx-id])]
|
||||
(when-let [tx (:db-sync/tx ent)]
|
||||
{:tx-id tx-id
|
||||
:tx tx})))
|
||||
tx-ids))))
|
||||
|
||||
(defn- requeue-non-parent-txs!
|
||||
[repo attr server_values entries]
|
||||
(let [db (some-> (worker-state/get-datascript-conn repo) deref)
|
||||
entity-refs (when (seq server_values) (set (keys server_values)))
|
||||
scoped? (and db attr (seq entity-refs))
|
||||
requeued (volatile! 0)
|
||||
stripped (volatile! 0)]
|
||||
(if-not scoped?
|
||||
(throw (ex-info "db-sync requeue requires scoped cycle info"
|
||||
{:repo repo
|
||||
:has-db? (boolean db)
|
||||
:attr attr
|
||||
:server-values (count server_values)
|
||||
:entries (count entries)}))
|
||||
(do
|
||||
(doseq [{:keys [tx]} entries]
|
||||
(when (string? tx)
|
||||
(vswap! stripped inc)
|
||||
(let [tx-data (sqlite-util/read-transit-str tx)
|
||||
filtered (strip-cycle-attrs db tx-data {:attr attr :entity-refs entity-refs})]
|
||||
(when (seq filtered)
|
||||
(vswap! requeued inc)
|
||||
(persist-local-tx! repo (sqlite-util/write-transit-str filtered))))))
|
||||
(log/info :db-sync/requeue-non-parent-txs
|
||||
{:repo repo
|
||||
:entries (count entries)
|
||||
:stripped @stripped
|
||||
:requeued @requeued})))))
|
||||
|
||||
(defn- cycle-entity-titles
|
||||
[repo server_values]
|
||||
(when-let [conn (worker-state/get-datascript-conn repo)]
|
||||
(let [db @conn]
|
||||
(->> (keys server_values)
|
||||
(keep (fn [ref]
|
||||
(when-let [ent (d/entity db ref)]
|
||||
{:uuid (some-> (:block/uuid ent) str)
|
||||
:title (or (:block/title ent)
|
||||
(:block/name ent))})))
|
||||
(take 10)))))
|
||||
|
||||
(defn- handle-message! [repo client raw]
|
||||
(when-let [message (-> raw parse-message coerce-ws-server-message)]
|
||||
(let [local-tx (or (client-op/get-local-tx repo) 0)
|
||||
@@ -301,7 +406,21 @@
|
||||
"cycle"
|
||||
(let [{:keys [attr server_values]} (sqlite-util/read-transit-str (:data message))]
|
||||
;; FIXME: fix cycle shouldn't re-trigger uploading
|
||||
(reconcile-cycle! repo attr server_values))
|
||||
(let [inflight-ids @(:inflight client)
|
||||
inflight-entries (pending-txs-by-ids repo inflight-ids)]
|
||||
(log/info :db-sync/tx-reject-cycle
|
||||
{:repo repo
|
||||
:attr attr
|
||||
:server-values (count server_values)
|
||||
:entity-titles (cycle-entity-titles repo server_values)
|
||||
:inflight-ids (count inflight-ids)
|
||||
:local-tx local-tx
|
||||
:remote-tx remote-tx})
|
||||
(reconcile-cycle! repo attr server_values)
|
||||
(remove-pending-txs! repo inflight-ids)
|
||||
(reset! (:inflight client) [])
|
||||
(requeue-non-parent-txs! repo attr server_values inflight-entries))
|
||||
(flush-pending! repo client))
|
||||
nil)
|
||||
nil))))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user