no need to use DO for index

also, add cache to jwt token
This commit is contained in:
Tienson Qin
2026-01-21 17:50:05 +08:00
parent 87ebe90ab9
commit b8774bcf85
4 changed files with 142 additions and 76 deletions

View File

@@ -5,6 +5,58 @@
(def text-decoder (js/TextDecoder.))
(def text-encoder (js/TextEncoder.))
(def ^:private jwks-ttl-ms (* 6 60 60 1000))
(def ^:private token-ttl-ms (* 60 60 1000))
(defonce ^:private *jwks-cache (atom {:url nil :keys nil :fetched-at 0}))
(defonce ^:private *token-cache (atom {}))
(defn- get-now-ms []
(.now js/Date))
(defn- cached-token
[token now-s now-ms]
(when-let [{:keys [payload exp cached-at]} (get @*token-cache token)]
(when (and (number? exp)
(> exp now-s)
(< (- now-ms cached-at) token-ttl-ms))
payload)))
(defn- cache-token!
[token payload now-ms]
(let [exp (aget payload "exp")]
(when (number? exp)
(swap! *token-cache assoc token {:payload payload :exp exp :cached-at now-ms}))
(when (> (count @*token-cache) 2000)
(swap! *token-cache
(fn [cache]
(into {}
(remove (fn [[_ {:keys [exp cached-at]}]]
(or (not (number? exp))
(<= exp (js/Math.floor (/ now-ms 1000)))
(>= (- now-ms cached-at) token-ttl-ms))))
cache))))))
(defn- get-jwks-keys
[url & {:keys [force?]}]
(let [now (get-now-ms)
{:keys [url cached-url keys fetched-at]} {:cached-url (:url @*jwks-cache)
:url url
:keys (:keys @*jwks-cache)
:fetched-at (:fetched-at @*jwks-cache)}
fresh? (and (not force?)
(= cached-url url)
keys
(< (- now fetched-at) jwks-ttl-ms))]
(if fresh?
(p/resolved keys)
(p/let [jwks-resp (js/fetch url)
_ (when-not (.-ok jwks-resp) (throw (ex-info "jwks" {})))
jwks (.json jwks-resp)
keys (or (aget jwks "keys") #js [])]
(reset! *jwks-cache {:url url :keys keys :fetched-at now})
keys))))
(defn- base64url->uint8array [input]
(let [pad (if (pos? (mod (count input) 4))
(apply str (repeat (- 4 (mod (count input) 4)) "="))
@@ -35,29 +87,36 @@
_ (when (not= 3 (count parts)) (throw (ex-info "invalid" {})))
header-part (nth parts 0)
payload-part (nth parts 1)
signature-part (nth parts 2)]
(p/let [header (decode-jwt-part header-part)
payload (decode-jwt-part payload-part)
issuer (aget env "COGNITO_ISSUER")
client-id (aget env "COGNITO_CLIENT_ID")
_ (when (not= (aget payload "iss") issuer) (throw (ex-info "iss not found" {})))
_ (when (not= (aget payload "aud") client-id) (throw (ex-info "aud not found" {})))
now (js/Math.floor (/ (.now js/Date) 1000))
_ (when (and (aget payload "exp") (< (aget payload "exp") now))
(throw (ex-info "exp" {})))
jwks-resp (js/fetch (aget env "COGNITO_JWKS_URL"))
_ (when-not (.-ok jwks-resp) (throw (ex-info "jwks" {})))
jwks (.json jwks-resp)
keys (or (aget jwks "keys") #js [])
key (.find keys (fn [k] (= (aget k "kid") (aget header "kid"))))
_ (when-not key (throw (ex-info "kid" {})))
crypto-key (import-rsa-key key)
data (.encode text-encoder (str header-part "." payload-part))
signature (base64url->uint8array signature-part)
ok (.verify js/crypto.subtle
"RSASSA-PKCS1-v1_5"
crypto-key
signature
data)]
(when ok
payload))))
signature-part (nth parts 2)
now-ms (get-now-ms)
now-s (js/Math.floor (/ now-ms 1000))]
(if-let [cached (cached-token token now-s now-ms)]
(p/resolved cached)
(p/let [header (decode-jwt-part header-part)
payload (decode-jwt-part payload-part)
issuer (aget env "COGNITO_ISSUER")
client-id (aget env "COGNITO_CLIENT_ID")
_ (when (not= (aget payload "iss") issuer) (throw (ex-info "iss not found" {})))
_ (when (not= (aget payload "aud") client-id) (throw (ex-info "aud not found" {})))
_ (when (and (aget payload "exp") (< (aget payload "exp") now-s))
(throw (ex-info "exp" {})))
jwks-url (aget env "COGNITO_JWKS_URL")
keys (get-jwks-keys jwks-url)
key (.find keys (fn [k] (= (aget k "kid") (aget header "kid"))))
key (if key
key
(p/let [keys (get-jwks-keys jwks-url :force? true)
key (.find keys (fn [k] (= (aget k "kid") (aget header "kid"))))]
key))
_ (when-not key (throw (ex-info "kid" {})))
crypto-key (import-rsa-key key)
data (.encode text-encoder (str header-part "." payload-part))
signature (base64url->uint8array signature-part)
ok (.verify js/crypto.subtle
"RSASSA-PKCS1-v1_5"
crypto-key
signature
data)]
(when ok
(cache-token! token payload now-ms)
payload)))))

View File

@@ -6,8 +6,7 @@
{:db-sync {:target :esm
:output-dir "worker/dist/worker"
:modules {:main {:exports {default logseq.db-sync.worker/worker
SyncDO logseq.db-sync.worker/SyncDO
SyncIndexDO logseq.db-sync.worker/SyncIndexDO}}}
SyncDO logseq.db-sync.worker/SyncDO}}}
:js-options {:js-provider :import}
:closure-defines {shadow.cljs.devtools.client.env/enabled false}
:devtools {:enabled false}}

