diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 464af05c72..db6327c82e 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -131,7 +131,6 @@ export namespace Bus { try: () => Promise.resolve().then(() => callback(msg)), catch: (cause) => { log.error("subscriber failed", { type, cause }) - return cause }, }).pipe(Effect.ignore), ), diff --git a/packages/opencode/test/bus/bus-effect.test.ts b/packages/opencode/test/bus/bus-effect.test.ts index cc44f9ef78..642763e90f 100644 --- a/packages/opencode/test/bus/bus-effect.test.ts +++ b/packages/opencode/test/bus/bus-effect.test.ts @@ -4,7 +4,8 @@ import { Deferred, Effect, Layer, Stream } from "effect" import z from "zod" import { Bus } from "../../src/bus" import { BusEvent } from "../../src/bus/bus-event" -import { provideTmpdirInstance } from "../fixture/fixture" +import { Instance } from "../../src/project/instance" +import { provideInstance, provideTmpdirInstance, tmpdirScoped } from "../fixture/fixture" import { testEffect } from "../lib/effect" const TestEvent = { @@ -127,4 +128,37 @@ describe("Bus (Effect-native)", () => { }), ), ) + + it.effect("subscribeAll stream sees InstanceDisposed on disposal", () => + Effect.gen(function* () { + const dir = yield* tmpdirScoped() + const types: string[] = [] + const seen = yield* Deferred.make() + const disposed = yield* Deferred.make() + + // Set up subscriber inside the instance + yield* Effect.gen(function* () { + const bus = yield* Bus.Service + + yield* Stream.runForEach(bus.subscribeAll(), (evt) => + Effect.sync(() => { + types.push(evt.type) + if (evt.type === TestEvent.Ping.type) Deferred.doneUnsafe(seen, Effect.void) + if (evt.type === Bus.InstanceDisposed.type) Deferred.doneUnsafe(disposed, Effect.void) + }), + ).pipe(Effect.forkScoped) + + yield* Effect.sleep("10 millis") + yield* bus.publish(TestEvent.Ping, { value: 1 }) + yield* Deferred.await(seen) + }).pipe(provideInstance(dir)) + + // Dispose from OUTSIDE the instance scope + yield* Effect.promise(() => Instance.disposeAll()) + yield* Deferred.await(disposed).pipe(Effect.timeout("2 seconds")) + + expect(types).toContain("test.effect.ping") + expect(types).toContain(Bus.InstanceDisposed.type) + }), + ) })