mirror of
https://github.com/logseq/logseq.git
synced 2026-04-24 22:25:01 +00:00
enhance(db-sync): remove declare
This commit is contained in:
121
deps/db-sync/src/logseq/db_sync/worker.cljs
vendored
121
deps/db-sync/src/logseq/db_sync/worker.cljs
vendored
@@ -18,7 +18,6 @@
|
||||
|
||||
(glogi-console/install!)
|
||||
|
||||
(declare handle-assets)
|
||||
(defn- bearer-token [auth-header]
|
||||
(when (and (string? auth-header) (string/starts-with? auth-header "Bearer "))
|
||||
(subs auth-header 7)))
|
||||
@@ -144,65 +143,6 @@
|
||||
(when (and (not= ws sender) (ws-open? ws))
|
||||
(send! ws msg)))))
|
||||
|
||||
(defn- handle-worker-fetch [request ^js env]
|
||||
(let [url (js/URL. (.-url request))
|
||||
path (.-pathname url)
|
||||
method (.-method request)]
|
||||
(cond
|
||||
(= path "/health")
|
||||
(json-response :worker/health {:ok true})
|
||||
|
||||
(or (= path "/graphs")
|
||||
(string/starts-with? path "/graphs/"))
|
||||
(.fetch (index-stub env) request)
|
||||
|
||||
(string/starts-with? path "/assets/")
|
||||
(if (= method "OPTIONS")
|
||||
(handle-assets request env)
|
||||
(if-let [{:keys [graph-id]} (parse-asset-path path)]
|
||||
(p/let [access-resp (graph-access-response request env graph-id)]
|
||||
(if (.-ok access-resp)
|
||||
(handle-assets request env)
|
||||
access-resp))
|
||||
(bad-request "invalid asset path")))
|
||||
|
||||
(= method "OPTIONS")
|
||||
(common/options-response)
|
||||
|
||||
(string/starts-with? path "/sync/")
|
||||
(let [prefix (count "/sync/")
|
||||
rest-path (subs path prefix)
|
||||
rest-path (if (string/starts-with? rest-path "/")
|
||||
(subs rest-path 1)
|
||||
rest-path)
|
||||
slash-idx (or (string/index-of rest-path "/") -1)
|
||||
graph-id (if (neg? slash-idx) rest-path (subs rest-path 0 slash-idx))
|
||||
tail (if (neg? slash-idx)
|
||||
"/"
|
||||
(subs rest-path slash-idx))
|
||||
new-url (str (.-origin url) tail (.-search url))]
|
||||
(if (seq graph-id)
|
||||
(if (= method "OPTIONS")
|
||||
(common/options-response)
|
||||
(p/let [access-resp (graph-access-response request env graph-id)]
|
||||
(if (.-ok access-resp)
|
||||
(let [^js namespace (.-LOGSEQ_SYNC_DO env)
|
||||
do-id (.idFromName namespace graph-id)
|
||||
stub (.get namespace do-id)]
|
||||
(if (common/upgrade-request? request)
|
||||
(.fetch stub request)
|
||||
(let [rewritten (js/Request. new-url request)]
|
||||
(.fetch stub rewritten))))
|
||||
access-resp)))
|
||||
(bad-request "missing graph id")))
|
||||
|
||||
:else
|
||||
(not-found))))
|
||||
|
||||
(def worker
|
||||
#js {:fetch (fn [request env _ctx]
|
||||
(handle-worker-fetch request env))})
|
||||
|
||||
(defn- parse-int [value]
|
||||
(when (some? value)
|
||||
(let [n (js/parseInt value 10)]
|
||||
@@ -291,7 +231,66 @@
|
||||
(json-response :assets/delete {:ok true} 200)))
|
||||
|
||||
(error-response "method not allowed" 405))))
|
||||
(error-response "invalid asset path" 400)))))
|
||||
(error-response "invalid asset path" 400)))))
|
||||
|
||||
(defn- handle-worker-fetch [request ^js env]
|
||||
(let [url (js/URL. (.-url request))
|
||||
path (.-pathname url)
|
||||
method (.-method request)]
|
||||
(cond
|
||||
(= path "/health")
|
||||
(json-response :worker/health {:ok true})
|
||||
|
||||
(or (= path "/graphs")
|
||||
(string/starts-with? path "/graphs/"))
|
||||
(.fetch (index-stub env) request)
|
||||
|
||||
(string/starts-with? path "/assets/")
|
||||
(if (= method "OPTIONS")
|
||||
(handle-assets request env)
|
||||
(if-let [{:keys [graph-id]} (parse-asset-path path)]
|
||||
(p/let [access-resp (graph-access-response request env graph-id)]
|
||||
(if (.-ok access-resp)
|
||||
(handle-assets request env)
|
||||
access-resp))
|
||||
(bad-request "invalid asset path")))
|
||||
|
||||
(= method "OPTIONS")
|
||||
(common/options-response)
|
||||
|
||||
(string/starts-with? path "/sync/")
|
||||
(let [prefix (count "/sync/")
|
||||
rest-path (subs path prefix)
|
||||
rest-path (if (string/starts-with? rest-path "/")
|
||||
(subs rest-path 1)
|
||||
rest-path)
|
||||
slash-idx (or (string/index-of rest-path "/") -1)
|
||||
graph-id (if (neg? slash-idx) rest-path (subs rest-path 0 slash-idx))
|
||||
tail (if (neg? slash-idx)
|
||||
"/"
|
||||
(subs rest-path slash-idx))
|
||||
new-url (str (.-origin url) tail (.-search url))]
|
||||
(if (seq graph-id)
|
||||
(if (= method "OPTIONS")
|
||||
(common/options-response)
|
||||
(p/let [access-resp (graph-access-response request env graph-id)]
|
||||
(if (.-ok access-resp)
|
||||
(let [^js namespace (.-LOGSEQ_SYNC_DO env)
|
||||
do-id (.idFromName namespace graph-id)
|
||||
stub (.get namespace do-id)]
|
||||
(if (common/upgrade-request? request)
|
||||
(.fetch stub request)
|
||||
(let [rewritten (js/Request. new-url request)]
|
||||
(.fetch stub rewritten))))
|
||||
access-resp)))
|
||||
(bad-request "missing graph id")))
|
||||
|
||||
:else
|
||||
(not-found))))
|
||||
|
||||
(def worker
|
||||
#js {:fetch (fn [request env _ctx]
|
||||
(handle-worker-fetch request env))})
|
||||
|
||||
(defn- pull-response [^js self since]
|
||||
(let [sql (.-sql self)
|
||||
|
||||
@@ -229,8 +229,6 @@
|
||||
(some-> (:block/uuid ent) str)))))
|
||||
(distinct)))
|
||||
|
||||
(declare enqueue-asset-downloads!)
|
||||
|
||||
(defn- ensure-recycle-page!
|
||||
[conn]
|
||||
(let [db @conn]
|
||||
@@ -260,19 +258,6 @@
|
||||
:outliner-op :fix-missing-parent}
|
||||
(outliner-core/move-blocks! conn parents recycle-id {:sibling? false})))))
|
||||
|
||||
(defn- apply-remote-tx! [repo client tx-data]
|
||||
(if-let [conn (worker-state/get-datascript-conn repo)]
|
||||
(try
|
||||
(let [tx-report (ldb/transact! conn tx-data {:rtc-tx? true})
|
||||
db-after (:db-after tx-report)
|
||||
asset-uuids (asset-uuids-from-tx db-after (:tx-data tx-report))]
|
||||
(move-missing-parents conn tx-report)
|
||||
(when (seq asset-uuids)
|
||||
(enqueue-asset-downloads! repo client asset-uuids)))
|
||||
(catch :default e
|
||||
(log/error :db-sync/apply-remote-tx-failed {:error e})))
|
||||
(fail-fast :db-sync/missing-db {:repo repo :op :apply-remote-tx})))
|
||||
|
||||
(defn- reconcile-cycle! [repo attr server_values]
|
||||
(if-let [conn (worker-state/get-datascript-conn repo)]
|
||||
(let [db @conn
|
||||
@@ -346,13 +331,58 @@
|
||||
:else
|
||||
[tx]))))))
|
||||
|
||||
(declare flush-pending!)
|
||||
(declare remove-pending-txs!)
|
||||
(declare persist-local-tx!)
|
||||
(declare client-ops-conn)
|
||||
(defn- client-ops-conn [repo]
|
||||
(worker-state/get-client-ops-conn repo))
|
||||
|
||||
(defn- persist-local-tx! [repo tx-str]
|
||||
(when-let [conn (client-ops-conn repo)]
|
||||
(let [tx-id (random-uuid)
|
||||
now (.now js/Date)]
|
||||
(ldb/transact! conn [{:db-sync/tx-id tx-id
|
||||
:db-sync/tx tx-str
|
||||
:db-sync/created-at now}])
|
||||
tx-id)))
|
||||
|
||||
(defn- pending-txs
|
||||
[repo limit]
|
||||
(when-let [conn (client-ops-conn repo)]
|
||||
(let [db @conn
|
||||
datoms (d/datoms db :avet :db-sync/created-at)]
|
||||
(->> datoms
|
||||
(map (fn [datom]
|
||||
(d/entity db (:e datom))))
|
||||
(keep (fn [ent]
|
||||
(when-let [tx-id (:db-sync/tx-id ent)]
|
||||
{:tx-id tx-id
|
||||
:tx (:db-sync/tx ent)})))
|
||||
(take limit)
|
||||
(vec)))))
|
||||
|
||||
(defn- remove-pending-txs!
|
||||
[repo tx-ids]
|
||||
(when (seq tx-ids)
|
||||
(when-let [conn (client-ops-conn repo)]
|
||||
(ldb/transact! conn
|
||||
(mapv (fn [tx-id]
|
||||
[:db.fn/retractEntity [:db-sync/tx-id tx-id]])
|
||||
tx-ids)))))
|
||||
|
||||
(defn- flush-pending!
|
||||
[repo client]
|
||||
(let [inflight @(:inflight client)]
|
||||
(when (empty? inflight)
|
||||
(when-let [ws (:ws client)]
|
||||
(when (ws-open? ws)
|
||||
(let [batch (pending-txs repo 50)]
|
||||
(when (seq batch)
|
||||
(let [tx-ids (mapv :tx-id batch)
|
||||
txs (mapv :tx batch)]
|
||||
(when (seq txs)
|
||||
(reset! (:inflight client) tx-ids)
|
||||
(send! ws {:type "tx/batch"
|
||||
:t_before (or (client-op/get-local-tx repo) 0)
|
||||
:txs txs}))))))))))
|
||||
|
||||
(declare enqueue-asset-sync!)
|
||||
(declare enqueue-asset-initial-download!)
|
||||
(defn- pending-txs-by-ids
|
||||
[repo tx-ids]
|
||||
(if-let [conn (client-ops-conn repo)]
|
||||
@@ -407,86 +437,6 @@
|
||||
(take 10)))
|
||||
(fail-fast :db-sync/missing-db {:repo repo :op :cycle-entity-titles})))
|
||||
|
||||
(defn- handle-message! [repo client raw]
|
||||
(let [message (-> raw parse-message coerce-ws-server-message)]
|
||||
(when-not (map? message)
|
||||
(fail-fast :db-sync/response-parse-failed {:repo repo :raw raw}))
|
||||
(let [local-tx (or (client-op/get-local-tx repo) 0)
|
||||
remote-tx (:t message)]
|
||||
(case (:type message)
|
||||
"hello" (do
|
||||
(require-non-negative remote-tx {:repo repo :type "hello"})
|
||||
(when (> remote-tx local-tx)
|
||||
(send! (:ws client) {:type "pull" :since local-tx}))
|
||||
(enqueue-asset-sync! repo client)
|
||||
(enqueue-asset-initial-download! repo client)
|
||||
(flush-pending! repo client))
|
||||
;; Upload response
|
||||
"tx/batch/ok" (do
|
||||
(require-non-negative remote-tx {:repo repo :type "tx/batch/ok"})
|
||||
(client-op/update-local-tx repo remote-tx)
|
||||
(remove-pending-txs! repo @(:inflight client))
|
||||
(reset! (:inflight client) [])
|
||||
(flush-pending! repo client))
|
||||
;; Download response
|
||||
;; Merge batch txs to one tx, does it really work? We'll see
|
||||
"pull/ok" (let [txs (:txs message)
|
||||
_ (require-non-negative remote-tx {:repo repo :type "pull/ok"})
|
||||
_ (require-seq txs {:repo repo :type "pull/ok" :field :txs})
|
||||
tx (mapcat (fn [data]
|
||||
(parse-transit (:tx data) {:repo repo :type "pull/ok"}))
|
||||
txs)]
|
||||
(when tx
|
||||
(apply-remote-tx! repo client tx)
|
||||
(client-op/update-local-tx repo remote-tx)
|
||||
(flush-pending! repo client)))
|
||||
"changed" (do
|
||||
(require-non-negative remote-tx {:repo repo :type "changed"})
|
||||
(when (< local-tx remote-tx)
|
||||
(send! (:ws client) {:type "pull" :since local-tx})))
|
||||
"tx/reject" (let [reason (:reason message)]
|
||||
(when (nil? reason)
|
||||
(fail-fast :db-sync/missing-field
|
||||
{:repo repo :type "tx/reject" :field :reason}))
|
||||
(when (contains? message :t)
|
||||
(require-non-negative remote-tx {:repo repo :type "tx/reject"}))
|
||||
(case reason
|
||||
"stale"
|
||||
(send! (:ws client) {:type "pull" :since local-tx})
|
||||
"cycle"
|
||||
(do
|
||||
(when (nil? (:data message))
|
||||
(fail-fast :db-sync/missing-field
|
||||
{:repo repo :type "tx/reject" :field :data}))
|
||||
(let [{:keys [attr server_values]}
|
||||
(parse-transit (:data message) {:repo repo :type "tx/reject"})]
|
||||
(when (nil? attr)
|
||||
(fail-fast :db-sync/missing-field
|
||||
{:repo repo :type "tx/reject" :field :attr}))
|
||||
(when (nil? server_values)
|
||||
(fail-fast :db-sync/missing-field
|
||||
{:repo repo :type "tx/reject" :field :server_values}))
|
||||
;; FIXME: fix cycle shouldn't re-trigger uploading
|
||||
(let [inflight-ids @(:inflight client)
|
||||
inflight-entries (pending-txs-by-ids repo inflight-ids)]
|
||||
(log/info :db-sync/tx-reject-cycle
|
||||
{:repo repo
|
||||
:attr attr
|
||||
:server-values (count server_values)
|
||||
:entity-titles (cycle-entity-titles repo server_values)
|
||||
:inflight-ids (count inflight-ids)
|
||||
:local-tx local-tx
|
||||
:remote-tx remote-tx})
|
||||
(reconcile-cycle! repo attr server_values)
|
||||
(remove-pending-txs! repo inflight-ids)
|
||||
(reset! (:inflight client) [])
|
||||
(requeue-non-parent-txs! repo attr server_values inflight-entries))
|
||||
(flush-pending! repo client)))
|
||||
(fail-fast :db-sync/invalid-field
|
||||
{:repo repo :type "tx/reject" :reason reason})))
|
||||
(fail-fast :db-sync/invalid-field
|
||||
{:repo repo :type (:type message)})))))
|
||||
|
||||
(defn- ensure-client-state! [repo]
|
||||
(or (get @worker-state/*db-sync-clients repo)
|
||||
(let [client {:repo repo
|
||||
@@ -712,57 +662,98 @@
|
||||
(p/resolved nil)))
|
||||
(p/resolved nil)))))
|
||||
|
||||
(defn- client-ops-conn [repo]
|
||||
(worker-state/get-client-ops-conn repo))
|
||||
(defn- apply-remote-tx! [repo client tx-data]
|
||||
(if-let [conn (worker-state/get-datascript-conn repo)]
|
||||
(try
|
||||
(let [tx-report (ldb/transact! conn tx-data {:rtc-tx? true})
|
||||
db-after (:db-after tx-report)
|
||||
asset-uuids (asset-uuids-from-tx db-after (:tx-data tx-report))]
|
||||
(move-missing-parents conn tx-report)
|
||||
(when (seq asset-uuids)
|
||||
(enqueue-asset-downloads! repo client asset-uuids)))
|
||||
(catch :default e
|
||||
(log/error :db-sync/apply-remote-tx-failed {:error e})))
|
||||
(fail-fast :db-sync/missing-db {:repo repo :op :apply-remote-tx})))
|
||||
|
||||
(defn- persist-local-tx! [repo tx-str]
|
||||
(when-let [conn (client-ops-conn repo)]
|
||||
(let [tx-id (random-uuid)
|
||||
now (.now js/Date)]
|
||||
(ldb/transact! conn [{:db-sync/tx-id tx-id
|
||||
:db-sync/tx tx-str
|
||||
:db-sync/created-at now}])
|
||||
tx-id)))
|
||||
|
||||
(defn- pending-txs
|
||||
[repo limit]
|
||||
(when-let [conn (client-ops-conn repo)]
|
||||
(let [db @conn
|
||||
datoms (d/datoms db :avet :db-sync/created-at)]
|
||||
(->> datoms
|
||||
(map (fn [datom]
|
||||
(d/entity db (:e datom))))
|
||||
(keep (fn [ent]
|
||||
(when-let [tx-id (:db-sync/tx-id ent)]
|
||||
{:tx-id tx-id
|
||||
:tx (:db-sync/tx ent)})))
|
||||
(take limit)
|
||||
(vec)))))
|
||||
|
||||
(defn- remove-pending-txs!
|
||||
[repo tx-ids]
|
||||
(when (seq tx-ids)
|
||||
(when-let [conn (client-ops-conn repo)]
|
||||
(ldb/transact! conn
|
||||
(mapv (fn [tx-id]
|
||||
[:db.fn/retractEntity [:db-sync/tx-id tx-id]])
|
||||
tx-ids)))))
|
||||
|
||||
(defn- flush-pending!
|
||||
[repo client]
|
||||
(let [inflight @(:inflight client)]
|
||||
(when (empty? inflight)
|
||||
(when-let [ws (:ws client)]
|
||||
(when (ws-open? ws)
|
||||
(let [batch (pending-txs repo 50)]
|
||||
(when (seq batch)
|
||||
(let [tx-ids (mapv :tx-id batch)
|
||||
txs (mapv :tx batch)]
|
||||
(when (seq txs)
|
||||
(reset! (:inflight client) tx-ids)
|
||||
(send! ws {:type "tx/batch"
|
||||
:t_before (or (client-op/get-local-tx repo) 0)
|
||||
:txs txs}))))))))))
|
||||
(defn- handle-message! [repo client raw]
|
||||
(let [message (-> raw parse-message coerce-ws-server-message)]
|
||||
(when-not (map? message)
|
||||
(fail-fast :db-sync/response-parse-failed {:repo repo :raw raw}))
|
||||
(let [local-tx (or (client-op/get-local-tx repo) 0)
|
||||
remote-tx (:t message)]
|
||||
(case (:type message)
|
||||
"hello" (do
|
||||
(require-non-negative remote-tx {:repo repo :type "hello"})
|
||||
(when (> remote-tx local-tx)
|
||||
(send! (:ws client) {:type "pull" :since local-tx}))
|
||||
(enqueue-asset-sync! repo client)
|
||||
(enqueue-asset-initial-download! repo client)
|
||||
(flush-pending! repo client))
|
||||
;; Upload response
|
||||
"tx/batch/ok" (do
|
||||
(require-non-negative remote-tx {:repo repo :type "tx/batch/ok"})
|
||||
(client-op/update-local-tx repo remote-tx)
|
||||
(remove-pending-txs! repo @(:inflight client))
|
||||
(reset! (:inflight client) [])
|
||||
(flush-pending! repo client))
|
||||
;; Download response
|
||||
;; Merge batch txs to one tx, does it really work? We'll see
|
||||
"pull/ok" (let [txs (:txs message)
|
||||
_ (require-non-negative remote-tx {:repo repo :type "pull/ok"})
|
||||
_ (require-seq txs {:repo repo :type "pull/ok" :field :txs})
|
||||
tx (mapcat (fn [data]
|
||||
(parse-transit (:tx data) {:repo repo :type "pull/ok"}))
|
||||
txs)]
|
||||
(when tx
|
||||
(apply-remote-tx! repo client tx)
|
||||
(client-op/update-local-tx repo remote-tx)
|
||||
(flush-pending! repo client)))
|
||||
"changed" (do
|
||||
(require-non-negative remote-tx {:repo repo :type "changed"})
|
||||
(when (< local-tx remote-tx)
|
||||
(send! (:ws client) {:type "pull" :since local-tx})))
|
||||
"tx/reject" (let [reason (:reason message)]
|
||||
(when (nil? reason)
|
||||
(fail-fast :db-sync/missing-field
|
||||
{:repo repo :type "tx/reject" :field :reason}))
|
||||
(when (contains? message :t)
|
||||
(require-non-negative remote-tx {:repo repo :type "tx/reject"}))
|
||||
(case reason
|
||||
"stale"
|
||||
(send! (:ws client) {:type "pull" :since local-tx})
|
||||
"cycle"
|
||||
(do
|
||||
(when (nil? (:data message))
|
||||
(fail-fast :db-sync/missing-field
|
||||
{:repo repo :type "tx/reject" :field :data}))
|
||||
(let [{:keys [attr server_values]}
|
||||
(parse-transit (:data message) {:repo repo :type "tx/reject"})]
|
||||
(when (nil? attr)
|
||||
(fail-fast :db-sync/missing-field
|
||||
{:repo repo :type "tx/reject" :field :attr}))
|
||||
(when (nil? server_values)
|
||||
(fail-fast :db-sync/missing-field
|
||||
{:repo repo :type "tx/reject" :field :server_values}))
|
||||
;; FIXME: fix cycle shouldn't re-trigger uploading
|
||||
(let [inflight-ids @(:inflight client)
|
||||
inflight-entries (pending-txs-by-ids repo inflight-ids)]
|
||||
(log/info :db-sync/tx-reject-cycle
|
||||
{:repo repo
|
||||
:attr attr
|
||||
:server-values (count server_values)
|
||||
:entity-titles (cycle-entity-titles repo server_values)
|
||||
:inflight-ids (count inflight-ids)
|
||||
:local-tx local-tx
|
||||
:remote-tx remote-tx})
|
||||
(reconcile-cycle! repo attr server_values)
|
||||
(remove-pending-txs! repo inflight-ids)
|
||||
(reset! (:inflight client) [])
|
||||
(requeue-non-parent-txs! repo attr server_values inflight-entries))
|
||||
(flush-pending! repo client)))
|
||||
(fail-fast :db-sync/invalid-field
|
||||
{:repo repo :type "tx/reject" :reason reason})))
|
||||
(fail-fast :db-sync/invalid-field
|
||||
{:repo repo :type (:type message)})))))
|
||||
|
||||
(declare connect!)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user