Merge branch 'master' into enhance/search

This commit is contained in:
megayu
2026-03-03 20:11:33 +08:00
committed by GitHub
3 changed files with 289 additions and 36 deletions

View File

@@ -93,6 +93,144 @@
(reset! *ws-state ws-state)
(broadcast-rtc-state! client)))
(defn- timing-platform-fields
[]
{:mobile? (boolean (:mobile? (worker-state/get-context)))})
(defn- mark-ws-open!
[client]
(when-let [*timing (:timing client)]
(let [now (common-util/time-ms)
ws-connect-start-ms (:ws-connect-start-ms @*timing)
ws-connect-duration-ms (when (number? ws-connect-start-ms)
(- now ws-connect-start-ms))]
(swap! *timing assoc
:ws-open-ms now
:first-remote-apply-start-ms nil
:first-remote-apply-end-ms nil)
(log/info :db-sync/ws-open
(merge (timing-platform-fields)
{:repo (:repo client)
:graph-id (:graph-id client)
:ws-connect-start-ms ws-connect-start-ms
:ws-open-ms now
:ws-connect-duration-ms ws-connect-duration-ms})))))
(defn- mark-ws-connect-start!
[client]
(when-let [*timing (:timing client)]
(swap! *timing assoc
:ws-connect-start-ms (common-util/time-ms)
:ws-open-ms nil
:hello-received-ms nil
:first-pull-sent-ms nil
:first-pull-ok-received-ms nil
:first-aes-key-start-ms nil
:first-aes-key-ready-ms nil
:first-remote-apply-start-ms nil
:first-remote-apply-end-ms nil)))
(defn- mark-hello-received!
[client local-tx remote-tx]
(when-let [*timing (:timing client)]
(let [now (common-util/time-ms)
{:keys [ws-open-ms hello-received-ms]} @*timing]
(when (nil? hello-received-ms)
(swap! *timing assoc :hello-received-ms now)
(log/info :db-sync/hello-received
(merge (timing-platform-fields)
{:repo (:repo client)
:graph-id (:graph-id client)
:local-tx local-tx
:remote-tx remote-tx
:ws-open-ms ws-open-ms
:hello-received-ms now
:ws-open->hello-received-ms (when (number? ws-open-ms)
(- now ws-open-ms))}))))))
(defn- mark-first-pull-sent!
[client reason since]
(when-let [*timing (:timing client)]
(let [now (common-util/time-ms)
{:keys [ws-open-ms hello-received-ms first-pull-sent-ms]} @*timing]
(when (nil? first-pull-sent-ms)
(swap! *timing assoc :first-pull-sent-ms now)
(log/info :db-sync/first-pull-sent
(merge (timing-platform-fields)
{:repo (:repo client)
:graph-id (:graph-id client)
:reason reason
:since since
:ws-open-ms ws-open-ms
:hello-received-ms hello-received-ms
:first-pull-sent-ms now
:ws-open->pull-sent-ms (when (number? ws-open-ms)
(- now ws-open-ms))
:hello->pull-sent-ms (when (number? hello-received-ms)
(- now hello-received-ms))}))))))
(defn- mark-first-pull-ok-received!
[client]
(when-let [*timing (:timing client)]
(let [now (common-util/time-ms)
{:keys [ws-open-ms hello-received-ms first-pull-sent-ms first-pull-ok-received-ms]} @*timing]
(when (nil? first-pull-ok-received-ms)
(swap! *timing assoc :first-pull-ok-received-ms now)
(log/info :db-sync/first-pull-ok-received
(merge (timing-platform-fields)
{:repo (:repo client)
:graph-id (:graph-id client)
:ws-open-ms ws-open-ms
:hello-received-ms hello-received-ms
:first-pull-sent-ms first-pull-sent-ms
:first-pull-ok-received-ms now
:ws-open->pull-ok-ms (when (number? ws-open-ms)
(- now ws-open-ms))
:hello->pull-ok-ms (when (number? hello-received-ms)
(- now hello-received-ms))
:pull-sent->pull-ok-ms (when (number? first-pull-sent-ms)
(- now first-pull-sent-ms))}))))))
(defn- mark-first-aes-key-ready!
[client aes-key-start-ms aes-key-ready-ms graph-e2ee?]
(when-let [*timing (:timing client)]
(let [{:keys [first-aes-key-ready-ms first-pull-ok-received-ms ws-open-ms]} @*timing]
(when (nil? first-aes-key-ready-ms)
(swap! *timing assoc
:first-aes-key-start-ms aes-key-start-ms
:first-aes-key-ready-ms aes-key-ready-ms)
(log/info :db-sync/first-aes-key-ready
(merge (timing-platform-fields)
{:repo (:repo client)
:graph-id (:graph-id client)
:graph-e2ee? graph-e2ee?
:ws-open-ms ws-open-ms
:first-pull-ok-received-ms first-pull-ok-received-ms
:first-aes-key-start-ms aes-key-start-ms
:first-aes-key-ready-ms aes-key-ready-ms
:pull-ok->aes-key-ready-ms (when (number? first-pull-ok-received-ms)
(- aes-key-ready-ms first-pull-ok-received-ms))
:aes-key-duration-ms (- aes-key-ready-ms aes-key-start-ms)}))))))
(defn- mark-first-remote-apply!
[client apply-start-ms]
(when-let [*timing (:timing client)]
(let [{:keys [ws-open-ms first-remote-apply-end-ms]} @*timing]
(when (and (number? ws-open-ms) (nil? first-remote-apply-end-ms))
(let [apply-end-ms (common-util/time-ms)]
(swap! *timing assoc
:first-remote-apply-start-ms apply-start-ms
:first-remote-apply-end-ms apply-end-ms)
(log/info :db-sync/first-remote-apply
(merge (timing-platform-fields)
{:repo (:repo client)
:graph-id (:graph-id client)
:ws-open-ms ws-open-ms
:apply-start-ms apply-start-ms
:apply-end-ms apply-end-ms
:ws-open->first-apply-ms (- apply-end-ms ws-open-ms)
:first-apply-duration-ms (- apply-end-ms apply-start-ms)})))))))
(defn- update-online-users!
[client users]
(when-let [*online-users (:online-users client)]
@@ -358,6 +496,18 @@
(remove (fn [[_op e]]
(contains? rtc-const/ignore-entities-when-init-upload e)))))
(def ^:private non-retractable-block-attrs
#{:block/created-at :block/updated-at :block/title})
(defn- drop-non-retractable-attr-datoms
[tx-data]
(remove (fn [item]
(and (vector? item)
(>= (count item) 3)
(= :db/retract (first item))
(contains? non-retractable-block-attrs (nth item 2))))
tx-data))
(defn- reverse-tx-data
[tx-data]
(->> tx-data
@@ -663,18 +813,6 @@
(sequential? (first tx-data*))
(sequential? (first (first tx-data*)))))
(def ^:private non-retractable-block-attrs
#{:block/created-at :block/updated-at :block/title})
(defn- drop-non-retractable-attr-datoms
[tx-data]
(remove (fn [item]
(and (vector? item)
(>= (count item) 3)
(= :db/retract (first item))
(contains? non-retractable-block-attrs (nth item 2))))
tx-data))
(defn- sanitize-tx-data
[db tx-data local-deleted-ids]
(let [sanitized-tx-data (->> tx-data
@@ -745,7 +883,16 @@
:stale-kill-timer (atom nil)
:last-ws-message-ts (atom (common-util/time-ms))
:online-users (atom [])
:ws-state (atom :closed)}]
:ws-state (atom :closed)
:timing (atom {:ws-connect-start-ms nil
:ws-open-ms nil
:hello-received-ms nil
:first-pull-sent-ms nil
:first-pull-ok-received-ms nil
:first-aes-key-start-ms nil
:first-aes-key-ready-ms nil
:first-remote-apply-start-ms nil
:first-remote-apply-end-ms nil})}]
(reset! worker-state/*db-sync-client client)
client))
@@ -1122,7 +1269,8 @@
[local-txs]
(let [tx-data (->> local-txs
reverse
(mapcat :reversed-tx))
(mapcat :reversed-tx)
drop-non-retractable-attr-datoms)
retract-block-ids (->> (keep (fn [[op e a _v _t]]
(when (and (= op :db/retract) (= :block/uuid a))
e)) tx-data)
@@ -1295,8 +1443,10 @@
(case (:type message)
"hello" (do
(require-non-negative remote-tx {:repo repo :type "hello"})
(mark-hello-received! client local-tx remote-tx)
(broadcast-rtc-state! client)
(when (> remote-tx local-tx)
(mark-first-pull-sent! client :hello local-tx)
(send! (:ws client) {:type "pull" :since local-tx}))
(enqueue-asset-sync! repo client)
(flush-pending! repo client))
@@ -1325,16 +1475,23 @@
txs-data (mapv (fn [data]
(parse-transit (:tx data) {:repo repo :type "pull/ok"}))
txs)]
(mark-first-pull-ok-received! client)
(when (seq txs-data)
(p/let [aes-key (sync-crypt/<ensure-graph-aes-key repo (:graph-id client))
_ (when (and (sync-crypt/graph-e2ee? repo) (nil? aes-key))
(p/let [graph-e2ee? (sync-crypt/graph-e2ee? repo)
aes-key-start-ms (common-util/time-ms)
aes-key (sync-crypt/<ensure-graph-aes-key repo (:graph-id client))
aes-key-ready-ms (common-util/time-ms)
_ (mark-first-aes-key-ready! client aes-key-start-ms aes-key-ready-ms graph-e2ee?)
_ (when (and graph-e2ee? (nil? aes-key))
(fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))
tx-batches (if aes-key
(p/all (mapv (fn [tx-data]
(sync-crypt/<decrypt-tx-data aes-key tx-data))
txs-data))
(p/resolved txs-data))]
(p/resolved txs-data))
apply-start-ms (common-util/time-ms)]
(apply-remote-tx! repo client tx-batches)
(mark-first-remote-apply! client apply-start-ms)
(client-op/update-local-tx repo remote-tx)
(broadcast-rtc-state! client)
(flush-pending! repo client)))))
@@ -1342,6 +1499,7 @@
(require-non-negative remote-tx {:repo repo :type "changed"})
(broadcast-rtc-state! client)
(when (< local-tx remote-tx)
(mark-first-pull-sent! client :changed local-tx)
(send! (:ws client) {:type "pull" :since local-tx})))
"tx/reject" (let [reason (:reason message)]
(when (nil? reason)
@@ -1351,7 +1509,9 @@
(require-non-negative remote-tx {:repo repo :type "tx/reject"}))
(case reason
"stale"
(send! (:ws client) {:type "pull" :since local-tx})
(do
(mark-first-pull-sent! client :stale local-tx)
(send! (:ws client) {:type "pull" :since local-tx}))
(fail-fast :db-sync/invalid-field
{:repo repo :type "tx/reject" :reason reason})))
@@ -1447,6 +1607,7 @@
(stop-client! client))
;; use cache token for faster websocket connection
(when-let [token' (or token (auth-token))]
(mark-ws-connect-start! client)
(let [ws (js/WebSocket. (append-token url token'))
updated (assoc client :ws ws)]
(attach-ws-handlers! repo updated ws url)
@@ -1455,6 +1616,7 @@
(reset-reconnect! updated)
(touch-last-ws-message! updated)
(set-ws-state! updated :open)
(mark-ws-open! updated)
(send! ws {:type "hello" :client repo})
(enqueue-asset-sync! repo updated)))
(close-stale-ws-loop updated ws))))

View File

@@ -14,6 +14,7 @@
[promesa.core :as p]))
(defonce ^:private *graph->aes-key (atom {}))
(defonce ^:private *user-rsa-key-pair-inflight (atom {}))
(defonce ^:private e2ee-store (delay (idb-keyval/newStore "localforage" "keyvaluepairs" 2)))
(defonce ^:private e2ee-password-file "e2ee-password")
(defonce ^:private native-env?
@@ -150,7 +151,7 @@
[k]
(assert (and k @e2ee-store))
(p/let [r (idb-keyval/get k @e2ee-store)]
(js->clj r :keywordize-keys true)))
(some-> r (js->clj :keywordize-keys true))))
(defn- <set-item!
[k value]
@@ -166,12 +167,66 @@
[graph-id]
(str "rtc-encrypted-aes-key###" graph-id))
(defn- user-rsa-key-pair-idb-key
[user-id]
(str "rtc-user-rsa-key-pair###" user-id))
(defn <fetch-user-rsa-key-pair-raw
[base]
(fetch-json (str base "/e2ee/user-keys")
{:method "GET"}
{:response-schema :e2ee/user-keys}))
(defn- user-rsa-key-pair-valid?
[{:keys [public-key encrypted-private-key]}]
(and (string? public-key)
(string? encrypted-private-key)))
(defn- <set-user-rsa-key-pair-to-idb!
[base user-id pair]
(when (and (string? base)
(string? user-id)
(user-rsa-key-pair-valid? pair))
(<set-item! (user-rsa-key-pair-idb-key user-id)
(ldb/write-transit-str pair)))
pair)
(defn- <get-user-rsa-key-pair-from-idb
[base user-id]
(when (and (string? base) (string? user-id))
(p/let [pair-str (<get-item (user-rsa-key-pair-idb-key user-id))
pair (ldb/read-transit-str pair-str)]
(when (user-rsa-key-pair-valid? pair)
pair))))
(defn- <clear-user-rsa-key-pair-cache!
[base user-id]
(let [k [base user-id]]
(swap! *user-rsa-key-pair-inflight dissoc k)
(when (and (string? base) (string? user-id))
(<clear-item! (user-rsa-key-pair-idb-key user-id)))))
(defn- <get-user-rsa-key-pair-raw
[base]
(let [user-id (get-user-uuid)]
(when-not (and (string? base) (string? user-id))
(fail-fast :db-sync/missing-field {:base base :user-id user-id :field :user-rsa-key-pair}))
(let [k [base user-id]]
(if-let [inflight (get @*user-rsa-key-pair-inflight k)]
inflight
(let [task (-> (p/let [cached (<get-user-rsa-key-pair-from-idb base user-id)]
(if cached
cached
(p/let [pair (<fetch-user-rsa-key-pair-raw base)]
(if (user-rsa-key-pair-valid? pair)
(p/let [_ (<set-user-rsa-key-pair-to-idb! base user-id pair)]
pair)
pair))))
(p/finally (fn []
(swap! *user-rsa-key-pair-inflight dissoc k))))]
(swap! *user-rsa-key-pair-inflight assoc k task)
task)))))
(defn <upload-user-rsa-key-pair!
[base public-key encrypted-private-key]
(let [body (coerce-http-request :e2ee/user-keys
@@ -179,17 +234,18 @@
:encrypted-private-key encrypted-private-key})]
(when (nil? body)
(fail-fast :db-sync/invalid-field {:type :e2ee/user-keys :body body}))
(fetch-json (str base "/e2ee/user-keys")
{:method "POST"
:headers {"content-type" "application/json"}
:body (js/JSON.stringify (clj->js body))}
{:response-schema :e2ee/user-keys})))
(p/let [pair (fetch-json (str base "/e2ee/user-keys")
{:method "POST"
:headers {"content-type" "application/json"}
:body (js/JSON.stringify (clj->js body))}
{:response-schema :e2ee/user-keys})
user-id (get-user-uuid)
_ (<set-user-rsa-key-pair-to-idb! base user-id pair)]
pair)))
(defn- <ensure-user-rsa-key-pair-raw
[base]
(p/let [existing (-> (<fetch-user-rsa-key-pair-raw base)
(p/catch (fn [error]
(throw error))))]
(p/let [existing (<get-user-rsa-key-pair-raw base)]
(if (and (string? (:public-key existing))
(string? (:encrypted-private-key existing)))
existing
@@ -250,6 +306,33 @@
:body (js/JSON.stringify (clj->js body))}
{:response-schema :e2ee/graph-aes-key})))
(defn- <load-user-rsa-key-material
[base user-id graph-id]
(letfn [(<load-once []
(p/let [{:keys [public-key encrypted-private-key]} (<ensure-user-rsa-key-pair-raw base)
_ (when-not (and (string? public-key) (string? encrypted-private-key))
(fail-fast :db-sync/missing-field
{:base base
:user-id user-id
:graph-id graph-id
:field :user-rsa-key-pair}))
public-key' (<import-public-key public-key)
private-key' (<decrypt-private-key encrypted-private-key)]
{:public-key public-key'
:private-key private-key'}))]
(-> (<load-once)
(p/catch (fn [error]
(-> (p/let [_ (<clear-user-rsa-key-pair-cache! base user-id)]
(<load-once))
(p/catch (fn [retry-error]
(log/warn :db-sync/user-rsa-key-cache-invalid
{:base base
:user-id user-id
:graph-id graph-id
:first-error error
:retry-error retry-error})
(throw retry-error)))))))))
(defn <ensure-graph-aes-key
[repo graph-id]
(if-not (graph-e2ee? repo)
@@ -260,9 +343,7 @@
user-id (get-user-uuid)]
(when-not (and (string? base) (string? user-id))
(fail-fast :db-sync/missing-field {:base base :user-id user-id :graph-id graph-id}))
(p/let [{:keys [public-key encrypted-private-key]} (<ensure-user-rsa-key-pair-raw base)
public-key' (when (string? public-key) (<import-public-key public-key))
private-key' (when (string? encrypted-private-key) (<decrypt-private-key encrypted-private-key))
(p/let [{:keys [public-key private-key]} (<load-user-rsa-key-material base user-id graph-id)
local-encrypted (when graph-id
(<get-item (graph-encrypted-aes-key-idb-key graph-id)))
remote-encrypted (when (and (nil? local-encrypted) graph-id)
@@ -271,9 +352,9 @@
(ldb/read-transit-str encrypted-aes-key))))
encrypted-aes-key (or local-encrypted remote-encrypted)
aes-key (if encrypted-aes-key
(crypt/<decrypt-aes-key private-key' encrypted-aes-key)
(crypt/<decrypt-aes-key private-key encrypted-aes-key)
(p/let [aes-key (crypt/<generate-aes-key)
encrypted (crypt/<encrypt-aes-key public-key' aes-key)
encrypted (crypt/<encrypt-aes-key public-key aes-key)
encrypted-str (ldb/write-transit-str encrypted)
_ (<upsert-graph-encrypted-aes-key! base graph-id encrypted-str)
_ (<set-item! (graph-encrypted-aes-key-idb-key graph-id) encrypted)]
@@ -289,7 +370,7 @@
aes-key-k (graph-encrypted-aes-key-idb-key graph-id)]
(when-not (and (string? base) (string? graph-id))
(fail-fast :db-sync/missing-field {:base base :graph-id graph-id}))
(p/let [{:keys [public-key encrypted-private-key]} (<fetch-user-rsa-key-pair-raw base)]
(p/let [{:keys [public-key encrypted-private-key]} (<get-user-rsa-key-pair-raw base)]
(<clear-item! aes-key-k)
(when-not (and (string? public-key) (string? encrypted-private-key))
(fail-fast :db-sync/missing-field {:graph-id graph-id :field :user-rsa-key-pair}))
@@ -451,7 +532,7 @@
(let [base (e2ee-base)]
(when-not (string? base)
(fail-fast :db-sync/missing-field {:base base :user-uuid user-uuid}))
(p/let [{:keys [public-key encrypted-private-key]} (<fetch-user-rsa-key-pair-raw base)]
(p/let [{:keys [public-key encrypted-private-key]} (<get-user-rsa-key-pair-raw base)]
(when-not (and (string? public-key) (string? encrypted-private-key))
(fail-fast :db-sync/missing-field {:base base :user-uuid user-uuid :field :user-rsa-key-pair}))
(p/let [encrypted-private-key' (<re-encrypt-private-key encrypted-private-key old-password new-password)
@@ -464,7 +545,7 @@
(let [base (e2ee-base)]
(when-not (string? base)
(fail-fast :db-sync/missing-field {:base base}))
(p/let [{:keys [public-key encrypted-private-key]} (<fetch-user-rsa-key-pair-raw base)]
(p/let [{:keys [public-key encrypted-private-key]} (<get-user-rsa-key-pair-raw base)]
(when (and public-key encrypted-private-key)
{:public-key public-key
:encrypted-private-key encrypted-private-key}))))
@@ -474,7 +555,7 @@
(let [base (e2ee-base)]
(when-not (string? base)
(fail-fast :db-sync/missing-field {:base base}))
(p/let [existing (<fetch-user-rsa-key-pair-raw base)]
(p/let [existing (<get-user-rsa-key-pair-raw base)]
(when-not (and (string? (:public-key existing))
(string? (:encrypted-private-key existing)))
(p/let [{:keys [publicKey privateKey]} (crypt/<generate-rsa-key-pair)

View File

@@ -112,6 +112,16 @@
(is nil (str error))
(done))))))))
(deftest get-reverse-tx-data-skips-non-retractable-block-attrs-test
(testing "pending reversed txs should not retract required block attrs"
(let [local-txs [{:reversed-tx [[:db/retract 1 :block/updated-at 100 10]
[:db/retract 1 :block/created-at 90 10]
[:db/retract 1 :block/title "Home" 10]
[:db/retract 1 :block/name "home" 10]]}]
reversed (#'db-sync/get-reverse-tx-data local-txs)]
(is (= [[:db/retract 1 :block/name "home" 10]]
reversed)))))
(deftest update-online-users-dedupes-identical-messages-test
(let [client {:repo test-repo
:online-users (atom [])