fix(sync): preserve tx-id across rebases

This commit is contained in:
Tienson Qin
2026-04-07 19:23:18 +08:00
parent 66785cf8e7
commit ecc03e1efc
4 changed files with 89 additions and 51 deletions

View File

@@ -1,14 +1,14 @@
(ns logseq.db-sync.storage
(:require [cljs-bean.core :as bean]
[clojure.string :as string]
[datascript.core :as d]
[datascript.storage :refer [IStorage]]
[logseq.db :as ldb]
[logseq.db-sync.checksum :as sync-checksum]
[logseq.db-sync.common :as common]
[logseq.db.common.normalize :as db-normalize]
[logseq.db.common.sqlite :as common-sqlite]
[logseq.db.frontend.schema :as db-schema]))
(:require
[cljs-bean.core :as bean]
[clojure.string :as string]
[datascript.core :as d]
[datascript.storage :refer [IStorage]]
[logseq.db-sync.checksum :as sync-checksum]
[logseq.db-sync.common :as common]
[logseq.db.common.normalize :as db-normalize]
[logseq.db.common.sqlite :as common-sqlite]
[logseq.db.frontend.schema :as db-schema]))
(def ^:private tx-log-outliner-op-migration-sql
"alter table tx_log add column outliner_op TEXT")

View File

@@ -14,7 +14,6 @@
[logseq.db-sync.worker.http :as http]
[logseq.db-sync.worker.routes.sync :as sync-routes]
[logseq.db-sync.worker.ws :as ws]
[logseq.db.frontend.schema :as db-schema]
[promesa.core :as p]))
(def ^:private snapshot-download-batch-size 10000)

View File

