mirror of
https://github.com/logseq/logseq.git
synced 2026-05-28 06:34:34 +00:00
fix: clean up outdated db worker nodes
This commit is contained in:
@@ -329,14 +329,16 @@
|
||||
:owner-source lock-owner
|
||||
:owned? (owner-manageable? requester-owner lock-owner)))))))))
|
||||
|
||||
(declare stop-version-mismatched-server!)
|
||||
(declare stop-version-mismatched-server!
|
||||
cleanup-additional-revision-mismatched-servers!
|
||||
list-servers)
|
||||
|
||||
(defn- ensure-server-started!
|
||||
[config repo]
|
||||
(p/let [expected (expected-revision config)
|
||||
server (ensure-server-started-once! config repo)]
|
||||
(if-not (revision-mismatch? expected (:revision server))
|
||||
server
|
||||
(cleanup-additional-revision-mismatched-servers! config repo expected server)
|
||||
(p/let [stop-result (profile/time! (:profile-session config)
|
||||
"server.restart-version-mismatch"
|
||||
(fn []
|
||||
@@ -351,7 +353,7 @@
|
||||
:stop-error (:error stop-result)))))
|
||||
(p/let [server' (ensure-server-started-once! config repo)]
|
||||
(if-not (revision-mismatch? expected (:revision server'))
|
||||
server'
|
||||
(cleanup-additional-revision-mismatched-servers! config repo expected server')
|
||||
(let [error-data (assoc (server-revision-mismatch-error
|
||||
:server-revision-mismatch-after-restart
|
||||
repo
|
||||
@@ -409,20 +411,17 @@
|
||||
:data {:repo repo}})
|
||||
(p/catch
|
||||
(fn [_]
|
||||
(when (and (= :alive (pid-status (:pid server)))
|
||||
(not= (:pid server) (.-pid js/process)))
|
||||
(try
|
||||
(.kill js/process (:pid server) "SIGTERM")
|
||||
(catch :default e
|
||||
(log/warn :cli-server-stop-sigterm-failed e))))
|
||||
(when (= :not-found (pid-status (:pid server)))
|
||||
(remove-lock! path))
|
||||
(if (fs/existsSync path)
|
||||
{:ok? false
|
||||
:error {:code :server-stop-timeout
|
||||
:message "timed out stopping server"}}
|
||||
{:ok? true
|
||||
:data {:repo repo}})))))))))))
|
||||
(p/let [_ (when (and (= :alive (pid-status (:pid server)))
|
||||
(not= (:pid server) (.-pid js/process)))
|
||||
(daemon/stop-process! server))]
|
||||
(when (= :not-found (pid-status (:pid server)))
|
||||
(remove-lock! path))
|
||||
(if (fs/existsSync path)
|
||||
{:ok? false
|
||||
:error {:code :server-stop-timeout
|
||||
:message "timed out stopping server"}}
|
||||
{:ok? true
|
||||
:data {:repo repo}}))))))))))))
|
||||
|
||||
(defn stop-server!
|
||||
[config repo]
|
||||
@@ -488,6 +487,82 @@
|
||||
:owner-source owner-source
|
||||
:revision revision})
|
||||
|
||||
(defn- process-stopped?
  "Returns true unless `pid-status` reports `:alive` or `:no-permission`
  for `pid`."
  [pid]
  (let [status (pid-status pid)]
    (and (not= :alive status)
         (not= :no-permission status))))
|
||||
|
||||
(defn- server-entry-key
  "Builds the `[pid port]` pair used to compare two server entries."
  [{:keys [pid port]}]
  (vector pid port))
|
||||
|
||||
(defn- same-server-entry?
  "True when `a` and `b` yield equal `server-entry-key` values."
  [a b]
  (let [key-a (server-entry-key a)
        key-b (server-entry-key b)]
    (= key-a key-b)))
|
||||
|
||||
(defn- remove-lock-if-owned-by-server!
  "Reads the lock for `repo` under the configured root dir and removes it
  only when the lock's `:pid` equals the given server's `:pid`."
  [config repo {:keys [pid]}]
  (let [root-dir (resolve-root-dir config)
        path (lock-path root-dir repo)
        lock-pid (:pid (read-lock path))]
    ;; Leave the lock alone when it records a different pid.
    (when (= pid lock-pid)
      (remove-lock! path))))
|
||||
|
||||
(defn- cleanup-stopped-server-entry!
  "Removes `server` from the list at `(server-list-path config)`, then
  removes the lock for `repo` if its pid matches the server's pid.
  Always returns nil."
  [config repo server]
  (let [list-path (server-list-path config)]
    (server-list/remove-entry! list-path server)
    (remove-lock-if-owned-by-server! config repo server)
    nil))
