mirror of
https://github.com/anomalyco/opencode.git
synced 2026-04-11 16:34:52 +00:00
Compare commits
10 Commits
kit/effect
...
kit/effect
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c3eb2418ca | ||
|
|
397adb4190 | ||
|
|
dbc136d3b3 | ||
|
|
2e340d976f | ||
|
|
fe4dfb9f6f | ||
|
|
5e3dc80999 | ||
|
|
e2c41dce99 | ||
|
|
d84cc33742 | ||
|
|
f8580ab9f7 | ||
|
|
af7ada9a7a |
1
bun.lock
1
bun.lock
@@ -450,6 +450,7 @@
|
||||
"version": "1.4.3",
|
||||
"dependencies": {
|
||||
"@opencode-ai/sdk": "workspace:*",
|
||||
"effect": "catalog:",
|
||||
"zod": "catalog:",
|
||||
},
|
||||
"devDependencies": {
|
||||
|
||||
@@ -29,6 +29,7 @@ import { Provider } from "../../provider/provider"
|
||||
import { Bus } from "../../bus"
|
||||
import { MessageV2 } from "../../session/message-v2"
|
||||
import { SessionPrompt } from "@/session/prompt"
|
||||
import { AppRuntime } from "@/effect/app-runtime"
|
||||
import { Git } from "@/git"
|
||||
import { setTimeout as sleep } from "node:timers/promises"
|
||||
import { Process } from "@/util/process"
|
||||
@@ -258,7 +259,9 @@ export const GithubInstallCommand = cmd({
|
||||
}
|
||||
|
||||
// Get repo info
|
||||
const info = (await Git.run(["remote", "get-url", "origin"], { cwd: Instance.worktree })).text().trim()
|
||||
const info = await AppRuntime.runPromise(
|
||||
Git.Service.use((git) => git.run(["remote", "get-url", "origin"], { cwd: Instance.worktree })),
|
||||
).then((x) => x.text().trim())
|
||||
const parsed = parseGitHubRemote(info)
|
||||
if (!parsed) {
|
||||
prompts.log.error(`Could not find git repository. Please run this command from a git repository.`)
|
||||
@@ -497,20 +500,21 @@ export const GithubRunCommand = cmd({
|
||||
: "issue"
|
||||
: undefined
|
||||
const gitText = async (args: string[]) => {
|
||||
const result = await Git.run(args, { cwd: Instance.worktree })
|
||||
const result = await AppRuntime.runPromise(Git.Service.use((git) => git.run(args, { cwd: Instance.worktree })))
|
||||
if (result.exitCode !== 0) {
|
||||
throw new Process.RunFailedError(["git", ...args], result.exitCode, result.stdout, result.stderr)
|
||||
}
|
||||
return result.text().trim()
|
||||
}
|
||||
const gitRun = async (args: string[]) => {
|
||||
const result = await Git.run(args, { cwd: Instance.worktree })
|
||||
const result = await AppRuntime.runPromise(Git.Service.use((git) => git.run(args, { cwd: Instance.worktree })))
|
||||
if (result.exitCode !== 0) {
|
||||
throw new Process.RunFailedError(["git", ...args], result.exitCode, result.stdout, result.stderr)
|
||||
}
|
||||
return result
|
||||
}
|
||||
const gitStatus = (args: string[]) => Git.run(args, { cwd: Instance.worktree })
|
||||
const gitStatus = (args: string[]) =>
|
||||
AppRuntime.runPromise(Git.Service.use((git) => git.run(args, { cwd: Instance.worktree })))
|
||||
const commitChanges = async (summary: string, actor?: string) => {
|
||||
const args = ["commit", "-m", summary]
|
||||
if (actor) args.push("-m", `Co-authored-by: ${actor} <${actor}@users.noreply.github.com>`)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { UI } from "../ui"
|
||||
import { cmd } from "./cmd"
|
||||
import { AppRuntime } from "@/effect/app-runtime"
|
||||
import { Git } from "@/git"
|
||||
import { Instance } from "@/project/instance"
|
||||
import { Process } from "@/util/process"
|
||||
@@ -67,19 +68,29 @@ export const PrCommand = cmd({
|
||||
const remoteName = forkOwner
|
||||
|
||||
// Check if remote already exists
|
||||
const remotes = (await Git.run(["remote"], { cwd: Instance.worktree })).text().trim()
|
||||
const remotes = await AppRuntime.runPromise(
|
||||
Git.Service.use((git) => git.run(["remote"], { cwd: Instance.worktree })),
|
||||
).then((x) => x.text().trim())
|
||||
if (!remotes.split("\n").includes(remoteName)) {
|
||||
await Git.run(["remote", "add", remoteName, `https://github.com/${forkOwner}/${forkName}.git`], {
|
||||
cwd: Instance.worktree,
|
||||
})
|
||||
await AppRuntime.runPromise(
|
||||
Git.Service.use((git) =>
|
||||
git.run(["remote", "add", remoteName, `https://github.com/${forkOwner}/${forkName}.git`], {
|
||||
cwd: Instance.worktree,
|
||||
}),
|
||||
),
|
||||
)
|
||||
UI.println(`Added fork remote: ${remoteName}`)
|
||||
}
|
||||
|
||||
// Set upstream to the fork so pushes go there
|
||||
const headRefName = prInfo.headRefName
|
||||
await Git.run(["branch", `--set-upstream-to=${remoteName}/${headRefName}`, localBranchName], {
|
||||
cwd: Instance.worktree,
|
||||
})
|
||||
await AppRuntime.runPromise(
|
||||
Git.Service.use((git) =>
|
||||
git.run(["branch", `--set-upstream-to=${remoteName}/${headRefName}`, localBranchName], {
|
||||
cwd: Instance.worktree,
|
||||
}),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
// Check for opencode session link in PR body
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import type { InstanceContext } from "@/project/instance"
|
||||
import { SessionID, MessageID } from "@/session/schema"
|
||||
import { Effect, Layer, Context } from "effect"
|
||||
@@ -189,10 +188,4 @@ export namespace Command {
|
||||
Layer.provide(MCP.defaultLayer),
|
||||
Layer.provide(Skill.defaultLayer),
|
||||
)
|
||||
|
||||
const { runPromise } = makeRuntime(Service, defaultLayer)
|
||||
|
||||
export async function list() {
|
||||
return runPromise((svc) => svc.list())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,19 +1,19 @@
|
||||
import z from "zod"
|
||||
import { AppFileSystem } from "@/filesystem"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import { setTimeout as sleep } from "node:timers/promises"
|
||||
import { fn } from "@/util/fn"
|
||||
import { Database, eq } from "@/storage/db"
|
||||
import type { Project } from "@/project/project"
|
||||
import { Project } from "@/project/project"
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { GlobalBus } from "@/bus/global"
|
||||
import { SyncEvent } from "@/sync"
|
||||
import { Log } from "@/util/log"
|
||||
import { Filesystem } from "@/util/filesystem"
|
||||
import { ProjectID } from "@/project/schema"
|
||||
import { WorkspaceTable } from "./workspace.sql"
|
||||
import { getAdaptor } from "./adaptors"
|
||||
import { WorkspaceInfo } from "./types"
|
||||
import { WorkspaceID } from "./schema"
|
||||
import { parseSSE } from "./sse"
|
||||
import { Context, Effect, Layer, Scope } from "effect"
|
||||
|
||||
export namespace Workspace {
|
||||
export const Info = WorkspaceInfo.meta({
|
||||
@@ -56,250 +56,174 @@ export namespace Workspace {
|
||||
}
|
||||
}
|
||||
|
||||
export const CreateInput = z.object({
|
||||
const CreateInput = z.object({
|
||||
id: WorkspaceID.zod.optional(),
|
||||
type: Info.shape.type,
|
||||
branch: Info.shape.branch,
|
||||
projectID: ProjectID.zod,
|
||||
extra: Info.shape.extra,
|
||||
})
|
||||
export type CreateInput = z.infer<typeof CreateInput>
|
||||
|
||||
export const create = fn(CreateInput, async (input) => {
|
||||
const id = WorkspaceID.ascending(input.id)
|
||||
const adaptor = await getAdaptor(input.type)
|
||||
|
||||
const config = await adaptor.configure({ ...input, id, name: null, directory: null })
|
||||
|
||||
const info: Info = {
|
||||
id,
|
||||
type: config.type,
|
||||
branch: config.branch ?? null,
|
||||
name: config.name ?? null,
|
||||
directory: config.directory ?? null,
|
||||
extra: config.extra ?? null,
|
||||
projectID: input.projectID,
|
||||
}
|
||||
|
||||
Database.use((db) => {
|
||||
db.insert(WorkspaceTable)
|
||||
.values({
|
||||
id: info.id,
|
||||
type: info.type,
|
||||
branch: info.branch,
|
||||
name: info.name,
|
||||
directory: info.directory,
|
||||
extra: info.extra,
|
||||
project_id: info.projectID,
|
||||
})
|
||||
.run()
|
||||
})
|
||||
|
||||
await adaptor.create(config)
|
||||
|
||||
startSync(info)
|
||||
|
||||
return info
|
||||
})
|
||||
|
||||
export function list(project: Project.Info) {
|
||||
const rows = Database.use((db) =>
|
||||
db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
|
||||
)
|
||||
const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
|
||||
for (const space of spaces) startSync(space)
|
||||
return spaces
|
||||
}
|
||||
|
||||
export const get = fn(WorkspaceID.zod, async (id) => {
|
||||
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
|
||||
if (!row) return
|
||||
const space = fromRow(row)
|
||||
startSync(space)
|
||||
return space
|
||||
})
|
||||
|
||||
export const remove = fn(WorkspaceID.zod, async (id) => {
|
||||
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
|
||||
if (row) {
|
||||
stopSync(id)
|
||||
|
||||
const info = fromRow(row)
|
||||
const adaptor = await getAdaptor(row.type)
|
||||
adaptor.remove(info)
|
||||
Database.use((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
|
||||
return info
|
||||
}
|
||||
})
|
||||
|
||||
const connections = new Map<WorkspaceID, ConnectionStatus>()
|
||||
const aborts = new Map<WorkspaceID, AbortController>()
|
||||
|
||||
function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) {
|
||||
const prev = connections.get(id)
|
||||
if (prev?.status === status && prev?.error === error) return
|
||||
const next = { workspaceID: id, status, error }
|
||||
connections.set(id, next)
|
||||
GlobalBus.emit("event", {
|
||||
directory: "global",
|
||||
workspace: id,
|
||||
payload: {
|
||||
type: Event.Status.type,
|
||||
properties: next,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
export function status(): ConnectionStatus[] {
|
||||
return [...connections.values()]
|
||||
}
|
||||
|
||||
const log = Log.create({ service: "workspace-sync" })
|
||||
|
||||
export interface Interface {
|
||||
readonly create: (input: CreateInput) => Effect.Effect<Info>
|
||||
readonly list: (projectID: ProjectID) => Effect.Effect<Info[]>
|
||||
readonly get: (id: WorkspaceID) => Effect.Effect<Info | undefined>
|
||||
readonly remove: (id: WorkspaceID) => Effect.Effect<Info | undefined>
|
||||
readonly status: () => Effect.Effect<ConnectionStatus[]>
|
||||
}
|
||||
async function workspaceEventLoop(space: Info, signal: AbortSignal) {
|
||||
log.info("starting sync: " + space.id)
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/Workspace") {}
|
||||
while (!signal.aborted) {
|
||||
log.info("connecting to sync: " + space.id)
|
||||
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const scope = yield* Scope.Scope
|
||||
const connections = new Map<WorkspaceID, ConnectionStatus>()
|
||||
const aborts = new Map<WorkspaceID, AbortController>()
|
||||
setStatus(space.id, "connecting")
|
||||
const adaptor = await getAdaptor(space.type)
|
||||
const target = await adaptor.target(space)
|
||||
|
||||
yield* Effect.addFinalizer(() =>
|
||||
Effect.sync(() => {
|
||||
for (const abort of aborts.values()) abort.abort()
|
||||
}),
|
||||
)
|
||||
if (target.type === "local") return
|
||||
|
||||
const db = <T>(fn: (db: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
|
||||
Effect.sync(() => Database.use(fn))
|
||||
|
||||
const setStatus = Effect.fnUntraced(function* (
|
||||
id: WorkspaceID,
|
||||
status: ConnectionStatus["status"],
|
||||
error?: string,
|
||||
) {
|
||||
const prev = connections.get(id)
|
||||
if (prev?.status === status && prev?.error === error) return
|
||||
const next = { workspaceID: id, status, error }
|
||||
connections.set(id, next)
|
||||
GlobalBus.emit("event", {
|
||||
directory: "global",
|
||||
workspace: id,
|
||||
payload: {
|
||||
type: Event.Status.type,
|
||||
properties: next,
|
||||
},
|
||||
})
|
||||
const res = await fetch(target.url + "/sync/event", { method: "GET", signal }).catch((err: unknown) => {
|
||||
setStatus(space.id, "error", String(err))
|
||||
return undefined
|
||||
})
|
||||
if (!res || !res.ok || !res.body) {
|
||||
log.info("failed to connect to sync: " + res?.status)
|
||||
|
||||
const stopSync = Effect.fnUntraced(function* (id: WorkspaceID) {
|
||||
aborts.get(id)?.abort()
|
||||
aborts.delete(id)
|
||||
connections.delete(id)
|
||||
})
|
||||
setStatus(space.id, "error", res ? `HTTP ${res.status}` : "no response")
|
||||
await sleep(1000)
|
||||
continue
|
||||
}
|
||||
setStatus(space.id, "connected")
|
||||
await parseSSE(res.body, signal, (evt) => {
|
||||
const event = evt as SyncEvent.SerializedEvent
|
||||
|
||||
const workspaceEventLoop = Effect.fn("Workspace.workspaceEventLoop")(function* (
|
||||
space: Info,
|
||||
signal: AbortSignal,
|
||||
) {
|
||||
log.info("starting sync: " + space.id)
|
||||
|
||||
while (!signal.aborted) {
|
||||
log.info("connecting to sync: " + space.id)
|
||||
|
||||
yield* setStatus(space.id, "connecting")
|
||||
const adaptor = yield* Effect.promise(() => getAdaptor(space.type)).pipe(Effect.orDie)
|
||||
const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space))).pipe(Effect.orDie)
|
||||
|
||||
if (target.type === "local") return
|
||||
|
||||
const res = yield* Effect.tryPromise({
|
||||
try: () => fetch(target.url + "/sync/event", { method: "GET", signal }),
|
||||
catch: (err) => String(err),
|
||||
}).pipe(
|
||||
Effect.catch((err) => setStatus(space.id, "error", err).pipe(Effect.as(undefined as Response | undefined))),
|
||||
)
|
||||
|
||||
if (!res || !res.ok || !res.body) {
|
||||
log.info("failed to connect to sync: " + res?.status)
|
||||
yield* setStatus(space.id, "error", res ? `HTTP ${res.status}` : "no response")
|
||||
yield* Effect.sleep("1 second")
|
||||
continue
|
||||
try {
|
||||
if (!event.type.startsWith("server.")) {
|
||||
SyncEvent.replay(event)
|
||||
}
|
||||
|
||||
const body = res.body
|
||||
yield* setStatus(space.id, "connected")
|
||||
yield* Effect.promise(() =>
|
||||
parseSSE(body, signal, (evt) => {
|
||||
const event = evt as SyncEvent.SerializedEvent
|
||||
|
||||
try {
|
||||
if (!event.type.startsWith("server.")) {
|
||||
SyncEvent.replay(event)
|
||||
}
|
||||
} catch (err) {
|
||||
log.warn("failed to replay sync event", {
|
||||
workspaceID: space.id,
|
||||
error: err,
|
||||
})
|
||||
}
|
||||
}),
|
||||
).pipe(Effect.orDie)
|
||||
|
||||
yield* setStatus(space.id, "disconnected")
|
||||
log.info("disconnected to sync: " + space.id)
|
||||
yield* Effect.sleep("250 millis")
|
||||
} catch (err) {
|
||||
log.warn("failed to replay sync event", {
|
||||
workspaceID: space.id,
|
||||
error: err,
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
const startSync = Effect.fn("Workspace.startSync")(function* (space: Info) {
|
||||
if (space.type === "worktree") {
|
||||
yield* Effect.gen(function* () {
|
||||
const exists = yield* fs.exists(space.directory!).pipe(Effect.orDie)
|
||||
yield* setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
|
||||
}).pipe(Effect.forkIn(scope))
|
||||
return
|
||||
}
|
||||
|
||||
if (aborts.has(space.id)) return
|
||||
|
||||
const abort = new AbortController()
|
||||
aborts.set(space.id, abort)
|
||||
yield* setStatus(space.id, "disconnected")
|
||||
yield* workspaceEventLoop(space, abort.signal).pipe(
|
||||
Effect.catchCause((cause) =>
|
||||
Effect.gen(function* () {
|
||||
yield* setStatus(space.id, "error", String(cause))
|
||||
yield* Effect.sync(() => {
|
||||
log.warn("workspace sync listener failed", {
|
||||
workspaceID: space.id,
|
||||
error: cause,
|
||||
})
|
||||
})
|
||||
}),
|
||||
),
|
||||
Effect.forkIn(scope),
|
||||
)
|
||||
})
|
||||
|
||||
const create = Effect.fn("Workspace.create")(function* (input: CreateInput) {
|
||||
const id = WorkspaceID.ascending(input.id)
|
||||
const adaptor = yield* Effect.promise(() => getAdaptor(input.type)).pipe(Effect.orDie)
|
||||
const config = yield* Effect.promise(() =>
|
||||
Promise.resolve(adaptor.configure({ ...input, id, name: null, directory: null })),
|
||||
).pipe(Effect.orDie)
|
||||
|
||||
const info: Info = {
|
||||
id,
|
||||
type: config.type,
|
||||
branch: config.branch ?? null,
|
||||
name: config.name ?? null,
|
||||
directory: config.directory ?? null,
|
||||
extra: config.extra ?? null,
|
||||
projectID: input.projectID,
|
||||
}
|
||||
|
||||
yield* db((db) => {
|
||||
db.insert(WorkspaceTable)
|
||||
.values({
|
||||
id: info.id,
|
||||
type: info.type,
|
||||
branch: info.branch,
|
||||
name: info.name,
|
||||
directory: info.directory,
|
||||
extra: info.extra,
|
||||
project_id: info.projectID,
|
||||
})
|
||||
.run()
|
||||
})
|
||||
|
||||
yield* Effect.promise(() => adaptor.create(config)).pipe(Effect.orDie)
|
||||
yield* startSync(info)
|
||||
return info
|
||||
})
|
||||
|
||||
const list = Effect.fn("Workspace.list")(function* (projectID: ProjectID) {
|
||||
const rows = yield* db((db) =>
|
||||
db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, projectID)).all(),
|
||||
)
|
||||
const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
|
||||
for (const space of spaces) {
|
||||
yield* startSync(space)
|
||||
}
|
||||
return spaces
|
||||
})
|
||||
|
||||
const get = Effect.fn("Workspace.get")(function* (id: WorkspaceID) {
|
||||
const row = yield* db((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
|
||||
if (!row) return
|
||||
const space = fromRow(row)
|
||||
yield* startSync(space)
|
||||
return space
|
||||
})
|
||||
|
||||
const remove = Effect.fn("Workspace.remove")(function* (id: WorkspaceID) {
|
||||
const row = yield* db((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
|
||||
if (!row) return
|
||||
|
||||
yield* stopSync(id)
|
||||
|
||||
const info = fromRow(row)
|
||||
const adaptor = yield* Effect.promise(() => getAdaptor(row.type)).pipe(Effect.orDie)
|
||||
yield* Effect.sync(() => {
|
||||
void adaptor.remove(info)
|
||||
})
|
||||
yield* db((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
|
||||
return info
|
||||
})
|
||||
|
||||
const status = Effect.fn("Workspace.status")(() => Effect.succeed([...connections.values()]))
|
||||
|
||||
return Service.of({ create, list, get, remove, status })
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer))
|
||||
|
||||
const { runPromise } = makeRuntime(Service, defaultLayer)
|
||||
|
||||
export async function create(input: CreateInput) {
|
||||
return runPromise((svc) => svc.create(input))
|
||||
setStatus(space.id, "disconnected")
|
||||
log.info("disconnected to sync: " + space.id)
|
||||
await sleep(250)
|
||||
}
|
||||
}
|
||||
|
||||
export async function list(project: Project.Info) {
|
||||
return runPromise((svc) => svc.list(project.id))
|
||||
function startSync(space: Info) {
|
||||
if (space.type === "worktree") {
|
||||
void Filesystem.exists(space.directory!).then((exists) => {
|
||||
setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if (aborts.has(space.id)) return
|
||||
const abort = new AbortController()
|
||||
aborts.set(space.id, abort)
|
||||
setStatus(space.id, "disconnected")
|
||||
|
||||
void workspaceEventLoop(space, abort.signal).catch((error) => {
|
||||
setStatus(space.id, "error", String(error))
|
||||
log.warn("workspace sync listener failed", {
|
||||
workspaceID: space.id,
|
||||
error,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
export async function get(id: WorkspaceID) {
|
||||
return runPromise((svc) => svc.get(id))
|
||||
}
|
||||
|
||||
export async function remove(id: WorkspaceID) {
|
||||
return runPromise((svc) => svc.remove(id))
|
||||
}
|
||||
|
||||
export async function status() {
|
||||
return runPromise((svc) => svc.status())
|
||||
function stopSync(id: WorkspaceID) {
|
||||
aborts.get(id)?.abort()
|
||||
aborts.delete(id)
|
||||
connections.delete(id)
|
||||
}
|
||||
}
|
||||
|
||||
9
packages/opencode/src/effect/bootstrap-runtime.ts
Normal file
9
packages/opencode/src/effect/bootstrap-runtime.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
import { Layer, ManagedRuntime } from "effect"
|
||||
import { memoMap } from "./run-service"
|
||||
|
||||
import { Format } from "@/format"
|
||||
import { ShareNext } from "@/share/share-next"
|
||||
|
||||
export const BootstrapLayer = Layer.mergeAll(Format.defaultLayer, ShareNext.defaultLayer)
|
||||
|
||||
export const BootstrapRuntime = ManagedRuntime.make(BootstrapLayer, { memoMap })
|
||||
@@ -1,7 +1,6 @@
|
||||
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
|
||||
import { Effect, Layer, Context, Stream } from "effect"
|
||||
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
|
||||
export namespace Git {
|
||||
const cfg = [
|
||||
@@ -258,14 +257,4 @@ export namespace Git {
|
||||
)
|
||||
|
||||
export const defaultLayer = layer.pipe(Layer.provide(CrossSpawnSpawner.defaultLayer))
|
||||
|
||||
const { runPromise } = makeRuntime(Service, defaultLayer)
|
||||
|
||||
export async function run(args: string[], opts: Options) {
|
||||
return runPromise((git) => git.run(args, opts))
|
||||
}
|
||||
|
||||
export async function defaultBranch(cwd: string) {
|
||||
return runPromise((git) => git.defaultBranch(cwd))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,14 +10,14 @@ import { Bus } from "../bus"
|
||||
import { Command } from "../command"
|
||||
import { Instance } from "./instance"
|
||||
import { Log } from "@/util/log"
|
||||
import { AppRuntime } from "@/effect/app-runtime"
|
||||
import { BootstrapRuntime } from "@/effect/bootstrap-runtime"
|
||||
import { ShareNext } from "@/share/share-next"
|
||||
|
||||
export async function InstanceBootstrap() {
|
||||
Log.Default.info("bootstrapping", { directory: Instance.directory })
|
||||
await Plugin.init()
|
||||
void AppRuntime.runPromise(ShareNext.Service.use((svc) => svc.init()))
|
||||
void AppRuntime.runPromise(Format.Service.use((svc) => svc.init()))
|
||||
void BootstrapRuntime.runPromise(ShareNext.Service.use((svc) => svc.init()))
|
||||
void BootstrapRuntime.runPromise(Format.Service.use((svc) => svc.init()))
|
||||
await LSP.init()
|
||||
File.init()
|
||||
FileWatcher.init()
|
||||
|
||||
@@ -191,7 +191,7 @@ export const InstanceRoutes = (upgrade: UpgradeWebSocket, app: Hono = new Hono()
|
||||
},
|
||||
}),
|
||||
async (c) => {
|
||||
const commands = await Command.list()
|
||||
const commands = await AppRuntime.runPromise(Command.Service.use((svc) => svc.list()))
|
||||
return c.json(commands)
|
||||
},
|
||||
)
|
||||
|
||||
@@ -28,7 +28,7 @@ export const WorkspaceRoutes = lazy(() =>
|
||||
}),
|
||||
validator(
|
||||
"json",
|
||||
Workspace.CreateInput.omit({
|
||||
Workspace.create.schema.omit({
|
||||
projectID: true,
|
||||
}),
|
||||
),
|
||||
@@ -59,7 +59,7 @@ export const WorkspaceRoutes = lazy(() =>
|
||||
},
|
||||
}),
|
||||
async (c) => {
|
||||
return c.json(await Workspace.list(Instance.project))
|
||||
return c.json(Workspace.list(Instance.project))
|
||||
},
|
||||
)
|
||||
.get(
|
||||
@@ -80,8 +80,8 @@ export const WorkspaceRoutes = lazy(() =>
|
||||
},
|
||||
}),
|
||||
async (c) => {
|
||||
const ids = new Set((await Workspace.list(Instance.project)).map((item) => item.id))
|
||||
return c.json((await Workspace.status()).filter((item) => ids.has(item.workspaceID)))
|
||||
const ids = new Set(Workspace.list(Instance.project).map((item) => item.id))
|
||||
return c.json(Workspace.status().filter((item) => ids.has(item.workspaceID)))
|
||||
},
|
||||
)
|
||||
.delete(
|
||||
|
||||
@@ -366,11 +366,12 @@ export namespace Session {
|
||||
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
|
||||
Effect.sync(() => Database.use(fn))
|
||||
|
||||
export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> = Layer.effect(
|
||||
export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service | SyncEvent.Service> = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
const storage = yield* Storage.Service
|
||||
const sync = yield* SyncEvent.Service
|
||||
|
||||
const createNext = Effect.fn("Session.createNext")(function* (input: {
|
||||
id?: SessionID
|
||||
@@ -398,7 +399,7 @@ export namespace Session {
|
||||
}
|
||||
log.info("created", result)
|
||||
|
||||
yield* Effect.sync(() => SyncEvent.run(Event.Created, { sessionID: result.id, info: result }))
|
||||
yield* sync.run(Event.Created, { sessionID: result.id, info: result })
|
||||
|
||||
if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
|
||||
// This only exist for backwards compatibility. We should not be
|
||||
@@ -446,10 +447,12 @@ export namespace Session {
|
||||
Effect.catchCause(() => Effect.succeed(false)),
|
||||
)
|
||||
|
||||
yield* Effect.sync(() => {
|
||||
SyncEvent.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance })
|
||||
SyncEvent.remove(sessionID)
|
||||
})
|
||||
if (hasInstance) {
|
||||
yield* sync.run(Event.Deleted, { sessionID, info: session }, { publish: true })
|
||||
} else {
|
||||
yield* Effect.sync(() => SyncEvent.run(Event.Deleted, { sessionID, info: session }, { publish: false }))
|
||||
}
|
||||
yield* sync.remove(sessionID)
|
||||
} catch (e) {
|
||||
log.error(e)
|
||||
}
|
||||
@@ -457,19 +460,17 @@ export namespace Session {
|
||||
|
||||
const updateMessage = <T extends MessageV2.Info>(msg: T): Effect.Effect<T> =>
|
||||
Effect.gen(function* () {
|
||||
yield* Effect.sync(() => SyncEvent.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg }))
|
||||
yield* sync.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg })
|
||||
return msg
|
||||
}).pipe(Effect.withSpan("Session.updateMessage"))
|
||||
|
||||
const updatePart = <T extends MessageV2.Part>(part: T): Effect.Effect<T> =>
|
||||
Effect.gen(function* () {
|
||||
yield* Effect.sync(() =>
|
||||
SyncEvent.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID: part.sessionID,
|
||||
part: structuredClone(part),
|
||||
time: Date.now(),
|
||||
}),
|
||||
)
|
||||
yield* sync.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID: part.sessionID,
|
||||
part: structuredClone(part),
|
||||
time: Date.now(),
|
||||
})
|
||||
return part
|
||||
}).pipe(Effect.withSpan("Session.updatePart"))
|
||||
|
||||
@@ -549,8 +550,7 @@ export namespace Session {
|
||||
return session
|
||||
})
|
||||
|
||||
const patch = (sessionID: SessionID, info: Patch) =>
|
||||
Effect.sync(() => SyncEvent.run(Event.Updated, { sessionID, info }))
|
||||
const patch = (sessionID: SessionID, info: Patch) => sync.run(Event.Updated, { sessionID, info })
|
||||
|
||||
const touch = Effect.fn("Session.touch")(function* (sessionID: SessionID) {
|
||||
yield* patch(sessionID, { time: { updated: Date.now() } })
|
||||
@@ -607,12 +607,10 @@ export namespace Session {
|
||||
sessionID: SessionID
|
||||
messageID: MessageID
|
||||
}) {
|
||||
yield* Effect.sync(() =>
|
||||
SyncEvent.run(MessageV2.Event.Removed, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
}),
|
||||
)
|
||||
yield* sync.run(MessageV2.Event.Removed, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
})
|
||||
return input.messageID
|
||||
})
|
||||
|
||||
@@ -621,13 +619,11 @@ export namespace Session {
|
||||
messageID: MessageID
|
||||
partID: PartID
|
||||
}) {
|
||||
yield* Effect.sync(() =>
|
||||
SyncEvent.run(MessageV2.Event.PartRemoved, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
partID: input.partID,
|
||||
}),
|
||||
)
|
||||
yield* sync.run(MessageV2.Event.PartRemoved, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
partID: input.partID,
|
||||
})
|
||||
return input.partID
|
||||
})
|
||||
|
||||
@@ -678,7 +674,11 @@ export namespace Session {
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Storage.defaultLayer))
|
||||
export const defaultLayer = layer.pipe(
|
||||
Layer.provide(Bus.layer),
|
||||
Layer.provide(Storage.defaultLayer),
|
||||
Layer.provide(SyncEvent.defaultLayer),
|
||||
)
|
||||
|
||||
const { runPromise } = makeRuntime(Service, defaultLayer)
|
||||
|
||||
|
||||
@@ -40,6 +40,7 @@ export namespace SessionRevert {
|
||||
const bus = yield* Bus.Service
|
||||
const summary = yield* SessionSummary.Service
|
||||
const state = yield* SessionRunState.Service
|
||||
const sync = yield* SyncEvent.Service
|
||||
|
||||
const revert = Effect.fn("SessionRevert.revert")(function* (input: RevertInput) {
|
||||
yield* state.assertNotBusy(input.sessionID)
|
||||
@@ -123,7 +124,7 @@ export namespace SessionRevert {
|
||||
remove.push(msg)
|
||||
}
|
||||
for (const msg of remove) {
|
||||
SyncEvent.run(MessageV2.Event.Removed, {
|
||||
yield* sync.run(MessageV2.Event.Removed, {
|
||||
sessionID,
|
||||
messageID: msg.info.id,
|
||||
})
|
||||
@@ -135,7 +136,7 @@ export namespace SessionRevert {
|
||||
const removeParts = target.parts.slice(idx)
|
||||
target.parts = target.parts.slice(0, idx)
|
||||
for (const part of removeParts) {
|
||||
SyncEvent.run(MessageV2.Event.PartRemoved, {
|
||||
yield* sync.run(MessageV2.Event.PartRemoved, {
|
||||
sessionID,
|
||||
messageID: target.info.id,
|
||||
partID: part.id,
|
||||
@@ -158,6 +159,7 @@ export namespace SessionRevert {
|
||||
Layer.provide(Storage.defaultLayer),
|
||||
Layer.provide(Bus.layer),
|
||||
Layer.provide(SessionSummary.defaultLayer),
|
||||
Layer.provide(SyncEvent.defaultLayer),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -24,20 +24,19 @@ export namespace SessionShare {
|
||||
const session = yield* Session.Service
|
||||
const shareNext = yield* ShareNext.Service
|
||||
const scope = yield* Scope.Scope
|
||||
const sync = yield* SyncEvent.Service
|
||||
|
||||
const share = Effect.fn("SessionShare.share")(function* (sessionID: SessionID) {
|
||||
const conf = yield* cfg.get()
|
||||
if (conf.share === "disabled") throw new Error("Sharing is disabled in configuration")
|
||||
const result = yield* shareNext.create(sessionID)
|
||||
yield* Effect.sync(() =>
|
||||
SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }),
|
||||
)
|
||||
yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } })
|
||||
return result
|
||||
})
|
||||
|
||||
const unshare = Effect.fn("SessionShare.unshare")(function* (sessionID: SessionID) {
|
||||
yield* shareNext.remove(sessionID)
|
||||
yield* Effect.sync(() => SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } }))
|
||||
yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } })
|
||||
})
|
||||
|
||||
const create = Effect.fn("SessionShare.create")(function* (input?: Parameters<typeof Session.create>[0]) {
|
||||
@@ -57,6 +56,7 @@ export namespace SessionShare {
|
||||
Layer.provide(ShareNext.defaultLayer),
|
||||
Layer.provide(Session.defaultLayer),
|
||||
Layer.provide(Config.defaultLayer),
|
||||
Layer.provide(SyncEvent.defaultLayer),
|
||||
)
|
||||
|
||||
const { runPromise } = makeRuntime(Service, defaultLayer)
|
||||
|
||||
@@ -1,14 +1,20 @@
|
||||
import z from "zod"
|
||||
import type { ZodObject } from "zod"
|
||||
import { EventEmitter } from "events"
|
||||
import { Context, Effect, Layer } from "effect"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import { Database, eq } from "@/storage/db"
|
||||
import { Bus as ProjectBus } from "@/bus"
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { EventSequenceTable, EventTable } from "./event.sql"
|
||||
import { EventID } from "./schema"
|
||||
import { Flag } from "@/flag/flag"
|
||||
import { Instance } from "@/project/instance"
|
||||
|
||||
export namespace SyncEvent {
|
||||
type Convert = (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
|
||||
|
||||
export type Definition = {
|
||||
type: string
|
||||
version: number
|
||||
@@ -30,38 +36,249 @@ export namespace SyncEvent {
|
||||
export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
|
||||
|
||||
type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
|
||||
type Init = { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: Convert }
|
||||
type Payload = { def: Definition; event: Event }
|
||||
|
||||
export const registry = new Map<string, Definition>()
|
||||
let projectors: Map<Definition, ProjectorFunc> | undefined
|
||||
const versions = new Map<string, number>()
|
||||
let frozen = false
|
||||
let convertEvent: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
|
||||
let projectors: Map<Definition, ProjectorFunc> | undefined
|
||||
let convert: Convert = (_, data) => data as Record<string, unknown>
|
||||
const bus = new EventEmitter<{ event: [Payload] }>()
|
||||
|
||||
const Bus = new EventEmitter<{ event: [{ def: Definition; event: Event }] }>()
|
||||
|
||||
export function reset() {
|
||||
frozen = false
|
||||
projectors = undefined
|
||||
convertEvent = (_, data) => data
|
||||
export interface Interface {
|
||||
readonly reset: () => Effect.Effect<void>
|
||||
readonly init: (input: Init) => Effect.Effect<void>
|
||||
readonly replay: (event: SerializedEvent, options?: { publish: boolean }) => Effect.Effect<void>
|
||||
readonly run: <Def extends Definition>(
|
||||
def: Def,
|
||||
data: Event<Def>["data"],
|
||||
options?: { publish?: boolean },
|
||||
) => Effect.Effect<void>
|
||||
readonly remove: (aggregateID: string) => Effect.Effect<void>
|
||||
readonly subscribeAll: (handler: (event: Payload) => void) => Effect.Effect<() => void>
|
||||
readonly payloads: () => z.ZodTypeAny
|
||||
}
|
||||
|
||||
export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: typeof convertEvent }) {
|
||||
projectors = new Map(input.projectors)
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/SyncEvent") {}
|
||||
|
||||
// Install all the latest event defs to the bus. We only ever emit
|
||||
// latest versions from code, and keep around old versions for
|
||||
// replaying. Replaying does not go through the bus, and it
|
||||
// simplifies the bus to only use unversioned latest events
|
||||
for (let [type, version] of versions.entries()) {
|
||||
let def = registry.get(versionedType(type, version))!
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const reset = Effect.fn("SyncEvent.reset")(() =>
|
||||
Effect.sync(() => {
|
||||
frozen = false
|
||||
projectors = undefined
|
||||
convert = (_, data) => data as Record<string, unknown>
|
||||
}),
|
||||
)
|
||||
|
||||
BusEvent.define(def.type, def.properties || def.schema)
|
||||
}
|
||||
const init = Effect.fn("SyncEvent.init")((input: Init) =>
|
||||
Effect.sync(() => {
|
||||
projectors = new Map(input.projectors)
|
||||
|
||||
// Freeze the system so it clearly errors if events are defined
|
||||
// after `init` which would cause bugs
|
||||
frozen = true
|
||||
convertEvent = input.convertEvent || ((_, data) => data)
|
||||
// Install all the latest event defs to the bus. We only ever emit
|
||||
// latest versions from code, and keep around old versions for
|
||||
// replaying. Replaying does not go through the bus, and it
|
||||
// simplifies the bus to only use unversioned latest events
|
||||
for (const [type, version] of versions.entries()) {
|
||||
const def = registry.get(versionedType(type, version))!
|
||||
BusEvent.define(def.type, def.properties || def.schema)
|
||||
}
|
||||
|
||||
// Freeze the system so it clearly errors if events are defined
|
||||
// after `init` which would cause bugs
|
||||
frozen = true
|
||||
convert = input.convertEvent || ((_, data) => data as Record<string, unknown>)
|
||||
}),
|
||||
)
|
||||
|
||||
function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
|
||||
if (projectors == null) {
|
||||
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
|
||||
}
|
||||
|
||||
const projector = projectors.get(def)
|
||||
if (!projector) {
|
||||
throw new Error(`Projector not found for event: ${def.type}`)
|
||||
}
|
||||
|
||||
// idempotent: need to ignore any events already logged
|
||||
Database.transaction((tx) => {
|
||||
projector(tx, event.data)
|
||||
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
|
||||
tx.insert(EventSequenceTable)
|
||||
.values({
|
||||
aggregate_id: event.aggregateID,
|
||||
seq: event.seq,
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: EventSequenceTable.aggregate_id,
|
||||
set: { seq: event.seq },
|
||||
})
|
||||
.run()
|
||||
tx.insert(EventTable)
|
||||
.values({
|
||||
id: event.id,
|
||||
seq: event.seq,
|
||||
aggregate_id: event.aggregateID,
|
||||
type: versionedType(def.type, def.version),
|
||||
data: event.data as Record<string, unknown>,
|
||||
})
|
||||
.run()
|
||||
}
|
||||
|
||||
Database.effect(
|
||||
InstanceState.bind(() => {
|
||||
bus.emit("event", { def, event })
|
||||
|
||||
if (!options.publish) return
|
||||
|
||||
const result = convert(def.type, event.data)
|
||||
if (result instanceof Promise) {
|
||||
void result.then(
|
||||
InstanceState.bind((data) => {
|
||||
void ProjectBus.publish({ type: def.type, properties: def.schema }, data)
|
||||
}),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
void ProjectBus.publish({ type: def.type, properties: def.schema }, result)
|
||||
}),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
function replay(event: SerializedEvent, options?: { publish: boolean }) {
|
||||
const def = registry.get(event.type)
|
||||
if (!def) {
|
||||
throw new Error(`Unknown event type: ${event.type}`)
|
||||
}
|
||||
|
||||
const row = Database.use((db) =>
|
||||
db
|
||||
.select({ seq: EventSequenceTable.seq })
|
||||
.from(EventSequenceTable)
|
||||
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
|
||||
.get(),
|
||||
)
|
||||
|
||||
const latest = row?.seq ?? -1
|
||||
if (event.seq <= latest) return
|
||||
|
||||
const expected = latest + 1
|
||||
if (event.seq !== expected) {
|
||||
throw new Error(
|
||||
`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`,
|
||||
)
|
||||
}
|
||||
|
||||
process(def, event, { publish: !!options?.publish })
|
||||
}
|
||||
|
||||
function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
|
||||
const agg = (data as Record<string, string>)[def.aggregate]
|
||||
if (agg == null) {
|
||||
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
|
||||
}
|
||||
|
||||
if (def.version !== versions.get(def.type)) {
|
||||
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
|
||||
}
|
||||
|
||||
const publish = options?.publish ?? true
|
||||
|
||||
// Note that this is an "immediate" transaction which is critical.
|
||||
// We need to make sure we can safely read and write with nothing
|
||||
// else changing the data from under us
|
||||
Database.transaction(
|
||||
(tx) => {
|
||||
const id = EventID.ascending()
|
||||
const row = tx
|
||||
.select({ seq: EventSequenceTable.seq })
|
||||
.from(EventSequenceTable)
|
||||
.where(eq(EventSequenceTable.aggregate_id, agg))
|
||||
.get()
|
||||
const seq = row?.seq != null ? row.seq + 1 : 0
|
||||
|
||||
const event = { id, seq, aggregateID: agg, data }
|
||||
process(def, event, { publish })
|
||||
},
|
||||
{
|
||||
behavior: "immediate",
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
function remove(aggregateID: string) {
|
||||
Database.transaction((tx) => {
|
||||
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
|
||||
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
|
||||
})
|
||||
}
|
||||
|
||||
function subscribeAll(handler: (event: Payload) => void) {
|
||||
bus.on("event", handler)
|
||||
return () => bus.off("event", handler)
|
||||
}
|
||||
|
||||
function payloads() {
|
||||
return z
|
||||
.union(
|
||||
registry
|
||||
.entries()
|
||||
.map(([type, def]) => {
|
||||
return z
|
||||
.object({
|
||||
type: z.literal(type),
|
||||
aggregate: z.literal(def.aggregate),
|
||||
data: def.schema,
|
||||
})
|
||||
.meta({
|
||||
ref: "SyncEvent" + "." + def.type,
|
||||
})
|
||||
})
|
||||
.toArray() as any,
|
||||
)
|
||||
.meta({
|
||||
ref: "SyncEvent",
|
||||
})
|
||||
}
|
||||
|
||||
return Service.of({
|
||||
reset,
|
||||
init,
|
||||
replay: (event, options) =>
|
||||
Effect.gen(function* () {
|
||||
const ctx = yield* InstanceState.context
|
||||
return yield* Effect.sync(() => Instance.restore(ctx, () => replay(event, options)))
|
||||
}),
|
||||
run: (def, data, options) =>
|
||||
options?.publish === false
|
||||
? Effect.sync(() => run(def, data, options))
|
||||
: Effect.gen(function* () {
|
||||
const ctx = yield* InstanceState.context
|
||||
return yield* Effect.sync(() => Instance.restore(ctx, () => run(def, data, options)))
|
||||
}),
|
||||
remove: (aggregateID) => Effect.sync(() => remove(aggregateID)),
|
||||
subscribeAll: (handler) => Effect.sync(() => subscribeAll(handler)),
|
||||
payloads,
|
||||
})
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer
|
||||
|
||||
const { runSync } = makeRuntime(Service, defaultLayer)
|
||||
|
||||
export function reset() {
|
||||
return runSync((svc) => svc.reset())
|
||||
}
|
||||
|
||||
export function init(input: Init) {
|
||||
return runSync((svc) => svc.init(input))
|
||||
}
|
||||
|
||||
export function versionedType<A extends string>(type: A): A
|
||||
@@ -102,63 +319,6 @@ export namespace SyncEvent {
|
||||
return [def, func as ProjectorFunc]
|
||||
}
|
||||
|
||||
function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
|
||||
if (projectors == null) {
|
||||
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
|
||||
}
|
||||
|
||||
const projector = projectors.get(def)
|
||||
if (!projector) {
|
||||
throw new Error(`Projector not found for event: ${def.type}`)
|
||||
}
|
||||
|
||||
// idempotent: need to ignore any events already logged
|
||||
|
||||
Database.transaction((tx) => {
|
||||
projector(tx, event.data)
|
||||
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
|
||||
tx.insert(EventSequenceTable)
|
||||
.values({
|
||||
aggregate_id: event.aggregateID,
|
||||
seq: event.seq,
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: EventSequenceTable.aggregate_id,
|
||||
set: { seq: event.seq },
|
||||
})
|
||||
.run()
|
||||
tx.insert(EventTable)
|
||||
.values({
|
||||
id: event.id,
|
||||
seq: event.seq,
|
||||
aggregate_id: event.aggregateID,
|
||||
type: versionedType(def.type, def.version),
|
||||
data: event.data as Record<string, unknown>,
|
||||
})
|
||||
.run()
|
||||
}
|
||||
|
||||
Database.effect(() => {
|
||||
Bus.emit("event", {
|
||||
def,
|
||||
event,
|
||||
})
|
||||
|
||||
if (options?.publish) {
|
||||
const result = convertEvent(def.type, event.data)
|
||||
if (result instanceof Promise) {
|
||||
result.then((data) => {
|
||||
ProjectBus.publish({ type: def.type, properties: def.schema }, data)
|
||||
})
|
||||
} else {
|
||||
ProjectBus.publish({ type: def.type, properties: def.schema }, result)
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// TODO:
|
||||
//
|
||||
// * Support applying multiple events at one time. One transaction,
|
||||
@@ -166,100 +326,22 @@ export namespace SyncEvent {
|
||||
// * when loading events from db, apply zod validation to ensure shape
|
||||
|
||||
export function replay(event: SerializedEvent, options?: { publish: boolean }) {
|
||||
const def = registry.get(event.type)
|
||||
if (!def) {
|
||||
throw new Error(`Unknown event type: ${event.type}`)
|
||||
}
|
||||
|
||||
const row = Database.use((db) =>
|
||||
db
|
||||
.select({ seq: EventSequenceTable.seq })
|
||||
.from(EventSequenceTable)
|
||||
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
|
||||
.get(),
|
||||
)
|
||||
|
||||
const latest = row?.seq ?? -1
|
||||
if (event.seq <= latest) {
|
||||
return
|
||||
}
|
||||
|
||||
const expected = latest + 1
|
||||
if (event.seq !== expected) {
|
||||
throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`)
|
||||
}
|
||||
|
||||
process(def, event, { publish: !!options?.publish })
|
||||
return runSync((svc) => svc.replay(event, options))
|
||||
}
|
||||
|
||||
export function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
|
||||
const agg = (data as Record<string, string>)[def.aggregate]
|
||||
// This should never happen: we've enforced it via typescript in
|
||||
// the definition
|
||||
if (agg == null) {
|
||||
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
|
||||
}
|
||||
|
||||
if (def.version !== versions.get(def.type)) {
|
||||
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
|
||||
}
|
||||
|
||||
const { publish = true } = options || {}
|
||||
|
||||
// Note that this is an "immediate" transaction which is critical.
|
||||
// We need to make sure we can safely read and write with nothing
|
||||
// else changing the data from under us
|
||||
Database.transaction(
|
||||
(tx) => {
|
||||
const id = EventID.ascending()
|
||||
const row = tx
|
||||
.select({ seq: EventSequenceTable.seq })
|
||||
.from(EventSequenceTable)
|
||||
.where(eq(EventSequenceTable.aggregate_id, agg))
|
||||
.get()
|
||||
const seq = row?.seq != null ? row.seq + 1 : 0
|
||||
|
||||
const event = { id, seq, aggregateID: agg, data }
|
||||
process(def, event, { publish })
|
||||
},
|
||||
{
|
||||
behavior: "immediate",
|
||||
},
|
||||
)
|
||||
return runSync((svc) => svc.run(def, data, options))
|
||||
}
|
||||
|
||||
export function remove(aggregateID: string) {
|
||||
Database.transaction((tx) => {
|
||||
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
|
||||
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
|
||||
})
|
||||
return runSync((svc) => svc.remove(aggregateID))
|
||||
}
|
||||
|
||||
export function subscribeAll(handler: (event: { def: Definition; event: Event }) => void) {
|
||||
Bus.on("event", handler)
|
||||
return () => Bus.off("event", handler)
|
||||
return runSync((svc) => svc.subscribeAll(handler))
|
||||
}
|
||||
|
||||
export function payloads() {
|
||||
return z
|
||||
.union(
|
||||
registry
|
||||
.entries()
|
||||
.map(([type, def]) => {
|
||||
return z
|
||||
.object({
|
||||
type: z.literal(type),
|
||||
aggregate: z.literal(def.aggregate),
|
||||
data: def.schema,
|
||||
})
|
||||
.meta({
|
||||
ref: "SyncEvent" + "." + def.type,
|
||||
})
|
||||
})
|
||||
.toArray() as any,
|
||||
)
|
||||
.meta({
|
||||
ref: "SyncEvent",
|
||||
})
|
||||
return runSync((svc) => Effect.sync(() => svc.payloads()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,7 +30,6 @@ import { Glob } from "../util/glob"
|
||||
import path from "path"
|
||||
import { pathToFileURL } from "url"
|
||||
import { Effect, Layer, Context } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { FetchHttpClient, HttpClient } from "effect/unstable/http"
|
||||
import { ChildProcessSpawner } from "effect/unstable/process/ChildProcessSpawner"
|
||||
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
|
||||
@@ -137,7 +136,7 @@ export namespace ToolRegistry {
|
||||
Effect.gen(function* () {
|
||||
const pluginCtx: PluginToolContext = {
|
||||
...toolCtx,
|
||||
ask: (req) => Effect.runPromise(toolCtx.ask(req).pipe(Effect.provide(EffectLogger.layer))),
|
||||
ask: (req) => toolCtx.ask(req),
|
||||
directory: ctx.directory,
|
||||
worktree: ctx.worktree,
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
],
|
||||
"dependencies": {
|
||||
"@opencode-ai/sdk": "workspace:*",
|
||||
"effect": "catalog:",
|
||||
"zod": "catalog:"
|
||||
},
|
||||
"peerDependencies": {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { z } from "zod"
|
||||
import { Effect } from "effect"
|
||||
|
||||
export type ToolContext = {
|
||||
sessionID: string
|
||||
@@ -16,7 +17,7 @@ export type ToolContext = {
|
||||
worktree: string
|
||||
abort: AbortSignal
|
||||
metadata(input: { title?: string; metadata?: { [key: string]: any } }): void
|
||||
ask(input: AskInput): Promise<void>
|
||||
ask(input: AskInput): Effect.Effect<void>
|
||||
}
|
||||
|
||||
type AskInput = {
|
||||
|
||||
@@ -316,6 +316,29 @@ export type EventCommandExecuted = {
|
||||
}
|
||||
}
|
||||
|
||||
export type EventWorkspaceReady = {
|
||||
type: "workspace.ready"
|
||||
properties: {
|
||||
name: string
|
||||
}
|
||||
}
|
||||
|
||||
export type EventWorkspaceFailed = {
|
||||
type: "workspace.failed"
|
||||
properties: {
|
||||
message: string
|
||||
}
|
||||
}
|
||||
|
||||
export type EventWorkspaceStatus = {
|
||||
type: "workspace.status"
|
||||
properties: {
|
||||
workspaceID: string
|
||||
status: "connected" | "connecting" | "disconnected" | "error"
|
||||
error?: string
|
||||
}
|
||||
}
|
||||
|
||||
export type QuestionOption = {
|
||||
/**
|
||||
* Display text (1-5 words, concise)
|
||||
@@ -387,29 +410,6 @@ export type EventQuestionRejected = {
|
||||
}
|
||||
}
|
||||
|
||||
export type Todo = {
|
||||
/**
|
||||
* Brief description of the task
|
||||
*/
|
||||
content: string
|
||||
/**
|
||||
* Current status of the task: pending, in_progress, completed, cancelled
|
||||
*/
|
||||
status: string
|
||||
/**
|
||||
* Priority level of the task: high, medium, low
|
||||
*/
|
||||
priority: string
|
||||
}
|
||||
|
||||
export type EventTodoUpdated = {
|
||||
type: "todo.updated"
|
||||
properties: {
|
||||
sessionID: string
|
||||
todos: Array<Todo>
|
||||
}
|
||||
}
|
||||
|
||||
export type SessionStatus =
|
||||
| {
|
||||
type: "idle"
|
||||
@@ -446,6 +446,29 @@ export type EventSessionCompacted = {
|
||||
}
|
||||
}
|
||||
|
||||
export type Todo = {
|
||||
/**
|
||||
* Brief description of the task
|
||||
*/
|
||||
content: string
|
||||
/**
|
||||
* Current status of the task: pending, in_progress, completed, cancelled
|
||||
*/
|
||||
status: string
|
||||
/**
|
||||
* Priority level of the task: high, medium, low
|
||||
*/
|
||||
priority: string
|
||||
}
|
||||
|
||||
export type EventTodoUpdated = {
|
||||
type: "todo.updated"
|
||||
properties: {
|
||||
sessionID: string
|
||||
todos: Array<Todo>
|
||||
}
|
||||
}
|
||||
|
||||
export type EventWorktreeReady = {
|
||||
type: "worktree.ready"
|
||||
properties: {
|
||||
@@ -500,29 +523,6 @@ export type EventPtyDeleted = {
|
||||
}
|
||||
}
|
||||
|
||||
export type EventWorkspaceReady = {
|
||||
type: "workspace.ready"
|
||||
properties: {
|
||||
name: string
|
||||
}
|
||||
}
|
||||
|
||||
export type EventWorkspaceFailed = {
|
||||
type: "workspace.failed"
|
||||
properties: {
|
||||
message: string
|
||||
}
|
||||
}
|
||||
|
||||
export type EventWorkspaceStatus = {
|
||||
type: "workspace.status"
|
||||
properties: {
|
||||
workspaceID: string
|
||||
status: "connected" | "connecting" | "disconnected" | "error"
|
||||
error?: string
|
||||
}
|
||||
}
|
||||
|
||||
export type OutputFormatText = {
|
||||
type: "text"
|
||||
}
|
||||
@@ -995,22 +995,22 @@ export type Event =
|
||||
| EventMcpToolsChanged
|
||||
| EventMcpBrowserOpenFailed
|
||||
| EventCommandExecuted
|
||||
| EventWorkspaceReady
|
||||
| EventWorkspaceFailed
|
||||
| EventWorkspaceStatus
|
||||
| EventQuestionAsked
|
||||
| EventQuestionReplied
|
||||
| EventQuestionRejected
|
||||
| EventTodoUpdated
|
||||
| EventSessionStatus
|
||||
| EventSessionIdle
|
||||
| EventSessionCompacted
|
||||
| EventTodoUpdated
|
||||
| EventWorktreeReady
|
||||
| EventWorktreeFailed
|
||||
| EventPtyCreated
|
||||
| EventPtyUpdated
|
||||
| EventPtyExited
|
||||
| EventPtyDeleted
|
||||
| EventWorkspaceReady
|
||||
| EventWorkspaceFailed
|
||||
| EventWorkspaceStatus
|
||||
| EventMessageUpdated
|
||||
| EventMessageRemoved
|
||||
| EventMessagePartUpdated
|
||||
|
||||
@@ -7986,6 +7986,71 @@
|
||||
},
|
||||
"required": ["type", "properties"]
|
||||
},
|
||||
"Event.workspace.ready": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"const": "workspace.ready"
|
||||
},
|
||||
"properties": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": ["name"]
|
||||
}
|
||||
},
|
||||
"required": ["type", "properties"]
|
||||
},
|
||||
"Event.workspace.failed": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"const": "workspace.failed"
|
||||
},
|
||||
"properties": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"message": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": ["message"]
|
||||
}
|
||||
},
|
||||
"required": ["type", "properties"]
|
||||
},
|
||||
"Event.workspace.status": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"const": "workspace.status"
|
||||
},
|
||||
"properties": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"workspaceID": {
|
||||
"type": "string",
|
||||
"pattern": "^wrk.*"
|
||||
},
|
||||
"status": {
|
||||
"type": "string",
|
||||
"enum": ["connected", "connecting", "disconnected", "error"]
|
||||
},
|
||||
"error": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": ["workspaceID", "status"]
|
||||
}
|
||||
},
|
||||
"required": ["type", "properties"]
|
||||
},
|
||||
"QuestionOption": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@@ -8136,50 +8201,6 @@
|
||||
},
|
||||
"required": ["type", "properties"]
|
||||
},
|
||||
"Todo": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"content": {
|
||||
"description": "Brief description of the task",
|
||||
"type": "string"
|
||||
},
|
||||
"status": {
|
||||
"description": "Current status of the task: pending, in_progress, completed, cancelled",
|
||||
"type": "string"
|
||||
},
|
||||
"priority": {
|
||||
"description": "Priority level of the task: high, medium, low",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": ["content", "status", "priority"]
|
||||
},
|
||||
"Event.todo.updated": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"const": "todo.updated"
|
||||
},
|
||||
"properties": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"sessionID": {
|
||||
"type": "string",
|
||||
"pattern": "^ses.*"
|
||||
},
|
||||
"todos": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/components/schemas/Todo"
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": ["sessionID", "todos"]
|
||||
}
|
||||
},
|
||||
"required": ["type", "properties"]
|
||||
},
|
||||
"SessionStatus": {
|
||||
"anyOf": [
|
||||
{
|
||||
@@ -8286,6 +8307,50 @@
|
||||
},
|
||||
"required": ["type", "properties"]
|
||||
},
|
||||
"Todo": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"content": {
|
||||
"description": "Brief description of the task",
|
||||
"type": "string"
|
||||
},
|
||||
"status": {
|
||||
"description": "Current status of the task: pending, in_progress, completed, cancelled",
|
||||
"type": "string"
|
||||
},
|
||||
"priority": {
|
||||
"description": "Priority level of the task: high, medium, low",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": ["content", "status", "priority"]
|
||||
},
|
||||
"Event.todo.updated": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"const": "todo.updated"
|
||||
},
|
||||
"properties": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"sessionID": {
|
||||
"type": "string",
|
||||
"pattern": "^ses.*"
|
||||
},
|
||||
"todos": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/components/schemas/Todo"
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": ["sessionID", "todos"]
|
||||
}
|
||||
},
|
||||
"required": ["type", "properties"]
|
||||
},
|
||||
"Event.worktree.ready": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@@ -8440,71 +8505,6 @@
|
||||
},
|
||||
"required": ["type", "properties"]
|
||||
},
|
||||
"Event.workspace.ready": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"const": "workspace.ready"
|
||||
},
|
||||
"properties": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": ["name"]
|
||||
}
|
||||
},
|
||||
"required": ["type", "properties"]
|
||||
},
|
||||
"Event.workspace.failed": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"const": "workspace.failed"
|
||||
},
|
||||
"properties": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"message": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": ["message"]
|
||||
}
|
||||
},
|
||||
"required": ["type", "properties"]
|
||||
},
|
||||
"Event.workspace.status": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"const": "workspace.status"
|
||||
},
|
||||
"properties": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"workspaceID": {
|
||||
"type": "string",
|
||||
"pattern": "^wrk.*"
|
||||
},
|
||||
"status": {
|
||||
"type": "string",
|
||||
"enum": ["connected", "connecting", "disconnected", "error"]
|
||||
},
|
||||
"error": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": ["workspaceID", "status"]
|
||||
}
|
||||
},
|
||||
"required": ["type", "properties"]
|
||||
},
|
||||
"OutputFormatText": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@@ -9937,6 +9937,15 @@
|
||||
{
|
||||
"$ref": "#/components/schemas/Event.command.executed"
|
||||
},
|
||||
{
|
||||
"$ref": "#/components/schemas/Event.workspace.ready"
|
||||
},
|
||||
{
|
||||
"$ref": "#/components/schemas/Event.workspace.failed"
|
||||
},
|
||||
{
|
||||
"$ref": "#/components/schemas/Event.workspace.status"
|
||||
},
|
||||
{
|
||||
"$ref": "#/components/schemas/Event.question.asked"
|
||||
},
|
||||
@@ -9946,9 +9955,6 @@
|
||||
{
|
||||
"$ref": "#/components/schemas/Event.question.rejected"
|
||||
},
|
||||
{
|
||||
"$ref": "#/components/schemas/Event.todo.updated"
|
||||
},
|
||||
{
|
||||
"$ref": "#/components/schemas/Event.session.status"
|
||||
},
|
||||
@@ -9958,6 +9964,9 @@
|
||||
{
|
||||
"$ref": "#/components/schemas/Event.session.compacted"
|
||||
},
|
||||
{
|
||||
"$ref": "#/components/schemas/Event.todo.updated"
|
||||
},
|
||||
{
|
||||
"$ref": "#/components/schemas/Event.worktree.ready"
|
||||
},
|
||||
@@ -9976,15 +9985,6 @@
|
||||
{
|
||||
"$ref": "#/components/schemas/Event.pty.deleted"
|
||||
},
|
||||
{
|
||||
"$ref": "#/components/schemas/Event.workspace.ready"
|
||||
},
|
||||
{
|
||||
"$ref": "#/components/schemas/Event.workspace.failed"
|
||||
},
|
||||
{
|
||||
"$ref": "#/components/schemas/Event.workspace.status"
|
||||
},
|
||||
{
|
||||
"$ref": "#/components/schemas/Event.message.updated"
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user