From 99a5ce5b8bb12d4dc568e714d4762ccb7c36b5cb Mon Sep 17 00:00:00 2001 From: Tienson Qin Date: Sat, 7 Feb 2026 16:35:06 +0800 Subject: [PATCH] fix: sprites session --- .../src/logseq/db_sync/worker/agent/do.cljs | 46 +-- .../worker/agent/runtime_provider.cljs | 176 ++++++++++-- .../test/logseq/db_sync/agent_do_test.cljs | 133 +++++++++ .../db_sync/agent_runtime_provider_test.cljs | 261 +++++++++++++++++- 4 files changed, 563 insertions(+), 53 deletions(-) diff --git a/deps/db-sync/src/logseq/db_sync/worker/agent/do.cljs b/deps/db-sync/src/logseq/db_sync/worker/agent/do.cljs index f4f51d9be0..4bd242541f 100644 --- a/deps/db-sync/src/logseq/db_sync/worker/agent/do.cljs +++ b/deps/db-sync/src/logseq/db_sync/worker/agent/do.cljs @@ -150,6 +150,13 @@ (fn [] (set! (.-runtime-events-stream self) nil))))))) +(defn- start-runtime-events-stream-background! [^js self session-id runtime] + ;; Fire-and-forget start. Returning nil avoids p/let awaiting the stream task. + (when (and (map? runtime) + (string? (:session-id runtime))) + (start-runtime-events-stream! self session-id runtime) + nil)) + (defn- (runtime-provider/ (runtime-provider/base64 [value] (when (string? value) - (let [bytes (.encode (js/TextEncoder.) value) - binary (apply str (map char bytes))] + (let [payload (.encode (js/TextEncoder.) value) + binary (apply str (map char payload))] (js/btoa binary)))) (defn- curl-auth-arg [token] @@ -183,8 +183,30 @@ (let [base (sprites-base-url env) token (sprites-token env) q (build-query (map (fn [c] [:cmd c]) cmd-vec)) - url (sprites-http-url base (str "/v1/sprites/" sprite-name "/exec?" q))] - (fetch-json! "POST" url {:token token}))) + url (sprites-http-url base (str "/v1/sprites/" sprite-name "/exec?" q)) + ^js headers (js/Headers.)] + (.set headers "Accept" "application/json") + (when token (token-header! headers token)) + (p/let [resp (js/fetch url #js {:method "POST" :headers headers}) + status (.-status resp) + text (->promise (.text resp)) + parsed (parse-json-safe text)] + (when-not (<= 200 status 299) + (throw (ex-info "sprites exec failed" + {:status status + :url url + :body parsed + :raw-body text}))) + (cond + (and (map? parsed) (seq parsed)) + parsed + + (string/blank? text) + {} + + :else + {:stdout text + :stderr ""})))) ;; ----------------------- ;; Worker WS upgrade to Sprites exec @@ -307,17 +329,27 @@ " --no-telemetry >/tmp/sandbox-agent.log 2>&1 &"))] (sprites-exec-post! env sprite-name ["bash" "-lc" bootstrap]))) +(declare sprites-exec-output) (defn- /dev/null")] + " >/dev/null; then echo __HEALTH_OK__; else echo __HEALTH_FAIL__; fi")] (-> (sprites-exec-post! env sprite-name ["bash" "-lc" script]) - (p/then (fn [_] true)) + (p/then (fn [result] + (let [{:keys [stdout stderr]} (sprites-exec-output result) + output (str stdout "\n" stderr)] + (when-not (string/includes? output "__HEALTH_OK__") + (throw (ex-info "sprite health check failed" + {:sprite sprite-name + :port port + :stdout stdout + :stderr stderr}))) + true))) (p/catch (fn [_] (p/let [_ (p/delay interval-ms)] ( {:message (:message message)} - (string? (:kind message)) (assoc :kind (:kind message)))) + {:message (:message message)}) + +(def ^:private sprite-http-status-marker "__HTTP_STATUS__") + +(defn- parse-sprite-http-status [text] + (when (string? text) + (some-> (re-seq (re-pattern (str sprite-http-status-marker ":(\\d+)")) text) + last + second + (parse-int 400)))) + +(defn- strip-sprite-http-status [text] + (if-not (string? text) + "" + (string/replace text + (re-pattern (str "(?:\\n?" sprite-http-status-marker ":\\d+)+\\s*$")) + ""))) + +(defn- sprites-exec-output [result] + (let [stdout (or (:stdout result) + (get-in result [:result :stdout]) + (get-in result [:data :stdout]) + (get-in result [:output :stdout])) + stderr (or (:stderr result) + (get-in result [:result :stderr]) + (get-in result [:data :stderr]) + (get-in result [:output :stderr])) + exit-code (or (:exitCode result) + (:exit-code result) + (get-in result [:result :exitCode]) + (get-in result [:result :exit-code]) + (get-in result [:data :exitCode]) + (get-in result [:data :exit-code]) + (get-in result [:output :exitCode]) + (get-in result [:output :exit-code]))] + {:stdout (cond + (string? stdout) stdout + (some? stdout) (str stdout) + :else "") + :stderr (cond + (string? stderr) stderr + (some? stderr) (str stderr) + :else "") + :exit-code (cond + (number? exit-code) exit-code + (string? exit-code) (parse-int exit-code nil) + :else nil)})) (defn- &2")] + "printf \"\\n" sprite-http-status-marker ":%s\" \"$http_status\"; ")] (p/let [result (sprites-exec-post! env sprite-name ["bash" "-lc" script]) - stdout (or (:stdout result) "") - stderr (or (:stderr result) "") - status (some-> (re-find #"STATUS:(\\d+)" stderr) - second - (parse-int 400)) - parsed (parse-json-safe stdout)] - (when (and (number? status) (not (<= 200 status 299))) + {:keys [stdout stderr exit-code]} (sprites-exec-output result) + status (or (parse-sprite-http-status stderr) + (parse-sprite-http-status stdout)) + parsed (parse-json-safe (strip-sprite-http-status stdout))] + (when (or + (not (number? status)) + (and (number? status) (not (<= 200 status 299)))) (throw (ex-info "sandbox-agent create-session failed" {:status status :body parsed :raw-body stdout :stderr stderr}))) + (println :debug :sprite-create-session :status status :exit-code exit-code) (assoc parsed :session-id session-id)))) +(defn- transient-create-session-error? + [error] + (let [{:keys [status raw-body stderr]} (ex-data error) + msg (ex-message error) + haystack (-> (str (or raw-body "") "\n" (or stderr "") "\n" (or msg "")) + string/lower-case)] + (or (= status 0) + (string/includes? haystack "__http_status__:000") + (string/includes? haystack "failed to connect") + (string/includes? haystack "could not connect") + (string/includes? haystack "connection refused") + (string/includes? haystack "connect(): connection refused") + (string/includes? haystack "connection reset") + (string/includes? haystack "timed out")))) + +(defn- ( retries 0) (transient-create-session-error? error)) + (do + (println :debug :sprite-create-session-retry + {:sprite sprite-name + :session-id session-id + :retries-left retries + :error (ex-data error)}) + (p/let [_ (p/delay interval-ms)] + (/dev/null")] + (js/console.log "[agent:sprites-send-message]" script) (p/let [_ (sprites-exec-post! env name ["bash" "-lc" script])] true)))) diff --git a/deps/db-sync/test/logseq/db_sync/agent_do_test.cljs b/deps/db-sync/test/logseq/db_sync/agent_do_test.cljs index 7c86fef0ae..6a56663db7 100644 --- a/deps/db-sync/test/logseq/db_sync/agent_do_test.cljs +++ b/deps/db-sync/test/logseq/db_sync/agent_do_test.cljs @@ -149,3 +149,136 @@ (set! js/fetch original-fetch) (is false (str "unexpected error: " error)) (done)))))))) + +(deftest init-does-not-wait-for-open-events-stream-test + (testing "session init returns immediately even when runtime events stream stays open" + (async done + (let [calls (atom {:create 0 + :events-sse 0}) + original-fetch js/fetch + env #js {"AGENT_RUNTIME_PROVIDER" "local-dev" + "SANDBOX_AGENT_URL" "http://sandbox.local"} + self (make-self env) + headers {"content-type" "application/json" + "x-user-id" "user-1"} + init-body {:id "sess-init-fast" + :project {:id "project-1"} + :agent "codex"} + timeout-id (js/setTimeout (fn [] + (set! js/fetch original-fetch) + (is false "init blocked waiting on events stream") + (done)) + 250)] + (set! js/fetch + (fn [request] + (let [url (.-url request) + method (.-method request)] + (cond + (and (= "POST" method) + (string/includes? url "/v1/sessions/sess-init-fast") + (not (string/includes? url "/messages"))) + (do + (swap! calls update :create inc) + (js/Promise.resolve + (js/Response. + (js/JSON.stringify #js {:ok true}) + #js {:status 200 + :headers #js {"content-type" "application/json"}}))) + + (and (= "GET" method) + (string/includes? url "/v1/sessions/sess-init-fast/events/sse")) + (do + (swap! calls update :events-sse inc) + (let [stream (js/TransformStream.)] + ;; Never close/read completion from this stream. + (js/Promise.resolve + (js/Response. + (.-readable stream) + #js {:status 200 + :headers #js {"content-type" "text/event-stream"}})))) + + :else + (js/Promise.resolve + (js/Response. + (js/JSON.stringify #js {:error "unhandled request"}) + #js {:status 500 + :headers #js {"content-type" "application/json"}})))))) + + (-> (agent-do/handle-fetch self + (json-request "http://db-sync.local/__session__/init" + "POST" + init-body + headers)) + (.then (fn [resp] + (js/clearTimeout timeout-id) + (set! js/fetch original-fetch) + (is (= 200 (.-status resp))) + (is (= 1 (:create @calls))) + (is (= 1 (:events-sse @calls))) + (done))) + (.catch (fn [error] + (js/clearTimeout timeout-id) + (set! js/fetch original-fetch) + (is false (str "unexpected error: " error)) + (done)))))))) + +(deftest init-does-not-await-start-runtime-events-stream-return-test + (testing "session init should not await start-runtime-events-stream! promise" + (async done + (let [calls (atom {:create 0}) + original-fetch js/fetch + env #js {"AGENT_RUNTIME_PROVIDER" "local-dev" + "SANDBOX_AGENT_URL" "http://sandbox.local"} + self (make-self env) + headers {"content-type" "application/json" + "x-user-id" "user-1"} + init-body {:id "sess-init-no-await" + :project {:id "project-1"} + :agent "codex"} + timeout-id (js/setTimeout (fn [] + (set! js/fetch original-fetch) + (is false "init awaited start-runtime-events-stream! promise") + (done)) + 250)] + (set! js/fetch + (fn [request] + (let [url (.-url request) + method (.-method request)] + (cond + (and (= "POST" method) + (string/includes? url "/v1/sessions/sess-init-no-await") + (not (string/includes? url "/messages"))) + (do + (swap! calls update :create inc) + (js/Promise.resolve + (js/Response. + (js/JSON.stringify #js {:ok true}) + #js {:status 200 + :headers #js {"content-type" "application/json"}}))) + + :else + (js/Promise.resolve + (js/Response. + (js/JSON.stringify #js {:error "unhandled request"}) + #js {:status 500 + :headers #js {"content-type" "application/json"}})))))) + + (with-redefs [agent-do/start-runtime-events-stream! + (fn [& _] + (js/Promise. (fn [_resolve _reject])))] + (-> (agent-do/handle-fetch self + (json-request "http://db-sync.local/__session__/init" + "POST" + init-body + headers)) + (.then (fn [resp] + (js/clearTimeout timeout-id) + (set! js/fetch original-fetch) + (is (= 200 (.-status resp))) + (is (= 1 (:create @calls))) + (done))) + (.catch (fn [error] + (js/clearTimeout timeout-id) + (set! js/fetch original-fetch) + (is false (str "unexpected error: " error)) + (done))))))))) diff --git a/deps/db-sync/test/logseq/db_sync/agent_runtime_provider_test.cljs b/deps/db-sync/test/logseq/db_sync/agent_runtime_provider_test.cljs index 7bd22f899b..eef464e300 100644 --- a/deps/db-sync/test/logseq/db_sync/agent_runtime_provider_test.cljs +++ b/deps/db-sync/test/logseq/db_sync/agent_runtime_provider_test.cljs @@ -3,6 +3,18 @@ [clojure.string :as string] [logseq.db-sync.worker.agent.runtime-provider :as runtime-provider])) +(defn- fetch-url [request] + (cond + (string? request) request + (instance? js/URL request) (.toString request) + (some? request) (.-url request) + :else "")) + +(defn- fetch-method [request init] + (or (when (some? init) (aget init "method")) + (when (and (some? request) (not (string? request))) (.-method request)) + "GET")) + (deftest provider-kind-test (testing "normalizes configured runtime provider" (is (= "sprites" (runtime-provider/provider-kind #js {}))) @@ -100,9 +112,9 @@ task {:agent {:provider "codex"}} original-fetch js/fetch] (set! js/fetch - (fn [request] - (is (= "http://127.0.0.1:2468/v1/sessions/sess-1" (.-url request))) - (is (= "POST" (.-method request))) + (fn [request init] + (is (= "http://127.0.0.1:2468/v1/sessions/sess-1" (fetch-url request))) + (is (= "POST" (fetch-method request init))) (js/Promise.resolve (js/Response. (js/JSON.stringify #js {:ok true}) @@ -128,9 +140,9 @@ :session-id "sess-2"} original-fetch js/fetch] (set! js/fetch - (fn [request] - (is (= "http://sandbox.local/v1/sessions/sess-2/events/sse" (.-url request))) - (is (= "GET" (.-method request))) + (fn [request init] + (is (= "http://sandbox.local/v1/sessions/sess-2/events/sse" (fetch-url request))) + (is (= "GET" (fetch-method request init))) (js/Promise.resolve (js/Response. "data: {\"type\":\"ok\"}\n\n" #js {:status 200 @@ -154,9 +166,9 @@ :session-id "sess-2"} original-fetch js/fetch] (set! js/fetch - (fn [request] - (is (= "http://sandbox.local/v1/sessions/sess-2/messages" (.-url request))) - (is (= "POST" (.-method request))) + (fn [request init] + (is (= "http://sandbox.local/v1/sessions/sess-2/messages" (fetch-url request))) + (is (= "POST" (fetch-method request init))) (js/Promise.resolve (js/Response. (js/JSON.stringify #js {:ok true}) @@ -172,6 +184,237 @@ (is false (str "unexpected error: " error)) (done))))))) +(deftest sprites-provider-send-message-test + (async done + (let [captured (atom nil) + env #js {"SPRITE_TOKEN" "sprite-token"} + provider (runtime-provider/create-provider env "sprites") + runtime {:provider "sprites" + :sprite-name "sprite-1" + :sandbox-port 2468 + :session-id "sess-4"} + original-fetch js/fetch] + (set! js/fetch + (fn [request init] + (let [url (fetch-url request) + parsed (js/URL. url) + cmds (vec (.getAll (.-searchParams parsed) "cmd"))] + (reset! captured {:url url + :method (fetch-method request init) + :script (nth cmds 2 nil)}) + (js/Promise.resolve + (js/Response. + (js/JSON.stringify #js {:ok true}) + #js {:status 200 :headers #js {"content-type" "application/json"}}))))) + (-> (runtime-provider/js (if create-session? + {:result {:stdout "{\"ok\":true}\n__HTTP_STATUS__:200__HTTP_STATUS__:200" + :stderr ""}} + {:result {:stdout "" + :stderr ""}}))) + #js {:status 200 :headers #js {"content-type" "application/json"}}))) + + :else + (js/Promise.resolve + (js/Response. + (js/JSON.stringify #js {:error "unexpected request"}) + #js {:status 500 :headers #js {"content-type" "application/json"}})))))) + (-> (runtime-provider/js (if (= 1 (:create-session n)) + {:result {:stdout "curl: (7) Failed to connect to 127.0.0.1 port 2468\n__HTTP_STATUS__:000" + :stderr ""}} + {:result {:stdout "{\"ok\":true}\n__HTTP_STATUS__:200" + :stderr ""}}))) + #js {:status 200 :headers #js {"content-type" "application/json"}}))) + + health? + (js/Promise.resolve + (js/Response. + (js/JSON.stringify (clj->js {:result {:stdout "__HEALTH_OK__" :stderr ""}})) + #js {:status 200 :headers #js {"content-type" "application/json"}})) + + :else + (js/Promise.resolve + (js/Response. + (js/JSON.stringify (clj->js {:result {:stdout "" :stderr ""}})) + #js {:status 200 :headers #js {"content-type" "application/json"}})))) + + :else + (js/Promise.resolve + (js/Response. + (js/JSON.stringify #js {:error "unexpected request"}) + #js {:status 500 :headers #js {"content-type" "application/json"}})))))) + (-> (runtime-provider/js {:result {:stdout "{\"error\":\"bad request\"}\n__HTTP_STATUS__:400" + :stderr ""}})) + #js {:status 200 :headers #js {"content-type" "application/json"}}))) + + health? + (js/Promise.resolve + (js/Response. + (js/JSON.stringify (clj->js {:result {:stdout "__HEALTH_OK__" :stderr ""}})) + #js {:status 200 :headers #js {"content-type" "application/json"}})) + + :else + (js/Promise.resolve + (js/Response. + (js/JSON.stringify (clj->js {:result {:stdout "" :stderr ""}})) + #js {:status 200 :headers #js {"content-type" "application/json"}})))) + + :else + (js/Promise.resolve + (js/Response. + (js/JSON.stringify #js {:error "unexpected request"}) + #js {:status 500 :headers #js {"content-type" "application/json"}})))))) + (-> (runtime-provider/