handle 3 cases

This commit is contained in:
Tienson Qin
2026-01-07 15:08:59 +08:00
parent f1d15f5c64
commit 3090074fff
8 changed files with 616 additions and 65 deletions

View File

@@ -5,13 +5,23 @@
(def text-decoder (js/TextDecoder.))
(defn- cors-headers []
#js {"access-control-allow-origin" "*"
"access-control-allow-headers" "content-type"
"access-control-allow-methods" "GET,POST,DELETE,OPTIONS"})
(defn json-response
([data] (json-response data 200))
([data status]
(js/Response.
(js/JSON.stringify (clj->js data))
#js {:status status
:headers #js {"content-type" "application/json"}})))
:headers (js/Object.assign
#js {"content-type" "application/json"}
(cors-headers))})))
(defn options-response []
(js/Response. nil #js {:status 204 :headers (cors-headers)}))
(defn bad-request [message]
(json-response {:error message} 400))

View File

@@ -2,8 +2,14 @@
(:require ["cloudflare:workers" :refer [DurableObject]]
[clojure.string :as string]
[datascript.core :as d]
[datascript.impl.entity :as de :refer [Entity]]
[lambdaisland.glogi :as log]
[logseq.db :as ldb]
[logseq.common.config :as common-config]
[logseq.common.util :as common-util]
[logseq.common.util.date-time :as date-time]
[logseq.common.uuid :as common-uuid]
[logseq.db.common.order :as db-order]
[logseq.db.frontend.schema :as db-schema]
[logseq.worker-sync.common :as common]
[logseq.worker-sync.cycle :as cycle]
[logseq.worker-sync.protocol :as protocol]
@@ -12,8 +18,12 @@
(defn- handle-worker-fetch [request ^js env]
(let [url (js/URL. (.-url request))
path (.-pathname url)]
path (.-pathname url)
method (.-method request)]
(cond
(= method "OPTIONS")
(common/options-response)
(= path "/health")
(common/json-response {:ok true})
@@ -26,16 +36,12 @@
(string/starts-with? path "/sync/")
(let [prefix (count "/sync/")
rest-path (subs path prefix)
slash-idx (string/index-of rest-path "/")
graph-id (if (neg? slash-idx) rest-path (subs rest-path 0 slash-idx))
tail (if (neg? slash-idx) "/" (subs rest-path slash-idx))
new-url (str (.-origin url) tail (.-search url))]
graph-id (subs path prefix)]
(if (seq graph-id)
(let [^js namespace (.-LOGSEQ_SYNC_DO env)
do-id (.idFromName namespace graph-id)
stub (.get namespace do-id)
rewritten (js/Request. new-url request)]
rewritten (js/Request. url request)]
(.fetch stub rewritten))
(common/bad-request "missing graph id")))
@@ -80,29 +86,343 @@
:attr (name attr)
:server_values (common/write-transit (cycle/server-values-for db tx-data attr))})
(defn- handle-tx! [^js self tx-data t-before]
(defn- entity->lookup [db entity]
(cond
(vector? entity) entity
(uuid? entity) [:block/uuid entity]
(keyword? entity) [:db/ident entity]
(map? entity)
(or (when-let [uuid (:block/uuid entity)]
[:block/uuid uuid])
(when-let [ident (:db/ident entity)]
[:db/ident ident])
(when-let [eid (:db/id entity)]
(when (vector? eid)
eid)))
(instance? Entity entity)
(or (when-let [uuid (:block/uuid entity)]
[:block/uuid uuid])
(when-let [ident (:db/ident entity)]
[:db/ident ident]))
(number? entity) nil
:else
(when-let [ent (d/entity db entity)]
(or (when-let [uuid (:block/uuid ent)]
[:block/uuid uuid])
(when-let [ident (:db/ident ent)]
[:db/ident ident])))))
(defn- normalize-ref [db ref]
(or (entity->lookup db ref) ref))
(defn- normalize-ref-value [db value]
(cond
(set? value) (into #{} (map (partial normalize-ref db)) value)
(sequential? value) (mapv (partial normalize-ref db) value)
:else (normalize-ref db value)))
(defn- ref-attr? [attr]
(contains? db-schema/ref-type-attributes attr))
(defn- normalize-tx-data [db tx-data]
(mapv
(fn [item]
(cond
(and (vector? item) (#{:db/add :db/retract} (first item)))
(let [[op e a v] item]
(cond-> [op (normalize-ref db e) a v]
(ref-attr? a)
(update 3 (partial normalize-ref-value db))))
(and (vector? item) (#{:db.fn/retractEntity :db/retractEntity} (first item)))
(let [[op e] item]
[op (normalize-ref db e)])
(map? item)
(let [item' (cond-> item
(contains? item :db/id)
(update :db/id (partial normalize-ref db)))]
(reduce
(fn [acc k]
(if (contains? acc k)
(update acc k (partial normalize-ref-value db))
acc))
item'
db-schema/ref-type-attributes))
:else item))
tx-data))
(defn- contains-unstable-entity-id? [tx-data]
(boolean
(some
(fn [item]
(cond
(and (vector? item) (#{:db/add :db/retract} (first item)))
(let [[_ e a v] item]
(or (number? e)
(and (ref-attr? a) (number? v))))
(and (vector? item) (#{:db.fn/retractEntity :db/retractEntity} (first item)))
(let [[_ e] item]
(number? e))
(map? item)
(let [unstable-ref? (fn [value]
(cond
(number? value) true
(sequential? value) (some number? value)
(set? value) (some number? value)
:else false))]
(or (number? (:db/id item))
(some (fn [k]
(when (contains? item k)
(unstable-ref? (get item k))))
db-schema/ref-type-attributes)))
:else false))
tx-data)))
(defn- journal-page-info []
(let [now (common/now-ms)
day (date-time/ms->journal-day now)
formatter (common-config/get-date-formatter nil)
title (date-time/int->journal-title day formatter)
page-uuid (common-uuid/gen-uuid :journal-page-uuid day)]
{:day day
:title title
:name (common-util/page-name-sanity-lc title)
:uuid page-uuid}))
(defn- build-journal-page-tx [db {:keys [day title name uuid]}]
(when (and uuid title name (nil? (d/entity db [:block/uuid uuid])))
[{:block/uuid uuid
:block/title title
:block/name name
:block/journal-day day
:block/tags #{:logseq.class/Journal}
:block/created-at (common/now-ms)
:block/updated-at (common/now-ms)}]))
(defn- max-order-for-parent [db parent-eid]
(reduce
(fn [acc datom]
(let [order (:block/order (d/entity db (:e datom)))]
(if (and order (or (nil? acc) (pos? (compare order acc))))
order
acc)))
nil
(d/datoms db :avet :block/parent parent-eid)))
(defn- attr-updates-from-tx [tx-data attr]
(reduce
(fn [acc tx]
(cond
(and (vector? tx)
(= :db/add (first tx))
(= attr (nth tx 2)))
(conj acc {:entity (nth tx 1)
:value (nth tx 3)})
(and (map? tx) (contains? tx attr))
(let [entity (or (:db/id tx)
(:block/uuid tx)
(:db/ident tx))]
(if (some? entity)
(conj acc {:entity entity
:value (get tx attr)})
acc))
:else acc))
[]
tx-data))
(defn- collect-ident-refs [tx-data]
(let [add-ident (fn [acc value]
(cond
(keyword? value) (conj acc value)
(and (vector? value) (= :db/ident (first value)) (keyword? (second value)))
(conj acc (second value))
:else acc))
add-ident-coll (fn [acc value]
(cond
(set? value) (reduce add-ident acc value)
(sequential? value) (reduce add-ident acc value)
:else (add-ident acc value)))]
(reduce
(fn [acc item]
(cond
(and (vector? item) (#{:db/add :db/retract} (first item)))
(let [[_ e a v] item
acc (add-ident acc e)]
(if (ref-attr? a)
(add-ident-coll acc v)
acc))
(and (vector? item) (#{:db.fn/retractEntity :db/retractEntity} (first item)))
(let [[_ e] item]
(add-ident acc e))
(map? item)
(let [acc (if (contains? item :db/ident)
(add-ident acc (:db/ident item))
acc)]
(reduce
(fn [acc k]
(if (contains? item k)
(add-ident-coll acc (get item k))
acc))
acc
db-schema/ref-type-attributes))
:else acc))
#{}
tx-data)))
(defn- ensure-ident-tx [db tx-data]
(let [idents (collect-ident-refs tx-data)]
(reduce
(fn [acc ident]
(if (d/entid db [:db/ident ident])
acc
(conj acc {:db/ident ident})))
[]
idents)))
(defn- ensure-idents [db tx-data]
(let [ident-tx (ensure-ident-tx db tx-data)]
(if (seq ident-tx)
{:db (d/db-with db ident-tx)
:tx-data (into ident-tx tx-data)}
{:db db
:tx-data tx-data})))
(defn- fix-missing-parent [db tx-data]
(let [db' (d/db-with db tx-data)
updates (attr-updates-from-tx tx-data :block/parent)
journal (journal-page-info)
journal-ref [:block/uuid (:uuid journal)]
journal-tx (build-journal-page-tx db' journal)
db'' (if (seq journal-tx) (d/db-with db' journal-tx) db')
parent-eid (d/entid db'' journal-ref)
max-order (when parent-eid (max-order-for-parent db'' parent-eid))
max-atom (atom max-order)
fixes (reduce
(fn [acc {:keys [entity value]}]
(let [entity-ref (normalize-ref db'' entity)
parent-ref (normalize-ref db'' value)
eid (d/entid db'' entity-ref)
parent-eid' (when parent-ref (d/entid db'' parent-ref))]
(if (and eid (some? value) (nil? parent-eid'))
(let [order (db-order/gen-key @max-atom nil :max-key-atom max-atom)]
(conj acc
[:db/add entity-ref :block/parent journal-ref]
[:db/add entity-ref :block/page journal-ref]
[:db/add entity-ref :block/order order]))
acc)))
[]
updates)]
(cond-> tx-data
(seq journal-tx) (into journal-tx)
(seq fixes) (into fixes))))
(defn- fix-duplicate-orders [db tx-data]
(let [db' (d/db-with db tx-data)
updates (attr-updates-from-tx tx-data :block/order)
max-order-atoms (atom {})
fixes (reduce
(fn [acc {:keys [entity value]}]
(let [entity-ref (normalize-ref db' entity)
eid (d/entid db' entity-ref)
parent (when eid (:block/parent (d/entity db' eid)))
parent-eid (when parent (d/entid db' (normalize-ref db' parent)))]
(if (and eid parent-eid value)
(let [siblings (d/datoms db' :avet :block/parent parent-eid)
same-order? (some
(fn [datom]
(let [sib-eid (:e datom)]
(and (not= sib-eid eid)
(= value (:block/order (d/entity db' sib-eid))))))
siblings)]
(if same-order?
(let [max-atom (or (get @max-order-atoms parent-eid)
(let [max-order (max-order-for-parent db' parent-eid)
created (atom max-order)]
(swap! max-order-atoms assoc parent-eid created)
created))
order (db-order/gen-key @max-atom nil :max-key-atom max-atom)]
(conj acc [:db/add entity-ref :block/order order]))
acc))
acc)))
[]
updates)]
(if (seq fixes)
(into tx-data fixes)
tx-data)))
(defn ^:test normalize-tx-data* [db tx-data]
(normalize-tx-data db tx-data))
(defn ^:test contains-unstable-entity-id?* [tx-data]
(contains-unstable-entity-id? tx-data))
(defn ^:test fix-missing-parent* [db tx-data]
(fix-missing-parent db tx-data))
(defn ^:test fix-duplicate-orders* [db tx-data]
(fix-duplicate-orders db tx-data))
(defn- apply-tx! [^js self tx-data]
(let [sql (.-sql self)
conn (.-conn self)
current-t (t-now self)]
(cond
(and (number? t-before) (not= t-before current-t))
db @conn
normalized (normalize-tx-data db tx-data)]
(if (contains-unstable-entity-id? normalized)
{:type "tx/reject"
:reason "stale"
:t current-t}
:else
(let [db @conn
cycle-info (cycle/detect-cycle db tx-data)]
:reason "unstable-id"}
(let [ ;; {:keys [db tx-data]} (ensure-idents db normalized)
parent-fixed (fix-missing-parent db tx-data)
order-fixed (fix-duplicate-orders db parent-fixed)
cycle-info (cycle/detect-cycle db order-fixed)]
(if cycle-info
(cycle-reject-response db tx-data cycle-info)
(let [_ (ldb/transact! conn tx-data)
(cycle-reject-response db order-fixed cycle-info)
(let [_ (d/transact! conn order-fixed)
new-t (storage/next-t! sql)
created-at (common/now-ms)
tx-str (common/write-transit tx-data)]
tx-str (common/write-transit order-fixed)]
(storage/append-tx! sql new-t tx-str created-at)
{:type "tx/ok"
:t new-t}))))))
(defn- handle-tx! [^js self tx-data t-before]
(let [current-t (t-now self)]
(if (and (number? t-before) (not= t-before current-t))
{:type "tx/reject"
:reason "stale"
:t current-t}
(apply-tx! self tx-data))))
(defn- handle-tx-batch! [^js self txs t-before]
(let [current-t (t-now self)]
(if (and (number? t-before) (not= t-before current-t))
{:type "tx/reject"
:reason "stale"
:t current-t}
(loop [idx 0]
(if (>= idx (count txs))
{:type "tx/batch/ok"
:t (t-now self)
:count (count txs)}
(let [tx-data (protocol/transit->tx (nth txs idx))]
(if (sequential? tx-data)
(let [result (apply-tx! self tx-data)]
(if (= "tx/ok" (:type result))
(recur (inc idx))
(assoc result :index idx)))
{:type "tx/reject"
:reason "invalid tx"
:index idx})))))))
(defn- handle-ws-message! [^js self ^js ws raw]
(let [message (protocol/parse-message raw)]
(if-not (map? message)
@@ -128,6 +448,13 @@
(send! ws (handle-tx! self tx-data t-before))
(send! ws {:type "tx/reject" :reason "invalid tx"})))
"tx/batch"
(let [txs (:txs message)
t-before (parse-int (:t_before message))]
(if (and (sequential? txs) (every? string? txs))
(send! ws (handle-tx-batch! self txs t-before))
(send! ws {:type "tx/reject" :reason "invalid tx"})))
(send! ws {:type "error" :message "unknown type"})))))
(defn- handle-ws [^js self request]
@@ -151,7 +478,12 @@
(let [url (js/URL. (.-url request))
path (.-pathname url)
method (.-method request)]
(prn :debug :path path
:method method)
(cond
(= method "OPTIONS")
(common/options-response)
(and (= method "GET") (= path "/health"))
(common/json-response {:ok true})
@@ -181,6 +513,17 @@
(common/json-response (handle-tx! self tx-data t-before))
(common/bad-request "invalid tx"))))))
(and (= method "POST") (= path "/tx/batch"))
(.then (common/read-json request)
(fn [result]
(if (nil? result)
(common/bad-request "missing body")
(let [txs (js->clj (aget result "txs"))
t-before (parse-int (aget result "t_before"))]
(if (and (sequential? txs) (every? string? txs))
(common/json-response (handle-tx-batch! self txs t-before))
(common/bad-request "invalid tx"))))))
:else
(common/not-found))))
@@ -241,6 +584,9 @@
method (.-method request)]
(index-init! sql)
(cond
(= method "OPTIONS")
(common/options-response)
(and (= method "GET") (= path "/graphs"))
(common/json-response {:graphs (index-list sql)})

View File

@@ -0,0 +1,66 @@
(ns logseq.worker-sync.worker-test
(:require [cljs.test :refer [deftest is testing]]
[datascript.core :as d]
[logseq.common.util.date-time :as date-time]
[logseq.common.uuid :as common-uuid]
[logseq.db.common.order :as db-order]
[logseq.db.frontend.schema :as db-schema]
[logseq.worker-sync.common :as common]
[logseq.worker-sync.worker :as worker]))
(defn- new-conn []
(d/create-conn db-schema/schema))
(deftest missing-parent-fallback-test
(let [conn (new-conn)
child (random-uuid)
missing (random-uuid)
fixed-ms 1700000000000
journal-day (date-time/ms->journal-day fixed-ms)
journal-uuid (common-uuid/gen-uuid :journal-page-uuid journal-day)
tx [{:block/uuid child :block/parent [:block/uuid missing]}]]
(with-redefs [common/now-ms (constantly fixed-ms)]
(let [fixed (worker/fix-missing-parent* @conn tx)
db' (d/db-with @conn fixed)
child-entity (d/entity db' [:block/uuid child])
journal-entity (d/entity db' [:block/uuid journal-uuid])]
(testing "missing parent moves block to today's journal"
(is (= journal-uuid (:block/uuid (:block/parent child-entity))))
(is (= journal-uuid (:block/uuid (:block/page child-entity))))
(is (string? (:block/order child-entity))))
(testing "journal page exists"
(is (= journal-day (:block/journal-day journal-entity)))
(is (= journal-uuid (:block/uuid journal-entity))))))))
(deftest duplicate-order-fix-test
(let [conn (new-conn)
parent (random-uuid)
block-a (random-uuid)
block-b (random-uuid)
order-a (db-order/gen-key nil nil)
order-b (db-order/gen-key order-a nil)]
(d/transact! conn [{:block/uuid parent}
{:block/uuid block-a
:block/parent [:block/uuid parent]
:block/order order-a}
{:block/uuid block-b
:block/parent [:block/uuid parent]
:block/order order-b}])
(let [tx [{:block/uuid block-b :block/order order-a}]
fixed (worker/fix-duplicate-orders* @conn tx)
db' (d/db-with @conn fixed)
order-a' (:block/order (d/entity db' [:block/uuid block-a]))
order-b' (:block/order (d/entity db' [:block/uuid block-b]))]
(is (= order-a order-a'))
(is (not= order-a' order-b')))))
(deftest unstable-id-reject-test
(let [block-id (random-uuid)]
(testing "numeric entity ids are rejected"
(is (true? (worker/contains-unstable-entity-id?* [[:db/add 1 :block/uuid block-id]])))
(is (true? (worker/contains-unstable-entity-id?* [{:db/id 2 :block/uuid block-id}]))))
(testing "numeric ref values are rejected"
(is (true? (worker/contains-unstable-entity-id?* [[:db/add [:block/uuid block-id] :block/parent 42]]))))
(testing "lookup refs are accepted"
(is (false? (worker/contains-unstable-entity-id?* [[:db/add [:block/uuid block-id] :block/parent [:block/uuid (random-uuid)]]])))
(is (false? (worker/contains-unstable-entity-id?* [{:block/uuid block-id}]))))))

View File

@@ -55,13 +55,14 @@
(def RTC-WS-URL "wss://ws.logseq.com/rtc-sync?token=%s")
(def RTC-WS-URL "wss://ws-dev.logseq.com/rtc-sync?token=%s"))
(goog-define ENABLE-WORKER-SYNC false)
;; (goog-define ENABLE-WORKER-SYNC false)
(goog-define ENABLE-WORKER-SYNC true)
(defonce worker-sync-enabled? ENABLE-WORKER-SYNC)
(goog-define WORKER-SYNC-WS-URL "wss://sync-dev.logseq.com/sync/%s")
(goog-define WORKER-SYNC-WS-URL "ws://127.0.0.1:8787/sync/%s")
(defonce worker-sync-ws-url WORKER-SYNC-WS-URL)
(goog-define WORKER-SYNC-HTTP-BASE "https://sync-dev.logseq.com")
(goog-define WORKER-SYNC-HTTP-BASE "http://127.0.0.1:8787")
(defonce worker-sync-http-base WORKER-SYNC-HTTP-BASE)
;; Feature flags
;; =============

View File

@@ -7,6 +7,7 @@
[frontend.state :as state]
[lambdaisland.glogi :as log]
[logseq.db :as ldb]
[logseq.db.sqlite.util :as sqlite-util]
[promesa.core :as p]))
(defn- ws->http-base [ws-url]
@@ -29,8 +30,10 @@
(defn- get-graph-id [repo]
(let [db (db/get-db repo)]
(or (ldb/get-graph-rtc-uuid db)
;; FIXME: only for testing
(random-uuid))))
(ldb/get-graph-local-uuid db)
(let [new-id (random-uuid)]
(ldb/transact! repo [(sqlite-util/kv :logseq.kv/local-graph-uuid new-id)])
new-id))))
(defn- fetch-json
[url opts]
@@ -71,7 +74,8 @@
#js {:graph_id (str graph-id)
:graph_name repo
:schema_version schema-version})})]
(ldb/transact! repo [{:logseq.kv/graph-uuid graph-id}])
(ldb/transact! repo [(sqlite-util/kv :logseq.kv/db-type "db")
(sqlite-util/kv :logseq.kv/graph-uuid graph-id)])
result)
(p/rejected (ex-info "worker-sync missing graph info"
{:type :worker-sync/invalid-graph

View File

@@ -278,20 +278,23 @@
(swap! *client-ops-conns assoc repo client-ops-conn)
(when (and (not @*publishing?) (not= client-op/schema-in-db (d/schema @client-ops-conn)))
(d/reset-schema! client-ops-conn client-op/schema-in-db))
(when (and db-based? (not initial-data-exists?) (not datoms))
(let [config (or config "")
initial-data (sqlite-create-graph/build-db-initial-data
config (select-keys opts [:import-type :graph-git-sha]))]
(ldb/transact! conn initial-data {:initial-db? true})))
(let [initial-tx-report (when (and db-based? (not initial-data-exists?) (not datoms))
(let [config (or config "")
initial-data (sqlite-create-graph/build-db-initial-data
config (select-keys opts [:import-type :graph-git-sha]))]
(ldb/transact! conn initial-data {:initial-db? true})))
initial-tx-data (:tx-data initial-tx-report)]
(gc-sqlite-dbs! db client-ops-db debug-log-db conn {})
(gc-sqlite-dbs! db client-ops-db debug-log-db conn {})
(let [migration-result (db-migrate/migrate conn)]
(when (client-op/rtc-db-graph? repo)
(let [client-ops (rtc-migrate/migration-results=>client-ops migration-result)]
(client-op/add-ops! repo client-ops))))
(let [migration-result (db-migrate/migrate conn)]
(when (client-op/rtc-db-graph? repo)
(let [client-ops (rtc-migrate/migration-results=>client-ops migration-result)]
(client-op/add-ops! repo client-ops))))
(when initial-tx-data
(worker-sync/handle-local-tx! repo initial-tx-data {:initial-db? true}))
(db-listener/listen-db-changes! repo (get @*datascript-conns repo))))))
(db-listener/listen-db-changes! repo (get @*datascript-conns repo)))))))
(defn- iter->vec [iter']
(when iter'

View File

@@ -96,7 +96,9 @@
:db-ident {:db/unique :db.unique/identity}
:db-ident-or-block-uuid {:db/unique :db.unique/identity}
:local-tx {:db/index true}
:graph-uuid {:db/index true}})
:graph-uuid {:db/index true}
:worker-sync/tx-id {:db/unique :db.unique/identity}
:worker-sync/created-at {:db/index true}})
(defn update-graph-uuid
[repo graph-uuid]

View File

@@ -33,9 +33,16 @@
(when-let [conn (worker-state/get-datascript-conn repo)]
(let [db @conn
graph-uuid (ldb/get-graph-rtc-uuid db)
local-uuid (ldb/get-graph-local-uuid db)]
local-uuid (ldb/get-graph-local-uuid db)
new-local (when (and (nil? graph-uuid) (nil? local-uuid))
(random-uuid))]
(when new-local
(try
(d/transact! conn [(sqlite-util/kv :logseq.kv/local-graph-uuid new-local)])
(catch :default e
(log/error :worker-sync/graph-uuid-write-failed {:error e}))))
(or (some-> graph-uuid str)
(some-> local-uuid str)
(some-> (or local-uuid new-local) str)
(when (string? repo) repo)))))
(defn- ready-state [ws]
@@ -48,6 +55,46 @@
(when (ws-open? ws)
(.send ws (js/JSON.stringify (clj->js message)))))
(defn- entity->lookup [db eid]
(when-let [entity (and (number? eid) (d/entity db eid))]
(or (when-let [uuid (:block/uuid entity)]
[:block/uuid uuid])
(when-let [ident (:db/ident entity)]
[:db/ident ident]))))
(defn- normalize-ref [db ref]
(cond
(number? ref) (or (entity->lookup db ref) ref)
(instance? Entity ref) (or (when-let [uuid (:block/uuid ref)]
[:block/uuid uuid])
(when-let [ident (:db/ident ref)]
[:db/ident ident]))
:else ref))
(defn- normalize-tx-data [db tx-data]
(mapv
(fn [item]
(cond
(and (vector? item) (#{:db/add :db/retract} (first item)))
(let [[op e a v] item]
[op (normalize-ref db e) a (normalize-ref db v)])
(map? item)
(let [item' (if (contains? item :db/id)
(let [lookup (normalize-ref db (:db/id item))]
(cond-> (dissoc item :db/id)
lookup (assoc :block/uuid (second lookup))))
item)]
(cond-> item'
(contains? item' :block/parent)
(update :block/parent (partial normalize-ref db))
(contains? item' :block/page)
(update :block/page (partial normalize-ref db))))
:else item))
tx-data))
(defn- parse-message [raw]
(try
(js->clj (js/JSON.parse raw) :keywordize-keys true)
@@ -99,18 +146,42 @@
(when (seq tx-data)
(d/transact! conn tx-data {:worker-sync/remote? true})))))
(declare flush-pending!)
(declare remove-pending-txs!)
(defn- handle-message! [repo client raw]
(when-let [message (parse-message raw)]
(case (:type message)
"hello" (update-server-t! client (:t message))
"tx/ok" (update-server-t! client (:t message))
"hello" (do
(update-server-t! client (:t message))
(flush-pending! repo client))
"tx/ok" (do
(update-server-t! client (:t message))
(remove-pending-txs! repo @(:inflight client))
(reset! (:inflight client) [])
(flush-pending! repo client))
"tx/batch/ok" (do
(update-server-t! client (:t message))
(remove-pending-txs! repo @(:inflight client))
(reset! (:inflight client) [])
(flush-pending! repo client))
"tx/reject" (do
(when (= "stale" (:reason message))
(update-server-t! client (:t message)))
(if-let [index (:index message)]
(let [inflight @(:inflight client)
succeeded (subvec inflight 0 (min index (count inflight)))]
(remove-pending-txs! repo succeeded)
(when-not (= "stale" (:reason message))
(let [failure (when (< index (count inflight)) [(nth inflight index)])]
(remove-pending-txs! repo failure))))
(when-not (= "stale" (:reason message))
(remove-pending-txs! repo @(:inflight client))))
(reset! (:inflight client) [])
(when (= "cycle" (:reason message))
(let [attr (keyword (:attr message))
server-values (sqlite-util/read-transit-str (:server_values message))]
(reconcile-cycle! repo attr server-values))))
(reconcile-cycle! repo attr server-values)))
(flush-pending! repo client))
"pull/ok" (do
(update-server-t! client (:t message))
(doseq [{:keys [tx]} (:txs message)]
@@ -123,10 +194,62 @@
(or (get @worker-state/*worker-sync-clients repo)
(let [client {:repo repo
:server-t (atom 0)
:send-queue (atom (p/resolved nil))}]
:send-queue (atom (p/resolved nil))
:inflight (atom [])}]
(swap! worker-state/*worker-sync-clients assoc repo client)
client)))
(defn- client-ops-conn [repo]
(worker-state/get-client-ops-conn repo))
(defn- persist-local-tx! [repo tx-str]
(when-let [conn (client-ops-conn repo)]
(let [tx-id (random-uuid)
now (.now js/Date)]
(d/transact! conn [{:worker-sync/tx-id tx-id
:worker-sync/tx tx-str
:worker-sync/created-at now}])
tx-id)))
(defn- pending-txs
[repo limit]
(when-let [conn (client-ops-conn repo)]
(let [db @conn
datoms (d/datoms db :avet :worker-sync/created-at)]
(->> datoms
(map (fn [datom]
(d/entity db (:e datom))))
(keep (fn [ent]
(when-let [tx-id (:worker-sync/tx-id ent)]
{:tx-id tx-id
:tx (:worker-sync/tx ent)})))
(take limit)
(vec)))))
(defn- remove-pending-txs!
[repo tx-ids]
(when (seq tx-ids)
(when-let [conn (client-ops-conn repo)]
(d/transact! conn
(mapv (fn [tx-id]
[:db.fn/retractEntity [:worker-sync/tx-id tx-id]])
tx-ids)))))
(defn- flush-pending!
[repo client]
(let [inflight @(:inflight client)]
(when (empty? inflight)
(when-let [ws (:ws client)]
(when (ws-open? ws)
(let [batch (pending-txs repo 50)]
(when (seq batch)
(let [tx-ids (mapv :tx-id batch)
txs (mapv :tx batch)]
(reset! (:inflight client) tx-ids)
(send! ws {:type "tx/batch"
:t_before @(:server-t client)
:txs txs})))))))))
(defn- attach-ws-handlers! [repo client ws]
(set! (.-onmessage ws)
(fn [event]
@@ -189,26 +312,22 @@
(p/resolved nil)))
(defn enqueue-local-tx!
[repo tx-data]
[repo tx-data tx-meta]
(when-let [client (get @worker-state/*worker-sync-clients repo)]
(let [send-queue (:send-queue client)
normalized (mapv (fn [item]
(if (and (map? item) (contains? item :e) (contains? item :a))
(if (:added item)
[:db/add (:e item) (:a item) (:v item)]
[:db/retract (:e item) (:a item) (:v item)])
item))
tx-data)
tx-str (sqlite-util/write-transit-str normalized)]
(swap! send-queue
(fn [prev]
(p/then prev
(fn [_]
(when-let [ws (:ws (get @worker-state/*worker-sync-clients repo))]
(when (ws-open? ws)
(send! ws {:type "tx"
:t_before @(:server-t client)
:tx tx-str}))))))))))
conn (worker-state/get-datascript-conn repo)
db (some-> conn deref)]
(when db
(let [normalized (if (:initial-db? tx-meta) tx-data (normalize-tx-data db tx-data))
tx-str (sqlite-util/write-transit-str normalized)]
(persist-local-tx! repo tx-str)
(swap! send-queue
(fn [prev]
(p/then prev
(fn [_]
(when-let [ws (:ws (get @worker-state/*worker-sync-clients repo))]
(when (ws-open? ws)
(flush-pending! repo client))))))))))))
(defn handle-local-tx!
[repo tx-data tx-meta]
@@ -217,4 +336,4 @@
(not (:worker-sync/remote? tx-meta))
(not (:rtc-download-graph? tx-meta))
(not (:from-disk? tx-meta)))
(enqueue-local-tx! repo tx-data)))
(enqueue-local-tx! repo tx-data tx-meta)))