Compare commits

..

10 Commits

Author SHA1 Message Date
Kit Langton
8a3aa943dd fix: suppress unhandled interrupt error from forceInvalidate 2026-03-19 12:56:41 -04:00
Kit Langton
9be68a9fa4 fix: link to upstream PubSub shutdown PR 2026-03-19 12:40:07 -04:00
Kit Langton
2a0c9da40b Merge branch 'dev' into kit/effect-bus 2026-03-19 12:22:09 -04:00
Kit Langton
81f71c9b30 fix(bus): GlobalBus bridge for InstanceDisposed + forceInvalidate + Effect tests
Legacy subscribeAll delivers InstanceDisposed via GlobalBus because
the fiber starts asynchronously and may not be running when disposal
happens. This bridge can be removed once upstream PubSub.shutdown
properly wakes suspended subscribers.

Add forceInvalidate in Instances that closes the RcMap entry scope
regardless of refCount. Standard RcMap.invalidate bails when
refCount > 0 — an upstream issue (Effect-TS/effect-smol#1799).

Add PubSub shutdown finalizer to Bus layer so layer teardown
properly cleans up PubSubs.

Add Effect-native tests proving forkScoped + scope closure works
correctly: ensuring fires when the scope closes, streams receive
published events.

Remove stale GlobalBus disposal test (instance.ts responsibility).
2026-03-19 12:17:39 -04:00
Kit Langton
992f4f794a fix(bus): use GlobalBus for InstanceDisposed in legacy subscribeAll
The sync callback API can't wait for async layer acquisition, so
delivering InstanceDisposed through the PubSub stream is a race
condition. Instead, the legacy subscribeAll adapter listens on
GlobalBus for InstanceDisposed matching the current directory.

The Effect service's stream ending IS the disposal signal for
Effect consumers — this is only needed for the legacy callback API.

Also reverts forceInvalidate, fiber tracking, priority-based
disposal, and other workaround attempts. Clean simple solution.
2026-03-19 09:40:39 -04:00
Kit Langton
0c2b5b2c39 fix(bus): use Fiber.interrupt for clean disposal of subscribeAll
Use forkInstance + Fiber.interrupt (which awaits) instead of
runCallbackInstance + interruptUnsafe (fire-and-forget) for
subscribeAll. This ensures the fiber completes before layer
invalidation, allowing the RcMap refCount to drop to 0.

subscribeAll now delivers InstanceDisposed as the last callback
message via Effect.ensuring when the fiber is interrupted during
disposal, but not on manual unsubscribe.

Add priority support to registerDisposer so Bus can interrupt
subscription fibers (priority -1) before layer invalidation
(priority 0).

Add forkInstance helper to effect/runtime that returns a Fiber
instead of an interrupt function.
2026-03-19 08:49:06 -04:00
Kit Langton
009d77c9d8 refactor(format): make formatting explicit instead of bus-driven
Replace the implicit Bus.subscribe(File.Event.Edited) formatter with
an explicit Format.run(filepath) call in write/edit/apply_patch tools.

This ensures formatting completes before FileTime stamps and LSP
diagnostics run, rather than relying on the bus to block on subscribers.

- Add Format.run() to the Effect service interface and legacy adapter
- Call Format.run() in write, edit, and apply_patch tools after writes
- Remove Bus subscription from Format layer
2026-03-18 22:09:14 -04:00
Kit Langton
f3cf519d98 feat(bus): migrate Bus to Effect service with PubSub internals
Add Bus.Service as a ServiceMap.Service backed by Effect PubSub:
- publish() pushes to per-type + wildcard PubSubs and GlobalBus
- subscribe() returns a typed Stream via Stream.fromPubSub
- subscribeAll() returns a wildcard Stream

Legacy adapters wrap the Effect service:
- publish → runPromiseInstance
- subscribe/subscribeAll → runCallbackInstance with Stream.runForEach

Other changes:
- Register Bus.Service in Instances LayerMap
- Add runCallbackInstance helper to effect/runtime
- Remove unused Bus.once (zero callers)
- Skip PubSub creation on publish when no subscribers exist
- Move subscribe/unsubscribe logging into the Effect service layer
2026-03-18 21:36:04 -04:00
Kit Langton
645c15351b test(bus): add comprehensive test suite for Bus service
Covers publish/subscribe, multiple subscribers, unsubscribe, subscribeAll,
once, GlobalBus forwarding, instance isolation, disposal, and async subscribers.
2026-03-18 21:05:36 -04:00
Kit Langton
f63a2a2636 fix(bus): tighten GlobalBus payload and BusEvent.define types
Constrain BusEvent.define to ZodObject instead of ZodType so TS knows
event properties are always a record. Type GlobalBus payload as
{ type: string; properties: Record<string, unknown> } instead of any.

Refactor watcher test to use Bus.subscribe instead of raw GlobalBus
listener, removing hand-rolled event types and unnecessary casts.
2026-03-18 20:57:08 -04:00
29 changed files with 762 additions and 552 deletions

View File

@@ -2,7 +2,7 @@ import type { Page } from "@playwright/test"
import { test, expect } from "../fixtures"
import { openSidebar, resolveSlug, sessionIDFromUrl, setWorkspacesEnabled, waitDir, waitSlug } from "../actions"
import { promptSelector, workspaceItemSelector, workspaceNewSessionSelector } from "../selectors"
import { createSdk, dirDecode, dirSlug, resolveDirectory } from "../utils"
import { createSdk } from "../utils"
function item(space: { slug: string; raw: string }) {
return `${workspaceItemSelector(space.slug)}, ${workspaceItemSelector(space.raw)}`
@@ -101,13 +101,13 @@ test("new sessions from sidebar workspace actions stay in selected workspace", a
trackDirectory(second.directory)
await waitWorkspaceReady(page, second)
const firstSession = await createSessionFromWorkspace(page, first, `workspace one ${Date.now()}`)
const firstSession = await createSessionFromWorkspace(page, first.slug, `workspace one ${Date.now()}`)
trackSession(firstSession.sessionID, first.directory)
const secondSession = await createSessionFromWorkspace(page, second, `workspace two ${Date.now()}`)
const secondSession = await createSessionFromWorkspace(page, second.slug, `workspace two ${Date.now()}`)
trackSession(secondSession.sessionID, second.directory)
const thirdSession = await createSessionFromWorkspace(page, first, `workspace one again ${Date.now()}`)
const thirdSession = await createSessionFromWorkspace(page, first.slug, `workspace one again ${Date.now()}`)
trackSession(thirdSession.sessionID, first.directory)
await expect.poll(() => sessionDirectory(first.directory, firstSession.sessionID)).toBe(first.directory)

View File

@@ -1,7 +1,6 @@
import fs from "node:fs/promises"
import os from "node:os"
import path from "node:path"
import { base64Decode } from "@opencode-ai/util/encode"
import type { Page } from "@playwright/test"
import { test, expect } from "../fixtures"
@@ -20,7 +19,7 @@ import {
waitSlug,
} from "../actions"
import { dropdownMenuContentSelector, inlineInputSelector, workspaceItemSelector } from "../selectors"
import { createSdk, dirDecode, dirSlug } from "../utils"
import { createSdk, dirSlug } from "../utils"
async function setupWorkspaceTest(page: Page, project: { slug: string }) {
const rootSlug = project.slug
@@ -258,7 +257,7 @@ test("can delete a workspace", async ({ page, withProject }) => {
await clickMenuItem(menu, /^Delete$/i, { force: true })
await confirmDialog(page, /^Delete workspace$/i)
await expect.poll(() => dirDecode(slugFromUrl(page.url()))).toBe(project.directory)
await expect.poll(() => base64Decode(slugFromUrl(page.url()))).toBe(project.directory)
await expect
.poll(

View File

@@ -1,5 +1,5 @@
import { createOpencodeClient } from "@opencode-ai/sdk/v2/client"
import { base64Decode, base64Encode, checksum } from "@opencode-ai/util/encode"
import { base64Encode, checksum } from "@opencode-ai/util/encode"
export const serverHost = process.env.PLAYWRIGHT_SERVER_HOST ?? "127.0.0.1"
export const serverPort = process.env.PLAYWRIGHT_SERVER_PORT ?? "4096"
@@ -48,10 +48,6 @@ export function dirSlug(directory: string) {
return base64Encode(directory)
}
export function dirDecode(slug: string) {
return base64Decode(slug)
}
export function dirPath(directory: string) {
return `/${dirSlug(directory)}`
}

View File

@@ -260,10 +260,7 @@ export namespace Agent {
return pipe(
await state(),
values(),
sortBy(
[(x) => (cfg.default_agent ? x.name === cfg.default_agent : x.name === "build"), "desc"],
[(x) => x.name, "asc"],
),
sortBy([(x) => (cfg.default_agent ? x.name === cfg.default_agent : x.name === "build"), "desc"]),
)
}

View File

@@ -1,5 +1,5 @@
import z from "zod"
import type { ZodType } from "zod"
import type { ZodObject, ZodRawShape } from "zod"
import { Log } from "../util/log"
export namespace BusEvent {
@@ -9,7 +9,7 @@ export namespace BusEvent {
const registry = new Map<string, Definition>()
export function define<Type extends string, Properties extends ZodType>(type: Type, properties: Properties) {
export function define<Type extends string, Properties extends ZodObject<ZodRawShape>>(type: Type, properties: Properties) {
const result = {
type,
properties,

View File

@@ -4,7 +4,7 @@ export const GlobalBus = new EventEmitter<{
event: [
{
directory?: string
payload: any
payload: { type: string; properties: Record<string, unknown> }
},
]
}>()

View File

@@ -1,12 +1,13 @@
import z from "zod"
import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect"
import { Log } from "../util/log"
import { Instance } from "../project/instance"
import { BusEvent } from "./bus-event"
import { GlobalBus } from "./global"
import { runCallbackInstance, runPromiseInstance } from "../effect/runtime"
export namespace Bus {
const log = Log.create({ service: "bus" })
type Subscription = (event: any) => void
export const InstanceDisposed = BusEvent.define(
"server.instance.disposed",
@@ -15,91 +16,130 @@ export namespace Bus {
}),
)
const state = Instance.state(
() => {
const subscriptions = new Map<any, Subscription[]>()
// ---------------------------------------------------------------------------
// Service definition
// ---------------------------------------------------------------------------
return {
subscriptions,
type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
type: D["type"]
properties: z.infer<D["properties"]>
}
export interface Interface {
readonly publish: <D extends BusEvent.Definition>(
def: D,
properties: z.output<D["properties"]>,
) => Effect.Effect<void>
readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
readonly subscribeAll: () => Stream.Stream<Payload>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Bus") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const pubsubs = new Map<string, PubSub.PubSub<Payload>>()
const wildcardPubSub = yield* PubSub.unbounded<Payload>()
const getOrCreate = Effect.fnUntraced(function* (type: string) {
let ps = pubsubs.get(type)
if (!ps) {
ps = yield* PubSub.unbounded<Payload>()
pubsubs.set(type, ps)
}
return ps
})
function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
return Effect.gen(function* () {
const payload: Payload = { type: def.type, properties }
log.info("publishing", { type: def.type })
const ps = pubsubs.get(def.type)
if (ps) yield* PubSub.publish(ps, payload)
yield* PubSub.publish(wildcardPubSub, payload)
GlobalBus.emit("event", {
directory: Instance.directory,
payload,
})
})
}
},
async (entry) => {
const wildcard = entry.subscriptions.get("*")
if (!wildcard) return
const event = {
type: InstanceDisposed.type,
properties: {
directory: Instance.directory,
},
function subscribe<D extends BusEvent.Definition>(def: D): Stream.Stream<Payload<D>> {
log.info("subscribing", { type: def.type })
return Stream.unwrap(
Effect.gen(function* () {
const ps = yield* getOrCreate(def.type)
return Stream.fromPubSub(ps) as Stream.Stream<Payload<D>>
}),
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type }))))
}
for (const sub of [...wildcard]) {
sub(event)
function subscribeAll(): Stream.Stream<Payload> {
log.info("subscribing", { type: "*" })
return Stream.fromPubSub(wildcardPubSub).pipe(
Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))),
)
}
},
// Shut down all PubSubs when the layer is torn down.
// This causes Stream.fromPubSub consumers to end, triggering
// their ensuring/finalizers.
yield* Effect.addFinalizer(() =>
Effect.gen(function* () {
log.info("shutting down PubSubs")
yield* PubSub.shutdown(wildcardPubSub)
for (const ps of pubsubs.values()) {
yield* PubSub.shutdown(ps)
}
}),
)
return Service.of({ publish, subscribe, subscribeAll })
}),
)
export async function publish<Definition extends BusEvent.Definition>(
def: Definition,
properties: z.output<Definition["properties"]>,
) {
const payload = {
type: def.type,
properties,
}
log.info("publishing", {
type: def.type,
})
const pending = []
for (const key of [def.type, "*"]) {
const match = [...(state().subscriptions.get(key) ?? [])]
for (const sub of match) {
pending.push(sub(payload))
}
}
GlobalBus.emit("event", {
directory: Instance.directory,
payload,
})
return Promise.all(pending)
// ---------------------------------------------------------------------------
// Legacy adapters — plain function API wrapping the Effect service
// ---------------------------------------------------------------------------
function runStream(stream: (svc: Interface) => Stream.Stream<Payload>, callback: (event: any) => void) {
return runCallbackInstance(
Service.use((svc) => stream(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg))))),
)
}
export function subscribe<Definition extends BusEvent.Definition>(
def: Definition,
callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
) {
return raw(def.type, callback)
export function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
return runPromiseInstance(Service.use((svc) => svc.publish(def, properties)))
}
export function once<Definition extends BusEvent.Definition>(
def: Definition,
callback: (event: {
type: Definition["type"]
properties: z.infer<Definition["properties"]>
}) => "done" | undefined,
) {
const unsub = subscribe(def, (event) => {
if (callback(event)) unsub()
})
export function subscribe<D extends BusEvent.Definition>(def: D, callback: (event: Payload<D>) => void) {
return runStream((svc) => svc.subscribe(def), callback)
}
export function subscribeAll(callback: (event: any) => void) {
return raw("*", callback)
}
const directory = Instance.directory
function raw(type: string, callback: (event: any) => void) {
log.info("subscribing", { type })
const subscriptions = state().subscriptions
let match = subscriptions.get(type) ?? []
match.push(callback)
subscriptions.set(type, match)
// InstanceDisposed is delivered via GlobalBus because the legacy
// adapter's fiber starts asynchronously and may not be running when
// disposal happens. In the Effect-native path, forkScoped + scope
// closure handles this correctly. This bridge can be removed once
// upstream PubSub.shutdown properly wakes suspended subscribers:
// https://github.com/Effect-TS/effect-smol/pull/1800
const onDispose = (evt: { directory?: string; payload: any }) => {
if (evt.payload.type !== InstanceDisposed.type) return
if (evt.directory !== directory) return
callback(evt.payload)
GlobalBus.off("event", onDispose)
}
GlobalBus.on("event", onDispose)
const interrupt = runStream((svc) => svc.subscribeAll(), callback)
return () => {
log.info("unsubscribing", { type })
const match = subscriptions.get(type)
if (!match) return
const index = match.indexOf(callback)
if (index === -1) return
match.splice(index, 1)
GlobalBus.off("event", onDispose)
interrupt()
}
}
}

View File

@@ -123,7 +123,7 @@ export namespace Workspace {
await parseSSE(res.body, stop, (event) => {
GlobalBus.emit("event", {
directory: space.id,
payload: event,
payload: event as { type: string; properties: Record<string, unknown> },
})
})
// Wait 250ms and retry if SSE connection fails

View File

@@ -1,4 +1,5 @@
import { Effect, Layer, LayerMap, ServiceMap } from "effect"
import { Effect, Exit, Fiber, Layer, LayerMap, MutableHashMap, Scope, ServiceMap } from "effect"
import { Bus } from "@/bus"
import { File } from "@/file"
import { FileTime } from "@/file/time"
import { FileWatcher } from "@/file/watcher"
@@ -16,6 +17,7 @@ import { registerDisposer } from "./instance-registry"
export { InstanceContext } from "./instance-context"
export type InstanceServices =
| Bus.Service
| Question.Service
| PermissionNext.Service
| ProviderAuth.Service
@@ -36,6 +38,7 @@ export type InstanceServices =
function lookup(_key: string) {
const ctx = Layer.sync(InstanceContext, () => InstanceContext.of(Instance.current))
return Layer.mergeAll(
Layer.fresh(Bus.layer),
Layer.fresh(Question.layer),
Layer.fresh(PermissionNext.layer),
Layer.fresh(ProviderAuth.defaultLayer),
@@ -56,7 +59,23 @@ export class Instances extends ServiceMap.Service<Instances, LayerMap.LayerMap<s
Instances,
Effect.gen(function* () {
const layerMap = yield* LayerMap.make(lookup, { idleTimeToLive: Infinity })
const unregister = registerDisposer((directory) => Effect.runPromise(layerMap.invalidate(directory)))
// Force-invalidate closes the RcMap entry scope even when refCount > 0.
// Standard RcMap.invalidate bails in that case, leaving long-running
// consumer fibers orphaned. This is an upstream issue:
// https://github.com/Effect-TS/effect-smol/pull/1799
const forceInvalidate = (directory: string) =>
Effect.gen(function* () {
const rcMap = layerMap.rcMap
if (rcMap.state._tag === "Closed") return
const entry = MutableHashMap.get(rcMap.state.map, directory)
if (entry._tag === "None") return
MutableHashMap.remove(rcMap.state.map, directory)
if (entry.value.fiber) yield* Fiber.interrupt(entry.value.fiber)
yield* Scope.close(entry.value.scope, Exit.void)
}).pipe(Effect.uninterruptible, Effect.ignore)
const unregister = registerDisposer((directory) => Effect.runPromise(forceInvalidate(directory)))
yield* Effect.addFinalizer(() => Effect.sync(unregister))
return Instances.of(layerMap)
}),

View File

@@ -18,6 +18,12 @@ export function runPromiseInstance<A, E>(effect: Effect.Effect<A, E, InstanceSer
return runtime.runPromise(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
}
export function runCallbackInstance<A, E>(
effect: Effect.Effect<A, E, InstanceServices>,
): (interruptor?: number) => void {
return runtime.runCallback(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
}
export function disposeRuntime() {
return runtime.dispose()
}

View File

@@ -4,9 +4,7 @@ import { InstanceContext } from "@/effect/instance-context"
import path from "path"
import { mergeDeep } from "remeda"
import z from "zod"
import { Bus } from "../bus"
import { Config } from "../config/config"
import { File } from "../file"
import { Instance } from "../project/instance"
import { Process } from "../util/process"
import { Log } from "../util/log"
@@ -27,6 +25,7 @@ export namespace Format {
export type Status = z.infer<typeof Status>
export interface Interface {
readonly run: (filepath: string) => Effect.Effect<void>
readonly status: () => Effect.Effect<Status[]>
}
@@ -90,48 +89,44 @@ export namespace Format {
return result
}
yield* Effect.acquireRelease(
Effect.sync(() =>
Bus.subscribe(
File.Event.Edited,
Instance.bind(async (payload) => {
const file = payload.properties.file
log.info("formatting", { file })
const ext = path.extname(file)
const run = Effect.fn("Format.run")(function* (filepath: string) {
log.info("formatting", { file: filepath })
const ext = path.extname(filepath)
for (const item of await getFormatter(ext)) {
log.info("running", { command: item.command })
try {
const proc = Process.spawn(
item.command.map((x) => x.replace("$FILE", file)),
{
cwd: instance.directory,
env: { ...process.env, ...item.environment },
stdout: "ignore",
stderr: "ignore",
},
)
const exit = await proc.exited
if (exit !== 0) {
log.error("failed", {
command: item.command,
...item.environment,
})
}
} catch (error) {
log.error("failed to format file", {
error,
command: item.command,
...item.environment,
file,
})
}
for (const item of yield* Effect.promise(() => getFormatter(ext))) {
log.info("running", { command: item.command })
yield* Effect.tryPromise({
try: async () => {
const proc = Process.spawn(
item.command.map((x) => x.replace("$FILE", filepath)),
{
cwd: instance.directory,
env: { ...process.env, ...item.environment },
stdout: "ignore",
stderr: "ignore",
},
)
const exit = await proc.exited
if (exit !== 0) {
log.error("failed", {
command: item.command,
...item.environment,
})
}
}),
),
),
(unsubscribe) => Effect.sync(unsubscribe),
)
},
catch: (error) => {
log.error("failed to format file", {
error,
command: item.command,
...item.environment,
file: filepath,
})
return error
},
}).pipe(Effect.ignore)
}
})
log.info("init")
const status = Effect.fn("Format.status")(function* () {
@@ -147,10 +142,14 @@ export namespace Format {
return result
})
return Service.of({ status })
return Service.of({ run, status })
}),
)
export async function run(filepath: string) {
return runPromiseInstance(Service.use((s) => s.run(filepath)))
}
export async function status() {
return runPromiseInstance(Service.use((s) => s.status()))
}

View File

@@ -47,6 +47,8 @@ import { ProviderTransform } from "./transform"
import { Installation } from "../installation"
import { ModelID, ProviderID } from "./schema"
const DEFAULT_CHUNK_TIMEOUT = 300_000
export namespace Provider {
const log = Log.create({ service: "provider" })
@@ -1128,7 +1130,7 @@ export namespace Provider {
if (existing) return existing
const customFetch = options["fetch"]
const chunkTimeout = options["chunkTimeout"]
const chunkTimeout = options["chunkTimeout"] || DEFAULT_CHUNK_TIMEOUT
delete options["chunkTimeout"]
options["fetch"] = async (input: any, init?: BunFetchRequestInit) => {

View File

@@ -1,85 +0,0 @@
import { Hono } from "hono"
import { describeRoute, resolver } from "hono-openapi"
import { streamSSE } from "hono/streaming"
import { Log } from "@/util/log"
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { lazy } from "../../util/lazy"
import { AsyncQueue } from "../../util/queue"
import { Instance } from "@/project/instance"
const log = Log.create({ service: "server" })
export const EventRoutes = lazy(() =>
new Hono().get(
"/event",
describeRoute({
summary: "Subscribe to events",
description: "Get events",
operationId: "event.subscribe",
responses: {
200: {
description: "Event stream",
content: {
"text/event-stream": {
schema: resolver(BusEvent.payloads()),
},
},
},
},
}),
async (c) => {
log.info("event connected")
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")
return streamSSE(c, async (stream) => {
const q = new AsyncQueue<string | null>()
let done = false
q.push(
JSON.stringify({
type: "server.connected",
properties: {},
}),
)
// Send heartbeat every 10s to prevent stalled proxy streams.
const heartbeat = setInterval(() => {
q.push(
JSON.stringify({
type: "server.heartbeat",
properties: {},
}),
)
}, 10_000)
const unsub = Bus.subscribeAll((event) => {
q.push(JSON.stringify(event))
if (event.type === Bus.InstanceDisposed.type) {
stop()
}
})
const stop = () => {
if (done) return
done = true
clearInterval(heartbeat)
unsub()
q.push(null)
log.info("event disconnected")
}
stream.onAbort(stop)
try {
for await (const data of q) {
if (data === null) return
await stream.writeSSE({ data })
}
} finally {
stop()
}
})
},
),
)

View File

@@ -4,7 +4,6 @@ import { streamSSE } from "hono/streaming"
import z from "zod"
import { BusEvent } from "@/bus/bus-event"
import { GlobalBus } from "@/bus/global"
import { AsyncQueue } from "@/util/queue"
import { Instance } from "../../project/instance"
import { Installation } from "@/installation"
import { Log } from "../../util/log"
@@ -70,54 +69,41 @@ export const GlobalRoutes = lazy(() =>
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")
return streamSSE(c, async (stream) => {
const q = new AsyncQueue<string | null>()
let done = false
q.push(
JSON.stringify({
stream.writeSSE({
data: JSON.stringify({
payload: {
type: "server.connected",
properties: {},
},
}),
)
})
async function handler(event: any) {
await stream.writeSSE({
data: JSON.stringify(event),
})
}
GlobalBus.on("event", handler)
// Send heartbeat every 10s to prevent stalled proxy streams.
const heartbeat = setInterval(() => {
q.push(
JSON.stringify({
stream.writeSSE({
data: JSON.stringify({
payload: {
type: "server.heartbeat",
properties: {},
},
}),
)
})
}, 10_000)
async function handler(event: any) {
q.push(JSON.stringify(event))
}
GlobalBus.on("event", handler)
const stop = () => {
if (done) return
done = true
clearInterval(heartbeat)
GlobalBus.off("event", handler)
q.push(null)
log.info("event disconnected")
}
stream.onAbort(stop)
try {
for await (const data of q) {
if (data === null) return
await stream.writeSSE({ data })
}
} finally {
stop()
}
await new Promise<void>((resolve) => {
stream.onAbort(() => {
clearInterval(heartbeat)
GlobalBus.off("event", handler)
resolve()
log.info("global event disconnected")
})
})
})
},
)

View File

@@ -1,7 +1,10 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { Log } from "../util/log"
import { describeRoute, generateSpecs, validator, resolver, openAPIRouteHandler } from "hono-openapi"
import { Hono } from "hono"
import { cors } from "hono/cors"
import { streamSSE } from "hono/streaming"
import { proxy } from "hono/proxy"
import { basicAuth } from "hono/basic-auth"
import z from "zod"
@@ -31,7 +34,6 @@ import { FileRoutes } from "./routes/file"
import { ConfigRoutes } from "./routes/config"
import { ExperimentalRoutes } from "./routes/experimental"
import { ProviderRoutes } from "./routes/provider"
import { EventRoutes } from "./routes/event"
import { InstanceBootstrap } from "../project/bootstrap"
import { NotFoundError } from "../storage/db"
import type { ContentfulStatusCode } from "hono/utils/http-status"
@@ -249,7 +251,6 @@ export namespace Server {
.route("/question", QuestionRoutes())
.route("/provider", ProviderRoutes())
.route("/", FileRoutes())
.route("/", EventRoutes())
.route("/mcp", McpRoutes())
.route("/tui", TuiRoutes())
.post(
@@ -497,6 +498,64 @@ export namespace Server {
return c.json(await Format.status())
},
)
.get(
"/event",
describeRoute({
summary: "Subscribe to events",
description: "Get events",
operationId: "event.subscribe",
responses: {
200: {
description: "Event stream",
content: {
"text/event-stream": {
schema: resolver(BusEvent.payloads()),
},
},
},
},
}),
async (c) => {
log.info("event connected")
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")
return streamSSE(c, async (stream) => {
stream.writeSSE({
data: JSON.stringify({
type: "server.connected",
properties: {},
}),
})
const unsub = Bus.subscribeAll(async (event) => {
await stream.writeSSE({
data: JSON.stringify(event),
})
if (event.type === Bus.InstanceDisposed.type) {
stream.close()
}
})
// Send heartbeat every 10s to prevent stalled proxy streams.
const heartbeat = setInterval(() => {
stream.writeSSE({
data: JSON.stringify({
type: "server.heartbeat",
properties: {},
}),
})
}, 10_000)
await new Promise<void>((resolve) => {
stream.onAbort(() => {
clearInterval(heartbeat)
unsub()
resolve()
log.info("event disconnected")
})
})
})
},
)
.all("/*", async (c) => {
const path = c.req.path

View File

@@ -204,7 +204,7 @@ export namespace Skill {
const available = Effect.fn("Skill.available")(function* (agent?: Agent.Info) {
yield* Effect.promise(() => state.ensure())
const list = Object.values(state.skills).toSorted((a, b) => a.name.localeCompare(b.name))
const list = Object.values(state.skills)
if (!agent) return list
return list.filter((skill) => PermissionNext.evaluate("skill", skill.name, agent.permission).action !== "deny")
})

View File

@@ -10,6 +10,7 @@ import { createTwoFilesPatch, diffLines } from "diff"
import { assertExternalDirectory } from "./external-directory"
import { trimDiff } from "./edit"
import { LSP } from "../lsp"
import { Format } from "../format"
import { Filesystem } from "../util/filesystem"
import DESCRIPTION from "./apply_patch.txt"
import { File } from "../file"
@@ -220,6 +221,7 @@ export const ApplyPatchTool = Tool.define("apply_patch", {
}
if (edited) {
await Format.run(edited)
await Bus.publish(File.Event.Edited, {
file: edited,
})

View File

@@ -13,6 +13,7 @@ import { File } from "../file"
import { FileWatcher } from "../file/watcher"
import { Bus } from "../bus"
import { FileTime } from "../file/time"
import { Format } from "../format"
import { Filesystem } from "../util/filesystem"
import { Instance } from "../project/instance"
import { Snapshot } from "@/snapshot"
@@ -71,6 +72,7 @@ export const EditTool = Tool.define("edit", {
},
})
await Filesystem.write(filePath, params.newString)
await Format.run(filePath)
await Bus.publish(File.Event.Edited, {
file: filePath,
})
@@ -108,6 +110,7 @@ export const EditTool = Tool.define("edit", {
})
await Filesystem.write(filePath, contentNew)
await Format.run(filePath)
await Bus.publish(File.Event.Edited, {
file: filePath,
})

View File

@@ -33,13 +33,12 @@ export const TaskTool = Tool.define("task", async (ctx) => {
const accessibleAgents = caller
? agents.filter((a) => PermissionNext.evaluate("task", a.name, caller.permission).action !== "deny")
: agents
const list = accessibleAgents.toSorted((a, b) => a.name.localeCompare(b.name))
const description = DESCRIPTION.replace(
"{agents}",
list.map((a) => `- ${a.name}: ${a.description ?? "This subagent should only be called manually by the user."}`).join(
"\n",
),
accessibleAgents
.map((a) => `- ${a.name}: ${a.description ?? "This subagent should only be called manually by the user."}`)
.join("\n"),
)
return {
description,

View File

@@ -8,6 +8,7 @@ import { Bus } from "../bus"
import { File } from "../file"
import { FileWatcher } from "../file/watcher"
import { FileTime } from "../file/time"
import { Format } from "../format"
import { Filesystem } from "../util/filesystem"
import { Instance } from "../project/instance"
import { trimDiff } from "./edit"
@@ -42,6 +43,7 @@ export const WriteTool = Tool.define("write", {
})
await Filesystem.write(filepath, params.content)
await Format.run(filepath)
await Bus.publish(File.Event.Edited, {
file: filepath,
})

View File

@@ -384,32 +384,6 @@ test("multiple custom agents can be defined", async () => {
})
})
test("Agent.list keeps the default agent first and sorts the rest by name", async () => {
await using tmp = await tmpdir({
config: {
default_agent: "plan",
agent: {
zebra: {
description: "Zebra",
mode: "subagent",
},
alpha: {
description: "Alpha",
mode: "subagent",
},
},
},
})
await Instance.provide({
directory: tmp.path,
fn: async () => {
const names = (await Agent.list()).map((a) => a.name)
expect(names[0]).toBe("plan")
expect(names.slice(1)).toEqual(names.slice(1).toSorted((a, b) => a.localeCompare(b)))
},
})
})
test("Agent.get returns undefined for non-existent agent", async () => {
await using tmp = await tmpdir()
await Instance.provide({

View File

@@ -0,0 +1,372 @@
import { afterEach, describe, expect, test } from "bun:test"
import { Deferred, Effect, Stream } from "effect"
import z from "zod"
import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
import { GlobalBus } from "../../src/bus/global"
import { Instance } from "../../src/project/instance"
import { tmpdir } from "../fixture/fixture"
// ---------------------------------------------------------------------------
// Test event definitions
// ---------------------------------------------------------------------------
const TestEvent = {
Ping: BusEvent.define("test.ping", z.object({ value: z.number() })),
Pong: BusEvent.define("test.pong", z.object({ message: z.string() })),
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function withInstance(directory: string, fn: () => Promise<void>) {
return Instance.provide({ directory, fn })
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe("Bus", () => {
afterEach(() => Instance.disposeAll())
describe("publish + subscribe", () => {
test("subscriber receives matching events", async () => {
await using tmp = await tmpdir()
const received: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
received.push(evt.properties.value)
})
await Bus.publish(TestEvent.Ping, { value: 42 })
await Bus.publish(TestEvent.Ping, { value: 99 })
})
expect(received).toEqual([42, 99])
})
test("subscriber does not receive events of other types", async () => {
await using tmp = await tmpdir()
const pings: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
pings.push(evt.properties.value)
})
await Bus.publish(TestEvent.Pong, { message: "hello" })
await Bus.publish(TestEvent.Ping, { value: 1 })
})
expect(pings).toEqual([1])
})
test("publish with no subscribers does not throw", async () => {
await using tmp = await tmpdir()
await withInstance(tmp.path, async () => {
await Bus.publish(TestEvent.Ping, { value: 1 })
})
})
})
describe("multiple subscribers", () => {
test("all subscribers for same event type are called", async () => {
await using tmp = await tmpdir()
const a: number[] = []
const b: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => a.push(evt.properties.value))
Bus.subscribe(TestEvent.Ping, (evt) => b.push(evt.properties.value))
await Bus.publish(TestEvent.Ping, { value: 7 })
})
expect(a).toEqual([7])
expect(b).toEqual([7])
})
test("subscribers are called in registration order", async () => {
await using tmp = await tmpdir()
const order: string[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, () => order.push("first"))
Bus.subscribe(TestEvent.Ping, () => order.push("second"))
Bus.subscribe(TestEvent.Ping, () => order.push("third"))
await Bus.publish(TestEvent.Ping, { value: 0 })
})
expect(order).toEqual(["first", "second", "third"])
})
})
describe("unsubscribe", () => {
test("unsubscribe stops delivery", async () => {
await using tmp = await tmpdir()
const received: number[] = []
await withInstance(tmp.path, async () => {
const unsub = Bus.subscribe(TestEvent.Ping, (evt) => {
received.push(evt.properties.value)
})
await Bus.publish(TestEvent.Ping, { value: 1 })
unsub()
await Bus.publish(TestEvent.Ping, { value: 2 })
})
expect(received).toEqual([1])
})
test("unsubscribe is idempotent", async () => {
await using tmp = await tmpdir()
await withInstance(tmp.path, async () => {
const unsub = Bus.subscribe(TestEvent.Ping, () => {})
unsub()
unsub() // should not throw
})
})
test("unsubscribing one does not affect others", async () => {
await using tmp = await tmpdir()
const a: number[] = []
const b: number[] = []
await withInstance(tmp.path, async () => {
const unsubA = Bus.subscribe(TestEvent.Ping, (evt) => a.push(evt.properties.value))
Bus.subscribe(TestEvent.Ping, (evt) => b.push(evt.properties.value))
await Bus.publish(TestEvent.Ping, { value: 1 })
unsubA()
await Bus.publish(TestEvent.Ping, { value: 2 })
})
expect(a).toEqual([1])
expect(b).toEqual([1, 2])
})
})
describe("subscribeAll", () => {
test("receives events of all types", async () => {
await using tmp = await tmpdir()
const all: string[] = []
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
all.push(evt.type)
})
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bus.publish(TestEvent.Pong, { message: "hi" })
})
expect(all).toEqual(["test.ping", "test.pong"])
})
test("subscribeAll + typed subscribe both fire", async () => {
await using tmp = await tmpdir()
const typed: number[] = []
const wild: string[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => typed.push(evt.properties.value))
Bus.subscribeAll((evt) => wild.push(evt.type))
await Bus.publish(TestEvent.Ping, { value: 5 })
})
expect(typed).toEqual([5])
expect(wild).toEqual(["test.ping"])
})
test("unsubscribe from subscribeAll", async () => {
await using tmp = await tmpdir()
const all: string[] = []
await withInstance(tmp.path, async () => {
const unsub = Bus.subscribeAll((evt) => all.push(evt.type))
await Bus.publish(TestEvent.Ping, { value: 1 })
unsub()
await Bus.publish(TestEvent.Pong, { message: "missed" })
})
expect(all).toEqual(["test.ping"])
})
test("subscribeAll delivers InstanceDisposed on disposal", async () => {
await using tmp = await tmpdir()
const all: string[] = []
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
all.push(evt.type)
})
await Bus.publish(TestEvent.Ping, { value: 1 })
})
await Instance.disposeAll()
expect(all).toContain("test.ping")
expect(all).toContain(Bus.InstanceDisposed.type)
})
test("manual unsubscribe suppresses InstanceDisposed", async () => {
await using tmp = await tmpdir()
const all: string[] = []
let unsub = () => {}
await withInstance(tmp.path, async () => {
unsub = Bus.subscribeAll((evt) => {
all.push(evt.type)
})
})
unsub()
await Instance.disposeAll()
expect(all).not.toContain(Bus.InstanceDisposed.type)
})
})
describe("GlobalBus forwarding", () => {
test("publish emits to GlobalBus with directory", async () => {
await using tmp = await tmpdir()
const globalEvents: Array<{ directory?: string; payload: any }> = []
const handler = (evt: any) => globalEvents.push(evt)
GlobalBus.on("event", handler)
try {
await withInstance(tmp.path, async () => {
await Bus.publish(TestEvent.Ping, { value: 42 })
})
const ping = globalEvents.find((e) => e.payload.type === "test.ping")
expect(ping).toBeDefined()
expect(ping!.directory).toBe(tmp.path)
expect(ping!.payload).toEqual({
type: "test.ping",
properties: { value: 42 },
})
} finally {
GlobalBus.off("event", handler)
}
})
})
describe("instance isolation", () => {
test("subscribers in one instance do not receive events from another", async () => {
await using tmpA = await tmpdir()
await using tmpB = await tmpdir()
const eventsA: number[] = []
const eventsB: number[] = []
await withInstance(tmpA.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => eventsA.push(evt.properties.value))
})
await withInstance(tmpB.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => eventsB.push(evt.properties.value))
})
await withInstance(tmpA.path, async () => {
await Bus.publish(TestEvent.Ping, { value: 1 })
})
await withInstance(tmpB.path, async () => {
await Bus.publish(TestEvent.Ping, { value: 2 })
})
expect(eventsA).toEqual([1])
expect(eventsB).toEqual([2])
})
})
describe("async subscribers", () => {
test("publish is fire-and-forget (does not await subscriber callbacks)", async () => {
await using tmp = await tmpdir()
const received: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, async (evt) => {
await new Promise((r) => setTimeout(r, 10))
received.push(evt.properties.value)
})
await Bus.publish(TestEvent.Ping, { value: 1 })
// Give the async subscriber time to complete
await new Promise((r) => setTimeout(r, 50))
})
expect(received).toEqual([1])
})
})
describe("Effect service", () => {
test("subscribeAll stream receives published events", async () => {
await using tmp = await tmpdir()
const received: string[] = []
await withInstance(tmp.path, () =>
Effect.runPromise(
Effect.scoped(
Effect.gen(function* () {
const svc = yield* Bus.Service
const done = yield* Deferred.make<void>()
let count = 0
yield* Effect.forkScoped(
svc.subscribeAll().pipe(
Stream.runForEach((msg) =>
Effect.gen(function* () {
received.push(msg.type)
if (++count >= 2) yield* Deferred.succeed(done, undefined)
}),
),
),
)
// Let the forked fiber start and subscribe to the PubSub
yield* Effect.yieldNow
yield* svc.publish(TestEvent.Ping, { value: 1 })
yield* svc.publish(TestEvent.Pong, { message: "hi" })
yield* Deferred.await(done)
}),
).pipe(Effect.provide(Bus.layer)),
),
)
expect(received).toEqual(["test.ping", "test.pong"])
})
test("subscribeAll stream ends with ensuring when scope closes", async () => {
await using tmp = await tmpdir()
let ensuringFired = false
await withInstance(tmp.path, () =>
Effect.runPromise(
Effect.scoped(
Effect.gen(function* () {
const svc = yield* Bus.Service
yield* Effect.forkScoped(
svc.subscribeAll().pipe(
Stream.runForEach(() => Effect.void),
Effect.ensuring(Effect.sync(() => {
ensuringFired = true
})),
),
)
yield* svc.publish(TestEvent.Ping, { value: 1 })
yield* Effect.yieldNow
}),
).pipe(Effect.provide(Bus.layer)),
),
)
expect(ensuringFired).toBe(true)
})
})
})

