test(bus): fix flaky subscriber races with readiness latch (#27625)

This commit is contained in:
Kit Langton
2026-05-14 19:32:25 -04:00
committed by GitHub
parent 302ba0ca0b
commit 273ab56949

View File

@@ -1,5 +1,5 @@
import { describe, expect } from "bun:test"
import { Deferred, Effect, Layer, Schema, Stream } from "effect"
import { Deferred, Effect, Fiber, Latch, Layer, Schema, Stream } from "effect"
import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
@@ -9,6 +9,7 @@ import { testEffect } from "../lib/effect"
const TestEvent = {
Ping: BusEvent.define("test.effect.ping", Schema.Struct({ value: Schema.Number })),
Pong: BusEvent.define("test.effect.pong", Schema.Struct({ message: Schema.String })),
Warmup: BusEvent.define("test.effect.warmup", Schema.Struct({})),
}
const node = CrossSpawnSpawner.defaultLayer
@@ -17,21 +18,44 @@ const live = Layer.mergeAll(Bus.layer, node)
const it = testEffect(live)
// Publishes warmup events until the latch opens, proving the forked subscriber
// fiber has actually wired up its PubSub subscription.
const awaitSubscriberReady = Effect.fn("test.awaitSubscriberReady")(function* (
ready: Latch.Latch,
warmup: Effect.Effect<void>,
) {
const pump = yield* Effect.forkScoped(
Effect.gen(function* () {
while (true) {
yield* warmup
yield* Effect.sleep("5 millis")
}
}),
)
yield* ready.await.pipe(Effect.timeout("2 seconds"))
yield* Fiber.interrupt(pump)
})
describe("Bus (Effect-native)", () => {
it.instance("publish + subscribe stream delivers events", () =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const received: number[] = []
const done = yield* Deferred.make<void>()
const ready = yield* Latch.make()
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
Effect.gen(function* () {
if (evt.properties.value < 0) {
yield* ready.open
return
}
received.push(evt.properties.value)
if (received.length === 2) Deferred.doneUnsafe(done, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* awaitSubscriberReady(ready, bus.publish(TestEvent.Ping, { value: -1 }))
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* bus.publish(TestEvent.Ping, { value: 2 })
yield* Deferred.await(done)
@@ -45,15 +69,20 @@ describe("Bus (Effect-native)", () => {
const bus = yield* Bus.Service
const pings: number[] = []
const done = yield* Deferred.make<void>()
const ready = yield* Latch.make()
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
Effect.gen(function* () {
if (evt.properties.value < 0) {
yield* ready.open
return
}
pings.push(evt.properties.value)
Deferred.doneUnsafe(done, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* awaitSubscriberReady(ready, bus.publish(TestEvent.Ping, { value: -1 }))
yield* bus.publish(TestEvent.Pong, { message: "ignored" })
yield* bus.publish(TestEvent.Ping, { value: 42 })
yield* Deferred.await(done)
@@ -67,15 +96,20 @@ describe("Bus (Effect-native)", () => {
const bus = yield* Bus.Service
const types: string[] = []
const done = yield* Deferred.make<void>()
const ready = yield* Latch.make()
yield* Stream.runForEach(bus.subscribeAll(), (evt) =>
Effect.sync(() => {
Effect.gen(function* () {
if (evt.type === TestEvent.Warmup.type) {
yield* ready.open
return
}
types.push(evt.type)
if (types.length === 2) Deferred.doneUnsafe(done, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* awaitSubscriberReady(ready, bus.publish(TestEvent.Warmup, {}))
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* bus.publish(TestEvent.Pong, { message: "hi" })
yield* Deferred.await(done)
@@ -92,22 +126,33 @@ describe("Bus (Effect-native)", () => {
const b: number[] = []
const doneA = yield* Deferred.make<void>()
const doneB = yield* Deferred.make<void>()
const readyA = yield* Latch.make()
const readyB = yield* Latch.make()
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
Effect.gen(function* () {
if (evt.properties.value < 0) {
yield* readyA.open
return
}
a.push(evt.properties.value)
Deferred.doneUnsafe(doneA, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
Effect.gen(function* () {
if (evt.properties.value < 0) {
yield* readyB.open
return
}
b.push(evt.properties.value)
Deferred.doneUnsafe(doneB, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* awaitSubscriberReady(readyA, bus.publish(TestEvent.Ping, { value: -1 }))
yield* awaitSubscriberReady(readyB, bus.publish(TestEvent.Ping, { value: -1 }))
yield* bus.publish(TestEvent.Ping, { value: 99 })
yield* Deferred.await(doneA)
yield* Deferred.await(doneB)
@@ -123,20 +168,25 @@ describe("Bus (Effect-native)", () => {
const types: string[] = []
const seen = yield* Deferred.make<void>()
const disposed = yield* Deferred.make<void>()
const ready = yield* Latch.make()
// Set up subscriber inside the instance
yield* Effect.gen(function* () {
const bus = yield* Bus.Service
yield* Stream.runForEach(bus.subscribeAll(), (evt) =>
Effect.sync(() => {
Effect.gen(function* () {
if (evt.type === TestEvent.Warmup.type) {
yield* ready.open
return
}
types.push(evt.type)
if (evt.type === TestEvent.Ping.type) Deferred.doneUnsafe(seen, Effect.void)
if (evt.type === Bus.InstanceDisposed.type) Deferred.doneUnsafe(disposed, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* awaitSubscriberReady(ready, bus.publish(TestEvent.Warmup, {}))
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* Deferred.await(seen)
}).pipe(provideInstance(dir))