Add forward proxy to remove env, add tests

This commit is contained in:
James Long
2026-02-25 16:02:40 -05:00
parent 9c578006c7
commit 14aaa7a626
8 changed files with 415 additions and 4 deletions

View File

@@ -0,0 +1,38 @@
import { Instance } from "@/project/instance"
import type { MiddlewareHandler } from "hono"
import { getAdaptor } from "./adaptors"
import { Workspace } from "./workspace"
// This middleware forwards all non-GET requests if the workspace is a
// remote. The remote workspace needs to handle session mutations
async function proxySessionRequest(req: Request) {
if (req.method === "GET") return
if (!Instance.directory.startsWith("wrk_")) return
const workspace = await Workspace.get(Instance.directory)
if (!workspace) {
return new Response(`Workspace not found: ${Instance.directory}`, {
status: 500,
headers: {
"content-type": "text/plain; charset=utf-8",
},
})
}
if (workspace.config.type === "worktree") return
const url = new URL(req.url)
const body = req.method === "HEAD" ? undefined : await req.arrayBuffer()
return getAdaptor(workspace.config).request(
workspace.config,
req.method,
`${url.pathname}${url.search}`,
body,
req.signal,
)
}
export const SessionProxyMiddleware: MiddlewareHandler = async (c, next) => {
const response = await proxySessionRequest(c.req.raw)
if (response) return response
await next()
}

View File

