Files
logseq/deps/db/script/replay_sync_sqlite.cljs
megayu 79c25837cb Migrate from yarn to pnpm (#12529)
* migrate yarn to pnpm

* chore: update pnpm version to 10.33.0 across all package.json files

* chore: update .npmrc and package.json for improved dependency management

* chore: unify Clojure, Node, and Java version in workflow files

* fix: enable shamefully-hoist for now and add electron, keytar to onlyBuiltDependencies

* feat: add cider/piggieback dependency and update nREPL middleware configuration to silence warnings

* ensure pnpm setup prior to node setup

* fix: update logseq/bb-tasks git SHA

* feat: add pnpm configuration for onlyBuiltDependencies in package.json

* feat: add onlyBuiltDependencies configuration for better-sqlite3 in pnpm settings

* chore: update pnpm lockfile

* fix: resolve merge conflicts

* fix: remove invisible characters from markdown headers

* fix: update .npmrc comments for clarity on lockfile usage

* Revert "feat: add cider/piggieback dependency and update nREPL middleware configuration to silence warnings"

This reverts commit 70a111936f.

* fix: remove invisible characters from various README files and add .editorconfig

* fix: clarify lockfile resolution process in SKILL.md

---------

Co-authored-by: Tienson Qin <tiensonqin@gmail.com>
2026-04-24 23:40:25 +08:00

1025 lines
38 KiB
Clojure

(ns replay-sync-sqlite
  "Replay db-sync rebase/apply flow directly from sqlite artifacts.
It replays:
1) server tx_log baseline up to before remote window,
2) local client ops as applied local changes,
3) reverse local ops,
4) apply remote txs,
5) reapply local ops.
Supports `legacy` vs `fixed` reapply fallback logic to verify behavior."
  (:require ["better-sqlite3" :as sqlite3]
            ["fs" :as fs]
            ["path" :as node-path]
            [babashka.cli :as cli]
            [clojure.set :as set]
            [clojure.string :as string]
            [datascript.core :as d]
            [logseq.db :as ldb]
            [logseq.db.common.sqlite-cli :as sqlite-cli]
            [logseq.db.frontend.property.type :as db-property-type]
            [logseq.db.frontend.schema :as db-schema]
            [logseq.outliner.core :as outliner-core]
            [logseq.outliner.op :as outliner-op]
            [logseq.outliner.page :as outliner-page]
            [logseq.outliner.property :as outliner-property]
            [logseq.outliner.recycle :as outliner-recycle]
            [nbb.core :as nbb]))
