diff --git a/deps/worker-sync/src/logseq/worker_sync/common.cljs b/deps/worker-sync/src/logseq/worker_sync/common.cljs index 4b3e382d7a..c4e96ce98c 100644 --- a/deps/worker-sync/src/logseq/worker_sync/common.cljs +++ b/deps/worker-sync/src/logseq/worker_sync/common.cljs @@ -7,8 +7,8 @@ (defn- cors-headers [] #js {"access-control-allow-origin" "*" - "access-control-allow-headers" "content-type" - "access-control-allow-methods" "GET,POST,DELETE,OPTIONS"}) + "access-control-allow-headers" "content-type,x-amz-meta-checksum,x-amz-meta-type" + "access-control-allow-methods" "GET,POST,PUT,DELETE,OPTIONS"}) (defn json-response ([data] (json-response data 200)) diff --git a/deps/worker-sync/src/logseq/worker_sync/worker.cljs b/deps/worker-sync/src/logseq/worker_sync/worker.cljs index 63f6429a02..1db2a74b94 100644 --- a/deps/worker-sync/src/logseq/worker_sync/worker.cljs +++ b/deps/worker-sync/src/logseq/worker_sync/worker.cljs @@ -306,56 +306,68 @@ path)) (defn- handle-http [^js self request] - (let [url (js/URL. (.-url request)) - raw-path (.-pathname url) - path (strip-sync-prefix raw-path) - method (.-method request)] - (cond - (= method "OPTIONS") - (common/options-response) + (letfn [(with-cors-error [resp] + (if (instance? js/Promise resp) + (.catch resp + (fn [e] + (log/error :worker-sync/http-error {:error e}) + (common/json-response {:error "server error"} 500))) + resp))] + (try + (let [url (js/URL. (.-url request)) + raw-path (.-pathname url) + path (strip-sync-prefix raw-path) + method (.-method request)] + (with-cors-error + (cond + (= method "OPTIONS") + (common/options-response) - (and (= method "GET") (= path "/health")) - (common/json-response {:ok true}) + (and (= method "GET") (= path "/health")) + (common/json-response {:ok true}) - (and (= method "GET") (= path "/pull")) - (let [since (or (parse-int (.get (.-searchParams url) "since")) 0)] - (common/json-response (pull-response self since))) + (and (= method "GET") (= path "/pull")) + (let [since (or (parse-int (.get (.-searchParams url) "since")) 0)] + (common/json-response (pull-response self since))) - (and (= method "GET") (= path "/snapshot")) - (common/json-response (snapshot-response self)) + (and (= method "GET") (= path "/snapshot")) + (common/json-response (snapshot-response self)) - (and (= method "DELETE") (= path "/admin/reset")) - (do - (common/sql-exec (.-sql self) "drop table if exists kvs") - (common/sql-exec (.-sql self) "drop table if exists tx_log") - (common/sql-exec (.-sql self) "drop table if exists sync_meta") - (storage/init-schema! (.-sql self)) - (common/json-response {:ok true})) + (and (= method "DELETE") (= path "/admin/reset")) + (do + (common/sql-exec (.-sql self) "drop table if exists kvs") + (common/sql-exec (.-sql self) "drop table if exists tx_log") + (common/sql-exec (.-sql self) "drop table if exists sync_meta") + (storage/init-schema! (.-sql self)) + (common/json-response {:ok true})) - (and (= method "POST") (= path "/tx")) - (.then (common/read-json request) - (fn [result] - (if (nil? result) - (common/bad-request "missing body") - (let [tx-data (protocol/transit->tx (aget result "tx")) - t-before (parse-int (aget result "t_before"))] - (if (sequential? tx-data) - (common/json-response (handle-tx! self nil tx-data t-before)) - (common/bad-request "invalid tx")))))) + (and (= method "POST") (= path "/tx")) + (.then (common/read-json request) + (fn [result] + (if (nil? result) + (common/bad-request "missing body") + (let [tx-data (protocol/transit->tx (aget result "tx")) + t-before (parse-int (aget result "t_before"))] + (if (sequential? tx-data) + (common/json-response (handle-tx! self nil tx-data t-before)) + (common/bad-request "invalid tx")))))) - (and (= method "POST") (= path "/tx/batch")) - (.then (common/read-json request) - (fn [result] - (if (nil? result) - (common/bad-request "missing body") - (let [txs (js->clj (aget result "txs")) - t-before (parse-int (aget result "t_before"))] - (if (and (sequential? txs) (every? string? txs)) - (common/json-response (handle-tx-batch! self nil txs t-before)) - (common/bad-request "invalid tx")))))) + (and (= method "POST") (= path "/tx/batch")) + (.then (common/read-json request) + (fn [result] + (if (nil? result) + (common/bad-request "missing body") + (let [txs (js->clj (aget result "txs")) + t-before (parse-int (aget result "t_before"))] + (if (and (sequential? txs) (every? string? txs)) + (common/json-response (handle-tx-batch! self nil txs t-before)) + (common/bad-request "invalid tx")))))) - :else - (common/not-found)))) + :else + (common/not-found)))) + (catch :default e + (log/error :worker-sync/http-error {:error e}) + (common/json-response {:error "server error"} 500))))) (defclass SyncDO (extends DurableObject) diff --git a/src/main/frontend/components/repo.cljs b/src/main/frontend/components/repo.cljs index dd4e4240f5..089df77786 100644 --- a/src/main/frontend/components/repo.cljs +++ b/src/main/frontend/components/repo.cljs @@ -122,8 +122,7 @@ token (state/get-auth-id-token) remote-graph-name (config/db-graph-name (state/get-current-repo))] (when (and token remote-graph-name) - (state/js opts)) + text (.text resp) + data (when (seq text) (js/JSON.parse text))] + (if (.-ok resp) + (js->clj data :keywordize-keys true) + (throw (ex-info "worker-sync request failed" + {:status (.-status resp) + :url url + :body data}))))) + (defn- update-server-t! [client t] (when (number? t) (reset! (:server-t client) t))) @@ -553,3 +566,59 @@ (enqueue-local-tx! repo tx-report) (when-let [client (get @worker-state/*worker-sync-clients repo)] (enqueue-asset-sync! repo client)))) + +(def ^:private upload-max-retries 3) + +(defn upload-graph! + [repo] + (let [base (http-base-url) + graph-id (get-graph-id repo)] + (if-not (and (seq base) (seq graph-id)) + (p/rejected (ex-info "worker-sync missing upload info" + {:repo repo :base base :graph-id graph-id})) + (if-let [conn (worker-state/get-datascript-conn repo)] + (let [db @conn + datoms (seq (d/datoms db :eavt))] + (ensure-client-graph-uuid! repo graph-id) + (p/loop [remaining datoms + t-before 0 + retries 0] + (if (empty? remaining) + (do + (client-op/add-all-exists-asset-as-ops repo) + {:graph-id graph-id}) + (let [[chunk rest] (split-at upload-batch-size remaining) + normalized (normalize-tx-data db db chunk)] + (if (empty? normalized) + (p/recur (seq rest) t-before 0) + (p/let [tx-str (sqlite-util/write-transit-str normalized) + resp (fetch-json (str base "/sync/" graph-id "/tx/batch") + {:method "POST" + :headers {"content-type" "application/json"} + :body (js/JSON.stringify + #js {:t_before t-before + :txs #js [tx-str]})})] + (cond + (= "tx/batch/ok" (:type resp)) + (p/recur (seq rest) (:t resp) 0) + + (= "tx/reject" (:type resp)) + (if (= "stale" (:reason resp)) + (if (< retries upload-max-retries) + (p/recur remaining (:t resp) (inc retries)) + (throw (ex-info "worker-sync upload stale limit" + {:repo repo + :graph-id graph-id + :response resp}))) + (throw (ex-info "worker-sync upload rejected" + {:repo repo + :graph-id graph-id + :response resp}))) + + :else + (throw (ex-info "worker-sync upload failed" + {:repo repo + :graph-id graph-id + :response resp}))))))))) + (p/rejected (ex-info "worker-sync missing db" + {:repo repo :graph-id graph-id}))))))