This commit is contained in:
Tienson Qin
2026-02-02 06:12:28 +08:00
parent bba3014243
commit 295823315b
20 changed files with 620 additions and 74 deletions

View File

@@ -65,6 +65,27 @@ cd deps/db-sync
npm run test:node-adapter
```
### Local Sandbox Agent (for agent sessions)
Use the local sandbox-agent repo at `~/Codes/projects/sandbox-agent`:
```bash
cd deps/db-sync
./scripts/start-local-sandbox-agent.sh
```
Then run db-sync worker with:
```bash
SANDBOX_AGENT_URL=http://127.0.0.1:2468
```
If sandbox-agent runs with token auth, also set:
```bash
SANDBOX_AGENT_TOKEN=...
```
## Environment Variables
| Variable | Purpose |
@@ -82,6 +103,8 @@ npm run test:node-adapter
| COGNITO_ISSUER | Cognito issuer URL |
| COGNITO_CLIENT_ID | Cognito client id |
| COGNITO_JWKS_URL | Cognito JWKS URL |
| SANDBOX_AGENT_URL | sandbox-agent base URL for agent sessions |
| SANDBOX_AGENT_TOKEN | Optional bearer token for sandbox-agent |
## Notes
- Protocol definitions live in `docs/agent-guide/db-sync/protocol.md`.

View File

@@ -69,25 +69,8 @@ centralize session persistence, and keep execution isolated from production.
- Add observability (logs, metrics, session replay) and a permission model.
## Milestones
1) Architecture + API shape (M1)
- Confirm document-driven workflow entrypoints and task schema.
- Define session lifecycle, event model, and required auth surfaces.
- Draft API contracts for control plane and agent runtime.
2) Control plane prototype (M2)
- Implement Durable Object session coordination.
- Add streaming and multi-client observation.
- Wire basic auth and session persistence.
3) Sandbox + agent integration (M3)
- Provision sandbox per session and run Sandbox Agent inside it.
- Implement adapter to select Codex/Claude Code backends.
- Run end-to-end task execution from Logseq doc to agent output.
4) Operational readiness (M4)
- Add logs, metrics, and session replay.
- Add permission model and audit trails.
- Document setup and rollout for internal dogfooding.
Milestones moved to:
- `docs/milestones/agents/00-index.md`
## References
- https://github.com/rivet-dev/sandbox-agent

View File

@@ -0,0 +1,10 @@
# Agent Service Milestones
Date: 2026-02-01
Status: Active
Milestones are tracked as separate files in this folder:
- `01-m1-architecture-api-shape.md`
- `02-m2-control-plane-prototype.md`
- `03-m3-sandbox-agent-integration.md`
- `04-m4-operational-readiness.md`

View File

@@ -0,0 +1,5 @@
# M1: Architecture + API Shape
- Confirm document-driven workflow entrypoints and task schema.
- Define session lifecycle, event model, and required auth surfaces.
- Draft API contracts for control plane and agent runtime.

View File

@@ -0,0 +1,5 @@
# M2: Control Plane Prototype
- Implement Durable Object session coordination.
- Add streaming and multi-client observation.
- Wire basic auth and session persistence.

View File

@@ -0,0 +1,8 @@
# M3: Sandbox + Agent Integration
- Use local sandbox-agent at `~/Codes/projects/sandbox-agent` as the runtime.
- Provision sessions through sandbox-agent HTTP API (`/v1/sessions/{session_id}`).
- Send task messages through `/v1/sessions/{session_id}/messages`.
- Track sandbox session metadata in DO runtime state.
- Implement adapter to select Codex/Claude Code backends.
- Run end-to-end task execution from Logseq doc to agent output.

View File

@@ -0,0 +1,148 @@
# M4: Operational Readiness (Full Sandbox-Agent Integration)
Status: Planned
Target: Production-ready, fully wired Sandbox Agent integration for Logseq task-driven development.
## Goal
Make agent sessions reliable, observable, secure, and controllable end-to-end:
- `#Task` -> session creation -> sandbox-agent run -> live events -> pause/resume/interrupt -> replay/audit -> completion.
## Scope
- Control plane (Worker + Durable Object) and sandbox-agent runtime integration.
- Event bridge, approval flow, reliability controls, observability, security, and rollout readiness.
## Out of Scope
- New product UI redesign in Logseq.
- Non-coding agent providers that do not expose compatible runtime streams.
## Exit Criteria (M4 Done)
1) Live event bridge is stable and replayable.
2) Pause/resume/interrupt works across Codex and Claude Code providers.
3) Approval policy gates privileged tool calls.
4) Session replay/audit trail is complete and queryable.
5) Alerting, dashboards, and runbook are in place.
6) E2E integration tests pass in CI.
## Workstreams
### WS1: Event Bridge Completion
- Consume sandbox runtime stream (`/v1/sessions/:id/events/sse`) in the DO.
- Map runtime events into control-plane canonical events:
- `agent.message`
- `agent.tool_call`
- `agent.tool_result`
- `agent.artifact`
- `agent.summary`
- `session.running|paused|completed|failed`
- Persist mapped events with monotonic cursor and event-id.
- Broadcast to `/sessions/:id/stream` subscribers.
- Add replay endpoint semantics with filters:
- `since` (timestamp)
- `cursor`
- `limit`
Acceptance:
- Reconnect from dropped client resumes from cursor with no event loss or duplication.
### WS2: Agent Control Semantics (Pause/Resume/Interrupt)
- Add explicit control endpoints:
- `POST /sessions/:id/pause`
- `POST /sessions/:id/resume`
- `POST /sessions/:id/interrupt`
- Enforce state machine transitions:
- `running -> paused -> running`
- `running|paused -> canceled|failed|completed`
- Ensure "new orders" while paused are queued and applied deterministically on resume.
- Ensure interrupt stops active tool/task execution where provider supports it.
Acceptance:
- User can pause any running session and inject new instructions before resuming.
### WS3: Approval and Permission Model
- Define policy model for tool permissions by workspace/user/session:
- allow/deny lists
- privileged tools requiring approval
- Add approval lifecycle events:
- `agent.approval_requested`
- `agent.approval_granted`
- `agent.approval_denied`
- Block runtime continuation until approval decision for gated actions.
- Record approver identity, timestamp, reason, and affected call-id.
Acceptance:
- Privileged calls cannot execute without explicit approval.
### WS4: Reliability and Recovery
- Add retry/backoff policies for sandbox create/message/stream operations.
- Implement heartbeat + idle timeout management.
- Add reconciliation job for "stuck" sessions (no heartbeat / stalled stream).
- Add idempotency for session creation and message submission.
- Add crash/redeploy recovery path from stored session/runtime metadata.
Acceptance:
- Sessions recover safely after transient failures and deploy restarts.
### WS5: Observability, Replay, and Audit
- Structured logs for each stage with `session-id`, `task-id`, `workspace-id`, `user-id`.
- Metrics:
- session create latency
- stream lag
- event throughput
- approval wait time
- failure rates by phase/provider
- Dashboards and alerts:
- high failure rate
- stalled sessions
- replay backlog
- Audit completeness checks:
- every action has actor + timestamp + target session.
Acceptance:
- On-call can diagnose a failed session from logs + replay alone.
### WS6: Security Hardening
- Per-session scoped runtime token and expiry/rotation.
- Enforce repo/workdir boundaries and forbidden command policy.
- Rate limit create/message/control endpoints per user/workspace.
- Redact secrets from logs/events/artifacts.
- Validate all external input payloads + enforce payload size limits.
Acceptance:
- Security review passes for least privilege and data exposure controls.
### WS7: Test & Rollout Readiness
- Add E2E tests:
- task create -> run -> summary
- pause -> new order -> resume
- approval required -> grant/deny paths
- stream reconnect + replay cursor
- provider swap (Codex/Claude)
- Add chaos tests for disconnects/timeouts/retries.
- Internal dogfood rollout phases:
1) canary users
2) broader internal usage
3) default-on for selected workspaces
- Add rollback playbook with feature flags.
Acceptance:
- CI gates M4 features with stable E2E pass and rollback verified.
## Deliverables
- Updated API surface (control + replay + control actions).
- Canonical event schema with cursor semantics.
- Approval policy config + enforcement.
- Observability dashboards + alerts + runbook.
- Security checklist and rollout checklist.
## Dependencies
- Stable sandbox-agent deployment and auth configuration.
- Local dev baseline: `~/Codes/projects/sandbox-agent` server reachable at
`SANDBOX_AGENT_URL` (default `http://127.0.0.1:2468`).
- Durable Object storage schema finalized for event indexing/cursor.
- Provider-level support for pause/interrupt semantics.
## Risks and Mitigations
- Provider behavior differences (Codex vs Claude): use capability matrix + adapter fallback paths.
- Stream gaps/duplication: enforce cursor-based idempotent replay and monotonic ordering checks.
- Long-running session drift: add heartbeat watchdog + reconciliation.
- Operational complexity: phase rollout and feature-flag risky controls.

