fix: sandbox resume

This commit is contained in:
Tienson Qin
2026-03-09 22:34:21 +08:00
parent a147aed5e8
commit 6736ec660e
5 changed files with 248 additions and 131 deletions

View File

@@ -273,6 +273,48 @@
(defn- session-runtime-provider [session]
(some-> (get-in session [:runtime :provider]) str string/lower-case))
(defn- sandbox-bound-runtime?
[runtime]
(and (map? runtime)
(= "e2b" (some-> (:provider runtime) str string/lower-case))
(string? (:sandbox-id runtime))))
(defn- missing-bound-sandbox-error?
[error runtime]
(let [reason (some-> error ex-data :reason)
sandbox-id (some-> runtime :sandbox-id str string/trim not-empty)
message (-> (str (or (some-> error ex-message) error))
string/lower-case)]
(or (= :missing-sandbox-id reason)
(= "missing-sandbox-id" reason)
(and (string? sandbox-id)
(or (and (string/includes? message "sandbox")
(or (string/includes? message "not found")
(string/includes? message "notfound")
(string/includes? message "no such")))
(and (string/includes? message sandbox-id)
(or (string/includes? message "not found")
(string/includes? message "notfound")
(string/includes? message "no such"))))))))
(defn- <cancel-bound-runtime-session!
[^js self current-session runtime reason error]
(prn :debug :<cancel-bound-runtime-session! :reason reason
:error error)
(js/console.trace)
(p/let [latest-session (<get-session self)]
(when (and (map? latest-session)
(= (:id current-session) (:id latest-session))
(map? (:runtime latest-session)))
(p/let [_ (<append-event! self {:type "session.canceled"
:data {:by "system"
:reason reason
:runtime-session-id (:session-id runtime)
:sandbox-id (:sandbox-id runtime)
:error (str error)}
:ts (common/now-ms)})]
(<save-session! self (assoc latest-session :runtime nil))))))
(defn- session-terminal-enabled? [session]
(runtime-provider/runtime-terminal-supported? (:runtime session)))
@@ -525,11 +567,9 @@
:sandbox-id (:sandbox-id runtime)
:error error})
false)))
latest-session (<get-session self)]
(when (and terminated?
(map? latest-session)
(map? (:runtime latest-session)))
(<save-session! self (assoc latest-session :runtime nil))))))))))
_latest-session (<get-session self)]
(when terminated?
true))))))))
(defn- <checkpoint-and-terminate-completed-runtime!
[^js self session-id]
@@ -550,12 +590,9 @@
:sandbox-id (:sandbox-id runtime)
:error error})
false)))
latest-session (<get-session self)]
(when (and terminated?
(map? latest-session)
(= session-id (:id latest-session))
(map? (:runtime latest-session)))
(<save-session! self (assoc latest-session :runtime nil))))))))
_latest-session (<get-session self)]
(when terminated?
true))))))
(defn- <terminate-runtime-on-status!
[^js self session-id status]
@@ -575,12 +612,9 @@
:sandbox-id (:sandbox-id runtime)
:error error})
false)))
latest-session (<get-session self)]
(when (and terminated?
(map? latest-session)
(= session-id (:id latest-session))
(map? (:runtime latest-session)))
(<save-session! self (assoc latest-session :runtime nil))))))))
_latest-session (<get-session self)]
(when terminated?
true))))))
(defn- parse-sse-data [frame]
(let [lines (string/split frame #"\n")
@@ -659,6 +693,7 @@
runtime
#(resolve-ready! :ready))
(.catch (fn [error]
(js/console.error error)
(resolve-ready! :error)
(p/let [latest-session (<get-session self)
latest-runtime-session-id (some-> latest-session :runtime :session-id)
@@ -667,15 +702,18 @@
(= (:session-id runtime) latest-runtime-session-id))
session-terminal? (terminal-status? (:status latest-session))]
(if (and same-runtime? (not session-terminal?))
(do
(log/error :agent/runtime-events-stream-error
{:session-id session-id
:runtime-session-id (:session-id runtime)
:error error})
(<append-event! self {:type "agent.runtime.error"
:data {:session-id session-id
:message (str error)}
:ts (common/now-ms)}))
(if (and (sandbox-bound-runtime? runtime)
(missing-bound-sandbox-error? error runtime))
(<cancel-bound-runtime-session! self latest-session runtime "sandbox-missing" error)
(do
(log/error :agent/runtime-events-stream-error
{:session-id session-id
:runtime-session-id (:session-id runtime)
:error error})
(<append-event! self {:type "agent.runtime.error"
:data {:session-id session-id
:message (str error)}
:ts (common/now-ms)})))
(do
(log/info :agent/runtime-events-stream-closed
{:session-id session-id
@@ -771,13 +809,28 @@
latest-runtime-id (some-> (:session-id latest-runtime) str)
same-runtime? (and (string? failed-runtime-id)
(= failed-runtime-id latest-runtime-id))]
(p/let [_ (when same-runtime?
(<save-session! self (assoc latest-session :runtime nil)))
retry-session (if same-runtime?
(<get-session self)
latest-session)
session-with-runtime (<ensure-runtime-for-session! self retry-session)]
(send-once! session-with-runtime))))))]
(cond
(and same-runtime?
(sandbox-bound-runtime? latest-runtime)
(missing-bound-sandbox-error? error latest-runtime))
(<cancel-bound-runtime-session! self latest-session latest-runtime "sandbox-missing" error)
(and same-runtime? (map? latest-runtime))
(p/let [refreshed-runtime (runtime-provider/<refresh-runtime-session! (.-env self)
latest-runtime
(:task latest-session))
_ (<save-session! self (assoc latest-session :runtime refreshed-runtime))
refreshed-session (<get-session self)]
(send-once! refreshed-session))
:else
(p/let [_ (when same-runtime?
(<save-session! self (assoc latest-session :runtime nil)))
retry-session (if same-runtime?
(<get-session self)
latest-session)
session-with-runtime (<ensure-runtime-for-session! self retry-session)]
(send-once! session-with-runtime)))))))]
(-> (send-once! current-session)
(p/catch retry-send!)
(p/catch (fn [error]
@@ -1360,10 +1413,7 @@
{:by user-id
:reason "cancel"}))
_ (<terminate-runtime! self runtime)
latest-session (<get-session self)
_ (when (and (map? latest-session)
(map? (:runtime latest-session)))
(<save-session! self (assoc latest-session :runtime nil)))]
_latest-session (<get-session self)]
(if (= (:error res) :missing-session)
(http/not-found)
(http/json-response :sessions/cancel {:ok true}))))))))
@@ -1376,16 +1426,17 @@
runtime (:runtime session-with-runtime)]
(if-not (runtime-ready? runtime)
0
(let [[orders next-session] (session/drain-orders session-with-runtime)
provider (runtime-provider/resolve-provider (.-env self) runtime)]
(let [[orders next-session] (session/drain-orders session-with-runtime)]
(p/let [_ (<save-session! self next-session)
_ (start-runtime-events-stream-background! self (:id session-with-runtime) runtime)
_ (p/all
(map (fn [order]
(runtime-provider/<send-message! provider
runtime
{:message (:message order)
:kind (:kind order)}))
(send-runtime-message! self
next-session
runtime
nil
(:message order)
(:kind order)))
orders))]
(count orders))))))))
@@ -1416,8 +1467,12 @@
[v]
(js/Promise.resolve v))
(declare parse-int)
(defn- handle-stream [^js self request]
(let [streams (.-streams self)
(let [url (platform/request-url request)
since-ts (parse-int (.get (.-searchParams url) "since"))
streams (.-streams self)
stream (js/TransformStream.)
writer (.getWriter (.-writable stream))
stream-id (str (random-uuid))
@@ -1433,7 +1488,8 @@
;; IMPORTANT: don't block returning the Response; write the initial backlog async
(js/queueMicrotask
(fn []
(p/let [events (<get-events self)]
(p/let [events (<get-events self)
events (session/filter-events events {:since-ts since-ts})]
(doseq [event events]
;; writer.write returns a promise; wait so order is preserved
(p/let [_ (->promise (.write writer (sse-bytes event)))]

View File

@@ -793,6 +793,38 @@
(e2b-sandbox-host sandbox port))
(p/resolved (sandbox/normalize-base-url cached)))))
(defn- <e2b-ensure-bound-runtime-running!
[^js env runtime]
(let [sandbox-id (:sandbox-id runtime)
server-id (or (:server-id runtime) (:session-id runtime))
port (e2b-agent-port env runtime)
agent-token (e2b-agent-token env runtime)
backup-dir (or (:backup-dir runtime)
(e2b-runtime-repo-dir runtime nil))]
(when-not (string? sandbox-id)
(throw (ex-info "missing sandbox-id on runtime"
{:reason :missing-sandbox-id
:runtime runtime})))
(p/let [sandbox (<e2b-connect-sandbox! env sandbox-id)
healthy? (<e2b-health-once! sandbox port agent-token)]
(if healthy?
{:sandbox sandbox
:base-url (e2b-sandbox-host sandbox port)}
(let [repo-cd (when (string? backup-dir)
(str "cd '" (escape-shell-single backup-dir) "'; "))
bootstrap-cmd (str repo-cd
"nohup sandbox-agent server "
(if (string? agent-token)
(str "--token '" (escape-shell-single agent-token) "'")
"--no-token")
" --host 0.0.0.0 --port " port
" --no-telemetry >/tmp/sandbox-agent.log 2>&1 &")]
(p/let [_ (<e2b-run-shell! sandbox bootstrap-cmd)
_ (<e2b-health! env sandbox port agent-token)]
{:sandbox sandbox
:base-url (e2b-sandbox-host sandbox port)
:server-id server-id}))))))
(defn- <e2b-open-terminal!
[^js env runtime request {:keys [cols rows]}]
(let [sandbox-id (:sandbox-id runtime)
@@ -930,6 +962,8 @@
(<push-branch! [this runtime opts])
(<terminate-runtime! [this runtime]))
(declare provider-id resolve-provider)
(defrecord LocalRunnerProvider [env]
RuntimeProvider
@@ -1067,14 +1101,14 @@
(<open-events-stream! [_ runtime]
(let [agent-token (e2b-agent-token env runtime)]
(p/let [base-url (<e2b-runtime-base-url! env runtime)]
(p/let [{:keys [base-url]} (<e2b-ensure-bound-runtime-running! env runtime)]
(sandbox/<open-events-stream base-url
agent-token
(or (:server-id runtime) (:session-id runtime))))))
(<send-message! [_ runtime message]
(let [agent-token (e2b-agent-token env runtime)]
(p/let [base-url (<e2b-runtime-base-url! env runtime)]
(p/let [{:keys [base-url]} (<e2b-ensure-bound-runtime-running! env runtime)]
(sandbox/<send-message base-url
agent-token
(or (:server-id runtime) (:session-id runtime))
@@ -1082,7 +1116,8 @@
message))))
(<open-terminal! [_ runtime request opts]
(<e2b-open-terminal! env runtime request opts))
(p/let [_ (<e2b-ensure-bound-runtime-running! env runtime)]
(<e2b-open-terminal! env runtime request opts)))
(<snapshot-runtime! [_ runtime opts]
(let [session-id (:session-id runtime)
@@ -1159,6 +1194,47 @@
(<e2b-pause-sandbox! env sandbox-id)
(fn [_] nil))))))
(defn <refresh-runtime-session!
[^js env runtime task]
(let [provider (resolve-provider env runtime)
provider-id' (provider-id provider)
payload (session-payload task)]
(case provider-id'
"e2b"
(let [agent-token (e2b-agent-token env runtime)
server-id (or (:server-id runtime) (:session-id runtime))
cwd (or (:backup-dir runtime)
(e2b-runtime-repo-dir runtime task))]
(p/let [{:keys [base-url]} (<e2b-ensure-bound-runtime-running! env runtime)
response (sandbox/<create-session base-url
agent-token
server-id
payload
{:cwd cwd})]
(assoc runtime
:base-url base-url
:server-id (:server-id response)
:session-id (:session-id response))))
"local-runner"
(let [base-url (local-runner-base-url task runtime)
agent-token (local-runner-token task runtime)
headers (local-runner-headers task runtime)
server-id (or (:server-id runtime) (:session-id runtime))]
(p/let [response (sandbox/<create-session base-url
agent-token
server-id
payload
{:headers headers
:cwd (get-repo-dir server-id task "local-runner")})]
(assoc runtime
:server-id (:server-id response)
:session-id (:session-id response))))
(p/rejected
(ex-info "runtime provider does not support session refresh"
{:provider provider-id'})))))
(defn provider-id [provider]
(cond
(instance? E2BProvider provider) "e2b"

View File

@@ -128,7 +128,9 @@
(defn acp-envelope->event [payload]
(let [method (:method payload)
params (:params payload)]
params (:params payload)
runtime-event-type (:type payload)
runtime-payload (:payload payload)]
(cond
(= "session/update" method)
{:type "agent.runtime"
@@ -136,6 +138,13 @@
:session-id (:sessionId params)
:update (:update params)}}
(and (= "event_msg" runtime-event-type)
(= "task_complete" (:type runtime-payload)))
{:type "session.completed"
:data {:turn-id (:turn_id runtime-payload)
:last-agent-message (:last_agent_message runtime-payload)
:source "task_complete"}}
:else nil)))
(defn <create-session

View File

@@ -3,6 +3,7 @@
(:require [clojure.string :as string]
[electron.ipc :as electron-ipc]
[frontend.db :as db]
[frontend.handler.agent-cancel :as agent-cancel]
[frontend.handler.db-based.sync :as db-sync]
[frontend.handler.editor :as editor-handler]
[frontend.handler.notification :as notification]
@@ -384,9 +385,6 @@
(contains? #{"e2b"}
(normalize-runtime-provider provider)))
(defn- runtime-provider-snapshot-enabled? [provider]
(= "e2b" (normalize-runtime-provider provider)))
(defn- event-runtime-provider [event]
(when (= "session.provisioned" (:type event))
(some-> (get-in event [:data :provider]) normalize-runtime-provider)))
@@ -412,25 +410,20 @@
first))]
(runtime-provider-terminal-enabled? provider)))
(defn session-snapshot-enabled?
[session]
(let [provider (or (normalize-runtime-provider (:runtime-provider session))
(some->> (:events session)
reverse
(keep event-runtime-provider)
first))]
(runtime-provider-snapshot-enabled? provider)))
(defn- status->label [status-ident]
(some-> (db/entity status-ident) :block/title))
(defn- maybe-update-task-status!
[block-uuid status]
(prn :debug :status status
:ident (get session-status->task-status status))
(when-let [status-ident (get session-status->task-status status)]
(when-let [block (db/entity [:block/uuid block-uuid])]
(let [current (pu/get-block-property-value block :logseq.property/status)
desired (status->label status-ident)]
(when (and desired (not= current desired))
(when (= status-ident :logseq.property/status.canceled)
(agent-cancel/suppress-next-cancel! block-uuid))
(property-handler/set-block-property! block-uuid :logseq.property/status status-ident))))))
(defn- maybe-update-task-pr-url!
@@ -524,6 +517,15 @@
[block-uuid events]
(update-session! block-uuid #(merge-events % events)))
(declare session-state)
(defn- seen-event?
[block-uuid event]
(let [event-id (:event-id event)
event-ids (get-in (session-state block-uuid) [:event-ids])]
(and (string? event-id)
(contains? (or event-ids #{}) event-id))))
(defn- parse-sse-frame [frame]
(let [lines (string/split frame #"\n")
data-lines (keep (fn [line]
@@ -631,19 +633,20 @@
(defn- handle-stream-event!
[block-uuid event]
(append-events! block-uuid [event])
(when-let [provider (event-runtime-provider event)]
(update-session-state! block-uuid {:runtime-provider provider
:terminal-enabled (runtime-provider-terminal-enabled? provider)}))
(let [runtime-provider (or (event-runtime-provider event)
(:runtime-provider (session-state block-uuid)))]
(when-let [checkpoint (event->sandbox-checkpoint event runtime-provider)]
(maybe-store-task-sandbox-checkpoint! block-uuid checkpoint)))
(when-let [status (event->status event)]
(update-session-state! block-uuid {:status status})
(maybe-update-task-status! block-uuid status)
(when (terminal-status? status)
(stop-session-stream! block-uuid))))
(when-not (seen-event? block-uuid event)
(append-events! block-uuid [event])
(when-let [provider (event-runtime-provider event)]
(update-session-state! block-uuid {:runtime-provider provider
:terminal-enabled (runtime-provider-terminal-enabled? provider)}))
(let [runtime-provider (or (event-runtime-provider event)
(:runtime-provider (session-state block-uuid)))]
(when-let [checkpoint (event->sandbox-checkpoint event runtime-provider)]
(maybe-store-task-sandbox-checkpoint! block-uuid checkpoint)))
(when-let [status (event->status event)]
(update-session-state! block-uuid {:status status})
(maybe-update-task-status! block-uuid status)
(when (terminal-status? status)
(stop-session-stream! block-uuid)))))
(defn- <consume-sse-stream!
[block-uuid resp]
@@ -673,7 +676,13 @@
(stream-controller-active? (:stream-controller session)))
(p/resolved nil)
(when (string? stream-url)
(let [controller (js/AbortController.)
(let [stream-url (if-let [since-ts (when (number? (:last-event-ts session))
(:last-event-ts session))]
(let [url (js/URL. stream-url)]
(.set (.-searchParams url) "since" (str since-ts))
(.toString url))
stream-url)
controller (js/AbortController.)
headers (auth-headers)
opts (cond-> {:method "GET"
:signal (.-signal controller)}
@@ -726,8 +735,10 @@
(do
(maybe-store-task-session-id! block-uuid (:session-id session))
(when-not (:streaming? session)
(<connect-session-stream! block-uuid (or (:stream-url session)
(session-stream-url base session-id))))
(-> (<fetch-events! block)
(p/then (fn [_]
(<connect-session-stream! block-uuid (or (:stream-url (session-state block-uuid))
(session-stream-url base session-id)))))))
(p/resolved session))
:else
@@ -746,7 +757,9 @@
:loading? false})
(maybe-store-task-session-id! block-uuid session-id')
(maybe-update-task-status! block-uuid (:status resp))
(<connect-session-stream! block-uuid stream-url)
(<fetch-events! block)
(<connect-session-stream! block-uuid (or (:stream-url (session-state block-uuid))
stream-url))
resp)
(p/catch (fn [error]
(update-session-state! block-uuid {:loading? false})
@@ -755,52 +768,6 @@
(log/error :agent/ensure-session-failed error)))
nil))))))))
(defn <cancel-session!
[block]
(let [base (db-sync/http-base)
block-uuid (:block/uuid block)
session (session-state block-uuid)
session-id (or (:session-id session)
(task-session-id block)
(some-> block-uuid str))]
(cond
(not (string? base))
(p/resolved nil)
(not (string? session-id))
(p/resolved nil)
:else
(p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)]
(-> (db-sync/fetch-json (str base "/sessions/" session-id "/cancel")
{:method "POST"
:headers {"content-type" "application/json"}}
{:response-schema :sessions/cancel})
(p/then (fn [resp]
(update-session-state! block-uuid {:status "canceled"})
(stop-session-stream! block-uuid)
resp))
(p/catch (fn [error]
(when-not (= 404 (:status (ex-data error)))
(log/error :agent/cancel-session-failed error))
nil)))))))
(defn- task-status-canceled?
[block]
(let [status (pu/get-block-property-value block :logseq.property/status)]
(or (= :logseq.property/status.canceled status)
(= :logseq.property/status.canceled (:db/ident status)))))
(defn <cancel-session-if-task-canceled!
[block]
(let [block-uuid (:block/uuid block)
session (session-state block-uuid)
session-id (or (:session-id session)
(task-session-id block))]
(when (and (task-status-canceled? block)
(string? session-id))
(<cancel-session! block))))
(defn <start-session!
([block]
(<start-session! block nil))

View File

@@ -11,6 +11,21 @@
(some-> block-uuid str))
(def ^:private canceled-status-ident :logseq.property/status.canceled)
(defonce ^:private suppressed-block-uuids* (atom #{}))
(defn suppress-next-cancel!
[block-uuid]
(when block-uuid
(swap! suppressed-block-uuids* conj (str block-uuid))))
(defn- consume-suppressed-cancel?
[block-uuid]
(let [k (some-> block-uuid str)]
(when (string? k)
(let [suppressed? (contains? @suppressed-block-uuids* k)]
(when suppressed?
(swap! suppressed-block-uuids* disj k))
suppressed?))))
(defn- stop-session-stream!
[block-uuid]
@@ -58,13 +73,6 @@
(= :logseq.property/status (:a datom))
(canceled-status-value? (:v datom) canceled-status-id)))
(defn maybe-cancel-session-on-status-change!
[block property-id property-value]
(let [canceled-status-id (:db/id (db/entity canceled-status-ident))]
(when (and (= :logseq.property/status property-id)
(canceled-status-value? property-value canceled-status-id))
(<cancel-session-by-block-uuid! (:block/uuid block)))))
(defn maybe-cancel-sessions-on-db-change!
[tx-data]
(if (seq tx-data)
@@ -73,6 +81,7 @@
(filter #(status-canceled-datom? % canceled-status-id))
(keep (fn [datom]
(:block/uuid (db/entity (:e datom)))))
(remove consume-suppressed-cancel?)
distinct
(map <cancel-session-by-block-uuid!)
(remove nil?)