Cloudflare sandbox runtime

This commit is contained in:
Tienson Qin
2026-02-07 13:09:45 +08:00
parent 63ea3c0d31
commit e455f4fd39
14 changed files with 1112 additions and 71 deletions

View File

@@ -107,6 +107,13 @@ Agent runtime is selected by `AGENT_RUNTIME_PROVIDER`:
For `cloudflare`, bind and export `Sandbox` in the Worker and configure the container image in
`worker/wrangler.toml` (`[[containers]] class_name = "Sandbox"`).
Cloudflare runtime flow:
- resolve sandbox by deterministic name (session id + prefix)
- health probe `sandbox-agent` inside container
- set runtime env vars and start `sandbox-agent` when needed
- proxy create/message stream via `containerFetch`
- terminate session and cleanup sandbox on terminal states
## Environment Variables
| Variable | Purpose |
@@ -140,6 +147,16 @@ For `cloudflare`, bind and export `Sandbox` in the Worker and configure the cont
| SPRITES_SANDBOX_AGENT_PORT | sandbox-agent port inside sprite (default `2468`) |
| SPRITES_HEALTH_RETRIES | Sprite health check retry count |
| SPRITES_HEALTH_INTERVAL_MS | Sprite health check retry interval (ms) |
| CLOUDFLARE_SANDBOX_NAME_PREFIX | Prefix used when creating Cloudflare sandbox names |
| CLOUDFLARE_SANDBOX_AGENT_PORT | sandbox-agent port inside Cloudflare sandbox (default `2468`) |
| CLOUDFLARE_BOOTSTRAP_COMMAND | Optional command override to start sandbox-agent in Cloudflare sandbox |
| CLOUDFLARE_REPO_CLONE_COMMAND | Optional repo clone command template for Cloudflare sandbox |
| CLOUDFLARE_HEALTH_RETRIES | Cloudflare sandbox health check retry count |
| CLOUDFLARE_HEALTH_INTERVAL_MS | Cloudflare sandbox health check retry interval (ms) |
| OPENAI_API_KEY | Passed into Cloudflare sandbox runtime env (if set) |
| ANTHROPIC_API_KEY | Passed into Cloudflare sandbox runtime env (if set) |
| OPENAI_BASE_URL | Passed into Cloudflare sandbox runtime env (if set) |
| ANTHROPIC_BASE_URL | Passed into Cloudflare sandbox runtime env (if set) |
## Notes
- Protocol definitions live in `docs/agent-guide/db-sync/protocol.md`.

View File

@@ -1,6 +1,6 @@
# Agent Service Milestones
Date: 2026-02-01
Date: 2026-02-06
Status: Active
Milestones are tracked as separate files in this folder:
@@ -16,3 +16,4 @@ Milestones are tracked as separate files in this folder:
- `10-m10-soft-interrupt-resume.md`
- `11-m11-collab-access.md`
- `12-m12-attachments.md`
- `13-m13-cloudflare-sandbox-runtime.md`

View File

@@ -0,0 +1,92 @@
# M13: Cloudflare Sandbox Runtime Provider
Status: Completed (2026-02-06)
Target: Add a first-class `cloudflare` runtime provider for agent sessions using Cloudflare Sandbox.
## Goal
Support `AGENT_RUNTIME_PROVIDER=cloudflare` end-to-end so db-sync can provision a Cloudflare sandbox, start `sandbox-agent`, and stream runtime events through existing session APIs.
## Why M13
- Runtime provider selection still accepts `cloudflare`, but the runtime implementation is currently Sprites-only.
- The Worker already has `Sandbox` bindings and container image config in `worker/wrangler.toml`.
- We can follow the validated Cloudflare pattern from sandbox-agent:
- `examples/cloudflare/src/cloudflare.ts`
- `docs/deploy/cloudflare.mdx`
## Inputs
- Cloudflare Sandbox reference flow:
- `getSandbox(...)`
- probe `/v1/health`
- `setEnvVars(...)`
- `startProcess("sandbox-agent server ...")`
- proxy via `containerFetch(...)`
## Scope
1) Add `CloudflareProvider` in `src/logseq/db_sync/worker/agent/runtime_provider.cljs`.
2) Implement lifecycle methods:
- `<provision-runtime!`
- `<open-message-stream!`
- `<terminate-runtime!`
3) Restore provider resolution by runtime/env:
- `sprites` (default)
- `local-dev` (existing direct URL mode)
- `cloudflare` (new)
4) Persist Cloudflare runtime metadata in DO session state:
- `:provider`
- `:sandbox-id` or stable sandbox name
- `:sandbox-port`
- runtime `:session-id`
5) Add docs and env references for Cloudflare runtime operation.
## Out of Scope
- Changing default runtime away from `sprites`.
- Reworking UI behavior for sessions.
- Non-Cloudflare sandbox providers.
## Workstreams
### WS1: Cloudflare Sandbox Provisioning
- Provision/lookup sandbox by deterministic name from session id.
- Add health probe and bootstrapping logic for `sandbox-agent` in-container.
- Inject required runtime env vars for agent credentials.
### WS2: Cloudflare Message Streaming
- Create session through sandbox-hosted `sandbox-agent` API.
- Open `/v1/sessions/:id/messages/stream` via sandbox container networking.
- Preserve existing SSE parsing and event append behavior in session DO.
### WS3: Runtime Selection + Compatibility
- Implement `create-provider` / `resolve-provider` dispatch by provider kind.
- Keep Sprites behavior unchanged when provider is unset.
- Keep `local-dev` behavior for explicit local testing.
### WS4: Reliability + Cleanup
- Retry/backoff for sandbox bootstrap and health checks.
- Ensure runtime teardown on `completed|failed|canceled`.
- Guard against orphaned sessions/processes with timeout-safe cleanup.
### WS5: Tests + Docs
- Add unit tests for Cloudflare provider selection and request construction.
- Add smoke-test instructions for local `wrangler dev` Cloudflare runtime flow.
- Update `README.md` runtime-provider section with Cloudflare-specific requirements.
## Exit Criteria
1) `AGENT_RUNTIME_PROVIDER=cloudflare` can run create/message/stream for a real task session.
2) Session events stream continuously until terminal state.
3) Runtime cleanup executes on terminal states and cancel actions.
4) `sprites` default path remains working when provider is unset.
5) Tests cover provider resolution and Cloudflare runtime behavior.
## Implemented
- Added provider dispatch for `sprites`, `local-dev`, and `cloudflare`.
- Implemented `LocalDevProvider` using direct sandbox-agent HTTP/SSE endpoints.
- Implemented `CloudflareProvider` lifecycle:
- deterministic sandbox naming via `CLOUDFLARE_SANDBOX_NAME_PREFIX`
- health probe and bootstrap (`setEnvVars`, `startProcess` fallback to `exec`)
- create session and message streaming via `containerFetch`
- terminate session and best-effort sandbox cleanup
- Added Cloudflare/runtime metadata in session runtime state (`:sandbox-id`, `:sandbox-name`, `:sandbox-port`).
- Added `sandbox/<terminate-session` endpoint helper.
- Updated runtime event provisioning payload to include `:sandbox-name`.
- Added and updated tests in `test/logseq/db_sync/agent_runtime_provider_test.cljs`.
- Updated runtime docs/env reference in `README.md`.

View File

