refactor: keep worker as thin as possible

This commit is contained in:
Tienson Qin
2026-01-26 22:19:05 +08:00
parent cd0a2568b1
commit f46c245788
16 changed files with 1185 additions and 1105 deletions

View File

@@ -18,16 +18,16 @@
(->uint8 (transit/write transit-w rows)))
(defn decode-rows
[bytes]
(transit/read transit-r (.decode text-decoder (->uint8 bytes))))
[payload]
(transit/read transit-r (.decode text-decoder (->uint8 payload))))
(defn frame-bytes
[^js bytes]
(let [len (.-byteLength bytes)
[^js payload]
(let [len (.-byteLength payload)
out (js/Uint8Array. (+ 4 len))
view (js/DataView. (.-buffer out))]
(.setUint32 view 0 len false)
(.set out bytes 4)
(.set out payload 4)
out))
(defn concat-bytes

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,37 @@
(ns logseq.db-sync.worker.auth
(:require [clojure.string :as string]
[logseq.common.authorization :as authorization]))
(defn- bearer-token [auth-header]
(when (and (string? auth-header) (string/starts-with? auth-header "Bearer "))
(subs auth-header 7)))
(defn token-from-request [request]
(or (bearer-token (.get (.-headers request) "authorization"))
(let [url (js/URL. (.-url request))]
(.get (.-searchParams url) "token"))))
(defn- decode-jwt-part [part]
(let [pad (if (pos? (mod (count part) 4))
(apply str (repeat (- 4 (mod (count part) 4)) "="))
"")
base64 (-> (str part pad)
(string/replace "-" "+")
(string/replace "_" "/"))
raw (js/atob base64)]
(js/JSON.parse raw)))
(defn unsafe-jwt-claims [token]
(try
(when (string? token)
(let [parts (string/split token #"\.")]
(when (= 3 (count parts))
(decode-jwt-part (nth parts 1)))))
(catch :default _
nil)))
(defn auth-claims [request env]
(let [token (token-from-request request)]
(if (string? token)
(.catch (authorization/verify-jwt token env) (fn [_] nil))
(js/Promise.resolve nil))))

View File

@@ -0,0 +1,12 @@
(ns logseq.db-sync.worker.coerce
(:require [lambdaisland.glogi :as log]))
(def invalid-coerce ::invalid-coerce)
(defn coerce
[coercer value context]
(try
(coercer value)
(catch :default e
(log/error :db-sync/malli-coerce-failed (merge context {:error e :value value}))
invalid-coerce)))

View File

@@ -0,0 +1,67 @@
(ns logseq.db-sync.worker.dispatch
(:require [clojure.string :as string]
[logseq.db-sync.common :as common]
[logseq.db-sync.worker.handler.assets :as assets-handler]
[logseq.db-sync.worker.handler.index :as index-handler]
[logseq.db-sync.worker.http :as http]
[promesa.core :as p]))
(defn handle-worker-fetch [request ^js env]
(let [url (js/URL. (.-url request))
path (.-pathname url)
method (.-method request)]
(cond
(= path "/health")
(http/json-response :worker/health {:ok true})
(or (= path "/graphs")
(string/starts-with? path "/graphs/"))
(index-handler/handle-fetch #js {:env env :d1 (aget env "DB")} request)
(string/starts-with? path "/e2ee")
(index-handler/handle-fetch #js {:env env :d1 (aget env "DB")} request)
(string/starts-with? path "/assets/")
(if (= method "OPTIONS")
(assets-handler/handle request env)
(if-let [{:keys [graph-id]} (assets-handler/parse-asset-path path)]
(p/let [access-resp (index-handler/graph-access-response request env graph-id)]
(if (.-ok access-resp)
(assets-handler/handle request env)
access-resp))
(http/bad-request "invalid asset path")))
(= method "OPTIONS")
(common/options-response)
(string/starts-with? path "/sync/")
(let [prefix (count "/sync/")
rest-path (subs path prefix)
rest-path (if (string/starts-with? rest-path "/")
(subs rest-path 1)
rest-path)
slash-idx (or (string/index-of rest-path "/") -1)
graph-id (if (neg? slash-idx) rest-path (subs rest-path 0 slash-idx))
tail (if (neg? slash-idx)
"/"
(subs rest-path slash-idx))
new-url (js/URL. (str (.-origin url) tail (.-search url)))]
(if (seq graph-id)
(if (= method "OPTIONS")
(common/options-response)
(p/let [access-resp (index-handler/graph-access-response request env graph-id)]
(if (.-ok access-resp)
(let [^js namespace (.-LOGSEQ_SYNC_DO env)
do-id (.idFromName namespace graph-id)
stub (.get namespace do-id)]
(if (common/upgrade-request? request)
(.fetch stub request)
(do
(.set (.-searchParams new-url) "graph-id" graph-id)
(let [rewritten (js/Request. (.toString new-url) request)]
(.fetch stub rewritten)))))
access-resp)))
(http/bad-request "missing graph id")))
:else
(http/not-found))))

View File

@@ -0,0 +1,87 @@
(ns logseq.db-sync.worker.handler.assets
(:require [cljs-bean.core :as bean]
[clojure.string :as string]
[logseq.db-sync.common :as common :refer [cors-headers]]
[logseq.db-sync.worker.http :as http]))
(def ^:private max-asset-size (* 100 1024 1024))
(defn parse-asset-path [path]
(let [prefix "/assets/"]
(when (string/starts-with? path prefix)
(let [rest-path (subs path (count prefix))
slash-idx (string/index-of rest-path "/")
graph-id (when (and slash-idx (pos? slash-idx)) (subs rest-path 0 slash-idx))
file (when (and slash-idx (pos? slash-idx)) (subs rest-path (inc slash-idx)))
dot-idx (when file (string/last-index-of file "."))
asset-uuid (when (and dot-idx (pos? dot-idx)) (subs file 0 dot-idx))
asset-type (when (and dot-idx (pos? dot-idx)) (subs file (inc dot-idx)))]
(when (and (seq graph-id) (seq asset-uuid) (seq asset-type))
{:graph-id graph-id
:asset-uuid asset-uuid
:asset-type asset-type
:key (str graph-id "/" asset-uuid "." asset-type)})))))
(defn handle [request ^js env]
(let [url (js/URL. (.-url request))
path (.-pathname url)
method (.-method request)]
(cond
(= method "OPTIONS")
(js/Response. nil #js {:status 204 :headers (cors-headers)})
:else
(if-let [{:keys [key asset-type]} (parse-asset-path path)]
(let [^js bucket (.-LOGSEQ_SYNC_ASSETS env)]
(if-not bucket
(http/error-response "missing assets bucket" 500)
(case method
"GET"
(.then (.get bucket key)
(fn [^js obj]
(if (nil? obj)
(http/error-response "not found" 404)
(let [metadata (.-httpMetadata obj)
content-type (or (.-contentType metadata)
"application/octet-stream")
content-encoding (.-contentEncoding metadata)
cache-control (.-cacheControl metadata)
headers (cond-> {"content-type" content-type
"x-asset-type" asset-type}
(and (string? content-encoding)
(not= content-encoding "null")
(pos? (.-length content-encoding)))
(assoc "content-encoding" content-encoding)
(and (string? cache-control)
(pos? (.-length cache-control)))
(assoc "cache-control" cache-control)
true
(bean/->js))]
(js/Response. (.-body obj)
#js {:status 200
:headers (js/Object.assign
headers
(cors-headers))})))))
"PUT"
(.then (.arrayBuffer request)
(fn [buf]
(if (> (.-byteLength buf) max-asset-size)
(http/error-response "asset too large" 413)
(.then (.put bucket
key
buf
#js {:httpMetadata #js {:contentType (or (.get (.-headers request) "content-type")
"application/octet-stream")}
:customMetadata #js {:checksum (.get (.-headers request) "x-amz-meta-checksum")
:type asset-type}})
(fn [_]
(http/json-response :assets/put {:ok true} 200))))))
"DELETE"
(.then (.delete bucket key)
(fn [_]
(http/json-response :assets/delete {:ok true} 200)))
(http/error-response "method not allowed" 405))))
(http/error-response "invalid asset path" 400)))))

View File

@@ -0,0 +1,331 @@
(ns logseq.db-sync.worker.handler.index
(:require [lambdaisland.glogi :as log]
[logseq.db-sync.common :as common]
[logseq.db-sync.index :as index]
[logseq.db-sync.worker.auth :as auth]
[logseq.db-sync.worker.http :as http]
[logseq.db-sync.worker.routes.index :as routes]
[promesa.core :as p]))
(defn- index-db [^js self]
(let [db (.-d1 self)]
(when-not db
(log/error :db-sync/index-db-missing {:binding "DB"}))
db))
(defn handle [{:keys [db ^js env request url claims route]}]
(let [path-params (:path-params route)
graph-id (:graph-id path-params)
member-id (:member-id path-params)
user-id (aget claims "sub")]
(case (:handler route)
:graphs/list
(if (string? user-id)
(p/let [graphs (index/<index-list db user-id)]
(http/json-response :graphs/list {:graphs graphs}))
(http/unauthorized))
:graphs/create
(.then (common/read-json request)
(fn [result]
(if (nil? result)
(http/bad-request "missing body")
(let [body (js->clj result :keywordize-keys true)
body (http/coerce-http-request :graphs/create body)
graph-id (str (random-uuid))]
(cond
(not (string? user-id))
(http/unauthorized)
(nil? body)
(http/bad-request "invalid body")
:else
(p/let [{:keys [graph-name schema-version]} body
name-exists? (index/<graph-name-exists? db graph-name user-id)]
(if name-exists?
(http/bad-request "duplicate graph name")
(p/let [_ (index/<index-upsert! db graph-id graph-name user-id schema-version)
_ (index/<graph-member-upsert! db graph-id user-id "manager" user-id)]
(http/json-response :graphs/create {:graph-id graph-id})))))))))
:graphs/access
(cond
(not (string? user-id))
(http/unauthorized)
:else
(p/let [owns? (index/<user-has-access-to-graph? db graph-id user-id)]
(if owns?
(http/json-response :graphs/access {:ok true})
(http/forbidden))))
:graph-members/list
(cond
(not (string? user-id))
(http/unauthorized)
:else
(p/let [can-access? (index/<user-has-access-to-graph? db graph-id user-id)]
(if (not can-access?)
(http/forbidden)
(p/let [members (index/<graph-members-list db graph-id)]
(http/json-response :graph-members/list {:members members})))))
:graph-members/create
(cond
(not (string? user-id))
(http/unauthorized)
:else
(.then (common/read-json request)
(fn [result]
(if (nil? result)
(http/bad-request "missing body")
(let [body (js->clj result :keywordize-keys true)
body (http/coerce-http-request :graph-members/create body)
member-id (:user-id body)
email (:email body)
role (or (:role body) "member")]
(cond
(nil? body)
(http/bad-request "invalid body")
(and (not (string? member-id))
(not (string? email)))
(http/bad-request "invalid user")
:else
(p/let [manager? (index/<user-is-manager? db graph-id user-id)
resolved-id (if (string? member-id)
(p/resolved member-id)
(index/<user-id-by-email db email))]
(if (not manager?)
(http/forbidden)
(if-not (string? resolved-id)
(http/bad-request "user not found")
(p/let [_ (index/<graph-member-upsert! db graph-id resolved-id role user-id)]
(http/json-response :graph-members/create {:ok true})))))))))))
:graph-members/update
(cond
(not (string? user-id))
(http/unauthorized)
(not (string? member-id))
(http/bad-request "invalid user id")
:else
(.then (common/read-json request)
(fn [result]
(if (nil? result)
(http/bad-request "missing body")
(let [body (js->clj result :keywordize-keys true)
body (http/coerce-http-request :graph-members/update body)
role (:role body)]
(cond
(nil? body)
(http/bad-request "invalid body")
:else
(p/let [manager? (index/<user-is-manager? db graph-id user-id)]
(if (not manager?)
(http/forbidden)
(p/let [_ (index/<graph-member-update-role! db graph-id member-id role)]
(http/json-response :graph-members/update {:ok true}))))))))))
:graph-members/delete
(cond
(not (string? user-id))
(http/unauthorized)
(not (string? member-id))
(http/bad-request "invalid user id")
:else
(p/let [manager? (index/<user-is-manager? db graph-id user-id)
target-role (index/<graph-member-role db graph-id member-id)
self-leave? (and (= user-id member-id)
(= "member" target-role))]
(cond
(and manager? (not= "manager" target-role))
(p/let [_ (index/<graph-member-delete! db graph-id member-id)]
(http/json-response :graph-members/delete {:ok true}))
self-leave?
(p/let [_ (index/<graph-member-delete! db graph-id member-id)]
(http/json-response :graph-members/delete {:ok true}))
:else
(http/forbidden))))
:graphs/delete
(cond
(not (seq graph-id))
(http/bad-request "missing graph id")
(not (string? user-id))
(http/unauthorized)
:else
(p/let [owns? (index/<user-has-access-to-graph? db graph-id user-id)]
(if (not owns?)
(http/forbidden)
(p/let [_ (index/<index-delete! db graph-id)]
(let [^js namespace (.-LOGSEQ_SYNC_DO env)
do-id (.idFromName namespace graph-id)
stub (.get namespace do-id)
reset-url (str (.-origin url) "/admin/reset")]
(.fetch stub (js/Request. reset-url #js {:method "DELETE"})))
(http/json-response :graphs/delete {:graph-id graph-id :deleted true})))))
:e2ee/user-keys-get
(if (string? user-id)
(p/let [pair (index/<user-rsa-key-pair db user-id)]
(http/json-response :e2ee/user-keys (or pair {})))
(http/unauthorized))
:e2ee/user-keys-post
(.then (common/read-json request)
(fn [result]
(if (nil? result)
(http/bad-request "missing body")
(let [body (js->clj result :keywordize-keys true)
body (http/coerce-http-request :e2ee/user-keys body)]
(cond
(not (string? user-id))
(http/unauthorized)
(nil? body)
(http/bad-request "invalid body")
:else
(let [{:keys [public-key encrypted-private-key]} body]
(p/let [_ (index/<user-rsa-key-pair-upsert! db user-id public-key encrypted-private-key)]
(http/json-response :e2ee/user-keys {:public-key public-key
:encrypted-private-key encrypted-private-key}))))))))
:e2ee/user-public-key-get
(let [email (.get (.-searchParams url) "email")]
(p/let [public-key (index/<user-rsa-public-key-by-email db email)]
(http/json-response :e2ee/user-public-key
(cond-> {}
(some? public-key)
(assoc :public-key public-key)))))
:e2ee/graph-aes-key-get
(cond
(not (string? user-id))
(http/unauthorized)
:else
(p/let [access? (index/<user-has-access-to-graph? db graph-id user-id)]
(if (not access?)
(http/forbidden)
(p/let [encrypted-aes-key (index/<graph-encrypted-aes-key db graph-id user-id)]
(http/json-response :e2ee/graph-aes-key (cond-> {}
(some? encrypted-aes-key)
(assoc :encrypted-aes-key encrypted-aes-key)))))))
:e2ee/graph-aes-key-post
(cond
(not (string? user-id))
(http/unauthorized)
:else
(.then (common/read-json request)
(fn [result]
(if (nil? result)
(http/bad-request "missing body")
(let [body (js->clj result :keywordize-keys true)
body (http/coerce-http-request :e2ee/graph-aes-key body)]
(if (nil? body)
(http/bad-request "invalid body")
(p/let [access? (index/<user-has-access-to-graph? db graph-id user-id)]
(if (not access?)
(http/forbidden)
(let [{:keys [encrypted-aes-key]} body]
(p/let [_ (index/<graph-encrypted-aes-key-upsert! db graph-id user-id encrypted-aes-key)]
(http/json-response :e2ee/graph-aes-key {:encrypted-aes-key encrypted-aes-key})))))))))))
:e2ee/grant-access
(if (not (string? user-id))
(http/unauthorized)
(.then (common/read-json request)
(fn [result]
(if (nil? result)
(http/bad-request "missing body")
(let [body (js->clj result :keywordize-keys true)
body (http/coerce-http-request :e2ee/grant-access body)]
(if (nil? body)
(http/bad-request "invalid body")
(p/let [manager? (index/<user-is-manager? db graph-id user-id)]
(if (not manager?)
(http/forbidden)
(let [entries (:target-user-email+encrypted-aes-key-coll body)
missing (atom [])]
(p/let [_ (p/all
(map (fn [entry]
(let [email (:email entry)
encrypted-aes-key (:encrypted-aes-key entry)]
(p/let [target-user-id (index/<user-id-by-email db email)
access? (and target-user-id
(index/<user-has-access-to-graph? db graph-id target-user-id))]
(if (and target-user-id access?)
(index/<graph-encrypted-aes-key-upsert! db graph-id target-user-id encrypted-aes-key)
(swap! missing conj email)))))
entries))]
(http/json-response :e2ee/grant-access
(cond-> {:ok true}
(seq @missing)
(assoc :missing-users @missing)))))))))))))
(http/not-found))))
(defn handle-fetch [^js self request]
(let [db (index-db self)
env (.-env self)
url (js/URL. (.-url request))
path (.-pathname url)
method (.-method request)]
(try
(cond
(contains? #{"OPTIONS" "HEAD"} method)
(common/options-response)
(nil? db)
(http/error-response "server error" 500)
:else
(p/let [_ (index/<index-init! db)
claims (auth/auth-claims request env)
_ (when claims
(index/<user-upsert! db claims))]
(let [route (routes/match-route method path)]
(cond
(nil? claims)
(http/unauthorized)
route
(handle {:db db
:env env
:request request
:url url
:claims claims
:route route})
:else
(http/not-found)))))
(catch :default error
(log/error :db-sync/index-error error)
(http/error-response "server error" 500)))))
(defn graph-access-response [request env graph-id]
(let [token (auth/token-from-request request)
url (js/URL. (.-url request))
access-url (str (.-origin url) "/graphs/" graph-id "/access")
headers (js/Headers. (.-headers request))
index-self #js {:env env :d1 (aget env "DB")}]
(when (string? token)
(.set headers "authorization" (str "Bearer " token)))
(handle-fetch index-self (js/Request. access-url #js {:method "GET" :headers headers}))))

View File

@@ -0,0 +1,406 @@
(ns logseq.db-sync.worker.handler.sync
(:require [clojure.string :as string]
[lambdaisland.glogi :as log]
[logseq.db :as ldb]
[logseq.db-sync.batch :as batch]
[logseq.db-sync.common :as common]
[logseq.db-sync.protocol :as protocol]
[logseq.db-sync.snapshot :as snapshot]
[logseq.db-sync.storage :as storage]
[logseq.db-sync.worker.http :as http]
[logseq.db-sync.worker.routes.sync :as sync-routes]
[logseq.db-sync.worker.ws :as ws]
[promesa.core :as p]))
(def ^:private snapshot-download-batch-size 500)
(def ^:private snapshot-cache-control "private, max-age=300")
(def ^:private snapshot-content-type "application/transit+json")
(def ^:private snapshot-content-encoding "gzip")
;; 10m
(def ^:private snapshot-multipart-part-size (* 10 1024 1024))
(defn parse-int [value]
(when (some? value)
(let [n (js/parseInt value 10)]
(when-not (js/isNaN n)
n))))
(defn- ensure-schema! [^js self]
(when-not (true? (.-schema-ready self))
(storage/init-schema! (.-sql self))
(set! (.-schema-ready self) true)))
(defn- ensure-conn! [^js self]
(ensure-schema! self)
(when-not (.-conn self)
(set! (.-conn self) (storage/open-conn (.-sql self)))))
(defn t-now [^js self]
(ensure-schema! self)
(storage/get-t (.-sql self)))
(defn- fetch-kvs-rows
[sql after limit]
(common/get-sql-rows
(common/sql-exec sql
"select addr, content, addresses from kvs where addr > ? order by addr asc limit ?"
after
limit)))
(defn- snapshot-row->tuple [row]
(if (array? row)
[(aget row 0) (aget row 1) (aget row 2)]
[(aget row "addr") (aget row "content") (aget row "addresses")]))
(defn- import-snapshot-rows!
[sql table rows]
(when (seq rows)
(doseq [batch (batch/rows->insert-batches table rows nil)]
(let [sql-str (:sql batch)
args (:args batch)]
(apply common/sql-exec sql sql-str args)))))
(defn- reset-import!
[sql]
(common/sql-exec sql "delete from kvs")
(common/sql-exec sql "delete from tx_log")
(common/sql-exec sql "delete from sync_meta")
(storage/set-t! sql 0))
(defn- graph-id-from-request [request]
(let [header-id (.get (.-headers request) "x-graph-id")
url (js/URL. (.-url request))
param-id (.get (.-searchParams url) "graph-id")]
(when (seq (or header-id param-id))
(or header-id param-id))))
(defn- snapshot-key [graph-id snapshot-id]
(str graph-id "/" snapshot-id ".snapshot"))
(defn- snapshot-url [request graph-id snapshot-id]
(let [url (js/URL. (.-url request))]
(str (.-origin url) "/assets/" graph-id "/" snapshot-id ".snapshot")))
(defn- maybe-decompress-stream [stream encoding]
(if (and (= encoding snapshot-content-encoding) (exists? js/DecompressionStream))
(.pipeThrough stream (js/DecompressionStream. "gzip"))
stream))
(defn- ->uint8 [data]
(cond
(instance? js/Uint8Array data) data
(instance? js/ArrayBuffer data) (js/Uint8Array. data)
:else (js/Uint8Array. data)))
(defn- concat-uint8 [^js a ^js b]
(cond
(nil? a) b
(nil? b) a
:else
(let [out (js/Uint8Array. (+ (.-byteLength a) (.-byteLength b)))]
(.set out a 0)
(.set out b (.-byteLength a))
out)))
(defn- snapshot-export-stream [^js self]
(let [sql (.-sql self)
state (volatile! {:after -1 :done? false})]
(js/ReadableStream.
#js {:pull (fn [controller]
(p/let [{:keys [after done?]} @state]
(if done?
(.close controller)
(let [rows (fetch-kvs-rows sql after snapshot-download-batch-size)
rows (mapv snapshot-row->tuple rows)
last-addr (if (seq rows)
(apply max (map first rows))
after)
done? (< (count rows) snapshot-download-batch-size)]
(when (seq rows)
(let [payload (snapshot/encode-rows rows)
framed (snapshot/frame-bytes payload)]
(.enqueue controller framed)))
(vswap! state assoc :after last-addr :done? done?)))))})))
(defn- upload-multipart!
[^js bucket key stream opts]
(p/let [^js upload (.createMultipartUpload bucket key opts)]
(let [reader (.getReader stream)]
(-> (p/loop [buffer nil
part-number 1
parts []]
(p/let [chunk (.read reader)]
(if (.-done chunk)
(cond
(and buffer (pos? (.-byteLength buffer)))
(p/let [^js resp (.uploadPart upload part-number buffer)
parts (conj parts {:partNumber part-number :etag (.-etag resp)})]
(p/let [_ (.complete upload (clj->js parts))]
{:ok true}))
(seq parts)
(p/let [_ (.complete upload (clj->js parts))]
{:ok true})
:else
(p/let [_ (.abort upload)]
(.put bucket key (js/Uint8Array. 0) opts)))
(let [value (.-value chunk)
buffer (concat-uint8 buffer (->uint8 value))]
(if (>= (.-byteLength buffer) snapshot-multipart-part-size)
(let [part (.slice buffer 0 snapshot-multipart-part-size)
rest-parts (.slice buffer snapshot-multipart-part-size (.-byteLength buffer))]
(p/let [^js resp (.uploadPart upload part-number part)
parts (conj parts {:partNumber part-number :etag (.-etag resp)})]
(p/recur rest-parts (inc part-number) parts)))
(p/recur buffer part-number parts))))))
(p/catch (fn [error]
(.abort upload)
(throw error)))))))
(defn- snapshot-export-length [^js self]
(let [sql (.-sql self)]
(p/loop [after -1
total 0]
(let [rows (fetch-kvs-rows sql after snapshot-download-batch-size)]
(if (empty? rows)
total
(let [rows (mapv snapshot-row->tuple rows)
payload (snapshot/encode-rows rows)
total (+ total 4 (.-byteLength payload))
last-addr (apply max (map first rows))
done? (< (count rows) snapshot-download-batch-size)]
(if done?
total
(p/recur last-addr total))))))))
(defn- snapshot-export-fixed-length [^js self]
(p/let [length (snapshot-export-length self)
stream (snapshot-export-stream self)]
(if (exists? js/FixedLengthStream)
(let [^js fixed (js/FixedLengthStream. length)
readable (.-readable fixed)
writable (.-writable fixed)
reader (.getReader stream)
writer (.getWriter writable)]
(p/let [_ (p/loop []
(p/let [chunk (.read reader)]
(if (.-done chunk)
(.close writer)
(p/let [_ (.write writer (.-value chunk))]
(p/recur)))))]
readable))
(p/let [resp (js/Response. stream)
buf (.arrayBuffer resp)]
buf))))
(declare import-snapshot!)
(defn- import-snapshot-stream! [^js self stream reset?]
(let [reader (.getReader stream)
reset-pending? (volatile! reset?)
total-count (volatile! 0)]
(p/let [buffer nil]
(p/catch
(p/loop [buffer buffer]
(p/let [chunk (.read reader)]
(if (.-done chunk)
(let [rows (snapshot/finalize-framed-buffer buffer)
rows-count (count rows)
reset? (and @reset-pending? true)]
(when (or reset? (seq rows))
(import-snapshot! self rows reset?)
(vreset! reset-pending? false))
(vswap! total-count + rows-count)
@total-count)
(let [value (.-value chunk)
{:keys [rows buffer]} (snapshot/parse-framed-chunk buffer value)
rows-count (count rows)
reset? (and @reset-pending? (seq rows))]
(when (seq rows)
(import-snapshot! self rows (true? reset?))
(vreset! reset-pending? false))
(vswap! total-count + rows-count)
(p/recur buffer)))))
(fn [error]
(throw error))))))
(defn pull-response [^js self since]
(let [sql (.-sql self)
txs (storage/fetch-tx-since sql since)
response {:type "pull/ok"
:t (t-now self)
:txs txs}]
response))
(defn- import-snapshot! [^js self rows reset?]
(let [sql (.-sql self)]
(ensure-schema! self)
(when reset?
(reset-import! sql))
(import-snapshot-rows! sql "kvs" rows)))
(defn- apply-tx! [^js self sender txs]
(let [sql (.-sql self)]
(ensure-conn! self)
(let [conn (.-conn self)
tx-data (protocol/transit->tx txs)]
(ldb/transact! conn tx-data {:op :apply-client-tx})
(let [new-t (storage/get-t sql)]
;; FIXME: no need to broadcast if client tx is less than remote tx
(ws/broadcast! self sender {:type "changed" :t new-t})
new-t))))
(defn handle-tx-batch! [^js self sender txs t-before]
(let [current-t (t-now self)]
(cond
(or (not (number? t-before)) (neg? t-before))
{:type "tx/reject"
:reason "invalid t-before"}
(not= t-before current-t)
{:type "tx/reject"
:reason "stale"
:t current-t}
:else
(if txs
(let [new-t (apply-tx! self sender txs)]
(if (and (map? new-t) (= "tx/reject" (:type new-t)))
new-t
{:type "tx/batch/ok"
:t new-t}))
{:type "tx/reject"
:reason "empty tx data"}))))
(defn handle [{:keys [^js self request url route]}]
(case (:handler route)
:sync/health
(http/json-response :sync/health {:ok true})
:sync/pull
(let [raw-since (.get (.-searchParams url) "since")
since (if (some? raw-since) (parse-int raw-since) 0)]
(if (or (and (some? raw-since) (not (number? since))) (neg? since))
(http/bad-request "invalid since")
(http/json-response :sync/pull (pull-response self since))))
:sync/snapshot-download
(let [graph-id (graph-id-from-request request)
^js bucket (.-LOGSEQ_SYNC_ASSETS (.-env self))]
(cond
(not (seq graph-id))
(http/bad-request "missing graph id")
(nil? bucket)
(http/error-response "missing assets bucket" 500)
:else
(p/let [snapshot-id (str (random-uuid))
key (snapshot-key graph-id snapshot-id)
stream (snapshot-export-stream self)
multipart? (and (some? (.-createMultipartUpload bucket))
(fn? (.-createMultipartUpload bucket)))
opts #js {:httpMetadata #js {:contentType snapshot-content-type
:contentEncoding nil
:cacheControl snapshot-cache-control}
:customMetadata #js {:purpose "snapshot"
:created-at (str (common/now-ms))}}
_ (if multipart?
(upload-multipart! bucket key stream opts)
(p/let [body (snapshot-export-fixed-length self)]
(.put bucket key body opts)))
url (snapshot-url request graph-id snapshot-id)]
(http/json-response :sync/snapshot-download {:ok true
:key key
:url url
:content-encoding nil}))))
:sync/admin-reset
(do
(common/sql-exec (.-sql self) "drop table if exists kvs")
(common/sql-exec (.-sql self) "drop table if exists tx_log")
(common/sql-exec (.-sql self) "drop table if exists sync_meta")
(storage/init-schema! (.-sql self))
(set! (.-schema-ready self) true)
(set! (.-conn self) nil)
(http/json-response :sync/admin-reset {:ok true}))
:sync/tx-batch
(.then (common/read-json request)
(fn [result]
(if (nil? result)
(http/bad-request "missing body")
(let [body (js->clj result :keywordize-keys true)
body (http/coerce-http-request :sync/tx-batch body)]
(if (nil? body)
(http/bad-request "invalid tx")
(let [{:keys [txs t-before]} body
t-before (parse-int t-before)]
(if (string? txs)
(http/json-response :sync/tx-batch (handle-tx-batch! self nil txs t-before))
(http/bad-request "invalid tx"))))))))
:sync/snapshot-upload
(let [graph-id (graph-id-from-request request)
reset-param (.get (.-searchParams url) "reset")
reset? (if (nil? reset-param)
true
(not (contains? #{"false" "0"} reset-param)))
req-encoding (.get (.-headers request) "content-encoding")]
(cond
(not (seq graph-id))
(http/bad-request "missing graph id")
(nil? (.-body request))
(http/bad-request "missing body")
:else
(let [stream (.-body request)
encoding (or req-encoding "")]
(if (and (= encoding snapshot-content-encoding)
(not (exists? js/DecompressionStream)))
(http/error-response "gzip not supported" 500)
(p/let [stream (maybe-decompress-stream stream encoding)
count (import-snapshot-stream! self stream reset?)]
(http/json-response :sync/snapshot-upload {:ok true
:count count}))))))
(http/not-found)))
(defn- strip-sync-prefix [path]
(if (string/starts-with? path "/sync/")
(let [rest-path (subs path (count "/sync/"))
slash-idx (string/index-of rest-path "/")]
(if (neg? slash-idx)
"/"
(subs rest-path slash-idx)))
path))
(defn handle-http [^js self request]
(letfn [(with-cors-error [resp]
(if (instance? js/Promise resp)
(.catch resp
(fn [e]
(log/error :db-sync/http-error {:error e})
(http/error-response "server error" 500)))
resp))]
(try
(let [url (js/URL. (.-url request))
raw-path (.-pathname url)
path (strip-sync-prefix raw-path)
method (.-method request)]
(with-cors-error
(cond
(= method "OPTIONS")
(common/options-response)
:else
(if-let [route (sync-routes/match-route method path)]
(handle {:self self
:request request
:url url
:route route})
(http/not-found)))))
(catch :default e
(log/error :db-sync/http-error {:error e})
(http/error-response "server error" 500)))))

View File

@@ -0,0 +1,55 @@
(ns logseq.db-sync.worker.handler.ws
(:require [logseq.db-sync.protocol :as protocol]
[logseq.db-sync.worker.auth :as auth]
[logseq.db-sync.worker.handler.sync :as sync-handler]
[logseq.db-sync.worker.presence :as presence]
[logseq.db-sync.worker.ws :as ws]))
(defn handle-ws-message! [^js self ^js ws raw]
(let [message (-> raw protocol/parse-message ws/coerce-ws-client-message)]
(if-not (map? message)
(ws/send! ws {:type "error" :message "invalid request"})
(case (:type message)
"hello"
(ws/send! ws {:type "hello" :t (sync-handler/t-now self)})
"ping"
(ws/send! ws {:type "pong"})
"presence"
(let [editing-block-uuid (:editing-block-uuid message)]
(presence/update-presence! self ws {:editing-block-uuid editing-block-uuid})
(presence/broadcast-online-users! self))
"pull"
(let [raw-since (:since message)
since (if (some? raw-since) (sync-handler/parse-int raw-since) 0)]
(if (or (and (some? raw-since) (not (number? since))) (neg? since))
(ws/send! ws {:type "error" :message "invalid since"})
(ws/send! ws (sync-handler/pull-response self since))))
;; "snapshot"
;; (send! ws (snapshot-response self))
"tx/batch"
(let [txs (:txs message)
t-before (sync-handler/parse-int (:t-before message))]
(if (string? txs)
(ws/send! ws (sync-handler/handle-tx-batch! self ws txs t-before))
(ws/send! ws {:type "tx/reject" :reason "invalid tx"})))
(ws/send! ws {:type "error" :message "unknown type"})))))
(defn handle-ws [^js self request]
(let [pair (js/WebSocketPair.)
client (aget pair 0)
server (aget pair 1)
state (.-state self)]
(.acceptWebSocket state server)
(let [token (auth/token-from-request request)
claims (auth/unsafe-jwt-claims token)
user (presence/claims->user claims)]
(when user
(presence/add-presence! self server user))
(presence/broadcast-online-users! self))
(js/Response. nil #js {:status 101 :webSocket client})))

View File

@@ -0,0 +1,36 @@
(ns logseq.db-sync.worker.http
(:require [logseq.db-sync.common :as common]
[logseq.db-sync.malli-schema :as db-sync-schema]
[logseq.db-sync.worker.coerce :as coerce]))
(defn coerce-http-request [schema-key body]
(if-let [coercer (get db-sync-schema/http-request-coercers schema-key)]
(let [coerced (coerce/coerce coercer body {:schema schema-key :dir :request})]
(when-not (= coerced coerce/invalid-coerce)
coerced))
body))
(defn json-response
([schema-key data] (json-response schema-key data 200))
([schema-key data status]
(if-let [coercer (get db-sync-schema/http-response-coercers schema-key)]
(let [coerced (coerce/coerce coercer data {:schema schema-key :dir :response})]
(if (= coerced coerce/invalid-coerce)
(common/json-response {:error "server error"} 500)
(common/json-response coerced status)))
(common/json-response data status))))
(defn error-response [message status]
(json-response :error {:error message} status))
(defn bad-request [message]
(error-response message 400))
(defn unauthorized []
(error-response "unauthorized" 401))
(defn forbidden []
(error-response "forbidden" 403))
(defn not-found []
(error-response "not found" 404))

View File

@@ -0,0 +1,55 @@
(ns logseq.db-sync.worker.presence
(:require [clojure.string :as string]
[logseq.db-sync.worker.ws :as ws]))
(defn claims->user
[claims]
(when claims
(let [user-id (aget claims "sub")
email (aget claims "email")
username (or (aget claims "preferred_username")
(aget claims "cognito:username")
(aget claims "username"))
name (aget claims "name")]
(when (string? user-id)
(cond-> {:user-id user-id}
(string? email) (assoc :email email)
(string? username) (assoc :username username)
(string? name) (assoc :name name))))))
(defn presence*
[^js self]
(or (.-presence self)
(set! (.-presence self) (atom {}))))
(defn online-users
[^js self]
(vec (distinct (vals @(presence* self)))))
(defn broadcast-online-users!
[^js self]
(ws/broadcast! self nil {:type "online-users" :online-users (online-users self)}))
(defn add-presence!
[^js self ^js ws user]
(swap! (presence* self) assoc ws user)
(.serializeAttachment ws (clj->js user)))
(defn update-presence!
[^js self ^js ws {:keys [editing-block-uuid] :as updates}]
(swap! (presence* self)
(fn [presence]
(if-let [user (get presence ws)]
(let [user' (if (contains? updates :editing-block-uuid)
(if (and (string? editing-block-uuid)
(not (string/blank? editing-block-uuid)))
(assoc user :editing-block-uuid editing-block-uuid)
(dissoc user :editing-block-uuid))
user)]
(.serializeAttachment ws (clj->js user'))
(assoc presence ws user'))
presence))))
(defn remove-presence!
[^js self ^js ws]
(swap! (presence* self) dissoc ws))

View File

@@ -1,4 +1,4 @@
(ns logseq.db-sync.worker.routes
(ns logseq.db-sync.worker.routes.index
(:require [reitit.core :as r]))
(def ^:private route-data

View File

@@ -0,0 +1,18 @@
(ns logseq.db-sync.worker.routes.sync
(:require [reitit.core :as r]))
(def ^:private route-data
[["/health" {:methods {"GET" :sync/health}}]
["/pull" {:methods {"GET" :sync/pull}}]
["/snapshot/download" {:methods {"GET" :sync/snapshot-download}}]
["/admin/reset" {:methods {"DELETE" :sync/admin-reset}}]
["/tx/batch" {:methods {"POST" :sync/tx-batch}}]
["/snapshot/upload" {:methods {"POST" :sync/snapshot-upload}}]])
(def ^:private router
(r/router route-data))
(defn match-route [method path]
(when-let [match (r/match-by-path router path)]
(when-let [handler (get-in match [:data :methods method])]
(assoc match :handler handler))))

View File

@@ -0,0 +1,34 @@
(ns logseq.db-sync.worker.ws
(:require [lambdaisland.glogi :as log]
[logseq.db-sync.malli-schema :as db-sync-schema]
[logseq.db-sync.protocol :as protocol]
[logseq.db-sync.worker.coerce :as coerce]))
(defn ws-open? [ws]
(= 1 (.-readyState ws)))
(defn coerce-ws-client-message [message]
(when message
(let [coerced (coerce/coerce db-sync-schema/ws-client-message-coercer message {:schema :ws/client})]
(when-not (= coerced coerce/invalid-coerce)
coerced))))
(defn coerce-ws-server-message [message]
(when message
(let [coerced (coerce/coerce db-sync-schema/ws-server-message-coercer message {:schema :ws/server})]
(when-not (= coerced coerce/invalid-coerce)
coerced))))
(defn send! [ws msg]
(when (ws-open? ws)
(if-let [coerced (coerce-ws-server-message msg)]
(.send ws (protocol/encode-message coerced))
(do
(log/error :db-sync/ws-response-invalid {:message msg})
(.send ws (protocol/encode-message {:type "error" :message "server error"}))))))
(defn broadcast! [^js self sender msg]
(let [clients (.getWebSockets (.-state self))]
(doseq [ws clients]
(when (and (not= ws sender) (ws-open? ws))
(send! ws msg)))))

View File

@@ -1,6 +1,6 @@
(ns logseq.db-sync.worker-routes-test
(:require [cljs.test :refer [deftest is testing]]
[logseq.db-sync.worker.routes :as routes]))
[logseq.db-sync.worker.routes.index :as routes]))
(deftest match-route-graphs-test
(testing "graphs routes"

View File

@@ -0,0 +1,24 @@
(ns logseq.db-sync.worker-sync-routes-test
(:require [cljs.test :refer [deftest is testing]]
[logseq.db-sync.worker.routes.sync :as sync-routes]))
(deftest match-route-sync-test
(testing "sync routes"
(let [match (sync-routes/match-route "GET" "/health")]
(is (= :sync/health (:handler match))))
(let [match (sync-routes/match-route "GET" "/pull")]
(is (= :sync/pull (:handler match))))
(let [match (sync-routes/match-route "GET" "/snapshot/download")]
(is (= :sync/snapshot-download (:handler match))))
(let [match (sync-routes/match-route "DELETE" "/admin/reset")]
(is (= :sync/admin-reset (:handler match))))
(let [match (sync-routes/match-route "POST" "/tx/batch")]
(is (= :sync/tx-batch (:handler match))))
(let [match (sync-routes/match-route "POST" "/snapshot/upload")]
(is (= :sync/snapshot-upload (:handler match))))))
(deftest match-route-sync-method-mismatch-test
(testing "sync method mismatch returns nil"
(is (nil? (sync-routes/match-route "POST" "/health")))
(is (nil? (sync-routes/match-route "GET" "/admin/reset")))
(is (nil? (sync-routes/match-route "PUT" "/tx/batch")))))