enhance(rtc): support receive s3-presign-url from ws-conn

This commit is contained in:
rcmerci
2023-11-09 23:02:48 +08:00
parent fa8ffc0caf
commit 471d3978ae
8 changed files with 131 additions and 77 deletions

View File

@@ -0,0 +1,19 @@
(ns frontend.async-util)
(defmacro go-try
[& body]
`(cljs.core.async/go
(try
~@body
(catch :default e#
e#))))
#?(:cljs
(defn throw-err
[v]
(if (instance? ExceptionInfo v) (throw v) v)))
(defmacro <?
[port]
`(throw-err (cljs.core.async/<! ~port)))

View File

@@ -73,6 +73,7 @@
[:t {:optional true} :int]
[:t-before {:optional true} :int]
[:failed-ops {:optional true} [:sequential op-schema]]
[:s3-presign-url {:optional true} :string]
[:affected-blocks {:optional true}
[:map-of :uuid
[:multi {:dispatch :op :decode/string #(update % :op keyword)}
@@ -109,8 +110,11 @@
[:remove-page
[:map {:closed true}
[:op :keyword]
[:block-uuid :uuid]]]]]]])
[:block-uuid :uuid]]]]]]
[:ex-data {:optional true} [:map [:type :keyword]]]
[:ex-message {:optional true} :any]])
(def data-from-ws-decoder (m/decoder data-from-ws-schema mt/string-transformer))
(def data-from-ws-coercer (m/coercer data-from-ws-schema mt/string-transformer))
(def data-from-ws-validator (m/validator data-from-ws-schema))
@@ -163,6 +167,5 @@
[:action :string]
[:graph-uuid :string]
[:block-uuids [:sequential :uuid]]]]]))
(def data-to-ws-decoder (m/decoder data-to-ws-schema mt/string-transformer))
(def data-to-ws-encoder (m/encoder data-to-ws-schema mt/string-transformer))
(def data-to-ws-validator (m/validator data-to-ws-schema))
(def data-to-ws-coercer (m/coercer data-to-ws-schema mt/string-transformer))

View File

