bump sandbox-agent

This commit is contained in:
Tienson Qin
2026-03-09 21:52:19 +08:00
parent 92a4e37ea2
commit 7d25a5f6a3
15 changed files with 905 additions and 260 deletions

View File

@@ -48,6 +48,11 @@
:ns-regexp "logseq\\.agents\\.e2b-runtime-provider-test$"
:devtools {:enabled false}
:main logseq.agents.e2b-runtime-provider-test-runner/main}
:agents-runtime-transport-test {:target :node-test
:output-to "worker/dist/agents-runtime-transport-test.js"
:ns-regexp "logseq\\.agents\\.(sandbox-test|runtime-provider-test)$"
:devtools {:enabled false}
:main logseq.agents.runtime-transport-test-runner/main}
:agents-m25-managed-auth-test {:target :node-test
:output-to "worker/dist/agents-m25-managed-auth-test.js"
:ns-regexp "logseq\\.agents\\.m25-managed-auth-test$"

View File

@@ -4,6 +4,7 @@
[logseq.agents.checkpoint-store :as checkpoint-store]
[logseq.agents.runner-store :as runner-store]
[logseq.agents.runtime-provider :as runtime-provider]
[logseq.agents.sandbox :as sandbox]
[logseq.agents.session :as session]
[logseq.agents.source-control :as source-control]
[logseq.sync.common :as common]
@@ -597,9 +598,12 @@
(defn- <append-runtime-event! [^js self session-id payload]
(p/let [current-session (<get-session self)]
(when (= session-id (:id current-session))
(let [event-type (or (:type payload) "agent.runtime")]
(let [{:keys [type data]} (or (sandbox/acp-envelope->event payload)
{:type (or (:type payload) "agent.runtime")
:data payload})
event-type type]
(p/let [_ (<append-event! self {:type event-type
:data payload
:data data
:ts (common/now-ms)})]
(when (= "session.completed" event-type)
(<checkpoint-and-terminate-completed-runtime! self session-id))
@@ -745,13 +749,17 @@
(if-not (and provider-value (runtime-ready? runtime-value))
(p/rejected (ex-info "session runtime unavailable"
{:session-id (:id session-value)}))
(let [ready-promise (ensure-runtime-events-stream-ready! self (:id session-value) runtime-value)]
(-> (<await-events-stream-ready ready-promise events-stream-ready-timeout-ms)
(.then (fn [_]
(runtime-provider/<send-message! provider-value
runtime-value
{:message message
:kind kind}))))))))
(let [ready-promise (ensure-runtime-events-stream-ready! self
(:id session-value)
runtime-value)]
(-> (<await-events-stream-ready ready-promise
events-stream-ready-timeout-ms)
(.then
(fn [_]
(runtime-provider/<send-message! provider-value
runtime-value
{:message message
:kind kind}))))))))
retry-send! (fn [error]
(p/let [latest-session (<get-session self)]
(if (or (not (map? latest-session))
@@ -1000,7 +1008,8 @@
(p/let [res (<append-event! self {:type "audit.log"
:data {:event "user-message"
:kind (:kind body)
:by user-id}})
:by user-id
:message message}})
session-before (<get-session self)
current-session (<maybe-resume-session-for-message! self session-before user-id)]
(cond

View File

@@ -11,7 +11,7 @@
;; -----------------------
(def ^:private local-host "http://localhost")
(def ^:private default-repo-base-dir "/workspace")
(def ^:private default-repo-base-dir "/home/user/workspace")
(def ^:private e2b-repo-base-dir "/home/user/workspace")
(defn- js-method
@@ -186,7 +186,7 @@
(defn- sandbox-agent-version
[^js env]
(or (env-str env "SANDBOX_AGENT_VERSION")
"0.1.5"))
"0.3.x"))
(defn- sandbox-agent-install-command
[^js env]
@@ -948,13 +948,15 @@
agent-token
session-id
payload
{:headers headers})]
{:headers headers
:cwd (get-repo-dir session-id task "local-runner")})]
{:provider "local-runner"
:runner-id runner-id
:base-url base-url
:agent-token agent-token
:access-client-id (get headers "CF-Access-Client-Id")
:access-client-secret (get headers "CF-Access-Client-Secret")
:server-id (:server-id response)
:session-id (:session-id response)})))
(<open-events-stream! [_ runtime]
@@ -967,7 +969,7 @@
:runtime runtime})))
(sandbox/<open-events-stream base-url
agent-token
(:session-id runtime)
(or (:server-id runtime) (:session-id runtime))
{:headers headers})))
(<send-message! [_ runtime message]
@@ -980,6 +982,7 @@
:runtime runtime})))
(sandbox/<send-message base-url
agent-token
(or (:server-id runtime) (:session-id runtime))
(:session-id runtime)
message
{:headers headers})))
@@ -1014,7 +1017,7 @@
(p/catch
(sandbox/<terminate-session base-url
agent-token
session-id
(or (:server-id runtime) session-id)
{:headers headers})
(fn [_] nil)))))))
@@ -1042,7 +1045,11 @@
:error-data (ex-data error)})
nil)))
base-url (e2b-sandbox-host sandbox port)
response (sandbox/<create-session base-url agent-token session-id payload)
response (sandbox/<create-session base-url
agent-token
session-id
payload
{:cwd (e2b-runtime-repo-dir {:session-id session-id} task)})
sandbox-id (e2b-sandbox-id sandbox)]
(when-not (string? sandbox-id)
(throw (ex-info "e2b sandbox missing sandboxId"
@@ -1054,18 +1061,25 @@
:backup-dir (e2b-runtime-repo-dir {:session-id session-id} task)
:base-url base-url
:agent-token agent-token
:server-id (:server-id response)
:session-id (:session-id response)
:template template})))
(<open-events-stream! [_ runtime]
(let [agent-token (e2b-agent-token env runtime)]
(p/let [base-url (<e2b-runtime-base-url! env runtime)]
(sandbox/<open-events-stream base-url agent-token (:session-id runtime)))))
(sandbox/<open-events-stream base-url
agent-token
(or (:server-id runtime) (:session-id runtime))))))
(<send-message! [_ runtime message]
(let [agent-token (e2b-agent-token env runtime)]
(p/let [base-url (<e2b-runtime-base-url! env runtime)]
(sandbox/<send-message base-url agent-token (:session-id runtime) message))))
(sandbox/<send-message base-url
agent-token
(or (:server-id runtime) (:session-id runtime))
(:session-id runtime)
message))))
(<open-terminal! [_ runtime request opts]
(<e2b-open-terminal! env runtime request opts))

View File