View File

@@ -5,9 +5,9 @@ import path from "path"
import { Deferred, Effect, Option } from "effect"
import { tmpdir } from "../fixture/fixture"
import { watcherConfigLayer, withServices } from "../fixture/instance"
import { Bus } from "../../src/bus"
import { FileWatcher } from "../../src/file/watcher"
import { Instance } from "../../src/project/instance"
import { GlobalBus } from "../../src/bus/global"
// Native @parcel/watcher bindings aren't reliably available in CI (missing on Linux, flaky on Windows)
const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? describe : describe.skip
@@ -16,7 +16,6 @@ const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? desc
// Helpers
// ---------------------------------------------------------------------------
type BusUpdate = { directory?: string; payload: { type: string; properties: WatcherEvent } }
type WatcherEvent = { file: string; event: "add" | "change" | "unlink" }
/** Run `body` with a live FileWatcher service. */
@@ -36,22 +35,17 @@ function withWatcher<E>(directory: string, body: Effect.Effect<void, E>) {
function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) {
let done = false
function on(evt: BusUpdate) {
const unsub = Bus.subscribe(FileWatcher.Event.Updated, (evt) => {
if (done) return
if (evt.directory !== directory) return
if (evt.payload.type !== FileWatcher.Event.Updated.type) return
if (!check(evt.payload.properties)) return
hit(evt.payload.properties)
}
if (!check(evt.properties)) return
hit(evt.properties)
})
function cleanup() {
return () => {
if (done) return
done = true
GlobalBus.off("event", on)
unsub()
}
GlobalBus.on("event", on)
return cleanup
}
function wait(directory: string, check: (evt: WatcherEvent) => boolean) {

View File

@@ -1,59 +0,0 @@
import { describe, expect, test } from "bun:test"
import path from "path"
import { Agent } from "../../src/agent/agent"
import { Instance } from "../../src/project/instance"
import { SystemPrompt } from "../../src/session/system"
import { tmpdir } from "../fixture/fixture"
describe("session.system", () => {
test("skills output is sorted by name and stable across calls", async () => {
await using tmp = await tmpdir({
git: true,
init: async (dir) => {
for (const [name, description] of [
["zeta-skill", "Zeta skill."],
["alpha-skill", "Alpha skill."],
["middle-skill", "Middle skill."],
]) {
const skillDir = path.join(dir, ".opencode", "skill", name)
await Bun.write(
path.join(skillDir, "SKILL.md"),
`---
name: ${name}
description: ${description}
---
# ${name}
`,
)
}
},
})
const home = process.env.OPENCODE_TEST_HOME
process.env.OPENCODE_TEST_HOME = tmp.path
try {
await Instance.provide({
directory: tmp.path,
fn: async () => {
const build = await Agent.get("build")
const first = await SystemPrompt.skills(build!)
const second = await SystemPrompt.skills(build!)
expect(first).toBe(second)
const alpha = first!.indexOf("<name>alpha-skill</name>")
const middle = first!.indexOf("<name>middle-skill</name>")
const zeta = first!.indexOf("<name>zeta-skill</name>")
expect(alpha).toBeGreaterThan(-1)
expect(middle).toBeGreaterThan(alpha)
expect(zeta).toBeGreaterThan(middle)
},
})
} finally {
process.env.OPENCODE_TEST_HOME = home
}
})
})

View File

@@ -54,56 +54,6 @@ description: Skill for tool tests.
}
})
test("description sorts skills by name and is stable across calls", async () => {
await using tmp = await tmpdir({
git: true,
init: async (dir) => {
for (const [name, description] of [
["zeta-skill", "Zeta skill."],
["alpha-skill", "Alpha skill."],
["middle-skill", "Middle skill."],
]) {
const skillDir = path.join(dir, ".opencode", "skill", name)
await Bun.write(
path.join(skillDir, "SKILL.md"),
`---
name: ${name}
description: ${description}
---
# ${name}
`,
)
}
},
})
const home = process.env.OPENCODE_TEST_HOME
process.env.OPENCODE_TEST_HOME = tmp.path
try {
await Instance.provide({
directory: tmp.path,
fn: async () => {
const first = await SkillTool.init()
const second = await SkillTool.init()
expect(first.description).toBe(second.description)
const alpha = first.description.indexOf("**alpha-skill**: Alpha skill.")
const middle = first.description.indexOf("**middle-skill**: Middle skill.")
const zeta = first.description.indexOf("**zeta-skill**: Zeta skill.")
expect(alpha).toBeGreaterThan(-1)
expect(middle).toBeGreaterThan(alpha)
expect(zeta).toBeGreaterThan(middle)
},
})
} finally {
process.env.OPENCODE_TEST_HOME = home
}
})
test("execute returns skill content block with files", async () => {
await using tmp = await tmpdir({
git: true,

View File

@@ -1,45 +0,0 @@
import { describe, expect, test } from "bun:test"
import { Agent } from "../../src/agent/agent"
import { Instance } from "../../src/project/instance"
import { TaskTool } from "../../src/tool/task"
import { tmpdir } from "../fixture/fixture"
describe("tool.task", () => {
test("description sorts subagents by name and is stable across calls", async () => {
await using tmp = await tmpdir({
config: {
agent: {
zebra: {
description: "Zebra agent",
mode: "subagent",
},
alpha: {
description: "Alpha agent",
mode: "subagent",
},
},
},
})
await Instance.provide({
directory: tmp.path,
fn: async () => {
const build = await Agent.get("build")
const first = await TaskTool.init({ agent: build })
const second = await TaskTool.init({ agent: build })
expect(first.description).toBe(second.description)
const alpha = first.description.indexOf("- alpha: Alpha agent")
const explore = first.description.indexOf("- explore:")
const general = first.description.indexOf("- general:")
const zebra = first.description.indexOf("- zebra: Zebra agent")
expect(alpha).toBeGreaterThan(-1)
expect(explore).toBeGreaterThan(alpha)
expect(general).toBeGreaterThan(explore)
expect(zebra).toBeGreaterThan(general)
},
})
})
})

View File

@@ -2845,38 +2845,6 @@ export class File extends HeyApiClient {
}
}
export class Event extends HeyApiClient {
/**
* Subscribe to events
*
* Get events
*/
public subscribe<ThrowOnError extends boolean = false>(
parameters?: {
directory?: string
workspace?: string
},
options?: Options<never, ThrowOnError>,
) {
const params = buildClientParams(
[parameters],
[
{
args: [
{ in: "query", key: "directory" },
{ in: "query", key: "workspace" },
],
},
],
)
return (options?.client ?? this.client).sse.get<EventSubscribeResponses, unknown, ThrowOnError>({
url: "/event",
...options,
...params,
})
}
}
export class Auth2 extends HeyApiClient {
/**
* Remove MCP OAuth
@@ -3898,6 +3866,38 @@ export class Formatter extends HeyApiClient {
}
}
export class Event extends HeyApiClient {
/**
* Subscribe to events
*
* Get events
*/
public subscribe<ThrowOnError extends boolean = false>(
parameters?: {
directory?: string
workspace?: string
},
options?: Options<never, ThrowOnError>,
) {
const params = buildClientParams(
[parameters],
[
{
args: [
{ in: "query", key: "directory" },
{ in: "query", key: "workspace" },
],
},
],
)
return (options?.client ?? this.client).sse.get<EventSubscribeResponses, unknown, ThrowOnError>({
url: "/event",
...options,
...params,
})
}
}
export class OpencodeClient extends HeyApiClient {
public static readonly __registry = new HeyApiRegistry<OpencodeClient>()
@@ -3981,11 +3981,6 @@ export class OpencodeClient extends HeyApiClient {
return (this._file ??= new File({ client: this.client }))
}
private _event?: Event
get event(): Event {
return (this._event ??= new Event({ client: this.client }))
}
private _mcp?: Mcp
get mcp(): Mcp {
return (this._mcp ??= new Mcp({ client: this.client }))
@@ -4030,4 +4025,9 @@ export class OpencodeClient extends HeyApiClient {
get formatter(): Formatter {
return (this._formatter ??= new Formatter({ client: this.client }))
}
private _event?: Event
get event(): Event {
return (this._event ??= new Event({ client: this.client }))
}
}

View File

@@ -4229,25 +4229,6 @@ export type FileStatusResponses = {
export type FileStatusResponse = FileStatusResponses[keyof FileStatusResponses]
export type EventSubscribeData = {
body?: never
path?: never
query?: {
directory?: string
workspace?: string
}
url: "/event"
}
export type EventSubscribeResponses = {
/**
* Event stream
*/
200: Event
}
export type EventSubscribeResponse = EventSubscribeResponses[keyof EventSubscribeResponses]
export type McpStatusData = {
body?: never
path?: never
@@ -4998,3 +4979,22 @@ export type FormatterStatusResponses = {
}
export type FormatterStatusResponse = FormatterStatusResponses[keyof FormatterStatusResponses]
export type EventSubscribeData = {
body?: never
path?: never
query?: {
directory?: string
workspace?: string
}
url: "/event"
}
export type EventSubscribeResponses = {
/**
* Event stream
*/
200: Event
}
export type EventSubscribeResponse = EventSubscribeResponses[keyof EventSubscribeResponses]

View File

@@ -5243,47 +5243,6 @@
]
}
},
"/event": {
"get": {
"operationId": "event.subscribe",
"parameters": [
{
"in": "query",
"name": "directory",
"schema": {
"type": "string"
}
},
{
"in": "query",
"name": "workspace",
"schema": {
"type": "string"
}
}
],
"summary": "Subscribe to events",
"description": "Get events",
"responses": {
"200": {
"description": "Event stream",
"content": {
"text/event-stream": {
"schema": {
"$ref": "#/components/schemas/Event"
}
}
}
}
},
"x-codeSamples": [
{
"lang": "js",
"source": "import { createOpencodeClient } from \"@opencode-ai/sdk\n\nconst client = createOpencodeClient()\nawait client.event.subscribe({\n ...\n})"
}
]
}
},
"/mcp": {
"get": {
"operationId": "mcp.status",
@@ -6935,6 +6894,47 @@
}
]
}
},
"/event": {
"get": {
"operationId": "event.subscribe",
"parameters": [
{
"in": "query",
"name": "directory",
"schema": {
"type": "string"
}
},
{
"in": "query",
"name": "workspace",
"schema": {
"type": "string"
}
}
],
"summary": "Subscribe to events",
"description": "Get events",
"responses": {
"200": {
"description": "Event stream",
"content": {
"text/event-stream": {
"schema": {
"$ref": "#/components/schemas/Event"
}
}
}
}
},
"x-codeSamples": [
{
"lang": "js",
"source": "import { createOpencodeClient } from \"@opencode-ai/sdk\n\nconst client = createOpencodeClient()\nawait client.event.subscribe({\n ...\n})"
}
]
}
}
},
"components": {