diff --git a/deps/db-sync/src/logseq/db_sync/worker/agent/do.cljs b/deps/db-sync/src/logseq/db_sync/worker/agent/do.cljs
index df225f9d18..e0f35d2baa 100644
--- a/deps/db-sync/src/logseq/db_sync/worker/agent/do.cljs
+++ b/deps/db-sync/src/logseq/db_sync/worker/agent/do.cljs
@@ -27,6 +27,113 @@ (defn- js value)))
+;; Default limits used by `storage-limits` when the matching env key is
+;; missing or is not a positive integer.
+(def ^:private default-max-events-storage-bytes (* 768 1024))
+(def ^:private default-max-events-retained 2000)
+(def ^:private default-max-event-data-bytes (* 64 1024))
+(def ^:private default-max-event-preview-chars 4096)
+
+(defn- json-string
+  "Serializes `value` (converted with clj->js) to a JSON string.
+  Returns nil when serialization throws."
+  [value]
+  (try
+    (js/JSON.stringify (clj->js value))
+    (catch :default _
+      nil)))
+
+(defn- json-size
+  "Length of the JSON text for `value`, or nil when it cannot be serialized.
+  NOTE(review): this is the JS string length (UTF-16 code units) while the
+  limits are named in bytes; confirm whether non-ASCII payloads need a byte count."
+  [value]
+  (some-> (json-string value) (.-length)))
+
+(defn- truncate-string
+  "Returns `s` unchanged when it has at most `max-chars` characters; otherwise
+  its first `max-chars` characters followed by a marker with the dropped count."
+  [s max-chars]
+  (if (> (count s) max-chars)
+    (str (subs s 0 max-chars)
+         "...[truncated "
+         (- (count s) max-chars)
+         " chars]")
+    s))
+
+(defn- parse-positive-int
+  "Parses `value` as a base-10 integer and returns it only when it is finite
+  and greater than zero; otherwise returns nil."
+  [value]
+  (let [n (js/parseInt (str value) 10)]
+    (when (and (js/Number.isFinite n) (pos? n))
+      n)))
+
+(defn- storage-limits
+  "Builds the limits map from `(.-env self)`, using the default for any env
+  key that is absent or does not parse as a positive integer."
+  [^js self]
+  (let [env (.-env self)]
+    {:max-events-storage-bytes (or (some-> (aget env "AGENT_SESSION_EVENTS_MAX_BYTES")
+                                           parse-positive-int)
+                                   default-max-events-storage-bytes)
+     :max-events-retained (or (some-> (aget env "AGENT_SESSION_EVENTS_MAX_RETAINED")
+                                      parse-positive-int)
+                              default-max-events-retained)
+     :max-event-data-bytes (or (some-> (aget env "AGENT_SESSION_EVENT_DATA_MAX_BYTES")
+                                       parse-positive-int)
+                               default-max-event-data-bytes)
+     :max-event-preview-chars (or (some-> (aget env "AGENT_SESSION_EVENT_PREVIEW_MAX_CHARS")
+                                          parse-positive-int)
+                                  default-max-event-preview-chars)}))
+
+(defn- compact-event-data
+  "Returns `data` unchanged unless its JSON text is longer than
+  `max-event-data-bytes`; in that case returns a placeholder map holding the
+  original size and a preview cut to `max-event-preview-chars`.
+  Data that cannot be serialized is returned unchanged."
+  [data {:keys [max-event-data-bytes max-event-preview-chars]}]
+  ;; Serialize once and reuse the text for both the size check and the preview.
+  (let [raw (json-string data)
+        size (some-> raw (.-length))]
+    (if (and (number? size) (> size max-event-data-bytes))
+      {:truncated true
+       :reason "event data too large"
+       :size size
+       :preview (truncate-string raw max-event-preview-chars)}
+      data)))
+
+(defn- compact-event
+  "Applies `compact-event-data` to the :data of `event`."
+  [event limits]
+  (update event :data compact-event-data limits))
+
+(defn- events-for-storage
+  "Compacts each event, keeps at most the newest `max-events-retained`, then
+  walks them newest-first and keeps every event that still fits while the
+  serialized JSON array stays within `max-events-storage-bytes`. An event
+  that does not fit is skipped and older events are still considered.
+  Returns the kept events as a vector in their original order."
+  [events {:keys [max-events-storage-bytes max-events-retained] :as limits}]
+  (let [events (->> events
+                    (map #(compact-event % limits))
+                    (take-last max-events-retained)
+                    vec)]
+    (loop [idx (dec (count events))
+           acc '()
+           total-bytes 2] ; the enclosing "[]"
+      (if (neg? idx)
+        (vec acc)
+        (let [event (nth events idx)
+              ;; Unserializable events are costed at the whole budget, so they never fit.
+              event-size (or (json-size event) max-events-storage-bytes)
+              ;; Every element after the first adds a "," to the serialized array.
+              separator (if (empty? acc) 0 1)
+              next-total (+ total-bytes separator event-size)]
+          (if (> next-total max-events-storage-bytes)
+            (recur (dec idx) acc total-bytes)
+            (recur (dec idx) (conj acc event) next-total)))))))
+
 (defn- (.-length (js/JSON.stringify v)) max-bytes)
+              (js/Promise.reject (js/Error. "string or blob too big: SQLITE_TOOBIG Error"))
+              (do
+                (.set data k v)
+                (js/Promise.resolve nil))))}}))
+
 (defn- make-self [env]
   #js {:env env
        :storage (make-agent-storage)
        :streams (js/Map.)})
 
