mirror of
https://github.com/logseq/logseq.git
synced 2026-04-24 22:25:01 +00:00
enhance(db-sync): fail fast when encounter bugs
This commit is contained in:
44
deps/db-sync/src/logseq/db_sync/worker.cljs
vendored
44
deps/db-sync/src/logseq/db_sync/worker.cljs
vendored
@@ -94,6 +94,10 @@
|
||||
(when-not (= coerced invalid-coerce)
|
||||
coerced))))
|
||||
|
||||
(defn- fail-fast [tag data]
|
||||
(log/error tag data)
|
||||
(throw (ex-info (name tag) data)))
|
||||
|
||||
(defn- coerce-http-request [schema-key body]
|
||||
(if-let [coercer (get db-sync-schema/http-request-coercers schema-key)]
|
||||
(let [coerced (coerce coercer body {:schema schema-key :dir :request})]
|
||||
@@ -345,28 +349,30 @@
|
||||
|
||||
(defn- apply-tx! [^js self sender tx-data]
|
||||
(let [sql (.-sql self)
|
||||
conn (.-conn self)
|
||||
db @conn
|
||||
conn (.-conn self)]
|
||||
(when-not conn
|
||||
(fail-fast :db-sync/missing-db {:op :apply-tx}))
|
||||
(let [db @conn
|
||||
tx-report (d/with db tx-data)
|
||||
db' (:db-after tx-report)
|
||||
order-fixed (fix-tx-data db' tx-data)
|
||||
cycle-info (cycle/detect-cycle db' order-fixed)]
|
||||
(if cycle-info
|
||||
(do
|
||||
(log/info :db-sync/cycle-detected
|
||||
{:attr (:attr cycle-info)
|
||||
:entity (:entity cycle-info)
|
||||
:entity-title (entity-title db (:entity cycle-info))
|
||||
:tx-count (count order-fixed)})
|
||||
(cycle-reject-response db order-fixed cycle-info))
|
||||
(let [{:keys [tx-data db-before db-after]} (ldb/transact! conn order-fixed)
|
||||
normalized-data (db-normalize/normalize-tx-data db-after db-before tx-data)
|
||||
new-t (storage/next-t! sql)
|
||||
created-at (common/now-ms)
|
||||
tx-str (common/write-transit normalized-data)]
|
||||
(storage/append-tx! sql new-t tx-str created-at)
|
||||
(broadcast! self sender {:type "changed" :t new-t})
|
||||
new-t))))
|
||||
(if cycle-info
|
||||
(do
|
||||
(log/info :db-sync/cycle-detected
|
||||
{:attr (:attr cycle-info)
|
||||
:entity (:entity cycle-info)
|
||||
:entity-title (entity-title db (:entity cycle-info))
|
||||
:tx-count (count order-fixed)})
|
||||
(cycle-reject-response db order-fixed cycle-info))
|
||||
(let [{:keys [tx-data db-before db-after]} (ldb/transact! conn order-fixed)
|
||||
normalized-data (db-normalize/normalize-tx-data db-after db-before tx-data)
|
||||
new-t (storage/next-t! sql)
|
||||
created-at (common/now-ms)
|
||||
tx-str (common/write-transit normalized-data)]
|
||||
(storage/append-tx! sql new-t tx-str created-at)
|
||||
(broadcast! self sender {:type "changed" :t new-t})
|
||||
new-t)))))
|
||||
|
||||
(defn- handle-tx-batch! [^js self sender txs t-before]
|
||||
(let [current-t (t-now self)]
|
||||
@@ -394,7 +400,7 @@
|
||||
(defn- handle-ws-message! [^js self ^js ws raw]
|
||||
(let [message (-> raw protocol/parse-message coerce-ws-client-message)]
|
||||
(if-not (map? message)
|
||||
(send! ws {:type "error" :message "invalid message"})
|
||||
(fail-fast :db-sync/request-parse-failed {:raw raw})
|
||||
(case (:type message)
|
||||
"hello"
|
||||
(send! ws {:type "hello" :t (t-now self)})
|
||||
|
||||
@@ -21,6 +21,20 @@ This guide helps AI agents implement and review db-sync features consistently ac
|
||||
- Use existing `:db-sync/*` keywords; add new keywords via `logseq.common.defkeywords/defkeyword`.
|
||||
- When adding persisted fields, ensure any migration or index logic is updated on both client and worker.
|
||||
|
||||
## Fail-Fast Error Policy
|
||||
- db-sync core code must fail fast on bugs: log an error (`log/error`) and throw immediately.
|
||||
- Treat these as bugs:
|
||||
- db connection missing when required (client or worker).
|
||||
- WS/HTTP response fields missing (e.g., `:t`, `:txs`, `:reason`, `:data`).
|
||||
- Response parse/coercion failures (e.g., transit decode, malli coercion).
|
||||
- Unexpected WS/HTTP message type or reason value.
|
||||
- tx payload type mismatch (e.g., `:txs` not a sequence of strings).
|
||||
- Invalid graph identity (missing/empty graph id or uuid in sync path).
|
||||
- Invalid or negative `t`/`t_before` values.
|
||||
- Asset operations missing required fields (checksum/type/uuid).
|
||||
- Invariant violations in tx apply (e.g., tx-data empty after normalization).
|
||||
- Do not silently recover or drop messages for these cases; surface them via exceptions.
|
||||
|
||||
## HTTP API (Bootstrap + Assets)
|
||||
- HTTP endpoints backfill initial graph data, snapshots, and assets.
|
||||
- See `docs/agent-guide/db-sync/protocol.md` for request/response shapes.
|
||||
|
||||
@@ -112,6 +112,24 @@
|
||||
(when-not (= coerced invalid-coerce)
|
||||
coerced))))
|
||||
|
||||
(defn- fail-fast [tag data]
|
||||
(log/error tag data)
|
||||
(throw (ex-info (name tag) data)))
|
||||
|
||||
(defn- require-number [value context]
|
||||
(when-not (number? value)
|
||||
(fail-fast :db-sync/invalid-field (assoc context :value value))))
|
||||
|
||||
(defn- require-seq [value context]
|
||||
(when-not (sequential? value)
|
||||
(fail-fast :db-sync/invalid-field (assoc context :value value))))
|
||||
|
||||
(defn- parse-transit [value context]
|
||||
(try
|
||||
(sqlite-util/read-transit-str value)
|
||||
(catch :default e
|
||||
(fail-fast :db-sync/response-parse-failed (assoc context :error e)))))
|
||||
|
||||
(defn- coerce-http-request [schema-key body]
|
||||
(if-let [coercer (get db-sync-schema/http-request-coercers schema-key)]
|
||||
(let [coerced (coerce coercer body {:schema schema-key :dir :request})]
|
||||
@@ -232,7 +250,7 @@
|
||||
(outliner-core/move-blocks! conn parents recycle-id {:sibling? false})))))
|
||||
|
||||
(defn- apply-remote-tx! [repo client tx-data]
|
||||
(when-let [conn (worker-state/get-datascript-conn repo)]
|
||||
(if-let [conn (worker-state/get-datascript-conn repo)]
|
||||
(try
|
||||
(let [tx-report (ldb/transact! conn tx-data {:rtc-tx? true})
|
||||
db-after (:db-after tx-report)
|
||||
@@ -241,10 +259,11 @@
|
||||
(when (seq asset-uuids)
|
||||
(enqueue-asset-downloads! repo client asset-uuids)))
|
||||
(catch :default e
|
||||
(log/error :db-sync/apply-remote-tx-failed {:error e})))))
|
||||
(log/error :db-sync/apply-remote-tx-failed {:error e})))
|
||||
(fail-fast :db-sync/missing-db {:repo repo :op :apply-remote-tx})))
|
||||
|
||||
(defn- reconcile-cycle! [repo attr server_values]
|
||||
(when-let [conn (worker-state/get-datascript-conn repo)]
|
||||
(if-let [conn (worker-state/get-datascript-conn repo)]
|
||||
(let [db @conn
|
||||
tx-data (reduce
|
||||
(fn [acc [eid value]]
|
||||
@@ -274,7 +293,8 @@
|
||||
(:block/name ent))})))
|
||||
(take 10))})
|
||||
(when (seq tx-data)
|
||||
(ldb/transact! conn tx-data {:rtc-tx? true})))))
|
||||
(ldb/transact! conn tx-data {:rtc-tx? true})))
|
||||
(fail-fast :db-sync/missing-db {:repo repo :op :reconcile-cycle})))
|
||||
|
||||
(defn- normalize-entity-ref
|
||||
[db entity]
|
||||
@@ -324,14 +344,15 @@
|
||||
(declare enqueue-asset-initial-download!)
|
||||
(defn- pending-txs-by-ids
|
||||
[repo tx-ids]
|
||||
(when-let [conn (client-ops-conn repo)]
|
||||
(if-let [conn (client-ops-conn repo)]
|
||||
(let [db @conn]
|
||||
(keep (fn [tx-id]
|
||||
(when-let [ent (d/entity db [:db-sync/tx-id tx-id])]
|
||||
(when-let [tx (:db-sync/tx ent)]
|
||||
{:tx-id tx-id
|
||||
:tx tx})))
|
||||
tx-ids))))
|
||||
tx-ids))
|
||||
(fail-fast :db-sync/missing-db {:repo repo :op :pending-txs-by-ids})))
|
||||
|
||||
(defn- requeue-non-parent-txs!
|
||||
[repo attr server_values entries]
|
||||
@@ -341,17 +362,17 @@
|
||||
requeued (volatile! 0)
|
||||
stripped (volatile! 0)]
|
||||
(if-not scoped?
|
||||
(throw (ex-info "db-sync requeue requires scoped cycle info"
|
||||
{:repo repo
|
||||
:has-db? (boolean db)
|
||||
:attr attr
|
||||
:server-values (count server_values)
|
||||
:entries (count entries)}))
|
||||
(fail-fast :db-sync/missing-field
|
||||
{:repo repo
|
||||
:has-db? (boolean db)
|
||||
:attr attr
|
||||
:server-values (count server_values)
|
||||
:entries (count entries)})
|
||||
(do
|
||||
(doseq [{:keys [tx]} entries]
|
||||
(when (string? tx)
|
||||
(vswap! stripped inc)
|
||||
(let [tx-data (sqlite-util/read-transit-str tx)
|
||||
(let [tx-data (parse-transit tx {:repo repo :op :requeue-non-parent-txs})
|
||||
filtered (strip-cycle-attrs db tx-data {:attr attr :entity-refs entity-refs})]
|
||||
(when (seq filtered)
|
||||
(vswap! requeued inc)
|
||||
@@ -364,7 +385,7 @@
|
||||
|
||||
(defn- cycle-entity-titles
|
||||
[repo server_values]
|
||||
(when-let [conn (worker-state/get-datascript-conn repo)]
|
||||
(if-let [conn (worker-state/get-datascript-conn repo)]
|
||||
(let [db @conn]
|
||||
(->> (keys server_values)
|
||||
(keep (fn [ref]
|
||||
@@ -372,14 +393,18 @@
|
||||
{:uuid (some-> (:block/uuid ent) str)
|
||||
:title (or (:block/title ent)
|
||||
(:block/name ent))})))
|
||||
(take 10)))))
|
||||
(take 10)))
|
||||
(fail-fast :db-sync/missing-db {:repo repo :op :cycle-entity-titles})))
|
||||
|
||||
(defn- handle-message! [repo client raw]
|
||||
(when-let [message (-> raw parse-message coerce-ws-server-message)]
|
||||
(let [message (-> raw parse-message coerce-ws-server-message)]
|
||||
(when-not (map? message)
|
||||
(fail-fast :db-sync/response-parse-failed {:repo repo :raw raw}))
|
||||
(let [local-tx (or (client-op/get-local-tx repo) 0)
|
||||
remote-tx (:t message)]
|
||||
(case (:type message)
|
||||
"hello" (do
|
||||
(require-number remote-tx {:repo repo :type "hello"})
|
||||
(when (> remote-tx local-tx)
|
||||
(send! (:ws client) {:type "pull" :since local-tx}))
|
||||
(enqueue-asset-sync! repo client)
|
||||
@@ -387,42 +412,67 @@
|
||||
(flush-pending! repo client))
|
||||
;; Upload response
|
||||
"tx/batch/ok" (do
|
||||
(require-number remote-tx {:repo repo :type "tx/batch/ok"})
|
||||
(client-op/update-local-tx repo remote-tx)
|
||||
(remove-pending-txs! repo @(:inflight client))
|
||||
(reset! (:inflight client) [])
|
||||
(flush-pending! repo client))
|
||||
;; Download response
|
||||
;; Merge batch txs to one tx, does it really work? We'll see
|
||||
"pull/ok" (let [tx (mapcat (fn [data] (sqlite-util/read-transit-str (:tx data))) (:txs message))]
|
||||
"pull/ok" (let [txs (:txs message)
|
||||
_ (require-number remote-tx {:repo repo :type "pull/ok"})
|
||||
_ (require-seq txs {:repo repo :type "pull/ok" :field :txs})
|
||||
tx (mapcat (fn [data]
|
||||
(parse-transit (:tx data) {:repo repo :type "pull/ok"}))
|
||||
txs)]
|
||||
(when tx
|
||||
(apply-remote-tx! repo client tx)
|
||||
(client-op/update-local-tx repo remote-tx)
|
||||
(flush-pending! repo client)))
|
||||
"changed" (when (and (number? remote-tx) (< local-tx remote-tx))
|
||||
(send! (:ws client) {:type "pull" :since local-tx}))
|
||||
"tx/reject" (case (:reason message)
|
||||
"stale"
|
||||
(send! (:ws client) {:type "pull" :since local-tx})
|
||||
"cycle"
|
||||
(let [{:keys [attr server_values]} (sqlite-util/read-transit-str (:data message))]
|
||||
;; FIXME: fix cycle shouldn't re-trigger uploading
|
||||
(let [inflight-ids @(:inflight client)
|
||||
inflight-entries (pending-txs-by-ids repo inflight-ids)]
|
||||
(log/info :db-sync/tx-reject-cycle
|
||||
{:repo repo
|
||||
:attr attr
|
||||
:server-values (count server_values)
|
||||
:entity-titles (cycle-entity-titles repo server_values)
|
||||
:inflight-ids (count inflight-ids)
|
||||
:local-tx local-tx
|
||||
:remote-tx remote-tx})
|
||||
(reconcile-cycle! repo attr server_values)
|
||||
(remove-pending-txs! repo inflight-ids)
|
||||
(reset! (:inflight client) [])
|
||||
(requeue-non-parent-txs! repo attr server_values inflight-entries))
|
||||
(flush-pending! repo client))
|
||||
nil)
|
||||
nil))))
|
||||
"changed" (do
|
||||
(require-number remote-tx {:repo repo :type "changed"})
|
||||
(when (< local-tx remote-tx)
|
||||
(send! (:ws client) {:type "pull" :since local-tx})))
|
||||
"tx/reject" (let [reason (:reason message)]
|
||||
(when (nil? reason)
|
||||
(fail-fast :db-sync/missing-field
|
||||
{:repo repo :type "tx/reject" :field :reason}))
|
||||
(case reason
|
||||
"stale"
|
||||
(send! (:ws client) {:type "pull" :since local-tx})
|
||||
"cycle"
|
||||
(do
|
||||
(when (nil? (:data message))
|
||||
(fail-fast :db-sync/missing-field
|
||||
{:repo repo :type "tx/reject" :field :data}))
|
||||
(let [{:keys [attr server_values]}
|
||||
(parse-transit (:data message) {:repo repo :type "tx/reject"})]
|
||||
(when (nil? attr)
|
||||
(fail-fast :db-sync/missing-field
|
||||
{:repo repo :type "tx/reject" :field :attr}))
|
||||
(when (nil? server_values)
|
||||
(fail-fast :db-sync/missing-field
|
||||
{:repo repo :type "tx/reject" :field :server_values}))
|
||||
;; FIXME: fix cycle shouldn't re-trigger uploading
|
||||
(let [inflight-ids @(:inflight client)
|
||||
inflight-entries (pending-txs-by-ids repo inflight-ids)]
|
||||
(log/info :db-sync/tx-reject-cycle
|
||||
{:repo repo
|
||||
:attr attr
|
||||
:server-values (count server_values)
|
||||
:entity-titles (cycle-entity-titles repo server_values)
|
||||
:inflight-ids (count inflight-ids)
|
||||
:local-tx local-tx
|
||||
:remote-tx remote-tx})
|
||||
(reconcile-cycle! repo attr server_values)
|
||||
(remove-pending-txs! repo inflight-ids)
|
||||
(reset! (:inflight client) [])
|
||||
(requeue-non-parent-txs! repo attr server_values inflight-entries))
|
||||
(flush-pending! repo client)))
|
||||
(fail-fast :db-sync/invalid-field
|
||||
{:repo repo :type "tx/reject" :reason reason})))
|
||||
(fail-fast :db-sync/invalid-field
|
||||
{:repo repo :type (:type message)})))))
|
||||
|
||||
(defn- ensure-client-state! [repo]
|
||||
(or (get @worker-state/*db-sync-clients repo)
|
||||
|
||||
Reference in New Issue
Block a user