@@ -5,6 +5,7 @@
(def ^:private absolute-url-re #"^[A-Za-z][A-Za-z0-9+.-]*://")
(def ^:private local-host-re #"^(localhost|127(?:\.\d{1,3}){3}|\[::1\])(?::\d+)?(?:/.*)?$")
(def ^:private default-cwd "/home/user/workspace")
(defn normalize-base-url [base]
(let [base (some-> base str string/trim not-empty)
@@ -15,23 +16,8 @@
(re-find local-host-re base) (str "http://" base)
:else (str "https://" base))))
(defn sessions-base-url [base]
(str (normalize-base-url base) "/v1/sessions"))
(defn session-url [base session-id]
(str (sessions-base-url base) "/" session-id))
(defn messages-url [base session-id]
(str (session-url base session-id) "/messages"))
(defn messages-stream-url [base session-id]
(str (session-url base session-id) "/messages/stream"))
(defn events-sse-url [base session-id]
(str (session-url base session-id) "/events/sse"))
(defn terminate-url [base session-id]
(str (session-url base session-id) "/terminate"))
(defn acp-server-url [base server-id]
(str (normalize-base-url base) "/v1/acp/" (js/encodeURIComponent (or server-id ""))))
(defn exec-command-url [base]
(str (normalize-base-url base) "/v1/commands/exec"))
@@ -48,6 +34,7 @@
(def ^:private agent-aliases
{"claude-code" "claude"
"claude_code" "claude"
"chatgpt" "codex"
"open-code" "opencode"
"open_code" "opencode"})
@@ -75,102 +62,211 @@
(.then (.json resp) #(js->clj % :keywordize-keys true))
(js/Promise.resolve fallback))))
(defn- rpc-request [id method params]
(cond-> {:jsonrpc "2.0"
:id id
:method method}
(some? params) (assoc :params params)))
(defn- rpc-response-result [json]
(cond
(contains? json :error)
(throw (ex-info "sandbox ACP request failed"
{:response json}))
(contains? json :result)
(:result json)
:else json))
(defn- build-url
([base server-id]
(build-url base server-id nil))
([base server-id query]
(let [url (js/URL. (acp-server-url base server-id))]
(when (map? query)
(doseq [[k v] query]
(when (and (string? k) (some? v))
(.set (.-searchParams url) k (str v)))))
(.toString url))))
(defn- permission-mode->mode-id [agent permission-mode]
(case (normalize-agent-id agent)
"codex" (case permission-mode
"read-only" "read-only"
"default" "read-only"
"bypass" "auto"
"auto" "auto"
"full-access" "full-access"
nil)
"amp" (case permission-mode
"bypass" "bypass"
"default" "default"
nil)
"claude" (case permission-mode
"acceptedits" "acceptEdits"
"accept-edits" "acceptEdits"
"plan" "plan"
"bypass" "bypassPermissions"
"default" "default"
nil)
nil))
(defn session-mode-id [payload]
(let [agent (:agent payload)
agent-mode (some-> (or (:agent-mode payload) (:agentMode payload))
str string/trim not-empty)
permission-mode (some-> (or (:permission-mode payload) (:permissionMode payload))
str string/lower-case string/trim not-empty)]
(or (case (normalize-agent-id agent)
"opencode" agent-mode
"claude" (permission-mode->mode-id agent permission-mode)
"codex" (permission-mode->mode-id agent permission-mode)
"amp" (permission-mode->mode-id agent permission-mode)
agent-mode)
(permission-mode->mode-id agent permission-mode))))
(defn acp-envelope->event [payload]
(let [method (:method payload)
params (:params payload)]
(cond
(= "session/update" method)
{:type "agent.runtime"
:data {:method method
:session-id (:sessionId params)
:update (:update params)}}
:else nil)))
(defn <create-session
([base token session-id payload]
(<create-session base token session-id payload nil))
([base token session-id payload opts]
(let [agent (:agent payload)
agent-mode (or (:agent-mode payload) (:agentMode payload))
permission-mode (or (:permission-mode payload) (:permissionMode payload))
headers (js/Headers.)
extra-headers (:headers opts)
cwd (or (:cwd opts) default-cwd)
server-id (or session-id "")
agent-id (normalize-agent-id agent)
mode-id (session-mode-id payload)
_ (.set headers "content-type" "application/json")
_ (when (string? token) (.set headers "authorization" (str "Bearer " token)))
_ (add-extra-headers! headers extra-headers)
body (cond-> {:agent (normalize-agent-id agent)}
(string? agent-mode) (assoc :agentMode agent-mode)
(string? permission-mode) (assoc :permissionMode permission-mode))
req (json-request (session-url base session-id) "POST" headers body)]
(p/let [resp (js/fetch req)
status (.-status resp)
json (parse-json-or-default resp {})]
(if (<= 200 status 299)
(assoc json :session-id session-id)
(throw (ex-info "sandbox create-session failed"
{:status status
:session-id session-id
:response json})))))))
(defn <open-message-stream
[base token session-id message]
(let [headers (js/Headers.)
_ (.set headers "accept" "text/event-stream")
_ (.set headers "content-type" "application/json")
_ (when (string? token) (.set headers "authorization" (str "Bearer " token)))
req (json-request (messages-stream-url base session-id) "POST" headers
{:message (:message message)})]
(p/let [resp (js/fetch req)
status (.-status resp)]
(if (<= 200 status 299)
resp
(throw (ex-info "sandbox open-message-stream failed"
{:status status
:session-id session-id}))))))
init-req (json-request (build-url base server-id {"agent" agent-id}) "POST" headers
(rpc-request 1 "initialize"
{:protocolVersion "1.0"
:clientCapabilities {}
:clientInfo {:name "logseq-workers"
:version "v1"}}))
new-session-req (json-request (build-url base server-id) "POST" headers
(rpc-request 2 "session/new"
{:cwd cwd
:mcpServers []}))]
(p/let [init-resp (js/fetch init-req)
init-status (.-status init-resp)
init-json (parse-json-or-default init-resp {})
_ (when-not (<= 200 init-status 299)
(throw (ex-info "sandbox initialize-session failed"
{:status init-status
:session-id session-id
:response init-json})))
_ (rpc-response-result init-json)
session-resp (js/fetch new-session-req)
session-status (.-status session-resp)
_ (prn :debug :session-status session-status)
session-json (parse-json-or-default session-resp {})
session-result (if (<= 200 session-status 299)
(rpc-response-result session-json)
(throw (ex-info "sandbox create-session failed"
{:status session-status
:session-id session-id
:response session-json})))
remote-session-id (or (get-in session-json [:result :sessionId])
(get-in session-json [:result :session-id])
(when (map? session-result)
(or (:sessionId session-result)
(:session-id session-result)))
(when session-result
(aget session-result "sessionId"))
session-id)]
(if-not (string? mode-id)
{:server-id server-id
:session-id remote-session-id}
(let [mode-req (json-request (build-url base server-id) "POST" headers
(rpc-request 3 "session/set_mode"
{:sessionId remote-session-id
:modeId mode-id}))]
(p/let [mode-resp (js/fetch mode-req)
mode-status (.-status mode-resp)
mode-json (parse-json-or-default mode-resp {})]
(if (<= 200 mode-status 299)
(do
(rpc-response-result mode-json)
{:server-id server-id
:session-id remote-session-id})
(throw (ex-info "sandbox set-session-mode failed"
{:status mode-status
:session-id session-id
:response mode-json}))))))))))
(defn <open-events-stream
([base token session-id]
(<open-events-stream base token session-id nil))
([base token session-id opts]
([base token server-id]
(<open-events-stream base token server-id nil))
([base token server-id opts]
(let [headers (js/Headers.)
extra-headers (:headers opts)
_ (.set headers "accept" "text/event-stream")
_ (when (string? token) (.set headers "authorization" (str "Bearer " token)))
_ (add-extra-headers! headers extra-headers)
req (json-request (events-sse-url base session-id) "GET" headers nil)]
req (json-request (build-url base server-id) "GET" headers nil)]
(p/let [resp (js/fetch req)
status (.-status resp)]
(if (<= 200 status 299)
resp
(throw (ex-info "sandbox open-events-stream failed"
{:status status
:session-id session-id})))))))
:server-id server-id})))))))
(defn <send-message
([base token session-id message]
(<send-message base token session-id message nil))
([base token session-id message opts]
([base token server-id remote-session-id message]
(<send-message base token server-id remote-session-id message nil))
([base token server-id remote-session-id message opts]
(let [headers (js/Headers.)
extra-headers (:headers opts)
_ (.set headers "content-type" "application/json")
_ (when (string? token) (.set headers "authorization" (str "Bearer " token)))
_ (add-extra-headers! headers extra-headers)
body (cond-> {:message (:message message)}
(string? (:kind message)) (assoc :kind (:kind message)))
req (json-request (messages-url base session-id) "POST" headers body)]
body (rpc-request 4 "session/prompt"
{:sessionId remote-session-id
:prompt [{:type "text"
:text (:message message)}]})
req (json-request (build-url base server-id) "POST" headers body)]
(p/let [resp (js/fetch req)
status (.-status resp)]
(if (<= 200 status 299)
true
(if-not (<= 200 status 299)
(throw (ex-info "sandbox send-message failed"
{:status status
:session-id session-id})))))))
:server-id server-id
:session-id remote-session-id}))
(p/let [json (parse-json-or-default resp {})]
(rpc-response-result json)))))))
(defn <terminate-session
([base token session-id]
(<terminate-session base token session-id nil))
([base token session-id opts]
([base token server-id]
(<terminate-session base token server-id nil))
([base token server-id opts]
(let [headers (js/Headers.)
extra-headers (:headers opts)
_ (.set headers "content-type" "application/json")
_ (when (string? token) (.set headers "authorization" (str "Bearer " token)))
_ (add-extra-headers! headers extra-headers)
req (json-request (terminate-url base session-id) "POST" headers nil)]
req (json-request (build-url base server-id) "DELETE" headers nil)]
(p/let [resp (js/fetch req)
status (.-status resp)]
(when-not (<= 200 status 299)
(throw (ex-info "sandbox terminate-session failed"
{:status status
:session-id session-id})))
:server-id server-id})))
true))))
(defn <exec-command

View File

@@ -62,23 +62,37 @@
:exitCode 0})))}}))))
(set! js/fetch
(fn [request]
(is (= "POST" (.-method request)))
(is (= "https://e2b-agent.local/v1/sessions/sess-e2b-1" (.-url request)))
(is (= "Bearer agent-token"
(.get (.-headers request) "authorization")))
(js/Promise.resolve
(js/Response.
(js/JSON.stringify #js {:ok true})
#js {:status 200
:headers #js {"content-type" "application/json"}}))))
(-> (.text (.clone request))
(.then (fn [body-text]
(swap! calls conj {:type :fetch
:url (.-url request)
:method (.-method request)
:auth (.get (.-headers request) "authorization")
:body (js->clj (js/JSON.parse body-text)
:keywordize-keys true)})
(let [result (case (get-in @calls [(dec (count @calls)) :body :method])
"initialize" #js {:protocolVersion "0.1.0"}
"session/new" #js {:sessionId "remote-e2b-1"}
"session/set_mode" #js {}
#js {:ok true})]
(js/Response.
(js/JSON.stringify #js {:jsonrpc "2.0"
:id (get-in @calls [(dec (count @calls)) :body :id])
:result result})
#js {:status 200
:headers #js {"content-type" "application/json"}})))))))
(-> (runtime-provider/<provision-runtime! provider "sess-e2b-1" task)
(.then (fn [runtime]
(restore!)
(is (= "e2b" (:provider runtime)))
(is (= "e2b-sbx-1" (:sandbox-id runtime)))
(is (= "https://e2b-agent.local" (:base-url runtime)))
(is (= "sess-e2b-1" (:session-id runtime)))
(is (= "sess-e2b-1" (:server-id runtime)))
(is (= "remote-e2b-1" (:session-id runtime)))
(is (= 2468 (:sandbox-port runtime)))
(is (some #(and (= :fetch (:type %))
(= "https://e2b-agent.local/v1/acp/sess-e2b-1?agent=codex" (:url %)))
@calls))
(is (some #(and (= :command (:type %))
(string/includes? (:cmd %) "mkdir -p '/home/user/workspace'"))
@calls))

View File

@@ -70,28 +70,45 @@
:agent-token "runner-token"
:access-client-id "cf-access-id"
:access-client-secret "cf-access-secret"}}
original-fetch js/fetch]
original-fetch js/fetch
calls (atom [])]
(set! js/fetch
(fn [request init]
(is (= "https://runner.example.com/v1/sessions/sess-local-1" (fetch-url request)))
(is (= "POST" (fetch-method request init)))
(is (= "Bearer runner-token"
(.get (.-headers request) "authorization")))
(is (= "cf-access-id"
(.get (.-headers request) "CF-Access-Client-Id")))
(is (= "cf-access-secret"
(.get (.-headers request) "CF-Access-Client-Secret")))
(js/Promise.resolve
(js/Response.
(js/JSON.stringify #js {:ok true})
#js {:status 200 :headers #js {"content-type" "application/json"}}))))
(-> (.text (.clone request))
(.then (fn [body-text]
(swap! calls conj {:url (fetch-url request)
:method (fetch-method request init)
:auth (.get (.-headers request) "authorization")
:cf-id (.get (.-headers request) "CF-Access-Client-Id")
:cf-secret (.get (.-headers request) "CF-Access-Client-Secret")
:body (js->clj (js/JSON.parse body-text)
:keywordize-keys true)})
(let [result (case (get-in @calls [(dec (count @calls)) :body :method])
"initialize" #js {:protocolVersion "0.1.0"}
"session/new" #js {:sessionId "remote-local-1"}
"session/set_mode" #js {}
#js {:ok true})]
(js/Response.
(js/JSON.stringify #js {:jsonrpc "2.0"
:id (get-in @calls [(dec (count @calls)) :body :id])
:result result})
#js {:status 200 :headers #js {"content-type" "application/json"}})))))))
(-> (runtime-provider/<provision-runtime! provider "sess-local-1" task)
(.then (fn [runtime]
(set! js/fetch original-fetch)
(is (= ["https://runner.example.com/v1/acp/sess-local-1?agent=codex"
"https://runner.example.com/v1/acp/sess-local-1"
"https://runner.example.com/v1/acp/sess-local-1"]
(mapv :url @calls)))
(is (every? #(= "POST" (:method %)) @calls))
(is (every? #(= "Bearer runner-token" (:auth %)) @calls))
(is (every? #(= "cf-access-id" (:cf-id %)) @calls))
(is (every? #(= "cf-access-secret" (:cf-secret %)) @calls))
(is (= "local-runner" (:provider runtime)))
(is (= "runner-1" (:runner-id runtime)))
(is (= "https://runner.example.com" (:base-url runtime)))
(is (= "sess-local-1" (:session-id runtime)))
(is (= "sess-local-1" (:server-id runtime)))
(is (= "remote-local-1" (:session-id runtime)))
(done)))
(.catch (fn [error]
(set! js/fetch original-fetch)

View File

@@ -0,0 +1,25 @@
(ns logseq.agents.runtime-transport-test-runner
(:require [cljs.test :as ct]
[logseq.agents.runtime-provider-test]
[logseq.agents.sandbox-test]
[shadow.test :as st]
[shadow.test.env :as env]))
(derive ::node ::ct/default)
(defmethod ct/report [::node :end-run-tests] [m]
(if (ct/successful? m)
(js/process.exit 0)
(js/process.exit 1)))
(defn ^:dev/after-load reset-test-data! []
(when-let [test-data (env/get-test-data)]
(env/reset-test-data! test-data)))
(defn main [& _args]
(reset-test-data!)
(ct/test-vars [#'logseq.agents.runtime-provider-test/local-runner-provider-provision-test
#'logseq.agents.sandbox-test/session-endpoint-test
#'logseq.agents.sandbox-test/create-session-payload-test
#'logseq.agents.sandbox-test/send-message-uses-acp-prompt-test
#'logseq.agents.sandbox-test/acp-envelope-event-test]))

View File

@@ -12,19 +12,11 @@
(is (= "http://localhost:8787" (sandbox/normalize-base-url "localhost:8787")))))
(deftest session-endpoint-test
(testing "builds sandbox session endpoints"
(testing "builds sandbox ACP endpoints"
(let [base "https://sandbox.example"
session-id "sess-1"]
(is (= "https://sandbox.example/v1/sessions"
(sandbox/sessions-base-url base)))
(is (= "https://sandbox.example/v1/sessions/sess-1"
(sandbox/session-url base session-id)))
(is (= "https://sandbox.example/v1/sessions/sess-1/messages"
(sandbox/messages-url base session-id)))
(is (= "https://sandbox.example/v1/sessions/sess-1/events/sse"
(sandbox/events-sse-url base session-id)))
(is (= "https://sandbox.example/v1/sessions/sess-1/messages/stream"
(sandbox/messages-stream-url base session-id))))))
server-id "sess-1"]
(is (= "https://sandbox.example/v1/acp/sess-1"
(sandbox/acp-server-url base server-id))))))
(deftest snapshot-endpoint-test
(testing "builds sandbox snapshot and exec endpoints"
@@ -50,33 +42,109 @@
(deftest create-session-payload-test
(async done
(let [original-fetch js/fetch
payloads (atom [])]
calls (atom [])]
(set! js/fetch
(fn [request]
(-> (.text (.clone request))
(.then (fn [body-text]
(swap! payloads conj (js->clj (js/JSON.parse body-text)
:keywordize-keys true))
(js/Response.
(js/JSON.stringify #js {:ok true})
#js {:status 200
:headers #js {"content-type" "application/json"}}))))))
(swap! calls conj {:url (.-url request)
:method (.-method request)
:body (js->clj (js/JSON.parse body-text)
:keywordize-keys true)})
(let [method (:method (last @calls))
result (case method
"initialize" #js {:protocolVersion "0.1.0"}
"session/new" #js {:sessionId "remote-sess-1"}
"session/set_mode" #js {}
#js {:ok true})]
(js/Response.
(js/JSON.stringify #js {:jsonrpc "2.0"
:id (get-in @calls [(dec (count @calls)) :body :id])
:result result})
#js {:status 200
:headers #js {"content-type" "application/json"}})))))))
(-> (sandbox/<create-session "https://sandbox.example" "token" "sess-1"
{:agent "codex"
:agentMode "build"
:permissionMode "bypass"})
(.then (fn [_]
(sandbox/<create-session "https://sandbox.example" "token" "sess-2"
{:agent "codex"
:agent-mode "build"
:permission-mode "read-only"})))
:permissionMode "bypass"}
{:cwd "/workspace/sess-1"})
(.then (fn [_]
(set! js/fetch original-fetch)
(is (= "bypass" (get-in @payloads [0 :permissionMode])))
(is (= "build" (get-in @payloads [0 :agentMode])))
(is (= "read-only" (get-in @payloads [1 :permissionMode])))
(is (= "build" (get-in @payloads [1 :agentMode])))
(is (= ["https://sandbox.example/v1/acp/sess-1?agent=codex"
"https://sandbox.example/v1/acp/sess-1"
"https://sandbox.example/v1/acp/sess-1"]
(mapv :url @calls)))
(is (= "initialize" (get-in @calls [0 :body :method])))
(is (= "session/new" (get-in @calls [1 :body :method])))
(is (= "/workspace/sess-1"
(get-in @calls [1 :body :params :cwd])))
(is (= [] (get-in @calls [1 :body :params :mcpServers])))
(is (= "session/set_mode" (get-in @calls [2 :body :method])))
(is (= "auto"
(get-in @calls [2 :body :params :modeId])))
(is (= "sess-1"
(get-in @calls [2 :body :params :sessionId])))
(done)))
(.catch (fn [error]
(set! js/fetch original-fetch)
(throw error)))))))
(deftest send-message-uses-acp-prompt-test
(async done
(let [original-fetch js/fetch
requests (atom [])]
(set! js/fetch
(fn [request]
(-> (.text (.clone request))
(.then (fn [body-text]
(swap! requests conj {:url (.-url request)
:method (.-method request)
:body (js->clj (js/JSON.parse body-text)
:keywordize-keys true)})
(js/Response.
(js/JSON.stringify #js {:jsonrpc "2.0"
:id (get-in @requests [(dec (count @requests)) :body :id])
:result #js {:stopReason "end_turn"}})
#js {:status 200
:headers #js {"content-type" "application/json"}}))))))
(-> (sandbox/<send-message "https://sandbox.example" "token" "srv-1" "remote-sess-1"
{:message "Ship it"})
(.then (fn [_]
(set! js/fetch original-fetch)
(is (= "https://sandbox.example/v1/acp/srv-1"
(get-in @requests [0 :url])))
(is (= "session/prompt" (get-in @requests [0 :body :method])))
(is (= "remote-sess-1"
(get-in @requests [0 :body :params :sessionId])))
(is (= [{:type "text" :text "Ship it"}]
(get-in @requests [0 :body :params :prompt])))
(done)))
(.catch (fn [error]
(set! js/fetch original-fetch)
(throw error)))))))
(deftest acp-envelope-event-test
(testing "normalizes ACP session updates into worker runtime events only"
(is (nil? (sandbox/acp-envelope->event
{:jsonrpc "2.0"
:id 3
:result {:stopReason "end_turn"}})))
(is (nil? (sandbox/acp-envelope->event
{:jsonrpc "2.0"
:id 4
:result {:stopReason "cancelled"}})))
(is (= {:type "agent.runtime"
:data {:method "session/update"
:session-id "remote-sess-1"
:update {:sessionUpdate "agent_message_chunk"
:content "hello"}}}
(sandbox/acp-envelope->event
{:jsonrpc "2.0"
:method "session/update"
:params {:sessionId "remote-sess-1"
:update {:sessionUpdate "agent_message_chunk"
:content "hello"}}})))
(is (nil? (sandbox/acp-envelope->event
{:jsonrpc "2.0"
:id 1
:result {:sessionId "remote-sess-1"}})))))

View File

@@ -0,0 +1,8 @@
:run-background-task :logseq.db.common.entity-plus/reset-immutable-entities-cache!
:run-background-task :frontend.background-tasks/sync-to-worker-network-online-status
:run-background-task :frontend.components.rtc.indicator/update-accumulated-download-logs
:run-background-task :frontend.components.rtc.indicator/update-accumulated-upload-logs
:run-background-task :frontend.worker.embedding/subscribe-state
Ran 0 tests containing 0 assertions.
0 failures, 0 errors.

View File

@@ -0,0 +1,8 @@
:run-background-task :logseq.db.common.entity-plus/reset-immutable-entities-cache!
:run-background-task :frontend.background-tasks/sync-to-worker-network-online-status
:run-background-task :frontend.components.rtc.indicator/update-accumulated-download-logs
:run-background-task :frontend.components.rtc.indicator/update-accumulated-upload-logs
:run-background-task :frontend.worker.embedding/subscribe-state
Ran 0 tests containing 0 assertions.
0 failures, 0 errors.

View File

@@ -0,0 +1,8 @@
:run-background-task :logseq.db.common.entity-plus/reset-immutable-entities-cache!
:run-background-task :frontend.background-tasks/sync-to-worker-network-online-status
:run-background-task :frontend.components.rtc.indicator/update-accumulated-download-logs
:run-background-task :frontend.components.rtc.indicator/update-accumulated-upload-logs
:run-background-task :frontend.worker.embedding/subscribe-state
Ran 0 tests containing 0 assertions.
0 failures, 0 errors.

View File

@@ -82,6 +82,49 @@
(when (string? (:output_text payload)) (:output_text payload))
(when (string? (:raw payload)) (:raw payload))))
(defn- acp-runtime-update
[event]
(let [data (when (map? (:data event)) (:data event))]
(when (= "session/update" (:method data))
(let [update (:update data)]
(when (map? update)
update)))))
(defn- acp-runtime-update-kind
[event]
(some-> (acp-runtime-update event)
:sessionUpdate
str
string/trim
not-empty))
(defn- acp-runtime-session-id
[event]
(some-> (when (map? (:data event)) (:data event))
:session-id
str
string/trim
not-empty))
(defn- acp-runtime-update-text
[event]
(let [update (acp-runtime-update event)
content (:content update)]
(or (when (map? content)
(or (some-> (:text content) str)
(some-> (:delta content) str)))
(some-> (:text update) str)
(some-> (:delta update) str))))
(defn- acp-runtime-stop-reason
[event]
(some-> (when (map? (:data event)) (:data event))
:result
:stopReason
str
string/trim
not-empty))
(defn- agent-title
[agent-value]
(cond
@@ -96,7 +139,10 @@
base (let [acc (atom {:items {}
:order []
:item-kind-by-id {}
:tool-call-id-by-item-id {}})]
:tool-call-id-by-item-id {}
:acp-turn-by-runtime {}
:acp-active-item-by-runtime {}
:last-acp-runtime-session-id nil})]
(letfn [(known-item-kind [item-id]
(get-in @acc [:item-kind-by-id item-id]))
(remember-item-kind! [item-id item-kind]
@@ -231,6 +277,20 @@
(:output part)
"")
delta)))))))
(current-acp-item-id [runtime-session-id]
(get-in @acc [:acp-active-item-by-runtime runtime-session-id]))
(ensure-acp-item-id! [runtime-session-id]
(when (string? runtime-session-id)
(swap! acc assoc :last-acp-runtime-session-id runtime-session-id)
(or (current-acp-item-id runtime-session-id)
(let [turn (or (get-in @acc [:acp-turn-by-runtime runtime-session-id]) 1)
item-id (str runtime-session-id "-turn-" turn)]
(swap! acc assoc-in [:acp-active-item-by-runtime runtime-session-id] item-id)
item-id))))
(advance-acp-turn! [runtime-session-id]
(when (string? runtime-session-id)
(swap! acc update-in [:acp-turn-by-runtime runtime-session-id] (fnil inc 1))
(swap! acc update :acp-active-item-by-runtime dissoc runtime-session-id)))
(process-content-part! [item-id role part]
(let [part-kind (chat-event/content-part-kind part)]
(cond
@@ -253,71 +313,92 @@
:else nil)))
(process-event! [event]
(let [event-type (:type event)
payload (chat-event/unwrap-event-payload event)
payload (if (map? payload) payload {})
item (if (map? (:item payload)) (:item payload) {})
item-id (chat-event/event-item-id event payload item)
item-kind (or (chat-event/event-item-kind payload item)
(known-item-kind item-id))
role (chat-role (or (:role item)
(:role payload)
(when (= "audit.log" event-type)
(role-from-kind (:kind payload) "user"))
(:kind payload)
"assistant"))
merged (merge payload item)
delta (payload-delta payload)
known-tool-call? (string? (known-tool-call-id item-id))
tool-result-like? (or (= "tool-result" item-kind)
known-tool-call?)]
(when (string? item-id)
(remember-item-kind! item-id item-kind)
(case event-type
"item.started"
(when (= "tool-result" item-kind)
(set-tool-input! item-id role merged))
runtime-update-kind (acp-runtime-update-kind event)]
(cond
(and (= "agent.runtime" event-type)
(= "agent_message_chunk" runtime-update-kind))
(when-let [item-id (ensure-acp-item-id! (acp-runtime-session-id event))]
(append-text-part! item-id "assistant" (acp-runtime-update-text event)))
("item.completed" "response.completed")
(if (seq (:content item))
(doseq [part (:content item)]
(process-content-part! item-id role part))
(cond
(= "reasoning" item-kind)
(set-reasoning-part! item-id role (or (chat-event/payload-text item)
(chat-event/payload-text payload)))
(and (= "agent.runtime" event-type)
(= "agent_thought_chunk" runtime-update-kind))
(when-let [item-id (ensure-acp-item-id! (acp-runtime-session-id event))]
(append-reasoning-part! item-id "assistant" (acp-runtime-update-text event)))
(= "tool-call" item-kind)
(set-tool-input! item-id role merged)
(and (= "agent.runtime" event-type)
(string? (acp-runtime-stop-reason event)))
(advance-acp-turn! (or (acp-runtime-session-id event)
(:last-acp-runtime-session-id @acc)))
(= "tool-result" item-kind)
(set-tool-output! item-id role merged)
(= "agent.runtime" event-type)
nil
(= "status" item-kind)
(when-let [error-message (chat-event/status-error-message merged)]
(set-text-part! item-id "assistant" error-message))
:else
(let [payload (chat-event/unwrap-event-payload event)
payload (if (map? payload) payload {})
item (if (map? (:item payload)) (:item payload) {})
item-id (chat-event/event-item-id event payload item)
item-kind (or (chat-event/event-item-kind payload item)
(known-item-kind item-id))
role (chat-role (or (:role item)
(:role payload)
(when (= "audit.log" event-type)
(role-from-kind (:kind payload) "user"))
(:kind payload)
"assistant"))
merged (merge payload item)
delta (payload-delta payload)
known-tool-call? (string? (known-tool-call-id item-id))
tool-result-like? (or (= "tool-result" item-kind)
known-tool-call?)]
(when (string? item-id)
(remember-item-kind! item-id item-kind)
(case event-type
"item.started"
(when (= "tool-result" item-kind)
(set-tool-input! item-id role merged))
:else
(set-text-part! item-id role (or (chat-event/payload-text item)
(chat-event/payload-text payload)))))
("item.completed" "response.completed")
(if (seq (:content item))
(doseq [part (:content item)]
(process-content-part! item-id role part))
(cond
(= "reasoning" item-kind)
(set-reasoning-part! item-id role (or (chat-event/payload-text item)
(chat-event/payload-text payload)))
"audit.log"
(set-text-part! item-id "user" (chat-event/payload-text payload))
(= "tool-call" item-kind)
(set-tool-input! item-id role merged)
nil)
(= "tool-result" item-kind)
(set-tool-output! item-id role merged)
(when (chat-event/delta-event? event-type)
(cond
(= "reasoning" item-kind)
(append-reasoning-part! item-id role delta)
(= "status" item-kind)
(when-let [error-message (chat-event/status-error-message merged)]
(set-text-part! item-id "assistant" error-message))
(= "tool-call" item-kind)
(set-tool-input! item-id role merged)
:else
(set-text-part! item-id role (or (chat-event/payload-text item)
(chat-event/payload-text payload)))))
tool-result-like?
(append-tool-output-delta! item-id role merged delta)
"audit.log"
(set-text-part! item-id "user" (chat-event/payload-text payload))
:else
(append-text-part! item-id role delta))))))]
nil)
(when (chat-event/delta-event? event-type)
(cond
(= "reasoning" item-kind)
(append-reasoning-part! item-id role delta)
(= "tool-call" item-kind)
(set-tool-input! item-id role merged)
tool-result-like?
(append-tool-output-delta! item-id role merged delta)
:else
(append-text-part! item-id role delta))))))))]
(doseq [event events]
(process-event! event))
(->> (:order @acc)

View File

@@ -17,6 +17,12 @@
(let [trimmed (string/trim value)]
(when-not (string/blank? trimmed) trimmed))))
(defn- non-blank-str-preserve
[value]
(when (string? value)
(when-not (string/blank? value)
value)))
(defn- read-field
[x k]
(cond
@@ -159,6 +165,31 @@
[event-type]
(contains? #{"session.completed" "session.failed" "session.canceled"} event-type))
(defn- acp-runtime-update
[event]
(let [data (if (map? (:data event)) (:data event) {})]
(when (= "session/update" (:method data))
(let [update (:update data)]
(when (map? update)
update)))))
(defn- acp-runtime-update-kind
[event]
(some-> (acp-runtime-update event)
:sessionUpdate
str
string/trim
not-empty))
(defn- acp-update-text
[update]
(let [content (:content update)]
(or (when (map? content)
(or (non-blank-str-preserve (:text content))
(non-blank-str-preserve (:delta content))))
(non-blank-str-preserve (:text update))
(non-blank-str-preserve (:delta update)))))
(defn- ^:large-vars/cleanup-todo start-stream-consumer!
[{:keys [response writer start-ts idle-timeout-ms abort-signal]}]
(let [reader (.getReader (.-body response))
@@ -328,6 +359,15 @@
[{:type "text-delta"
:id text-part-id
:delta delta}])))))
(acp-text-delta-chunks! [delta]
(let [delta (non-blank-str-preserve delta)]
(if-not (string? delta)
[]
(vec (concat (start-chunks!)
(text-start-chunks!)
[{:type "text-delta"
:id text-part-id
:delta delta}])))))
(event-chunks-for-content-part! [item-id idx part]
(let [part-kind (chat-event/content-part-kind part)
nested-item-id (str (or item-id "item") "-" idx)]
@@ -424,22 +464,32 @@
(<handle-event!
[event]
(let [event-type (:type event)
event-ts (:ts event)]
event-ts (:ts event)
runtime-update (acp-runtime-update event)
runtime-update-kind (acp-runtime-update-kind event)]
(if (or (not (number? event-ts))
(<= event-ts start-ts)
@finished?)
(js/Promise.resolve nil)
(let [data (:data event)
payload (chat-event/unwrap-event-payload
{:data (if (map? data) data {})})
item (:item payload)
item-id (chat-event/event-item-id {} payload item)]
(let [data (:data event)]
(cond
(= event-type "agent.runtime.error")
(-> (write-chunks! writer [{:type "error"
:errorText (runtime-error-message event)}])
(.finally (fn [] (finish!))))
(and (= event-type "agent.runtime")
(= "agent_message_chunk" runtime-update-kind))
(<write-event-chunks! (acp-text-delta-chunks! (acp-update-text runtime-update)))
(and (= event-type "agent.runtime")
(= "agent_thought_chunk" runtime-update-kind))
(<write-event-chunks! (reasoning-delta-chunks! "acp-reasoning"
(acp-update-text runtime-update)))
(= event-type "agent.runtime")
(js/Promise.resolve nil)
(terminal-session-event? event-type)
(-> (if (= event-type "session.completed")
(js/Promise.resolve nil)
@@ -447,77 +497,78 @@
:errorText event-type}]))
(.finally (fn [] (finish!))))
(= event-type "item.started")
(let [item-kind (or (chat-event/event-item-kind payload item)
(known-item-kind item-id))
merged (merge (if (map? payload) payload {})
(if (map? item) item {}))
_ (remember-item-kind! item-id item-kind)
chunks (cond
(= "tool-result" item-kind)
(vec (concat (start-chunks!)
(tool-input-available-once-chunks! item-id merged)))
:else
[])]
(<write-event-chunks! chunks))
(chat-event/delta-event? event-type)
(let [delta (non-empty-str (chat-event/payload-text payload))
:else
(let [payload (chat-event/unwrap-event-payload
{:data (if (map? data) data {})})
item (:item payload)
item-id (chat-event/event-item-id {} payload item)
item-kind (or (chat-event/event-item-kind payload item)
(known-item-kind item-id))
item-key (item-key item-id)
merged (merge (if (map? payload) payload {})
(if (map? item) item {}))
tool-call-id (resolve-tool-call-id item-id merged)
known-tool-call? (contains? @tool-input-available-ids tool-call-id)
tool-result-like? (or (= "tool-result" item-kind)
known-tool-call?)
_ (remember-item-kind! item-id item-kind)
chunks (cond
(and (= "reasoning" item-kind) (string? delta))
(reasoning-delta-chunks! item-id delta)
(if (map? item) item {}))]
(cond
(= event-type "item.started")
(let [_ (remember-item-kind! item-id item-kind)
chunks (cond
(= "tool-result" item-kind)
(vec (concat (start-chunks!)
(tool-input-available-once-chunks! item-id merged)))
(and (= "tool-call" item-kind) (string? delta))
(tool-input-delta-chunks! item-id merged delta)
:else
[])]
(<write-event-chunks! chunks))
(and tool-result-like? (string? delta))
(let [previous (or (get @tool-output-text-by-call tool-call-id) "")
next-output (str previous delta)]
(swap! tool-output-text-by-call assoc tool-call-id next-output)
(vec (concat (start-chunks!)
(tool-input-available-once-chunks! item-id merged)
[{:type "tool-output-available"
:toolCallId tool-call-id
:output next-output
:preliminary true}])))
(chat-event/delta-event? event-type)
(let [delta (non-empty-str (chat-event/payload-text payload))
tool-call-id (resolve-tool-call-id item-id merged)
known-tool-call? (contains? @tool-input-available-ids tool-call-id)
tool-result-like? (or (= "tool-result" item-kind)
known-tool-call?)
item-key (item-key item-id)
chunks (cond
(and (= "reasoning" item-kind) (string? delta))
(reasoning-delta-chunks! item-id delta)
(string? delta)
(text-delta-chunks! delta)
(and (= "tool-call" item-kind) (string? delta))
(tool-input-delta-chunks! item-id merged delta)
:else
[])]
(when (and (string? item-key)
(= item-kind nil)
(string? delta))
(swap! text-delta-item-ids conj item-key))
(when (and (string? item-key)
(contains? #{"message" "text"} item-kind)
(string? delta))
(swap! text-delta-item-ids conj item-key))
(<write-event-chunks! chunks))
(and tool-result-like? (string? delta))
(let [previous (or (get @tool-output-text-by-call tool-call-id) "")
next-output (str previous delta)]
(swap! tool-output-text-by-call assoc tool-call-id next-output)
(vec (concat (start-chunks!)
(tool-input-available-once-chunks! item-id merged)
[{:type "tool-output-available"
:toolCallId tool-call-id
:output next-output
:preliminary true}])))
(item-completed-event? event-type)
(let [chunks (item-completed-chunks! payload item item-id)]
(<write-event-chunks! chunks))
(string? delta)
(text-delta-chunks! delta)
(response-completed-event? event-type)
(let [chunks (item-completed-chunks! payload item item-id)]
(-> (<write-event-chunks! chunks)
(.finally (fn [] (finish!)))))
:else
[])]
(when (and (string? item-key)
(= item-kind nil)
(string? delta))
(swap! text-delta-item-ids conj item-key))
(when (and (string? item-key)
(contains? #{"message" "text"} item-kind)
(string? delta))
(swap! text-delta-item-ids conj item-key))
(<write-event-chunks! chunks))
:else
(js/Promise.resolve nil))))))
(item-completed-event? event-type)
(let [chunks (item-completed-chunks! payload item item-id)]
(<write-event-chunks! chunks))
(response-completed-event? event-type)
(let [chunks (item-completed-chunks! payload item item-id)]
(-> (<write-event-chunks! chunks)
(.finally (fn [] (finish!)))))
:else
(js/Promise.resolve nil))))))))
(<drain-frames!
[frames]
(reduce (fn [promise frame]

View File

@@ -152,6 +152,104 @@
:text "Your access token could not be refreshed because your refresh token was already used. Please log out and sign in again."}]}]
messages)))))
(deftest session->messages-includes-acp-agent-message-chunks-test
(testing "maps ACP agent.runtime session/update message chunks into assistant text"
(let [session {:events [{:type "agent.runtime"
:ts 1100
:data {:method "session/update"
:session-id "runtime-1"
:update {:sessionUpdate "agent_message_chunk"
:content {:type "text"
:text "Received"}}}}
{:type "agent.runtime"
:ts 1110
:data {:method "session/update"
:session-id "runtime-1"
:update {:sessionUpdate "agent_message_chunk"
:content {:type "text"
:text " test"}}}}]}
messages (#'agent-chat/session->messages session {:block/uuid "b5"})]
(is (= [{:id "runtime-1"
:role "assistant"
:parts [{:type "text"
:text "Received test"}]}]
messages)))))
(deftest session->messages-ignores-non-chat-acp-runtime-events-test
(testing "does not render ACP setup/config events as chat messages"
(let [session {:events [{:type "agent.runtime"
:ts 1100
:data {:id 1
:jsonrpc "2.0"
:result {:protocolVersion 1}}}
{:type "agent.runtime"
:ts 1110
:data {:method "session/update"
:session-id "runtime-1"
:update {:sessionUpdate "config_option_update"
:configOptions [{:id "mode"
:currentValue "auto"}]}}}
{:type "agent.runtime"
:ts 1120
:data {:method "session/update"
:session-id "runtime-1"
:update {:sessionUpdate "usage_update"
:used 42}}}]}
messages (#'agent-chat/session->messages session {:block/uuid "b6"})]
(is (= [] messages)))))
(deftest session->messages-splits-acp-turns-by-end-turn-result-test
(testing "separates assistant replies across ACP turns in the same runtime session"
(let [session {:events [{:type "audit.log"
:ts 1000
:data {:event "user-message"
:kind "user"
:by "u1"
:message "first"}}
{:type "agent.runtime"
:ts 1100
:data {:method "session/update"
:session-id "runtime-1"
:update {:sessionUpdate "agent_message_chunk"
:content {:type "text"
:text "First"}}}}
{:type "agent.runtime"
:ts 1110
:session-id "do-session-1"
:data {:id 4
:jsonrpc "2.0"
:result {:stopReason "end_turn"}}}
{:type "audit.log"
:ts 1200
:data {:event "user-message"
:kind "user"
:by "u1"
:message "second"}}
{:type "agent.runtime"
:ts 1300
:data {:method "session/update"
:session-id "runtime-1"
:update {:sessionUpdate "agent_message_chunk"
:content {:type "text"
:text "Second"}}}}
{:type "agent.runtime"
:ts 1310
:session-id "do-session-1"
:data {:id 5
:jsonrpc "2.0"
:result {:stopReason "end_turn"}}}]}
messages (#'agent-chat/session->messages session {:block/uuid "b7"})]
(is (= [{:id "task-b7"
:role "user"
:parts [{:type "text" :text "first"}]}
{:id "runtime-1-turn-1"
:role "assistant"
:parts [{:type "text" :text "First"}]}
{:id "runtime-1-turn-2"
:role "assistant"
:parts [{:type "text" :text "Second"}]}]
messages)))))
(deftest session-messages-need-sync-detects-content-growth-without-clobbering-optimistic-ui-test
(testing "session updates with richer content should sync even when count is unchanged"
(let [f (some-> (resolve 'frontend.components.agent-chat/session-messages-need-sync?)

View File

@@ -102,6 +102,149 @@
(is false (str "unexpected error: " error))
(done)))))))
(deftest send-messages-streams-acp-agent-message-chunks-test
(async done
(let [fetch-fn (fn [url init]
(let [method (or (aget init "method") "GET")]
(cond
(and (= "POST" method)
(= "http://db-sync.local/sessions/sess-1/messages" url))
(js/Promise.resolve
(js/Response.
(js/JSON.stringify #js {:ok true})
#js {:status 200
:headers #js {"content-type" "application/json"}}))
(and (= "GET" method)
(= "http://db-sync.local/sessions/sess-1/stream" url))
(js/Promise.resolve
(sse-response
[{:type "agent.runtime"
:ts 1100
:data {:method "session/update"
:session-id "runtime-1"
:update {:sessionUpdate "agent_message_chunk"
:content {:type "text"
:text "Received"}}}}
{:type "agent.runtime"
:ts 1200
:data {:method "session/update"
:session-id "runtime-1"
:update {:sessionUpdate "agent_message_chunk"
:content {:type "text"
:text " test"}}}}
{:type "agent.runtime"
:ts 1300
:data {:id 4
:jsonrpc "2.0"
:result {:stopReason "end_turn"}}}]))
:else
(js/Promise.resolve
(js/Response.
(js/JSON.stringify #js {:error "unexpected request"})
#js {:status 500
:headers #js {"content-type" "application/json"}})))))
transport (agent-chat-transport/make-transport
{:base "http://db-sync.local"
:session-id "sess-1"
:fetch-fn fetch-fn
:now-fn (fn [] 1000)
:idle-timeout-ms 50})]
(-> (.sendMessages transport
#js {:chatId "sess-1"
:trigger "submit-message"
:messageId nil
:messages #js [#js {:id "user-1"
:role "user"
:parts #js [#js {:type "text"
:text "hello from ui"}]}]})
(.then <read-all-chunks)
(.then (fn [chunks]
(let [types (mapv :type chunks)
deltas (->> chunks
(filter #(= "text-delta" (:type %)))
(map :delta)
vec)]
(is (= ["start"
"start-step"
"text-start"
"text-delta"
"text-delta"
"text-end"
"finish-step"
"finish"]
types))
(is (= ["Received" " test"] deltas))
(done))))
(.catch (fn [error]
(is false (str "unexpected error: " error))
(done)))))))
(deftest send-messages-ignores-acp-setup-and-config-runtime-events-test
(async done
(let [fetch-fn (fn [url init]
(let [method (or (aget init "method") "GET")]
(cond
(and (= "POST" method)
(= "http://db-sync.local/sessions/sess-1/messages" url))
(js/Promise.resolve
(js/Response.
(js/JSON.stringify #js {:ok true})
#js {:status 200
:headers #js {"content-type" "application/json"}}))
(and (= "GET" method)
(= "http://db-sync.local/sessions/sess-1/stream" url))
(js/Promise.resolve
(sse-response
[{:type "agent.runtime"
:ts 1100
:data {:id 1
:jsonrpc "2.0"
:result {:protocolVersion 1}}}
{:type "agent.runtime"
:ts 1200
:data {:method "session/update"
:session-id "runtime-1"
:update {:sessionUpdate "config_option_update"
:configOptions [{:id "mode"
:currentValue "auto"}]}}}
{:type "agent.runtime"
:ts 1300
:data {:method "session/update"
:session-id "runtime-1"
:update {:sessionUpdate "usage_update"
:used 42}}}]))
:else
(js/Promise.resolve
(js/Response.
(js/JSON.stringify #js {:error "unexpected request"})
#js {:status 500
:headers #js {"content-type" "application/json"}})))))
transport (agent-chat-transport/make-transport
{:base "http://db-sync.local"
:session-id "sess-1"
:fetch-fn fetch-fn
:now-fn (fn [] 1000)
:idle-timeout-ms 50})]
(-> (.sendMessages transport
#js {:chatId "sess-1"
:trigger "submit-message"
:messageId nil
:messages #js [#js {:id "user-1"
:role "user"
:parts #js [#js {:type "text"
:text "hello from ui"}]}]})
(.then <read-all-chunks)
(.then (fn [chunks]
(is (= ["finish"] (mapv :type chunks)))
(done)))
(.catch (fn [error]
(is false (str "unexpected error: " error))
(done)))))))
(deftest send-messages-emits-error-chunk-on-runtime-error-test
(async done
(let [fetch-fn (fn [url init]