Compare commits

...

2 Commits

Author SHA1 Message Date
Kit Langton
a886fbc332 Merge branch 'dev' into kit/effect-workspace 2026-04-10 23:42:18 -04:00
Kit Langton
8db17c5b68 refactor(workspace): migrate Workspace to service layer 2026-04-10 23:34:38 -04:00
2 changed files with 230 additions and 154 deletions

View File

@@ -1,19 +1,19 @@
import z from "zod"
import { setTimeout as sleep } from "node:timers/promises"
import { fn } from "@/util/fn"
import { AppFileSystem } from "@/filesystem"
import { makeRuntime } from "@/effect/run-service"
import { Database, eq } from "@/storage/db"
import { Project } from "@/project/project"
import type { Project } from "@/project/project"
import { BusEvent } from "@/bus/bus-event"
import { GlobalBus } from "@/bus/global"
import { SyncEvent } from "@/sync"
import { Log } from "@/util/log"
import { Filesystem } from "@/util/filesystem"
import { ProjectID } from "@/project/schema"
import { WorkspaceTable } from "./workspace.sql"
import { getAdaptor } from "./adaptors"
import { WorkspaceInfo } from "./types"
import { WorkspaceID } from "./schema"
import { parseSSE } from "./sse"
import { Context, Effect, Layer, Scope } from "effect"
export namespace Workspace {
export const Info = WorkspaceInfo.meta({
@@ -56,174 +56,250 @@ export namespace Workspace {
}
}
const CreateInput = z.object({
export const CreateInput = z.object({
id: WorkspaceID.zod.optional(),
type: Info.shape.type,
branch: Info.shape.branch,
projectID: ProjectID.zod,
extra: Info.shape.extra,
})
export const create = fn(CreateInput, async (input) => {
const id = WorkspaceID.ascending(input.id)
const adaptor = await getAdaptor(input.type)
const config = await adaptor.configure({ ...input, id, name: null, directory: null })
const info: Info = {
id,
type: config.type,
branch: config.branch ?? null,
name: config.name ?? null,
directory: config.directory ?? null,
extra: config.extra ?? null,
projectID: input.projectID,
}
Database.use((db) => {
db.insert(WorkspaceTable)
.values({
id: info.id,
type: info.type,
branch: info.branch,
name: info.name,
directory: info.directory,
extra: info.extra,
project_id: info.projectID,
})
.run()
})
await adaptor.create(config)
startSync(info)
return info
})
export function list(project: Project.Info) {
const rows = Database.use((db) =>
db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
)
const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
for (const space of spaces) startSync(space)
return spaces
}
export const get = fn(WorkspaceID.zod, async (id) => {
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
if (!row) return
const space = fromRow(row)
startSync(space)
return space
})
export const remove = fn(WorkspaceID.zod, async (id) => {
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
if (row) {
stopSync(id)
const info = fromRow(row)
const adaptor = await getAdaptor(row.type)
adaptor.remove(info)
Database.use((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
return info
}
})
const connections = new Map<WorkspaceID, ConnectionStatus>()
const aborts = new Map<WorkspaceID, AbortController>()
function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) {
const prev = connections.get(id)
if (prev?.status === status && prev?.error === error) return
const next = { workspaceID: id, status, error }
connections.set(id, next)
GlobalBus.emit("event", {
directory: "global",
workspace: id,
payload: {
type: Event.Status.type,
properties: next,
},
})
}
export function status(): ConnectionStatus[] {
return [...connections.values()]
}
export type CreateInput = z.infer<typeof CreateInput>
const log = Log.create({ service: "workspace-sync" })
async function workspaceEventLoop(space: Info, signal: AbortSignal) {
log.info("starting sync: " + space.id)
export interface Interface {
readonly create: (input: CreateInput) => Effect.Effect<Info>
readonly list: (projectID: ProjectID) => Effect.Effect<Info[]>
readonly get: (id: WorkspaceID) => Effect.Effect<Info | undefined>
readonly remove: (id: WorkspaceID) => Effect.Effect<Info | undefined>
readonly status: () => Effect.Effect<ConnectionStatus[]>
}
while (!signal.aborted) {
log.info("connecting to sync: " + space.id)
export class Service extends Context.Service<Service, Interface>()("@opencode/Workspace") {}
setStatus(space.id, "connecting")
const adaptor = await getAdaptor(space.type)
const target = await adaptor.target(space)
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const scope = yield* Scope.Scope
const connections = new Map<WorkspaceID, ConnectionStatus>()
const aborts = new Map<WorkspaceID, AbortController>()
if (target.type === "local") return
yield* Effect.addFinalizer(() =>
Effect.sync(() => {
for (const abort of aborts.values()) abort.abort()
}),
)
const res = await fetch(target.url + "/sync/event", { method: "GET", signal }).catch((err: unknown) => {
setStatus(space.id, "error", String(err))
return undefined
const db = <T>(fn: (db: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
Effect.sync(() => Database.use(fn))
const setStatus = Effect.fnUntraced(function* (
id: WorkspaceID,
status: ConnectionStatus["status"],
error?: string,
) {
const prev = connections.get(id)
if (prev?.status === status && prev?.error === error) return
const next = { workspaceID: id, status, error }
connections.set(id, next)
GlobalBus.emit("event", {
directory: "global",
workspace: id,
payload: {
type: Event.Status.type,
properties: next,
},
})
})
if (!res || !res.ok || !res.body) {
log.info("failed to connect to sync: " + res?.status)
setStatus(space.id, "error", res ? `HTTP ${res.status}` : "no response")
await sleep(1000)
continue
}
setStatus(space.id, "connected")
await parseSSE(res.body, signal, (evt) => {
const event = evt as SyncEvent.SerializedEvent
const stopSync = Effect.fnUntraced(function* (id: WorkspaceID) {
aborts.get(id)?.abort()
aborts.delete(id)
connections.delete(id)
})
try {
if (!event.type.startsWith("server.")) {
SyncEvent.replay(event)
const workspaceEventLoop = Effect.fn("Workspace.workspaceEventLoop")(function* (
space: Info,
signal: AbortSignal,
) {
log.info("starting sync: " + space.id)
while (!signal.aborted) {
log.info("connecting to sync: " + space.id)
yield* setStatus(space.id, "connecting")
const adaptor = yield* Effect.promise(() => getAdaptor(space.type)).pipe(Effect.orDie)
const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space))).pipe(Effect.orDie)
if (target.type === "local") return
const res = yield* Effect.tryPromise({
try: () => fetch(target.url + "/sync/event", { method: "GET", signal }),
catch: (err) => String(err),
}).pipe(
Effect.catch((err) => setStatus(space.id, "error", err).pipe(Effect.as(undefined as Response | undefined))),
)
if (!res || !res.ok || !res.body) {
log.info("failed to connect to sync: " + res?.status)
yield* setStatus(space.id, "error", res ? `HTTP ${res.status}` : "no response")
yield* Effect.sleep("1 second")
continue
}
} catch (err) {
log.warn("failed to replay sync event", {
workspaceID: space.id,
error: err,
})
const body = res.body
yield* setStatus(space.id, "connected")
yield* Effect.promise(() =>
parseSSE(body, signal, (evt) => {
const event = evt as SyncEvent.SerializedEvent
try {
if (!event.type.startsWith("server.")) {
SyncEvent.replay(event)
}
} catch (err) {
log.warn("failed to replay sync event", {
workspaceID: space.id,
error: err,
})
}
}),
).pipe(Effect.orDie)
yield* setStatus(space.id, "disconnected")
log.info("disconnected to sync: " + space.id)
yield* Effect.sleep("250 millis")
}
})
setStatus(space.id, "disconnected")
log.info("disconnected to sync: " + space.id)
await sleep(250)
}
const startSync = Effect.fn("Workspace.startSync")(function* (space: Info) {
if (space.type === "worktree") {
yield* Effect.gen(function* () {
const exists = yield* fs.exists(space.directory!).pipe(Effect.orDie)
yield* setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
}).pipe(Effect.forkIn(scope))
return
}
if (aborts.has(space.id)) return
const abort = new AbortController()
aborts.set(space.id, abort)
yield* setStatus(space.id, "disconnected")
yield* workspaceEventLoop(space, abort.signal).pipe(
Effect.catchCause((cause) =>
Effect.gen(function* () {
yield* setStatus(space.id, "error", String(cause))
yield* Effect.sync(() => {
log.warn("workspace sync listener failed", {
workspaceID: space.id,
error: cause,
})
})
}),
),
Effect.forkIn(scope),
)
})
const create = Effect.fn("Workspace.create")(function* (input: CreateInput) {
const id = WorkspaceID.ascending(input.id)
const adaptor = yield* Effect.promise(() => getAdaptor(input.type)).pipe(Effect.orDie)
const config = yield* Effect.promise(() =>
Promise.resolve(adaptor.configure({ ...input, id, name: null, directory: null })),
).pipe(Effect.orDie)
const info: Info = {
id,
type: config.type,
branch: config.branch ?? null,
name: config.name ?? null,
directory: config.directory ?? null,
extra: config.extra ?? null,
projectID: input.projectID,
}
yield* db((db) => {
db.insert(WorkspaceTable)
.values({
id: info.id,
type: info.type,
branch: info.branch,
name: info.name,
directory: info.directory,
extra: info.extra,
project_id: info.projectID,
})
.run()
})
yield* Effect.promise(() => adaptor.create(config)).pipe(Effect.orDie)
yield* startSync(info)
return info
})
const list = Effect.fn("Workspace.list")(function* (projectID: ProjectID) {
const rows = yield* db((db) =>
db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, projectID)).all(),
)
const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
for (const space of spaces) {
yield* startSync(space)
}
return spaces
})
const get = Effect.fn("Workspace.get")(function* (id: WorkspaceID) {
const row = yield* db((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
if (!row) return
const space = fromRow(row)
yield* startSync(space)
return space
})
const remove = Effect.fn("Workspace.remove")(function* (id: WorkspaceID) {
const row = yield* db((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
if (!row) return
yield* stopSync(id)
const info = fromRow(row)
const adaptor = yield* Effect.promise(() => getAdaptor(row.type)).pipe(Effect.orDie)
yield* Effect.sync(() => {
void adaptor.remove(info)
})
yield* db((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
return info
})
const status = Effect.fn("Workspace.status")(() => Effect.succeed([...connections.values()]))
return Service.of({ create, list, get, remove, status })
}),
)
export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer))
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function create(input: CreateInput) {
return runPromise((svc) => svc.create(input))
}
function startSync(space: Info) {
if (space.type === "worktree") {
void Filesystem.exists(space.directory!).then((exists) => {
setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
})
return
}
if (aborts.has(space.id)) return
const abort = new AbortController()
aborts.set(space.id, abort)
setStatus(space.id, "disconnected")
void workspaceEventLoop(space, abort.signal).catch((error) => {
setStatus(space.id, "error", String(error))
log.warn("workspace sync listener failed", {
workspaceID: space.id,
error,
})
})
export async function list(project: Project.Info) {
return runPromise((svc) => svc.list(project.id))
}
function stopSync(id: WorkspaceID) {
aborts.get(id)?.abort()
aborts.delete(id)
connections.delete(id)
export async function get(id: WorkspaceID) {
return runPromise((svc) => svc.get(id))
}
export async function remove(id: WorkspaceID) {
return runPromise((svc) => svc.remove(id))
}
export async function status() {
return runPromise((svc) => svc.status())
}
}

View File

@@ -28,7 +28,7 @@ export const WorkspaceRoutes = lazy(() =>
}),
validator(
"json",
Workspace.create.schema.omit({
Workspace.CreateInput.omit({
projectID: true,
}),
),
@@ -59,7 +59,7 @@ export const WorkspaceRoutes = lazy(() =>
},
}),
async (c) => {
return c.json(Workspace.list(Instance.project))
return c.json(await Workspace.list(Instance.project))
},
)
.get(
@@ -80,8 +80,8 @@ export const WorkspaceRoutes = lazy(() =>
},
}),
async (c) => {
const ids = new Set(Workspace.list(Instance.project).map((item) => item.id))
return c.json(Workspace.status().filter((item) => ids.has(item.workspaceID)))
const ids = new Set((await Workspace.list(Instance.project)).map((item) => item.id))
return c.json((await Workspace.status()).filter((item) => ids.has(item.workspaceID)))
},
)
.delete(