Compare commits

...

12 Commits

Author SHA1 Message Date
Kit Langton
c9ada22e35 fix(test): skip flaky extra fd input on windows 2026-03-23 22:38:39 -04:00
Kit Langton
5274f59d88 fix(test): stream extra fd input on windows 2026-03-23 22:27:55 -04:00
Kit Langton
d067fbe3fd fix(windows): use overlapped stdio for spawned pipes 2026-03-23 22:16:26 -04:00
Kit Langton
46b06df4f1 fix(windows): close extra fd inputs in effect spawner 2026-03-23 21:56:22 -04:00
Kit Langton
ffc45343d8 fix(process): wait for child stdio to close 2026-03-23 21:35:50 -04:00
Kit Langton
b4acbd5cd7 fix(windows): avoid taskkill cleanup after exited processes 2026-03-23 21:08:03 -04:00
Kit Langton
8bc4adafd4 fix(windows): hide spawned process windows in effect spawner 2026-03-23 20:42:47 -04:00
Kit Langton
d15c6df518 fix(test): use node for portable cross-spawn tests 2026-03-23 19:49:17 -04:00
Kit Langton
9a04209dd0 Merge branch 'dev' into kit/cross-spawn-spawner 2026-03-23 19:32:46 -04:00
Kit Langton
5da672be32 fix(test): use sh -c for portable echo in unconfigured fd test 2026-03-23 13:25:47 -04:00
Kit Langton
a9621936fa Merge branch 'dev' into kit/cross-spawn-spawner 2026-03-23 13:11:56 -04:00
Kit Langton
9925faa465 fix: restore cross-spawn behavior for effect child processes
Bring back the Windows command resolution and shell behavior we lost during the Effect migration by adding a cross-spawn-backed child process spawner. Wire installation and snapshot through it and cover the regression cases with focused process tests.
2026-03-23 10:53:49 -04:00
4 changed files with 924 additions and 5 deletions

View File

