From 33098113ee2586a3d4d8492755ef7d70abef1820 Mon Sep 17 00:00:00 2001 From: Tienson Qin Date: Mon, 16 Mar 2026 18:53:00 +0800 Subject: [PATCH] enhance: returns rejected tx --- .../logseq/db_sync/worker/handler/sync.cljs | 30 ++++++++++++++----- .../db_sync/worker_handler_sync_test.cljs | 24 +++++++++++++++ docs/agent-guide/db-sync/protocol.md | 2 ++ src/main/frontend/worker/sync.cljs | 15 ++++++++-- src/test/frontend/worker/db_sync_test.cljs | 24 +++++++++++++++ 5 files changed, 86 insertions(+), 9 deletions(-) diff --git a/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs b/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs index 2a14df443d..621c7b30a0 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs @@ -321,16 +321,32 @@ (ldb/transact! conn tx-data (cond-> {:op :apply-client-tx} outliner-op (assoc :outliner-op outliner-op)))))) +(defn- db-transact-failed-response + [sql tx-entry] + {:type "tx/reject" + :reason "db transact failed" + :t (storage/get-t sql) + :data (common/write-transit tx-entry)}) + (defn- apply-tx! [^js self sender tx-entries] (let [sql (.-sql self)] (ensure-conn! self) (let [conn (.-conn self)] - (doseq [tx-entry tx-entries] - (apply-tx-entry! conn tx-entry)) - (let [new-t (storage/get-t sql)] - ;; FIXME: no need to broadcast if client tx is less than remote tx - (ws/broadcast! self sender {:type "changed" :t new-t}) - new-t)))) + (loop [remaining tx-entries] + (if-let [tx-entry (first remaining)] + (let [result (try + (apply-tx-entry! conn tx-entry) + ::ok + (catch :default e + (log/error :db-sync/transact-failed e) + (db-transact-failed-response sql tx-entry)))] + (if (= ::ok result) + (recur (next remaining)) + result)) + (let [new-t (storage/get-t sql)] + ;; FIXME: no need to broadcast if client tx is less than remote tx + (ws/broadcast! self sender {:type "changed" :t new-t}) + new-t)))))) (defn handle-tx-batch! [^js self sender txs t-before] (let [current-t (t-now self)] @@ -361,7 +377,7 @@ (log/error :db-sync/transact-failed e) {:type "tx/reject" :reason "db transact failed" - :t current-t})) + :t (t-now self)})) {:type "tx/reject" :reason "empty tx data"})))) diff --git a/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs b/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs index 682118d59a..8840abd24a 100644 --- a/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs +++ b/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs @@ -188,6 +188,30 @@ (is (= "tx/reject" (:type response))) (is (= "snapshot upload in progress" (:reason response))))) +(deftest tx-batch-rejects-with-the-exact-failed-tx-entry-test + (testing "db transact failure replies with the specific rejected tx entry" + (let [sql (test-sql/make-sql) + conn (d/create-conn db-schema/schema) + self #js {:sql sql + :conn conn + :schema-ready true} + tx-entry-1 {:tx (protocol/tx->transit [[:db/add -1 :block/title "ok"]]) + :outliner-op :save-block} + tx-entry-2 {:tx (protocol/tx->transit [[:db/add -2 :block/title "bad"]]) + :outliner-op :save-block} + apply-calls (atom 0) + response (with-redefs [ws/broadcast! (fn [& _] nil) + sync-handler/apply-tx-entry! (fn [_conn tx-entry] + (swap! apply-calls inc) + (when (= 2 @apply-calls) + (throw (ex-info "DB write failed with invalid data" + {:tx-entry tx-entry}))))] + (sync-handler/handle-tx-batch! self nil [tx-entry-1 tx-entry-2] 0))] + (is (= "tx/reject" (:type response))) + (is (= "db transact failed" (:reason response))) + (is (= 0 (:t response))) + (is (= tx-entry-2 (common/read-transit (:data response))))))) + (deftest sync-pull-is-blocked-when-graph-is-not-ready-for-use-test (async done (let [self #js {:env #js {"DB" :db} diff --git a/docs/agent-guide/db-sync/protocol.md b/docs/agent-guide/db-sync/protocol.md index ba8b866544..c98c05f0b2 100644 --- a/docs/agent-guide/db-sync/protocol.md +++ b/docs/agent-guide/db-sync/protocol.md @@ -34,6 +34,8 @@ - Client tx is based on stale t. - `{"type":"tx/reject","reason":"cycle","data":" :server-values ...}>"}` - Cycle detected with server values. +- `{"type":"tx/reject","reason":"db transact failed","t":,"data":"\" :outliner-op ...}>"}` + - Server-side transact/validation failed for one tx entry in the batch; `data` echoes the rejected tx entry for debugging. - `{"type":"tx/reject","reason":"empty tx data"|"invalid tx"|"invalid t-before"}` - Invalid batch. - `{"type":"pong"}` diff --git a/src/main/frontend/worker/sync.cljs b/src/main/frontend/worker/sync.cljs index b7f08e59e7..2a86118fd4 100644 --- a/src/main/frontend/worker/sync.cljs +++ b/src/main/frontend/worker/sync.cljs @@ -1823,8 +1823,19 @@ "stale" (send! (:ws client) {:type "pull" :since local-tx}) - (fail-fast :db-sync/invalid-field - {:repo repo :type "tx/reject" :reason reason}))) + (let [data (when-let [raw-data (:data message)] + (parse-transit raw-data + {:repo repo + :type "tx/reject" + :reason reason + :field :data}))] + (fail-fast :db-sync/tx-rejected + (cond-> {:type :db-sync/tx-rejected + :repo repo + :message-type "tx/reject" + :reason reason} + (contains? message :t) (assoc :t remote-tx) + (some? data) (assoc :data data)))))) (fail-fast :db-sync/invalid-field {:repo repo :type (:type message)}))))) diff --git a/src/test/frontend/worker/db_sync_test.cljs b/src/test/frontend/worker/db_sync_test.cljs index 1d6aedc58b..fc8540d55f 100644 --- a/src/test/frontend/worker/db_sync_test.cljs +++ b/src/test/frontend/worker/db_sync_test.cljs @@ -256,6 +256,30 @@ (reset! db-sync/*repo->latest-remote-tx latest-prev) (done)))))))))) +(deftest tx-reject-db-transact-failed-surfaces-rejected-tx-test + (testing "tx/reject with db transact failed includes parsed rejected tx for debugging" + (let [rejected-tx {:tx (sqlite-util/write-transit-str [[:db/add [:block/uuid (random-uuid)] :block/title "bad"]]) + :outliner-op :save-block} + raw-message (js/JSON.stringify + (clj->js {:type "tx/reject" + :reason "db transact failed" + :t 3 + :data (sqlite-util/write-transit-str rejected-tx)})) + client {:repo test-repo + :graph-id "graph-1" + :inflight (atom []) + :online-users (atom []) + :ws-state (atom :open)}] + (with-redefs [client-op/get-local-tx (constantly 0)] + (try + (#'db-sync/handle-message! test-repo client raw-message) + (is false "expected tx/reject to fail-fast with rejected tx details") + (catch :default error + (let [data (ex-data error)] + (is (= :db-sync/tx-rejected (:type data))) + (is (= "db transact failed" (:reason data))) + (is (= rejected-tx (:data data)))))))))) + (deftest pull-ok-batched-txs-preserve-tempid-boundaries-test (testing "pull/ok applies tx batches without cross-tx tempid collisions" (async done