@@ -3,20 +3,22 @@ import { SessionRoutes } from "../../server/routes/session"
import { WorkspaceServerRoutes } from "./routes"
export namespace WorkspaceServer {
export function Listen(opts: { hostname: string; port: number }) {
const sessionMutationRoutes = new Hono()
export function App() {
const session = new Hono()
.use("*", async (c, next) => {
if (c.req.method === "GET") return c.notFound()
await next()
})
.route("/", SessionRoutes())
const app = new Hono().route("/session", sessionMutationRoutes).route("/", WorkspaceServerRoutes())
return new Hono().route("/session", session).route("/", WorkspaceServerRoutes())
}
export function Listen(opts: { hostname: string; port: number }) {
return Bun.serve({
hostname: opts.hostname,
port: opts.port,
fetch: app.fetch,
fetch: App().fetch,
})
}
}

View File

@@ -16,11 +16,13 @@ import { Log } from "../../util/log"
import { PermissionNext } from "@/permission/next"
import { errors } from "../error"
import { lazy } from "../../util/lazy"
import { SessionProxyMiddleware } from "../../control-plane/session-proxy-middleware"
const log = Log.create({ service: "server" })
export const SessionRoutes = lazy(() =>
new Hono()
.use(SessionProxyMiddleware)
.get(
"/",
describeRoute({

View File

@@ -0,0 +1,144 @@
import { afterEach, describe, expect, mock, test } from "bun:test"
import { Hono } from "hono"
import { tmpdir } from "../fixture/fixture"
import { Project } from "../../src/project/project"
import { WorkspaceTable } from "../../src/control-plane/workspace.sql"
import { Instance } from "../../src/project/instance"
import { Database } from "../../src/storage/db"
import { resetDatabase } from "../fixture/db"
afterEach(async () => {
mock.restore()
await resetDatabase()
})
type State = {
workspaceID: string
calls: Array<{ method: string; url: string; body?: string }>
}
const remote = { type: "testing", name: "remote-a" } as unknown as typeof WorkspaceTable.$inferInsert.config
async function setup(state: State) {
mock.module("../../src/control-plane/adaptors", () => ({
getAdaptor: () => ({
request: async (_config: unknown, method: string, url: string, data?: BodyInit) => {
const body = data ? await new Response(data).text() : undefined
state.calls.push({ method, url, body })
return new Response("proxied", { status: 202 })
},
}),
}))
await using tmp = await tmpdir({ git: true })
const { project } = await Project.fromDirectory(tmp.path)
Database.use((db) => {
db.delete(WorkspaceTable).run()
})
Database.use((db) =>
db
.insert(WorkspaceTable)
.values([
{
id: "wrk_1",
branch: "main",
project_id: project.id,
config: remote,
},
{
id: "wrk_2",
branch: "main",
project_id: project.id,
config: { type: "worktree", directory: tmp.path },
},
])
.run(),
)
const { SessionProxyMiddleware } = await import("../../src/control-plane/session-proxy-middleware")
const app = new Hono().use(SessionProxyMiddleware)
return {
app,
async request(input: RequestInfo | URL, init?: RequestInit) {
return Instance.provide({
directory: state.workspaceID,
fn: async () => app.request(input, init),
})
},
}
}
describe("control-plane/session-proxy-middleware", () => {
test("forwards non-GET session requests for remote workspaces", async () => {
const state: State = {
workspaceID: "wrk_1",
calls: [],
}
const ctx = await setup(state)
ctx.app.post("/session/foo", (c) => c.text("local", 200))
const response = await ctx.request("http://workspace.test/session/foo?x=1", {
method: "POST",
body: JSON.stringify({ hello: "world" }),
headers: {
"content-type": "application/json",
},
})
expect(response.status).toBe(202)
expect(await response.text()).toBe("proxied")
expect(state.calls).toEqual([
{
method: "POST",
url: "/session/foo?x=1",
body: '{"hello":"world"}',
},
])
})
test("does not forward GET requests", async () => {
const state: State = {
workspaceID: "wrk_1",
calls: [],
}
const ctx = await setup(state)
ctx.app.get("/session/foo", (c) => c.text("local", 200))
const response = await ctx.request("http://workspace.test/session/foo?x=1")
expect(response.status).toBe(200)
expect(await response.text()).toBe("local")
expect(state.calls).toEqual([])
})
test("does not forward GET or POST requests for worktree workspaces", async () => {
const state: State = {
workspaceID: "wrk_2",
calls: [],
}
const ctx = await setup(state)
ctx.app.get("/session/foo", (c) => c.text("local-get", 200))
ctx.app.post("/session/foo", (c) => c.text("local-post", 200))
const getResponse = await ctx.request("http://workspace.test/session/foo?x=1")
const postResponse = await ctx.request("http://workspace.test/session/foo?x=1", {
method: "POST",
body: JSON.stringify({ hello: "world" }),
headers: {
"content-type": "application/json",
},
})
expect(getResponse.status).toBe(200)
expect(await getResponse.text()).toBe("local-get")
expect(postResponse.status).toBe(200)
expect(await postResponse.text()).toBe("local-post")
expect(state.calls).toEqual([])
})
})

View File

@@ -0,0 +1,56 @@
import { afterEach, describe, expect, test } from "bun:test"
import { parseSSE } from "../../src/control-plane/sse"
import { resetDatabase } from "../fixture/db"
afterEach(async () => {
await resetDatabase()
})
function stream(chunks: string[]) {
return new ReadableStream<Uint8Array>({
start(controller) {
const encoder = new TextEncoder()
chunks.forEach((chunk) => controller.enqueue(encoder.encode(chunk)))
controller.close()
},
})
}
describe("control-plane/sse", () => {
test("parses JSON events with CRLF and multiline data blocks", async () => {
const events: unknown[] = []
const stop = new AbortController()
await parseSSE(
stream([
'data: {"type":"one","properties":{"ok":true}}\r\n\r\n',
'data: {"type":"two",\r\ndata: "properties":{"n":2}}\r\n\r\n',
]),
stop.signal,
(event) => events.push(event),
)
expect(events).toEqual([
{ type: "one", properties: { ok: true } },
{ type: "two", properties: { n: 2 } },
])
})
test("falls back to sse.message for non-json payload", async () => {
const events: unknown[] = []
const stop = new AbortController()
await parseSSE(stream(["id: abc\nretry: 1500\ndata: hello world\n\n"]), stop.signal, (event) => events.push(event))
expect(events).toEqual([
{
type: "sse.message",
properties: {
data: "hello world",
id: "abc",
retry: 1500,
},
},
])
})
})

View File

@@ -0,0 +1,65 @@
import { afterEach, describe, expect, test } from "bun:test"
import { Log } from "../../src/util/log"
import { WorkspaceServer } from "../../src/control-plane/workspace-server/server"
import { parseSSE } from "../../src/control-plane/sse"
import { GlobalBus } from "../../src/bus/global"
import { resetDatabase } from "../fixture/db"
afterEach(async () => {
await resetDatabase()
})
Log.init({ print: false })
describe("control-plane/workspace-server SSE", () => {
test("streams GlobalBus events and parseSSE reads them", async () => {
const app = WorkspaceServer.App()
const stop = new AbortController()
const seen: unknown[] = []
try {
const response = await app.request("/event", {
signal: stop.signal,
})
expect(response.status).toBe(200)
expect(response.body).toBeDefined()
const done = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error("timed out waiting for workspace.test event"))
}, 3000)
void parseSSE(response.body!, stop.signal, (event) => {
seen.push(event)
const next = event as { type?: string }
if (next.type === "server.connected") {
GlobalBus.emit("event", {
payload: {
type: "workspace.test",
properties: { ok: true },
},
})
return
}
if (next.type !== "workspace.test") return
clearTimeout(timeout)
resolve()
}).catch((error) => {
clearTimeout(timeout)
reject(error)
})
})
await done
expect(seen.some((event) => (event as { type?: string }).type === "server.connected")).toBe(true)
expect(seen).toContainEqual({
type: "workspace.test",
properties: { ok: true },
})
} finally {
stop.abort()
}
})
})

