051-logseq-cli-sync-upload-fix.md

This commit is contained in:
rcmerci
2026-03-07 10:32:29 +08:00
parent fefab4d00f
commit c8da78efd1
10 changed files with 927 additions and 154 deletions

View File

@@ -488,10 +488,9 @@
(defn <rtc-upload-graph!
[repo graph-e2ee?]
(p/let [graph-id (<rtc-create-graph! repo graph-e2ee?)]
(when (nil? graph-id)
(throw (ex-info "graph id doesn't exist when uploading to server" {:repo repo})))
(let [graph-e2ee? (normalize-graph-e2ee? graph-e2ee?)]
(p/do!
(ldb/transact! repo [(sqlite-util/kv :logseq.kv/graph-rtc-e2ee? graph-e2ee?)])
(state/<invoke-db-worker :thread-api/db-sync-upload-graph repo)
(<get-remote-graphs)
(<rtc-start! repo))))

View File

@@ -345,6 +345,13 @@
(catch :default e
(fail-fast :db-sync/response-parse-failed (assoc context :error e)))))
(defn- coerce-http-request [schema-key body]
(if-let [coercer (get db-sync-schema/http-request-coercers schema-key)]
(let [coerced (coerce coercer body {:schema schema-key :dir :request})]
(when-not (= coerced invalid-coerce)
coerced))
body))
(defn- coerce-http-response [schema-key body]
(if-let [coercer (get db-sync-schema/http-response-coercers schema-key)]
(let [coerced (coerce coercer body {:schema schema-key :dir :response})]
@@ -352,6 +359,12 @@
coerced))
body))
(defn- normalize-graph-e2ee?
[graph-e2ee?]
(if (nil? graph-e2ee?)
true
(true? graph-e2ee?)))
(defn- reconnect-delay-ms [attempt]
(let [exp (js/Math.pow 2 attempt)
delay (min reconnect-max-delay-ms (* reconnect-base-delay-ms exp))
@@ -462,10 +475,9 @@
(catch :default _
nil)))
(defn- fetch-json
[url opts {:keys [response-schema error-schema] :or {error-schema :error}}]
(p/let [resp (js/fetch url (clj->js (with-auth-headers opts)))
text (.text resp)
(defn- parse-json-response-body
[resp url response-schema error-schema]
(p/let [text (.text resp)
data (when (seq text) (js/JSON.parse text))]
(if (.-ok resp)
(let [body (js->clj data :keywordize-keys true)
@@ -487,6 +499,11 @@
:url url
:body body}))))))
(defn- fetch-json
[url opts {:keys [response-schema error-schema] :or {error-schema :error}}]
(p/let [resp (js/fetch url (clj->js (with-auth-headers opts)))]
(parse-json-response-body resp url response-schema error-schema)))
(defn- require-auth-token!
[context]
(when-not (seq (auth-token))
@@ -625,18 +642,20 @@
(-restore [_ addr]
(restore-data-from-addr db addr))))
(defn- create-temp-sqlite-db
(defn- <create-temp-sqlite-db
[]
(if-let [sqlite @worker-state/*sqlite]
(let [^js DB (.-DB ^js (.-oo1 sqlite))
db (new DB ":memory:" "c")]
(p/let [db (platform/sqlite-open (platform/current)
{:sqlite sqlite
:path ":memory:"
:mode "c"})]
(common-sqlite/create-kvs-table! db)
db)
(fail-fast :db-sync/missing-field {:field :sqlite})))
(defn- <create-temp-sqlite-conn
[schema datoms]
(p/let [db (create-temp-sqlite-db)
(p/let [db (<create-temp-sqlite-db)
storage (new-temp-sqlite-storage db)
conn (d/conn-from-datoms datoms schema {:storage storage})]
{:db db
@@ -2094,8 +2113,21 @@
first)]
(first (bean/->clj result))))
(defn- normalize-snapshot-row-value [value]
(cond
(nil? value) nil
(string? value) value
(number? value) value
(boolean? value) value
(= "bigint" (js* "typeof ~{}" value)) (str value)
(instance? js/Uint8Array value) (.decode text-decoder value)
(instance? js/ArrayBuffer value) (.decode text-decoder (js/Uint8Array. value))
:else (str value)))
(defn- normalize-snapshot-rows [rows]
(mapv (fn [row] (vec row)) (array-seq rows)))
(mapv (fn [row]
(mapv normalize-snapshot-row-value (vec row)))
(array-seq rows)))
(defn- encode-snapshot-rows [rows]
(.encode snapshot-text-encoder (sqlite-util/write-transit-str rows)))
@@ -2121,17 +2153,8 @@
(defn- <snapshot-upload-body
[rows]
(let [frame (frame-bytes (encode-snapshot-rows rows))
stream (js/ReadableStream.
#js {:start (fn [controller]
(.enqueue controller frame)
(.close controller))})
use-compression? (exists? js/CompressionStream)
body (if use-compression? (maybe-compress-stream stream) stream)]
(if use-compression?
(p/let [buf (<buffer-stream body)]
{:body buf :encoding snapshot-content-encoding})
(p/resolved {:body frame :encoding nil}))))
(let [frame (frame-bytes (encode-snapshot-rows rows))]
(p/resolved {:body frame :encoding nil})))
(defn- set-graph-sync-metadata!
[repo graph-e2ee?]
@@ -2139,6 +2162,14 @@
(ldb/transact! conn [(ldb/kv :logseq.kv/graph-remote? true)
(ldb/kv :logseq.kv/graph-rtc-e2ee? (true? graph-e2ee?))])))
(defn- persist-upload-graph-identity!
[repo graph-id graph-e2ee?]
(let [graph-e2ee? (normalize-graph-e2ee? graph-e2ee?)]
(set-graph-sync-metadata! repo graph-e2ee?)
(ensure-client-graph-uuid! repo graph-id)
{:graph-id graph-id
:graph-e2ee? graph-e2ee?}))
(defn list-remote-graphs!
[]
(let [base (http-base-url)]
@@ -2151,6 +2182,76 @@
{:response-schema :graphs/list})]
(vec (or (:graphs resp) [])))))))
(defn- <create-remote-graph!
[repo graph-e2ee?]
(let [base (http-base-url)
graph-name (some-> repo common-config/strip-leading-db-version-prefix)
schema-version (some-> (worker-state/get-datascript-conn repo)
deref
ldb/get-graph-schema-version
:major
str)
graph-e2ee? (normalize-graph-e2ee? graph-e2ee?)]
(cond
(not (seq base))
(fail-fast :db-sync/missing-field {:repo repo :field :http-base})
(not (seq graph-name))
(fail-fast :db-sync/missing-field {:repo repo :field :graph-name})
:else
(do
(require-auth-token! {:repo repo :field :auth-token})
(p/let [body (coerce-http-request :graphs/create
{:graph-name graph-name
:schema-version schema-version
:graph-e2ee? graph-e2ee?})
_ (when (nil? body)
(fail-fast :db-sync/invalid-field {:repo repo
:field :create-graph-body}))
result (fetch-json (str base "/graphs")
{:method "POST"
:headers {"content-type" "application/json"}
:body (js/JSON.stringify (clj->js body))}
{:response-schema :graphs/create})
graph-id (:graph-id result)
graph-e2ee? (normalize-graph-e2ee? (if (contains? result :graph-e2ee?)
(:graph-e2ee? result)
graph-e2ee?))]
(when-not (seq graph-id)
(fail-fast :db-sync/missing-field {:repo repo
:field :graph-id
:op :create-graph}))
(persist-upload-graph-identity! repo graph-id graph-e2ee?))))))
(defn- <ensure-upload-graph-identity!
[repo]
(if-let [graph-id (get-graph-id repo)]
(p/resolved {:graph-id graph-id
:graph-e2ee? (normalize-graph-e2ee? (sync-crypt/graph-e2ee? repo))})
(let [target-graph-name (some-> repo common-config/strip-leading-db-version-prefix)
local-graph-e2ee? (normalize-graph-e2ee? (sync-crypt/graph-e2ee? repo))]
(if-not (seq target-graph-name)
(fail-fast :db-sync/missing-field {:repo repo :field :graph-name})
(p/let [remote-graphs (list-remote-graphs!)
matching-graphs (filterv (fn [{:keys [graph-name]}]
(= target-graph-name graph-name))
remote-graphs)]
(cond
(> (count matching-graphs) 1)
(fail-fast :db-sync/ambiguous-graph-match {:repo repo
:graph-name target-graph-name
:match-count (count matching-graphs)})
(= 1 (count matching-graphs))
(let [{:keys [graph-id graph-e2ee?]} (first matching-graphs)]
(persist-upload-graph-identity! repo graph-id (if (contains? (first matching-graphs) :graph-e2ee?)
graph-e2ee?
local-graph-e2ee?)))
:else
(<create-remote-graph! repo local-graph-e2ee?)))))))
(defn- download-graph-with-id!
[repo graph-id graph-e2ee?]
(let [base (http-base-url)
@@ -2200,66 +2301,60 @@
(defn upload-graph!
[repo]
(->
(let [base (http-base-url)
graph-id (get-graph-id repo)
update-progress (fn [payload]
(worker-util/post-message :rtc-log
(merge {:type :rtc.log/upload
:graph-uuid graph-id}
payload)))]
(if (and (seq base) (seq graph-id))
(if-let [source-conn (worker-state/get-datascript-conn repo)]
(let [graph-e2ee? (true? (sync-crypt/graph-e2ee? repo))]
(p/let [aes-key (when graph-e2ee?
(sync-crypt/<ensure-graph-aes-key repo graph-id))
_ (when (and graph-e2ee? (nil? aes-key))
(fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))]
(set-graph-sync-metadata! repo graph-e2ee?)
(ensure-client-graph-uuid! repo graph-id)
(p/let [datoms (d/datoms @source-conn :eavt)
datoms* (offload-large-titles-in-datoms repo graph-id datoms aes-key)
_ (update-progress {:sub-type :upload-progress
:message (if graph-e2ee? "Encrypting data" "Preparing data")})
encrypted-datoms (if graph-e2ee?
(sync-crypt/<encrypt-datoms aes-key datoms*)
datoms*)
{:keys [db] :as temp} (<create-temp-sqlite-conn (d/schema @source-conn) encrypted-datoms)
total-rows (count-kvs-rows db)]
(->
(p/loop [last-addr -1
first-batch? true
loaded 0]
(let [rows (fetch-kvs-rows db last-addr upload-kvs-batch-size)]
(if (empty? rows)
(do
(client-op/remove-local-tx repo)
(client-op/update-local-tx repo 0)
(client-op/add-all-exists-asset-as-ops repo)
(update-progress {:sub-type :upload-completed
:message "Graph upload finished!"})
{:graph-id graph-id})
(let [max-addr (apply max (map first rows))
rows (normalize-snapshot-rows rows)
upload-url (str base "/sync/" graph-id "/snapshot/upload?reset=" (if first-batch? "true" "false"))]
(p/let [{:keys [body encoding]} (<snapshot-upload-body rows)
headers (cond-> {"content-type" snapshot-content-type}
(string? encoding) (assoc "content-encoding" encoding))
_ (fetch-json upload-url
{:method "POST"
:headers headers
:body body}
{:response-schema :sync/snapshot-upload})]
(let [loaded' (+ loaded (count rows))]
(update-progress {:sub-type :upload-progress
:message (str "Uploading " loaded' "/" total-rows)})
(p/recur max-addr false loaded')))))))
(p/finally
(fn []
(cleanup-temp-sqlite! temp)))))))
(p/rejected (ex-info "db-sync missing datascript conn"
{:repo repo :graph-id graph-id})))
(p/rejected (ex-info "db-sync missing upload info"
{:repo repo :base base :graph-id graph-id}))))
(p/catch (fn [error]
(js/console.error error)))))
(let [base (http-base-url)]
(if-not (seq base)
(p/rejected (ex-info "db-sync missing upload info"
{:repo repo :base base :graph-id nil}))
(if-let [source-conn (worker-state/get-datascript-conn repo)]
(p/let [{:keys [graph-id graph-e2ee?]} (<ensure-upload-graph-identity! repo)
aes-key (when graph-e2ee?
(sync-crypt/<ensure-graph-aes-key repo graph-id))
_ (when (and graph-e2ee? (nil? aes-key))
(fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))]
(let [update-progress (fn [payload]
(worker-util/post-message :rtc-log
(merge {:type :rtc.log/upload
:graph-uuid graph-id}
payload)))]
(p/let [datoms (d/datoms @source-conn :eavt)
datoms* (offload-large-titles-in-datoms repo graph-id datoms aes-key)
_ (update-progress {:sub-type :upload-progress
:message (if graph-e2ee? "Encrypting data" "Preparing data")})
encrypted-datoms (if graph-e2ee?
(sync-crypt/<encrypt-datoms aes-key datoms*)
datoms*)
{:keys [db] :as temp} (<create-temp-sqlite-conn (d/schema @source-conn) encrypted-datoms)
total-rows (count-kvs-rows db)]
(->
(p/loop [last-addr -1
first-batch? true
loaded 0]
(let [rows (fetch-kvs-rows db last-addr upload-kvs-batch-size)]
(if (empty? rows)
(do
(client-op/remove-local-tx repo)
(client-op/update-local-tx repo 0)
(client-op/add-all-exists-asset-as-ops repo)
(update-progress {:sub-type :upload-completed
:message "Graph upload finished!"})
{:graph-id graph-id})
(let [max-addr (apply max (map first rows))
rows (normalize-snapshot-rows rows)
upload-url (str base "/sync/" graph-id "/snapshot/upload?reset=" (if first-batch? "true" "false"))]
(p/let [{:keys [body encoding]} (<snapshot-upload-body rows)
headers (cond-> {"content-type" snapshot-content-type}
(string? encoding) (assoc "content-encoding" encoding))
resp (js/fetch upload-url
#js {:method "POST"
:headers (clj->js (merge (or (auth-headers) {}) headers))
:body body})
_ (parse-json-response-body resp upload-url :sync/snapshot-upload :error)]
(let [loaded' (+ loaded (count rows))]
(update-progress {:sub-type :upload-progress
:message (str "Uploading " loaded' "/" total-rows)})
(p/recur max-addr false loaded')))))))
(p/finally
(fn []
(cleanup-temp-sqlite! temp)))))))
(p/rejected (ex-info "db-sync missing datascript conn"
{:repo repo}))))))

View File

@@ -28,7 +28,7 @@
(do
(defn post-message
[type data & {:keys [port]}]
(when-let [worker (or port js/self)]
(when-let [worker (or port (when (exists? js/self) js/self))]
(.postMessage worker (ldb/write-transit-str [type data]))))
(defn encode-graph-dir-name

View File

@@ -285,6 +285,46 @@
(poll!)))))]
(poll!))))
(defn- execute-sync-upload
[action config]
(-> (p/let [result (invoke-with-repo config (:repo action)
:thread-api/db-sync-upload-graph
[(:repo action)])]
{:status :ok
:data (if (map? result)
result
{:result result})})
(p/catch (fn [error]
(exception->error error {:repo (:repo action)})))))
(defn- execute-sync-download
[action config]
(let [config' (download-config config)]
(-> (p/let [remote-graphs (invoke-global config'
:thread-api/db-sync-list-remote-graphs
[])
remote-graph (some (fn [graph]
(when (= (:graph action) (:graph-name graph))
graph))
remote-graphs)]
(if-not remote-graph
{:status :error
:error {:code :remote-graph-not-found
:message (str "remote graph not found: " (:graph action))
:graph (:graph action)}}
(p/let [cfg (cli-server/ensure-server! config' (:repo action))
_ (transport/invoke cfg :thread-api/set-db-sync-config false [(sync-config config')])
_ (ensure-empty-download-db! cfg (:repo action))
result (transport/invoke cfg :thread-api/db-sync-download-graph-by-id false
[(:repo action) (:graph-id remote-graph) (:graph-e2ee? remote-graph)])]
{:status :ok
:data (if (map? result)
result
{:result result})})))
(p/catch (fn [error]
(exception->error error {:repo (:repo action)
:graph (:graph action)}))))))
(defn execute
[action config]
(case (:type action)
@@ -312,40 +352,10 @@
:data {:result result}})
:sync-upload
(p/let [result (invoke-with-repo config (:repo action)
:thread-api/db-sync-upload-graph
[(:repo action)])]
{:status :ok
:data (if (map? result)
result
{:result result})})
(execute-sync-upload action config)
:sync-download
(let [config' (download-config config)]
(-> (p/let [remote-graphs (invoke-global config'
:thread-api/db-sync-list-remote-graphs
[])
remote-graph (some (fn [graph]
(when (= (:graph action) (:graph-name graph))
graph))
remote-graphs)]
(if-not remote-graph
{:status :error
:error {:code :remote-graph-not-found
:message (str "remote graph not found: " (:graph action))
:graph (:graph action)}}
(p/let [cfg (cli-server/ensure-server! config' (:repo action))
_ (transport/invoke cfg :thread-api/set-db-sync-config false [(sync-config config')])
_ (ensure-empty-download-db! cfg (:repo action))
result (transport/invoke cfg :thread-api/db-sync-download-graph-by-id false
[(:repo action) (:graph-id remote-graph) (:graph-e2ee? remote-graph)])]
{:status :ok
:data (if (map? result)
result
{:result result})})))
(p/catch (fn [error]
(exception->error error {:repo (:repo action)
:graph (:graph action)})))))
(execute-sync-download action config)
:sync-remote-graphs
(p/let [graphs (invoke-global config :thread-api/db-sync-list-remote-graphs [])]