fix sync test

This commit is contained in:
rcmerci
2026-04-20 10:56:44 +08:00
parent b9e71b6d7f
commit 1bdba923eb
16 changed files with 295 additions and 45 deletions

1
.gitignore vendored
View File

@@ -88,3 +88,4 @@ deps/db-sync/data
/dist/db-worker-node-assets.json
/dist/*.wasm
/dist/cljs-runtime/
/.agent-shell/

View File

@@ -143,6 +143,8 @@ def start_server(args: argparse.Namespace) -> None:
"COGNITO_ISSUER": issuer,
"COGNITO_CLIENT_ID": client_id,
"COGNITO_JWKS_URL": jwks_url,
# CLI e2e sync suite should remain runnable without outbound internet.
"DB_SYNC_ALLOW_UNVERIFIED_JWT_CLAIMS": "true",
}
)

View File

@@ -39,4 +39,5 @@
[(repo-path "static" "logseq-cli.js")
(repo-path "static" "db-worker-node.js")
(repo-path "dist" "db-worker-node.js")
(repo-path "dist" "db-worker-node-assets.json")])
(repo-path "dist" "db-worker-node-assets.json")
(repo-path "deps" "db-sync" "worker" "dist" "node-adapter.js")])

View File

@@ -5,7 +5,8 @@
(def build-plan
[{:cmd "clojure -M:cljs compile logseq-cli db-worker-node"}
{:cmd "yarn db-worker-node:compile:bundle"}])
{:cmd "yarn db-worker-node:compile:bundle"}
{:cmd "yarn --cwd deps/db-sync build:node-adapter"}])
(defn missing-artifacts
([]

View File

@@ -4,7 +4,8 @@
(deftest build-plan-matches-required-commands
(is (= ["clojure -M:cljs compile logseq-cli db-worker-node"
"yarn db-worker-node:compile:bundle"]
"yarn db-worker-node:compile:bundle"
"yarn --cwd deps/db-sync build:node-adapter"]
(mapv :cmd preflight/build-plan))))
(deftest missing-artifacts-returns-unreadable-paths
@@ -30,13 +31,15 @@
existing (atom #{"/repo/static/logseq-cli.js"
"/repo/static/db-worker-node.js"
"/repo/dist/db-worker-node.js"
"/repo/dist/db-worker-node-assets.json"})]
"/repo/dist/db-worker-node-assets.json"
"/repo/deps/db-sync/worker/dist/node-adapter.js"})]
(with-redefs [logseq.cli.e2e.paths/repo-root (constantly "/repo")
logseq.cli.e2e.paths/required-artifacts (fn []
["/repo/static/logseq-cli.js"
"/repo/static/db-worker-node.js"
"/repo/dist/db-worker-node.js"
"/repo/dist/db-worker-node-assets.json"])]
"/repo/dist/db-worker-node-assets.json"
"/repo/deps/db-sync/worker/dist/node-adapter.js"])]
(let [result (preflight/run! {:run-command (fn [{:keys [cmd]}]
(swap! calls conj cmd)
{:cmd cmd
@@ -46,5 +49,6 @@
:file-exists? @existing})]
(is (= :ok (:status result)))
(is (= ["clojure -M:cljs compile logseq-cli db-worker-node"
"yarn db-worker-node:compile:bundle"]
"yarn db-worker-node:compile:bundle"
"yarn --cwd deps/db-sync build:node-adapter"]
@calls))))))

View File

@@ -8,14 +8,18 @@
:removeWebSocket (fn [ws] (swap! sockets disj ws))}))
(defn- env-object [cfg index-db assets-bucket]
(doto (js-obj)
(aset "DB" index-db)
(aset "LOGSEQ_SYNC_ASSETS" assets-bucket)
;; Keep node-adapter snapshot stream uncompressed.
(aset "DB_SYNC_SNAPSHOT_STREAM_GZIP" "false")
(aset "COGNITO_ISSUER" (:cognito-issuer cfg))
(aset "COGNITO_CLIENT_ID" (:cognito-client-id cfg))
(aset "COGNITO_JWKS_URL" (:cognito-jwks-url cfg))))
(let [allow-unverified-jwt-claims (some-> js/process .-env (aget "DB_SYNC_ALLOW_UNVERIFIED_JWT_CLAIMS"))
env (doto (js-obj)
(aset "DB" index-db)
(aset "LOGSEQ_SYNC_ASSETS" assets-bucket)
;; Keep node-adapter snapshot stream uncompressed.
(aset "DB_SYNC_SNAPSHOT_STREAM_GZIP" "false")
(aset "COGNITO_ISSUER" (:cognito-issuer cfg))
(aset "COGNITO_CLIENT_ID" (:cognito-client-id cfg))
(aset "COGNITO_JWKS_URL" (:cognito-jwks-url cfg)))]
(when (some? allow-unverified-jwt-claims)
(aset env "DB_SYNC_ALLOW_UNVERIFIED_JWT_CLAIMS" allow-unverified-jwt-claims))
env))
(defn graph-context
[{:keys [config index-db assets-bucket]} graph-id]

View File

@@ -24,15 +24,19 @@
(logging/install!)
(defn- make-env [cfg index-db assets-bucket]
(doto (js-obj)
(aset "DB" index-db)
(aset "LOGSEQ_SYNC_ASSETS" assets-bucket)
;; Node adapter serves snapshot transit stream without gzip to avoid
;; browser/adapter content-encoding mismatches during graph download.
(aset "DB_SYNC_SNAPSHOT_STREAM_GZIP" "false")
(aset "COGNITO_ISSUER" (:cognito-issuer cfg))
(aset "COGNITO_CLIENT_ID" (:cognito-client-id cfg))
(aset "COGNITO_JWKS_URL" (:cognito-jwks-url cfg))))
(let [allow-unverified-jwt-claims (some-> js/process .-env (aget "DB_SYNC_ALLOW_UNVERIFIED_JWT_CLAIMS"))
env (doto (js-obj)
(aset "DB" index-db)
(aset "LOGSEQ_SYNC_ASSETS" assets-bucket)
;; Node adapter serves snapshot transit stream without gzip to avoid
;; browser/adapter content-encoding mismatches during graph download.
(aset "DB_SYNC_SNAPSHOT_STREAM_GZIP" "false")
(aset "COGNITO_ISSUER" (:cognito-issuer cfg))
(aset "COGNITO_CLIENT_ID" (:cognito-client-id cfg))
(aset "COGNITO_JWKS_URL" (:cognito-jwks-url cfg)))]
(when (some? allow-unverified-jwt-claims)
(aset env "DB_SYNC_ALLOW_UNVERIFIED_JWT_CLAIMS" allow-unverified-jwt-claims))
env))
(defn- access-allowed?
[env graph-id request]

View File

@@ -14,15 +14,19 @@
(defn- entity-ref->eid
[db entity-ref]
(cond
(and (number? entity-ref) (neg? entity-ref))
nil
(let [entity-ref' (if (and (sequential? entity-ref)
(not (vector? entity-ref)))
(vec entity-ref)
entity-ref)]
(cond
(and (number? entity-ref') (neg? entity-ref'))
nil
:else
(try
(some-> (d/entity db entity-ref) :db/id)
(catch :default _
nil))))
:else
(try
(some-> (d/entity db entity-ref') :db/id)
(catch :default _
nil)))))
(def ^:private entity-op-kinds
#{:db/add :db/retract :db/cas :db.fn/cas})
@@ -30,6 +34,84 @@
(def ^:private encrypted-attrs
#{:block/title :block/name})
(def ^:private optional-missing-lookup-ref-attrs
#{:logseq.property/created-by-ref
:block/refs
:block/tags})
(defn- tx-block-uuids
[tx-data]
(reduce (fn [acc item]
(cond
(and (map? item)
(uuid? (:block/uuid item)))
(conj acc (:block/uuid item))
(and (vector? item)
(<= 4 (count item))
(contains? entity-op-kinds (first item))
(= :block/uuid (nth item 2))
(uuid? (nth item 3)))
(conj acc (nth item 3))
:else
acc))
#{}
tx-data))
(defn- lookup-ref-target-exists?
[db tx-created-block-uuids target]
(cond
(nil? target)
false
;; Tempids may resolve later within the same tx.
(and (number? target) (neg? target))
true
;; Newly introduced block/uuid refs are valid in this tx even when not yet in db.
(and (sequential? target)
(= 2 (count target))
(= :block/uuid (first target))
(uuid? (second target)))
(or (contains? tx-created-block-uuids (second target))
(some? (entity-ref->eid db [:block/uuid (second target)])))
:else
(some? (entity-ref->eid db target))))
(defn- drop-missing-optional-lookup-refs
[db tx-data]
(let [tx-created-block-uuids (tx-block-uuids tx-data)]
(reduce (fn [result item]
(cond
;; Remove stale lookup refs when target is missing.
(and (vector? item)
(<= 4 (count item))
(= :db/add (first item))
(contains? optional-missing-lookup-ref-attrs (nth item 2)))
(if (lookup-ref-target-exists? db tx-created-block-uuids (nth item 3))
(conj result item)
result)
;; Same cleanup for map tx entities.
(map? item)
(let [item' (reduce (fn [m attr]
(if (and (contains? m attr)
(not (lookup-ref-target-exists? db tx-created-block-uuids (get m attr))))
(dissoc m attr)
m))
item
optional-missing-lookup-ref-attrs)]
(if (some (fn [k] (not= :db/id k)) (keys item'))
(conj result item')
result))
:else
(conj result item)))
[]
tx-data)))
(defn- drop-conflicted-encrypted-retracts
"When encrypted tx data is decrypted, old/new ciphertexts can collapse to the
same plaintext value. A valid pair like
@@ -88,6 +170,7 @@
(remove (fn [item]
(and (retract-entity-op? item)
(nil? (entity-ref->eid db (second item)))))))
tx-data* (drop-missing-optional-lookup-refs db tx-data*)
tx-data* (drop-conflicted-encrypted-retracts tx-data*)
tx-data* (vec tx-data*)
retract-eids (->> tx-data*

View File

@@ -34,12 +34,28 @@
(def ^:private recoverable-auth-errors
#{"invalid" "iss not found" "aud not found" "exp" "kid"})
(def ^:private truthy-env-values
#{"1" "true" "yes" "on"})
(defn- recoverable-auth-error?
[error]
(when error
(let [message (or (ex-message error) (some-> error .-message))]
(contains? recoverable-auth-errors message))))
(defn- env-flag-enabled?
[env k]
(let [v (some-> env (aget k))]
(cond
(true? v) true
(false? v) false
(string? v) (contains? truthy-env-values (string/lower-case v))
:else false)))
(defn- allow-unverified-jwt-claims?
[env]
(env-flag-enabled? env "DB_SYNC_ALLOW_UNVERIFIED_JWT_CLAIMS"))
(defn- expired-token?
[token]
(when-let [claims (unsafe-jwt-claims token)]
@@ -55,7 +71,13 @@
(p/resolved nil)
(-> (authorization/verify-jwt token env)
(p/catch (fn [error]
(if (recoverable-auth-error? error)
(cond
(recoverable-auth-error? error)
nil
(allow-unverified-jwt-claims? env)
(unsafe-jwt-claims token)
:else
(p/rejected error))))))
(p/resolved nil))))

View File

@@ -57,6 +57,22 @@
(is (= "jwks" (ex-message error)))
(done)))))))
(deftest auth-claims-jwks-error-falls-back-to-unsafe-claims-when-enabled-test
(async done
(let [token "eyJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJ1MSJ9.signature"
request (js/Request. "http://localhost/graphs"
#js {:headers #js {"authorization" (str "Bearer " token)}})
env #js {"DB_SYNC_ALLOW_UNVERIFIED_JWT_CLAIMS" "true"}]
(-> (p/with-redefs [authorization/verify-jwt
(fn [_token _env]
(p/rejected (ex-info "jwks" {})))]
(p/let [claims (auth/auth-claims request env)]
(is (= "u1" (aget claims "sub")))))
(p/then (fn [] (done)))
(p/catch (fn [error]
(is false (str error))
(done)))))))
(deftest auth-claims-expired-jwt-short-circuits-verification-test
(async done
(let [expired-token "eyJhbGciOiJSUzI1NiJ9.eyJleHAiOjEsInN1YiI6InUxIn0.signature"

View File

@@ -561,6 +561,59 @@
(is (= "pull/ok" (:type pull-response)))
(is (empty? (:txs pull-response)))))))
(deftest tx-batch-drops-missing-created-by-ref-lookup-test
(testing "missing created-by lookup refs are sanitized so tx batch still applies"
(let [sql (test-sql/make-sql)
conn (storage/open-conn sql)
self #js {:sql sql
:conn conn
:schema-ready true}
page-uuid (random-uuid)
missing-user-uuid (random-uuid)
tx-entry {:tx (protocol/tx->transit [[:db/add -1 :block/uuid page-uuid]
[:db/add -1 :block/name "created-by-sanitize-page"]
[:db/add -1 :block/title "created-by-sanitize-page"]
[:db/add -1 :logseq.property/created-by-ref
[:block/uuid missing-user-uuid]]])
:outliner-op :save-block}
response (with-redefs [ws/broadcast! (fn [& _] nil)]
(sync-handler/handle-tx-batch! self nil [tx-entry] 0))
page (d/entity @conn [:block/uuid page-uuid])]
(is (= "tx/batch/ok" (:type response)))
(is (= 1 (:t response)))
(is (some? page))
(is (= "created-by-sanitize-page" (:block/title page)))
(is (nil? (:logseq.property/created-by-ref page))))))
(deftest tx-batch-drops-missing-optional-lookup-refs-test
(testing "missing optional lookup refs (tags/refs/created-by) are sanitized so page create still applies"
(let [sql (test-sql/make-sql)
conn (storage/open-conn sql)
self #js {:sql sql
:conn conn
:schema-ready true}
page-uuid (random-uuid)
missing-ref-uuid (random-uuid)
missing-tag-uuid (random-uuid)
missing-user-uuid (random-uuid)
tx-entry {:tx (protocol/tx->transit [[:db/add -1 :block/uuid page-uuid]
[:db/add -1 :block/name "optional-ref-sanitize-page"]
[:db/add -1 :block/title "optional-ref-sanitize-page"]
[:db/add -1 :block/refs [:block/uuid missing-ref-uuid]]
[:db/add -1 :block/tags [:block/uuid missing-tag-uuid]]
[:db/add -1 :logseq.property/created-by-ref [:block/uuid missing-user-uuid]]])
:outliner-op :create-page}
response (with-redefs [ws/broadcast! (fn [& _] nil)]
(sync-handler/handle-tx-batch! self nil [tx-entry] 0))
page (d/entity @conn [:block/uuid page-uuid])]
(is (= "tx/batch/ok" (:type response)))
(is (= 1 (:t response)))
(is (some? page))
(is (= "optional-ref-sanitize-page" (:block/title page)))
(is (nil? (:block/refs page)))
(is (nil? (:block/tags page)))
(is (nil? (:logseq.property/created-by-ref page))))))
(deftest tx-batch-rejects-while-snapshot-upload-is-in-progress-test
(let [sql (test-sql/make-sql)
conn (d/create-conn db-schema/schema)

View File

@@ -47,7 +47,10 @@
(or (internal-page? entity)
(journal? entity)
(class? entity)
(property? entity)))
(property? entity)
;; Sync sanitize can drop :block/tags when optional lookup refs are missing.
;; Keep a :block/name fallback so page entities are still recognized.
(some? (:block/name entity))))
(defn asset?
"Given an entity or map, check if it is an asset block"

View File

@@ -137,11 +137,15 @@
(let [sql (gobj/get opts-or-sql "sql")
bind (gobj/get opts-or-sql "bind")
row-mode (gobj/get opts-or-sql "rowMode")
return-value (gobj/get opts-or-sql "returnValue")
bind' (normalize-bind bind)
^js stmt (.prepare db sql)]
(if (= row-mode "array")
(if (or (= row-mode "array")
(= row-mode "object")
(= return-value "resultRows"))
(do
(.setReturnArrays stmt true)
(when (= row-mode "array")
(.setReturnArrays stmt true))
(stmt-all stmt bind'))
(do
(stmt-run stmt bind')

View File

@@ -242,8 +242,12 @@
(defn get-graph-uuid
[repo]
(some-> (sqlite-store-or-throw repo)
(sqlite-get-meta :graph-uuid)))
(when-let [store (sqlite-store-or-throw repo)]
(let [value (sqlite-get-meta store :graph-uuid)]
(prn :debug/client-op-get-graph-uuid {:repo repo
:has-store? (some? store)
:value value})
value)))
(defn update-local-tx
[repo t]
@@ -251,7 +255,6 @@
(let [store (sqlite-store-or-throw repo)]
(assert (some? store) repo)
(sqlite-set-meta! store :local-tx t)))
(defn update-local-checksum
[repo checksum]
{:pre [(some? checksum)]}
@@ -267,8 +270,13 @@
(defn get-local-tx
[repo]
(when-let [store (sqlite-store-or-throw repo)]
(some-> (sqlite-get-meta store :local-tx)
(js/parseInt 10))))
(let [raw (sqlite-get-meta store :local-tx)
parsed (some-> raw
(js/parseInt 10))]
(prn :debug/client-op-get-local-tx {:repo repo
:raw raw
:parsed parsed})
parsed)))
(defn get-pending-local-tx-count
[repo]
@@ -281,6 +289,9 @@
(aget "c"))
0)
0)]
(prn :debug/client-op-pending-local {:repo repo
:count count'
:cache-hit? false})
(swap! *repo->pending-local-tx-count assoc repo count')
count')))

View File

@@ -631,15 +631,16 @@
(defn- run-sync-start
[action config]
(-> (p/let [config' (resolve-runtime-config! action config)
missing-keys (missing-required-sync-config-keys (:type action) config')]
missing-keys (missing-required-sync-config-keys (:type action) config')
start-config (assoc config' :ws-url (effective-sync-config-value config' :ws-url))]
(if (seq missing-keys)
(missing-sync-config-error (:type action) missing-keys)
(p/let [cfg (cli-server/ensure-server! config' (:repo action))
_ (<sync-worker-runtime! cfg config')
(p/let [cfg (cli-server/ensure-server! start-config (:repo action))
_ (<sync-worker-runtime! cfg start-config)
graph-e2ee? (transport/invoke cfg :thread-api/q false [(:repo action) [graph-e2ee-query]])
_ (<ensure-e2ee-password-available! cfg config' action (true? graph-e2ee?))
_ (<ensure-e2ee-password-available! cfg start-config action (true? graph-e2ee?))
_ (transport/invoke cfg :thread-api/db-sync-start false [(:repo action)])
result (wait-sync-start-ready config' (:repo action) action)]
result (wait-sync-start-ready start-config (:repo action) action)]
result)))
(p/catch (fn [error]
(if (= :e2ee-password-not-found (:code (ex-data error)))

View File

@@ -121,6 +121,46 @@
(is false (str "unexpected error: " e))))
(p/finally done)))))
(deftest test-execute-sync-start-uses-default-ws-url-when-config-missing
(async done
(let [invoke-calls (atom [])
worker-sync-config (atom {:ws-url nil})]
(-> (p/with-redefs [cli-server/ensure-server! (fn [config _repo]
(p/resolved (assoc config :base-url "http://example")))
transport/invoke (fn [_ method direct-pass? args]
(swap! invoke-calls conj [method direct-pass? args])
(case method
:thread-api/set-db-sync-config
(let [cfg (first args)]
(reset! worker-sync-config cfg)
(p/resolved nil))
:thread-api/db-sync-start
(p/resolved nil)
:thread-api/db-sync-status
(p/resolved {:repo "logseq_db_demo"
:ws-state (if (seq (:ws-url @worker-sync-config)) :open :stopped)
:pending-local 0
:pending-asset 0
:pending-server 0})
(p/resolved {:ok true})))]
(p/let [result (execute-with-runtime-auth {:type :sync-start
:repo "logseq_db_demo"
:wait-timeout-ms 20
:wait-poll-interval-ms 0}
{:data-dir "/tmp"})
set-config-calls (filter #(= :thread-api/set-db-sync-config (first %)) @invoke-calls)]
(is (= :ok (:status result)))
(is (seq set-config-calls))
(is (every? #(= "wss://api.logseq.io/sync/%s"
(get-in % [2 0 :ws-url]))
set-config-calls))))
(p/catch (fn [e]
(is false (str "unexpected error: " e))))
(p/finally done)))))
(deftest test-execute-sync-start-verifies-and-persists-e2ee-password-when-provided
(async done
(let [invoke-calls (atom [])]