mirror of
https://github.com/logseq/logseq.git
synced 2026-04-24 22:25:01 +00:00
enhance: make use-effect's handler cancelable
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
{:extra-indents {missionary.core/sp [[:block 0]]
|
||||
missionary.core/ap [[:block 0]]}
|
||||
missionary.core/ap [[:block 0]]
|
||||
frontend.common.missionary/run-task [[:inner 0]]}
|
||||
:sort-ns-references? true}
|
||||
|
||||
@@ -87,12 +87,18 @@
|
||||
|
||||
(defn run-task
|
||||
"Return the canceler"
|
||||
[task key & {:keys [succ fail]}]
|
||||
(task (or succ #(log/info :key key :succ %)) (or fail #(log/info :key key :stopped %))))
|
||||
[key' task & {:keys [succ fail]}]
|
||||
(task (or succ #(log/info :key key' :succ %)) (or fail #(log/info :key key' :stopped %))))
|
||||
|
||||
(defn run-task-throw
|
||||
[task key & {:keys [succ]}]
|
||||
(task (or succ #(log/info :key key :succ %)) #(throw (ex-info "task stopped" {:key key :e %}))))
|
||||
"Return the canceler"
|
||||
[key' task & {:keys [succ]}]
|
||||
(task (or succ #(log/info :key key' :succ %)) #(throw (ex-info "task stopped" {:key key' :e %}))))
|
||||
|
||||
(defn run-task*
|
||||
"Return the canceler"
|
||||
[task]
|
||||
(task (constantly nil) (constantly nil)))
|
||||
|
||||
(defonce ^:private *background-task-cancelers ; key -> canceler
|
||||
(volatile! {}))
|
||||
@@ -105,7 +111,7 @@
|
||||
(canceler)
|
||||
(vswap! *background-task-cancelers assoc key' nil))
|
||||
(prn :run-background-task key')
|
||||
(let [canceler (run-task task key')]
|
||||
(let [canceler (run-task key' task)]
|
||||
(vswap! *background-task-cancelers assoc key' canceler)
|
||||
nil))
|
||||
|
||||
|
||||
@@ -54,9 +54,9 @@
|
||||
{:will-mount (fn [state]
|
||||
(reset!
|
||||
(::online-users-canceler state)
|
||||
(c.m/run-task
|
||||
(m/reduce (fn [_ v] (reset! (::online-users state) v)) rtc-flows/rtc-online-users-flow)
|
||||
:fetch-online-users :succ (constantly nil)))
|
||||
(c.m/run-task :fetch-online-users
|
||||
(m/reduce (fn [_ v] (reset! (::online-users state) v)) rtc-flows/rtc-online-users-flow)
|
||||
:succ (constantly nil)))
|
||||
state)
|
||||
:will-unmount (fn [state]
|
||||
(when @(::online-users-canceler state) (@(::online-users-canceler state)))
|
||||
|
||||
@@ -2,12 +2,12 @@
|
||||
"RTC state indicator"
|
||||
(:require [cljs-time.core :as t]
|
||||
[clojure.pprint :as pprint]
|
||||
[frontend.common.missionary :as c.m]
|
||||
[frontend.db :as db]
|
||||
[frontend.handler.db-based.rtc-flows :as rtc-flows]
|
||||
[frontend.state :as state]
|
||||
[frontend.ui :as ui]
|
||||
[frontend.util :as util]
|
||||
[frontend.common.missionary :as c.m]
|
||||
[logseq.shui.ui :as shui]
|
||||
[missionary.core :as m]
|
||||
[rum.core :as rum]))
|
||||
@@ -38,21 +38,20 @@
|
||||
(when log
|
||||
(swap! *detail-info update k (fn [logs] (take 5 (conj logs log))))))
|
||||
flow))]
|
||||
(let [canceler (c.m/run-task
|
||||
(m/join
|
||||
(constantly nil)
|
||||
(update-log-task rtc-flows/rtc-download-log-flow :download-logs)
|
||||
(update-log-task rtc-flows/rtc-upload-log-flow :upload-logs)
|
||||
(update-log-task rtc-flows/rtc-misc-log-flow :misc-logs)
|
||||
(m/reduce (fn [_ state]
|
||||
(swap! *detail-info assoc
|
||||
:pending-local-ops (:unpushed-block-update-count state)
|
||||
:graph-uuid (:graph-uuid state)
|
||||
:local-tx (:local-tx state)
|
||||
:remote-tx (:remote-tx state)
|
||||
:rtc-state (if (:rtc-lock state) :open :close)))
|
||||
rtc-flows/rtc-state-stream-flow))
|
||||
::update-detail-info)]
|
||||
(let [canceler (c.m/run-task ::update-detail-info
|
||||
(m/join
|
||||
(constantly nil)
|
||||
(update-log-task rtc-flows/rtc-download-log-flow :download-logs)
|
||||
(update-log-task rtc-flows/rtc-upload-log-flow :upload-logs)
|
||||
(update-log-task rtc-flows/rtc-misc-log-flow :misc-logs)
|
||||
(m/reduce (fn [_ state]
|
||||
(swap! *detail-info assoc
|
||||
:pending-local-ops (:unpushed-block-update-count state)
|
||||
:graph-uuid (:graph-uuid state)
|
||||
:local-tx (:local-tx state)
|
||||
:remote-tx (:remote-tx state)
|
||||
:rtc-state (if (:rtc-lock state) :open :close)))
|
||||
rtc-flows/rtc-state-stream-flow)))]
|
||||
(reset! *update-detail-info-canceler canceler))))
|
||||
(run-task--update-detail-info)
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
[clojure.string :as string]
|
||||
[datascript.impl.entity :as de]
|
||||
[dommy.core :as dom]
|
||||
[frontend.common.missionary :as c.m]
|
||||
[frontend.components.dnd :as dnd]
|
||||
[frontend.components.icon :as icon-component]
|
||||
[frontend.components.property.config :as property-config]
|
||||
@@ -40,6 +41,7 @@
|
||||
[logseq.db.frontend.view :as db-view]
|
||||
[logseq.shui.ui :as shui]
|
||||
[medley.core :as medley]
|
||||
[missionary.core :as m]
|
||||
[promesa.core :as p]
|
||||
[rum.core :as rum]))
|
||||
|
||||
@@ -1162,10 +1164,12 @@
|
||||
:skip-refresh? true})]
|
||||
(hooks/use-effect!
|
||||
(fn []
|
||||
(when (and db-id (not item))
|
||||
(p/let [block (db-async/<get-block (state/get-current-repo) db-id opts)
|
||||
block' (if list-view? (db/entity db-id) block)]
|
||||
(set-item! block'))))
|
||||
(c.m/run-task*
|
||||
(m/sp
|
||||
(when (and db-id (not item))
|
||||
(let [block (c.m/<? (db-async/<get-block (state/get-current-repo) db-id opts))
|
||||
block' (if list-view? (db/entity db-id) block)]
|
||||
(set-item! block'))))))
|
||||
[db-id])
|
||||
(let [item' (cond (map? item) item (number? item) {:db/id item})]
|
||||
(item-render item'))))
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
(:require [cljs-time.coerce :as tc]
|
||||
[cljs-time.core :as t]
|
||||
[cljs-time.format :as tf]
|
||||
[cljs.cache :as cache]
|
||||
[datascript.core :as d]
|
||||
[frontend.config :as config]
|
||||
[frontend.date :as date]
|
||||
|
||||
@@ -28,16 +28,15 @@
|
||||
(rum/local nil ::keys-state)
|
||||
{:will-mount (fn [state]
|
||||
(let [canceler
|
||||
(c.m/run-task
|
||||
(m/reduce
|
||||
(fn [logs log]
|
||||
(let [logs* (if log
|
||||
(take 10 (conj logs log))
|
||||
logs)]
|
||||
(reset! (get state ::logs) logs*)
|
||||
logs*))
|
||||
nil rtc-flows/rtc-log-flow)
|
||||
::sub-logs)]
|
||||
(c.m/run-task ::sub-logs
|
||||
(m/reduce
|
||||
(fn [logs log]
|
||||
(let [logs* (if log
|
||||
(take 10 (conj logs log))
|
||||
logs)]
|
||||
(reset! (get state ::logs) logs*)
|
||||
logs*))
|
||||
nil rtc-flows/rtc-log-flow))]
|
||||
(reset! (get state ::sub-log-canceler) canceler)
|
||||
state))
|
||||
:will-unmount (fn [state]
|
||||
@@ -83,9 +82,8 @@
|
||||
(shui/tabler-icon "download") "graph-list")
|
||||
(shui/button
|
||||
{:size :sm
|
||||
:on-click #(c.m/run-task
|
||||
(user/new-task--upload-user-avatar "TEST_AVATAR")
|
||||
:upload-test-avatar)}
|
||||
:on-click #(c.m/run-task :upload-test-avatar
|
||||
(user/new-task--upload-user-avatar "TEST_AVATAR"))}
|
||||
(shui/tabler-icon "upload") "upload-test-avatar")]
|
||||
|
||||
[:div.pb-4
|
||||
|
||||
@@ -310,7 +310,8 @@
|
||||
(when-let [canceler @*last-update-due-cards-count-canceler]
|
||||
(canceler)
|
||||
(reset! *last-update-due-cards-count-canceler nil))
|
||||
(let [canceler (c.m/run-task new-task--update-due-cards-count :update-due-cards-count)]
|
||||
(let [canceler (c.m/run-task :update-due-cards-count
|
||||
new-task--update-due-cards-count)]
|
||||
(reset! *last-update-due-cards-count-canceler canceler)
|
||||
nil))
|
||||
|
||||
|
||||
@@ -285,14 +285,13 @@
|
||||
:body asset-file
|
||||
:with-credentials? false
|
||||
:*progress-flow *progress-flow})]
|
||||
(c.m/run-task
|
||||
(m/reduce (fn [_ v]
|
||||
(state/update-state!
|
||||
:rtc/asset-upload-download-progress
|
||||
(fn [m] (assoc-in m [repo asset-block-uuid-str] v))))
|
||||
@*progress-flow)
|
||||
:upload-asset-progress
|
||||
:succ (constantly nil))
|
||||
(c.m/run-task :upload-asset-progress
|
||||
(m/reduce (fn [_ v]
|
||||
(state/update-state!
|
||||
:rtc/asset-upload-download-progress
|
||||
(fn [m] (assoc-in m [repo asset-block-uuid-str] v))))
|
||||
@*progress-flow)
|
||||
:succ (constantly nil))
|
||||
(let [{:keys [status] :as r} (m/? http-task)]
|
||||
(when-not (http/unexceptional-status? status)
|
||||
{:ex-data {:type :rtc.exception/upload-asset-failed :data (dissoc r :body)}})))))
|
||||
@@ -305,14 +304,13 @@
|
||||
:response-type :array-buffer
|
||||
:*progress-flow *progress-flow})
|
||||
progress-canceler
|
||||
(c.m/run-task
|
||||
(m/reduce (fn [_ v]
|
||||
(state/update-state!
|
||||
:rtc/asset-upload-download-progress
|
||||
(fn [m] (assoc-in m [repo asset-block-uuid-str] v))))
|
||||
@*progress-flow)
|
||||
:download-asset-progress
|
||||
:succ (constantly nil))]
|
||||
(c.m/run-task :download-asset-progress
|
||||
(m/reduce (fn [_ v]
|
||||
(state/update-state!
|
||||
:rtc/asset-upload-download-progress
|
||||
(fn [m] (assoc-in m [repo asset-block-uuid-str] v))))
|
||||
@*progress-flow)
|
||||
:succ (constantly nil))]
|
||||
(try
|
||||
(let [{:keys [status body] :as r} (m/? http-task)]
|
||||
(if-not (http/unexceptional-status? status)
|
||||
|
||||
@@ -48,20 +48,19 @@
|
||||
(swap! *sent assoc ws false))
|
||||
(when (not (@*sent ws))
|
||||
(let [recv-flow (ws/recv-flow (m/? get-ws-create-task))]
|
||||
(c.m/run-task
|
||||
(m/sp
|
||||
(when-let [online-users (:online-users
|
||||
(m/?
|
||||
(m/timeout
|
||||
(m/reduce
|
||||
(fn [_ v]
|
||||
(when (= "online-users-updated" (:req-id v))
|
||||
(reduced v)))
|
||||
recv-flow)
|
||||
10000)))]
|
||||
(reset! *online-users online-users)))
|
||||
:update-online-user-when-register-graph-updates
|
||||
:succ (constantly nil)))
|
||||
(c.m/run-task :update-online-user-when-register-graph-updates
|
||||
(m/sp
|
||||
(when-let [online-users (:online-users
|
||||
(m/?
|
||||
(m/timeout
|
||||
(m/reduce
|
||||
(fn [_ v]
|
||||
(when (= "online-users-updated" (:req-id v))
|
||||
(reduced v)))
|
||||
recv-flow)
|
||||
10000)))]
|
||||
(reset! *online-users online-users)))
|
||||
:succ (constantly nil)))
|
||||
(let [{:keys [max-remote-schema-version]}
|
||||
(m/?
|
||||
(c.m/backoff
|
||||
|
||||
@@ -238,7 +238,8 @@
|
||||
(update-remote-schema-version! conn @*server-schema-version)
|
||||
(add-migration-client-ops! repo @conn @*server-schema-version)
|
||||
(reset! *assets-sync-loop-canceler
|
||||
(c.m/run-task assets-sync-loop-task :assets-sync-loop-task))
|
||||
(c.m/run-task :assets-sync-loop-task
|
||||
assets-sync-loop-task))
|
||||
(->>
|
||||
(let [event (m/?> mixed-flow)]
|
||||
(case (:type event)
|
||||
@@ -348,10 +349,11 @@
|
||||
(let [{:keys [rtc-state-flow *rtc-auto-push? *rtc-remote-profile? rtc-loop-task *online-users onstarted-task]}
|
||||
(create-rtc-loop graph-uuid schema-version repo conn date-formatter token)
|
||||
*last-stop-exception (atom nil)
|
||||
canceler (c.m/run-task rtc-loop-task :rtc-loop-task
|
||||
:fail (fn [e]
|
||||
(reset! *last-stop-exception e)
|
||||
(log/info :rtc-loop-task e)))
|
||||
canceler (c.m/run-task :rtc-loop-task
|
||||
rtc-loop-task
|
||||
:fail (fn [e]
|
||||
(reset! *last-stop-exception e)
|
||||
(log/info :rtc-loop-task e)))
|
||||
start-ex (m/? onstarted-task)]
|
||||
(if-let [start-ex (:ex-data start-ex)]
|
||||
(do (log/info :start-ex start-ex) (r.ex/->map start-ex))
|
||||
|
||||
@@ -41,10 +41,10 @@
|
||||
(t/async
|
||||
done
|
||||
(c.m/run-task-throw
|
||||
:clear-test-remote-graphs
|
||||
(m/sp
|
||||
(m/? helper/new-task--clear-all-test-remote-graphs)
|
||||
(done))
|
||||
:clear-test-remote-graphs)))})
|
||||
(done)))))})
|
||||
|
||||
(def upload-example-graph-fixture
|
||||
{:before
|
||||
@@ -52,6 +52,7 @@
|
||||
(t/async
|
||||
done
|
||||
(c.m/run-task-throw
|
||||
:upload-example-graph-fixture
|
||||
(m/sp
|
||||
(swap! worker-state/*datascript-conns dissoc const/downloaded-test-repo)
|
||||
(swap! worker-state/*client-ops-conns assoc
|
||||
@@ -60,14 +61,14 @@
|
||||
(assert (some? graph-uuid))
|
||||
(m/? (helper/new-task--wait-creating-graph graph-uuid))
|
||||
(println :uploaded-graph graph-uuid))
|
||||
(done))
|
||||
:upload-example-graph-fixture)))})
|
||||
(done)))))})
|
||||
|
||||
(def build-conn-by-download-example-graph-fixture
|
||||
{:before
|
||||
#(t/async
|
||||
done
|
||||
(c.m/run-task-throw
|
||||
:build-conn-by-download-example-graph-fixture
|
||||
(m/sp
|
||||
(swap! worker-state/*datascript-conns dissoc const/downloaded-test-repo)
|
||||
(swap! worker-state/*client-ops-conns assoc
|
||||
@@ -75,8 +76,7 @@
|
||||
(let [graph-uuid (m/? helper/new-task--get-remote-example-graph-uuid)]
|
||||
(assert (some? graph-uuid))
|
||||
(m/? (helper/new-task--download-graph graph-uuid const/downloaded-test-graph-name)))
|
||||
(done))
|
||||
:build-conn-by-download-example-graph-fixture))
|
||||
(done))))
|
||||
:after
|
||||
#(do (swap! worker-state/*datascript-conns dissoc const/downloaded-test-repo)
|
||||
(swap! worker-state/*client-ops-conns dissoc const/downloaded-test-repo))})
|
||||
|
||||
Reference in New Issue
Block a user