test(bus): migrate bus tests to Effect runner (#27131)

This commit is contained in:
Kit Langton
2026-05-12 15:41:24 -04:00
committed by GitHub
parent 3e2ec192cf
commit fec78154b5

View File

@@ -1,220 +1,240 @@
import { afterEach, describe, expect, test } from "bun:test"
import { Schema } from "effect"
import { afterEach, describe, expect } from "bun:test"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { Deferred, Effect, Layer, Schema } from "effect"
import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
import { Instance } from "../../src/project/instance"
import { WithInstance } from "../../src/project/with-instance"
import { disposeAllInstances, tmpdir } from "../fixture/fixture"
import { disposeAllInstances, provideInstance, tmpdirScoped } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
const TestEvent = {
Ping: BusEvent.define("test.ping", Schema.Struct({ value: Schema.Number })),
Pong: BusEvent.define("test.pong", Schema.Struct({ message: Schema.String })),
}
function withInstance(directory: string, fn: () => Promise<void>) {
return WithInstance.provide({ directory, fn })
}
const it = testEffect(Layer.mergeAll(Bus.layer, CrossSpawnSpawner.defaultLayer))
describe("Bus", () => {
afterEach(() => disposeAllInstances())
describe("publish + subscribe", () => {
test("subscriber is live immediately after subscribe returns", async () => {
await using tmp = await tmpdir()
const received: number[] = []
it.instance("subscriber is live immediately after subscribe returns", () =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const received: number[] = []
const done = yield* Deferred.make<void>()
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
yield* bus.subscribeCallback(TestEvent.Ping, (evt) => {
received.push(evt.properties.value)
Deferred.doneUnsafe(done, Effect.void)
})
await Bus.publish(TestEvent.Ping, { value: 42 })
await Bun.sleep(10)
})
yield* bus.publish(TestEvent.Ping, { value: 42 })
yield* Deferred.await(done).pipe(Effect.timeout("2 seconds"))
expect(received).toEqual([42])
})
expect(received).toEqual([42])
}),
)
test("subscriber receives matching events", async () => {
await using tmp = await tmpdir()
const received: number[] = []
it.instance("subscriber receives matching events", () =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const received: number[] = []
const done = yield* Deferred.make<void>()
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
yield* bus.subscribeCallback(TestEvent.Ping, (evt) => {
received.push(evt.properties.value)
if (received.length === 2) Deferred.doneUnsafe(done, Effect.void)
})
// Give the subscriber fiber time to start consuming
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 42 })
await Bus.publish(TestEvent.Ping, { value: 99 })
// Give subscriber time to process
await Bun.sleep(10)
})
yield* bus.publish(TestEvent.Ping, { value: 42 })
yield* bus.publish(TestEvent.Ping, { value: 99 })
yield* Deferred.await(done).pipe(Effect.timeout("2 seconds"))
expect(received).toEqual([42, 99])
})
expect(received).toEqual([42, 99])
}),
)
test("subscriber does not receive events of other types", async () => {
await using tmp = await tmpdir()
const pings: number[] = []
it.instance("subscriber does not receive events of other types", () =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const pings: number[] = []
const done = yield* Deferred.make<void>()
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
yield* bus.subscribeCallback(TestEvent.Ping, (evt) => {
pings.push(evt.properties.value)
Deferred.doneUnsafe(done, Effect.void)
})
await Bun.sleep(10)
await Bus.publish(TestEvent.Pong, { message: "hello" })
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bun.sleep(10)
})
yield* bus.publish(TestEvent.Pong, { message: "hello" })
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* Deferred.await(done).pipe(Effect.timeout("2 seconds"))
expect(pings).toEqual([1])
})
expect(pings).toEqual([1])
}),
)
test("publish with no subscribers does not throw", async () => {
await using tmp = await tmpdir()
await withInstance(tmp.path, async () => {
await Bus.publish(TestEvent.Ping, { value: 1 })
})
})
it.instance("publish with no subscribers does not throw", () =>
Effect.gen(function* () {
const bus = yield* Bus.Service
yield* bus.publish(TestEvent.Ping, { value: 1 })
}),
)
})
describe("unsubscribe", () => {
test("unsubscribe stops delivery", async () => {
await using tmp = await tmpdir()
const received: number[] = []
it.instance("unsubscribe stops delivery", () =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const received: number[] = []
const first = yield* Deferred.make<void>()
await withInstance(tmp.path, async () => {
const unsub = Bus.subscribe(TestEvent.Ping, (evt) => {
const unsub = yield* bus.subscribeCallback(TestEvent.Ping, (evt) => {
received.push(evt.properties.value)
if (evt.properties.value === 1) Deferred.doneUnsafe(first, Effect.void)
})
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bun.sleep(10)
unsub()
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 2 })
await Bun.sleep(10)
})
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* Deferred.await(first).pipe(Effect.timeout("2 seconds"))
yield* Effect.sync(unsub)
yield* bus.publish(TestEvent.Ping, { value: 2 })
yield* Effect.sleep("10 millis")
expect(received).toEqual([1])
})
expect(received).toEqual([1])
}),
)
})
describe("subscribeAll", () => {
test("subscribeAll is live immediately after subscribe returns", async () => {
await using tmp = await tmpdir()
const received: string[] = []
it.instance("subscribeAll is live immediately after subscribe returns", () =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const received: string[] = []
const done = yield* Deferred.make<void>()
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
yield* bus.subscribeAllCallback((evt) => {
received.push(evt.type)
Deferred.doneUnsafe(done, Effect.void)
})
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bun.sleep(10)
})
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* Deferred.await(done).pipe(Effect.timeout("2 seconds"))
expect(received).toEqual(["test.ping"])
})
expect(received).toEqual(["test.ping"])
}),
)
test("receives all event types", async () => {
await using tmp = await tmpdir()
const received: string[] = []
it.instance("receives all event types", () =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const received: string[] = []
const done = yield* Deferred.make<void>()
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
yield* bus.subscribeAllCallback((evt) => {
received.push(evt.type)
if (received.length === 2) Deferred.doneUnsafe(done, Effect.void)
})
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bus.publish(TestEvent.Pong, { message: "hi" })
await Bun.sleep(10)
})
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* bus.publish(TestEvent.Pong, { message: "hi" })
yield* Deferred.await(done).pipe(Effect.timeout("2 seconds"))
expect(received).toContain("test.ping")
expect(received).toContain("test.pong")
})
expect(received).toContain("test.ping")
expect(received).toContain("test.pong")
}),
)
})
describe("multiple subscribers", () => {
test("all subscribers for same event type are called", async () => {
await using tmp = await tmpdir()
const a: number[] = []
const b: number[] = []
it.instance("all subscribers for same event type are called", () =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const a: number[] = []
const b: number[] = []
const doneA = yield* Deferred.make<void>()
const doneB = yield* Deferred.make<void>()
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
yield* bus.subscribeCallback(TestEvent.Ping, (evt) => {
a.push(evt.properties.value)
Deferred.doneUnsafe(doneA, Effect.void)
})
Bus.subscribe(TestEvent.Ping, (evt) => {
yield* bus.subscribeCallback(TestEvent.Ping, (evt) => {
b.push(evt.properties.value)
Deferred.doneUnsafe(doneB, Effect.void)
})
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 7 })
await Bun.sleep(10)
})
yield* bus.publish(TestEvent.Ping, { value: 7 })
yield* Deferred.await(doneA).pipe(Effect.timeout("2 seconds"))
yield* Deferred.await(doneB).pipe(Effect.timeout("2 seconds"))
expect(a).toEqual([7])
expect(b).toEqual([7])
})
expect(a).toEqual([7])
expect(b).toEqual([7])
}),
)
})
describe("instance isolation", () => {
test("events in one directory do not reach subscribers in another", async () => {
await using tmpA = await tmpdir()
await using tmpB = await tmpdir()
const receivedA: number[] = []
const receivedB: number[] = []
it.live("events in one directory do not reach subscribers in another", () =>
Effect.gen(function* () {
const tmpA = yield* tmpdirScoped()
const tmpB = yield* tmpdirScoped()
const receivedA: number[] = []
const receivedB: number[] = []
const doneA = yield* Deferred.make<void>()
const doneB = yield* Deferred.make<void>()
await withInstance(tmpA.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
receivedA.push(evt.properties.value)
})
await Bun.sleep(10)
})
yield* Effect.gen(function* () {
const bus = yield* Bus.Service
yield* bus.subscribeCallback(TestEvent.Ping, (evt) => {
receivedA.push(evt.properties.value)
Deferred.doneUnsafe(doneA, Effect.void)
})
}).pipe(provideInstance(tmpA))
await withInstance(tmpB.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
receivedB.push(evt.properties.value)
})
await Bun.sleep(10)
})
yield* Effect.gen(function* () {
const bus = yield* Bus.Service
yield* bus.subscribeCallback(TestEvent.Ping, (evt) => {
receivedB.push(evt.properties.value)
Deferred.doneUnsafe(doneB, Effect.void)
})
}).pipe(provideInstance(tmpB))
await withInstance(tmpA.path, async () => {
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bun.sleep(10)
})
yield* Effect.gen(function* () {
const bus = yield* Bus.Service
yield* bus.publish(TestEvent.Ping, { value: 1 })
}).pipe(provideInstance(tmpA))
await withInstance(tmpB.path, async () => {
await Bus.publish(TestEvent.Ping, { value: 2 })
await Bun.sleep(10)
})
yield* Effect.gen(function* () {
const bus = yield* Bus.Service
yield* bus.publish(TestEvent.Ping, { value: 2 })
}).pipe(provideInstance(tmpB))
expect(receivedA).toEqual([1])
expect(receivedB).toEqual([2])
})
yield* Deferred.await(doneA).pipe(Effect.timeout("2 seconds"))
yield* Deferred.await(doneB).pipe(Effect.timeout("2 seconds"))
expect(receivedA).toEqual([1])
expect(receivedB).toEqual([2])
}),
)
})
describe("instance disposal", () => {
test("InstanceDisposed is delivered to wildcard subscribers before stream ends", async () => {
await using tmp = await tmpdir()
const received: string[] = []
it.live("InstanceDisposed is delivered to wildcard subscribers before stream ends", () =>
Effect.gen(function* () {
const tmp = yield* tmpdirScoped()
const received: string[] = []
const seen = yield* Deferred.make<void>()
const disposed = yield* Deferred.make<void>()
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
received.push(evt.type)
})
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bun.sleep(10)
})
yield* Effect.gen(function* () {
const bus = yield* Bus.Service
yield* bus.subscribeAllCallback((evt) => {
received.push(evt.type)
if (evt.type === TestEvent.Ping.type) Deferred.doneUnsafe(seen, Effect.void)
if (evt.type === Bus.InstanceDisposed.type) Deferred.doneUnsafe(disposed, Effect.void)
})
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* Deferred.await(seen).pipe(Effect.timeout("2 seconds"))
}).pipe(provideInstance(tmp))
// disposeAllInstances triggers the finalizer which publishes InstanceDisposed
await disposeAllInstances()
await Bun.sleep(50)
yield* Effect.promise(disposeAllInstances)
yield* Deferred.await(disposed).pipe(Effect.timeout("2 seconds"))
expect(received).toContain("test.ping")
expect(received).toContain(Bus.InstanceDisposed.type)
})
expect(received).toContain("test.ping")
expect(received).toContain(Bus.InstanceDisposed.type)
}),
)
})
})