@@ -38,6 +38,16 @@
:sprites-sandbox-agent-port (env-value env "SPRITES_SANDBOX_AGENT_PORT")
:sprites-health-retries (env-value env "SPRITES_HEALTH_RETRIES")
:sprites-health-interval-ms (env-value env "SPRITES_HEALTH_INTERVAL_MS")
:cloudflare-sandbox-name-prefix (env-value env "CLOUDFLARE_SANDBOX_NAME_PREFIX")
:cloudflare-sandbox-agent-port (env-value env "CLOUDFLARE_SANDBOX_AGENT_PORT")
:cloudflare-bootstrap-command (env-value env "CLOUDFLARE_BOOTSTRAP_COMMAND")
:cloudflare-repo-clone-command (env-value env "CLOUDFLARE_REPO_CLONE_COMMAND")
:cloudflare-health-retries (env-value env "CLOUDFLARE_HEALTH_RETRIES")
:cloudflare-health-interval-ms (env-value env "CLOUDFLARE_HEALTH_INTERVAL_MS")
:openai-api-key (env-value env "OPENAI_API_KEY")
:anthropic-api-key (env-value env "ANTHROPIC_API_KEY")
:openai-base-url (env-value env "OPENAI_BASE_URL")
:anthropic-base-url (env-value env "ANTHROPIC_BASE_URL")
:log-level (or (env-value env "DB_SYNC_LOG_LEVEL") "info")
:cognito-issuer (env-value env "COGNITO_ISSUER")
:cognito-client-id (env-value env "COGNITO_CLIENT_ID")

View File

@@ -71,6 +71,16 @@
(aset "SPRITES_SANDBOX_AGENT_PORT" (:sprites-sandbox-agent-port cfg))
(aset "SPRITES_HEALTH_RETRIES" (:sprites-health-retries cfg))
(aset "SPRITES_HEALTH_INTERVAL_MS" (:sprites-health-interval-ms cfg))
(aset "CLOUDFLARE_SANDBOX_NAME_PREFIX" (:cloudflare-sandbox-name-prefix cfg))
(aset "CLOUDFLARE_SANDBOX_AGENT_PORT" (:cloudflare-sandbox-agent-port cfg))
(aset "CLOUDFLARE_BOOTSTRAP_COMMAND" (:cloudflare-bootstrap-command cfg))
(aset "CLOUDFLARE_REPO_CLONE_COMMAND" (:cloudflare-repo-clone-command cfg))
(aset "CLOUDFLARE_HEALTH_RETRIES" (:cloudflare-health-retries cfg))
(aset "CLOUDFLARE_HEALTH_INTERVAL_MS" (:cloudflare-health-interval-ms cfg))
(aset "OPENAI_API_KEY" (:openai-api-key cfg))
(aset "ANTHROPIC_API_KEY" (:anthropic-api-key cfg))
(aset "OPENAI_BASE_URL" (:openai-base-url cfg))
(aset "ANTHROPIC_BASE_URL" (:anthropic-base-url cfg))
(aset "COGNITO_ISSUER" (:cognito-issuer cfg))
(aset "COGNITO_CLIENT_ID" (:cognito-client-id cfg))
(aset "COGNITO_JWKS_URL" (:cognito-jwks-url cfg)))]

View File

@@ -73,6 +73,7 @@
(contains? #{"completed" "failed" "canceled"} status))
(defn- <terminate-runtime! [^js self runtime]
(set! (.-runtime-events-stream self) nil)
(if-not (map? runtime)
(p/resolved nil)
(let [provider (runtime-provider/resolve-provider (.-env self) runtime)]
@@ -102,9 +103,9 @@
(when (terminal-status? (:status current-session))
(<terminate-runtime! self (:runtime current-session))))))))
(defn- <consume-message-stream! [^js self session-id runtime message]
(defn- <consume-events-stream! [^js self session-id runtime]
(let [provider (runtime-provider/resolve-provider (.-env self) runtime)]
(p/let [resp (runtime-provider/<open-message-stream! provider runtime message)
(p/let [resp (runtime-provider/<open-events-stream! provider runtime)
reader (.getReader (.-body resp))]
(let [decoder (js/TextDecoder.)
buffer (atom "")]
@@ -130,6 +131,25 @@
(step!)))))]
(step!))))))
(defn- start-runtime-events-stream! [^js self session-id runtime]
(when (and (map? runtime)
(string? (:session-id runtime))
(nil? (.-runtime-events-stream self)))
(let [stream-task (-> (<consume-events-stream! self session-id runtime)
(.catch (fn [error]
(log/error :agent/runtime-events-stream-error
{:session-id session-id
:runtime-session-id (:session-id runtime)
:error error})
(<append-event! self {:type "agent.runtime.error"
:data {:session-id session-id
:message (str error)}
:ts (common/now-ms)}))))]
(set! (.-runtime-events-stream self)
(.finally stream-task
(fn []
(set! (.-runtime-events-stream self) nil)))))))
(defn- <transition! [^js self to-status event-type data]
(p/let [session (<get-session self)]
(cond
@@ -146,7 +166,8 @@
(http/json-response :sessions/pause {:ok true}))))))
(defn- <provision-runtime! [^js self task session-id]
(let [provider (runtime-provider/resolve-provider (.-env self) nil)]
(let [provider (runtime-provider/resolve-provider (.-env self) nil)
provider-kind (runtime-provider/provider-id provider)]
(p/let [runtime (runtime-provider/<provision-runtime! provider session-id task)
session (<get-session self)
events (<get-events self)]
@@ -154,7 +175,7 @@
(nil? runtime)
(throw (ex-info "runtime provisioning returned nil"
{:session-id session-id
:provider "sprites"}))
:provider provider-kind}))
(nil? session)
nil
@@ -165,10 +186,13 @@
:data {:provider (:provider runtime)
:runtime-session-id (:session-id runtime)
:sandbox-id (:sandbox-id runtime)
:sandbox-name (:sandbox-name runtime)
:sprite-name (:sprite-name runtime)}
:ts (common/now-ms)})]
(p/let [_ (<put-session! self session)
_ (<put-events! self events)]
_ (<put-events! self events)
_ (when-not (terminal-status? (:status session))
(start-runtime-events-stream! self (:id session) runtime))]
runtime))))))
(defn- handle-init [^js self request]
@@ -179,6 +203,9 @@
(p/let [_ (when-not (string? runtime-id)
(<provision-runtime! self (:task existing) session-id))
session (<get-session self)]
(when (and (map? (:runtime session))
(not (terminal-status? (:status session))))
(start-runtime-events-stream! self session-id (:runtime session)))
(http/json-response :sessions/create
{:session-id session-id
:status (:status session)
@@ -249,7 +276,7 @@
:else
(p/let [res (<append-event! self {:type "audit.log"
:data {:message message
:data {:event "user-message"
:kind (:kind body)
:by user-id}})
current-session (<get-session self)]
@@ -268,22 +295,27 @@
(http/json-response :sessions/message {:ok true})))
:else
(let [runtime (:runtime current-session)]
(p/let [_ (when (and runtime (string? (:session-id runtime)))
(-> (<consume-message-stream! self
(:id current-session)
runtime
{:message message
:kind (:kind body)})
(.catch (fn [error]
(log/error :agent/runtime-stream-error
{:session-id (:id current-session)
:runtime-session-id (:session-id runtime)
:error error})
(<append-event! self {:type "agent.runtime.error"
:data {:session-id (:id current-session)
:message (str error)}
:ts (common/now-ms)})))))]
(let [runtime (:runtime current-session)
provider (when runtime
(runtime-provider/resolve-provider (.-env self) runtime))]
(p/let [_ (when (and runtime
provider
(string? (:session-id runtime)))
(do
(start-runtime-events-stream! self (:id current-session) runtime)
(-> (runtime-provider/<send-message! provider
runtime
{:message message
:kind (:kind body)})
(.catch (fn [error]
(log/error :agent/runtime-message-error
{:session-id (:id current-session)
:runtime-session-id (:session-id runtime)
:error error})
(<append-event! self {:type "agent.runtime.error"
:data {:session-id (:id current-session)
:message (str error)}
:ts (common/now-ms)}))))))]
(http/json-response :sessions/message {:ok true})))))))))))
(defn- handle-cancel [^js self request]
@@ -305,16 +337,23 @@
(if (nil? current-session)
nil
(let [[orders next-session] (session/drain-orders current-session)
runtime (:runtime current-session)]
runtime (:runtime current-session)
provider (when runtime
(runtime-provider/resolve-provider (.-env self) runtime))]
(p/let [_ (<save-session! self next-session)
_ (when (and runtime (string? (:session-id runtime)))
_ (when (and runtime
provider
(string? (:session-id runtime)))
(start-runtime-events-stream! self (:id current-session) runtime))
_ (when (and runtime
provider
(string? (:session-id runtime)))
(p/all
(map (fn [order]
(<consume-message-stream! self
(:id current-session)
runtime
{:message (:message order)
:kind (:kind order)}))
(runtime-provider/<send-message! provider
runtime
{:message (:message order)
:kind (:kind order)}))
orders)))]
(count orders))))))

