diff --git a/deps/db-sync/src/logseq/db_sync/malli_schema.cljs b/deps/db-sync/src/logseq/db_sync/malli_schema.cljs index a6d3634a6b..ebeff799ea 100644 --- a/deps/db-sync/src/logseq/db_sync/malli_schema.cljs +++ b/deps/db-sync/src/logseq/db_sync/malli_schema.cljs @@ -12,7 +12,8 @@ ["hello" [:map [:type [:= "hello"]] - [:client :string]]] + [:client :string] + [:since :int]]] ["presence" [:map [:type [:= "presence"]] diff --git a/deps/db-sync/src/logseq/db_sync/worker/handler/ws.cljs b/deps/db-sync/src/logseq/db_sync/worker/handler/ws.cljs index 0081d961a7..05fa7e80a9 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/handler/ws.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/handler/ws.cljs @@ -11,7 +11,14 @@ (ws/send! ws {:type "error" :message "invalid request"}) (case (:type message) "hello" - (ws/send! ws {:type "hello" :t (sync-handler/t-now self)}) + (let [raw-since (:since message) + since (if (some? raw-since) (sync-handler/parse-int raw-since) 0) + current-t (sync-handler/t-now self)] + (if (or (and (some? raw-since) (not (number? since))) (neg? since)) + (ws/send! ws {:type "error" :message "invalid since"}) + (if (< since current-t) + (ws/send! ws (sync-handler/pull-response self since)) + (ws/send! ws {:type "hello" :t current-t})))) "ping" (ws/send! ws {:type "pong"}) diff --git a/deps/db-sync/test/logseq/db_sync/node_adapter_test.cljs b/deps/db-sync/test/logseq/db_sync/node_adapter_test.cljs index b4de5212cc..3dd653dec4 100644 --- a/deps/db-sync/test/logseq/db_sync/node_adapter_test.cljs +++ b/deps/db-sync/test/logseq/db_sync/node_adapter_test.cljs @@ -90,7 +90,9 @@ (.on client "message" push-message) (.on client "open" (fn [] - (.send client (protocol/encode-message {:type "hello" :client "test"})))) + (.send client (protocol/encode-message {:type "hello" + :client "test" + :since 0})))) (p/let [_ (p/delay 50) tx-data [{:block/uuid (random-uuid) :block/content "ws"}] diff --git a/deps/db-sync/test/logseq/db_sync/worker_handler_ws_test.cljs b/deps/db-sync/test/logseq/db_sync/worker_handler_ws_test.cljs index 9b442a2813..23e3035c5c 100644 --- a/deps/db-sync/test/logseq/db_sync/worker_handler_ws_test.cljs +++ b/deps/db-sync/test/logseq/db_sync/worker_handler_ws_test.cljs @@ -1,6 +1,7 @@ (ns logseq.db-sync.worker-handler-ws-test (:require [cljs.test :refer [deftest is]] [logseq.db-sync.protocol :as protocol] + [logseq.db-sync.worker.handler.sync :as sync-handler] [logseq.db-sync.worker.handler.ws :as ws-handler] [logseq.db-sync.worker.presence :as presence] [logseq.db-sync.worker.ws :as ws])) @@ -25,3 +26,42 @@ :editing-block-uuid "block-1" :user-id "user-1"}] (mapv :msg @send-events))))) + +(deftest hello-message-sends-only-pull-ok-when-client-behind-test + (let [source-ws #js {:readyState 1} + send-events (atom []) + self #js {:state #js {}} + raw (protocol/encode-message {:type "hello" + :client "test" + :since 2})] + (with-redefs [sync-handler/t-now (fn [_self] 5) + sync-handler/pull-response (fn [_self since] + {:type "pull/ok" + :t 5 + :txs [{:t 5 :tx (str "since-" since)}]}) + ws/send! (fn [_target msg] + (swap! send-events conj msg))] + (ws-handler/handle-ws-message! self source-ws raw)) + + (is (= [{:type "pull/ok" :t 5 :txs [{:t 5 :tx "since-2"}]}] + @send-events)))) + +(deftest hello-message-skips-pull-ok-when-client-is-up-to-date-test + (let [source-ws #js {:readyState 1} + send-events (atom []) + pull-calls (atom 0) + self #js {:state #js {}} + raw (protocol/encode-message {:type "hello" + :client "test" + :since 5})] + (with-redefs [sync-handler/t-now (fn [_self] 5) + sync-handler/pull-response (fn [_self _since] + (swap! pull-calls inc) + {:type "pull/ok" :t 5 :txs []}) + ws/send! (fn [_target msg] + (swap! send-events conj msg))] + (ws-handler/handle-ws-message! self source-ws raw)) + + (is (= 0 @pull-calls)) + (is (= [{:type "hello" :t 5}] + @send-events)))) diff --git a/docs/agent-guide/db-sync/protocol.md b/docs/agent-guide/db-sync/protocol.md index 3800a5f24f..1f6ddd86c3 100644 --- a/docs/agent-guide/db-sync/protocol.md +++ b/docs/agent-guide/db-sync/protocol.md @@ -7,8 +7,8 @@ - Note: keep this document in sync with the current implementation. ## Client -> Server -- `{"type":"hello","client":""}` - - Initial handshake from client. +- `{"type":"hello","client":"","since":}` + - Initial handshake from client, including current local tx (`since`). - `{"type":"presence","editing-block-uuid":""}` - Update current editing block for presence (omit or null to clear). - `{"type":"pull","since":}` @@ -20,12 +20,13 @@ ## Server -> Client - `{"type":"hello","t":}` - - Server hello with current t. + - Server hello with current t when client is already up-to-date (`since >= t`). - `{"type":"online-users","online-users":[{"user-id":"...","email":"...","username":"...","name":"..."}...]}` - Presence update - Optional `editing-block-uuid` indicates the block the user is editing. - `{"type":"pull/ok","t":,"txs":[{"t":,"tx":""}...]}` - Pull response with txs. + - On initial `hello`, server sends `pull/ok` directly (without `hello`) when client is behind (`since < t`). - `{"type":"tx/batch/ok","t":}` - Batch accepted; server advanced to t. - `{"type":"changed","t":}` diff --git a/src/main/frontend/worker/sync.cljs b/src/main/frontend/worker/sync.cljs index 383e2d03aa..aee71ed22b 100644 --- a/src/main/frontend/worker/sync.cljs +++ b/src/main/frontend/worker/sync.cljs @@ -1445,9 +1445,6 @@ (require-non-negative remote-tx {:repo repo :type "hello"}) (mark-hello-received! client local-tx remote-tx) (broadcast-rtc-state! client) - (when (> remote-tx local-tx) - (mark-first-pull-sent! client :hello local-tx) - (send! (:ws client) {:type "pull" :since local-tx})) (enqueue-asset-sync! repo client) (flush-pending! repo client)) "online-users" (let [users (:online-users message)] @@ -1617,7 +1614,8 @@ (touch-last-ws-message! updated) (set-ws-state! updated :open) (mark-ws-open! updated) - (send! ws {:type "hello" :client repo}) + (let [local-tx (or (client-op/get-local-tx repo) 0)] + (send! ws {:type "hello" :client repo :since local-tx})) (enqueue-asset-sync! repo updated))) (close-stale-ws-loop updated ws)))) diff --git a/src/test/frontend/worker/db_sync_test.cljs b/src/test/frontend/worker/db_sync_test.cljs index acdac4a681..6a3fd51f7f 100644 --- a/src/test/frontend/worker/db_sync_test.cljs +++ b/src/test/frontend/worker/db_sync_test.cljs @@ -177,6 +177,24 @@ @(:online-users client))) (is (= 1 (count @broadcasts)))))) +(deftest hello-message-does-not-send-pull-request-test + (let [client {:repo test-repo + :online-users (atom []) + :ws #js {:readyState 1} + :ws-state (atom :open)} + sent-messages (atom []) + raw-message (js/JSON.stringify + (clj->js {:type "hello" + :t 8}))] + (with-redefs [client-op/get-local-tx (fn [_repo] 5) + db-sync/send! (fn [_ws message] + (swap! sent-messages conj message)) + db-sync/broadcast-rtc-state! (fn [_client] nil) + db-sync/enqueue-asset-sync! (fn [_repo _client] nil) + db-sync/flush-pending! (fn [_repo _client] nil)] + (#'db-sync/handle-message! test-repo client raw-message) + (is (empty? @sent-messages))))) + (deftest pull-ok-with-older-remote-tx-is-ignored-test (testing "pull/ok with remote tx behind local tx does not apply stale tx data" (let [{:keys [conn client-ops-conn parent]} (setup-parent-child)