@@ -0,0 +1,476 @@
import type * as Arr from "effect/Array"
import { NodeSink, NodeStream } from "@effect/platform-node"
import * as Deferred from "effect/Deferred"
import * as Effect from "effect/Effect"
import * as Exit from "effect/Exit"
import * as FileSystem from "effect/FileSystem"
import * as Layer from "effect/Layer"
import * as Path from "effect/Path"
import * as PlatformError from "effect/PlatformError"
import * as Predicate from "effect/Predicate"
import type * as Scope from "effect/Scope"
import * as Sink from "effect/Sink"
import * as Stream from "effect/Stream"
import * as ChildProcess from "effect/unstable/process/ChildProcess"
import type { ChildProcessHandle } from "effect/unstable/process/ChildProcessSpawner"
import {
ChildProcessSpawner,
ExitCode,
make as makeSpawner,
makeHandle,
ProcessId,
} from "effect/unstable/process/ChildProcessSpawner"
import * as NodeChildProcess from "node:child_process"
import { PassThrough } from "node:stream"
import launch from "cross-spawn"
const toError = (err: unknown): Error => (err instanceof globalThis.Error ? err : new globalThis.Error(String(err)))
const toTag = (err: NodeJS.ErrnoException): PlatformError.SystemErrorTag => {
switch (err.code) {
case "ENOENT":
return "NotFound"
case "EACCES":
return "PermissionDenied"
case "EEXIST":
return "AlreadyExists"
case "EISDIR":
return "BadResource"
case "ENOTDIR":
return "BadResource"
case "EBUSY":
return "Busy"
case "ELOOP":
return "BadResource"
default:
return "Unknown"
}
}
const flatten = (command: ChildProcess.Command) => {
const commands: Array<ChildProcess.StandardCommand> = []
const opts: Array<ChildProcess.PipeOptions> = []
const walk = (cmd: ChildProcess.Command): void => {
switch (cmd._tag) {
case "StandardCommand":
commands.push(cmd)
return
case "PipedCommand":
walk(cmd.left)
opts.push(cmd.options)
walk(cmd.right)
return
}
}
walk(command)
if (commands.length === 0) throw new Error("flatten produced empty commands array")
const [head, ...tail] = commands
return {
commands: [head, ...tail] as Arr.NonEmptyReadonlyArray<ChildProcess.StandardCommand>,
opts,
}
}
const toPlatformError = (
method: string,
err: NodeJS.ErrnoException,
command: ChildProcess.Command,
): PlatformError.PlatformError => {
const cmd = flatten(command)
.commands.map((x) => `${x.command} ${x.args.join(" ")}`)
.join(" | ")
return PlatformError.systemError({
_tag: toTag(err),
module: "ChildProcess",
method,
pathOrDescriptor: cmd,
syscall: err.syscall,
cause: err,
})
}
type ExitSignal = Deferred.Deferred<readonly [code: number | null, signal: NodeJS.Signals | null]>
export const make = Effect.gen(function* () {
const fs = yield* FileSystem.FileSystem
const path = yield* Path.Path
const cwd = Effect.fnUntraced(function* (opts: ChildProcess.CommandOptions) {
if (Predicate.isUndefined(opts.cwd)) return undefined
yield* fs.access(opts.cwd)
return path.resolve(opts.cwd)
})
const env = (opts: ChildProcess.CommandOptions) =>
opts.extendEnv ? { ...globalThis.process.env, ...opts.env } : opts.env
const input = (x: ChildProcess.CommandInput | undefined): NodeChildProcess.IOType | undefined =>
Stream.isStream(x) ? "pipe" : x
const output = (x: ChildProcess.CommandOutput | undefined): NodeChildProcess.IOType | undefined =>
Sink.isSink(x) ? "pipe" : x
const stdin = (opts: ChildProcess.CommandOptions): ChildProcess.StdinConfig => {
const cfg: ChildProcess.StdinConfig = { stream: "pipe", encoding: "utf-8", endOnDone: true }
if (Predicate.isUndefined(opts.stdin)) return cfg
if (typeof opts.stdin === "string") return { ...cfg, stream: opts.stdin }
if (Stream.isStream(opts.stdin)) return { ...cfg, stream: opts.stdin }
return {
stream: opts.stdin.stream,
encoding: opts.stdin.encoding ?? cfg.encoding,
endOnDone: opts.stdin.endOnDone ?? cfg.endOnDone,
}
}
const stdio = (opts: ChildProcess.CommandOptions, key: "stdout" | "stderr"): ChildProcess.StdoutConfig => {
const cfg = opts[key]
if (Predicate.isUndefined(cfg)) return { stream: "pipe" }
if (typeof cfg === "string") return { stream: cfg }
if (Sink.isSink(cfg)) return { stream: cfg }
return { stream: cfg.stream }
}
const fds = (opts: ChildProcess.CommandOptions) => {
if (Predicate.isUndefined(opts.additionalFds)) return []
return Object.entries(opts.additionalFds)
.flatMap(([name, config]) => {
const fd = ChildProcess.parseFdName(name)
return Predicate.isUndefined(fd) ? [] : [{ fd, config }]
})
.toSorted((a, b) => a.fd - b.fd)
}
const stdios = (
sin: ChildProcess.StdinConfig,
sout: ChildProcess.StdoutConfig,
serr: ChildProcess.StderrConfig,
extra: ReadonlyArray<{ fd: number; config: ChildProcess.AdditionalFdConfig }>,
): NodeChildProcess.StdioOptions => {
const pipe = (x: NodeChildProcess.IOType | undefined) =>
process.platform === "win32" && x === "pipe" ? "overlapped" : x
const arr: Array<NodeChildProcess.IOType | undefined> = [
pipe(input(sin.stream)),
pipe(output(sout.stream)),
pipe(output(serr.stream)),
]
if (extra.length === 0) return arr as NodeChildProcess.StdioOptions
const max = extra.reduce((acc, x) => Math.max(acc, x.fd), 2)
for (let i = 3; i <= max; i++) arr[i] = "ignore"
for (const x of extra) arr[x.fd] = pipe("pipe")
return arr as NodeChildProcess.StdioOptions
}
const setupFds = Effect.fnUntraced(function* (
command: ChildProcess.StandardCommand,
proc: NodeChildProcess.ChildProcess,
extra: ReadonlyArray<{ fd: number; config: ChildProcess.AdditionalFdConfig }>,
) {
if (extra.length === 0) {
return {
getInputFd: () => Sink.drain,
getOutputFd: () => Stream.empty,
}
}
const ins = new Map<number, Sink.Sink<void, Uint8Array, never, PlatformError.PlatformError>>()
const outs = new Map<number, Stream.Stream<Uint8Array, PlatformError.PlatformError>>()
for (const x of extra) {
const node = proc.stdio[x.fd]
switch (x.config.type) {
case "input": {
let sink: Sink.Sink<void, Uint8Array, never, PlatformError.PlatformError> = Sink.drain
if (node && "write" in node) {
sink = NodeSink.fromWritable({
evaluate: () => node,
onError: (err) => toPlatformError(`fromWritable(fd${x.fd})`, toError(err), command),
endOnDone: true,
})
}
if (x.config.stream) yield* Effect.forkScoped(Stream.run(x.config.stream, sink))
ins.set(x.fd, sink)
break
}
case "output": {
let stream: Stream.Stream<Uint8Array, PlatformError.PlatformError> = Stream.empty
if (node && "read" in node) {
const tap = new PassThrough()
node.on("error", (err) => tap.destroy(toError(err)))
node.pipe(tap)
stream = NodeStream.fromReadable({
evaluate: () => tap,
onError: (err) => toPlatformError(`fromReadable(fd${x.fd})`, toError(err), command),
})
}
if (x.config.sink) stream = Stream.transduce(stream, x.config.sink)
outs.set(x.fd, stream)
break
}
}
}
return {
getInputFd: (fd: number) => ins.get(fd) ?? Sink.drain,
getOutputFd: (fd: number) => outs.get(fd) ?? Stream.empty,
}
})
const setupStdin = (
command: ChildProcess.StandardCommand,
proc: NodeChildProcess.ChildProcess,
cfg: ChildProcess.StdinConfig,
) =>
Effect.suspend(() => {
let sink: Sink.Sink<void, unknown, never, PlatformError.PlatformError> = Sink.drain
if (Predicate.isNotNull(proc.stdin)) {
sink = NodeSink.fromWritable({
evaluate: () => proc.stdin!,
onError: (err) => toPlatformError("fromWritable(stdin)", toError(err), command),
endOnDone: cfg.endOnDone,
encoding: cfg.encoding,
})
}
if (Stream.isStream(cfg.stream)) return Effect.as(Effect.forkScoped(Stream.run(cfg.stream, sink)), sink)
return Effect.succeed(sink)
})
const setupOutput = (
command: ChildProcess.StandardCommand,
proc: NodeChildProcess.ChildProcess,
out: ChildProcess.StdoutConfig,
err: ChildProcess.StderrConfig,
) => {
let stdout = proc.stdout
? NodeStream.fromReadable({
evaluate: () => proc.stdout!,
onError: (cause) => toPlatformError("fromReadable(stdout)", toError(cause), command),
})
: Stream.empty
let stderr = proc.stderr
? NodeStream.fromReadable({
evaluate: () => proc.stderr!,
onError: (cause) => toPlatformError("fromReadable(stderr)", toError(cause), command),
})
: Stream.empty
if (Sink.isSink(out.stream)) stdout = Stream.transduce(stdout, out.stream)
if (Sink.isSink(err.stream)) stderr = Stream.transduce(stderr, err.stream)
return { stdout, stderr, all: Stream.merge(stdout, stderr) }
}
const spawn = (command: ChildProcess.StandardCommand, opts: NodeChildProcess.SpawnOptions) =>
Effect.callback<readonly [NodeChildProcess.ChildProcess, ExitSignal], PlatformError.PlatformError>((resume) => {
const signal = Deferred.makeUnsafe<readonly [code: number | null, signal: NodeJS.Signals | null]>()
const proc = launch(command.command, command.args, opts)
let end = false
let exit: readonly [code: number | null, signal: NodeJS.Signals | null] | undefined
proc.on("error", (err) => {
resume(Effect.fail(toPlatformError("spawn", err, command)))
})
proc.on("exit", (...args) => {
exit = args
})
proc.on("close", (...args) => {
if (end) return
end = true
Deferred.doneUnsafe(signal, Exit.succeed(exit ?? args))
})
proc.on("spawn", () => {
resume(Effect.succeed([proc, signal]))
})
return Effect.sync(() => {
proc.kill("SIGTERM")
})
})
const killGroup = (
command: ChildProcess.StandardCommand,
proc: NodeChildProcess.ChildProcess,
signal: NodeJS.Signals,
) => {
if (globalThis.process.platform === "win32") {
return Effect.callback<void, PlatformError.PlatformError>((resume) => {
NodeChildProcess.exec(`taskkill /pid ${proc.pid} /T /F`, { windowsHide: true }, (err) => {
if (err) return resume(Effect.fail(toPlatformError("kill", toError(err), command)))
resume(Effect.void)
})
})
}
return Effect.try({
try: () => {
globalThis.process.kill(-proc.pid!, signal)
},
catch: (err) => toPlatformError("kill", toError(err), command),
})
}
const killOne = (
command: ChildProcess.StandardCommand,
proc: NodeChildProcess.ChildProcess,
signal: NodeJS.Signals,
) =>
Effect.suspend(() => {
if (proc.kill(signal)) return Effect.void
return Effect.fail(toPlatformError("kill", new Error("Failed to kill child process"), command))
})
const timeout =
(
proc: NodeChildProcess.ChildProcess,
command: ChildProcess.StandardCommand,
opts: ChildProcess.KillOptions | undefined,
) =>
<A, E, R>(
f: (
command: ChildProcess.StandardCommand,
proc: NodeChildProcess.ChildProcess,
signal: NodeJS.Signals,
) => Effect.Effect<A, E, R>,
) => {
const signal = opts?.killSignal ?? "SIGTERM"
if (Predicate.isUndefined(opts?.forceKillAfter)) return f(command, proc, signal)
return Effect.timeoutOrElse(f(command, proc, signal), {
duration: opts.forceKillAfter,
onTimeout: () => f(command, proc, "SIGKILL"),
})
}
const source = (handle: ChildProcessHandle, from: ChildProcess.PipeFromOption | undefined) => {
const opt = from ?? "stdout"
switch (opt) {
case "stdout":
return handle.stdout
case "stderr":
return handle.stderr
case "all":
return handle.all
default: {
const fd = ChildProcess.parseFdName(opt)
return Predicate.isNotUndefined(fd) ? handle.getOutputFd(fd) : handle.stdout
}
}
}
const spawnCommand: (
command: ChildProcess.Command,
) => Effect.Effect<ChildProcessHandle, PlatformError.PlatformError, Scope.Scope> = Effect.fnUntraced(
function* (command) {
switch (command._tag) {
case "StandardCommand": {
const sin = stdin(command.options)
const sout = stdio(command.options, "stdout")
const serr = stdio(command.options, "stderr")
const extra = fds(command.options)
const dir = yield* cwd(command.options)
const [proc, signal] = yield* Effect.acquireRelease(
spawn(command, {
cwd: dir,
env: env(command.options),
stdio: stdios(sin, sout, serr, extra),
detached: command.options.detached ?? process.platform !== "win32",
shell: command.options.shell,
windowsHide: process.platform === "win32",
}),
Effect.fnUntraced(function* ([proc, signal]) {
const done = yield* Deferred.isDone(signal)
const kill = timeout(proc, command, command.options)
if (done) {
const [code] = yield* Deferred.await(signal)
if (process.platform === "win32") return yield* Effect.void
if (code !== 0 && Predicate.isNotNull(code)) return yield* Effect.ignore(kill(killGroup))
return yield* Effect.void
}
return yield* kill((command, proc, signal) =>
Effect.catch(killGroup(command, proc, signal), () => killOne(command, proc, signal)),
).pipe(Effect.andThen(Deferred.await(signal)), Effect.ignore)
}),
)
const fd = yield* setupFds(command, proc, extra)
const out = setupOutput(command, proc, sout, serr)
return makeHandle({
pid: ProcessId(proc.pid!),
stdin: yield* setupStdin(command, proc, sin),
stdout: out.stdout,
stderr: out.stderr,
all: out.all,
getInputFd: fd.getInputFd,
getOutputFd: fd.getOutputFd,
isRunning: Effect.map(Deferred.isDone(signal), (done) => !done),
exitCode: Effect.flatMap(Deferred.await(signal), ([code, signal]) => {
if (Predicate.isNotNull(code)) return Effect.succeed(ExitCode(code))
return Effect.fail(
toPlatformError(
"exitCode",
new Error(`Process interrupted due to receipt of signal: '${signal}'`),
command,
),
)
}),
kill: (opts?: ChildProcess.KillOptions) =>
timeout(
proc,
command,
opts,
)((command, proc, signal) =>
Effect.catch(killGroup(command, proc, signal), () => killOne(command, proc, signal)),
).pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid),
})
}
case "PipedCommand": {
const flat = flatten(command)
const [head, ...tail] = flat.commands
let handle = spawnCommand(head)
for (let i = 0; i < tail.length; i++) {
const next = tail[i]
const opts = flat.opts[i] ?? {}
const sin = stdin(next.options)
const stream = Stream.unwrap(Effect.map(handle, (x) => source(x, opts.from)))
const to = opts.to ?? "stdin"
if (to === "stdin") {
handle = spawnCommand(
ChildProcess.make(next.command, next.args, {
...next.options,
stdin: { ...sin, stream },
}),
)
continue
}
const fd = ChildProcess.parseFdName(to)
if (Predicate.isUndefined(fd)) {
handle = spawnCommand(
ChildProcess.make(next.command, next.args, {
...next.options,
stdin: { ...sin, stream },
}),
)
continue
}
handle = spawnCommand(
ChildProcess.make(next.command, next.args, {
...next.options,
additionalFds: {
...next.options.additionalFds,
[ChildProcess.fdName(fd) as `fd${number}`]: { type: "input", stream },
},
}),
)
}
return yield* handle
}
}
},
)
return makeSpawner(spawnCommand)
})
export const layer: Layer.Layer<ChildProcessSpawner, never, FileSystem.FileSystem | Path.Path> = Layer.effect(
ChildProcessSpawner,
make,
)

