mirror of
https://github.com/anomalyco/opencode.git
synced 2026-03-19 05:04:33 +00:00
Compare commits
5 Commits
production
...
kit/effect
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
009d77c9d8 | ||
|
|
f3cf519d98 | ||
|
|
645c15351b | ||
|
|
f63a2a2636 | ||
|
|
84e62fc662 |
@@ -1,5 +1,5 @@
|
||||
import z from "zod"
|
||||
import type { ZodType } from "zod"
|
||||
import type { ZodObject, ZodRawShape } from "zod"
|
||||
import { Log } from "../util/log"
|
||||
|
||||
export namespace BusEvent {
|
||||
@@ -9,7 +9,7 @@ export namespace BusEvent {
|
||||
|
||||
const registry = new Map<string, Definition>()
|
||||
|
||||
export function define<Type extends string, Properties extends ZodType>(type: Type, properties: Properties) {
|
||||
export function define<Type extends string, Properties extends ZodObject<ZodRawShape>>(type: Type, properties: Properties) {
|
||||
const result = {
|
||||
type,
|
||||
properties,
|
||||
|
||||
@@ -4,7 +4,7 @@ export const GlobalBus = new EventEmitter<{
|
||||
event: [
|
||||
{
|
||||
directory?: string
|
||||
payload: any
|
||||
payload: { type: string; properties: Record<string, unknown> }
|
||||
},
|
||||
]
|
||||
}>()
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
import z from "zod"
|
||||
import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect"
|
||||
import { Log } from "../util/log"
|
||||
import { Instance } from "../project/instance"
|
||||
import { BusEvent } from "./bus-event"
|
||||
import { GlobalBus } from "./global"
|
||||
import { runCallbackInstance, runPromiseInstance } from "../effect/runtime"
|
||||
|
||||
export namespace Bus {
|
||||
const log = Log.create({ service: "bus" })
|
||||
type Subscription = (event: any) => void
|
||||
|
||||
export const InstanceDisposed = BusEvent.define(
|
||||
"server.instance.disposed",
|
||||
@@ -15,91 +16,105 @@ export namespace Bus {
|
||||
}),
|
||||
)
|
||||
|
||||
const state = Instance.state(
|
||||
() => {
|
||||
const subscriptions = new Map<any, Subscription[]>()
|
||||
// ---------------------------------------------------------------------------
|
||||
// Service definition
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
return {
|
||||
subscriptions,
|
||||
type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
|
||||
type: D["type"]
|
||||
properties: z.infer<D["properties"]>
|
||||
}
|
||||
|
||||
export interface Interface {
|
||||
readonly publish: <D extends BusEvent.Definition>(
|
||||
def: D,
|
||||
properties: z.output<D["properties"]>,
|
||||
) => Effect.Effect<void>
|
||||
readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
|
||||
readonly subscribeAll: () => Stream.Stream<Payload>
|
||||
}
|
||||
|
||||
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Bus") {}
|
||||
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const pubsubs = new Map<string, PubSub.PubSub<Payload>>()
|
||||
const wildcardPubSub = yield* PubSub.unbounded<Payload>()
|
||||
|
||||
const getOrCreate = Effect.fnUntraced(function* (type: string) {
|
||||
let ps = pubsubs.get(type)
|
||||
if (!ps) {
|
||||
ps = yield* PubSub.unbounded<Payload>()
|
||||
pubsubs.set(type, ps)
|
||||
}
|
||||
return ps
|
||||
})
|
||||
|
||||
function publish<D extends BusEvent.Definition>(
|
||||
def: D,
|
||||
properties: z.output<D["properties"]>,
|
||||
) {
|
||||
return Effect.gen(function* () {
|
||||
const payload: Payload = { type: def.type, properties }
|
||||
log.info("publishing", { type: def.type })
|
||||
|
||||
const ps = pubsubs.get(def.type)
|
||||
if (ps) yield* PubSub.publish(ps, payload)
|
||||
yield* PubSub.publish(wildcardPubSub, payload)
|
||||
|
||||
GlobalBus.emit("event", {
|
||||
directory: Instance.directory,
|
||||
payload,
|
||||
})
|
||||
})
|
||||
}
|
||||
},
|
||||
async (entry) => {
|
||||
const wildcard = entry.subscriptions.get("*")
|
||||
if (!wildcard) return
|
||||
const event = {
|
||||
type: InstanceDisposed.type,
|
||||
properties: {
|
||||
directory: Instance.directory,
|
||||
},
|
||||
|
||||
function subscribe<D extends BusEvent.Definition>(def: D): Stream.Stream<Payload<D>> {
|
||||
log.info("subscribing", { type: def.type })
|
||||
return Stream.unwrap(
|
||||
Effect.gen(function* () {
|
||||
const ps = yield* getOrCreate(def.type)
|
||||
return Stream.fromPubSub(ps) as Stream.Stream<Payload<D>>
|
||||
}),
|
||||
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type }))))
|
||||
}
|
||||
for (const sub of [...wildcard]) {
|
||||
sub(event)
|
||||
|
||||
function subscribeAll(): Stream.Stream<Payload> {
|
||||
log.info("subscribing", { type: "*" })
|
||||
return Stream.fromPubSub(wildcardPubSub).pipe(
|
||||
Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))),
|
||||
)
|
||||
}
|
||||
},
|
||||
|
||||
return Service.of({ publish, subscribe, subscribeAll })
|
||||
}),
|
||||
)
|
||||
|
||||
export async function publish<Definition extends BusEvent.Definition>(
|
||||
def: Definition,
|
||||
properties: z.output<Definition["properties"]>,
|
||||
) {
|
||||
const payload = {
|
||||
type: def.type,
|
||||
properties,
|
||||
}
|
||||
log.info("publishing", {
|
||||
type: def.type,
|
||||
})
|
||||
const pending = []
|
||||
for (const key of [def.type, "*"]) {
|
||||
const match = state().subscriptions.get(key)
|
||||
for (const sub of match ?? []) {
|
||||
pending.push(sub(payload))
|
||||
}
|
||||
}
|
||||
GlobalBus.emit("event", {
|
||||
directory: Instance.directory,
|
||||
payload,
|
||||
})
|
||||
return Promise.all(pending)
|
||||
// ---------------------------------------------------------------------------
|
||||
// Legacy adapters — plain function API wrapping the Effect service
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function runStream(stream: (svc: Interface) => Stream.Stream<Payload>, callback: (event: any) => void) {
|
||||
return runCallbackInstance(
|
||||
Service.use((svc) =>
|
||||
stream(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg)))),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
export function subscribe<Definition extends BusEvent.Definition>(
|
||||
def: Definition,
|
||||
callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
|
||||
) {
|
||||
return raw(def.type, callback)
|
||||
export function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
|
||||
return runPromiseInstance(Service.use((svc) => svc.publish(def, properties)))
|
||||
}
|
||||
|
||||
export function once<Definition extends BusEvent.Definition>(
|
||||
def: Definition,
|
||||
callback: (event: {
|
||||
type: Definition["type"]
|
||||
properties: z.infer<Definition["properties"]>
|
||||
}) => "done" | undefined,
|
||||
export function subscribe<D extends BusEvent.Definition>(
|
||||
def: D,
|
||||
callback: (event: Payload<D>) => void,
|
||||
) {
|
||||
const unsub = subscribe(def, (event) => {
|
||||
if (callback(event)) unsub()
|
||||
})
|
||||
return runStream((svc) => svc.subscribe(def), callback)
|
||||
}
|
||||
|
||||
export function subscribeAll(callback: (event: any) => void) {
|
||||
return raw("*", callback)
|
||||
}
|
||||
|
||||
function raw(type: string, callback: (event: any) => void) {
|
||||
log.info("subscribing", { type })
|
||||
const subscriptions = state().subscriptions
|
||||
let match = subscriptions.get(type) ?? []
|
||||
match.push(callback)
|
||||
subscriptions.set(type, match)
|
||||
|
||||
return () => {
|
||||
log.info("unsubscribing", { type })
|
||||
const match = subscriptions.get(type)
|
||||
if (!match) return
|
||||
const index = match.indexOf(callback)
|
||||
if (index === -1) return
|
||||
match.splice(index, 1)
|
||||
}
|
||||
return runStream((svc) => svc.subscribeAll(), callback)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,7 +123,7 @@ export namespace Workspace {
|
||||
await parseSSE(res.body, stop, (event) => {
|
||||
GlobalBus.emit("event", {
|
||||
directory: space.id,
|
||||
payload: event,
|
||||
payload: event as { type: string; properties: Record<string, unknown> },
|
||||
})
|
||||
})
|
||||
// Wait 250ms and retry if SSE connection fails
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { Effect, Layer, LayerMap, ServiceMap } from "effect"
|
||||
import { Bus } from "@/bus"
|
||||
import { File } from "@/file"
|
||||
import { FileTime } from "@/file/time"
|
||||
import { FileWatcher } from "@/file/watcher"
|
||||
@@ -16,6 +17,7 @@ import { registerDisposer } from "./instance-registry"
|
||||
export { InstanceContext } from "./instance-context"
|
||||
|
||||
export type InstanceServices =
|
||||
| Bus.Service
|
||||
| Question.Service
|
||||
| PermissionNext.Service
|
||||
| ProviderAuth.Service
|
||||
@@ -36,6 +38,7 @@ export type InstanceServices =
|
||||
function lookup(_key: string) {
|
||||
const ctx = Layer.sync(InstanceContext, () => InstanceContext.of(Instance.current))
|
||||
return Layer.mergeAll(
|
||||
Layer.fresh(Bus.layer),
|
||||
Layer.fresh(Question.layer),
|
||||
Layer.fresh(PermissionNext.layer),
|
||||
Layer.fresh(ProviderAuth.defaultLayer),
|
||||
|
||||
@@ -18,6 +18,12 @@ export function runPromiseInstance<A, E>(effect: Effect.Effect<A, E, InstanceSer
|
||||
return runtime.runPromise(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
|
||||
}
|
||||
|
||||
export function runCallbackInstance<A, E>(
|
||||
effect: Effect.Effect<A, E, InstanceServices>,
|
||||
): (interruptor?: number) => void {
|
||||
return runtime.runCallback(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
|
||||
}
|
||||
|
||||
export function disposeRuntime() {
|
||||
return runtime.dispose()
|
||||
}
|
||||
|
||||
@@ -4,9 +4,7 @@ import { InstanceContext } from "@/effect/instance-context"
|
||||
import path from "path"
|
||||
import { mergeDeep } from "remeda"
|
||||
import z from "zod"
|
||||
import { Bus } from "../bus"
|
||||
import { Config } from "../config/config"
|
||||
import { File } from "../file"
|
||||
import { Instance } from "../project/instance"
|
||||
import { Process } from "../util/process"
|
||||
import { Log } from "../util/log"
|
||||
@@ -27,6 +25,7 @@ export namespace Format {
|
||||
export type Status = z.infer<typeof Status>
|
||||
|
||||
export interface Interface {
|
||||
readonly run: (filepath: string) => Effect.Effect<void>
|
||||
readonly status: () => Effect.Effect<Status[]>
|
||||
}
|
||||
|
||||
@@ -90,48 +89,44 @@ export namespace Format {
|
||||
return result
|
||||
}
|
||||
|
||||
yield* Effect.acquireRelease(
|
||||
Effect.sync(() =>
|
||||
Bus.subscribe(
|
||||
File.Event.Edited,
|
||||
Instance.bind(async (payload) => {
|
||||
const file = payload.properties.file
|
||||
log.info("formatting", { file })
|
||||
const ext = path.extname(file)
|
||||
const run = Effect.fn("Format.run")(function* (filepath: string) {
|
||||
log.info("formatting", { file: filepath })
|
||||
const ext = path.extname(filepath)
|
||||
|
||||
for (const item of await getFormatter(ext)) {
|
||||
log.info("running", { command: item.command })
|
||||
try {
|
||||
const proc = Process.spawn(
|
||||
item.command.map((x) => x.replace("$FILE", file)),
|
||||
{
|
||||
cwd: instance.directory,
|
||||
env: { ...process.env, ...item.environment },
|
||||
stdout: "ignore",
|
||||
stderr: "ignore",
|
||||
},
|
||||
)
|
||||
const exit = await proc.exited
|
||||
if (exit !== 0) {
|
||||
log.error("failed", {
|
||||
command: item.command,
|
||||
...item.environment,
|
||||
})
|
||||
}
|
||||
} catch (error) {
|
||||
log.error("failed to format file", {
|
||||
error,
|
||||
command: item.command,
|
||||
...item.environment,
|
||||
file,
|
||||
})
|
||||
}
|
||||
for (const item of yield* Effect.promise(() => getFormatter(ext))) {
|
||||
log.info("running", { command: item.command })
|
||||
yield* Effect.tryPromise({
|
||||
try: async () => {
|
||||
const proc = Process.spawn(
|
||||
item.command.map((x) => x.replace("$FILE", filepath)),
|
||||
{
|
||||
cwd: instance.directory,
|
||||
env: { ...process.env, ...item.environment },
|
||||
stdout: "ignore",
|
||||
stderr: "ignore",
|
||||
},
|
||||
)
|
||||
const exit = await proc.exited
|
||||
if (exit !== 0) {
|
||||
log.error("failed", {
|
||||
command: item.command,
|
||||
...item.environment,
|
||||
})
|
||||
}
|
||||
}),
|
||||
),
|
||||
),
|
||||
(unsubscribe) => Effect.sync(unsubscribe),
|
||||
)
|
||||
},
|
||||
catch: (error) => {
|
||||
log.error("failed to format file", {
|
||||
error,
|
||||
command: item.command,
|
||||
...item.environment,
|
||||
file: filepath,
|
||||
})
|
||||
return error
|
||||
},
|
||||
}).pipe(Effect.ignore)
|
||||
}
|
||||
})
|
||||
|
||||
log.info("init")
|
||||
|
||||
const status = Effect.fn("Format.status")(function* () {
|
||||
@@ -147,10 +142,14 @@ export namespace Format {
|
||||
return result
|
||||
})
|
||||
|
||||
return Service.of({ status })
|
||||
return Service.of({ run, status })
|
||||
}),
|
||||
)
|
||||
|
||||
export async function run(filepath: string) {
|
||||
return runPromiseInstance(Service.use((s) => s.run(filepath)))
|
||||
}
|
||||
|
||||
export async function status() {
|
||||
return runPromiseInstance(Service.use((s) => s.status()))
|
||||
}
|
||||
|
||||
@@ -956,7 +956,7 @@ export namespace MessageV2 {
|
||||
{ cause: e },
|
||||
).toObject()
|
||||
case e instanceof Error:
|
||||
return new NamedError.Unknown({ message: e.toString() }, { cause: e }).toObject()
|
||||
return new NamedError.Unknown({ message: e instanceof Error ? e.message : String(e) }, { cause: e }).toObject()
|
||||
default:
|
||||
try {
|
||||
const parsed = ProviderError.parseStreamError(e)
|
||||
|
||||
@@ -210,7 +210,7 @@ export namespace SessionProcessor {
|
||||
state: {
|
||||
status: "error",
|
||||
input: value.input ?? match.state.input,
|
||||
error: (value.error as any).toString(),
|
||||
error: value.error instanceof Error ? value.error.message : String(value.error),
|
||||
time: {
|
||||
start: match.state.time.start,
|
||||
end: Date.now(),
|
||||
|
||||
@@ -10,6 +10,7 @@ import { createTwoFilesPatch, diffLines } from "diff"
|
||||
import { assertExternalDirectory } from "./external-directory"
|
||||
import { trimDiff } from "./edit"
|
||||
import { LSP } from "../lsp"
|
||||
import { Format } from "../format"
|
||||
import { Filesystem } from "../util/filesystem"
|
||||
import DESCRIPTION from "./apply_patch.txt"
|
||||
import { File } from "../file"
|
||||
@@ -220,6 +221,7 @@ export const ApplyPatchTool = Tool.define("apply_patch", {
|
||||
}
|
||||
|
||||
if (edited) {
|
||||
await Format.run(edited)
|
||||
await Bus.publish(File.Event.Edited, {
|
||||
file: edited,
|
||||
})
|
||||
|
||||
@@ -13,6 +13,7 @@ import { File } from "../file"
|
||||
import { FileWatcher } from "../file/watcher"
|
||||
import { Bus } from "../bus"
|
||||
import { FileTime } from "../file/time"
|
||||
import { Format } from "../format"
|
||||
import { Filesystem } from "../util/filesystem"
|
||||
import { Instance } from "../project/instance"
|
||||
import { Snapshot } from "@/snapshot"
|
||||
@@ -71,6 +72,7 @@ export const EditTool = Tool.define("edit", {
|
||||
},
|
||||
})
|
||||
await Filesystem.write(filePath, params.newString)
|
||||
await Format.run(filePath)
|
||||
await Bus.publish(File.Event.Edited, {
|
||||
file: filePath,
|
||||
})
|
||||
@@ -108,6 +110,7 @@ export const EditTool = Tool.define("edit", {
|
||||
})
|
||||
|
||||
await Filesystem.write(filePath, contentNew)
|
||||
await Format.run(filePath)
|
||||
await Bus.publish(File.Event.Edited, {
|
||||
file: filePath,
|
||||
})
|
||||
|
||||
@@ -8,6 +8,7 @@ import { Bus } from "../bus"
|
||||
import { File } from "../file"
|
||||
import { FileWatcher } from "../file/watcher"
|
||||
import { FileTime } from "../file/time"
|
||||
import { Format } from "../format"
|
||||
import { Filesystem } from "../util/filesystem"
|
||||
import { Instance } from "../project/instance"
|
||||
import { trimDiff } from "./edit"
|
||||
@@ -42,6 +43,7 @@ export const WriteTool = Tool.define("write", {
|
||||
})
|
||||
|
||||
await Filesystem.write(filepath, params.content)
|
||||
await Format.run(filepath)
|
||||
await Bus.publish(File.Event.Edited, {
|
||||
file: filepath,
|
||||
})
|
||||
|
||||
293
packages/opencode/test/bus/bus.test.ts
Normal file
293
packages/opencode/test/bus/bus.test.ts
Normal file
@@ -0,0 +1,293 @@
|
||||
import { afterEach, describe, expect, test } from "bun:test"
|
||||
import z from "zod"
|
||||
import { Bus } from "../../src/bus"
|
||||
import { BusEvent } from "../../src/bus/bus-event"
|
||||
import { GlobalBus } from "../../src/bus/global"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test event definitions
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const TestEvent = {
|
||||
Ping: BusEvent.define("test.ping", z.object({ value: z.number() })),
|
||||
Pong: BusEvent.define("test.pong", z.object({ message: z.string() })),
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function withInstance(directory: string, fn: () => Promise<void>) {
|
||||
return Instance.provide({ directory, fn })
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("Bus", () => {
|
||||
afterEach(() => Instance.disposeAll())
|
||||
|
||||
describe("publish + subscribe", () => {
|
||||
test("subscriber receives matching events", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const received: number[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
Bus.subscribe(TestEvent.Ping, (evt) => {
|
||||
received.push(evt.properties.value)
|
||||
})
|
||||
await Bus.publish(TestEvent.Ping, { value: 42 })
|
||||
await Bus.publish(TestEvent.Ping, { value: 99 })
|
||||
})
|
||||
|
||||
expect(received).toEqual([42, 99])
|
||||
})
|
||||
|
||||
test("subscriber does not receive events of other types", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const pings: number[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
Bus.subscribe(TestEvent.Ping, (evt) => {
|
||||
pings.push(evt.properties.value)
|
||||
})
|
||||
await Bus.publish(TestEvent.Pong, { message: "hello" })
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
})
|
||||
|
||||
expect(pings).toEqual([1])
|
||||
})
|
||||
|
||||
test("publish with no subscribers does not throw", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe("multiple subscribers", () => {
|
||||
test("all subscribers for same event type are called", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const a: number[] = []
|
||||
const b: number[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
Bus.subscribe(TestEvent.Ping, (evt) => a.push(evt.properties.value))
|
||||
Bus.subscribe(TestEvent.Ping, (evt) => b.push(evt.properties.value))
|
||||
await Bus.publish(TestEvent.Ping, { value: 7 })
|
||||
})
|
||||
|
||||
expect(a).toEqual([7])
|
||||
expect(b).toEqual([7])
|
||||
})
|
||||
|
||||
test("subscribers are called in registration order", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const order: string[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
Bus.subscribe(TestEvent.Ping, () => order.push("first"))
|
||||
Bus.subscribe(TestEvent.Ping, () => order.push("second"))
|
||||
Bus.subscribe(TestEvent.Ping, () => order.push("third"))
|
||||
await Bus.publish(TestEvent.Ping, { value: 0 })
|
||||
})
|
||||
|
||||
expect(order).toEqual(["first", "second", "third"])
|
||||
})
|
||||
})
|
||||
|
||||
describe("unsubscribe", () => {
|
||||
test("unsubscribe stops delivery", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const received: number[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
const unsub = Bus.subscribe(TestEvent.Ping, (evt) => {
|
||||
received.push(evt.properties.value)
|
||||
})
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
unsub()
|
||||
await Bus.publish(TestEvent.Ping, { value: 2 })
|
||||
})
|
||||
|
||||
expect(received).toEqual([1])
|
||||
})
|
||||
|
||||
test("unsubscribe is idempotent", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
const unsub = Bus.subscribe(TestEvent.Ping, () => {})
|
||||
unsub()
|
||||
unsub() // should not throw
|
||||
})
|
||||
})
|
||||
|
||||
test("unsubscribing one does not affect others", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const a: number[] = []
|
||||
const b: number[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
const unsubA = Bus.subscribe(TestEvent.Ping, (evt) => a.push(evt.properties.value))
|
||||
Bus.subscribe(TestEvent.Ping, (evt) => b.push(evt.properties.value))
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
unsubA()
|
||||
await Bus.publish(TestEvent.Ping, { value: 2 })
|
||||
})
|
||||
|
||||
expect(a).toEqual([1])
|
||||
expect(b).toEqual([1, 2])
|
||||
})
|
||||
})
|
||||
|
||||
describe("subscribeAll", () => {
|
||||
test("receives events of all types", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const all: string[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
Bus.subscribeAll((evt) => {
|
||||
all.push(evt.type)
|
||||
})
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
await Bus.publish(TestEvent.Pong, { message: "hi" })
|
||||
})
|
||||
|
||||
expect(all).toEqual(["test.ping", "test.pong"])
|
||||
})
|
||||
|
||||
test("subscribeAll + typed subscribe both fire", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const typed: number[] = []
|
||||
const wild: string[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
Bus.subscribe(TestEvent.Ping, (evt) => typed.push(evt.properties.value))
|
||||
Bus.subscribeAll((evt) => wild.push(evt.type))
|
||||
await Bus.publish(TestEvent.Ping, { value: 5 })
|
||||
})
|
||||
|
||||
expect(typed).toEqual([5])
|
||||
expect(wild).toEqual(["test.ping"])
|
||||
})
|
||||
|
||||
test("unsubscribe from subscribeAll", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const all: string[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
const unsub = Bus.subscribeAll((evt) => all.push(evt.type))
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
unsub()
|
||||
await Bus.publish(TestEvent.Pong, { message: "missed" })
|
||||
})
|
||||
|
||||
expect(all).toEqual(["test.ping"])
|
||||
})
|
||||
})
|
||||
|
||||
describe("GlobalBus forwarding", () => {
|
||||
test("publish emits to GlobalBus with directory", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const globalEvents: Array<{ directory?: string; payload: any }> = []
|
||||
|
||||
const handler = (evt: any) => globalEvents.push(evt)
|
||||
GlobalBus.on("event", handler)
|
||||
|
||||
try {
|
||||
await withInstance(tmp.path, async () => {
|
||||
await Bus.publish(TestEvent.Ping, { value: 42 })
|
||||
})
|
||||
|
||||
const ping = globalEvents.find((e) => e.payload.type === "test.ping")
|
||||
expect(ping).toBeDefined()
|
||||
expect(ping!.directory).toBe(tmp.path)
|
||||
expect(ping!.payload).toEqual({
|
||||
type: "test.ping",
|
||||
properties: { value: 42 },
|
||||
})
|
||||
} finally {
|
||||
GlobalBus.off("event", handler)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
describe("instance isolation", () => {
|
||||
test("subscribers in one instance do not receive events from another", async () => {
|
||||
await using tmpA = await tmpdir()
|
||||
await using tmpB = await tmpdir()
|
||||
const eventsA: number[] = []
|
||||
const eventsB: number[] = []
|
||||
|
||||
await withInstance(tmpA.path, async () => {
|
||||
Bus.subscribe(TestEvent.Ping, (evt) => eventsA.push(evt.properties.value))
|
||||
})
|
||||
|
||||
await withInstance(tmpB.path, async () => {
|
||||
Bus.subscribe(TestEvent.Ping, (evt) => eventsB.push(evt.properties.value))
|
||||
})
|
||||
|
||||
await withInstance(tmpA.path, async () => {
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
})
|
||||
|
||||
await withInstance(tmpB.path, async () => {
|
||||
await Bus.publish(TestEvent.Ping, { value: 2 })
|
||||
})
|
||||
|
||||
expect(eventsA).toEqual([1])
|
||||
expect(eventsB).toEqual([2])
|
||||
})
|
||||
})
|
||||
|
||||
describe("instance disposal", () => {
|
||||
test("InstanceDisposed is emitted to GlobalBus on disposal", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const globalEvents: Array<{ directory?: string; payload: any }> = []
|
||||
|
||||
const handler = (evt: any) => globalEvents.push(evt)
|
||||
GlobalBus.on("event", handler)
|
||||
|
||||
try {
|
||||
await withInstance(tmp.path, async () => {
|
||||
// Instance is active — subscribe so the layer gets created
|
||||
Bus.subscribe(TestEvent.Ping, () => {})
|
||||
})
|
||||
|
||||
await Instance.disposeAll()
|
||||
|
||||
const disposed = globalEvents.find((e) => e.payload.type === "server.instance.disposed")
|
||||
expect(disposed).toBeDefined()
|
||||
expect(disposed!.payload.properties.directory).toBe(tmp.path)
|
||||
} finally {
|
||||
GlobalBus.off("event", handler)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
describe("async subscribers", () => {
|
||||
test("publish is fire-and-forget (does not await subscriber callbacks)", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const received: number[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
Bus.subscribe(TestEvent.Ping, async (evt) => {
|
||||
await new Promise((r) => setTimeout(r, 10))
|
||||
received.push(evt.properties.value)
|
||||
})
|
||||
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
// Give the async subscriber time to complete
|
||||
await new Promise((r) => setTimeout(r, 50))
|
||||
})
|
||||
|
||||
expect(received).toEqual([1])
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -5,9 +5,9 @@ import path from "path"
|
||||
import { Deferred, Effect, Option } from "effect"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
import { watcherConfigLayer, withServices } from "../fixture/instance"
|
||||
import { Bus } from "../../src/bus"
|
||||
import { FileWatcher } from "../../src/file/watcher"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { GlobalBus } from "../../src/bus/global"
|
||||
|
||||
// Native @parcel/watcher bindings aren't reliably available in CI (missing on Linux, flaky on Windows)
|
||||
const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? describe : describe.skip
|
||||
@@ -16,7 +16,6 @@ const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? desc
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type BusUpdate = { directory?: string; payload: { type: string; properties: WatcherEvent } }
|
||||
type WatcherEvent = { file: string; event: "add" | "change" | "unlink" }
|
||||
|
||||
/** Run `body` with a live FileWatcher service. */
|
||||
@@ -36,22 +35,17 @@ function withWatcher<E>(directory: string, body: Effect.Effect<void, E>) {
|
||||
function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) {
|
||||
let done = false
|
||||
|
||||
function on(evt: BusUpdate) {
|
||||
const unsub = Bus.subscribe(FileWatcher.Event.Updated, (evt) => {
|
||||
if (done) return
|
||||
if (evt.directory !== directory) return
|
||||
if (evt.payload.type !== FileWatcher.Event.Updated.type) return
|
||||
if (!check(evt.payload.properties)) return
|
||||
hit(evt.payload.properties)
|
||||
}
|
||||
if (!check(evt.properties)) return
|
||||
hit(evt.properties)
|
||||
})
|
||||
|
||||
function cleanup() {
|
||||
return () => {
|
||||
if (done) return
|
||||
done = true
|
||||
GlobalBus.off("event", on)
|
||||
unsub()
|
||||
}
|
||||
|
||||
GlobalBus.on("event", on)
|
||||
return cleanup
|
||||
}
|
||||
|
||||
function wait(directory: string, check: (evt: WatcherEvent) => boolean) {
|
||||
|
||||
@@ -4,6 +4,7 @@ import { MessageV2 } from "../../src/session/message-v2"
|
||||
import type { Provider } from "../../src/provider/provider"
|
||||
import { ModelID, ProviderID } from "../../src/provider/schema"
|
||||
import { SessionID, MessageID, PartID } from "../../src/session/schema"
|
||||
import { Question } from "../../src/question"
|
||||
|
||||
const sessionID = SessionID.make("session")
|
||||
const providerID = ProviderID.make("test")
|
||||
@@ -915,4 +916,15 @@ describe("session.message-v2.fromError", () => {
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("serializes tagged errors with their message", () => {
|
||||
const result = MessageV2.fromError(new Question.RejectedError(), { providerID })
|
||||
|
||||
expect(result).toStrictEqual({
|
||||
name: "UnknownError",
|
||||
data: {
|
||||
message: "The user dismissed this question",
|
||||
},
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user