mirror of
https://github.com/anomalyco/opencode.git
synced 2026-03-19 21:24:37 +00:00
Compare commits
8 Commits
kit/effect
...
kit/effect
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b53a95fd81 | ||
|
|
8e11a46fe0 | ||
|
|
8ab4d84057 | ||
|
|
4066247988 | ||
|
|
b9de3ad370 | ||
|
|
d69962b0f7 | ||
|
|
a6f23cb08e | ||
|
|
0540751897 |
@@ -132,7 +132,7 @@ Still open and likely worth migrating:
|
||||
- [ ] `Worktree`
|
||||
- [ ] `Installation`
|
||||
- [ ] `Bus`
|
||||
- [ ] `Command`
|
||||
- [x] `Command`
|
||||
- [ ] `Config`
|
||||
- [ ] `Session`
|
||||
- [ ] `SessionProcessor`
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
import z from "zod"
|
||||
import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect"
|
||||
import { Log } from "../util/log"
|
||||
import { Instance } from "../project/instance"
|
||||
import { BusEvent } from "./bus-event"
|
||||
import { GlobalBus } from "./global"
|
||||
import { runCallbackInstance, runPromiseInstance } from "../effect/runtime"
|
||||
|
||||
export namespace Bus {
|
||||
const log = Log.create({ service: "bus" })
|
||||
type Subscription = (event: any) => void
|
||||
|
||||
export const InstanceDisposed = BusEvent.define(
|
||||
"server.instance.disposed",
|
||||
@@ -16,130 +15,91 @@ export namespace Bus {
|
||||
}),
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Service definition
|
||||
// ---------------------------------------------------------------------------
|
||||
const state = Instance.state(
|
||||
() => {
|
||||
const subscriptions = new Map<any, Subscription[]>()
|
||||
|
||||
type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
|
||||
type: D["type"]
|
||||
properties: z.infer<D["properties"]>
|
||||
}
|
||||
|
||||
export interface Interface {
|
||||
readonly publish: <D extends BusEvent.Definition>(
|
||||
def: D,
|
||||
properties: z.output<D["properties"]>,
|
||||
) => Effect.Effect<void>
|
||||
readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
|
||||
readonly subscribeAll: () => Stream.Stream<Payload>
|
||||
}
|
||||
|
||||
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Bus") {}
|
||||
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const pubsubs = new Map<string, PubSub.PubSub<Payload>>()
|
||||
const wildcardPubSub = yield* PubSub.unbounded<Payload>()
|
||||
|
||||
const getOrCreate = Effect.fnUntraced(function* (type: string) {
|
||||
let ps = pubsubs.get(type)
|
||||
if (!ps) {
|
||||
ps = yield* PubSub.unbounded<Payload>()
|
||||
pubsubs.set(type, ps)
|
||||
}
|
||||
return ps
|
||||
})
|
||||
|
||||
function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
|
||||
return Effect.gen(function* () {
|
||||
const payload: Payload = { type: def.type, properties }
|
||||
log.info("publishing", { type: def.type })
|
||||
|
||||
const ps = pubsubs.get(def.type)
|
||||
if (ps) yield* PubSub.publish(ps, payload)
|
||||
yield* PubSub.publish(wildcardPubSub, payload)
|
||||
|
||||
GlobalBus.emit("event", {
|
||||
directory: Instance.directory,
|
||||
payload,
|
||||
})
|
||||
})
|
||||
return {
|
||||
subscriptions,
|
||||
}
|
||||
|
||||
function subscribe<D extends BusEvent.Definition>(def: D): Stream.Stream<Payload<D>> {
|
||||
log.info("subscribing", { type: def.type })
|
||||
return Stream.unwrap(
|
||||
Effect.gen(function* () {
|
||||
const ps = yield* getOrCreate(def.type)
|
||||
return Stream.fromPubSub(ps) as Stream.Stream<Payload<D>>
|
||||
}),
|
||||
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type }))))
|
||||
},
|
||||
async (entry) => {
|
||||
const wildcard = entry.subscriptions.get("*")
|
||||
if (!wildcard) return
|
||||
const event = {
|
||||
type: InstanceDisposed.type,
|
||||
properties: {
|
||||
directory: Instance.directory,
|
||||
},
|
||||
}
|
||||
|
||||
function subscribeAll(): Stream.Stream<Payload> {
|
||||
log.info("subscribing", { type: "*" })
|
||||
return Stream.fromPubSub(wildcardPubSub).pipe(
|
||||
Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))),
|
||||
)
|
||||
for (const sub of [...wildcard]) {
|
||||
sub(event)
|
||||
}
|
||||
|
||||
// Shut down all PubSubs when the layer is torn down.
|
||||
// This causes Stream.fromPubSub consumers to end, triggering
|
||||
// their ensuring/finalizers.
|
||||
yield* Effect.addFinalizer(() =>
|
||||
Effect.gen(function* () {
|
||||
log.info("shutting down PubSubs")
|
||||
yield* PubSub.shutdown(wildcardPubSub)
|
||||
for (const ps of pubsubs.values()) {
|
||||
yield* PubSub.shutdown(ps)
|
||||
}
|
||||
}),
|
||||
)
|
||||
|
||||
return Service.of({ publish, subscribe, subscribeAll })
|
||||
}),
|
||||
},
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Legacy adapters — plain function API wrapping the Effect service
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function runStream(stream: (svc: Interface) => Stream.Stream<Payload>, callback: (event: any) => void) {
|
||||
return runCallbackInstance(
|
||||
Service.use((svc) => stream(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg))))),
|
||||
)
|
||||
export async function publish<Definition extends BusEvent.Definition>(
|
||||
def: Definition,
|
||||
properties: z.output<Definition["properties"]>,
|
||||
) {
|
||||
const payload = {
|
||||
type: def.type,
|
||||
properties,
|
||||
}
|
||||
log.info("publishing", {
|
||||
type: def.type,
|
||||
})
|
||||
const pending = []
|
||||
for (const key of [def.type, "*"]) {
|
||||
const match = [...(state().subscriptions.get(key) ?? [])]
|
||||
for (const sub of match) {
|
||||
pending.push(sub(payload))
|
||||
}
|
||||
}
|
||||
GlobalBus.emit("event", {
|
||||
directory: Instance.directory,
|
||||
payload,
|
||||
})
|
||||
return Promise.all(pending)
|
||||
}
|
||||
|
||||
export function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
|
||||
return runPromiseInstance(Service.use((svc) => svc.publish(def, properties)))
|
||||
export function subscribe<Definition extends BusEvent.Definition>(
|
||||
def: Definition,
|
||||
callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
|
||||
) {
|
||||
return raw(def.type, callback)
|
||||
}
|
||||
|
||||
export function subscribe<D extends BusEvent.Definition>(def: D, callback: (event: Payload<D>) => void) {
|
||||
return runStream((svc) => svc.subscribe(def), callback)
|
||||
export function once<Definition extends BusEvent.Definition>(
|
||||
def: Definition,
|
||||
callback: (event: {
|
||||
type: Definition["type"]
|
||||
properties: z.infer<Definition["properties"]>
|
||||
}) => "done" | undefined,
|
||||
) {
|
||||
const unsub = subscribe(def, (event) => {
|
||||
if (callback(event)) unsub()
|
||||
})
|
||||
}
|
||||
|
||||
export function subscribeAll(callback: (event: any) => void) {
|
||||
const directory = Instance.directory
|
||||
return raw("*", callback)
|
||||
}
|
||||
|
||||
// InstanceDisposed is delivered via GlobalBus because the legacy
|
||||
// adapter's fiber starts asynchronously and may not be running when
|
||||
// disposal happens. In the Effect-native path, forkScoped + scope
|
||||
// closure handles this correctly. This bridge can be removed once
|
||||
// upstream PubSub.shutdown properly wakes suspended subscribers:
|
||||
// https://github.com/Effect-TS/effect-smol/pull/1800
|
||||
const onDispose = (evt: { directory?: string; payload: any }) => {
|
||||
if (evt.payload.type !== InstanceDisposed.type) return
|
||||
if (evt.directory !== directory) return
|
||||
callback(evt.payload)
|
||||
GlobalBus.off("event", onDispose)
|
||||
}
|
||||
GlobalBus.on("event", onDispose)
|
||||
function raw(type: string, callback: (event: any) => void) {
|
||||
log.info("subscribing", { type })
|
||||
const subscriptions = state().subscriptions
|
||||
let match = subscriptions.get(type) ?? []
|
||||
match.push(callback)
|
||||
subscriptions.set(type, match)
|
||||
|
||||
const interrupt = runStream((svc) => svc.subscribeAll(), callback)
|
||||
return () => {
|
||||
GlobalBus.off("event", onDispose)
|
||||
interrupt()
|
||||
log.info("unsubscribing", { type })
|
||||
const match = subscriptions.get(type)
|
||||
if (!match) return
|
||||
const index = match.indexOf(callback)
|
||||
if (index === -1) return
|
||||
match.splice(index, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,15 +1,18 @@
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { InstanceContext } from "@/effect/instance-context"
|
||||
import { runPromiseInstance } from "@/effect/runtime"
|
||||
import { SessionID, MessageID } from "@/session/schema"
|
||||
import { Effect, Fiber, Layer, ServiceMap } from "effect"
|
||||
import z from "zod"
|
||||
import { Config } from "../config/config"
|
||||
import { Instance } from "../project/instance"
|
||||
import { Identifier } from "../id/id"
|
||||
import PROMPT_INITIALIZE from "./template/initialize.txt"
|
||||
import PROMPT_REVIEW from "./template/review.txt"
|
||||
import { MCP } from "../mcp"
|
||||
import { Skill } from "../skill"
|
||||
import { Log } from "../util/log"
|
||||
|
||||
export namespace Command {
|
||||
const log = Log.create({ service: "command" })
|
||||
export const Event = {
|
||||
Executed: BusEvent.define(
|
||||
"command.executed",
|
||||
@@ -57,95 +60,126 @@ export namespace Command {
|
||||
REVIEW: "review",
|
||||
} as const
|
||||
|
||||
const state = Instance.state(async () => {
|
||||
const cfg = await Config.get()
|
||||
export interface Interface {
|
||||
readonly get: (name: string) => Effect.Effect<Info | undefined>
|
||||
readonly list: () => Effect.Effect<Info[]>
|
||||
}
|
||||
|
||||
const result: Record<string, Info> = {
|
||||
[Default.INIT]: {
|
||||
name: Default.INIT,
|
||||
description: "create/update AGENTS.md",
|
||||
source: "command",
|
||||
get template() {
|
||||
return PROMPT_INITIALIZE.replace("${path}", Instance.worktree)
|
||||
},
|
||||
hints: hints(PROMPT_INITIALIZE),
|
||||
},
|
||||
[Default.REVIEW]: {
|
||||
name: Default.REVIEW,
|
||||
description: "review changes [commit|branch|pr], defaults to uncommitted",
|
||||
source: "command",
|
||||
get template() {
|
||||
return PROMPT_REVIEW.replace("${path}", Instance.worktree)
|
||||
},
|
||||
subtask: true,
|
||||
hints: hints(PROMPT_REVIEW),
|
||||
},
|
||||
}
|
||||
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Command") {}
|
||||
|
||||
for (const [name, command] of Object.entries(cfg.command ?? {})) {
|
||||
result[name] = {
|
||||
name,
|
||||
agent: command.agent,
|
||||
model: command.model,
|
||||
description: command.description,
|
||||
source: "command",
|
||||
get template() {
|
||||
return command.template
|
||||
},
|
||||
subtask: command.subtask,
|
||||
hints: hints(command.template),
|
||||
}
|
||||
}
|
||||
for (const [name, prompt] of Object.entries(await MCP.prompts())) {
|
||||
result[name] = {
|
||||
name,
|
||||
source: "mcp",
|
||||
description: prompt.description,
|
||||
get template() {
|
||||
// since a getter can't be async we need to manually return a promise here
|
||||
return new Promise<string>(async (resolve, reject) => {
|
||||
const template = await MCP.getPrompt(
|
||||
prompt.client,
|
||||
prompt.name,
|
||||
prompt.arguments
|
||||
? // substitute each argument with $1, $2, etc.
|
||||
Object.fromEntries(prompt.arguments?.map((argument, i) => [argument.name, `$${i + 1}`]))
|
||||
: {},
|
||||
).catch(reject)
|
||||
resolve(
|
||||
template?.messages
|
||||
.map((message) => (message.content.type === "text" ? message.content.text : ""))
|
||||
.join("\n") || "",
|
||||
)
|
||||
})
|
||||
},
|
||||
hints: prompt.arguments?.map((_, i) => `$${i + 1}`) ?? [],
|
||||
}
|
||||
}
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const instance = yield* InstanceContext
|
||||
|
||||
// Add skills as invokable commands
|
||||
for (const skill of await Skill.all()) {
|
||||
// Skip if a command with this name already exists
|
||||
if (result[skill.name]) continue
|
||||
result[skill.name] = {
|
||||
name: skill.name,
|
||||
description: skill.description,
|
||||
source: "skill",
|
||||
get template() {
|
||||
return skill.content
|
||||
},
|
||||
hints: [],
|
||||
}
|
||||
}
|
||||
const commands: Record<string, Info> = {}
|
||||
|
||||
return result
|
||||
})
|
||||
const load = Effect.fn("Command.load")(function* () {
|
||||
yield* Effect.promise(async () => {
|
||||
const cfg = await Config.get()
|
||||
|
||||
commands[Default.INIT] = {
|
||||
name: Default.INIT,
|
||||
description: "create/update AGENTS.md",
|
||||
source: "command",
|
||||
get template() {
|
||||
return PROMPT_INITIALIZE.replace("${path}", instance.worktree)
|
||||
},
|
||||
hints: hints(PROMPT_INITIALIZE),
|
||||
}
|
||||
commands[Default.REVIEW] = {
|
||||
name: Default.REVIEW,
|
||||
description: "review changes [commit|branch|pr], defaults to uncommitted",
|
||||
source: "command",
|
||||
get template() {
|
||||
return PROMPT_REVIEW.replace("${path}", instance.worktree)
|
||||
},
|
||||
subtask: true,
|
||||
hints: hints(PROMPT_REVIEW),
|
||||
}
|
||||
|
||||
for (const [name, command] of Object.entries(cfg.command ?? {})) {
|
||||
commands[name] = {
|
||||
name,
|
||||
agent: command.agent,
|
||||
model: command.model,
|
||||
description: command.description,
|
||||
source: "command",
|
||||
get template() {
|
||||
return command.template
|
||||
},
|
||||
subtask: command.subtask,
|
||||
hints: hints(command.template),
|
||||
}
|
||||
}
|
||||
for (const [name, prompt] of Object.entries(await MCP.prompts())) {
|
||||
commands[name] = {
|
||||
name,
|
||||
source: "mcp",
|
||||
description: prompt.description,
|
||||
get template() {
|
||||
// since a getter can't be async we need to manually return a promise here
|
||||
return new Promise<string>(async (resolve, reject) => {
|
||||
const template = await MCP.getPrompt(
|
||||
prompt.client,
|
||||
prompt.name,
|
||||
prompt.arguments
|
||||
? // substitute each argument with $1, $2, etc.
|
||||
Object.fromEntries(prompt.arguments?.map((argument, i) => [argument.name, `$${i + 1}`]))
|
||||
: {},
|
||||
).catch(reject)
|
||||
resolve(
|
||||
template?.messages
|
||||
.map((message) => (message.content.type === "text" ? message.content.text : ""))
|
||||
.join("\n") || "",
|
||||
)
|
||||
})
|
||||
},
|
||||
hints: prompt.arguments?.map((_, i) => `$${i + 1}`) ?? [],
|
||||
}
|
||||
}
|
||||
|
||||
// Add skills as invokable commands
|
||||
for (const skill of await Skill.all()) {
|
||||
// Skip if a command with this name already exists
|
||||
if (commands[skill.name]) continue
|
||||
commands[skill.name] = {
|
||||
name: skill.name,
|
||||
description: skill.description,
|
||||
source: "skill",
|
||||
get template() {
|
||||
return skill.content
|
||||
},
|
||||
hints: [],
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
const loadFiber = yield* load().pipe(
|
||||
Effect.catchCause((cause) => Effect.sync(() => log.error("init failed", { cause }))),
|
||||
Effect.forkScoped,
|
||||
)
|
||||
|
||||
const get = Effect.fn("Command.get")(function* (name: string) {
|
||||
yield* Fiber.join(loadFiber)
|
||||
return commands[name]
|
||||
})
|
||||
|
||||
const list = Effect.fn("Command.list")(function* () {
|
||||
yield* Fiber.join(loadFiber)
|
||||
return Object.values(commands)
|
||||
})
|
||||
|
||||
return Service.of({ get, list })
|
||||
}),
|
||||
)
|
||||
|
||||
export async function get(name: string) {
|
||||
return state().then((x) => x[name])
|
||||
return runPromiseInstance(Service.use((svc) => svc.get(name)))
|
||||
}
|
||||
|
||||
export async function list() {
|
||||
return state().then((x) => Object.values(x))
|
||||
return runPromiseInstance(Service.use((svc) => svc.list()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { Effect, Exit, Fiber, Layer, LayerMap, MutableHashMap, Scope, ServiceMap } from "effect"
|
||||
import { Bus } from "@/bus"
|
||||
import { Effect, Layer, LayerMap, ServiceMap } from "effect"
|
||||
import { Command } from "@/command"
|
||||
import { File } from "@/file"
|
||||
import { FileTime } from "@/file/time"
|
||||
import { FileWatcher } from "@/file/watcher"
|
||||
@@ -17,7 +17,7 @@ import { registerDisposer } from "./instance-registry"
|
||||
export { InstanceContext } from "./instance-context"
|
||||
|
||||
export type InstanceServices =
|
||||
| Bus.Service
|
||||
| Command.Service
|
||||
| Question.Service
|
||||
| PermissionNext.Service
|
||||
| ProviderAuth.Service
|
||||
@@ -38,7 +38,7 @@ export type InstanceServices =
|
||||
function lookup(_key: string) {
|
||||
const ctx = Layer.sync(InstanceContext, () => InstanceContext.of(Instance.current))
|
||||
return Layer.mergeAll(
|
||||
Layer.fresh(Bus.layer),
|
||||
Layer.fresh(Command.layer),
|
||||
Layer.fresh(Question.layer),
|
||||
Layer.fresh(PermissionNext.layer),
|
||||
Layer.fresh(ProviderAuth.defaultLayer),
|
||||
@@ -59,23 +59,7 @@ export class Instances extends ServiceMap.Service<Instances, LayerMap.LayerMap<s
|
||||
Instances,
|
||||
Effect.gen(function* () {
|
||||
const layerMap = yield* LayerMap.make(lookup, { idleTimeToLive: Infinity })
|
||||
|
||||
// Force-invalidate closes the RcMap entry scope even when refCount > 0.
|
||||
// Standard RcMap.invalidate bails in that case, leaving long-running
|
||||
// consumer fibers orphaned. This is an upstream issue:
|
||||
// https://github.com/Effect-TS/effect-smol/pull/1799
|
||||
const forceInvalidate = (directory: string) =>
|
||||
Effect.gen(function* () {
|
||||
const rcMap = layerMap.rcMap
|
||||
if (rcMap.state._tag === "Closed") return
|
||||
const entry = MutableHashMap.get(rcMap.state.map, directory)
|
||||
if (entry._tag === "None") return
|
||||
MutableHashMap.remove(rcMap.state.map, directory)
|
||||
if (entry.value.fiber) yield* Fiber.interrupt(entry.value.fiber)
|
||||
yield* Scope.close(entry.value.scope, Exit.void)
|
||||
}).pipe(Effect.uninterruptible, Effect.ignore)
|
||||
|
||||
const unregister = registerDisposer((directory) => Effect.runPromise(forceInvalidate(directory)))
|
||||
const unregister = registerDisposer((directory) => Effect.runPromise(layerMap.invalidate(directory)))
|
||||
yield* Effect.addFinalizer(() => Effect.sync(unregister))
|
||||
return Instances.of(layerMap)
|
||||
}),
|
||||
|
||||
@@ -18,12 +18,6 @@ export function runPromiseInstance<A, E>(effect: Effect.Effect<A, E, InstanceSer
|
||||
return runtime.runPromise(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
|
||||
}
|
||||
|
||||
export function runCallbackInstance<A, E>(
|
||||
effect: Effect.Effect<A, E, InstanceServices>,
|
||||
): (interruptor?: number) => void {
|
||||
return runtime.runCallback(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
|
||||
}
|
||||
|
||||
export function disposeRuntime() {
|
||||
return runtime.dispose()
|
||||
}
|
||||
|
||||
@@ -4,7 +4,9 @@ import { InstanceContext } from "@/effect/instance-context"
|
||||
import path from "path"
|
||||
import { mergeDeep } from "remeda"
|
||||
import z from "zod"
|
||||
import { Bus } from "../bus"
|
||||
import { Config } from "../config/config"
|
||||
import { File } from "../file"
|
||||
import { Instance } from "../project/instance"
|
||||
import { Process } from "../util/process"
|
||||
import { Log } from "../util/log"
|
||||
@@ -25,7 +27,6 @@ export namespace Format {
|
||||
export type Status = z.infer<typeof Status>
|
||||
|
||||
export interface Interface {
|
||||
readonly run: (filepath: string) => Effect.Effect<void>
|
||||
readonly status: () => Effect.Effect<Status[]>
|
||||
}
|
||||
|
||||
@@ -89,44 +90,48 @@ export namespace Format {
|
||||
return result
|
||||
}
|
||||
|
||||
const run = Effect.fn("Format.run")(function* (filepath: string) {
|
||||
log.info("formatting", { file: filepath })
|
||||
const ext = path.extname(filepath)
|
||||
yield* Effect.acquireRelease(
|
||||
Effect.sync(() =>
|
||||
Bus.subscribe(
|
||||
File.Event.Edited,
|
||||
Instance.bind(async (payload) => {
|
||||
const file = payload.properties.file
|
||||
log.info("formatting", { file })
|
||||
const ext = path.extname(file)
|
||||
|
||||
for (const item of yield* Effect.promise(() => getFormatter(ext))) {
|
||||
log.info("running", { command: item.command })
|
||||
yield* Effect.tryPromise({
|
||||
try: async () => {
|
||||
const proc = Process.spawn(
|
||||
item.command.map((x) => x.replace("$FILE", filepath)),
|
||||
{
|
||||
cwd: instance.directory,
|
||||
env: { ...process.env, ...item.environment },
|
||||
stdout: "ignore",
|
||||
stderr: "ignore",
|
||||
},
|
||||
)
|
||||
const exit = await proc.exited
|
||||
if (exit !== 0) {
|
||||
log.error("failed", {
|
||||
command: item.command,
|
||||
...item.environment,
|
||||
})
|
||||
for (const item of await getFormatter(ext)) {
|
||||
log.info("running", { command: item.command })
|
||||
try {
|
||||
const proc = Process.spawn(
|
||||
item.command.map((x) => x.replace("$FILE", file)),
|
||||
{
|
||||
cwd: instance.directory,
|
||||
env: { ...process.env, ...item.environment },
|
||||
stdout: "ignore",
|
||||
stderr: "ignore",
|
||||
},
|
||||
)
|
||||
const exit = await proc.exited
|
||||
if (exit !== 0) {
|
||||
log.error("failed", {
|
||||
command: item.command,
|
||||
...item.environment,
|
||||
})
|
||||
}
|
||||
} catch (error) {
|
||||
log.error("failed to format file", {
|
||||
error,
|
||||
command: item.command,
|
||||
...item.environment,
|
||||
file,
|
||||
})
|
||||
}
|
||||
}
|
||||
},
|
||||
catch: (error) => {
|
||||
log.error("failed to format file", {
|
||||
error,
|
||||
command: item.command,
|
||||
...item.environment,
|
||||
file: filepath,
|
||||
})
|
||||
return error
|
||||
},
|
||||
}).pipe(Effect.ignore)
|
||||
}
|
||||
})
|
||||
|
||||
}),
|
||||
),
|
||||
),
|
||||
(unsubscribe) => Effect.sync(unsubscribe),
|
||||
)
|
||||
log.info("init")
|
||||
|
||||
const status = Effect.fn("Format.status")(function* () {
|
||||
@@ -142,14 +147,10 @@ export namespace Format {
|
||||
return result
|
||||
})
|
||||
|
||||
return Service.of({ run, status })
|
||||
return Service.of({ status })
|
||||
}),
|
||||
)
|
||||
|
||||
export async function run(filepath: string) {
|
||||
return runPromiseInstance(Service.use((s) => s.run(filepath)))
|
||||
}
|
||||
|
||||
export async function status() {
|
||||
return runPromiseInstance(Service.use((s) => s.status()))
|
||||
}
|
||||
|
||||
@@ -47,8 +47,6 @@ import { ProviderTransform } from "./transform"
|
||||
import { Installation } from "../installation"
|
||||
import { ModelID, ProviderID } from "./schema"
|
||||
|
||||
const DEFAULT_CHUNK_TIMEOUT = 300_000
|
||||
|
||||
export namespace Provider {
|
||||
const log = Log.create({ service: "provider" })
|
||||
|
||||
@@ -1130,7 +1128,7 @@ export namespace Provider {
|
||||
if (existing) return existing
|
||||
|
||||
const customFetch = options["fetch"]
|
||||
const chunkTimeout = options["chunkTimeout"] || DEFAULT_CHUNK_TIMEOUT
|
||||
const chunkTimeout = options["chunkTimeout"]
|
||||
delete options["chunkTimeout"]
|
||||
|
||||
options["fetch"] = async (input: any, init?: BunFetchRequestInit) => {
|
||||
|
||||
85
packages/opencode/src/server/routes/event.ts
Normal file
85
packages/opencode/src/server/routes/event.ts
Normal file
@@ -0,0 +1,85 @@
|
||||
import { Hono } from "hono"
|
||||
import { describeRoute, resolver } from "hono-openapi"
|
||||
import { streamSSE } from "hono/streaming"
|
||||
import { Log } from "@/util/log"
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { Bus } from "@/bus"
|
||||
import { lazy } from "../../util/lazy"
|
||||
import { AsyncQueue } from "../../util/queue"
|
||||
import { Instance } from "@/project/instance"
|
||||
|
||||
const log = Log.create({ service: "server" })
|
||||
|
||||
export const EventRoutes = lazy(() =>
|
||||
new Hono().get(
|
||||
"/event",
|
||||
describeRoute({
|
||||
summary: "Subscribe to events",
|
||||
description: "Get events",
|
||||
operationId: "event.subscribe",
|
||||
responses: {
|
||||
200: {
|
||||
description: "Event stream",
|
||||
content: {
|
||||
"text/event-stream": {
|
||||
schema: resolver(BusEvent.payloads()),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
async (c) => {
|
||||
log.info("event connected")
|
||||
c.header("X-Accel-Buffering", "no")
|
||||
c.header("X-Content-Type-Options", "nosniff")
|
||||
return streamSSE(c, async (stream) => {
|
||||
const q = new AsyncQueue<string | null>()
|
||||
let done = false
|
||||
|
||||
q.push(
|
||||
JSON.stringify({
|
||||
type: "server.connected",
|
||||
properties: {},
|
||||
}),
|
||||
)
|
||||
|
||||
// Send heartbeat every 10s to prevent stalled proxy streams.
|
||||
const heartbeat = setInterval(() => {
|
||||
q.push(
|
||||
JSON.stringify({
|
||||
type: "server.heartbeat",
|
||||
properties: {},
|
||||
}),
|
||||
)
|
||||
}, 10_000)
|
||||
|
||||
const unsub = Bus.subscribeAll((event) => {
|
||||
q.push(JSON.stringify(event))
|
||||
if (event.type === Bus.InstanceDisposed.type) {
|
||||
stop()
|
||||
}
|
||||
})
|
||||
|
||||
const stop = () => {
|
||||
if (done) return
|
||||
done = true
|
||||
clearInterval(heartbeat)
|
||||
unsub()
|
||||
q.push(null)
|
||||
log.info("event disconnected")
|
||||
}
|
||||
|
||||
stream.onAbort(stop)
|
||||
|
||||
try {
|
||||
for await (const data of q) {
|
||||
if (data === null) return
|
||||
await stream.writeSSE({ data })
|
||||
}
|
||||
} finally {
|
||||
stop()
|
||||
}
|
||||
})
|
||||
},
|
||||
),
|
||||
)
|
||||
@@ -4,6 +4,7 @@ import { streamSSE } from "hono/streaming"
|
||||
import z from "zod"
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { GlobalBus } from "@/bus/global"
|
||||
import { AsyncQueue } from "@/util/queue"
|
||||
import { Instance } from "../../project/instance"
|
||||
import { Installation } from "@/installation"
|
||||
import { Log } from "../../util/log"
|
||||
@@ -69,41 +70,54 @@ export const GlobalRoutes = lazy(() =>
|
||||
c.header("X-Accel-Buffering", "no")
|
||||
c.header("X-Content-Type-Options", "nosniff")
|
||||
return streamSSE(c, async (stream) => {
|
||||
stream.writeSSE({
|
||||
data: JSON.stringify({
|
||||
const q = new AsyncQueue<string | null>()
|
||||
let done = false
|
||||
|
||||
q.push(
|
||||
JSON.stringify({
|
||||
payload: {
|
||||
type: "server.connected",
|
||||
properties: {},
|
||||
},
|
||||
}),
|
||||
})
|
||||
async function handler(event: any) {
|
||||
await stream.writeSSE({
|
||||
data: JSON.stringify(event),
|
||||
})
|
||||
}
|
||||
GlobalBus.on("event", handler)
|
||||
)
|
||||
|
||||
// Send heartbeat every 10s to prevent stalled proxy streams.
|
||||
const heartbeat = setInterval(() => {
|
||||
stream.writeSSE({
|
||||
data: JSON.stringify({
|
||||
q.push(
|
||||
JSON.stringify({
|
||||
payload: {
|
||||
type: "server.heartbeat",
|
||||
properties: {},
|
||||
},
|
||||
}),
|
||||
})
|
||||
)
|
||||
}, 10_000)
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
stream.onAbort(() => {
|
||||
clearInterval(heartbeat)
|
||||
GlobalBus.off("event", handler)
|
||||
resolve()
|
||||
log.info("global event disconnected")
|
||||
})
|
||||
})
|
||||
async function handler(event: any) {
|
||||
q.push(JSON.stringify(event))
|
||||
}
|
||||
GlobalBus.on("event", handler)
|
||||
|
||||
const stop = () => {
|
||||
if (done) return
|
||||
done = true
|
||||
clearInterval(heartbeat)
|
||||
GlobalBus.off("event", handler)
|
||||
q.push(null)
|
||||
log.info("event disconnected")
|
||||
}
|
||||
|
||||
stream.onAbort(stop)
|
||||
|
||||
try {
|
||||
for await (const data of q) {
|
||||
if (data === null) return
|
||||
await stream.writeSSE({ data })
|
||||
}
|
||||
} finally {
|
||||
stop()
|
||||
}
|
||||
})
|
||||
},
|
||||
)
|
||||
|
||||
@@ -1,10 +1,7 @@
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { Bus } from "@/bus"
|
||||
import { Log } from "../util/log"
|
||||
import { describeRoute, generateSpecs, validator, resolver, openAPIRouteHandler } from "hono-openapi"
|
||||
import { Hono } from "hono"
|
||||
import { cors } from "hono/cors"
|
||||
import { streamSSE } from "hono/streaming"
|
||||
import { proxy } from "hono/proxy"
|
||||
import { basicAuth } from "hono/basic-auth"
|
||||
import z from "zod"
|
||||
@@ -34,6 +31,7 @@ import { FileRoutes } from "./routes/file"
|
||||
import { ConfigRoutes } from "./routes/config"
|
||||
import { ExperimentalRoutes } from "./routes/experimental"
|
||||
import { ProviderRoutes } from "./routes/provider"
|
||||
import { EventRoutes } from "./routes/event"
|
||||
import { InstanceBootstrap } from "../project/bootstrap"
|
||||
import { NotFoundError } from "../storage/db"
|
||||
import type { ContentfulStatusCode } from "hono/utils/http-status"
|
||||
@@ -251,6 +249,7 @@ export namespace Server {
|
||||
.route("/question", QuestionRoutes())
|
||||
.route("/provider", ProviderRoutes())
|
||||
.route("/", FileRoutes())
|
||||
.route("/", EventRoutes())
|
||||
.route("/mcp", McpRoutes())
|
||||
.route("/tui", TuiRoutes())
|
||||
.post(
|
||||
@@ -498,64 +497,6 @@ export namespace Server {
|
||||
return c.json(await Format.status())
|
||||
},
|
||||
)
|
||||
.get(
|
||||
"/event",
|
||||
describeRoute({
|
||||
summary: "Subscribe to events",
|
||||
description: "Get events",
|
||||
operationId: "event.subscribe",
|
||||
responses: {
|
||||
200: {
|
||||
description: "Event stream",
|
||||
content: {
|
||||
"text/event-stream": {
|
||||
schema: resolver(BusEvent.payloads()),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
async (c) => {
|
||||
log.info("event connected")
|
||||
c.header("X-Accel-Buffering", "no")
|
||||
c.header("X-Content-Type-Options", "nosniff")
|
||||
return streamSSE(c, async (stream) => {
|
||||
stream.writeSSE({
|
||||
data: JSON.stringify({
|
||||
type: "server.connected",
|
||||
properties: {},
|
||||
}),
|
||||
})
|
||||
const unsub = Bus.subscribeAll(async (event) => {
|
||||
await stream.writeSSE({
|
||||
data: JSON.stringify(event),
|
||||
})
|
||||
if (event.type === Bus.InstanceDisposed.type) {
|
||||
stream.close()
|
||||
}
|
||||
})
|
||||
|
||||
// Send heartbeat every 10s to prevent stalled proxy streams.
|
||||
const heartbeat = setInterval(() => {
|
||||
stream.writeSSE({
|
||||
data: JSON.stringify({
|
||||
type: "server.heartbeat",
|
||||
properties: {},
|
||||
}),
|
||||
})
|
||||
}, 10_000)
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
stream.onAbort(() => {
|
||||
clearInterval(heartbeat)
|
||||
unsub()
|
||||
resolve()
|
||||
log.info("event disconnected")
|
||||
})
|
||||
})
|
||||
})
|
||||
},
|
||||
)
|
||||
.all("/*", async (c) => {
|
||||
const path = c.req.path
|
||||
|
||||
|
||||
@@ -1781,6 +1781,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
export async function command(input: CommandInput) {
|
||||
log.info("command", input)
|
||||
const command = await Command.get(input.command)
|
||||
if (!command) {
|
||||
throw new NamedError.Unknown({ message: `Command not found: "${input.command}"` })
|
||||
}
|
||||
const agentName = command.agent ?? input.agent ?? (await Agent.defaultAgent())
|
||||
|
||||
const raw = input.arguments.match(argsRegex) ?? []
|
||||
|
||||
@@ -10,7 +10,6 @@ import { createTwoFilesPatch, diffLines } from "diff"
|
||||
import { assertExternalDirectory } from "./external-directory"
|
||||
import { trimDiff } from "./edit"
|
||||
import { LSP } from "../lsp"
|
||||
import { Format } from "../format"
|
||||
import { Filesystem } from "../util/filesystem"
|
||||
import DESCRIPTION from "./apply_patch.txt"
|
||||
import { File } from "../file"
|
||||
@@ -221,7 +220,6 @@ export const ApplyPatchTool = Tool.define("apply_patch", {
|
||||
}
|
||||
|
||||
if (edited) {
|
||||
await Format.run(edited)
|
||||
await Bus.publish(File.Event.Edited, {
|
||||
file: edited,
|
||||
})
|
||||
|
||||
@@ -13,7 +13,6 @@ import { File } from "../file"
|
||||
import { FileWatcher } from "../file/watcher"
|
||||
import { Bus } from "../bus"
|
||||
import { FileTime } from "../file/time"
|
||||
import { Format } from "../format"
|
||||
import { Filesystem } from "../util/filesystem"
|
||||
import { Instance } from "../project/instance"
|
||||
import { Snapshot } from "@/snapshot"
|
||||
@@ -72,7 +71,6 @@ export const EditTool = Tool.define("edit", {
|
||||
},
|
||||
})
|
||||
await Filesystem.write(filePath, params.newString)
|
||||
await Format.run(filePath)
|
||||
await Bus.publish(File.Event.Edited, {
|
||||
file: filePath,
|
||||
})
|
||||
@@ -110,7 +108,6 @@ export const EditTool = Tool.define("edit", {
|
||||
})
|
||||
|
||||
await Filesystem.write(filePath, contentNew)
|
||||
await Format.run(filePath)
|
||||
await Bus.publish(File.Event.Edited, {
|
||||
file: filePath,
|
||||
})
|
||||
|
||||
@@ -8,7 +8,6 @@ import { Bus } from "../bus"
|
||||
import { File } from "../file"
|
||||
import { FileWatcher } from "../file/watcher"
|
||||
import { FileTime } from "../file/time"
|
||||
import { Format } from "../format"
|
||||
import { Filesystem } from "../util/filesystem"
|
||||
import { Instance } from "../project/instance"
|
||||
import { trimDiff } from "./edit"
|
||||
@@ -43,7 +42,6 @@ export const WriteTool = Tool.define("write", {
|
||||
})
|
||||
|
||||
await Filesystem.write(filepath, params.content)
|
||||
await Format.run(filepath)
|
||||
await Bus.publish(File.Event.Edited, {
|
||||
file: filepath,
|
||||
})
|
||||
|
||||
@@ -1,372 +0,0 @@
|
||||
import { afterEach, describe, expect, test } from "bun:test"
|
||||
import { Deferred, Effect, Stream } from "effect"
|
||||
import z from "zod"
|
||||
import { Bus } from "../../src/bus"
|
||||
import { BusEvent } from "../../src/bus/bus-event"
|
||||
import { GlobalBus } from "../../src/bus/global"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test event definitions
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const TestEvent = {
|
||||
Ping: BusEvent.define("test.ping", z.object({ value: z.number() })),
|
||||
Pong: BusEvent.define("test.pong", z.object({ message: z.string() })),
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function withInstance(directory: string, fn: () => Promise<void>) {
|
||||
return Instance.provide({ directory, fn })
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("Bus", () => {
|
||||
afterEach(() => Instance.disposeAll())
|
||||
|
||||
describe("publish + subscribe", () => {
|
||||
test("subscriber receives matching events", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const received: number[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
Bus.subscribe(TestEvent.Ping, (evt) => {
|
||||
received.push(evt.properties.value)
|
||||
})
|
||||
await Bus.publish(TestEvent.Ping, { value: 42 })
|
||||
await Bus.publish(TestEvent.Ping, { value: 99 })
|
||||
})
|
||||
|
||||
expect(received).toEqual([42, 99])
|
||||
})
|
||||
|
||||
test("subscriber does not receive events of other types", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const pings: number[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
Bus.subscribe(TestEvent.Ping, (evt) => {
|
||||
pings.push(evt.properties.value)
|
||||
})
|
||||
await Bus.publish(TestEvent.Pong, { message: "hello" })
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
})
|
||||
|
||||
expect(pings).toEqual([1])
|
||||
})
|
||||
|
||||
test("publish with no subscribers does not throw", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe("multiple subscribers", () => {
|
||||
test("all subscribers for same event type are called", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const a: number[] = []
|
||||
const b: number[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
Bus.subscribe(TestEvent.Ping, (evt) => a.push(evt.properties.value))
|
||||
Bus.subscribe(TestEvent.Ping, (evt) => b.push(evt.properties.value))
|
||||
await Bus.publish(TestEvent.Ping, { value: 7 })
|
||||
})
|
||||
|
||||
expect(a).toEqual([7])
|
||||
expect(b).toEqual([7])
|
||||
})
|
||||
|
||||
test("subscribers are called in registration order", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const order: string[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
Bus.subscribe(TestEvent.Ping, () => order.push("first"))
|
||||
Bus.subscribe(TestEvent.Ping, () => order.push("second"))
|
||||
Bus.subscribe(TestEvent.Ping, () => order.push("third"))
|
||||
await Bus.publish(TestEvent.Ping, { value: 0 })
|
||||
})
|
||||
|
||||
expect(order).toEqual(["first", "second", "third"])
|
||||
})
|
||||
})
|
||||
|
||||
describe("unsubscribe", () => {
|
||||
test("unsubscribe stops delivery", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const received: number[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
const unsub = Bus.subscribe(TestEvent.Ping, (evt) => {
|
||||
received.push(evt.properties.value)
|
||||
})
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
unsub()
|
||||
await Bus.publish(TestEvent.Ping, { value: 2 })
|
||||
})
|
||||
|
||||
expect(received).toEqual([1])
|
||||
})
|
||||
|
||||
test("unsubscribe is idempotent", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
const unsub = Bus.subscribe(TestEvent.Ping, () => {})
|
||||
unsub()
|
||||
unsub() // should not throw
|
||||
})
|
||||
})
|
||||
|
||||
test("unsubscribing one does not affect others", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const a: number[] = []
|
||||
const b: number[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
const unsubA = Bus.subscribe(TestEvent.Ping, (evt) => a.push(evt.properties.value))
|
||||
Bus.subscribe(TestEvent.Ping, (evt) => b.push(evt.properties.value))
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
unsubA()
|
||||
await Bus.publish(TestEvent.Ping, { value: 2 })
|
||||
})
|
||||
|
||||
expect(a).toEqual([1])
|
||||
expect(b).toEqual([1, 2])
|
||||
})
|
||||
})
|
||||
|
||||
describe("subscribeAll", () => {
|
||||
test("receives events of all types", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const all: string[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
Bus.subscribeAll((evt) => {
|
||||
all.push(evt.type)
|
||||
})
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
await Bus.publish(TestEvent.Pong, { message: "hi" })
|
||||
})
|
||||
|
||||
expect(all).toEqual(["test.ping", "test.pong"])
|
||||
})
|
||||
|
||||
test("subscribeAll + typed subscribe both fire", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const typed: number[] = []
|
||||
const wild: string[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
Bus.subscribe(TestEvent.Ping, (evt) => typed.push(evt.properties.value))
|
||||
Bus.subscribeAll((evt) => wild.push(evt.type))
|
||||
await Bus.publish(TestEvent.Ping, { value: 5 })
|
||||
})
|
||||
|
||||
expect(typed).toEqual([5])
|
||||
expect(wild).toEqual(["test.ping"])
|
||||
})
|
||||
|
||||
test("unsubscribe from subscribeAll", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const all: string[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
const unsub = Bus.subscribeAll((evt) => all.push(evt.type))
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
unsub()
|
||||
await Bus.publish(TestEvent.Pong, { message: "missed" })
|
||||
})
|
||||
|
||||
expect(all).toEqual(["test.ping"])
|
||||
})
|
||||
|
||||
test("subscribeAll delivers InstanceDisposed on disposal", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const all: string[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
Bus.subscribeAll((evt) => {
|
||||
all.push(evt.type)
|
||||
})
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
})
|
||||
|
||||
await Instance.disposeAll()
|
||||
|
||||
expect(all).toContain("test.ping")
|
||||
expect(all).toContain(Bus.InstanceDisposed.type)
|
||||
})
|
||||
|
||||
test("manual unsubscribe suppresses InstanceDisposed", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const all: string[] = []
|
||||
let unsub = () => {}
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
unsub = Bus.subscribeAll((evt) => {
|
||||
all.push(evt.type)
|
||||
})
|
||||
})
|
||||
|
||||
unsub()
|
||||
await Instance.disposeAll()
|
||||
|
||||
expect(all).not.toContain(Bus.InstanceDisposed.type)
|
||||
})
|
||||
})
|
||||
|
||||
describe("GlobalBus forwarding", () => {
|
||||
test("publish emits to GlobalBus with directory", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const globalEvents: Array<{ directory?: string; payload: any }> = []
|
||||
|
||||
const handler = (evt: any) => globalEvents.push(evt)
|
||||
GlobalBus.on("event", handler)
|
||||
|
||||
try {
|
||||
await withInstance(tmp.path, async () => {
|
||||
await Bus.publish(TestEvent.Ping, { value: 42 })
|
||||
})
|
||||
|
||||
const ping = globalEvents.find((e) => e.payload.type === "test.ping")
|
||||
expect(ping).toBeDefined()
|
||||
expect(ping!.directory).toBe(tmp.path)
|
||||
expect(ping!.payload).toEqual({
|
||||
type: "test.ping",
|
||||
properties: { value: 42 },
|
||||
})
|
||||
} finally {
|
||||
GlobalBus.off("event", handler)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
describe("instance isolation", () => {
|
||||
test("subscribers in one instance do not receive events from another", async () => {
|
||||
await using tmpA = await tmpdir()
|
||||
await using tmpB = await tmpdir()
|
||||
const eventsA: number[] = []
|
||||
const eventsB: number[] = []
|
||||
|
||||
await withInstance(tmpA.path, async () => {
|
||||
Bus.subscribe(TestEvent.Ping, (evt) => eventsA.push(evt.properties.value))
|
||||
})
|
||||
|
||||
await withInstance(tmpB.path, async () => {
|
||||
Bus.subscribe(TestEvent.Ping, (evt) => eventsB.push(evt.properties.value))
|
||||
})
|
||||
|
||||
await withInstance(tmpA.path, async () => {
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
})
|
||||
|
||||
await withInstance(tmpB.path, async () => {
|
||||
await Bus.publish(TestEvent.Ping, { value: 2 })
|
||||
})
|
||||
|
||||
expect(eventsA).toEqual([1])
|
||||
expect(eventsB).toEqual([2])
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
describe("async subscribers", () => {
|
||||
test("publish is fire-and-forget (does not await subscriber callbacks)", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const received: number[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
Bus.subscribe(TestEvent.Ping, async (evt) => {
|
||||
await new Promise((r) => setTimeout(r, 10))
|
||||
received.push(evt.properties.value)
|
||||
})
|
||||
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
// Give the async subscriber time to complete
|
||||
await new Promise((r) => setTimeout(r, 50))
|
||||
})
|
||||
|
||||
expect(received).toEqual([1])
|
||||
})
|
||||
})
|
||||
|
||||
describe("Effect service", () => {
|
||||
test("subscribeAll stream receives published events", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const received: string[] = []
|
||||
|
||||
await withInstance(tmp.path, () =>
|
||||
Effect.runPromise(
|
||||
Effect.scoped(
|
||||
Effect.gen(function* () {
|
||||
const svc = yield* Bus.Service
|
||||
const done = yield* Deferred.make<void>()
|
||||
let count = 0
|
||||
|
||||
yield* Effect.forkScoped(
|
||||
svc.subscribeAll().pipe(
|
||||
Stream.runForEach((msg) =>
|
||||
Effect.gen(function* () {
|
||||
received.push(msg.type)
|
||||
if (++count >= 2) yield* Deferred.succeed(done, undefined)
|
||||
}),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
// Let the forked fiber start and subscribe to the PubSub
|
||||
yield* Effect.yieldNow
|
||||
|
||||
yield* svc.publish(TestEvent.Ping, { value: 1 })
|
||||
yield* svc.publish(TestEvent.Pong, { message: "hi" })
|
||||
yield* Deferred.await(done)
|
||||
}),
|
||||
).pipe(Effect.provide(Bus.layer)),
|
||||
),
|
||||
)
|
||||
|
||||
expect(received).toEqual(["test.ping", "test.pong"])
|
||||
})
|
||||
|
||||
test("subscribeAll stream ends with ensuring when scope closes", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
let ensuringFired = false
|
||||
|
||||
await withInstance(tmp.path, () =>
|
||||
Effect.runPromise(
|
||||
Effect.scoped(
|
||||
Effect.gen(function* () {
|
||||
const svc = yield* Bus.Service
|
||||
|
||||
yield* Effect.forkScoped(
|
||||
svc.subscribeAll().pipe(
|
||||
Stream.runForEach(() => Effect.void),
|
||||
Effect.ensuring(Effect.sync(() => {
|
||||
ensuringFired = true
|
||||
})),
|
||||
),
|
||||
)
|
||||
|
||||
yield* svc.publish(TestEvent.Ping, { value: 1 })
|
||||
yield* Effect.yieldNow
|
||||
}),
|
||||
).pipe(Effect.provide(Bus.layer)),
|
||||
),
|
||||
)
|
||||
|
||||
expect(ensuringFired).toBe(true)
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -2845,6 +2845,38 @@ export class File extends HeyApiClient {
|
||||
}
|
||||
}
|
||||
|
||||
export class Event extends HeyApiClient {
|
||||
/**
|
||||
* Subscribe to events
|
||||
*
|
||||
* Get events
|
||||
*/
|
||||
public subscribe<ThrowOnError extends boolean = false>(
|
||||
parameters?: {
|
||||
directory?: string
|
||||
workspace?: string
|
||||
},
|
||||
options?: Options<never, ThrowOnError>,
|
||||
) {
|
||||
const params = buildClientParams(
|
||||
[parameters],
|
||||
[
|
||||
{
|
||||
args: [
|
||||
{ in: "query", key: "directory" },
|
||||
{ in: "query", key: "workspace" },
|
||||
],
|
||||
},
|
||||
],
|
||||
)
|
||||
return (options?.client ?? this.client).sse.get<EventSubscribeResponses, unknown, ThrowOnError>({
|
||||
url: "/event",
|
||||
...options,
|
||||
...params,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
export class Auth2 extends HeyApiClient {
|
||||
/**
|
||||
* Remove MCP OAuth
|
||||
@@ -3866,38 +3898,6 @@ export class Formatter extends HeyApiClient {
|
||||
}
|
||||
}
|
||||
|
||||
export class Event extends HeyApiClient {
|
||||
/**
|
||||
* Subscribe to events
|
||||
*
|
||||
* Get events
|
||||
*/
|
||||
public subscribe<ThrowOnError extends boolean = false>(
|
||||
parameters?: {
|
||||
directory?: string
|
||||
workspace?: string
|
||||
},
|
||||
options?: Options<never, ThrowOnError>,
|
||||
) {
|
||||
const params = buildClientParams(
|
||||
[parameters],
|
||||
[
|
||||
{
|
||||
args: [
|
||||
{ in: "query", key: "directory" },
|
||||
{ in: "query", key: "workspace" },
|
||||
],
|
||||
},
|
||||
],
|
||||
)
|
||||
return (options?.client ?? this.client).sse.get<EventSubscribeResponses, unknown, ThrowOnError>({
|
||||
url: "/event",
|
||||
...options,
|
||||
...params,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
export class OpencodeClient extends HeyApiClient {
|
||||
public static readonly __registry = new HeyApiRegistry<OpencodeClient>()
|
||||
|
||||
@@ -3981,6 +3981,11 @@ export class OpencodeClient extends HeyApiClient {
|
||||
return (this._file ??= new File({ client: this.client }))
|
||||
}
|
||||
|
||||
private _event?: Event
|
||||
get event(): Event {
|
||||
return (this._event ??= new Event({ client: this.client }))
|
||||
}
|
||||
|
||||
private _mcp?: Mcp
|
||||
get mcp(): Mcp {
|
||||
return (this._mcp ??= new Mcp({ client: this.client }))
|
||||
@@ -4025,9 +4030,4 @@ export class OpencodeClient extends HeyApiClient {
|
||||
get formatter(): Formatter {
|
||||
return (this._formatter ??= new Formatter({ client: this.client }))
|
||||
}
|
||||
|
||||
private _event?: Event
|
||||
get event(): Event {
|
||||
return (this._event ??= new Event({ client: this.client }))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4229,6 +4229,25 @@ export type FileStatusResponses = {
|
||||
|
||||
export type FileStatusResponse = FileStatusResponses[keyof FileStatusResponses]
|
||||
|
||||
export type EventSubscribeData = {
|
||||
body?: never
|
||||
path?: never
|
||||
query?: {
|
||||
directory?: string
|
||||
workspace?: string
|
||||
}
|
||||
url: "/event"
|
||||
}
|
||||
|
||||
export type EventSubscribeResponses = {
|
||||
/**
|
||||
* Event stream
|
||||
*/
|
||||
200: Event
|
||||
}
|
||||
|
||||
export type EventSubscribeResponse = EventSubscribeResponses[keyof EventSubscribeResponses]
|
||||
|
||||
export type McpStatusData = {
|
||||
body?: never
|
||||
path?: never
|
||||
@@ -4979,22 +4998,3 @@ export type FormatterStatusResponses = {
|
||||
}
|
||||
|
||||
export type FormatterStatusResponse = FormatterStatusResponses[keyof FormatterStatusResponses]
|
||||
|
||||
export type EventSubscribeData = {
|
||||
body?: never
|
||||
path?: never
|
||||
query?: {
|
||||
directory?: string
|
||||
workspace?: string
|
||||
}
|
||||
url: "/event"
|
||||
}
|
||||
|
||||
export type EventSubscribeResponses = {
|
||||
/**
|
||||
* Event stream
|
||||
*/
|
||||
200: Event
|
||||
}
|
||||
|
||||
export type EventSubscribeResponse = EventSubscribeResponses[keyof EventSubscribeResponses]
|
||||
|
||||
@@ -5243,6 +5243,47 @@
|
||||
]
|
||||
}
|
||||
},
|
||||
"/event": {
|
||||
"get": {
|
||||
"operationId": "event.subscribe",
|
||||
"parameters": [
|
||||
{
|
||||
"in": "query",
|
||||
"name": "directory",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
{
|
||||
"in": "query",
|
||||
"name": "workspace",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
],
|
||||
"summary": "Subscribe to events",
|
||||
"description": "Get events",
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Event stream",
|
||||
"content": {
|
||||
"text/event-stream": {
|
||||
"schema": {
|
||||
"$ref": "#/components/schemas/Event"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"x-codeSamples": [
|
||||
{
|
||||
"lang": "js",
|
||||
"source": "import { createOpencodeClient } from \"@opencode-ai/sdk\n\nconst client = createOpencodeClient()\nawait client.event.subscribe({\n ...\n})"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"/mcp": {
|
||||
"get": {
|
||||
"operationId": "mcp.status",
|
||||
@@ -6894,47 +6935,6 @@
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"/event": {
|
||||
"get": {
|
||||
"operationId": "event.subscribe",
|
||||
"parameters": [
|
||||
{
|
||||
"in": "query",
|
||||
"name": "directory",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
{
|
||||
"in": "query",
|
||||
"name": "workspace",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
],
|
||||
"summary": "Subscribe to events",
|
||||
"description": "Get events",
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Event stream",
|
||||
"content": {
|
||||
"text/event-stream": {
|
||||
"schema": {
|
||||
"$ref": "#/components/schemas/Event"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"x-codeSamples": [
|
||||
{
|
||||
"lang": "js",
|
||||
"source": "import { createOpencodeClient } from \"@opencode-ai/sdk\n\nconst client = createOpencodeClient()\nawait client.event.subscribe({\n ...\n})"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"components": {
|
||||
|
||||
Reference in New Issue
Block a user