fix(sync): handle snapshot reset and tx epoch rollback

Fix snapshot-stream reset coercion so that the first non-empty chunk actually applies the reset during full uploads.

Broadcast changed(t-now) after a finished snapshot upload so that connected peers learn the new snapshot epoch immediately.

Handle remote tx rollback on clients (hello/changed with remote-tx < local-tx) by rebasing local tx/checksum cursor and requesting pull from the new baseline.

Harden ws broadcast in non-WS test contexts and add regression tests for reset-on-first-chunk and finished-upload changed broadcast.

Verified with: cli-e2e sync-multi-batch-operations (passes).
This commit is contained in:
Tienson Qin
2026-04-20 17:45:57 +08:00
parent 85c5a412a4
commit b2cc4ea829
4 changed files with 123 additions and 26 deletions

View File

@@ -274,9 +274,9 @@
(let [value (.-value chunk)
{:keys [rows buffer]} (snapshot/parse-framed-chunk buffer value)
rows-count (count rows)
reset? (and @reset-pending? (seq rows))]
reset? (boolean (and @reset-pending? (seq rows)))]
(when (seq rows)
(import-snapshot! self rows (true? reset?))
(import-snapshot! self rows reset?)
(vreset! reset-pending? false))
(vswap! total-count + rows-count)
(p/recur buffer)))))
@@ -569,7 +569,12 @@
(when (seq checksum-param)
(storage/set-checksum! (.-sql self) checksum-param)))
_ (when finished?
(<set-graph-ready-for-use! self graph-id true))]
(<set-graph-ready-for-use! self graph-id true))
_ (when finished?
;; Snapshot replacement resets tx history (`t` may drop to 0).
;; Broadcast current `t` so connected clients can recover.
(ws/broadcast! self nil {:type "changed"
:t (t-now self)}))]
(http/json-response :sync/snapshot-upload {:ok true
:count count})))))))

View File

@@ -28,7 +28,9 @@
(.send ws (protocol/encode-message {:type "error" :message "server error"}))))))
;; Sends `msg` to every open websocket attached to `self`'s state, skipping
;; `sender` (pass nil as `sender` to reach every open client).
;; NOTE(review): this span is a diff rendering that shows two bodies back to
;; back; the unguarded body comes first and the guarded body second.
;; Presumably the first is the pre-change version and the second the
;; post-change version -- confirm against the actual file.
(defn broadcast! [^js self sender msg]
(let [clients (.getWebSockets (.-state self))]
(doseq [ws clients]
(when (and (not= ws sender) (ws-open? ws))
(send! ws msg)))))
;; Guarded body: does nothing when `self` or its `state` is nil, or when
;; `state` has no `getWebSockets` function.
(when-let [state (some-> self .-state)]
(when (fn? (.-getWebSockets state))
(let [clients (.getWebSockets state)]
(doseq [ws clients]
(when (and (not= ws sender) (ws-open? ws))
(send! ws msg)))))))

View File

@@ -664,6 +664,65 @@
(is false (str error))
(done)))))))
;; Regression test: a snapshot upload request with `finished=true` must call
;; `ws/broadcast!` exactly once, with a "changed" payload whose `:t` equals
;; `(storage/get-t sql)`.
(deftest finished-snapshot-upload-broadcasts-changed-test
  (async done
    (let [sql (test-sql/make-sql)
          conn (d/create-conn db-schema/schema)
          broadcasts (atom [])
          self #js {:sql sql
                    :conn conn
                    :schema-ready true
                    :env #js {"DB" nil}}
          request (js/Request. "http://localhost/sync/graph-1/snapshot/upload?graph-id=graph-1&finished=true"
                               #js {:method "POST"
                                    :body (js/Uint8Array. 0)})
          ;; Stubs swapped in for the handler's collaborators.
          stub-import-stream (fn [_self _stream _reset?]
                               (p/resolved 0))
          stub-graph-ready (fn [_self _graph-id _graph-ready-for-use?]
                             (p/resolved true))
          ;; Records every payload handed to the broadcast function.
          record-broadcast (fn [_self _sender payload]
                             (swap! broadcasts conj payload))
          on-success (fn []
                       (done))
          on-failure (fn [error]
                       (is false (str error))
                       (done))]
      (-> (p/with-redefs [sync-handler/import-snapshot-stream! stub-import-stream
                          sync-handler/<set-graph-ready-for-use! stub-graph-ready
                          ws/broadcast! record-broadcast]
            (p/let [resp (sync-handler/handle {:self self
                                               :request request
                                               :url (js/URL. (.-url request))
                                               :route {:handler :sync/snapshot-upload}})]
              (is (= 200 (.-status resp)))
              (is (= [{:type "changed"
                       :t (storage/get-t sql)}]
                     @broadcasts))))
          (p/then on-success)
          (p/catch on-failure)))))
