diff --git a/deps/workers/docs/milestones/agents/00-index.md b/deps/workers/docs/milestones/agents/00-index.md index a7ef527b5f..5a0c3c63c0 100644 --- a/deps/workers/docs/milestones/agents/00-index.md +++ b/deps/workers/docs/milestones/agents/00-index.md @@ -23,3 +23,4 @@ Milestones are tracked as separate files in this folder: - `17-m17-separate-agents-worker-from-db-sync.md` - `18-m18-github-app-installation-tokens.md` - `19-m19-vercel-sandbox-runtime-and-snapshot.md` +- `20-m20-resume-session-with-persisted-workspace.md` diff --git a/deps/workers/docs/milestones/agents/20-m20-resume-session-with-persisted-workspace.md b/deps/workers/docs/milestones/agents/20-m20-resume-session-with-persisted-workspace.md new file mode 100644 index 0000000000..aee9458a59 --- /dev/null +++ b/deps/workers/docs/milestones/agents/20-m20-resume-session-with-persisted-workspace.md @@ -0,0 +1,127 @@ +# M20: Resume Session with Persisted Workspace + +Status: Proposed +Target: Guarantee no work loss when a sandbox stops by rebuilding workspace state on demand for the next message. + +## Goal +Allow Codex and Claude sessions to resume from a stopped sandbox with the same repository working state so conversation can continue without losing local changes. + +## Why M20 +- Chat history is already persisted in Cloudflare Durable Objects, but sandbox filesystem state is ephemeral. +- Vercel Sandbox stops/sleeps by design, which can drop unsaved workspace state if not persisted. +- Frequent full snapshots are expensive for large repos. +- A hybrid persistence strategy can minimize storage cost while preserving correctness. + +## Scope +1) Add workspace persistence state to session metadata: +- Store `base-snapshot-id`, `latest-patch-key`, `patch-seq`, `last-known-head-sha`, `workspace-dir`, and `checkpoint-at`. +- Keep this state in the session Durable Object so restore is deterministic. + +2) Adopt a hybrid persistence model: +- Base layer: use Vercel snapshot as a reusable base image for repo + dependencies. +- Delta layer: persist conversation-local git delta artifacts in R2. +- Default delta format: git bundle of checkpoint commits with metadata JSON. + +3) Define checkpoint trigger policy: +- Refresh persistence before controlled inactivity transitions. +- Controlled inactivity means control-plane initiated runtime stop paths such as explicit cancel, PR-ready cleanup, idle timeout shutdown, and manual stop. +- Do not block API responses on checkpoint completion; run checkpoint in background and emit events. + +4) Add restore-on-demand behavior: +- On new message for a session with no live runtime, provision a new sandbox. +- Restore from `base-snapshot-id` first. +- Apply latest patch bundle from R2 and verify expected commit SHA. +- Start sandbox-agent and continue the conversation stream. + +5) Add observability and failure semantics: +- Emit `sandbox.checkpoint.started|succeeded|failed` and `sandbox.restore.started|succeeded|failed` events. +- If patch apply fails, fall back to clean repo bootstrap and surface a structured recovery warning. + +## Out of Scope +- Multi-user merge conflict resolution UI. +- Cross-provider migration of snapshot/patch artifacts. +- Persisting build outputs outside git-tracked workspace state. +- Full snapshot history browsing. + +## Architecture +The persistence pipeline uses one durable base plus cheap incremental deltas. + +```text +Session running in sandbox + | + | controlled inactivity transition + v +Checkpoint worker + 1) ensure local git checkpoint commit (if dirty) + 2) export incremental git bundle + 3) upload bundle + metadata to R2 + 4) refresh base snapshot only when policy says stale + 5) persist pointers in DO session state + | + v +Sandbox stops + | + | next user message + v +Resume worker + 1) create new sandbox from base snapshot + 2) download/apply latest bundle from R2 + 3) verify expected head SHA + 4) start sandbox-agent + continue message loop +``` + +## Workstreams + +### WS1: Session Metadata and State Machine +- Extend session state model in `deps/workers/src/logseq/agents/do.cljs` to include persistence pointers and checkpoint timestamps. +- Add explicit session runtime states for `running`, `checkpointing`, and `inactive-runtime`. +- Ensure message enqueue path can trigger resume provisioning when runtime is absent. + +### WS2: Runtime Provider Persistence API +- Extend `RuntimeProvider` in `deps/workers/src/logseq/agents/runtime_provider.cljs` with checkpoint/restore primitives for workspace deltas. +- Implement Vercel-specific workspace checkpoint commands under `/vercel/sandbox/` semantics. +- Keep unsupported behavior explicit for providers without patch persistence support. + +### WS3: R2 Artifact Store +- Add R2 upload/download helpers for patch bundles and metadata in `deps/workers/src/logseq/agents/runtime_provider.cljs` (or a new `workspace_store.cljs` helper if extraction is cleaner). +- Define object keys by `session-id` + monotonic `patch-seq`. +- Persist checksums and byte sizes for verification and diagnostics. + +### WS4: Inactivity Checkpoint Triggers +- Hook checkpoint refresh into existing teardown paths in `deps/workers/src/logseq/agents/do.cljs`. +- Add idle-timeout initiated stop path (DO alarm or equivalent timer policy) that checkpoints first, then terminates runtime. +- Keep termination idempotent when checkpoint already in progress. + +### WS5: Resume Path and Verification +- In `handle-messages` and pending-order flush flow, ensure runtime reprovision restores persisted workspace before sending message. +- Verify post-restore branch/HEAD expectations. +- Emit recovery event and continue with clean bootstrap when verification fails. + +### WS6: API and UI Contract Updates +- Extend session status payloads in `deps/workers/src/logseq/sync/malli_schema.cljs` with persistence metadata needed by UI. +- Update frontend session state handling in `src/main/frontend/handler/agent.cljs` for restore/checkpoint status visibility. +- Keep existing Snapshot button behavior unchanged. + +### WS7: Docs and Operations +- Update protocol docs in `docs/agent-guide/db-sync/protocol.md` for new checkpoint/restore events and session fields. +- Document required env/bindings for R2 delta storage in worker deployment config docs. +- Add runbook entries for checkpoint failures, restore fallback, and forced rebuild. + +## Cost Strategy +- Keep one active base snapshot per repo template key. +- Store frequent deltas as compact R2 objects. +- Refresh base snapshot only when dependency/toolchain fingerprint changes or base snapshot age exceeds policy. + +## Testing Requirements +1) Unit tests for checkpoint metadata transitions in `deps/workers/test/logseq/agents/do_test.cljs`. +2) Runtime-provider tests for bundle export/apply and checksum verification in `deps/workers/test/logseq/agents/runtime_provider_test.cljs`. +3) Failure tests for R2 upload/download and restore fallback behavior. +4) Session flow tests proving new message can resume from `inactive-runtime` with no message loss. +5) Regression tests confirming non-Vercel providers remain unchanged. + +## Exit Criteria +1) Stopped session can be resumed by sending a new message without losing local repo changes. +2) Controlled inactivity paths always attempt checkpoint refresh before runtime termination. +3) Resume restores expected git HEAD for last checkpoint or emits explicit fallback event. +4) Patch artifacts are persisted in R2 with integrity metadata and bounded retention. +5) End-to-end tests demonstrate no-work-loss behavior across stop and resume cycles. diff --git a/deps/workers/src/logseq/agents/do.cljs b/deps/workers/src/logseq/agents/do.cljs index 47521d6b9e..7f9b25a9ed 100644 --- a/deps/workers/src/logseq/agents/do.cljs +++ b/deps/workers/src/logseq/agents/do.cljs @@ -90,6 +90,94 @@ (p/let [_ ( (:snapshot-id result) str string/trim not-empty) + (some-> (:id result) str string/trim not-empty))) + +(defn- runtime-checkpoint-payload + [runtime result reason] + (let [snapshot-id (runtime-snapshot-id result) + provider (some-> (:provider runtime) str string/lower-case) + backup-key (some-> (or (:backup-key result) + (:backup-key runtime)) + str + string/trim + not-empty) + backup-dir (some-> (or (:backup-dir result) + (:dir result) + (:backup-dir runtime)) + str + string/trim + not-empty)] + (when (string? snapshot-id) + (cond-> {:provider provider + :snapshot-id snapshot-id + :checkpoint-at (common/now-ms)} + (string? backup-key) (assoc :backup-key backup-key) + (string? backup-dir) (assoc :backup-dir backup-dir) + (string? reason) (assoc :reason reason))))) + +(defn- (session-task session) :sandbox-checkpoint) + snapshot-id (some-> (:snapshot-id checkpoint) str string/trim not-empty)] + (when (string? snapshot-id) + (cond-> (assoc checkpoint :checkpoint-at (common/now-ms)) + (string? reason) (assoc :reason reason))))) + +(defn- (p/let [_ ( {} + (string? by) (assoc :by by) + (string? reason) (assoc :reason reason)) + :ts (common/now-ms)})] + (if (map? checkpoint) + (p/let [_ ( {:reused true} + (string? by) (assoc :by by) + (string? reason) (assoc :reason reason) + (string? snapshot-id) (assoc :snapshot-id snapshot-id)) + :ts (common/now-ms)})] + true) + (p/let [_ ( {:error "missing existing checkpoint snapshot"} + (string? by) (assoc :by by) + (string? reason) (assoc :reason reason)) + :ts (common/now-ms)})] + false))) + (p/catch (fn [error] + (log/error :agent/checkpoint-existing-snapshot-failed + {:session-id (:id current-session) + :runtime-session-id (get-in current-session [:runtime :session-id]) + :sandbox-id (get-in current-session [:runtime :sandbox-id]) + :error (str error)}) + nil))))) + (defn- stream-url [request session-id] (let [base (or (header request "x-stream-base") (.-origin (platform/request-url request)))] @@ -341,7 +429,9 @@ (let [runtime (:runtime current-session)] (if-not (map? runtime) nil - (p/let [terminated? (-> ( ( ( ( (:session-id runtime) str) + latest-runtime-id (some-> (:session-id latest-runtime) str) + same-runtime? (and (string? failed-runtime-id) + (= failed-runtime-id latest-runtime-id))] + (p/let [_ (when same-runtime? + ( (send-once! current-session) + (p/catch retry-send!) + (p/catch (fn [error] + (log/error :agent/runtime-message-error + {:session-id session-id + :runtime-session-id (:session-id runtime) + :error error}) + ( (:snapshot-id checkpoint) str string/trim not-empty) + backup-key (some-> (:backup-key checkpoint) str string/trim not-empty) + backup-dir (some-> (:backup-dir checkpoint) str string/trim not-empty)] + (when (and (map? checkpoint) (string? snapshot-id)) + (cond-> {:snapshot-id snapshot-id} + (string? provider) (assoc :provider provider) + (string? backup-key) (assoc :backup-key backup-key) + (string? backup-dir) (assoc :backup-dir backup-dir))))) + (defn- prune-cloudflare-backup-cache! [] (let [now (js/Date.now)] @@ -1143,31 +1156,37 @@ :json-body (message-payload message)})) (defn- backup-id str string/trim not-empty) + backup-id (or explicit-backup-id + (:id entry)) + from-cache? (and (nil? explicit-backup-id) + (map? entry)) restore-backup (js-method sandbox "restoreBackup")] (cond - (or (not (string? backup-key)) - (not (string? target-dir)) - (nil? entry)) + (or (not (string? target-dir)) + (not (string? backup-id))) (p/resolved false) (not (fn? restore-backup)) (p/resolved false) :else - (let [backup-id (:id entry) - backup #js {:id backup-id + (let [backup #js {:id backup-id :dir target-dir}] (-> (->promise (.restoreBackup sandbox backup)) (p/then (fn [_] + (when (and (string? backup-key) (string? backup-id)) + (remember-cloudflare-backup! backup-key backup-id)) (log/debug :agent/cloudflare-backup-restored {:backup-key backup-key :backup-id backup-id :dir target-dir}) true)) (p/catch (fn [error] - (forget-cloudflare-backup! backup-key) + (when (and from-cache? (string? backup-key)) + (forget-cloudflare-backup! backup-key)) (log/error :agent/cloudflare-backup-restore-failed {:backup-key backup-key :backup-id backup-id @@ -1542,6 +1561,36 @@ :snapshot-dir nil :restored? false}))))))) +(defn- ( result + (string? backup-key) (assoc :backup-key backup-key) + (string? backup-dir) (assoc :backup-dir backup-dir) + :always (assoc :provider "vercel"))))) ( result + (string? backup-key) (assoc :backup-key backup-key) + (string? backup-dir) (assoc :backup-dir backup-dir) + :always (assoc :provider "cloudflare")))))) ( (.put (.-storage self) + "session" + (clj->js {:id "sess-reprovision" + :status "running" + :task {:project {:repo-url "https://github.com/example/repo"}} + :runtime {:provider "local-dev" + :session-id "runtime-old"} + :audit {} + :created-at 0 + :updated-at 0})) + (.then (fn [_] + (with-redefs [runtime-provider/ (.get (.-storage self) "session") + (.then (fn [session-js] + (let [session (js->clj session-js :keywordize-keys true) + next-session (assoc session + :runtime {:provider "local-dev" + :session-id "runtime-new"})] + (.put (.-storage self) "session" (clj->js next-session))))) + (.then (fn [_] + {:provider "local-dev" + :session-id "runtime-new"}))))] + (agent-do/handle-fetch self + (json-request "http://db-sync.local/__session__/messages" + "POST" + {:message "retry me"} + headers))))) + (.then (fn [resp] + (is (= 200 (.-status resp))) + (js/setTimeout + (fn [] + (is (= 1 @provision-calls)) + (is (= ["runtime-old" "runtime-new"] @send-runtime-ids)) + (done)) + 30))) + (.catch (fn [error] + (is false (str "unexpected error: " error)) + (done)))))))) + (deftest init-does-not-wait-for-open-events-stream-test (testing "session init returns immediately even when runtime events stream stays open" (async done @@ -755,7 +812,13 @@ (is (= "snapshot-created" (:status body))) (is (= "backup-1" (:snapshot-id body))) (is (= 1 (count @snapshot-calls))) - (done))))) + (.then (.get (.-storage self) "session") + (fn [session] + (let [session (js->clj session :keywordize-keys true) + checkpoint (get-in session [:task :sandbox-checkpoint])] + (is (= "backup-1" (:snapshot-id checkpoint))) + (is (= "cloudflare" (:provider checkpoint))) + (done)))))))) (.catch (fn [error] (is false (str "unexpected error: " error)) (done)))))))) @@ -978,6 +1041,7 @@ (testing "session publish endpoint terminates runtime and clears sandbox runtime when PR is created" (async done (let [terminate-calls (atom []) + snapshot-calls (atom []) env #js {"AGENT_RUNTIME_PROVIDER" "local-dev"} self (make-self env) headers {"content-type" "application/json" @@ -986,7 +1050,9 @@ "session" (clj->js {:id "sess-pr-created" :status "running" - :task {:project {:repo-url "https://github.com/example/repo"}} + :task {:project {:repo-url "https://github.com/example/repo"} + :sandbox-checkpoint {:provider "cloudflare" + :snapshot-id "checkpoint-existing-pr"}} :runtime {:provider "cloudflare" :sandbox-id "sbx-pr-created" :session-id "sess-pr-created"} @@ -1011,6 +1077,10 @@ (fn [_env _token _repo-url _opts] (js/Promise.resolve {:url "https://github.com/example/repo/pull/88" :id 88})) + runtime-provider/clj session :keywordize-keys true)] (is (= 1 (count @terminate-calls))) + (is (empty? @snapshot-calls)) (is (nil? (:runtime session))) + (is (= "checkpoint-existing-pr" + (get-in session [:task :sandbox-checkpoint :snapshot-id]))) + (is (number? (get-in session [:task :sandbox-checkpoint :checkpoint-at]))) (done)))))))) (.catch (fn [error] (is false (str "unexpected error: " error)) (done)))))))) +(deftest cancel-endpoint-checkpoints-before-terminate-test + (testing "session cancel checkpoints existing snapshot pointer before terminating runtime" + (async done + (let [terminate-calls (atom []) + snapshot-calls (atom []) + env #js {"AGENT_RUNTIME_PROVIDER" "local-dev"} + self (make-self env) + headers {"content-type" "application/json" + "x-user-id" "user-1"}] + (-> (.put (.-storage self) + "session" + (clj->js {:id "sess-cancel-checkpoint" + :status "running" + :task {:project {:repo-url "https://github.com/example/repo"} + :sandbox-checkpoint {:provider "cloudflare" + :snapshot-id "checkpoint-existing-cancel"}} + :runtime {:provider "cloudflare" + :sandbox-id "sbx-cancel-checkpoint" + :session-id "sess-cancel-checkpoint"} + :audit {} + :created-at 0 + :updated-at 0})) + (.then (fn [_] + (with-redefs [runtime-provider/clj session :keywordize-keys true)] + (is (empty? @snapshot-calls)) + (is (= 1 (count @terminate-calls))) + (is (nil? (:runtime session))) + (is (= "checkpoint-existing-cancel" + (get-in session [:task :sandbox-checkpoint :snapshot-id]))) + (is (number? (get-in session [:task :sandbox-checkpoint :checkpoint-at]))) + (done)))))) + (.catch (fn [error] + (is false (str "unexpected error: " error)) + (done)))))))) + (deftest terminal-endpoint-requires-authenticated-user-test (testing "session terminal endpoint requires x-user-id header" (async done diff --git a/deps/workers/test/logseq/agents/runtime_provider_test.cljs b/deps/workers/test/logseq/agents/runtime_provider_test.cljs index ebfbda9ed5..30737e0708 100644 --- a/deps/workers/test/logseq/agents/runtime_provider_test.cljs +++ b/deps/workers/test/logseq/agents/runtime_provider_test.cljs @@ -1,8 +1,8 @@ (ns logseq.agents.runtime-provider-test (:require [cljs.test :refer [async deftest is testing]] [clojure.string :as string] - [logseq.agents.sandbox :as sandbox] - [logseq.agents.runtime-provider :as runtime-provider])) + [logseq.agents.runtime-provider :as runtime-provider] + [logseq.agents.sandbox :as sandbox])) (defn- fetch-url [request] (cond @@ -224,7 +224,7 @@ (.catch (fn [error] (let [data (ex-data error)] (is (= :unsupported (:reason data))) - (is (= "local-dev" (:provider data)))) + (is (= "local-dev" (:provider data)))) (done))))))) (deftest vercel-provider-snapshot-restore-flow-test @@ -292,7 +292,49 @@ (done)))))) (.catch (fn [error] (is false (str "unexpected first provision error: " error)) - (done))))))))) + (done)))))))) + +(deftest vercel-provider-restores-from-task-checkpoint-test + (async done + (runtime-provider/clear-vercel-snapshot-cache!) + (let [calls (atom {:clone 0 + :sources []}) + env #js {"VERCEL_TEAM_ID" "team-1" + "VERCEL_PROJECT_ID" "project-1" + "VERCEL_TOKEN" "token-vercel"} + provider (runtime-provider/create-provider env "vercel") + task {:agent {:provider "codex"} + :project {:repo-url "https://github.com/example/repo" + :base-branch "main"} + :sandbox-checkpoint {:provider "vercel" + :snapshot-id "persisted-snap-7" + :backup-dir "/vercel/sandbox/repo"}}] + (with-redefs [runtime-provider/ (runtime-provider/clj backup :keywordize-keys true)) + (js/Promise.resolve #js {:success true})) + :delete (fn [] (js/Promise.resolve nil))}) + sandbox-ns + #js {:idFromName (fn [name] name) + :get (fn [id] + (or (get @sandboxes id) + (let [sandbox (make-sandbox id)] + (swap! sandboxes assoc id sandbox) + sandbox)))} + env #js {"Sandbox" sandbox-ns + "CLOUDFLARE_SANDBOX_AGENT_PORT" "8000"} + provider (runtime-provider/create-provider env "cloudflare") + task {:agent {:provider "codex"} + :project {:repo-url "https://github.com/example/repo" + :base-branch "main"} + :sandbox-checkpoint {:provider "cloudflare" + :snapshot-id "backup-from-task-1"}}] + (-> (runtime-provider/