From 6736ec660e9892fb6d50d29ae95e4c4c4385ee60 Mon Sep 17 00:00:00 2001 From: Tienson Qin Date: Mon, 9 Mar 2026 22:34:21 +0800 Subject: [PATCH] fix: sandbox resume --- deps/workers/src/logseq/agents/do.cljs | 146 ++++++++++++------ .../src/logseq/agents/runtime_provider.cljs | 82 +++++++++- deps/workers/src/logseq/agents/sandbox.cljs | 11 +- src/main/frontend/handler/agent.cljs | 117 +++++--------- src/main/frontend/handler/agent_cancel.cljs | 23 ++- 5 files changed, 248 insertions(+), 131 deletions(-) diff --git a/deps/workers/src/logseq/agents/do.cljs b/deps/workers/src/logseq/agents/do.cljs index 6a50db82d2..6ec71d8f7d 100644 --- a/deps/workers/src/logseq/agents/do.cljs +++ b/deps/workers/src/logseq/agents/do.cljs @@ -273,6 +273,48 @@ (defn- session-runtime-provider [session] (some-> (get-in session [:runtime :provider]) str string/lower-case)) +(defn- sandbox-bound-runtime? + [runtime] + (and (map? runtime) + (= "e2b" (some-> (:provider runtime) str string/lower-case)) + (string? (:sandbox-id runtime)))) + +(defn- missing-bound-sandbox-error? + [error runtime] + (let [reason (some-> error ex-data :reason) + sandbox-id (some-> runtime :sandbox-id str string/trim not-empty) + message (-> (str (or (some-> error ex-message) error)) + string/lower-case)] + (or (= :missing-sandbox-id reason) + (= "missing-sandbox-id" reason) + (and (string? sandbox-id) + (or (and (string/includes? message "sandbox") + (or (string/includes? message "not found") + (string/includes? message "notfound") + (string/includes? message "no such"))) + (and (string/includes? message sandbox-id) + (or (string/includes? message "not found") + (string/includes? message "notfound") + (string/includes? message "no such")))))))) + +(defn- latest-session :runtime :session-id) @@ -667,15 +702,18 @@ (= (:session-id runtime) latest-runtime-session-id)) session-terminal? (terminal-status? (:status latest-session))] (if (and same-runtime? (not session-terminal?)) - (do - (log/error :agent/runtime-events-stream-error - {:session-id session-id - :runtime-session-id (:session-id runtime) - :error error}) - ( (:session-id latest-runtime) str) same-runtime? (and (string? failed-runtime-id) (= failed-runtime-id latest-runtime-id))] - (p/let [_ (when same-runtime? - ( (send-once! current-session) (p/catch retry-send!) (p/catch (fn [error] @@ -1360,10 +1413,7 @@ {:by user-id :reason "cancel"})) _ (promise (.write writer (sse-bytes event)))] diff --git a/deps/workers/src/logseq/agents/runtime_provider.cljs b/deps/workers/src/logseq/agents/runtime_provider.cljs index 3d9450873d..1600ec85b5 100644 --- a/deps/workers/src/logseq/agents/runtime_provider.cljs +++ b/deps/workers/src/logseq/agents/runtime_provider.cljs @@ -793,6 +793,38 @@ (e2b-sandbox-host sandbox port)) (p/resolved (sandbox/normalize-base-url cached))))) +(defn- /tmp/sandbox-agent.log 2>&1 &")] + (p/let [_ (event [payload] (let [method (:method payload) - params (:params payload)] + params (:params payload) + runtime-event-type (:type payload) + runtime-payload (:payload payload)] (cond (= "session/update" method) {:type "agent.runtime" @@ -136,6 +138,13 @@ :session-id (:sessionId params) :update (:update params)}} + (and (= "event_msg" runtime-event-type) + (= "task_complete" (:type runtime-payload))) + {:type "session.completed" + :data {:turn-id (:turn_id runtime-payload) + :last-agent-message (:last_agent_message runtime-payload) + :source "task_complete"}} + :else nil))) (defn (get-in event [:data :provider]) normalize-runtime-provider))) @@ -412,25 +410,20 @@ first))] (runtime-provider-terminal-enabled? provider))) -(defn session-snapshot-enabled? - [session] - (let [provider (or (normalize-runtime-provider (:runtime-provider session)) - (some->> (:events session) - reverse - (keep event-runtime-provider) - first))] - (runtime-provider-snapshot-enabled? provider))) - (defn- status->label [status-ident] (some-> (db/entity status-ident) :block/title)) (defn- maybe-update-task-status! [block-uuid status] + (prn :debug :status status + :ident (get session-status->task-status status)) (when-let [status-ident (get session-status->task-status status)] (when-let [block (db/entity [:block/uuid block-uuid])] (let [current (pu/get-block-property-value block :logseq.property/status) desired (status->label status-ident)] (when (and desired (not= current desired)) + (when (= status-ident :logseq.property/status.canceled) + (agent-cancel/suppress-next-cancel! block-uuid)) (property-handler/set-block-property! block-uuid :logseq.property/status status-ident)))))) (defn- maybe-update-task-pr-url! @@ -524,6 +517,15 @@ [block-uuid events] (update-session! block-uuid #(merge-events % events))) +(declare session-state) + +(defn- seen-event? + [block-uuid event] + (let [event-id (:event-id event) + event-ids (get-in (session-state block-uuid) [:event-ids])] + (and (string? event-id) + (contains? (or event-ids #{}) event-id)))) + (defn- parse-sse-frame [frame] (let [lines (string/split frame #"\n") data-lines (keep (fn [line] @@ -631,19 +633,20 @@ (defn- handle-stream-event! [block-uuid event] - (append-events! block-uuid [event]) - (when-let [provider (event-runtime-provider event)] - (update-session-state! block-uuid {:runtime-provider provider - :terminal-enabled (runtime-provider-terminal-enabled? provider)})) - (let [runtime-provider (or (event-runtime-provider event) - (:runtime-provider (session-state block-uuid)))] - (when-let [checkpoint (event->sandbox-checkpoint event runtime-provider)] - (maybe-store-task-sandbox-checkpoint! block-uuid checkpoint))) - (when-let [status (event->status event)] - (update-session-state! block-uuid {:status status}) - (maybe-update-task-status! block-uuid status) - (when (terminal-status? status) - (stop-session-stream! block-uuid)))) + (when-not (seen-event? block-uuid event) + (append-events! block-uuid [event]) + (when-let [provider (event-runtime-provider event)] + (update-session-state! block-uuid {:runtime-provider provider + :terminal-enabled (runtime-provider-terminal-enabled? provider)})) + (let [runtime-provider (or (event-runtime-provider event) + (:runtime-provider (session-state block-uuid)))] + (when-let [checkpoint (event->sandbox-checkpoint event runtime-provider)] + (maybe-store-task-sandbox-checkpoint! block-uuid checkpoint))) + (when-let [status (event->status event)] + (update-session-state! block-uuid {:status status}) + (maybe-update-task-status! block-uuid status) + (when (terminal-status? status) + (stop-session-stream! block-uuid))))) (defn- {:method "GET" :signal (.-signal controller)} @@ -726,8 +735,10 @@ (do (maybe-store-task-session-id! block-uuid (:session-id session)) (when-not (:streaming? session) - ( ( block-uuid str))] - (cond - (not (string? base)) - (p/resolved nil) - - (not (string? session-id)) - (p/resolved nil) - - :else - (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)] - (-> (db-sync/fetch-json (str base "/sessions/" session-id "/cancel") - {:method "POST" - :headers {"content-type" "application/json"}} - {:response-schema :sessions/cancel}) - (p/then (fn [resp] - (update-session-state! block-uuid {:status "canceled"}) - (stop-session-stream! block-uuid) - resp)) - (p/catch (fn [error] - (when-not (= 404 (:status (ex-data error))) - (log/error :agent/cancel-session-failed error)) - nil))))))) - -(defn- task-status-canceled? - [block] - (let [status (pu/get-block-property-value block :logseq.property/status)] - (or (= :logseq.property/status.canceled status) - (= :logseq.property/status.canceled (:db/ident status))))) - -(defn block-uuid str)) (def ^:private canceled-status-ident :logseq.property/status.canceled) +(defonce ^:private suppressed-block-uuids* (atom #{})) + +(defn suppress-next-cancel! + [block-uuid] + (when block-uuid + (swap! suppressed-block-uuids* conj (str block-uuid)))) + +(defn- consume-suppressed-cancel? + [block-uuid] + (let [k (some-> block-uuid str)] + (when (string? k) + (let [suppressed? (contains? @suppressed-block-uuids* k)] + (when suppressed? + (swap! suppressed-block-uuids* disj k)) + suppressed?)))) (defn- stop-session-stream! [block-uuid] @@ -58,13 +73,6 @@ (= :logseq.property/status (:a datom)) (canceled-status-value? (:v datom) canceled-status-id))) -(defn maybe-cancel-session-on-status-change! - [block property-id property-value] - (let [canceled-status-id (:db/id (db/entity canceled-status-ident))] - (when (and (= :logseq.property/status property-id) - (canceled-status-value? property-value canceled-status-id)) - (