View File

@@ -0,0 +1,93 @@
import { afterEach, describe, expect, mock, test } from "bun:test"
import { Log } from "../../src/util/log"
import { tmpdir } from "../fixture/fixture"
import { Project } from "../../src/project/project"
import { Database } from "../../src/storage/db"
import { WorkspaceTable } from "../../src/control-plane/workspace.sql"
import { GlobalBus } from "../../src/bus/global"
import { resetDatabase } from "../fixture/db"
afterEach(async () => {
mock.restore()
await resetDatabase()
})
Log.init({ print: false })
const seen: string[] = []
const remote = { type: "testing", name: "remote-a" } as unknown as typeof WorkspaceTable.$inferInsert.config
mock.module("../../src/control-plane/adaptors", () => ({
getAdaptor: (config: { type: string }) => {
seen.push(config.type)
return {
async create() {
throw new Error("not used")
},
async remove() {},
async request() {
const body = new ReadableStream<Uint8Array>({
start(controller) {
const encoder = new TextEncoder()
controller.enqueue(encoder.encode('data: {"type":"remote.ready","properties":{}}\n\n'))
controller.close()
},
})
return new Response(body, {
status: 200,
headers: {
"content-type": "text/event-stream",
},
})
},
}
},
}))
describe("control-plane/workspace.startSyncing", () => {
test("syncs only remote workspaces and emits remote SSE events", async () => {
const { Workspace } = await import("../../src/control-plane/workspace")
await using tmp = await tmpdir({ git: true })
const { project } = await Project.fromDirectory(tmp.path)
Database.use((db) =>
db
.insert(WorkspaceTable)
.values([
{
id: "wrk_1",
branch: "main",
project_id: project.id,
config: remote,
},
{
id: "wrk_2",
branch: "main",
project_id: project.id,
config: { type: "worktree", directory: tmp.path },
},
])
.run(),
)
const done = new Promise<void>((resolve) => {
const listener = (event: { directory?: string; payload: { type: string } }) => {
if (event.directory !== "wrk_1") return
if (event.payload.type !== "remote.ready") return
GlobalBus.off("event", listener)
resolve()
}
GlobalBus.on("event", listener)
})
const sync = Workspace.startSyncing(project)
await Promise.race([
done,
new Promise((_, reject) => setTimeout(() => reject(new Error("timed out waiting for sync event")), 2000)),
])
await sync.stop()
expect(seen).toContain("testing")
expect(seen).not.toContain("worktree")
})
})

View File

@@ -0,0 +1,11 @@
import { rm } from "fs/promises"
import { Instance } from "../../src/project/instance"
import { Database } from "../../src/storage/db"
export async function resetDatabase() {
await Instance.disposeAll().catch(() => undefined)
Database.close()
await rm(Database.Path, { force: true }).catch(() => undefined)
await rm(`${Database.Path}-wal`, { force: true }).catch(() => undefined)
await rm(`${Database.Path}-shm`, { force: true }).catch(() => undefined)
}