View File

@@ -1,5 +1,6 @@
(ns logseq.db-sync.worker.agent.runtime-provider
(:require [clojure.string :as string]
[lambdaisland.glogi :as log]
[logseq.db-sync.worker.agent.sandbox :as sandbox]
[promesa.core :as p]))
@@ -30,12 +31,20 @@
(let [normalized (-> value string/trim string/lower-case)]
(when-not (string/blank? normalized) normalized))))
(def ^:private supported-provider-kinds
#{"sprites" "local-dev" "cloudflare"})
(defn- known-provider-kind [value]
(let [provider (normalize-provider value)]
(when (contains? supported-provider-kinds provider)
provider)))
(defn provider-kind [^js env]
(or (normalize-provider (env-str env "AGENT_RUNTIME_PROVIDER"))
(or (known-provider-kind (env-str env "AGENT_RUNTIME_PROVIDER"))
"sprites"))
(defn runtime-provider-kind [^js env runtime]
(or (normalize-provider (:provider runtime))
(or (known-provider-kind (:provider runtime))
(provider-kind env)))
(defn fill-template [template sandbox-id]
@@ -57,6 +66,10 @@
(let [prefix (or (env-str env "SPRITES_NAME_PREFIX") "logseq-task-")]
(sanitize-name (str prefix session-id))))
(defn cloudflare-sandbox-name [^js env session-id]
(let [prefix (or (env-str env "CLOUDFLARE_SANDBOX_NAME_PREFIX") "logseq-task-")]
(sanitize-name (str prefix session-id))))
(defn- sprites-token [^js env]
(or (env-str env "SPRITE_TOKEN")
(env-str env "SPRITES_TOKEN")))
@@ -105,6 +118,21 @@
(str "http://127.0.0.1:" port path))
(def ^:private default-repo-base-dir "/workspace")
(def ^:private cloudflare-local-host "http://localhost")
(defn- cloudflare-agent-port [^js env]
(parse-int (env-str env "CLOUDFLARE_SANDBOX_AGENT_PORT") 2468))
(defn- cloudflare-health-retries [^js env]
(parse-int (env-str env "CLOUDFLARE_HEALTH_RETRIES") 30))
(defn- cloudflare-health-interval-ms [^js env]
(parse-int (env-str env "CLOUDFLARE_HEALTH_INTERVAL_MS") 200))
(defn- js-method
[obj method]
(let [f (when obj (aget obj method))]
(when (fn? f) f)))
(defn- ->promise
[v]
@@ -305,20 +333,26 @@
(string/replace "{repo_dir}" (or repo-dir ""))))
(defn repo-clone-command
[^js env session-id task]
(let [repo-url (task-repo-url task)
session-id (some-> session-id str)
repo-dir (get-repo-dir session-id)
override (env-str env "SPRITES_REPO_CLONE_COMMAND")]
(when (and (string? repo-url) (string? session-id) (string? repo-dir))
(if (string? override)
(fill-repo-template override {:repo-url repo-url
:session-id session-id
:repo-dir repo-dir})
(str "mkdir -p " default-repo-base-dir
" && cd " default-repo-base-dir
" && git clone '" (escape-shell-single repo-url) "' '" (escape-shell-single repo-dir) "'"
" && chmod -R u+rw '" (escape-shell-single repo-dir) "'")))))
([^js env session-id task]
(repo-clone-command env session-id task "sprites"))
([^js env session-id task provider]
(let [repo-url (task-repo-url task)
session-id (some-> session-id str)
repo-dir (get-repo-dir session-id)
override-key (if (= "cloudflare" provider)
"CLOUDFLARE_REPO_CLONE_COMMAND"
"SPRITES_REPO_CLONE_COMMAND")
override (env-str env override-key)]
(when (and (string? repo-url) (string? session-id) (string? repo-dir))
(if (string? override)
(fill-repo-template override {:repo-url repo-url
:session-id session-id
:repo-dir repo-dir})
(str "mkdir -p " default-repo-base-dir
" && cd " default-repo-base-dir
" && git clone --depth 1 --single-branch --no-tags '"
(escape-shell-single repo-url) "' '" (escape-shell-single repo-dir) "'"
" && chmod -R u+rw '" (escape-shell-single repo-dir) "'"))))))
(defn session-payload [task]
(let [agent (or (get-in task [:agent :provider])
@@ -336,6 +370,10 @@
"bypass"
"default"))}))
(defn- message-payload [message]
(cond-> {:message (:message message)}
(string? (:kind message)) (assoc :kind (:kind message))))
(defn- <sprite-create-session! [^js env sprite-name port agent-token session-id payload]
(let [script (str "resp=$(curl -sS -w '\\n%{http_code}' -X POST -H 'content-type: application/json' "
(if agent-token
@@ -348,7 +386,6 @@
"body=$(echo \"$resp\" | sed '$d'); "
"printf \"%s\" \"$body\"; "
"printf \"STATUS:%s\" \"$status\" 1>&2")]
(println :debug :script script)
(p/let [result (sprites-exec-post! env sprite-name ["bash" "-lc" script])
stdout (or (:stdout result) "")
stderr (or (:stderr result) "")
@@ -369,12 +406,274 @@
(sprites-exec-post! env sprite-name ["bash" "-lc" cmd])))
;; ============================================================
;; RuntimeProvider + SpritesProvider
;; RuntimeProvider + Providers
;; ============================================================
(defn- local-dev-base-url [^js env runtime]
(or (:base-url runtime)
(env-str env "SANDBOX_AGENT_URL")
"http://127.0.0.1:2468"))
(defn- local-dev-token [^js env runtime]
(or (:agent-token runtime)
(env-str env "SANDBOX_AGENT_TOKEN")))
(defn- cloudflare-sandbox-namespace [^js env]
(let [sandbox-ns (aget env "Sandbox")]
(when-not sandbox-ns
(throw (ex-info "missing Sandbox binding for cloudflare runtime provider" {})))
sandbox-ns))
(defn- cloudflare-sandbox [^js env sandbox-id]
(let [^js sandbox-ns (cloudflare-sandbox-namespace env)
id-from-name (js-method sandbox-ns "idFromName")
get-sandbox (js-method sandbox-ns "get")]
(when-not (and id-from-name get-sandbox)
(throw (ex-info "invalid Sandbox binding: missing idFromName/get" {})))
(let [do-id (.idFromName sandbox-ns sandbox-id)
sandbox (.get sandbox-ns do-id)]
(when-not sandbox
(throw (ex-info "failed to get cloudflare sandbox stub"
{:sandbox-id sandbox-id})))
sandbox)))
(defn- cloudflare-health-command [port agent-token]
(str "curl -fsS "
(curl-auth-arg agent-token)
" "
cloudflare-local-host
":"
port
"/v1/health >/dev/null"))
(defn- <cloudflare-exec! [sandbox cmd]
(if (js-method sandbox "exec")
(->promise (.exec sandbox cmd))
(throw (ex-info "cloudflare sandbox missing exec method" {}))))
(defn- <cloudflare-health-once! [sandbox port agent-token]
(-> (js/Promise.resolve (<cloudflare-exec! sandbox (cloudflare-health-command port agent-token)))
(.then (fn [result]
(let [success (when result (aget result "success"))]
(if (boolean? success)
success
true))))
(.catch (fn [_] false))))
(defn- <cloudflare-health! [sandbox port agent-token retries interval-ms]
(log/debug :agent/cloudflare-health-check
{:port port
:retries retries
:interval-ms interval-ms
:token? (boolean (string? agent-token))})
(if (<= retries 0)
(throw (ex-info "sandbox-agent health check timed out in cloudflare sandbox"
{:port port}))
(p/let [healthy? (<cloudflare-health-once! sandbox port agent-token)]
(if healthy?
true
(p/let [_ (p/delay interval-ms)]
(<cloudflare-health! sandbox port agent-token (dec retries) interval-ms))))))
(defn- normalize-agent-env-map [env-map]
(if-not (map? env-map)
{}
(reduce-kv (fn [acc k v]
(if (and (some? k) (string? v))
(assoc acc (name k) v)
acc))
{}
env-map)))
(def ^:private cloudflare-env-pass-through
["OPENAI_API_KEY"
"ANTHROPIC_API_KEY"
"OPENAI_BASE_URL"
"ANTHROPIC_BASE_URL"])
(defn- cloudflare-agent-env-vars [^js env task]
(let [base (reduce (fn [acc k]
(if-let [v (env-str env k)]
(assoc acc k v)
acc))
{}
cloudflare-env-pass-through)
task-env (normalize-agent-env-map (get-in task [:agent :env]))
agent-id (:agent (session-payload task))
api-token (some-> (get-in task [:agent :api-token]) str string/trim not-empty)]
(cond-> (merge base task-env)
(and (string? api-token) (= "codex" agent-id)) (assoc "OPENAI_API_KEY" api-token)
(and (string? api-token) (= "claude" agent-id)) (assoc "ANTHROPIC_API_KEY" api-token))))
(defn- cloudflare-server-command [^js env task port agent-token]
(let [auth-json (get-in task [:agent :auth-json])
write-auth (or (auth-json-write-command auth-json) "")
override (env-str env "CLOUDFLARE_BOOTSTRAP_COMMAND")]
(or override
(str write-auth
"sandbox-agent server "
(if (string? agent-token)
(str "--token '" (escape-shell-single agent-token) "'")
"--no-token")
" --host 0.0.0.0 --port " port
" --no-telemetry"))))
(defn- <cloudflare-set-env-vars! [^js sandbox env-vars]
(cond
(empty? env-vars)
(p/resolved nil)
(js-method sandbox "setEnvVars")
(->promise (.setEnvVars sandbox (clj->js env-vars)))
:else
(p/resolved nil)))
(defn- <cloudflare-start-server! [^js env ^js sandbox task port agent-token]
(let [command (cloudflare-server-command env task port agent-token)]
(if (js-method sandbox "startProcess")
(->promise (.startProcess sandbox command))
(->promise (<cloudflare-exec! sandbox (str "nohup " command " >/tmp/sandbox-agent.log 2>&1 &"))))))
(defn- cloudflare-fetch-port-candidates [port]
(->> [port 2468 8000]
(filter number?)
(distinct)
(vec)))
(defn- error-message [error]
(let [msg (when error (aget error "message"))]
(if (string? msg) msg (str error))))
(defn- cloudflare-port-error?
[error]
(let [m (-> (error-message error) string/lower-case)]
(or (string/includes? m "container port not found")
(string/includes? m "connection refused")
(string/includes? m "error proxying request to container"))))
(defn- <cloudflare-container-fetch! [^js sandbox request port]
(if-not (js-method sandbox "containerFetch")
(throw (ex-info "cloudflare sandbox missing containerFetch method" {}))
(let [ports (cloudflare-fetch-port-candidates port)]
(letfn [(step [idx last-error]
(if (>= idx (count ports))
(if last-error
(throw last-error)
(throw (ex-info "cloudflare containerFetch failed: no candidate ports"
{:requested-port port})))
(let [candidate (nth ports idx)]
(p/catch
(->promise (.containerFetch sandbox request candidate))
(fn [error]
(if (and (cloudflare-port-error? error)
(< (inc idx) (count ports)))
(step (inc idx) error)
(throw error)))))))]
(step 0 nil)))))
(defn- <cloudflare-json-request!
[sandbox port method path {:keys [token json-body]}]
(let [^js headers (js/Headers.)
_ (.set headers "accept" "application/json")
_ (when json-body (.set headers "content-type" "application/json"))
_ (when token (.set headers "authorization" (str "Bearer " token)))
request (js/Request. (str cloudflare-local-host path)
(clj->js (cond-> {:method method :headers headers}
json-body
(assoc :body (js/JSON.stringify (clj->js json-body))))))]
(p/let [resp (<cloudflare-container-fetch! sandbox request port)
status (.-status resp)
raw (->promise (.text resp))
parsed (parse-json-safe raw)]
(if (<= 200 status 299)
parsed
(throw (ex-info "cloudflare sandbox request failed"
{:status status
:path path
:body parsed
:raw-body raw}))))))
(defn- <cloudflare-events-stream-request!
[sandbox port token session-id]
(let [^js headers (js/Headers.)
_ (.set headers "accept" "text/event-stream")
_ (when token (.set headers "authorization" (str "Bearer " token)))
request (js/Request. (str cloudflare-local-host
"/v1/sessions/"
session-id
"/events/sse")
#js {:method "GET"
:headers headers})]
(p/let [resp (<cloudflare-container-fetch! sandbox request port)
status (.-status resp)]
(if (<= 200 status 299)
resp
(throw (ex-info "cloudflare sandbox open-events-stream failed"
{:status status
:session-id session-id}))))))
(defn- <cloudflare-send-message! [sandbox port token session-id message]
(<cloudflare-json-request! sandbox
port
"POST"
(str "/v1/sessions/" session-id "/messages")
{:token token
:json-body (message-payload message)}))
(defn- <cloudflare-clone-repo! [^js env sandbox session-id task]
(when-let [cmd (repo-clone-command env session-id task "cloudflare")]
(<cloudflare-exec! sandbox cmd)))
(defn- <cloudflare-ensure-running! [^js env sandbox task port agent-token]
(p/let [healthy? (->promise (<cloudflare-health-once! sandbox port agent-token))]
(if healthy?
true
(p/let [env-vars (cloudflare-agent-env-vars env task)
_ (<cloudflare-set-env-vars! sandbox env-vars)
_ (<cloudflare-start-server! env sandbox task port agent-token)
_ (<cloudflare-health! sandbox
port
agent-token
(cloudflare-health-retries env)
(cloudflare-health-interval-ms env))]
(log/debug :agent/cloudflare-ready {:port port})
true))))
(defn- <cloudflare-create-session! [sandbox port agent-token session-id payload]
(p/let [response (<cloudflare-json-request! sandbox
port
"POST"
(str "/v1/sessions/" session-id)
{:token agent-token
:json-body payload})]
(assoc response :session-id session-id)))
(defn- <cloudflare-terminate-session! [sandbox port agent-token session-id]
(<cloudflare-json-request! sandbox
port
"POST"
(str "/v1/sessions/" session-id "/terminate")
{:token agent-token}))
(defn- <cloudflare-delete-sandbox! [^js sandbox]
(cond
(js-method sandbox "delete")
(->promise (.delete sandbox))
(js-method sandbox "remove")
(->promise (.remove sandbox))
(js-method sandbox "destroy")
(->promise (.destroy sandbox))
:else
(p/resolved nil)))
(defprotocol RuntimeProvider
(<provision-runtime! [this session-id task])
(<open-message-stream! [this runtime message])
(<open-events-stream! [this runtime])
(<send-message! [this runtime message])
(<terminate-runtime! [this runtime]))
(defrecord SpritesProvider [env]
@@ -398,9 +697,9 @@
:agent-token agent-token
:session-id (:session-id response)})))
;; REALTIME streaming endpoint:
;; REALTIME events endpoint:
;; We stream stdout bytes of `curl -N ... text/event-stream` run inside sprite.
(<open-message-stream! [_ runtime message]
(<open-events-stream! [_ runtime]
(let [name (:sprite-name runtime)
port (or (:sandbox-port runtime)
(parse-int (env-str env "SPRITES_SANDBOX_AGENT_PORT") 2468))
@@ -409,24 +708,173 @@
(when-not (string? name)
(throw (ex-info "missing sprite-name on runtime" {:runtime runtime})))
(let [script (str "curl -sS -N -X POST "
(let [script (str "curl -sS -N -X GET "
"-H 'accept: text/event-stream' "
(curl-auth-arg agent-token)
" "
(sprite-local-url port (str "/v1/sessions/" (:session-id runtime) "/events/sse")))]
(sprites-ws->sse-response! env name ["bash" "-lc" script]))))
(<send-message! [_ runtime message]
(let [name (:sprite-name runtime)
port (or (:sandbox-port runtime)
(parse-int (env-str env "SPRITES_SANDBOX_AGENT_PORT") 2468))
agent-token (or (:agent-token runtime)
(env-str env "SANDBOX_AGENT_TOKEN"))]
(when-not (string? name)
(throw (ex-info "missing sprite-name on runtime" {:runtime runtime})))
(let [script (str "curl -fsS -X POST "
"-H 'content-type: application/json' "
(curl-auth-arg agent-token)
" "
(curl-json-arg {:message (:message message)})
(curl-json-arg (message-payload message))
" "
(sprite-local-url port (str "/v1/sessions/" (:session-id runtime) "/messages/stream")))]
(sprites-ws->sse-response! env name ["bash" "-lc" script]))))
(sprite-local-url port (str "/v1/sessions/" (:session-id runtime) "/messages"))
" >/dev/null")]
(p/let [_ (sprites-exec-post! env name ["bash" "-lc" script])]
true))))
(<terminate-runtime! [_ runtime]
(if-not (string? (:sprite-name runtime))
(p/resolved nil)
(sprites-delete! env (:sprite-name runtime)))))
(defn create-provider [^js env _kind]
(->SpritesProvider env))
(defrecord LocalDevProvider [env]
RuntimeProvider
(<provision-runtime! [_ session-id task]
(let [base-url (local-dev-base-url env nil)
agent-token (env-str env "SANDBOX_AGENT_TOKEN")
payload (session-payload task)]
(p/let [response (sandbox/<create-session base-url agent-token session-id payload)]
{:provider "local-dev"
:base-url base-url
:agent-token agent-token
:session-id (:session-id response)})))
(<open-events-stream! [_ runtime]
(let [base-url (local-dev-base-url env runtime)
agent-token (local-dev-token env runtime)]
(sandbox/<open-events-stream base-url agent-token (:session-id runtime))))
(<send-message! [_ runtime message]
(let [base-url (local-dev-base-url env runtime)
agent-token (local-dev-token env runtime)]
(sandbox/<send-message base-url agent-token (:session-id runtime) message)))
(<terminate-runtime! [_ runtime]
(let [base-url (local-dev-base-url env runtime)
agent-token (local-dev-token env runtime)
session-id (:session-id runtime)]
(if-not (string? session-id)
(p/resolved nil)
(p/catch
(sandbox/<terminate-session base-url agent-token session-id)
(fn [_] nil))))))
(defrecord CloudflareProvider [env]
RuntimeProvider
(<provision-runtime! [_ session-id task]
(let [sandbox-id (cloudflare-sandbox-name env session-id)
sandbox (cloudflare-sandbox env sandbox-id)
port (cloudflare-agent-port env)
agent-token (env-str env "SANDBOX_AGENT_TOKEN")
payload (session-payload task)]
(log/debug :agent/cloudflare-provision-start
{:session-id session-id
:sandbox-id sandbox-id
:port port
:token? (boolean (string? agent-token))})
(p/let [_ (<cloudflare-ensure-running! env sandbox task port agent-token)
_ (<cloudflare-clone-repo! env sandbox session-id task)
response (<cloudflare-create-session! sandbox port agent-token session-id payload)]
(log/debug :agent/cloudflare-provisioned
{:session-id session-id
:sandbox-id sandbox-id
:runtime-session-id (:session-id response)})
{:provider "cloudflare"
:sandbox-id sandbox-id
:sandbox-name sandbox-id
:sandbox-port port
:agent-token agent-token
:session-id (:session-id response)})))
(<open-events-stream! [_ runtime]
(let [sandbox-id (:sandbox-id runtime)
session-id (:session-id runtime)
port (or (:sandbox-port runtime) (cloudflare-agent-port env))
agent-token (or (:agent-token runtime)
(env-str env "SANDBOX_AGENT_TOKEN"))]
(log/debug :agent/cloudflare-open-events-stream
{:session-id session-id
:sandbox-id sandbox-id
:port port
:token? (boolean (string? agent-token))})
(when-not (string? sandbox-id)
(throw (ex-info "missing sandbox-id on runtime" {:runtime runtime})))
(when-not (string? session-id)
(throw (ex-info "missing runtime session-id on runtime" {:runtime runtime})))
(let [sandbox (cloudflare-sandbox env sandbox-id)]
(<cloudflare-events-stream-request! sandbox port agent-token session-id))))
(<send-message! [_ runtime message]
(let [sandbox-id (:sandbox-id runtime)
session-id (:session-id runtime)
port (or (:sandbox-port runtime) (cloudflare-agent-port env))
agent-token (or (:agent-token runtime)
(env-str env "SANDBOX_AGENT_TOKEN"))]
(log/debug :agent/cloudflare-send-message-request
{:session-id session-id
:sandbox-id sandbox-id
:port port
:token? (boolean (string? agent-token))
:message-len (count (or (:message message) ""))})
(when-not (string? sandbox-id)
(throw (ex-info "missing sandbox-id on runtime" {:runtime runtime})))
(when-not (string? session-id)
(throw (ex-info "missing runtime session-id on runtime" {:runtime runtime})))
(let [sandbox (cloudflare-sandbox env sandbox-id)]
(<cloudflare-send-message! sandbox port agent-token session-id message))))
(<terminate-runtime! [_ runtime]
(let [sandbox-id (:sandbox-id runtime)
session-id (:session-id runtime)]
(log/debug :agent/cloudflare-terminate
{:session-id session-id
:sandbox-id sandbox-id})
(if-not (string? sandbox-id)
(p/resolved nil)
(let [sandbox (cloudflare-sandbox env sandbox-id)
port (or (:sandbox-port runtime) (cloudflare-agent-port env))
agent-token (or (:agent-token runtime)
(env-str env "SANDBOX_AGENT_TOKEN"))]
(p/catch
(p/let [_ (when (string? session-id)
(p/catch
(<cloudflare-terminate-session! sandbox port agent-token session-id)
(fn [_] nil)))
_ (p/catch (<cloudflare-delete-sandbox! sandbox)
(fn [_] nil))]
nil)
(fn [_] nil)))))))
(defn provider-id [provider]
(cond
(instance? SpritesProvider provider) "sprites"
(instance? LocalDevProvider provider) "local-dev"
(instance? CloudflareProvider provider) "cloudflare"
:else nil))
(defn create-provider [^js env kind]
(log/debug :agent/runtime-provider-selected
{:requested kind
:resolved (known-provider-kind kind)})
(case (known-provider-kind kind)
"local-dev" (->LocalDevProvider env)
"cloudflare" (->CloudflareProvider env)
(->SpritesProvider env)))
(defn resolve-provider
[^js env _runtime]
(create-provider env "sprites"))
[^js env runtime]
(create-provider env (runtime-provider-kind env runtime)))

