mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-16 17:42:30 +00:00
test(lib): promote pollWithTimeout/awaitWithTimeout helpers (#27626)
This commit is contained in:
@@ -157,3 +157,47 @@ const failingAccountLayer = Layer.mock(Account.Service, {
|
||||
```
|
||||
|
||||
This is much shorter than stubbing every method with `Effect.void` / `Effect.succeed(...)` placeholders, and it keeps the test focused on the behaviour under test.
|
||||
|
||||
## Synchronizing With Concurrent Work
|
||||
|
||||
### The Anti-Pattern
|
||||
|
||||
Using `Effect.sleep(N)` or `setTimeout` as a "wait for the forked fiber to be ready" hack races the scheduler. The forked fiber may not have reached the synchronization point within `N` ms on a slow CI host, and the test fails intermittently. See PR #27622 for a concrete flake that fell out of this exact pattern.
|
||||
|
||||
### The Fix
|
||||
|
||||
Wait on a **published readiness signal**, not wall-clock time. Available affordances:
|
||||
|
||||
- `pollWithTimeout(effect, message, duration?)` from `test/lib/effect.ts` — repeatedly run a predicate effect until it returns a non-`undefined` value, with a timeout.
|
||||
- `awaitWithTimeout(effect, message, duration?)` from `test/lib/effect.ts` — wrap any effect with `Effect.timeoutOrElse` and a custom error message.
|
||||
- `llm.wait(n)` from `test/lib/llm-server.ts` — wait until the mock LLM has received `n` HTTP calls.
|
||||
- `SessionStatus.Service` `.get(sessionID)` — observable per-session state (`{ type: "busy" | "idle" | ... }`).
|
||||
- `BackgroundJob.wait({ id, timeout })` from `src/background/job.ts` — wait for a background job to complete.
|
||||
- Bus subscriptions — fork `Stream.runForEach(bus.subscribe(Event), ...)` and open a `Latch` inside the callback to signal first-event readiness.
|
||||
- `Deferred.await(deferred).pipe(Effect.timeoutOrElse(...))` for one-shot signals.
|
||||
|
||||
### Example
|
||||
|
||||
```ts
|
||||
// Antipattern — race
|
||||
yield* prompt.shell({ command: "sleep 30" }).pipe(Effect.forkChild)
|
||||
yield* Effect.sleep(50)
|
||||
yield* prompt.cancel(chat.id)
|
||||
|
||||
// Fix — wait for a published readiness signal
|
||||
yield* prompt.shell({ command: "sleep 30" }).pipe(Effect.forkChild)
|
||||
yield* pollWithTimeout(
|
||||
Effect.gen(function* () {
|
||||
const s = yield* (yield* SessionStatus.Service).get(chat.id)
|
||||
return s.type === "busy" ? (true as const) : undefined
|
||||
}),
|
||||
"session never became busy",
|
||||
)
|
||||
yield* prompt.cancel(chat.id)
|
||||
```
|
||||
|
||||
### When Fixed Sleeps Are OK
|
||||
|
||||
- Testing debounce or throttle behavior, where the sleep **is** the test.
|
||||
- Letting real wall-clock advance past a genuine timestamp resolution boundary (e.g. mtime granularity).
|
||||
- Simulating network latency in race-regression tests that intentionally exercise ordering.
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { test, type TestOptions } from "bun:test"
|
||||
import { Cause, Effect, Exit, Layer } from "effect"
|
||||
import { Cause, Duration, Effect, Exit, Layer } from "effect"
|
||||
import type * as Scope from "effect/Scope"
|
||||
import * as TestClock from "effect/testing/TestClock"
|
||||
import * as TestConsole from "effect/testing/TestConsole"
|
||||
@@ -109,3 +109,33 @@ export const it = make(testEnv, liveEnv)
|
||||
|
||||
export const testEffect = <R, E>(layer: Layer.Layer<R, E>) =>
|
||||
make(Layer.provideMerge(layer, testEnv), Layer.provideMerge(layer, liveEnv))
|
||||
|
||||
export const awaitWithTimeout = <A, E, R>(
|
||||
self: Effect.Effect<A, E, R>,
|
||||
message: string,
|
||||
duration: Duration.Input = "2 seconds",
|
||||
) =>
|
||||
self.pipe(
|
||||
Effect.timeoutOrElse({
|
||||
duration,
|
||||
orElse: () => Effect.fail(new Error(message)),
|
||||
}),
|
||||
)
|
||||
|
||||
export const pollWithTimeout = <A, E, R>(
|
||||
self: Effect.Effect<A | undefined, E, R>,
|
||||
message: string,
|
||||
duration: Duration.Input = "5 seconds",
|
||||
) =>
|
||||
Effect.gen(function* () {
|
||||
while (true) {
|
||||
const result = yield* self
|
||||
if (result !== undefined) return result
|
||||
yield* Effect.sleep("20 millis")
|
||||
}
|
||||
}).pipe(
|
||||
Effect.timeoutOrElse({
|
||||
duration,
|
||||
orElse: () => Effect.fail(new Error(message)),
|
||||
}),
|
||||
)
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
import { expect, mock, beforeEach } from "bun:test"
|
||||
import { EventEmitter } from "events"
|
||||
import { Deferred, Effect, Layer, Option } from "effect"
|
||||
import type { Duration } from "effect"
|
||||
import { testEffect } from "../lib/effect"
|
||||
import { awaitWithTimeout, testEffect } from "../lib/effect"
|
||||
import type { MCP as MCPNS } from "../../src/mcp/index"
|
||||
|
||||
// Track open() calls and control failure behavior
|
||||
@@ -135,18 +134,6 @@ const config = (name: string) => ({
|
||||
|
||||
const withCallbackStop = Effect.addFinalizer(() => Effect.promise(() => McpOAuthCallback.stop()).pipe(Effect.ignore))
|
||||
|
||||
const awaitWithTimeout = <A, E, R>(
|
||||
self: Effect.Effect<A, E, R>,
|
||||
message: string,
|
||||
duration: Duration.Input = "5 seconds",
|
||||
) =>
|
||||
self.pipe(
|
||||
Effect.timeoutOrElse({
|
||||
duration,
|
||||
orElse: () => Effect.fail(new Error(message)),
|
||||
}),
|
||||
)
|
||||
|
||||
const trackBrowserOpen = Effect.gen(function* () {
|
||||
const opened = yield* Deferred.make<string>()
|
||||
openDeferred = opened
|
||||
@@ -184,7 +171,11 @@ mcpTest.instance(
|
||||
const event = yield* trackBrowserOpenFailed
|
||||
yield* authenticateScoped("test-oauth-server")
|
||||
|
||||
const failure = yield* awaitWithTimeout(Deferred.await(event), "Timed out waiting for BrowserOpenFailed event")
|
||||
const failure = yield* awaitWithTimeout(
|
||||
Deferred.await(event),
|
||||
"Timed out waiting for BrowserOpenFailed event",
|
||||
"5 seconds",
|
||||
)
|
||||
|
||||
expect(failure.mcpName).toBe("test-oauth-server")
|
||||
expect(failure.url).toContain("https://")
|
||||
@@ -203,7 +194,7 @@ mcpTest.instance(
|
||||
const event = yield* trackBrowserOpenFailed
|
||||
yield* authenticateScoped("test-oauth-server-2")
|
||||
|
||||
yield* awaitWithTimeout(Deferred.await(opened), "Timed out waiting for open()")
|
||||
yield* awaitWithTimeout(Deferred.await(opened), "Timed out waiting for open()", "5 seconds")
|
||||
const failure = yield* Deferred.await(event).pipe(Effect.timeoutOption("700 millis"))
|
||||
|
||||
expect(failure).toEqual(Option.none())
|
||||
@@ -224,7 +215,7 @@ mcpTest.instance(
|
||||
const event = yield* trackBrowserOpenFailed
|
||||
yield* authenticateScoped("test-oauth-server-3")
|
||||
|
||||
const url = yield* awaitWithTimeout(Deferred.await(opened), "Timed out waiting for open()")
|
||||
const url = yield* awaitWithTimeout(Deferred.await(opened), "Timed out waiting for open()", "5 seconds")
|
||||
const failure = yield* Deferred.await(event).pipe(Effect.timeoutOption("700 millis"))
|
||||
|
||||
expect(failure).toEqual(Option.none())
|
||||
|
||||
@@ -49,7 +49,7 @@ import { Ripgrep } from "../../src/file/ripgrep"
|
||||
import { Format } from "../../src/format"
|
||||
import { Reference } from "../../src/reference/reference"
|
||||
import { TestInstance } from "../fixture/fixture"
|
||||
import { testEffect } from "../lib/effect"
|
||||
import { awaitWithTimeout, pollWithTimeout, testEffect } from "../lib/effect"
|
||||
import { reply, TestLLMServer } from "../lib/llm-server"
|
||||
import { SyncEvent } from "@/sync"
|
||||
import { RuntimeFlags } from "@/effect/runtime-flags"
|
||||
@@ -306,36 +306,6 @@ const useServerConfig = Effect.fn("test.useServerConfig")(function* (config: (ur
|
||||
return { dir, llm }
|
||||
})
|
||||
|
||||
const awaitWithTimeout = <A, E, R>(
|
||||
self: Effect.Effect<A, E, R>,
|
||||
message: string,
|
||||
duration: Duration.Input = "2 seconds",
|
||||
) =>
|
||||
self.pipe(
|
||||
Effect.timeoutOrElse({
|
||||
duration,
|
||||
orElse: () => Effect.fail(new Error(message)),
|
||||
}),
|
||||
)
|
||||
|
||||
const pollWithTimeout = <A, E, R>(
|
||||
self: Effect.Effect<A | undefined, E, R>,
|
||||
message: string,
|
||||
duration: Duration.Input = "5 seconds",
|
||||
) =>
|
||||
Effect.gen(function* () {
|
||||
while (true) {
|
||||
const result = yield* self
|
||||
if (result !== undefined) return result
|
||||
yield* Effect.sleep("20 millis")
|
||||
}
|
||||
}).pipe(
|
||||
Effect.timeoutOrElse({
|
||||
duration,
|
||||
orElse: () => Effect.fail(new Error(message)),
|
||||
}),
|
||||
)
|
||||
|
||||
// Wait for a session's runner to enter a busy state. SessionStatus is flipped to
|
||||
// "busy" inside Runner.startShell's modifyEffect at the same moment the runner
|
||||
// is registered, so this is a deterministic readiness signal — cancel can't
|
||||
|
||||
Reference in New Issue
Block a user