View File

@@ -0,0 +1,26 @@
#!/usr/bin/env bash
set -euo pipefail
SANDBOX_AGENT_REPO="${SANDBOX_AGENT_REPO:-$HOME/Codes/projects/sandbox-agent}"
SANDBOX_AGENT_HOST="${SANDBOX_AGENT_HOST:-127.0.0.1}"
SANDBOX_AGENT_PORT="${SANDBOX_AGENT_PORT:-2468}"
SANDBOX_AGENT_TOKEN="${SANDBOX_AGENT_TOKEN:-}"
if [ ! -d "$SANDBOX_AGENT_REPO" ]; then
echo "sandbox-agent repo not found: $SANDBOX_AGENT_REPO" >&2
exit 1
fi
cd "$SANDBOX_AGENT_REPO"
if [ -n "$SANDBOX_AGENT_TOKEN" ]; then
exec cargo run -p sandbox-agent -- server \
--host "$SANDBOX_AGENT_HOST" \
--port "$SANDBOX_AGENT_PORT" \
--token "$SANDBOX_AGENT_TOKEN"
else
exec cargo run -p sandbox-agent -- server \
--host "$SANDBOX_AGENT_HOST" \
--port "$SANDBOX_AGENT_PORT" \
--no-token
fi

65
deps/db-sync/scripts/start-weather-session.sh vendored Executable file
View File