View File

@@ -18,6 +18,12 @@
(defn messages-stream-url [base session-id]
(str (session-url base session-id) "/messages/stream"))
(defn events-sse-url [base session-id]
(str (session-url base session-id) "/events/sse"))
(defn terminate-url [base session-id]
(str (session-url base session-id) "/terminate"))
(def ^:private agent-aliases
{"claude-code" "claude"
"claude_code" "claude"
@@ -75,3 +81,47 @@
(throw (ex-info "sandbox open-message-stream failed"
{:status status
:session-id session-id}))))))
(defn <open-events-stream
[base token session-id]
(let [headers (js/Headers.)
_ (.set headers "accept" "text/event-stream")
_ (when (string? token) (.set headers "authorization" (str "Bearer " token)))
req (json-request (events-sse-url base session-id) "GET" headers nil)]
(p/let [resp (js/fetch req)
status (.-status resp)]
(if (<= 200 status 299)
resp
(throw (ex-info "sandbox open-events-stream failed"
{:status status
:session-id session-id}))))))
(defn <send-message
[base token session-id message]
(let [headers (js/Headers.)
_ (.set headers "content-type" "application/json")
_ (when (string? token) (.set headers "authorization" (str "Bearer " token)))
body (cond-> {:message (:message message)}
(string? (:kind message)) (assoc :kind (:kind message)))
req (json-request (messages-url base session-id) "POST" headers body)]
(p/let [resp (js/fetch req)
status (.-status resp)]
(if (<= 200 status 299)
true
(throw (ex-info "sandbox send-message failed"
{:status status
:session-id session-id}))))))
(defn <terminate-session
[base token session-id]
(let [headers (js/Headers.)
_ (.set headers "content-type" "application/json")
_ (when (string? token) (.set headers "authorization" (str "Bearer " token)))
req (json-request (terminate-url base session-id) "POST" headers nil)]
(p/let [resp (js/fetch req)
status (.-status resp)]
(when-not (<= 200 status 299)
(throw (ex-info "sandbox terminate-session failed"
{:status status
:session-id session-id})))
true)))

