test(bus): migrate integration tests to Effect runner (#27132)

This commit is contained in:
Kit Langton
2026-05-12 15:58:54 -04:00
committed by GitHub
parent f7dbb4dac4
commit e0d0fe1ff7

View File

@@ -1,88 +1,88 @@
import { afterEach, describe, expect, test } from "bun:test"
import { Schema } from "effect"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { afterEach, describe, expect } from "bun:test"
import { Deferred, Effect, Layer, Schema } from "effect"
import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
import { Instance } from "../../src/project/instance"
import { WithInstance } from "../../src/project/with-instance"
import { disposeAllInstances, tmpdir } from "../fixture/fixture"
import { disposeAllInstances, provideInstance, tmpdirScoped } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
const TestEvent = BusEvent.define("test.integration", Schema.Struct({ value: Schema.Number }))
function withInstance(directory: string, fn: () => Promise<void>) {
return WithInstance.provide({ directory, fn })
}
const it = testEffect(Layer.mergeAll(Bus.layer, CrossSpawnSpawner.defaultLayer))
describe("Bus integration: acquireRelease subscriber pattern", () => {
afterEach(() => disposeAllInstances())
test("subscriber via callback facade receives events and cleans up on unsub", async () => {
await using tmp = await tmpdir()
const received: number[] = []
it.instance("subscriber via callback facade receives events and cleans up on unsub", () =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const received: number[] = []
const receivedTwo = yield* Deferred.make<void>()
await withInstance(tmp.path, async () => {
const unsub = Bus.subscribe(TestEvent, (evt) => {
const unsub = yield* bus.subscribeCallback(TestEvent, (evt) => {
received.push(evt.properties.value)
if (received.length === 2) Deferred.doneUnsafe(receivedTwo, Effect.void)
})
await Bun.sleep(10)
await Bus.publish(TestEvent, { value: 1 })
await Bus.publish(TestEvent, { value: 2 })
await Bun.sleep(10)
yield* bus.publish(TestEvent, { value: 1 })
yield* bus.publish(TestEvent, { value: 2 })
yield* Deferred.await(receivedTwo).pipe(Effect.timeout("2 seconds"))
expect(received).toEqual([1, 2])
unsub()
await Bun.sleep(10)
await Bus.publish(TestEvent, { value: 3 })
await Bun.sleep(10)
yield* Effect.sync(unsub)
yield* bus.publish(TestEvent, { value: 3 })
yield* Effect.sleep("10 millis")
expect(received).toEqual([1, 2])
})
})
}),
)
test("subscribeAll receives events from multiple types", async () => {
await using tmp = await tmpdir()
const received: Array<{ type: string; value?: number }> = []
it.instance("subscribeAll receives events from multiple types", () =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const received: Array<{ type: string; value?: number }> = []
const OtherEvent = BusEvent.define("test.other", Schema.Struct({ value: Schema.Number }))
const receivedTwo = yield* Deferred.make<void>()
const OtherEvent = BusEvent.define("test.other", Schema.Struct({ value: Schema.Number }))
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
yield* bus.subscribeAllCallback((evt) => {
received.push({ type: evt.type, value: evt.properties.value })
if (received.length === 2) Deferred.doneUnsafe(receivedTwo, Effect.void)
})
await Bun.sleep(10)
await Bus.publish(TestEvent, { value: 10 })
await Bus.publish(OtherEvent, { value: 20 })
await Bun.sleep(10)
})
yield* bus.publish(TestEvent, { value: 10 })
yield* bus.publish(OtherEvent, { value: 20 })
yield* Deferred.await(receivedTwo).pipe(Effect.timeout("2 seconds"))
expect(received).toEqual([
{ type: "test.integration", value: 10 },
{ type: "test.other", value: 20 },
])
})
expect(received).toEqual([
{ type: "test.integration", value: 10 },
{ type: "test.other", value: 20 },
])
}),
)
test("subscriber cleanup on instance disposal interrupts the stream", async () => {
await using tmp = await tmpdir()
const received: number[] = []
let disposed = false
it.live("subscriber cleanup on instance disposal interrupts the stream", () =>
Effect.gen(function* () {
const dir = yield* tmpdirScoped()
const received: number[] = []
const seen = yield* Deferred.make<void>()
const disposed = yield* Deferred.make<void>()
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
if (evt.type === Bus.InstanceDisposed.type) {
disposed = true
return
}
received.push(evt.properties.value)
})
await Bun.sleep(10)
await Bus.publish(TestEvent, { value: 1 })
await Bun.sleep(10)
})
yield* Effect.gen(function* () {
const bus = yield* Bus.Service
yield* bus.subscribeAllCallback((evt) => {
if (evt.type === Bus.InstanceDisposed.type) {
Deferred.doneUnsafe(disposed, Effect.void)
return
}
received.push(evt.properties.value)
Deferred.doneUnsafe(seen, Effect.void)
})
yield* bus.publish(TestEvent, { value: 1 })
yield* Deferred.await(seen).pipe(Effect.timeout("2 seconds"))
}).pipe(provideInstance(dir))
await disposeAllInstances()
await Bun.sleep(50)
yield* Effect.promise(() => disposeAllInstances())
yield* Deferred.await(disposed).pipe(Effect.timeout("2 seconds"))
expect(received).toEqual([1])
expect(disposed).toBe(true)
})
expect(received).toEqual([1])
}),
)
})