From b4fc5ef071eb26d7aa0889bb2ea067ff82d424c2 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 14 May 2026 14:03:22 -0400 Subject: [PATCH] refactor(http-recorder): tighten cassette safety, fix WS leaks + docs (#26730) --- packages/http-recorder/README.md | 25 +++-- packages/http-recorder/src/cassette.ts | 59 +++++----- packages/http-recorder/src/effect.ts | 8 +- packages/http-recorder/src/index.ts | 3 +- packages/http-recorder/src/matching.ts | 2 +- packages/http-recorder/src/recorder.ts | 37 ++----- packages/http-recorder/src/redaction.ts | 4 +- packages/http-recorder/src/websocket.ts | 33 +++--- .../http-recorder/test/record-replay.test.ts | 101 ++++++++++++++++++ 9 files changed, 177 insertions(+), 95 deletions(-) diff --git a/packages/http-recorder/README.md b/packages/http-recorder/README.md index 5920c9670a..ab1d39436e 100644 --- a/packages/http-recorder/README.md +++ b/packages/http-recorder/README.md @@ -166,11 +166,11 @@ import { Effect } from "effect" const audit = Effect.gen(function* () { const cassettes = yield* HttpRecorder.Cassette.Service - const entries = yield* cassettes.list() - const issues = yield* Effect.forEach(entries, (entry) => + const names = yield* cassettes.list() + const issues = yield* Effect.forEach(names, (name) => cassettes - .read(entry.name) - .pipe(Effect.map((interactions) => ({ name: entry.name, findings: HttpRecorder.secretFindings(interactions) }))), + .read(name) + .pipe(Effect.map((interactions) => ({ name, findings: HttpRecorder.secretFindings(interactions) }))), ) return issues.filter((i) => i.findings.length > 0) }) @@ -198,12 +198,11 @@ type RecordReplayOptions = { | File | Purpose | | -------------- | -------------------------------------------------------------------------------- | -| `effect.ts` | `cassetteLayer` / `recordingLayer` — the `HttpClient` adapter. | -| `websocket.ts` | `makeWebSocketExecutor` — WebSocket record/replay. | -| `cassette.ts` | `Cassette.Service` — reads/writes cassette files, accumulates state. | -| `recorder.ts` | Shared transport plumbing: `UnsafeCassetteError`, `appendOrFail`, `ReplayState`. | -| `redactor.ts` | Composable `Redactor` — headers, url, body redaction. | -| `redaction.ts` | Lower-level header/URL primitives + secret pattern detection. | -| `schema.ts` | Effect Schema definitions for the cassette JSON format. | -| `storage.ts` | Path resolution, JSON encode/decode, sync existence check. | -| `matching.ts` | Request matcher, canonicalization, sequential cursor, mismatch diagnostics. | +| `effect.ts` | `cassetteLayer` / `recordingLayer` — the `HttpClient` adapter. | +| `websocket.ts` | `makeWebSocketExecutor` — WebSocket record/replay. | +| `cassette.ts` | `Cassette.Service` — `fileSystem` / `memory` adapters, error types. | +| `recorder.ts` | Shared transport plumbing: `resolveAutoMode`, `ReplayState`. | +| `redactor.ts` | Composable `Redactor` — headers, url, body redaction. | +| `redaction.ts` | Lower-level header/URL primitives + secret pattern detection. | +| `schema.ts` | Effect Schema definitions for the cassette JSON format. | +| `matching.ts` | Request matcher, canonicalization, sequential cursor, mismatch diagnostics. | diff --git a/packages/http-recorder/src/cassette.ts b/packages/http-recorder/src/cassette.ts index 3897f0222c..d2e37dd61a 100644 --- a/packages/http-recorder/src/cassette.ts +++ b/packages/http-recorder/src/cassette.ts @@ -1,7 +1,7 @@ import { Context, Effect, FileSystem, Layer, Schema } from "effect" import * as fs from "node:fs" import * as path from "node:path" -import { secretFindings, type SecretFinding } from "./redaction" +import { secretFindings, SecretFindingSchema, type SecretFinding } from "./redaction" import { decodeCassette, encodeCassette, type Cassette, type CassetteMetadata, type Interaction } from "./schema" const DEFAULT_RECORDINGS_DIR = path.resolve(process.cwd(), "test", "fixtures", "recordings") @@ -14,13 +14,24 @@ export class CassetteNotFoundError extends Schema.TaggedErrorClass +export class UnsafeCassetteError extends Schema.TaggedErrorClass()("UnsafeCassetteError", { + cassetteName: Schema.String, + findings: Schema.Array(SecretFindingSchema), +}) { + override get message() { + return `Refusing to write cassette "${this.cassetteName}" because it contains possible secrets: ${this.findings + .map((finding) => `${finding.path} (${finding.reason})`) + .join(", ")}` + } } export interface Interface { readonly read: (name: string) => Effect.Effect, CassetteNotFoundError> - readonly append: (name: string, interaction: Interaction, metadata?: CassetteMetadata) => Effect.Effect + readonly append: ( + name: string, + interaction: Interaction, + metadata?: CassetteMetadata, + ) => Effect.Effect readonly exists: (name: string) => Effect.Effect readonly list: () => Effect.Effect> } @@ -44,6 +55,9 @@ const formatCassette = (cassette: Cassette) => `${JSON.stringify(encodeCassette( const parseCassette = (raw: string) => decodeCassette(JSON.parse(raw)) +const failIfUnsafe = (name: string, findings: ReadonlyArray) => + findings.length === 0 ? Effect.void : Effect.fail(new UnsafeCassetteError({ cassetteName: name, findings })) + export const fileSystem = ( options: { readonly directory?: string } = {}, ): Layer.Layer => @@ -92,11 +106,9 @@ export const fileSystem = ( entry.findings.push(...secretFindings(interaction)) const cassette = buildCassette(name, entry.interactions, metadata) const findings = [...entry.findings, ...secretFindings(cassette.metadata ?? {})] - if (findings.length === 0) { - yield* ensureDirectory(name) - yield* fs.writeFileString(cassettePath(name), formatCassette(cassette)).pipe(Effect.orDie) - } - return { findings } + yield* failIfUnsafe(name, findings) + yield* ensureDirectory(name) + yield* fs.writeFileString(cassettePath(name), formatCassette(cassette)).pipe(Effect.orDie) }), exists: (name) => fs.access(cassettePath(name)).pipe( @@ -108,12 +120,7 @@ export const fileSystem = ( Effect.map((files) => files .filter((file) => file.endsWith(".json")) - .map((file) => - path - .relative(directory, file) - .replace(/\\/g, "/") - .replace(/\.json$/, ""), - ) + .map((file) => path.relative(directory, file).replace(/\\/g, "/").replace(/\.json$/, "")) .toSorted((a, b) => a.localeCompare(b)), ), ), @@ -133,17 +140,17 @@ export const memory = (initial: Record> = {}) stored.has(name) ? Effect.succeed(stored.get(name) ?? []) : Effect.fail(new CassetteNotFoundError({ cassetteName: name })), - append: (name, interaction, metadata) => - Effect.sync(() => { - const existing = stored.get(name) - if (existing) existing.push(interaction) - else stored.set(name, [interaction]) - const findings = accumulatedFindings.get(name) - if (findings) findings.push(...secretFindings(interaction)) - else accumulatedFindings.set(name, [...secretFindings(interaction)]) - if (metadata) accumulatedFindings.get(name)!.push(...secretFindings({ name, ...metadata })) - return { findings: accumulatedFindings.get(name) ?? [] } - }), + append: (name, interaction, metadata) => { + const existing = stored.get(name) + if (existing) existing.push(interaction) + else stored.set(name, [interaction]) + const existingFindings = accumulatedFindings.get(name) + const findings = existingFindings ?? [] + if (!existingFindings) accumulatedFindings.set(name, findings) + findings.push(...secretFindings(interaction)) + if (metadata) findings.push(...secretFindings({ name, ...metadata })) + return failIfUnsafe(name, findings) + }, exists: (name) => Effect.sync(() => stored.has(name)), list: () => Effect.sync(() => Array.from(stored.keys()).toSorted()), }) diff --git a/packages/http-recorder/src/effect.ts b/packages/http-recorder/src/effect.ts index 61193a013c..8170b99976 100644 --- a/packages/http-recorder/src/effect.ts +++ b/packages/http-recorder/src/effect.ts @@ -12,7 +12,7 @@ import { } from "effect/unstable/http" import * as CassetteService from "./cassette" import { defaultMatcher, selectSequential, type RequestMatcher } from "./matching" -import { appendOrFail, makeReplayState, resolveAutoMode } from "./recorder" +import { makeReplayState, resolveAutoMode } from "./recorder" import { defaults, type Redactor } from "./redactor" import { redactUrl } from "./redaction" import { httpInteractions, type CassetteMetadata, type HttpInteraction, type ResponseSnapshot } from "./schema" @@ -100,9 +100,9 @@ export const recordingLayer = ( ...captured, }), } - yield* appendOrFail(cassetteService, name, interaction, options.metadata).pipe( - Effect.catchTag("UnsafeCassetteError", (error) => Effect.fail(transportError(request, error.message))), - ) + yield* cassetteService + .append(name, interaction, options.metadata) + .pipe(Effect.catchTag("UnsafeCassetteError", (error) => Effect.fail(transportError(request, error.message)))) return HttpClientResponse.fromWeb( request, new Response(decodeResponseBody(interaction.response), interaction.response), diff --git a/packages/http-recorder/src/index.ts b/packages/http-recorder/src/index.ts index 4b47e4513d..d05098996e 100644 --- a/packages/http-recorder/src/index.ts +++ b/packages/http-recorder/src/index.ts @@ -7,10 +7,9 @@ export type { WebSocketFrame, WebSocketInteraction, } from "./schema" -export { CassetteNotFoundError, hasCassetteSync } from "./cassette" +export { CassetteNotFoundError, hasCassetteSync, UnsafeCassetteError } from "./cassette" export { defaultMatcher, type RequestMatcher } from "./matching" export { redactHeaders, redactUrl, secretFindings, type SecretFinding } from "./redaction" -export { UnsafeCassetteError } from "./recorder" export { cassetteLayer, recordingLayer, type RecordReplayMode, type RecordReplayOptions } from "./effect" export { makeWebSocketExecutor, diff --git a/packages/http-recorder/src/matching.ts b/packages/http-recorder/src/matching.ts index ab647ab37a..1dced17d07 100644 --- a/packages/http-recorder/src/matching.ts +++ b/packages/http-recorder/src/matching.ts @@ -36,7 +36,7 @@ export const canonicalSnapshot = (snapshot: RequestSnapshot): string => export const defaultMatcher: RequestMatcher = (incoming, recorded) => canonicalSnapshot(incoming) === canonicalSnapshot(recorded) -const safeText = (value: unknown) => { +export const safeText = (value: unknown) => { if (value === undefined) return "undefined" if (secretFindings(value).length > 0) return JSON.stringify(REDACTED) const text = JSON.stringify(value) diff --git a/packages/http-recorder/src/recorder.ts b/packages/http-recorder/src/recorder.ts index 460b427c2a..0a13282c2d 100644 --- a/packages/http-recorder/src/recorder.ts +++ b/packages/http-recorder/src/recorder.ts @@ -1,47 +1,22 @@ -import { Effect, Ref, Schema, Scope } from "effect" +import { Effect, Ref, Scope } from "effect" import type * as CassetteService from "./cassette" import type { CassetteNotFoundError } from "./cassette" -import { SecretFindingSchema } from "./redaction" -import type { CassetteMetadata, Interaction } from "./schema" - -export class UnsafeCassetteError extends Schema.TaggedErrorClass()("UnsafeCassetteError", { - cassetteName: Schema.String, - findings: Schema.Array(SecretFindingSchema), -}) { - override get message() { - return `Refusing to write cassette "${this.cassetteName}" because it contains possible secrets: ${this.findings - .map((finding) => `${finding.path} (${finding.reason})`) - .join(", ")}` - } -} - -export type ResolvedMode = "record" | "replay" | "passthrough" +import type { Interaction } from "./schema" const isCI = () => { const value = process.env.CI return value !== undefined && value !== "" && value !== "false" && value !== "0" } -export const resolveAutoMode = (cassette: CassetteService.Interface, name: string): Effect.Effect => +export const resolveAutoMode = ( + cassette: CassetteService.Interface, + name: string, +): Effect.Effect<"record" | "replay" | "passthrough"> => Effect.gen(function* () { if (isCI()) return "replay" return (yield* cassette.exists(name)) ? "replay" : "record" }) -export const appendOrFail = ( - cassette: CassetteService.Interface, - name: string, - interaction: Interaction, - metadata: CassetteMetadata | undefined, -): Effect.Effect => - cassette - .append(name, interaction, metadata) - .pipe( - Effect.flatMap(({ findings }) => - findings.length === 0 ? Effect.void : Effect.fail(new UnsafeCassetteError({ cassetteName: name, findings })), - ), - ) - export interface ReplayState { readonly load: Effect.Effect, CassetteNotFoundError> readonly cursor: Effect.Effect diff --git a/packages/http-recorder/src/redaction.ts b/packages/http-recorder/src/redaction.ts index b6aa8b3b87..9d89fff2f5 100644 --- a/packages/http-recorder/src/redaction.ts +++ b/packages/http-recorder/src/redaction.ts @@ -1,3 +1,5 @@ +import { Schema } from "effect" + export const REDACTED = "[REDACTED]" const DEFAULT_REDACT_HEADERS = [ @@ -95,8 +97,6 @@ export const redactHeaders = ( ) } -import { Schema } from "effect" - export const SecretFindingSchema = Schema.Struct({ path: Schema.String, reason: Schema.String, diff --git a/packages/http-recorder/src/websocket.ts b/packages/http-recorder/src/websocket.ts index f7529b4888..2cccc6749c 100644 --- a/packages/http-recorder/src/websocket.ts +++ b/packages/http-recorder/src/websocket.ts @@ -1,9 +1,10 @@ import { Effect, Option, Ref, Scope, Stream } from "effect" import type { Headers } from "effect/unstable/http" import * as CassetteService from "./cassette" -import { canonicalizeJson, decodeJson } from "./matching" -import { appendOrFail, makeReplayState, resolveAutoMode } from "./recorder" +import { canonicalizeJson, decodeJson, safeText } from "./matching" +import { makeReplayState, resolveAutoMode } from "./recorder" import type { RecordReplayMode } from "./effect" +import { redactUrl } from "./redaction" import { defaults, type Redactor } from "./redactor" import { webSocketInteractions, type CassetteMetadata, type WebSocketFrame } from "./schema" @@ -53,7 +54,7 @@ const decodeFrameText = (frame: WebSocketFrame) => const assertEqual = (message: string, actual: unknown, expected: unknown) => Effect.sync(() => { if (JSON.stringify(actual) === JSON.stringify(expected)) return - throw new Error(`${message}: expected ${JSON.stringify(expected)}, received ${JSON.stringify(actual)}`) + throw new Error(`${message}: expected ${safeText(expected)}, received ${safeText(actual)}`) }) const jsonOrText = (value: string) => Option.match(decodeJson(value), { onNone: () => value, onSome: canonicalizeJson }) @@ -61,7 +62,7 @@ const jsonOrText = (value: string) => Option.match(decodeJson(value), { onNone: const compareClientMessage = (actual: string, expected: WebSocketFrame | undefined, index: number, asJson: boolean) => { if (!expected) return Effect.sync(() => { - throw new Error(`Unexpected WebSocket client frame ${index + 1}: ${actual}`) + throw new Error(`Unexpected WebSocket client frame ${index + 1}: ${safeText(actual)}`) }) const expectedText = decodeFrameText(expected) if (!asJson) return assertEqual(`WebSocket client frame ${index + 1}`, actual, expectedText) @@ -98,12 +99,13 @@ export const makeWebSocketExecutor = ( const closeOnce = Effect.gen(function* () { if (yield* Ref.getAndSet(closed, true)) return yield* connection.close - yield* appendOrFail( - options.cassette, - options.name, - { transport: "websocket", open: openSnapshot(request), client, server }, - options.metadata, - ).pipe(Effect.orDie) + yield* options.cassette + .append( + options.name, + { transport: "websocket", open: openSnapshot(request), client, server }, + options.metadata, + ) + .pipe(Effect.orDie) }) return { sendText: (message) => @@ -111,10 +113,7 @@ export const makeWebSocketExecutor = ( .sendText(message) .pipe(Effect.tap(() => Effect.sync(() => client.push(encodeFrame(message))))), messages: connection.messages.pipe( - Stream.map((message) => { - server.push(encodeFrame(message)) - return message - }), + Stream.tap((message) => Effect.sync(() => server.push(encodeFrame(message)))), ), close: closeOnce, } @@ -130,20 +129,22 @@ export const makeWebSocketExecutor = ( const interactions = yield* replay.load.pipe(Effect.orDie) const index = yield* replay.cursor const interaction = interactions[index] - if (!interaction) return yield* Effect.die(new Error(`No recorded WebSocket interaction for ${request.url}`)) + if (!interaction) + return yield* Effect.die(new Error(`No recorded WebSocket interaction for ${redactUrl(request.url)}`)) yield* replay.advance yield* assertEqual(`WebSocket open frame ${index + 1}`, openSnapshot(request), interaction.open) const messageIndex = yield* Ref.make(0) return { sendText: (message) => Effect.gen(function* () { - const current = yield* Ref.getAndUpdate(messageIndex, (value) => value + 1) + const current = yield* Ref.get(messageIndex) yield* compareClientMessage( message, interaction.client[current], current, options.compareClientMessagesAsJson === true, ) + yield* Ref.update(messageIndex, (value) => value + 1) }), messages: Stream.fromIterable(interaction.server).pipe(Stream.map(decodeFrameMessage)), close: Effect.gen(function* () { diff --git a/packages/http-recorder/test/record-replay.test.ts b/packages/http-recorder/test/record-replay.test.ts index 503f87ac50..0538f897f1 100644 --- a/packages/http-recorder/test/record-replay.test.ts +++ b/packages/http-recorder/test/record-replay.test.ts @@ -323,4 +323,105 @@ describe("http-recorder", () => { }), ) }) + + test("auto mode records to disk when the cassette is missing", async () => { + const directory = fs.mkdtempSync(path.join(os.tmpdir(), "http-recorder-auto-record-")) + using server = Bun.serve({ port: 0, fetch: () => new Response('{"reply":"recorded"}', { headers: { "content-type": "application/json" } }) }) + const url = `http://127.0.0.1:${server.port}/echo` + // CI=true forces replay; clear it so we exercise the local-dev auto-record path. + const previous = process.env.CI + delete process.env.CI + try { + const result = await runWith("auto-record", { directory, mode: "auto" }, post(url, { step: 1 })) + expect(result).toBe('{"reply":"recorded"}') + expect(fs.existsSync(path.join(directory, "auto-record.json"))).toBe(true) + } finally { + if (previous !== undefined) process.env.CI = previous + } + }) + + test("passthrough mode bypasses the recorder entirely", async () => { + using server = Bun.serve({ port: 0, fetch: () => new Response("from-upstream") }) + const url = `http://127.0.0.1:${server.port}/path` + const directory = fs.mkdtempSync(path.join(os.tmpdir(), "http-recorder-passthrough-")) + + const result = await runWith("passthrough-noop", { directory, mode: "passthrough" }, post(url, {})) + expect(result).toBe("from-upstream") + expect(fs.existsSync(path.join(directory, "passthrough-noop.json"))).toBe(false) + }) + + test("UnsafeCassetteError fails the request when a recording would write a known secret", async () => { + using server = Bun.serve({ port: 0, fetch: () => new Response("Bearer abcdefghijklmnopqrstuvwxyz1234") }) + const url = `http://127.0.0.1:${server.port}/leaky` + const directory = fs.mkdtempSync(path.join(os.tmpdir(), "http-recorder-unsafe-")) + + const exit = await Effect.runPromise( + Effect.exit( + post(url, { ok: true }).pipe( + Effect.provide(HttpRecorder.cassetteLayer("unsafe-record", { directory, mode: "record" })), + ), + ), + ) + expect(Exit.isFailure(exit)).toBe(true) + expect(failureText(exit)).toContain("contains possible secrets") + expect(fs.existsSync(path.join(directory, "unsafe-record.json"))).toBe(false) + }) + + test("Cassette.list enumerates recorded cassette names", async () => { + const directory = fs.mkdtempSync(path.join(os.tmpdir(), "http-recorder-list-")) + await seedCassetteDirectory(directory, "alpha/one", [ + { transport: "http", request: { method: "GET", url: "https://x.test/a", headers: {}, body: "" }, response: { status: 200, headers: {}, body: "a" } }, + ]) + await seedCassetteDirectory(directory, "beta", [ + { transport: "http", request: { method: "GET", url: "https://x.test/b", headers: {}, body: "" }, response: { status: 200, headers: {}, body: "b" } }, + ]) + + const names = await Effect.runPromise( + Effect.gen(function* () { + const cassette = yield* HttpRecorder.Cassette.Service + return yield* cassette.list() + }).pipe(Effect.provide(HttpRecorder.Cassette.fileSystem({ directory })), Effect.provide(NodeFileSystem.layer)), + ) + expect(names).toEqual(["alpha/one", "beta"]) + }) + + test("WebSocket replay decodes binary frames recorded as base64", async () => { + const binaryServer = new Uint8Array([1, 2, 3, 4]) + await Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const cassette = yield* HttpRecorder.Cassette.Service + const executor = yield* HttpRecorder.makeWebSocketExecutor({ + name: "ws/binary", + cassette, + live: { open: () => Effect.die(new Error("unexpected live WebSocket open")) }, + }) + const connection = yield* executor.open({ + url: "wss://example.test/binary", + headers: Headers.fromInput({}), + }) + const messages: Array = [] + yield* connection.messages.pipe(Stream.runForEach((m) => Effect.sync(() => messages.push(m)))) + yield* connection.close + + expect(messages).toHaveLength(1) + expect(messages[0]).toBeInstanceOf(Uint8Array) + expect(Array.from(messages[0] as Uint8Array)).toEqual([1, 2, 3, 4]) + }).pipe( + Effect.provide( + HttpRecorder.Cassette.memory({ + "ws/binary": [ + { + transport: "websocket", + open: { url: "wss://example.test/binary", headers: {} }, + client: [], + server: [{ kind: "binary", body: Buffer.from(binaryServer).toString("base64"), bodyEncoding: "base64" }], + }, + ], + }), + ), + ), + ), + ) + }) })