View File

@@ -0,0 +1,151 @@
(ns logseq.db-sync.agent-do-test
(:require [cljs.test :refer [async deftest is testing]]
[clojure.string :as string]
[logseq.db-sync.worker.agent.do :as agent-do]))
(defn- make-agent-storage []
(let [data (js/Map.)]
#js {:get (fn [k]
(js/Promise.resolve (.get data k)))
:put (fn [k v]
(.set data k v)
(js/Promise.resolve nil))}))
(defn- make-self [env]
#js {:env env
:storage (make-agent-storage)
:streams (js/Map.)})
(defn- json-request
[url method body headers]
(let [^js req-headers (js/Headers.)]
(doseq [[k v] headers]
(.set req-headers k v))
(js/Request.
url
(clj->js (cond-> {:method method
:headers req-headers}
(some? body) (assoc :body (js/JSON.stringify (clj->js body))))))))
(defn- <json [^js resp]
(.then (.json resp) #(js->clj % :keywordize-keys true)))
(deftest messages-use-single-events-stream-and-dont-duplicate-user-message-test
(testing "session messages post to /messages while keeping one /events/sse stream and no audit message payload"
(async done
(let [calls (atom {:create 0
:messages 0
:message-stream 0
:events-sse 0})
original-fetch js/fetch
env #js {"AGENT_RUNTIME_PROVIDER" "local-dev"
"SANDBOX_AGENT_URL" "http://sandbox.local"}
self (make-self env)
headers {"content-type" "application/json"
"x-user-id" "user-1"}
init-body {:id "sess-1"
:project {:id "project-1"}
:agent "codex"}]
(set! js/fetch
(fn [request]
(let [url (.-url request)
method (.-method request)]
(cond
(and (= "POST" method)
(string/includes? url "/v1/sessions/sess-1")
(not (string/includes? url "/messages")))
(do
(swap! calls update :create inc)
(js/Promise.resolve
(js/Response.
(js/JSON.stringify #js {:ok true})
#js {:status 200
:headers #js {"content-type" "application/json"}})))
(and (= "GET" method)
(string/includes? url "/v1/sessions/sess-1/events/sse"))
(do
(swap! calls update :events-sse inc)
(let [stream (js/TransformStream.)
writer (.getWriter (.-writable stream))
payload (.encode (js/TextEncoder.)
"data: {\"type\":\"item.delta\",\"delta\":\"ok\"}\n\n")]
;; Keep the stream open to verify we don't reopen per user message.
(.write writer payload)
(js/Promise.resolve
(js/Response.
(.-readable stream)
#js {:status 200
:headers #js {"content-type" "text/event-stream"}}))))
(and (= "POST" method)
(string/includes? url "/v1/sessions/sess-1/messages/stream"))
(do
(swap! calls update :message-stream inc)
(js/Promise.resolve
(js/Response.
"data: {\"type\":\"item.delta\",\"delta\":\"ok\"}\n\n"
#js {:status 200
:headers #js {"content-type" "text/event-stream"}})))
(and (= "POST" method)
(string/includes? url "/v1/sessions/sess-1/messages")
(not (string/includes? url "/messages/stream")))
(do
(swap! calls update :messages inc)
(js/Promise.resolve
(js/Response.
(js/JSON.stringify #js {:ok true})
#js {:status 200
:headers #js {"content-type" "application/json"}})))
:else
(js/Promise.resolve
(js/Response.
(js/JSON.stringify #js {:error "unhandled request"})
#js {:status 500
:headers #js {"content-type" "application/json"}}))))))
(let [promise (-> (agent-do/handle-fetch self
(json-request "http://db-sync.local/__session__/init"
"POST"
init-body
headers))
(.then (fn [_]
(agent-do/handle-fetch self
(json-request "http://db-sync.local/__session__/messages"
"POST"
{:message "hello"}
headers))))
(.then (fn [_]
(agent-do/handle-fetch self
(json-request "http://db-sync.local/__session__/messages"
"POST"
{:message "follow up"}
headers))))
(.then (fn [_]
(agent-do/handle-fetch self
(json-request "http://db-sync.local/__session__/events"
"GET"
nil
{"x-user-id" "user-1"}))))
(.then (fn [events-resp]
(.then (<json events-resp)
(fn [body]
(set! js/fetch original-fetch)
(is (= 1 (:create @calls)))
(is (= 2 (:messages @calls)))
(is (= 1 (:events-sse @calls)))
(is (= 0 (:message-stream @calls)))
(let [events (:events body)
duplicated (filter (fn [event]
(and (= "audit.log" (:type event))
(string? (get-in event [:data :message]))))
events)]
(is (zero? (count duplicated))))
(done))))))]
(.catch promise
(fn [error]
(set! js/fetch original-fetch)
(is false (str "unexpected error: " error))
(done))))))))

View File

@@ -1,5 +1,5 @@
(ns logseq.db-sync.agent-runtime-provider-test
(:require [cljs.test :refer [deftest is testing]]
(:require [cljs.test :refer [async deftest is testing]]
[clojure.string :as string]
[logseq.db-sync.worker.agent.runtime-provider :as runtime-provider]))
@@ -7,7 +7,9 @@
(testing "normalizes configured runtime provider"
(is (= "sprites" (runtime-provider/provider-kind #js {})))
(is (= "sprites" (runtime-provider/provider-kind #js {"AGENT_RUNTIME_PROVIDER" "SPRITES"})))
(is (= "local-dev" (runtime-provider/provider-kind #js {"AGENT_RUNTIME_PROVIDER" "LOCAL-DEV"})))))
(is (= "local-dev" (runtime-provider/provider-kind #js {"AGENT_RUNTIME_PROVIDER" "LOCAL-DEV"})))
(is (= "cloudflare" (runtime-provider/provider-kind #js {"AGENT_RUNTIME_PROVIDER" "CLOUDFLARE"})))
(is (= "sprites" (runtime-provider/provider-kind #js {"AGENT_RUNTIME_PROVIDER" "unknown"})))))
(deftest fill-template-test
(testing "fills sandbox id placeholders"
@@ -24,14 +26,41 @@
(is (= "sprites"
(runtime-provider/runtime-provider-kind env {:provider "sprites"})))
(is (= "cloudflare"
(runtime-provider/runtime-provider-kind env {:provider "cloudflare"}))))))
(runtime-provider/runtime-provider-kind env {:provider "cloudflare"})))
(is (= "cloudflare"
(runtime-provider/runtime-provider-kind env {:provider "unknown"}))))))
(deftest provider-dispatch-test
(testing "create-provider and resolve-provider dispatch supported providers"
(let [env #js {"AGENT_RUNTIME_PROVIDER" "cloudflare"}]
(is (= "sprites"
(runtime-provider/provider-id (runtime-provider/create-provider env "sprites"))))
(is (= "local-dev"
(runtime-provider/provider-id (runtime-provider/create-provider env "local-dev"))))
(is (= "cloudflare"
(runtime-provider/provider-id (runtime-provider/create-provider env "cloudflare"))))
(is (= "sprites"
(runtime-provider/provider-id (runtime-provider/create-provider env "unknown"))))
(is (= "local-dev"
(runtime-provider/provider-id (runtime-provider/resolve-provider env {:provider "local-dev"}))))
(is (= "cloudflare"
(runtime-provider/provider-id (runtime-provider/resolve-provider env nil)))))))
(deftest cloudflare-name-test
(testing "cloudflare sandbox names are deterministic and sanitized"
(is (= "logseq-task-sess-1"
(runtime-provider/cloudflare-sandbox-name #js {} "sess-1")))
(is (= "my-prefix-task-1"
(runtime-provider/cloudflare-sandbox-name
#js {"CLOUDFLARE_SANDBOX_NAME_PREFIX" "My Prefix_"}
"Task 1")))))
(deftest repo-clone-command-test
(testing "builds default repo clone command when repo url exists"
(let [env #js {}
task {:project {:repo-url "https://github.com/example/repo"}}
session-id "sess-1"]
(is (= "mkdir -p /workspace && cd /workspace && git clone 'https://github.com/example/repo' '/workspace/sess-1' && chmod -R u+rw '/workspace/sess-1'"
(is (= "mkdir -p /workspace && cd /workspace && git clone --depth 1 --single-branch --no-tags 'https://github.com/example/repo' '/workspace/sess-1' && chmod -R u+rw '/workspace/sess-1'"
(runtime-provider/repo-clone-command env session-id task)))))
(testing "fills override repo clone command template"
@@ -45,8 +74,8 @@
(is (nil? (runtime-provider/repo-clone-command #js {} "sess-1" {})))))
(deftest session-payload-test
(testing "defaults permission mode to default"
(is (= "default"
(testing "defaults codex permission mode to bypass"
(is (= "bypass"
(:permissionMode (runtime-provider/session-payload {:agent "codex"})))))
(testing "keeps explicit permission mode"
@@ -62,3 +91,193 @@
(testing "returns nil when auth json missing"
(is (nil? (runtime-provider/auth-json-write-command nil)))))
(deftest local-dev-provider-provision-test
(async done
(let [env #js {"SANDBOX_AGENT_URL" "http://127.0.0.1:2468"
"SANDBOX_AGENT_TOKEN" "token-1"}
provider (runtime-provider/create-provider env "local-dev")
task {:agent {:provider "codex"}}
original-fetch js/fetch]
(set! js/fetch
(fn [request]
(is (= "http://127.0.0.1:2468/v1/sessions/sess-1" (.-url request)))
(is (= "POST" (.-method request)))
(js/Promise.resolve
(js/Response.
(js/JSON.stringify #js {:ok true})
#js {:status 200 :headers #js {"content-type" "application/json"}}))))
(-> (runtime-provider/<provision-runtime! provider "sess-1" task)
(.then (fn [runtime]
(set! js/fetch original-fetch)
(is (= "local-dev" (:provider runtime)))
(is (= "http://127.0.0.1:2468" (:base-url runtime)))
(is (= "sess-1" (:session-id runtime)))
(done)))
(.catch (fn [error]
(set! js/fetch original-fetch)
(is false (str "unexpected error: " error))
(done)))))))
(deftest local-dev-provider-events-stream-test
(async done
(let [env #js {"SANDBOX_AGENT_TOKEN" "token-1"}
provider (runtime-provider/create-provider env "local-dev")
runtime {:provider "local-dev"
:base-url "http://sandbox.local"
:session-id "sess-2"}
original-fetch js/fetch]
(set! js/fetch
(fn [request]
(is (= "http://sandbox.local/v1/sessions/sess-2/events/sse" (.-url request)))
(is (= "GET" (.-method request)))
(js/Promise.resolve
(js/Response. "data: {\"type\":\"ok\"}\n\n"
#js {:status 200
:headers #js {"content-type" "text/event-stream"}}))))
(-> (runtime-provider/<open-events-stream! provider runtime)
(.then (fn [resp]
(set! js/fetch original-fetch)
(is (= 200 (.-status resp)))
(done)))
(.catch (fn [error]
(set! js/fetch original-fetch)
(is false (str "unexpected error: " error))
(done)))))))
(deftest local-dev-provider-send-message-test
(async done
(let [env #js {"SANDBOX_AGENT_TOKEN" "token-1"}
provider (runtime-provider/create-provider env "local-dev")
runtime {:provider "local-dev"
:base-url "http://sandbox.local"
:session-id "sess-2"}
original-fetch js/fetch]
(set! js/fetch
(fn [request]
(is (= "http://sandbox.local/v1/sessions/sess-2/messages" (.-url request)))
(is (= "POST" (.-method request)))
(js/Promise.resolve
(js/Response.
(js/JSON.stringify #js {:ok true})
#js {:status 200
:headers #js {"content-type" "application/json"}}))))
(-> (runtime-provider/<send-message! provider runtime {:message "hello" :kind "chat"})
(.then (fn [ok?]
(set! js/fetch original-fetch)
(is (true? ok?))
(done)))
(.catch (fn [error]
(set! js/fetch original-fetch)
(is false (str "unexpected error: " error))
(done)))))))
(deftest cloudflare-provider-provision-test
(async done
(let [calls (atom {:health 0})
sandbox-stub
#js {:exec
(fn [cmd]
(if (string/includes? cmd "/v1/health")
(let [n (swap! calls update :health inc)]
(js/Promise.resolve #js {:success (>= (:health n) 2)}))
(js/Promise.resolve #js {:success true :stdout "" :stderr ""})))
:setEnvVars
(fn [vars]
(swap! calls assoc :env (js->clj vars))
(js/Promise.resolve nil))
:startProcess
(fn [cmd]
(swap! calls assoc :start cmd)
(js/Promise.resolve nil))
:containerFetch
(fn [req port]
(swap! calls assoc :create-url (.-url req) :create-port port :create-method (.-method req))
(js/Promise.resolve
(js/Response.
(js/JSON.stringify #js {:ok true})
#js {:status 200 :headers #js {"content-type" "application/json"}})))}
sandbox-ns
#js {:idFromName (fn [name]
(swap! calls assoc :sandbox-name name)
(str "id-" name))
:get (fn [id]
(swap! calls assoc :sandbox-id id)
sandbox-stub)}
env #js {"Sandbox" sandbox-ns
"CLOUDFLARE_SANDBOX_AGENT_PORT" "8000"
"OPENAI_API_KEY" "sk-openai"}
provider (runtime-provider/create-provider env "cloudflare")
task {:agent {:provider "codex"}}]
(-> (runtime-provider/<provision-runtime! provider "sess-1" task)
(.then (fn [runtime]
(is (= "cloudflare" (:provider runtime)))
(is (= "logseq-task-sess-1" (:sandbox-id runtime)))
(is (= "sess-1" (:session-id runtime)))
(is (= "POST" (:create-method @calls)))
(is (string/includes? (:create-url @calls) "/v1/sessions/sess-1"))
(is (= "sk-openai" (get (:env @calls) "OPENAI_API_KEY")))
(is (string/includes? (:start @calls) "sandbox-agent server"))
(done)))
(.catch (fn [error]
(is false (str "unexpected error: " error))
(done)))))))
(deftest cloudflare-provider-events-stream-test
(async done
(let [calls (atom {})
sandbox-stub
#js {:containerFetch
(fn [req port]
(swap! calls assoc :url (.-url req) :method (.-method req) :port port)
(js/Promise.resolve
(js/Response.
"data: {\"type\":\"item.delta\",\"delta\":\"ok\"}\n\n"
#js {:status 200 :headers #js {"content-type" "text/event-stream"}})))}
sandbox-ns
#js {:idFromName (fn [_] "id-sbx")
:get (fn [_] sandbox-stub)}
env #js {"Sandbox" sandbox-ns}
provider (runtime-provider/create-provider env "cloudflare")
runtime {:provider "cloudflare"
:sandbox-id "sbx-1"
:sandbox-port 8000
:session-id "sess-3"}]
(-> (runtime-provider/<open-events-stream! provider runtime)
(.then (fn [resp]
(is (= 200 (.-status resp)))
(is (= "GET" (:method @calls)))
(is (string/includes? (:url @calls) "/v1/sessions/sess-3/events/sse"))
(done)))
(.catch (fn [error]
(is false (str "unexpected error: " error))
(done)))))))
(deftest cloudflare-provider-send-message-test
(async done
(let [calls (atom {})
sandbox-stub
#js {:containerFetch
(fn [req port]
(swap! calls assoc :url (.-url req) :method (.-method req) :port port)
(js/Promise.resolve
(js/Response.
(js/JSON.stringify #js {:ok true})
#js {:status 200 :headers #js {"content-type" "application/json"}})))}
sandbox-ns
#js {:idFromName (fn [_] "id-sbx")
:get (fn [_] sandbox-stub)}
env #js {"Sandbox" sandbox-ns}
provider (runtime-provider/create-provider env "cloudflare")
runtime {:provider "cloudflare"
:sandbox-id "sbx-1"
:sandbox-port 8000
:session-id "sess-3"}]
(-> (runtime-provider/<send-message! provider runtime {:message "ping" :kind "chat"})
(.then (fn [_]
(is (= "POST" (:method @calls)))
(is (string/includes? (:url @calls) "/v1/sessions/sess-3/messages"))
(done)))
(.catch (fn [error]
(is false (str "unexpected error: " error))
(done)))))))

View File

@@ -18,6 +18,8 @@
(sandbox/session-url base session-id)))
(is (= "https://sandbox.example/v1/sessions/sess-1/messages"
(sandbox/messages-url base session-id)))
(is (= "https://sandbox.example/v1/sessions/sess-1/events/sse"
(sandbox/events-sse-url base session-id)))
(is (= "https://sandbox.example/v1/sessions/sess-1/messages/stream"
(sandbox/messages-stream-url base session-id))))))

View File

@@ -1,5 +1,6 @@
(ns logseq.db-sync.test-runner
(:require [cljs.test :as ct]
[logseq.db-sync.agent-do-test]
[logseq.db-sync.agent-request-test]
[logseq.db-sync.agent-runtime-provider-test]
[logseq.db-sync.node-config-test]

View File

@@ -1,10 +1,10 @@
FROM docker.io/cloudflare/sandbox:0.3.3
FROM docker.io/cloudflare/sandbox:0.7.0
WORKDIR /workspace
RUN curl -fsSL https://releases.rivet.dev/sandbox-agent/latest/install.sh | sh
RUN sandbox-agent install-agent codex
# Pre-install agents
RUN sandbox-agent install-agent claude && \
sandbox-agent install-agent codex
# Expose port for local dev (wrangler dev requires EXPOSE directives)
EXPOSE 2468
EXPOSE 8000
CMD ["sandbox-agent", "server", "--no-token", "--host", "0.0.0.0", "--port", "2468"]

View File

@@ -6,7 +6,8 @@ compatibility_flags = [ "nodejs_compat" ]
[[containers]]
class_name = "Sandbox"
image = "./Dockerfile.sandbox-agent"
max_instances = 10
instance_type = "lite"
max_instances = 1
[containers.settings]
idle_timeout = "10m"