mirror of
https://github.com/anomalyco/opencode.git
synced 2026-04-13 17:35:05 +00:00
Compare commits
2 Commits
facade/pro
...
kit/effect
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a886fbc332 | ||
|
|
8db17c5b68 |
@@ -1,19 +1,19 @@
|
||||
import z from "zod"
|
||||
import { setTimeout as sleep } from "node:timers/promises"
|
||||
import { fn } from "@/util/fn"
|
||||
import { AppFileSystem } from "@/filesystem"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import { Database, eq } from "@/storage/db"
|
||||
import { Project } from "@/project/project"
|
||||
import type { Project } from "@/project/project"
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { GlobalBus } from "@/bus/global"
|
||||
import { SyncEvent } from "@/sync"
|
||||
import { Log } from "@/util/log"
|
||||
import { Filesystem } from "@/util/filesystem"
|
||||
import { ProjectID } from "@/project/schema"
|
||||
import { WorkspaceTable } from "./workspace.sql"
|
||||
import { getAdaptor } from "./adaptors"
|
||||
import { WorkspaceInfo } from "./types"
|
||||
import { WorkspaceID } from "./schema"
|
||||
import { parseSSE } from "./sse"
|
||||
import { Context, Effect, Layer, Scope } from "effect"
|
||||
|
||||
export namespace Workspace {
|
||||
export const Info = WorkspaceInfo.meta({
|
||||
@@ -56,174 +56,250 @@ export namespace Workspace {
|
||||
}
|
||||
}
|
||||
|
||||
const CreateInput = z.object({
|
||||
export const CreateInput = z.object({
|
||||
id: WorkspaceID.zod.optional(),
|
||||
type: Info.shape.type,
|
||||
branch: Info.shape.branch,
|
||||
projectID: ProjectID.zod,
|
||||
extra: Info.shape.extra,
|
||||
})
|
||||
|
||||
export const create = fn(CreateInput, async (input) => {
|
||||
const id = WorkspaceID.ascending(input.id)
|
||||
const adaptor = await getAdaptor(input.type)
|
||||
|
||||
const config = await adaptor.configure({ ...input, id, name: null, directory: null })
|
||||
|
||||
const info: Info = {
|
||||
id,
|
||||
type: config.type,
|
||||
branch: config.branch ?? null,
|
||||
name: config.name ?? null,
|
||||
directory: config.directory ?? null,
|
||||
extra: config.extra ?? null,
|
||||
projectID: input.projectID,
|
||||
}
|
||||
|
||||
Database.use((db) => {
|
||||
db.insert(WorkspaceTable)
|
||||
.values({
|
||||
id: info.id,
|
||||
type: info.type,
|
||||
branch: info.branch,
|
||||
name: info.name,
|
||||
directory: info.directory,
|
||||
extra: info.extra,
|
||||
project_id: info.projectID,
|
||||
})
|
||||
.run()
|
||||
})
|
||||
|
||||
await adaptor.create(config)
|
||||
|
||||
startSync(info)
|
||||
|
||||
return info
|
||||
})
|
||||
|
||||
export function list(project: Project.Info) {
|
||||
const rows = Database.use((db) =>
|
||||
db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
|
||||
)
|
||||
const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
|
||||
for (const space of spaces) startSync(space)
|
||||
return spaces
|
||||
}
|
||||
|
||||
export const get = fn(WorkspaceID.zod, async (id) => {
|
||||
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
|
||||
if (!row) return
|
||||
const space = fromRow(row)
|
||||
startSync(space)
|
||||
return space
|
||||
})
|
||||
|
||||
export const remove = fn(WorkspaceID.zod, async (id) => {
|
||||
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
|
||||
if (row) {
|
||||
stopSync(id)
|
||||
|
||||
const info = fromRow(row)
|
||||
const adaptor = await getAdaptor(row.type)
|
||||
adaptor.remove(info)
|
||||
Database.use((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
|
||||
return info
|
||||
}
|
||||
})
|
||||
|
||||
const connections = new Map<WorkspaceID, ConnectionStatus>()
|
||||
const aborts = new Map<WorkspaceID, AbortController>()
|
||||
|
||||
function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) {
|
||||
const prev = connections.get(id)
|
||||
if (prev?.status === status && prev?.error === error) return
|
||||
const next = { workspaceID: id, status, error }
|
||||
connections.set(id, next)
|
||||
GlobalBus.emit("event", {
|
||||
directory: "global",
|
||||
workspace: id,
|
||||
payload: {
|
||||
type: Event.Status.type,
|
||||
properties: next,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
export function status(): ConnectionStatus[] {
|
||||
return [...connections.values()]
|
||||
}
|
||||
export type CreateInput = z.infer<typeof CreateInput>
|
||||
|
||||
const log = Log.create({ service: "workspace-sync" })
|
||||
|
||||
async function workspaceEventLoop(space: Info, signal: AbortSignal) {
|
||||
log.info("starting sync: " + space.id)
|
||||
export interface Interface {
|
||||
readonly create: (input: CreateInput) => Effect.Effect<Info>
|
||||
readonly list: (projectID: ProjectID) => Effect.Effect<Info[]>
|
||||
readonly get: (id: WorkspaceID) => Effect.Effect<Info | undefined>
|
||||
readonly remove: (id: WorkspaceID) => Effect.Effect<Info | undefined>
|
||||
readonly status: () => Effect.Effect<ConnectionStatus[]>
|
||||
}
|
||||
|
||||
while (!signal.aborted) {
|
||||
log.info("connecting to sync: " + space.id)
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/Workspace") {}
|
||||
|
||||
setStatus(space.id, "connecting")
|
||||
const adaptor = await getAdaptor(space.type)
|
||||
const target = await adaptor.target(space)
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const scope = yield* Scope.Scope
|
||||
const connections = new Map<WorkspaceID, ConnectionStatus>()
|
||||
const aborts = new Map<WorkspaceID, AbortController>()
|
||||
|
||||
if (target.type === "local") return
|
||||
yield* Effect.addFinalizer(() =>
|
||||
Effect.sync(() => {
|
||||
for (const abort of aborts.values()) abort.abort()
|
||||
}),
|
||||
)
|
||||
|
||||
const res = await fetch(target.url + "/sync/event", { method: "GET", signal }).catch((err: unknown) => {
|
||||
setStatus(space.id, "error", String(err))
|
||||
return undefined
|
||||
const db = <T>(fn: (db: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
|
||||
Effect.sync(() => Database.use(fn))
|
||||
|
||||
const setStatus = Effect.fnUntraced(function* (
|
||||
id: WorkspaceID,
|
||||
status: ConnectionStatus["status"],
|
||||
error?: string,
|
||||
) {
|
||||
const prev = connections.get(id)
|
||||
if (prev?.status === status && prev?.error === error) return
|
||||
const next = { workspaceID: id, status, error }
|
||||
connections.set(id, next)
|
||||
GlobalBus.emit("event", {
|
||||
directory: "global",
|
||||
workspace: id,
|
||||
payload: {
|
||||
type: Event.Status.type,
|
||||
properties: next,
|
||||
},
|
||||
})
|
||||
})
|
||||
if (!res || !res.ok || !res.body) {
|
||||
log.info("failed to connect to sync: " + res?.status)
|
||||
|
||||
setStatus(space.id, "error", res ? `HTTP ${res.status}` : "no response")
|
||||
await sleep(1000)
|
||||
continue
|
||||
}
|
||||
setStatus(space.id, "connected")
|
||||
await parseSSE(res.body, signal, (evt) => {
|
||||
const event = evt as SyncEvent.SerializedEvent
|
||||
const stopSync = Effect.fnUntraced(function* (id: WorkspaceID) {
|
||||
aborts.get(id)?.abort()
|
||||
aborts.delete(id)
|
||||
connections.delete(id)
|
||||
})
|
||||
|
||||
try {
|
||||
if (!event.type.startsWith("server.")) {
|
||||
SyncEvent.replay(event)
|
||||
const workspaceEventLoop = Effect.fn("Workspace.workspaceEventLoop")(function* (
|
||||
space: Info,
|
||||
signal: AbortSignal,
|
||||
) {
|
||||
log.info("starting sync: " + space.id)
|
||||
|
||||
while (!signal.aborted) {
|
||||
log.info("connecting to sync: " + space.id)
|
||||
|
||||
yield* setStatus(space.id, "connecting")
|
||||
const adaptor = yield* Effect.promise(() => getAdaptor(space.type)).pipe(Effect.orDie)
|
||||
const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space))).pipe(Effect.orDie)
|
||||
|
||||
if (target.type === "local") return
|
||||
|
||||
const res = yield* Effect.tryPromise({
|
||||
try: () => fetch(target.url + "/sync/event", { method: "GET", signal }),
|
||||
catch: (err) => String(err),
|
||||
}).pipe(
|
||||
Effect.catch((err) => setStatus(space.id, "error", err).pipe(Effect.as(undefined as Response | undefined))),
|
||||
)
|
||||
|
||||
if (!res || !res.ok || !res.body) {
|
||||
log.info("failed to connect to sync: " + res?.status)
|
||||
yield* setStatus(space.id, "error", res ? `HTTP ${res.status}` : "no response")
|
||||
yield* Effect.sleep("1 second")
|
||||
continue
|
||||
}
|
||||
} catch (err) {
|
||||
log.warn("failed to replay sync event", {
|
||||
workspaceID: space.id,
|
||||
error: err,
|
||||
})
|
||||
|
||||
const body = res.body
|
||||
yield* setStatus(space.id, "connected")
|
||||
yield* Effect.promise(() =>
|
||||
parseSSE(body, signal, (evt) => {
|
||||
const event = evt as SyncEvent.SerializedEvent
|
||||
|
||||
try {
|
||||
if (!event.type.startsWith("server.")) {
|
||||
SyncEvent.replay(event)
|
||||
}
|
||||
} catch (err) {
|
||||
log.warn("failed to replay sync event", {
|
||||
workspaceID: space.id,
|
||||
error: err,
|
||||
})
|
||||
}
|
||||
}),
|
||||
).pipe(Effect.orDie)
|
||||
|
||||
yield* setStatus(space.id, "disconnected")
|
||||
log.info("disconnected to sync: " + space.id)
|
||||
yield* Effect.sleep("250 millis")
|
||||
}
|
||||
})
|
||||
setStatus(space.id, "disconnected")
|
||||
log.info("disconnected to sync: " + space.id)
|
||||
await sleep(250)
|
||||
}
|
||||
|
||||
const startSync = Effect.fn("Workspace.startSync")(function* (space: Info) {
|
||||
if (space.type === "worktree") {
|
||||
yield* Effect.gen(function* () {
|
||||
const exists = yield* fs.exists(space.directory!).pipe(Effect.orDie)
|
||||
yield* setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
|
||||
}).pipe(Effect.forkIn(scope))
|
||||
return
|
||||
}
|
||||
|
||||
if (aborts.has(space.id)) return
|
||||
|
||||
const abort = new AbortController()
|
||||
aborts.set(space.id, abort)
|
||||
yield* setStatus(space.id, "disconnected")
|
||||
yield* workspaceEventLoop(space, abort.signal).pipe(
|
||||
Effect.catchCause((cause) =>
|
||||
Effect.gen(function* () {
|
||||
yield* setStatus(space.id, "error", String(cause))
|
||||
yield* Effect.sync(() => {
|
||||
log.warn("workspace sync listener failed", {
|
||||
workspaceID: space.id,
|
||||
error: cause,
|
||||
})
|
||||
})
|
||||
}),
|
||||
),
|
||||
Effect.forkIn(scope),
|
||||
)
|
||||
})
|
||||
|
||||
const create = Effect.fn("Workspace.create")(function* (input: CreateInput) {
|
||||
const id = WorkspaceID.ascending(input.id)
|
||||
const adaptor = yield* Effect.promise(() => getAdaptor(input.type)).pipe(Effect.orDie)
|
||||
const config = yield* Effect.promise(() =>
|
||||
Promise.resolve(adaptor.configure({ ...input, id, name: null, directory: null })),
|
||||
).pipe(Effect.orDie)
|
||||
|
||||
const info: Info = {
|
||||
id,
|
||||
type: config.type,
|
||||
branch: config.branch ?? null,
|
||||
name: config.name ?? null,
|
||||
directory: config.directory ?? null,
|
||||
extra: config.extra ?? null,
|
||||
projectID: input.projectID,
|
||||
}
|
||||
|
||||
yield* db((db) => {
|
||||
db.insert(WorkspaceTable)
|
||||
.values({
|
||||
id: info.id,
|
||||
type: info.type,
|
||||
branch: info.branch,
|
||||
name: info.name,
|
||||
directory: info.directory,
|
||||
extra: info.extra,
|
||||
project_id: info.projectID,
|
||||
})
|
||||
.run()
|
||||
})
|
||||
|
||||
yield* Effect.promise(() => adaptor.create(config)).pipe(Effect.orDie)
|
||||
yield* startSync(info)
|
||||
return info
|
||||
})
|
||||
|
||||
const list = Effect.fn("Workspace.list")(function* (projectID: ProjectID) {
|
||||
const rows = yield* db((db) =>
|
||||
db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, projectID)).all(),
|
||||
)
|
||||
const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
|
||||
for (const space of spaces) {
|
||||
yield* startSync(space)
|
||||
}
|
||||
return spaces
|
||||
})
|
||||
|
||||
const get = Effect.fn("Workspace.get")(function* (id: WorkspaceID) {
|
||||
const row = yield* db((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
|
||||
if (!row) return
|
||||
const space = fromRow(row)
|
||||
yield* startSync(space)
|
||||
return space
|
||||
})
|
||||
|
||||
const remove = Effect.fn("Workspace.remove")(function* (id: WorkspaceID) {
|
||||
const row = yield* db((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
|
||||
if (!row) return
|
||||
|
||||
yield* stopSync(id)
|
||||
|
||||
const info = fromRow(row)
|
||||
const adaptor = yield* Effect.promise(() => getAdaptor(row.type)).pipe(Effect.orDie)
|
||||
yield* Effect.sync(() => {
|
||||
void adaptor.remove(info)
|
||||
})
|
||||
yield* db((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
|
||||
return info
|
||||
})
|
||||
|
||||
const status = Effect.fn("Workspace.status")(() => Effect.succeed([...connections.values()]))
|
||||
|
||||
return Service.of({ create, list, get, remove, status })
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer))
|
||||
|
||||
const { runPromise } = makeRuntime(Service, defaultLayer)
|
||||
|
||||
export async function create(input: CreateInput) {
|
||||
return runPromise((svc) => svc.create(input))
|
||||
}
|
||||
|
||||
function startSync(space: Info) {
|
||||
if (space.type === "worktree") {
|
||||
void Filesystem.exists(space.directory!).then((exists) => {
|
||||
setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if (aborts.has(space.id)) return
|
||||
const abort = new AbortController()
|
||||
aborts.set(space.id, abort)
|
||||
setStatus(space.id, "disconnected")
|
||||
|
||||
void workspaceEventLoop(space, abort.signal).catch((error) => {
|
||||
setStatus(space.id, "error", String(error))
|
||||
log.warn("workspace sync listener failed", {
|
||||
workspaceID: space.id,
|
||||
error,
|
||||
})
|
||||
})
|
||||
export async function list(project: Project.Info) {
|
||||
return runPromise((svc) => svc.list(project.id))
|
||||
}
|
||||
|
||||
function stopSync(id: WorkspaceID) {
|
||||
aborts.get(id)?.abort()
|
||||
aborts.delete(id)
|
||||
connections.delete(id)
|
||||
export async function get(id: WorkspaceID) {
|
||||
return runPromise((svc) => svc.get(id))
|
||||
}
|
||||
|
||||
export async function remove(id: WorkspaceID) {
|
||||
return runPromise((svc) => svc.remove(id))
|
||||
}
|
||||
|
||||
export async function status() {
|
||||
return runPromise((svc) => svc.status())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ export const WorkspaceRoutes = lazy(() =>
|
||||
}),
|
||||
validator(
|
||||
"json",
|
||||
Workspace.create.schema.omit({
|
||||
Workspace.CreateInput.omit({
|
||||
projectID: true,
|
||||
}),
|
||||
),
|
||||
@@ -59,7 +59,7 @@ export const WorkspaceRoutes = lazy(() =>
|
||||
},
|
||||
}),
|
||||
async (c) => {
|
||||
return c.json(Workspace.list(Instance.project))
|
||||
return c.json(await Workspace.list(Instance.project))
|
||||
},
|
||||
)
|
||||
.get(
|
||||
@@ -80,8 +80,8 @@ export const WorkspaceRoutes = lazy(() =>
|
||||
},
|
||||
}),
|
||||
async (c) => {
|
||||
const ids = new Set(Workspace.list(Instance.project).map((item) => item.id))
|
||||
return c.json(Workspace.status().filter((item) => ids.has(item.workspaceID)))
|
||||
const ids = new Set((await Workspace.list(Instance.project)).map((item) => item.id))
|
||||
return c.json((await Workspace.status()).filter((item) => ids.has(item.workspaceID)))
|
||||
},
|
||||
)
|
||||
.delete(
|
||||
|
||||
Reference in New Issue
Block a user