Compare commits

...

10 Commits

Author SHA1 Message Date
Kit Langton
8a3aa943dd fix: suppress unhandled interrupt error from forceInvalidate 2026-03-19 12:56:41 -04:00
Kit Langton
9be68a9fa4 fix: link to upstream PubSub shutdown PR 2026-03-19 12:40:07 -04:00
Kit Langton
2a0c9da40b Merge branch 'dev' into kit/effect-bus 2026-03-19 12:22:09 -04:00
Kit Langton
81f71c9b30 fix(bus): GlobalBus bridge for InstanceDisposed + forceInvalidate + Effect tests
Legacy subscribeAll delivers InstanceDisposed via GlobalBus because
the fiber starts asynchronously and may not be running when disposal
happens. This bridge can be removed once upstream PubSub.shutdown
properly wakes suspended subscribers.

Add forceInvalidate in Instances that closes the RcMap entry scope
regardless of refCount. Standard RcMap.invalidate bails when
refCount > 0 — an upstream issue (Effect-TS/effect-smol#1799).

Add PubSub shutdown finalizer to Bus layer so layer teardown
properly cleans up PubSubs.

Add Effect-native tests proving forkScoped + scope closure works
correctly: ensuring fires when the scope closes, streams receive
published events.

Remove stale GlobalBus disposal test (instance.ts responsibility).
2026-03-19 12:17:39 -04:00
Kit Langton
992f4f794a fix(bus): use GlobalBus for InstanceDisposed in legacy subscribeAll
The sync callback API can't wait for async layer acquisition, so
delivering InstanceDisposed through the PubSub stream is a race
condition. Instead, the legacy subscribeAll adapter listens on
GlobalBus for InstanceDisposed matching the current directory.

The Effect service's stream ending IS the disposal signal for
Effect consumers — this is only needed for the legacy callback API.

Also reverts forceInvalidate, fiber tracking, priority-based
disposal, and other workaround attempts. Clean simple solution.
2026-03-19 09:40:39 -04:00
Kit Langton
0c2b5b2c39 fix(bus): use Fiber.interrupt for clean disposal of subscribeAll
Use forkInstance + Fiber.interrupt (which awaits) instead of
runCallbackInstance + interruptUnsafe (fire-and-forget) for
subscribeAll. This ensures the fiber completes before layer
invalidation, allowing the RcMap refCount to drop to 0.

subscribeAll now delivers InstanceDisposed as the last callback
message via Effect.ensuring when the fiber is interrupted during
disposal, but not on manual unsubscribe.

Add priority support to registerDisposer so Bus can interrupt
subscription fibers (priority -1) before layer invalidation
(priority 0).

Add forkInstance helper to effect/runtime that returns a Fiber
instead of an interrupt function.
2026-03-19 08:49:06 -04:00
Kit Langton
009d77c9d8 refactor(format): make formatting explicit instead of bus-driven
Replace the implicit Bus.subscribe(File.Event.Edited) formatter with
an explicit Format.run(filepath) call in write/edit/apply_patch tools.

This ensures formatting completes before FileTime stamps and LSP
diagnostics run, rather than relying on the bus to block on subscribers.

- Add Format.run() to the Effect service interface and legacy adapter
- Call Format.run() in write, edit, and apply_patch tools after writes
- Remove Bus subscription from Format layer
2026-03-18 22:09:14 -04:00
Kit Langton
f3cf519d98 feat(bus): migrate Bus to Effect service with PubSub internals
Add Bus.Service as a ServiceMap.Service backed by Effect PubSub:
- publish() pushes to per-type + wildcard PubSubs and GlobalBus
- subscribe() returns a typed Stream via Stream.fromPubSub
- subscribeAll() returns a wildcard Stream

Legacy adapters wrap the Effect service:
- publish → runPromiseInstance
- subscribe/subscribeAll → runCallbackInstance with Stream.runForEach

Other changes:
- Register Bus.Service in Instances LayerMap
- Add runCallbackInstance helper to effect/runtime
- Remove unused Bus.once (zero callers)
- Skip PubSub creation on publish when no subscribers exist
- Move subscribe/unsubscribe logging into the Effect service layer
2026-03-18 21:36:04 -04:00
Kit Langton
645c15351b test(bus): add comprehensive test suite for Bus service
Covers publish/subscribe, multiple subscribers, unsubscribe, subscribeAll,
once, GlobalBus forwarding, instance isolation, disposal, and async subscribers.
2026-03-18 21:05:36 -04:00
Kit Langton
f63a2a2636 fix(bus): tighten GlobalBus payload and BusEvent.define types
Constrain BusEvent.define to ZodObject instead of ZodType so TS knows
event properties are always a record. Type GlobalBus payload as
{ type: string; properties: Record<string, unknown> } instead of any.

Refactor watcher test to use Bus.subscribe instead of raw GlobalBus
listener, removing hand-rolled event types and unnecessary casts.
2026-03-18 20:57:08 -04:00
12 changed files with 569 additions and 132 deletions

View File

@@ -1,5 +1,5 @@
import z from "zod"
import type { ZodType } from "zod"
import type { ZodObject, ZodRawShape } from "zod"
import { Log } from "../util/log"
export namespace BusEvent {
@@ -9,7 +9,7 @@ export namespace BusEvent {
const registry = new Map<string, Definition>()
export function define<Type extends string, Properties extends ZodType>(type: Type, properties: Properties) {
export function define<Type extends string, Properties extends ZodObject<ZodRawShape>>(type: Type, properties: Properties) {
const result = {
type,
properties,

View File

@@ -4,7 +4,7 @@ export const GlobalBus = new EventEmitter<{
event: [
{
directory?: string
payload: any
payload: { type: string; properties: Record<string, unknown> }
},
]
}>()

View File

@@ -1,12 +1,13 @@
import z from "zod"
import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect"
import { Log } from "../util/log"
import { Instance } from "../project/instance"
import { BusEvent } from "./bus-event"
import { GlobalBus } from "./global"
import { runCallbackInstance, runPromiseInstance } from "../effect/runtime"
export namespace Bus {
const log = Log.create({ service: "bus" })
type Subscription = (event: any) => void
export const InstanceDisposed = BusEvent.define(
"server.instance.disposed",
@@ -15,91 +16,130 @@ export namespace Bus {
}),
)
const state = Instance.state(
() => {
const subscriptions = new Map<any, Subscription[]>()
// ---------------------------------------------------------------------------
// Service definition
// ---------------------------------------------------------------------------
return {
subscriptions,
type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
type: D["type"]
properties: z.infer<D["properties"]>
}
export interface Interface {
readonly publish: <D extends BusEvent.Definition>(
def: D,
properties: z.output<D["properties"]>,
) => Effect.Effect<void>
readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
readonly subscribeAll: () => Stream.Stream<Payload>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Bus") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const pubsubs = new Map<string, PubSub.PubSub<Payload>>()
const wildcardPubSub = yield* PubSub.unbounded<Payload>()
const getOrCreate = Effect.fnUntraced(function* (type: string) {
let ps = pubsubs.get(type)
if (!ps) {
ps = yield* PubSub.unbounded<Payload>()
pubsubs.set(type, ps)
}
return ps
})
function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
return Effect.gen(function* () {
const payload: Payload = { type: def.type, properties }
log.info("publishing", { type: def.type })
const ps = pubsubs.get(def.type)
if (ps) yield* PubSub.publish(ps, payload)
yield* PubSub.publish(wildcardPubSub, payload)
GlobalBus.emit("event", {
directory: Instance.directory,
payload,
})
})
}
},
async (entry) => {
const wildcard = entry.subscriptions.get("*")
if (!wildcard) return
const event = {
type: InstanceDisposed.type,
properties: {
directory: Instance.directory,
},
function subscribe<D extends BusEvent.Definition>(def: D): Stream.Stream<Payload<D>> {
log.info("subscribing", { type: def.type })
return Stream.unwrap(
Effect.gen(function* () {
const ps = yield* getOrCreate(def.type)
return Stream.fromPubSub(ps) as Stream.Stream<Payload<D>>
}),
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type }))))
}
for (const sub of [...wildcard]) {
sub(event)
function subscribeAll(): Stream.Stream<Payload> {
log.info("subscribing", { type: "*" })
return Stream.fromPubSub(wildcardPubSub).pipe(
Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))),
)
}
},
// Shut down all PubSubs when the layer is torn down.
// This causes Stream.fromPubSub consumers to end, triggering
// their ensuring/finalizers.
yield* Effect.addFinalizer(() =>
Effect.gen(function* () {
log.info("shutting down PubSubs")
yield* PubSub.shutdown(wildcardPubSub)
for (const ps of pubsubs.values()) {
yield* PubSub.shutdown(ps)
}
}),
)
return Service.of({ publish, subscribe, subscribeAll })
}),
)
export async function publish<Definition extends BusEvent.Definition>(
def: Definition,
properties: z.output<Definition["properties"]>,
) {
const payload = {
type: def.type,
properties,
}
log.info("publishing", {
type: def.type,
})
const pending = []
for (const key of [def.type, "*"]) {
const match = state().subscriptions.get(key)
for (const sub of match ?? []) {
pending.push(sub(payload))
}
}
GlobalBus.emit("event", {
directory: Instance.directory,
payload,
})
return Promise.all(pending)
// ---------------------------------------------------------------------------
// Legacy adapters — plain function API wrapping the Effect service
// ---------------------------------------------------------------------------
function runStream(stream: (svc: Interface) => Stream.Stream<Payload>, callback: (event: any) => void) {
return runCallbackInstance(
Service.use((svc) => stream(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg))))),
)
}
export function subscribe<Definition extends BusEvent.Definition>(
def: Definition,
callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
) {
return raw(def.type, callback)
export function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
return runPromiseInstance(Service.use((svc) => svc.publish(def, properties)))
}
export function once<Definition extends BusEvent.Definition>(
def: Definition,
callback: (event: {
type: Definition["type"]
properties: z.infer<Definition["properties"]>
}) => "done" | undefined,
) {
const unsub = subscribe(def, (event) => {
if (callback(event)) unsub()
})
export function subscribe<D extends BusEvent.Definition>(def: D, callback: (event: Payload<D>) => void) {
return runStream((svc) => svc.subscribe(def), callback)
}
export function subscribeAll(callback: (event: any) => void) {
return raw("*", callback)
}
const directory = Instance.directory
function raw(type: string, callback: (event: any) => void) {
log.info("subscribing", { type })
const subscriptions = state().subscriptions
let match = subscriptions.get(type) ?? []
match.push(callback)
subscriptions.set(type, match)
// InstanceDisposed is delivered via GlobalBus because the legacy
// adapter's fiber starts asynchronously and may not be running when
// disposal happens. In the Effect-native path, forkScoped + scope
// closure handles this correctly. This bridge can be removed once
// upstream PubSub.shutdown properly wakes suspended subscribers:
// https://github.com/Effect-TS/effect-smol/pull/1800
const onDispose = (evt: { directory?: string; payload: any }) => {
if (evt.payload.type !== InstanceDisposed.type) return
if (evt.directory !== directory) return
callback(evt.payload)
GlobalBus.off("event", onDispose)
}
GlobalBus.on("event", onDispose)
const interrupt = runStream((svc) => svc.subscribeAll(), callback)
return () => {
log.info("unsubscribing", { type })
const match = subscriptions.get(type)
if (!match) return
const index = match.indexOf(callback)
if (index === -1) return
match.splice(index, 1)
GlobalBus.off("event", onDispose)
interrupt()
}
}
}

