From 9d35b04e133f438aa2e97f83f91c205f009f47c2 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 14 May 2026 19:33:06 -0400 Subject: [PATCH] test(acp): replace fixed sleeps with pollUntil in event-subscription (#27624) --- .../test/acp/event-subscription.test.ts | 109 +++++++++++++++--- 1 file changed, 91 insertions(+), 18 deletions(-) diff --git a/packages/opencode/test/acp/event-subscription.test.ts b/packages/opencode/test/acp/event-subscription.test.ts index 2c9371d74d..06a1fcab32 100644 --- a/packages/opencode/test/acp/event-subscription.test.ts +++ b/packages/opencode/test/acp/event-subscription.test.ts @@ -10,6 +10,22 @@ import type { } from "@opencode-ai/sdk/v2" import { provideTestInstance, tmpdir } from "../fixture/fixture" +const pollUntil = async ( + check: () => T | undefined | false | Promise, + message: string, + opts?: { timeoutMs?: number; intervalMs?: number }, +): Promise => { + const timeoutMs = opts?.timeoutMs ?? 2000 + const intervalMs = opts?.intervalMs ?? 5 + const started = Date.now() + while (true) { + const v = await check() + if (v !== undefined && v !== null && v !== false) return v as T + if (Date.now() - started > timeoutMs) throw new Error(message) + await new Promise((r) => setTimeout(r, intervalMs)) + } +} + type SessionUpdateParams = Parameters[0] type RequestPermissionParams = Parameters[0] type RequestPermissionResult = Awaited> @@ -339,7 +355,10 @@ describe("acp.agent event subscription", () => { }, } as any) - await new Promise((r) => setTimeout(r, 10)) + await pollUntil( + () => (updates.get(sessionB) ?? []).includes("agent_message_chunk"), + "sessionB never received agent_message_chunk", + ) expect((updates.get(sessionA) ?? []).includes("agent_message_chunk")).toBe(false) expect((updates.get(sessionB) ?? []).includes("agent_message_chunk")).toBe(true) @@ -376,7 +395,24 @@ describe("acp.agent event subscription", () => { }, } as any) - await new Promise((r) => setTimeout(r, 20)) + controller.push({ + directory: cwd, + payload: { + type: "message.part.delta", + properties: { + sessionID: sessionId, + messageID: "msg_marker", + partID: "msg_marker_part", + field: "text", + delta: "marker", + }, + }, + } as any) + + await pollUntil( + () => sessionUpdates.some((u) => u.sessionId === sessionId && u.update.sessionUpdate === "agent_message_chunk"), + "marker event was never processed", + ) expect( sessionUpdates @@ -426,7 +462,12 @@ describe("acp.agent event subscription", () => { push(sessionA, "msg_a", tokenA[2]) push(sessionB, "msg_b", tokenB[2]) - await new Promise((r) => setTimeout(r, 20)) + await pollUntil( + () => + (chunks.get(sessionA) ?? "").includes(tokenA.join("")) && + (chunks.get(sessionB) ?? "").includes(tokenB.join("")), + "interleaved chunks never fully arrived", + ) const a = chunks.get(sessionA) ?? "" const b = chunks.get(sessionB) ?? "" @@ -493,7 +534,7 @@ describe("acp.agent event subscription", () => { }, } as any) - await new Promise((r) => setTimeout(r, 20)) + await pollUntil(() => permissionReplies.includes("perm_1"), "perm_1 was never replied") expect(permissionReplies).toContain("perm_1") @@ -552,10 +593,8 @@ describe("acp.agent event subscription", () => { }, } as any) - // Give time for permission handling to start - await new Promise((r) => setTimeout(r, 10)) + await pollUntil(() => _permissionCalls > 0, "permission handling for A never started") - // Push message for session B while A's permission is pending controller.push({ directory: cwd, payload: { @@ -570,18 +609,17 @@ describe("acp.agent event subscription", () => { }, } as any) - // Wait for session B's message to be processed - await new Promise((r) => setTimeout(r, 20)) + await pollUntil( + () => (chunks.get(sessionB) ?? "").includes("session_b_message"), + "session B never received its message", + ) - // Session B should have received message even though A's permission is still pending expect(chunks.get(sessionB) ?? "").toContain("session_b_message") expect(permissionReplies).not.toContain("perm_a") - // Release session A's permission resolvePermissionA!() - await new Promise((r) => setTimeout(r, 20)) + await pollUntil(() => permissionReplies.includes("perm_a"), "perm_a was never replied after release") - // Now session A's permission should be replied expect(permissionReplies).toContain("perm_a") stop() @@ -610,7 +648,15 @@ describe("acp.agent event subscription", () => { }), ) } - await new Promise((r) => setTimeout(r, 20)) + await pollUntil( + () => + sessionUpdates + .filter((u) => u.sessionId === sessionId) + .filter((u) => isToolCallUpdate(u.update)) + .map((u) => inProgressText(u.update)) + .filter((t) => t === "ab").length > 0, + "final bash snapshot 'ab' never arrived", + ) const snapshots = sessionUpdates .filter((u) => u.sessionId === sessionId) @@ -649,7 +695,14 @@ describe("acp.agent event subscription", () => { input: { filePath: "/tmp/example.txt" }, }), ) - await new Promise((r) => setTimeout(r, 20)) + await pollUntil( + () => + sessionUpdates + .filter((u) => u.sessionId === sessionId) + .map((u) => u.update.sessionUpdate) + .filter((u) => u === "tool_call" || u === "tool_call_update").length >= 4, + "expected 4 tool_call/tool_call_update events", + ) const types = sessionUpdates .filter((u) => u.sessionId === sessionId) @@ -706,7 +759,10 @@ describe("acp.agent event subscription", () => { ], }), ) - await new Promise((r) => setTimeout(r, 20)) + await pollUntil( + () => completedToolUpdate(sessionUpdates, sessionId, "call_image"), + "completed tool update for call_image never arrived", + ) const update = completedToolUpdate(sessionUpdates, sessionId, "call_image") expect(update?.content).toContainEqual({ @@ -836,7 +892,16 @@ describe("acp.agent event subscription", () => { metadata: { output: "hi\nthere\n" }, }), ) - await new Promise((r) => setTimeout(r, 20)) + await pollUntil( + () => + sessionUpdates + .filter((u) => u.sessionId === sessionId) + .map((u) => u.update) + .filter((u) => "toolCallId" in u && u.toolCallId === "call_1") + .map((u) => u.sessionUpdate) + .filter((u) => u === "tool_call" || u === "tool_call_update").length >= 3, + "expected 3 tool events for call_1", + ) const types = sessionUpdates .filter((u) => u.sessionId === sessionId) @@ -888,7 +953,15 @@ describe("acp.agent event subscription", () => { metadata: { output: "a" }, }), ) - await new Promise((r) => setTimeout(r, 20)) + await pollUntil( + () => + sessionUpdates + .filter((u) => u.sessionId === sessionId) + .filter((u) => isToolCallUpdate(u.update)) + .map((u) => inProgressText(u.update)) + .filter((t) => t === "a").length >= 2, + "expected two 'a' bash snapshots after pending reset", + ) const snapshots = sessionUpdates .filter((u) => u.sessionId === sessionId)