@@ -0,0 +1,65 @@
#!/usr/bin/env bash
set -euo pipefail
BASE_URL="${BASE_URL:-http://127.0.0.1:8787}"
TOKEN="${TOKEN:-dev-token}"
SESSION_ID="${SESSION_ID:-task-weather-hangzhou-001}"
create_payload() {
cat <<JSON
{
"id": "${SESSION_ID}",
"source": {
"node-id": "task-node-1",
"node-title": "Check weather in Hangzhou",
"node-revision": "2026-02-02T00:00:00Z",
"snapshot": {
"content": "#Task tell me the weather today in Hangzhou",
"references": [],
"attachments": []
}
},
"intent": {
"title": "Weather query",
"summary": "Ask codex for weather in Hangzhou today"
},
"agent": {
"provider": "codex",
"model": "default",
"tools": ["web"]
},
"audit": {
"requested-at": 1769980800000,
"priority": "normal"
}
}
JSON
}
message_payload() {
cat <<JSON
{
"message": "Tell me the weather today in Hangzhou.",
"kind": "user"
}
JSON
}
echo "Creating session: ${SESSION_ID}"
curl -sS -X POST "${BASE_URL}/sessions" \
-H "authorization: Bearer ${TOKEN}" \
-H "content-type: application/json" \
-d "$(create_payload)"
echo
echo "Sending message to session: ${SESSION_ID}"
curl -sS -X POST "${BASE_URL}/sessions/${SESSION_ID}/messages" \
-H "authorization: Bearer ${TOKEN}" \
-H "content-type: application/json" \
-d "$(message_payload)"
echo
echo "Streaming events (Ctrl+C to stop):"
curl -N -sS -H "authorization: Bearer ${TOKEN}" "${BASE_URL}/sessions/${SESSION_ID}/stream"

