agents use d1 to persist snapshots metadata

This commit is contained in:
Tienson Qin
2026-03-02 16:46:05 +08:00
parent bab62a5e46
commit 2013d38f5e
8 changed files with 454 additions and 190 deletions

View File

@@ -24,3 +24,4 @@ Milestones are tracked as separate files in this folder:
- `18-m18-github-app-installation-tokens.md`
- `19-m19-vercel-sandbox-runtime-and-snapshot.md`
- `20-m20-resume-session-with-persisted-workspace.md`
- `21-m21-store-snapshots-metadata-in-d1.md`

View File

@@ -0,0 +1,57 @@
# M21: Store Snapshots Metadata in D1
Status: Implemented
Target: Use D1 as the durable source of truth for sandbox checkpoint metadata across sessions.
## Goal
Persist checkpoint metadata by `repo+branch` in D1 so new sessions can restore from existing snapshots without relying on per-isolate in-memory cache or per-session DO storage fallback.
## Why M21
- In-memory snapshot caches are not reliable across isolate restarts and deployments.
- DO storage fallback is session-local and cannot reliably resume across independent tasks/sessions.
- D1 gives a shared, durable lookup path for checkpoint metadata.
## Scope
1) Add `AGENTS_DB` D1 binding for agents worker environments.
2) Add checkpoint metadata store in agents worker:
- key: `repo_key + branch`
- value: `provider`, `snapshot_id`, `backup_key`, `backup_dir`, `checkpoint_at`
- retention: 30-day TTL with opportunistic cleanup.
3) Make runtime provisioning use D1 checkpoint lookup for restore metadata.
4) Upsert D1 metadata when checkpoint/snapshot success persists session checkpoint.
5) Remove provider-side in-memory restore cache fallback.
## Out of Scope
- Persisting snapshot payloads in D1.
- UI changes for checkpoint browsing.
- Backfilling historical checkpoint records.
## Data Model
Table: `sandbox_checkpoints`
- `repo_key TEXT NOT NULL`
- `branch TEXT NOT NULL`
- `provider TEXT NOT NULL`
- `snapshot_id TEXT NOT NULL`
- `backup_key TEXT`
- `backup_dir TEXT`
- `checkpoint_at INTEGER NOT NULL`
- `updated_at INTEGER NOT NULL`
- `expires_at INTEGER NOT NULL`
- `PRIMARY KEY (repo_key, branch)`
- index: `idx_sandbox_checkpoints_expires_at(expires_at)`
## Implementation Notes
- Added `logseq.agents.checkpoint-store` for D1 load/upsert and schema ensure.
- `do.cljs` now:
- loads checkpoint from D1 during `<provision-runtime!` using task repo+branch,
- no longer uses DO storage checkpoint fallback for runtime provisioning,
- upserts D1 metadata when persisting session checkpoint.
- `runtime_provider.cljs` now:
- restores only from explicit task checkpoint metadata,
- no longer restores from in-memory snapshot/backup caches.
## Validation
- Agents worker build compiles cleanly.
- Worker split test suite passes.
- Main CLJS test compile passes (existing unrelated warnings remain).
- Targeted runtime execution of frontend node test harness is currently blocked by missing local file `static/tests-with-dom-shim.js` in this checkout.

View File

