diff --git a/deps/db-sync/src/logseq/db_sync/worker/agent/do.cljs b/deps/db-sync/src/logseq/db_sync/worker/agent/do.cljs index 37c665bbc4..b0a43ab3fb 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/agent/do.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/agent/do.cljs @@ -27,18 +27,63 @@ (defn- js value))) +(def ^:private events-meta-key "events.meta") + +(defn- events-item-key [idx] + (str "events." idx)) + +(defn- valid-events-meta? [meta] + (and (map? meta) + (integer? (:count meta)) + (<= 0 (:count meta)))) + +(defn- > events + (remove nil?) + vec)))) + []))) (defn- clj % :keywordize-keys true))) -(defn- json-size [value] - (.-length (js/JSON.stringify value))) - (deftest messages-use-single-events-stream-and-dont-duplicate-user-message-test (testing "session messages post to /messages while keeping one /events/sse stream and no audit message payload" (async done @@ -323,8 +320,6 @@ :audit {} :created-at 0 :updated-at 0})) - (.then (fn [_] - (.put (.-storage self) "events" #js []))) (.then (fn [_] (agent-do/handle-fetch self (json-request "http://db-sync.local/__session__/stream" @@ -446,7 +441,6 @@ :type "agent.runtime" :ts 1 :data {:payload payload}} - initial-events (clj->js [initial-event]) {:keys [self data]} (make-limited-self #js {"AGENT_SESSION_EVENTS_MAX_BYTES" "256"} 500000) headers {"content-type" "application/json" "x-user-id" "user-1"}] @@ -459,7 +453,7 @@ :created-at 0 :updated-at 0})) (.then (fn [_] - (.put (.-storage self) "events" initial-events))) + (#'agent-do/clj stored-events :keywordize-keys true)] - (is (= (:event-id initial-event) - (:event-id (first events)))) - (is (= "audit.log" (:type (last events))))) - (done))) + (agent-do/handle-fetch self + (json-request "http://db-sync.local/__session__/events" + "GET" + nil + {"x-user-id" "user-1"})))) + (.then (fn [events-resp] + (.then (clj (.get data "events.meta") :keywordize-keys true)] + (is (= (:event-id initial-event) + (:event-id (first events)))) + (is (= "audit.log" (:type (last events)))) + (is (= {:count 2} meta))) + (done))))) + (.catch (fn [error] + (is false (str "unexpected error: " error)) + (done)))))))) + +(deftest get-events-ignores-legacy-events-key-test + (testing "legacy `events` key should not be used when indexed storage metadata is missing" + (async done + (let [legacy-event {:event-id "legacy-e1" + :session-id "sess-legacy" + :type "agent.runtime" + :ts 1 + :data {:payload "legacy"}} + {:keys [self]} (make-limited-self #js {} 500000) + headers {"content-type" "application/json" + "x-user-id" "user-1"}] + (-> (.put (.-storage self) + "session" + (clj->js {:id "sess-legacy" + :status "running" + :task {} + :audit {} + :created-at 0 + :updated-at 0})) + (.then (fn [_] + ;; Seed only legacy key; new implementation should ignore it. + (.put (.-storage self) "events" (clj->js [legacy-event])))) + (.then (fn [_] + (agent-do/handle-fetch self + (json-request "http://db-sync.local/__session__/events" + "GET" + nil + headers)))) + (.then (fn [events-resp] + (.then ( (.put (.-storage self) "session" (clj->js {:id "sess-huge" @@ -492,21 +531,55 @@ :audit {} :created-at 0 :updated-at 0})) - (.then (fn [_] - (.put (.-storage self) "events" #js []))) (.then (fn [_] (#'agent-do/clj stored-events :keywordize-keys true) + (let [events (vec res) first-event (first events)] (is (= "agent.runtime" (:type first-event))) (is (= huge-payload (get-in first-event [:data :output]))) - (is (not (contains? (:data first-event) :truncated)))) + (is (not (contains? (:data first-event) :truncated))) + (is (= {:count 1} + (js->clj (.get data "events.meta") :keywordize-keys true)))) + (done))) + (.catch (fn [error] + (is false (str "unexpected error: " error)) + (done)))))))) + +(deftest append-event-avoids-sqlite-toobig-by-indexed-storage-test + (testing "large event history should persist without writing one oversized events blob" + (async done + (let [payload (apply str (repeat 1200 "z")) + {:keys [self data]} (make-limited-self #js {} 5000)] + (-> (.put (.-storage self) + "session" + (clj->js {:id "sess-many-events" + :status "running" + :task {} + :audit {} + :created-at 0 + :updated-at 0})) + (.then (fn [_] + (reduce (fn [promise idx] + (.then promise + (fn [_] + (#'agent-do/clj (.get data "events.meta") :keywordize-keys true)] + (is (= 8 (count events))) + (is (= {:count 8} meta)) + (is (nil? (.get data "events"))) + (is (some? (.get data "events.7")))) (done))) (.catch (fn [error] (is false (str "unexpected error: " error))