View File

@@ -1,6 +1,7 @@
import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
import { NodeFileSystem, NodePath } from "@effect/platform-node"
import { Effect, Layer, Schema, ServiceMap, Stream } from "effect"
import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
import { makeRunPromise } from "@/effect/run-service"
import { withTransientReadRetry } from "@/util/effect-http-client"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
@@ -340,7 +341,7 @@ export namespace Installation {
export const defaultLayer = layer.pipe(
Layer.provide(FetchHttpClient.layer),
Layer.provide(NodeChildProcessSpawner.layer),
Layer.provide(CrossSpawnSpawner.layer),
Layer.provide(NodeFileSystem.layer),
Layer.provide(NodePath.layer),
)

View File

@@ -1,8 +1,9 @@
import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
import { NodeFileSystem, NodePath } from "@effect/platform-node"
import { Cause, Duration, Effect, Layer, Schedule, ServiceMap, Stream } from "effect"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import path from "path"
import z from "zod"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { AppFileSystem } from "@/filesystem"
@@ -354,9 +355,9 @@ export namespace Snapshot {
)
export const defaultLayer = layer.pipe(
Layer.provide(NodeChildProcessSpawner.layer),
Layer.provide(CrossSpawnSpawner.layer),
Layer.provide(AppFileSystem.defaultLayer),
Layer.provide(NodeFileSystem.layer), // needed by NodeChildProcessSpawner
Layer.provide(NodeFileSystem.layer), // needed by CrossSpawnSpawner
Layer.provide(NodePath.layer),
)

View File

@@ -0,0 +1,441 @@
import { NodeFileSystem, NodePath } from "@effect/platform-node"
import { describe, expect, test } from "bun:test"
import fs from "node:fs/promises"
import path from "node:path"
import { Effect, Exit, Layer, Stream } from "effect"
import type * as PlatformError from "effect/PlatformError"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
import { tmpdir } from "../fixture/fixture"
const live = CrossSpawnSpawner.layer.pipe(Layer.provide(NodeFileSystem.layer), Layer.provide(NodePath.layer))
function js(code: string, opts?: ChildProcess.CommandOptions) {
return ChildProcess.make("node", ["-e", code], opts)
}
function run<A, E>(effect: Effect.Effect<A, E, ChildProcessSpawner.ChildProcessSpawner>) {
return Effect.runPromise(effect.pipe(Effect.provide(live)))
}
function runScoped<A, E>(
effect: Effect.Effect<A, E, ChildProcessSpawner.ChildProcessSpawner | import("effect/Scope").Scope>,
) {
return Effect.runPromise(Effect.scoped(effect).pipe(Effect.provide(live)))
}
function decodeByteStream(stream: Stream.Stream<Uint8Array, PlatformError.PlatformError>) {
return Stream.runCollect(stream).pipe(
Effect.map((chunks) => {
const totalLength = chunks.reduce((acc, c) => acc + c.length, 0)
const result = new Uint8Array(totalLength)
let offset = 0
for (const chunk of chunks) {
result.set(chunk, offset)
offset += chunk.length
}
return new TextDecoder("utf-8").decode(result).trim()
}),
)
}
describe("cross-spawn spawner", () => {
describe("basic spawning", () => {
test("captures stdout", async () => {
const out = await run(
ChildProcessSpawner.ChildProcessSpawner.use((svc) =>
svc.string(ChildProcess.make(process.execPath, ["-e", 'process.stdout.write("ok")'])),
),
)
expect(out).toBe("ok")
})
test("captures multiple lines", async () => {
const out = await runScoped(
Effect.gen(function* () {
const handle = yield* js('console.log("line1"); console.log("line2"); console.log("line3")')
return yield* decodeByteStream(handle.stdout)
}),
)
expect(out).toBe("line1\nline2\nline3")
})
test("returns exit code", async () => {
const code = await runScoped(
Effect.gen(function* () {
const handle = yield* js("process.exit(0)")
return yield* handle.exitCode
}),
)
expect(code).toBe(ChildProcessSpawner.ExitCode(0))
})
test("returns non-zero exit code", async () => {
const code = await runScoped(
Effect.gen(function* () {
const handle = yield* js("process.exit(42)")
return yield* handle.exitCode
}),
)
expect(code).toBe(ChildProcessSpawner.ExitCode(42))
})
})
describe("cwd option", () => {
test("uses cwd when spawning commands", async () => {
await using tmp = await tmpdir()
const out = await run(
ChildProcessSpawner.ChildProcessSpawner.use((svc) =>
svc.string(
ChildProcess.make(process.execPath, ["-e", "process.stdout.write(process.cwd())"], { cwd: tmp.path }),
),
),
)
expect(out).toBe(tmp.path)
})
test("fails for invalid cwd", async () => {
const exit = await Effect.runPromiseExit(
Effect.scoped(ChildProcess.make("echo", ["test"], { cwd: "/nonexistent/directory/path" }).asEffect()).pipe(
Effect.provide(live),
),
)
expect(Exit.isFailure(exit)).toBe(true)
})
})
describe("env option", () => {
test("passes environment variables with extendEnv", async () => {
const out = await runScoped(
Effect.gen(function* () {
const handle = yield* js('process.stdout.write(process.env.TEST_VAR ?? "")', {
env: { TEST_VAR: "test_value" },
extendEnv: true,
})
return yield* decodeByteStream(handle.stdout)
}),
)
expect(out).toBe("test_value")
})
test("passes multiple environment variables", async () => {
const out = await runScoped(
Effect.gen(function* () {
const handle = yield* js(
"process.stdout.write(`${process.env.VAR1}-${process.env.VAR2}-${process.env.VAR3}`)",
{
env: { VAR1: "one", VAR2: "two", VAR3: "three" },
extendEnv: true,
},
)
return yield* decodeByteStream(handle.stdout)
}),
)
expect(out).toBe("one-two-three")
})
})
describe("stderr", () => {
test("captures stderr output", async () => {
const err = await runScoped(
Effect.gen(function* () {
const handle = yield* js('process.stderr.write("error message")')
return yield* decodeByteStream(handle.stderr)
}),
)
expect(err).toBe("error message")
})
test("captures both stdout and stderr", async () => {
const { stdout, stderr } = await runScoped(
Effect.gen(function* () {
const handle = yield* js('process.stdout.write("stdout\\n"); process.stderr.write("stderr\\n")')
const [stdout, stderr] = yield* Effect.all([decodeByteStream(handle.stdout), decodeByteStream(handle.stderr)])
return { stdout, stderr }
}),
)
expect(stdout).toBe("stdout")
expect(stderr).toBe("stderr")
})
})
describe("combined output (all)", () => {
test("captures stdout via .all when no stderr", async () => {
const all = await runScoped(
Effect.gen(function* () {
const handle = yield* ChildProcess.make("echo", ["hello from stdout"])
return yield* decodeByteStream(handle.all)
}),
)
expect(all).toBe("hello from stdout")
})
test("captures stderr via .all when no stdout", async () => {
const all = await runScoped(
Effect.gen(function* () {
const handle = yield* js('process.stderr.write("hello from stderr")')
return yield* decodeByteStream(handle.all)
}),
)
expect(all).toBe("hello from stderr")
})
})
describe("stdin", () => {
test("allows providing standard input to a command", async () => {
const out = await runScoped(
Effect.gen(function* () {
const input = "a b c"
const stdin = Stream.make(Buffer.from(input, "utf-8"))
const handle = yield* js(
'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out))',
{ stdin },
)
const output = yield* decodeByteStream(handle.stdout)
yield* handle.exitCode
return output
}),
)
expect(out).toBe("a b c")
})
})
describe("process control", () => {
test("kills a running process", async () => {
const exit = await Effect.runPromiseExit(
Effect.scoped(
Effect.gen(function* () {
const handle = yield* js("setTimeout(() => {}, 10_000)")
yield* handle.kill()
return yield* handle.exitCode
}),
).pipe(Effect.provide(live)),
)
expect(Exit.isFailure(exit) ? true : exit.value !== ChildProcessSpawner.ExitCode(0)).toBe(true)
})
test("isRunning reflects process state", async () => {
await runScoped(
Effect.gen(function* () {
const handle = yield* js('process.stdout.write("done")')
yield* handle.exitCode
const running = yield* handle.isRunning
expect(running).toBe(false)
}),
)
})
})
describe("error handling", () => {
test("fails for invalid command", async () => {
const exit = await Effect.runPromiseExit(
Effect.scoped(
Effect.gen(function* () {
const handle = yield* ChildProcess.make("nonexistent-command-12345")
return yield* handle.exitCode
}),
).pipe(Effect.provide(live)),
)
expect(Exit.isFailure(exit) ? true : exit.value !== ChildProcessSpawner.ExitCode(0)).toBe(true)
})
})
describe("pipeline", () => {
test("pipes stdout of one command to stdin of another", async () => {
const out = await runScoped(
Effect.gen(function* () {
const handle = yield* js('process.stdout.write("hello world")').pipe(
ChildProcess.pipeTo(
js(
'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out.toUpperCase()))',
),
),
)
const output = yield* decodeByteStream(handle.stdout)
yield* handle.exitCode
return output
}),
)
expect(out).toBe("HELLO WORLD")
})
test("three-stage pipeline", async () => {
const out = await runScoped(
Effect.gen(function* () {
const handle = yield* js('process.stdout.write("hello world")').pipe(
ChildProcess.pipeTo(
js(
'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out.toUpperCase()))',
),
),
ChildProcess.pipeTo(
js(
'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out.replaceAll(" ", "-")))',
),
),
)
const output = yield* decodeByteStream(handle.stdout)
yield* handle.exitCode
return output
}),
)
expect(out).toBe("HELLO-WORLD")
})
test("pipes stderr with { from: 'stderr' }", async () => {
const out = await runScoped(
Effect.gen(function* () {
const handle = yield* js('process.stderr.write("error")').pipe(
ChildProcess.pipeTo(
js(
'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out))',
),
{ from: "stderr" },
),
)
const output = yield* decodeByteStream(handle.stdout)
yield* handle.exitCode
return output
}),
)
expect(out).toBe("error")
})
})
describe("additional fds", () => {
test("reads data from output fd3", async () => {
const out = await runScoped(
Effect.gen(function* () {
const handle = yield* js('require("node:fs").writeSync(3, "hello from fd3\\n")', {
additionalFds: { fd3: { type: "output" } },
})
const fd3Output = yield* decodeByteStream(handle.getOutputFd(3))
yield* handle.exitCode
return fd3Output
}),
)
expect(out).toBe("hello from fd3")
})
test("writes data to input fd3", async () => {
if (process.platform === "win32") return
const out = await runScoped(
Effect.gen(function* () {
const input = Stream.make(new TextEncoder().encode("data from parent"))
const handle = yield* js('process.stdout.write(require("node:fs").readFileSync(3, "utf8"))', {
additionalFds: { fd3: { type: "input", stream: input } },
})
const stdout = yield* decodeByteStream(handle.stdout)
yield* handle.exitCode
return stdout
}),
)
expect(out).toBe("data from parent")
})
test("returns empty stream for unconfigured fd", async () => {
const out = await runScoped(
Effect.gen(function* () {
const handle = yield* js('process.stdout.write("test")')
const fd3Output = yield* decodeByteStream(handle.getOutputFd(3))
yield* handle.exitCode
return fd3Output
}),
)
expect(out).toBe("")
})
test("works alongside normal stdout and stderr", async () => {
const result = await runScoped(
Effect.gen(function* () {
const handle = yield* js(
'require("node:fs").writeSync(3, "fd3\\n"); process.stdout.write("stdout\\n"); process.stderr.write("stderr\\n")',
{
additionalFds: { fd3: { type: "output" } },
},
)
const stdout = yield* decodeByteStream(handle.stdout)
const stderr = yield* decodeByteStream(handle.stderr)
const fd3 = yield* decodeByteStream(handle.getOutputFd(3))
yield* handle.exitCode
return { stdout, stderr, fd3 }
}),
)
expect(result.stdout).toBe("stdout")
expect(result.stderr).toBe("stderr")
expect(result.fd3).toBe("fd3")
})
})
describe("large output", () => {
test(
"does not deadlock on large stdout",
async () => {
const out = await runScoped(
Effect.gen(function* () {
const handle = yield* js("for (let i = 1; i <= 100000; i++) process.stdout.write(`${i}\\n`)")
const output = yield* handle.stdout.pipe(
Stream.decodeText(),
Stream.runFold(
() => "",
(acc, chunk) => acc + chunk,
),
)
yield* handle.exitCode
return output
}),
)
const lines = out.trim().split("\n")
expect(lines.length).toBe(100000)
expect(lines[0]).toBe("1")
expect(lines[99999]).toBe("100000")
},
{ timeout: 10_000 },
)
})
describe("Windows-specific", () => {
test("uses shell routing on Windows", async () => {
if (process.platform !== "win32") return
const out = await run(
ChildProcessSpawner.ChildProcessSpawner.use((svc) =>
svc.string(
ChildProcess.make("set", ["OPENCODE_TEST_SHELL"], {
shell: true,
extendEnv: true,
env: { OPENCODE_TEST_SHELL: "ok" },
}),
),
),
)
expect(out).toContain("OPENCODE_TEST_SHELL=ok")
})
test("runs cmd scripts with spaces on Windows without shell", async () => {
if (process.platform !== "win32") return
await using tmp = await tmpdir()
const dir = path.join(tmp.path, "with space")
const file = path.join(dir, "echo cmd.cmd")
await fs.mkdir(dir, { recursive: true })
await Bun.write(file, "@echo off\r\nif %~1==--stdio exit /b 0\r\nexit /b 7\r\n")
const code = await run(
ChildProcessSpawner.ChildProcessSpawner.use((svc) =>
svc.exitCode(
ChildProcess.make(file, ["--stdio"], {
stdin: "pipe",
stdout: "pipe",
stderr: "pipe",
}),
),
),
)
expect(code).toBe(ChildProcessSpawner.ExitCode(0))
})
})
})