fix: client should send only the last :block/parent update to server

This commit is contained in:
Tienson Qin
2026-01-12 18:45:02 +08:00
parent 8430f1c9e1
commit bc7f19cbdc
3 changed files with 52 additions and 38 deletions

View File

@@ -21,7 +21,7 @@
[:map
[:type [:= "tx/batch"]]
[:t_before :int]
[:txs [:sequential :string]]]]
[:txs :string]]]
["ping"
[:map
[:type [:= "ping"]]]]])
@@ -107,7 +107,7 @@
(def tx-batch-request-schema
[:map
[:t_before :int]
[:txs [:sequential :string]]])
[:txs :string]])
(def snapshot-row-schema
[:or

View File

@@ -231,7 +231,7 @@
(json-response :assets/delete {:ok true} 200)))
(error-response "method not allowed" 405))))
(error-response "invalid asset path" 400)))))
(error-response "invalid asset path" 400)))))
(defn- handle-worker-fetch [request ^js env]
(let [url (js/URL. (.-url request))
@@ -346,32 +346,33 @@
(->> tx-data
(worker-core/fix-duplicate-orders db)))
(defn- apply-tx! [^js self sender tx-data]
(defn- apply-tx! [^js self sender txs]
(let [sql (.-sql self)
conn (.-conn self)]
(when-not conn
(fail-fast :db-sync/missing-db {:op :apply-tx}))
(let [db @conn
tx-report (d/with db tx-data)
db' (:db-after tx-report)
order-fixed (fix-tx-data db' tx-data)
cycle-info (cycle/detect-cycle db' order-fixed)]
(if cycle-info
(do
(log/info :db-sync/cycle-detected
{:attr (:attr cycle-info)
:entity (:entity cycle-info)
:entity-title (entity-title db (:entity cycle-info))
:tx-count (count order-fixed)})
(cycle-reject-response db order-fixed cycle-info))
(let [{:keys [tx-data db-before db-after]} (ldb/transact! conn order-fixed)
normalized-data (db-normalize/normalize-tx-data db-after db-before tx-data)
new-t (storage/next-t! sql)
created-at (common/now-ms)
tx-str (common/write-transit normalized-data)]
(storage/append-tx! sql new-t tx-str created-at)
(broadcast! self sender {:type "changed" :t new-t})
new-t)))))
(let [tx-data (protocol/transit->tx txs)
db @conn
tx-report (d/with db tx-data)
db' (:db-after tx-report)
order-fixed (fix-tx-data db' tx-data)
cycle-info (cycle/detect-cycle db' order-fixed)]
(if cycle-info
(do
(log/info :db-sync/cycle-detected
{:attr (:attr cycle-info)
:entity (:entity cycle-info)
:entity-title (entity-title db (:entity cycle-info))
:tx-count (count order-fixed)})
(cycle-reject-response db order-fixed cycle-info))
(let [{:keys [tx-data db-before db-after]} (ldb/transact! conn order-fixed)
normalized-data (db-normalize/normalize-tx-data db-after db-before tx-data)
new-t (storage/next-t! sql)
created-at (common/now-ms)
tx-str (common/write-transit normalized-data)]
(storage/append-tx! sql new-t tx-str created-at)
(broadcast! self sender {:type "changed" :t new-t})
new-t)))))
(defn- handle-tx-batch! [^js self sender txs t-before]
(let [current-t (t-now self)]
@@ -386,15 +387,14 @@
:t current-t}
:else
(let [tx-data (mapcat protocol/transit->tx txs)]
(if (seq tx-data)
(let [new-t (apply-tx! self sender tx-data)]
(if (and (map? new-t) (= "tx/reject" (:type new-t)))
new-t
{:type "tx/batch/ok"
:t new-t}))
{:type "tx/reject"
:reason "empty tx data"})))))
(if txs
(let [new-t (apply-tx! self sender txs)]
(if (and (map? new-t) (= "tx/reject" (:type new-t)))
new-t
{:type "tx/batch/ok"
:t new-t}))
{:type "tx/reject"
:reason "empty tx data"}))))
(defn- handle-ws-message! [^js self ^js ws raw]
(let [message (-> raw protocol/parse-message coerce-ws-client-message)]
@@ -420,7 +420,7 @@
"tx/batch"
(let [txs (:txs message)
t-before (parse-int (:t_before message))]
(if (and (sequential? txs) (every? string? txs))
(if (string? txs)
(send! ws (handle-tx-batch! self ws txs t-before))
(send! ws {:type "tx/reject" :reason "invalid tx"})))
@@ -509,7 +509,7 @@
(bad-request "invalid tx")
(let [{:keys [txs t_before]} body
t-before (parse-int t_before)]
(if (and (sequential? txs) (every? string? txs))
(if (string? txs)
(json-response :sync/tx-batch (handle-tx-batch! self nil txs t-before))
(bad-request "invalid tx"))))))))

View File

@@ -7,6 +7,7 @@
[lambdaisland.glogi :as log]
[logseq.common.config :as common-config]
[logseq.common.path :as path]
[logseq.common.util :as common-util]
[logseq.db :as ldb]
[logseq.db-sync.cycle :as db-sync-cycle]
[logseq.db-sync.malli-schema :as db-sync-schema]
@@ -402,6 +403,16 @@
[:db.fn/retractEntity [:db-sync/tx-id tx-id]])
tx-ids)))))
(defn- keep-last-parent-update
[tx-data]
(->> tx-data
(common-util/distinct-by-last-wins
(fn [item]
;; Keep the last :block/parent change to avoid value refers to deleted parent
(if (and (vector? item) (= 4 (count item)) (= :block/parent (nth item 2)))
[(second item) (nth item 2)]
item)))))
(defn- flush-pending!
[repo client]
(let [inflight @(:inflight client)]
@@ -411,12 +422,15 @@
(let [batch (pending-txs repo 50)]
(when (seq batch)
(let [tx-ids (mapv :tx-id batch)
txs (mapv :tx batch)]
txs (mapv :tx batch)
tx-data (->> txs
(mapcat sqlite-util/read-transit-str)
keep-last-parent-update)]
(when (seq txs)
(reset! (:inflight client) tx-ids)
(send! ws {:type "tx/batch"
:t_before (or (client-op/get-local-tx repo) 0)
:txs txs}))))))))))
:txs (sqlite-util/write-transit-str tx-data)}))))))))))
(defn- pending-txs-by-ids
[repo tx-ids]