From 549b146ea6529cc29dd556644543fcb48acf6d45 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Tue, 12 May 2026 14:51:18 -0400 Subject: [PATCH] Stabilize session event tests (#27117) --- .../opencode/test/session/session.test.ts | 276 +++++++++--------- 1 file changed, 132 insertions(+), 144 deletions(-) diff --git a/packages/opencode/test/session/session.test.ts b/packages/opencode/test/session/session.test.ts index bb69e459bc..ada55d1349 100644 --- a/packages/opencode/test/session/session.test.ts +++ b/packages/opencode/test/session/session.test.ts @@ -1,186 +1,174 @@ -import { describe, expect, test } from "bun:test" -import path from "path" +import { describe, expect } from "bun:test" +import { Deferred, Effect, Exit, Layer } from "effect" import { Session as SessionNs } from "@/session/session" -import { Bus } from "../../src/bus" +import { GlobalBus, type GlobalEvent } from "../../src/bus/global" import * as Log from "@opencode-ai/core/util/log" -import { Instance } from "../../src/project/instance" -import { WithInstance } from "../../src/project/with-instance" +import { Flag } from "@opencode-ai/core/flag/flag" import { MessageV2 } from "../../src/session/message-v2" import { MessageID, PartID, type SessionID } from "../../src/session/schema" -import { AppRuntime } from "../../src/effect/app-runtime" -import { tmpdir } from "../fixture/fixture" +import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" +import { provideInstance, tmpdirScoped } from "../fixture/fixture" +import { testEffect } from "../lib/effect" -const projectRoot = path.join(__dirname, "../..") void Log.init({ print: false }) -function create(input?: SessionNs.CreateInput) { - return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create(input))) -} +const it = testEffect(Layer.mergeAll(SessionNs.defaultLayer, CrossSpawnSpawner.defaultLayer)) -function get(id: SessionID) { - return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.get(id))) -} +const awaitDeferred = (deferred: Deferred.Deferred, message: string) => + Effect.race( + Deferred.await(deferred), + Effect.sleep("2 seconds").pipe(Effect.flatMap(() => Effect.fail(new Error(message)))), + ) -function remove(id: SessionID) { - return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.remove(id))) -} +const remove = (id: SessionID) => SessionNs.Service.use((svc) => svc.remove(id)) -function updateMessage(msg: T) { - return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.updateMessage(msg))) -} - -function updatePart(part: T) { - return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.updatePart(part))) +const subscribeGlobal = (type: string, callback: (event: NonNullable) => void) => { + const listener = (event: GlobalEvent) => { + if (event.payload?.type === type) callback(event.payload) + } + GlobalBus.on("event", listener) + return () => GlobalBus.off("event", listener) } describe("session.created event", () => { - test("should emit session.created event when session is created", async () => { - await WithInstance.provide({ - directory: projectRoot, - fn: async () => { - let eventReceived = false - let receivedInfo: SessionNs.Info | undefined + it.instance("should emit session.created event when session is created", () => + Effect.gen(function* () { + const session = yield* SessionNs.Service + const received = yield* Deferred.make() - const unsub = Bus.subscribe(SessionNs.Event.Created, (event) => { - eventReceived = true - receivedInfo = event.properties.info as SessionNs.Info - }) + const unsub = subscribeGlobal(SessionNs.Event.Created.type, (event) => { + Deferred.doneUnsafe(received, Effect.succeed(event.properties.info as SessionNs.Info)) + }) + yield* Effect.addFinalizer(() => Effect.sync(unsub)) - const info = await create({}) - await new Promise((resolve) => setTimeout(resolve, 100)) - unsub() + const info = yield* session.create({}) + const receivedInfo = yield* awaitDeferred(received, "timed out waiting for session.created") - expect(eventReceived).toBe(true) - expect(receivedInfo).toBeDefined() - expect(receivedInfo?.id).toBe(info.id) - expect(receivedInfo?.projectID).toBe(info.projectID) - expect(receivedInfo?.directory).toBe(info.directory) - expect(receivedInfo?.path).toBe(info.path) - expect(receivedInfo?.title).toBe(info.title) + expect(receivedInfo.id).toBe(info.id) + expect(receivedInfo.projectID).toBe(info.projectID) + expect(receivedInfo.directory).toBe(info.directory) + expect(receivedInfo.path).toBe(info.path) + expect(receivedInfo.title).toBe(info.title) - await remove(info.id) - }, - }) - }) + yield* session.remove(info.id) + }), + ) - test("session.created event should be emitted before session.updated", async () => { - await WithInstance.provide({ - directory: projectRoot, - fn: async () => { - const events: string[] = [] + it.instance("session.created event should be emitted before session.updated", () => + Effect.gen(function* () { + if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return - const unsubCreated = Bus.subscribe(SessionNs.Event.Created, () => { - events.push("created") - }) + const session = yield* SessionNs.Service + const events: string[] = [] + const received = yield* Deferred.make() + const push = (event: string) => { + events.push(event) + if (events.includes("created") && events.includes("updated")) { + Deferred.doneUnsafe(received, Effect.succeed(events)) + } + } - const unsubUpdated = Bus.subscribe(SessionNs.Event.Updated, () => { - events.push("updated") - }) + const unsubCreated = subscribeGlobal(SessionNs.Event.Created.type, () => { + push("created") + }) + yield* Effect.addFinalizer(() => Effect.sync(unsubCreated)) - const info = await create({}) - await new Promise((resolve) => setTimeout(resolve, 100)) - unsubCreated() - unsubUpdated() + const unsubUpdated = subscribeGlobal(SessionNs.Event.Updated.type, () => { + push("updated") + }) + yield* Effect.addFinalizer(() => Effect.sync(unsubUpdated)) - expect(events).toContain("created") - expect(events).toContain("updated") - expect(events.indexOf("created")).toBeLessThan(events.indexOf("updated")) + const info = yield* session.create({}) + const receivedEvents = yield* awaitDeferred(received, "timed out waiting for session created/updated events") - await remove(info.id) - }, - }) - }) + expect(receivedEvents).toContain("created") + expect(receivedEvents).toContain("updated") + expect(receivedEvents.indexOf("created")).toBeLessThan(receivedEvents.indexOf("updated")) + + yield* session.remove(info.id) + }), + ) }) describe("step-finish token propagation via Bus event", () => { - test( + it.instance( "non-zero tokens propagate through PartUpdated event", - async () => { - await WithInstance.provide({ - directory: projectRoot, - fn: async () => { - const info = await create({}) + () => + Effect.gen(function* () { + const session = yield* SessionNs.Service + const info = yield* session.create({}) - const messageID = MessageID.ascending() - await updateMessage({ - id: messageID, - sessionID: info.id, - role: "user", - time: { created: Date.now() }, - agent: "user", - model: { providerID: "test", modelID: "test" }, - tools: {}, - mode: "", - } as unknown as MessageV2.Info) + const messageID = MessageID.ascending() + yield* session.updateMessage({ + id: messageID, + sessionID: info.id, + role: "user", + time: { created: Date.now() }, + agent: "user", + model: { providerID: "test", modelID: "test" }, + tools: {}, + mode: "", + } as unknown as MessageV2.Info) - // Bus subscribers receive readonly Schema.Type payloads; `MessageV2.Part` - // is the mutable domain type. Cast bridges the two — safe because the - // test only reads the value afterwards. - let received: MessageV2.Part | undefined - const unsub = Bus.subscribe(MessageV2.Event.PartUpdated, (event) => { - received = event.properties.part as MessageV2.Part - }) + // Bus subscribers receive readonly Schema.Type payloads; `MessageV2.Part` + // is the mutable domain type. Cast bridges the two — safe because the + // test only reads the value afterwards. + const received = yield* Deferred.make() + const unsub = subscribeGlobal(MessageV2.Event.PartUpdated.type, (event) => { + Deferred.doneUnsafe(received, Effect.succeed(event.properties.part as MessageV2.Part)) + }) + yield* Effect.addFinalizer(() => Effect.sync(unsub)) - const tokens = { - total: 1500, - input: 500, - output: 800, - reasoning: 200, - cache: { read: 100, write: 50 }, - } + const tokens = { + total: 1500, + input: 500, + output: 800, + reasoning: 200, + cache: { read: 100, write: 50 }, + } - const partInput = { - id: PartID.ascending(), - messageID, - sessionID: info.id, - type: "step-finish" as const, - reason: "stop", - cost: 0.005, - tokens, - } + const partInput = { + id: PartID.ascending(), + messageID, + sessionID: info.id, + type: "step-finish" as const, + reason: "stop", + cost: 0.005, + tokens, + } - await updatePart(partInput) - await new Promise((resolve) => setTimeout(resolve, 100)) + yield* session.updatePart(partInput) + const receivedPart = yield* awaitDeferred(received, "timed out waiting for message.part.updated") - expect(received).toBeDefined() - expect(received!.type).toBe("step-finish") - const finish = received as MessageV2.StepFinishPart - expect(finish.tokens.input).toBe(500) - expect(finish.tokens.output).toBe(800) - expect(finish.tokens.reasoning).toBe(200) - expect(finish.tokens.total).toBe(1500) - expect(finish.tokens.cache.read).toBe(100) - expect(finish.tokens.cache.write).toBe(50) - expect(finish.cost).toBe(0.005) - expect(received).not.toBe(partInput) + expect(receivedPart.type).toBe("step-finish") + const finish = receivedPart as MessageV2.StepFinishPart + expect(finish.tokens.input).toBe(500) + expect(finish.tokens.output).toBe(800) + expect(finish.tokens.reasoning).toBe(200) + expect(finish.tokens.total).toBe(1500) + expect(finish.tokens.cache.read).toBe(100) + expect(finish.tokens.cache.write).toBe(50) + expect(finish.cost).toBe(0.005) + expect(receivedPart).not.toBe(partInput) - unsub() - await remove(info.id) - }, - }) - }, + yield* session.remove(info.id) + }), { timeout: 30000 }, ) }) describe("Session", () => { - test("remove works without an instance", async () => { - await using tmp = await tmpdir({ git: true }) + it.live("remove works without an instance", () => + Effect.gen(function* () { + const session = yield* SessionNs.Service + const dir = yield* tmpdirScoped({ git: true }) + const info = yield* provideInstance(dir)(session.create({ title: "remove-without-instance" })) - const info = await WithInstance.provide({ - directory: tmp.path, - fn: () => create({ title: "remove-without-instance" }), - }) + const removeExit = yield* remove(info.id).pipe(Effect.exit) + expect(Exit.isSuccess(removeExit)).toBe(true) - await expect(async () => { - await remove(info.id) - }).not.toThrow() - - let missing = false - await get(info.id).catch(() => { - missing = true - }) - - expect(missing).toBe(true) - }) + const getExit = yield* session.get(info.id).pipe(Effect.exit) + expect(Exit.isFailure(getExit)).toBe(true) + }), + ) })