diff --git a/deps/workers/src/logseq/agents/runtime_provider.cljs b/deps/workers/src/logseq/agents/runtime_provider.cljs index 9c27c34ee5..40d2e41f0d 100644 --- a/deps/workers/src/logseq/agents/runtime_provider.cljs +++ b/deps/workers/src/logseq/agents/runtime_provider.cljs @@ -123,6 +123,7 @@ (str "http://127.0.0.1:" port path)) (def ^:private default-repo-base-dir "/workspace") +(def ^:private e2b-repo-base-dir "/home/user/workspace") (def ^:private vercel-repo-base-dir "/vercel/sandbox") (def ^:private cloudflare-local-host "http://localhost") (def ^:private cloudflare-snapshot-ttl-seconds (* 30 24 60 60)) @@ -372,19 +373,22 @@ (defn- repo-base-dir [provider] - (if (= "vercel" (normalize-provider provider)) - vercel-repo-base-dir + (case (normalize-provider provider) + "e2b" e2b-repo-base-dir + "vercel" vercel-repo-base-dir default-repo-base-dir)) (defn- get-repo-dir ([session-id] (get-repo-dir session-id nil nil)) ([session-id task provider] - (if (= "vercel" (normalize-provider provider)) - (str vercel-repo-base-dir "/" (task-repo-name task)) - (let [session-id (some-> session-id str)] - (when (string? session-id) - (str default-repo-base-dir "/" (sanitize-name session-id))))))) + (case (normalize-provider provider) + "e2b" (str e2b-repo-base-dir "/" (task-repo-name task)) + "vercel" (str vercel-repo-base-dir "/" (task-repo-name task)) + (let [session-id (some-> session-id str) + base-dir (repo-base-dir provider)] + (when (and (string? session-id) (string? base-dir)) + (str base-dir "/" (sanitize-name session-id))))))) (defn- repo-cd-command ([session-id] @@ -965,7 +969,7 @@ (defn- e2b-sandbox-timeout-ms [^js env] - (parse-int (env-str env "E2B_SANDBOX_TIMEOUT_MS") (* 30 60 1000))) + (parse-int (env-str env "E2B_SANDBOX_TIMEOUT_MS") (* 5 60 1000))) (defn- e2b-api-opts [^js env] @@ -990,13 +994,30 @@ (or (aget sandbox "sandboxId") (aget sandbox "id"))) +(defn- e2b-command-error-data + [command error] + (let [stdout (or (some-> error (aget "stdout")) "") + stderr (or (some-> error (aget "stderr")) "") + exit-code (or (some-> error (aget "exitCode")) + (some-> error (aget "exit_code"))) + name (some-> error (aget "name")) + message (or (some-> error (aget "message")) + (str error))] + (cond-> {:reason :e2b-command-failed + :command command + :message message + :stdout stdout + :stderr stderr} + (string? name) (assoc :error-name name) + (number? exit-code) (assoc :exit-code exit-code)))) + (defn- e2b-sandbox-host [sandbox port] (let [get-host (js-method sandbox "getHost")] (when-not (fn? get-host) (throw (ex-info "e2b sandbox missing getHost method" {:reason :missing-e2b-get-host}))) - (.call get-host sandbox port))) + (sandbox/normalize-base-url (.call get-host sandbox port)))) (defn promise (.call kill sandbox-class sandbox-id opts)) (p/resolved nil)))) +(defn js (e2b-api-opts env))] + (if (and (string? sandbox-id) (fn? pause)) + (->promise (.call pause sandbox-class sandbox-id opts)) + (p/resolved nil)))) + (defn promise (.call run-command commands command (clj->js params))) - (p/let [result (->promise (.call run-command commands command (clj->js params))) + (p/let [result (-> (.call run-command commands command (clj->js params)) + ->promise + (p/catch (fn [error] + (throw (ex-info "e2b sandbox command failed" + (e2b-command-error-data command error) + error))))) stdout (or (aget result "stdout") "") stderr (or (aget result "stderr") "") exit-code (or (aget result "exitCode") @@ -1219,7 +1254,7 @@ port (e2b-agent-port env runtime) sandbox-id (:sandbox-id runtime)] (if (string? cached) - (p/resolved cached) + (p/resolved (sandbox/normalize-base-url cached)) (p/let [sandbox ( request .-headers (.get "Upgrade") str string/lower-case)) + (throw (ex-info "e2b terminal requires websocket upgrade" + {:reason :unsupported-terminal + :session-id session-id}))) + (p/let [sandbox ( sandbox (aget "pty")) + create-pty (js-method pty "create")] + (when-not (fn? create-pty) + (throw (ex-info "e2b sandbox missing pty.create" + {:reason :unsupported-terminal + :sandbox-id sandbox-id + :session-id session-id}))) + (let [pair (js/WebSocketPair.) + client (aget pair 0) + server (aget pair 1)] + (.accept server) + (set! (.-binaryType server) "arraybuffer") + (-> (->promise + (.call create-pty pty + (clj->js (cond-> {:cols (or cols 120) + :rows (or rows 40) + :cwd cwd + :onData (fn [payload] + (.send server payload))} + (number? cols) (assoc :cols cols) + (number? rows) (assoc :rows rows))))) + (p/then + (fn [handle] + (let [pid (aget handle "pid") + send-input (js-method pty "sendInput") + resize (js-method pty "resize") + kill-handle (js-method handle "kill") + close-handler (fn [] + (when (fn? kill-handle) + (-> (->promise (.call kill-handle handle)) + (p/catch (fn [_] nil)))))] + (.addEventListener + server + "message" + (fn [event] + (let [payload (.-data event)] + (cond + (string? payload) + (let [parsed (parse-json-safe payload)] + (if (= "resize" (:type parsed)) + (when (and (number? pid) (fn? resize)) + (-> (->promise (.call resize pty pid + (clj->js {:cols (:cols parsed) + :rows (:rows parsed)}))) + (p/catch (fn [_] nil)))) + (when (and (number? pid) (fn? send-input)) + (-> (->promise (.call send-input pty pid payload)) + (p/catch (fn [_] nil)))))) + + (instance? js/ArrayBuffer payload) + (when (and (number? pid) (fn? send-input)) + (-> (->promise (.call send-input pty pid (js/Uint8Array. payload))) + (p/catch (fn [_] nil)))) + + (instance? js/Uint8Array payload) + (when (and (number? pid) (fn? send-input)) + (-> (->promise (.call send-input pty pid payload)) + (p/catch (fn [_] nil)))) + + :else nil)))) + (.addEventListener server "close" close-handler) + (.addEventListener server "error" (fn [_] (close-handler))) + (.send server (js/JSON.stringify #js {:type "ready"})) + (js/Response. nil #js {:status 101 + :webSocket client})))) + (p/catch + (fn [error] + (try + (.close server 1011 "terminal init failed") + (catch :default _ nil)) + (throw error))))))))) (defn- vercel-agent-token [^js env runtime] (or (:agent-token runtime) @@ -2546,14 +2646,16 @@ (fn [error] (log/error :agent/e2b-repo-clone-failed {:session-id session-id - :error (str error)}) + :error (str error) + :error-data (ex-data error)}) nil))) _ (p/catch ( base str string/trim not-empty) + base (some-> base (string/replace #"/+$" ""))] + (cond + (not (string? base)) "" + (re-find absolute-url-re base) base + (re-find local-host-re base) (str "http://" base) + :else (str "https://" base)))) (defn sessions-base-url [base] (str (normalize-base-url base) "/v1/sessions")) diff --git a/deps/workers/test/logseq/agents/e2b_runtime_provider_test.cljs b/deps/workers/test/logseq/agents/e2b_runtime_provider_test.cljs index 13cb8136a8..144456174c 100644 --- a/deps/workers/test/logseq/agents/e2b_runtime_provider_test.cljs +++ b/deps/workers/test/logseq/agents/e2b_runtime_provider_test.cljs @@ -31,7 +31,8 @@ env #js {"E2B_API_KEY" "e2b-key" "SANDBOX_AGENT_TOKEN" "agent-token"} provider (runtime-provider/create-provider env "e2b") - task {:agent {:provider "codex"}} + task {:agent {:provider "codex"} + :project {:repo-url "https://github.com/logseq/agent-test"}} sandbox-class (runtime-provider/e2b-sandbox-class) original-create (aget sandbox-class "create") original-fetch js/fetch @@ -77,6 +78,12 @@ (is (= "https://e2b-agent.local" (:base-url runtime))) (is (= "sess-e2b-1" (:session-id runtime))) (is (= 2468 (:sandbox-port runtime))) + (is (some #(and (= :command (:type %)) + (string/includes? (:cmd %) "mkdir -p '/home/user/workspace'")) + @calls)) + (is (some #(and (= :command (:type %)) + (string/includes? (:cmd %) "'/home/user/workspace/agent-test'")) + @calls)) (is (some #(= {:type :host :port 2468} %) @calls)) (done))) (.catch (fn [error] @@ -84,6 +91,95 @@ (is false (str "unexpected error: " error)) (done))))))) +(deftest e2b-provider-provision-normalizes-bare-host-test + (async done + (let [env #js {"E2B_API_KEY" "e2b-key" + "SANDBOX_AGENT_TOKEN" "agent-token"} + provider (runtime-provider/create-provider env "e2b") + task {:agent {:provider "codex"} + :project {:repo-url "https://github.com/logseq/agent-test"}} + sandbox-class (runtime-provider/e2b-sandbox-class) + original-create (aget sandbox-class "create") + original-fetch js/fetch + restore! (fn [] + (aset sandbox-class "create" original-create) + (set! js/fetch original-fetch))] + (aset sandbox-class "create" + (fn [& _args] + (js/Promise.resolve + #js {:sandboxId "e2b-sbx-2" + :getHost (fn [_port] + "2468-if9ct9du77wx2o6oorw10.e2b.app") + :commands + #js {:run (fn [cmd _opts] + (if (string/includes? cmd "/v1/health") + (js/Promise.resolve #js {:stdout "__HEALTH_OK__" + :stderr "" + :exitCode 0}) + (js/Promise.resolve #js {:stdout "" + :stderr "" + :exitCode 0})))}}))) + (set! js/fetch + (fn [request] + (is (= "https://2468-if9ct9du77wx2o6oorw10.e2b.app/v1/sessions/sess-e2b-2" + (.-url request))) + (js/Promise.resolve + (js/Response. + (js/JSON.stringify #js {:ok true}) + #js {:status 200 + :headers #js {"content-type" "application/json"}})))) + (-> (runtime-provider/ (runtime-provider/js [client-socket server-socket]))) + (aset sandbox-class "connect" + (fn [_sandbox-id _opts] + (js/Promise.resolve + #js {:sandboxId "e2b-sbx-1" + :pty + #js {:create (fn [opts] + (swap! calls assoc :pty-opts (js->clj opts :keywordize-keys true)) + (js/Promise.resolve + #js {:pid 42 + :kill (fn [] + (swap! calls update :kills (fnil inc 0)) + (js/Promise.resolve true))}))}}))) (-> (runtime-provider/ (runtime-provider/> session-messages (map message->chat-message) (remove nil?)) @@ -1020,7 +1021,7 @@ :class "h-7 px-2 text-xs" :on-click (fn [_] (close-terminal!))} "Disconnect"))]) - (when-not pr-created? + (when (and (not pr-created?) snapshot-enabled?) (shui/button {:size :sm :variant :outline diff --git a/src/main/frontend/handler/agent.cljs b/src/main/frontend/handler/agent.cljs index b547cb6693..a63c7f65db 100644 --- a/src/main/frontend/handler/agent.cljs +++ b/src/main/frontend/handler/agent.cljs @@ -354,7 +354,11 @@ (some-> provider str string/trim string/lower-case not-empty)) (defn- runtime-provider-terminal-enabled? [provider] - (= "cloudflare" (normalize-runtime-provider provider))) + (contains? #{"cloudflare" "e2b"} + (normalize-runtime-provider provider))) + +(defn- runtime-provider-snapshot-enabled? [provider] + (not= "e2b" (normalize-runtime-provider provider))) (defn- event-runtime-provider [event] (when (= "session.provisioned" (:type event)) @@ -381,6 +385,15 @@ first))] (runtime-provider-terminal-enabled? provider))) +(defn session-snapshot-enabled? + [session] + (let [provider (or (normalize-runtime-provider (:runtime-provider session)) + (some->> (:events session) + reverse + (keep event-runtime-provider) + first))] + (runtime-provider-snapshot-enabled? provider))) + (defn- status->label [status-ident] (some-> (db/entity status-ident) :block/title))