mirror of
https://github.com/logseq/logseq.git
synced 2026-05-30 07:29:48 +00:00
fix(db-worker): stop runtimes cleanly on shutdown and graph delete
This commit is contained in:
@@ -52,6 +52,25 @@
|
||||
(update :repos dissoc repo))]
|
||||
[state' (when (empty? remaining) (:runtime entry))]))))
|
||||
|
||||
(defn- detach-repo
|
||||
[state repo]
|
||||
(let [state (ensure-state state)
|
||||
entry (get-in state [:repos repo])]
|
||||
(if-not entry
|
||||
[state nil]
|
||||
(let [windows (or (:windows entry) #{})
|
||||
state' (-> state
|
||||
(update :repos dissoc repo)
|
||||
(update :window->repo
|
||||
(fn [window->repo]
|
||||
(reduce (fn [m window-id]
|
||||
(if (= repo (get m window-id))
|
||||
(dissoc m window-id)
|
||||
m))
|
||||
window->repo
|
||||
windows))))]
|
||||
[state' (:runtime entry)]))))
|
||||
|
||||
(defn create-manager
|
||||
[{:keys [start-daemon! stop-daemon! runtime-ready?] :as deps}]
|
||||
{:deps deps
|
||||
@@ -164,6 +183,21 @@
|
||||
(reset! state (initial-state))
|
||||
true)))))
|
||||
|
||||
(defn ensure-repo-stopped!
|
||||
[{:keys [state stop-daemon!]} repo]
|
||||
(let [runtime* (atom nil)]
|
||||
(swap! state
|
||||
(fn [current]
|
||||
(let [[next-state runtime] (detach-repo current repo)]
|
||||
(reset! runtime* runtime)
|
||||
next-state)))
|
||||
(if-let [runtime @runtime*]
|
||||
(if (owned-runtime? runtime)
|
||||
(p/let [_ (stop-daemon! runtime)]
|
||||
true)
|
||||
(p/resolved true))
|
||||
(p/resolved false))))
|
||||
|
||||
(defn- start-managed-daemon!
|
||||
[repo]
|
||||
(p/let [config (cli-server/ensure-server! {:owner-source :electron} repo)]
|
||||
@@ -191,6 +225,10 @@
|
||||
[window-id]
|
||||
(ensure-window-stopped! manager window-id))
|
||||
|
||||
(defn release-repo!
|
||||
[repo]
|
||||
(ensure-repo-stopped! manager repo))
|
||||
|
||||
(defn stop-all-managed!
|
||||
[]
|
||||
(stop-all! manager))
|
||||
|
||||
@@ -199,7 +199,8 @@
|
||||
|
||||
(defmethod handle :deleteGraph [_window [_ graph]]
|
||||
(when-let [repo (canonical-repo graph)]
|
||||
(cli-common/unlink-graph! repo)))
|
||||
(p/let [_ (db-worker/release-repo! repo)]
|
||||
(cli-common/unlink-graph! repo))))
|
||||
|
||||
;; DB related IPCs start
|
||||
|
||||
|
||||
@@ -139,6 +139,19 @@
|
||||
method-str (normalize-method-str method-kw)]
|
||||
(<invoke! proxy method-str method-kw true #js [])))
|
||||
|
||||
(defn- <close-bound-repo!
|
||||
[proxy repo]
|
||||
(if (string/blank? repo)
|
||||
(p/resolved nil)
|
||||
(let [method-kw :thread-api/close-db
|
||||
method-str (normalize-method-str method-kw)]
|
||||
(-> (<invoke! proxy method-str method-kw false [repo])
|
||||
(p/catch (fn [error]
|
||||
(log/warn :db-worker-node-close-db-before-stop-failed
|
||||
{:repo repo
|
||||
:error error})
|
||||
nil))))))
|
||||
|
||||
(def ^:private non-repo-methods
|
||||
#{:thread-api/init
|
||||
:thread-api/set-db-sync-config
|
||||
@@ -395,7 +408,7 @@
|
||||
file-path))
|
||||
|
||||
(defn start-daemon!
|
||||
[{:keys [data-dir repo log-level owner-source] :as opts}]
|
||||
[{:keys [data-dir repo log-level owner-source on-stopped!] :as opts}]
|
||||
(let [host "127.0.0.1"
|
||||
port 0
|
||||
owner-source (normalize-owner-source owner-source)]
|
||||
@@ -428,6 +441,7 @@
|
||||
method-str (normalize-method-str method-kw)]
|
||||
(<invoke! proxy method-str method-kw false [repo (startup-db-opts opts)]))]
|
||||
(let [stop!* (atom nil)
|
||||
stopped? (atom false)
|
||||
server (make-server proxy {:bound-repo repo
|
||||
:stop-fn (fn []
|
||||
(when-let [stop! @stop!*]
|
||||
@@ -441,17 +455,30 @@
|
||||
address
|
||||
(.-port address))
|
||||
stop! (fn []
|
||||
(p/create
|
||||
(fn [resolve _]
|
||||
(reset! *ready? false)
|
||||
(doseq [^js res @*sse-clients]
|
||||
(try
|
||||
(.end res)
|
||||
(catch :default _)))
|
||||
(reset! *sse-clients #{})
|
||||
(when-let [lock-path (:path @*lock-info)]
|
||||
(db-lock/remove-lock! lock-path))
|
||||
(.close server (fn [] (resolve true))))))]
|
||||
(if @stopped?
|
||||
(p/resolved true)
|
||||
(do
|
||||
(reset! stopped? true)
|
||||
(-> (p/let [_ (<close-bound-repo! proxy repo)]
|
||||
(reset! *ready? false)
|
||||
(doseq [^js res @*sse-clients]
|
||||
(try
|
||||
(.end res)
|
||||
(catch :default _)))
|
||||
(reset! *sse-clients #{})
|
||||
(when-let [lock-path (:path @*lock-info)]
|
||||
(db-lock/remove-lock! lock-path))
|
||||
(p/create
|
||||
(fn [resolve _]
|
||||
(try
|
||||
(.close server (fn [] (resolve true)))
|
||||
(catch :default _
|
||||
(resolve true)))))
|
||||
true)
|
||||
(p/finally
|
||||
(fn []
|
||||
(when (fn? on-stopped!)
|
||||
(on-stopped!))))))))]
|
||||
(reset! *ready? true)
|
||||
(reset! stop!* stop!)
|
||||
(p/let [lock-with-port (assoc (:lock @*lock-info) :port actual-port)
|
||||
@@ -490,13 +517,12 @@
|
||||
:repo repo
|
||||
:create-empty-db? (:create-empty-db? opts)
|
||||
:owner-source owner-source
|
||||
:on-stopped! (fn []
|
||||
(log/info :db-worker-node-stopped nil)
|
||||
(.exit js/process 0))
|
||||
:log-level (:log-level opts)})]
|
||||
(log/info :db-worker-node-ready {:host (:host daemon) :port (:port daemon)})
|
||||
(let [shutdown (fn []
|
||||
(-> (stop!)
|
||||
(p/finally (fn []
|
||||
(log/info :db-worker-node-stopped nil)
|
||||
(.exit js/process 0)))))]
|
||||
(let [shutdown (fn [] (stop!))]
|
||||
(.on js/process "SIGINT" shutdown)
|
||||
(.on js/process "SIGTERM" shutdown)))
|
||||
(p/catch (fn [error]
|
||||
|
||||
@@ -199,3 +199,42 @@
|
||||
(p/catch (fn [e]
|
||||
(is false (str "unexpected error: " e))))
|
||||
(p/finally (fn [] (done)))))))
|
||||
|
||||
(deftest ensure-repo-stopped-detaches-all-windows-and-stops-runtime-once
|
||||
(async done
|
||||
(let [stop-calls (atom [])
|
||||
manager (db-worker/create-manager
|
||||
{:start-daemon! (fn [repo] (p/resolved (runtime repo)))
|
||||
:stop-daemon! (fn [rt]
|
||||
(swap! stop-calls conj (:repo rt))
|
||||
(p/resolved true))})]
|
||||
(-> (p/let [_ (db-worker/ensure-started! manager "graph-a" :window-1)
|
||||
_ (db-worker/ensure-started! manager "graph-a" :window-2)
|
||||
_ (db-worker/ensure-repo-stopped! manager "graph-a")
|
||||
state @(:state manager)]
|
||||
(is (= ["graph-a"] @stop-calls))
|
||||
(is (nil? (get-in state [:repos "graph-a"])))
|
||||
(is (nil? (get-in state [:window->repo :window-1])))
|
||||
(is (nil? (get-in state [:window->repo :window-2]))))
|
||||
(p/catch (fn [e]
|
||||
(is false (str "unexpected error: " e))))
|
||||
(p/finally (fn [] (done)))))))
|
||||
|
||||
(deftest ensure-repo-stopped-skips-stop-for-external-runtime
|
||||
(async done
|
||||
(let [stop-calls (atom [])
|
||||
manager (db-worker/create-manager
|
||||
{:start-daemon! (fn [repo]
|
||||
(p/resolved (assoc (runtime repo) :owned? false)))
|
||||
:stop-daemon! (fn [rt]
|
||||
(swap! stop-calls conj (:repo rt))
|
||||
(p/resolved true))})]
|
||||
(-> (p/let [_ (db-worker/ensure-started! manager "graph-a" :window-1)
|
||||
_ (db-worker/ensure-repo-stopped! manager "graph-a")
|
||||
state @(:state manager)]
|
||||
(is (empty? @stop-calls))
|
||||
(is (nil? (get-in state [:repos "graph-a"])))
|
||||
(is (nil? (get-in state [:window->repo :window-1]))))
|
||||
(p/catch (fn [e]
|
||||
(is false (str "unexpected error: " e))))
|
||||
(p/finally (fn [] (done)))))))
|
||||
|
||||
@@ -488,6 +488,44 @@
|
||||
(is false (str "unexpected error: " e))))
|
||||
(p/finally done)))))
|
||||
|
||||
(deftest db-worker-node-stop-closes-bound-repo
|
||||
(async done
|
||||
(let [data-dir (node-helper/create-tmp-dir "db-worker-stop-close-db")
|
||||
repo (str "logseq_db_stop_close_" (subs (str (random-uuid)) 0 8))
|
||||
lock-file-path (lock-path data-dir repo)
|
||||
invoke-calls (atom [])]
|
||||
(-> (p/with-redefs [platform-node/node-platform (fn [_opts] #js {})
|
||||
db-core/init-core! (fn [_platform]
|
||||
#js {:remoteInvoke (fn [method direct-pass? args]
|
||||
(swap! invoke-calls conj
|
||||
[method
|
||||
direct-pass?
|
||||
(if direct-pass?
|
||||
(vec (js->clj args))
|
||||
(ldb/read-transit-str args))])
|
||||
(p/resolved nil))})
|
||||
db-lock/ensure-lock! (fn [_]
|
||||
(p/resolved {:path lock-file-path
|
||||
:lock {:repo repo
|
||||
:pid (.-pid js/process)
|
||||
:host "127.0.0.1"
|
||||
:port 0
|
||||
:lock-id "stop-close-lock"}}))
|
||||
db-lock/update-lock! (fn [_path lock] lock)]
|
||||
(p/let [{:keys [stop!]} (db-worker-node/start-daemon! {:data-dir data-dir
|
||||
:repo repo
|
||||
:log-level "error"})
|
||||
_ (stop!)]
|
||||
(is (= ["thread-api/init" true []]
|
||||
(first @invoke-calls)))
|
||||
(is (= ["thread-api/create-or-open-db" false [repo {}]]
|
||||
(second @invoke-calls)))
|
||||
(is (= ["thread-api/close-db" false [repo]]
|
||||
(nth @invoke-calls 2)))))
|
||||
(p/catch (fn [e]
|
||||
(is false (str "unexpected error: " e))))
|
||||
(p/finally done)))))
|
||||
|
||||
(deftest db-worker-node-repo-error-handles-keyword-methods
|
||||
(let [repo-error #'db-worker-node/repo-error
|
||||
bound-repo "logseq_db_bound"]
|
||||
|
||||
Reference in New Issue
Block a user