diff --git a/deps/db-sync/README.md b/deps/db-sync/README.md index 7f77b3bb8b..bffd4f827d 100644 --- a/deps/db-sync/README.md +++ b/deps/db-sync/README.md @@ -133,6 +133,10 @@ npm run test:node-adapter | COGNITO_ISSUER | Cognito issuer URL | | COGNITO_CLIENT_ID | Cognito client id | | COGNITO_JWKS_URL | Cognito JWKS URL | +| OPEN_COLLECTIVE_API_KEY | Open Collective GraphQL API key (used to build `https://api.opencollective.com/graphql/v2/`) | +| OPEN_COLLECTIVE_API_URL | Optional full Open Collective GraphQL endpoint. Overrides `OPEN_COLLECTIVE_API_KEY` | +| OPEN_COLLECTIVE_COLLECTIVE_SLUG | Collective slug to sync (defaults to `logseq`) | +| OPEN_COLLECTIVE_WEBHOOK_TOKEN | Shared token expected in `/hooks/open-collective?token=...` | ## Notes - Protocol definitions live in `docs/agent-guide/db-sync/protocol.md`. diff --git a/deps/db-sync/src/logseq/db_sync/malli_schema.cljs b/deps/db-sync/src/logseq/db_sync/malli_schema.cljs index b3ed276242..474d4ee327 100644 --- a/deps/db-sync/src/logseq/db_sync/malli_schema.cljs +++ b/deps/db-sync/src/logseq/db_sync/malli_schema.cljs @@ -165,6 +165,14 @@ (def graph-access-response-schema http-ok-response-schema) +(def user-info-response-schema + [:map + [:ExpireTime :int] + [:UserGroups [:sequential :string]] + [:ProUser :boolean] + [:StorageLimit :int] + [:GraphCountLimit :int]]) + (def graph-delete-response-schema [:map [:graph-id :string] @@ -252,7 +260,8 @@ :e2ee/grant-access e2ee-grant-access-request-schema}) (def http-response-schemas - {:graphs/list graphs-list-response-schema + {:user/get user-info-response-schema + :graphs/list graphs-list-response-schema :graphs/create graph-create-response-schema :graphs/access graph-access-response-schema :graphs/delete graph-delete-response-schema diff --git a/deps/db-sync/src/logseq/db_sync/node/dispatch.cljs b/deps/db-sync/src/logseq/db_sync/node/dispatch.cljs index bd2d412ee7..e5ef645b66 100644 --- a/deps/db-sync/src/logseq/db_sync/node/dispatch.cljs +++ b/deps/db-sync/src/logseq/db_sync/node/dispatch.cljs @@ -8,6 +8,7 @@ [logseq.db-sync.worker.handler.index :as index-handler] [logseq.db-sync.worker.handler.sync :as sync-handler] [logseq.db-sync.worker.http :as http] + [logseq.db-sync.worker.open-collective :as open-collective] [promesa.core :as p])) (defn- admin-token-valid? @@ -28,6 +29,9 @@ (= path "/health") (http/json-response :worker/health {:ok true}) + (= path "/user") + (index-handler/handle-fetch index-self request) + (or (= path "/graphs") (string/starts-with? path "/graphs/")) (index-handler/handle-fetch index-self request) @@ -47,6 +51,13 @@ access-resp))) (http/bad-request "invalid asset path"))) + (= path "/hooks/open-collective") + (if (contains? #{"POST" "OPTIONS"} method) + (if (= method "OPTIONS") + (common/options-response) + (open-collective/handle-webhook request env)) + (http/error-response "method not allowed" 405)) + (= method "OPTIONS") (common/options-response) diff --git a/deps/db-sync/src/logseq/db_sync/worker.cljs b/deps/db-sync/src/logseq/db_sync/worker.cljs index d1b51774e5..6ef0ab5c29 100644 --- a/deps/db-sync/src/logseq/db_sync/worker.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker.cljs @@ -9,6 +9,7 @@ [logseq.db-sync.worker.dispatch :as dispatch] [logseq.db-sync.worker.handler.sync :as sync-handler] [logseq.db-sync.worker.handler.ws :as ws-handler] + [logseq.db-sync.worker.open-collective :as open-collective] [logseq.db-sync.worker.presence :as presence] [logseq.db-sync.worker.ws :as ws] [promesa.core :as p] @@ -19,7 +20,9 @@ (def worker (sentry/wrap-handler #js {:fetch (fn [request env _ctx] - (dispatch/handle-worker-fetch request env))})) + (dispatch/handle-worker-fetch request env)) + :scheduled (fn [_event env _ctx] + (open-collective/handle-scheduled env))})) (defclass SyncDO (extends DurableObject) diff --git a/deps/db-sync/src/logseq/db_sync/worker/dispatch.cljs b/deps/db-sync/src/logseq/db_sync/worker/dispatch.cljs index 04b286ccae..43dd3126cd 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/dispatch.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/dispatch.cljs @@ -5,6 +5,7 @@ [logseq.db-sync.worker.handler.assets :as assets-handler] [logseq.db-sync.worker.handler.index :as index-handler] [logseq.db-sync.worker.http :as http] + [logseq.db-sync.worker.open-collective :as open-collective] [promesa.core :as p])) (defn- admin-token-valid? @@ -37,6 +38,9 @@ (= path "/health") (http/json-response :worker/health {:ok true}) + (= path "/user") + (index-handler/handle-fetch #js {:env env :d1 (aget env "DB")} request) + (or (= path "/graphs") (string/starts-with? path "/graphs/")) (index-handler/handle-fetch #js {:env env :d1 (aget env "DB")} request) @@ -59,6 +63,13 @@ access-resp))) (http/bad-request "invalid asset path"))) + (= path "/hooks/open-collective") + (if (contains? #{"POST" "OPTIONS"} method) + (if (= method "OPTIONS") + (common/options-response) + (open-collective/handle-webhook request env)) + (http/error-response "method not allowed" 405)) + (= method "OPTIONS") (common/options-response) diff --git a/deps/db-sync/src/logseq/db_sync/worker/handler/index.cljs b/deps/db-sync/src/logseq/db_sync/worker/handler/index.cljs index f49d8442dd..1863905bb0 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/handler/index.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/handler/index.cljs @@ -44,6 +44,14 @@ (delete-graph-fn graph-id) ( claims (aget "sub"))] (case (:handler route) + :user/get + (if (string? user-id) + (p/let [{:keys [expire-time user-groups is-pro]} (index/ expire-time (now-epoch-s))) + storage-limit (if pro-user? pro-storage-limit free-storage-limit) + graph-count-limit (if pro-user? pro-graph-count-limit free-graph-count-limit)] + (http/json-response :user/get + {:ExpireTime expire-time + :UserGroups (or user-groups []) + :ProUser pro-user? + :StorageLimit storage-limit + :GraphCountLimit graph-count-limit})) + (http/unauthorized)) + :graphs/list (if (string? user-id) (p/let [graphs (index/emails + [member-node] + (let [account (get member-node "account") + account-email (some-> (get account "email") string/lower-case) + admin-emails (->> (get-in account ["admins" "nodes"]) + (keep (fn [node] + (some-> (get-in node ["account" "email"]) + string/lower-case))))] + (->> (cons account-email admin-emails) + (remove string/blank?) + distinct + vec))) + +(defn- tiers->groups + [tiers] + (cond-> #{} + (seq (set/intersection sponsor-tier-slugs tiers)) + (conj "alpha-tester") + + (contains? tiers "backers") + (conj "beta-tester"))) + +(defn- merge-tier-email + [acc member-node] + (let [tier-slug (some-> member-node (get "tier") (get "slug")) + emails (->emails member-node)] + (if (and (string? tier-slug) (seq emails)) + (reduce (fn [m email] + (update m email (fnil conj #{}) tier-slug)) + acc + emails) + acc))) + +(defn- js variables)})}) + text (.text resp)] + (if-not (.-ok resp) + (throw (ex-info "open collective request failed" + {:status (.-status resp) + :body text})) + (let [body (js->clj (js/JSON.parse text)) + errors (get body "errors")] + (when (seq errors) + (throw (ex-info "open collective graphql error" + {:errors errors}))) + body))) + (p/rejected (ex-info "missing open collective endpoint" + {:vars ["OPEN_COLLECTIVE_API_URL" "OPEN_COLLECTIVE_API_KEY"]})))) + +(defn- groups + [^js env] + (let [slug (collective-slug env)] + (letfn [(tiers] + (p/let [body (tiers (reduce merge-tier-email email->tiers nodes) + next-offset (+ offset oc-page-limit)] + (if (< next-offset total-count) + (tiers) + (into {} + (map (fn [[email tiers]] + [email (tiers->groups tiers)])) + email->tiers))))] + (groups now-s now-ms] + (let [email (:email user) + current-groups (set (:user-groups user)) + target-oc-groups (get email->groups email #{}) + preserved-groups (set/difference current-groups oc-managed-groups) + next-groups (-> (set/union preserved-groups target-oc-groups) + sort + vec) + ;; next-is-pro? (boolean (seq target-oc-groups)) + next-is-pro? false + next-expire-time (if next-is-pro? + (+ now-s pro-expire-seconds) + now-s)] + {:next-fields {:expire-time next-expire-time + :user-groups next-groups + :is-pro next-is-pro? + :graphs-count (:graphs-count user) + :storage-count (:storage-count user) + :updated-at now-ms} + :changed? (or (not= (set next-groups) current-groups) + (not= next-is-pro? (boolean (:is-pro user))) + (not= next-expire-time (:expire-time user)))})) + +(defn groups (groups env) + users (index/groups now-s now-ms)] + (if (and changed? (string? (:user-id user))) + (p/let [_ (index/email+groups + [account collective] + (let [tiers (->> (get-in account ["memberOf" "nodes"]) + (filter (fn [node] + (= collective (get-in node ["account" "slug"])))) + (keep (fn [node] + (some-> node (get "tier") (get "slug"))) ) + set) + groups (tiers->groups tiers) + emails (->> (concat [(get account "email")] + (map (fn [node] + (get-in node ["account" "email"])) + (get-in account ["admins" "nodes"]))) + (keep (fn [email] + (some-> email string/lower-case))) + (remove string/blank?) + distinct + vec)] + {:emails emails + :groups groups})) + +(defn email+groups account collective) + now-s (now-epoch-s) + now-ms (common/now-ms) + update-results (p/all + (map (fn [email] + (p/let [user-id (index/ (set/union preserved-groups groups) + sort + vec) + ;; next-is-pro? (boolean (seq groups)) + next-is-pro? false + next-expire-time (if next-is-pro? + (+ now-s pro-expire-seconds) + now-s) + changed? (or (not= (set next-groups) current-groups) + (not= next-is-pro? (boolean (:is-pro current))) + (not= next-expire-time (:expire-time current)))] + (if changed? + (p/let [_ (index/ (p/let [body (common/read-json request) + event-type (some-> body (aget "type")) + slug (or (some-> body (aget "data") (aget "member") (aget "memberCollective") (aget "slug")) + (some-> body (aget "data") (aget "collective") (aget "slug"))) + result (cond + (and (string? slug) + (contains? #{"collective.member.created" + "collective.member.updated" + "collective.member.deleted"} + event-type)) + ( (p/let [result ( (p/with-redefs [open-collective/handle-webhook (fn [_request _env] + (ok-json-response))] + (p/let [resp (dispatch/handle-worker-fetch request #js {}) + text (.text resp) + body (js->clj (js/JSON.parse text) :keywordize-keys true)] + (is (= 200 (.-status resp))) + (is (= true (:ok body))))) + (p/then (fn [] (done))) + (p/catch (fn [error] + (is false (str error)) + (done))))))) + (deftest admin-token-bypasses-graph-access-check-for-assets-route-test (async done (let [access-check-calls (atom 0) diff --git a/deps/db-sync/test/logseq/db_sync/worker_handler_index_test.cljs b/deps/db-sync/test/logseq/db_sync/worker_handler_index_test.cljs index 1835b2bb79..93e1886562 100644 --- a/deps/db-sync/test/logseq/db_sync/worker_handler_index_test.cljs +++ b/deps/db-sync/test/logseq/db_sync/worker_handler_index_test.cljs @@ -7,6 +7,36 @@ [logseq.db-sync.worker.handler.index :as index-handler] [promesa.core :as p])) +(deftest user-get-returns-expected-shape-test + (async done + (let [request (js/Request. "http://localhost/user" #js {:method "POST"}) + url (js/URL. (.-url request))] + (-> (p/with-redefs [index/clj (js/JSON.parse text) :keywordize-keys true)] + (is (= 200 (.-status resp))) + (is (= ["team"] (:UserGroups body))) + (is (= true (:ProUser body))) + (is (= (* 10 1024 1024 1024) (:StorageLimit body))) + (is (= 10 (:GraphCountLimit body))))) + (p/then (fn [] + (done))) + (p/catch (fn [error] + (is false (str error)) + (done))))))) + (deftest graph-access-response-with-timing-caches-result-test (async done (let [request (js/Request. "http://localhost/sync/graph-1" diff --git a/deps/db-sync/test/logseq/db_sync/worker_routes_test.cljs b/deps/db-sync/test/logseq/db_sync/worker_routes_test.cljs index 3226383380..0f3fd9838b 100644 --- a/deps/db-sync/test/logseq/db_sync/worker_routes_test.cljs +++ b/deps/db-sync/test/logseq/db_sync/worker_routes_test.cljs @@ -2,6 +2,11 @@ (:require [cljs.test :refer [deftest is testing]] [logseq.db-sync.worker.routes.index :as routes])) +(deftest match-route-user-test + (testing "user route" + (let [match (routes/match-route "POST" "/user")] + (is (= :user/get (:handler match)))))) + (deftest match-route-graphs-test (testing "graphs routes" (let [match (routes/match-route "GET" "/graphs")] diff --git a/deps/db-sync/worker/wrangler.toml b/deps/db-sync/worker/wrangler.toml index efd4f184a9..22d05648d2 100644 --- a/deps/db-sync/worker/wrangler.toml +++ b/deps/db-sync/worker/wrangler.toml @@ -15,6 +15,9 @@ enabled = true [observability.logs] invocation_logs = false +[triggers] +crons = ["0 3 * * 0"] + [[durable_objects.bindings]] name = "LOGSEQ_SYNC_DO" class_name = "SyncDO" @@ -68,6 +71,9 @@ database_id = "00325aa2-c805-4693-b599-900a25dcde42" [env.staging.version_metadata] binding = "CF_VERSION_METADATA" +[env.staging.triggers] +crons = ["0 3 * * 0"] + [env.prod] name = "logseq-sync-prod" @@ -98,3 +104,6 @@ database_id = "4c80e058-69b5-4985-88d1-f53711d817ba" [env.prod.version_metadata] binding = "CF_VERSION_METADATA" + +[env.prod.triggers] +crons = ["0 3 * * 0"] diff --git a/docs/agent-guide/db-sync/protocol.md b/docs/agent-guide/db-sync/protocol.md index a8da3095a6..59480404d9 100644 --- a/docs/agent-guide/db-sync/protocol.md +++ b/docs/agent-guide/db-sync/protocol.md @@ -54,6 +54,11 @@ - `GET /health` - Worker health check. Response: `{"ok":true}`. +### User +- `POST /user` + - Return current user sync entitlement/profile fields from D1. + - Response: `{"ExpireTime": , "UserGroups": ["..."], "ProUser": , "StorageLimit": , "GraphCountLimit": }`. + ### Graphs (index DO) - `GET /graphs` - List graphs the user owns. Response: `{"graphs":[{graph-id, graph-name, schema-version?, graph-ready-for-use?, created-at, updated-at}...]}`. @@ -110,6 +115,11 @@ - `DELETE /sync/:graph-id/admin/reset` - Drop/recreate per-graph tables. Response: `{"ok":true}`. +### Open Collective webhook +- `POST /hooks/open-collective?token=` + - Trigger immediate Open Collective reconciliation for member events. + - Returns: `{"ok":true}`. + ### Assets - `GET /assets/:graph-id/:asset-uuid.:ext` - Download asset (binary response, `content-type` set, `x-asset-type` header included).