View File

@@ -230,7 +230,6 @@
[:source agent-task-source-schema]
[:intent {:optional true} :map]
[:agent {:optional true} :map]
[:execution {:optional true} :map]
[:audit {:optional true} :map]])
(def sessions-create-request-schema agent-task-schema)
@@ -255,6 +254,15 @@
[:created-at :int]
[:updated-at :int]])
(def sessions-events-response-schema
[:map
[:events [:sequential :map]]])
(def sessions-resume-response-schema
[:map
[:ok :boolean]
[:flushed {:optional true} :int]])
(def http-request-schemas
{:graphs/create graph-create-request-schema
:graph-members/create graph-member-create-request-schema
@@ -292,7 +300,11 @@
:sessions/create sessions-create-response-schema
:sessions/get sessions-get-response-schema
:sessions/message http-ok-response-schema
:sessions/pause http-ok-response-schema
:sessions/resume sessions-resume-response-schema
:sessions/interrupt http-ok-response-schema
:sessions/cancel http-ok-response-schema
:sessions/events sessions-events-response-schema
:error http-error-response-schema})
(def ^:private json-transformer

View File

@@ -36,6 +36,10 @@
(defn- <put-events! [^js self events]
(<storage-put! (.-storage self) "events" events))
(defn- <save-session! [^js self session]
(p/let [_ (<put-session! self session)]
session))
(defn- stream-url [request session-id]
(let [base (or (header request "x-stream-base")
(.-origin (platform/request-url request)))]
@@ -61,6 +65,24 @@
(broadcast-event! self event)
{:session session :event event})))))
(defn- session-conflict [message]
(http/error-response message 409))
(defn- <transition! [^js self to-status event-type data]
(p/let [session (<get-session self)]
(cond
(nil? session)
(http/not-found)
(not (session/transition-allowed? (:status session) to-status))
(session-conflict (str "cannot transition from " (:status session) " to " to-status))
:else
(p/let [res (<append-event! self {:type event-type :data data})]
(if (= (:error res) :missing-session)
(http/not-found)
(http/json-response :sessions/pause {:ok true}))))))
(defn- sandbox-base [^js env]
(aget env "SANDBOX_AGENT_URL"))
@@ -71,27 +93,25 @@
(let [base (sandbox-base (.-env self))]
(if-not (string? base)
(p/resolved nil)
(p/let [payload {:agent (:agent task)
:repo (get-in task [:execution :repo])
:workdir (get-in task [:execution :workdir])
:env (get-in task [:execution :env])
:task-id session-id}
response (sandbox/<create-session base (sandbox-token (.-env self)) payload)
sandbox-session-id (:session-id response)
runtime {:sandbox {:base (sandbox/normalize-base-url base)
:session-id sandbox-session-id
:stream-url (sandbox/stream-url base sandbox-session-id)}}]
(p/let [session (<get-session self)
events (<get-events self)]
(if (nil? session)
nil
(let [session (assoc session :runtime runtime)
[session events _event] (session/append-event session events {:type "session.provisioned"
:data {:sandbox-session-id sandbox-session-id}
:ts (common/now-ms)})]
(p/let [_ (<put-session! self session)
_ (<put-events! self events)]
runtime))))))))
(let [sandbox-base (sandbox/normalize-base-url base)]
(p/let [payload {:agent (:agent task)
:model (get-in task [:agent :model])
:permission-mode (get-in task [:agent :permission-mode])}
response (sandbox/<create-session sandbox-base (sandbox-token (.-env self)) session-id payload)
sandbox-session-id (:session-id response)
runtime {:sandbox {:base sandbox-base
:session-id sandbox-session-id}}]
(p/let [session (<get-session self)
events (<get-events self)]
(if (nil? session)
nil
(let [session (assoc session :runtime runtime)
[session events _event] (session/append-event session events {:type "session.provisioned"
:data {:sandbox-session-id sandbox-session-id}
:ts (common/now-ms)})]
(p/let [_ (<put-session! self session)
_ (<put-events! self events)]
runtime)))))))))
(defn- handle-init [^js self request]
(p/let [existing (<get-session self)]
@@ -166,17 +186,30 @@
:data {:message message
:kind (:kind body)
:by user-id}})
session (<get-session self)
runtime (get-in session [:runtime :sandbox])
_ (when (and runtime (string? (:session-id runtime)))
(sandbox/<send-message (sandbox/normalize-base-url (:base runtime))
(sandbox-token (.-env self))
(:session-id runtime)
{:message message
:kind (:kind body)}))]
(if (= (:error res) :missing-session)
current-session (<get-session self)]
(cond
(= (:error res) :missing-session)
(http/not-found)
(http/json-response :sessions/message {:ok true})))))))))
(contains? #{"completed" "failed" "canceled"} (:status current-session))
(session-conflict "session is not writable")
(= "paused" (:status current-session))
(let [next-session (session/enqueue-order current-session {:message message
:kind (:kind body)
:by user-id})]
(p/let [_ (<save-session! self next-session)]
(http/json-response :sessions/message {:ok true})))
:else
(let [runtime (get-in current-session [:runtime :sandbox])]
(p/let [_ (when (and runtime (string? (:session-id runtime)))
(sandbox/<send-message (:base runtime)
(sandbox-token (.-env self))
(:session-id runtime)
{:message message
:kind (:kind body)}))]
(http/json-response :sessions/message {:ok true})))))))))))
(defn- handle-cancel [^js self request]
(let [user-id (user-id-from-request request)]
@@ -188,6 +221,47 @@
(http/not-found)
(http/json-response :sessions/cancel {:ok true}))))))
(defn- <flush-pending-orders! [^js self]
(p/let [current-session (<get-session self)]
(if (nil? current-session)
nil
(let [[orders next-session] (session/drain-orders current-session)
runtime (get-in current-session [:runtime :sandbox])]
(p/let [_ (<save-session! self next-session)
_ (when (and runtime (string? (:session-id runtime)))
(p/all
(map (fn [order]
(sandbox/<send-message (:base runtime)
(sandbox-token (.-env self))
(:session-id runtime)
{:message (:message order)
:kind (:kind order)}))
orders)))]
(count orders))))))
(defn- handle-pause [^js self request]
(let [user-id (user-id-from-request request)]
(if-not (string? user-id)
(http/unauthorized)
(<transition! self "paused" "session.paused" {:by user-id :reason "user-pause"}))))
(defn- handle-resume [^js self request]
(let [user-id (user-id-from-request request)]
(if-not (string? user-id)
(http/unauthorized)
(p/let [resp (<transition! self "running" "session.running" {:by user-id :reason "user-resume"})]
(if-not (= 200 (.-status resp))
resp
(p/let [flushed (<flush-pending-orders! self)]
(http/json-response :sessions/resume {:ok true
:flushed (or flushed 0)})))))))
(defn- handle-interrupt [^js self request]
(let [user-id (user-id-from-request request)]
(if-not (string? user-id)
(http/unauthorized)
(<transition! self "paused" "session.paused" {:by user-id :reason "interrupt"}))))
(defn- handle-stream [^js self request]
(let [streams (.-streams self)
stream (js/TransformStream.)
@@ -210,6 +284,26 @@
"connection" "keep-alive"}
(common/cors-headers))})))
(defn- parse-int [value]
(when (string? value)
(let [parsed (js/parseInt value 10)]
(when (js/Number.isFinite parsed) parsed))))
(defn- handle-events [^js self request]
(let [url (platform/request-url request)
since-ts (parse-int (.get (.-searchParams url) "since"))
limit (parse-int (.get (.-searchParams url) "limit"))
user-id (user-id-from-request request)]
(log/info :agent/session-events-request {:user-id user-id
:since since-ts
:limit limit})
(p/let [session (<get-session self)]
(if (nil? session)
(http/not-found)
(p/let [events (<get-events self)
filtered (session/filter-events events {:since-ts since-ts :limit limit})]
(http/json-response :sessions/events {:events filtered}))))))
(defn handle-fetch [^js self request]
(let [url (platform/request-url request)
path (.-pathname url)
@@ -228,12 +322,24 @@
(= path "/__session__/messages")
(handle-messages self request)
(= path "/__session__/pause")
(handle-pause self request)
(= path "/__session__/resume")
(handle-resume self request)
(= path "/__session__/interrupt")
(handle-interrupt self request)
(= path "/__session__/cancel")
(handle-cancel self request)
(= path "/__session__/stream")
(handle-stream self request)
(= path "/__session__/events")
(handle-events self request)
:else
(http/not-found))
(catch :default error

View File

@@ -6,14 +6,14 @@
(defn normalize-base-url [base]
(string/replace (or base "") #"/+$" ""))
(defn sessions-url [base]
(str (normalize-base-url base) "/sandbox/sessions"))
(defn sessions-base-url [base]
(str (normalize-base-url base) "/v1/sessions"))
(defn session-url [base session-id]
(str (sessions-base-url base) "/" session-id))
(defn messages-url [base session-id]
(str (normalize-base-url base) "/sandbox/sessions/" session-id "/messages"))
(defn stream-url [base session-id]
(str (normalize-base-url base) "/sandbox/sessions/" session-id "/stream"))
(str (session-url base session-id) "/messages"))
(defn- json-request [url method headers body]
(let [init (cond-> {:method method :headers headers}
@@ -21,27 +21,33 @@
(assoc :body (js/JSON.stringify (clj->js body))))]
(platform/request url (clj->js init))))
(defn- parse-json-or-default [^js resp fallback]
(let [content-type (.get (.-headers resp) "content-type")]
(if (and (string? content-type) (string/includes? content-type "application/json"))
(.then (.json resp) #(js->clj % :keywordize-keys true))
(js/Promise.resolve fallback))))
(defn <create-session
[base token {:keys [agent repo workdir env] :as payload}]
[base token session-id {:keys [agent model permission-mode]}]
(let [headers (js/Headers.)
_ (.set headers "content-type" "application/json")
_ (when (string? token) (.set headers "authorization" (str "Bearer " token)))
req (json-request (sessions-url base) "POST" headers
req (json-request (session-url base session-id) "POST" headers
{:agent agent
:repo repo
:workdir workdir
:env env
:payload payload})]
:model model
:permissionMode permission-mode})]
(p/let [resp (js/fetch req)
json (.json resp)]
(js->clj json :keywordize-keys true))))
json (parse-json-or-default resp {})]
(assoc json :session-id session-id))))
(defn <send-message
[base token session-id message]
(let [headers (js/Headers.)
_ (.set headers "content-type" "application/json")
_ (when (string? token) (.set headers "authorization" (str "Bearer " token)))
req (json-request (messages-url base session-id) "POST" headers message)]
req (json-request (messages-url base session-id) "POST" headers
{:message (:message message)})]
(p/let [resp (js/fetch req)
json (.json resp)]
(js->clj json :keywordize-keys true))))
status (.-status resp)
json (parse-json-or-default resp {:ok (<= 200 status 299) :status status})]
json)))