+(defn- make-limited-self [env max-bytes]
+  (let [{:keys [data storage]} (make-limited-agent-storage max-bytes)]
+    {:self #js {:env env
+                :storage storage
+                :streams (js/Map.)}
+     :data data}))
+
 (defn- json-request [url method body headers]
   (let [^js req-headers (js/Headers.)]
@@ -30,6 +49,9 @@ (defn- clj % :keywordize-keys true)))
 
+(defn- json-size [value]
+  (.-length (js/JSON.stringify value)))
+
 (deftest messages-use-single-events-stream-and-dont-duplicate-user-message-test
   (testing "session messages post to /messages while keeping one /events/sse stream and no audit message payload"
     (async done
@@ -414,3 +436,79 @@
                    (set! js/fetch original-fetch)
                    (is false (str "unexpected error: " error))
                    (done))))))))
+
+(deftest messages-persists-when-existing-events-near-storage-limit-test
+  (testing "message append should succeed and keep persisted events under size limits"
+    (async done
+           (let [payload (apply str (repeat 1500 "x"))
+                 initial-events (clj->js [{:event-id "e1"
+                                           :session-id "sess-cap"
+                                           :type "agent.runtime"
+                                           :ts 1
+                                           :data {:payload payload}}])
+                 cap (+ (json-size initial-events) 16)
+                 {:keys [self data]} (make-limited-self #js {"AGENT_SESSION_EVENTS_MAX_BYTES" (str cap)} cap)
+                 headers {"content-type" "application/json"
+                          "x-user-id" "user-1"}]
+             (-> (.put (.-storage self)
+                       "session"
+                       (clj->js {:id "sess-cap"
+                                 :status "running"
+                                 :task {}
+                                 :audit {}
+                                 :created-at 0
+                                 :updated-at 0}))
+                 (.then (fn [_]
+                          (.put (.-storage self) "events" initial-events)))
+                 (.then (fn [_]
+                          (agent-do/handle-fetch self
+                                                 (json-request "http://db-sync.local/__session__/messages"
+                                                               "POST"
+                                                               {:message "hello"}
+                                                               headers))))
+                 (.then (fn [resp]
+                          (is (= 200 (.-status resp)))
+                          (let [stored-events (.get data "events")
+                                events (js->clj stored-events :keywordize-keys true)]
+                            (is (<= (json-size stored-events) cap))
+                            (is (= "audit.log" (:type (last events)))))
+                          (done)))
+                 (.catch (fn [error]
+                           (is false (str "unexpected error: " error))
+                           (done))))))))
+
+(deftest append-event-truncates-oversized-event-data-test
+  (testing "single oversized event payload should be persisted in bounded form"
+    (async done
+           (let [cap 1800
+                 huge-payload (apply str (repeat 5000 "y"))
+                 {:keys [self data]} (make-limited-self #js {"AGENT_SESSION_EVENTS_MAX_BYTES" (str cap)
+                                                             "AGENT_SESSION_EVENT_DATA_MAX_BYTES" "512"
+                                                             "AGENT_SESSION_EVENT_PREVIEW_MAX_CHARS" "120"} cap)]
+             (-> (.put (.-storage self)
+                       "session"
+                       (clj->js {:id "sess-huge"
+                                 :status "running"
+                                 :task {}
+                                 :audit {}
+                                 :created-at 0
+                                 :updated-at 0}))
+                 (.then (fn [_]
+                          (.put (.-storage self) "events" #js [])))
+                 (.then (fn [_]
+                          (#'agent-do/clj stored-events :keywordize-keys true)
+                                first-event (first events)]
+                            (is (= "agent.runtime" (:type first-event)))
+                            (is (<= (json-size stored-events) cap))
+                            (is (contains? (:data first-event) :truncated)))
+                          (done)))
+                 (.catch (fn [error]
+                           (is false (str "unexpected error: " error))
+                           (done))))))))