From ecc03e1efc9fd71462bf63f8af4a688e3eb062bf Mon Sep 17 00:00:00 2001 From: Tienson Qin Date: Tue, 7 Apr 2026 19:23:18 +0800 Subject: [PATCH] fix(sync): preserve tx-id across rebases --- deps/db-sync/src/logseq/db_sync/storage.cljs | 20 +++--- .../logseq/db_sync/worker/handler/sync.cljs | 1 - src/main/frontend/worker/sync/apply_txs.cljs | 63 ++++++++++++------- .../frontend/worker/db_sync_sim_test.cljs | 56 +++++++++++------ 4 files changed, 89 insertions(+), 51 deletions(-) diff --git a/deps/db-sync/src/logseq/db_sync/storage.cljs b/deps/db-sync/src/logseq/db_sync/storage.cljs index 5fccad3d12..773936eb7b 100644 --- a/deps/db-sync/src/logseq/db_sync/storage.cljs +++ b/deps/db-sync/src/logseq/db_sync/storage.cljs @@ -1,14 +1,14 @@ (ns logseq.db-sync.storage - (:require [cljs-bean.core :as bean] - [clojure.string :as string] - [datascript.core :as d] - [datascript.storage :refer [IStorage]] - [logseq.db :as ldb] - [logseq.db-sync.checksum :as sync-checksum] - [logseq.db-sync.common :as common] - [logseq.db.common.normalize :as db-normalize] - [logseq.db.common.sqlite :as common-sqlite] - [logseq.db.frontend.schema :as db-schema])) + (:require + [cljs-bean.core :as bean] + [clojure.string :as string] + [datascript.core :as d] + [datascript.storage :refer [IStorage]] + [logseq.db-sync.checksum :as sync-checksum] + [logseq.db-sync.common :as common] + [logseq.db.common.normalize :as db-normalize] + [logseq.db.common.sqlite :as common-sqlite] + [logseq.db.frontend.schema :as db-schema])) (def ^:private tx-log-outliner-op-migration-sql "alter table tx_log add column outliner_op TEXT") diff --git a/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs b/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs index dc4c140b74..a24a0539e9 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs @@ -14,7 +14,6 @@ [logseq.db-sync.worker.http :as http] [logseq.db-sync.worker.routes.sync :as sync-routes] [logseq.db-sync.worker.ws :as ws] - [logseq.db.frontend.schema :as db-schema] [promesa.core :as p])) (def ^:private snapshot-download-batch-size 10000) diff --git a/src/main/frontend/worker/sync/apply_txs.cljs b/src/main/frontend/worker/sync/apply_txs.cljs index 9532b31283..350a5bdd96 100644 --- a/src/main/frontend/worker/sync/apply_txs.cljs +++ b/src/main/frontend/worker/sync/apply_txs.cljs @@ -227,12 +227,26 @@ :db-sync/created-at now}]) (worker-undo-redo/gen-undo-ops! repo tx-report tx-id {:apply-history-action! apply-history-action!}) - (when should-inc-pending? - (client-op/adjust-pending-local-tx-count! repo 1) - (when-let [client (current-client repo)] - (broadcast-rtc-state! client))) + (when should-inc-pending? + (client-op/adjust-pending-local-tx-count! repo 1) + (when-let [client (current-client repo)] + (broadcast-rtc-state! client))) tx-id))) +(defn prepare-upload-tx-entries + [_conn pending] + (let [entries (mapv (fn [{:keys [tx-id tx outliner-op]}] + {:tx-id tx-id + :outliner-op outliner-op + :tx-data (vec tx)}) + pending) + empty-tx-ids (->> entries + (filter (comp empty? :tx-data)) + (mapv :tx-id)) + tx-entries (filterv (comp seq :tx-data) entries)] + {:tx-entries tx-entries + :drop-tx-ids empty-tx-ids})) + (defn pending-txs [repo & {:keys [limit]}] (when-let [conn (client-ops-conn repo)] @@ -429,21 +443,16 @@ local-tx (or (client-op/get-local-tx repo) 0) remote-tx (get @*repo->latest-remote-tx repo) conn (worker-state/get-datascript-conn repo)] - (when (and conn (= local-tx remote-tx)) ; rebase + (when (and conn (= local-tx remote-tx)) ; rebase (when (empty? inflight) (when-let [ws (:ws client)] (when (and (ws-open? ws) (worker-state/online?)) (let [batch (pending-txs repo {:limit 50})] (when (seq batch) - (let [tx-entries (->> batch - (mapv (fn [{:keys [tx-id tx outliner-op]}] - {:tx-id tx-id - :outliner-op outliner-op - :tx-data (vec tx)})) - (filterv (comp seq :tx-data))) - tx-ids (mapv :tx-id batch)] - (if (empty? tx-entries) - (remove-pending-txs! repo tx-ids) + (let [{:keys [tx-entries drop-tx-ids]} (prepare-upload-tx-entries conn batch)] + (when (seq drop-tx-ids) + (remove-pending-txs! repo drop-tx-ids)) + (when (seq tx-entries) (-> (p/let [aes-key (when (sync-crypt/graph-e2ee? repo) (sync-crypt/ {:tx (sqlite-util/write-transit-str tx-data)} outliner-op (assoc :outliner-op outliner-op))) - tx-entries*)] + tx-entries*) + tx-ids (mapv :tx-id tx-entries)] (reset! (:inflight client) tx-ids) (send! ws {:type "tx/batch" :t-before local-tx @@ -654,20 +664,17 @@ fallback-target (:fallback-target opts') fallback-target' (or (replay-entity-id-value @conn fallback-target) fallback-target) - target-block (d/entity @conn target-id') use-fallback? (and sibling? - (nil? target-block) + (nil? target) (some? fallback-target)) target-block' (if use-fallback? (d/entity @conn fallback-target') - target-block) + target) move-opts (cond-> (-> opts' (dissoc :fallback-target) (assoc :persist-op? false)) use-fallback? (assoc :sibling? false))] - (when-not target-block' - (invalid-rebase-op! op {:args args})) (outliner-core/move-blocks! conn blocks target-block' move-opts)))) :move-blocks-up-down @@ -826,7 +833,10 @@ (try (ldb/batch-transact-with-temp-conn! conn - {:outliner-op :rebase} + {:outliner-op :rebase + ;; Keep stable tx-id across rebases so one logical pending op + ;; doesn't fan out into duplicated pending rows. + :db-sync/tx-id (:tx-id local-tx)} (fn [conn] (if (= [[:transact nil]] outliner-ops) (when-let [tx-data (seq (:tx local-tx))] @@ -877,7 +887,16 @@ (fix-tx! conn tx-report {:outliner-op :fix})) - (remove-pending-txs! repo (map :tx-id local-txs)) + ;; Successful rebases persist with the original tx-id. + ;; Only drop entries that failed to rebase or became empty no-op txs. + (let [rebased-tx-ids (->> @*rebase-tx-reports + (keep (comp :db-sync/tx-id :tx-meta)) + set) + stale-tx-ids (->> local-txs + (map :tx-id) + (remove rebased-tx-ids) + vec)] + (remove-pending-txs! repo stale-tx-ids)) (catch :default e (js/console.error e) diff --git a/src/test/frontend/worker/db_sync_sim_test.cljs b/src/test/frontend/worker/db_sync_sim_test.cljs index c7ac72cb4d..8faf816656 100644 --- a/src/test/frontend/worker/db_sync_sim_test.cljs +++ b/src/test/frontend/worker/db_sync_sim_test.cljs @@ -248,7 +248,35 @@ (reduce (fn [{:keys [t txs conn] :as state} tx-entry] (let [tx-data (:tx-data tx-entry) - {:keys [db-before db-after tx-data]} (ldb/transact! conn tx-data {:op :apply-client-tx}) + {:keys [db-before db-after tx-data]} + (try + (ldb/transact! conn tx-data {:op :apply-client-tx}) + (catch :default error + (let [missing-entity-id (some-> (ex-data error) :entity-id) + same-entity-txs + (when missing-entity-id + (->> tx-entries + (keep-indexed + (fn [idx entry] + (let [entry-tx (:tx-data entry) + touches? (some #(or (= missing-entity-id (second %)) + (= missing-entity-id (nth % 3 nil))) + entry-tx)] + (when touches? + {:idx idx + :tx-id (:tx-id entry) + :outliner-op (:outliner-op entry) + :tx-data entry-tx})))) + vec))] + (throw (ex-info "server upload transact failed" + {:type :db-sync-sim/server-upload-transact-failed + :t-before t-before + :server-t t + :missing-entity-id missing-entity-id + :matching-entries same-entity-txs + :tx-entry tx-entry + :tx-data tx-data} + error))))) normalized-data (->> tx-data (db-normalize/normalize-tx-data db-after db-before)) next-t (inc t)] @@ -258,15 +286,8 @@ {:accepted? @accepted? :t (:t @server)})) -(defn- build-upload-entries [conn pending] - (->> pending - (mapv (fn [{:keys [tx] :as pending-entry}] - (assoc pending-entry - :tx-data (->> tx - (db-normalize/remove-retract-entity-ref @conn) - distinct - vec)))) - (filterv (comp seq :tx-data)))) +(defn- build-upload-plan [conn pending] + (#'sync-apply/prepare-upload-tx-entries conn pending)) (defn- sync-client! [server {:keys [repo conn client online?]}] (when online? @@ -288,21 +309,20 @@ local-tx' (or (client-op/get-local-tx repo) 0) server-t' (:t @server)] (when (and (seq pending) (= local-tx' server-t')) - (let [tx-entries (build-upload-entries conn pending) - tx-ids (mapv :tx-id pending)] + (let [{:keys [tx-entries drop-tx-ids]} (build-upload-plan conn pending)] + (when (seq drop-tx-ids) + (#'sync-apply/remove-pending-txs! repo drop-tx-ids) + (reset! progress? true)) ;; (prn :debug :upload :repo repo :tx-entries tx-entries) (if (seq tx-entries) - (let [{:keys [accepted? t]} (server-upload! server local-tx' tx-entries)] + (let [{:keys [accepted? t]} (server-upload! server local-tx' tx-entries) + tx-ids (mapv :tx-id tx-entries)] (when accepted? (#'sync-apply/remove-pending-txs! repo tx-ids) (when (seq tx-ids) (client-op/update-local-tx repo t) (reset! progress? true)))) - (do - (#'sync-apply/remove-pending-txs! repo tx-ids) - (when (seq tx-ids) - (client-op/update-local-tx repo (:t @server)) - (reset! progress? true))))))) + nil)))) @progress?))) (defn- active-block-uuids