diff --git a/deps/db-sync/src/logseq/db_sync/snapshot.cljs b/deps/db-sync/src/logseq/db_sync/snapshot.cljs index dda8d4413a..74ab9b06a9 100644 --- a/deps/db-sync/src/logseq/db_sync/snapshot.cljs +++ b/deps/db-sync/src/logseq/db_sync/snapshot.cljs @@ -18,16 +18,16 @@ (->uint8 (transit/write transit-w rows))) (defn decode-rows - [bytes] - (transit/read transit-r (.decode text-decoder (->uint8 bytes)))) + [payload] + (transit/read transit-r (.decode text-decoder (->uint8 payload)))) (defn frame-bytes - [^js bytes] - (let [len (.-byteLength bytes) + [^js payload] + (let [len (.-byteLength payload) out (js/Uint8Array. (+ 4 len)) view (js/DataView. (.-buffer out))] (.setUint32 view 0 len false) - (.set out bytes 4) + (.set out payload 4) out)) (defn concat-bytes diff --git a/deps/db-sync/src/logseq/db_sync/worker.cljs b/deps/db-sync/src/logseq/db_sync/worker.cljs index 32e9ada212..537423cc93 100644 --- a/deps/db-sync/src/logseq/db_sync/worker.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker.cljs @@ -1,782 +1,20 @@ (ns logseq.db-sync.worker (:require ["cloudflare:workers" :refer [DurableObject]] - [cljs-bean.core :as bean] - [clojure.string :as string] [lambdaisland.glogi :as log] [lambdaisland.glogi.console :as glogi-console] - [logseq.common.authorization :as authorization] - [logseq.db :as ldb] - [logseq.db-sync.batch :as batch] - [logseq.db-sync.common :as common :refer [cors-headers]] - [logseq.db-sync.index :as index] - [logseq.db-sync.malli-schema :as db-sync-schema] - [logseq.db-sync.protocol :as protocol] - [logseq.db-sync.snapshot :as snapshot] - [logseq.db-sync.storage :as storage] - [logseq.db-sync.worker.routes :as routes] - [promesa.core :as p] + [logseq.db-sync.common :as common] + [logseq.db-sync.worker.dispatch :as dispatch] + [logseq.db-sync.worker.handler.sync :as sync-handler] + [logseq.db-sync.worker.handler.ws :as ws-handler] + [logseq.db-sync.worker.presence :as presence] + [logseq.db-sync.worker.ws :as ws] [shadow.cljs.modern :refer (defclass)])) (glogi-console/install!) -(defn- bearer-token [auth-header] - (when (and (string? auth-header) (string/starts-with? auth-header "Bearer ")) - (subs auth-header 7))) - -(defn- token-from-request [request] - (or (bearer-token (.get (.-headers request) "authorization")) - (let [url (js/URL. (.-url request))] - (.get (.-searchParams url) "token")))) - -(defn- decode-jwt-part [part] - (let [pad (if (pos? (mod (count part) 4)) - (apply str (repeat (- 4 (mod (count part) 4)) "=")) - "") - base64 (-> (str part pad) - (string/replace "-" "+") - (string/replace "_" "/")) - raw (js/atob base64)] - (js/JSON.parse raw))) - -(defn- unsafe-jwt-claims [token] - (try - (when (string? token) - (let [parts (string/split token #"\.")] - (when (= 3 (count parts)) - (decode-jwt-part (nth parts 1))))) - (catch :default _ - nil))) - -(defn- auth-claims [request env] - (let [token (token-from-request request)] - (if (string? token) - (.catch (authorization/verify-jwt token env) (fn [_] nil)) - (js/Promise.resolve nil)))) - -(declare handle-index-fetch) - -(defn- graph-access-response [request env graph-id] - (let [token (token-from-request request) - url (js/URL. (.-url request)) - access-url (str (.-origin url) "/graphs/" graph-id "/access") - headers (js/Headers. (.-headers request)) - index-self #js {:env env :d1 (aget env "DB")}] - (when (string? token) - (.set headers "authorization" (str "Bearer " token))) - (handle-index-fetch index-self (js/Request. access-url #js {:method "GET" :headers headers})))) - -(defn- parse-asset-path [path] - (let [prefix "/assets/"] - (when (string/starts-with? path prefix) - (let [rest-path (subs path (count prefix)) - slash-idx (string/index-of rest-path "/") - graph-id (when (and slash-idx (pos? slash-idx)) (subs rest-path 0 slash-idx)) - file (when (and slash-idx (pos? slash-idx)) (subs rest-path (inc slash-idx))) - dot-idx (when file (string/last-index-of file ".")) - asset-uuid (when (and dot-idx (pos? dot-idx)) (subs file 0 dot-idx)) - asset-type (when (and dot-idx (pos? dot-idx)) (subs file (inc dot-idx)))] - (when (and (seq graph-id) (seq asset-uuid) (seq asset-type)) - {:graph-id graph-id - :asset-uuid asset-uuid - :asset-type asset-type - :key (str graph-id "/" asset-uuid "." asset-type)}))))) - -(defn- ensure-schema! [^js self] - (when-not (true? (.-schema-ready self)) - (storage/init-schema! (.-sql self)) - (set! (.-schema-ready self) true))) - -(defn- ensure-conn! [^js self] - (ensure-schema! self) - (when-not (.-conn self) - (set! (.-conn self) (storage/open-conn (.-sql self))))) - -(defn- t-now [^js self] - (ensure-schema! self) - (storage/get-t (.-sql self))) - -(defn- ws-open? [ws] - (= 1 (.-readyState ws))) - -(def ^:private invalid-coerce ::invalid-coerce) - -(defn- coerce - [coercer value context] - (try - (coercer value) - (catch :default e - (log/error :db-sync/malli-coerce-failed (merge context {:error e :value value})) - invalid-coerce))) - -(defn- coerce-ws-client-message [message] - (when message - (let [coerced (coerce db-sync-schema/ws-client-message-coercer message {:schema :ws/client})] - (when-not (= coerced invalid-coerce) - coerced)))) - -(defn- coerce-ws-server-message [message] - (when message - (let [coerced (coerce db-sync-schema/ws-server-message-coercer message {:schema :ws/server})] - (when-not (= coerced invalid-coerce) - coerced)))) - -(defn- send! [ws msg] - (when (ws-open? ws) - (if-let [coerced (coerce-ws-server-message msg)] - (.send ws (protocol/encode-message coerced)) - (do - (log/error :db-sync/ws-response-invalid {:message msg}) - (.send ws (protocol/encode-message {:type "error" :message "server error"})))))) - -(defn- broadcast! [^js self sender msg] - (let [clients (.getWebSockets (.-state self))] - (doseq [ws clients] - (when (and (not= ws sender) (ws-open? ws)) - (send! ws msg))))) - -(defn- claims->user - [claims] - (when claims - (let [user-id (aget claims "sub") - email (aget claims "email") - username (or (aget claims "preferred_username") - (aget claims "cognito:username") - (aget claims "username")) - name (aget claims "name")] - (when (string? user-id) - (cond-> {:user-id user-id} - (string? email) (assoc :email email) - (string? username) (assoc :username username) - (string? name) (assoc :name name)))))) - -(defn- presence* - [^js self] - (or (.-presence self) - (set! (.-presence self) (atom {})))) - -(defn- online-users - [^js self] - (vec (distinct (vals @(presence* self))))) - -(defn- broadcast-online-users! - [^js self] - (broadcast! self nil {:type "online-users" :online-users (online-users self)})) - -(defn- add-presence! - [^js self ^js ws user] - (swap! (presence* self) assoc ws user) - (.serializeAttachment ws (clj->js user))) - -(defn- update-presence! - [^js self ^js ws {:keys [editing-block-uuid] :as updates}] - (swap! (presence* self) - (fn [presence] - (if-let [user (get presence ws)] - (let [user' (if (contains? updates :editing-block-uuid) - (if (and (string? editing-block-uuid) - (not (string/blank? editing-block-uuid))) - (assoc user :editing-block-uuid editing-block-uuid) - (dissoc user :editing-block-uuid)) - user)] - (.serializeAttachment ws (clj->js user')) - (assoc presence ws user')) - presence)))) - -(defn- remove-presence! - [^js self ^js ws] - (swap! (presence* self) dissoc ws)) - -(defn- coerce-http-request [schema-key body] - (if-let [coercer (get db-sync-schema/http-request-coercers schema-key)] - (let [coerced (coerce coercer body {:schema schema-key :dir :request})] - (when-not (= coerced invalid-coerce) - coerced)) - body)) - -(defn- json-response - ([schema-key data] (json-response schema-key data 200)) - ([schema-key data status] - (if-let [coercer (get db-sync-schema/http-response-coercers schema-key)] - (let [coerced (coerce coercer data {:schema schema-key :dir :response})] - (if (= coerced invalid-coerce) - (common/json-response {:error "server error"} 500) - (common/json-response coerced status))) - (common/json-response data status)))) - -(defn- error-response [message status] - (json-response :error {:error message} status)) - -(defn- bad-request [message] - (error-response message 400)) - -(defn- unauthorized [] - (error-response "unauthorized" 401)) - -(defn- forbidden [] - (error-response "forbidden" 403)) - -(defn- not-found [] - (error-response "not found" 404)) - -(defn- parse-int [value] - (when (some? value) - (let [n (js/parseInt value 10)] - (when-not (js/isNaN n) - n)))) - -(def ^:private max-asset-size (* 100 1024 1024)) -(def ^:private snapshot-download-batch-size 500) -(def ^:private snapshot-cache-control "private, max-age=300") -(def ^:private snapshot-content-type "application/transit+json") -(def ^:private snapshot-content-encoding "gzip") -;; 10m -(def ^:private snapshot-multipart-part-size (* 10 1024 1024)) - -(defn- fetch-kvs-rows - [sql after limit] - (common/get-sql-rows - (common/sql-exec sql - "select addr, content, addresses from kvs where addr > ? order by addr asc limit ?" - after - limit))) - -(defn- snapshot-row->tuple [row] - (if (array? row) - [(aget row 0) (aget row 1) (aget row 2)] - [(aget row "addr") (aget row "content") (aget row "addresses")])) - -(defn- import-snapshot-rows! - [sql table rows] - (when (seq rows) - (doseq [batch (batch/rows->insert-batches table rows nil)] - (let [sql-str (:sql batch) - args (:args batch)] - (apply common/sql-exec sql sql-str args))))) - -(defn- reset-import! - [sql] - (common/sql-exec sql "delete from kvs") - (common/sql-exec sql "delete from tx_log") - (common/sql-exec sql "delete from sync_meta") - (storage/set-t! sql 0)) - -(defn- graph-id-from-request [request] - (let [header-id (.get (.-headers request) "x-graph-id") - url (js/URL. (.-url request)) - param-id (.get (.-searchParams url) "graph-id")] - (when (seq (or header-id param-id)) - (or header-id param-id)))) - -(defn- snapshot-key [graph-id snapshot-id] - (str graph-id "/" snapshot-id ".snapshot")) - -(defn- snapshot-url [request graph-id snapshot-id] - (let [url (js/URL. (.-url request))] - (str (.-origin url) "/assets/" graph-id "/" snapshot-id ".snapshot"))) - -(defn- maybe-decompress-stream [stream encoding] - (if (and (= encoding snapshot-content-encoding) (exists? js/DecompressionStream)) - (.pipeThrough stream (js/DecompressionStream. "gzip")) - stream)) - -(defn- ->uint8 [data] - (cond - (instance? js/Uint8Array data) data - (instance? js/ArrayBuffer data) (js/Uint8Array. data) - :else (js/Uint8Array. data))) - -(defn- concat-uint8 [^js a ^js b] - (cond - (nil? a) b - (nil? b) a - :else - (let [out (js/Uint8Array. (+ (.-byteLength a) (.-byteLength b)))] - (.set out a 0) - (.set out b (.-byteLength a)) - out))) - -(defn- snapshot-export-stream [^js self] - (let [sql (.-sql self) - state (volatile! {:after -1 :done? false})] - (js/ReadableStream. - #js {:pull (fn [controller] - (p/let [{:keys [after done?]} @state] - (if done? - (.close controller) - (let [rows (fetch-kvs-rows sql after snapshot-download-batch-size) - rows (mapv snapshot-row->tuple rows) - last-addr (if (seq rows) - (apply max (map first rows)) - after) - done? (< (count rows) snapshot-download-batch-size)] - (when (seq rows) - (let [payload (snapshot/encode-rows rows) - framed (snapshot/frame-bytes payload)] - (.enqueue controller framed))) - (vswap! state assoc :after last-addr :done? done?)))))}))) - -(defn- upload-multipart! - [^js bucket key stream opts] - (p/let [^js upload (.createMultipartUpload bucket key opts)] - (let [reader (.getReader stream)] - (-> (p/loop [buffer nil - part-number 1 - parts []] - (p/let [chunk (.read reader)] - (if (.-done chunk) - (cond - (and buffer (pos? (.-byteLength buffer))) - (p/let [^js resp (.uploadPart upload part-number buffer) - parts (conj parts {:partNumber part-number :etag (.-etag resp)})] - (p/let [_ (.complete upload (clj->js parts))] - {:ok true})) - - (seq parts) - (p/let [_ (.complete upload (clj->js parts))] - {:ok true}) - - :else - (p/let [_ (.abort upload)] - (.put bucket key (js/Uint8Array. 0) opts))) - (let [value (.-value chunk) - buffer (concat-uint8 buffer (->uint8 value))] - (if (>= (.-byteLength buffer) snapshot-multipart-part-size) - (let [part (.slice buffer 0 snapshot-multipart-part-size) - rest-parts (.slice buffer snapshot-multipart-part-size (.-byteLength buffer))] - (p/let [^js resp (.uploadPart upload part-number part) - parts (conj parts {:partNumber part-number :etag (.-etag resp)})] - (p/recur rest-parts (inc part-number) parts))) - (p/recur buffer part-number parts)))))) - (p/catch (fn [error] - (.abort upload) - (throw error))))))) - -(defn- snapshot-export-length [^js self] - (let [sql (.-sql self)] - (p/loop [after -1 - total 0] - (let [rows (fetch-kvs-rows sql after snapshot-download-batch-size)] - (if (empty? rows) - total - (let [rows (mapv snapshot-row->tuple rows) - payload (snapshot/encode-rows rows) - total (+ total 4 (.-byteLength payload)) - last-addr (apply max (map first rows)) - done? (< (count rows) snapshot-download-batch-size)] - (if done? - total - (p/recur last-addr total)))))))) - -(defn- snapshot-export-fixed-length [^js self] - (p/let [length (snapshot-export-length self) - stream (snapshot-export-stream self)] - (if (exists? js/FixedLengthStream) - (let [^js fixed (js/FixedLengthStream. length) - readable (.-readable fixed) - writable (.-writable fixed) - reader (.getReader stream) - writer (.getWriter writable)] - (p/let [_ (p/loop [] - (p/let [chunk (.read reader)] - (if (.-done chunk) - (.close writer) - (p/let [_ (.write writer (.-value chunk))] - (p/recur)))))] - readable)) - (p/let [resp (js/Response. stream) - buf (.arrayBuffer resp)] - buf)))) - -(declare import-snapshot!) -(defn- import-snapshot-stream! [^js self stream reset?] - (let [reader (.getReader stream) - reset-pending? (volatile! reset?) - total-count (volatile! 0)] - (p/let [buffer nil] - (p/catch - (p/loop [buffer buffer] - (p/let [chunk (.read reader)] - (if (.-done chunk) - (let [rows (snapshot/finalize-framed-buffer buffer) - rows-count (count rows) - reset? (and @reset-pending? true)] - (when (or reset? (seq rows)) - (import-snapshot! self rows reset?) - (vreset! reset-pending? false)) - (vswap! total-count + rows-count) - @total-count) - (let [value (.-value chunk) - {:keys [rows buffer]} (snapshot/parse-framed-chunk buffer value) - rows-count (count rows) - reset? (and @reset-pending? (seq rows))] - (when (seq rows) - (import-snapshot! self rows (true? reset?)) - (vreset! reset-pending? false)) - (vswap! total-count + rows-count) - (p/recur buffer))))) - (fn [error] - (throw error)))))) - -(defn- handle-assets [request ^js env] - (let [url (js/URL. (.-url request)) - path (.-pathname url) - method (.-method request)] - (cond - (= method "OPTIONS") - (js/Response. nil #js {:status 204 :headers (cors-headers)}) - - :else - (if-let [{:keys [key asset-type]} (parse-asset-path path)] - (let [^js bucket (.-LOGSEQ_SYNC_ASSETS env)] - (if-not bucket - (error-response "missing assets bucket" 500) - (case method - "GET" - (.then (.get bucket key) - (fn [^js obj] - (if (nil? obj) - (error-response "not found" 404) - (let [metadata (.-httpMetadata obj) - content-type (or (.-contentType metadata) - "application/octet-stream") - content-encoding (.-contentEncoding metadata) - cache-control (.-cacheControl metadata) - headers (cond-> {"content-type" content-type - "x-asset-type" asset-type} - (and (string? content-encoding) - (not= content-encoding "null") - (pos? (.-length content-encoding))) - (assoc "content-encoding" content-encoding) - (and (string? cache-control) - (pos? (.-length cache-control))) - (assoc "cache-control" cache-control) - true - (bean/->js))] - (js/Response. (.-body obj) - #js {:status 200 - :headers (js/Object.assign - headers - (cors-headers))}))))) - - "PUT" - (.then (.arrayBuffer request) - (fn [buf] - (if (> (.-byteLength buf) max-asset-size) - (error-response "asset too large" 413) - (.then (.put bucket - key - buf - #js {:httpMetadata #js {:contentType (or (.get (.-headers request) "content-type") - "application/octet-stream")} - :customMetadata #js {:checksum (.get (.-headers request) "x-amz-meta-checksum") - :type asset-type}}) - (fn [_] - (json-response :assets/put {:ok true} 200)))))) - - "DELETE" - (.then (.delete bucket key) - (fn [_] - (json-response :assets/delete {:ok true} 200))) - - (error-response "method not allowed" 405)))) - (error-response "invalid asset path" 400))))) - -(defn- handle-worker-fetch [request ^js env] - (let [url (js/URL. (.-url request)) - path (.-pathname url) - method (.-method request)] - (cond - (= path "/health") - (json-response :worker/health {:ok true}) - - (or (= path "/graphs") - (string/starts-with? path "/graphs/")) - (handle-index-fetch #js {:env env :d1 (aget env "DB")} request) - - (string/starts-with? path "/e2ee") - (handle-index-fetch #js {:env env :d1 (aget env "DB")} request) - - (string/starts-with? path "/assets/") - (if (= method "OPTIONS") - (handle-assets request env) - (if-let [{:keys [graph-id]} (parse-asset-path path)] - (p/let [access-resp (graph-access-response request env graph-id)] - (if (.-ok access-resp) - (handle-assets request env) - access-resp)) - (bad-request "invalid asset path"))) - - (= method "OPTIONS") - (common/options-response) - - (string/starts-with? path "/sync/") - (let [prefix (count "/sync/") - rest-path (subs path prefix) - rest-path (if (string/starts-with? rest-path "/") - (subs rest-path 1) - rest-path) - slash-idx (or (string/index-of rest-path "/") -1) - graph-id (if (neg? slash-idx) rest-path (subs rest-path 0 slash-idx)) - tail (if (neg? slash-idx) - "/" - (subs rest-path slash-idx)) - new-url (js/URL. (str (.-origin url) tail (.-search url)))] - (if (seq graph-id) - (if (= method "OPTIONS") - (common/options-response) - (p/let [access-resp (graph-access-response request env graph-id)] - (if (.-ok access-resp) - (let [^js namespace (.-LOGSEQ_SYNC_DO env) - do-id (.idFromName namespace graph-id) - stub (.get namespace do-id)] - (if (common/upgrade-request? request) - (.fetch stub request) - (do - (.set (.-searchParams new-url) "graph-id" graph-id) - (let [rewritten (js/Request. (.toString new-url) request)] - (.fetch stub rewritten))))) - access-resp))) - (bad-request "missing graph id"))) - - :else - (not-found)))) - (def worker #js {:fetch (fn [request env _ctx] - (handle-worker-fetch request env))}) - -(defn- pull-response [^js self since] - (let [sql (.-sql self) - txs (storage/fetch-tx-since sql since) - response {:type "pull/ok" - :t (t-now self) - :txs txs}] - response)) - -;; FIXME: memory limit, should re-download graph using sqlite table rows -;; (defn- snapshot-response [^js self] -;; (let [conn (.-conn self) -;; db @conn -;; datoms (protocol/datoms->wire (d/datoms db :eavt))] -;; {:type "snapshot/ok" -;; :t (t-now self) -;; :datoms (common/write-transit datoms)})) - -(defn- import-snapshot! [^js self rows reset?] - (let [sql (.-sql self)] - (ensure-schema! self) - (when reset? - (reset-import! sql)) - (import-snapshot-rows! sql "kvs" rows))) - -(defn- apply-tx! [^js self sender txs] - (let [sql (.-sql self)] - (ensure-conn! self) - (let [conn (.-conn self) - tx-data (protocol/transit->tx txs)] - (ldb/transact! conn tx-data {:op :apply-client-tx}) - (let [new-t (storage/get-t sql)] - ;; FIXME: no need to broadcast if client tx is less than remote tx - (broadcast! self sender {:type "changed" :t new-t}) - new-t)))) - -(defn- handle-tx-batch! [^js self sender txs t-before] - (let [current-t (t-now self)] - (cond - (or (not (number? t-before)) (neg? t-before)) - {:type "tx/reject" - :reason "invalid t-before"} - - (not= t-before current-t) - {:type "tx/reject" - :reason "stale" - :t current-t} - - :else - (if txs - (let [new-t (apply-tx! self sender txs)] - (if (and (map? new-t) (= "tx/reject" (:type new-t))) - new-t - {:type "tx/batch/ok" - :t new-t})) - {:type "tx/reject" - :reason "empty tx data"})))) - -(defn- handle-ws-message! [^js self ^js ws raw] - (let [message (-> raw protocol/parse-message coerce-ws-client-message)] - (if-not (map? message) - (send! ws {:type "error" :message "invalid request"}) - (case (:type message) - "hello" - (send! ws {:type "hello" :t (t-now self)}) - - "ping" - (send! ws {:type "pong"}) - - "presence" - (let [editing-block-uuid (:editing-block-uuid message)] - (update-presence! self ws {:editing-block-uuid editing-block-uuid}) - (broadcast-online-users! self)) - - "pull" - (let [raw-since (:since message) - since (if (some? raw-since) (parse-int raw-since) 0)] - (if (or (and (some? raw-since) (not (number? since))) (neg? since)) - (send! ws {:type "error" :message "invalid since"}) - (send! ws (pull-response self since)))) - - ;; "snapshot" - ;; (send! ws (snapshot-response self)) - - "tx/batch" - (let [txs (:txs message) - t-before (parse-int (:t-before message))] - (if (string? txs) - (send! ws (handle-tx-batch! self ws txs t-before)) - (send! ws {:type "tx/reject" :reason "invalid tx"}))) - - (send! ws {:type "error" :message "unknown type"}))))) - -(defn- handle-ws [^js self request] - (let [pair (js/WebSocketPair.) - client (aget pair 0) - server (aget pair 1) - state (.-state self)] - (.acceptWebSocket state server) - (let [token (token-from-request request) - claims (unsafe-jwt-claims token) - user (claims->user claims)] - (when user - (add-presence! self server user)) - (broadcast-online-users! self)) - (js/Response. nil #js {:status 101 :webSocket client}))) - -(defn- strip-sync-prefix [path] - (if (string/starts-with? path "/sync/") - (let [rest-path (subs path (count "/sync/")) - slash-idx (string/index-of rest-path "/")] - (if (neg? slash-idx) - "/" - (subs rest-path slash-idx))) - path)) - -(defn- handle-http [^js self request] - (letfn [(with-cors-error [resp] - (if (instance? js/Promise resp) - (.catch resp - (fn [e] - (log/error :db-sync/http-error {:error e}) - (error-response "server error" 500))) - resp))] - (try - (let [url (js/URL. (.-url request)) - raw-path (.-pathname url) - path (strip-sync-prefix raw-path) - method (.-method request)] - (with-cors-error - (cond - (= method "OPTIONS") - (common/options-response) - - (and (= method "GET") (= path "/health")) - (json-response :sync/health {:ok true}) - - (and (= method "GET") (= path "/pull")) - (let [raw-since (.get (.-searchParams url) "since") - since (if (some? raw-since) (parse-int raw-since) 0)] - (if (or (and (some? raw-since) (not (number? since))) (neg? since)) - (bad-request "invalid since") - (json-response :sync/pull (pull-response self since)))) - - ;; (and (= method "GET") (= path "/snapshot")) - ;; (common/json-response (snapshot-response self)) - - (and (= method "GET") (= path "/snapshot/download")) - (let [graph-id (graph-id-from-request request) - ^js bucket (.-LOGSEQ_SYNC_ASSETS (.-env self))] - (cond - (not (seq graph-id)) - (bad-request "missing graph id") - - (nil? bucket) - (error-response "missing assets bucket" 500) - - :else - (p/let [snapshot-id (str (random-uuid)) - key (snapshot-key graph-id snapshot-id) - stream (snapshot-export-stream self) - multipart? (and (some? (.-createMultipartUpload bucket)) - (fn? (.-createMultipartUpload bucket))) - opts #js {:httpMetadata #js {:contentType snapshot-content-type - :contentEncoding nil - :cacheControl snapshot-cache-control} - :customMetadata #js {:purpose "snapshot" - :created-at (str (common/now-ms))}} - _ (if multipart? - (upload-multipart! bucket key stream opts) - (p/let [body (snapshot-export-fixed-length self)] - (.put bucket key body opts))) - url (snapshot-url request graph-id snapshot-id)] - (json-response :sync/snapshot-download {:ok true - :key key - :url url - :content-encoding nil})))) - - (and (= method "DELETE") (= path "/admin/reset")) - (do - (common/sql-exec (.-sql self) "drop table if exists kvs") - (common/sql-exec (.-sql self) "drop table if exists tx_log") - (common/sql-exec (.-sql self) "drop table if exists sync_meta") - (storage/init-schema! (.-sql self)) - (set! (.-schema-ready self) true) - (set! (.-conn self) nil) - (json-response :sync/admin-reset {:ok true})) - - (and (= method "POST") (= path "/tx/batch")) - (.then (common/read-json request) - (fn [result] - (if (nil? result) - (bad-request "missing body") - (let [body (js->clj result :keywordize-keys true) - body (coerce-http-request :sync/tx-batch body)] - (if (nil? body) - (bad-request "invalid tx") - (let [{:keys [txs t-before]} body - t-before (parse-int t-before)] - (if (string? txs) - (json-response :sync/tx-batch (handle-tx-batch! self nil txs t-before)) - (bad-request "invalid tx")))))))) - - (and (= method "POST") (= path "/snapshot/upload")) - (let [graph-id (graph-id-from-request request) - reset-param (.get (.-searchParams url) "reset") - reset? (if (nil? reset-param) - true - (not (contains? #{"false" "0"} reset-param))) - req-encoding (.get (.-headers request) "content-encoding")] - (cond - (not (seq graph-id)) - (bad-request "missing graph id") - - (nil? (.-body request)) - (bad-request "missing body") - - :else - (let [stream (.-body request) - encoding (or req-encoding "")] - (if (and (= encoding snapshot-content-encoding) - (not (exists? js/DecompressionStream))) - (error-response "gzip not supported" 500) - (p/let [stream (maybe-decompress-stream stream encoding) - count (import-snapshot-stream! self stream reset?)] - (json-response :sync/snapshot-upload {:ok true - :count count})))))) - - :else - (not-found)))) - (catch :default e - (log/error :db-sync/http-error {:error e}) - (error-response "server error" 500))))) + (dispatch/handle-worker-fetch request env))}) (defclass SyncDO (extends DurableObject) @@ -788,7 +26,7 @@ (set! (.-sql this) (.-sql ^js (.-storage state))) (set! (.-conn this) nil) (set! (.-schema-ready this) false) - (let [presence (presence* this) + (let [presence (presence/presence* this) sockets (.getWebSockets state)] (doseq [^js ws sockets] (when-let [attachment (.deserializeAttachment ws)] @@ -800,340 +38,20 @@ Object (fetch [this request] (if (common/upgrade-request? request) - (handle-ws this request) - (handle-http this request))) + (ws-handler/handle-ws this request) + (sync-handler/handle-http this request))) (webSocketMessage [this ws message] (try - (handle-ws-message! this ws message) + (ws-handler/handle-ws-message! this ws message) (catch :default e (log/error :db-sync/ws-error e) (js/console.error e) - (send! ws {:type "error" :message "server error"})))) + (ws/send! ws {:type "error" :message "server error"})))) (webSocketClose [this ws _code _reason] - (remove-presence! this ws) - (broadcast-online-users! this) + (presence/remove-presence! this ws) + (presence/broadcast-online-users! this) (log/info :db-sync/ws-closed true)) (webSocketError [this ws error] - (remove-presence! this ws) - (broadcast-online-users! this) + (presence/remove-presence! this ws) + (presence/broadcast-online-users! this) (log/error :db-sync/ws-error {:error error}))) - -(defn- index-db [^js self] - (let [db (.-d1 self)] - (when-not db - (log/error :db-sync/index-db-missing {:binding "DB"})) - db)) - -(defn- handle-index-fetch [^js self request] - (let [db (index-db self) - env (.-env self) - url (js/URL. (.-url request)) - path (.-pathname url) - method (.-method request)] - (try - (cond - (contains? #{"OPTIONS" "HEAD"} method) - (common/options-response) - - (nil? db) - (error-response "server error" 500) - - :else - (p/let [_ (index/clj result :keywordize-keys true) - body (coerce-http-request :graphs/create body) - graph-id (str (random-uuid)) - user-id (aget claims "sub")] - (cond - (not (string? user-id)) - (unauthorized) - - (nil? body) - (bad-request "invalid body") - - :else - (p/let [{:keys [graph-name schema-version]} body - name-exists? (index/clj result :keywordize-keys true) - body (coerce-http-request :graph-members/create body) - member-id (:user-id body) - email (:email body) - role (or (:role body) "member")] - (cond - (nil? body) - (bad-request "invalid body") - - (and (not (string? member-id)) - (not (string? email))) - (bad-request "invalid user") - - :else - (p/let [manager? (index/clj result :keywordize-keys true) - body (coerce-http-request :graph-members/update body) - role (:role body)] - (cond - (nil? body) - (bad-request "invalid body") - - :else - (p/let [manager? (index/clj result :keywordize-keys true) - body (coerce-http-request :e2ee/user-keys body) - user-id (aget claims "sub")] - (cond - (not (string? user-id)) - (unauthorized) - - (nil? body) - (bad-request "invalid body") - - :else - (let [{:keys [public-key encrypted-private-key]} body] - (p/let [_ (index/ {} - (some? public-key) - (assoc :public-key public-key))))) - - :e2ee/graph-aes-key-get - (let [user-id (aget claims "sub")] - (cond - (not (string? user-id)) - (unauthorized) - - :else - (p/let [access? (index/ {} - (some? encrypted-aes-key) - (assoc :encrypted-aes-key encrypted-aes-key)))))))) - - :e2ee/graph-aes-key-post - (let [user-id (aget claims "sub")] - (cond - (not (string? user-id)) - (unauthorized) - - :else - (.then (common/read-json request) - (fn [result] - (if (nil? result) - (bad-request "missing body") - (let [body (js->clj result :keywordize-keys true) - body (coerce-http-request :e2ee/graph-aes-key body)] - (if (nil? body) - (bad-request "invalid body") - (p/let [access? (index/clj result :keywordize-keys true) - body (coerce-http-request :e2ee/grant-access body)] - (if (nil? body) - (bad-request "invalid body") - (p/let [manager? (index/ {:ok true} - (seq @missing) - (assoc :missing-users @missing)))))))))))))) - - (not-found)) - - :else - (not-found))))) - (catch :default error - (log/error :db-sync/index-error error) - (error-response "server error" 500))))) diff --git a/deps/db-sync/src/logseq/db_sync/worker/auth.cljs b/deps/db-sync/src/logseq/db_sync/worker/auth.cljs new file mode 100644 index 0000000000..7018399dd8 --- /dev/null +++ b/deps/db-sync/src/logseq/db_sync/worker/auth.cljs @@ -0,0 +1,37 @@ +(ns logseq.db-sync.worker.auth + (:require [clojure.string :as string] + [logseq.common.authorization :as authorization])) + +(defn- bearer-token [auth-header] + (when (and (string? auth-header) (string/starts-with? auth-header "Bearer ")) + (subs auth-header 7))) + +(defn token-from-request [request] + (or (bearer-token (.get (.-headers request) "authorization")) + (let [url (js/URL. (.-url request))] + (.get (.-searchParams url) "token")))) + +(defn- decode-jwt-part [part] + (let [pad (if (pos? (mod (count part) 4)) + (apply str (repeat (- 4 (mod (count part) 4)) "=")) + "") + base64 (-> (str part pad) + (string/replace "-" "+") + (string/replace "_" "/")) + raw (js/atob base64)] + (js/JSON.parse raw))) + +(defn unsafe-jwt-claims [token] + (try + (when (string? token) + (let [parts (string/split token #"\.")] + (when (= 3 (count parts)) + (decode-jwt-part (nth parts 1))))) + (catch :default _ + nil))) + +(defn auth-claims [request env] + (let [token (token-from-request request)] + (if (string? token) + (.catch (authorization/verify-jwt token env) (fn [_] nil)) + (js/Promise.resolve nil)))) diff --git a/deps/db-sync/src/logseq/db_sync/worker/coerce.cljs b/deps/db-sync/src/logseq/db_sync/worker/coerce.cljs new file mode 100644 index 0000000000..68ce38dc52 --- /dev/null +++ b/deps/db-sync/src/logseq/db_sync/worker/coerce.cljs @@ -0,0 +1,12 @@ +(ns logseq.db-sync.worker.coerce + (:require [lambdaisland.glogi :as log])) + +(def invalid-coerce ::invalid-coerce) + +(defn coerce + [coercer value context] + (try + (coercer value) + (catch :default e + (log/error :db-sync/malli-coerce-failed (merge context {:error e :value value})) + invalid-coerce))) diff --git a/deps/db-sync/src/logseq/db_sync/worker/dispatch.cljs b/deps/db-sync/src/logseq/db_sync/worker/dispatch.cljs new file mode 100644 index 0000000000..762510c708 --- /dev/null +++ b/deps/db-sync/src/logseq/db_sync/worker/dispatch.cljs @@ -0,0 +1,67 @@ +(ns logseq.db-sync.worker.dispatch + (:require [clojure.string :as string] + [logseq.db-sync.common :as common] + [logseq.db-sync.worker.handler.assets :as assets-handler] + [logseq.db-sync.worker.handler.index :as index-handler] + [logseq.db-sync.worker.http :as http] + [promesa.core :as p])) + +(defn handle-worker-fetch [request ^js env] + (let [url (js/URL. (.-url request)) + path (.-pathname url) + method (.-method request)] + (cond + (= path "/health") + (http/json-response :worker/health {:ok true}) + + (or (= path "/graphs") + (string/starts-with? path "/graphs/")) + (index-handler/handle-fetch #js {:env env :d1 (aget env "DB")} request) + + (string/starts-with? path "/e2ee") + (index-handler/handle-fetch #js {:env env :d1 (aget env "DB")} request) + + (string/starts-with? path "/assets/") + (if (= method "OPTIONS") + (assets-handler/handle request env) + (if-let [{:keys [graph-id]} (assets-handler/parse-asset-path path)] + (p/let [access-resp (index-handler/graph-access-response request env graph-id)] + (if (.-ok access-resp) + (assets-handler/handle request env) + access-resp)) + (http/bad-request "invalid asset path"))) + + (= method "OPTIONS") + (common/options-response) + + (string/starts-with? path "/sync/") + (let [prefix (count "/sync/") + rest-path (subs path prefix) + rest-path (if (string/starts-with? rest-path "/") + (subs rest-path 1) + rest-path) + slash-idx (or (string/index-of rest-path "/") -1) + graph-id (if (neg? slash-idx) rest-path (subs rest-path 0 slash-idx)) + tail (if (neg? slash-idx) + "/" + (subs rest-path slash-idx)) + new-url (js/URL. (str (.-origin url) tail (.-search url)))] + (if (seq graph-id) + (if (= method "OPTIONS") + (common/options-response) + (p/let [access-resp (index-handler/graph-access-response request env graph-id)] + (if (.-ok access-resp) + (let [^js namespace (.-LOGSEQ_SYNC_DO env) + do-id (.idFromName namespace graph-id) + stub (.get namespace do-id)] + (if (common/upgrade-request? request) + (.fetch stub request) + (do + (.set (.-searchParams new-url) "graph-id" graph-id) + (let [rewritten (js/Request. (.toString new-url) request)] + (.fetch stub rewritten))))) + access-resp))) + (http/bad-request "missing graph id"))) + + :else + (http/not-found)))) diff --git a/deps/db-sync/src/logseq/db_sync/worker/handler/assets.cljs b/deps/db-sync/src/logseq/db_sync/worker/handler/assets.cljs new file mode 100644 index 0000000000..01eacc35b8 --- /dev/null +++ b/deps/db-sync/src/logseq/db_sync/worker/handler/assets.cljs @@ -0,0 +1,87 @@ +(ns logseq.db-sync.worker.handler.assets + (:require [cljs-bean.core :as bean] + [clojure.string :as string] + [logseq.db-sync.common :as common :refer [cors-headers]] + [logseq.db-sync.worker.http :as http])) + +(def ^:private max-asset-size (* 100 1024 1024)) + +(defn parse-asset-path [path] + (let [prefix "/assets/"] + (when (string/starts-with? path prefix) + (let [rest-path (subs path (count prefix)) + slash-idx (string/index-of rest-path "/") + graph-id (when (and slash-idx (pos? slash-idx)) (subs rest-path 0 slash-idx)) + file (when (and slash-idx (pos? slash-idx)) (subs rest-path (inc slash-idx))) + dot-idx (when file (string/last-index-of file ".")) + asset-uuid (when (and dot-idx (pos? dot-idx)) (subs file 0 dot-idx)) + asset-type (when (and dot-idx (pos? dot-idx)) (subs file (inc dot-idx)))] + (when (and (seq graph-id) (seq asset-uuid) (seq asset-type)) + {:graph-id graph-id + :asset-uuid asset-uuid + :asset-type asset-type + :key (str graph-id "/" asset-uuid "." asset-type)}))))) + +(defn handle [request ^js env] + (let [url (js/URL. (.-url request)) + path (.-pathname url) + method (.-method request)] + (cond + (= method "OPTIONS") + (js/Response. nil #js {:status 204 :headers (cors-headers)}) + + :else + (if-let [{:keys [key asset-type]} (parse-asset-path path)] + (let [^js bucket (.-LOGSEQ_SYNC_ASSETS env)] + (if-not bucket + (http/error-response "missing assets bucket" 500) + (case method + "GET" + (.then (.get bucket key) + (fn [^js obj] + (if (nil? obj) + (http/error-response "not found" 404) + (let [metadata (.-httpMetadata obj) + content-type (or (.-contentType metadata) + "application/octet-stream") + content-encoding (.-contentEncoding metadata) + cache-control (.-cacheControl metadata) + headers (cond-> {"content-type" content-type + "x-asset-type" asset-type} + (and (string? content-encoding) + (not= content-encoding "null") + (pos? (.-length content-encoding))) + (assoc "content-encoding" content-encoding) + (and (string? cache-control) + (pos? (.-length cache-control))) + (assoc "cache-control" cache-control) + true + (bean/->js))] + (js/Response. (.-body obj) + #js {:status 200 + :headers (js/Object.assign + headers + (cors-headers))}))))) + + "PUT" + (.then (.arrayBuffer request) + (fn [buf] + (if (> (.-byteLength buf) max-asset-size) + (http/error-response "asset too large" 413) + (.then (.put bucket + key + buf + #js {:httpMetadata #js {:contentType (or (.get (.-headers request) "content-type") + "application/octet-stream")} + :customMetadata #js {:checksum (.get (.-headers request) "x-amz-meta-checksum") + :type asset-type}}) + (fn [_] + (http/json-response :assets/put {:ok true} 200)))))) + + "DELETE" + (.then (.delete bucket key) + (fn [_] + (http/json-response :assets/delete {:ok true} 200))) + + (http/error-response "method not allowed" 405)))) + (http/error-response "invalid asset path" 400))))) diff --git a/deps/db-sync/src/logseq/db_sync/worker/handler/index.cljs b/deps/db-sync/src/logseq/db_sync/worker/handler/index.cljs new file mode 100644 index 0000000000..3a8360383e --- /dev/null +++ b/deps/db-sync/src/logseq/db_sync/worker/handler/index.cljs @@ -0,0 +1,331 @@ +(ns logseq.db-sync.worker.handler.index + (:require [lambdaisland.glogi :as log] + [logseq.db-sync.common :as common] + [logseq.db-sync.index :as index] + [logseq.db-sync.worker.auth :as auth] + [logseq.db-sync.worker.http :as http] + [logseq.db-sync.worker.routes.index :as routes] + [promesa.core :as p])) + +(defn- index-db [^js self] + (let [db (.-d1 self)] + (when-not db + (log/error :db-sync/index-db-missing {:binding "DB"})) + db)) + +(defn handle [{:keys [db ^js env request url claims route]}] + (let [path-params (:path-params route) + graph-id (:graph-id path-params) + member-id (:member-id path-params) + user-id (aget claims "sub")] + (case (:handler route) + :graphs/list + (if (string? user-id) + (p/let [graphs (index/clj result :keywordize-keys true) + body (http/coerce-http-request :graphs/create body) + graph-id (str (random-uuid))] + (cond + (not (string? user-id)) + (http/unauthorized) + + (nil? body) + (http/bad-request "invalid body") + + :else + (p/let [{:keys [graph-name schema-version]} body + name-exists? (index/clj result :keywordize-keys true) + body (http/coerce-http-request :graph-members/create body) + member-id (:user-id body) + email (:email body) + role (or (:role body) "member")] + (cond + (nil? body) + (http/bad-request "invalid body") + + (and (not (string? member-id)) + (not (string? email))) + (http/bad-request "invalid user") + + :else + (p/let [manager? (index/clj result :keywordize-keys true) + body (http/coerce-http-request :graph-members/update body) + role (:role body)] + (cond + (nil? body) + (http/bad-request "invalid body") + + :else + (p/let [manager? (index/clj result :keywordize-keys true) + body (http/coerce-http-request :e2ee/user-keys body)] + (cond + (not (string? user-id)) + (http/unauthorized) + + (nil? body) + (http/bad-request "invalid body") + + :else + (let [{:keys [public-key encrypted-private-key]} body] + (p/let [_ (index/ {} + (some? public-key) + (assoc :public-key public-key))))) + + :e2ee/graph-aes-key-get + (cond + (not (string? user-id)) + (http/unauthorized) + + :else + (p/let [access? (index/ {} + (some? encrypted-aes-key) + (assoc :encrypted-aes-key encrypted-aes-key))))))) + + :e2ee/graph-aes-key-post + (cond + (not (string? user-id)) + (http/unauthorized) + + :else + (.then (common/read-json request) + (fn [result] + (if (nil? result) + (http/bad-request "missing body") + (let [body (js->clj result :keywordize-keys true) + body (http/coerce-http-request :e2ee/graph-aes-key body)] + (if (nil? body) + (http/bad-request "invalid body") + (p/let [access? (index/clj result :keywordize-keys true) + body (http/coerce-http-request :e2ee/grant-access body)] + (if (nil? body) + (http/bad-request "invalid body") + (p/let [manager? (index/ {:ok true} + (seq @missing) + (assoc :missing-users @missing))))))))))))) + + (http/not-found)))) + +(defn handle-fetch [^js self request] + (let [db (index-db self) + env (.-env self) + url (js/URL. (.-url request)) + path (.-pathname url) + method (.-method request)] + (try + (cond + (contains? #{"OPTIONS" "HEAD"} method) + (common/options-response) + + (nil? db) + (http/error-response "server error" 500) + + :else + (p/let [_ (index/ ? order by addr asc limit ?" + after + limit))) + +(defn- snapshot-row->tuple [row] + (if (array? row) + [(aget row 0) (aget row 1) (aget row 2)] + [(aget row "addr") (aget row "content") (aget row "addresses")])) + +(defn- import-snapshot-rows! + [sql table rows] + (when (seq rows) + (doseq [batch (batch/rows->insert-batches table rows nil)] + (let [sql-str (:sql batch) + args (:args batch)] + (apply common/sql-exec sql sql-str args))))) + +(defn- reset-import! + [sql] + (common/sql-exec sql "delete from kvs") + (common/sql-exec sql "delete from tx_log") + (common/sql-exec sql "delete from sync_meta") + (storage/set-t! sql 0)) + +(defn- graph-id-from-request [request] + (let [header-id (.get (.-headers request) "x-graph-id") + url (js/URL. (.-url request)) + param-id (.get (.-searchParams url) "graph-id")] + (when (seq (or header-id param-id)) + (or header-id param-id)))) + +(defn- snapshot-key [graph-id snapshot-id] + (str graph-id "/" snapshot-id ".snapshot")) + +(defn- snapshot-url [request graph-id snapshot-id] + (let [url (js/URL. (.-url request))] + (str (.-origin url) "/assets/" graph-id "/" snapshot-id ".snapshot"))) + +(defn- maybe-decompress-stream [stream encoding] + (if (and (= encoding snapshot-content-encoding) (exists? js/DecompressionStream)) + (.pipeThrough stream (js/DecompressionStream. "gzip")) + stream)) + +(defn- ->uint8 [data] + (cond + (instance? js/Uint8Array data) data + (instance? js/ArrayBuffer data) (js/Uint8Array. data) + :else (js/Uint8Array. data))) + +(defn- concat-uint8 [^js a ^js b] + (cond + (nil? a) b + (nil? b) a + :else + (let [out (js/Uint8Array. (+ (.-byteLength a) (.-byteLength b)))] + (.set out a 0) + (.set out b (.-byteLength a)) + out))) + +(defn- snapshot-export-stream [^js self] + (let [sql (.-sql self) + state (volatile! {:after -1 :done? false})] + (js/ReadableStream. + #js {:pull (fn [controller] + (p/let [{:keys [after done?]} @state] + (if done? + (.close controller) + (let [rows (fetch-kvs-rows sql after snapshot-download-batch-size) + rows (mapv snapshot-row->tuple rows) + last-addr (if (seq rows) + (apply max (map first rows)) + after) + done? (< (count rows) snapshot-download-batch-size)] + (when (seq rows) + (let [payload (snapshot/encode-rows rows) + framed (snapshot/frame-bytes payload)] + (.enqueue controller framed))) + (vswap! state assoc :after last-addr :done? done?)))))}))) + +(defn- upload-multipart! + [^js bucket key stream opts] + (p/let [^js upload (.createMultipartUpload bucket key opts)] + (let [reader (.getReader stream)] + (-> (p/loop [buffer nil + part-number 1 + parts []] + (p/let [chunk (.read reader)] + (if (.-done chunk) + (cond + (and buffer (pos? (.-byteLength buffer))) + (p/let [^js resp (.uploadPart upload part-number buffer) + parts (conj parts {:partNumber part-number :etag (.-etag resp)})] + (p/let [_ (.complete upload (clj->js parts))] + {:ok true})) + + (seq parts) + (p/let [_ (.complete upload (clj->js parts))] + {:ok true}) + + :else + (p/let [_ (.abort upload)] + (.put bucket key (js/Uint8Array. 0) opts))) + (let [value (.-value chunk) + buffer (concat-uint8 buffer (->uint8 value))] + (if (>= (.-byteLength buffer) snapshot-multipart-part-size) + (let [part (.slice buffer 0 snapshot-multipart-part-size) + rest-parts (.slice buffer snapshot-multipart-part-size (.-byteLength buffer))] + (p/let [^js resp (.uploadPart upload part-number part) + parts (conj parts {:partNumber part-number :etag (.-etag resp)})] + (p/recur rest-parts (inc part-number) parts))) + (p/recur buffer part-number parts)))))) + (p/catch (fn [error] + (.abort upload) + (throw error))))))) + +(defn- snapshot-export-length [^js self] + (let [sql (.-sql self)] + (p/loop [after -1 + total 0] + (let [rows (fetch-kvs-rows sql after snapshot-download-batch-size)] + (if (empty? rows) + total + (let [rows (mapv snapshot-row->tuple rows) + payload (snapshot/encode-rows rows) + total (+ total 4 (.-byteLength payload)) + last-addr (apply max (map first rows)) + done? (< (count rows) snapshot-download-batch-size)] + (if done? + total + (p/recur last-addr total)))))))) + +(defn- snapshot-export-fixed-length [^js self] + (p/let [length (snapshot-export-length self) + stream (snapshot-export-stream self)] + (if (exists? js/FixedLengthStream) + (let [^js fixed (js/FixedLengthStream. length) + readable (.-readable fixed) + writable (.-writable fixed) + reader (.getReader stream) + writer (.getWriter writable)] + (p/let [_ (p/loop [] + (p/let [chunk (.read reader)] + (if (.-done chunk) + (.close writer) + (p/let [_ (.write writer (.-value chunk))] + (p/recur)))))] + readable)) + (p/let [resp (js/Response. stream) + buf (.arrayBuffer resp)] + buf)))) + +(declare import-snapshot!) +(defn- import-snapshot-stream! [^js self stream reset?] + (let [reader (.getReader stream) + reset-pending? (volatile! reset?) + total-count (volatile! 0)] + (p/let [buffer nil] + (p/catch + (p/loop [buffer buffer] + (p/let [chunk (.read reader)] + (if (.-done chunk) + (let [rows (snapshot/finalize-framed-buffer buffer) + rows-count (count rows) + reset? (and @reset-pending? true)] + (when (or reset? (seq rows)) + (import-snapshot! self rows reset?) + (vreset! reset-pending? false)) + (vswap! total-count + rows-count) + @total-count) + (let [value (.-value chunk) + {:keys [rows buffer]} (snapshot/parse-framed-chunk buffer value) + rows-count (count rows) + reset? (and @reset-pending? (seq rows))] + (when (seq rows) + (import-snapshot! self rows (true? reset?)) + (vreset! reset-pending? false)) + (vswap! total-count + rows-count) + (p/recur buffer))))) + (fn [error] + (throw error)))))) + +(defn pull-response [^js self since] + (let [sql (.-sql self) + txs (storage/fetch-tx-since sql since) + response {:type "pull/ok" + :t (t-now self) + :txs txs}] + response)) + +(defn- import-snapshot! [^js self rows reset?] + (let [sql (.-sql self)] + (ensure-schema! self) + (when reset? + (reset-import! sql)) + (import-snapshot-rows! sql "kvs" rows))) + +(defn- apply-tx! [^js self sender txs] + (let [sql (.-sql self)] + (ensure-conn! self) + (let [conn (.-conn self) + tx-data (protocol/transit->tx txs)] + (ldb/transact! conn tx-data {:op :apply-client-tx}) + (let [new-t (storage/get-t sql)] + ;; FIXME: no need to broadcast if client tx is less than remote tx + (ws/broadcast! self sender {:type "changed" :t new-t}) + new-t)))) + +(defn handle-tx-batch! [^js self sender txs t-before] + (let [current-t (t-now self)] + (cond + (or (not (number? t-before)) (neg? t-before)) + {:type "tx/reject" + :reason "invalid t-before"} + + (not= t-before current-t) + {:type "tx/reject" + :reason "stale" + :t current-t} + + :else + (if txs + (let [new-t (apply-tx! self sender txs)] + (if (and (map? new-t) (= "tx/reject" (:type new-t))) + new-t + {:type "tx/batch/ok" + :t new-t})) + {:type "tx/reject" + :reason "empty tx data"})))) + +(defn handle [{:keys [^js self request url route]}] + (case (:handler route) + :sync/health + (http/json-response :sync/health {:ok true}) + + :sync/pull + (let [raw-since (.get (.-searchParams url) "since") + since (if (some? raw-since) (parse-int raw-since) 0)] + (if (or (and (some? raw-since) (not (number? since))) (neg? since)) + (http/bad-request "invalid since") + (http/json-response :sync/pull (pull-response self since)))) + + :sync/snapshot-download + (let [graph-id (graph-id-from-request request) + ^js bucket (.-LOGSEQ_SYNC_ASSETS (.-env self))] + (cond + (not (seq graph-id)) + (http/bad-request "missing graph id") + + (nil? bucket) + (http/error-response "missing assets bucket" 500) + + :else + (p/let [snapshot-id (str (random-uuid)) + key (snapshot-key graph-id snapshot-id) + stream (snapshot-export-stream self) + multipart? (and (some? (.-createMultipartUpload bucket)) + (fn? (.-createMultipartUpload bucket))) + opts #js {:httpMetadata #js {:contentType snapshot-content-type + :contentEncoding nil + :cacheControl snapshot-cache-control} + :customMetadata #js {:purpose "snapshot" + :created-at (str (common/now-ms))}} + _ (if multipart? + (upload-multipart! bucket key stream opts) + (p/let [body (snapshot-export-fixed-length self)] + (.put bucket key body opts))) + url (snapshot-url request graph-id snapshot-id)] + (http/json-response :sync/snapshot-download {:ok true + :key key + :url url + :content-encoding nil})))) + + :sync/admin-reset + (do + (common/sql-exec (.-sql self) "drop table if exists kvs") + (common/sql-exec (.-sql self) "drop table if exists tx_log") + (common/sql-exec (.-sql self) "drop table if exists sync_meta") + (storage/init-schema! (.-sql self)) + (set! (.-schema-ready self) true) + (set! (.-conn self) nil) + (http/json-response :sync/admin-reset {:ok true})) + + :sync/tx-batch + (.then (common/read-json request) + (fn [result] + (if (nil? result) + (http/bad-request "missing body") + (let [body (js->clj result :keywordize-keys true) + body (http/coerce-http-request :sync/tx-batch body)] + (if (nil? body) + (http/bad-request "invalid tx") + (let [{:keys [txs t-before]} body + t-before (parse-int t-before)] + (if (string? txs) + (http/json-response :sync/tx-batch (handle-tx-batch! self nil txs t-before)) + (http/bad-request "invalid tx")))))))) + + :sync/snapshot-upload + (let [graph-id (graph-id-from-request request) + reset-param (.get (.-searchParams url) "reset") + reset? (if (nil? reset-param) + true + (not (contains? #{"false" "0"} reset-param))) + req-encoding (.get (.-headers request) "content-encoding")] + (cond + (not (seq graph-id)) + (http/bad-request "missing graph id") + + (nil? (.-body request)) + (http/bad-request "missing body") + + :else + (let [stream (.-body request) + encoding (or req-encoding "")] + (if (and (= encoding snapshot-content-encoding) + (not (exists? js/DecompressionStream))) + (http/error-response "gzip not supported" 500) + (p/let [stream (maybe-decompress-stream stream encoding) + count (import-snapshot-stream! self stream reset?)] + (http/json-response :sync/snapshot-upload {:ok true + :count count})))))) + + (http/not-found))) + +(defn- strip-sync-prefix [path] + (if (string/starts-with? path "/sync/") + (let [rest-path (subs path (count "/sync/")) + slash-idx (string/index-of rest-path "/")] + (if (neg? slash-idx) + "/" + (subs rest-path slash-idx))) + path)) + +(defn handle-http [^js self request] + (letfn [(with-cors-error [resp] + (if (instance? js/Promise resp) + (.catch resp + (fn [e] + (log/error :db-sync/http-error {:error e}) + (http/error-response "server error" 500))) + resp))] + (try + (let [url (js/URL. (.-url request)) + raw-path (.-pathname url) + path (strip-sync-prefix raw-path) + method (.-method request)] + (with-cors-error + (cond + (= method "OPTIONS") + (common/options-response) + + :else + (if-let [route (sync-routes/match-route method path)] + (handle {:self self + :request request + :url url + :route route}) + (http/not-found))))) + (catch :default e + (log/error :db-sync/http-error {:error e}) + (http/error-response "server error" 500))))) diff --git a/deps/db-sync/src/logseq/db_sync/worker/handler/ws.cljs b/deps/db-sync/src/logseq/db_sync/worker/handler/ws.cljs new file mode 100644 index 0000000000..90256034b5 --- /dev/null +++ b/deps/db-sync/src/logseq/db_sync/worker/handler/ws.cljs @@ -0,0 +1,55 @@ +(ns logseq.db-sync.worker.handler.ws + (:require [logseq.db-sync.protocol :as protocol] + [logseq.db-sync.worker.auth :as auth] + [logseq.db-sync.worker.handler.sync :as sync-handler] + [logseq.db-sync.worker.presence :as presence] + [logseq.db-sync.worker.ws :as ws])) + +(defn handle-ws-message! [^js self ^js ws raw] + (let [message (-> raw protocol/parse-message ws/coerce-ws-client-message)] + (if-not (map? message) + (ws/send! ws {:type "error" :message "invalid request"}) + (case (:type message) + "hello" + (ws/send! ws {:type "hello" :t (sync-handler/t-now self)}) + + "ping" + (ws/send! ws {:type "pong"}) + + "presence" + (let [editing-block-uuid (:editing-block-uuid message)] + (presence/update-presence! self ws {:editing-block-uuid editing-block-uuid}) + (presence/broadcast-online-users! self)) + + "pull" + (let [raw-since (:since message) + since (if (some? raw-since) (sync-handler/parse-int raw-since) 0)] + (if (or (and (some? raw-since) (not (number? since))) (neg? since)) + (ws/send! ws {:type "error" :message "invalid since"}) + (ws/send! ws (sync-handler/pull-response self since)))) + + ;; "snapshot" + ;; (send! ws (snapshot-response self)) + + "tx/batch" + (let [txs (:txs message) + t-before (sync-handler/parse-int (:t-before message))] + (if (string? txs) + (ws/send! ws (sync-handler/handle-tx-batch! self ws txs t-before)) + (ws/send! ws {:type "tx/reject" :reason "invalid tx"}))) + + (ws/send! ws {:type "error" :message "unknown type"}))))) + +(defn handle-ws [^js self request] + (let [pair (js/WebSocketPair.) + client (aget pair 0) + server (aget pair 1) + state (.-state self)] + (.acceptWebSocket state server) + (let [token (auth/token-from-request request) + claims (auth/unsafe-jwt-claims token) + user (presence/claims->user claims)] + (when user + (presence/add-presence! self server user)) + (presence/broadcast-online-users! self)) + (js/Response. nil #js {:status 101 :webSocket client}))) diff --git a/deps/db-sync/src/logseq/db_sync/worker/http.cljs b/deps/db-sync/src/logseq/db_sync/worker/http.cljs new file mode 100644 index 0000000000..99ef96f73e --- /dev/null +++ b/deps/db-sync/src/logseq/db_sync/worker/http.cljs @@ -0,0 +1,36 @@ +(ns logseq.db-sync.worker.http + (:require [logseq.db-sync.common :as common] + [logseq.db-sync.malli-schema :as db-sync-schema] + [logseq.db-sync.worker.coerce :as coerce])) + +(defn coerce-http-request [schema-key body] + (if-let [coercer (get db-sync-schema/http-request-coercers schema-key)] + (let [coerced (coerce/coerce coercer body {:schema schema-key :dir :request})] + (when-not (= coerced coerce/invalid-coerce) + coerced)) + body)) + +(defn json-response + ([schema-key data] (json-response schema-key data 200)) + ([schema-key data status] + (if-let [coercer (get db-sync-schema/http-response-coercers schema-key)] + (let [coerced (coerce/coerce coercer data {:schema schema-key :dir :response})] + (if (= coerced coerce/invalid-coerce) + (common/json-response {:error "server error"} 500) + (common/json-response coerced status))) + (common/json-response data status)))) + +(defn error-response [message status] + (json-response :error {:error message} status)) + +(defn bad-request [message] + (error-response message 400)) + +(defn unauthorized [] + (error-response "unauthorized" 401)) + +(defn forbidden [] + (error-response "forbidden" 403)) + +(defn not-found [] + (error-response "not found" 404)) diff --git a/deps/db-sync/src/logseq/db_sync/worker/presence.cljs b/deps/db-sync/src/logseq/db_sync/worker/presence.cljs new file mode 100644 index 0000000000..bd6c8ea03b --- /dev/null +++ b/deps/db-sync/src/logseq/db_sync/worker/presence.cljs @@ -0,0 +1,55 @@ +(ns logseq.db-sync.worker.presence + (:require [clojure.string :as string] + [logseq.db-sync.worker.ws :as ws])) + +(defn claims->user + [claims] + (when claims + (let [user-id (aget claims "sub") + email (aget claims "email") + username (or (aget claims "preferred_username") + (aget claims "cognito:username") + (aget claims "username")) + name (aget claims "name")] + (when (string? user-id) + (cond-> {:user-id user-id} + (string? email) (assoc :email email) + (string? username) (assoc :username username) + (string? name) (assoc :name name)))))) + +(defn presence* + [^js self] + (or (.-presence self) + (set! (.-presence self) (atom {})))) + +(defn online-users + [^js self] + (vec (distinct (vals @(presence* self))))) + +(defn broadcast-online-users! + [^js self] + (ws/broadcast! self nil {:type "online-users" :online-users (online-users self)})) + +(defn add-presence! + [^js self ^js ws user] + (swap! (presence* self) assoc ws user) + (.serializeAttachment ws (clj->js user))) + +(defn update-presence! + [^js self ^js ws {:keys [editing-block-uuid] :as updates}] + (swap! (presence* self) + (fn [presence] + (if-let [user (get presence ws)] + (let [user' (if (contains? updates :editing-block-uuid) + (if (and (string? editing-block-uuid) + (not (string/blank? editing-block-uuid))) + (assoc user :editing-block-uuid editing-block-uuid) + (dissoc user :editing-block-uuid)) + user)] + (.serializeAttachment ws (clj->js user')) + (assoc presence ws user')) + presence)))) + +(defn remove-presence! + [^js self ^js ws] + (swap! (presence* self) dissoc ws)) diff --git a/deps/db-sync/src/logseq/db_sync/worker/routes.cljs b/deps/db-sync/src/logseq/db_sync/worker/routes/index.cljs similarity index 96% rename from deps/db-sync/src/logseq/db_sync/worker/routes.cljs rename to deps/db-sync/src/logseq/db_sync/worker/routes/index.cljs index 05dee697e6..5cb0a32a7f 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/routes.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/routes/index.cljs @@ -1,4 +1,4 @@ -(ns logseq.db-sync.worker.routes +(ns logseq.db-sync.worker.routes.index (:require [reitit.core :as r])) (def ^:private route-data diff --git a/deps/db-sync/src/logseq/db_sync/worker/routes/sync.cljs b/deps/db-sync/src/logseq/db_sync/worker/routes/sync.cljs new file mode 100644 index 0000000000..e4a3b028d5 --- /dev/null +++ b/deps/db-sync/src/logseq/db_sync/worker/routes/sync.cljs @@ -0,0 +1,18 @@ +(ns logseq.db-sync.worker.routes.sync + (:require [reitit.core :as r])) + +(def ^:private route-data + [["/health" {:methods {"GET" :sync/health}}] + ["/pull" {:methods {"GET" :sync/pull}}] + ["/snapshot/download" {:methods {"GET" :sync/snapshot-download}}] + ["/admin/reset" {:methods {"DELETE" :sync/admin-reset}}] + ["/tx/batch" {:methods {"POST" :sync/tx-batch}}] + ["/snapshot/upload" {:methods {"POST" :sync/snapshot-upload}}]]) + +(def ^:private router + (r/router route-data)) + +(defn match-route [method path] + (when-let [match (r/match-by-path router path)] + (when-let [handler (get-in match [:data :methods method])] + (assoc match :handler handler)))) diff --git a/deps/db-sync/src/logseq/db_sync/worker/ws.cljs b/deps/db-sync/src/logseq/db_sync/worker/ws.cljs new file mode 100644 index 0000000000..5c4846530b --- /dev/null +++ b/deps/db-sync/src/logseq/db_sync/worker/ws.cljs @@ -0,0 +1,34 @@ +(ns logseq.db-sync.worker.ws + (:require [lambdaisland.glogi :as log] + [logseq.db-sync.malli-schema :as db-sync-schema] + [logseq.db-sync.protocol :as protocol] + [logseq.db-sync.worker.coerce :as coerce])) + +(defn ws-open? [ws] + (= 1 (.-readyState ws))) + +(defn coerce-ws-client-message [message] + (when message + (let [coerced (coerce/coerce db-sync-schema/ws-client-message-coercer message {:schema :ws/client})] + (when-not (= coerced coerce/invalid-coerce) + coerced)))) + +(defn coerce-ws-server-message [message] + (when message + (let [coerced (coerce/coerce db-sync-schema/ws-server-message-coercer message {:schema :ws/server})] + (when-not (= coerced coerce/invalid-coerce) + coerced)))) + +(defn send! [ws msg] + (when (ws-open? ws) + (if-let [coerced (coerce-ws-server-message msg)] + (.send ws (protocol/encode-message coerced)) + (do + (log/error :db-sync/ws-response-invalid {:message msg}) + (.send ws (protocol/encode-message {:type "error" :message "server error"})))))) + +(defn broadcast! [^js self sender msg] + (let [clients (.getWebSockets (.-state self))] + (doseq [ws clients] + (when (and (not= ws sender) (ws-open? ws)) + (send! ws msg))))) diff --git a/deps/db-sync/test/logseq/db_sync/worker_routes_test.cljs b/deps/db-sync/test/logseq/db_sync/worker_routes_test.cljs index 7e3cde08df..3226383380 100644 --- a/deps/db-sync/test/logseq/db_sync/worker_routes_test.cljs +++ b/deps/db-sync/test/logseq/db_sync/worker_routes_test.cljs @@ -1,6 +1,6 @@ (ns logseq.db-sync.worker-routes-test (:require [cljs.test :refer [deftest is testing]] - [logseq.db-sync.worker.routes :as routes])) + [logseq.db-sync.worker.routes.index :as routes])) (deftest match-route-graphs-test (testing "graphs routes" diff --git a/deps/db-sync/test/logseq/db_sync/worker_sync_routes_test.cljs b/deps/db-sync/test/logseq/db_sync/worker_sync_routes_test.cljs new file mode 100644 index 0000000000..39148ddb78 --- /dev/null +++ b/deps/db-sync/test/logseq/db_sync/worker_sync_routes_test.cljs @@ -0,0 +1,24 @@ +(ns logseq.db-sync.worker-sync-routes-test + (:require [cljs.test :refer [deftest is testing]] + [logseq.db-sync.worker.routes.sync :as sync-routes])) + +(deftest match-route-sync-test + (testing "sync routes" + (let [match (sync-routes/match-route "GET" "/health")] + (is (= :sync/health (:handler match)))) + (let [match (sync-routes/match-route "GET" "/pull")] + (is (= :sync/pull (:handler match)))) + (let [match (sync-routes/match-route "GET" "/snapshot/download")] + (is (= :sync/snapshot-download (:handler match)))) + (let [match (sync-routes/match-route "DELETE" "/admin/reset")] + (is (= :sync/admin-reset (:handler match)))) + (let [match (sync-routes/match-route "POST" "/tx/batch")] + (is (= :sync/tx-batch (:handler match)))) + (let [match (sync-routes/match-route "POST" "/snapshot/upload")] + (is (= :sync/snapshot-upload (:handler match)))))) + +(deftest match-route-sync-method-mismatch-test + (testing "sync method mismatch returns nil" + (is (nil? (sync-routes/match-route "POST" "/health"))) + (is (nil? (sync-routes/match-route "GET" "/admin/reset"))) + (is (nil? (sync-routes/match-route "PUT" "/tx/batch")))))