View File

@@ -18,6 +18,18 @@
"session.canceled" "canceled"
nil))
(def ^:private transitions
{"created" #{"running" "paused" "failed" "canceled" "completed"}
"running" #{"running" "paused" "failed" "canceled" "completed"}
"paused" #{"running" "paused" "failed" "canceled" "completed"}
"completed" #{}
"failed" #{}
"canceled" #{}})
(defn transition-allowed?
[from to]
(contains? (get transitions from #{}) to))
(defn append-event [session events {:keys [type data event-id ts]}]
(let [event-id (or event-id (str (random-uuid)))
ts (or ts (common/now-ms))
@@ -30,3 +42,26 @@
updated (assoc session :status next-status :updated-at ts)
events (conj (vec events) event)]
[updated events event]))
(defn enqueue-order [session order]
(update session :pending-orders (fnil conj []) order))
(defn drain-orders [session]
(let [orders (vec (:pending-orders session))]
[orders (assoc session :pending-orders [])]))
(defn filter-events
[events {:keys [since-ts limit]}]
(let [events (if (number? since-ts)
(filter #(> (:ts %) since-ts) events)
events)
events (vec events)]
(cond
(and (number? limit) (pos? limit))
(subvec events 0 (min (count events) limit))
(and (number? limit) (<= limit 0))
[]
:else
events)))

View File

@@ -109,6 +109,16 @@
(forward-request stub do-url "POST" headers nil))
(http/error-response "server error" 500)))))
(defn- handle-control [{:keys [env request url claims route]} control-path]
(let [session-id (get-in route [:path-params :session-id])]
(if-not (string? session-id)
(http/bad-request "invalid session id")
(if-let [^js stub (session-stub env session-id)]
(let [headers (base-headers request claims)
do-url (str (.-origin url) control-path)]
(forward-request stub do-url "POST" headers nil))
(http/error-response "server error" 500)))))
(defn- handle-stream [{:keys [env request url claims route]}]
(let [session-id (get-in route [:path-params :session-id])]
(if-not (string? session-id)
@@ -119,12 +129,26 @@
(forward-request stub do-url "GET" headers nil))
(http/error-response "server error" 500)))))
(defn- handle-events [{:keys [env request url claims route]}]
(let [session-id (get-in route [:path-params :session-id])]
(if-not (string? session-id)
(http/bad-request "invalid session id")
(if-let [^js stub (session-stub env session-id)]
(let [headers (base-headers request claims)
do-url (str (.-origin url) "/__session__/events" (.-search url))]
(forward-request stub do-url "GET" headers nil))
(http/error-response "server error" 500)))))
(defn handle [{:keys [route] :as ctx}]
(case (:handler route)
:sessions/create (handle-create ctx)
:sessions/get (handle-get ctx)
:sessions/messages (handle-messages ctx)
:sessions/pause (handle-control ctx "/__session__/pause")
:sessions/resume (handle-control ctx "/__session__/resume")
:sessions/interrupt (handle-control ctx "/__session__/interrupt")
:sessions/cancel (handle-cancel ctx)
:sessions/events (handle-events ctx)
:sessions/stream (handle-stream ctx)
(http/not-found)))