@@ -0,0 +1,168 @@
(ns logseq.agents.checkpoint-store
(:require [clojure.string :as string]
[logseq.agents.source-control :as source-control]
[logseq.sync.common :as common]
[promesa.core :as p]))
(def ^:private checkpoint-ttl-ms (* 30 24 60 60 1000))
(def ^:private cleanup-sample-rate 0.05)
(def ^:private cleanup-limit 200)
(defn- non-empty-str
[value]
(when (string? value)
(let [trimmed (string/trim value)]
(when-not (string/blank? trimmed)
trimmed))))
(defn- normalize-provider
[provider]
(some-> provider non-empty-str string/lower-case))
(defn- task-repo-url
[task]
(some-> (get-in task [:project :repo-url]) non-empty-str))
(defn- task-branch
[task]
(or (some-> (get-in task [:project :base-branch]) source-control/sanitize-branch-name)
(some-> (get-in task [:project :branch]) source-control/sanitize-branch-name)
"main"))
(defn- task-repo-key
[task]
(when-let [repo-url (task-repo-url task)]
(let [{:keys [provider owner name]} (source-control/repo-ref repo-url)]
(if (and (string? provider) (string? owner) (string? name))
(str provider "/" (string/lower-case owner) "/" (string/lower-case name))
(string/lower-case repo-url)))))
(defn- task-key
[task]
(let [repo-key (task-repo-key task)
branch (some-> (task-branch task) string/lower-case)]
(when (and (string? repo-key) (string? branch))
{:repo-key repo-key
:branch branch})))
(defn- normalize-checkpoint
[checkpoint]
(let [snapshot-id (some-> (:snapshot-id checkpoint) non-empty-str)
provider (some-> (:provider checkpoint) normalize-provider)
backup-key (some-> (:backup-key checkpoint) non-empty-str)
backup-dir (some-> (:backup-dir checkpoint) non-empty-str)
checkpoint-at (:checkpoint-at checkpoint)]
(when (string? snapshot-id)
(cond-> {:snapshot-id snapshot-id}
(string? provider) (assoc :provider provider)
(string? backup-key) (assoc :backup-key backup-key)
(string? backup-dir) (assoc :backup-dir backup-dir)
(number? checkpoint-at) (assoc :checkpoint-at checkpoint-at)))))
(defn- checkpoint-from-row
[^js row]
(when row
(normalize-checkpoint
{:provider (aget row "provider")
:snapshot-id (aget row "snapshot_id")
:backup-key (aget row "backup_key")
:backup-dir (aget row "backup_dir")
:checkpoint-at (aget row "checkpoint_at")})))
(defn- maybe-cleanup?
[]
(< (js/Math.random) cleanup-sample-rate))
(defn- db-binding
[^js env]
(aget env "AGENTS_DB"))
(defn- <ensure-schema!
[^js db]
(p/do!
(common/<d1-run db
(str "create table if not exists sandbox_checkpoints ("
"repo_key TEXT NOT NULL,"
"branch TEXT NOT NULL,"
"provider TEXT NOT NULL,"
"snapshot_id TEXT NOT NULL,"
"backup_key TEXT,"
"backup_dir TEXT,"
"checkpoint_at INTEGER NOT NULL,"
"updated_at INTEGER NOT NULL,"
"expires_at INTEGER NOT NULL,"
"primary key (repo_key, branch)"
");"))
(common/<d1-run db
"create index if not exists idx_sandbox_checkpoints_expires_at on sandbox_checkpoints(expires_at);")))
(defn- <cleanup-expired!
[^js db now-ms]
(common/<d1-run db
(str "delete from sandbox_checkpoints "
"where rowid in ("
"select rowid from sandbox_checkpoints where expires_at <= ? limit ?"
")")
now-ms
cleanup-limit))
(defn <load-checkpoint-for-task!
[^js env task]
(if-let [db (db-binding env)]
(if-let [{:keys [repo-key branch]} (task-key task)]
(p/let [_ (<ensure-schema! db)
now-ms (common/now-ms)
_ (when (maybe-cleanup?)
(<cleanup-expired! db now-ms))
result (common/<d1-all db
(str "select provider, snapshot_id, backup_key, backup_dir, checkpoint_at "
"from sandbox_checkpoints "
"where repo_key = ? and branch = ? and expires_at > ? "
"limit 1")
repo-key
branch
now-ms)
rows (common/get-sql-rows result)]
(checkpoint-from-row (first rows)))
(p/resolved nil))
(p/resolved nil)))
(defn <upsert-checkpoint-for-task!
[^js env task checkpoint]
(if-let [db (db-binding env)]
(if-let [{:keys [repo-key branch]} (task-key task)]
(if-let [checkpoint (normalize-checkpoint checkpoint)]
(let [checkpoint-at (or (:checkpoint-at checkpoint) (common/now-ms))
updated-at (common/now-ms)
expires-at (+ checkpoint-at checkpoint-ttl-ms)
provider (or (:provider checkpoint) "unknown")
backup-key (:backup-key checkpoint)
backup-dir (:backup-dir checkpoint)]
(p/let [_ (<ensure-schema! db)
_ (common/<d1-run db
(str "insert into sandbox_checkpoints "
"(repo_key, branch, provider, snapshot_id, backup_key, backup_dir, checkpoint_at, updated_at, expires_at) "
"values (?, ?, ?, ?, ?, ?, ?, ?, ?) "
"on conflict(repo_key, branch) do update set "
"provider = excluded.provider, "
"snapshot_id = excluded.snapshot_id, "
"backup_key = excluded.backup_key, "
"backup_dir = excluded.backup_dir, "
"checkpoint_at = excluded.checkpoint_at, "
"updated_at = excluded.updated_at, "
"expires_at = excluded.expires_at")
repo-key
branch
provider
(:snapshot-id checkpoint)
backup-key
backup-dir
checkpoint-at
updated-at
expires-at)
_ (when (maybe-cleanup?)
(<cleanup-expired! db updated-at))]
checkpoint))
(p/resolved nil))
(p/resolved nil))
(p/resolved nil)))

