diff --git a/resources/js/worker.js b/resources/js/worker.js index 530d560ed6..139a3c80ac 100644 --- a/resources/js/worker.js +++ b/resources/js/worker.js @@ -20,6 +20,11 @@ const createFS = () => new LightningFS(fsName); let fs = createFS(); let pfs = fs.promises; +if (typeof self !== 'undefined') { + self.fs = fs; + self.pfs = pfs; +} + if (detect() === 'Worker') { const portal = new MagicPortal(self); portal.set('fs', fs); diff --git a/resources/mobile/js/worker.js b/resources/mobile/js/worker.js index fe041decf9..895dffb6b5 100644 --- a/resources/mobile/js/worker.js +++ b/resources/mobile/js/worker.js @@ -20,6 +20,11 @@ const createFS = () => new LightningFS(fsName); let fs = createFS(); let pfs = fs.promises; +if (typeof self !== 'undefined') { + self.fs = fs; + self.pfs = pfs; +} + if (detect() === 'Worker') { const portal = new MagicPortal(self); portal.set('fs', fs); diff --git a/src/main/frontend/handler/assets.cljs b/src/main/frontend/handler/assets.cljs index 1b94be762c..78e52b50fe 100644 --- a/src/main/frontend/handler/assets.cljs +++ b/src/main/frontend/handler/assets.cljs @@ -1,23 +1,15 @@ (ns ^:no-doc frontend.handler.assets - (:require [cljs-http-missionary.client :as http] - [clojure.string :as string] - [frontend.common.crypt :as crypt] - [frontend.common.missionary :as c.m] - [frontend.common.thread-api :as thread-api :refer [def-thread-api]] + (:require [clojure.string :as string] [frontend.config :as config] [frontend.fs :as fs] [frontend.state :as state] [frontend.util :as util] - [lambdaisland.glogi :as log] [logseq.common.config :as common-config] [logseq.common.path :as path] [logseq.common.util :as common-util] - [logseq.db :as ldb] [logseq.db.frontend.asset :as db-asset] [medley.core :as medley] - [missionary.core :as m] - [promesa.core :as p]) - (:import [missionary Cancelled])) + [promesa.core :as p])) (defn exceed-limit-size? "Asset size no more than 100M" @@ -216,28 +208,6 @@ (p/let [[repo-dir assets-dir] (ensure-assets-dir! (state/get-current-repo))] (path/path-join repo-dir assets-dir filename))) -(defn (p/let [file (js {:type "image"})) - checksum (get-file-checksum blob)] - {:checksum checksum}) - (p/catch (constantly nil)))) - (defn- asset-transfer-in-progress? [progress-entry] (let [{:keys [loaded total]} progress-entry] @@ -268,150 +238,3 @@ repo (:block/uuid asset-block)) true))) - -(defn uint8 asset-file) - asset-file) - asset-file* (if (not aes-key) - asset-file - (ldb/write-transit-str - (c.m/vec @@ -104,6 +106,66 @@ [path text] (opfs/ js/globalThis .-window .-pfs) + (some-> js/globalThis .-pfs) + (throw (ex-info "browser pfs is not available" {})))) + +(defn- graph-assets-dir + [repo] + (when-let [graph-name (some-> repo common-config/strip-leading-db-version-prefix)] + (str "/" graph-name "/assets"))) + +(defn- ensure-pfs-dir! + [^js pfs dir] + (cond + (or (nil? dir) (= "" dir) (= "/" dir) (= "." dir)) + (p/resolved nil) + + :else + (-> (.stat pfs dir) + (p/then (constantly nil)) + (p/catch + (fn [_] + (p/do! + (ensure-pfs-dir! pfs (path/parent dir)) + (.mkdir pfs dir))))))) + +(defn- asset-path + [repo file-name] + (if-let [assets-dir (graph-assets-dir repo)] + (path/path-join assets-dir file-name) + (throw (ex-info "missing repo for browser asset path" + {:repo repo + :file-name file-name})))) + +(defn- asset-read-bytes! + [repo file-name] + (.readFile (browser-pfs) (asset-path repo file-name))) + +(defn- asset-write-bytes! + [repo file-name payload] + (let [^js pfs (browser-pfs) + file-path (asset-path repo file-name)] + (p/do! + (ensure-pfs-dir! pfs (path/parent file-path)) + (.writeFile pfs file-path payload)))) + +(defn- asset-stat + [repo file-name] + (let [^js pfs (browser-pfs)] + (-> (.stat pfs (asset-path repo file-name)) + (p/then (fn [^js stat] + {:size (.-size stat) + :type (.-type stat)})) + (p/catch (constantly nil))))) + +(defn- asset-delete! + [repo file-name] + (-> (.unlink (browser-pfs) (asset-path repo file-name)) + (p/catch (constantly nil)))) + (defn- websocket-connect [url] (js/WebSocket. url)) @@ -133,6 +195,10 @@ :remove-vfs! remove-vfs! :read-text! read-text! :write-text! write-text! + :asset-read-bytes! asset-read-bytes! + :asset-write-bytes! asset-write-bytes! + :asset-stat asset-stat + :asset-delete! asset-delete! :transfer (fn [data transferables] (Comlink/transfer data transferables))} :kv {:get kv-get diff --git a/src/main/frontend/worker/platform/node.cljs b/src/main/frontend/worker/platform/node.cljs index b26a6f8382..872098f65f 100644 --- a/src/main/frontend/worker/platform/node.cljs +++ b/src/main/frontend/worker/platform/node.cljs @@ -260,6 +260,41 @@ _ (ensure-dir! dir)] (fs/writeFile full-path text "utf8")))) +(defn- asset-file-path + [data-dir repo file-name] + (node-path/join (repo-dir data-dir repo) + common-config/local-assets-dir + file-name)) + +(defn- asset-read-bytes! + [data-dir repo file-name] + (fs/readFile (asset-file-path data-dir repo file-name))) + +(defn- asset-write-bytes! + [write-guard-fn data-dir repo file-name payload] + (let [full-path (asset-file-path data-dir repo file-name) + dir (node-path/dirname full-path)] + (p/let [_ (when write-guard-fn + (write-guard-fn)) + _ (ensure-dir! dir)] + (fs/writeFile full-path (->buffer payload))))) + +(defn- asset-stat + [data-dir repo file-name] + (-> (fs/stat (asset-file-path data-dir repo file-name)) + (p/then (fn [^js stat] + {:size (.-size stat) + :is-file? (.isFile stat)})) + (p/catch (constantly nil)))) + +(defn- asset-delete! + [write-guard-fn data-dir repo file-name] + (let [full-path (asset-file-path data-dir repo file-name)] + (p/let [_ (when write-guard-fn + (write-guard-fn))] + (-> (fs/rm full-path #js {:force true}) + (p/catch (constantly nil)))))) + (defn- websocket-connect [url] (ws. url)) @@ -350,7 +385,15 @@ :import-db (fn [pool path data] (import-db write-guard-fn pool path data)) :remove-vfs! (fn [pool] (remove-vfs! write-guard-fn pool)) :read-text! (fn [path] (read-text! data-dir path)) - :write-text! (fn [path text] (write-text! write-guard-fn data-dir path text))} + :write-text! (fn [path text] (write-text! write-guard-fn data-dir path text)) + :asset-read-bytes! (fn [repo file-name] + (asset-read-bytes! data-dir repo file-name)) + :asset-write-bytes! (fn [repo file-name payload] + (asset-write-bytes! write-guard-fn data-dir repo file-name payload)) + :asset-stat (fn [repo file-name] + (asset-stat data-dir repo file-name)) + :asset-delete! (fn [repo file-name] + (asset-delete! write-guard-fn data-dir repo file-name))} :kv {:get (:get kv) :set! (:set! kv)} :broadcast {:post-message! (fn [type payload] diff --git a/src/main/frontend/worker/sync/assets.cljs b/src/main/frontend/worker/sync/assets.cljs index f539da688f..a77e566046 100644 --- a/src/main/frontend/worker/sync/assets.cljs +++ b/src/main/frontend/worker/sync/assets.cljs @@ -3,38 +3,149 @@ (:require [datascript.core :as d] [frontend.common.crypt :as crypt] + [frontend.worker.platform :as platform] + [frontend.worker.shared-service :as shared-service] [frontend.worker.state :as worker-state] [frontend.worker.sync.auth :as sync-auth] [frontend.worker.sync.client-op :as client-op] [frontend.worker.sync.crypt :as sync-crypt] [frontend.worker.sync.large-title :as sync-large-title] [lambdaisland.glogi :as log] + [logseq.common.util :as common-util] [logseq.db :as ldb] [promesa.core :as p])) (def max-asset-size (* 100 1024 1024)) -(defn exported-graph-aes-key +(defn graph-aes-key [repo graph-id fail-fast-f] (if (sync-crypt/graph-e2ee? repo) (p/let [aes-key (sync-crypt/uint8 + [payload] + (cond + (instance? js/Uint8Array payload) + payload + + (instance? js/ArrayBuffer payload) + (js/Uint8Array. payload) + + (and (exists? js/ArrayBuffer) + (.isView js/ArrayBuffer payload)) + (js/Uint8Array. (.-buffer payload) (.-byteOffset payload) (.-byteLength payload)) + + (array? payload) + (js/Uint8Array. payload) + + (sequential? payload) + (js/Uint8Array. (clj->js payload)) + + (and (object? payload) + (= "Buffer" (aget payload "type")) + (array? (aget payload "data"))) + (js/Uint8Array. (aget payload "data")) + + :else + (throw (ex-info "unsupported binary payload" + {:payload-type (str (type payload))})))) + +(defn- payload-size + [payload] + (cond + (string? payload) (count payload) + (some? (.-byteLength payload)) (.-byteLength payload) + (some? (.-length payload)) (.-length payload) + :else 0)) + +(defn- notify-asset-progress! + [repo asset-id direction loaded total] + (shared-service/broadcast-to-clients! + :rtc-asset-upload-download-progress + {:repo repo + :asset-id asset-id + :progress {:direction direction + :loaded loaded + :total total}})) + +(defn- mark-asset-write-finish! + [repo asset-id] + (shared-service/broadcast-to-clients! + :asset-file-write-finish + {:repo repo + :asset-id asset-id + :ts (common-util/time-ms)})) + +(defn- (p/let [aes-key (graph-aes-key + repo graph-id + (fn [tag data] + (throw (ex-info (name tag) data))) ) + asset-id (str asset-uuid) + put-url (sync-large-title/asset-url base graph-id asset-id asset-type) + asset-file (try + (uint8 asset-file) asset-file) + payload (if (not aes-key) + asset-file + (ldb/write-transit-str + (crypt/js {:method "PUT" + :headers headers + :body payload})) + status (.-status resp) + _ (notify-asset-progress! repo asset-id :upload total total)] + (when-not (.-ok resp) + (throw (ex-info "upload-asset failed" + {:type :rtc.exception/upload-asset-failed + :data {:status status}}))) + nil) + (p/catch + (fn [e] + (if (contains? #{:rtc.exception/read-asset-failed + :rtc.exception/upload-asset-failed} + (:type (ex-data e))) + (p/rejected e) + (p/rejected (ex-info "upload-asset failed" + {:type :rtc.exception/upload-asset-failed} + e)))))) (p/rejected (ex-info "missing asset upload info" {:repo repo :asset-uuid asset-uuid @@ -166,18 +277,60 @@ :broadcast-rtc-state!-f broadcast-rtc-state!-f :fail-fast-f fail-fast-f}))) +(defn- parse-content-length + [^js resp] + (when-let [content-length (some-> (.-headers resp) (.get "content-length"))] + (let [length (js/parseInt content-length 10)] + (when (not (js/isNaN length)) + length)))) + (defn download-remote-asset! [repo graph-id asset-uuid asset-type] (let [base (sync-auth/http-base-url @worker-state/*db-sync-config)] (if (and (seq base) (seq graph-id) (seq asset-type)) - (p/let [exported-aes-key (exported-graph-aes-key - repo graph-id - (fn [tag data] - (throw (ex-info (name tag) data))))] - (worker-state/ (p/let [aes-key (graph-aes-key + repo graph-id + (fn [tag data] + (throw (ex-info (name tag) data)))) + asset-id (str asset-uuid) + get-url (sync-large-title/asset-url base graph-id asset-id asset-type) + headers (sync-auth/auth-headers (worker-state/get-id-token)) + request-opts (cond-> {:method "GET"} + (seq headers) (assoc :headers headers)) + ^js resp (js/fetch get-url + (clj->js request-opts)) + status (.-status resp) + _ (when-not (.-ok resp) + (throw (ex-info "download asset failed" + {:type :rtc.exception/download-asset-failed + :data {:status status}}))) + total (or (parse-content-length resp) 0) + _ (notify-asset-progress! repo asset-id :download 0 total) + body (.arrayBuffer resp) + body-size (.-byteLength body) + total' (if (pos? total) total body-size) + _ (notify-asset-progress! repo asset-id :download body-size total') + asset-file + (if (not aes-key) + body + (try + (let [asset-file-untransited (ldb/read-transit-str (.decode (js/TextDecoder.) body))] + (crypt/ (p/let [meta (when (seq asset-type) - (worker-state/ (p/let [meta (when should-download? + (platform/asset-stat (platform/current) + repo + (asset-file-name asset-id asset-type))) + missing-local? (and should-download? (nil? meta)) + _ (when missing-local? + (download-remote-asset! repo graph-id asset-uuid asset-type)) + _ (when missing-local? + (when-let [target-ent (d/entity @conn [:block/uuid asset-uuid])] + (ldb/transact! + conn + [[:db/retract (:db/id target-ent) + :logseq.property.asset/remote-metadata]] + {:persist-op? true}))) + _ (when missing-local? + (client-op/remove-asset-op repo asset-uuid)) + _ (when missing-local? + (broadcast-rtc-state!-f client))] + nil) (p/catch (fn [e] (js/console.error e)))))))))))