View File

@@ -27,7 +27,11 @@
["/:session-id"
["" {:methods {"GET" :sessions/get}}]
["/messages" {:methods {"POST" :sessions/messages}}]
["/pause" {:methods {"POST" :sessions/pause}}]
["/resume" {:methods {"POST" :sessions/resume}}]
["/interrupt" {:methods {"POST" :sessions/interrupt}}]
["/cancel" {:methods {"POST" :sessions/cancel}}]
["/events" {:methods {"GET" :sessions/events}}]
["/stream" {:methods {"GET" :sessions/stream}}]]]])
(def ^:private router

View File

@@ -12,9 +12,9 @@
(testing "builds sandbox session endpoints"
(let [base "https://sandbox.example"
session-id "sess-1"]
(is (= "https://sandbox.example/sandbox/sessions"
(sandbox/sessions-url base)))
(is (= "https://sandbox.example/sandbox/sessions/sess-1/messages"
(sandbox/messages-url base session-id)))
(is (= "https://sandbox.example/sandbox/sessions/sess-1/stream"
(sandbox/stream-url base session-id))))))
(is (= "https://sandbox.example/v1/sessions"
(sandbox/sessions-base-url base)))
(is (= "https://sandbox.example/v1/sessions/sess-1"
(sandbox/session-url base session-id)))
(is (= "https://sandbox.example/v1/sessions/sess-1/messages"
(sandbox/messages-url base session-id))))))

