refactor(sync): move client ops to sqlite and harden history ops

This commit is contained in:
Tienson Qin
2026-04-10 05:44:13 +08:00
parent bbe75823c0
commit bf04d4cf5d
14 changed files with 986 additions and 583 deletions

View File

@@ -114,12 +114,11 @@
(defn- <create-cards-block!
[]
(let [cards-tag-id (:db/id (db/entity :logseq.class/Cards))]
(editor-handler/api-insert-new-block! ""
{:page (date/today)
:properties {:block/tags #{cards-tag-id}}
:sibling? false
:end? true})))
(editor-handler/api-insert-new-block! ""
{:page (date/today)
:properties {:block/tags #{:logseq.class/Cards}}
:sibling? false
:end? true}))
(defn- btn-with-shortcut [{:keys [shortcut id btn-text due on-click class]}]
(let [bg-class (case id

View File

@@ -77,7 +77,7 @@
(:db/id color))) colors)]
(when color-id
(let [properties (cond->
{:block/tags #{(:db/id (db/entity :logseq.class/Pdf-annotation))}
{:block/tags #{:logseq.class/Pdf-annotation}
:block/collapsed? image?
:logseq.property/ls-type :annotation
:logseq.property.pdf/hl-color color-id

View File

@@ -1347,8 +1347,7 @@
(notification/show! [:div "Asset size shouldn't be larger than 100M"]
:warning
false)
(throw (ex-info "Asset size shouldn't be larger than 100M" {:file-name file-name})))
asset (db/entity :logseq.class/Asset)]
(throw (ex-info "Asset size shouldn't be larger than 100M" {:file-name file-name})))]
(p/do!
(when file
(let [file-path (str block-id "." ext)]
@@ -1359,7 +1358,9 @@
:logseq.property.asset/external-url external-url
:logseq.property.asset/size size
:logseq.property.asset/checksum checksum
:block/tags #{(:db/id asset)}})))))
;; Use stable class ident in tx payload to avoid leaking numeric eids
;; into outliner history ops shared with the worker sync pipeline.
:block/tags #{:logseq.class/Asset}})))))
(defn db-based-save-assets!
"Save incoming(pasted) assets to assets directory.

View File

@@ -343,9 +343,7 @@
[repo {:keys [config datoms sync-download-graph?] :as opts}]
(when-not (worker-state/get-sqlite-conn repo)
(p/let [[db search-db client-ops-db :as dbs] (get-dbs repo)
storage (new-sqlite-storage db)
client-ops-storage (when-not @*publishing?
(new-sqlite-storage client-ops-db))]
storage (new-sqlite-storage db)]
(swap! *sqlite-conns assoc repo {:db db
:search search-db
:client-ops client-ops-db})
@@ -378,16 +376,13 @@
(partition-all batch-size))]
(doseq [batch non-ident-batches]
(d/transact! conn batch {:initial-db? true}))))
client-ops-conn (when-not @*publishing? (common-sqlite/get-storage-conn
client-ops-storage
client-op/schema-in-db))
initial-data-exists? (when (nil? datoms)
(and (d/entity @conn :logseq.class/Root)
(= "db" (:kv/value (d/entity @conn :logseq.kv/db-type)))))]
(swap! *datascript-conns assoc repo conn)
(swap! *client-ops-conns assoc repo client-ops-conn)
(when (and (not @*publishing?) (not= client-op/schema-in-db (d/schema @client-ops-conn)))
(d/reset-schema! client-ops-conn client-op/schema-in-db))
(swap! *client-ops-conns assoc repo client-ops-db)
(when-not @*publishing?
(client-op/ensure-sqlite-schema! client-ops-db))
(ensure-client-ops-cleanup-timer! repo)
(let [initial-tx-report (when-not (or initial-data-exists?
(seq datoms)

View File

@@ -244,36 +244,33 @@
(declare apply-history-action!)
(defn- persist-local-tx!
[repo {:keys [db-before db-after tx-data tx-meta] :as tx-report} normalized-tx-data reversed-datoms]
(when-let [conn (client-ops-conn repo)]
(when (client-ops-conn repo)
(let [tx-id (or (:db-sync/tx-id tx-meta) (random-uuid))
existing-ent (d/entity @conn [:db-sync/tx-id tx-id])
should-inc-pending? (not= true (:db-sync/pending? existing-ent))
now (.now js/Date)
created-at (or (:db-sync/created-at existing-ent) now)
{:keys [forward-outliner-ops inverse-outliner-ops]}
(derive-history-outliner-ops db-before db-after tx-data tx-meta)
inferred-outliner-ops?' (inferred-outliner-ops? tx-meta)]
inferred-outliner-ops?' (inferred-outliner-ops? tx-meta)
{:keys [should-inc-pending?]}
(client-op/upsert-local-tx-entry!
repo
{:tx-id tx-id
:created-at now
:pending? true
:failed? false
:outliner-op (:outliner-op tx-meta)
:undo-redo (cond
(:undo? tx-meta) :undo
(:redo? tx-meta) :redo
:else :none)
:forward-outliner-ops forward-outliner-ops
:inverse-outliner-ops inverse-outliner-ops
:inferred-outliner-ops? inferred-outliner-ops?'
:normalized-tx-data normalized-tx-data
:reversed-tx-data reversed-datoms})]
;; (prn :debug :forward-outliner-ops)
;; (cljs.pprint/pprint forward-outliner-ops)
;; (prn :debug :inverse-outliner-ops)
;; (cljs.pprint/pprint inverse-outliner-ops)
(ldb/transact! conn [{:db-sync/tx-id tx-id
:db-sync/normalized-tx-data normalized-tx-data
:db-sync/reversed-tx-data reversed-datoms
:db-sync/pending? true
:db-sync/failed? false
:db-sync/outliner-op (:outliner-op tx-meta)
:db-sync/undo-redo? (cond
(:undo? tx-meta)
:undo
(:redo? tx-meta)
:redo
:else
:none)
:db-sync/forward-outliner-ops forward-outliner-ops
:db-sync/inverse-outliner-ops inverse-outliner-ops
:db-sync/inferred-outliner-ops? inferred-outliner-ops?'
:db-sync/created-at created-at}])
(worker-undo-redo/gen-undo-ops! repo tx-report tx-id
{:apply-history-action! apply-history-action!})
(when should-inc-pending?
@@ -298,79 +295,29 @@
(defn pending-txs
[repo & {:keys [limit]}]
(when-let [conn (client-ops-conn repo)]
(let [db @conn
datoms (d/datoms db :avet :db-sync/created-at)
take-limit (fn [c]
(if limit (take limit c) c))]
(->> datoms
(map (fn [datom]
(d/entity db (:e datom))))
(filter (fn [e] (:db-sync/pending? e)))
take-limit
(keep (fn [ent]
(let [tx-id (:db-sync/tx-id ent)
tx' (:db-sync/normalized-tx-data ent)
reversed-tx' (:db-sync/reversed-tx-data ent)]
{:tx-id tx-id
:outliner-op (:db-sync/outliner-op ent)
:forward-outliner-ops (:db-sync/forward-outliner-ops ent)
:inverse-outliner-ops (:db-sync/inverse-outliner-ops ent)
:inferred-outliner-ops? (:db-sync/inferred-outliner-ops? ent)
:db-sync/undo-redo (:db-sync/undo-redo? ent)
:tx tx'
:reversed-tx reversed-tx'})))
vec))))
(client-op/get-pending-local-txs repo :limit limit))
(defn- pending-tx-by-id
[repo tx-id]
(when-let [conn (client-ops-conn repo)]
(when-let [ent (d/entity @conn [:db-sync/tx-id tx-id])]
{:tx-id (:db-sync/tx-id ent)
:outliner-op (:db-sync/outliner-op ent)
:forward-outliner-ops (:db-sync/forward-outliner-ops ent)
:inverse-outliner-ops (:db-sync/inverse-outliner-ops ent)
:db-sync/undo-redo (:db-sync/undo-redo? ent)
:tx (:db-sync/normalized-tx-data ent)
:reversed-tx (:db-sync/reversed-tx-data ent)})))
(client-op/get-local-tx-entry repo tx-id))
(defn mark-pending-txs-false!
[repo tx-ids]
(when (seq tx-ids)
(when-let [conn (client-ops-conn repo)]
(let [pending-to-remove (->> tx-ids
(keep (fn [tx-id]
(when (true? (:db-sync/pending? (d/entity @conn [:db-sync/tx-id tx-id])))
tx-id)))
count)]
(ldb/transact! conn
(mapv (fn [tx-id]
[:db/add [:db-sync/tx-id tx-id] :db-sync/pending? false])
tx-ids))
(when (pos? pending-to-remove)
(client-op/adjust-pending-local-tx-count! repo (- pending-to-remove)))
(when-let [client (current-client repo)]
(broadcast-rtc-state! client))))))
(when-let [pending-to-remove (client-op/mark-pending-txs-false! repo tx-ids)]
(when (pos? pending-to-remove)
(client-op/adjust-pending-local-tx-count! repo (- pending-to-remove)))
(when-let [client (current-client repo)]
(broadcast-rtc-state! client)))))
(defn mark-failed-txs!
[repo tx-ids]
(when (seq tx-ids)
(when-let [conn (client-ops-conn repo)]
(let [pending-to-remove (->> tx-ids
(keep (fn [tx-id]
(when (true? (:db-sync/pending? (d/entity @conn [:db-sync/tx-id tx-id])))
tx-id)))
count)]
(ldb/transact! conn
(mapv (fn [tx-id]
{:db-sync/tx-id tx-id
:db-sync/pending? false
:db-sync/failed? true})
tx-ids))
(when (pos? pending-to-remove)
(client-op/adjust-pending-local-tx-count! repo (- pending-to-remove)))
(when-let [client (current-client repo)]
(broadcast-rtc-state! client))))))
(when-let [pending-to-remove (client-op/mark-failed-txs! repo tx-ids)]
(when (pos? pending-to-remove)
(client-op/adjust-pending-local-tx-count! repo (- pending-to-remove)))
(when-let [client (current-client repo)]
(broadcast-rtc-state! client)))))
(defn clear-pending-txs!
[repo]

View File

@@ -1,9 +1,11 @@
(ns frontend.worker.sync.client-op
"Store client-ops in a persisted datascript"
"Store client sync metadata and ops in sqlite tables.
DataScript client-op storage is deprecated and unsupported."
(:require [datascript.core :as d]
[frontend.worker.state :as worker-state]
[goog.object :as gobj]
[lambdaisland.glogi :as log]
[logseq.db :as ldb]
[logseq.db.sqlite.util :as sqlite-util]
[malli.core :as ma]
[malli.transform :as mt]))
@@ -29,85 +31,241 @@
(defonce *repo->pending-local-tx-count (atom {}))
(def schema-in-db
"TODO: rename this db-name from client-op to client-metadata+op.
and move it to its own namespace."
{:block/uuid {:db/unique :db.unique/identity}
:db-ident {:db/unique :db.unique/identity}
;; local-tx is the latest remote-tx that local db persists
:local-tx {:db/index true}
:graph-uuid {:db/index true}
:db-sync/checksum {:db/index true}
:db-sync/tx-id {:db/unique :db.unique/identity}
:db-sync/created-at {:db/index true}
:db-sync/pending? {:db/index true}
:db-sync/failed? {:db/index true}
:db-sync/outliner-op {}
:db-sync/forward-outliner-ops {}
:db-sync/inverse-outliner-ops {}
:db-sync/inferred-outliner-ops? {}
:db-sync/normalized-tx-data {}
:db-sync/reversed-tx-data {}
:db-sync/asset-op? {:db/index true}})
(def ^:private sqlite-schema-ready-key "__logseq_client_ops_schema_ready")
(def ^:private sqlite-mode-key "__logseq_client_ops_sqlite_mode")
(def ^:private sync-meta-table-sql
"create table if not exists sync_meta (key text primary key, value text)")
(def ^:private client-ops-table-sql
(str "create table if not exists client_ops ("
"id integer primary key autoincrement,"
"kind text not null,"
"created_at integer not null,"
"tx_id text unique,"
"pending integer not null default 0,"
"failed integer not null default 0,"
"outliner_op text,"
"undo_redo text,"
"forward_outliner_ops text,"
"inverse_outliner_ops text,"
"inferred_outliner_ops integer,"
"normalized_tx_data text,"
"reversed_tx_data text,"
"asset_uuid text,"
"asset_op text,"
"asset_t integer,"
"asset_value text"
")"))
(def ^:private pending-index-sql
"create index if not exists idx_client_ops_pending_created on client_ops(kind, pending, created_at, id)")
(def ^:private asset-index-sql
"create index if not exists idx_client_ops_asset_uuid on client_ops(kind, asset_uuid)")
(defn- client-ops-store
[repo]
(worker-state/get-client-ops-conn repo))
(declare ensure-sqlite-schema!)
(defn- detect-sqlite-mode
[^js db]
(or (gobj/get db sqlite-mode-key)
(let [mode
(cond
(not db) nil
(fn? (gobj/get db "prepare"))
(try
(let [^js stmt (.prepare db "select 1")]
(try
(if (fn? (gobj/get stmt "run"))
:better-sqlite
:sqlite-wasm)
(finally
(when (fn? (gobj/get stmt "finalize"))
(.finalize stmt)))))
(catch :default _
:sqlite-wasm))
(fn? (gobj/get db "exec")) :sqlite-wasm
:else nil)]
(when mode
(try
(gobj/set db sqlite-mode-key mode)
(catch :default _
nil)))
mode)))
(defn- sqlite-db?
[conn]
(some? (detect-sqlite-mode conn)))
(defn- sqlite-store-or-throw
[repo]
(when-let [store (client-ops-store repo)]
(if (sqlite-db? store)
(do
(ensure-sqlite-schema! store)
store)
(throw (ex-info "Legacy DataScript client-op storage is unsupported. Please back up the graph and re-download it."
{:type :db-sync/legacy-client-ops-storage
:repo repo})))))
(defn- parse-uuid-str
[v]
(when (string? v)
(try
(uuid v)
(catch :default _
nil))))
(defn- kw->str
[v]
(cond
(keyword? v) (name v)
(string? v) v
:else nil))
(defn- str->kw
[v]
(when (string? v)
(keyword v)))
(defn- bool->int [v] (if v 1 0))
(defn- int->bool [v] (not (or (nil? v) (= 0 v) (= false v))))
(defn- sqlite-run!
[^js db sql params]
(case (detect-sqlite-mode db)
:better-sqlite
(let [^js stmt (.prepare db sql)
run-fn (gobj/get stmt "run")]
(if (seq params)
(.apply run-fn stmt (to-array params))
(.call run-fn stmt)))
:sqlite-wasm
(.exec db #js {:sql sql
:bind (into-array params)})
nil))
(defn- sqlite-rows
[^js db sql params]
(case (detect-sqlite-mode db)
:better-sqlite
(let [^js stmt (.prepare db sql)
all-fn (gobj/get stmt "all")]
(vec (if (seq params)
(.apply all-fn stmt (to-array params))
(.call all-fn stmt))))
:sqlite-wasm
(let [^js result (.exec db #js {:sql sql
:bind (into-array params)
:rowMode "object"
:returnValue "resultRows"})]
(cond
(nil? result) []
(array? result) (vec result)
(fn? (gobj/get result "toArray")) (vec (.toArray result))
:else []))
[]))
(defn- sqlite-row
[db sql params]
(first (sqlite-rows db sql params)))
(defn- sqlite-with-tx!
[^js db f]
(case (detect-sqlite-mode db)
:better-sqlite
(let [tx-fn (.transaction db (fn [] (f db)))]
(tx-fn))
:sqlite-wasm
(if (fn? (gobj/get db "transaction"))
(.transaction db (fn [tx] (f tx)))
(f db))
(f db)))
(defn ensure-sqlite-schema!
[db]
(when (sqlite-db? db)
(when-not (true? (gobj/get db sqlite-schema-ready-key))
(sqlite-with-tx!
db
(fn [tx]
(sqlite-run! tx sync-meta-table-sql [])
(sqlite-run! tx client-ops-table-sql [])
(sqlite-run! tx pending-index-sql [])
(sqlite-run! tx asset-index-sql [])))
(try
(gobj/set db sqlite-schema-ready-key true)
(catch :default _
nil)))))
(defn- sqlite-get-meta
[db k]
(some-> (sqlite-row db "select value from sync_meta where key = ?" [(name k)])
(aget "value")))
(defn- sqlite-set-meta!
[db k v]
(sqlite-run! db
(str "insert into sync_meta (key, value) values (?, ?)"
" on conflict(key) do update set value = excluded.value")
[(name k) (str v)]))
(defn- sqlite-delete-meta!
[db k]
(sqlite-run! db "delete from sync_meta where key = ?" [(name k)]))
(defn update-graph-uuid
[repo graph-uuid]
{:pre [(some? graph-uuid)]}
(when-let [conn (worker-state/get-client-ops-conn repo)]
(let [old-datoms (d/datoms @conn :avet :graph-uuid)
retractions (mapv (fn [datom]
[:db/retract (:e datom) :graph-uuid (:v datom)])
old-datoms)]
(ldb/transact! conn (conj retractions [:db/add "e" :graph-uuid graph-uuid])))))
(when-let [store (sqlite-store-or-throw repo)]
(sqlite-set-meta! store :graph-uuid graph-uuid)))
(defn get-graph-uuid
[repo]
(when-let [conn (worker-state/get-client-ops-conn repo)]
(:v (first (d/datoms @conn :avet :graph-uuid)))))
(some-> (sqlite-store-or-throw repo)
(sqlite-get-meta :graph-uuid)))
(defn update-local-tx
[repo t]
{:pre [(some? t)]}
(let [conn (worker-state/get-client-ops-conn repo)]
(assert (some? conn) repo)
(let [tx-data
(if-let [datom (first (d/datoms @conn :avet :local-tx))]
[:db/add (:e datom) :local-tx t]
(if-let [datom (first (d/datoms @conn :avet :db-sync/checksum))]
[:db/add (:e datom) :local-tx t]
[:db/add "e" :local-tx t]))]
(ldb/transact! conn [tx-data]))))
(let [store (sqlite-store-or-throw repo)]
(assert (some? store) repo)
(sqlite-set-meta! store :local-tx t)))
(defn update-local-checksum
[repo checksum]
{:pre [(some? checksum)]}
(let [conn (worker-state/get-client-ops-conn repo)]
(assert (some? conn) repo)
(let [tx-data
(if-let [datom (first (d/datoms @conn :avet :db-sync/checksum))]
[:db/add (:e datom) :db-sync/checksum checksum]
(if-let [datom (first (d/datoms @conn :avet :local-tx))]
[:db/add (:e datom) :db-sync/checksum checksum]
[:db/add "e" :db-sync/checksum checksum]))]
(ldb/transact! conn [tx-data]))))
(let [store (sqlite-store-or-throw repo)]
(assert (some? store) repo)
(sqlite-set-meta! store :db-sync/checksum checksum)))
(defn remove-local-tx
[repo]
(when-let [conn (worker-state/get-client-ops-conn repo)]
(when-let [datom (first (d/datoms @conn :avet :local-tx))]
(ldb/transact! conn [[:db/retract (:e datom) :local-tx]]))))
(when-let [store (sqlite-store-or-throw repo)]
(sqlite-delete-meta! store :local-tx)))
(defn get-local-tx
[repo]
(when-let [conn (worker-state/get-client-ops-conn repo)]
(:v (first (d/datoms @conn :avet :local-tx)))))
(when-let [store (sqlite-store-or-throw repo)]
(some-> (sqlite-get-meta store :local-tx)
(js/parseInt 10))))
(defn get-pending-local-tx-count
[repo]
(if-let [cached (get @*repo->pending-local-tx-count repo)]
cached
(let [count' (if-let [conn (worker-state/get-client-ops-conn repo)]
(count (d/datoms @conn :avet :db-sync/pending? true))
(let [count' (if-let [store (sqlite-store-or-throw repo)]
(or (some-> (sqlite-row store
"select count(*) as c from client_ops where kind = 'tx' and pending = 1"
[])
(aget "c"))
0)
0)]
(swap! *repo->pending-local-tx-count assoc repo count')
count')))
@@ -122,9 +280,9 @@
(defn get-local-checksum
[repo]
(let [conn (worker-state/get-client-ops-conn repo)]
(assert (some? conn) repo)
(:v (first (d/datoms @conn :avet :db-sync/checksum)))))
(let [store (sqlite-store-or-throw repo)]
(assert (some? store) repo)
(sqlite-get-meta store :db-sync/checksum)))
(defn rtc-db-graph?
"Is RTC enabled"
@@ -132,12 +290,182 @@
(or (exists? js/process)
(some? (get-graph-uuid repo))))
(defn- row->pending-local-tx
[row]
(let [tx-id (parse-uuid-str (aget row "tx_id"))]
(when tx-id
{:tx-id tx-id
:outliner-op (str->kw (aget row "outliner_op"))
:forward-outliner-ops (sqlite-util/transit-read (aget row "forward_outliner_ops"))
:inverse-outliner-ops (sqlite-util/transit-read (aget row "inverse_outliner_ops"))
:inferred-outliner-ops? (int->bool (aget row "inferred_outliner_ops"))
:db-sync/undo-redo (str->kw (aget row "undo_redo"))
:tx (sqlite-util/transit-read (aget row "normalized_tx_data"))
:reversed-tx (sqlite-util/transit-read (aget row "reversed_tx_data"))})))
(defn upsert-local-tx-entry!
[repo {:keys [tx-id created-at pending? failed? outliner-op undo-redo
forward-outliner-ops inverse-outliner-ops inferred-outliner-ops?
normalized-tx-data reversed-tx-data]
:or {pending? true failed? false}}]
{:pre [(some? tx-id)]}
(let [store (sqlite-store-or-throw repo)]
(assert (some? store) repo)
(let [tx-id-str (str tx-id)
existing (sqlite-row store
"select pending, created_at from client_ops where kind = 'tx' and tx_id = ?"
[tx-id-str])
should-inc-pending? (not= 1 (some-> existing (aget "pending")))
created-at' (or (some-> existing (aget "created_at"))
created-at
(.now js/Date))]
(sqlite-run! store
(str "insert into client_ops ("
"kind, created_at, tx_id, pending, failed, outliner_op, undo_redo, "
"forward_outliner_ops, inverse_outliner_ops, inferred_outliner_ops, "
"normalized_tx_data, reversed_tx_data"
") values ('tx', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
" on conflict(tx_id) do update set "
"created_at = excluded.created_at,"
"pending = excluded.pending,"
"failed = excluded.failed,"
"outliner_op = excluded.outliner_op,"
"undo_redo = excluded.undo_redo,"
"forward_outliner_ops = excluded.forward_outliner_ops,"
"inverse_outliner_ops = excluded.inverse_outliner_ops,"
"inferred_outliner_ops = excluded.inferred_outliner_ops,"
"normalized_tx_data = excluded.normalized_tx_data,"
"reversed_tx_data = excluded.reversed_tx_data")
[created-at'
tx-id-str
(bool->int pending?)
(bool->int failed?)
(kw->str outliner-op)
(kw->str undo-redo)
(sqlite-util/transit-write (or forward-outliner-ops []))
(sqlite-util/transit-write (or inverse-outliner-ops []))
(bool->int inferred-outliner-ops?)
(sqlite-util/transit-write (or normalized-tx-data []))
(sqlite-util/transit-write (or reversed-tx-data []))])
{:tx-id tx-id
:created-at created-at'
:should-inc-pending? should-inc-pending?})))
(defn get-local-tx-entry
[repo tx-id]
(when (uuid? tx-id)
(when-let [store (sqlite-store-or-throw repo)]
(some-> (sqlite-row store
(str "select tx_id, outliner_op, undo_redo, "
"forward_outliner_ops, inverse_outliner_ops, inferred_outliner_ops, "
"normalized_tx_data, reversed_tx_data "
"from client_ops where kind = 'tx' and tx_id = ? limit 1")
[(str tx-id)])
(row->pending-local-tx)))))
(defn get-pending-local-txs
[repo & {:keys [limit]}]
(when-let [store (sqlite-store-or-throw repo)]
(let [sql (str "select tx_id, outliner_op, undo_redo, "
"forward_outliner_ops, inverse_outliner_ops, inferred_outliner_ops, "
"normalized_tx_data, reversed_tx_data "
"from client_ops where kind = 'tx' and pending = 1 "
"order by created_at asc, id asc"
(when (number? limit) " limit ?"))
rows (sqlite-rows store sql (if (number? limit) [limit] []))]
(->> rows
(keep row->pending-local-tx)
vec))))
(defn- pending-tx-id?
[store tx-id]
(let [row (sqlite-row store
"select pending from client_ops where kind = 'tx' and tx_id = ?"
[(str tx-id)])]
(= 1 (some-> row (aget "pending")))))
(defn mark-pending-txs-false!
[repo tx-ids]
(when-let [store (sqlite-store-or-throw repo)]
(let [tx-ids (->> tx-ids (filter uuid?) vec)
pending-to-remove (->> tx-ids
(filter (fn [tx-id]
(pending-tx-id? store tx-id)))
count)]
(when (seq tx-ids)
(doseq [tx-id tx-ids]
(sqlite-run! store
"update client_ops set pending = 0 where kind = 'tx' and tx_id = ?"
[(str tx-id)])))
pending-to-remove)))
(defn mark-failed-txs!
[repo tx-ids]
(when-let [store (sqlite-store-or-throw repo)]
(let [tx-ids (->> tx-ids (filter uuid?) vec)
pending-to-remove (->> tx-ids
(filter (fn [tx-id]
(pending-tx-id? store tx-id)))
count)]
(when (seq tx-ids)
(doseq [tx-id tx-ids]
(sqlite-run! store
"update client_ops set pending = 0, failed = 1 where kind = 'tx' and tx_id = ?"
[(str tx-id)])))
pending-to-remove)))
(defn history-action-ops-by-tx-id
[repo tx-id]
(when-let [entry (get-local-tx-entry repo tx-id)]
{:db-sync/forward-outliner-ops (some-> (:forward-outliner-ops entry) seq vec)
:db-sync/inverse-outliner-ops (some-> (:inverse-outliner-ops entry) seq vec)}))
(defn- local-asset-op-map
[op-type t value]
(let [asset-uuid (:block-uuid value)]
(case op-type
:update-asset {:block/uuid asset-uuid
:update-asset [:update-asset t value]}
:remove-asset {:block/uuid asset-uuid
:remove-asset [:remove-asset t value]}
nil)))
(defn- sqlite-asset-op-by-uuid
[store block-uuid]
(when-let [row (sqlite-row store
(str "select asset_uuid, asset_op, asset_t, asset_value "
"from client_ops where kind = 'asset' and asset_uuid = ? limit 1")
[(str block-uuid)])]
(let [op-type (str->kw (aget row "asset_op"))
t (aget row "asset_t")
value (or (some-> (aget row "asset_value") sqlite-util/transit-read)
{:block-uuid block-uuid})]
(local-asset-op-map op-type t value))))
(defn- sqlite-upsert-asset-op!
[store op-type t value]
(let [block-uuid (:block-uuid value)]
(sqlite-with-tx!
store
(fn [tx]
(sqlite-run! tx "delete from client_ops where kind = 'asset' and asset_uuid = ?"
[(str block-uuid)])
(sqlite-run! tx
(str "insert into client_ops ("
"kind, created_at, asset_uuid, asset_op, asset_t, asset_value"
") values ('asset', ?, ?, ?, ?, ?)")
[(.now js/Date)
(str block-uuid)
(kw->str op-type)
t
(sqlite-util/transit-write value)])))))
;;; asset ops
(defn add-asset-ops
[repo asset-ops]
(let [conn (worker-state/get-client-ops-conn repo)
(let [store (sqlite-store-or-throw repo)
ops (ops-coercer asset-ops)]
(assert (some? conn) repo)
(assert (some? store) repo)
(letfn [(already-removed? [remove-op t]
(some-> remove-op second (> t)))
(update-after-remove? [update-op t]
@@ -145,26 +473,20 @@
(doseq [op ops]
(let [[op-type t value] op
{:keys [block-uuid]} value
exist-block-ops-entity (d/entity @conn [:block/uuid block-uuid])
e (:db/id exist-block-ops-entity)]
(when-let [tx-data
(not-empty
(case op-type
:update-asset
(let [remove-asset-op (get exist-block-ops-entity :remove-asset)]
(when-not (already-removed? remove-asset-op t)
(cond-> [{:block/uuid block-uuid
:db-sync/asset-op? true
:update-asset op}]
remove-asset-op (conj [:db.fn/retractAttribute e :remove-asset]))))
:remove-asset
(let [update-asset-op (get exist-block-ops-entity :update-asset)]
(when-not (update-after-remove? update-asset-op t)
(cond-> [{:block/uuid block-uuid
:db-sync/asset-op? true
:remove-asset op}]
update-asset-op (conj [:db.fn/retractAttribute e :update-asset]))))))]
(ldb/transact! conn tx-data)))))))
existing-op (sqlite-asset-op-by-uuid store block-uuid)]
(case op-type
:update-asset
(let [remove-asset-op (:remove-asset existing-op)]
(when-not (already-removed? remove-asset-op t)
(sqlite-upsert-asset-op! store :update-asset t value)))
:remove-asset
(let [update-asset-op (:update-asset existing-op)]
(when-not (update-after-remove? update-asset-op t)
(sqlite-upsert-asset-op! store :remove-asset t value)))
nil)))
nil)))
(defn add-all-exists-asset-as-ops
[repo]
@@ -172,54 +494,61 @@
_ (assert (some? conn))
asset-block-uuids (->> (d/datoms @conn :avet :logseq.property.asset/type)
(keep (fn [d]
(:block/uuid (d/entity @conn (:e d))))))
ops (map
(fn [block-uuid] [:update-asset 1 {:block-uuid block-uuid}])
asset-block-uuids)]
(:block/uuid (d/entity @conn (:e d)))))
distinct)
ops (map (fn [block-uuid] [:update-asset 1 {:block-uuid block-uuid}])
asset-block-uuids)]
(add-asset-ops repo ops)))
(defn- get-all-asset-ops*
[db]
(->> (d/datoms db :avet :db-sync/asset-op?)
(map (fn [d]
(let [op (d/entity db (:e d))]
[(:e d) (into {} op)])))
(into {})))
(defn get-unpushed-asset-ops-count
[repo]
(when-let [conn (worker-state/get-client-ops-conn repo)]
(count (get-all-asset-ops* @conn))))
(when-let [store (sqlite-store-or-throw repo)]
(or (some-> (sqlite-row store
"select count(*) as c from client_ops where kind = 'asset'"
[])
(aget "c"))
0)))
(defn get-all-asset-ops
[repo]
(when-let [conn (worker-state/get-client-ops-conn repo)]
(vals (get-all-asset-ops* @conn))))
(when-let [store (sqlite-store-or-throw repo)]
(->> (sqlite-rows store
"select asset_op, asset_t, asset_value from client_ops where kind = 'asset' order by id asc"
[])
(keep (fn [row]
(let [op-type (str->kw (aget row "asset_op"))
t (aget row "asset_t")
value (some-> (aget row "asset_value") sqlite-util/transit-read)]
(when (and op-type (map? value) (:block-uuid value))
(local-asset-op-map op-type t value)))))
vec)))
(defn remove-asset-op
[repo asset-uuid]
(when-let [conn (worker-state/get-client-ops-conn repo)]
(let [ent (d/entity @conn [:block/uuid asset-uuid])]
(when-let [e (:db/id ent)]
(ldb/transact! conn [[:db/retractEntity e]])))))
(when-let [store (sqlite-store-or-throw repo)]
(sqlite-run! store
"delete from client_ops where kind = 'asset' and asset_uuid = ?"
[(str asset-uuid)])))
(defn cleanup-finished-history-ops!
[repo protected-tx-ids]
(if-let [conn (worker-state/get-client-ops-conn repo)]
(if-let [store (sqlite-store-or-throw repo)]
(let [protected-tx-ids (set protected-tx-ids)
tx-ent-ids (->> (d/datoms @conn :avet :db-sync/tx-id)
(keep (fn [datom]
(let [tx-id (:v datom)
ent (d/entity @conn (:e datom))]
(when (and (uuid? tx-id)
(false? (:db-sync/pending? ent))
(not (contains? protected-tx-ids tx-id)))
(:db/id ent)))))
vec)]
(when (seq tx-ent-ids)
(ldb/transact! conn
(mapv (fn [ent-id]
[:db/retractEntity ent-id])
tx-ent-ids)))
(count tx-ent-ids))
tx-id-rows (sqlite-rows store
(str "select tx_id from client_ops "
"where kind = 'tx' and pending = 0 and tx_id is not null")
[])
removable-tx-ids (->> tx-id-rows
(keep (fn [row]
(let [tx-id (parse-uuid-str (aget row "tx_id"))]
(when (and (uuid? tx-id)
(not (contains? protected-tx-ids tx-id)))
tx-id))))
vec)]
(when (seq removable-tx-ids)
(doseq [tx-id removable-tx-ids]
(sqlite-run! store
"delete from client_ops where kind = 'tx' and tx_id = ?"
[(str tx-id)])))
(count removable-tx-ids))
0))

