diff --git a/packages/opencode/test/bus/bus-effect.test.ts b/packages/opencode/test/bus/bus-effect.test.ts index 4c3b9d085d..cc44f9ef78 100644 --- a/packages/opencode/test/bus/bus-effect.test.ts +++ b/packages/opencode/test/bus/bus-effect.test.ts @@ -1,156 +1,130 @@ -import { afterEach, describe, expect, test } from "bun:test" -import { Deferred, Effect, ManagedRuntime, Stream } from "effect" +import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node" +import { describe, expect } from "bun:test" +import { Deferred, Effect, Layer, Stream } from "effect" import z from "zod" import { Bus } from "../../src/bus" import { BusEvent } from "../../src/bus/bus-event" -import { Instance } from "../../src/project/instance" -import { provideInstance, tmpdir } from "../fixture/fixture" +import { provideTmpdirInstance } from "../fixture/fixture" +import { testEffect } from "../lib/effect" const TestEvent = { Ping: BusEvent.define("test.effect.ping", z.object({ value: z.number() })), Pong: BusEvent.define("test.effect.pong", z.object({ message: z.string() })), } -async function runBus(directory: string, self: Effect.Effect) { - const rt = ManagedRuntime.make(Bus.layer) - try { - return await rt.runPromise(self.pipe(provideInstance(directory))) - } finally { - await rt.dispose() - } -} +const node = NodeChildProcessSpawner.layer.pipe( + Layer.provideMerge(Layer.mergeAll(NodeFileSystem.layer, NodePath.layer)), +) + +const live = Layer.mergeAll(Bus.layer, node) + +const it = testEffect(live) describe("Bus (Effect-native)", () => { - afterEach(() => Instance.disposeAll()) + it.effect("publish + subscribe stream delivers events", () => + provideTmpdirInstance(() => + Effect.gen(function* () { + const bus = yield* Bus.Service + const received: number[] = [] + const done = yield* Deferred.make() - test("publish + subscribe stream delivers events", async () => { - await using tmp = await tmpdir() - - await runBus( - tmp.path, - Effect.scoped( - Bus.Service.use((bus) => - Effect.gen(function* () { - const received: number[] = [] - const done = yield* Deferred.make() - - yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => - Effect.sync(() => { - received.push(evt.properties.value) - if (received.length === 2) Deferred.doneUnsafe(done, Effect.void) - }), - ).pipe(Effect.forkScoped) - - yield* Effect.sleep("10 millis") - yield* bus.publish(TestEvent.Ping, { value: 1 }) - yield* bus.publish(TestEvent.Ping, { value: 2 }) - yield* Deferred.await(done) - - expect(received).toEqual([1, 2]) + yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => + Effect.sync(() => { + received.push(evt.properties.value) + if (received.length === 2) Deferred.doneUnsafe(done, Effect.void) }), - ), - ), - ) - }) + ).pipe(Effect.forkScoped) - test("subscribe filters by event type", async () => { - await using tmp = await tmpdir() + yield* Effect.sleep("10 millis") + yield* bus.publish(TestEvent.Ping, { value: 1 }) + yield* bus.publish(TestEvent.Ping, { value: 2 }) + yield* Deferred.await(done) - await runBus( - tmp.path, - Effect.scoped( - Bus.Service.use((bus) => - Effect.gen(function* () { - const pings: number[] = [] - const done = yield* Deferred.make() + expect(received).toEqual([1, 2]) + }), + ), + ) - yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => - Effect.sync(() => { - pings.push(evt.properties.value) - Deferred.doneUnsafe(done, Effect.void) - }), - ).pipe(Effect.forkScoped) + it.effect("subscribe filters by event type", () => + provideTmpdirInstance(() => + Effect.gen(function* () { + const bus = yield* Bus.Service + const pings: number[] = [] + const done = yield* Deferred.make() - yield* Effect.sleep("10 millis") - yield* bus.publish(TestEvent.Pong, { message: "ignored" }) - yield* bus.publish(TestEvent.Ping, { value: 42 }) - yield* Deferred.await(done) - - expect(pings).toEqual([42]) + yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => + Effect.sync(() => { + pings.push(evt.properties.value) + Deferred.doneUnsafe(done, Effect.void) }), - ), - ), - ) - }) + ).pipe(Effect.forkScoped) - test("subscribeAll receives all types", async () => { - await using tmp = await tmpdir() + yield* Effect.sleep("10 millis") + yield* bus.publish(TestEvent.Pong, { message: "ignored" }) + yield* bus.publish(TestEvent.Ping, { value: 42 }) + yield* Deferred.await(done) - await runBus( - tmp.path, - Effect.scoped( - Bus.Service.use((bus) => - Effect.gen(function* () { - const types: string[] = [] - const done = yield* Deferred.make() + expect(pings).toEqual([42]) + }), + ), + ) - yield* Stream.runForEach(bus.subscribeAll(), (evt) => - Effect.sync(() => { - types.push(evt.type) - if (types.length === 2) Deferred.doneUnsafe(done, Effect.void) - }), - ).pipe(Effect.forkScoped) + it.effect("subscribeAll receives all types", () => + provideTmpdirInstance(() => + Effect.gen(function* () { + const bus = yield* Bus.Service + const types: string[] = [] + const done = yield* Deferred.make() - yield* Effect.sleep("10 millis") - yield* bus.publish(TestEvent.Ping, { value: 1 }) - yield* bus.publish(TestEvent.Pong, { message: "hi" }) - yield* Deferred.await(done) - - expect(types).toContain("test.effect.ping") - expect(types).toContain("test.effect.pong") + yield* Stream.runForEach(bus.subscribeAll(), (evt) => + Effect.sync(() => { + types.push(evt.type) + if (types.length === 2) Deferred.doneUnsafe(done, Effect.void) }), - ), - ), - ) - }) + ).pipe(Effect.forkScoped) - test("multiple subscribers each receive the event", async () => { - await using tmp = await tmpdir() + yield* Effect.sleep("10 millis") + yield* bus.publish(TestEvent.Ping, { value: 1 }) + yield* bus.publish(TestEvent.Pong, { message: "hi" }) + yield* Deferred.await(done) - await runBus( - tmp.path, - Effect.scoped( - Bus.Service.use((bus) => - Effect.gen(function* () { - const a: number[] = [] - const b: number[] = [] - const doneA = yield* Deferred.make() - const doneB = yield* Deferred.make() + expect(types).toContain("test.effect.ping") + expect(types).toContain("test.effect.pong") + }), + ), + ) - yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => - Effect.sync(() => { - a.push(evt.properties.value) - Deferred.doneUnsafe(doneA, Effect.void) - }), - ).pipe(Effect.forkScoped) + it.effect("multiple subscribers each receive the event", () => + provideTmpdirInstance(() => + Effect.gen(function* () { + const bus = yield* Bus.Service + const a: number[] = [] + const b: number[] = [] + const doneA = yield* Deferred.make() + const doneB = yield* Deferred.make() - yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => - Effect.sync(() => { - b.push(evt.properties.value) - Deferred.doneUnsafe(doneB, Effect.void) - }), - ).pipe(Effect.forkScoped) - - yield* Effect.sleep("10 millis") - yield* bus.publish(TestEvent.Ping, { value: 99 }) - yield* Deferred.await(doneA) - yield* Deferred.await(doneB) - - expect(a).toEqual([99]) - expect(b).toEqual([99]) + yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => + Effect.sync(() => { + a.push(evt.properties.value) + Deferred.doneUnsafe(doneA, Effect.void) }), - ), - ), - ) - }) + ).pipe(Effect.forkScoped) + + yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => + Effect.sync(() => { + b.push(evt.properties.value) + Deferred.doneUnsafe(doneB, Effect.void) + }), + ).pipe(Effect.forkScoped) + + yield* Effect.sleep("10 millis") + yield* bus.publish(TestEvent.Ping, { value: 99 }) + yield* Deferred.await(doneA) + yield* Deferred.await(doneB) + + expect(a).toEqual([99]) + expect(b).toEqual([99]) + }), + ), + ) }) diff --git a/packages/opencode/test/fixture/fixture.ts b/packages/opencode/test/fixture/fixture.ts index 776c6d52f6..a36a3f9d84 100644 --- a/packages/opencode/test/fixture/fixture.ts +++ b/packages/opencode/test/fixture/fixture.ts @@ -2,7 +2,7 @@ import { $ } from "bun" import * as fs from "fs/promises" import os from "os" import path from "path" -import { Effect, FileSystem } from "effect" +import { Effect, FileSystem, ServiceMap } from "effect" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" import type { Config } from "../../src/config/config" import { Instance } from "../../src/project/instance" @@ -75,7 +75,7 @@ export async function tmpdir(options?: TmpDirOptions) { return result } -/** Effectful scoped tmpdir — cleaned up when the scope closes */ +/** Effectful scoped tmpdir. Cleaned up when the scope closes. Make sure these stay in sync */ export function tmpdirScoped(options?: { git?: boolean; config?: Partial }) { return Effect.gen(function* () { const fs = yield* FileSystem.FileSystem @@ -83,9 +83,7 @@ export function tmpdirScoped(options?: { git?: boolean; config?: Partial - spawner - .spawn(ChildProcess.make("git", args, { cwd: dir })) - .pipe(Effect.flatMap((handle) => handle.exitCode)) + spawner.spawn(ChildProcess.make("git", args, { cwd: dir })).pipe(Effect.flatMap((handle) => handle.exitCode)) if (options?.git) { yield* git("init") @@ -109,18 +107,35 @@ export function tmpdirScoped(options?: { git?: boolean; config?: Partial (self: Effect.Effect): Effect.Effect => - Effect.withFiber((fiber) => + Effect.servicesWith((services: ServiceMap.ServiceMap) => Effect.promise(async () => Instance.provide({ directory, - fn: () => Effect.runPromiseWith(fiber.services as any)(self), + fn: () => Effect.runPromiseWith(services)(self), }), ), ) -export function tmpdirInstanceScoped(options?: { git?: boolean; config?: Partial }) { - return Effect.map(tmpdirScoped(options), (path) => ({ - path, - provide: provideInstance(path), - })) +export function provideTmpdirInstance( + self: (path: string) => Effect.Effect, + options?: { git?: boolean; config?: Partial }, +) { + return Effect.gen(function* () { + const path = yield* tmpdirScoped(options) + let provided = false + + yield* Effect.addFinalizer(() => + provided + ? Effect.promise(() => + Instance.provide({ + directory: path, + fn: () => Instance.dispose(), + }), + ).pipe(Effect.ignore) + : Effect.void, + ) + + provided = true + return yield* self(path).pipe(provideInstance(path)) + }) } diff --git a/packages/opencode/test/fixture/instance.ts b/packages/opencode/test/fixture/instance.ts deleted file mode 100644 index 6f30846a43..0000000000 --- a/packages/opencode/test/fixture/instance.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { ConfigProvider, Effect, Layer, ManagedRuntime } from "effect" -import { Instance } from "../../src/project/instance" - -/** ConfigProvider that enables the experimental file watcher. */ -export const watcherConfigLayer = ConfigProvider.layer( - ConfigProvider.fromUnknown({ - OPENCODE_EXPERIMENTAL_FILEWATCHER: "true", - OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false", - }), -) - -/** - * Boot an Instance with the given service layers and run `body` with - * the ManagedRuntime. Cleanup is automatic — the runtime is disposed - * when `body` completes. - * - * Pass extra layers via `options.provide` (e.g. ConfigProvider.layer). - */ -export function withServices( - directory: string, - layer: Layer.Layer, - body: (rt: ManagedRuntime.ManagedRuntime) => Promise, - options?: { provide?: Layer.Layer[] }, -) { - return Instance.provide({ - directory, - fn: async () => { - let resolved: Layer.Layer = layer as any - if (options?.provide) { - for (const l of options.provide) { - resolved = resolved.pipe(Layer.provide(l)) as any - } - } - const rt = ManagedRuntime.make(resolved) - try { - await body(rt) - } finally { - await rt.dispose() - } - }, - }) -} - -export const provideInstance = - (directory: string) => - (self: Effect.Effect): Effect.Effect => - Effect.services((fiber) => - Effect.promise(async () => - Instance.provide({ - directory, - fn: () => Effect.runPromiseWith(fiber.services)(self), - }), - ), - ) diff --git a/packages/opencode/test/format/format.test.ts b/packages/opencode/test/format/format.test.ts index d081090d6c..c718c13e8b 100644 --- a/packages/opencode/test/format/format.test.ts +++ b/packages/opencode/test/format/format.test.ts @@ -1,187 +1,182 @@ -import { Effect, Layer, ManagedRuntime } from "effect" -import { afterEach, describe, expect, test } from "bun:test" -import { tmpdir } from "../fixture/fixture" +import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node" +import { describe, expect } from "bun:test" +import { Effect, Layer } from "effect" +import { provideTmpdirInstance } from "../fixture/fixture" +import { testEffect } from "../lib/effect" import { Format } from "../../src/format" import * as Formatter from "../../src/format/formatter" -import { Instance } from "../../src/project/instance" -function withRuntime( - directory: string, - layer: Layer.Layer, - body: (rt: ManagedRuntime.ManagedRuntime) => Promise, -) { - return Instance.provide({ - directory, - fn: async () => { - const rt = ManagedRuntime.make(layer) - try { - await body(rt) - } finally { - await rt.dispose() - } - }, - }) -} +const node = NodeChildProcessSpawner.layer.pipe( + Layer.provideMerge(Layer.mergeAll(NodeFileSystem.layer, NodePath.layer)), +) + +const it = testEffect(Layer.mergeAll(Format.layer, node)) describe("Format", () => { - afterEach(async () => { - await Instance.disposeAll() - }) + it.effect("status() returns built-in formatters when no config overrides", () => + provideTmpdirInstance(() => + Format.Service.use((fmt) => + Effect.gen(function* () { + const statuses = yield* fmt.status() + expect(Array.isArray(statuses)).toBe(true) + expect(statuses.length).toBeGreaterThan(0) - test("status() returns built-in formatters when no config overrides", async () => { - await using tmp = await tmpdir() + for (const item of statuses) { + expect(typeof item.name).toBe("string") + expect(Array.isArray(item.extensions)).toBe(true) + expect(typeof item.enabled).toBe("boolean") + } - await withRuntime(tmp.path, Format.layer, async (rt) => { - const statuses = await rt.runPromise(Format.Service.use((s) => s.status())) - expect(Array.isArray(statuses)).toBe(true) - expect(statuses.length).toBeGreaterThan(0) + const gofmt = statuses.find((item) => item.name === "gofmt") + expect(gofmt).toBeDefined() + expect(gofmt!.extensions).toContain(".go") + }), + ), + ), + ) - for (const s of statuses) { - expect(typeof s.name).toBe("string") - expect(Array.isArray(s.extensions)).toBe(true) - expect(typeof s.enabled).toBe("boolean") - } + it.effect("status() returns empty list when formatter is disabled", () => + provideTmpdirInstance( + () => + Format.Service.use((fmt) => + Effect.gen(function* () { + expect(yield* fmt.status()).toEqual([]) + }), + ), + { config: { formatter: false } }, + ), + ) - const gofmt = statuses.find((s) => s.name === "gofmt") - expect(gofmt).toBeDefined() - expect(gofmt!.extensions).toContain(".go") - }) - }) - - test("status() returns empty list when formatter is disabled", async () => { - await using tmp = await tmpdir({ - config: { formatter: false }, - }) - - await withRuntime(tmp.path, Format.layer, async (rt) => { - const statuses = await rt.runPromise(Format.Service.use((s) => s.status())) - expect(statuses).toEqual([]) - }) - }) - - test("status() excludes formatters marked as disabled in config", async () => { - await using tmp = await tmpdir({ - config: { - formatter: { - gofmt: { disabled: true }, + it.effect("status() excludes formatters marked as disabled in config", () => + provideTmpdirInstance( + () => + Format.Service.use((fmt) => + Effect.gen(function* () { + const statuses = yield* fmt.status() + const gofmt = statuses.find((item) => item.name === "gofmt") + expect(gofmt).toBeUndefined() + }), + ), + { + config: { + formatter: { + gofmt: { disabled: true }, + }, }, }, - }) + ), + ) - await withRuntime(tmp.path, Format.layer, async (rt) => { - const statuses = await rt.runPromise(Format.Service.use((s) => s.status())) - const gofmt = statuses.find((s) => s.name === "gofmt") - expect(gofmt).toBeUndefined() - }) - }) + it.effect("service initializes without error", () => + provideTmpdirInstance(() => Format.Service.use(() => Effect.void)), + ) - test("service initializes without error", async () => { - await using tmp = await tmpdir() - - await withRuntime(tmp.path, Format.layer, async (rt) => { - await rt.runPromise(Format.Service.use(() => Effect.void)) - }) - }) - - test("status() initializes formatter state per directory", async () => { - await using off = await tmpdir({ - config: { formatter: false }, - }) - await using on = await tmpdir() - - const a = await Instance.provide({ - directory: off.path, - fn: () => Format.status(), - }) - const b = await Instance.provide({ - directory: on.path, - fn: () => Format.status(), - }) - - expect(a).toEqual([]) - expect(b.length).toBeGreaterThan(0) - }) - - test("runs enabled checks for matching formatters in parallel", async () => { - await using tmp = await tmpdir() - - const file = `${tmp.path}/test.parallel` - await Bun.write(file, "x") - - const one = { - extensions: Formatter.gofmt.extensions, - enabled: Formatter.gofmt.enabled, - command: Formatter.gofmt.command, - } - const two = { - extensions: Formatter.mix.extensions, - enabled: Formatter.mix.enabled, - command: Formatter.mix.command, - } - - let active = 0 - let max = 0 - - Formatter.gofmt.extensions = [".parallel"] - Formatter.mix.extensions = [".parallel"] - Formatter.gofmt.command = ["sh", "-c", "true"] - Formatter.mix.command = ["sh", "-c", "true"] - Formatter.gofmt.enabled = async () => { - active++ - max = Math.max(max, active) - await Bun.sleep(20) - active-- - return true - } - Formatter.mix.enabled = async () => { - active++ - max = Math.max(max, active) - await Bun.sleep(20) - active-- - return true - } - - try { - await withRuntime(tmp.path, Format.layer, async (rt) => { - await rt.runPromise(Format.Service.use((s) => s.init())) - await rt.runPromise(Format.Service.use((s) => s.file(file))) + it.effect("status() initializes formatter state per directory", () => + Effect.gen(function* () { + const a = yield* provideTmpdirInstance(() => Format.Service.use((fmt) => fmt.status()), { + config: { formatter: false }, }) - } finally { - Formatter.gofmt.extensions = one.extensions - Formatter.gofmt.enabled = one.enabled - Formatter.gofmt.command = one.command - Formatter.mix.extensions = two.extensions - Formatter.mix.enabled = two.enabled - Formatter.mix.command = two.command - } + const b = yield* provideTmpdirInstance(() => Format.Service.use((fmt) => fmt.status())) - expect(max).toBe(2) - }) + expect(a).toEqual([]) + expect(b.length).toBeGreaterThan(0) + }), + ) - test("runs matching formatters sequentially for the same file", async () => { - await using tmp = await tmpdir({ - config: { - formatter: { - first: { - command: ["sh", "-c", 'sleep 0.05; v=$(cat "$1"); printf \'%sA\' "$v" > "$1"', "sh", "$FILE"], - extensions: [".seq"], - }, - second: { - command: ["sh", "-c", 'v=$(cat "$1"); printf \'%sB\' "$v" > "$1"', "sh", "$FILE"], - extensions: [".seq"], + it.effect("runs enabled checks for matching formatters in parallel", () => + provideTmpdirInstance((path) => + Effect.gen(function* () { + const file = `${path}/test.parallel` + yield* Effect.promise(() => Bun.write(file, "x")) + + const one = { + extensions: Formatter.gofmt.extensions, + enabled: Formatter.gofmt.enabled, + command: Formatter.gofmt.command, + } + const two = { + extensions: Formatter.mix.extensions, + enabled: Formatter.mix.enabled, + command: Formatter.mix.command, + } + + let active = 0 + let max = 0 + + yield* Effect.acquireUseRelease( + Effect.sync(() => { + Formatter.gofmt.extensions = [".parallel"] + Formatter.mix.extensions = [".parallel"] + Formatter.gofmt.command = ["sh", "-c", "true"] + Formatter.mix.command = ["sh", "-c", "true"] + Formatter.gofmt.enabled = async () => { + active++ + max = Math.max(max, active) + await Bun.sleep(20) + active-- + return true + } + Formatter.mix.enabled = async () => { + active++ + max = Math.max(max, active) + await Bun.sleep(20) + active-- + return true + } + }), + () => + Format.Service.use((fmt) => + Effect.gen(function* () { + yield* fmt.init() + yield* fmt.file(file) + }), + ), + () => + Effect.sync(() => { + Formatter.gofmt.extensions = one.extensions + Formatter.gofmt.enabled = one.enabled + Formatter.gofmt.command = one.command + Formatter.mix.extensions = two.extensions + Formatter.mix.enabled = two.enabled + Formatter.mix.command = two.command + }), + ) + + expect(max).toBe(2) + }), + ), + ) + + it.effect("runs matching formatters sequentially for the same file", () => + provideTmpdirInstance( + (path) => + Effect.gen(function* () { + const file = `${path}/test.seq` + yield* Effect.promise(() => Bun.write(file, "x")) + + yield* Format.Service.use((fmt) => + Effect.gen(function* () { + yield* fmt.init() + yield* fmt.file(file) + }), + ) + + expect(yield* Effect.promise(() => Bun.file(file).text())).toBe("xAB") + }), + { + config: { + formatter: { + first: { + command: ["sh", "-c", 'sleep 0.05; v=$(cat "$1"); printf \'%sA\' "$v" > "$1"', "sh", "$FILE"], + extensions: [".seq"], + }, + second: { + command: ["sh", "-c", 'v=$(cat "$1"); printf \'%sB\' "$v" > "$1"', "sh", "$FILE"], + extensions: [".seq"], + }, }, }, }, - }) - - const file = `${tmp.path}/test.seq` - await Bun.write(file, "x") - - await withRuntime(tmp.path, Format.layer, async (rt) => { - await rt.runPromise(Format.Service.use((s) => s.init())) - await rt.runPromise(Format.Service.use((s) => s.file(file))) - }) - - expect(await Bun.file(file).text()).toBe("xAB") - }) + ), + ) })