This commit is contained in:
Tienson Qin
2026-02-02 00:09:30 +08:00
parent b1e59fd3d7
commit 4ddce67288
11 changed files with 550 additions and 3 deletions

View File

@@ -6,7 +6,8 @@
{:db-sync {:target :esm
:output-dir "worker/dist/worker"
:modules {:main {:exports {default logseq.db-sync.worker/worker
SyncDO logseq.db-sync.worker/SyncDO}}}
SyncDO logseq.db-sync.worker/SyncDO
AgentSessionDO logseq.db-sync.worker/AgentSessionDO}}}
:js-options {:js-provider :import}
:closure-defines {shadow.cljs.devtools.client.env/enabled false}
:devtools {:enabled false}}

View File

@@ -217,6 +217,44 @@
:any
http-error-response-schema])
(def agent-task-source-schema
[:map
[:node-id :string]
[:node-title :string]
[:node-revision :any]
[:snapshot {:optional true} :any]])
(def agent-task-schema
[:map
[:id :string]
[:source agent-task-source-schema]
[:intent {:optional true} :map]
[:agent {:optional true} :map]
[:execution {:optional true} :map]
[:audit {:optional true} :map]])
(def sessions-create-request-schema agent-task-schema)
(def sessions-message-request-schema
[:map
[:message :string]
[:kind {:optional true} :string]])
(def sessions-create-response-schema
[:map
[:session-id :string]
[:status :string]
[:stream-url :string]])
(def sessions-get-response-schema
[:map
[:session-id :string]
[:status :string]
[:task :map]
[:audit :map]
[:created-at :int]
[:updated-at :int]])
(def http-request-schemas
{:graphs/create graph-create-request-schema
:graph-members/create graph-member-create-request-schema
@@ -224,7 +262,9 @@
:sync/tx-batch tx-batch-request-schema
:e2ee/user-keys e2ee-user-key-request-schema
:e2ee/graph-aes-key e2ee-graph-aes-key-request-schema
:e2ee/grant-access e2ee-grant-access-request-schema})
:e2ee/grant-access e2ee-grant-access-request-schema
:sessions/create sessions-create-request-schema
:sessions/message sessions-message-request-schema})
(def http-response-schemas
{:graphs/list graphs-list-response-schema
@@ -249,6 +289,10 @@
:assets/get asset-get-response-schema
:assets/put http-ok-response-schema
:assets/delete http-ok-response-schema
:sessions/create sessions-create-response-schema
:sessions/get sessions-get-response-schema
:sessions/message http-ok-response-schema
:sessions/cancel http-ok-response-schema
:error http-error-response-schema})
(def ^:private json-transformer

View File

@@ -3,6 +3,7 @@
[lambdaisland.glogi :as log]
[logseq.db-sync.common :as common]
[logseq.db-sync.logging :as logging]
[logseq.db-sync.worker.agent.do :as agent-do]
[logseq.db-sync.worker.dispatch :as dispatch]
[logseq.db-sync.worker.handler.sync :as sync-handler]
[logseq.db-sync.worker.handler.ws :as ws-handler]
@@ -55,3 +56,17 @@
(presence/remove-presence! this ws)
(presence/broadcast-online-users! this)
(log/error :db-sync/ws-error {:error error})))
(defclass AgentSessionDO
(extends DurableObject)
(constructor [this ^js state env]
(super state env)
(set! (.-state this) state)
(set! (.-env this) env)
(set! (.-storage this) (.-storage state))
(set! (.-streams this) (js/Map.)))
Object
(fetch [this request]
(agent-do/handle-fetch this request)))

View File

