fix(sync): clean temp download pools on success and failure

This commit is contained in:
Tienson Qin
2026-04-21 20:43:26 +08:00
parent 5ac264fec6
commit a3faf237e9
2 changed files with 105 additions and 9 deletions

View File

@@ -437,7 +437,7 @@
_ (when (:rows-imported? state)
(<replay-imported-rows! state))
result (complete-datoms-import! repo graph-id remote-tx)
_ (reset! *import-state nil)]
_ (clear-import-state! import-id)]
result)
(p/catch (fn [error]
(when-not (= :db-sync/stale-import (:type (ex-data error)))
@@ -448,7 +448,8 @@
[repo graph-id graph-e2ee?]
(let [base (sync-auth/http-base-url @worker-state/*db-sync-config)]
(if (and (seq repo) (seq graph-id) (seq base))
(let [stage* (atom :init)]
(let [stage* (atom :init)
import-id* (atom nil)]
(-> (p/let [log-f (fn [payload]
(rtc-log-and-state/rtc-log :rtc.log/download payload))
_ (log-f {:sub-type :download-progress
@@ -477,8 +478,7 @@
(throw (ex-info "snapshot download failed"
{:repo repo
:status (.-status resp)})))
(let [import-id* (atom nil)
ensure-import! (fn []
(let [ensure-import! (fn []
(if-let [import-id @import-id*]
(p/resolved import-id)
(p/let [_ (reset! stage* :prepare-import)
@@ -504,6 +504,8 @@
:remote-tx remote-tx
:graph-e2ee? graph-e2ee?})))
(p/catch (fn [error]
(when-let [import-id @import-id*]
(clear-import-state! import-id))
(log/error :db-sync/download-graph-by-id-failed
{:repo repo
:graph-id graph-id
@@ -513,11 +515,11 @@
:error-stack (when (instance? js/Error error)
(.-stack error))})
(throw (ex-info "db-sync download failed"
{:repo repo
:graph-id graph-id
:graph-e2ee? graph-e2ee?
:stage @stage*
:error-message (or (ex-message error)
{:repo repo
:graph-id graph-id
:graph-e2ee? graph-e2ee?
:stage @stage*
:error-message (or (ex-message error)
(when (instance? js/Error error)
(.-message error)))}
error))))))

View File

@@ -1,5 +1,6 @@
(ns frontend.worker.db-worker-test
(:require [cljs.test :refer [async deftest is]]
[clojure.string :as string]
[datascript.core :as d]
[frontend.common.thread-api :as thread-api]
[frontend.worker.a-test-env]
@@ -338,6 +339,99 @@
(is false (str error))
(done)))))))))))
(deftest db-sync-import-finalize-cleans-temp-pool-on-success-test
(async done
(restoring-worker-state
(fn []
(let [import-id "import-success-1"
graph-id "graph-success-1"
remote-tx 42
rows-db-closed* (atom 0)
removed-pools* (atom [])
rows-pool #js {:name "temp-download-pool"}
rows-db #js {:close (fn []
(swap! rows-db-closed* inc))}]
(reset! @#'sync-download/*import-state
{:import-id import-id
:repo test-repo
:graph-id graph-id
:rows-db rows-db
:rows-pool rows-pool
:rows-path "/download-import.sqlite"
:rows-imported? false})
(-> (p/with-redefs [sync-download/complete-datoms-import! (fn [_repo _graph-id _remote-tx]
(p/resolved :ok))
platform/remove-storage-pool! (fn [_platform pool]
(swap! removed-pools* conj pool)
(p/resolved true))]
(sync-download/finalize-import! test-repo graph-id remote-tx import-id))
(p/then (fn [result]
(is (= :ok result))
(is (= 1 @rows-db-closed*))
(is (= [rows-pool] @removed-pools*))
(is (nil? @@#'sync-download/*import-state))
(done)))
(p/catch (fn [error]
(is false (str error))
(done)))))))))
(deftest db-sync-download-graph-by-id-cleans-temp-pool-on-failure-test
(async done
(restoring-worker-state
(fn []
(let [original-fetch js/fetch
import-id "import-failure-1"
graph-id "graph-failure-1"
rows-db-closed* (atom 0)
removed-pools* (atom [])
finalize-calls* (atom 0)
rows-pool #js {:name "temp-download-pool"}
rows-db #js {:close (fn []
(swap! rows-db-closed* inc))}]
(reset! worker-state/*db-sync-config {:http-base "https://sync.example.test"})
(set! js/fetch (fn [_url _opts]
(p/resolved #js {:ok true})))
(-> (p/with-redefs [rtc-log-and-state/rtc-log (fn [& _] nil)
sync-download/fetch-json (fn [_url _opts schema]
(case schema
:sync/pull (p/resolved {:t 77})
:sync/snapshot-download (p/resolved {:url "https://snapshot.example.test"})
(p/rejected (ex-info "unexpected schema" {:schema schema}))))
sync-download/prepare-import! (fn [repo _reset? gid _graph-e2ee? & _]
(reset! @#'sync-download/*import-state
{:import-id import-id
:repo repo
:graph-id gid
:rows-db rows-db
:rows-pool rows-pool
:rows-path "/download-import.sqlite"
:rows-imported? false})
(p/resolved {:import-id import-id}))
sync-download/import-rows-chunk! (fn [_rows _graph-id _import-id]
(p/resolved true))
sync-download/finalize-import! (fn [& _]
(swap! finalize-calls* inc)
(p/resolved nil))
sync-download/<stream-snapshot-row-batches! (fn [_resp _batch-size on-batch]
(p/let [_ (on-batch [[1 "content" nil]])]
(p/rejected (ex-info "stream failed" {:code :stream-failed}))))
platform/remove-storage-pool! (fn [_platform pool]
(swap! removed-pools* conj pool)
(p/resolved true))]
(sync-download/download-graph-by-id! test-repo graph-id false))
(p/then (fn [_]
(is false "expected download failure")
(done)))
(p/catch (fn [error]
(is (string/includes? (or (ex-message error) "") "db-sync download failed"))
(is (= 1 @rows-db-closed*))
(is (= [rows-pool] @removed-pools*))
(is (= 0 @finalize-calls*))
(is (nil? @@#'sync-download/*import-state))
(done)))
(p/finally (fn []
(set! js/fetch original-fetch)))))))))
(deftest db-sync-import-rows-chunk-calls-import-rows-batch-test
(async done
(restoring-worker-state