This commit is contained in:
Tienson Qin
2026-01-22 12:01:57 +08:00
parent fc5ab5a199
commit 1f71f9e78e
11 changed files with 905 additions and 75 deletions

View File

@@ -469,17 +469,12 @@
db-name (util/trim-safe (.-value (rum/deref input-ref)))]
(when (and cloud? refresh-token token user-uuid
(not e2ee-rsa-key-ensured?))
(p/do!
(state/<invoke-db-worker :thread-api/init-user-rsa-key-pair
token
refresh-token
user-uuid))
(-> (p/let [rsa-key-pair (state/<invoke-db-worker :thread-api/get-user-rsa-key-pair token user-uuid)]
(-> (p/let [rsa-key-pair (state/<invoke-db-worker :thread-api/db-sync-ensure-user-rsa-keys)]
(set-e2ee-rsa-key-ensured? (some? rsa-key-pair))
(when rsa-key-pair
(when db-name (new-db-f db-name))))
(p/catch (fn [e]
(log/error :get-user-rsa-key-pair e)
(log/error :db-sync/ensure-user-rsa-keys-failed e)
e))))))
[cloud?])

View File

@@ -42,6 +42,18 @@
(defn- decode-snapshot-rows [bytes]
(sqlite-util/read-transit-str (.decode snapshot-text-decoder (->uint8 bytes))))
(defn- snapshot-rows-e2ee?
[rows]
(boolean
(some (fn [[_ content _]]
(try
(let [data (sqlite-util/read-transit-str content)]
(and (map? data)
(= :logseq.kv/graph-rtc-e2ee? (:db/ident data))))
(catch :default _
false)))
rows)))
(defn- frame-len [^js data offset]
(let [view (js/DataView. (.-buffer data) offset 4)]
(.getUint32 view 0 false)))
@@ -267,10 +279,11 @@
total' (+ total (count rows))
total-rows' (into total-rows rows)]
(when (seq total-rows')
(p/do!
(state/<invoke-db-worker :thread-api/db-sync-import-kvs-rows
graph total-rows' true)
(state/<invoke-db-worker :thread-api/db-sync-finalize-kvs-import graph remote-tx)))
(let [e2ee? (snapshot-rows-e2ee? total-rows')]
(p/do!
(state/<invoke-db-worker :thread-api/db-sync-import-kvs-rows
graph total-rows' true graph-uuid e2ee?)
(state/<invoke-db-worker :thread-api/db-sync-finalize-kvs-import graph remote-tx))))
total')
(let [value (.-value chunk)
{:keys [rows buffer]} (parse-framed-chunk buffer value)
@@ -337,7 +350,12 @@
{:method "POST"
:headers {"content-type" "application/json"}
:body (js/JSON.stringify (clj->js body))}
{:response-schema :graph-members/create})]
{:response-schema :graph-members/create})
repo (state/get-current-repo)
e2ee? (ldb/get-graph-rtc-e2ee? (db/get-db))
_ (when (and repo e2ee?)
(state/<invoke-db-worker :thread-api/db-sync-grant-graph-access
repo graph-uuid email))]
(notification/show! "Invitation sent!" :success))
(p/catch (fn [e]
(notification/show! "Something wrong, please try again." :error)

View File

@@ -1,10 +1,14 @@
(ns frontend.worker.db-sync
"Simple db-sync client based on promesa + WebSocket."
(:require [clojure.data :as data]
(:require ["/frontend/idbkv" :as idb-keyval]
[clojure.data :as data]
[clojure.string :as string]
[datascript.core :as d]
[frontend.common.crypt :as crypt]
[frontend.worker-common.util :as worker-util]
[frontend.worker.handler.page :as worker-page]
[frontend.worker.rtc.client-op :as client-op]
[frontend.worker.rtc.const :as rtc-const]
[frontend.worker.shared-service :as shared-service]
[frontend.worker.state :as worker-state]
[lambdaisland.glogi :as log]
@@ -21,6 +25,8 @@
[promesa.core :as p]))
(defonce *repo->latest-remote-tx (atom {}))
(defonce ^:private *repo->aes-key (atom {}))
(defonce ^:private e2ee-store (delay (idb-keyval/newStore "localforage" "keyvaluepairs" 2)))
(defn- current-client
[repo]
@@ -33,20 +39,21 @@
(defn- sync-counts
[repo]
(let [pending-local (when-let [conn (client-ops-conn repo)]
(count (d/datoms @conn :avet :db-sync/created-at)))
pending-asset (client-op/get-unpushed-asset-ops-count repo)
local-tx (client-op/get-local-tx repo)
remote-tx (get @*repo->latest-remote-tx repo)
pending-server (when (and (number? local-tx) (number? remote-tx))
(max 0 (- remote-tx local-tx)))
graph-uuid (client-op/get-graph-uuid repo)]
{:pending-local pending-local
:pending-asset pending-asset
:pending-server pending-server
:local-tx local-tx
:remote-tx remote-tx
:graph-uuid graph-uuid}))
(when (worker-state/get-datascript-conn repo)
(let [pending-local (when-let [conn (client-ops-conn repo)]
(count (d/datoms @conn :avet :db-sync/created-at)))
pending-asset (client-op/get-unpushed-asset-ops-count repo)
local-tx (client-op/get-local-tx repo)
remote-tx (get @*repo->latest-remote-tx repo)
pending-server (when (and (number? local-tx) (number? remote-tx))
(max 0 (- remote-tx local-tx)))
graph-uuid (client-op/get-graph-uuid repo)]
{:pending-local pending-local
:pending-asset pending-asset
:pending-server pending-server
:local-tx local-tx
:remote-tx remote-tx
:graph-uuid graph-uuid})))
(defn- normalize-online-users
[users]
@@ -309,6 +316,333 @@
:url url
:body body}))))))
(def ^:private invalid-transit ::invalid-transit)
(declare encrypt-snapshot-rows decrypt-snapshot-rows)
(defn- try-read-transit [value]
(try
(ldb/read-transit-str value)
(catch :default _
invalid-transit)))
(defn- graph-e2ee?
[repo]
(when-let [conn (worker-state/get-datascript-conn repo)]
(true? (ldb/get-graph-rtc-e2ee? @conn))))
(defn- user-uuid []
(some-> (worker-state/get-id-token) worker-util/parse-jwt :sub))
(defn- graph-encrypted-aes-key-idb-key
[graph-id]
(str "rtc-encrypted-aes-key###" graph-id))
(defn- <get-item
[k]
(assert (and k @e2ee-store))
(p/let [r (idb-keyval/get k @e2ee-store)]
(js->clj r :keywordize-keys true)))
(defn- <set-item!
[k value]
(assert (and k @e2ee-store))
(idb-keyval/set k value @e2ee-store))
(defn- e2ee-base
[]
(http-base-url))
(defn- <fetch-user-rsa-key-pair-raw
[base]
(fetch-json (str base "/e2ee/user-keys")
{:method "GET"}
{:response-schema :e2ee/user-keys}))
(defn- <upload-user-rsa-key-pair!
[base public-key encrypted-private-key]
(let [body (coerce-http-request :e2ee/user-keys
{:public-key public-key
:encrypted-private-key encrypted-private-key})]
(when (nil? body)
(fail-fast :db-sync/invalid-field {:type :e2ee/user-keys :body body}))
(fetch-json (str base "/e2ee/user-keys")
{:method "POST"
:headers {"content-type" "application/json"}
:body (js/JSON.stringify (clj->js body))}
{:response-schema :e2ee/user-keys})))
(defn- <ensure-user-rsa-key-pair-raw
[base]
(p/let [existing (-> (<fetch-user-rsa-key-pair-raw base)
(p/catch (fn [error]
(throw error))))]
(if (and (string? (:public-key existing))
(string? (:encrypted-private-key existing)))
existing
(p/let [{:keys [publicKey privateKey]} (crypt/<generate-rsa-key-pair)
{:keys [password]} (worker-state/<invoke-main-thread :thread-api/request-e2ee-password)
encrypted-private-key (crypt/<encrypt-private-key password privateKey)
exported-public-key (crypt/<export-public-key publicKey)
public-key-str (ldb/write-transit-str exported-public-key)
encrypted-private-key-str (ldb/write-transit-str encrypted-private-key)]
(p/let [_ (<upload-user-rsa-key-pair! base public-key-str encrypted-private-key-str)]
{:public-key public-key-str
:encrypted-private-key encrypted-private-key-str})))))
(defn ensure-user-rsa-keys!
[]
(let [base (e2ee-base)]
(when-not (string? base)
(fail-fast :db-sync/missing-field {:base base}))
(<ensure-user-rsa-key-pair-raw base)))
(defn- <decrypt-private-key
[encrypted-private-key-str]
(p/let [encrypted-private-key (ldb/read-transit-str encrypted-private-key-str)
exported-private-key (worker-state/<invoke-main-thread
:thread-api/decrypt-user-e2ee-private-key
encrypted-private-key)]
(crypt/<import-private-key exported-private-key)))
(defn- <import-public-key
[public-key-str]
(p/let [exported (ldb/read-transit-str public-key-str)]
(crypt/<import-public-key exported)))
(defn- <fetch-user-public-key-by-email
[base email]
(fetch-json (str base "/e2ee/user-public-key?email=" (js/encodeURIComponent email))
{:method "GET"}
{:response-schema :e2ee/user-public-key}))
(defn- <fetch-graph-encrypted-aes-key-raw
[base graph-id]
(fetch-json (str base "/e2ee/graphs/" graph-id "/aes-key")
{:method "GET"}
{:response-schema :e2ee/graph-aes-key}))
(defn- <upsert-graph-encrypted-aes-key!
[base graph-id encrypted-aes-key]
(let [body (coerce-http-request :e2ee/graph-aes-key
{:encrypted-aes-key encrypted-aes-key})]
(when (nil? body)
(fail-fast :db-sync/invalid-field {:type :e2ee/graph-aes-key :body body}))
(fetch-json (str base "/e2ee/graphs/" graph-id "/aes-key")
{:method "POST"
:headers {"content-type" "application/json"}
:body (js/JSON.stringify (clj->js body))}
{:response-schema :e2ee/graph-aes-key})))
(defn- <ensure-graph-aes-key
[repo graph-id]
(if-not (graph-e2ee? repo)
(p/resolved nil)
(if-let [cached (get @*repo->aes-key repo)]
(p/resolved cached)
(let [base (e2ee-base)
user-id (user-uuid)]
(when-not (and (string? base) (string? user-id))
(fail-fast :db-sync/missing-field {:base base :user-id user-id :graph-id graph-id}))
(p/let [{:keys [public-key encrypted-private-key]} (<ensure-user-rsa-key-pair-raw base)
public-key' (when (string? public-key) (<import-public-key public-key))
private-key' (when (string? encrypted-private-key) (<decrypt-private-key encrypted-private-key))
local-encrypted (when graph-id
(<get-item (graph-encrypted-aes-key-idb-key graph-id)))
remote-encrypted (when (and (nil? local-encrypted) graph-id)
(p/let [resp (<fetch-graph-encrypted-aes-key-raw base graph-id)]
(when-let [encrypted-aes-key (:encrypted-aes-key resp)]
(ldb/read-transit-str encrypted-aes-key))))
encrypted-aes-key (or local-encrypted remote-encrypted)
aes-key (if encrypted-aes-key
(crypt/<decrypt-aes-key private-key' encrypted-aes-key)
(p/let [aes-key (crypt/<generate-aes-key)
encrypted (crypt/<encrypt-aes-key public-key' aes-key)
encrypted-str (ldb/write-transit-str encrypted)
_ (<upsert-graph-encrypted-aes-key! base graph-id encrypted-str)
_ (<set-item! (graph-encrypted-aes-key-idb-key graph-id) encrypted)]
aes-key))
_ (when (and graph-id encrypted-aes-key (nil? local-encrypted))
(<set-item! (graph-encrypted-aes-key-idb-key graph-id) encrypted-aes-key))]
(swap! *repo->aes-key assoc repo aes-key)
aes-key)))))
(defn- <fetch-graph-aes-key-for-download
[repo graph-id]
(let [base (e2ee-base)]
(when-not (and (string? base) (string? graph-id))
(fail-fast :db-sync/missing-field {:base base :graph-id graph-id}))
(p/let [{:keys [public-key encrypted-private-key]} (<fetch-user-rsa-key-pair-raw base)]
(when-not (and (string? public-key) (string? encrypted-private-key))
(fail-fast :db-sync/missing-field {:graph-id graph-id :field :user-rsa-key-pair}))
(p/let [private-key (<decrypt-private-key encrypted-private-key)
local-encrypted (<get-item (graph-encrypted-aes-key-idb-key graph-id))
remote-encrypted (when (nil? local-encrypted)
(p/let [resp (<fetch-graph-encrypted-aes-key-raw base graph-id)]
(when-let [encrypted-aes-key (:encrypted-aes-key resp)]
(ldb/read-transit-str encrypted-aes-key))))
encrypted-aes-key (or local-encrypted remote-encrypted)]
(when-not encrypted-aes-key
(fail-fast :db-sync/missing-field {:graph-id graph-id :field :encrypted-aes-key}))
(when (and encrypted-aes-key (nil? local-encrypted))
(<set-item! (graph-encrypted-aes-key-idb-key graph-id) encrypted-aes-key))
(p/let [aes-key (crypt/<decrypt-aes-key private-key encrypted-aes-key)]
(swap! *repo->aes-key assoc repo aes-key)
aes-key)))))
(defn <decrypt-kvs-rows
[repo graph-id rows e2ee?]
(if-not (true? e2ee?)
(p/resolved rows)
(p/let [aes-key (<fetch-graph-aes-key-for-download repo graph-id)
_ (when (nil? aes-key)
(fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))
rows* (decrypt-snapshot-rows aes-key rows)]
rows*)))
(defn- <grant-graph-access!
[repo graph-id target-email]
(if-not (graph-e2ee? repo)
(p/resolved nil)
(let [base (e2ee-base)]
(when-not (string? base)
(fail-fast :db-sync/missing-field {:base base :graph-id graph-id}))
(p/let [aes-key (<ensure-graph-aes-key repo graph-id)
_ (when (nil? aes-key)
(fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))
resp (<fetch-user-public-key-by-email base target-email)
public-key-str (:public-key resp)]
(if-not (string? public-key-str)
(fail-fast :db-sync/missing-field {:repo repo :field :public-key :email target-email})
(p/let [public-key (<import-public-key public-key-str)
encrypted (crypt/<encrypt-aes-key public-key aes-key)
encrypted-str (ldb/write-transit-str encrypted)
body (coerce-http-request :e2ee/grant-access
{:target-user-email+encrypted-aes-key-coll
[{:user/email target-email
:encrypted-aes-key encrypted-str}]})
_ (when (nil? body)
(fail-fast :db-sync/invalid-field {:type :e2ee/grant-access :body body}))
_ (fetch-json (str base "/e2ee/graphs/" graph-id "/grant-access")
{:method "POST"
:headers {"content-type" "application/json"}
:body (js/JSON.stringify (clj->js body))}
{:response-schema :e2ee/grant-access})]
nil))))))
(defn grant-graph-access!
[repo graph-id target-email]
(<grant-graph-access! repo graph-id target-email))
(defn- <encrypt-text-value
[aes-key value]
(p/let [text (ldb/write-transit-str value)
encrypted (crypt/<encrypt-text aes-key text)]
(ldb/write-transit-str encrypted)))
(defn- <decrypt-text-value
[aes-key value]
(if-not (string? value)
(p/resolved value)
(let [decoded (try-read-transit value)]
(if (= decoded invalid-transit)
(p/resolved value)
(p/let [decrypted (crypt/<decrypt-text-if-encrypted aes-key decoded)]
(if decrypted
(ldb/read-transit-str decrypted)
value))))))
(defn- encrypt-tx-item
[aes-key item]
(cond
(and (vector? item) (<= 4 (count item)))
(let [attr (nth item 2)
v (nth item 3)]
(if (and (contains? rtc-const/encrypt-attr-set attr)
(string? v))
(p/let [v' (<encrypt-text-value aes-key v)]
(assoc item 3 v'))
(p/resolved item)))
(map? item)
(let [attr (:a item)
v (:v item)]
(if (and (contains? rtc-const/encrypt-attr-set attr)
(string? v))
(p/let [v' (<encrypt-text-value aes-key v)]
(assoc item :v v'))
(p/resolved item)))
:else
(p/resolved item)))
(defn- decrypt-tx-item
[aes-key item]
(cond
(and (vector? item) (<= 4 (count item)))
(let [attr (nth item 2)
v (nth item 3)]
(if (and (contains? rtc-const/encrypt-attr-set attr)
(string? v))
(p/let [v' (<decrypt-text-value aes-key v)]
(assoc item 3 v'))
(p/resolved item)))
(map? item)
(let [attr (:a item)
v (:v item)]
(if (and (contains? rtc-const/encrypt-attr-set attr)
(string? v))
(p/let [v' (<decrypt-text-value aes-key v)]
(assoc item :v v'))
(p/resolved item)))
:else
(p/resolved item)))
(defn- encrypt-tx-data
[aes-key tx-data]
(if-not (seq tx-data)
(p/resolved [])
(p/let [items (p/all (mapv (fn [item] (encrypt-tx-item aes-key item)) tx-data))]
(vec items))))
(defn- decrypt-tx-data
[aes-key tx-data]
(if-not (seq tx-data)
(p/resolved [])
(p/let [items (p/all (mapv (fn [item] (decrypt-tx-item aes-key item)) tx-data))]
(vec items))))
(defn- encrypt-snapshot-rows
[aes-key rows]
(if-not (seq rows)
(p/resolved [])
(p/let [items (p/all
(mapv (fn [[addr content addresses]]
(let [data (try-read-transit content)]
(if (and (not= data invalid-transit) (map? data))
(p/let [data' (crypt/<encrypt-map aes-key rtc-const/encrypt-attr-set data)
content' (ldb/write-transit-str data')]
[addr content' addresses])
(p/resolved [addr content addresses]))))
rows))]
(vec items))))
(defn- decrypt-snapshot-rows
[aes-key rows]
(if-not (seq rows)
(p/resolved [])
(p/let [items (p/all
(mapv (fn [[addr content addresses]]
(let [data (try-read-transit content)]
(if (and (not= data invalid-transit) (map? data))
(p/let [data' (crypt/<decrypt-map aes-key rtc-const/encrypt-attr-set data)
content' (ldb/write-transit-str data')]
[addr content' addresses])
(p/resolved [addr content addresses]))))
rows))]
(vec items))))
(defn- require-asset-field
[repo field value context]
(when (or (nil? value) (and (string? value) (string/blank? value)))
@@ -414,10 +748,16 @@
;; (prn :debug :before-keep-last-update txs)
;; (prn :debug :upload :tx-data tx-data)
(when (seq txs)
(reset! (:inflight client) tx-ids)
(send! ws {:type "tx/batch"
:t-before local-tx
:txs (sqlite-util/write-transit-str tx-data)})))))))))))
(p/let [aes-key (<ensure-graph-aes-key repo (:graph-id client))
_ (when (and (graph-e2ee? repo) (nil? aes-key))
(fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))
tx-data* (if aes-key
(encrypt-tx-data aes-key tx-data)
(p/resolved tx-data))]
(reset! (:inflight client) tx-ids)
(send! ws {:type "tx/batch"
:t-before local-tx
:txs (sqlite-util/write-transit-str tx-data*)}))))))))))))
(defn- ensure-client-state! [repo]
(let [client {:repo repo
@@ -829,12 +1169,18 @@
txs)
tx (distinct (mapcat identity txs-data))]
(when (seq tx)
(apply-remote-tx! repo client tx
:local-tx local-tx
:remote-tx remote-tx)
(client-op/update-local-tx repo remote-tx)
(broadcast-rtc-state! client)
(flush-pending! repo client))))
(p/let [aes-key (<ensure-graph-aes-key repo (:graph-id client))
_ (when (and (graph-e2ee? repo) (nil? aes-key))
(fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))
tx* (if aes-key
(decrypt-tx-data aes-key tx)
(p/resolved tx))]
(apply-remote-tx! repo client tx*
:local-tx local-tx
:remote-tx remote-tx)
(client-op/update-local-tx repo remote-tx)
(broadcast-rtc-state! client)
(flush-pending! repo client)))))
"changed" (do
(require-non-negative remote-tx {:repo repo :type "changed"})
(broadcast-rtc-state! client)
@@ -1010,40 +1356,11 @@
(.set out bytes 4)
out))
(defn- snapshot-upload-stream [db]
(let [state (volatile! {:after -1 :done? false})]
(js/ReadableStream.
#js {:pull (fn [controller]
(p/let [{:keys [after done?]} @state]
(if done?
(.close controller)
(let [rows (fetch-kvs-rows db after upload-kvs-batch-size)]
(if (empty? rows)
(.close controller)
(let [rows (normalize-snapshot-rows rows)
last-addr (apply max (map first rows))
done? (< (count rows) upload-kvs-batch-size)
payload (encode-snapshot-rows rows)
framed (frame-bytes payload)]
(.enqueue controller framed)
(vswap! state assoc :after last-addr :done? done?)))))))})))
(defn- maybe-compress-stream [stream]
(if (exists? js/CompressionStream)
(.pipeThrough stream (js/CompressionStream. "gzip"))
stream))
(defn- should-buffer-snapshot-upload?
[base]
(when (string? base)
(try
(let [url (js/URL. base)
host (.-hostname url)]
(and (= "http:" (.-protocol url))
(contains? #{"localhost" "127.0.0.1"} host)))
(catch :default _
false))))
(defn- <buffer-stream
[stream]
(p/let [resp (js/Response. stream)
@@ -1064,13 +1381,21 @@
{:body buf :encoding snapshot-content-encoding})
(p/resolved {:body frame :encoding nil}))))
(defn- set-graph-e2ee-enabled!
[repo]
(when-let [conn (worker-state/get-datascript-conn repo)]
(ldb/transact! conn [(ldb/kv :logseq.kv/graph-rtc-e2ee? true)])))
(defn upload-graph!
[repo]
(let [base (http-base-url)
graph-id (get-graph-id repo)]
(if (and (seq base) (seq graph-id))
(if-let [db (worker-state/get-sqlite-conn repo :db)]
(do
(p/let [aes-key (<ensure-graph-aes-key repo graph-id)
_ (when (and (graph-e2ee? repo) (nil? aes-key))
(fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))]
(set-graph-e2ee-enabled! repo)
(ensure-client-graph-uuid! repo graph-id)
(p/loop [last-addr -1
first-batch? true]
@@ -1084,7 +1409,10 @@
(let [max-addr (apply max (map first rows))
rows (normalize-snapshot-rows rows)
upload-url (str base "/sync/" graph-id "/snapshot/upload?reset=" (if first-batch? "true" "false"))]
(p/let [{:keys [body encoding]} (<snapshot-upload-body rows)
(p/let [rows* (if aes-key
(encrypt-snapshot-rows aes-key rows)
(p/resolved rows))
{:keys [body encoding]} (<snapshot-upload-body rows*)
headers (cond-> {"content-type" snapshot-content-type}
(string? encoding) (assoc "content-encoding" encoding))
_ (fetch-json upload-url

View File

@@ -429,6 +429,14 @@
[editing-block-uuid]
(db-sync/update-presence! editing-block-uuid))
(def-thread-api :thread-api/db-sync-grant-graph-access
[repo graph-id target-email]
(db-sync/grant-graph-access! repo graph-id target-email))
(def-thread-api :thread-api/db-sync-ensure-user-rsa-keys
[]
(db-sync/ensure-user-rsa-keys!))
(def-thread-api :thread-api/db-sync-upload-graph
[repo]
(db-sync/upload-graph! repo))
@@ -607,12 +615,15 @@
nil)
(def-thread-api :thread-api/db-sync-import-kvs-rows
[repo rows reset?]
[repo rows reset? graph-id e2ee?]
(p/let [_ (when reset?
(close-db! repo))
rows* (if (true? e2ee?)
(db-sync/<decrypt-kvs-rows repo graph-id rows e2ee?)
(p/resolved rows))
db (ensure-db-sync-import-db! repo reset?)]
(when (seq rows)
(upsert-addr-content! db (rows->sqlite-binds rows)))
(when (seq rows*)
(upsert-addr-content! db (rows->sqlite-binds rows*)))
nil))
(def-thread-api :thread-api/db-sync-finalize-kvs-import