Compare commits

..

10 Commits

Author SHA1 Message Date
Kit Langton
c3eb2418ca fix(sync): preserve shared state and caller semantics 2026-04-11 00:10:41 -04:00
Kit Langton
397adb4190 fix(sync): restore instance context for deferred publish 2026-04-11 00:10:41 -04:00
Kit Langton
dbc136d3b3 Merge branch 'dev' into kit/effect-sync-event 2026-04-10 23:54:56 -04:00
opencode-agent[bot]
2e340d976f chore: generate 2026-04-11 03:53:48 +00:00
Kit Langton
fe4dfb9f6f refactor(git): remove runtime facade wrappers (#21982) 2026-04-10 23:52:48 -04:00
Kit Langton
5e3dc80999 refactor: collapse command facade (#21981) 2026-04-10 23:52:12 -04:00
Kit Langton
e2c41dce99 Merge branch 'dev' into kit/effect-sync-event 2026-04-10 23:51:34 -04:00
Kit Langton
d84cc33742 refactor(plugin): return Effect from ToolContext.ask (#21986) 2026-04-10 23:50:50 -04:00
Kit Langton
f8580ab9f7 Merge branch 'dev' into kit/effect-sync-event 2026-04-10 23:42:10 -04:00
Kit Langton
af7ada9a7a refactor(sync): migrate SyncEvent to service layer 2026-04-10 23:34:38 -04:00
19 changed files with 651 additions and 635 deletions

View File

@@ -450,6 +450,7 @@
"version": "1.4.3",
"dependencies": {
"@opencode-ai/sdk": "workspace:*",
"effect": "catalog:",
"zod": "catalog:",
},
"devDependencies": {

View File

@@ -29,6 +29,7 @@ import { Provider } from "../../provider/provider"
import { Bus } from "../../bus"
import { MessageV2 } from "../../session/message-v2"
import { SessionPrompt } from "@/session/prompt"
import { AppRuntime } from "@/effect/app-runtime"
import { Git } from "@/git"
import { setTimeout as sleep } from "node:timers/promises"
import { Process } from "@/util/process"
@@ -258,7 +259,9 @@ export const GithubInstallCommand = cmd({
}
// Get repo info
const info = (await Git.run(["remote", "get-url", "origin"], { cwd: Instance.worktree })).text().trim()
const info = await AppRuntime.runPromise(
Git.Service.use((git) => git.run(["remote", "get-url", "origin"], { cwd: Instance.worktree })),
).then((x) => x.text().trim())
const parsed = parseGitHubRemote(info)
if (!parsed) {
prompts.log.error(`Could not find git repository. Please run this command from a git repository.`)
@@ -497,20 +500,21 @@ export const GithubRunCommand = cmd({
: "issue"
: undefined
const gitText = async (args: string[]) => {
const result = await Git.run(args, { cwd: Instance.worktree })
const result = await AppRuntime.runPromise(Git.Service.use((git) => git.run(args, { cwd: Instance.worktree })))
if (result.exitCode !== 0) {
throw new Process.RunFailedError(["git", ...args], result.exitCode, result.stdout, result.stderr)
}
return result.text().trim()
}
const gitRun = async (args: string[]) => {
const result = await Git.run(args, { cwd: Instance.worktree })
const result = await AppRuntime.runPromise(Git.Service.use((git) => git.run(args, { cwd: Instance.worktree })))
if (result.exitCode !== 0) {
throw new Process.RunFailedError(["git", ...args], result.exitCode, result.stdout, result.stderr)
}
return result
}
const gitStatus = (args: string[]) => Git.run(args, { cwd: Instance.worktree })
const gitStatus = (args: string[]) =>
AppRuntime.runPromise(Git.Service.use((git) => git.run(args, { cwd: Instance.worktree })))
const commitChanges = async (summary: string, actor?: string) => {
const args = ["commit", "-m", summary]
if (actor) args.push("-m", `Co-authored-by: ${actor} <${actor}@users.noreply.github.com>`)

View File

@@ -1,5 +1,6 @@
import { UI } from "../ui"
import { cmd } from "./cmd"
import { AppRuntime } from "@/effect/app-runtime"
import { Git } from "@/git"
import { Instance } from "@/project/instance"
import { Process } from "@/util/process"
@@ -67,19 +68,29 @@ export const PrCommand = cmd({
const remoteName = forkOwner
// Check if remote already exists
const remotes = (await Git.run(["remote"], { cwd: Instance.worktree })).text().trim()
const remotes = await AppRuntime.runPromise(
Git.Service.use((git) => git.run(["remote"], { cwd: Instance.worktree })),
).then((x) => x.text().trim())
if (!remotes.split("\n").includes(remoteName)) {
await Git.run(["remote", "add", remoteName, `https://github.com/${forkOwner}/${forkName}.git`], {
cwd: Instance.worktree,
})
await AppRuntime.runPromise(
Git.Service.use((git) =>
git.run(["remote", "add", remoteName, `https://github.com/${forkOwner}/${forkName}.git`], {
cwd: Instance.worktree,
}),
),
)
UI.println(`Added fork remote: ${remoteName}`)
}
// Set upstream to the fork so pushes go there
const headRefName = prInfo.headRefName
await Git.run(["branch", `--set-upstream-to=${remoteName}/${headRefName}`, localBranchName], {
cwd: Instance.worktree,
})
await AppRuntime.runPromise(
Git.Service.use((git) =>
git.run(["branch", `--set-upstream-to=${remoteName}/${headRefName}`, localBranchName], {
cwd: Instance.worktree,
}),
),
)
}
// Check for opencode session link in PR body

View File

@@ -1,6 +1,5 @@
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
import { makeRuntime } from "@/effect/run-service"
import type { InstanceContext } from "@/project/instance"
import { SessionID, MessageID } from "@/session/schema"
import { Effect, Layer, Context } from "effect"
@@ -189,10 +188,4 @@ export namespace Command {
Layer.provide(MCP.defaultLayer),
Layer.provide(Skill.defaultLayer),
)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function list() {
return runPromise((svc) => svc.list())
}
}

View File

@@ -1,19 +1,19 @@
import z from "zod"
import { AppFileSystem } from "@/filesystem"
import { makeRuntime } from "@/effect/run-service"
import { setTimeout as sleep } from "node:timers/promises"
import { fn } from "@/util/fn"
import { Database, eq } from "@/storage/db"
import type { Project } from "@/project/project"
import { Project } from "@/project/project"
import { BusEvent } from "@/bus/bus-event"
import { GlobalBus } from "@/bus/global"
import { SyncEvent } from "@/sync"
import { Log } from "@/util/log"
import { Filesystem } from "@/util/filesystem"
import { ProjectID } from "@/project/schema"
import { WorkspaceTable } from "./workspace.sql"
import { getAdaptor } from "./adaptors"
import { WorkspaceInfo } from "./types"
import { WorkspaceID } from "./schema"
import { parseSSE } from "./sse"
import { Context, Effect, Layer, Scope } from "effect"
export namespace Workspace {
export const Info = WorkspaceInfo.meta({
@@ -56,250 +56,174 @@ export namespace Workspace {
}
}
export const CreateInput = z.object({
const CreateInput = z.object({
id: WorkspaceID.zod.optional(),
type: Info.shape.type,
branch: Info.shape.branch,
projectID: ProjectID.zod,
extra: Info.shape.extra,
})
export type CreateInput = z.infer<typeof CreateInput>
export const create = fn(CreateInput, async (input) => {
const id = WorkspaceID.ascending(input.id)
const adaptor = await getAdaptor(input.type)
const config = await adaptor.configure({ ...input, id, name: null, directory: null })
const info: Info = {
id,
type: config.type,
branch: config.branch ?? null,
name: config.name ?? null,
directory: config.directory ?? null,
extra: config.extra ?? null,
projectID: input.projectID,
}
Database.use((db) => {
db.insert(WorkspaceTable)
.values({
id: info.id,
type: info.type,
branch: info.branch,
name: info.name,
directory: info.directory,
extra: info.extra,
project_id: info.projectID,
})
.run()
})
await adaptor.create(config)
startSync(info)
return info
})
export function list(project: Project.Info) {
const rows = Database.use((db) =>
db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
)
const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
for (const space of spaces) startSync(space)
return spaces
}
export const get = fn(WorkspaceID.zod, async (id) => {
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
if (!row) return
const space = fromRow(row)
startSync(space)
return space
})
export const remove = fn(WorkspaceID.zod, async (id) => {
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
if (row) {
stopSync(id)
const info = fromRow(row)
const adaptor = await getAdaptor(row.type)
adaptor.remove(info)
Database.use((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
return info
}
})
const connections = new Map<WorkspaceID, ConnectionStatus>()
const aborts = new Map<WorkspaceID, AbortController>()
function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) {
const prev = connections.get(id)
if (prev?.status === status && prev?.error === error) return
const next = { workspaceID: id, status, error }
connections.set(id, next)
GlobalBus.emit("event", {
directory: "global",
workspace: id,
payload: {
type: Event.Status.type,
properties: next,
},
})
}
export function status(): ConnectionStatus[] {
return [...connections.values()]
}
const log = Log.create({ service: "workspace-sync" })
export interface Interface {
readonly create: (input: CreateInput) => Effect.Effect<Info>
readonly list: (projectID: ProjectID) => Effect.Effect<Info[]>
readonly get: (id: WorkspaceID) => Effect.Effect<Info | undefined>
readonly remove: (id: WorkspaceID) => Effect.Effect<Info | undefined>
readonly status: () => Effect.Effect<ConnectionStatus[]>
}
async function workspaceEventLoop(space: Info, signal: AbortSignal) {
log.info("starting sync: " + space.id)
export class Service extends Context.Service<Service, Interface>()("@opencode/Workspace") {}
while (!signal.aborted) {
log.info("connecting to sync: " + space.id)
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const scope = yield* Scope.Scope
const connections = new Map<WorkspaceID, ConnectionStatus>()
const aborts = new Map<WorkspaceID, AbortController>()
setStatus(space.id, "connecting")
const adaptor = await getAdaptor(space.type)
const target = await adaptor.target(space)
yield* Effect.addFinalizer(() =>
Effect.sync(() => {
for (const abort of aborts.values()) abort.abort()
}),
)
if (target.type === "local") return
const db = <T>(fn: (db: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
Effect.sync(() => Database.use(fn))
const setStatus = Effect.fnUntraced(function* (
id: WorkspaceID,
status: ConnectionStatus["status"],
error?: string,
) {
const prev = connections.get(id)
if (prev?.status === status && prev?.error === error) return
const next = { workspaceID: id, status, error }
connections.set(id, next)
GlobalBus.emit("event", {
directory: "global",
workspace: id,
payload: {
type: Event.Status.type,
properties: next,
},
})
const res = await fetch(target.url + "/sync/event", { method: "GET", signal }).catch((err: unknown) => {
setStatus(space.id, "error", String(err))
return undefined
})
if (!res || !res.ok || !res.body) {
log.info("failed to connect to sync: " + res?.status)
const stopSync = Effect.fnUntraced(function* (id: WorkspaceID) {
aborts.get(id)?.abort()
aborts.delete(id)
connections.delete(id)
})
setStatus(space.id, "error", res ? `HTTP ${res.status}` : "no response")
await sleep(1000)
continue
}
setStatus(space.id, "connected")
await parseSSE(res.body, signal, (evt) => {
const event = evt as SyncEvent.SerializedEvent
const workspaceEventLoop = Effect.fn("Workspace.workspaceEventLoop")(function* (
space: Info,
signal: AbortSignal,
) {
log.info("starting sync: " + space.id)
while (!signal.aborted) {
log.info("connecting to sync: " + space.id)
yield* setStatus(space.id, "connecting")
const adaptor = yield* Effect.promise(() => getAdaptor(space.type)).pipe(Effect.orDie)
const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space))).pipe(Effect.orDie)
if (target.type === "local") return
const res = yield* Effect.tryPromise({
try: () => fetch(target.url + "/sync/event", { method: "GET", signal }),
catch: (err) => String(err),
}).pipe(
Effect.catch((err) => setStatus(space.id, "error", err).pipe(Effect.as(undefined as Response | undefined))),
)
if (!res || !res.ok || !res.body) {
log.info("failed to connect to sync: " + res?.status)
yield* setStatus(space.id, "error", res ? `HTTP ${res.status}` : "no response")
yield* Effect.sleep("1 second")
continue
try {
if (!event.type.startsWith("server.")) {
SyncEvent.replay(event)
}
const body = res.body
yield* setStatus(space.id, "connected")
yield* Effect.promise(() =>
parseSSE(body, signal, (evt) => {
const event = evt as SyncEvent.SerializedEvent
try {
if (!event.type.startsWith("server.")) {
SyncEvent.replay(event)
}
} catch (err) {
log.warn("failed to replay sync event", {
workspaceID: space.id,
error: err,
})
}
}),
).pipe(Effect.orDie)
yield* setStatus(space.id, "disconnected")
log.info("disconnected to sync: " + space.id)
yield* Effect.sleep("250 millis")
} catch (err) {
log.warn("failed to replay sync event", {
workspaceID: space.id,
error: err,
})
}
})
const startSync = Effect.fn("Workspace.startSync")(function* (space: Info) {
if (space.type === "worktree") {
yield* Effect.gen(function* () {
const exists = yield* fs.exists(space.directory!).pipe(Effect.orDie)
yield* setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
}).pipe(Effect.forkIn(scope))
return
}
if (aborts.has(space.id)) return
const abort = new AbortController()
aborts.set(space.id, abort)
yield* setStatus(space.id, "disconnected")
yield* workspaceEventLoop(space, abort.signal).pipe(
Effect.catchCause((cause) =>
Effect.gen(function* () {
yield* setStatus(space.id, "error", String(cause))
yield* Effect.sync(() => {
log.warn("workspace sync listener failed", {
workspaceID: space.id,
error: cause,
})
})
}),
),
Effect.forkIn(scope),
)
})
const create = Effect.fn("Workspace.create")(function* (input: CreateInput) {
const id = WorkspaceID.ascending(input.id)
const adaptor = yield* Effect.promise(() => getAdaptor(input.type)).pipe(Effect.orDie)
const config = yield* Effect.promise(() =>
Promise.resolve(adaptor.configure({ ...input, id, name: null, directory: null })),
).pipe(Effect.orDie)
const info: Info = {
id,
type: config.type,
branch: config.branch ?? null,
name: config.name ?? null,
directory: config.directory ?? null,
extra: config.extra ?? null,
projectID: input.projectID,
}
yield* db((db) => {
db.insert(WorkspaceTable)
.values({
id: info.id,
type: info.type,
branch: info.branch,
name: info.name,
directory: info.directory,
extra: info.extra,
project_id: info.projectID,
})
.run()
})
yield* Effect.promise(() => adaptor.create(config)).pipe(Effect.orDie)
yield* startSync(info)
return info
})
const list = Effect.fn("Workspace.list")(function* (projectID: ProjectID) {
const rows = yield* db((db) =>
db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, projectID)).all(),
)
const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
for (const space of spaces) {
yield* startSync(space)
}
return spaces
})
const get = Effect.fn("Workspace.get")(function* (id: WorkspaceID) {
const row = yield* db((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
if (!row) return
const space = fromRow(row)
yield* startSync(space)
return space
})
const remove = Effect.fn("Workspace.remove")(function* (id: WorkspaceID) {
const row = yield* db((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
if (!row) return
yield* stopSync(id)
const info = fromRow(row)
const adaptor = yield* Effect.promise(() => getAdaptor(row.type)).pipe(Effect.orDie)
yield* Effect.sync(() => {
void adaptor.remove(info)
})
yield* db((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
return info
})
const status = Effect.fn("Workspace.status")(() => Effect.succeed([...connections.values()]))
return Service.of({ create, list, get, remove, status })
}),
)
export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer))
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function create(input: CreateInput) {
return runPromise((svc) => svc.create(input))
setStatus(space.id, "disconnected")
log.info("disconnected to sync: " + space.id)
await sleep(250)
}
}
export async function list(project: Project.Info) {
return runPromise((svc) => svc.list(project.id))
function startSync(space: Info) {
if (space.type === "worktree") {
void Filesystem.exists(space.directory!).then((exists) => {
setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
})
return
}
if (aborts.has(space.id)) return
const abort = new AbortController()
aborts.set(space.id, abort)
setStatus(space.id, "disconnected")
void workspaceEventLoop(space, abort.signal).catch((error) => {
setStatus(space.id, "error", String(error))
log.warn("workspace sync listener failed", {
workspaceID: space.id,
error,
})
})
}
export async function get(id: WorkspaceID) {
return runPromise((svc) => svc.get(id))
}
export async function remove(id: WorkspaceID) {
return runPromise((svc) => svc.remove(id))
}
export async function status() {
return runPromise((svc) => svc.status())
function stopSync(id: WorkspaceID) {
aborts.get(id)?.abort()
aborts.delete(id)
connections.delete(id)
}
}

View File

@@ -0,0 +1,9 @@
import { Layer, ManagedRuntime } from "effect"
import { memoMap } from "./run-service"
import { Format } from "@/format"
import { ShareNext } from "@/share/share-next"
export const BootstrapLayer = Layer.mergeAll(Format.defaultLayer, ShareNext.defaultLayer)
export const BootstrapRuntime = ManagedRuntime.make(BootstrapLayer, { memoMap })

View File

@@ -1,7 +1,6 @@
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
import { Effect, Layer, Context, Stream } from "effect"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import { makeRuntime } from "@/effect/run-service"
export namespace Git {
const cfg = [
@@ -258,14 +257,4 @@ export namespace Git {
)
export const defaultLayer = layer.pipe(Layer.provide(CrossSpawnSpawner.defaultLayer))
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function run(args: string[], opts: Options) {
return runPromise((git) => git.run(args, opts))
}
export async function defaultBranch(cwd: string) {
return runPromise((git) => git.defaultBranch(cwd))
}
}

View File

@@ -10,14 +10,14 @@ import { Bus } from "../bus"
import { Command } from "../command"
import { Instance } from "./instance"
import { Log } from "@/util/log"
import { AppRuntime } from "@/effect/app-runtime"
import { BootstrapRuntime } from "@/effect/bootstrap-runtime"
import { ShareNext } from "@/share/share-next"
export async function InstanceBootstrap() {
Log.Default.info("bootstrapping", { directory: Instance.directory })
await Plugin.init()
void AppRuntime.runPromise(ShareNext.Service.use((svc) => svc.init()))
void AppRuntime.runPromise(Format.Service.use((svc) => svc.init()))
void BootstrapRuntime.runPromise(ShareNext.Service.use((svc) => svc.init()))
void BootstrapRuntime.runPromise(Format.Service.use((svc) => svc.init()))
await LSP.init()
File.init()
FileWatcher.init()

View File

@@ -191,7 +191,7 @@ export const InstanceRoutes = (upgrade: UpgradeWebSocket, app: Hono = new Hono()
},
}),
async (c) => {
const commands = await Command.list()
const commands = await AppRuntime.runPromise(Command.Service.use((svc) => svc.list()))
return c.json(commands)
},
)

View File

@@ -28,7 +28,7 @@ export const WorkspaceRoutes = lazy(() =>
}),
validator(
"json",
Workspace.CreateInput.omit({
Workspace.create.schema.omit({
projectID: true,
}),
),
@@ -59,7 +59,7 @@ export const WorkspaceRoutes = lazy(() =>
},
}),
async (c) => {
return c.json(await Workspace.list(Instance.project))
return c.json(Workspace.list(Instance.project))
},
)
.get(
@@ -80,8 +80,8 @@ export const WorkspaceRoutes = lazy(() =>
},
}),
async (c) => {
const ids = new Set((await Workspace.list(Instance.project)).map((item) => item.id))
return c.json((await Workspace.status()).filter((item) => ids.has(item.workspaceID)))
const ids = new Set(Workspace.list(Instance.project).map((item) => item.id))
return c.json(Workspace.status().filter((item) => ids.has(item.workspaceID)))
},
)
.delete(

View File

@@ -366,11 +366,12 @@ export namespace Session {
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
Effect.sync(() => Database.use(fn))
export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> = Layer.effect(
export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service | SyncEvent.Service> = Layer.effect(
Service,
Effect.gen(function* () {
const bus = yield* Bus.Service
const storage = yield* Storage.Service
const sync = yield* SyncEvent.Service
const createNext = Effect.fn("Session.createNext")(function* (input: {
id?: SessionID
@@ -398,7 +399,7 @@ export namespace Session {
}
log.info("created", result)
yield* Effect.sync(() => SyncEvent.run(Event.Created, { sessionID: result.id, info: result }))
yield* sync.run(Event.Created, { sessionID: result.id, info: result })
if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
// This only exist for backwards compatibility. We should not be
@@ -446,10 +447,12 @@ export namespace Session {
Effect.catchCause(() => Effect.succeed(false)),
)
yield* Effect.sync(() => {
SyncEvent.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance })
SyncEvent.remove(sessionID)
})
if (hasInstance) {
yield* sync.run(Event.Deleted, { sessionID, info: session }, { publish: true })
} else {
yield* Effect.sync(() => SyncEvent.run(Event.Deleted, { sessionID, info: session }, { publish: false }))
}
yield* sync.remove(sessionID)
} catch (e) {
log.error(e)
}
@@ -457,19 +460,17 @@ export namespace Session {
const updateMessage = <T extends MessageV2.Info>(msg: T): Effect.Effect<T> =>
Effect.gen(function* () {
yield* Effect.sync(() => SyncEvent.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg }))
yield* sync.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg })
return msg
}).pipe(Effect.withSpan("Session.updateMessage"))
const updatePart = <T extends MessageV2.Part>(part: T): Effect.Effect<T> =>
Effect.gen(function* () {
yield* Effect.sync(() =>
SyncEvent.run(MessageV2.Event.PartUpdated, {
sessionID: part.sessionID,
part: structuredClone(part),
time: Date.now(),
}),
)
yield* sync.run(MessageV2.Event.PartUpdated, {
sessionID: part.sessionID,
part: structuredClone(part),
time: Date.now(),
})
return part
}).pipe(Effect.withSpan("Session.updatePart"))
@@ -549,8 +550,7 @@ export namespace Session {
return session
})
const patch = (sessionID: SessionID, info: Patch) =>
Effect.sync(() => SyncEvent.run(Event.Updated, { sessionID, info }))
const patch = (sessionID: SessionID, info: Patch) => sync.run(Event.Updated, { sessionID, info })
const touch = Effect.fn("Session.touch")(function* (sessionID: SessionID) {
yield* patch(sessionID, { time: { updated: Date.now() } })
@@ -607,12 +607,10 @@ export namespace Session {
sessionID: SessionID
messageID: MessageID
}) {
yield* Effect.sync(() =>
SyncEvent.run(MessageV2.Event.Removed, {
sessionID: input.sessionID,
messageID: input.messageID,
}),
)
yield* sync.run(MessageV2.Event.Removed, {
sessionID: input.sessionID,
messageID: input.messageID,
})
return input.messageID
})
@@ -621,13 +619,11 @@ export namespace Session {
messageID: MessageID
partID: PartID
}) {
yield* Effect.sync(() =>
SyncEvent.run(MessageV2.Event.PartRemoved, {
sessionID: input.sessionID,
messageID: input.messageID,
partID: input.partID,
}),
)
yield* sync.run(MessageV2.Event.PartRemoved, {
sessionID: input.sessionID,
messageID: input.messageID,
partID: input.partID,
})
return input.partID
})
@@ -678,7 +674,11 @@ export namespace Session {
}),
)
export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Storage.defaultLayer))
export const defaultLayer = layer.pipe(
Layer.provide(Bus.layer),
Layer.provide(Storage.defaultLayer),
Layer.provide(SyncEvent.defaultLayer),
)
const { runPromise } = makeRuntime(Service, defaultLayer)

View File

@@ -40,6 +40,7 @@ export namespace SessionRevert {
const bus = yield* Bus.Service
const summary = yield* SessionSummary.Service
const state = yield* SessionRunState.Service
const sync = yield* SyncEvent.Service
const revert = Effect.fn("SessionRevert.revert")(function* (input: RevertInput) {
yield* state.assertNotBusy(input.sessionID)
@@ -123,7 +124,7 @@ export namespace SessionRevert {
remove.push(msg)
}
for (const msg of remove) {
SyncEvent.run(MessageV2.Event.Removed, {
yield* sync.run(MessageV2.Event.Removed, {
sessionID,
messageID: msg.info.id,
})
@@ -135,7 +136,7 @@ export namespace SessionRevert {
const removeParts = target.parts.slice(idx)
target.parts = target.parts.slice(0, idx)
for (const part of removeParts) {
SyncEvent.run(MessageV2.Event.PartRemoved, {
yield* sync.run(MessageV2.Event.PartRemoved, {
sessionID,
messageID: target.info.id,
partID: part.id,
@@ -158,6 +159,7 @@ export namespace SessionRevert {
Layer.provide(Storage.defaultLayer),
Layer.provide(Bus.layer),
Layer.provide(SessionSummary.defaultLayer),
Layer.provide(SyncEvent.defaultLayer),
),
)

View File

@@ -24,20 +24,19 @@ export namespace SessionShare {
const session = yield* Session.Service
const shareNext = yield* ShareNext.Service
const scope = yield* Scope.Scope
const sync = yield* SyncEvent.Service
const share = Effect.fn("SessionShare.share")(function* (sessionID: SessionID) {
const conf = yield* cfg.get()
if (conf.share === "disabled") throw new Error("Sharing is disabled in configuration")
const result = yield* shareNext.create(sessionID)
yield* Effect.sync(() =>
SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }),
)
yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } })
return result
})
const unshare = Effect.fn("SessionShare.unshare")(function* (sessionID: SessionID) {
yield* shareNext.remove(sessionID)
yield* Effect.sync(() => SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } }))
yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } })
})
const create = Effect.fn("SessionShare.create")(function* (input?: Parameters<typeof Session.create>[0]) {
@@ -57,6 +56,7 @@ export namespace SessionShare {
Layer.provide(ShareNext.defaultLayer),
Layer.provide(Session.defaultLayer),
Layer.provide(Config.defaultLayer),
Layer.provide(SyncEvent.defaultLayer),
)
const { runPromise } = makeRuntime(Service, defaultLayer)

View File

@@ -1,14 +1,20 @@
import z from "zod"
import type { ZodObject } from "zod"
import { EventEmitter } from "events"
import { Context, Effect, Layer } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRuntime } from "@/effect/run-service"
import { Database, eq } from "@/storage/db"
import { Bus as ProjectBus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { EventSequenceTable, EventTable } from "./event.sql"
import { EventID } from "./schema"
import { Flag } from "@/flag/flag"
import { Instance } from "@/project/instance"
export namespace SyncEvent {
type Convert = (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
export type Definition = {
type: string
version: number
@@ -30,38 +36,249 @@ export namespace SyncEvent {
export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
type Init = { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: Convert }
type Payload = { def: Definition; event: Event }
export const registry = new Map<string, Definition>()
let projectors: Map<Definition, ProjectorFunc> | undefined
const versions = new Map<string, number>()
let frozen = false
let convertEvent: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
let projectors: Map<Definition, ProjectorFunc> | undefined
let convert: Convert = (_, data) => data as Record<string, unknown>
const bus = new EventEmitter<{ event: [Payload] }>()
const Bus = new EventEmitter<{ event: [{ def: Definition; event: Event }] }>()
export function reset() {
frozen = false
projectors = undefined
convertEvent = (_, data) => data
export interface Interface {
readonly reset: () => Effect.Effect<void>
readonly init: (input: Init) => Effect.Effect<void>
readonly replay: (event: SerializedEvent, options?: { publish: boolean }) => Effect.Effect<void>
readonly run: <Def extends Definition>(
def: Def,
data: Event<Def>["data"],
options?: { publish?: boolean },
) => Effect.Effect<void>
readonly remove: (aggregateID: string) => Effect.Effect<void>
readonly subscribeAll: (handler: (event: Payload) => void) => Effect.Effect<() => void>
readonly payloads: () => z.ZodTypeAny
}
export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: typeof convertEvent }) {
projectors = new Map(input.projectors)
export class Service extends Context.Service<Service, Interface>()("@opencode/SyncEvent") {}
// Install all the latest event defs to the bus. We only ever emit
// latest versions from code, and keep around old versions for
// replaying. Replaying does not go through the bus, and it
// simplifies the bus to only use unversioned latest events
for (let [type, version] of versions.entries()) {
let def = registry.get(versionedType(type, version))!
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const reset = Effect.fn("SyncEvent.reset")(() =>
Effect.sync(() => {
frozen = false
projectors = undefined
convert = (_, data) => data as Record<string, unknown>
}),
)
BusEvent.define(def.type, def.properties || def.schema)
}
const init = Effect.fn("SyncEvent.init")((input: Init) =>
Effect.sync(() => {
projectors = new Map(input.projectors)
// Freeze the system so it clearly errors if events are defined
// after `init` which would cause bugs
frozen = true
convertEvent = input.convertEvent || ((_, data) => data)
// Install all the latest event defs to the bus. We only ever emit
// latest versions from code, and keep around old versions for
// replaying. Replaying does not go through the bus, and it
// simplifies the bus to only use unversioned latest events
for (const [type, version] of versions.entries()) {
const def = registry.get(versionedType(type, version))!
BusEvent.define(def.type, def.properties || def.schema)
}
// Freeze the system so it clearly errors if events are defined
// after `init` which would cause bugs
frozen = true
convert = input.convertEvent || ((_, data) => data as Record<string, unknown>)
}),
)
function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
if (projectors == null) {
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
}
const projector = projectors.get(def)
if (!projector) {
throw new Error(`Projector not found for event: ${def.type}`)
}
// idempotent: need to ignore any events already logged
Database.transaction((tx) => {
projector(tx, event.data)
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
tx.insert(EventSequenceTable)
.values({
aggregate_id: event.aggregateID,
seq: event.seq,
})
.onConflictDoUpdate({
target: EventSequenceTable.aggregate_id,
set: { seq: event.seq },
})
.run()
tx.insert(EventTable)
.values({
id: event.id,
seq: event.seq,
aggregate_id: event.aggregateID,
type: versionedType(def.type, def.version),
data: event.data as Record<string, unknown>,
})
.run()
}
Database.effect(
InstanceState.bind(() => {
bus.emit("event", { def, event })
if (!options.publish) return
const result = convert(def.type, event.data)
if (result instanceof Promise) {
void result.then(
InstanceState.bind((data) => {
void ProjectBus.publish({ type: def.type, properties: def.schema }, data)
}),
)
return
}
void ProjectBus.publish({ type: def.type, properties: def.schema }, result)
}),
)
})
}
function replay(event: SerializedEvent, options?: { publish: boolean }) {
const def = registry.get(event.type)
if (!def) {
throw new Error(`Unknown event type: ${event.type}`)
}
const row = Database.use((db) =>
db
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
.get(),
)
const latest = row?.seq ?? -1
if (event.seq <= latest) return
const expected = latest + 1
if (event.seq !== expected) {
throw new Error(
`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`,
)
}
process(def, event, { publish: !!options?.publish })
}
function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
const agg = (data as Record<string, string>)[def.aggregate]
if (agg == null) {
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
}
if (def.version !== versions.get(def.type)) {
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
}
const publish = options?.publish ?? true
// Note that this is an "immediate" transaction which is critical.
// We need to make sure we can safely read and write with nothing
// else changing the data from under us
Database.transaction(
(tx) => {
const id = EventID.ascending()
const row = tx
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, agg))
.get()
const seq = row?.seq != null ? row.seq + 1 : 0
const event = { id, seq, aggregateID: agg, data }
process(def, event, { publish })
},
{
behavior: "immediate",
},
)
}
function remove(aggregateID: string) {
Database.transaction((tx) => {
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
})
}
function subscribeAll(handler: (event: Payload) => void) {
bus.on("event", handler)
return () => bus.off("event", handler)
}
function payloads() {
return z
.union(
registry
.entries()
.map(([type, def]) => {
return z
.object({
type: z.literal(type),
aggregate: z.literal(def.aggregate),
data: def.schema,
})
.meta({
ref: "SyncEvent" + "." + def.type,
})
})
.toArray() as any,
)
.meta({
ref: "SyncEvent",
})
}
return Service.of({
reset,
init,
replay: (event, options) =>
Effect.gen(function* () {
const ctx = yield* InstanceState.context
return yield* Effect.sync(() => Instance.restore(ctx, () => replay(event, options)))
}),
run: (def, data, options) =>
options?.publish === false
? Effect.sync(() => run(def, data, options))
: Effect.gen(function* () {
const ctx = yield* InstanceState.context
return yield* Effect.sync(() => Instance.restore(ctx, () => run(def, data, options)))
}),
remove: (aggregateID) => Effect.sync(() => remove(aggregateID)),
subscribeAll: (handler) => Effect.sync(() => subscribeAll(handler)),
payloads,
})
}),
)
export const defaultLayer = layer
const { runSync } = makeRuntime(Service, defaultLayer)
export function reset() {
return runSync((svc) => svc.reset())
}
export function init(input: Init) {
return runSync((svc) => svc.init(input))
}
export function versionedType<A extends string>(type: A): A
@@ -102,63 +319,6 @@ export namespace SyncEvent {
return [def, func as ProjectorFunc]
}
function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
if (projectors == null) {
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
}
const projector = projectors.get(def)
if (!projector) {
throw new Error(`Projector not found for event: ${def.type}`)
}
// idempotent: need to ignore any events already logged
Database.transaction((tx) => {
projector(tx, event.data)
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
tx.insert(EventSequenceTable)
.values({
aggregate_id: event.aggregateID,
seq: event.seq,
})
.onConflictDoUpdate({
target: EventSequenceTable.aggregate_id,
set: { seq: event.seq },
})
.run()
tx.insert(EventTable)
.values({
id: event.id,
seq: event.seq,
aggregate_id: event.aggregateID,
type: versionedType(def.type, def.version),
data: event.data as Record<string, unknown>,
})
.run()
}
Database.effect(() => {
Bus.emit("event", {
def,
event,
})
if (options?.publish) {
const result = convertEvent(def.type, event.data)
if (result instanceof Promise) {
result.then((data) => {
ProjectBus.publish({ type: def.type, properties: def.schema }, data)
})
} else {
ProjectBus.publish({ type: def.type, properties: def.schema }, result)
}
}
})
})
}
// TODO:
//
// * Support applying multiple events at one time. One transaction,
@@ -166,100 +326,22 @@ export namespace SyncEvent {
// * when loading events from db, apply zod validation to ensure shape
export function replay(event: SerializedEvent, options?: { publish: boolean }) {
const def = registry.get(event.type)
if (!def) {
throw new Error(`Unknown event type: ${event.type}`)
}
const row = Database.use((db) =>
db
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
.get(),
)
const latest = row?.seq ?? -1
if (event.seq <= latest) {
return
}
const expected = latest + 1
if (event.seq !== expected) {
throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`)
}
process(def, event, { publish: !!options?.publish })
return runSync((svc) => svc.replay(event, options))
}
export function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
const agg = (data as Record<string, string>)[def.aggregate]
// This should never happen: we've enforced it via typescript in
// the definition
if (agg == null) {
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
}
if (def.version !== versions.get(def.type)) {
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
}
const { publish = true } = options || {}
// Note that this is an "immediate" transaction which is critical.
// We need to make sure we can safely read and write with nothing
// else changing the data from under us
Database.transaction(
(tx) => {
const id = EventID.ascending()
const row = tx
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, agg))
.get()
const seq = row?.seq != null ? row.seq + 1 : 0
const event = { id, seq, aggregateID: agg, data }
process(def, event, { publish })
},
{
behavior: "immediate",
},
)
return runSync((svc) => svc.run(def, data, options))
}
export function remove(aggregateID: string) {
Database.transaction((tx) => {
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
})
return runSync((svc) => svc.remove(aggregateID))
}
export function subscribeAll(handler: (event: { def: Definition; event: Event }) => void) {
Bus.on("event", handler)
return () => Bus.off("event", handler)
return runSync((svc) => svc.subscribeAll(handler))
}
export function payloads() {
return z
.union(
registry
.entries()
.map(([type, def]) => {
return z
.object({
type: z.literal(type),
aggregate: z.literal(def.aggregate),
data: def.schema,
})
.meta({
ref: "SyncEvent" + "." + def.type,
})
})
.toArray() as any,
)
.meta({
ref: "SyncEvent",
})
return runSync((svc) => Effect.sync(() => svc.payloads()))
}
}

View File

@@ -30,7 +30,6 @@ import { Glob } from "../util/glob"
import path from "path"
import { pathToFileURL } from "url"
import { Effect, Layer, Context } from "effect"
import { EffectLogger } from "@/effect/logger"
import { FetchHttpClient, HttpClient } from "effect/unstable/http"
import { ChildProcessSpawner } from "effect/unstable/process/ChildProcessSpawner"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
@@ -137,7 +136,7 @@ export namespace ToolRegistry {
Effect.gen(function* () {
const pluginCtx: PluginToolContext = {
...toolCtx,
ask: (req) => Effect.runPromise(toolCtx.ask(req).pipe(Effect.provide(EffectLogger.layer))),
ask: (req) => toolCtx.ask(req),
directory: ctx.directory,
worktree: ctx.worktree,
}

View File

@@ -18,6 +18,7 @@
],
"dependencies": {
"@opencode-ai/sdk": "workspace:*",
"effect": "catalog:",
"zod": "catalog:"
},
"peerDependencies": {

View File

@@ -1,4 +1,5 @@
import { z } from "zod"
import { Effect } from "effect"
export type ToolContext = {
sessionID: string
@@ -16,7 +17,7 @@ export type ToolContext = {
worktree: string
abort: AbortSignal
metadata(input: { title?: string; metadata?: { [key: string]: any } }): void
ask(input: AskInput): Promise<void>
ask(input: AskInput): Effect.Effect<void>
}
type AskInput = {

View File

@@ -316,6 +316,29 @@ export type EventCommandExecuted = {
}
}
export type EventWorkspaceReady = {
type: "workspace.ready"
properties: {
name: string
}
}
export type EventWorkspaceFailed = {
type: "workspace.failed"
properties: {
message: string
}
}
export type EventWorkspaceStatus = {
type: "workspace.status"
properties: {
workspaceID: string
status: "connected" | "connecting" | "disconnected" | "error"
error?: string
}
}
export type QuestionOption = {
/**
* Display text (1-5 words, concise)
@@ -387,29 +410,6 @@ export type EventQuestionRejected = {
}
}
export type Todo = {
/**
* Brief description of the task
*/
content: string
/**
* Current status of the task: pending, in_progress, completed, cancelled
*/
status: string
/**
* Priority level of the task: high, medium, low
*/
priority: string
}
export type EventTodoUpdated = {
type: "todo.updated"
properties: {
sessionID: string
todos: Array<Todo>
}
}
export type SessionStatus =
| {
type: "idle"
@@ -446,6 +446,29 @@ export type EventSessionCompacted = {
}
}
export type Todo = {
/**
* Brief description of the task
*/
content: string
/**
* Current status of the task: pending, in_progress, completed, cancelled
*/
status: string
/**
* Priority level of the task: high, medium, low
*/
priority: string
}
export type EventTodoUpdated = {
type: "todo.updated"
properties: {
sessionID: string
todos: Array<Todo>
}
}
export type EventWorktreeReady = {
type: "worktree.ready"
properties: {
@@ -500,29 +523,6 @@ export type EventPtyDeleted = {
}
}
export type EventWorkspaceReady = {
type: "workspace.ready"
properties: {
name: string
}
}
export type EventWorkspaceFailed = {
type: "workspace.failed"
properties: {
message: string
}
}
export type EventWorkspaceStatus = {
type: "workspace.status"
properties: {
workspaceID: string
status: "connected" | "connecting" | "disconnected" | "error"
error?: string
}
}
export type OutputFormatText = {
type: "text"
}
@@ -995,22 +995,22 @@ export type Event =
| EventMcpToolsChanged
| EventMcpBrowserOpenFailed
| EventCommandExecuted
| EventWorkspaceReady
| EventWorkspaceFailed
| EventWorkspaceStatus
| EventQuestionAsked
| EventQuestionReplied
| EventQuestionRejected
| EventTodoUpdated
| EventSessionStatus
| EventSessionIdle
| EventSessionCompacted
| EventTodoUpdated
| EventWorktreeReady
| EventWorktreeFailed
| EventPtyCreated
| EventPtyUpdated
| EventPtyExited
| EventPtyDeleted
| EventWorkspaceReady
| EventWorkspaceFailed
| EventWorkspaceStatus
| EventMessageUpdated
| EventMessageRemoved
| EventMessagePartUpdated

View File

@@ -7986,6 +7986,71 @@
},
"required": ["type", "properties"]
},
"Event.workspace.ready": {
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "workspace.ready"
},
"properties": {
"type": "object",
"properties": {
"name": {
"type": "string"
}
},
"required": ["name"]
}
},
"required": ["type", "properties"]
},
"Event.workspace.failed": {
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "workspace.failed"
},
"properties": {
"type": "object",
"properties": {
"message": {
"type": "string"
}
},
"required": ["message"]
}
},
"required": ["type", "properties"]
},
"Event.workspace.status": {
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "workspace.status"
},
"properties": {
"type": "object",
"properties": {
"workspaceID": {
"type": "string",
"pattern": "^wrk.*"
},
"status": {
"type": "string",
"enum": ["connected", "connecting", "disconnected", "error"]
},
"error": {
"type": "string"
}
},
"required": ["workspaceID", "status"]
}
},
"required": ["type", "properties"]
},
"QuestionOption": {
"type": "object",
"properties": {
@@ -8136,50 +8201,6 @@
},
"required": ["type", "properties"]
},
"Todo": {
"type": "object",
"properties": {
"content": {
"description": "Brief description of the task",
"type": "string"
},
"status": {
"description": "Current status of the task: pending, in_progress, completed, cancelled",
"type": "string"
},
"priority": {
"description": "Priority level of the task: high, medium, low",
"type": "string"
}
},
"required": ["content", "status", "priority"]
},
"Event.todo.updated": {
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "todo.updated"
},
"properties": {
"type": "object",
"properties": {
"sessionID": {
"type": "string",
"pattern": "^ses.*"
},
"todos": {
"type": "array",
"items": {
"$ref": "#/components/schemas/Todo"
}
}
},
"required": ["sessionID", "todos"]
}
},
"required": ["type", "properties"]
},
"SessionStatus": {
"anyOf": [
{
@@ -8286,6 +8307,50 @@
},
"required": ["type", "properties"]
},
"Todo": {
"type": "object",
"properties": {
"content": {
"description": "Brief description of the task",
"type": "string"
},
"status": {
"description": "Current status of the task: pending, in_progress, completed, cancelled",
"type": "string"
},
"priority": {
"description": "Priority level of the task: high, medium, low",
"type": "string"
}
},
"required": ["content", "status", "priority"]
},
"Event.todo.updated": {
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "todo.updated"
},
"properties": {
"type": "object",
"properties": {
"sessionID": {
"type": "string",
"pattern": "^ses.*"
},
"todos": {
"type": "array",
"items": {
"$ref": "#/components/schemas/Todo"
}
}
},
"required": ["sessionID", "todos"]
}
},
"required": ["type", "properties"]
},
"Event.worktree.ready": {
"type": "object",
"properties": {
@@ -8440,71 +8505,6 @@
},
"required": ["type", "properties"]
},
"Event.workspace.ready": {
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "workspace.ready"
},
"properties": {
"type": "object",
"properties": {
"name": {
"type": "string"
}
},
"required": ["name"]
}
},
"required": ["type", "properties"]
},
"Event.workspace.failed": {
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "workspace.failed"
},
"properties": {
"type": "object",
"properties": {
"message": {
"type": "string"
}
},
"required": ["message"]
}
},
"required": ["type", "properties"]
},
"Event.workspace.status": {
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "workspace.status"
},
"properties": {
"type": "object",
"properties": {
"workspaceID": {
"type": "string",
"pattern": "^wrk.*"
},
"status": {
"type": "string",
"enum": ["connected", "connecting", "disconnected", "error"]
},
"error": {
"type": "string"
}
},
"required": ["workspaceID", "status"]
}
},
"required": ["type", "properties"]
},
"OutputFormatText": {
"type": "object",
"properties": {
@@ -9937,6 +9937,15 @@
{
"$ref": "#/components/schemas/Event.command.executed"
},
{
"$ref": "#/components/schemas/Event.workspace.ready"
},
{
"$ref": "#/components/schemas/Event.workspace.failed"
},
{
"$ref": "#/components/schemas/Event.workspace.status"
},
{
"$ref": "#/components/schemas/Event.question.asked"
},
@@ -9946,9 +9955,6 @@
{
"$ref": "#/components/schemas/Event.question.rejected"
},
{
"$ref": "#/components/schemas/Event.todo.updated"
},
{
"$ref": "#/components/schemas/Event.session.status"
},
@@ -9958,6 +9964,9 @@
{
"$ref": "#/components/schemas/Event.session.compacted"
},
{
"$ref": "#/components/schemas/Event.todo.updated"
},
{
"$ref": "#/components/schemas/Event.worktree.ready"
},
@@ -9976,15 +9985,6 @@
{
"$ref": "#/components/schemas/Event.pty.deleted"
},
{
"$ref": "#/components/schemas/Event.workspace.ready"
},
{
"$ref": "#/components/schemas/Event.workspace.failed"
},
{
"$ref": "#/components/schemas/Event.workspace.status"
},
{
"$ref": "#/components/schemas/Event.message.updated"
},