(def sqlite
  "Constructor used to open sqlite files. When the `nbb.core` namespace is
   found, the value is read from the `default` property of the required
   better-sqlite3 module; otherwise the required module value is used as is."
  (cond-> sqlite3
    (find-ns 'nbb.core) (aget "default")))
;; babashka.cli option spec for this script. It is handed to `cli/parse-args`
;; in `-main` and to `cli/format-opts` in `usage`.
;; Entries without a :coerce (:help, :pending-only, :auto-from-conflict,
;; :inspect-only, :pretty) are only tested for truthiness by the code below,
;; i.e. they are used as flags.
;; :local-tx-id uses `:coerce []` so that repeated occurrences are collected;
;; `parse-tx-id-set` additionally splits each value on commas.
(def cli-spec
{:help {:alias :h
:desc "Show help"}
:server-db {:alias :s
:desc "Path to server graph db.sqlite containing tx_log"
:coerce :string}
:client-ops-db {:alias :c
:desc "Path to client ops sqlite"
:coerce :string}
:from-t {:alias :f
:desc "Replay remote txs from this t (inclusive). If omitted, use --auto-from-conflict"
:coerce :long}
:to-t {:alias :t
:desc "Replay remote txs to this t (inclusive). Default: all after from-t"
:coerce :long}
:remote-limit {:alias :l
:desc "Cap remote tx count after from-t"
:coerce :long}
:mode {:alias :m
:desc "fixed | legacy | both (default both)"
:coerce :string}
:pending-only {:desc "Only include rows where :db-sync/pending? true"}
:local-created-at-min {:desc "Filter local ops by created-at >= value"
:coerce :long}
:local-created-at-max {:desc "Filter local ops by created-at <= value"
:coerce :long}
:local-tx-id {:desc "Repeatable tx-id UUID filter"
:coerce []}
:auto-from-conflict {:desc "Infer from-t from first remote tx referencing uuids deleted by selected local ops"}
:inspect-only {:desc "Only print inferred conflict info without replay"}
:pretty {:desc "Pretty-print JSON output"}})
;; Attribute keys that `entity->local-op` copies (via `select-keys`) into the
;; :raw entry of each local op map.
;; NOTE(review): :raw is built from `(into {} ent)`; whether that map carries
;; :db/id depends on the datascript entity implementation - confirm.
(def local-op-keys
[:db/id
:db-sync/tx-id
:db-sync/created-at
:db-sync/pending?
:db-sync/failed?
:db-sync/outliner-op
:db-sync/forward-outliner-ops
:db-sync/inverse-outliner-ops
:db-sync/inferred-outliner-ops?
:db-sync/normalized-tx-data
:db-sync/reversed-tx-data])
(defn usage
  "Returns the CLI help text: a usage synopsis, one example invocation and the
   option table that `cli/format-opts` renders from `cli-spec`."
  []
  (let [command " pnpm exec nbb-logseq -cp src:../outliner/src:../common/src:../graph-parser/src script/replay_sync_sqlite.cljs \\\n"
        synopsis-args " --server-db <server-db.sqlite> --client-ops-db <client-ops.sqlite> [--from-t <n>] [--to-t <n>] [--mode both] [--pretty]\n"
        example-args (str " --server-db /path/server/db.sqlite --client-ops-db /path/electron_client_ops.sqlite \\\n"
                          " --auto-from-conflict --local-created-at-max 1775717890000 --mode both --pretty\n")]
    (str "Usage:\n"
         command
         synopsis-args
         "\n"
         "Examples:\n"
         command
         example-args
         "\n"
         "Options:\n"
         (cli/format-opts {:spec cli-spec}))))
(defn resolve-path
  "Returns `path*` unchanged when it is absolute. Otherwise joins it onto the
   ORIGINAL_PWD environment variable, falling back to \".\" when that variable
   is not set."
  [path*]
  (if-not (node-path/isAbsolute path*)
    (let [base-dir (or js/process.env.ORIGINAL_PWD ".")]
      (node-path/join base-dir path*))
    path*))
(defn parse-uuid
  "Coerces `value` to a UUID, or returns nil when that is not possible.

   - a UUID is returned unchanged;
   - a string is accepted only when it has the canonical 8-4-4-4-12
     hexadecimal layout, and is then wrapped with `uuid`;
   - anything else yields nil.

   The explicit format check is required because `uuid` performs no validation
   of its own: it wraps any string, so guarding it with try/catch never
   rejected malformed input and arbitrary strings were reported as UUIDs."
  [value]
  (cond
    (uuid? value)
    value

    (and (string? value)
         (re-matches #"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"
                     value))
    (uuid value)

    :else
    nil))
(defn uuid-ref
  "Extracts a block UUID from the supported reference shapes, or returns nil:
   a UUID, a UUID string, a `[:block/uuid x]` lookup ref, or a map carrying
   `:block/uuid`."
  [value]
  (cond
    ;; `parse-uuid` returns UUID values unchanged and parses strings.
    (or (uuid? value) (string? value))
    (parse-uuid value)

    (vector? value)
    (when (= :block/uuid (first value))
      (parse-uuid (second value)))

    (map? value)
    (some-> (:block/uuid value) parse-uuid)

    :else
    nil))
(defn collect-uuids
  "Walks `value` recursively and returns a (possibly lazy) collection of every
   UUID found in it.

   UUIDs and parseable strings contribute themselves; a `[:block/uuid x]`
   vector with a non-nil `x` contributes only `x` when it parses; other
   vectors, sets and sequential collections are walked element by element;
   maps are walked through their values only. Everything else yields an
   empty vector."
  [value]
  (let [one-or-none (fn [candidate]
                      (if-let [u (parse-uuid candidate)]
                        [u]
                        []))]
    (cond
      (nil? value)
      []

      (uuid? value)
      [value]

      (string? value)
      (one-or-none value)

      (vector? value)
      (let [[tag target] value]
        (if (and (= :block/uuid tag) (some? target))
          (one-or-none target)
          (mapcat collect-uuids value)))

      (or (set? value) (sequential? value))
      (mapcat collect-uuids value)

      (map? value)
      (mapcat collect-uuids (vals value))

      :else
      [])))
(defn read-server-tx-log
  "Opens the sqlite file at `server-db-path`, reads every `tx_log` row ordered
   by ascending `t`, and returns a vector of
   `{:t .. :outliner-op .. :tx-data ..}` maps. `:outliner-op` is a keyword when
   the column holds a string (nil otherwise) and `:tx-data` is the `tx` column
   decoded with `ldb/read-transit-str`. The database is always closed."
  [server-db-path]
  (let [db (new sqlite server-db-path nil)
        row->tx (fn [row]
                  (let [op-name (aget row "outliner_op")]
                    {:t (aget row "t")
                     :outliner-op (when (string? op-name)
                                    (keyword op-name))
                     :tx-data (ldb/read-transit-str (aget row "tx"))}))]
    (try
      (let [stmt (.prepare db "select t, tx, outliner_op from tx_log order by t asc")]
        (mapv row->tx (.all stmt)))
      (finally
        (.close db)))))
(defn entity->local-op
  "Builds the local-op map for entity `eid` in `db`: the `:db-sync/*`
   attributes are exposed under short keys, `:db/id` is copied, and `:raw`
   holds the `local-op-keys` subset of the entity converted to a plain map."
  [db eid]
  (let [ent (d/entity db eid)
        field->attr {:tx-id :db-sync/tx-id
                     :created-at :db-sync/created-at
                     :pending? :db-sync/pending?
                     :failed? :db-sync/failed?
                     :outliner-op :db-sync/outliner-op
                     :forward-outliner-ops :db-sync/forward-outliner-ops
                     :inverse-outliner-ops :db-sync/inverse-outliner-ops
                     :inferred-outliner-ops? :db-sync/inferred-outliner-ops?
                     :tx :db-sync/normalized-tx-data
                     :reversed-tx :db-sync/reversed-tx-data}
        fields (into {}
                     (map (fn [[field attr]] [field (attr ent)]))
                     field->attr)]
    (assoc fields
           :db/id (:db/id ent)
           :raw (select-keys (into {} ent) local-op-keys))))
(defn read-client-ops
  "Opens the client-ops sqlite file as a datascript connection, finds every
   entity that has both `:db-sync/tx-id` and `:db-sync/created-at`, and returns
   them as local-op maps ordered by `[created-at entity-id]`. The sqlite handle
   is closed afterwards when one was returned."
  [client-ops-db-path]
  (let [{conn :conn db-handle :sqlite} (sqlite-cli/open-sqlite-datascript! client-ops-db-path)]
    (try
      (let [db @conn
            found (d/q '[:find ?e ?created-at
                         :where
                         [?e :db-sync/tx-id _]
                         [?e :db-sync/created-at ?created-at]]
                       db)
            ordered (sort-by (juxt second first) found)]
        (mapv (fn [[e _]]
                (entity->local-op db e))
              ordered))
      (finally
        (when db-handle
          (.close db-handle))))))
(defn parse-tx-id-set
  "Turns the `--local-tx-id` values into a set of UUIDs. Only string entries
   of a sequential input are considered; each is split on commas, blank pieces
   are dropped and pieces that `parse-uuid` rejects are ignored. Any other
   input yields an empty set."
  [tx-id-values]
  (if (sequential? tx-id-values)
    (into #{}
          (comp (filter string?)
                (mapcat #(string/split % #","))
                (remove string/blank?)
                (keep parse-uuid))
          tx-id-values)
    #{}))
(defn filter-local-ops
  "Returns the vector of `ops` that satisfy every active filter:

   - `pending-only`: `:pending?` must be exactly true;
   - `local-created-at-min` / `local-created-at-max`: inclusive bounds on
     `:created-at`; an op without `:created-at` fails whichever bound is set
     (it is treated as -1 for the minimum and MAX_SAFE_INTEGER for the maximum);
   - `local-tx-id`: `:tx-id` must be in the parsed id set when it is non-empty.

   A filter whose option is absent accepts every op."
  [ops {:keys [pending-only local-created-at-min local-created-at-max local-tx-id]}]
  (let [wanted-tx-ids (parse-tx-id-set local-tx-id)
        pending-ok? (fn [op]
                      (or (not pending-only)
                          (true? (:pending? op))))
        min-ok? (fn [op]
                  (or (nil? local-created-at-min)
                      (>= (or (:created-at op) -1) local-created-at-min)))
        max-ok? (fn [op]
                  (or (nil? local-created-at-max)
                      (<= (or (:created-at op) js/Number.MAX_SAFE_INTEGER) local-created-at-max)))
        tx-id-ok? (fn [op]
                    (or (empty? wanted-tx-ids)
                        (contains? wanted-tx-ids (:tx-id op))))]
    (filterv (every-pred pending-ok? min-ok? max-ok? tx-id-ok?) ops)))
(defn delete-op-uuids
  "Returns the set of block UUIDs named by the `:delete-blocks` entries in the
   `:forward-outliner-ops` of `local-ops`. Only a sequential first argument is
   read, and ids that `uuid-ref` cannot turn into a UUID are dropped."
  [local-ops]
  (into #{}
        (comp (mapcat :forward-outliner-ops)
              (filter #(= :delete-blocks (first %)))
              (mapcat (fn [[_ [ids]]]
                        (when (sequential? ids)
                          ids)))
              (keep uuid-ref))
        local-ops))
(defn row-referenced-uuids
  "Returns the set of UUIDs that `collect-uuids` finds in the `:tx-data` of a
   server tx-log row."
  [row]
  (into #{} (mapcat collect-uuids) (:tx-data row)))
(defn infer-conflicts
  "Finds server rows that reference blocks deleted by the selected local ops.

   Returns a map with:
   - `:deleted-uuids`    vector of UUIDs deleted by `local-ops`;
   - `:conflicts`        vector of `{:t .. :outliner-op .. :hits [..]}` for each
                         server row whose tx-data references a deleted UUID
                         (`:hits` holds at most 10 UUIDs); nil when the local
                         ops delete nothing;
   - `:first-conflict-t` `:t` of the first conflicting row, or nil.

   The set intersection goes through the `set` alias required in the ns form
   instead of relying on `clojure.set` having been loaded by something else."
  [server-rows local-ops]
  (let [deleted-uuids (delete-op-uuids local-ops)
        row->conflict (fn [row]
                        (when-let [hit (seq (set/intersection deleted-uuids
                                                              (row-referenced-uuids row)))]
                          {:t (:t row)
                           :outliner-op (:outliner-op row)
                           ;; cap the sample so the report stays readable
                           :hits (vec (take 10 hit))}))
        conflicts (when (seq deleted-uuids)
                    (vec (keep row->conflict server-rows)))]
    {:deleted-uuids (vec deleted-uuids)
     :conflicts conflicts
     :first-conflict-t (some-> conflicts first :t)}))
(defn select-remote-rows
  "Returns the vector of `server-rows` whose `:t` lies in the inclusive window
   `[from-t, to-t]` (a nil bound is open), truncated to the first
   `remote-limit` rows when a limit is given."
  [server-rows from-t to-t remote-limit]
  (let [in-window? (fn [{:keys [t]}]
                     (and (or (nil? from-t) (>= t from-t))
                          (or (nil? to-t) (<= t to-t))))
        windowed (filterv in-window? server-rows)]
    (cond->> windowed
      (some? remote-limit) (into [] (take remote-limit)))))
(defn baseline-rows
  "Returns the vector of `server-rows` with `:t` strictly below `from-t`, or
   every row when `from-t` is nil."
  [server-rows from-t]
  (filterv (fn [{:keys [t]}]
             (or (nil? from-t) (< t from-t)))
           server-rows))
(defn usable-history-ops
  "Returns `ops` unchanged. Every reader of forward/inverse history ops in this
   script goes through this function."
  [ops]
  (identity ops))
(defn entity-id->block-uuid
  "Returns the block UUID for `id`: read directly from `id` when `uuid-ref`
   recognises it, otherwise taken from the `:block/uuid` of the entity that
   `id` resolves to in `db` (nil when there is none)."
  [db id]
  (if-some [direct (uuid-ref id)]
    direct
    (:block/uuid (d/entity db id))))
(defn delete-block-root-uuids
  "Returns the distinct block UUIDs (as a vector, in first-seen order) that the
   `:delete-blocks` entries of `forward-outliner-ops` target. Ids are resolved
   with `entity-id->block-uuid` against `db`; unresolvable ids are skipped."
  [db forward-outliner-ops]
  (into []
        (comp (filter (fn [[op _]] (= :delete-blocks op)))
              (mapcat (fn [[_ [ids]]]
                        (when (sequential? ids)
                          ids)))
              (keep #(entity-id->block-uuid db %))
              (distinct))
        (usable-history-ops forward-outliner-ops)))
(defn raw-restored-block-uuids
  "Returns the distinct UUIDs (as a vector) asserted by `:block/uuid` additions
   in `raw-tx-data`, i.e. vector datoms of at least four elements shaped
   `[:db/add e :block/uuid v ...]` whose value `uuid-ref` can resolve."
  [raw-tx-data]
  (let [uuid-add? (fn [datom]
                    (and (vector? datom)
                         (>= (count datom) 4)
                         (= :db/add (nth datom 0))
                         (= :block/uuid (nth datom 2))))]
    (into []
          (comp (filter uuid-add?)
                (keep #(uuid-ref (nth % 3)))
                (distinct))
          raw-tx-data)))
(defn assert-delete-block-roots-restored!
  "Throws an ex-info (`:db-sync/incomplete-delete-blocks-reverse`) when any
   block targeted by the `:delete-blocks` forward ops of `local-tx` cannot be
   found by UUID in the current db of `conn`. Returns nil otherwise."
  [conn local-tx]
  (let [db @conn
        root-uuids (delete-block-root-uuids db (:forward-outliner-ops local-tx))
        missing (into []
                      (remove #(d/entity db [:block/uuid %]))
                      root-uuids)]
    (when (seq missing)
      (throw (ex-info "incomplete delete-blocks reverse"
                      {:error :db-sync/incomplete-delete-blocks-reverse
                       :tx-id (:tx-id local-tx)
                       :outliner-op (:outliner-op local-tx)
                       :root-uuids root-uuids
                       :missing-root-uuids missing})))))
(defn assert-raw-restored-block-uuids!
  "Throws an ex-info (`:db-sync/incomplete-raw-restored-uuids`) when a UUID
   that `raw-tx-data` adds as `:block/uuid` cannot be found in the current db
   of `conn`. Returns nil otherwise."
  [conn local-tx raw-tx-data]
  (let [db @conn
        restored-uuids (raw-restored-block-uuids raw-tx-data)
        missing (into []
                      (remove #(d/entity db [:block/uuid %]))
                      restored-uuids)]
    (when (seq missing)
      (throw (ex-info "incomplete raw restored uuids"
                      {:error :db-sync/incomplete-raw-restored-uuids
                       :tx-id (:tx-id local-tx)
                       :outliner-op (:outliner-op local-tx)
                       :restored-uuids restored-uuids
                       :missing-restored-uuids missing})))))
(defn invalid-rebase-op!
  "Always throws an ex-info with message \"invalid rebase op\" whose data is
   `data` with `op` added under `:op`."
  [op data]
  (let [payload (assoc data :op op)]
    (throw (ex-info "invalid rebase op" payload))))
(defn replay-entity-id-value
  "Resolves `v` to an entity id in `db`. Numbers are returned as is; UUIDs,
   vectors (lookup refs) and qualified keywords are looked up and yield the
   entity's `:db/id`, or nil when no entity is found. Any other value is
   returned unchanged."
  [db v]
  (let [lookup-id (fn [eid-or-ref]
                    (some-> (d/entity db eid-or-ref) :db/id))]
    (cond
      (number? v) v
      (uuid? v) (lookup-id [:block/uuid v])
      (vector? v) (lookup-id v)
      (qualified-keyword? v) (lookup-id v)
      :else v)))
(defn stable-entity-ref-like?
  "True when `v` is a qualified keyword, or a vector whose first element is
   `:block/uuid` or `:db/ident`; false otherwise."
  [v]
  (cond
    (qualified-keyword? v) true
    (vector? v) (contains? #{:block/uuid :db/ident} (first v))
    :else false))
(defn replay-property-value
  "Prepares property value `v` for replay. When the `:logseq.property/type` of
   `property-id` is one of `db-property-type/all-ref-property-types`, stable
   entity refs are replaced by what `replay-entity-id-value` returns: for `v`
   itself, or for each element of a set (result is a set) or sequential
   collection (result is a vector). In every other case `v` is returned
   untouched."
  [db property-id v]
  (let [property-type (some-> (d/entity db property-id) :logseq.property/type)
        ref-typed? (contains? db-property-type/all-ref-property-types property-type)
        resolve-ref (fn [x]
                      (if (stable-entity-ref-like? x)
                        (replay-entity-id-value db x)
                        x))]
    (cond
      (not ref-typed?) v
      (stable-entity-ref-like? v) (replay-entity-id-value db v)
      (set? v) (into #{} (map resolve-ref) v)
      (sequential? v) (mapv resolve-ref v)
      :else v)))
(defn replay-entity-id-coll
  "Maps `ids` to a vector of resolved entity ids, keeping the original id
   wherever `replay-entity-id-value` yields nothing."
  [db ids]
  (into []
        (map (fn [id]
               (or (replay-entity-id-value db id) id)))
        ids))
(defn rebase-find-existing-left-sibling
  "Walks leftwards from `target` using `ldb/get-left-sibling` and returns the
   first sibling whose `:block/uuid` resolves to an entity in `current-db`
   (the entity from `current-db` is returned). Returns nil once the sibling
   chain is exhausted."
  [current-db target]
  (loop [candidate (ldb/get-left-sibling target)]
    (when-not (nil? candidate)
      (if-let [live (and candidate
                         (d/entity current-db [:block/uuid (:block/uuid candidate)]))]
        live
        (recur (ldb/get-left-sibling candidate))))))
(defn rebase-resolve-target-and-sibling
  "Decides where a replayed op should attach and returns `[target sibling?]`,
   or nil when nothing suitable exists.

   - If `target-id` still resolves in `current-db`, that entity is used with
     the requested `sibling?`.
   - Otherwise, when the target existed with a parent in `rebase-db-before`
     and `sibling?` is truthy: the nearest surviving left sibling is used
     (as a sibling), else the surviving former parent (not as a sibling)."
  [current-db rebase-db-before target-id sibling?]
  (let [target (d/entity current-db target-id)
        target-before (when rebase-db-before
                        (d/entity rebase-db-before target-id))
        parent-before (:block/parent target-before)]
    (cond
      target
      [target sibling?]

      (and target-before parent-before sibling?)
      (let [left-sibling (rebase-find-existing-left-sibling current-db target-before)]
        (if left-sibling
          [left-sibling true]
          (when-let [parent (d/entity current-db [:block/uuid (:block/uuid parent-before)])]
            [parent false])))

      :else
      nil)))
(defn template-parent-ref
  "Normalises a parent reference to a `[:block/uuid u]` lookup ref when `parent`
   is a UUID or a map holding a UUID under `:block/uuid`. Anything else
   (including an existing `[:block/uuid ..]` vector) is returned unchanged."
  [parent]
  (let [->ref (fn [u] [:block/uuid u])]
    (cond
      (uuid? parent)
      (->ref parent)

      (and (map? parent) (uuid? (:block/uuid parent)))
      (->ref (:block/uuid parent))

      :else
      parent)))
(defn sanitize-template-block
  "Returns `block` as a plain map prepared for template replay:
   `:db/id`, `:block/order`, `:block/page` and `:block/tx-id` are removed,
   `:block/parent` is passed through `template-parent-ref`, and `:block/uuid`
   is set when a UUID can be determined - from the block itself, from the
   entity its numeric `:db/id` points to (`rebase-db-before` first, then
   `current-db`), or from a `[:block/uuid u]` `:db/id`."
  [current-db rebase-db-before block]
  (let [m (into {} block)
        block-id (:db/id m)
        block-uuid (or (:block/uuid m)
                       (cond
                         (number? block-id)
                         (or (some-> rebase-db-before (d/entity block-id) :block/uuid)
                             (some-> (d/entity current-db block-id) :block/uuid))

                         (and (vector? block-id)
                              (= :block/uuid (first block-id))
                              (uuid? (second block-id)))
                         (second block-id)))
        cleaned (-> m
                    (dissoc :db/id :block/order :block/page :block/tx-id)
                    (update :block/parent template-parent-ref))]
    (if (uuid? block-uuid)
      (assoc cleaned :block/uuid block-uuid)
      cleaned)))
(defn replay-canonical-outliner-op!
  "Replays a single `[op args]` outliner op against `conn`.

   Ids in `args` are re-resolved against the current db before the matching
   outliner function is called. For ops that attach to a target block,
   `rebase-db-before` (may be nil) is consulted to find a fallback position
   when the target no longer exists. Ops that cannot be replayed are rejected
   through `invalid-rebase-op!`, which throws. An op keyword with no dedicated
   handling transacts the tx-data found under `:tx` of `args`, if any."
  [conn [op args] rebase-db-before]
  (let [;; Reject the current op, optionally with a :reason.
        reject! (fn
                  ([] (invalid-rebase-op! op {:args args}))
                  ([reason] (invalid-rebase-op! op {:args args
                                                    :reason reason})))
        ;; Entities (lazy) for the ids that still resolve in the current db.
        live-blocks (fn [ids]
                      (let [ids' (replay-entity-id-coll @conn ids)]
                        (keep #(d/entity @conn %) ids')))
        ;; Non-empty and every id resolves to an entity.
        all-present? (fn [eids]
                       (and (seq eids)
                            (every? #(some? (d/entity @conn %)) eids)))
        ;; Bare uuid -> lookup ref; everything else is passed through.
        ->block-ref (fn [root-id]
                      (if (uuid? root-id)
                        [:block/uuid root-id]
                        root-id))
        ;; Resolve a property value; a stable ref that resolves to nothing
        ;; makes the op invalid.
        resolve-value! (fn [property-id v]
                         (let [v' (replay-property-value @conn property-id v)]
                           (when (and (stable-entity-ref-like? v) (nil? v'))
                             (reject!))
                           v'))
        ;; Ops replayed by applying the outliner fn to the args unchanged.
        passthrough {:remove-block-property outliner-property/remove-block-property!
                     :create-property-text-block outliner-property/create-property-text-block!
                     :upsert-property outliner-property/upsert-property!
                     :class-add-property outliner-property/class-add-property!
                     :class-remove-property outliner-property/class-remove-property!
                     :upsert-closed-value outliner-property/upsert-closed-value!
                     :add-existing-values-to-closed-values outliner-property/add-existing-values-to-closed-values!
                     :delete-closed-value outliner-property/delete-closed-value!}]
    (case op
      :save-block
      (let [[block opts] args
            block-uuid (:block/uuid block)
            existing (when block-uuid
                       (d/entity @conn [:block/uuid block-uuid]))]
        (when (nil? existing)
          (reject! :missing-block))
        (outliner-core/save-block! conn (dissoc block :db/id :block/order) opts))

      :insert-blocks
      (let [[blocks target-id opts] args
            [target sibling?] (rebase-resolve-target-and-sibling
                               @conn rebase-db-before target-id (:sibling? opts))]
        (when-not (and target (seq blocks))
          (reject!))
        (outliner-core/insert-blocks! conn blocks target (assoc opts :sibling? sibling?)))

      :apply-template
      (let [[template-id target-id opts] args
            template-id' (replay-entity-id-value @conn template-id)
            target-id' (replay-entity-id-value @conn target-id)
            [target sibling?] (rebase-resolve-target-and-sibling
                               @conn rebase-db-before target-id' (:sibling? opts))
            template (when template-id'
                       (d/entity @conn template-id'))]
        (when-not (and template target)
          (reject! :missing-template-or-target-block))
        (let [template-uuid (:block/uuid template)
              target-uuid (:block/uuid target)]
          (when-not (and (uuid? template-uuid) (uuid? target-uuid))
            (reject! :missing-template-or-target-uuid))
          (let [template-blocks' (some->> (:template-blocks opts)
                                          (mapv #(sanitize-template-block @conn rebase-db-before %))
                                          (filter :block/uuid)
                                          seq)
                base-opts (-> opts
                              (assoc :sibling? sibling?)
                              (dissoc :template-blocks))
                opts' (if template-blocks'
                        (assoc base-opts :template-blocks template-blocks')
                        base-opts)]
            (outliner-op/apply-ops!
             conn
             [[:apply-template [template-uuid
                                target-uuid
                                opts']]]
             {:gen-undo-ops? false}))))

      :move-blocks
      (let [[ids target-id opts] args
            blocks (live-blocks ids)
            target-id' (replay-entity-id-value @conn target-id)
            [target sibling?] (rebase-resolve-target-and-sibling
                               @conn rebase-db-before target-id' (:sibling? opts))]
        (when (or (empty? blocks) (nil? target))
          (reject!))
        (outliner-core/move-blocks! conn blocks target (assoc opts :sibling? sibling?)))

      :move-blocks-up-down
      (let [[ids up?] args
            blocks (live-blocks ids)]
        (when (seq blocks)
          (outliner-core/move-blocks-up-down! conn blocks up?)))

      :indent-outdent-blocks
      (let [[ids indent? opts] args
            blocks (live-blocks ids)]
        (when (empty? blocks)
          (reject!))
        (outliner-core/indent-outdent-blocks! conn blocks indent? opts))

      :delete-blocks
      (let [[ids opts] args
            blocks (live-blocks ids)]
        (when (seq blocks)
          (outliner-core/delete-blocks! conn blocks opts)))

      :create-page
      (let [[title opts] args]
        (outliner-page/create! conn title opts))

      :delete-page
      (let [[page-uuid opts] args]
        (outliner-page/delete! conn page-uuid opts))

      :restore-recycled
      (let [[root-id] args
            root (d/entity @conn (->block-ref root-id))
            tx-data (when root
                      (seq (outliner-recycle/restore-tx-data @conn root)))]
        (when-not tx-data
          (reject! :invalid-restore-target))
        (ldb/transact! conn tx-data
                       {:outliner-op :restore-recycled}))

      :recycle-delete-permanently
      (let [[root-id] args
            root (d/entity @conn (->block-ref root-id))
            tx-data (when root
                      (seq (outliner-recycle/permanently-delete-tx-data @conn root)))]
        (when tx-data
          (ldb/transact! conn tx-data
                         {:outliner-op :recycle-delete-permanently})))

      :set-block-property
      (let [[block-eid property-id v] args
            block-eid' (or (replay-entity-id-value @conn block-eid)
                           block-eid)
            block (d/entity @conn block-eid')
            property (d/entity @conn property-id)]
        (when-not (and block property)
          (reject! :missing-block-or-property))
        (let [v' (resolve-value! property-id v)]
          (outliner-property/set-block-property! conn block-eid' property-id v')))

      :batch-set-property
      (let [[block-ids property-id v opts] args
            block-ids' (replay-entity-id-coll @conn block-ids)
            property (d/entity @conn property-id)]
        (when-not (and property (all-present? block-ids'))
          (reject! :missing-block-or-property))
        (let [v' (resolve-value! property-id v)]
          (outliner-property/batch-set-property! conn block-ids' property-id v' opts)))

      :batch-remove-property
      (let [[block-ids property-id] args
            block-ids' (replay-entity-id-coll @conn block-ids)]
        (outliner-property/batch-remove-property! conn block-ids' property-id))

      :delete-property-value
      (let [[block-eid property-id property-value] args
            block (d/entity @conn block-eid)
            property (d/entity @conn property-id)]
        (when-not (and block property)
          (reject! :missing-block-or-property))
        (let [property-value' (resolve-value! property-id property-value)]
          (outliner-property/delete-property-value! conn block-eid property-id property-value')))

      :batch-delete-property-value
      (let [[block-eids property-id property-value] args
            block-eids' (replay-entity-id-coll @conn block-eids)
            property (d/entity @conn property-id)]
        (when-not (and property (all-present? block-eids'))
          (reject! :missing-block-or-property))
        (let [property-value' (resolve-value! property-id property-value)]
          (outliner-property/batch-delete-property-value! conn block-eids' property-id property-value')))

      ;; default: pass-through property ops, otherwise a raw transact
      (if-let [replay-fn (get passthrough op)]
        (apply replay-fn conn args)
        (when-let [tx-data (seq (:tx args))]
          (ldb/transact! conn tx-data {:outliner-op :transact}))))))
(defn replace-uuid-str-with-eid
  "When `v` is a string that `parse-uuid` accepts and a block with that UUID
   exists in `db`, returns the block's `:db/id`. Otherwise returns `v`
   unchanged."
  [db v]
  (let [u (when (string? v)
            (parse-uuid v))
        hit (when u
              (d/entity db [:block/uuid u]))]
    (if hit
      (:db/id hit)
      v)))
(defn resolve-temp-id
  "For a five-element `[:db/add e a v t]` vector, replaces UUID strings in the
   entity and value positions with the matching entity ids from `db` (see
   `replace-uuid-str-with-eid`). Any other datom shape is returned unchanged."
  [db datom-v]
  (let [five-tuple-add? (and (vector? datom-v)
                             (= (count datom-v) 5)
                             (= (first datom-v) :db/add))]
    (if-not five-tuple-add?
      datom-v
      (let [[op e a v t] datom-v]
        [op
         (replace-uuid-str-with-eid db e)
         a
         (replace-uuid-str-with-eid db v)
         t]))))
(defn reverse-local-tx!
  "Undoes `local-tx` on `conn`, choosing a strategy in this order:

   1. `:create-page` txs (by `:outliner-op`, or a single `:create-page` forward
      op) are left in place; `{:tx-id .. :status :preserved}` is returned.
   2. Inferred history with raw reversed tx-data: transact the raw data; on
      failure fall back to replaying the inverse ops when there are any.
   3. Inverse ops available: replay them and verify the restored blocks; on
      failure fall back to the raw reversed tx-data when present.
   4. Only raw reversed tx-data: transact it.
   5. Nothing usable: reject via `invalid-rebase-op!` (throws)."
  [conn local-tx]
  (let [forward-ops (:forward-outliner-ops local-tx)
        preserve-during-rebase? (or (= :create-page (:outliner-op local-tx))
                                    (and (= 1 (count forward-ops))
                                         (= :create-page (ffirst forward-ops))))
        inferred-history? (true? (:inferred-outliner-ops? local-tx))
        inverse-ops (usable-history-ops (:inverse-outliner-ops local-tx))
        raw-tx-data (seq (:reversed-tx local-tx))
        transact-raw! (fn []
                        (ldb/transact! conn raw-tx-data
                                       {:outliner-op (:outliner-op local-tx)
                                        :reverse? true}))
        replay-inverse! (fn []
                          (doseq [op inverse-ops]
                            (replay-canonical-outliner-op! conn op nil))
                          (assert-delete-block-roots-restored! conn local-tx)
                          (assert-raw-restored-block-uuids! conn local-tx raw-tx-data))]
    (cond
      preserve-during-rebase?
      {:tx-id (:tx-id local-tx)
       :status :preserved}

      (and inferred-history? raw-tx-data)
      (try
        (transact-raw!)
        (catch :default error
          (if (seq inverse-ops)
            (replay-inverse!)
            (throw error))))

      (seq inverse-ops)
      (try
        (replay-inverse!)
        (catch :default error
          (if raw-tx-data
            (transact-raw!)
            (throw error))))

      raw-tx-data
      (transact-raw!)

      :else
      (invalid-rebase-op! :reverse-history-action
                          {:reason :missing-reversed-tx-data
                           :tx-id (:tx-id local-tx)
                           :outliner-op (:outliner-op local-tx)}))))
(defn rebase-history-ops
  "Returns the forward and inverse outliner ops of `local-tx` as seqs (nil when
   empty) together with `:fallback? false`. The mode argument is accepted but
   not consulted."
  [_mode local-tx]
  {:forward-ops (seq (:forward-outliner-ops local-tx))
   :inverse-ops (seq (:inverse-outliner-ops local-tx))
   :fallback? false})
(defn transact-remote-txs!
  "Applies `remote-rows` to `conn` in order and returns nil.

   Each row's tx-data first has UUID strings resolved with `resolve-temp-id`.
   A row is skipped when all of its datoms target one lookup-ref entity that
   does not exist in the db. A transact failure is tolerated only when it is
   an `:entity-id/missing` error for a lookup ref that every datom of the row
   targets; any other failure is rethrown as \"remote transact failed\" with
   the row's index, `:t` and `:outliner-op`."
  [conn remote-rows]
  (let [only-touches? (fn [entity-id tx-data]
                        (every? (fn [datom]
                                  (= entity-id (second datom)))
                                tx-data))]
    (doseq [[idx row] (map-indexed vector remote-rows)
            ;; stop at the first missing row
            :while row]
      (let [db @conn
            tx-data (seq (map (partial resolve-temp-id db) (:tx-data row)))
            first-entity-id (some-> tx-data first second)
            skip? (and (vector? first-entity-id)
                       (only-touches? first-entity-id tx-data)
                       (nil? (d/entity db first-entity-id)))]
        (when-not skip?
          (try
            (when tx-data
              (ldb/transact! conn tx-data {:transact-remote? true}))
            (catch :default e
              (let [{missing-entity-id :entity-id
                     error-kind :error} (or (ex-data e) {})
                    tolerated? (and (= :entity-id/missing error-kind)
                                    (vector? missing-entity-id)
                                    (seq tx-data)
                                    (only-touches? missing-entity-id tx-data))]
                (when-not tolerated?
                  (throw (ex-info "remote transact failed"
                                  {:stage :remote
                                   :index idx
                                   :t (:t row)
                                   :outliner-op (:outliner-op row)}
                                  e)))))))))))
(defn seed-local-txs!
  "Applies every op in `local-ops` to `conn`: its forward outliner ops are
   replayed when present, otherwise its raw `:tx` data is transacted, otherwise
   nothing happens. Any failure is rethrown as \"seed local tx failed\" with
   the tx id, outliner op and which source (`:semantic`, `:raw`, `:none`) was
   being used."
  [conn local-ops]
  (doseq [local local-ops]
    (let [forward-ops (usable-history-ops (:forward-outliner-ops local))
          raw-tx (seq (:tx local))
          seed-source (cond
                        (seq forward-ops) :semantic
                        raw-tx :raw
                        :else :none)]
      (try
        (case seed-source
          :semantic (doseq [op forward-ops]
                      (replay-canonical-outliner-op! conn op nil))
          :raw (ldb/transact! conn raw-tx {:seed-local? true
                                           :outliner-op (:outliner-op local)})
          :none nil)
        (catch :default e
          (throw (ex-info "seed local tx failed"
                          {:stage :seed-local
                           :tx-id (:tx-id local)
                           :outliner-op (:outliner-op local)
                           :seed-source seed-source}
                          e)))))))
(defn reapply-local-tx!
  "Re-applies `local-tx` on top of the remote changes and returns a result map
   with `:tx-id`, `:status`, `:fallback?` and op counts.

   Status is `:preserved` when the tx id is in `preserved-tx-ids` (nothing is
   replayed), `:skipped` when there are no forward ops, `:rebased` when every
   forward op replayed, and `:failed` (with `:error` / `:error-data`) when a
   replay threw. Failures are reported, not rethrown."
  [mode conn local-tx rebase-db-before preserved-tx-ids]
  (let [tx-id (:tx-id local-tx)
        report (fn [status extra]
                 (merge {:tx-id tx-id :status status} extra))]
    (if (contains? preserved-tx-ids tx-id)
      (report :preserved {:fallback? false
                          :forward-op-count 0
                          :inverse-op-count 0})
      (let [{:keys [forward-ops inverse-ops fallback?]} (rebase-history-ops mode local-tx)]
        (if-not (seq forward-ops)
          (report :skipped {:fallback? false
                            :forward-op-count 0
                            :inverse-op-count (count inverse-ops)})
          (try
            (doseq [op forward-ops]
              (replay-canonical-outliner-op! conn op rebase-db-before))
            (report :rebased {:fallback? fallback?
                              :inverse-op-count (count inverse-ops)
                              :forward-op-count (count forward-ops)})
            (catch :default error
              (report :failed {:fallback? fallback?
                               :error (ex-message error)
                               :error-data (select-keys (or (ex-data error) {})
                                                        [:op :reason :error :entity-id])}))))))))
(defn replay-mode!
  "Runs one full replay for `mode` on a fresh in-memory connection and returns
   a summary map.

   Stages, in order: transact the `baseline` rows, seed the `local` ops,
   reverse the local ops (last to first), transact the `remote` rows, then
   reapply the local ops. On success the map has `:ok? true`, the input
   counts, per-tx `:reapply-results`, the number of failed reapplies and the
   final datom count. If any stage throws, the map has `:ok? false` plus the
   stage, the item being processed and the error details."
  [{:keys [mode baseline remote local]}]
  (let [conn (d/create-conn db-schema/schema)
        *stage (atom :baseline)
        *current (atom nil)
        enter-stage! (fn [stage] (reset! *stage stage))
        track! (fn [current] (reset! *current current))]
    (try
      ;; Intermediate states during replay may be transiently invalid, so
      ;; strict validation is switched off to let the whole flow be observed.
      (swap! conn assoc :skip-validate-db? true)
      (doseq [row baseline
              :let [tx-data (seq (:tx-data row))]
              :when tx-data]
        (track! {:t (:t row)})
        (ldb/transact! conn tx-data {:server-baseline? true :t (:t row)}))
      (enter-stage! :seed-local)
      (seed-local-txs! conn local)
      (let [rebase-db-before @conn
            _ (enter-stage! :reverse-local)
            reverse-results (mapv (fn [local-tx]
                                    (track! {:tx-id (:tx-id local-tx)})
                                    (reverse-local-tx! conn local-tx))
                                  (reverse local))
            preserved-tx-ids (into #{}
                                   (comp (filter #(= :preserved (:status %)))
                                         (keep :tx-id))
                                   reverse-results)
            _ (enter-stage! :remote)
            _ (transact-remote-txs! conn remote)
            _ (enter-stage! :reapply-local)
            reapply-results (mapv (fn [local-tx]
                                    (track! {:tx-id (:tx-id local-tx)})
                                    (reapply-local-tx! mode conn local-tx rebase-db-before preserved-tx-ids))
                                  local)]
        {:mode (name mode)
         :ok? true
         :baseline-count (count baseline)
         :remote-count (count remote)
         :local-count (count local)
         :reapply-results reapply-results
         :reapply-failed-count (count (filter #(= :failed (:status %)) reapply-results))
         :final-datom-count (count (d/datoms @conn :eavt))})
      (catch :default e
        {:mode (name mode)
         :ok? false
         :stage @*stage
         :current @*current
         :error (ex-message e)
         :error-data (select-keys (or (ex-data e) {})
                                  [:stage :t :index :tx-id :op :reason :error :entity-id])}))))
(defn mode-seq
  "Maps the `--mode` option (case-insensitive) to the modes to run:
   \"legacy\" -> [:legacy], \"fixed\" -> [:fixed]; \"both\", nil and any other
   value -> [:legacy :fixed]."
  [mode-option]
  (let [single-modes {"legacy" [:legacy]
                      "fixed" [:fixed]}
        requested (some-> mode-option string/lower-case)]
    (get single-modes requested [:legacy :fixed])))
(defn ensure-exists!
  "Exits the process with status 1, after printing
   \"<label> does not exist: <path>\" to stderr, when `path` does not exist.
   Returns nil when it does."
  [label path]
  (let [found? (.existsSync fs path)]
    (when-not found?
      (binding [*print-fn* *print-err-fn*]
        (println (str label " does not exist: " path)))
      (js/process.exit 1))))
(defn -main
  "CLI entry point.

   Parses `argv` with `cli-spec`, prints usage for `--help`, and exits with
   status 1 when a required db path is missing or does not exist. It then
   loads the server tx log and the (filtered) local client ops and infers
   conflicts. With `--inspect-only` it prints the conflict report as JSON.
   Otherwise it needs an effective from-t (`--from-t`, or the first conflict
   when `--auto-from-conflict` is set), replays every requested mode and
   prints the inputs, counts, conflicts and per-mode results as JSON
   (indented when `--pretty` is set)."
  [argv]
  (let [opts (:opts (cli/parse-args argv {:spec cli-spec}))
        {:keys [server-db client-ops-db from-t to-t remote-limit
                auto-from-conflict inspect-only pretty]} opts
        warn! (fn [message]
                (binding [*print-fn* *print-err-fn*]
                  (println message)))
        emit-json! (fn [payload]
                     (let [js-payload (clj->js payload)]
                       (println (if pretty
                                  (js/JSON.stringify js-payload nil 2)
                                  (js/JSON.stringify js-payload)))))
        stringify-hits (fn [conflict]
                         (update conflict :hits #(mapv str %)))]
    (when (:help opts)
      (println (usage))
      (js/process.exit 0))
    (when (or (string/blank? server-db) (string/blank? client-ops-db))
      (warn! "Missing required --server-db and/or --client-ops-db")
      (println (usage))
      (js/process.exit 1))
    (let [server-db' (resolve-path server-db)
          client-ops-db' (resolve-path client-ops-db)]
      (ensure-exists! "server-db" server-db')
      (ensure-exists! "client-ops-db" client-ops-db')
      (let [server-rows (read-server-tx-log server-db')
            all-local-ops (read-client-ops client-ops-db')
            local-ops (filter-local-ops all-local-ops opts)
            conflicts (infer-conflicts server-rows local-ops)
            inferred-from-t (when auto-from-conflict
                              (:first-conflict-t conflicts))
            effective-from-t (or from-t inferred-from-t)
            deleted-uuid-strs (mapv str (:deleted-uuids conflicts))]
        (when (and (not inspect-only) (nil? effective-from-t))
          (warn! "Missing --from-t and failed to infer via --auto-from-conflict")
          (js/process.exit 1))
        (if inspect-only
          (emit-json! {:server-db server-db'
                       :client-ops-db client-ops-db'
                       :server-tx-count (count server-rows)
                       :local-op-count (count local-ops)
                       :deleted-uuids deleted-uuid-strs
                       :first-conflict-t (:first-conflict-t conflicts)
                       :conflicts (mapv stringify-hits
                                        (or (:conflicts conflicts) []))})
          (let [remote (select-remote-rows server-rows effective-from-t to-t remote-limit)
                baseline (baseline-rows server-rows effective-from-t)
                results (mapv (fn [mode]
                                (replay-mode! {:mode mode
                                               :baseline baseline
                                               :remote remote
                                               :local local-ops}))
                              (mode-seq (:mode opts)))]
            (emit-json! {:server-db server-db'
                         :client-ops-db client-ops-db'
                         :input {:from-t from-t
                                 :effective-from-t effective-from-t
                                 :to-t to-t
                                 :remote-limit remote-limit
                                 :mode (:mode opts)
                                 :pending-only (boolean (:pending-only opts))
                                 :local-created-at-min (:local-created-at-min opts)
                                 :local-created-at-max (:local-created-at-max opts)
                                 :local-tx-id (:local-tx-id opts)
                                 :auto-from-conflict (boolean auto-from-conflict)}
                         :counts {:server-tx-total (count server-rows)
                                  :baseline (count baseline)
                                  :remote (count remote)
                                  :local-selected (count local-ops)
                                  :local-total (count all-local-ops)}
                         :conflicts {:deleted-uuids deleted-uuid-strs
                                     :first-conflict-t (:first-conflict-t conflicts)
                                     :conflict-count (count (:conflicts conflicts))
                                     :sample (->> (:conflicts conflicts)
                                                  (take 10)
                                                  (mapv stringify-hits))}
                         :results results})))))))
;; Script entry guard: call `-main` with the command-line arguments only when
;; the file currently being evaluated (`nbb/*file*`) is the file nbb reports
;; as invoked, so loading this namespace from elsewhere does not start a replay.
(when (= nbb/*file* (nbb/invoked-file))
(-main *command-line-args*))