;; Regression test: when `import-snapshot-stream!` is called with reset
;; requested, the single framed chunk must reach `import-snapshot!` with
;; `reset?` equal to the boolean `true` (not merely a truthy value), and the
;; returned row count must be 1.
(deftest import-snapshot-stream-first-non-empty-chunk-applies-reset-test
  (async done
    (let [expected-rows [[42 "payload" nil]]
          framed (#'sync-handler/frame-bytes (snapshot/encode-rows expected-rows))
          ;; Emits the one framed chunk and then closes the stream.
          enqueue-then-close (fn [controller]
                               (.enqueue controller framed)
                               (.close controller))
          stream (js/ReadableStream. #js {:start enqueue-then-close})
          import-calls (atom [])
          self #js {:sql (test-sql/make-sql)
                    :conn (d/create-conn db-schema/schema)
                    :schema-ready true}
          ;; Captures the arguments of each `import-snapshot!` invocation.
          record-import (fn [_self rows* reset?]
                          (swap! import-calls conj {:rows rows*
                                                    :reset? reset?}))
          on-success (fn []
                       (done))
          on-failure (fn [error]
                       (is false (str error))
                       (done))]
      (-> (p/with-redefs [sync-handler/import-snapshot! record-import]
            (p/let [imported-count (#'sync-handler/import-snapshot-stream! self stream true)]
              (is (= 1 imported-count))
              (is (= [{:rows expected-rows
                       :reset? true}]
                     @import-calls))))
          (p/then on-success)
          (p/catch on-failure)))))
(deftest tx-batch-rejects-when-a-tx-entry-fails-test
(testing "db transact failure rejects the batch"
(let [sql (test-sql/make-sql)

View File

@@ -269,20 +269,25 @@
(defn- update-latest-remote-state!
[repo message]
(let [remote-tx (:t message)
(let [message-type (:type message)
remote-tx (:t message)
remote-checksum (:checksum message)
has-checksum? (contains? message :checksum)
latest-remote-tx (get @sync-apply/*repo->latest-remote-tx repo)
authoritative? (contains? #{"hello" "changed"} message-type)
stale-remote-tx? (and (number? remote-tx)
(number? latest-remote-tx)
(< remote-tx latest-remote-tx))]
(< remote-tx latest-remote-tx)
(not authoritative?))]
(when (number? remote-tx)
(swap! sync-apply/*repo->latest-remote-tx
update repo
(fn [prev]
(if (number? prev)
(max prev remote-tx)
remote-tx))))
(if authoritative?
(swap! sync-apply/*repo->latest-remote-tx assoc repo remote-tx)
(swap! sync-apply/*repo->latest-remote-tx
update repo
(fn [prev]
(if (number? prev)
(max prev remote-tx)
remote-tx)))))
(when (and has-checksum? (not stale-remote-tx?))
(swap! sync-apply/*repo->latest-remote-checksum assoc repo remote-checksum))
{:stale-remote-tx? stale-remote-tx?
@@ -290,6 +295,26 @@
(declare handle-pull-ok! handle-changed!)
;; Rebases the local sync cursor onto `remote-tx` and requests a pull from it.
;; The visible caller, `handle-message!`, invokes this for "hello"/"changed"
;; messages whose `remote-tx` is lower than `local-tx`.
;;
;; Arguments:
;;   repo            - repo identifier passed to the client-op helpers
;;   client          - sync client used for the pull request and state broadcast
;;   local-tx        - local tx value, used here only for reporting
;;   remote-tx       - tx value from the message; becomes the new local tx
;;   remote-checksum - checksum from the message; applied only when a string
;;   message-type    - type of the triggering message, used only for reporting
(defn- recover-from-remote-tx-reset!
[repo client local-tx remote-tx remote-checksum message-type]
;; Pending local txs are reported through `fail-fast` instead of being rebased.
;; NOTE(review): presumably `fail-fast` throws, so nothing below runs -- confirm.
(when (pending-local-tx? repo)
(fail-fast :db-sync/remote-tx-reset-with-pending-local
{:repo repo
:type message-type
:local-tx local-tx
:remote-tx remote-tx}))
(log/warn :db-sync/remote-tx-reset-detected
{:repo repo
:type message-type
:local-tx local-tx
:remote-tx remote-tx})
;; Move the local tx (and the checksum, when one was supplied) to the remote values.
(client-op/update-local-tx repo remote-tx)
(when (string? remote-checksum)
(client-op/update-local-checksum repo remote-checksum))
;; Clear the pending-pull marker before issuing a new pull from `remote-tx`.
(clear-pending-pull! client)
(request-pull! client remote-tx)
(broadcast-rtc-state! client))
(defn handle-message!
[repo client raw]
(let [message (-> raw
@@ -299,18 +324,24 @@
(fail-fast :db-sync/response-parse-failed {:repo repo :raw raw}))
(let [local-tx (or (client-op/get-local-tx repo) 0)
remote-tx (:t message)
remote-checksum (:checksum message)]
remote-checksum (:checksum message)
message-type (:type message)]
(update-latest-remote-state! repo message)
(case (:type message)
"hello" (handle-hello! repo client local-tx remote-tx remote-checksum)
"online-users" (handle-online-users! repo client message)
"presence" (handle-presence! client message)
"tx/batch/ok" (handle-tx-batch-ok! repo client remote-tx remote-checksum)
"pull/ok" (handle-pull-ok! repo client local-tx remote-tx remote-checksum message)
"changed" (handle-changed! repo client local-tx remote-tx)
"tx/reject" (handle-tx-reject! repo client message local-tx)
(fail-fast :db-sync/invalid-field
{:repo repo :type (:type message)})))))
(if (and (contains? #{"hello" "changed"} message-type)
(number? local-tx)
(number? remote-tx)
(> local-tx remote-tx))
(recover-from-remote-tx-reset! repo client local-tx remote-tx remote-checksum message-type)
(case message-type
"hello" (handle-hello! repo client local-tx remote-tx remote-checksum)
"online-users" (handle-online-users! repo client message)
"presence" (handle-presence! client message)
"tx/batch/ok" (handle-tx-batch-ok! repo client remote-tx remote-checksum)
"pull/ok" (handle-pull-ok! repo client local-tx remote-tx remote-checksum message)
"changed" (handle-changed! repo client local-tx remote-tx)
"tx/reject" (handle-tx-reject! repo client message local-tx)
(fail-fast :db-sync/invalid-field
{:repo repo :type message-type}))))))
(defn- handle-pull-ok!
[repo client local-tx remote-tx remote-checksum message]