From 4ddce67288f8b6fff23e65cdf2532cd5f1e20505 Mon Sep 17 00:00:00 2001 From: Tienson Qin Date: Mon, 2 Feb 2026 00:09:30 +0800 Subject: [PATCH] M2 --- deps/db-sync/shadow-cljs.edn | 3 +- .../src/logseq/db_sync/malli_schema.cljs | 46 +++- deps/db-sync/src/logseq/db_sync/worker.cljs | 15 ++ .../src/logseq/db_sync/worker/agent/do.cljs | 199 ++++++++++++++++++ .../logseq/db_sync/worker/agent/session.cljs | 32 +++ .../src/logseq/db_sync/worker/dispatch.cljs | 4 + .../logseq/db_sync/worker/handler/agent.cljs | 160 ++++++++++++++ .../logseq/db_sync/worker/routes/index.cljs | 10 +- .../logseq/db_sync/agent_session_test.cljs | 43 ++++ .../logseq/db_sync/worker_routes_test.cljs | 17 ++ deps/db-sync/worker/wrangler.toml | 24 +++ 11 files changed, 550 insertions(+), 3 deletions(-) create mode 100644 deps/db-sync/src/logseq/db_sync/worker/agent/do.cljs create mode 100644 deps/db-sync/src/logseq/db_sync/worker/agent/session.cljs create mode 100644 deps/db-sync/src/logseq/db_sync/worker/handler/agent.cljs create mode 100644 deps/db-sync/test/logseq/db_sync/agent_session_test.cljs diff --git a/deps/db-sync/shadow-cljs.edn b/deps/db-sync/shadow-cljs.edn index 761b3c86e3..da98bcd7fc 100644 --- a/deps/db-sync/shadow-cljs.edn +++ b/deps/db-sync/shadow-cljs.edn @@ -6,7 +6,8 @@ {:db-sync {:target :esm :output-dir "worker/dist/worker" :modules {:main {:exports {default logseq.db-sync.worker/worker - SyncDO logseq.db-sync.worker/SyncDO}}} + SyncDO logseq.db-sync.worker/SyncDO + AgentSessionDO logseq.db-sync.worker/AgentSessionDO}}} :js-options {:js-provider :import} :closure-defines {shadow.cljs.devtools.client.env/enabled false} :devtools {:enabled false}} diff --git a/deps/db-sync/src/logseq/db_sync/malli_schema.cljs b/deps/db-sync/src/logseq/db_sync/malli_schema.cljs index 2e91a23a30..033516b644 100644 --- a/deps/db-sync/src/logseq/db_sync/malli_schema.cljs +++ b/deps/db-sync/src/logseq/db_sync/malli_schema.cljs @@ -217,6 +217,44 @@ :any http-error-response-schema]) +(def agent-task-source-schema + [:map + [:node-id :string] + [:node-title :string] + [:node-revision :any] + [:snapshot {:optional true} :any]]) + +(def agent-task-schema + [:map + [:id :string] + [:source agent-task-source-schema] + [:intent {:optional true} :map] + [:agent {:optional true} :map] + [:execution {:optional true} :map] + [:audit {:optional true} :map]]) + +(def sessions-create-request-schema agent-task-schema) + +(def sessions-message-request-schema + [:map + [:message :string] + [:kind {:optional true} :string]]) + +(def sessions-create-response-schema + [:map + [:session-id :string] + [:status :string] + [:stream-url :string]]) + +(def sessions-get-response-schema + [:map + [:session-id :string] + [:status :string] + [:task :map] + [:audit :map] + [:created-at :int] + [:updated-at :int]]) + (def http-request-schemas {:graphs/create graph-create-request-schema :graph-members/create graph-member-create-request-schema @@ -224,7 +262,9 @@ :sync/tx-batch tx-batch-request-schema :e2ee/user-keys e2ee-user-key-request-schema :e2ee/graph-aes-key e2ee-graph-aes-key-request-schema - :e2ee/grant-access e2ee-grant-access-request-schema}) + :e2ee/grant-access e2ee-grant-access-request-schema + :sessions/create sessions-create-request-schema + :sessions/message sessions-message-request-schema}) (def http-response-schemas {:graphs/list graphs-list-response-schema @@ -249,6 +289,10 @@ :assets/get asset-get-response-schema :assets/put http-ok-response-schema :assets/delete http-ok-response-schema + :sessions/create sessions-create-response-schema + :sessions/get sessions-get-response-schema + :sessions/message http-ok-response-schema + :sessions/cancel http-ok-response-schema :error http-error-response-schema}) (def ^:private json-transformer diff --git a/deps/db-sync/src/logseq/db_sync/worker.cljs b/deps/db-sync/src/logseq/db_sync/worker.cljs index feb0e88c17..19821e7378 100644 --- a/deps/db-sync/src/logseq/db_sync/worker.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker.cljs @@ -3,6 +3,7 @@ [lambdaisland.glogi :as log] [logseq.db-sync.common :as common] [logseq.db-sync.logging :as logging] + [logseq.db-sync.worker.agent.do :as agent-do] [logseq.db-sync.worker.dispatch :as dispatch] [logseq.db-sync.worker.handler.sync :as sync-handler] [logseq.db-sync.worker.handler.ws :as ws-handler] @@ -55,3 +56,17 @@ (presence/remove-presence! this ws) (presence/broadcast-online-users! this) (log/error :db-sync/ws-error {:error error}))) + +(defclass AgentSessionDO + (extends DurableObject) + + (constructor [this ^js state env] + (super state env) + (set! (.-state this) state) + (set! (.-env this) env) + (set! (.-storage this) (.-storage state)) + (set! (.-streams this) (js/Map.))) + + Object + (fetch [this request] + (agent-do/handle-fetch this request))) diff --git a/deps/db-sync/src/logseq/db_sync/worker/agent/do.cljs b/deps/db-sync/src/logseq/db_sync/worker/agent/do.cljs new file mode 100644 index 0000000000..5348cb3720 --- /dev/null +++ b/deps/db-sync/src/logseq/db_sync/worker/agent/do.cljs @@ -0,0 +1,199 @@ +(ns logseq.db-sync.worker.agent.do + (:require [lambdaisland.glogi :as log] + [logseq.db-sync.common :as common] + [logseq.db-sync.platform.core :as platform] + [logseq.db-sync.worker.agent.session :as session] + [logseq.db-sync.worker.http :as http] + [promesa.core :as p])) + +(defn- header [request name] + (.get (.-headers request) name)) + +(defn- user-id-from-request [request] + (header request "x-user-id")) + +(defn- sse-encode [event] + (str "data: " (js/JSON.stringify (clj->js event)) "\n\n")) + +(defn- clj value :keywordize-keys true)))) + +(defn- js value))) + +(defn- (.write writer (sse-encode event)) + (.catch (fn [_] + (.delete streams key))))))))) + +(defn- clj result :keywordize-keys true) + task-id (:id task) + user-id (user-id-from-request request) + now (common/now-ms) + audit-default {:requested-by user-id + :requested-at now} + audit (merge audit-default (:audit task))] + (cond + (not (string? user-id)) + (http/unauthorized) + + (not (string? task-id)) + (http/bad-request "invalid session id") + + (and (string? (:requested-by audit)) + (not= (:requested-by audit) user-id)) + (http/forbidden) + + :else + (let [session (session/initial-session task audit now) + [session events _event] (session/append-event session [] {:type "session.created" :data {:requested-by user-id} :ts now})] + (p/let [_ (clj result :keywordize-keys true) + message (:message body) + user-id (user-id-from-request request)] + (cond + (not (string? user-id)) + (http/unauthorized) + + (not (string? message)) + (http/bad-request "invalid message") + + :else + (p/let [res ( {:method method :headers headers} + (some? body) + (assoc :body body))] + (.fetch stub (platform/request url (clj->js init))))) + +(defn- handle-create [{:keys [env request url claims]}] + (.then (common/read-json request) + (fn [result] + (if (nil? result) + (http/bad-request "missing body") + (let [body (js->clj result :keywordize-keys true) + body (http/coerce-http-request :sessions/create body) + idempotency-key (.get (.-headers request) "idempotency-key") + session-id (or (:id body) idempotency-key)] + (cond + (nil? body) + (http/bad-request "invalid body") + + (not (string? session-id)) + (http/bad-request "invalid session id") + + :else + (if-let [^js stub (session-stub env session-id)] + (let [headers (base-headers request claims) + _ (.set headers "x-stream-base" (.-origin url)) + task (assoc body :id session-id) + body-json (js/JSON.stringify (clj->js task)) + do-url (str (.-origin url) "/__session__/init")] + (forward-request stub do-url "POST" headers body-json)) + (http/error-response "server error" 500)))))))) + +(defn- handle-get [{:keys [env request url claims route]}] + (let [session-id (get-in route [:path-params :session-id])] + (if-not (string? session-id) + (http/bad-request "invalid session id") + (if-let [^js stub (session-stub env session-id)] + (let [headers (base-headers request claims) + do-url (str (.-origin url) "/__session__/status")] + (forward-request stub do-url "GET" headers nil)) + (http/error-response "server error" 500))))) + +(defn- handle-messages [{:keys [env request url claims route]}] + (let [session-id (get-in route [:path-params :session-id])] + (if-not (string? session-id) + (http/bad-request "invalid session id") + (.then (common/read-json request) + (fn [result] + (if (nil? result) + (http/bad-request "missing body") + (let [body (js->clj result :keywordize-keys true) + body (http/coerce-http-request :sessions/message body)] + (cond + (nil? body) + (http/bad-request "invalid body") + + :else + (if-let [^js stub (session-stub env session-id)] + (let [headers (base-headers request claims) + body-json (js/JSON.stringify (clj->js body)) + do-url (str (.-origin url) "/__session__/messages")] + (forward-request stub do-url "POST" headers body-json)) + (http/error-response "server error" 500)))))))))) + +(defn- handle-cancel [{:keys [env request url claims route]}] + (let [session-id (get-in route [:path-params :session-id])] + (if-not (string? session-id) + (http/bad-request "invalid session id") + (if-let [^js stub (session-stub env session-id)] + (let [headers (base-headers request claims) + do-url (str (.-origin url) "/__session__/cancel")] + (forward-request stub do-url "POST" headers nil)) + (http/error-response "server error" 500))))) + +(defn- handle-stream [{:keys [env request url claims route]}] + (let [session-id (get-in route [:path-params :session-id])] + (if-not (string? session-id) + (http/bad-request "invalid session id") + (if-let [^js stub (session-stub env session-id)] + (let [headers (base-headers request claims) + do-url (str (.-origin url) "/__session__/stream")] + (forward-request stub do-url "GET" headers nil)) + (http/error-response "server error" 500))))) + +(defn handle [{:keys [route] :as ctx}] + (case (:handler route) + :sessions/create (handle-create ctx) + :sessions/get (handle-get ctx) + :sessions/messages (handle-messages ctx) + :sessions/cancel (handle-cancel ctx) + :sessions/stream (handle-stream ctx) + (http/not-found))) + +(defn handle-fetch [^js self request] + (let [env (.-env self) + url (js/URL. (.-url request)) + path (.-pathname url) + method (.-method request)] + (try + (cond + (contains? #{"OPTIONS" "HEAD"} method) + (common/options-response) + + :else + (p/let [claims (auth/auth-claims request env) + route (routes/match-route method path) + response (cond + (nil? claims) + (http/unauthorized) + + route + (handle {:env env + :request request + :url url + :claims claims + :route route}) + + :else + (http/not-found))] + response)) + (catch :default error + (log/error :agent/session-handler-error error) + (http/error-response "server error" 500))))) diff --git a/deps/db-sync/src/logseq/db_sync/worker/routes/index.cljs b/deps/db-sync/src/logseq/db_sync/worker/routes/index.cljs index 5cb0a32a7f..25c67df4ec 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/routes/index.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/routes/index.cljs @@ -20,7 +20,15 @@ ["/graphs/:graph-id" ["/aes-key" {:methods {"GET" :e2ee/graph-aes-key-get "POST" :e2ee/graph-aes-key-post}}] - ["/grant-access" {:methods {"POST" :e2ee/grant-access}}]]]]) + ["/grant-access" {:methods {"POST" :e2ee/grant-access}}]]] + + ["/sessions" + ["" {:methods {"POST" :sessions/create}}] + ["/:session-id" + ["" {:methods {"GET" :sessions/get}}] + ["/messages" {:methods {"POST" :sessions/messages}}] + ["/cancel" {:methods {"POST" :sessions/cancel}}] + ["/stream" {:methods {"GET" :sessions/stream}}]]]]) (def ^:private router (r/router route-data)) diff --git a/deps/db-sync/test/logseq/db_sync/agent_session_test.cljs b/deps/db-sync/test/logseq/db_sync/agent_session_test.cljs new file mode 100644 index 0000000000..2549489cfa --- /dev/null +++ b/deps/db-sync/test/logseq/db_sync/agent_session_test.cljs @@ -0,0 +1,43 @@ +(ns logseq.db-sync.agent-session-test + (:require [cljs.test :refer [deftest is testing]] + [logseq.db-sync.worker.agent.session :as session])) + +(deftest session-initialization-test + (testing "builds a created session with task and audit" + (let [task {:id "task-1"} + audit {:requested-by "user-1"} + now 123 + s (session/initial-session task audit now)] + (is (= "task-1" (:id s))) + (is (= "created" (:status s))) + (is (= task (:task s))) + (is (= audit (:audit s))) + (is (= now (:created-at s))) + (is (= now (:updated-at s)))))) + +(deftest session-event-transition-test + (testing "session status transitions on lifecycle events" + (let [task {:id "task-2"} + audit {:requested-by "user-2"} + now 100 + s0 (session/initial-session task audit now) + [s1 events1 e1] (session/append-event s0 [] {:type "session.running" :data {:ok true} :event-id "e1" :ts 101}) + [s2 events2 _e2] (session/append-event s1 events1 {:type "session.paused" :data {} :event-id "e2" :ts 102}) + [s3 events3 _e3] (session/append-event s2 events2 {:type "session.completed" :data {} :event-id "e3" :ts 103})] + (is (= "running" (:status s1))) + (is (= "paused" (:status s2))) + (is (= "completed" (:status s3))) + (is (= 3 (count events3))) + (is (= "e1" (:event-id e1))) + (is (= 103 (:updated-at s3)))))) + +(deftest session-event-defaults-test + (testing "append-event assigns defaults when missing" + (let [task {:id "task-3"} + audit {:requested-by "user-3"} + s0 (session/initial-session task audit 1) + [s1 events1 e1] (session/append-event s0 [] {:type "audit.log" :data {:msg "hi"}})] + (is (= "created" (:status s1))) + (is (= 1 (count events1))) + (is (string? (:event-id e1))) + (is (number? (:ts e1)))))) diff --git a/deps/db-sync/test/logseq/db_sync/worker_routes_test.cljs b/deps/db-sync/test/logseq/db_sync/worker_routes_test.cljs index 3226383380..0722de4c43 100644 --- a/deps/db-sync/test/logseq/db_sync/worker_routes_test.cljs +++ b/deps/db-sync/test/logseq/db_sync/worker_routes_test.cljs @@ -46,3 +46,20 @@ (testing "method mismatch returns nil" (is (nil? (routes/match-route "GET" "/graphs/graph-1/members/user-9"))) (is (nil? (routes/match-route "PUT" "/e2ee/user-keys"))))) + +(deftest match-route-sessions-test + (testing "sessions routes" + (let [match (routes/match-route "POST" "/sessions")] + (is (= :sessions/create (:handler match)))) + (let [match (routes/match-route "GET" "/sessions/session-1")] + (is (= :sessions/get (:handler match))) + (is (= "session-1" (get-in match [:path-params :session-id])))) + (let [match (routes/match-route "POST" "/sessions/session-2/messages")] + (is (= :sessions/messages (:handler match))) + (is (= "session-2" (get-in match [:path-params :session-id])))) + (let [match (routes/match-route "POST" "/sessions/session-3/cancel")] + (is (= :sessions/cancel (:handler match))) + (is (= "session-3" (get-in match [:path-params :session-id])))) + (let [match (routes/match-route "GET" "/sessions/session-4/stream")] + (is (= :sessions/stream (:handler match))) + (is (= "session-4" (get-in match [:path-params :session-id])))))) diff --git a/deps/db-sync/worker/wrangler.toml b/deps/db-sync/worker/wrangler.toml index d336b7d511..a1a79ef1c6 100644 --- a/deps/db-sync/worker/wrangler.toml +++ b/deps/db-sync/worker/wrangler.toml @@ -10,6 +10,10 @@ enabled = true name = "LOGSEQ_SYNC_DO" class_name = "SyncDO" +[[durable_objects.bindings]] +name = "LOGSEQ_AGENT_SESSION_DO" +class_name = "AgentSessionDO" + [[r2_buckets]] binding = "LOGSEQ_SYNC_ASSETS" bucket_name = "logseq-sync-assets-prod" @@ -23,6 +27,10 @@ database_id = "00325aa2-c805-4693-b599-900a25dcde42" tag = "v1" new_sqlite_classes = [ "SyncDO" ] +[[migrations]] +tag = "v2" +new_sqlite_classes = [ "AgentSessionDO" ] + [vars] COGNITO_JWKS_URL = "https://cognito-idp.us-east-1.amazonaws.com/us-east-1_dtagLnju8/.well-known/jwks.json" COGNITO_ISSUER = "https://cognito-idp.us-east-1.amazonaws.com/us-east-1_dtagLnju8" @@ -40,6 +48,10 @@ COGNITO_CLIENT_ID = "69cs1lgme7p8kbgld8n5kseii6" name = "LOGSEQ_SYNC_DO" class_name = "SyncDO" +[[env.staging.durable_objects.bindings]] +name = "LOGSEQ_AGENT_SESSION_DO" +class_name = "AgentSessionDO" + [[env.staging.migrations]] tag = "v1" new_sqlite_classes = [ "SyncDO" ] @@ -48,6 +60,10 @@ new_sqlite_classes = [ "SyncDO" ] tag = "v2" deleted_classes = [ "SyncIndexDO" ] +[[env.staging.migrations]] +tag = "v3" +new_sqlite_classes = [ "AgentSessionDO" ] + [[env.staging.r2_buckets]] binding = "LOGSEQ_SYNC_ASSETS" bucket_name = "logseq-sync-assets-dev" @@ -69,10 +85,18 @@ COGNITO_CLIENT_ID = "69cs1lgme7p8kbgld8n5kseii6" name = "LOGSEQ_SYNC_DO" class_name = "SyncDO" +[[env.prod.durable_objects.bindings]] +name = "LOGSEQ_AGENT_SESSION_DO" +class_name = "AgentSessionDO" + [[env.prod.migrations]] tag = "v1" new_sqlite_classes = [ "SyncDO" ] +[[env.prod.migrations]] +tag = "v2" +new_sqlite_classes = [ "AgentSessionDO" ] + [[env.prod.r2_buckets]] binding = "LOGSEQ_SYNC_ASSETS" bucket_name = "logseq-sync-assets-prod"