fix: guard graph transfer concurrency

This commit is contained in:
Tienson Qin
2026-05-21 18:26:38 +08:00
parent c48b699560
commit b2dee8ca75
2 changed files with 141 additions and 28 deletions

View File

@@ -117,6 +117,24 @@
true
(true? graph-e2ee?)))
(defn- active-graph-operation []
(let [{:rtc/keys [downloading-graph-uuid uploading?]} @state/state]
(cond
downloading-graph-uuid
{:active-operation :download
:active-graph-uuid downloading-graph-uuid}
(true? uploading?)
{:active-operation :upload})))
(defn- reject-graph-operation-in-progress
[requested-operation active-operation]
(p/rejected
(ex-info "graph operation already in progress"
(assoc active-operation
:type :db-sync/graph-operation-in-progress
:requested-operation requested-operation))))
(defn- <ensure-download-runtime-bound!
[repo]
(if (util/electron?)
@@ -257,30 +275,33 @@
([graph-name graph-uuid]
(<rtc-download-graph! graph-name graph-uuid true))
([graph-name graph-uuid graph-e2ee?]
(state/set-state! :rtc/downloading-graph-uuid graph-uuid)
(state/pub-event!
[:rtc/log {:type :rtc.log/download
:sub-type :download-progress
:graph-uuid graph-uuid
:message "Preparing graph snapshot download"}])
(let [graph-e2ee? (normalize-graph-e2ee? graph-e2ee?)
base (http-base)]
(-> (if (and graph-uuid base)
(p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
graph (str config/db-version-prefix graph-name)
_ (<ensure-download-runtime-bound! graph)
_ (state/<invoke-db-worker :thread-api/db-sync-download-graph-by-id
graph graph-uuid graph-e2ee?)]
true)
(p/rejected (ex-info "db-sync missing graph info"
{:type :db-sync/invalid-graph
:graph-uuid graph-uuid
:base base})))
(p/catch (fn [error]
(throw error)))
(p/finally
(fn []
(state/set-state! :rtc/downloading-graph-uuid nil)))))))
(if-let [operation (active-graph-operation)]
(reject-graph-operation-in-progress :download operation)
(do
(state/set-state! :rtc/downloading-graph-uuid graph-uuid)
(state/pub-event!
[:rtc/log {:type :rtc.log/download
:sub-type :download-progress
:graph-uuid graph-uuid
:message "Preparing graph snapshot download"}])
(let [graph-e2ee? (normalize-graph-e2ee? graph-e2ee?)
base (http-base)]
(-> (if (and graph-uuid base)
(p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
graph (str config/db-version-prefix graph-name)
_ (<ensure-download-runtime-bound! graph)
_ (state/<invoke-db-worker :thread-api/db-sync-download-graph-by-id
graph graph-uuid graph-e2ee?)]
true)
(p/rejected (ex-info "db-sync missing graph info"
{:type :db-sync/invalid-graph
:graph-uuid graph-uuid
:base base})))
(p/catch (fn [error]
(throw error)))
(p/finally
(fn []
(state/set-state! :rtc/downloading-graph-uuid nil)))))))))
(defn <get-remote-graphs
[]
@@ -385,10 +406,18 @@
(defn <rtc-upload-graph!
[repo _graph-e2ee?]
(p/do!
(state/<invoke-db-worker :thread-api/db-sync-upload-graph repo)
(<get-remote-graphs)
(<rtc-start! repo)))
(if-let [operation (active-graph-operation)]
(reject-graph-operation-in-progress :upload operation)
(do
(state/set-state! :rtc/uploading? true)
(-> (p/let [_ (state/<invoke-db-worker :thread-api/db-sync-upload-graph repo)
_ (<get-remote-graphs)
_ (state/set-state! :rtc/uploading? false)
_ (<rtc-start! repo)]
true)
(p/finally
(fn []
(state/set-state! :rtc/uploading? false)))))))
(defn <rtc-create-graph-and-start-sync!
[repo graph-e2ee?]

View File

@@ -136,6 +136,90 @@
(is false (str e))
(done)))))))
(deftest rtc-download-graph-rejects-while-another-download-is-active-test
(async done
(let [state-prev @state/state
worker-calls (atom [])]
(swap! state/state assoc
:rtc/downloading-graph-uuid "graph-1"
:rtc/uploading? false)
(-> (p/with-redefs [db-sync/http-base (fn [] "http://base")
user-handler/task--ensure-id&access-token (fn [resolve _reject]
(resolve true))
state/<invoke-db-worker (fn [& args]
(swap! worker-calls conj args)
(p/resolved :ok))
state/pub-event! (fn [& _] nil)]
(db-sync/<rtc-download-graph! "demo-graph" "graph-2" false))
(p/then (fn [_]
(is false "expected rejection")
(reset! state/state state-prev)
(done)))
(p/catch (fn [error]
(is (= :db-sync/graph-operation-in-progress
(:type (ex-data error))))
(is (empty? @worker-calls))
(reset! state/state state-prev)
(done)))))))
(deftest rtc-download-graph-rejects-while-upload-is-active-test
(async done
(let [state-prev @state/state
worker-calls (atom [])]
(swap! state/state assoc
:rtc/downloading-graph-uuid nil
:rtc/uploading? true)
(-> (p/with-redefs [db-sync/http-base (fn [] "http://base")
user-handler/task--ensure-id&access-token (fn [resolve _reject]
(resolve true))
state/<invoke-db-worker (fn [& args]
(swap! worker-calls conj args)
(p/resolved :ok))
state/pub-event! (fn [& _] nil)]
(db-sync/<rtc-download-graph! "demo-graph" "graph-1" false))
(p/then (fn [_]
(is false "expected rejection")
(reset! state/state state-prev)
(done)))
(p/catch (fn [error]
(is (= :db-sync/graph-operation-in-progress
(:type (ex-data error))))
(is (empty? @worker-calls))
(reset! state/state state-prev)
(done)))))))
(deftest rtc-upload-graph-rejects-while-download-is-active-test
(async done
(let [state-prev @state/state
upload-calls (atom [])
refresh-calls (atom 0)
start-calls (atom [])]
(swap! state/state assoc
:rtc/downloading-graph-uuid "graph-1"
:rtc/uploading? false)
(-> (p/with-redefs [state/<invoke-db-worker (fn [& args]
(swap! upload-calls conj args)
(p/resolved :ok))
db-sync/<get-remote-graphs (fn []
(swap! refresh-calls inc)
(p/resolved []))
db-sync/<rtc-start! (fn [repo & _]
(swap! start-calls conj repo)
(p/resolved :ok))]
(db-sync/<rtc-upload-graph! "logseq_db_demo" false))
(p/then (fn [_]
(is false "expected rejection")
(reset! state/state state-prev)
(done)))
(p/catch (fn [error]
(is (= :db-sync/graph-operation-in-progress
(:type (ex-data error))))
(is (empty? @upload-calls))
(is (zero? @refresh-calls))
(is (empty? @start-calls))
(reset! state/state state-prev)
(done)))))))
(deftest rtc-create-graph-and-start-sync-does-not-upload-snapshot-test
(async done
(let [create-calls (atom [])