// mirror of https://github.com/anomalyco/opencode.git
// synced 2026-05-16 01:22:58 +00:00 (1532 lines, 58 KiB, TypeScript)
import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"
|
|
import { $ } from "bun"
|
|
import fs from "node:fs/promises"
|
|
import Http from "node:http"
|
|
import path from "node:path"
|
|
import { setTimeout as delay } from "node:timers/promises"
|
|
import { NodeHttpServer } from "@effect/platform-node"
|
|
import { Effect, Layer } from "effect"
|
|
import { HttpServer, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
|
|
import { eq } from "drizzle-orm"
|
|
import * as Log from "@opencode-ai/core/util/log"
|
|
import { Flag } from "@opencode-ai/core/flag/flag"
|
|
import { GlobalBus, type GlobalEvent } from "@/bus/global"
|
|
import { Database } from "@/storage/db"
|
|
import { ProjectID } from "@/project/schema"
|
|
import { ProjectTable } from "@/project/project.sql"
|
|
import { Instance } from "@/project/instance"
|
|
import { WithInstance } from "../../src/project/with-instance"
|
|
import { Session as SessionNs } from "@/session/session"
|
|
import { SessionID } from "@/session/schema"
|
|
import { SessionTable } from "@/session/session.sql"
|
|
import { SyncEvent } from "@/sync"
|
|
import { EventSequenceTable } from "@/sync/event.sql"
|
|
import { resetDatabase } from "../fixture/db"
|
|
import { disposeAllInstances, provideTmpdirInstance, tmpdir } from "../fixture/fixture"
|
|
import { testEffect } from "../lib/effect"
|
|
import { registerAdapter } from "../../src/control-plane/adapters"
|
|
import { WorkspaceID } from "../../src/control-plane/schema"
|
|
import { WorkspaceTable } from "../../src/control-plane/workspace.sql"
|
|
import type { Target, WorkspaceAdapter, WorkspaceInfo } from "../../src/control-plane/types"
|
|
import * as Workspace from "../../src/control-plane/workspace"
|
|
import { AppRuntime } from "@/effect/app-runtime"
|
|
import { InstanceStore } from "@/project/instance-store"
|
|
import { InstanceBootstrap } from "@/project/bootstrap"
|
|
|
|
// Silence log output for the whole test run; init is fire-and-forget.
void Log.init({ print: false })

// Layer stack shared by every `it.live` test: an ephemeral HTTP server
// (port 0 = OS-assigned), the workspace service wired to its instance-store
// and bootstrap dependencies, and the session service.
const testServerLayer = Layer.mergeAll(
  NodeHttpServer.layer(Http.createServer, { host: "127.0.0.1", port: 0 }),
  Workspace.defaultLayer.pipe(Layer.provide(InstanceStore.defaultLayer), Layer.provide(InstanceBootstrap.defaultLayer)),
  SessionNs.defaultLayer,
)

const it = testEffect(testServerLayer)

// Snapshots of mutable global state, restored by beforeEach/afterEach.
const originalWorkspacesFlag = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES

const originalEnv = {
  OPENCODE_AUTH_CONTENT: process.env.OPENCODE_AUTH_CONTENT,
  OTEL_EXPORTER_OTLP_HEADERS: process.env.OTEL_EXPORTER_OTLP_HEADERS,
  OTEL_EXPORTER_OTLP_ENDPOINT: process.env.OTEL_EXPORTER_OTLP_ENDPOINT,
  OTEL_RESOURCE_ATTRIBUTES: process.env.OTEL_RESOURCE_ATTRIBUTES,
}
|
|
|
|
// One recorded invocation of WorkspaceAdapter.create.
type RecordedCreate = {
  info: WorkspaceInfo
  // Environment passed to the adapter, snapshotted at call time.
  env: Record<string, string | undefined>
  // Source workspace when creating from an existing one (if any).
  from?: WorkspaceInfo
}

// A WorkspaceAdapter plus a log of every call made to it, for assertions.
type RecordedAdapter = {
  adapter: WorkspaceAdapter
  calls: {
    configure: WorkspaceInfo[]
    create: RecordedCreate[]
    // Invocation count only — list() takes no interesting arguments.
    list: number
    remove: WorkspaceInfo[]
    target: WorkspaceInfo[]
  }
}

// One HTTP request captured by an in-process test server.
type FetchCall = {
  url: URL
  method: string
  headers: Headers
  bodyText?: string
  // Parsed bodyText, present only when the body was non-empty.
  json?: unknown
}
|
|
|
|
function unique(prefix: string) {
|
|
return `${prefix}-${Math.random().toString(36).slice(2)}`
|
|
}
|
|
|
|
function restoreEnv() {
|
|
Object.entries(originalEnv).forEach(([key, value]) => {
|
|
if (value === undefined) {
|
|
delete process.env[key]
|
|
return
|
|
}
|
|
process.env[key] = value
|
|
})
|
|
}
|
|
|
|
beforeEach(() => {
  // Drop any open database handle so each test starts from a fresh connection.
  Database.close()
  // Workspace support is gated behind this flag; force it on for the suite.
  Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
  restoreEnv()
})
|
|
|
|
afterEach(async () => {
  mock.restore()
  // Tear down every instance created via withInstance/provideTmpdirInstance.
  await disposeAllInstances()
  // Restore the flag and environment snapshots taken at module load.
  Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = originalWorkspacesFlag
  restoreEnv()
  await resetDatabase()
})
|
|
|
|
/**
 * Runs `fn` inside a project instance rooted at a fresh temporary git
 * directory. The tmpdir is disposed (via `await using`) once `fn` settles.
 */
async function withInstance<T>(fn: (dir: string) => T | Promise<T>) {
  await using tmp = await tmpdir({ git: true })
  return WithInstance.provide({
    directory: tmp.path,
    fn: () => fn(tmp.path),
  })
}
|
|
|
|
/** Creates `dir` (if needed) and turns it into a git repo with one committed file. */
async function initGitRepo(dir: string) {
  await fs.mkdir(dir, { recursive: true })
  await $`git init`.cwd(dir).quiet()
  // Disable fsmonitor and commit signing so this works in minimal CI environments.
  await $`git config core.fsmonitor false`.cwd(dir).quiet()
  await $`git config commit.gpgsign false`.cwd(dir).quiet()
  // Commits need an identity; use a throwaway one.
  await $`git config user.email "test@opencode.test"`.cwd(dir).quiet()
  await $`git config user.name "Test"`.cwd(dir).quiet()
  await fs.writeFile(path.join(dir, "tracked.txt"), "base\n")
  await $`git add tracked.txt`.cwd(dir).quiet()
  await $`git commit -m "base"`.cwd(dir).quiet()
}
|
|
|
|
// Thin promise-returning wrappers around the Workspace service API so plain
// `test` blocks (outside the Effect runtime) can call it directly.
const runWorkspace = <A, E>(effect: Effect.Effect<A, E, Workspace.Service>) => AppRuntime.runPromise(effect)

const createWorkspace = (input: Workspace.CreateInput) =>
  runWorkspace(Workspace.Service.use((workspace) => workspace.create(input)))

const warpWorkspaceSession = (input: Workspace.SessionWarpInput) =>
  runWorkspace(Workspace.Service.use((workspace) => workspace.sessionWarp(input)))

const listWorkspaces = (project: Parameters<Workspace.Interface["list"]>[0]) =>
  runWorkspace(Workspace.Service.use((workspace) => workspace.list(project)))

const syncListWorkspaces = (project: Parameters<Workspace.Interface["syncList"]>[0]) =>
  runWorkspace(Workspace.Service.use((workspace) => workspace.syncList(project)))

const getWorkspace = (id: WorkspaceID) => runWorkspace(Workspace.Service.use((workspace) => workspace.get(id)))

const removeWorkspace = (id: WorkspaceID) => runWorkspace(Workspace.Service.use((workspace) => workspace.remove(id)))

const workspaceStatus = () => runWorkspace(Workspace.Service.use((workspace) => workspace.status()))

const isWorkspaceSyncing = (id: WorkspaceID) =>
  runWorkspace(Workspace.Service.use((workspace) => workspace.isSyncing(id)))

const startWorkspaceSyncing = (projectID: ProjectID) => {
  // Fire-and-forget: syncing runs in the background; results are observed
  // through workspaceStatus()/waitForWorkspaceSync rather than this promise.
  void runWorkspace(Workspace.Service.use((workspace) => workspace.startWorkspaceSyncing(projectID)))
}

const waitForWorkspaceSync = (workspaceID: WorkspaceID, state: Record<string, number>, signal?: AbortSignal) =>
  runWorkspace(Workspace.Service.use((workspace) => workspace.waitForSync(workspaceID, state, signal)))
|
|
|
|
function captureGlobalEvents() {
|
|
const events: GlobalEvent[] = []
|
|
const handler = (event: GlobalEvent) => events.push(event)
|
|
GlobalBus.on("event", handler)
|
|
return {
|
|
events,
|
|
dispose() {
|
|
GlobalBus.off("event", handler)
|
|
},
|
|
}
|
|
}
|
|
|
|
async function eventually<T>(fn: () => T | Promise<T>, timeout = 1500) {
|
|
const started = Date.now()
|
|
let last: unknown
|
|
while (Date.now() - started < timeout) {
|
|
try {
|
|
return await fn()
|
|
} catch (err) {
|
|
last = err
|
|
await delay(10)
|
|
}
|
|
}
|
|
throw last ?? new Error("Timed out waiting for condition")
|
|
}
|
|
|
|
/**
 * Effect analogue of `eventually`: re-runs `effect` (with 10ms sleeps) until
 * it succeeds or `timeout` ms elapse. On timeout the last failure cause is
 * thrown synchronously inside the generator, surfacing as a defect.
 */
function eventuallyEffect(effect: Effect.Effect<void>, timeout = 1500) {
  return Effect.gen(function* () {
    const started = Date.now()
    let last: unknown
    while (Date.now() - started < timeout) {
      // Capture the exit instead of letting a failure abort the retry loop.
      const exit = yield* Effect.exit(effect)
      if (exit._tag === "Success") return
      last = exit.cause
      yield* Effect.sleep("10 millis")
    }
    throw last ?? new Error("Timed out waiting for condition")
  })
}
|
|
|
|
function recordedAdapter(input: {
|
|
target: (info: WorkspaceInfo) => Target | Promise<Target>
|
|
configure?: (info: WorkspaceInfo) => WorkspaceInfo | Promise<WorkspaceInfo>
|
|
create?: (info: WorkspaceInfo, env: Record<string, string | undefined>, from?: WorkspaceInfo) => Promise<void>
|
|
list?: () => Omit<WorkspaceInfo, "id">[] | Promise<Omit<WorkspaceInfo, "id">[]>
|
|
remove?: (info: WorkspaceInfo) => Promise<void>
|
|
}): RecordedAdapter {
|
|
const calls: RecordedAdapter["calls"] = {
|
|
configure: [],
|
|
create: [],
|
|
list: 0,
|
|
remove: [],
|
|
target: [],
|
|
}
|
|
|
|
return {
|
|
calls,
|
|
adapter: {
|
|
name: "recorded",
|
|
description: "recorded",
|
|
configure(info) {
|
|
calls.configure.push(structuredClone(info))
|
|
return input.configure?.(info) ?? info
|
|
},
|
|
async create(info, env, from) {
|
|
calls.create.push({
|
|
info: structuredClone(info),
|
|
env: { ...env },
|
|
from: from ? structuredClone(from) : undefined,
|
|
})
|
|
await input.create?.(info, env, from)
|
|
},
|
|
...(input.list
|
|
? {
|
|
async list() {
|
|
calls.list += 1
|
|
return input.list?.() ?? []
|
|
},
|
|
}
|
|
: {}),
|
|
async remove(info) {
|
|
calls.remove.push(structuredClone(info))
|
|
await input.remove?.(info)
|
|
},
|
|
target(info) {
|
|
calls.target.push(structuredClone(info))
|
|
return input.target(info)
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
function localAdapter(dir: string, input?: { createDir?: boolean; remove?: (info: WorkspaceInfo) => Promise<void> }) {
|
|
return recordedAdapter({
|
|
configure(info) {
|
|
return { ...info, directory: dir }
|
|
},
|
|
async create() {
|
|
if (input?.createDir === false) return
|
|
await fs.mkdir(dir, { recursive: true })
|
|
},
|
|
remove: input?.remove,
|
|
target() {
|
|
return { type: "local", directory: dir }
|
|
},
|
|
})
|
|
}
|
|
|
|
function remoteAdapter(url: string, input?: { directory?: string | null; headers?: HeadersInit }) {
|
|
return recordedAdapter({
|
|
configure(info) {
|
|
return { ...info, directory: input?.directory ?? info.directory }
|
|
},
|
|
target() {
|
|
return { type: "remote", url, headers: input?.headers }
|
|
},
|
|
})
|
|
}
|
|
|
|
function eventStreamResponse(events: unknown[] = [], keepOpen = true) {
|
|
const encoder = new TextEncoder()
|
|
return new Response(
|
|
new ReadableStream<Uint8Array>({
|
|
start(controller) {
|
|
if (keepOpen) controller.enqueue(encoder.encode(":\n\n"))
|
|
events.forEach((event) => controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`)))
|
|
if (!keepOpen) controller.close()
|
|
},
|
|
}),
|
|
{ status: 200, headers: { "content-type": "text/event-stream" } },
|
|
)
|
|
}
|
|
|
|
function serverUrl() {
|
|
return Effect.gen(function* () {
|
|
return HttpServer.formatAddress((yield* HttpServer.HttpServer).address)
|
|
})
|
|
}
|
|
|
|
function workspaceInfo(projectID: ProjectID, type: string, input?: Partial<Workspace.Info>): Workspace.Info {
|
|
return {
|
|
id: input?.id ?? WorkspaceID.ascending(),
|
|
type,
|
|
name: input?.name ?? unique("workspace"),
|
|
branch: input?.branch ?? null,
|
|
directory: input?.directory ?? null,
|
|
extra: input?.extra ?? null,
|
|
projectID,
|
|
timeUsed: input?.timeUsed ?? Date.now(),
|
|
}
|
|
}
|
|
|
|
function insertWorkspace(info: Workspace.Info) {
|
|
Database.use((db) =>
|
|
db
|
|
.insert(WorkspaceTable)
|
|
.values({
|
|
id: info.id,
|
|
type: info.type,
|
|
branch: info.branch,
|
|
name: info.name,
|
|
directory: info.directory,
|
|
extra: info.extra,
|
|
project_id: info.projectID,
|
|
time_used: info.timeUsed,
|
|
})
|
|
.run(),
|
|
)
|
|
}
|
|
|
|
function insertProject(id: ProjectID, worktree: string) {
|
|
Database.use((db) =>
|
|
db
|
|
.insert(ProjectTable)
|
|
.values({
|
|
id,
|
|
worktree,
|
|
vcs: null,
|
|
name: null,
|
|
time_created: Date.now(),
|
|
time_updated: Date.now(),
|
|
sandboxes: [],
|
|
})
|
|
.run(),
|
|
)
|
|
}
|
|
|
|
function attachSessionToWorkspace(sessionID: SessionID, workspaceID: WorkspaceID) {
|
|
Database.use((db) =>
|
|
db.update(SessionTable).set({ workspace_id: workspaceID }).where(eq(SessionTable.id, sessionID)).run(),
|
|
)
|
|
}
|
|
|
|
function sessionSequence(sessionID: SessionID) {
|
|
return Database.use((db) =>
|
|
db
|
|
.select({ seq: EventSequenceTable.seq })
|
|
.from(EventSequenceTable)
|
|
.where(eq(EventSequenceTable.aggregate_id, sessionID))
|
|
.get(),
|
|
)?.seq
|
|
}
|
|
|
|
function sessionSequenceOwner(sessionID: SessionID) {
|
|
return Database.use((db) =>
|
|
db
|
|
.select({ ownerID: EventSequenceTable.owner_id })
|
|
.from(EventSequenceTable)
|
|
.where(eq(EventSequenceTable.aggregate_id, sessionID))
|
|
.get(),
|
|
)?.ownerID
|
|
}
|
|
|
|
function sessionUpdatedType() {
|
|
return SyncEvent.versionedType(SessionNs.Event.Updated.type, SessionNs.Event.Updated.version)
|
|
}
|
|
|
|
describe("workspace schemas and exports", () => {
  test("keeps the historical event type names", () => {
    // These strings are part of the persisted/public event contract.
    expect(Workspace.Event.Ready.type).toBe("workspace.ready")
    expect(Workspace.Event.Failed.type).toBe("workspace.failed")
    expect(Workspace.Event.Status.type).toBe("workspace.status")
  })

  test("validates create input with workspace id, project id, branch, type, and extra", () => {
    const input = {
      id: WorkspaceID.ascending("wrk_schema_create"),
      type: "worktree",
      branch: "feature/schema",
      projectID: ProjectID.make("project-schema"),
      extra: { nested: true },
    }

    // Valid input round-trips; malformed ids and non-string branches are rejected.
    expect(Workspace.CreateInput.zod.parse(input)).toEqual(input)
    expect(() => Workspace.CreateInput.zod.parse({ ...input, id: "bad" })).toThrow()
    expect(() => Workspace.CreateInput.zod.parse({ ...input, branch: 1 })).toThrow()
  })
})
|
|
|
|
describe("workspace CRUD", () => {
|
|
test("get returns undefined for a missing workspace", async () => {
  await withInstance(async () => {
    // Lookup of a never-inserted id resolves to undefined rather than throwing.
    expect(await getWorkspace(WorkspaceID.ascending("wrk_missing_get"))).toBeUndefined()
  })
})
|
|
|
|
test("list maps database rows, filters by project, and sorts by id", async () => {
  await withInstance(async () => {
    // A second project whose workspace must NOT appear in the listing.
    const otherProjectID = ProjectID.make("project-other")
    insertProject(otherProjectID, "/tmp/other")
    const a = workspaceInfo(Instance.project.id, "manual", {
      id: WorkspaceID.ascending("wrk_a_list"),
      branch: "a",
      directory: "/a",
      extra: { a: true },
    })
    const b = workspaceInfo(Instance.project.id, "manual", {
      id: WorkspaceID.ascending("wrk_b_list"),
      branch: "b",
      directory: "/b",
      extra: ["b"],
    })
    const other = workspaceInfo(otherProjectID, "manual", { id: WorkspaceID.ascending("wrk_c_list") })
    // Insert out of id order to prove list sorts by id, not insertion order.
    insertWorkspace(b)
    insertWorkspace(other)
    insertWorkspace(a)

    expect(await listWorkspaces(Instance.project)).toEqual([a, b])
  })
})
|
|
|
|
test("create configures, persists, creates, starts local sync, and passes environment", async () => {
  await withInstance(async (dir) => {
    // Credentials and OTEL settings that create() must forward to the adapter.
    process.env.OPENCODE_AUTH_CONTENT = JSON.stringify({ test: { type: "api", key: "secret" } })
    process.env.OTEL_EXPORTER_OTLP_HEADERS = "authorization=otel"
    process.env.OTEL_EXPORTER_OTLP_ENDPOINT = "https://otel.test"
    process.env.OTEL_RESOURCE_ATTRIBUTES = "service.name=opencode-test"

    const workspaceID = WorkspaceID.ascending("wrk_create_local")
    const type = unique("create-local")
    const targetDir = path.join(dir, "created-local")
    const recorded = recordedAdapter({
      // configure() may rewrite every user-visible field; create() must persist the result.
      configure(info) {
        return {
          ...info,
          branch: "configured-branch",
          name: "Configured Name",
          directory: targetDir,
          extra: { configured: true },
        }
      },
      async create() {
        await fs.mkdir(targetDir, { recursive: true })
      },
      target() {
        return { type: "local", directory: targetDir }
      },
    })
    registerAdapter(Instance.project.id, type, recorded.adapter)

    const info = await createWorkspace({
      id: workspaceID,
      type,
      branch: null,
      projectID: Instance.project.id,
      extra: null,
    })

    // The returned info reflects configure()'s rewrites, not the raw input.
    expect(info).toEqual({
      id: workspaceID,
      type,
      branch: "configured-branch",
      name: "Configured Name",
      directory: targetDir,
      extra: { configured: true },
      projectID: Instance.project.id,
      timeUsed: info.timeUsed,
    })
    expect(await getWorkspace(workspaceID)).toEqual(info)
    expect(await listWorkspaces(Instance.project)).toEqual([info])
    // configure() saw the raw (pre-rewrite) info; create() saw the configured one.
    expect(recorded.calls.configure).toHaveLength(1)
    expect(recorded.calls.configure[0]).toMatchObject({ id: workspaceID, type, directory: null })
    expect(recorded.calls.create).toHaveLength(1)
    expect(recorded.calls.create[0].info).toEqual({
      id: workspaceID,
      type,
      branch: "configured-branch",
      name: "Configured Name",
      directory: targetDir,
      extra: { configured: true },
      projectID: Instance.project.id,
    })
    // Auth and OTEL environment must be passed through to the adapter's create().
    expect(JSON.parse(recorded.calls.create[0].env.OPENCODE_AUTH_CONTENT ?? "{}")).toEqual({
      test: { type: "api", key: "secret" },
    })
    expect(recorded.calls.create[0].env.OPENCODE_WORKSPACE_ID).toBe(workspaceID)
    expect(recorded.calls.create[0].env.OPENCODE_EXPERIMENTAL_WORKSPACES).toBe("true")
    expect(recorded.calls.create[0].env.OTEL_EXPORTER_OTLP_HEADERS).toBe("authorization=otel")
    expect(recorded.calls.create[0].env.OTEL_EXPORTER_OTLP_ENDPOINT).toBe("https://otel.test")
    expect(recorded.calls.create[0].env.OTEL_RESOURCE_ATTRIBUTES).toBe("service.name=opencode-test")
    // Local workspaces start syncing immediately and report as connected.
    expect((await workspaceStatus()).find((item) => item.workspaceID === workspaceID)?.status).toBe("connected")

    // Removal clears the status entry again.
    await removeWorkspace(workspaceID)
    expect((await workspaceStatus()).find((item) => item.workspaceID === workspaceID)?.status).toBeUndefined()
  })
})
|
|
|
|
test("create propagates configure failures and does not insert a workspace", async () => {
  await withInstance(async () => {
    const type = unique("configure-failure")
    registerAdapter(
      Instance.project.id,
      type,
      recordedAdapter({
        configure() {
          throw new Error("configure exploded")
        },
        target() {
          return { type: "local", directory: "/unused" }
        },
      }).adapter,
    )

    // configure() runs before the insert, so a failure leaves no row behind.
    await expect(
      createWorkspace({ type, branch: null, projectID: Instance.project.id, extra: null }),
    ).rejects.toThrow("configure exploded")
    expect(await listWorkspaces(Instance.project)).toEqual([])
  })
})
|
|
|
|
test("create leaves the inserted row when adapter create fails", async () => {
  await withInstance(async () => {
    const type = unique("create-failure")
    const recorded = recordedAdapter({
      async create() {
        throw new Error("create exploded")
      },
      target() {
        return { type: "local", directory: "/unused" }
      },
    })
    registerAdapter(Instance.project.id, type, recorded.adapter)

    await expect(
      createWorkspace({ type, branch: "branch", projectID: Instance.project.id, extra: { x: 1 } }),
    ).rejects.toThrow("create exploded")

    // The row is inserted before adapter.create() runs, so it survives the failure...
    const rows = await listWorkspaces(Instance.project)
    expect(rows).toHaveLength(1)
    expect(rows[0]).toMatchObject({ type, branch: "branch", extra: { x: 1 } })
    // ...but syncing never started, so target() was never resolved.
    expect(recorded.calls.target).toHaveLength(0)
    await removeWorkspace(rows[0].id)
  })
})
|
|
|
|
test("create returns after a local workspace reports error", async () => {
  await withInstance(async (dir) => {
    const type = unique("local-error")
    // The adapter reports a directory it never creates, so local sync must fail.
    const missing = path.join(dir, "missing-local-target")
    const recorded = localAdapter(missing, { createDir: false })
    registerAdapter(Instance.project.id, type, recorded.adapter)

    const info = await createWorkspace({ type, branch: null, projectID: Instance.project.id, extra: null })

    // create() still resolves; the failure is surfaced via status, not a throw.
    expect(info.directory).toBe(missing)
    expect((await workspaceStatus()).find((item) => item.workspaceID === info.id)?.status).toBe("error")
    await removeWorkspace(info.id)
  })
})
|
|
|
|
test("syncList registers adapter-listed workspaces that are missing by name", async () => {
  await withInstance(async (dir) => {
    const type = unique("list-sync")
    const existing = workspaceInfo(Instance.project.id, type, {
      id: WorkspaceID.ascending("wrk_list_sync_existing"),
      name: "existing",
      directory: path.join(dir, "existing"),
    })
    insertWorkspace(existing)

    const discovered = {
      type,
      name: "discovered",
      branch: "feature/discovered",
      directory: path.join(dir, "discovered"),
      extra: { source: "adapter" },
      projectID: Instance.project.id,
    }
    const recorded = recordedAdapter({
      list() {
        return [
          // Matches `existing` by name, so its divergent fields must be ignored.
          {
            type,
            name: existing.name,
            branch: "ignored",
            directory: path.join(dir, "ignored"),
            extra: null,
            projectID: Instance.project.id,
          },
          discovered,
        ]
      },
      target(info) {
        return { type: "local", directory: info.directory ?? dir }
      },
    })
    registerAdapter(Instance.project.id, type, recorded.adapter)

    await syncListWorkspaces(Instance.project)
    const synced = (await listWorkspaces(Instance.project)).filter((item) => item.name === discovered.name)

    // Only the unknown name was registered, with a freshly generated workspace id.
    expect(synced).toHaveLength(1)
    expect(synced[0]).toMatchObject(discovered)
    expect(synced[0]?.id).toStartWith("wrk_")
    expect(await listWorkspaces(Instance.project)).toEqual(expect.arrayContaining([existing, synced[0]]))
    expect(recorded.calls.list).toBe(1)
    // syncList registers rows without calling configure/create; it only resolves targets.
    expect(recorded.calls.configure).toHaveLength(0)
    expect(recorded.calls.create).toHaveLength(0)
    expect(recorded.calls.target).toHaveLength(1)
  })
})
|
|
|
|
test("syncList calls every registered adapter with a list method", async () => {
  await withInstance(async (dir) => {
    const typeA = unique("list-sync-a")
    const typeB = unique("list-sync-b")
    const adapterA = recordedAdapter({
      list() {
        return [
          {
            type: typeA,
            name: "adapter-a",
            branch: null,
            directory: path.join(dir, "adapter-a"),
            extra: null,
            projectID: Instance.project.id,
          },
        ]
      },
      target(info) {
        return { type: "local", directory: info.directory ?? dir }
      },
    })
    const adapterB = recordedAdapter({
      list() {
        return [
          {
            type: typeB,
            name: "adapter-b",
            branch: null,
            directory: path.join(dir, "adapter-b"),
            extra: null,
            projectID: Instance.project.id,
          },
        ]
      },
      target(info) {
        return { type: "local", directory: info.directory ?? dir }
      },
    })
    // Adapter without a list() method: syncList must skip it entirely.
    const noList = recordedAdapter({
      target() {
        return { type: "local", directory: dir }
      },
    })
    registerAdapter(Instance.project.id, typeA, adapterA.adapter)
    registerAdapter(Instance.project.id, typeB, adapterB.adapter)
    registerAdapter(Instance.project.id, unique("list-sync-none"), noList.adapter)

    await syncListWorkspaces(Instance.project)
    const synced = await listWorkspaces(Instance.project)

    expect(
      synced
        .filter((item) => item.type === typeA || item.type === typeB)
        .map((item) => item.name)
        .toSorted(),
    ).toEqual(["adapter-a", "adapter-b"])
    expect(adapterA.calls.list).toBe(1)
    expect(adapterB.calls.list).toBe(1)
    expect(noList.calls.list).toBe(0)
  })
})
|
|
|
|
it.live("remote create connects to routed event and history endpoints", () => {
  const calls: FetchCall[] = []
  return Effect.gen(function* () {
    // Minimal remote workspace server: SSE event stream plus empty sync history.
    yield* HttpServer.serveEffect()(
      Effect.gen(function* () {
        const req = yield* HttpServerRequest.HttpServerRequest
        const bodyText = yield* req.text
        const call = {
          url: new URL(req.url, "http://localhost"),
          method: req.method,
          headers: new Headers(req.headers),
          bodyText,
          json: bodyText ? JSON.parse(bodyText) : undefined,
        }
        calls.push(call)
        if (call.url.pathname === "/base/global/event")
          return HttpServerResponse.fromWeb(eventStreamResponse([], false))
        if (call.url.pathname === "/base/sync/history") return yield* HttpServerResponse.json([])
        return HttpServerResponse.text("unexpected", { status: 500 })
      }),
    )
    const url = yield* serverUrl()
    yield* provideTmpdirInstance(
      (dir) =>
        Effect.gen(function* () {
          const workspace = yield* Workspace.Service
          const type = unique("remote-create")
          // Query string and fragment on the base URL must be stripped when routing.
          const recorded = remoteAdapter(`${url}/base/?ignored=1#hash`, { directory: dir })
          registerAdapter(Instance.project.id, type, recorded.adapter)

          const info = yield* workspace.create({ type, branch: null, projectID: Instance.project.id, extra: null })

          // Exactly one event subscription then one history fetch, both under /base.
          expect(
            calls.map((call) => `${call.method} ${call.url.pathname}${call.url.search}${call.url.hash}`),
          ).toEqual(["GET /base/global/event", "POST /base/sync/history"])
          expect(calls[1].json).toEqual({})
          expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBe("connected")
          expect(yield* workspace.isSyncing(info.id)).toBe(true)

          // Removing the workspace tears down syncing and clears its status.
          yield* workspace.remove(info.id)
          expect(yield* workspace.isSyncing(info.id)).toBe(false)
          expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBeUndefined()
        }),
      { git: true },
    )
  })
})
|
|
|
|
test("remove returns undefined for a missing workspace", async () => {
  await withInstance(async () => {
    // Removing a nonexistent id is a no-op rather than an error.
    expect(await removeWorkspace(WorkspaceID.ascending("wrk_missing_remove"))).toBeUndefined()
  })
})
|
|
|
|
test("remove deletes the workspace, associated sessions, adapter resources, and status", async () => {
  await withInstance(async (dir) => {
    const type = unique("remove-local")
    const recorded = localAdapter(path.join(dir, "remove-local"))
    registerAdapter(Instance.project.id, type, recorded.adapter)
    const info = await createWorkspace({ type, branch: null, projectID: Instance.project.id, extra: null })
    // Two sessions attached to the workspace must be deleted along with it.
    const one = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))
    const two = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))
    attachSessionToWorkspace(one.id, info.id)
    attachSessionToWorkspace(two.id, info.id)

    const removed = await removeWorkspace(info.id)

    expect(removed).toEqual(info)
    expect(await getWorkspace(info.id)).toBeUndefined()
    // The adapter was asked to clean up exactly this workspace.
    expect(recorded.calls.remove).toEqual([info])
    expect((await workspaceStatus()).find((item) => item.workspaceID === info.id)?.status).toBeUndefined()
    expect(
      Database.use((db) =>
        db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, info.id)).all(),
      ),
    ).toEqual([])
  })
})
|
|
|
|
test("remove still deletes the row when the adapter cannot remove resources", async () => {
  await withInstance(async () => {
    const type = unique("remove-throws")
    const info = workspaceInfo(Instance.project.id, type, { id: WorkspaceID.ascending("wrk_remove_throws") })
    registerAdapter(
      Instance.project.id,
      type,
      recordedAdapter({
        async remove() {
          throw new Error("remove exploded")
        },
        target() {
          return { type: "local", directory: "/unused" }
        },
      }).adapter,
    )
    insertWorkspace(info)

    // The adapter failure is tolerated; the database row is removed regardless.
    expect(await removeWorkspace(info.id)).toEqual(info)
    expect(await getWorkspace(info.id)).toBeUndefined()
  })
})
|
|
|
|
test("sessionWarp moves a session into a local workspace and claims ownership", async () => {
  await withInstance(async (dir) => {
    const previousType = unique("warp-prev-local")
    const targetType = unique("warp-target-local")
    const previous = workspaceInfo(Instance.project.id, previousType)
    const target = workspaceInfo(Instance.project.id, targetType)
    insertWorkspace(previous)
    insertWorkspace(target)
    registerAdapter(Instance.project.id, previousType, localAdapter(path.join(dir, "warp-prev-local")).adapter)
    registerAdapter(Instance.project.id, targetType, localAdapter(path.join(dir, "warp-target-local")).adapter)
    const session = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))
    attachSessionToWorkspace(session.id, previous.id)

    await warpWorkspaceSession({ workspaceID: target.id, sessionID: session.id })

    // The session row now points at the target workspace...
    expect(
      Database.use((db) =>
        db
          .select({ workspaceID: SessionTable.workspace_id })
          .from(SessionTable)
          .where(eq(SessionTable.id, session.id))
          .get(),
      )?.workspaceID,
    ).toBe(target.id)
    // ...and the target also owns the session's event sequence.
    expect(sessionSequenceOwner(session.id)).toBe(target.id)
  })
})
|
|
|
|
test("sessionWarp applies source workspace patch to local target workspace", async () => {
  await withInstance(async (dir) => {
    const previousType = unique("warp-patch-prev-local")
    const targetType = unique("warp-patch-target-local")
    const previousDir = path.join(dir, "warp-patch-prev-local")
    const targetDir = path.join(dir, "warp-patch-target-local")
    await initGitRepo(previousDir)
    await initGitRepo(targetDir)
    // Uncommitted changes in the source repo: one modified and one new file.
    await fs.writeFile(path.join(previousDir, "tracked.txt"), "changed\n")
    await fs.writeFile(path.join(previousDir, "new.txt"), "new\n")

    const previous = workspaceInfo(Instance.project.id, previousType)
    const target = workspaceInfo(Instance.project.id, targetType)
    insertWorkspace(previous)
    insertWorkspace(target)
    registerAdapter(Instance.project.id, previousType, localAdapter(previousDir, { createDir: false }).adapter)
    registerAdapter(Instance.project.id, targetType, localAdapter(targetDir, { createDir: false }).adapter)
    const session = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))
    attachSessionToWorkspace(session.id, previous.id)

    await warpWorkspaceSession({ workspaceID: target.id, sessionID: session.id, copyChanges: true })

    // copyChanges replays the source's working-tree diff onto the target.
    expect(await fs.readFile(path.join(targetDir, "tracked.txt"), "utf8")).toBe("changed\n")
    expect(await fs.readFile(path.join(targetDir, "new.txt"), "utf8")).toBe("new\n")
  })
})
|
|
|
|
test("sessionWarp detaches a session to the local project and claims project ownership", async () => {
  await withInstance(async (dir) => {
    const previousType = unique("warp-detach-local")
    const previous = workspaceInfo(Instance.project.id, previousType)
    insertWorkspace(previous)
    registerAdapter(Instance.project.id, previousType, localAdapter(path.join(dir, "warp-detach-local")).adapter)
    const session = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))
    attachSessionToWorkspace(session.id, previous.id)

    // A null workspaceID detaches the session from any workspace.
    await warpWorkspaceSession({ workspaceID: null, sessionID: session.id })

    expect(
      Database.use((db) =>
        db
          .select({ workspaceID: SessionTable.workspace_id })
          .from(SessionTable)
          .where(eq(SessionTable.id, session.id))
          .get(),
      )?.workspaceID,
    ).toBeNull()
    // Ownership of the event sequence falls back to the project itself.
    expect(sessionSequenceOwner(session.id)).toBe(Instance.project.id)
  })
})
|
|
|
|
test("sessionWarp detaches to the source project when invoked from a workspace instance", async () => {
  await withInstance(async () => {
    const projectID = Instance.project.id
    await using workspaceTmp = await tmpdir({ git: true })
    const previousType = unique("warp-detach-workspace-instance")
    const previous = workspaceInfo(projectID, previousType)
    insertWorkspace(previous)
    registerAdapter(projectID, previousType, localAdapter(workspaceTmp.path, { createDir: false }).adapter)
    const session = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))
    attachSessionToWorkspace(session.id, previous.id)

    // Warp from a second instance rooted at the workspace directory, which
    // resolves to a different project id than the session's source project.
    const workspaceProjectID = await WithInstance.provide({
      directory: workspaceTmp.path,
      fn: async () => {
        const id = Instance.project.id
        expect(id).not.toBe(projectID)
        await warpWorkspaceSession({ workspaceID: null, sessionID: session.id })
        return id
      },
    })

    expect(
      Database.use((db) =>
        db
          .select({ workspaceID: SessionTable.workspace_id })
          .from(SessionTable)
          .where(eq(SessionTable.id, session.id))
          .get(),
      )?.workspaceID,
    ).toBeNull()
    // Ownership goes to the session's original project, not the invoking instance's.
    expect(sessionSequenceOwner(session.id)).toBe(projectID)
    expect(sessionSequenceOwner(session.id)).not.toBe(workspaceProjectID)
  })
})
|
|
|
|
it.live("sessionWarp syncs previous remote history, replays it, steals, and claims the sequence", () => {
  // Every request to the mock remote is recorded here so the exact call
  // order of the warp pipeline can be asserted afterwards.
  const calls: FetchCall[] = []
  let historySessionID: SessionID | undefined
  let historyNextSeq = 0
  return Effect.gen(function* () {
    // Mock remote serving both the warp SOURCE ("/warp-source") and the warp
    // TARGET ("/warp-target") endpoints.
    yield* HttpServer.serveEffect()(
      Effect.gen(function* () {
        const req = yield* HttpServerRequest.HttpServerRequest
        const bodyText = yield* req.text
        const call = {
          url: new URL(req.url, "http://localhost"),
          method: req.method,
          headers: new Headers(req.headers),
          bodyText,
          json: bodyText ? JSON.parse(bodyText) : undefined,
        }
        calls.push(call)
        // Source history: one session.updated event one seq past the local
        // sequence for the session created below.
        if (call.url.pathname === "/warp-source/sync/history") {
          return yield* HttpServerResponse.json([
            {
              id: `evt_${unique("warp-source-history")}`,
              aggregate_id: historySessionID!,
              seq: historyNextSeq,
              type: sessionUpdatedType(),
              data: { sessionID: historySessionID!, info: { title: "from source history" } },
            },
          ])
        }
        if (call.url.pathname === "/warp-source/vcs/diff/raw") return HttpServerResponse.text("remote patch")
        if (call.url.pathname === "/warp-target/sync/replay")
          return yield* HttpServerResponse.json({ sessionID: "ok" })
        if (call.url.pathname === "/warp-target/sync/steal")
          return yield* HttpServerResponse.json({ sessionID: "ok" })
        if (call.url.pathname === "/warp-target/vcs/apply") return yield* HttpServerResponse.json({ applied: true })
        return HttpServerResponse.text("unexpected", { status: 500 })
      }),
    )
    const url = yield* serverUrl()
    yield* provideTmpdirInstance(
      () =>
        Effect.gen(function* () {
          const workspace = yield* Workspace.Service
          const sessionSvc = yield* SessionNs.Service
          const previousType = unique("warp-remote-source")
          const targetType = unique("warp-remote-target")
          const previous = workspaceInfo(Instance.project.id, previousType)
          const target = workspaceInfo(Instance.project.id, targetType, { directory: "remote-target-dir" })
          insertWorkspace(previous)
          insertWorkspace(target)
          registerAdapter(Instance.project.id, previousType, remoteAdapter(`${url}/warp-source`).adapter)
          registerAdapter(Instance.project.id, targetType, remoteAdapter(`${url}/warp-target`).adapter)
          const session = yield* sessionSvc.create({})
          attachSessionToWorkspace(session.id, previous.id)
          historySessionID = session.id
          // The mock's history event is placed one past the current sequence.
          historyNextSeq = (sessionSequence(session.id) ?? -1) + 1

          yield* workspace.sessionWarp({ workspaceID: target.id, sessionID: session.id, copyChanges: true })

          // The warp pipeline: pull source history, pull source diff, apply
          // the diff on the target, replay events, then steal the sequence.
          expect(calls.map((call) => `${call.method} ${call.url.pathname}`)).toEqual([
            "POST /warp-source/sync/history",
            "GET /warp-source/vcs/diff/raw",
            "POST /warp-target/vcs/apply",
            "POST /warp-target/sync/replay",
            "POST /warp-target/sync/steal",
          ])
          // History request carried the local sequence fence.
          expect(calls[0].json).toEqual({ [session.id]: historyNextSeq - 1 })
          // The source diff was forwarded verbatim to the target.
          expect(calls[2].json).toEqual({ patch: "remote patch" })
          // Replay contained the original Created event plus the history event.
          expect(calls[3].json).toMatchObject({
            directory: "remote-target-dir",
            events: [
              {
                aggregateID: session.id,
                seq: 0,
                type: SyncEvent.versionedType(SessionNs.Event.Created.type, SessionNs.Event.Created.version),
              },
              {
                aggregateID: session.id,
                seq: historyNextSeq,
                type: sessionUpdatedType(),
              },
            ],
          })
          expect(calls[4].json).toEqual({ sessionID: session.id })
          // The replayed history was applied locally too...
          expect((yield* sessionSvc.get(session.id)).title).toBe("from source history")
          // ...and the sequence is now owned by the target workspace.
          expect(sessionSequenceOwner(session.id)).toBe(target.id)
        }),
      { git: true },
    )
  })
})
|
|
})
|
|
|
|
describe("workspace sync state", () => {
|
|
test("startWorkspaceSyncing is disabled by the experimental workspace flag", async () => {
|
|
await withInstance(async (dir) => {
|
|
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = false
|
|
const type = unique("flag-disabled")
|
|
const info = workspaceInfo(Instance.project.id, type)
|
|
const session = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))
|
|
attachSessionToWorkspace(session.id, info.id)
|
|
insertWorkspace(info)
|
|
registerAdapter(Instance.project.id, type, localAdapter(path.join(dir, "flag-disabled")).adapter)
|
|
|
|
startWorkspaceSyncing(Instance.project.id)
|
|
await delay(25)
|
|
|
|
expect((await workspaceStatus()).find((item) => item.workspaceID === info.id)?.status).toBeUndefined()
|
|
})
|
|
})
|
|
|
|
test("startWorkspaceSyncing starts all workspaces", async () => {
|
|
await withInstance(async (dir) => {
|
|
const firstType = unique("first")
|
|
const secondType = unique("second")
|
|
const first = workspaceInfo(Instance.project.id, firstType)
|
|
const second = workspaceInfo(Instance.project.id, secondType)
|
|
await fs.mkdir(path.join(dir, "first"), { recursive: true })
|
|
await fs.mkdir(path.join(dir, "second"), { recursive: true })
|
|
insertWorkspace(first)
|
|
insertWorkspace(second)
|
|
registerAdapter(Instance.project.id, firstType, localAdapter(path.join(dir, "first")).adapter)
|
|
registerAdapter(Instance.project.id, secondType, localAdapter(path.join(dir, "second")).adapter)
|
|
|
|
startWorkspaceSyncing(Instance.project.id)
|
|
|
|
await eventually(() =>
|
|
workspaceStatus().then((status) => {
|
|
expect(status.find((item) => item.workspaceID === first.id)?.status).toBe("connected")
|
|
expect(status.find((item) => item.workspaceID === second.id)?.status).toBe("connected")
|
|
}),
|
|
)
|
|
await removeWorkspace(first.id)
|
|
await removeWorkspace(second.id)
|
|
})
|
|
})
|
|
|
|
test("local start reports error when the target directory is missing", async () => {
|
|
await withInstance(async (dir) => {
|
|
const type = unique("missing-local")
|
|
const info = workspaceInfo(Instance.project.id, type)
|
|
insertWorkspace(info)
|
|
registerAdapter(
|
|
Instance.project.id,
|
|
type,
|
|
localAdapter(path.join(dir, "missing-target"), { createDir: false }).adapter,
|
|
)
|
|
attachSessionToWorkspace(
|
|
(await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))).id,
|
|
info.id,
|
|
)
|
|
|
|
startWorkspaceSyncing(Instance.project.id)
|
|
|
|
await eventually(() =>
|
|
workspaceStatus().then((status) =>
|
|
expect(status.find((item) => item.workspaceID === info.id)?.status).toBe("error"),
|
|
),
|
|
)
|
|
expect(await isWorkspaceSyncing(info.id)).toBe(false)
|
|
await removeWorkspace(info.id)
|
|
})
|
|
})
|
|
|
|
test("duplicate local status updates are suppressed", async () => {
  await withInstance(async (dir) => {
    // Record every GlobalBus event so Status emissions can be counted exactly.
    const captured = captureGlobalEvents()
    try {
      const type = unique("dedupe-local")
      const info = workspaceInfo(Instance.project.id, type)
      const target = path.join(dir, "dedupe-local")
      await fs.mkdir(target, { recursive: true })
      insertWorkspace(info)
      registerAdapter(Instance.project.id, type, localAdapter(target).adapter)
      attachSessionToWorkspace(
        (await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))).id,
        info.id,
      )

      // Start twice on purpose: the second call must not produce another
      // identical status event.
      startWorkspaceSyncing(Instance.project.id)
      startWorkspaceSyncing(Instance.project.id)

      await eventually(() =>
        workspaceStatus().then((status) =>
          expect(status.find((item) => item.workspaceID === info.id)?.status).toBe("connected"),
        ),
      )
      // Exactly one Status event for this workspace despite the double start.
      expect(
        captured.events.filter(
          (event) => event.workspace === info.id && event.payload.type === Workspace.Event.Status.type,
        ),
      ).toHaveLength(1)
      await removeWorkspace(info.id)
    } finally {
      captured.dispose()
    }
  })
})
|
|
|
|
it.live("remote start emits disconnected, connecting, and connected then refuses duplicate listeners", () => {
  // Every request to the mock remote is recorded so duplicate subscriptions
  // would be visible as extra HTTP calls.
  const calls: FetchCall[] = []
  return Effect.gen(function* () {
    // Mock remote: a live SSE stream plus an empty history response.
    yield* HttpServer.serveEffect()(
      Effect.gen(function* () {
        const req = yield* HttpServerRequest.HttpServerRequest
        const bodyText = yield* req.text
        const call = {
          url: new URL(req.url, "http://localhost"),
          method: req.method,
          headers: new Headers(req.headers),
          bodyText,
          json: bodyText ? JSON.parse(bodyText) : undefined,
        }
        calls.push(call)
        if (call.url.pathname === "/sync/global/event") return HttpServerResponse.fromWeb(eventStreamResponse())
        if (call.url.pathname === "/sync/sync/history") return HttpServerResponse.fromWeb(Response.json([]))
        return HttpServerResponse.text("unexpected", { status: 500 })
      }),
    )
    const url = yield* serverUrl()
    yield* provideTmpdirInstance(
      () =>
        Effect.gen(function* () {
          const workspace = yield* Workspace.Service
          const sessionSvc = yield* SessionNs.Service
          const captured = captureGlobalEvents()
          try {
            const type = unique("remote-start")
            const info = workspaceInfo(Instance.project.id, type)
            insertWorkspace(info)
            registerAdapter(Instance.project.id, type, remoteAdapter(`${url}/sync`).adapter)
            attachSessionToWorkspace((yield* sessionSvc.create({})).id, info.id)

            yield* workspace.startWorkspaceSyncing(Instance.project.id)
            yield* eventuallyEffect(
              Effect.gen(function* () {
                expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBe(
                  "connected",
                )
              }),
            )
            // A second start must be a no-op: give it a moment, then verify
            // no extra status transitions or HTTP calls appeared.
            yield* workspace.startWorkspaceSyncing(Instance.project.id)
            yield* Effect.sleep("25 millis")

            // Full status lifecycle, once.
            expect(
              captured.events
                .filter((event) => event.workspace === info.id && event.payload.type === Workspace.Event.Status.type)
                .map((event) => event.payload.properties.status),
            ).toEqual(["disconnected", "connecting", "connected"])
            // Only one SSE subscription and one history fetch were made.
            expect(calls.filter((call) => call.url.pathname === "/sync/global/event")).toHaveLength(1)
            expect(calls.filter((call) => call.url.pathname === "/sync/sync/history")).toHaveLength(1)
            expect(yield* workspace.isSyncing(info.id)).toBe(true)

            // Removing the workspace stops its sync loop.
            yield* workspace.remove(info.id)
            expect(yield* workspace.isSyncing(info.id)).toBe(false)
          } finally {
            captured.dispose()
          }
        }),
      { git: true },
    )
  })
})
|
|
|
|
it.live("remote connection HTTP failures set error and clear syncing", () =>
  Effect.gen(function* () {
    // Mock remote: the SSE endpoint itself fails with 503; every other path
    // answers with an empty JSON array.
    yield* HttpServer.serveEffect()(
      Effect.gen(function* () {
        const req = yield* HttpServerRequest.HttpServerRequest
        if (new URL(req.url, "http://localhost").pathname === "/failed/global/event")
          return HttpServerResponse.text("nope", { status: 503 })
        return HttpServerResponse.fromWeb(Response.json([]))
      }),
    )
    const url = yield* serverUrl()
    yield* provideTmpdirInstance(
      () =>
        Effect.gen(function* () {
          const workspace = yield* Workspace.Service
          const sessionSvc = yield* SessionNs.Service
          const type = unique("remote-connect-fail")
          const info = workspaceInfo(Instance.project.id, type)
          insertWorkspace(info)
          registerAdapter(Instance.project.id, type, remoteAdapter(`${url}/failed`).adapter)
          attachSessionToWorkspace((yield* sessionSvc.create({})).id, info.id)

          yield* workspace.startWorkspaceSyncing(Instance.project.id)

          // The failed connection must surface as an "error" status...
          yield* eventuallyEffect(
            Effect.gen(function* () {
              expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBe("error")
            }),
          )
          // ...and the workspace must not be left flagged as syncing.
          expect(yield* workspace.isSyncing(info.id)).toBe(false)
          yield* workspace.remove(info.id)
        }),
      { git: true },
    )
  }),
)
|
|
|
|
it.live("remote history HTTP failures set error", () =>
  Effect.gen(function* () {
    // Mock remote: the SSE endpoint answers, but the history fetch fails with
    // HTTP 500, which must drive the workspace into the "error" status.
    yield* HttpServer.serveEffect()(
      Effect.gen(function* () {
        const req = yield* HttpServerRequest.HttpServerRequest
        const url = new URL(req.url, "http://localhost")
        if (url.pathname === "/history-failed/global/event")
          return HttpServerResponse.fromWeb(eventStreamResponse([], false))
        if (url.pathname === "/history-failed/sync/history")
          return HttpServerResponse.text("history failed", { status: 500 })
        return HttpServerResponse.fromWeb(Response.json([]))
      }),
    )
    const url = yield* serverUrl()
    yield* provideTmpdirInstance(
      () =>
        Effect.gen(function* () {
          const workspace = yield* Workspace.Service
          const sessionSvc = yield* SessionNs.Service
          const type = unique("remote-history-fail")
          const info = workspaceInfo(Instance.project.id, type)
          insertWorkspace(info)
          registerAdapter(Instance.project.id, type, remoteAdapter(`${url}/history-failed`).adapter)
          attachSessionToWorkspace((yield* sessionSvc.create({})).id, info.id)

          yield* workspace.startWorkspaceSyncing(Instance.project.id)

          yield* eventuallyEffect(
            Effect.gen(function* () {
              expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBe("error")
            }),
          )
          // A failed history sync must also clear the syncing flag.
          expect(yield* workspace.isSyncing(info.id)).toBe(false)
          yield* workspace.remove(info.id)
        }),
      { git: true },
    )
  }),
)
|
|
|
|
it.live("sync history sends the local sequence fence and replays returned events in workspace context", () => {
  // Bodies of every /sync/history request, so the fence can be asserted.
  const historyBodies: unknown[] = []
  let historySessionID: SessionID | undefined
  let historyNextSeq = 0
  return Effect.gen(function* () {
    // Mock remote: records the history request body and answers with one
    // session.updated event one seq past the local sequence.
    yield* HttpServer.serveEffect()(
      Effect.gen(function* () {
        const req = yield* HttpServerRequest.HttpServerRequest
        const bodyText = yield* req.text
        const url = new URL(req.url, "http://localhost")
        if (url.pathname === "/history/global/event") return HttpServerResponse.fromWeb(eventStreamResponse())
        if (url.pathname === "/history/sync/history") {
          historyBodies.push(bodyText ? JSON.parse(bodyText) : undefined)
          return HttpServerResponse.fromWeb(
            Response.json([
              {
                id: `evt_${unique("history")}`,
                aggregate_id: historySessionID!,
                seq: historyNextSeq,
                type: sessionUpdatedType(),
                data: { sessionID: historySessionID!, info: { title: "from history" } },
              },
            ]),
          )
        }
        return HttpServerResponse.text("unexpected", { status: 500 })
      }),
    )
    const url = yield* serverUrl()
    yield* provideTmpdirInstance(
      () =>
        Effect.gen(function* () {
          const workspace = yield* Workspace.Service
          const sessionSvc = yield* SessionNs.Service
          const captured = captureGlobalEvents()
          try {
            const type = unique("history-replay")
            const info = workspaceInfo(Instance.project.id, type)
            insertWorkspace(info)
            registerAdapter(Instance.project.id, type, remoteAdapter(`${url}/history`).adapter)
            const session = yield* sessionSvc.create({ title: "before history" })
            attachSessionToWorkspace(session.id, info.id)
            historySessionID = session.id
            // The mock's history event sits one past the current sequence.
            historyNextSeq = (sessionSequence(session.id) ?? -1) + 1

            yield* workspace.startWorkspaceSyncing(Instance.project.id)

            // The replayed event must update the local session title.
            yield* eventuallyEffect(
              Effect.gen(function* () {
                expect((yield* sessionSvc.get(session.id).pipe(Effect.orDie)).title).toBe("from history")
              }),
            )
            // The history request body carried the local sequence fence.
            expect(historyBodies).toEqual([{ [session.id]: historyNextSeq - 1 }])
            // The replayed sync event was forwarded on the global bus,
            // attributed to this workspace.
            expect(
              captured.events.some(
                (event) =>
                  event.workspace === info.id &&
                  event.payload.type === "sync" &&
                  event.payload.syncEvent.seq === historyNextSeq,
              ),
            ).toBe(true)
            yield* workspace.remove(info.id)
          } finally {
            captured.dispose()
          }
        }),
      { git: true },
    )
  })
})
|
|
|
|
it.live("SSE forwards non-heartbeat events and ignores heartbeats", () =>
  Effect.gen(function* () {
    // Mock remote: the SSE stream emits a heartbeat followed by a custom
    // event; only the latter should reach the local GlobalBus.
    yield* HttpServer.serveEffect()(
      Effect.gen(function* () {
        const req = yield* HttpServerRequest.HttpServerRequest
        const url = new URL(req.url, "http://localhost")
        if (url.pathname === "/sse-forward/global/event")
          return HttpServerResponse.fromWeb(
            eventStreamResponse(
              [
                { directory: "remote-dir", project: "remote-project", payload: { type: "server.heartbeat" } },
                {
                  directory: "remote-dir",
                  project: "remote-project",
                  payload: { type: "custom.remote", properties: { ok: true } },
                },
              ],
              false,
            ),
          )
        if (url.pathname === "/sse-forward/sync/history") return HttpServerResponse.fromWeb(Response.json([]))
        return HttpServerResponse.text("unexpected", { status: 500 })
      }),
    )
    const url = yield* serverUrl()
    yield* provideTmpdirInstance(
      () =>
        Effect.gen(function* () {
          const workspace = yield* Workspace.Service
          const sessionSvc = yield* SessionNs.Service
          const captured = captureGlobalEvents()
          try {
            const type = unique("sse-forward")
            const info = workspaceInfo(Instance.project.id, type)
            insertWorkspace(info)
            registerAdapter(Instance.project.id, type, remoteAdapter(`${url}/sse-forward`).adapter)
            attachSessionToWorkspace((yield* sessionSvc.create({})).id, info.id)

            yield* workspace.startWorkspaceSyncing(Instance.project.id)

            // Wait for the custom event to be forwarded...
            yield* eventuallyEffect(
              Effect.sync(() =>
                expect(
                  captured.events.some(
                    (event) => event.workspace === info.id && event.payload.type === "custom.remote",
                  ),
                ).toBe(true),
              ),
            )
            // ...heartbeats must never be forwarded...
            expect(
              captured.events.some(
                (event) => event.workspace === info.id && event.payload.type === "server.heartbeat",
              ),
            ).toBe(false)
            // ...and the forwarded event keeps its remote origin metadata.
            expect(
              captured.events.find((event) => event.workspace === info.id && event.payload.type === "custom.remote"),
            ).toMatchObject({
              directory: "remote-dir",
              project: "remote-project",
              payload: { properties: { ok: true } },
            })
            yield* workspace.remove(info.id)
          } finally {
            captured.dispose()
          }
        }),
      { git: true },
    )
  }),
)
|
|
|
|
it.live("SSE sync events are replayed and forwarded", () => {
  let sseSessionID: SessionID | undefined
  let sseNextSeq = 0
  return Effect.gen(function* () {
    // Mock remote: the SSE stream carries a single "sync" payload for the
    // session created below, one seq past the local sequence.
    yield* HttpServer.serveEffect()(
      Effect.gen(function* () {
        const req = yield* HttpServerRequest.HttpServerRequest
        const url = new URL(req.url, "http://localhost")
        if (url.pathname === "/sse-sync/global/event")
          return HttpServerResponse.fromWeb(
            eventStreamResponse(
              [
                {
                  directory: "remote-dir",
                  project: "remote-project",
                  payload: {
                    type: "sync",
                    syncEvent: {
                      id: `evt_${unique("sse")}`,
                      aggregateID: sseSessionID!,
                      seq: sseNextSeq,
                      type: sessionUpdatedType(),
                      data: { sessionID: sseSessionID!, info: { title: "from sse" } },
                    },
                  },
                },
              ],
              false,
            ),
          )
        if (url.pathname === "/sse-sync/sync/history") return HttpServerResponse.fromWeb(Response.json([]))
        return HttpServerResponse.text("unexpected", { status: 500 })
      }),
    )
    const url = yield* serverUrl()
    yield* provideTmpdirInstance(
      () =>
        Effect.gen(function* () {
          const workspace = yield* Workspace.Service
          const sessionSvc = yield* SessionNs.Service
          const captured = captureGlobalEvents()
          try {
            const type = unique("sse-sync")
            const info = workspaceInfo(Instance.project.id, type)
            insertWorkspace(info)
            registerAdapter(Instance.project.id, type, remoteAdapter(`${url}/sse-sync`).adapter)
            const session = yield* sessionSvc.create({ title: "before sse" })
            attachSessionToWorkspace(session.id, info.id)
            sseSessionID = session.id
            sseNextSeq = (sessionSequence(session.id) ?? -1) + 1

            yield* workspace.startWorkspaceSyncing(Instance.project.id)

            // The sync event from the stream must be applied locally...
            yield* eventuallyEffect(
              Effect.gen(function* () {
                expect((yield* sessionSvc.get(session.id).pipe(Effect.orDie)).title).toBe("from sse")
              }),
            )
            // ...and also forwarded on the global bus for this workspace.
            expect(
              captured.events.some(
                (event) =>
                  event.workspace === info.id &&
                  event.payload.type === "sync" &&
                  event.payload.syncEvent.seq === sseNextSeq,
              ),
            ).toBe(true)
            yield* workspace.remove(info.id)
          } finally {
            captured.dispose()
          }
        }),
      { git: true },
    )
  })
})
|
|
})
|
|
|
|
describe("workspace waitForSync", () => {
|
|
test("returns immediately for an empty fence", async () => {
|
|
await withInstance(async () => {
|
|
await expect(waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_empty"), {})).resolves.toBeUndefined()
|
|
})
|
|
})
|
|
|
|
test("returns immediately when the stored sequence already satisfies the fence", async () => {
|
|
await withInstance(async () => {
|
|
const sessionID = SessionID.descending("ses_wait_done")
|
|
Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 4 }).run())
|
|
|
|
await expect(
|
|
waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_done"), { [sessionID]: 4 }),
|
|
).resolves.toBeUndefined()
|
|
await expect(
|
|
waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_done_2"), { [sessionID]: 3 }),
|
|
).resolves.toBeUndefined()
|
|
})
|
|
})
|
|
|
|
test("waits until the database reaches the requested sequence and a workspace event arrives", async () => {
  await withInstance(async () => {
    const workspaceID = WorkspaceID.ascending("wrk_wait_event")
    const sessionID = SessionID.descending("ses_wait_event")
    // Stored seq (1) is below the fence (2), so the wait must block.
    Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 1 }).run())

    const waited = waitForWorkspaceSync(workspaceID, { [sessionID]: 2 })
    await delay(10)
    // Advance the stored sequence to satisfy the fence...
    Database.use((db) =>
      db.update(EventSequenceTable).set({ seq: 2 }).where(eq(EventSequenceTable.aggregate_id, sessionID)).run(),
    )
    // ...then nudge the waiter: an event for this workspace triggers a re-check.
    GlobalBus.emit("event", { workspace: workspaceID, payload: { type: "anything" } })

    await expect(waited).resolves.toBeUndefined()
  })
})
|
|
|
|
test("a sync event for a different workspace can also release the fence", async () => {
  await withInstance(async () => {
    const workspaceID = WorkspaceID.ascending("wrk_wait_sync_any")
    const sessionID = SessionID.descending("ses_wait_sync_any")
    // Stored seq (0) is below the fence (1), so the wait must block.
    Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 0 }).run())

    const waited = waitForWorkspaceSync(workspaceID, { [sessionID]: 1 })
    await delay(10)
    Database.use((db) =>
      db.update(EventSequenceTable).set({ seq: 1 }).where(eq(EventSequenceTable.aggregate_id, sessionID)).run(),
    )
    // "sync" events trigger a fence re-check even when they are attributed
    // to a different workspace than the one being waited on.
    GlobalBus.emit("event", {
      workspace: WorkspaceID.ascending("wrk_other_workspace"),
      payload: { type: "sync" },
    })

    await expect(waited).resolves.toBeUndefined()
  })
})
|
|
|
|
test("rejects with the abort reason when aborted", async () => {
|
|
await withInstance(async () => {
|
|
const abort = new AbortController()
|
|
const reason = new Error("caller aborted")
|
|
const waited = waitForWorkspaceSync(
|
|
WorkspaceID.ascending("wrk_wait_abort"),
|
|
{ [SessionID.descending("ses_wait_abort")]: 1 },
|
|
abort.signal,
|
|
)
|
|
abort.abort(reason)
|
|
|
|
await expect(waited).rejects.toMatchObject({
|
|
_tag: "WorkspaceSyncAbortedError",
|
|
message: reason.message,
|
|
cause: reason,
|
|
})
|
|
})
|
|
})
|
|
|
|
test("times out with the requested fence in the error message", async () => {
  await withInstance(async () => {
    const sessionID = SessionID.descending("ses_wait_timeout")

    // No sequence row is ever written, so the wait can only time out; the
    // error message must echo the fence that was being waited on.
    await expect(waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_timeout"), { [sessionID]: 1 })).rejects.toThrow(
      `Timed out waiting for sync fence: {"${sessionID}":1}`,
    )
  })
}, 7000) // bun test timeout raised so the internal sync-wait timeout fires first
|
|
})
|