session resume

This commit is contained in:
Tienson Qin
2026-03-02 12:04:56 +08:00
parent bf188febb4
commit edc8a8bd6e
6 changed files with 627 additions and 61 deletions

View File

@@ -23,3 +23,4 @@ Milestones are tracked as separate files in this folder:
- `17-m17-separate-agents-worker-from-db-sync.md`
- `18-m18-github-app-installation-tokens.md`
- `19-m19-vercel-sandbox-runtime-and-snapshot.md`
- `20-m20-resume-session-with-persisted-workspace.md`

View File

@@ -0,0 +1,127 @@
# M20: Resume Session with Persisted Workspace
Status: Proposed
Target: Guarantee no work loss when a sandbox stops by rebuilding workspace state on demand for the next message.
## Goal
Allow Codex and Claude sessions to resume from a stopped sandbox with the same repository working state so conversation can continue without losing local changes.
## Why M20
- Chat history is already persisted in Cloudflare Durable Objects, but sandbox filesystem state is ephemeral.
- Vercel Sandbox stops/sleeps by design, which can drop unsaved workspace state if not persisted.
- Frequent full snapshots are expensive for large repos.
- A hybrid persistence strategy can minimize storage cost while preserving correctness.
## Scope
1) Add workspace persistence state to session metadata:
- Store `base-snapshot-id`, `latest-patch-key`, `patch-seq`, `last-known-head-sha`, `workspace-dir`, and `checkpoint-at`.
- Keep this state in the session Durable Object so restore is deterministic.
2) Adopt a hybrid persistence model:
- Base layer: use Vercel snapshot as a reusable base image for repo + dependencies.
- Delta layer: persist conversation-local git delta artifacts in R2.
- Default delta format: git bundle of checkpoint commits with metadata JSON.
3) Define checkpoint trigger policy:
- Refresh persistence before controlled inactivity transitions.
- Controlled inactivity means control-plane initiated runtime stop paths such as explicit cancel, PR-ready cleanup, idle timeout shutdown, and manual stop.
- Do not block API responses on checkpoint completion; run checkpoint in background and emit events.
4) Add restore-on-demand behavior:
- On new message for a session with no live runtime, provision a new sandbox.
- Restore from `base-snapshot-id` first.
- Apply latest patch bundle from R2 and verify expected commit SHA.
- Start sandbox-agent and continue the conversation stream.
5) Add observability and failure semantics:
- Emit `sandbox.checkpoint.started|succeeded|failed` and `sandbox.restore.started|succeeded|failed` events.
- If patch apply fails, fall back to clean repo bootstrap and surface a structured recovery warning.
## Out of Scope
- Multi-user merge conflict resolution UI.
- Cross-provider migration of snapshot/patch artifacts.
- Persisting build outputs outside git-tracked workspace state.
- Full snapshot history browsing.
## Architecture
The persistence pipeline uses one durable base plus cheap incremental deltas.
```text
Session running in sandbox
|
| controlled inactivity transition
v
Checkpoint worker
1) ensure local git checkpoint commit (if dirty)
2) export incremental git bundle
3) upload bundle + metadata to R2
4) refresh base snapshot only when policy says stale
5) persist pointers in DO session state
|
v
Sandbox stops
|
| next user message
v
Resume worker
1) create new sandbox from base snapshot
2) download/apply latest bundle from R2
3) verify expected head SHA
4) start sandbox-agent + continue message loop
```
## Workstreams
### WS1: Session Metadata and State Machine
- Extend session state model in `deps/workers/src/logseq/agents/do.cljs` to include persistence pointers and checkpoint timestamps.
- Add explicit session runtime states for `running`, `checkpointing`, and `inactive-runtime`.
- Ensure message enqueue path can trigger resume provisioning when runtime is absent.
### WS2: Runtime Provider Persistence API
- Extend `RuntimeProvider` in `deps/workers/src/logseq/agents/runtime_provider.cljs` with checkpoint/restore primitives for workspace deltas.
- Implement Vercel-specific workspace checkpoint commands under `/vercel/sandbox/<repo>` semantics.
- Keep unsupported behavior explicit for providers without patch persistence support.
### WS3: R2 Artifact Store
- Add R2 upload/download helpers for patch bundles and metadata in `deps/workers/src/logseq/agents/runtime_provider.cljs` (or a new `workspace_store.cljs` helper if extraction is cleaner).
- Define object keys by `session-id` + monotonic `patch-seq`.
- Persist checksums and byte sizes for verification and diagnostics.
### WS4: Inactivity Checkpoint Triggers
- Hook checkpoint refresh into existing teardown paths in `deps/workers/src/logseq/agents/do.cljs`.
- Add idle-timeout initiated stop path (DO alarm or equivalent timer policy) that checkpoints first, then terminates runtime.
- Keep termination idempotent when checkpoint already in progress.
### WS5: Resume Path and Verification
- In `handle-messages` and pending-order flush flow, ensure runtime reprovision restores persisted workspace before sending message.
- Verify post-restore branch/HEAD expectations.
- Emit recovery event and continue with clean bootstrap when verification fails.
### WS6: API and UI Contract Updates
- Extend session status payloads in `deps/workers/src/logseq/sync/malli_schema.cljs` with persistence metadata needed by UI.
- Update frontend session state handling in `src/main/frontend/handler/agent.cljs` for restore/checkpoint status visibility.
- Keep existing Snapshot button behavior unchanged.
### WS7: Docs and Operations
- Update protocol docs in `docs/agent-guide/db-sync/protocol.md` for new checkpoint/restore events and session fields.
- Document required env/bindings for R2 delta storage in worker deployment config docs.
- Add runbook entries for checkpoint failures, restore fallback, and forced rebuild.
## Cost Strategy
- Keep one active base snapshot per repo template key.
- Store frequent deltas as compact R2 objects.
- Refresh base snapshot only when dependency/toolchain fingerprint changes or base snapshot age exceeds policy.
## Testing Requirements
1) Unit tests for checkpoint metadata transitions in `deps/workers/test/logseq/agents/do_test.cljs`.
2) Runtime-provider tests for bundle export/apply and checksum verification in `deps/workers/test/logseq/agents/runtime_provider_test.cljs`.
3) Failure tests for R2 upload/download and restore fallback behavior.
4) Session flow tests proving new message can resume from `inactive-runtime` with no message loss.
5) Regression tests confirming non-Vercel providers remain unchanged.
## Exit Criteria
1) Stopped session can be resumed by sending a new message without losing local repo changes.
2) Controlled inactivity paths always attempt checkpoint refresh before runtime termination.
3) Resume restores expected git HEAD for last checkpoint or emits explicit fallback event.
4) Patch artifacts are persisted in R2 with integrity metadata and bounded retention.
5) End-to-end tests demonstrate no-work-loss behavior across stop and resume cycles.

