From 2799a5707445107de6cc947e11bec844cf46e1c8 Mon Sep 17 00:00:00 2001 From: Tienson Qin Date: Tue, 3 Mar 2026 20:48:28 +0800 Subject: [PATCH 1/2] client hello now sends since --- .../src/logseq/db_sync/malli_schema.cljs | 3 +- .../src/logseq/db_sync/worker/handler/ws.cljs | 9 ++++- .../logseq/db_sync/node_adapter_test.cljs | 4 +- .../db_sync/worker_handler_ws_test.cljs | 40 +++++++++++++++++++ docs/agent-guide/db-sync/protocol.md | 7 ++-- src/main/frontend/worker/sync.cljs | 6 +-- src/test/frontend/worker/db_sync_test.cljs | 18 +++++++++ 7 files changed, 77 insertions(+), 10 deletions(-) diff --git a/deps/db-sync/src/logseq/db_sync/malli_schema.cljs b/deps/db-sync/src/logseq/db_sync/malli_schema.cljs index a6d3634a6b..ebeff799ea 100644 --- a/deps/db-sync/src/logseq/db_sync/malli_schema.cljs +++ b/deps/db-sync/src/logseq/db_sync/malli_schema.cljs @@ -12,7 +12,8 @@ ["hello" [:map [:type [:= "hello"]] - [:client :string]]] + [:client :string] + [:since :int]]] ["presence" [:map [:type [:= "presence"]] diff --git a/deps/db-sync/src/logseq/db_sync/worker/handler/ws.cljs b/deps/db-sync/src/logseq/db_sync/worker/handler/ws.cljs index 0081d961a7..05fa7e80a9 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/handler/ws.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/handler/ws.cljs @@ -11,7 +11,14 @@ (ws/send! ws {:type "error" :message "invalid request"}) (case (:type message) "hello" - (ws/send! ws {:type "hello" :t (sync-handler/t-now self)}) + (let [raw-since (:since message) + since (if (some? raw-since) (sync-handler/parse-int raw-since) 0) + current-t (sync-handler/t-now self)] + (if (or (and (some? raw-since) (not (number? since))) (neg? since)) + (ws/send! ws {:type "error" :message "invalid since"}) + (if (< since current-t) + (ws/send! ws (sync-handler/pull-response self since)) + (ws/send! ws {:type "hello" :t current-t})))) "ping" (ws/send! ws {:type "pong"}) diff --git a/deps/db-sync/test/logseq/db_sync/node_adapter_test.cljs b/deps/db-sync/test/logseq/db_sync/node_adapter_test.cljs index b4de5212cc..3dd653dec4 100644 --- a/deps/db-sync/test/logseq/db_sync/node_adapter_test.cljs +++ b/deps/db-sync/test/logseq/db_sync/node_adapter_test.cljs @@ -90,7 +90,9 @@ (.on client "message" push-message) (.on client "open" (fn [] - (.send client (protocol/encode-message {:type "hello" :client "test"})))) + (.send client (protocol/encode-message {:type "hello" + :client "test" + :since 0})))) (p/let [_ (p/delay 50) tx-data [{:block/uuid (random-uuid) :block/content "ws"}] diff --git a/deps/db-sync/test/logseq/db_sync/worker_handler_ws_test.cljs b/deps/db-sync/test/logseq/db_sync/worker_handler_ws_test.cljs index 9b442a2813..23e3035c5c 100644 --- a/deps/db-sync/test/logseq/db_sync/worker_handler_ws_test.cljs +++ b/deps/db-sync/test/logseq/db_sync/worker_handler_ws_test.cljs @@ -1,6 +1,7 @@ (ns logseq.db-sync.worker-handler-ws-test (:require [cljs.test :refer [deftest is]] [logseq.db-sync.protocol :as protocol] + [logseq.db-sync.worker.handler.sync :as sync-handler] [logseq.db-sync.worker.handler.ws :as ws-handler] [logseq.db-sync.worker.presence :as presence] [logseq.db-sync.worker.ws :as ws])) @@ -25,3 +26,42 @@ :editing-block-uuid "block-1" :user-id "user-1"}] (mapv :msg @send-events))))) + +(deftest hello-message-sends-only-pull-ok-when-client-behind-test + (let [source-ws #js {:readyState 1} + send-events (atom []) + self #js {:state #js {}} + raw (protocol/encode-message {:type "hello" + :client "test" + :since 2})] + (with-redefs [sync-handler/t-now (fn [_self] 5) + sync-handler/pull-response (fn [_self since] + {:type "pull/ok" + :t 5 + :txs [{:t 5 :tx (str "since-" since)}]}) + ws/send! (fn [_target msg] + (swap! send-events conj msg))] + (ws-handler/handle-ws-message! self source-ws raw)) + + (is (= [{:type "pull/ok" :t 5 :txs [{:t 5 :tx "since-2"}]}] + @send-events)))) + +(deftest hello-message-skips-pull-ok-when-client-is-up-to-date-test + (let [source-ws #js {:readyState 1} + send-events (atom []) + pull-calls (atom 0) + self #js {:state #js {}} + raw (protocol/encode-message {:type "hello" + :client "test" + :since 5})] + (with-redefs [sync-handler/t-now (fn [_self] 5) + sync-handler/pull-response (fn [_self _since] + (swap! pull-calls inc) + {:type "pull/ok" :t 5 :txs []}) + ws/send! (fn [_target msg] + (swap! send-events conj msg))] + (ws-handler/handle-ws-message! self source-ws raw)) + + (is (= 0 @pull-calls)) + (is (= [{:type "hello" :t 5}] + @send-events)))) diff --git a/docs/agent-guide/db-sync/protocol.md b/docs/agent-guide/db-sync/protocol.md index 3800a5f24f..1f6ddd86c3 100644 --- a/docs/agent-guide/db-sync/protocol.md +++ b/docs/agent-guide/db-sync/protocol.md @@ -7,8 +7,8 @@ - Note: keep this document in sync with the current implementation. ## Client -> Server -- `{"type":"hello","client":""}` - - Initial handshake from client. +- `{"type":"hello","client":"","since":}` + - Initial handshake from client, including current local tx (`since`). - `{"type":"presence","editing-block-uuid":""}` - Update current editing block for presence (omit or null to clear). - `{"type":"pull","since":}` @@ -20,12 +20,13 @@ ## Server -> Client - `{"type":"hello","t":}` - - Server hello with current t. + - Server hello with current t when client is already up-to-date (`since >= t`). - `{"type":"online-users","online-users":[{"user-id":"...","email":"...","username":"...","name":"..."}...]}` - Presence update - Optional `editing-block-uuid` indicates the block the user is editing. - `{"type":"pull/ok","t":,"txs":[{"t":,"tx":""}...]}` - Pull response with txs. + - On initial `hello`, server sends `pull/ok` directly (without `hello`) when client is behind (`since < t`). - `{"type":"tx/batch/ok","t":}` - Batch accepted; server advanced to t. - `{"type":"changed","t":}` diff --git a/src/main/frontend/worker/sync.cljs b/src/main/frontend/worker/sync.cljs index 383e2d03aa..aee71ed22b 100644 --- a/src/main/frontend/worker/sync.cljs +++ b/src/main/frontend/worker/sync.cljs @@ -1445,9 +1445,6 @@ (require-non-negative remote-tx {:repo repo :type "hello"}) (mark-hello-received! client local-tx remote-tx) (broadcast-rtc-state! client) - (when (> remote-tx local-tx) - (mark-first-pull-sent! client :hello local-tx) - (send! (:ws client) {:type "pull" :since local-tx})) (enqueue-asset-sync! repo client) (flush-pending! repo client)) "online-users" (let [users (:online-users message)] @@ -1617,7 +1614,8 @@ (touch-last-ws-message! updated) (set-ws-state! updated :open) (mark-ws-open! updated) - (send! ws {:type "hello" :client repo}) + (let [local-tx (or (client-op/get-local-tx repo) 0)] + (send! ws {:type "hello" :client repo :since local-tx})) (enqueue-asset-sync! repo updated))) (close-stale-ws-loop updated ws)))) diff --git a/src/test/frontend/worker/db_sync_test.cljs b/src/test/frontend/worker/db_sync_test.cljs index acdac4a681..6a3fd51f7f 100644 --- a/src/test/frontend/worker/db_sync_test.cljs +++ b/src/test/frontend/worker/db_sync_test.cljs @@ -177,6 +177,24 @@ @(:online-users client))) (is (= 1 (count @broadcasts)))))) +(deftest hello-message-does-not-send-pull-request-test + (let [client {:repo test-repo + :online-users (atom []) + :ws #js {:readyState 1} + :ws-state (atom :open)} + sent-messages (atom []) + raw-message (js/JSON.stringify + (clj->js {:type "hello" + :t 8}))] + (with-redefs [client-op/get-local-tx (fn [_repo] 5) + db-sync/send! (fn [_ws message] + (swap! sent-messages conj message)) + db-sync/broadcast-rtc-state! (fn [_client] nil) + db-sync/enqueue-asset-sync! (fn [_repo _client] nil) + db-sync/flush-pending! (fn [_repo _client] nil)] + (#'db-sync/handle-message! test-repo client raw-message) + (is (empty? @sent-messages))))) + (deftest pull-ok-with-older-remote-tx-is-ignored-test (testing "pull/ok with remote tx behind local tx does not apply stale tx data" (let [{:keys [conn client-ops-conn parent]} (setup-parent-child) From 8f923a911d7fa6cfacf851c95c81156a62ee7f1e Mon Sep 17 00:00:00 2001 From: Tienson Qin Date: Tue, 3 Mar 2026 22:37:30 +0800 Subject: [PATCH 2/2] Revert "client hello now sends since" This reverts commit 2799a5707445107de6cc947e11bec844cf46e1c8. --- .../src/logseq/db_sync/malli_schema.cljs | 3 +- .../src/logseq/db_sync/worker/handler/ws.cljs | 9 +---- .../logseq/db_sync/node_adapter_test.cljs | 4 +- .../db_sync/worker_handler_ws_test.cljs | 40 ------------------- docs/agent-guide/db-sync/protocol.md | 7 ++-- src/main/frontend/worker/sync.cljs | 6 ++- src/test/frontend/worker/db_sync_test.cljs | 18 --------- 7 files changed, 10 insertions(+), 77 deletions(-) diff --git a/deps/db-sync/src/logseq/db_sync/malli_schema.cljs b/deps/db-sync/src/logseq/db_sync/malli_schema.cljs index ebeff799ea..a6d3634a6b 100644 --- a/deps/db-sync/src/logseq/db_sync/malli_schema.cljs +++ b/deps/db-sync/src/logseq/db_sync/malli_schema.cljs @@ -12,8 +12,7 @@ ["hello" [:map [:type [:= "hello"]] - [:client :string] - [:since :int]]] + [:client :string]]] ["presence" [:map [:type [:= "presence"]] diff --git a/deps/db-sync/src/logseq/db_sync/worker/handler/ws.cljs b/deps/db-sync/src/logseq/db_sync/worker/handler/ws.cljs index 05fa7e80a9..0081d961a7 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/handler/ws.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/handler/ws.cljs @@ -11,14 +11,7 @@ (ws/send! ws {:type "error" :message "invalid request"}) (case (:type message) "hello" - (let [raw-since (:since message) - since (if (some? raw-since) (sync-handler/parse-int raw-since) 0) - current-t (sync-handler/t-now self)] - (if (or (and (some? raw-since) (not (number? since))) (neg? since)) - (ws/send! ws {:type "error" :message "invalid since"}) - (if (< since current-t) - (ws/send! ws (sync-handler/pull-response self since)) - (ws/send! ws {:type "hello" :t current-t})))) + (ws/send! ws {:type "hello" :t (sync-handler/t-now self)}) "ping" (ws/send! ws {:type "pong"}) diff --git a/deps/db-sync/test/logseq/db_sync/node_adapter_test.cljs b/deps/db-sync/test/logseq/db_sync/node_adapter_test.cljs index 3dd653dec4..b4de5212cc 100644 --- a/deps/db-sync/test/logseq/db_sync/node_adapter_test.cljs +++ b/deps/db-sync/test/logseq/db_sync/node_adapter_test.cljs @@ -90,9 +90,7 @@ (.on client "message" push-message) (.on client "open" (fn [] - (.send client (protocol/encode-message {:type "hello" - :client "test" - :since 0})))) + (.send client (protocol/encode-message {:type "hello" :client "test"})))) (p/let [_ (p/delay 50) tx-data [{:block/uuid (random-uuid) :block/content "ws"}] diff --git a/deps/db-sync/test/logseq/db_sync/worker_handler_ws_test.cljs b/deps/db-sync/test/logseq/db_sync/worker_handler_ws_test.cljs index 23e3035c5c..9b442a2813 100644 --- a/deps/db-sync/test/logseq/db_sync/worker_handler_ws_test.cljs +++ b/deps/db-sync/test/logseq/db_sync/worker_handler_ws_test.cljs @@ -1,7 +1,6 @@ (ns logseq.db-sync.worker-handler-ws-test (:require [cljs.test :refer [deftest is]] [logseq.db-sync.protocol :as protocol] - [logseq.db-sync.worker.handler.sync :as sync-handler] [logseq.db-sync.worker.handler.ws :as ws-handler] [logseq.db-sync.worker.presence :as presence] [logseq.db-sync.worker.ws :as ws])) @@ -26,42 +25,3 @@ :editing-block-uuid "block-1" :user-id "user-1"}] (mapv :msg @send-events))))) - -(deftest hello-message-sends-only-pull-ok-when-client-behind-test - (let [source-ws #js {:readyState 1} - send-events (atom []) - self #js {:state #js {}} - raw (protocol/encode-message {:type "hello" - :client "test" - :since 2})] - (with-redefs [sync-handler/t-now (fn [_self] 5) - sync-handler/pull-response (fn [_self since] - {:type "pull/ok" - :t 5 - :txs [{:t 5 :tx (str "since-" since)}]}) - ws/send! (fn [_target msg] - (swap! send-events conj msg))] - (ws-handler/handle-ws-message! self source-ws raw)) - - (is (= [{:type "pull/ok" :t 5 :txs [{:t 5 :tx "since-2"}]}] - @send-events)))) - -(deftest hello-message-skips-pull-ok-when-client-is-up-to-date-test - (let [source-ws #js {:readyState 1} - send-events (atom []) - pull-calls (atom 0) - self #js {:state #js {}} - raw (protocol/encode-message {:type "hello" - :client "test" - :since 5})] - (with-redefs [sync-handler/t-now (fn [_self] 5) - sync-handler/pull-response (fn [_self _since] - (swap! pull-calls inc) - {:type "pull/ok" :t 5 :txs []}) - ws/send! (fn [_target msg] - (swap! send-events conj msg))] - (ws-handler/handle-ws-message! self source-ws raw)) - - (is (= 0 @pull-calls)) - (is (= [{:type "hello" :t 5}] - @send-events)))) diff --git a/docs/agent-guide/db-sync/protocol.md b/docs/agent-guide/db-sync/protocol.md index 1f6ddd86c3..3800a5f24f 100644 --- a/docs/agent-guide/db-sync/protocol.md +++ b/docs/agent-guide/db-sync/protocol.md @@ -7,8 +7,8 @@ - Note: keep this document in sync with the current implementation. ## Client -> Server -- `{"type":"hello","client":"","since":}` - - Initial handshake from client, including current local tx (`since`). +- `{"type":"hello","client":""}` + - Initial handshake from client. - `{"type":"presence","editing-block-uuid":""}` - Update current editing block for presence (omit or null to clear). - `{"type":"pull","since":}` @@ -20,13 +20,12 @@ ## Server -> Client - `{"type":"hello","t":}` - - Server hello with current t when client is already up-to-date (`since >= t`). + - Server hello with current t. - `{"type":"online-users","online-users":[{"user-id":"...","email":"...","username":"...","name":"..."}...]}` - Presence update - Optional `editing-block-uuid` indicates the block the user is editing. - `{"type":"pull/ok","t":,"txs":[{"t":,"tx":""}...]}` - Pull response with txs. - - On initial `hello`, server sends `pull/ok` directly (without `hello`) when client is behind (`since < t`). - `{"type":"tx/batch/ok","t":}` - Batch accepted; server advanced to t. - `{"type":"changed","t":}` diff --git a/src/main/frontend/worker/sync.cljs b/src/main/frontend/worker/sync.cljs index aee71ed22b..383e2d03aa 100644 --- a/src/main/frontend/worker/sync.cljs +++ b/src/main/frontend/worker/sync.cljs @@ -1445,6 +1445,9 @@ (require-non-negative remote-tx {:repo repo :type "hello"}) (mark-hello-received! client local-tx remote-tx) (broadcast-rtc-state! client) + (when (> remote-tx local-tx) + (mark-first-pull-sent! client :hello local-tx) + (send! (:ws client) {:type "pull" :since local-tx})) (enqueue-asset-sync! repo client) (flush-pending! repo client)) "online-users" (let [users (:online-users message)] @@ -1614,8 +1617,7 @@ (touch-last-ws-message! updated) (set-ws-state! updated :open) (mark-ws-open! updated) - (let [local-tx (or (client-op/get-local-tx repo) 0)] - (send! ws {:type "hello" :client repo :since local-tx})) + (send! ws {:type "hello" :client repo}) (enqueue-asset-sync! repo updated))) (close-stale-ws-loop updated ws)))) diff --git a/src/test/frontend/worker/db_sync_test.cljs b/src/test/frontend/worker/db_sync_test.cljs index 6a3fd51f7f..acdac4a681 100644 --- a/src/test/frontend/worker/db_sync_test.cljs +++ b/src/test/frontend/worker/db_sync_test.cljs @@ -177,24 +177,6 @@ @(:online-users client))) (is (= 1 (count @broadcasts)))))) -(deftest hello-message-does-not-send-pull-request-test - (let [client {:repo test-repo - :online-users (atom []) - :ws #js {:readyState 1} - :ws-state (atom :open)} - sent-messages (atom []) - raw-message (js/JSON.stringify - (clj->js {:type "hello" - :t 8}))] - (with-redefs [client-op/get-local-tx (fn [_repo] 5) - db-sync/send! (fn [_ws message] - (swap! sent-messages conj message)) - db-sync/broadcast-rtc-state! (fn [_client] nil) - db-sync/enqueue-asset-sync! (fn [_repo _client] nil) - db-sync/flush-pending! (fn [_repo _client] nil)] - (#'db-sync/handle-message! test-repo client raw-message) - (is (empty? @sent-messages))))) - (deftest pull-ok-with-older-remote-tx-is-ignored-test (testing "pull/ok with remote tx behind local tx does not apply stale tx data" (let [{:keys [conn client-ops-conn parent]} (setup-parent-child)