@@ -0,0 +1,199 @@
(ns logseq.db-sync.worker.agent.do
(:require [lambdaisland.glogi :as log]
[logseq.db-sync.common :as common]
[logseq.db-sync.platform.core :as platform]
[logseq.db-sync.worker.agent.session :as session]
[logseq.db-sync.worker.http :as http]
[promesa.core :as p]))
(defn- header [request name]
(.get (.-headers request) name))
(defn- user-id-from-request [request]
(header request "x-user-id"))
(defn- sse-encode [event]
(str "data: " (js/JSON.stringify (clj->js event)) "\n\n"))
(defn- <storage-get [storage key]
(p/let [value (.get storage key)]
(when value (js->clj value :keywordize-keys true))))
(defn- <storage-put! [storage key value]
(.put storage key (clj->js value)))
(defn- <get-session [^js self]
(<storage-get (.-storage self) "session"))
(defn- <get-events [^js self]
(p/let [events (<storage-get (.-storage self) "events")]
(vec (or events []))))
(defn- <put-session! [^js self session]
(<storage-put! (.-storage self) "session" session))
(defn- <put-events! [^js self events]
(<storage-put! (.-storage self) "events" events))
(defn- stream-url [request session-id]
(let [base (or (header request "x-stream-base")
(.-origin (platform/request-url request)))]
(str base "/sessions/" session-id "/stream")))
(defn- broadcast-event! [^js self event]
(let [streams (.-streams self)]
(when streams
(.forEach streams
(fn [writer key]
(-> (.write writer (sse-encode event))
(.catch (fn [_]
(.delete streams key)))))))))
(defn- <append-event! [^js self event-opts]
(p/let [session (<get-session self)
events (<get-events self)]
(if (nil? session)
{:error :missing-session}
(let [[session events event] (session/append-event session events event-opts)]
(p/let [_ (<put-session! self session)
_ (<put-events! self events)]
(broadcast-event! self event)
{:session session :event event})))))
(defn- handle-init [^js self request]
(p/let [existing (<get-session self)]
(if existing
(let [session-id (:id existing)]
(http/json-response :sessions/create
{:session-id session-id
:status (:status existing)
:stream-url (stream-url request session-id)}))
(.then (common/read-json request)
(fn [result]
(if (nil? result)
(http/bad-request "missing body")
(let [task (js->clj result :keywordize-keys true)
task-id (:id task)
user-id (user-id-from-request request)
now (common/now-ms)
audit-default {:requested-by user-id
:requested-at now}
audit (merge audit-default (:audit task))]
(cond
(not (string? user-id))
(http/unauthorized)
(not (string? task-id))
(http/bad-request "invalid session id")
(and (string? (:requested-by audit))
(not= (:requested-by audit) user-id))
(http/forbidden)
:else
(let [session (session/initial-session task audit now)
[session events _event] (session/append-event session [] {:type "session.created" :data {:requested-by user-id} :ts now})]
(p/let [_ (<put-session! self session)
_ (<put-events! self events)]
(http/json-response :sessions/create
{:session-id task-id
:status (:status session)
:stream-url (stream-url request task-id)}))))))))))
(defn- handle-status [^js self _request]
(p/let [session (<get-session self)]
(if (nil? session)
(http/not-found)
(http/json-response :sessions/get
{:session-id (:id session)
:status (:status session)
:task (:task session)
:audit (:audit session)
:created-at (:created-at session)
:updated-at (:updated-at session)}))))
(defn- handle-messages [^js self request]
(.then (common/read-json request)
(fn [result]
(if (nil? result)
(http/bad-request "missing body")
(let [body (js->clj result :keywordize-keys true)
message (:message body)
user-id (user-id-from-request request)]
(cond
(not (string? user-id))
(http/unauthorized)
(not (string? message))
(http/bad-request "invalid message")
:else
(p/let [res (<append-event! self {:type "audit.log"
:data {:message message
:kind (:kind body)
:by user-id}})]
(if (= (:error res) :missing-session)
(http/not-found)
(http/json-response :sessions/message {:ok true})))))))))
(defn- handle-cancel [^js self request]
(let [user-id (user-id-from-request request)]
(if-not (string? user-id)
(http/unauthorized)
(p/let [res (<append-event! self {:type "session.canceled"
:data {:by user-id}})]
(if (= (:error res) :missing-session)
(http/not-found)
(http/json-response :sessions/cancel {:ok true}))))))
(defn- handle-stream [^js self request]
(let [streams (.-streams self)
stream (js/TransformStream.)
writer (.getWriter (.-writable stream))
stream-id (str (random-uuid))
cleanup (fn []
(.delete streams stream-id)
(.close writer))]
(.set streams stream-id writer)
(.addEventListener (.-signal request) "abort" cleanup)
(p/let [events (<get-events self)]
(doseq [event events]
(.write writer (sse-encode event))))
(js/Response.
(.-readable stream)
#js {:status 200
:headers (js/Object.assign
#js {"content-type" "text/event-stream"
"cache-control" "no-cache"
"connection" "keep-alive"}
(common/cors-headers))})))
(defn handle-fetch [^js self request]
(let [url (platform/request-url request)
path (.-pathname url)
method (.-method request)]
(try
(cond
(contains? #{"OPTIONS" "HEAD"} method)
(common/options-response)
(= path "/__session__/init")
(handle-init self request)
(= path "/__session__/status")
(handle-status self request)
(= path "/__session__/messages")
(handle-messages self request)
(= path "/__session__/cancel")
(handle-cancel self request)
(= path "/__session__/stream")
(handle-stream self request)
:else
(http/not-found))
(catch :default error
(log/error :agent/session-do-error error)
(http/error-response "server error" 500)))))

View File

@@ -0,0 +1,32 @@
(ns logseq.db-sync.worker.agent.session
(:require [logseq.db-sync.common :as common]))
(defn initial-session [task audit now]
{:id (:id task)
:status "created"
:task task
:audit audit
:created-at now
:updated-at now})
(defn- status-from-event [event-type]
(case event-type
"session.running" "running"
"session.paused" "paused"
"session.completed" "completed"
"session.failed" "failed"
"session.canceled" "canceled"
nil))
(defn append-event [session events {:keys [type data event-id ts]}]
(let [event-id (or event-id (str (random-uuid)))
ts (or ts (common/now-ms))
event {:event-id event-id
:session-id (:id session)
:type type
:ts ts
:data data}
next-status (or (status-from-event type) (:status session))
updated (assoc session :status next-status :updated-at ts)
events (conj (vec events) event)]
[updated events event]))

View File

@@ -2,6 +2,7 @@
(:require [clojure.string :as string]
[logseq.db-sync.common :as common]
[logseq.db-sync.platform.core :as platform]
[logseq.db-sync.worker.handler.agent :as agent-handler]
[logseq.db-sync.worker.handler.assets :as assets-handler]
[logseq.db-sync.worker.handler.index :as index-handler]
[logseq.db-sync.worker.http :as http]
@@ -22,6 +23,9 @@
(string/starts-with? path "/e2ee")
(index-handler/handle-fetch #js {:env env :d1 (aget env "DB")} request)
(string/starts-with? path "/sessions")
(agent-handler/handle-fetch #js {:env env} request)
(string/starts-with? path "/assets/")
(if (= method "OPTIONS")
(assets-handler/handle request env)

View File

@@ -0,0 +1,160 @@
(ns logseq.db-sync.worker.handler.agent
(:require [lambdaisland.glogi :as log]
[logseq.db-sync.common :as common]
[logseq.db-sync.platform.core :as platform]
[logseq.db-sync.worker.auth :as auth]
[logseq.db-sync.worker.http :as http]
[logseq.db-sync.worker.routes.index :as routes]
[promesa.core :as p]))
(defn- session-namespace [^js env]
(.-LOGSEQ_AGENT_SESSION_DO env))
(defn- session-stub [^js env session-id]
(when-let [^js namespace (session-namespace env)]
(let [do-id (.idFromName namespace session-id)]
(.get namespace do-id))))
(defn- base-headers [request claims]
(let [headers (js/Headers.)
token (.get (.-headers request) "authorization")
user-id (aget claims "sub")
email (aget claims "email")
username (aget claims "username")
idempotency-key (.get (.-headers request) "idempotency-key")]
(.set headers "content-type" "application/json")
(when (string? token)
(.set headers "authorization" token))
(when (string? user-id)
(.set headers "x-user-id" user-id))
(when (string? email)
(.set headers "x-user-email" email))
(when (string? username)
(.set headers "x-user-username" username))
(when (string? idempotency-key)
(.set headers "idempotency-key" idempotency-key))
headers))
(defn- forward-request [^js stub url method headers body]
(let [init (cond-> {:method method :headers headers}
(some? body)
(assoc :body body))]
(.fetch stub (platform/request url (clj->js init)))))
(defn- handle-create [{:keys [env request url claims]}]
(.then (common/read-json request)
(fn [result]
(if (nil? result)
(http/bad-request "missing body")
(let [body (js->clj result :keywordize-keys true)
body (http/coerce-http-request :sessions/create body)
idempotency-key (.get (.-headers request) "idempotency-key")
session-id (or (:id body) idempotency-key)]
(cond
(nil? body)
(http/bad-request "invalid body")
(not (string? session-id))
(http/bad-request "invalid session id")
:else
(if-let [^js stub (session-stub env session-id)]
(let [headers (base-headers request claims)
_ (.set headers "x-stream-base" (.-origin url))
task (assoc body :id session-id)
body-json (js/JSON.stringify (clj->js task))
do-url (str (.-origin url) "/__session__/init")]
(forward-request stub do-url "POST" headers body-json))
(http/error-response "server error" 500))))))))
(defn- handle-get [{:keys [env request url claims route]}]
(let [session-id (get-in route [:path-params :session-id])]
(if-not (string? session-id)
(http/bad-request "invalid session id")
(if-let [^js stub (session-stub env session-id)]
(let [headers (base-headers request claims)
do-url (str (.-origin url) "/__session__/status")]
(forward-request stub do-url "GET" headers nil))
(http/error-response "server error" 500)))))
(defn- handle-messages [{:keys [env request url claims route]}]
(let [session-id (get-in route [:path-params :session-id])]
(if-not (string? session-id)
(http/bad-request "invalid session id")
(.then (common/read-json request)
(fn [result]
(if (nil? result)
(http/bad-request "missing body")
(let [body (js->clj result :keywordize-keys true)
body (http/coerce-http-request :sessions/message body)]
(cond
(nil? body)
(http/bad-request "invalid body")
:else
(if-let [^js stub (session-stub env session-id)]
(let [headers (base-headers request claims)
body-json (js/JSON.stringify (clj->js body))
do-url (str (.-origin url) "/__session__/messages")]
(forward-request stub do-url "POST" headers body-json))
(http/error-response "server error" 500))))))))))
(defn- handle-cancel [{:keys [env request url claims route]}]
(let [session-id (get-in route [:path-params :session-id])]
(if-not (string? session-id)
(http/bad-request "invalid session id")
(if-let [^js stub (session-stub env session-id)]
(let [headers (base-headers request claims)
do-url (str (.-origin url) "/__session__/cancel")]
(forward-request stub do-url "POST" headers nil))
(http/error-response "server error" 500)))))
(defn- handle-stream [{:keys [env request url claims route]}]
(let [session-id (get-in route [:path-params :session-id])]
(if-not (string? session-id)
(http/bad-request "invalid session id")
(if-let [^js stub (session-stub env session-id)]
(let [headers (base-headers request claims)
do-url (str (.-origin url) "/__session__/stream")]
(forward-request stub do-url "GET" headers nil))
(http/error-response "server error" 500)))))
(defn handle [{:keys [route] :as ctx}]
(case (:handler route)
:sessions/create (handle-create ctx)
:sessions/get (handle-get ctx)
:sessions/messages (handle-messages ctx)
:sessions/cancel (handle-cancel ctx)
:sessions/stream (handle-stream ctx)
(http/not-found)))
(defn handle-fetch [^js self request]
(let [env (.-env self)
url (js/URL. (.-url request))
path (.-pathname url)
method (.-method request)]
(try
(cond
(contains? #{"OPTIONS" "HEAD"} method)
(common/options-response)
:else
(p/let [claims (auth/auth-claims request env)
route (routes/match-route method path)
response (cond
(nil? claims)
(http/unauthorized)
route
(handle {:env env
:request request
:url url
:claims claims
:route route})
:else
(http/not-found))]
response))
(catch :default error
(log/error :agent/session-handler-error error)
(http/error-response "server error" 500)))))

View File

@@ -20,7 +20,15 @@
["/graphs/:graph-id"
["/aes-key" {:methods {"GET" :e2ee/graph-aes-key-get
"POST" :e2ee/graph-aes-key-post}}]
["/grant-access" {:methods {"POST" :e2ee/grant-access}}]]]])
["/grant-access" {:methods {"POST" :e2ee/grant-access}}]]]
["/sessions"
["" {:methods {"POST" :sessions/create}}]
["/:session-id"
["" {:methods {"GET" :sessions/get}}]
["/messages" {:methods {"POST" :sessions/messages}}]
["/cancel" {:methods {"POST" :sessions/cancel}}]
["/stream" {:methods {"GET" :sessions/stream}}]]]])
(def ^:private router
(r/router route-data))