|
||||
|
||||
(defn- stop-discovered-server!
  "Stops one discovered server and returns a promise of a result map.

  Steps, in order:
  1. Call `shutdown!` on the server; a failure is logged and ignored.
  2. Wait (3000 ms timeout, 100 ms interval) until `process-stopped?` is
     true for the pid; if the wait fails, call `daemon/stop-process!`.
  3. Re-check `process-stopped?`. When stopped, clean up the server entry
     and lock and resolve to `{:ok? true :target ...}`; otherwise resolve
     to `{:ok? false ...}` with a `:server-stop-timeout` error.

  Any other rejection in the chain resolves to `{:ok? false ...}` carrying
  the exception's `ex-data`, or a `:server-stop-failed` error map."
  [config repo {:keys [pid] :as server}]
  (let [target (cleanup-target server)
        ;; Prefer the repo recorded on the server entry over the caller's.
        repo' (or (:repo server) repo)
        failure (fn [error]
                  {:ok? false
                   :target target
                   :error error})
        request-shutdown (fn []
                           (-> (shutdown! server)
                               (p/catch (fn [e]
                                          (log/warn :cli-server-shutdown-discovered-failed
                                                    {:target target :error e})
                                          (p/resolved false)))))
        await-exit (fn []
                     (-> (wait-for (fn [] (p/resolved (process-stopped? pid)))
                                   {:timeout-ms 3000
                                    :interval-ms 100})
                         ;; The wait failed: fall back to stopping the process.
                         (p/catch (fn [_]
                                    (daemon/stop-process! server)))))]
    (-> (p/let [_ (request-shutdown)
                _ (await-exit)]
          (if (process-stopped? pid)
            (do
              (cleanup-stopped-server-entry! config repo' server)
              {:ok? true
               :target target})
            (failure {:code :server-stop-timeout
                      :message "timed out stopping server"})))
        (p/catch (fn [e]
                   (failure (or (ex-data e)
                                {:code :server-stop-failed
                                 :message (.-message e)})))))))
|
||||
|
||||
(defn- cleanup-additional-revision-mismatched-servers!
  "Stops every listed server that belongs to `repo`, is not `active-server`
  (compared with `same-server-entry?`), and whose `:revision` mismatches
  `expected`.

  Returns a promise of `active-server`. Rejects with an `ex-info` whose
  data has `:code :server-outdated-cleanup-failed` and the failed results
  under `:failed` when any stop attempt reports `:ok?` as falsey."
  [config repo expected active-server]
  (let [outdated-sibling? (fn [server]
                            (and (graph-dir/same-repo? repo (:repo server))
                                 (not (same-server-entry? active-server server))
                                 (revision-mismatch? expected (:revision server))))]
    (p/let [servers (list-servers config)
            targets (filterv outdated-sibling? servers)
            results (p/all (mapv (fn [server]
                                   (stop-discovered-server! config repo server))
                                 targets))
            failed (into [] (remove :ok?) results)]
      (when (seq failed)
        (throw (ex-info "failed to stop outdated db-worker-node servers"
                        {:code :server-outdated-cleanup-failed
                         :repo repo
                         :failed failed})))
      active-server)))
|
||||
|
||||
(defn cleanup-revision-mismatched-servers!
|
||||
[config cli-revision]
|
||||
(p/let [servers (list-servers config)
|
||||
|
||||
@@ -142,7 +142,7 @@
|
||||
{:timeout-ms timeout-ms
|
||||
:interval-ms 100}))
|
||||
|
||||
(defn- stop-stale-process!
|
||||
(defn stop-process!
|
||||
[{:keys [pid]}]
|
||||
(cond
|
||||
(not (number? pid))
|
||||
@@ -183,7 +183,7 @@
|
||||
(p/resolved nil))
|
||||
|
||||
(not (valid-lock? lock))
|
||||
(-> (stop-stale-process! lock)
|
||||
(-> (stop-process! lock)
|
||||
(p/then (fn [_]
|
||||
(remove-lock! path)
|
||||
nil)))
|
||||
|
||||
@@ -129,6 +129,116 @@
|
||||
(is false (str "unexpected error: " e))))
|
||||
(p/finally done)))))
|
||||
|
||||
(deftest ensure-server-stops-outdated-additional-server-for-same-repo
  ;; Three servers are discovered: the current one (expected revision), an
  ;; outdated one for the same repo, and an outdated one for another repo.
  ;; Only the outdated same-repo server (port 9431) should get a shutdown
  ;; request, and the returned config must point at the current server.
  (async done
    (let [root-dir (node-helper/create-tmp-dir "cli-server-revision-cleanup-repo")
          repo (str "logseq_db_revision_cleanup_repo_" (subs (str (random-uuid)) 0 8))
          _lock-file (write-test-lock! root-dir repo :electron)
          make-server (fn [server-repo port revision pid]
                        (assoc (revision-test-server {:repo server-repo
                                                      :port port
                                                      :owner-source :electron
                                                      :revision revision
                                                      :root-dir root-dir})
                               :pid pid))
          current-server (make-server repo 9430 "expected-revision" 91030)
          outdated-server (make-server repo 9431 "old-revision" 91031)
          other-repo-server (make-server "logseq_db_other_revision_cleanup_repo"
                                         9432
                                         "old-revision"
                                         91032)
          shutdown-ports (atom #{})
          shutdown-calls (atom [])
          ensure-config {:root-dir root-dir
                         :owner-source :electron
                         :expected-revision "expected-revision"}]
      (-> (p/with-redefs [daemon/cleanup-stale-lock! (fn [_ _] (p/resolved nil))
                          cli-server/discover-servers (fn [_]
                                                        (p/resolved [current-server
                                                                     outdated-server
                                                                     other-repo-server]))
                          ;; Record every shutdown request by port.
                          daemon/http-request (fn [{:keys [path port]}]
                                                (when (= "/v1/shutdown" path)
                                                  (swap! shutdown-calls conj port)
                                                  (swap! shutdown-ports conj port))
                                                (p/resolved {:status 200 :body ""}))
                          ;; Only the outdated server ever reports :not-found,
                          ;; and only after its shutdown request was seen.
                          daemon/pid-status (fn [pid]
                                              (if (and (= pid (:pid outdated-server))
                                                       (contains? @shutdown-ports
                                                                  (:port outdated-server)))
                                                :not-found
                                                :alive))
                          ;; Single-shot wait: check once, reject if not matched.
                          daemon/wait-for (fn [pred-fn _opts]
                                            (p/let [matched? (pred-fn)]
                                              (when-not matched?
                                                (throw (ex-info "timed out" {:code :timeout})))
                                              true))
                          daemon/wait-for-ready (fn [_] (p/resolved true))]
            (cli-server/ensure-server! ensure-config repo))
          (p/then (fn [result]
                    (is (= "http://127.0.0.1:9430" (:base-url result)))
                    (is (= [9431] @shutdown-calls))))
          (p/catch (fn [e]
                     (is false (str "unexpected error: " e))))
          (p/finally done)))))
|
||||
|
||||
(deftest ensure-server-uses-daemon-stop-for-outdated-server-when-shutdown-hangs
  ;; The stubbed wait-for always rejects, so stopping the outdated server
  ;; must go through daemon/stop-process!. Expect exactly one stop call,
  ;; for the outdated pid, and a config pointing at the current server.
  (async done
    (let [root-dir (node-helper/create-tmp-dir "cli-server-revision-cleanup-stop-fallback")
          repo (str "logseq_db_revision_cleanup_stop_fallback_" (subs (str (random-uuid)) 0 8))
          _lock-file (write-test-lock! root-dir repo :electron)
          make-server (fn [port revision pid]
                        (assoc (revision-test-server {:repo repo
                                                      :port port
                                                      :owner-source :electron
                                                      :revision revision
                                                      :root-dir root-dir})
                               :pid pid))
          current-server (make-server 9433 "expected-revision" 91033)
          outdated-server (make-server 9434 "old-revision" 91034)
          stopped-pids (atom #{})
          stop-calls (atom [])
          ensure-config {:root-dir root-dir
                         :owner-source :electron
                         :expected-revision "expected-revision"}]
      (-> (p/with-redefs [daemon/cleanup-stale-lock! (fn [_ _] (p/resolved nil))
                          cli-server/discover-servers (fn [_]
                                                        (p/resolved [current-server outdated-server]))
                          daemon/http-request (fn [_]
                                                (p/resolved {:status 200 :body ""}))
                          ;; A pid reports :not-found only after stop-process! saw it.
                          daemon/pid-status (fn [pid]
                                              (let [stopped? (contains? @stopped-pids pid)]
                                                (if stopped? :not-found :alive)))
                          daemon/wait-for (fn [_ _]
                                            (p/rejected (ex-info "timed out" {:code :timeout})))
                          daemon/stop-process! (fn [{:keys [pid]}]
                                                 (swap! stop-calls conj pid)
                                                 (swap! stopped-pids conj pid)
                                                 (p/resolved nil))
                          daemon/wait-for-ready (fn [_] (p/resolved true))]
            (cli-server/ensure-server! ensure-config repo))
          (p/then (fn [result]
                    (is (= "http://127.0.0.1:9433" (:base-url result)))
                    (is (= [91034] @stop-calls))))
          (p/catch (fn [e]
                     (is false (str "unexpected error: " e))))
          (p/finally done)))))
|
||||
|
||||
(deftest ensure-server-reuses-prefix-free-discovered-server
|
||||
(async done
|
||||
(let [root-dir (node-helper/create-tmp-dir "cli-server-prefix-free-reuse")
|
||||
|
||||
Reference in New Issue
Block a user