Files
logseq/cli-e2e/scripts/db_sync_server.py
rcmerci f751f27a63 refactor(cli): remove --data-dir, add --root-dir
--data-dir: ~/logseq/graphs
--root-dir: ~/logseq

All other configuration files and data directories are derived from the root-dir
2026-04-24 22:35:00 +08:00

291 lines
10 KiB
Python

#!/usr/bin/env python3
"""Manage local db-sync server process for cli-e2e sync suite."""
from __future__ import annotations
import argparse
import base64
import ctypes
import json
import os
import signal
import socket
import subprocess
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any, Dict, Optional
DEFAULT_COGNITO_ISSUER = "https://cognito-idp.us-east-2.amazonaws.com/us-east-2_kAqZcxIeM"
DEFAULT_COGNITO_CLIENT_ID = "1qi1uijg8b6ra70nejvbptis0q"
def fail(message: str, **context: object) -> None:
payload = {"status": "error", "message": message}
if context:
payload["context"] = context
print(json.dumps(payload), file=sys.stderr)
raise SystemExit(1)
def load_pid(pid_file: Path) -> Optional[int]:
if not pid_file.exists():
return None
raw = pid_file.read_text(encoding="utf-8").strip()
if not raw:
return None
try:
return int(raw)
except ValueError:
return None
def process_running(pid: int) -> bool:
if pid <= 0:
return False
if os.name == "nt":
kernel32 = ctypes.windll.kernel32
access = 0x00100000 | 0x1000 # SYNCHRONIZE | PROCESS_QUERY_LIMITED_INFORMATION
handle = kernel32.OpenProcess(access, False, pid)
if handle:
kernel32.CloseHandle(handle)
return True
return ctypes.get_last_error() == 5
try:
os.kill(pid, 0)
except ProcessLookupError:
return False
except PermissionError:
return True
else:
return True
def terminate_process(pid: int, force: bool) -> bool:
if pid <= 0:
return True
if os.name == "nt":
cmd = ["taskkill", "/PID", str(pid), "/T"]
if force:
cmd.append("/F")
result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)
return result.returncode == 0 or not process_running(pid)
try:
os.kill(pid, signal.SIGKILL if force else signal.SIGTERM)
except ProcessLookupError:
return True
except PermissionError:
return False
return True
def parse_json_file(path: Path) -> Dict[str, Any]:
if not path.exists():
fail("sync auth file is missing", auth_path=str(path), hint="Run `logseq login` first.")
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except json.JSONDecodeError as error:
fail("sync auth file is invalid JSON", auth_path=str(path), detail=str(error))
if not isinstance(payload, dict):
fail("sync auth file must be a JSON object", auth_path=str(path))
return payload
def decode_jwt_claims(token: str) -> Optional[Dict[str, Any]]:
if not isinstance(token, str):
return None
parts = token.split(".")
if len(parts) != 3:
return None
payload = parts[1]
padded = payload + "=" * ((4 - (len(payload) % 4)) % 4)
try:
decoded = base64.urlsafe_b64decode(padded.encode("utf-8")).decode("utf-8")
claims = json.loads(decoded)
except (ValueError, UnicodeDecodeError, json.JSONDecodeError):
return None
return claims if isinstance(claims, dict) else None
def auth_cognito_from_auth_file(auth_path: Path) -> Dict[str, str]:
payload = parse_json_file(auth_path)
token = payload.get("id-token") or payload.get("access-token")
claims = decode_jwt_claims(token) if isinstance(token, str) else None
issuer = claims.get("iss") if isinstance(claims, dict) else None
client_id = None
if isinstance(claims, dict):
client_id = claims.get("aud") or claims.get("client_id")
return {
"issuer": issuer if isinstance(issuer, str) and issuer else "",
"client_id": client_id if isinstance(client_id, str) and client_id else "",
}
def wait_health(base_url: str, timeout_s: float, interval_s: float) -> bool:
deadline = time.time() + timeout_s
url = base_url.rstrip("/") + "/health"
opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
while time.time() < deadline:
try:
with opener.open(url, timeout=2) as response:
if response.status == 200:
return True
except (urllib.error.URLError, TimeoutError, socket.timeout):
pass
time.sleep(interval_s)
return False
def start_server(args: argparse.Namespace) -> None:
repo_root = Path(args.repo_root).expanduser().resolve()
entry = repo_root / "deps" / "db-sync" / "worker" / "dist" / "node-adapter.js"
if not entry.exists():
fail("db-sync node adapter build artifact is missing", entry=str(entry), hint="Run: yarn --cwd deps/db-sync build:node-adapter")
pid_file = Path(args.pid_file).expanduser().resolve()
log_file = Path(args.log_file).expanduser().resolve()
data_dir = Path(args.data_dir).expanduser().resolve()
pid_file.parent.mkdir(parents=True, exist_ok=True)
log_file.parent.mkdir(parents=True, exist_ok=True)
data_dir.mkdir(parents=True, exist_ok=True)
existing_pid = load_pid(pid_file)
if existing_pid and process_running(existing_pid):
fail("db-sync server already running", pid=existing_pid, pid_file=str(pid_file))
auth_path = Path(args.auth_path).expanduser().resolve() if args.auth_path else None
auth_derived = auth_cognito_from_auth_file(auth_path) if auth_path else {"issuer": "", "client_id": ""}
issuer = args.cognito_issuer or auth_derived.get("issuer") or DEFAULT_COGNITO_ISSUER
client_id = args.cognito_client_id or auth_derived.get("client_id") or DEFAULT_COGNITO_CLIENT_ID
jwks_url = args.cognito_jwks_url or f"{issuer}/.well-known/jwks.json"
env = os.environ.copy()
env.update(
{
"DB_SYNC_PORT": str(args.port),
"DB_SYNC_DATA_DIR": str(data_dir),
"COGNITO_ISSUER": issuer,
"COGNITO_CLIENT_ID": client_id,
"COGNITO_JWKS_URL": jwks_url,
# CLI e2e sync suite should remain runnable without outbound internet.
"DB_SYNC_ALLOW_UNVERIFIED_JWT_CLAIMS": "true",
}
)
with log_file.open("a", encoding="utf-8") as stream:
stream.write(f"\n=== cli-e2e db-sync start {time.strftime('%Y-%m-%d %H:%M:%S')} ===\n")
stream.flush()
process = subprocess.Popen(
["node", str(entry)],
stdout=stream,
stderr=stream,
cwd=str(repo_root),
env=env,
start_new_session=True,
)
base_url = f"http://{args.host}:{args.port}"
if not wait_health(base_url, args.startup_timeout_s, args.poll_interval_s):
terminate_process(process.pid, force=False)
fail(
"db-sync server failed health check before timeout",
base_url=base_url,
pid=process.pid,
log_file=str(log_file),
)
pid_file.write_text(f"{process.pid}\n", encoding="utf-8")
print(
json.dumps(
{
"status": "ok",
"pid": process.pid,
"pid_file": str(pid_file),
"log_file": str(log_file),
"base_url": base_url,
"data_dir": str(data_dir),
"cognito_issuer": issuer,
"cognito_client_id": client_id,
"auth_path": str(auth_path) if auth_path else None,
}
)
)
def stop_server(args: argparse.Namespace) -> None:
pid_file = Path(args.pid_file).expanduser().resolve()
pid = load_pid(pid_file)
if pid is None:
print(json.dumps({"status": "ok", "stopped": False, "reason": "pid-file-missing-or-empty", "pid_file": str(pid_file)}))
return
if not process_running(pid):
try:
pid_file.unlink(missing_ok=True)
except OSError:
pass
print(json.dumps({"status": "ok", "stopped": False, "reason": "process-not-running", "pid": pid, "pid_file": str(pid_file)}))
return
if not terminate_process(pid, force=False):
fail("db-sync server stop failed", pid=pid, signal="SIGTERM", pid_file=str(pid_file))
deadline = time.time() + args.shutdown_timeout_s
while time.time() < deadline:
if not process_running(pid):
pid_file.unlink(missing_ok=True)
print(json.dumps({"status": "ok", "stopped": True, "signal": "SIGTERM", "pid": pid, "pid_file": str(pid_file)}))
return
time.sleep(args.poll_interval_s)
if not terminate_process(pid, force=True):
fail("db-sync server force stop failed", pid=pid, signal="SIGKILL", pid_file=str(pid_file))
pid_file.unlink(missing_ok=True)
print(json.dumps({"status": "ok", "stopped": True, "signal": "SIGKILL", "pid": pid, "pid_file": str(pid_file)}))
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Manage db-sync local server for cli-e2e sync tests")
subparsers = parser.add_subparsers(dest="command", required=True)
start = subparsers.add_parser("start", help="Start server and wait for /health")
start.add_argument("--repo-root", required=True)
start.add_argument("--pid-file", required=True)
start.add_argument("--log-file", required=True)
start.add_argument("--data-dir", required=True)
start.add_argument("--host", default="127.0.0.1")
start.add_argument("--port", type=int, default=8080)
start.add_argument("--startup-timeout-s", type=float, default=25.0)
start.add_argument("--poll-interval-s", type=float, default=0.5)
start.add_argument("--auth-path", default="~/logseq/auth.json")
start.add_argument("--cognito-issuer")
start.add_argument("--cognito-client-id")
start.add_argument("--cognito-jwks-url")
stop = subparsers.add_parser("stop", help="Stop server if running")
stop.add_argument("--pid-file", required=True)
stop.add_argument("--shutdown-timeout-s", type=float, default=10.0)
stop.add_argument("--poll-interval-s", type=float, default=0.25)
return parser
def main() -> None:
args = build_parser().parse_args()
if args.command == "start":
start_server(args)
elif args.command == "stop":
stop_server(args)
else:
fail("unknown command", command=args.command)
if __name__ == "__main__":
main()