View File

@@ -0,0 +1,43 @@
(ns logseq.db-sync.agent-session-test
(:require [cljs.test :refer [deftest is testing]]
[logseq.db-sync.worker.agent.session :as session]))
(deftest session-initialization-test
(testing "builds a created session with task and audit"
(let [task {:id "task-1"}
audit {:requested-by "user-1"}
now 123
s (session/initial-session task audit now)]
(is (= "task-1" (:id s)))
(is (= "created" (:status s)))
(is (= task (:task s)))
(is (= audit (:audit s)))
(is (= now (:created-at s)))
(is (= now (:updated-at s))))))
(deftest session-event-transition-test
(testing "session status transitions on lifecycle events"
(let [task {:id "task-2"}
audit {:requested-by "user-2"}
now 100
s0 (session/initial-session task audit now)
[s1 events1 e1] (session/append-event s0 [] {:type "session.running" :data {:ok true} :event-id "e1" :ts 101})
[s2 events2 _e2] (session/append-event s1 events1 {:type "session.paused" :data {} :event-id "e2" :ts 102})
[s3 events3 _e3] (session/append-event s2 events2 {:type "session.completed" :data {} :event-id "e3" :ts 103})]
(is (= "running" (:status s1)))
(is (= "paused" (:status s2)))
(is (= "completed" (:status s3)))
(is (= 3 (count events3)))
(is (= "e1" (:event-id e1)))
(is (= 103 (:updated-at s3))))))
(deftest session-event-defaults-test
(testing "append-event assigns defaults when missing"
(let [task {:id "task-3"}
audit {:requested-by "user-3"}
s0 (session/initial-session task audit 1)
[s1 events1 e1] (session/append-event s0 [] {:type "audit.log" :data {:msg "hi"}})]
(is (= "created" (:status s1)))
(is (= 1 (count events1)))
(is (string? (:event-id e1)))
(is (number? (:ts e1))))))