View File

@@ -123,7 +123,7 @@ export namespace Workspace {
await parseSSE(res.body, stop, (event) => {
GlobalBus.emit("event", {
directory: space.id,
payload: event,
payload: event as { type: string; properties: Record<string, unknown> },
})
})
// Wait 250ms and retry if SSE connection fails

View File

@@ -1,4 +1,5 @@
import { Effect, Layer, LayerMap, ServiceMap } from "effect"
import { Effect, Exit, Fiber, Layer, LayerMap, MutableHashMap, Scope, ServiceMap } from "effect"
import { Bus } from "@/bus"
import { File } from "@/file"
import { FileTime } from "@/file/time"
import { FileWatcher } from "@/file/watcher"
@@ -16,6 +17,7 @@ import { registerDisposer } from "./instance-registry"
export { InstanceContext } from "./instance-context"
export type InstanceServices =
| Bus.Service
| Question.Service
| PermissionNext.Service
| ProviderAuth.Service
@@ -36,6 +38,7 @@ export type InstanceServices =
function lookup(_key: string) {
const ctx = Layer.sync(InstanceContext, () => InstanceContext.of(Instance.current))
return Layer.mergeAll(
Layer.fresh(Bus.layer),
Layer.fresh(Question.layer),
Layer.fresh(PermissionNext.layer),
Layer.fresh(ProviderAuth.defaultLayer),
@@ -56,7 +59,23 @@ export class Instances extends ServiceMap.Service<Instances, LayerMap.LayerMap<s
Instances,
Effect.gen(function* () {
const layerMap = yield* LayerMap.make(lookup, { idleTimeToLive: Infinity })
const unregister = registerDisposer((directory) => Effect.runPromise(layerMap.invalidate(directory)))
// Force-invalidate closes the RcMap entry scope even when refCount > 0.
// Standard RcMap.invalidate bails in that case, leaving long-running
// consumer fibers orphaned. This is an upstream issue:
// https://github.com/Effect-TS/effect-smol/pull/1799
const forceInvalidate = (directory: string) =>
Effect.gen(function* () {
const rcMap = layerMap.rcMap
if (rcMap.state._tag === "Closed") return
const entry = MutableHashMap.get(rcMap.state.map, directory)
if (entry._tag === "None") return
MutableHashMap.remove(rcMap.state.map, directory)
if (entry.value.fiber) yield* Fiber.interrupt(entry.value.fiber)
yield* Scope.close(entry.value.scope, Exit.void)
}).pipe(Effect.uninterruptible, Effect.ignore)
const unregister = registerDisposer((directory) => Effect.runPromise(forceInvalidate(directory)))
yield* Effect.addFinalizer(() => Effect.sync(unregister))
return Instances.of(layerMap)
}),

View File

@@ -18,6 +18,12 @@ export function runPromiseInstance<A, E>(effect: Effect.Effect<A, E, InstanceSer
return runtime.runPromise(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
}
export function runCallbackInstance<A, E>(
effect: Effect.Effect<A, E, InstanceServices>,
): (interruptor?: number) => void {
return runtime.runCallback(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
}
export function disposeRuntime() {
return runtime.dispose()
}

View File

@@ -4,9 +4,7 @@ import { InstanceContext } from "@/effect/instance-context"
import path from "path"
import { mergeDeep } from "remeda"
import z from "zod"
import { Bus } from "../bus"
import { Config } from "../config/config"
import { File } from "../file"
import { Instance } from "../project/instance"
import { Process } from "../util/process"
import { Log } from "../util/log"
@@ -27,6 +25,7 @@ export namespace Format {
export type Status = z.infer<typeof Status>
export interface Interface {
readonly run: (filepath: string) => Effect.Effect<void>
readonly status: () => Effect.Effect<Status[]>
}
@@ -90,48 +89,44 @@ export namespace Format {
return result
}
yield* Effect.acquireRelease(
Effect.sync(() =>
Bus.subscribe(
File.Event.Edited,
Instance.bind(async (payload) => {
const file = payload.properties.file
log.info("formatting", { file })
const ext = path.extname(file)
const run = Effect.fn("Format.run")(function* (filepath: string) {
log.info("formatting", { file: filepath })
const ext = path.extname(filepath)
for (const item of await getFormatter(ext)) {
log.info("running", { command: item.command })
try {
const proc = Process.spawn(
item.command.map((x) => x.replace("$FILE", file)),
{
cwd: instance.directory,
env: { ...process.env, ...item.environment },
stdout: "ignore",
stderr: "ignore",
},
)
const exit = await proc.exited
if (exit !== 0) {
log.error("failed", {
command: item.command,
...item.environment,
})
}
} catch (error) {
log.error("failed to format file", {
error,
command: item.command,
...item.environment,
file,
})
}
for (const item of yield* Effect.promise(() => getFormatter(ext))) {
log.info("running", { command: item.command })
yield* Effect.tryPromise({
try: async () => {
const proc = Process.spawn(
item.command.map((x) => x.replace("$FILE", filepath)),
{
cwd: instance.directory,
env: { ...process.env, ...item.environment },
stdout: "ignore",
stderr: "ignore",
},
)
const exit = await proc.exited
if (exit !== 0) {
log.error("failed", {
command: item.command,
...item.environment,
})
}
}),
),
),
(unsubscribe) => Effect.sync(unsubscribe),
)
},
catch: (error) => {
log.error("failed to format file", {
error,
command: item.command,
...item.environment,
file: filepath,
})
return error
},
}).pipe(Effect.ignore)
}
})
log.info("init")
const status = Effect.fn("Format.status")(function* () {
@@ -147,10 +142,14 @@ export namespace Format {
return result
})
return Service.of({ status })
return Service.of({ run, status })
}),
)
export async function run(filepath: string) {
return runPromiseInstance(Service.use((s) => s.run(filepath)))
}
export async function status() {
return runPromiseInstance(Service.use((s) => s.status()))
}

View File

@@ -10,6 +10,7 @@ import { createTwoFilesPatch, diffLines } from "diff"
import { assertExternalDirectory } from "./external-directory"
import { trimDiff } from "./edit"
import { LSP } from "../lsp"
import { Format } from "../format"
import { Filesystem } from "../util/filesystem"
import DESCRIPTION from "./apply_patch.txt"
import { File } from "../file"
@@ -220,6 +221,7 @@ export const ApplyPatchTool = Tool.define("apply_patch", {
}
if (edited) {
await Format.run(edited)
await Bus.publish(File.Event.Edited, {
file: edited,
})

View File

@@ -13,6 +13,7 @@ import { File } from "../file"
import { FileWatcher } from "../file/watcher"
import { Bus } from "../bus"
import { FileTime } from "../file/time"
import { Format } from "../format"
import { Filesystem } from "../util/filesystem"
import { Instance } from "../project/instance"
import { Snapshot } from "@/snapshot"
@@ -71,6 +72,7 @@ export const EditTool = Tool.define("edit", {
},
})
await Filesystem.write(filePath, params.newString)
await Format.run(filePath)
await Bus.publish(File.Event.Edited, {
file: filePath,
})
@@ -108,6 +110,7 @@ export const EditTool = Tool.define("edit", {
})
await Filesystem.write(filePath, contentNew)
await Format.run(filePath)
await Bus.publish(File.Event.Edited, {
file: filePath,
})

View File

@@ -8,6 +8,7 @@ import { Bus } from "../bus"
import { File } from "../file"
import { FileWatcher } from "../file/watcher"
import { FileTime } from "../file/time"
import { Format } from "../format"
import { Filesystem } from "../util/filesystem"
import { Instance } from "../project/instance"
import { trimDiff } from "./edit"
@@ -42,6 +43,7 @@ export const WriteTool = Tool.define("write", {
})
await Filesystem.write(filepath, params.content)
await Format.run(filepath)
await Bus.publish(File.Event.Edited, {
file: filepath,
})

View File

@@ -0,0 +1,372 @@
import { afterEach, describe, expect, test } from "bun:test"
import { Deferred, Effect, Stream } from "effect"
import z from "zod"
import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
import { GlobalBus } from "../../src/bus/global"
import { Instance } from "../../src/project/instance"
import { tmpdir } from "../fixture/fixture"
// ---------------------------------------------------------------------------
// Test event definitions
// ---------------------------------------------------------------------------
const TestEvent = {
Ping: BusEvent.define("test.ping", z.object({ value: z.number() })),
Pong: BusEvent.define("test.pong", z.object({ message: z.string() })),
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function withInstance(directory: string, fn: () => Promise<void>) {
return Instance.provide({ directory, fn })
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe("Bus", () => {
afterEach(() => Instance.disposeAll())
describe("publish + subscribe", () => {
test("subscriber receives matching events", async () => {
await using tmp = await tmpdir()
const received: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
received.push(evt.properties.value)
})
await Bus.publish(TestEvent.Ping, { value: 42 })
await Bus.publish(TestEvent.Ping, { value: 99 })
})
expect(received).toEqual([42, 99])
})
test("subscriber does not receive events of other types", async () => {
await using tmp = await tmpdir()
const pings: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
pings.push(evt.properties.value)
})
await Bus.publish(TestEvent.Pong, { message: "hello" })
await Bus.publish(TestEvent.Ping, { value: 1 })
})
expect(pings).toEqual([1])
})
test("publish with no subscribers does not throw", async () => {
await using tmp = await tmpdir()
await withInstance(tmp.path, async () => {
await Bus.publish(TestEvent.Ping, { value: 1 })
})
})
})
describe("multiple subscribers", () => {
test("all subscribers for same event type are called", async () => {
await using tmp = await tmpdir()
const a: number[] = []
const b: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => a.push(evt.properties.value))
Bus.subscribe(TestEvent.Ping, (evt) => b.push(evt.properties.value))
await Bus.publish(TestEvent.Ping, { value: 7 })
})
expect(a).toEqual([7])
expect(b).toEqual([7])
})
test("subscribers are called in registration order", async () => {
await using tmp = await tmpdir()
const order: string[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, () => order.push("first"))
Bus.subscribe(TestEvent.Ping, () => order.push("second"))
Bus.subscribe(TestEvent.Ping, () => order.push("third"))
await Bus.publish(TestEvent.Ping, { value: 0 })
})
expect(order).toEqual(["first", "second", "third"])
})
})
describe("unsubscribe", () => {
test("unsubscribe stops delivery", async () => {
await using tmp = await tmpdir()
const received: number[] = []
await withInstance(tmp.path, async () => {
const unsub = Bus.subscribe(TestEvent.Ping, (evt) => {
received.push(evt.properties.value)
})
await Bus.publish(TestEvent.Ping, { value: 1 })
unsub()
await Bus.publish(TestEvent.Ping, { value: 2 })
})
expect(received).toEqual([1])
})
test("unsubscribe is idempotent", async () => {
await using tmp = await tmpdir()
await withInstance(tmp.path, async () => {
const unsub = Bus.subscribe(TestEvent.Ping, () => {})
unsub()
unsub() // should not throw
})
})
test("unsubscribing one does not affect others", async () => {
await using tmp = await tmpdir()
const a: number[] = []
const b: number[] = []
await withInstance(tmp.path, async () => {
const unsubA = Bus.subscribe(TestEvent.Ping, (evt) => a.push(evt.properties.value))
Bus.subscribe(TestEvent.Ping, (evt) => b.push(evt.properties.value))
await Bus.publish(TestEvent.Ping, { value: 1 })
unsubA()
await Bus.publish(TestEvent.Ping, { value: 2 })
})
expect(a).toEqual([1])
expect(b).toEqual([1, 2])
})
})
describe("subscribeAll", () => {
test("receives events of all types", async () => {
await using tmp = await tmpdir()
const all: string[] = []
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
all.push(evt.type)
})
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bus.publish(TestEvent.Pong, { message: "hi" })
})
expect(all).toEqual(["test.ping", "test.pong"])
})
test("subscribeAll + typed subscribe both fire", async () => {
await using tmp = await tmpdir()
const typed: number[] = []
const wild: string[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => typed.push(evt.properties.value))
Bus.subscribeAll((evt) => wild.push(evt.type))
await Bus.publish(TestEvent.Ping, { value: 5 })
})
expect(typed).toEqual([5])
expect(wild).toEqual(["test.ping"])
})
test("unsubscribe from subscribeAll", async () => {
await using tmp = await tmpdir()
const all: string[] = []
await withInstance(tmp.path, async () => {
const unsub = Bus.subscribeAll((evt) => all.push(evt.type))
await Bus.publish(TestEvent.Ping, { value: 1 })
unsub()
await Bus.publish(TestEvent.Pong, { message: "missed" })
})
expect(all).toEqual(["test.ping"])
})
test("subscribeAll delivers InstanceDisposed on disposal", async () => {
await using tmp = await tmpdir()
const all: string[] = []
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
all.push(evt.type)
})
await Bus.publish(TestEvent.Ping, { value: 1 })
})
await Instance.disposeAll()
expect(all).toContain("test.ping")
expect(all).toContain(Bus.InstanceDisposed.type)
})
test("manual unsubscribe suppresses InstanceDisposed", async () => {
await using tmp = await tmpdir()
const all: string[] = []
let unsub = () => {}
await withInstance(tmp.path, async () => {
unsub = Bus.subscribeAll((evt) => {
all.push(evt.type)
})
})
unsub()
await Instance.disposeAll()
expect(all).not.toContain(Bus.InstanceDisposed.type)
})
})
describe("GlobalBus forwarding", () => {
test("publish emits to GlobalBus with directory", async () => {
await using tmp = await tmpdir()
const globalEvents: Array<{ directory?: string; payload: any }> = []
const handler = (evt: any) => globalEvents.push(evt)
GlobalBus.on("event", handler)
try {
await withInstance(tmp.path, async () => {
await Bus.publish(TestEvent.Ping, { value: 42 })
})
const ping = globalEvents.find((e) => e.payload.type === "test.ping")
expect(ping).toBeDefined()
expect(ping!.directory).toBe(tmp.path)
expect(ping!.payload).toEqual({
type: "test.ping",
properties: { value: 42 },
})
} finally {
GlobalBus.off("event", handler)
}
})
})
describe("instance isolation", () => {
test("subscribers in one instance do not receive events from another", async () => {
await using tmpA = await tmpdir()
await using tmpB = await tmpdir()
const eventsA: number[] = []
const eventsB: number[] = []
await withInstance(tmpA.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => eventsA.push(evt.properties.value))
})
await withInstance(tmpB.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => eventsB.push(evt.properties.value))
})
await withInstance(tmpA.path, async () => {
await Bus.publish(TestEvent.Ping, { value: 1 })
})
await withInstance(tmpB.path, async () => {
await Bus.publish(TestEvent.Ping, { value: 2 })
})
expect(eventsA).toEqual([1])
expect(eventsB).toEqual([2])
})
})
describe("async subscribers", () => {
test("publish is fire-and-forget (does not await subscriber callbacks)", async () => {
await using tmp = await tmpdir()
const received: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, async (evt) => {
await new Promise((r) => setTimeout(r, 10))
received.push(evt.properties.value)
})
await Bus.publish(TestEvent.Ping, { value: 1 })
// Give the async subscriber time to complete
await new Promise((r) => setTimeout(r, 50))
})
expect(received).toEqual([1])
})
})
describe("Effect service", () => {
test("subscribeAll stream receives published events", async () => {
await using tmp = await tmpdir()
const received: string[] = []
await withInstance(tmp.path, () =>
Effect.runPromise(
Effect.scoped(
Effect.gen(function* () {
const svc = yield* Bus.Service
const done = yield* Deferred.make<void>()
let count = 0
yield* Effect.forkScoped(
svc.subscribeAll().pipe(
Stream.runForEach((msg) =>
Effect.gen(function* () {
received.push(msg.type)
if (++count >= 2) yield* Deferred.succeed(done, undefined)
}),
),
),
)
// Let the forked fiber start and subscribe to the PubSub
yield* Effect.yieldNow
yield* svc.publish(TestEvent.Ping, { value: 1 })
yield* svc.publish(TestEvent.Pong, { message: "hi" })
yield* Deferred.await(done)
}),
).pipe(Effect.provide(Bus.layer)),
),
)
expect(received).toEqual(["test.ping", "test.pong"])
})
test("subscribeAll stream ends with ensuring when scope closes", async () => {
await using tmp = await tmpdir()
let ensuringFired = false
await withInstance(tmp.path, () =>
Effect.runPromise(
Effect.scoped(
Effect.gen(function* () {
const svc = yield* Bus.Service
yield* Effect.forkScoped(
svc.subscribeAll().pipe(
Stream.runForEach(() => Effect.void),
Effect.ensuring(Effect.sync(() => {
ensuringFired = true
})),
),
)
yield* svc.publish(TestEvent.Ping, { value: 1 })
yield* Effect.yieldNow
}),
).pipe(Effect.provide(Bus.layer)),
),
)
expect(ensuringFired).toBe(true)
})
})
})

