fix(db-worker): compare repo-name failed sometimes because of db-version-prefix

This commit is contained in:
rcmerci
2026-05-06 23:01:25 +08:00
parent 575ddecced
commit 5301d39f07
11 changed files with 279 additions and 62 deletions

View File

@@ -45,6 +45,24 @@
(subs repo (count common-config/db-version-prefix))
repo)))
(defn repo-identity
"Return the canonical value used for repo identity comparison.
Repo identity comparison is based on the graph directory key, so `demo` and
`logseq_db_demo` identify the same graph. Use this helper, or `same-repo?`,
whenever code needs to decide whether two repo names represent the same graph."
[repo]
(repo->graph-dir-key repo))
(defn same-repo?
"Return true when two repo names identify the same graph."
[a b]
(let [a' (repo-identity a)
b' (repo-identity b)]
(and (some? a')
(some? b')
(= a' b'))))
(defn graph-dir-key->encoded-dir-name
[graph-dir-key]
(when (some? graph-dir-key)

View File

@@ -1,5 +1,6 @@
(ns electron.db-worker
(:require [logseq.cli.server :as cli-server]
[logseq.common.graph-dir :as graph-dir]
[logseq.db-worker.daemon :as daemon]
[promesa.core :as p]))
@@ -8,9 +9,40 @@
{:repos {}
:window->repo {}})
(defn- repo-key
[repo]
(graph-dir/repo-identity repo))
(defn- merge-repo-entry
[existing entry]
(if existing
(-> existing
(update :windows (fnil into #{}) (:windows entry))
(update :runtime #(or % (:runtime entry))))
entry))
(defn- normalize-state
[state]
(let [state (merge (initial-state) state)
repos (reduce-kv (fn [m repo entry]
(if-let [key (repo-key repo)]
(update m key merge-repo-entry entry)
m))
{}
(:repos state))
window->repo (reduce-kv (fn [m window-id repo]
(if-let [key (repo-key repo)]
(assoc m window-id key)
m))
{}
(:window->repo state))]
(assoc state
:repos repos
:window->repo window->repo)))
(defn- ensure-state
[state]
(merge (initial-state) state))
(normalize-state state))
(defn- dissoc-window
[state window-id]
@@ -100,39 +132,40 @@
(defn ensure-started!
[{:keys [state start-daemon! stop-daemon! runtime-ready?] :as manager} repo window-id]
(p/let [current-repo (get-in (ensure-state @state) [:window->repo window-id])
_ (when (and current-repo (not= current-repo repo))
(ensure-window-stopped! manager window-id))]
(if-let [entry (get-in (ensure-state @state) [:repos repo])]
(p/let [runtime (:runtime entry)
ready? (runtime-ready? runtime)]
(if ready?
(do
(swap! state (fn [current]
(-> (ensure-state current)
(update-in [:repos repo :windows] (fnil conj #{}) window-id)
(assoc-in [:window->repo window-id] repo))))
runtime)
(p/let [_ (when (owned-runtime? runtime)
(-> (stop-daemon! runtime)
(p/catch (fn [_] nil))))
runtime' (start-daemon! repo)]
(swap! state
(fn [current]
(let [current' (ensure-state current)
windows (get-in current' [:repos repo :windows] #{})]
(-> current'
(assoc-in [:repos repo] {:runtime runtime'
:windows (conj windows window-id)})
(assoc-in [:window->repo window-id] repo)))))
runtime')))
(p/let [runtime (start-daemon! repo)]
(swap! state (fn [current]
(-> (ensure-state current)
(assoc-in [:repos repo] {:runtime runtime
:windows #{window-id}})
(assoc-in [:window->repo window-id] repo))))
runtime))))
(let [key (repo-key repo)]
(p/let [current-repo (get-in (ensure-state @state) [:window->repo window-id])
_ (when (and current-repo (not= current-repo key))
(ensure-window-stopped! manager window-id))]
(if-let [entry (get-in (ensure-state @state) [:repos key])]
(p/let [runtime (:runtime entry)
ready? (runtime-ready? runtime)]
(if ready?
(do
(swap! state (fn [current]
(-> (ensure-state current)
(update-in [:repos key :windows] (fnil conj #{}) window-id)
(assoc-in [:window->repo window-id] key))))
runtime)
(p/let [_ (when (owned-runtime? runtime)
(-> (stop-daemon! runtime)
(p/catch (fn [_] nil))))
runtime' (start-daemon! repo)]
(swap! state
(fn [current]
(let [current' (ensure-state current)
windows (get-in current' [:repos key :windows] #{})]
(-> current'
(assoc-in [:repos key] {:runtime runtime'
:windows (conj windows window-id)})
(assoc-in [:window->repo window-id] key)))))
runtime')))
(p/let [runtime (start-daemon! repo)]
(swap! state (fn [current]
(-> (ensure-state current)
(assoc-in [:repos key] {:runtime runtime
:windows #{window-id}})
(assoc-in [:window->repo window-id] key))))
runtime)))))
(defn- parse-runtime-lock
[{:keys [base-url]}]
@@ -156,20 +189,21 @@
(defn ensure-stopped!
[{:keys [state stop-daemon!]} repo window-id]
(if (= repo (get-in (ensure-state @state) [:window->repo window-id]))
(ensure-window-stopped! {:state state :stop-daemon! stop-daemon!} window-id)
(let [runtime* (atom nil)]
(swap! state
(fn [current]
(let [[next-state runtime] (detach-window-from-repo current repo window-id)]
(reset! runtime* runtime)
next-state)))
(if-let [runtime @runtime*]
(if (owned-runtime? runtime)
(p/let [_ (stop-daemon! runtime)]
true)
(p/resolved true))
(p/resolved false)))))
(let [key (repo-key repo)]
(if (= key (get-in (ensure-state @state) [:window->repo window-id]))
(ensure-window-stopped! {:state state :stop-daemon! stop-daemon!} window-id)
(let [runtime* (atom nil)]
(swap! state
(fn [current]
(let [[next-state runtime] (detach-window-from-repo current key window-id)]
(reset! runtime* runtime)
next-state)))
(if-let [runtime @runtime*]
(if (owned-runtime? runtime)
(p/let [_ (stop-daemon! runtime)]
true)
(p/resolved true))
(p/resolved false))))))
(defn stop-all!
[{:keys [state stop-daemon!]}]
@@ -185,10 +219,11 @@
(defn ensure-repo-stopped!
[{:keys [state stop-daemon!]} repo]
(let [runtime* (atom nil)]
(let [key (repo-key repo)
runtime* (atom nil)]
(swap! state
(fn [current]
(let [[next-state runtime] (detach-repo current repo)]
(let [[next-state runtime] (detach-repo current key)]
(reset! runtime* runtime)
next-state)))
(if-let [runtime @runtime*]

View File

@@ -11,6 +11,7 @@
[frontend.state :as state]
[frontend.util :as util]
[lambdaisland.glogi :as log]
[logseq.common.graph-dir :as graph-dir]
[logseq.db :as ldb]
[promesa.core :as p]))
@@ -28,14 +29,18 @@
(reset! remote-repo nil)
(reset! state/*db-worker nil))
(defn- same-remote-repo?
[repo runtime-repo]
(graph-dir/same-repo? repo runtime-repo))
(defn- <stop-remote-if-current!
[repo]
(if (and repo (= repo @remote-repo))
(if (and repo (same-remote-repo? repo @remote-repo))
(if-let [remote-client @remote-db]
(-> (remote/stop! remote-client)
(p/finally
(fn []
(when (= repo @remote-repo)
(when (same-remote-repo? repo @remote-repo)
(clear-remote-runtime!)))))
(do
(clear-remote-runtime!)
@@ -55,7 +60,7 @@
(defn- active-runtime-session?
[state repo session-id]
(and (= repo (:repo state))
(and (same-remote-repo? repo (:repo state))
(= session-id (:session-id state))))
(defn- reset-active-request-failures!
@@ -84,7 +89,7 @@
(p/catch (fn [error]
(log/warn :db-worker-failover-stop-error {:repo repo
:error error})))))
(when (= repo @remote-repo)
(when (same-remote-repo? repo @remote-repo)
(clear-remote-runtime!))
(-> (ipc/ipc "releaseDbWorkerRuntime" repo)
(p/catch (fn [error]
@@ -132,7 +137,7 @@
(defn- <ensure-remote!
[repo]
(if (or (nil? repo) (= repo @remote-repo))
(if (or (nil? repo) (same-remote-repo? repo @remote-repo))
(p/resolved @remote-db)
(let [session-id (str (random-uuid))]
(p/let [_ (when @remote-db
@@ -194,7 +199,7 @@
(defn <close-db [repo]
(when repo
(if (electron-runtime?)
(if (= repo @remote-repo)
(if (same-remote-repo? repo @remote-repo)
(if-let [remote-client @remote-db]
(p/let [_ (-> (remote/invoke! (:client remote-client) "thread-api/close-db" [repo])
(p/catch (fn [_] nil)))

View File

@@ -281,7 +281,7 @@
(defn- close-other-dbs!
[repo]
(doseq [[r {:keys [db search client-ops]}] @*sqlite-conns]
(when-not (= repo r)
(when-not (graph-dir/same-repo? repo r)
(close-db-aux! r db search client-ops))))
(defn close-db!
@@ -634,7 +634,7 @@
(def-thread-api :thread-api/create-or-open-db
[repo opts]
(when-not (= repo (worker-state/get-current-repo)) ; graph switched
(when-not (graph-dir/same-repo? repo (worker-state/get-current-repo)) ; graph switched
(reset! worker-state/*deleted-block-uuid->db-id {}))
(start-db! repo opts))

View File

@@ -9,6 +9,7 @@
[frontend.worker.platform.node :as platform-node]
[frontend.worker.state :as worker-state]
[lambdaisland.glogi :as log]
[logseq.common.graph-dir :as graph-dir]
[logseq.common.version :as build-version]
[logseq.cli.root-dir :as root-dir]
[logseq.cli.style :as style]
@@ -221,7 +222,7 @@
:error {:code :missing-repo
:message "repo is required"}}
(not= repo bound-repo)
(not (graph-dir/same-repo? repo bound-repo))
{:status 409
:error {:code :repo-mismatch
:message "repo does not match bound repo"
@@ -437,7 +438,7 @@
{:code :repo-locked
:repo target-repo})))
_ (when (and (seq target-repo)
(not= target-repo (:repo lock)))
(not (graph-dir/same-repo? target-repo (:repo lock))))
(throw (ex-info "graph lock repo mismatch"
{:code :repo-locked
:repo target-repo

View File

@@ -158,7 +158,7 @@
:path path
:lock lock}))
(not= repo (:repo lock))
(not (graph-dir/same-repo? repo (:repo lock)))
(throw (ex-info "graph lock repo mismatch"
{:code :repo-locked
:path path

View File

@@ -226,7 +226,7 @@
(defn- repo-server
[config servers repo]
(first (filter #(= repo (:repo %))
(first (filter #(graph-dir/same-repo? repo (:repo %))
(servers-for-config config servers))))
(defn discover-servers

View File

@@ -41,6 +41,29 @@
(is false (str "unexpected error: " e))))
(p/finally (fn [] (done)))))))
(deftest ensure-started-reuses-prefix-equivalent-runtime
(async done
(let [start-calls (atom [])
stop-calls (atom [])
manager (db-worker/create-manager
{:start-daemon! (fn [repo]
(swap! start-calls conj repo)
(p/resolved (runtime repo)))
:stop-daemon! (fn [rt]
(swap! stop-calls conj (:repo rt))
(p/resolved true))})]
(-> (p/let [first-runtime (db-worker/ensure-started! manager "demo" :window-1)
second-runtime (db-worker/ensure-started! manager "logseq_db_demo" :window-1)
manager-state @(:state manager)]
(is (= first-runtime second-runtime))
(is (= ["demo"] @start-calls))
(is (empty? @stop-calls))
(is (= "demo" (get-in manager-state [:window->repo :window-1])))
(is (= #{:window-1} (get-in manager-state [:repos "demo" :windows]))))
(p/catch (fn [e]
(is false (str "unexpected error: " e))))
(p/finally (fn [] (done)))))))
(deftest ensure-started-switches-window-repo-and-stops-previous-daemon
(async done
(let [start-calls (atom [])

View File

@@ -255,6 +255,84 @@
(set! remote/stop! original-stop!)
(done)))))))
(deftest electron-ensure-remote-reuses-prefix-equivalent-runtime
(async done
(let [ipc-calls (atom [])
start-calls (atom [])
stop-calls (atom [])
ensure-remote! #'persist-db/<ensure-remote!
original-ipc ipc/ipc
original-start! remote/start!
original-stop! remote/stop!]
(reset-runtime-state!)
(set! ipc/ipc (fn [channel repo]
(swap! ipc-calls conj [channel repo])
(p/resolved {:base-url "http://127.0.0.1:9101"
:auth-token nil
:repo repo})))
(set! remote/start! (fn [{:keys [repo]}]
(swap! start-calls conj repo)
(->FakeRemote repo (fn [& _] nil))))
(set! remote/stop! (fn [client]
(swap! stop-calls conj (:repo client))
(p/resolved true)))
(-> (p/let [first-client (ensure-remote! "demo")
second-client (ensure-remote! "logseq_db_demo")]
(is (= first-client second-client))
(is (= [["db-worker-runtime" "demo"]] @ipc-calls))
(is (= ["demo"] @start-calls))
(is (empty? @stop-calls))
(is (= "demo" @persist-db/remote-repo)))
(p/catch (fn [e]
(is false (str "unexpected error: " e))))
(p/finally (fn []
(set! ipc/ipc original-ipc)
(set! remote/start! original-start!)
(set! remote/stop! original-stop!)
(done)))))))
(deftest electron-graph-switch-fetch-init-data-reuses-prefix-equivalent-runtime
(async done
(let [ipc-calls (atom [])
start-calls (atom [])
stop-calls (atom [])
original-electron? util/electron?
original-ipc ipc/ipc
original-start! remote/start!
original-stop! remote/stop!]
(reset-runtime-state!)
(set! util/electron? (constantly true))
(set! ipc/ipc (fn [channel repo]
(swap! ipc-calls conj [channel repo])
(p/resolved {:base-url "http://127.0.0.1:9101"
:auth-token nil
:repo repo})))
(set! remote/start! (fn [{:keys [repo]}]
(swap! start-calls conj repo)
(->FakeRemote repo (fn [& _] nil))))
(set! remote/stop! (fn [client]
(swap! stop-calls conj (:repo client))
(p/resolved true)))
(-> (p/let [first-result (persist-db/<fetch-init-data "demo" {})
second-result (persist-db/<fetch-init-data "logseq_db_demo" {})]
(is (= {:schema {:repo "demo"}
:initial-data []}
first-result))
(is (= {:schema {:repo "logseq_db_demo"}
:initial-data []}
second-result))
(is (= [["db-worker-runtime" "demo"]] @ipc-calls))
(is (= ["demo"] @start-calls))
(is (empty? @stop-calls)))
(p/catch (fn [e]
(is false (str "unexpected error: " e))))
(p/finally (fn []
(set! util/electron? original-electron?)
(set! ipc/ipc original-ipc)
(set! remote/start! original-start!)
(set! remote/stop! original-stop!)
(done)))))))
(deftest electron-fetch-init-data-then-set-current-repo-does-not-rebind-runtime
(async done
(let [ipc-calls (atom [])

View File

@@ -669,6 +669,7 @@
:error {:code :missing-repo
:message "repo is required"}}
(repo-error :thread-api/create-or-open-db [:public-key] bound-repo)))
(is (nil? (repo-error :thread-api/create-or-open-db ["bound"] bound-repo)))
(is (= {:status 409
:error {:code :repo-mismatch
:message "repo does not match bound repo"
@@ -1068,6 +1069,28 @@
:else
(done)))))))))
(deftest db-worker-node-accepts-prefix-equivalent-repo-test
(async done
(let [daemon (atom nil)
data-dir (node-helper/create-tmp-dir "db-worker-prefix-equivalent")
bound-repo "demo"
requested-repo "logseq_db_demo"]
(-> (p/let [{:keys [host port stop!]}
(start-daemon! {:root-dir data-dir
:repo bound-repo
:create-empty-db? true})
_ (reset! daemon {:host host :port port :stop! stop!})
{:keys [status body]} (invoke-raw host port "thread-api/create-or-open-db" [requested-repo {}])
parsed (js->clj (js/JSON.parse body) :keywordize-keys true)]
(is (= 200 status))
(is (= true (:ok parsed))))
(p/catch (fn [e]
(is false (str "unexpected error: " e))))
(p/finally (fn []
(if-let [stop! (:stop! @daemon)]
(-> (stop!) (p/finally (fn [] (done))))
(done))))))))
(deftest db-worker-node-repo-mismatch-test
(async done
(let [daemon (atom nil)

View File

@@ -129,6 +129,40 @@
(is false (str "unexpected error: " e))))
(p/finally done)))))
(deftest ensure-server-reuses-prefix-free-discovered-server
(async done
(let [root-dir (node-helper/create-tmp-dir "cli-server-prefix-free-reuse")
requested-repo "logseq_db_demo"
_lock-file (write-test-lock! root-dir requested-repo :cli)
spawn-calls (atom 0)
server (revision-test-server {:repo "demo"
:port 9420
:owner-source :cli
:revision "expected-revision"
:root-dir root-dir})]
(-> (p/with-redefs [daemon/cleanup-stale-lock! (fn [_ _] (p/resolved nil))
daemon/spawn-server! (fn [_]
(swap! spawn-calls inc)
nil)
cli-server/discover-servers (fn [_]
(p/resolved [server]))
daemon/wait-for (fn [pred-fn _opts]
(p/let [matched? (pred-fn)]
(if matched?
true
(throw (ex-info "timed out" {:code :timeout})))))
daemon/wait-for-ready (fn [_] (p/resolved true))]
(cli-server/ensure-server! {:root-dir root-dir
:owner-source :cli
:expected-revision "expected-revision"}
requested-repo))
(p/then (fn [config]
(is (= "http://127.0.0.1:9420" (:base-url config)))
(is (= 0 @spawn-calls))))
(p/catch (fn [e]
(is false (str "unexpected error: " e))))
(p/finally done)))))
(deftest ensure-server-restarts-cli-owned-mismatched-revision
(async done
(let [root-dir (node-helper/create-tmp-dir "cli-server-revision-restart-cli")