From e0d0fe1ff79b2e68fbf496f3957b63aa5bcd53a8 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Tue, 12 May 2026 15:58:54 -0400 Subject: [PATCH] test(bus): migrate integration tests to Effect runner (#27132) --- .../opencode/test/bus/bus-integration.test.ts | 124 +++++++++--------- 1 file changed, 62 insertions(+), 62 deletions(-) diff --git a/packages/opencode/test/bus/bus-integration.test.ts b/packages/opencode/test/bus/bus-integration.test.ts index 3e3d7a3e90..645a94fb3b 100644 --- a/packages/opencode/test/bus/bus-integration.test.ts +++ b/packages/opencode/test/bus/bus-integration.test.ts @@ -1,88 +1,88 @@ -import { afterEach, describe, expect, test } from "bun:test" -import { Schema } from "effect" +import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" +import { afterEach, describe, expect } from "bun:test" +import { Deferred, Effect, Layer, Schema } from "effect" import { Bus } from "../../src/bus" import { BusEvent } from "../../src/bus/bus-event" -import { Instance } from "../../src/project/instance" -import { WithInstance } from "../../src/project/with-instance" -import { disposeAllInstances, tmpdir } from "../fixture/fixture" +import { disposeAllInstances, provideInstance, tmpdirScoped } from "../fixture/fixture" +import { testEffect } from "../lib/effect" const TestEvent = BusEvent.define("test.integration", Schema.Struct({ value: Schema.Number })) - -function withInstance(directory: string, fn: () => Promise) { - return WithInstance.provide({ directory, fn }) -} +const it = testEffect(Layer.mergeAll(Bus.layer, CrossSpawnSpawner.defaultLayer)) describe("Bus integration: acquireRelease subscriber pattern", () => { afterEach(() => disposeAllInstances()) - test("subscriber via callback facade receives events and cleans up on unsub", async () => { - await using tmp = await tmpdir() - const received: number[] = [] + it.instance("subscriber via callback facade receives events and cleans up on unsub", () => + Effect.gen(function* () { + const bus = yield* Bus.Service + const received: number[] = [] + const receivedTwo = yield* Deferred.make() - await withInstance(tmp.path, async () => { - const unsub = Bus.subscribe(TestEvent, (evt) => { + const unsub = yield* bus.subscribeCallback(TestEvent, (evt) => { received.push(evt.properties.value) + if (received.length === 2) Deferred.doneUnsafe(receivedTwo, Effect.void) }) - await Bun.sleep(10) - await Bus.publish(TestEvent, { value: 1 }) - await Bus.publish(TestEvent, { value: 2 }) - await Bun.sleep(10) + yield* bus.publish(TestEvent, { value: 1 }) + yield* bus.publish(TestEvent, { value: 2 }) + yield* Deferred.await(receivedTwo).pipe(Effect.timeout("2 seconds")) expect(received).toEqual([1, 2]) - unsub() - await Bun.sleep(10) - await Bus.publish(TestEvent, { value: 3 }) - await Bun.sleep(10) + yield* Effect.sync(unsub) + yield* bus.publish(TestEvent, { value: 3 }) + yield* Effect.sleep("10 millis") expect(received).toEqual([1, 2]) - }) - }) + }), + ) - test("subscribeAll receives events from multiple types", async () => { - await using tmp = await tmpdir() - const received: Array<{ type: string; value?: number }> = [] + it.instance("subscribeAll receives events from multiple types", () => + Effect.gen(function* () { + const bus = yield* Bus.Service + const received: Array<{ type: string; value?: number }> = [] + const OtherEvent = BusEvent.define("test.other", Schema.Struct({ value: Schema.Number })) + const receivedTwo = yield* Deferred.make() - const OtherEvent = BusEvent.define("test.other", Schema.Struct({ value: Schema.Number })) - - await withInstance(tmp.path, async () => { - Bus.subscribeAll((evt) => { + yield* bus.subscribeAllCallback((evt) => { received.push({ type: evt.type, value: evt.properties.value }) + if (received.length === 2) Deferred.doneUnsafe(receivedTwo, Effect.void) }) - await Bun.sleep(10) - await Bus.publish(TestEvent, { value: 10 }) - await Bus.publish(OtherEvent, { value: 20 }) - await Bun.sleep(10) - }) + yield* bus.publish(TestEvent, { value: 10 }) + yield* bus.publish(OtherEvent, { value: 20 }) + yield* Deferred.await(receivedTwo).pipe(Effect.timeout("2 seconds")) - expect(received).toEqual([ - { type: "test.integration", value: 10 }, - { type: "test.other", value: 20 }, - ]) - }) + expect(received).toEqual([ + { type: "test.integration", value: 10 }, + { type: "test.other", value: 20 }, + ]) + }), + ) - test("subscriber cleanup on instance disposal interrupts the stream", async () => { - await using tmp = await tmpdir() - const received: number[] = [] - let disposed = false + it.live("subscriber cleanup on instance disposal interrupts the stream", () => + Effect.gen(function* () { + const dir = yield* tmpdirScoped() + const received: number[] = [] + const seen = yield* Deferred.make() + const disposed = yield* Deferred.make() - await withInstance(tmp.path, async () => { - Bus.subscribeAll((evt) => { - if (evt.type === Bus.InstanceDisposed.type) { - disposed = true - return - } - received.push(evt.properties.value) - }) - await Bun.sleep(10) - await Bus.publish(TestEvent, { value: 1 }) - await Bun.sleep(10) - }) + yield* Effect.gen(function* () { + const bus = yield* Bus.Service + yield* bus.subscribeAllCallback((evt) => { + if (evt.type === Bus.InstanceDisposed.type) { + Deferred.doneUnsafe(disposed, Effect.void) + return + } + received.push(evt.properties.value) + Deferred.doneUnsafe(seen, Effect.void) + }) + yield* bus.publish(TestEvent, { value: 1 }) + yield* Deferred.await(seen).pipe(Effect.timeout("2 seconds")) + }).pipe(provideInstance(dir)) - await disposeAllInstances() - await Bun.sleep(50) + yield* Effect.promise(() => disposeAllInstances()) + yield* Deferred.await(disposed).pipe(Effect.timeout("2 seconds")) - expect(received).toEqual([1]) - expect(disposed).toBe(true) - }) + expect(received).toEqual([1]) + }), + ) })