View File

@@ -41,3 +41,35 @@
(is (= 1 (count events1)))
(is (string? (:event-id e1)))
(is (number? (:ts e1))))))
(deftest session-event-filter-test
(testing "filters events by since-ts and limit"
(let [events [{:event-id "e1" :ts 10}
{:event-id "e2" :ts 20}
{:event-id "e3" :ts 30}
{:event-id "e4" :ts 40}]]
(is (= ["e3" "e4"]
(map :event-id (session/filter-events events {:since-ts 25}))))
(is (= ["e2" "e3"]
(map :event-id (session/filter-events events {:since-ts 15 :limit 2}))))
(is (= ["e1"]
(map :event-id (session/filter-events events {:limit 1})))))))
(deftest session-transition-allowed-test
(testing "pause/resume transitions are state-aware"
(is (true? (session/transition-allowed? "created" "running")))
(is (true? (session/transition-allowed? "running" "paused")))
(is (true? (session/transition-allowed? "paused" "running")))
(is (false? (session/transition-allowed? "completed" "running")))
(is (false? (session/transition-allowed? "failed" "paused")))))
(deftest session-pending-orders-test
(testing "pending orders queue is append-only and clearable"
(let [base {:id "task-1"
:status "paused"
:pending-orders []}
with-one (session/enqueue-order base {:message "one"})
with-two (session/enqueue-order with-one {:message "two"})
[orders cleared] (session/drain-orders with-two)]
(is (= ["one" "two"] (map :message orders)))
(is (= [] (:pending-orders cleared))))))

