add server usage stats script (#12557)

* enhance(db-sync): add usage stats script

* fix(db-sync): use --local for local D1 scripts

* fix(db-sync): record activity only on successful sync requests

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Tienson Qin
2026-05-04 07:59:15 +08:00
committed by GitHub
parent 9bdfa5655d
commit b8ebdd575c
9 changed files with 421 additions and 28 deletions

View File

@@ -46,6 +46,19 @@ pnpm show-graphs-for-user --user-id us-east-1:example-user-id
The script uses `worker/wrangler.toml`, runs against the remote D1 binding `DB`,
defaults to `--env prod`, and prints JSON when `--json` is added.
Show usage totals (total users, total graphs, users created today, graphs created today):
```bash
cd deps/db-sync
pnpm show-usage-stats
pnpm show-usage-stats --days 7
pnpm show-usage-stats --json
```
`created today` uses UTC day boundaries from D1 (`date('now')`).
`active_*_last_n_days` uses deduplicated UTC-day activity rows from
`daily_active_entities` with the provided `--days` window.
Download a graph snapshot into a local sqlite debug file matching local graph DB schema (`kvs` table only):
```bash

View File

@@ -11,6 +11,7 @@
"delete-user-totally": "node worker/scripts/delete_user_totally.js",
"download-graph-db": "node worker/scripts/download_graph_db.js",
"show-graphs-for-user": "node worker/scripts/show_graphs_for_user.js",
"show-usage-stats": "node worker/scripts/show_usage_stats.js",
"build:node-adapter": "clojure -M:cljs release db-sync-node",
"dev:node-adapter": "clojure -M:cljs watch db-sync-node",
"start:node-adapter": "node worker/dist/node-adapter.js",

View File

@@ -6,6 +6,8 @@
(def ^:private user-upsert-cache-ttl-ms (* 60 60 1000))
(def ^:private user-upsert-cache-max 1024)
(defonce ^:private *user-upsert-cache (atom {}))
(def ^:private activity-touch-cache-max 8192)
(defonce ^:private *activity-touch-cache (atom {}))
(defn- prune-user-upsert-cache! [now-ms]
(swap! *user-upsert-cache
@@ -56,6 +58,19 @@
"alter table graphs add column graph_e2ee INTEGER DEFAULT 1")
(def ^:private graph-ready-for-use-migration-sql
"alter table graphs add column graph_ready_for_use integer default 1")
(def ^:private user-created-at-migration-sql
"alter table users add column created_at integer")
(def ^:private daily-active-entities-create-table-sql
(str "create table if not exists daily_active_entities ("
"day_utc TEXT,"
"entity_type TEXT,"
"entity_id TEXT,"
"first_seen_at INTEGER,"
"primary key (day_utc, entity_type, entity_id),"
"check (entity_type in ('user', 'graph'))"
");"))
(def ^:private daily-active-entities-create-index-sql
"create index if not exists idx_daily_active_entities_type_day on daily_active_entities (entity_type, day_utc)")
(defn- duplicate-column-error?
[error column-name]
@@ -96,6 +111,22 @@
(p/catch (fn [_]
(<run-migration!))))))
(defn- <ensure-user-created-at-column!
[db]
(letfn [(<run-migration! []
(-> (common/<d1-run db user-created-at-migration-sql)
(p/catch (fn [error]
(if (duplicate-column-error? error "created_at")
nil
(p/rejected error))))))]
(-> (p/let [result (common/<d1-all db
"select name from pragma_table_info('users') where name = 'created_at'")
rows (common/get-sql-rows result)]
(when (empty? rows)
(<run-migration!)))
(p/catch (fn [_]
(<run-migration!))))))
(defn <index-init! [db]
(p/do!
(common/<d1-run db
@@ -116,8 +147,10 @@
"id TEXT primary key,"
"email TEXT,"
"email_verified INTEGER,"
"username TEXT"
"username TEXT,"
"created_at INTEGER"
");"))
(<ensure-user-created-at-column! db)
(common/<d1-run db
(str "create table if not exists user_rsa_keys ("
"user_id TEXT primary key,"
@@ -145,12 +178,69 @@
"updated_at INTEGER,"
"primary key (graph_id, user_id)"
");"))
(common/<d1-run db daily-active-entities-create-table-sql)
(common/<d1-run db
"create index if not exists idx_graph_members_graph_id_created_at on graph_members (graph_id, created_at)")
(common/<d1-run db
"create index if not exists idx_graphs_user_id_updated_at on graphs (user_id, updated_at desc)")
(common/<d1-run db
"create index if not exists idx_users_email on users (email)")))
"create index if not exists idx_users_email on users (email)")
(common/<d1-run db daily-active-entities-create-index-sql)))
(defn- utc-day-str
[timestamp-ms]
(-> (.toISOString (js/Date. timestamp-ms))
(subs 0 10)))
(defn- prune-activity-touch-cache!
[cache]
(if (<= (count cache) activity-touch-cache-max)
cache
(let [drop-count (- (count cache) activity-touch-cache-max)]
(->> cache
(sort-by val)
(drop drop-count)
(into {})))))
(defn- cache-activity-touch!
[cache-key now-ms]
(swap! *activity-touch-cache
(fn [cache]
(-> cache
(assoc cache-key now-ms)
(prune-activity-touch-cache!)))))
(defn- activity-touch-cached?
[cache-key]
(contains? @*activity-touch-cache cache-key))
(defn- <activity-touch!
[db entity-type entity-id]
(if (and (string? entity-id) (contains? #{"user" "graph"} entity-type))
(let [now (common/now-ms)
day-utc (utc-day-str now)
cache-key [day-utc entity-type entity-id]]
(if (activity-touch-cached? cache-key)
(p/resolved nil)
(p/let [_ (common/<d1-run db
(str "insert into daily_active_entities (day_utc, entity_type, entity_id, first_seen_at) "
"values (?, ?, ?, ?) "
"on conflict(day_utc, entity_type, entity_id) do nothing")
day-utc
entity-type
entity-id
now)]
(cache-activity-touch! cache-key now)
nil)))
(p/resolved nil)))
(defn <user-activity-touch!
[db user-id]
(<activity-touch! db "user" user-id))
(defn <graph-activity-touch!
[db graph-id]
(<activity-touch! db "graph" graph-id))
(defn <index-list [db user-id]
(if (string? user-id)
@@ -261,16 +351,18 @@
(< (- now (:cached-at cached)) user-upsert-cache-ttl-ms))
(cache-user-upsert! user-id email email-verified username now)
(p/let [result (common/<d1-run db
(str "insert into users (id, email, email_verified, username) "
"values (?, ?, ?, ?) "
(str "insert into users (id, email, email_verified, username, created_at) "
"values (?, ?, ?, ?, ?) "
"on conflict(id) do update set "
"email = excluded.email, "
"email_verified = excluded.email_verified, "
"username = excluded.username")
"username = excluded.username, "
"created_at = coalesce(users.created_at, excluded.created_at)")
user-id
email
email-verified
username)]
username
now)]
(cache-user-upsert! user-id email email-verified username now)
result))))))

View File

@@ -1,7 +1,10 @@
(ns logseq.db-sync.worker.dispatch
(:require [clojure.string :as string]
[lambdaisland.glogi :as log]
[logseq.db-sync.common :as common]
[logseq.db-sync.index :as index]
[logseq.db-sync.platform.core :as platform]
[logseq.db-sync.worker.auth :as auth]
[logseq.db-sync.worker.handler.assets :as assets-handler]
[logseq.db-sync.worker.handler.index :as index-handler]
[logseq.db-sync.worker.http :as http]
@@ -27,6 +30,39 @@
(let [rewritten (platform/request (.toString new-url) request)]
(.fetch stub rewritten))))))
(defn- request-user-id
[request]
(let [token (auth/token-from-request request)
claims (when (string? token)
(auth/unsafe-jwt-claims token))
user-id (some-> claims (aget "sub"))]
(when (string? user-id)
user-id)))
(defn- <safe-touch-activity!
[request ^js env graph-id]
(let [db (aget env "DB")
user-id (request-user-id request)]
(if (and db (string? graph-id))
(try
(-> (p/let [_ (index/<graph-activity-touch! db graph-id)
_ (when (string? user-id)
(index/<user-activity-touch! db user-id))]
nil)
(p/catch (fn [error]
(log/warn :db-sync/activity-touch-failed
{:graph-id graph-id
:user-id user-id
:error error})
nil)))
(catch :default error
(log/warn :db-sync/activity-touch-failed
{:graph-id graph-id
:user-id user-id
:error error})
(p/resolved nil)))
(p/resolved nil))))
(defn handle-worker-fetch [request ^js env]
(->
(p/do
@@ -75,14 +111,17 @@
(subs rest-path slash-idx))
new-url (js/URL. (str (.-origin url) tail (.-search url)))]
(if (seq graph-id)
(if (= method "OPTIONS")
(common/options-response)
(if (admin-token-valid? request env)
(forward-sync-request request env graph-id new-url)
(p/let [access-resp (index-handler/graph-access-response request env graph-id)]
(if (.-ok access-resp)
(forward-sync-request request env graph-id new-url)
access-resp))))
(if (= method "OPTIONS")
(common/options-response)
(if (admin-token-valid? request env)
(forward-sync-request request env graph-id new-url)
(p/let [access-resp (index-handler/graph-access-response request env graph-id)]
(if (.-ok access-resp)
(p/let [response (forward-sync-request request env graph-id new-url)
_ (when (< (.-status response) 400)
(<safe-touch-activity! request env graph-id))]
response)
access-resp))))
(http/bad-request "missing graph id")))
:else

View File

@@ -50,6 +50,40 @@
(<delete-graph-storage! env url graph-id)
(index/<graph-delete-index-entry! db graph-id)))
(defn- <safe-user-activity-touch!
[db user-id]
(if (string? user-id)
(try
(-> (index/<user-activity-touch! db user-id)
(p/catch (fn [error]
(log/warn :db-sync/activity-touch-user-failed
{:user-id user-id
:error error})
nil)))
(catch :default error
(log/warn :db-sync/activity-touch-user-failed
{:user-id user-id
:error error})
(p/resolved nil)))
(p/resolved nil)))
(defn- <safe-graph-activity-touch!
[db graph-id]
(if (string? graph-id)
(try
(-> (index/<graph-activity-touch! db graph-id)
(p/catch (fn [error]
(log/warn :db-sync/activity-touch-graph-failed
{:graph-id graph-id
:error error})
nil)))
(catch :default error
(log/warn :db-sync/activity-touch-graph-failed
{:graph-id graph-id
:error error})
(p/resolved nil)))
(p/resolved nil)))
(defn ^:large-vars/cleanup-todo handle [{:keys [db ^js env request url claims route]}]
(let [path-params (:path-params route)
graph-id (:graph-id path-params)
@@ -370,12 +404,20 @@
(index/<user-upsert! db claims))]
(if (nil? claims)
(http/unauthorized)
(handle {:db db
:env env
:request request
:url url
:claims claims
:route route}))))))
(p/let [user-id (aget claims "sub")
_ (<safe-user-activity-touch! db user-id)
response (handle {:db db
:env env
:request request
:url url
:claims claims
:route route})
graph-id (some-> route :path-params :graph-id)
_ (when (and (string? user-id)
(string? graph-id)
(< (.-status response) 400))
(<safe-graph-activity-touch! db graph-id))]
response))))))
(catch :default error
(js/console.error "DEBUG handle-fetch error:" error)
(log/error :db-sync/index-error error)

View File

@@ -0,0 +1,2 @@
alter table users
add column created_at integer;

View File

@@ -0,0 +1,11 @@
create table if not exists daily_active_entities (
day_utc text,
entity_type text,
entity_id text,
first_seen_at integer,
primary key (day_utc, entity_type, entity_id),
check (entity_type in ('user', 'graph'))
);
create index if not exists idx_daily_active_entities_type_day
on daily_active_entities (entity_type, day_utc);

View File

@@ -86,8 +86,9 @@ left join users owner on owner.id = g.owner_user_id
order by g.updated_at desc;`;
}
function buildWranglerArgs({ database, config, env, sql }) {
return [
function buildWranglerArgs({ database, config, env, sql, local = false }) {
const useLocal = local || env === "local";
const args = [
"dlx",
"wrangler",
"d1",
@@ -95,13 +96,17 @@ function buildWranglerArgs({ database, config, env, sql }) {
database,
"--config",
config,
"--env",
env,
"--remote",
"--json",
"--command",
sql,
];
if (useLocal) {
args.push("--local");
} else {
args.push("--env", env, "--remote");
}
args.push("--json", "--command", sql);
return args;
}
function runWranglerQuery(args) {

View File

@@ -0,0 +1,188 @@
#!/usr/bin/env node
const path = require("node:path");
const { parseArgs } = require("node:util");
const {
buildWranglerArgs,
defaultConfigPath,
fail,
parseWranglerResults,
runWranglerQuery,
} = require("./graph_user_lib");
function printHelp() {
console.log(`Show db-sync usage stats from a D1 environment.
Usage:
node worker/scripts/show_usage_stats.js [--env prod|local] [--days 7] [--json]
Options:
--env <env> Wrangler environment to use. Defaults to "prod"; use "local" to query the local D1 database.
--days <n> Active window size in days. Defaults to 1.
--database <name> D1 binding or database name. Defaults to "DB".
--config <path> Wrangler config path. Defaults to worker/wrangler.toml.
--json Print JSON instead of a table.
--help Show this message.
Output:
days
active_since_utc
active_users_last_n_days
active_graphs_last_n_days
total_users
total_graphs
users_created_today
graphs_created_today
today_utc
`);
}
function parseDays(value) {
const parsed = Number(value);
if (!Number.isInteger(parsed) || parsed < 1) {
fail("--days must be an integer >= 1.");
}
return parsed;
}
function parseCliArgs(argv) {
const { values } = parseArgs({
args: argv,
options: {
env: { type: "string", default: "prod" },
days: { type: "string", default: "1" },
database: { type: "string", default: "DB" },
config: { type: "string", default: defaultConfigPath },
json: { type: "boolean", default: false },
help: { type: "boolean", default: false },
},
strict: true,
allowPositionals: false,
});
if (values.help) {
printHelp();
process.exit(0);
}
return {
env: values.env,
days: parseDays(values.days),
database: values.database,
config: path.resolve(values.config),
json: values.json,
};
}
function buildUsageStatsSql(days) {
const sinceDays = days - 1;
return `with bounds as (
select
cast(strftime('%s', 'now', 'start of day') as integer) * 1000 as today_start_ms,
cast(strftime('%s', 'now', 'start of day', '+1 day') as integer) * 1000 as tomorrow_start_ms,
date('now') as today_utc,
date('now', '-${sinceDays} days') as active_since_utc
)
select
${days} as days,
(select active_since_utc from bounds) as active_since_utc,
(select count(distinct entity_id)
from daily_active_entities a
join bounds b on 1 = 1
where a.entity_type = 'user'
and a.day_utc >= b.active_since_utc
and a.day_utc <= b.today_utc) as active_users_last_n_days,
(select count(distinct entity_id)
from daily_active_entities a
join bounds b on 1 = 1
where a.entity_type = 'graph'
and a.day_utc >= b.active_since_utc
and a.day_utc <= b.today_utc) as active_graphs_last_n_days,
(select count(1) from users) as total_users,
(select count(1) from graphs) as total_graphs,
(select count(1)
from users u
join bounds b on 1 = 1
where u.created_at is not null
and u.created_at >= b.today_start_ms
and u.created_at < b.tomorrow_start_ms) as users_created_today,
(select count(1)
from graphs g
join bounds b on 1 = 1
where g.created_at is not null
and g.created_at >= b.today_start_ms
and g.created_at < b.tomorrow_start_ms) as graphs_created_today,
(select today_utc from bounds) as today_utc;`;
}
function sqlCountToNumber(value) {
const numeric = Number(value);
return Number.isFinite(numeric) ? numeric : 0;
}
function formatUsageStats(rows) {
if (!rows.length) {
return null;
}
const [row] = rows;
return {
days: sqlCountToNumber(row.days),
active_since_utc: typeof row.active_since_utc === "string" ? row.active_since_utc : null,
active_users_last_n_days: sqlCountToNumber(row.active_users_last_n_days),
active_graphs_last_n_days: sqlCountToNumber(row.active_graphs_last_n_days),
total_users: sqlCountToNumber(row.total_users),
total_graphs: sqlCountToNumber(row.total_graphs),
users_created_today: sqlCountToNumber(row.users_created_today),
graphs_created_today: sqlCountToNumber(row.graphs_created_today),
today_utc: typeof row.today_utc === "string" ? row.today_utc : null,
};
}
function printUsageStatsTable(stats) {
console.table([stats]);
}
function main() {
const options = parseCliArgs(process.argv.slice(2));
const sql = buildUsageStatsSql(options.days);
const wranglerArgs = buildWranglerArgs({
database: options.database,
config: options.config,
env: options.env,
sql,
});
const rows = parseWranglerResults(runWranglerQuery(wranglerArgs));
const stats = formatUsageStats(rows);
if (!stats) {
fail("No stats returned from D1.");
}
if (options.json) {
console.log(JSON.stringify(stats, null, 2));
} else {
printUsageStatsTable(stats);
}
}
if (require.main === module) {
try {
main();
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (/no such table: daily_active_entities/i.test(message)) {
fail("Missing daily_active_entities. Apply worker migration 0006_add_daily_active_entities.sql.");
}
if (/no such column: created_at/i.test(message)) {
fail("Missing users.created_at. Apply worker migration 0005_add_user_created_at.sql.");
}
fail(message);
}
}
module.exports = {
buildUsageStatsSql,
formatUsageStats,
parseCliArgs,
};