View File

@@ -90,6 +90,94 @@
(p/let [_ (<put-session! self session)]
session))
(declare <append-event!)
(defn- session-task
[session]
(if (map? (:task session))
(:task session)
{}))
(defn- runtime-snapshot-id
[result]
(or (some-> (:snapshot-id result) str string/trim not-empty)
(some-> (:id result) str string/trim not-empty)))
(defn- runtime-checkpoint-payload
[runtime result reason]
(let [snapshot-id (runtime-snapshot-id result)
provider (some-> (:provider runtime) str string/lower-case)
backup-key (some-> (or (:backup-key result)
(:backup-key runtime))
str
string/trim
not-empty)
backup-dir (some-> (or (:backup-dir result)
(:dir result)
(:backup-dir runtime))
str
string/trim
not-empty)]
(when (string? snapshot-id)
(cond-> {:provider provider
:snapshot-id snapshot-id
:checkpoint-at (common/now-ms)}
(string? backup-key) (assoc :backup-key backup-key)
(string? backup-dir) (assoc :backup-dir backup-dir)
(string? reason) (assoc :reason reason)))))
(defn- <persist-session-checkpoint!
[^js self expected-session-id checkpoint]
(if-not (and (string? expected-session-id) (map? checkpoint))
(p/resolved nil)
(p/let [latest-session (<get-session self)]
(if (and (map? latest-session)
(= expected-session-id (:id latest-session)))
(let [task (session-task latest-session)
task (assoc task :sandbox-checkpoint checkpoint)]
(<save-session! self (assoc latest-session :task task)))
nil))))
(defn- existing-checkpoint-payload
[session reason]
(let [checkpoint (some-> (session-task session) :sandbox-checkpoint)
snapshot-id (some-> (:snapshot-id checkpoint) str string/trim not-empty)]
(when (string? snapshot-id)
(cond-> (assoc checkpoint :checkpoint-at (common/now-ms))
(string? reason) (assoc :reason reason)))))
(defn- <checkpoint-existing-snapshot!
[^js self current-session {:keys [by reason]}]
(let [checkpoint (existing-checkpoint-payload current-session reason)
snapshot-id (runtime-snapshot-id checkpoint)]
(-> (p/let [_ (<append-event! self {:type "sandbox.checkpoint.started"
:data (cond-> {}
(string? by) (assoc :by by)
(string? reason) (assoc :reason reason))
:ts (common/now-ms)})]
(if (map? checkpoint)
(p/let [_ (<persist-session-checkpoint! self (:id current-session) checkpoint)
_ (<append-event! self {:type "sandbox.checkpoint.succeeded"
:data (cond-> {:reused true}
(string? by) (assoc :by by)
(string? reason) (assoc :reason reason)
(string? snapshot-id) (assoc :snapshot-id snapshot-id))
:ts (common/now-ms)})]
true)
(p/let [_ (<append-event! self {:type "sandbox.checkpoint.failed"
:data (cond-> {:error "missing existing checkpoint snapshot"}
(string? by) (assoc :by by)
(string? reason) (assoc :reason reason))
:ts (common/now-ms)})]
false)))
(p/catch (fn [error]
(log/error :agent/checkpoint-existing-snapshot-failed
{:session-id (:id current-session)
:runtime-session-id (get-in current-session [:runtime :session-id])
:sandbox-id (get-in current-session [:runtime :sandbox-id])
:error (str error)})
nil)))))
(defn- stream-url [request session-id]
(let [base (or (header request "x-stream-base")
(.-origin (platform/request-url request)))]
@@ -341,7 +429,9 @@
(let [runtime (:runtime current-session)]
(if-not (map? runtime)
nil
(p/let [terminated? (-> (<terminate-runtime! self runtime)
(p/let [_ (<checkpoint-existing-snapshot! self current-session {:by "system"
:reason "pr-ready"})
terminated? (-> (<terminate-runtime! self runtime)
(p/then (fn [_] true))
(p/catch (fn [error]
(log/error :agent/pr-runtime-terminate-failed
@@ -466,6 +556,25 @@
(def ^:private events-stream-ready-timeout-ms 1000)
(defn- runtime-ready?
[runtime]
(and (map? runtime)
(string? (:session-id runtime))))
(declare <provision-runtime!)
(defn- <ensure-runtime-for-session!
[^js self current-session]
(if-not (map? current-session)
(p/resolved current-session)
(let [runtime (:runtime current-session)
session-id (:id current-session)]
(if (runtime-ready? runtime)
(p/resolved current-session)
(p/let [_ (<provision-runtime! self (:task current-session) session-id)
refreshed-session (<get-session self)]
refreshed-session)))))
(defn- start-runtime-events-stream-background! [^js self session-id runtime]
;; Fire-and-forget start. Returning nil avoids p/let awaiting the stream task.
(when (and (map? runtime)
@@ -475,30 +584,56 @@
(defn- send-runtime-message!
[^js self current-session runtime provider message kind]
(let [ready-promise (ensure-runtime-events-stream-ready! self (:id current-session) runtime)]
(-> (<await-events-stream-ready ready-promise events-stream-ready-timeout-ms)
(.then (fn [_]
(runtime-provider/<send-message! provider
runtime
{:message message
:kind kind})))
(.catch (fn [error]
(log/error :agent/runtime-message-error
{:session-id (:id current-session)
:runtime-session-id (:session-id runtime)
:error error})
(<append-event! self {:type "agent.runtime.error"
:data {:session-id (:id current-session)
:message (str error)}
:ts (common/now-ms)})))
(let [session-id (:id current-session)
send-once! (fn [session-value]
(let [runtime-value (:runtime session-value)
provider-value (when (runtime-ready? runtime-value)
(runtime-provider/resolve-provider (.-env self) runtime-value))]
(if-not (and provider-value (runtime-ready? runtime-value))
(p/rejected (ex-info "session runtime unavailable"
{:session-id (:id session-value)}))
(let [ready-promise (ensure-runtime-events-stream-ready! self (:id session-value) runtime-value)]
(-> (<await-events-stream-ready ready-promise events-stream-ready-timeout-ms)
(.then (fn [_]
(runtime-provider/<send-message! provider-value
runtime-value
{:message message
:kind kind}))))))))
retry-send! (fn [error]
(p/let [latest-session (<get-session self)]
(if (or (not (map? latest-session))
(not= session-id (:id latest-session))
(terminal-status? (:status latest-session)))
(p/rejected error)
(let [latest-runtime (:runtime latest-session)
failed-runtime-id (some-> (:session-id runtime) str)
latest-runtime-id (some-> (:session-id latest-runtime) str)
same-runtime? (and (string? failed-runtime-id)
(= failed-runtime-id latest-runtime-id))]
(p/let [_ (when same-runtime?
(<save-session! self (assoc latest-session :runtime nil)))
retry-session (if same-runtime?
(<get-session self)
latest-session)
session-with-runtime (<ensure-runtime-for-session! self retry-session)]
(send-once! session-with-runtime))))))]
(-> (send-once! current-session)
(p/catch retry-send!)
(p/catch (fn [error]
(log/error :agent/runtime-message-error
{:session-id session-id
:runtime-session-id (:session-id runtime)
:error error})
(<append-event! self {:type "agent.runtime.error"
:data {:session-id session-id
:message (str error)}
:ts (common/now-ms)})))
(.catch (fn [_] nil)))))
(defn- send-runtime-message-background!
[^js self current-session runtime provider message kind]
(when (and runtime
provider
(string? (:session-id runtime)))
(send-runtime-message! self current-session runtime provider message kind)
[^js self current-session _runtime _provider message kind]
(when (map? current-session)
(send-runtime-message! self current-session _runtime _provider message kind)
nil))
(defn- <transition! [^js self to-status event-type data]
@@ -949,6 +1084,8 @@
result (runtime-provider/<snapshot-runtime! provider
runtime
{:task (:task current-session)})
checkpoint (runtime-checkpoint-payload runtime result "manual")
_ (<persist-session-checkpoint! self (:id current-session) checkpoint)
snapshot-id (or (:snapshot-id result)
(:id result))
_ (<append-event! self {:type "sandbox.snapshot.succeeded"
@@ -979,6 +1116,9 @@
(p/let [res (<append-event! self {:type "session.canceled"
:data {:by user-id}})
current-session (<get-session self)
_ (when (map? (:runtime current-session))
(<checkpoint-existing-snapshot! self current-session {:by user-id
:reason "cancel"}))
_ (<terminate-runtime! self (:runtime current-session))
current-session (when current-session (assoc current-session :runtime nil))
_ (when current-session (<save-session! self current-session))]
@@ -990,26 +1130,22 @@
(p/let [current-session (<get-session self)]
(if (nil? current-session)
nil
(let [[orders next-session] (session/drain-orders current-session)
runtime (:runtime current-session)
provider (when runtime
(runtime-provider/resolve-provider (.-env self) runtime))]
(p/let [_ (<save-session! self next-session)
_ (when (and runtime
provider
(string? (:session-id runtime)))
(start-runtime-events-stream-background! self (:id current-session) runtime))
_ (when (and runtime
provider
(string? (:session-id runtime)))
(p/all
(map (fn [order]
(runtime-provider/<send-message! provider
runtime
{:message (:message order)
:kind (:kind order)}))
orders)))]
(count orders))))))
(p/let [session-with-runtime (<ensure-runtime-for-session! self current-session)
runtime (:runtime session-with-runtime)]
(if-not (runtime-ready? runtime)
0
(let [[orders next-session] (session/drain-orders session-with-runtime)
provider (runtime-provider/resolve-provider (.-env self) runtime)]
(p/let [_ (<save-session! self next-session)
_ (start-runtime-events-stream-background! self (:id session-with-runtime) runtime)
_ (p/all
(map (fn [order]
(runtime-provider/<send-message! provider
runtime
{:message (:message order)
:kind (:kind order)}))
orders))]
(count orders))))))))
(defn- handle-pause [^js self request]
(let [user-id (user-id-from-request request)]

View File

@@ -503,6 +503,19 @@
(string/lower-case repo-url))]
(str repo-key "#" (string/lower-case branch))))))
(defn- task-sandbox-checkpoint
[task]
(let [checkpoint (when (map? task) (:sandbox-checkpoint task))
provider (normalize-provider (:provider checkpoint))
snapshot-id (some-> (:snapshot-id checkpoint) str string/trim not-empty)
backup-key (some-> (:backup-key checkpoint) str string/trim not-empty)
backup-dir (some-> (:backup-dir checkpoint) str string/trim not-empty)]
(when (and (map? checkpoint) (string? snapshot-id))
(cond-> {:snapshot-id snapshot-id}
(string? provider) (assoc :provider provider)
(string? backup-key) (assoc :backup-key backup-key)
(string? backup-dir) (assoc :backup-dir backup-dir)))))
(defn- prune-cloudflare-backup-cache!
[]
(let [now (js/Date.now)]
@@ -1143,31 +1156,37 @@
:json-body (message-payload message)}))
(defn- <cloudflare-restore-backup!
[^js sandbox backup-key target-dir]
[^js sandbox backup-key target-dir {:keys [backup-id]}]
(let [entry (cloudflare-backup-entry backup-key)
explicit-backup-id (some-> backup-id str string/trim not-empty)
backup-id (or explicit-backup-id
(:id entry))
from-cache? (and (nil? explicit-backup-id)
(map? entry))
restore-backup (js-method sandbox "restoreBackup")]
(cond
(or (not (string? backup-key))
(not (string? target-dir))
(nil? entry))
(or (not (string? target-dir))
(not (string? backup-id)))
(p/resolved false)
(not (fn? restore-backup))
(p/resolved false)
:else
(let [backup-id (:id entry)
backup #js {:id backup-id
(let [backup #js {:id backup-id
:dir target-dir}]
(-> (->promise (.restoreBackup sandbox backup))
(p/then (fn [_]
(when (and (string? backup-key) (string? backup-id))
(remember-cloudflare-backup! backup-key backup-id))
(log/debug :agent/cloudflare-backup-restored
{:backup-key backup-key
:backup-id backup-id
:dir target-dir})
true))
(p/catch (fn [error]
(forget-cloudflare-backup! backup-key)
(when (and from-cache? (string? backup-key))
(forget-cloudflare-backup! backup-key))
(log/error :agent/cloudflare-backup-restore-failed
{:backup-key backup-key
:backup-id backup-id
@@ -1542,6 +1561,36 @@
:snapshot-dir nil
:restored? false})))))))
(defn- <vercel-create-sandbox-from-checkpoint!
[^js env checkpoint]
(let [snapshot-id (:snapshot-id checkpoint)
snapshot-dir (:backup-dir checkpoint)]
(if-not (string? snapshot-id)
(p/resolved nil)
(-> (<vercel-create-sandbox! env {:type "snapshot"
:snapshotId snapshot-id})
(p/then (fn [sandbox]
(log/debug :agent/vercel-snapshot-restored
{:snapshot-id snapshot-id
:source "task-checkpoint"})
{:sandbox sandbox
:snapshot-id snapshot-id
:snapshot-dir snapshot-dir
:restored? true}))
(p/catch (fn [error]
(log/error :agent/vercel-snapshot-restore-failed
{:snapshot-id snapshot-id
:source "task-checkpoint"
:error (str error)})
nil))))))
(defn- <vercel-create-sandbox-for-restore!
[^js env backup-key checkpoint]
(p/let [checkpoint-restore (<vercel-create-sandbox-from-checkpoint! env checkpoint)]
(if (map? checkpoint-restore)
checkpoint-restore
(<vercel-create-sandbox-from-cache! env backup-key))))
(defn- <vercel-runtime-base-url!
[^js env runtime]
(let [cached (:base-url runtime)
@@ -1818,9 +1867,19 @@
port (vercel-agent-port env nil)
payload (session-payload task)
repo-dir (get-repo-dir session-id task "vercel")
backup-key (repo-backup-key task)]
(p/let [{:keys [sandbox snapshot-dir restored?]} (<vercel-create-sandbox-from-cache! env backup-key)
checkpoint (task-sandbox-checkpoint task)
backup-key (or (:backup-key checkpoint)
(repo-backup-key task))]
(p/let [{:keys [sandbox snapshot-id snapshot-dir restored?]} (<vercel-create-sandbox-for-restore! env
backup-key
checkpoint)
_ (<vercel-ensure-running! env sandbox session-id task port agent-token)
_ (when (and restored?
(string? backup-key)
(string? snapshot-id))
(remember-vercel-snapshot! backup-key
snapshot-id
(or snapshot-dir repo-dir)))
_ (when restored?
(<vercel-restore-repo-dir! sandbox snapshot-dir repo-dir))
_ (when-not restored?
@@ -1878,7 +1937,10 @@
snapshot-id (:snapshot-id result)]
(when (and (string? backup-key) (string? snapshot-id))
(remember-vercel-snapshot! backup-key snapshot-id backup-dir))
result)))
(cond-> result
(string? backup-key) (assoc :backup-key backup-key)
(string? backup-dir) (assoc :backup-dir backup-dir)
:always (assoc :provider "vercel")))))
(<push-branch! [_ _runtime _opts]
(p/rejected
@@ -1906,7 +1968,11 @@
agent-token (env-str env "SANDBOX_AGENT_TOKEN")
payload (session-payload task)
repo-dir (get-repo-dir session-id)
backup-key (repo-backup-key task)]
checkpoint (task-sandbox-checkpoint task)
backup-key (or (:backup-key checkpoint)
(repo-backup-key task))
checkpoint-backup-id (when (= "cloudflare" (:provider checkpoint))
(:snapshot-id checkpoint))]
(log/debug :agent/cloudflare-provision-start
{:session-id session-id
:sandbox-id sandbox-id
@@ -1915,7 +1981,10 @@
:backup-key backup-key
:repo-dir repo-dir})
(p/let [_ (<cloudflare-ensure-running! env sandbox task port agent-token)
restored? (<cloudflare-restore-backup! sandbox backup-key repo-dir)
restored? (<cloudflare-restore-backup! sandbox
backup-key
repo-dir
{:backup-id checkpoint-backup-id})
_ (when-not restored?
(<cloudflare-clone-repo! env sandbox session-id task))
response (<cloudflare-create-session! sandbox port agent-token session-id payload)]
@@ -1990,7 +2059,10 @@
snapshot-id (:snapshot-id result)]
(when (and (string? backup-key) (string? snapshot-id))
(remember-cloudflare-backup! backup-key snapshot-id))
result))))
(cond-> result
(string? backup-key) (assoc :backup-key backup-key)
(string? backup-dir) (assoc :backup-dir backup-dir)
:always (assoc :provider "cloudflare"))))))
(<push-branch! [_ runtime opts]
(let [sandbox-id (:sandbox-id runtime)

View File

@@ -171,6 +171,63 @@
(is false (str "unexpected error: " error))
(done))))))))
(deftest messages-reprovision-runtime-after-send-failure-test
(testing "session messages reprovision runtime and retry send when current runtime is stale"
(async done
(let [env #js {"AGENT_RUNTIME_PROVIDER" "local-dev"
"SANDBOX_AGENT_URL" "http://sandbox.local"}
self (make-self env)
headers {"content-type" "application/json"
"x-user-id" "user-1"}
send-runtime-ids (atom [])
provision-calls (atom 0)]
(-> (.put (.-storage self)
"session"
(clj->js {:id "sess-reprovision"
:status "running"
:task {:project {:repo-url "https://github.com/example/repo"}}
:runtime {:provider "local-dev"
:session-id "runtime-old"}
:audit {}
:created-at 0
:updated-at 0}))
(.then (fn [_]
(with-redefs [runtime-provider/<send-message!
(fn [_provider runtime _message]
(swap! send-runtime-ids conj (:session-id runtime))
(if (= "runtime-old" (:session-id runtime))
(js/Promise.reject (ex-info "stale runtime" {}))
(js/Promise.resolve true)))
agent-do/<provision-runtime!
(fn [self _task _session-id]
(swap! provision-calls inc)
(-> (.get (.-storage self) "session")
(.then (fn [session-js]
(let [session (js->clj session-js :keywordize-keys true)
next-session (assoc session
:runtime {:provider "local-dev"
:session-id "runtime-new"})]
(.put (.-storage self) "session" (clj->js next-session)))))
(.then (fn [_]
{:provider "local-dev"
:session-id "runtime-new"}))))]
(agent-do/handle-fetch self
(json-request "http://db-sync.local/__session__/messages"
"POST"
{:message "retry me"}
headers)))))
(.then (fn [resp]
(is (= 200 (.-status resp)))
(js/setTimeout
(fn []
(is (= 1 @provision-calls))
(is (= ["runtime-old" "runtime-new"] @send-runtime-ids))
(done))
30)))
(.catch (fn [error]
(is false (str "unexpected error: " error))
(done))))))))
(deftest init-does-not-wait-for-open-events-stream-test
(testing "session init returns immediately even when runtime events stream stays open"
(async done
@@ -755,7 +812,13 @@
(is (= "snapshot-created" (:status body)))
(is (= "backup-1" (:snapshot-id body)))
(is (= 1 (count @snapshot-calls)))
(done)))))
(.then (.get (.-storage self) "session")
(fn [session]
(let [session (js->clj session :keywordize-keys true)
checkpoint (get-in session [:task :sandbox-checkpoint])]
(is (= "backup-1" (:snapshot-id checkpoint)))
(is (= "cloudflare" (:provider checkpoint)))
(done))))))))
(.catch (fn [error]
(is false (str "unexpected error: " error))
(done))))))))
@@ -978,6 +1041,7 @@
(testing "session publish endpoint terminates runtime and clears sandbox runtime when PR is created"
(async done
(let [terminate-calls (atom [])
snapshot-calls (atom [])
env #js {"AGENT_RUNTIME_PROVIDER" "local-dev"}
self (make-self env)
headers {"content-type" "application/json"
@@ -986,7 +1050,9 @@
"session"
(clj->js {:id "sess-pr-created"
:status "running"
:task {:project {:repo-url "https://github.com/example/repo"}}
:task {:project {:repo-url "https://github.com/example/repo"}
:sandbox-checkpoint {:provider "cloudflare"
:snapshot-id "checkpoint-existing-pr"}}
:runtime {:provider "cloudflare"
:sandbox-id "sbx-pr-created"
:session-id "sess-pr-created"}
@@ -1011,6 +1077,10 @@
(fn [_env _token _repo-url _opts]
(js/Promise.resolve {:url "https://github.com/example/repo/pull/88"
:id 88}))
runtime-provider/<snapshot-runtime!
(fn [_provider runtime _opts]
(swap! snapshot-calls conj runtime)
(js/Promise.resolve {:snapshot-id "backup-pr-1"}))
agent-do/<terminate-runtime!
(fn [_self runtime]
(swap! terminate-calls conj runtime)
@@ -1032,12 +1102,68 @@
(fn [session]
(let [session (js->clj session :keywordize-keys true)]
(is (= 1 (count @terminate-calls)))
(is (empty? @snapshot-calls))
(is (nil? (:runtime session)))
(is (= "checkpoint-existing-pr"
(get-in session [:task :sandbox-checkpoint :snapshot-id])))
(is (number? (get-in session [:task :sandbox-checkpoint :checkpoint-at])))
(done))))))))
(.catch (fn [error]
(is false (str "unexpected error: " error))
(done))))))))
(deftest cancel-endpoint-checkpoints-before-terminate-test
(testing "session cancel checkpoints existing snapshot pointer before terminating runtime"
(async done
(let [terminate-calls (atom [])
snapshot-calls (atom [])
env #js {"AGENT_RUNTIME_PROVIDER" "local-dev"}
self (make-self env)
headers {"content-type" "application/json"
"x-user-id" "user-1"}]
(-> (.put (.-storage self)
"session"
(clj->js {:id "sess-cancel-checkpoint"
:status "running"
:task {:project {:repo-url "https://github.com/example/repo"}
:sandbox-checkpoint {:provider "cloudflare"
:snapshot-id "checkpoint-existing-cancel"}}
:runtime {:provider "cloudflare"
:sandbox-id "sbx-cancel-checkpoint"
:session-id "sess-cancel-checkpoint"}
:audit {}
:created-at 0
:updated-at 0}))
(.then (fn [_]
(with-redefs [runtime-provider/<snapshot-runtime!
(fn [_provider runtime _opts]
(swap! snapshot-calls conj runtime)
(js/Promise.resolve {:snapshot-id "backup-cancel-1"}))
agent-do/<terminate-runtime!
(fn [_self runtime]
(swap! terminate-calls conj runtime)
(js/Promise.resolve nil))]
(agent-do/handle-fetch self
(json-request "http://db-sync.local/__session__/cancel"
"POST"
nil
headers)))))
(.then (fn [resp]
(is (= 200 (.-status resp)))
(.then (.get (.-storage self) "session")
(fn [session]
(let [session (js->clj session :keywordize-keys true)]
(is (empty? @snapshot-calls))
(is (= 1 (count @terminate-calls)))
(is (nil? (:runtime session)))
(is (= "checkpoint-existing-cancel"
(get-in session [:task :sandbox-checkpoint :snapshot-id])))
(is (number? (get-in session [:task :sandbox-checkpoint :checkpoint-at])))
(done))))))
(.catch (fn [error]
(is false (str "unexpected error: " error))
(done))))))))
(deftest terminal-endpoint-requires-authenticated-user-test
(testing "session terminal endpoint requires x-user-id header"
(async done

View File

@@ -1,8 +1,8 @@
(ns logseq.agents.runtime-provider-test
(:require [cljs.test :refer [async deftest is testing]]
[clojure.string :as string]
[logseq.agents.sandbox :as sandbox]
[logseq.agents.runtime-provider :as runtime-provider]))
[logseq.agents.runtime-provider :as runtime-provider]
[logseq.agents.sandbox :as sandbox]))
(defn- fetch-url [request]
(cond
@@ -224,7 +224,7 @@
(.catch (fn [error]
(let [data (ex-data error)]
(is (= :unsupported (:reason data)))
(is (= "local-dev" (:provider data))))
(is (= "local-dev" (:provider data))))
(done)))))))
(deftest vercel-provider-snapshot-restore-flow-test
@@ -292,7 +292,49 @@
(done))))))
(.catch (fn [error]
(is false (str "unexpected first provision error: " error))
(done)))))))))
(done))))))))
(deftest vercel-provider-restores-from-task-checkpoint-test
(async done
(runtime-provider/clear-vercel-snapshot-cache!)
(let [calls (atom {:clone 0
:sources []})
env #js {"VERCEL_TEAM_ID" "team-1"
"VERCEL_PROJECT_ID" "project-1"
"VERCEL_TOKEN" "token-vercel"}
provider (runtime-provider/create-provider env "vercel")
task {:agent {:provider "codex"}
:project {:repo-url "https://github.com/example/repo"
:base-branch "main"}
:sandbox-checkpoint {:provider "vercel"
:snapshot-id "persisted-snap-7"
:backup-dir "/vercel/sandbox/repo"}}]
(with-redefs [runtime-provider/<vercel-create-sandbox!
(fn [_env source]
(swap! calls update :sources conj source)
(js/Promise.resolve
#js {:sandboxId "vercel-sbx-checkpoint"
:domain (fn [_port] "https://vercel-agent.local")}))
runtime-provider/<vercel-run-shell!
(fn [_sandbox cmd & _]
(when (string/includes? cmd "git clone --depth 1 --single-branch --no-tags")
(swap! calls update :clone inc))
(js/Promise.resolve {:stdout "" :stderr "" :exit-code 0}))
sandbox/<create-session
(fn [_base-url _agent-token session-id _payload]
(js/Promise.resolve {:session-id session-id}))]
(-> (runtime-provider/<provision-runtime! provider "sess-vercel-checkpoint" task)
(.then (fn [runtime]
(is (= "vercel" (:provider runtime)))
(is (= 0 (:clone @calls)))
(is (= "snapshot"
(get-in (first (:sources @calls)) [:type])))
(is (= "persisted-snap-7"
(get-in (first (:sources @calls)) [:snapshotId])))
(done)))
(.catch (fn [error]
(is false (str "unexpected task-checkpoint restore error: " error))
(done))))))))
(deftest vercel-provider-open-terminal-unsupported-test
(async done
@@ -1092,6 +1134,68 @@
(is false (str "unexpected snapshot error: " error))
(done)))))))
(deftest cloudflare-provider-restores-from-task-checkpoint-test
(async done
(runtime-provider/clear-cloudflare-backup-cache!)
(let [calls (atom {:clone 0
:restore []})
sandboxes (atom {})
make-sandbox
(fn [sandbox-id]
#js {:exec
(fn [cmd]
(cond
(string/includes? cmd "/v1/health")
(js/Promise.resolve #js {:success true})
(string/includes? cmd "git clone --depth 1 --single-branch --no-tags")
(do
(swap! calls update :clone inc)
(js/Promise.resolve #js {:success true}))
:else
(js/Promise.resolve #js {:success true :stdout "" :stderr ""})))
:setEnvVars (fn [_] (js/Promise.resolve nil))
:startProcess (fn [_] (js/Promise.resolve nil))
:containerFetch
(fn [_req _port]
(js/Promise.resolve
(js/Response.
(js/JSON.stringify #js {:ok true})
#js {:status 200 :headers #js {"content-type" "application/json"}})))
:restoreBackup
(fn [backup]
(swap! calls update :restore conj (js->clj backup :keywordize-keys true))
(js/Promise.resolve #js {:success true}))
:delete (fn [] (js/Promise.resolve nil))})
sandbox-ns
#js {:idFromName (fn [name] name)
:get (fn [id]
(or (get @sandboxes id)
(let [sandbox (make-sandbox id)]
(swap! sandboxes assoc id sandbox)
sandbox)))}
env #js {"Sandbox" sandbox-ns
"CLOUDFLARE_SANDBOX_AGENT_PORT" "8000"}
provider (runtime-provider/create-provider env "cloudflare")
task {:agent {:provider "codex"}
:project {:repo-url "https://github.com/example/repo"
:base-branch "main"}
:sandbox-checkpoint {:provider "cloudflare"
:snapshot-id "backup-from-task-1"}}]
(-> (runtime-provider/<provision-runtime! provider "sess-cf-checkpoint" task)
(.then (fn [_runtime]
(is (= 0 (:clone @calls)))
(is (= 1 (count (:restore @calls))))
(is (= "backup-from-task-1"
(get-in @calls [:restore 0 :id])))
(is (= "/workspace/sess-cf-checkpoint"
(get-in @calls [:restore 0 :dir])))
(done)))
(.catch (fn [error]
(is false (str "unexpected cloudflare task-checkpoint restore error: " error))
(done)))))))
(deftest cloudflare-provider-snapshot-runtime-test
(async done
(runtime-provider/clear-cloudflare-backup-cache!)