fix(rtc): batch store and validate db (#12249)

* fix(rtc): batch store and validate db

* fix: logseq.db/transact! shouldn't distinct tx-data

since a move-op includes 2 steps:
1. insert-block
2. update-attrs

This leaves the db invalid after step 1 alone.

* refactor: add transact-with-temp-conn! (see the usage sketch after this list)

* bump nbb-logseq and add tests for ldb/transact* fns
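
For context, here is a minimal usage sketch of the new logseq.db/transact-with-temp-conn! (its signature and tx-meta flags are taken from the diff below). It mirrors the two-step move-op described above: both steps run against a temporary conn, and validation plus the store happen only once, when the accumulated tx-data is transacted into the real conn. The `conn` value, the block uuid, and the :block/title value are illustrative placeholders; the real RTC code drives these steps through its outliner transact-db! helpers rather than raw ldb/transact! calls.

;; Sketch only; assumes (:require [logseq.db :as ldb]) and an existing DB-graph `conn`.
(let [block-uuid (random-uuid)]
  (ldb/transact-with-temp-conn!
   conn
   {:rtc-tx? true :persist-op? false :gen-undo-ops? false}
   (fn [temp-conn]
     ;; step 1: insert-block; the block may still be missing attributes here,
     ;; so validating the db after this step alone could fail
     (ldb/transact! temp-conn [{:block/uuid block-uuid}])
     ;; step 2: update-attrs
     (ldb/transact! temp-conn [{:block/uuid block-uuid
                                :block/title "moved block"}]))))
;; transact-with-temp-conn! listens for tx-data on the temp conn (which is flagged
;; with :skip-store? and :batch-temp-conn?), then calls transact! on the real conn
;; once, so the db is validated and stored per batch instead of per step.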

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: rcmerci <rcmerci@gmail.com>
Author: Tienson Qin
Date: 2025-12-09 20:47:37 +08:00
Committed by: GitHub
Parent: 61d28c307e
Commit: f594e2034f
22 changed files with 138 additions and 57 deletions

View File

@@ -160,7 +160,7 @@ jobs:
bb: ${{ env.BABASHKA_VERSION }}
- name: Fetch scripts yarn deps
-run: cd scripts && yarn install --frozen-lockfile
+run: cd scripts && yarn install
- name: Create DB graph with properties
run: cd scripts && yarn nbb-logseq src/logseq/tasks/db_graph/create_graph_with_properties.cljs ./properties-graph

View File

@@ -1,6 +1,6 @@
(ns logseq.e2e.rtc-extra-test
(:require
-[clojure.test :refer [deftest testing is use-fixtures run-test run-tests]]
+[clojure.test :refer [deftest testing is use-fixtures run-test]]
[com.climate.claypoole :as cp]
[logseq.e2e.assert :as assert]
[logseq.e2e.block :as b]

View File

@@ -5,7 +5,7 @@
:sha "5d672bf84ed944414b9f61eeb83808ead7be9127"}
datascript/datascript {:git/url "https://github.com/logseq/datascript" ;; fork
:sha "3971e2d43bd93d89f42191dc7b4b092989e0cc61"}
:sha "36fd248dcdb2b2d427ed342c53a3ddb9cb540b33"}
;; datascript/datascript {:local/root "../../datascript"}
datascript-transit/datascript-transit {:mvn/version "0.3.0"}

View File

@@ -10,7 +10,7 @@
},
"license": "MIT",
"dependencies": {
"@logseq/nbb-logseq": "github:logseq/nbb-logseq#feat-db-v29",
"@logseq/nbb-logseq": "github:logseq/nbb-logseq#feat-db-v30",
"@modelcontextprotocol/sdk": "^1.17.5",
"better-sqlite3": "~11.10.0",
"fastify": "5.3.2",

deps/cli/yarn.lock vendored
View File

@@ -43,9 +43,9 @@
"@fastify/forwarded" "^3.0.0"
ipaddr.js "^2.1.0"
"@logseq/nbb-logseq@github:logseq/nbb-logseq#feat-db-v29":
version "1.2.173-feat-db-v29"
resolved "https://codeload.github.com/logseq/nbb-logseq/tar.gz/6c4f8eec72a0a5b0c7b96c32cc73f86b045305c5"
"@logseq/nbb-logseq@github:logseq/nbb-logseq#feat-db-v30":
version "1.2.173-feat-db-v30"
resolved "https://codeload.github.com/logseq/nbb-logseq/tar.gz/1ad8d923adfe1a3423337127b987d353910306ab"
dependencies:
import-meta-resolve "^4.1.0"

View File

@@ -3,7 +3,7 @@
"version": "1.0.0",
"private": true,
"devDependencies": {
"@logseq/nbb-logseq": "github:logseq/nbb-logseq#feat-db-v29"
"@logseq/nbb-logseq": "github:logseq/nbb-logseq#feat-db-v30"
},
"scripts": {
"test": "yarn nbb-logseq -cp test -m nextjournal.test-runner"

View File

@@ -2,9 +2,9 @@
# yarn lockfile v1
"@logseq/nbb-logseq@github:logseq/nbb-logseq#feat-db-v29":
version "1.2.173-feat-db-v29"
resolved "https://codeload.github.com/logseq/nbb-logseq/tar.gz/6c4f8eec72a0a5b0c7b96c32cc73f86b045305c5"
"@logseq/nbb-logseq@github:logseq/nbb-logseq#feat-db-v30":
version "1.2.173-feat-db-v30"
resolved "https://codeload.github.com/logseq/nbb-logseq/tar.gz/1ad8d923adfe1a3423337127b987d353910306ab"
dependencies:
import-meta-resolve "^4.1.0"

deps/db/deps.edn vendored
View File

@@ -1,7 +1,7 @@
{:deps
;; These nbb-logseq deps are kept in sync with https://github.com/logseq/nbb-logseq/blob/main/bb.edn
{datascript/datascript {:git/url "https://github.com/logseq/datascript" ;; fork
:sha "3971e2d43bd93d89f42191dc7b4b092989e0cc61"}
:sha "36fd248dcdb2b2d427ed342c53a3ddb9cb540b33"}
;; datascript/datascript {:local/root "../../../../datascript"}
datascript-transit/datascript-transit {:mvn/version "0.3.0"
:exclusions [datascript/datascript]}

View File

@@ -3,7 +3,7 @@
"version": "1.0.0",
"private": true,
"devDependencies": {
"@logseq/nbb-logseq": "github:logseq/nbb-logseq#feat-db-v29"
"@logseq/nbb-logseq": "github:logseq/nbb-logseq#feat-db-v30"
},
"dependencies": {
"better-sqlite3": "11.10.0"

View File

@@ -103,17 +103,18 @@
{:tx-data tx-data}))))
(defn- transact-sync
-[repo-or-conn tx-data tx-meta]
+[conn tx-data tx-meta]
(try
-(let [conn repo-or-conn
-db @conn
+(let [db @conn
db-based? (entity-plus/db-based-graph? db)]
(if (and db-based?
-(not (:reset-conn! tx-meta))
-(not (:initial-db? tx-meta))
-(not (:rtc-download-graph? tx-meta))
-(not (:skip-validate-db? tx-meta false))
-(not (:logseq.graph-parser.exporter/new-graph? tx-meta)))
+(not
+(or (:batch-temp-conn? @conn)
+(:rtc-download-graph? tx-meta)
+(:reset-conn! tx-meta)
+(:initial-db? tx-meta)
+(:skip-validate-db? tx-meta false)
+(:logseq.graph-parser.exporter/new-graph? tx-meta))))
(let [tx-report* (d/with db tx-data tx-meta)
pipeline-f @*transact-pipeline-fn
tx-report (if-let [f pipeline-f] (f tx-report*) tx-report*)
@@ -132,7 +133,8 @@
;; notify ui
(when-let [f @*transact-invalid-callback]
(f tx-report errors))
-(throw (ex-info "DB write failed with invalid data" {:tx-data tx-data
+(throw (ex-info "DB write failed with invalid data" {:tx-meta tx-meta
+:tx-data tx-data
:errors errors
:pipeline-tx-data (map
(fn [[e a v t]]
@@ -168,7 +170,7 @@
(empty? m)))))
delete-blocks-tx (when-not (string? repo-or-conn)
(delete-blocks/update-refs-history-and-macros @repo-or-conn tx-data tx-meta))
-tx-data (distinct (concat tx-data delete-blocks-tx))]
+tx-data (concat tx-data delete-blocks-tx)]
;; Ensure worker can handle the request sequentially (one by one)
;; Because UI assumes that the in-memory db has all the data except the last one transaction
@@ -182,6 +184,28 @@
(transact-fn repo-or-conn tx-data tx-meta)
(transact-sync repo-or-conn tx-data tx-meta))))))
+(defn transact-with-temp-conn!
+"Validate db and store once for a batch transaction, the `temp` conn can still load data from disk,
+however it can't write to the disk."
+[conn tx-meta batch-tx-fn]
+(let [temp-conn (d/conn-from-db @conn)
+*batch-tx-data (volatile! [])]
+;; can read from disk, write is disallowed
+(swap! temp-conn assoc
+:skip-store? true
+:batch-temp-conn? true)
+(d/listen! temp-conn ::temp-conn-batch-tx
+(fn [{:keys [tx-data]}]
+(vswap! *batch-tx-data into tx-data)))
+(batch-tx-fn temp-conn)
+(let [tx-data @*batch-tx-data]
+(d/unlisten! temp-conn ::temp-conn-batch-tx)
+(reset! temp-conn nil)
+(vreset! *batch-tx-data nil)
+(when (seq tx-data)
+;; transact tx-data to `conn` and validate db
+(transact! conn tx-data tx-meta)))))
(def page? common-entity-util/page?)
(def internal-page? entity-util/internal-page?)
(def class? entity-util/class?)

deps/db/yarn.lock vendored
View File

@@ -2,9 +2,9 @@
# yarn lockfile v1
"@logseq/nbb-logseq@github:logseq/nbb-logseq#feat-db-v29":
version "1.2.173-feat-db-v29"
resolved "https://codeload.github.com/logseq/nbb-logseq/tar.gz/6c4f8eec72a0a5b0c7b96c32cc73f86b045305c5"
"@logseq/nbb-logseq@github:logseq/nbb-logseq#feat-db-v30":
version "1.2.173-feat-db-v30"
resolved "https://codeload.github.com/logseq/nbb-logseq/tar.gz/1ad8d923adfe1a3423337127b987d353910306ab"
dependencies:
import-meta-resolve "^4.1.0"

View File

@@ -3,7 +3,7 @@
"version": "1.0.0",
"private": true,
"devDependencies": {
"@logseq/nbb-logseq": "github:logseq/nbb-logseq#feat-db-v29",
"@logseq/nbb-logseq": "github:logseq/nbb-logseq#feat-db-v30",
"better-sqlite3": "11.10.0"
},
"dependencies": {

View File

@@ -2,9 +2,9 @@
# yarn lockfile v1
"@logseq/nbb-logseq@github:logseq/nbb-logseq#feat-db-v29":
version "1.2.173-feat-db-v29"
resolved "https://codeload.github.com/logseq/nbb-logseq/tar.gz/6c4f8eec72a0a5b0c7b96c32cc73f86b045305c5"
"@logseq/nbb-logseq@github:logseq/nbb-logseq#feat-db-v30":
version "1.2.173-feat-db-v30"
resolved "https://codeload.github.com/logseq/nbb-logseq/tar.gz/1ad8d923adfe1a3423337127b987d353910306ab"
dependencies:
import-meta-resolve "^4.1.0"

View File

@@ -1,7 +1,7 @@
{:deps
;; These nbb-logseq deps are kept in sync with https://github.com/logseq/nbb-logseq/blob/main/bb.edn
{datascript/datascript {:git/url "https://github.com/logseq/datascript" ;; fork
:sha "3971e2d43bd93d89f42191dc7b4b092989e0cc61"}
:sha "36fd248dcdb2b2d427ed342c53a3ddb9cb540b33"}
;; datascript/datascript {:local/root "../../../../datascript"}
com.cognitect/transit-cljs {:mvn/version "0.8.280"}

View File

@@ -3,7 +3,7 @@
"version": "1.0.0",
"private": true,
"devDependencies": {
"@logseq/nbb-logseq": "github:logseq/nbb-logseq#feat-db-v29"
"@logseq/nbb-logseq": "github:logseq/nbb-logseq#feat-db-v30"
},
"dependencies": {
"better-sqlite3": "11.10.0",

View File

@@ -2,9 +2,9 @@
# yarn lockfile v1
"@logseq/nbb-logseq@github:logseq/nbb-logseq#feat-db-v29":
version "1.2.173-feat-db-v29"
resolved "https://codeload.github.com/logseq/nbb-logseq/tar.gz/6c4f8eec72a0a5b0c7b96c32cc73f86b045305c5"
"@logseq/nbb-logseq@github:logseq/nbb-logseq#feat-db-v30":
version "1.2.173-feat-db-v30"
resolved "https://codeload.github.com/logseq/nbb-logseq/tar.gz/1ad8d923adfe1a3423337127b987d353910306ab"
dependencies:
import-meta-resolve "^4.1.0"

View File

@@ -3,7 +3,7 @@
"version": "1.0.0",
"private": true,
"devDependencies": {
"@logseq/nbb-logseq": "github:logseq/nbb-logseq#feat-db-v29",
"@logseq/nbb-logseq": "github:logseq/nbb-logseq#feat-db-v30",
"mldoc": "^1.5.9"
},
"dependencies": {

View File

@@ -2,9 +2,9 @@
# yarn lockfile v1
"@logseq/nbb-logseq@github:logseq/nbb-logseq#feat-db-v29":
version "1.2.173-feat-db-v29"
resolved "https://codeload.github.com/logseq/nbb-logseq/tar.gz/6c4f8eec72a0a5b0c7b96c32cc73f86b045305c5"
"@logseq/nbb-logseq@github:logseq/nbb-logseq#feat-db-v30":
version "1.2.173-feat-db-v30"
resolved "https://codeload.github.com/logseq/nbb-logseq/tar.gz/1ad8d923adfe1a3423337127b987d353910306ab"
dependencies:
import-meta-resolve "^4.1.0"

View File

@@ -3,7 +3,7 @@
"version": "1.0.0",
"private": true,
"devDependencies": {
"@logseq/nbb-logseq": "github:logseq/nbb-logseq#feat-db-v29"
"@logseq/nbb-logseq": "github:logseq/nbb-logseq#feat-db-v30"
},
"dependencies": {
"better-sqlite3": "11.10.0",

View File

@@ -21,7 +21,6 @@
[logseq.db.common.property-util :as db-property-util]
[logseq.db.frontend.property :as db-property]
[logseq.graph-parser.whiteboard :as gp-whiteboard]
-[logseq.outliner.batch-tx :as batch-tx]
[logseq.outliner.core :as outliner-core]
[logseq.outliner.transaction :as outliner-tx]
[missionary.core :as m]))
@@ -192,9 +191,11 @@
(when-let [target-b
(d/entity @conn (:db/id (:block/page (d/entity @conn [:block/uuid block-uuid]))))]
(transact-db! :move-blocks&persist-op repo conn [b] target-b {:sibling? false}))))
-(doseq [block-uuid block-uuids-to-remove]
-(when-let [b (d/entity @conn [:block/uuid block-uuid])]
-(transact-db! :delete-blocks repo conn date-formatter [b] {}))))))
+(let [deleting-blocks (keep (fn [block-uuid]
+(d/entity @conn [:block/uuid block-uuid]))
+block-uuids-to-remove)]
+(when (seq deleting-blocks)
+(transact-db! :delete-blocks repo conn date-formatter deleting-blocks {}))))))
(defn- insert-or-move-block
[repo conn block-uuid remote-parents remote-block-order move? op-value]
@@ -209,8 +210,7 @@
(if move?
(transact-db! :move-blocks repo conn [(block-reuse-db-id b)] local-parent {:sibling? false})
(transact-db! :insert-blocks repo conn
-[{:block/uuid block-uuid
-:block/title ""}]
+[{:block/uuid block-uuid}]
local-parent {:sibling? false :keep-uuid? true}))
(transact-db! :update-block-order-directly repo conn block-uuid first-remote-parent remote-block-order))
@@ -681,20 +681,25 @@
update-ops (vals update-ops-map)
update-page-ops (vals update-page-ops-map)
remove-page-ops (vals remove-page-ops-map)
-db-before @conn]
+db-before @conn
+tx-meta {:rtc-tx? true
+:persist-op? false
+:gen-undo-ops? false}]
(rtc-log-and-state/update-remote-t graph-uuid remote-t)
(js/console.groupCollapsed "rtc/apply-remote-ops-log")
-(batch-tx/with-batch-tx-mode conn {:rtc-tx? true
-:persist-op? false
-:gen-undo-ops? false}
-(worker-util/profile :ensure-refed-blocks-exist (ensure-refed-blocks-exist repo conn refed-blocks))
-(worker-util/profile :apply-remote-update-page-ops (apply-remote-update-page-ops repo conn update-page-ops))
-(worker-util/profile :apply-remote-move-ops (apply-remote-move-ops repo conn sorted-move-ops))
-(worker-util/profile :apply-remote-update-ops (apply-remote-update-ops repo conn update-ops))
-(worker-util/profile :apply-remote-remove-page-ops (apply-remote-remove-page-ops repo conn remove-page-ops)))
+(ldb/transact-with-temp-conn!
+conn tx-meta
+(fn [temp-conn]
+(worker-util/profile :ensure-refed-blocks-exist (ensure-refed-blocks-exist repo temp-conn refed-blocks))
+(worker-util/profile :apply-remote-update-page-ops (apply-remote-update-page-ops repo temp-conn update-page-ops))
+(worker-util/profile :apply-remote-move-ops (apply-remote-move-ops repo temp-conn sorted-move-ops))
+(worker-util/profile :apply-remote-update-ops (apply-remote-update-ops repo temp-conn update-ops))
+(worker-util/profile :apply-remote-remove-page-ops (apply-remote-remove-page-ops repo temp-conn remove-page-ops))))
;; NOTE: we cannot set :persist-op? = true when batch-tx/with-batch-tx-mode (already set to false)
;; and there're some transactions in `apply-remote-remove-ops` need to :persist-op?=true
(worker-util/profile :apply-remote-remove-ops (apply-remote-remove-ops repo conn date-formatter remove-ops))
;; wait all remote-ops transacted into db,
;; then start to check any asset-updates in remote
(let [db-after @conn]

View File

@@ -156,6 +156,14 @@
:logseq.property/status [status-value-uuid2]}}
r)))))
+(defn- apply-move-ops!
+[repo conn move-ops]
+(ldb/transact-with-temp-conn!
+conn
+{}
+(fn [temp-conn]
+(#'r.remote/apply-remote-move-ops repo temp-conn move-ops))))
(deftest apply-remote-move-ops-test
(let [repo (state/get-current-repo)
conn (conn/get-db repo false)
@@ -182,6 +190,7 @@
:block/parent [:block/uuid page-uuid]}]
(ldb/get-page @conn page-name)
{:sibling? false :keep-uuid? true}))
(testing "apply-remote-move-ops-test1"
(let [data-from-ws {:req-id "req-id"
:t 1 ;; not used
@@ -199,7 +208,7 @@
(#'r.remote/affected-blocks->diff-type-ops
repo (:affected-blocks data-from-ws))))]
(is (rtc-schema/data-from-ws-validator data-from-ws) data-from-ws)
-(#'r.remote/apply-remote-move-ops repo conn move-ops)
+(apply-move-ops! repo conn move-ops)
(let [page-blocks (ldb/get-page-blocks @conn (:db/id (ldb/get-page @conn page-name)) {})]
(is (= #{uuid1-remote uuid1-client uuid2-client} (set (map :block/uuid page-blocks)))
[uuid1-remote uuid1-client uuid2-client])
@@ -229,7 +238,7 @@
(#'r.remote/affected-blocks->diff-type-ops
repo (:affected-blocks data-from-ws))))]
(is (rtc-schema/data-from-ws-validator data-from-ws))
-(#'r.remote/apply-remote-move-ops repo conn move-ops)
+(apply-move-ops! repo conn move-ops)
(let [page-blocks (ldb/get-page-blocks @conn (:db/id (ldb/get-page @conn page-name)) {})]
(is (= #{uuid1-remote uuid2-remote uuid1-client uuid2-client} (set (map :block/uuid page-blocks))))
(is (= ["a0" "a1"]

View File

@@ -0,0 +1,43 @@
(ns logseq.db-test
(:require [cljs.test :refer [deftest is testing] :as t]
[datascript.core :as d]
[frontend.db.conn :as conn]
[frontend.test.helper :as test-helper]
[logseq.db :as ldb]))
;; TODO: move tests to deps/db
(t/use-fixtures :each
test-helper/db-based-start-and-destroy-db-map-fixture)
(deftest test-transact-with-multiple-tx-datoms
(testing "last write wins with same tx"
(let [conn (d/create-conn)]
(d/transact! conn [[:db/add -1 :property :v1]])
(let [tx (:max-tx @conn)]
(ldb/transact! conn
[(d/datom 1 :property :v1 (inc tx) false)
(d/datom 1 :property :v1 (inc tx) true)]))
(is (= :v1 (:property (d/entity @conn 1))))))
(testing "last write wins with different tx"
(let [conn (d/create-conn)]
(d/transact! conn [[:db/add -1 :property :v1]])
(let [tx (:max-tx @conn)]
(ldb/transact! conn
[(d/datom 1 :property :v1 (inc tx) false)
(d/datom 1 :property :v1 (+ tx 2) true)]))
(is (= :v1 (:property (d/entity @conn 1)))))))
(deftest test-transact-with-temp-conn!
(testing "DB validation should be running after the whole transaction"
(let [conn (conn/get-db false)]
(testing "#Task shouldn't be converted to property"
(is (thrown? js/Error (ldb/transact! conn [{:db/ident :logseq.class/Task
:block/tags :logseq.class/Property}]))))
(ldb/transact-with-temp-conn!
conn
{}
(fn [temp-conn]
(ldb/transact! temp-conn [{:db/ident :logseq.class/Task
:block/tags :logseq.class/Property}])
(ldb/transact! temp-conn [[:db/retract :logseq.class/Task :block/tags :logseq.class/Property]]))))))