mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-15 00:52:35 +00:00
refactor(http-recorder): tighten cassette safety, fix WS leaks + docs (#26730)
This commit is contained in:
@@ -166,11 +166,11 @@ import { Effect } from "effect"
|
||||
|
||||
const audit = Effect.gen(function* () {
|
||||
const cassettes = yield* HttpRecorder.Cassette.Service
|
||||
const entries = yield* cassettes.list()
|
||||
const issues = yield* Effect.forEach(entries, (entry) =>
|
||||
const names = yield* cassettes.list()
|
||||
const issues = yield* Effect.forEach(names, (name) =>
|
||||
cassettes
|
||||
.read(entry.name)
|
||||
.pipe(Effect.map((interactions) => ({ name: entry.name, findings: HttpRecorder.secretFindings(interactions) }))),
|
||||
.read(name)
|
||||
.pipe(Effect.map((interactions) => ({ name, findings: HttpRecorder.secretFindings(interactions) }))),
|
||||
)
|
||||
return issues.filter((i) => i.findings.length > 0)
|
||||
})
|
||||
@@ -198,12 +198,11 @@ type RecordReplayOptions = {
|
||||
|
||||
| File | Purpose |
|
||||
| -------------- | -------------------------------------------------------------------------------- |
|
||||
| `effect.ts` | `cassetteLayer` / `recordingLayer` — the `HttpClient` adapter. |
|
||||
| `websocket.ts` | `makeWebSocketExecutor` — WebSocket record/replay. |
|
||||
| `cassette.ts` | `Cassette.Service` — reads/writes cassette files, accumulates state. |
|
||||
| `recorder.ts` | Shared transport plumbing: `UnsafeCassetteError`, `appendOrFail`, `ReplayState`. |
|
||||
| `redactor.ts` | Composable `Redactor` — headers, url, body redaction. |
|
||||
| `redaction.ts` | Lower-level header/URL primitives + secret pattern detection. |
|
||||
| `schema.ts` | Effect Schema definitions for the cassette JSON format. |
|
||||
| `storage.ts` | Path resolution, JSON encode/decode, sync existence check. |
|
||||
| `matching.ts` | Request matcher, canonicalization, sequential cursor, mismatch diagnostics. |
|
||||
| `effect.ts` | `cassetteLayer` / `recordingLayer` — the `HttpClient` adapter. |
|
||||
| `websocket.ts` | `makeWebSocketExecutor` — WebSocket record/replay. |
|
||||
| `cassette.ts` | `Cassette.Service` — `fileSystem` / `memory` adapters, error types. |
|
||||
| `recorder.ts` | Shared transport plumbing: `resolveAutoMode`, `ReplayState`. |
|
||||
| `redactor.ts` | Composable `Redactor` — headers, url, body redaction. |
|
||||
| `redaction.ts` | Lower-level header/URL primitives + secret pattern detection. |
|
||||
| `schema.ts` | Effect Schema definitions for the cassette JSON format. |
|
||||
| `matching.ts` | Request matcher, canonicalization, sequential cursor, mismatch diagnostics. |
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { Context, Effect, FileSystem, Layer, Schema } from "effect"
|
||||
import * as fs from "node:fs"
|
||||
import * as path from "node:path"
|
||||
import { secretFindings, type SecretFinding } from "./redaction"
|
||||
import { secretFindings, SecretFindingSchema, type SecretFinding } from "./redaction"
|
||||
import { decodeCassette, encodeCassette, type Cassette, type CassetteMetadata, type Interaction } from "./schema"
|
||||
|
||||
const DEFAULT_RECORDINGS_DIR = path.resolve(process.cwd(), "test", "fixtures", "recordings")
|
||||
@@ -14,13 +14,24 @@ export class CassetteNotFoundError extends Schema.TaggedErrorClass<CassetteNotFo
|
||||
}
|
||||
}
|
||||
|
||||
export interface AppendResult {
|
||||
readonly findings: ReadonlyArray<SecretFinding>
|
||||
export class UnsafeCassetteError extends Schema.TaggedErrorClass<UnsafeCassetteError>()("UnsafeCassetteError", {
|
||||
cassetteName: Schema.String,
|
||||
findings: Schema.Array(SecretFindingSchema),
|
||||
}) {
|
||||
override get message() {
|
||||
return `Refusing to write cassette "${this.cassetteName}" because it contains possible secrets: ${this.findings
|
||||
.map((finding) => `${finding.path} (${finding.reason})`)
|
||||
.join(", ")}`
|
||||
}
|
||||
}
|
||||
|
||||
export interface Interface {
|
||||
readonly read: (name: string) => Effect.Effect<ReadonlyArray<Interaction>, CassetteNotFoundError>
|
||||
readonly append: (name: string, interaction: Interaction, metadata?: CassetteMetadata) => Effect.Effect<AppendResult>
|
||||
readonly append: (
|
||||
name: string,
|
||||
interaction: Interaction,
|
||||
metadata?: CassetteMetadata,
|
||||
) => Effect.Effect<void, UnsafeCassetteError>
|
||||
readonly exists: (name: string) => Effect.Effect<boolean>
|
||||
readonly list: () => Effect.Effect<ReadonlyArray<string>>
|
||||
}
|
||||
@@ -44,6 +55,9 @@ const formatCassette = (cassette: Cassette) => `${JSON.stringify(encodeCassette(
|
||||
|
||||
const parseCassette = (raw: string) => decodeCassette(JSON.parse(raw))
|
||||
|
||||
const failIfUnsafe = (name: string, findings: ReadonlyArray<SecretFinding>) =>
|
||||
findings.length === 0 ? Effect.void : Effect.fail(new UnsafeCassetteError({ cassetteName: name, findings }))
|
||||
|
||||
export const fileSystem = (
|
||||
options: { readonly directory?: string } = {},
|
||||
): Layer.Layer<Service, never, FileSystem.FileSystem> =>
|
||||
@@ -92,11 +106,9 @@ export const fileSystem = (
|
||||
entry.findings.push(...secretFindings(interaction))
|
||||
const cassette = buildCassette(name, entry.interactions, metadata)
|
||||
const findings = [...entry.findings, ...secretFindings(cassette.metadata ?? {})]
|
||||
if (findings.length === 0) {
|
||||
yield* ensureDirectory(name)
|
||||
yield* fs.writeFileString(cassettePath(name), formatCassette(cassette)).pipe(Effect.orDie)
|
||||
}
|
||||
return { findings }
|
||||
yield* failIfUnsafe(name, findings)
|
||||
yield* ensureDirectory(name)
|
||||
yield* fs.writeFileString(cassettePath(name), formatCassette(cassette)).pipe(Effect.orDie)
|
||||
}),
|
||||
exists: (name) =>
|
||||
fs.access(cassettePath(name)).pipe(
|
||||
@@ -108,12 +120,7 @@ export const fileSystem = (
|
||||
Effect.map((files) =>
|
||||
files
|
||||
.filter((file) => file.endsWith(".json"))
|
||||
.map((file) =>
|
||||
path
|
||||
.relative(directory, file)
|
||||
.replace(/\\/g, "/")
|
||||
.replace(/\.json$/, ""),
|
||||
)
|
||||
.map((file) => path.relative(directory, file).replace(/\\/g, "/").replace(/\.json$/, ""))
|
||||
.toSorted((a, b) => a.localeCompare(b)),
|
||||
),
|
||||
),
|
||||
@@ -133,17 +140,17 @@ export const memory = (initial: Record<string, ReadonlyArray<Interaction>> = {})
|
||||
stored.has(name)
|
||||
? Effect.succeed(stored.get(name) ?? [])
|
||||
: Effect.fail(new CassetteNotFoundError({ cassetteName: name })),
|
||||
append: (name, interaction, metadata) =>
|
||||
Effect.sync(() => {
|
||||
const existing = stored.get(name)
|
||||
if (existing) existing.push(interaction)
|
||||
else stored.set(name, [interaction])
|
||||
const findings = accumulatedFindings.get(name)
|
||||
if (findings) findings.push(...secretFindings(interaction))
|
||||
else accumulatedFindings.set(name, [...secretFindings(interaction)])
|
||||
if (metadata) accumulatedFindings.get(name)!.push(...secretFindings({ name, ...metadata }))
|
||||
return { findings: accumulatedFindings.get(name) ?? [] }
|
||||
}),
|
||||
append: (name, interaction, metadata) => {
|
||||
const existing = stored.get(name)
|
||||
if (existing) existing.push(interaction)
|
||||
else stored.set(name, [interaction])
|
||||
const existingFindings = accumulatedFindings.get(name)
|
||||
const findings = existingFindings ?? []
|
||||
if (!existingFindings) accumulatedFindings.set(name, findings)
|
||||
findings.push(...secretFindings(interaction))
|
||||
if (metadata) findings.push(...secretFindings({ name, ...metadata }))
|
||||
return failIfUnsafe(name, findings)
|
||||
},
|
||||
exists: (name) => Effect.sync(() => stored.has(name)),
|
||||
list: () => Effect.sync(() => Array.from(stored.keys()).toSorted()),
|
||||
})
|
||||
|
||||
@@ -12,7 +12,7 @@ import {
|
||||
} from "effect/unstable/http"
|
||||
import * as CassetteService from "./cassette"
|
||||
import { defaultMatcher, selectSequential, type RequestMatcher } from "./matching"
|
||||
import { appendOrFail, makeReplayState, resolveAutoMode } from "./recorder"
|
||||
import { makeReplayState, resolveAutoMode } from "./recorder"
|
||||
import { defaults, type Redactor } from "./redactor"
|
||||
import { redactUrl } from "./redaction"
|
||||
import { httpInteractions, type CassetteMetadata, type HttpInteraction, type ResponseSnapshot } from "./schema"
|
||||
@@ -100,9 +100,9 @@ export const recordingLayer = (
|
||||
...captured,
|
||||
}),
|
||||
}
|
||||
yield* appendOrFail(cassetteService, name, interaction, options.metadata).pipe(
|
||||
Effect.catchTag("UnsafeCassetteError", (error) => Effect.fail(transportError(request, error.message))),
|
||||
)
|
||||
yield* cassetteService
|
||||
.append(name, interaction, options.metadata)
|
||||
.pipe(Effect.catchTag("UnsafeCassetteError", (error) => Effect.fail(transportError(request, error.message))))
|
||||
return HttpClientResponse.fromWeb(
|
||||
request,
|
||||
new Response(decodeResponseBody(interaction.response), interaction.response),
|
||||
|
||||
@@ -7,10 +7,9 @@ export type {
|
||||
WebSocketFrame,
|
||||
WebSocketInteraction,
|
||||
} from "./schema"
|
||||
export { CassetteNotFoundError, hasCassetteSync } from "./cassette"
|
||||
export { CassetteNotFoundError, hasCassetteSync, UnsafeCassetteError } from "./cassette"
|
||||
export { defaultMatcher, type RequestMatcher } from "./matching"
|
||||
export { redactHeaders, redactUrl, secretFindings, type SecretFinding } from "./redaction"
|
||||
export { UnsafeCassetteError } from "./recorder"
|
||||
export { cassetteLayer, recordingLayer, type RecordReplayMode, type RecordReplayOptions } from "./effect"
|
||||
export {
|
||||
makeWebSocketExecutor,
|
||||
|
||||
@@ -36,7 +36,7 @@ export const canonicalSnapshot = (snapshot: RequestSnapshot): string =>
|
||||
export const defaultMatcher: RequestMatcher = (incoming, recorded) =>
|
||||
canonicalSnapshot(incoming) === canonicalSnapshot(recorded)
|
||||
|
||||
const safeText = (value: unknown) => {
|
||||
export const safeText = (value: unknown) => {
|
||||
if (value === undefined) return "undefined"
|
||||
if (secretFindings(value).length > 0) return JSON.stringify(REDACTED)
|
||||
const text = JSON.stringify(value)
|
||||
|
||||
@@ -1,47 +1,22 @@
|
||||
import { Effect, Ref, Schema, Scope } from "effect"
|
||||
import { Effect, Ref, Scope } from "effect"
|
||||
import type * as CassetteService from "./cassette"
|
||||
import type { CassetteNotFoundError } from "./cassette"
|
||||
import { SecretFindingSchema } from "./redaction"
|
||||
import type { CassetteMetadata, Interaction } from "./schema"
|
||||
|
||||
export class UnsafeCassetteError extends Schema.TaggedErrorClass<UnsafeCassetteError>()("UnsafeCassetteError", {
|
||||
cassetteName: Schema.String,
|
||||
findings: Schema.Array(SecretFindingSchema),
|
||||
}) {
|
||||
override get message() {
|
||||
return `Refusing to write cassette "${this.cassetteName}" because it contains possible secrets: ${this.findings
|
||||
.map((finding) => `${finding.path} (${finding.reason})`)
|
||||
.join(", ")}`
|
||||
}
|
||||
}
|
||||
|
||||
export type ResolvedMode = "record" | "replay" | "passthrough"
|
||||
import type { Interaction } from "./schema"
|
||||
|
||||
const isCI = () => {
|
||||
const value = process.env.CI
|
||||
return value !== undefined && value !== "" && value !== "false" && value !== "0"
|
||||
}
|
||||
|
||||
export const resolveAutoMode = (cassette: CassetteService.Interface, name: string): Effect.Effect<ResolvedMode> =>
|
||||
export const resolveAutoMode = (
|
||||
cassette: CassetteService.Interface,
|
||||
name: string,
|
||||
): Effect.Effect<"record" | "replay" | "passthrough"> =>
|
||||
Effect.gen(function* () {
|
||||
if (isCI()) return "replay"
|
||||
return (yield* cassette.exists(name)) ? "replay" : "record"
|
||||
})
|
||||
|
||||
export const appendOrFail = (
|
||||
cassette: CassetteService.Interface,
|
||||
name: string,
|
||||
interaction: Interaction,
|
||||
metadata: CassetteMetadata | undefined,
|
||||
): Effect.Effect<void, UnsafeCassetteError> =>
|
||||
cassette
|
||||
.append(name, interaction, metadata)
|
||||
.pipe(
|
||||
Effect.flatMap(({ findings }) =>
|
||||
findings.length === 0 ? Effect.void : Effect.fail(new UnsafeCassetteError({ cassetteName: name, findings })),
|
||||
),
|
||||
)
|
||||
|
||||
export interface ReplayState<T> {
|
||||
readonly load: Effect.Effect<ReadonlyArray<T>, CassetteNotFoundError>
|
||||
readonly cursor: Effect.Effect<number>
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import { Schema } from "effect"
|
||||
|
||||
export const REDACTED = "[REDACTED]"
|
||||
|
||||
const DEFAULT_REDACT_HEADERS = [
|
||||
@@ -95,8 +97,6 @@ export const redactHeaders = (
|
||||
)
|
||||
}
|
||||
|
||||
import { Schema } from "effect"
|
||||
|
||||
export const SecretFindingSchema = Schema.Struct({
|
||||
path: Schema.String,
|
||||
reason: Schema.String,
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import { Effect, Option, Ref, Scope, Stream } from "effect"
|
||||
import type { Headers } from "effect/unstable/http"
|
||||
import * as CassetteService from "./cassette"
|
||||
import { canonicalizeJson, decodeJson } from "./matching"
|
||||
import { appendOrFail, makeReplayState, resolveAutoMode } from "./recorder"
|
||||
import { canonicalizeJson, decodeJson, safeText } from "./matching"
|
||||
import { makeReplayState, resolveAutoMode } from "./recorder"
|
||||
import type { RecordReplayMode } from "./effect"
|
||||
import { redactUrl } from "./redaction"
|
||||
import { defaults, type Redactor } from "./redactor"
|
||||
import { webSocketInteractions, type CassetteMetadata, type WebSocketFrame } from "./schema"
|
||||
|
||||
@@ -53,7 +54,7 @@ const decodeFrameText = (frame: WebSocketFrame) =>
|
||||
const assertEqual = (message: string, actual: unknown, expected: unknown) =>
|
||||
Effect.sync(() => {
|
||||
if (JSON.stringify(actual) === JSON.stringify(expected)) return
|
||||
throw new Error(`${message}: expected ${JSON.stringify(expected)}, received ${JSON.stringify(actual)}`)
|
||||
throw new Error(`${message}: expected ${safeText(expected)}, received ${safeText(actual)}`)
|
||||
})
|
||||
|
||||
const jsonOrText = (value: string) => Option.match(decodeJson(value), { onNone: () => value, onSome: canonicalizeJson })
|
||||
@@ -61,7 +62,7 @@ const jsonOrText = (value: string) => Option.match(decodeJson(value), { onNone:
|
||||
const compareClientMessage = (actual: string, expected: WebSocketFrame | undefined, index: number, asJson: boolean) => {
|
||||
if (!expected)
|
||||
return Effect.sync(() => {
|
||||
throw new Error(`Unexpected WebSocket client frame ${index + 1}: ${actual}`)
|
||||
throw new Error(`Unexpected WebSocket client frame ${index + 1}: ${safeText(actual)}`)
|
||||
})
|
||||
const expectedText = decodeFrameText(expected)
|
||||
if (!asJson) return assertEqual(`WebSocket client frame ${index + 1}`, actual, expectedText)
|
||||
@@ -98,12 +99,13 @@ export const makeWebSocketExecutor = <E>(
|
||||
const closeOnce = Effect.gen(function* () {
|
||||
if (yield* Ref.getAndSet(closed, true)) return
|
||||
yield* connection.close
|
||||
yield* appendOrFail(
|
||||
options.cassette,
|
||||
options.name,
|
||||
{ transport: "websocket", open: openSnapshot(request), client, server },
|
||||
options.metadata,
|
||||
).pipe(Effect.orDie)
|
||||
yield* options.cassette
|
||||
.append(
|
||||
options.name,
|
||||
{ transport: "websocket", open: openSnapshot(request), client, server },
|
||||
options.metadata,
|
||||
)
|
||||
.pipe(Effect.orDie)
|
||||
})
|
||||
return {
|
||||
sendText: (message) =>
|
||||
@@ -111,10 +113,7 @@ export const makeWebSocketExecutor = <E>(
|
||||
.sendText(message)
|
||||
.pipe(Effect.tap(() => Effect.sync(() => client.push(encodeFrame(message))))),
|
||||
messages: connection.messages.pipe(
|
||||
Stream.map((message) => {
|
||||
server.push(encodeFrame(message))
|
||||
return message
|
||||
}),
|
||||
Stream.tap((message) => Effect.sync(() => server.push(encodeFrame(message)))),
|
||||
),
|
||||
close: closeOnce,
|
||||
}
|
||||
@@ -130,20 +129,22 @@ export const makeWebSocketExecutor = <E>(
|
||||
const interactions = yield* replay.load.pipe(Effect.orDie)
|
||||
const index = yield* replay.cursor
|
||||
const interaction = interactions[index]
|
||||
if (!interaction) return yield* Effect.die(new Error(`No recorded WebSocket interaction for ${request.url}`))
|
||||
if (!interaction)
|
||||
return yield* Effect.die(new Error(`No recorded WebSocket interaction for ${redactUrl(request.url)}`))
|
||||
yield* replay.advance
|
||||
yield* assertEqual(`WebSocket open frame ${index + 1}`, openSnapshot(request), interaction.open)
|
||||
const messageIndex = yield* Ref.make(0)
|
||||
return {
|
||||
sendText: (message) =>
|
||||
Effect.gen(function* () {
|
||||
const current = yield* Ref.getAndUpdate(messageIndex, (value) => value + 1)
|
||||
const current = yield* Ref.get(messageIndex)
|
||||
yield* compareClientMessage(
|
||||
message,
|
||||
interaction.client[current],
|
||||
current,
|
||||
options.compareClientMessagesAsJson === true,
|
||||
)
|
||||
yield* Ref.update(messageIndex, (value) => value + 1)
|
||||
}),
|
||||
messages: Stream.fromIterable(interaction.server).pipe(Stream.map(decodeFrameMessage)),
|
||||
close: Effect.gen(function* () {
|
||||
|
||||
@@ -323,4 +323,105 @@ describe("http-recorder", () => {
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
test("auto mode records to disk when the cassette is missing", async () => {
|
||||
const directory = fs.mkdtempSync(path.join(os.tmpdir(), "http-recorder-auto-record-"))
|
||||
using server = Bun.serve({ port: 0, fetch: () => new Response('{"reply":"recorded"}', { headers: { "content-type": "application/json" } }) })
|
||||
const url = `http://127.0.0.1:${server.port}/echo`
|
||||
// CI=true forces replay; clear it so we exercise the local-dev auto-record path.
|
||||
const previous = process.env.CI
|
||||
delete process.env.CI
|
||||
try {
|
||||
const result = await runWith("auto-record", { directory, mode: "auto" }, post(url, { step: 1 }))
|
||||
expect(result).toBe('{"reply":"recorded"}')
|
||||
expect(fs.existsSync(path.join(directory, "auto-record.json"))).toBe(true)
|
||||
} finally {
|
||||
if (previous !== undefined) process.env.CI = previous
|
||||
}
|
||||
})
|
||||
|
||||
test("passthrough mode bypasses the recorder entirely", async () => {
|
||||
using server = Bun.serve({ port: 0, fetch: () => new Response("from-upstream") })
|
||||
const url = `http://127.0.0.1:${server.port}/path`
|
||||
const directory = fs.mkdtempSync(path.join(os.tmpdir(), "http-recorder-passthrough-"))
|
||||
|
||||
const result = await runWith("passthrough-noop", { directory, mode: "passthrough" }, post(url, {}))
|
||||
expect(result).toBe("from-upstream")
|
||||
expect(fs.existsSync(path.join(directory, "passthrough-noop.json"))).toBe(false)
|
||||
})
|
||||
|
||||
test("UnsafeCassetteError fails the request when a recording would write a known secret", async () => {
|
||||
using server = Bun.serve({ port: 0, fetch: () => new Response("Bearer abcdefghijklmnopqrstuvwxyz1234") })
|
||||
const url = `http://127.0.0.1:${server.port}/leaky`
|
||||
const directory = fs.mkdtempSync(path.join(os.tmpdir(), "http-recorder-unsafe-"))
|
||||
|
||||
const exit = await Effect.runPromise(
|
||||
Effect.exit(
|
||||
post(url, { ok: true }).pipe(
|
||||
Effect.provide(HttpRecorder.cassetteLayer("unsafe-record", { directory, mode: "record" })),
|
||||
),
|
||||
),
|
||||
)
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
expect(failureText(exit)).toContain("contains possible secrets")
|
||||
expect(fs.existsSync(path.join(directory, "unsafe-record.json"))).toBe(false)
|
||||
})
|
||||
|
||||
test("Cassette.list enumerates recorded cassette names", async () => {
|
||||
const directory = fs.mkdtempSync(path.join(os.tmpdir(), "http-recorder-list-"))
|
||||
await seedCassetteDirectory(directory, "alpha/one", [
|
||||
{ transport: "http", request: { method: "GET", url: "https://x.test/a", headers: {}, body: "" }, response: { status: 200, headers: {}, body: "a" } },
|
||||
])
|
||||
await seedCassetteDirectory(directory, "beta", [
|
||||
{ transport: "http", request: { method: "GET", url: "https://x.test/b", headers: {}, body: "" }, response: { status: 200, headers: {}, body: "b" } },
|
||||
])
|
||||
|
||||
const names = await Effect.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const cassette = yield* HttpRecorder.Cassette.Service
|
||||
return yield* cassette.list()
|
||||
}).pipe(Effect.provide(HttpRecorder.Cassette.fileSystem({ directory })), Effect.provide(NodeFileSystem.layer)),
|
||||
)
|
||||
expect(names).toEqual(["alpha/one", "beta"])
|
||||
})
|
||||
|
||||
test("WebSocket replay decodes binary frames recorded as base64", async () => {
|
||||
const binaryServer = new Uint8Array([1, 2, 3, 4])
|
||||
await Effect.runPromise(
|
||||
Effect.scoped(
|
||||
Effect.gen(function* () {
|
||||
const cassette = yield* HttpRecorder.Cassette.Service
|
||||
const executor = yield* HttpRecorder.makeWebSocketExecutor({
|
||||
name: "ws/binary",
|
||||
cassette,
|
||||
live: { open: () => Effect.die(new Error("unexpected live WebSocket open")) },
|
||||
})
|
||||
const connection = yield* executor.open({
|
||||
url: "wss://example.test/binary",
|
||||
headers: Headers.fromInput({}),
|
||||
})
|
||||
const messages: Array<string | Uint8Array> = []
|
||||
yield* connection.messages.pipe(Stream.runForEach((m) => Effect.sync(() => messages.push(m))))
|
||||
yield* connection.close
|
||||
|
||||
expect(messages).toHaveLength(1)
|
||||
expect(messages[0]).toBeInstanceOf(Uint8Array)
|
||||
expect(Array.from(messages[0] as Uint8Array)).toEqual([1, 2, 3, 4])
|
||||
}).pipe(
|
||||
Effect.provide(
|
||||
HttpRecorder.Cassette.memory({
|
||||
"ws/binary": [
|
||||
{
|
||||
transport: "websocket",
|
||||
open: { url: "wss://example.test/binary", headers: {} },
|
||||
client: [],
|
||||
server: [{ kind: "binary", body: Buffer.from(binaryServer).toString("base64"), bodyEncoding: "base64" }],
|
||||
},
|
||||
],
|
||||
}),
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user