@@ -17,6 +17,7 @@
[frontend.modules.outliner.transaction :as outliner-tx]
[frontend.state :as state]
[frontend.util :as util]
[frontend.async-util :include-macros true :refer [<? go-try]]
[malli.core :as m]
[malli.util :as mu]))
@@ -560,36 +561,43 @@
{:pre [(some? @(:*graph-uuid state))
(some? @(:*repo state))]}
(go
(let [repo @(:*repo state)
_ (op-mem-layer/new-branch! repo)
ops-for-remote (sort-remote-ops (gen-block-uuid->remote-ops repo))
local-tx (op-mem-layer/get-local-tx repo)
r (with-sub-data-from-ws state
(<! (ws/<send! state {:req-id (get-req-id)
:action "apply-ops" :graph-uuid @(:*graph-uuid state)
:ops ops-for-remote :t-before (or local-tx 1)}))
(<! (get-result-ch)))]
(if-let [remote-ex (:ex-data r)]
(case (:type remote-ex)
;; conflict-update remote-graph, keep these local-pending-ops
;; and try to send ops later
"graph-lock-failed"
(do (prn :graph-lock-failed)
(op-mem-layer/rollback! repo)
nil)
;; this case means something wrong in remote-graph data,
;; nothing to do at client-side
"graph-lock-missing"
(do (prn :graph-lock-missing)
(op-mem-layer/rollback! repo)
nil)
;; else
(do (op-mem-layer/rollback! repo)
(throw (ex-info "Unavailable" {:remote-ex remote-ex}))))
(do (assert (pos? (:t r)) r)
(op-mem-layer/commit! repo)
(<! (<apply-remote-data repo (rtc-const/data-from-ws-decoder r)))
(prn :<client-op-update-handler :t (:t r)))))))
(let [repo @(:*repo state)]
(op-mem-layer/new-branch! repo)
(try
(let [ops-for-remote (sort-remote-ops (gen-block-uuid->remote-ops repo))
local-tx (op-mem-layer/get-local-tx repo)
r (<? (ws/<send&receive state {:action "apply-ops" :graph-uuid @(:*graph-uuid state)
:ops ops-for-remote :t-before (or local-tx 1)}))]
(if-let [remote-ex (:ex-data r)]
(case (:type remote-ex)
;; conflict-update remote-graph, keep these local-pending-ops
;; and try to send ops later
:graph-lock-failed
(do (prn :graph-lock-failed)
(op-mem-layer/rollback! repo)
nil)
;; this case means something wrong in remote-graph data,
;; nothing to do at client-side
:graph-lock-missing
(do (prn :graph-lock-missing)
(op-mem-layer/rollback! repo)
nil)
:get-s3-object-failed
(do (prn ::get-s3-object-failed r)
(op-mem-layer/rollback! repo)
nil)
;; else
(do (op-mem-layer/rollback! repo)
(throw (ex-info "Unavailable" {:remote-ex remote-ex}))))
(do (assert (pos? (:t r)) r)
(op-mem-layer/commit! repo)
(<! (<apply-remote-data repo r))
(prn :<client-op-update-handler :t (:t r)))))
(catch :default e
(prn ::unknown-ex e)
(op-mem-layer/rollback! repo)
nil)))))
(defn- make-push-client-ops-timeout-ch
[repo never-timeout?]
@@ -608,7 +616,7 @@
(reset! (:*repo state) repo)
(reset! (:*rtc-state state) :open)
(let [{:keys [data-from-ws-pub _client-op-update-chan]} state
push-data-from-ws-ch (chan (async/sliding-buffer 100) (map rtc-const/data-from-ws-decoder))
push-data-from-ws-ch (chan (async/sliding-buffer 100) (map rtc-const/data-from-ws-coercer))
stop-rtc-loop-chan (chan)
*auto-push-client-ops? (:*auto-push-client-ops? state)
force-push-client-ops-ch (:force-push-client-ops-chan state)

View File

@@ -279,8 +279,7 @@
use `commit` to remove old-branch."
[repo]
(let [{:keys [current-branch old-branch]} (get @*ops-store repo)]
(assert (and (some? current-branch)
(nil? old-branch)))
(assert (some? current-branch) repo)
(swap! *ops-store assoc-in [repo :old-branch] current-branch)))
(defn rollback!

View File

@@ -2,11 +2,13 @@
"Websocket related util-fns"
(:require-macros
[frontend.db.rtc.macro :refer [with-sub-data-from-ws get-req-id get-result-ch]])
(:require [frontend.config :as config]
[frontend.util :as util]
[frontend.db.rtc.const :as rtc-const]
(:require [cljs-http.client :as http]
[cljs.core.async :as async :refer [<! chan go offer!]]
[frontend.state :as state]))
[frontend.config :as config]
[frontend.db.rtc.const :as rtc-const]
[frontend.state :as state]
[frontend.util :as util]
[frontend.async-util :include-macros true :refer [go-try <?]]))
(def WebSocketOPEN (if (= *target* "nodejs")
1
@@ -28,8 +30,7 @@
(defn send!
[ws message]
(assert (= WebSocketOPEN (.-readyState ws)))
(let [decoded-message (rtc-const/data-to-ws-decoder message)]
(assert (rtc-const/data-to-ws-validator decoded-message) message)
(let [decoded-message (rtc-const/data-to-ws-coercer message)]
(.send ws (js/JSON.stringify (clj->js (rtc-const/data-to-ws-encoder decoded-message))))))
(declare <send!)
@@ -37,26 +38,47 @@
"ensure websocket in state is OPEN, if not, make a connection, and
call init 'register-graph-updates' message"
[state]
(go
(let [ws @(:*ws state)]
(when (or (nil? ws)
(> (.-readyState ws) WebSocketOPEN))
(let [ws-opened-ch (chan)
token (state/get-auth-id-token)
ws* (ws-listen token (:data-from-ws-chan state) ws-opened-ch)]
(<! ws-opened-ch)
(reset! (:*ws state) ws*)
(when-let [graph-uuid @(:*graph-uuid state)]
(with-sub-data-from-ws state
(<! (<send! state {:action "register-graph-updates" :req-id (get-req-id) :graph-uuid graph-uuid}))
(<! (get-result-ch)))))))))
(go-try
(let [ws @(:*ws state)]
(when (or (nil? ws)
(> (.-readyState ws) WebSocketOPEN))
(let [ws-opened-ch (chan)
token (state/get-auth-id-token)
ws* (ws-listen token (:data-from-ws-chan state) ws-opened-ch)]
(<! ws-opened-ch)
(reset! (:*ws state) ws*)
(when-let [graph-uuid @(:*graph-uuid state)]
(with-sub-data-from-ws state
(<? (<send! state {:action "register-graph-updates" :req-id (get-req-id) :graph-uuid graph-uuid}))
(<! (get-result-ch)))))))))
(defn <send!
"ensure ws state=open, then send messages"
[state message]
(go
(<! (<ensure-ws-open! state))
(send! @(:*ws state) message)))
(go-try
(<? (<ensure-ws-open! state))
(send! @(:*ws state) message)))
(defn <send&receive
"Send 'message' to ws, and return response of this request.
When this response is too huge, backend will put it in s3 and return the presigned-url,
this fn will handle this case."
[state message]
(go-try
(with-sub-data-from-ws state
(<? (<send! state (assoc message :req-id (get-req-id))))
(let [resp (<! (get-result-ch))
resp*
(if-let [s3-presign-url (:s3-presign-url resp)]
(let [{:keys [status body]} (<! (http/get s3-presign-url {:with-credentials? false}))]
(if (http/unexceptional-status? status)
(js->clj (js/JSON.parse body) :keywordize-keys true)
{:req-id (get-req-id)
:ex-message "get s3 object failed"
:ex-data {:type :get-s3-object-failed :status status :body body}}))
resp)]
(rtc-const/data-from-ws-coercer resp*)))))
(defn stop

View File

@@ -1,19 +1,19 @@
(ns frontend.modules.outliner.pipeline
(:require [frontend.config :as config]
[frontend.db :as db]
[frontend.db.react :as react]
[frontend.modules.outliner.file :as file]
[logseq.outliner.datascript-report :as ds-report]
[logseq.outliner.pipeline :as outliner-pipeline]
[frontend.state :as state]
[frontend.util :as util]
[promesa.core :as p]
[frontend.persist-db :as persist-db]
(:require [clojure.core.async :as async :refer [<! go]]
[clojure.string :as string]
[datascript.core :as d]
[frontend.config :as config]
[frontend.db :as db]
[frontend.db.react :as react]
[frontend.handler.file-based.property.util :as property-util]
[frontend.modules.outliner.file :as file]
[frontend.persist-db :as persist-db]
[frontend.state :as state]
[frontend.util :as util]
[frontend.util.cursor :as cursor]
[frontend.util.drawer :as drawer]
[frontend.util.cursor :as cursor]))
[logseq.outliner.datascript-report :as ds-report]
[logseq.outliner.pipeline :as outliner-pipeline]))
(defn updated-page-hook
[tx-report page]
@@ -120,11 +120,12 @@
:update-tx-ids? true}))
(when (config/db-based-graph? repo)
(when-not config/publishing?
(p/let [_transact-result (persist-db/<transact-data repo upsert-blocks deleted-block-uuids)
(go
(let [_transact-result (<! (persist-db/<transact-data repo upsert-blocks deleted-block-uuids))
_ipc-result (comment ipc/ipc :db-transact-data repo
(pr-str
{:blocks upsert-blocks
:deleted-block-uuids deleted-block-uuids}))])))))
:deleted-block-uuids deleted-block-uuids}))]))))))
(when (and (not (:delete-files? tx-meta))
(not replace?))

View File

@@ -1,6 +1,7 @@
(ns frontend.persist-db.node
"Electron ipc based persistent db"
(:require [electron.ipc :as ipc]
(:require [cljs.core.async.interop :refer [p->c]]
[electron.ipc :as ipc]
[frontend.persist-db.protocol :as protocol]
[promesa.core :as p]))
@@ -17,10 +18,11 @@
(p/resolved nil))
(<transact-data [_this repo added-blocks deleted-block-uuids]
;; (prn ::transact-data repo added-blocks deleted-block-uuids)
(ipc/ipc :db-transact-data repo
(pr-str
{:blocks added-blocks
:deleted-block-uuids deleted-block-uuids})))
(p->c
(ipc/ipc :db-transact-data repo
(pr-str
{:blocks added-blocks
:deleted-block-uuids deleted-block-uuids}))))
(<fetch-initital-data [_this repo _opts]
(prn ::fetch-initial repo)
(ipc/ipc :get-initial-data repo))

View File

@@ -13,7 +13,7 @@
(let [msg (-> s
js/JSON.parse
(js->clj :keywordize-keys true)
rtc-const/data-to-ws-decoder)]
rtc-const/data-to-ws-coercer)]
(handler-fn msg push-data-to-client-chan)))
(set-handler-fn [_ f]