diff --git a/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs b/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs index 7f0c9293a5..38cc57658c 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs @@ -274,9 +274,9 @@ (let [value (.-value chunk) {:keys [rows buffer]} (snapshot/parse-framed-chunk buffer value) rows-count (count rows) - reset? (and @reset-pending? (seq rows))] + reset? (boolean (and @reset-pending? (seq rows)))] (when (seq rows) - (import-snapshot! self rows (true? reset?)) + (import-snapshot! self rows reset?) (vreset! reset-pending? false)) (vswap! total-count + rows-count) (p/recur buffer))))) @@ -569,7 +569,12 @@ (when (seq checksum-param) (storage/set-checksum! (.-sql self) checksum-param))) _ (when finished? - ( self .-state)] + (when (fn? (.-getWebSockets state)) + (let [clients (.getWebSockets state)] + (doseq [ws clients] + (when (and (not= ws sender) (ws-open? ws)) + (send! ws msg))))))) diff --git a/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs b/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs index 13f95062c7..ebdbc17e84 100644 --- a/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs +++ b/deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs @@ -664,6 +664,65 @@ (is false (str error)) (done))))))) +(deftest finished-snapshot-upload-broadcasts-changed-test + (async done + (let [sql (test-sql/make-sql) + conn (d/create-conn db-schema/schema) + changed-messages (atom []) + self #js {:sql sql + :conn conn + :schema-ready true + :env #js {"DB" nil}} + request (js/Request. "http://localhost/sync/graph-1/snapshot/upload?graph-id=graph-1&finished=true" + #js {:method "POST" + :body (js/Uint8Array. 0)})] + (-> (p/with-redefs [sync-handler/import-snapshot-stream! (fn [_self _stream _reset?] + (p/resolved 0)) + sync-handler/ (p/with-redefs [sync-handler/import-snapshot! + (fn [_self rows* reset?] + (swap! applied conj {:rows rows* + :reset? reset?}))] + (p/let [count (#'sync-handler/import-snapshot-stream! self stream true)] + (is (= 1 count)) + (is (= [{:rows rows + :reset? true}] + @applied)))) + (p/then (fn [] + (done))) + (p/catch (fn [error] + (is false (str error)) + (done))))))) + (deftest tx-batch-rejects-when-a-tx-entry-fails-test (testing "db transact failure rejects the batch" (let [sql (test-sql/make-sql) diff --git a/src/main/frontend/worker/sync/handle_message.cljs b/src/main/frontend/worker/sync/handle_message.cljs index 2066349fc7..58a27ad76d 100644 --- a/src/main/frontend/worker/sync/handle_message.cljs +++ b/src/main/frontend/worker/sync/handle_message.cljs @@ -269,20 +269,25 @@ (defn- update-latest-remote-state! [repo message] - (let [remote-tx (:t message) + (let [message-type (:type message) + remote-tx (:t message) remote-checksum (:checksum message) has-checksum? (contains? message :checksum) latest-remote-tx (get @sync-apply/*repo->latest-remote-tx repo) + authoritative? (contains? #{"hello" "changed"} message-type) stale-remote-tx? (and (number? remote-tx) (number? latest-remote-tx) - (< remote-tx latest-remote-tx))] + (< remote-tx latest-remote-tx) + (not authoritative?))] (when (number? remote-tx) - (swap! sync-apply/*repo->latest-remote-tx - update repo - (fn [prev] - (if (number? prev) - (max prev remote-tx) - remote-tx)))) + (if authoritative? + (swap! sync-apply/*repo->latest-remote-tx assoc repo remote-tx) + (swap! sync-apply/*repo->latest-remote-tx + update repo + (fn [prev] + (if (number? prev) + (max prev remote-tx) + remote-tx))))) (when (and has-checksum? (not stale-remote-tx?)) (swap! sync-apply/*repo->latest-remote-checksum assoc repo remote-checksum)) {:stale-remote-tx? stale-remote-tx? @@ -290,6 +295,26 @@ (declare handle-pull-ok! handle-changed!) +(defn- recover-from-remote-tx-reset! + [repo client local-tx remote-tx remote-checksum message-type] + (when (pending-local-tx? repo) + (fail-fast :db-sync/remote-tx-reset-with-pending-local + {:repo repo + :type message-type + :local-tx local-tx + :remote-tx remote-tx})) + (log/warn :db-sync/remote-tx-reset-detected + {:repo repo + :type message-type + :local-tx local-tx + :remote-tx remote-tx}) + (client-op/update-local-tx repo remote-tx) + (when (string? remote-checksum) + (client-op/update-local-checksum repo remote-checksum)) + (clear-pending-pull! client) + (request-pull! client remote-tx) + (broadcast-rtc-state! client)) + (defn handle-message! [repo client raw] (let [message (-> raw @@ -299,18 +324,24 @@ (fail-fast :db-sync/response-parse-failed {:repo repo :raw raw})) (let [local-tx (or (client-op/get-local-tx repo) 0) remote-tx (:t message) - remote-checksum (:checksum message)] + remote-checksum (:checksum message) + message-type (:type message)] (update-latest-remote-state! repo message) - (case (:type message) - "hello" (handle-hello! repo client local-tx remote-tx remote-checksum) - "online-users" (handle-online-users! repo client message) - "presence" (handle-presence! client message) - "tx/batch/ok" (handle-tx-batch-ok! repo client remote-tx remote-checksum) - "pull/ok" (handle-pull-ok! repo client local-tx remote-tx remote-checksum message) - "changed" (handle-changed! repo client local-tx remote-tx) - "tx/reject" (handle-tx-reject! repo client message local-tx) - (fail-fast :db-sync/invalid-field - {:repo repo :type (:type message)}))))) + (if (and (contains? #{"hello" "changed"} message-type) + (number? local-tx) + (number? remote-tx) + (> local-tx remote-tx)) + (recover-from-remote-tx-reset! repo client local-tx remote-tx remote-checksum message-type) + (case message-type + "hello" (handle-hello! repo client local-tx remote-tx remote-checksum) + "online-users" (handle-online-users! repo client message) + "presence" (handle-presence! client message) + "tx/batch/ok" (handle-tx-batch-ok! repo client remote-tx remote-checksum) + "pull/ok" (handle-pull-ok! repo client local-tx remote-tx remote-checksum message) + "changed" (handle-changed! repo client local-tx remote-tx) + "tx/reject" (handle-tx-reject! repo client message local-tx) + (fail-fast :db-sync/invalid-field + {:repo repo :type message-type})))))) (defn- handle-pull-ok! [repo client local-tx remote-tx remote-checksum message]