append only events

This commit is contained in:
Tienson Qin
2026-02-07 23:15:02 +08:00
parent d45a7d7ba4
commit 7a0ee16d97
2 changed files with 157 additions and 41 deletions

View File

@@ -27,18 +27,63 @@
(defn- <storage-put! [storage key value]
(.put storage key (clj->js value)))
(def ^:private events-meta-key "events.meta")
(defn- events-item-key [idx]
(str "events." idx))
(defn- valid-events-meta? [meta]
(and (map? meta)
(integer? (:count meta))
(<= 0 (:count meta))))
(defn- <get-events-meta [^js self]
(p/let [meta (<storage-get (.-storage self) events-meta-key)]
(when (valid-events-meta? meta)
meta)))
(defn- <put-events-meta! [^js self count]
(<storage-put! (.-storage self) events-meta-key {:count count}))
(defn- <persist-events! [^js self events]
(let [events (vec events)
storage (.-storage self)]
(p/let [_ (p/all (map-indexed (fn [idx event]
(<storage-put! storage (events-item-key idx) event))
events))
_ (<put-events-meta! self (count events))]
nil)))
(defn- <append-event-storage! [^js self event]
(p/let [meta (<get-events-meta self)
meta (or meta {:count 0})
idx (:count meta)
_ (<storage-put! (.-storage self) (events-item-key idx) event)
_ (<put-events-meta! self (inc idx))]
nil))
(defn- <get-session [^js self]
(<storage-get (.-storage self) "session"))
(defn- <get-events [^js self]
(p/let [events (<storage-get (.-storage self) "events")]
(vec (or events []))))
(p/let [meta (<get-events-meta self)]
(if meta
(let [count (:count meta)]
(if (zero? count)
[]
(p/let [events (p/all (map (fn [idx]
(<storage-get (.-storage self) (events-item-key idx)))
(range count)))]
(->> events
(remove nil?)
vec))))
[])))
(defn- <put-session! [^js self session]
(<storage-put! (.-storage self) "session" session))
(defn- <put-events! [^js self events]
(<storage-put! (.-storage self) "events" (vec events)))
(<persist-events! self events))
(defn- <save-session! [^js self session]
(p/let [_ (<put-session! self session)]
@@ -60,13 +105,12 @@
(.delete streams key)))))))))
(defn- <append-event! [^js self event-opts]
(p/let [session (<get-session self)
events (<get-events self)]
(p/let [session (<get-session self)]
(if (nil? session)
{:error :missing-session}
(let [[session events event] (session/append-event session events event-opts)]
(p/let [_ (<put-session! self session)
_ (<put-events! self events)]
(let [[session _ event] (session/append-event session [] event-opts)]
(p/let [_ (<append-event-storage! self event)
_ (<put-session! self session)]
(broadcast-event! self event)
{:session session :event event})))))
@@ -202,8 +246,7 @@
(let [provider (runtime-provider/resolve-provider (.-env self) nil)
provider-kind (runtime-provider/provider-id provider)]
(p/let [runtime (runtime-provider/<provision-runtime! provider session-id task)
session (<get-session self)
events (<get-events self)]
session (<get-session self)]
(cond
(nil? runtime)
(throw (ex-info "runtime provisioning returned nil"
@@ -215,15 +258,15 @@
:else
(let [session (assoc session :runtime runtime)
[session events _event] (session/append-event session events {:type "session.provisioned"
:data {:provider (:provider runtime)
:runtime-session-id (:session-id runtime)
:sandbox-id (:sandbox-id runtime)
:sandbox-name (:sandbox-name runtime)
:sprite-name (:sprite-name runtime)}
:ts (common/now-ms)})]
(p/let [_ (<put-session! self session)
_ (<put-events! self events)]
[session _ event] (session/append-event session [] {:type "session.provisioned"
:data {:provider (:provider runtime)
:runtime-session-id (:session-id runtime)
:sandbox-id (:sandbox-id runtime)
:sandbox-name (:sandbox-name runtime)
:sprite-name (:sprite-name runtime)}
:ts (common/now-ms)})]
(p/let [_ (<append-event-storage! self event)
_ (<put-session! self session)]
(when-not (terminal-status? (:status session))
(start-runtime-events-stream-background! self (:id session) runtime))
runtime))))))

View File

@@ -49,9 +49,6 @@
(defn- <json [^js resp]
(.then (.json resp) #(js->clj % :keywordize-keys true)))
(defn- json-size [value]
(.-length (js/JSON.stringify value)))
(deftest messages-use-single-events-stream-and-dont-duplicate-user-message-test
(testing "session messages post to /messages while keeping one /events/sse stream and no audit message payload"
(async done
@@ -323,8 +320,6 @@
:audit {}
:created-at 0
:updated-at 0}))
(.then (fn [_]
(.put (.-storage self) "events" #js [])))
(.then (fn [_]
(agent-do/handle-fetch self
(json-request "http://db-sync.local/__session__/stream"
@@ -446,7 +441,6 @@
:type "agent.runtime"
:ts 1
:data {:payload payload}}
initial-events (clj->js [initial-event])
{:keys [self data]} (make-limited-self #js {"AGENT_SESSION_EVENTS_MAX_BYTES" "256"} 500000)
headers {"content-type" "application/json"
"x-user-id" "user-1"}]
@@ -459,7 +453,7 @@
:created-at 0
:updated-at 0}))
(.then (fn [_]
(.put (.-storage self) "events" initial-events)))
(#'agent-do/<put-events! self [initial-event])))
(.then (fn [_]
(agent-do/handle-fetch self
(json-request "http://db-sync.local/__session__/messages"
@@ -468,12 +462,58 @@
headers))))
(.then (fn [resp]
(is (= 200 (.-status resp)))
(let [stored-events (.get data "events")
events (js->clj stored-events :keywordize-keys true)]
(is (= (:event-id initial-event)
(:event-id (first events))))
(is (= "audit.log" (:type (last events)))))
(done)))
(agent-do/handle-fetch self
(json-request "http://db-sync.local/__session__/events"
"GET"
nil
{"x-user-id" "user-1"}))))
(.then (fn [events-resp]
(.then (<json events-resp)
(fn [body]
(let [events (:events body)
meta (js->clj (.get data "events.meta") :keywordize-keys true)]
(is (= (:event-id initial-event)
(:event-id (first events))))
(is (= "audit.log" (:type (last events))))
(is (= {:count 2} meta)))
(done)))))
(.catch (fn [error]
(is false (str "unexpected error: " error))
(done))))))))
(deftest get-events-ignores-legacy-events-key-test
(testing "legacy `events` key should not be used when indexed storage metadata is missing"
(async done
(let [legacy-event {:event-id "legacy-e1"
:session-id "sess-legacy"
:type "agent.runtime"
:ts 1
:data {:payload "legacy"}}
{:keys [self]} (make-limited-self #js {} 500000)
headers {"content-type" "application/json"
"x-user-id" "user-1"}]
(-> (.put (.-storage self)
"session"
(clj->js {:id "sess-legacy"
:status "running"
:task {}
:audit {}
:created-at 0
:updated-at 0}))
(.then (fn [_]
;; Seed only legacy key; new implementation should ignore it.
(.put (.-storage self) "events" (clj->js [legacy-event]))))
(.then (fn [_]
(agent-do/handle-fetch self
(json-request "http://db-sync.local/__session__/events"
"GET"
nil
headers))))
(.then (fn [events-resp]
(.then (<json events-resp)
(fn [body]
(is (= [] (:events body)))
(done)))))
(.catch (fn [error]
(is false (str "unexpected error: " error))
(done))))))))
@@ -482,8 +522,7 @@
(testing "single oversized event payload should be persisted fully without truncation"
(async done
(let [huge-payload (apply str (repeat 5000 "y"))
{:keys [self data]} (make-limited-self #js {"AGENT_SESSION_EVENT_DATA_MAX_BYTES" "512"
"AGENT_SESSION_EVENT_PREVIEW_MAX_CHARS" "120"} 500000)]
{:keys [self data]} (make-limited-self #js {} 500000)]
(-> (.put (.-storage self)
"session"
(clj->js {:id "sess-huge"
@@ -492,21 +531,55 @@
:audit {}
:created-at 0
:updated-at 0}))
(.then (fn [_]
(.put (.-storage self) "events" #js [])))
(.then (fn [_]
(#'agent-do/<append-event! self {:type "agent.runtime"
:data {:output huge-payload}
:ts 2})))
(.then (fn [_]
(#'agent-do/<get-events self)))
(.then (fn [res]
(is (map? res))
(is (not= :missing-session (:error res)))
(let [stored-events (.get data "events")
events (js->clj stored-events :keywordize-keys true)
(let [events (vec res)
first-event (first events)]
(is (= "agent.runtime" (:type first-event)))
(is (= huge-payload (get-in first-event [:data :output])))
(is (not (contains? (:data first-event) :truncated))))
(is (not (contains? (:data first-event) :truncated)))
(is (= {:count 1}
(js->clj (.get data "events.meta") :keywordize-keys true))))
(done)))
(.catch (fn [error]
(is false (str "unexpected error: " error))
(done))))))))
(deftest append-event-avoids-sqlite-toobig-by-indexed-storage-test
(testing "large event history should persist without writing one oversized events blob"
(async done
(let [payload (apply str (repeat 1200 "z"))
{:keys [self data]} (make-limited-self #js {} 5000)]
(-> (.put (.-storage self)
"session"
(clj->js {:id "sess-many-events"
:status "running"
:task {}
:audit {}
:created-at 0
:updated-at 0}))
(.then (fn [_]
(reduce (fn [promise idx]
(.then promise
(fn [_]
(#'agent-do/<append-event! self {:type "agent.runtime"
:data {:payload payload :idx idx}
:ts idx}))))
(js/Promise.resolve nil)
(range 8))))
(.then (fn [_]
(#'agent-do/<get-events self)))
(.then (fn [events]
(let [meta (js->clj (.get data "events.meta") :keywordize-keys true)]
(is (= 8 (count events)))
(is (= {:count 8} meta))
(is (nil? (.get data "events")))
(is (some? (.get data "events.7"))))
(done)))
(.catch (fn [error]
(is false (str "unexpected error: " error))