View File

@@ -5,9 +5,9 @@ import path from "path"
import { Deferred, Effect, Option } from "effect"
import { tmpdir } from "../fixture/fixture"
import { watcherConfigLayer, withServices } from "../fixture/instance"
import { Bus } from "../../src/bus"
import { FileWatcher } from "../../src/file/watcher"
import { Instance } from "../../src/project/instance"
import { GlobalBus } from "../../src/bus/global"
// Native @parcel/watcher bindings aren't reliably available in CI (missing on Linux, flaky on Windows)
const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? describe : describe.skip
@@ -16,7 +16,6 @@ const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? desc
// Helpers
// ---------------------------------------------------------------------------
type BusUpdate = { directory?: string; payload: { type: string; properties: WatcherEvent } }
type WatcherEvent = { file: string; event: "add" | "change" | "unlink" }
/** Run `body` with a live FileWatcher service. */
@@ -36,22 +35,17 @@ function withWatcher<E>(directory: string, body: Effect.Effect<void, E>) {
function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) {
let done = false
function on(evt: BusUpdate) {
const unsub = Bus.subscribe(FileWatcher.Event.Updated, (evt) => {
if (done) return
if (evt.directory !== directory) return
if (evt.payload.type !== FileWatcher.Event.Updated.type) return
if (!check(evt.payload.properties)) return
hit(evt.payload.properties)
}
if (!check(evt.properties)) return
hit(evt.properties)
})
function cleanup() {
return () => {
if (done) return
done = true
GlobalBus.off("event", on)
unsub()
}
GlobalBus.on("event", on)
return cleanup
}
function wait(directory: string, check: (evt: WatcherEvent) => boolean) {