View File

@@ -63,3 +63,21 @@
(let [match (routes/match-route "GET" "/sessions/session-4/stream")]
(is (= :sessions/stream (:handler match)))
(is (= "session-4" (get-in match [:path-params :session-id]))))))
(deftest match-route-sessions-events-test
(testing "sessions events routes"
(let [match (routes/match-route "GET" "/sessions/session-9/events")]
(is (= :sessions/events (:handler match)))
(is (= "session-9" (get-in match [:path-params :session-id]))))))
(deftest match-route-sessions-control-test
(testing "sessions control routes"
(let [match (routes/match-route "POST" "/sessions/session-10/pause")]
(is (= :sessions/pause (:handler match)))
(is (= "session-10" (get-in match [:path-params :session-id]))))
(let [match (routes/match-route "POST" "/sessions/session-11/resume")]
(is (= :sessions/resume (:handler match)))
(is (= "session-11" (get-in match [:path-params :session-id]))))
(let [match (routes/match-route "POST" "/sessions/session-12/interrupt")]
(is (= :sessions/interrupt (:handler match)))
(is (= "session-12" (get-in match [:path-params :session-id]))))))

View File

@@ -189,3 +189,39 @@ Errors and idempotency:
}
}
```
## Sandbox Agent Integration Milestones
1) M3.1 Runtime handshake
- Finalize Sandbox Agent auth/config (`SANDBOX_AGENT_URL`, token handling).
- Ensure session provisioning always records sandbox session metadata.
2) M3.2 Event bridge
- Consume `/sandbox/sessions/:id/stream` and map runtime events into the
control-plane event model (`agent.message`, `agent.tool_call`,
`agent.tool_result`, `agent.artifact`, `agent.summary`).
- Persist bridged events and broadcast to `/sessions/:id/stream`.
3) M3.3 Tool approvals
- Add approval flow for privileged tool calls.
- Emit `agent.approval_requested` / `agent.approval_granted` /
`agent.approval_denied` events and enforce decision before continuing.
4) M3.4 Reliability
- Add retry/backoff for sandbox create/message calls.
- Add reconciliation for disconnected streams and delayed event delivery.
- Add idempotent replay cursor for stream resume.
5) M4.1 Observability and audit
- Emit structured logs and metrics for session lifecycle and sandbox calls.
- Add event replay endpoint with filters and pagination.
- Ensure actor identity is attached to all audit events.
6) M4.2 Security hardening
- Enforce per-session scoped credentials and expiry.
- Add policy checks for allowed tools and repo/workdir boundaries.
- Add rate limiting and abuse safeguards per user/workspace.
7) M4.3 Production readiness
- Add integration tests for provisioning, message roundtrip, stream bridge,
approvals, and replay.
- Add runbook, dashboards, alert thresholds, and rollback procedure.