@@ -227,12 +227,26 @@
:db-sync/created-at now}])
(worker-undo-redo/gen-undo-ops! repo tx-report tx-id
{:apply-history-action! apply-history-action!})
(when should-inc-pending?
(client-op/adjust-pending-local-tx-count! repo 1)
(when-let [client (current-client repo)]
(broadcast-rtc-state! client)))
(when should-inc-pending?
(client-op/adjust-pending-local-tx-count! repo 1)
(when-let [client (current-client repo)]
(broadcast-rtc-state! client)))
tx-id)))
(defn prepare-upload-tx-entries
[_conn pending]
(let [entries (mapv (fn [{:keys [tx-id tx outliner-op]}]
{:tx-id tx-id
:outliner-op outliner-op
:tx-data (vec tx)})
pending)
empty-tx-ids (->> entries
(filter (comp empty? :tx-data))
(mapv :tx-id))
tx-entries (filterv (comp seq :tx-data) entries)]
{:tx-entries tx-entries
:drop-tx-ids empty-tx-ids}))
(defn pending-txs
[repo & {:keys [limit]}]
(when-let [conn (client-ops-conn repo)]
@@ -429,21 +443,16 @@
local-tx (or (client-op/get-local-tx repo) 0)
remote-tx (get @*repo->latest-remote-tx repo)
conn (worker-state/get-datascript-conn repo)]
(when (and conn (= local-tx remote-tx)) ; rebase
(when (and conn (= local-tx remote-tx)) ; rebase
(when (empty? inflight)
(when-let [ws (:ws client)]
(when (and (ws-open? ws) (worker-state/online?))
(let [batch (pending-txs repo {:limit 50})]
(when (seq batch)
(let [tx-entries (->> batch
(mapv (fn [{:keys [tx-id tx outliner-op]}]
{:tx-id tx-id
:outliner-op outliner-op
:tx-data (vec tx)}))
(filterv (comp seq :tx-data)))
tx-ids (mapv :tx-id batch)]
(if (empty? tx-entries)
(remove-pending-txs! repo tx-ids)
(let [{:keys [tx-entries drop-tx-ids]} (prepare-upload-tx-entries conn batch)]
(when (seq drop-tx-ids)
(remove-pending-txs! repo drop-tx-ids))
(when (seq tx-entries)
(-> (p/let [aes-key (when (sync-crypt/graph-e2ee? repo)
(sync-crypt/<ensure-graph-aes-key repo (:graph-id client)))
_ (when (and (sync-crypt/graph-e2ee? repo) (nil? aes-key))
@@ -464,7 +473,8 @@
(cond-> {:tx (sqlite-util/write-transit-str tx-data)}
outliner-op
(assoc :outliner-op outliner-op)))
tx-entries*)]
tx-entries*)
tx-ids (mapv :tx-id tx-entries)]
(reset! (:inflight client) tx-ids)
(send! ws {:type "tx/batch"
:t-before local-tx
@@ -654,20 +664,17 @@
fallback-target (:fallback-target opts')
fallback-target' (or (replay-entity-id-value @conn fallback-target)
fallback-target)
target-block (d/entity @conn target-id')
use-fallback? (and sibling?
(nil? target-block)
(nil? target)
(some? fallback-target))
target-block' (if use-fallback?
(d/entity @conn fallback-target')
target-block)
target)
move-opts (cond-> (-> opts'
(dissoc :fallback-target)
(assoc :persist-op? false))
use-fallback?
(assoc :sibling? false))]
(when-not target-block'
(invalid-rebase-op! op {:args args}))
(outliner-core/move-blocks! conn blocks target-block' move-opts))))
:move-blocks-up-down
@@ -826,7 +833,10 @@
(try
(ldb/batch-transact-with-temp-conn!
conn
{:outliner-op :rebase}
{:outliner-op :rebase
;; Keep stable tx-id across rebases so one logical pending op
;; doesn't fan out into duplicated pending rows.
:db-sync/tx-id (:tx-id local-tx)}
(fn [conn]
(if (= [[:transact nil]] outliner-ops)
(when-let [tx-data (seq (:tx local-tx))]
@@ -877,7 +887,16 @@
(fix-tx! conn tx-report {:outliner-op :fix}))
(remove-pending-txs! repo (map :tx-id local-txs))
;; Successful rebases persist with the original tx-id.
;; Only drop entries that failed to rebase or became empty no-op txs.
(let [rebased-tx-ids (->> @*rebase-tx-reports
(keep (comp :db-sync/tx-id :tx-meta))
set)
stale-tx-ids (->> local-txs
(map :tx-id)
(remove rebased-tx-ids)
vec)]
(remove-pending-txs! repo stale-tx-ids))
(catch :default e
(js/console.error e)

View File

@@ -248,7 +248,35 @@
(reduce
(fn [{:keys [t txs conn] :as state} tx-entry]
(let [tx-data (:tx-data tx-entry)
{:keys [db-before db-after tx-data]} (ldb/transact! conn tx-data {:op :apply-client-tx})
{:keys [db-before db-after tx-data]}
(try
(ldb/transact! conn tx-data {:op :apply-client-tx})
(catch :default error
(let [missing-entity-id (some-> (ex-data error) :entity-id)
same-entity-txs
(when missing-entity-id
(->> tx-entries
(keep-indexed
(fn [idx entry]
(let [entry-tx (:tx-data entry)
touches? (some #(or (= missing-entity-id (second %))
(= missing-entity-id (nth % 3 nil)))
entry-tx)]
(when touches?
{:idx idx
:tx-id (:tx-id entry)
:outliner-op (:outliner-op entry)
:tx-data entry-tx}))))
vec))]
(throw (ex-info "server upload transact failed"
{:type :db-sync-sim/server-upload-transact-failed
:t-before t-before
:server-t t
:missing-entity-id missing-entity-id
:matching-entries same-entity-txs
:tx-entry tx-entry
:tx-data tx-data}
error)))))
normalized-data (->> tx-data
(db-normalize/normalize-tx-data db-after db-before))
next-t (inc t)]
@@ -258,15 +286,8 @@
{:accepted? @accepted?
:t (:t @server)}))
(defn- build-upload-entries [conn pending]
(->> pending
(mapv (fn [{:keys [tx] :as pending-entry}]
(assoc pending-entry
:tx-data (->> tx
(db-normalize/remove-retract-entity-ref @conn)
distinct
vec))))
(filterv (comp seq :tx-data))))
(defn- build-upload-plan [conn pending]
(#'sync-apply/prepare-upload-tx-entries conn pending))
(defn- sync-client! [server {:keys [repo conn client online?]}]
(when online?
@@ -288,21 +309,20 @@
local-tx' (or (client-op/get-local-tx repo) 0)
server-t' (:t @server)]
(when (and (seq pending) (= local-tx' server-t'))
(let [tx-entries (build-upload-entries conn pending)
tx-ids (mapv :tx-id pending)]
(let [{:keys [tx-entries drop-tx-ids]} (build-upload-plan conn pending)]
(when (seq drop-tx-ids)
(#'sync-apply/remove-pending-txs! repo drop-tx-ids)
(reset! progress? true))
;; (prn :debug :upload :repo repo :tx-entries tx-entries)
(if (seq tx-entries)
(let [{:keys [accepted? t]} (server-upload! server local-tx' tx-entries)]
(let [{:keys [accepted? t]} (server-upload! server local-tx' tx-entries)
tx-ids (mapv :tx-id tx-entries)]
(when accepted?
(#'sync-apply/remove-pending-txs! repo tx-ids)
(when (seq tx-ids)
(client-op/update-local-tx repo t)
(reset! progress? true))))
(do
(#'sync-apply/remove-pending-txs! repo tx-ids)
(when (seq tx-ids)
(client-op/update-local-tx repo (:t @server))
(reset! progress? true)))))))
nil))))
@progress?)))
(defn- active-block-uuids