add users and graph_members tables

This commit is contained in:
Tienson Qin
2026-01-21 06:54:50 +08:00
parent f0efeb31bc
commit 4fb4e87066
5 changed files with 500 additions and 87 deletions

View File

@@ -59,17 +59,17 @@
(defn <d1-run
[^js db sql-str & args]
(p/let [^js stmt (.prepare db sql-str)
stmt (if (seq args)
(.apply (.-bind stmt) stmt (to-array args))
stmt)]
^js stmt (if (seq args)
(.apply (.-bind stmt) stmt (to-array args))
stmt)]
(.run stmt)))
(defn <d1-all
[^js db sql-str & args]
(p/let [^js stmt (.prepare db sql-str)
stmt (if (seq args)
(.apply (.-bind stmt) stmt (to-array args))
stmt)]
^js stmt (if (seq args)
(.apply (.-bind stmt) stmt (to-array args))
stmt)]
(.all stmt)))
(defn read-json [request]

View File

@@ -0,0 +1,164 @@
(ns logseq.db-sync.index
(:require [logseq.db-sync.common :as common]
[promesa.core :as p]))
(defn <index-init! [db]
(p/do!
(common/<d1-run db
(str "create table if not exists graphs ("
"graph_id TEXT primary key,"
"graph_name TEXT,"
"user_id TEXT,"
"schema_version TEXT,"
"created_at INTEGER,"
"updated_at INTEGER"
");"))
(common/<d1-run db
(str "create table if not exists users ("
"id TEXT primary key,"
"email TEXT,"
"email_verified INTEGER,"
"username TEXT"
");"))
(common/<d1-run db
(str "create table if not exists graph_members ("
"user_id TEXT,"
"graph_id TEXT,"
"role TEXT,"
"invited_by TEXT,"
"created_at INTEGER,"
"primary key (user_id, graph_id),"
"check (role in ('manager', 'member'))"
");"))))
(defn <index-list [db user-id]
(if (string? user-id)
(p/let [result (common/<d1-all db
(str "select g.graph_id, g.graph_name, g.schema_version, g.created_at, g.updated_at "
"from graphs g "
"left join graph_members m on g.graph_id = m.graph_id and m.user_id = ? "
"where g.user_id = ? or m.user_id = ? "
"order by g.updated_at desc")
user-id
user-id
user-id)
rows (common/get-sql-rows result)]
(mapv (fn [row]
{:graph_id (aget row "graph_id")
:graph_name (aget row "graph_name")
:schema_version (aget row "schema_version")
:created_at (aget row "created_at")
:updated_at (aget row "updated_at")})
rows))
[]))
(defn <index-upsert! [db graph-id graph-name user-id schema-version]
(p/let [now (common/now-ms)
result (common/<d1-run db
(str "insert into graphs (graph_id, graph_name, user_id, schema_version, created_at, updated_at) "
"values (?, ?, ?, ?, ?, ?) "
"on conflict(graph_id) do update set "
"graph_name = excluded.graph_name, "
"user_id = excluded.user_id, "
"schema_version = excluded.schema_version, "
"updated_at = excluded.updated_at")
graph-id
graph-name
user-id
schema-version
now
now)]
result))
(defn <index-delete! [db graph-id]
(p/do!
(common/<d1-run db "delete from graph_members where graph_id = ?" graph-id)
(common/<d1-run db "delete from graphs where graph_id = ?" graph-id)))
(defn <user-upsert! [db claims]
(let [user-id (aget claims "sub")]
(when (string? user-id)
(let [email (aget claims "email")
email-verified (aget claims "email_verified")
username (aget claims "cognito:username")
email-verified (cond
(true? email-verified) 1
(false? email-verified) 0
:else nil)]
(common/<d1-run db
(str "insert into users (id, email, email_verified, username) "
"values (?, ?, ?, ?) "
"on conflict(id) do update set "
"email = excluded.email, "
"email_verified = excluded.email_verified, "
"username = excluded.username")
user-id
email
email-verified
username)))))
(defn <graph-member-upsert! [db graph-id user-id role invited-by]
(let [now (common/now-ms)]
(common/<d1-run db
(str "insert into graph_members (user_id, graph_id, role, invited_by, created_at) "
"values (?, ?, ?, ?, ?) "
"on conflict(user_id, graph_id) do update set "
"role = excluded.role, "
"invited_by = excluded.invited_by")
user-id
graph-id
role
invited-by
now)))
(defn <graph-members-list [db graph-id]
(p/let [result (common/<d1-all db
(str "select user_id, graph_id, role, invited_by, created_at "
"from graph_members where graph_id = ? order by created_at asc")
graph-id)
rows (common/get-sql-rows result)]
(mapv (fn [row]
{:user_id (aget row "user_id")
:graph_id (aget row "graph_id")
:role (aget row "role")
:invited_by (aget row "invited_by")
:created_at (aget row "created_at")})
rows)))
(defn <graph-member-update-role! [db graph-id user-id role]
(common/<d1-run db
(str "update graph_members set role = ? "
"where graph_id = ? and user_id = ?")
role
graph-id
user-id))
(defn <graph-member-delete! [db graph-id user-id]
(common/<d1-run db
"delete from graph_members where graph_id = ? and user_id = ?"
graph-id
user-id))
(defn <user-has-access-to-graph? [db graph-id user-id]
(when (and (string? graph-id) (string? user-id))
(p/let [result (common/<d1-all db
(str "select graph_id from graphs where graph_id = ? and user_id = ? "
"union select graph_id from graph_members where graph_id = ? and user_id = ?")
graph-id
user-id
graph-id
user-id)
rows (common/get-sql-rows result)]
(boolean (seq rows)))))
(defn <user-is-manager? [db graph-id user-id]
(when (and (string? graph-id) (string? user-id))
(p/let [result (common/<d1-all db
(str "select graph_id from graphs where graph_id = ? and user_id = ? "
"union select graph_id from graph_members where graph_id = ? and user_id = ? and role = 'manager'")
graph-id
user-id
graph-id
user-id)
rows (common/get-sql-rows result)]
(boolean (seq rows)))))

View File

@@ -84,6 +84,21 @@
[:created_at :int]
[:updated_at :int]])
(def graph-member-role-schema
[:enum "manager" "member"])
(def graph-member-info-schema
[:map
[:user_id :string]
[:graph_id :string]
[:role graph-member-role-schema]
[:invited_by {:optional true} [:maybe :string]]
[:created_at :int]])
(def graph-members-list-response-schema
[:map
[:members [:sequential graph-member-info-schema]]])
(def graphs-list-response-schema
[:map
[:graphs [:sequential graph-info-schema]]])
@@ -104,6 +119,15 @@
[:graph_id :string]
[:deleted :boolean]])
(def graph-member-create-request-schema
[:map
[:user_id :string]
[:role {:optional true} graph-member-role-schema]])
(def graph-member-update-request-schema
[:map
[:role graph-member-role-schema]])
(def tx-batch-request-schema
[:map
[:t_before :int]
@@ -140,6 +164,8 @@
(def http-request-schemas
{:graphs/create graph-create-request-schema
:graph-members/create graph-member-create-request-schema
:graph-members/update graph-member-update-request-schema
:sync/tx-batch tx-batch-request-schema
:sync/snapshot-import snapshot-import-request-schema})
@@ -148,6 +174,10 @@
:graphs/create graph-create-response-schema
:graphs/access graph-access-response-schema
:graphs/delete graph-delete-response-schema
:graph-members/list graph-members-list-response-schema
:graph-members/create http-ok-response-schema
:graph-members/update http-ok-response-schema
:graph-members/delete http-ok-response-schema
:worker/health http-ok-response-schema
:sync/health http-ok-response-schema
:sync/pull pull-ok-schema

View File

@@ -7,6 +7,7 @@
[logseq.common.authorization :as authorization]
[logseq.db :as ldb]
[logseq.db-sync.common :as common :refer [cors-headers]]
[logseq.db-sync.index :as index]
[logseq.db-sync.malli-schema :as db-sync-schema]
[logseq.db-sync.protocol :as protocol]
[logseq.db-sync.storage :as storage]
@@ -146,18 +147,6 @@
(when-not (js/isNaN n)
n))))
(defn- entity-title
[db entity-ref]
(let [ent (cond
(vector? entity-ref) (d/entity db entity-ref)
(number? entity-ref) (d/entity db entity-ref)
(keyword? entity-ref) (d/entity db [:db/ident entity-ref])
:else nil)]
(when ent
{:uuid (some-> (:block/uuid ent) str)
:title (or (:block/title ent)
(:block/name ent))})))
(def ^:private max-asset-size (* 100 1024 1024))
(def ^:private snapshot-rows-default-limit 500)
(def ^:private snapshot-rows-max-limit 2000)
@@ -240,7 +229,7 @@
(or (= path "/graphs")
(string/starts-with? path "/graphs/"))
(.fetch (index-stub env) request)
(.fetch (index-stub env) (.clone request))
(string/starts-with? path "/assets/")
(if (= method "OPTIONS")
@@ -530,65 +519,6 @@
(log/error :db-sync/index-db-missing {:binding "DB"}))
db))
(defn- <index-init! [db]
(p/do!
(common/<d1-run db
(str "create table if not exists graphs ("
"graph_id TEXT primary key,"
"graph_name TEXT,"
"user_id TEXT,"
"schema_version TEXT,"
"created_at INTEGER,"
"updated_at INTEGER"
");"))))
(defn- <index-list [db user-id]
(if (string? user-id)
(p/let [result (common/<d1-all db
(str "select graph_id, graph_name, schema_version, created_at, updated_at "
"from graphs where user_id = ? order by updated_at desc")
user-id)
rows (common/get-sql-rows result)]
(mapv (fn [row]
{:graph_id (aget row "graph_id")
:graph_name (aget row "graph_name")
:schema_version (aget row "schema_version")
:created_at (aget row "created_at")
:updated_at (aget row "updated_at")})
rows))
[]))
(defn- <index-upsert! [db graph-id graph-name user-id schema-version]
(p/let [now (common/now-ms)
result (common/<d1-run db
(str "insert into graphs (graph_id, graph_name, user_id, schema_version, created_at, updated_at) "
"values (?, ?, ?, ?, ?, ?) "
"on conflict(graph_id) do update set "
"graph_name = excluded.graph_name, "
"user_id = excluded.user_id, "
"schema_version = excluded.schema_version, "
"updated_at = excluded.updated_at")
graph-id
graph-name
user-id
schema-version
now
now)]
result))
(defn- <index-delete! [db graph-id]
(common/<d1-run db "delete from graphs where graph_id = ?" graph-id))
(defn- <user-has-access-to-graph? [db graph-id user-id]
(when (and (string? graph-id) (string? user-id))
(p/let [result (common/<d1-all db
(str "select graph_id from graphs "
"where graph_id = ? and user_id = ?")
graph-id
user-id)
rows (common/get-sql-rows result)]
(boolean (seq rows)))))
(defn- graph-path-parts [path]
(->> (string/split path #"/")
(remove string/blank?)
@@ -610,8 +540,10 @@
(error-response "server error" 500)
:else
(p/let [_ (<index-init! db)
claims (auth-claims request env)]
(p/let [_ (index/<index-init! db)
claims (auth-claims request env)
_ (when claims
(index/<user-upsert! db claims))]
(cond
(nil? claims)
(unauthorized)
@@ -619,7 +551,7 @@
(and (= method "GET") (= ["graphs"] parts))
(let [user-id (aget claims "sub")]
(if (string? user-id)
(p/let [graphs (<index-list db user-id)]
(p/let [graphs (index/<index-list db user-id)]
(json-response :graphs/list {:graphs graphs}))
(unauthorized)))
@@ -641,7 +573,8 @@
:else
(p/let [{:keys [graph_name schema_version]} body
_ (<index-upsert! db graph-id graph_name user-id schema_version)]
_ (index/<index-upsert! db graph-id graph_name user-id schema_version)
_ (index/<graph-member-upsert! db graph-id user-id "manager" user-id)]
(json-response :graphs/create {:graph_id graph-id})))))))
(and (= method "GET")
@@ -655,15 +588,119 @@
(unauthorized)
:else
(p/let [owns? (<user-has-access-to-graph? db graph-id user-id)]
(p/let [owns? (index/<user-has-access-to-graph? db graph-id user-id)]
(if owns?
(json-response :graphs/access {:ok true})
(forbidden)))))
(and (= method "GET")
(= 3 (count parts))
(= "graphs" (first parts))
(= "members" (nth parts 2 nil)))
(let [graph-id (nth parts 1 nil)
user-id (aget claims "sub")]
(cond
(not (string? user-id))
(unauthorized)
:else
(p/let [can-access? (index/<user-has-access-to-graph? db graph-id user-id)]
(if (not can-access?)
(forbidden)
(p/let [members (index/<graph-members-list db graph-id)]
(json-response :graph-members/list {:members members}))))))
(and (= method "POST")
(= 3 (count parts))
(= "graphs" (first parts))
(= "members" (nth parts 2 nil)))
(let [graph-id (nth parts 1 nil)
user-id (aget claims "sub")]
(cond
(not (string? user-id))
(unauthorized)
:else
(.then (common/read-json request)
(fn [result]
(if (nil? result)
(bad-request "missing body")
(let [body (js->clj result :keywordize-keys true)
body (coerce-http-request :graph-members/create body)
member-id (:user_id body)
role (or (:role body) "member")]
(cond
(nil? body)
(bad-request "invalid body")
(not (string? member-id))
(bad-request "invalid user id")
:else
(p/let [manager? (index/<user-is-manager? db graph-id user-id)]
(if (not manager?)
(forbidden)
(p/let [_ (index/<graph-member-upsert! db graph-id member-id role user-id)]
(json-response :graph-members/create {:ok true})))))))))))
(and (= method "PUT")
(= 4 (count parts))
(= "graphs" (first parts))
(= "members" (nth parts 2 nil)))
(let [graph-id (nth parts 1 nil)
member-id (nth parts 3 nil)
user-id (aget claims "sub")]
(cond
(not (string? user-id))
(unauthorized)
(not (string? member-id))
(bad-request "invalid user id")
:else
(.then (common/read-json request)
(fn [result]
(if (nil? result)
(bad-request "missing body")
(let [body (js->clj result :keywordize-keys true)
body (coerce-http-request :graph-members/update body)
role (:role body)]
(cond
(nil? body)
(bad-request "invalid body")
:else
(p/let [manager? (index/<user-is-manager? db graph-id user-id)]
(if (not manager?)
(forbidden)
(p/let [_ (index/<graph-member-update-role! db graph-id member-id role)]
(json-response :graph-members/update {:ok true})))))))))))
(and (= method "DELETE")
(= 4 (count parts))
(= "graphs" (first parts))
(= "members" (nth parts 2 nil)))
(let [graph-id (nth parts 1 nil)
member-id (nth parts 3 nil)
user-id (aget claims "sub")]
(cond
(not (string? user-id))
(unauthorized)
(not (string? member-id))
(bad-request "invalid user id")
:else
(p/let [manager? (index/<user-is-manager? db graph-id user-id)]
(if (not manager?)
(forbidden)
(p/let [_ (index/<graph-member-delete! db graph-id member-id)]
(json-response :graph-members/delete {:ok true}))))))
(and (= method "DELETE")
(= 2 (count parts))
(= "graphs" (first parts)))
(let [graph-id (nth parts 1)
(let [graph-id (nth parts 1 nil)
user-id (aget claims "sub")]
(cond
(not (seq graph-id))
@@ -673,17 +710,16 @@
(unauthorized)
:else
(p/let [owns? (<user-has-access-to-graph? db graph-id user-id)]
(p/let [owns? (index/<user-has-access-to-graph? db graph-id user-id)]
(if (not owns?)
(forbidden)
(p/let [_ (<index-delete! db graph-id)]
(p/let [_ (index/<index-delete! db graph-id)]
(let [^js namespace (.-LOGSEQ_SYNC_DO (.-env self))
do-id (.idFromName namespace graph-id)
stub (.get namespace do-id)
reset-url (str (.-origin url) "/admin/reset")]
(.fetch stub (js/Request. reset-url #js {:method "DELETE"})))
(json-response :graphs/delete {:graph_id graph-id :deleted true}))))))
:else
(not-found))))
(catch :default error

View File

@@ -0,0 +1,183 @@
(ns logseq.db-sync.index-test
(:require [cljs.test :refer [deftest is async]]
[clojure.string :as string]
[logseq.db-sync.index :as index]
[promesa.core :as p]))
(defn- js-row [m]
(let [o (js-obj)]
(doseq [[k v] m]
(aset o (name k) v))
o))
(defn- js-rows [rows]
(into-array (map js-row rows)))
(defn- record-exec! [state sql]
(swap! state update :executed conj sql))
(defn- run-sql! [state sql args]
(record-exec! state sql)
(cond
(string/includes? sql "insert into users")
(let [[user-id email email-verified username] args]
(swap! state update :users assoc user-id {:id user-id
:email email
:email_verified email-verified
:username username}))
(string/includes? sql "insert into graph_members")
(let [[user-id graph-id role invited-by created-at] args]
(swap! state update :graph-members
(fn [members]
(let [k [user-id graph-id]
existing (get members k)
created-at (or (:created_at existing) created-at)]
(assoc members k {:user_id user-id
:graph_id graph-id
:role role
:invited_by invited-by
:created_at created-at})))))
(string/includes? sql "insert into graphs")
(let [[graph-id graph-name user-id schema-version created-at updated-at] args]
(swap! state update :graphs assoc graph-id {:graph_id graph-id
:graph_name graph-name
:user_id user-id
:schema_version schema-version
:created_at created-at
:updated_at updated-at}))
(string/includes? sql "update graph_members set role")
(let [[role graph-id user-id] args]
(swap! state update :graph-members assoc-in [[user-id graph-id] :role] role))
(string/includes? sql "delete from graph_members")
(let [[graph-id user-id] args]
(swap! state update :graph-members dissoc [user-id graph-id]))
(string/includes? sql "delete from graphs")
(let [[graph-id] args]
(swap! state update :graphs dissoc graph-id))
:else
nil))
(defn- union-access-rows [state sql args]
(let [[graph-id user-id] args
graph-owner-id (get-in @state [:graphs graph-id :user_id])
member (get-in @state [:graph-members [user-id graph-id]])
manager-required? (string/includes? sql "role = 'manager'")
has-access? (or (= graph-owner-id user-id)
(and member
(or (not manager-required?)
(= "manager" (:role member)))))]
(if has-access?
(js-rows [{:graph_id graph-id}])
(js-rows []))))
(defn- all-sql [state sql args]
(record-exec! state sql)
(cond
(string/includes? sql "from graph_members where graph_id")
(let [graph-id (first args)
members (->> (:graph-members @state)
vals
(filter (fn [row] (= graph-id (:graph_id row))))
(sort-by :created_at))]
(js-rows members))
(string/includes? sql "union select graph_id from graph_members")
(union-access-rows state sql args)
(string/includes? sql "select g.graph_id")
(let [[user-id] args
owned (->> (:graphs @state)
vals
(filter (fn [row] (= user-id (:user_id row)))))
member-ids (->> (:graph-members @state)
vals
(filter (fn [row] (= user-id (:user_id row))))
(map :graph_id)
set)
member-graphs (->> (:graphs @state)
vals
(filter (fn [row] (contains? member-ids (:graph_id row)))))]
(js-rows (concat owned member-graphs)))
:else
(js-rows [])))
(defn- make-d1 [state]
#js {:prepare (fn [sql]
(let [stmt #js {}]
(set! (.-_sql stmt) sql)
(set! (.-_args stmt) [])
(set! (.-bind stmt)
(fn [& args]
(set! (.-_args stmt) (vec args))
stmt))
(set! (.-run stmt)
(fn []
(run-sql! state (.-_sql stmt) (.-_args stmt))
#js {}))
(set! (.-all stmt)
(fn []
(all-sql state (.-_sql stmt) (.-_args stmt))))
stmt))})
(deftest index-init-schema-test
(async done
(let [state (atom {:executed []
:users {}
:graph-members {}
:graphs {}})
db (make-d1 state)]
(-> (index/<index-init! db)
(p/then (fn [_]
(let [sqls (:executed @state)]
(is (some #(string/includes? % "create table if not exists users") sqls))
(is (some #(string/includes? % "create table if not exists graph_members") sqls)))
(done)))
(p/catch (fn [e]
(is false (str e))
(done)))))))
(deftest user-upsert-test
(async done
(let [state (atom {:executed []
:users {}
:graph-members {}
:graphs {}})
db (make-d1 state)
claims #js {"sub" "user-1"
"email" "foo@test.com"
"email_verified" true
"cognito:username" "foo"}]
(-> (index/<user-upsert! db claims)
(p/then (fn [_]
(let [user (get-in @state [:users "user-1"])]
(is (= "foo@test.com" (:email user)))
(is (= 1 (:email_verified user)))
(is (= "foo" (:username user))))
(done)))
(p/catch (fn [e]
(is false (str e))
(done)))))))
(deftest graph-member-upsert-test
(async done
(let [state (atom {:executed []
:users {}
:graph-members {}
:graphs {}})
db (make-d1 state)]
(-> (index/<graph-member-upsert! db "graph-1" "user-2" "member" "user-1")
(p/then (fn [_]
(let [member (get-in @state [:graph-members ["user-2" "graph-1"]])]
(is (= "member" (:role member)))
(is (= "user-1" (:invited_by member))))
(done)))
(p/catch (fn [e]
(is false (str e))
(done)))))))