View File

@@ -1,6 +1,7 @@
(ns logseq.agents.do
(:require [clojure.string :as string]
[lambdaisland.glogi :as log]
[logseq.agents.checkpoint-store :as checkpoint-store]
[logseq.agents.runtime-provider :as runtime-provider]
[logseq.agents.session :as session]
[logseq.agents.source-control :as source-control]
@@ -125,8 +126,6 @@
(string? backup-dir) (assoc :backup-dir backup-dir)
(string? reason) (assoc :reason reason)))))
(def ^:private checkpoint-storage-key "sandbox.checkpoint")
(defn- checkpoint-payload-with-reason
[checkpoint reason]
(let [snapshot-id (runtime-snapshot-id checkpoint)]
@@ -148,19 +147,6 @@
(string? backup-key) (assoc :backup-key backup-key)
(string? backup-dir) (assoc :backup-dir backup-dir))))
(defn- <stored-checkpoint-payload
[^js self reason]
(p/let [checkpoint (<storage-get (.-storage self) checkpoint-storage-key)]
(checkpoint-payload-with-reason checkpoint reason)))
(defn- <persist-stored-checkpoint!
[^js self checkpoint]
(if-let [checkpoint (checkpoint-payload-with-reason checkpoint nil)]
(do
(prn :debug :persist :checkpoint-storage-key checkpoint)
(<storage-put! (.-storage self) checkpoint-storage-key checkpoint))
(p/resolved nil)))
(defn- <persist-session-checkpoint!
[^js self expected-session-id checkpoint]
(if-not (and (string? expected-session-id) (map? checkpoint))
@@ -171,7 +157,15 @@
(let [task (session-task latest-session)
task (assoc task :sandbox-checkpoint checkpoint)]
(p/let [_ (<save-session! self (assoc latest-session :task task))
_ (<persist-stored-checkpoint! self checkpoint)]
_ (-> (checkpoint-store/<upsert-checkpoint-for-task! (.-env self)
task
checkpoint)
(p/catch (fn [error]
(log/error :agent/checkpoint-d1-upsert-failed
{:session-id expected-session-id
:task-id (:id task)
:error (str error)})
nil)))]
nil))
nil))))
@@ -187,9 +181,16 @@
(defn- <checkpoint-existing-snapshot!
[^js self current-session {:keys [by reason]}]
(-> (p/let [stored-checkpoint (<stored-checkpoint-payload self reason)
(-> (p/let [d1-checkpoint (-> (checkpoint-store/<load-checkpoint-for-task! (.-env self)
(session-task current-session))
(p/catch (fn [error]
(log/error :agent/checkpoint-d1-load-failed
{:session-id (:id current-session)
:task-id (get-in current-session [:task :id])
:error (str error)})
nil)))
checkpoint (or (existing-checkpoint-payload current-session reason)
stored-checkpoint)
(checkpoint-payload-with-reason d1-checkpoint reason))
_ (<append-event! self {:type "sandbox.checkpoint.started"
:data (cond-> {}
(string? by) (assoc :by by)
@@ -746,24 +747,19 @@
(defn- <provision-runtime! [^js self task session-id]
(let [provider (runtime-provider/resolve-provider (.-env self) nil)
provider-kind (runtime-provider/provider-id provider)]
(p/let [stored-checkpoint (<stored-checkpoint-payload self nil)
_ (prn :debug :stored-checkpoint stored-checkpoint)
task-checkpoint (when (map? task) (:sandbox-checkpoint task))
task-has-snapshot? (string? (runtime-snapshot-id task-checkpoint))
_ (when task-has-snapshot?
(<persist-stored-checkpoint! self task-checkpoint))
task (cond
(not (map? task))
task
task-has-snapshot?
task
(map? stored-checkpoint)
(assoc task :sandbox-checkpoint stored-checkpoint)
:else
task)
(p/let [base-task (if (map? task) (dissoc task :sandbox-checkpoint) task)
d1-checkpoint (if (map? base-task)
(-> (checkpoint-store/<load-checkpoint-for-task! (.-env self) base-task)
(p/catch (fn [error]
(log/error :agent/provision-checkpoint-d1-load-failed
{:session-id session-id
:task-id (:id base-task)
:error (str error)})
nil)))
nil)
task (if (and (map? base-task) (map? d1-checkpoint))
(assoc base-task :sandbox-checkpoint d1-checkpoint)
base-task)
runtime (runtime-provider/<provision-runtime! provider session-id task)
session (<get-session self)]
(cond

View File

@@ -124,16 +124,14 @@
(def ^:private vercel-repo-base-dir "/vercel/sandbox")
(def ^:private cloudflare-local-host "http://localhost")
(def ^:private cloudflare-snapshot-ttl-seconds (* 30 24 60 60))
(defonce ^:private cloudflare-backup-cache (atom {}))
(defonce ^:private vercel-snapshot-cache (atom {}))
(defn clear-cloudflare-backup-cache!
[]
(reset! cloudflare-backup-cache {}))
nil)
(defn clear-vercel-snapshot-cache!
[]
(reset! vercel-snapshot-cache {}))
nil)
(defn- cloudflare-agent-port [^js env]
(parse-int (env-str env "CLOUDFLARE_SANDBOX_AGENT_PORT") 2468))
@@ -517,83 +515,6 @@
(string? backup-key) (assoc :backup-key backup-key)
(string? backup-dir) (assoc :backup-dir backup-dir)))))
(defn- prune-cloudflare-backup-cache!
[]
(let [now (js/Date.now)]
(swap! cloudflare-backup-cache
(fn [entries]
(reduce-kv (fn [acc k v]
(let [expires-at-ms (:expires-at-ms v)
backup-id (:id v)]
(if (and (string? k)
(string? backup-id)
(number? expires-at-ms)
(> expires-at-ms now))
(assoc acc k v)
acc)))
{}
entries)))))
(defn- cloudflare-backup-entry
[backup-key]
(prune-cloudflare-backup-cache!)
(when (string? backup-key)
(get @cloudflare-backup-cache backup-key)))
(defn- remember-cloudflare-backup!
[backup-key backup-id]
(when (and (string? backup-key) (string? backup-id))
(let [now (js/Date.now)
ttl-ms (* cloudflare-snapshot-ttl-seconds 1000)]
(swap! cloudflare-backup-cache assoc backup-key {:id backup-id
:ttl-seconds cloudflare-snapshot-ttl-seconds
:expires-at-ms (+ now ttl-ms)
:updated-at-ms now}))))
(defn- forget-cloudflare-backup!
[backup-key]
(when (string? backup-key)
(swap! cloudflare-backup-cache dissoc backup-key)))
(defn- prune-vercel-snapshot-cache!
[]
(let [now (js/Date.now)]
(swap! vercel-snapshot-cache
(fn [entries]
(reduce-kv (fn [acc k v]
(let [expires-at-ms (:expires-at-ms v)
snapshot-id (:id v)]
(if (and (string? k)
(string? snapshot-id)
(number? expires-at-ms)
(> expires-at-ms now))
(assoc acc k v)
acc)))
{}
entries)))))
(defn- vercel-snapshot-entry
[backup-key]
(prune-vercel-snapshot-cache!)
(when (string? backup-key)
(get @vercel-snapshot-cache backup-key)))
(defn- remember-vercel-snapshot!
[backup-key snapshot-id source-dir]
(when (and (string? backup-key) (string? snapshot-id))
(let [now (js/Date.now)
ttl-ms (* cloudflare-snapshot-ttl-seconds 1000)]
(swap! vercel-snapshot-cache assoc backup-key {:id snapshot-id
:dir source-dir
:ttl-seconds cloudflare-snapshot-ttl-seconds
:expires-at-ms (+ now ttl-ms)
:updated-at-ms now}))))
(defn- forget-vercel-snapshot!
[backup-key]
(when (string? backup-key)
(swap! vercel-snapshot-cache dissoc backup-key)))
(defn- sanitize-backup-name
[value]
(let [raw (or (some-> value str string/lower-case) "snapshot")
@@ -1157,13 +1078,8 @@
:json-body (message-payload message)}))
(defn- <cloudflare-restore-backup!
[^js sandbox backup-key target-dir {:keys [backup-id]}]
(let [entry (cloudflare-backup-entry backup-key)
explicit-backup-id (some-> backup-id str string/trim not-empty)
backup-id (or explicit-backup-id
(:id entry))
from-cache? (and (nil? explicit-backup-id)
(map? entry))
[^js sandbox _backup-key target-dir {:keys [backup-id]}]
(let [backup-id (some-> backup-id str string/trim not-empty)
restore-backup (js-method sandbox "restoreBackup")]
(cond
(or (not (string? target-dir))
@@ -1178,19 +1094,13 @@
:dir target-dir}]
(-> (->promise (.restoreBackup sandbox backup))
(p/then (fn [_]
(when (and (string? backup-key) (string? backup-id))
(remember-cloudflare-backup! backup-key backup-id))
(log/debug :agent/cloudflare-backup-restored
{:backup-key backup-key
:backup-id backup-id
{:backup-id backup-id
:dir target-dir})
true))
(p/catch (fn [error]
(when (and from-cache? (string? backup-key))
(forget-cloudflare-backup! backup-key))
(log/error :agent/cloudflare-backup-restore-failed
{:backup-key backup-key
:backup-id backup-id
{:backup-id backup-id
:dir target-dir
:error (str error)})
false)))))))
@@ -1532,38 +1442,6 @@
_ (<vercel-health! env sandbox port agent-token)]
true))
(defn- <vercel-create-sandbox-from-cache!
[^js env backup-key]
(let [entry (vercel-snapshot-entry backup-key)
snapshot-id (:id entry)
snapshot-dir (:dir entry)]
(if-not (string? snapshot-id)
(p/let [sandbox (<vercel-create-sandbox! env nil)]
{:sandbox sandbox
:snapshot-dir nil
:restored? false})
(-> (<vercel-create-sandbox! env {:type "snapshot"
:snapshotId snapshot-id})
(p/then (fn [sandbox]
(log/debug :agent/vercel-snapshot-restored
{:backup-key backup-key
:snapshot-id snapshot-id})
{:sandbox sandbox
:snapshot-id snapshot-id
:snapshot-dir snapshot-dir
:restored? true}))
(p/catch (fn [error]
(forget-vercel-snapshot! backup-key)
(log/error :agent/vercel-snapshot-restore-failed
{:backup-key backup-key
:snapshot-id snapshot-id
:error (str error)})
(p/let [sandbox (<vercel-create-sandbox! env nil)]
{:sandbox sandbox
:snapshot-id nil
:snapshot-dir nil
:restored? false})))))))
(defn- <vercel-create-sandbox-from-checkpoint!
[^js env checkpoint]
(let [snapshot-id (:snapshot-id checkpoint)
@@ -1588,11 +1466,14 @@
nil))))))
(defn- <vercel-create-sandbox-for-restore!
[^js env backup-key checkpoint]
[^js env checkpoint]
(p/let [checkpoint-restore (<vercel-create-sandbox-from-checkpoint! env checkpoint)]
(if (map? checkpoint-restore)
checkpoint-restore
(<vercel-create-sandbox-from-cache! env backup-key))))
(p/let [sandbox (<vercel-create-sandbox! env nil)]
{:sandbox sandbox
:snapshot-dir nil
:restored? false}))))
(defn- <vercel-runtime-base-url!
[^js env runtime]
@@ -1874,15 +1755,8 @@
backup-key (or (:backup-key checkpoint)
(repo-backup-key task))]
(p/let [{:keys [sandbox snapshot-id snapshot-dir restored?]} (<vercel-create-sandbox-for-restore! env
backup-key
checkpoint)
_ (<vercel-ensure-running! env sandbox session-id task port agent-token)
_ (when (and restored?
(string? backup-key)
(string? snapshot-id))
(remember-vercel-snapshot! backup-key
snapshot-id
(or snapshot-dir repo-dir)))
_ (when restored?
(<vercel-restore-repo-dir! sandbox snapshot-dir repo-dir))
_ (when-not restored?
@@ -1939,8 +1813,6 @@
(p/let [sandbox (<vercel-get-sandbox! env sandbox-id)
result (<vercel-create-snapshot! sandbox backup-dir snapshot-name)
snapshot-id (:snapshot-id result)]
(when (and (string? backup-key) (string? snapshot-id))
(remember-vercel-snapshot! backup-key snapshot-id backup-dir))
(cond-> result
(string? backup-key) (assoc :backup-key backup-key)
(string? backup-dir) (assoc :backup-dir backup-dir)
@@ -2105,9 +1977,7 @@
:runtime runtime})))
(let [sandbox (cloudflare-sandbox env sandbox-id)]
(p/let [result (<cloudflare-create-backup! sandbox backup-dir backup-name)
snapshot-id (:snapshot-id result)]
(when (and (string? backup-key) (string? snapshot-id))
(remember-cloudflare-backup! backup-key snapshot-id))
_snapshot-id (:snapshot-id result)]
(cond-> result
(string? backup-key) (assoc :backup-key backup-key)
(string? backup-dir) (assoc :backup-dir backup-dir)

View File

@@ -3,7 +3,8 @@
[clojure.string :as string]
[logseq.agents.do :as agent-do]
[logseq.agents.runtime-provider :as runtime-provider]
[logseq.agents.source-control :as source-control]))
[logseq.agents.source-control :as source-control]
[logseq.sync.common :as common]))
(defn- make-agent-storage []
(let [data (js/Map.)]
@@ -285,6 +286,129 @@
(is false (str "unexpected provision checkpoint error: " error))
(done))))))))
(deftest provision-runtime-loads-checkpoint-from-d1-test
(testing "provision runtime should load sandbox checkpoint from D1 metadata by repo+branch"
(async done
(let [env #js {"AGENT_RUNTIME_PROVIDER" "vercel"
"AGENTS_DB" #js {}}
self (make-self env)
task {:id "sess-d1-checkpoint"
:agent "codex"
:project {:repo-url "https://github.com/logseq/logseq"
:base-branch "main"}}
passed-task (atom nil)
runtime {:provider "vercel"
:session-id "runtime-d1"
:sandbox-id "sbx-d1"}
provider (reify runtime-provider/RuntimeProvider
(<provision-runtime! [_ _session-id task]
(reset! passed-task task)
(js/Promise.resolve runtime))
(<open-events-stream! [_ _runtime]
(js/Promise.resolve nil))
(<send-message! [_ _runtime _message]
(js/Promise.resolve true))
(<open-terminal! [_ _runtime _request _opts]
(js/Promise.resolve nil))
(<snapshot-runtime! [_ _runtime _opts]
(js/Promise.resolve nil))
(<push-branch! [_ _runtime _opts]
(js/Promise.resolve nil))
(<terminate-runtime! [_ _runtime]
(js/Promise.resolve nil)))
row #js {"provider" "vercel"
"snapshot_id" "snapshot-from-d1"
"backup_key" "github/logseq/logseq#main"
"backup_dir" "/vercel/sandbox/logseq"
"checkpoint_at" 1000}]
(-> (.put (.-storage self)
"session"
(clj->js {:id "sess-d1-checkpoint"
:status "running"
:task task
:audit {}
:created-at 0
:updated-at 0}))
(.then (fn [_]
(with-redefs [runtime-provider/resolve-provider (fn [_env _runtime] provider)
runtime-provider/provider-id (fn [_provider] "vercel")
agent-do/start-runtime-events-stream-background! (fn [& _] nil)
common/<d1-all (fn [_db _sql & _args]
(js/Promise.resolve #js {:results #js [row]}))
common/get-sql-rows (fn [result]
(aget result "results"))]
(#'agent-do/<provision-runtime! self task "sess-d1-checkpoint"))))
(.then (fn [_]
(is (= "snapshot-from-d1"
(get-in @passed-task [:sandbox-checkpoint :snapshot-id])))
(is (= "vercel"
(get-in @passed-task [:sandbox-checkpoint :provider])))
(is (= "github/logseq/logseq#main"
(get-in @passed-task [:sandbox-checkpoint :backup-key])))
(done)))
(.catch (fn [error]
(is false (str "unexpected d1 checkpoint provision error: " error))
(done))))))))
(deftest provision-runtime-does-not-use-storage-checkpoint-fallback-test
(testing "provision runtime should not use Durable Object storage checkpoint fallback"
(async done
(let [env #js {"AGENT_RUNTIME_PROVIDER" "vercel"
"AGENTS_DB" #js {}}
self (make-self env)
task {:id "sess-no-storage-fallback"
:agent "codex"
:project {:repo-url "https://github.com/logseq/logseq"
:base-branch "main"}}
passed-task (atom nil)
runtime {:provider "vercel"
:session-id "runtime-no-storage-fallback"
:sandbox-id "sbx-no-storage-fallback"}
provider (reify runtime-provider/RuntimeProvider
(<provision-runtime! [_ _session-id task]
(reset! passed-task task)
(js/Promise.resolve runtime))
(<open-events-stream! [_ _runtime]
(js/Promise.resolve nil))
(<send-message! [_ _runtime _message]
(js/Promise.resolve true))
(<open-terminal! [_ _runtime _request _opts]
(js/Promise.resolve nil))
(<snapshot-runtime! [_ _runtime _opts]
(js/Promise.resolve nil))
(<push-branch! [_ _runtime _opts]
(js/Promise.resolve nil))
(<terminate-runtime! [_ _runtime]
(js/Promise.resolve nil)))]
(-> (.put (.-storage self)
"session"
(clj->js {:id "sess-no-storage-fallback"
:status "running"
:task task
:audit {}
:created-at 0
:updated-at 0}))
(.then (fn [_]
(.put (.-storage self)
"sandbox.checkpoint"
(clj->js {:provider "vercel"
:snapshot-id "from-storage-should-not-be-used"}))))
(.then (fn [_]
(with-redefs [runtime-provider/resolve-provider (fn [_env _runtime] provider)
runtime-provider/provider-id (fn [_provider] "vercel")
agent-do/start-runtime-events-stream-background! (fn [& _] nil)
common/<d1-all (fn [_db _sql & _args]
(js/Promise.resolve #js {:results #js []}))
common/get-sql-rows (fn [result]
(aget result "results"))]
(#'agent-do/<provision-runtime! self task "sess-no-storage-fallback"))))
(.then (fn [_]
(is (nil? (get-in @passed-task [:sandbox-checkpoint :snapshot-id])))
(done)))
(.catch (fn [error]
(is false (str "unexpected no-storage-fallback error: " error))
(done))))))))
(deftest checkpoint-existing-snapshot-falls-back-to-runtime-snapshot-test
(testing "checkpoint refresh reuses runtime snapshot metadata when task checkpoint is missing"
(async done
@@ -325,6 +449,43 @@
(is false (str "unexpected runtime-checkpoint fallback error: " error))
(done))))))))
(deftest checkpoint-existing-snapshot-upserts-d1-metadata-test
(testing "checkpoint refresh should upsert checkpoint metadata into D1"
(async done
(let [env #js {"AGENT_RUNTIME_PROVIDER" "vercel"
"AGENTS_DB" #js {}}
self (make-self env)
session {:id "sess-checkpoint-d1-upsert"
:status "running"
:task {:project {:repo-url "https://github.com/logseq/logseq"
:base-branch "main"}}
:runtime {:provider "vercel"
:session-id "sess-checkpoint-d1-upsert"
:sandbox-id "sbx-checkpoint-d1-upsert"
:snapshot-id "runtime-snapshot-d1-upsert"
:backup-key "github/logseq/logseq#main"
:backup-dir "/vercel/sandbox/logseq"}
:audit {}
:created-at 0
:updated-at 0}
d1-runs (atom 0)]
(-> (.put (.-storage self) "session" (clj->js session))
(.then (fn [_]
(with-redefs [common/<d1-run (fn [_db _sql & _args]
(swap! d1-runs inc)
(js/Promise.resolve {:ok true}))]
(#'agent-do/<checkpoint-existing-snapshot! self
session
{:by "system"
:reason "session-completed"}))))
(.then (fn [ok?]
(is (true? ok?))
(is (= 1 @d1-runs))
(done)))
(.catch (fn [error]
(is false (str "unexpected checkpoint d1 upsert error: " error))
(done))))))))
(deftest runtime-session-completed-checkpoints-existing-and-terminates-test
(testing "session.completed runtime event checkpoints existing pointer and terminates runtime without snapshot creation"
(async done

View File

@@ -227,7 +227,7 @@
(is (= "local-dev" (:provider data))))
(done)))))))
(deftest vercel-provider-snapshot-restore-flow-test
(deftest vercel-provider-does-not-restore-from-in-memory-cache-test
(async done
(runtime-provider/clear-vercel-snapshot-cache!)
(let [calls (atom {:clone 0
@@ -280,10 +280,10 @@
(is (= 1 (:snapshots @calls)))
(-> (runtime-provider/<provision-runtime! provider "sess-vercel-2" task)
(.then (fn [runtime-2]
(is (= 1 (:clone @calls)))
(is (= 1 (:restores @calls)))
(is (= 2 (:clone @calls)))
(is (= 0 (:restores @calls)))
(is (= 2 (:sessions @calls)))
(is (= "vercel-snap-1" (:snapshot-id runtime-2)))
(is (nil? (:snapshot-id runtime-2)))
(done)))
(.catch (fn [error]
(is false (str "unexpected second provision error: " error))
@@ -1138,7 +1138,7 @@
(is false (str "unexpected provision error: " error))
(done)))))))
(deftest cloudflare-provider-restores-from-cached-snapshot-test
(deftest cloudflare-provider-does-not-restore-from-in-memory-cache-test
(async done
(runtime-provider/clear-cloudflare-backup-cache!)
(let [calls (atom {:clone 0
@@ -1199,12 +1199,8 @@
(is (= 1 (:backup @calls)))
(-> (runtime-provider/<provision-runtime! provider "sess-cache-next" task)
(.then (fn [_next-runtime]
(is (= 0 (:clone @calls)))
(is (= 1 (count (:restore @calls))))
(is (= "backup-restore-1"
(get-in @calls [:restore 0 :id])))
(is (= "/workspace/sess-cache-next"
(get-in @calls [:restore 0 :dir])))
(is (= 1 (:clone @calls)))
(is (= 0 (count (:restore @calls))))
(done)))
(.catch (fn [error]
(is false (str "unexpected reprovision error: " error))

View File

@@ -32,6 +32,11 @@ bucket_name = "logseq-sync-assets-prod"
preview_bucket_name = "logseq-sync-assets-prod"
remote = true
[[d1_databases]]
binding = "AGENTS_DB"
database_name = "logseq-agents-staging"
database_id = "00325aa2-c805-4693-b599-900a25dcde42"
[[migrations]]
tag = "v1"
new_sqlite_classes = [ "AgentSessionDO" ]
@@ -82,6 +87,11 @@ bucket_name = "logseq-sync-assets-dev"
preview_bucket_name = "logseq-sync-assets-dev"
remote = true
[[env.staging.d1_databases]]
binding = "AGENTS_DB"
database_name = "logseq-agents-staging"
database_id = "00325aa2-c805-4693-b599-900a25dcde42"
[[env.staging.migrations]]
tag = "v1"
new_sqlite_classes = [ "AgentSessionDO" ]
@@ -127,6 +137,11 @@ bucket_name = "logseq-sync-assets-prod"
preview_bucket_name = "logseq-sync-assets-prod"
remote = true
[[env.prod.d1_databases]]
binding = "AGENTS_DB"
database_name = "logseq-agents-prod"
database_id = "4c80e058-69b5-4985-88d1-f53711d817ba"
[[env.prod.migrations]]
tag = "v1"
new_sqlite_classes = [ "AgentSessionDO" ]