View File

@@ -21,31 +21,46 @@
(subs auth-header 7)))
(defn- token-from-request [request]
(js/console.dir request)
(or (bearer-token (.get (.-headers request) "authorization"))
(let [url (js/URL. (.-url request))]
(.get (.-searchParams url) "token"))))
(defn- decode-jwt-part [part]
(let [pad (if (pos? (mod (count part) 4))
(apply str (repeat (- 4 (mod (count part) 4)) "="))
"")
base64 (-> (str part pad)
(string/replace "-" "+")
(string/replace "_" "/"))
raw (js/atob base64)]
(js/JSON.parse raw)))
(defn- unsafe-jwt-claims [token]
(try
(when (string? token)
(let [parts (string/split token #"\.")]
(when (= 3 (count parts))
(decode-jwt-part (nth parts 1)))))
(catch :default _
nil)))
(defn- auth-claims [request env]
(let [token (token-from-request request)]
(if (string? token)
(.catch (authorization/verify-jwt token env) (fn [_] nil))
(js/Promise.resolve nil))))
(defn- index-stub [^js env]
(let [^js namespace (.-LOGSEQ_SYNC_INDEX_DO env)
do-id (.idFromName namespace "index")]
(.get namespace do-id)))
(declare handle-index-fetch)
(defn- graph-access-response [request env graph-id]
(let [token (token-from-request request)
url (js/URL. (.-url request))
access-url (str (.-origin url) "/graphs/" graph-id "/access")
headers (js/Headers. (.-headers request))]
headers (js/Headers. (.-headers request))
index-self #js {:env env :d1 (aget env "DB")}]
(when (string? token)
(.set headers "authorization" (str "Bearer " token)))
(.fetch (index-stub env)
(js/Request. access-url #js {:method "GET" :headers headers}))))
(handle-index-fetch index-self (js/Request. access-url #js {:method "GET" :headers headers}))))
(defn- parse-asset-path [path]
(let [prefix "/assets/"]
@@ -63,7 +78,18 @@
:asset-type asset-type
:key (str graph-id "/" asset-uuid "." asset-type)})))))
(defn- ensure-schema! [^js self]
(when-not (true? (.-schema-ready self))
(storage/init-schema! (.-sql self))
(set! (.-schema-ready self) true)))
(defn- ensure-conn! [^js self]
(ensure-schema! self)
(when-not (.-conn self)
(set! (.-conn self) (storage/open-conn (.-sql self)))))
(defn- t-now [^js self]
(ensure-schema! self)
(storage/get-t (.-sql self)))
(defn- ws-open? [ws]
@@ -265,7 +291,7 @@
(or (= path "/graphs")
(string/starts-with? path "/graphs/"))
(.fetch (index-stub env) (.clone request))
(handle-index-fetch #js {:env env :d1 (aget env "DB")} request)
(string/starts-with? path "/assets/")
(if (= method "OPTIONS")
@@ -333,12 +359,13 @@
(defn- import-snapshot! [^js self rows reset?]
(let [sql (.-sql self)]
(storage/init-schema! sql)
(ensure-schema! self)
(when reset?
(common/sql-exec sql "delete from kvs")
(common/sql-exec sql "delete from tx_log")
(common/sql-exec sql "delete from sync_meta")
(storage/init-schema! sql)
(set! (.-schema-ready self) true)
(storage/set-t! sql 0))
(when (seq rows)
(doseq [[addr content addresses] rows]
@@ -351,13 +378,11 @@
(set! (.-conn self) (storage/open-conn sql))))
(defn- apply-tx! [^js self sender txs]
(let [sql (.-sql self)
conn (.-conn self)]
(when-not conn
(fail-fast :db-sync/missing-db {:op :apply-tx}))
(let [tx-data (protocol/transit->tx txs)]
(let [sql (.-sql self)]
(ensure-conn! self)
(let [conn (.-conn self)
tx-data (protocol/transit->tx txs)]
(ldb/transact! conn tx-data {:op :apply-client-tx})
(prn :debug :finished-db-transact)
(let [new-t (storage/get-t sql)]
;; FIXME: no need to broadcast if client tx is less than remote tx
(broadcast! self sender {:type "changed" :t new-t})
@@ -419,14 +444,15 @@
(let [pair (js/WebSocketPair.)
client (aget pair 0)
server (aget pair 1)
env (.-env self)]
(p/let [claims (auth-claims request env)
user (claims->user claims)]
(.acceptWebSocket (.-state self) server)
state (.-state self)]
(.acceptWebSocket state server)
(let [token (token-from-request request)
claims (unsafe-jwt-claims token)
user (claims->user claims)]
(when user
(add-presence! self server user))
(broadcast-online-users! self)
(js/Response. nil #js {:status 101 :webSocket client}))))
(broadcast-online-users! self))
(js/Response. nil #js {:status 101 :webSocket client})))
(defn- strip-sync-prefix [path]
(if (string/starts-with? path "/sync/")
@@ -491,6 +517,8 @@
(common/sql-exec (.-sql self) "drop table if exists tx_log")
(common/sql-exec (.-sql self) "drop table if exists sync_meta")
(storage/init-schema! (.-sql self))
(set! (.-schema-ready self) true)
(set! (.-conn self) nil)
(json-response :sync/admin-reset {:ok true}))
(and (= method "POST") (= path "/tx/batch"))
@@ -536,7 +564,8 @@
(set! (.-state this) state)
(set! (.-env this) env)
(set! (.-sql this) (.-sql ^js (.-storage state)))
(set! (.-conn this) (storage/open-conn (.-sql this))))
(set! (.-conn this) nil)
(set! (.-schema-ready this) false))
Object
(fetch [this request]
@@ -778,16 +807,3 @@
(catch :default error
(log/error :db-sync/index-error error)
(error-response "server error" 500)))))
(defclass SyncIndexDO
(extends DurableObject)
(constructor [this ^js state env]
(super state env)
(set! (.-state this) state)
(set! (.-env this) env)
(set! (.-d1 this) (aget env "DB")))
Object
(fetch [this request]
(handle-index-fetch this request)))

View File

@@ -10,10 +10,6 @@ enabled = true
name = "LOGSEQ_SYNC_DO"
class_name = "SyncDO"
[[durable_objects.bindings]]
name = "LOGSEQ_SYNC_INDEX_DO"
class_name = "SyncIndexDO"
[[r2_buckets]]
binding = "LOGSEQ_SYNC_ASSETS"
bucket_name = "logseq-sync-assets-dev"
@@ -24,8 +20,8 @@ database_name = "logseq-sync-graph-meta-dev"
database_id = "c020574a-5623-407b-be0c-cd192bab9545"
[[migrations]]
tag = "v1"
new_sqlite_classes = [ "SyncDO", "SyncIndexDO" ]
tag = "v2"
new_sqlite_classes = [ "SyncDO" ]
[vars]
COGNITO_JWKS_URL = "https://cognito-idp.us-east-1.amazonaws.com/us-east-1_dtagLnju8/.well-known/jwks.json"
@@ -44,13 +40,9 @@ COGNITO_CLIENT_ID = "69cs1lgme7p8kbgld8n5kseii6"
name = "LOGSEQ_SYNC_DO"
class_name = "SyncDO"
[[env.staging.durable_objects.bindings]]
name = "LOGSEQ_SYNC_INDEX_DO"
class_name = "SyncIndexDO"
[[env.staging.migrations]]
tag = "v1"
new_sqlite_classes = [ "SyncDO", "SyncIndexDO" ]
tag = "v2"
new_sqlite_classes = [ "SyncDO" ]
[[env.staging.r2_buckets]]
binding = "LOGSEQ_SYNC_ASSETS"