View File

@@ -46,3 +46,20 @@
(testing "method mismatch returns nil"
(is (nil? (routes/match-route "GET" "/graphs/graph-1/members/user-9")))
(is (nil? (routes/match-route "PUT" "/e2ee/user-keys")))))
(deftest match-route-sessions-test
(testing "sessions routes"
(let [match (routes/match-route "POST" "/sessions")]
(is (= :sessions/create (:handler match))))
(let [match (routes/match-route "GET" "/sessions/session-1")]
(is (= :sessions/get (:handler match)))
(is (= "session-1" (get-in match [:path-params :session-id]))))
(let [match (routes/match-route "POST" "/sessions/session-2/messages")]
(is (= :sessions/messages (:handler match)))
(is (= "session-2" (get-in match [:path-params :session-id]))))
(let [match (routes/match-route "POST" "/sessions/session-3/cancel")]
(is (= :sessions/cancel (:handler match)))
(is (= "session-3" (get-in match [:path-params :session-id]))))
(let [match (routes/match-route "GET" "/sessions/session-4/stream")]
(is (= :sessions/stream (:handler match)))
(is (= "session-4" (get-in match [:path-params :session-id]))))))

View File

@@ -10,6 +10,10 @@ enabled = true
name = "LOGSEQ_SYNC_DO"
class_name = "SyncDO"
[[durable_objects.bindings]]
name = "LOGSEQ_AGENT_SESSION_DO"
class_name = "AgentSessionDO"
[[r2_buckets]]
binding = "LOGSEQ_SYNC_ASSETS"
bucket_name = "logseq-sync-assets-prod"
@@ -23,6 +27,10 @@ database_id = "00325aa2-c805-4693-b599-900a25dcde42"
tag = "v1"
new_sqlite_classes = [ "SyncDO" ]
[[migrations]]
tag = "v2"
new_sqlite_classes = [ "AgentSessionDO" ]
[vars]
COGNITO_JWKS_URL = "https://cognito-idp.us-east-1.amazonaws.com/us-east-1_dtagLnju8/.well-known/jwks.json"
COGNITO_ISSUER = "https://cognito-idp.us-east-1.amazonaws.com/us-east-1_dtagLnju8"
@@ -40,6 +48,10 @@ COGNITO_CLIENT_ID = "69cs1lgme7p8kbgld8n5kseii6"
name = "LOGSEQ_SYNC_DO"
class_name = "SyncDO"
[[env.staging.durable_objects.bindings]]
name = "LOGSEQ_AGENT_SESSION_DO"
class_name = "AgentSessionDO"
[[env.staging.migrations]]
tag = "v1"
new_sqlite_classes = [ "SyncDO" ]
@@ -48,6 +60,10 @@ new_sqlite_classes = [ "SyncDO" ]
tag = "v2"
deleted_classes = [ "SyncIndexDO" ]
[[env.staging.migrations]]
tag = "v3"
new_sqlite_classes = [ "AgentSessionDO" ]
[[env.staging.r2_buckets]]
binding = "LOGSEQ_SYNC_ASSETS"
bucket_name = "logseq-sync-assets-dev"
@@ -69,10 +85,18 @@ COGNITO_CLIENT_ID = "69cs1lgme7p8kbgld8n5kseii6"
name = "LOGSEQ_SYNC_DO"
class_name = "SyncDO"
[[env.prod.durable_objects.bindings]]
name = "LOGSEQ_AGENT_SESSION_DO"
class_name = "AgentSessionDO"
[[env.prod.migrations]]
tag = "v1"
new_sqlite_classes = [ "SyncDO" ]
[[env.prod.migrations]]
tag = "v2"
new_sqlite_classes = [ "AgentSessionDO" ]
[[env.prod.r2_buckets]]
binding = "LOGSEQ_SYNC_ASSETS"
bucket_name = "logseq-sync-assets-prod"