diff --git a/packages/opencode/test/bus/bus-effect.test.ts b/packages/opencode/test/bus/bus-effect.test.ts index 377c541096..0dc58ecdae 100644 --- a/packages/opencode/test/bus/bus-effect.test.ts +++ b/packages/opencode/test/bus/bus-effect.test.ts @@ -1,5 +1,5 @@ import { describe, expect } from "bun:test" -import { Deferred, Effect, Layer, Schema, Stream } from "effect" +import { Deferred, Effect, Fiber, Latch, Layer, Schema, Stream } from "effect" import { Bus } from "../../src/bus" import { BusEvent } from "../../src/bus/bus-event" import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" @@ -9,6 +9,7 @@ import { testEffect } from "../lib/effect" const TestEvent = { Ping: BusEvent.define("test.effect.ping", Schema.Struct({ value: Schema.Number })), Pong: BusEvent.define("test.effect.pong", Schema.Struct({ message: Schema.String })), + Warmup: BusEvent.define("test.effect.warmup", Schema.Struct({})), } const node = CrossSpawnSpawner.defaultLayer @@ -17,21 +18,44 @@ const live = Layer.mergeAll(Bus.layer, node) const it = testEffect(live) +// Publishes warmup events until the latch opens, proving the forked subscriber +// fiber has actually wired up its PubSub subscription. +const awaitSubscriberReady = Effect.fn("test.awaitSubscriberReady")(function* ( + ready: Latch.Latch, + warmup: Effect.Effect, +) { + const pump = yield* Effect.forkScoped( + Effect.gen(function* () { + while (true) { + yield* warmup + yield* Effect.sleep("5 millis") + } + }), + ) + yield* ready.await.pipe(Effect.timeout("2 seconds")) + yield* Fiber.interrupt(pump) +}) + describe("Bus (Effect-native)", () => { it.instance("publish + subscribe stream delivers events", () => Effect.gen(function* () { const bus = yield* Bus.Service const received: number[] = [] const done = yield* Deferred.make() + const ready = yield* Latch.make() yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => - Effect.sync(() => { + Effect.gen(function* () { + if (evt.properties.value < 0) { + yield* ready.open + return + } received.push(evt.properties.value) if (received.length === 2) Deferred.doneUnsafe(done, Effect.void) }), ).pipe(Effect.forkScoped) - yield* Effect.sleep("10 millis") + yield* awaitSubscriberReady(ready, bus.publish(TestEvent.Ping, { value: -1 })) yield* bus.publish(TestEvent.Ping, { value: 1 }) yield* bus.publish(TestEvent.Ping, { value: 2 }) yield* Deferred.await(done) @@ -45,15 +69,20 @@ describe("Bus (Effect-native)", () => { const bus = yield* Bus.Service const pings: number[] = [] const done = yield* Deferred.make() + const ready = yield* Latch.make() yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => - Effect.sync(() => { + Effect.gen(function* () { + if (evt.properties.value < 0) { + yield* ready.open + return + } pings.push(evt.properties.value) Deferred.doneUnsafe(done, Effect.void) }), ).pipe(Effect.forkScoped) - yield* Effect.sleep("10 millis") + yield* awaitSubscriberReady(ready, bus.publish(TestEvent.Ping, { value: -1 })) yield* bus.publish(TestEvent.Pong, { message: "ignored" }) yield* bus.publish(TestEvent.Ping, { value: 42 }) yield* Deferred.await(done) @@ -67,15 +96,20 @@ describe("Bus (Effect-native)", () => { const bus = yield* Bus.Service const types: string[] = [] const done = yield* Deferred.make() + const ready = yield* Latch.make() yield* Stream.runForEach(bus.subscribeAll(), (evt) => - Effect.sync(() => { + Effect.gen(function* () { + if (evt.type === TestEvent.Warmup.type) { + yield* ready.open + return + } types.push(evt.type) if (types.length === 2) Deferred.doneUnsafe(done, Effect.void) }), ).pipe(Effect.forkScoped) - yield* Effect.sleep("10 millis") + yield* awaitSubscriberReady(ready, bus.publish(TestEvent.Warmup, {})) yield* bus.publish(TestEvent.Ping, { value: 1 }) yield* bus.publish(TestEvent.Pong, { message: "hi" }) yield* Deferred.await(done) @@ -92,22 +126,33 @@ describe("Bus (Effect-native)", () => { const b: number[] = [] const doneA = yield* Deferred.make() const doneB = yield* Deferred.make() + const readyA = yield* Latch.make() + const readyB = yield* Latch.make() yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => - Effect.sync(() => { + Effect.gen(function* () { + if (evt.properties.value < 0) { + yield* readyA.open + return + } a.push(evt.properties.value) Deferred.doneUnsafe(doneA, Effect.void) }), ).pipe(Effect.forkScoped) yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => - Effect.sync(() => { + Effect.gen(function* () { + if (evt.properties.value < 0) { + yield* readyB.open + return + } b.push(evt.properties.value) Deferred.doneUnsafe(doneB, Effect.void) }), ).pipe(Effect.forkScoped) - yield* Effect.sleep("10 millis") + yield* awaitSubscriberReady(readyA, bus.publish(TestEvent.Ping, { value: -1 })) + yield* awaitSubscriberReady(readyB, bus.publish(TestEvent.Ping, { value: -1 })) yield* bus.publish(TestEvent.Ping, { value: 99 }) yield* Deferred.await(doneA) yield* Deferred.await(doneB) @@ -123,20 +168,25 @@ describe("Bus (Effect-native)", () => { const types: string[] = [] const seen = yield* Deferred.make() const disposed = yield* Deferred.make() + const ready = yield* Latch.make() // Set up subscriber inside the instance yield* Effect.gen(function* () { const bus = yield* Bus.Service yield* Stream.runForEach(bus.subscribeAll(), (evt) => - Effect.sync(() => { + Effect.gen(function* () { + if (evt.type === TestEvent.Warmup.type) { + yield* ready.open + return + } types.push(evt.type) if (evt.type === TestEvent.Ping.type) Deferred.doneUnsafe(seen, Effect.void) if (evt.type === Bus.InstanceDisposed.type) Deferred.doneUnsafe(disposed, Effect.void) }), ).pipe(Effect.forkScoped) - yield* Effect.sleep("10 millis") + yield* awaitSubscriberReady(ready, bus.publish(TestEvent.Warmup, {})) yield* bus.publish(TestEvent.Ping, { value: 1 }) yield* Deferred.await(seen) }).pipe(provideInstance(dir))