mirror of
https://github.com/logseq/logseq.git
synced 2026-05-25 05:04:24 +00:00
fix: sqlite toobig crash on /sessions/:id/messages
This commit is contained in:
@@ -27,6 +27,83 @@
|
||||
(defn- <storage-put! [storage key value]
|
||||
(.put storage key (clj->js value)))
|
||||
|
||||
(def ^:private default-max-events-storage-bytes (* 768 1024))
|
||||
(def ^:private default-max-events-retained 2000)
|
||||
(def ^:private default-max-event-data-bytes (* 64 1024))
|
||||
(def ^:private default-max-event-preview-chars 4096)
|
||||
|
||||
(defn- json-string [value]
|
||||
(try
|
||||
(js/JSON.stringify (clj->js value))
|
||||
(catch :default _
|
||||
nil)))
|
||||
|
||||
(defn- json-size [value]
|
||||
(some-> (json-string value) (.-length)))
|
||||
|
||||
(defn- truncate-string
|
||||
[s max-chars]
|
||||
(if (> (count s) max-chars)
|
||||
(str (subs s 0 max-chars)
|
||||
"...[truncated "
|
||||
(- (count s) max-chars)
|
||||
" chars]")
|
||||
s))
|
||||
|
||||
(defn- parse-positive-int
|
||||
[value]
|
||||
(let [n (js/parseInt (str value) 10)]
|
||||
(when (and (js/Number.isFinite n) (pos? n))
|
||||
n)))
|
||||
|
||||
(defn- storage-limits
|
||||
[^js self]
|
||||
(let [env (.-env self)]
|
||||
{:max-events-storage-bytes (or (some-> (aget env "AGENT_SESSION_EVENTS_MAX_BYTES")
|
||||
parse-positive-int)
|
||||
default-max-events-storage-bytes)
|
||||
:max-events-retained (or (some-> (aget env "AGENT_SESSION_EVENTS_MAX_RETAINED")
|
||||
parse-positive-int)
|
||||
default-max-events-retained)
|
||||
:max-event-data-bytes (or (some-> (aget env "AGENT_SESSION_EVENT_DATA_MAX_BYTES")
|
||||
parse-positive-int)
|
||||
default-max-event-data-bytes)
|
||||
:max-event-preview-chars (or (some-> (aget env "AGENT_SESSION_EVENT_PREVIEW_MAX_CHARS")
|
||||
parse-positive-int)
|
||||
default-max-event-preview-chars)}))
|
||||
|
||||
(defn- compact-event-data
|
||||
[data {:keys [max-event-data-bytes max-event-preview-chars]}]
|
||||
(let [size (json-size data)]
|
||||
(if (and (number? size) (> size max-event-data-bytes))
|
||||
(let [raw (or (json-string data) (str data))]
|
||||
{:truncated true
|
||||
:reason "event data too large"
|
||||
:size size
|
||||
:preview (truncate-string raw max-event-preview-chars)})
|
||||
data)))
|
||||
|
||||
(defn- compact-event
|
||||
[event limits]
|
||||
(update event :data compact-event-data limits))
|
||||
|
||||
(defn- events-for-storage
|
||||
[events {:keys [max-events-storage-bytes max-events-retained] :as limits}]
|
||||
(let [events (->> events
|
||||
(map #(compact-event % limits))
|
||||
(take-last max-events-retained)
|
||||
vec)]
|
||||
(loop [idx (dec (count events))
|
||||
acc '()
|
||||
total-bytes 2] ; []
|
||||
(if (neg? idx)
|
||||
(vec acc)
|
||||
(let [event (nth events idx)
|
||||
event-size (or (json-size event) max-events-storage-bytes)]
|
||||
(if (> (+ total-bytes event-size) max-events-storage-bytes)
|
||||
(recur (dec idx) acc total-bytes)
|
||||
(recur (dec idx) (conj acc event) (+ total-bytes event-size))))))))
|
||||
|
||||
(defn- <get-session [^js self]
|
||||
(<storage-get (.-storage self) "session"))
|
||||
|
||||
@@ -38,7 +115,7 @@
|
||||
(<storage-put! (.-storage self) "session" session))
|
||||
|
||||
(defn- <put-events! [^js self events]
|
||||
(<storage-put! (.-storage self) "events" events))
|
||||
(<storage-put! (.-storage self) "events" (events-for-storage events (storage-limits self))))
|
||||
|
||||
(defn- <save-session! [^js self session]
|
||||
(p/let [_ (<put-session! self session)]
|
||||
|
||||
@@ -11,11 +11,30 @@
|
||||
(.set data k v)
|
||||
(js/Promise.resolve nil))}))
|
||||
|
||||
(defn- make-limited-agent-storage [max-bytes]
|
||||
(let [data (js/Map.)]
|
||||
{:data data
|
||||
:storage #js {:get (fn [k]
|
||||
(js/Promise.resolve (.get data k)))
|
||||
:put (fn [k v]
|
||||
(if (> (.-length (js/JSON.stringify v)) max-bytes)
|
||||
(js/Promise.reject (js/Error. "string or blob too big: SQLITE_TOOBIG Error"))
|
||||
(do
|
||||
(.set data k v)
|
||||
(js/Promise.resolve nil))))}}))
|
||||
|
||||
(defn- make-self [env]
|
||||
#js {:env env
|
||||
:storage (make-agent-storage)
|
||||
:streams (js/Map.)})
|
||||
|
||||
(defn- make-limited-self [env max-bytes]
|
||||
(let [{:keys [data storage]} (make-limited-agent-storage max-bytes)]
|
||||
{:self #js {:env env
|
||||
:storage storage
|
||||
:streams (js/Map.)}
|
||||
:data data}))
|
||||
|
||||
(defn- json-request
|
||||
[url method body headers]
|
||||
(let [^js req-headers (js/Headers.)]
|
||||
@@ -30,6 +49,9 @@
|
||||
(defn- <json [^js resp]
|
||||
(.then (.json resp) #(js->clj % :keywordize-keys true)))
|
||||
|
||||
(defn- json-size [value]
|
||||
(.-length (js/JSON.stringify value)))
|
||||
|
||||
(deftest messages-use-single-events-stream-and-dont-duplicate-user-message-test
|
||||
(testing "session messages post to /messages while keeping one /events/sse stream and no audit message payload"
|
||||
(async done
|
||||
@@ -414,3 +436,79 @@
|
||||
(set! js/fetch original-fetch)
|
||||
(is false (str "unexpected error: " error))
|
||||
(done))))))))
|
||||
|
||||
(deftest messages-persists-when-existing-events-near-storage-limit-test
|
||||
(testing "message append should succeed and keep persisted events under size limits"
|
||||
(async done
|
||||
(let [payload (apply str (repeat 1500 "x"))
|
||||
initial-events (clj->js [{:event-id "e1"
|
||||
:session-id "sess-cap"
|
||||
:type "agent.runtime"
|
||||
:ts 1
|
||||
:data {:payload payload}}])
|
||||
cap (+ (json-size initial-events) 16)
|
||||
{:keys [self data]} (make-limited-self #js {"AGENT_SESSION_EVENTS_MAX_BYTES" (str cap)} cap)
|
||||
headers {"content-type" "application/json"
|
||||
"x-user-id" "user-1"}]
|
||||
(-> (.put (.-storage self)
|
||||
"session"
|
||||
(clj->js {:id "sess-cap"
|
||||
:status "running"
|
||||
:task {}
|
||||
:audit {}
|
||||
:created-at 0
|
||||
:updated-at 0}))
|
||||
(.then (fn [_]
|
||||
(.put (.-storage self) "events" initial-events)))
|
||||
(.then (fn [_]
|
||||
(agent-do/handle-fetch self
|
||||
(json-request "http://db-sync.local/__session__/messages"
|
||||
"POST"
|
||||
{:message "hello"}
|
||||
headers))))
|
||||
(.then (fn [resp]
|
||||
(is (= 200 (.-status resp)))
|
||||
(let [stored-events (.get data "events")
|
||||
events (js->clj stored-events :keywordize-keys true)]
|
||||
(is (<= (json-size stored-events) cap))
|
||||
(is (= "audit.log" (:type (last events)))))
|
||||
(done)))
|
||||
(.catch (fn [error]
|
||||
(is false (str "unexpected error: " error))
|
||||
(done))))))))
|
||||
|
||||
(deftest append-event-truncates-oversized-event-data-test
|
||||
(testing "single oversized event payload should be persisted in bounded form"
|
||||
(async done
|
||||
(let [cap 1800
|
||||
huge-payload (apply str (repeat 5000 "y"))
|
||||
{:keys [self data]} (make-limited-self #js {"AGENT_SESSION_EVENTS_MAX_BYTES" (str cap)
|
||||
"AGENT_SESSION_EVENT_DATA_MAX_BYTES" "512"
|
||||
"AGENT_SESSION_EVENT_PREVIEW_MAX_CHARS" "120"} cap)]
|
||||
(-> (.put (.-storage self)
|
||||
"session"
|
||||
(clj->js {:id "sess-huge"
|
||||
:status "running"
|
||||
:task {}
|
||||
:audit {}
|
||||
:created-at 0
|
||||
:updated-at 0}))
|
||||
(.then (fn [_]
|
||||
(.put (.-storage self) "events" #js [])))
|
||||
(.then (fn [_]
|
||||
(#'agent-do/<append-event! self {:type "agent.runtime"
|
||||
:data {:output huge-payload}
|
||||
:ts 2})))
|
||||
(.then (fn [res]
|
||||
(is (map? res))
|
||||
(is (not= :missing-session (:error res)))
|
||||
(let [stored-events (.get data "events")
|
||||
events (js->clj stored-events :keywordize-keys true)
|
||||
first-event (first events)]
|
||||
(is (= "agent.runtime" (:type first-event)))
|
||||
(is (<= (json-size stored-events) cap))
|
||||
(is (contains? (:data first-event) :truncated)))
|
||||
(done)))
|
||||
(.catch (fn [error]
|
||||
(is false (str "unexpected error: " error))
|
||||
(done))))))))
|
||||
|
||||
Reference in New Issue
Block a user