diff --git a/packages/opencode/test/bus/bus.test.ts b/packages/opencode/test/bus/bus.test.ts index 876cb1ed74..0844986162 100644 --- a/packages/opencode/test/bus/bus.test.ts +++ b/packages/opencode/test/bus/bus.test.ts @@ -1,220 +1,240 @@ -import { afterEach, describe, expect, test } from "bun:test" -import { Schema } from "effect" +import { afterEach, describe, expect } from "bun:test" +import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" +import { Deferred, Effect, Layer, Schema } from "effect" import { Bus } from "../../src/bus" import { BusEvent } from "../../src/bus/bus-event" -import { Instance } from "../../src/project/instance" -import { WithInstance } from "../../src/project/with-instance" -import { disposeAllInstances, tmpdir } from "../fixture/fixture" +import { disposeAllInstances, provideInstance, tmpdirScoped } from "../fixture/fixture" +import { testEffect } from "../lib/effect" const TestEvent = { Ping: BusEvent.define("test.ping", Schema.Struct({ value: Schema.Number })), Pong: BusEvent.define("test.pong", Schema.Struct({ message: Schema.String })), } -function withInstance(directory: string, fn: () => Promise) { - return WithInstance.provide({ directory, fn }) -} +const it = testEffect(Layer.mergeAll(Bus.layer, CrossSpawnSpawner.defaultLayer)) describe("Bus", () => { afterEach(() => disposeAllInstances()) describe("publish + subscribe", () => { - test("subscriber is live immediately after subscribe returns", async () => { - await using tmp = await tmpdir() - const received: number[] = [] + it.instance("subscriber is live immediately after subscribe returns", () => + Effect.gen(function* () { + const bus = yield* Bus.Service + const received: number[] = [] + const done = yield* Deferred.make() - await withInstance(tmp.path, async () => { - Bus.subscribe(TestEvent.Ping, (evt) => { + yield* bus.subscribeCallback(TestEvent.Ping, (evt) => { received.push(evt.properties.value) + Deferred.doneUnsafe(done, Effect.void) }) - await Bus.publish(TestEvent.Ping, { value: 42 }) - await Bun.sleep(10) - }) + yield* bus.publish(TestEvent.Ping, { value: 42 }) + yield* Deferred.await(done).pipe(Effect.timeout("2 seconds")) - expect(received).toEqual([42]) - }) + expect(received).toEqual([42]) + }), + ) - test("subscriber receives matching events", async () => { - await using tmp = await tmpdir() - const received: number[] = [] + it.instance("subscriber receives matching events", () => + Effect.gen(function* () { + const bus = yield* Bus.Service + const received: number[] = [] + const done = yield* Deferred.make() - await withInstance(tmp.path, async () => { - Bus.subscribe(TestEvent.Ping, (evt) => { + yield* bus.subscribeCallback(TestEvent.Ping, (evt) => { received.push(evt.properties.value) + if (received.length === 2) Deferred.doneUnsafe(done, Effect.void) }) - // Give the subscriber fiber time to start consuming - await Bun.sleep(10) - await Bus.publish(TestEvent.Ping, { value: 42 }) - await Bus.publish(TestEvent.Ping, { value: 99 }) - // Give subscriber time to process - await Bun.sleep(10) - }) + yield* bus.publish(TestEvent.Ping, { value: 42 }) + yield* bus.publish(TestEvent.Ping, { value: 99 }) + yield* Deferred.await(done).pipe(Effect.timeout("2 seconds")) - expect(received).toEqual([42, 99]) - }) + expect(received).toEqual([42, 99]) + }), + ) - test("subscriber does not receive events of other types", async () => { - await using tmp = await tmpdir() - const pings: number[] = [] + it.instance("subscriber does not receive events of other types", () => + Effect.gen(function* () { + const bus = yield* Bus.Service + const pings: number[] = [] + const done = yield* Deferred.make() - await withInstance(tmp.path, async () => { - Bus.subscribe(TestEvent.Ping, (evt) => { + yield* bus.subscribeCallback(TestEvent.Ping, (evt) => { pings.push(evt.properties.value) + Deferred.doneUnsafe(done, Effect.void) }) - await Bun.sleep(10) - await Bus.publish(TestEvent.Pong, { message: "hello" }) - await Bus.publish(TestEvent.Ping, { value: 1 }) - await Bun.sleep(10) - }) + yield* bus.publish(TestEvent.Pong, { message: "hello" }) + yield* bus.publish(TestEvent.Ping, { value: 1 }) + yield* Deferred.await(done).pipe(Effect.timeout("2 seconds")) - expect(pings).toEqual([1]) - }) + expect(pings).toEqual([1]) + }), + ) - test("publish with no subscribers does not throw", async () => { - await using tmp = await tmpdir() - - await withInstance(tmp.path, async () => { - await Bus.publish(TestEvent.Ping, { value: 1 }) - }) - }) + it.instance("publish with no subscribers does not throw", () => + Effect.gen(function* () { + const bus = yield* Bus.Service + yield* bus.publish(TestEvent.Ping, { value: 1 }) + }), + ) }) describe("unsubscribe", () => { - test("unsubscribe stops delivery", async () => { - await using tmp = await tmpdir() - const received: number[] = [] + it.instance("unsubscribe stops delivery", () => + Effect.gen(function* () { + const bus = yield* Bus.Service + const received: number[] = [] + const first = yield* Deferred.make() - await withInstance(tmp.path, async () => { - const unsub = Bus.subscribe(TestEvent.Ping, (evt) => { + const unsub = yield* bus.subscribeCallback(TestEvent.Ping, (evt) => { received.push(evt.properties.value) + if (evt.properties.value === 1) Deferred.doneUnsafe(first, Effect.void) }) - await Bun.sleep(10) - await Bus.publish(TestEvent.Ping, { value: 1 }) - await Bun.sleep(10) - unsub() - await Bun.sleep(10) - await Bus.publish(TestEvent.Ping, { value: 2 }) - await Bun.sleep(10) - }) + yield* bus.publish(TestEvent.Ping, { value: 1 }) + yield* Deferred.await(first).pipe(Effect.timeout("2 seconds")) + yield* Effect.sync(unsub) + yield* bus.publish(TestEvent.Ping, { value: 2 }) + yield* Effect.sleep("10 millis") - expect(received).toEqual([1]) - }) + expect(received).toEqual([1]) + }), + ) }) describe("subscribeAll", () => { - test("subscribeAll is live immediately after subscribe returns", async () => { - await using tmp = await tmpdir() - const received: string[] = [] + it.instance("subscribeAll is live immediately after subscribe returns", () => + Effect.gen(function* () { + const bus = yield* Bus.Service + const received: string[] = [] + const done = yield* Deferred.make() - await withInstance(tmp.path, async () => { - Bus.subscribeAll((evt) => { + yield* bus.subscribeAllCallback((evt) => { received.push(evt.type) + Deferred.doneUnsafe(done, Effect.void) }) - await Bus.publish(TestEvent.Ping, { value: 1 }) - await Bun.sleep(10) - }) + yield* bus.publish(TestEvent.Ping, { value: 1 }) + yield* Deferred.await(done).pipe(Effect.timeout("2 seconds")) - expect(received).toEqual(["test.ping"]) - }) + expect(received).toEqual(["test.ping"]) + }), + ) - test("receives all event types", async () => { - await using tmp = await tmpdir() - const received: string[] = [] + it.instance("receives all event types", () => + Effect.gen(function* () { + const bus = yield* Bus.Service + const received: string[] = [] + const done = yield* Deferred.make() - await withInstance(tmp.path, async () => { - Bus.subscribeAll((evt) => { + yield* bus.subscribeAllCallback((evt) => { received.push(evt.type) + if (received.length === 2) Deferred.doneUnsafe(done, Effect.void) }) - await Bun.sleep(10) - await Bus.publish(TestEvent.Ping, { value: 1 }) - await Bus.publish(TestEvent.Pong, { message: "hi" }) - await Bun.sleep(10) - }) + yield* bus.publish(TestEvent.Ping, { value: 1 }) + yield* bus.publish(TestEvent.Pong, { message: "hi" }) + yield* Deferred.await(done).pipe(Effect.timeout("2 seconds")) - expect(received).toContain("test.ping") - expect(received).toContain("test.pong") - }) + expect(received).toContain("test.ping") + expect(received).toContain("test.pong") + }), + ) }) describe("multiple subscribers", () => { - test("all subscribers for same event type are called", async () => { - await using tmp = await tmpdir() - const a: number[] = [] - const b: number[] = [] + it.instance("all subscribers for same event type are called", () => + Effect.gen(function* () { + const bus = yield* Bus.Service + const a: number[] = [] + const b: number[] = [] + const doneA = yield* Deferred.make() + const doneB = yield* Deferred.make() - await withInstance(tmp.path, async () => { - Bus.subscribe(TestEvent.Ping, (evt) => { + yield* bus.subscribeCallback(TestEvent.Ping, (evt) => { a.push(evt.properties.value) + Deferred.doneUnsafe(doneA, Effect.void) }) - Bus.subscribe(TestEvent.Ping, (evt) => { + yield* bus.subscribeCallback(TestEvent.Ping, (evt) => { b.push(evt.properties.value) + Deferred.doneUnsafe(doneB, Effect.void) }) - await Bun.sleep(10) - await Bus.publish(TestEvent.Ping, { value: 7 }) - await Bun.sleep(10) - }) + yield* bus.publish(TestEvent.Ping, { value: 7 }) + yield* Deferred.await(doneA).pipe(Effect.timeout("2 seconds")) + yield* Deferred.await(doneB).pipe(Effect.timeout("2 seconds")) - expect(a).toEqual([7]) - expect(b).toEqual([7]) - }) + expect(a).toEqual([7]) + expect(b).toEqual([7]) + }), + ) }) describe("instance isolation", () => { - test("events in one directory do not reach subscribers in another", async () => { - await using tmpA = await tmpdir() - await using tmpB = await tmpdir() - const receivedA: number[] = [] - const receivedB: number[] = [] + it.live("events in one directory do not reach subscribers in another", () => + Effect.gen(function* () { + const tmpA = yield* tmpdirScoped() + const tmpB = yield* tmpdirScoped() + const receivedA: number[] = [] + const receivedB: number[] = [] + const doneA = yield* Deferred.make() + const doneB = yield* Deferred.make() - await withInstance(tmpA.path, async () => { - Bus.subscribe(TestEvent.Ping, (evt) => { - receivedA.push(evt.properties.value) - }) - await Bun.sleep(10) - }) + yield* Effect.gen(function* () { + const bus = yield* Bus.Service + yield* bus.subscribeCallback(TestEvent.Ping, (evt) => { + receivedA.push(evt.properties.value) + Deferred.doneUnsafe(doneA, Effect.void) + }) + }).pipe(provideInstance(tmpA)) - await withInstance(tmpB.path, async () => { - Bus.subscribe(TestEvent.Ping, (evt) => { - receivedB.push(evt.properties.value) - }) - await Bun.sleep(10) - }) + yield* Effect.gen(function* () { + const bus = yield* Bus.Service + yield* bus.subscribeCallback(TestEvent.Ping, (evt) => { + receivedB.push(evt.properties.value) + Deferred.doneUnsafe(doneB, Effect.void) + }) + }).pipe(provideInstance(tmpB)) - await withInstance(tmpA.path, async () => { - await Bus.publish(TestEvent.Ping, { value: 1 }) - await Bun.sleep(10) - }) + yield* Effect.gen(function* () { + const bus = yield* Bus.Service + yield* bus.publish(TestEvent.Ping, { value: 1 }) + }).pipe(provideInstance(tmpA)) - await withInstance(tmpB.path, async () => { - await Bus.publish(TestEvent.Ping, { value: 2 }) - await Bun.sleep(10) - }) + yield* Effect.gen(function* () { + const bus = yield* Bus.Service + yield* bus.publish(TestEvent.Ping, { value: 2 }) + }).pipe(provideInstance(tmpB)) - expect(receivedA).toEqual([1]) - expect(receivedB).toEqual([2]) - }) + yield* Deferred.await(doneA).pipe(Effect.timeout("2 seconds")) + yield* Deferred.await(doneB).pipe(Effect.timeout("2 seconds")) + + expect(receivedA).toEqual([1]) + expect(receivedB).toEqual([2]) + }), + ) }) describe("instance disposal", () => { - test("InstanceDisposed is delivered to wildcard subscribers before stream ends", async () => { - await using tmp = await tmpdir() - const received: string[] = [] + it.live("InstanceDisposed is delivered to wildcard subscribers before stream ends", () => + Effect.gen(function* () { + const tmp = yield* tmpdirScoped() + const received: string[] = [] + const seen = yield* Deferred.make() + const disposed = yield* Deferred.make() - await withInstance(tmp.path, async () => { - Bus.subscribeAll((evt) => { - received.push(evt.type) - }) - await Bun.sleep(10) - await Bus.publish(TestEvent.Ping, { value: 1 }) - await Bun.sleep(10) - }) + yield* Effect.gen(function* () { + const bus = yield* Bus.Service + yield* bus.subscribeAllCallback((evt) => { + received.push(evt.type) + if (evt.type === TestEvent.Ping.type) Deferred.doneUnsafe(seen, Effect.void) + if (evt.type === Bus.InstanceDisposed.type) Deferred.doneUnsafe(disposed, Effect.void) + }) + yield* bus.publish(TestEvent.Ping, { value: 1 }) + yield* Deferred.await(seen).pipe(Effect.timeout("2 seconds")) + }).pipe(provideInstance(tmp)) - // disposeAllInstances triggers the finalizer which publishes InstanceDisposed - await disposeAllInstances() - await Bun.sleep(50) + yield* Effect.promise(disposeAllInstances) + yield* Deferred.await(disposed).pipe(Effect.timeout("2 seconds")) - expect(received).toContain("test.ping") - expect(received).toContain(Bus.InstanceDisposed.type) - }) + expect(received).toContain("test.ping") + expect(received).toContain(Bus.InstanceDisposed.type) + }), + ) }) })