View File

@@ -1,7 +1,6 @@
(ns frontend.worker.sync.handle-message
"WebSocket message handlers for db sync."
(:require [datascript.core :as d]
[frontend.worker.shared-service :as shared-service]
(:require [frontend.worker.shared-service :as shared-service]
[frontend.worker.state :as worker-state]
[frontend.worker.sync.apply-txs :as sync-apply]
[frontend.worker.sync.assets :as sync-assets]
@@ -21,10 +20,6 @@
(log/error tag data)
(throw (ex-info (name tag) data)))
(defn- client-ops-conn
[repo]
(sync-presence/client-ops-conn worker-state/get-client-ops-conn repo))
(defn- sync-counts
[repo]
(sync-presence/sync-counts
@@ -137,12 +132,7 @@
(defn- pending-local-tx?
[repo]
(when-let [conn (client-ops-conn repo)]
(boolean
(some (fn [datom]
(let [ent (d/entity @conn (:e datom))]
(not= false (:db-sync/pending? ent))))
(d/datoms @conn :avet :db-sync/created-at)))))
(pos? (or (client-op/get-pending-local-tx-count repo) 0)))
(defn- checksum-compare-ready?
[repo client local-t remote-t]

View File

@@ -1,7 +1,6 @@
(ns frontend.worker.sync.presence
"Presence and rtc state helpers for db sync."
(:require [datascript.core :as d]
[logseq.common.util :as common-util]))
(:require [logseq.common.util :as common-util]))
(defn current-client
[db-sync-client repo]
@@ -15,7 +14,6 @@
(defn sync-counts
[{:keys [get-datascript-conn
get-client-ops-conn
get-pending-local-tx-count
get-unpushed-asset-ops-count
get-local-tx
@@ -27,8 +25,7 @@
(when (get-datascript-conn repo)
(let [pending-local (if get-pending-local-tx-count
(get-pending-local-tx-count repo)
(when-let [conn (client-ops-conn get-client-ops-conn repo)]
(count (d/datoms @conn :avet :db-sync/pending? true))))
0)
pending-asset (get-unpushed-asset-ops-count repo)
local-tx (get-local-tx repo)
remote-tx (get latest-remote-tx repo)

View File

@@ -2,6 +2,7 @@
"Undo redo new implementation"
(:require [datascript.core :as d]
[frontend.worker.state :as worker-state]
[frontend.worker.sync.client-op :as client-op]
[lambdaisland.glogi :as log]
[logseq.common.defkeywords :refer [defkeywords]]
[malli.core :as m]
@@ -302,10 +303,7 @@
(defn- pending-history-action-ops
[repo tx-id]
(when (uuid? tx-id)
(when-let [conn (get @worker-state/*client-ops-conns repo)]
(when-let [ent (d/entity @conn [:db-sync/tx-id tx-id])]
{:db-sync/forward-outliner-ops (some-> (:db-sync/forward-outliner-ops ent) seq vec)
:db-sync/inverse-outliner-ops (some-> (:db-sync/inverse-outliner-ops ent) seq vec)}))))
(client-op/history-action-ops-by-tx-id repo tx-id)))
(defn gen-undo-ops!
[repo {:keys [tx-data tx-meta db-after db-before]} tx-id